Skip to main content

xan_actor/
actor_system.rs

1use crate::channel::{self, SendAsync};
2use crate::{
3    Actor, ActorError, CHANNEL_SIZE, JobController, JobSpec, LifeCycle, Mailbox, MaybeCodec,
4    RunJobResult,
5};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9#[cfg(feature = "multi-node")]
10use crate::inter_node::InterNodeRuntime;
11
12/// Wire protocol between `ActorSystem` and the single `actor_system_loop`
13/// task that owns the registry. Most users never construct these directly
14/// — the public `ActorSystem` methods (`send`, `register`, `restart`,
15/// `run_job`, etc.) wrap each variant.
16///
17/// Exposed (and reachable via [`ActorSystem::handler_tx`]) so advanced
18/// callers can drive the loop manually — e.g. to issue several
19/// `FindActor` lookups without going through the cache.
20pub enum ActorSystemCmd {
21    /// Insert a fresh actor into the registry. `is_restarted = true`
22    /// permits replacing an existing entry (used by the restart cycle);
23    /// `false` rejects duplicates with `AddressAlreadyExist`.
24    Register {
25        actor_type: String,
26        #[cfg(not(feature = "multi-node"))]
27        address: String,
28        #[cfg(feature = "multi-node")]
29        address: crate::inter_node::Address,
30        mailbox: Arc<dyn Mailbox>,
31        restart_tx: channel::Sender<()>,
32        kill_tx: channel::Sender<()>,
33        life_cycle: LifeCycle,
34        result_tx: tokio::sync::oneshot::Sender<Result<(), ActorError>>,
35        is_restarted: bool,
36    },
37    /// Trigger the restart cycle for every actor whose name matches the
38    /// regex. Backs `ActorSystem::restart`.
39    Restart {
40        address_regex: String,
41    },
42    /// Tear down every actor whose name matches the regex. Backs
43    /// `ActorSystem::unregister`.
44    Unregister {
45        address_regex: String,
46    },
47    /// Snapshot of address names matching the regex. Backs
48    /// `filter_address` and the local side of `send_broadcast`.
49    FilterAddress {
50        address_regex: String,
51        result_tx: tokio::sync::oneshot::Sender<Vec<String>>,
52    },
53    /// Look up a specific actor by `(actor_type, address)`. Returns the
54    /// mailbox plus a `ready` flag (false while the actor is in
55    /// `Starting`/`Restarting`). Backs the send-family methods'
56    /// post-cache lookups.
57    FindActor {
58        actor_type: String,
59        address: String,
60        result_tx: tokio::sync::oneshot::Sender<
61            Option<(Arc<dyn Mailbox>, bool)>, // mailbox, ready
62        >,
63    },
64    /// Update an actor's `LifeCycle`. Pushed by the actor's own
65    /// `run_actor` loop at each transition.
66    SetLifeCycle {
67        address: String,
68        life_cycle: LifeCycle,
69    },
70    /// Register a job's `JobController` under `job_id`. Pushed by
71    /// `run_job` after spawning the job loop, so `abort_job` / `stop_job`
72    /// / `resume_job` can later look it up.
73    RegisterJob {
74        job_id: String,
75        controller: JobController,
76    },
77    /// Look up a `JobController` by id. Backs `abort_job` / `stop_job` /
78    /// `resume_job`.
79    FindJob {
80        job_id: String,
81        result_tx: tokio::sync::oneshot::Sender<Option<JobController>>,
82    },
83}
84
85#[derive(Clone)]
86/// Main entry point for spawning, addressing, and messaging actors.
87///
88/// Owns one background task — the `actor_system_loop` — that holds the
89/// authoritative registry of `(actor_type, address) → mailbox` mappings.
90/// All public methods funnel through that loop via [`Self::handler_tx`],
91/// which keeps registry mutations strictly sequential.
92///
93/// `Clone` is cheap: clones share the same handler channel and (under
94/// `multi-node`) the same `InterNodeRuntime`. Each clone has its own
95/// `cache` (a local `HashMap` of mailboxes for fast resends without
96/// touching the loop), so caching is per-clone, not global.
97///
98/// Under the `multi-node` feature, an `ActorSystem` may also carry a
99/// xanq broker connection; `send`/`send_and_recv`/`run_job` automatically
100/// route to the right node based on `Address::node`.
101pub struct ActorSystem {
102    handler_tx: channel::Sender<ActorSystemCmd>,
103    cache: HashMap<String, (String, Arc<dyn Mailbox>)>,
104    channel_size: usize,
105    /// The cluster identity of this system. Always set when `multi-node` is on.
106    #[cfg(feature = "multi-node")]
107    node_name: String,
108    /// Some when a broker connection was provided; None means local-only
109    /// (sends to other nodes will fail with `InterNodeNotConfigured`).
110    #[cfg(feature = "multi-node")]
111    inter_node: Option<InterNodeRuntime>,
112}
113
114#[cfg(not(feature = "multi-node"))]
115impl Default for ActorSystem {
116    fn default() -> Self {
117        let (handler_tx, handler_rx) = channel::channel(CHANNEL_SIZE);
118        let mut me = Self {
119            handler_tx,
120            cache: HashMap::new(),
121            channel_size: CHANNEL_SIZE,
122        };
123        me.run(handler_rx);
124        me
125    }
126}
127
128impl ActorSystem {
129    /// Spin up a new `ActorSystem` and its backing loop task.
130    ///
131    /// `channel_size` overrides the default `CHANNEL_SIZE` (4096) for
132    /// the command channel that feeds the loop; `None` keeps the
133    /// default. Returned eagerly — the loop task is already running by
134    /// the time this returns.
135    #[cfg(not(feature = "multi-node"))]
136    pub fn new(channel_size: Option<usize>) -> Self {
137        let (handler_tx, handler_rx) =
138            channel::channel(channel_size.unwrap_or(CHANNEL_SIZE));
139        let mut me = Self {
140            handler_tx,
141            cache: HashMap::new(),
142            channel_size: channel_size.unwrap_or(CHANNEL_SIZE),
143        };
144        me.run(handler_rx);
145        me
146    }
147
/// Build a cluster-aware `ActorSystem` (requires the `multi-node` feature).
///
/// * `channel_size` — capacity of the command channel; `None` uses
///   `CHANNEL_SIZE`.
/// * `node_name` — this system's identity in the cluster. It is embedded in
///   every [`Address`] the system owns and used as the xanq topic prefix.
/// * `broker_addr` — `Some(addr)` connects to a xanq broker so the system
///   can take part in inter-node delivery. `None` keeps it local-only:
///   cross-node `send` / `send_and_recv` / `run_job` then fail with
///   [`ActorError::InterNodeNotConfigured`].
///
/// With a broker, the connection attempt is bounded by
/// [`DEFAULT_BROKER_CONNECT_TIMEOUT`], so a missing broker fails promptly.
/// Once connected, the inter-node request/response consumer tasks are
/// started here; no further setup is needed.
///
/// # Errors
///
/// Propagates any error from connecting to the broker or from starting the
/// consumer tasks.
///
/// [`Address`]: crate::inter_node::Address
/// [`DEFAULT_BROKER_CONNECT_TIMEOUT`]: crate::inter_node::DEFAULT_BROKER_CONNECT_TIMEOUT
#[cfg(feature = "multi-node")]
pub async fn new(
    channel_size: Option<usize>,
    node_name: String,
    broker_addr: Option<String>,
) -> Result<Self, ActorError> {
    let capacity = channel_size.unwrap_or(CHANNEL_SIZE);
    let (handler_tx, handler_rx) = channel::channel(capacity);

    let inter_node = if let Some(addr) = broker_addr {
        Some(InterNodeRuntime::connect(node_name.clone(), addr).await?)
    } else {
        None
    };

    let mut system = Self {
        handler_tx,
        cache: HashMap::new(),
        channel_size: capacity,
        node_name,
        inter_node: inter_node.clone(),
    };
    system.run(handler_rx);

    // The consumers need a handle to the fully built system, so they start last.
    if let Some(runtime) = inter_node {
        runtime.start_consumers(system.clone()).await?;
    }
    Ok(system)
}
191
/// Name this node uses in the cluster, fixed at construction by `new`.
#[cfg(feature = "multi-node")]
pub fn node_name(&self) -> &str {
    self.node_name.as_str()
}
197
/// Deliver an already-decoded payload to the local actor identified by
/// `(actor_type, address)`.
///
/// Resolves on the mailbox's `send` result; it does not wait for the
/// handler to run. No retry is attempted: an actor that is registered but
/// not ready yields [`ActorError::ActorNotReady`]. A missing actor, or a
/// lookup the registry loop never answered, yields
/// [`ActorError::AddressNotFound`].
///
/// Public because the inter-node request consumer
/// ([`InterNodeRuntime::start_consumers`]) calls it after decoding an
/// incoming `InterNodeMessage::Fire` envelope. Application code should use
/// [`Self::send`] instead.
///
/// [`InterNodeRuntime::start_consumers`]: crate::inter_node::InterNodeRuntime::start_consumers
#[cfg(feature = "multi-node")]
pub async fn dispatch_local_any(
    &self,
    actor_type: String,
    address: String,
    payload: Arc<dyn std::any::Any + Send + Sync>,
) -> Result<(), ActorError> {
    let (result_tx, result_rx) = tokio::sync::oneshot::channel();
    let lookup = ActorSystemCmd::FindActor {
        actor_type,
        address: address.clone(),
        result_tx,
    };
    // If this send fails, the command (and with it `result_tx`) is dropped,
    // so the await below yields `Err` and is reported as AddressNotFound.
    let _ = self.handler_tx.send_async(lookup).await;
    match result_rx.await {
        Ok(Some((mailbox, true))) => mailbox.send(payload).await,
        Ok(Some((_, false))) => Err(ActorError::ActorNotReady(address)),
        Ok(None) | Err(_) => Err(ActorError::AddressNotFound(address)),
    }
}
234
/// Request/response counterpart of [`Self::dispatch_local_any`].
///
/// Looks up the local actor `(actor_type, address)` and awaits its
/// handler's return value as a type-erased `Box<dyn Any + Send>`. The
/// caller is expected to encode that value (typically with
/// [`encode_result_for`]) before shipping it back in an
/// [`InterNodeResponse`].
///
/// Fails with [`ActorError::ActorNotReady`] if the actor exists but is not
/// ready. Fails with [`ActorError::AddressNotFound`] if the actor is
/// missing or the registry loop never answered. No retry is attempted.
///
/// [`encode_result_for`]: crate::inter_node::encode_result_for
/// [`InterNodeResponse`]: crate::inter_node::InterNodeResponse
#[cfg(feature = "multi-node")]
pub async fn dispatch_local_any_and_recv(
    &self,
    actor_type: String,
    address: String,
    payload: Arc<dyn std::any::Any + Send + Sync>,
) -> Result<Box<dyn std::any::Any + Send>, ActorError> {
    let (result_tx, result_rx) = tokio::sync::oneshot::channel();
    let lookup = ActorSystemCmd::FindActor {
        actor_type,
        address: address.clone(),
        result_tx,
    };
    // A failed send drops `result_tx`; that surfaces below as a lookup miss.
    let _ = self.handler_tx.send_async(lookup).await;
    match result_rx.await {
        Ok(Some((mailbox, true))) => mailbox.send_and_recv(payload).await,
        Ok(Some((_, false))) => Err(ActorError::ActorNotReady(address)),
        Ok(None) | Err(_) => Err(ActorError::AddressNotFound(address)),
    }
}
269
/// Inter-node variant of `run_job`.
///
/// The message is encoded with `xancode::Codec` once, up front. A single
/// background task then drives the remote actor on the `JobSpec` schedule:
///
/// 1. Wait until `start_at`.
/// 2. Deliver the message.
/// 3. Stop after `max_iter` deliveries, or if the spec has no `interval`.
/// 4. Otherwise wait `interval` and repeat.
///
/// Every wait goes through `delay_or_abort` with the job's abort, stop and
/// resume channels. The matching `JobController` is registered under
/// `job_id`, so the job can be controlled exactly like a local one.
///
/// With `subscribe == true`, each delivery is a request/response `call`.
/// Its decoded result (or error) is forwarded to the returned
/// `result_subscriber_rx`, and the channel closes when the task ends.
/// Otherwise deliveries are fire-and-forget `fire`s, their errors are
/// ignored, and no subscriber channel is returned.
#[cfg(feature = "multi-node")]
async fn spawn_remote_job<T>(
    &self,
    address: crate::inter_node::Address,
    subscribe: bool,
    job: JobSpec,
    msg: <T as Actor>::Message,
    job_id: String,
    rt: crate::inter_node::InterNodeRuntime,
) -> Result<RunJobResult<T>, ActorError>
where
    T: Actor,
    <T as Actor>::Message: MaybeCodec,
    <T as Actor>::Result: MaybeCodec,
{
    let payload_bytes = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
    let actor_type = std::any::type_name::<T>().to_string();
    let channel_size = self.channel_size;

    let (abort_tx, mut abort_rx) = channel::channel(channel_size);
    let (stop_tx, mut stop_rx) = channel::channel(channel_size);
    let (resume_tx, mut resume_rx) = channel::channel(channel_size);

    // Only subscribed jobs get a result channel. The task owns the sender
    // and drops it (closing the channel) when it exits.
    let (sub_tx, result_subscriber_rx) = if subscribe {
        let (tx, rx) = channel::channel(channel_size);
        (Some(tx), Some(rx))
    } else {
        (None, None)
    };

    // One loop serves both delivery modes; only the per-iteration delivery
    // differs, so the schedule logic is no longer duplicated.
    tokio::spawn(async move {
        let mut delivered = 0usize;
        loop {
            // After the first pass `start_at` lies in the past, so this wait is zero.
            let until_start = job
                .start_at()
                .duration_since(std::time::SystemTime::now())
                .unwrap_or(std::time::Duration::ZERO);
            if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx).await {
                return;
            }
            delivered += 1;

            match &sub_tx {
                Some(sub_tx) => {
                    let outcome: Result<<T as Actor>::Result, ActorError> = match rt
                        .call(&address, &actor_type, payload_bytes.clone())
                        .await
                    {
                        Ok(bytes) => <<T as Actor>::Result as xancode::Codec>::decode(
                            &xancode::Bytes::copy_from_slice(&bytes),
                        )
                        .map_err(|_| {
                            ActorError::InterNodeDecode(format!(
                                "decode failed for {}",
                                std::any::type_name::<<T as Actor>::Result>()
                            ))
                        }),
                        Err(e) => Err(e),
                    };
                    // A dropped subscriber does not stop the job.
                    let _ = sub_tx.send_async(outcome).await;
                }
                None => {
                    let _ = rt
                        .fire(&address, &actor_type, payload_bytes.clone())
                        .await;
                }
            }

            if let Some(max_iter) = job.max_iter() {
                if delivered >= max_iter {
                    return;
                }
            }
            let interval = match job.interval() {
                Some(d) => d,
                None => return,
            };
            if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await {
                return;
            }
        }
    });

    let _ = self
        .handler_tx
        .send_async(ActorSystemCmd::RegisterJob {
            job_id: job_id.clone(),
            controller: JobController {
                abort_tx,
                stop_tx,
                resume_tx,
            },
        })
        .await;

    Ok(RunJobResult {
        job_id,
        result_subscriber_rx,
    })
}
406
407    /// Clone of the loop's command channel.
408    ///
409    /// Lets advanced callers drive the registry directly with
410    /// [`ActorSystemCmd`] variants — typically to bypass the per-clone TX
411    /// cache or to issue several `FindActor`/`FilterAddress` lookups
412    /// without going through the helper methods. The channel is the same
413    /// one all built-in methods use, so commands are interleaved in
414    /// arrival order with regular traffic.
415    pub fn handler_tx(&self) -> channel::Sender<ActorSystemCmd> {
416        self.handler_tx.clone()
417    }
418
419    /// Snapshot of local actor addresses whose names match `address_regex`.
420    ///
421    /// Uses `*` as a wildcard (converted to the `(\S+)` regex
422    /// alternative); the pattern is anchored as `^...$`. Returns an empty
423    /// vector if the loop drops the response — never panics.
424    pub async fn filter_address(&self, address_regex: String) -> Vec<String> {
425        let (tx, rx) = tokio::sync::oneshot::channel();
426        let _ = self
427            .handler_tx
428            .send_async(ActorSystemCmd::FilterAddress {
429                address_regex,
430                result_tx: tx,
431            })
432            .await;
433        match rx.await {
434            Ok(addresses) => addresses,
435            Err(e) => {
436                error!("Receive address list failed: {:?}", e);
437                Vec::new()
438            }
439        }
440    }
441
442    /// Restart every local actor whose name matches `address_regex`
443    /// (`*` wildcard syntax, same as [`filter_address`]).
444    ///
445    /// Fire-and-forget — sends a `Restart` command and returns. Each
446    /// matched actor goes `Receiving → Stopping → Restarting → Starting →
447    /// Receiving` via the lifecycle hooks; this method does not wait for
448    /// the cycle to complete, so a `send` immediately after `restart` may
449    /// hit an actor still in `Restarting` and get [`ActorError::ActorNotReady`]
450    /// after the system's retry budget.
451    ///
452    /// [`filter_address`]: Self::filter_address
453    pub fn restart(&mut self, address_regex: String) {
454        if let Err(e) =
455            channel::try_send(&self.handler_tx, ActorSystemCmd::Restart { address_regex })
456        {
457            error!("Send restart command failed: {}", e);
458        }
459    }
460
461    /// Unregister every local actor whose name matches `address_regex`
462    /// and tear down its task. Fire-and-forget.
463    ///
464    /// Each matched actor receives a kill signal, runs `post_stop`,
465    /// transitions to `Terminated`, and is removed from the local map.
466    /// `"*"` matches everything — convenient for "kill them all" at
467    /// shutdown.
468    pub fn unregister(&mut self, address_regex: String) {
469        if let Err(e) =
470            channel::try_send(&self.handler_tx, ActorSystemCmd::Unregister { address_regex })
471        {
472            error!("Send unregister command failed: {}", e);
473        }
474    }
475
476    /// Fire-and-forget send to a single actor.
477    ///
478    /// Tries the per-clone TX cache first; on miss, asks the loop for the
479    /// mailbox via `FindActor` and (on success) caches it for next time.
480    /// If the actor is registered but not yet `Receiving`, retries up to
481    /// 10 times with 100 ms sleeps before returning
482    /// [`ActorError::ActorNotReady`].
483    ///
484    /// Under `multi-node`, if `address.node != self.node_name` the
485    /// message is encoded via `xancode::Codec` and shipped through the
486    /// broker as `InterNodeMessage::Fire` (no retry — broker delivery is
487    /// best-effort once accepted).
488    pub async fn send<T>(
489        &mut self,
490        #[cfg(not(feature = "multi-node"))] address: String,
491        #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
492        msg: <T as Actor>::Message,
493    ) -> Result<(), ActorError>
494    where
495        T: Actor,
496        <T as Actor>::Message: MaybeCodec,
497    {
498        #[cfg(feature = "multi-node")]
499        if address.node != self.node_name {
500            let rt = self
501                .inter_node
502                .as_ref()
503                .ok_or(ActorError::InterNodeNotConfigured)?
504                .clone();
505            let payload = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
506            return rt
507                .fire(&address, std::any::type_name::<T>(), payload)
508                .await;
509        }
510        #[cfg(feature = "multi-node")]
511        let address: String = address.name;
512
513        let mut retry_count = 0;
514        let actor_type = std::any::type_name::<T>();
515        let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
516        loop {
517            let (tx, rx) = tokio::sync::oneshot::channel();
518            match self.cache.entry(address.clone()) {
519                std::collections::hash_map::Entry::Occupied(o) => {
520                    let (cached_actor_type, mailbox) = o.get().clone();
521                    if actor_type == cached_actor_type {
522                        match mailbox.send(payload.clone()).await {
523                            Ok(()) => {
524                                debug!(
525                                    "Send message to actor {} through cached_tx succeeded",
526                                    address
527                                );
528                                return Ok(());
529                            }
530                            Err(e) => {
531                                warn!(
532                                    "Send message to actor {} through cached_tx failed: {:?} ... removing from cache",
533                                    address, e
534                                );
535                                self.cache.remove(&address);
536                            }
537                        }
538                    } else {
539                        warn!(
540                            "Send message with cached tx failed: cached tx of address {} and target actor {} is mismatched ... removing from cache",
541                            address, actor_type,
542                        );
543                        self.cache.remove(&address);
544                    }
545                }
546                _ => {}
547            }
548            let _ = self
549                .handler_tx
550                .send_async(ActorSystemCmd::FindActor {
551                    actor_type: actor_type.to_string(),
552                    address: address.clone(),
553                    result_tx: tx,
554                })
555                .await;
556            if let Ok(Some((tx, ready))) = rx.await {
557                if ready {
558                    debug!("Saving actor {} tx to cache", address);
559                    self.cache
560                        .insert(address.clone(), (actor_type.to_string(), tx.clone()));
561                    let _ = tx.send(payload.clone()).await?;
562                    return Ok(());
563                } else {
564                    retry_count += 1;
565                    debug!(
566                        "Actor {} not ready, retrying... ({}/10)",
567                        address, retry_count
568                    );
569                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
570                    if retry_count < 10 {
571                        continue;
572                    } else {
573                        error!("Actor {} not ready after 10 retries, giving up", address);
574                        return Err(ActorError::ActorNotReady(address));
575                    }
576                }
577            } else {
578                return Err(ActorError::AddressNotFound(address));
579            }
580        }
581    }
582
583    /// Cache-bypassing variant of [`send`].
584    ///
585    /// Always issues a fresh `FindActor` against the loop — useful when
586    /// you don't want to retain a mailbox `Arc` (e.g. one-shot sends) or
587    /// when you're calling from `&self` and the cache mutation isn't
588    /// allowed. Same retry policy and the same multi-node routing as
589    /// [`send`].
590    ///
591    /// [`send`]: Self::send
592    pub async fn send_without_tx_cache<T>(
593        &self,
594        #[cfg(not(feature = "multi-node"))] address: String,
595        #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
596        msg: <T as Actor>::Message,
597    ) -> Result<(), ActorError>
598    where
599        T: Actor,
600        <T as Actor>::Message: MaybeCodec,
601    {
602        #[cfg(feature = "multi-node")]
603        if address.node != self.node_name {
604            let rt = self
605                .inter_node
606                .as_ref()
607                .ok_or(ActorError::InterNodeNotConfigured)?
608                .clone();
609            let payload = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
610            return rt
611                .fire(&address, std::any::type_name::<T>(), payload)
612                .await;
613        }
614        #[cfg(feature = "multi-node")]
615        let address: String = address.name;
616
617        let mut retry_count = 0;
618        let actor_type = std::any::type_name::<T>();
619        let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
620        loop {
621            let (tx, rx) = tokio::sync::oneshot::channel();
622            let _ = self
623                .handler_tx
624                .send_async(ActorSystemCmd::FindActor {
625                    actor_type: actor_type.to_string(),
626                    address: address.clone(),
627                    result_tx: tx,
628                })
629                .await;
630            if let Ok(Some((tx, ready))) = rx.await {
631                if ready {
632                    let _ = tx.send(payload.clone()).await?;
633                    return Ok(());
634                } else {
635                    retry_count += 1;
636                    debug!(
637                        "Actor {} not ready, retrying... ({}/10)",
638                        address, retry_count
639                    );
640                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
641                    if retry_count < 10 {
642                        continue;
643                    } else {
644                        error!("Actor {} not ready after 10 retries, giving up", address);
645                        return Err(ActorError::ActorNotReady(address));
646                    }
647                }
648            } else {
649                return Err(ActorError::AddressNotFound(address));
650            }
651        }
652    }
653
654    /// Local regex fan-out helper (cached). One entry per matched local actor.
655    async fn broadcast_local_cached<T>(
656        &mut self,
657        address_regex: String,
658        msg: <T as Actor>::Message,
659    ) -> Vec<Result<(), ActorError>>
660    where
661        T: Actor,
662    {
663        let actor_type = std::any::type_name::<T>();
664        let (tx, rx) = tokio::sync::oneshot::channel();
665        let _ = self
666            .handler_tx
667            .send_async(ActorSystemCmd::FilterAddress {
668                address_regex,
669                result_tx: tx,
670            })
671            .await;
672        let addresses = match rx.await {
673            Ok(addresses) => addresses,
674            Err(e) => {
675                error!("Receive address list failed: {:?}", e);
676                return vec![Err(ActorError::from(e))];
677            }
678        };
679        let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
680        let mut result = Vec::new();
681        for address in addresses.iter() {
682            match self.cache.entry(address.clone()) {
683                std::collections::hash_map::Entry::Occupied(o) => {
684                    let (cached_actor_type, tx) = o.get().clone();
685                    if cached_actor_type == actor_type {
686                        match tx.send(payload.clone()).await {
687                            Ok(()) => {
688                                result.push(Ok(()));
689                                continue;
690                            }
691                            Err(e) => {
692                                warn!(
693                                    "Send message to actor {} through cached_tx failed: {:?} ... removing from cache",
694                                    address, e
695                                );
696                                self.cache.remove(address);
697                            }
698                        }
699                    } else {
700                        warn!(
701                            "Send message with cached tx failed: cached tx of address {} and target actor {} is mismatched ... removing from cache",
702                            address, actor_type,
703                        );
704                        self.cache.remove(address);
705                    }
706                }
707                _ => {}
708            }
709            let mut retry_count = 0;
710            loop {
711                let (tx, rx) = tokio::sync::oneshot::channel();
712                let _ = self
713                    .handler_tx
714                    .send_async(ActorSystemCmd::FindActor {
715                        actor_type: actor_type.to_string(),
716                        address: address.clone(),
717                        result_tx: tx,
718                    })
719                    .await;
720                if let Ok(Some((tx, ready))) = rx.await {
721                    if ready {
722                        self.cache
723                            .insert(address.clone(), (actor_type.to_string(), tx.clone()));
724                        result.push(tx.send(payload.clone()).await);
725                        break;
726                    } else {
727                        retry_count += 1;
728                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
729                        if retry_count < 10 {
730                            continue;
731                        } else {
732                            error!("Actor {} not ready after 10 retries, giving up", address);
733                            result.push(Err(ActorError::ActorNotReady(address.clone())));
734                            break;
735                        }
736                    }
737                } else {
738                    result.push(Err(ActorError::AddressNotFound(address.clone())));
739                    break;
740                }
741            }
742        }
743        result
744    }
745
746    /// Sends a message to all actors that match the given address regex.
747    /// Returns one entry per matched local actor.
748    #[cfg(not(feature = "multi-node"))]
749    pub async fn send_broadcast<T>(
750        &mut self,
751        address_regex: String,
752        msg: <T as Actor>::Message,
753    ) -> Vec<Result<(), ActorError>>
754    where
755        T: Actor,
756        <T as Actor>::Message: MaybeCodec,
757    {
758        self.broadcast_local_cached::<T>(address_regex, msg).await
759    }
760
761    /// Sends a message across the nodes selected by `filter`.
762    ///
763    /// `address_regex` is the `*`-wildcard pattern used by [`filter_address`];
764    /// on each targeted node it's matched against that node's local actor
765    /// addresses.
766    ///
767    /// Returns a [`crate::inter_node::BroadcastResult`] with two separately
768    /// counted vectors:
769    /// - `local` — per-actor results for the caller's own local fan-out
770    ///   (matched actors on this node).
771    /// - `remote` — per-peer envelope acks for `BroadcastFire` sends. Each
772    ///   peer regex-matches against *its own* local actors and dispatches
773    ///   fire-and-forget, so `remote.len()` does **not** tell you how many
774    ///   remote actors actually received the message.
775    ///
776    /// `NodeFilter::Peers` containing only the caller's own node degenerates
777    /// to a `SelfOnly` fan-out (no broker traffic).
778    ///
779    /// [`filter_address`]: crate::ActorSystem::filter_address
780    #[cfg(feature = "multi-node")]
781    pub async fn send_broadcast<T>(
782        &mut self,
783        address_regex: String,
784        filter: crate::inter_node::NodeFilter,
785        msg: <T as Actor>::Message,
786    ) -> crate::inter_node::BroadcastResult
787    where
788        T: Actor,
789        <T as Actor>::Message: MaybeCodec,
790    {
791        use crate::inter_node::{BroadcastResult, NodeFilter};
792        use std::collections::HashSet;
793
794        let (run_local, remote_nodes): (bool, Vec<String>) = {
795            let raw: Vec<String> = match filter {
796                NodeFilter::SelfOnly => vec![self.node_name.clone()],
797                NodeFilter::Node(n) => vec![n],
798                NodeFilter::Peers(ns) => ns,
799            };
800            let mut seen = HashSet::new();
801            let mut local = false;
802            let mut remote = Vec::new();
803            for node in raw {
804                if !seen.insert(node.clone()) {
805                    continue;
806                }
807                if node == self.node_name {
808                    local = true;
809                } else {
810                    remote.push(node);
811                }
812            }
813            (local, remote)
814        };
815
816        let remote: Vec<Result<(), ActorError>> = if !remote_nodes.is_empty() {
817            let rt = match self.inter_node.as_ref() {
818                Some(rt) => rt.clone(),
819                None => {
820                    return BroadcastResult {
821                        local: Vec::new(),
822                        remote: vec![Err(ActorError::InterNodeNotConfigured)],
823                    };
824                }
825            };
826            let bytes = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
827            let actor_type = std::any::type_name::<T>();
828            let mut out = Vec::with_capacity(remote_nodes.len());
829            for node in &remote_nodes {
830                out.push(
831                    rt.broadcast_fire(node, actor_type, &address_regex, bytes.clone())
832                        .await,
833                );
834            }
835            out
836        } else {
837            Vec::new()
838        };
839
840        let local = if run_local {
841            self.broadcast_local_cached::<T>(address_regex, msg).await
842        } else {
843            Vec::new()
844        };
845
846        BroadcastResult { local, remote }
847    }
848
849    /// Local regex fan-out helper (no cache). One entry per matched local actor.
850    async fn broadcast_local_uncached<T>(
851        &self,
852        address_regex: String,
853        msg: <T as Actor>::Message,
854    ) -> Vec<Result<(), ActorError>>
855    where
856        T: Actor,
857    {
858        let actor_type = std::any::type_name::<T>();
859        let (tx, rx) = tokio::sync::oneshot::channel();
860        let _ = self
861            .handler_tx
862            .send_async(ActorSystemCmd::FilterAddress {
863                address_regex,
864                result_tx: tx,
865            })
866            .await;
867        let addresses = match rx.await {
868            Ok(addresses) => addresses,
869            Err(e) => {
870                error!("Receive address list failed: {:?}", e);
871                return vec![Err(ActorError::from(e))];
872            }
873        };
874        let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
875        let mut result = Vec::new();
876        for address in addresses.iter() {
877            let mut retry_count = 0;
878            loop {
879                let (tx, rx) = tokio::sync::oneshot::channel();
880                let _ = self
881                    .handler_tx
882                    .send_async(ActorSystemCmd::FindActor {
883                        actor_type: actor_type.to_string(),
884                        address: address.clone(),
885                        result_tx: tx,
886                    })
887                    .await;
888                if let Ok(Some((tx, ready))) = rx.await {
889                    if ready {
890                        result.push(tx.send(payload.clone()).await);
891                        break;
892                    } else {
893                        retry_count += 1;
894                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
895                        if retry_count < 10 {
896                            continue;
897                        } else {
898                            error!("Actor {} not ready after 10 retries, giving up", address);
899                            result.push(Err(ActorError::ActorNotReady(address.clone())));
900                            break;
901                        }
902                    }
903                } else {
904                    result.push(Err(ActorError::AddressNotFound(address.clone())));
905                    break;
906                }
907            }
908        }
909        result
910    }
911
912    /// Non-cached version of `send_broadcast`. Same return-shape rules apply.
913    #[cfg(not(feature = "multi-node"))]
914    pub async fn send_broadcast_without_tx_cache<T>(
915        &self,
916        address_regex: String,
917        msg: <T as Actor>::Message,
918    ) -> Vec<Result<(), ActorError>>
919    where
920        T: Actor,
921        <T as Actor>::Message: MaybeCodec,
922    {
923        self.broadcast_local_uncached::<T>(address_regex, msg).await
924    }
925
926    /// Cache-bypassing variant of [`send_broadcast`] for multi-node.
927    /// Same `BroadcastResult { local, remote }` shape and the same per-actor
928    /// vs. per-peer count semantics — see `send_broadcast`'s docs for the
929    /// details. Skips the local TX cache, so every local match re-issues
930    /// `FindActor`.
931    ///
932    /// [`send_broadcast`]: Self::send_broadcast
933    #[cfg(feature = "multi-node")]
934    pub async fn send_broadcast_without_tx_cache<T>(
935        &self,
936        address_regex: String,
937        filter: crate::inter_node::NodeFilter,
938        msg: <T as Actor>::Message,
939    ) -> crate::inter_node::BroadcastResult
940    where
941        T: Actor,
942        <T as Actor>::Message: MaybeCodec,
943    {
944        use crate::inter_node::{BroadcastResult, NodeFilter};
945        use std::collections::HashSet;
946
947        let (run_local, remote_nodes): (bool, Vec<String>) = {
948            let raw: Vec<String> = match filter {
949                NodeFilter::SelfOnly => vec![self.node_name.clone()],
950                NodeFilter::Node(n) => vec![n],
951                NodeFilter::Peers(ns) => ns,
952            };
953            let mut seen = HashSet::new();
954            let mut local = false;
955            let mut remote = Vec::new();
956            for node in raw {
957                if !seen.insert(node.clone()) {
958                    continue;
959                }
960                if node == self.node_name {
961                    local = true;
962                } else {
963                    remote.push(node);
964                }
965            }
966            (local, remote)
967        };
968
969        let remote: Vec<Result<(), ActorError>> = if !remote_nodes.is_empty() {
970            let rt = match self.inter_node.as_ref() {
971                Some(rt) => rt.clone(),
972                None => {
973                    return BroadcastResult {
974                        local: Vec::new(),
975                        remote: vec![Err(ActorError::InterNodeNotConfigured)],
976                    };
977                }
978            };
979            let bytes = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
980            let actor_type = std::any::type_name::<T>();
981            let mut out = Vec::with_capacity(remote_nodes.len());
982            for node in &remote_nodes {
983                out.push(
984                    rt.broadcast_fire(node, actor_type, &address_regex, bytes.clone())
985                        .await,
986                );
987            }
988            out
989        } else {
990            Vec::new()
991        };
992
993        let local = if run_local {
994            self.broadcast_local_uncached::<T>(address_regex, msg).await
995        } else {
996            Vec::new()
997        };
998
999        BroadcastResult { local, remote }
1000    }
1001
1002    /// Request-response send: deliver `msg` and wait for the handler's
1003    /// `T::Result`.
1004    ///
1005    /// Uses the TX cache and the same readiness retry policy as
1006    /// [`send`]. On a cross-node address the call goes out as
1007    /// `InterNodeMessage::Call`; the response is matched back via the
1008    /// `req_id` in the pending-requests map and decoded via
1009    /// `xancode::Codec`.
1010    ///
1011    /// Errors:
1012    /// - [`ActorError::AddressNotFound`] / [`ActorError::ActorNotReady`]
1013    ///   when the local lookup fails or the actor never becomes ready.
1014    /// - [`ActorError::MessageTypeMismatch`] if the handler returns a
1015    ///   different concrete type than `T::Result` (only possible if you
1016    ///   bypass the trait at compile time).
1017    /// - `ActorError::InterNodeRemote` / `ActorError::InterNodeDecode`
1018    ///   for cross-node failures.
1019    ///
1020    /// [`send`]: Self::send
1021    pub async fn send_and_recv<T>(
1022        &mut self,
1023        #[cfg(not(feature = "multi-node"))] address: String,
1024        #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1025        msg: <T as Actor>::Message,
1026    ) -> Result<<T as Actor>::Result, ActorError>
1027    where
1028        T: Actor,
1029        <T as Actor>::Message: MaybeCodec,
1030        <T as Actor>::Result: MaybeCodec,
1031    {
1032        #[cfg(feature = "multi-node")]
1033        if address.node != self.node_name {
1034            let rt = self
1035                .inter_node
1036                .as_ref()
1037                .ok_or(ActorError::InterNodeNotConfigured)?
1038                .clone();
1039            let payload = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
1040            let bytes = rt
1041                .call(&address, std::any::type_name::<T>(), payload)
1042                .await?;
1043            let result = <<T as Actor>::Result as xancode::Codec>::decode(
1044                &xancode::Bytes::copy_from_slice(&bytes),
1045            )
1046            .map_err(|_| {
1047                ActorError::InterNodeDecode(format!(
1048                    "decode failed for {}",
1049                    std::any::type_name::<<T as Actor>::Result>()
1050                ))
1051            })?;
1052            return Ok(result);
1053        }
1054        #[cfg(feature = "multi-node")]
1055        let address: String = address.name;
1056
1057        let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
1058        let actor_type = std::any::type_name::<T>();
1059        let mut retry_count = 0;
1060        loop {
1061            match self.cache.entry(address.clone()) {
1062                std::collections::hash_map::Entry::Occupied(o) => {
1063                    let (cached_actor_type, tx) = o.get().clone();
1064                    if cached_actor_type == actor_type {
1065                        match tx.send_and_recv(payload.clone()).await {
1066                            Ok(result_any) => {
1067                                debug!(
1068                                    "Send message to actor {} through cached_tx succeeded",
1069                                    address
1070                                );
1071                                let result = result_any
1072                                    .downcast::<T::Result>()
1073                                    .map_err(|_| ActorError::MessageTypeMismatch)?;
1074                                return Ok(*result);
1075                            }
1076                            Err(e) => {
1077                                warn!(
1078                                    "Send message to actor {} through cached_tx failed: {:?} ... removing from cache",
1079                                    address, e
1080                                );
1081                                self.cache.remove(&address);
1082                            }
1083                        }
1084                    } else {
1085                        warn!(
1086                            "Send message with cached tx failed: cached tx of address {} and target actor {} is mismatched ... removing from cache",
1087                            address, actor_type,
1088                        );
1089                        self.cache.remove(&address);
1090                    }
1091                }
1092                _ => {}
1093            }
1094            let (tx, rx) = tokio::sync::oneshot::channel();
1095            let _ = self
1096                .handler_tx
1097                .send_async(ActorSystemCmd::FindActor {
1098                    actor_type: actor_type.to_string(),
1099                    address: address.clone(),
1100                    result_tx: tx,
1101                })
1102                .await;
1103            if let Ok(Some((tx, ready))) = rx.await {
1104                if ready {
1105                    debug!("Saving actor {} tx to cache", address);
1106                    self.cache
1107                        .insert(address.clone(), (actor_type.to_string(), tx.clone()));
1108                    let result_any = tx.send_and_recv(payload.clone()).await?;
1109                    let result = result_any
1110                        .downcast::<T::Result>()
1111                        .map_err(|_| ActorError::MessageTypeMismatch)?;
1112                    return Ok(*result);
1113                } else {
1114                    retry_count += 1;
1115                    debug!(
1116                        "Actor {} not ready, retrying... ({}/10)",
1117                        address, retry_count
1118                    );
1119                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1120                    if retry_count < 10 {
1121                        continue;
1122                    } else {
1123                        error!("Actor {} not ready after 10 retries, giving up", address);
1124                        return Err(ActorError::ActorNotReady(address));
1125                    }
1126                }
1127            } else {
1128                return Err(ActorError::AddressNotFound(address));
1129            }
1130        }
1131    }
1132
1133    /// Cache-bypassing variant of [`send_and_recv`]. Same semantics,
1134    /// always issues a fresh `FindActor`.
1135    ///
1136    /// [`send_and_recv`]: Self::send_and_recv
1137    pub async fn send_and_recv_without_tx_cache<T>(
1138        &self,
1139        #[cfg(not(feature = "multi-node"))] address: String,
1140        #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1141        msg: <T as Actor>::Message,
1142    ) -> Result<<T as Actor>::Result, ActorError>
1143    where
1144        T: Actor,
1145        <T as Actor>::Message: MaybeCodec,
1146        <T as Actor>::Result: MaybeCodec,
1147    {
1148        #[cfg(feature = "multi-node")]
1149        if address.node != self.node_name {
1150            let rt = self
1151                .inter_node
1152                .as_ref()
1153                .ok_or(ActorError::InterNodeNotConfigured)?
1154                .clone();
1155            let payload = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
1156            let bytes = rt
1157                .call(&address, std::any::type_name::<T>(), payload)
1158                .await?;
1159            let result = <<T as Actor>::Result as xancode::Codec>::decode(
1160                &xancode::Bytes::copy_from_slice(&bytes),
1161            )
1162            .map_err(|_| {
1163                ActorError::InterNodeDecode(format!(
1164                    "decode failed for {}",
1165                    std::any::type_name::<<T as Actor>::Result>()
1166                ))
1167            })?;
1168            return Ok(result);
1169        }
1170        #[cfg(feature = "multi-node")]
1171        let address: String = address.name;
1172
1173        let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
1174        let actor_type = std::any::type_name::<T>();
1175        let mut retry_count = 0;
1176        loop {
1177            let (tx, rx) = tokio::sync::oneshot::channel();
1178            let _ = self
1179                .handler_tx
1180                .send_async(ActorSystemCmd::FindActor {
1181                    actor_type: actor_type.to_string(),
1182                    address: address.clone(),
1183                    result_tx: tx,
1184                })
1185                .await;
1186            if let Ok(Some((tx, ready))) = rx.await {
1187                if ready {
1188                    let result_any = tx.send_and_recv(payload.clone()).await?;
1189                    let result = result_any
1190                        .downcast::<T::Result>()
1191                        .map_err(|_| ActorError::MessageTypeMismatch)?;
1192                    return Ok(*result);
1193                } else {
1194                    retry_count += 1;
1195                    debug!(
1196                        "Actor {} not ready, retrying... ({}/10)",
1197                        address, retry_count
1198                    );
1199                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1200                    if retry_count < 10 {
1201                        continue;
1202                    } else {
1203                        error!("Actor {} not ready after 10 retries, giving up", address);
1204                        return Err(ActorError::ActorNotReady(address));
1205                    }
1206                }
1207            } else {
1208                return Err(ActorError::AddressNotFound(address));
1209            }
1210        }
1211    }
1212
1213    /// `send_and_recv` with a wall-clock deadline. Returns
1214    /// [`ActorError::Timeout`] if the reply doesn't arrive within
1215    /// `timeout`.
1216    ///
1217    /// Useful under `multi-node` where a dead peer would otherwise leave
1218    /// the call awaiting a response that will never arrive — the
1219    /// underlying `oneshot` waiter can only fire when the remote sends a
1220    /// response back, so there's no implicit failure detection on the
1221    /// network path. Applies to the local path too: a slow `handle`
1222    /// implementation gets cut off the same way.
1223    ///
1224    /// Cancellation: when `timeout` fires the inner future is dropped.
1225    /// For cross-node calls the dispatcher's pending-requests entry is
1226    /// reclaimed via a `Drop` guard, so a late-arriving response is
1227    /// silently discarded rather than leaking.
1228    ///
1229    /// Takes `&mut self` (unlike the `_without_tx_cache` variant) because
1230    /// the wrapped `send_and_recv` mutates the per-clone TX cache.
1231    pub async fn send_and_recv_with_timeout<T>(
1232        &mut self,
1233        #[cfg(not(feature = "multi-node"))] address: String,
1234        #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1235        msg: <T as Actor>::Message,
1236        timeout: std::time::Duration,
1237    ) -> Result<<T as Actor>::Result, ActorError>
1238    where
1239        T: Actor,
1240        <T as Actor>::Message: MaybeCodec,
1241        <T as Actor>::Result: MaybeCodec,
1242    {
1243        match tokio::time::timeout(timeout, self.send_and_recv::<T>(address, msg)).await {
1244            Ok(result) => result,
1245            Err(_) => Err(ActorError::Timeout(timeout)),
1246        }
1247    }
1248
1249    /// Cache-bypassing variant of [`send_and_recv_with_timeout`]. Same
1250    /// timeout semantics; same cleanup on the cross-node path.
1251    ///
1252    /// [`send_and_recv_with_timeout`]: Self::send_and_recv_with_timeout
1253    pub async fn send_and_recv_without_tx_cache_with_timeout<T>(
1254        &self,
1255        #[cfg(not(feature = "multi-node"))] address: String,
1256        #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1257        msg: <T as Actor>::Message,
1258        timeout: std::time::Duration,
1259    ) -> Result<<T as Actor>::Result, ActorError>
1260    where
1261        T: Actor,
1262        <T as Actor>::Message: MaybeCodec,
1263        <T as Actor>::Result: MaybeCodec,
1264    {
1265        match tokio::time::timeout(
1266            timeout,
1267            self.send_and_recv_without_tx_cache::<T>(address, msg),
1268        )
1269        .await
1270        {
1271            Ok(result) => result,
1272            Err(_) => Err(ActorError::Timeout(timeout)),
1273        }
1274    }
1275
1276    /// Spawn a background job that delivers `msg` to `address` on the
1277    /// schedule described by [`JobSpec`].
1278    ///
1279    /// - `subscribe = true` sends `send_and_recv` per iteration and pipes
1280    ///   each result through `RunJobResult::result_subscriber_rx`;
1281    ///   `false` uses fire-and-forget `send` and returns `None` for the
1282    ///   subscriber.
1283    /// - `job_id`: provide one to address the job later with
1284    ///   [`abort_job`]/[`stop_job`]/[`resume_job`]; `None` generates a
1285    ///   fresh UUID.
1286    ///
1287    /// The job loop uses `tokio::select!` to race the sleep against the
1288    /// abort/stop/resume channels, so abort/stop are responsive even
1289    /// mid-sleep. Returns once the actor lookup succeeds and the loop
1290    /// has been spawned; the loop itself runs until `max_iter` is
1291    /// reached, the job is aborted, or — for `interval = None` — the
1292    /// single iteration completes.
1293    ///
1294    /// Under `multi-node`, a cross-node address spawns an analogous loop
1295    /// that issues `InterNodeMessage::Call`/`Fire` envelopes via the
1296    /// broker on the same schedule.
1297    ///
1298    /// [`abort_job`]: Self::abort_job
1299    /// [`stop_job`]: Self::stop_job
1300    /// [`resume_job`]: Self::resume_job
1301    pub async fn run_job<T>(
1302        &mut self,
1303        #[cfg(not(feature = "multi-node"))] address: String,
1304        #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1305        subscribe: bool,
1306        job: JobSpec,
1307        msg: <T as Actor>::Message,
1308        job_id: Option<String>,
1309    ) -> Result<RunJobResult<T>, ActorError>
1310    where
1311        T: Actor,
1312        <T as Actor>::Message: MaybeCodec,
1313        <T as Actor>::Result: MaybeCodec,
1314    {
1315        let job_id = job_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1316
1317        #[cfg(feature = "multi-node")]
1318        if address.node != self.node_name {
1319            let rt = self
1320                .inter_node
1321                .as_ref()
1322                .ok_or(ActorError::InterNodeNotConfigured)?
1323                .clone();
1324            return self
1325                .spawn_remote_job::<T>(address, subscribe, job, msg, job_id, rt)
1326                .await;
1327        }
1328        #[cfg(feature = "multi-node")]
1329        let address: String = address.name;
1330
1331        let mut retry_count = 0;
1332        let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
1333        let actor_type = std::any::type_name::<T>();
1334        let mailbox = loop {
1335            let (tx, rx) = tokio::sync::oneshot::channel();
1336            let _ = self
1337                .handler_tx
1338                .send_async(ActorSystemCmd::FindActor {
1339                    actor_type: actor_type.to_string(),
1340                    address: address.clone(),
1341                    result_tx: tx,
1342                })
1343                .await;
1344            if let Ok(Some((mailbox, ready))) = rx.await {
1345                if ready {
1346                    debug!("Saving actor {} tx to cache", address);
1347                    if let None = self.cache.get(&address) {
1348                        self.cache
1349                            .insert(address.clone(), (actor_type.to_string(), mailbox.clone()));
1350                    }
1351                    break mailbox;
1352                } else {
1353                    retry_count += 1;
1354                    debug!(
1355                        "Actor {} not ready, retrying... ({}/10)",
1356                        address, retry_count
1357                    );
1358                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1359                    if retry_count < 10 {
1360                        continue;
1361                    } else {
1362                        error!("Actor {} not ready after 10 retries, giving up", address);
1363                        return Err(ActorError::ActorNotReady(address));
1364                    }
1365                }
1366            } else {
1367                return Err(ActorError::AddressNotFound(address.clone()));
1368            }
1369        };
1370        if subscribe {
1371            let (sub_tx, sub_rx) = channel::channel(self.channel_size);
1372            let (abort_tx, mut abort_rx) = channel::channel(self.channel_size);
1373            let (stop_tx, mut stop_rx) = channel::channel(self.channel_size);
1374            let (resume_tx, mut resume_rx) = channel::channel(self.channel_size);
1375            let payload = payload.clone();
1376            let _ = tokio::spawn(async move {
1377                let mut i = 0usize;
1378                loop {
1379                    let until_start = job
1380                        .start_at()
1381                        .duration_since(std::time::SystemTime::now())
1382                        .unwrap_or(std::time::Duration::ZERO);
1383                    if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
1384                        .await
1385                    {
1386                        drop(sub_tx);
1387                        return;
1388                    }
1389                    i += 1;
1390                    let result = match mailbox.send_and_recv(payload.clone()).await {
1391                        Ok(result_any) => result_any
1392                            .downcast::<T::Result>()
1393                            .map(|x| Ok(*x))
1394                            .unwrap_or_else(|_| Err(ActorError::MessageTypeMismatch)),
1395                        Err(e) => Err(e),
1396                    };
1397                    let _ = sub_tx.send_async(result).await;
1398                    if let Some(max_iter) = job.max_iter() {
1399                        if i >= max_iter {
1400                            drop(sub_tx);
1401                            return;
1402                        }
1403                    }
1404                    let interval = match job.interval() {
1405                        Some(d) => d,
1406                        None => {
1407                            drop(sub_tx);
1408                            return;
1409                        }
1410                    };
1411                    if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
1412                    {
1413                        drop(sub_tx);
1414                        return;
1415                    }
1416                }
1417            });
1418            let _ = self
1419                .handler_tx
1420                .send_async(ActorSystemCmd::RegisterJob {
1421                    job_id: job_id.clone(),
1422                    controller: JobController {
1423                        abort_tx,
1424                        stop_tx,
1425                        resume_tx,
1426                    },
1427                })
1428                .await;
1429            return Ok(RunJobResult {
1430                job_id,
1431                result_subscriber_rx: Some(sub_rx),
1432            });
1433        } else {
1434            let (abort_tx, mut abort_rx) = channel::channel(self.channel_size);
1435            let (stop_tx, mut stop_rx) = channel::channel(self.channel_size);
1436            let (resume_tx, mut resume_rx) = channel::channel(self.channel_size);
1437            let _ = tokio::spawn(async move {
1438                let mut i = 0usize;
1439                loop {
1440                    let until_start = job
1441                        .start_at()
1442                        .duration_since(std::time::SystemTime::now())
1443                        .unwrap_or(std::time::Duration::ZERO);
1444                    if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
1445                        .await
1446                    {
1447                        return;
1448                    }
1449                    i += 1;
1450                    let _ = mailbox.send(payload.clone()).await;
1451                    if let Some(max_iter) = job.max_iter() {
1452                        if i >= max_iter {
1453                            return;
1454                        }
1455                    }
1456                    let interval = match job.interval() {
1457                        Some(d) => d,
1458                        None => return,
1459                    };
1460                    if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
1461                    {
1462                        return;
1463                    }
1464                }
1465            });
1466            let _ = self
1467                .handler_tx
1468                .send_async(ActorSystemCmd::RegisterJob {
1469                    job_id: job_id.clone(),
1470                    controller: JobController {
1471                        abort_tx,
1472                        stop_tx,
1473                        resume_tx,
1474                    },
1475                })
1476                .await;
1477            return Ok(RunJobResult {
1478                job_id,
1479                result_subscriber_rx: None,
1480            });
1481        }
1482    }
1483
1484    /// Cache-bypassing variant of [`run_job`]. Same semantics; the
1485    /// resolved mailbox is not inserted into the TX cache, so subsequent
1486    /// `send` calls to the same address will issue another `FindActor`.
1487    ///
1488    /// [`run_job`]: Self::run_job
1489    pub async fn run_job_without_tx_cache<T>(
1490        &self,
1491        #[cfg(not(feature = "multi-node"))] address: String,
1492        #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1493        subscribe: bool,
1494        job: JobSpec,
1495        msg: <T as Actor>::Message,
1496        job_id: Option<String>,
1497    ) -> Result<RunJobResult<T>, ActorError>
1498    where
1499        T: Actor,
1500        <T as Actor>::Message: MaybeCodec,
1501        <T as Actor>::Result: MaybeCodec,
1502    {
1503        let job_id = job_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1504
1505        #[cfg(feature = "multi-node")]
1506        if address.node != self.node_name {
1507            let rt = self
1508                .inter_node
1509                .as_ref()
1510                .ok_or(ActorError::InterNodeNotConfigured)?
1511                .clone();
1512            return self
1513                .spawn_remote_job::<T>(address, subscribe, job, msg, job_id, rt)
1514                .await;
1515        }
1516        #[cfg(feature = "multi-node")]
1517        let address: String = address.name;
1518
1519        let mut retry_count = 0;
1520        let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
1521        let actor_type = std::any::type_name::<T>();
1522        let mailbox = loop {
1523            let (tx, rx) = tokio::sync::oneshot::channel();
1524            let _ = self
1525                .handler_tx
1526                .send_async(ActorSystemCmd::FindActor {
1527                    actor_type: actor_type.to_string(),
1528                    address: address.clone(),
1529                    result_tx: tx,
1530                })
1531                .await;
1532            if let Ok(Some((mailbox, ready))) = rx.await {
1533                if ready {
1534                    break mailbox;
1535                } else {
1536                    retry_count += 1;
1537                    debug!(
1538                        "Actor {} not ready, retrying... ({}/10)",
1539                        address, retry_count
1540                    );
1541                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1542                    if retry_count < 10 {
1543                        continue;
1544                    } else {
1545                        error!("Actor {} not ready after 10 retries, giving up", address);
1546                        return Err(ActorError::ActorNotReady(address));
1547                    }
1548                }
1549            } else {
1550                return Err(ActorError::AddressNotFound(address.clone()));
1551            }
1552        };
1553        if subscribe {
1554            let (sub_tx, sub_rx) = channel::channel(self.channel_size);
1555            let payload = payload.clone();
1556            let (abort_tx, mut abort_rx) = channel::channel(self.channel_size);
1557            let (stop_tx, mut stop_rx) = channel::channel(self.channel_size);
1558            let (resume_tx, mut resume_rx) = channel::channel(self.channel_size);
1559            let _ = tokio::spawn(async move {
1560                let mut i = 0usize;
1561                loop {
1562                    let until_start = job
1563                        .start_at()
1564                        .duration_since(std::time::SystemTime::now())
1565                        .unwrap_or(std::time::Duration::ZERO);
1566                    if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
1567                        .await
1568                    {
1569                        drop(sub_tx);
1570                        return;
1571                    }
1572                    i += 1;
1573                    let result = match mailbox.send_and_recv(payload.clone()).await {
1574                        Ok(result_any) => result_any
1575                            .downcast::<T::Result>()
1576                            .map(|x| Ok(*x))
1577                            .unwrap_or_else(|_| Err(ActorError::MessageTypeMismatch)),
1578                        Err(e) => Err(e),
1579                    };
1580                    let _ = sub_tx.send_async(result).await;
1581                    if let Some(max_iter) = job.max_iter() {
1582                        if i >= max_iter {
1583                            drop(sub_tx);
1584                            return;
1585                        }
1586                    }
1587                    let interval = match job.interval() {
1588                        Some(d) => d,
1589                        None => {
1590                            drop(sub_tx);
1591                            return;
1592                        }
1593                    };
1594                    if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
1595                    {
1596                        drop(sub_tx);
1597                        return;
1598                    }
1599                }
1600            });
1601            let _ = self
1602                .handler_tx
1603                .send_async(ActorSystemCmd::RegisterJob {
1604                    job_id: job_id.clone(),
1605                    controller: JobController {
1606                        abort_tx,
1607                        stop_tx,
1608                        resume_tx,
1609                    },
1610                })
1611                .await;
1612            return Ok(RunJobResult {
1613                job_id,
1614                result_subscriber_rx: Some(sub_rx),
1615            });
1616        } else {
1617            let (abort_tx, mut abort_rx) = channel::channel(self.channel_size);
1618            let (stop_tx, mut stop_rx) = channel::channel(self.channel_size);
1619            let (resume_tx, mut resume_rx) = channel::channel(self.channel_size);
1620            let _ = tokio::spawn(async move {
1621                let mut i = 0usize;
1622                loop {
1623                    let until_start = job
1624                        .start_at()
1625                        .duration_since(std::time::SystemTime::now())
1626                        .unwrap_or(std::time::Duration::ZERO);
1627                    if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
1628                        .await
1629                    {
1630                        return;
1631                    }
1632                    i += 1;
1633                    let _ = mailbox.send(payload.clone()).await;
1634                    if let Some(max_iter) = job.max_iter() {
1635                        if i >= max_iter {
1636                            return;
1637                        }
1638                    }
1639                    let interval = match job.interval() {
1640                        Some(d) => d,
1641                        None => return,
1642                    };
1643                    if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
1644                    {
1645                        return;
1646                    }
1647                }
1648            });
1649            let _ = self
1650                .handler_tx
1651                .send_async(ActorSystemCmd::RegisterJob {
1652                    job_id: job_id.clone(),
1653                    controller: JobController {
1654                        abort_tx,
1655                        stop_tx,
1656                        resume_tx,
1657                    },
1658                })
1659                .await;
1660            return Ok(RunJobResult {
1661                job_id,
1662                result_subscriber_rx: None,
1663            });
1664        }
1665    }
1666
1667    /// Abort a running job: signal its loop to exit at the next wait
1668    /// point (`start_at` wait, inter-iteration sleep, or during a pause).
1669    /// The job is removed; subsequent `stop_job` / `resume_job` for the
1670    /// same id are no-ops.
1671    ///
1672    /// Honored within milliseconds — abort is raced against the loop's
1673    /// sleep via `tokio::select!`, so you don't wait out the remaining
1674    /// `interval`.
1675    pub async fn abort_job(&self, job_id: String) {
1676        info!("Aborting job {}", job_id);
1677        let (tx, rx) = tokio::sync::oneshot::channel();
1678        let _ = self
1679            .handler_tx
1680            .send_async(ActorSystemCmd::FindJob {
1681                job_id: job_id.clone(),
1682                result_tx: tx,
1683            })
1684            .await;
1685        match rx.await {
1686            Ok(Some(controller)) => {
1687                let _ = controller.abort_tx.send_async(()).await;
1688            }
1689            Ok(None) => {
1690                error!("Job {} not found", job_id);
1691            }
1692            Err(e) => {
1693                error!("Find job {} failed: {}", job_id, e);
1694            }
1695        }
1696    }
1697
1698    /// Pause a running job: the loop finishes the current iteration (if
1699    /// in flight) and then waits on `resume_tx`. Pair with [`resume_job`]
1700    /// to continue, or with [`abort_job`] to terminate while paused.
1701    ///
1702    /// [`resume_job`]: Self::resume_job
1703    /// [`abort_job`]: Self::abort_job
1704    pub async fn stop_job(&self, job_id: String) {
1705        info!("Stopping job {}", job_id);
1706        let (tx, rx) = tokio::sync::oneshot::channel();
1707        let _ = self
1708            .handler_tx
1709            .send_async(ActorSystemCmd::FindJob {
1710                job_id: job_id.clone(),
1711                result_tx: tx,
1712            })
1713            .await;
1714        match rx.await {
1715            Ok(Some(controller)) => {
1716                let _ = controller.stop_tx.send_async(()).await;
1717            }
1718            Ok(None) => {
1719                error!("Job {} not found", job_id);
1720            }
1721            Err(e) => {
1722                error!("Find job {} failed: {}", job_id, e);
1723            }
1724        }
1725    }
1726
1727    /// Resume a [`stop_job`]'d job. The loop wakes up and immediately
1728    /// starts the next iteration's `start_at` wait (which is usually
1729    /// zero by the time you resume).
1730    ///
1731    /// [`stop_job`]: Self::stop_job
1732    pub async fn resume_job(&self, job_id: String) {
1733        info!("Resuming job {}", job_id);
1734        let (tx, rx) = tokio::sync::oneshot::channel();
1735        let _ = self
1736            .handler_tx
1737            .send_async(ActorSystemCmd::FindJob {
1738                job_id: job_id.clone(),
1739                result_tx: tx,
1740            })
1741            .await;
1742        match rx.await {
1743            Ok(Some(controller)) => {
1744                let _ = controller.resume_tx.send_async(()).await;
1745            }
1746            Ok(None) => {
1747                error!("Job {} not found", job_id);
1748            }
1749            Err(e) => {
1750                error!("Find job {} failed: {}", job_id, e);
1751            }
1752        }
1753    }
1754
1755    fn run(
1756        &mut self,
1757        handler_rx: channel::Receiver<ActorSystemCmd>,
1758    ) -> tokio::task::JoinHandle<()> {
1759        #[cfg(feature = "multi-node")]
1760        let inter_node = self.inter_node.clone();
1761        let handle = tokio::task::spawn_blocking(move || {
1762            tokio::runtime::Handle::current().block_on(actor_system_loop(
1763                handler_rx,
1764                #[cfg(feature = "multi-node")]
1765                inter_node,
1766            ))
1767        });
1768        handle
1769    }
1770}
1771
1772// {{{ fn actor_system_loop
1773async fn actor_system_loop(
1774    mut handler_rx: channel::Receiver<ActorSystemCmd>,
1775    #[cfg(feature = "multi-node")] inter_node: Option<InterNodeRuntime>,
1776) {
1777    let mut address_list = HashSet::<String>::new();
1778    let mut actor_map = HashMap::<
1779        String,
1780        (
1781            String,
1782            Arc<dyn Mailbox>,
1783            channel::Sender<()>,
1784            channel::Sender<()>,
1785            LifeCycle,
1786        ),
1787    >::new();
1788    let mut job_controllers = HashMap::new();
1789    while let Some(msg) = handler_rx.recv().await {
1790        match msg {
1791            ActorSystemCmd::Register {
1792                actor_type,
1793                address,
1794                mailbox,
1795                restart_tx,
1796                kill_tx,
1797                life_cycle,
1798                result_tx,
1799                is_restarted,
1800            } => {
1801                #[cfg(not(feature = "multi-node"))]
1802                let name: String = address;
1803                #[cfg(feature = "multi-node")]
1804                let name: String = {
1805                    if let Some(rt) = inter_node.as_ref() {
1806                        if address.node != rt.node_name() {
1807                            let _ = result_tx
1808                                .send(Err(ActorError::AddressNotOwned(address.to_string())));
1809                            continue;
1810                        }
1811                    }
1812                    address.name
1813                };
1814                debug!(
1815                    "Register actor with address {} with type {}",
1816                    name, actor_type
1817                );
1818                if actor_map.contains_key(&name) && !is_restarted {
1819                    let _ = result_tx.send(Err(ActorError::AddressAlreadyExist(name)));
1820                    continue;
1821                }
1822                actor_map.insert(
1823                    name.clone(),
1824                    (actor_type, mailbox, restart_tx, kill_tx, life_cycle),
1825                );
1826                address_list.insert(name);
1827                let _ = result_tx.send(Ok(()));
1828            }
1829            ActorSystemCmd::Restart { address_regex } => {
1830                debug!("Restart actor with address {}", address_regex);
1831                let addresses = match filter_address(&address_list, &address_regex) {
1832                    Ok(addresses) => addresses,
1833                    Err(e) => {
1834                        error!("Filter address failed: {:?}", e);
1835                        continue;
1836                    }
1837                };
1838                for address in addresses {
1839                    if let Some((_actor_type, _tx, restart_tx, _kill_tx, life_cycle)) =
1840                        actor_map.get_mut(&address)
1841                    {
1842                        *life_cycle = LifeCycle::Restarting;
1843                        let _ = restart_tx.send_async(()).await;
1844                    }
1845                }
1846            }
1847            ActorSystemCmd::Unregister { address_regex } => {
1848                debug!("Unregister actor with address {}", address_regex);
1849                let addresses = match filter_address(&address_list, &address_regex) {
1850                    Ok(addresses) => addresses,
1851                    Err(e) => {
1852                        error!("Filter address failed: {:?}", e);
1853                        continue;
1854                    }
1855                };
1856                for address in addresses {
1857                    match actor_map.entry(address.to_string()) {
1858                        std::collections::hash_map::Entry::Occupied(mut entry) => {
1859                            let _ = entry.get_mut().3.send_async(()).await;
1860                            entry.remove_entry();
1861                            address_list.remove(&address);
1862                        }
1863                        std::collections::hash_map::Entry::Vacant(_) => {
1864                            continue;
1865                        }
1866                    }
1867                }
1868            }
1869            ActorSystemCmd::FilterAddress {
1870                address_regex,
1871                result_tx,
1872            } => {
1873                debug!("FilterAddress with regex {}", address_regex);
1874                let addresses = match filter_address(&address_list, &address_regex) {
1875                    Ok(addresses) => addresses,
1876                    Err(e) => {
1877                        error!("Filter address failed: {:?}", e);
1878                        continue;
1879                    }
1880                };
1881                let _ = result_tx.send(addresses);
1882            }
1883            ActorSystemCmd::FindActor {
1884                actor_type,
1885                address,
1886                result_tx,
1887            } => {
1888                debug!(
1889                    "FindActor with address {} with type {}",
1890                    address, actor_type
1891                );
1892                if let Some((target_actor_type, tx, _restart_tx, _kill_tx, life_cycle)) =
1893                    actor_map.get(&address)
1894                {
1895                    match life_cycle {
1896                        LifeCycle::Receiving => {
1897                            if *target_actor_type == actor_type {
1898                                let _ = result_tx.send(Some((tx.clone(), true)));
1899                            } else {
1900                                let _ = result_tx.send(None);
1901                            }
1902                        }
1903                        _ => {
1904                            let _ = result_tx.send(Some((tx.clone(), false)));
1905                        }
1906                    }
1907                } else {
1908                    let _ = result_tx.send(None);
1909                }
1910            }
1911            ActorSystemCmd::SetLifeCycle {
1912                address,
1913                life_cycle,
1914            } => {
1915                debug!(
1916                    "SetLifecycle with address {} into {:?}",
1917                    address, life_cycle
1918                );
1919                if let Some(actor) = actor_map.get_mut(&address) {
1920                    actor.4 = life_cycle;
1921                };
1922            }
1923            ActorSystemCmd::RegisterJob { job_id, controller } => {
1924                debug!("RegisterJob with id {}", job_id);
1925                let _ = job_controllers.insert(job_id, controller);
1926            }
1927            ActorSystemCmd::FindJob { job_id, result_tx } => {
1928                debug!("FindJob with id {}", job_id);
1929                let _ = result_tx.send(job_controllers.get(&job_id).cloned());
1930            }
1931        };
1932    }
1933}
1934// }}}
1935
1936// {{{ fn delay_or_abort
1937/// Sleep for `duration` while staying responsive to the job's control channels.
1938///
1939/// Returns:
1940/// - `true` if the delay elapsed normally (or was zero), or if the job was
1941///   paused via `stop_tx` and later resumed via `resume_tx`.
1942/// - `false` if `abort_tx` fired at any point (during the delay, or during a
1943///   pause). Callers should exit immediately on `false`.
1944///
1945/// Replaces the older `try_recv` polling + `sleep(interval).await` shape,
1946/// which had two problems: (1) abort/stop signals were only checked at the
1947/// top of each iteration, so a fired abort waited up to `interval` to take
1948/// effect; (2) `resume_rx.recv()` was awaited unconditionally during a stop,
1949/// so abort couldn't break a paused job.
1950async fn delay_or_abort(
1951    duration: std::time::Duration,
1952    abort_rx: &mut channel::Receiver<()>,
1953    stop_rx: &mut channel::Receiver<()>,
1954    resume_rx: &mut channel::Receiver<()>,
1955) -> bool {
1956    if duration.is_zero() {
1957        return true;
1958    }
1959    tokio::select! {
1960        _ = tokio::time::sleep(duration) => true,
1961        _ = abort_rx.recv() => false,
1962        _ = stop_rx.recv() => {
1963            tokio::select! {
1964                _ = resume_rx.recv() => true,
1965                _ = abort_rx.recv() => false,
1966            }
1967        }
1968    }
1969}
1970// }}}
1971
1972// {{{ fn filter_address
1973fn filter_address(
1974    address_list: &HashSet<String>,
1975    regex: &str,
1976) -> Result<Vec<String>, regex::Error> {
1977    let regex = regex::Regex::new(&format!("^{}$", regex.replace("*", "(\\S+)"))).map_err(|e| {
1978        error!("Regex error: {:?}", e);
1979        e
1980    })?;
1981    Ok(address_list
1982        .iter()
1983        .filter(|x| regex.is_match(x))
1984        .map(|x| x.to_string())
1985        .collect())
1986}
1987// }}}