1use crate::{ActorError, actor::Actor};
2use async_trait::async_trait;
3use std::any::Any;
4use std::sync::Arc;
5
6pub(crate) trait Messenger {
7 type TargetActor: Actor;
8
9 #[cfg(feature = "unbounded-channel")]
10 fn send(
11 tx: tokio::sync::mpsc::UnboundedSender<Message<Self::TargetActor>>,
12 msg: Arc<<Self::TargetActor as Actor>::Message>,
13 ) -> Result<(), ActorError> {
14 tx.send(Message::new(msg, None))
15 .map_err(|e| ActorError::UnboundedChannelSend(e.to_string()))
16 }
17 #[cfg(feature = "bounded-channel")]
18 async fn send(
19 tx: tokio::sync::mpsc::Sender<Message<Self::TargetActor>>,
20 msg: Arc<<Self::TargetActor as Actor>::Message>,
21 ) -> Result<(), ActorError> {
22 tx.send(Message::new(msg, None))
23 .await
24 .map_err(|e| ActorError::BoundedChannelSend(e.to_string()))
25 }
26
27 #[cfg(feature = "unbounded-channel")]
28 async fn send_and_recv(
29 tx: tokio::sync::mpsc::UnboundedSender<Message<Self::TargetActor>>,
30 msg: Arc<<Self::TargetActor as Actor>::Message>,
31 ) -> Result<<Self::TargetActor as Actor>::Result, ActorError> {
32 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
33 tx.send(Message::new(msg, Some(result_tx)))
34 .map_err(|e| ActorError::UnboundedChannelSend(e.to_string()))?;
35 Ok(result_rx.await?)
36 }
37 #[cfg(feature = "bounded-channel")]
38 async fn send_and_recv(
39 tx: tokio::sync::mpsc::Sender<Message<Self::TargetActor>>,
40 msg: Arc<<Self::TargetActor as Actor>::Message>,
41 ) -> Result<<Self::TargetActor as Actor>::Result, ActorError> {
42 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
43 tx.send(Message::new(msg, Some(result_tx)))
44 .await
45 .map_err(|e| ActorError::BoundedChannelSend(e.to_string()))?;
46 Ok(result_rx.await?)
47 }
48}
49
50#[async_trait]
51pub trait Mailbox: Send + Sync {
52 async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError>;
53 async fn send_and_recv(
54 &self,
55 msg: Arc<dyn Any + Send + Sync>,
56 ) -> Result<Box<dyn Any + Send>, ActorError>;
57}
58#[cfg(feature = "unbounded-channel")]
59pub struct TypedMailbox<A: Actor + Send + Sync> {
60 tx: tokio::sync::mpsc::UnboundedSender<Message<A>>,
61}
62#[cfg(feature = "bounded-channel")]
63pub struct TypedMailbox<A: Actor + Send + Sync> {
64 tx: tokio::sync::mpsc::Sender<Message<A>>,
65}
66
67#[cfg(feature = "unbounded-channel")]
68impl<A: Actor + Send + Sync> TypedMailbox<A> {
69 pub fn new(tx: tokio::sync::mpsc::UnboundedSender<Message<A>>) -> Self {
70 Self { tx }
71 }
72}
73#[cfg(feature = "bounded-channel")]
74impl<A: Actor + Send + Sync> TypedMailbox<A> {
75 pub fn new(tx: tokio::sync::mpsc::Sender<Message<A>>) -> Self {
76 Self { tx }
77 }
78}
79
80impl<A: Actor + Send + Sync> Messenger for TypedMailbox<A> {
81 type TargetActor = A;
82}
83
84#[async_trait]
85impl<A> Mailbox for TypedMailbox<A>
86where
87 A: Actor + Send + Sync + 'static,
88 A::Message: Any + Send + Sync + 'static,
89 A::Result: Any + Send + 'static,
90{
91 #[cfg(feature = "unbounded-channel")]
92 async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError> {
93 let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
94 <Self as Messenger>::send(self.tx.clone(), msg)
95 }
96
97 #[cfg(feature = "bounded-channel")]
98 async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError> {
99 let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
100 <Self as Messenger>::send(self.tx.clone(), msg).await
101 }
102
103 async fn send_and_recv(
104 &self,
105 msg: Arc<dyn Any + Send + Sync>,
106 ) -> Result<Box<dyn Any + Send>, ActorError> {
107 let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
108 let result = <Self as Messenger>::send_and_recv(self.tx.clone(), msg).await?;
109 Ok(Box::new(result))
110 }
111}
112
113#[derive(Debug)]
114pub struct Message<T: Actor> {
120 inner: Arc<<T as Actor>::Message>,
121 result_tx: Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>>,
122}
123impl<T> Clone for Message<T>
124where
125 T: Actor,
126{
127 fn clone(&self) -> Self {
128 Self {
129 inner: self.inner.clone(),
130 result_tx: None,
131 }
132 }
133}
134
135impl<T> Message<T>
136where
137 T: Actor,
138{
139 pub fn new(
140 inner: Arc<<T as Actor>::Message>,
141 result_tx: Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>>,
142 ) -> Self {
143 Self { inner, result_tx }
144 }
145
146 pub fn inner(&self) -> Arc<<T as Actor>::Message> {
147 self.inner.clone()
148 }
149
150 pub fn result_tx(&mut self) -> Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>> {
151 let tx = self.result_tx.take();
152 self.result_tx = None;
153 tx
154 }
155}
156
/// Parameters describing when and how often a job runs.
///
/// NOTE(review): field meanings below are inferred from their names and from
/// how [`JobSpec::new`] normalizes them — confirm against the code that
/// consumes a `JobSpec`.
#[derive(Debug, Clone)]
pub struct JobSpec {
    /// Presumably the cap on the number of runs; always `Some(1)` when no
    /// interval is set.
    max_iter: Option<usize>,
    /// Presumably the delay between consecutive runs; `None` means no interval.
    interval: Option<std::time::Duration>,
    /// Presumably the wall-clock time of the first run.
    start_at: std::time::SystemTime,
}

impl JobSpec {
    /// Builds a spec from the given values.
    ///
    /// When `interval` is `None`, the supplied `max_iter` is ignored and stored
    /// as `Some(1)`. When `interval` is `Some`, `max_iter` is stored unchanged
    /// (including `None`).
    pub fn new(
        max_iter: Option<usize>,
        interval: Option<std::time::Duration>,
        start_at: std::time::SystemTime,
    ) -> Self {
        // No interval: pin the iteration count to exactly one.
        let max_iter = if interval.is_some() {
            max_iter
        } else {
            Some(1)
        };
        Self {
            max_iter,
            interval,
            start_at,
        }
    }

    /// Returns the stored iteration cap.
    pub fn max_iter(&self) -> Option<usize> {
        self.max_iter
    }

    /// Returns the stored start time.
    pub fn start_at(&self) -> std::time::SystemTime {
        self.start_at
    }

    /// Returns the stored interval, if any.
    pub fn interval(&self) -> Option<std::time::Duration> {
        self.interval
    }
}

impl Default for JobSpec {
    /// A spec with `max_iter = Some(1)`, no interval, and `start_at` set to the
    /// system time at the moment of the call.
    fn default() -> Self {
        Self::new(Some(1), None, std::time::SystemTime::now())
    }
}