pub mod backoff;
pub mod health;
pub mod process;
pub mod spec;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Instant;
use tokio::sync::{watch, Mutex, RwLock};
use tracing::{error, info, warn};
use crate::error::{OlError, OL_4302_STARTUP_PROBE_TIMEOUT, OL_4303_RESTART_RATE_LIMIT};
use crate::generated::ManifestBinding;
use crate::telemetry;
use backoff::{backoff_for, RestartTracker};
use health::{liveness_loop, wait_for_startup, LivenessOutcome};
use process::{spawn_process, ManagedChild};
pub use spec::{HealthCheckSpec, ProcessSpec, RestartPolicy};
/// Default admin/health port for managed tools.
/// NOTE(review): not referenced in this module's visible code — presumably
/// consumed by the `spec`/`health` submodules when building health URLs;
/// confirm before relying on it.
pub const DEFAULT_ADMIN_PORT: u16 = 8444;
/// Lifecycle state of a supervised tool process.
///
/// Serialized in snake_case, matching the strings produced by
/// [`ToolState::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolState {
    /// Process spawned; startup probe has not passed yet.
    Starting,
    /// Startup probe succeeded; liveness monitoring is active.
    Ready,
    /// Supervision gave up: startup probe failed, a respawn failed, or the
    /// restart rate-limit tripped.
    Degraded,
    /// Deliberately stopped (shutdown, or restart policy declined a respawn).
    Stopped,
}
impl ToolState {
pub fn as_str(self) -> &'static str {
match self {
ToolState::Starting => "starting",
ToolState::Ready => "ready",
ToolState::Degraded => "degraded",
ToolState::Stopped => "stopped",
}
}
}
/// Classification of why a tool process stopped or failed.
///
/// Serialized in snake_case, matching the strings produced by
/// [`FailureKind::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FailureKind {
    /// Child exited with a clean (zero) status.
    CleanExit,
    /// Child exited non-cleanly (or waiting on it errored).
    NonZeroExit,
    /// Liveness probing hit its failure threshold.
    LivenessFailed,
    /// Startup probe did not pass within the start timeout.
    StartupTimeout,
    /// Spawning/respawning the process itself failed.
    SpawnFailed,
}
impl FailureKind {
pub fn as_str(self) -> &'static str {
match self {
FailureKind::CleanExit => "clean_exit",
FailureKind::NonZeroExit => "non_zero_exit",
FailureKind::LivenessFailed => "liveness_failed",
FailureKind::StartupTimeout => "startup_timeout",
FailureKind::SpawnFailed => "spawn_failed",
}
}
}
/// Serializable point-in-time status for one managed binding, as reported
/// by [`Supervisor::snapshot`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ToolStatus {
    /// Binding identifier (the supervisor's map key).
    pub binding_id: String,
    /// Tool slug from the binding's spec.
    pub tool_slug: String,
    /// Current lifecycle state.
    pub state: ToolState,
    /// Pid of the current child process, if one is running.
    pub pid: Option<u32>,
    /// Total restarts recorded by the restart tracker.
    pub restart_count: u32,
    /// Unix timestamp of the most recent (re)spawn.
    pub started_at_unix: Option<i64>,
    /// Most recent failure classification, if any.
    pub last_failure: Option<FailureKind>,
}
/// Mutable per-tool status, kept behind `Tool::state` (a std mutex, held
/// only for short non-awaiting critical sections).
#[derive(Debug)]
struct MutableState {
    /// Current lifecycle state.
    state: ToolState,
    /// OS pid of the current child, if one is running.
    pid: Option<u32>,
    /// Unix timestamp of the most recent (re)spawn.
    started_at_unix: Option<i64>,
    /// Restart counter and rate-limit window.
    tracker: RestartTracker,
    /// Most recent failure classification, if any.
    last_failure: Option<FailureKind>,
}
/// One supervised tool process plus the handles used to monitor and stop it.
struct Tool {
    /// Immutable launch/probe configuration for this binding.
    spec: ProcessSpec,
    /// Mutable status. A std (non-async) mutex: every critical section in
    /// this file is short and never awaits while holding the guard.
    state: StdMutex<MutableState>,
    /// The running child, if any. The liveness loop `take()`s it while
    /// monitoring; shutdown/restart `take()` it to kill the process.
    child: Mutex<Option<ManagedChild>>,
    /// Sending `true` tells this tool's liveness loop to stop.
    liveness_shutdown: watch::Sender<bool>,
}
impl Tool {
    /// Point-in-time status report, safe to serialize for status queries.
    fn snapshot(&self) -> ToolStatus {
        let guard = self.state.lock().expect("state mutex poisoned");
        ToolStatus {
            binding_id: self.spec.binding_id.clone(),
            tool_slug: self.spec.tool_slug.clone(),
            state: guard.state,
            pid: guard.pid,
            restart_count: guard.tracker.total(),
            started_at_unix: guard.started_at_unix,
            last_failure: guard.last_failure,
        }
    }

