1use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10
11use chrono::{NaiveDate, Utc};
12use uuid::Uuid;
13
14use super::atomic;
15use super::Store;
16use crate::Result;
17
/// Returns `true` when `name` carries the `__compacted-` marker that
/// compaction embeds in the files it writes.
fn is_compacted_file(name: &str) -> bool {
    const COMPACTED_MARKER: &str = "__compacted-";
    name.contains(COMPACTED_MARKER)
}
22
/// Returns `true` for seed files, i.e. names beginning with `_seed`.
fn is_seed_file(name: &str) -> bool {
    const SEED_PREFIX: &str = "_seed";
    name.starts_with(SEED_PREFIX)
}
27
/// Derives the session id from a Parquet file name.
///
/// The session is everything before the first `--` of the stem; when the
/// stem contains no `--`, the whole stem is the session. Returns `None` for
/// names that do not end in `.parquet`.
fn extract_session(name: &str) -> Option<String> {
    name.strip_suffix(".parquet").map(|stem| {
        let session = match stem.split_once("--") {
            Some((head, _rest)) => head,
            None => stem,
        };
        session.to_owned()
    })
}
44
/// Parses the sequence number `N` out of a `...__compacted-N__...` file name.
///
/// Returns `None` if the marker is absent, is not followed by a closing
/// `__`, or the text between them is not a valid `u32`.
fn extract_compaction_number(name: &str) -> Option<u32> {
    const MARKER: &str = "__compacted-";
    let (_, after_marker) = name.split_once(MARKER)?;
    let (digits, _) = after_marker.split_once("__")?;
    digits.parse::<u32>().ok()
}
58
59fn next_compaction_number(partition_dir: &Path, session: &str) -> u32 {
61 fs::read_dir(partition_dir)
62 .ok()
63 .map(|entries| {
64 entries
65 .filter_map(|e| e.ok())
66 .filter_map(|e| {
67 let name = e.file_name().to_string_lossy().to_string();
68 if name.starts_with(&format!("{}--__compacted-", session)) {
70 extract_compaction_number(&name)
71 } else {
72 None
73 }
74 })
75 .max()
76 .map(|n| n + 1)
77 .unwrap_or(0)
78 })
79 .unwrap_or(0)
80}
81
/// Modification time of `path` in whole seconds since the Unix epoch.
///
/// Returns `0` if the metadata or modification time cannot be read, or if
/// the timestamp lies before the epoch.
fn file_mtime(path: &Path) -> u64 {
    let modified = match fs::metadata(path).and_then(|meta| meta.modified()) {
        Ok(time) => time,
        Err(_) => return 0,
    };
    modified
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_secs())
        .unwrap_or(0)
}
89
/// Counters describing the work done (or, in a dry run, planned) by compaction.
#[derive(Debug, Default)]
pub struct CompactStats {
    /// Partitions in which at least one session was compacted or consolidated.
    pub partitions_compacted: usize,
    /// Merge operations performed. A session can contribute more than one when
    /// both new-file compaction and recompaction run for it.
    pub sessions_compacted: usize,
    /// Input files that were (or would be) merged.
    pub files_before: usize,
    /// Output files written; each merge produces exactly one.
    pub files_after: usize,
    /// Total size in bytes of the input files.
    pub bytes_before: u64,
    /// Size in bytes of the output files; in a dry run this echoes `bytes_before`.
    pub bytes_after: u64,
}

impl CompactStats {
    /// Adds every counter of `other` onto the matching counter of `self`.
    pub fn add(&mut self, other: &CompactStats) {
        // Exhaustive destructuring: adding a field without updating this
        // method becomes a compile error.
        let CompactStats {
            partitions_compacted,
            sessions_compacted,
            files_before,
            files_after,
            bytes_before,
            bytes_after,
        } = *other;

        self.bytes_after += bytes_after;
        self.bytes_before += bytes_before;
        self.files_after += files_after;
        self.files_before += files_before;
        self.sessions_compacted += sessions_compacted;
        self.partitions_compacted += partitions_compacted;
    }
}
111
/// Counters reported by `Store::archive_old_data`.
#[derive(Debug, Default)]
pub struct ArchiveStats {
    /// Number of `date=` partitions archived; in a dry run, the number of
    /// candidate partitions counted.
    pub partitions_archived: usize,
    /// Number of source `*.parquet` files taken from the recent partitions.
    pub files_moved: usize,
    /// In a dry run, the total size of the source files. Otherwise, the size
    /// of the single consolidated file written into the archive partition.
    pub bytes_moved: u64,
}
119
/// Tuning knobs for `Store::compact_partition_with_opts` and the other
/// `*_with_opts` compaction entry points.
#[derive(Debug, Clone)]
pub struct CompactOptions {
    /// A session's uncompacted files are compacted once there are at least
    /// this many. The newest `file_threshold` files are left untouched.
    pub file_threshold: usize,
    /// Already-compacted files of a session are merged again once there are
    /// at least this many; `0` disables recompaction.
    pub recompact_threshold: usize,
    /// When set, every file of a session is merged into a single file,
    /// ignoring both thresholds.
    pub consolidate: bool,
    /// Report what would happen without writing or deleting anything.
    pub dry_run: bool,
    /// Restrict compaction to files belonging to this session id.
    pub session_filter: Option<String>,
}

impl Default for CompactOptions {
    /// Threshold 50, recompaction at 10 compacted files, no consolidation,
    /// not a dry run, all sessions.
    fn default() -> Self {
        CompactOptions {
            session_filter: None,
            dry_run: false,
            consolidate: false,
            recompact_threshold: 10,
            file_threshold: 50,
        }
    }
}
147
148#[derive(Debug)]
150pub struct AutoCompactOptions {
151 pub compact: CompactOptions,
153 pub archive_days: u32,
155}
156
157impl Default for AutoCompactOptions {
158 fn default() -> Self {
159 Self {
160 compact: CompactOptions::default(),
161 archive_days: 14,
162 }
163 }
164}
165
/// Options for `Store::clean`.
#[derive(Debug, Clone)]
pub struct CleanOptions {
    /// Report what would happen without modifying anything.
    pub dry_run: bool,
    /// Passed through to `recover_orphaned_invocations` (defined elsewhere);
    /// presumably the age after which a pending invocation counts as
    /// orphaned — TODO confirm.
    pub max_age_hours: u32,
    /// Also prune old archive partitions.
    pub prune: bool,
    /// With `prune`, archive partitions dated on or before
    /// `today - older_than_days` are deleted.
    pub older_than_days: u32,
}

