near_async/test_loop/
futures.rs

1//! Utilities that let the deterministic TestLoop runtime drive async-style workloads.
2//!
3//! This module exposes two adapters that translate runtime interactions into TestLoop events:
4//! - `TestLoopFutureSpawner` implements [`FutureSpawner`] by enqueueing futures onto the loop so
5//!   any `spawn_boxed` call becomes a scheduled callback that the loop will poll to completion.
6//! - `TestLoopAsyncComputationSpawner` implements [`AsyncComputationSpawner`] by scheduling
7//!   blocking or synchronous computations through the loop, optionally delaying them via a
8//!   caller-provided function.
9//!
10//! Using these adapters keeps all side effects observable and controllable by tests that advance
11//! the loop manually.
12
13use super::PendingEventsSender;
14use super::data::TestLoopData;
15use crate::futures::{AsyncComputationSpawner, FutureSpawner};
16use futures::future::BoxFuture;
17use futures::task::{ArcWake, waker_ref};
18use near_time::Duration;
19use parking_lot::Mutex;
20use std::sync::Arc;
21use std::task::Context;
22
23/// Alias so the pending event sender can be handed to components that expect a [`FutureSpawner`].
24pub type TestLoopFutureSpawner = PendingEventsSender;
25
26impl FutureSpawner for TestLoopFutureSpawner {
27    fn spawn_boxed(&self, description: &str, f: BoxFuture<'static, ()>) {
28        let task = Arc::new(FutureTask {
29            future: Mutex::new(Some(f)),
30            sender: self.clone(),
31            description: description.to_string(),
32        });
33        let callback = move |_: &mut TestLoopData| drive_futures(&task);
34        self.send(format!("FutureSpawn({})", description), Box::new(callback));
35    }
36}
37
38struct FutureTask {
39    future: Mutex<Option<BoxFuture<'static, ()>>>,
40    sender: PendingEventsSender,
41    description: String,
42}
43
44impl ArcWake for FutureTask {
45    fn wake_by_ref(arc_self: &Arc<Self>) {
46        let clone = arc_self.clone();
47        arc_self.sender.send(
48            format!("FutureTask({})", arc_self.description),
49            Box::new(move |_: &mut TestLoopData| drive_futures(&clone)),
50        );
51    }
52}
53
54fn drive_futures(task: &Arc<FutureTask>) {
55    // Mirrors the single-threaded executor from the Rust async book. If the task still owns a
56    // future, poll it; on Pending, stash it back so the next wake-up can retry.
57    let mut future_slot = task.future.lock();
58    if let Some(mut future) = future_slot.take() {
59        let waker = waker_ref(&task);
60        let context = &mut Context::from_waker(&*waker);
61        if future.as_mut().poll(context).is_pending() {
62            // Not done yet; store the future again so the waker can reschedule it.
63            *future_slot = Some(future);
64        }
65    }
66}
67
68/// [`AsyncComputationSpawner`] implementation that schedules the computation via the TestLoop.
69pub struct TestLoopAsyncComputationSpawner {
70    sender: PendingEventsSender,
71    artificial_delay: Box<dyn Fn(&str) -> Duration + Send + Sync>,
72}
73
74impl TestLoopAsyncComputationSpawner {
75    pub fn new(
76        sender: PendingEventsSender,
77        artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
78    ) -> Self {
79        Self { sender, artificial_delay: Box::new(artificial_delay) }
80    }
81}
82
83impl AsyncComputationSpawner for TestLoopAsyncComputationSpawner {
84    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>) {
85        self.sender.send_with_delay(
86            format!("AsyncComputation({})", name),
87            Box::new(move |_| f()),
88            (self.artificial_delay)(name),
89        );
90    }
91}