1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
//! Small embeddable network simulator.

/// Converts a C-style integer return value into an `io::Result`.
///
/// `$res` is evaluated exactly once. A negative value becomes
/// `Err(io::Error::last_os_error())`; any other value is passed through as `Ok`.
///
/// The error is built from the OS's last-error value immediately after `$res` has been
/// evaluated, so the macro should wrap the failing call directly.
///
/// The error type is referenced by its absolute path, so call sites do not need to have
/// `std::io` in scope.
macro_rules! errno {
    ($res:expr) => {{
        let res = $res;
        if res < 0 {
            Err(::std::io::Error::last_os_error())
        } else {
            Ok(res)
        }
    }};
}

pub mod iface;
pub mod namespace;
//#[cfg(feature = "tokio2")]
//pub mod tokio;

use futures::future::Future;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use netsim_embed_core::{Ipv4Range, Plug};
use std::net::Ipv4Addr;
use std::thread;

/// Runs `task` on a dedicated thread that lives in its own network namespace.
///
/// The spawned thread first unshares its network namespace, then creates a TUN interface,
/// assigns it `addr`/`mask`, brings it up and adds a route for `Ipv4Range::global()`.
/// Two background tasks are started to move packets between that interface and `plug`:
///
/// * packets read from the interface are forwarded into `plug`, except those whose IP
///   version nibble is not 4, which are dropped;
/// * packets received from `plug` are written to the interface.
///
/// `task` is then driven to completion on the thread, and its output is the thread's result.
///
/// # Panics
///
/// Failures while unsharing the namespace or setting up the interface are `unwrap`ped on
/// the spawned thread, so they surface as an `Err` from `JoinHandle::join`. Read and write
/// errors on the interface are `unwrap`ped inside the background tasks.
pub fn machine<F>(addr: Ipv4Addr, mask: u8, plug: Plug, task: F) -> thread::JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    thread::spawn(move || {
        namespace::unshare_network().unwrap();

        // Builds the interface and the two packet-forwarding futures. It is invoked from
        // inside the executor's `block_on` below rather than here.
        let start_packet_pumps = || {
            let tun = iface::Iface::new().unwrap();
            tun.set_ipv4_addr(addr, mask).unwrap();
            tun.put_up().unwrap();
            tun.add_ipv4_route(Ipv4Range::global().into()).unwrap();

            #[cfg(not(feature = "tokio2"))]
            let tun = smol::Async::new(tun).unwrap();
            // NOTE(review): the crate-level `pub mod tokio;` declaration is currently
            // commented out; confirm this path still resolves with `tokio2` enabled.
            #[cfg(feature = "tokio2")]
            let tun = tokio::TokioFd::new(tun).unwrap();

            let (mut to_plug, mut from_plug) = plug.split();
            let (mut tun_reader, mut tun_writer) = tun.split();

            // Interface -> plug.
            let outbound = async move {
                loop {
                    let mut frame = [0; libc::ETH_FRAME_LEN as usize];
                    let len = match tun_reader.read(&mut frame).await.unwrap() {
                        // A zero-length read ends the forwarding loop.
                        0 => break,
                        len => len,
                    };
                    // drop ipv6 packets (anything whose version nibble is not 4)
                    let is_ipv4 = (frame[0] >> 4) == 4;
                    if !is_ipv4 {
                        continue;
                    }
                    log::debug!("machine {}: sending packet", addr);
                    let delivered = to_plug.send(frame[..len].to_vec()).await;
                    if delivered.is_err() {
                        // The other end of the plug is gone; nothing left to forward to.
                        break;
                    }
                }
            };

            // Plug -> interface.
            let inbound = async move {
                loop {
                    let packet = match from_plug.next().await {
                        Some(packet) => packet,
                        // The plug's stream has ended.
                        None => break,
                    };
                    log::debug!("machine {}: received packet", addr);
                    let written = tun_writer.write(&packet).await.unwrap();
                    if written == 0 {
                        break;
                    }
                }
            };

            (outbound, inbound)
        };

        #[cfg(not(feature = "tokio2"))]
        let output = smol::block_on(async move {
            let (outbound, inbound) = start_packet_pumps();
            // Detached: the forwarding tasks are never joined by this function.
            smol::spawn(outbound).detach();
            smol::spawn(inbound).detach();
            task.await
        });
        #[cfg(feature = "tokio2")]
        let output = {
            let mut runtime = ::tokio::runtime::Builder::new()
                .threaded_scheduler()
                .enable_all()
                .build()
                .unwrap();
            runtime.block_on(async move {
                let (outbound, inbound) = start_packet_pumps();
                ::tokio::task::spawn(outbound);
                ::tokio::task::spawn(inbound);
                task.await
            })
        };

        output
    })
}