ferridriver_test/
dispatcher.rs1use std::sync::Arc;
7
8use crate::model::{Hooks, SuiteMode, TestCase, TestFn, TestId};
9
10pub struct TestAssignment {
12 pub test: TestCase,
13 pub attempt: u32,
14 pub suite_key: String,
16 pub hooks: Arc<Hooks>,
18 pub suite_mode: SuiteMode,
20}
21
22pub struct SerialBatch {
24 pub suite_key: String,
25 pub assignments: Vec<TestAssignment>,
26 pub hooks: Arc<Hooks>,
27}
28
29pub enum WorkItem {
31 Single(TestAssignment),
33 Serial(SerialBatch),
35}
36
37pub struct Dispatcher {
40 tx: async_channel::Sender<WorkItem>,
41 rx: async_channel::Receiver<WorkItem>,
42 stopped: Arc<std::sync::atomic::AtomicBool>,
47}
48
49impl Dispatcher {
50 pub fn new() -> Self {
51 let (tx, rx) = async_channel::unbounded();
52 Self {
53 tx,
54 rx,
55 stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)),
56 }
57 }
58
59 pub fn enqueue_single(&self, assignment: TestAssignment) {
61 let _ = self.tx.try_send(WorkItem::Single(assignment));
62 }
63
64 pub fn enqueue_serial(&self, batch: SerialBatch) {
66 let _ = self.tx.try_send(WorkItem::Serial(batch));
67 }
68
69 pub fn retry_shared(
71 &self,
72 test_fn: &TestFn,
73 id: &TestId,
74 fixture_requests: Vec<String>,
75 attempt: u32,
76 suite_key: String,
77 hooks: Arc<Hooks>,
78 ) {
79 let assignment = TestAssignment {
80 test: TestCase {
81 id: id.clone(),
82 test_fn: Arc::clone(test_fn),
83 fixture_requests,
84 annotations: Vec::new(),
85 timeout: None,
86 retries: None,
87 expected_status: crate::model::ExpectedStatus::Pass,
88 use_options: None,
89 },
90 attempt,
91 suite_key,
92 hooks,
93 suite_mode: SuiteMode::Parallel,
94 };
95 let _ = self.tx.try_send(WorkItem::Single(assignment));
96 }
97
98 pub fn receiver(&self) -> async_channel::Receiver<WorkItem> {
100 self.rx.clone()
101 }
102
103 pub fn close(&self) {
105 self.tx.close();
106 }
107
108 pub fn stop(&self) {
111 self.stopped.store(true, std::sync::atomic::Ordering::SeqCst);
112 self.tx.close();
113 }
114
115 pub fn is_stopped(&self) -> bool {
117 self.stopped.load(std::sync::atomic::Ordering::SeqCst)
118 }
119
120 pub fn stop_flag(&self) -> Arc<std::sync::atomic::AtomicBool> {
122 Arc::clone(&self.stopped)
123 }
124}
125
126impl Default for Dispatcher {
127 fn default() -> Self {
128 Self::new()
129 }
130}