near_async/test_loop.rs
//! This is a framework to test async code in a way that is versatile, deterministic,
//! easy-to-setup, and easy-to-debug.
//!
//! The primary concept here is an event loop that the test framework controls. The
//! event loop acts as a central hub for all messages, including Actix messages,
//! network messages, timers, etc. Business logic is only executed as a response to
//! such events.
//!
//! This brings several major benefits:
//!  - Ease of setup:
//!    - There is no need to set up mock objects that implement some
//!      message sender interface; instead, the test loop provides a sender object that
//!      can be used to send messages to the event loop. For example, suppose we were
//!      to make a Client whose constructor requires a shards_manager adapter; instead
//!      of having to make a mock for the shards_manager adapter, we can simply register
//!      the shards_manager actor with testloop and pass in its sender.
//!    - Compared to writing synchronous tests, there is no need to manually deliver
//!      network messages or handle actix messages at certain points of the test. Instead,
//!      the event loop will invoke the appropriate event handlers whenever there is any
//!      event remaining in the event loop. This ensures that no messages are ever missed.
//!    - Test setup code can be modular and reusable, because the test specification
//!      consists entirely of registering the data and actors. Rather than passing a giant
//!      callback into a giant setup(...) function to customize one part of a huge
//!      integration test, we can flexibly compose specific modules with event handlers.
//!      For example, we may add an event handler to route all ShardsManager-related network
//!      messages reliably, and at the same time another event handler to drop 50% of Block
//!      network messages. Also, we can use an event handler as long as it is relevant for a
//!      test (e.g. a ForwardShardsManagerRequest event handler can be used as long as the
//!      test involves ShardsManagers), regardless of the exact architecture of the test.
//!
//!  - Debuggability:
//!    - Because ALL execution is in response to events, the whole test can be cleanly
//!      segmented into the response to each event. The framework automatically outputs
//!      a log message at the beginning of each event execution, so that the log output
//!      can be loaded into a visualizer to show the exact sequence of events, their
//!      relationship, the exact contents of the event messages, and the log output
//!      during the handling of each event. This is especially useful when debugging
//!      multi-instance tests.
//!
//!  - Determinism:
//!    - Many tests, especially those that involve multiple instances, are most easily
//!      written by spawning actual actors and threads. This, however, makes the tests
//!      inherently asynchronous and more prone to flakiness. The test loop instead runs
//!      handlers one event at a time, in a well-defined order.
//!    - The test loop framework also provides a synchronous and deterministic way to
//!      invoke timers without waiting for the actual duration. This makes tests run
//!      much faster than asynchronous tests.
//!
//!  - Versatility:
//!    - A test can be constructed with any combination of components. The framework does
//!      not dictate what components should exist, or how many instances there should be.
//!      This allows for both small and focused tests, and large multi-instance tests.
//!    - Timed tests can be written to check the theoretical performance of certain tasks,
//!      such as distributing chunks to other nodes within X milliseconds provided that
//!      network messages have a 10ms delay.
//!    - The framework does not require major migrations to existing code, e.g. it is
//!      compatible with the Actix framework and futures.
//!
//! A note on the order of execution of the events: events are executed in order of their
//! due timestamp, and all events that are due at the same timestamp are executed in FIFO
//! order. For example, if the events are emitted in the following order:
//! (A due 100ms), (B due 0ms), (C due 200ms), (D due 0ms), (E due 100ms)
//! then the actual order of execution is B, D, A, E, C.
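/*!
The ordering rule above can be reproduced with nothing but the standard library. The following
is a minimal sketch, not the framework's implementation: `execution_order` is a hypothetical
helper that keys each event by `(due_ms, emission_index)` in a min-heap, which is the same idea
`EventInHeap` expresses with its reversed comparison.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: returns event names in the order a loop that sorts by
/// (due time, emission index) would execute them.
fn execution_order(emitted: &[(&'static str, u64)]) -> Vec<&'static str> {
    let mut heap = BinaryHeap::new();
    for (index, (name, due_ms)) in emitted.iter().enumerate() {
        // `Reverse` turns the max-heap into a min-heap; the index breaks ties in FIFO order.
        heap.push(Reverse((*due_ms, index, *name)));
    }
    let mut order = Vec::new();
    while let Some(Reverse((_, _, name))) = heap.pop() {
        order.push(name);
    }
    order
}
```

Calling `execution_order(&[("A", 100), ("B", 0), ("C", 200), ("D", 0), ("E", 100)])` yields
`["B", "D", "A", "E", "C"]`, matching the order stated above.
*/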
pub mod data;
pub mod futures;
pub mod pending_events_sender;
pub mod sender;

use data::TestLoopData;
use futures::{TestLoopAsyncComputationSpawner, TestLoopFutureSpawner};
use near_time::{Clock, Duration, FakeClock};
use pending_events_sender::{CallbackEvent, PendingEventsSender};
use sender::TestLoopSender;
use serde::Serialize;
use std::collections::BinaryHeap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use time::ext::InstantExt;

use crate::messaging::{Actor, LateBoundSender};

/// Main struct for the Test Loop framework.
/// The `TestLoopData` should contain all the business logic state that is relevant
/// to the test. All `Event`s that are sent to the event loop are callbacks.
/// See `TestLoopData` for more details.
///
/// Events are sent to the testloop, with a possible delay, via the `pending_events_sender`.
pub struct TestLoopV2 {
    /// The data that is stored and accessed by the test loop.
    pub data: TestLoopData,
    /// The sender is used to send events to the event loop.
    pending_events_sender: PendingEventsSender,
    /// The events that are yet to be handled. They are kept in a heap so that
    /// events that shall execute earlier (by our own virtual clock) are popped
    /// first.
    events: BinaryHeap<EventInHeap>,
    /// The events that will enter the events heap upon the next iteration.
    pending_events: Arc<Mutex<InFlightEvents>>,
    /// The next ID to assign to an event we receive.
    next_event_index: usize,
    /// The current virtual time.
    pub current_time: Duration,
    /// Fake clock that always returns the virtual time.
    clock: near_time::FakeClock,
    /// Shutdown flag. When this flag is true, delayed action runners will no
    /// longer post any new events to the event loop.
    shutting_down: Arc<AtomicBool>,
    /// If present, a function to call to print something every time an event is
    /// handled. Intended only for debugging.
    every_event_callback: Option<Box<dyn FnMut(&TestLoopData)>>,
}

/// An event waiting to be executed, ordered by the due time and then by ID.
struct EventInHeap {
    event: CallbackEvent,
    due: Duration,
    id: usize,
}

impl PartialEq for EventInHeap {
    fn eq(&self, other: &Self) -> bool {
        self.due == other.due && self.id == other.id
    }
}

impl Eq for EventInHeap {}

impl PartialOrd for EventInHeap {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for EventInHeap {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `BinaryHeap` is a max-heap, so reverse the comparison to pop the earliest event first.
        (self.due, self.id).cmp(&(other.due, other.id)).reverse()
    }
}

struct InFlightEvents {
    events: Vec<CallbackEvent>,
    /// The TestLoop thread ID. This and the following field are used to detect unintended
    /// parallel processing.
    event_loop_thread_id: std::thread::ThreadId,
    /// Whether we're currently handling an event.
    is_handling_event: bool,
}

impl InFlightEvents {
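    /**
The thread check performed by `add` can be illustrated in isolation. This is a std-only sketch
under stated assumptions: `LoopGuard`, `check_sender` and `other_thread_id` are hypothetical
names, and the sketch returns an error where the real code panics.

```rust
use std::thread::{self, ThreadId};

/// Hypothetical stand-in for the two bookkeeping fields of `InFlightEvents`.
struct LoopGuard {
    event_loop_thread_id: ThreadId,
    is_handling_event: bool,
}

impl LoopGuard {
    /// Rejects a send only when it comes from a foreign thread while no event is being handled.
    fn check_sender(&self, sender: ThreadId) -> Result<(), &'static str> {
        if !self.is_handling_event && sender != self.event_loop_thread_id {
            return Err("event was sent from the wrong thread");
        }
        Ok(())
    }
}

/// Returns the ID of a freshly spawned thread that has already finished.
fn other_thread_id() -> ThreadId {
    thread::spawn(|| thread::current().id()).join().unwrap()
}
```

A send from the loop's own thread always passes, while a send from `other_thread_id()` is
rejected unless `is_handling_event` is set.
    */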
    fn add(&mut self, event: CallbackEvent) {
        if !self.is_handling_event && std::thread::current().id() != self.event_loop_thread_id {
            // Another thread shall not be sending an event while we're not handling an event.
            // If that happens, it means we have a rogue thread spawned somewhere that has not been
            // converted to TestLoop. TestLoop tests should be single-threaded (or at least look
            // as if they were single-threaded). So if we catch this, panic.
            panic!(
                "Event was sent from the wrong thread. TestLoop tests should be single-threaded. \
                 Check if there's any code that spawns computation on another thread such as \
                 rayon::spawn, and convert it to AsyncComputationSpawner or FutureSpawner. \
                 Event: {}",
                event.description
            );
        }
        self.events.push(event);
    }
}

/// The log output line that can be used to visualize the execution of a test.
/// It is only used to serialize into JSON. This is enough data to reconstruct
/// the event dependency graph, and to segment log messages.
#[derive(Serialize)]
struct EventStartLogOutput {
    /// Index of the current event we're about to handle.
    current_index: usize,
    /// See `EventEndLogOutput::total_events`.
    total_events: usize,
    /// The Debug representation of the event payload.
    current_event: String,
    /// The current virtual time.
    current_time_ms: u64,
}

#[derive(Serialize)]
struct EventEndLogOutput {
    /// The total number of events we have seen so far. This is combined with
    /// `EventStartLogOutput::total_events` to determine which new events are
    /// emitted by the current event's handler.
    total_events: usize,
}

impl TestLoopV2 {
    /// Creates an empty test loop whose virtual time starts at zero.
    pub fn new() -> Self {
        let pending_events = Arc::new(Mutex::new(InFlightEvents {
            events: Vec::new(),
            event_loop_thread_id: std::thread::current().id(),
            is_handling_event: false,
        }));
        let pending_events_clone = pending_events.clone();
        let pending_events_sender = PendingEventsSender::new(move |callback_event| {
            let mut pending_events = pending_events_clone.lock().unwrap();
            pending_events.add(callback_event);
        });
        let shutting_down = Arc::new(AtomicBool::new(false));
        // Needed for the log visualizer to know when the test loop starts.
        tracing::info!(target: "test_loop", "TEST_LOOP_INIT");
        Self {
            data: TestLoopData::new(pending_events_sender.clone(), shutting_down.clone()),
            events: BinaryHeap::new(),
            pending_events,
            pending_events_sender,
            next_event_index: 0,
            current_time: Duration::ZERO,
            clock: FakeClock::default(),
            shutting_down,
            every_event_callback: None,
        }
    }

    /// Returns a FutureSpawner that can be used to spawn futures into the loop.
    pub fn future_spawner(&self) -> TestLoopFutureSpawner {
        self.pending_events_sender.clone()
    }

    /// Returns an AsyncComputationSpawner that can be used to spawn async computation into the
    /// loop. The `artificial_delay` allows the test to determine an artificial delay that the
    /// computation should take, based on the name of the computation.
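    /**
As an illustration of a name-based delay policy, here is a small sketch. It is an assumption-laden
example rather than code from this crate: the computation names are made up, and it uses
`std::time::Duration` to stay self-contained, whereas the parameter above expects the `Duration`
type imported from `near_time`.

```rust
use std::time::Duration;

/// Hypothetical delay policy keyed on the computation name.
fn artificial_delay(name: &str) -> Duration {
    match name {
        // Both names below are made up for illustration.
        "apply_chunk" => Duration::from_millis(50),
        "validate_block" => Duration::from_millis(10),
        // Anything else is treated as instantaneous.
        _ => Duration::ZERO,
    }
}
```

A function of this shape lets a timed test model slow computations without actually waiting for
them, since the delay is only applied to the virtual clock.
    */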
    pub fn async_computation_spawner(
        &self,
        artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
    ) -> TestLoopAsyncComputationSpawner {
        TestLoopAsyncComputationSpawner::new(self.pending_events_sender.clone(), artificial_delay)
    }

    /// Returns a sender that can be used anywhere to send events to the loop.
    pub fn sender(&self) -> PendingEventsSender {
        self.pending_events_sender.clone()
    }

    /// Sends any ad-hoc event to the loop.
    pub fn send_adhoc_event(
        &self,
        description: String,
        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
    ) {
        self.pending_events_sender.send(format!("Adhoc({})", description), Box::new(callback));
    }

    /// Sends any ad-hoc event to the loop, after some delay.
    pub fn send_adhoc_event_with_delay(
        &self,
        description: String,
        delay: Duration,
        callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
    ) {
        self.pending_events_sender.send_with_delay(
            format!("Adhoc({})", description),
            Box::new(callback),
            delay,
        );
    }

    /// Returns a clock that will always return the current virtual time.
    pub fn clock(&self) -> Clock {
        self.clock.clock()
    }

    /// Registers an actor under index 0 and returns a sender for it.
    /// See `register_actor_for_index`.
    pub fn register_actor<A>(
        &mut self,
        actor: A,
        adapter: Option<Arc<LateBoundSender<TestLoopSender<A>>>>,
    ) -> TestLoopSender<A>
    where
        A: Actor + 'static,
    {
        self.data.register_actor_for_index(0, actor, adapter)
    }

    /// Registers an actor under the given index and returns a sender for it.
    /// The registration itself is delegated to `TestLoopData`.
    pub fn register_actor_for_index<A>(
        &mut self,
        index: usize,
        actor: A,
        adapter: Option<Arc<LateBoundSender<TestLoopSender<A>>>>,
    ) -> TestLoopSender<A>
    where
        A: Actor + 'static,
    {
        self.data.register_actor_for_index(index, actor, adapter)
    }

    /// Sets a callback that is invoked with the test data right before each event is handled.
    /// Intended only for debugging.
    pub fn set_every_event_callback(&mut self, callback: impl FnMut(&TestLoopData) + 'static) {
        self.every_event_callback = Some(Box::new(callback));
    }

    /// Helper to push events we have just received into the heap.
    fn queue_received_events(&mut self) {
        for event in self.pending_events.lock().unwrap().events.drain(..) {
            self.events.push(EventInHeap {
                due: self.current_time + event.delay,
                id: self.next_event_index,
                event,
            });
            self.next_event_index += 1;
        }
    }

    /// Performs the logic to find the next event, advance to its time, and dequeue it.
    /// Takes a decider to determine whether to advance time, handle the next event, and/or to stop.
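    /**
The decider pattern described above can be sketched with a miniature, std-only loop. Everything
here is a simplified assumption for illustration: `MiniLoop` and `Decision` are hypothetical
names, times are plain milliseconds, and future waiters on the fake clock are left out.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical mirror of `AdvanceDecision`, with times as plain milliseconds.
enum Decision {
    AdvanceToNextEvent,
    AdvanceToAndStop(u64),
    Stop,
}

/// Hypothetical miniature loop: a virtual clock plus a min-heap of (due, id) pairs.
struct MiniLoop {
    now_ms: u64,
    events: BinaryHeap<Reverse<(u64, usize)>>,
}

impl MiniLoop {
    /// Returns the ID of the next event to run, or `None` if the decider stopped the loop.
    fn advance(&mut self, decider: &mut impl FnMut(Option<u64>) -> Decision) -> Option<usize> {
        loop {
            let next_due = self.events.peek().map(|Reverse((due, _))| *due);
            // Events due right now run without consulting the decider.
            if next_due == Some(self.now_ms) {
                return self.events.pop().map(|Reverse((_, id))| id);
            }
            match decider(next_due) {
                // Assumes the decider only asks to advance when an event exists.
                Decision::AdvanceToNextEvent => self.now_ms = next_due.unwrap(),
                Decision::AdvanceToAndStop(target) => {
                    self.now_ms = target;
                    return None;
                }
                Decision::Stop => return None,
            }
        }
    }

    /// Same shape as `run_for`: run everything due by the deadline, then jump to the deadline.
    fn run_for(&mut self, duration_ms: u64) -> Vec<usize> {
        let deadline = self.now_ms + duration_ms;
        let mut handled = Vec::new();
        while let Some(id) = self.advance(&mut |next| match next {
            Some(due) if due <= deadline => Decision::AdvanceToNextEvent,
            _ => Decision::AdvanceToAndStop(deadline),
        }) {
            handled.push(id);
        }
        handled
    }
}
```

With events `(5, 0)`, `(15, 1)` and `(5, 2)` queued at time zero, `run_for(10)` handles IDs 0 and
2 and leaves the clock at 10; a second `run_for(10)` handles ID 1 and leaves the clock at 20. A
`run_until`-style decider would return `Decision::Stop` once its condition holds.
    */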
    fn advance_till_next_event(
        &mut self,
        decider: &mut impl FnMut(Option<Duration>, &mut TestLoopData) -> AdvanceDecision,
    ) -> Option<EventInHeap> {
        loop {
            // New events may have been sent to the TestLoop from outside, and the previous
            // iteration of the loop may have made new futures ready, so queue up any received
            // events.
            self.queue_received_events();

            // Now there are two ways an event may be/become available. One is that the event is
            // queued into the event loop at a specific time; the other is that some future is
            // waiting on our fake clock to advance beyond a specific time. Pick the earliest.
            let next_timestamp = {
                let next_event_timestamp = self.events.peek().map(|event| event.due);
                let next_future_waiter_timestamp = self
                    .clock
                    .first_waiter()
                    .map(|time| time.signed_duration_since(self.clock.now() - self.current_time));
                next_event_timestamp
                    .map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1))
                    .or(next_future_waiter_timestamp)
            };
            // If the next event is immediately available (i.e. its time is same as current time),
            // just return that event; there's no decision to make (as we only give deciders a
            // chance to stop processing if we would advance the clock) and no need to advance time.
            if next_timestamp == Some(self.current_time) {
                let event = self.events.pop().expect("Programming error in TestLoop");
                assert_eq!(event.due, self.current_time);
                return Some(event);
            }
            // If we reach this point, it means we need to advance the clock. Let the decider choose
            // if we should do that, or if we should stop.
            let decision = decider(next_timestamp, &mut self.data);
            match decision {
                AdvanceDecision::AdvanceToNextEvent => {
                    let next_timestamp = next_timestamp.unwrap();
                    self.clock.advance(next_timestamp - self.current_time);
                    self.current_time = next_timestamp;
                    // Run the loop again, because if the reason why we advance the clock to this
                    // time is due to a possible future waiting on the clock, we may or may not get
                    // another future queued into the TestLoop, so we just check the whole thing
                    // again.
                    continue;
                }
                AdvanceDecision::AdvanceToAndStop(target) => {
                    self.clock.advance(target - self.current_time);
                    self.current_time = target;
                    return None;
                }
                AdvanceDecision::Stop => {
                    return None;
                }
            }
        }
    }

    /// Processes the given event, by logging a line first and then finding a handler to run it.
    fn process_event(&mut self, event: EventInHeap) {
        let start_json = serde_json::to_string(&EventStartLogOutput {
            current_index: event.id,
            total_events: self.next_event_index,
            current_event: event.event.description,
            current_time_ms: event.due.whole_milliseconds() as u64,
        })
        .unwrap();
        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json);
        assert_eq!(self.current_time, event.due);

        if let Some(callback) = &mut self.every_event_callback {
            callback(&self.data);
        }

        let callback = event.event.callback;
        callback(&mut self.data);

        // Push any new events into the queue. Do this before emitting the end log line,
        // so that it contains the correct new total number of events.
        self.queue_received_events();
        let end_json =
            serde_json::to_string(&EventEndLogOutput { total_events: self.next_event_index })
                .unwrap();
        tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_END {}", end_json);
    }

    /// Runs the test loop for the given duration. This function may be called
    /// multiple times, but further test handlers may not be registered after
    /// the first call.
    pub fn run_for(&mut self, duration: Duration) {
        let deadline = self.current_time + duration;
        while let Some(event) = self.advance_till_next_event(&mut |next_time, _| {
            if let Some(next_time) = next_time {
                if next_time <= deadline {
                    return AdvanceDecision::AdvanceToNextEvent;
                }
            }
            AdvanceDecision::AdvanceToAndStop(deadline)
        }) {
            self.process_event(event);
        }
    }

    /// Run until the given condition is true, asserting that it happens before the maximum duration
    /// is reached.
    ///
    /// To maximize logical consistency, the condition is only checked before the clock would
    /// advance. If it returns true, execution stops before advancing the clock.
    pub fn run_until(
        &mut self,
        mut condition: impl FnMut(&mut TestLoopData) -> bool,
        maximum_duration: Duration,
    ) {
        let deadline = self.current_time + maximum_duration;
        let mut decider = move |next_time, data: &mut TestLoopData| {
            if condition(data) {
                return AdvanceDecision::Stop;
            }
            if let Some(next_time) = next_time {
                if next_time <= deadline {
                    return AdvanceDecision::AdvanceToNextEvent;
                }
            }
            panic!("run_until did not fulfill the condition within the given deadline");
        };
        while let Some(event) = self.advance_till_next_event(&mut decider) {
            self.process_event(event);
        }
    }

    /// Sets the shutdown flag, then runs the loop for `maximum_duration` to drain the events
    /// that are still queued. The loop is dropped afterwards, which panics if any event
    /// remains unhandled.
    pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) {
        self.shutting_down.store(true, Ordering::Relaxed);
        self.run_for(maximum_duration);
        // Implicitly dropped here, which asserts that no more events are remaining.
    }

    /// Handles the events that are due at the current virtual time, without advancing the clock.
    pub fn run_instant(&mut self) {
        self.run_for(Duration::ZERO);
    }
}

impl Drop for TestLoopV2 {
    fn drop(&mut self) {
        self.queue_received_events();
        if let Some(event) = self.events.pop() {
            // Drop any references that may be held by the event callbacks. This can help
            // with destruction of the data.
            self.events.clear();
            panic!(
                "Event scheduled at {} is not handled at the end of the test: {}. \
                 Consider calling `test.shutdown_and_drain_remaining_events(...)`.",
                event.due, event.event.description
            );
        }
        // Needed for the log visualizer to know when the test loop ends.
        tracing::info!(target: "test_loop", "TEST_LOOP_SHUTDOWN");
    }
}

/// What `advance_till_next_event` should do when the clock would have to advance.
enum AdvanceDecision {
    AdvanceToNextEvent,
    AdvanceToAndStop(Duration),
    Stop,
}

#[cfg(test)]
mod tests {
    use crate::futures::FutureSpawnerExt;
    use crate::test_loop::TestLoopV2;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use time::Duration;

    // Tests that the TestLoop correctly handles futures that sleep on the fake clock.
    #[test]
    fn test_futures() {
        let mut test_loop = TestLoopV2::new();
        let clock = test_loop.clock();
        let start_time = clock.now();

        let finished = Arc::new(AtomicUsize::new(0));

        let clock1 = clock.clone();
        let finished1 = finished.clone();
        test_loop.future_spawner().spawn("test1", async move {
            assert_eq!(clock1.now(), start_time);
            clock1.sleep(Duration::seconds(10)).await;
            assert_eq!(clock1.now(), start_time + Duration::seconds(10));
            clock1.sleep(Duration::seconds(5)).await;
            assert_eq!(clock1.now(), start_time + Duration::seconds(15));
            finished1.fetch_add(1, Ordering::Relaxed);
        });

        test_loop.run_for(Duration::seconds(2));

        let clock2 = clock;
        let finished2 = finished.clone();
        test_loop.future_spawner().spawn("test2", async move {
            assert_eq!(clock2.now(), start_time + Duration::seconds(2));
            clock2.sleep(Duration::seconds(3)).await;
            assert_eq!(clock2.now(), start_time + Duration::seconds(5));
            clock2.sleep(Duration::seconds(20)).await;
            assert_eq!(clock2.now(), start_time + Duration::seconds(25));
            finished2.fetch_add(1, Ordering::Relaxed);
        });
        // During these 30 virtual seconds, the TestLoop should've automatically advanced the clock
        // to wake each future as it becomes ready to run again. The code inside the futures
        // asserts that the fake clock does indeed have the expected times.
        test_loop.run_for(Duration::seconds(30));
        assert_eq!(finished.load(Ordering::Relaxed), 2);
    }
}
517}