near_async/
test_loop.rs

1//! This is a framework to test async code in a way that is versatile, deterministic,
2//! easy-to-setup, and easy-to-debug.
3//!
4//! The primary concept here is an event loop that the test framework controls. The
5//! event loop acts as a central hub for all messages, including Actix messages,
6//! network messages, timers, etc. Business logic is only executed as a response to
7//! such events.
8//!
9//! This brings several major benefits:
10//!  - Ease of setup:
11//!     - There is no need to set up mock objects that implement some
12//!       message sender interface, instead, the test loop provides a sender object that
13//!       can be used to send messages to the event loop. For example, suppose we were
14//!       to make a Client whose constructor requires a shards_manager adapter; instead
15//!       of having to make a mock for the shards_manager adapter, we can simply register
16//!       the shards_manager actor with testloop and pass in its sender.
17//!     - Compared to writing synchronous tests, there is no need to manually deliver
18//!       network messages or handle actix messages at certain points of the test. Instead,
19//!       the event loop will invoke the appropriate event handlers whenever there is any
20//!       event remaining in the event loop. This ensures that no messages are ever missed.
21//!     - Test setup code can be modular and reusable, because the test specification
22//!       consists entirely of registering the data and actors. Rather than passing a giant
23//!       callback into a giant setup(...) function to customize one part of a huge
24//!       integration test, we can flexibly compose specific modules with event handlers.
25//!       For example, we may add an event handler to route all ShardsManager-related network
26//!       messages reliably, and at the same time another event handler to drop 50% of Block
27//!       network messages. Also, we can use an event handler as long as it is relevant for a
28//!       test (i.e. a ForwardShardsManagerRequest event handler can be used as long as the
29//!       test involves ShardsManagers), regardless of the exact architecture of the test.
30//!
//!  - Debuggability:
//!     - Because ALL execution is in response to events, the whole test can be cleanly
//!       segmented into the response to each event. The framework automatically outputs
//!       a log message at the beginning of each event execution, so that the log output
//!       can be loaded into a visualizer to show the exact sequence of events, their
//!       relationship, the exact contents of the event messages, and the log output
//!       during the handling of each event. This is especially useful when debugging
//!       multi-instance tests.
39//!
//!  - Determinism:
//!     - Many tests, especially those that involve multiple instances, are most easily
//!       written by spawning actual actors and threads. This however makes the tests
//!       inherently asynchronous and more prone to flakiness. With the test loop, events
//!       are instead executed one at a time, in a well-defined order (see the note on
//!       the order of execution below).
//!     - The test loop framework also provides a synchronous and deterministic way to
//!       invoke timers without waiting for the actual duration. This makes tests run
//!       much faster than asynchronous tests.
47//!
48//!  - Versatility:
49//!     - A test can be constructed with any combination of components. The framework does
50//!       not dictate what components should exist, or how many instances there should be.
51//!       This allows for both small and focused tests, and large multi-instance tests.
52//!     - Timed tests can be written to check the theoretical performance of certain tasks,
53//!       such as distributing chunks to other nodes within X milliseconds provided that
54//!       network messages have a 10ms delay.
55//!     - The framework does not require major migrations to existing code, e.g. it is
56//!       compatible with the Actix framework and futures.
57//!
58//! A note on the order of execution of the events: all events that are due at the same
59//! timestamp are executed in FIFO order. For example, if the events are emitted in the
60//! following order: (A due 100ms), (B due 0ms), (C due 200ms), (D due 0ms), (E due 100ms)
61//! then the actual order of execution is B, D, A, E, C.
62pub mod data;
63pub mod futures;
64pub mod pending_events_sender;
65pub mod sender;
66
67use data::TestLoopData;
68use futures::{TestLoopAsyncComputationSpawner, TestLoopFutureSpawner};
69use near_time::{Clock, Duration, FakeClock};
70use pending_events_sender::{CallbackEvent, PendingEventsSender};
71use sender::TestLoopSender;
72use serde::Serialize;
73use std::collections::BinaryHeap;
74use std::sync::atomic::{AtomicBool, Ordering};
75use std::sync::Arc;
76use std::sync::Mutex;
77use time::ext::InstantExt;
78
79use crate::messaging::{Actor, LateBoundSender};
80
/// Main struct for the Test Loop framework.
/// The `TestLoopData` should contain all the business logic state that is relevant
/// to the test. All possible `Event` that are sent to the event loop are callbacks.
/// See `TestLoopData` for more details.
///
/// Events are sent to the testloop, with a possible delay, via the pending_events_sender.
pub struct TestLoopV2 {
    /// The data that is stored and accessed by the test loop. Event callbacks receive
    /// a mutable reference to it.
    pub data: TestLoopData,
    /// The sender is used to send events to the event loop. Clones of it are handed out
    /// by `sender()`, `future_spawner()` and `async_computation_spawner()`.
    pending_events_sender: PendingEventsSender,
    /// The events that are yet to be handled. They are kept in a heap so that
    /// events that shall execute earlier (by our own virtual clock) are popped
    /// first.
    events: BinaryHeap<EventInHeap>,
    /// The events that will enter the events heap upon the next iteration.
    /// Shared with the closure inside `pending_events_sender`, hence the `Arc<Mutex<_>>`.
    pending_events: Arc<Mutex<InFlightEvents>>,
    /// The next ID to assign to an event we receive.
    next_event_index: usize,
    /// The current virtual time, measured from the creation of the test loop.
    pub current_time: Duration,
    /// Fake clock that always returns the virtual time.
    clock: near_time::FakeClock,
    /// Shutdown flag. When this flag is true, delayed action runners will no
    /// longer post any new events to the event loop.
    shutting_down: Arc<AtomicBool>,
    /// If present, a function to call to print something every time an event is
    /// handled. Intended only for debugging.
    every_event_callback: Option<Box<dyn FnMut(&TestLoopData)>>,
}
111
/// An event waiting to be executed, ordered by the due time and then by ID.
struct EventInHeap {
    /// The callback (and its description) to run when the event is due.
    event: CallbackEvent,
    /// Virtual time at which the event becomes due (time of receipt plus its delay).
    due: Duration,
    /// Index assigned when the event entered the heap; breaks ties between events
    /// that are due at the same time so that they run in FIFO order.
    id: usize,
}
118
119impl PartialEq for EventInHeap {
120    fn eq(&self, other: &Self) -> bool {
121        self.due == other.due && self.id == other.id
122    }
123}
124
// Equality on (due, id) is a full equivalence relation, which `Ord` requires.
impl Eq for EventInHeap {}
126
/// Delegates to the total order defined by the `Ord` impl, so the two can never disagree.
impl PartialOrd for EventInHeap {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
132
133impl Ord for EventInHeap {
134    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
135        (self.due, self.id).cmp(&(other.due, other.id)).reverse()
136    }
137}
138
/// Events that have been sent to the loop but not yet moved into the event heap,
/// together with the state `add` uses to detect sends from an unexpected thread.
struct InFlightEvents {
    /// Events received since the last drain, in the order they were added.
    events: Vec<CallbackEvent>,
    /// The TestLoop thread ID. This and the following field are used to detect unintended
    /// parallel processing.
    event_loop_thread_id: std::thread::ThreadId,
    /// Whether we're currently handling an event.
    is_handling_event: bool,
}
147
148impl InFlightEvents {
149    fn add(&mut self, event: CallbackEvent) {
150        if !self.is_handling_event && std::thread::current().id() != self.event_loop_thread_id {
151            // Another thread shall not be sending an event while we're not handling an event.
152            // If that happens, it means we have a rogue thread spawned somewhere that has not been
153            // converted to TestLoop. TestLoop tests should be single-threaded (or at least, look
154            // as if it were single-threaded). So if we catch this, panic.
155            panic!(
156                "Event was sent from the wrong thread. TestLoop tests should be single-threaded. \
157                    Check if there's any code that spawns computation on another thread such as \
158                    rayon::spawn, and convert it to AsyncComputationSpawner or FutureSpawner. \
159                    Event: {}",
160                event.description
161            );
162        }
163        self.events.push(event);
164    }
165}
166
/// The log output line that can be used to visualize the execution of a test.
/// It is only used to serialize into JSON. This is enough data to reconstruct
/// the event dependency graph, and to segment log messages.
///
/// Emitted with the `TEST_LOOP_EVENT_START` prefix just before an event is handled.
#[derive(Serialize)]
struct EventStartLogOutput {
    /// Index of the current event we're about to handle.
    current_index: usize,
    /// See `EventEndLogOutput::total_events`.
    total_events: usize,
    /// The description string of the event being handled.
    current_event: String,
    /// The current virtual time, in whole milliseconds.
    current_time_ms: u64,
}
181
/// The log output line emitted, with the `TEST_LOOP_EVENT_END` prefix, right after an
/// event has been handled. It is only used to serialize into JSON.
#[derive(Serialize)]
struct EventEndLogOutput {
    /// The total number of events we have seen so far. This is combined with
    /// `EventStartLogOutput::total_events` to determine which new events are
    /// emitted by the current event's handler.
    total_events: usize,
}
189
190impl TestLoopV2 {
191    pub fn new() -> Self {
192        let pending_events = Arc::new(Mutex::new(InFlightEvents {
193            events: Vec::new(),
194            event_loop_thread_id: std::thread::current().id(),
195            is_handling_event: false,
196        }));
197        let pending_events_clone = pending_events.clone();
198        let pending_events_sender = PendingEventsSender::new(move |callback_event| {
199            let mut pending_events = pending_events_clone.lock().unwrap();
200            pending_events.add(callback_event);
201        });
202        let shutting_down = Arc::new(AtomicBool::new(false));
203        // Needed for the log visualizer to know when the test loop starts.
204        tracing::info!(target: "test_loop", "TEST_LOOP_INIT");
205        Self {
206            data: TestLoopData::new(pending_events_sender.clone(), shutting_down.clone()),
207            events: BinaryHeap::new(),
208            pending_events,
209            pending_events_sender,
210            next_event_index: 0,
211            current_time: Duration::ZERO,
212            clock: FakeClock::default(),
213            shutting_down,
214            every_event_callback: None,
215        }
216    }
217
218    /// Returns a FutureSpawner that can be used to spawn futures into the loop.
219    pub fn future_spawner(&self) -> TestLoopFutureSpawner {
220        self.pending_events_sender.clone()
221    }
222
223    /// Returns an AsyncComputationSpawner that can be used to spawn async computation into the
224    /// loop. The `artificial_delay` allows the test to determine an artificial delay that the
225    /// computation should take, based on the name of the computation.
226    pub fn async_computation_spawner(
227        &self,
228        artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
229    ) -> TestLoopAsyncComputationSpawner {
230        TestLoopAsyncComputationSpawner::new(self.pending_events_sender.clone(), artificial_delay)
231    }
232
    /// Returns a sender that can be used anywhere to send events to the loop.
    ///
    /// The returned value is a clone of the loop's own sender, so events sent through
    /// it end up in the same pending-events list.
    pub fn sender(&self) -> PendingEventsSender {
        self.pending_events_sender.clone()
    }
237
238    /// Sends any ad-hoc event to the loop.
239    pub fn send_adhoc_event(
240        &self,
241        description: String,
242        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
243    ) {
244        self.pending_events_sender.send(format!("Adhoc({})", description), Box::new(callback));
245    }
246
247    /// Sends any ad-hoc event to the loop, after some delay.
248    pub fn send_adhoc_event_with_delay(
249        &self,
250        description: String,
251        delay: Duration,
252        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
253    ) {
254        self.pending_events_sender.send_with_delay(
255            format!("Adhoc({})", description),
256            Box::new(callback),
257            delay,
258        );
259    }
260
    /// Returns a clock that will always return the current virtual time.
    ///
    /// The clock is obtained from the loop's `FakeClock`, which the loop advances
    /// itself whenever it moves virtual time forward.
    pub fn clock(&self) -> Clock {
        self.clock.clock()
    }
265
266    pub fn register_actor<A>(
267        &mut self,
268        actor: A,
269        adapter: Option<Arc<LateBoundSender<TestLoopSender<A>>>>,
270    ) -> TestLoopSender<A>
271    where
272        A: Actor + 'static,
273    {
274        self.data.register_actor_for_index(0, actor, adapter)
275    }
276
    /// Registers `actor` with the loop's data under the given `index` and returns a
    /// sender for it.
    ///
    /// This simply forwards to `TestLoopData::register_actor_for_index`.
    /// NOTE(review): `index` presumably identifies the node/instance the actor belongs
    /// to in multi-instance tests, and `adapter` presumably gets bound to the returned
    /// sender -- confirm against `TestLoopData`.
    pub fn register_actor_for_index<A>(
        &mut self,
        index: usize,
        actor: A,
        adapter: Option<Arc<LateBoundSender<TestLoopSender<A>>>>,
    ) -> TestLoopSender<A>
    where
        A: Actor + 'static,
    {
        self.data.register_actor_for_index(index, actor, adapter)
    }
288
289    pub fn set_every_event_callback(&mut self, callback: impl FnMut(&TestLoopData) + 'static) {
290        self.every_event_callback = Some(Box::new(callback));
291    }
292
293    /// Helper to push events we have just received into the heap.
294    fn queue_received_events(&mut self) {
295        for event in self.pending_events.lock().unwrap().events.drain(..) {
296            self.events.push(EventInHeap {
297                due: self.current_time + event.delay,
298                id: self.next_event_index,
299                event,
300            });
301            self.next_event_index += 1;
302        }
303    }
304
305    /// Performs the logic to find the next event, advance to its time, and dequeue it.
306    /// Takes a decider to determine whether to advance time, handle the next event, and/or to stop.
307    fn advance_till_next_event(
308        &mut self,
309        decider: &mut impl FnMut(Option<Duration>, &mut TestLoopData) -> AdvanceDecision,
310    ) -> Option<EventInHeap> {
311        loop {
312            // New events may have been sent to the TestLoop from outside, and the previous
313            // iteration of the loop may have made new futures ready, so queue up any received
314            // events.
315            self.queue_received_events();
316
317            // Now there are two ways an event may be/become available. One is that the event is
318            // queued into the event loop at a specific time; the other is that some future is
319            // waiting on our fake clock to advance beyond a specific time. Pick the earliest.
320            let next_timestamp = {
321                let next_event_timestamp = self.events.peek().map(|event| event.due);
322                let next_future_waiter_timestamp = self
323                    .clock
324                    .first_waiter()
325                    .map(|time| time.signed_duration_since(self.clock.now() - self.current_time));
326                next_event_timestamp
327                    .map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1))
328                    .or(next_future_waiter_timestamp)
329            };
330            // If the next event is immediately available (i.e. its time is same as current time),
331            // just return that event; there's no decision to make (as we only give deciders a
332            // chance to stop processing if we would advance the clock) and no need to advance time.
333            if next_timestamp == Some(self.current_time) {
334                let event = self.events.pop().expect("Programming error in TestLoop");
335                assert_eq!(event.due, self.current_time);
336                return Some(event);
337            }
338            // If we reach this point, it means we need to advance the clock. Let the decider choose
339            // if we should do that, or if we should stop.
340            let decision = decider(next_timestamp, &mut self.data);
341            match decision {
342                AdvanceDecision::AdvanceToNextEvent => {
343                    let next_timestamp = next_timestamp.unwrap();
344                    self.clock.advance(next_timestamp - self.current_time);
345                    self.current_time = next_timestamp;
346                    // Run the loop again, because if the reason why we advance the clock to this
347                    // time is due to a possible future waiting on the clock, we may or may not get
348                    // another future queued into the TestLoop, so we just check the whole thing
349                    // again.
350                    continue;
351                }
352                AdvanceDecision::AdvanceToAndStop(target) => {
353                    self.clock.advance(target - self.current_time);
354                    self.current_time = target;
355                    return None;
356                }
357                AdvanceDecision::Stop => {
358                    return None;
359                }
360            }
361        }
362    }
363
364    /// Processes the given event, by logging a line first and then finding a handler to run it.
365    fn process_event(&mut self, event: EventInHeap) {
366        let start_json = serde_json::to_string(&EventStartLogOutput {
367            current_index: event.id,
368            total_events: self.next_event_index,
369            current_event: event.event.description,
370            current_time_ms: event.due.whole_milliseconds() as u64,
371        })
372        .unwrap();
373        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json);
374        assert_eq!(self.current_time, event.due);
375
376        if let Some(callback) = &mut self.every_event_callback {
377            callback(&self.data);
378        }
379
380        let callback = event.event.callback;
381        callback(&mut self.data);
382
383        // Push any new events into the queue. Do this before emitting the end log line,
384        // so that it contains the correct new total number of events.
385        self.queue_received_events();
386        let end_json =
387            serde_json::to_string(&EventEndLogOutput { total_events: self.next_event_index })
388                .unwrap();
389        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_END {}", end_json);
390    }
391
392    /// Runs the test loop for the given duration. This function may be called
393    /// multiple times, but further test handlers may not be registered after
394    /// the first call.
395    pub fn run_for(&mut self, duration: Duration) {
396        let deadline = self.current_time + duration;
397        while let Some(event) = self.advance_till_next_event(&mut |next_time, _| {
398            if let Some(next_time) = next_time {
399                if next_time <= deadline {
400                    return AdvanceDecision::AdvanceToNextEvent;
401                }
402            }
403            AdvanceDecision::AdvanceToAndStop(deadline)
404        }) {
405            self.process_event(event);
406        }
407    }
408
409    /// Run until the given condition is true, asserting that it happens before the maximum duration
410    /// is reached.
411    ///
412    /// To maximize logical consistency, the condition is only checked before the clock would
413    /// advance. If it returns true, execution stops before advancing the clock.
414    pub fn run_until(
415        &mut self,
416        mut condition: impl FnMut(&mut TestLoopData) -> bool,
417        maximum_duration: Duration,
418    ) {
419        let deadline = self.current_time + maximum_duration;
420        let mut decider = move |next_time, data: &mut TestLoopData| {
421            if condition(data) {
422                return AdvanceDecision::Stop;
423            }
424            if let Some(next_time) = next_time {
425                if next_time <= deadline {
426                    return AdvanceDecision::AdvanceToNextEvent;
427                }
428            }
429            panic!("run_until did not fulfill the condition within the given deadline");
430        };
431        while let Some(event) = self.advance_till_next_event(&mut decider) {
432            self.process_event(event);
433        }
434    }
435
436    pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) {
437        self.shutting_down.store(true, Ordering::Relaxed);
438        self.run_for(maximum_duration);
439        // Implicitly dropped here, which asserts that no more events are remaining.
440    }
441
    /// Runs the loop with a zero-length time budget; equivalent to
    /// `run_for(Duration::ZERO)`.
    pub fn run_instant(&mut self) {
        self.run_for(Duration::ZERO);
    }
