near_async/test_loop/
sender.rs1use std::any::type_name;
2use std::fmt::Debug;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use crate::futures::DelayedActionRunner;
7use crate::messaging::{Actor, CanSend, HandlerWithContext, MessageWithCallback};
8use crate::time::Duration;
9
10use super::data::{TestLoopData, TestLoopDataHandle};
11use super::PendingEventsSender;
12use futures::FutureExt;
13
14pub struct TestLoopSender<A>
32where
33 A: 'static,
34{
35 actor_handle: TestLoopDataHandle<A>,
36 pending_events_sender: PendingEventsSender,
37 shutting_down: Arc<AtomicBool>,
38 sender_delay: Duration,
39}
40
41impl<A> Clone for TestLoopSender<A> {
42 fn clone(&self) -> Self {
43 Self {
44 actor_handle: self.actor_handle.clone(),
45 pending_events_sender: self.pending_events_sender.clone(),
46 shutting_down: self.shutting_down.clone(),
47 sender_delay: self.sender_delay,
48 }
49 }
50}
51
52impl<A> DelayedActionRunner<A> for TestLoopSender<A>
54where
55 A: 'static,
56{
57 fn run_later_boxed(
58 &mut self,
59 name: &str,
60 dur: Duration,
61 f: Box<dyn FnOnce(&mut A, &mut dyn DelayedActionRunner<A>) + Send + 'static>,
62 ) {
63 if self.shutting_down.load(Ordering::Relaxed) {
64 return;
65 }
66
67 let mut this = self.clone();
68 let callback = move |data: &mut TestLoopData| {
69 let actor = data.get_mut(&this.actor_handle);
70 f(actor, &mut this);
71 };
72 self.pending_events_sender.send_with_delay(
73 format!("DelayedAction {}({:?})", pretty_type_name::<A>(), name),
74 Box::new(callback),
75 dur,
76 );
77 }
78}
79
80impl<M, A> CanSend<M> for TestLoopSender<A>
81where
82 M: actix::Message + Debug + Send + 'static,
83 A: Actor + HandlerWithContext<M> + 'static,
84 M::Result: Send,
85{
86 fn send(&self, msg: M) {
87 let mut this = self.clone();
88 let description = format!("{}({:?})", pretty_type_name::<A>(), &msg);
89 let callback = move |data: &mut TestLoopData| {
90 let actor = data.get_mut(&this.actor_handle);
91 actor.handle(msg, &mut this);
92 };
93 self.pending_events_sender.send_with_delay(
94 description,
95 Box::new(callback),
96 self.sender_delay,
97 );
98 }
99}
100
101impl<M, R, A> CanSend<MessageWithCallback<M, R>> for TestLoopSender<A>
102where
103 M: actix::Message<Result = R> + Debug + Send + 'static,
104 A: Actor + HandlerWithContext<M> + 'static,
105 R: 'static + Send,
106{
107 fn send(&self, msg: MessageWithCallback<M, R>) {
108 let mut this = self.clone();
109 let description = format!("{}({:?})", pretty_type_name::<A>(), &msg.message);
110 let callback = move |data: &mut TestLoopData| {
111 let MessageWithCallback { message: msg, callback } = msg;
112 let actor = data.get_mut(&this.actor_handle);
113 let result = actor.handle(msg, &mut this);
114 callback(async move { Ok(result) }.boxed());
115 };
116 self.pending_events_sender.send_with_delay(
117 description,
118 Box::new(callback),
119 self.sender_delay,
120 );
121 }
122}
123
124impl<A> TestLoopSender<A>
125where
126 A: Actor + 'static,
127{
128 pub(crate) fn new(
129 actor_handle: TestLoopDataHandle<A>,
130 pending_events_sender: PendingEventsSender,
131 shutting_down: Arc<AtomicBool>,
132 ) -> Self {
133 Self { actor_handle, pending_events_sender, shutting_down, sender_delay: Duration::ZERO }
134 }
135
136 pub fn with_delay(self, delay: Duration) -> Self {
138 Self { sender_delay: delay, ..self }
139 }
140
141 pub fn actor_handle(&self) -> TestLoopDataHandle<A> {
142 self.actor_handle.clone()
143 }
144}
145
146fn pretty_type_name<T>() -> &'static str {
150 type_name::<T>().split("::").last().unwrap()
151}