1use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10
11use chrono::{NaiveDate, Utc};
12use uuid::Uuid;
13
14use super::atomic;
15use super::Store;
16use crate::Result;
17
/// Returns `true` if `name` contains the `__compacted-` marker that compaction puts into
/// the names of the files it produces.
fn is_compacted_file(name: &str) -> bool {
    const COMPACTED_MARKER: &str = "__compacted-";
    name.contains(COMPACTED_MARKER)
}
22
/// Returns `true` for file names beginning with `_seed`; partition compaction skips them.
fn is_seed_file(name: &str) -> bool {
    const SEED_PREFIX: &str = "_seed";
    name.starts_with(SEED_PREFIX)
}
27
/// Extracts the session id from a parquet file name.
///
/// The session is the part of the stem (the name without `.parquet`) that precedes the
/// first `--`. A stem without `--` is returned whole. Names that do not end in
/// `.parquet` yield `None`.
fn extract_session(name: &str) -> Option<String> {
    name.strip_suffix(".parquet").map(|stem| {
        // `split` always yields at least one piece: the text before the first separator.
        stem.split("--").next().unwrap_or(stem).to_owned()
    })
}
44
/// Parses the compaction sequence number out of a compacted file name.
///
/// Expects the number to sit between the `__compacted-` marker and the next `__`, as in
/// `zsh--__compacted-42__--uuid.parquet` (which yields `Some(42)`). Returns `None` in
/// three cases: the marker is absent, no closing `__` follows it, or the text in between
/// is not a valid `u32`.
fn extract_compaction_number(name: &str) -> Option<u32> {
    // Derive the offset from the marker itself instead of a hand-counted length.
    const MARKER: &str = "__compacted-";
    let start = name.find(MARKER)?;
    let after_marker = &name[start + MARKER.len()..];
    let end = after_marker.find("__")?;
    after_marker[..end].parse().ok()
}
58
59fn next_compaction_number(partition_dir: &Path, session: &str) -> u32 {
61 fs::read_dir(partition_dir)
62 .ok()
63 .map(|entries| {
64 entries
65 .filter_map(|e| e.ok())
66 .filter_map(|e| {
67 let name = e.file_name().to_string_lossy().to_string();
68 if name.starts_with(&format!("{}--__compacted-", session)) {
70 extract_compaction_number(&name)
71 } else {
72 None
73 }
74 })
75 .max()
76 .map(|n| n + 1)
77 .unwrap_or(0)
78 })
79 .unwrap_or(0)
80}
81
/// Returns the modification time of `path` in whole seconds since the Unix epoch.
///
/// Returns 0 in two cases: the metadata or mtime cannot be read, or the mtime is
/// earlier than the epoch.
fn file_mtime(path: &Path) -> u64 {
    let modified = match fs::metadata(path).and_then(|meta| meta.modified()) {
        Ok(time) => time,
        Err(_) => return 0,
    };
    modified
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_secs())
        .unwrap_or(0)
}
89
/// Counters reported by a compaction pass.
///
/// Results from several passes are combined with [`CompactStats::add`].
#[derive(Default, Debug)]
pub struct CompactStats {
    /// Partitions in which at least one merge was performed.
    pub partitions_compacted: usize,
    /// Merge operations performed; a single session can account for more than one.
    pub sessions_compacted: usize,
    /// Input files that were (or, in a dry run, would be) merged.
    pub files_before: usize,
    /// Output files produced by those merges (one per merge).
    pub files_after: usize,
    /// Total size of the merged input files, in bytes.
    pub bytes_before: u64,
    /// Total size of the produced files, in bytes; equals `bytes_before` in a dry run.
    pub bytes_after: u64,
}

impl CompactStats {
    /// Accumulates every counter of `other` into `self`.
    pub fn add(&mut self, other: &CompactStats) {
        let CompactStats {
            partitions_compacted,
            sessions_compacted,
            files_before,
            files_after,
            bytes_before,
            bytes_after,
        } = *other;

        self.partitions_compacted += partitions_compacted;
        self.sessions_compacted += sessions_compacted;
        self.files_before += files_before;
        self.files_after += files_after;
        self.bytes_before += bytes_before;
        self.bytes_after += bytes_after;
    }
}
111
/// Counters reported by an archiving pass.
#[derive(Default, Debug)]
pub struct ArchiveStats {
    /// Date partitions that were (or, in a dry run, would be) archived.
    pub partitions_archived: usize,
    /// Source parquet files merged into the archive.
    pub files_moved: usize,
    /// Bytes written to the archive; in a dry run, the size of the source files instead.
    pub bytes_moved: u64,
}
119
/// Tuning knobs for partition compaction.
#[derive(Clone, Debug)]
pub struct CompactOptions {
    /// A session is compacted once it has at least this many non-compacted files; the
    /// newest `file_threshold` of them are left untouched.
    pub file_threshold: usize,
    /// A session with at least this many already-compacted files has them merged into
    /// one file. `0` disables this step.
    pub recompact_threshold: usize,
    /// Merge all files of each session (compacted or not) into one file, instead of
    /// applying the two threshold-based steps.
    pub consolidate: bool,
    /// Report what would happen without touching any files.
    pub dry_run: bool,
    /// Only compact files belonging to this session id.
    pub session_filter: Option<String>,
}

impl Default for CompactOptions {
    /// Threshold 50, recompaction at 10 compacted files, no consolidation, real run, all
    /// sessions.
    fn default() -> Self {
        CompactOptions {
            session_filter: None,
            dry_run: false,
            consolidate: false,
            recompact_threshold: 10,
            file_threshold: 50,
        }
    }
}
147
148#[derive(Debug)]
150pub struct AutoCompactOptions {
151 pub compact: CompactOptions,
153 pub archive_days: u32,
155}
156
157impl Default for AutoCompactOptions {
158 fn default() -> Self {
159 Self {
160 compact: CompactOptions::default(),
161 archive_days: 14,
162 }
163 }
164}
165
/// Options for [`Store::clean`].
#[derive(Clone, Debug)]
pub struct CleanOptions {
    /// Passed to both the orphan-recovery and the prune steps as their dry-run flag.
    pub dry_run: bool,
    /// Passed to `recover_orphaned_invocations`. NOTE(review): presumably the age after
    /// which a pending invocation counts as orphaned — confirm.
    pub max_age_hours: u32,
    /// Also prune old archive partitions.
    pub prune: bool,
    /// When `prune` is set, archive partitions dated this many days ago or earlier are
    /// deleted.
    pub older_than_days: u32,
}

