Skip to main content

magic_bird/store/
compact.rs

1//! Compaction and archival operations.
2//!
3//! Supports two modes:
4//! - **Client-specific**: Compact files for a single session/client (used by shell hooks)
5//! - **Global**: Compact files across all sessions (used for scheduled maintenance)
6
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10
11use chrono::{NaiveDate, Utc};
12use uuid::Uuid;
13
14use super::atomic;
15use super::Store;
16use crate::Result;
17
/// Returns `true` when `name` is a compacted file, i.e. its name carries the
/// `__compacted-N__` marker produced by [`Store::compact_session_files`].
fn is_compacted_file(name: &str) -> bool {
    const COMPACTED_MARKER: &str = "__compacted-";
    name.contains(COMPACTED_MARKER)
}
22
/// Returns `true` when `name` is a seed file (a schema-only placeholder whose
/// name begins with `_seed`).
fn is_seed_file(name: &str) -> bool {
    const SEED_PREFIX: &str = "_seed";
    name.starts_with(SEED_PREFIX)
}
27
/// Extract the session/group key from a filename.
///
/// For files like `session--cmd--uuid.parquet`, returns `session`.
/// For files like `session_id.parquet` (no `--` separator), returns the whole
/// stem. For compacted files like `session--__compacted-0__--uuid.parquet`,
/// returns `session`. Returns `None` for names without a `.parquet` suffix.
fn extract_session(name: &str) -> Option<String> {
    let stem = name.strip_suffix(".parquet")?;

    // The group key is everything before the first "--" separator; a stem
    // without one (e.g. plain session files) is its own key.
    let key = match stem.find("--") {
        Some(idx) => &stem[..idx],
        None => stem,
    };
    Some(key.to_owned())
}
44
/// Extract the compaction sequence number from a filename, if present.
///
/// Looks for the `__compacted-N__` marker and parses `N`. Returns `None` when
/// the marker is absent, unterminated (no closing `__`), or `N` is not a
/// valid `u32`.
fn extract_compaction_number(name: &str) -> Option<u32> {
    const MARKER: &str = "__compacted-";
    // Derive the skip length from the marker itself instead of a magic `12`,
    // so the two can never drift apart.
    let after_prefix = &name[name.find(MARKER)? + MARKER.len()..];
    let end = after_prefix.find("__")?;
    after_prefix[..end].parse().ok()
}
58
59/// Find the next compaction sequence number for a session in a partition.
60fn next_compaction_number(partition_dir: &Path, session: &str) -> u32 {
61    fs::read_dir(partition_dir)
62        .ok()
63        .map(|entries| {
64            entries
65                .filter_map(|e| e.ok())
66                .filter_map(|e| {
67                    let name = e.file_name().to_string_lossy().to_string();
68                    // Only consider compacted files for this session
69                    if name.starts_with(&format!("{}--__compacted-", session)) {
70                        extract_compaction_number(&name)
71                    } else {
72                        None
73                    }
74                })
75                .max()
76                .map(|n| n + 1)
77                .unwrap_or(0)
78        })
79        .unwrap_or(0)
80}
81
/// Best-effort modification time of `path` in seconds since the Unix epoch.
///
/// Any failure (missing file, unsupported platform, pre-epoch timestamp)
/// collapses to 0 so callers can use the value directly as a sort key.
fn file_mtime(path: &Path) -> u64 {
    let modified = fs::metadata(path).and_then(|m| m.modified());
    match modified {
        Ok(t) => t
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs(),
        Err(_) => 0,
    }
}
89
/// Statistics from a compaction operation.
#[derive(Debug, Default)]
pub struct CompactStats {
    /// Partition directories in which at least one compaction ran.
    pub partitions_compacted: usize,
    /// Session groups that were compacted.
    pub sessions_compacted: usize,
    /// Total input files consumed.
    pub files_before: usize,
    /// Total output files produced.
    pub files_after: usize,
    /// Combined size in bytes of the input files.
    pub bytes_before: u64,
    /// Combined size in bytes of the output files.
    pub bytes_after: u64,
}

impl CompactStats {
    /// Fold another set of statistics into this one, field by field.
    pub fn add(&mut self, other: &CompactStats) {
        let CompactStats {
            partitions_compacted,
            sessions_compacted,
            files_before,
            files_after,
            bytes_before,
            bytes_after,
        } = other;
        self.partitions_compacted += partitions_compacted;
        self.sessions_compacted += sessions_compacted;
        self.files_before += files_before;
        self.files_after += files_after;
        self.bytes_before += bytes_before;
        self.bytes_after += bytes_after;
    }
}
111
/// Statistics from an archive operation (see [`Store::archive_old_data`]).
#[derive(Debug, Default)]
pub struct ArchiveStats {
    // Number of date partitions migrated from recent to archive.
    pub partitions_archived: usize,
    // Number of source parquet files consumed by archival.
    pub files_moved: usize,
    // Bytes moved. NOTE(review): dry runs accumulate the pre-consolidation
    // source size while real runs accumulate the consolidated output size —
    // the two figures are not directly comparable; confirm intended.
    pub bytes_moved: u64,
}
119
/// Options for compaction operations.
#[derive(Debug, Clone)]
pub struct CompactOptions {
    /// Compact a session once it has at least this many non-compacted files.
    pub file_threshold: usize,
    /// Re-compact a session once it has at least this many compacted files.
    /// Set to 0 to disable re-compaction.
    pub recompact_threshold: usize,
    /// If true, consolidate ALL files (including compacted) into a single file.
    pub consolidate: bool,
    /// If true, report what would happen without touching any files.
    pub dry_run: bool,
    /// If set, only compact files for this specific session.
    pub session_filter: Option<String>,
}

impl Default for CompactOptions {
    fn default() -> Self {
        CompactOptions {
            file_threshold: 50,
            recompact_threshold: 10,
            consolidate: false,
            dry_run: false,
            session_filter: None,
        }
    }
}
147
148/// Options for auto-compaction (includes archive settings).
149#[derive(Debug)]
150pub struct AutoCompactOptions {
151    /// Compact options.
152    pub compact: CompactOptions,
153    /// Migrate data older than this many days to archive.
154    pub archive_days: u32,
155}
156
157impl Default for AutoCompactOptions {
158    fn default() -> Self {
159        Self {
160            compact: CompactOptions::default(),
161            archive_days: 14,
162        }
163    }
164}
165
/// Options for clean/prune operations.
#[derive(Debug, Clone)]
pub struct CleanOptions {
    /// If true, report what would happen without touching any files.
    pub dry_run: bool,
    /// Maximum age in hours for pending files before they're considered stale.
    pub max_age_hours: u32,
    /// If true, also prune old archive data.
    pub prune: bool,
    /// Delete archive data older than this many days (when `prune` is true).
    pub older_than_days: u32,
}

