Skip to main content

magic_bird/store/
compact.rs

1//! Compaction and archival operations.
2//!
3//! Supports two modes:
4//! - **Client-specific**: Compact files for a single session/client (used by shell hooks)
5//! - **Global**: Compact files across all sessions (used for scheduled maintenance)
6
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10
11use chrono::{NaiveDate, Utc};
12use uuid::Uuid;
13
14use super::atomic;
15use super::Store;
16use crate::Result;
17
/// Returns `true` for files produced by compaction, i.e. whose name contains
/// the `__compacted-` marker (as in `<session>--__compacted-N__--<uuid>.parquet`).
fn is_compacted_file(name: &str) -> bool {
    const COMPACTED_MARKER: &str = "__compacted-";
    name.contains(COMPACTED_MARKER)
}
22
/// Returns `true` for seed files, whose names begin with `_seed`.
fn is_seed_file(name: &str) -> bool {
    name.as_bytes().starts_with(b"_seed")
}
27
/// Derive the session/group key from a parquet filename.
///
/// The key is everything before the first `--` in the stem:
/// - `session--cmd--uuid.parquet` -> `session`
/// - `session--__compacted-0__--uuid.parquet` -> `session`
/// - `session_id.parquet` (no `--`) -> `session_id`
///
/// Returns `None` when `name` does not end in `.parquet`.
fn extract_session(name: &str) -> Option<String> {
    let stem = name.strip_suffix(".parquet")?;
    let session = match stem.split_once("--") {
        Some((head, _rest)) => head,
        None => stem,
    };
    Some(session.to_owned())
}
44
/// Parse `N` out of the first `__compacted-N__` marker in `name`.
///
/// Returns `None` when the marker is missing, unterminated, or `N` is not a
/// valid `u32`.
fn extract_compaction_number(name: &str) -> Option<u32> {
    let (_, after_marker) = name.split_once("__compacted-")?;
    let (digits, _) = after_marker.split_once("__")?;
    digits.parse::<u32>().ok()
}
58
59/// Find the next compaction sequence number for a session in a partition.
60fn next_compaction_number(partition_dir: &Path, session: &str) -> u32 {
61    fs::read_dir(partition_dir)
62        .ok()
63        .map(|entries| {
64            entries
65                .filter_map(|e| e.ok())
66                .filter_map(|e| {
67                    let name = e.file_name().to_string_lossy().to_string();
68                    // Only consider compacted files for this session
69                    if name.starts_with(&format!("{}--__compacted-", session)) {
70                        extract_compaction_number(&name)
71                    } else {
72                        None
73                    }
74                })
75                .max()
76                .map(|n| n + 1)
77                .unwrap_or(0)
78        })
79        .unwrap_or(0)
80}
81
/// Modification time of `path` as whole seconds since the Unix epoch, used to
/// order files by age.
///
/// Returns `0` if the metadata or mtime is unavailable, or if the mtime
/// predates the epoch.
fn file_mtime(path: &Path) -> u64 {
    let modified = match fs::metadata(path).and_then(|meta| meta.modified()) {
        Ok(time) => time,
        Err(_) => return 0,
    };
    modified
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_secs())
        .unwrap_or(0)
}
89
/// Statistics from a compaction operation.
///
/// Results from several partitions or data types are combined with
/// [`CompactStats::add`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CompactStats {
    /// Number of partition directories in which at least one merge happened.
    pub partitions_compacted: usize,
    /// Number of per-session merge operations performed (or planned, in a dry run).
    pub sessions_compacted: usize,
    /// Number of input files that were merged.
    pub files_before: usize,
    /// Number of output files written (one per merge).
    pub files_after: usize,
    /// Total size in bytes of the merged input files.
    pub bytes_before: u64,
    /// Total size in bytes of the output files. In a dry run nothing is written,
    /// so this mirrors `bytes_before` as an estimate.
    pub bytes_after: u64,
}

