near_async/test_loop/
sender.rs

1use std::any::type_name;
2use std::fmt::Debug;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use crate::futures::DelayedActionRunner;
7use crate::messaging::{Actor, CanSend, HandlerWithContext, MessageWithCallback};
8use crate::time::Duration;
9
10use super::data::{TestLoopData, TestLoopDataHandle};
11use super::PendingEventsSender;
12use futures::FutureExt;
13
14/// TestLoopSender implements the CanSend methods for an actor that can Handle them. This is
15/// similar to our pattern of having an ActixWrapper around an actor to send messages to it.
16///
17/// ```rust, ignore
18/// let actor = TestActor::new();
19/// let adapter = LateBoundSender::new();
20///
21/// let sender: TestLoopSender<TestActor> = data.register_actor(actor, Some(adapter));
22///
23/// // We can now send messages to the actor using the sender and adapter.
24/// sender.send(TestMessage {});
25/// adapter.send(TestMessage {});
26/// ```
27///
28/// For the purposes of testloop, we keep a copy of the delay sender that is used to schedule
29/// callbacks on the testloop to execute either the actor.handle() function or the
30/// DelayedActionRunner.run_later_boxed() function.
31pub struct TestLoopSender<A>
32where
33    A: 'static,
34{
35    actor_handle: TestLoopDataHandle<A>,
36    pending_events_sender: PendingEventsSender,
37    shutting_down: Arc<AtomicBool>,
38    sender_delay: Duration,
39}
40
41impl<A> Clone for TestLoopSender<A> {
42    fn clone(&self) -> Self {
43        Self {
44            actor_handle: self.actor_handle.clone(),
45            pending_events_sender: self.pending_events_sender.clone(),
46            shutting_down: self.shutting_down.clone(),
47            sender_delay: self.sender_delay,
48        }
49    }
50}
51
52/// `DelayedActionRunner` that schedules the action to be run later by the TestLoop event loop.
53impl<A> DelayedActionRunner<A> for TestLoopSender<A>
54where
55    A: 'static,
56{
57    fn run_later_boxed(
58        &mut self,
59        name: &str,
60        dur: Duration,
61        f: Box<dyn FnOnce(&mut A, &mut dyn DelayedActionRunner<A>) + Send + 'static>,
62    ) {
63        if self.shutting_down.load(Ordering::Relaxed) {
64            return;
65        }
66
67        let mut this = self.clone();
68        let callback = move |data: &mut TestLoopData| {
69            let actor = data.get_mut(&this.actor_handle);
70            f(actor, &mut this);
71        };
72        self.pending_events_sender.send_with_delay(
73            format!("DelayedAction {}({:?})", pretty_type_name::<A>(), name),
74            Box::new(callback),
75            dur,
76        );
77    }
78}
79
80impl<M, A> CanSend<M> for TestLoopSender<A>
81where
82    M: actix::Message + Debug + Send + 'static,
83    A: Actor + HandlerWithContext<M> + 'static,
84    M::Result: Send,
85{
86    fn send(&self, msg: M) {
87        let mut this = self.clone();
88        let description = format!("{}({:?})", pretty_type_name::<A>(), &msg);
89        let callback = move |data: &mut TestLoopData| {
90            let actor = data.get_mut(&this.actor_handle);
91            actor.handle(msg, &mut this);
92        };
93        self.pending_events_sender.send_with_delay(
94            description,
95            Box::new(callback),
96            self.sender_delay,
97        );
98    }
99}
100
101impl<M, R, A> CanSend<MessageWithCallback<M, R>> for TestLoopSender<A>
102where
103    M: actix::Message<Result = R> + Debug + Send + 'static,
104    A: Actor + HandlerWithContext<M> + 'static,
105    R: 'static + Send,
106{
107    fn send(&self, msg: MessageWithCallback<M, R>) {
108        let mut this = self.clone();
109        let description = format!("{}({:?})", pretty_type_name::<A>(), &msg.message);
110        let callback = move |data: &mut TestLoopData| {
111            let MessageWithCallback { message: msg, callback } = msg;
112            let actor = data.get_mut(&this.actor_handle);
113            let result = actor.handle(msg, &mut this);
114            callback(async move { Ok(result) }.boxed());
115        };
116        self.pending_events_sender.send_with_delay(
117            description,
118            Box::new(callback),
119            self.sender_delay,
120        );
121    }
122}
123
124impl<A> TestLoopSender<A>
125where
126    A: Actor + 'static,
127{
128    pub(crate) fn new(
129        actor_handle: TestLoopDataHandle<A>,
130        pending_events_sender: PendingEventsSender,
131        shutting_down: Arc<AtomicBool>,
132    ) -> Self {
133        Self { actor_handle, pending_events_sender, shutting_down, sender_delay: Duration::ZERO }
134    }
135
136    /// Returns a new TestLoopSender which sends messages with the given delay.
137    pub fn with_delay(self, delay: Duration) -> Self {
138        Self { sender_delay: delay, ..self }
139    }
140
141    pub fn actor_handle(&self) -> TestLoopDataHandle<A> {
142        self.actor_handle.clone()
143    }
144}
145
146// Quick and dirty way of getting the type name without the module path.
147// Does not work for more complex types like std::sync::Arc<std::sync::atomic::AtomicBool<...>>
148// example near_chunks::shards_manager_actor::ShardsManagerActor -> ShardsManagerActor
149fn pretty_type_name<T>() -> &'static str {
150    type_name::<T>().split("::").last().unwrap()
151}