impl Default for CleanOptions {
    fn default() -> Self {
        CleanOptions {
            dry_run: false,
            // One day before a pending invocation counts as stale.
            max_age_hours: 24,
            prune: false,
            // Keep a year of archive data by default.
            older_than_days: 365,
        }
    }
}
189
/// Statistics from a clean operation (see [`Store::clean`]).
#[derive(Debug, Default)]
pub struct CleanStats {
    /// Number of pending files checked.
    pub pending_checked: usize,
    /// Number of invocations still running.
    pub still_running: usize,
    /// Number of invocations marked as orphaned.
    pub orphaned: usize,
    /// Number of archive files pruned.
    pub pruned_files: usize,
    /// Bytes freed from pruning.
    pub bytes_freed: u64,
}
204
205impl Store {
206    /// Compact files for a specific session in a partition.
207    ///
208    /// Keeps the most recent `keep_count` files, compacts the rest.
209    /// Uses naming: `<session>--__compacted-N__--<uuid>.parquet`
210    fn compact_session_files(
211        &self,
212        partition_dir: &Path,
213        session: &str,
214        files: &mut Vec<PathBuf>,
215        keep_count: usize,
216        dry_run: bool,
217    ) -> Result<CompactStats> {
218        // Sort by modification time (oldest first)
219        files.sort_by_key(|p| file_mtime(p));
220
221        // Keep the most recent `keep_count` files
222        let to_keep = files.len().saturating_sub(keep_count).max(0);
223        if to_keep < 2 {
224            // Need at least 2 files to compact
225            return Ok(CompactStats::default());
226        }
227
228        let files_to_compact: Vec<PathBuf> = files.drain(..to_keep).collect();
229        let files_before = files_to_compact.len();
230        let bytes_before: u64 = files_to_compact
231            .iter()
232            .filter_map(|p| fs::metadata(p).ok())
233            .map(|m| m.len())
234            .sum();
235
236        if dry_run {
237            return Ok(CompactStats {
238                partitions_compacted: 0, // Will be set by caller
239                sessions_compacted: 1,
240                files_before,
241                files_after: 1,
242                bytes_before,
243                bytes_after: bytes_before, // Estimate
244            });
245        }
246
247        // Use minimal connection to avoid view setup overhead
248        let conn = self.connection_with_options(false)?;
249
250        // Build list of files to read
251        let file_list: Vec<String> = files_to_compact
252            .iter()
253            .map(|p| p.display().to_string())
254            .collect();
255        let file_list_sql = file_list
256            .iter()
257            .map(|f| format!("'{}'", f))
258            .collect::<Vec<_>>()
259            .join(", ");
260
261        // Create temp table with data from selected files
262        conn.execute(
263            &format!(
264                "CREATE OR REPLACE TEMP TABLE compact_temp AS
265                 SELECT * FROM read_parquet([{}], union_by_name = true)",
266                file_list_sql
267            ),
268            [],
269        )?;
270
271        // Generate compacted filename: <session>--__compacted-N__--<uuid>.parquet
272        let seq_num = next_compaction_number(partition_dir, session);
273        let uuid = Uuid::now_v7();
274        let compacted_name = format!("{}--__compacted-{}__--{}.parquet", session, seq_num, uuid);
275        let compacted_path = partition_dir.join(&compacted_name);
276        let temp_path = atomic::temp_path(&compacted_path);
277
278        conn.execute(
279            &format!(
280                "COPY compact_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
281                temp_path.display()
282            ),
283            [],
284        )?;
285
286        conn.execute("DROP TABLE compact_temp", [])?;
287
288        // Get size of new file
289        let bytes_after = fs::metadata(&temp_path)?.len();
290
291        // Remove old files
292        for file in &files_to_compact {
293            fs::remove_file(file)?;
294        }
295
296        // Rename compacted file to final location
297        atomic::rename_into_place(&temp_path, &compacted_path)?;
298
299        Ok(CompactStats {
300            partitions_compacted: 0, // Will be set by caller
301            sessions_compacted: 1,
302            files_before,
303            files_after: 1,
304            bytes_before,
305            bytes_after,
306        })
307    }
308
    /// Consolidate ALL files for a session into a single file.
    ///
    /// Unlike regular compaction, this merges everything — including
    /// previously compacted files — into a single `data_<uuid>.parquet` file
    /// (a UUIDv7 suffix keeps repeated consolidations from colliding).
    ///
    /// Returns zeroed stats when there are fewer than two files.
    /// `partitions_compacted` is always 0 here; the caller fills it in.
    fn consolidate_session_files(
        &self,
        partition_dir: &Path,
        _session: &str,
        files: Vec<PathBuf>,
        dry_run: bool,
    ) -> Result<CompactStats> {
        // Nothing is gained by "merging" zero or one file.
        if files.len() < 2 {
            return Ok(CompactStats::default());
        }

        let files_before = files.len();
        // Unreadable files are skipped in the size tally rather than failing.
        let bytes_before: u64 = files
            .iter()
            .filter_map(|p| fs::metadata(p).ok())
            .map(|m| m.len())
            .sum();

        if dry_run {
            // Report what would happen; bytes_after is a pre-compression guess.
            return Ok(CompactStats {
                partitions_compacted: 0,
                sessions_compacted: 1,
                files_before,
                files_after: 1,
                bytes_before,
                bytes_after: bytes_before,
            });
        }

        // Use minimal connection to avoid view setup overhead
        let conn = self.connection_with_options(false)?;

        // Build list of files to read.
        // NOTE(review): paths are single-quoted into SQL verbatim; a path
        // containing `'` would break the statement — confirm inputs are safe.
        let file_list_sql = files
            .iter()
            .map(|p| format!("'{}'", p.display()))
            .collect::<Vec<_>>()
            .join(", ");

        // Create temp table with data from all files; union_by_name tolerates
        // schema drift between older and newer parquet files.
        conn.execute(
            &format!(
                "CREATE OR REPLACE TEMP TABLE consolidate_temp AS
                 SELECT * FROM read_parquet([{}], union_by_name = true)",
                file_list_sql
            ),
            [],
        )?;

        // Use simple data_<uuid>.parquet naming for consolidated files
        let uuid = Uuid::now_v7();
        let consolidated_name = format!("data_{}.parquet", uuid);
        let consolidated_path = partition_dir.join(&consolidated_name);
        let temp_path = atomic::temp_path(&consolidated_path);

        // Write to a temp path first so readers never see a partial file.
        conn.execute(
            &format!(
                "COPY consolidate_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
                temp_path.display()
            ),
            [],
        )?;

        conn.execute("DROP TABLE consolidate_temp", [])?;

        let bytes_after = fs::metadata(&temp_path)?.len();

        // Remove old files.
        // NOTE(review): sources are deleted before the temp file is renamed
        // into place; a crash between these steps leaves the data only in the
        // temp file — confirm readers/recovery handle that window.
        for file in &files {
            fs::remove_file(file)?;
        }

        // Rename consolidated file to final location
        atomic::rename_into_place(&temp_path, &consolidated_path)?;

        Ok(CompactStats {
            partitions_compacted: 0,
            sessions_compacted: 1,
            files_before,
            files_after: 1,
            bytes_before,
            bytes_after,
        })
    }
397
    /// Compact files in a partition directory, grouped by session.
    ///
    /// Behavior depends on options:
    /// - `consolidate`: Merge ALL files into single file per session
    /// - `file_threshold`: Compact when >= N non-compacted files (the code
    ///   triggers at the threshold itself, not strictly above it)
    /// - `recompact_threshold`: Re-compact when >= N compacted files exist
    ///   (0 disables re-compaction)
    ///
    /// `partitions_compacted` is set to 1 when any session in this partition
    /// was actually compacted.
    pub fn compact_partition_with_opts(
        &self,
        partition_dir: &Path,
        opts: &CompactOptions,
    ) -> Result<CompactStats> {
        let mut total_stats = CompactStats::default();

        // Group files by session key: session -> (non-compacted, compacted).
        let mut session_files: HashMap<String, (Vec<PathBuf>, Vec<PathBuf>)> = HashMap::new();

        for entry in fs::read_dir(partition_dir)? {
            let entry = entry?;
            let path = entry.path();
            let name = entry.file_name().to_string_lossy().to_string();

            // Skip non-parquet and seed files
            if !name.ends_with(".parquet") || is_seed_file(&name) {
                continue;
            }

            // Extract session from filename
            if let Some(session) = extract_session(&name) {
                // Apply session filter if specified
                if let Some(ref filter) = opts.session_filter {
                    if session != *filter {
                        continue;
                    }
                }

                // NOTE(review): consolidated `data_<uuid>.parquet` names have
                // no `--` separator, so extract_session returns the whole stem
                // and each such file forms its own one-file group — confirm
                // this is intended (they never re-merge with each other).
                let entry = session_files.entry(session).or_insert_with(|| (vec![], vec![]));
                if is_compacted_file(&name) || name.starts_with("data_") {
                    entry.1.push(path); // compacted/consolidated
                } else {
                    entry.0.push(path); // non-compacted
                }
            }
        }

        let mut any_compacted = false;
        for (session, (non_compacted, compacted)) in session_files {
            if opts.consolidate {
                // Consolidate mode: merge ALL files into one
                let all_files: Vec<PathBuf> = non_compacted.into_iter().chain(compacted).collect();
                if all_files.len() >= 2 {
                    let stats = self.consolidate_session_files(
                        partition_dir,
                        &session,
                        all_files,
                        opts.dry_run,
                    )?;
                    if stats.sessions_compacted > 0 {
                        any_compacted = true;
                        total_stats.add(&stats);
                    }
                }
            } else {
                // Regular compaction mode

                // First: compact non-compacted files once the threshold is reached
                if non_compacted.len() >= opts.file_threshold {
                    let mut to_process = non_compacted;
                    let stats = self.compact_session_files(
                        partition_dir,
                        &session,
                        &mut to_process,
                        opts.file_threshold,
                        opts.dry_run,
                    )?;
                    if stats.sessions_compacted > 0 {
                        any_compacted = true;
                        total_stats.add(&stats);
                    }
                }

                // Second: re-compact compacted files once the recompact
                // threshold is reached (0 disables re-compaction entirely)
                if opts.recompact_threshold > 0 && compacted.len() >= opts.recompact_threshold {
                    let stats = self.consolidate_session_files(
                        partition_dir,
                        &session,
                        compacted,
                        opts.dry_run,
                    )?;
                    if stats.sessions_compacted > 0 {
                        any_compacted = true;
                        total_stats.add(&stats);
                    }
                }
            }
        }

        if any_compacted {
            total_stats.partitions_compacted = 1;
        }

        Ok(total_stats)
    }
500
501    /// Compact files in a partition directory (legacy API).
502    pub fn compact_partition(
503        &self,
504        partition_dir: &Path,
505        file_threshold: usize,
506        session_filter: Option<&str>,
507        dry_run: bool,
508    ) -> Result<CompactStats> {
509        let opts = CompactOptions {
510            file_threshold,
511            recompact_threshold: 0, // Disable re-compaction for legacy API
512            consolidate: false,
513            dry_run,
514            session_filter: session_filter.map(|s| s.to_string()),
515        };
516        self.compact_partition_with_opts(partition_dir, &opts)
517    }
518
519    /// Compact all partitions in a data type directory (invocations, outputs, sessions).
520    pub fn compact_data_type(
521        &self,
522        data_dir: &Path,
523        file_threshold: usize,
524        session_filter: Option<&str>,
525        dry_run: bool,
526    ) -> Result<CompactStats> {
527        let mut total_stats = CompactStats::default();
528
529        if !data_dir.exists() {
530            return Ok(total_stats);
531        }
532
533        // Iterate over partitions (may have status= level for invocations)
534        for entry in fs::read_dir(data_dir)? {
535            let entry = entry?;
536            let path = entry.path();
537
538            if !path.is_dir() {
539                continue;
540            }
541
542            let dir_name = entry.file_name().to_string_lossy().to_string();
543
544            // Check if this is a status= partition (for invocations)
545            if dir_name.starts_with("status=") {
546                // Recurse into status partition to find date partitions
547                let stats =
548                    self.compact_data_type(&path, file_threshold, session_filter, dry_run)?;
549                total_stats.add(&stats);
550            } else {
551                // This is a date= partition or other partition
552                let stats =
553                    self.compact_partition(&path, file_threshold, session_filter, dry_run)?;
554                total_stats.add(&stats);
555            }
556        }
557
558        Ok(total_stats)
559    }
560
561    /// Compact recent data for a specific session (used by shell hooks).
562    ///
563    /// Checks all date partitions in recent data.
564    pub fn compact_for_session(
565        &self,
566        session_id: &str,
567        file_threshold: usize,
568        dry_run: bool,
569    ) -> Result<CompactStats> {
570        let mut total_stats = CompactStats::default();
571        let recent_dir = self.config().recent_dir();
572
573        for data_type in &["invocations", "outputs", "sessions", "events"] {
574            let data_dir = recent_dir.join(data_type);
575            let stats =
576                self.compact_data_type(&data_dir, file_threshold, Some(session_id), dry_run)?;
577            total_stats.add(&stats);
578        }
579
580        Ok(total_stats)
581    }
582
583    /// Fast compaction check for today's partition only (used by shell hooks).
584    ///
585    /// This is the most lightweight check - only looks at today's date partition.
586    pub fn compact_session_today(
587        &self,
588        session_id: &str,
589        file_threshold: usize,
590        dry_run: bool,
591    ) -> Result<CompactStats> {
592        let mut total_stats = CompactStats::default();
593        let recent_dir = self.config().recent_dir();
594        let today = Utc::now().date_naive();
595        let date_partition = format!("date={}", today.format("%Y-%m-%d"));
596
597        for data_type in &["invocations", "outputs", "sessions", "events"] {
598            let partition_dir = recent_dir.join(data_type).join(&date_partition);
599            if partition_dir.exists() {
600                let stats = self.compact_partition(
601                    &partition_dir,
602                    file_threshold,
603                    Some(session_id),
604                    dry_run,
605                )?;
606                total_stats.add(&stats);
607            }
608        }
609
610        Ok(total_stats)
611    }
612
613    /// Compact all recent data that exceeds the file threshold (global mode).
614    pub fn compact_recent(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
615        let mut total_stats = CompactStats::default();
616        let recent_dir = self.config().recent_dir();
617
618        for data_type in &["invocations", "outputs", "sessions", "events"] {
619            let data_dir = recent_dir.join(data_type);
620            let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
621            total_stats.add(&stats);
622        }
623
624        Ok(total_stats)
625    }
626
627    /// Compact all archive data that exceeds the file threshold.
628    pub fn compact_archive(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
629        let mut total_stats = CompactStats::default();
630        let archive_dir = self.config().archive_dir();
631
632        for data_type in &["invocations", "outputs", "sessions", "events"] {
633            let data_dir = archive_dir.join(data_type);
634            let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
635            total_stats.add(&stats);
636        }
637
638        Ok(total_stats)
639    }
640
    /// Migrate old data from recent to archive with consolidation.
    ///
    /// Consolidates all files in each date partition into a single parquet file
    /// in the archive, then removes the source files.
    ///
    /// The cutoff uses `date <= today - older_than_days`, so `older_than_days`
    /// of 0 archives everything up to and including today. For invocations,
    /// only `status=completed/` date partitions are scanned; other statuses
    /// stay in recent.
    pub fn archive_old_data(&self, older_than_days: u32, dry_run: bool) -> Result<ArchiveStats> {
        let mut stats = ArchiveStats::default();
        let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);

        let recent_dir = self.config().recent_dir();
        let archive_dir = self.config().archive_dir();

        for data_type in &["invocations", "outputs", "sessions", "events"] {
            let recent_data_dir = recent_dir.join(data_type);
            let archive_data_dir = archive_dir.join(data_type);

            if !recent_data_dir.exists() {
                continue;
            }

            // Collect partitions to archive
            // Use <= so that "older_than_days=0" archives everything including today
            // Skip seed partitions (date=1970-01-01) which contain schema-only files
            let seed_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();

            // For invocations, we need to look inside status=completed/ for date partitions
            // Other data types have date partitions directly
            let date_partition_parent = if *data_type == "invocations" {
                recent_data_dir.join("status=completed")
            } else {
                recent_data_dir.clone()
            };

            if !date_partition_parent.exists() {
                continue;
            }

            // Gather eligible `date=YYYY-MM-DD` child directories; anything
            // unparseable, non-directory, seed, or newer than the cutoff is
            // silently skipped.
            let partitions_to_archive: Vec<(NaiveDate, PathBuf, String)> = fs::read_dir(&date_partition_parent)?
                .filter_map(|e| e.ok())
                .filter_map(|e| {
                    let path = e.path();
                    if !path.is_dir() {
                        return None;
                    }
                    let dir_name = e.file_name().to_string_lossy().to_string();
                    let date_str = dir_name.strip_prefix("date=")?;
                    let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d").ok()?;
                    // Skip seed partition
                    if date == seed_date {
                        return None;
                    }
                    if date <= cutoff_date {
                        Some((date, path, dir_name))
                    } else {
                        None
                    }
                })
                .collect();

            for (_date, partition_path, dir_name) in partitions_to_archive {
                let dest_dir = archive_data_dir.join(&dir_name);

                // Count source stats
                let source_files: Vec<PathBuf> = fs::read_dir(&partition_path)?
                    .filter_map(|e| e.ok())
                    .map(|e| e.path())
                    .filter(|p| p.extension().map(|ext| ext == "parquet").unwrap_or(false))
                    .collect();

                if source_files.is_empty() {
                    continue;
                }

                let file_count = source_files.len();
                let bytes_before: u64 = source_files
                    .iter()
                    .filter_map(|p| fs::metadata(p).ok())
                    .map(|m| m.len())
                    .sum();

                if dry_run {
                    // NOTE(review): dry runs count pre-consolidation bytes
                    // while real runs count consolidated size below, so the
                    // two bytes_moved figures differ — confirm intended.
                    stats.partitions_archived += 1;
                    stats.files_moved += file_count;
                    stats.bytes_moved += bytes_before;
                    continue;
                }

                // Skip if archive partition already exists with data
                if dest_dir.exists() {
                    let existing_files = fs::read_dir(&dest_dir)?
                        .filter_map(|e| e.ok())
                        .any(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false));
                    if existing_files {
                        // Already archived, skip
                        continue;
                    }
                }

                fs::create_dir_all(&dest_dir)?;

                // Consolidate using DuckDB's COPY
                // Use minimal connection to avoid view setup (which can fail if
                // some data type directories are empty)
                let conn = self.connection_with_options(false)?;
                let src_glob = format!("{}/*.parquet", partition_path.display());
                let dest_file = dest_dir.join("data_0.parquet");
                let temp_file = dest_dir.join(".data_0.parquet.tmp");

                // NOTE(review): if COPY fails, the `.tmp` file is left behind
                // in the archive partition — confirm something cleans strays.
                conn.execute(
                    &format!(
                        "COPY (SELECT * FROM read_parquet('{}', union_by_name = true)) \
                         TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
                        src_glob,
                        temp_file.display()
                    ),
                    [],
                )?;

                // Atomic rename
                // NOTE(review): uses fs::rename directly while compaction uses
                // atomic::rename_into_place — confirm the helper isn't needed here.
                fs::rename(&temp_file, &dest_file)?;

                // Get consolidated size
                let bytes_after = fs::metadata(&dest_file)?.len();

                // Remove source files (best-effort: a stubborn file does not
                // abort the whole archive pass)
                for file in &source_files {
                    let _ = fs::remove_file(file);
                }
                // Remove the (now hopefully empty) source partition directory.
                let _ = fs::remove_dir(&partition_path);

                stats.partitions_archived += 1;
                stats.files_moved += file_count;
                stats.bytes_moved += bytes_after;
            }
        }

        Ok(stats)
    }
