xan_actor/types.rs
1use crate::channel;
2use crate::{ActorError, actor::Actor};
3use async_trait::async_trait;
4use std::any::Any;
5use std::sync::Arc;
6
7/// Internal helper trait that bridges the channel-specific sender into the
8/// `Mailbox` abstraction. Implemented automatically for `TypedMailbox<A>`;
9/// you should never need to implement it manually.
10pub(crate) trait Messenger {
11 type TargetActor: Actor;
12
13 async fn send(
14 tx: channel::Sender<Message<Self::TargetActor>>,
15 msg: Arc<<Self::TargetActor as Actor>::Message>,
16 ) -> Result<(), ActorError> {
17 channel::send(&tx, Message::new(msg, None))
18 .await
19 .map_err(|e| ActorError::ChannelSend(e.to_string()))
20 }
21
22 async fn send_and_recv(
23 tx: channel::Sender<Message<Self::TargetActor>>,
24 msg: Arc<<Self::TargetActor as Actor>::Message>,
25 ) -> Result<<Self::TargetActor as Actor>::Result, ActorError> {
26 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
27 channel::send(&tx, Message::new(msg, Some(result_tx)))
28 .await
29 .map_err(|e| ActorError::ChannelSend(e.to_string()))?;
30 Ok(result_rx.await?)
31 }
32}
33
34/// Type-erased mailbox interface stored in the system's actor map.
35///
36/// `actor_system_loop` keeps `Arc<dyn Mailbox>` per address so the loop
37/// can route arbitrary actor types through one HashMap. The payload type
38/// is `Arc<dyn Any + Send + Sync>`; the concrete `TypedMailbox<A>` impl
39/// downcasts it to `A::Message` before sending.
40#[async_trait]
41pub trait Mailbox: Send + Sync {
42 /// Fire-and-forget send. Downcast the payload to `A::Message` and push
43 /// it onto the actor's mpsc channel.
44 async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError>;
45 /// Send and await the actor's reply. The reply is returned as a boxed
46 /// `Any`; the caller downcasts to `A::Result`.
47 async fn send_and_recv(
48 &self,
49 msg: Arc<dyn Any + Send + Sync>,
50 ) -> Result<Box<dyn Any + Send>, ActorError>;
51}
52
53/// Concrete `Mailbox` for a specific actor type `A`. Holds the channel
54/// sender to the actor's run loop and performs the type-safe downcast on
55/// each send. Constructed in `Actor::run_actor` and erased to
56/// `Arc<dyn Mailbox>` for storage in the system's actor map.
57pub struct TypedMailbox<A: Actor + Send + Sync> {
58 tx: channel::Sender<Message<A>>,
59}
60
61impl<A: Actor + Send + Sync> TypedMailbox<A> {
62 /// Wrap an mpsc sender. Only called from `Actor::run_actor`.
63 pub fn new(tx: channel::Sender<Message<A>>) -> Self {
64 Self { tx }
65 }
66}
67
68impl<A: Actor + Send + Sync> Messenger for TypedMailbox<A> {
69 type TargetActor = A;
70}
71
72#[async_trait]
73impl<A> Mailbox for TypedMailbox<A>
74where
75 A: Actor + Send + Sync + 'static,
76 A::Message: Any + Send + Sync + 'static,
77 A::Result: Any + Send + 'static,
78{
79 async fn send(&self, msg: Arc<dyn Any + Send + Sync>) -> Result<(), ActorError> {
80 let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
81 <Self as Messenger>::send(self.tx.clone(), msg).await
82 }
83
84 async fn send_and_recv(
85 &self,
86 msg: Arc<dyn Any + Send + Sync>,
87 ) -> Result<Box<dyn Any + Send>, ActorError> {
88 let msg = Arc::downcast::<A::Message>(msg).map_err(|_| ActorError::MessageTypeMismatch)?;
89 let result = <Self as Messenger>::send_and_recv(self.tx.clone(), msg).await?;
90 Ok(Box::new(result))
91 }
92}
93
94/// Internal channel envelope wrapping the user's `<T as Actor>::Message`
95/// plus an optional `oneshot` reply channel.
96///
97/// Users never construct this directly — `send` / `send_and_recv` wrap
98/// the user's message into a `Message<T>` before pushing it onto the
99/// mailbox channel. Exposed so advanced callers can drive the mailbox
100/// directly via `handler_tx`. (Not in the prelude — the name is too
101/// close to `Self::Message` and would shadow user types.)
102///
103/// `Clone` is implemented but drops `result_tx` on the clone (you can't
104/// clone a oneshot sender). Clones therefore become fire-and-forget
105/// envelopes regardless of the original's reply channel.
106#[derive(Debug)]
107pub struct Message<T: Actor> {
108 inner: Arc<<T as Actor>::Message>,
109 result_tx: Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>>,
110}
111impl<T> Clone for Message<T>
112where
113 T: Actor,
114{
115 fn clone(&self) -> Self {
116 Self {
117 inner: self.inner.clone(),
118 result_tx: None,
119 }
120 }
121}
122
123impl<T> Message<T>
124where
125 T: Actor,
126{
127 /// Build an envelope. `result_tx = Some(_)` makes the send wait for a
128 /// reply (`send_and_recv` flow); `None` is fire-and-forget (`send`).
129 pub fn new(
130 inner: Arc<<T as Actor>::Message>,
131 result_tx: Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>>,
132 ) -> Self {
133 Self { inner, result_tx }
134 }
135
136 /// Clone-out the inner payload `Arc` for the receiving handler. The
137 /// envelope keeps its own reference until dropped.
138 pub fn inner(&self) -> Arc<<T as Actor>::Message> {
139 self.inner.clone()
140 }
141
142 /// Take the reply channel (if any), leaving `None` behind. The actor
143 /// loop calls this once after `handle` produces its `Ok` result and
144 /// uses the returned `Sender` to deliver the reply.
145 pub fn result_tx(&mut self) -> Option<tokio::sync::oneshot::Sender<<T as Actor>::Result>> {
146 let tx = self.result_tx.take();
147 self.result_tx = None;
148 tx
149 }
150}
151
152/// Schedule for a job submitted via `ActorSystem::run_job`.
153///
154/// - `max_iter` — cap on iterations. `None` = run forever (until
155/// `abort_job` or the actor disappears). `Some(n)` exits after `n`
156/// successful iterations.
157/// - `interval` — wait between iterations. `None` is a single-shot job
158/// (`max_iter` is forced to `Some(1)` in `new`).
159/// - `start_at` — earliest moment the first iteration may run. If in the
160/// past, the job starts immediately. If in the future, the job loop
161/// sleeps until then before the first dispatch.
162#[derive(Debug, Clone)]
163pub struct JobSpec {
164 max_iter: Option<usize>,
165 interval: Option<std::time::Duration>,
166 start_at: std::time::SystemTime,
167}
168
169impl JobSpec {
170 /// Build a `JobSpec`. If `interval` is `None`, `max_iter` is forced
171 /// to `Some(1)` regardless of the value you pass — single-shot jobs
172 /// always run exactly once.
173 pub fn new(
174 max_iter: Option<usize>,
175 interval: Option<std::time::Duration>,
176 start_at: std::time::SystemTime,
177 ) -> Self {
178 if let None = interval {
179 Self {
180 max_iter: Some(1),
181 interval,
182 start_at,
183 }
184 } else {
185 Self {
186 max_iter,
187 interval,
188 start_at,
189 }
190 }
191 }
192
193 /// Maximum number of iterations, or `None` for infinite.
194 pub fn max_iter(&self) -> Option<usize> {
195 self.max_iter
196 }
197
198 /// Earliest moment the first iteration may run.
199 pub fn start_at(&self) -> std::time::SystemTime {
200 self.start_at
201 }
202
203 /// Time between iterations, or `None` for single-shot.
204 pub fn interval(&self) -> Option<std::time::Duration> {
205 self.interval
206 }
207}
208
209impl Default for JobSpec {
210 /// Single-shot, runs immediately. Equivalent to
211 /// `JobSpec::new(Some(1), None, SystemTime::now())`.
212 fn default() -> Self {
213 Self {
214 max_iter: Some(1),
215 interval: None,
216 start_at: std::time::SystemTime::now(),
217 }
218 }
219}
220
221/// Handle returned by `run_job`.
222///
223/// `job_id` is stable for the life of the job — pass it to
224/// `abort_job` / `stop_job` / `resume_job`. `result_subscriber_rx` is
225/// `Some` only when `run_job` was called with `subscribe = true`; each
226/// iteration's outcome is pushed onto it. When the job ends (max_iter
227/// reached, aborted, or single-shot completion) the channel is closed,
228/// so `recv()` returning `None` is the natural termination signal.
229#[derive(Debug)]
230pub struct RunJobResult<T: Actor> {
231 /// Identifier you pass to `abort_job` / `stop_job` / `resume_job`.
232 /// Defaults to a fresh UUID v4 unless an explicit id was supplied.
233 pub job_id: String,
234 /// Per-iteration result stream. `None` when the job was submitted
235 /// with `subscribe = false` (fire-and-forget).
236 pub result_subscriber_rx: Option<channel::Receiver<Result<<T as Actor>::Result, ActorError>>>,
237}
238
239/// Control-plane senders for a running job, kept by `actor_system_loop`
240/// and looked up by `job_id` when the user calls `abort_job` / `stop_job`
241/// / `resume_job`. Each channel carries `()` signals only; the job loop
242/// awakens via `tokio::select!` and decides what to do.
243///
244/// Users don't construct or inspect this directly — it lives behind the
245/// `abort_job` / `stop_job` / `resume_job` methods on `ActorSystem`.
246#[derive(Clone)]
247pub struct JobController {
248 /// Signal the job loop to exit immediately. Honored at every wait
249 /// point (`start_at`, pause-for-resume, inter-iteration sleep).
250 pub abort_tx: channel::Sender<()>,
251 /// Signal the job loop to pause after the current iteration. The
252 /// loop then waits on `resume_tx` (or `abort_tx`) before continuing.
253 pub stop_tx: channel::Sender<()>,
254 /// Signal a stopped job to continue iterating.
255 pub resume_tx: channel::Sender<()>,
256}