near_async/test_loop/
futures.rs

1//! Support for futures in TestLoop.
2//!
3//! There are two key features this file provides for TestLoop:
4//!
5//!   1. A general way to spawn futures and have the TestLoop drive the futures.
6//!      To support this, pass test_loop.future_spawner() the &dyn FutureSpawner
7//!      to any component that needs to spawn futures.
8//!
9//!      This causes any futures spawned during the test to end up as a callback in the
10//!      test loop. The event will eventually be executed by the drive_futures function,
11//!      which will drive the future until it is either suspended or completed. If suspended,
12//!      then the waker of the future (called when the future is ready to resume) will place
13//!      the event back into the test loop to be executed again.
14//!
15//!   2. A way to send a message to the TestLoop and expect a response as a
16//!      future, which will resolve whenever the TestLoop handles the message.
17//!      To support this, use MessageWithCallback<Request, Response> as the
18//!      event type, and in the handler, call (event.responder)(result)
19//!      (possibly asynchronously) to complete the future.
20//!
21//!      This is needed to support the AsyncSender interface, which is required
22//!      by some components as they expect a response to each message. The way
23//!      this is implemented is by implementing a conversion from
24//!      DelaySender<MessageWithCallback<Request, Response>> to
25//!      AsyncSender<Request, Response>.
26
27use super::PendingEventsSender;
28use super::data::TestLoopData;
29use crate::futures::{AsyncComputationSpawner, FutureSpawner};
30use futures::future::BoxFuture;
31use futures::task::{ArcWake, waker_ref};
32use near_time::Duration;
33use parking_lot::Mutex;
34use std::sync::Arc;
35use std::task::Context;
36
37/// A DelaySender is a FutureSpawner that can be used to
38/// spawn futures into the test loop. We give it a convenient alias.
39pub type TestLoopFutureSpawner = PendingEventsSender;
40
41impl FutureSpawner for TestLoopFutureSpawner {
42    fn spawn_boxed(&self, description: &str, f: BoxFuture<'static, ()>) {
43        let task = Arc::new(FutureTask {
44            future: Mutex::new(Some(f)),
45            sender: self.clone(),
46            description: description.to_string(),
47        });
48        let callback = move |_: &mut TestLoopData| {
49            drive_futures(&task);
50        };
51        self.send(format!("FutureSpawn({})", description), Box::new(callback));
52    }
53}
54
55struct FutureTask {
56    future: Mutex<Option<BoxFuture<'static, ()>>>,
57    sender: PendingEventsSender,
58    description: String,
59}
60
61impl ArcWake for FutureTask {
62    fn wake_by_ref(arc_self: &Arc<Self>) {
63        let clone = arc_self.clone();
64        arc_self.sender.send(
65            format!("FutureTask({})", arc_self.description),
66            Box::new(move |_: &mut TestLoopData| drive_futures(&clone)),
67        );
68    }
69}
70
71fn drive_futures(task: &Arc<FutureTask>) {
72    // The following is copied from the Rust async book.
73    // Take the future, and if it has not yet completed (is still Some),
74    // poll it in an attempt to complete it.
75    let mut future_slot = task.future.lock();
76    if let Some(mut future) = future_slot.take() {
77        let waker = waker_ref(&task);
78        let context = &mut Context::from_waker(&*waker);
79        if future.as_mut().poll(context).is_pending() {
80            // We're still not done processing the future, so put it
81            // back in its task to be run again in the future.
82            *future_slot = Some(future);
83        }
84    }
85}
86
87/// AsyncComputationSpawner that spawns the computation in the TestLoop.
88pub struct TestLoopAsyncComputationSpawner {
89    sender: PendingEventsSender,
90    artificial_delay: Box<dyn Fn(&str) -> Duration + Send + Sync>,
91}
92
93impl TestLoopAsyncComputationSpawner {
94    pub fn new(
95        sender: PendingEventsSender,
96        artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
97    ) -> Self {
98        Self { sender, artificial_delay: Box::new(artificial_delay) }
99    }
100}
101
102impl AsyncComputationSpawner for TestLoopAsyncComputationSpawner {
103    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>) {
104        self.sender.send_with_delay(
105            format!("AsyncComputation({})", name),
106            Box::new(move |_| f()),
107            (self.artificial_delay)(name),
108        );
109    }
110}