// magic_bird/store/compact.rs
1//! Compaction and archival operations.
2//!
3//! Supports two modes:
4//! - **Client-specific**: Compact files for a single session/client (used by shell hooks)
5//! - **Global**: Compact files across all sessions (used for scheduled maintenance)
6
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10
11use chrono::{NaiveDate, Utc};
12use uuid::Uuid;
13
14use super::atomic;
15use super::Store;
16use crate::Result;
17
/// Check if a filename is a compacted file (contains `__compacted-N__`).
fn is_compacted_file(name: &str) -> bool {
    name.find("__compacted-").is_some()
}
22
/// Check if a filename is a seed file (name begins with `_seed`).
fn is_seed_file(name: &str) -> bool {
    name.strip_prefix("_seed").is_some()
}
27
/// Extract the session/group key from a filename.
///
/// The session key is everything in the file stem before the first `--`
/// separator; a stem with no separator is returned whole. Returns `None`
/// for names without a `.parquet` suffix.
///
/// For files like `session--cmd--uuid.parquet`, returns `session`.
/// For files like `session_id.parquet`, returns `session_id`.
/// For compacted files like `session--__compacted-0__--uuid.parquet`, returns `session`.
fn extract_session(name: &str) -> Option<String> {
    let stem = name.strip_suffix(".parquet")?;
    let session = match stem.split_once("--") {
        Some((head, _rest)) => head,
        None => stem,
    };
    Some(session.to_string())
}
44
/// Extract the compaction sequence number from a filename, if present.
///
/// Looks for the `__compacted-N__` marker and parses `N`; returns `None`
/// when the marker is absent or the number is malformed.
fn extract_compaction_number(name: &str) -> Option<u32> {
    // Split on the marker instead of slicing with a hard-coded offset (the
    // old code skipped a literal 12 bytes, which could silently drift out
    // of sync if the marker text ever changed).
    let (_, rest) = name.split_once("__compacted-")?;
    let (digits, _) = rest.split_once("__")?;
    digits.parse().ok()
}
58
59/// Find the next compaction sequence number for a session in a partition.
60fn next_compaction_number(partition_dir: &Path, session: &str) -> u32 {
61    fs::read_dir(partition_dir)
62        .ok()
63        .map(|entries| {
64            entries
65                .filter_map(|e| e.ok())
66                .filter_map(|e| {
67                    let name = e.file_name().to_string_lossy().to_string();
68                    // Only consider compacted files for this session
69                    if name.starts_with(&format!("{}--__compacted-", session)) {
70                        extract_compaction_number(&name)
71                    } else {
72                        None
73                    }
74                })
75                .max()
76                .map(|n| n + 1)
77                .unwrap_or(0)
78        })
79        .unwrap_or(0)
80}
81
/// Get modification time (seconds since the Unix epoch) for sorting files
/// by age. Any failure — missing file, unsupported mtime, or an mtime
/// before the epoch — collapses to 0.
fn file_mtime(path: &Path) -> u64 {
    fs::metadata(path)
        .and_then(|meta| meta.modified())
        .ok()
        .and_then(|mtime| mtime.duration_since(std::time::UNIX_EPOCH).ok())
        .map_or(0, |age| age.as_secs())
}
89
/// Statistics from a compaction operation.
#[derive(Debug, Default)]
pub struct CompactStats {
    pub partitions_compacted: usize,
    pub sessions_compacted: usize,
    pub files_before: usize,
    pub files_after: usize,
    pub bytes_before: u64,
    pub bytes_after: u64,
}

impl CompactStats {
    /// Accumulate another set of statistics into this one, field by field.
    pub fn add(&mut self, other: &CompactStats) {
        // Destructure so that adding a field to the struct makes the
        // compiler flag this method until it is updated too.
        let CompactStats {
            partitions_compacted,
            sessions_compacted,
            files_before,
            files_after,
            bytes_before,
            bytes_after,
        } = other;
        self.partitions_compacted += partitions_compacted;
        self.sessions_compacted += sessions_compacted;
        self.files_before += files_before;
        self.files_after += files_after;
        self.bytes_before += bytes_before;
        self.bytes_after += bytes_after;
    }
}
111
/// Statistics from an archive operation (see `archive_old_data`).
#[derive(Debug, Default)]
pub struct ArchiveStats {
    // Number of date partitions migrated from recent to archive
    pub partitions_archived: usize,
    // Number of source parquet files consolidated and removed
    pub files_moved: usize,
    // Bytes accounted for the move. NOTE(review): dry runs add the source
    // (pre-consolidation) size, real runs the consolidated size — confirm
    // this asymmetry is intended.
    pub bytes_moved: u64,
}
119
/// Options for compaction operations.
#[derive(Debug, Clone)]
pub struct CompactOptions {
    /// Compact when a session has more than this many non-compacted files.
    pub file_threshold: usize,
    /// Re-compact when a session has more than this many compacted files.
    /// Set to 0 to disable re-compaction.
    pub recompact_threshold: usize,
    /// If true, consolidate ALL files (including compacted) into a single file.
    pub consolidate: bool,
    /// If true, don't actually make changes.
    pub dry_run: bool,
    /// If set, only compact files for this specific session.
    pub session_filter: Option<String>,
}

impl Default for CompactOptions {
    fn default() -> Self {
        // Defaults tuned for routine maintenance: compact at 50 loose files,
        // re-compact at 10 compacted files, real run over every session.
        CompactOptions {
            session_filter: None,
            dry_run: false,
            consolidate: false,
            recompact_threshold: 10,
            file_threshold: 50,
        }
    }
}
147
148/// Options for auto-compaction (includes archive settings).
149#[derive(Debug)]
150pub struct AutoCompactOptions {
151    /// Compact options.
152    pub compact: CompactOptions,
153    /// Migrate data older than this many days to archive.
154    pub archive_days: u32,
155}
156
157impl Default for AutoCompactOptions {
158    fn default() -> Self {
159        Self {
160            compact: CompactOptions::default(),
161            archive_days: 14,
162        }
163    }
164}
165
/// Options for clean/prune operations.
#[derive(Debug, Clone)]
pub struct CleanOptions {
    /// If true, don't actually make changes.
    pub dry_run: bool,
    /// Maximum age in hours for pending files before they're considered stale.
    pub max_age_hours: u32,
    /// If true, also prune old archive data.
    pub prune: bool,
    /// Delete archive data older than this many days (when prune=true).
    pub older_than_days: u32,
}

