Skip to main content

libcontainer/process/
channel.rs

1use std::collections::HashMap;
2use std::os::unix::prelude::{AsRawFd, RawFd};
3
4use nix::unistd::Pid;
5
6use crate::channel::{Receiver, Sender, channel};
7use crate::network::cidr::CidrAddress;
8use crate::process::message::Message;
9
/// Errors returned by the typed process channels defined in this module.
#[derive(Debug, thiserror::Error)]
pub enum ChannelError {
    /// A message was received, but it was not the variant the caller was
    /// waiting for.
    #[error("received unexpected message: {received:?}, expected: {expected:?}")]
    UnexpectedMessage {
        /// The message variant the waiting side expected.
        expected: Message,
        /// The message that actually arrived.
        received: Message,
    },
    /// Receiving from the underlying channel failed. `msg` carries a short
    /// description of what the receiver was waiting for.
    #[error("failed to receive. {msg:?}. {source:?}")]
    ReceiveError {
        /// Description of the wait that failed.
        msg: String,
        /// The error reported by the underlying channel.
        #[source]
        source: crate::channel::ChannelError,
    },
    /// Any other error from the underlying channel, converted via `From`
    /// (this is what `?` produces on send/close failures in this module).
    #[error(transparent)]
    BaseChannelError(#[from] crate::channel::ChannelError),
    /// A seccomp notify message arrived without the file descriptor that
    /// should accompany it.
    #[error("missing fds from seccomp request")]
    MissingSeccompFds,
    /// The peer reported `Message::ExecFailed`; the payload is its error text.
    #[error("exec process failed with error {0}")]
    ExecError(String),
    /// The peer reported `Message::OtherError`; the payload is its error text.
    #[error("intermediate process error {0}")]
    OtherError(String),
}
32
// Channel Design
//
// Each of the main, intermediate, and init processes has its own
// uni-directional channel, consisting of a sender and a receiver. Each process
// holds its receiver and listens for messages on it. The matching sender is
// shared with the other processes so they can send messages to that receiver.
// For example, main_sender and main_receiver belong to the main process: the
// main process uses main_receiver to receive every message addressed to it,
// while the other processes share main_sender and use it to send messages to
// the main process.
43
44pub fn main_channel() -> Result<(MainSender, MainReceiver), ChannelError> {
45    let (sender, receiver) = channel::<Message>()?;
46    Ok((MainSender { sender }, MainReceiver { receiver }))
47}
48
49pub struct MainSender {
50    sender: Sender<Message>,
51}
52
53impl MainSender {
54    // requests the Main to write the id mappings for the intermediate process
55    // this needs to be done from the parent see https://man7.org/linux/man-pages/man7/user_namespaces.7.html
56    pub fn identifier_mapping_request(&mut self) -> Result<(), ChannelError> {
57        tracing::debug!("send identifier mapping request");
58        self.sender.send(Message::WriteMapping)?;
59
60        Ok(())
61    }
62
63    pub fn seccomp_notify_request(&mut self, fd: RawFd) -> Result<(), ChannelError> {
64        self.sender
65            .send_fds(Message::SeccompNotify, &[fd.as_raw_fd()])?;
66
67        Ok(())
68    }
69
70    pub fn network_setup_ready(&mut self) -> Result<(), ChannelError> {
71        tracing::debug!("notify network setup ready");
72        self.sender.send(Message::SetupNetworkDeviceReady)?;
73
74        Ok(())
75    }
76
77    pub fn intermediate_ready(&mut self, pid: Pid) -> Result<(), ChannelError> {
78        // Send over the IntermediateReady follow by the pid.
79        tracing::debug!("sending init pid ({:?})", pid);
80        self.sender.send(Message::IntermediateReady(pid.as_raw()))?;
81
82        Ok(())
83    }
84
85    pub fn init_ready(&mut self) -> Result<(), ChannelError> {
86        self.sender.send(Message::InitReady)?;
87
88        Ok(())
89    }
90
91    pub fn exec_failed(&mut self, err: String) -> Result<(), ChannelError> {
92        self.sender.send(Message::ExecFailed(err))?;
93        Ok(())
94    }
95
96    pub fn send_error(&mut self, err: String) -> Result<(), ChannelError> {
97        self.sender.send(Message::OtherError(err))?;
98        Ok(())
99    }
100
101    pub fn hook_request(&mut self) -> Result<(), ChannelError> {
102        self.sender.send(Message::HookRequest)?;
103        Ok(())
104    }
105
106    pub fn close(&self) -> Result<(), ChannelError> {
107        self.sender.close()?;
108
109        Ok(())
110    }
111}
112
113pub struct MainReceiver {
114    receiver: Receiver<Message>,
115}
116
117impl MainReceiver {
118    /// Waits for associated intermediate process to send ready message
119    /// and return the pid of init process which is forked by intermediate process
120    pub fn wait_for_intermediate_ready(&mut self) -> Result<Pid, ChannelError> {
121        let msg = self
122            .receiver
123            .recv()
124            .map_err(|err| ChannelError::ReceiveError {
125                msg: "waiting for intermediate process".to_string(),
126                source: err,
127            })?;
128
129        match msg {
130            Message::IntermediateReady(pid) => Ok(Pid::from_raw(pid)),
131            Message::ExecFailed(err) => Err(ChannelError::ExecError(err)),
132            Message::OtherError(err) => Err(ChannelError::OtherError(err)),
133            msg => Err(ChannelError::UnexpectedMessage {
134                expected: Message::IntermediateReady(0),
135                received: msg,
136            }),
137        }
138    }
139
140    pub fn wait_for_mapping_request(&mut self) -> Result<(), ChannelError> {
141        let msg = self
142            .receiver
143            .recv()
144            .map_err(|err| ChannelError::ReceiveError {
145                msg: "waiting for mapping request".to_string(),
146                source: err,
147            })?;
148        match msg {
149            Message::WriteMapping => Ok(()),
150            msg => Err(ChannelError::UnexpectedMessage {
151                expected: Message::WriteMapping,
152                received: msg,
153            }),
154        }
155    }
156
157    pub fn wait_for_seccomp_request(&mut self) -> Result<i32, ChannelError> {
158        let (msg, fds) = self.receiver.recv_with_fds::<[RawFd; 1]>().map_err(|err| {
159            ChannelError::ReceiveError {
160                msg: "waiting for seccomp request".to_string(),
161                source: err,
162            }
163        })?;
164
165        match msg {
166            Message::SeccompNotify => {
167                let fd = match fds {
168                    Some(fds) => {
169                        if fds.is_empty() {
170                            Err(ChannelError::MissingSeccompFds)
171                        } else {
172                            Ok(fds[0])
173                        }
174                    }
175                    None => Err(ChannelError::MissingSeccompFds),
176                }?;
177                Ok(fd)
178            }
179            msg => Err(ChannelError::UnexpectedMessage {
180                expected: Message::SeccompNotify,
181                received: msg,
182            }),
183        }
184    }
185
186    pub fn wait_for_network_setup_ready(&mut self) -> Result<(), ChannelError> {
187        let msg = self
188            .receiver
189            .recv()
190            .map_err(|err| ChannelError::ReceiveError {
191                msg: "waiting for init ready".to_string(),
192                source: err,
193            })?;
194        match msg {
195            Message::SetupNetworkDeviceReady => Ok(()),
196            msg => Err(ChannelError::UnexpectedMessage {
197                expected: Message::SetupNetworkDeviceReady,
198                received: msg,
199            }),
200        }
201    }
202
203    /// Waits for associated init process to send ready message
204    /// and return the pid of init process which is forked by init process
205    pub fn wait_for_init_ready(&mut self) -> Result<(), ChannelError> {
206        let msg = self
207            .receiver
208            .recv()
209            .map_err(|err| ChannelError::ReceiveError {
210                msg: "waiting for init ready".to_string(),
211                source: err,
212            })?;
213        match msg {
214            Message::InitReady => Ok(()),
215            // this case in unique and known enough to have a special error format
216            Message::ExecFailed(err) => Err(ChannelError::ExecError(format!(
217                "error in executing process : {err}"
218            ))),
219            msg => Err(ChannelError::UnexpectedMessage {
220                expected: Message::InitReady,
221                received: msg,
222            }),
223        }
224    }
225
226    pub fn wait_for_hook_request(&mut self) -> Result<(), ChannelError> {
227        let msg = self
228            .receiver
229            .recv()
230            .map_err(|err| ChannelError::ReceiveError {
231                msg: "waiting for hook request".to_string(),
232                source: err,
233            })?;
234        match msg {
235            Message::HookRequest => Ok(()),
236            msg => Err(ChannelError::UnexpectedMessage {
237                expected: Message::HookRequest,
238                received: msg,
239            }),
240        }
241    }
242
243    pub fn close(&self) -> Result<(), ChannelError> {
244        self.receiver.close()?;
245
246        Ok(())
247    }
248}
249
250pub fn intermediate_channel() -> Result<(IntermediateSender, IntermediateReceiver), ChannelError> {
251    let (sender, receiver) = channel::<Message>()?;
252    Ok((
253        IntermediateSender { sender },
254        IntermediateReceiver { receiver },
255    ))
256}
257
258pub struct IntermediateSender {
259    sender: Sender<Message>,
260}
261
262impl IntermediateSender {
263    pub fn mapping_written(&mut self) -> Result<(), ChannelError> {
264        tracing::debug!("identifier mapping written");
265        self.sender.send(Message::MappingWritten)?;
266
267        Ok(())
268    }
269
270    pub fn close(&self) -> Result<(), ChannelError> {
271        self.sender.close()?;
272
273        Ok(())
274    }
275}
276
277pub struct IntermediateReceiver {
278    receiver: Receiver<Message>,
279}
280
281impl IntermediateReceiver {
282    // wait until the parent process has finished writing the id mappings
283    pub fn wait_for_mapping_ack(&mut self) -> Result<(), ChannelError> {
284        tracing::debug!("waiting for mapping ack");
285        let msg = self
286            .receiver
287            .recv()
288            .map_err(|err| ChannelError::ReceiveError {
289                msg: "waiting for mapping ack".to_string(),
290                source: err,
291            })?;
292        match msg {
293            Message::MappingWritten => Ok(()),
294            msg => Err(ChannelError::UnexpectedMessage {
295                expected: Message::MappingWritten,
296                received: msg,
297            }),
298        }
299    }
300
301    pub fn close(&self) -> Result<(), ChannelError> {
302        self.receiver.close()?;
303
304        Ok(())
305    }
306}
307
308pub fn init_channel() -> Result<(InitSender, InitReceiver), ChannelError> {
309    let (sender, receiver) = channel::<Message>()?;
310    Ok((InitSender { sender }, InitReceiver { receiver }))
311}
312
313pub struct InitSender {
314    sender: Sender<Message>,
315}
316
317impl InitSender {
318    pub fn seccomp_notify_done(&mut self) -> Result<(), ChannelError> {
319        self.sender.send(Message::SeccompNotifyDone)?;
320
321        Ok(())
322    }
323
324    pub fn hook_done(&mut self) -> Result<(), ChannelError> {
325        self.sender.send(Message::HookDone)?;
326        Ok(())
327    }
328
329    pub fn move_network_device(
330        &mut self,
331        addrs: HashMap<String, Vec<CidrAddress>>,
332    ) -> Result<(), ChannelError> {
333        self.sender.send(Message::MoveNetworkDevice(addrs))?;
334
335        Ok(())
336    }
337
338    pub fn close(&self) -> Result<(), ChannelError> {
339        self.sender.close()?;
340
341        Ok(())
342    }
343}
344
345pub struct InitReceiver {
346    receiver: Receiver<Message>,
347}
348
349impl InitReceiver {
350    pub fn wait_for_seccomp_request_done(&mut self) -> Result<(), ChannelError> {
351        let msg = self
352            .receiver
353            .recv()
354            .map_err(|err| ChannelError::ReceiveError {
355                msg: "waiting for seccomp request".to_string(),
356                source: err,
357            })?;
358
359        match msg {
360            Message::SeccompNotifyDone => Ok(()),
361            msg => Err(ChannelError::UnexpectedMessage {
362                expected: Message::SeccompNotifyDone,
363                received: msg,
364            }),
365        }
366    }
367
368    pub fn wait_for_move_network_device(
369        &mut self,
370    ) -> Result<HashMap<String, Vec<CidrAddress>>, ChannelError> {
371        let msg = self
372            .receiver
373            .recv()
374            .map_err(|err| ChannelError::ReceiveError {
375                msg: "waiting for mapping request".to_string(),
376                source: err,
377            })?;
378        match msg {
379            Message::MoveNetworkDevice(addr) => Ok(addr),
380            msg => Err(ChannelError::UnexpectedMessage {
381                expected: Message::WriteMapping,
382                received: msg,
383            }),
384        }
385    }
386
387    pub fn wait_for_hook_request_done(&mut self) -> Result<(), ChannelError> {
388        let msg = self
389            .receiver
390            .recv()
391            .map_err(|err| ChannelError::ReceiveError {
392                msg: "waiting for hook done".to_string(),
393                source: err,
394            })?;
395        match msg {
396            Message::HookDone => Ok(()),
397            msg => Err(ChannelError::UnexpectedMessage {
398                expected: Message::HookDone,
399                received: msg,
400            }),
401        }
402    }
403
404    pub fn close(&self) -> Result<(), ChannelError> {
405        self.receiver.close()?;
406
407        Ok(())
408    }
409}
410
#[cfg(test)]
mod tests {
    use anyhow::{Context, Result};
    use nix::sys::wait;
    use nix::unistd;
    use serial_test::serial;

