Skip to main content

xan_actor/
types.rs

1use crate::channel;
2use crate::{ActorError, actor::Actor};
3use async_trait::async_trait;
4use std::any::Any;
5use std::sync::Arc;
6
7/// Internal helper trait that bridges the channel-specific sender into the
8/// `Mailbox` abstraction. Implemented automatically for `TypedMailbox<A>`;
9/// you should never need to implement it manually.
10pub(crate) trait Messenger {
11    type TargetActor: Actor;
12
13    async fn send(
14        tx: channel::Sender<Message<Self::TargetActor>>,
15        msg: Arc<<Self::TargetActor as Actor>::Message>,
16    ) -> Result<(), ActorError> {
17        channel::send(&tx, Message::new(msg, None))
18            .await
19            .map_err(|e| ActorError::ChannelSend(e.to_string()))
20    }
21
22    async fn send_and_recv(
23        tx: channel::Sender<Message<Self::TargetActor>>,
24        msg: Arc<<Self::TargetActor as Actor>::Message>,
25    ) -> Result<<Self::TargetActor as Actor>::Result, ActorError> {
26        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
27        channel::send(&tx, Message::new(msg, Some(result_tx)))
28            .await
29            .map_err(|e| ActorError::ChannelSend(e.to_string()))?;
30        Ok(result_rx.await?)
31    }
32}
33
/// Type-erased mailbox interface stored in the system's actor map.
///
/// `actor_system_loop` keeps `Arc<dyn Mailbox>` per address so the loop
/// can route arbitrary actor types through one HashMap. The payload type
/// is `Arc<dyn Any + Send + Sync>`; the concrete `TypedMailbox<A>` impl
/// downcasts it to `A::Message` before sending.
///
/// The trait goes through `#[async_trait]` so its `async fn`s can be
/// called on a `dyn Mailbox` trait object.
#[async_trait]
pub trait Mailbox: Send + Sync {
    /// Fire-and-forget send. Downcast the payload to `A::Message` and push
    /// it onto the actor's mpsc channel.
    ///
    /// # Errors
    ///
    /// The `TypedMailbox<A>` implementation returns
    /// `ActorError::MessageTypeMismatch` when the payload is not an
    /// `A::Message`, and `ActorError::ChannelSend` when the channel push
    /// fails.
    async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError>;
    /// Send and await the actor's reply. The reply is returned as a boxed
    /// `Any`; the caller downcasts to `A::Result`.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`Mailbox::send`], plus whatever error the
    /// reply channel produces while the reply is awaited.
    async fn send_and_recv(
        &self,
        msg: Arc<dyn Any + Send + Sync>,
    ) -> Result<Box<dyn Any + Send>, ActorError>;
}
52
53/// Concrete `Mailbox` for a specific actor type `A`. Holds the channel
54/// sender to the actor's run loop and performs the type-safe downcast on
55/// each send. Constructed in `Actor::run_actor` and erased to
56/// `Arc<dyn Mailbox>` for storage in the system's actor map.
57pub struct TypedMailbox<A: Actor + Send + Sync> {
58    tx: channel::Sender<Message<A>>,
59}
60
61impl<A: Actor + Send + Sync> TypedMailbox<A> {
62    /// Wrap an mpsc sender. Only called from `Actor::run_actor`.
63    pub fn new(tx: channel::Sender<Message<A>>) -> Self {
64        Self { tx }
65    }
66}
67
68impl<A: Actor + Send + Sync> Messenger for TypedMailbox<A> {
69    type TargetActor = A;
70}
71
72#[async_trait]
73impl<A> Mailbox for TypedMailbox<A>
74where
75    A: Actor + Send + Sync + 'static,
76    A::Message: Any + Send + Sync + 'static,
77    A::Result: Any + Send + 'static,
78{
79    async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError> {
80        let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
81        <Self as Messenger>::send(self.tx.clone(), msg).await
82    }
83
84    async fn send_and_recv(
85        &self,
86        msg: Arc<dyn Any + Send + Sync>,
87    ) -> Result<Box<dyn Any + Send>, ActorError> {
88        let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
89        let result = <Self as Messenger>::send_and_recv(self.tx.clone(), msg).await?;
90        Ok(Box::new(result))
91    }
92}
93
94/// Internal channel envelope wrapping the user's `<T as Actor>::Message`
95/// plus an optional `oneshot` reply channel.
96///
97/// Users never construct this directly — `send` / `send_and_recv` wrap
98/// the user's message into a `Message<T>` before pushing it onto the
99/// mailbox channel. Exposed so advanced callers can drive the mailbox
100/// directly via `handler_tx`. (Not in the prelude — the name is too
101/// close to `Self::Message` and would shadow user types.)
102///
103/// `Clone` is implemented but drops `result_tx` on the clone (you can't
104/// clone a oneshot sender). Clones therefore become fire-and-forget
105/// envelopes regardless of the original's reply channel.
106#[derive(Debug)]
107pub struct Message<T: Actor> {
108    inner: Arc<<T as Actor>::Message>,
109    result_tx: Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>>,
110}
111impl<T> Clone for Message<T>
112where
113    T: Actor,
114{
115    fn clone(&self) -> Self {
116        Self {
117            inner: self.inner.clone(),
118            result_tx: None,
119        }
120    }
121}
122
123impl<T> Message<T>
124where
125    T: Actor,
126{
127    /// Build an envelope. `result_tx = Some(_)` makes the send wait for a
128    /// reply (`send_and_recv` flow); `None` is fire-and-forget (`send`).
129    pub fn new(
130        inner: Arc<<T as Actor>::Message>,
131        result_tx: Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>>,
132    ) -> Self {
133        Self { inner, result_tx }
134    }
135
136    /// Clone-out the inner payload `Arc` for the receiving handler. The
137    /// envelope keeps its own reference until dropped.
138    pub fn inner(&self) -> Arc<<T as Actor>::Message> {
139        self.inner.clone()
140    }
141
142    /// Take the reply channel (if any), leaving `None` behind. The actor
143    /// loop calls this once after `handle` produces its `Ok` result and
144    /// uses the returned `Sender` to deliver the reply.
145    pub fn result_tx(&mut self) -> Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>> {
146        let tx = self.result_tx.take();
147        self.result_tx = None;
148        tx
149    }
150}
151
/// Schedule for a job submitted via `ActorSystem::run_job`.
///
/// - `max_iter` — cap on iterations. `None` = run forever (until
///   `abort_job` or the actor disappears). `Some(n)` exits after `n`
///   successful iterations.
/// - `interval` — wait between iterations. `None` is a single-shot job
///   (`max_iter` is forced to `Some(1)` in `new`).
/// - `start_at` — earliest moment the first iteration may run. If in the
///   past, the job starts immediately. If in the future, the job loop
///   sleeps until then before the first dispatch.
#[derive(Debug, Clone)]
pub struct JobSpec {
    /// Iteration cap; `None` means unbounded.
    max_iter: Option<usize>,
    /// Delay between iterations; `None` marks a single-shot job.
    interval: Option<std::time::Duration>,
    /// Earliest moment the first iteration may run.
    start_at: std::time::SystemTime,
}

