near_async/test_loop/futures.rs
1//! Support for futures in TestLoop.
2//!
3//! There are two key features this file provides for TestLoop:
4//!
5//! 1. A general way to spawn futures and have the TestLoop drive the futures.
6//! To support this, pass test_loop.future_spawner() the &dyn FutureSpawner
7//! to any component that needs to spawn futures.
8//!
9//! This causes any futures spawned during the test to end up as a callback in the
10//! test loop. The event will eventually be executed by the drive_futures function,
11//! which will drive the future until it is either suspended or completed. If suspended,
12//! then the waker of the future (called when the future is ready to resume) will place
13//! the event back into the test loop to be executed again.
14//!
15//! 2. A way to send a message to the TestLoop and expect a response as a
16//! future, which will resolve whenever the TestLoop handles the message.
17//! To support this, use MessageWithCallback<Request, Response> as the
18//! event type, and in the handler, call (event.responder)(result)
19//! (possibly asynchronously) to complete the future.
20//!
21//! This is needed to support the AsyncSender interface, which is required
22//! by some components as they expect a response to each message. The way
23//! this is implemented is by implementing a conversion from
24//! DelaySender<MessageWithCallback<Request, Response>> to
25//! AsyncSender<Request, Response>.
26
27use super::PendingEventsSender;
28use super::data::TestLoopData;
29use crate::futures::{AsyncComputationSpawner, FutureSpawner};
30use futures::future::BoxFuture;
31use futures::task::{ArcWake, waker_ref};
32use near_time::Duration;
33use parking_lot::Mutex;
34use std::sync::Arc;
35use std::task::Context;
36
37/// A DelaySender is a FutureSpawner that can be used to
38/// spawn futures into the test loop. We give it a convenient alias.
39pub type TestLoopFutureSpawner = PendingEventsSender;
40
41impl FutureSpawner for TestLoopFutureSpawner {
42 fn spawn_boxed(&self, description: &str, f: BoxFuture<'static, ()>) {
43 let task = Arc::new(FutureTask {
44 future: Mutex::new(Some(f)),
45 sender: self.clone(),
46 description: description.to_string(),
47 });
48 let callback = move |_: &mut TestLoopData| {
49 drive_futures(&task);
50 };
51 self.send(format!("FutureSpawn({})", description), Box::new(callback));
52 }
53}
54
55struct FutureTask {
56 future: Mutex<Option<BoxFuture<'static, ()>>>,
57 sender: PendingEventsSender,
58 description: String,
59}
60
61impl ArcWake for FutureTask {
62 fn wake_by_ref(arc_self: &Arc<Self>) {
63 let clone = arc_self.clone();
64 arc_self.sender.send(
65 format!("FutureTask({})", arc_self.description),
66 Box::new(move |_: &mut TestLoopData| drive_futures(&clone)),
67 );
68 }
69}
70
71fn drive_futures(task: &Arc<FutureTask>) {
72 // The following is copied from the Rust async book.
73 // Take the future, and if it has not yet completed (is still Some),
74 // poll it in an attempt to complete it.
75 let mut future_slot = task.future.lock();
76 if let Some(mut future) = future_slot.take() {
77 let waker = waker_ref(&task);
78 let context = &mut Context::from_waker(&*waker);
79 if future.as_mut().poll(context).is_pending() {
80 // We're still not done processing the future, so put it
81 // back in its task to be run again in the future.
82 *future_slot = Some(future);
83 }
84 }
85}
86
87/// AsyncComputationSpawner that spawns the computation in the TestLoop.
88pub struct TestLoopAsyncComputationSpawner {
89 sender: PendingEventsSender,
90 artificial_delay: Box<dyn Fn(&str) -> Duration + Send + Sync>,
91}
92
93impl TestLoopAsyncComputationSpawner {
94 pub fn new(
95 sender: PendingEventsSender,
96 artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
97 ) -> Self {
98 Self { sender, artificial_delay: Box::new(artificial_delay) }
99 }
100}
101
102impl AsyncComputationSpawner for TestLoopAsyncComputationSpawner {
103 fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>) {
104 self.sender.send_with_delay(
105 format!("AsyncComputation({})", name),
106 Box::new(move |_| f()),
107 (self.artificial_delay)(name),
108 );
109 }
110}