impl Default for CleanOptions {
    /// 24-hour orphan age, no pruning (365-day cutoff if enabled), real run.
    fn default() -> Self {
        CleanOptions {
            older_than_days: 365,
            prune: false,
            max_age_hours: 24,
            dry_run: false,
        }
    }
}
189
/// Counters reported by [`Store::clean`].
#[derive(Default, Debug)]
pub struct CleanStats {
    /// Copied from the orphan-recovery result (`pending_checked`).
    pub pending_checked: usize,
    /// Copied from the orphan-recovery result (`still_running`).
    pub still_running: usize,
    /// Copied from the orphan-recovery result (`orphaned`).
    pub orphaned: usize,
    /// Archive files pruned (0 when pruning is disabled).
    pub pruned_files: usize,
    /// Bytes freed by pruning (0 when pruning is disabled).
    pub bytes_freed: u64,
}
204
205impl Store {
206 fn compact_session_files(
211 &self,
212 partition_dir: &Path,
213 session: &str,
214 files: &mut Vec<PathBuf>,
215 keep_count: usize,
216 dry_run: bool,
217 ) -> Result<CompactStats> {
218 files.sort_by_key(|p| file_mtime(p));
220
221 let to_keep = files.len().saturating_sub(keep_count).max(0);
223 if to_keep < 2 {
224 return Ok(CompactStats::default());
226 }
227
228 let files_to_compact: Vec<PathBuf> = files.drain(..to_keep).collect();
229 let files_before = files_to_compact.len();
230 let bytes_before: u64 = files_to_compact
231 .iter()
232 .filter_map(|p| fs::metadata(p).ok())
233 .map(|m| m.len())
234 .sum();
235
236 if dry_run {
237 return Ok(CompactStats {
238 partitions_compacted: 0, sessions_compacted: 1,
240 files_before,
241 files_after: 1,
242 bytes_before,
243 bytes_after: bytes_before, });
245 }
246
247 let conn = self.connection_with_options(false)?;
249
250 let file_list: Vec<String> = files_to_compact
252 .iter()
253 .map(|p| p.display().to_string())
254 .collect();
255 let file_list_sql = file_list
256 .iter()
257 .map(|f| format!("'{}'", f))
258 .collect::<Vec<_>>()
259 .join(", ");
260
261 conn.execute(
263 &format!(
264 "CREATE OR REPLACE TEMP TABLE compact_temp AS
265 SELECT * FROM read_parquet([{}], union_by_name = true)",
266 file_list_sql
267 ),
268 [],
269 )?;
270
271 let seq_num = next_compaction_number(partition_dir, session);
273 let uuid = Uuid::now_v7();
274 let compacted_name = format!("{}--__compacted-{}__--{}.parquet", session, seq_num, uuid);
275 let compacted_path = partition_dir.join(&compacted_name);
276 let temp_path = atomic::temp_path(&compacted_path);
277
278 conn.execute(
279 &format!(
280 "COPY compact_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
281 temp_path.display()
282 ),
283 [],
284 )?;
285
286 conn.execute("DROP TABLE compact_temp", [])?;
287
288 let bytes_after = fs::metadata(&temp_path)?.len();
290
291 for file in &files_to_compact {
293 fs::remove_file(file)?;
294 }
295
296 atomic::rename_into_place(&temp_path, &compacted_path)?;
298
299 Ok(CompactStats {
300 partitions_compacted: 0, sessions_compacted: 1,
302 files_before,
303 files_after: 1,
304 bytes_before,
305 bytes_after,
306 })
307 }
308
309 fn consolidate_session_files(
314 &self,
315 partition_dir: &Path,
316 _session: &str,
317 files: Vec<PathBuf>,
318 dry_run: bool,
319 ) -> Result<CompactStats> {
320 if files.len() < 2 {
321 return Ok(CompactStats::default());
322 }
323
324 let files_before = files.len();
325 let bytes_before: u64 = files
326 .iter()
327 .filter_map(|p| fs::metadata(p).ok())
328 .map(|m| m.len())
329 .sum();
330
331 if dry_run {
332 return Ok(CompactStats {
333 partitions_compacted: 0,
334 sessions_compacted: 1,
335 files_before,
336 files_after: 1,
337 bytes_before,
338 bytes_after: bytes_before,
339 });
340 }
341
342 let conn = self.connection_with_options(false)?;
344
345 let file_list_sql = files
347 .iter()
348 .map(|p| format!("'{}'", p.display()))
349 .collect::<Vec<_>>()
350 .join(", ");
351
352 conn.execute(
354 &format!(
355 "CREATE OR REPLACE TEMP TABLE consolidate_temp AS
356 SELECT * FROM read_parquet([{}], union_by_name = true)",
357 file_list_sql
358 ),
359 [],
360 )?;
361
362 let uuid = Uuid::now_v7();
364 let consolidated_name = format!("data_{}.parquet", uuid);
365 let consolidated_path = partition_dir.join(&consolidated_name);
366 let temp_path = atomic::temp_path(&consolidated_path);
367
368 conn.execute(
369 &format!(
370 "COPY consolidate_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
371 temp_path.display()
372 ),
373 [],
374 )?;
375
376 conn.execute("DROP TABLE consolidate_temp", [])?;
377
378 let bytes_after = fs::metadata(&temp_path)?.len();
379
380 for file in &files {
382 fs::remove_file(file)?;
383 }
384
385 atomic::rename_into_place(&temp_path, &consolidated_path)?;
387
388 Ok(CompactStats {
389 partitions_compacted: 0,
390 sessions_compacted: 1,
391 files_before,
392 files_after: 1,
393 bytes_before,
394 bytes_after,
395 })
396 }
397
398 pub fn compact_partition_with_opts(
405 &self,
406 partition_dir: &Path,
407 opts: &CompactOptions,
408 ) -> Result<CompactStats> {
409 let mut total_stats = CompactStats::default();
410
411 let mut session_files: HashMap<String, (Vec<PathBuf>, Vec<PathBuf>)> = HashMap::new();
413
414 for entry in fs::read_dir(partition_dir)? {
415 let entry = entry?;
416 let path = entry.path();
417 let name = entry.file_name().to_string_lossy().to_string();
418
419 if !name.ends_with(".parquet") || is_seed_file(&name) {
421 continue;
422 }
423
424 if let Some(session) = extract_session(&name) {
426 if let Some(ref filter) = opts.session_filter {
428 if session != *filter {
429 continue;
430 }
431 }
432
433 let entry = session_files.entry(session).or_insert_with(|| (vec![], vec![]));
434 if is_compacted_file(&name) || name.starts_with("data_") {
435 entry.1.push(path); } else {
437 entry.0.push(path); }
439 }
440 }
441
442 let mut any_compacted = false;
443 for (session, (non_compacted, compacted)) in session_files {
444 if opts.consolidate {
445 let all_files: Vec<PathBuf> = non_compacted.into_iter().chain(compacted).collect();
447 if all_files.len() >= 2 {
448 let stats = self.consolidate_session_files(
449 partition_dir,
450 &session,
451 all_files,
452 opts.dry_run,
453 )?;
454 if stats.sessions_compacted > 0 {
455 any_compacted = true;
456 total_stats.add(&stats);
457 }
458 }
459 } else {
460 if non_compacted.len() >= opts.file_threshold {
464 let mut to_process = non_compacted;
465 let stats = self.compact_session_files(
466 partition_dir,
467 &session,
468 &mut to_process,
469 opts.file_threshold,
470 opts.dry_run,
471 )?;
472 if stats.sessions_compacted > 0 {
473 any_compacted = true;
474 total_stats.add(&stats);
475 }
476 }
477
478 if opts.recompact_threshold > 0 && compacted.len() >= opts.recompact_threshold {
480 let stats = self.consolidate_session_files(
481 partition_dir,
482 &session,
483 compacted,
484 opts.dry_run,
485 )?;
486 if stats.sessions_compacted > 0 {
487 any_compacted = true;
488 total_stats.add(&stats);
489 }
490 }
491 }
492 }
493
494 if any_compacted {
495 total_stats.partitions_compacted = 1;
496 }
497
498 Ok(total_stats)
499 }
500
501 pub fn compact_partition(
503 &self,
504 partition_dir: &Path,
505 file_threshold: usize,
506 session_filter: Option<&str>,
507 dry_run: bool,
508 ) -> Result<CompactStats> {
509 let opts = CompactOptions {
510 file_threshold,
511 recompact_threshold: 0, consolidate: false,
513 dry_run,
514 session_filter: session_filter.map(|s| s.to_string()),
515 };
516 self.compact_partition_with_opts(partition_dir, &opts)
517 }
518
519 pub fn compact_data_type(
521 &self,
522 data_dir: &Path,
523 file_threshold: usize,
524 session_filter: Option<&str>,
525 dry_run: bool,
526 ) -> Result<CompactStats> {
527 let mut total_stats = CompactStats::default();
528
529 if !data_dir.exists() {
530 return Ok(total_stats);
531 }
532
533 for entry in fs::read_dir(data_dir)? {
535 let entry = entry?;
536 let path = entry.path();
537
538 if !path.is_dir() {
539 continue;
540 }
541
542 let dir_name = entry.file_name().to_string_lossy().to_string();
543
544 if dir_name.starts_with("status=") {
546 let stats =
548 self.compact_data_type(&path, file_threshold, session_filter, dry_run)?;
549 total_stats.add(&stats);
550 } else {
551 let stats =
553 self.compact_partition(&path, file_threshold, session_filter, dry_run)?;
554 total_stats.add(&stats);
555 }
556 }
557
558 Ok(total_stats)
559 }
560
561 pub fn compact_for_session(
565 &self,
566 session_id: &str,
567 file_threshold: usize,
568 dry_run: bool,
569 ) -> Result<CompactStats> {
570 let mut total_stats = CompactStats::default();
571 let recent_dir = self.config().recent_dir();
572
573 for data_type in &["invocations", "outputs", "sessions", "events"] {
574 let data_dir = recent_dir.join(data_type);
575 let stats =
576 self.compact_data_type(&data_dir, file_threshold, Some(session_id), dry_run)?;
577 total_stats.add(&stats);
578 }
579
580 Ok(total_stats)
581 }
582
583 pub fn compact_session_today(
587 &self,
588 session_id: &str,
589 file_threshold: usize,
590 dry_run: bool,
591 ) -> Result<CompactStats> {
592 let mut total_stats = CompactStats::default();
593 let recent_dir = self.config().recent_dir();
594 let today = Utc::now().date_naive();
595 let date_partition = format!("date={}", today.format("%Y-%m-%d"));
596
597 for data_type in &["invocations", "outputs", "sessions", "events"] {
598 let partition_dir = recent_dir.join(data_type).join(&date_partition);
599 if partition_dir.exists() {
600 let stats = self.compact_partition(
601 &partition_dir,
602 file_threshold,
603 Some(session_id),
604 dry_run,
605 )?;
606 total_stats.add(&stats);
607 }
608 }
609
610 Ok(total_stats)
611 }
612
613 pub fn compact_recent(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
615 let mut total_stats = CompactStats::default();
616 let recent_dir = self.config().recent_dir();
617
618 for data_type in &["invocations", "outputs", "sessions", "events"] {
619 let data_dir = recent_dir.join(data_type);
620 let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
621 total_stats.add(&stats);
622 }
623
624 Ok(total_stats)
625 }
626
627 pub fn compact_archive(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
629 let mut total_stats = CompactStats::default();
630 let archive_dir = self.config().archive_dir();
631
632 for data_type in &["invocations", "outputs", "sessions", "events"] {
633 let data_dir = archive_dir.join(data_type);
634 let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
635 total_stats.add(&stats);
636 }
637
638 Ok(total_stats)
639 }
640
641 pub fn archive_old_data(&self, older_than_days: u32, dry_run: bool) -> Result<ArchiveStats> {
646 let mut stats = ArchiveStats::default();
647 let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);
648
649 let recent_dir = self.config().recent_dir();
650 let archive_dir = self.config().archive_dir();
651
652 for data_type in &["invocations", "outputs", "sessions", "events"] {
653 let recent_data_dir = recent_dir.join(data_type);
654 let archive_data_dir = archive_dir.join(data_type);
655
656 if !recent_data_dir.exists() {
657 continue;
658 }
659
660 let seed_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
664
665 let date_partition_parent = if *data_type == "invocations" {
668 recent_data_dir.join("status=completed")
669 } else {
670 recent_data_dir.clone()
671 };
672
673 if !date_partition_parent.exists() {
674 continue;
675 }
676
677 let partitions_to_archive: Vec<(NaiveDate, PathBuf, String)> = fs::read_dir(&date_partition_parent)?
678 .filter_map(|e| e.ok())
679 .filter_map(|e| {
680 let path = e.path();
681 if !path.is_dir() {
682 return None;
683 }
684 let dir_name = e.file_name().to_string_lossy().to_string();
685 let date_str = dir_name.strip_prefix("date=")?;
686 let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d").ok()?;
687 if date == seed_date {
689 return None;
690 }
691 if date <= cutoff_date {
692 Some((date, path, dir_name))
693 } else {
694 None
695 }
696 })
697 .collect();
698
699 for (_date, partition_path, dir_name) in partitions_to_archive {
700 let dest_dir = archive_data_dir.join(&dir_name);
701
702 let source_files: Vec<PathBuf> = fs::read_dir(&partition_path)?
704 .filter_map(|e| e.ok())
705 .map(|e| e.path())
706 .filter(|p| p.extension().map(|ext| ext == "parquet").unwrap_or(false))
707 .collect();
708
709 if source_files.is_empty() {
710 continue;
711 }
712
713 let file_count = source_files.len();
714 let bytes_before: u64 = source_files
715 .iter()
716 .filter_map(|p| fs::metadata(p).ok())
717 .map(|m| m.len())
718 .sum();
719
720 if dry_run {
721 stats.partitions_archived += 1;
722 stats.files_moved += file_count;
723 stats.bytes_moved += bytes_before;
724 continue;
725 }
726
727 if dest_dir.exists() {
729 let existing_files = fs::read_dir(&dest_dir)?
730 .filter_map(|e| e.ok())
731 .any(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false));
732 if existing_files {
733 continue;
735 }
736 }
737
738 fs::create_dir_all(&dest_dir)?;
739
740 let conn = self.connection_with_options(false)?;
744 let src_glob = format!("{}/*.parquet", partition_path.display());
745 let dest_file = dest_dir.join("data_0.parquet");
746 let temp_file = dest_dir.join(".data_0.parquet.tmp");
747
748 conn.execute(
749 &format!(
750 "COPY (SELECT * FROM read_parquet('{}', union_by_name = true)) \
751 TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
752 src_glob,
753 temp_file.display()
754 ),
755 [],
756 )?;
757
758 fs::rename(&temp_file, &dest_file)?;
760
761 let bytes_after = fs::metadata(&dest_file)?.len();
763
764 for file in &source_files {
766 let _ = fs::remove_file(file);
767 }
768 let _ = fs::remove_dir(&partition_path);
769
770 stats.partitions_archived += 1;
771 stats.files_moved += file_count;
772 stats.bytes_moved += bytes_after;
773 }
774 }
775
776 Ok(stats)
777 }
778
779 pub fn auto_compact(&self, opts: &AutoCompactOptions) -> Result<(CompactStats, ArchiveStats)> {
781 let archive_stats = if opts.compact.session_filter.is_none() {
783 self.archive_old_data(opts.archive_days, opts.compact.dry_run)?
784 } else {
785 ArchiveStats::default()
786 };
787
788 let compact_stats = if let Some(ref session) = opts.compact.session_filter {
790 self.compact_for_session_with_opts(session, &opts.compact)?
792 } else {
793 let mut stats = self.compact_recent_with_opts(&opts.compact)?;
795 let archive_compact = self.compact_archive_with_opts(&opts.compact)?;
796 stats.add(&archive_compact);
797 stats
798 };
799
800 Ok((compact_stats, archive_stats))
801 }
802
803 pub fn compact_recent_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
805 let mut total_stats = CompactStats::default();
806 let recent_dir = self.config().recent_dir();
807
808 for data_type in &["invocations", "outputs", "sessions", "events"] {
809 let data_dir = recent_dir.join(data_type);
810 let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
811 total_stats.add(&stats);
812 }
813
814 Ok(total_stats)
815 }
816
817 pub fn compact_archive_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
819 let mut total_stats = CompactStats::default();
820 let archive_dir = self.config().archive_dir();
821
822 for data_type in &["invocations", "outputs", "sessions", "events"] {
823 let data_dir = archive_dir.join(data_type);
824 let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
825 total_stats.add(&stats);
826 }
827
828 Ok(total_stats)
829 }
830
831 pub fn compact_data_type_with_opts(
833 &self,
834 data_dir: &Path,
835 opts: &CompactOptions,
836 ) -> Result<CompactStats> {
837 let mut total_stats = CompactStats::default();
838
839 if !data_dir.exists() {
840 return Ok(total_stats);
841 }
842
843 for entry in fs::read_dir(data_dir)? {
844 let entry = entry?;
845 let path = entry.path();
846
847 if !path.is_dir() {
848 continue;
849 }
850
851 let dir_name = entry.file_name().to_string_lossy().to_string();
852
853 if dir_name.starts_with("status=") {
855 let stats = self.compact_data_type_with_opts(&path, opts)?;
857 total_stats.add(&stats);
858 } else {
859 let stats = self.compact_partition_with_opts(&path, opts)?;
861 total_stats.add(&stats);
862 }
863 }
864
865 Ok(total_stats)
866 }
867
868 pub fn compact_for_session_with_opts(
870 &self,
871 session_id: &str,
872 opts: &CompactOptions,
873 ) -> Result<CompactStats> {
874 let mut total_stats = CompactStats::default();
875 let recent_dir = self.config().recent_dir();
876
877 let session_opts = CompactOptions {
878 session_filter: Some(session_id.to_string()),
879 ..opts.clone()
880 };
881
882 for data_type in &["invocations", "outputs", "sessions", "events"] {
883 let data_dir = recent_dir.join(data_type);
884 let stats = self.compact_data_type_with_opts(&data_dir, &session_opts)?;
885 total_stats.add(&stats);
886 }
887
888 Ok(total_stats)
889 }
890
891 pub fn clean(&self, opts: &CleanOptions) -> Result<CleanStats> {
898 let mut stats = CleanStats::default();
899
900 let recovery_stats =
902 self.recover_orphaned_invocations(opts.max_age_hours, opts.dry_run)?;
903
904 stats.pending_checked = recovery_stats.pending_checked;
905 stats.still_running = recovery_stats.still_running;
906 stats.orphaned = recovery_stats.orphaned;
907
908 if opts.prune {
910 let prune_stats = self.prune_archive(opts.older_than_days, opts.dry_run)?;
911 stats.pruned_files = prune_stats.files_pruned;
912 stats.bytes_freed = prune_stats.bytes_freed;
913 }
914
915 Ok(stats)
916 }
917
918 pub fn prune_archive(&self, older_than_days: u32, dry_run: bool) -> Result<PruneStats> {
922 let mut stats = PruneStats::default();
923 let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);
924 let archive_dir = self.config().archive_dir();
925
926 for data_type in &["invocations", "outputs", "sessions", "events"] {
927 let data_dir = archive_dir.join(data_type);
928 if !data_dir.exists() {
929 continue;
930 }
931
932 let partition_stats =
934 self.prune_date_partitions(&data_dir, cutoff_date, dry_run)?;
935 stats.add(&partition_stats);
936 }
937
938 Ok(stats)
939 }
940
941 fn prune_date_partitions(
943 &self,
944 parent_dir: &Path,
945 cutoff_date: NaiveDate,
946 dry_run: bool,
947 ) -> Result<PruneStats> {
948 let mut stats = PruneStats::default();
949 let seed_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
950
951 for entry in fs::read_dir(parent_dir)?.flatten() {
952 let path = entry.path();
953 if !path.is_dir() {
954 continue;
955 }
956
957 let dir_name = entry.file_name().to_string_lossy().to_string();
958 let date_str = match dir_name.strip_prefix("date=") {
959 Some(s) => s,
960 None => continue,
961 };
962
963 let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
964 Ok(d) => d,
965 Err(_) => continue,
966 };
967
968 if date == seed_date {
970 continue;
971 }
972
973 if date > cutoff_date {
975 continue;
976 }
977
978 for file_entry in fs::read_dir(&path)?.flatten() {
980 let file_path = file_entry.path();
981 if file_path.extension().map(|e| e == "parquet").unwrap_or(false) {
982 if let Ok(metadata) = fs::metadata(&file_path) {
983 stats.files_pruned += 1;
984 stats.bytes_freed += metadata.len();
985 }
986
987 if !dry_run {
988 let _ = fs::remove_file(&file_path);
989 }
990 }
991 }
992
993 if !dry_run {
995 let _ = fs::remove_dir(&path);
996 }
997 }
998
999 Ok(stats)
1000 }
1001}
1002
/// Counters reported by archive pruning.
#[derive(Default, Debug)]
pub struct PruneStats {
    /// Parquet files deleted (or, in a dry run, that would be deleted).
    pub files_pruned: usize,
    /// Total size of those files, in bytes.
    pub bytes_freed: u64,
}