    /// Overwrite the lifecycle state.
    fn set_state(&self, state: ToolState) {
        let mut guard = self.state.lock().expect("state mutex poisoned");
        guard.state = state;
    }

    /// Overwrite the lifecycle state and record the failure that caused it.
    fn set_state_and_failure(&self, state: ToolState, failure: FailureKind) {
        let mut guard = self.state.lock().expect("state mutex poisoned");
        guard.state = state;
        guard.last_failure = Some(failure);
    }

    /// Read the current lifecycle state.
    fn current_state(&self) -> ToolState {
        let guard = self.state.lock().expect("state mutex poisoned");
        guard.state
    }

    /// Record one restart attempt against the rate-limit tracker.
    ///
    /// Returns `(tripped, total)`: whether the rate-limit window tripped and
    /// the total restart count so far. Under the same lock, the state moves
    /// to `Degraded` (tripped) or `Starting` (not tripped).
    fn record_restart_and_check(&self) -> (bool, u32) {
        let mut guard = self.state.lock().expect("state mutex poisoned");
        let tripped = guard.tracker.record_and_check(Instant::now());
        let total = guard.tracker.total();
        guard.state = if tripped {
            ToolState::Degraded
        } else {
            ToolState::Starting
        };
        (tripped, total)
    }

    /// Note a successful respawn: new pid plus a fresh start timestamp.
    fn record_respawn(&self, pid: Option<u32>) {
        let mut guard = self.state.lock().expect("state mutex poisoned");
        guard.pid = pid;
        guard.started_at_unix = Some(chrono::Utc::now().timestamp());
    }
}
/// Process supervisor: owns every managed tool, keyed by binding id.
pub struct Supervisor {
    /// Managed tools by binding id. Async `RwLock`: read by status/probe
    /// paths, write-locked only while (re)registering a tool.
    tools: RwLock<HashMap<String, Arc<Tool>>>,
}
impl Default for Supervisor {
fn default() -> Self {
Self {
tools: RwLock::new(HashMap::new()),
}
}
}
impl Supervisor {
    /// Create an empty supervisor (equivalent to [`Supervisor::default`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Spawn every binding and wait for each startup probe to pass.
    ///
    /// Orphan reconciliation runs first (best-effort: errors are logged and
    /// ignored). Bindings start sequentially; the first failure tears down
    /// everything already started via `shutdown_all` and is returned.
    pub async fn start_all(
        self: &Arc<Self>,
        bindings: &[(String, ManifestBinding)],
        manifest_dir: &Path,
    ) -> Result<(), OlError> {
        let binding_ids: Vec<&str> = bindings.iter().map(|(id, _)| id.as_str()).collect();
        if let Err(e) = process::reconcile_orphans(&binding_ids) {
            warn!(error = %e, "orphan reconcile reported error (continuing)");
        }
        for (id, b) in bindings {
            let spec = ProcessSpec::from_manifest(id.clone(), b, manifest_dir)?;
            if let Err(e) = self.spawn_and_wait(spec).await {
                // All-or-nothing startup: undo anything already running.
                self.shutdown_all().await;
                return Err(e);
            }
        }
        Ok(())
    }

