harn-vm 0.8.6

Async bytecode virtual machine for the Harn programming language
Documentation
use std::collections::BTreeMap;
use std::path::Path;

use serde::{Deserialize, Serialize};

use crate::event_log::EventLog;
use crate::value::{VmError, VmValue};

/// Daemon-loop options, normally produced by `parse_daemon_loop_config` from a
/// script-supplied options map.
///
/// `#[serde(default)]` lets a partially populated JSON object deserialize, with every
/// missing field taking its `Default` value. serde serializes fields in declaration
/// order, so keep the field order stable.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub(crate) struct DaemonLoopConfig {
    /// Snapshot path to write to. `effective_persist_path` falls back to `resume_path`
    /// when this is `None`. The parser drops empty strings.
    pub persist_path: Option<String>,
    /// Path supplied under the `resume_path` option. The parser drops empty strings.
    /// NOTE(review): presumably the snapshot a previous run is resumed from — confirm
    /// against the loop driver.
    pub resume_path: Option<String>,
    /// Wake interval in milliseconds. The parser treats zero as unset.
    pub wake_interval_ms: Option<u64>,
    /// Paths supplied under `watch_paths`, with empty entries removed by the parser.
    /// NOTE(review): presumably fed to `watch_state` for mtime polling — confirm.
    pub watch_paths: Vec<String>,
    /// The parser sets this only when the option is exactly the boolean `true`.
    pub consolidate_on_idle: bool,
    /// Non-negative attempt count from `idle_watchdog_attempts`. How the watchdog uses
    /// it is defined outside this file.
    pub idle_watchdog_attempts: Option<usize>,
}

impl DaemonLoopConfig {
    /// Returns the path snapshots should be written to.
    ///
    /// An explicit `persist_path` wins. Otherwise the `resume_path` is reused, so a
    /// resumed daemon keeps writing to the file it was loaded from. Returns `None`
    /// when neither is configured.
    pub(crate) fn effective_persist_path(&self) -> Option<&str> {
        match (&self.persist_path, &self.resume_path) {
            (Some(persist), _) => Some(persist.as_str()),
            (None, resume) => resume.as_deref(),
        }
    }
}

/// Serializable checkpoint of a daemon loop.
///
/// It is written by `persist_snapshot` and read back by `load_snapshot`.
/// `#[serde(default)]` lets partial snapshot files deserialize. Empty `_type`,
/// `saved_at` and `daemon_state` are then filled in by `normalize`. serde serializes
/// fields in declaration order, which is also the key order in the JSON file, so keep
/// the order stable.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub(crate) struct DaemonSnapshot {
    /// Stored under the JSON key `_type`; `normalize` defaults it to "daemon_snapshot".
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Timestamp string; `normalize` fills it from
    /// `crate::orchestration::now_rfc3339()` when empty.
    pub saved_at: String,
    /// State label; `normalize` defaults it to "active".
    pub daemon_state: String,
    // NOTE(review): the fields below are opaque to this file. Their meanings are
    // inferred from names only — confirm against the code that fills them in.
    /// Raw JSON values, presumably the messages currently visible to the model.
    pub visible_messages: Vec<serde_json::Value>,
    /// Raw JSON values, presumably the full recorded message history.
    pub recorded_messages: Vec<serde_json::Value>,
    /// Optional summary text, presumably of a compacted transcript.
    pub transcript_summary: Option<String>,
    /// Raw JSON transcript events.
    pub transcript_events: Vec<serde_json::Value>,
    /// Accumulated output text, presumably across all iterations.
    pub total_text: String,
    /// Output text, presumably from the most recent iteration only.
    pub last_iteration_text: String,
    /// Names of tools used, presumably across the whole run.
    pub all_tools_used: Vec<String>,
    /// Names of tools that were rejected.
    pub rejected_tools: Vec<String>,
    /// User messages queued for later delivery.
    pub deferred_user_messages: Vec<String>,
    /// Iteration counter.
    pub total_iterations: usize,
    /// Idle backoff in milliseconds (unit taken from the field name).
    pub idle_backoff_ms: u64,
    /// Exit code of the last run, if any.
    pub last_run_exit_code: Option<i32>,
    /// Map from path to mtime value, the same shape `watch_state` returns.
    /// Presumably its last observed output — TODO confirm.
    pub watch_state: BTreeMap<String, u64>,
}