    use super::*;

    // Note: by default, cargo test runs tests in parallel inside a single
    // process, so these tests must not run in parallel with other tests.
    // Because all tests share the process, another test may close file
    // descriptors or saturate the IO of the OS. The channel uses a pipe to
    // communicate and can become flaky as a result. There is not much else
    // we can do other than run these tests serially.

    /// The child sends its own pid as the intermediate-ready message; the
    /// parent must receive exactly that pid.
    #[test]
    #[serial]
    fn test_channel_intermadiate_ready() -> Result<()> {
        let (sender, receiver) = &mut main_channel()?;
        match unsafe { unistd::fork()? } {
            unistd::ForkResult::Parent { child } => {
                wait::waitpid(child, None)?;
                let pid = receiver
                    .wait_for_intermediate_ready()
                    .with_context(|| "Failed to wait for intermediate ready")?;
                receiver.close()?;
                assert_eq!(pid, child);
            }
            unistd::ForkResult::Child => {
                let pid = unistd::getpid();
                sender.intermediate_ready(pid)?;
                sender.close()?;
                std::process::exit(0);
            }
        };

        Ok(())
    }

    /// The child sends the id mapping request; the parent must receive it.
    #[test]
    #[serial]
    fn test_channel_id_mapping_request() -> Result<()> {
        let (sender, receiver) = &mut main_channel()?;
        match unsafe { unistd::fork()? } {
            unistd::ForkResult::Parent { child } => {
                wait::waitpid(child, None)?;
                receiver.wait_for_mapping_request()?;
                receiver.close()?;
            }
            unistd::ForkResult::Child => {
                sender
                    .identifier_mapping_request()
                    .with_context(|| "Failed to send mapping request")?;
                sender.close()?;
                std::process::exit(0);
            }
        };

        Ok(())
    }

