1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4};
5use arrow::{
6 array::{
7 Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
8 UInt64Builder,
9 },
10 datatypes::{DataType, Field, Schema, TimeUnit},
11 record_batch::RecordBatch,
12};
13use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
14use std::{
15 collections::HashMap,
16 fs::{self, File},
17 path::{Path, PathBuf},
18 sync::{
19 Arc, Mutex,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::{Duration, Instant},
23};
24
/// Default number of events buffered per tenant before a size-based flush.
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default idle timeout (milliseconds) used by `check_timeout_flush`.
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;
30
/// Tuning knobs for [`ParquetStorage`] batching and on-disk encoding.
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Per-tenant buffered event count that triggers a size-based flush.
    pub batch_size: usize,
    /// Maximum time pending events may sit unflushed before
    /// `check_timeout_flush` forces a flush.
    pub flush_timeout: Duration,
    /// Parquet compression codec applied to every written file.
    pub compression: parquet::basic::Compression,
}
41
42impl Default for ParquetStorageConfig {
43 fn default() -> Self {
44 Self {
45 batch_size: DEFAULT_BATCH_SIZE,
46 flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
47 compression: parquet::basic::Compression::SNAPPY,
48 }
49 }
50}
51
52impl ParquetStorageConfig {
53 pub fn high_throughput() -> Self {
55 Self {
56 batch_size: 50_000,
57 flush_timeout: Duration::from_secs(10),
58 compression: parquet::basic::Compression::SNAPPY,
59 }
60 }
61
62 pub fn low_latency() -> Self {
64 Self {
65 batch_size: 1_000,
66 flush_timeout: Duration::from_secs(1),
67 compression: parquet::basic::Compression::SNAPPY,
68 }
69 }
70}
71
/// Cumulative write-side counters, snapshotted by [`ParquetStorage::batch_stats`].
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Total parquet files written (one per flush).
    pub batches_written: u64,
    /// Total events flushed to disk.
    pub events_written: u64,
    /// Total bytes written (from parquet row-group metadata).
    pub bytes_written: u64,
    /// events_written / batches_written (0.0 when nothing written).
    pub avg_batch_size: f64,
    /// events_written / total write time (0.0 when no time recorded).
    pub events_per_sec: f64,
    /// Accumulated wall-clock time spent in flushes, nanoseconds.
    pub total_write_time_ns: u64,
    /// Flushes triggered by the flush timeout.
    pub timeout_flushes: u64,
    /// Flushes triggered by a buffer reaching `batch_size`.
    pub size_flushes: u64,
}
92
/// Outcome of a single [`ParquetStorage::batch_write`] call.
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Events accepted by this call (buffered and/or flushed).
    pub events_written: usize,
    /// Tenants flushed to disk because their buffer reached `batch_size`.
    pub batches_flushed: usize,
    /// Wall-clock duration of the call.
    pub duration: Duration,
    /// events_written / duration.
    pub events_per_sec: f64,
}
105
/// Buffered, tenant-partitioned parquet writer/reader for events.
///
/// Events accumulate in per-tenant in-memory batches and are flushed to
/// `<storage_dir>/<tenant>/<YYYY-MM>/events-*.parquet` when a buffer reaches
/// `config.batch_size`, when the flush timeout fires, or on shutdown.
pub struct ParquetStorage {
    /// Root directory holding the tenant/month partition tree.
    storage_dir: PathBuf,

    /// Pending (not yet flushed) events, keyed by tenant id.
    current_batches: Mutex<HashMap<String, Vec<Event>>>,

    /// Batching / flush / compression configuration.
    config: ParquetStorageConfig,

    /// Arrow schema shared by every written record batch.
    schema: Arc<Schema>,

    /// Time of the most recent flush; read by `check_timeout_flush`.
    last_flush_time: Mutex<Instant>,

    // Lifetime write-side counters (monotonic; updated with Relaxed ordering).
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}
145
146impl ParquetStorage {
    /// Creates a storage rooted at `storage_dir` with the default
    /// configuration (see [`ParquetStorageConfig::default`]).
    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(storage_dir, ParquetStorageConfig::default())
    }
151
    /// Creates a [`ParquetStorage`] rooted at `storage_dir` with an explicit
    /// configuration.
    ///
    /// Creates the storage directory if missing, builds the fixed 7-column
    /// event schema, and runs crash-recovery cleanup of orphan
    /// `*.parquet.tmp` files left behind by a previous run.
    ///
    /// # Errors
    /// Returns [`AllSourceError::StorageError`] if the storage directory
    /// cannot be created.
    pub fn with_config(
        storage_dir: impl AsRef<Path>,
        config: ParquetStorageConfig,
    ) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
        })?;

        // Column order here must match the builder/array order in
        // `events_to_record_batch` and the column indices in
        // `record_batch_to_events`.
        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new("metadata", DataType::Utf8, true), // the only nullable column
            Field::new("version", DataType::UInt64, false),
        ]));

        let storage = Self {
            storage_dir,
            current_batches: Mutex::new(HashMap::new()),
            config,
            schema,
            last_flush_time: Mutex::new(Instant::now()),
            batches_written: AtomicU64::new(0),
            events_written: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            total_write_time_ns: AtomicU64::new(0),
            timeout_flushes: AtomicU64::new(0),
            size_flushes: AtomicU64::new(0),
        };

        // Crash recovery is best-effort: a cleanup failure is logged, not
        // propagated, so startup still succeeds.
        match storage.cleanup_partial_writes() {
            Ok(0) => {}
            Ok(n) => tracing::warn!(
                "cleaned up {n} orphan snapshot tmp file(s) on boot — \
                 a previous run crashed mid-snapshot"
            ),
            Err(e) => tracing::error!("cleanup_partial_writes failed on boot: {e}"),
        }

        Ok(storage)
    }
210
    /// Legacy constructor retained for callers that relied on the old
    /// 1,000-event default batch size.
    #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
    pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(
            storage_dir,
            ParquetStorageConfig {
                batch_size: 1000,
                ..Default::default()
            },
        )
    }
222
223 #[cfg_attr(feature = "hotpath", hotpath::measure)]
233 pub fn append_event(&self, event: Event) -> Result<()> {
234 let tenant = event.tenant_id_str().to_string();
235 let should_flush_tenant = {
236 let mut batches = self.current_batches.lock().unwrap();
237 let entry = batches.entry(tenant.clone()).or_default();
238 entry.push(event);
239 entry.len() >= self.config.batch_size
240 };
241
242 if should_flush_tenant {
243 self.size_flushes.fetch_add(1, Ordering::Relaxed);
244 self.flush_tenant(&tenant)?;
245 }
246
247 Ok(())
248 }
249
250 #[cfg_attr(feature = "hotpath", hotpath::measure)]
256 pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
257 let start = Instant::now();
258 let event_count = events.len();
259
260 let mut grouped: HashMap<String, Vec<Event>> = HashMap::new();
263 for event in events {
264 grouped
265 .entry(event.tenant_id_str().to_string())
266 .or_default()
267 .push(event);
268 }
269
270 let mut tenants_to_flush: Vec<String> = Vec::new();
271 {
272 let mut batches = self.current_batches.lock().unwrap();
273 for (tenant, mut new_events) in grouped {
274 let entry = batches.entry(tenant.clone()).or_default();
275 entry.append(&mut new_events);
276 if entry.len() >= self.config.batch_size {
277 tenants_to_flush.push(tenant);
278 }
279 }
280 }
281
282 let mut batches_flushed = 0;
283 for tenant in tenants_to_flush {
284 self.size_flushes.fetch_add(1, Ordering::Relaxed);
285 self.flush_tenant(&tenant)?;
286 batches_flushed += 1;
287 }
288
289 let duration = start.elapsed();
290
291 Ok(BatchWriteResult {
292 events_written: event_count,
293 batches_flushed,
294 duration,
295 events_per_sec: event_count as f64 / duration.as_secs_f64(),
296 })
297 }
298
    /// Flushes all tenants if any events are pending and the configured
    /// flush timeout has elapsed since the last flush.
    ///
    /// Returns `Ok(true)` when a timeout-driven flush was performed,
    /// `Ok(false)` otherwise. Intended to be called periodically (e.g. by a
    /// background task).
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn check_timeout_flush(&self) -> Result<bool> {
        // Both locks are confined to this scope and released before `flush()`
        // runs — `flush`/`flush_tenant` re-acquire them individually.
        let should_flush = {
            let last_flush = self.last_flush_time.lock().unwrap();
            let batches = self.current_batches.lock().unwrap();
            let any_pending = batches.values().any(|v| !v.is_empty());
            any_pending && last_flush.elapsed() >= self.config.flush_timeout
        };

        if should_flush {
            self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
            self.flush()?;
            Ok(true)
        } else {
            Ok(false)
        }
    }
