near_async/
test_loop.rs

1//! This is a framework to test async code in a way that is versatile, deterministic,
2//! easy-to-setup, and easy-to-debug.
3//!
4//! The primary concept here is an event loop that the test framework controls. The
5//! event loop acts as a central hub for all messages, including Actix messages,
6//! network messages, timers, etc. Business logic is only executed as a response to
7//! such events.
8//!
9//! This brings several major benefits:
10//!  - Ease of setup:
11//!     - There is no need to set up mock objects that implement some
12//!       message sender interface, instead, the test loop provides a sender object that
13//!       can be used to send messages to the event loop. For example, suppose we were
14//!       to make a Client whose constructor requires a shards_manager adapter; instead
15//!       of having to make a mock for the shards_manager adapter, we can simply register
16//!       the shards_manager actor with testloop and pass in its sender.
17//!     - Compared to writing synchronous tests, there is no need to manually deliver
18//!       network messages or handle actix messages at certain points of the test. Instead,
19//!       the event loop will invoke the appropriate event handlers whenever there is any
20//!       event remaining in the event loop. This ensures that no messages are ever missed.
21//!     - Test setup code can be modular and reusable, because the test specification
22//!       consists entirely of registering the data and actors. Rather than passing a giant
23//!       callback into a giant setup(...) function to customize one part of a huge
24//!       integration test, we can flexibly compose specific modules with event handlers.
25//!       For example, we may add an event handler to route all ShardsManager-related network
26//!       messages reliably, and at the same time another event handler to drop 50% of Block
27//!       network messages. Also, we can use an event handler as long as it is relevant for a
28//!       test (i.e. a ForwardShardsManagerRequest event handler can be used as long as the
29//!       test involves ShardsManagers), regardless of the exact architecture of the test.
30//!
31//!  - Debuggability:
//!     - Because ALL execution is in response to events, the whole test can be cleanly
33//!       segmented into the response to each event. The framework automatically outputs
34//!       a log message at the beginning of each event execution, so that the log output
35//!       can be loaded into a visualizer to show the exact sequence of events, their
36//!       relationship, the exact contents of the event messages, and the log output
37//!       during the handling of each event. This is especially useful when debugging
38//!       multi-instance tests.
39//!
40//!  - Determinism:
41//!     - Many tests, especially those that involve multiple instances, are most easily
//!       written by spawning actual actors and threads. This, however, makes the tests
//!       inherently asynchronous and more prone to flakiness.
44//!     - The test loop framework also provides a synchronous and deterministic way to
45//!       invoke timers without waiting for the actual duration. This makes tests run
46//!       much faster than asynchronous tests.
47//!
48//!  - Versatility:
49//!     - A test can be constructed with any combination of components. The framework does
50//!       not dictate what components should exist, or how many instances there should be.
51//!       This allows for both small and focused tests, and large multi-instance tests.
52//!     - Timed tests can be written to check the theoretical performance of certain tasks,
53//!       such as distributing chunks to other nodes within X milliseconds provided that
54//!       network messages have a 10ms delay.
55//!     - The framework does not require major migrations to existing code, e.g. it is
56//!       compatible with the Actix framework and futures.
57//!
58//! A note on the order of execution of the events: all events that are due at the same
59//! timestamp are executed in FIFO order. For example, if the events are emitted in the
60//! following order: (A due 100ms), (B due 0ms), (C due 200ms), (D due 0ms), (E due 100ms)
61//! then the actual order of execution is B, D, A, E, C.
62pub mod data;
63pub mod futures;
64pub mod pending_events_sender;
65pub mod sender;
66
67use data::TestLoopData;
68use futures::{TestLoopAsyncComputationSpawner, TestLoopFutureSpawner};
69use near_time::{Clock, Duration, FakeClock};
70use parking_lot::Mutex;
71use pending_events_sender::{CallbackEvent, PendingEventsSender, RawPendingEventsSender};
72use serde::Serialize;
73use std::collections::{BinaryHeap, HashSet};
74use std::sync::Arc;
75use std::sync::atomic::{AtomicBool, Ordering};
76use time::ext::InstantExt;
77
/// Main struct for the Test Loop framework.
/// The `TestLoopData` should contain all the business logic state that is relevant
/// to the test. All possible `Event` that are sent to the event loop are callbacks.
/// See TestLoopData for more details.
///
/// Events are sent to the testloop, with a possible delay, via the pending_events_sender.
/// They are first buffered in `pending_events`, then moved into the `events` heap on the
/// next loop iteration, and finally executed in order of (due time, arrival order)
/// against a virtual clock.
pub struct TestLoopV2 {
    /// The data that is stored and accessed by the test loop.
    pub data: TestLoopData,
    /// The sender is used to send events to the event loop.
    raw_pending_events_sender: RawPendingEventsSender,
    /// The events that are yet to be handled. They are kept in a heap so that
    /// events that shall execute earlier (by our own virtual clock) are popped
    /// first; events due at the same time are popped in the order they were received.
    events: BinaryHeap<EventInHeap>,
    /// The events that will enter the events heap upon the next iteration.
    /// Shared (behind a mutex) with the closure inside `raw_pending_events_sender`.
    pending_events: Arc<Mutex<InFlightEvents>>,
    /// The next ID to assign to an event we receive. This also equals the total number
    /// of events received so far.
    next_event_index: usize,
    /// The current virtual time, measured from zero when the loop was created.
    current_time: Duration,
    /// Fake clock that always returns the virtual time.
    clock: near_time::FakeClock,
    /// Shutdown flag. When this flag is true, delayed action runners will no
    /// longer post any new events to the event loop, and events that still come due
    /// are discarded without running their callbacks.
    shutting_down: Arc<AtomicBool>,
    /// If present, a function called right before every executed (i.e. not ignored)
    /// event's callback, with read-only access to the data. Intended only for debugging.
    every_event_callback: Option<Box<dyn FnMut(&TestLoopData)>>,
    /// Events with any of these identifiers are still dequeued and logged (marked as
    /// ignored), but their callbacks are never run.
    denylisted_identifiers: HashSet<String>,
}
110
/// An event waiting to be executed, ordered by the due time and then by ID.
///
/// The `Ord` impl is reversed so that the max-heap `BinaryHeap` pops the earliest
/// due event first, with ties broken by the lowest (earliest received) ID.
struct EventInHeap {
    /// The callback and its metadata (identifier, description).
    event: CallbackEvent,
    /// Virtual time at which the event should run: the loop's current time when the
    /// event was moved into the heap, plus the event's requested delay.
    due: Duration,
    /// Sequential index assigned when the event was moved into the heap; used as the
    /// FIFO tie-breaker between events with the same `due`.
    id: usize,
}
117
118impl PartialEq for EventInHeap {
119    fn eq(&self, other: &Self) -> bool {
120        self.due == other.due && self.id == other.id
121    }
122}
123
124impl Eq for EventInHeap {}
125
126impl PartialOrd for EventInHeap {
127    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
128        Some(self.cmp(other))
129    }
130}
131
132impl Ord for EventInHeap {
133    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
134        (self.due, self.id).cmp(&(other.due, other.id)).reverse()
135    }
136}
137
/// Events that have been sent to the loop but not yet moved into its event heap.
struct InFlightEvents {
    /// Received events, in the order they were sent.
    events: Vec<CallbackEvent>,
    /// The TestLoop thread ID, captured when this buffer is created. Events sent from
    /// any other thread trigger a panic, to detect unintended parallel processing.
    event_loop_thread_id: std::thread::ThreadId,
}
144
145impl InFlightEvents {
146    fn new() -> Self {
147        Self { events: Vec::new(), event_loop_thread_id: std::thread::current().id() }
148    }
149
150    fn add(&mut self, event: CallbackEvent) {
151        if std::thread::current().id() != self.event_loop_thread_id {
152            // Another thread shall not be sending an event while we're not handling an event.
153            // If that happens, it means we have a rogue thread spawned somewhere that has not been
154            // converted to TestLoop. TestLoop tests should be single-threaded (or at least, look
155            // as if it were single-threaded). So if we catch this, panic.
156            panic!(
157                "Event was sent from the wrong thread. TestLoop tests should be single-threaded. \
158                    Check if there's any code that spawns computation on another thread such as \
159                    rayon::spawn, and convert it to AsyncComputationSpawner or FutureSpawner. \
160                    Event: {}",
161                event.description
162            );
163        }
164        self.events.push(event);
165    }
166}
167
/// The log output line that can be used to visualize the execution of a test.
/// It is only used to serialize into JSON. This is enough data to reconstruct
/// the event dependency graph, and to segment log messages.
#[derive(Serialize)]
struct EventStartLogOutput {
    /// Index of the current event we're about to handle.
    current_index: usize,
    /// Number of events received so far, before this event's handler runs.
    /// See `EventEndLogOutput::total_events`.
    total_events: usize,
    /// The identifier of the event, usually the node_id.
    identifier: String,
    /// The description string the event was sent with.
    current_event: String,
    /// The event's due time in whole milliseconds, which equals the current virtual time.
    current_time_ms: u64,
    /// `true` if the event was skipped because its identifier is denylisted, i.e. its
    /// callback was NOT executed.
    event_ignored: bool,
}
186
/// JSON payload of the log line emitted after an event has been handled.
#[derive(Serialize)]
struct EventEndLogOutput {
    /// The total number of events we have seen so far. This is combined with
    /// `EventStartLogOutput::total_events` to determine which new events are
    /// emitted by the current event's handler.
    total_events: usize,
}
194
195impl TestLoopV2 {
196    pub fn new() -> Self {
197        let pending_events = Arc::new(Mutex::new(InFlightEvents::new()));
198        let pending_events_clone = pending_events.clone();
199        let raw_pending_events_sender = RawPendingEventsSender::new(move |callback_event| {
200            let mut pending_events = pending_events_clone.lock();
201            pending_events.add(callback_event);
202        });
203        let shutting_down = Arc::new(AtomicBool::new(false));
204        // Needed for the log visualizer to know when the test loop starts.
205        tracing::info!(target: "test_loop", "TEST_LOOP_INIT");
206        Self {
207            data: TestLoopData::new(raw_pending_events_sender.clone(), shutting_down.clone()),
208            events: BinaryHeap::new(),
209            pending_events,
210            raw_pending_events_sender,
211            next_event_index: 0,
212            current_time: Duration::ZERO,
213            clock: FakeClock::default(),
214            shutting_down,
215            every_event_callback: None,
216            denylisted_identifiers: HashSet::new(),
217        }
218    }
219
220    /// Returns a FutureSpawner that can be used to spawn futures into the loop.
221    pub fn future_spawner(&self, identifier: &str) -> TestLoopFutureSpawner {
222        self.raw_pending_events_sender.for_identifier(identifier)
223    }
224
225    /// Returns an AsyncComputationSpawner that can be used to spawn async computation into the
226    /// loop. The `artificial_delay` allows the test to determine an artificial delay that the
227    /// computation should take, based on the name of the computation.
228    pub fn async_computation_spawner(
229        &self,
230        identifier: &str,
231        artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
232    ) -> TestLoopAsyncComputationSpawner {
233        TestLoopAsyncComputationSpawner::new(
234            self.raw_pending_events_sender.for_identifier(identifier),
235            artificial_delay,
236        )
237    }
238
239    /// Sends any ad-hoc event to the loop.
240    pub fn send_adhoc_event(
241        &self,
242        description: String,
243        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
244    ) {
245        self.send_adhoc_event_with_delay(description, Duration::ZERO, callback)
246    }
247
248    /// Sends any ad-hoc event to the loop, after some delay.
249    pub fn send_adhoc_event_with_delay(
250        &self,
251        description: String,
252        delay: Duration,
253        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
254    ) {
255        self.raw_pending_events_sender.for_identifier("Adhoc").send_with_delay(
256            description,
257            Box::new(callback),
258            delay,
259        );
260    }
261
262    /// This function is used to filter out all events that belong to a certain identifier.
263    /// The use case is while shutting down a node, we would like to not execute any more events from that node.
264    pub fn remove_events_with_identifier(&mut self, identifier: &str) {
265        self.denylisted_identifiers.insert(identifier.to_string());
266    }
267
268    /// Returns a clock that will always return the current virtual time.
269    pub fn clock(&self) -> Clock {
270        self.clock.clock()
271    }
272
273    pub fn set_every_event_callback(&mut self, callback: impl FnMut(&TestLoopData) + 'static) {
274        self.every_event_callback = Some(Box::new(callback));
275    }
276
277    /// Helper to push events we have just received into the heap.
278    fn queue_received_events(&mut self) {
279        for event in self.pending_events.lock().events.drain(..) {
280            self.events.push(EventInHeap {
281                due: self.current_time + event.delay,
282                id: self.next_event_index,
283                event,
284            });
285            self.next_event_index += 1;
286        }
287    }
288
289    /// Performs the logic to find the next event, advance to its time, and dequeue it.
290    /// Takes a decider to determine whether to advance time, handle the next event, and/or to stop.
291    fn advance_till_next_event(
292        &mut self,
293        decider: &mut impl FnMut(Option<Duration>, &mut TestLoopData) -> AdvanceDecision,
294    ) -> Option<EventInHeap> {
295        loop {
296            // New events may have been sent to the TestLoop from outside, and the previous
297            // iteration of the loop may have made new futures ready, so queue up any received
298            // events.
299            self.queue_received_events();
300
301            // Now there are two ways an event may be/become available. One is that the event is
302            // queued into the event loop at a specific time; the other is that some future is
303            // waiting on our fake clock to advance beyond a specific time. Pick the earliest.
304            let next_timestamp = {
305                let next_event_timestamp = self.events.peek().map(|event| event.due);
306                let next_future_waiter_timestamp = self
307                    .clock
308                    .first_waiter()
309                    .map(|time| time.signed_duration_since(self.clock.now() - self.current_time));
310                next_event_timestamp
311                    .map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1))
312                    .or(next_future_waiter_timestamp)
313            };
314            // If the next event is immediately available (i.e. its time is same as current time),
315            // just return that event; there's no decision to make (as we only give deciders a
316            // chance to stop processing if we would advance the clock) and no need to advance time.
317            if next_timestamp == Some(self.current_time) {
318                let event = self.events.pop().expect("Programming error in TestLoop");
319                assert_eq!(event.due, self.current_time);
320                return Some(event);
321            }
322            // If we reach this point, it means we need to advance the clock. Let the decider choose
323            // if we should do that, or if we should stop.
324            let decision = decider(next_timestamp, &mut self.data);
325            match decision {
326                AdvanceDecision::AdvanceToNextEvent => {
327                    let next_timestamp = next_timestamp.unwrap();
328                    self.clock.advance(next_timestamp - self.current_time);
329                    self.current_time = next_timestamp;
330                    // Run the loop again, because if the reason why we advance the clock to this
331                    // time is due to a possible future waiting on the clock, we may or may not get
332                    // another future queued into the TestLoop, so we just check the whole thing
333                    // again.
334                    continue;
335                }
336                AdvanceDecision::AdvanceToAndStop(target) => {
337                    self.clock.advance(target - self.current_time);
338                    self.current_time = target;
339                    return None;
340                }
341                AdvanceDecision::Stop => {
342                    return None;
343                }
344            }
345        }
346    }
347
348    /// Processes the given event, by logging a line first and then finding a handler to run it.
349    fn process_event(&mut self, event: EventInHeap) {
350        if self.shutting_down.load(Ordering::Relaxed) {
351            return;
352        }
353
354        let event_ignored = self.denylisted_identifiers.contains(&event.event.identifier);
355        let start_json = serde_json::to_string(&EventStartLogOutput {
356            current_index: event.id,
357            total_events: self.next_event_index,
358            identifier: event.event.identifier.clone(),
359            current_event: event.event.description,
360            current_time_ms: event.due.whole_milliseconds() as u64,
361            event_ignored,
362        })
363        .unwrap();
364        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json);
365        assert_eq!(self.current_time, event.due);
366
367        if !event_ignored {
368            if let Some(callback) = &mut self.every_event_callback {
369                callback(&self.data);
370            }
371
372            let callback = event.event.callback;
373            callback(&mut self.data);
374        }
375
376        // Push any new events into the queue. Do this before emitting the end log line,
377        // so that it contains the correct new total number of events.
378        self.queue_received_events();
379        let end_json =
380            serde_json::to_string(&EventEndLogOutput { total_events: self.next_event_index })
381                .unwrap();
382        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_END {}", end_json);
383    }
384
385    /// Runs the test loop for the given duration. This function may be called
386    /// multiple times, but further test handlers may not be registered after
387    /// the first call.
388    pub fn run_for(&mut self, duration: Duration) {
389        let deadline = self.current_time + duration;
390        while let Some(event) = self.advance_till_next_event(&mut |next_time, _| {
391            if let Some(next_time) = next_time {
392                if next_time <= deadline {
393                    return AdvanceDecision::AdvanceToNextEvent;
394                }
395            }
396            AdvanceDecision::AdvanceToAndStop(deadline)
397        }) {
398            self.process_event(event);
399        }
400    }
401
402    /// Run until the given condition is true, asserting that it happens before the maximum duration
403    /// is reached.
404    ///
405    /// To maximize logical consistency, the condition is only checked before the clock would
406    /// advance. If it returns true, execution stops before advancing the clock.
407    pub fn run_until(
408        &mut self,
409        mut condition: impl FnMut(&mut TestLoopData) -> bool,
410        maximum_duration: Duration,
411    ) {
412        let deadline = self.current_time + maximum_duration;
413        let mut decider = move |next_time, data: &mut TestLoopData| {
414            if condition(data) {
415                return AdvanceDecision::Stop;
416            }
417            if let Some(next_time) = next_time {
418                if next_time <= deadline {
419                    return AdvanceDecision::AdvanceToNextEvent;
420                }
421            }
422            panic!("run_until did not fulfill the condition within the given deadline");
423        };
424        while let Some(event) = self.advance_till_next_event(&mut decider) {
425            self.process_event(event);
426        }
427    }
428
429    pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) {
430        self.shutting_down.store(true, Ordering::Relaxed);
431        self.run_for(maximum_duration);
432        // Implicitly dropped here, which asserts that no more events are remaining.
433    }
434
435    pub fn run_instant(&mut self) {
436        self.run_for(Duration::ZERO);
437    }
438}
439
440impl Drop for TestLoopV2 {
441    fn drop(&mut self) {
442        self.queue_received_events();
443        if let Some(event) = self.events.pop() {
444            // Drop any references that may be held by the event callbacks. This can help
445            // with destruction of the data.
446            self.events.clear();
447            panic!(
448                "Event scheduled at {} is not handled at the end of the test: {}.
449                     Consider calling `test.shutdown_and_drain_remaining_events(...)`.",
450                event.due, event.event.description
451            );
452        }
453        // Needed for the log visualizer to know when the test loop ends.
454        tracing::info!(target: "test_loop", "TEST_LOOP_SHUTDOWN");
455    }
456}
457
/// What `advance_till_next_event` should do when the next event (or fake-clock waiter) lies
/// in the future. It is returned by the caller-supplied decider.
enum AdvanceDecision {
    /// Advance the virtual clock to the next event/waiter timestamp and keep looking for an
    /// event. Only valid when such a timestamp exists.
    AdvanceToNextEvent,
    /// Advance the virtual clock to the given time, then stop without handling an event.
    AdvanceToAndStop(Duration),
    /// Stop immediately without advancing the clock.
    Stop,
}
463
#[cfg(test)]
mod tests {
    use crate::futures::FutureSpawnerExt;
    use crate::test_loop::TestLoopV2;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use time::Duration;

