// openlatch_client/core/logging/mod.rs

1/// JSONL event audit logger with daily rotation and retention cleanup.
2///
3/// This is a leaf module — it has zero imports from other modules in this crate.
4/// The writer task runs in its own tokio task and is the ONLY writer for log files,
5/// preventing concurrent write corruption (architecture rule: single writer per file).
6pub mod daemon_log;
7
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
11use tokio::sync::mpsc;
12
13// ---------------------------------------------------------------------------
14// Log file naming (LOG-01, LOG-02)
15// ---------------------------------------------------------------------------
16
17/// Returns the log file name for the given date.
18///
19/// # Examples
20///
21/// ```
22/// use chrono::NaiveDate;
23/// // Returns "events-2026-04-07.jsonl"
24/// ```
25pub fn log_file_name(date: &chrono::NaiveDate) -> String {
26    format!("events-{}.jsonl", date.format("%Y-%m-%d"))
27}
28
29/// Returns the full path for today's event log file (UTC date).
30pub fn current_log_path(log_dir: &Path) -> PathBuf {
31    let today = chrono::Utc::now().date_naive();
32    log_dir.join(log_file_name(&today))
33}
34
35// ---------------------------------------------------------------------------
36// Synchronous append helper (used by writer task only)
37// ---------------------------------------------------------------------------
38
39/// Append a single JSON event as one line to the log file.
40///
41/// Creates the file if it does not exist (append mode).
42/// Returns an error if the file cannot be opened or written.
43///
44/// # Errors
45///
46/// Returns `std::io::Error` if the file cannot be opened or if serialization fails.
47pub fn append_event_sync(path: &Path, event: &serde_json::Value) -> std::io::Result<()> {
48    let line = serde_json::to_string(event).map_err(std::io::Error::other)?;
49    append_line_sync(path, &line)
50}
51
/// Append a pre-serialized JSON line (plus trailing newline) to the log file.
///
/// Creates the file if it does not exist (append mode).
///
/// The payload and its newline are emitted through a single `write_all` call,
/// rather than the two writes `writeln!` may issue (payload, then `"\n"`).
/// With the file opened in append mode this keeps each record a single append,
/// reducing the chance of a torn line if any other process ever appends to the
/// same file.
///
/// # Errors
///
/// Returns `std::io::Error` if the file cannot be opened or written.
fn append_line_sync(path: &Path, line: &str) -> std::io::Result<()> {
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)?;
    // Build "line\n" up front so the record goes out in one write call.
    let mut record = String::with_capacity(line.len() + 1);
    record.push_str(line);
    record.push('\n');
    file.write_all(record.as_bytes())
}
63
64// ---------------------------------------------------------------------------
65// EventLogger with mpsc channel (PERFORMANCE: non-blocking on hot path)
66// ---------------------------------------------------------------------------
67
/// Sends pre-serialized event JSON lines to the background writer task via an mpsc channel.
///
/// `log()` is non-blocking: if the channel is full (capacity 1024), the event
/// is dropped with a warning rather than blocking the verdict path.
///
/// # Architecture
///
/// `EventLogger` is the sender side; `EventLoggerHandle` owns the background task.
/// The caller must drop `EventLogger` before calling `EventLoggerHandle::shutdown()`
/// so the writer task sees the channel close and exits cleanly.
///
/// Events are accepted as pre-serialized JSON strings to avoid double serialization
/// (struct → Value → String). The caller serializes once; the writer appends as-is.
#[derive(Clone)]
pub struct EventLogger {
    // Sender half of the bounded (capacity 1024) channel drained by `writer_task`.
    // Cloning `EventLogger` clones this sender; the channel closes only when
    // every clone is dropped.
    tx: mpsc::Sender<String>,
}
85
/// Owns the background writer task. MUST be held until shutdown.
pub struct EventLoggerHandle {
    // Join handle for the spawned `writer_task`; awaited in `shutdown()` so
    // the writer can drain remaining events before the process exits.
    join_handle: tokio::task::JoinHandle<()>,
}
90
91impl EventLogger {
92    /// Create a new `EventLogger` and its background writer task.
93    ///
94    /// Returns the logger (sender) and the handle (task owner) as a pair.
95    /// The caller must hold the `EventLoggerHandle` until the daemon shuts down.
96    pub fn new(log_dir: PathBuf) -> (Self, EventLoggerHandle) {
97        let (tx, rx) = mpsc::channel(1024);
98        let handle = EventLoggerHandle {
99            join_handle: tokio::spawn(writer_task(log_dir, rx)),
100        };
101        (Self { tx }, handle)
102    }
103
104    /// Send a pre-serialized JSON line to the background writer (non-blocking).
105    ///
106    /// Uses `try_send` so the hot path never awaits channel capacity.
107    /// If the channel is full or closed, the event is dropped with a warning.
108    pub fn log(&self, event_json: String) {
109        // PERFORMANCE: try_send is synchronous — never blocks the verdict path
110        if self.tx.try_send(event_json).is_err() {
111            tracing::warn!("event log channel full or closed, event dropped");
112        }
113    }
114}
115
116impl EventLoggerHandle {
117    /// Wait for the background writer to drain and exit.
118    ///
119    /// The caller must drop (or let go of) the `EventLogger` sender BEFORE calling
120    /// this method, otherwise the writer task will block waiting for more events.
121    pub async fn shutdown(self) {
122        let _ = self.join_handle.await;
123    }
124}
125
/// Background task that drains the mpsc channel and appends pre-serialized JSON lines to disk.
///
/// This is the ONLY writer for event log files — single-writer design prevents
/// file corruption without requiring a mutex.
///
/// Exits when every `EventLogger` sender has been dropped (`recv` returns
/// `None`), or immediately if the log directory cannot be created.
///
/// NOTE(review): `append_line_sync` performs blocking `std::fs` I/O inside an
/// async task. Likely acceptable for small local appends on a dedicated task,
/// but could stall a runtime worker on slow filesystems — consider
/// `spawn_blocking` if that matters; confirm against deployment targets.
async fn writer_task(log_dir: PathBuf, mut rx: mpsc::Receiver<String>) {
    // Ensure log directory exists before writing any events.
    if let Err(e) = tokio::fs::create_dir_all(&log_dir).await {
        tracing::error!(error = %e, "failed to create log directory");
        return;
    }
    while let Some(json_line) = rx.recv().await {
        // Path is recomputed per event so writes roll over to a new file when
        // the UTC date changes (daily rotation).
        let path = current_log_path(&log_dir);
        if let Err(e) = append_line_sync(&path, &json_line) {
            // Fail-open: log warning but keep processing subsequent events.
            tracing::warn!(error = %e, "failed to append event to log");
        }
    }
    tracing::debug!("event logger writer task shutting down");
}
145
146// ---------------------------------------------------------------------------
147// Retention cleanup (LOG-03)
148// ---------------------------------------------------------------------------
149
150/// Delete log files older than `retention_days` from `log_dir`.
151///
152/// Only files matching the pattern `events-YYYY-MM-DD.jsonl` are considered.
153/// Files with unrecognized names are left untouched.
154///
155/// Returns the number of files deleted.
156///
157/// # Errors
158///
159/// Returns `std::io::Error` if the directory cannot be read or a file cannot be deleted.
160pub fn cleanup_old_logs(log_dir: &Path, retention_days: u32) -> std::io::Result<u32> {
161    let cutoff = chrono::Utc::now().date_naive() - chrono::Duration::days(retention_days as i64);
162    let mut deleted = 0u32;
163
164    for entry in std::fs::read_dir(log_dir)? {
165        let entry = entry?;
166        let name = entry.file_name();
167        let name_str = name.to_string_lossy();
168        // Match pattern: events-YYYY-MM-DD.jsonl
169        if let Some(date_str) = name_str
170            .strip_prefix("events-")
171            .and_then(|s| s.strip_suffix(".jsonl"))
172        {
173            if let Ok(date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
174                if date < cutoff {
175                    std::fs::remove_file(entry.path())?;
176                    deleted += 1;
177                }
178            }
179        }
180    }
181    Ok(deleted)
182}
183
184// ---------------------------------------------------------------------------
185// Unit tests
186// ---------------------------------------------------------------------------
187
// Unit tests for file naming, synchronous append, retention cleanup, and the
// async EventLogger pipeline. Tests 5-7 depend on the real wall clock (UTC
// "today"); a date rollover mid-test could in principle perturb them, though
// the window is negligible.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::io::BufRead;
    use tempfile::TempDir;

    // Helper: build a NaiveDate from components, panicking on invalid input
    // (fine in tests — inputs are literals).
    fn make_date(year: i32, month: u32, day: u32) -> chrono::NaiveDate {
        chrono::NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    // Test 1: log_file_name() for 2026-04-07 returns "events-2026-04-07.jsonl"
    #[test]
    fn test_log_file_name_format() {
        let date = make_date(2026, 4, 7);
        assert_eq!(log_file_name(&date), "events-2026-04-07.jsonl");
    }

    // Test 2: append_event writes a valid JSON line to the file (parseable by serde_json)
    #[test]
    fn test_append_event_sync_writes_valid_json() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event = json!({"tool": "bash", "session": "abc123"});

        append_event_sync(&path, &event).unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let line = content.lines().next().unwrap();
        // Must be parseable as JSON
        let parsed: serde_json::Value = serde_json::from_str(line).unwrap();
        assert_eq!(parsed["tool"], "bash");
    }

    // Test 3: append_event writes exactly one line (ends with \n, no embedded newlines)
    #[test]
    fn test_append_event_sync_single_line() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event = json!({"key": "value"});

        append_event_sync(&path, &event).unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        // Exactly one line (content ends with \n and has no embedded newlines in the JSON)
        assert!(content.ends_with('\n'), "file must end with newline");
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 1, "must produce exactly one line");
    }

    // Test 4: Two calls to append_event produce two lines in the file
    #[test]
    fn test_append_event_sync_two_calls_two_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event1 = json!({"seq": 1});
        let event2 = json!({"seq": 2});

        append_event_sync(&path, &event1).unwrap();
        append_event_sync(&path, &event2).unwrap();

        let file = std::fs::File::open(&path).unwrap();
        let lines: Vec<_> = std::io::BufReader::new(file)
            .lines()
            .collect::<Result<_, _>>()
            .unwrap();
        assert_eq!(lines.len(), 2, "must produce exactly two lines");

        let parsed1: serde_json::Value = serde_json::from_str(&lines[0]).unwrap();
        let parsed2: serde_json::Value = serde_json::from_str(&lines[1]).unwrap();
        assert_eq!(parsed1["seq"], 1);
        assert_eq!(parsed2["seq"], 2);
    }

    // Test 5: cleanup_old_logs with retention_days=0 deletes all events-*.jsonl files
    // (cutoff == today, and deletion uses strict `<`, so today's file would survive)
    #[test]
    fn test_cleanup_old_logs_retention_zero_deletes_all() {
        let dir = TempDir::new().unwrap();
        // Create files for yesterday and two days ago (both older than today when retention=0)
        let yesterday = chrono::Utc::now().date_naive() - chrono::Duration::days(1);
        let two_days_ago = chrono::Utc::now().date_naive() - chrono::Duration::days(2);

        std::fs::write(dir.path().join(log_file_name(&yesterday)), b"line\n").unwrap();
        std::fs::write(dir.path().join(log_file_name(&two_days_ago)), b"line\n").unwrap();

        let deleted = cleanup_old_logs(dir.path(), 0).unwrap();
        assert_eq!(deleted, 2, "retention_days=0 must delete all past files");

        // Directory should now be empty
        let remaining: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert!(remaining.is_empty(), "no files should remain");
    }

    // Test 6: cleanup_old_logs with retention_days=30 keeps files newer than 30 days
    #[test]
    fn test_cleanup_old_logs_retention_keeps_recent() {
        let dir = TempDir::new().unwrap();
        let recent = chrono::Utc::now().date_naive() - chrono::Duration::days(5);
        let old = chrono::Utc::now().date_naive() - chrono::Duration::days(40);

        let recent_file = dir.path().join(log_file_name(&recent));
        let old_file = dir.path().join(log_file_name(&old));

        std::fs::write(&recent_file, b"line\n").unwrap();
        std::fs::write(&old_file, b"line\n").unwrap();

        let deleted = cleanup_old_logs(dir.path(), 30).unwrap();
        assert_eq!(deleted, 1, "only the old file should be deleted");
        assert!(recent_file.exists(), "recent file must not be deleted");
        assert!(!old_file.exists(), "old file must be deleted");
    }

    // Test 7: EventLogger sends events via mpsc channel and writer task appends them
    // (end-to-end: send → drop sender → shutdown drains → file on disk)
    #[tokio::test]
    async fn test_event_logger_sends_and_appends() {
        let dir = TempDir::new().unwrap();
        let log_dir = dir.path().to_path_buf();

        let (logger, handle) = EventLogger::new(log_dir.clone());

        let event = json!({"tool": "write_file", "session": "sess_001"});
        let event_str = serde_json::to_string(&event).unwrap();
        logger.log(event_str);

        // Drop the logger to close the sender channel, then wait for writer to drain.
        drop(logger);
        handle.shutdown().await;

        let today = chrono::Utc::now().date_naive();
        let log_path = log_dir.join(log_file_name(&today));

        assert!(log_path.exists(), "log file must be created");
        let content = std::fs::read_to_string(&log_path).unwrap();
        let parsed: serde_json::Value =
            serde_json::from_str(content.lines().next().unwrap()).unwrap();
        assert_eq!(parsed["tool"], "write_file");
    }
}