impl DaemonSnapshot {
    /// Returns the snapshot with empty bookkeeping fields replaced by defaults.
    ///
    /// - empty `type_name` becomes "daemon_snapshot"
    /// - empty `saved_at` becomes `crate::orchestration::now_rfc3339()`; the clock is
    ///   read only in this case
    /// - empty `daemon_state` becomes "active"
    ///
    /// Non-empty values and all other fields are passed through unchanged.
    pub(crate) fn normalize(self) -> Self {
        let type_name = if self.type_name.is_empty() {
            String::from("daemon_snapshot")
        } else {
            self.type_name
        };
        let saved_at = if self.saved_at.is_empty() {
            crate::orchestration::now_rfc3339()
        } else {
            self.saved_at
        };
        let daemon_state = if self.daemon_state.is_empty() {
            String::from("active")
        } else {
            self.daemon_state
        };
        Self {
            type_name,
            saved_at,
            daemon_state,
            ..self
        }
    }
}

pub(crate) fn load_snapshot(path: &str) -> Result<DaemonSnapshot, VmError> {
    let content = std::fs::read_to_string(path)
        .map_err(|error| VmError::Runtime(format!("daemon snapshot read error: {error}")))?;
    let snapshot: DaemonSnapshot = serde_json::from_str(&content)
        .map_err(|error| VmError::Runtime(format!("daemon snapshot parse error: {error}")))?;
    let snapshot = snapshot.normalize();
    append_daemon_state_event(path, "snapshot_loaded", &snapshot);
    Ok(snapshot)
}

/// Writes `snapshot` to `path` as pretty-printed JSON and returns `path`.
///
/// Steps:
/// 1. Create missing parent directories.
/// 2. Normalize the snapshot once.
/// 3. Write the JSON to a sibling temporary file (`path` with its extension replaced
///    by `json.tmp`).
/// 4. Rename the temporary file over `path`.
///
/// This is the usual write-then-rename pattern: readers see either the old file or
/// the complete new one, as long as the platform's `rename` replaces the target
/// atomically. After a successful write, a best-effort "snapshot_persisted" state
/// event is emitted with the same normalized snapshot that was written to disk.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the directory cannot be created, or when the
/// snapshot cannot be encoded, written, or renamed into place. The temporary file is
/// removed (best-effort) if the write or the rename fails.
pub(crate) fn persist_snapshot(path: &str, snapshot: &DaemonSnapshot) -> Result<String, VmError> {
    let path_buf = Path::new(path);
    if let Some(parent) = path_buf.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|error| VmError::Runtime(format!("daemon snapshot mkdir error: {error}")))?;
    }
    // Normalize exactly once. `normalize` reads the clock when `saved_at` is empty, so
    // normalizing twice could give the file and the event different timestamps.
    let normalized = snapshot.clone().normalize();
    let json = serde_json::to_string_pretty(&normalized)
        .map_err(|error| VmError::Runtime(format!("daemon snapshot encode error: {error}")))?;
    let tmp = path_buf.with_extension("json.tmp");
    if let Err(error) = std::fs::write(&tmp, json) {
        // A partial write may have left a truncated temp file behind; don't leak it.
        let _ = std::fs::remove_file(&tmp);
        return Err(VmError::Runtime(format!(
            "daemon snapshot write error: {error}"
        )));
    }
    if let Err(error) = std::fs::rename(&tmp, path_buf) {
        let _ = std::fs::remove_file(&tmp);
        return Err(VmError::Runtime(format!(
            "daemon snapshot finalize error: {error}"
        )));
    }
    append_daemon_state_event(path, "snapshot_persisted", &normalized);
    Ok(path.to_string())
}

