use std::collections::BTreeMap;
use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::event_log::EventLog;
use crate::value::{VmError, VmValue};
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub(crate) struct DaemonLoopConfig {
pub persist_path: Option<String>,
pub resume_path: Option<String>,
pub wake_interval_ms: Option<u64>,
pub watch_paths: Vec<String>,
pub consolidate_on_idle: bool,
pub idle_watchdog_attempts: Option<usize>,
}
impl DaemonLoopConfig {
pub(crate) fn effective_persist_path(&self) -> Option<&str> {
self.persist_path.as_deref().or(self.resume_path.as_deref())
}
}
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub(crate) struct DaemonSnapshot {
#[serde(rename = "_type")]
pub type_name: String,
pub saved_at: String,
pub daemon_state: String,
pub visible_messages: Vec<serde_json::Value>,
pub recorded_messages: Vec<serde_json::Value>,
pub transcript_summary: Option<String>,
pub transcript_events: Vec<serde_json::Value>,
pub total_text: String,
pub last_iteration_text: String,
pub all_tools_used: Vec<String>,
pub rejected_tools: Vec<String>,
pub deferred_user_messages: Vec<String>,
pub total_iterations: usize,
pub idle_backoff_ms: u64,
pub last_run_exit_code: Option<i32>,
pub watch_state: BTreeMap<String, u64>,
}
impl DaemonSnapshot {
pub(crate) fn normalize(mut self) -> Self {
if self.type_name.is_empty() {
self.type_name = "daemon_snapshot".to_string();
}
if self.saved_at.is_empty() {
self.saved_at = crate::orchestration::now_rfc3339();
}
if self.daemon_state.is_empty() {
self.daemon_state = "active".to_string();
}
self
}
}
pub(crate) fn load_snapshot(path: &str) -> Result<DaemonSnapshot, VmError> {
let content = std::fs::read_to_string(path)
.map_err(|error| VmError::Runtime(format!("daemon snapshot read error: {error}")))?;
let snapshot: DaemonSnapshot = serde_json::from_str(&content)
.map_err(|error| VmError::Runtime(format!("daemon snapshot parse error: {error}")))?;
let snapshot = snapshot.normalize();
append_daemon_state_event(path, "snapshot_loaded", &snapshot);
Ok(snapshot)
}
pub(crate) fn persist_snapshot(path: &str, snapshot: &DaemonSnapshot) -> Result<String, VmError> {
let path_buf = Path::new(path);
if let Some(parent) = path_buf.parent() {
std::fs::create_dir_all(parent)
.map_err(|error| VmError::Runtime(format!("daemon snapshot mkdir error: {error}")))?;
}
let json = serde_json::to_string_pretty(&snapshot.clone().normalize())
.map_err(|error| VmError::Runtime(format!("daemon snapshot encode error: {error}")))?;
let tmp = path_buf.with_extension("json.tmp");
std::fs::write(&tmp, json)
.map_err(|error| VmError::Runtime(format!("daemon snapshot write error: {error}")))?;
std::fs::rename(&tmp, path_buf)
.map_err(|error| VmError::Runtime(format!("daemon snapshot finalize error: {error}")))?;
append_daemon_state_event(path, "snapshot_persisted", &snapshot.clone().normalize());
Ok(path.to_string())
}
pub(crate) fn parse_daemon_loop_config(
options: Option<&BTreeMap<String, VmValue>>,
) -> DaemonLoopConfig {
let Some(options) = options else {
return DaemonLoopConfig::default();
};
let watch_paths = match options.get("watch_paths") {
Some(VmValue::List(items)) => items
.iter()
.map(VmValue::display)
.filter(|path| !path.is_empty())
.collect(),
Some(VmValue::String(path)) if !path.is_empty() => vec![path.to_string()],
Some(value) => {
let path = value.display();
if path.is_empty() {
Vec::new()
} else {
vec![path]
}
}
None => Vec::new(),
};
DaemonLoopConfig {
persist_path: options
.get("persist_path")
.map(VmValue::display)
.filter(|value| !value.is_empty()),
resume_path: options
.get("resume_path")
.map(VmValue::display)
.filter(|value| !value.is_empty()),
wake_interval_ms: options
.get("wake_interval_ms")
.and_then(VmValue::as_int)
.map(|value| value as u64)
.filter(|value| *value > 0),
watch_paths,
consolidate_on_idle: options
.get("consolidate_on_idle")
.is_some_and(|value| matches!(value, VmValue::Bool(true))),
idle_watchdog_attempts: options
.get("idle_watchdog_attempts")
.and_then(VmValue::as_int)
.and_then(|value| usize::try_from(value).ok()),
}
}
pub(crate) trait MtimeProvider {
fn mtime_ns(&self, path: &str) -> u64;
}
pub(crate) struct RealMtimeProvider;
impl MtimeProvider for RealMtimeProvider {
fn mtime_ns(&self, path: &str) -> u64 {
std::fs::metadata(path)
.ok()
.and_then(|metadata| metadata.modified().ok())
.and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
.and_then(|duration| u64::try_from(duration.as_nanos()).ok())
.unwrap_or(0)
}
}
pub(crate) fn watch_state(provider: &dyn MtimeProvider, paths: &[String]) -> BTreeMap<String, u64> {
paths
.iter()
.map(|path| (path.clone(), provider.mtime_ns(path)))
.collect()
}
fn append_daemon_state_event(path: &str, kind: &str, snapshot: &DaemonSnapshot) {
let Some(log) = crate::event_log::active_event_log() else {
return;
};
let stem = Path::new(path)
.file_stem()
.and_then(|value| value.to_str())
.map(crate::event_log::sanitize_topic_component)
.unwrap_or_else(|| "snapshot".to_string());
let Ok(topic) = crate::event_log::Topic::new(format!("daemon.{stem}.state")) else {
return;
};
let mut headers = BTreeMap::new();
headers.insert("path".to_string(), path.to_string());
let payload = serde_json::to_value(snapshot).unwrap_or(serde_json::Value::Null);
let event = crate::event_log::LogEvent::new(kind, payload).with_headers(headers);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
let _ = log.append(&topic, event).await;
});
} else {
let _ = futures::executor::block_on(log.append(&topic, event));
}
}