1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4};
5use arrow::{
6 array::{
7 Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
8 UInt64Builder,
9 },
10 datatypes::{DataType, Field, Schema, TimeUnit},
11 record_batch::RecordBatch,
12};
13use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
14use std::{
15 collections::HashMap,
16 fs::{self, File},
17 path::{Path, PathBuf},
18 sync::{
19 Arc, Mutex,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::{Duration, Instant},
23};
24
/// Default number of events buffered per tenant before a size-based flush.
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default maximum age (milliseconds) of pending events before a timeout flush.
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;
/// Tunable batching and encoding knobs for `ParquetStorage`.
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Events buffered per tenant before a size-based flush is triggered.
    pub batch_size: usize,
    /// Maximum age of pending events before `check_timeout_flush` flushes them.
    pub flush_timeout: Duration,
    /// Parquet compression codec applied to every written file.
    pub compression: parquet::basic::Compression,
}
41
42impl Default for ParquetStorageConfig {
43 fn default() -> Self {
44 Self {
45 batch_size: DEFAULT_BATCH_SIZE,
46 flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
47 compression: parquet::basic::Compression::SNAPPY,
48 }
49 }
50}
51
52impl ParquetStorageConfig {
53 pub fn high_throughput() -> Self {
55 Self {
56 batch_size: 50_000,
57 flush_timeout: Duration::from_secs(10),
58 compression: parquet::basic::Compression::SNAPPY,
59 }
60 }
61
62 pub fn low_latency() -> Self {
64 Self {
65 batch_size: 1_000,
66 flush_timeout: Duration::from_secs(1),
67 compression: parquet::basic::Compression::SNAPPY,
68 }
69 }
70}
71
/// Cumulative write-side counters, as reported by `ParquetStorage::batch_stats`.
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Number of parquet files flushed so far.
    pub batches_written: u64,
    /// Total events written across all flushes.
    pub events_written: u64,
    /// Total uncompressed row-group bytes reported by the parquet writer.
    pub bytes_written: u64,
    /// Mean events per flushed batch (0.0 when nothing written yet).
    pub avg_batch_size: f64,
    /// Events per second of cumulative write time (0.0 when nothing written yet).
    pub events_per_sec: f64,
    /// Total wall-clock time spent inside flushes, in nanoseconds.
    pub total_write_time_ns: u64,
    /// Flushes triggered by the flush timeout elapsing.
    pub timeout_flushes: u64,
    /// Flushes triggered by a tenant buffer reaching `batch_size`.
    pub size_flushes: u64,
}
92
/// Outcome of a single `ParquetStorage::batch_write` call.
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Events accepted into buffers (not necessarily all flushed yet).
    pub events_written: usize,
    /// Tenant batches flushed to disk during this call.
    pub batches_flushed: usize,
    /// Wall-clock duration of the call.
    pub duration: Duration,
    /// `events_written / duration` throughput figure.
    pub events_per_sec: f64,
}
105
/// Batched, multi-tenant parquet event store.
///
/// Events are buffered in memory per tenant and flushed to
/// `{storage_dir}/{tenant}/{YYYY-MM}/events-*.parquet` when a tenant buffer
/// reaches `config.batch_size` or when the flush timeout elapses.
pub struct ParquetStorage {
    /// Root directory containing the per-tenant partition tree.
    storage_dir: PathBuf,

    /// Pending (not yet written) events, keyed by tenant id.
    current_batches: Mutex<HashMap<String, Vec<Event>>>,

    /// Batching / timeout / compression settings.
    config: ParquetStorageConfig,

    /// Arrow schema shared by every file this storage writes.
    schema: Arc<Schema>,

    /// Instant of the most recent flush; read by `check_timeout_flush`.
    last_flush_time: Mutex<Instant>,

    // Cumulative counters (Relaxed ordering; read only for stats reporting,
    // so approximate values under concurrency are acceptable).
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}
145
146impl ParquetStorage {
147 pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
149 Self::with_config(storage_dir, ParquetStorageConfig::default())
150 }
151
    /// Creates storage rooted at `storage_dir` with explicit `config`.
    ///
    /// Creates the directory if missing, builds the fixed 7-column Arrow
    /// schema used for every file, and runs crash-recovery cleanup once on
    /// boot (orphan `.tmp` removal / 0-byte quarantine). Cleanup failures are
    /// logged but do not prevent construction.
    ///
    /// # Errors
    /// Fails only when the storage directory cannot be created.
    pub fn with_config(
        storage_dir: impl AsRef<Path>,
        config: ParquetStorageConfig,
    ) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
        })?;

        // Column order here must match events_to_record_batch /
        // record_batch_to_events, which address columns by index.
        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new("metadata", DataType::Utf8, true),
            Field::new("version", DataType::UInt64, false),
        ]));

        let storage = Self {
            storage_dir,
            current_batches: Mutex::new(HashMap::new()),
            config,
            schema,
            last_flush_time: Mutex::new(Instant::now()),
            batches_written: AtomicU64::new(0),
            events_written: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            total_write_time_ns: AtomicU64::new(0),
            timeout_flushes: AtomicU64::new(0),
            size_flushes: AtomicU64::new(0),
        };

        // Best-effort crash recovery: never block startup on it.
        match storage.cleanup_partial_writes() {
            Ok(0) => {}
            Ok(n) => tracing::warn!(
                "cleanup_partial_writes acted on {n} crash-detritus file(s) on boot — \
                see preceding logs for per-file detail"
            ),
            Err(e) => tracing::error!("cleanup_partial_writes failed on boot: {e}"),
        }

        Ok(storage)
    }