    /// Spawn (or replace) the process for `spec`, block until its startup
    /// probe passes, then hand ongoing monitoring to a background liveness
    /// task.
    ///
    /// On startup-probe failure the tool stays registered in `Degraded`
    /// state, no liveness task is started, and the error is returned.
    async fn spawn_and_wait(self: &Arc<Self>, spec: ProcessSpec) -> Result<(), OlError> {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let mut tools = self.tools.write().await;
        if let Some(existing) = tools.get(&spec.binding_id).cloned() {
            // Replacing an existing entry: stop its liveness loop and kill
            // its child first. The write lock is released before awaiting
            // the shutdown, then re-acquired.
            drop(tools);
            let _ = existing.liveness_shutdown.send(true);
            if let Some(child) = existing.child.lock().await.take() {
                child.shutdown(existing.spec.kill_timeout).await;
            }
            tools = self.tools.write().await;
        }
        let child = spawn_process(&spec)?;
        let pid = child.pid;
        let started_at_unix = chrono::Utc::now().timestamp();
        let lang = telemetry::events::language_hint_for(&spec.command[0]);
        let tool = Arc::new(Tool {
            state: StdMutex::new(MutableState {
                state: ToolState::Starting,
                pid,
                started_at_unix: Some(started_at_unix),
                tracker: RestartTracker::new(spec.restart_limit),
                last_failure: None,
            }),
            child: Mutex::new(Some(child)),
            liveness_shutdown: shutdown_tx,
            spec: spec.clone(),
        });
        // Registered before the startup probe so status queries can already
        // see the binding in `Starting` state.
        tools.insert(spec.binding_id.clone(), tool.clone());
        drop(tools);
        telemetry::capture_global(telemetry::Event::tool_process_started(
            &spec.binding_id,
            lang,
            0,
        ));
        let url = spec.health_url();
        match wait_for_startup(&spec.binding_id, &spec.health, &url, spec.start_timeout).await {
            Ok(()) => {
                info!(binding_id = %spec.binding_id, tool = %spec.tool_slug, "tool ready");
                tool.set_state(ToolState::Ready);
            }
            Err(e) => {
                // NOTE(review): the child is NOT killed here — it stays in
                // `tool.child` with no liveness loop. `start_all` cleans it
                // up via `shutdown_all`, but a failed `restart` leaves it
                // running unmonitored; confirm that is intended.
                tool.set_state_and_failure(ToolState::Degraded, FailureKind::StartupTimeout);
                telemetry::capture_global(telemetry::Event::tool_process_crashed(
                    &spec.binding_id,
                    FailureKind::StartupTimeout.as_str(),
                    0,
                ));
                return Err(e);
            }
        }
        let supervisor = self.clone();
        let spec_for_loop = spec.clone();
        // Detached task; it stops when it observes `shutdown_rx` or when it
        // gives up supervision (policy / rate-limit / respawn failure).
        tokio::spawn(async move {
            supervisor
                .run_liveness_loop(spec_for_loop, shutdown_rx)
                .await;
        });
        Ok(())
    }