impl CompactStats {
    /// Accumulate `other` into `self`, field by field.
    pub fn add(&mut self, other: &CompactStats) {
        // Exhaustive destructuring: adding a field to the struct without
        // accumulating it here becomes a compile error instead of a silent gap.
        let CompactStats {
            partitions_compacted,
            sessions_compacted,
            files_before,
            files_after,
            bytes_before,
            bytes_after,
        } = *other;

        self.partitions_compacted += partitions_compacted;
        self.sessions_compacted += sessions_compacted;
        self.files_before += files_before;
        self.files_after += files_after;
        self.bytes_before += bytes_before;
        self.bytes_after += bytes_after;
    }
}
111
/// Statistics from an archive operation.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ArchiveStats {
    /// Number of date partitions migrated (or that would be, in a dry run).
    pub partitions_archived: usize,
    /// Number of source parquet files taken from recent data.
    pub files_moved: usize,
    /// Bytes accounted for. In a dry run this is the total size of the source
    /// files; otherwise it is the size of the consolidated archive files written.
    pub bytes_moved: u64,
}
119
/// Options for compaction operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactOptions {
    /// Number of most recently modified loose (non-compacted) files to keep per session.
    ///
    /// A session is considered once it has at least this many loose files. All but
    /// the newest `file_threshold` are then merged into one compacted file, provided
    /// at least two files would be merged.
    pub file_threshold: usize,
    /// Consolidate a session's compacted files once it has at least this many.
    /// Set to 0 to disable re-compaction.
    pub recompact_threshold: usize,
    /// If true, consolidate ALL of a session's files (including compacted ones)
    /// into a single file, ignoring the thresholds.
    pub consolidate: bool,
    /// If true, report what would be done without changing any files.
    pub dry_run: bool,
    /// If set, only compact files for this specific session.
    pub session_filter: Option<String>,
}