208
209 #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
211 pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
212 Self::with_config(
213 storage_dir,
214 ParquetStorageConfig {
215 batch_size: 1000,
216 ..Default::default()
217 },
218 )
219 }
220
221 #[cfg_attr(feature = "hotpath", hotpath::measure)]
231 pub fn append_event(&self, event: Event) -> Result<()> {
232 let tenant = event.tenant_id_str().to_string();
233 let should_flush_tenant = {
234 let mut batches = self.current_batches.lock().unwrap();
235 let entry = batches.entry(tenant.clone()).or_default();
236 entry.push(event);
237 entry.len() >= self.config.batch_size
238 };
239
240 if should_flush_tenant {
241 self.size_flushes.fetch_add(1, Ordering::Relaxed);
242 self.flush_tenant(&tenant)?;
243 }
244
245 Ok(())
246 }
247
248 #[cfg_attr(feature = "hotpath", hotpath::measure)]
254 pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
255 let start = Instant::now();
256 let event_count = events.len();
257
258 let mut grouped: HashMap<String, Vec<Event>> = HashMap::new();
261 for event in events {
262 grouped
263 .entry(event.tenant_id_str().to_string())
264 .or_default()
265 .push(event);
266 }
267
268 let mut tenants_to_flush: Vec<String> = Vec::new();
269 {
270 let mut batches = self.current_batches.lock().unwrap();
271 for (tenant, mut new_events) in grouped {
272 let entry = batches.entry(tenant.clone()).or_default();
273 entry.append(&mut new_events);
274 if entry.len() >= self.config.batch_size {
275 tenants_to_flush.push(tenant);
276 }
277 }
278 }
279
280 let mut batches_flushed = 0;
281 for tenant in tenants_to_flush {
282 self.size_flushes.fetch_add(1, Ordering::Relaxed);
283 self.flush_tenant(&tenant)?;
284 batches_flushed += 1;
285 }
286
287 let duration = start.elapsed();
288
289 Ok(BatchWriteResult {
290 events_written: event_count,
291 batches_flushed,
292 duration,
293 events_per_sec: event_count as f64 / duration.as_secs_f64(),
294 })
295 }
296
297 #[cfg_attr(feature = "hotpath", hotpath::measure)]
305 pub fn check_timeout_flush(&self) -> Result<bool> {
306 let should_flush = {
307 let last_flush = self.last_flush_time.lock().unwrap();
308 let batches = self.current_batches.lock().unwrap();
309 let any_pending = batches.values().any(|v| !v.is_empty());
310 any_pending && last_flush.elapsed() >= self.config.flush_timeout
311 };
312
313 if should_flush {
314 self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
315 self.flush()?;
316 Ok(true)
317 } else {
318 Ok(false)
319 }
320 }
321
322 #[cfg_attr(feature = "hotpath", hotpath::measure)]
329 pub fn flush(&self) -> Result<()> {
330 let tenants: Vec<String> = {
331 let batches = self.current_batches.lock().unwrap();
332 batches
333 .iter()
334 .filter(|(_, v)| !v.is_empty())
335 .map(|(k, _)| k.clone())
336 .collect()
337 };
338 if tenants.is_empty() {
339 return Ok(());
340 }
341 for tenant in tenants {
342 self.flush_tenant(&tenant)?;
343 }
344 Ok(())
345 }
346
347 fn flush_tenant(&self, tenant_id: &str) -> Result<()> {
362 let events_to_write = {
363 let mut batches = self.current_batches.lock().unwrap();
364 match batches.get_mut(tenant_id) {
365 Some(v) if !v.is_empty() => std::mem::take(v),
366 _ => return Ok(()),
367 }
368 };
369
370 let batch_count = events_to_write.len();
371 let start = Instant::now();
372
373 let record_batch = self.events_to_record_batch(&events_to_write)?;
374
375 let now = chrono::Utc::now();
376 let partition_dir = partition_path_for_tenant(&self.storage_dir, tenant_id, now)?;
377 fs::create_dir_all(&partition_dir).map_err(|e| {
378 AllSourceError::StorageError(format!(
379 "Failed to create tenant partition {}: {e}",
380 partition_dir.display()
381 ))
382 })?;
383 let file_stem = format!(
384 "events-{}-{}",
385 now.format("%Y%m%d-%H%M%S%3f"),
386 uuid::Uuid::new_v4().as_simple()
387 );
388
389 tracing::info!(
390 "Flushing {} events for tenant={} to {}/{}.parquet",
391 batch_count,
392 tenant_id,
393 partition_dir.display(),
394 file_stem
395 );
396
397 let (file_path, file_metadata) =
398 self.write_record_batch_atomic(&partition_dir, &file_stem, &record_batch)?;
399
400 let duration = start.elapsed();
401
402 self.batches_written.fetch_add(1, Ordering::Relaxed);
403 self.events_written
404 .fetch_add(batch_count as u64, Ordering::Relaxed);
405 if let Some(size) = file_metadata
406 .row_groups()
407 .first()
408 .map(parquet::file::metadata::RowGroupMetaData::total_byte_size)
409 {
410 self.bytes_written.fetch_add(size as u64, Ordering::Relaxed);
411 }
412 self.total_write_time_ns
413 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
414
415 {
416 let mut last_flush = self.last_flush_time.lock().unwrap();
417 *last_flush = Instant::now();
418 }
419
420 tracing::info!(
421 "Wrote {} events for tenant={} to {} in {:?}",
422 batch_count,
423 tenant_id,
424 file_path.display(),
425 duration
426 );
427
428 Ok(())
429 }
430
    /// Writes `record_batch` to `{partition_dir}/{file_stem}.parquet` with a
    /// crash-safe protocol: write to a `.parquet.tmp` sibling, fsync it,
    /// rename into place, then fsync the directory. A crash at any point
    /// leaves either a `.tmp` file (removed by `cleanup_partial_writes`) or
    /// the complete final file — never a truncated `.parquet`.
    ///
    /// Returns the final path and the parquet file metadata from the writer.
    fn write_record_batch_atomic(
        &self,
        partition_dir: &Path,
        file_stem: &str,
        record_batch: &RecordBatch,
    ) -> Result<(PathBuf, parquet::file::metadata::ParquetMetaData)> {
        let final_path = partition_dir.join(format!("{file_stem}.parquet"));
        let tmp_path = partition_dir.join(format!("{file_stem}.parquet.tmp"));

        // Step 1: encode the full file into the tmp path. The block scope
        // drops the writer (and its file handle) before the fsync reopen.
        let metadata = {
            let file = File::create(&tmp_path).map_err(|e| {
                AllSourceError::StorageError(format!(
                    "Failed to create parquet tmp file {}: {e}",
                    tmp_path.display()
                ))
            })?;

            let props = WriterProperties::builder()
                .set_compression(self.config.compression)
                .build();

            let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
            writer.write(record_batch)?;
            writer.close()?
        };

        // Step 2: fsync the tmp file so its bytes are durable before the
        // rename makes it visible under the final name.
        let tmp_file = File::open(&tmp_path).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to reopen parquet tmp for fsync {}: {e}",
                tmp_path.display()
            ))
        })?;
        tmp_file.sync_all().map_err(|e| {
            AllSourceError::StorageError(format!("fsync on parquet tmp failed: {e}"))
        })?;
        drop(tmp_file);

        // Step 3: atomic rename — readers only ever see a complete file.
        fs::rename(&tmp_path, &final_path).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to rename {} → {}: {e}",
                tmp_path.display(),
                final_path.display()
            ))
        })?;

        // Step 4: best-effort directory fsync to persist the rename itself;
        // failure here is tolerated (the data file is already durable).
        if let Ok(dir) = File::open(partition_dir) {
            let _ = dir.sync_all();
        }

        Ok((final_path, metadata))
    }
506
    /// Writes `events` as one parquet file named `{file_stem}.parquet` under
    /// the tenant partition derived from the *earliest* event timestamp,
    /// using the crash-safe tmp + fsync + rename protocol.
    ///
    /// # Errors
    /// Fails when `events` is empty, the tenant id is unsafe as a path
    /// component, or any encoding/filesystem step fails.
    pub fn write_atomic_parquet(
        &self,
        tenant_id: &str,
        file_stem: &str,
        events: &[Event],
    ) -> Result<PathBuf> {
        if events.is_empty() {
            return Err(AllSourceError::StorageError(
                "write_atomic_parquet called with empty event slice".to_string(),
            ));
        }
        // Anchor the {YYYY-MM} partition on the earliest event so the file
        // lands in the month it describes. The now() fallback is unreachable
        // for a non-empty slice (guarded above).
        let anchor_ts = events
            .iter()
            .map(|e| e.timestamp)
            .min()
            .unwrap_or_else(chrono::Utc::now);
        let partition_dir = partition_path_for_tenant(&self.storage_dir, tenant_id, anchor_ts)?;
        fs::create_dir_all(&partition_dir).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to create tenant partition {}: {e}",
                partition_dir.display()
            ))
        })?;

        let record_batch = self.events_to_record_batch(events)?;
        let (final_path, _meta) =
            self.write_record_batch_atomic(&partition_dir, file_stem, &record_batch)?;

        tracing::info!(
            tenant_id = tenant_id,
            file = %final_path.display(),
            event_count = events.len(),
            "wrote atomic snapshot file"
        );

        Ok(final_path)
    }
571
    /// Walks the whole storage tree once (called on boot) and handles crash
    /// detritus: orphan `*.parquet.tmp` files are deleted, 0-byte `*.parquet`
    /// files are renamed to `*.parquet.corrupt-{ts}` for operator inspection.
    ///
    /// Returns the number of files acted on. Per-file failures are logged
    /// and skipped; unreadable directories are silently skipped.
    pub fn cleanup_partial_writes(&self) -> Result<usize> {
        let mut acted = 0usize;
        // Iterative DFS — avoids recursion over arbitrarily deep trees.
        let mut stack: Vec<PathBuf> = vec![self.storage_dir.clone()];
        while let Some(dir) = stack.pop() {
            let Ok(entries) = fs::read_dir(&dir) else {
                continue;
            };
            for entry in entries.flatten() {
                let path = entry.path();
                let Ok(ft) = entry.file_type() else { continue };
                if ft.is_dir() {
                    stack.push(path);
                    continue;
                }
                if !ft.is_file() {
                    continue;
                }
                let path_str = path.to_string_lossy();
                if path_str.ends_with(".parquet.tmp") {
                    // A tmp file means a writer died before its rename; the
                    // data was never visible, so deleting is safe.
                    match fs::remove_file(&path) {
                        Ok(()) => {
                            tracing::warn!(
                                file = %path.display(),
                                "cleaned up orphan snapshot tmp file (crash recovery)"
                            );
                            acted += 1;
                        }
                        Err(e) => {
                            tracing::error!(
                                file = %path.display(),
                                "failed to remove orphan snapshot tmp file: {e}"
                            );
                        }
                    }
                } else if path_str.ends_with(".parquet")
                    && fs::metadata(&path).is_ok_and(|m| m.len() == 0)
                {
                    // 0-byte final files predate the atomic-write fix;
                    // quarantine (don't delete) so an operator can inspect.
                    let ts = chrono::Utc::now().timestamp();
                    let quarantine_path = path.with_extension(format!("parquet.corrupt-{ts}"));
                    match fs::rename(&path, &quarantine_path) {
                        Ok(()) => {
                            tracing::error!(
                                from = %path.display(),
                                to = %quarantine_path.display(),
                                "quarantined 0-byte parquet file (issue #166 pre-fix crash). \
                                Operator: inspect and rm if not needed."
                            );
                            acted += 1;
                        }
                        Err(e) => {
                            tracing::error!(
                                file = %path.display(),
                                "failed to quarantine 0-byte parquet file: {e}"
                            );
                        }
                    }
                }
            }
        }
        Ok(acted)
    }