323
324 #[cfg_attr(feature = "hotpath", hotpath::measure)]
331 pub fn flush(&self) -> Result<()> {
332 let tenants: Vec<String> = {
333 let batches = self.current_batches.lock().unwrap();
334 batches
335 .iter()
336 .filter(|(_, v)| !v.is_empty())
337 .map(|(k, _)| k.clone())
338 .collect()
339 };
340 if tenants.is_empty() {
341 return Ok(());
342 }
343 for tenant in tenants {
344 self.flush_tenant(&tenant)?;
345 }
346 Ok(())
347 }
348
349 fn flush_tenant(&self, tenant_id: &str) -> Result<()> {
357 let events_to_write = {
358 let mut batches = self.current_batches.lock().unwrap();
359 match batches.get_mut(tenant_id) {
360 Some(v) if !v.is_empty() => std::mem::take(v),
361 _ => return Ok(()),
362 }
363 };
364
365 let batch_count = events_to_write.len();
366 let start = Instant::now();
367
368 let record_batch = self.events_to_record_batch(&events_to_write)?;
369
370 let now = chrono::Utc::now();
371 let partition_dir = partition_path_for_tenant(&self.storage_dir, tenant_id, now)?;
372 fs::create_dir_all(&partition_dir).map_err(|e| {
373 AllSourceError::StorageError(format!(
374 "Failed to create tenant partition {}: {e}",
375 partition_dir.display()
376 ))
377 })?;
378 let filename = format!(
379 "events-{}-{}.parquet",
380 now.format("%Y%m%d-%H%M%S%3f"),
381 uuid::Uuid::new_v4().as_simple()
382 );
383 let file_path = partition_dir.join(&filename);
384
385 tracing::info!(
386 "Flushing {} events for tenant={} to {}",
387 batch_count,
388 tenant_id,
389 file_path.display()
390 );
391
392 let file = File::create(&file_path).map_err(|e| {
393 AllSourceError::StorageError(format!("Failed to create parquet file: {e}"))
394 })?;
395
396 let props = WriterProperties::builder()
397 .set_compression(self.config.compression)
398 .build();
399
400 let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
401 writer.write(&record_batch)?;
402 let file_metadata = writer.close()?;
403
404 let duration = start.elapsed();
405
406 self.batches_written.fetch_add(1, Ordering::Relaxed);
407 self.events_written
408 .fetch_add(batch_count as u64, Ordering::Relaxed);
409 if let Some(size) = file_metadata
410 .row_groups()
411 .first()
412 .map(parquet::file::metadata::RowGroupMetaData::total_byte_size)
413 {
414 self.bytes_written.fetch_add(size as u64, Ordering::Relaxed);
415 }
416 self.total_write_time_ns
417 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
418
419 {
420 let mut last_flush = self.last_flush_time.lock().unwrap();
421 *last_flush = Instant::now();
422 }
423
424 tracing::info!(
425 "Wrote {} events for tenant={} to {} in {:?}",
426 batch_count,
427 tenant_id,
428 file_path.display(),
429 duration
430 );
431
432 Ok(())
433 }
434
    /// Writes `events` to `<tenant>/<YYYY-MM>/<file_stem>.parquet` using a
    /// crash-safe write-tmp → fsync → rename sequence, so readers never
    /// observe a partially written snapshot file.
    ///
    /// The partition month is anchored on the earliest event timestamp.
    /// Orphan `.parquet.tmp` files from a crash mid-sequence are removed by
    /// `cleanup_partial_writes` on the next boot. Returns the final path.
    ///
    /// # Errors
    /// Returns an error for an empty `events` slice, or for any I/O or
    /// serialization failure along the way.
    pub fn write_atomic_parquet(
        &self,
        tenant_id: &str,
        file_stem: &str,
        events: &[Event],
    ) -> Result<PathBuf> {
        if events.is_empty() {
            return Err(AllSourceError::StorageError(
                "write_atomic_parquet called with empty event slice".to_string(),
            ));
        }
        // `events` is non-empty here, so `min()` is always `Some`; the
        // `unwrap_or_else` fallback is purely defensive.
        let anchor_ts = events
            .iter()
            .map(|e| e.timestamp)
            .min()
            .unwrap_or_else(chrono::Utc::now);
        let partition_dir = partition_path_for_tenant(&self.storage_dir, tenant_id, anchor_ts)?;
        fs::create_dir_all(&partition_dir).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to create tenant partition {}: {e}",
                partition_dir.display()
            ))
        })?;

        let final_path = partition_dir.join(format!("{file_stem}.parquet"));
        let tmp_path = partition_dir.join(format!("{file_stem}.parquet.tmp"));

        let record_batch = self.events_to_record_batch(events)?;
        // Scope ensures the writer (and its file handle) is closed before
        // the fsync + rename below.
        {
            let file = File::create(&tmp_path).map_err(|e| {
                AllSourceError::StorageError(format!(
                    "Failed to create snapshot tmp file {}: {e}",
                    tmp_path.display()
                ))
            })?;

            let props = WriterProperties::builder()
                .set_compression(self.config.compression)
                .build();

            let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
            writer.write(&record_batch)?;
            let _meta = writer.close()?;
        }

        // Reopen and fsync so the bytes are durable before the rename makes
        // the file visible under its final name.
        let tmp_file = File::open(&tmp_path).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to reopen snapshot tmp for fsync {}: {e}",
                tmp_path.display()
            ))
        })?;
        tmp_file.sync_all().map_err(|e| {
            AllSourceError::StorageError(format!("fsync on snapshot tmp failed: {e}"))
        })?;
        drop(tmp_file);

        fs::rename(&tmp_path, &final_path).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to rename {} → {}: {e}",
                tmp_path.display(),
                final_path.display()
            ))
        })?;

        // Best-effort fsync of the directory entry; errors (e.g. platforms
        // where opening a directory fails) are deliberately ignored.
        if let Ok(dir) = File::open(&partition_dir) {
            let _ = dir.sync_all();
        }

        tracing::info!(
            tenant_id = tenant_id,
            file = %final_path.display(),
            event_count = events.len(),
            "wrote atomic snapshot file"
        );

        Ok(final_path)
    }