778
779    /// Run auto-compaction based on options.
780    pub fn auto_compact(&self, opts: &AutoCompactOptions) -> Result<(CompactStats, ArchiveStats)> {
781        // First, archive old data (skip if doing session-specific compaction)
782        let archive_stats = if opts.compact.session_filter.is_none() {
783            self.archive_old_data(opts.archive_days, opts.compact.dry_run)?
784        } else {
785            ArchiveStats::default()
786        };
787
788        // Then compact based on mode
789        let compact_stats = if let Some(ref session) = opts.compact.session_filter {
790            // Client-specific mode
791            self.compact_for_session_with_opts(session, &opts.compact)?
792        } else {
793            // Global mode: compact both recent and archive
794            let mut stats = self.compact_recent_with_opts(&opts.compact)?;
795            let archive_compact = self.compact_archive_with_opts(&opts.compact)?;
796            stats.add(&archive_compact);
797            stats
798        };
799
800        Ok((compact_stats, archive_stats))
801    }
802
803    /// Compact recent data with full options.
804    pub fn compact_recent_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
805        let mut total_stats = CompactStats::default();
806        let recent_dir = self.config().recent_dir();
807
808        for data_type in &["invocations", "outputs", "sessions", "events"] {
809            let data_dir = recent_dir.join(data_type);
810            let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
811            total_stats.add(&stats);
812        }
813
814        Ok(total_stats)
815    }
816
817    /// Compact archive data with full options.
818    pub fn compact_archive_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
819        let mut total_stats = CompactStats::default();
820        let archive_dir = self.config().archive_dir();
821
822        for data_type in &["invocations", "outputs", "sessions", "events"] {
823            let data_dir = archive_dir.join(data_type);
824            let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
825            total_stats.add(&stats);
826        }
827
828        Ok(total_stats)
829    }
830
831    /// Compact data type directory with full options.
832    pub fn compact_data_type_with_opts(
833        &self,
834        data_dir: &Path,
835        opts: &CompactOptions,
836    ) -> Result<CompactStats> {
837        let mut total_stats = CompactStats::default();
838
839        if !data_dir.exists() {
840            return Ok(total_stats);
841        }
842
843        for entry in fs::read_dir(data_dir)? {
844            let entry = entry?;
845            let path = entry.path();
846
847            if !path.is_dir() {
848                continue;
849            }
850
851            let dir_name = entry.file_name().to_string_lossy().to_string();
852
853            // Check if this is a status= partition (for invocations)
854            if dir_name.starts_with("status=") {
855                // Recurse into status partition to find date partitions
856                let stats = self.compact_data_type_with_opts(&path, opts)?;
857                total_stats.add(&stats);
858            } else {
859                // This is a date= partition or other partition
860                let stats = self.compact_partition_with_opts(&path, opts)?;
861                total_stats.add(&stats);
862            }
863        }
864
865        Ok(total_stats)
866    }
867
868    /// Compact files for a specific session with full options.
869    pub fn compact_for_session_with_opts(
870        &self,
871        session_id: &str,
872        opts: &CompactOptions,
873    ) -> Result<CompactStats> {
874        let mut total_stats = CompactStats::default();
875        let recent_dir = self.config().recent_dir();
876
877        let session_opts = CompactOptions {
878            session_filter: Some(session_id.to_string()),
879            ..opts.clone()
880        };
881
882        for data_type in &["invocations", "outputs", "sessions", "events"] {
883            let data_dir = recent_dir.join(data_type);
884            let stats = self.compact_data_type_with_opts(&data_dir, &session_opts)?;
885            total_stats.add(&stats);
886        }
887
888        Ok(total_stats)
889    }
890
891    /// Clean operation: recover orphaned invocations and optionally prune archive.
892    ///
893    /// This:
894    /// 1. Scans pending files for crashed/dead invocations
895    /// 2. Marks them as orphaned in status=orphaned partition
896    /// 3. Optionally prunes old archive data
897    pub fn clean(&self, opts: &CleanOptions) -> Result<CleanStats> {
898        let mut stats = CleanStats::default();
899
900        // Recover orphaned invocations
901        let recovery_stats =
902            self.recover_orphaned_invocations(opts.max_age_hours, opts.dry_run)?;
903
904        stats.pending_checked = recovery_stats.pending_checked;
905        stats.still_running = recovery_stats.still_running;
906        stats.orphaned = recovery_stats.orphaned;
907
908        // Prune old archive data if requested
909        if opts.prune {
910            let prune_stats = self.prune_archive(opts.older_than_days, opts.dry_run)?;
911            stats.pruned_files = prune_stats.files_pruned;
912            stats.bytes_freed = prune_stats.bytes_freed;
913        }
914
915        Ok(stats)
916    }
917
918    /// Prune old archive data.
919    ///
920    /// Deletes data from the archive tier older than the specified number of days.
921    pub fn prune_archive(&self, older_than_days: u32, dry_run: bool) -> Result<PruneStats> {
922        let mut stats = PruneStats::default();
923        let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);
924        let archive_dir = self.config().archive_dir();
925
926        for data_type in &["invocations", "outputs", "sessions", "events"] {
927            let data_dir = archive_dir.join(data_type);
928            if !data_dir.exists() {
929                continue;
930            }
931
932            // Archive doesn't have status partitioning - just date= partitions directly
933            let partition_stats =
934                self.prune_date_partitions(&data_dir, cutoff_date, dry_run)?;
935            stats.add(&partition_stats);
936        }
937
938        Ok(stats)
939    }
940
941    /// Prune date partitions older than cutoff.
942    fn prune_date_partitions(
943        &self,
944        parent_dir: &Path,
945        cutoff_date: NaiveDate,
946        dry_run: bool,
947    ) -> Result<PruneStats> {
948        let mut stats = PruneStats::default();
949        let seed_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
950
951        for entry in fs::read_dir(parent_dir)?.flatten() {
952            let path = entry.path();
953            if !path.is_dir() {
954                continue;
955            }
956
957            let dir_name = entry.file_name().to_string_lossy().to_string();
958            let date_str = match dir_name.strip_prefix("date=") {
959                Some(s) => s,
960                None => continue,
961            };
962
963            let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
964                Ok(d) => d,
965                Err(_) => continue,
966            };
967
968            // Skip seed partition
969            if date == seed_date {
970                continue;
971            }
972
973            // Skip if not old enough
974            if date > cutoff_date {
975                continue;
976            }
977
978            // Count files and bytes
979            for file_entry in fs::read_dir(&path)?.flatten() {
980                let file_path = file_entry.path();
981                if file_path.extension().map(|e| e == "parquet").unwrap_or(false) {
982                    if let Ok(metadata) = fs::metadata(&file_path) {
983                        stats.files_pruned += 1;
984                        stats.bytes_freed += metadata.len();
985                    }
986
987                    if !dry_run {
988                        let _ = fs::remove_file(&file_path);
989                    }
990                }
991            }
992
993            // Remove the partition directory if empty (and not dry run)
994            if !dry_run {
995                let _ = fs::remove_dir(&path);
996            }
997        }
998
999        Ok(stats)
1000    }
1001}
1002
/// Statistics from a prune operation.
#[derive(Debug, Default)]
pub struct PruneStats {
    /// Number of files pruned.
    pub files_pruned: usize,
    /// Bytes freed.
    pub bytes_freed: u64,
}

