1use crate::{
2 error::{AllSourceError, Result},
3 infrastructure::persistence::{cold_tier::ArchiveTarget, storage::ParquetStorage},
4};
5use chrono::{DateTime, Utc};
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use std::{collections::HashMap, fs, path::PathBuf, sync::Arc, time::Duration};
9
/// Coordinates compaction of the Parquet files stored under one directory.
///
/// Holds the compaction settings together with lock-protected bookkeeping:
/// cumulative statistics and the time of the most recent sweep.
pub struct CompactionManager {
    /// Root directory whose sub-directories are treated as tenants.
    storage_dir: PathBuf,

    /// Settings that drive file selection, scheduling, retention and archiving.
    config: CompactionConfig,

    /// Cumulative counters; updated by `compact` after a sweep that compacted
    /// at least one file.
    stats: Arc<RwLock<CompactionStats>>,

    /// Time the last sweep finished; `None` until `compact` has completed once.
    last_compaction: Arc<RwLock<Option<DateTime<Utc>>>>,
}
32
/// File-name prefix identifying a compaction snapshot. `compact_tenant`
/// excludes files whose name starts with this prefix from its candidates.
const SNAPSHOT_PREFIX: &str = "snapshot.";
38
/// Tunables for [`CompactionManager`].
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Minimum number of selected files required before the size-based or
    /// time-based strategy compacts anything; below this nothing is selected.
    pub min_files_to_compact: usize,

    /// Target output file size in bytes.
    /// NOTE(review): not read by any code visible in this section — confirm usage.
    pub target_file_size: usize,

    /// Maximum output file size in bytes.
    /// NOTE(review): not read by any code visible in this section — confirm usage.
    pub max_file_size: usize,

    /// Files strictly smaller than this many bytes count as "small" for the
    /// size-based strategy.
    pub small_file_threshold: usize,

    /// Minimum number of seconds between sweeps, as checked by `should_compact`.
    pub compaction_interval_seconds: u64,

    /// When `false`, `should_compact` always returns `false`.
    pub auto_compact: bool,

    /// Rule used to choose which files are compacted.
    pub strategy: CompactionStrategy,

    /// Per-tenant TTLs applied to events while a tenant is compacted.
    pub retention: RetentionConfig,

    /// Optional destination that receives events dropped by retention before
    /// their source files are deleted.
    pub archive: Option<Arc<dyn ArchiveTarget>>,
}
79
/// Event-retention policy: how long events are kept before compaction drops them.
///
/// A resolved TTL of `None` means no TTL applies; `compact_tenant` then skips
/// the retention step for that tenant.
#[derive(Debug, Clone)]
pub struct RetentionConfig {
    /// TTL used for tenants that have no entry in `per_tenant_ttl`.
    pub default_ttl: Option<Duration>,
    /// Per-tenant overrides keyed by tenant id. An entry whose value is `None`
    /// still takes precedence over `default_ttl`.
    pub per_tenant_ttl: HashMap<String, Option<Duration>>,
}
103
104impl Default for RetentionConfig {
105 fn default() -> Self {
106 let mut per_tenant_ttl = HashMap::new();
107 per_tenant_ttl.insert(
108 "system".to_string(),
109 Some(Duration::from_secs(30 * 24 * 3600)),
110 );
111 Self {
112 default_ttl: None,
113 per_tenant_ttl,
114 }
115 }
116}
117
118impl RetentionConfig {
119 pub fn ttl_for(&self, tenant_id: &str) -> Option<Duration> {
126 match self.per_tenant_ttl.get(tenant_id) {
127 Some(v) => *v,
128 None => self.default_ttl,
129 }
130 }
131
132 pub fn set(&mut self, tenant_id: &str, ttl: Option<Duration>) {
135 self.per_tenant_ttl.insert(tenant_id.to_string(), ttl);
136 }
137}
138
139impl Default for CompactionConfig {
140 fn default() -> Self {
141 Self {
142 min_files_to_compact: 3,
143 target_file_size: 128 * 1024 * 1024, max_file_size: 256 * 1024 * 1024, small_file_threshold: 10 * 1024 * 1024, compaction_interval_seconds: 3600, auto_compact: true,
148 strategy: CompactionStrategy::SizeBased,
149 retention: RetentionConfig::default(),
150 archive: None,
151 }
152 }
153}
154
155impl CompactionConfig {
156 pub fn from_env() -> Self {
165 Self::from_env_vars(
166 std::env::var("ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS").ok(),
167 std::env::var("ALLSOURCE_RETENTION_SYSTEM_DAYS").ok(),
168 )
169 }
170
171 pub fn from_env_vars(
174 interval_var: Option<String>,
175 system_retention_days_var: Option<String>,
176 ) -> Self {
177 let mut config = Self::default();
178 if let Some(s) = interval_var.filter(|s| !s.is_empty()) {
179 match s.parse::<u64>() {
180 Ok(v) => config.compaction_interval_seconds = v,
181 Err(e) => {
182 tracing::warn!(
183 "ALLSOURCE_SNAPSHOT_INTERVAL_SECONDS={s:?} could not be parsed as \
184 u64: {e}; defaulting to {}s",
185 config.compaction_interval_seconds
186 );
187 }
188 }
189 }
190 if let Some(s) = system_retention_days_var.filter(|s| !s.is_empty()) {
191 match s.parse::<u64>() {
192 Ok(days) => {
193 config
194 .retention
195 .set("system", Some(Duration::from_secs(days * 24 * 3600)));
196 }
197 Err(e) => {
198 tracing::warn!(
199 "ALLSOURCE_RETENTION_SYSTEM_DAYS={s:?} could not be parsed as u64: \
200 {e}; defaulting to 30 days for tenant=system"
201 );
202 }
203 }
204 }
205 config
206 }
207
208 pub fn from_env_var(interval_var: Option<String>) -> Self {
211 Self::from_env_vars(interval_var, None)
212 }
213}
214
/// Rule used to pick which Parquet files take part in a compaction run.
///
/// Serialized with lowercase variant names (`sizebased`, `timebased`,
/// `fullcompaction`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CompactionStrategy {
    /// Select files smaller than `small_file_threshold`.
    SizeBased,
    /// Select files created more than 24 hours ago.
    TimeBased,
    /// Select every candidate file.
    FullCompaction,
}
225
/// Cumulative counters maintained by [`CompactionManager`] across sweeps.
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionStats {
    /// Number of sweeps that compacted at least one file.
    pub total_compactions: u64,
    /// Total number of input files compacted.
    pub total_files_compacted: u64,
    /// Sum of input-file sizes, in bytes, across all counted sweeps.
    pub total_bytes_before: u64,
    /// Sum of output sizes, in bytes, across all counted sweeps.
    pub total_bytes_after: u64,
    /// Total number of events written by compaction.
    pub total_events_compacted: u64,
    /// Wall-clock duration in milliseconds of the most recent counted sweep.
    pub last_compaction_duration_ms: u64,
    /// Accumulated `bytes_before - bytes_after` per sweep, saturating at zero.
    pub space_saved_bytes: u64,
}
236
/// Metadata about one Parquet file considered for compaction.
#[derive(Debug, Clone)]
struct FileInfo {
    /// Location of the file on disk.
    path: PathBuf,
    /// File size in bytes.
    size: u64,
    /// Creation time at whole-second precision; producers in this file fall
    /// back to the current time when the filesystem cannot supply one.
    created: DateTime<Utc>,
}
244
245impl CompactionManager {
246 pub fn new(storage_dir: impl Into<PathBuf>, config: CompactionConfig) -> Self {
248 let storage_dir = storage_dir.into();
249
250 tracing::info!(
251 "✅ Compaction manager initialized at: {}",
252 storage_dir.display()
253 );
254
255 Self {
256 storage_dir,
257 config,
258 stats: Arc::new(RwLock::new(CompactionStats::default())),
259 last_compaction: Arc::new(RwLock::new(None)),
260 }
261 }
262
263 fn list_parquet_files(&self) -> Result<Vec<FileInfo>> {
265 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
266 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
267 })?;
268
269 let mut files = Vec::new();
270
271 for entry in entries {
272 let entry = entry.map_err(|e| {
273 AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
274 })?;
275
276 let path = entry.path();
277 if let Some(ext) = path.extension()
278 && ext == "parquet"
279 {
280 let metadata = entry.metadata().map_err(|e| {
281 AllSourceError::StorageError(format!("Failed to read file metadata: {e}"))
282 })?;
283
284 let size = metadata.len();
285 let created = metadata
286 .created()
287 .ok()
288 .and_then(|t| {
289 t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| {
290 DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now)
291 })
292 })
293 .unwrap_or_else(Utc::now);
294
295 files.push(FileInfo {
296 path,
297 size,
298 created,
299 });
300 }
301 }
302
303 files.sort_by_key(|f| f.created);
305
306 Ok(files)
307 }
308
309 fn select_files_for_compaction(&self, files: &[FileInfo]) -> Vec<FileInfo> {
311 match self.config.strategy {
312 CompactionStrategy::SizeBased => self.select_small_files(files),
313 CompactionStrategy::TimeBased => self.select_old_files(files),
314 CompactionStrategy::FullCompaction => files.to_vec(),
315 }
316 }
317
318 fn select_small_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
320 let small_files: Vec<FileInfo> = files
321 .iter()
322 .filter(|f| f.size < self.config.small_file_threshold as u64)
323 .cloned()
324 .collect();
325
326 if small_files.len() >= self.config.min_files_to_compact {
328 small_files
329 } else {
330 Vec::new()
331 }
332 }
333
334 fn select_old_files(&self, files: &[FileInfo]) -> Vec<FileInfo> {
336 let now = Utc::now();
337 let age_threshold = chrono::Duration::hours(24); let old_files: Vec<FileInfo> = files
340 .iter()
341 .filter(|f| now - f.created > age_threshold)
342 .cloned()
343 .collect();
344
345 if old_files.len() >= self.config.min_files_to_compact {
346 old_files
347 } else {
348 Vec::new()
349 }
350 }
351
352 #[cfg_attr(feature = "hotpath", hotpath::measure)]
354 pub fn should_compact(&self) -> bool {
355 if !self.config.auto_compact {
356 return false;
357 }
358
359 let last = self.last_compaction.read();
360 match *last {
361 None => true, Some(last_time) => {
363 let elapsed = (Utc::now() - last_time).num_seconds();
364 elapsed >= self.config.compaction_interval_seconds as i64
365 }
366 }
367 }
368
369 #[cfg_attr(feature = "hotpath", hotpath::measure)]
381 pub fn compact(&self) -> Result<CompactionResult> {
382 let start_time = std::time::Instant::now();
383 tracing::info!("🔄 Starting per-tenant compaction sweep...");
384
385 let tenants = self.discover_tenants()?;
386 if tenants.is_empty() {
387 tracing::debug!("No tenants found under {}", self.storage_dir.display());
388 return Ok(CompactionResult::default());
389 }
390
391 let mut aggregate = CompactionResult::default();
392 for tenant in &tenants {
393 match self.compact_tenant(tenant) {
394 Ok(r) => {
395 aggregate.files_compacted += r.files_compacted;
396 aggregate.bytes_before += r.bytes_before;
397 aggregate.bytes_after += r.bytes_after;
398 aggregate.events_compacted += r.events_compacted;
399 }
400 Err(e) => {
401 tracing::error!(
402 tenant_id = %tenant,
403 "compact_tenant failed: {e}"
404 );
405 }
406 }
407 }
408 aggregate.duration_ms = start_time.elapsed().as_millis() as u64;
409
410 if aggregate.files_compacted > 0 {
411 let mut stats = self.stats.write();
412 stats.total_compactions += 1;
413 stats.total_files_compacted += aggregate.files_compacted as u64;
414 stats.total_bytes_before += aggregate.bytes_before;
415 stats.total_bytes_after += aggregate.bytes_after;
416 stats.total_events_compacted += aggregate.events_compacted as u64;
417 stats.last_compaction_duration_ms = aggregate.duration_ms;
418 stats.space_saved_bytes += aggregate.bytes_before.saturating_sub(aggregate.bytes_after);
419 }
420 *self.last_compaction.write() = Some(Utc::now());
421
422 tracing::info!(
423 "✅ Compaction sweep complete: {} files → 1 snapshot per tenant, \
424 {:.2} MB → {:.2} MB, {} events, {} tenants in {}ms",
425 aggregate.files_compacted,
426 aggregate.bytes_before as f64 / (1024.0 * 1024.0),
427 aggregate.bytes_after as f64 / (1024.0 * 1024.0),
428 aggregate.events_compacted,
429 tenants.len(),
430 aggregate.duration_ms
431 );
432
433 Ok(aggregate)
434 }
435
436 pub fn compact_tenant(&self, tenant_id: &str) -> Result<CompactionResult> {
454 let start_time = std::time::Instant::now();
455
456 let storage = ParquetStorage::new(&self.storage_dir)?;
458 let all_files = storage.list_parquet_files_for_tenant(tenant_id)?;
459 let raw_files: Vec<FileInfo> = all_files
460 .into_iter()
461 .filter(|p| {
462 p.file_name()
463 .and_then(|n| n.to_str())
464 .is_none_or(|n| !n.starts_with(SNAPSHOT_PREFIX))
465 })
466 .filter_map(|p| {
467 let metadata = fs::metadata(&p).ok()?;
468 let size = metadata.len();
469 let created = metadata
470 .created()
471 .ok()
472 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
473 .and_then(|d| DateTime::from_timestamp(d.as_secs() as i64, 0))
474 .unwrap_or_else(Utc::now);
475 Some(FileInfo {
476 path: p,
477 size,
478 created,
479 })
480 })
481 .collect();
482
483 let candidates = self.select_files_for_compaction(&raw_files);
485 if candidates.is_empty() {
486 tracing::debug!(
487 tenant_id = tenant_id,
488 strategy = ?self.config.strategy,
489 "no files meet compaction criteria"
490 );
491 return Ok(CompactionResult::default());
492 }
493
494 let bytes_before: u64 = candidates.iter().map(|f| f.size).sum();
495 tracing::info!(
496 tenant_id = tenant_id,
497 files = candidates.len(),
498 mib = bytes_before as f64 / (1024.0 * 1024.0),
499 "compacting tenant"
500 );
501
502 let mut events = Vec::new();
508 for fi in &candidates {
509 let event_tenant = match fi.path.strip_prefix(&self.storage_dir).ok() {
512 Some(rel) => rel
513 .components()
514 .next()
515 .and_then(|c| match c {
516 std::path::Component::Normal(t) => Some(t.to_string_lossy().into_owned()),
517 _ => None,
518 })
519 .unwrap_or_else(|| "default".to_string()),
520 None => "default".to_string(),
521 };
522 match storage.load_events_from_file_path(&fi.path, &event_tenant) {
523 Ok(mut e) => events.append(&mut e),
524 Err(e) => {
525 tracing::error!(
526 file = %fi.path.display(),
527 "failed to read parquet file for compaction: {e}"
528 );
529 }
530 }
531 }
532
533 if events.is_empty() {
534 tracing::warn!(
535 tenant_id = tenant_id,
536 "candidate files had no readable events; skipping snapshot"
537 );
538 return Ok(CompactionResult::default());
539 }
540
541 let dropped_by_retention = if let Some(ttl) = self.config.retention.ttl_for(tenant_id) {
557 let cutoff = Utc::now()
558 - chrono::Duration::from_std(ttl).unwrap_or_else(|_| chrono::Duration::zero());
559 let before = events.len();
560
561 let (drained, kept): (Vec<_>, Vec<_>) = std::mem::take(&mut events)
566 .into_iter()
567 .partition(|e| e.timestamp < cutoff);
568 events = kept;
569 let dropped = before - events.len();
570
571 if dropped > 0 {
572 tracing::info!(
573 retention_tenant = tenant_id,
574 dropped = dropped,
575 kept = events.len(),
576 cutoff = %cutoff.to_rfc3339(),
577 ttl_secs = ttl.as_secs(),
578 "retention: dropped events older than TTL"
579 );
580
581 if let Some(archive) = self.config.archive.as_ref() {
582 let from = drained
583 .iter()
584 .map(|e| e.timestamp)
585 .min()
586 .expect("dropped > 0 guarantees non-empty drained");
587 let to = drained
588 .iter()
589 .map(|e| e.timestamp)
590 .max()
591 .expect("dropped > 0 guarantees non-empty drained");
592 archive.archive(tenant_id, from, to, &drained)?;
593 tracing::info!(
594 retention_tenant = tenant_id,
595 archived_to = %archive.description(),
596 archived = drained.len(),
597 "retention: dropped events archived to cold tier"
598 );
599 }
600 }
601 dropped
602 } else {
603 0
604 };
605
606 if events.is_empty() {
614 tracing::info!(
615 tenant_id = tenant_id,
616 files_dropped = candidates.len(),
617 events_dropped = dropped_by_retention,
618 "retention: every event aged out — deleting originals without snapshot"
619 );
620 for fi in &candidates {
621 if let Err(e) = fs::remove_file(&fi.path) {
622 tracing::error!(
623 file = %fi.path.display(),
624 "failed to remove fully-aged raw file: {e}"
625 );
626 }
627 }
628 return Ok(CompactionResult {
629 files_compacted: candidates.len(),
630 bytes_before,
631 bytes_after: 0,
632 events_compacted: 0,
633 duration_ms: start_time.elapsed().as_millis() as u64,
634 });
635 }
636
637 events.sort_by_key(|e| e.timestamp);
638 let from = events.first().expect("non-empty checked above").timestamp;
639 let to = events.last().expect("non-empty checked above").timestamp;
640
641 let file_stem = format!(
644 "snapshot.{tenant_id}.{}-{}",
645 format_iso_basic(from),
646 format_iso_basic(to)
647 );
648 let snapshot_path = storage.write_atomic_parquet(tenant_id, &file_stem, &events)?;
649 let bytes_after = fs::metadata(&snapshot_path).map(|m| m.len()).unwrap_or(0);
650
651 for fi in &candidates {
656 if let Err(e) = fs::remove_file(&fi.path) {
657 tracing::error!(
658 file = %fi.path.display(),
659 "failed to remove pre-snapshot raw file: {e}"
660 );
661 }
662 }
663
664 let duration_ms = start_time.elapsed().as_millis() as u64;
665 tracing::info!(
666 tenant_id = tenant_id,
667 files_compacted = candidates.len(),
668 events = events.len(),
669 dropped_by_retention = dropped_by_retention,
670 mib_before = bytes_before as f64 / (1024.0 * 1024.0),
671 mib_after = bytes_after as f64 / (1024.0 * 1024.0),
672 duration_ms = duration_ms,
673 "tenant compaction complete"
674 );
675
676 Ok(CompactionResult {
677 files_compacted: candidates.len(),
678 bytes_before,
679 bytes_after,
680 events_compacted: events.len(),
681 duration_ms,
682 })
683 }
684
685 fn discover_tenants(&self) -> Result<Vec<String>> {
690 let Ok(entries) = fs::read_dir(&self.storage_dir) else {
691 return Ok(Vec::new());
692 };
693 let mut tenants: Vec<String> = entries
694 .filter_map(std::result::Result::ok)
695 .filter_map(|entry| {
696 let ft = entry.file_type().ok()?;
697 if !ft.is_dir() {
698 return None;
699 }
700 let name = entry.file_name().to_string_lossy().into_owned();
701 if name.starts_with('.') || name == "__system" {
704 return None;
705 }
706 Some(name)
707 })
708 .collect();
709 tenants.sort();
710 Ok(tenants)
711 }
712
713 pub fn stats(&self) -> CompactionStats {
715 (*self.stats.read()).clone()
716 }
717
    /// Returns the configuration this manager was constructed with.
    pub fn config(&self) -> &CompactionConfig {
        &self.config
    }
