near_async/test_loop/
mod.rs

1//! This is a framework to test async code in a way that is versatile, deterministic,
2//! easy-to-setup, and easy-to-debug.
3//!
4//! The primary concept here is an event loop that the test framework controls. The
5//! event loop acts as a central hub for all messages, including actor messages,
6//! network messages, timers, etc. Business logic is only executed as a response to
7//! such events.
8//!
9//! This brings several major benefits:
10//!  - Ease of setup:
11//!     - There is no need to set up mock objects that implement some
12//!       message sender interface, instead, the test loop provides a sender object that
13//!       can be used to send messages to the event loop. For example, suppose we were
14//!       to make a Client whose constructor requires a shards_manager adapter; instead
15//!       of having to make a mock for the shards_manager adapter, we can simply register
16//!       the shards_manager actor with testloop and pass in its sender.
17//!     - Compared to writing synchronous tests, there is no need to manually deliver
18//!       network messages or handle actor messages at certain points of the test. Instead,
19//!       the event loop will invoke the appropriate event handlers whenever there is any
20//!       event remaining in the event loop. This ensures that no messages are ever missed.
21//!     - Test setup code can be modular and reusable, because the test specification
22//!       consists entirely of registering the data and actors. Rather than passing a giant
23//!       callback into a giant setup(...) function to customize one part of a huge
24//!       integration test, we can flexibly compose specific modules with event handlers.
25//!       For example, we may add an event handler to route all ShardsManager-related network
26//!       messages reliably, and at the same time another event handler to drop 50% of Block
27//!       network messages. Also, we can use an event handler as long as it is relevant for a
28//!       test (i.e. a ForwardShardsManagerRequest event handler can be used as long as the
29//!       test involves ShardsManagers), regardless of the exact architecture of the test.
30//!
31//!  - Debuggability:
//!     - Because ALL execution is in response to events, the whole test can be cleanly
33//!       segmented into the response to each event. The framework automatically outputs
34//!       a log message at the beginning of each event execution, so that the log output
35//!       can be loaded into a visualizer to show the exact sequence of events, their
36//!       relationship, the exact contents of the event messages, and the log output
37//!       during the handling of each event. This is especially useful when debugging
38//!       multi-instance tests.
39//!
40//!  - Determinism:
41//!     - Many tests, especially those that involve multiple instances, are most easily
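//!       written by spawning actual actors and threads. This however makes the tests
//!       inherently asynchronous and more prone to flakiness. The test loop instead runs
//!       every event on a single thread in a deterministic order (see the note on ordering
//!       below).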
44//!     - The test loop framework also provides a synchronous and deterministic way to
45//!       invoke timers without waiting for the actual duration. This makes tests run
46//!       much faster than asynchronous tests.
47//!
48//!  - Versatility:
49//!     - A test can be constructed with any combination of components. The framework does
50//!       not dictate what components should exist, or how many instances there should be.
51//!       This allows for both small and focused tests, and large multi-instance tests.
52//!     - Timed tests can be written to check the theoretical performance of certain tasks,
53//!       such as distributing chunks to other nodes within X milliseconds provided that
54//!       network messages have a 10ms delay.
55//!     - The framework does not require major migrations to existing code, e.g. it is
56//!       compatible with the actor framework and futures.
57//!
58//! A note on the order of execution of the events: all events that are due at the same
59//! timestamp are executed in FIFO order. For example, if the events are emitted in the
60//! following order: (A due 100ms), (B due 0ms), (C due 200ms), (D due 0ms), (E due 100ms)
61//! then the actual order of execution is B, D, A, E, C.
62pub mod data;
63pub mod futures;
64pub mod pending_events_sender;
65pub mod sender;
66
67use data::TestLoopData;
68use futures::{TestLoopAsyncComputationSpawner, TestLoopFutureSpawner};
69use near_time::{Clock, Duration, FakeClock};
70use parking_lot::Mutex;
71use pending_events_sender::{CallbackEvent, PendingEventsSender, RawPendingEventsSender};
72use serde::Serialize;
73use std::collections::{BinaryHeap, HashSet};
74use std::sync::Arc;
75use std::sync::atomic::{AtomicBool, Ordering};
76use std::thread::panicking;
77use time::ext::InstantExt;
78
79/// Main struct for the Test Loop framework.
80/// The `TestLoopData` should contain all the business logic state that is relevant
81/// to the test. All possible `Event` that are sent to the event loop are callbacks.
82/// See TestLoopData for mode details.
83///
84/// Events are sent to the testloop, with a possible delay, via the pending_events_sender.
85pub struct TestLoopV2 {
86    /// The data that is stored and accessed by the test loop.
87    pub data: TestLoopData,
88    /// The sender is used to send events to the event loop.
89    raw_pending_events_sender: RawPendingEventsSender,
90    /// The events that are yet to be handled. They are kept in a heap so that
91    /// events that shall execute earlier (by our own virtual clock) are popped
92    /// first.
93    events: BinaryHeap<EventInHeap>,
94    /// The events that will enter the events heap upon the next iteration.
95    pending_events: Arc<Mutex<InFlightEvents>>,
96    /// The next ID to assign to an event we receive.
97    next_event_index: usize,
98    /// The current virtual time.
99    current_time: Duration,
100    /// Fake clock that always returns the virtual time.
101    clock: near_time::FakeClock,
102    /// Shutdown flag. When this flag is true, delayed action runners will no
103    /// longer post any new events to the event loop.
104    shutting_down: Arc<AtomicBool>,
105    /// If present, a function to call to print something every time an event is
106    /// handled. Intended only for debugging.
107    every_event_callback: Option<Box<dyn FnMut(&TestLoopData)>>,
108    /// All events with this identifier are ignored in testloop execution environment.
109    denylisted_identifiers: HashSet<String>,
110}
111
112/// An event waiting to be executed, ordered by the due time and then by ID.
113struct EventInHeap {
114    event: CallbackEvent,
115    due: Duration,
116    id: usize,
117}
118
119impl PartialEq for EventInHeap {
120    fn eq(&self, other: &Self) -> bool {
121        self.due == other.due && self.id == other.id
122    }
123}
124
// `eq` compares only the `(due, id)` pair, which is a total equivalence, so `Eq` holds.
impl Eq for EventInHeap {}
126
127impl PartialOrd for EventInHeap {
128    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
129        Some(self.cmp(other))
130    }
131}
132
133impl Ord for EventInHeap {
134    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
135        (self.due, self.id).cmp(&(other.due, other.id)).reverse()
136    }
137}
138
139struct InFlightEvents {
140    events: Vec<CallbackEvent>,
141    /// The TestLoop thread ID. This and the following field are used to detect unintended
142    /// parallel processing.
143    event_loop_thread_id: std::thread::ThreadId,
144}
145
146impl InFlightEvents {
147    fn new() -> Self {
148        Self { events: Vec::new(), event_loop_thread_id: std::thread::current().id() }
149    }
150
151    fn add(&mut self, event: CallbackEvent) {
152        if std::thread::current().id() != self.event_loop_thread_id {
153            // Another thread shall not be sending an event while we're not handling an event.
154            // If that happens, it means we have a rogue thread spawned somewhere that has not been
155            // converted to TestLoop. TestLoop tests should be single-threaded (or at least, look
156            // as if it were single-threaded). So if we catch this, panic.
157            panic!(
158                "Event was sent from the wrong thread. TestLoop tests should be single-threaded. \
159                    Check if there's any code that spawns computation on another thread such as \
160                    rayon::spawn, and convert it to AsyncComputationSpawner or FutureSpawner. \
161                    Event: {}",
162                event.description
163            );
164        }
165        self.events.push(event);
166    }
167}
168
169/// The log output line that can be used to visualize the execution of a test.
170/// It is only used to serialize into JSON. This is enough data to reconstruct
171/// the event dependency graph, and to segment log messages.
172#[derive(Serialize)]
173struct EventStartLogOutput {
174    /// Index of the current event we're about to handle.
175    current_index: usize,
176    /// See `EventEndLogOutput::total_events`.
177    total_events: usize,
178    /// The identifier of the event, usually the node_id.
179    identifier: String,
180    /// The Debug representation of the event payload.
181    current_event: String,
182    /// The current virtual time.
183    current_time_ms: u64,
184    /// Whether this event is executed or not
185    event_ignored: bool,
186}
187
/// The log output line emitted at the end of `process_event`, after the event's callback ran.
/// It is also emitted for denylisted (ignored) events, whose callback is skipped.
/// It is serialized into JSON, so field names are part of the output format.
#[derive(Serialize)]
struct EventEndLogOutput {
    /// The total number of events we have seen so far. This is combined with
    /// `EventStartLogOutput::total_events` to determine which new events are
    /// emitted by the current event's handler.
    total_events: usize,
}
195
196impl TestLoopV2 {
197    pub fn new() -> Self {
198        let pending_events = Arc::new(Mutex::new(InFlightEvents::new()));
199        let pending_events_clone = pending_events.clone();
200        let raw_pending_events_sender = RawPendingEventsSender::new(move |callback_event| {
201            let mut pending_events = pending_events_clone.lock();
202            pending_events.add(callback_event);
203        });
204        let shutting_down = Arc::new(AtomicBool::new(false));
205        // Needed for the log visualizer to know when the test loop starts.
206        tracing::info!(target: "test_loop", "TEST_LOOP_INIT");
207        Self {
208            data: TestLoopData::new(raw_pending_events_sender.clone(), shutting_down.clone()),
209            events: BinaryHeap::new(),
210            pending_events,
211            raw_pending_events_sender,
212            next_event_index: 0,
213            current_time: Duration::ZERO,
214            clock: FakeClock::default(),
215            shutting_down,
216            every_event_callback: None,
217            denylisted_identifiers: HashSet::new(),
218        }
219    }
220
221    /// Returns a FutureSpawner that can be used to spawn futures into the loop.
222    pub fn future_spawner(&self, identifier: &str) -> TestLoopFutureSpawner {
223        self.raw_pending_events_sender.for_identifier(identifier)
224    }
225
226    /// Returns an AsyncComputationSpawner that can be used to spawn async computation into the
227    /// loop. The `artificial_delay` allows the test to determine an artificial delay that the
228    /// computation should take, based on the name of the computation.
229    pub fn async_computation_spawner(
230        &self,
231        identifier: &str,
232        artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
233    ) -> TestLoopAsyncComputationSpawner {
234        TestLoopAsyncComputationSpawner::new(
235            self.raw_pending_events_sender.for_identifier(identifier),
236            artificial_delay,
237        )
238    }
239
240    /// Sends any ad-hoc event to the loop.
241    pub fn send_adhoc_event(
242        &self,
243        description: String,
244        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
245    ) {
246        self.send_adhoc_event_with_delay(description, Duration::ZERO, callback)
247    }
248
249    /// Sends any ad-hoc event to the loop, after some delay.
250    pub fn send_adhoc_event_with_delay(
251        &self,
252        description: String,
253        delay: Duration,
254        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
255    ) {
256        self.raw_pending_events_sender.for_identifier("Adhoc").send_with_delay(
257            description,
258            Box::new(callback),
259            delay,
260        );
261    }
262
263    /// This function is used to filter out all events that belong to a certain identifier.
264    /// The use case is while shutting down a node, we would like to not execute any more events from that node.
265    pub fn remove_events_with_identifier(&mut self, identifier: &str) {
266        self.denylisted_identifiers.insert(identifier.to_string());
267    }
268
269    /// Returns a clock that will always return the current virtual time.
270    pub fn clock(&self) -> Clock {
271        self.clock.clock()
272    }
273
274    pub fn set_every_event_callback(&mut self, callback: impl FnMut(&TestLoopData) + 'static) {
275        self.every_event_callback = Some(Box::new(callback));
276    }
277
278    /// Helper to push events we have just received into the heap.
279    fn queue_received_events(&mut self) {
280        for event in self.pending_events.lock().events.drain(..) {
281            self.events.push(EventInHeap {
282                due: self.current_time + event.delay,
283                id: self.next_event_index,
284                event,
285            });
286            self.next_event_index += 1;
287        }
288    }
289
290    /// Performs the logic to find the next event, advance to its time, and dequeue it.
291    /// Takes a decider to determine whether to advance time, handle the next event, and/or to stop.
292    fn advance_till_next_event(
293        &mut self,
294        decider: &mut impl FnMut(Option<Duration>, &mut TestLoopData) -> AdvanceDecision,
295    ) -> Option<EventInHeap> {
296        loop {
297            // New events may have been sent to the TestLoop from outside, and the previous
298            // iteration of the loop may have made new futures ready, so queue up any received
299            // events.
300            self.queue_received_events();
301
302            // Now there are two ways an event may be/become available. One is that the event is
303            // queued into the event loop at a specific time; the other is that some future is
304            // waiting on our fake clock to advance beyond a specific time. Pick the earliest.
305            let next_timestamp = {
306                let next_event_timestamp = self.events.peek().map(|event| event.due);
307                let next_future_waiter_timestamp = self
308                    .clock
309                    .first_waiter()
310                    .map(|time| time.signed_duration_since(self.clock.now() - self.current_time));
311                next_event_timestamp
312                    .map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1))
313                    .or(next_future_waiter_timestamp)
314            };
315            // If the next event is immediately available (i.e. its time is same as current time),
316            // just return that event; there's no decision to make (as we only give deciders a
317            // chance to stop processing if we would advance the clock) and no need to advance time.
318            if next_timestamp == Some(self.current_time) {
319                let event = self.events.pop().expect("Programming error in TestLoop");
320                assert_eq!(event.due, self.current_time);
321                return Some(event);
322            }
323            // If we reach this point, it means we need to advance the clock. Let the decider choose
324            // if we should do that, or if we should stop.
325            let decision = decider(next_timestamp, &mut self.data);
326            match decision {
327                AdvanceDecision::AdvanceToNextEvent => {
328                    let next_timestamp = next_timestamp.unwrap();
329                    self.clock.advance(next_timestamp - self.current_time);
330                    self.current_time = next_timestamp;
331                    // Run the loop again, because if the reason why we advance the clock to this
332                    // time is due to a possible future waiting on the clock, we may or may not get
333                    // another future queued into the TestLoop, so we just check the whole thing
334                    // again.
335                    continue;
336                }
337                AdvanceDecision::AdvanceToAndStop(target) => {
338                    self.clock.advance(target - self.current_time);
339                    self.current_time = target;
340                    return None;
341                }
342                AdvanceDecision::Stop => {
343                    return None;
344                }
345            }
346        }
347    }
348
349    /// Processes the given event, by logging a line first and then finding a handler to run it.
350    fn process_event(&mut self, event: EventInHeap) {
351        if self.shutting_down.load(Ordering::Relaxed) {
352            return;
353        }
354
355        let event_ignored = self.denylisted_identifiers.contains(&event.event.identifier);
356        let start_json = serde_json::to_string(&EventStartLogOutput {
357            current_index: event.id,
358            total_events: self.next_event_index,
359            identifier: event.event.identifier.clone(),
360            current_event: event.event.description,
361            current_time_ms: event.due.whole_milliseconds() as u64,
362            event_ignored,
363        })
364        .unwrap();
365        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json);
366        assert_eq!(self.current_time, event.due);
367
368        if !event_ignored {
369            if let Some(callback) = &mut self.every_event_callback {
370                callback(&self.data);
371            }
372
373            let callback = event.event.callback;
374            callback(&mut self.data);
375        }
376
377        // Push any new events into the queue. Do this before emitting the end log line,
378        // so that it contains the correct new total number of events.
379        self.queue_received_events();
380        let end_json =
381            serde_json::to_string(&EventEndLogOutput { total_events: self.next_event_index })
382                .unwrap();
383        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_END {}", end_json);
384    }
385
386    /// Runs the test loop for the given duration. This function may be called
387    /// multiple times, but further test handlers may not be registered after
388    /// the first call.
389    pub fn run_for(&mut self, duration: Duration) {
390        let deadline = self.current_time + duration;
391        while let Some(event) = self.advance_till_next_event(&mut |next_time, _| {
392            if let Some(next_time) = next_time {
393                if next_time <= deadline {
394                    return AdvanceDecision::AdvanceToNextEvent;
395                }
396            }
397            AdvanceDecision::AdvanceToAndStop(deadline)
398        }) {
399            self.process_event(event);
400        }
401    }
402
403    /// Run until the given condition is true, asserting that it happens before the maximum duration
404    /// is reached.
405    ///
406    /// To maximize logical consistency, the condition is only checked before the clock would
407    /// advance. If it returns true, execution stops before advancing the clock.
408    pub fn run_until(
409        &mut self,
410        mut condition: impl FnMut(&mut TestLoopData) -> bool,
411        maximum_duration: Duration,
412    ) {
413        let deadline = self.current_time + maximum_duration;
414        let mut decider = move |next_time, data: &mut TestLoopData| {
415            if condition(data) {
416                return AdvanceDecision::Stop;
417            }
418            if let Some(next_time) = next_time {
419                if next_time <= deadline {
420                    return AdvanceDecision::AdvanceToNextEvent;
421                }
422            }
423            panic!("run_until did not fulfill the condition within the given deadline");
424        };
425        while let Some(event) = self.advance_till_next_event(&mut decider) {
426            self.process_event(event);
427        }
428    }
429
430    pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) {
431        self.shutting_down.store(true, Ordering::Relaxed);
432        self.run_for(maximum_duration);
433        // Implicitly dropped here, which asserts that no more events are remaining.
434    }
435
436    pub fn run_instant(&mut self) {
437        self.run_for(Duration::ZERO);
438    }
439}
440
441impl Drop for TestLoopV2 {
442    fn drop(&mut self) {
443        self.queue_received_events();
444        if let Some(event) = self.events.pop() {
445            // Drop any references that may be held by the event callbacks. This can help
446            // with destruction of the data.
447            self.events.clear();
448            if !panicking() {
449                panic!(
450                    "Event scheduled at {} is not handled at the end of the test: {}.
451                     Consider calling `test.shutdown_and_drain_remaining_events(...)`.",
452                    event.due, event.event.description
453                );
454            }
455        }
456        // Needed for the log visualizer to know when the test loop ends.
457        tracing::info!(target: "test_loop", "TEST_LOOP_SHUTDOWN");
458    }
459}
460
/// What `TestLoopV2::advance_till_next_event` should do when nothing is due at the current
/// virtual time.
///
/// Returned by the decider closures used by `run_for` and `run_until`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdvanceDecision {
    /// Advance virtual time to the next event or fake-clock waiter and keep going. Only valid
    /// when a next timestamp exists.
    AdvanceToNextEvent,
    /// Advance virtual time to exactly this target (measured from the loop's start) and stop.
    AdvanceToAndStop(Duration),
    /// Stop without advancing virtual time.
    Stop,
}
466
#[cfg(test)]
mod tests {
    use crate::futures::FutureSpawnerExt;
    use crate::test_loop::TestLoopV2;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use time::Duration;

    /// Checks that futures sleeping on the TestLoop's fake clock resume at exactly the virtual
    /// times they asked for. One future is spawned before the loop runs; the other is spawned
    /// after virtual time has already moved on.
    #[test]
    fn test_futures() {
        let mut test_loop = TestLoopV2::new();
        let clock = test_loop.clock();
        let t0 = clock.now();
        let done_count = Arc::new(AtomicUsize::new(0));

        // Spawned at t=0s; resumes at t=10s and again at t=15s.
        let first_clock = clock.clone();
        let first_done = Arc::clone(&done_count);
        let first_future = async move {
            assert_eq!(first_clock.now(), t0);
            first_clock.sleep(Duration::seconds(10)).await;
            assert_eq!(first_clock.now(), t0 + Duration::seconds(10));
            first_clock.sleep(Duration::seconds(5)).await;
            assert_eq!(first_clock.now(), t0 + Duration::seconds(15));
            first_done.fetch_add(1, Ordering::Relaxed);
        };
        test_loop.future_spawner("adhoc future spawner").spawn("test1", first_future);

        test_loop.run_for(Duration::seconds(2));

        // Spawned at t=2s; resumes at t=5s and again at t=25s.
        let second_done = Arc::clone(&done_count);
        let second_future = async move {
            assert_eq!(clock.now(), t0 + Duration::seconds(2));
            clock.sleep(Duration::seconds(3)).await;
            assert_eq!(clock.now(), t0 + Duration::seconds(5));
            clock.sleep(Duration::seconds(20)).await;
            assert_eq!(clock.now(), t0 + Duration::seconds(25));
            second_done.fetch_add(1, Ordering::Relaxed);
        };
        test_loop.future_spawner("adhoc future spawner").spawn("test2", second_future);

        // Across the next 30 virtual seconds, the loop has to move the fake clock forward by
        // itself to every wake-up time. The assertions inside both futures check the times
        // they observe.
        test_loop.run_for(Duration::seconds(30));
        assert_eq!(done_count.load(Ordering::Relaxed), 2);
    }
}