1use super::types::{StructuredError, UseEvent};
27use chrono::Utc;
28use crossbeam_channel::Receiver;
29use fs2::FileExt;
30use std::fs::{self, File, OpenOptions};
31use std::io::{BufRead, BufReader, BufWriter, Write};
32use std::path::{Path, PathBuf};
33
34pub struct UsesWriter {
37 uses_dir: PathBuf,
38}
39
40impl UsesWriter {
41 #[must_use]
43 pub fn new(uses_dir: PathBuf) -> Self {
44 Self { uses_dir }
45 }
46
47 pub fn run(self, receiver: &Receiver<UseEvent>) {
52 let events_dir = self.uses_dir.join("events");
54 if let Err(e) = fs::create_dir_all(&events_dir) {
55 log::warn!("Failed to create uses events directory: {e}");
56 }
58
59 while let Ok(event) = receiver.recv() {
61 if let Err(e) = Self::write_event(&events_dir, &event) {
62 log::debug!("Failed to write use event: {e}");
63 }
65 }
66 }
67
68 fn write_event(events_dir: &Path, event: &UseEvent) -> std::io::Result<()> {
70 let date = event.timestamp.format("%Y-%m-%d");
71 let file_path = events_dir.join(format!("{date}.jsonl"));
72
73 let file = OpenOptions::new()
77 .create(true)
78 .read(true)
79 .append(true)
80 .open(&file_path)?;
81
82 file.lock_exclusive()?;
84
85 let mut writer = BufWriter::new(&file);
86 serde_json::to_writer(&mut writer, event)?;
87 writeln!(writer)?;
88 writer.flush()?;
89
90 file.unlock()?;
91
92 Ok(())
93 }
94}
95
96pub struct UsesStorage {
98 uses_dir: PathBuf,
99}
100
101impl UsesStorage {
102 #[must_use]
104 pub fn new(uses_dir: PathBuf) -> Self {
105 Self { uses_dir }
106 }
107
108 #[must_use]
110 pub fn events_dir(&self) -> PathBuf {
111 self.uses_dir.join("events")
112 }
113
114 #[must_use]
116 pub fn summaries_dir(&self) -> PathBuf {
117 self.uses_dir.join("summaries")
118 }
119
120 #[must_use]
122 pub fn troubleshoot_dir(&self) -> PathBuf {
123 self.uses_dir.join("troubleshoot")
124 }
125
126 #[must_use]
128 pub fn errors_dir(&self) -> PathBuf {
129 self.uses_dir.join("errors")
130 }
131
132 pub fn ensure_directories(&self) -> std::io::Result<()> {
138 fs::create_dir_all(self.events_dir())?;
139 fs::create_dir_all(self.summaries_dir())?;
140 fs::create_dir_all(self.troubleshoot_dir())?;
141 fs::create_dir_all(self.errors_dir())?;
142 Ok(())
143 }
144
145 pub fn load_events_for_date(&self, date: &str) -> std::io::Result<(Vec<UseEvent>, usize)> {
162 let file_path = self.events_dir().join(format!("{date}.jsonl"));
163 load_events_from_file(&file_path)
164 }
165
166 pub fn load_events_for_range(
183 &self,
184 start_date: &str,
185 end_date: &str,
186 ) -> std::io::Result<(Vec<UseEvent>, usize)> {
187 let events_dir = self.events_dir();
188 let mut all_events = Vec::new();
189 let mut total_skipped = 0;
190
191 if !events_dir.exists() {
193 return Ok((all_events, total_skipped));
194 }
195
196 for entry in fs::read_dir(&events_dir)? {
197 let entry = entry?;
198 let path = entry.path();
199
200 if let Some(ext) = path.extension()
201 && ext == "jsonl"
202 && let Some(stem) = path.file_stem()
203 {
204 let date = stem.to_string_lossy();
205 if date.as_ref() >= start_date && date.as_ref() <= end_date {
207 match load_events_from_file(&path) {
208 Ok((events, skipped)) => {
209 all_events.extend(events);
210 total_skipped += skipped;
211 }
212 Err(e) => {
213 log::debug!("Failed to load events from {}: {}", path.display(), e);
214 }
215 }
216 }
217 }
218 }
219
220 all_events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
222
223 Ok((all_events, total_skipped))
224 }
225
226 pub fn load_recent_events(&self, days: u32) -> std::io::Result<(Vec<UseEvent>, usize)> {
240 let now = Utc::now();
241 let start = now - chrono::Duration::days(i64::from(days));
242 let start_date = start.format("%Y-%m-%d").to_string();
243 let end_date = now.format("%Y-%m-%d").to_string();
244
245 self.load_events_for_range(&start_date, &end_date)
246 }
247
248 pub fn prune_old_events(&self, retain_days: u32) -> std::io::Result<usize> {
262 let events_dir = self.events_dir();
263 let cutoff = Utc::now() - chrono::Duration::days(i64::from(retain_days));
264 let cutoff_date = cutoff.format("%Y-%m-%d").to_string();
265 let mut deleted = 0;
266
267 if !events_dir.exists() {
268 return Ok(0);
269 }
270
271 for entry in fs::read_dir(&events_dir)? {
272 let entry = entry?;
273 let path = entry.path();
274
275 if let Some(ext) = path.extension()
276 && ext == "jsonl"
277 && let Some(stem) = path.file_stem()
278 {
279 let date = stem.to_string_lossy();
280 if date.as_ref() < cutoff_date.as_str() {
281 if let Err(e) = fs::remove_file(&path) {
282 log::debug!("Failed to delete old event file {}: {}", path.display(), e);
283 } else {
284 deleted += 1;
285 }
286 }
287 }
288 }
289
290 Ok(deleted)
291 }
292
293 pub fn atomic_write(target: &Path, content: &[u8]) -> std::io::Result<()> {
306 use tempfile::NamedTempFile;
307
308 let parent = target.parent().unwrap_or(Path::new("."));
310 fs::create_dir_all(parent)?;
311
312 let mut temp = NamedTempFile::new_in(parent)?;
313 temp.as_file_mut().write_all(content)?;
314 temp.as_file().sync_all()?;
315
316 temp.persist(target)?;
318
319 Ok(())
320 }
321
322 pub fn write_summary(&self, week: &str, content: &[u8]) -> std::io::Result<()> {
333 let path = self.summaries_dir().join(format!("{week}.json"));
334 Self::atomic_write(&path, content)
335 }
336
337 pub fn read_summary(&self, week: &str) -> std::io::Result<Vec<u8>> {
351 let path = self.summaries_dir().join(format!("{week}.json"));
352 fs::read(path)
353 }
354
355 #[must_use]
357 pub fn summary_exists(&self, week: &str) -> bool {
358 self.summaries_dir().join(format!("{week}.json")).exists()
359 }
360
361 pub fn load_recent_errors(&self, days: u32) -> std::io::Result<(Vec<StructuredError>, usize)> {
377 let errors_dir = self.errors_dir();
378 let mut all_errors = Vec::new();
379 let mut total_skipped = 0;
380
381 if !errors_dir.exists() {
382 return Ok((all_errors, total_skipped));
383 }
384
385 let now = Utc::now();
386 let cutoff = now - chrono::Duration::days(i64::from(days));
387 let cutoff_date = cutoff.format("%Y-%m-%d").to_string();
388 let end_date = now.format("%Y-%m-%d").to_string();
389
390 for entry in fs::read_dir(&errors_dir)? {
391 let entry = entry?;
392 let path = entry.path();
393
394 if let Some(ext) = path.extension()
395 && ext == "jsonl"
396 && let Some(stem) = path.file_stem()
397 {
398 let date = stem.to_string_lossy();
399 if date.as_ref() >= cutoff_date.as_str() && date.as_ref() <= end_date.as_str() {
401 match load_errors_from_file(&path) {
402 Ok((errors, skipped)) => {
403 all_errors.extend(errors);
404 total_skipped += skipped;
405 }
406 Err(e) => {
407 log::debug!("Failed to load errors from {}: {}", path.display(), e);
408 }
409 }
410 }
411 }
412 }
413
414 all_errors.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
416
417 Ok((all_errors, total_skipped))
418 }
419
420 pub fn record_error(&self, error: &StructuredError) -> std::io::Result<()> {
432 let errors_dir = self.errors_dir();
433 fs::create_dir_all(&errors_dir)?;
434
435 let date = error.timestamp.format("%Y-%m-%d");
436 let file_path = errors_dir.join(format!("{date}.jsonl"));
437
438 let file = OpenOptions::new()
442 .create(true)
443 .read(true)
444 .append(true)
445 .open(&file_path)?;
446
447 file.lock_exclusive()?;
449
450 let mut writer = BufWriter::new(&file);
451 serde_json::to_writer(&mut writer, error)?;
452 writeln!(writer)?;
453 writer.flush()?;
454
455 file.unlock()?;
456
457 Ok(())
458 }
459}
460
461fn load_events_from_file(path: &Path) -> std::io::Result<(Vec<UseEvent>, usize)> {
463 let file = File::open(path)?;
464 let reader = BufReader::new(file);
465 let mut events = Vec::new();
466 let mut skipped = 0;
467
468 for line in reader.lines() {
469 match line {
470 Ok(line_content) => {
471 if line_content.trim().is_empty() {
472 continue;
473 }
474 match serde_json::from_str(&line_content) {
475 Ok(event) => events.push(event),
476 Err(_) => skipped += 1,
477 }
478 }
479 Err(_) => skipped += 1,
480 }
481 }
482
483 Ok((events, skipped))
484}
485
486fn load_errors_from_file(path: &Path) -> std::io::Result<(Vec<StructuredError>, usize)> {
488 let file = File::open(path)?;
489 let reader = BufReader::new(file);
490 let mut errors = Vec::new();
491 let mut skipped = 0;
492
493 for line in reader.lines() {
494 match line {
495 Ok(line_content) => {
496 if line_content.trim().is_empty() {
497 continue;
498 }
499 match serde_json::from_str(&line_content) {
500 Ok(error) => errors.push(error),
501 Err(_) => skipped += 1,
502 }
503 }
504 Err(_) => skipped += 1,
505 }
506 }
507
508 Ok((errors, skipped))
509}
510
511#[cfg(test)]
516mod tests {
517 use super::*;
518 use crate::uses::types::{QueryKind, UseEventType};
519 use tempfile::tempdir;
520
521 #[test]
522 fn test_storage_ensure_directories() {
523 let dir = tempdir().unwrap();
524 let storage = UsesStorage::new(dir.path().join("uses"));
525
526 storage.ensure_directories().unwrap();
527
528 assert!(storage.events_dir().exists());
529 assert!(storage.summaries_dir().exists());
530 assert!(storage.troubleshoot_dir().exists());
531 assert!(storage.errors_dir().exists());
532 }
533
534 #[test]
535 fn test_writer_creates_daily_file() {
536 let dir = tempdir().unwrap();
537 let storage = UsesStorage::new(dir.path().join("uses"));
538 storage.ensure_directories().unwrap();
539
540 let events_dir = dir.path().join("uses").join("events");
541
542 let event = UseEvent::new(UseEventType::QueryExecuted {
543 kind: QueryKind::CallChain,
544 result_count: 42,
545 });
546
547 UsesWriter::write_event(&events_dir, &event).unwrap();
548
549 let today = Utc::now().format("%Y-%m-%d").to_string();
550 let file = events_dir.join(format!("{today}.jsonl"));
551 assert!(file.exists());
552 }
553
554 #[test]
555 fn test_load_events_from_file() {
556 let dir = tempdir().unwrap();
557 let storage = UsesStorage::new(dir.path().join("uses"));
558 storage.ensure_directories().unwrap();
559
560 let events_dir = storage.events_dir();
562 let file_path = events_dir.join("2025-12-13.jsonl");
563
564 let event = UseEvent::new(UseEventType::QueryExecuted {
565 kind: QueryKind::CallChain,
566 result_count: 42,
567 });
568
569 let mut file = File::create(&file_path).unwrap();
571 serde_json::to_writer(&mut file, &event).unwrap();
572 writeln!(file).unwrap();
573
574 let (events, skipped) = load_events_from_file(&file_path).unwrap();
576 assert_eq!(events.len(), 1);
577 assert_eq!(skipped, 0);
578 }
579
580 #[test]
581 fn test_load_events_skips_malformed() {
582 let dir = tempdir().unwrap();
583 let storage = UsesStorage::new(dir.path().join("uses"));
584 storage.ensure_directories().unwrap();
585
586 let events_dir = storage.events_dir();
588 let file_path = events_dir.join("2025-12-13.jsonl");
589
590 let event = UseEvent::new(UseEventType::QueryExecuted {
591 kind: QueryKind::CallChain,
592 result_count: 42,
593 });
594
595 let mut file = File::create(&file_path).unwrap();
596 writeln!(file, "invalid json").unwrap();
597 serde_json::to_writer(&mut file, &event).unwrap();
598 writeln!(file).unwrap();
599 writeln!(file, "another invalid").unwrap();
600
601 let (events, skipped) = load_events_from_file(&file_path).unwrap();
602 assert_eq!(events.len(), 1);
603 assert_eq!(skipped, 2);
604 }
605
606 #[test]
607 fn test_atomic_write() {
608 let dir = tempdir().unwrap();
609 let target = dir.path().join("test.json");
610
611 UsesStorage::atomic_write(&target, b"{\"test\": true}").unwrap();
612
613 let content = fs::read_to_string(&target).unwrap();
614 assert_eq!(content, "{\"test\": true}");
615 }
616
617 #[test]
618 fn test_summary_read_write() {
619 let dir = tempdir().unwrap();
620 let storage = UsesStorage::new(dir.path().join("uses"));
621 storage.ensure_directories().unwrap();
622
623 let content = b"{\"period\": \"2025-W50\"}";
624 storage.write_summary("2025-W50", content).unwrap();
625
626 assert!(storage.summary_exists("2025-W50"));
627 assert!(!storage.summary_exists("2025-W51"));
628
629 let read_content = storage.read_summary("2025-W50").unwrap();
630 assert_eq!(read_content, content);
631 }
632
633 #[test]
634 fn test_load_recent_events() {
635 let dir = tempdir().unwrap();
636 let storage = UsesStorage::new(dir.path().join("uses"));
637 storage.ensure_directories().unwrap();
638
639 let today = Utc::now().format("%Y-%m-%d").to_string();
641 let file_path = storage.events_dir().join(format!("{today}.jsonl"));
642
643 let event = UseEvent::new(UseEventType::QueryExecuted {
644 kind: QueryKind::CallChain,
645 result_count: 42,
646 });
647
648 let mut file = File::create(&file_path).unwrap();
649 serde_json::to_writer(&mut file, &event).unwrap();
650 writeln!(file).unwrap();
651
652 let (events, skipped) = storage.load_recent_events(1).unwrap();
654 assert_eq!(events.len(), 1);
655 assert_eq!(skipped, 0);
656 }
657
658 #[test]
659 fn test_prune_old_events() {
660 let dir = tempdir().unwrap();
661 let storage = UsesStorage::new(dir.path().join("uses"));
662 storage.ensure_directories().unwrap();
663
664 let old_date = "2020-01-01";
666 let recent_date = Utc::now().format("%Y-%m-%d").to_string();
667
668 let old_file = storage.events_dir().join(format!("{old_date}.jsonl"));
669 let recent_file = storage.events_dir().join(format!("{recent_date}.jsonl"));
670
671 File::create(&old_file).unwrap();
672 File::create(&recent_file).unwrap();
673
674 let deleted = storage.prune_old_events(365).unwrap();
676
677 assert_eq!(deleted, 1);
678 assert!(!old_file.exists());
679 assert!(recent_file.exists());
680 }
681}