Skip to main content

xan_actor/
types.rs

1use crate::{ActorError, actor::Actor};
2use async_trait::async_trait;
3use std::any::Any;
4use std::sync::Arc;
5
6pub(crate) trait Messenger {
7    type TargetActor: Actor;
8
9    #[cfg(feature = "unbounded-channel")]
10    fn send(
11        tx: tokio::sync::mpsc::UnboundedSender<Message<Self::TargetActor>>,
12        msg: Arc<<Self::TargetActor as Actor>::Message>,
13    ) -> Result<(), ActorError> {
14        tx.send(Message::new(msg, None))
15            .map_err(|e| ActorError::UnboundedChannelSend(e.to_string()))
16    }
17    #[cfg(feature = "bounded-channel")]
18    async fn send(
19        tx: tokio::sync::mpsc::Sender<Message<Self::TargetActor>>,
20        msg: Arc<<Self::TargetActor as Actor>::Message>,
21    ) -> Result<(), ActorError> {
22        tx.send(Message::new(msg, None))
23            .await
24            .map_err(|e| ActorError::BoundedChannelSend(e.to_string()))
25    }
26
27    #[cfg(feature = "unbounded-channel")]
28    async fn send_and_recv(
29        tx: tokio::sync::mpsc::UnboundedSender<Message<Self::TargetActor>>,
30        msg: Arc<<Self::TargetActor as Actor>::Message>,
31    ) -> Result<<Self::TargetActor as Actor>::Result, ActorError> {
32        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
33        tx.send(Message::new(msg, Some(result_tx)))
34            .map_err(|e| ActorError::UnboundedChannelSend(e.to_string()))?;
35        Ok(result_rx.await?)
36    }
37    #[cfg(feature = "bounded-channel")]
38    async fn send_and_recv(
39        tx: tokio::sync::mpsc::Sender<Message<Self::TargetActor>>,
40        msg: Arc<<Self::TargetActor as Actor>::Message>,
41    ) -> Result<<Self::TargetActor as Actor>::Result, ActorError> {
42        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
43        tx.send(Message::new(msg, Some(result_tx)))
44            .await
45            .map_err(|e| ActorError::BoundedChannelSend(e.to_string()))?;
46        Ok(result_rx.await?)
47    }
48}
49
50#[async_trait]
51pub trait Mailbox: Send + Sync {
52    async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError>;
53    async fn send_and_recv(
54        &self,
55        msg: Arc<dyn Any + Send + Sync>,
56    ) -> Result<Box<dyn Any + Send>, ActorError>;
57}
58#[cfg(feature = "unbounded-channel")]
59pub struct TypedMailbox<A: Actor + Send + Sync> {
60    tx: tokio::sync::mpsc::UnboundedSender<Message<A>>,
61}
62#[cfg(feature = "bounded-channel")]
63pub struct TypedMailbox<A: Actor + Send + Sync> {
64    tx: tokio::sync::mpsc::Sender<Message<A>>,
65}
66
67#[cfg(feature = "unbounded-channel")]
68impl<A: Actor + Send + Sync> TypedMailbox<A> {
69    pub fn new(tx: tokio::sync::mpsc::UnboundedSender<Message<A>>) -> Self {
70        Self { tx }
71    }
72}
73#[cfg(feature = "bounded-channel")]
74impl<A: Actor + Send + Sync> TypedMailbox<A> {
75    pub fn new(tx: tokio::sync::mpsc::Sender<Message<A>>) -> Self {
76        Self { tx }
77    }
78}
79
80impl<A: Actor + Send + Sync> Messenger for TypedMailbox<A> {
81    type TargetActor = A;
82}
83
84#[async_trait]
85impl<A> Mailbox for TypedMailbox<A>
86where
87    A: Actor + Send + Sync + 'static,
88    A::Message: Any + Send + Sync + 'static,
89    A::Result: Any + Send + 'static,
90{
91    #[cfg(feature = "unbounded-channel")]
92    async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError> {
93        let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
94        <Self as Messenger>::send(self.tx.clone(), msg)
95    }
96
97    #[cfg(feature = "bounded-channel")]
98    async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError> {
99        let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
100        <Self as Messenger>::send(self.tx.clone(), msg).await
101    }
102
103    async fn send_and_recv(
104        &self,
105        msg: Arc<dyn Any + Send + Sync>,
106    ) -> Result<Box<dyn Any + Send>, ActorError> {
107        let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
108        let result = <Self as Messenger>::send_and_recv(self.tx.clone(), msg).await?;
109        Ok(Box::new(result))
110    }
111}
112
113#[derive(Debug)]
114/// A message that can be sent to a worker thread.
115/// > You don't need to implement this trait.
116/// > It is a wrapper for data that you sent.
117/// This message contains the data to be processed and a result channel.
118/// The result channel is used to send the result back to the sender.
119pub struct Message<T: Actor> {
120    inner: Arc<<T as Actor>::Message>,
121    result_tx: Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>>,
122}
123impl<T> Clone for Message<T>
124where
125    T: Actor,
126{
127    fn clone(&self) -> Self {
128        Self {
129            inner: self.inner.clone(),
130            result_tx: None,
131        }
132    }
133}
134
135impl<T> Message<T>
136where
137    T: Actor,
138{
139    pub fn new(
140        inner: Arc<<T as Actor>::Message>,
141        result_tx: Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>>,
142    ) -> Self {
143        Self { inner, result_tx }
144    }
145
146    pub fn inner(&self) -> Arc<<T as Actor>::Message> {
147        self.inner.clone()
148    }
149
150    pub fn result_tx(&mut self) -> Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>> {
151        let tx = self.result_tx.take();
152        self.result_tx = None;
153        tx
154    }
155}
156
157#[derive(Debug, Clone)]
158/// A specification for a job that can be executed by a worker thread.
159/// The `max_iter` is the maximum number of iterations the job will be executed.
160/// If `max_iter` is `None`, the job will be executed indefinitely.
161/// The `interval` is the time between two iterations of the job.
162/// If the `interval` is `None`, the job will be executed only once.
163/// The `start_at` is the time when the job will start executing.
164/// If the `start_at` is in the past, the job will start executing immediately.
165pub struct JobSpec {
166    max_iter: Option<usize>,
167    interval: Option<std::time::Duration>,
168    start_at: std::time::SystemTime,
169}
170
171impl JobSpec {
172    pub fn new(
173        max_iter: Option<usize>,
174        interval: Option<std::time::Duration>,
175        start_at: std::time::SystemTime,
176    ) -> Self {
177        if let None = interval {
178            Self {
179                max_iter: Some(1),
180                interval,
181                start_at,
182            }
183        } else {
184            Self {
185                max_iter,
186                interval,
187                start_at,
188            }
189        }
190    }
191
192    pub fn max_iter(&self) -> Option<usize> {
193        self.max_iter
194    }
195
196    pub fn start_at(&self) -> std::time::SystemTime {
197        self.start_at
198    }
199
200    pub fn interval(&self) -> Option<std::time::Duration> {
201        self.interval
202    }
203}
204
205impl Default for JobSpec {
206    fn default() -> Self {
207        Self {
208            max_iter: Some(1),
209            interval: None,
210            start_at: std::time::SystemTime::now(),
211        }
212    }
213}