445}
446
447impl Drop for TestLoopV2 {
448    fn drop(&mut self) {
449        self.queue_received_events();
450        if let Some(event) = self.events.pop() {
451            // Drop any references that may be held by the event callbacks. This can help
452            // with destruction of the data.
453            self.events.clear();
454            panic!(
455                "Event scheduled at {} is not handled at the end of the test: {}.
456                 Consider calling `test.shutdown_and_drain_remaining_events(...)`.",
457                event.due, event.event.description
458            );
459        }
460        // Needed for the log visualizer to know when the test loop ends.
461        tracing::info!(target: "test_loop", "TEST_LOOP_SHUTDOWN");
462    }
463}
464
/// What `advance_till_next_event` should do when no event is due at the current
/// virtual time, i.e. when making progress would require advancing the clock.
enum AdvanceDecision {
    /// Advance the clock to the next pending timestamp and keep looking for an event.
    AdvanceToNextEvent,
    /// Advance the clock to the given virtual time, then stop without returning an event.
    AdvanceToAndStop(Duration),
    /// Stop right away, leaving the clock where it is.
    Stop,
}
470
#[cfg(test)]
mod tests {
    use crate::futures::FutureSpawnerExt;
    use crate::test_loop::TestLoopV2;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use time::Duration;

    // Tests that the TestLoop correctly handles futures that sleep on the fake clock.
    #[test]
    fn test_futures() {
        let mut test_loop = TestLoopV2::new();
        let clock = test_loop.clock();
        let start_time = clock.now();

        // Counts how many of the spawned futures ran to completion.
        let finished = Arc::new(AtomicUsize::new(0));

        // First future: spawned at t=0, wakes at t=10s and t=15s.
        let clock1 = clock.clone();
        let finished1 = finished.clone();
        test_loop.future_spawner().spawn("test1", async move {
            assert_eq!(clock1.now(), start_time);
            clock1.sleep(Duration::seconds(10)).await;
            assert_eq!(clock1.now(), start_time + Duration::seconds(10));
            clock1.sleep(Duration::seconds(5)).await;
            assert_eq!(clock1.now(), start_time + Duration::seconds(15));
            finished1.fetch_add(1, Ordering::Relaxed);
        });

        // Move virtual time to t=2s so that the second future starts mid-way through
        // the first future's initial sleep.
        test_loop.run_for(Duration::seconds(2));

        // Second future: spawned at t=2s, wakes at t=5s and t=25s, interleaving with
        // the first one.
        let clock2 = clock;
        let finished2 = finished.clone();
        test_loop.future_spawner().spawn("test2", async move {
            assert_eq!(clock2.now(), start_time + Duration::seconds(2));
            clock2.sleep(Duration::seconds(3)).await;
            assert_eq!(clock2.now(), start_time + Duration::seconds(5));
            clock2.sleep(Duration::seconds(20)).await;
            assert_eq!(clock2.now(), start_time + Duration::seconds(25));
            finished2.fetch_add(1, Ordering::Relaxed);
        });
        // During these 30 virtual seconds, the TestLoop should've automatically advanced the clock
        // to wake each future as they become ready to run again. The code inside the futures
        // assert that the fake clock does indeed have the expected times.
        test_loop.run_for(Duration::seconds(30));
        assert_eq!(finished.load(Ordering::Relaxed), 2);
    }
}