turmoil_net/fixture/
client_server.rs1use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use tokio::task::LocalSet;
8use tokio::time::sleep;
9
10use crate::fixture::{Scheduler, TICK};
11use crate::{HostId, KernelConfig, Net, ToIpAddrs};
12
13type BoxFut = Pin<Box<dyn Future<Output = ()>>>;
14
15pub struct ClientServer {
19 net: Net,
20 servers: Vec<(HostId, BoxFut)>,
21}
22
23impl ClientServer {
24 pub fn new() -> Self {
25 Self::with_config(KernelConfig::default())
26 }
27
28 pub fn with_config(cfg: KernelConfig) -> Self {
32 Self {
33 net: Net::with_config(cfg),
34 servers: Vec::new(),
35 }
36 }
37
38 pub fn server<A, F>(mut self, addrs: A, fut: F) -> Self
43 where
44 A: ToIpAddrs,
45 F: Future<Output = ()> + 'static,
46 {
47 let id = self.net.add_host(addrs);
48 self.servers.push((id, Box::pin(fut)));
49 self
50 }
51
52 pub fn run<A, T, F>(self, addrs: A, fut: F) -> T
56 where
57 A: ToIpAddrs,
58 F: Future<Output = T> + 'static,
59 T: 'static,
60 {
61 let Self { mut net, servers } = self;
62 assert!(
63 !servers.is_empty(),
64 "ClientServer needs at least one server — use fixture::lo for single-host tests"
65 );
66 let client_id = net.add_host(addrs);
67 let guard = net.enter();
68
69 let rt = tokio::runtime::Builder::new_current_thread()
70 .enable_time()
71 .start_paused(true)
72 .build()
73 .expect("build current_thread runtime");
74
75 let guard_ref = &guard;
76 let result = rt.block_on(async move {
77 let set = LocalSet::new();
78 for (id, fut) in servers {
79 set.spawn_local(HostScoped { id, inner: fut });
80 }
81 let client_handle = set.spawn_local(HostScoped {
82 id: client_id,
83 inner: Box::pin(fut),
84 });
85
86 let mut scheduler = Scheduler::new();
87 loop {
88 set.run_until(sleep(TICK)).await;
90 scheduler.tick(guard_ref, TICK);
93
94 if client_handle.is_finished() {
95 break client_handle.await.unwrap();
96 }
97 }
98 });
99 drop(guard);
100 result
101 }
102}
103
104impl Default for ClientServer {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110struct HostScoped<F> {
115 id: HostId,
116 inner: F,
117}
118
119impl<F> Future for HostScoped<F>
120where
121 F: Future + Unpin,
122{
123 type Output = F::Output;
124
125 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126 crate::set_current(self.id);
127 Pin::new(&mut self.inner).poll(cx)
128 }
129}