// openlatch-client 0.1.6
// The open-source security layer for AI agents — client forwarder

//! JSONL event audit logger with daily rotation and retention cleanup.
//!
//! This is a leaf module — it has zero imports from other modules in this crate.
//! The writer task runs in its own tokio task and is the ONLY writer for log files,
//! preventing concurrent write corruption (architecture rule: single writer per file).
pub mod daemon_log;
pub mod tamper_log;

use std::io::Write;
use std::path::{Path, PathBuf};

use tokio::sync::mpsc;

// ---------------------------------------------------------------------------
// Log file naming (LOG-01, LOG-02)
// ---------------------------------------------------------------------------

/// Returns the log file name for the given date.
///
/// The name has the form `events-YYYY-MM-DD.jsonl`, e.g. `events-2026-04-07.jsonl`
/// for April 7, 2026.
pub fn log_file_name(date: &chrono::NaiveDate) -> String {
    let stamp = date.format("%Y-%m-%d");
    format!("events-{stamp}.jsonl")
}

/// Returns the full path for today's event log file (UTC date).
pub fn current_log_path(log_dir: &Path) -> PathBuf {
    // The date is taken in UTC (not host-local time), so rotation boundaries do
    // not depend on the machine's timezone.
    let utc_today = chrono::Utc::now().date_naive();
    log_dir.join(log_file_name(&utc_today))
}

// ---------------------------------------------------------------------------
// Synchronous append helper (used by writer task only)
// ---------------------------------------------------------------------------

/// Append a single JSON event as one line to the log file.
///
/// The file is opened in append mode and created on first use.
///
/// # Errors
///
/// Returns `std::io::Error` if serialization fails or the file cannot be
/// opened or written.
pub fn append_event_sync(path: &Path, event: &serde_json::Value) -> std::io::Result<()> {
    match serde_json::to_string(event) {
        Ok(serialized) => append_line_sync(path, &serialized),
        // Serialization failure is surfaced as an I/O error so callers deal
        // with a single error type.
        Err(e) => Err(std::io::Error::other(e)),
    }
}

/// Append a pre-serialized JSON line to the log file.
///
/// Opens the file in append mode (creating it on first use) and writes the
/// line followed by a single `\n`.
fn append_line_sync(path: &Path, line: &str) -> std::io::Result<()> {
    let mut file = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(path)?;
    file.write_all(line.as_bytes())?;
    file.write_all(b"\n")
}

// ---------------------------------------------------------------------------
// EventLogger with mpsc channel (PERFORMANCE: non-blocking on hot path)
// ---------------------------------------------------------------------------

/// Sends pre-serialized event JSON lines to the background writer task via an mpsc channel.
///
/// `log()` is non-blocking: if the channel is full (capacity 1024), the event
/// is dropped with a warning rather than blocking the verdict path.
///
/// # Architecture
///
/// `EventLogger` is the sender side; `EventLoggerHandle` owns the background task.
/// The caller must drop `EventLogger` before calling `EventLoggerHandle::shutdown()`
/// so the writer task sees the channel close and exits cleanly.
///
/// Events are accepted as pre-serialized JSON strings to avoid double serialization
/// (struct → Value → String). The caller serializes once; the writer appends as-is.
///
/// `Clone` is derived so multiple tasks can each hold a sender to the same
/// single writer task.
#[derive(Clone)]
pub struct EventLogger {
    // Sender half of the bounded channel; the receiver lives in `writer_task`.
    tx: mpsc::Sender<String>,
}

/// Owns the background writer task. MUST be held until shutdown.
///
/// Per tokio `JoinHandle` semantics, dropping this handle without calling
/// `shutdown()` detaches the writer task rather than stopping it.
pub struct EventLoggerHandle {
    // Handle for the spawned `writer_task`; awaited in `shutdown()`.
    join_handle: tokio::task::JoinHandle<()>,
}

impl EventLogger {
    /// Create a new `EventLogger` and its background writer task.
    ///
    /// Returns the logger (sender) and the handle (task owner) as a pair.
    /// The caller must hold the `EventLoggerHandle` until the daemon shuts down.
    pub fn new(log_dir: PathBuf) -> (Self, EventLoggerHandle) {
        let (tx, rx) = mpsc::channel(1024);
        let handle = EventLoggerHandle {
            join_handle: tokio::spawn(writer_task(log_dir, rx)),
        };
        (Self { tx }, handle)
    }

