1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
use super::{delay_sender::DelaySender, event_handler::LoopEventHandler, TestLoop};
use crate::futures::{AsyncComputationSpawner, DelayedActionRunner};
use crate::test_loop::event_handler::TryIntoOrSelf;
use crate::{futures::FutureSpawner, messaging::CanSend};
use futures::future::BoxFuture;
use futures::task::{waker_ref, ArcWake};
use near_time::Duration;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::Context;
// Support for futures in TestLoop.
//
// There are two key features this file provides for TestLoop:
//
// 1. A general way to spawn futures and have the TestLoop drive the futures.
// To support this, add () to the Data, add Arc<TestLoopTask> as an Event,
// and add DriveFutures as a handler. Finally, pass
// DelaySender<Arc<TestLoopTask>> as the &dyn FutureSpawner to any
// component that needs to spawn futures.
//
// This causes any futures spawned during the test to end up as an event
// (an Arc<TestLoopTask>) in the test loop. The event will eventually be
// executed by the DriveFutures handler, which will drive the future
// until it is either suspended or completed. If suspended, then the waker
// of the future (called when the future is ready to resume) will place
// the Arc<TestLoopTask> event back into the test loop to be executed
// again.
//
// 2. A way to send a message to the TestLoop and expect a response as a
// future, which will resolve whenever the TestLoop handles the message.
// To support this, use MessageWithCallback<Request, Response> as the
// event type, and in the handler, call (event.responder)(result)
// (possibly asynchronously) to complete the future.
//
// This is needed to support the AsyncSender interface, which is required
// by some components as they expect a response to each message. The way
// this is implemented is by implementing a conversion from
// DelaySender<MessageWithCallback<Request, Response>> to
// AsyncSender<Request, Response>.
/// A message, plus a response callback. This should be used as the event type
/// when testing an Actix component that's expected to return a result.
///
/// The response is used to complete the future that is returned by
/// our `AsyncSender::send_async` implementation.
pub struct TestLoopTask {
future: Mutex<Option<BoxFuture<'static, ()>>>,
sender: DelaySender<Arc<TestLoopTask>>,
description: String,
}
impl ArcWake for TestLoopTask {
fn wake_by_ref(arc_self: &Arc<Self>) {
let clone = arc_self.clone();
arc_self.sender.send(clone);
}
}
impl Debug for TestLoopTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("Task").field(&self.description).finish()
}
}
/// Drives any Arc<TestLoopTask> events (futures spawned by our implementation
/// of FutureSpawner) that are remaining in the loop.
pub fn drive_futures() -> LoopEventHandler<(), Arc<TestLoopTask>> {
LoopEventHandler::new_simple(|task: Arc<TestLoopTask>, _| {
// The following is copied from the Rust async book.
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it.
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
if future.as_mut().poll(context).is_pending() {
// We're still not done processing the future, so put it
// back in its task to be run again in the future.
*future_slot = Some(future);
}
}
})
}
/// A DelaySender<Arc<TestLoopTask>> is a FutureSpawner that can be used to
/// spawn futures into the test loop. We give it a convenient alias.
pub type TestLoopFutureSpawner = DelaySender<Arc<TestLoopTask>>;
impl FutureSpawner for TestLoopFutureSpawner {
fn spawn_boxed(&self, description: &str, f: BoxFuture<'static, ()>) {
let task = Arc::new(TestLoopTask {
future: Mutex::new(Some(f)),
sender: self.clone(),
description: description.to_string(),
});
self.send(task);
}
}
/// Represents an action that was scheduled to run later, by using
/// `DelayedActionRunner::run_later`.
pub struct TestLoopDelayedActionEvent<T> {
name: String,
action: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
}
impl<T> Debug for TestLoopDelayedActionEvent<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("DelayedAction").field(&self.name).finish()
}
}
/// An event handler that handles only `TestLoopDelayedActionEvent`s, by
/// running the action encapsulated in the event.
pub fn drive_delayed_action_runners<T>(
sender: DelaySender<TestLoopDelayedActionEvent<T>>,
shutting_down: Arc<AtomicBool>,
) -> LoopEventHandler<T, TestLoopDelayedActionEvent<T>> {
LoopEventHandler::new_simple(move |event: TestLoopDelayedActionEvent<T>, data: &mut T| {
let mut runner = TestLoopDelayedActionRunner {
sender: sender.clone(),
shutting_down: shutting_down.clone(),
};
(event.action)(data, &mut runner);
})
}
/// `DelayedActionRunner` that schedules the action to be run later by the
/// TestLoop event loop.
pub struct TestLoopDelayedActionRunner<T> {
pub(crate) sender: DelaySender<TestLoopDelayedActionEvent<T>>,
pub(crate) shutting_down: Arc<AtomicBool>,
}
impl<T> DelayedActionRunner<T> for TestLoopDelayedActionRunner<T> {
fn run_later_boxed(
&mut self,
name: &str,
dur: Duration,
action: Box<dyn FnOnce(&mut T, &mut dyn DelayedActionRunner<T>) + Send + 'static>,
) {
if self.shutting_down.load(Ordering::Relaxed) {
return;
}
self.sender.send_with_delay(
TestLoopDelayedActionEvent { name: name.to_string(), action },
dur.try_into().unwrap(),
);
}
}
impl<Data: 'static, Event: Debug + Send + 'static> TestLoop<Data, Event> {
/// Shorthand for registering this frequently used handler.
pub fn register_delayed_action_handler<T>(&mut self)
where
T: 'static,
Data: AsMut<T>,
Event: TryIntoOrSelf<TestLoopDelayedActionEvent<T>>
+ From<TestLoopDelayedActionEvent<T>>
+ 'static,
{
self.register_handler(
drive_delayed_action_runners::<T>(self.sender().narrow(), self.shutting_down()).widen(),
);
}
}
impl<Data: 'static, Event: Debug + Send + 'static> TestLoop<Vec<Data>, (usize, Event)> {
/// Shorthand for registering this frequently used handler for a multi-instance test.
pub fn register_delayed_action_handler_for_index<T>(&mut self, idx: usize)
where
T: 'static,
Data: AsMut<T>,
Event: TryIntoOrSelf<TestLoopDelayedActionEvent<T>>
+ From<TestLoopDelayedActionEvent<T>>
+ 'static,
{
self.register_handler(
drive_delayed_action_runners::<T>(
self.sender().for_index(idx).narrow(),
self.shutting_down(),
)
.widen()
.for_index(idx),
);
}
}
/// An event that represents async computation. See async_computation_spawner() in DelaySender.
pub struct TestLoopAsyncComputationEvent {
name: String,
f: Box<dyn FnOnce() + Send>,
}
impl Debug for TestLoopAsyncComputationEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("AsyncComputation").field(&self.name).finish()
}
}
/// AsyncComputationSpawner that spawns the computation in the TestLoop.
pub struct TestLoopAsyncComputationSpawner {
pub(crate) sender: DelaySender<TestLoopAsyncComputationEvent>,
pub(crate) artificial_delay: Box<dyn Fn(&str) -> Duration + Send + Sync>,
}
impl AsyncComputationSpawner for TestLoopAsyncComputationSpawner {
fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>) {
self.sender.send_with_delay(
TestLoopAsyncComputationEvent { name: name.to_string(), f },
(self.artificial_delay)(name),
);
}
}
pub fn drive_async_computations() -> LoopEventHandler<(), TestLoopAsyncComputationEvent> {
LoopEventHandler::new_simple(|event: TestLoopAsyncComputationEvent, _data: &mut ()| {
(event.f)();
})
}