658
659 #[cfg_attr(feature = "hotpath", hotpath::measure)]
665 pub fn flush_on_shutdown(&self) -> Result<usize> {
666 let total_pending: usize = {
667 let batches = self.current_batches.lock().unwrap();
668 batches.values().map(Vec::len).sum()
669 };
670
671 if total_pending > 0 {
672 tracing::info!(
673 "Shutdown: flushing {} pending events across all tenants",
674 total_pending
675 );
676 self.flush()?;
677 }
678
679 Ok(total_pending)
680 }
681
682 pub fn batch_stats(&self) -> BatchWriteStats {
684 let batches = self.batches_written.load(Ordering::Relaxed);
685 let events = self.events_written.load(Ordering::Relaxed);
686 let bytes = self.bytes_written.load(Ordering::Relaxed);
687 let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);
688
689 let time_secs = time_ns as f64 / 1_000_000_000.0;
690
691 BatchWriteStats {
692 batches_written: batches,
693 events_written: events,
694 bytes_written: bytes,
695 avg_batch_size: if batches > 0 {
696 events as f64 / batches as f64
697 } else {
698 0.0
699 },
700 events_per_sec: if time_secs > 0.0 {
701 events as f64 / time_secs
702 } else {
703 0.0
704 },
705 total_write_time_ns: time_ns,
706 timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
707 size_flushes: self.size_flushes.load(Ordering::Relaxed),
708 }
709 }
710
711 pub fn pending_count(&self) -> usize {
713 self.current_batches
714 .lock()
715 .unwrap()
716 .values()
717 .map(Vec::len)
718 .sum()
719 }
720
    /// Configured per-tenant batch size that triggers a size-based flush.
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }
725
    /// Configured idle interval that triggers a timeout flush.
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }
730
    /// Encodes a slice of events into one Arrow `RecordBatch` matching
    /// `self.schema`: UTF-8 id/type/entity columns, JSON-encoded payload and
    /// (nullable) metadata, microsecond timestamp, and u64 version.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            // Payload/metadata are stored as JSON text and round-tripped by
            // record_batch_to_events.
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            let timestamp_micros = event.timestamp.timestamp_micros();
            timestamp_builder.append_value(timestamp_micros);

            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            // NOTE(review): version is i64 on Event; a negative value would
            // wrap in this cast — presumably versions are non-negative.
            // TODO confirm.
            version_builder.append_value(event.version as u64);
        }

        // Array order must match the column order declared in with_config.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;

        Ok(record_batch)
    }
775
776 #[cfg_attr(feature = "hotpath", hotpath::measure)]
783 pub fn load_all_events(&self) -> Result<Vec<Event>> {
784 let parquet_files = find_parquet_files_recursive(&self.storage_dir)?;
785
786 let mut all_events = Vec::with_capacity(parquet_files.len() * self.config.batch_size);
787 let mut skipped = 0usize;
788 for file_path in parquet_files {
789 tracing::info!("Loading events from {}", file_path.display());
790 let tenant_id = tenant_id_from_path(&self.storage_dir, &file_path);
791 match self.load_events_from_file(&file_path, &tenant_id) {
796 Ok(file_events) => all_events.extend(file_events),
797 Err(e) => {
798 tracing::error!(
799 file = %file_path.display(),
800 error = %e,
801 "Skipping unreadable parquet file — other files will still load. \
802 Likely a 0-byte or truncated file from an unclean shutdown; \
803 inspect and remove manually after confirming."
804 );
805 skipped += 1;
806 }
807 }
808 }
809
810 if skipped > 0 {
811 tracing::warn!(
812 "Loaded {} events from storage; skipped {} unreadable file(s)",
813 all_events.len(),
814 skipped
815 );
816 } else {
817 tracing::info!("Loaded {} total events from storage", all_events.len());
818 }
819
820 Ok(all_events)
821 }
822
823 #[cfg_attr(feature = "hotpath", hotpath::measure)]
829 fn load_events_from_file(&self, file_path: &Path, tenant_id: &str) -> Result<Vec<Event>> {
830 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
831
832 let file = File::open(file_path).map_err(|e| {
833 AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
834 })?;
835
836 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
837 let mut reader = builder.build()?;
838
839 let mut events = Vec::new();
840
841 while let Some(Ok(batch)) = reader.next() {
842 let batch_events = self.record_batch_to_events(&batch, tenant_id)?;
843 events.extend(batch_events);
844 }
845
846 Ok(events)
847 }
848
    /// Public wrapper over the private per-file loader, for callers that
    /// already know which tenant a file belongs to.
    pub fn load_events_from_file_path(
        &self,
        file_path: &Path,
        tenant_id: &str,
    ) -> Result<Vec<Event>> {
        self.load_events_from_file(file_path, tenant_id)
    }
861
    /// Decodes one Arrow `RecordBatch` (written by `events_to_record_batch`)
    /// back into events, tagging each with `tenant_id`.
    ///
    /// Columns are addressed by index, so this must stay in sync with the
    /// schema declared in `with_config`.
    ///
    /// # Errors
    /// Fails on a column type mismatch, an unparseable UUID/timestamp, or
    /// invalid JSON in the payload/metadata columns.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn record_batch_to_events(&self, batch: &RecordBatch, tenant_id: &str) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i))
                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;

            // metadata is the only nullable column.
            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                tenant_id.to_string(),
                serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }
940
    /// Lists every parquet file under the storage root (all tenants), sorted.
    pub fn list_parquet_files(&self) -> Result<Vec<PathBuf>> {
        find_parquet_files_recursive(&self.storage_dir)
    }
949
950 pub fn list_parquet_files_for_tenant(&self, tenant_id: &str) -> Result<Vec<PathBuf>> {
963 let safe = sanitize_tenant_id_for_path(tenant_id)?;
964 let tenant_root = self.storage_dir.join(safe);
965 if !tenant_root.is_dir() {
966 return Ok(Vec::new());
967 }
968 find_parquet_files_recursive(&tenant_root)
969 }
970
971 #[cfg_attr(feature = "hotpath", hotpath::measure)]
989 pub fn load_events_for_tenant(&self, tenant_id: &str) -> Result<Vec<Event>> {
990 let parquet_files = self.list_parquet_files_for_tenant(tenant_id)?;
991 tracing::info!(
992 tenant_id = tenant_id,
993 file_count = parquet_files.len(),
994 "load_events_for_tenant: walking tenant subtree only"
995 );
996
997 let mut events = Vec::with_capacity(parquet_files.len() * self.config.batch_size);
998 let mut skipped = 0usize;
999 for file_path in parquet_files {
1000 tracing::debug!(
1001 tenant_id = tenant_id,
1002 file = %file_path.display(),
1003 "load_events_for_tenant: opening file"
1004 );
1005 match self.load_events_from_file(&file_path, tenant_id) {
1010 Ok(file_events) => events.extend(file_events),
1011 Err(e) => {
1012 tracing::error!(
1013 tenant_id = tenant_id,
1014 file = %file_path.display(),
1015 error = %e,
1016 "Skipping unreadable parquet file in tenant subtree"
1017 );
1018 skipped += 1;
1019 }
1020 }
1021 }
1022
1023 tracing::info!(
1024 tenant_id = tenant_id,
1025 event_count = events.len(),
1026 skipped_files = skipped,
1027 "load_events_for_tenant: complete"
1028 );
1029 Ok(events)
1030 }
1031
    /// Root directory this storage reads from and writes under.
    pub fn storage_dir(&self) -> &Path {
        &self.storage_dir
    }
