// ferridriver_test/dispatcher.rs

//! Test dispatcher: MPMC work queue supporting parallel and serial suites.
//!
//! Parallel tests go to a shared channel (natural load balancing).
//! Serial suites go as batches — one worker picks up the entire suite.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use crate::model::{Hooks, SuiteMode, TestCase, TestFn, TestId};

10/// A single test assigned to a worker.
11pub struct TestAssignment {
12  pub test: TestCase,
13  pub attempt: u32,
14  /// Suite key for hook tracking (e.g. "file.rs::suite_name").
15  pub suite_key: String,
16  /// Shared hooks for this test's suite.
17  pub hooks: Arc<Hooks>,
18  /// Suite execution mode.
19  pub suite_mode: SuiteMode,
20}
21
22/// A batch of serial tests — all run on one worker, in order.
23pub struct SerialBatch {
24  pub suite_key: String,
25  pub assignments: Vec<TestAssignment>,
26  pub hooks: Arc<Hooks>,
27}
28
29/// Work item pulled by a worker.
30pub enum WorkItem {
31  /// Single parallel test.
32  Single(TestAssignment),
33  /// Batch of serial tests (one worker runs all, in order).
34  Serial(SerialBatch),
35}
36
37/// Dispatch strategy: parallel tests via shared MPMC channel,
38/// serial suites as batches on the same channel.
39pub struct Dispatcher {
40  tx: async_channel::Sender<WorkItem>,
41  rx: async_channel::Receiver<WorkItem>,
42  /// Hard-stop signal. When set, workers must drop their current `recv()`
43  /// outcome rather than processing it — `close()` alone keeps already-buffered
44  /// items in the queue, which means `--max-failures N` and `-x` would still
45  /// run every test that was enqueued before the threshold tripped.
46  stopped: Arc<std::sync::atomic::AtomicBool>,
47}
48
49impl Dispatcher {
50  pub fn new() -> Self {
51    let (tx, rx) = async_channel::unbounded();
52    Self {
53      tx,
54      rx,
55      stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)),
56    }
57  }
58
59  /// Enqueue a single parallel test.
60  pub fn enqueue_single(&self, assignment: TestAssignment) {
61    let _ = self.tx.try_send(WorkItem::Single(assignment));
62  }
63
64  /// Enqueue an entire serial suite as a batch.
65  pub fn enqueue_serial(&self, batch: SerialBatch) {
66    let _ = self.tx.try_send(WorkItem::Serial(batch));
67  }
68
69  /// Re-enqueue a test for retry (always as single item).
70  pub fn retry_shared(
71    &self,
72    test_fn: &TestFn,
73    id: &TestId,
74    fixture_requests: Vec<String>,
75    attempt: u32,
76    suite_key: String,
77    hooks: Arc<Hooks>,
78  ) {
79    let assignment = TestAssignment {
80      test: TestCase {
81        id: id.clone(),
82        test_fn: Arc::clone(test_fn),
83        fixture_requests,
84        annotations: Vec::new(),
85        timeout: None,
86        retries: None,
87        expected_status: crate::model::ExpectedStatus::Pass,
88        use_options: None,
89      },
90      attempt,
91      suite_key,
92      hooks,
93      suite_mode: SuiteMode::Parallel,
94    };
95    let _ = self.tx.try_send(WorkItem::Single(assignment));
96  }
97
98  /// Get a receiver clone for a worker.
99  pub fn receiver(&self) -> async_channel::Receiver<WorkItem> {
100    self.rx.clone()
101  }
102
103  /// Signal no more work will be enqueued.
104  pub fn close(&self) {
105    self.tx.close();
106  }
107
108  /// Trip the hard-stop signal so workers drop any remaining queued items
109  /// instead of processing them. Used by `--max-failures` / `-x`.
110  pub fn stop(&self) {
111    self.stopped.store(true, std::sync::atomic::Ordering::SeqCst);
112    self.tx.close();
113  }
114
115  /// Worker-side check: should the next item be processed or dropped?
116  pub fn is_stopped(&self) -> bool {
117    self.stopped.load(std::sync::atomic::Ordering::SeqCst)
118  }
119
120  /// Worker-side handle to the same atomic flag (cheap clone of the Arc).
121  pub fn stop_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
122    Arc::clone(&self.stopped)
123  }
124}
125
126impl Default for Dispatcher {
127  fn default() -> Self {
128    Self::new()
129  }
130}