transact/scheduler/serial/
execution.rs1use crate::scheduler::ExecutionTask;
22use crate::scheduler::ExecutionTaskCompletionNotification;
23use crate::scheduler::ExecutionTaskCompletionNotifier;
24
25use std::sync::mpsc::{Receiver, Sender};
26
27use super::core::CoreMessage;
28
29pub struct SerialExecutionTaskIterator {
30 tx: Sender<CoreMessage>,
31 rx: Receiver<Option<ExecutionTask>>,
32 is_complete: bool,
33}
34
35impl SerialExecutionTaskIterator {
36 pub fn new(tx: Sender<CoreMessage>, rx: Receiver<Option<ExecutionTask>>) -> Self {
37 SerialExecutionTaskIterator {
38 tx,
39 rx,
40 is_complete: false,
41 }
42 }
43}
44
45impl Iterator for SerialExecutionTaskIterator {
46 type Item = ExecutionTask;
47
48 fn next(&mut self) -> Option<ExecutionTask> {
50 if self.is_complete {
51 debug!(
52 "Execution task iterator already returned `None`; `next` should not be called again"
53 );
54 return None;
55 }
56
57 match self.tx.send(CoreMessage::Next) {
59 Ok(_) => match self.rx.recv() {
60 Ok(task) => {
61 self.is_complete = task.is_none();
62 task
63 }
64 Err(_) => {
65 error!(
66 "Failed to receive next execution task; scheduler shutdown unexpectedly"
67 );
68 self.is_complete = true;
69 None
70 }
71 },
72 Err(_) => {
73 trace!("Scheduler core message receiver dropped; checking if it shutdown properly");
74 match self.rx.recv() {
75 Ok(Some(_)) => error!(
76 "Scheduler sent unexpected execution task before shutting down unexpectedly"
77 ),
78 Ok(None) => {}
80 _ => error!(
81 "Failed to request next execution task; scheduler shutdown unexpectedly"
82 ),
83 }
84 self.is_complete = true;
85 None
86 }
87 }
88 }
89}
90
91#[derive(Clone)]
92pub struct SerialExecutionTaskCompletionNotifier {
93 tx: Sender<CoreMessage>,
94}
95
96impl SerialExecutionTaskCompletionNotifier {
97 pub fn new(tx: Sender<CoreMessage>) -> Self {
98 SerialExecutionTaskCompletionNotifier { tx }
99 }
100}
101
102impl ExecutionTaskCompletionNotifier for SerialExecutionTaskCompletionNotifier {
103 fn notify(&self, notification: ExecutionTaskCompletionNotification) {
104 self.tx
105 .send(CoreMessage::ExecutionResult(notification))
106 .unwrap_or_else(|err| error!("failed to send notification to core: {}", err));
107 }
108
109 fn clone_box(&self) -> Box<dyn ExecutionTaskCompletionNotifier> {
110 Box::new(self.clone())
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::{mpsc::channel, Arc, Mutex};

    use cylinder::{secp256k1::Secp256k1Context, Context, Signer};
    use log::{set_boxed_logger, set_max_level, Level, LevelFilter, Log, Metadata, Record};
    use rusty_fork::rusty_fork_test;

    use crate::context::ContextId;
    use crate::protocol::transaction::{HashMethod, TransactionBuilder};

    // Every test installs its own `MockLogger` as the global logger through `init_logger`, which
    // panics if installing the logger fails. NOTE(review): the tests are wrapped in
    // `rusty_fork_test!` presumably so that each one runs in its own process and can install a
    // fresh global logger -- confirm against the `log` and `rusty-fork` documentation.
    rusty_fork_test! {
        /// Verifies the normal flow of the `SerialExecutionTaskIterator`:
        ///
        /// 1. A background thread creates the iterator and calls `next` twice.
        /// 2. This thread answers the first `CoreMessage::Next` request with a task and the
        ///    second request with `None`.
        /// 3. The first result must be `Some`, the second `None`, and no error may be logged.
        #[test]
        fn task_iterator_successful() {
            let logger = init_logger();

            let (core_tx, core_rx) = channel();
            let (task_tx, task_rx) = channel();

            let join_handle = std::thread::spawn(move || {
                let mut iter = SerialExecutionTaskIterator::new(core_tx, task_rx);
                (iter.next(), iter.next())
            });

            recv_next(&core_rx);
            task_tx
                .send(Some(mock_execution_task()))
                .expect("Failed to send execution task");
            recv_next(&core_rx);
            task_tx.send(None).expect("Failed to send `None`");

            let (task1, task2) = join_handle.join().expect("Iterator thread panicked");
            assert!(task1.is_some());
            assert!(task2.is_none());

            assert!(!logger.has_err());
        }

        /// Verifies that calling `next` again after the iterator has returned `None` yields
        /// `None` without sending another request:
        ///
        /// 1. A background thread creates the iterator and calls `next` three times.
        /// 2. This thread answers the first request with a task and the second with `None`.
        /// 3. After the iterator thread has finished, no third request may be pending on the
        ///    core channel.
        /// 4. The results must be `Some`, `None`, `None`, and a debug message must be logged.
        #[test]
        fn task_iterator_multiple_nones() {
            let logger = init_logger();

            let (core_tx, core_rx) = channel();
            let (task_tx, task_rx) = channel();

            let join_handle = std::thread::spawn(move || {
                let mut iter = SerialExecutionTaskIterator::new(core_tx, task_rx);
                (iter.next(), iter.next(), iter.next())
            });

            recv_next(&core_rx);
            task_tx
                .send(Some(mock_execution_task()))
                .expect("Failed to send execution task");
            recv_next(&core_rx);
            task_tx.send(None).expect("Failed to send `None`");

            let (task1, task2, task3) = join_handle.join().expect("Iterator thread panicked");

            // Checked only after the iterator thread has finished: checking right after sending
            // `None` would race with the third `next` call and could pass before an unexpected
            // request had a chance to be sent. A message queued before the sender was dropped is
            // still returned by `try_recv`, so an unexpected request is reliably detected here.
            core_rx.try_recv().expect_err("Got an unexpected task request");

            assert!(task1.is_some());
            assert!(task2.is_none());
            assert!(task3.is_none());

            assert!(logger.has_debug());
        }

        /// Verifies that a failed request followed by a `None` on the task channel is treated as
        /// a clean shutdown:
        ///
        /// 1. The core receiver is dropped immediately, so the iterator's request cannot be sent.
        /// 2. `None` is sent on the task channel.
        /// 3. `next` must return `None` and no error may be logged.
        #[test]
        fn task_iterator_send_failed_but_shutdown_properly() {
            let logger = init_logger();

            let (core_tx, _) = channel();
            let (task_tx, task_rx) = channel();

            let join_handle = std::thread::spawn(move || {
                SerialExecutionTaskIterator::new(core_tx, task_rx).next()
            });

            task_tx.send(None).expect("Failed to send `None`");

            let task = join_handle.join().expect("Iterator thread panicked");
            assert!(task.is_none());

            assert!(!logger.has_err());
        }

        /// Verifies that a failed request followed by a task on the task channel is reported as
        /// an error:
        ///
        /// 1. The core receiver is dropped immediately, so the iterator's request cannot be sent.
        /// 2. A task (rather than `None`) is sent on the task channel.
        /// 3. `next` must return `None` and an error must be logged.
        #[test]
        fn task_iterator_send_failed_with_unexpected_task() {
            let logger = init_logger();

            let (core_tx, _) = channel();
            let (task_tx, task_rx) = channel();

            let join_handle = std::thread::spawn(move || {
                SerialExecutionTaskIterator::new(core_tx, task_rx).next()
            });

            task_tx
                .send(Some(mock_execution_task()))
                .expect("Failed to send task");

            let task = join_handle.join().expect("Iterator thread panicked");
            assert!(task.is_none());

            assert!(logger.has_err());
        }

        /// Verifies that a failed request with nothing available on the task channel is reported
        /// as an error:
        ///
        /// 1. Both the core receiver and the task sender are dropped immediately.
        /// 2. `next` must return `None` and an error must be logged.
        #[test]
        fn task_iterator_send_failed_no_notification() {
            let logger = init_logger();

            let (core_tx, _) = channel();
            let (_, task_rx) = channel();

            let join_handle = std::thread::spawn(move || {
                SerialExecutionTaskIterator::new(core_tx, task_rx).next()
            });

            let task = join_handle.join().expect("Iterator thread panicked");
            assert!(task.is_none());

            assert!(logger.has_err());
        }

        /// Verifies that a successful request followed by a failed receive is reported as an
        /// error:
        ///
        /// 1. The core receiver is kept alive so the request succeeds, but the task sender is
        ///    dropped immediately so no reply can arrive.
        /// 2. `next` must return `None` and an error must be logged.
        #[test]
        fn task_iterator_send_successful_but_receive_failed() {
            let logger = init_logger();

            // `_core_rx` is bound (not `_`) so the receiver stays alive until the test ends.
            let (core_tx, _core_rx) = channel();
            let (_, task_rx) = channel();

            let join_handle = std::thread::spawn(move || {
                SerialExecutionTaskIterator::new(core_tx, task_rx).next()
            });

            let task = join_handle.join().expect("Iterator thread panicked");
            assert!(task.is_none());

            assert!(logger.has_err());
        }
    }

    /// Blocks until a message arrives on `core_rx` and panics unless it is `CoreMessage::Next`.
    fn recv_next(core_rx: &Receiver<CoreMessage>) {
        match core_rx.recv() {
            Ok(CoreMessage::Next) => {}
            res => panic!("Expected `Ok(CoreMessage::Next)`, got {:?} instead", res),
        }
    }

    /// Builds an `ExecutionTask` with an empty test transaction pair and the default context ID.
    fn mock_execution_task() -> ExecutionTask {
        ExecutionTask {
            pair: TransactionBuilder::new()
                .with_family_name("test".into())
                .with_family_version("0.1".into())
                .with_inputs(vec![])
                .with_outputs(vec![])
                .with_payload_hash_method(HashMethod::Sha512)
                .with_payload(vec![])
                .build_pair(&*new_signer())
                .expect("Failed to build txn pair"),
            context_id: ContextId::default(),
        }
    }

    /// Creates a signer backed by a freshly generated random secp256k1 private key.
    fn new_signer() -> Box<dyn Signer> {
        let context = Secp256k1Context::new();
        let key = context.new_random_private_key();
        context.new_signer(key)
    }

    /// Installs a `MockLogger` as the global logger, raises the max level to `Debug`, and returns
    /// a handle that shares the recorded log levels with the installed logger.
    fn init_logger() -> MockLogger {
        let logger = MockLogger::default();
        set_boxed_logger(Box::new(logger.clone())).expect("Failed to set logger");
        set_max_level(LevelFilter::Debug);
        logger
    }

    /// Logger that records only the level of every message it receives.
    ///
    /// Clones share the same list of recorded levels.
    #[derive(Clone, Default)]
    struct MockLogger {
        log_levels: Arc<Mutex<Vec<Level>>>,
    }

    impl MockLogger {
        /// Returns `true` if at least one message was logged at the `Error` level.
        pub fn has_err(&self) -> bool {
            self.log_levels
                .lock()
                .expect("Failed to get log_levels lock")
                .contains(&Level::Error)
        }

        /// Returns `true` if at least one message was logged at the `Debug` level.
        pub fn has_debug(&self) -> bool {
            self.log_levels
                .lock()
                .expect("Failed to get log_levels lock")
                .contains(&Level::Debug)
        }
    }

    impl Log for MockLogger {
        /// Accepts every record regardless of its metadata.
        fn enabled(&self, _metadata: &Metadata) -> bool {
            true
        }

        /// Records the level of `record`; the message itself is discarded.
        fn log(&self, record: &Record) {
            self.log_levels
                .lock()
                .expect("Failed to get log_levels lock")
                .push(record.level());
        }

        /// Nothing is buffered, so there is nothing to flush.
        fn flush(&self) {}
    }
}