    /// The child sends the mapping-written ack; the parent must receive it.
    #[test]
    #[serial]
    fn test_channel_id_mapping_ack() -> Result<()> {
        let (sender, receiver) = &mut intermediate_channel()?;
        match unsafe { unistd::fork()? } {
            unistd::ForkResult::Parent { child } => {
                wait::waitpid(child, None)?;
                receiver.wait_for_mapping_ack()?;
                // Close explicitly, as the sibling tests do.
                receiver.close()?;
            }
            unistd::ForkResult::Child => {
                sender
                    .mapping_written()
                    .with_context(|| "Failed to send mapping written")?;
                sender.close()?;
                std::process::exit(0);
            }
        };

        Ok(())
    }

    /// The child sends init-ready; the parent must receive it.
    #[test]
    #[serial]
    fn test_channel_init_ready() -> Result<()> {
        let (sender, receiver) = &mut main_channel()?;
        match unsafe { unistd::fork()? } {
            unistd::ForkResult::Parent { child } => {
                wait::waitpid(child, None)?;
                receiver.wait_for_init_ready()?;
                receiver.close()?;
            }
            unistd::ForkResult::Child => {
                sender
                    .init_ready()
                    .with_context(|| "Failed to send init ready")?;
                sender.close()?;
                std::process::exit(0);
            }
        };

        Ok(())
    }

