netsim_embed_machine/
lib.rs

1//! Small embeddable network simulator.
2
/// Converts a C-style return code into an `io::Result`.
///
/// The expression is evaluated exactly once. A negative value becomes
/// `Err(io::Error::last_os_error())`; any other value is passed through as `Ok`.
/// The call site must have `io` (i.e. `std::io`) in scope.
macro_rules! errno {
    ($res:expr) => {
        match $res {
            code if code < 0 => Err(io::Error::last_os_error()),
            code => Ok(code),
        }
    };
}
13
14pub mod iface;
15mod namespace;
16
17pub use namespace::{unshare_user, Namespace};
18
19use async_process::Command;
20use async_std::future::timeout;
21use futures::{
22    channel::{mpsc, oneshot},
23    future::{FusedFuture, FutureExt},
24    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
25    sink::SinkExt,
26    stream::{FusedStream, StreamExt},
27};
28use netsim_embed_core::{Ipv4Range, Packet, Plug};
29use std::{
30    collections::VecDeque,
31    fmt::{self, Display},
32    future::{pending, poll_fn},
33    io::{Error, ErrorKind, Result, Write},
34    net::Ipv4Addr,
35    process::Stdio,
36    str::FromStr,
37    task::Poll,
38    thread,
39    time::Duration,
40};
41
/// Control messages sent from a `Machine` handle to the control loop of its worker thread.
#[derive(Debug)]
enum IfaceCtrl {
    /// Bring the interface up (the worker calls `put_up`).
    Up,
    /// Take the interface down (the worker calls `put_down`).
    Down,
    /// Assign the given IPv4 address and mask, bring the interface up and add a route; the
    /// oneshot sender is signalled once all of that has succeeded.
    SetAddr(Ipv4Addr, u8, oneshot::Sender<()>),
    /// Stop the control loop, which in turn shuts the worker down.
    Exit,
}
49
/// Identifier of a simulated machine; a thin wrapper around a plain index.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct MachineId(pub usize);

