Skip to main content

sqry_core/uses/
storage.rs

//! Uses storage - background event writing
//!
//! This module handles persisting use events to disk in a robust, concurrent-safe manner.
//!
//! # Storage Layout
//!
//! ```text
//! ~/.sqry/uses/
//! ├── events/
//! │   ├── 2025-12-13.jsonl     # Daily event log (append-only)
//! │   ├── 2025-12-12.jsonl
//! │   └── ...
//! ├── summaries/
//! │   ├── 2025-W50.json        # Weekly aggregated summary
//! │   └── ...
//! ├── troubleshoot/
//! │   └── 2025-12-13_143022.json  # Issue bundles
//! └── errors/
//!     ├── 2025-12-13.jsonl     # Daily error log (append-only)
//!     └── ...
//! ```
//!
//! # Cross-Process Safety
//!
//! - File locking (via `fs2`) serializes concurrent appends to the daily logs
//! - Atomic writes (via temp file + rename) prevent corruption
//! - Corrupt JSONL lines are skipped during reads
25
26use super::types::{StructuredError, UseEvent};
27use chrono::Utc;
28use crossbeam_channel::Receiver;
29use fs2::FileExt;
30use std::fs::{self, File, OpenOptions};
31use std::io::{BufRead, BufReader, BufWriter, Write};
32use std::path::{Path, PathBuf};
33
34/// Background writer that receives events from the collector channel
35/// and writes them to daily JSONL files.
36pub struct UsesWriter {
37    uses_dir: PathBuf,
38}
39
40impl UsesWriter {
41    /// Create a new writer for the given uses directory
42    #[must_use]
43    pub fn new(uses_dir: PathBuf) -> Self {
44        Self { uses_dir }
45    }
46
47    /// Run the writer loop, receiving events from the channel
48    ///
49    /// This method blocks and runs until the channel is disconnected.
50    /// It should be called from a dedicated background thread.
51    pub fn run(self, receiver: &Receiver<UseEvent>) {
52        // Ensure events directory exists
53        let events_dir = self.uses_dir.join("events");
54        if let Err(e) = fs::create_dir_all(&events_dir) {
55            log::warn!("Failed to create uses events directory: {e}");
56            // Continue anyway - events will be dropped
57        }
58
59        // Process events until channel closes
60        while let Ok(event) = receiver.recv() {
61            if let Err(e) = Self::write_event(&events_dir, &event) {
62                log::debug!("Failed to write use event: {e}");
63                // Continue processing - don't let one failure stop the writer
64            }
65        }
66    }
67
68    /// Write a single event to the daily log file
69    fn write_event(events_dir: &Path, event: &UseEvent) -> std::io::Result<()> {
70        let date = event.timestamp.format("%Y-%m-%d");
71        let file_path = events_dir.join(format!("{date}.jsonl"));
72
73        // Open file for append with exclusive lock.
74        // `.read(true)` is required on Windows: fs2::lock_exclusive uses
75        // LockFileEx which needs read access on the file handle.
76        let file = OpenOptions::new()
77            .create(true)
78            .read(true)
79            .append(true)
80            .open(&file_path)?;
81
82        // Acquire exclusive lock for append
83        file.lock_exclusive()?;
84
85        let mut writer = BufWriter::new(&file);
86        serde_json::to_writer(&mut writer, event)?;
87        writeln!(writer)?;
88        writer.flush()?;
89
90        file.unlock()?;
91
92        Ok(())
93    }
94}
95
96/// Storage operations for reading and managing use event files
97pub struct UsesStorage {
98    uses_dir: PathBuf,
99}
100
101impl UsesStorage {
102    /// Create a new storage instance for the given uses directory
103    #[must_use]
104    pub fn new(uses_dir: PathBuf) -> Self {
105        Self { uses_dir }
106    }
107
108    /// Get the path to the events directory
109    #[must_use]
110    pub fn events_dir(&self) -> PathBuf {
111        self.uses_dir.join("events")
112    }
113
114    /// Get the path to the summaries directory
115    #[must_use]
116    pub fn summaries_dir(&self) -> PathBuf {
117        self.uses_dir.join("summaries")
118    }
119
120    /// Get the path to the troubleshoot directory
121    #[must_use]
122    pub fn troubleshoot_dir(&self) -> PathBuf {
123        self.uses_dir.join("troubleshoot")
124    }
125
126    /// Get the path to the errors directory
127    #[must_use]
128    pub fn errors_dir(&self) -> PathBuf {
129        self.uses_dir.join("errors")
130    }
131
132    /// Ensure all required directories exist
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if directory creation fails.
137    pub fn ensure_directories(&self) -> std::io::Result<()> {
138        fs::create_dir_all(self.events_dir())?;
139        fs::create_dir_all(self.summaries_dir())?;
140        fs::create_dir_all(self.troubleshoot_dir())?;
141        fs::create_dir_all(self.errors_dir())?;
142        Ok(())
143    }
144
145    /// Load events from a daily log file
146    ///
147    /// Skips malformed lines gracefully.
148    ///
149    /// # Arguments
150    ///
151    /// * `date` - Date string in "YYYY-MM-DD" format
152    ///
153    /// # Returns
154    ///
155    /// A tuple of (events, `skipped_count`) where `skipped_count` is the number
156    /// of malformed lines that were skipped.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the file cannot be opened or read.
161    pub fn load_events_for_date(&self, date: &str) -> std::io::Result<(Vec<UseEvent>, usize)> {
162        let file_path = self.events_dir().join(format!("{date}.jsonl"));
163        load_events_from_file(&file_path)
164    }
165
166    /// Load events for a date range
167    ///
168    /// Loads all events from files in the given date range (inclusive).
169    ///
170    /// # Arguments
171    ///
172    /// * `start_date` - Start date in "YYYY-MM-DD" format
173    /// * `end_date` - End date in "YYYY-MM-DD" format
174    ///
175    /// # Returns
176    ///
177    /// A tuple of (events, `skipped_count`) containing all events from the range.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if any file cannot be read (missing files are skipped).
182    pub fn load_events_for_range(
183        &self,
184        start_date: &str,
185        end_date: &str,
186    ) -> std::io::Result<(Vec<UseEvent>, usize)> {
187        let events_dir = self.events_dir();
188        let mut all_events = Vec::new();
189        let mut total_skipped = 0;
190
191        // List all JSONL files in the events directory
192        if !events_dir.exists() {
193            return Ok((all_events, total_skipped));
194        }
195
196        for entry in fs::read_dir(&events_dir)? {
197            let entry = entry?;
198            let path = entry.path();
199
200            if let Some(ext) = path.extension()
201                && ext == "jsonl"
202                && let Some(stem) = path.file_stem()
203            {
204                let date = stem.to_string_lossy();
205                // Check if date is in range
206                if date.as_ref() >= start_date && date.as_ref() <= end_date {
207                    match load_events_from_file(&path) {
208                        Ok((events, skipped)) => {
209                            all_events.extend(events);
210                            total_skipped += skipped;
211                        }
212                        Err(e) => {
213                            log::debug!("Failed to load events from {}: {}", path.display(), e);
214                        }
215                    }
216                }
217            }
218        }
219
220        // Sort by timestamp
221        all_events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
222
223        Ok((all_events, total_skipped))
224    }
225
226    /// Load events for the last N days
227    ///
228    /// # Arguments
229    ///
230    /// * `days` - Number of days to look back
231    ///
232    /// # Returns
233    ///
234    /// A tuple of (events, `skipped_count`) containing recent events.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if any file cannot be read (missing files are skipped).
239    pub fn load_recent_events(&self, days: u32) -> std::io::Result<(Vec<UseEvent>, usize)> {
240        let now = Utc::now();
241        let start = now - chrono::Duration::days(i64::from(days));
242        let start_date = start.format("%Y-%m-%d").to_string();
243        let end_date = now.format("%Y-%m-%d").to_string();
244
245        self.load_events_for_range(&start_date, &end_date)
246    }
247
248    /// Delete event files older than the retention period
249    ///
250    /// # Arguments
251    ///
252    /// * `retain_days` - Number of days to retain
253    ///
254    /// # Returns
255    ///
256    /// The number of files deleted.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if the directory cannot be read.
261    pub fn prune_old_events(&self, retain_days: u32) -> std::io::Result<usize> {
262        let events_dir = self.events_dir();
263        let cutoff = Utc::now() - chrono::Duration::days(i64::from(retain_days));
264        let cutoff_date = cutoff.format("%Y-%m-%d").to_string();
265        let mut deleted = 0;
266
267        if !events_dir.exists() {
268            return Ok(0);
269        }
270
271        for entry in fs::read_dir(&events_dir)? {
272            let entry = entry?;
273            let path = entry.path();
274
275            if let Some(ext) = path.extension()
276                && ext == "jsonl"
277                && let Some(stem) = path.file_stem()
278            {
279                let date = stem.to_string_lossy();
280                if date.as_ref() < cutoff_date.as_str() {
281                    if let Err(e) = fs::remove_file(&path) {
282                        log::debug!("Failed to delete old event file {}: {}", path.display(), e);
283                    } else {
284                        deleted += 1;
285                    }
286                }
287            }
288        }
289
290        Ok(deleted)
291    }
292
293    /// Write data atomically to a file
294    ///
295    /// Uses a temp file + rename pattern to prevent corruption.
296    ///
297    /// # Arguments
298    ///
299    /// * `target` - The target file path
300    /// * `content` - The content to write
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the write fails.
305    pub fn atomic_write(target: &Path, content: &[u8]) -> std::io::Result<()> {
306        use tempfile::NamedTempFile;
307
308        // Create temp file in same directory as target (same filesystem for atomic rename)
309        let parent = target.parent().unwrap_or(Path::new("."));
310        fs::create_dir_all(parent)?;
311
312        let mut temp = NamedTempFile::new_in(parent)?;
313        temp.as_file_mut().write_all(content)?;
314        temp.as_file().sync_all()?;
315
316        // Atomic rename (persist consumes temp file)
317        temp.persist(target)?;
318
319        Ok(())
320    }
321
322    /// Write a summary file atomically
323    ///
324    /// # Arguments
325    ///
326    /// * `week` - ISO week string (e.g., "2025-W50")
327    /// * `content` - JSON content to write
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the write fails.
332    pub fn write_summary(&self, week: &str, content: &[u8]) -> std::io::Result<()> {
333        let path = self.summaries_dir().join(format!("{week}.json"));
334        Self::atomic_write(&path, content)
335    }
336
337    /// Read a summary file
338    ///
339    /// # Arguments
340    ///
341    /// * `week` - ISO week string (e.g., "2025-W50")
342    ///
343    /// # Returns
344    ///
345    /// The file content as bytes.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if the file cannot be read.
350    pub fn read_summary(&self, week: &str) -> std::io::Result<Vec<u8>> {
351        let path = self.summaries_dir().join(format!("{week}.json"));
352        fs::read(path)
353    }
354
355    /// Check if a summary file exists for a given week
356    #[must_use]
357    pub fn summary_exists(&self, week: &str) -> bool {
358        self.summaries_dir().join(format!("{week}.json")).exists()
359    }
360
361    /// Load errors for the last N days
362    ///
363    /// Loads error records from daily JSONL files for troubleshooting.
364    ///
365    /// # Arguments
366    ///
367    /// * `days` - Number of days to look back
368    ///
369    /// # Returns
370    ///
371    /// A tuple of (errors, `skipped_count`) containing recent errors.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if any file cannot be read (missing files are skipped).
376    pub fn load_recent_errors(&self, days: u32) -> std::io::Result<(Vec<StructuredError>, usize)> {
377        let errors_dir = self.errors_dir();
378        let mut all_errors = Vec::new();
379        let mut total_skipped = 0;
380
381        if !errors_dir.exists() {
382            return Ok((all_errors, total_skipped));
383        }
384
385        let now = Utc::now();
386        let cutoff = now - chrono::Duration::days(i64::from(days));
387        let cutoff_date = cutoff.format("%Y-%m-%d").to_string();
388        let end_date = now.format("%Y-%m-%d").to_string();
389
390        for entry in fs::read_dir(&errors_dir)? {
391            let entry = entry?;
392            let path = entry.path();
393
394            if let Some(ext) = path.extension()
395                && ext == "jsonl"
396                && let Some(stem) = path.file_stem()
397            {
398                let date = stem.to_string_lossy();
399                // Check if date is in range
400                if date.as_ref() >= cutoff_date.as_str() && date.as_ref() <= end_date.as_str() {
401                    match load_errors_from_file(&path) {
402                        Ok((errors, skipped)) => {
403                            all_errors.extend(errors);
404                            total_skipped += skipped;
405                        }
406                        Err(e) => {
407                            log::debug!("Failed to load errors from {}: {}", path.display(), e);
408                        }
409                    }
410                }
411            }
412        }
413
414        // Sort by timestamp (most recent first)
415        all_errors.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
416
417        Ok((all_errors, total_skipped))
418    }
419
420    /// Record an error to the daily error log
421    ///
422    /// Writes error records in append-only JSONL format for troubleshooting.
423    ///
424    /// # Arguments
425    ///
426    /// * `error` - The structured error to record
427    ///
428    /// # Errors
429    ///
430    /// Returns an error if the write fails.
431    pub fn record_error(&self, error: &StructuredError) -> std::io::Result<()> {
432        let errors_dir = self.errors_dir();
433        fs::create_dir_all(&errors_dir)?;
434
435        let date = error.timestamp.format("%Y-%m-%d");
436        let file_path = errors_dir.join(format!("{date}.jsonl"));
437
438        // Open file for append with exclusive lock.
439        // `.read(true)` is required on Windows: fs2::lock_exclusive uses
440        // LockFileEx which needs read access on the file handle.
441        let file = OpenOptions::new()
442            .create(true)
443            .read(true)
444            .append(true)
445            .open(&file_path)?;
446
447        // Acquire exclusive lock for append
448        file.lock_exclusive()?;
449
450        let mut writer = BufWriter::new(&file);
451        serde_json::to_writer(&mut writer, error)?;
452        writeln!(writer)?;
453        writer.flush()?;
454
455        file.unlock()?;
456
457        Ok(())
458    }
459}
460
461/// Load events from a JSONL file, skipping malformed lines
462fn load_events_from_file(path: &Path) -> std::io::Result<(Vec<UseEvent>, usize)> {
463    let file = File::open(path)?;
464    let reader = BufReader::new(file);
465    let mut events = Vec::new();
466    let mut skipped = 0;
467
468    for line in reader.lines() {
469        match line {
470            Ok(line_content) => {
471                if line_content.trim().is_empty() {
472                    continue;
473                }
474                match serde_json::from_str(&line_content) {
475                    Ok(event) => events.push(event),
476                    Err(_) => skipped += 1,
477                }
478            }
479            Err(_) => skipped += 1,
480        }
481    }
482
483    Ok((events, skipped))
484}
485
486/// Load errors from a JSONL file, skipping malformed lines
487fn load_errors_from_file(path: &Path) -> std::io::Result<(Vec<StructuredError>, usize)> {
488    let file = File::open(path)?;
489    let reader = BufReader::new(file);
490    let mut errors = Vec::new();
491    let mut skipped = 0;
492
493    for line in reader.lines() {
494        match line {
495            Ok(line_content) => {
496                if line_content.trim().is_empty() {
497                    continue;
498                }
499                match serde_json::from_str(&line_content) {
500                    Ok(error) => errors.push(error),
501                    Err(_) => skipped += 1,
502                }
503            }
504            Err(_) => skipped += 1,
505        }
506    }
507
508    Ok((errors, skipped))
509}
510
511// ============================================================================
512// Tests
513// ============================================================================
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::uses::types::{QueryKind, UseEventType};
    use tempfile::{TempDir, tempdir};

    /// Create a storage rooted in a fresh temporary directory with all of its
    /// subdirectories present. The `TempDir` guard is returned so the
    /// directory stays alive for the duration of the test.
    fn fresh_storage() -> (TempDir, UsesStorage) {
        let dir = tempdir().unwrap();
        let storage = UsesStorage::new(dir.path().join("uses"));
        storage.ensure_directories().unwrap();
        (dir, storage)
    }

    /// Build the query event used as a fixture throughout these tests.
    fn sample_event() -> UseEvent {
        UseEvent::new(UseEventType::QueryExecuted {
            kind: QueryKind::CallChain,
            result_count: 42,
        })
    }

    /// Append `event` to `file` as one JSON line.
    fn append_event_line(file: &mut File, event: &UseEvent) {
        serde_json::to_writer(&mut *file, event).unwrap();
        writeln!(file).unwrap();
    }

    #[test]
    fn test_storage_ensure_directories() {
        let (_dir, storage) = fresh_storage();

        assert!(storage.events_dir().exists());
        assert!(storage.summaries_dir().exists());
        assert!(storage.troubleshoot_dir().exists());
        assert!(storage.errors_dir().exists());
    }

    #[test]
    fn test_writer_creates_daily_file() {
        let (_dir, storage) = fresh_storage();
        let events_dir = storage.events_dir();

        let event = sample_event();
        UsesWriter::write_event(&events_dir, &event).unwrap();

        // Derive the expected name from the event's own timestamp, exactly as
        // the writer does. Asking the clock again here could yield a different
        // date if the test straddles UTC midnight.
        let date = event.timestamp.format("%Y-%m-%d");
        let file = events_dir.join(format!("{date}.jsonl"));
        assert!(file.exists());
    }

    #[test]
    fn test_load_events_from_file() {
        let (_dir, storage) = fresh_storage();
        let file_path = storage.events_dir().join("2025-12-13.jsonl");

        // Write event to file
        let mut file = File::create(&file_path).unwrap();
        append_event_line(&mut file, &sample_event());

        // Read it back
        let (events, skipped) = load_events_from_file(&file_path).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(skipped, 0);
    }

    #[test]
    fn test_load_events_skips_malformed() {
        let (_dir, storage) = fresh_storage();
        let file_path = storage.events_dir().join("2025-12-13.jsonl");

        // Write mix of valid and invalid lines
        let mut file = File::create(&file_path).unwrap();
        writeln!(file, "invalid json").unwrap();
        append_event_line(&mut file, &sample_event());
        writeln!(file, "another invalid").unwrap();

        let (events, skipped) = load_events_from_file(&file_path).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(skipped, 2);
    }

    #[test]
    fn test_atomic_write() {
        let dir = tempdir().unwrap();
        let target = dir.path().join("test.json");

        UsesStorage::atomic_write(&target, b"{\"test\": true}").unwrap();

        let content = fs::read_to_string(&target).unwrap();
        assert_eq!(content, "{\"test\": true}");
    }

    #[test]
    fn test_summary_read_write() {
        let (_dir, storage) = fresh_storage();

        let content = b"{\"period\": \"2025-W50\"}";
        storage.write_summary("2025-W50", content).unwrap();

        assert!(storage.summary_exists("2025-W50"));
        assert!(!storage.summary_exists("2025-W51"));

        let read_content = storage.read_summary("2025-W50").unwrap();
        assert_eq!(read_content, content);
    }

    #[test]
    fn test_load_recent_events() {
        let (_dir, storage) = fresh_storage();

        // Write an event for today
        let today = Utc::now().format("%Y-%m-%d").to_string();
        let file_path = storage.events_dir().join(format!("{today}.jsonl"));

        let mut file = File::create(&file_path).unwrap();
        append_event_line(&mut file, &sample_event());

        // Load recent events
        let (events, skipped) = storage.load_recent_events(1).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(skipped, 0);
    }

    #[test]
    fn test_prune_old_events() {
        let (_dir, storage) = fresh_storage();

        // Create one file with an old date and one dated today
        let old_date = "2020-01-01";
        let recent_date = Utc::now().format("%Y-%m-%d").to_string();

        let old_file = storage.events_dir().join(format!("{old_date}.jsonl"));
        let recent_file = storage.events_dir().join(format!("{recent_date}.jsonl"));

        File::create(&old_file).unwrap();
        File::create(&recent_file).unwrap();

        // Prune events older than 365 days
        let deleted = storage.prune_old_events(365).unwrap();

        assert_eq!(deleted, 1);
        assert!(!old_file.exists());
        assert!(recent_file.exists());
    }
}