    #[test]
    #[serial]
    fn test_channel_main_graceful_exit() -> Result<()> {
        let (sender, receiver) = &mut main_channel()?;
        match unsafe { unistd::fork()? } {
            unistd::ForkResult::Parent { child } => {
                sender.close().context("failed to close sender")?;
                // The child process exits without sending the intermediate
                // ready message. wait_for_intermediate_ready should then
                // return an error instead of blocking forever.
                let ret = receiver.wait_for_intermediate_ready();
                assert!(ret.is_err());
                wait::waitpid(child, None)?;
            }
            unistd::ForkResult::Child => {
                receiver.close()?;
                std::process::exit(0);
            }
        };

        Ok(())
    }

    #[test]
    #[serial]
    fn test_channel_intermediate_graceful_exit() -> Result<()> {
        let (sender, receiver) = &mut main_channel()?;
        match unsafe { unistd::fork()? } {
            unistd::ForkResult::Parent { child } => {
                sender.close().context("failed to close sender")?;
                // The child process exits without sending the init ready
                // message. wait_for_init_ready should then return an error
                // instead of blocking forever.
                let ret = receiver.wait_for_init_ready();
                assert!(ret.is_err());
                wait::waitpid(child, None)?;
            }
            unistd::ForkResult::Child => {
                receiver.close()?;
                std::process::exit(0);
            }
        };

        Ok(())
    }

