near_async/test_loop.rs
//! This is a framework to test async code in a way that is versatile, deterministic,
//! easy-to-setup, and easy-to-debug.
//!
//! The primary concept here is an event loop that the test framework controls. The
//! event loop acts as a central hub for all messages, including Actix messages,
//! network messages, timers, etc. Business logic is only executed as a response to
//! such events.
//!
//! This brings several major benefits:
//! - Ease of setup:
//!   - There is no need to set up mock objects that implement some message sender
//!     interface; instead, the test loop provides a sender object that can be used
//!     to send messages to the event loop. For example, suppose we were to make a
//!     Client whose constructor requires a shards_manager adapter; instead of having
//!     to make a mock for the shards_manager adapter, we can simply register the
//!     shards_manager actor with the test loop and pass in its sender.
//!   - Compared to writing synchronous tests, there is no need to manually deliver
//!     network messages or handle actix messages at certain points of the test.
//!     Instead, the event loop invokes the appropriate event handlers whenever any
//!     event remains in the event loop. This ensures that no messages are ever missed.
//!   - Test setup code can be modular and reusable, because the test specification
//!     consists entirely of registering the data and actors. Rather than passing a
//!     giant callback into a giant setup(...) function to customize one part of a
//!     huge integration test, we can flexibly compose specific modules with event
//!     handlers. For example, we may add an event handler to route all
//!     ShardsManager-related network messages reliably, and at the same time another
//!     event handler to drop 50% of Block network messages. Also, an event handler
//!     can be used as long as it is relevant for a test (e.g. a
//!     ForwardShardsManagerRequest event handler can be used as long as the test
//!     involves ShardsManagers), regardless of the exact architecture of the test.
//!
//! - Debuggability:
//!   - Because ALL execution happens in response to events, the whole test can be
//!     cleanly segmented into the response to each event. The framework automatically
//!     outputs a log message at the beginning of each event execution, so that the
//!     log output can be loaded into a visualizer to show the exact sequence of
//!     events, their relationships, the exact contents of the event messages, and
//!     the log output during the handling of each event. This is especially useful
//!     when debugging multi-instance tests.
//!
//! - Determinism:
//!   - Many tests, especially those that involve multiple instances, are most easily
//!     written by spawning actual actors and threads. This, however, makes the tests
//!     inherently asynchronous and potentially flaky.
//!   - The test loop framework instead provides a synchronous and deterministic way
//!     to invoke timers without waiting for the actual duration. This makes tests
//!     run much faster than asynchronous tests.
//!
//! - Versatility:
//!   - A test can be constructed with any combination of components. The framework
//!     does not dictate which components should exist or how many instances there
//!     should be. This allows for both small, focused tests and large multi-instance
//!     tests.
//!   - Timed tests can be written to check the theoretical performance of certain
//!     tasks, such as distributing chunks to other nodes within X milliseconds,
//!     provided that network messages have a 10ms delay.
//!   - The framework does not require major migrations of existing code; e.g. it is
//!     compatible with the Actix framework and futures.
//!
//! A note on the order of execution of the events: all events that are due at the same
//! timestamp are executed in FIFO order. For example, if the events are emitted in the
//! following order: (A due 100ms), (B due 0ms), (C due 200ms), (D due 0ms), (E due 100ms)
//! then the actual order of execution is B, D, A, E, C.
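//!
//! As a quick illustration, here is a minimal sketch of a test that uses only the
//! pieces defined in this module (ad-hoc events and the virtual clock); real tests
//! would typically also register actors via `register_actor`:
//!
//! ```ignore
//! let mut test_loop = TestLoopV2::new();
//! // Both events are due at virtual time 0, so they run in FIFO order: "first", then "second".
//! test_loop.send_adhoc_event("first".to_string(), |_data| tracing::info!("first ran"));
//! test_loop.send_adhoc_event("second".to_string(), |_data| tracing::info!("second ran"));
//! // Advance the virtual clock by one second, executing everything due by then.
//! test_loop.run_for(Duration::seconds(1));
//! test_loop.shutdown_and_drain_remaining_events(Duration::seconds(1));
//! ```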
pub mod data;
pub mod futures;
pub mod pending_events_sender;
pub mod sender;
use data::TestLoopData;
use futures::{TestLoopAsyncComputationSpawner, TestLoopFututeSpawner};
use near_time::{Clock, Duration, FakeClock};
use pending_events_sender::{CallbackEvent, PendingEventsSender};
use sender::TestLoopSender;
use serde::Serialize;
use std::collections::BinaryHeap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use time::ext::InstantExt;
use crate::messaging::{Actor, LateBoundSender};
/// Main struct for the Test Loop framework.
/// The `TestLoopData` should contain all the business logic state that is relevant
/// to the test. All events sent to the event loop are callbacks.
/// See `TestLoopData` for more details.
///
/// Events are sent to the test loop, with an optional delay, via the `pending_events_sender`.
pub struct TestLoopV2 {
/// The data that is stored and accessed by the test loop.
pub data: TestLoopData,
/// The sender is used to send events to the event loop.
pending_events_sender: PendingEventsSender,
/// The events that are yet to be handled. They are kept in a heap so that
/// events that shall execute earlier (by our own virtual clock) are popped
/// first.
events: BinaryHeap<EventInHeap>,
/// The events that will enter the events heap upon the next iteration.
pending_events: Arc<Mutex<InFlightEvents>>,
/// The next ID to assign to an event we receive.
next_event_index: usize,
/// The current virtual time.
pub current_time: Duration,
/// Fake clock that always returns the virtual time.
clock: near_time::FakeClock,
/// Shutdown flag. When this flag is true, delayed action runners will no
/// longer post any new events to the event loop.
shutting_down: Arc<AtomicBool>,
/// If present, a function to call to print something every time an event is
/// handled. Intended only for debugging.
every_event_callback: Option<Box<dyn FnMut(&TestLoopData)>>,
}
/// An event waiting to be executed, ordered by the due time and then by ID.
struct EventInHeap {
event: CallbackEvent,
due: Duration,
id: usize,
}
impl PartialEq for EventInHeap {
fn eq(&self, other: &Self) -> bool {
self.due == other.due && self.id == other.id
}
}
impl Eq for EventInHeap {}
impl PartialOrd for EventInHeap {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for EventInHeap {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(self.due, self.id).cmp(&(other.due, other.id)).reverse()
}
}
struct InFlightEvents {
events: Vec<CallbackEvent>,
/// The TestLoop thread ID. This and the following field are used to detect unintended
/// parallel processing.
event_loop_thread_id: std::thread::ThreadId,
/// Whether we're currently handling an event.
is_handling_event: bool,
}
impl InFlightEvents {
fn add(&mut self, event: CallbackEvent) {
if !self.is_handling_event && std::thread::current().id() != self.event_loop_thread_id {
// Another thread shall not be sending an event while we're not handling an event.
// If that happens, it means we have a rogue thread spawned somewhere that has not been
// converted to TestLoop. TestLoop tests should be single-threaded (or at least, look
// as if it were single-threaded). So if we catch this, panic.
panic!(
"Event was sent from the wrong thread. TestLoop tests should be single-threaded. \
Check if there's any code that spawns computation on another thread such as \
rayon::spawn, and convert it to AsyncComputationSpawner or FutureSpawner. \
Event: {}",
event.description
);
}
self.events.push(event);
}
}
/// The log output line that can be used to visualize the execution of a test.
/// It exists only to be serialized into JSON. This is enough data to reconstruct
/// the event dependency graph and to segment log messages.
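///
/// The emitted message looks roughly like the following (values are illustrative;
/// field order follows the struct definition below):
///
/// ```text
/// TEST_LOOP_EVENT_START {"current_index":5,"total_events":7,"current_event":"...","current_time_ms":100}
/// ```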
#[derive(Serialize)]
struct EventStartLogOutput {
/// Index of the current event we're about to handle.
current_index: usize,
/// See `EventEndLogOutput::total_events`.
total_events: usize,
/// The Debug representation of the event payload.
current_event: String,
/// The current virtual time.
current_time_ms: u64,
}
#[derive(Serialize)]
struct EventEndLogOutput {
/// The total number of events we have seen so far. This is combined with
/// `EventStartLogOutput::total_events` to determine which new events are
/// emitted by the current event's handler.
total_events: usize,
}
impl TestLoopV2 {
pub fn new() -> Self {
let pending_events = Arc::new(Mutex::new(InFlightEvents {
events: Vec::new(),
event_loop_thread_id: std::thread::current().id(),
is_handling_event: false,
}));
let pending_events_clone = pending_events.clone();
let pending_events_sender = PendingEventsSender::new(move |callback_event| {
let mut pending_events = pending_events_clone.lock().unwrap();
pending_events.add(callback_event);
});
let shutting_down = Arc::new(AtomicBool::new(false));
// Needed for the log visualizer to know when the test loop starts.
tracing::info!(target: "test_loop", "TEST_LOOP_INIT");
Self {
data: TestLoopData::new(pending_events_sender.clone(), shutting_down.clone()),
events: BinaryHeap::new(),
pending_events,
pending_events_sender,
next_event_index: 0,
current_time: Duration::ZERO,
clock: FakeClock::default(),
shutting_down,
every_event_callback: None,
}
}
/// Returns a FutureSpawner that can be used to spawn futures into the loop.
pub fn future_spawner(&self) -> TestLoopFututeSpawner {
self.pending_events_sender.clone()
}
/// Returns an AsyncComputationSpawner that can be used to spawn async computations into
/// the loop. The `artificial_delay` callback lets the test specify, based on the name of
/// the computation, how long the computation should appear to take in virtual time.
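///
/// A minimal sketch (the computation name "apply_chunk" is purely illustrative):
///
/// ```ignore
/// let spawner = test_loop.async_computation_spawner(|name| {
///     // Pretend heavy computations take 100ms of virtual time; everything else is instant.
///     if name == "apply_chunk" { Duration::milliseconds(100) } else { Duration::ZERO }
/// });
/// ```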
pub fn async_computation_spawner(
&self,
artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
) -> TestLoopAsyncComputationSpawner {
TestLoopAsyncComputationSpawner::new(self.pending_events_sender.clone(), artificial_delay)
}
/// Returns a sender that can be used anywhere to send events to the loop.
pub fn sender(&self) -> PendingEventsSender {
self.pending_events_sender.clone()
}
/// Sends any ad-hoc event to the loop.
pub fn send_adhoc_event(
&self,
description: String,
callback: impl FnOnce(&mut TestLoopData) + Send + 'static,
) {
self.pending_events_sender.send(format!("Adhoc({})", description), Box::new(callback));
}
/// Returns a clock that will always return the current virtual time.
pub fn clock(&self) -> Clock {
self.clock.clock()
}
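/// Registers an actor with the test loop and returns a sender for it. This is the
/// single-instance convenience wrapper around `register_actor_for_index` with index 0.
///
/// A minimal sketch, assuming a hypothetical `MyActor` type that implements `Actor`:
///
/// ```ignore
/// let mut test_loop = TestLoopV2::new();
/// let sender = test_loop.register_actor(MyActor::new(), None);
/// // Messages sent through `sender` are delivered as test loop events.
/// ```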
pub fn register_actor<A>(
&mut self,
actor: A,
adapter: Option<Arc<LateBoundSender<TestLoopSender<A>>>>,
) -> TestLoopSender<A>
where
A: Actor + 'static,
{
self.data.register_actor_for_index(0, actor, adapter)
}
/// Same as `register_actor`, but registers the actor for the instance with the given
/// `index`; useful for multi-instance tests.
pub fn register_actor_for_index<A>(
&mut self,
index: usize,
actor: A,
adapter: Option<Arc<LateBoundSender<TestLoopSender<A>>>>,
) -> TestLoopSender<A>
where
A: Actor + 'static,
{
self.data.register_actor_for_index(index, actor, adapter)
}
/// Sets a callback that is invoked before each event is handled. Intended only for
/// debugging.
pub fn set_every_event_callback(&mut self, callback: impl FnMut(&TestLoopData) + 'static) {
self.every_event_callback = Some(Box::new(callback));
}
/// Helper to push events we have just received into the heap.
fn queue_received_events(&mut self) {
for event in self.pending_events.lock().unwrap().events.drain(..) {
self.events.push(EventInHeap {
due: self.current_time + event.delay,
id: self.next_event_index,
event,
});
self.next_event_index += 1;
}
}
/// Performs the logic to find the next event, advance to its time, and dequeue it.
/// Takes a decider to determine whether to advance time, handle the next event, and/or to stop.
fn advance_till_next_event(
&mut self,
decider: &impl Fn(Option<Duration>, &mut TestLoopData) -> AdvanceDecision,
) -> Option<EventInHeap> {
loop {
// New events may have been sent to the TestLoop from outside, and the previous
// iteration of the loop may have made new futures ready, so queue up any received
// events.
self.queue_received_events();
// Now there are two ways an event may be/become available. One is that the event is
// queued into the event loop at a specific time; the other is that some future is
// waiting on our fake clock to advance beyond a specific time. Pick the earliest.
let next_timestamp = {
let next_event_timestamp = self.events.peek().map(|event| event.due);
let next_future_waiter_timestamp = self
.clock
.first_waiter()
.map(|time| time.signed_duration_since(self.clock.now() - self.current_time));
next_event_timestamp
.map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1))
.or(next_future_waiter_timestamp)
};
// If the next event is immediately available (i.e. its time is same as current time),
// just return that event; there's no decision to make (as we only give deciders a
// chance to stop processing if we would advance the clock) and no need to advance time.
if next_timestamp == Some(self.current_time) {
let event = self.events.pop().expect("Programming error in TestLoop");
assert_eq!(event.due, self.current_time);
return Some(event);
}
// If we reach this point, it means we need to advance the clock. Let the decider choose
// if we should do that, or if we should stop.
let decision = decider(next_timestamp, &mut self.data);
match decision {
AdvanceDecision::AdvanceToNextEvent => {
let next_timestamp = next_timestamp.unwrap();
self.clock.advance(next_timestamp - self.current_time);
self.current_time = next_timestamp;
// Run the loop again: if we advanced the clock because a future was waiting
// on it, that future may now queue more events into the TestLoop, so we
// re-check everything from the top.
continue;
}
AdvanceDecision::AdvanceToAndStop(target) => {
self.clock.advance(target - self.current_time);
self.current_time = target;
return None;
}
AdvanceDecision::Stop => {
return None;
}
}
}
}
/// Processes the given event, by logging a line first and then finding a handler to run it.
fn process_event(&mut self, event: EventInHeap) {
let start_json = serde_json::to_string(&EventStartLogOutput {
current_index: event.id,
total_events: self.next_event_index,
current_event: event.event.description,
current_time_ms: event.due.whole_milliseconds() as u64,
})
.unwrap();
tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json);
assert_eq!(self.current_time, event.due);
if let Some(callback) = &mut self.every_event_callback {
callback(&self.data);
}
let callback = event.event.callback;
callback(&mut self.data);
// Push any new events into the queue. Do this before emitting the end log line,
// so that it contains the correct new total number of events.
self.queue_received_events();
let end_json =
serde_json::to_string(&EventEndLogOutput { total_events: self.next_event_index })
.unwrap();
tracing::info!(target: "test_loop", "TEST_LOOP_EVENT_END {}", end_json);
}
/// Runs the test loop for the given duration. This function may be called
/// multiple times, but further test handlers may not be registered after
/// the first call.
pub fn run_for(&mut self, duration: Duration) {
let deadline = self.current_time + duration;
while let Some(event) = self.advance_till_next_event(&|next_time, _| {
if let Some(next_time) = next_time {
if next_time <= deadline {
return AdvanceDecision::AdvanceToNextEvent;
}
}
AdvanceDecision::AdvanceToAndStop(deadline)
}) {
self.process_event(event);
}
}
/// Run until the given condition is true, asserting that it happens before the maximum duration
/// is reached.
///
/// To maximize logical consistency, the condition is only checked before the clock would
/// advance. If it returns true, execution stops before advancing the clock.
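///
/// A minimal sketch of typical usage (the `done` flag is hypothetical; in practice an
/// event handler or future would set it):
///
/// ```ignore
/// let done = Arc::new(AtomicBool::new(false));
/// // ... something in the test eventually sets `done` ...
/// test_loop.run_until(|_data| done.load(Ordering::Relaxed), Duration::seconds(5));
/// ```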
pub fn run_until(
&mut self,
condition: impl Fn(&mut TestLoopData) -> bool,
maximum_duration: Duration,
) {
let deadline = self.current_time + maximum_duration;
let decider = |next_time, data: &mut TestLoopData| {
if condition(data) {
return AdvanceDecision::Stop;
}
if let Some(next_time) = next_time {
if next_time <= deadline {
return AdvanceDecision::AdvanceToNextEvent;
}
}
panic!("run_until did not fulfill the condition within the given deadline");
};
while let Some(event) = self.advance_till_next_event(&decider) {
self.process_event(event);
}
}
/// Sets the shutdown flag (so that delayed action runners stop posting new events) and
/// then runs the loop for up to `maximum_duration` to drain the remaining events.
pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) {
self.shutting_down.store(true, Ordering::Relaxed);
self.run_for(maximum_duration);
// Implicitly dropped here, which asserts that no more events are remaining.
}
/// Processes all events that are due at the current virtual time, without advancing
/// the clock.
pub fn run_instant(&mut self) {
self.run_for(Duration::ZERO);
}
}
impl Drop for TestLoopV2 {
fn drop(&mut self) {
self.queue_received_events();
if let Some(event) = self.events.pop() {
panic!(
"Event scheduled at {} is not handled at the end of the test: {}.
Consider calling `test.shutdown_and_drain_remaining_events(...)`.",
event.due, event.event.description
);
}
// Needed for the log visualizer to know when the test loop ends.
tracing::info!(target: "test_loop", "TEST_LOOP_SHUTDOWN");
}
}
enum AdvanceDecision {
AdvanceToNextEvent,
AdvanceToAndStop(Duration),
Stop,
}
#[cfg(test)]
mod tests {
use crate::futures::FutureSpawnerExt;
use crate::test_loop::TestLoopV2;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use time::Duration;
// Tests that the TestLoop correctly handles futures that sleep on the fake clock.
#[test]
fn test_futures() {
let mut test_loop = TestLoopV2::new();
let clock = test_loop.clock();
let start_time = clock.now();
let finished = Arc::new(AtomicUsize::new(0));
let clock1 = clock.clone();
let finished1 = finished.clone();
test_loop.future_spawner().spawn("test1", async move {
assert_eq!(clock1.now(), start_time);
clock1.sleep(Duration::seconds(10)).await;
assert_eq!(clock1.now(), start_time + Duration::seconds(10));
clock1.sleep(Duration::seconds(5)).await;
assert_eq!(clock1.now(), start_time + Duration::seconds(15));
finished1.fetch_add(1, Ordering::Relaxed);
});
test_loop.run_for(Duration::seconds(2));
let clock2 = clock;
let finished2 = finished.clone();
test_loop.future_spawner().spawn("test2", async move {
assert_eq!(clock2.now(), start_time + Duration::seconds(2));
clock2.sleep(Duration::seconds(3)).await;
assert_eq!(clock2.now(), start_time + Duration::seconds(5));
clock2.sleep(Duration::seconds(20)).await;
assert_eq!(clock2.now(), start_time + Duration::seconds(25));
finished2.fetch_add(1, Ordering::Relaxed);
});
// During these 30 virtual seconds, the TestLoop should have automatically advanced the
// clock to wake each future as it becomes ready to run again. The code inside the
// futures asserts that the fake clock does indeed show the expected times.
test_loop.run_for(Duration::seconds(30));
assert_eq!(finished.load(Ordering::Relaxed), 2);
}
}