/// Turns a C-style return code into an `io::Result`.
///
/// The expression is evaluated exactly once. A negative value becomes
/// `Err(io::Error::last_os_error())`; any other value is passed through as
/// `Ok(value)`. The call site must have `io` (i.e. `std::io`) in scope.
macro_rules! errno {
    ($res:expr) => {{
        // Bind first so temporaries of `$res` are dropped before errno is read.
        let code = $res;
        match code {
            failed if failed < 0 => Err(io::Error::last_os_error()),
            value => Ok(value),
        }
    }};
}
13
14pub mod iface;
15mod namespace;
16
17pub use namespace::{unshare_user, Namespace};
18
19use async_process::Command;
20use async_std::future::timeout;
21use futures::{
22 channel::{mpsc, oneshot},
23 future::{FusedFuture, FutureExt},
24 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
25 sink::SinkExt,
26 stream::{FusedStream, StreamExt},
27};
28use netsim_embed_core::{Ipv4Range, Packet, Plug};
29use std::{
30 collections::VecDeque,
31 fmt::{self, Display},
32 future::{pending, poll_fn},
33 io::{Error, ErrorKind, Result, Write},
34 net::Ipv4Addr,
35 process::Stdio,
36 str::FromStr,
37 task::Poll,
38 thread,
39 time::Duration,
40};
41
/// Control messages sent from a `Machine` handle to the control loop running
/// on its worker thread (see `machine`).
#[derive(Debug)]
enum IfaceCtrl {
    /// Ask the worker to call `put_up` on the interface.
    Up,
    /// Ask the worker to call `put_down` on the interface.
    Down,
    /// Ask the worker to assign the given IPv4 address and mask to the
    /// interface (the `u8` is presumably a prefix length — confirm against
    /// `Iface::set_ipv4_addr`). The sender is signalled with `()` once the
    /// address, link state and route have been applied.
    SetAddr(Ipv4Addr, u8, oneshot::Sender<()>),
    /// Ask the worker to leave its control loop, which ends the event loop.
    Exit,
}
49
/// Identifier of a machine; a thin wrapper around a `usize` index.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct MachineId(pub usize);

