1use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10
11use chrono::{NaiveDate, Utc};
12use uuid::Uuid;
13
14use super::atomic;
15use super::Store;
16use crate::Result;
17
/// Reports whether `name` contains the `__compacted-` marker that compaction
/// embeds in the names of the files it writes.
fn is_compacted_file(name: &str) -> bool {
    const MARKER: &str = "__compacted-";
    name.find(MARKER).is_some()
}
22
/// Reports whether `name` begins with `_seed`.
///
/// Files named this way are left out of compaction.
fn is_seed_file(name: &str) -> bool {
    name.strip_prefix("_seed").is_some()
}
27
/// Extracts the session identifier from a parquet file name.
///
/// The session is everything before the first `--` in the name once the
/// `.parquet` suffix has been removed; when there is no `--` the whole stem is
/// the session. Returns `None` for names that do not end in `.parquet`.
fn extract_session(name: &str) -> Option<String> {
    let stem = name.strip_suffix(".parquet")?;
    let session = match stem.split_once("--") {
        Some((head, _rest)) => head,
        None => stem,
    };
    Some(session.to_owned())
}
44
/// Parses the sequence number out of a compacted file name.
///
/// Looks for the first `__compacted-<n>__` segment and returns `<n>`. Returns
/// `None` when the marker is missing, the closing `__` is missing, or the text
/// in between is not a valid `u32`.
fn extract_compaction_number(name: &str) -> Option<u32> {
    const MARKER: &str = "__compacted-";
    let (_, tail) = name.split_once(MARKER)?;
    let (digits, _) = tail.split_once("__")?;
    digits.parse::<u32>().ok()
}
58
59fn next_compaction_number(partition_dir: &Path, session: &str) -> u32 {
61 fs::read_dir(partition_dir)
62 .ok()
63 .map(|entries| {
64 entries
65 .filter_map(|e| e.ok())
66 .filter_map(|e| {
67 let name = e.file_name().to_string_lossy().to_string();
68 if name.starts_with(&format!("{}--__compacted-", session)) {
70 extract_compaction_number(&name)
71 } else {
72 None
73 }
74 })
75 .max()
76 .map(|n| n + 1)
77 .unwrap_or(0)
78 })
79 .unwrap_or(0)
80}
81
/// Returns the modification time of `path` as whole seconds since the Unix
/// epoch.
///
/// Yields `0` when the metadata or modification time cannot be read, or when
/// the timestamp lies before the epoch.
fn file_mtime(path: &Path) -> u64 {
    let modified = match fs::metadata(path).and_then(|meta| meta.modified()) {
        Ok(time) => time,
        Err(_) => return 0,
    };
    match modified.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
89
/// Counters describing what a compaction run did (or, for a dry run, would do).
#[derive(Debug, Default)]
pub struct CompactStats {
    /// Partitions in which at least one session was compacted.
    pub partitions_compacted: usize,
    /// Merge operations performed; each merge counts once.
    pub sessions_compacted: usize,
    /// Input files consumed by the merges.
    pub files_before: usize,
    /// Output files produced by the merges (one per merge).
    pub files_after: usize,
    /// Combined size in bytes of the input files.
    pub bytes_before: u64,
    /// Combined size in bytes of the output files. Dry runs report the input
    /// size here because nothing is written.
    pub bytes_after: u64,
}

impl CompactStats {
    /// Adds every counter of `other` onto `self`.
    pub fn add(&mut self, other: &CompactStats) {
        // Destructuring makes the compiler flag any counter added later but
        // forgotten here.
        let CompactStats {
            partitions_compacted,
            sessions_compacted,
            files_before,
            files_after,
            bytes_before,
            bytes_after,
        } = other;

        self.partitions_compacted += *partitions_compacted;
        self.sessions_compacted += *sessions_compacted;
        self.files_before += *files_before;
        self.files_after += *files_after;
        self.bytes_before += *bytes_before;
        self.bytes_after += *bytes_after;
    }
}
111
/// Counters reported by `Store::archive_old_data`.
#[derive(Debug, Default)]
pub struct ArchiveStats {
    /// Number of date partitions archived (or counted by a dry run).
    pub partitions_archived: usize,
    /// Number of source parquet files found in those partitions.
    pub files_moved: usize,
    /// Byte total for the archived partitions. A real run adds the size of the
    /// archive file it wrote; a dry run adds the size of the source files.
    pub bytes_moved: u64,
}
119
/// Tuning knobs for a compaction run.
#[derive(Debug, Clone)]
pub struct CompactOptions {
    /// A session is compacted once it has at least this many non-compacted
    /// files in a partition; the same number of newest files is left untouched.
    pub file_threshold: usize,
    /// Already-compacted files of a session are merged again once there are at
    /// least this many of them. `0` turns re-compaction off.
    pub recompact_threshold: usize,
    /// When set, all files of a session (compacted or not) are merged into a
    /// single file and the thresholds above are ignored.
    pub consolidate: bool,
    /// When set, only statistics are computed; no file is written or removed.
    pub dry_run: bool,
    /// Restricts the run to the named session when present.
    pub session_filter: Option<String>,
}

impl Default for CompactOptions {
    /// Thresholds of 50 (compact) and 10 (re-compact), no consolidation, no dry
    /// run, all sessions.
    fn default() -> Self {
        CompactOptions {
            session_filter: None,
            dry_run: false,
            consolidate: false,
            recompact_threshold: 10,
            file_threshold: 50,
        }
    }
}
147
148#[derive(Debug)]
150pub struct AutoCompactOptions {
151 pub compact: CompactOptions,
153 pub archive_days: u32,
155}
156
157impl Default for AutoCompactOptions {
158 fn default() -> Self {
159 Self {
160 compact: CompactOptions::default(),
161 archive_days: 14,
162 }
163 }
164}
165
166impl Store {
167 fn compact_session_files(
172 &self,
173 partition_dir: &Path,
174 session: &str,
175 files: &mut Vec<PathBuf>,
176 keep_count: usize,
177 dry_run: bool,
178 ) -> Result<CompactStats> {
179 files.sort_by_key(|p| file_mtime(p));
181
182 let to_keep = files.len().saturating_sub(keep_count).max(0);
184 if to_keep < 2 {
185 return Ok(CompactStats::default());
187 }
188
189 let files_to_compact: Vec<PathBuf> = files.drain(..to_keep).collect();
190 let files_before = files_to_compact.len();
191 let bytes_before: u64 = files_to_compact
192 .iter()
193 .filter_map(|p| fs::metadata(p).ok())
194 .map(|m| m.len())
195 .sum();
196
197 if dry_run {
198 return Ok(CompactStats {
199 partitions_compacted: 0, sessions_compacted: 1,
201 files_before,
202 files_after: 1,
203 bytes_before,
204 bytes_after: bytes_before, });
206 }
207
208 let conn = self.connection()?;
209
210 let file_list: Vec<String> = files_to_compact
212 .iter()
213 .map(|p| p.display().to_string())
214 .collect();
215 let file_list_sql = file_list
216 .iter()
217 .map(|f| format!("'{}'", f))
218 .collect::<Vec<_>>()
219 .join(", ");
220
221 conn.execute(
223 &format!(
224 "CREATE OR REPLACE TEMP TABLE compact_temp AS
225 SELECT * FROM read_parquet([{}], union_by_name = true)",
226 file_list_sql
227 ),
228 [],
229 )?;
230
231 let seq_num = next_compaction_number(partition_dir, session);
233 let uuid = Uuid::now_v7();
234 let compacted_name = format!("{}--__compacted-{}__--{}.parquet", session, seq_num, uuid);
235 let compacted_path = partition_dir.join(&compacted_name);
236 let temp_path = atomic::temp_path(&compacted_path);
237
238 conn.execute(
239 &format!(
240 "COPY compact_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
241 temp_path.display()
242 ),
243 [],
244 )?;
245
246 conn.execute("DROP TABLE compact_temp", [])?;
247
248 let bytes_after = fs::metadata(&temp_path)?.len();
250
251 for file in &files_to_compact {
253 fs::remove_file(file)?;
254 }
255
256 atomic::rename_into_place(&temp_path, &compacted_path)?;
258
259 Ok(CompactStats {
260 partitions_compacted: 0, sessions_compacted: 1,
262 files_before,
263 files_after: 1,
264 bytes_before,
265 bytes_after,
266 })
267 }
268
269 fn consolidate_session_files(
274 &self,
275 partition_dir: &Path,
276 _session: &str,
277 files: Vec<PathBuf>,
278 dry_run: bool,
279 ) -> Result<CompactStats> {
280 if files.len() < 2 {
281 return Ok(CompactStats::default());
282 }
283
284 let files_before = files.len();
285 let bytes_before: u64 = files
286 .iter()
287 .filter_map(|p| fs::metadata(p).ok())
288 .map(|m| m.len())
289 .sum();
290
291 if dry_run {
292 return Ok(CompactStats {
293 partitions_compacted: 0,
294 sessions_compacted: 1,
295 files_before,
296 files_after: 1,
297 bytes_before,
298 bytes_after: bytes_before,
299 });
300 }
301
302 let conn = self.connection()?;
303
304 let file_list_sql = files
306 .iter()
307 .map(|p| format!("'{}'", p.display()))
308 .collect::<Vec<_>>()
309 .join(", ");
310
311 conn.execute(
313 &format!(
314 "CREATE OR REPLACE TEMP TABLE consolidate_temp AS
315 SELECT * FROM read_parquet([{}], union_by_name = true)",
316 file_list_sql
317 ),
318 [],
319 )?;
320
321 let uuid = Uuid::now_v7();
323 let consolidated_name = format!("data_{}.parquet", uuid);
324 let consolidated_path = partition_dir.join(&consolidated_name);
325 let temp_path = atomic::temp_path(&consolidated_path);
326
327 conn.execute(
328 &format!(
329 "COPY consolidate_temp TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
330 temp_path.display()
331 ),
332 [],
333 )?;
334
335 conn.execute("DROP TABLE consolidate_temp", [])?;
336
337 let bytes_after = fs::metadata(&temp_path)?.len();
338
339 for file in &files {
341 fs::remove_file(file)?;
342 }
343
344 atomic::rename_into_place(&temp_path, &consolidated_path)?;
346
347 Ok(CompactStats {
348 partitions_compacted: 0,
349 sessions_compacted: 1,
350 files_before,
351 files_after: 1,
352 bytes_before,
353 bytes_after,
354 })
355 }
356
357 pub fn compact_partition_with_opts(
364 &self,
365 partition_dir: &Path,
366 opts: &CompactOptions,
367 ) -> Result<CompactStats> {
368 let mut total_stats = CompactStats::default();
369
370 let mut session_files: HashMap<String, (Vec<PathBuf>, Vec<PathBuf>)> = HashMap::new();
372
373 for entry in fs::read_dir(partition_dir)? {
374 let entry = entry?;
375 let path = entry.path();
376 let name = entry.file_name().to_string_lossy().to_string();
377
378 if !name.ends_with(".parquet") || is_seed_file(&name) {
380 continue;
381 }
382
383 if let Some(session) = extract_session(&name) {
385 if let Some(ref filter) = opts.session_filter {
387 if session != *filter {
388 continue;
389 }
390 }
391
392 let entry = session_files.entry(session).or_insert_with(|| (vec![], vec![]));
393 if is_compacted_file(&name) || name.starts_with("data_") {
394 entry.1.push(path); } else {
396 entry.0.push(path); }
398 }
399 }
400
401 let mut any_compacted = false;
402 for (session, (non_compacted, compacted)) in session_files {
403 if opts.consolidate {
404 let all_files: Vec<PathBuf> = non_compacted.into_iter().chain(compacted).collect();
406 if all_files.len() >= 2 {
407 let stats = self.consolidate_session_files(
408 partition_dir,
409 &session,
410 all_files,
411 opts.dry_run,
412 )?;
413 if stats.sessions_compacted > 0 {
414 any_compacted = true;
415 total_stats.add(&stats);
416 }
417 }
418 } else {
419 if non_compacted.len() >= opts.file_threshold {
423 let mut to_process = non_compacted;
424 let stats = self.compact_session_files(
425 partition_dir,
426 &session,
427 &mut to_process,
428 opts.file_threshold,
429 opts.dry_run,
430 )?;
431 if stats.sessions_compacted > 0 {
432 any_compacted = true;
433 total_stats.add(&stats);
434 }
435 }
436
437 if opts.recompact_threshold > 0 && compacted.len() >= opts.recompact_threshold {
439 let stats = self.consolidate_session_files(
440 partition_dir,
441 &session,
442 compacted,
443 opts.dry_run,
444 )?;
445 if stats.sessions_compacted > 0 {
446 any_compacted = true;
447 total_stats.add(&stats);
448 }
449 }
450 }
451 }
452
453 if any_compacted {
454 total_stats.partitions_compacted = 1;
455 }
456
457 Ok(total_stats)
458 }
459
460 pub fn compact_partition(
462 &self,
463 partition_dir: &Path,
464 file_threshold: usize,
465 session_filter: Option<&str>,
466 dry_run: bool,
467 ) -> Result<CompactStats> {
468 let opts = CompactOptions {
469 file_threshold,
470 recompact_threshold: 0, consolidate: false,
472 dry_run,
473 session_filter: session_filter.map(|s| s.to_string()),
474 };
475 self.compact_partition_with_opts(partition_dir, &opts)
476 }
477
478 pub fn compact_data_type(
480 &self,
481 data_dir: &Path,
482 file_threshold: usize,
483 session_filter: Option<&str>,
484 dry_run: bool,
485 ) -> Result<CompactStats> {
486 let mut total_stats = CompactStats::default();
487
488 if !data_dir.exists() {
489 return Ok(total_stats);
490 }
491
492 for entry in fs::read_dir(data_dir)? {
494 let entry = entry?;
495 let path = entry.path();
496
497 if !path.is_dir() {
498 continue;
499 }
500
501 let stats = self.compact_partition(&path, file_threshold, session_filter, dry_run)?;
502 total_stats.add(&stats);
503 }
504
505 Ok(total_stats)
506 }
507
508 pub fn compact_for_session(
512 &self,
513 session_id: &str,
514 file_threshold: usize,
515 dry_run: bool,
516 ) -> Result<CompactStats> {
517 let mut total_stats = CompactStats::default();
518 let recent_dir = self.config().recent_dir();
519
520 for data_type in &["invocations", "outputs", "sessions", "events"] {
521 let data_dir = recent_dir.join(data_type);
522 let stats =
523 self.compact_data_type(&data_dir, file_threshold, Some(session_id), dry_run)?;
524 total_stats.add(&stats);
525 }
526
527 Ok(total_stats)
528 }
529
530 pub fn compact_session_today(
534 &self,
535 session_id: &str,
536 file_threshold: usize,
537 dry_run: bool,
538 ) -> Result<CompactStats> {
539 let mut total_stats = CompactStats::default();
540 let recent_dir = self.config().recent_dir();
541 let today = Utc::now().date_naive();
542 let date_partition = format!("date={}", today.format("%Y-%m-%d"));
543
544 for data_type in &["invocations", "outputs", "sessions", "events"] {
545 let partition_dir = recent_dir.join(data_type).join(&date_partition);
546 if partition_dir.exists() {
547 let stats = self.compact_partition(
548 &partition_dir,
549 file_threshold,
550 Some(session_id),
551 dry_run,
552 )?;
553 total_stats.add(&stats);
554 }
555 }
556
557 Ok(total_stats)
558 }
559
560 pub fn compact_recent(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
562 let mut total_stats = CompactStats::default();
563 let recent_dir = self.config().recent_dir();
564
565 for data_type in &["invocations", "outputs", "sessions", "events"] {
566 let data_dir = recent_dir.join(data_type);
567 let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
568 total_stats.add(&stats);
569 }
570
571 Ok(total_stats)
572 }
573
574 pub fn compact_archive(&self, file_threshold: usize, dry_run: bool) -> Result<CompactStats> {
576 let mut total_stats = CompactStats::default();
577 let archive_dir = self.config().archive_dir();
578
579 for data_type in &["invocations", "outputs", "sessions", "events"] {
580 let data_dir = archive_dir.join(data_type);
581 let stats = self.compact_data_type(&data_dir, file_threshold, None, dry_run)?;
582 total_stats.add(&stats);
583 }
584
585 Ok(total_stats)
586 }
587
588 pub fn archive_old_data(&self, older_than_days: u32, dry_run: bool) -> Result<ArchiveStats> {
593 let mut stats = ArchiveStats::default();
594 let cutoff_date = Utc::now().date_naive() - chrono::Duration::days(older_than_days as i64);
595
596 let recent_dir = self.config().recent_dir();
597 let archive_dir = self.config().archive_dir();
598
599 for data_type in &["invocations", "outputs", "sessions", "events"] {
600 let recent_data_dir = recent_dir.join(data_type);
601 let archive_data_dir = archive_dir.join(data_type);
602
603 if !recent_data_dir.exists() {
604 continue;
605 }
606
607 let partitions_to_archive: Vec<(NaiveDate, PathBuf, String)> = fs::read_dir(&recent_data_dir)?
609 .filter_map(|e| e.ok())
610 .filter_map(|e| {
611 let path = e.path();
612 if !path.is_dir() {
613 return None;
614 }
615 let dir_name = e.file_name().to_string_lossy().to_string();
616 let date_str = dir_name.strip_prefix("date=")?;
617 let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d").ok()?;
618 if date < cutoff_date {
619 Some((date, path, dir_name))
620 } else {
621 None
622 }
623 })
624 .collect();
625
626 for (_date, partition_path, dir_name) in partitions_to_archive {
627 let dest_dir = archive_data_dir.join(&dir_name);
628
629 let source_files: Vec<PathBuf> = fs::read_dir(&partition_path)?
631 .filter_map(|e| e.ok())
632 .map(|e| e.path())
633 .filter(|p| p.extension().map(|ext| ext == "parquet").unwrap_or(false))
634 .collect();
635
636 if source_files.is_empty() {
637 continue;
638 }
639
640 let file_count = source_files.len();
641 let bytes_before: u64 = source_files
642 .iter()
643 .filter_map(|p| fs::metadata(p).ok())
644 .map(|m| m.len())
645 .sum();
646
647 if dry_run {
648 stats.partitions_archived += 1;
649 stats.files_moved += file_count;
650 stats.bytes_moved += bytes_before;
651 continue;
652 }
653
654 if dest_dir.exists() {
656 let existing_files = fs::read_dir(&dest_dir)?
657 .filter_map(|e| e.ok())
658 .any(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false));
659 if existing_files {
660 continue;
662 }
663 }
664
665 fs::create_dir_all(&dest_dir)?;
666
667 let conn = self.connection()?;
669 let src_glob = format!("{}/*.parquet", partition_path.display());
670 let dest_file = dest_dir.join(format!("data_0.parquet"));
671 let temp_file = dest_dir.join(format!(".data_0.parquet.tmp"));
672
673 conn.execute(
674 &format!(
675 "COPY (SELECT * FROM read_parquet('{}', union_by_name = true)) \
676 TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
677 src_glob,
678 temp_file.display()
679 ),
680 [],
681 )?;
682
683 fs::rename(&temp_file, &dest_file)?;
685
686 let bytes_after = fs::metadata(&dest_file)?.len();
688
689 for file in &source_files {
691 let _ = fs::remove_file(file);
692 }
693 let _ = fs::remove_dir(&partition_path);
694
695 stats.partitions_archived += 1;
696 stats.files_moved += file_count;
697 stats.bytes_moved += bytes_after;
698 }
699 }
700
701 Ok(stats)
702 }
703
704 pub fn auto_compact(&self, opts: &AutoCompactOptions) -> Result<(CompactStats, ArchiveStats)> {
706 let archive_stats = if opts.compact.session_filter.is_none() {
708 self.archive_old_data(opts.archive_days, opts.compact.dry_run)?
709 } else {
710 ArchiveStats::default()
711 };
712
713 let compact_stats = if let Some(ref session) = opts.compact.session_filter {
715 self.compact_for_session_with_opts(session, &opts.compact)?
717 } else {
718 let mut stats = self.compact_recent_with_opts(&opts.compact)?;
720 let archive_compact = self.compact_archive_with_opts(&opts.compact)?;
721 stats.add(&archive_compact);
722 stats
723 };
724
725 Ok((compact_stats, archive_stats))
726 }
727
728 pub fn compact_recent_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
730 let mut total_stats = CompactStats::default();
731 let recent_dir = self.config().recent_dir();
732
733 for data_type in &["invocations", "outputs", "sessions", "events"] {
734 let data_dir = recent_dir.join(data_type);
735 let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
736 total_stats.add(&stats);
737 }
738
739 Ok(total_stats)
740 }
741
742 pub fn compact_archive_with_opts(&self, opts: &CompactOptions) -> Result<CompactStats> {
744 let mut total_stats = CompactStats::default();
745 let archive_dir = self.config().archive_dir();
746
747 for data_type in &["invocations", "outputs", "sessions", "events"] {
748 let data_dir = archive_dir.join(data_type);
749 let stats = self.compact_data_type_with_opts(&data_dir, opts)?;
750 total_stats.add(&stats);
751 }
752
753 Ok(total_stats)
754 }
755
756 pub fn compact_data_type_with_opts(
758 &self,
759 data_dir: &Path,
760 opts: &CompactOptions,
761 ) -> Result<CompactStats> {
762 let mut total_stats = CompactStats::default();
763
764 if !data_dir.exists() {
765 return Ok(total_stats);
766 }
767
768 for entry in fs::read_dir(data_dir)? {
769 let entry = entry?;
770 let path = entry.path();
771
772 if !path.is_dir() {
773 continue;
774 }
775
776 let stats = self.compact_partition_with_opts(&path, opts)?;
777 total_stats.add(&stats);
778 }
779
780 Ok(total_stats)
781 }
782
783 pub fn compact_for_session_with_opts(
785 &self,
786 session_id: &str,
787 opts: &CompactOptions,
788 ) -> Result<CompactStats> {
789 let mut total_stats = CompactStats::default();
790 let recent_dir = self.config().recent_dir();
791
792 let session_opts = CompactOptions {
793 session_filter: Some(session_id.to_string()),
794 ..opts.clone()
795 };
796
797 for data_type in &["invocations", "outputs", "sessions", "events"] {
798 let data_dir = recent_dir.join(data_type);
799 let stats = self.compact_data_type_with_opts(&data_dir, &session_opts)?;
800 total_stats.add(&stats);
801 }
802
803 Ok(total_stats)
804 }
805}
806
807#[cfg(test)]
808mod tests {
809 use super::*;
810 use crate::init::initialize;
811 use crate::schema::InvocationRecord;
812 use crate::Config;
813 use tempfile::TempDir;
814
815 fn setup_store() -> (TempDir, Store) {
816 let tmp = TempDir::new().unwrap();
817 let config = Config::with_root(tmp.path());
818 initialize(&config).unwrap();
819 let store = Store::open(config).unwrap();
820 (tmp, store)
821 }
822
823 #[test]
824 fn test_extract_session() {
825 assert_eq!(
826 extract_session("zsh-1234--make--uuid.parquet"),
827 Some("zsh-1234".to_string())
828 );
829 assert_eq!(
830 extract_session("session-id.parquet"),
831 Some("session-id".to_string())
832 );
833 assert_eq!(
834 extract_session("zsh-1234--__compacted-0__--uuid.parquet"),
835 Some("zsh-1234".to_string())
836 );
837 }
838
839 #[test]
840 fn test_is_compacted_file() {
841 assert!(is_compacted_file("zsh-1234--__compacted-0__--uuid.parquet"));
842 assert!(is_compacted_file("session--__compacted-5__--abc.parquet"));
843 assert!(!is_compacted_file("zsh-1234--make--uuid.parquet"));
844 assert!(!is_compacted_file("session.parquet"));
845 }
846
847 #[test]
848 fn test_extract_compaction_number() {
849 assert_eq!(
850 extract_compaction_number("zsh--__compacted-0__--uuid.parquet"),
851 Some(0)
852 );
853 assert_eq!(
854 extract_compaction_number("zsh--__compacted-42__--uuid.parquet"),
855 Some(42)
856 );
857 assert_eq!(
858 extract_compaction_number("zsh--make--uuid.parquet"),
859 None
860 );
861 }
862
863 #[test]
864 fn test_compact_recent_no_files() {
865 let (_tmp, store) = setup_store();
866
867 let stats = store.compact_recent(2, false).unwrap();
868 assert_eq!(stats.partitions_compacted, 0);
869 }
870
871 #[test]
872 fn test_compact_recent_with_files() {
873 let (_tmp, store) = setup_store();
874
875 for i in 0..5 {
877 let record = InvocationRecord::new(
878 "test-session",
879 format!("command-{}", i),
880 "/home/user",
881 0,
882 "test@client",
883 );
884 store.write_invocation(&record).unwrap();
885 }
886
887 let stats = store.compact_recent(2, false).unwrap();
889 assert_eq!(stats.sessions_compacted, 1);
890 assert_eq!(stats.files_before, 3); assert_eq!(stats.files_after, 1);
892 }
893
894 #[test]
895 fn test_compact_for_session() {
896 let (_tmp, store) = setup_store();
897
898 for i in 0..5 {
900 let record = InvocationRecord::new(
901 "session-a",
902 format!("command-{}", i),
903 "/home/user",
904 0,
905 "test@client",
906 );
907 store.write_invocation(&record).unwrap();
908 }
909 for i in 0..3 {
910 let record = InvocationRecord::new(
911 "session-b",
912 format!("command-{}", i),
913 "/home/user",
914 0,
915 "test@client",
916 );
917 store.write_invocation(&record).unwrap();
918 }
919
920 let stats = store.compact_for_session("session-a", 2, false).unwrap();
922 assert_eq!(stats.sessions_compacted, 1);
923 assert_eq!(stats.files_before, 3); let date = chrono::Utc::now().date_naive();
927 let inv_dir = store.config().invocations_dir(&date);
928 let session_b_count = std::fs::read_dir(&inv_dir)
929 .unwrap()
930 .filter_map(|e| e.ok())
931 .filter(|e| e.file_name().to_string_lossy().starts_with("session-b"))
932 .count();
933 assert_eq!(session_b_count, 3);
934 }
935
936 #[test]
937 fn test_compact_dry_run() {
938 let (_tmp, store) = setup_store();
939
940 for i in 0..5 {
942 let record = InvocationRecord::new(
943 "test-session",
944 format!("command-{}", i),
945 "/home/user",
946 0,
947 "test@client",
948 );
949 store.write_invocation(&record).unwrap();
950 }
951
952 let stats = store.compact_recent(2, true).unwrap();
954 assert_eq!(stats.sessions_compacted, 1);
955 assert_eq!(stats.files_before, 3);
956
957 let date = chrono::Utc::now().date_naive();
959 let inv_dir = store.config().invocations_dir(&date);
960 let file_count = std::fs::read_dir(&inv_dir)
961 .unwrap()
962 .filter_map(|e| e.ok())
963 .filter(|e| e.path().extension().map(|ext| ext == "parquet").unwrap_or(false))
964 .filter(|e| !e.file_name().to_string_lossy().starts_with("_seed"))
965 .count();
966 assert_eq!(file_count, 5);
967 }
968
969 #[test]
970 fn test_compacted_file_naming() {
971 let (_tmp, store) = setup_store();
972
973 for i in 0..5 {
975 let record = InvocationRecord::new(
976 "zsh-9999",
977 format!("cmd-{}", i),
978 "/home/user",
979 0,
980 "test@client",
981 );
982 store.write_invocation(&record).unwrap();
983 }
984
985 store.compact_recent(2, false).unwrap();
987
988 let date = chrono::Utc::now().date_naive();
990 let inv_dir = store.config().invocations_dir(&date);
991 let compacted_files: Vec<_> = std::fs::read_dir(&inv_dir)
992 .unwrap()
993 .filter_map(|e| e.ok())
994 .filter(|e| is_compacted_file(&e.file_name().to_string_lossy()))
995 .collect();
996
997 assert_eq!(compacted_files.len(), 1);
998 let name = compacted_files[0].file_name().to_string_lossy().to_string();
999 assert!(name.starts_with("zsh-9999--__compacted-0__--"));
1000 assert!(name.ends_with(".parquet"));
1001 }
1002}