impl JobSpec {
    /// Build a `JobSpec`.
    ///
    /// If `interval` is `None`, `max_iter` is forced to `Some(1)`
    /// regardless of the value you pass — single-shot jobs always run
    /// exactly once. With an interval present, `max_iter` is stored
    /// unchanged.
    pub fn new(
        max_iter: Option<usize>,
        interval: Option<std::time::Duration>,
        start_at: std::time::SystemTime,
    ) -> Self {
        // Without an interval there is no delay to repeat on, so the
        // caller's cap is overridden and the job runs exactly once.
        let max_iter = if interval.is_none() {
            Some(1)
        } else {
            max_iter
        };
        Self {
            max_iter,
            interval,
            start_at,
        }
    }

    /// Maximum number of iterations, or `None` for infinite.
    pub fn max_iter(&self) -> Option<usize> {
        self.max_iter
    }

    /// Earliest moment the first iteration may run.
    pub fn start_at(&self) -> std::time::SystemTime {
        self.start_at
    }

    /// Time between iterations, or `None` for single-shot.
    pub fn interval(&self) -> Option<std::time::Duration> {
        self.interval
    }
}

impl Default for JobSpec {
    /// Single-shot, runs immediately. Equivalent to
    /// `JobSpec::new(Some(1), None, SystemTime::now())`.
    fn default() -> Self {
        // Go through `new` so the single-shot rule lives in one place.
        Self::new(Some(1), None, std::time::SystemTime::now())
    }
}
220
221/// Handle returned by `run_job`.
222///
223/// `job_id` is stable for the life of the job — pass it to
224/// `abort_job` / `stop_job` / `resume_job`. `result_subscriber_rx` is
225/// `Some` only when `run_job` was called with `subscribe = true`; each
226/// iteration's outcome is pushed onto it. When the job ends (max_iter
227/// reached, aborted, or single-shot completion) the channel is closed,
228/// so `recv()` returning `None` is the natural termination signal.
229#[derive(Debug)]
230pub struct RunJobResult<T: Actor> {
231    /// Identifier you pass to `abort_job` / `stop_job` / `resume_job`.
232    /// Defaults to a fresh UUID v4 unless an explicit id was supplied.
233    pub job_id: String,
234    /// Per-iteration result stream. `None` when the job was submitted
235    /// with `subscribe = false` (fire-and-forget).
236    pub result_subscriber_rx: Option<channel::Receiver<Result<<T as Actor>::Result, ActorError>>>,
237}
238
/// Control-plane senders for a running job, kept by `actor_system_loop`
/// and looked up by `job_id` when the user calls `abort_job` / `stop_job`
/// / `resume_job`. Each channel carries `()` signals only; the job loop
/// awakens via `tokio::select!` and decides what to do.
///
/// Users don't construct or inspect this directly — it lives behind the
/// `abort_job` / `stop_job` / `resume_job` methods on `ActorSystem`.
///
/// The struct derives `Clone`, so a copy holds clones of the same three
/// senders.
#[derive(Clone)]
pub struct JobController {
    /// Signal the job loop to exit immediately. Honored at every wait
    /// point (`start_at`, pause-for-resume, inter-iteration sleep).
    pub abort_tx: channel::Sender<()>,
    /// Signal the job loop to pause after the current iteration. The
    /// loop then waits on `resume_tx` (or `abort_tx`) before continuing.
    pub stop_tx: channel::Sender<()>,
    /// Signal a stopped job to continue iterating.
    pub resume_tx: channel::Sender<()>,
}