//! This is a framework to test async code in a way that is versatile, deterministic,
//! easy to set up, and easy to debug.
//!
//! The primary concept here is an event loop that the test framework controls. The
//! event loop acts as a central hub for all messages, including Actix messages,
//! network messages, timers, etc. Business logic is only executed as a response to
//! such events.
//!
//! This brings several major benefits:
//! - Ease of setup:
//!   - There is no need to set up mock objects that implement some message sender
//!     interface; instead, the test loop provides a sender object that can be used
//!     to send messages to the event loop. For example, suppose we were to make a
//!     Client whose constructor requires a network adapter; instead of having to
//!     make a mock for the network adapter, we can simply pass in `loop.sender()`
//!     (see the sketch at the end of this module documentation).
//!   - Compared to writing synchronous tests, there is no need to manually deliver
//!     network messages or handle actix messages at certain points of the test.
//!     Instead, the event loop invokes the appropriate event handlers whenever there
//!     is any event remaining in the event loop. This ensures that no messages are
//!     ever missed.
//!   - Test setup code can be modular and reusable, because the test specification
//!     consists entirely of registering the desired event handlers. Rather than
//!     passing a giant callback into a giant setup(...) function to customize one
//!     part of a huge integration test, we can flexibly compose specific event
//!     handlers. For example, we may add an event handler to route all
//!     ShardsManager-related network messages reliably, and at the same time another
//!     event handler to drop 50% of Block network messages. Also, an event handler
//!     can be reused as long as it is relevant for the test (e.g. a
//!     ForwardShardsManagerRequest event handler can be used as long as the test
//!     involves ShardsManagers), regardless of the exact architecture of the test.
//!     See `LoopEventHandler` for more details.
//!
//! - Debuggability:
//!   - Because ALL execution happens in response to events, the whole test can be
//!     cleanly segmented into the response to each event. The framework automatically
//!     outputs a log message at the beginning of each event execution, so that the
//!     log output can be loaded into a visualizer to show the exact sequence of
//!     events, their relationship, the exact contents of the event messages, and the
//!     log output during the handling of each event. This is especially useful when
//!     debugging multi-instance tests.
//!
//! - Determinism:
//!   - Many tests, especially those that involve multiple instances, are most easily
//!     written by spawning actual actors and threads. This, however, makes the tests
//!     inherently asynchronous and therefore prone to flakiness.
//!   - The test loop framework instead provides a synchronous and deterministic way
//!     to invoke timers without waiting for the actual duration. This also makes
//!     tests run much faster than asynchronous tests.
//!
//! - Versatility:
//!   - A test can be constructed with any combination of components. The framework
//!     does not dictate what components should exist, or how many instances there
//!     should be. This allows for both small, focused tests and large multi-instance
//!     tests.
//!   - Timed tests can be written to check the theoretical performance of certain
//!     tasks, such as distributing chunks to other nodes within X milliseconds,
//!     provided that network messages have a 10ms delay.
//!   - The framework does not require major migrations to existing code, e.g. it is
//!     compatible with the Actix framework (and possibly futures in the future).
//!
//! A note on the order of execution of the events: all events that are due at the same
//! timestamp are executed in FIFO order. For example, if the events are emitted in the
//! following order: (A due 100ms), (B due 0ms), (C due 200ms), (D due 0ms), (E due 100ms)
//! then the actual order of execution is B, D, A, E, C.
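//!
//! As a minimal sketch of how these pieces fit together (it reuses the `TestEvent` and
//! `TestData` types from the test at the bottom of this file; the handler set and the
//! durations are illustrative):
//!
//! ```ignore
//! let builder = TestLoopBuilder::<TestEvent>::new();
//! let sender = builder.sender(); // can be handed to components instead of mocks
//! let mut test = builder.build::<TestData>(TestData { dummy: () });
//! test.register_handler(drive_futures().widen());
//! test.run_for(Duration::seconds(10)); // virtual time, so this returns quickly
//! test.shutdown_and_drain_remaining_events(Duration::seconds(1));
//! ```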
pub mod adhoc;
pub mod delay_sender;
pub mod event_handler;
pub mod futures;
use self::{
delay_sender::DelaySender,
event_handler::LoopEventHandler,
futures::{TestLoopFutureSpawner, TestLoopTask},
};
use ::time::ext::InstantExt as _;
use near_o11y::{testonly::init_test_logger, tracing::info};
use near_time::{self, Clock, Duration};
use serde::Serialize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::{collections::BinaryHeap, fmt::Debug, sync::Arc};
/// Main struct for the Test Loop framework.
/// The `Data` type should contain all the business logic state that is relevant
/// to the test. The `Event` type should contain all the possible events that
/// are sent to the event loop.
///
/// The convention is that, for single-instance tests,
/// - `Data` should be a struct that derives derive_more::AsMut and derive_more::AsRef
///   (so that `Data` implements AsMut<Field> and AsRef<Field> for each of its
///   fields), and
/// - `Event` should be an enum that derives EnumTryInto and EnumFrom, so that it
///   implements TryInto<Variant> and From<Variant> for each of its variants.
///
/// For multi-instance tests, `Data` is `Vec<SingleData>` and `Event` is
/// `(usize, SingleEvent)`.
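///
/// An illustrative sketch of this convention (the field and variant names below are
/// hypothetical):
///
/// ```ignore
/// #[derive(derive_more::AsMut, derive_more::AsRef)]
/// struct Data {
///     client: Client,                // hypothetical component under test
///     shards_manager: ShardsManager, // hypothetical component under test
/// }
///
/// #[derive(Debug, EnumTryInto, EnumFrom)]
/// enum Event {
///     ClientMessage(ClientMessage),                 // hypothetical message type
///     ShardsManagerRequest(ShardsManagerRequest),   // hypothetical message type
/// }
/// ```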
pub struct TestLoop<Data: 'static, Event: Debug + Send + 'static> {
pub data: Data,
/// The sender is used to send events to the event loop.
sender: DelaySender<Event>,
/// The events that are yet to be handled. They are kept in a heap so that
/// events that shall execute earlier (by our own virtual clock) are popped
/// first.
events: BinaryHeap<EventInHeap<Event>>,
/// The events that will enter the events heap upon the next iteration.
pending_events: Arc<Mutex<InFlightEvents<Event>>>,
/// The next ID to assign to an event we receive.
next_event_index: usize,
/// The current virtual time.
current_time: Duration,
/// Fake clock that always returns the virtual time.
clock: near_time::FakeClock,
/// Shutdown flag. When this flag is true, delayed action runners will no
/// longer post any new events to the event loop.
shutting_down: Arc<AtomicBool>,
/// All the event handlers that are registered. We invoke them one by one
/// for each event, until one of them handles the event (or panic if no one
/// handles it).
handlers: Vec<LoopEventHandler<Data, Event>>,
}
/// An event waiting to be executed, ordered by the due time and then by ID.
struct EventInHeap<Event> {
event: Event,
due: Duration,
id: usize,
}
impl<Event> PartialEq for EventInHeap<Event> {
fn eq(&self, other: &Self) -> bool {
self.due == other.due && self.id == other.id
}
}
impl<Event> Eq for EventInHeap<Event> {}
impl<Event> PartialOrd for EventInHeap<Event> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<Event> Ord for EventInHeap<Event> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
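        // `BinaryHeap` is a max-heap, so reverse the natural (due, id) ordering to make
        // the event with the earliest due time (and, on ties, the smallest ID) pop first.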
(self.due, self.id).cmp(&(other.due, other.id)).reverse()
}
}
/// An event that is in-flight. The delay here is relative to the virtual time
/// when the handler that emitted this event is invoked (e.g. a network routing
/// handler may respond to an outbound message and emit an inbound message with
/// a 10ms delay).
struct EventInFlight<Event> {
event: Event,
delay: Duration,
}
struct InFlightEvents<Event> {
events: Vec<EventInFlight<Event>>,
/// The TestLoop thread ID. This and the following field are used to detect unintended
/// parallel processing.
event_loop_thread_id: std::thread::ThreadId,
/// Whether we're currently handling an event.
is_handling_event: bool,
}
impl<Event: Debug> InFlightEvents<Event> {
fn add(&mut self, event: Event, delay: Duration) {
if !self.is_handling_event && std::thread::current().id() != self.event_loop_thread_id {
// Another thread shall not be sending an event while we're not handling an event.
// If that happens, it means we have a rogue thread spawned somewhere that has not been
// converted to TestLoop. TestLoop tests should be single-threaded (or at least, look
            // as if they were single-threaded). So if we catch this, panic.
panic!(
"Event was sent from the wrong thread. TestLoop tests should be single-threaded. \
Check if there's any code that spawns computation on another thread such as \
rayon::spawn, and convert it to AsyncComputationSpawner or FutureSpawner. \
Event: {:?}",
event
);
}
self.events.push(EventInFlight { event, delay });
}
}
/// Builder that should be used to construct a `TestLoop`. The builder exists because
/// the `Data` type usually can only be constructed using the event sender provided by
/// the test loop, so the builder lets us hand out the sender first and thereby avoid a
/// construction dependency cycle.
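///
/// A minimal sketch of the intended order of operations (the `Client` type, its
/// constructor, and the `client` field are hypothetical):
///
/// ```ignore
/// let builder = TestLoopBuilder::<TestEvent>::new();
/// // The sender exists before the data, so components can be built around it.
/// let client = Client::new(builder.sender());
/// let mut test = builder.build::<TestData>(TestData { client });
/// ```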
pub struct TestLoopBuilder<Event: Debug + Send + 'static> {
clock: near_time::FakeClock,
pending_events: Arc<Mutex<InFlightEvents<Event>>>,
pending_events_sender: DelaySender<Event>,
shutting_down: Arc<AtomicBool>,
}
impl<Event: Debug + Send + 'static> TestLoopBuilder<Event> {
pub fn new() -> Self {
// Initialize the logger to make sure the test loop printouts are visible.
init_test_logger();
let pending_events = Arc::new(Mutex::new(InFlightEvents {
events: Vec::new(),
event_loop_thread_id: std::thread::current().id(),
is_handling_event: false,
}));
Self {
clock: near_time::FakeClock::default(),
pending_events: pending_events.clone(),
pending_events_sender: DelaySender::new(move |event, delay| {
pending_events.lock().unwrap().add(event, delay);
}),
shutting_down: Arc::new(AtomicBool::new(false)),
}
}
/// Returns a sender that can be used anywhere to send events to the loop.
pub fn sender(&self) -> DelaySender<Event> {
self.pending_events_sender.clone()
}
/// Returns a clock that will always return the current virtual time.
pub fn clock(&self) -> near_time::Clock {
self.clock.clock()
}
/// Returns a flag indicating whether the TestLoop system is being shut down;
/// this is similar to whether the Actix system is shutting down.
pub fn shutting_down(&self) -> Arc<AtomicBool> {
self.shutting_down.clone()
}
pub fn build<Data>(self, data: Data) -> TestLoop<Data, Event> {
TestLoop::new(
self.pending_events,
self.pending_events_sender,
self.clock,
self.shutting_down,
data,
)
}
}
/// The log output line that can be used to visualize the execution of a test.
/// It is only used to serialize into JSON. This is enough data to reconstruct
/// the event dependency graph, and to segment log messages.
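///
/// An emitted pair of start/end lines might look roughly like this (illustrative
/// values):
///
/// ```text
/// TEST_LOOP_EVENT_START {"current_index":0,"total_events":1,"current_event":"Task(..)","current_time_ms":0}
/// TEST_LOOP_EVENT_END {"total_events":2}
/// ```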
#[derive(Serialize)]
struct EventStartLogOutput {
/// Index of the current event we're about to handle.
current_index: usize,
/// See `EventEndLogOutput::total_events`.
total_events: usize,
/// The Debug representation of the event payload.
current_event: String,
/// The current virtual time.
current_time_ms: u64,
}
#[derive(Serialize)]
struct EventEndLogOutput {
/// The total number of events we have seen so far. This is combined with
/// `EventStartLogOutput::total_events` to determine which new events are
/// emitted by the current event's handler.
total_events: usize,
}
impl<Data, Event: Debug + Send + 'static> TestLoop<Data, Event> {
fn new(
pending_events: Arc<Mutex<InFlightEvents<Event>>>,
sender: DelaySender<Event>,
clock: near_time::FakeClock,
shutting_down: Arc<AtomicBool>,
data: Data,
) -> Self {
Self {
data,
sender,
events: BinaryHeap::new(),
pending_events,
next_event_index: 0,
            current_time: Duration::ZERO,
clock,
shutting_down,
handlers: Vec::new(),
}
}
pub fn sender(&self) -> DelaySender<Event> {
self.sender.clone()
}
pub fn clock(&self) -> Clock {
self.clock.clock()
}
pub fn shutting_down(&self) -> Arc<AtomicBool> {
self.shutting_down.clone()
}
/// Registers a new event handler to the test loop.
pub fn register_handler(&mut self, handler: LoopEventHandler<Data, Event>) {
self.handlers.push(handler);
}
/// Helper to push events we have just received into the heap.
fn queue_received_events(&mut self) {
for event in self.pending_events.lock().unwrap().events.drain(..) {
self.events.push(EventInHeap {
due: self.current_time + event.delay,
event: event.event,
id: self.next_event_index,
});
self.next_event_index += 1;
}
}
/// Performs the logic to find the next event, advance to its time, and dequeue it.
/// Takes a decider to determine whether to advance time, handle the next event, and/or to stop.
fn advance_till_next_event(
&mut self,
decider: &impl Fn(Option<Duration>, &mut Data) -> AdvanceDecision,
) -> Option<EventInHeap<Event>> {
loop {
// New events may have been sent to the TestLoop from outside, and the previous
// iteration of the loop may have made new futures ready, so queue up any received
// events.
self.queue_received_events();
// Now there are two ways an event may be/become available. One is that the event is
// queued into the event loop at a specific time; the other is that some future is
// waiting on our fake clock to advance beyond a specific time. Pick the earliest.
let next_timestamp = {
let next_event_timestamp = self.events.peek().map(|event| event.due);
let next_future_waiter_timestamp = self
.clock
.first_waiter()
.map(|time| time.signed_duration_since(self.clock.now() - self.current_time));
next_event_timestamp
.map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1))
.or(next_future_waiter_timestamp)
};
// If the next event is immediately available (i.e. its time is same as current time),
// just return that event; there's no decision to make (as we only give deciders a
// chance to stop processing if we would advance the clock) and no need to advance time.
if next_timestamp == Some(self.current_time) {
let event = self.events.pop().expect("Programming error in TestLoop");
assert_eq!(event.due, self.current_time);
return Some(event);
}
// If we reach this point, it means we need to advance the clock. Let the decider choose
// if we should do that, or if we should stop.
let decision = decider(next_timestamp, &mut self.data);
match decision {
AdvanceDecision::AdvanceToNextEvent => {
let next_timestamp = next_timestamp.unwrap();
self.clock.advance(next_timestamp - self.current_time);
self.current_time = next_timestamp;
// Run the loop again, because if the reason why we advance the clock to this
// time is due to a possible future waiting on the clock, we may or may not get
// another future queued into the TestLoop, so we just check the whole thing
// again.
continue;
}
AdvanceDecision::AdvanceToAndStop(target) => {
self.clock.advance(target - self.current_time);
self.current_time = target;
return None;
}
AdvanceDecision::Stop => {
return None;
}
}
}
}
/// Processes the given event, by logging a line first and then finding a handler to run it.
fn process_event(&mut self, mut event: EventInHeap<Event>) {
let start_json = serde_json::to_string(&EventStartLogOutput {
current_index: event.id,
total_events: self.next_event_index,
current_event: format!("{:?}", event.event),
current_time_ms: event.due.whole_milliseconds() as u64,
})
.unwrap();
info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json);
assert_eq!(self.current_time, event.due);
for handler in &mut self.handlers {
if let Err(e) = handler.handle(event.event, &mut self.data) {
event.event = e;
} else {
// Push any new events into the queue. Do this before emitting the end log line,
// so that it contains the correct new total number of events.
self.queue_received_events();
let end_json = serde_json::to_string(&EventEndLogOutput {
total_events: self.next_event_index,
})
.unwrap();
info!(target: "test_loop", "TEST_LOOP_EVENT_END {}", end_json);
return;
}
}
panic!("Unhandled event: {:?}", event.event);
}
/// Runs the test loop for the given duration. This function may be called
/// multiple times, but further test handlers may not be registered after
/// the first call.
pub fn run_for(&mut self, duration: Duration) {
let deadline = self.current_time + duration;
while let Some(event) = self.advance_till_next_event(&|next_time, _| {
if let Some(next_time) = next_time {
if next_time <= deadline {
return AdvanceDecision::AdvanceToNextEvent;
}
}
AdvanceDecision::AdvanceToAndStop(deadline)
}) {
self.process_event(event);
}
}
/// Run until the given condition is true, asserting that it happens before the maximum duration
/// is reached.
///
/// To maximize logical consistency, the condition is only checked before the clock would
/// advance. If it returns true, execution stops before advancing the clock.
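    ///
    /// For example:
    ///
    /// ```ignore
    /// // `chain_height` here is a hypothetical field on `Data`.
    /// test.run_until(|data| data.chain_height >= 10, Duration::seconds(5));
    /// ```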
pub fn run_until(&mut self, condition: impl Fn(&mut Data) -> bool, maximum_duration: Duration) {
let deadline = self.current_time + maximum_duration;
let decider = |next_time, data: &mut Data| {
if condition(data) {
return AdvanceDecision::Stop;
}
if let Some(next_time) = next_time {
if next_time <= deadline {
return AdvanceDecision::AdvanceToNextEvent;
}
}
panic!("run_until did not fulfill the condition within the given deadline");
};
while let Some(event) = self.advance_till_next_event(&decider) {
self.process_event(event);
}
}
    /// Used to finish off remaining events that are still in the loop. This can be
    /// necessary if the destructors of some components wait for certain conditions to
    /// become true; otherwise, those destructors may end up waiting forever. This also
    /// helps avoid a panic when dropping the TestLoop itself, since its destructor
    /// asserts that all events have been handled.
pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) {
self.shutting_down.store(true, Ordering::Relaxed);
self.run_for(maximum_duration);
// Implicitly dropped here, which asserts that no more events are remaining.
}
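    /// Processes all events that are due at the current virtual time, without
    /// advancing the clock.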
pub fn run_instant(&mut self) {
self.run_for(Duration::ZERO);
}
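    /// Returns a spawner that posts spawned futures to this loop as `Arc<TestLoopTask>`
    /// events, to be driven by a corresponding handler (e.g. `drive_futures()`).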
pub fn future_spawner(&self) -> TestLoopFutureSpawner
where
Event: From<Arc<TestLoopTask>>,
{
self.sender().narrow()
}
}
impl<Data: 'static, Event: Debug + Send + 'static> Drop for TestLoop<Data, Event> {
fn drop(&mut self) {
self.queue_received_events();
if let Some(event) = self.events.pop() {
panic!(
"Event scheduled at {} is not handled at the end of the test: {:?}.
Consider calling `test.shutdown_and_drain_remaining_events(...)`.",
event.due, event.event
);
}
}
}
enum AdvanceDecision {
AdvanceToNextEvent,
AdvanceToAndStop(Duration),
Stop,
}
#[cfg(test)]
mod tests {
use crate::futures::FutureSpawnerExt;
use crate::test_loop::futures::{drive_futures, TestLoopTask};
use crate::test_loop::TestLoopBuilder;
use derive_enum_from_into::{EnumFrom, EnumTryInto};
use derive_more::AsMut;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use time::Duration;
#[derive(Debug, EnumFrom, EnumTryInto)]
enum TestEvent {
Task(Arc<TestLoopTask>),
}
#[derive(AsMut)]
struct TestData {
dummy: (),
}
// Tests that the TestLoop correctly handles futures that sleep on the fake clock.
#[test]
fn test_futures() {
let builder = TestLoopBuilder::<TestEvent>::new();
let clock = builder.clock();
let mut test = builder.build::<TestData>(TestData { dummy: () });
test.register_handler(drive_futures().widen());
let start_time = clock.now();
let finished = Arc::new(AtomicUsize::new(0));
let clock1 = clock.clone();
let finished1 = finished.clone();
test.sender().into_future_spawner().spawn("test1", async move {
assert_eq!(clock1.now(), start_time);
clock1.sleep(Duration::seconds(10)).await;
assert_eq!(clock1.now(), start_time + Duration::seconds(10));
clock1.sleep(Duration::seconds(5)).await;
assert_eq!(clock1.now(), start_time + Duration::seconds(15));
finished1.fetch_add(1, Ordering::Relaxed);
});
test.run_for(Duration::seconds(2));
let clock2 = clock;
let finished2 = finished.clone();
test.sender().into_future_spawner().spawn("test2", async move {
assert_eq!(clock2.now(), start_time + Duration::seconds(2));
clock2.sleep(Duration::seconds(3)).await;
assert_eq!(clock2.now(), start_time + Duration::seconds(5));
clock2.sleep(Duration::seconds(20)).await;
assert_eq!(clock2.now(), start_time + Duration::seconds(25));
finished2.fetch_add(1, Ordering::Relaxed);
});
        // During these 30 virtual seconds, the TestLoop should have automatically advanced
        // the clock to wake each future as it becomes ready to run again. The code inside
        // the futures asserts that the fake clock does indeed have the expected times.
test.run_for(Duration::seconds(30));
assert_eq!(finished.load(Ordering::Relaxed), 2);
}
}