impl Default for CleanOptions {
    fn default() -> Self {
        // Conservative defaults: real run, 24h staleness window, pruning
        // off, and a one-year retention horizon when pruning is enabled.
        CleanOptions {
            older_than_days: 365,
            prune: false,
            max_age_hours: 24,
            dry_run: false,
        }
    }
}
189
/// Statistics from a clean operation.
///
/// Produced by `Store::clean`; all counters default to zero.
#[derive(Debug, Default)]
pub struct CleanStats {
    /// Number of pending files checked.
    pub pending_checked: usize,
    /// Number of invocations still running.
    pub still_running: usize,
    /// Number of invocations marked as orphaned.
    pub orphaned: usize,
    /// Number of archive files pruned.
    pub pruned_files: usize,
    /// Bytes freed from pruning.
    pub bytes_freed: u64,
}
204
205impl Store {
206    /// Compact files for a specific session in a partition.
207    ///
208    /// Keeps the most recent `keep_count` files, compacts the rest.
209    /// Uses naming: `<session>--__compacted-N__--<uuid>.parquet`
210    fn compact_session_files(
211        &self,
212        partition_dir: &Path,
213        session: &str,
214        files: &mut Vec<PathBuf>,
215        keep_count: usize,
216        dry_run: bool,
217    ) -> Result<CompactStats> {
218        // Sort by modification time (oldest first)
219        files.sort_by_key(|p| file_mtime(p));
220
221        // Keep the most recent `keep_count` files
222        let to_keep = files.len().saturating_sub(keep_count).max(0);
223        if to_keep < 2 {
224            // Need at least 2 files to compact
225            return Ok(CompactStats::default());
226        }
227
228        let files_to_compact: Vec<PathBuf> = files.drain(..to_keep).collect();
229        let files_before = files_to_compact.len();
230        let bytes_before: u64 = files_to_compact
231            .iter()
232            .filter_map(|p| fs::metadata(p).ok())
233            .map(|m| m.len())
234            .sum();
235
236        if dry_run {
237            return Ok(CompactStats {
238                partitions_compacted: 0, // Will be set by caller
239                sessions_compacted: 1,
240                files_before,
241                files_after: 1,
242                bytes_before,
243                bytes_after: bytes_before, // Estimate
244            });
245        }
246
247        // Use minimal connection to avoid view setup overhead
248        let conn = self.connection_with_options(false)?;
249
250        // Build list of files to read
251        let file_list: Vec<String> = files_to_compact
252            .iter()
253            .map(|p| p.display().to_string())
254            .collect();
255        let file_list_sql = file_list
256            .iter()
257            .map(|f| format!("'{}'", f))
258            .collect::<Vec<_>>()
259            .join(", ");
260
261        // Create temp table with data from selected files
262        conn.execute(
263            &format!(
264                "CREATE OR REPLACE TEMP TABLE compact_temp AS
265                 SELECT * FROM read_parquet([{}], union_by_name = true)",
266                file_list_sql
267            ),
268            [],
269        )?;
270
271        // Generate compacted filename: <session>--__compacted-N__--<uuid>.parquet
272        let seq_num = next_compaction_number(partition_dir, session);
273        let uuid = Uuid::now_v7();
274        let compacted_name = format!("{}--__compacted-{}__--{}.parquet", session, seq_num, uuid);
275        let compacted_path = partition_dir.join(&compacted_name);
276        let temp_path = atomic::temp_path(&compacted_path);
277
278        conn.execute(
279            &format!(
280                "COPY compact_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
281                temp_path.display()
282            ),
283            [],
284        )?;
285
286        conn.execute("DROP TABLE compact_temp", [])?;
287
288        // Get size of new file
289        let bytes_after = fs::metadata(&temp_path)?.len();
290
291        // Remove old files
292        for file in &files_to_compact {
293            fs::remove_file(file)?;
294        }
295
296        // Rename compacted file to final location
297        atomic::rename_into_place(&temp_path, &compacted_path)?;
298
299        Ok(CompactStats {
300            partitions_compacted: 0, // Will be set by caller
301            sessions_compacted: 1,
302            files_before,
303            files_after: 1,
304            bytes_before,
305            bytes_after,
306        })
307    }
308
309    /// Consolidate ALL files for a session into a single file.
310    ///
311    /// Unlike regular compaction, this merges everything including previously
312    /// compacted files into a single `data_0.parquet` file.
313    fn consolidate_session_files(
314        &self,
315        partition_dir: &Path,
316        _session: &str,
317        files: Vec<PathBuf>,
318        dry_run: bool,
319    ) -> Result<CompactStats> {
320        if files.len() < 2 {
321            return Ok(CompactStats::default());
322        }
323
324        let files_before = files.len();
325        let bytes_before: u64 = files
326            .iter()
327            .filter_map(|p| fs::metadata(p).ok())
328            .map(|m| m.len())
329            .sum();
330
331        if dry_run {
332            return Ok(CompactStats {
333                partitions_compacted: 0,
334                sessions_compacted: 1,
335                files_before,
336                files_after: 1,
337                bytes_before,
338                bytes_after: bytes_before,
339            });
340        }
341
342        // Use minimal connection to avoid view setup overhead
343        let conn = self.connection_with_options(false)?;
344
345        // Build list of files to read
346        let file_list_sql = files
347            .iter()
348            .map(|p| format!("'{}'", p.display()))
349            .collect::<Vec<_>>()
350            .join(", ");
351
352        // Create temp table with data from all files
353        conn.execute(
354            &format!(
355                "CREATE OR REPLACE TEMP TABLE consolidate_temp AS
356                 SELECT * FROM read_parquet([{}], union_by_name = true)",
357                file_list_sql
358            ),
359            [],
360        )?;
361
362        // Use simple data_0.parquet naming for consolidated files
363        let uuid = Uuid::now_v7();
364        let consolidated_name = format!("data_{}.parquet", uuid);
365        let consolidated_path = partition_dir.join(&consolidated_name);
366        let temp_path = atomic::temp_path(&consolidated_path);
367
368        conn.execute(
369            &format!(
370                "COPY consolidate_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
371                temp_path.display()
372            ),
373            [],
374        )?;
375
376        conn.execute("DROP TABLE consolidate_temp", [])?;
377
378        let bytes_after = fs::metadata(&temp_path)?.len();
379
380        // Remove old files
381        for file in &files {
382            fs::remove_file(file)?;
383        }
384
385        // Rename consolidated file to final location
386        atomic::rename_into_place(&temp_path, &consolidated_path)?;
387
388        Ok(CompactStats {
389            partitions_compacted: 0,
390            sessions_compacted: 1,
391            files_before,
392            files_after: 1,
393            bytes_before,
394            bytes_after,
395        })
396    }
397
    /// Compact files in a partition directory, grouped by session.
    ///
    /// Behavior depends on options:
    /// - `consolidate`: Merge ALL files into single file per session
    /// - `file_threshold`: Compact when > N non-compacted files
    /// - `recompact_threshold`: Re-compact when > N compacted files exist
    ///
    /// Returns aggregate stats; `partitions_compacted` is 1 if anything in
    /// this partition was compacted, else 0.
    ///
    /// NOTE(review): despite "more than N" in the option docs, the
    /// comparisons below use `>=` (compaction triggers AT exactly N
    /// files) — confirm which is intended.
    pub fn compact_partition_with_opts(
        &self,
        partition_dir: &Path,
        opts: &CompactOptions,
    ) -> Result<CompactStats> {
        let mut total_stats = CompactStats::default();

        // Group files by session key, separating compacted from non-compacted.
        // Map value is (non_compacted, compacted) path lists.
        let mut session_files: HashMap<String, (Vec<PathBuf>, Vec<PathBuf>)> = HashMap::new();

        for entry in fs::read_dir(partition_dir)? {
            let entry = entry?;
            let path = entry.path();
            let name = entry.file_name().to_string_lossy().to_string();

            // Skip non-parquet and seed files
            if !name.ends_with(".parquet") || is_seed_file(&name) {
                continue;
            }

            // Extract session from filename
            if let Some(session) = extract_session(&name) {
                // Apply session filter if specified
                if let Some(ref filter) = opts.session_filter {
                    if session != *filter {
                        continue;
                    }
                }

                // NOTE(review): consolidated `data_<uuid>.parquet` files have
                // no `--` separator, so each one groups under its own unique
                // stem key — two such files are never merged together here.
                // Confirm that is intended.
                let entry = session_files.entry(session).or_insert_with(|| (vec![], vec![]));
                if is_compacted_file(&name) || name.starts_with("data_") {
                    entry.1.push(path); // compacted/consolidated
                } else {
                    entry.0.push(path); // non-compacted
                }
            }
        }

        let mut any_compacted = false;
        for (session, (non_compacted, compacted)) in session_files {
            if opts.consolidate {
                // Consolidate mode: merge ALL files into one
                let all_files: Vec<PathBuf> = non_compacted.into_iter().chain(compacted).collect();
                if all_files.len() >= 2 {
                    let stats = self.consolidate_session_files(
                        partition_dir,
                        &session,
                        all_files,
                        opts.dry_run,
                    )?;
                    if stats.sessions_compacted > 0 {
                        any_compacted = true;
                        total_stats.add(&stats);
                    }
                }
            } else {
                // Regular compaction mode

                // First: compact non-compacted files if threshold exceeded
                // (the newest `file_threshold` files are kept intact)
                if non_compacted.len() >= opts.file_threshold {
                    let mut to_process = non_compacted;
                    let stats = self.compact_session_files(
                        partition_dir,
                        &session,
                        &mut to_process,
                        opts.file_threshold,
                        opts.dry_run,
                    )?;
                    if stats.sessions_compacted > 0 {
                        any_compacted = true;
                        total_stats.add(&stats);
                    }
                }

                // Second: re-compact compacted files if recompact threshold exceeded
                // (a threshold of 0 disables re-compaction entirely)
                if opts.recompact_threshold > 0 && compacted.len() >= opts.recompact_threshold {
                    let stats = self.consolidate_session_files(
                        partition_dir,
                        &session,
                        compacted,
                        opts.dry_run,
                    )?;
                    if stats.sessions_compacted > 0 {
                        any_compacted = true;
                        total_stats.add(&stats);
                    }
                }
            }
        }

        if any_compacted {
            total_stats.partitions_compacted = 1;
        }

        Ok(total_stats)
    }
500
501    /// Compact files in a partition directory (legacy API).
502    pub fn compact_partition(
503        &self,
504        partition_dir: &Path,
505        file_threshold: usize,
506        session_filter: Option<&str>,
507        dry_run: bool,
508    ) -> Result<CompactStats> {
509        let opts = CompactOptions {
510            file_threshold,
511            recompact_threshold: 0, // Disable re-compaction for legacy API
512            consolidate: false,
513            dry_run,
514            session_filter: session_filter.map(|s| s.to_string()),
515        };
516        self.compact_partition_with_opts(partition_dir, &opts)
517    }
518
519    /// Compact all partitions in a data type directory (attempts, outcomes, outputs, sessions).
520    pub fn compact_data_type(
521        &self,
522        data_dir: &Path,
523        file_threshold: usize,
524        session_filter: Option<&str>,
525        dry_run: bool,
526    ) -> Result<CompactStats> {
527        let mut total_stats = CompactStats::default();
528
529        if !data_dir.exists() {
530            return Ok(total_stats);
531        }
532
533        // v5 schema: iterate over date= partitions directly (no status= partitions)
534        for entry in fs::read_dir(data_dir)? {
535            let entry = entry?;
536            let path = entry.path();
537
538            if !path.is_dir() {
539                continue;
540            }
541
542            let dir_name = entry.file_name().to_string_lossy().to_string();
543
544            // v5 schema: only date= partitions
545            if dir_name.starts_with("date=") {
546                let stats =
547                    self.compact_partition(&path, file_threshold, session_filter, dry_run)?;
548                total_stats.add(&stats);
549            }
550            // Skip non-date directories
551        }
552
553        Ok(total_stats)
554    }
555
556    /// Compact recent data for a specific session (used by shell hooks).
557    ///
558    /// Checks all date partitions in recent data.
559    pub fn compact_for_session(
560        &self,
561        session_id: &str,
562        file_threshold: usize,
563        dry_run: bool,
564    ) -> Result<CompactStats> {
565        let mut total_stats = CompactStats::default();
566        let recent_dir = self.config().recent_dir();
567
568        // v5 schema: attempts/outcomes instead of invocations with status partitions
569        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
570            let data_dir = recent_dir.join(data_type);
571            let stats =
572                self.compact_data_type(&data_dir, file_threshold, Some(session_id), dry_run)?;
573            total_stats.add(&stats);
574        }
575
576        Ok(total_stats)
577    }
578
579    /// Fast compaction check for today's partition only (used by shell hooks).
580    ///
581    /// This is the most lightweight check - only looks at today's date partition.
582    pub fn compact_session_today(
583        &self,
584        session_id: &str,
585        file_threshold: usize,
586        dry_run: bool,
587    ) -> Result<CompactStats> {
588        let mut total_stats = CompactStats::default();
589        let recent_dir = self.config().recent_dir();
590        let today = Utc::now().date_naive();
591        let date_partition = format!("date={}", today.format("%Y-%m-%d"));
592
593        // v5 schema: attempts/outcomes instead of invocations with status partitions
594        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
595            let partition_dir = recent_dir.join(data_type).join(&date_partition);
596            if partition_dir.exists() {
597                let stats = self.compact_partition(
598                    &partition_dir,
599                    file_threshold,
600                    Some(session_id),
601                    dry_run,
602                )?;
603                total_stats.add(&stats);
604            }
605        }
606
607        Ok(total_stats)
608    }
609
610    /// Compact all recent data that exceeds the file threshold (global mode).
611    pub fn compact_recent(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
612        let mut total_stats = CompactStats::default();
613        let recent_dir = self.config().recent_dir();
614
615        // v5 schema: attempts/outcomes instead of invocations with status partitions
616        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
617            let data_dir = recent_dir.join(data_type);
618            let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
619            total_stats.add(&stats);
620        }
621
622        Ok(total_stats)
623    }
624
625    /// Compact all archive data that exceeds the file threshold.
626    pub fn compact_archive(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
627        let mut total_stats = CompactStats::default();
628        let archive_dir = self.config().archive_dir();
629
630        // v5 schema: attempts/outcomes instead of invocations with status partitions
631        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
632            let data_dir = archive_dir.join(data_type);
633            let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
634            total_stats.add(&stats);
635        }
636
637        Ok(total_stats)
638    }
639
    /// Migrate old data from recent to archive with consolidation.
    ///
    /// Consolidates all files in each date partition into a single parquet file
    /// in the archive, then removes the source files.
    ///
    /// The cutoff is inclusive (`partition date <= today - older_than_days`),
    /// so `older_than_days = 0` archives everything including today. Seed
    /// partitions (`date=1970-01-01`) are never archived, and partitions
    /// that already hold parquet data in the archive are skipped, so the
    /// operation is safe to re-run.
    ///
    /// NOTE(review): `bytes_moved` accumulates SOURCE bytes on dry runs but
    /// post-consolidation bytes on real runs — the two modes report
    /// different quantities; confirm that is intended.
    pub fn archive_old_data(&self, older_than_days: u32, dry_run: bool) -> Result<ArchiveStats> {
        let mut stats = ArchiveStats::default();
        let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);

        let recent_dir = self.config().recent_dir();
        let archive_dir = self.config().archive_dir();

        // v5 schema: attempts/outcomes instead of invocations with status partitions
        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
            let recent_data_dir = recent_dir.join(data_type);
            let archive_data_dir = archive_dir.join(data_type);

            if !recent_data_dir.exists() {
                continue;
            }

            // Collect partitions to archive
            // Use <= so that "older_than_days=0" archives everything including today
            // Skip seed partitions (date=1970-01-01) which contain schema-only files
            let seed_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();

            // v5 schema: all data types have date partitions directly (no status= partitions)
            let date_partition_parent = recent_data_dir.clone();

            if !date_partition_parent.exists() {
                continue;
            }

            let partitions_to_archive: Vec<(NaiveDate, PathBuf, String)> = fs::read_dir(&date_partition_parent)?
                .filter_map(|e| e.ok())
                .filter_map(|e| {
                    let path = e.path();
                    if !path.is_dir() {
                        return None;
                    }
                    let dir_name = e.file_name().to_string_lossy().to_string();
                    let date_str = dir_name.strip_prefix("date=")?;
                    let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d").ok()?;
                    // Skip seed partition
                    if date == seed_date {
                        return None;
                    }
                    if date <= cutoff_date {
                        Some((date, path, dir_name))
                    } else {
                        None
                    }
                })
                .collect();

            for (_date, partition_path, dir_name) in partitions_to_archive {
                let dest_dir = archive_data_dir.join(&dir_name);

                // Count source stats
                let source_files: Vec<PathBuf> = fs::read_dir(&partition_path)?
                    .filter_map(|e| e.ok())
                    .map(|e| e.path())
                    .filter(|p| p.extension().map(|ext| ext == "parquet").unwrap_or(false))
                    .collect();

                if source_files.is_empty() {
                    continue;
                }

                let file_count = source_files.len();
                let bytes_before: u64 = source_files
                    .iter()
                    .filter_map(|p| fs::metadata(p).ok())
                    .map(|m| m.len())
                    .sum();

                if dry_run {
                    // Dry runs report the pre-consolidation source size
                    stats.partitions_archived += 1;
                    stats.files_moved += file_count;
                    stats.bytes_moved += bytes_before;
                    continue;
                }

                // Skip if archive partition already exists with data
                if dest_dir.exists() {
                    let existing_files = fs::read_dir(&dest_dir)?
                        .filter_map(|e| e.ok())
                        .any(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false));
                    if existing_files {
                        // Already archived, skip
                        continue;
                    }
                }

                fs::create_dir_all(&dest_dir)?;

                // Consolidate using DuckDB's COPY
                // Use minimal connection to avoid view setup (which can fail if
                // some data type directories are empty)
                // NOTE(review): the glob is single-quoted into SQL; assumes
                // store paths never contain a quote character.
                let conn = self.connection_with_options(false)?;
                let src_glob = format!("{}/*.parquet", partition_path.display());
                let dest_file = dest_dir.join("data_0.parquet");
                let temp_file = dest_dir.join(".data_0.parquet.tmp");

                conn.execute(
                    &format!(
                        "COPY (SELECT * FROM read_parquet('{}', union_by_name = true)) \
                         TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
                        src_glob,
                        temp_file.display()
                    ),
                    [],
                )?;

                // Atomic rename
                // NOTE(review): plain fs::rename here, unlike the
                // atomic::rename_into_place helper used by compaction —
                // confirm both paths are meant to share the same guarantees.
                fs::rename(&temp_file, &dest_file)?;

                // Get consolidated size
                let bytes_after = fs::metadata(&dest_file)?.len();

                // Remove source files (best-effort: a failure here leaves
                // duplicates in recent, not data loss)
                for file in &source_files {
                    let _ = fs::remove_file(file);
                }
                let _ = fs::remove_dir(&partition_path);

                stats.partitions_archived += 1;
                stats.files_moved += file_count;
                stats.bytes_moved += bytes_after;
            }
        }

        Ok(stats)
    }