    /// Background monitor for one tool: races child exit against liveness
    /// probing and a shutdown signal, respawning with backoff until the
    /// restart policy, the rate-limit, or a shutdown signal ends it.
    async fn run_liveness_loop(
        self: Arc<Self>,
        spec: ProcessSpec,
        mut shutdown_rx: watch::Receiver<bool>,
    ) {
        loop {
            // Tool removed from the map means supervision is over.
            let tool = match self.tools.read().await.get(&spec.binding_id).cloned() {
                Some(t) => t,
                None => return,
            };
            // Take ownership of the child for this monitoring round; it is
            // put back only on shutdown.
            let mut child = match tool.child.lock().await.take() {
                Some(c) => c,
                None => return,
            };
            let url = spec.health_url();
            let live_fut = liveness_loop(&spec.binding_id, &spec.health, &url, shutdown_rx.clone());
            let wait_fut = child.wait();
            let outcome = tokio::select! {
                // A wait() error is treated as a non-clean exit.
                w = wait_fut => RestartReason::ChildExited(w.unwrap_or(false)),
                l = live_fut => match l {
                    // `Healthy` here means the liveness loop itself observed
                    // the shutdown signal and returned.
                    LivenessOutcome::Healthy => RestartReason::Shutdown,
                    LivenessOutcome::FailureThresholdHit => RestartReason::LivenessFailed,
                },
                _ = shutdown_rx.changed() => RestartReason::Shutdown,
            };
            // Also re-check the flag: shutdown may have raced a child exit.
            if matches!(outcome, RestartReason::Shutdown) || *shutdown_rx.borrow_and_update() {
                // Hand the child back so shutdown_all/restart can kill it.
                *tool.child.lock().await = Some(child);
                return;
            }
            // Make sure the old process is gone before classifying/respawning.
            child.shutdown(spec.kill_timeout).await;
            let clean_exit = matches!(outcome, RestartReason::ChildExited(true));
            let failure_kind = match outcome {
                RestartReason::ChildExited(true) => FailureKind::CleanExit,
                RestartReason::ChildExited(false) => FailureKind::NonZeroExit,
                RestartReason::LivenessFailed => FailureKind::LivenessFailed,
                // Shutdown already returned above.
                RestartReason::Shutdown => unreachable!(),
            };
            if !spec.restart.should_restart(clean_exit) {
                info!(
                    binding_id = %spec.binding_id,
                    policy = ?spec.restart,
                    "restart policy says do not respawn"
                );
                tool.set_state_and_failure(ToolState::Stopped, failure_kind);
                return;
            }
            let (tripped, total) = tool.record_restart_and_check();
            // record_restart_and_check already set this same state; this
            // second call additionally records the failure kind.
            tool.set_state_and_failure(
                if tripped {
                    ToolState::Degraded
                } else {
                    ToolState::Starting
                },
                failure_kind,
            );
            telemetry::capture_global(telemetry::Event::tool_process_crashed(
                &spec.binding_id,
                failure_kind.as_str(),
                total,
            ));
            if tripped {
                warn!(
                    binding_id = %spec.binding_id,
                    restart_count = total,
                    window_ms = spec.restart_limit.window.as_millis() as u64,
                    "restart rate-limit hit; marking binding degraded (OL-4303)"
                );
                telemetry::capture_global(telemetry::Event::tool_process_degraded(
                    &spec.binding_id,
                    total,
                    spec.restart_limit.window.as_secs() as u32,
                ));
                return;
            }
            let delay = backoff_for(total);
            info!(
                binding_id = %spec.binding_id,
                restart_count = total,
                delay_ms = delay.as_millis() as u64,
                failure_kind = failure_kind.as_str(),
                "respawning after backoff"
            );
            tokio::time::sleep(delay).await;
            match spawn_process(&spec) {
                Ok(new_child) => {
                    tool.record_respawn(new_child.pid);
                    *tool.child.lock().await = Some(new_child);
                    let url = spec.health_url();
                    match wait_for_startup(&spec.binding_id, &spec.health, &url, spec.start_timeout)
                        .await
                    {
                        // Probe passed: loop around and monitor the new child.
                        Ok(()) => tool.set_state(ToolState::Ready),
                        Err(e) => {
                            error!(
                                binding_id = %spec.binding_id,
                                error = %e,
                                "startup probe failed after respawn"
                            );
                            tool.set_state_and_failure(
                                ToolState::Degraded,
                                FailureKind::StartupTimeout,
                            );
                            telemetry::capture_global(telemetry::Event::tool_process_crashed(
                                &spec.binding_id,
                                FailureKind::StartupTimeout.as_str(),
                                total,
                            ));
                            // NOTE(review): supervision ends but the respawned
                            // child stays in `tool.child`, unmonitored until a
                            // later shutdown/restart — confirm intended.
                            return;
                        }
                    }
                }
                Err(e) => {
                    error!(binding_id = %spec.binding_id, error = %e, "respawn failed");
                    tool.set_state_and_failure(ToolState::Degraded, FailureKind::SpawnFailed);
                    return;
                }
            }
        }
    }

