transact/scheduler/serial/
execution.rs

1/*
2 * Copyright 2019 Cargill Incorporated
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 * -----------------------------------------------------------------------------
16 */
17
18//! Implementation of the components used for interfacing with the component
19//! reponsible for the execution of transactions (usually the Executor).
20
21use crate::scheduler::ExecutionTask;
22use crate::scheduler::ExecutionTaskCompletionNotification;
23use crate::scheduler::ExecutionTaskCompletionNotifier;
24
25use std::sync::mpsc::{Receiver, Sender};
26
27use super::core::CoreMessage;
28
29pub struct SerialExecutionTaskIterator {
30    tx: Sender<CoreMessage>,
31    rx: Receiver<Option<ExecutionTask>>,
32    is_complete: bool,
33}
34
35impl SerialExecutionTaskIterator {
36    pub fn new(tx: Sender<CoreMessage>, rx: Receiver<Option<ExecutionTask>>) -> Self {
37        SerialExecutionTaskIterator {
38            tx,
39            rx,
40            is_complete: false,
41        }
42    }
43}
44
45impl Iterator for SerialExecutionTaskIterator {
46    type Item = ExecutionTask;
47
48    /// Return the next execution task which is available to be executed.
49    fn next(&mut self) -> Option<ExecutionTask> {
50        if self.is_complete {
51            debug!(
52                "Execution task iterator already returned `None`; `next` should not be called again"
53            );
54            return None;
55        }
56
57        // Send a message to the scheduler requesting the next task be sent.
58        match self.tx.send(CoreMessage::Next) {
59            Ok(_) => match self.rx.recv() {
60                Ok(task) => {
61                    self.is_complete = task.is_none();
62                    task
63                }
64                Err(_) => {
65                    error!(
66                        "Failed to receive next execution task; scheduler shutdown unexpectedly"
67                    );
68                    self.is_complete = true;
69                    None
70                }
71            },
72            Err(_) => {
73                trace!("Scheduler core message receiver dropped; checking if it shutdown properly");
74                match self.rx.recv() {
75                    Ok(Some(_)) => error!(
76                        "Scheduler sent unexpected execution task before shutting down unexpectedly"
77                    ),
78                    // If `None` was sent, the scheduler had no more tasks so a shutdown is expected
79                    Ok(None) => {}
80                    _ => error!(
81                        "Failed to request next execution task; scheduler shutdown unexpectedly"
82                    ),
83                }
84                self.is_complete = true;
85                None
86            }
87        }
88    }
89}
90
91#[derive(Clone)]
92pub struct SerialExecutionTaskCompletionNotifier {
93    tx: Sender<CoreMessage>,
94}
95
96impl SerialExecutionTaskCompletionNotifier {
97    pub fn new(tx: Sender<CoreMessage>) -> Self {
98        SerialExecutionTaskCompletionNotifier { tx }
99    }
100}
101
102impl ExecutionTaskCompletionNotifier for SerialExecutionTaskCompletionNotifier {
103    fn notify(&self, notification: ExecutionTaskCompletionNotification) {
104        self.tx
105            .send(CoreMessage::ExecutionResult(notification))
106            .unwrap_or_else(|err| error!("failed to send notification to core: {}", err));
107    }
108
109    fn clone_box(&self) -> Box<dyn ExecutionTaskCompletionNotifier> {
110        Box::new(self.clone())
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117
118    use std::sync::{mpsc::channel, Arc, Mutex};
119
120    use cylinder::{secp256k1::Secp256k1Context, Context, Signer};
121    use log::{set_boxed_logger, set_max_level, Level, LevelFilter, Log, Metadata, Record};
122    use rusty_fork::rusty_fork_test;
123
124    use crate::context::ContextId;
125    use crate::protocol::transaction::{HashMethod, TransactionBuilder};
126
127    // This macro runs each of the tests in a separate process, which is necessary because the
128    // logger is per-process. If we ran the tests normally (which is accomplished with threads), the
129    // loggers would interfere with each other.
130    rusty_fork_test! {
131        /// Verifies that the task iterator works properly under normal conditions
132        ///
133        /// 1. Initialize the logger
134        /// 2. Initialize the channels used by the iterator
135        /// 3. Create the iterator and attempt to get two tasks in a new thread
136        /// 4. Simulate receiving the requests, send one task, and send a `None` to signal no more tasks
137        /// 5. Join the thread and verify the tasks were returned by the iterator
138        /// 6. Verify that no errors were logged
139        #[test]
140        fn task_iterator_successful() {
141            let logger = init_logger();
142
143            let (core_tx, core_rx) = channel();
144            let (task_tx, task_rx) = channel();
145
146            let join_handle = std::thread::spawn(move || {
147                let mut iter = SerialExecutionTaskIterator::new(core_tx, task_rx);
148                (iter.next(), iter.next())
149            });
150
151            recv_next(&core_rx);
152            task_tx
153                .send(Some(mock_execution_task()))
154                .expect("Failed to send execution task");
155            recv_next(&core_rx);
156            task_tx.send(None).expect("Failed to send `None`");
157
158            let (task1, task2) = join_handle.join().expect("Iterator thread panicked");
159            assert!(task1.is_some());
160            assert!(task2.is_none());
161
162            assert!(!logger.has_err());
163        }
164
165        /// Verifies that the task iterator makes a debug log and returns `None` if `next` is called
166        /// after a `None` result has already been returned.
167        ///
168        /// 1. Initialize the logger
169        /// 2. Initialize the channels used by the iterator
170        /// 3. Create the iterator and attempt to get three tasks in a new thread
171        /// 4. Simulate receiving two of the requests, send one task, and send a `None` to signal no
172        ///    more tasks
173        /// 5. Verify that a third request was not sent, because the iterator should have detected
174        ///    that `next` was called after a `None` result was received
175        /// 6. Join the thread and verify the correct tasks were returned by the iterator
176        /// 7. Verify that a debug log was made
177        #[test]
178        fn task_iterator_multiple_nones() {
179            let logger = init_logger();
180
181            let (core_tx, core_rx) = channel();
182            let (task_tx, task_rx) = channel();
183
184            let join_handle = std::thread::spawn(move || {
185                let mut iter = SerialExecutionTaskIterator::new(core_tx, task_rx);
186                (iter.next(), iter.next(), iter.next())
187            });
188
189            recv_next(&core_rx);
190            task_tx
191                .send(Some(mock_execution_task()))
192                .expect("Failed to send execution task");
193            recv_next(&core_rx);
194            task_tx.send(None).expect("Failed to send `None`");
195
196            core_rx.try_recv().expect_err("Got an unexpected task request");
197
198            let (task1, task2, task3) = join_handle.join().expect("Iterator thread panicked");
199            assert!(task1.is_some());
200            assert!(task2.is_none());
201            assert!(task3.is_none());
202
203            assert!(logger.has_debug());
204        }
205
206        /// Verifies that the task iterator returns `None` without logging an error if the next task
207        /// request fails to send but the scheduler sent a `None` result on shutdown.
208        ///
209        /// 1. Initialize the logger
210        /// 2. Initialize the channels used by the iterator but drop the core receiver immediately
211        /// 3. Create the iterator and attempt to get a task in a new thread
212        /// 4. Send a `None` to the task receiver to simulate proper scheduler thread shutdown
213        /// 5. Join the thread and verify the `None` was returned by the iterator
214        /// 7. Verify that no error was logged
215        #[test]
216        fn task_iterator_send_failed_but_shutdown_properly() {
217            let logger = init_logger();
218
219            let (core_tx, _) = channel();
220            let (task_tx, task_rx) = channel();
221
222            let join_handle = std::thread::spawn(move || {
223                SerialExecutionTaskIterator::new(core_tx, task_rx).next()
224            });
225
226            task_tx.send(None).expect("Failed to send `None`");
227
228            let task = join_handle.join().expect("Iterator thread panicked");
229            assert!(task.is_none());
230
231            assert!(!logger.has_err());
232        }
233
234        /// Verifies that the task iterator returns `None` and logs an error if the next task
235        /// request fails but an execution task is still received.
236        ///
237        /// 1. Initialize the logger
238        /// 2. Initialize the channels used by the iterator but drop the core receiver immediately
239        /// 3. Create the iterator and attempt to get a task in a new thread
240        /// 4. Send a task to the receiver to simulate an unexpected task
241        /// 5. Join the thread and verify `None` was returned by the iterator
242        /// 7. Verify that an error was logged
243        #[test]
244        fn task_iterator_send_failed_with_unexpected_task() {
245            let logger = init_logger();
246
247            let (core_tx, _) = channel();
248            let (task_tx, task_rx) = channel();
249
250            let join_handle = std::thread::spawn(move || {
251                SerialExecutionTaskIterator::new(core_tx, task_rx).next()
252            });
253
254            task_tx.send(Some(mock_execution_task())).expect("Failed to send task");
255
256            let task = join_handle.join().expect("Iterator thread panicked");
257            assert!(task.is_none());
258
259            assert!(logger.has_err());
260        }
261
262        /// Verifies that the task iterator returns `None` and logs an error if the next task
263        /// request fails and `None` task was never received.
264        ///
265        /// 1. Initialize the logger
266        /// 2. Initialize the channels used by the iterator but drop the core receiver and the task
267        ///    sender immediately
268        /// 3. Create the iterator and attempt to get a task in a new thread
269        /// 4. Join the thread and verify `None` was returned by the iterator
270        /// 5. Verify that an error was logged
271        #[test]
272        fn task_iterator_send_failed_no_notification() {
273            let logger = init_logger();
274
275            let (core_tx, _) = channel();
276            let (_, task_rx) = channel();
277
278            let join_handle = std::thread::spawn(move || {
279                SerialExecutionTaskIterator::new(core_tx, task_rx).next()
280            });
281
282            let task = join_handle.join().expect("Iterator thread panicked");
283            assert!(task.is_none());
284
285            assert!(logger.has_err());
286        }
287
288        /// Verifies that the task iterator returns `None` and logs an error if the next task
289        /// request succeeds but receiving the response fails.
290        ///
291        /// 1. Initialize the logger
292        /// 2. Initialize the channels used by the iterator but drop the task sender immediately
293        /// 3. Create the iterator and attempt to get a task in a new thread
294        /// 4. Join the thread and verify `None` was returned by the iterator
295        /// 5. Verify that an error was logged
296        #[test]
297        fn task_iterator_send_successful_but_receive_failed() {
298            let logger = init_logger();
299
300            let (core_tx, _core_rx) = channel();
301            let (_, task_rx) = channel();
302
303            let join_handle = std::thread::spawn(move || {
304                SerialExecutionTaskIterator::new(core_tx, task_rx).next()
305            });
306
307            let task = join_handle.join().expect("Iterator thread panicked");
308            assert!(task.is_none());
309
310            assert!(logger.has_err());
311        }
312    }
313
314    fn recv_next(core_rx: &Receiver<CoreMessage>) {
315        match core_rx.recv() {
316            Ok(CoreMessage::Next) => {}
317            res => panic!("Expected `Ok(CoreMessage::Next)`, got {:?} instead", res),
318        }
319    }
320
321    fn mock_execution_task() -> ExecutionTask {
322        ExecutionTask {
323            pair: TransactionBuilder::new()
324                .with_family_name("test".into())
325                .with_family_version("0.1".into())
326                .with_inputs(vec![])
327                .with_outputs(vec![])
328                .with_payload_hash_method(HashMethod::Sha512)
329                .with_payload(vec![])
330                .build_pair(&*new_signer())
331                .expect("Failed to build txn pair"),
332            context_id: ContextId::default(),
333        }
334    }
335
336    fn new_signer() -> Box<dyn Signer> {
337        let context = Secp256k1Context::new();
338        let key = context.new_random_private_key();
339        context.new_signer(key)
340    }
341
342    fn init_logger() -> MockLogger {
343        let logger = MockLogger::default();
344        set_boxed_logger(Box::new(logger.clone())).expect("Failed to set logger");
345        set_max_level(LevelFilter::Debug);
346        logger
347    }
348
349    #[derive(Clone, Default)]
350    struct MockLogger {
351        log_levels: Arc<Mutex<Vec<Level>>>,
352    }
353
354    impl MockLogger {
355        /// Determines whether or not an error message was logged
356        pub fn has_err(&self) -> bool {
357            self.log_levels
358                .lock()
359                .expect("Failed to get log_levels lock")
360                .iter()
361                .any(|level| level == &Level::Error)
362        }
363
364        /// Determines whether or not a debug message was logged
365        pub fn has_debug(&self) -> bool {
366            self.log_levels
367                .lock()
368                .expect("Failed to get log_levels lock")
369                .iter()
370                .any(|level| level == &Level::Debug)
371        }
372    }
373
374    impl Log for MockLogger {
375        fn enabled(&self, _metadata: &Metadata) -> bool {
376            true
377        }
378
379        fn log(&self, record: &Record) {
380            self.log_levels
381                .lock()
382                .expect("Failed to get log_levels lock")
383                .push(record.level());
384        }
385
386        fn flush(&self) {}
387    }
388}