1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//! Test dispatcher: MPMC work queue supporting parallel and serial suites.
//!
//! Parallel tests go to a shared channel (natural load balancing).
//! Serial suites go as batches — one worker picks up the entire suite.
use std::sync::Arc;
use crate::model::{Hooks, SuiteMode, TestCase, TestFn, TestId};
/// A single test assigned to a worker.
pub struct TestAssignment {
pub test: TestCase,
pub attempt: u32,
/// Suite key for hook tracking (e.g. "file.rs::suite_name").
pub suite_key: String,
/// Shared hooks for this test's suite.
pub hooks: Arc<Hooks>,
/// Suite execution mode.
pub suite_mode: SuiteMode,
}
/// A batch of serial tests — all run on one worker, in order.
pub struct SerialBatch {
pub suite_key: String,
pub assignments: Vec<TestAssignment>,
pub hooks: Arc<Hooks>,
}
/// Work item pulled by a worker.
pub enum WorkItem {
/// Single parallel test.
Single(TestAssignment),
/// Batch of serial tests (one worker runs all, in order).
Serial(SerialBatch),
}
/// Dispatch strategy: parallel tests via shared MPMC channel,
/// serial suites as batches on the same channel.
pub struct Dispatcher {
tx: async_channel::Sender<WorkItem>,
rx: async_channel::Receiver<WorkItem>,
/// Hard-stop signal. When set, workers must drop their current `recv()`
/// outcome rather than processing it — `close()` alone keeps already-buffered
/// items in the queue, which means `--max-failures N` and `-x` would still
/// run every test that was enqueued before the threshold tripped.
stopped: Arc<std::sync::atomic::AtomicBool>,
}
impl Dispatcher {
pub fn new() -> Self {
let (tx, rx) = async_channel::unbounded();
Self {
tx,
rx,
stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)),
}
}
/// Enqueue a single parallel test.
pub fn enqueue_single(&self, assignment: TestAssignment) {
let _ = self.tx.try_send(WorkItem::Single(assignment));
}
/// Enqueue an entire serial suite as a batch.
pub fn enqueue_serial(&self, batch: SerialBatch) {
let _ = self.tx.try_send(WorkItem::Serial(batch));
}
/// Re-enqueue a test for retry (always as single item).
pub fn retry_shared(
&self,
test_fn: &TestFn,
id: &TestId,
fixture_requests: Vec<String>,
attempt: u32,
suite_key: String,
hooks: Arc<Hooks>,
) {
let assignment = TestAssignment {
test: TestCase {
id: id.clone(),
test_fn: Arc::clone(test_fn),
fixture_requests,
annotations: Vec::new(),
timeout: None,
retries: None,
expected_status: crate::model::ExpectedStatus::Pass,
use_options: None,
},
attempt,
suite_key,
hooks,
suite_mode: SuiteMode::Parallel,
};
let _ = self.tx.try_send(WorkItem::Single(assignment));
}
/// Get a receiver clone for a worker.
pub fn receiver(&self) -> async_channel::Receiver<WorkItem> {
self.rx.clone()
}
/// Signal no more work will be enqueued.
pub fn close(&self) {
self.tx.close();
}
/// Trip the hard-stop signal so workers drop any remaining queued items
/// instead of processing them. Used by `--max-failures` / `-x`.
pub fn stop(&self) {
self.stopped.store(true, std::sync::atomic::Ordering::SeqCst);
self.tx.close();
}
/// Worker-side check: should the next item be processed or dropped?
pub fn is_stopped(&self) -> bool {
self.stopped.load(std::sync::atomic::Ordering::SeqCst)
}
/// Worker-side handle to the same atomic flag (cheap clone of the Arc).
pub fn stop_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
Arc::clone(&self.stopped)
}
}
impl Default for Dispatcher {
fn default() -> Self {
Self::new()
}
}