// openlatch_client/core/logging/mod.rs

pub mod daemon_log;
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
11use tokio::sync::mpsc;
12
13pub fn log_file_name(date: &chrono::NaiveDate) -> String {
26 format!("events-{}.jsonl", date.format("%Y-%m-%d"))
27}
28
29pub fn current_log_path(log_dir: &Path) -> PathBuf {
31 let today = chrono::Utc::now().date_naive();
32 log_dir.join(log_file_name(&today))
33}
34
35pub fn append_event_sync(path: &Path, event: &serde_json::Value) -> std::io::Result<()> {
48 let line = serde_json::to_string(event).map_err(std::io::Error::other)?;
49 append_line_sync(path, &line)
50}
51
/// Appends `line` plus a trailing newline to `path`, creating the file if it
/// does not exist. Existing content is never truncated (append mode).
fn append_line_sync(path: &Path, line: &str) -> std::io::Result<()> {
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)?;
    file.write_all(line.as_bytes())?;
    file.write_all(b"\n")?;
    Ok(())
}
63
/// Cheaply cloneable handle for submitting pre-serialized JSON events to the
/// background writer task. Cloning clones only the channel sender.
#[derive(Clone)]
pub struct EventLogger {
    // Bounded channel into `writer_task`; when full, events are dropped
    // rather than blocking the caller (see `EventLogger::log`).
    tx: mpsc::Sender<String>,
}
85
/// Owns the writer task's join handle; call `shutdown().await` to wait for
/// the task to drain remaining events and exit.
pub struct EventLoggerHandle {
    join_handle: tokio::task::JoinHandle<()>,
}
90
91impl EventLogger {
92 pub fn new(log_dir: PathBuf) -> (Self, EventLoggerHandle) {
97 let (tx, rx) = mpsc::channel(1024);
98 let handle = EventLoggerHandle {
99 join_handle: tokio::spawn(writer_task(log_dir, rx)),
100 };
101 (Self { tx }, handle)
102 }
103
104 pub fn log(&self, event_json: String) {
109 if self.tx.try_send(event_json).is_err() {
111 tracing::warn!("event log channel full or closed, event dropped");
112 }
113 }
114}
115
116impl EventLoggerHandle {
117 pub async fn shutdown(self) {
122 let _ = self.join_handle.await;
123 }
124}
125
126async fn writer_task(log_dir: PathBuf, mut rx: mpsc::Receiver<String>) {
131 if let Err(e) = tokio::fs::create_dir_all(&log_dir).await {
133 tracing::error!(error = %e, "failed to create log directory");
134 return;
135 }
136 while let Some(json_line) = rx.recv().await {
137 let path = current_log_path(&log_dir);
138 if let Err(e) = append_line_sync(&path, &json_line) {
139 tracing::warn!(error = %e, "failed to append event to log");
141 }
142 }
143 tracing::debug!("event logger writer task shutting down");
144}
145
146pub fn cleanup_old_logs(log_dir: &Path, retention_days: u32) -> std::io::Result<u32> {
161 let cutoff = chrono::Utc::now().date_naive() - chrono::Duration::days(retention_days as i64);
162 let mut deleted = 0u32;
163
164 for entry in std::fs::read_dir(log_dir)? {
165 let entry = entry?;
166 let name = entry.file_name();
167 let name_str = name.to_string_lossy();
168 if let Some(date_str) = name_str
170 .strip_prefix("events-")
171 .and_then(|s| s.strip_suffix(".jsonl"))
172 {
173 if let Ok(date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
174 if date < cutoff {
175 std::fs::remove_file(entry.path())?;
176 deleted += 1;
177 }
178 }
179 }
180 }
181 Ok(deleted)
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::io::BufRead;
    use tempfile::TempDir;

    // Test-only helper: builds a NaiveDate, panicking on invalid components.
    fn make_date(year: i32, month: u32, day: u32) -> chrono::NaiveDate {
        chrono::NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    #[test]
    fn test_log_file_name_format() {
        // Zero-padded ISO date keeps lexical order equal to chronological order.
        let date = make_date(2026, 4, 7);
        assert_eq!(log_file_name(&date), "events-2026-04-07.jsonl");
    }

    #[test]
    fn test_append_event_sync_writes_valid_json() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event = json!({"tool": "bash", "session": "abc123"});

        append_event_sync(&path, &event).unwrap();

        // Each written line must round-trip back through the JSON parser.
        let content = std::fs::read_to_string(&path).unwrap();
        let line = content.lines().next().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(line).unwrap();
        assert_eq!(parsed["tool"], "bash");
    }

    #[test]
    fn test_append_event_sync_single_line() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event = json!({"key": "value"});

        append_event_sync(&path, &event).unwrap();

        // JSONL invariant: every record is exactly one newline-terminated line.
        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.ends_with('\n'), "file must end with newline");
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 1, "must produce exactly one line");
    }

    #[test]
    fn test_append_event_sync_two_calls_two_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.jsonl");
        let event1 = json!({"seq": 1});
        let event2 = json!({"seq": 2});

        // Two appends must not clobber each other (file opened in append mode).
        append_event_sync(&path, &event1).unwrap();
        append_event_sync(&path, &event2).unwrap();

        let file = std::fs::File::open(&path).unwrap();
        let lines: Vec<_> = std::io::BufReader::new(file)
            .lines()
            .collect::<Result<_, _>>()
            .unwrap();
        assert_eq!(lines.len(), 2, "must produce exactly two lines");

        // Order must be preserved: the first call wrote the first line.
        let parsed1: serde_json::Value = serde_json::from_str(&lines[0]).unwrap();
        let parsed2: serde_json::Value = serde_json::from_str(&lines[1]).unwrap();
        assert_eq!(parsed1["seq"], 1);
        assert_eq!(parsed2["seq"], 2);
    }

    #[test]
    fn test_cleanup_old_logs_retention_zero_deletes_all() {
        let dir = TempDir::new().unwrap();
        // retention_days = 0 places the cutoff at today (UTC), so any
        // strictly past date must be removed.
        let yesterday = chrono::Utc::now().date_naive() - chrono::Duration::days(1);
        let two_days_ago = chrono::Utc::now().date_naive() - chrono::Duration::days(2);

        std::fs::write(dir.path().join(log_file_name(&yesterday)), b"line\n").unwrap();
        std::fs::write(dir.path().join(log_file_name(&two_days_ago)), b"line\n").unwrap();

        let deleted = cleanup_old_logs(dir.path(), 0).unwrap();
        assert_eq!(deleted, 2, "retention_days=0 must delete all past files");

        let remaining: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert!(remaining.is_empty(), "no files should remain");
    }

    #[test]
    fn test_cleanup_old_logs_retention_keeps_recent() {
        let dir = TempDir::new().unwrap();
        // 5 days old is inside a 30-day window; 40 days old is outside it.
        let recent = chrono::Utc::now().date_naive() - chrono::Duration::days(5);
        let old = chrono::Utc::now().date_naive() - chrono::Duration::days(40);

        let recent_file = dir.path().join(log_file_name(&recent));
        let old_file = dir.path().join(log_file_name(&old));

        std::fs::write(&recent_file, b"line\n").unwrap();
        std::fs::write(&old_file, b"line\n").unwrap();

        let deleted = cleanup_old_logs(dir.path(), 30).unwrap();
        assert_eq!(deleted, 1, "only the old file should be deleted");
        assert!(recent_file.exists(), "recent file must not be deleted");
        assert!(!old_file.exists(), "old file must be deleted");
    }

    #[tokio::test]
    async fn test_event_logger_sends_and_appends() {
        let dir = TempDir::new().unwrap();
        let log_dir = dir.path().to_path_buf();

        let (logger, handle) = EventLogger::new(log_dir.clone());

        let event = json!({"tool": "write_file", "session": "sess_001"});
        let event_str = serde_json::to_string(&event).unwrap();
        logger.log(event_str);

        // Dropping the only sender closes the channel, which lets the writer
        // task drain its queue and exit; shutdown() then awaits that exit, so
        // the write below is guaranteed to be on disk.
        drop(logger);
        handle.shutdown().await;

        // NOTE(review): could flake if UTC midnight passes between the write
        // above and this path computation — accepted as a negligible risk.
        let today = chrono::Utc::now().date_naive();
        let log_path = log_dir.join(log_file_name(&today));

        assert!(log_path.exists(), "log file must be created");
        let content = std::fs::read_to_string(&log_path).unwrap();
        let parsed: serde_json::Value =
            serde_json::from_str(content.lines().next().unwrap()).unwrap();
        assert_eq!(parsed["tool"], "write_file");
    }
}