1use std::fmt;
8use std::future::Future;
9use std::pin::Pin;
10use std::time::{Duration, Instant};
11
12use crate::{SimulationResult, TokioNetworkProvider, TokioTaskProvider, TokioTimeProvider};
13
14use super::report::SimulationMetrics;
15use super::topology::WorkloadTopology;
16
17type TokioWorkloadFn = Box<
19 dyn Fn(
20 TokioNetworkProvider,
21 TokioTimeProvider,
22 TokioTaskProvider,
23 WorkloadTopology,
24 ) -> Pin<Box<dyn Future<Output = SimulationResult<SimulationMetrics>>>>,
25>;
26
27pub struct TokioWorkload {
29 name: String,
30 ip_address: String,
31 workload: TokioWorkloadFn,
32}
33
34impl fmt::Debug for TokioWorkload {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 f.debug_struct("TokioWorkload")
37 .field("name", &self.name)
38 .field("ip_address", &self.ip_address)
39 .field("workload", &"<closure>")
40 .finish()
41 }
42}
43
44#[derive(Debug, Clone)]
46pub struct TokioReport {
47 pub workload_results: Vec<(String, SimulationResult<SimulationMetrics>)>,
49 pub total_wall_time: Duration,
51 pub successful: usize,
53 pub failed: usize,
55}
56
57impl TokioReport {
58 pub fn success_rate(&self) -> f64 {
60 let total = self.successful + self.failed;
61 if total == 0 {
62 0.0
63 } else {
64 (self.successful as f64 / total as f64) * 100.0
65 }
66 }
67}
68
69impl fmt::Display for TokioReport {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 writeln!(f, "=== Tokio Execution Report ===")?;
72 writeln!(f, "Total Workloads: {}", self.successful + self.failed)?;
73 writeln!(f, "Successful: {}", self.successful)?;
74 writeln!(f, "Failed: {}", self.failed)?;
75 writeln!(f, "Success Rate: {:.2}%", self.success_rate())?;
76 writeln!(f, "Total Wall Time: {:?}", self.total_wall_time)?;
77 writeln!(f)?;
78
79 for (name, result) in &self.workload_results {
80 match result {
81 Ok(_) => writeln!(f, "✅ {}: SUCCESS", name)?,
82 Err(e) => writeln!(f, "❌ {}: FAILED - {:?}", name, e)?,
83 }
84 }
85
86 Ok(())
87 }
88}
89
90#[derive(Debug)]
92pub struct TokioRunner {
93 workloads: Vec<TokioWorkload>,
94 next_port: u16, }
96
97impl Default for TokioRunner {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl TokioRunner {
104 pub fn new() -> Self {
106 Self {
107 workloads: Vec::new(),
108 next_port: 9001, }
110 }
111
112 pub fn register_workload<S, F, Fut>(mut self, name: S, workload: F) -> Self
118 where
119 S: Into<String>,
120 F: Fn(TokioNetworkProvider, TokioTimeProvider, TokioTaskProvider, WorkloadTopology) -> Fut
121 + 'static,
122 Fut: Future<Output = SimulationResult<SimulationMetrics>> + 'static,
123 {
124 let ip_address = format!("127.0.0.1:{}", self.next_port);
126 self.next_port += 1;
127
128 let boxed_workload = Box::new(move |provider, time_provider, task_provider, topology| {
129 let fut = workload(provider, time_provider, task_provider, topology);
130 Box::pin(fut) as Pin<Box<dyn Future<Output = SimulationResult<SimulationMetrics>>>>
131 });
132
133 self.workloads.push(TokioWorkload {
134 name: name.into(),
135 ip_address,
136 workload: boxed_workload,
137 });
138 self
139 }
140
141 pub async fn run(self) -> TokioReport {
143 if self.workloads.is_empty() {
144 return TokioReport {
145 workload_results: Vec::new(),
146 total_wall_time: Duration::ZERO,
147 successful: 0,
148 failed: 0,
149 };
150 }
151
152 let start_time = Instant::now();
153
154 let shutdown_signal = tokio_util::sync::CancellationToken::new();
156
157 let all_ips: Vec<String> = self
159 .workloads
160 .iter()
161 .map(|w| w.ip_address.clone())
162 .collect();
163
164 let mut workload_results = Vec::new();
165 let mut successful = 0;
166 let mut failed = 0;
167
168 if self.workloads.len() == 1 {
170 let workload = &self.workloads[0];
172 let my_ip = workload.ip_address.clone();
173 let peer_ips = all_ips.iter().filter(|ip| *ip != &my_ip).cloned().collect();
174 let peer_names = self
175 .workloads
176 .iter()
177 .filter(|w| w.ip_address != my_ip)
178 .map(|w| w.name.clone())
179 .collect();
180 let topology = WorkloadTopology {
181 my_ip,
182 client_id: 0,
183 client_count: 1,
184 peer_ips,
185 peer_names,
186 process_ips: Vec::new(),
187 my_tags: crate::runner::tags::ProcessTags::default(),
188 tag_registry: crate::runner::tags::TagRegistry::default(),
189 shutdown_signal: shutdown_signal.clone(),
190 };
191
192 let provider = TokioNetworkProvider::new();
193 let time_provider = TokioTimeProvider::new();
194 let task_provider = TokioTaskProvider;
195
196 let result =
197 (workload.workload)(provider, time_provider, task_provider, topology).await;
198
199 if result.is_ok() {
201 shutdown_signal.cancel();
202 }
203
204 match result {
205 Ok(_) => successful += 1,
206 Err(_) => failed += 1,
207 }
208 workload_results.push((workload.name.clone(), result));
209 } else {
210 let mut handles = Vec::new();
212
213 for workload in &self.workloads {
214 let my_ip = workload.ip_address.clone();
215 let peer_ips = all_ips.iter().filter(|ip| *ip != &my_ip).cloned().collect();
216 let peer_names = self
217 .workloads
218 .iter()
219 .filter(|w| w.ip_address != my_ip)
220 .map(|w| w.name.clone())
221 .collect();
222 let topology = WorkloadTopology {
223 my_ip,
224 client_id: 0,
225 client_count: 1,
226 peer_ips,
227 peer_names,
228 process_ips: Vec::new(),
229 my_tags: crate::runner::tags::ProcessTags::default(),
230 tag_registry: crate::runner::tags::TagRegistry::default(),
231 shutdown_signal: shutdown_signal.clone(),
232 };
233
234 let provider = TokioNetworkProvider::new();
235 let time_provider = TokioTimeProvider::new();
236 let task_provider = TokioTaskProvider;
237
238 let handle = tokio::task::spawn_local((workload.workload)(
239 provider,
240 time_provider,
241 task_provider,
242 topology,
243 ));
244 handles.push((workload.name.clone(), handle));
245 }
246
247 let mut pending_futures: Vec<_> = handles
249 .into_iter()
250 .map(|(name, handle)| {
251 Box::pin(async move {
252 let result = handle.await;
253 (name, result)
254 })
255 })
256 .collect();
257
258 let mut first_success_triggered = false;
259
260 while !pending_futures.is_empty() {
262 let (completed_result, _index, remaining_futures) =
263 futures::future::select_all(pending_futures).await;
264
265 pending_futures = remaining_futures;
266
267 let (name, handle_result) = completed_result;
268 let result = match handle_result {
269 Ok(workload_result) => workload_result,
270 Err(_) => Err(crate::SimulationError::InvalidState(
271 "Task panicked".to_string(),
272 )),
273 };
274
275 if !first_success_triggered && result.is_ok() {
277 tracing::debug!(
278 "TokioRunner: Workload '{}' completed successfully, triggering shutdown",
279 name
280 );
281 shutdown_signal.cancel();
282 first_success_triggered = true;
283 }
284
285 match result {
286 Ok(_) => successful += 1,
287 Err(_) => failed += 1,
288 }
289 workload_results.push((name, result));
290 }
291 }
292
293 let total_wall_time = start_time.elapsed();
294
295 TokioReport {
296 workload_results,
297 total_wall_time,
298 successful,
299 failed,
300 }
301 }
302}
303
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh current-thread local runtime (I/O and time enabled) and
    /// drives `runner` to completion on it.
    fn run_on_local_runtime(runner: TokioRunner) -> TokioReport {
        let local_runtime = tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build_local(Default::default())
            .expect("Failed to build local runtime");

        local_runtime.block_on(runner.run())
    }

    /// A runner without workloads reports nothing and a 0% success rate.
    #[test]
    fn test_tokio_runner_empty() {
        let report = run_on_local_runtime(TokioRunner::new());

        assert_eq!(report.successful, 0);
        assert_eq!(report.failed, 0);
        assert_eq!(report.success_rate(), 0.0);
    }

    /// A single succeeding workload is run and counted as one success.
    #[test]
    fn test_tokio_runner_single_workload() {
        let runner = TokioRunner::new().register_workload(
            "test_workload",
            |_provider, _time_provider, _task_provider, _topology| async {
                Ok(SimulationMetrics::default())
            },
        );

        let report = run_on_local_runtime(runner);

        assert_eq!(report.successful, 1);
        assert_eq!(report.failed, 0);
        assert_eq!(report.success_rate(), 100.0);
        assert!(report.total_wall_time > Duration::ZERO);
    }

    /// Two succeeding workloads are both run and both counted.
    #[test]
    fn test_tokio_runner_multiple_workloads() {
        let runner = TokioRunner::new()
            .register_workload(
                "workload1",
                |_provider, _time_provider, _task_provider, _topology| async {
                    Ok(SimulationMetrics::default())
                },
            )
            .register_workload(
                "workload2",
                |_provider, _time_provider, _task_provider, _topology| async {
                    Ok(SimulationMetrics::default())
                },
            );

        let report = run_on_local_runtime(runner);

        assert_eq!(report.successful, 2);
        assert_eq!(report.failed, 0);
        assert_eq!(report.success_rate(), 100.0);
    }
}