impl Default for CleanOptions {
    /// 24-hour orphan age, no pruning (365-day horizon if enabled), not a dry run.
    fn default() -> Self {
        CleanOptions {
            older_than_days: 365,
            prune: false,
            max_age_hours: 24,
            dry_run: false,
        }
    }
}
189
/// Counters reported by `Store::clean`.
#[derive(Debug, Default)]
pub struct CleanStats {
    /// Copied from the result of `recover_orphaned_invocations` (defined
    /// elsewhere); presumably the number of pending invocations inspected.
    pub pending_checked: usize,
    /// Copied from `recover_orphaned_invocations`; presumably pending
    /// invocations that still appear to be running — TODO confirm.
    pub still_running: usize,
    /// Copied from `recover_orphaned_invocations`; presumably pending
    /// invocations treated as orphaned — TODO confirm.
    pub orphaned: usize,
    /// Parquet files removed (or, in a dry run, counted) by archive pruning.
    /// Stays 0 unless `CleanOptions::prune` is set.
    pub pruned_files: usize,
    /// Total size in bytes of the files counted in `pruned_files`.
    pub bytes_freed: u64,
}
204
205impl Store {
206 fn compact_session_files(
211 &self,
212 partition_dir: &Path,
213 session: &str,
214 files: &mut Vec<PathBuf>,
215 keep_count: usize,
216 dry_run: bool,
217 ) -> Result<CompactStats> {
218 files.sort_by_key(|p| file_mtime(p));
220
221 let to_keep = files.len().saturating_sub(keep_count).max(0);
223 if to_keep < 2 {
224 return Ok(CompactStats::default());
226 }
227
228 let files_to_compact: Vec<PathBuf> = files.drain(..to_keep).collect();
229 let files_before = files_to_compact.len();
230 let bytes_before: u64 = files_to_compact
231 .iter()
232 .filter_map(|p| fs::metadata(p).ok())
233 .map(|m| m.len())
234 .sum();
235
236 if dry_run {
237 return Ok(CompactStats {
238 partitions_compacted: 0, sessions_compacted: 1,
240 files_before,
241 files_after: 1,
242 bytes_before,
243 bytes_after: bytes_before, });
245 }
246
247 let conn = self.connection_with_options(false)?;
249
250 let file_list: Vec<String> = files_to_compact
252 .iter()
253 .map(|p| p.display().to_string())
254 .collect();
255 let file_list_sql = file_list
256 .iter()
257 .map(|f| format!("'{}'", f))
258 .collect::<Vec<_>>()
259 .join(", ");
260
261 conn.execute(
263 &format!(
264 "CREATE OR REPLACE TEMP TABLE compact_temp AS
265 SELECT * FROM read_parquet([{}], union_by_name = true)",
266 file_list_sql
267 ),
268 [],
269 )?;
270
271 let seq_num = next_compaction_number(partition_dir, session);
273 let uuid = Uuid::now_v7();
274 let compacted_name = format!("{}--__compacted-{}__--{}.parquet", session, seq_num, uuid);
275 let compacted_path = partition_dir.join(&compacted_name);
276 let temp_path = atomic::temp_path(&compacted_path);
277
278 conn.execute(
279 &format!(
280 "COPY compact_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
281 temp_path.display()
282 ),
283 [],
284 )?;
285
286 conn.execute("DROP TABLE compact_temp", [])?;
287
288 let bytes_after = fs::metadata(&temp_path)?.len();
290
291 for file in &files_to_compact {
293 fs::remove_file(file)?;
294 }
295
296 atomic::rename_into_place(&temp_path, &compacted_path)?;
298
299 Ok(CompactStats {
300 partitions_compacted: 0, sessions_compacted: 1,
302 files_before,
303 files_after: 1,
304 bytes_before,
305 bytes_after,
306 })
307 }
308
309 fn consolidate_session_files(
314 &self,
315 partition_dir: &Path,
316 _session: &str,
317 files: Vec<PathBuf>,
318 dry_run: bool,
319 ) -> Result<CompactStats> {
320 if files.len() < 2 {
321 return Ok(CompactStats::default());
322 }
323
324 let files_before = files.len();
325 let bytes_before: u64 = files
326 .iter()
327 .filter_map(|p| fs::metadata(p).ok())
328 .map(|m| m.len())
329 .sum();
330
331 if dry_run {
332 return Ok(CompactStats {
333 partitions_compacted: 0,
334 sessions_compacted: 1,
335 files_before,
336 files_after: 1,
337 bytes_before,
338 bytes_after: bytes_before,
339 });
340 }
341
342 let conn = self.connection_with_options(false)?;
344
345 let file_list_sql = files
347 .iter()
348 .map(|p| format!("'{}'", p.display()))
349 .collect::<Vec<_>>()
350 .join(", ");
351
352 conn.execute(
354 &format!(
355 "CREATE OR REPLACE TEMP TABLE consolidate_temp AS
356 SELECT * FROM read_parquet([{}], union_by_name = true)",
357 file_list_sql
358 ),
359 [],
360 )?;
361
362 let uuid = Uuid::now_v7();
364 let consolidated_name = format!("data_{}.parquet", uuid);
365 let consolidated_path = partition_dir.join(&consolidated_name);
366 let temp_path = atomic::temp_path(&consolidated_path);
367
368 conn.execute(
369 &format!(
370 "COPY consolidate_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
371 temp_path.display()
372 ),
373 [],
374 )?;
375
376 conn.execute("DROP TABLE consolidate_temp", [])?;
377
378 let bytes_after = fs::metadata(&temp_path)?.len();
379
380 for file in &files {
382 fs::remove_file(file)?;
383 }
384
385 atomic::rename_into_place(&temp_path, &consolidated_path)?;
387
388 Ok(CompactStats {
389 partitions_compacted: 0,
390 sessions_compacted: 1,
391 files_before,
392 files_after: 1,
393 bytes_before,
394 bytes_after,
395 })
396 }
397
398 pub fn compact_partition_with_opts(
405 &self,
406 partition_dir: &Path,
407 opts: &CompactOptions,
408 ) -> Result<CompactStats> {
409 let mut total_stats = CompactStats::default();
410
411 let mut session_files: HashMap<String, (Vec<PathBuf>, Vec<PathBuf>)> = HashMap::new();
413
414 for entry in fs::read_dir(partition_dir)? {
415 let entry = entry?;
416 let path = entry.path();
417 let name = entry.file_name().to_string_lossy().to_string();
418
419 if !name.ends_with(".parquet") || is_seed_file(&name) {
421 continue;
422 }
423
424 if let Some(session) = extract_session(&name) {
426 if let Some(ref filter) = opts.session_filter {
428 if session != *filter {
429 continue;
430 }
431 }
432
433 let entry = session_files.entry(session).or_insert_with(|| (vec![], vec![]));
434 if is_compacted_file(&name) || name.starts_with("data_") {
435 entry.1.push(path); } else {
437 entry.0.push(path); }
439 }
440 }
441
442 let mut any_compacted = false;
443 for (session, (non_compacted, compacted)) in session_files {
444 if opts.consolidate {
445 let all_files: Vec<PathBuf> = non_compacted.into_iter().chain(compacted).collect();
447 if all_files.len() >= 2 {
448 let stats = self.consolidate_session_files(
449 partition_dir,
450 &session,
451 all_files,
452 opts.dry_run,
453 )?;
454 if stats.sessions_compacted > 0 {
455 any_compacted = true;
456 total_stats.add(&stats);
457 }
458 }
459 } else {
460 if non_compacted.len() >= opts.file_threshold {
464 let mut to_process = non_compacted;
465 let stats = self.compact_session_files(
466 partition_dir,
467 &session,
468 &mut to_process,
469 opts.file_threshold,
470 opts.dry_run,
471 )?;
472 if stats.sessions_compacted > 0 {
473 any_compacted = true;
474 total_stats.add(&stats);
475 }
476 }
477
478 if opts.recompact_threshold > 0 && compacted.len() >= opts.recompact_threshold {
480 let stats = self.consolidate_session_files(
481 partition_dir,
482 &session,
483 compacted,
484 opts.dry_run,
485 )?;
486 if stats.sessions_compacted > 0 {
487 any_compacted = true;
488 total_stats.add(&stats);
489 }
490 }
491 }
492 }
493
494 if any_compacted {
495 total_stats.partitions_compacted = 1;
496 }
497
498 Ok(total_stats)
499 }
500
501 pub fn compact_partition(
503 &self,
504 partition_dir: &Path,
505 file_threshold: usize,
506 session_filter: Option<&str>,
507 dry_run: bool,
508 ) -> Result<CompactStats> {
509 let opts = CompactOptions {
510 file_threshold,
511 recompact_threshold: 0, consolidate: false,
513 dry_run,
514 session_filter: session_filter.map(|s| s.to_string()),
515 };
516 self.compact_partition_with_opts(partition_dir, &opts)
517 }
518
519 pub fn compact_data_type(
521 &self,
522 data_dir: &Path,
523 file_threshold: usize,
524 session_filter: Option<&str>,
525 dry_run: bool,
526 ) -> Result<CompactStats> {
527 let mut total_stats = CompactStats::default();
528
529 if !data_dir.exists() {
530 return Ok(total_stats);
531 }
532
533 for entry in fs::read_dir(data_dir)? {
535 let entry = entry?;
536 let path = entry.path();
537
538 if !path.is_dir() {
539 continue;
540 }
541
542 let dir_name = entry.file_name().to_string_lossy().to_string();
543
544 if dir_name.starts_with("date=") {
546 let stats =
547 self.compact_partition(&path, file_threshold, session_filter, dry_run)?;
548 total_stats.add(&stats);
549 }
550 }
552
553 Ok(total_stats)
554 }
555
556 pub fn compact_for_session(
560 &self,
561 session_id: &str,
562 file_threshold: usize,
563 dry_run: bool,
564 ) -> Result<CompactStats> {
565 let mut total_stats = CompactStats::default();
566 let recent_dir = self.config().recent_dir();
567
568 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
570 let data_dir = recent_dir.join(data_type);
571 let stats =
572 self.compact_data_type(&data_dir, file_threshold, Some(session_id), dry_run)?;
573 total_stats.add(&stats);
574 }
575
576 Ok(total_stats)
577 }
578
579 pub fn compact_session_today(
583 &self,
584 session_id: &str,
585 file_threshold: usize,
586 dry_run: bool,
587 ) -> Result<CompactStats> {
588 let mut total_stats = CompactStats::default();
589 let recent_dir = self.config().recent_dir();
590 let today = Utc::now().date_naive();
591 let date_partition = format!("date={}", today.format("%Y-%m-%d"));
592
593 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
595 let partition_dir = recent_dir.join(data_type).join(&date_partition);
596 if partition_dir.exists() {
597 let stats = self.compact_partition(
598 &partition_dir,
599 file_threshold,
600 Some(session_id),
601 dry_run,
602 )?;
603 total_stats.add(&stats);
604 }
605 }
606
607 Ok(total_stats)
608 }
609
610 pub fn compact_recent(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
612 let mut total_stats = CompactStats::default();
613 let recent_dir = self.config().recent_dir();
614
615 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
617 let data_dir = recent_dir.join(data_type);
618 let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
619 total_stats.add(&stats);
620 }
621
622 Ok(total_stats)
623 }
624
625 pub fn compact_archive(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
627 let mut total_stats = CompactStats::default();
628 let archive_dir = self.config().archive_dir();
629
630 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
632 let data_dir = archive_dir.join(data_type);
633 let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
634 total_stats.add(&stats);
635 }
636
637 Ok(total_stats)
638 }
639
640 pub fn archive_old_data(&self, older_than_days: u32, dry_run: bool) -> Result<ArchiveStats> {
645 let mut stats = ArchiveStats::default();
646 let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);
647
648 let recent_dir = self.config().recent_dir();
649 let archive_dir = self.config().archive_dir();
650
651 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
653 let recent_data_dir = recent_dir.join(data_type);
654 let archive_data_dir = archive_dir.join(data_type);
655
656 if !recent_data_dir.exists() {
657 continue;
658 }
659
660 let seed_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
664
665 let date_partition_parent = recent_data_dir.clone();
667
668 if !date_partition_parent.exists() {
669 continue;
670 }
671
672 let partitions_to_archive: Vec<(NaiveDate, PathBuf, String)> = fs::read_dir(&date_partition_parent)?
673 .filter_map(|e| e.ok())
674 .filter_map(|e| {
675 let path = e.path();
676 if !path.is_dir() {
677 return None;
678 }
679 let dir_name = e.file_name().to_string_lossy().to_string();
680 let date_str = dir_name.strip_prefix("date=")?;
681 let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d").ok()?;
682 if date == seed_date {
684 return None;
685 }
686 if date <= cutoff_date {
687 Some((date, path, dir_name))
688 } else {
689 None
690 }
691 })
692 .collect();
693
694 for (_date, partition_path, dir_name) in partitions_to_archive {
695 let dest_dir = archive_data_dir.join(&dir_name);
696
697 let source_files: Vec<PathBuf> = fs::read_dir(&partition_path)?
699 .filter_map(|e| e.ok())
700 .map(|e| e.path())
701 .filter(|p| p.extension().map(|ext| ext == "parquet").unwrap_or(false))
702 .collect();
703
704 if source_files.is_empty() {
705 continue;
706 }
707
708 let file_count = source_files.len();
709 let bytes_before: u64 = source_files
710 .iter()
711 .filter_map(|p| fs::metadata(p).ok())
712 .map(|m| m.len())
713 .sum();
714
715 if dry_run {
716 stats.partitions_archived += 1;
717 stats.files_moved += file_count;
718 stats.bytes_moved += bytes_before;
719 continue;
720 }
721
722 if dest_dir.exists() {
724 let existing_files = fs::read_dir(&dest_dir)?
725 .filter_map(|e| e.ok())
726 .any(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false));
727 if existing_files {
728 continue;
730 }
731 }
732
733 fs::create_dir_all(&dest_dir)?;
734
735 let conn = self.connection_with_options(false)?;
739 let src_glob = format!("{}/*.parquet", partition_path.display());
740 let dest_file = dest_dir.join("data_0.parquet");
741 let temp_file = dest_dir.join(".data_0.parquet.tmp");
742
743 conn.execute(
744 &format!(
745 "COPY (SELECT * FROM read_parquet('{}', union_by_name = true)) \
746 TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
747 src_glob,
748 temp_file.display()
749 ),
750 [],
751 )?;
752
753 fs::rename(&temp_file, &dest_file)?;
755
756 let bytes_after = fs::metadata(&dest_file)?.len();
758
759 for file in &source_files {
761 let _ = fs::remove_file(file);
762 }
763 let _ = fs::remove_dir(&partition_path);
764
765 stats.partitions_archived += 1;
766 stats.files_moved += file_count;
767 stats.bytes_moved += bytes_after;
768 }
769 }
770
771 Ok(stats)
772 }
773
774 pub fn auto_compact(&self, opts: &AutoCompactOptions) -> Result<(CompactStats, ArchiveStats)> {
776 let archive_stats = if opts.compact.session_filter.is_none() {
778 self.archive_old_data(opts.archive_days, opts.compact.dry_run)?
779 } else {
780 ArchiveStats::default()
781 };
782
783 let compact_stats = if let Some(ref session) = opts.compact.session_filter {
785 self.compact_for_session_with_opts(session, &opts.compact)?
787 } else {
788 let mut stats = self.compact_recent_with_opts(&opts.compact)?;
790 let archive_compact = self.compact_archive_with_opts(&opts.compact)?;
791 stats.add(&archive_compact);
792 stats
793 };
794
795 Ok((compact_stats, archive_stats))
796 }
797
798 pub fn compact_recent_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
800 let mut total_stats = CompactStats::default();
801 let recent_dir = self.config().recent_dir();
802
803 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
805 let data_dir = recent_dir.join(data_type);
806 let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
807 total_stats.add(&stats);
808 }
809
810 Ok(total_stats)
811 }
812
813 pub fn compact_archive_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
815 let mut total_stats = CompactStats::default();
816 let archive_dir = self.config().archive_dir();
817
818 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
820 let data_dir = archive_dir.join(data_type);
821 let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
822 total_stats.add(&stats);
823 }
824
825 Ok(total_stats)
826 }
827
828 pub fn compact_data_type_with_opts(
830 &self,
831 data_dir: &Path,
832 opts: &CompactOptions,
833 ) -> Result<CompactStats> {
834 let mut total_stats = CompactStats::default();
835
836 if !data_dir.exists() {
837 return Ok(total_stats);
838 }
839
840 for entry in fs::read_dir(data_dir)? {
841 let entry = entry?;
842 let path = entry.path();
843
844 if !path.is_dir() {
845 continue;
846 }
847
848 let dir_name = entry.file_name().to_string_lossy().to_string();
849
850 if dir_name.starts_with("date=") {
852 let stats = self.compact_partition_with_opts(&path, opts)?;
854 total_stats.add(&stats);
855 }
856 }
858
859 Ok(total_stats)
860 }
861
862 pub fn compact_for_session_with_opts(
864 &self,
865 session_id: &str,
866 opts: &CompactOptions,
867 ) -> Result<CompactStats> {
868 let mut total_stats = CompactStats::default();
869 let recent_dir = self.config().recent_dir();
870
871 let session_opts = CompactOptions {
872 session_filter: Some(session_id.to_string()),
873 ..opts.clone()
874 };
875
876 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
878 let data_dir = recent_dir.join(data_type);
879 let stats = self.compact_data_type_with_opts(&data_dir, &session_opts)?;
880 total_stats.add(&stats);
881 }
882
883 Ok(total_stats)
884 }
885
886 pub fn clean(&self, opts: &CleanOptions) -> Result<CleanStats> {
893 let mut stats = CleanStats::default();
894
895 let recovery_stats =
897 self.recover_orphaned_invocations(opts.max_age_hours, opts.dry_run)?;
898
899 stats.pending_checked = recovery_stats.pending_checked;
900 stats.still_running = recovery_stats.still_running;
901 stats.orphaned = recovery_stats.orphaned;
902
903 if opts.prune {
905 let prune_stats = self.prune_archive(opts.older_than_days, opts.dry_run)?;
906 stats.pruned_files = prune_stats.files_pruned;
907 stats.bytes_freed = prune_stats.bytes_freed;
908 }
909
910 Ok(stats)
911 }
912
913 pub fn prune_archive(&self, older_than_days: u32, dry_run: bool) -> Result<PruneStats> {
917 let mut stats = PruneStats::default();
918 let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);
919 let archive_dir = self.config().archive_dir();
920
921 for data_type in &["attempts", "outcomes", "outputs", "sessions", "events"] {
923 let data_dir = archive_dir.join(data_type);
924 if !data_dir.exists() {
925 continue;
926 }
927
928 let partition_stats =
930 self.prune_date_partitions(&data_dir, cutoff_date, dry_run)?;
931 stats.add(&partition_stats);
932 }
933
934 Ok(stats)
935 }
936
937 fn prune_date_partitions(
939 &self,
940 parent_dir: &Path,
941 cutoff_date: NaiveDate,
942 dry_run: bool,
943 ) -> Result<PruneStats> {
944 let mut stats = PruneStats::default();
945 let seed_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
946
947 for entry in fs::read_dir(parent_dir)?.flatten() {
948 let path = entry.path();
949 if !path.is_dir() {
950 continue;
951 }
952
953 let dir_name = entry.file_name().to_string_lossy().to_string();
954 let date_str = match dir_name.strip_prefix("date=") {
955 Some(s) => s,
956 None => continue,
957 };
958
959 let date = match NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
960 Ok(d) => d,
961 Err(_) => continue,
962 };
963
964 if date == seed_date {
966 continue;
967 }
968
969 if date > cutoff_date {
971 continue;
972 }
973
974 for file_entry in fs::read_dir(&path)?.flatten() {
976 let file_path = file_entry.path();
977 if file_path.extension().map(|e| e == "parquet").unwrap_or(false) {
978 if let Ok(metadata) = fs::metadata(&file_path) {
979 stats.files_pruned += 1;
980 stats.bytes_freed += metadata.len();
981 }
982
983 if !dry_run {
984 let _ = fs::remove_file(&file_path);
985 }
986 }
987 }
988
989 if !dry_run {
991 let _ = fs::remove_dir(&path);
992 }
993 }
994
995 Ok(stats)
996 }
997}
998
/// Counters reported by archive pruning (`Store::prune_archive`).
#[derive(Debug, Default)]
pub struct PruneStats {
    /// Parquet files deleted (or, in a dry run, that would be deleted).
    pub files_pruned: usize,
    /// Total size in bytes of the files counted in `files_pruned`.
    pub bytes_freed: u64,
}

