liminal/channel/supervisor.rs
1//! LIM-002 R4: channel supervision.
2//!
3//! [`ChannelSupervisor`] owns the beamr [`Scheduler`] every channel actor runs
4//! on, the bytecode module they share, and the per-scheduler [`ActorRuntime`]
5//! that maps actor pids to their cores. It spawns each channel actor as a
6//! `trap_exit` process and re-spawns one on demand if its pid is no longer live,
7//! so a crashed channel is restarted WITHOUT affecting any other channel (each
8//! channel is an independent process with its own pid and subscriber list). The
9//! restart strategy is configurable through [`ChannelRestartPolicy`].
10//!
11//! A process-global default supervisor (`shared_supervisor`) backs the
12//! infallible [`crate::channel::ChannelHandle::new`] constructor so existing
13//! call-sites keep working; tests and the registry can construct dedicated
14//! supervisors for isolation.
15
16use std::sync::atomic::{AtomicU32, Ordering};
17use std::sync::{Arc, OnceLock};
18
19use beamr::atom::{Atom, AtomTable};
20use beamr::distribution::{DistributionConfig, Resolver};
21use beamr::module::ModuleRegistry;
22use beamr::scheduler::{Scheduler, SchedulerConfig};
23
24use crate::channel::actor::{ActorRuntime, ChannelActorCore, actor_module, private_data};
25use crate::channel::observer::ClusterObserver;
26use crate::channel::schema::Schema;
27use crate::error::LiminalError;
28
/// Restart strategy applied to a supervised channel actor once its process
/// has died.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChannelRestartPolicy {
    /// Upper bound on restarts; once reached the actor is left dead.
    pub max_restarts: u32,
    /// `true` restarts a dead actor (one-for-one); `false` never restarts it.
    pub restart: bool,
}

impl ChannelRestartPolicy {
    /// Restart budget handed out by the [`Default`] implementation.
    const DEFAULT_MAX_RESTARTS: u32 = 8;

    /// Restarting policy (one-for-one) limited to `max_restarts` restarts.
    #[must_use]
    pub const fn one_for_one(max_restarts: u32) -> Self {
        let restart = true;
        Self {
            max_restarts,
            restart,
        }
    }

    /// Policy that never restarts: an actor that exits stays dead.
    #[must_use]
    pub const fn never() -> Self {
        let max_restarts = 0;
        Self {
            max_restarts,
            restart: false,
        }
    }
}

