near_async/test_loop/mod.rs
//! This is a framework to test async code in a way that is versatile, deterministic,
//! easy-to-setup, and easy-to-debug.
//!
//! The primary concept here is an event loop that the test framework controls. The
//! event loop acts as a central hub for all messages, including actor messages,
//! network messages, timers, etc. Business logic is only executed as a response to
//! such events.
//!
//! This brings several major benefits:
//! - Ease of setup:
//!   - There is no need to set up mock objects that implement some message sender
//!     interface; instead, the test loop provides a sender object that can be used
//!     to send messages to the event loop. For example, suppose we were to make a
//!     Client whose constructor requires a shards_manager adapter; instead of having
//!     to make a mock for the shards_manager adapter, we can simply register the
//!     shards_manager actor with the test loop and pass in its sender.
//!   - Compared to writing synchronous tests, there is no need to manually deliver
//!     network messages or handle actor messages at certain points of the test.
//!     Instead, the event loop invokes the appropriate event handlers whenever any
//!     event remains in the event loop. This ensures that no messages are ever missed.
//!   - Test setup code can be modular and reusable, because the test specification
//!     consists entirely of registering the data and actors. Rather than passing a
//!     giant callback into a giant setup(...) function to customize one part of a huge
//!     integration test, we can flexibly compose specific modules with event handlers.
//!     For example, we may add an event handler to route all ShardsManager-related
//!     network messages reliably, and at the same time another event handler to drop
//!     50% of Block network messages. Also, an event handler can be reused in any test
//!     where it is relevant (e.g. a ForwardShardsManagerRequest event handler can be
//!     used as long as the test involves ShardsManagers), regardless of the exact
//!     architecture of the test.
//!
//! - Debuggability:
//!   - Because ALL execution happens in response to events, the whole test can be
//!     cleanly segmented into the response to each event. The framework automatically
//!     outputs a log message at the beginning of each event execution, so that the log
//!     output can be loaded into a visualizer to show the exact sequence of events,
//!     their relationship, the exact contents of the event messages, and the log output
//!     during the handling of each event. This is especially useful when debugging
//!     multi-instance tests.
//!
//! - Determinism:
//!   - Many tests, especially those that involve multiple instances, are most easily
//!     written by spawning actual actors and threads. This, however, makes the tests
//!     inherently asynchronous and more prone to flakiness. The test loop instead runs
//!     every event on a single thread, in an order fully determined by due time and
//!     emission order.
//!   - The test loop framework also provides a synchronous and deterministic way to
//!     invoke timers without waiting for the actual duration. This makes tests run
//!     much faster than asynchronous tests.
//!
//! - Versatility:
//!   - A test can be constructed with any combination of components. The framework does
//!     not dictate what components should exist, or how many instances there should be.
//!     This allows for both small and focused tests, and large multi-instance tests.
//!   - Timed tests can be written to check the theoretical performance of certain tasks,
//!     such as distributing chunks to other nodes within X milliseconds provided that
//!     network messages have a 10ms delay.
//!   - The framework does not require major migrations to existing code, e.g. it is
//!     compatible with the actor framework and futures.
//!
//! A note on the order of execution of the events: events run in order of their due
//! timestamp, and all events that are due at the same timestamp are executed in FIFO
//! order. For example, if the events are emitted in the following order:
//! (A due 100ms), (B due 0ms), (C due 200ms), (D due 0ms), (E due 100ms),
//! then the actual order of execution is B, D, A, E, C.
pub mod data;
pub mod futures;
pub mod pending_events_sender;
pub mod sender;

use data::TestLoopData;
use futures::{TestLoopAsyncComputationSpawner, TestLoopFutureSpawner};
use near_time::{Clock, Duration, FakeClock};
use parking_lot::Mutex;
use pending_events_sender::{CallbackEvent, PendingEventsSender, RawPendingEventsSender};
use serde::Serialize;
use std::collections::{BinaryHeap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::panicking;
use time::ext::InstantExt;

/// Main struct for the Test Loop framework.
/// The `TestLoopData` should contain all the business logic state that is relevant
/// to the test. All events sent to the event loop are callbacks.
/// See `TestLoopData` for more details.
///
/// Events are sent to the test loop, with a possible delay, via the
/// `raw_pending_events_sender`.
pub struct TestLoopV2 {
    /// The data that is stored and accessed by the test loop.
    pub data: TestLoopData,
    /// The sender is used to send events to the event loop.
    raw_pending_events_sender: RawPendingEventsSender,
    /// The events that are yet to be handled. They are kept in a heap so that
    /// events that shall execute earlier (by our own virtual clock) are popped
    /// first.
    events: BinaryHeap<EventInHeap>,
    /// The events that will enter the events heap upon the next iteration.
    pending_events: Arc<Mutex<InFlightEvents>>,
    /// The next ID to assign to an event we receive.
    next_event_index: usize,
    /// The current virtual time.
    current_time: Duration,
    /// Fake clock that always returns the virtual time.
    clock: near_time::FakeClock,
    /// Shutdown flag. When this flag is true, delayed action runners no longer
    /// post new events to the event loop, and remaining events are skipped
    /// instead of executed.
    shutting_down: Arc<AtomicBool>,
    /// If present, a function to call to print something every time an event is
    /// handled. Intended only for debugging.
    every_event_callback: Option<Box<dyn FnMut(&TestLoopData)>>,
    /// Events whose identifier is in this set are ignored (not executed) by the
    /// test loop.
    denylisted_identifiers: HashSet<String>,
}

/// An event waiting to be executed, ordered by the due time and then by ID.
struct EventInHeap {
    event: CallbackEvent,
    due: Duration,
    id: usize,
}

impl PartialEq for EventInHeap {
    fn eq(&self, other: &Self) -> bool {
        self.due == other.due && self.id == other.id
    }
}

impl Eq for EventInHeap {}

impl PartialOrd for EventInHeap {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for EventInHeap {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `BinaryHeap` is a max-heap, so the comparison is reversed to make the event
        // with the smallest (due, id) pair pop first.
        (self.due, self.id).cmp(&(other.due, other.id)).reverse()
    }
}

struct InFlightEvents {
    events: Vec<CallbackEvent>,
    /// The TestLoop thread ID. It is used to detect unintended parallel processing:
    /// events may only be sent from this thread.
    event_loop_thread_id: std::thread::ThreadId,
}

impl InFlightEvents {
    fn new() -> Self {
        Self { events: Vec::new(), event_loop_thread_id: std::thread::current().id() }
    }

    fn add(&mut self, event: CallbackEvent) {
        if std::thread::current().id() != self.event_loop_thread_id {
            // No other thread should be sending events to the test loop. If that happens,
            // it means we have a rogue thread spawned somewhere that has not been converted
            // to TestLoop. TestLoop tests should be single-threaded (or at least, look as
            // if they were single-threaded). So if we catch this, panic.
            panic!(
                "Event was sent from the wrong thread. TestLoop tests should be single-threaded. \
                 Check if there's any code that spawns computation on another thread such as \
                 rayon::spawn, and convert it to AsyncComputationSpawner or FutureSpawner. \
                 Event: {}",
                event.description
            );
        }
        self.events.push(event);
    }
}

/// The log output line that can be used to visualize the execution of a test.
/// It is only used to serialize into JSON. This is enough data to reconstruct
/// the event dependency graph, and to segment log messages.
#[derive(Serialize)]
struct EventStartLogOutput {
    /// Index of the current event we're about to handle.
    current_index: usize,
    /// See `EventEndLogOutput::total_events`.
    total_events: usize,
    /// The identifier of the event, usually the node_id.
    identifier: String,
    /// The Debug representation of the event payload.
    current_event: String,
    /// The current virtual time.
    current_time_ms: u64,
    /// Whether this event is skipped (not executed) because its identifier has been
    /// denylisted via `remove_events_with_identifier`.
    event_ignored: bool,
}

#[derive(Serialize)]
struct EventEndLogOutput {
    /// The total number of events we have seen so far. This is combined with
    /// `EventStartLogOutput::total_events` to determine which new events are
    /// emitted by the current event's handler.
    total_events: usize,
}

impl TestLoopV2 {
    /// Creates a new test loop. Events must be sent from the thread that calls `new`;
    /// sending an event from any other thread panics.
    pub fn new() -> Self {
        let pending_events = Arc::new(Mutex::new(InFlightEvents::new()));
        let pending_events_clone = pending_events.clone();
        let raw_pending_events_sender = RawPendingEventsSender::new(move |callback_event| {
            let mut pending_events = pending_events_clone.lock();
            pending_events.add(callback_event);
        });
        let shutting_down = Arc::new(AtomicBool::new(false));
        // Needed for the log visualizer to know when the test loop starts.
        tracing::info!(target: "test_loop", "TEST_LOOP_INIT");
        Self {
            data: TestLoopData::new(raw_pending_events_sender.clone(), shutting_down.clone()),
            events: BinaryHeap::new(),
            pending_events,
            raw_pending_events_sender,
            next_event_index: 0,
            current_time: Duration::ZERO,
            clock: FakeClock::default(),
            shutting_down,
            every_event_callback: None,
            denylisted_identifiers: HashSet::new(),
        }
    }

    /// Returns a FutureSpawner that can be used to spawn futures into the loop.
    pub fn future_spawner(&self, identifier: &str) -> TestLoopFutureSpawner {
        self.raw_pending_events_sender.for_identifier(identifier)
    }

    /// Returns an AsyncComputationSpawner that can be used to spawn async computation into the
    /// loop. The `artificial_delay` allows the test to determine an artificial delay that the
    /// computation should take, based on the name of the computation.
    pub fn async_computation_spawner(
        &self,
        identifier: &str,
        artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
    ) -> TestLoopAsyncComputationSpawner {
        TestLoopAsyncComputationSpawner::new(
            self.raw_pending_events_sender.for_identifier(identifier),
            artificial_delay,
        )
    }

    /// Sends any ad-hoc event to the loop.
    pub fn send_adhoc_event(
        &self,
        description: String,
        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
    ) {
        self.send_adhoc_event_with_delay(description, Duration::ZERO, callback)
    }

    /// Sends any ad-hoc event to the loop, after some delay.
    pub fn send_adhoc_event_with_delay(
        &self,
        description: String,
        delay: Duration,
        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
    ) {
        self.raw_pending_events_sender.for_identifier("Adhoc").send_with_delay(
            description,
            Box::new(callback),
            delay,
        );
    }

    /// Filters out all events that belong to the given identifier: from now on, such events
    /// are logged as ignored and their callbacks are not executed. The use case is shutting
    /// down a node, after which we no longer want to execute any events from that node.
    pub fn remove_events_with_identifier(&mut self, identifier: &str) {
        self.denylisted_identifiers.insert(identifier.to_string());
    }

    /// Returns a clock that will always return the current virtual time.
    pub fn clock(&self) -> Clock {
        self.clock.clock()
    }

    /// Sets a callback that is invoked with the test data before each executed (not ignored)
    /// event is handled. Intended only for debugging.
    pub fn set_every_event_callback(&mut self, callback: impl FnMut(&TestLoopData) + 'static) {
        self.every_event_callback = Some(Box::new(callback));
    }

    /// Helper to push events we have just received into the heap.
    fn queue_received_events(&mut self) {
        for event in self.pending_events.lock().events.drain(..) {
            self.events.push(EventInHeap {
                due: self.current_time + event.delay,
                id: self.next_event_index,
                event,
            });
            self.next_event_index += 1;
        }
    }

    /// Performs the logic to find the next event, advance to its time, and dequeue it.
    /// Takes a decider to determine whether to advance time, handle the next event, and/or to stop.
    fn advance_till_next_event(
        &mut self,
        decider: &mut impl FnMut(Option<Duration>, &mut TestLoopData) -> AdvanceDecision,
    ) -> Option<EventInHeap> {
        loop {
            // New events may have been sent to the TestLoop from outside, and the previous
            // iteration of the loop may have made new futures ready, so queue up any received
            // events.
            self.queue_received_events();

            // Now there are two ways an event may be/become available. One is that the event is
            // queued into the event loop at a specific time; the other is that some future is
            // waiting on our fake clock to advance beyond a specific time. Pick the earliest.
            let next_timestamp = {
                let next_event_timestamp = self.events.peek().map(|event| event.due);
                let next_future_waiter_timestamp = self
                    .clock
                    .first_waiter()
                    .map(|time| time.signed_duration_since(self.clock.now() - self.current_time));
                next_event_timestamp
                    .map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1))
                    .or(next_future_waiter_timestamp)
            };
            // If the next event is immediately available (i.e. its time is the same as the
            // current time), just return that event; there's no decision to make (as we only
            // give deciders a chance to stop processing if we would advance the clock) and no
            // need to advance time.
            if next_timestamp == Some(self.current_time) {
                let event = self.events.pop().expect("Programming error in TestLoop");
                assert_eq!(event.due, self.current_time);
                return Some(event);
            }
            // If we reach this point, it means we need to advance the clock. Let the decider choose
            // if we should do that, or if we should stop.
            let decision = decider(next_timestamp, &mut self.data);
            match decision {
                AdvanceDecision::AdvanceToNextEvent => {
                    let next_timestamp = next_timestamp.unwrap();
                    self.clock.advance(next_timestamp - self.current_time);
                    self.current_time = next_timestamp;
                    // Run the loop again: if we advanced the clock because a future was waiting
                    // on it, that future may or may not queue another event into the TestLoop,
                    // so we simply check everything again.
                    continue;
                }
                AdvanceDecision::AdvanceToAndStop(target) => {
                    self.clock.advance(target - self.current_time);
                    self.current_time = target;
                    return None;
                }
                AdvanceDecision::Stop => {
                    return None;
                }
            }
        }
    }

    /// Processes the given event: logs a start line, runs the event's callback unless its
    /// identifier is denylisted, then logs an end line. Does nothing once the loop is
    /// shutting down.
    fn process_event(&mut self, event: EventInHeap) {
        if self.shutting_down.load(Ordering::Relaxed) {
            return;
        }

        let event_ignored = self.denylisted_identifiers.contains(&event.event.identifier);
        let start_json = serde_json::to_string(&EventStartLogOutput {
            current_index: event.id,
            total_events: self.next_event_index,
            identifier: event.event.identifier.clone(),
            current_event: event.event.description,
            current_time_ms: event.due.whole_milliseconds() as u64,
            event_ignored,
        })
        .unwrap();
        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json);
        assert_eq!(self.current_time, event.due);

        if !event_ignored {
            if let Some(callback) = &mut self.every_event_callback {
                callback(&self.data);
            }

            let callback = event.event.callback;
            callback(&mut self.data);
        }

        // Push any new events into the queue. Do this before emitting the end log line,
        // so that it contains the correct new total number of events.
        self.queue_received_events();
        let end_json =
            serde_json::to_string(&EventEndLogOutput { total_events: self.next_event_index })
                .unwrap();
        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_END {}", end_json);
    }

    /// Runs the test loop for the given duration. This function may be called
    /// multiple times, but further test handlers may not be registered after
    /// the first call. When it returns, the virtual clock is exactly at the
    /// deadline (the starting time plus `duration`), even if no event was due then.
    pub fn run_for(&mut self, duration: Duration) {
        let deadline = self.current_time + duration;
        while let Some(event) = self.advance_till_next_event(&mut |next_time, _| {
            if let Some(next_time) = next_time {
                if next_time <= deadline {
                    return AdvanceDecision::AdvanceToNextEvent;
                }
            }
            AdvanceDecision::AdvanceToAndStop(deadline)
        }) {
            self.process_event(event);
        }
    }

    /// Run until the given condition is true, asserting that it happens before the maximum duration
    /// is reached.
    ///
    /// To maximize logical consistency, the condition is only checked before the clock would
    /// advance. If it returns true, execution stops before advancing the clock.
    pub fn run_until(
        &mut self,
        mut condition: impl FnMut(&mut TestLoopData) -> bool,
        maximum_duration: Duration,
    ) {
        let deadline = self.current_time + maximum_duration;
        let mut decider = move |next_time, data: &mut TestLoopData| {
            if condition(data) {
                return AdvanceDecision::Stop;
            }
            if let Some(next_time) = next_time {
                if next_time <= deadline {
                    return AdvanceDecision::AdvanceToNextEvent;
                }
            }
            panic!("run_until did not fulfill the condition within the given deadline");
        };
        while let Some(event) = self.advance_till_next_event(&mut decider) {
            self.process_event(event);
        }
    }

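    /// Sets the shutdown flag and then runs the loop for `maximum_duration`, so that remaining
    /// events are drained without executing their callbacks. The test loop is consumed; dropping
    /// it panics if any event is still left in the queue.
    pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) {
        self.shutting_down.store(true, Ordering::Relaxed);
        self.run_for(maximum_duration);
        // Implicitly dropped here, which asserts that no more events are remaining.
    }

    /// Processes all events that are due at the current virtual time, including those they emit
    /// with zero delay, without advancing the clock.
    pub fn run_instant(&mut self) {
        self.run_for(Duration::ZERO);
    }
}
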
impl Drop for TestLoopV2 {
    fn drop(&mut self) {
        self.queue_received_events();
        if let Some(event) = self.events.pop() {
            // Drop any references that may be held by the event callbacks. This can help
            // with destruction of the data.
            self.events.clear();
            if !panicking() {
                panic!(
                    "Event scheduled at {} is not handled at the end of the test: {}.
                     Consider calling `test.shutdown_and_drain_remaining_events(...)`.",
                    event.due, event.event.description
                );
            }
        }
        // Needed for the log visualizer to know when the test loop ends.
        tracing::info!(target: "test_loop", "TEST_LOOP_SHUTDOWN");
    }
}

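/// What the test loop should do when the next event (or future waiter) is not due at the
/// current virtual time, so handling it would require advancing the clock.
enum AdvanceDecision {
    /// Advance the clock to the next due timestamp and keep processing.
    AdvanceToNextEvent,
    /// Advance the clock to the given time and stop processing.
    AdvanceToAndStop(Duration),
    /// Stop processing without advancing the clock.
    Stop,
}
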
#[cfg(test)]
mod tests {
    use crate::futures::FutureSpawnerExt;
    use crate::test_loop::TestLoopV2;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use time::Duration;

    // Tests that the TestLoop correctly handles futures that sleep on the fake clock.
    #[test]
    fn test_futures() {
        let mut test_loop = TestLoopV2::new();
        let clock = test_loop.clock();
        let start_time = clock.now();

        let finished = Arc::new(AtomicUsize::new(0));

        let clock1 = clock.clone();
        let finished1 = finished.clone();
        test_loop.future_spawner("adhoc future spawner").spawn("test1", async move {
            assert_eq!(clock1.now(), start_time);
            clock1.sleep(Duration::seconds(10)).await;
            assert_eq!(clock1.now(), start_time + Duration::seconds(10));
            clock1.sleep(Duration::seconds(5)).await;
            assert_eq!(clock1.now(), start_time + Duration::seconds(15));
            finished1.fetch_add(1, Ordering::Relaxed);
        });

        test_loop.run_for(Duration::seconds(2));

        let clock2 = clock;
        let finished2 = finished.clone();
        test_loop.future_spawner("adhoc future spawner").spawn("test2", async move {
            assert_eq!(clock2.now(), start_time + Duration::seconds(2));
            clock2.sleep(Duration::seconds(3)).await;
            assert_eq!(clock2.now(), start_time + Duration::seconds(5));
            clock2.sleep(Duration::seconds(20)).await;
            assert_eq!(clock2.now(), start_time + Duration::seconds(25));
            finished2.fetch_add(1, Ordering::Relaxed);
        });
        // During these 30 virtual seconds, the TestLoop should've automatically advanced the clock
        // to wake each future as it becomes ready to run again. The code inside the futures
        // asserts that the fake clock does indeed have the expected times.
        test_loop.run_for(Duration::seconds(30));
        assert_eq!(finished.load(Ordering::Relaxed), 2);
    }
}
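The ordering rule from the module documentation, and the way `run_for` jumps the virtual clock from one due event to the next, can be illustrated with a small std-only sketch. It is not the crate's code: `SketchEvent`, `SketchLoop`, its `send_with_delay` method and the `u64` millisecond clock are hypothetical stand-ins, and events are plain labels rather than `CallbackEvent` callbacks. It reuses the trick from `EventInHeap`: comparing `(due, id)` and reversing the result turns Rust's max-heap `BinaryHeap` into an earliest-first queue with FIFO tie-breaking.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// Hypothetical, simplified event: a label instead of a callback, a due time in
/// virtual milliseconds, and an insertion `id` that breaks ties at equal due times.
struct SketchEvent {
    label: &'static str,
    due_ms: u64,
    id: usize,
}

impl PartialEq for SketchEvent {
    fn eq(&self, other: &Self) -> bool {
        self.due_ms == other.due_ms && self.id == other.id
    }
}

impl Eq for SketchEvent {}

impl PartialOrd for SketchEvent {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SketchEvent {
    // `BinaryHeap` is a max-heap, so the (due, id) comparison is reversed to make
    // the earliest event, and among equal due times the first one sent, pop first.
    fn cmp(&self, other: &Self) -> Ordering {
        (self.due_ms, self.id).cmp(&(other.due_ms, other.id)).reverse()
    }
}

/// Hypothetical miniature event loop with a purely virtual clock.
struct SketchLoop {
    events: BinaryHeap<SketchEvent>,
    next_id: usize,
    current_ms: u64,
}

impl SketchLoop {
    fn new() -> Self {
        Self { events: BinaryHeap::new(), next_id: 0, current_ms: 0 }
    }

    /// Schedules an event `delay_ms` after the current virtual time.
    fn send_with_delay(&mut self, label: &'static str, delay_ms: u64) {
        let due_ms = self.current_ms + delay_ms;
        self.events.push(SketchEvent { label, due_ms, id: self.next_id });
        self.next_id += 1;
    }

    /// Mimics the `run_for` semantics: handles every event due at or before the
    /// deadline, jumping the clock straight to each due time, and then leaves the
    /// clock exactly at the deadline. Returns the labels in execution order.
    fn run_for(&mut self, duration_ms: u64) -> Vec<&'static str> {
        let deadline = self.current_ms + duration_ms;
        let mut executed = Vec::new();
        while self.events.peek().map_or(false, |event| event.due_ms <= deadline) {
            let event = self.events.pop().expect("peeked event exists");
            self.current_ms = event.due_ms; // no real waiting happens here
            executed.push(event.label);
        }
        self.current_ms = deadline;
        executed
    }
}
```

Sending the five events from the module documentation (A at 100 ms, B at 0 ms, C at 200 ms, D at 0 ms, E at 100 ms) and calling `run_for(150)` returns `["B", "D", "A", "E"]` with the clock left at 150 ms; a following `run_for(100)` returns `["C"]`. The sketch deliberately leaves out what makes the real loop more involved: futures waiting on the fake clock, callbacks that emit new events mid-run (the real loop re-queues them before choosing the next event), denylisted identifiers, and the shutdown flag.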