548
549 pub fn cleanup_partial_writes(&self) -> Result<usize> {
563 let mut deleted = 0usize;
564 let mut stack: Vec<PathBuf> = vec![self.storage_dir.clone()];
565 while let Some(dir) = stack.pop() {
566 let Ok(entries) = fs::read_dir(&dir) else {
567 continue;
568 };
569 for entry in entries.flatten() {
570 let path = entry.path();
571 let Ok(ft) = entry.file_type() else { continue };
572 if ft.is_dir() {
573 stack.push(path);
574 } else if ft.is_file() && path.to_string_lossy().ends_with(".parquet.tmp") {
575 match fs::remove_file(&path) {
576 Ok(()) => {
577 tracing::warn!(
578 file = %path.display(),
579 "cleaned up orphan snapshot tmp file (crash recovery)"
580 );
581 deleted += 1;
582 }
583 Err(e) => {
584 tracing::error!(
585 file = %path.display(),
586 "failed to remove orphan snapshot tmp file: {e}"
587 );
588 }
589 }
590 }
591 }
592 }
593 Ok(deleted)
594 }
595
596 #[cfg_attr(feature = "hotpath", hotpath::measure)]
602 pub fn flush_on_shutdown(&self) -> Result<usize> {
603 let total_pending: usize = {
604 let batches = self.current_batches.lock().unwrap();
605 batches.values().map(Vec::len).sum()
606 };
607
608 if total_pending > 0 {
609 tracing::info!(
610 "Shutdown: flushing {} pending events across all tenants",
611 total_pending
612 );
613 self.flush()?;
614 }
615
616 Ok(total_pending)
617 }
618
619 pub fn batch_stats(&self) -> BatchWriteStats {
621 let batches = self.batches_written.load(Ordering::Relaxed);
622 let events = self.events_written.load(Ordering::Relaxed);
623 let bytes = self.bytes_written.load(Ordering::Relaxed);
624 let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);
625
626 let time_secs = time_ns as f64 / 1_000_000_000.0;
627
628 BatchWriteStats {
629 batches_written: batches,
630 events_written: events,
631 bytes_written: bytes,
632 avg_batch_size: if batches > 0 {
633 events as f64 / batches as f64
634 } else {
635 0.0
636 },
637 events_per_sec: if time_secs > 0.0 {
638 events as f64 / time_secs
639 } else {
640 0.0
641 },
642 total_write_time_ns: time_ns,
643 timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
644 size_flushes: self.size_flushes.load(Ordering::Relaxed),
645 }
646 }
647
648 pub fn pending_count(&self) -> usize {
650 self.current_batches
651 .lock()
652 .unwrap()
653 .values()
654 .map(Vec::len)
655 .sum()
656 }
657
658 pub fn batch_size(&self) -> usize {
660 self.config.batch_size
661 }
662
663 pub fn flush_timeout(&self) -> Duration {
665 self.config.flush_timeout
666 }
667
    /// Converts a slice of events into a single Arrow [`RecordBatch`].
    ///
    /// Builder/array order must match the schema built in `with_config`
    /// (event_id, event_type, entity_id, payload, timestamp, metadata,
    /// version); `metadata` is the only nullable column. Payload and
    /// metadata are stored as JSON strings; timestamps as UTC microseconds.
    ///
    /// # Errors
    /// Fails if payload/metadata cannot be serialized to JSON or the arrays
    /// do not match the schema.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            let timestamp_micros = event.timestamp.timestamp_micros();
            timestamp_builder.append_value(timestamp_micros);

            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            version_builder.append_value(event.version as u64);
        }

        // Array order mirrors the schema column order exactly.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;

        Ok(record_batch)
    }
712
713 #[cfg_attr(feature = "hotpath", hotpath::measure)]
720 pub fn load_all_events(&self) -> Result<Vec<Event>> {
721 let parquet_files = find_parquet_files_recursive(&self.storage_dir)?;
722
723 let mut all_events = Vec::with_capacity(parquet_files.len() * self.config.batch_size);
724 for file_path in parquet_files {
725 tracing::info!("Loading events from {}", file_path.display());
726 let tenant_id = tenant_id_from_path(&self.storage_dir, &file_path);
727 let file_events = self.load_events_from_file(&file_path, &tenant_id)?;
728 all_events.extend(file_events);
729 }
730
731 tracing::info!("Loaded {} total events from storage", all_events.len());
732
733 Ok(all_events)
734 }
735
736 #[cfg_attr(feature = "hotpath", hotpath::measure)]
742 fn load_events_from_file(&self, file_path: &Path, tenant_id: &str) -> Result<Vec<Event>> {
743 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
744
745 let file = File::open(file_path).map_err(|e| {
746 AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
747 })?;
748
749 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
750 let mut reader = builder.build()?;
751
752 let mut events = Vec::new();
753
754 while let Some(Ok(batch)) = reader.next() {
755 let batch_events = self.record_batch_to_events(&batch, tenant_id)?;
756 events.extend(batch_events);
757 }
758
759 Ok(events)
760 }
761
    /// Public wrapper over the private file loader for callers that already
    /// know a file's path and its owning tenant.
    pub fn load_events_from_file_path(
        &self,
        file_path: &Path,
        tenant_id: &str,
    ) -> Result<Vec<Event>> {
        self.load_events_from_file(file_path, tenant_id)
    }
774
    /// Reconstructs events from one Arrow record batch.
    ///
    /// Column indices 0..=6 must match the schema order declared in
    /// `with_config` (event_id, event_type, entity_id, payload, timestamp,
    /// metadata, version). The tenant id is not stored in the file — it is
    /// supplied by the caller, derived from the file's partition path.
    ///
    /// # Errors
    /// Fails on a column type mismatch, an unparseable UUID or timestamp,
    /// or invalid JSON in payload/metadata.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn record_batch_to_events(&self, batch: &RecordBatch, tenant_id: &str) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i))
                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;

            // `metadata` is the only nullable column.
            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                tenant_id.to_string(),
                serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }
853
854 pub fn list_parquet_files(&self) -> Result<Vec<PathBuf>> {
860 find_parquet_files_recursive(&self.storage_dir)
861 }
862
863 pub fn list_parquet_files_for_tenant(&self, tenant_id: &str) -> Result<Vec<PathBuf>> {
876 let safe = sanitize_tenant_id_for_path(tenant_id)?;
877 let tenant_root = self.storage_dir.join(safe);
878 if !tenant_root.is_dir() {
879 return Ok(Vec::new());
880 }
881 find_parquet_files_recursive(&tenant_root)
882 }
883
884 #[cfg_attr(feature = "hotpath", hotpath::measure)]
902 pub fn load_events_for_tenant(&self, tenant_id: &str) -> Result<Vec<Event>> {
903 let parquet_files = self.list_parquet_files_for_tenant(tenant_id)?;
904 tracing::info!(
905 tenant_id = tenant_id,
906 file_count = parquet_files.len(),
907 "load_events_for_tenant: walking tenant subtree only"
908 );
909
910 let mut events = Vec::with_capacity(parquet_files.len() * self.config.batch_size);
911 for file_path in parquet_files {
912 tracing::debug!(
913 tenant_id = tenant_id,
914 file = %file_path.display(),
915 "load_events_for_tenant: opening file"
916 );
917 let file_events = self.load_events_from_file(&file_path, tenant_id)?;
918 events.extend(file_events);
919 }
920
921 tracing::info!(
922 tenant_id = tenant_id,
923 event_count = events.len(),
924 "load_events_for_tenant: complete"
925 );
926 Ok(events)
927 }
928
    /// Root directory this storage writes its partition tree under.
    pub fn storage_dir(&self) -> &Path {
        &self.storage_dir
    }