impl Default for ChannelRestartPolicy {
    /// One-for-one restart with a budget of eight restarts.
    fn default() -> Self {
        Self {
            max_restarts: Self::DEFAULT_MAX_RESTARTS,
            restart: true,
        }
    }
}
63
/// How many scheduler threads all channel actors share. A single thread is
/// cheap and keeps each actor's mailbox processing serialized.
const CHANNEL_SCHEDULER_THREADS: usize = 1;
67
68/// Supervises channel actor processes on a shared beamr scheduler.
69#[derive(Clone)]
70pub struct ChannelSupervisor {
71 inner: Arc<SupervisorInner>,
72}
73
74struct SupervisorInner {
75 scheduler: Arc<Scheduler>,
76 runtime: Arc<ActorRuntime>,
77 policy: ChannelRestartPolicy,
78 module_name: Atom,
79 entry_function: Atom,
80 /// Optional cluster observer installed once, after construction, by the
81 /// standalone server when clustering is configured (SRV-005). The library
82 /// itself never installs one — clustering is an out-of-library concern.
83 observer: OnceLock<Arc<dyn ClusterObserver>>,
84}
85
86impl std::fmt::Debug for ChannelSupervisor {
87 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 formatter
89 .debug_struct("ChannelSupervisor")
90 .field("policy", &self.inner.policy)
91 .finish_non_exhaustive()
92 }
93}
94
95impl ChannelSupervisor {
96 /// Builds a supervisor with its own scheduler and the default restart policy.
97 ///
98 /// # Errors
99 /// Returns [`LiminalError::ConversationFailed`] when the scheduler cannot start.
100 pub fn new() -> Result<Self, LiminalError> {
101 Self::with_policy(ChannelRestartPolicy::default())
102 }
103
104 /// Builds a supervisor with an explicit restart policy on a non-clustered
105 /// scheduler.
106 ///
107 /// # Errors
108 /// Returns [`LiminalError::ConversationFailed`] when the scheduler cannot start.
109 pub fn with_policy(policy: ChannelRestartPolicy) -> Result<Self, LiminalError> {
110 Self::build(policy, None, None, None)
111 }
112
113 /// Builds a supervisor whose scheduler is distribution-enabled (SRV-005), so
114 /// every channel actor and every subscriber process this supervisor spawns
115 /// shares ONE clustered scheduler. This is the scheduler the cluster attaches
116 /// its process-group transport to: a subscriber pid joined to a channel's pg
117 /// group MUST live on the same scheduler that owns the distribution
118 /// connections, or cross-node delivery cannot reach it.
119 ///
120 /// `node_name`/`creation` form this node's distribution identity; `cookie`
121 /// and `resolver` are handed verbatim to the scheduler's
122 /// [`DistributionConfig`] (the resolver MUST be the same instance the cluster
123 /// uses to dial seeds, so handshake-established names resolve consistently).
124 ///
125 /// # Errors
126 /// Returns [`LiminalError::ConversationFailed`] when the scheduler cannot start.
127 pub fn with_distribution(
128 node_name: String,
129 creation: u32,
130 cookie: String,
131 resolver: Resolver,
132 policy: ChannelRestartPolicy,
133 ) -> Result<Self, LiminalError> {
134 let distribution = DistributionConfig { resolver, cookie };
135 Self::build(policy, Some(node_name), Some(creation), Some(distribution))
136 }
137
138 fn build(
139 policy: ChannelRestartPolicy,
140 node_name: Option<String>,
141 creation: Option<u32>,
142 distribution: Option<DistributionConfig>,
143 ) -> Result<Self, LiminalError> {
144 let atoms = AtomTable::with_common_atoms();
145 let module_name = atoms.intern("liminal_channel_actor");
146 let entry_function = atoms.intern("main");
147 let command_function = atoms.intern("process_command");
148 let command_atom = atoms.intern("liminal_channel_command");
149 let runtime = Arc::new(ActorRuntime::new(command_atom));
150 let registry = Arc::new(ModuleRegistry::new());
151 registry.insert(actor_module(module_name, entry_function, command_function));
152 let scheduler = Scheduler::new(
153 SchedulerConfig {
154 thread_count: Some(CHANNEL_SCHEDULER_THREADS),
155 nif_private_data: Some(private_data(Arc::clone(&runtime))),
156 node_name,
157 creation,
158 distribution,
159 ..SchedulerConfig::default()
160 },
161 registry,
162 )
163 .map_err(|message| LiminalError::ConversationFailed { message })?;
164 Ok(Self {
165 inner: Arc::new(SupervisorInner {
166 scheduler: Arc::new(scheduler),
167 runtime,
168 policy,
169 module_name,
170 entry_function,
171 observer: OnceLock::new(),
172 }),
173 })
174 }
175
176 /// The scheduler channel actors and their subscribers run on.
177 #[must_use]
178 pub fn scheduler(&self) -> Arc<Scheduler> {
179 Arc::clone(&self.inner.scheduler)
180 }
181
182 /// Installs the cluster observer (SRV-005). Idempotent: the first install
183 /// wins and later attempts are ignored, so the observer can be wired exactly
184 /// once after the supervisor (and its scheduler) exist.
185 pub fn install_observer(&self, observer: Arc<dyn ClusterObserver>) {
186 let _ = self.inner.observer.set(observer);
187 }
188
189 /// The installed cluster observer, if any.
190 #[must_use]
191 pub(crate) fn observer(&self) -> Option<&Arc<dyn ClusterObserver>> {
192 self.inner.observer.get()
193 }
194
195 /// The configured restart policy.
196 #[must_use]
197 pub fn policy(&self) -> &ChannelRestartPolicy {
198 &self.inner.policy
199 }
200
201 /// Spawns a fresh channel actor for `schema` and returns its shared core.
202 ///
203 /// # Errors
204 /// Returns [`LiminalError`] when the actor process cannot be spawned.
205 pub(crate) fn spawn_channel(
206 &self,
207 schema: Schema,
208 ) -> Result<Arc<ChannelActorCore>, LiminalError> {
209 let core = Arc::new(ChannelActorCore::new(
210 self.scheduler(),
211 self.inner.runtime.command_atom(),
212 schema,
213 ));
214 self.spawn_actor_for(&core)?;
215 Ok(core)
216 }
217
218 /// Ensures `core` has a live actor process, spawning (or restarting) one if
219 /// its current pid is dead. Honours the restart budget: once exhausted, a
220 /// dead actor is not restarted and a [`LiminalError::DeliveryFailed`] is
221 /// returned. This is the one-for-one restart that leaves other channels
222 /// untouched (each `core` is supervised independently).
223 ///
224 /// # Errors
225 /// Returns [`LiminalError`] when restart is disabled/exhausted or the spawn
226 /// fails.
227 pub(crate) fn ensure_running(
228 &self,
229 core: &Arc<ChannelActorCore>,
230 restarts: &AtomicU32,
231 ) -> Result<u64, LiminalError> {
232 // Fast path: a live pid needs no lock. The slow (respawn) path below is
233 // serialised so two concurrent callers that both see a dead pid cannot
234 // both spawn a replacement (the restart TOCTOU).
235 if let Some(pid) = self.live_pid(core)? {
236 return Ok(pid);
237 }
238 // Hold the per-channel restart lock across the dead-check and respawn so
239 // exactly one thread restarts; any racing caller re-reads the now-live
240 // pid below. The lock lives on `core` so each channel is supervised
241 // independently (mirrors `conversation/actor/core.rs`'s `restart_lock`).
242 let guard = core
243 .restart_lock()
244 .lock()
245 .map_err(|error| LiminalError::DeliveryFailed {
246 message: format!("channel actor restart lock poisoned: {error}"),
247 })?;
248 // Double-checked liveness AFTER acquiring the lock: the thread that won
249 // the race has already respawned, so we must not spawn a second actor.
250 if let Some(pid) = self.live_pid(core)? {
251 return Ok(pid);
252 }
253 if !self.inner.policy.restart {
254 return Err(LiminalError::DeliveryFailed {
255 message: "channel actor died and restart is disabled".to_owned(),
256 });
257 }
258 let used = restarts.fetch_add(1, Ordering::Relaxed);
259 if used >= self.inner.policy.max_restarts {
260 return Err(LiminalError::DeliveryFailed {
261 message: format!(
262 "channel actor restart budget ({}) exhausted",
263 self.inner.policy.max_restarts
264 ),
265 });
266 }
267 let pid = self.spawn_actor_for(core)?;
268 drop(guard);
269 Ok(pid)
270 }
271
272 /// The current pid if it is still live in the scheduler's process table,
273 /// otherwise `None` (the actor needs spawning/restarting).
274 fn live_pid(&self, core: &Arc<ChannelActorCore>) -> Result<Option<u64>, LiminalError> {
275 if let Some(pid) = core.current_pid()? {
276 if self.inner.scheduler.process_table().get(pid).is_some() {
277 return Ok(Some(pid));
278 }
279 }
280 Ok(None)
281 }
282
283 fn spawn_actor_for(&self, core: &Arc<ChannelActorCore>) -> Result<u64, LiminalError> {
284 let pid = self
285 .inner
286 .scheduler
287 .spawn_trap_exit(
288 self.inner.module_name,
289 self.inner.entry_function,
290 Vec::new(),
291 )
292 .map_err(|error| LiminalError::ConversationFailed {
293 message: format!("failed to spawn channel actor: {error:?}"),
294 })?;
295 self.inner.runtime.register(pid, Arc::downgrade(core))?;
296 core.set_current_pid(pid)?;
297 // Re-link the new process to every surviving subscriber so subscriber
298 // death (EXIT) detection works after a restart — exactly as the
299 // conversation actor's `spawn_actor_for` calls `core.boot(...)`. On the
300 // very first spawn the subscriber list is empty, so this is a no-op.
301 core.boot()?;
302 Ok(pid)
303 }
304
305 /// Stops the underlying scheduler.
306 pub fn shutdown(&self) {
307 self.inner.scheduler.shutdown();
308 }
309}
310
311/// The process-global default channel supervisor, lazily started on first use.
312static SHARED: OnceLock<ChannelSupervisor> = OnceLock::new();
313
314/// Returns the shared default supervisor, starting it on first use.
315///
316/// # Errors
317/// Returns [`LiminalError`] when the shared scheduler cannot start.
318pub fn shared_supervisor() -> Result<ChannelSupervisor, LiminalError> {
319 if let Some(existing) = SHARED.get() {
320 return Ok(existing.clone());
321 }
322 let supervisor = ChannelSupervisor::new()?;
323 Ok(SHARED.get_or_init(|| supervisor).clone())
324}