Skip to main content

liminal/channel/
supervisor.rs

1//! LIM-002 R4: channel supervision.
2//!
3//! [`ChannelSupervisor`] owns the beamr [`Scheduler`] every channel actor runs
4//! on, the bytecode module they share, and the per-scheduler [`ActorRuntime`]
5//! that maps actor pids to their cores. It spawns each channel actor as a
6//! `trap_exit` process and re-spawns one on demand if its pid is no longer live,
7//! so a crashed channel is restarted WITHOUT affecting any other channel (each
8//! channel is an independent process with its own pid and subscriber list). The
9//! restart strategy is configurable through [`ChannelRestartPolicy`].
10//!
11//! A process-global default supervisor (`shared_supervisor`) backs the
12//! infallible [`crate::channel::ChannelHandle::new`] constructor so existing
13//! call-sites keep working; tests and the registry can construct dedicated
14//! supervisors for isolation.
15
16use std::sync::atomic::{AtomicU32, Ordering};
17use std::sync::{Arc, OnceLock};
18
19use beamr::atom::{Atom, AtomTable};
20use beamr::distribution::{DistributionConfig, Resolver};
21use beamr::module::ModuleRegistry;
22use beamr::scheduler::{Scheduler, SchedulerConfig};
23
24use crate::channel::actor::{ActorRuntime, ChannelActorCore, actor_module, private_data};
25use crate::channel::observer::ClusterObserver;
26use crate::channel::schema::Schema;
27use crate::error::LiminalError;
28
/// Restart strategy applied to a supervised channel actor whose process has died.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChannelRestartPolicy {
    /// Upper bound on restarts; once this many have been used the actor is left dead.
    pub max_restarts: u32,
    /// `true` enables one-for-one restarts of a dead actor; `false` disables them.
    pub restart: bool,
}

impl ChannelRestartPolicy {
    /// Restart budget used by the [`Default`] policy.
    const DEFAULT_MAX_RESTARTS: u32 = 8;

    /// Restarts a dead actor one-for-one, at most `max_restarts` times.
    #[must_use]
    pub const fn one_for_one(max_restarts: u32) -> Self {
        Self {
            restart: true,
            max_restarts,
        }
    }

    /// Disables automatic restarts: once the actor exits it stays dead.
    #[must_use]
    pub const fn never() -> Self {
        Self {
            restart: false,
            max_restarts: 0,
        }
    }
}