933
    /// Migrates legacy flat-layout files (`<storage_dir>/*.parquet`) into the
    /// partitioned `<tenant>/<YYYY-MM>/` layout.
    ///
    /// Each flat file is read, its events regrouped by (tenant, month),
    /// rewritten into partition directories, and the flat file removed.
    /// With `dry_run == true` nothing is written or deleted, but the report
    /// still counts what would happen.
    ///
    /// # Errors
    /// Returns an error if a flat file cannot be read, a partition cannot be
    /// created or written, or a migrated flat file cannot be removed.
    pub fn migrate_flat_layout(&self, dry_run: bool) -> Result<MigrationReport> {
        let flat_files = list_flat_layout_files(&self.storage_dir)?;
        let mut report = MigrationReport {
            dry_run,
            ..Default::default()
        };

        for flat_file in flat_files {
            // Flat files predate tenant partitioning; they are read back
            // under the "default" tenant.
            let events = self.load_events_from_file(&flat_file, "default")?;
            report.flat_files_seen += 1;

            // Empty files carry no data — just delete them.
            if events.is_empty() {
                if !dry_run {
                    fs::remove_file(&flat_file).map_err(|e| {
                        AllSourceError::StorageError(format!(
                            "Failed to remove empty flat file {}: {e}",
                            flat_file.display()
                        ))
                    })?;
                }
                report.flat_files_removed += 1;
                continue;
            }

            // Regroup by (tenant, YYYY-MM) to mirror the partitioned layout.
            // NOTE(review): events were just loaded with tenant "default", so
            // tenant_id_str() presumably always yields "default" here —
            // confirm whether reconstruction can produce other tenants.
            let mut groups: HashMap<(String, String), Vec<Event>> = HashMap::new();
            for event in events {
                let key = (
                    event.tenant_id_str().to_string(),
                    event.timestamp().format("%Y-%m").to_string(),
                );
                groups.entry(key).or_default().push(event);
            }

            for ((tenant, yyyy_mm), group_events) in groups {
                let count = group_events.len();
                if !dry_run {
                    let safe_tenant = sanitize_tenant_id_for_path(&tenant)?;
                    let target_dir = self.storage_dir.join(safe_tenant).join(&yyyy_mm);
                    fs::create_dir_all(&target_dir).map_err(|e| {
                        AllSourceError::StorageError(format!(
                            "Failed to create partition {}: {e}",
                            target_dir.display()
                        ))
                    })?;
                    let filename = format!(
                        "events-{}-{}.parquet",
                        chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
                        uuid::Uuid::new_v4().as_simple()
                    );
                    let target_path = target_dir.join(&filename);
                    let record_batch = self.events_to_record_batch(&group_events)?;
                    let file = File::create(&target_path).map_err(|e| {
                        AllSourceError::StorageError(format!(
                            "Failed to create migration target {}: {e}",
                            target_path.display()
                        ))
                    })?;
                    let props = WriterProperties::builder()
                        .set_compression(self.config.compression)
                        .build();
                    let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
                    writer.write(&record_batch)?;
                    writer.close()?;
                    report.partitions_written += 1;
                }
                report.events_migrated += count;
            }

            // Remove the source flat file only after all its groups were
            // written successfully.
            if !dry_run {
                fs::remove_file(&flat_file).map_err(|e| {
                    AllSourceError::StorageError(format!(
                        "Failed to remove flat file {} after migration: {e}",
                        flat_file.display()
                    ))
                })?;
                report.flat_files_removed += 1;
            }
        }

        Ok(report)
    }
1046
1047 pub fn stats(&self) -> Result<StorageStats> {
1049 let parquet_files = find_parquet_files_recursive(&self.storage_dir)?;
1050 let mut total_size_bytes = 0u64;
1051 for path in &parquet_files {
1052 if let Ok(metadata) = fs::metadata(path) {
1053 total_size_bytes += metadata.len();
1054 }
1055 }
1056
1057 let current_batch_size: usize = self
1058 .current_batches
1059 .lock()
1060 .unwrap()
1061 .values()
1062 .map(Vec::len)
1063 .sum();
1064
1065 Ok(StorageStats {
1066 total_files: parquet_files.len(),
1067 total_size_bytes,
1068 storage_dir: self.storage_dir.clone(),
1069 current_batch_size,
1070 })
1071 }
1072}
1073
1074fn sanitize_tenant_id_for_path(tenant_id: &str) -> Result<&str> {
1087 if tenant_id.is_empty() {
1088 return Err(AllSourceError::StorageError(
1089 "tenant_id is empty (cannot derive partition path)".to_string(),
1090 ));
1091 }
1092 if tenant_id.len() > 128 {
1093 return Err(AllSourceError::StorageError(format!(
1094 "tenant_id is too long for partition path: {} bytes (max 128)",
1095 tenant_id.len()
1096 )));
1097 }
1098 if tenant_id == "." || tenant_id == ".." {
1099 return Err(AllSourceError::StorageError(format!(
1100 "tenant_id {tenant_id:?} is reserved"
1101 )));
1102 }
1103 for c in tenant_id.chars() {
1104 let ok = c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.';
1105 if !ok {
1106 return Err(AllSourceError::StorageError(format!(
1107 "tenant_id {tenant_id:?} contains disallowed character {c:?} for partition path"
1108 )));
1109 }
1110 }
1111 Ok(tenant_id)
1112}
1113
1114fn partition_path_for_tenant(
1119 root: &Path,
1120 tenant_id: &str,
1121 when: chrono::DateTime<chrono::Utc>,
1122) -> Result<PathBuf> {
1123 let safe = sanitize_tenant_id_for_path(tenant_id)?;
1124 Ok(root.join(safe).join(when.format("%Y-%m").to_string()))
1125}
1126
/// Derives the tenant id from a file's position in the partition tree.
///
/// The first path component below `root` is the tenant directory, and a
/// partitioned file has at least one more component below it. Files outside
/// `root` or directly inside it (legacy flat layout) map to `"default"`.
fn tenant_id_from_path(root: &Path, file_path: &Path) -> String {
    match file_path.strip_prefix(root) {
        Ok(rel) => {
            let mut parts = rel.components();
            match (parts.next(), parts.next()) {
                (Some(std::path::Component::Normal(tenant)), Some(_)) => {
                    tenant.to_string_lossy().into_owned()
                }
                _ => String::from("default"),
            }
        }
        Err(_) => String::from("default"),
    }
}
1152
1153fn list_flat_layout_files(root: &Path) -> Result<Vec<PathBuf>> {
1159 let entries = fs::read_dir(root).map_err(|e| {
1160 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
1161 })?;
1162 let mut out: Vec<PathBuf> = entries
1163 .filter_map(std::result::Result::ok)
1164 .filter_map(|entry| {
1165 let ft = entry.file_type().ok()?;
1166 if !ft.is_file() {
1167 return None;
1168 }
1169 let path = entry.path();
1170 if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
1171 Some(path)
1172 } else {
1173 None
1174 }
1175 })
1176 .collect();
1177 out.sort();
1178 Ok(out)
1179}
1180
/// Recursively collects every `*.parquet` file under `root`, sorted by path.
///
/// An unreadable `root` is a hard error; unreadable subdirectories and
/// entries with unreadable file types are silently skipped (best-effort
/// walk). `.parquet.tmp` files are excluded automatically because their
/// extension is `tmp`, not `parquet`.
fn find_parquet_files_recursive(root: &Path) -> Result<Vec<PathBuf>> {
    let mut out = Vec::new();
    let mut stack: Vec<PathBuf> = vec![root.to_path_buf()];

    while let Some(dir) = stack.pop() {
        let entries = match fs::read_dir(&dir) {
            Ok(e) => e,
            // Only the root directory being unreadable is fatal; a missing
            // root usually means misconfiguration.
            Err(e) if dir == root => {
                return Err(AllSourceError::StorageError(format!(
                    "Failed to read storage directory: {e}"
                )));
            }
            Err(_) => continue,
        };

        for entry in entries.flatten() {
            let path = entry.path();
            let Ok(ft) = entry.file_type() else {
                continue;
            };
            if ft.is_dir() {
                stack.push(path);
            } else if ft.is_file()
                && path
                    .extension()
                    .and_then(|ext| ext.to_str())
                    .is_some_and(|ext| ext == "parquet")
            {
                out.push(path);
            }
        }
    }

    out.sort();
    Ok(out)
}
1233
impl Drop for ParquetStorage {
    /// Best-effort flush so buffered events are not lost when the storage is
    /// dropped without an explicit shutdown; failures are logged, not raised.
    // NOTE(review): flush_on_shutdown unwraps mutex locks, so a poisoned
    // mutex would panic inside drop (abort if already unwinding) — confirm
    // that is acceptable.
    fn drop(&mut self) {
        if let Err(e) = self.flush_on_shutdown() {
            tracing::error!("Failed to flush events on drop: {}", e);
        }
    }
}
1242
/// Summary produced by [`ParquetStorage::migrate_flat_layout`].
#[derive(Debug, Default, Clone, serde::Serialize)]
pub struct MigrationReport {
    /// Whether this run only simulated the migration.
    pub dry_run: bool,
    /// Legacy flat-layout files encountered.
    pub flat_files_seen: usize,
    /// Flat files deleted (or that would be deleted in a dry run).
    pub flat_files_removed: usize,
    /// Partitioned files written (always 0 in a dry run).
    pub partitions_written: usize,
    /// Events moved (or that would be moved) into the partitioned layout.
    pub events_migrated: usize,
}
1257
/// Point-in-time snapshot returned by [`ParquetStorage::stats`].
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    /// Number of parquet files currently on disk.
    pub total_files: usize,
    /// Combined size of those files in bytes.
    pub total_size_bytes: u64,
    /// Root storage directory.
    pub storage_dir: PathBuf,
    /// Events currently buffered in memory (not yet flushed).
    pub current_batch_size: usize,
}
1265
1266#[cfg(test)]
1267mod tests {
1268 use super::*;
1269 use serde_json::json;
1270 use std::sync::Arc;
1271 use tempfile::TempDir;
1272
    /// Builds a minimal event for the "default" tenant: random id, fixed
    /// JSON payload, current timestamp, no metadata, version 1.
    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }
