// openlatch-provider 0.0.0
//
// Self-service onboarding CLI + runtime daemon for OpenLatch Editors and Providers.
//! Audit-log writer — one JSONL line per processed event.
//!
//! Hot-path cost: 0 ms. Per `.claude/rules/logging.md` we use an mpsc channel
//! sized at 1024 entries; the foreground request handler `try_send`s the
//! record and never awaits. A single background task drains the channel and
//! sync-writes to a daily-rotated file under
//! `~/.openlatch/provider/logs/runtime-YYYY-MM-DD.jsonl`.
//!
//! Channel-overflow policy: drop with `warn!`. Records here have already
//! cleared HMAC verification, so dropping a single line is safer than blocking
//! the proxy hot-path. (We could persist to a circular buffer instead — P3
//! decision if overflow becomes a real problem.)
//!
//! **Never logged:** the event payload, the verdict body, secrets. Only
//! metadata — see `Record` below.

use std::path::{Path, PathBuf};
use std::time::SystemTime;

use chrono::{DateTime, NaiveDate, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
use tracing::{error, warn};

use crate::error::{OlError, OL_4272_XDG_DIR_UNWRITABLE};

/// Default mpsc buffer capacity. Sized to absorb retry storms; sustained
/// throughput (≥ 500 events/sec) drains in well under 100 ms.
/// On overflow the foreground drops the record with a `warn!` — see
/// `AuditTx::append`.
pub const CHANNEL_CAPACITY: usize = 1024;

/// One row of the audit log. Public-shape so the runtime tests can assert on
/// it after parsing the JSONL line back.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct Record {
    pub timestamp: DateTime<Utc>,
    pub event_id: String,
    pub binding_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub verdict_hint: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub risk_score: Option<u8>,
    pub processing_ms: u64,
    pub tool_ms: u64,
    /// One of: `delivered` | `tool_unreachable` | `tool_5xx` | `oversize` |
    /// `timeout` | `replay_cache_hit` | `hmac_failed`.
    pub outcome: String,
}

impl Record {
    pub fn now(
        event_id: impl Into<String>,
        binding_id: impl Into<String>,
        outcome: impl Into<String>,
    ) -> Self {
        Self {
            timestamp: SystemTime::now().into(),
            event_id: event_id.into(),
            binding_id: binding_id.into(),
            verdict_hint: None,
            risk_score: None,
            processing_ms: 0,
            tool_ms: 0,
            outcome: outcome.into(),
        }
    }

    pub fn with_verdict(mut self, verdict_hint: Option<String>, risk_score: Option<u8>) -> Self {
        self.verdict_hint = verdict_hint;
        self.risk_score = risk_score;
        self
    }

    pub fn with_timings(mut self, processing_ms: u64, tool_ms: u64) -> Self {
        self.processing_ms = processing_ms;
        self.tool_ms = tool_ms;
        self
    }
}

/// Foreground handle. Cloneable; cheap to clone (Arc inside).
#[derive(Clone)]
pub struct AuditTx {
    // Bounded channel to the background writer task; see CHANNEL_CAPACITY.
    sender: mpsc::Sender<Record>,
}

impl AuditTx {
    /// Try to enqueue a record. Drops with `warn!` on overflow — never
    /// blocks the hot path.
    pub fn append(&self, record: Record) {
        match self.sender.try_send(record) {
            Ok(()) => {}
            Err(mpsc::error::TrySendError::Full(_)) => {
                warn!("audit log channel full — dropping 1 record");
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                // Background task already exited; suppress to avoid spam.
            }
        }
    }
}

/// Spawn the background writer task and return the foreground handle.
///
/// Creates `log_dir` if needed, prunes log files past `retention_days`
/// (best-effort), then spawns the drain loop. The task lives for the
/// lifetime of the runtime; dropping every `AuditTx` closes the channel,
/// which ends the loop and lets the task exit.
///
/// # Errors
/// Returns `OlError` with `OL_4272_XDG_DIR_UNWRITABLE` when the log
/// directory cannot be created.
pub fn spawn_writer(
    log_dir: PathBuf,
    retention_days: u64,
) -> Result<(AuditTx, tokio::task::JoinHandle<()>), OlError> {
    std::fs::create_dir_all(&log_dir).map_err(|e| {
        OlError::new(
            OL_4272_XDG_DIR_UNWRITABLE,
            format!("cannot create '{}': {e}", log_dir.display()),
        )
    })?;
    // Best-effort startup sweep; failures are swallowed inside cleanup_old.
    cleanup_old(&log_dir, retention_days);

    let (tx, mut rx) = mpsc::channel::<Record>(CHANNEL_CAPACITY);
    // Move `log_dir` straight into the task — it is not used again below, so
    // the previous clone was redundant.
    let handle = tokio::spawn(async move {
        while let Some(record) = rx.recv().await {
            if let Err(e) = append_to_file(&log_dir, &record) {
                // Keep draining: one failed write must not kill the task.
                error!(error = %e, "audit log write failed");
            }
        }
    });

    Ok((AuditTx { sender: tx }, handle))
}

/// Append `record` as one JSON line to the daily file chosen by the record's
/// own timestamp. The file is opened (created if absent) per call, so daily
/// rotation falls out of the date-keyed filename — there is no long-lived
/// handle to rotate.
fn append_to_file(dir: &Path, record: &Record) -> std::io::Result<()> {
    use std::io::Write;
    let date = record.timestamp.naive_utc().date();
    let path = log_path_for(dir, date);
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&path)?;
    // Propagate a serialization failure instead of panicking: a panic here
    // would kill the background writer task and silently stop the audit log.
    // serde_json::Error converts into io::Error via its From impl.
    let mut line = serde_json::to_string(record)?;
    line.push('\n');
    file.write_all(line.as_bytes())
}