773
774    /// Run auto-compaction based on options.
775    pub fn auto_compact(&self, opts: &AutoCompactOptions) -> Result<(CompactStats, ArchiveStats)> {
776        // First, archive old data (skip if doing session-specific compaction)
777        let archive_stats = if opts.compact.session_filter.is_none() {
778            self.archive_old_data(opts.archive_days, opts.compact.dry_run)?
779        } else {
780            ArchiveStats::default()
781        };
782
783        // Then compact based on mode
784        let compact_stats = if let Some(ref session) = opts.compact.session_filter {
785            // Client-specific mode
786            self.compact_for_session_with_opts(session, &opts.compact)?
787        } else {
788            // Global mode: compact both recent and archive
789            let mut stats = self.compact_recent_with_opts(&opts.compact)?;
790            let archive_compact = self.compact_archive_with_opts(&opts.compact)?;
791            stats.add(&archive_compact);
792            stats
793        };
794
795        Ok((compact_stats, archive_stats))
796    }
797
798    /// Compact recent data with full options.
799    pub fn compact_recent_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
800        let mut total_stats = CompactStats::default();
801        let recent_dir = self.config().recent_dir();
802
803        // v5 schema: attempts/outcomes instead of invocations with status partitions
804        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
805            let data_dir = recent_dir.join(data_type);
806            let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
807            total_stats.add(&stats);
808        }
809
810        Ok(total_stats)
811    }
812
813    /// Compact archive data with full options.
814    pub fn compact_archive_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
815        let mut total_stats = CompactStats::default();
816        let archive_dir = self.config().archive_dir();
817
818        // v5 schema: attempts/outcomes instead of invocations with status partitions
819        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
820            let data_dir = archive_dir.join(data_type);
821            let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
822            total_stats.add(&stats);
823        }
824
825        Ok(total_stats)
826    }
827
828    /// Compact data type directory with full options.
829    pub fn compact_data_type_with_opts(
830        &self,
831        data_dir: &Path,
832        opts: &CompactOptions,
833    ) -> Result<CompactStats> {
834        let mut total_stats = CompactStats::default();
835
836        if !data_dir.exists() {
837            return Ok(total_stats);
838        }
839
840        for entry in fs::read_dir(data_dir)? {
841            let entry = entry?;
842            let path = entry.path();
843
844            if !path.is_dir() {
845                continue;
846            }
847
848            let dir_name = entry.file_name().to_string_lossy().to_string();
849
850            // v5 schema: no status= partitions, only date= partitions
851            if dir_name.starts_with("date=") {
852                // This is a date= partition
853                let stats = self.compact_partition_with_opts(&path, opts)?;
854                total_stats.add(&stats);
855            }
856            // Skip non-date directories
857        }
858
859        Ok(total_stats)
860    }
861
862    /// Compact files for a specific session with full options.
863    pub fn compact_for_session_with_opts(
864        &self,
865        session_id: &str,
866        opts: &CompactOptions,
867    ) -> Result<CompactStats> {
868        let mut total_stats = CompactStats::default();
869        let recent_dir = self.config().recent_dir();
870
871        let session_opts = CompactOptions {
872            session_filter: Some(session_id.to_string()),
873            ..opts.clone()
874        };
875
876        // v5 schema: attempts/outcomes instead of invocations with status partitions
877        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
878            let data_dir = recent_dir.join(data_type);
879            let stats = self.compact_data_type_with_opts(&data_dir, &session_opts)?;
880            total_stats.add(&stats);
881        }
882
883        Ok(total_stats)
884    }
885
886    /// Clean operation: recover orphaned invocations and optionally prune archive.
887    ///
888    /// This:
889    /// 1. Scans for pending invocations (attempts without matching outcomes)
890    /// 2. Creates outcome records for orphaned invocations
891    /// 3. Optionally prunes old archive data
892    pub fn clean(&self, opts: &CleanOptions) -> Result<CleanStats> {
893        let mut stats = CleanStats::default();
894
895        // Recover orphaned invocations
896        let recovery_stats =
897            self.recover_orphaned_invocations(opts.max_age_hours, opts.dry_run)?;
898
899        stats.pending_checked = recovery_stats.pending_checked;
900        stats.still_running = recovery_stats.still_running;
901        stats.orphaned = recovery_stats.orphaned;
902
903        // Prune old archive data if requested
904        if opts.prune {
905            let prune_stats = self.prune_archive(opts.older_than_days, opts.dry_run)?;
906            stats.pruned_files = prune_stats.files_pruned;
907            stats.bytes_freed = prune_stats.bytes_freed;
908        }
909
910        Ok(stats)
911    }
912
913    /// Prune old archive data.
914    ///
915    /// Deletes data from the archive tier older than the specified number of days.
916    pub fn prune_archive(&self, older_than_days: u32, dry_run: bool) -> Result<PruneStats> {
917        let mut stats = PruneStats::default();
918        let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);
919        let archive_dir = self.config().archive_dir();
920
921        // v5 schema: attempts/outcomes instead of invocations with status partitions
922        for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
923            let data_dir = archive_dir.join(data_type);
924            if !data_dir.exists() {
925                continue;
926            }
927
928            // Archive has date= partitions directly
929            let partition_stats =
930                self.prune_date_partitions(&data_dir, cutoff_date, dry_run)?;
931            stats.add(&partition_stats);
932        }
933
934        Ok(stats)
935    }
936
937    /// Prune date partitions older than cutoff.
938    fn prune_date_partitions(
939        &self,
940        parent_dir: &Path,
941        cutoff_date: NaiveDate,
942        dry_run: bool,
943    ) -> Result<PruneStats> {
944        let mut stats = PruneStats::default();
945        let seed_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
946
947        for entry in fs::read_dir(parent_dir)?.flatten() {
948            let path = entry.path();
949            if !path.is_dir() {
950                continue;
951            }
952
953            let dir_name = entry.file_name().to_string_lossy().to_string();
954            let date_str = match dir_name.strip_prefix("date=") {
955                Some(s) => s,
956                None => continue,
957            };
958
959            let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
960                Ok(d) => d,
961                Err(_) => continue,
962            };
963
964            // Skip seed partition
965            if date == seed_date {
966                continue;
967            }
968
969            // Skip if not old enough
970            if date > cutoff_date {
971                continue;
972            }
973
974            // Count files and bytes
975            for file_entry in fs::read_dir(&path)?.flatten() {
976                let file_path = file_entry.path();
977                if file_path.extension().map(|e| e == "parquet").unwrap_or(false) {
978                    if let Ok(metadata) = fs::metadata(&file_path) {
979                        stats.files_pruned += 1;
980                        stats.bytes_freed += metadata.len();
981                    }
982
983                    if !dry_run {
984                        let _ = fs::remove_file(&file_path);
985                    }
986                }
987            }
988
989            // Remove the partition directory if empty (and not dry run)
990            if !dry_run {
991                let _ = fs::remove_dir(&path);
992            }
993        }
994
995        Ok(stats)
996    }
997}
998
/// Statistics from a prune operation.
#[derive(Debug, Default)]
pub struct PruneStats {
    /// Number of files pruned.
    pub files_pruned: usize,
    /// Bytes freed.
    pub bytes_freed: u64,
}