1288
    /// Round-trip: buffered events survive an explicit flush and load back.
    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"));
            storage.append_event(event).unwrap();
        }

        // 10 < DEFAULT_BATCH_SIZE, so the data is still buffered here.
        storage.flush().unwrap();

        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }

    /// `stats()` reports exactly one file (single tenant, single flush) with
    /// a non-zero byte size.
    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }

    /// `new()` picks up DEFAULT_BATCH_SIZE (currently 10,000).
    #[test]
    fn test_default_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
        assert_eq!(storage.batch_size(), 10_000);
    }
1335
    /// `with_config` honors an explicit batch size and flush timeout.
    #[test]
    fn test_custom_config() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 5_000,
            flush_timeout: Duration::from_secs(2),
            compression: parquet::basic::Compression::SNAPPY,
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        assert_eq!(storage.batch_size(), 5_000);
        assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
    }

    /// 250 events with batch_size 100 trigger one size-based flush for the
    /// single "default" tenant; that flush drains the whole buffer.
    #[test]
    fn test_batch_write() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100, ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let events: Vec<Event> = (0..250)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        let result = storage.batch_write(events).unwrap();
        assert_eq!(result.events_written, 250);
        assert_eq!(result.batches_flushed, 1);
        assert_eq!(storage.pending_count(), 0);

        // No-op here (nothing pending), kept to mirror real shutdown flow.
        storage.flush().unwrap();

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 250);
    }
1382
    /// With batch_size 10, the 10th append auto-flushes; 5 events remain
    /// buffered and the counters record exactly one size flush.
    #[test]
    fn test_auto_flush_on_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 10, ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        for i in 0..15 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        let stats = storage.batch_stats();
        assert_eq!(stats.events_written, 10);
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.size_flushes, 1);
    }

    /// `flush_on_shutdown` drains the buffer and reports how many events
    /// were pending.
    #[test]
    fn test_flush_on_shutdown() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        let flushed = storage.flush_on_shutdown().unwrap();
        assert_eq!(flushed, 5);
        assert_eq!(storage.pending_count(), 0);

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 5);
    }
1431
    /// Concurrent appends from scoped threads never lose events.
    #[test]
    fn test_thread_safe_writes() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());

        let events_per_thread = 50;
        let thread_count = 4;

        // scope joins all spawned threads before returning.
        std::thread::scope(|s| {
            for t in 0..thread_count {
                let storage_ref = Arc::clone(&storage);
                s.spawn(move || {
                    for i in 0..events_per_thread {
                        let event = create_test_event(&format!("thread-{t}-entity-{i}"));
                        storage_ref.append_event(event).unwrap();
                    }
                });
            }
        });

        storage.flush().unwrap();

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), events_per_thread * thread_count);
    }

    /// 100 events with batch_size 50 trigger one size flush that drains the
    /// whole buffer; counters and derived rates reflect it.
    #[test]
    fn test_batch_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 50,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let events: Vec<Event> = (0..100)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        storage.batch_write(events).unwrap();

        let stats = storage.batch_stats();
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.events_written, 100);
        assert!(stats.avg_batch_size > 0.0);
        assert!(stats.events_per_sec > 0.0);
        assert_eq!(stats.size_flushes, 1);
    }
1490
1491 #[test]
1492 fn test_config_presets() {
1493 let high_throughput = ParquetStorageConfig::high_throughput();
1494 assert_eq!(high_throughput.batch_size, 50_000);
1495 assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));
1496
1497 let low_latency = ParquetStorageConfig::low_latency();
1498 assert_eq!(low_latency.batch_size, 1_000);
1499 assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));
1500
1501 let default = ParquetStorageConfig::default();
1502 assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
1503 assert_eq!(default.batch_size, 10_000);
1504 }
1505
1506 #[test]
1509 #[ignore]
1510 fn test_batch_write_throughput() {
1511 let temp_dir = TempDir::new().unwrap();
1512 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1513
1514 let event_count = 50_000;
1515
1516 let events: Vec<Event> = (0..event_count)
1518 .map(|i| create_test_event(&format!("entity-{i}")))
1519 .collect();
1520
1521 let start = std::time::Instant::now();
1522 let result = storage.batch_write(events).unwrap();
1523 storage.flush().unwrap(); let batch_duration = start.elapsed();
1525
1526 let batch_stats = storage.batch_stats();
1527
1528 println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
1529 println!("Events: {event_count}");
1530 println!("Duration: {batch_duration:?}");
1531 println!("Events/sec: {:.0}", result.events_per_sec);
1532 println!("Batches written: {}", batch_stats.batches_written);
1533 println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
1534 println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);
1535
1536 assert!(
1539 result.events_per_sec > 10_000.0,
1540 "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
1541 result.events_per_sec
1542 );
1543 }
1544
1545 #[test]
1547 #[ignore]
1548 fn test_single_event_write_baseline() {
1549 let temp_dir = TempDir::new().unwrap();
1550 let config = ParquetStorageConfig {
1551 batch_size: 1, ..Default::default()
1553 };
1554 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1555
1556 let event_count = 1_000; let start = std::time::Instant::now();
1559 for i in 0..event_count {
1560 let event = create_test_event(&format!("entity-{i}"));
1561 storage.append_event(event).unwrap();
1562 }
1563 let duration = start.elapsed();
1564
1565 let events_per_sec = f64::from(event_count) / duration.as_secs_f64();
1566
1567 println!("\n=== Single-Event Write Baseline ===");
1568 println!("Events: {event_count}");
1569 println!("Duration: {duration:?}");
1570 println!("Events/sec: {events_per_sec:.0}");
1571
1572 }
1575
1576 fn touch_parquet(path: &Path) {
1586 std::fs::create_dir_all(path.parent().unwrap()).unwrap();
1587 std::fs::write(path, b"").unwrap();
1588 }
1589
1590 #[test]
1591 fn test_walker_finds_files_in_flat_layout() {
1592 let temp_dir = TempDir::new().unwrap();
1593 let root = temp_dir.path();
1594 touch_parquet(&root.join("events-20260101-120000000-aaaa.parquet"));
1595 touch_parquet(&root.join("events-20260101-130000000-bbbb.parquet"));
1596
1597 let mut found = find_parquet_files_recursive(root).unwrap();
1598 found.sort();
1599 assert_eq!(found.len(), 2);
1600 assert!(
1601 found[0]
1602 .file_name()
1603 .unwrap()
1604 .to_str()
1605 .unwrap()
1606 .starts_with("events-"),
1607 "expected events-* file, got {found:?}"
1608 );
1609 }
1610
1611 #[test]
1612 fn test_walker_finds_files_in_tenant_partitioned_tree() {
1613 let temp_dir = TempDir::new().unwrap();
1614 let root = temp_dir.path();
1615 touch_parquet(&root.join("tenant-a/2026-01/events-20260101-120000000-aaaa.parquet"));
1617 touch_parquet(&root.join("tenant-a/2026-02/events-20260201-120000000-bbbb.parquet"));
1618 touch_parquet(&root.join("tenant-b/2026-01/events-20260103-120000000-cccc.parquet"));
1619
1620 let found = find_parquet_files_recursive(root).unwrap();
1621 assert_eq!(found.len(), 3);
1622 assert!(found[0].to_str().unwrap().contains("tenant-a"));
1625 assert!(found[1].to_str().unwrap().contains("tenant-a"));
1626 assert!(found[2].to_str().unwrap().contains("tenant-b"));
1627 }
1628
1629 #[test]
1630 fn test_walker_handles_mixed_legacy_and_partitioned_layouts() {
1631 let temp_dir = TempDir::new().unwrap();
1636 let root = temp_dir.path();
1637 touch_parquet(&root.join("events-legacy-aaaa.parquet"));
1638 touch_parquet(&root.join("tenant-a/2026-01/events-new-bbbb.parquet"));
1639
1640 let found = find_parquet_files_recursive(root).unwrap();
1641 assert_eq!(found.len(), 2);
1642 }
1643
1644 #[test]
1645 fn test_walker_ignores_non_parquet_files() {
1646 let temp_dir = TempDir::new().unwrap();
1647 let root = temp_dir.path();
1648 std::fs::write(root.join("README.md"), b"hello").unwrap();
1649 std::fs::write(root.join("events.json"), b"[]").unwrap();
1650 touch_parquet(&root.join("events-20260101-120000000-aaaa.parquet"));
1651 std::fs::write(root.join("not-a-parquet-file.bin"), b"").unwrap();
1654
1655 let found = find_parquet_files_recursive(root).unwrap();
1656 assert_eq!(found.len(), 1);
1657 assert_eq!(
1658 found[0].extension().and_then(|s| s.to_str()),
1659 Some("parquet")
1660 );
1661 }
1662
    /// Build a minimal test `Event` for the given `tenant` / `entity_id`:
    /// fresh UUID, fixed "test.event" type, tiny JSON payload, current
    /// timestamp. NOTE(review): the trailing `None, 1` args presumably map to
    /// (correlation/causation id, version) — confirm against
    /// `Event::reconstruct_from_strings`.
    fn event_with_tenant(tenant: &str, entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            tenant.to_string(),
            json!({"k": "v"}),
            chrono::Utc::now(),
            None,
            1,
        )
    }