/// Path of the daily log file for `date` under `dir`, e.g.
/// `runtime-2026-05-04.jsonl`. `NaiveDate`'s `Display` is exactly
/// `%Y-%m-%d`, so no explicit format string is needed.
pub fn log_path_for(dir: &Path, date: NaiveDate) -> PathBuf {
    let file_name = format!("runtime-{date}.jsonl");
    dir.join(file_name)
}

/// Delete log files older than `retention_days`, judged by the date embedded
/// in the filename (the OS's mtime would be subject to clock-skew on shared
/// mounts). Best-effort: unreadable directories and failed deletions are
/// silently ignored, and `retention_days == 0` disables the sweep entirely.
pub fn cleanup_old(dir: &Path, retention_days: u64) {
    if retention_days == 0 {
        return;
    }
    let cutoff = Utc::now().date_naive() - chrono::Duration::days(retention_days as i64);
    let entries = match std::fs::read_dir(dir) {
        Ok(iter) => iter,
        Err(_) => return,
    };
    for entry in entries.flatten() {
        // Only files shaped `runtime-YYYY-MM-DD.jsonl` participate; anything
        // else (including non-UTF-8 names) falls through the chain as None.
        let file_name = entry.file_name();
        let parsed_date = file_name
            .to_str()
            .and_then(|n| n.strip_prefix("runtime-"))
            .and_then(|n| n.strip_suffix(".jsonl"))
            .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok());
        if let Some(date) = parsed_date {
            if date < cutoff {
                let _ = std::fs::remove_file(entry.path());
            }
        }
    }
}

/// Shape the runtime's per-event metadata into a [`Record`] given a parsed
/// verdict. `outcome` and timings are filled in by the caller.
pub fn record_from_verdict(
    event_id: &str,
    binding_id: &str,
    parsed: Option<&Value>,
    outcome: &str,
    processing_ms: u64,
    tool_ms: u64,
) -> Record {
    // Both extractions tolerate a missing or oddly-shaped verdict: an absent
    // key or wrong JSON type simply yields None.
    let verdict_hint = parsed
        .and_then(|v| v.get("verdictHint"))
        .and_then(Value::as_str)
        .map(String::from);
    // Out-of-range scores are clamped into u8 rather than discarded.
    let risk_score = parsed
        .and_then(|v| v.get("riskScore"))
        .and_then(Value::as_u64)
        .map(|n| n.min(255) as u8);

    Record::now(event_id, binding_id, outcome)
        .with_verdict(verdict_hint, risk_score)
        .with_timings(processing_ms, tool_ms)
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn record_serializes_with_iso_timestamp_and_no_extra_fields() {
        let r = Record::now("evt_x", "bnd_y", "delivered")
            .with_verdict(Some("allow".into()), Some(12))
            .with_timings(47, 42);
        let s = serde_json::to_string(&r).unwrap();
        // The timestamp must be present as a string (chrono serializes
        // DateTime<Utc> as an RFC 3339 string, not a number).
        assert!(s.contains("\"timestamp\":\""), "timestamp must be a string");
        assert!(s.contains("\"event_id\":\"evt_x\""));
        assert!(s.contains("\"binding_id\":\"bnd_y\""));
        assert!(s.contains("\"verdict_hint\":\"allow\""));
        assert!(s.contains("\"risk_score\":12"));
        assert!(s.contains("\"outcome\":\"delivered\""));

        // `None` optionals must be omitted entirely (skip_serializing_if),
        // never emitted as JSON null — this is the "no extra fields" claim.
        let bare = serde_json::to_string(&Record::now("e", "b", "timeout")).unwrap();
        assert!(!bare.contains("verdict_hint"));
        assert!(!bare.contains("risk_score"));
        assert!(!bare.contains("null"));
    }

    #[test]
    fn log_path_uses_date_format() {
        let p = log_path_for(
            Path::new("/tmp/x"),
            NaiveDate::from_ymd_opt(2026, 5, 4).unwrap(),
        );
        assert!(p.to_string_lossy().ends_with("runtime-2026-05-04.jsonl"));
    }

    #[test]
    fn cleanup_old_removes_dated_files_past_cutoff() {
        let dir = TempDir::new().unwrap();
        let old = dir.path().join("runtime-2020-01-01.jsonl");
        let recent = dir
            .path()
            .join(format!("runtime-{}.jsonl", Utc::now().date_naive()));
        std::fs::write(&old, b"x").unwrap();
        std::fs::write(&recent, b"x").unwrap();
        cleanup_old(dir.path(), 14);
        assert!(!old.exists(), "old file must be deleted");
        assert!(recent.exists(), "today's file must survive");
    }

    #[tokio::test]
    async fn writer_appends_one_line_per_record() {
        let dir = TempDir::new().unwrap();
        let (tx, handle) = spawn_writer(dir.path().to_path_buf(), 14).unwrap();
        tx.append(Record::now("evt_a", "bnd_a", "delivered"));
        tx.append(Record::now("evt_b", "bnd_a", "delivered"));
        // Dropping the only sender closes the channel; the task drains & exits.
        drop(tx);
        handle.await.unwrap();

        let path = log_path_for(dir.path(), Utc::now().date_naive());
        let raw = std::fs::read_to_string(&path).expect("log file written");
        let lines: Vec<&str> = raw.lines().collect();
        assert_eq!(lines.len(), 2, "two records → two lines");
        let first: Record = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first.event_id, "evt_a");
    }
}