impl fmt::Display for MachineId {
    /// Renders the identifier as `Machine-#<index>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(index) = *self;
        f.write_fmt(format_args!("Machine-#{index}"))
    }
}
58
/// Spawns a thread in a new network namespace and configures a TUN interface that sends and
/// receives IP packets from the tx/rx channels and runs some UDP/TCP networking code in task.
///
/// `C` is the command type written (via `Display`) to the child process, `E` is the event type
/// parsed (via `FromStr`) from the child's output. Dropping the handle asks the worker thread
/// to exit and joins it.
#[derive(Debug)]
pub struct Machine<C, E> {
    /// Identifier passed to `new`; used as a prefix in log output.
    id: MachineId,
    /// Address most recently applied through `set_addr` (`Ipv4Addr::UNSPECIFIED` until then).
    addr: Ipv4Addr,
    /// Mask most recently applied through `set_addr` (32 until then).
    mask: u8,
    /// Namespace handle reported back by the worker thread.
    ns: Namespace,
    /// Sender for interface control messages handled by the worker thread.
    ctrl: mpsc::UnboundedSender<IfaceCtrl>,
    /// Sender for commands, which the worker writes to the child's stdin.
    tx: mpsc::UnboundedSender<C>,
    /// Receiver for events the worker parsed from the child's stdout.
    rx: mpsc::UnboundedReceiver<E>,
    /// Join handle of the worker thread; taken and joined on drop.
    join: Option<thread::JoinHandle<Result<()>>>,
    /// Events already received but not yet handed out (filled by `select` for non-matching
    /// events).
    buffer: VecDeque<E>,
}
73
74impl<C, E> Machine<C, E>
75where
76    C: Display + Send + 'static,
77    E: FromStr + Display + Send + 'static,
78    E::Err: std::fmt::Debug + Display + Send + Sync,
79{
80    pub async fn new(id: MachineId, plug: Plug, cmd: Command) -> Self {
81        let (ctrl_tx, ctrl_rx) = mpsc::unbounded();
82        let (cmd_tx, cmd_rx) = mpsc::unbounded();
83        let (event_tx, event_rx) = mpsc::unbounded();
84        let (ns_tx, ns_rx) = oneshot::channel();
85        let join = machine(id, plug, cmd, ctrl_rx, ns_tx, cmd_rx, event_tx);
86        let ns = ns_rx.await.unwrap();
87        Self {
88            id,
89            addr: Ipv4Addr::UNSPECIFIED,
90            mask: 32,
91            ns,
92            ctrl: ctrl_tx,
93            tx: cmd_tx,
94            rx: event_rx,
95            join: Some(join),
96            buffer: VecDeque::new(),
97        }
98    }
99}
100
101impl<C, E> Machine<C, E> {
102    pub fn id(&self) -> MachineId {
103        self.id
104    }
105
106    pub fn addr(&self) -> Ipv4Addr {
107        self.addr
108    }
109
110    pub fn mask(&self) -> u8 {
111        self.mask
112    }
113
114    pub async fn set_addr(&mut self, addr: Ipv4Addr, mask: u8) {
115        let (tx, rx) = oneshot::channel();
116        self.ctrl
117            .unbounded_send(IfaceCtrl::SetAddr(addr, mask, tx))
118            .unwrap();
119        rx.await.unwrap();
120        self.addr = addr;
121        self.mask = mask;
122    }
123
124    pub fn send(&self, cmd: C) {
125        self.tx.unbounded_send(cmd).unwrap();
126    }
127
128    pub fn drain(&mut self) -> Vec<E> {
129        let mut res = self.buffer.drain(..).collect::<Vec<_>>();
130        if !self.rx.is_terminated() {
131            while let Ok(Some(x)) = self.rx.try_next() {
132                res.push(x);
133            }
134        }
135        res
136    }
137
138    pub fn up(&self) {
139        self.ctrl.unbounded_send(IfaceCtrl::Up).unwrap();
140    }
141
142    pub fn down(&self) {
143        self.ctrl.unbounded_send(IfaceCtrl::Down).unwrap();
144    }
145
146    pub fn namespace(&self) -> Namespace {
147        self.ns
148    }
149
150    pub async fn recv(&mut self) -> Option<E> {
151        if let Some(ev) = self.buffer.pop_front() {
152            Some(ev)
153        } else {
154            self.rx.next().await
155        }
156    }
157
158    pub async fn select<F, T>(&mut self, mut f: F) -> Option<T>
159    where
160        F: FnMut(&E) -> Option<T>,
161    {
162        if let Some((idx, res)) = self
163            .buffer
164            .iter()
165            .enumerate()
166            .find_map(|(idx, ev)| f(ev).map(|x| (idx, x)))
167        {
168            self.buffer.remove(idx);
169            return Some(res);
170        }
171        loop {
172            match self.rx.next().await {
173                Some(ev) => {
174                    if let Some(res) = f(&ev) {
175                        return Some(res);
176                    } else {
177                        self.buffer.push_back(ev);
178                    }
179                }
180                None => return None,
181            }
182        }
183    }
184
185    pub async fn select_draining<F, T>(&mut self, mut f: F) -> Option<T>
186    where
187        F: FnMut(E) -> Option<T>,
188    {
189        while let Some(ev) = self.buffer.pop_front() {
190            if let Some(res) = f(ev) {
191                return Some(res);
192            }
193        }
194        loop {
195            match self.rx.next().await {
196                Some(ev) => {
197                    if let Some(res) = f(ev) {
198                        return Some(res);
199                    }
200                }
201                None => return None,
202            }
203        }
204    }
205
206    pub fn drain_matching<F: FnMut(&E) -> bool>(&mut self, mut f: F) -> Vec<E> {
207        let mut ret = Vec::new();
208        for e in std::mem::take(&mut self.buffer) {
209            if f(&e) {
210                ret.push(e);
211            } else {
212                self.buffer.push_back(e);
213            }
214        }
215        ret
216    }
217}
218
219impl<C, E> Drop for Machine<C, E> {
220    fn drop(&mut self) {
221        self.ctrl.unbounded_send(IfaceCtrl::Exit).ok();
222        self.join.take().unwrap().join().unwrap().unwrap();
223    }
224}
225
/// Formats `t` via `Display`, abbreviating the result when it is long.
///
/// If the full output is at most `limit` bytes it is returned unchanged. Otherwise the
/// output is cut at the last char boundary not beyond `cut_len` bytes (and never beyond
/// `limit` bytes) and `"... (N bytes)"` is appended, where `N` is the full output length.
///
/// At most `limit` bytes of the output are ever stored, and the stored text is always a
/// prefix of the full output.
///
/// # Panics
///
/// Panics if the `Display` implementation of `T` returns an error.
fn abbrev<T: Display>(t: &T, limit: usize, cut_len: usize) -> String {
    use std::fmt::Write;

    /// Writer that keeps a bounded prefix of what is written and counts all bytes.
    struct Clipped {
        kept: String,
        /// Bytes that may still be appended to `kept`.
        room: usize,
        /// Total number of bytes written, kept or not.
        total: usize,
    }

    impl Write for Clipped {
        fn write_str(&mut self, s: &str) -> fmt::Result {
            self.total += s.len();
            if s.len() <= self.room {
                self.kept.push_str(s);
                self.room -= s.len();
            } else {
                let mut end = self.room;
                while !s.is_char_boundary(end) {
                    end -= 1;
                }
                self.kept.push_str(&s[..end]);
                // Once a fragment has been clipped nothing more may be appended, otherwise
                // a later, shorter fragment would land after the gap.
                self.room = 0;
            }
            Ok(())
        }
    }

    let mut writer = Clipped {
        kept: String::with_capacity(limit),
        room: limit,
        total: 0,
    };
    write!(writer, "{t}").unwrap();
    let Clipped {
        kept: mut result,
        total: length,
        ..
    } = writer;

    if length > limit {
        // `is_char_boundary` is false for indices past the end, so this also clamps
        // `cut_len` to the length of what was kept.
        let mut cut = cut_len;
        while !result.is_char_boundary(cut) {
            cut -= 1;
        }
        result.truncate(cut);
        // Writing to a `String` cannot fail.
        write!(&mut result, "... ({length} bytes)").unwrap();
    }
    result
}
253
/// Spawns the worker thread behind a `Machine` and returns its join handle.
///
/// The thread first calls `Namespace::unshare()` and then drives an event loop with
/// `async_global_executor::block_on`. Inside that loop it:
///
/// * creates the interface (`iface::Iface::new()`) and splits `plug` into its two halves,
/// * applies `IfaceCtrl` messages from `ctrl` to the interface,
/// * forwards packets between the interface and `plug`,
/// * spawns `bin` with piped stdin/stdout/stderr,
/// * writes every command received on `cmd` as one line to the child's stdin,
/// * parses stdout lines starting with `<` into `E` and sends them on `event`; all other
///   stdout lines and all stderr lines are printed with the machine id as prefix.
///
/// The namespace handle is sent on `ns_tx` only after the child has been spawned. If any
/// earlier step fails, `ns_tx` is dropped without a value and the error is returned through
/// the join handle.
///
/// The loop ends as soon as any one of the tasks finishes. The child is then killed and the
/// stdout/stderr tasks get up to three seconds to finish; exceeding that yields
/// `ErrorKind::TimedOut`.
#[allow(clippy::too_many_arguments)]
fn machine<C, E>(
    id: MachineId,
    plug: Plug,
    mut bin: Command,
    mut ctrl: mpsc::UnboundedReceiver<IfaceCtrl>,
    ns_tx: oneshot::Sender<Namespace>,
    mut cmd: mpsc::UnboundedReceiver<C>,
    event: mpsc::UnboundedSender<E>,
) -> thread::JoinHandle<Result<()>>
where
    C: Display + Send + 'static,
    E: FromStr + Display + Send + 'static,
    E::Err: std::fmt::Debug + Display + Send + Sync,
{
    thread::spawn(move || {
        // Must happen on this thread, before anything else is set up.
        let ns = Namespace::unshare()?;

        let res = async_global_executor::block_on(async move {
            let iface = iface::Iface::new()?;
            let iface = async_io::Async::new(iface)?;
            let (mut tx, mut rx) = plug.split();

            // Applies control messages until `Exit` arrives or the control channel closes.
            let ctrl_task = async {
                while let Some(ctrl) = ctrl.next().await {
                    log::debug!("{} CTRL {:?}", id, ctrl);
                    match ctrl {
                        IfaceCtrl::Up => iface.get_ref().put_up()?,
                        IfaceCtrl::Down => iface.get_ref().put_down()?,
                        IfaceCtrl::SetAddr(addr, mask, tx) => {
                            iface.get_ref().set_ipv4_addr(addr, mask)?;
                            iface.get_ref().put_up()?;
                            iface.get_ref().add_ipv4_route(Ipv4Range::global().into())?;
                            // The requester may have stopped waiting; ignore a failed ack.
                            tx.send(()).ok();
                        }
                        IfaceCtrl::Exit => {
                            break;
                        }
                    }
                }
                log::info!("{} (ctrl): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(ctrl_task);

            // Interface -> plug: reads packets from the interface and forwards them.
            let reader_task = async {
                loop {
                    let mut buf = [0; libc::ETH_FRAME_LEN as usize];
                    let n = iface.read_with(|iface| iface.recv(&mut buf)).await?;
                    if n == 0 {
                        break;
                    }
                    // Only forward IPv4: skip anything whose version nibble is not 4
                    // (e.g. IPv6 packets).
                    if buf[0] >> 4 != 4 {
                        continue;
                    }
                    log::trace!("{} (reader): sending packet", id);
                    let mut bytes = buf[..n].to_vec();
                    // Recompute the checksum when the bytes parse as a packet; the bytes are
                    // forwarded either way.
                    if let Some(mut packet) = Packet::new(&mut bytes) {
                        packet.set_checksum();
                    }
                    // The other end of the plug is gone; stop reading.
                    if tx.send(bytes).await.is_err() {
                        break;
                    }
                }
                log::info!("{} (reader): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(reader_task);

            // Plug -> interface: writes incoming packets to the interface.
            let writer_task = async {
                while let Some(packet) = rx.next().await {
                    log::trace!("{} (writer): received packet", id);
                    // Write errors are deliberately ignored (the packet is dropped): the
                    // write can fail while the interface is down.
                    if let Ok(n) = iface.write_with(|iface| iface.send(&packet)).await {
                        if n == 0 {
                            break;
                        }
                    }
                }
                log::info!("{} (writer): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(writer_task);

            // Start the child with all three standard streams piped to this thread.
            bin.stdin(Stdio::piped())
                .stdout(Stdio::piped())
                .stderr(Stdio::piped());
            let mut child = bin.spawn().map_err(|e| {
                log::error!("cannot start machine {:?}: {}", bin, e);
                e
            })?;
            let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines().fuse();
            let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines().fuse();
            let mut stdin = child.stdin.take().unwrap();

            // Commands -> child stdin, one `Display`-formatted line per command.
            let command_task = async {
                // Reused for every command to avoid a fresh allocation per line.
                let mut buf = Vec::with_capacity(4096);
                while let Some(cmd) = cmd.next().await {
                    buf.clear();
                    log::debug!("{} {}", id, abbrev(&cmd, 2000, 80));
                    writeln!(buf, "{cmd}")?;
                    stdin.write_all(&buf).await?;
                }
                log::info!("{} (command): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(command_task);

            // Child stdout -> events: lines starting with '<' are parsed as `E`, everything
            // else is echoed to this process's stdout.
            let event_task = async {
                while let Some(ev) = stdout.next().await {
                    let ev = ev?;
                    if ev.starts_with('<') {
                        // A line that fails to parse ends the task with an error.
                        let ev = match E::from_str(&ev) {
                            Ok(ev) => ev,
                            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
                        };
                        log::debug!("{} {}", id, abbrev(&ev, 2000, 80));
                        // The receiving side is gone; stop reading events.
                        if event.unbounded_send(ev).is_err() {
                            break;
                        }
                    } else {
                        println!("{id} (stdout): {ev}");
                    }
                }
                log::info!("{} (stdout): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(event_task);

            // Child stderr is echoed line by line to this process's stdout.
            let stderr_task = async {
                while let Some(ev) = stderr.next().await {
                    let ev = ev?;
                    println!("{id} (stderr): {ev}");
                }
                log::info!("{} (stderr): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(stderr_task);

            // Hand the namespace over only now, after spawning, so that a spawn (exec) error
            // is returned above instead of being hidden behind a seemingly started machine.
            let _ = ns_tx.send(ns);

            // Run all tasks concurrently; the first one to finish ends the main phase and its
            // error, if any, is returned.
            futures::select! {
                res = ctrl_task => res?,
                res = reader_task => res?,
                res = writer_task => res?,
                res = command_task => res?,
                res = event_task => res?,
                res = stderr_task => res?,
            };
            log::info!("{} killing", id);
            child.kill()?;
            // `pending()` never completes, so this future resolves only when the three-second
            // timeout elapses.
            let deadline = timeout(Duration::from_secs(3), pending::<()>());
            futures::pin_mut!(deadline);
            // Keep polling only those output tasks that have not completed yet.
            let mut event_task = (!event_task.is_terminated()).then_some(event_task);
            let mut stderr_task = (!stderr_task.is_terminated()).then_some(stderr_task);
            // Let the remaining output tasks run to completion, bounded by the deadline.
            poll_fn(|cx| {
                if deadline.poll_unpin(cx).is_ready() {
                    // Report which of the two tasks was still outstanding.
                    log::warn!(
                        "{} ev: {} err: {}",
                        id,
                        event_task.is_some(),
                        stderr_task.is_some()
                    );
                    return Poll::Ready(Err(ErrorKind::TimedOut.into()));
                }
                match (&mut event_task, &mut stderr_task) {
                    (None, None) => return Poll::Ready(Ok(())),
                    // Only one task left: its poll result is the overall result.
                    (None, Some(err)) => return err.poll_unpin(cx),
                    (Some(ev), None) => return ev.poll_unpin(cx),
                    (Some(ev), Some(err)) => {
                        let ev = ev.poll_unpin(cx);
                        let err = err.poll_unpin(cx);
                        match (ev, err) {
                            // Any error wins; otherwise forget whichever task finished so it
                            // is not polled again.
                            (Poll::Ready(Err(e)), _) => return Poll::Ready(Err(e)),
                            (_, Poll::Ready(Err(e))) => return Poll::Ready(Err(e)),
                            (Poll::Ready(Ok(_)), Poll::Ready(Ok(_))) => return Poll::Ready(Ok(())),
                            (Poll::Ready(Ok(_)), _) => event_task = None,
                            (_, Poll::Ready(Ok(_))) => stderr_task = None,
                            _ => {}
                        }
                    }
                }
                Poll::Pending
            })
            .await?;
            Ok(())
        });
        log::info!("{}'s event loop yielded with {:?}", id, res);
        res
    })
}
450        res
451    })
452}