near_async/test_loop/
sender.rs

1use std::any::type_name;
2use std::fmt::Debug;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5
6use crate::futures::DelayedActionRunner;
7use crate::messaging::{Actor, AsyncSendError, CanSend, CanSendAsync, HandlerWithContext};
8use crate::time::Duration;
9
10use super::PendingEventsSender;
11use super::data::{TestLoopData, TestLoopDataHandle};
12use futures::FutureExt;
13use futures::channel::oneshot;
14use futures::future::BoxFuture;
15
16/// TestLoopSender implements the CanSend methods for an actor that can Handle them.
17///
18/// ```rust, ignore
19/// let actor = TestActor::new();
20/// let adapter = LateBoundSender::new();
21///
22/// let sender: TestLoopSender<TestActor> = data.register_actor(actor, Some(adapter));
23///
24/// // We can now send messages to the actor using the sender and adapter.
25/// sender.send(TestMessage {});
26/// adapter.send(TestMessage {});
27/// ```
28///
29/// For the purposes of testloop, we keep a copy of the delay sender that is used to schedule
30/// callbacks on the testloop to execute either the actor.handle() function or the
31/// DelayedActionRunner.run_later_boxed() function.
32pub struct TestLoopSender<A>
33where
34    A: 'static,
35{
36    actor_handle: TestLoopDataHandle<A>,
37    pending_events_sender: PendingEventsSender,
38    shutting_down: Arc<AtomicBool>,
39    sender_delay: Duration,
40}
41
42impl<A> Clone for TestLoopSender<A> {
43    fn clone(&self) -> Self {
44        Self {
45            actor_handle: self.actor_handle.clone(),
46            pending_events_sender: self.pending_events_sender.clone(),
47            shutting_down: self.shutting_down.clone(),
48            sender_delay: self.sender_delay,
49        }
50    }
51}
52
53/// `DelayedActionRunner` that schedules the action to be run later by the TestLoop event loop.
54impl<A> DelayedActionRunner<A> for TestLoopSender<A>
55where
56    A: 'static,
57{
58    fn run_later_boxed(
59        &mut self,
60        name: &str,
61        dur: Duration,
62        f: Box<dyn FnOnce(&mut A, &mut dyn DelayedActionRunner<A>) + Send + 'static>,
63    ) {
64        if self.shutting_down.load(Ordering::Relaxed) {
65            return;
66        }
67
68        let mut this = self.clone();
69        let callback = move |data: &mut TestLoopData| {
70            let actor = data.get_mut(&this.actor_handle);
71            f(actor, &mut this);
72        };
73        self.pending_events_sender.send_with_delay(
74            format!("DelayedAction {}({:?})", pretty_type_name::<A>(), name),
75            Box::new(callback),
76            dur,
77        );
78    }
79}
80
81impl<M, A> CanSend<M> for TestLoopSender<A>
82where
83    M: Debug + Send + 'static,
84    A: Actor + HandlerWithContext<M> + 'static,
85{
86    fn send(&self, msg: M) {
87        let mut this = self.clone();
88        let description = format!("{}({:?})", pretty_type_name::<A>(), &msg);
89        let callback = move |data: &mut TestLoopData| {
90            let actor = data.get_mut(&this.actor_handle);
91            actor.handle(msg, &mut this);
92        };
93        self.pending_events_sender.send_with_delay(
94            description,
95            Box::new(callback),
96            self.sender_delay,
97        );
98    }
99}
100
101impl<M, R, A> CanSendAsync<M, R> for TestLoopSender<A>
102where
103    M: Debug + Send + 'static,
104    A: Actor + HandlerWithContext<M, R> + 'static,
105    R: Send + 'static,
106{
107    fn send_async(&self, msg: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
108        let mut this = self.clone();
109        let description = format!("{}({:?})", pretty_type_name::<A>(), &msg);
110        let (sender, receiver) = oneshot::channel::<R>();
111        let callback = move |data: &mut TestLoopData| {
112            let actor = data.get_mut(&this.actor_handle);
113            let result = actor.handle(msg, &mut this);
114            sender.send(result).ok();
115        };
116        self.pending_events_sender.send_with_delay(
117            description,
118            Box::new(callback),
119            self.sender_delay,
120        );
121        async move { Ok(receiver.await.unwrap()) }.boxed()
122    }
123}
124
125impl<A> TestLoopSender<A>
126where
127    A: Actor + 'static,
128{
129    pub(crate) fn new(
130        actor_handle: TestLoopDataHandle<A>,
131        pending_events_sender: PendingEventsSender,
132        shutting_down: Arc<AtomicBool>,
133    ) -> Self {
134        Self { actor_handle, pending_events_sender, shutting_down, sender_delay: Duration::ZERO }
135    }
136
137    /// Returns a new TestLoopSender which sends messages with the given delay.
138    pub fn with_delay(self, delay: Duration) -> Self {
139        Self { sender_delay: delay, ..self }
140    }
141
142    pub fn actor_handle(&self) -> TestLoopDataHandle<A> {
143        self.actor_handle.clone()
144    }
145}
146
/// Returns the name of `T` with the module path of the outermost type removed.
///
/// Only the part of the name before the first `<` is searched for a path separator, so generic
/// arguments are returned exactly as `type_name` renders them rather than being cut in half.
///
/// Examples (as currently rendered by `type_name`, whose output is not guaranteed stable):
/// * `near_chunks::shards_manager_actor::ShardsManagerActor` -> `ShardsManagerActor`
/// * `alloc::sync::Arc<core::sync::atomic::AtomicBool>` -> `Arc<core::sync::atomic::AtomicBool>`
fn pretty_type_name<T>() -> &'static str {
    let full = type_name::<T>();
    // Restrict the search to the outer type's own path; `::` inside `<...>` belongs to the
    // generic arguments and must not be used as the cut point.
    let outer_end = full.find('<').unwrap_or(full.len());
    match full[..outer_end].rfind("::") {
        Some(separator) => &full[separator + "::".len()..],
        None => full,
    }
}