// oo_ide/log_store.rs
//! Log storage for task output.
//!
//! Each task gets its own log file under `.oo/cache/tasks/`:
//!
//! ```text
//! .oo/cache/tasks/<task_id>.log      # written incrementally while task runs
//! .oo/cache/tasks/<task_id>.log.gz   # compressed after task finishes
//! ```
//!
//! Logs are written **incrementally** as the task produces output so that a
//! tail-viewer can follow them in real-time.  When a task finishes the file is
//! gzip-compressed.
//!
//! A configurable disk quota is enforced after every compression: oldest logs
//! (by modification time) are deleted first until total usage is within the
//! limit.
use std::io::{BufRead as _, BufReader, BufWriter, Write as _};
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use anyhow::{Context, Result};
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use tokio::io::AsyncWriteExt as _;

use crate::task_registry::TaskId;
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// Default cap on total log storage: 500 MiB.
pub const DEFAULT_QUOTA_BYTES: u64 = 500 * 1024 * 1024;

// ---------------------------------------------------------------------------
// LogStore
// ---------------------------------------------------------------------------
40
/// Handle to the on-disk log store: a directory plus a disk-quota policy.
///
/// Cheap to clone — only holds a `PathBuf` and a `u64`.
#[derive(Debug, Clone)]
pub struct LogStore {
    /// Directory where task logs are stored.
    pub dir: PathBuf,
    /// Maximum total size of all logs before oldest-first eviction.
    pub quota_bytes: u64,
}
51
52/// Metadata about a single stored log file.
53#[derive(Debug, Clone)]
54pub struct LogEntry {
55    /// Task ID parsed from the file name.  `None` for unrecognised files.
56    pub task_id: Option<TaskId>,
57    pub path: PathBuf,
58    pub size_bytes: u64,
59    pub modified_at: SystemTime,
60    pub compressed: bool,
61}
62
63impl LogStore {
64    /// Create a new `LogStore` pointing at `dir` with the given quota.
65    ///
66    /// The directory is created lazily on first write.
67    pub fn new(dir: PathBuf, quota_bytes: u64) -> Self {
68        Self { dir, quota_bytes }
69    }
70
71    // ------------------------------------------------------------------
72    // Path helpers
73    // ------------------------------------------------------------------
74
75    /// Uncompressed log path for `task_id`.
76    pub fn log_path(&self, task_id: TaskId) -> PathBuf {
77        self.dir.join(format!("{}.log", task_id.0))
78    }
79
80    /// Compressed log path for `task_id`.
81    pub fn gz_path(&self, task_id: TaskId) -> PathBuf {
82        self.dir.join(format!("{}.log.gz", task_id.0))
83    }
84
85    // ------------------------------------------------------------------
86    // Writer
87    // ------------------------------------------------------------------
88
89    /// Open a new [`LogWriter`] for `task_id`, creating the directory if
90    /// needed.  Any pre-existing log for this task ID is overwritten.
91    pub async fn open_writer(&self, task_id: TaskId) -> Result<LogWriter> {
92        tokio::fs::create_dir_all(&self.dir)
93            .await
94            .with_context(|| format!("create log dir {:?}", self.dir))?;
95
96        let path = self.log_path(task_id);
97        let file = tokio::fs::File::create(&path)
98            .await
99            .with_context(|| format!("create log file {:?}", path))?;
100
101        Ok(LogWriter {
102            path,
103            writer: tokio::io::BufWriter::new(file),
104        })
105    }
106
107    // ------------------------------------------------------------------
108    // Directory listing
109    // ------------------------------------------------------------------
110
111    /// List all log entries (`.log` and `.log.gz`) in the store directory.
112    ///
113    /// Returns an empty `Vec` if the directory does not yet exist.
114    pub async fn list_entries(&self) -> Result<Vec<LogEntry>> {
115        if !self.dir.exists() {
116            return Ok(vec![]);
117        }
118
119        let mut read_dir = tokio::fs::read_dir(&self.dir)
120            .await
121            .with_context(|| format!("read log dir {:?}", self.dir))?;
122
123        let mut entries = Vec::new();
124        while let Some(entry) = read_dir.next_entry().await? {
125            let path = entry.path();
126            let name = match path.file_name().and_then(|n| n.to_str()) {
127                Some(n) => n.to_string(),
128                None => continue,
129            };
130
131            let (task_id, compressed) = if let Some(id_str) = name.strip_suffix(".log.gz") {
132                (id_str.parse::<u64>().ok().map(TaskId), true)
133            } else if let Some(id_str) = name.strip_suffix(".log") {
134                (id_str.parse::<u64>().ok().map(TaskId), false)
135            } else {
136                continue;
137            };
138
139            let meta = match entry.metadata().await {
140                Ok(m) => m,
141                Err(_) => continue,
142            };
143
144            entries.push(LogEntry {
145                task_id,
146                path,
147                size_bytes: meta.len(),
148                modified_at: meta.modified().unwrap_or(SystemTime::UNIX_EPOCH),
149                compressed,
150            });
151        }
152
153        Ok(entries)
154    }
155
156    /// Total disk usage across all log files.
157    pub async fn total_size(&self) -> Result<u64> {
158        Ok(self.list_entries().await?.iter().map(|e| e.size_bytes).sum())
159    }
160
161    // ------------------------------------------------------------------
162    // Quota enforcement
163    // ------------------------------------------------------------------
164
165    /// Delete oldest log files until total usage is within [`Self::quota_bytes`].
166    ///
167    /// Returns the paths of all deleted files.
168    pub async fn enforce_quota(&self) -> Result<Vec<PathBuf>> {
169        let mut entries = self.list_entries().await?;
170        let mut total: u64 = entries.iter().map(|e| e.size_bytes).sum();
171        if total <= self.quota_bytes {
172            return Ok(vec![]);
173        }
174
175        entries.sort_by_key(|e| e.modified_at);
176
177        let mut deleted = Vec::new();
178        for entry in &entries {
179            if total <= self.quota_bytes {
180                break;
181            }
182            if tokio::fs::remove_file(&entry.path).await.is_ok() {
183                total = total.saturating_sub(entry.size_bytes);
184                deleted.push(entry.path.clone());
185            }
186        }
187        Ok(deleted)
188    }
189
190    // ------------------------------------------------------------------
191    // Reading
192    // ------------------------------------------------------------------
193
194    /// Read all lines from a stored log.
195    ///
196    /// Prefers `.log.gz` if present; falls back to `.log`.  Returns an empty
197    /// `Vec` if neither exists.
198    pub async fn read_log(&self, task_id: TaskId) -> Result<Vec<String>> {
199        let gz = self.gz_path(task_id);
200        if gz.exists() {
201            return tokio::task::spawn_blocking(move || read_gz_lines(&gz))
202                .await
203                .context("spawn_blocking for gz read")?;
204        }
205
206        let plain = self.log_path(task_id);
207        if plain.exists() {
208            let content = tokio::fs::read_to_string(&plain)
209                .await
210                .with_context(|| format!("read log {:?}", plain))?;
211            return Ok(content.lines().map(|l| l.to_string()).collect());
212        }
213
214        Ok(vec![])
215    }
216}
217
218// ---------------------------------------------------------------------------
219// LogWriter
220// ---------------------------------------------------------------------------
221
222/// An open, buffered writer for a single task's log file.
223///
224/// Obtained via [`LogStore::open_writer`].
225pub struct LogWriter {
226    /// Absolute path to the open `.log` file.
227    pub path: PathBuf,
228    writer: tokio::io::BufWriter<tokio::fs::File>,
229}
230
231impl LogWriter {
232    /// Append one line of plain text (the `text` field of a `StyledLine`).
233    ///
234    /// A newline is appended automatically.
235    pub async fn append_line(&mut self, text: &str) -> Result<()> {
236        self.writer.write_all(text.as_bytes()).await.context("log write")?;
237        self.writer.write_all(b"\n").await.context("log write newline")?;
238        Ok(())
239    }
240
241    /// Flush and close the log file without compression.
242    ///
243    /// Returns the path of the log file.
244    pub async fn close(mut self) -> Result<PathBuf> {
245        self.writer.flush().await.context("log flush")?;
246        Ok(self.path)
247    }
248
249    /// Flush, close, gzip-compress, delete the original `.log`, then enforce
250    /// the store's disk quota.
251    ///
252    /// Returns the path of the compressed `.log.gz` file.
253    pub async fn close_and_compress(mut self, store: &LogStore) -> Result<PathBuf> {
254        self.writer.flush().await.context("log flush before compress")?;
255
256        let log_path = self.path.clone();
257
258        // Derive the .log.gz path from the .log stem.
259        let stem = log_path
260            .file_stem()
261            .and_then(|s| s.to_str())
262            .unwrap_or("unknown");
263        let gz_path = store.dir.join(format!("{stem}.log.gz"));
264
265        // Drop the writer to ensure the OS file handle is closed before we
266        // open it again for reading inside spawn_blocking.
267        drop(self.writer);
268
269        let lp = log_path.clone();
270        let gp = gz_path.clone();
271        tokio::task::spawn_blocking(move || compress_sync(&lp, &gp))
272            .await
273            .context("spawn_blocking for compression")??;
274
275        store.enforce_quota().await?;
276
277        Ok(gz_path)
278    }
279}
280
281// ---------------------------------------------------------------------------
282// Sync helpers (run inside spawn_blocking)
283// ---------------------------------------------------------------------------
284
285fn compress_sync(src: &Path, dst: &Path) -> Result<()> {
286    let input =
287        std::fs::File::open(src).with_context(|| format!("open for compress {:?}", src))?;
288    let output =
289        std::fs::File::create(dst).with_context(|| format!("create gz {:?}", dst))?;
290    let mut reader = BufReader::new(input);
291    let mut encoder = GzEncoder::new(BufWriter::new(output), Compression::default());
292    std::io::copy(&mut reader, &mut encoder).context("compress: io::copy")?;
293    encoder.finish().context("compress: gz finish")?;
294    // Remove the original only after successful compression so we don't lose
295    // data if compression fails mid-stream.
296    std::fs::remove_file(src)
297        .with_context(|| format!("remove after compress {:?}", src))?;
298    Ok(())
299}
300
301fn read_gz_lines(path: &Path) -> Result<Vec<String>> {
302    let file =
303        std::fs::File::open(path).with_context(|| format!("open gz {:?}", path))?;
304    let decoder = GzDecoder::new(BufReader::new(file));
305    let reader = BufReader::new(decoder);
306    let mut lines = Vec::new();
307    for line in reader.lines() {
308        lines.push(line.context("read gz line")?);
309    }
310    Ok(lines)
311}
312
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Fresh store under the OS temp dir; wipes leftovers from earlier runs.
    fn tmp_store(name: &str) -> LogStore {
        let dir = std::env::temp_dir().join(format!("oo_log_test_{name}"));
        let _ = std::fs::remove_dir_all(&dir);
        LogStore::new(dir, DEFAULT_QUOTA_BYTES)
    }

    fn task(n: u64) -> TaskId {
        TaskId(n)
    }

    #[tokio::test]
    async fn creates_dir_and_file() {
        let store = tmp_store("creates_dir");
        assert!(!store.dir.exists(), "dir should not exist yet");

        let writer = store.open_writer(task(1)).await.unwrap();
        assert!(store.dir.exists(), "dir should be created on open");
        assert!(writer.path.exists(), "log file should be created");
        writer.close().await.unwrap();
    }

    #[tokio::test]
    async fn append_and_close_writes_lines() {
        let store = tmp_store("append_close");
        let mut writer = store.open_writer(task(2)).await.unwrap();
        for word in ["hello", "world"] {
            writer.append_line(word).await.unwrap();
        }
        let path = writer.close().await.unwrap();
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "hello\nworld\n");
    }

    #[tokio::test]
    async fn list_entries_nonexistent_dir() {
        let store = tmp_store("list_nonexistent");
        assert!(store.list_entries().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn list_entries_sees_log_file() {
        let store = tmp_store("list_log");
        let mut writer = store.open_writer(task(10)).await.unwrap();
        writer.append_line("test").await.unwrap();
        writer.close().await.unwrap();

        let listing = store.list_entries().await.unwrap();
        assert_eq!(listing.len(), 1);
        let entry = &listing[0];
        assert_eq!(entry.task_id, Some(task(10)));
        assert!(!entry.compressed);
    }

    #[tokio::test]
    async fn compress_and_read_back() {
        let store = tmp_store("compress_read");
        let mut writer = store.open_writer(task(20)).await.unwrap();
        for i in 0..10 {
            writer.append_line(&format!("line {i}")).await.unwrap();
        }
        let gz = writer.close_and_compress(&store).await.unwrap();

        assert!(gz.exists(), ".log.gz should exist");
        assert!(
            !store.log_path(task(20)).exists(),
            "original .log should be removed after compression"
        );

        let lines = store.read_log(task(20)).await.unwrap();
        assert_eq!(lines.len(), 10);
        assert_eq!(lines[0], "line 0");
        assert_eq!(lines[9], "line 9");
    }

    #[tokio::test]
    async fn read_log_plain() {
        let store = tmp_store("read_plain");
        let mut writer = store.open_writer(task(30)).await.unwrap();
        writer.append_line("alpha").await.unwrap();
        writer.close().await.unwrap();

        assert_eq!(store.read_log(task(30)).await.unwrap(), vec!["alpha"]);
    }

    #[tokio::test]
    async fn read_log_missing_returns_empty() {
        let store = tmp_store("read_missing");
        assert!(store.read_log(task(99)).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn list_entries_sees_compressed_file() {
        let store = tmp_store("list_compressed");
        let mut writer = store.open_writer(task(50)).await.unwrap();
        writer.append_line("compressed data").await.unwrap();
        writer.close_and_compress(&store).await.unwrap();

        let listing = store.list_entries().await.unwrap();
        assert_eq!(listing.len(), 1);
        assert!(listing[0].compressed);
        assert_eq!(listing[0].task_id, Some(task(50)));
    }

    #[tokio::test]
    async fn total_size_is_sum_of_entries() {
        let store = tmp_store("total_size");
        let mut writer = store.open_writer(task(60)).await.unwrap();
        writer.append_line("some content").await.unwrap();
        writer.close().await.unwrap();

        let total = store.total_size().await.unwrap();
        assert!(total > 0);
        let sum: u64 = store
            .list_entries()
            .await
            .unwrap()
            .iter()
            .map(|e| e.size_bytes)
            .sum();
        assert_eq!(total, sum);
    }

    #[tokio::test]
    async fn enforce_quota_deletes_oldest() {
        // 1-byte quota: any file at all exceeds it.
        let store = LogStore::new(std::env::temp_dir().join("oo_log_test_quota"), 1);
        let _ = std::fs::remove_dir_all(&store.dir);

        let mut first = store.open_writer(task(100)).await.unwrap();
        first.append_line("file one").await.unwrap();
        first.close().await.unwrap();

        // Allow mtime to differ on fast filesystems.
        tokio::time::sleep(Duration::from_millis(20)).await;

        let mut second = store.open_writer(task(101)).await.unwrap();
        second.append_line("file two").await.unwrap();
        second.close().await.unwrap();

        let deleted = store.enforce_quota().await.unwrap();
        assert!(!deleted.is_empty(), "expected at least one deletion");
        // Oldest file (task 100) must be deleted first.
        assert!(
            deleted[0].to_string_lossy().contains("100"),
            "oldest file should be deleted first; got {:?}",
            deleted
        );
    }

    #[tokio::test]
    async fn multiple_writers_are_independent() {
        let store = tmp_store("multi_writer");
        let mut first = store.open_writer(task(200)).await.unwrap();
        let mut second = store.open_writer(task(201)).await.unwrap();
        first.append_line("task 200").await.unwrap();
        second.append_line("task 201").await.unwrap();
        first.close().await.unwrap();
        second.close().await.unwrap();

        assert_eq!(store.read_log(task(200)).await.unwrap(), vec!["task 200"]);
        assert_eq!(store.read_log(task(201)).await.unwrap(), vec!["task 201"]);
    }
}