    // Tests that the TestLoop correctly handles futures that sleep on the fake clock.
    #[test]
    fn test_futures() {
        let mut test_loop = TestLoopV2::new();
        let clock = test_loop.clock();
        let start_time = clock.now();

        let finished = Arc::new(AtomicUsize::new(0));

        let clock1 = clock.clone();
        let finished1 = finished.clone();
        test_loop.future_spawner("adhoc future spawner").spawn("test1", async move {
            assert_eq!(clock1.now(), start_time);
            clock1.sleep(Duration::seconds(10)).await;
            assert_eq!(clock1.now(), start_time + Duration::seconds(10));
            clock1.sleep(Duration::seconds(5)).await;
            assert_eq!(clock1.now(), start_time + Duration::seconds(15));
            finished1.fetch_add(1, Ordering::Relaxed);
        });

        test_loop.run_for(Duration::seconds(2));

        let clock2 = clock;
        let finished2 = finished.clone();
        test_loop.future_spawner("adhoc future spawner").spawn("test2", async move {
            assert_eq!(clock2.now(), start_time + Duration::seconds(2));
            clock2.sleep(Duration::seconds(3)).await;
            assert_eq!(clock2.now(), start_time + Duration::seconds(5));
            clock2.sleep(Duration::seconds(20)).await;
            assert_eq!(clock2.now(), start_time + Duration::seconds(25));
            finished2.fetch_add(1, Ordering::Relaxed);
        });
        // During these 30 virtual seconds, the TestLoop should've automatically advanced the clock
        // to wake each future as they become ready to run again. The code inside the futures
        // assert that the fake clock does indeed have the expected times.
        test_loop.run_for(Duration::seconds(30));
        assert_eq!(finished.load(Ordering::Relaxed), 2);
    }