1036
1037 pub fn migrate_flat_layout(&self, dry_run: bool) -> Result<MigrationReport> {
1060 let flat_files = list_flat_layout_files(&self.storage_dir)?;
1061 let mut report = MigrationReport {
1062 dry_run,
1063 ..Default::default()
1064 };
1065
1066 for flat_file in flat_files {
1067 let events = self.load_events_from_file(&flat_file, "default")?;
1071 report.flat_files_seen += 1;
1072
1073 if events.is_empty() {
1074 if !dry_run {
1076 fs::remove_file(&flat_file).map_err(|e| {
1077 AllSourceError::StorageError(format!(
1078 "Failed to remove empty flat file {}: {e}",
1079 flat_file.display()
1080 ))
1081 })?;
1082 }
1083 report.flat_files_removed += 1;
1084 continue;
1085 }
1086
1087 let mut groups: HashMap<(String, String), Vec<Event>> = HashMap::new();
1093 for event in events {
1094 let key = (
1095 event.tenant_id_str().to_string(),
1096 event.timestamp().format("%Y-%m").to_string(),
1097 );
1098 groups.entry(key).or_default().push(event);
1099 }
1100
1101 for ((tenant, yyyy_mm), group_events) in groups {
1102 let count = group_events.len();
1103 if !dry_run {
1104 let safe_tenant = sanitize_tenant_id_for_path(&tenant)?;
1105 let target_dir = self.storage_dir.join(safe_tenant).join(&yyyy_mm);
1106 fs::create_dir_all(&target_dir).map_err(|e| {
1107 AllSourceError::StorageError(format!(
1108 "Failed to create partition {}: {e}",
1109 target_dir.display()
1110 ))
1111 })?;
1112 let filename = format!(
1113 "events-{}-{}.parquet",
1114 chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
1115 uuid::Uuid::new_v4().as_simple()
1116 );
1117 let target_path = target_dir.join(&filename);
1118 let record_batch = self.events_to_record_batch(&group_events)?;
1119 let file = File::create(&target_path).map_err(|e| {
1120 AllSourceError::StorageError(format!(
1121 "Failed to create migration target {}: {e}",
1122 target_path.display()
1123 ))
1124 })?;
1125 let props = WriterProperties::builder()
1126 .set_compression(self.config.compression)
1127 .build();
1128 let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
1129 writer.write(&record_batch)?;
1130 writer.close()?;
1131 report.partitions_written += 1;
1132 }
1133 report.events_migrated += count;
1134 }
1135
1136 if !dry_run {
1137 fs::remove_file(&flat_file).map_err(|e| {
1138 AllSourceError::StorageError(format!(
1139 "Failed to remove flat file {} after migration: {e}",
1140 flat_file.display()
1141 ))
1142 })?;
1143 report.flat_files_removed += 1;
1144 }
1145 }
1146
1147 Ok(report)
1148 }
1149
1150 pub fn stats(&self) -> Result<StorageStats> {
1152 let parquet_files = find_parquet_files_recursive(&self.storage_dir)?;
1153 let mut total_size_bytes = 0u64;
1154 for path in &parquet_files {
1155 if let Ok(metadata) = fs::metadata(path) {
1156 total_size_bytes += metadata.len();
1157 }
1158 }
1159
1160 let current_batch_size: usize = self
1161 .current_batches
1162 .lock()
1163 .unwrap()
1164 .values()
1165 .map(Vec::len)
1166 .sum();
1167
1168 Ok(StorageStats {
1169 total_files: parquet_files.len(),
1170 total_size_bytes,
1171 storage_dir: self.storage_dir.clone(),
1172 current_batch_size,
1173 })
1174 }
1175}
1176
1177fn sanitize_tenant_id_for_path(tenant_id: &str) -> Result<&str> {
1190 if tenant_id.is_empty() {
1191 return Err(AllSourceError::StorageError(
1192 "tenant_id is empty (cannot derive partition path)".to_string(),
1193 ));
1194 }
1195 if tenant_id.len() > 128 {
1196 return Err(AllSourceError::StorageError(format!(
1197 "tenant_id is too long for partition path: {} bytes (max 128)",
1198 tenant_id.len()
1199 )));
1200 }
1201 if tenant_id == "." || tenant_id == ".." {
1202 return Err(AllSourceError::StorageError(format!(
1203 "tenant_id {tenant_id:?} is reserved"
1204 )));
1205 }
1206 for c in tenant_id.chars() {
1207 let ok = c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.';
1208 if !ok {
1209 return Err(AllSourceError::StorageError(format!(
1210 "tenant_id {tenant_id:?} contains disallowed character {c:?} for partition path"
1211 )));
1212 }
1213 }
1214 Ok(tenant_id)
1215}
1216
1217fn partition_path_for_tenant(
1222 root: &Path,
1223 tenant_id: &str,
1224 when: chrono::DateTime<chrono::Utc>,
1225) -> Result<PathBuf> {
1226 let safe = sanitize_tenant_id_for_path(tenant_id)?;
1227 Ok(root.join(safe).join(when.format("%Y-%m").to_string()))
1228}
1229
/// Derives the tenant id from a file's position in the partition layout.
///
/// Partitioned files live at `{root}/{tenant}/{YYYY-MM}/file.parquet`, so the
/// tenant is the first component of the path relative to `root` — but only
/// when at least one more component follows it. Flat-layout files directly
/// under `root`, and paths outside `root`, map to the legacy `"default"`.
fn tenant_id_from_path(root: &Path, file_path: &Path) -> String {
    let rel = match file_path.strip_prefix(root) {
        Ok(rel) => rel,
        Err(_) => return "default".to_string(),
    };
    let mut components = rel.components();
    if let (Some(std::path::Component::Normal(tenant)), Some(_)) =
        (components.next(), components.next())
    {
        tenant.to_string_lossy().into_owned()
    } else {
        "default".to_string()
    }
}
1255
1256fn list_flat_layout_files(root: &Path) -> Result<Vec<PathBuf>> {
1262 let entries = fs::read_dir(root).map_err(|e| {
1263 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
1264 })?;
1265 let mut out: Vec<PathBuf> = entries
1266 .filter_map(std::result::Result::ok)
1267 .filter_map(|entry| {
1268 let ft = entry.file_type().ok()?;
1269 if !ft.is_file() {
1270 return None;
1271 }
1272 let path = entry.path();
1273 if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
1274 Some(path)
1275 } else {
1276 None
1277 }
1278 })
1279 .collect();
1280 out.sort();
1281 Ok(out)
1282}
1283
/// Recursively collects every `*.parquet` file under `root`, sorted.
///
/// Uses an explicit stack instead of recursion. Only a failure to read
/// `root` itself is an error; unreadable subdirectories and entries are
/// skipped so one bad directory cannot hide the rest of the tree.
fn find_parquet_files_recursive(root: &Path) -> Result<Vec<PathBuf>> {
    let mut out = Vec::new();
    let mut stack: Vec<PathBuf> = vec![root.to_path_buf()];

    while let Some(dir) = stack.pop() {
        let entries = match fs::read_dir(&dir) {
            Ok(e) => e,
            // The root must be readable — surface that loudly.
            Err(e) if dir == root => {
                return Err(AllSourceError::StorageError(format!(
                    "Failed to read storage directory: {e}"
                )));
            }
            // Unreadable subdirectories are tolerated.
            Err(_) => continue,
        };

        for entry in entries.flatten() {
            let path = entry.path();
            let Ok(ft) = entry.file_type() else {
                continue;
            };
            if ft.is_dir() {
                stack.push(path);
            } else if ft.is_file()
                && path
                    .extension()
                    .and_then(|ext| ext.to_str())
                    .is_some_and(|ext| ext == "parquet")
            {
                out.push(path);
            }
        }
    }

    // Deterministic ordering for callers that replay or diff file lists.
    out.sort();
    Ok(out)
}
1336
1337impl Drop for ParquetStorage {
1338 fn drop(&mut self) {
1339 if let Err(e) = self.flush_on_shutdown() {
1341 tracing::error!("Failed to flush events on drop: {}", e);
1342 }
1343 }
1344}
1345
/// Summary counters produced by `ParquetStorage::migrate_flat_layout`.
#[derive(Debug, Default, Clone, serde::Serialize)]
pub struct MigrationReport {
    /// Whether this run was a dry run (nothing written or deleted).
    pub dry_run: bool,
    /// Flat-layout files found directly under the storage root.
    pub flat_files_seen: usize,
    /// Flat files removed (or, for empty files in dry-run mode, removable).
    pub flat_files_removed: usize,
    /// Partitioned parquet files written during migration.
    pub partitions_written: usize,
    /// Events moved out of flat files into partitioned files.
    pub events_migrated: usize,
}
1360
/// Point-in-time storage snapshot returned by `ParquetStorage::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    /// Number of parquet files on disk (all tenants).
    pub total_files: usize,
    /// Combined on-disk size of those files, in bytes.
    pub total_size_bytes: u64,
    /// Root directory of the store.
    pub storage_dir: PathBuf,
    /// Events currently buffered in memory awaiting flush.
    pub current_batch_size: usize,
}
1368
1369#[cfg(test)]
1370mod tests {
1371 use super::*;
1372 use serde_json::json;
1373 use std::sync::Arc;
1374 use tempfile::TempDir;
1375
1376 fn create_test_event(entity_id: &str) -> Event {
1377 Event::reconstruct_from_strings(
1378 uuid::Uuid::new_v4(),
1379 "test.event".to_string(),
1380 entity_id.to_string(),
1381 "default".to_string(),
1382 json!({
1383 "test": "data",
1384 "value": 42
1385 }),
1386 chrono::Utc::now(),
1387 None,
1388 1,
1389 )
1390 }
1391
1392 #[test]
1393 fn test_parquet_storage_write_read() {
1394 let temp_dir = TempDir::new().unwrap();
1395 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1396
1397 for i in 0..10 {
1399 let event = create_test_event(&format!("entity-{i}"));
1400 storage.append_event(event).unwrap();
1401 }
1402
1403 storage.flush().unwrap();
1405
1406 let loaded_events = storage.load_all_events().unwrap();
1408 assert_eq!(loaded_events.len(), 10);
1409 }
1410
1411 #[test]
1412 fn test_storage_stats() {
1413 let temp_dir = TempDir::new().unwrap();
1414 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1415
1416 for i in 0..5 {
1418 storage
1419 .append_event(create_test_event(&format!("entity-{i}")))
1420 .unwrap();
1421 }
1422 storage.flush().unwrap();
1423
1424 let stats = storage.stats().unwrap();
1425 assert_eq!(stats.total_files, 1);
1426 assert!(stats.total_size_bytes > 0);
1427 }
1428
1429 #[test]
1430 fn test_default_batch_size() {
1431 let temp_dir = TempDir::new().unwrap();
1432 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1433
1434 assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
1436 assert_eq!(storage.batch_size(), 10_000);
1437 }
1438
1439 #[test]
1440 fn test_custom_config() {
1441 let temp_dir = TempDir::new().unwrap();
1442 let config = ParquetStorageConfig {
1443 batch_size: 5_000,
1444 flush_timeout: Duration::from_secs(2),
1445 compression: parquet::basic::Compression::SNAPPY,
1446 };
1447 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1448
1449 assert_eq!(storage.batch_size(), 5_000);
1450 assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
1451 }
1452
1453 #[test]
1454 fn test_batch_write() {
1455 let temp_dir = TempDir::new().unwrap();
1456 let config = ParquetStorageConfig {
1457 batch_size: 100, ..Default::default()
1459 };
1460 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1461
1462 let events: Vec<Event> = (0..250)
1470 .map(|i| create_test_event(&format!("entity-{i}")))
1471 .collect();
1472
1473 let result = storage.batch_write(events).unwrap();
1474 assert_eq!(result.events_written, 250);
1475 assert_eq!(result.batches_flushed, 1);
1476 assert_eq!(storage.pending_count(), 0);
1477
1478 storage.flush().unwrap();
1480
1481 let loaded = storage.load_all_events().unwrap();
1483 assert_eq!(loaded.len(), 250);
1484 }
1485
1486 #[test]
1487 fn test_auto_flush_on_batch_size() {
1488 let temp_dir = TempDir::new().unwrap();
1489 let config = ParquetStorageConfig {
1490 batch_size: 10, ..Default::default()
1492 };
1493 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1494
1495 for i in 0..15 {
1497 storage
1498 .append_event(create_test_event(&format!("entity-{i}")))
1499 .unwrap();
1500 }
1501
1502 assert_eq!(storage.pending_count(), 5);
1504
1505 let stats = storage.batch_stats();
1506 assert_eq!(stats.events_written, 10);
1507 assert_eq!(stats.batches_written, 1);
1508 assert_eq!(stats.size_flushes, 1);
1509 }
1510
1511 #[test]
1512 fn test_flush_on_shutdown() {
1513 let temp_dir = TempDir::new().unwrap();
1514 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1515
1516 for i in 0..5 {
1518 storage
1519 .append_event(create_test_event(&format!("entity-{i}")))
1520 .unwrap();
1521 }
1522
1523 assert_eq!(storage.pending_count(), 5);
1524
1525 let flushed = storage.flush_on_shutdown().unwrap();
1527 assert_eq!(flushed, 5);
1528 assert_eq!(storage.pending_count(), 0);
1529
1530 let loaded = storage.load_all_events().unwrap();
1532 assert_eq!(loaded.len(), 5);
1533 }
1534
1535 #[test]
1536 fn test_thread_safe_writes() {
1537 let temp_dir = TempDir::new().unwrap();
1538 let config = ParquetStorageConfig {
1539 batch_size: 100,
1540 ..Default::default()
1541 };
1542 let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());
1543
1544 let events_per_thread = 50;
1545 let thread_count = 4;
1546
1547 std::thread::scope(|s| {
1548 for t in 0..thread_count {
1549 let storage_ref = Arc::clone(&storage);
1550 s.spawn(move || {
1551 for i in 0..events_per_thread {
1552 let event = create_test_event(&format!("thread-{t}-entity-{i}"));
1553 storage_ref.append_event(event).unwrap();
1554 }
1555 });
1556 }
1557 });
1558
1559 storage.flush().unwrap();
1561
1562 let loaded = storage.load_all_events().unwrap();
1564 assert_eq!(loaded.len(), events_per_thread * thread_count);
1565 }
1566
1567 #[test]
1568 fn test_batch_stats() {
1569 let temp_dir = TempDir::new().unwrap();
1570 let config = ParquetStorageConfig {
1571 batch_size: 50,
1572 ..Default::default()
1573 };
1574 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1575
1576 let events: Vec<Event> = (0..100)
1581 .map(|i| create_test_event(&format!("entity-{i}")))
1582 .collect();
1583
1584 storage.batch_write(events).unwrap();
1585
1586 let stats = storage.batch_stats();
1587 assert_eq!(stats.batches_written, 1);
1588 assert_eq!(stats.events_written, 100);
1589 assert!(stats.avg_batch_size > 0.0);
1590 assert!(stats.events_per_sec > 0.0);
1591 assert_eq!(stats.size_flushes, 1);
1592 }
1593
1594 #[test]
1595 fn test_config_presets() {
1596 let high_throughput = ParquetStorageConfig::high_throughput();
1597 assert_eq!(high_throughput.batch_size, 50_000);
1598 assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));
1599
1600 let low_latency = ParquetStorageConfig::low_latency();
1601 assert_eq!(low_latency.batch_size, 1_000);
1602 assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));
1603
1604 let default = ParquetStorageConfig::default();
1605 assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
1606 assert_eq!(default.batch_size, 10_000);
1607 }
1608
1609 #[test]
1612 #[ignore]
1613 fn test_batch_write_throughput() {
1614 let temp_dir = TempDir::new().unwrap();
1615 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1616
1617 let event_count = 50_000;
1618
1619 let events: Vec<Event> = (0..event_count)
1621 .map(|i| create_test_event(&format!("entity-{i}")))
1622 .collect();
1623
1624 let start = std::time::Instant::now();
1625 let result = storage.batch_write(events).unwrap();
1626 storage.flush().unwrap(); let batch_duration = start.elapsed();
1628
1629 let batch_stats = storage.batch_stats();
1630
1631 println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
1632 println!("Events: {event_count}");
1633 println!("Duration: {batch_duration:?}");
1634 println!("Events/sec: {:.0}", result.events_per_sec);
1635 println!("Batches written: {}", batch_stats.batches_written);
1636 println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
1637 println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);
1638
1639 assert!(
1642 result.events_per_sec > 10_000.0,
1643 "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
1644 result.events_per_sec
1645 );
1646 }
1647
1648 #[test]
1650 #[ignore]
1651 fn test_single_event_write_baseline() {
1652 let temp_dir = TempDir::new().unwrap();
1653 let config = ParquetStorageConfig {
1654 batch_size: 1, ..Default::default()
1656 };
1657 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1658
1659 let event_count = 1_000; let start = std::time::Instant::now();
1662 for i in 0..event_count {
1663 let event = create_test_event(&format!("entity-{i}"));
1664 storage.append_event(event).unwrap();
1665 }
1666 let duration = start.elapsed();
1667
1668 let events_per_sec = f64::from(event_count) / duration.as_secs_f64();
1669
1670 println!("\n=== Single-Event Write Baseline ===");
1671 println!("Events: {event_count}");
1672 println!("Duration: {duration:?}");
1673 println!("Events/sec: {events_per_sec:.0}");
1674
1675 }
1678
1679 fn touch_parquet(path: &Path) {
1689 std::fs::create_dir_all(path.parent().unwrap()).unwrap();
1690 std::fs::write(path, b"").unwrap();
1691 }
1692
1693 #[test]
1694 fn test_walker_finds_files_in_flat_layout() {
1695 let temp_dir = TempDir::new().unwrap();
1696 let root = temp_dir.path();
1697 touch_parquet(&root.join("events-20260101-120000000-aaaa.parquet"));
1698 touch_parquet(&root.join("events-20260101-130000000-bbbb.parquet"));
1699
1700 let mut found = find_parquet_files_recursive(root).unwrap();
1701 found.sort();
1702 assert_eq!(found.len(), 2);
1703 assert!(
1704 found[0]
1705 .file_name()
1706 .unwrap()
1707 .to_str()
1708 .unwrap()
1709 .starts_with("events-"),
1710 "expected events-* file, got {found:?}"
1711 );
1712 }
1713
1714 #[test]
1715 fn test_walker_finds_files_in_tenant_partitioned_tree() {
1716 let temp_dir = TempDir::new().unwrap();
1717 let root = temp_dir.path();
1718 touch_parquet(&root.join("tenant-a/2026-01/events-20260101-120000000-aaaa.parquet"));
1720 touch_parquet(&root.join("tenant-a/2026-02/events-20260201-120000000-bbbb.parquet"));
1721 touch_parquet(&root.join("tenant-b/2026-01/events-20260103-120000000-cccc.parquet"));
1722
1723 let found = find_parquet_files_recursive(root).unwrap();
1724 assert_eq!(found.len(), 3);
1725 assert!(found[0].to_str().unwrap().contains("tenant-a"));
1728 assert!(found[1].to_str().unwrap().contains("tenant-a"));
1729 assert!(found[2].to_str().unwrap().contains("tenant-b"));
1730 }
1731
1732 #[test]
1733 fn test_walker_handles_mixed_legacy_and_partitioned_layouts() {
1734 let temp_dir = TempDir::new().unwrap();
1739 let root = temp_dir.path();
1740 touch_parquet(&root.join("events-legacy-aaaa.parquet"));
1741 touch_parquet(&root.join("tenant-a/2026-01/events-new-bbbb.parquet"));
1742
1743 let found = find_parquet_files_recursive(root).unwrap();
1744 assert_eq!(found.len(), 2);
1745 }
1746
1747 #[test]
1748 fn test_walker_ignores_non_parquet_files() {
1749 let temp_dir = TempDir::new().unwrap();
1750 let root = temp_dir.path();
1751 std::fs::write(root.join("README.md"), b"hello").unwrap();
1752 std::fs::write(root.join("events.json"), b"[]").unwrap();
1753 touch_parquet(&root.join("events-20260101-120000000-aaaa.parquet"));
1754 std::fs::write(root.join("not-a-parquet-file.bin"), b"").unwrap();
1757
1758 let found = find_parquet_files_recursive(root).unwrap();
1759 assert_eq!(found.len(), 1);
1760 assert_eq!(
1761 found[0].extension().and_then(|s| s.to_str()),
1762 Some("parquet")
1763 );
1764 }
1765
    // Build a minimal test `Event` owned by `tenant` with the given entity id.
    // NOTE(review): args are positional into reconstruct_from_strings — the
    // trailing `None, 1` look like an optional correlation/causation id and a
    // version/sequence number, but confirm against the Event definition.
    fn event_with_tenant(tenant: &str, entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(), // fresh random event id per call
            "test.event".to_string(),
            entity_id.to_string(),
            tenant.to_string(),
            json!({"k": "v"}), // tiny fixed payload; content is irrelevant to these tests
            chrono::Utc::now(),
            None,
            1,
        )
    }