1678
1679 #[test]
1680 fn test_flush_writes_into_per_tenant_partition() {
1681 let temp_dir = TempDir::new().unwrap();
1685 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1686
1687 for i in 0..3 {
1688 storage
1689 .append_event(event_with_tenant("default", &format!("entity-{i}")))
1690 .unwrap();
1691 }
1692 storage.flush().unwrap();
1693
1694 let parquet_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
1695 assert_eq!(parquet_files.len(), 1);
1696
1697 let rel = parquet_files[0]
1698 .strip_prefix(temp_dir.path())
1699 .unwrap()
1700 .to_string_lossy()
1701 .into_owned();
1702 let parts: Vec<&str> = rel.split(std::path::MAIN_SEPARATOR).collect();
1704 assert_eq!(parts.len(), 3, "expected tenant/yyyy-mm/file, got {rel}");
1705 assert_eq!(parts[0], "default");
1706 assert!(
1709 parts[1].len() == 7 && parts[1].as_bytes()[4] == b'-',
1710 "expected yyyy-mm, got {}",
1711 parts[1]
1712 );
1713 assert!(parts[2].starts_with("events-") && parts[2].ends_with(".parquet"));
1714
1715 let loaded = storage.load_all_events().unwrap();
1716 assert_eq!(loaded.len(), 3);
1717 }
1718
1719 #[test]
1720 fn test_multiple_tenants_get_isolated_subtrees() {
1721 let temp_dir = TempDir::new().unwrap();
1724 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1725
1726 for i in 0..2 {
1727 storage
1728 .append_event(event_with_tenant("alice", &format!("a-{i}")))
1729 .unwrap();
1730 }
1731 for i in 0..3 {
1732 storage
1733 .append_event(event_with_tenant("bob", &format!("b-{i}")))
1734 .unwrap();
1735 }
1736 storage.flush().unwrap();
1737
1738 let alice_subtree = temp_dir.path().join("alice");
1739 let bob_subtree = temp_dir.path().join("bob");
1740 assert!(alice_subtree.is_dir(), "alice should have its own subtree");
1741 assert!(bob_subtree.is_dir(), "bob should have its own subtree");
1742
1743 let alice_files = find_parquet_files_recursive(&alice_subtree).unwrap();
1744 let bob_files = find_parquet_files_recursive(&bob_subtree).unwrap();
1745 assert_eq!(alice_files.len(), 1);
1746 assert_eq!(bob_files.len(), 1);
1747
1748 let loaded = storage.load_all_events().unwrap();
1751 let (alice_count, bob_count) =
1752 loaded
1753 .iter()
1754 .fold((0, 0), |(a, b), e| match e.tenant_id_str() {
1755 "alice" => (a + 1, b),
1756 "bob" => (a, b + 1),
1757 _ => (a, b),
1758 });
1759 assert_eq!(alice_count, 2);
1760 assert_eq!(bob_count, 3);
1761 }
1762
1763 #[test]
1764 fn test_size_flush_only_drains_full_tenant() {
1765 let temp_dir = TempDir::new().unwrap();
1769 let config = ParquetStorageConfig {
1770 batch_size: 5,
1771 ..Default::default()
1772 };
1773 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1774
1775 for i in 0..5 {
1778 storage
1779 .append_event(event_with_tenant("alice", &format!("a-{i}")))
1780 .unwrap();
1781 }
1782 for i in 0..2 {
1784 storage
1785 .append_event(event_with_tenant("bob", &format!("b-{i}")))
1786 .unwrap();
1787 }
1788
1789 assert_eq!(
1790 storage.pending_count(),
1791 2,
1792 "only bob's 2 events should be pending"
1793 );
1794
1795 let parquet_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
1796 assert_eq!(parquet_files.len(), 1, "only alice should have flushed");
1797 assert!(
1798 parquet_files[0]
1799 .to_string_lossy()
1800 .contains(&format!("alice{}", std::path::MAIN_SEPARATOR)),
1801 "expected alice partition, got {}",
1802 parquet_files[0].display()
1803 );
1804 }
1805
1806 #[test]
1807 fn test_tenant_id_from_path_recovers_tenant_for_partitioned_files() {
1808 let root = Path::new("/data/storage");
1809 let f = Path::new("/data/storage/alice/2026-04/events-20260426-120000000-aaaa.parquet");
1810 assert_eq!(tenant_id_from_path(root, f), "alice");
1811 }
1812
1813 #[test]
1814 fn test_tenant_id_from_path_falls_back_to_default_for_legacy_flat_layout() {
1815 let root = Path::new("/data/storage");
1816 let f = Path::new("/data/storage/events-20260426-120000000-aaaa.parquet");
1817 assert_eq!(tenant_id_from_path(root, f), "default");
1820 }
1821
1822 #[test]
1823 fn test_sanitize_tenant_id_for_path_accepts_safe_inputs() {
1824 for ok in [
1825 "default",
1826 "system",
1827 "1e6b2d1c-2f64-4441-9cf9-42f2e451aa17",
1828 "onboard-diagnostic-160-at-example-com",
1829 "tenant_with_underscore",
1830 "v1.0",
1831 ] {
1832 assert!(
1833 sanitize_tenant_id_for_path(ok).is_ok(),
1834 "{ok:?} should be accepted"
1835 );
1836 }
1837 }
1838
1839 #[test]
1840 fn test_sanitize_tenant_id_for_path_rejects_unsafe_inputs() {
1841 for bad in [
1842 "", "..", ".", "foo/bar", "foo\\bar", "foo bar", "foo\nbar", "foo\0bar", "tenant?", "tenant*", ] {
1853 assert!(
1854 sanitize_tenant_id_for_path(bad).is_err(),
1855 "{bad:?} should be rejected"
1856 );
1857 }
1858
1859 let too_long = "a".repeat(129);
1861 assert!(sanitize_tenant_id_for_path(&too_long).is_err());
1862 }
1863
1864 #[test]
1865 fn test_partition_path_for_tenant_shape() {
1866 let root = Path::new("/data");
1867 let when = chrono::DateTime::parse_from_rfc3339("2026-04-26T12:00:00Z")
1868 .unwrap()
1869 .with_timezone(&chrono::Utc);
1870 let path = partition_path_for_tenant(root, "alice", when).unwrap();
1871 assert_eq!(path, Path::new("/data/alice/2026-04"));
1872 }
1873
1874 #[test]
1875 fn test_append_event_rejects_unsafe_tenant_at_flush() {
1876 let temp_dir = TempDir::new().unwrap();
1882 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1883
1884 storage
1888 .append_event(event_with_tenant("../escape", "e-0"))
1889 .unwrap();
1890 let result = storage.flush();
1891 assert!(result.is_err(), "flush should reject unsafe tenant_id");
1892 let msg = format!("{}", result.unwrap_err());
1893 assert!(
1894 msg.contains("disallowed character") || msg.contains("reserved"),
1895 "expected sanitization error message, got: {msg}"
1896 );
1897 }
1898
1899 #[test]
1904 fn test_load_events_for_tenant_only_walks_target_subtree() {
1905 let temp_dir = TempDir::new().unwrap();
1910 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1911
1912 for i in 0..2 {
1913 storage
1914 .append_event(event_with_tenant("alice", &format!("a-{i}")))
1915 .unwrap();
1916 }
1917 for i in 0..3 {
1918 storage
1919 .append_event(event_with_tenant("bob", &format!("b-{i}")))
1920 .unwrap();
1921 }
1922 for i in 0..1 {
1923 storage
1924 .append_event(event_with_tenant("carol", &format!("c-{i}")))
1925 .unwrap();
1926 }
1927 storage.flush().unwrap();
1928
1929 let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
1930 assert_eq!(alice_files.len(), 1);
1931 assert!(
1932 alice_files[0]
1933 .to_string_lossy()
1934 .contains(&format!("alice{}", std::path::MAIN_SEPARATOR)),
1935 "expected alice file, got {}",
1936 alice_files[0].display()
1937 );
1938 for f in &alice_files {
1942 let s = f.to_string_lossy();
1943 assert!(!s.contains("bob"), "alice listing leaked bob file: {s}");
1944 assert!(!s.contains("carol"), "alice listing leaked carol file: {s}");
1945 }
1946
1947 let alice_events = storage.load_events_for_tenant("alice").unwrap();
1948 assert_eq!(alice_events.len(), 2);
1949 for e in &alice_events {
1950 assert_eq!(e.tenant_id_str(), "alice");
1951 }
1952
1953 let bob_events = storage.load_events_for_tenant("bob").unwrap();
1954 assert_eq!(bob_events.len(), 3);
1955 for e in &bob_events {
1956 assert_eq!(e.tenant_id_str(), "bob");
1957 }
1958
1959 let carol_events = storage.load_events_for_tenant("carol").unwrap();
1960 assert_eq!(carol_events.len(), 1);
1961 assert_eq!(carol_events[0].tenant_id_str(), "carol");
1962 }
1963
1964 #[test]
1965 fn test_load_events_for_tenant_returns_empty_when_subtree_missing() {
1966 let temp_dir = TempDir::new().unwrap();
1970 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1971
1972 storage
1975 .append_event(event_with_tenant("alice", "a-0"))
1976 .unwrap();
1977 storage.flush().unwrap();
1978
1979 let files = storage
1980 .list_parquet_files_for_tenant("nobody-here")
1981 .unwrap();
1982 assert!(files.is_empty());
1983
1984 let events = storage.load_events_for_tenant("nobody-here").unwrap();
1985 assert!(events.is_empty());
1986 }
1987
1988 #[test]
1989 fn test_load_events_for_tenant_rejects_unsafe_tenant_id() {
1990 let temp_dir = TempDir::new().unwrap();
1993 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1994
1995 for unsafe_tid in ["..", "a/b", "a\\b", "", "a..b/.."] {
1996 let result = storage.load_events_for_tenant(unsafe_tid);
1997 assert!(
1998 result.is_err(),
1999 "tenant_id {unsafe_tid:?} should have been rejected"
2000 );
2001 }
2002 }
2003
2004 #[test]
2005 fn test_load_events_for_tenant_ignores_legacy_flat_layout_files() {
2006 let temp_dir = TempDir::new().unwrap();
2014 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2015
2016 let _flat = seed_flat_layout_file(&storage, 4);
2018
2019 let default_events = storage.load_events_for_tenant("default").unwrap();
2022 assert!(
2023 default_events.is_empty(),
2024 "tenant-scoped load must not pick up flat-layout files; got {} events",
2025 default_events.len()
2026 );
2027
2028 let all_events = storage.load_all_events().unwrap();
2030 assert_eq!(all_events.len(), 4);
2031 }
2032
2033 #[test]
2038 fn test_write_atomic_parquet_emits_file_under_tenant_partition() {
2039 let temp_dir = TempDir::new().unwrap();
2044 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2045
2046 let events: Vec<Event> = (0..3)
2047 .map(|i| event_with_tenant("alice", &format!("a-{i}")))
2048 .collect();
2049
2050 let final_path = storage
2051 .write_atomic_parquet("alice", "snapshot.alice.range", &events)
2052 .unwrap();
2053
2054 let rel = final_path
2056 .strip_prefix(temp_dir.path())
2057 .unwrap()
2058 .to_string_lossy()
2059 .into_owned();
2060 let parts: Vec<&str> = rel.split(std::path::MAIN_SEPARATOR).collect();
2061 assert_eq!(parts.len(), 3, "expected tenant/yyyy-mm/file, got {rel}");
2062 assert_eq!(parts[0], "alice");
2063 assert_eq!(parts[2], "snapshot.alice.range.parquet");
2064
2065 assert!(final_path.is_file());
2067 let tmp = final_path.with_extension("parquet.tmp");
2068 assert!(
2069 !tmp.exists(),
2070 "tmp should have been renamed away; still at {}",
2071 tmp.display()
2072 );
2073
2074 let loaded = storage.load_events_for_tenant("alice").unwrap();
2076 assert_eq!(loaded.len(), 3);
2077 }
2078
2079 #[test]
2080 fn test_write_atomic_parquet_rejects_empty_events() {
2081 let temp_dir = TempDir::new().unwrap();
2082 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2083 let result = storage.write_atomic_parquet("alice", "snap", &[]);
2084 assert!(result.is_err());
2085 }
2086
2087 #[test]
2088 fn test_write_atomic_parquet_rejects_unsafe_tenant() {
2089 let temp_dir = TempDir::new().unwrap();
2090 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2091 let events = [event_with_tenant("alice", "e-0")];
2092 for unsafe_tid in ["..", "a/b", ""] {
2093 let result = storage.write_atomic_parquet(unsafe_tid, "snap", &events);
2094 assert!(
2095 result.is_err(),
2096 "unsafe tenant_id {unsafe_tid:?} should have been rejected"
2097 );
2098 }
2099 }
2100
2101 #[test]
2102 fn test_cleanup_partial_writes_removes_orphan_tmps() {
2103 let temp_dir = TempDir::new().unwrap();
2107 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2108
2109 for i in 0..2 {
2112 storage
2113 .append_event(event_with_tenant("alice", &format!("a-{i}")))
2114 .unwrap();
2115 }
2116 storage.flush().unwrap();
2117 let real_files_before = find_parquet_files_recursive(temp_dir.path()).unwrap();
2118 assert_eq!(real_files_before.len(), 1);
2119
2120 let alice_subtree = temp_dir.path().join("alice");
2122 let orphan_dir = real_files_before[0].parent().unwrap();
2123 let orphan_path = orphan_dir.join("snapshot.alice.crashed.parquet.tmp");
2124 std::fs::write(&orphan_path, b"fake partial parquet").unwrap();
2125 assert!(orphan_path.is_file());
2126
2127 let nested_dir = alice_subtree.join("2099-01");
2129 std::fs::create_dir_all(&nested_dir).unwrap();
2130 let nested_orphan = nested_dir.join("events-x.parquet.tmp");
2131 std::fs::write(&nested_orphan, b"junk").unwrap();
2132
2133 let removed = storage.cleanup_partial_writes().unwrap();
2134 assert_eq!(removed, 2, "two orphan tmps should have been cleaned");
2135 assert!(!orphan_path.exists());
2136 assert!(!nested_orphan.exists());
2137
2138 let real_files_after = find_parquet_files_recursive(temp_dir.path()).unwrap();
2140 assert_eq!(real_files_after, real_files_before);
2141 }
2142
2143 #[test]
2144 fn test_new_calls_cleanup_partial_writes_on_boot() {
2145 let temp_dir = TempDir::new().unwrap();
2149 let stale = temp_dir.path().join("orphan.parquet.tmp");
2150 std::fs::write(&stale, b"crash detritus").unwrap();
2151 assert!(stale.is_file());
2152
2153 let _storage = ParquetStorage::new(temp_dir.path()).unwrap();
2154 assert!(
2155 !stale.exists(),
2156 "stale tmp should have been cleaned by ParquetStorage::new"
2157 );
2158 }
2159
    /// Write `count` events through `storage`, flush them into the normal
    /// partitioned tree, then move the resulting file up to the storage root
    /// to simulate a legacy flat-layout file. Returns the flat file's path.
    fn seed_flat_layout_file(storage: &ParquetStorage, count: usize) -> PathBuf {
        for i in 0..count {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();

        // NOTE(review): assumes create_test_event uses the "default" tenant so
        // the flush lands under default/ — confirm against that helper.
        let default_subtree = storage.storage_dir().join("default");
        let candidates = find_parquet_files_recursive(&default_subtree).unwrap();
        assert!(
            !candidates.is_empty(),
            "seed expected at least one file under default/"
        );
        // max() picks the lexicographically greatest path — with timestamped
        // file names that is the most recently flushed file.
        let src = candidates.into_iter().max().unwrap();

        // Relocate it to the root to mimic the pre-partitioning layout.
        let dst = storage.storage_dir().join(src.file_name().unwrap());
        std::fs::rename(&src, &dst).unwrap();
        // Best-effort removal of the now-empty month/tenant directories;
        // remove_dir fails harmlessly if a directory is not empty.
        if let Some(month_dir) = src.parent() {
            let _ = std::fs::remove_dir(month_dir);
            if let Some(tenant_dir) = month_dir.parent() {
                let _ = std::fs::remove_dir(tenant_dir);
            }
        }
        dst
    }
2202
2203 #[test]
2204 fn test_migrate_flat_layout_dry_run_touches_nothing() {
2205 let temp_dir = TempDir::new().unwrap();
2206 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2207 let flat = seed_flat_layout_file(&storage, 7);
2208 assert!(flat.is_file(), "test setup: flat file should exist");
2209
2210 let report = storage.migrate_flat_layout(true).unwrap();
2211 assert!(report.dry_run);
2212 assert_eq!(report.flat_files_seen, 1);
2213 assert_eq!(report.events_migrated, 7);
2214 assert_eq!(report.flat_files_removed, 0);
2215 assert_eq!(report.partitions_written, 0);
2216 assert!(
2217 flat.is_file(),
2218 "flat file must still be present after dry run"
2219 );
2220 }
2221
2222 #[test]
2223 fn test_migrate_flat_layout_moves_events_into_default_tree_and_removes_flat() {
2224 let temp_dir = TempDir::new().unwrap();
2225 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2226 let flat = seed_flat_layout_file(&storage, 5);
2227
2228 let report = storage.migrate_flat_layout(false).unwrap();
2229 assert!(!report.dry_run);
2230 assert_eq!(report.flat_files_seen, 1);
2231 assert_eq!(report.flat_files_removed, 1);
2232 assert_eq!(report.events_migrated, 5);
2233 assert!(report.partitions_written >= 1);
2234 assert!(
2235 !flat.exists(),
2236 "flat file should be deleted after migration"
2237 );
2238
2239 let post = find_parquet_files_recursive(temp_dir.path()).unwrap();
2240 assert!(
2241 post.iter().all(|p| {
2242 let rel = p
2243 .strip_prefix(temp_dir.path())
2244 .unwrap()
2245 .to_string_lossy()
2246 .into_owned();
2247 rel.starts_with(&format!("default{}", std::path::MAIN_SEPARATOR))
2248 }),
2249 "all migrated files should be under default/"
2250 );
2251
2252 let loaded = storage.load_all_events().unwrap();
2253 assert_eq!(loaded.len(), 5);
2254 }
2255
2256 #[test]
2257 fn test_migrate_flat_layout_is_idempotent_when_re_run_after_completion() {
2258 let temp_dir = TempDir::new().unwrap();
2259 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2260 let _flat = seed_flat_layout_file(&storage, 4);
2261
2262 let first = storage.migrate_flat_layout(false).unwrap();
2263 assert_eq!(first.events_migrated, 4);
2264
2265 let second = storage.migrate_flat_layout(false).unwrap();
2268 assert_eq!(second.flat_files_seen, 0);
2269 assert_eq!(second.events_migrated, 0);
2270 assert_eq!(second.flat_files_removed, 0);
2271
2272 let loaded = storage.load_all_events().unwrap();
2273 assert_eq!(loaded.len(), 4, "rerun must not duplicate or lose events");
2274 }
2275
2276 #[test]
2277 fn test_migrate_flat_layout_ignores_already_partitioned_data() {
2278 let temp_dir = TempDir::new().unwrap();
2281 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2282
2283 for i in 0..3 {
2284 storage
2285 .append_event(event_with_tenant("alice", &format!("a-{i}")))
2286 .unwrap();
2287 }
2288 storage.flush().unwrap();
2289
2290 let _flat = seed_flat_layout_file(&storage, 2);
2291
2292 let report = storage.migrate_flat_layout(false).unwrap();
2293 assert_eq!(report.flat_files_seen, 1, "only the flat file is in scope");
2294 assert_eq!(report.events_migrated, 2);
2295
2296 let alice_files = find_parquet_files_recursive(&temp_dir.path().join("alice")).unwrap();
2297 assert_eq!(alice_files.len(), 1, "alice's tree must be untouched");
2298
2299 let loaded = storage.load_all_events().unwrap();
2300 assert_eq!(loaded.len(), 5);
2301 let alice_count = loaded
2302 .iter()
2303 .filter(|e| e.tenant_id_str() == "alice")
2304 .count();
2305 let default_count = loaded
2306 .iter()
2307 .filter(|e| e.tenant_id_str() == "default")
2308 .count();
2309 assert_eq!(alice_count, 3);
2310 assert_eq!(default_count, 2);
2311 }
2312
2313 #[test]
2314 fn test_migrate_flat_layout_with_no_flat_files_is_a_clean_noop() {
2315 let temp_dir = TempDir::new().unwrap();
2316 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2317 let report = storage.migrate_flat_layout(false).unwrap();
2318 assert_eq!(report.flat_files_seen, 0);
2319 assert_eq!(report.events_migrated, 0);
2320 }
2321}