impl PruneStats {
    /// Fold another set of prune statistics into this accumulator.
    fn add(&mut self, other: &PruneStats) {
        let PruneStats { files_pruned, bytes_freed } = other;
        self.files_pruned += *files_pruned;
        self.bytes_freed += *bytes_freed;
    }
}
1014
1015#[cfg(test)]
1016mod tests {
1017    use super::*;
1018    use crate::init::initialize;
1019    use crate::schema::InvocationRecord;
1020    use crate::Config;
1021    use tempfile::TempDir;
1022
1023    fn setup_store() -> (TempDir, Store) {
1024        let tmp = TempDir::new().unwrap();
1025        let config = Config::with_root(tmp.path());
1026        initialize(&config).unwrap();
1027        let store = Store::open(config).unwrap();
1028        (tmp, store)
1029    }
1030
1031    fn setup_store_duckdb() -> (TempDir, Store) {
1032        let tmp = TempDir::new().unwrap();
1033        let config = Config::with_duckdb_mode(tmp.path());
1034        initialize(&config).unwrap();
1035        let store = Store::open(config).unwrap();
1036        (tmp, store)
1037    }
1038
1039    #[test]
1040    fn test_extract_session() {
1041        assert_eq!(
1042            extract_session("zsh-1234--make--uuid.parquet"),
1043            Some("zsh-1234".to_string())
1044        );
1045        assert_eq!(
1046            extract_session("session-id.parquet"),
1047            Some("session-id".to_string())
1048        );
1049        assert_eq!(
1050            extract_session("zsh-1234--__compacted-0__--uuid.parquet"),
1051            Some("zsh-1234".to_string())
1052        );
1053    }
1054
1055    #[test]
1056    fn test_is_compacted_file() {
1057        assert!(is_compacted_file("zsh-1234--__compacted-0__--uuid.parquet"));
1058        assert!(is_compacted_file("session--__compacted-5__--abc.parquet"));
1059        assert!(!is_compacted_file("zsh-1234--make--uuid.parquet"));
1060        assert!(!is_compacted_file("session.parquet"));
1061    }
1062
1063    #[test]
1064    fn test_extract_compaction_number() {
1065        assert_eq!(
1066            extract_compaction_number("zsh--__compacted-0__--uuid.parquet"),
1067            Some(0)
1068        );
1069        assert_eq!(
1070            extract_compaction_number("zsh--__compacted-42__--uuid.parquet"),
1071            Some(42)
1072        );
1073        assert_eq!(
1074            extract_compaction_number("zsh--make--uuid.parquet"),
1075            None
1076        );
1077    }
1078
1079    #[test]
1080    fn test_compact_recent_no_files() {
1081        let (_tmp, store) = setup_store();
1082
1083        let stats = store.compact_recent(2, false).unwrap();
1084        assert_eq!(stats.partitions_compacted, 0);
1085    }
1086
1087    #[test]
1088    fn test_compact_recent_with_files() {
1089        let (_tmp, store) = setup_store();
1090
1091        // Write multiple invocations to create multiple files
1092        for i in 0..5 {
1093            let record = InvocationRecord::new(
1094                "test-session",
1095                format!("command-{}", i),
1096                "/home/user",
1097                0,
1098                "test@client",
1099            );
1100            store.write_invocation(&record).unwrap();
1101        }
1102
1103        // With threshold of 2, should compact oldest 3 (keeping 2)
1104        let stats = store.compact_recent(2, false).unwrap();
1105        assert_eq!(stats.sessions_compacted, 1);
1106        assert_eq!(stats.files_before, 3); // 5 - 2 kept = 3 compacted
1107        assert_eq!(stats.files_after, 1);
1108    }
1109
1110    #[test]
1111    fn test_compact_for_session() {
1112        let (_tmp, store) = setup_store();
1113
1114        // Write files for two different sessions
1115        for i in 0..5 {
1116            let record = InvocationRecord::new(
1117                "session-a",
1118                format!("command-{}", i),
1119                "/home/user",
1120                0,
1121                "test@client",
1122            );
1123            store.write_invocation(&record).unwrap();
1124        }
1125        for i in 0..3 {
1126            let record = InvocationRecord::new(
1127                "session-b",
1128                format!("command-{}", i),
1129                "/home/user",
1130                0,
1131                "test@client",
1132            );
1133            store.write_invocation(&record).unwrap();
1134        }
1135
1136        // Compact only session-a with threshold of 2
1137        let stats = store.compact_for_session("session-a", 2, false).unwrap();
1138        assert_eq!(stats.sessions_compacted, 1);
1139        assert_eq!(stats.files_before, 3); // 5 - 2 kept = 3
1140
1141        // session-b should be untouched (only 3 files, below threshold)
1142        let date = chrono::Utc::now().date_naive();
1143        let inv_dir = store.config().attempts_dir(&date);
1144        let session_b_count = std::fs::read_dir(&inv_dir)
1145            .unwrap()
1146            .filter_map(|e| e.ok())
1147            .filter(|e| e.file_name().to_string_lossy().starts_with("session-b"))
1148            .count();
1149        assert_eq!(session_b_count, 3);
1150    }
1151
1152    #[test]
1153    fn test_compact_dry_run() {
1154        let (_tmp, store) = setup_store();
1155
1156        // Write multiple invocations
1157        for i in 0..5 {
1158            let record = InvocationRecord::new(
1159                "test-session",
1160                format!("command-{}", i),
1161                "/home/user",
1162                0,
1163                "test@client",
1164            );
1165            store.write_invocation(&record).unwrap();
1166        }
1167
1168        // Dry run should report stats but not actually compact
1169        let stats = store.compact_recent(2, true).unwrap();
1170        assert_eq!(stats.sessions_compacted, 1);
1171        assert_eq!(stats.files_before, 3);
1172
1173        // Files should still be there
1174        let date = chrono::Utc::now().date_naive();
1175        let inv_dir = store.config().attempts_dir(&date);
1176        let file_count = std::fs::read_dir(&inv_dir)
1177            .unwrap()
1178            .filter_map(|e| e.ok())
1179            .filter(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false))
1180            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1181            .count();
1182        assert_eq!(file_count, 5);
1183    }
1184
1185    #[test]
1186    fn test_compacted_file_naming() {
1187        let (_tmp, store) = setup_store();
1188
1189        // Write enough files to trigger compaction
1190        for i in 0..5 {
1191            let record = InvocationRecord::new(
1192                "zsh-9999",
1193                format!("cmd-{}", i),
1194                "/home/user",
1195                0,
1196                "test@client",
1197            );
1198            store.write_invocation(&record).unwrap();
1199        }
1200
1201        // Compact with threshold of 2
1202        store.compact_recent(2, false).unwrap();
1203
1204        // Check that compacted file has correct naming
1205        let date = chrono::Utc::now().date_naive();
1206        let inv_dir = store.config().attempts_dir(&date);
1207        let compacted_files: Vec<_> = std::fs::read_dir(&inv_dir)
1208            .unwrap()
1209            .filter_map(|e| e.ok())
1210            .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
1211            .collect();
1212
1213        assert_eq!(compacted_files.len(), 1);
1214        let name = compacted_files[0].file_name().to_string_lossy().to_string();
1215        assert!(name.starts_with("zsh-9999--__compacted-0__--"));
1216        assert!(name.ends_with(".parquet"));
1217    }
1218
1219    // ===== DuckDB Mode Tests =====
1220    // In DuckDB mode, compact should be a no-op since there are no parquet files.
1221
1222    #[test]
1223    fn test_compact_duckdb_mode_no_op() {
1224        let (_tmp, store) = setup_store_duckdb();
1225
1226        // Write data in DuckDB mode (goes to local.invocations table, not parquet files)
1227        for i in 0..10 {
1228            let record = InvocationRecord::new(
1229                "test-session",
1230                format!("command-{}", i),
1231                "/home/user",
1232                0,
1233                "test@client",
1234            );
1235            store.write_invocation(&record).unwrap();
1236        }
1237
1238        // Verify data is stored (not in parquet files)
1239        let conn = store.connection().unwrap();
1240        let count: i64 = conn
1241            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1242            .unwrap();
1243        assert_eq!(count, 10, "Data should be in DuckDB table");
1244
1245        // Compact should be a no-op (no parquet files to compact)
1246        let stats = store.compact_recent(2, false).unwrap();
1247        assert_eq!(stats.partitions_compacted, 0);
1248        assert_eq!(stats.sessions_compacted, 0);
1249        assert_eq!(stats.files_before, 0);
1250        assert_eq!(stats.files_after, 0);
1251
1252        // Data should still be there
1253        let count: i64 = conn
1254            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1255            .unwrap();
1256        assert_eq!(count, 10, "Data should be unaffected");
1257    }
1258
1259    #[test]
1260    fn test_compact_for_session_duckdb_mode_no_op() {
1261        let (_tmp, store) = setup_store_duckdb();
1262
1263        // Write data in DuckDB mode
1264        for i in 0..5 {
1265            let record = InvocationRecord::new(
1266                "session-a",
1267                format!("command-{}", i),
1268                "/home/user",
1269                0,
1270                "test@client",
1271            );
1272            store.write_invocation(&record).unwrap();
1273        }
1274
1275        // Session-specific compact should also be a no-op
1276        let stats = store.compact_for_session("session-a", 2, false).unwrap();
1277        assert_eq!(stats.partitions_compacted, 0);
1278        assert_eq!(stats.sessions_compacted, 0);
1279    }
1280
1281    #[test]
1282    fn test_compact_session_today_duckdb_mode_no_op() {
1283        let (_tmp, store) = setup_store_duckdb();
1284
1285        // Write data in DuckDB mode
1286        let record = InvocationRecord::new(
1287            "test-session",
1288            "echo hello",
1289            "/home/user",
1290            0,
1291            "test@client",
1292        );
1293        store.write_invocation(&record).unwrap();
1294
1295        // Today's session compact should be a no-op
1296        let stats = store.compact_session_today("test-session", 2, false).unwrap();
1297        assert_eq!(stats.partitions_compacted, 0);
1298        assert_eq!(stats.sessions_compacted, 0);
1299    }
1300
1301    #[test]
1302    fn test_auto_compact_duckdb_mode_no_op() {
1303        let (_tmp, store) = setup_store_duckdb();
1304
1305        // Write data
1306        for i in 0..5 {
1307            let record = InvocationRecord::new(
1308                "test-session",
1309                format!("command-{}", i),
1310                "/home/user",
1311                0,
1312                "test@client",
1313            );
1314            store.write_invocation(&record).unwrap();
1315        }
1316
1317        // Auto-compact should be a no-op
1318        let opts = AutoCompactOptions::default();
1319        let (compact_stats, archive_stats) = store.auto_compact(&opts).unwrap();
1320
1321        assert_eq!(compact_stats.partitions_compacted, 0);
1322        assert_eq!(compact_stats.sessions_compacted, 0);
1323        assert_eq!(archive_stats.partitions_archived, 0);
1324    }
1325
1326    #[test]
1327    fn test_archive_old_data_duckdb_mode_no_op() {
1328        let (_tmp, store) = setup_store_duckdb();
1329
1330        // Write data
1331        let record = InvocationRecord::new(
1332            "test-session",
1333            "echo hello",
1334            "/home/user",
1335            0,
1336            "test@client",
1337        );
1338        store.write_invocation(&record).unwrap();
1339
1340        // Archive should be a no-op (no parquet partitions to archive)
1341        let stats = store.archive_old_data(0, false).unwrap(); // 0 days = archive everything
1342        assert_eq!(stats.partitions_archived, 0);
1343        assert_eq!(stats.files_moved, 0);
1344    }
1345
1346    // ===== Archive Tests (Parquet Mode) =====
1347
1348    #[test]
1349    fn test_archive_old_data_moves_partitions() {
1350        let (_tmp, store) = setup_store();
1351
1352        // Write multiple invocations
1353        for i in 0..3 {
1354            let record = InvocationRecord::new(
1355                "test-session",
1356                format!("command-{}", i),
1357                "/home/user",
1358                0,
1359                "test@client",
1360            );
1361            store.write_invocation(&record).unwrap();
1362        }
1363
1364        // Verify files exist in recent
1365        let date = chrono::Utc::now().date_naive();
1366        let recent_dir = store.config().attempts_dir(&date);
1367        let recent_count = std::fs::read_dir(&recent_dir)
1368            .unwrap()
1369            .filter_map(|e| e.ok())
1370            .filter(|e| {
1371                e.path()
1372                    .extension()
1373                    .map(|ext| ext == "parquet")
1374                    .unwrap_or(false)
1375            })
1376            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1377            .count();
1378        assert_eq!(recent_count, 3, "Should have 3 files in recent");
1379
1380        // Archive with 0 days (archive everything)
1381        let stats = store.archive_old_data(0, false).unwrap();
1382
1383        // Archives 5 data types: attempts, outcomes, outputs, sessions, events
1384        // Only attempts has data (outcomes too but may be empty in test), but partitions exist
1385        assert!(stats.partitions_archived >= 1, "Should archive at least 1 partition");
1386        assert!(stats.files_moved > 0, "Should move files");
1387
1388        // Verify files moved to archive
1389        let archive_dir = store
1390            .config()
1391            .archive_dir()
1392            .join("attempts")
1393            .join(format!("date={}", date));
1394        assert!(archive_dir.exists(), "Archive partition should exist");
1395
1396        // Archive consolidates to single file
1397        let archive_files: Vec<_> = std::fs::read_dir(&archive_dir)
1398            .unwrap()
1399            .filter_map(|e| e.ok())
1400            .filter(|e| {
1401                e.path()
1402                    .extension()
1403                    .map(|ext| ext == "parquet")
1404                    .unwrap_or(false)
1405            })
1406            .collect();
1407        assert_eq!(archive_files.len(), 1, "Archive should have 1 consolidated file");
1408
1409        // Recent partition should be removed or empty (only seed files remain)
1410        let remaining = std::fs::read_dir(&recent_dir)
1411            .map(|entries| {
1412                entries
1413                    .filter_map(|e| e.ok())
1414                    .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1415                    .count()
1416            })
1417            .unwrap_or(0);
1418        assert_eq!(remaining, 0, "Recent partition should have no data files");
1419    }
1420
1421    #[test]
1422    fn test_archive_dry_run() {
1423        let (_tmp, store) = setup_store();
1424
1425        // Write data
1426        for i in 0..3 {
1427            let record = InvocationRecord::new(
1428                "test-session",
1429                format!("command-{}", i),
1430                "/home/user",
1431                0,
1432                "test@client",
1433            );
1434            store.write_invocation(&record).unwrap();
1435        }
1436
1437        // Dry run should report stats but not move files
1438        let stats = store.archive_old_data(0, true).unwrap();
1439        // Archives 4 data types, but counts partitions with files
1440        assert!(stats.partitions_archived >= 1, "Should report at least 1 partition");
1441        assert!(stats.files_moved > 0, "Should report files to move");
1442
1443        // Files should still be in recent
1444        let date = chrono::Utc::now().date_naive();
1445        let recent_dir = store.config().attempts_dir(&date);
1446        let recent_count = std::fs::read_dir(&recent_dir)
1447            .unwrap()
1448            .filter_map(|e| e.ok())
1449            .filter(|e| {
1450                e.path()
1451                    .extension()
1452                    .map(|ext| ext == "parquet")
1453                    .unwrap_or(false)
1454            })
1455            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1456            .count();
1457        assert_eq!(recent_count, 3, "Files should still be in recent after dry run");
1458    }
1459
1460    #[test]
1461    fn test_archive_respects_age_threshold() {
1462        let (_tmp, store) = setup_store();
1463
1464        // Write data today
1465        let record = InvocationRecord::new(
1466            "test-session",
1467            "echo hello",
1468            "/home/user",
1469            0,
1470            "test@client",
1471        );
1472        store.write_invocation(&record).unwrap();
1473
1474        // Archive with 7 days threshold - today's data should NOT be archived
1475        let stats = store.archive_old_data(7, false).unwrap();
1476        assert_eq!(stats.partitions_archived, 0, "Today's data should not be archived with 7 day threshold");
1477
1478        // Verify data is still queryable (via filesystem check, not view)
1479        let date = chrono::Utc::now().date_naive();
1480        let recent_dir = store.config().attempts_dir(&date);
1481        let file_count = std::fs::read_dir(&recent_dir)
1482            .unwrap()
1483            .filter_map(|e| e.ok())
1484            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1485            .count();
1486        assert_eq!(file_count, 1, "Data file should still exist");
1487    }
1488
1489    // ===== Consolidation Tests =====
1490
1491    #[test]
1492    fn test_consolidate_merges_all_files() {
1493        let (_tmp, store) = setup_store();
1494
1495        // Write multiple files
1496        for i in 0..5 {
1497            let record = InvocationRecord::new(
1498                "test-session",
1499                format!("command-{}", i),
1500                "/home/user",
1501                0,
1502                "test@client",
1503            );
1504            store.write_invocation(&record).unwrap();
1505        }
1506
1507        // Compact first to create a compacted file
1508        store.compact_recent(2, false).unwrap();
1509
1510        // Now consolidate everything
1511        let opts = CompactOptions {
1512            consolidate: true,
1513            ..Default::default()
1514        };
1515        let stats = store.compact_recent_with_opts(&opts).unwrap();
1516
1517        // Should consolidate all files (compacted + remaining) into one
1518        assert!(stats.sessions_compacted > 0, "Should consolidate session files");
1519
1520        // Verify single file remains
1521        let date = chrono::Utc::now().date_naive();
1522        let inv_dir = store.config().attempts_dir(&date);
1523        let file_count = std::fs::read_dir(&inv_dir)
1524            .unwrap()
1525            .filter_map(|e| e.ok())
1526            .filter(|e| {
1527                e.path()
1528                    .extension()
1529                    .map(|ext| ext == "parquet")
1530                    .unwrap_or(false)
1531            })
1532            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1533            .count();
1534        assert_eq!(file_count, 1, "Should have single consolidated file");
1535    }
1536
1537    #[test]
1538    fn test_recompact_threshold() {
1539        let (_tmp, store) = setup_store();
1540
1541        // Create many compacted files by doing multiple compact cycles
1542        // First, write and compact several batches
1543        for batch in 0..3 {
1544            for i in 0..5 {
1545                let record = InvocationRecord::new(
1546                    "test-session",
1547                    format!("batch-{}-cmd-{}", batch, i),
1548                    "/home/user",
1549                    0,
1550                    "test@client",
1551                );
1552                store.write_invocation(&record).unwrap();
1553            }
1554            // Compact after each batch (creates compacted files)
1555            store.compact_recent(2, false).unwrap();
1556        }
1557
1558        // Count compacted files
1559        let date = chrono::Utc::now().date_naive();
1560        let inv_dir = store.config().attempts_dir(&date);
1561        let compacted_count = std::fs::read_dir(&inv_dir)
1562            .unwrap()
1563            .filter_map(|e| e.ok())
1564            .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
1565            .count();
1566
1567        // With recompact_threshold=2, should trigger re-compaction
1568        let opts = CompactOptions {
1569            file_threshold: 50, // High threshold so we don't compact non-compacted
1570            recompact_threshold: 2,
1571            ..Default::default()
1572        };
1573        let stats = store.compact_recent_with_opts(&opts).unwrap();
1574
1575        // If we had enough compacted files, should have re-compacted
1576        if compacted_count >= 2 {
1577            assert!(
1578                stats.sessions_compacted > 0 || stats.files_before > 0,
1579                "Should trigger re-compaction when threshold exceeded"
1580            );
1581        }
1582    }
1583
1584    #[test]
1585    fn test_auto_compact_parquet_mode() {
1586        let (_tmp, store) = setup_store();
1587
1588        // Write enough files to trigger compaction
1589        for i in 0..10 {
1590            let record = InvocationRecord::new(
1591                "test-session",
1592                format!("command-{}", i),
1593                "/home/user",
1594                0,
1595                "test@client",
1596            );
1597            store.write_invocation(&record).unwrap();
1598        }
1599
1600        // Verify files before compact
1601        let date = chrono::Utc::now().date_naive();
1602        let inv_dir = store.config().attempts_dir(&date);
1603        let files_before = std::fs::read_dir(&inv_dir)
1604            .unwrap()
1605            .filter_map(|e| e.ok())
1606            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1607            .count();
1608        assert_eq!(files_before, 10, "Should have 10 files before compact");
1609
1610        // Run auto_compact with low threshold
1611        let opts = AutoCompactOptions {
1612            compact: CompactOptions {
1613                file_threshold: 3,
1614                ..Default::default()
1615            },
1616            archive_days: 14, // Don't archive today's data
1617        };
1618        let (compact_stats, archive_stats) = store.auto_compact(&opts).unwrap();
1619
1620        // Should compact but not archive (data is from today)
1621        assert!(compact_stats.sessions_compacted > 0, "Should compact files");
1622        assert_eq!(archive_stats.partitions_archived, 0, "Should not archive today's data");
1623
1624        // Verify files were compacted (fewer files now)
1625        let files_after = std::fs::read_dir(&inv_dir)
1626            .unwrap()
1627            .filter_map(|e| e.ok())
1628            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1629            .count();
1630        assert!(files_after < files_before, "Should have fewer files after compact");
1631    }
1632
1633    #[test]
1634    fn test_compact_preserves_data_integrity() {
1635        let (_tmp, store) = setup_store();
1636
1637        // Write known data
1638        let commands: Vec<String> = (0..10).map(|i| format!("command-{}", i)).collect();
1639        for cmd in &commands {
1640            let record = InvocationRecord::new(
1641                "test-session",
1642                cmd.clone(),
1643                "/home/user",
1644                0,
1645                "test@client",
1646            );
1647            store.write_invocation(&record).unwrap();
1648        }
1649
1650        // Verify data before compact
1651        let conn = store.connection().unwrap();
1652        let count_before: i64 = conn
1653            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1654            .unwrap();
1655        assert_eq!(count_before, 10);
1656
1657        // Compact
1658        store.compact_recent(2, false).unwrap();
1659
1660        // Verify data after compact - count should be same
1661        let count_after: i64 = conn
1662            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1663            .unwrap();
1664        assert_eq!(count_after, 10, "Compaction should preserve all records");
1665
1666        // Verify all commands are present
1667        let mut found_cmds: Vec<String> = conn
1668            .prepare("SELECT cmd FROM local.invocations ORDER BY cmd")
1669            .unwrap()
1670            .query_map([], |r| r.get(0))
1671            .unwrap()
1672            .filter_map(|r| r.ok())
1673            .collect();
1674        found_cmds.sort();
1675        let mut expected = commands.clone();
1676        expected.sort();
1677        assert_eq!(found_cmds, expected, "All commands should be preserved");
1678    }
1679
1680    // ===== Clean/Prune Tests =====
1681
1682    #[test]
1683    fn test_clean_recovers_orphaned() {
1684        let (_tmp, store) = setup_store();
1685
1686        // Create a pending invocation with a dead PID
1687        // v5: Just write the attempt - no pending file needed
1688        let record = InvocationRecord::new_pending_local(
1689            "test-session",
1690            "crashed-command",
1691            "/home/user",
1692            999999999, // PID that doesn't exist
1693            "test@client",
1694        );
1695
1696        // v5: write_invocation writes only an attempt (no outcome) for pending records
1697        store.write_invocation(&record).unwrap();
1698
1699        // Run clean - should find the pending attempt via the invocations view
1700        let opts = CleanOptions::default();
1701        let stats = store.clean(&opts).unwrap();
1702
1703        assert_eq!(stats.pending_checked, 1);
1704        assert_eq!(stats.orphaned, 1);
1705        assert_eq!(stats.still_running, 0);
1706    }
1707
1708    #[test]
1709    fn test_clean_dry_run() {
1710        let (_tmp, store) = setup_store();
1711
1712        // Create a pending invocation with a dead PID
1713        let record = InvocationRecord::new_pending_local(
1714            "test-session",
1715            "crashed-command",
1716            "/home/user",
1717            999999999, // PID that doesn't exist
1718            "test@client",
1719        );
1720
1721        // v5: write_invocation writes only an attempt (no outcome) for pending records
1722        store.write_invocation(&record).unwrap();
1723
1724        // Run clean in dry-run mode
1725        let opts = CleanOptions {
1726            dry_run: true,
1727            ..Default::default()
1728        };
1729        let stats = store.clean(&opts).unwrap();
1730
1731        // Stats should report the orphan
1732        assert_eq!(stats.pending_checked, 1);
1733        assert_eq!(stats.orphaned, 1);
1734
1735        // v5: Verify the attempt still has no outcome (dry-run didn't create one)
1736        let pending = store.get_pending_attempts().unwrap();
1737        assert_eq!(pending.len(), 1, "Attempt should still be pending after dry run");
1738    }
1739
1740    #[test]
1741    fn test_prune_archive() {
1742        let (tmp, store) = setup_store();
1743
1744        // Write some data
1745        for i in 0..3 {
1746            let record = InvocationRecord::new(
1747                "test-session",
1748                format!("command-{}", i),
1749                "/home/user",
1750                0,
1751                "test@client",
1752            );
1753            store.write_invocation(&record).unwrap();
1754        }
1755
1756        // Archive everything (0 days = archive all)
1757        let archive_stats = store.archive_old_data(0, false).unwrap();
1758        assert!(archive_stats.partitions_archived > 0, "Should archive data");
1759
1760        // Verify archive exists
1761        let archive_attempts = tmp.path().join("db/data/archive/attempts");
1762        assert!(archive_attempts.exists(), "Archive should exist");
1763
1764        // Prune with 0 days (prune everything)
1765        let prune_stats = store.prune_archive(0, false).unwrap();
1766        assert!(prune_stats.files_pruned > 0, "Should prune files");
1767        assert!(prune_stats.bytes_freed > 0, "Should free bytes");
1768    }
1769
1770    #[test]
1771    fn test_clean_with_prune() {
1772        let (_tmp, store) = setup_store();
1773
1774        // Write and archive some completed invocations first
1775        for i in 0..3 {
1776            let record = InvocationRecord::new(
1777                "another-session",
1778                format!("command-{}", i),
1779                "/home/user",
1780                0,
1781                "test@client",
1782            );
1783            store.write_invocation(&record).unwrap();
1784        }
1785        store.archive_old_data(0, false).unwrap();
1786
1787        // Now create orphaned pending invocation (after archiving, so it stays in recent)
1788        // v5: just write the attempt, no pending file needed
1789        let record = InvocationRecord::new_pending_local(
1790            "test-session",
1791            "crashed-command",
1792            "/home/user",
1793            999999999,
1794            "test@client",
1795        );
1796        store.write_invocation(&record).unwrap();
1797
1798        // Run clean with prune
1799        let opts = CleanOptions {
1800            prune: true,
1801            older_than_days: 0, // Prune everything
1802            ..Default::default()
1803        };
1804        let stats = store.clean(&opts).unwrap();
1805
1806        // Should both recover orphaned and prune archive
1807        assert_eq!(stats.orphaned, 1, "Should recover 1 orphaned");
1808        assert!(stats.pruned_files > 0, "Should prune archive files");
1809    }
1810}