use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, OnceLock};
use beamr::atom::{Atom, AtomTable};
use beamr::distribution::{DistributionConfig, Resolver};
use beamr::module::ModuleRegistry;
use beamr::scheduler::{Scheduler, SchedulerConfig};
use crate::channel::actor::{ActorRuntime, ChannelActorCore, actor_module, private_data};
use crate::channel::observer::ClusterObserver;
use crate::channel::schema::Schema;
use crate::error::LiminalError;
/// Restart behaviour a `ChannelSupervisor` applies when it finds a channel
/// actor dead.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChannelRestartPolicy {
    /// Maximum number of restarts `ChannelSupervisor::ensure_running` will
    /// perform against a given restart counter.
    pub max_restarts: u32,
    /// Whether a dead actor may be restarted at all.
    pub restart: bool,
}

impl ChannelRestartPolicy {
    /// Restart budget used by the `Default` implementation.
    const DEFAULT_MAX_RESTARTS: u32 = 8;

    /// Policy that restarts a dead actor, at most `max_restarts` times.
    #[must_use]
    pub const fn one_for_one(max_restarts: u32) -> Self {
        Self {
            restart: true,
            max_restarts,
        }
    }

    /// Policy that never restarts; its budget is zero.
    #[must_use]
    pub const fn never() -> Self {
        Self {
            restart: false,
            max_restarts: 0,
        }
    }
}

impl Default for ChannelRestartPolicy {
    /// Equivalent to `ChannelRestartPolicy::one_for_one(8)`.
    fn default() -> Self {
        Self::one_for_one(Self::DEFAULT_MAX_RESTARTS)
    }
}
/// Thread count passed as `SchedulerConfig::thread_count` when a supervisor
/// builds its scheduler.
const CHANNEL_SCHEDULER_THREADS: usize = 1;
/// Handle to a channel supervisor. Cloning is cheap: every clone shares the
/// same scheduler, runtime, policy and observer through an `Arc`.
#[derive(Clone)]
pub struct ChannelSupervisor {
inner: Arc<SupervisorInner>,
}
/// State shared by all clones of a `ChannelSupervisor`.
// NOTE: fields are dropped in declaration order, so `scheduler` is dropped
// before `runtime`; keep that in mind before reordering.
struct SupervisorInner {
/// Scheduler on which channel actor processes are spawned.
scheduler: Arc<Scheduler>,
/// Actor runtime that spawned pids are registered with.
runtime: Arc<ActorRuntime>,
/// Restart policy consulted by `ensure_running`.
policy: ChannelRestartPolicy,
/// Atom naming the actor module registered when the supervisor is built.
module_name: Atom,
/// Atom naming the function newly spawned actors start in.
entry_function: Atom,
/// Optional cluster observer; can be set at most once.
observer: OnceLock<Arc<dyn ClusterObserver>>,
}
impl std::fmt::Debug for ChannelSupervisor {
    /// Renders only the restart policy; the remaining state is omitted and the
    /// output ends with `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ChannelSupervisor");
        repr.field("policy", &self.inner.policy);
        repr.finish_non_exhaustive()
    }
}
impl ChannelSupervisor {
    /// Creates a supervisor using `ChannelRestartPolicy::default()` and no
    /// distribution.
    ///
    /// # Errors
    ///
    /// Returns [`LiminalError::ConversationFailed`] if the scheduler cannot be
    /// created.
    pub fn new() -> Result<Self, LiminalError> {
        Self::with_policy(ChannelRestartPolicy::default())
    }

    /// Creates a supervisor with the given restart `policy` and no distribution.
    ///
    /// # Errors
    ///
    /// Returns [`LiminalError::ConversationFailed`] if the scheduler cannot be
    /// created.
    pub fn with_policy(policy: ChannelRestartPolicy) -> Result<Self, LiminalError> {
        Self::build(policy, None, None, None)
    }