    // Tests the ordering guarantee from the module docs: events run by due time, and events
    // due at the same time run in the order they were sent (FIFO).
    #[test]
    fn test_same_due_time_events_run_in_fifo_order() {
        let mut test_loop = TestLoopV2::new();
        let order = Arc::new(Mutex::new(Vec::new()));
        for (name, delay_ms) in [("A", 100), ("B", 0), ("C", 200), ("D", 0), ("E", 100)] {
            let order = order.clone();
            test_loop.send_adhoc_event_with_delay(
                name.to_string(),
                Duration::milliseconds(delay_ms),
                move |_| order.lock().unwrap().push(name),
            );
        }
        test_loop.run_for(Duration::milliseconds(200));
        assert_eq!(*order.lock().unwrap(), ["B", "D", "A", "E", "C"]);
    }

    // Tests that events from a denylisted identifier are dequeued but never executed.
    #[test]
    fn test_removed_identifier_events_are_not_executed() {
        let mut test_loop = TestLoopV2::new();
        let executed = Arc::new(AtomicUsize::new(0));
        test_loop.remove_events_with_identifier("Adhoc");
        let executed1 = executed.clone();
        test_loop.send_adhoc_event("should be ignored".to_string(), move |_| {
            executed1.fetch_add(1, Ordering::Relaxed);
        });
        test_loop.run_instant();
        assert_eq!(executed.load(Ordering::Relaxed), 0);
    }
}