impl fmt::Display for MachineId {
    /// Renders the id as `Machine-#<index>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let MachineId(index) = self;
        f.write_fmt(format_args!("Machine-#{}", index))
    }
}
58
/// Handle to a machine whose work is done on a dedicated thread started by
/// `machine`.
///
/// `C` is the command type written to the child process, `E` the event type
/// parsed from the child's output.
#[derive(Debug)]
pub struct Machine<C, E> {
    /// Identifier used in log output.
    id: MachineId,
    /// Last address applied through `set_addr`; `Ipv4Addr::UNSPECIFIED` until then.
    addr: Ipv4Addr,
    /// Last mask applied through `set_addr`; `32` until then.
    mask: u8,
    /// Namespace reported back by the worker thread during construction.
    ns: Namespace,
    /// Sender for interface control messages to the worker.
    ctrl: mpsc::UnboundedSender<IfaceCtrl>,
    /// Sender for commands that the worker forwards to the child's stdin.
    tx: mpsc::UnboundedSender<C>,
    /// Receiver for events the worker parsed from the child's stdout.
    rx: mpsc::UnboundedReceiver<E>,
    /// Join handle of the worker thread; taken and joined on drop.
    join: Option<thread::JoinHandle<Result<()>>>,
    /// Events already received but not yet consumed (e.g. skipped by `select`).
    buffer: VecDeque<E>,
}
73
74impl<C, E> Machine<C, E>
75where
76 C: Display + Send + 'static,
77 E: FromStr + Display + Send + 'static,
78 E::Err: std::fmt::Debug + Display + Send + Sync,
79{
80 pub async fn new(id: MachineId, plug: Plug, cmd: Command) -> Self {
81 let (ctrl_tx, ctrl_rx) = mpsc::unbounded();
82 let (cmd_tx, cmd_rx) = mpsc::unbounded();
83 let (event_tx, event_rx) = mpsc::unbounded();
84 let (ns_tx, ns_rx) = oneshot::channel();
85 let join = machine(id, plug, cmd, ctrl_rx, ns_tx, cmd_rx, event_tx);
86 let ns = ns_rx.await.unwrap();
87 Self {
88 id,
89 addr: Ipv4Addr::UNSPECIFIED,
90 mask: 32,
91 ns,
92 ctrl: ctrl_tx,
93 tx: cmd_tx,
94 rx: event_rx,
95 join: Some(join),
96 buffer: VecDeque::new(),
97 }
98 }
99}
100
101impl<C, E> Machine<C, E> {
102 pub fn id(&self) -> MachineId {
103 self.id
104 }
105
106 pub fn addr(&self) -> Ipv4Addr {
107 self.addr
108 }
109
110 pub fn mask(&self) -> u8 {
111 self.mask
112 }
113
114 pub async fn set_addr(&mut self, addr: Ipv4Addr, mask: u8) {
115 let (tx, rx) = oneshot::channel();
116 self.ctrl
117 .unbounded_send(IfaceCtrl::SetAddr(addr, mask, tx))
118 .unwrap();
119 rx.await.unwrap();
120 self.addr = addr;
121 self.mask = mask;
122 }
123
124 pub fn send(&self, cmd: C) {
125 self.tx.unbounded_send(cmd).unwrap();
126 }
127
128 pub fn drain(&mut self) -> Vec<E> {
129 let mut res = self.buffer.drain(..).collect::<Vec<_>>();
130 if !self.rx.is_terminated() {
131 while let Ok(Some(x)) = self.rx.try_next() {
132 res.push(x);
133 }
134 }
135 res
136 }
137
138 pub fn up(&self) {
139 self.ctrl.unbounded_send(IfaceCtrl::Up).unwrap();
140 }
141
142 pub fn down(&self) {
143 self.ctrl.unbounded_send(IfaceCtrl::Down).unwrap();
144 }
145
146 pub fn namespace(&self) -> Namespace {
147 self.ns
148 }
149
150 pub async fn recv(&mut self) -> Option<E> {
151 if let Some(ev) = self.buffer.pop_front() {
152 Some(ev)
153 } else {
154 self.rx.next().await
155 }
156 }
157
158 pub async fn select<F, T>(&mut self, mut f: F) -> Option<T>
159 where
160 F: FnMut(&E) -> Option<T>,
161 {
162 if let Some((idx, res)) = self
163 .buffer
164 .iter()
165 .enumerate()
166 .find_map(|(idx, ev)| f(ev).map(|x| (idx, x)))
167 {
168 self.buffer.remove(idx);
169 return Some(res);
170 }
171 loop {
172 match self.rx.next().await {
173 Some(ev) => {
174 if let Some(res) = f(&ev) {
175 return Some(res);
176 } else {
177 self.buffer.push_back(ev);
178 }
179 }
180 None => return None,
181 }
182 }
183 }
184
185 pub async fn select_draining<F, T>(&mut self, mut f: F) -> Option<T>
186 where
187 F: FnMut(E) -> Option<T>,
188 {
189 while let Some(ev) = self.buffer.pop_front() {
190 if let Some(res) = f(ev) {
191 return Some(res);
192 }
193 }
194 loop {
195 match self.rx.next().await {
196 Some(ev) => {
197 if let Some(res) = f(ev) {
198 return Some(res);
199 }
200 }
201 None => return None,
202 }
203 }
204 }
205
206 pub fn drain_matching<F: FnMut(&E) -> bool>(&mut self, mut f: F) -> Vec<E> {
207 let mut ret = Vec::new();
208 for e in std::mem::take(&mut self.buffer) {
209 if f(&e) {
210 ret.push(e);
211 } else {
212 self.buffer.push_back(e);
213 }
214 }
215 ret
216 }
217}
218
219impl<C, E> Drop for Machine<C, E> {
220 fn drop(&mut self) {
221 self.ctrl.unbounded_send(IfaceCtrl::Exit).ok();
222 self.join.take().unwrap().join().unwrap().unwrap();
223 }
224}
225
/// Formats `t` via `Display`, abbreviating long output.
///
/// If the formatted text is at most `limit` bytes it is returned unchanged.
/// Otherwise the result is the first `cut_len` bytes (rounded down to a
/// character boundary, and never more than the retained `limit`-byte prefix)
/// followed by `... (<total> bytes)`, where `<total>` is the full formatted
/// length in bytes.
///
/// At most `limit` bytes of the formatted text are ever stored.
///
/// # Panics
///
/// Panics if the `Display` implementation of `T` returns an error.
fn abbrev<T: Display>(t: &T, limit: usize, cut_len: usize) -> String {
    use std::fmt::Write;

    /// Sink that keeps a prefix of at most `limit` bytes and counts the rest.
    struct Prefix {
        text: String,
        limit: usize,
        total: usize,
        full: bool,
    }

    impl Write for Prefix {
        fn write_str(&mut self, s: &str) -> fmt::Result {
            self.total += s.len();
            if self.full {
                return Ok(());
            }
            let room = self.limit - self.text.len();
            if s.len() <= room {
                self.text.push_str(s);
            } else {
                let mut end = room;
                while !s.is_char_boundary(end) {
                    end -= 1;
                }
                self.text.push_str(&s[..end]);
                // Latch: later chunks must not fill the bytes left over by a
                // character that did not fit, or the text stops being a prefix.
                self.full = true;
            }
            Ok(())
        }
    }

    let mut sink = Prefix {
        text: String::with_capacity(limit),
        limit,
        total: 0,
        full: false,
    };
    write!(sink, "{t}").unwrap();

    let Prefix {
        mut text, total, ..
    } = sink;
    if total > limit {
        let mut cut = cut_len.min(text.len());
        while !text.is_char_boundary(cut) {
            cut -= 1;
        }
        text.truncate(cut);
        write!(&mut text, "... ({total} bytes)").unwrap();
    }
    text
}
253
/// Spawns the worker thread backing a `Machine` and returns its join handle.
///
/// The thread calls `Namespace::unshare`, creates an `iface::Iface`, spawns
/// `bin` as a child process with piped stdio and then runs six concurrent
/// tasks on `async_global_executor::block_on`:
///
/// * `ctrl`: applies `IfaceCtrl` messages to the interface,
/// * reader: forwards packets read from the interface into `plug`,
/// * writer: writes packets received from `plug` to the interface,
/// * command: writes each `C` from `cmd` as one line to the child's stdin,
/// * event: parses child stdout lines starting with `<` into `E` and sends
///   them on `event`; other lines are printed,
/// * stderr: prints child stderr lines.
///
/// `ns_tx` receives the namespace once the child has been spawned. As soon as
/// any task finishes, the child is killed and the stdout/stderr tasks get up
/// to three seconds to finish. The thread's result is the first error
/// encountered, `ErrorKind::TimedOut` if that grace period expires, or `Ok(())`.
#[allow(clippy::too_many_arguments)]
fn machine<C, E>(
    id: MachineId,
    plug: Plug,
    mut bin: Command,
    mut ctrl: mpsc::UnboundedReceiver<IfaceCtrl>,
    ns_tx: oneshot::Sender<Namespace>,
    mut cmd: mpsc::UnboundedReceiver<C>,
    event: mpsc::UnboundedSender<E>,
) -> thread::JoinHandle<Result<()>>
where
    C: Display + Send + 'static,
    E: FromStr + Display + Send + 'static,
    E::Err: std::fmt::Debug + Display + Send + Sync,
{
    thread::spawn(move || {
        // Done on this freshly spawned thread, before anything else runs on it.
        let ns = Namespace::unshare()?;

        let res = async_global_executor::block_on(async move {
            let iface = iface::Iface::new()?;
            let iface = async_io::Async::new(iface)?;
            let (mut tx, mut rx) = plug.split();

            // Applies control messages until `Exit` arrives or the channel closes.
            let ctrl_task = async {
                while let Some(ctrl) = ctrl.next().await {
                    log::debug!("{} CTRL {:?}", id, ctrl);
                    match ctrl {
                        IfaceCtrl::Up => iface.get_ref().put_up()?,
                        IfaceCtrl::Down => iface.get_ref().put_down()?,
                        IfaceCtrl::SetAddr(addr, mask, tx) => {
                            iface.get_ref().set_ipv4_addr(addr, mask)?;
                            iface.get_ref().put_up()?;
                            iface.get_ref().add_ipv4_route(Ipv4Range::global().into())?;
                            // The requester may have stopped waiting; that is not an error.
                            tx.send(()).ok();
                        }
                        IfaceCtrl::Exit => {
                            break;
                        }
                    }
                }
                log::info!("{} (ctrl): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(ctrl_task);

            // Interface -> plug: forwards packets read from the interface.
            let reader_task = async {
                loop {
                    let mut buf = [0; libc::ETH_FRAME_LEN as usize];
                    let n = iface.read_with(|iface| iface.recv(&mut buf)).await?;
                    if n == 0 {
                        break;
                    }
                    // The high nibble of the first byte is compared against 4;
                    // presumably this is the IP version field, so anything that
                    // is not IPv4 is skipped — confirm the interface delivers
                    // raw IP packets.
                    if buf[0] >> 4 != 4 {
                        continue;
                    }
                    log::trace!("{} (reader): sending packet", id);
                    let mut bytes = buf[..n].to_vec();
                    // Recompute the checksum when the bytes parse as a `Packet`;
                    // the bytes are forwarded either way.
                    if let Some(mut packet) = Packet::new(&mut bytes) {
                        packet.set_checksum();
                    }
                    if tx.send(bytes).await.is_err() {
                        break;
                    }
                }
                log::info!("{} (reader): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(reader_task);

            // Plug -> interface: writes packets received from the plug.
            let writer_task = async {
                while let Some(packet) = rx.next().await {
                    log::trace!("{} (writer): received packet", id);
                    // Send errors are deliberately ignored and the packet dropped.
                    // NOTE(review): presumably because sending fails while the
                    // interface is down — confirm.
                    if let Ok(n) = iface.write_with(|iface| iface.send(&packet)).await {
                        if n == 0 {
                            break;
                        }
                    }
                }
                log::info!("{} (writer): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(writer_task);

            bin.stdin(Stdio::piped())
                .stdout(Stdio::piped())
                .stderr(Stdio::piped());
            let mut child = bin.spawn().map_err(|e| {
                log::error!("cannot start machine {:?}: {}", bin, e);
                e
            })?;
            // All three pipes were requested above, so the handles are present.
            let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines().fuse();
            let mut stderr = BufReader::new(child.stderr.take().unwrap()).lines().fuse();
            let mut stdin = child.stdin.take().unwrap();

            // Commands -> child stdin, one `Display`-formatted command per line.
            let command_task = async {
                let mut buf = Vec::with_capacity(4096);
                while let Some(cmd) = cmd.next().await {
                    buf.clear();
                    log::debug!("{} {}", id, abbrev(&cmd, 2000, 80));
                    writeln!(buf, "{cmd}")?;
                    stdin.write_all(&buf).await?;
                }
                log::info!("{} (command): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(command_task);

            // Child stdout -> events. Lines starting with '<' must parse as `E`;
            // a parse failure ends the task with an error. All other lines are
            // echoed to this process's stdout.
            let event_task = async {
                while let Some(ev) = stdout.next().await {
                    let ev = ev?;
                    if ev.starts_with('<') {
                        let ev = match E::from_str(&ev) {
                            Ok(ev) => ev,
                            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
                        };
                        log::debug!("{} {}", id, abbrev(&ev, 2000, 80));
                        if event.unbounded_send(ev).is_err() {
                            break;
                        }
                    } else {
                        println!("{id} (stdout): {ev}");
                    }
                }
                log::info!("{} (stdout): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(event_task);

            // Child stderr is echoed line by line to this process's stdout.
            let stderr_task = async {
                while let Some(ev) = stderr.next().await {
                    let ev = ev?;
                    println!("{id} (stderr): {ev}");
                }
                log::info!("{} (stderr): closed", id);
                Result::Ok(())
            }
            .fuse();
            futures::pin_mut!(stderr_task);

            // Report the namespace only now, after the interface exists and the
            // child has been spawned; a dropped receiver is ignored.
            let _ = ns_tx.send(ns);

            // Run until the first task finishes; its error, if any, is returned.
            futures::select! {
                res = ctrl_task => res?,
                res = reader_task => res?,
                res = writer_task => res?,
                res = command_task => res?,
                res = event_task => res?,
                res = stderr_task => res?,
            };
            log::info!("{} killing", id);
            child.kill()?;
            // `pending` never completes, so this future becomes ready only when
            // the three second timeout fires.
            let deadline = timeout(Duration::from_secs(3), pending::<()>());
            futures::pin_mut!(deadline);
            // Only tasks that have not completed yet may be polled again.
            let mut event_task = (!event_task.is_terminated()).then_some(event_task);
            let mut stderr_task = (!stderr_task.is_terminated()).then_some(stderr_task);
            // Wait for the remaining output tasks to finish, bounded by the deadline.
            poll_fn(|cx| {
                if deadline.poll_unpin(cx).is_ready() {
                    log::warn!(
                        "{} ev: {} err: {}",
                        id,
                        event_task.is_some(),
                        stderr_task.is_some()
                    );
                    return Poll::Ready(Err(ErrorKind::TimedOut.into()));
                }
                match (&mut event_task, &mut stderr_task) {
                    (None, None) => return Poll::Ready(Ok(())),
                    (None, Some(err)) => return err.poll_unpin(cx),
                    (Some(ev), None) => return ev.poll_unpin(cx),
                    (Some(ev), Some(err)) => {
                        let ev = ev.poll_unpin(cx);
                        let err = err.poll_unpin(cx);
                        match (ev, err) {
                            (Poll::Ready(Err(e)), _) => return Poll::Ready(Err(e)),
                            (_, Poll::Ready(Err(e))) => return Poll::Ready(Err(e)),
                            (Poll::Ready(Ok(_)), Poll::Ready(Ok(_))) => return Poll::Ready(Ok(())),
                            // One side finished cleanly: stop polling it and keep
                            // waiting for the other.
                            (Poll::Ready(Ok(_)), _) => event_task = None,
                            (_, Poll::Ready(Ok(_))) => stderr_task = None,
                            _ => {}
                        }
                    }
                }
                Poll::Pending
            })
            .await?;
            Ok(())
        });
        log::info!("{}'s event loop yielded with {:?}", id, res);
        res
    })
}