Skip to main content

turmoil_net/fixture/
client_server.rs

1//! Multi-host client/server fixture.
2
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use tokio::task::LocalSet;
8use tokio::time::sleep;
9
10use crate::fixture::{Scheduler, TICK};
11use crate::{HostId, KernelConfig, Net, ToIpAddrs};
12
13type BoxFut = Pin<Box<dyn Future<Output = ()>>>;
14
15/// Multi-host fixture with N servers plus one client. All host
16/// futures run on a single [`LocalSet`] driven by a single paused
17/// tokio runtime.
18pub struct ClientServer {
19    net: Net,
20    servers: Vec<(HostId, BoxFut)>,
21}
22
23impl ClientServer {
24    pub fn new() -> Self {
25        Self::with_config(KernelConfig::default())
26    }
27
28    /// Construct with a custom `KernelConfig` applied to every host
29    /// added later. Use for tests that need to tweak MTU, buffer caps,
30    /// retx thresholds, etc.
31    pub fn with_config(cfg: KernelConfig) -> Self {
32        Self {
33            net: Net::with_config(cfg),
34            servers: Vec::new(),
35        }
36    }
37
38    /// Register a server. `addrs` accepts hostnames or literal IPs;
39    /// loopback is implicit. `fut` runs inside that host's scope —
40    /// every `sys()` call from its socket operations sees this host
41    /// as `current`.
42    pub fn server<A, F>(mut self, addrs: A, fut: F) -> Self
43    where
44        A: ToIpAddrs,
45        F: Future<Output = ()> + 'static,
46    {
47        let id = self.net.add_host(addrs);
48        self.servers.push((id, Box::pin(fut)));
49        self
50    }
51
52    /// Run the fixture with `fut` as the client. Every server is
53    /// driven in parallel; the fixture returns `fut`'s output as
54    /// soon as it resolves.
55    pub fn run<A, T, F>(self, addrs: A, fut: F) -> T
56    where
57        A: ToIpAddrs,
58        F: Future<Output = T> + 'static,
59        T: 'static,
60    {
61        let Self { mut net, servers } = self;
62        assert!(
63            !servers.is_empty(),
64            "ClientServer needs at least one server — use fixture::lo for single-host tests"
65        );
66        let client_id = net.add_host(addrs);
67        let guard = net.enter();
68
69        let rt = tokio::runtime::Builder::new_current_thread()
70            .enable_time()
71            .start_paused(true)
72            .build()
73            .expect("build current_thread runtime");
74
75        let guard_ref = &guard;
76        let result = rt.block_on(async move {
77            let set = LocalSet::new();
78            for (id, fut) in servers {
79                set.spawn_local(HostScoped { id, inner: fut });
80            }
81            let client_handle = set.spawn_local(HostScoped {
82                id: client_id,
83                inner: Box::pin(fut),
84            });
85
86            let mut scheduler = Scheduler::new();
87            loop {
88                // Single LocalSet drain per iter → tokio clock += TICK.
89                set.run_until(sleep(TICK)).await;
90                // Scheduler drains host egress, applies rules, delivers
91                // scheduled packets. Sim clock += TICK, matching tokio.
92                scheduler.tick(guard_ref, TICK);
93
94                if client_handle.is_finished() {
95                    break client_handle.await.unwrap();
96                }
97            }
98        });
99        drop(guard);
100        result
101    }
102}
103
104impl Default for ClientServer {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110/// Wraps a host's future so every poll pins the thread-local
111/// `current` to that host before the inner future runs. Without
112/// this, tasks on a shared `LocalSet` would see whichever host was
113/// set last — `sys()` lookups would land in the wrong kernel.
114struct HostScoped<F> {
115    id: HostId,
116    inner: F,
117}
118
119impl<F> Future for HostScoped<F>
120where
121    F: Future + Unpin,
122{
123    type Output = F::Output;
124
125    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126        crate::set_current(self.id);
127        Pin::new(&mut self.inner).poll(cx)
128    }
129}