impl PruneStats {
    /// Accumulates the counters of `other` into `self`.
    fn add(&mut self, other: &PruneStats) {
        let PruneStats {
            files_pruned,
            bytes_freed,
        } = *other;
        self.files_pruned += files_pruned;
        self.bytes_freed += bytes_freed;
    }
}
1018
1019#[cfg(test)]
1020mod tests {
1021 use super::*;
1022 use crate::init::initialize;
1023 use crate::schema::InvocationRecord;
1024 use crate::Config;
1025 use tempfile::TempDir;
1026
1027 fn setup_store() -> (TempDir, Store) {
1028 let tmp = TempDir::new().unwrap();
1029 let config = Config::with_root(tmp.path());
1030 initialize(&config).unwrap();
1031 let store = Store::open(config).unwrap();
1032 (tmp, store)
1033 }
1034
1035 fn setup_store_duckdb() -> (TempDir, Store) {
1036 let tmp = TempDir::new().unwrap();
1037 let config = Config::with_duckdb_mode(tmp.path());
1038 initialize(&config).unwrap();
1039 let store = Store::open(config).unwrap();
1040 (tmp, store)
1041 }
1042
1043 #[test]
1044 fn test_extract_session() {
1045 assert_eq!(
1046 extract_session("zsh-1234--make--uuid.parquet"),
1047 Some("zsh-1234".to_string())
1048 );
1049 assert_eq!(
1050 extract_session("session-id.parquet"),
1051 Some("session-id".to_string())
1052 );
1053 assert_eq!(
1054 extract_session("zsh-1234--__compacted-0__--uuid.parquet"),
1055 Some("zsh-1234".to_string())
1056 );
1057 }
1058
1059 #[test]
1060 fn test_is_compacted_file() {
1061 assert!(is_compacted_file("zsh-1234--__compacted-0__--uuid.parquet"));
1062 assert!(is_compacted_file("session--__compacted-5__--abc.parquet"));
1063 assert!(!is_compacted_file("zsh-1234--make--uuid.parquet"));
1064 assert!(!is_compacted_file("session.parquet"));
1065 }
1066
1067 #[test]
1068 fn test_extract_compaction_number() {
1069 assert_eq!(
1070 extract_compaction_number("zsh--__compacted-0__--uuid.parquet"),
1071 Some(0)
1072 );
1073 assert_eq!(
1074 extract_compaction_number("zsh--__compacted-42__--uuid.parquet"),
1075 Some(42)
1076 );
1077 assert_eq!(
1078 extract_compaction_number("zsh--make--uuid.parquet"),
1079 None
1080 );
1081 }
1082
1083 #[test]
1084 fn test_compact_recent_no_files() {
1085 let (_tmp, store) = setup_store();
1086
1087 let stats = store.compact_recent(2, false).unwrap();
1088 assert_eq!(stats.partitions_compacted, 0);
1089 }
1090
1091 #[test]
1092 fn test_compact_recent_with_files() {
1093 let (_tmp, store) = setup_store();
1094
1095 for i in 0..5 {
1097 let record = InvocationRecord::new(
1098 "test-session",
1099 format!("command-{}", i),
1100 "/home/user",
1101 0,
1102 "test@client",
1103 );
1104 store.write_invocation(&record).unwrap();
1105 }
1106
1107 let stats = store.compact_recent(2, false).unwrap();
1109 assert_eq!(stats.sessions_compacted, 1);
1110 assert_eq!(stats.files_before, 3); assert_eq!(stats.files_after, 1);
1112 }
1113
1114 #[test]
1115 fn test_compact_for_session() {
1116 let (_tmp, store) = setup_store();
1117
1118 for i in 0..5 {
1120 let record = InvocationRecord::new(
1121 "session-a",
1122 format!("command-{}", i),
1123 "/home/user",
1124 0,
1125 "test@client",
1126 );
1127 store.write_invocation(&record).unwrap();
1128 }
1129 for i in 0..3 {
1130 let record = InvocationRecord::new(
1131 "session-b",
1132 format!("command-{}", i),
1133 "/home/user",
1134 0,
1135 "test@client",
1136 );
1137 store.write_invocation(&record).unwrap();
1138 }
1139
1140 let stats = store.compact_for_session("session-a", 2, false).unwrap();
1142 assert_eq!(stats.sessions_compacted, 1);
1143 assert_eq!(stats.files_before, 3); let date = chrono::Utc::now().date_naive();
1147 let inv_dir = store.config().invocations_dir(&date);
1148 let session_b_count = std::fs::read_dir(&inv_dir)
1149 .unwrap()
1150 .filter_map(|e| e.ok())
1151 .filter(|e| e.file_name().to_string_lossy().starts_with("session-b"))
1152 .count();
1153 assert_eq!(session_b_count, 3);
1154 }
1155
1156 #[test]
1157 fn test_compact_dry_run() {
1158 let (_tmp, store) = setup_store();
1159
1160 for i in 0..5 {
1162 let record = InvocationRecord::new(
1163 "test-session",
1164 format!("command-{}", i),
1165 "/home/user",
1166 0,
1167 "test@client",
1168 );
1169 store.write_invocation(&record).unwrap();
1170 }
1171
1172 let stats = store.compact_recent(2, true).unwrap();
1174 assert_eq!(stats.sessions_compacted, 1);
1175 assert_eq!(stats.files_before, 3);
1176
1177 let date = chrono::Utc::now().date_naive();
1179 let inv_dir = store.config().invocations_dir(&date);
1180 let file_count = std::fs::read_dir(&inv_dir)
1181 .unwrap()
1182 .filter_map(|e| e.ok())
1183 .filter(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false))
1184 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1185 .count();
1186 assert_eq!(file_count, 5);
1187 }
1188
1189 #[test]
1190 fn test_compacted_file_naming() {
1191 let (_tmp, store) = setup_store();
1192
1193 for i in 0..5 {
1195 let record = InvocationRecord::new(
1196 "zsh-9999",
1197 format!("cmd-{}", i),
1198 "/home/user",
1199 0,
1200 "test@client",
1201 );
1202 store.write_invocation(&record).unwrap();
1203 }
1204
1205 store.compact_recent(2, false).unwrap();
1207
1208 let date = chrono::Utc::now().date_naive();
1210 let inv_dir = store.config().invocations_dir(&date);
1211 let compacted_files: Vec<_> = std::fs::read_dir(&inv_dir)
1212 .unwrap()
1213 .filter_map(|e| e.ok())
1214 .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
1215 .collect();
1216
1217 assert_eq!(compacted_files.len(), 1);
1218 let name = compacted_files[0].file_name().to_string_lossy().to_string();
1219 assert!(name.starts_with("zsh-9999--__compacted-0__--"));
1220 assert!(name.ends_with(".parquet"));
1221 }
1222
1223 #[test]
1227 fn test_compact_duckdb_mode_no_op() {
1228 let (_tmp, store) = setup_store_duckdb();
1229
1230 for i in 0..10 {
1232 let record = InvocationRecord::new(
1233 "test-session",
1234 format!("command-{}", i),
1235 "/home/user",
1236 0,
1237 "test@client",
1238 );
1239 store.write_invocation(&record).unwrap();
1240 }
1241
1242 let conn = store.connection().unwrap();
1244 let count: i64 = conn
1245 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1246 .unwrap();
1247 assert_eq!(count, 10, "Data should be in DuckDB table");
1248
1249 let stats = store.compact_recent(2, false).unwrap();
1251 assert_eq!(stats.partitions_compacted, 0);
1252 assert_eq!(stats.sessions_compacted, 0);
1253 assert_eq!(stats.files_before, 0);
1254 assert_eq!(stats.files_after, 0);
1255
1256 let count: i64 = conn
1258 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1259 .unwrap();
1260 assert_eq!(count, 10, "Data should be unaffected");
1261 }
1262
1263 #[test]
1264 fn test_compact_for_session_duckdb_mode_no_op() {
1265 let (_tmp, store) = setup_store_duckdb();
1266
1267 for i in 0..5 {
1269 let record = InvocationRecord::new(
1270 "session-a",
1271 format!("command-{}", i),
1272 "/home/user",
1273 0,
1274 "test@client",
1275 );
1276 store.write_invocation(&record).unwrap();
1277 }
1278
1279 let stats = store.compact_for_session("session-a", 2, false).unwrap();
1281 assert_eq!(stats.partitions_compacted, 0);
1282 assert_eq!(stats.sessions_compacted, 0);
1283 }
1284
1285 #[test]
1286 fn test_compact_session_today_duckdb_mode_no_op() {
1287 let (_tmp, store) = setup_store_duckdb();
1288
1289 let record = InvocationRecord::new(
1291 "test-session",
1292 "echo hello",
1293 "/home/user",
1294 0,
1295 "test@client",
1296 );
1297 store.write_invocation(&record).unwrap();
1298
1299 let stats = store.compact_session_today("test-session", 2, false).unwrap();
1301 assert_eq!(stats.partitions_compacted, 0);
1302 assert_eq!(stats.sessions_compacted, 0);
1303 }
1304
1305 #[test]
1306 fn test_auto_compact_duckdb_mode_no_op() {
1307 let (_tmp, store) = setup_store_duckdb();
1308
1309 for i in 0..5 {
1311 let record = InvocationRecord::new(
1312 "test-session",
1313 format!("command-{}", i),
1314 "/home/user",
1315 0,
1316 "test@client",
1317 );
1318 store.write_invocation(&record).unwrap();
1319 }
1320
1321 let opts = AutoCompactOptions::default();
1323 let (compact_stats, archive_stats) = store.auto_compact(&opts).unwrap();
1324
1325 assert_eq!(compact_stats.partitions_compacted, 0);
1326 assert_eq!(compact_stats.sessions_compacted, 0);
1327 assert_eq!(archive_stats.partitions_archived, 0);
1328 }
1329
1330 #[test]
1331 fn test_archive_old_data_duckdb_mode_no_op() {
1332 let (_tmp, store) = setup_store_duckdb();
1333
1334 let record = InvocationRecord::new(
1336 "test-session",
1337 "echo hello",
1338 "/home/user",
1339 0,
1340 "test@client",
1341 );
1342 store.write_invocation(&record).unwrap();
1343
1344 let stats = store.archive_old_data(0, false).unwrap(); assert_eq!(stats.partitions_archived, 0);
1347 assert_eq!(stats.files_moved, 0);
1348 }
1349
#[test]
fn test_archive_old_data_moves_partitions() {
    // Archiving with a 0-day threshold should move today's partition out of
    // the recent tree and leave exactly one parquet file in the archive.
    let (_tmp, store) = setup_store();

    (0..3).for_each(|n| {
        let rec = InvocationRecord::new(
            "test-session",
            format!("command-{}", n),
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&rec).unwrap();
    });

    let today = chrono::Utc::now().date_naive();
    let recent_dir = store.config().invocations_dir(&today);

    // Shared predicates over directory entries.
    let is_parquet =
        |entry: &std::fs::DirEntry| entry.path().extension().map_or(false, |ext| ext == "parquet");
    let is_seed =
        |entry: &std::fs::DirEntry| entry.file_name().to_string_lossy().starts_with("_seed");

    let parquet_in_recent = std::fs::read_dir(&recent_dir)
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| is_parquet(entry) && !is_seed(entry))
        .count();
    assert_eq!(parquet_in_recent, 3, "Should have 3 files in recent");

    let stats = store.archive_old_data(0, false).unwrap();
    assert!(stats.partitions_archived >= 1, "Should archive at least 1 partition");
    assert!(stats.files_moved > 0, "Should move files");

    let archived_partition = store
        .config()
        .archive_dir()
        .join("invocations")
        .join(format!("date={}", today));
    assert!(archived_partition.exists(), "Archive partition should exist");

    let archived_parquet = std::fs::read_dir(&archived_partition)
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| is_parquet(entry))
        .count();
    assert_eq!(archived_parquet, 1, "Archive should have 1 consolidated file");

    // The recent partition directory may no longer exist; treat that as empty.
    let leftover = match std::fs::read_dir(&recent_dir) {
        Ok(entries) => entries
            .filter_map(Result::ok)
            .filter(|entry| !is_seed(entry))
            .count(),
        Err(_) => 0,
    };
    assert_eq!(leftover, 0, "Recent partition should have no data files");
}
1424
#[test]
fn test_archive_dry_run() {
    // A dry run must report what it would archive without moving any of the
    // files in the recent partition.
    let (_tmp, store) = setup_store();

    for n in 0..3 {
        store
            .write_invocation(&InvocationRecord::new(
                "test-session",
                format!("command-{}", n),
                "/home/user",
                0,
                "test@client",
            ))
            .unwrap();
    }

    let report = store.archive_old_data(0, true).unwrap();
    assert!(report.partitions_archived >= 1, "Should report at least 1 partition");
    assert!(report.files_moved > 0, "Should report files to move");

    let today = chrono::Utc::now().date_naive();
    let recent_dir = store.config().invocations_dir(&today);
    let still_recent = std::fs::read_dir(&recent_dir)
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| {
            let parquet = entry.path().extension().map_or(false, |ext| ext == "parquet");
            let seed = entry.file_name().to_string_lossy().starts_with("_seed");
            parquet && !seed
        })
        .count();
    assert_eq!(still_recent, 3, "Files should still be in recent after dry run");
}
1463
#[test]
fn test_archive_respects_age_threshold() {
    // Data written today is younger than a 7-day threshold and must stay in
    // the recent partition.
    let (_tmp, store) = setup_store();

    let invocation =
        InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
    store.write_invocation(&invocation).unwrap();

    let stats = store.archive_old_data(7, false).unwrap();
    assert_eq!(
        stats.partitions_archived, 0,
        "Today's data should not be archived with 7 day threshold"
    );

    let today = chrono::Utc::now().date_naive();
    let recent_dir = store.config().invocations_dir(&today);
    let data_files = std::fs::read_dir(&recent_dir)
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| !entry.file_name().to_string_lossy().starts_with("_seed"))
        .count();
    assert_eq!(data_files, 1, "Data file should still exist");
}
1492
#[test]
fn test_consolidate_merges_all_files() {
    // After a regular compaction, a `consolidate` pass should leave a single
    // (non-seed) parquet file in today's partition.
    let (_tmp, store) = setup_store();

    (0..5).for_each(|n| {
        let record = InvocationRecord::new(
            "test-session",
            format!("command-{}", n),
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&record).unwrap();
    });

    store.compact_recent(2, false).unwrap();

    let stats = store
        .compact_recent_with_opts(&CompactOptions {
            consolidate: true,
            ..Default::default()
        })
        .unwrap();
    assert!(stats.sessions_compacted > 0, "Should consolidate session files");

    let today = chrono::Utc::now().date_naive();
    let inv_dir = store.config().invocations_dir(&today);
    let parquet_files = std::fs::read_dir(&inv_dir)
        .unwrap()
        .flatten()
        .filter(|entry| {
            entry.path().extension().map_or(false, |ext| ext == "parquet")
                && !entry.file_name().to_string_lossy().starts_with("_seed")
        })
        .count();
    assert_eq!(parquet_files, 1, "Should have single consolidated file");
}
1540
#[test]
fn test_recompact_threshold() {
    // Build up compacted files for one session (presumably one per batch),
    // then check that a low `recompact_threshold` triggers another pass.
    let (_tmp, store) = setup_store();

    for batch in 0..3 {
        (0..5)
            .map(|i| {
                InvocationRecord::new(
                    "test-session",
                    format!("batch-{}-cmd-{}", batch, i),
                    "/home/user",
                    0,
                    "test@client",
                )
            })
            .for_each(|record| store.write_invocation(&record).unwrap());
        store.compact_recent(2, false).unwrap();
    }

    let today = chrono::Utc::now().date_naive();
    let inv_dir = store.config().invocations_dir(&today);
    let compacted_files = std::fs::read_dir(&inv_dir)
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| is_compacted_file(&entry.file_name().to_string_lossy()))
        .count();

    // NOTE(review): the high file_threshold presumably keeps the ordinary
    // file-count trigger out of the way so only re-compaction can do work.
    let opts = CompactOptions {
        file_threshold: 50,
        recompact_threshold: 2,
        ..Default::default()
    };
    let stats = store.compact_recent_with_opts(&opts).unwrap();

    // NOTE(review): when fewer than two compacted files exist this assertion is
    // vacuous, so the test can pass without exercising re-compaction at all.
    assert!(
        compacted_files < 2 || stats.sessions_compacted > 0 || stats.files_before > 0,
        "Should trigger re-compaction when threshold exceeded"
    );
}
1587
#[test]
fn test_auto_compact_parquet_mode() {
    // auto_compact should reduce today's file count, but not archive anything,
    // because the data is newer than the 14-day archive threshold.
    let (_tmp, store) = setup_store();

    for idx in 0..10 {
        let rec = InvocationRecord::new(
            "test-session",
            format!("command-{}", idx),
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&rec).unwrap();
    }

    let today = chrono::Utc::now().date_naive();
    let inv_dir = store.config().invocations_dir(&today);
    // Counts every non-seed entry in today's partition.
    let count_non_seed = || {
        std::fs::read_dir(&inv_dir)
            .unwrap()
            .filter_map(Result::ok)
            .filter(|entry| !entry.file_name().to_string_lossy().starts_with("_seed"))
            .count()
    };

    let files_before = count_non_seed();
    assert_eq!(files_before, 10, "Should have 10 files before compact");

    let opts = AutoCompactOptions {
        compact: CompactOptions {
            file_threshold: 3,
            ..Default::default()
        },
        archive_days: 14,
    };
    let (compact_stats, archive_stats) = store.auto_compact(&opts).unwrap();
    assert!(compact_stats.sessions_compacted > 0, "Should compact files");
    assert_eq!(archive_stats.partitions_archived, 0, "Should not archive today's data");

    let files_after = count_non_seed();
    assert!(files_after < files_before, "Should have fewer files after compact");
}
1636
#[test]
fn test_compact_preserves_data_integrity() {
    // Compaction rewrites files on disk; the rows visible through the
    // `local.invocations` view must be the same before and after.
    let (_tmp, store) = setup_store();

    let commands: Vec<String> = (0..10).map(|i| format!("command-{}", i)).collect();
    for cmd in &commands {
        let record = InvocationRecord::new(
            "test-session",
            cmd.clone(),
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&record).unwrap();
    }

    let conn = store.connection().unwrap();
    let count_before: i64 = conn
        .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count_before, 10);

    store.compact_recent(2, false).unwrap();

    let count_after: i64 = conn
        .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count_after, 10, "Compaction should preserve all records");

    // Collect into `Result` so a row-decoding failure fails the test with the
    // actual error, instead of being silently dropped and surfacing only as a
    // confusing mismatch in the comparison below.
    let mut found_cmds: Vec<String> = conn
        .prepare("SELECT cmd FROM local.invocations")
        .unwrap()
        .query_map([], |r| r.get(0))
        .unwrap()
        .collect::<Result<Vec<String>, _>>()
        .unwrap();
    // Sort on the Rust side so the comparison is byte-wise and independent of
    // the database's collation.
    found_cmds.sort();
    let mut expected = commands;
    expected.sort();
    assert_eq!(found_cmds, expected, "All commands should be preserved");
}
1683
#[test]
fn test_clean_recovers_orphaned() {
    // Simulate a crashed command: a pending marker file plus a pending
    // invocation record, then check that clean() flags it as orphaned.
    let (tmp, store) = setup_store();

    let record = InvocationRecord::new_pending_local(
        "test-session",
        "crashed-command",
        "/home/user",
        999999999, // NOTE(review): assumed to be a PID with no live process — confirm
        "test@client",
    );

    let pending_dir = tmp.path().join("db/pending");
    let pending = super::super::pending::PendingInvocation::from_record(&record).unwrap();
    super::super::pending::write_pending_file(&pending_dir, &pending).unwrap();
    store.write_invocation(&record).unwrap();

    let stats = store.clean(&CleanOptions::default()).unwrap();
    assert_eq!(stats.pending_checked, 1);
    assert_eq!(stats.orphaned, 1);
    assert_eq!(stats.still_running, 0);
}
1716
#[test]
fn test_clean_dry_run() {
    // A dry-run clean() must still count the orphan but leave its pending
    // marker file on disk.
    let (tmp, store) = setup_store();

    let record = InvocationRecord::new_pending_local(
        "test-session",
        "crashed-command",
        "/home/user",
        999999999, // NOTE(review): assumed to be a PID with no live process — confirm
        "test@client",
    );

    let pending_dir = tmp.path().join("db/pending");
    let pending = super::super::pending::PendingInvocation::from_record(&record).unwrap();
    super::super::pending::write_pending_file(&pending_dir, &pending).unwrap();
    store.write_invocation(&record).unwrap();

    let stats = store
        .clean(&CleanOptions {
            dry_run: true,
            ..Default::default()
        })
        .unwrap();
    assert_eq!(stats.pending_checked, 1);
    assert_eq!(stats.orphaned, 1);

    assert!(
        pending.path(&pending_dir).exists(),
        "Pending file should still exist after dry run"
    );
}
1754
#[test]
fn test_prune_archive() {
    // Archive today's partition, then prune the archive with a 0-day threshold
    // and check that files were removed and bytes reclaimed.
    let (_tmp, store) = setup_store();

    for i in 0..3 {
        let record = InvocationRecord::new(
            "test-session",
            format!("command-{}", i),
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&record).unwrap();
    }

    let archive_stats = store.archive_old_data(0, false).unwrap();
    assert!(archive_stats.partitions_archived > 0, "Should archive data");

    // Resolve the archive location through the store's config rather than a
    // hard-coded path under the temp dir, so the test follows the configured
    // layout instead of duplicating it.
    let archive_invocations = store.config().archive_dir().join("invocations");
    assert!(archive_invocations.exists(), "Archive should exist");

    let prune_stats = store.prune_archive(0, false).unwrap();
    assert!(prune_stats.files_pruned > 0, "Should prune files");
    assert!(prune_stats.bytes_freed > 0, "Should free bytes");
}
1784
#[test]
fn test_clean_with_prune() {
    // One clean() pass with `prune` enabled should both recover the orphaned
    // pending invocation and prune archived files.
    let (tmp, store) = setup_store();

    let crashed = InvocationRecord::new_pending_local(
        "test-session",
        "crashed-command",
        "/home/user",
        999999999,
        "test@client",
    );
    let pending = super::super::pending::PendingInvocation::from_record(&crashed).unwrap();
    let pending_dir = tmp.path().join("db/pending");
    super::super::pending::write_pending_file(&pending_dir, &pending).unwrap();
    store.write_invocation(&crashed).unwrap();

    for i in 0..3 {
        let finished = InvocationRecord::new(
            "another-session",
            format!("command-{}", i),
            "/home/user",
            0,
            "test@client",
        );
        store.write_invocation(&finished).unwrap();
    }
    store.archive_old_data(0, false).unwrap();

    let opts = CleanOptions {
        prune: true,
        older_than_days: 0,
        ..Default::default()
    };
    let stats = store.clean(&opts).unwrap();
    assert_eq!(stats.orphaned, 1, "Should recover 1 orphaned");
    assert!(stats.pruned_files > 0, "Should prune archive files");
}
1828}