/// Builds a [`DaemonLoopConfig`] from the script-supplied options map.
///
/// Recognized keys:
/// - `persist_path`, `resume_path`: kept when their display form is non-empty.
/// - `wake_interval_ms`: kept when `as_int` yields a strictly positive value.
///   Negative values are rejected instead of wrapping to a huge `u64`.
/// - `watch_paths`: a list (each item's display form, empty entries dropped) or a
///   single value (kept when non-empty).
/// - `consolidate_on_idle`: enabled only by the boolean `true`.
/// - `idle_watchdog_attempts`: kept when `as_int` yields a non-negative value that
///   fits in `usize`.
///
/// Unrecognized keys are ignored. `None` yields `DaemonLoopConfig::default()`.
pub(crate) fn parse_daemon_loop_config(
    options: Option<&BTreeMap<String, VmValue>>,
) -> DaemonLoopConfig {
    let Some(options) = options else {
        return DaemonLoopConfig::default();
    };
    let watch_paths = match options.get("watch_paths") {
        Some(VmValue::List(items)) => items
            .iter()
            .map(VmValue::display)
            .filter(|path| !path.is_empty())
            .collect(),
        Some(VmValue::String(path)) if !path.is_empty() => vec![path.to_string()],
        Some(value) => {
            let path = value.display();
            if path.is_empty() {
                Vec::new()
            } else {
                vec![path]
            }
        }
        None => Vec::new(),
    };
    DaemonLoopConfig {
        persist_path: options
            .get("persist_path")
            .map(VmValue::display)
            .filter(|value| !value.is_empty()),
        resume_path: options
            .get("resume_path")
            .map(VmValue::display)
            .filter(|value| !value.is_empty()),
        wake_interval_ms: options
            .get("wake_interval_ms")
            .and_then(VmValue::as_int)
            // `as u64` would turn e.g. -1 into u64::MAX, which the `> 0` filter keeps.
            // Use a checked conversion so negatives are treated as unset.
            .and_then(|value| u64::try_from(value).ok())
            .filter(|value| *value > 0),
        watch_paths,
        consolidate_on_idle: options
            .get("consolidate_on_idle")
            .is_some_and(|value| matches!(value, VmValue::Bool(true))),
        idle_watchdog_attempts: options
            .get("idle_watchdog_attempts")
            .and_then(VmValue::as_int)
            .and_then(|value| usize::try_from(value).ok()),
    }
}

/// Source of file modification times, abstracted so callers can substitute a fake.
pub(crate) trait MtimeProvider {
    /// Modification time of `path` in nanoseconds since the Unix epoch.
    /// Implementations in this file return 0 when the time is unavailable.
    fn mtime_ns(&self, path: &str) -> u64;
}

/// [`MtimeProvider`] backed by the real filesystem via `std::fs::metadata`.
pub(crate) struct RealMtimeProvider;

impl MtimeProvider for RealMtimeProvider {
    /// Reads the modification time of `path`.
    ///
    /// Returns 0 in any of these cases: the metadata lookup fails (e.g. the path is
    /// missing), the platform has no mtime, the time is before the Unix epoch, or the
    /// nanosecond count does not fit in a `u64`.
    fn mtime_ns(&self, path: &str) -> u64 {
        let Ok(meta) = std::fs::metadata(path) else {
            return 0;
        };
        let Ok(modified_at) = meta.modified() else {
            return 0;
        };
        let Ok(since_epoch) = modified_at.duration_since(std::time::UNIX_EPOCH) else {
            return 0;
        };
        u64::try_from(since_epoch.as_nanos()).unwrap_or(0)
    }
}

/// Records `provider.mtime_ns` for every path in `paths`, keyed by path.
///
/// Paths are queried in input order. If a path appears more than once, the value from
/// its last occurrence is kept.
pub(crate) fn watch_state(provider: &dyn MtimeProvider, paths: &[String]) -> BTreeMap<String, u64> {
    let mut observed = BTreeMap::new();
    for watched in paths {
        let stamp = provider.mtime_ns(watched);
        observed.insert(watched.clone(), stamp);
    }
    observed
}

/// Best-effort: appends a `kind` event carrying `snapshot` to the active event log.
///
/// The topic is `daemon.{stem}.state`, where `stem` is the sanitized file stem of
/// `path`, or "snapshot" when the stem is missing or not UTF-8. The event carries a
/// `path` header, and its payload is the snapshot as JSON (`null` if encoding fails).
///
/// Returns silently when no event log is active or the topic is rejected. Inside a
/// Tokio runtime the append runs on a detached task. Otherwise it is driven to
/// completion with `futures::executor::block_on`. Append errors are ignored in both
/// cases.
/// NOTE(review): if `append` needs a Tokio reactor, the `block_on` path may fail
/// outside a runtime — confirm.
fn append_daemon_state_event(path: &str, kind: &str, snapshot: &DaemonSnapshot) {
    let Some(log) = crate::event_log::active_event_log() else {
        return;
    };
    let stem = match Path::new(path).file_stem().and_then(|raw| raw.to_str()) {
        Some(raw_stem) => crate::event_log::sanitize_topic_component(raw_stem),
        None => "snapshot".to_string(),
    };
    let Ok(topic) = crate::event_log::Topic::new(format!("daemon.{stem}.state")) else {
        return;
    };
    let headers = BTreeMap::from([("path".to_string(), path.to_string())]);
    let payload = serde_json::to_value(snapshot).unwrap_or(serde_json::Value::Null);
    let event = crate::event_log::LogEvent::new(kind, payload).with_headers(headers);
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => {
            handle.spawn(async move {
                let _ = log.append(&topic, event).await;
            });
        }
        Err(_) => {
            let _ = futures::executor::block_on(log.append(&topic, event));
        }
    }
}