    /// Creates a supervisor whose scheduler is configured with a node name,
    /// creation number and a `DistributionConfig` built from `resolver` and
    /// `cookie`.
    ///
    /// # Errors
    ///
    /// Returns [`LiminalError::ConversationFailed`] if the scheduler cannot be
    /// created.
    pub fn with_distribution(
        node_name: String,
        creation: u32,
        cookie: String,
        resolver: Resolver,
        policy: ChannelRestartPolicy,
    ) -> Result<Self, LiminalError> {
        let distribution = DistributionConfig { resolver, cookie };
        Self::build(policy, Some(node_name), Some(creation), Some(distribution))
    }

    /// Shared constructor: interns the actor atoms, registers the channel actor
    /// module, and starts a scheduler with `CHANNEL_SCHEDULER_THREADS` threads
    /// whose NIF private data points at the actor runtime.
    fn build(
        policy: ChannelRestartPolicy,
        node_name: Option<String>,
        creation: Option<u32>,
        distribution: Option<DistributionConfig>,
    ) -> Result<Self, LiminalError> {
        // NOTE(review): these atoms are interned into a table local to this
        // function that is never handed to the scheduler; presumably `Atom`
        // values stay meaningful after the table is dropped — confirm.
        let atoms = AtomTable::with_common_atoms();
        let module_name = atoms.intern("liminal_channel_actor");
        let entry_function = atoms.intern("main");
        let command_function = atoms.intern("process_command");
        let command_atom = atoms.intern("liminal_channel_command");

        let runtime = Arc::new(ActorRuntime::new(command_atom));
        let registry = Arc::new(ModuleRegistry::new());
        registry.insert(actor_module(module_name, entry_function, command_function));

        let config = SchedulerConfig {
            thread_count: Some(CHANNEL_SCHEDULER_THREADS),
            nif_private_data: Some(private_data(Arc::clone(&runtime))),
            node_name,
            creation,
            distribution,
            ..SchedulerConfig::default()
        };
        let scheduler = Scheduler::new(config, registry)
            .map_err(|message| LiminalError::ConversationFailed { message })?;

        Ok(Self {
            inner: Arc::new(SupervisorInner {
                scheduler: Arc::new(scheduler),
                runtime,
                policy,
                module_name,
                entry_function,
                observer: OnceLock::new(),
            }),
        })
    }

    /// Returns a shared handle to the scheduler running the channel actors.
    #[must_use]
    pub fn scheduler(&self) -> Arc<Scheduler> {
        Arc::clone(&self.inner.scheduler)
    }

    /// Installs the cluster observer. Only the first installation takes
    /// effect; later calls are ignored.
    pub fn install_observer(&self, observer: Arc<dyn ClusterObserver>) {
        // First-wins by design: `OnceLock::set` fails once a value is present,
        // and that failure is deliberately discarded.
        let _ = self.inner.observer.set(observer);
    }

    /// Returns the installed cluster observer, if any.
    #[must_use]
    pub(crate) fn observer(&self) -> Option<&Arc<dyn ClusterObserver>> {
        self.inner.observer.get()
    }

    /// Returns the restart policy this supervisor was built with.
    #[must_use]
    pub fn policy(&self) -> &ChannelRestartPolicy {
        &self.inner.policy
    }

    /// Creates a channel core for `schema` and spawns its actor process.
    ///
    /// # Errors
    ///
    /// Propagates any failure from spawning, registering or booting the actor.
    pub(crate) fn spawn_channel(
        &self,
        schema: Schema,
    ) -> Result<Arc<ChannelActorCore>, LiminalError> {
        let core = Arc::new(ChannelActorCore::new(
            self.scheduler(),
            self.inner.runtime.command_atom(),
            schema,
        ));
        self.spawn_actor_for(&core)?;
        Ok(core)
    }