impl PruneStats {
    /// Adds both counters of `other` onto `self`.
    fn add(&mut self, other: &PruneStats) {
        let PruneStats {
            files_pruned,
            bytes_freed,
        } = *other;
        self.bytes_freed += bytes_freed;
        self.files_pruned += files_pruned;
    }
}
1014
1015#[cfg(test)]
1016mod tests {
1017 use super::*;
1018 use crate::init::initialize;
1019 use crate::schema::InvocationRecord;
1020 use crate::Config;
1021 use tempfile::TempDir;
1022
1023 fn setup_store() -> (TempDir, Store) {
1024 let tmp = TempDir::new().unwrap();
1025 let config = Config::with_root(tmp.path());
1026 initialize(&config).unwrap();
1027 let store = Store::open(config).unwrap();
1028 (tmp, store)
1029 }
1030
1031 fn setup_store_duckdb() -> (TempDir, Store) {
1032 let tmp = TempDir::new().unwrap();
1033 let config = Config::with_duckdb_mode(tmp.path());
1034 initialize(&config).unwrap();
1035 let store = Store::open(config).unwrap();
1036 (tmp, store)
1037 }
1038
1039 #[test]
1040 fn test_extract_session() {
1041 assert_eq!(
1042 extract_session("zsh-1234--make--uuid.parquet"),
1043 Some("zsh-1234".to_string())
1044 );
1045 assert_eq!(
1046 extract_session("session-id.parquet"),
1047 Some("session-id".to_string())
1048 );
1049 assert_eq!(
1050 extract_session("zsh-1234--__compacted-0__--uuid.parquet"),
1051 Some("zsh-1234".to_string())
1052 );
1053 }
1054
1055 #[test]
1056 fn test_is_compacted_file() {
1057 assert!(is_compacted_file("zsh-1234--__compacted-0__--uuid.parquet"));
1058 assert!(is_compacted_file("session--__compacted-5__--abc.parquet"));
1059 assert!(!is_compacted_file("zsh-1234--make--uuid.parquet"));
1060 assert!(!is_compacted_file("session.parquet"));
1061 }
1062
1063 #[test]
1064 fn test_extract_compaction_number() {
1065 assert_eq!(
1066 extract_compaction_number("zsh--__compacted-0__--uuid.parquet"),
1067 Some(0)
1068 );
1069 assert_eq!(
1070 extract_compaction_number("zsh--__compacted-42__--uuid.parquet"),
1071 Some(42)
1072 );
1073 assert_eq!(
1074 extract_compaction_number("zsh--make--uuid.parquet"),
1075 None
1076 );
1077 }
1078
1079 #[test]
1080 fn test_compact_recent_no_files() {
1081 let (_tmp, store) = setup_store();
1082
1083 let stats = store.compact_recent(2, false).unwrap();
1084 assert_eq!(stats.partitions_compacted, 0);
1085 }
1086
1087 #[test]
1088 fn test_compact_recent_with_files() {
1089 let (_tmp, store) = setup_store();
1090
1091 for i in 0..5 {
1093 let record = InvocationRecord::new(
1094 "test-session",
1095 format!("command-{}", i),
1096 "/home/user",
1097 0,
1098 "test@client",
1099 );
1100 store.write_invocation(&record).unwrap();
1101 }
1102
1103 let stats = store.compact_recent(2, false).unwrap();
1105 assert_eq!(stats.sessions_compacted, 1);
1106 assert_eq!(stats.files_before, 3); assert_eq!(stats.files_after, 1);
1108 }
1109
1110 #[test]
1111 fn test_compact_for_session() {
1112 let (_tmp, store) = setup_store();
1113
1114 for i in 0..5 {
1116 let record = InvocationRecord::new(
1117 "session-a",
1118 format!("command-{}", i),
1119 "/home/user",
1120 0,
1121 "test@client",
1122 );
1123 store.write_invocation(&record).unwrap();
1124 }
1125 for i in 0..3 {
1126 let record = InvocationRecord::new(
1127 "session-b",
1128 format!("command-{}", i),
1129 "/home/user",
1130 0,
1131 "test@client",
1132 );
1133 store.write_invocation(&record).unwrap();
1134 }
1135
1136 let stats = store.compact_for_session("session-a", 2, false).unwrap();
1138 assert_eq!(stats.sessions_compacted, 1);
1139 assert_eq!(stats.files_before, 3); let date = chrono::Utc::now().date_naive();
1143 let inv_dir = store.config().attempts_dir(&date);
1144 let session_b_count = std::fs::read_dir(&inv_dir)
1145 .unwrap()
1146 .filter_map(|e| e.ok())
1147 .filter(|e| e.file_name().to_string_lossy().starts_with("session-b"))
1148 .count();
1149 assert_eq!(session_b_count, 3);
1150 }
1151
1152 #[test]
1153 fn test_compact_dry_run() {
1154 let (_tmp, store) = setup_store();
1155
1156 for i in 0..5 {
1158 let record = InvocationRecord::new(
1159 "test-session",
1160 format!("command-{}", i),
1161 "/home/user",
1162 0,
1163 "test@client",
1164 );
1165 store.write_invocation(&record).unwrap();
1166 }
1167
1168 let stats = store.compact_recent(2, true).unwrap();
1170 assert_eq!(stats.sessions_compacted, 1);
1171 assert_eq!(stats.files_before, 3);
1172
1173 let date = chrono::Utc::now().date_naive();
1175 let inv_dir = store.config().attempts_dir(&date);
1176 let file_count = std::fs::read_dir(&inv_dir)
1177 .unwrap()
1178 .filter_map(|e| e.ok())
1179 .filter(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false))
1180 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1181 .count();
1182 assert_eq!(file_count, 5);
1183 }
1184
1185 #[test]
1186 fn test_compacted_file_naming() {
1187 let (_tmp, store) = setup_store();
1188
1189 for i in 0..5 {
1191 let record = InvocationRecord::new(
1192 "zsh-9999",
1193 format!("cmd-{}", i),
1194 "/home/user",
1195 0,
1196 "test@client",
1197 );
1198 store.write_invocation(&record).unwrap();
1199 }
1200
1201 store.compact_recent(2, false).unwrap();
1203
1204 let date = chrono::Utc::now().date_naive();
1206 let inv_dir = store.config().attempts_dir(&date);
1207 let compacted_files: Vec<_> = std::fs::read_dir(&inv_dir)
1208 .unwrap()
1209 .filter_map(|e| e.ok())
1210 .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
1211 .collect();
1212
1213 assert_eq!(compacted_files.len(), 1);
1214 let name = compacted_files[0].file_name().to_string_lossy().to_string();
1215 assert!(name.starts_with("zsh-9999--__compacted-0__--"));
1216 assert!(name.ends_with(".parquet"));
1217 }
1218
1219 #[test]
1223 fn test_compact_duckdb_mode_no_op() {
1224 let (_tmp, store) = setup_store_duckdb();
1225
1226 for i in 0..10 {
1228 let record = InvocationRecord::new(
1229 "test-session",
1230 format!("command-{}", i),
1231 "/home/user",
1232 0,
1233 "test@client",
1234 );
1235 store.write_invocation(&record).unwrap();
1236 }
1237
1238 let conn = store.connection().unwrap();
1240 let count: i64 = conn
1241 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1242 .unwrap();
1243 assert_eq!(count, 10, "Data should be in DuckDB table");
1244
1245 let stats = store.compact_recent(2, false).unwrap();
1247 assert_eq!(stats.partitions_compacted, 0);
1248 assert_eq!(stats.sessions_compacted, 0);
1249 assert_eq!(stats.files_before, 0);
1250 assert_eq!(stats.files_after, 0);
1251
1252 let count: i64 = conn
1254 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1255 .unwrap();
1256 assert_eq!(count, 10, "Data should be unaffected");
1257 }
1258
1259 #[test]
1260 fn test_compact_for_session_duckdb_mode_no_op() {
1261 let (_tmp, store) = setup_store_duckdb();
1262
1263 for i in 0..5 {
1265 let record = InvocationRecord::new(
1266 "session-a",
1267 format!("command-{}", i),
1268 "/home/user",
1269 0,
1270 "test@client",
1271 );
1272 store.write_invocation(&record).unwrap();
1273 }
1274
1275 let stats = store.compact_for_session("session-a", 2, false).unwrap();
1277 assert_eq!(stats.partitions_compacted, 0);
1278 assert_eq!(stats.sessions_compacted, 0);
1279 }
1280
1281 #[test]
1282 fn test_compact_session_today_duckdb_mode_no_op() {
1283 let (_tmp, store) = setup_store_duckdb();
1284
1285 let record = InvocationRecord::new(
1287 "test-session",
1288 "echo hello",
1289 "/home/user",
1290 0,
1291 "test@client",
1292 );
1293 store.write_invocation(&record).unwrap();
1294
1295 let stats = store.compact_session_today("test-session", 2, false).unwrap();
1297 assert_eq!(stats.partitions_compacted, 0);
1298 assert_eq!(stats.sessions_compacted, 0);
1299 }
1300
1301 #[test]
1302 fn test_auto_compact_duckdb_mode_no_op() {
1303 let (_tmp, store) = setup_store_duckdb();
1304
1305 for i in 0..5 {
1307 let record = InvocationRecord::new(
1308 "test-session",
1309 format!("command-{}", i),
1310 "/home/user",
1311 0,
1312 "test@client",
1313 );
1314 store.write_invocation(&record).unwrap();
1315 }
1316
1317 let opts = AutoCompactOptions::default();
1319 let (compact_stats, archive_stats) = store.auto_compact(&opts).unwrap();
1320
1321 assert_eq!(compact_stats.partitions_compacted, 0);
1322 assert_eq!(compact_stats.sessions_compacted, 0);
1323 assert_eq!(archive_stats.partitions_archived, 0);
1324 }
1325
1326 #[test]
1327 fn test_archive_old_data_duckdb_mode_no_op() {
1328 let (_tmp, store) = setup_store_duckdb();
1329
1330 let record = InvocationRecord::new(
1332 "test-session",
1333 "echo hello",
1334 "/home/user",
1335 0,
1336 "test@client",
1337 );
1338 store.write_invocation(&record).unwrap();
1339
1340 let stats = store.archive_old_data(0, false).unwrap(); assert_eq!(stats.partitions_archived, 0);
1343 assert_eq!(stats.files_moved, 0);
1344 }
1345
1346 #[test]
1349 fn test_archive_old_data_moves_partitions() {
1350 let (_tmp, store) = setup_store();
1351
1352 for i in 0..3 {
1354 let record = InvocationRecord::new(
1355 "test-session",
1356 format!("command-{}", i),
1357 "/home/user",
1358 0,
1359 "test@client",
1360 );
1361 store.write_invocation(&record).unwrap();
1362 }
1363
1364 let date = chrono::Utc::now().date_naive();
1366 let recent_dir = store.config().attempts_dir(&date);
1367 let recent_count = std::fs::read_dir(&recent_dir)
1368 .unwrap()
1369 .filter_map(|e| e.ok())
1370 .filter(|e| {
1371 e.path()
1372 .extension()
1373 .map(|ext| ext == "parquet")
1374 .unwrap_or(false)
1375 })
1376 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1377 .count();
1378 assert_eq!(recent_count, 3, "Should have 3 files in recent");
1379
1380 let stats = store.archive_old_data(0, false).unwrap();
1382
1383 assert!(stats.partitions_archived >= 1, "Should archive at least 1 partition");
1386 assert!(stats.files_moved > 0, "Should move files");
1387
1388 let archive_dir = store
1390 .config()
1391 .archive_dir()
1392 .join("attempts")
1393 .join(format!("date={}", date));
1394 assert!(archive_dir.exists(), "Archive partition should exist");
1395
1396 let archive_files: Vec<_> = std::fs::read_dir(&archive_dir)
1398 .unwrap()
1399 .filter_map(|e| e.ok())
1400 .filter(|e| {
1401 e.path()
1402 .extension()
1403 .map(|ext| ext == "parquet")
1404 .unwrap_or(false)
1405 })
1406 .collect();
1407 assert_eq!(archive_files.len(), 1, "Archive should have 1 consolidated file");
1408
1409 let remaining = std::fs::read_dir(&recent_dir)
1411 .map(|entries| {
1412 entries
1413 .filter_map(|e| e.ok())
1414 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1415 .count()
1416 })
1417 .unwrap_or(0);
1418 assert_eq!(remaining, 0, "Recent partition should have no data files");
1419 }
1420
1421 #[test]
1422 fn test_archive_dry_run() {
1423 let (_tmp, store) = setup_store();
1424
1425 for i in 0..3 {
1427 let record = InvocationRecord::new(
1428 "test-session",
1429 format!("command-{}", i),
1430 "/home/user",
1431 0,
1432 "test@client",
1433 );
1434 store.write_invocation(&record).unwrap();
1435 }
1436
1437 let stats = store.archive_old_data(0, true).unwrap();
1439 assert!(stats.partitions_archived >= 1, "Should report at least 1 partition");
1441 assert!(stats.files_moved > 0, "Should report files to move");
1442
1443 let date = chrono::Utc::now().date_naive();
1445 let recent_dir = store.config().attempts_dir(&date);
1446 let recent_count = std::fs::read_dir(&recent_dir)
1447 .unwrap()
1448 .filter_map(|e| e.ok())
1449 .filter(|e| {
1450 e.path()
1451 .extension()
1452 .map(|ext| ext == "parquet")
1453 .unwrap_or(false)
1454 })
1455 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1456 .count();
1457 assert_eq!(recent_count, 3, "Files should still be in recent after dry run");
1458 }
1459
1460 #[test]
1461 fn test_archive_respects_age_threshold() {
1462 let (_tmp, store) = setup_store();
1463
1464 let record = InvocationRecord::new(
1466 "test-session",
1467 "echo hello",
1468 "/home/user",
1469 0,
1470 "test@client",
1471 );
1472 store.write_invocation(&record).unwrap();
1473
1474 let stats = store.archive_old_data(7, false).unwrap();
1476 assert_eq!(stats.partitions_archived, 0, "Today's data should not be archived with 7 day threshold");
1477
1478 let date = chrono::Utc::now().date_naive();
1480 let recent_dir = store.config().attempts_dir(&date);
1481 let file_count = std::fs::read_dir(&recent_dir)
1482 .unwrap()
1483 .filter_map(|e| e.ok())
1484 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1485 .count();
1486 assert_eq!(file_count, 1, "Data file should still exist");
1487 }
1488
1489 #[test]
1492 fn test_consolidate_merges_all_files() {
1493 let (_tmp, store) = setup_store();
1494
1495 for i in 0..5 {
1497 let record = InvocationRecord::new(
1498 "test-session",
1499 format!("command-{}", i),
1500 "/home/user",
1501 0,
1502 "test@client",
1503 );
1504 store.write_invocation(&record).unwrap();
1505 }
1506
1507 store.compact_recent(2, false).unwrap();
1509
1510 let opts = CompactOptions {
1512 consolidate: true,
1513 ..Default::default()
1514 };
1515 let stats = store.compact_recent_with_opts(&opts).unwrap();
1516
1517 assert!(stats.sessions_compacted > 0, "Should consolidate session files");
1519
1520 let date = chrono::Utc::now().date_naive();
1522 let inv_dir = store.config().attempts_dir(&date);
1523 let file_count = std::fs::read_dir(&inv_dir)
1524 .unwrap()
1525 .filter_map(|e| e.ok())
1526 .filter(|e| {
1527 e.path()
1528 .extension()
1529 .map(|ext| ext == "parquet")
1530 .unwrap_or(false)
1531 })
1532 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1533 .count();
1534 assert_eq!(file_count, 1, "Should have single consolidated file");
1535 }
1536
1537 #[test]
1538 fn test_recompact_threshold() {
1539 let (_tmp, store) = setup_store();
1540
1541 for batch in 0..3 {
1544 for i in 0..5 {
1545 let record = InvocationRecord::new(
1546 "test-session",
1547 format!("batch-{}-cmd-{}", batch, i),
1548 "/home/user",
1549 0,
1550 "test@client",
1551 );
1552 store.write_invocation(&record).unwrap();
1553 }
1554 store.compact_recent(2, false).unwrap();
1556 }
1557
1558 let date = chrono::Utc::now().date_naive();
1560 let inv_dir = store.config().attempts_dir(&date);
1561 let compacted_count = std::fs::read_dir(&inv_dir)
1562 .unwrap()
1563 .filter_map(|e| e.ok())
1564 .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
1565 .count();
1566
1567 let opts = CompactOptions {
1569 file_threshold: 50, recompact_threshold: 2,
1571 ..Default::default()
1572 };
1573 let stats = store.compact_recent_with_opts(&opts).unwrap();
1574
1575 if compacted_count >= 2 {
1577 assert!(
1578 stats.sessions_compacted > 0 || stats.files_before > 0,
1579 "Should trigger re-compaction when threshold exceeded"
1580 );
1581 }
1582 }
1583
1584 #[test]
1585 fn test_auto_compact_parquet_mode() {
1586 let (_tmp, store) = setup_store();
1587
1588 for i in 0..10 {
1590 let record = InvocationRecord::new(
1591 "test-session",
1592 format!("command-{}", i),
1593 "/home/user",
1594 0,
1595 "test@client",
1596 );
1597 store.write_invocation(&record).unwrap();
1598 }
1599
1600 let date = chrono::Utc::now().date_naive();
1602 let inv_dir = store.config().attempts_dir(&date);
1603 let files_before = std::fs::read_dir(&inv_dir)
1604 .unwrap()
1605 .filter_map(|e| e.ok())
1606 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1607 .count();
1608 assert_eq!(files_before, 10, "Should have 10 files before compact");
1609
1610 let opts = AutoCompactOptions {
1612 compact: CompactOptions {
1613 file_threshold: 3,
1614 ..Default::default()
1615 },
1616 archive_days: 14, };
1618 let (compact_stats, archive_stats) = store.auto_compact(&opts).unwrap();
1619
1620 assert!(compact_stats.sessions_compacted > 0, "Should compact files");
1622 assert_eq!(archive_stats.partitions_archived, 0, "Should not archive today's data");
1623
1624 let files_after = std::fs::read_dir(&inv_dir)
1626 .unwrap()
1627 .filter_map(|e| e.ok())
1628 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
1629 .count();
1630 assert!(files_after < files_before, "Should have fewer files after compact");
1631 }
1632
1633 #[test]
1634 fn test_compact_preserves_data_integrity() {
1635 let (_tmp, store) = setup_store();
1636
1637 let commands: Vec<String> = (0..10).map(|i| format!("command-{}", i)).collect();
1639 for cmd in &commands {
1640 let record = InvocationRecord::new(
1641 "test-session",
1642 cmd.clone(),
1643 "/home/user",
1644 0,
1645 "test@client",
1646 );
1647 store.write_invocation(&record).unwrap();
1648 }
1649
1650 let conn = store.connection().unwrap();
1652 let count_before: i64 = conn
1653 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1654 .unwrap();
1655 assert_eq!(count_before, 10);
1656
1657 store.compact_recent(2, false).unwrap();
1659
1660 let count_after: i64 = conn
1662 .query_row("SELECT COUNT(*) FROM local.invocations", [], |r| r.get(0))
1663 .unwrap();
1664 assert_eq!(count_after, 10, "Compaction should preserve all records");
1665
1666 let mut found_cmds: Vec<String> = conn
1668 .prepare("SELECT cmd FROM local.invocations ORDER BY cmd")
1669 .unwrap()
1670 .query_map([], |r| r.get(0))
1671 .unwrap()
1672 .filter_map(|r| r.ok())
1673 .collect();
1674 found_cmds.sort();
1675 let mut expected = commands.clone();
1676 expected.sort();
1677 assert_eq!(found_cmds, expected, "All commands should be preserved");
1678 }
1679
1680 #[test]
1683 fn test_clean_recovers_orphaned() {
1684 let (_tmp, store) = setup_store();
1685
1686 let record = InvocationRecord::new_pending_local(
1689 "test-session",
1690 "crashed-command",
1691 "/home/user",
1692 999999999, "test@client",
1694 );
1695
1696 store.write_invocation(&record).unwrap();
1698
1699 let opts = CleanOptions::default();
1701 let stats = store.clean(&opts).unwrap();
1702
1703 assert_eq!(stats.pending_checked, 1);
1704 assert_eq!(stats.orphaned, 1);
1705 assert_eq!(stats.still_running, 0);
1706 }
1707
1708 #[test]
1709 fn test_clean_dry_run() {
1710 let (_tmp, store) = setup_store();
1711
1712 let record = InvocationRecord::new_pending_local(
1714 "test-session",
1715 "crashed-command",
1716 "/home/user",
1717 999999999, "test@client",
1719 );
1720
1721 store.write_invocation(&record).unwrap();
1723
1724 let opts = CleanOptions {
1726 dry_run: true,
1727 ..Default::default()
1728 };
1729 let stats = store.clean(&opts).unwrap();
1730
1731 assert_eq!(stats.pending_checked, 1);
1733 assert_eq!(stats.orphaned, 1);
1734
1735 let pending = store.get_pending_attempts().unwrap();
1737 assert_eq!(pending.len(), 1, "Attempt should still be pending after dry run");
1738 }
1739
1740 #[test]
1741 fn test_prune_archive() {
1742 let (tmp, store) = setup_store();
1743
1744 for i in 0..3 {
1746 let record = InvocationRecord::new(
1747 "test-session",
1748 format!("command-{}", i),
1749 "/home/user",
1750 0,
1751 "test@client",
1752 );
1753 store.write_invocation(&record).unwrap();
1754 }
1755
1756 let archive_stats = store.archive_old_data(0, false).unwrap();
1758 assert!(archive_stats.partitions_archived > 0, "Should archive data");
1759
1760 let archive_attempts = tmp.path().join("db/data/archive/attempts");
1762 assert!(archive_attempts.exists(), "Archive should exist");
1763
1764 let prune_stats = store.prune_archive(0, false).unwrap();
1766 assert!(prune_stats.files_pruned > 0, "Should prune files");
1767 assert!(prune_stats.bytes_freed > 0, "Should free bytes");
1768 }
1769
1770 #[test]
1771 fn test_clean_with_prune() {
1772 let (_tmp, store) = setup_store();
1773
1774 for i in 0..3 {
1776 let record = InvocationRecord::new(
1777 "another-session",
1778 format!("command-{}", i),
1779 "/home/user",
1780 0,
1781 "test@client",
1782 );
1783 store.write_invocation(&record).unwrap();
1784 }
1785 store.archive_old_data(0, false).unwrap();
1786
1787 let record = InvocationRecord::new_pending_local(
1790 "test-session",
1791 "crashed-command",
1792 "/home/user",
1793 999999999,
1794 "test@client",
1795 );
1796 store.write_invocation(&record).unwrap();
1797
1798 let opts = CleanOptions {
1800 prune: true,
1801 older_than_days: 0, ..Default::default()
1803 };
1804 let stats = store.clean(&opts).unwrap();
1805
1806 assert_eq!(stats.orphaned, 1, "Should recover 1 orphaned");
1808 assert!(stats.pruned_files > 0, "Should prune archive files");
1809 }
1810}