1781
1782 #[test]
1783 fn test_flush_writes_into_per_tenant_partition() {
1784 let temp_dir = TempDir::new().unwrap();
1788 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1789
1790 for i in 0..3 {
1791 storage
1792 .append_event(event_with_tenant("default", &format!("entity-{i}")))
1793 .unwrap();
1794 }
1795 storage.flush().unwrap();
1796
1797 let parquet_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
1798 assert_eq!(parquet_files.len(), 1);
1799
1800 let rel = parquet_files[0]
1801 .strip_prefix(temp_dir.path())
1802 .unwrap()
1803 .to_string_lossy()
1804 .into_owned();
1805 let parts: Vec<&str> = rel.split(std::path::MAIN_SEPARATOR).collect();
1807 assert_eq!(parts.len(), 3, "expected tenant/yyyy-mm/file, got {rel}");
1808 assert_eq!(parts[0], "default");
1809 assert!(
1812 parts[1].len() == 7 && parts[1].as_bytes()[4] == b'-',
1813 "expected yyyy-mm, got {}",
1814 parts[1]
1815 );
1816 assert!(parts[2].starts_with("events-") && parts[2].ends_with(".parquet"));
1817
1818 let loaded = storage.load_all_events().unwrap();
1819 assert_eq!(loaded.len(), 3);
1820 }
1821
1822 #[test]
1823 fn test_multiple_tenants_get_isolated_subtrees() {
1824 let temp_dir = TempDir::new().unwrap();
1827 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1828
1829 for i in 0..2 {
1830 storage
1831 .append_event(event_with_tenant("alice", &format!("a-{i}")))
1832 .unwrap();
1833 }
1834 for i in 0..3 {
1835 storage
1836 .append_event(event_with_tenant("bob", &format!("b-{i}")))
1837 .unwrap();
1838 }
1839 storage.flush().unwrap();
1840
1841 let alice_subtree = temp_dir.path().join("alice");
1842 let bob_subtree = temp_dir.path().join("bob");
1843 assert!(alice_subtree.is_dir(), "alice should have its own subtree");
1844 assert!(bob_subtree.is_dir(), "bob should have its own subtree");
1845
1846 let alice_files = find_parquet_files_recursive(&alice_subtree).unwrap();
1847 let bob_files = find_parquet_files_recursive(&bob_subtree).unwrap();
1848 assert_eq!(alice_files.len(), 1);
1849 assert_eq!(bob_files.len(), 1);
1850
1851 let loaded = storage.load_all_events().unwrap();
1854 let (alice_count, bob_count) =
1855 loaded
1856 .iter()
1857 .fold((0, 0), |(a, b), e| match e.tenant_id_str() {
1858 "alice" => (a + 1, b),
1859 "bob" => (a, b + 1),
1860 _ => (a, b),
1861 });
1862 assert_eq!(alice_count, 2);
1863 assert_eq!(bob_count, 3);
1864 }
1865
1866 #[test]
1867 fn test_size_flush_only_drains_full_tenant() {
1868 let temp_dir = TempDir::new().unwrap();
1872 let config = ParquetStorageConfig {
1873 batch_size: 5,
1874 ..Default::default()
1875 };
1876 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1877
1878 for i in 0..5 {
1881 storage
1882 .append_event(event_with_tenant("alice", &format!("a-{i}")))
1883 .unwrap();
1884 }
1885 for i in 0..2 {
1887 storage
1888 .append_event(event_with_tenant("bob", &format!("b-{i}")))
1889 .unwrap();
1890 }
1891
1892 assert_eq!(
1893 storage.pending_count(),
1894 2,
1895 "only bob's 2 events should be pending"
1896 );
1897
1898 let parquet_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
1899 assert_eq!(parquet_files.len(), 1, "only alice should have flushed");
1900 assert!(
1901 parquet_files[0]
1902 .to_string_lossy()
1903 .contains(&format!("alice{}", std::path::MAIN_SEPARATOR)),
1904 "expected alice partition, got {}",
1905 parquet_files[0].display()
1906 );
1907 }
1908
1909 #[test]
1910 fn test_tenant_id_from_path_recovers_tenant_for_partitioned_files() {
1911 let root = Path::new("/data/storage");
1912 let f = Path::new("/data/storage/alice/2026-04/events-20260426-120000000-aaaa.parquet");
1913 assert_eq!(tenant_id_from_path(root, f), "alice");
1914 }
1915
1916 #[test]
1917 fn test_tenant_id_from_path_falls_back_to_default_for_legacy_flat_layout() {
1918 let root = Path::new("/data/storage");
1919 let f = Path::new("/data/storage/events-20260426-120000000-aaaa.parquet");
1920 assert_eq!(tenant_id_from_path(root, f), "default");
1923 }
1924
1925 #[test]
1926 fn test_sanitize_tenant_id_for_path_accepts_safe_inputs() {
1927 for ok in [
1928 "default",
1929 "system",
1930 "1e6b2d1c-2f64-4441-9cf9-42f2e451aa17",
1931 "onboard-diagnostic-160-at-example-com",
1932 "tenant_with_underscore",
1933 "v1.0",
1934 ] {
1935 assert!(
1936 sanitize_tenant_id_for_path(ok).is_ok(),
1937 "{ok:?} should be accepted"
1938 );
1939 }
1940 }
1941
1942 #[test]
1943 fn test_sanitize_tenant_id_for_path_rejects_unsafe_inputs() {
1944 for bad in [
1945 "", "..", ".", "foo/bar", "foo\\bar", "foo bar", "foo\nbar", "foo\0bar", "tenant?", "tenant*", ] {
1956 assert!(
1957 sanitize_tenant_id_for_path(bad).is_err(),
1958 "{bad:?} should be rejected"
1959 );
1960 }
1961
1962 let too_long = "a".repeat(129);
1964 assert!(sanitize_tenant_id_for_path(&too_long).is_err());
1965 }
1966
1967 #[test]
1968 fn test_partition_path_for_tenant_shape() {
1969 let root = Path::new("/data");
1970 let when = chrono::DateTime::parse_from_rfc3339("2026-04-26T12:00:00Z")
1971 .unwrap()
1972 .with_timezone(&chrono::Utc);
1973 let path = partition_path_for_tenant(root, "alice", when).unwrap();
1974 assert_eq!(path, Path::new("/data/alice/2026-04"));
1975 }
1976
1977 #[test]
1978 fn test_append_event_rejects_unsafe_tenant_at_flush() {
1979 let temp_dir = TempDir::new().unwrap();
1985 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1986
1987 storage
1991 .append_event(event_with_tenant("../escape", "e-0"))
1992 .unwrap();
1993 let result = storage.flush();
1994 assert!(result.is_err(), "flush should reject unsafe tenant_id");
1995 let msg = format!("{}", result.unwrap_err());
1996 assert!(
1997 msg.contains("disallowed character") || msg.contains("reserved"),
1998 "expected sanitization error message, got: {msg}"
1999 );
2000 }
2001
2002 #[test]
2007 fn test_load_events_for_tenant_only_walks_target_subtree() {
2008 let temp_dir = TempDir::new().unwrap();
2013 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2014
2015 for i in 0..2 {
2016 storage
2017 .append_event(event_with_tenant("alice", &format!("a-{i}")))
2018 .unwrap();
2019 }
2020 for i in 0..3 {
2021 storage
2022 .append_event(event_with_tenant("bob", &format!("b-{i}")))
2023 .unwrap();
2024 }
2025 for i in 0..1 {
2026 storage
2027 .append_event(event_with_tenant("carol", &format!("c-{i}")))
2028 .unwrap();
2029 }
2030 storage.flush().unwrap();
2031
2032 let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
2033 assert_eq!(alice_files.len(), 1);
2034 assert!(
2035 alice_files[0]
2036 .to_string_lossy()
2037 .contains(&format!("alice{}", std::path::MAIN_SEPARATOR)),
2038 "expected alice file, got {}",
2039 alice_files[0].display()
2040 );
2041 for f in &alice_files {
2045 let s = f.to_string_lossy();
2046 assert!(!s.contains("bob"), "alice listing leaked bob file: {s}");
2047 assert!(!s.contains("carol"), "alice listing leaked carol file: {s}");
2048 }
2049
2050 let alice_events = storage.load_events_for_tenant("alice").unwrap();
2051 assert_eq!(alice_events.len(), 2);
2052 for e in &alice_events {
2053 assert_eq!(e.tenant_id_str(), "alice");
2054 }
2055
2056 let bob_events = storage.load_events_for_tenant("bob").unwrap();
2057 assert_eq!(bob_events.len(), 3);
2058 for e in &bob_events {
2059 assert_eq!(e.tenant_id_str(), "bob");
2060 }
2061
2062 let carol_events = storage.load_events_for_tenant("carol").unwrap();
2063 assert_eq!(carol_events.len(), 1);
2064 assert_eq!(carol_events[0].tenant_id_str(), "carol");
2065 }
2066
2067 #[test]
2068 fn test_load_events_for_tenant_returns_empty_when_subtree_missing() {
2069 let temp_dir = TempDir::new().unwrap();
2073 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2074
2075 storage
2078 .append_event(event_with_tenant("alice", "a-0"))
2079 .unwrap();
2080 storage.flush().unwrap();
2081
2082 let files = storage
2083 .list_parquet_files_for_tenant("nobody-here")
2084 .unwrap();
2085 assert!(files.is_empty());
2086
2087 let events = storage.load_events_for_tenant("nobody-here").unwrap();
2088 assert!(events.is_empty());
2089 }
2090
2091 #[test]
2092 fn test_load_events_for_tenant_rejects_unsafe_tenant_id() {
2093 let temp_dir = TempDir::new().unwrap();
2096 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2097
2098 for unsafe_tid in ["..", "a/b", "a\\b", "", "a..b/.."] {
2099 let result = storage.load_events_for_tenant(unsafe_tid);
2100 assert!(
2101 result.is_err(),
2102 "tenant_id {unsafe_tid:?} should have been rejected"
2103 );
2104 }
2105 }
2106
2107 #[test]
2108 fn test_load_events_for_tenant_ignores_legacy_flat_layout_files() {
2109 let temp_dir = TempDir::new().unwrap();
2117 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2118
2119 let _flat = seed_flat_layout_file(&storage, 4);
2121
2122 let default_events = storage.load_events_for_tenant("default").unwrap();
2125 assert!(
2126 default_events.is_empty(),
2127 "tenant-scoped load must not pick up flat-layout files; got {} events",
2128 default_events.len()
2129 );
2130
2131 let all_events = storage.load_all_events().unwrap();
2133 assert_eq!(all_events.len(), 4);
2134 }
2135
2136 #[test]
2141 fn test_write_atomic_parquet_emits_file_under_tenant_partition() {
2142 let temp_dir = TempDir::new().unwrap();
2147 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2148
2149 let events: Vec<Event> = (0..3)
2150 .map(|i| event_with_tenant("alice", &format!("a-{i}")))
2151 .collect();
2152
2153 let final_path = storage
2154 .write_atomic_parquet("alice", "snapshot.alice.range", &events)
2155 .unwrap();
2156
2157 let rel = final_path
2159 .strip_prefix(temp_dir.path())
2160 .unwrap()
2161 .to_string_lossy()
2162 .into_owned();
2163 let parts: Vec<&str> = rel.split(std::path::MAIN_SEPARATOR).collect();
2164 assert_eq!(parts.len(), 3, "expected tenant/yyyy-mm/file, got {rel}");
2165 assert_eq!(parts[0], "alice");
2166 assert_eq!(parts[2], "snapshot.alice.range.parquet");
2167
2168 assert!(final_path.is_file());
2170 let tmp = final_path.with_extension("parquet.tmp");
2171 assert!(
2172 !tmp.exists(),
2173 "tmp should have been renamed away; still at {}",
2174 tmp.display()
2175 );
2176
2177 let loaded = storage.load_events_for_tenant("alice").unwrap();
2179 assert_eq!(loaded.len(), 3);
2180 }
2181
2182 #[test]
2183 fn test_write_atomic_parquet_rejects_empty_events() {
2184 let temp_dir = TempDir::new().unwrap();
2185 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2186 let result = storage.write_atomic_parquet("alice", "snap", &[]);
2187 assert!(result.is_err());
2188 }
2189
2190 #[test]
2191 fn test_write_atomic_parquet_rejects_unsafe_tenant() {
2192 let temp_dir = TempDir::new().unwrap();
2193 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2194 let events = [event_with_tenant("alice", "e-0")];
2195 for unsafe_tid in ["..", "a/b", ""] {
2196 let result = storage.write_atomic_parquet(unsafe_tid, "snap", &events);
2197 assert!(
2198 result.is_err(),
2199 "unsafe tenant_id {unsafe_tid:?} should have been rejected"
2200 );
2201 }
2202 }
2203
2204 #[test]
2205 fn test_cleanup_partial_writes_removes_orphan_tmps() {
2206 let temp_dir = TempDir::new().unwrap();
2210 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2211
2212 for i in 0..2 {
2215 storage
2216 .append_event(event_with_tenant("alice", &format!("a-{i}")))
2217 .unwrap();
2218 }
2219 storage.flush().unwrap();
2220 let real_files_before = find_parquet_files_recursive(temp_dir.path()).unwrap();
2221 assert_eq!(real_files_before.len(), 1);
2222
2223 let alice_subtree = temp_dir.path().join("alice");
2225 let orphan_dir = real_files_before[0].parent().unwrap();
2226 let orphan_path = orphan_dir.join("snapshot.alice.crashed.parquet.tmp");
2227 std::fs::write(&orphan_path, b"fake partial parquet").unwrap();
2228 assert!(orphan_path.is_file());
2229
2230 let nested_dir = alice_subtree.join("2099-01");
2232 std::fs::create_dir_all(&nested_dir).unwrap();
2233 let nested_orphan = nested_dir.join("events-x.parquet.tmp");
2234 std::fs::write(&nested_orphan, b"junk").unwrap();
2235
2236 let removed = storage.cleanup_partial_writes().unwrap();
2237 assert_eq!(removed, 2, "two orphan tmps should have been cleaned");
2238 assert!(!orphan_path.exists());
2239 assert!(!nested_orphan.exists());
2240
2241 let real_files_after = find_parquet_files_recursive(temp_dir.path()).unwrap();
2243 assert_eq!(real_files_after, real_files_before);
2244 }
2245
    #[test]
    fn test_cleanup_partial_writes_quarantines_zero_byte_parquet() {
        // A 0-byte .parquet (torn write) must be quarantined by renaming it to
        // a `.parquet.corrupt-<suffix>` sibling, while healthy files survive.
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Produce one healthy parquet file under alice's partition.
        for i in 0..2 {
            storage
                .append_event(event_with_tenant("alice", &format!("a-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();
        let healthy_files_before = find_parquet_files_recursive(temp_dir.path()).unwrap();
        assert_eq!(healthy_files_before.len(), 1);
        let healthy_file = &healthy_files_before[0];
        let healthy_dir = healthy_file.parent().unwrap();

        // Plant a 0-byte parquet next to the healthy one.
        let bricked = healthy_dir.join("events-bricked-deadbeef.parquet");
        std::fs::write(&bricked, b"").unwrap();
        assert_eq!(std::fs::metadata(&bricked).unwrap().len(), 0);

        let acted = storage.cleanup_partial_writes().unwrap();
        assert_eq!(acted, 1, "only the 0-byte file should have been acted on");

        // Quarantine = rename, not delete: the original name is gone and
        // exactly one `.parquet.corrupt-…` sibling exists in its place.
        assert!(!bricked.exists(), "0-byte file should have been renamed");
        let quarantined: Vec<_> = std::fs::read_dir(healthy_dir)
            .unwrap()
            .flatten()
            .map(|e| e.path())
            .filter(|p| {
                p.file_name()
                    .and_then(|n| n.to_str())
                    .is_some_and(|n| n.starts_with("events-bricked-deadbeef.parquet.corrupt-"))
            })
            .collect();
        assert_eq!(
            quarantined.len(),
            1,
            "expected one .parquet.corrupt-<ts> sibling"
        );

        assert!(
            healthy_file.exists(),
            "healthy parquet must not be molested"
        );
    }
2300
2301 #[test]
2302 fn test_load_all_events_skips_zero_byte_parquet() {
2303 let temp_dir = TempDir::new().unwrap();
2307 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2308
2309 for i in 0..2 {
2311 storage
2312 .append_event(event_with_tenant("alice", &format!("a-{i}")))
2313 .unwrap();
2314 }
2315 storage.flush().unwrap();
2316
2317 let healthy_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
2321 let bricked = healthy_files[0]
2322 .parent()
2323 .unwrap()
2324 .join("events-bricked-cafef00d.parquet");
2325 std::fs::write(&bricked, b"").unwrap();
2326
2327 let loaded = storage.load_all_events().unwrap();
2328 assert_eq!(
2329 loaded.len(),
2330 2,
2331 "all healthy events should still load despite the 0-byte file"
2332 );
2333 }
2334
2335 #[test]
2336 fn test_load_events_for_tenant_skips_zero_byte_parquet() {
2337 let temp_dir = TempDir::new().unwrap();
2340 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2341
2342 for i in 0..3 {
2343 storage
2344 .append_event(event_with_tenant("alice", &format!("a-{i}")))
2345 .unwrap();
2346 }
2347 storage.flush().unwrap();
2348
2349 let healthy = find_parquet_files_recursive(temp_dir.path()).unwrap();
2350 let bricked = healthy[0]
2351 .parent()
2352 .unwrap()
2353 .join("events-bricked-feedface.parquet");
2354 std::fs::write(&bricked, b"").unwrap();
2355
2356 let events = storage.load_events_for_tenant("alice").unwrap();
2357 assert_eq!(events.len(), 3, "lazy load must skip the 0-byte file");
2358 }
2359
    #[test]
    fn test_flush_tenant_leaves_no_zero_byte_parquet_after_normal_flush() {
        // After a successful flush, the tree must contain exactly one healthy
        // parquet file: no leftover `.parquet.tmp` and no 0-byte `.parquet`.
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(event_with_tenant("alice", &format!("a-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();

        // Walk the whole storage tree with an explicit stack (no recursion)
        // and classify every regular file by name and size.
        let mut tmp_count = 0;
        let mut zero_byte_count = 0;
        let mut healthy_count = 0;
        let mut stack = vec![temp_dir.path().to_path_buf()];
        while let Some(d) = stack.pop() {
            for entry in std::fs::read_dir(&d).unwrap().flatten() {
                let p = entry.path();
                if p.is_dir() {
                    stack.push(p);
                    continue;
                }
                let name = p.file_name().unwrap().to_string_lossy().into_owned();
                // Order matters: ".parquet.tmp" also ends with ".parquet"
                // if tested naively, so the tmp check must come first.
                if name.ends_with(".parquet.tmp") {
                    tmp_count += 1;
                } else if name.ends_with(".parquet") {
                    if std::fs::metadata(&p).unwrap().len() == 0 {
                        zero_byte_count += 1;
                    } else {
                        healthy_count += 1;
                    }
                }
            }
        }
        assert_eq!(tmp_count, 0, ".parquet.tmp survivors after flush");
        assert_eq!(zero_byte_count, 0, "0-byte .parquet survivors after flush");
        assert_eq!(healthy_count, 1, "expected exactly one healthy parquet");
    }
2402
2403 #[test]
2404 fn test_new_calls_cleanup_partial_writes_on_boot() {
2405 let temp_dir = TempDir::new().unwrap();
2409 let stale = temp_dir.path().join("orphan.parquet.tmp");
2410 std::fs::write(&stale, b"crash detritus").unwrap();
2411 assert!(stale.is_file());
2412
2413 let _storage = ParquetStorage::new(temp_dir.path()).unwrap();
2414 assert!(
2415 !stale.exists(),
2416 "stale tmp should have been cleaned by ParquetStorage::new"
2417 );
2418 }
2419
    // Simulate a pre-partitioning ("flat layout") deployment: write `count`
    // events through the normal partitioned path, then relocate the resulting
    // file to the storage root and prune the now-empty partition directories.
    // Returns the path of the relocated flat file.
    fn seed_flat_layout_file(storage: &ParquetStorage, count: usize) -> PathBuf {
        for i in 0..count {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();

        // NOTE(review): assumes create_test_event uses the "default" tenant so
        // the flush lands under default/ — confirm against its definition.
        let default_subtree = storage.storage_dir().join("default");
        let candidates = find_parquet_files_recursive(&default_subtree).unwrap();
        assert!(
            !candidates.is_empty(),
            "seed expected at least one file under default/"
        );
        // max() picks the lexicographically greatest path; with timestamped
        // file names that corresponds to the most recently written file.
        let src = candidates.into_iter().max().unwrap();

        let dst = storage.storage_dir().join(src.file_name().unwrap());
        std::fs::rename(&src, &dst).unwrap();
        // Best-effort pruning: remove_dir only succeeds on empty directories,
        // so failures are deliberately ignored.
        if let Some(month_dir) = src.parent() {
            let _ = std::fs::remove_dir(month_dir);
            if let Some(tenant_dir) = month_dir.parent() {
                let _ = std::fs::remove_dir(tenant_dir);
            }
        }
        dst
    }
2462
2463 #[test]
2464 fn test_migrate_flat_layout_dry_run_touches_nothing() {
2465 let temp_dir = TempDir::new().unwrap();
2466 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2467 let flat = seed_flat_layout_file(&storage, 7);
2468 assert!(flat.is_file(), "test setup: flat file should exist");
2469
2470 let report = storage.migrate_flat_layout(true).unwrap();
2471 assert!(report.dry_run);
2472 assert_eq!(report.flat_files_seen, 1);
2473 assert_eq!(report.events_migrated, 7);
2474 assert_eq!(report.flat_files_removed, 0);
2475 assert_eq!(report.partitions_written, 0);
2476 assert!(
2477 flat.is_file(),
2478 "flat file must still be present after dry run"
2479 );
2480 }
2481
2482 #[test]
2483 fn test_migrate_flat_layout_moves_events_into_default_tree_and_removes_flat() {
2484 let temp_dir = TempDir::new().unwrap();
2485 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2486 let flat = seed_flat_layout_file(&storage, 5);
2487
2488 let report = storage.migrate_flat_layout(false).unwrap();
2489 assert!(!report.dry_run);
2490 assert_eq!(report.flat_files_seen, 1);
2491 assert_eq!(report.flat_files_removed, 1);
2492 assert_eq!(report.events_migrated, 5);
2493 assert!(report.partitions_written >= 1);
2494 assert!(
2495 !flat.exists(),
2496 "flat file should be deleted after migration"
2497 );
2498
2499 let post = find_parquet_files_recursive(temp_dir.path()).unwrap();
2500 assert!(
2501 post.iter().all(|p| {
2502 let rel = p
2503 .strip_prefix(temp_dir.path())
2504 .unwrap()
2505 .to_string_lossy()
2506 .into_owned();
2507 rel.starts_with(&format!("default{}", std::path::MAIN_SEPARATOR))
2508 }),
2509 "all migrated files should be under default/"
2510 );
2511
2512 let loaded = storage.load_all_events().unwrap();
2513 assert_eq!(loaded.len(), 5);
2514 }
2515
2516 #[test]
2517 fn test_migrate_flat_layout_is_idempotent_when_re_run_after_completion() {
2518 let temp_dir = TempDir::new().unwrap();
2519 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2520 let _flat = seed_flat_layout_file(&storage, 4);
2521
2522 let first = storage.migrate_flat_layout(false).unwrap();
2523 assert_eq!(first.events_migrated, 4);
2524
2525 let second = storage.migrate_flat_layout(false).unwrap();
2528 assert_eq!(second.flat_files_seen, 0);
2529 assert_eq!(second.events_migrated, 0);
2530 assert_eq!(second.flat_files_removed, 0);
2531
2532 let loaded = storage.load_all_events().unwrap();
2533 assert_eq!(loaded.len(), 4, "rerun must not duplicate or lose events");
2534 }
2535
2536 #[test]
2537 fn test_migrate_flat_layout_ignores_already_partitioned_data() {
2538 let temp_dir = TempDir::new().unwrap();
2541 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2542
2543 for i in 0..3 {
2544 storage
2545 .append_event(event_with_tenant("alice", &format!("a-{i}")))
2546 .unwrap();
2547 }
2548 storage.flush().unwrap();
2549
2550 let _flat = seed_flat_layout_file(&storage, 2);
2551
2552 let report = storage.migrate_flat_layout(false).unwrap();
2553 assert_eq!(report.flat_files_seen, 1, "only the flat file is in scope");
2554 assert_eq!(report.events_migrated, 2);
2555
2556 let alice_files = find_parquet_files_recursive(&temp_dir.path().join("alice")).unwrap();
2557 assert_eq!(alice_files.len(), 1, "alice's tree must be untouched");
2558
2559 let loaded = storage.load_all_events().unwrap();
2560 assert_eq!(loaded.len(), 5);
2561 let alice_count = loaded
2562 .iter()
2563 .filter(|e| e.tenant_id_str() == "alice")
2564 .count();
2565 let default_count = loaded
2566 .iter()
2567 .filter(|e| e.tenant_id_str() == "default")
2568 .count();
2569 assert_eq!(alice_count, 3);
2570 assert_eq!(default_count, 2);
2571 }
2572
2573 #[test]
2574 fn test_migrate_flat_layout_with_no_flat_files_is_a_clean_noop() {
2575 let temp_dir = TempDir::new().unwrap();
2576 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2577 let report = storage.migrate_flat_layout(false).unwrap();
2578 assert_eq!(report.flat_files_seen, 0);
2579 assert_eq!(report.events_migrated, 0);
2580 }
2581}