722
    /// Runs a compaction sweep immediately, without consulting
    /// `should_compact`. Logs that the sweep was manually triggered and then
    /// delegates to `compact`, returning its result unchanged.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn compact_now(&self) -> Result<CompactionResult> {
        tracing::info!("Manual compaction triggered");
        self.compact()
    }
729}
730
/// Outcome of a single tenant compaction, or the sum over a whole sweep.
#[derive(Debug, Clone, Default, Serialize)]
pub struct CompactionResult {
    /// Number of input files compacted.
    pub files_compacted: usize,
    /// Combined size of the input files, in bytes.
    pub bytes_before: u64,
    /// Size of the snapshot written, in bytes; 0 when no snapshot was written.
    pub bytes_after: u64,
    /// Number of events written to the snapshot.
    pub events_compacted: usize,
    /// Wall-clock duration of the operation, in milliseconds.
    pub duration_ms: u64,
}
740
741pub(super) fn format_iso_basic(t: DateTime<Utc>) -> String {
747 t.format("%Y-%m-%dT%H%M%SZ").to_string()
748}
749
/// Background job that periodically asks a [`CompactionManager`] whether a
/// sweep is due and runs it when it is.
pub struct CompactionTask {
    /// Manager that performs the compaction.
    manager: Arc<CompactionManager>,
    /// Time between successive checks.
    interval: Duration,
}
755
756impl CompactionTask {
757 pub fn new(manager: Arc<CompactionManager>, interval_seconds: u64) -> Self {
759 Self {
760 manager,
761 interval: Duration::from_secs(interval_seconds),
762 }
763 }
764
765 #[cfg_attr(feature = "hotpath", hotpath::measure)]
767 pub async fn run(self) {
768 let mut interval = tokio::time::interval(self.interval);
769
770 loop {
771 interval.tick().await;
772
773 if self.manager.should_compact() {
774 tracing::debug!("Auto-compaction check triggered");
775
776 match self.manager.compact() {
777 Ok(result) => {
778 if result.files_compacted > 0 {
779 tracing::info!(
780 "Auto-compaction succeeded: {} files, {:.2} MB saved",
781 result.files_compacted,
782 (result.bytes_before - result.bytes_after) as f64
783 / (1024.0 * 1024.0)
784 );
785 }
786 }
787 Err(e) => {
788 tracing::error!("Auto-compaction failed: {}", e);
789 }
790 }
791 }
792 }
793 }
794}
795
796#[cfg(test)]
797mod tests {
798 use super::*;
799 use tempfile::TempDir;
800
801 #[test]
802 fn test_compaction_manager_creation() {
803 let temp_dir = TempDir::new().unwrap();
804 let config = CompactionConfig::default();
805 let manager = CompactionManager::new(temp_dir.path(), config);
806
807 assert_eq!(manager.stats().total_compactions, 0);
808 }
809
810 #[test]
811 fn test_should_compact() {
812 let temp_dir = TempDir::new().unwrap();
813 let config = CompactionConfig {
814 auto_compact: true,
815 compaction_interval_seconds: 1,
816 ..Default::default()
817 };
818 let manager = CompactionManager::new(temp_dir.path(), config);
819
820 assert!(manager.should_compact());
822 }
823
824 #[test]
825 fn test_file_selection_size_based() {
826 let temp_dir = TempDir::new().unwrap();
827 let config = CompactionConfig {
828 small_file_threshold: 1024 * 1024, min_files_to_compact: 2,
830 strategy: CompactionStrategy::SizeBased,
831 ..Default::default()
832 };
833 let manager = CompactionManager::new(temp_dir.path(), config);
834
835 let files = vec![
836 FileInfo {
837 path: PathBuf::from("small1.parquet"),
838 size: 500_000, created: Utc::now(),
840 },
841 FileInfo {
842 path: PathBuf::from("small2.parquet"),
843 size: 600_000, created: Utc::now(),
845 },
846 FileInfo {
847 path: PathBuf::from("large.parquet"),
848 size: 10_000_000, created: Utc::now(),
850 },
851 ];
852
853 let selected = manager.select_files_for_compaction(&files);
854 assert_eq!(selected.len(), 2); }
856
857 #[test]
858 fn test_default_compaction_config() {
859 let config = CompactionConfig::default();
860 assert_eq!(config.min_files_to_compact, 3);
861 assert_eq!(config.target_file_size, 128 * 1024 * 1024);
862 assert_eq!(config.max_file_size, 256 * 1024 * 1024);
863 assert_eq!(config.small_file_threshold, 10 * 1024 * 1024);
864 assert_eq!(config.compaction_interval_seconds, 3600);
865 assert!(config.auto_compact);
866 assert_eq!(config.strategy, CompactionStrategy::SizeBased);
867 }
868
869 #[test]
870 fn test_should_compact_disabled() {
871 let temp_dir = TempDir::new().unwrap();
872 let config = CompactionConfig {
873 auto_compact: false,
874 ..Default::default()
875 };
876 let manager = CompactionManager::new(temp_dir.path(), config);
877
878 assert!(!manager.should_compact());
879 }
880
881 #[test]
882 fn test_compact_empty_directory() {
883 let temp_dir = TempDir::new().unwrap();
884 let config = CompactionConfig::default();
885 let manager = CompactionManager::new(temp_dir.path(), config);
886
887 let result = manager.compact().unwrap();
888 assert_eq!(result.files_compacted, 0);
889 assert_eq!(result.bytes_before, 0);
890 assert_eq!(result.bytes_after, 0);
891 assert_eq!(result.events_compacted, 0);
892 }
893
894 #[test]
895 fn test_compact_now() {
896 let temp_dir = TempDir::new().unwrap();
897 let config = CompactionConfig::default();
898 let manager = CompactionManager::new(temp_dir.path(), config);
899
900 let result = manager.compact_now().unwrap();
901 assert_eq!(result.files_compacted, 0);
902 }
903
904 #[test]
905 fn test_get_config() {
906 let temp_dir = TempDir::new().unwrap();
907 let config = CompactionConfig {
908 min_files_to_compact: 5,
909 ..Default::default()
910 };
911 let manager = CompactionManager::new(temp_dir.path(), config);
912
913 assert_eq!(manager.config().min_files_to_compact, 5);
914 }
915
916 #[test]
917 fn test_get_stats() {
918 let temp_dir = TempDir::new().unwrap();
919 let config = CompactionConfig::default();
920 let manager = CompactionManager::new(temp_dir.path(), config);
921
922 let stats = manager.stats();
923 assert_eq!(stats.total_compactions, 0);
924 assert_eq!(stats.total_files_compacted, 0);
925 assert_eq!(stats.total_bytes_before, 0);
926 assert_eq!(stats.total_bytes_after, 0);
927 assert_eq!(stats.total_events_compacted, 0);
928 assert_eq!(stats.last_compaction_duration_ms, 0);
929 assert_eq!(stats.space_saved_bytes, 0);
930 }
931
932 #[test]
933 fn test_file_selection_not_enough_small_files() {
934 let temp_dir = TempDir::new().unwrap();
935 let config = CompactionConfig {
936 small_file_threshold: 1024 * 1024,
937 min_files_to_compact: 3, strategy: CompactionStrategy::SizeBased,
939 ..Default::default()
940 };
941 let manager = CompactionManager::new(temp_dir.path(), config);
942
943 let files = vec![
944 FileInfo {
945 path: PathBuf::from("small1.parquet"),
946 size: 500_000,
947 created: Utc::now(),
948 },
949 FileInfo {
950 path: PathBuf::from("small2.parquet"),
951 size: 600_000,
952 created: Utc::now(),
953 },
954 ];
955
956 let selected = manager.select_files_for_compaction(&files);
957 assert_eq!(selected.len(), 0); }
959
960 #[test]
961 fn test_file_selection_time_based() {
962 let temp_dir = TempDir::new().unwrap();
963 let config = CompactionConfig {
964 min_files_to_compact: 2,
965 strategy: CompactionStrategy::TimeBased,
966 ..Default::default()
967 };
968 let manager = CompactionManager::new(temp_dir.path(), config);
969
970 let old_time = Utc::now() - chrono::Duration::hours(48);
971 let files = vec![
972 FileInfo {
973 path: PathBuf::from("old1.parquet"),
974 size: 1_000_000,
975 created: old_time,
976 },
977 FileInfo {
978 path: PathBuf::from("old2.parquet"),
979 size: 2_000_000,
980 created: old_time,
981 },
982 FileInfo {
983 path: PathBuf::from("new.parquet"),
984 size: 500_000,
985 created: Utc::now(),
986 },
987 ];
988
989 let selected = manager.select_files_for_compaction(&files);
990 assert_eq!(selected.len(), 2); }
992
993 #[test]
994 fn test_file_selection_time_based_not_enough() {
995 let temp_dir = TempDir::new().unwrap();
996 let config = CompactionConfig {
997 min_files_to_compact: 3,
998 strategy: CompactionStrategy::TimeBased,
999 ..Default::default()
1000 };
1001 let manager = CompactionManager::new(temp_dir.path(), config);
1002
1003 let old_time = Utc::now() - chrono::Duration::hours(48);
1004 let files = vec![
1005 FileInfo {
1006 path: PathBuf::from("old1.parquet"),
1007 size: 1_000_000,
1008 created: old_time,
1009 },
1010 FileInfo {
1011 path: PathBuf::from("new.parquet"),
1012 size: 500_000,
1013 created: Utc::now(),
1014 },
1015 ];
1016
1017 let selected = manager.select_files_for_compaction(&files);
1018 assert_eq!(selected.len(), 0); }
1020
1021 #[test]
1022 fn test_file_selection_full_compaction() {
1023 let temp_dir = TempDir::new().unwrap();
1024 let config = CompactionConfig {
1025 strategy: CompactionStrategy::FullCompaction,
1026 ..Default::default()
1027 };
1028 let manager = CompactionManager::new(temp_dir.path(), config);
1029
1030 let files = vec![
1031 FileInfo {
1032 path: PathBuf::from("file1.parquet"),
1033 size: 1_000_000,
1034 created: Utc::now(),
1035 },
1036 FileInfo {
1037 path: PathBuf::from("file2.parquet"),
1038 size: 2_000_000,
1039 created: Utc::now(),
1040 },
1041 ];
1042
1043 let selected = manager.select_files_for_compaction(&files);
1044 assert_eq!(selected.len(), 2); }
1046
1047 #[test]
1048 fn test_compaction_strategy_serde() {
1049 let strategies = vec![
1050 CompactionStrategy::SizeBased,
1051 CompactionStrategy::TimeBased,
1052 CompactionStrategy::FullCompaction,
1053 ];
1054
1055 for strategy in strategies {
1056 let json = serde_json::to_string(&strategy).unwrap();
1057 let parsed: CompactionStrategy = serde_json::from_str(&json).unwrap();
1058 assert_eq!(parsed, strategy);
1059 }
1060 }
1061
1062 #[test]
1063 fn test_compaction_stats_default() {
1064 let stats = CompactionStats::default();
1065 assert_eq!(stats.total_compactions, 0);
1066 assert_eq!(stats.total_files_compacted, 0);
1067 }
1068
1069 #[test]
1070 fn test_compaction_stats_serde() {
1071 let stats = CompactionStats {
1072 total_compactions: 5,
1073 total_files_compacted: 20,
1074 total_bytes_before: 1000000,
1075 total_bytes_after: 500000,
1076 total_events_compacted: 10000,
1077 last_compaction_duration_ms: 500,
1078 space_saved_bytes: 500000,
1079 };
1080
1081 let json = serde_json::to_string(&stats).unwrap();
1082 assert!(json.contains("\"total_compactions\":5"));
1083 assert!(json.contains("\"space_saved_bytes\":500000"));
1084 }
1085
1086 #[test]
1087 fn test_compaction_result_serde() {
1088 let result = CompactionResult {
1089 files_compacted: 3,
1090 bytes_before: 1000000,
1091 bytes_after: 500000,
1092 events_compacted: 5000,
1093 duration_ms: 250,
1094 };
1095
1096 let json = serde_json::to_string(&result).unwrap();
1097 assert!(json.contains("\"files_compacted\":3"));
1098 assert!(json.contains("\"bytes_before\":1000000"));
1099 }
1100
1101 #[test]
1102 fn test_compaction_task_creation() {
1103 let temp_dir = TempDir::new().unwrap();
1104 let config = CompactionConfig::default();
1105 let manager = Arc::new(CompactionManager::new(temp_dir.path(), config));
1106
1107 let _task = CompactionTask::new(manager.clone(), 60);
1108 }
1110
1111 #[test]
1112 fn test_list_parquet_files_empty() {
1113 let temp_dir = TempDir::new().unwrap();
1114 let config = CompactionConfig::default();
1115 let manager = CompactionManager::new(temp_dir.path(), config);
1116
1117 let files = manager.list_parquet_files().unwrap();
1118 assert!(files.is_empty());
1119 }
1120
1121 #[test]
1122 fn test_list_parquet_files_with_non_parquet() {
1123 let temp_dir = TempDir::new().unwrap();
1124 let config = CompactionConfig::default();
1125 let manager = CompactionManager::new(temp_dir.path(), config);
1126
1127 std::fs::write(temp_dir.path().join("test.txt"), "test").unwrap();
1129 std::fs::write(temp_dir.path().join("data.json"), "{}").unwrap();
1130
1131 let files = manager.list_parquet_files().unwrap();
1132 assert!(files.is_empty()); }
1134
1135 fn ingest_and_flush_per_call(storage_dir: &std::path::Path, tenant: &str, count: usize) {
1140 for i in 0..count {
1144 let storage = ParquetStorage::with_config(
1145 storage_dir,
1146 crate::infrastructure::persistence::ParquetStorageConfig {
1147 batch_size: 1,
1148 ..Default::default()
1149 },
1150 )
1151 .unwrap();
1152 let event = crate::domain::entities::Event::from_strings(
1153 "test.event".to_string(),
1154 format!("{tenant}-{i}"),
1155 tenant.to_string(),
1156 serde_json::json!({"i": i}),
1157 None,
1158 )
1159 .unwrap();
1160 storage.append_event(event).unwrap();
1161 storage.flush().unwrap();
1162 }
1163 }
1164
1165 #[test]
1166 fn test_compact_tenant_emits_one_snapshot_and_removes_originals() {
1167 let temp_dir = TempDir::new().unwrap();
1168
1169 ingest_and_flush_per_call(temp_dir.path(), "alice", 4);
1171
1172 let config = CompactionConfig {
1173 min_files_to_compact: 2,
1174 small_file_threshold: 100 * 1024 * 1024,
1175 strategy: CompactionStrategy::SizeBased,
1176 ..Default::default()
1177 };
1178 let manager = CompactionManager::new(temp_dir.path(), config);
1179
1180 let result = manager.compact_tenant("alice").unwrap();
1181 assert_eq!(result.files_compacted, 4);
1182 assert_eq!(result.events_compacted, 4);
1183
1184 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1187 let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
1188 assert_eq!(
1189 alice_files.len(),
1190 1,
1191 "expected exactly one snapshot file for alice"
1192 );
1193
1194 let name = alice_files[0]
1195 .file_name()
1196 .and_then(|n| n.to_str())
1197 .unwrap()
1198 .to_string();
1199 assert!(
1200 name.starts_with("snapshot.alice."),
1201 "expected snapshot prefix, got {name}"
1202 );
1203 assert!(name.ends_with(".parquet"));
1204
1205 let tmps: Vec<_> = std::fs::read_dir(alice_files[0].parent().unwrap())
1207 .unwrap()
1208 .filter_map(std::result::Result::ok)
1209 .filter(|e| e.path().to_string_lossy().ends_with(".tmp"))
1210 .collect();
1211 assert!(tmps.is_empty());
1212
1213 let loaded = storage.load_events_for_tenant("alice").unwrap();
1215 assert_eq!(loaded.len(), 4);
1216 for e in &loaded {
1217 assert_eq!(e.tenant_id_str(), "alice");
1218 }
1219 }
1220
1221 #[test]
1222 fn test_compact_tenant_skips_existing_snapshot_files() {
1223 let temp_dir = TempDir::new().unwrap();
1228 ingest_and_flush_per_call(temp_dir.path(), "alice", 4);
1229
1230 let config = CompactionConfig {
1231 min_files_to_compact: 2,
1232 small_file_threshold: 100 * 1024 * 1024,
1233 ..Default::default()
1234 };
1235 let manager = CompactionManager::new(temp_dir.path(), config);
1236
1237 let r1 = manager.compact_tenant("alice").unwrap();
1238 assert_eq!(r1.files_compacted, 4);
1239
1240 let r2 = manager.compact_tenant("alice").unwrap();
1241 assert_eq!(r2.files_compacted, 0, "snapshot must not be re-compacted");
1242
1243 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1245 let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
1246 assert_eq!(alice_files.len(), 1);
1247 }
1248
1249 #[test]
1250 fn test_compact_tenant_below_threshold_is_a_noop() {
1251 let temp_dir = TempDir::new().unwrap();
1253 ingest_and_flush_per_call(temp_dir.path(), "alice", 1);
1254
1255 let manager = CompactionManager::new(temp_dir.path(), CompactionConfig::default());
1256 let result = manager.compact_tenant("alice").unwrap();
1257 assert_eq!(result.files_compacted, 0);
1258
1259 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1261 let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
1262 assert_eq!(alice_files.len(), 1);
1263 let name = alice_files[0]
1264 .file_name()
1265 .unwrap()
1266 .to_string_lossy()
1267 .into_owned();
1268 assert!(
1269 !name.starts_with("snapshot."),
1270 "raw file must not be renamed"
1271 );
1272 }
1273
1274 #[test]
1275 fn test_compact_iterates_every_tenant() {
1276 let temp_dir = TempDir::new().unwrap();
1279 ingest_and_flush_per_call(temp_dir.path(), "alice", 3);
1280 ingest_and_flush_per_call(temp_dir.path(), "bob", 3);
1281
1282 let config = CompactionConfig {
1283 min_files_to_compact: 2,
1284 small_file_threshold: 100 * 1024 * 1024,
1285 strategy: CompactionStrategy::SizeBased,
1286 ..Default::default()
1287 };
1288 let manager = CompactionManager::new(temp_dir.path(), config);
1289
1290 let result = manager.compact().unwrap();
1291 assert_eq!(result.files_compacted, 6);
1292 assert_eq!(result.events_compacted, 6);
1293
1294 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1295 for tenant in ["alice", "bob"] {
1296 let files = storage.list_parquet_files_for_tenant(tenant).unwrap();
1297 assert_eq!(files.len(), 1, "{tenant} should have one snapshot");
1298 let name = files[0].file_name().unwrap().to_string_lossy().into_owned();
1299 assert!(name.starts_with(&format!("snapshot.{tenant}.")));
1300 }
1301 }
1302
/// Checks that compaction with a 30-day TTL for `alice` drops the events that
/// are older than the TTL and keeps the rest.
///
/// 100 events are written with ages spread from 60 days down to 0 days,
/// flushing after every 10th event. After `compact_tenant("alice")` the test
/// expects some, but not all, events to survive, the reloaded event count to
/// match the reported count, and every survivor to be no older than the
/// 30-day cutoff (with 60 seconds of slack).
#[test]
fn test_retention_drops_events_older_than_ttl() {
    let root = TempDir::new().unwrap();

    let writer = ParquetStorage::new(root.path()).unwrap();
    let seeded_at = Utc::now();
    for i in 0..100 {
        // i = 0 -> 60 days old, i = 99 -> 0 days old.
        let age_days = 60 - (i * 60 / 99);
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            seeded_at - chrono::Duration::days(i64::from(age_days)),
            None,
            1,
        );
        writer.append_event(event).unwrap();
        // Flush after every 10th event so several files exist on disk.
        if (i + 1) % 10 == 0 {
            writer.flush().unwrap();
        }
    }
    writer.flush().unwrap();

    let mut retention = RetentionConfig::default();
    retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
    let manager = CompactionManager::new(
        root.path(),
        CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            retention,
            ..Default::default()
        },
    );

    let outcome = manager.compact_tenant("alice").unwrap();
    assert!(outcome.events_compacted > 0);
    assert!(
        outcome.events_compacted < 100,
        "retention should have dropped some events; kept {} of 100",
        outcome.events_compacted
    );

    // What is on disk must agree with what compaction reported.
    let reader = ParquetStorage::new(root.path()).unwrap();
    let loaded = reader.load_events_for_tenant("alice").unwrap();
    assert_eq!(loaded.len(), outcome.events_compacted);

    // The cutoff here comes from a fresh `Utc::now()`, so allow 60 seconds of
    // slack when comparing against it.
    let cutoff = Utc::now() - chrono::Duration::days(30);
    let oldest_allowed = cutoff - chrono::Duration::seconds(60);
    for survivor in &loaded {
        assert!(
            survivor.timestamp >= oldest_allowed,
            "event with ts {} survived retention but is older than cutoff {}",
            survivor.timestamp.to_rfc3339(),
            cutoff.to_rfc3339()
        );
    }
}
1377
/// Checks that a tenant without an explicit TTL keeps all of its events when
/// the default `RetentionConfig` is used.
///
/// Six events for `alice` are written with ages of 0 to 5 years (365-day
/// steps), flushing after every second event. Compaction is expected to report
/// all six events as compacted.
#[test]
fn test_retention_keeps_forever_by_default_for_non_system_tenants() {
    let root = TempDir::new().unwrap();
    let writer = ParquetStorage::new(root.path()).unwrap();
    let seeded_at = Utc::now();
    for i in 0..6 {
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            seeded_at - chrono::Duration::days(i * 365),
            None,
            1,
        );
        writer.append_event(event).unwrap();
        // Flush on odd indices so the events land in several files.
        if i % 2 != 0 {
            writer.flush().unwrap();
        }
    }
    writer.flush().unwrap();

    // The retention field is left at its default on purpose.
    let manager = CompactionManager::new(
        root.path(),
        CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            ..Default::default()
        },
    );
    let outcome = manager.compact_tenant("alice").unwrap();
    assert_eq!(outcome.events_compacted, 6, "no events should be dropped");
}
1415
/// Checks the defaults of `RetentionConfig`: the `system` tenant has a TTL of
/// 30 days, while an unlisted tenant (`acme`) has no TTL.
#[test]
fn test_retention_system_tenant_default_is_30_days() {
    let defaults = RetentionConfig::default();

    let system_ttl_secs = defaults.ttl_for("system").map(|ttl| ttl.as_secs());
    assert_eq!(system_ttl_secs, Some(30 * 24 * 3600));

    assert_eq!(defaults.ttl_for("acme"), None);
}
1427
/// Checks the case where retention removes every event of a tenant.
///
/// Six events for `alice`, all 90 days old, are written with a flush after
/// every second event. With a 7-day TTL the test expects compaction to report
/// zero events compacted and at least two files compacted, and no parquet files
/// to remain for `alice` afterwards.
#[test]
fn test_retention_drops_all_events_deletes_originals_without_snapshot() {
    let root = TempDir::new().unwrap();
    let writer = ParquetStorage::new(root.path()).unwrap();
    let very_old = Utc::now() - chrono::Duration::days(90);
    for i in 0..6 {
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            very_old,
            None,
            1,
        );
        writer.append_event(event).unwrap();
        // Flush on odd indices so the events land in several files.
        if i % 2 != 0 {
            writer.flush().unwrap();
        }
    }
    writer.flush().unwrap();

    // A 7-day TTL is far shorter than the 90-day age of every event.
    let mut retention = RetentionConfig::default();
    retention.set("alice", Some(Duration::from_secs(7 * 24 * 3600)));
    let manager = CompactionManager::new(
        root.path(),
        CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            retention,
            ..Default::default()
        },
    );

    let outcome = manager.compact_tenant("alice").unwrap();
    assert_eq!(outcome.events_compacted, 0);
    assert!(outcome.files_compacted >= 2);

    let reader = ParquetStorage::new(root.path()).unwrap();
    let remaining = reader.list_parquet_files_for_tenant("alice").unwrap();
    assert!(remaining.is_empty(), "all originals should be deleted");
}
1473
/// Simulates a crash in the middle of compaction by dropping a bogus
/// `snapshot.*.parquet.tmp` file next to a tenant's real files.
///
/// The test expects that constructing a new `ParquetStorage` removes the stale
/// `.tmp` file and that all three seeded events for `alice` can still be
/// loaded.
#[test]
fn test_compaction_with_simulated_crash_leaves_data_recoverable() {
    let root = TempDir::new().unwrap();

    ingest_and_flush_per_call(root.path(), "alice", 3);

    // Find the directory that holds alice's parquet files.
    let storage = ParquetStorage::new(root.path()).unwrap();
    let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
    let partition_dir = alice_files[0].parent().unwrap().to_path_buf();

    // Leave behind what an interrupted snapshot write would look like.
    let stale_tmp = partition_dir.join("snapshot.alice.range.parquet.tmp");
    std::fs::write(&stale_tmp, b"partial parquet bytes").unwrap();
    assert!(stale_tmp.is_file());

    // Opening the storage again is expected to clean the leftover up.
    let reopened = ParquetStorage::new(root.path()).unwrap();
    assert!(
        !stale_tmp.exists(),
        "stale tmp file should have been cleaned by ParquetStorage::new"
    );

    let recovered = reopened.load_events_for_tenant("alice").unwrap();
    assert_eq!(recovered.len(), 3);
}
1508
/// Checks that events dropped by retention are handed to the configured
/// archive target instead of being lost.
///
/// 50 events for `alice` are written with ages spread from 60 days down to
/// 0 days (flush after every 5th event). Compaction runs with a 30-day TTL and
/// a `LocalFsArchive` pointed at a separate directory. The test expects:
/// some but not all events to stay live, at least one `archive.alice.*` file
/// somewhere under the archive directory, and live + archived to add up to 50.
#[test]
fn test_cold_tier_archives_dropped_events_before_deletion() {
    use crate::infrastructure::persistence::cold_tier::LocalFsArchive;

    let live_dir = TempDir::new().unwrap();
    let archive_dir = TempDir::new().unwrap();

    let writer = ParquetStorage::new(live_dir.path()).unwrap();
    let seeded_at = Utc::now();
    for i in 0..50 {
        // i = 0 -> 60 days old, i = 49 -> 0 days old.
        let age_days = 60 - (i * 60 / 49);
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            seeded_at - chrono::Duration::days(i64::from(age_days)),
            None,
            1,
        );
        writer.append_event(event).unwrap();
        // Flush after every 5th event so several files exist on disk.
        if (i + 1) % 5 == 0 {
            writer.flush().unwrap();
        }
    }
    writer.flush().unwrap();

    let mut retention = RetentionConfig::default();
    retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
    let archive: Arc<dyn ArchiveTarget> =
        Arc::new(LocalFsArchive::new(archive_dir.path()).unwrap());
    let manager = CompactionManager::new(
        live_dir.path(),
        CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            retention,
            archive: Some(archive),
            ..Default::default()
        },
    );

    let outcome = manager.compact_tenant("alice").unwrap();
    assert!(outcome.events_compacted > 0, "some events kept");
    assert!(
        outcome.events_compacted < 50,
        "some events dropped to retention; kept {} of 50",
        outcome.events_compacted
    );

    let live_after = ParquetStorage::new(live_dir.path())
        .unwrap()
        .load_events_for_tenant("alice")
        .unwrap();
    assert_eq!(live_after.len(), outcome.events_compacted);

    // Walk the archive directory tree looking for `archive.alice.*` files.
    let mut archive_files = Vec::new();
    let mut pending = vec![archive_dir.path().to_path_buf()];
    while let Some(dir) = pending.pop() {
        for entry in std::fs::read_dir(&dir).unwrap().flatten() {
            let path = entry.path();
            if path.is_dir() {
                pending.push(path);
                continue;
            }
            let is_alice_archive = path
                .file_name()
                .is_some_and(|n| n.to_string_lossy().starts_with("archive.alice."));
            if is_alice_archive {
                archive_files.push(path);
            }
        }
    }
    assert!(
        !archive_files.is_empty(),
        "archive directory must contain at least one archive.alice.* file"
    );

    // The archive directory is read back through `ParquetStorage` as well.
    let archive_storage = ParquetStorage::new(archive_dir.path()).unwrap();
    let archived = archive_storage.load_events_for_tenant("alice").unwrap();
    assert_eq!(
        live_after.len() + archived.len(),
        50,
        "live + archived must equal original event count (live={}, archived={})",
        live_after.len(),
        archived.len()
    );
}
1608
/// Checks that a failing archive target aborts compaction without removing any
/// data.
///
/// 20 events for `alice` are written with ages from 60 down to 41 days (flush
/// after every 5th event). Compaction runs with a 30-day TTL and an archive
/// target that always errors. The test expects `compact_tenant` to return an
/// error, the number of `.parquet` files under the live directory to be
/// unchanged, and all 20 events to still be loadable.
#[test]
fn test_cold_tier_failure_keeps_originals_on_disk() {
    /// Archive target whose `archive` call always returns a storage error.
    #[derive(Debug)]
    struct FailingArchive;
    impl ArchiveTarget for FailingArchive {
        fn archive(
            &self,
            _: &str,
            _: DateTime<Utc>,
            _: DateTime<Utc>,
            _: &[crate::domain::entities::Event],
        ) -> Result<()> {
            Err(AllSourceError::StorageError(
                "simulated archive outage".to_string(),
            ))
        }
    }

    /// Recursively counts the files with a `parquet` extension beneath `dir`.
    fn count_parquet_files(dir: &std::path::Path) -> usize {
        std::fs::read_dir(dir)
            .unwrap()
            .flatten()
            .map(|entry| entry.path())
            .map(|path| {
                if path.is_dir() {
                    count_parquet_files(&path)
                } else if path.extension().is_some_and(|ext| ext == "parquet") {
                    1
                } else {
                    0
                }
            })
            .sum()
    }

    let live_dir = TempDir::new().unwrap();
    let writer = ParquetStorage::new(live_dir.path()).unwrap();
    let seeded_at = Utc::now();
    for i in 0..20 {
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            seeded_at - chrono::Duration::days(60 - i),
            None,
            1,
        );
        writer.append_event(event).unwrap();
        // Flush after every 5th event so several files exist on disk.
        if (i + 1) % 5 == 0 {
            writer.flush().unwrap();
        }
    }
    writer.flush().unwrap();

    let before = count_parquet_files(live_dir.path());
    assert!(before > 0);

    let mut retention = RetentionConfig::default();
    retention.set("alice", Some(Duration::from_secs(30 * 24 * 3600)));
    let archive: Arc<dyn ArchiveTarget> = Arc::new(FailingArchive);
    let manager = CompactionManager::new(
        live_dir.path(),
        CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            retention,
            archive: Some(archive),
            ..Default::default()
        },
    );

    let outcome = manager.compact_tenant("alice");
    assert!(outcome.is_err(), "compaction must fail when archive fails");

    // Nothing may have been deleted from the live directory.
    let after = count_parquet_files(live_dir.path());
    assert_eq!(
        before, after,
        "no files should be removed after archive failure"
    );

    let reader = ParquetStorage::new(live_dir.path()).unwrap();
    let loaded = reader.load_events_for_tenant("alice").unwrap();
    assert_eq!(
        loaded.len(),
        20,
        "all 20 events still present after failed archive"
    );
}
1702
/// Checks that the archive target is not called when retention drops nothing.
///
/// Ten events for `alice`, aged 0 to 9 hours, are written with a flush after
/// every third event. Retention is left at its default and the archive target
/// panics if it is ever invoked, so the test passes only if compaction
/// completes and reports all 10 events as compacted.
#[test]
fn test_cold_tier_not_invoked_when_no_events_dropped() {
    /// Archive target that panics as soon as `archive` is called.
    #[derive(Debug)]
    struct PanickingArchive;
    impl ArchiveTarget for PanickingArchive {
        fn archive(
            &self,
            _: &str,
            _: DateTime<Utc>,
            _: DateTime<Utc>,
            _: &[crate::domain::entities::Event],
        ) -> Result<()> {
            panic!("archive must not be called when no events are dropped");
        }
    }

    let live_dir = TempDir::new().unwrap();
    let writer = ParquetStorage::new(live_dir.path()).unwrap();
    let seeded_at = Utc::now();
    for i in 0..10 {
        let event = crate::domain::entities::Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            format!("e-{i}"),
            "alice".to_string(),
            serde_json::json!({"i": i}),
            seeded_at - chrono::Duration::hours(i),
            None,
            1,
        );
        writer.append_event(event).unwrap();
        // Flush after every 3rd event so several files exist on disk.
        if (i + 1) % 3 == 0 {
            writer.flush().unwrap();
        }
    }
    writer.flush().unwrap();

    // The retention field is left at its default on purpose.
    let archive: Arc<dyn ArchiveTarget> = Arc::new(PanickingArchive);
    let manager = CompactionManager::new(
        live_dir.path(),
        CompactionConfig {
            min_files_to_compact: 2,
            small_file_threshold: 100 * 1024 * 1024,
            strategy: CompactionStrategy::SizeBased,
            archive: Some(archive),
            ..Default::default()
        },
    );
    let outcome = manager.compact_tenant("alice").unwrap();
    assert_eq!(outcome.events_compacted, 10);
}
1756
/// Checks that `discover_tenants` lists the ordinary tenant directories only.
///
/// Four directories are created under the storage root: `alice`, `bob`,
/// `__system` and `.hidden`. The test expects exactly `alice` and `bob` to be
/// returned, in that order.
#[test]
fn test_discover_tenants_skips_system_and_hidden() {
    let root = TempDir::new().unwrap();
    for dir_name in ["alice", "bob", "__system", ".hidden"] {
        std::fs::create_dir_all(root.path().join(dir_name)).unwrap();
    }

    let manager = CompactionManager::new(root.path(), CompactionConfig::default());
    let discovered = manager.discover_tenants().unwrap();

    let expected = vec!["alice".to_string(), "bob".to_string()];
    assert_eq!(discovered, expected);
}
1771}