    /// Returns the pid of `core`'s live actor, restarting it if it has died
    /// and the policy and `restarts` budget allow.
    ///
    /// Restarts are serialised through the core's restart lock, and liveness
    /// is re-checked under the lock so concurrent callers do not spawn twice.
    /// `restarts` is only incremented while it is below
    /// `policy.max_restarts`, so it never wraps.
    ///
    /// # Errors
    ///
    /// Returns [`LiminalError::DeliveryFailed`] if the restart lock is
    /// poisoned, restarts are disabled, or the budget is exhausted. Also
    /// propagates errors from querying the core or respawning the actor.
    pub(crate) fn ensure_running(
        &self,
        core: &Arc<ChannelActorCore>,
        restarts: &AtomicU32,
    ) -> Result<u64, LiminalError> {
        // Fast path: no locking when the actor is alive.
        if let Some(pid) = self.live_pid(core)? {
            return Ok(pid);
        }
        let guard = core
            .restart_lock()
            .lock()
            .map_err(|error| LiminalError::DeliveryFailed {
                message: format!("channel actor restart lock poisoned: {error}"),
            })?;
        // Another caller may have restarted the actor while we waited.
        if let Some(pid) = self.live_pid(core)? {
            return Ok(pid);
        }
        let policy = &self.inner.policy;
        if !policy.restart {
            return Err(LiminalError::DeliveryFailed {
                message: "channel actor died and restart is disabled".to_owned(),
            });
        }
        // Consume budget only while some remains. An unconditional `fetch_add`
        // would keep counting on every call after exhaustion and eventually
        // wrap the u32 back to 0, silently re-enabling restarts.
        let max_restarts = policy.max_restarts;
        let reserved = restarts.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
            if used < max_restarts {
                Some(used + 1)
            } else {
                None
            }
        });
        if reserved.is_err() {
            return Err(LiminalError::DeliveryFailed {
                message: format!("channel actor restart budget ({max_restarts}) exhausted"),
            });
        }
        let pid = self.spawn_actor_for(core)?;
        drop(guard);
        Ok(pid)
    }

    /// Returns the pid recorded on `core` if that pid is still present in the
    /// scheduler's process table.
    fn live_pid(&self, core: &Arc<ChannelActorCore>) -> Result<Option<u64>, LiminalError> {
        let Some(pid) = core.current_pid()? else {
            return Ok(None);
        };
        let alive = self.inner.scheduler.process_table().get(pid).is_some();
        Ok(alive.then_some(pid))
    }

    /// Spawns a trap-exit actor process (entry function, no arguments), then
    /// registers it with the runtime, records its pid on `core`, and boots
    /// `core`.
    ///
    /// NOTE(review): if any step after the spawn fails, the freshly spawned
    /// process is not torn down here; confirm whether that can leak.
    fn spawn_actor_for(&self, core: &Arc<ChannelActorCore>) -> Result<u64, LiminalError> {
        let pid = self
            .inner
            .scheduler
            .spawn_trap_exit(
                self.inner.module_name,
                self.inner.entry_function,
                Vec::new(),
            )
            .map_err(|error| LiminalError::ConversationFailed {
                message: format!("failed to spawn channel actor: {error:?}"),
            })?;
        // The runtime holds only a weak reference so it does not keep the core
        // alive.
        self.inner.runtime.register(pid, Arc::downgrade(core))?;
        core.set_current_pid(pid)?;
        core.boot()?;
        Ok(pid)
    }

    /// Shuts down the underlying scheduler.
    pub fn shutdown(&self) {
        self.inner.scheduler.shutdown();
    }
}
/// Process-wide supervisor handed out by `shared_supervisor`.
static SHARED: OnceLock<ChannelSupervisor> = OnceLock::new();

/// Serialises first-time construction of `SHARED`. Without it, concurrent
/// first callers could each build a supervisor with its own scheduler, and
/// the losers would be discarded without `shutdown()`.
static SHARED_INIT: std::sync::Mutex<()> = std::sync::Mutex::new(());

/// Returns a handle to the lazily created process-wide supervisor.
///
/// The first successful call builds it with `ChannelSupervisor::new`. Later
/// calls return clones that share the same scheduler.
///
/// # Errors
///
/// Propagates the error from `ChannelSupervisor::new` if construction fails.
/// Nothing is cached in that case, so a later call tries again.
pub fn shared_supervisor() -> Result<ChannelSupervisor, LiminalError> {
    if let Some(existing) = SHARED.get() {
        return Ok(existing.clone());
    }
    // The guarded value is `()`, so a poisoned lock protects no broken state.
    let _init_guard = SHARED_INIT
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    // Another thread may have finished initialising while we waited.
    if let Some(existing) = SHARED.get() {
        return Ok(existing.clone());
    }
    let supervisor = ChannelSupervisor::new()?;
    Ok(SHARED.get_or_init(|| supervisor).clone())
}