// palladium_actor/pool.rs
1use crate::errors::{AskError, SendError};
2use crate::message::Message;
3use crate::path::AddrHash;
4use crate::policy::{RestartPolicy, ShutdownPolicy};
5use crate::stable::StableAddr;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
// ── PoolConfig ─────────────────────────────────────────────────────────────────
11
12/// Configuration for a [`WorkerPool`].
13///
14/// Pass to [`EngineHandle::spawn_worker_pool`] to create and supervise N
15/// identical worker actors under the `/user` supervisor.
16#[derive(Debug, Clone)]
17pub struct PoolConfig {
18 /// Number of worker actors to spawn. Must be ≥ 1.
19 pub size: usize,
20 /// Prefix used to derive per-worker actor names.
21 ///
22 /// Workers are spawned with paths `{parent}/{name_prefix}-0`,
23 /// `{parent}/{name_prefix}-1`, … under whichever supervisor the pool
24 /// builder targets.
25 pub name_prefix: String,
26 /// Per-worker mailbox capacity. Default: `1024`.
27 pub mailbox_capacity: usize,
28 /// Restart policy applied to each worker. Default: [`RestartPolicy::Permanent`].
29 pub restart: RestartPolicy,
30 /// Shutdown policy applied to each worker. Default: 5-second graceful stop.
31 pub shutdown: ShutdownPolicy,
32 /// Timeout for `ask()` calls routed through this pool. Default: 30 seconds.
33 pub ask_timeout: Duration,
34}
35
36impl Default for PoolConfig {
37 fn default() -> Self {
38 Self {
39 size: 4,
40 name_prefix: "worker".to_string(),
41 mailbox_capacity: 1024,
42 restart: RestartPolicy::Permanent,
43 shutdown: ShutdownPolicy::Timeout(Duration::from_secs(5)),
44 ask_timeout: Duration::from_secs(30),
45 }
46 }
47}
48
// ── WorkerPool ─────────────────────────────────────────────────────────────────
50
51/// A supervision-aware, round-robin routing handle for a pool of identical
52/// worker actors.
53///
54/// Created by [`EngineHandle::spawn_worker_pool`]. Workers are spawned and
55/// supervised by the `/user` supervisor; this struct is a thin routing facade
56/// over N [`StableAddr<M>`]s that remain valid across individual worker
57/// restarts.
58///
59/// ## Routing semantics
60///
61/// - **Round-robin**: each call to [`send`](WorkerPool::send) or
62/// [`ask`](WorkerPool::ask) selects the next worker in rotation.
63/// - **Shared counter across clones**: all clones share the rotation counter
64/// (`Arc<AtomicUsize>`), so concurrent senders interleave rather than
65/// duplicate.
66/// - **Backpressure**: [`SendError::MailboxFull`] is returned immediately when
67/// the selected worker's mailbox is at capacity.
68/// - **Restart window**: [`SendError::Unroutable`] is returned during the brief
69/// interval between a worker crash and the supervisor refreshing its
70/// [`StableAddr`]. The window is transient; callers should yield and retry.
71///
72/// ## In-flight message loss
73///
74/// Messages that were already in a crashed worker's mailbox at crash time are
75/// **lost** (best-effort semantics). Full at-least-once delivery requires
76/// application-level acknowledgement — outside the scope of this primitive.
77pub struct WorkerPool<M: Message> {
78 workers: Vec<StableAddr<M>>,
79 next: Arc<AtomicUsize>,
80}
81
82impl<M: Message> Clone for WorkerPool<M> {
83 fn clone(&self) -> Self {
84 Self {
85 workers: self.workers.clone(),
86 next: Arc::clone(&self.next),
87 }
88 }
89}
90
91impl<M: Message> WorkerPool<M> {
92 /// Create a pool handle from a pre-built list of [`StableAddr<M>`].
93 ///
94 /// Called by the runtime builder; not intended for direct use in
95 /// application code.
96 ///
97 /// # Panics
98 ///
99 /// Panics if `workers` is empty.
100 pub fn new(workers: Vec<StableAddr<M>>) -> Self {
101 assert!(
102 !workers.is_empty(),
103 "WorkerPool requires at least one worker"
104 );
105 Self {
106 workers,
107 next: Arc::new(AtomicUsize::new(0)),
108 }
109 }
110
111 /// Round-robin fire-and-forget send to one worker.
112 ///
113 /// Advances the rotation counter and calls [`StableAddr::send`] on the
114 /// selected worker. Does **not** retry on `Unroutable`; callers that
115 /// need retry-on-crash semantics should yield and call `send` again.
116 ///
117 /// # Errors
118 ///
119 /// - [`SendError::MailboxFull`] — the selected worker's mailbox is full.
120 /// - [`SendError::Unroutable`] — the worker is between crash and restart.
121 pub fn send(&self, msg: M) -> Result<(), SendError> {
122 let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
123 self.workers[idx].send(msg)
124 }
125
126 /// Round-robin request/response.
127 ///
128 /// Selects one worker and awaits its reply.
129 /// `AskError::Send(Unroutable)` may be returned during a crash window;
130 /// callers decide retry.
131 pub async fn ask(&self, msg: M) -> Result<M::Response, AskError> {
132 let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
133 self.workers[idx].ask(msg).await
134 }
135
136 /// Number of worker slots in the pool.
137 pub fn size(&self) -> usize {
138 self.workers.len()
139 }
140
141 /// Number of workers whose [`StableAddr`] has been populated by the
142 /// supervisor (i.e., the worker has been spawned at least once).
143 ///
144 /// Useful in tests and health checks to wait until all workers are live
145 /// before sending messages.
146 pub fn workers_ready_count(&self) -> usize {
147 self.workers
148 .iter()
149 .filter(|w| w.current().is_some())
150 .count()
151 }
152
153 /// Current [`AddrHash`] of the worker at `index`, or `None` if the worker
154 /// has not yet been spawned or is in the crash/restart window.
155 ///
156 /// Index is stable (slot 0 always refers to `{prefix}-0`); the hash
157 /// changes on each restart as a new generation is assigned.
158 pub fn worker_addr(&self, index: usize) -> Option<AddrHash> {
159 self.workers.get(index)?.current().map(|a| a.target())
160 }
161}