use std::io::{BufRead as _, BufReader, BufWriter, Write as _};
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use anyhow::{Context, Result};
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use tokio::io::AsyncWriteExt as _;

use crate::task_registry::TaskId;

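/// Default cap on total on-disk log storage: 500 MiB.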
pub const DEFAULT_QUOTA_BYTES: u64 = 500 * 1024 * 1024;

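/// Directory-backed store of per-task log files with a total-size quota.
///
/// Logs are written as `<task_id>.log` and may later be compressed to
/// `<task_id>.log.gz`; when the quota is exceeded, the oldest files are
/// deleted first.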
#[derive(Clone, Debug)]
pub struct LogStore {
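    /// Directory that holds all task log files.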
    pub dir: PathBuf,
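    /// Maximum total size, in bytes, of log data to keep on disk.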
    pub quota_bytes: u64,
}

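/// A single log file discovered in the store directory.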
#[derive(Debug, Clone)]
pub struct LogEntry {
    pub task_id: Option<TaskId>,
    pub path: PathBuf,
    pub size_bytes: u64,
    pub modified_at: SystemTime,
    pub compressed: bool,
}

impl LogStore {
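    /// Creates a store that writes logs under `dir` and enforces a quota of
    /// `quota_bytes`.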
    pub fn new(dir: PathBuf, quota_bytes: u64) -> Self {
        Self { dir, quota_bytes }
    }

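    /// Path of the uncompressed log file for `task_id` (`<id>.log`).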
    pub fn log_path(&self, task_id: TaskId) -> PathBuf {
        self.dir.join(format!("{}.log", task_id.0))
    }

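    /// Path of the compressed log file for `task_id` (`<id>.log.gz`).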
    pub fn gz_path(&self, task_id: TaskId) -> PathBuf {
        self.dir.join(format!("{}.log.gz", task_id.0))
    }

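    /// Creates the log directory if needed and opens a fresh log file for
    /// `task_id`, returning a buffered writer. Any existing log for the same
    /// task is truncated.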
    pub async fn open_writer(&self, task_id: TaskId) -> Result<LogWriter> {
        tokio::fs::create_dir_all(&self.dir)
            .await
            .with_context(|| format!("create log dir {:?}", self.dir))?;

        let path = self.log_path(task_id);
        let file = tokio::fs::File::create(&path)
            .await
            .with_context(|| format!("create log file {:?}", path))?;

        Ok(LogWriter {
            path,
            writer: tokio::io::BufWriter::new(file),
        })
    }

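    /// Lists every `.log` and `.log.gz` file in the store directory along with
    /// its size and modification time. Returns an empty list if the directory
    /// does not exist; entries whose metadata cannot be read are skipped.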
    pub async fn list_entries(&self) -> Result<Vec<LogEntry>> {
        if !self.dir.exists() {
            return Ok(vec![]);
        }

        let mut read_dir = tokio::fs::read_dir(&self.dir)
            .await
            .with_context(|| format!("read log dir {:?}", self.dir))?;

        let mut entries = Vec::new();
        while let Some(entry) = read_dir.next_entry().await? {
            let path = entry.path();
            let name = match path.file_name().and_then(|n| n.to_str()) {
                Some(n) => n.to_string(),
                None => continue,
            };

            let (task_id, compressed) = if let Some(id_str) = name.strip_suffix(".log.gz") {
                (id_str.parse::<u64>().ok().map(TaskId), true)
            } else if let Some(id_str) = name.strip_suffix(".log") {
                (id_str.parse::<u64>().ok().map(TaskId), false)
            } else {
                continue;
            };

            let meta = match entry.metadata().await {
                Ok(m) => m,
                Err(_) => continue,
            };

            entries.push(LogEntry {
                task_id,
                path,
                size_bytes: meta.len(),
                modified_at: meta.modified().unwrap_or(SystemTime::UNIX_EPOCH),
                compressed,
            });
        }

        Ok(entries)
    }

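    /// Total size in bytes of all log files currently in the store.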
    pub async fn total_size(&self) -> Result<u64> {
        Ok(self.list_entries().await?.iter().map(|e| e.size_bytes).sum())
    }

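    /// Deletes the oldest log files (by modification time) until the total
    /// size fits within `quota_bytes`. Returns the paths that were removed.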
    pub async fn enforce_quota(&self) -> Result<Vec<PathBuf>> {
        let mut entries = self.list_entries().await?;
        let mut total: u64 = entries.iter().map(|e| e.size_bytes).sum();
        if total <= self.quota_bytes {
            return Ok(vec![]);
        }

        entries.sort_by_key(|e| e.modified_at);

        let mut deleted = Vec::new();
        for entry in &entries {
            if total <= self.quota_bytes {
                break;
            }
            if tokio::fs::remove_file(&entry.path).await.is_ok() {
                total = total.saturating_sub(entry.size_bytes);
                deleted.push(entry.path.clone());
            }
        }
        Ok(deleted)
    }

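    /// Reads a task's log as a list of lines, preferring the compressed
    /// `.log.gz` file (decompressed on a blocking thread) over the plain
    /// `.log` file. Returns an empty list if neither exists.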
    pub async fn read_log(&self, task_id: TaskId) -> Result<Vec<String>> {
        let gz = self.gz_path(task_id);
        if gz.exists() {
            return tokio::task::spawn_blocking(move || read_gz_lines(&gz))
                .await
                .context("spawn_blocking for gz read")?;
        }

        let plain = self.log_path(task_id);
        if plain.exists() {
            let content = tokio::fs::read_to_string(&plain)
                .await
                .with_context(|| format!("read log {:?}", plain))?;
            return Ok(content.lines().map(|l| l.to_string()).collect());
        }

        Ok(vec![])
    }
}

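/// Handle for appending lines to a single task's log file.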
pub struct LogWriter {
    pub path: PathBuf,
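    /// Buffered async writer over the open log file.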
    writer: tokio::io::BufWriter<tokio::fs::File>,
}

impl LogWriter {
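    /// Appends `text` followed by a newline to the buffered log.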
    pub async fn append_line(&mut self, text: &str) -> Result<()> {
        self.writer.write_all(text.as_bytes()).await.context("log write")?;
        self.writer.write_all(b"\n").await.context("log write newline")?;
        Ok(())
    }

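    /// Flushes any buffered output and returns the path of the log file.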
    pub async fn close(mut self) -> Result<PathBuf> {
        self.writer.flush().await.context("log flush")?;
        Ok(self.path)
    }

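    /// Flushes and closes the log, gzips it on a blocking thread, removes the
    /// original `.log` file, enforces the store quota, and returns the path of
    /// the resulting `.log.gz` archive.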
    pub async fn close_and_compress(mut self, store: &LogStore) -> Result<PathBuf> {
        self.writer.flush().await.context("log flush before compress")?;

        let log_path = self.path.clone();

        // Derive "<task_id>.log.gz" from the "<task_id>.log" file name.
        let stem = log_path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("unknown");
        let gz_path = store.dir.join(format!("{stem}.log.gz"));

        // Close the file handle before the blocking task reads and replaces it.
        drop(self.writer);

        // flate2 is synchronous, so run the compression off the async runtime.
        let lp = log_path.clone();
        let gp = gz_path.clone();
        tokio::task::spawn_blocking(move || compress_sync(&lp, &gp))
            .await
            .context("spawn_blocking for compression")??;

        store.enforce_quota().await?;

        Ok(gz_path)
    }
}

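/// Gzips `src` into `dst` and deletes `src` on success. Blocking; intended to
/// be called from `spawn_blocking`.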
fn compress_sync(src: &Path, dst: &Path) -> Result<()> {
    let input =
        std::fs::File::open(src).with_context(|| format!("open for compress {:?}", src))?;
    let output =
        std::fs::File::create(dst).with_context(|| format!("create gz {:?}", dst))?;
    let mut reader = BufReader::new(input);
    let mut encoder = GzEncoder::new(BufWriter::new(output), Compression::default());
    std::io::copy(&mut reader, &mut encoder).context("compress: io::copy")?;
    // Finish the gzip stream, then flush the buffered writer explicitly so
    // write errors surface here instead of being swallowed on drop.
    let mut out = encoder.finish().context("compress: gz finish")?;
    out.flush().context("compress: flush gz output")?;
    // Only delete the original once the archive has been fully written.
    std::fs::remove_file(src)
        .with_context(|| format!("remove after compress {:?}", src))?;
    Ok(())
}

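/// Reads a gzip-compressed log and returns its lines. Blocking; intended to be
/// called from `spawn_blocking`.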
fn read_gz_lines(path: &Path) -> Result<Vec<String>> {
    let file =
        std::fs::File::open(path).with_context(|| format!("open gz {:?}", path))?;
    let decoder = GzDecoder::new(BufReader::new(file));
    let reader = BufReader::new(decoder);
    let mut lines = Vec::new();
    for line in reader.lines() {
        lines.push(line.context("read gz line")?);
    }
    Ok(lines)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a store under a fresh per-test temp directory.
    fn tmp_store(name: &str) -> LogStore {
        let dir = std::env::temp_dir().join(format!("oo_log_test_{name}"));
        // Start from a clean slate in case a previous run left files behind.
        let _ = std::fs::remove_dir_all(&dir);
        LogStore::new(dir, DEFAULT_QUOTA_BYTES)
    }

    fn task(n: u64) -> TaskId {
        TaskId(n)
    }

    #[tokio::test]
    async fn creates_dir_and_file() {
        let store = tmp_store("creates_dir");
        assert!(!store.dir.exists(), "dir should not exist yet");
        let writer = store.open_writer(task(1)).await.unwrap();
        assert!(store.dir.exists(), "dir should be created on open");
        assert!(writer.path.exists(), "log file should be created");
        writer.close().await.unwrap();
    }

    #[tokio::test]
    async fn append_and_close_writes_lines() {
        let store = tmp_store("append_close");
        let mut writer = store.open_writer(task(2)).await.unwrap();
        writer.append_line("hello").await.unwrap();
        writer.append_line("world").await.unwrap();
        let path = writer.close().await.unwrap();
        let content = std::fs::read_to_string(&path).unwrap();
        assert_eq!(content, "hello\nworld\n");
    }

    #[tokio::test]
    async fn list_entries_nonexistent_dir() {
        let store = tmp_store("list_nonexistent");
        let entries = store.list_entries().await.unwrap();
        assert!(entries.is_empty());
    }

    #[tokio::test]
    async fn list_entries_sees_log_file() {
        let store = tmp_store("list_log");
        let mut w = store.open_writer(task(10)).await.unwrap();
        w.append_line("test").await.unwrap();
        w.close().await.unwrap();

        let entries = store.list_entries().await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].task_id, Some(task(10)));
        assert!(!entries[0].compressed);
    }

    #[tokio::test]
    async fn compress_and_read_back() {
        let store = tmp_store("compress_read");
        let mut w = store.open_writer(task(20)).await.unwrap();
        for i in 0..10 {
            w.append_line(&format!("line {i}")).await.unwrap();
        }
        let gz = w.close_and_compress(&store).await.unwrap();

        assert!(gz.exists(), ".log.gz should exist");
        assert!(
            !store.log_path(task(20)).exists(),
            "original .log should be removed after compression"
        );

        let lines = store.read_log(task(20)).await.unwrap();
        assert_eq!(lines.len(), 10);
        assert_eq!(lines[0], "line 0");
        assert_eq!(lines[9], "line 9");
    }

    #[tokio::test]
    async fn read_log_plain() {
        let store = tmp_store("read_plain");
        let mut w = store.open_writer(task(30)).await.unwrap();
        w.append_line("alpha").await.unwrap();
        w.close().await.unwrap();

        let lines = store.read_log(task(30)).await.unwrap();
        assert_eq!(lines, vec!["alpha"]);
    }

    #[tokio::test]
    async fn read_log_missing_returns_empty() {
        let store = tmp_store("read_missing");
        let lines = store.read_log(task(99)).await.unwrap();
        assert!(lines.is_empty());
    }

    #[tokio::test]
    async fn list_entries_sees_compressed_file() {
        let store = tmp_store("list_compressed");
        let mut w = store.open_writer(task(50)).await.unwrap();
        w.append_line("compressed data").await.unwrap();
        w.close_and_compress(&store).await.unwrap();

        let entries = store.list_entries().await.unwrap();
        assert_eq!(entries.len(), 1);
        assert!(entries[0].compressed);
        assert_eq!(entries[0].task_id, Some(task(50)));
    }

    #[tokio::test]
    async fn total_size_is_sum_of_entries() {
        let store = tmp_store("total_size");
        let mut w = store.open_writer(task(60)).await.unwrap();
        w.append_line("some content").await.unwrap();
        w.close().await.unwrap();

        let total = store.total_size().await.unwrap();
        assert!(total > 0);
        let sum: u64 = store.list_entries().await.unwrap().iter().map(|e| e.size_bytes).sum();
        assert_eq!(total, sum);
    }

    #[tokio::test]
    async fn enforce_quota_deletes_oldest() {
        // A 1-byte quota guarantees that any non-empty log exceeds it.
        let store = LogStore::new(
            std::env::temp_dir().join("oo_log_test_quota"),
            1,
        );
        let _ = std::fs::remove_dir_all(&store.dir);

        let mut w1 = store.open_writer(task(100)).await.unwrap();
        w1.append_line("file one").await.unwrap();
        w1.close().await.unwrap();

        // Ensure the two files get distinct modification times.
        tokio::time::sleep(Duration::from_millis(20)).await;

        let mut w2 = store.open_writer(task(101)).await.unwrap();
        w2.append_line("file two").await.unwrap();
        w2.close().await.unwrap();

        let deleted = store.enforce_quota().await.unwrap();
        assert!(!deleted.is_empty(), "expected at least one deletion");
        assert!(
            deleted[0].to_string_lossy().contains("100"),
            "oldest file should be deleted first; got {:?}",
            deleted
        );
    }

    #[tokio::test]
    async fn multiple_writers_are_independent() {
        let store = tmp_store("multi_writer");
        let mut w1 = store.open_writer(task(200)).await.unwrap();
        let mut w2 = store.open_writer(task(201)).await.unwrap();
        w1.append_line("task 200").await.unwrap();
        w2.append_line("task 201").await.unwrap();
        w1.close().await.unwrap();
        w2.close().await.unwrap();

        let l1 = store.read_log(task(200)).await.unwrap();
        let l2 = store.read_log(task(201)).await.unwrap();
        assert_eq!(l1, vec!["task 200"]);
        assert_eq!(l2, vec!["task 201"]);
    }
}