pub mod config;
pub mod metrics;
pub mod poller;
#[cfg(feature = "daemon")]
pub mod http;
#[cfg(test)]
mod tests;
use std::future::Future;
use std::sync::Arc;
use tracing::{info, warn};
use crate::activity::monitor::{ActivityMonitor, LlmClassifier};
use crate::session_manager::SessionManager;
pub use config::SupervisorConfig;
pub use metrics::{FleetMetrics, PendingDecision, SupervisorRunStats};
pub use poller::{TickReport, run_tick};
#[cfg(feature = "daemon")]
pub use http::{MetricsHandle, new_handle};
/// Drives periodic sweeps over the sessions owned by a [`SessionManager`]
/// and accumulates run statistics across those sweeps.
///
/// `C` is the classifier type used by the optional [`ActivityMonitor`].
pub struct Supervisor<C: LlmClassifier> {
/// Shared session manager handed to every sweep and queried for snapshots.
mgr: Arc<SessionManager>,
/// Supervisor configuration (sweep interval, auto-resume and idle-classification flags).
cfg: SupervisorConfig,
/// Optional activity monitor forwarded to each sweep; `None` runs sweeps without one.
monitor: Option<ActivityMonitor<C>>,
/// Cumulative counters updated after every sweep.
stats: SupervisorRunStats,
}
impl<C: LlmClassifier> Supervisor<C> {
    /// Creates a supervisor over `mgr` with the given configuration and an
    /// optional activity monitor. Run statistics start at their defaults.
    pub fn new(
        mgr: Arc<SessionManager>,
        cfg: SupervisorConfig,
        monitor: Option<ActivityMonitor<C>>,
    ) -> Self {
        Self {
            mgr,
            cfg,
            monitor,
            stats: SupervisorRunStats::default(),
        }
    }

    /// Returns the counters accumulated by all sweeps run so far.
    pub fn stats(&self) -> &SupervisorRunStats {
        &self.stats
    }

    /// Runs a single sweep via [`run_tick`], folds its results into the
    /// cumulative statistics, and returns the sweep's report.
    pub async fn tick(&mut self) -> TickReport {
        let report = run_tick(&self.mgr, &self.cfg, self.monitor.as_ref()).await;

        let stats = &mut self.stats;
        stats.sweeps += 1;
        stats.auto_resumed += report.resumed.len() as u64;
        stats.resume_failures += report.resume_failures as u64;
        stats.classified += report.classified as u64;

        report
    }

    /// Builds fleet metrics from the session manager's current records and
    /// attaches a copy of the cumulative run statistics.
    pub async fn snapshot(&self) -> FleetMetrics {
        let records = self.mgr.list().await;
        let mut metrics = FleetMetrics::from_records(&records);
        metrics.run_stats = self.stats.clone();
        metrics
    }

    /// Runs the supervisor loop until the process receives a shutdown signal
    /// (see `shutdown_signal`), publishing metrics into `handle`.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured sweep interval is zero.
    #[cfg(feature = "daemon")]
    pub async fn run(self, handle: http::MetricsHandle) -> anyhow::Result<()> {
        self.run_until(handle, shutdown_signal()).await
    }

    /// Runs the supervisor loop until `shutdown` resolves.
    ///
    /// An initial snapshot is written to `handle` before the loop starts; after
    /// that, every timer tick performs one sweep and then refreshes `handle`
    /// with a new snapshot.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured sweep interval is zero.
    #[cfg(feature = "daemon")]
    pub async fn run_until(
        mut self,
        handle: http::MetricsHandle,
        shutdown: impl Future<Output = ()>,
    ) -> anyhow::Result<()> {
        // `tokio::time::interval` panics when given a zero period; surface the
        // misconfiguration as an error instead of aborting the task.
        anyhow::ensure!(
            !self.cfg.interval.is_zero(),
            "supervisor interval must be greater than zero"
        );

        info!(
            interval_secs = self.cfg.interval.as_secs(),
            auto_resume = self.cfg.auto_resume,
            classify_idle = self.cfg.classify_idle,
            "supervisor loop starting"
        );
        if !self.cfg.auto_resume {
            warn!(
                "supervisor: auto-resume DISABLED ({}=1 to enable); running as observe-only",
                config::ENV_AUTO_RESUME
            );
        }

        let mut timer = tokio::time::interval(self.cfg.interval);
        // Publish a snapshot up front so readers of `handle` see data before
        // the first sweep has finished.
        *handle.write().await = self.snapshot().await;

        let mut shutdown = std::pin::pin!(shutdown);
        loop {
            tokio::select! {
                // `biased` polls branches in declaration order, so a pending
                // shutdown is always observed before starting another sweep.
                biased;
                () = &mut shutdown => {
                    info!(
                        sweeps = self.stats.sweeps,
                        auto_resumed = self.stats.auto_resumed,
                        "supervisor received shutdown signal; stopping cleanly"
                    );
                    return Ok(());
                }
                _ = timer.tick() => {
                    self.tick().await;
                    *handle.write().await = self.snapshot().await;
                }
            }
        }
    }
}
/// Resolves when the process is asked to stop: Ctrl-C on every platform, or
/// SIGTERM on Unix.
///
/// If a handler cannot be installed, the failure is logged and that branch
/// never resolves, so a registration error is not mistaken for a shutdown
/// request. The other branch can still trigger shutdown.
#[cfg(feature = "daemon")]
async fn shutdown_signal() {
    let ctrl_c = async {
        if let Err(e) = tokio::signal::ctrl_c().await {
            warn!("failed to listen for Ctrl-C: {e}");
            // No signal was actually received; stay pending instead of
            // completing and triggering an immediate shutdown.
            std::future::pending::<()>().await;
        }
    };

    #[cfg(unix)]
    let terminate = async {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
            Ok(mut sig) => {
                sig.recv().await;
            }
            Err(e) => {
                warn!("failed to install SIGTERM handler: {e}");
                // Same reasoning as above: a failed install is not a shutdown request.
                std::future::pending::<()>().await;
            }
        }
    };

    // Non-Unix targets have no SIGTERM; use a future that never completes.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        () = ctrl_c => {},
        () = terminate => {},
    }
}