    /// Send a pre-serialized JSON line to the background writer (non-blocking).
    ///
    /// Uses `try_send` so the hot path never awaits channel capacity.
    /// If the channel is full or closed, the event is dropped with a warning.
    pub fn log(&self, event_json: String) {
        // PERFORMANCE: try_send is synchronous — never blocks the verdict path
        if self.tx.try_send(event_json).is_err() {
            tracing::warn!("event log channel full or closed, event dropped");
        }
    }
}

impl EventLoggerHandle {
    /// Wait for the background writer to drain and exit.
    ///
    /// The caller must drop (or let go of) the `EventLogger` sender BEFORE calling
    /// this method, otherwise the writer task will block waiting for more events.
    pub async fn shutdown(self) {
        let _ = self.join_handle.await;
    }
}

/// Background task that drains the mpsc channel and appends pre-serialized JSON lines to disk.
///
/// This is the ONLY writer for event log files — single-writer design prevents
/// file corruption without requiring a mutex.
async fn writer_task(log_dir: PathBuf, mut rx: mpsc::Receiver<String>) {
    // The directory must exist before the first append; bail out entirely if it
    // cannot be created, since every subsequent write would fail anyway.
    if let Err(e) = tokio::fs::create_dir_all(&log_dir).await {
        tracing::error!(error = %e, "failed to create log directory");
        return;
    }
    // Drain until every sender has been dropped, then exit.
    loop {
        let Some(json_line) = rx.recv().await else {
            break;
        };
        // Recompute the target path per event so writes roll over to a new
        // file when the UTC date changes.
        let target = current_log_path(&log_dir);
        // NOTE(review): append_line_sync does blocking file I/O on the runtime
        // thread; fine for small line writes — consider spawn_blocking if
        // events grow large.
        if let Err(e) = append_line_sync(&target, &json_line) {
            // Fail-open: log warning but keep processing subsequent events.
            tracing::warn!(error = %e, "failed to append event to log");
        }
    }
    tracing::debug!("event logger writer task shutting down");
}

// ---------------------------------------------------------------------------
// Retention cleanup (LOG-03)
// ---------------------------------------------------------------------------

/// Delete log files older than `retention_days` from `log_dir`.
///
/// Only files matching the pattern `events-YYYY-MM-DD.jsonl` are considered.
/// Files with unrecognized names are left untouched.
///
/// Returns the number of files deleted.
///
/// # Errors
///
/// Returns `std::io::Error` if the directory cannot be read or a file cannot be deleted.
pub fn cleanup_old_logs(log_dir: &Path, retention_days: u32) -> std::io::Result<u32> {
    // Any log file dated strictly before the cutoff is eligible for deletion.
    let cutoff =
        chrono::Utc::now().date_naive() - chrono::Duration::days(i64::from(retention_days));
    let mut deleted = 0u32;

    for entry in std::fs::read_dir(log_dir)? {
        let entry = entry?;
        let name = entry.file_name();
        let name_str = name.to_string_lossy();
        // Guard 1: name must look like `events-<middle>.jsonl`.
        let Some(date_str) = name_str
            .strip_prefix("events-")
            .and_then(|rest| rest.strip_suffix(".jsonl"))
        else {
            continue;
        };
        // Guard 2: the middle part must parse as a real YYYY-MM-DD date.
        let Ok(file_date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") else {
            continue;
        };
        if file_date < cutoff {
            std::fs::remove_file(entry.path())?;
            deleted += 1;
        }
    }
    Ok(deleted)
}

