near_async/test_loop/
futures.rs1use super::PendingEventsSender;
14use super::data::TestLoopData;
15use crate::futures::{AsyncComputationSpawner, FutureSpawner};
16use futures::future::BoxFuture;
17use futures::task::{ArcWake, waker_ref};
18use near_time::Duration;
19use parking_lot::Mutex;
20use std::sync::Arc;
21use std::task::Context;
22
23pub type TestLoopFutureSpawner = PendingEventsSender;
25
26impl FutureSpawner for TestLoopFutureSpawner {
27 fn spawn_boxed(&self, description: &str, f: BoxFuture<'static, ()>) {
28 let task = Arc::new(FutureTask {
29 future: Mutex::new(Some(f)),
30 sender: self.clone(),
31 description: description.to_string(),
32 });
33 let callback = move |_: &mut TestLoopData| drive_futures(&task);
34 self.send(format!("FutureSpawn({})", description), Box::new(callback));
35 }
36}
37
38struct FutureTask {
39 future: Mutex<Option<BoxFuture<'static, ()>>>,
40 sender: PendingEventsSender,
41 description: String,
42}
43
44impl ArcWake for FutureTask {
45 fn wake_by_ref(arc_self: &Arc<Self>) {
46 let clone = arc_self.clone();
47 arc_self.sender.send(
48 format!("FutureTask({})", arc_self.description),
49 Box::new(move |_: &mut TestLoopData| drive_futures(&clone)),
50 );
51 }
52}
53
54fn drive_futures(task: &Arc<FutureTask>) {
55 let mut future_slot = task.future.lock();
58 if let Some(mut future) = future_slot.take() {
59 let waker = waker_ref(&task);
60 let context = &mut Context::from_waker(&*waker);
61 if future.as_mut().poll(context).is_pending() {
62 *future_slot = Some(future);
64 }
65 }
66}
67
68pub struct TestLoopAsyncComputationSpawner {
70 sender: PendingEventsSender,
71 artificial_delay: Box<dyn Fn(&str) -> Duration + Send + Sync>,
72}
73
74impl TestLoopAsyncComputationSpawner {
75 pub fn new(
76 sender: PendingEventsSender,
77 artificial_delay: impl Fn(&str) -> Duration + Send + Sync + 'static,
78 ) -> Self {
79 Self { sender, artificial_delay: Box::new(artificial_delay) }
80 }
81}
82
83impl AsyncComputationSpawner for TestLoopAsyncComputationSpawner {
84 fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>) {
85 self.sender.send_with_delay(
86 format!("AsyncComputation({})", name),
87 Box::new(move |_| f()),
88 (self.artificial_delay)(name),
89 );
90 }
91}