    /// Stop every managed tool: signal all liveness loops and mark tools
    /// `Stopped` first, then kill the children sequentially.
    pub async fn shutdown_all(&self) {
        let tools: Vec<Arc<Tool>> = self.tools.read().await.values().cloned().collect();
        for tool in &tools {
            let _ = tool.liveness_shutdown.send(true);
            tool.set_state(ToolState::Stopped);
        }
        for tool in tools {
            // The child may be briefly held by its liveness loop; the loop
            // puts it back after observing the shutdown signal.
            if let Some(child) = tool.child.lock().await.take() {
                info!(binding_id = %tool.spec.binding_id, "stopping managed tool");
                child.shutdown(tool.spec.kill_timeout).await;
            }
        }
    }

    /// Status snapshot of every managed tool (map order, unspecified).
    pub async fn snapshot(&self) -> Vec<ToolStatus> {
        self.tools
            .read()
            .await
            .values()
            .map(|t| t.snapshot())
            .collect()
    }

    /// Manually restart one binding: stop its current process (and liveness
    /// loop), then spawn a fresh one and wait for the startup probe again.
    ///
    /// # Errors
    /// - `OL_4222` if the binding is not managed by this daemon.
    /// - `OL_4303` if the binding is `Degraded` (rate-limit tripped).
    ///
    /// NOTE(review): `spawn_and_wait` builds a fresh `RestartTracker`, so a
    /// successful manual restart also resets the restart-count window —
    /// confirm this is consistent with the OL-4303 error text below.
    pub async fn restart(self: &Arc<Self>, binding_id: &str) -> Result<(), OlError> {
        let tool = self
            .tools
            .read()
            .await
            .get(binding_id)
            .cloned()
            .ok_or_else(|| {
                OlError::new(
                    crate::error::OL_4222_BINDING_NOT_CONFIGURED,
                    format!("binding `{binding_id}` is not managed by this daemon"),
                )
            })?;
        if tool.current_state() == ToolState::Degraded {
            return Err(OlError::new(
                OL_4303_RESTART_RATE_LIMIT,
                format!(
                    "binding `{binding_id}` is degraded (restart rate-limit was hit); \
                     restart the daemon to reset the rate-limit window"
                ),
            ));
        }
        let _ = tool.liveness_shutdown.send(true);
        if let Some(child) = tool.child.lock().await.take() {
            child.shutdown(tool.spec.kill_timeout).await;
        }
        self.spawn_and_wait(tool.spec.clone()).await
    }

    /// One-shot health probe against the binding's health URL.
    ///
    /// # Errors
    /// - `OL_4222` if the binding is not managed by this daemon.
    /// - `OL_4302` if the probe fails or times out.
    pub async fn probe(&self, binding_id: &str) -> Result<(), OlError> {
        let tool = self
            .tools
            .read()
            .await
            .get(binding_id)
            .cloned()
            .ok_or_else(|| {
                OlError::new(
                    crate::error::OL_4222_BINDING_NOT_CONFIGURED,
                    format!("binding `{binding_id}` is not managed by this daemon"),
                )
            })?;
        let url = tool.spec.health_url();
        health::probe_once(&url, tool.spec.health.liveness_timeout)
            .await
            .map_err(|e| {
                OlError::new(
                    OL_4302_STARTUP_PROBE_TIMEOUT,
                    format!("binding `{binding_id}`: probe failed: {e}"),
                )
            })
    }

    /// Log-file path of the binding's current child, if any.
    ///
    /// Returns `None` when the binding is unknown or its child is currently
    /// taken (e.g. held by the liveness loop) or absent.
    pub async fn log_path(&self, binding_id: &str) -> Option<std::path::PathBuf> {
        let tool = self.tools.read().await.get(binding_id).cloned()?;
        let child = tool.child.lock().await;
        child.as_ref().map(|c| c.log_path.clone())
    }

    /// Map a tool slug to its binding id (first match wins).
    pub async fn resolve_slug(&self, slug: &str) -> Option<String> {
        self.tools
            .read()
            .await
            .values()
            .find(|t| t.spec.tool_slug == slug)
            .map(|t| t.spec.binding_id.clone())
    }
}
/// Why one monitoring round in `run_liveness_loop` ended.
#[derive(Debug)]
enum RestartReason {
    /// Child process exited; `true` = clean (zero) exit.
    ChildExited(bool),
    /// Liveness probing hit its failure threshold.
    LivenessFailed,
    /// Shutdown was signalled; do not respawn.
    Shutdown,
}