Skip to main content

palladium_actor/
pool.rs

1use crate::errors::{AskError, SendError};
2use crate::message::Message;
3use crate::path::AddrHash;
4use crate::policy::{RestartPolicy, ShutdownPolicy};
5use crate::stable::StableAddr;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
10// ── PoolConfig ─────────────────────────────────────────────────────────────────
11
/// Settings that describe how a [`WorkerPool`] is built.
///
/// Hand a value of this type to [`EngineHandle::spawn_worker_pool`], which
/// spawns and supervises `size` identical worker actors under the `/user`
/// supervisor.
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// How many worker actors the pool spawns.  Must be at least 1.
    pub size: usize,
    /// Stem from which each worker's actor name is derived.
    ///
    /// Worker `i` is spawned at `{parent}/{name_prefix}-{i}` (`-0`, `-1`, …),
    /// where `{parent}` is whichever supervisor the pool builder targets.
    pub name_prefix: String,
    /// Mailbox capacity of each individual worker.  Default: `1024`.
    pub mailbox_capacity: usize,
    /// Restart policy given to every worker.  Default: [`RestartPolicy::Permanent`].
    pub restart: RestartPolicy,
    /// Shutdown policy given to every worker.  Default: 5-second graceful stop.
    pub shutdown: ShutdownPolicy,
    /// Timeout for `ask()` calls routed through the pool.  Default: 30 seconds.
    pub ask_timeout: Duration,
}
35
36impl Default for PoolConfig {
37    fn default() -> Self {
38        Self {
39            size: 4,
40            name_prefix: "worker".to_string(),
41            mailbox_capacity: 1024,
42            restart: RestartPolicy::Permanent,
43            shutdown: ShutdownPolicy::Timeout(Duration::from_secs(5)),
44            ask_timeout: Duration::from_secs(30),
45        }
46    }
47}
48
49// ── WorkerPool ─────────────────────────────────────────────────────────────────
50
51/// A supervision-aware, round-robin routing handle for a pool of identical
52/// worker actors.
53///
54/// Created by [`EngineHandle::spawn_worker_pool`].  Workers are spawned and
55/// supervised by the `/user` supervisor; this struct is a thin routing facade
56/// over N [`StableAddr<M>`]s that remain valid across individual worker
57/// restarts.
58///
59/// ## Routing semantics
60///
61/// - **Round-robin**: each call to [`send`](WorkerPool::send) or
62///   [`ask`](WorkerPool::ask) selects the next worker in rotation.
63/// - **Shared counter across clones**: all clones share the rotation counter
64///   (`Arc<AtomicUsize>`), so concurrent senders interleave rather than
65///   duplicate.
66/// - **Backpressure**: [`SendError::MailboxFull`] is returned immediately when
67///   the selected worker's mailbox is at capacity.
68/// - **Restart window**: [`SendError::Unroutable`] is returned during the brief
69///   interval between a worker crash and the supervisor refreshing its
70///   [`StableAddr`].  The window is transient; callers should yield and retry.
71///
72/// ## In-flight message loss
73///
74/// Messages that were already in a crashed worker's mailbox at crash time are
75/// **lost** (best-effort semantics).  Full at-least-once delivery requires
76/// application-level acknowledgement — outside the scope of this primitive.
77pub struct WorkerPool<M: Message> {
78    workers: Vec<StableAddr<M>>,
79    next: Arc<AtomicUsize>,
80}
81
// Written by hand rather than derived: `#[derive(Clone)]` would add an
// `M: Clone` bound, which the handle itself does not need.
impl<M: Message> Clone for WorkerPool<M> {
    /// Produce another handle onto the same pool.
    ///
    /// The address list is cloned, while the rotation counter is shared (the
    /// `Arc` is cloned, not the counter), so the new handle continues the
    /// same round-robin sequence as the one it was cloned from.
    fn clone(&self) -> Self {
        let Self { workers, next } = self;
        WorkerPool {
            workers: workers.clone(),
            next: Arc::clone(next),
        }
    }
}
90
impl<M: Message> WorkerPool<M> {
    /// Create a pool handle from a pre-built list of [`StableAddr<M>`].
    ///
    /// Called by the runtime builder; not intended for direct use in
    /// application code.  The rotation counter starts at zero, so the first
    /// routed message goes to `workers[0]`.
    ///
    /// # Panics
    ///
    /// Panics if `workers` is empty.
    pub fn new(workers: Vec<StableAddr<M>>) -> Self {
        assert!(
            !workers.is_empty(),
            "WorkerPool requires at least one worker"
        );
        Self {
            workers,
            next: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Advance the shared rotation counter and return the worker it selects.
    ///
    /// This is the single place where the round-robin policy lives, so
    /// [`send`](WorkerPool::send) and [`ask`](WorkerPool::ask) cannot drift
    /// apart.
    fn next_worker(&self) -> &StableAddr<M> {
        // `Relaxed` is enough: the counter only spreads load and is not used
        // to order any other memory access.  `fetch_add` wraps on overflow,
        // so the rotation simply continues after `usize::MAX` tickets.
        let ticket = self.next.fetch_add(1, Ordering::Relaxed);
        // `new` rejects an empty list and `workers` is never mutated
        // afterwards, so the modulus is non-zero and the index is in bounds.
        &self.workers[ticket % self.workers.len()]
    }

    /// Round-robin fire-and-forget send to one worker.
    ///
    /// Advances the rotation counter and calls [`StableAddr::send`] on the
    /// selected worker.  Does **not** retry on `Unroutable`; callers that
    /// need retry-on-crash semantics should yield and call `send` again.
    /// Note that a retry goes to the *next* worker in rotation, because every
    /// call advances the counter.
    ///
    /// # Errors
    ///
    /// - [`SendError::MailboxFull`] — the selected worker's mailbox is full.
    /// - [`SendError::Unroutable`] — the worker is between crash and restart.
    pub fn send(&self, msg: M) -> Result<(), SendError> {
        self.next_worker().send(msg)
    }

    /// Round-robin request/response.
    ///
    /// Selects one worker (advancing the rotation counter exactly like
    /// [`send`](WorkerPool::send)) and awaits its reply.
    ///
    /// # Errors
    ///
    /// Returns whatever [`StableAddr::ask`] reports for the selected worker.
    /// In particular `AskError::Send(Unroutable)` may be returned during a
    /// crash window; callers decide whether to retry.
    pub async fn ask(&self, msg: M) -> Result<M::Response, AskError> {
        self.next_worker().ask(msg).await
    }

    /// Number of worker slots in the pool.
    ///
    /// This is the length of the address list given to
    /// [`new`](WorkerPool::new); it does not say how many workers are
    /// currently live.
    #[must_use]
    pub fn size(&self) -> usize {
        self.workers.len()
    }

    /// Number of workers whose [`StableAddr`] currently resolves, i.e. whose
    /// `current()` returns `Some`.
    ///
    /// Useful in tests and health checks to wait until all workers are live
    /// before sending messages.
    ///
    /// NOTE(review): [`worker_addr`](WorkerPool::worker_addr) documents
    /// `current()` as also being `None` during the crash/restart window.  If
    /// that holds, this count can dip temporarily while a worker restarts
    /// rather than only ever growing — confirm against `StableAddr`.
    #[must_use]
    pub fn workers_ready_count(&self) -> usize {
        self.workers
            .iter()
            .filter(|w| w.current().is_some())
            .count()
    }

    /// Current [`AddrHash`] of the worker at `index`.
    ///
    /// Returns `None` when `index` is out of range (`index >= size()`), or
    /// when the worker has not yet been spawned or is in the crash/restart
    /// window.
    ///
    /// Index is stable (slot 0 always refers to `{prefix}-0`); the hash
    /// changes on each restart as a new generation is assigned.
    #[must_use]
    pub fn worker_addr(&self, index: usize) -> Option<AddrHash> {
        self.workers.get(index)?.current().map(|a| a.target())
    }
}