use crate::error::types::SupervisorError;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Notify;
/// Lifecycle state of a [`RuntimeControlPlane`].
///
/// `Completed` and `Failed` are terminal (see `is_terminal`). `as_str` yields a
/// snake_case label for each state; note that the derived serde impls carry no
/// `rename_all` attribute, so name-based formats use the variant identifiers
/// (e.g. `ShuttingDown`) rather than those labels.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RuntimeControlPlaneState {
    /// Initial state assigned by `RuntimeControlPlane::new`.
    Starting,
    /// Set by `RuntimeControlPlane::mark_alive`.
    Alive,
    /// Set by `RuntimeControlPlane::mark_shutdown_requested`.
    ShuttingDown,
    /// Terminal: the state carried by `RuntimeExitReport::completed` reports.
    Completed,
    /// Terminal: the state carried by `RuntimeExitReport::failed` reports.
    Failed,
}
impl RuntimeControlPlaneState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Starting => "starting",
Self::Alive => "alive",
Self::ShuttingDown => "shutting_down",
Self::Completed => "completed",
Self::Failed => "failed",
}
}
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Completed | Self::Failed)
}
}
/// Structured description of a runtime failure.
///
/// Produced by `RuntimeExitReport::failure_reason` for reports whose state is
/// `Failed`, and surfaced through `RuntimeHealthReport::failure`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeFailureReason {
    /// Caller-supplied label for the phase in which the failure happened.
    pub phase: String,
    /// Caller-supplied explanation of the failure.
    pub reason: String,
    /// Whether the reporter flagged the failure as a panic.
    pub panic: bool,
    /// Whether the reporter flagged the failure as recoverable.
    pub recoverable: bool,
}
impl RuntimeFailureReason {
    /// Builds a failure reason, taking ownership of `phase` and `reason` as `String`s.
    pub fn new(
        phase: impl Into<String>,
        reason: impl Into<String>,
        panic: bool,
        recoverable: bool,
    ) -> Self {
        // Convert up front (phase first, then reason) so the literal below is pure
        // field shorthand.
        let phase: String = phase.into();
        let reason: String = reason.into();
        Self {
            phase,
            reason,
            panic,
            recoverable,
        }
    }
}
/// Final outcome of a runtime, recorded via `RuntimeControlPlane::complete`.
///
/// Only the first report passed to `complete` is kept; later ones are discarded.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeExitReport {
    /// `Completed` or `Failed` when built with the provided constructors. The field
    /// is public, so callers can construct other values.
    pub state: RuntimeControlPlaneState,
    /// Caller-supplied label for the phase the runtime ended in.
    pub phase: String,
    /// Caller-supplied explanation of the outcome.
    pub reason: String,
    /// Always `false` for `completed` reports; caller-chosen for `failed` ones.
    pub recoverable: bool,
    /// Wall-clock nanoseconds since the Unix epoch, captured when the report is
    /// constructed (not when it is passed to `complete`). It is 0 if the system clock
    /// reads before the epoch.
    pub completed_at_unix_nanos: u128,
    /// Always `false` for `completed` reports; caller-chosen for `failed` ones.
    pub panic: bool,
}
impl RuntimeExitReport {
pub fn completed(phase: impl Into<String>, reason: impl Into<String>) -> Self {
Self {
state: RuntimeControlPlaneState::Completed,
phase: phase.into(),
reason: reason.into(),
recoverable: false,
completed_at_unix_nanos: unix_nanos_now(),
panic: false,
}
}
pub fn failed(
phase: impl Into<String>,
reason: impl Into<String>,
panic: bool,
recoverable: bool,
) -> Self {
Self {
state: RuntimeControlPlaneState::Failed,
phase: phase.into(),
reason: reason.into(),
recoverable,
completed_at_unix_nanos: unix_nanos_now(),
panic,
}
}
pub fn failure_reason(&self) -> Option<RuntimeFailureReason> {
(self.state == RuntimeControlPlaneState::Failed).then(|| {
RuntimeFailureReason::new(
self.phase.clone(),
self.reason.clone(),
self.panic,
self.recoverable,
)
})
}
}
/// Point-in-time snapshot returned by `RuntimeControlPlane::health`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RuntimeHealthReport {
    /// `true` exactly when `state` is `Alive`.
    pub alive: bool,
    /// Lifecycle state at the time of the snapshot.
    pub state: RuntimeControlPlaneState,
    /// Wall-clock nanoseconds since the Unix epoch at which the control plane was created.
    pub started_at_unix_nanos: u128,
    /// Most recent transition or probe time (ns since the Unix epoch). `health`
    /// refreshes this value before building the snapshot.
    pub last_observed_at_unix_nanos: u128,
    /// Present only when the recorded exit report is a failure.
    pub failure: Option<RuntimeFailureReason>,
    /// The final exit report, if one has been recorded.
    pub exit_report: Option<RuntimeExitReport>,
}
/// Cloneable handle to a runtime's lifecycle state.
///
/// Both fields are `Arc`s, so clones share the same state and notifier. A clone
/// handed to another task therefore observes the same transitions and `join`
/// wake-ups.
#[derive(Debug, Clone)]
pub struct RuntimeControlPlane {
    /// Mutable lifecycle state, guarded by a `std::sync::Mutex`.
    inner: Arc<Mutex<RuntimeControlPlaneInner>>,
    /// Signalled by `complete` to wake tasks waiting in `join`.
    notify: Arc<Notify>,
}
impl RuntimeControlPlane {
    /// Creates a control plane in the `Starting` state with no exit report.
    ///
    /// Both `started_at_unix_nanos` and `last_observed_at_unix_nanos` are set to the
    /// current wall-clock time.
    pub fn new() -> Self {
        let now = unix_nanos_now();
        Self {
            inner: Arc::new(Mutex::new(RuntimeControlPlaneInner {
                state: RuntimeControlPlaneState::Starting,
                started_at_unix_nanos: now,
                last_observed_at_unix_nanos: now,
                exit_report: None,
                failure: None,
                shutdown_requested_by: None,
                shutdown_reason: None,
            })),
            notify: Arc::new(Notify::new()),
        }
    }