impl Default for ChannelRestartPolicy {
    /// One-for-one restarts with a budget of eight.
    fn default() -> Self {
        Self::one_for_one(Self::DEFAULT_MAX_RESTARTS)
    }
}
63
64/// Number of scheduler threads channel actors share. One thread keeps every
65/// actor's mailbox processing serialized per-process while remaining cheap.
66const CHANNEL_SCHEDULER_THREADS: usize = 1;
67
68/// Supervises channel actor processes on a shared beamr scheduler.
69#[derive(Clone)]
70pub struct ChannelSupervisor {
71    inner: Arc<SupervisorInner>,
72}
73
74struct SupervisorInner {
75    scheduler: Arc<Scheduler>,
76    runtime: Arc<ActorRuntime>,
77    policy: ChannelRestartPolicy,
78    module_name: Atom,
79    entry_function: Atom,
80    /// Optional cluster observer installed once, after construction, by the
81    /// standalone server when clustering is configured (SRV-005). The library
82    /// itself never installs one — clustering is an out-of-library concern.
83    observer: OnceLock<Arc<dyn ClusterObserver>>,
84}
85
86impl std::fmt::Debug for ChannelSupervisor {
87    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        formatter
89            .debug_struct("ChannelSupervisor")
90            .field("policy", &self.inner.policy)
91            .finish_non_exhaustive()
92    }
93}
94
95impl ChannelSupervisor {
96    /// Builds a supervisor with its own scheduler and the default restart policy.
97    ///
98    /// # Errors
99    /// Returns [`LiminalError::ConversationFailed`] when the scheduler cannot start.
100    pub fn new() -> Result<Self, LiminalError> {
101        Self::with_policy(ChannelRestartPolicy::default())
102    }
103
104    /// Builds a supervisor with an explicit restart policy on a non-clustered
105    /// scheduler.
106    ///
107    /// # Errors
108    /// Returns [`LiminalError::ConversationFailed`] when the scheduler cannot start.
109    pub fn with_policy(policy: ChannelRestartPolicy) -> Result<Self, LiminalError> {
110        Self::build(policy, None, None, None)
111    }
112
113    /// Builds a supervisor whose scheduler is distribution-enabled (SRV-005), so
114    /// every channel actor and every subscriber process this supervisor spawns
115    /// shares ONE clustered scheduler. This is the scheduler the cluster attaches
116    /// its process-group transport to: a subscriber pid joined to a channel's pg
117    /// group MUST live on the same scheduler that owns the distribution
118    /// connections, or cross-node delivery cannot reach it.
119    ///
120    /// `node_name`/`creation` form this node's distribution identity; `cookie`
121    /// and `resolver` are handed verbatim to the scheduler's
122    /// [`DistributionConfig`] (the resolver MUST be the same instance the cluster
123    /// uses to dial seeds, so handshake-established names resolve consistently).
124    ///
125    /// # Errors
126    /// Returns [`LiminalError::ConversationFailed`] when the scheduler cannot start.
127    pub fn with_distribution(
128        node_name: String,
129        creation: u32,
130        cookie: String,
131        resolver: Resolver,
132        policy: ChannelRestartPolicy,
133    ) -> Result<Self, LiminalError> {
134        let distribution = DistributionConfig { resolver, cookie };
135        Self::build(policy, Some(node_name), Some(creation), Some(distribution))
136    }
137
138    fn build(
139        policy: ChannelRestartPolicy,
140        node_name: Option<String>,
141        creation: Option<u32>,
142        distribution: Option<DistributionConfig>,
143    ) -> Result<Self, LiminalError> {
144        let atoms = AtomTable::with_common_atoms();
145        let module_name = atoms.intern("liminal_channel_actor");
146        let entry_function = atoms.intern("main");
147        let command_function = atoms.intern("process_command");
148        let command_atom = atoms.intern("liminal_channel_command");
149        let runtime = Arc::new(ActorRuntime::new(command_atom));
150        let registry = Arc::new(ModuleRegistry::new());
151        registry.insert(actor_module(module_name, entry_function, command_function));
152        let scheduler = Scheduler::new(
153            SchedulerConfig {
154                thread_count: Some(CHANNEL_SCHEDULER_THREADS),
155                nif_private_data: Some(private_data(Arc::clone(&runtime))),
156                node_name,
157                creation,
158                distribution,
159                ..SchedulerConfig::default()
160            },
161            registry,
162        )
163        .map_err(|message| LiminalError::ConversationFailed { message })?;
164        Ok(Self {
165            inner: Arc::new(SupervisorInner {
166                scheduler: Arc::new(scheduler),
167                runtime,
168                policy,
169                module_name,
170                entry_function,
171                observer: OnceLock::new(),
172            }),
173        })
174    }
175
176    /// The scheduler channel actors and their subscribers run on.
177    #[must_use]
178    pub fn scheduler(&self) -> Arc<Scheduler> {
179        Arc::clone(&self.inner.scheduler)
180    }
181
182    /// Installs the cluster observer (SRV-005). Idempotent: the first install
183    /// wins and later attempts are ignored, so the observer can be wired exactly
184    /// once after the supervisor (and its scheduler) exist.
185    pub fn install_observer(&self, observer: Arc<dyn ClusterObserver>) {
186        let _ = self.inner.observer.set(observer);
187    }
188
189    /// The installed cluster observer, if any.
190    #[must_use]
191    pub(crate) fn observer(&self) -> Option<&Arc<dyn ClusterObserver>> {
192        self.inner.observer.get()
193    }
194
195    /// The configured restart policy.
196    #[must_use]
197    pub fn policy(&self) -> &ChannelRestartPolicy {
198        &self.inner.policy
199    }
200
201    /// Spawns a fresh channel actor for `schema` and returns its shared core.
202    ///
203    /// # Errors
204    /// Returns [`LiminalError`] when the actor process cannot be spawned.
205    pub(crate) fn spawn_channel(
206        &self,
207        schema: Schema,
208    ) -> Result<Arc<ChannelActorCore>, LiminalError> {
209        let core = Arc::new(ChannelActorCore::new(
210            self.scheduler(),
211            self.inner.runtime.command_atom(),
212            schema,
213        ));
214        self.spawn_actor_for(&core)?;
215        Ok(core)
216    }
217
218    /// Ensures `core` has a live actor process, spawning (or restarting) one if
219    /// its current pid is dead. Honours the restart budget: once exhausted, a
220    /// dead actor is not restarted and a [`LiminalError::DeliveryFailed`] is
221    /// returned. This is the one-for-one restart that leaves other channels
222    /// untouched (each `core` is supervised independently).
223    ///
224    /// # Errors
225    /// Returns [`LiminalError`] when restart is disabled/exhausted or the spawn
226    /// fails.
227    pub(crate) fn ensure_running(
228        &self,
229        core: &Arc<ChannelActorCore>,
230        restarts: &AtomicU32,
231    ) -> Result<u64, LiminalError> {
232        // Fast path: a live pid needs no lock. The slow (respawn) path below is
233        // serialised so two concurrent callers that both see a dead pid cannot
234        // both spawn a replacement (the restart TOCTOU).
235        if let Some(pid) = self.live_pid(core)? {
236            return Ok(pid);
237        }
238        // Hold the per-channel restart lock across the dead-check and respawn so
239        // exactly one thread restarts; any racing caller re-reads the now-live
240        // pid below. The lock lives on `core` so each channel is supervised
241        // independently (mirrors `conversation/actor/core.rs`'s `restart_lock`).
242        let guard = core
243            .restart_lock()
244            .lock()
245            .map_err(|error| LiminalError::DeliveryFailed {
246                message: format!("channel actor restart lock poisoned: {error}"),
247            })?;
248        // Double-checked liveness AFTER acquiring the lock: the thread that won
249        // the race has already respawned, so we must not spawn a second actor.
250        if let Some(pid) = self.live_pid(core)? {
251            return Ok(pid);
252        }
253        if !self.inner.policy.restart {
254            return Err(LiminalError::DeliveryFailed {
255                message: "channel actor died and restart is disabled".to_owned(),
256            });
257        }
258        let used = restarts.fetch_add(1, Ordering::Relaxed);
259        if used >= self.inner.policy.max_restarts {
260            return Err(LiminalError::DeliveryFailed {
261                message: format!(
262                    "channel actor restart budget ({}) exhausted",
263                    self.inner.policy.max_restarts
264                ),
265            });
266        }
267        let pid = self.spawn_actor_for(core)?;
268        drop(guard);
269        Ok(pid)
270    }
271
272    /// The current pid if it is still live in the scheduler's process table,
273    /// otherwise `None` (the actor needs spawning/restarting).
274    fn live_pid(&self, core: &Arc<ChannelActorCore>) -> Result<Option<u64>, LiminalError> {
275        if let Some(pid) = core.current_pid()? {
276            if self.inner.scheduler.process_table().get(pid).is_some() {
277                return Ok(Some(pid));
278            }
279        }
280        Ok(None)
281    }
282
283    fn spawn_actor_for(&self, core: &Arc<ChannelActorCore>) -> Result<u64, LiminalError> {
284        let pid = self
285            .inner
286            .scheduler
287            .spawn_trap_exit(
288                self.inner.module_name,
289                self.inner.entry_function,
290                Vec::new(),
291            )
292            .map_err(|error| LiminalError::ConversationFailed {
293                message: format!("failed to spawn channel actor: {error:?}"),
294            })?;
295        self.inner.runtime.register(pid, Arc::downgrade(core))?;
296        core.set_current_pid(pid)?;
297        // Re-link the new process to every surviving subscriber so subscriber
298        // death (EXIT) detection works after a restart — exactly as the
299        // conversation actor's `spawn_actor_for` calls `core.boot(...)`. On the
300        // very first spawn the subscriber list is empty, so this is a no-op.
301        core.boot()?;
302        Ok(pid)
303    }
304
305    /// Stops the underlying scheduler.
306    pub fn shutdown(&self) {
307        self.inner.scheduler.shutdown();
308    }
309}
310
311/// The process-global default channel supervisor, lazily started on first use.
312static SHARED: OnceLock<ChannelSupervisor> = OnceLock::new();
313
314/// Returns the shared default supervisor, starting it on first use.
315///
316/// # Errors
317/// Returns [`LiminalError`] when the shared scheduler cannot start.
318pub fn shared_supervisor() -> Result<ChannelSupervisor, LiminalError> {
319    if let Some(existing) = SHARED.get() {
320        return Ok(existing.clone());
321    }
322    let supervisor = ChannelSupervisor::new()?;
323    Ok(SHARED.get_or_init(|| supervisor).clone())
324}