impl PruneStats {
    /// Fold another set of prune statistics into this one.
    fn add(&mut self, other: &PruneStats) {
        *self = PruneStats {
            files_pruned: self.files_pruned + other.files_pruned,
            bytes_freed: self.bytes_freed + other.bytes_freed,
        };
    }
}
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022    use crate::init::initialize;
1023    use crate::schema::InvocationRecord;
1024    use crate::Config;
1025    use tempfile::TempDir;
1026
1027    fn setup_store() -> (TempDir, Store) {
1028        let tmp = TempDir::new().unwrap();
1029        let config = Config::with_root(tmp.path());
1030        initialize(&config).unwrap();
1031        let store = Store::open(config).unwrap();
1032        (tmp, store)
1033    }
1034
1035    fn setup_store_duckdb() -> (TempDir, Store) {
1036        let tmp = TempDir::new().unwrap();
1037        let config = Config::with_duckdb_mode(tmp.path());
1038        initialize(&config).unwrap();
1039        let store = Store::open(config).unwrap();
1040        (tmp, store)
1041    }
1042
1043    #[test]
1044    fn test_extract_session() {
1045        assert_eq!(
1046            extract_session("zsh-1234--make--uuid.parquet"),
1047            Some("zsh-1234".to_string())
1048        );
1049        assert_eq!(
1050            extract_session("session-id.parquet"),
1051            Some("session-id".to_string())
1052        );
1053        assert_eq!(
1054            extract_session("zsh-1234--__compacted-0__--uuid.parquet"),
1055            Some("zsh-1234".to_string())
1056        );
1057    }
1058
1059    #[test]
1060    fn test_is_compacted_file() {
1061        assert!(is_compacted_file("zsh-1234--__compacted-0__--uuid.parquet"));
1062        assert!(is_compacted_file("session--__compacted-5__--abc.parquet"));
1063        assert!(!is_compacted_file("zsh-1234--make--uuid.parquet"));
1064        assert!(!is_compacted_file("session.parquet"));
1065    }
1066
1067    #[test]
1068    fn test_extract_compaction_number() {
1069        assert_eq!(
1070            extract_compaction_number("zsh--__compacted-0__--uuid.parquet"),
1071            Some(0)
1072        );
1073        assert_eq!(
1074            extract_compaction_number("zsh--__compacted-42__--uuid.parquet"),
1075            Some(42)
1076        );
1077        assert_eq!(
1078            extract_compaction_number("zsh--make--uuid.parquet"),
1079            None
1080        );
1081    }
1082
1083    #[test]
1084    fn test_compact_recent_no_files() {
1085        let (_tmp, store) = setup_store();
1086
1087        let stats = store.compact_recent(2, false).unwrap();
1088        assert_eq!(stats.partitions_compacted, 0);
1089    }
1090
1091    #[test]
1092    fn test_compact_recent_with_files() {
1093        let (_tmp, store) = setup_store();
1094
1095        // Write multiple invocations to create multiple files
1096        for i in 0..5 {
1097            let record = InvocationRecord::new(
1098                "test-session",
1099                format!("command-{}", i),
1100                "/home/user",
1101                0,
1102                "test@client",
1103            );
1104            store.write_invocation(&record).unwrap();
1105        }
1106
1107        // With threshold of 2, should compact oldest 3 (keeping 2)
1108        let stats = store.compact_recent(2, false).unwrap();
1109        assert_eq!(stats.sessions_compacted, 1);
1110        assert_eq!(stats.files_before, 3); // 5 - 2 kept = 3 compacted
1111        assert_eq!(stats.files_after, 1);
1112    }
1113
1114    #[test]
1115    fn test_compact_for_session() {
1116        let (_tmp, store) = setup_store();
1117
1118        // Write files for two different sessions
1119        for i in 0..5 {
1120            let record = InvocationRecord::new(
1121                "session-a",
1122                format!("command-{}", i),
1123                "/home/user",
1124                0,
1125                "test@client",
1126            );
1127            store.write_invocation(&record).unwrap();
1128        }
1129        for i in 0..3 {
1130            let record = InvocationRecord::new(
1131                "session-b",
1132                format!("command-{}", i),
1133                "/home/user",
1134                0,
1135                "test@client",
1136            );
1137            store.write_invocation(&record).unwrap();
1138        }
1139
1140        // Compact only session-a with threshold of 2
1141        let stats = store.compact_for_session("session-a", 2, false).unwrap();
1142        assert_eq!(stats.sessions_compacted, 1);
1143        assert_eq!(stats.files_before, 3); // 5 - 2 kept = 3
1144
1145        // session-b should be untouched (only 3 files, below threshold)
1146        let date = chrono::Utc::now().date_naive();
1147        let inv_dir = store.config().invocations_dir(&date);
1148        let session_b_count = std::fs::read_dir(&inv_dir)
1149            .unwrap()
1150            .filter_map(|e| e.ok())
1151            .filter(|e| e.file_name().to_string_lossy().starts_with("session-b"))
1152            .count();
1153        assert_eq!(session_b_count, 3);
1154    }
1155
1156    #[test]
1157    fn test_compact_dry_run() {
1158        let (_tmp, store) = setup_store();
1159
1160        // Write multiple invocations
1161        for i in 0..5 {
1162            let record = InvocationRecord::new(
1163                "test-session",
1164                format!("command-{}", i),
1165                "/home/user",
1166                0,
1167                "test@client",
1168            );
1169            store.write_invocation(&record).unwrap();
1170        }
1171
1172        // Dry run should report stats but not actually compact
1173        let stats = store.compact_recent(2, true).unwrap();
1174        assert_eq!(stats.sessions_compacted, 1);
1175        assert_eq!(stats.files_before, 3);
1176
1177        // Files should still be there
1178        let date = chrono::Utc::now().date_naive();
1179        let inv_dir = store.config().invocations_dir(&date);
1180        let file_count = std::fs::read_dir(&inv_dir)
1181            .unwrap()
1182            .filter_map(|e| e.ok())
1183            .filter(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false))
1184            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1185            .count();
1186        assert_eq!(file_count, 5);
1187    }
1188
1189    #[test]
1190    fn test_compacted_file_naming() {
1191        let (_tmp, store) = setup_store();
1192
1193        // Write enough files to trigger compaction
1194        for i in 0..5 {
1195            let record = InvocationRecord::new(
1196                "zsh-9999",
1197                format!("cmd-{}", i),
1198                "/home/user",
1199                0,
1200                "test@client",
1201            );
1202            store.write_invocation(&record).unwrap();
1203        }
1204
1205        // Compact with threshold of 2
1206        store.compact_recent(2, false).unwrap();
1207
1208        // Check that compacted file has correct naming
1209        let date = chrono::Utc::now().date_naive();
1210        let inv_dir = store.config().invocations_dir(&date);
1211        let compacted_files: Vec<_> = std::fs::read_dir(&inv_dir)
1212            .unwrap()
1213            .filter_map(|e| e.ok())
1214            .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
1215            .collect();
1216
1217        assert_eq!(compacted_files.len(), 1);
1218        let name = compacted_files[0].file_name().to_string_lossy().to_string();
1219        assert!(name.starts_with("zsh-9999--__compacted-0__--"));
1220        assert!(name.ends_with(".parquet"));
1221    }
1222
1223    // ===== DuckDB Mode Tests =====
1224    // In DuckDB mode, compact should be a no-op since there are no parquet files.
1225
1226    #[test]
1227    fn test_compact_duckdb_mode_no_op() {
1228        let (_tmp, store) = setup_store_duckdb();
1229
1230        // Write data in DuckDB mode (goes to local.invocations table, not parquet files)
1231        for i in 0..10 {
1232            let record = InvocationRecord::new(
1233                "test-session",
1234                format!("command-{}", i),
1235                "/home/user",
1236                0,
1237                "test@client",
1238            );
1239            store.write_invocation(&record).unwrap();
1240        }
1241
1242        // Verify data is stored (not in parquet files)
1243        let conn = store.connection().unwrap();
1244        let count: i64 = conn
1245            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1246            .unwrap();
1247        assert_eq!(count, 10, "Data should be in DuckDB table");
1248
1249        // Compact should be a no-op (no parquet files to compact)
1250        let stats = store.compact_recent(2, false).unwrap();
1251        assert_eq!(stats.partitions_compacted, 0);
1252        assert_eq!(stats.sessions_compacted, 0);
1253        assert_eq!(stats.files_before, 0);
1254        assert_eq!(stats.files_after, 0);
1255
1256        // Data should still be there
1257        let count: i64 = conn
1258            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1259            .unwrap();
1260        assert_eq!(count, 10, "Data should be unaffected");
1261    }
1262
1263    #[test]
1264    fn test_compact_for_session_duckdb_mode_no_op() {
1265        let (_tmp, store) = setup_store_duckdb();
1266
1267        // Write data in DuckDB mode
1268        for i in 0..5 {
1269            let record = InvocationRecord::new(
1270                "session-a",
1271                format!("command-{}", i),
1272                "/home/user",
1273                0,
1274                "test@client",
1275            );
1276            store.write_invocation(&record).unwrap();
1277        }
1278
1279        // Session-specific compact should also be a no-op
1280        let stats = store.compact_for_session("session-a", 2, false).unwrap();
1281        assert_eq!(stats.partitions_compacted, 0);
1282        assert_eq!(stats.sessions_compacted, 0);
1283    }
1284
1285    #[test]
1286    fn test_compact_session_today_duckdb_mode_no_op() {
1287        let (_tmp, store) = setup_store_duckdb();
1288
1289        // Write data in DuckDB mode
1290        let record = InvocationRecord::new(
1291            "test-session",
1292            "echo hello",
1293            "/home/user",
1294            0,
1295            "test@client",
1296        );
1297        store.write_invocation(&record).unwrap();
1298
1299        // Today's session compact should be a no-op
1300        let stats = store.compact_session_today("test-session", 2, false).unwrap();
1301        assert_eq!(stats.partitions_compacted, 0);
1302        assert_eq!(stats.sessions_compacted, 0);
1303    }
1304
1305    #[test]
1306    fn test_auto_compact_duckdb_mode_no_op() {
1307        let (_tmp, store) = setup_store_duckdb();
1308
1309        // Write data
1310        for i in 0..5 {
1311            let record = InvocationRecord::new(
1312                "test-session",
1313                format!("command-{}", i),
1314                "/home/user",
1315                0,
1316                "test@client",
1317            );
1318            store.write_invocation(&record).unwrap();
1319        }
1320
1321        // Auto-compact should be a no-op
1322        let opts = AutoCompactOptions::default();
1323        let (compact_stats, archive_stats) = store.auto_compact(&opts).unwrap();
1324
1325        assert_eq!(compact_stats.partitions_compacted, 0);
1326        assert_eq!(compact_stats.sessions_compacted, 0);
1327        assert_eq!(archive_stats.partitions_archived, 0);
1328    }
1329
1330    #[test]
1331    fn test_archive_old_data_duckdb_mode_no_op() {
1332        let (_tmp, store) = setup_store_duckdb();
1333
1334        // Write data
1335        let record = InvocationRecord::new(
1336            "test-session",
1337            "echo hello",
1338            "/home/user",
1339            0,
1340            "test@client",
1341        );
1342        store.write_invocation(&record).unwrap();
1343
1344        // Archive should be a no-op (no parquet partitions to archive)
1345        let stats = store.archive_old_data(0, false).unwrap(); // 0 days = archive everything
1346        assert_eq!(stats.partitions_archived, 0);
1347        assert_eq!(stats.files_moved, 0);
1348    }
1349
1350    // ===== Archive Tests (Parquet Mode) =====
1351
1352    #[test]
1353    fn test_archive_old_data_moves_partitions() {
1354        let (_tmp, store) = setup_store();
1355
1356        // Write multiple invocations
1357        for i in 0..3 {
1358            let record = InvocationRecord::new(
1359                "test-session",
1360                format!("command-{}", i),
1361                "/home/user",
1362                0,
1363                "test@client",
1364            );
1365            store.write_invocation(&record).unwrap();
1366        }
1367
1368        // Verify files exist in recent
1369        let date = chrono::Utc::now().date_naive();
1370        let recent_dir = store.config().invocations_dir(&date);
1371        let recent_count = std::fs::read_dir(&recent_dir)
1372            .unwrap()
1373            .filter_map(|e| e.ok())
1374            .filter(|e| {
1375                e.path()
1376                    .extension()
1377                    .map(|ext| ext == "parquet")
1378                    .unwrap_or(false)
1379            })
1380            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1381            .count();
1382        assert_eq!(recent_count, 3, "Should have 3 files in recent");
1383
1384        // Archive with 0 days (archive everything)
1385        let stats = store.archive_old_data(0, false).unwrap();
1386
1387        // Archives 4 data types: invocations, outputs, sessions, events
1388        // Only invocations has data, but all 4 partitions exist
1389        assert!(stats.partitions_archived >= 1, "Should archive at least 1 partition");
1390        assert!(stats.files_moved > 0, "Should move files");
1391
1392        // Verify files moved to archive
1393        let archive_dir = store
1394            .config()
1395            .archive_dir()
1396            .join("invocations")
1397            .join(format!("date={}", date));
1398        assert!(archive_dir.exists(), "Archive partition should exist");
1399
1400        // Archive consolidates to single file
1401        let archive_files: Vec<_> = std::fs::read_dir(&archive_dir)
1402            .unwrap()
1403            .filter_map(|e| e.ok())
1404            .filter(|e| {
1405                e.path()
1406                    .extension()
1407                    .map(|ext| ext == "parquet")
1408                    .unwrap_or(false)
1409            })
1410            .collect();
1411        assert_eq!(archive_files.len(), 1, "Archive should have 1 consolidated file");
1412
1413        // Recent partition should be removed or empty (only seed files remain)
1414        let remaining = std::fs::read_dir(&recent_dir)
1415            .map(|entries| {
1416                entries
1417                    .filter_map(|e| e.ok())
1418                    .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1419                    .count()
1420            })
1421            .unwrap_or(0);
1422        assert_eq!(remaining, 0, "Recent partition should have no data files");
1423    }
1424
1425    #[test]
1426    fn test_archive_dry_run() {
1427        let (_tmp, store) = setup_store();
1428
1429        // Write data
1430        for i in 0..3 {
1431            let record = InvocationRecord::new(
1432                "test-session",
1433                format!("command-{}", i),
1434                "/home/user",
1435                0,
1436                "test@client",
1437            );
1438            store.write_invocation(&record).unwrap();
1439        }
1440
1441        // Dry run should report stats but not move files
1442        let stats = store.archive_old_data(0, true).unwrap();
1443        // Archives 4 data types, but counts partitions with files
1444        assert!(stats.partitions_archived >= 1, "Should report at least 1 partition");
1445        assert!(stats.files_moved > 0, "Should report files to move");
1446
1447        // Files should still be in recent
1448        let date = chrono::Utc::now().date_naive();
1449        let recent_dir = store.config().invocations_dir(&date);
1450        let recent_count = std::fs::read_dir(&recent_dir)
1451            .unwrap()
1452            .filter_map(|e| e.ok())
1453            .filter(|e| {
1454                e.path()
1455                    .extension()
1456                    .map(|ext| ext == "parquet")
1457                    .unwrap_or(false)
1458            })
1459            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1460            .count();
1461        assert_eq!(recent_count, 3, "Files should still be in recent after dry run");
1462    }
1463
1464    #[test]
1465    fn test_archive_respects_age_threshold() {
1466        let (_tmp, store) = setup_store();
1467
1468        // Write data today
1469        let record = InvocationRecord::new(
1470            "test-session",
1471            "echo hello",
1472            "/home/user",
1473            0,
1474            "test@client",
1475        );
1476        store.write_invocation(&record).unwrap();
1477
1478        // Archive with 7 days threshold - today's data should NOT be archived
1479        let stats = store.archive_old_data(7, false).unwrap();
1480        assert_eq!(stats.partitions_archived, 0, "Today's data should not be archived with 7 day threshold");
1481
1482        // Verify data is still queryable (via filesystem check, not view)
1483        let date = chrono::Utc::now().date_naive();
1484        let recent_dir = store.config().invocations_dir(&date);
1485        let file_count = std::fs::read_dir(&recent_dir)
1486            .unwrap()
1487            .filter_map(|e| e.ok())
1488            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1489            .count();
1490        assert_eq!(file_count, 1, "Data file should still exist");
1491    }
1492
1493    // ===== Consolidation Tests =====
1494
1495    #[test]
1496    fn test_consolidate_merges_all_files() {
1497        let (_tmp, store) = setup_store();
1498
1499        // Write multiple files
1500        for i in 0..5 {
1501            let record = InvocationRecord::new(
1502                "test-session",
1503                format!("command-{}", i),
1504                "/home/user",
1505                0,
1506                "test@client",
1507            );
1508            store.write_invocation(&record).unwrap();
1509        }
1510
1511        // Compact first to create a compacted file
1512        store.compact_recent(2, false).unwrap();
1513
1514        // Now consolidate everything
1515        let opts = CompactOptions {
1516            consolidate: true,
1517            ..Default::default()
1518        };
1519        let stats = store.compact_recent_with_opts(&opts).unwrap();
1520
1521        // Should consolidate all files (compacted + remaining) into one
1522        assert!(stats.sessions_compacted > 0, "Should consolidate session files");
1523
1524        // Verify single file remains
1525        let date = chrono::Utc::now().date_naive();
1526        let inv_dir = store.config().invocations_dir(&date);
1527        let file_count = std::fs::read_dir(&inv_dir)
1528            .unwrap()
1529            .filter_map(|e| e.ok())
1530            .filter(|e| {
1531                e.path()
1532                    .extension()
1533                    .map(|ext| ext == "parquet")
1534                    .unwrap_or(false)
1535            })
1536            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1537            .count();
1538        assert_eq!(file_count, 1, "Should have single consolidated file");
1539    }
1540
1541    #[test]
1542    fn test_recompact_threshold() {
1543        let (_tmp, store) = setup_store();
1544
1545        // Create many compacted files by doing multiple compact cycles
1546        // First, write and compact several batches
1547        for batch in 0..3 {
1548            for i in 0..5 {
1549                let record = InvocationRecord::new(
1550                    "test-session",
1551                    format!("batch-{}-cmd-{}", batch, i),
1552                    "/home/user",
1553                    0,
1554                    "test@client",
1555                );
1556                store.write_invocation(&record).unwrap();
1557            }
1558            // Compact after each batch (creates compacted files)
1559            store.compact_recent(2, false).unwrap();
1560        }
1561
1562        // Count compacted files
1563        let date = chrono::Utc::now().date_naive();
1564        let inv_dir = store.config().invocations_dir(&date);
1565        let compacted_count = std::fs::read_dir(&inv_dir)
1566            .unwrap()
1567            .filter_map(|e| e.ok())
1568            .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
1569            .count();
1570
1571        // With recompact_threshold=2, should trigger re-compaction
1572        let opts = CompactOptions {
1573            file_threshold: 50, // High threshold so we don't compact non-compacted
1574            recompact_threshold: 2,
1575            ..Default::default()
1576        };
1577        let stats = store.compact_recent_with_opts(&opts).unwrap();
1578
1579        // If we had enough compacted files, should have re-compacted
1580        if compacted_count >= 2 {
1581            assert!(
1582                stats.sessions_compacted > 0 || stats.files_before > 0,
1583                "Should trigger re-compaction when threshold exceeded"
1584            );
1585        }
1586    }
1587
1588    #[test]
1589    fn test_auto_compact_parquet_mode() {
1590        let (_tmp, store) = setup_store();
1591
1592        // Write enough files to trigger compaction
1593        for i in 0..10 {
1594            let record = InvocationRecord::new(
1595                "test-session",
1596                format!("command-{}", i),
1597                "/home/user",
1598                0,
1599                "test@client",
1600            );
1601            store.write_invocation(&record).unwrap();
1602        }
1603
1604        // Verify files before compact
1605        let date = chrono::Utc::now().date_naive();
1606        let inv_dir = store.config().invocations_dir(&date);
1607        let files_before = std::fs::read_dir(&inv_dir)
1608            .unwrap()
1609            .filter_map(|e| e.ok())
1610            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1611            .count();
1612        assert_eq!(files_before, 10, "Should have 10 files before compact");
1613
1614        // Run auto_compact with low threshold
1615        let opts = AutoCompactOptions {
1616            compact: CompactOptions {
1617                file_threshold: 3,
1618                ..Default::default()
1619            },
1620            archive_days: 14, // Don't archive today's data
1621        };
1622        let (compact_stats, archive_stats) = store.auto_compact(&opts).unwrap();
1623
1624        // Should compact but not archive (data is from today)
1625        assert!(compact_stats.sessions_compacted > 0, "Should compact files");
1626        assert_eq!(archive_stats.partitions_archived, 0, "Should not archive today's data");
1627
1628        // Verify files were compacted (fewer files now)
1629        let files_after = std::fs::read_dir(&inv_dir)
1630            .unwrap()
1631            .filter_map(|e| e.ok())
1632            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1633            .count();
1634        assert!(files_after < files_before, "Should have fewer files after compact");
1635    }
1636
1637    #[test]
1638    fn test_compact_preserves_data_integrity() {
1639        let (_tmp, store) = setup_store();
1640
1641        // Write known data
1642        let commands: Vec<String> = (0..10).map(|i| format!("command-{}", i)).collect();
1643        for cmd in &commands {
1644            let record = InvocationRecord::new(
1645                "test-session",
1646                cmd.clone(),
1647                "/home/user",
1648                0,
1649                "test@client",
1650            );
1651            store.write_invocation(&record).unwrap();
1652        }
1653
1654        // Verify data before compact
1655        let conn = store.connection().unwrap();
1656        let count_before: i64 = conn
1657            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1658            .unwrap();
1659        assert_eq!(count_before, 10);
1660
1661        // Compact
1662        store.compact_recent(2, false).unwrap();
1663
1664        // Verify data after compact - count should be same
1665        let count_after: i64 = conn
1666            .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1667            .unwrap();
1668        assert_eq!(count_after, 10, "Compaction should preserve all records");
1669
1670        // Verify all commands are present
1671        let mut found_cmds: Vec<String> = conn
1672            .prepare("SELECT cmd FROM local.invocations ORDER BY cmd")
1673            .unwrap()
1674            .query_map([], |r| r.get(0))
1675            .unwrap()
1676            .filter_map(|r| r.ok())
1677            .collect();
1678        found_cmds.sort();
1679        let mut expected = commands.clone();
1680        expected.sort();
1681        assert_eq!(found_cmds, expected, "All commands should be preserved");
1682    }
1683
1684    // ===== Clean/Prune Tests =====
1685
1686    #[test]
1687    fn test_clean_recovers_orphaned() {
1688        let (tmp, store) = setup_store();
1689
1690        // Create a pending invocation with a dead PID
1691        let record = InvocationRecord::new_pending_local(
1692            "test-session",
1693            "crashed-command",
1694            "/home/user",
1695            999999999, // PID that doesn't exist
1696            "test@client",
1697        );
1698
1699        // Write pending file manually (simulating a crash scenario)
1700        let pending =
1701            super::super::pending::PendingInvocation::from_record(&record).unwrap();
1702        let pending_dir = tmp.path().join("db/pending");
1703        super::super::pending::write_pending_file(&pending_dir, &pending).unwrap();
1704
1705        // Write to status=pending partition
1706        store.write_invocation(&record).unwrap();
1707
1708        // Run clean
1709        let opts = CleanOptions::default();
1710        let stats = store.clean(&opts).unwrap();
1711
1712        assert_eq!(stats.pending_checked, 1);
1713        assert_eq!(stats.orphaned, 1);
1714        assert_eq!(stats.still_running, 0);
1715    }
1716
1717    #[test]
1718    fn test_clean_dry_run() {
1719        let (tmp, store) = setup_store();
1720
1721        // Create a pending invocation with a dead PID
1722        let record = InvocationRecord::new_pending_local(
1723            "test-session",
1724            "crashed-command",
1725            "/home/user",
1726            999999999, // PID that doesn't exist
1727            "test@client",
1728        );
1729
1730        // Write pending file
1731        let pending =
1732            super::super::pending::PendingInvocation::from_record(&record).unwrap();
1733        let pending_dir = tmp.path().join("db/pending");
1734        super::super::pending::write_pending_file(&pending_dir, &pending).unwrap();
1735
1736        // Write to status=pending partition
1737        store.write_invocation(&record).unwrap();
1738
1739        // Run clean in dry-run mode
1740        let opts = CleanOptions {
1741            dry_run: true,
1742            ..Default::default()
1743        };
1744        let stats = store.clean(&opts).unwrap();
1745
1746        // Stats should report the orphan
1747        assert_eq!(stats.pending_checked, 1);
1748        assert_eq!(stats.orphaned, 1);
1749
1750        // But pending file should still exist
1751        let pending_path = pending.path(&pending_dir);
1752        assert!(pending_path.exists(), "Pending file should still exist after dry run");
1753    }
1754
1755    #[test]
1756    fn test_prune_archive() {
1757        let (tmp, store) = setup_store();
1758
1759        // Write some data
1760        for i in 0..3 {
1761            let record = InvocationRecord::new(
1762                "test-session",
1763                format!("command-{}", i),
1764                "/home/user",
1765                0,
1766                "test@client",
1767            );
1768            store.write_invocation(&record).unwrap();
1769        }
1770
1771        // Archive everything (0 days = archive all)
1772        let archive_stats = store.archive_old_data(0, false).unwrap();
1773        assert!(archive_stats.partitions_archived > 0, "Should archive data");
1774
1775        // Verify archive exists
1776        let archive_invocations = tmp.path().join("db/data/archive/invocations");
1777        assert!(archive_invocations.exists(), "Archive should exist");
1778
1779        // Prune with 0 days (prune everything)
1780        let prune_stats = store.prune_archive(0, false).unwrap();
1781        assert!(prune_stats.files_pruned > 0, "Should prune files");
1782        assert!(prune_stats.bytes_freed > 0, "Should free bytes");
1783    }
1784
1785    #[test]
1786    fn test_clean_with_prune() {
1787        let (tmp, store) = setup_store();
1788
1789        // Create orphaned pending file
1790        let record = InvocationRecord::new_pending_local(
1791            "test-session",
1792            "crashed-command",
1793            "/home/user",
1794            999999999,
1795            "test@client",
1796        );
1797        let pending =
1798            super::super::pending::PendingInvocation::from_record(&record).unwrap();
1799        let pending_dir = tmp.path().join("db/pending");
1800        super::super::pending::write_pending_file(&pending_dir, &pending).unwrap();
1801        store.write_invocation(&record).unwrap();
1802
1803        // Write and archive some data
1804        for i in 0..3 {
1805            let record = InvocationRecord::new(
1806                "another-session",
1807                format!("command-{}", i),
1808                "/home/user",
1809                0,
1810                "test@client",
1811            );
1812            store.write_invocation(&record).unwrap();
1813        }
1814        store.archive_old_data(0, false).unwrap();
1815
1816        // Run clean with prune
1817        let opts = CleanOptions {
1818            prune: true,
1819            older_than_days: 0, // Prune everything
1820            ..Default::default()
1821        };
1822        let stats = store.clean(&opts).unwrap();
1823
1824        // Should both recover orphaned and prune archive
1825        assert_eq!(stats.orphaned, 1, "Should recover 1 orphaned");
1826        assert!(stats.pruned_files > 0, "Should prune archive files");
1827    }
1828}