    /// The parent sends a device/address map; the child checks what it
    /// received and reports the outcome through its exit code.
    #[test]
    #[serial]
    fn test_move_network_device_message() -> Result<()> {
        use crate::network::cidr::CidrAddress;

        let device_name = "dummy".to_string();
        let ip = "10.0.0.1".parse().unwrap();
        let addr = CidrAddress {
            prefix_len: 24,
            address: ip,
        };
        let mut addrs = HashMap::new();
        addrs.insert(device_name.clone(), vec![addr.clone()]);

        let (sender, receiver) = &mut init_channel()?;

        match unsafe { unistd::fork()? } {
            unistd::ForkResult::Parent { child } => {
                sender.move_network_device(addrs)?;
                sender.close().context("failed to close sender")?;
                let status = wait::waitpid(child, None)?;
                if let nix::sys::wait::WaitStatus::Exited(_, code) = status {
                    assert_eq!(code, 0, "Child process failed assertions");
                } else {
                    panic!("Child did not exit normally: {:?}", status);
                }
            }
            unistd::ForkResult::Child => {
                let received_addrs = receiver.wait_for_move_network_device()?;
                receiver.close()?;
                // Assertions cannot be reported from the child directly, so
                // the result is folded into the exit code checked by the
                // parent. `first()` avoids a panic on an empty address list.
                let matches = received_addrs
                    .get(&device_name)
                    .and_then(|received| received.first())
                    .is_some_and(|first| {
                        first.prefix_len == addr.prefix_len && first.address == addr.address
                    });
                if !matches {
                    eprintln!("assertion failed in child");
                    std::process::exit(1);
                }
                std::process::exit(0);
            }
        };

        Ok(())
    }

    /// The child sends network-setup-ready; the parent must receive it.
    #[test]
    #[serial]
    fn test_network_setup_ready() -> Result<()> {
        let (sender, receiver) = &mut main_channel()?;
        match unsafe { unistd::fork()? } {
            unistd::ForkResult::Parent { child } => {
                wait::waitpid(child, None)?;
                receiver.wait_for_network_setup_ready()?;
                receiver.close()?;
            }
            unistd::ForkResult::Child => {
                sender
                    .network_setup_ready()
                    .with_context(|| "Failed to send network setup ready")?;
                sender.close()?;
                std::process::exit(0);
            }
        };

        Ok(())
    }
}