    /// Moves `Starting` to `Alive`, or refreshes the observation time of a plane
    /// that is already `Alive`.
    ///
    /// The call does nothing once a shutdown has been requested (`ShuttingDown`) or
    /// the plane has reached a terminal state. This prevents a late readiness
    /// signal from making a plane that is shutting down report itself as alive.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn mark_alive(&self) {
        let mut inner = self.lock_inner();
        if matches!(
            inner.state,
            RuntimeControlPlaneState::Starting | RuntimeControlPlaneState::Alive
        ) {
            inner.state = RuntimeControlPlaneState::Alive;
            inner.last_observed_at_unix_nanos = unix_nanos_now();
        }
    }

    /// Returns `true` exactly when the current state is `Alive`.
    ///
    /// Acts as a probe: it also records the current time as
    /// `last_observed_at_unix_nanos`.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn is_alive(&self) -> bool {
        let mut inner = self.lock_inner();
        inner.last_observed_at_unix_nanos = unix_nanos_now();
        inner.state == RuntimeControlPlaneState::Alive
    }

    /// Returns a consistent snapshot of the current state, timestamps, failure and
    /// exit report. All fields are read under a single lock acquisition.
    ///
    /// Like `is_alive`, it stamps `last_observed_at_unix_nanos` before reading.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn health(&self) -> RuntimeHealthReport {
        let mut inner = self.lock_inner();
        inner.last_observed_at_unix_nanos = unix_nanos_now();
        RuntimeHealthReport {
            alive: inner.state == RuntimeControlPlaneState::Alive,
            state: inner.state,
            started_at_unix_nanos: inner.started_at_unix_nanos,
            last_observed_at_unix_nanos: inner.last_observed_at_unix_nanos,
            failure: inner.failure.clone(),
            exit_report: inner.exit_report.clone(),
        }
    }

    /// Records a shutdown request and moves the plane to `ShuttingDown`.
    ///
    /// Returns `Ok(Some(report))` with a clone of the final report, leaving the plane
    /// untouched, if one has already been recorded. Otherwise returns `Ok(None)`.
    /// A repeated request while already shutting down overwrites the stored
    /// requester and reason.
    ///
    /// # Errors
    /// Returns `SupervisorError::InvalidTransition` if `requested_by` or `reason` is
    /// empty or whitespace-only. Validation runs before the lock is taken, so the
    /// plane is unchanged on error.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn mark_shutdown_requested(
        &self,
        requested_by: impl Into<String>,
        reason: impl Into<String>,
    ) -> Result<Option<RuntimeExitReport>, SupervisorError> {
        let requested_by = requested_by.into();
        let reason = reason.into();
        validate_required_text(&requested_by, "requested_by")?;
        validate_required_text(&reason, "reason")?;
        let mut inner = self.lock_inner();
        if let Some(report) = &inner.exit_report {
            return Ok(Some(report.clone()));
        }
        inner.state = RuntimeControlPlaneState::ShuttingDown;
        inner.shutdown_requested_by = Some(requested_by);
        inner.shutdown_reason = Some(reason);
        inner.last_observed_at_unix_nanos = unix_nanos_now();
        Ok(None)
    }

    /// Records `report` as the final exit report and wakes every task waiting in
    /// `join`.
    ///
    /// The first call wins. If a report has already been recorded, that report is
    /// returned unchanged and `report` is discarded (no wake-up is sent). Otherwise
    /// the plane's state is copied from `report.state`, `failure` is derived from
    /// the report, and the report itself is returned.
    ///
    /// NOTE(review): `report.state` is not checked to be terminal. `RuntimeExitReport`
    /// has public fields, so callers can pass a non-terminal state.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn complete(&self, report: RuntimeExitReport) -> RuntimeExitReport {
        let mut inner = self.lock_inner();
        if let Some(existing) = &inner.exit_report {
            return existing.clone();
        }
        inner.state = report.state;
        inner.failure = report.failure_reason();
        inner.exit_report = Some(report.clone());
        inner.last_observed_at_unix_nanos = report.completed_at_unix_nanos;
        self.notify.notify_waiters();
        report
    }

    /// Returns a clone of the final exit report, or `None` if `complete` has not
    /// been called yet.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn final_report(&self) -> Option<RuntimeExitReport> {
        self.lock_inner().exit_report.clone()
    }

    /// Waits until a final report has been recorded via `complete`, then returns a
    /// clone of it. Returns immediately if one already exists.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub async fn join(&self) -> RuntimeExitReport {
        loop {
            // Register interest *before* checking. A `notify_waiters` that fires
            // between the check and the `.await` then still reaches this future,
            // so the wake-up cannot be lost.
            let notified = self.notify.notified();
            // `final_report` drops its lock guard before returning, so no std mutex
            // guard is held across the `.await` below.
            if let Some(report) = self.final_report() {
                return report;
            }
            notified.await;
        }
    }

    /// Acquires the state mutex.
    ///
    /// # Panics
    /// Panics if a previous holder panicked while holding the lock (poisoning).
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, RuntimeControlPlaneInner> {
        self.inner
            .lock()
            .expect("runtime control plane lock poisoned")
    }
}
impl Default for RuntimeControlPlane {
fn default() -> Self {
Self::new()
}
}
/// Mutable state stored behind `RuntimeControlPlane`'s mutex.
#[derive(Debug)]
struct RuntimeControlPlaneInner {
    /// Current lifecycle state.
    state: RuntimeControlPlaneState,
    /// Creation time in ns since the Unix epoch. It is set in `new` and not
    /// modified afterwards.
    started_at_unix_nanos: u128,
    /// Most recent transition or probe time, in ns since the Unix epoch.
    last_observed_at_unix_nanos: u128,
    /// Final exit report. `complete` never replaces it once it is `Some`.
    exit_report: Option<RuntimeExitReport>,
    /// Failure details derived from `exit_report` by `complete` (`Some` only for
    /// `Failed` reports).
    failure: Option<RuntimeFailureReason>,
    /// Requester passed to the most recent `mark_shutdown_requested` call.
    /// NOTE(review): written but never read by the code shown here (only visible
    /// through `Debug`). Confirm whether it should be exposed, e.g. in health reports.
    shutdown_requested_by: Option<String>,
    /// Reason passed to the most recent `mark_shutdown_requested` call.
    /// NOTE(review): likewise written but never read by the code shown here.
    shutdown_reason: Option<String>,
}
/// Rejects a required text field that is empty or consists only of Unicode
/// whitespace.
///
/// `field` names the offending parameter in the error message.
///
/// # Errors
/// Returns `SupervisorError::InvalidTransition` when `value` has no
/// non-whitespace characters.
fn validate_required_text(value: &str, field: &str) -> Result<(), SupervisorError> {
    // `all` on an empty iterator is `true`, so "" is rejected just like "   ".
    let blank = value.chars().all(char::is_whitespace);
    if !blank {
        return Ok(());
    }
    Err(SupervisorError::InvalidTransition {
        message: format!("runtime control plane {field} must not be empty"),
    })
}
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 instead of failing when the system clock reads earlier than the epoch.
fn unix_nanos_now() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}