impl Default for CompactOptions {
    /// Keep 50 loose files per session, re-compact at 10 compacted files,
    /// no consolidation, real run, all sessions.
    fn default() -> Self {
        Self {
            file_threshold: 50,
            recompact_threshold: 10,
            consolidate: false,
            dry_run: false,
            session_filter: None,
        }
    }
}
147
148/// Options for auto-compaction (includes archive settings).
149#[derive(Debug)]
150pub struct AutoCompactOptions {
151    /// Compact options.
152    pub compact: CompactOptions,
153    /// Migrate data older than this many days to archive.
154    pub archive_days: u32,
155}
156
157impl Default for AutoCompactOptions {
158    fn default() -> Self {
159        Self {
160            compact: CompactOptions::default(),
161            archive_days: 14,
162        }
163    }
164}
165
166impl Store {
167    /// Compact files for a specific session in a partition.
168    ///
169    /// Keeps the most recent `keep_count` files, compacts the rest.
170    /// Uses naming: `<session>--__compacted-N__--<uuid>.parquet`
171    fn compact_session_files(
172        &self,
173        partition_dir: &Path,
174        session: &str,
175        files: &mut Vec<PathBuf>,
176        keep_count: usize,
177        dry_run: bool,
178    ) -> Result<CompactStats> {
179        // Sort by modification time (oldest first)
180        files.sort_by_key(|p| file_mtime(p));
181
182        // Keep the most recent `keep_count` files
183        let to_keep = files.len().saturating_sub(keep_count).max(0);
184        if to_keep < 2 {
185            // Need at least 2 files to compact
186            return Ok(CompactStats::default());
187        }
188
189        let files_to_compact: Vec<PathBuf> = files.drain(..to_keep).collect();
190        let files_before = files_to_compact.len();
191        let bytes_before: u64 = files_to_compact
192            .iter()
193            .filter_map(|p| fs::metadata(p).ok())
194            .map(|m| m.len())
195            .sum();
196
197        if dry_run {
198            return Ok(CompactStats {
199                partitions_compacted: 0, // Will be set by caller
200                sessions_compacted: 1,
201                files_before,
202                files_after: 1,
203                bytes_before,
204                bytes_after: bytes_before, // Estimate
205            });
206        }
207
208        let conn = self.connection()?;
209
210        // Build list of files to read
211        let file_list: Vec<String> = files_to_compact
212            .iter()
213            .map(|p| p.display().to_string())
214            .collect();
215        let file_list_sql = file_list
216            .iter()
217            .map(|f| format!("'{}'", f))
218            .collect::<Vec<_>>()
219            .join(", ");
220
221        // Create temp table with data from selected files
222        conn.execute(
223            &format!(
224                "CREATE OR REPLACE TEMP TABLE compact_temp AS
225                 SELECT * FROM read_parquet([{}], union_by_name = true)",
226                file_list_sql
227            ),
228            [],
229        )?;
230
231        // Generate compacted filename: <session>--__compacted-N__--<uuid>.parquet
232        let seq_num = next_compaction_number(partition_dir, session);
233        let uuid = Uuid::now_v7();
234        let compacted_name = format!("{}--__compacted-{}__--{}.parquet", session, seq_num, uuid);
235        let compacted_path = partition_dir.join(&compacted_name);
236        let temp_path = atomic::temp_path(&compacted_path);
237
238        conn.execute(
239            &format!(
240                "COPY compact_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
241                temp_path.display()
242            ),
243            [],
244        )?;
245
246        conn.execute("DROP TABLE compact_temp", [])?;
247
248        // Get size of new file
249        let bytes_after = fs::metadata(&temp_path)?.len();
250
251        // Remove old files
252        for file in &files_to_compact {
253            fs::remove_file(file)?;
254        }
255
256        // Rename compacted file to final location
257        atomic::rename_into_place(&temp_path, &compacted_path)?;
258
259        Ok(CompactStats {
260            partitions_compacted: 0, // Will be set by caller
261            sessions_compacted: 1,
262            files_before,
263            files_after: 1,
264            bytes_before,
265            bytes_after,
266        })
267    }
268
269    /// Consolidate ALL files for a session into a single file.
270    ///
271    /// Unlike regular compaction, this merges everything including previously
272    /// compacted files into a single `data_0.parquet` file.
273    fn consolidate_session_files(
274        &self,
275        partition_dir: &Path,
276        _session: &str,
277        files: Vec<PathBuf>,
278        dry_run: bool,
279    ) -> Result<CompactStats> {
280        if files.len() < 2 {
281            return Ok(CompactStats::default());
282        }
283
284        let files_before = files.len();
285        let bytes_before: u64 = files
286            .iter()
287            .filter_map(|p| fs::metadata(p).ok())
288            .map(|m| m.len())
289            .sum();
290
291        if dry_run {
292            return Ok(CompactStats {
293                partitions_compacted: 0,
294                sessions_compacted: 1,
295                files_before,
296                files_after: 1,
297                bytes_before,
298                bytes_after: bytes_before,
299            });
300        }
301
302        let conn = self.connection()?;
303
304        // Build list of files to read
305        let file_list_sql = files
306            .iter()
307            .map(|p| format!("'{}'", p.display()))
308            .collect::<Vec<_>>()
309            .join(", ");
310
311        // Create temp table with data from all files
312        conn.execute(
313            &format!(
314                "CREATE OR REPLACE TEMP TABLE consolidate_temp AS
315                 SELECT * FROM read_parquet([{}], union_by_name = true)",
316                file_list_sql
317            ),
318            [],
319        )?;
320
321        // Use simple data_0.parquet naming for consolidated files
322        let uuid = Uuid::now_v7();
323        let consolidated_name = format!("data_{}.parquet", uuid);
324        let consolidated_path = partition_dir.join(&consolidated_name);
325        let temp_path = atomic::temp_path(&consolidated_path);
326
327        conn.execute(
328            &format!(
329                "COPY consolidate_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
330                temp_path.display()
331            ),
332            [],
333        )?;
334
335        conn.execute("DROP TABLE consolidate_temp", [])?;
336
337        let bytes_after = fs::metadata(&temp_path)?.len();
338
339        // Remove old files
340        for file in &files {
341            fs::remove_file(file)?;
342        }
343
344        // Rename consolidated file to final location
345        atomic::rename_into_place(&temp_path, &consolidated_path)?;
346
347        Ok(CompactStats {
348            partitions_compacted: 0,
349            sessions_compacted: 1,
350            files_before,
351            files_after: 1,
352            bytes_before,
353            bytes_after,
354        })
355    }
356
357    /// Compact files in a partition directory, grouped by session.
358    ///
359    /// Behavior depends on options:
360    /// - `consolidate`: Merge ALL files into single file per session
361    /// - `file_threshold`: Compact when > N non-compacted files
362    /// - `recompact_threshold`: Re-compact when > N compacted files exist
363    pub fn compact_partition_with_opts(
364        &self,
365        partition_dir: &Path,
366        opts: &CompactOptions,
367    ) -> Result<CompactStats> {
368        let mut total_stats = CompactStats::default();
369
370        // Group files by session, separating compacted from non-compacted
371        let mut session_files: HashMap<String, (Vec<PathBuf>, Vec<PathBuf>)> = HashMap::new();
372
373        for entry in fs::read_dir(partition_dir)? {
374            let entry = entry?;
375            let path = entry.path();
376            let name = entry.file_name().to_string_lossy().to_string();
377
378            // Skip non-parquet and seed files
379            if !name.ends_with(".parquet") || is_seed_file(&name) {
380                continue;
381            }
382
383            // Extract session from filename
384            if let Some(session) = extract_session(&name) {
385                // Apply session filter if specified
386                if let Some(ref filter) = opts.session_filter {
387                    if session != *filter {
388                        continue;
389                    }
390                }
391
392                let entry = session_files.entry(session).or_insert_with(|| (vec![], vec![]));
393                if is_compacted_file(&name) || name.starts_with("data_") {
394                    entry.1.push(path); // compacted/consolidated
395                } else {
396                    entry.0.push(path); // non-compacted
397                }
398            }
399        }
400
401        let mut any_compacted = false;
402        for (session, (non_compacted, compacted)) in session_files {
403            if opts.consolidate {
404                // Consolidate mode: merge ALL files into one
405                let all_files: Vec<PathBuf> = non_compacted.into_iter().chain(compacted).collect();
406                if all_files.len() >= 2 {
407                    let stats = self.consolidate_session_files(
408                        partition_dir,
409                        &session,
410                        all_files,
411                        opts.dry_run,
412                    )?;
413                    if stats.sessions_compacted > 0 {
414                        any_compacted = true;
415                        total_stats.add(&stats);
416                    }
417                }
418            } else {
419                // Regular compaction mode
420
421                // First: compact non-compacted files if threshold exceeded
422                if non_compacted.len() >= opts.file_threshold {
423                    let mut to_process = non_compacted;
424                    let stats = self.compact_session_files(
425                        partition_dir,
426                        &session,
427                        &mut to_process,
428                        opts.file_threshold,
429                        opts.dry_run,
430                    )?;
431                    if stats.sessions_compacted > 0 {
432                        any_compacted = true;
433                        total_stats.add(&stats);
434                    }
435                }
436
437                // Second: re-compact compacted files if recompact threshold exceeded
438                if opts.recompact_threshold > 0 && compacted.len() >= opts.recompact_threshold {
439                    let stats = self.consolidate_session_files(
440                        partition_dir,
441                        &session,
442                        compacted,
443                        opts.dry_run,
444                    )?;
445                    if stats.sessions_compacted > 0 {
446                        any_compacted = true;
447                        total_stats.add(&stats);
448                    }
449                }
450            }
451        }
452
453        if any_compacted {
454            total_stats.partitions_compacted = 1;
455        }
456
457        Ok(total_stats)
458    }
459
460    /// Compact files in a partition directory (legacy API).
461    pub fn compact_partition(
462        &self,
463        partition_dir: &Path,
464        file_threshold: usize,
465        session_filter: Option<&str>,
466        dry_run: bool,
467    ) -> Result<CompactStats> {
468        let opts = CompactOptions {
469            file_threshold,
470            recompact_threshold: 0, // Disable re-compaction for legacy API
471            consolidate: false,
472            dry_run,
473            session_filter: session_filter.map(|s| s.to_string()),
474        };
475        self.compact_partition_with_opts(partition_dir, &opts)
476    }
477
478    /// Compact all partitions in a data type directory (invocations, outputs, sessions).
479    pub fn compact_data_type(
480        &self,
481        data_dir: &Path,
482        file_threshold: usize,
483        session_filter: Option<&str>,
484        dry_run: bool,
485    ) -> Result<CompactStats> {
486        let mut total_stats = CompactStats::default();
487
488        if !data_dir.exists() {
489            return Ok(total_stats);
490        }
491
492        // Iterate over date partitions
493        for entry in fs::read_dir(data_dir)? {
494            let entry = entry?;
495            let path = entry.path();
496
497            if !path.is_dir() {
498                continue;
499            }
500
501            let stats = self.compact_partition(&path, file_threshold, session_filter, dry_run)?;
502            total_stats.add(&stats);
503        }
504
505        Ok(total_stats)
506    }
507
508    /// Compact recent data for a specific session (used by shell hooks).
509    ///
510    /// Checks all date partitions in recent data.
511    pub fn compact_for_session(
512        &self,
513        session_id: &str,
514        file_threshold: usize,
515        dry_run: bool,
516    ) -> Result<CompactStats> {
517        let mut total_stats = CompactStats::default();
518        let recent_dir = self.config().recent_dir();
519
520        for data_type in &["invocations", "outputs", "sessions", "events"] {
521            let data_dir = recent_dir.join(data_type);
522            let stats =
523                self.compact_data_type(&data_dir, file_threshold, Some(session_id), dry_run)?;
524            total_stats.add(&stats);
525        }
526
527        Ok(total_stats)
528    }
529
530    /// Fast compaction check for today's partition only (used by shell hooks).
531    ///
532    /// This is the most lightweight check - only looks at today's date partition.
533    pub fn compact_session_today(
534        &self,
535        session_id: &str,
536        file_threshold: usize,
537        dry_run: bool,
538    ) -> Result<CompactStats> {
539        let mut total_stats = CompactStats::default();
540        let recent_dir = self.config().recent_dir();
541        let today = Utc::now().date_naive();
542        let date_partition = format!("date={}", today.format("%Y-%m-%d"));
543
544        for data_type in &["invocations", "outputs", "sessions", "events"] {
545            let partition_dir = recent_dir.join(data_type).join(&date_partition);
546            if partition_dir.exists() {
547                let stats = self.compact_partition(
548                    &partition_dir,
549                    file_threshold,
550                    Some(session_id),
551                    dry_run,
552                )?;
553                total_stats.add(&stats);
554            }
555        }
556
557        Ok(total_stats)
558    }
559
560    /// Compact all recent data that exceeds the file threshold (global mode).
561    pub fn compact_recent(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
562        let mut total_stats = CompactStats::default();
563        let recent_dir = self.config().recent_dir();
564
565        for data_type in &["invocations", "outputs", "sessions", "events"] {
566            let data_dir = recent_dir.join(data_type);
567            let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
568            total_stats.add(&stats);
569        }
570
571        Ok(total_stats)
572    }
573
574    /// Compact all archive data that exceeds the file threshold.
575    pub fn compact_archive(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
576        let mut total_stats = CompactStats::default();
577        let archive_dir = self.config().archive_dir();
578
579        for data_type in &["invocations", "outputs", "sessions", "events"] {
580            let data_dir = archive_dir.join(data_type);
581            let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
582            total_stats.add(&stats);
583        }
584
585        Ok(total_stats)
586    }
587
588    /// Migrate old data from recent to archive with consolidation.
589    ///
590    /// Consolidates all files in each date partition into a single parquet file
591    /// in the archive, then removes the source files.
592    pub fn archive_old_data(&self, older_than_days: u32, dry_run: bool) -> Result<ArchiveStats> {
593        let mut stats = ArchiveStats::default();
594        let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);
595
596        let recent_dir = self.config().recent_dir();
597        let archive_dir = self.config().archive_dir();
598
599        for data_type in &["invocations", "outputs", "sessions", "events"] {
600            let recent_data_dir = recent_dir.join(data_type);
601            let archive_data_dir = archive_dir.join(data_type);
602
603            if !recent_data_dir.exists() {
604                continue;
605            }
606
607            // Collect partitions to archive
608            let partitions_to_archive: Vec<(NaiveDate, PathBuf, String)> = fs::read_dir(&recent_data_dir)?
609                .filter_map(|e| e.ok())
610                .filter_map(|e| {
611                    let path = e.path();
612                    if !path.is_dir() {
613                        return None;
614                    }
615                    let dir_name = e.file_name().to_string_lossy().to_string();
616                    let date_str = dir_name.strip_prefix("date=")?;
617                    let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d").ok()?;
618                    if date < cutoff_date {
619                        Some((date, path, dir_name))
620                    } else {
621                        None
622                    }
623                })
624                .collect();
625
626            for (_date, partition_path, dir_name) in partitions_to_archive {
627                let dest_dir = archive_data_dir.join(&dir_name);
628
629                // Count source stats
630                let source_files: Vec<PathBuf> = fs::read_dir(&partition_path)?
631                    .filter_map(|e| e.ok())
632                    .map(|e| e.path())
633                    .filter(|p| p.extension().map(|ext| ext == "parquet").unwrap_or(false))
634                    .collect();
635
636                if source_files.is_empty() {
637                    continue;
638                }
639
640                let file_count = source_files.len();
641                let bytes_before: u64 = source_files
642                    .iter()
643                    .filter_map(|p| fs::metadata(p).ok())
644                    .map(|m| m.len())
645                    .sum();
646
647                if dry_run {
648                    stats.partitions_archived += 1;
649                    stats.files_moved += file_count;
650                    stats.bytes_moved += bytes_before;
651                    continue;
652                }
653
654                // Skip if archive partition already exists with data
655                if dest_dir.exists() {
656                    let existing_files = fs::read_dir(&dest_dir)?
657                        .filter_map(|e| e.ok())
658                        .any(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false));
659                    if existing_files {
660                        // Already archived, skip
661                        continue;
662                    }
663                }
664
665                fs::create_dir_all(&dest_dir)?;
666
667                // Consolidate using DuckDB's COPY
668                let conn = self.connection()?;
669                let src_glob = format!("{}/*.parquet", partition_path.display());
670                let dest_file = dest_dir.join(format!("data_0.parquet"));
671                let temp_file = dest_dir.join(format!(".data_0.parquet.tmp"));
672
673                conn.execute(
674                    &format!(
675                        "COPY (SELECT * FROM read_parquet('{}', union_by_name = true)) \
676                         TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
677                        src_glob,
678                        temp_file.display()
679                    ),
680                    [],
681                )?;
682
683                // Atomic rename
684                fs::rename(&temp_file, &dest_file)?;
685
686                // Get consolidated size
687                let bytes_after = fs::metadata(&dest_file)?.len();
688
689                // Remove source files
690                for file in &source_files {
691                    let _ = fs::remove_file(file);
692                }
693                let _ = fs::remove_dir(&partition_path);
694
695                stats.partitions_archived += 1;
696                stats.files_moved += file_count;
697                stats.bytes_moved += bytes_after;
698            }
699        }
700
701        Ok(stats)
702    }
703
704    /// Run auto-compaction based on options.
705    pub fn auto_compact(&self, opts: &AutoCompactOptions) -> Result<(CompactStats, ArchiveStats)> {
706        // First, archive old data (skip if doing session-specific compaction)
707        let archive_stats = if opts.compact.session_filter.is_none() {
708            self.archive_old_data(opts.archive_days, opts.compact.dry_run)?
709        } else {
710            ArchiveStats::default()
711        };
712
713        // Then compact based on mode
714        let compact_stats = if let Some(ref session) = opts.compact.session_filter {
715            // Client-specific mode
716            self.compact_for_session_with_opts(session, &opts.compact)?
717        } else {
718            // Global mode: compact both recent and archive
719            let mut stats = self.compact_recent_with_opts(&opts.compact)?;
720            let archive_compact = self.compact_archive_with_opts(&opts.compact)?;
721            stats.add(&archive_compact);
722            stats
723        };
724
725        Ok((compact_stats, archive_stats))
726    }
727
728    /// Compact recent data with full options.
729    pub fn compact_recent_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
730        let mut total_stats = CompactStats::default();
731        let recent_dir = self.config().recent_dir();
732
733        for data_type in &["invocations", "outputs", "sessions", "events"] {
734            let data_dir = recent_dir.join(data_type);
735            let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
736            total_stats.add(&stats);
737        }
738
739        Ok(total_stats)
740    }
741
742    /// Compact archive data with full options.
743    pub fn compact_archive_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
744        let mut total_stats = CompactStats::default();
745        let archive_dir = self.config().archive_dir();
746
747        for data_type in &["invocations", "outputs", "sessions", "events"] {
748            let data_dir = archive_dir.join(data_type);
749            let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
750            total_stats.add(&stats);
751        }
752
753        Ok(total_stats)
754    }
755
756    /// Compact data type directory with full options.
757    pub fn compact_data_type_with_opts(
758        &self,
759        data_dir: &Path,
760        opts: &CompactOptions,
761    ) -> Result<CompactStats> {
762        let mut total_stats = CompactStats::default();
763
764        if !data_dir.exists() {
765            return Ok(total_stats);
766        }
767
768        for entry in fs::read_dir(data_dir)? {
769            let entry = entry?;
770            let path = entry.path();
771
772            if !path.is_dir() {
773                continue;
774            }
775
776            let stats = self.compact_partition_with_opts(&path, opts)?;
777            total_stats.add(&stats);
778        }
779
780        Ok(total_stats)
781    }
782
783    /// Compact files for a specific session with full options.
784    pub fn compact_for_session_with_opts(
785        &self,
786        session_id: &str,
787        opts: &CompactOptions,
788    ) -> Result<CompactStats> {
789        let mut total_stats = CompactStats::default();
790        let recent_dir = self.config().recent_dir();
791
792        let session_opts = CompactOptions {
793            session_filter: Some(session_id.to_string()),
794            ..opts.clone()
795        };
796
797        for data_type in &["invocations", "outputs", "sessions", "events"] {
798            let data_dir = recent_dir.join(data_type);
799            let stats = self.compact_data_type_with_opts(&data_dir, &session_opts)?;
800            total_stats.add(&stats);
801        }
802
803        Ok(total_stats)
804    }
805}
806
807#[cfg(test)]
808mod tests {
809    use super::*;
810    use crate::init::initialize;
811    use crate::schema::InvocationRecord;
812    use crate::Config;
813    use tempfile::TempDir;
814
815    fn setup_store() -> (TempDir, Store) {
816        let tmp = TempDir::new().unwrap();
817        let config = Config::with_root(tmp.path());
818        initialize(&config).unwrap();
819        let store = Store::open(config).unwrap();
820        (tmp, store)
821    }
822
823    #[test]
824    fn test_extract_session() {
825        assert_eq!(
826            extract_session("zsh-1234--make--uuid.parquet"),
827            Some("zsh-1234".to_string())
828        );
829        assert_eq!(
830            extract_session("session-id.parquet"),
831            Some("session-id".to_string())
832        );
833        assert_eq!(
834            extract_session("zsh-1234--__compacted-0__--uuid.parquet"),
835            Some("zsh-1234".to_string())
836        );
837    }
838
839    #[test]
840    fn test_is_compacted_file() {
841        assert!(is_compacted_file("zsh-1234--__compacted-0__--uuid.parquet"));
842        assert!(is_compacted_file("session--__compacted-5__--abc.parquet"));
843        assert!(!is_compacted_file("zsh-1234--make--uuid.parquet"));
844        assert!(!is_compacted_file("session.parquet"));
845    }
846
847    #[test]
848    fn test_extract_compaction_number() {
849        assert_eq!(
850            extract_compaction_number("zsh--__compacted-0__--uuid.parquet"),
851            Some(0)
852        );
853        assert_eq!(
854            extract_compaction_number("zsh--__compacted-42__--uuid.parquet"),
855            Some(42)
856        );
857        assert_eq!(
858            extract_compaction_number("zsh--make--uuid.parquet"),
859            None
860        );
861    }
862
863    #[test]
864    fn test_compact_recent_no_files() {
865        let (_tmp, store) = setup_store();
866
867        let stats = store.compact_recent(2, false).unwrap();
868        assert_eq!(stats.partitions_compacted, 0);
869    }
870
871    #[test]
872    fn test_compact_recent_with_files() {
873        let (_tmp, store) = setup_store();
874
875        // Write multiple invocations to create multiple files
876        for i in 0..5 {
877            let record = InvocationRecord::new(
878                "test-session",
879                format!("command-{}", i),
880                "/home/user",
881                0,
882                "test@client",
883            );
884            store.write_invocation(&record).unwrap();
885        }
886
887        // With threshold of 2, should compact oldest 3 (keeping 2)
888        let stats = store.compact_recent(2, false).unwrap();
889        assert_eq!(stats.sessions_compacted, 1);
890        assert_eq!(stats.files_before, 3); // 5 - 2 kept = 3 compacted
891        assert_eq!(stats.files_after, 1);
892    }
893
894    #[test]
895    fn test_compact_for_session() {
896        let (_tmp, store) = setup_store();
897
898        // Write files for two different sessions
899        for i in 0..5 {
900            let record = InvocationRecord::new(
901                "session-a",
902                format!("command-{}", i),
903                "/home/user",
904                0,
905                "test@client",
906            );
907            store.write_invocation(&record).unwrap();
908        }
909        for i in 0..3 {
910            let record = InvocationRecord::new(
911                "session-b",
912                format!("command-{}", i),
913                "/home/user",
914                0,
915                "test@client",
916            );
917            store.write_invocation(&record).unwrap();
918        }
919
920        // Compact only session-a with threshold of 2
921        let stats = store.compact_for_session("session-a", 2, false).unwrap();
922        assert_eq!(stats.sessions_compacted, 1);
923        assert_eq!(stats.files_before, 3); // 5 - 2 kept = 3
924
925        // session-b should be untouched (only 3 files, below threshold)
926        let date = chrono::Utc::now().date_naive();
927        let inv_dir = store.config().invocations_dir(&date);
928        let session_b_count = std::fs::read_dir(&inv_dir)
929            .unwrap()
930            .filter_map(|e| e.ok())
931            .filter(|e| e.file_name().to_string_lossy().starts_with("session-b"))
932            .count();
933        assert_eq!(session_b_count, 3);
934    }
935
936    #[test]
937    fn test_compact_dry_run() {
938        let (_tmp, store) = setup_store();
939
940        // Write multiple invocations
941        for i in 0..5 {
942            let record = InvocationRecord::new(
943                "test-session",
944                format!("command-{}", i),
945                "/home/user",
946                0,
947                "test@client",
948            );
949            store.write_invocation(&record).unwrap();
950        }
951
952        // Dry run should report stats but not actually compact
953        let stats = store.compact_recent(2, true).unwrap();
954        assert_eq!(stats.sessions_compacted, 1);
955        assert_eq!(stats.files_before, 3);
956
957        // Files should still be there
958        let date = chrono::Utc::now().date_naive();
959        let inv_dir = store.config().invocations_dir(&date);
960        let file_count = std::fs::read_dir(&inv_dir)
961            .unwrap()
962            .filter_map(|e| e.ok())
963            .filter(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false))
964            .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
965            .count();
966        assert_eq!(file_count, 5);
967    }
968
969    #[test]
970    fn test_compacted_file_naming() {
971        let (_tmp, store) = setup_store();
972
973        // Write enough files to trigger compaction
974        for i in 0..5 {
975            let record = InvocationRecord::new(
976                "zsh-9999",
977                format!("cmd-{}", i),
978                "/home/user",
979                0,
980                "test@client",
981            );
982            store.write_invocation(&record).unwrap();
983        }
984
985        // Compact with threshold of 2
986        store.compact_recent(2, false).unwrap();
987
988        // Check that compacted file has correct naming
989        let date = chrono::Utc::now().date_naive();
990        let inv_dir = store.config().invocations_dir(&date);
991        let compacted_files: Vec<_> = std::fs::read_dir(&inv_dir)
992            .unwrap()
993            .filter_map(|e| e.ok())
994            .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
995            .collect();
996
997        assert_eq!(compacted_files.len(), 1);
998        let name = compacted_files[0].file_name().to_string_lossy().to_string();
999        assert!(name.starts_with("zsh-9999--__compacted-0__--"));
1000        assert!(name.ends_with(".parquet"));
1001    }
1002}