Skip to main content

moonpool_sim/runner/
tokio.rs

1//! Tokio-based runner for executing workloads with real networking and timing.
2//!
3//! This module provides TokioRunner, which executes the same workloads as SimulationBuilder
4//! but using real Tokio implementations instead of simulated ones. This validates that
5//! workloads behave correctly in real-world conditions.
6
7use std::fmt;
8use std::future::Future;
9use std::pin::Pin;
10use std::time::{Duration, Instant};
11
12use crate::{SimulationResult, TokioNetworkProvider, TokioTaskProvider, TokioTimeProvider};
13
14use super::report::SimulationMetrics;
15use super::topology::WorkloadTopology;
16
17/// Type alias for Tokio workload function signature.
18type TokioWorkloadFn = Box<
19    dyn Fn(
20        TokioNetworkProvider,
21        TokioTimeProvider,
22        TokioTaskProvider,
23        WorkloadTopology,
24    ) -> Pin<Box<dyn Future<Output = SimulationResult<SimulationMetrics>>>>,
25>;
26
27/// A registered workload that can be executed with real Tokio providers.
28pub struct TokioWorkload {
29    name: String,
30    ip_address: String,
31    workload: TokioWorkloadFn,
32}
33
34impl fmt::Debug for TokioWorkload {
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        f.debug_struct("TokioWorkload")
37            .field("name", &self.name)
38            .field("ip_address", &self.ip_address)
39            .field("workload", &"<closure>")
40            .finish()
41    }
42}
43
44/// Report generated after running workloads with TokioRunner.
45#[derive(Debug, Clone)]
46pub struct TokioReport {
47    /// Results from each workload
48    pub workload_results: Vec<(String, SimulationResult<SimulationMetrics>)>,
49    /// Total wall-clock time for execution
50    pub total_wall_time: Duration,
51    /// Number of successful workloads
52    pub successful: usize,
53    /// Number of failed workloads
54    pub failed: usize,
55}
56
57impl TokioReport {
58    /// Calculate the success rate as a percentage.
59    pub fn success_rate(&self) -> f64 {
60        let total = self.successful + self.failed;
61        if total == 0 {
62            0.0
63        } else {
64            (self.successful as f64 / total as f64) * 100.0
65        }
66    }
67}
68
69impl fmt::Display for TokioReport {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        writeln!(f, "=== Tokio Execution Report ===")?;
72        writeln!(f, "Total Workloads: {}", self.successful + self.failed)?;
73        writeln!(f, "Successful: {}", self.successful)?;
74        writeln!(f, "Failed: {}", self.failed)?;
75        writeln!(f, "Success Rate: {:.2}%", self.success_rate())?;
76        writeln!(f, "Total Wall Time: {:?}", self.total_wall_time)?;
77        writeln!(f)?;
78
79        for (name, result) in &self.workload_results {
80            match result {
81                Ok(_) => writeln!(f, "✅ {}: SUCCESS", name)?,
82                Err(e) => writeln!(f, "❌ {}: FAILED - {:?}", name, e)?,
83            }
84        }
85
86        Ok(())
87    }
88}
89
90/// Builder for executing workloads with real Tokio implementations.
91#[derive(Debug)]
92pub struct TokioRunner {
93    workloads: Vec<TokioWorkload>,
94    next_port: u16, // For auto-assigning ports starting from 9001
95}
96
97impl Default for TokioRunner {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl TokioRunner {
104    /// Create a new TokioRunner.
105    pub fn new() -> Self {
106        Self {
107            workloads: Vec::new(),
108            next_port: 9001, // Start from port 9001
109        }
110    }
111
112    /// Register a workload with the runner.
113    ///
114    /// # Arguments
115    /// * `name` - Name for the workload (for reporting purposes)
116    /// * `workload` - Async function that takes Tokio providers and topology and returns metrics
117    pub fn register_workload<S, F, Fut>(mut self, name: S, workload: F) -> Self
118    where
119        S: Into<String>,
120        F: Fn(TokioNetworkProvider, TokioTimeProvider, TokioTaskProvider, WorkloadTopology) -> Fut
121            + 'static,
122        Fut: Future<Output = SimulationResult<SimulationMetrics>> + 'static,
123    {
124        // Auto-assign IP address with sequential port
125        let ip_address = format!("127.0.0.1:{}", self.next_port);
126        self.next_port += 1;
127
128        let boxed_workload = Box::new(move |provider, time_provider, task_provider, topology| {
129            let fut = workload(provider, time_provider, task_provider, topology);
130            Box::pin(fut) as Pin<Box<dyn Future<Output = SimulationResult<SimulationMetrics>>>>
131        });
132
133        self.workloads.push(TokioWorkload {
134            name: name.into(),
135            ip_address,
136            workload: boxed_workload,
137        });
138        self
139    }
140
141    /// Run all registered workloads and generate a report.
142    pub async fn run(self) -> TokioReport {
143        if self.workloads.is_empty() {
144            return TokioReport {
145                workload_results: Vec::new(),
146                total_wall_time: Duration::ZERO,
147                successful: 0,
148                failed: 0,
149            };
150        }
151
152        let start_time = Instant::now();
153
154        // Create shutdown signal for this execution
155        let shutdown_signal = tokio_util::sync::CancellationToken::new();
156
157        // Build topology information for each workload
158        let all_ips: Vec<String> = self
159            .workloads
160            .iter()
161            .map(|w| w.ip_address.clone())
162            .collect();
163
164        let mut workload_results = Vec::new();
165        let mut successful = 0;
166        let mut failed = 0;
167
168        // Execute workloads
169        if self.workloads.len() == 1 {
170            // Single workload - execute directly
171            let workload = &self.workloads[0];
172            let my_ip = workload.ip_address.clone();
173            let peer_ips = all_ips.iter().filter(|ip| *ip != &my_ip).cloned().collect();
174            let peer_names = self
175                .workloads
176                .iter()
177                .filter(|w| w.ip_address != my_ip)
178                .map(|w| w.name.clone())
179                .collect();
180            let topology = WorkloadTopology {
181                my_ip,
182                client_id: 0,
183                client_count: 1,
184                peer_ips,
185                peer_names,
186                process_ips: Vec::new(),
187                my_tags: crate::runner::tags::ProcessTags::default(),
188                tag_registry: crate::runner::tags::TagRegistry::default(),
189                shutdown_signal: shutdown_signal.clone(),
190            };
191
192            let provider = TokioNetworkProvider::new();
193            let time_provider = TokioTimeProvider::new();
194            let task_provider = TokioTaskProvider;
195
196            let result =
197                (workload.workload)(provider, time_provider, task_provider, topology).await;
198
199            // For single workload, trigger shutdown signal if successful
200            if result.is_ok() {
201                shutdown_signal.cancel();
202            }
203
204            match result {
205                Ok(_) => successful += 1,
206                Err(_) => failed += 1,
207            }
208            workload_results.push((workload.name.clone(), result));
209        } else {
210            // Multiple workloads - spawn them and run concurrently
211            let mut handles = Vec::new();
212
213            for workload in &self.workloads {
214                let my_ip = workload.ip_address.clone();
215                let peer_ips = all_ips.iter().filter(|ip| *ip != &my_ip).cloned().collect();
216                let peer_names = self
217                    .workloads
218                    .iter()
219                    .filter(|w| w.ip_address != my_ip)
220                    .map(|w| w.name.clone())
221                    .collect();
222                let topology = WorkloadTopology {
223                    my_ip,
224                    client_id: 0,
225                    client_count: 1,
226                    peer_ips,
227                    peer_names,
228                    process_ips: Vec::new(),
229                    my_tags: crate::runner::tags::ProcessTags::default(),
230                    tag_registry: crate::runner::tags::TagRegistry::default(),
231                    shutdown_signal: shutdown_signal.clone(),
232                };
233
234                let provider = TokioNetworkProvider::new();
235                let time_provider = TokioTimeProvider::new();
236                let task_provider = TokioTaskProvider;
237
238                let handle = tokio::task::spawn_local((workload.workload)(
239                    provider,
240                    time_provider,
241                    task_provider,
242                    topology,
243                ));
244                handles.push((workload.name.clone(), handle));
245            }
246
247            // Convert handles to boxed futures with names
248            let mut pending_futures: Vec<_> = handles
249                .into_iter()
250                .map(|(name, handle)| {
251                    Box::pin(async move {
252                        let result = handle.await;
253                        (name, result)
254                    })
255                })
256                .collect();
257
258            let mut first_success_triggered = false;
259
260            // Wait for all workloads to complete, triggering shutdown on first success
261            while !pending_futures.is_empty() {
262                let (completed_result, _index, remaining_futures) =
263                    futures::future::select_all(pending_futures).await;
264
265                pending_futures = remaining_futures;
266
267                let (name, handle_result) = completed_result;
268                let result = match handle_result {
269                    Ok(workload_result) => workload_result,
270                    Err(_) => Err(crate::SimulationError::InvalidState(
271                        "Task panicked".to_string(),
272                    )),
273                };
274
275                // If this is the first successful workload, trigger shutdown signal
276                if !first_success_triggered && result.is_ok() {
277                    tracing::debug!(
278                        "TokioRunner: Workload '{}' completed successfully, triggering shutdown",
279                        name
280                    );
281                    shutdown_signal.cancel();
282                    first_success_triggered = true;
283                }
284
285                match result {
286                    Ok(_) => successful += 1,
287                    Err(_) => failed += 1,
288                }
289                workload_results.push((name, result));
290            }
291        }
292
293        let total_wall_time = start_time.elapsed();
294
295        TokioReport {
296            workload_results,
297            total_wall_time,
298            successful,
299            failed,
300        }
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use super::*;
307
308    #[test]
309    fn test_tokio_runner_empty() {
310        let local_runtime = tokio::runtime::Builder::new_current_thread()
311            .enable_io()
312            .enable_time()
313            .build_local(Default::default())
314            .expect("Failed to build local runtime");
315
316        let report = local_runtime.block_on(async { TokioRunner::new().run().await });
317
318        assert_eq!(report.successful, 0);
319        assert_eq!(report.failed, 0);
320        assert_eq!(report.success_rate(), 0.0);
321    }
322
323    #[test]
324    fn test_tokio_runner_single_workload() {
325        let local_runtime = tokio::runtime::Builder::new_current_thread()
326            .enable_io()
327            .enable_time()
328            .build_local(Default::default())
329            .expect("Failed to build local runtime");
330
331        let report = local_runtime.block_on(async {
332            TokioRunner::new()
333                .register_workload(
334                    "test_workload",
335                    |_provider, _time_provider, _task_provider, _topology| async {
336                        Ok(SimulationMetrics::default())
337                    },
338                )
339                .run()
340                .await
341        });
342
343        assert_eq!(report.successful, 1);
344        assert_eq!(report.failed, 0);
345        assert_eq!(report.success_rate(), 100.0);
346        assert!(report.total_wall_time > Duration::ZERO);
347    }
348
349    #[test]
350    fn test_tokio_runner_multiple_workloads() {
351        let local_runtime = tokio::runtime::Builder::new_current_thread()
352            .enable_io()
353            .enable_time()
354            .build_local(Default::default())
355            .expect("Failed to build local runtime");
356
357        let report = local_runtime.block_on(async {
358            TokioRunner::new()
359                .register_workload(
360                    "workload1",
361                    |_provider, _time_provider, _task_provider, _topology| async {
362                        Ok(SimulationMetrics::default())
363                    },
364                )
365                .register_workload(
366                    "workload2",
367                    |_provider, _time_provider, _task_provider, _topology| async {
368                        Ok(SimulationMetrics::default())
369                    },
370                )
371                .run()
372                .await
373        });
374
375        assert_eq!(report.successful, 2);
376        assert_eq!(report.failed, 0);
377        assert_eq!(report.success_rate(), 100.0);
378    }
379}