// ---------------------------------------------------------------------------
// Unit tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    // Covers: file naming (LOG-01/02), synchronous append invariants,
    // retention cleanup (LOG-03), and the EventLogger -> writer_task
    // round trip over the mpsc channel.
    use super::*;
    use serde_json::json;
    use std::io::BufRead;
    use tempfile::TempDir;

    // Test-only helper: build a NaiveDate from components, panicking on
    // invalid input.
    fn make_date(year: i32, month: u32, day: u32) -> chrono::NaiveDate {
        chrono::NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    // Test 1: log_file_name() for 2026-04-07 returns "events-2026-04-07.jsonl"
    #[test]
    fn test_log_file_name_format() {
        let date = make_date(2026, 4, 7);
        assert_eq!(log_file_name(&date), "events-2026-04-07.jsonl");
    }

    // Test 2: append_event writes a valid JSON line to the file (parseable by serde_json)
    #[test]
    fn test_append_event_sync_writes_valid_json() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event = json!({"tool": "bash", "session": "abc123"});

        append_event_sync(&path, &event).unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let line = content.lines().next().unwrap();
        // Must be parseable as JSON
        let parsed: serde_json::Value = serde_json::from_str(line).unwrap();
        assert_eq!(parsed["tool"], "bash");
    }

    // Test 3: append_event writes exactly one line (ends with \n, no embedded newlines)
    #[test]
    fn test_append_event_sync_single_line() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event = json!({"key": "value"});

        append_event_sync(&path, &event).unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        // Exactly one line (content ends with \n and has no embedded newlines in the JSON)
        assert!(content.ends_with('\n'), "file must end with newline");
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 1, "must produce exactly one line");
    }

    // Test 4: Two calls to append_event produce two lines in the file
    #[test]
    fn test_append_event_sync_two_calls_two_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event1 = json!({"seq": 1});
        let event2 = json!({"seq": 2});

        append_event_sync(&path, &event1).unwrap();
        append_event_sync(&path, &event2).unwrap();

        let file = std::fs::File::open(&path).unwrap();
        let lines: Vec<_> = std::io::BufReader::new(file)
            .lines()
            .collect::<Result<_, _>>()
            .unwrap();
        assert_eq!(lines.len(), 2, "must produce exactly two lines");

        let parsed1: serde_json::Value = serde_json::from_str(&lines[0]).unwrap();
        let parsed2: serde_json::Value = serde_json::from_str(&lines[1]).unwrap();
        assert_eq!(parsed1["seq"], 1);
        assert_eq!(parsed2["seq"], 2);
    }

    // Test 5: cleanup_old_logs with retention_days=0 deletes all events-*.jsonl files
    #[test]
    fn test_cleanup_old_logs_retention_zero_deletes_all() {
        let dir = TempDir::new().unwrap();
        // Create files for yesterday and two days ago (both older than today when retention=0)
        // NOTE(review): "now" is sampled here and again inside cleanup_old_logs;
        // a UTC midnight rollover between the two samples could shift the cutoff
        // and flake this test. Extremely unlikely, but worth knowing.
        let yesterday = chrono::Utc::now().date_naive() - chrono::Duration::days(1);
        let two_days_ago = chrono::Utc::now().date_naive() - chrono::Duration::days(2);

        std::fs::write(dir.path().join(log_file_name(&yesterday)), b"line\n").unwrap();
        std::fs::write(dir.path().join(log_file_name(&two_days_ago)), b"line\n").unwrap();

        let deleted = cleanup_old_logs(dir.path(), 0).unwrap();
        assert_eq!(deleted, 2, "retention_days=0 must delete all past files");

        // Directory should now be empty
        let remaining: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert!(remaining.is_empty(), "no files should remain");
    }

    // Test 6: cleanup_old_logs with retention_days=30 keeps files newer than 30 days
    #[test]
    fn test_cleanup_old_logs_retention_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let recent = chrono::Utc::now().date_naive() - chrono::Duration::days(5);
        let old = chrono::Utc::now().date_naive() - chrono::Duration::days(40);

        let recent_file = dir.path().join(log_file_name(&recent));
        let old_file = dir.path().join(log_file_name(&old));

        std::fs::write(&recent_file, b"line\n").unwrap();
        std::fs::write(&old_file, b"line\n").unwrap();

        let deleted = cleanup_old_logs(dir.path(), 30).unwrap();
        assert_eq!(deleted, 1, "only the old file should be deleted");
        assert!(recent_file.exists(), "recent file must not be deleted");
        assert!(!old_file.exists(), "old file must be deleted");
    }

    // Test 7: EventLogger sends events via mpsc channel and writer task appends them
    #[tokio::test]
    async fn test_event_logger_sends_and_appends() {
        let dir = TempDir::new().unwrap();
        let log_dir = dir.path().to_path_buf();

        let (logger, handle) = EventLogger::new(log_dir.clone());

        let event = json!({"tool": "write_file", "session": "sess_001"});
        let event_str = serde_json::to_string(&event).unwrap();
        logger.log(event_str);

        // Drop the logger to close the sender channel, then wait for writer to drain.
        drop(logger);
        handle.shutdown().await;

        let today = chrono::Utc::now().date_naive();
        let log_path = log_dir.join(log_file_name(&today));

        assert!(log_path.exists(), "log file must be created");
        let content = std::fs::read_to_string(&log_path).unwrap();
        let parsed: serde_json::Value =
            serde_json::from_str(content.lines().next().unwrap()).unwrap();
        assert_eq!(parsed["tool"], "write_file");
    }
}