near_async/test_loop/
sender.rs1use std::any::type_name;
2use std::fmt::Debug;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5
6use crate::futures::DelayedActionRunner;
7use crate::messaging::{Actor, AsyncSendError, CanSend, CanSendAsync, HandlerWithContext};
8use crate::time::Duration;
9
10use super::PendingEventsSender;
11use super::data::{TestLoopData, TestLoopDataHandle};
12use futures::FutureExt;
13use futures::channel::oneshot;
14use futures::future::BoxFuture;
15
16pub struct TestLoopSender<A>
33where
34 A: 'static,
35{
36 actor_handle: TestLoopDataHandle<A>,
37 pending_events_sender: PendingEventsSender,
38 shutting_down: Arc<AtomicBool>,
39 sender_delay: Duration,
40}
41
42impl<A> Clone for TestLoopSender<A> {
43 fn clone(&self) -> Self {
44 Self {
45 actor_handle: self.actor_handle.clone(),
46 pending_events_sender: self.pending_events_sender.clone(),
47 shutting_down: self.shutting_down.clone(),
48 sender_delay: self.sender_delay,
49 }
50 }
51}
52
53impl<A> DelayedActionRunner<A> for TestLoopSender<A>
55where
56 A: 'static,
57{
58 fn run_later_boxed(
59 &mut self,
60 name: &str,
61 dur: Duration,
62 f: Box<dyn FnOnce(&mut A, &mut dyn DelayedActionRunner<A>) + Send + 'static>,
63 ) {
64 if self.shutting_down.load(Ordering::Relaxed) {
65 return;
66 }
67
68 let mut this = self.clone();
69 let callback = move |data: &mut TestLoopData| {
70 let actor = data.get_mut(&this.actor_handle);
71 f(actor, &mut this);
72 };
73 self.pending_events_sender.send_with_delay(
74 format!("DelayedAction {}({:?})", pretty_type_name::<A>(), name),
75 Box::new(callback),
76 dur,
77 );
78 }
79}
80
81impl<M, A> CanSend<M> for TestLoopSender<A>
82where
83 M: Debug + Send + 'static,
84 A: Actor + HandlerWithContext<M> + 'static,
85{
86 fn send(&self, msg: M) {
87 let mut this = self.clone();
88 let description = format!("{}({:?})", pretty_type_name::<A>(), &msg);
89 let callback = move |data: &mut TestLoopData| {
90 let actor = data.get_mut(&this.actor_handle);
91 actor.handle(msg, &mut this);
92 };
93 self.pending_events_sender.send_with_delay(
94 description,
95 Box::new(callback),
96 self.sender_delay,
97 );
98 }
99}
100
101impl<M, R, A> CanSendAsync<M, R> for TestLoopSender<A>
102where
103 M: Debug + Send + 'static,
104 A: Actor + HandlerWithContext<M, R> + 'static,
105 R: Send + 'static,
106{
107 fn send_async(&self, msg: M) -> BoxFuture<'static, Result<R, AsyncSendError>> {
108 let mut this = self.clone();
109 let description = format!("{}({:?})", pretty_type_name::<A>(), &msg);
110 let (sender, receiver) = oneshot::channel::<R>();
111 let callback = move |data: &mut TestLoopData| {
112 let actor = data.get_mut(&this.actor_handle);
113 let result = actor.handle(msg, &mut this);
114 sender.send(result).ok();
115 };
116 self.pending_events_sender.send_with_delay(
117 description,
118 Box::new(callback),
119 self.sender_delay,
120 );
121 async move { Ok(receiver.await.unwrap()) }.boxed()
122 }
123}
124
125impl<A> TestLoopSender<A>
126where
127 A: Actor + 'static,
128{
129 pub(crate) fn new(
130 actor_handle: TestLoopDataHandle<A>,
131 pending_events_sender: PendingEventsSender,
132 shutting_down: Arc<AtomicBool>,
133 ) -> Self {
134 Self { actor_handle, pending_events_sender, shutting_down, sender_delay: Duration::ZERO }
135 }
136
137 pub fn with_delay(self, delay: Duration) -> Self {
139 Self { sender_delay: delay, ..self }
140 }
141
142 pub fn actor_handle(&self) -> TestLoopDataHandle<A> {
143 self.actor_handle.clone()
144 }
145}
146
/// Returns the name of `T` with its leading module path removed.
///
/// Only the path in front of the outermost type is stripped, so generic
/// arguments are kept verbatim, e.g. `a::b::Foo` becomes `Foo` and
/// `a::Wrapper<c::Inner>` becomes `Wrapper<c::Inner>`.
///
/// The text comes from `std::any::type_name`, which is intended for
/// diagnostics; its exact format is not guaranteed to be stable.
fn pretty_type_name<T>() -> &'static str {
    let full_name = type_name::<T>();
    // Path separators inside generic arguments must not be treated as the
    // module path of the outer type, so only search before the first `<`.
    let outer_end = full_name.find('<').unwrap_or(full_name.len());
    match full_name[..outer_end].rfind("::") {
        Some(separator) => &full_name[separator + "::".len()..],
        None => full_name,
    }
}