Skip to main content

allsource_core/infrastructure/persistence/
storage.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4};
5use arrow::{
6    array::{
7        Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
8        UInt64Builder,
9    },
10    datatypes::{DataType, Field, Schema, TimeUnit},
11    record_batch::RecordBatch,
12};
13use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
14use std::{
15    collections::HashMap,
16    fs::{self, File},
17    path::{Path, PathBuf},
18    sync::{
19        Arc, Mutex,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::{Duration, Instant},
23};
24
/// Default batch size for Parquet writes (10,000 events as per US-023)
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default flush timeout in milliseconds — how long a partial batch may sit
/// buffered before `check_timeout_flush` forces it to disk.
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;
30
/// Configuration for ParquetStorage batch processing.
///
/// `Default` yields 10,000-event batches, a 5 s flush timeout, and SNAPPY
/// compression; see `high_throughput()` / `low_latency()` for presets.
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Batch size before automatic flush (default: 10,000)
    pub batch_size: usize,
    /// Timeout before flushing partial batch (default: 5 seconds)
    pub flush_timeout: Duration,
    /// Compression codec for Parquet files
    pub compression: parquet::basic::Compression,
}
41
42impl Default for ParquetStorageConfig {
43    fn default() -> Self {
44        Self {
45            batch_size: DEFAULT_BATCH_SIZE,
46            flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
47            compression: parquet::basic::Compression::SNAPPY,
48        }
49    }
50}
51
52impl ParquetStorageConfig {
53    /// High-throughput configuration optimized for large batch writes
54    pub fn high_throughput() -> Self {
55        Self {
56            batch_size: 50_000,
57            flush_timeout: Duration::from_secs(10),
58            compression: parquet::basic::Compression::SNAPPY,
59        }
60    }
61
62    /// Low-latency configuration for smaller, more frequent writes
63    pub fn low_latency() -> Self {
64        Self {
65            batch_size: 1_000,
66            flush_timeout: Duration::from_secs(1),
67            compression: parquet::basic::Compression::SNAPPY,
68        }
69    }
70}
71
/// Statistics for batch write operations.
///
/// A point-in-time snapshot assembled by `ParquetStorage::batch_stats` from
/// the storage's atomic counters; `avg_batch_size` and `events_per_sec` are
/// derived at snapshot time.
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Total batches written
    pub batches_written: u64,
    /// Total events written
    pub events_written: u64,
    /// Total bytes written
    pub bytes_written: u64,
    /// Average batch size (events_written / batches_written; 0.0 when idle)
    pub avg_batch_size: f64,
    /// Events per second (throughput; 0.0 when no write time recorded)
    pub events_per_sec: f64,
    /// Total write time in nanoseconds
    pub total_write_time_ns: u64,
    /// Number of timeout-triggered flushes
    pub timeout_flushes: u64,
    /// Number of size-triggered flushes
    pub size_flushes: u64,
}
92
/// Result of a batch write operation (returned by `ParquetStorage::batch_write`).
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Number of events written (i.e. accepted into tenant buffers)
    pub events_written: usize,
    /// Number of batches flushed to disk during this call (tenants that
    /// crossed the batch-size threshold)
    pub batches_flushed: usize,
    /// Total duration of the write operation
    pub duration: Duration,
    /// Write throughput in events per second
    pub events_per_sec: f64,
}
105
/// Parquet-based persistent storage for events with batch processing
///
/// Features:
/// - Configurable batch size (default: 10,000 events per US-023)
/// - Timeout-based flushing for partial batches
/// - Thread-safe batch accumulation
/// - SNAPPY compression for efficient storage
/// - Automatic flush on shutdown via Drop
pub struct ParquetStorage {
    /// Base directory for storing parquet files
    storage_dir: PathBuf,

    /// Buffered events keyed by tenant_id. Each tenant accumulates its own
    /// batch and flushes independently into its partition under
    /// `storage_dir/<tenant_id>/<yyyy-mm>/`. Single outer mutex protects the
    /// whole map: lookup is O(1), tenant cardinality is low (single digits
    /// today; bounded by Step 3's cache budget later), so contention is
    /// fine. We keep the mutex held only for the push, not for disk I/O —
    /// flush takes ownership of a tenant's batch via remove() and writes
    /// after the lock is released.
    current_batches: Mutex<HashMap<String, Vec<Event>>>,

    /// Configuration (batch size, flush timeout, compression codec)
    config: ParquetStorageConfig,

    /// Schema for Arrow/Parquet; shared with every writer/reader this
    /// storage creates, so column order is fixed for the file's lifetime.
    schema: Arc<Schema>,

    /// Last flush timestamp for timeout tracking (updated after every
    /// tenant flush; read by `check_timeout_flush`)
    last_flush_time: Mutex<Instant>,

    /// Statistics tracking — monotonic counters updated with Relaxed
    /// ordering; each is read independently by `batch_stats`, so the
    /// snapshot is not guaranteed to be mutually consistent.
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}
145
146impl ParquetStorage {
147    /// Create a new ParquetStorage with default configuration (10,000 event batches)
148    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
149        Self::with_config(storage_dir, ParquetStorageConfig::default())
150    }
151
152    /// Create a new ParquetStorage with custom configuration
153    pub fn with_config(
154        storage_dir: impl AsRef<Path>,
155        config: ParquetStorageConfig,
156    ) -> Result<Self> {
157        let storage_dir = storage_dir.as_ref().to_path_buf();
158
159        // Create storage directory if it doesn't exist
160        fs::create_dir_all(&storage_dir).map_err(|e| {
161            AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
162        })?;
163
164        // Define Arrow schema for events
165        let schema = Arc::new(Schema::new(vec![
166            Field::new("event_id", DataType::Utf8, false),
167            Field::new("event_type", DataType::Utf8, false),
168            Field::new("entity_id", DataType::Utf8, false),
169            Field::new("payload", DataType::Utf8, false),
170            Field::new(
171                "timestamp",
172                DataType::Timestamp(TimeUnit::Microsecond, None),
173                false,
174            ),
175            Field::new("metadata", DataType::Utf8, true),
176            Field::new("version", DataType::UInt64, false),
177        ]));
178
179        let storage = Self {
180            storage_dir,
181            current_batches: Mutex::new(HashMap::new()),
182            config,
183            schema,
184            last_flush_time: Mutex::new(Instant::now()),
185            batches_written: AtomicU64::new(0),
186            events_written: AtomicU64::new(0),
187            bytes_written: AtomicU64::new(0),
188            total_write_time_ns: AtomicU64::new(0),
189            timeout_flushes: AtomicU64::new(0),
190            size_flushes: AtomicU64::new(0),
191        };
192
193        // Boot-time crash recovery (Step 4 of the sustainable data
194        // strategy): clean up any leftover *.parquet.tmp files from
195        // a snapshot write that crashed mid-rename. Safe to delete
196        // unconditionally — the constituent raw files are still on
197        // disk (the snapshot pipeline deletes them only AFTER the
198        // rename succeeds).
199        match storage.cleanup_partial_writes() {
200            Ok(0) => {}
201            Ok(n) => tracing::warn!(
202                "cleaned up {n} orphan snapshot tmp file(s) on boot — \
203                 a previous run crashed mid-snapshot"
204            ),
205            Err(e) => tracing::error!("cleanup_partial_writes failed on boot: {e}"),
206        }
207
208        Ok(storage)
209    }
210
211    /// Create storage with legacy batch size (1000) for backward compatibility
212    #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
213    pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
214        Self::with_config(
215            storage_dir,
216            ParquetStorageConfig {
217                batch_size: 1000,
218                ..Default::default()
219            },
220        )
221    }
222
223    /// Add an event to the current batch
224    ///
225    /// Events are routed to a per-tenant batch keyed by `event.tenant_id_str()`.
226    /// A tenant's batch is buffered until any of:
227    /// - That tenant's batch hits the configured `batch_size` (default 10,000)
228    ///   — flushes only that tenant, not the whole world
229    /// - The flush timeout elapses — flushes every tenant with pending data
230    /// - `flush()` is called explicitly
231    /// - The process shuts down
232    #[cfg_attr(feature = "hotpath", hotpath::measure)]
233    pub fn append_event(&self, event: Event) -> Result<()> {
234        let tenant = event.tenant_id_str().to_string();
235        let should_flush_tenant = {
236            let mut batches = self.current_batches.lock().unwrap();
237            let entry = batches.entry(tenant.clone()).or_default();
238            entry.push(event);
239            entry.len() >= self.config.batch_size
240        };
241
242        if should_flush_tenant {
243            self.size_flushes.fetch_add(1, Ordering::Relaxed);
244            self.flush_tenant(&tenant)?;
245        }
246
247        Ok(())
248    }
249
250    /// Add multiple events to the batch (optimized batch insertion)
251    ///
252    /// Preferred entry point for high-throughput ingestion. Events are
253    /// grouped by tenant under a single mutex acquisition and any tenant
254    /// that crosses `batch_size` is flushed on the spot.
255    #[cfg_attr(feature = "hotpath", hotpath::measure)]
256    pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
257        let start = Instant::now();
258        let event_count = events.len();
259
260        // Pre-group by tenant to keep the lock window short — one acquire,
261        // one extend per tenant, decide which tenants are over threshold.
262        let mut grouped: HashMap<String, Vec<Event>> = HashMap::new();
263        for event in events {
264            grouped
265                .entry(event.tenant_id_str().to_string())
266                .or_default()
267                .push(event);
268        }
269
270        let mut tenants_to_flush: Vec<String> = Vec::new();
271        {
272            let mut batches = self.current_batches.lock().unwrap();
273            for (tenant, mut new_events) in grouped {
274                let entry = batches.entry(tenant.clone()).or_default();
275                entry.append(&mut new_events);
276                if entry.len() >= self.config.batch_size {
277                    tenants_to_flush.push(tenant);
278                }
279            }
280        }
281
282        let mut batches_flushed = 0;
283        for tenant in tenants_to_flush {
284            self.size_flushes.fetch_add(1, Ordering::Relaxed);
285            self.flush_tenant(&tenant)?;
286            batches_flushed += 1;
287        }
288
289        let duration = start.elapsed();
290
291        Ok(BatchWriteResult {
292            events_written: event_count,
293            batches_flushed,
294            duration,
295            events_per_sec: event_count as f64 / duration.as_secs_f64(),
296        })
297    }
298
299    /// Check if a timeout-based flush is needed and perform it
300    ///
301    /// Call this periodically (e.g., from a background task) to ensure
302    /// partial batches are flushed within the configured timeout. When
303    /// triggered, every tenant with pending events flushes — the timer is
304    /// global, not per-tenant, so a slow-trickle tenant doesn't get
305    /// stranded waiting for its own batch to fill.
306    #[cfg_attr(feature = "hotpath", hotpath::measure)]
307    pub fn check_timeout_flush(&self) -> Result<bool> {
308        let should_flush = {
309            let last_flush = self.last_flush_time.lock().unwrap();
310            let batches = self.current_batches.lock().unwrap();
311            let any_pending = batches.values().any(|v| !v.is_empty());
312            any_pending && last_flush.elapsed() >= self.config.flush_timeout
313        };
314
315        if should_flush {
316            self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
317            self.flush()?;
318            Ok(true)
319        } else {
320            Ok(false)
321        }
322    }
323
324    /// Flush every tenant's pending batch to its partition.
325    ///
326    /// Thread-safe: callable from any thread. A snapshot of which tenants
327    /// have pending data is taken under a short lock; each tenant is then
328    /// flushed individually with its own lock cycle, so disk I/O for one
329    /// tenant doesn't block writes against another.
330    #[cfg_attr(feature = "hotpath", hotpath::measure)]
331    pub fn flush(&self) -> Result<()> {
332        let tenants: Vec<String> = {
333            let batches = self.current_batches.lock().unwrap();
334            batches
335                .iter()
336                .filter(|(_, v)| !v.is_empty())
337                .map(|(k, _)| k.clone())
338                .collect()
339        };
340        if tenants.is_empty() {
341            return Ok(());
342        }
343        for tenant in tenants {
344            self.flush_tenant(&tenant)?;
345        }
346        Ok(())
347    }
348
349    /// Flush a single tenant's pending events into its partition file.
350    ///
351    /// File path: `storage_dir/<sanitized_tenant_id>/<yyyy-mm>/events-<ts>-<uuid>.parquet`.
352    /// `<yyyy-mm>` is taken from the wall-clock at flush time (matching the
353    /// pre-tenant filename's timestamp semantics) rather than from
354    /// individual event timestamps — keeps each flush to a single output
355    /// file even when buffered events span months.
356    fn flush_tenant(&self, tenant_id: &str) -> Result<()> {
357        let events_to_write = {
358            let mut batches = self.current_batches.lock().unwrap();
359            match batches.get_mut(tenant_id) {
360                Some(v) if !v.is_empty() => std::mem::take(v),
361                _ => return Ok(()),
362            }
363        };
364
365        let batch_count = events_to_write.len();
366        let start = Instant::now();
367
368        let record_batch = self.events_to_record_batch(&events_to_write)?;
369
370        let now = chrono::Utc::now();
371        let partition_dir = partition_path_for_tenant(&self.storage_dir, tenant_id, now)?;
372        fs::create_dir_all(&partition_dir).map_err(|e| {
373            AllSourceError::StorageError(format!(
374                "Failed to create tenant partition {}: {e}",
375                partition_dir.display()
376            ))
377        })?;
378        let filename = format!(
379            "events-{}-{}.parquet",
380            now.format("%Y%m%d-%H%M%S%3f"),
381            uuid::Uuid::new_v4().as_simple()
382        );
383        let file_path = partition_dir.join(&filename);
384
385        tracing::info!(
386            "Flushing {} events for tenant={} to {}",
387            batch_count,
388            tenant_id,
389            file_path.display()
390        );
391
392        let file = File::create(&file_path).map_err(|e| {
393            AllSourceError::StorageError(format!("Failed to create parquet file: {e}"))
394        })?;
395
396        let props = WriterProperties::builder()
397            .set_compression(self.config.compression)
398            .build();
399
400        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
401        writer.write(&record_batch)?;
402        let file_metadata = writer.close()?;
403
404        let duration = start.elapsed();
405
406        self.batches_written.fetch_add(1, Ordering::Relaxed);
407        self.events_written
408            .fetch_add(batch_count as u64, Ordering::Relaxed);
409        if let Some(size) = file_metadata
410            .row_groups()
411            .first()
412            .map(parquet::file::metadata::RowGroupMetaData::total_byte_size)
413        {
414            self.bytes_written.fetch_add(size as u64, Ordering::Relaxed);
415        }
416        self.total_write_time_ns
417            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
418
419        {
420            let mut last_flush = self.last_flush_time.lock().unwrap();
421            *last_flush = Instant::now();
422        }
423
424        tracing::info!(
425            "Wrote {} events for tenant={} to {} in {:?}",
426            batch_count,
427            tenant_id,
428            file_path.display(),
429            duration
430        );
431
432        Ok(())
433    }
434
    /// Atomically write `events` to a Parquet file under the tenant's
    /// partition. Step 4 of the sustainable data strategy uses this to
    /// emit per-tenant snapshot/compaction files (`snapshot.<tenant>.<from>-<to>`)
    /// without risking partial files on crash.
    ///
    /// Crash-safety contract:
    /// 1. Write to `<final_path>.tmp` first.
    /// 2. fsync the .tmp file so the data is durably on disk.
    /// 3. Rename `.tmp` → final name (atomic POSIX rename).
    /// 4. fsync the parent directory so the rename is durable.
    ///
    /// On any failure mid-way, the .tmp file gets cleaned up by
    /// `cleanup_partial_writes` on next boot. The final file appears
    /// atomically — readers either see the old state (no file) or
    /// the complete new file, never a half-written one.
    ///
    /// `file_stem` is the filename without extension or partition path
    /// — caller-controlled so the snapshot naming convention
    /// (`snapshot.<tenant>.<from>-<to>`) lives in the compaction layer,
    /// not here. The `.parquet` extension is appended automatically.
    ///
    /// Returns the final (post-rename) path so the caller can
    /// confirm the file landed where expected.
    pub fn write_atomic_parquet(
        &self,
        tenant_id: &str,
        file_stem: &str,
        events: &[Event],
    ) -> Result<PathBuf> {
        if events.is_empty() {
            return Err(AllSourceError::StorageError(
                "write_atomic_parquet called with empty event slice".to_string(),
            ));
        }
        // Anchor partition by the earliest event's month — keeps the
        // file in the same yyyy-mm bucket as the data it represents.
        // Callers that span multiple months can use the earliest
        // event's month and trust the recursive walker to find it.
        // (unwrap_or_else is unreachable here — events is non-empty —
        // but keeps the expression total.)
        let anchor_ts = events
            .iter()
            .map(|e| e.timestamp)
            .min()
            .unwrap_or_else(chrono::Utc::now);
        let partition_dir = partition_path_for_tenant(&self.storage_dir, tenant_id, anchor_ts)?;
        fs::create_dir_all(&partition_dir).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to create tenant partition {}: {e}",
                partition_dir.display()
            ))
        })?;

        let final_path = partition_dir.join(format!("{file_stem}.parquet"));
        let tmp_path = partition_dir.join(format!("{file_stem}.parquet.tmp"));

        // 1. Build the Arrow batch and write to .tmp.
        let record_batch = self.events_to_record_batch(events)?;
        {
            let file = File::create(&tmp_path).map_err(|e| {
                AllSourceError::StorageError(format!(
                    "Failed to create snapshot tmp file {}: {e}",
                    tmp_path.display()
                ))
            })?;

            let props = WriterProperties::builder()
                .set_compression(self.config.compression)
                .build();

            let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
            writer.write(&record_batch)?;
            // close() consumes the writer and flushes + closes the
            // underlying File. The handle is gone after this, so step 2
            // below reopens the path to fsync it.
            let _meta = writer.close()?;
        }

        // 2. Reopen the .tmp file and fsync so its contents are durable.
        let tmp_file = File::open(&tmp_path).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to reopen snapshot tmp for fsync {}: {e}",
                tmp_path.display()
            ))
        })?;
        tmp_file.sync_all().map_err(|e| {
            AllSourceError::StorageError(format!("fsync on snapshot tmp failed: {e}"))
        })?;
        drop(tmp_file);

        // 3. Atomic rename.
        fs::rename(&tmp_path, &final_path).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to rename {} → {}: {e}",
                tmp_path.display(),
                final_path.display()
            ))
        })?;

        // 4. fsync the parent directory so the rename survives crash.
        // Linux fsync-on-dir is the canonical way to make a rename
        // durable; macOS no-ops the dir fsync but doesn't error. A dir
        // fsync failure is deliberately ignored: the data file itself is
        // already durable, only the rename's durability is best-effort.
        if let Ok(dir) = File::open(&partition_dir) {
            let _ = dir.sync_all();
        }

        tracing::info!(
            tenant_id = tenant_id,
            file = %final_path.display(),
            event_count = events.len(),
            "wrote atomic snapshot file"
        );

        Ok(final_path)
    }
548
549    /// Sweep the storage tree for any leftover `*.parquet.tmp` files
550    /// — crash detritus from a snapshot write that didn't complete
551    /// the rename. Called on boot from `ParquetStorage::new` so
552    /// every cold start starts clean.
553    ///
554    /// Safe to delete unconditionally: `write_atomic_parquet`'s
555    /// rename-after-fsync contract means the only way a `.tmp` file
556    /// survives is if Core died between fsync and rename. The events
557    /// in that file are still in the constituent raw files (the
558    /// caller deletes those AFTER the rename succeeds), so the data
559    /// isn't lost — we just need to retry the snapshot.
560    ///
561    /// Returns the count of files deleted, for observability.
562    pub fn cleanup_partial_writes(&self) -> Result<usize> {
563        let mut deleted = 0usize;
564        let mut stack: Vec<PathBuf> = vec![self.storage_dir.clone()];
565        while let Some(dir) = stack.pop() {
566            let Ok(entries) = fs::read_dir(&dir) else {
567                continue;
568            };
569            for entry in entries.flatten() {
570                let path = entry.path();
571                let Ok(ft) = entry.file_type() else { continue };
572                if ft.is_dir() {
573                    stack.push(path);
574                } else if ft.is_file() && path.to_string_lossy().ends_with(".parquet.tmp") {
575                    match fs::remove_file(&path) {
576                        Ok(()) => {
577                            tracing::warn!(
578                                file = %path.display(),
579                                "cleaned up orphan snapshot tmp file (crash recovery)"
580                            );
581                            deleted += 1;
582                        }
583                        Err(e) => {
584                            tracing::error!(
585                                file = %path.display(),
586                                "failed to remove orphan snapshot tmp file: {e}"
587                            );
588                        }
589                    }
590                }
591            }
592        }
593        Ok(deleted)
594    }
595
596    /// Force flush any remaining events (for shutdown handling).
597    ///
598    /// Sums pending counts across every tenant's batch so the caller can
599    /// log "we flushed N events on shutdown" without caring about
600    /// per-tenant breakdown.
601    #[cfg_attr(feature = "hotpath", hotpath::measure)]
602    pub fn flush_on_shutdown(&self) -> Result<usize> {
603        let total_pending: usize = {
604            let batches = self.current_batches.lock().unwrap();
605            batches.values().map(Vec::len).sum()
606        };
607
608        if total_pending > 0 {
609            tracing::info!(
610                "Shutdown: flushing {} pending events across all tenants",
611                total_pending
612            );
613            self.flush()?;
614        }
615
616        Ok(total_pending)
617    }
618
619    /// Get batch write statistics
620    pub fn batch_stats(&self) -> BatchWriteStats {
621        let batches = self.batches_written.load(Ordering::Relaxed);
622        let events = self.events_written.load(Ordering::Relaxed);
623        let bytes = self.bytes_written.load(Ordering::Relaxed);
624        let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);
625
626        let time_secs = time_ns as f64 / 1_000_000_000.0;
627
628        BatchWriteStats {
629            batches_written: batches,
630            events_written: events,
631            bytes_written: bytes,
632            avg_batch_size: if batches > 0 {
633                events as f64 / batches as f64
634            } else {
635                0.0
636            },
637            events_per_sec: if time_secs > 0.0 {
638                events as f64 / time_secs
639            } else {
640                0.0
641            },
642            total_write_time_ns: time_ns,
643            timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
644            size_flushes: self.size_flushes.load(Ordering::Relaxed),
645        }
646    }
647
648    /// Total pending events across all tenant batches.
649    pub fn pending_count(&self) -> usize {
650        self.current_batches
651            .lock()
652            .unwrap()
653            .values()
654            .map(Vec::len)
655            .sum()
656    }
657
    /// Get configured batch size (events buffered per tenant before a
    /// size-triggered flush).
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }

    /// Get configured flush timeout (max age of a partial batch before
    /// `check_timeout_flush` writes it out).
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }
667
668    /// Convert events to Arrow RecordBatch
669    #[cfg_attr(feature = "hotpath", hotpath::measure)]
670    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
671        let mut event_id_builder = StringBuilder::new();
672        let mut event_type_builder = StringBuilder::new();
673        let mut entity_id_builder = StringBuilder::new();
674        let mut payload_builder = StringBuilder::new();
675        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
676        let mut metadata_builder = StringBuilder::new();
677        let mut version_builder = UInt64Builder::new();
678
679        for event in events {
680            event_id_builder.append_value(event.id.to_string());
681            event_type_builder.append_value(event.event_type_str());
682            entity_id_builder.append_value(event.entity_id_str());
683            payload_builder.append_value(serde_json::to_string(&event.payload)?);
684
685            // Convert timestamp to microseconds
686            let timestamp_micros = event.timestamp.timestamp_micros();
687            timestamp_builder.append_value(timestamp_micros);
688
689            if let Some(ref metadata) = event.metadata {
690                metadata_builder.append_value(serde_json::to_string(metadata)?);
691            } else {
692                metadata_builder.append_null();
693            }
694
695            version_builder.append_value(event.version as u64);
696        }
697
698        let arrays: Vec<ArrayRef> = vec![
699            Arc::new(event_id_builder.finish()),
700            Arc::new(event_type_builder.finish()),
701            Arc::new(entity_id_builder.finish()),
702            Arc::new(payload_builder.finish()),
703            Arc::new(timestamp_builder.finish()),
704            Arc::new(metadata_builder.finish()),
705            Arc::new(version_builder.finish()),
706        ];
707
708        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;
709
710        Ok(record_batch)
711    }
712
713    /// Load events from all Parquet files under the storage directory.
714    ///
715    /// Walks the tree recursively so both layouts work: legacy flat
716    /// (`storage_dir/events-*.parquet`) and the tenant-partitioned tree
717    /// introduced by the data-strategy work (`storage_dir/<tenant>/<yyyy-mm>/
718    /// events-*.parquet`). The two coexist on disk during migration.
719    #[cfg_attr(feature = "hotpath", hotpath::measure)]
720    pub fn load_all_events(&self) -> Result<Vec<Event>> {
721        let parquet_files = find_parquet_files_recursive(&self.storage_dir)?;
722
723        let mut all_events = Vec::with_capacity(parquet_files.len() * self.config.batch_size);
724        for file_path in parquet_files {
725            tracing::info!("Loading events from {}", file_path.display());
726            let tenant_id = tenant_id_from_path(&self.storage_dir, &file_path);
727            let file_events = self.load_events_from_file(&file_path, &tenant_id)?;
728            all_events.extend(file_events);
729        }
730
731        tracing::info!("Loaded {} total events from storage", all_events.len());
732
733        Ok(all_events)
734    }
735
736    /// Load events from a single Parquet file. `tenant_id` is the value to
737    /// stamp onto each loaded event — derived from the file's location in
738    /// the tree by `load_all_events`. The Parquet schema doesn't include
739    /// tenant_id today (path is the source of truth), so this is how
740    /// per-tenant identity survives the round trip.
741    #[cfg_attr(feature = "hotpath", hotpath::measure)]
742    fn load_events_from_file(&self, file_path: &Path, tenant_id: &str) -> Result<Vec<Event>> {
743        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
744
745        let file = File::open(file_path).map_err(|e| {
746            AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
747        })?;
748
749        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
750        let mut reader = builder.build()?;
751
752        let mut events = Vec::new();
753
754        while let Some(Ok(batch)) = reader.next() {
755            let batch_events = self.record_batch_to_events(&batch, tenant_id)?;
756            events.extend(batch_events);
757        }
758
759        Ok(events)
760    }
761
    /// Load events from one specific Parquet file (public wrapper around the
    /// internal single-file loader).
    ///
    /// Used by the per-tenant compaction (Step 4) which needs to read a
    /// specific candidate set rather than the whole tenant subtree.
    /// Caller passes `tenant_id` explicitly because the Parquet schema
    /// doesn't carry it — the file path is the source of truth.
    pub fn load_events_from_file_path(
        &self,
        file_path: &Path,
        tenant_id: &str,
    ) -> Result<Vec<Event>> {
        self.load_events_from_file(file_path, tenant_id)
    }
774
775    /// Convert Arrow RecordBatch back to events. `tenant_id` is stamped onto
776    /// each reconstructed event — the schema doesn't carry it today, so the
777    /// caller passes the value derived from the file path.
778    #[cfg_attr(feature = "hotpath", hotpath::measure)]
779    fn record_batch_to_events(&self, batch: &RecordBatch, tenant_id: &str) -> Result<Vec<Event>> {
780        let event_ids = batch
781            .column(0)
782            .as_any()
783            .downcast_ref::<arrow::array::StringArray>()
784            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;
785
786        let event_types = batch
787            .column(1)
788            .as_any()
789            .downcast_ref::<arrow::array::StringArray>()
790            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;
791
792        let entity_ids = batch
793            .column(2)
794            .as_any()
795            .downcast_ref::<arrow::array::StringArray>()
796            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;
797
798        let payloads = batch
799            .column(3)
800            .as_any()
801            .downcast_ref::<arrow::array::StringArray>()
802            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;
803
804        let timestamps = batch
805            .column(4)
806            .as_any()
807            .downcast_ref::<TimestampMicrosecondArray>()
808            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;
809
810        let metadatas = batch
811            .column(5)
812            .as_any()
813            .downcast_ref::<arrow::array::StringArray>()
814            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;
815
816        let versions = batch
817            .column(6)
818            .as_any()
819            .downcast_ref::<arrow::array::UInt64Array>()
820            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;
821
822        let mut events = Vec::new();
823
824        for i in 0..batch.num_rows() {
825            let id = uuid::Uuid::parse_str(event_ids.value(i))
826                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;
827
828            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
829                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;
830
831            let metadata = if metadatas.is_null(i) {
832                None
833            } else {
834                Some(serde_json::from_str(metadatas.value(i))?)
835            };
836
837            let event = Event::reconstruct_from_strings(
838                id,
839                event_types.value(i).to_string(),
840                entity_ids.value(i).to_string(),
841                tenant_id.to_string(),
842                serde_json::from_str(payloads.value(i))?,
843                timestamp,
844                metadata,
845                versions.value(i) as i64,
846            );
847
848            events.push(event);
849        }
850
851        Ok(events)
852    }
853
854    /// List all Parquet file paths under the storage directory, sorted by
855    /// the relative path so files in the same partition stay grouped.
856    ///
857    /// Used by the replication catch-up protocol to stream snapshot files
858    /// to followers that are too far behind for WAL-only catch-up.
859    pub fn list_parquet_files(&self) -> Result<Vec<PathBuf>> {
860        find_parquet_files_recursive(&self.storage_dir)
861    }
862
863    /// List Parquet files belonging to a single tenant — i.e. only files
864    /// under `<storage_dir>/<tenant>/...`. Legacy flat-layout files at the
865    /// root are intentionally excluded; the migration tool moves them under
866    /// `default/` so once it has run a `tenant=default` query sees them.
867    ///
868    /// Returns an empty vec when the tenant subtree doesn't exist (no data
869    /// for that tenant yet). Returns an error only if `tenant_id` fails the
870    /// path-safety whitelist.
871    ///
872    /// This is the building block for tenant-scoped reads: the caller knows
873    /// which files might contain the tenant's data without opening any of
874    /// the others.
875    pub fn list_parquet_files_for_tenant(&self, tenant_id: &str) -> Result<Vec<PathBuf>> {
876        let safe = sanitize_tenant_id_for_path(tenant_id)?;
877        let tenant_root = self.storage_dir.join(safe);
878        if !tenant_root.is_dir() {
879            return Ok(Vec::new());
880        }
881        find_parquet_files_recursive(&tenant_root)
882    }
883
884    /// Load only the events belonging to `tenant_id`, walking just that
885    /// tenant's subtree on disk. The full-storage loader
886    /// (`load_all_events`) opens every Parquet file regardless of tenant;
887    /// this one only opens files under `<storage_dir>/<tenant>/`.
888    ///
889    /// Returns an empty vec when the tenant has no on-disk data. Returns
890    /// an error if the tenant_id fails the path-safety whitelist or any
891    /// individual file fails to load.
892    ///
893    /// This is the read-side complement to per-tenant flushing. It's the
894    /// foundation Step 2 (lazy per-tenant load on demand) needs: a way to
895    /// hydrate one tenant without paying the cost of loading every other
896    /// tenant's data into memory.
897    ///
898    /// Tenant identity for loaded events comes from the file path, the
899    /// same as `load_all_events` — `record_batch_to_events` stamps the
900    /// passed `tenant_id` onto every reconstructed event.
901    #[cfg_attr(feature = "hotpath", hotpath::measure)]
902    pub fn load_events_for_tenant(&self, tenant_id: &str) -> Result<Vec<Event>> {
903        let parquet_files = self.list_parquet_files_for_tenant(tenant_id)?;
904        tracing::info!(
905            tenant_id = tenant_id,
906            file_count = parquet_files.len(),
907            "load_events_for_tenant: walking tenant subtree only"
908        );
909
910        let mut events = Vec::with_capacity(parquet_files.len() * self.config.batch_size);
911        for file_path in parquet_files {
912            tracing::debug!(
913                tenant_id = tenant_id,
914                file = %file_path.display(),
915                "load_events_for_tenant: opening file"
916            );
917            let file_events = self.load_events_from_file(&file_path, tenant_id)?;
918            events.extend(file_events);
919        }
920
921        tracing::info!(
922            tenant_id = tenant_id,
923            event_count = events.len(),
924            "load_events_for_tenant: complete"
925        );
926        Ok(events)
927    }
928
    /// Get the storage directory path.
    ///
    /// Borrowed view of the root directory this instance writes under; the
    /// tenant-partitioned subtrees (`<root>/<tenant>/<yyyy-mm>/`) all hang
    /// off this path.
    pub fn storage_dir(&self) -> &Path {
        &self.storage_dir
    }
933
934    /// One-shot migration of flat-layout files into the tenant-partitioned
935    /// tree. Run with Core stopped (no concurrent writes).
936    ///
937    /// Walks `storage_dir`'s top level (non-recursive) for the legacy
938    /// `events-*.parquet` files. For each one it loads the events,
939    /// regroups them by (tenant_id, yyyy-mm) — events from a flat file
940    /// take the path-derived "default" tenant, since pre-partitioning
941    /// data carried no tenant in its on-disk form — writes a fresh
942    /// Parquet under the corresponding partition directory, and deletes
943    /// the original flat file once the new file is closed.
944    ///
945    /// `dry_run = true` reports what would happen without touching disk.
946    /// Run dry first; production data deserves the rehearsal.
947    ///
948    /// Crash safety: this writes the new partition file before deleting
949    /// the flat file, so a crash between the two leaves both on disk.
950    /// The recursive loader (`load_all_events`) will then return both,
951    /// duplicating those events on next boot. Mitigation: stop Core
952    /// before running, and re-run the migration after any crash so the
953    /// flat file gets deleted. A future commit can add atomic rename +
954    /// fsync semantics; for the one-time migration the stop-Core
955    /// constraint is enough.
956    pub fn migrate_flat_layout(&self, dry_run: bool) -> Result<MigrationReport> {
957        let flat_files = list_flat_layout_files(&self.storage_dir)?;
958        let mut report = MigrationReport {
959            dry_run,
960            ..Default::default()
961        };
962
963        for flat_file in flat_files {
964            // Pre-partition events used path-derived tenant. For flat-layout
965            // files that's always "default" (`tenant_id_from_path` falls back
966            // to "default" for single-component paths).
967            let events = self.load_events_from_file(&flat_file, "default")?;
968            report.flat_files_seen += 1;
969
970            if events.is_empty() {
971                // Stale empty file (zero rows). Just remove it.
972                if !dry_run {
973                    fs::remove_file(&flat_file).map_err(|e| {
974                        AllSourceError::StorageError(format!(
975                            "Failed to remove empty flat file {}: {e}",
976                            flat_file.display()
977                        ))
978                    })?;
979                }
980                report.flat_files_removed += 1;
981                continue;
982            }
983
984            // Group by (tenant, yyyy-mm-from-event-timestamp). The
985            // partition month tracks the event's wall-clock time so that
986            // post-migration the layout reflects when data happened, not
987            // when migration ran. Step 4 (per-tenant snapshots) and Step
988            // 5 (retention) will key on that.
989            let mut groups: HashMap<(String, String), Vec<Event>> = HashMap::new();
990            for event in events {
991                let key = (
992                    event.tenant_id_str().to_string(),
993                    event.timestamp().format("%Y-%m").to_string(),
994                );
995                groups.entry(key).or_default().push(event);
996            }
997
998            for ((tenant, yyyy_mm), group_events) in groups {
999                let count = group_events.len();
1000                if !dry_run {
1001                    let safe_tenant = sanitize_tenant_id_for_path(&tenant)?;
1002                    let target_dir = self.storage_dir.join(safe_tenant).join(&yyyy_mm);
1003                    fs::create_dir_all(&target_dir).map_err(|e| {
1004                        AllSourceError::StorageError(format!(
1005                            "Failed to create partition {}: {e}",
1006                            target_dir.display()
1007                        ))
1008                    })?;
1009                    let filename = format!(
1010                        "events-{}-{}.parquet",
1011                        chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
1012                        uuid::Uuid::new_v4().as_simple()
1013                    );
1014                    let target_path = target_dir.join(&filename);
1015                    let record_batch = self.events_to_record_batch(&group_events)?;
1016                    let file = File::create(&target_path).map_err(|e| {
1017                        AllSourceError::StorageError(format!(
1018                            "Failed to create migration target {}: {e}",
1019                            target_path.display()
1020                        ))
1021                    })?;
1022                    let props = WriterProperties::builder()
1023                        .set_compression(self.config.compression)
1024                        .build();
1025                    let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
1026                    writer.write(&record_batch)?;
1027                    writer.close()?;
1028                    report.partitions_written += 1;
1029                }
1030                report.events_migrated += count;
1031            }
1032
1033            if !dry_run {
1034                fs::remove_file(&flat_file).map_err(|e| {
1035                    AllSourceError::StorageError(format!(
1036                        "Failed to remove flat file {} after migration: {e}",
1037                        flat_file.display()
1038                    ))
1039                })?;
1040                report.flat_files_removed += 1;
1041            }
1042        }
1043
1044        Ok(report)
1045    }
1046
1047    /// Get storage statistics
1048    pub fn stats(&self) -> Result<StorageStats> {
1049        let parquet_files = find_parquet_files_recursive(&self.storage_dir)?;
1050        let mut total_size_bytes = 0u64;
1051        for path in &parquet_files {
1052            if let Ok(metadata) = fs::metadata(path) {
1053                total_size_bytes += metadata.len();
1054            }
1055        }
1056
1057        let current_batch_size: usize = self
1058            .current_batches
1059            .lock()
1060            .unwrap()
1061            .values()
1062            .map(Vec::len)
1063            .sum();
1064
1065        Ok(StorageStats {
1066            total_files: parquet_files.len(),
1067            total_size_bytes,
1068            storage_dir: self.storage_dir.clone(),
1069            current_batch_size,
1070        })
1071    }
1072}
1073
1074/// Validate a tenant ID for use as a filesystem path component.
1075///
1076/// Whitelist: ASCII letters, digits, `-`, `_`, `.` — covers UUIDs, the
1077/// hyphen-and-lowercase tenant strings the onboarding flow produces, and
1078/// the `system` tenant the heartbeat emitter uses. Rejects empty input,
1079/// any path separator (`/`, `\`), and any "..". The whitelist is the
1080/// primary defence against path traversal; the explicit ".." check is
1081/// belt-and-braces in case the whitelist ever loosens.
1082///
1083/// Length capped at 128 bytes — comfortably above the 36-byte UUID and
1084/// the longest onboarding tenant the system has produced, well below
1085/// every common filesystem's NAME_MAX (typically 255).
1086fn sanitize_tenant_id_for_path(tenant_id: &str) -> Result<&str> {
1087    if tenant_id.is_empty() {
1088        return Err(AllSourceError::StorageError(
1089            "tenant_id is empty (cannot derive partition path)".to_string(),
1090        ));
1091    }
1092    if tenant_id.len() > 128 {
1093        return Err(AllSourceError::StorageError(format!(
1094            "tenant_id is too long for partition path: {} bytes (max 128)",
1095            tenant_id.len()
1096        )));
1097    }
1098    if tenant_id == "." || tenant_id == ".." {
1099        return Err(AllSourceError::StorageError(format!(
1100            "tenant_id {tenant_id:?} is reserved"
1101        )));
1102    }
1103    for c in tenant_id.chars() {
1104        let ok = c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.';
1105        if !ok {
1106            return Err(AllSourceError::StorageError(format!(
1107                "tenant_id {tenant_id:?} contains disallowed character {c:?} for partition path"
1108            )));
1109        }
1110    }
1111    Ok(tenant_id)
1112}
1113
1114/// Resolve the directory a flush should write into for `(tenant, when)`.
1115///
1116/// Returns `<root>/<tenant>/<yyyy-mm>/`. Caller is responsible for
1117/// `create_dir_all`-ing the result before opening files in it.
1118fn partition_path_for_tenant(
1119    root: &Path,
1120    tenant_id: &str,
1121    when: chrono::DateTime<chrono::Utc>,
1122) -> Result<PathBuf> {
1123    let safe = sanitize_tenant_id_for_path(tenant_id)?;
1124    Ok(root.join(safe).join(when.format("%Y-%m").to_string()))
1125}
1126
/// Reverse of `partition_path_for_tenant` — given a parquet file's full
/// path and the storage root, return the tenant_id encoded in the path.
///
/// Tenant-partitioned shape: `<root>/<tenant>/<yyyy-mm>/events-*.parquet`
/// → the first component after the root is the tenant.
///
/// Legacy flat shape: `<root>/events-*.parquet` → no tenant in the path,
/// so fall back to `"default"`; events written before the partitioning
/// change keep loading with their original (and only ever) identity. The
/// same fallback applies when `file_path` isn't under `root` at all.
fn tenant_id_from_path(root: &Path, file_path: &Path) -> String {
    file_path
        .strip_prefix(root)
        .ok()
        .and_then(|rel| {
            let mut parts = rel.components();
            match (parts.next(), parts.next()) {
                // At least two components: <tenant>/<rest>... → tenant.
                (Some(std::path::Component::Normal(tenant)), Some(_)) => {
                    Some(tenant.to_string_lossy().into_owned())
                }
                // Single component (the file itself): legacy flat layout.
                _ => None,
            }
        })
        .unwrap_or_else(|| "default".to_string())
}
1152
1153/// List Parquet files at the top level of `root` only — i.e. the legacy
1154/// flat-layout files. Used by the one-shot migration tool to find data
1155/// that needs moving into the tenant-partitioned tree. The opposite of
1156/// `find_parquet_files_recursive`: stops at the first directory level so
1157/// already-partitioned data isn't included.
1158fn list_flat_layout_files(root: &Path) -> Result<Vec<PathBuf>> {
1159    let entries = fs::read_dir(root).map_err(|e| {
1160        AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
1161    })?;
1162    let mut out: Vec<PathBuf> = entries
1163        .filter_map(std::result::Result::ok)
1164        .filter_map(|entry| {
1165            let ft = entry.file_type().ok()?;
1166            if !ft.is_file() {
1167                return None;
1168            }
1169            let path = entry.path();
1170            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
1171                Some(path)
1172            } else {
1173                None
1174            }
1175        })
1176        .collect();
1177    out.sort();
1178    Ok(out)
1179}
1180
1181/// Recursively collect all `*.parquet` files under `root`, sorted by path so
1182/// callers see a deterministic, tenant-grouped order.
1183///
1184/// Existence rationale: the storage layout is moving from a flat
1185/// `storage_dir/events-*.parquet` pile to a tenant-partitioned tree of the
1186/// shape `storage_dir/<tenant>/<yyyy-mm>/events-*.parquet`. During the
1187/// migration both shapes coexist, so every code path that asks "what
1188/// parquet files do we have?" needs to walk subdirectories. Symlinks are
1189/// not followed — the storage tree is mounted from a single volume and
1190/// chasing symlinks invites cycles.
1191fn find_parquet_files_recursive(root: &Path) -> Result<Vec<PathBuf>> {
1192    let mut out = Vec::new();
1193    let mut stack: Vec<PathBuf> = vec![root.to_path_buf()];
1194
1195    while let Some(dir) = stack.pop() {
1196        let entries = match fs::read_dir(&dir) {
1197            Ok(e) => e,
1198            // Root must exist (we created it in `new`); subdirectories may
1199            // race a delete from compaction. Skip vanished subdirs rather
1200            // than failing the whole load.
1201            Err(e) if dir == root => {
1202                return Err(AllSourceError::StorageError(format!(
1203                    "Failed to read storage directory: {e}"
1204                )));
1205            }
1206            Err(_) => continue,
1207        };
1208
1209        for entry in entries.flatten() {
1210            let path = entry.path();
1211            // Use file_type() rather than metadata() so symlinks don't get
1212            // followed by accident (metadata() resolves symlinks, file_type()
1213            // doesn't).
1214            let Ok(ft) = entry.file_type() else {
1215                continue;
1216            };
1217            if ft.is_dir() {
1218                stack.push(path);
1219            } else if ft.is_file()
1220                && path
1221                    .extension()
1222                    .and_then(|ext| ext.to_str())
1223                    .is_some_and(|ext| ext == "parquet")
1224            {
1225                out.push(path);
1226            }
1227        }
1228    }
1229
1230    out.sort();
1231    Ok(out)
1232}
1233
1234impl Drop for ParquetStorage {
1235    fn drop(&mut self) {
1236        // Ensure any remaining events are flushed on shutdown
1237        if let Err(e) = self.flush_on_shutdown() {
1238            tracing::error!("Failed to flush events on drop: {}", e);
1239        }
1240    }
1241}
1242
/// Outcome of a `migrate_flat_layout` run.
///
/// All counters are cumulative over every flat file processed in the run.
/// Serializable so the report can be emitted as structured output.
#[derive(Debug, Default, Clone, serde::Serialize)]
pub struct MigrationReport {
    /// Whether the run was a rehearsal (no disk changes).
    pub dry_run: bool,
    /// Number of legacy flat-layout files discovered.
    pub flat_files_seen: usize,
    /// Number of legacy flat files deleted (always 0 when `dry_run`).
    pub flat_files_removed: usize,
    /// Number of new partition files written under the tenant tree.
    pub partitions_written: usize,
    /// Total events copied into the new tree (counted in dry-run too).
    pub events_migrated: usize,
}
1257
/// Snapshot of on-disk and in-memory storage state, as returned by
/// `ParquetStorage::stats()`.
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    /// Number of parquet files currently on disk (both layouts).
    pub total_files: usize,
    /// Combined size in bytes of those files.
    pub total_size_bytes: u64,
    /// Root directory the files live under.
    pub storage_dir: PathBuf,
    /// Events buffered in memory awaiting flush, summed across all tenants.
    pub current_batch_size: usize,
}
1265
1266#[cfg(test)]
1267mod tests {
1268    use super::*;
1269    use serde_json::json;
1270    use std::sync::Arc;
1271    use tempfile::TempDir;
1272
    /// Build a minimal valid `Event` for the "default" tenant: fixed JSON
    /// payload, fresh random UUID, current timestamp, no metadata, version 1.
    /// Enough for round-trip and batching assertions without touching the
    /// domain layer's invariants.
    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }
1288
1289    #[test]
1290    fn test_parquet_storage_write_read() {
1291        let temp_dir = TempDir::new().unwrap();
1292        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1293
1294        // Add events
1295        for i in 0..10 {
1296            let event = create_test_event(&format!("entity-{i}"));
1297            storage.append_event(event).unwrap();
1298        }
1299
1300        // Flush to disk
1301        storage.flush().unwrap();
1302
1303        // Load back
1304        let loaded_events = storage.load_all_events().unwrap();
1305        assert_eq!(loaded_events.len(), 10);
1306    }
1307
1308    #[test]
1309    fn test_storage_stats() {
1310        let temp_dir = TempDir::new().unwrap();
1311        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1312
1313        // Add and flush events
1314        for i in 0..5 {
1315            storage
1316                .append_event(create_test_event(&format!("entity-{i}")))
1317                .unwrap();
1318        }
1319        storage.flush().unwrap();
1320
1321        let stats = storage.stats().unwrap();
1322        assert_eq!(stats.total_files, 1);
1323        assert!(stats.total_size_bytes > 0);
1324    }
1325
1326    #[test]
1327    fn test_default_batch_size() {
1328        let temp_dir = TempDir::new().unwrap();
1329        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1330
1331        // Default batch size should be 10,000 as per US-023
1332        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
1333        assert_eq!(storage.batch_size(), 10_000);
1334    }
1335
1336    #[test]
1337    fn test_custom_config() {
1338        let temp_dir = TempDir::new().unwrap();
1339        let config = ParquetStorageConfig {
1340            batch_size: 5_000,
1341            flush_timeout: Duration::from_secs(2),
1342            compression: parquet::basic::Compression::SNAPPY,
1343        };
1344        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1345
1346        assert_eq!(storage.batch_size(), 5_000);
1347        assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
1348    }
1349
1350    #[test]
1351    fn test_batch_write() {
1352        let temp_dir = TempDir::new().unwrap();
1353        let config = ParquetStorageConfig {
1354            batch_size: 100, // Small batch for testing
1355            ..Default::default()
1356        };
1357        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1358
1359        // 250 events for a single tenant. With per-tenant flush, when the
1360        // tenant's pending batch crosses batch_size we drain the whole
1361        // tenant in one flush — not chunk at exactly batch_size like the
1362        // old global-batch path did. So 250 events triggers exactly one
1363        // size-flush (the appender pushes all 250 onto the tenant's batch
1364        // under one lock, sees length >= 100, schedules a flush which
1365        // drains everything). 0 left pending.
1366        let events: Vec<Event> = (0..250)
1367            .map(|i| create_test_event(&format!("entity-{i}")))
1368            .collect();
1369
1370        let result = storage.batch_write(events).unwrap();
1371        assert_eq!(result.events_written, 250);
1372        assert_eq!(result.batches_flushed, 1);
1373        assert_eq!(storage.pending_count(), 0);
1374
1375        // Manual flush is a no-op since nothing's pending.
1376        storage.flush().unwrap();
1377
1378        // All 250 events round-trip through the tenant-partitioned tree.
1379        let loaded = storage.load_all_events().unwrap();
1380        assert_eq!(loaded.len(), 250);
1381    }
1382
1383    #[test]
1384    fn test_auto_flush_on_batch_size() {
1385        let temp_dir = TempDir::new().unwrap();
1386        let config = ParquetStorageConfig {
1387            batch_size: 10, // Very small for testing
1388            ..Default::default()
1389        };
1390        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1391
1392        // Add 15 events - should auto-flush at 10
1393        for i in 0..15 {
1394            storage
1395                .append_event(create_test_event(&format!("entity-{i}")))
1396                .unwrap();
1397        }
1398
1399        // Should have 5 pending, 10 written
1400        assert_eq!(storage.pending_count(), 5);
1401
1402        let stats = storage.batch_stats();
1403        assert_eq!(stats.events_written, 10);
1404        assert_eq!(stats.batches_written, 1);
1405        assert_eq!(stats.size_flushes, 1);
1406    }
1407
1408    #[test]
1409    fn test_flush_on_shutdown() {
1410        let temp_dir = TempDir::new().unwrap();
1411        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1412
1413        // Add some events without reaching batch size
1414        for i in 0..5 {
1415            storage
1416                .append_event(create_test_event(&format!("entity-{i}")))
1417                .unwrap();
1418        }
1419
1420        assert_eq!(storage.pending_count(), 5);
1421
1422        // Manually trigger shutdown flush
1423        let flushed = storage.flush_on_shutdown().unwrap();
1424        assert_eq!(flushed, 5);
1425        assert_eq!(storage.pending_count(), 0);
1426
1427        // Verify events are persisted
1428        let loaded = storage.load_all_events().unwrap();
1429        assert_eq!(loaded.len(), 5);
1430    }
1431
1432    #[test]
1433    fn test_thread_safe_writes() {
1434        let temp_dir = TempDir::new().unwrap();
1435        let config = ParquetStorageConfig {
1436            batch_size: 100,
1437            ..Default::default()
1438        };
1439        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());
1440
1441        let events_per_thread = 50;
1442        let thread_count = 4;
1443
1444        std::thread::scope(|s| {
1445            for t in 0..thread_count {
1446                let storage_ref = Arc::clone(&storage);
1447                s.spawn(move || {
1448                    for i in 0..events_per_thread {
1449                        let event = create_test_event(&format!("thread-{t}-entity-{i}"));
1450                        storage_ref.append_event(event).unwrap();
1451                    }
1452                });
1453            }
1454        });
1455
1456        // Flush remaining
1457        storage.flush().unwrap();
1458
1459        // All events should be written
1460        let loaded = storage.load_all_events().unwrap();
1461        assert_eq!(loaded.len(), events_per_thread * thread_count);
1462    }
1463
1464    #[test]
1465    fn test_batch_stats() {
1466        let temp_dir = TempDir::new().unwrap();
1467        let config = ParquetStorageConfig {
1468            batch_size: 50,
1469            ..Default::default()
1470        };
1471        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1472
1473        // 100 events, single tenant, batch_size=50. Per-tenant flush
1474        // drains the whole tenant on the first size trigger, so this
1475        // produces exactly one size-flush and one batches_written event
1476        // (vs. the pre-tenant world's two).
1477        let events: Vec<Event> = (0..100)
1478            .map(|i| create_test_event(&format!("entity-{i}")))
1479            .collect();
1480
1481        storage.batch_write(events).unwrap();
1482
1483        let stats = storage.batch_stats();
1484        assert_eq!(stats.batches_written, 1);
1485        assert_eq!(stats.events_written, 100);
1486        assert!(stats.avg_batch_size > 0.0);
1487        assert!(stats.events_per_sec > 0.0);
1488        assert_eq!(stats.size_flushes, 1);
1489    }
1490
1491    #[test]
1492    fn test_config_presets() {
1493        let high_throughput = ParquetStorageConfig::high_throughput();
1494        assert_eq!(high_throughput.batch_size, 50_000);
1495        assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));
1496
1497        let low_latency = ParquetStorageConfig::low_latency();
1498        assert_eq!(low_latency.batch_size, 1_000);
1499        assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));
1500
1501        let default = ParquetStorageConfig::default();
1502        assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
1503        assert_eq!(default.batch_size, 10_000);
1504    }
1505
    /// Benchmark: Compare single-event writes vs batch writes
    /// Run with: cargo test --release -- --ignored test_batch_write_throughput
    #[test]
    #[ignore]
    fn test_batch_write_throughput() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        let event_count = 50_000;

        // Benchmark batch write. Event construction happens before the timer
        // starts; only batch_write + flush are in the measured region.
        let events: Vec<Event> = (0..event_count)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        let start = std::time::Instant::now();
        let result = storage.batch_write(events).unwrap();
        storage.flush().unwrap(); // Flush any remaining
        let batch_duration = start.elapsed();

        let batch_stats = storage.batch_stats();

        println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
        println!("Events: {event_count}");
        println!("Duration: {batch_duration:?}");
        println!("Events/sec: {:.0}", result.events_per_sec);
        println!("Batches written: {}", batch_stats.batches_written);
        println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
        println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);

        // Target: Batch writes should achieve at least 100K events/sec in release mode
        // This represents 40%+ improvement over single-event writes.
        // The assert uses the lower 10K bar so debug runs don't fail.
        assert!(
            result.events_per_sec > 10_000.0,
            "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
            result.events_per_sec
        );
    }
1544
    /// Benchmark: Single-event write baseline (for comparison)
    #[test]
    #[ignore]
    fn test_single_event_write_baseline() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 1, // Force flush after each event
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let event_count = 1_000; // Fewer events since this is slow

        // Note: unlike the batch benchmark, the timed region here includes
        // event construction as well as the append.
        let start = std::time::Instant::now();
        for i in 0..event_count {
            let event = create_test_event(&format!("entity-{i}"));
            storage.append_event(event).unwrap();
        }
        let duration = start.elapsed();

        let events_per_sec = f64::from(event_count) / duration.as_secs_f64();

        println!("\n=== Single-Event Write Baseline ===");
        println!("Events: {event_count}");
        println!("Duration: {duration:?}");
        println!("Events/sec: {events_per_sec:.0}");

        // This should be significantly slower than batch writes
        // Used as a baseline to demonstrate 40%+ improvement
        // (no assert — this test exists purely to print baseline numbers)
    }
1575
1576    // -----------------------------------------------------------------
1577    // Tests for the recursive parquet walker (Step 1, commit #1: read-side
1578    // bidirectional layout support — see SUSTAINABLE_DATA_STRATEGY.md).
1579    // -----------------------------------------------------------------
1580
1581    /// Helper: write a tiny placeholder parquet file at an arbitrary path so
1582    /// the walker has something concrete to find. We only care that the
1583    /// walker discovers the path, not that the file is loadable here — the
1584    /// load path is exercised by the existing read tests.
1585    fn touch_parquet(path: &Path) {
1586        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
1587        std::fs::write(path, b"").unwrap();
1588    }
1589
1590    #[test]
1591    fn test_walker_finds_files_in_flat_layout() {
1592        let temp_dir = TempDir::new().unwrap();
1593        let root = temp_dir.path();
1594        touch_parquet(&root.join("events-20260101-120000000-aaaa.parquet"));
1595        touch_parquet(&root.join("events-20260101-130000000-bbbb.parquet"));
1596
1597        let mut found = find_parquet_files_recursive(root).unwrap();
1598        found.sort();
1599        assert_eq!(found.len(), 2);
1600        assert!(
1601            found[0]
1602                .file_name()
1603                .unwrap()
1604                .to_str()
1605                .unwrap()
1606                .starts_with("events-"),
1607            "expected events-* file, got {found:?}"
1608        );
1609    }
1610
1611    #[test]
1612    fn test_walker_finds_files_in_tenant_partitioned_tree() {
1613        let temp_dir = TempDir::new().unwrap();
1614        let root = temp_dir.path();
1615        // Tenant-partitioned shape: storage_dir/<tenant>/<yyyy-mm>/events-*.parquet
1616        touch_parquet(&root.join("tenant-a/2026-01/events-20260101-120000000-aaaa.parquet"));
1617        touch_parquet(&root.join("tenant-a/2026-02/events-20260201-120000000-bbbb.parquet"));
1618        touch_parquet(&root.join("tenant-b/2026-01/events-20260103-120000000-cccc.parquet"));
1619
1620        let found = find_parquet_files_recursive(root).unwrap();
1621        assert_eq!(found.len(), 3);
1622        // Sort places tenant-a files before tenant-b — that's the
1623        // tenant-grouping the docs claim.
1624        assert!(found[0].to_str().unwrap().contains("tenant-a"));
1625        assert!(found[1].to_str().unwrap().contains("tenant-a"));
1626        assert!(found[2].to_str().unwrap().contains("tenant-b"));
1627    }
1628
1629    #[test]
1630    fn test_walker_handles_mixed_legacy_and_partitioned_layouts() {
1631        // The migration window: some tenants have been moved into the tree,
1632        // some flat files still sit at the root. The walker must surface
1633        // both so load_all_events sees every event regardless of where it
1634        // currently lives.
1635        let temp_dir = TempDir::new().unwrap();
1636        let root = temp_dir.path();
1637        touch_parquet(&root.join("events-legacy-aaaa.parquet"));
1638        touch_parquet(&root.join("tenant-a/2026-01/events-new-bbbb.parquet"));
1639
1640        let found = find_parquet_files_recursive(root).unwrap();
1641        assert_eq!(found.len(), 2);
1642    }
1643
1644    #[test]
1645    fn test_walker_ignores_non_parquet_files() {
1646        let temp_dir = TempDir::new().unwrap();
1647        let root = temp_dir.path();
1648        std::fs::write(root.join("README.md"), b"hello").unwrap();
1649        std::fs::write(root.join("events.json"), b"[]").unwrap();
1650        touch_parquet(&root.join("events-20260101-120000000-aaaa.parquet"));
1651        // Files that just happen to have "parquet" in the name but no
1652        // .parquet extension stay out — extension-only filter, no name match.
1653        std::fs::write(root.join("not-a-parquet-file.bin"), b"").unwrap();
1654
1655        let found = find_parquet_files_recursive(root).unwrap();
1656        assert_eq!(found.len(), 1);
1657        assert_eq!(
1658            found[0].extension().and_then(|s| s.to_str()),
1659            Some("parquet")
1660        );
1661    }
1662
1663    /// Build an event whose tenant_id and entity_id we control, so tests
1664    /// can verify per-tenant routing without depending on the helper that
1665    /// hardcodes "default".
1666    fn event_with_tenant(tenant: &str, entity_id: &str) -> Event {
1667        Event::reconstruct_from_strings(
1668            uuid::Uuid::new_v4(),
1669            "test.event".to_string(),
1670            entity_id.to_string(),
1671            tenant.to_string(),
1672            json!({"k": "v"}),
1673            chrono::Utc::now(),
1674            None,
1675            1,
1676        )
1677    }
1678
1679    #[test]
1680    fn test_flush_writes_into_per_tenant_partition() {
1681        // End-to-end check that the new write path produces
1682        // <root>/<tenant>/<yyyy-mm>/events-*.parquet — no flat file at the
1683        // root, no cross-tenant mixing.
1684        let temp_dir = TempDir::new().unwrap();
1685        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1686
1687        for i in 0..3 {
1688            storage
1689                .append_event(event_with_tenant("default", &format!("entity-{i}")))
1690                .unwrap();
1691        }
1692        storage.flush().unwrap();
1693
1694        let parquet_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
1695        assert_eq!(parquet_files.len(), 1);
1696
1697        let rel = parquet_files[0]
1698            .strip_prefix(temp_dir.path())
1699            .unwrap()
1700            .to_string_lossy()
1701            .into_owned();
1702        // Path shape: default/<yyyy-mm>/events-*.parquet
1703        let parts: Vec<&str> = rel.split(std::path::MAIN_SEPARATOR).collect();
1704        assert_eq!(parts.len(), 3, "expected tenant/yyyy-mm/file, got {rel}");
1705        assert_eq!(parts[0], "default");
1706        // yyyy-mm is two digits dash four — loose check, exact month
1707        // varies with wall-clock at test runtime.
1708        assert!(
1709            parts[1].len() == 7 && parts[1].as_bytes()[4] == b'-',
1710            "expected yyyy-mm, got {}",
1711            parts[1]
1712        );
1713        assert!(parts[2].starts_with("events-") && parts[2].ends_with(".parquet"));
1714
1715        let loaded = storage.load_all_events().unwrap();
1716        assert_eq!(loaded.len(), 3);
1717    }
1718
1719    #[test]
1720    fn test_multiple_tenants_get_isolated_subtrees() {
1721        // Per-tenant flush must not mix tenants into the same Parquet file
1722        // and must put each tenant under its own subdirectory.
1723        let temp_dir = TempDir::new().unwrap();
1724        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1725
1726        for i in 0..2 {
1727            storage
1728                .append_event(event_with_tenant("alice", &format!("a-{i}")))
1729                .unwrap();
1730        }
1731        for i in 0..3 {
1732            storage
1733                .append_event(event_with_tenant("bob", &format!("b-{i}")))
1734                .unwrap();
1735        }
1736        storage.flush().unwrap();
1737
1738        let alice_subtree = temp_dir.path().join("alice");
1739        let bob_subtree = temp_dir.path().join("bob");
1740        assert!(alice_subtree.is_dir(), "alice should have its own subtree");
1741        assert!(bob_subtree.is_dir(), "bob should have its own subtree");
1742
1743        let alice_files = find_parquet_files_recursive(&alice_subtree).unwrap();
1744        let bob_files = find_parquet_files_recursive(&bob_subtree).unwrap();
1745        assert_eq!(alice_files.len(), 1);
1746        assert_eq!(bob_files.len(), 1);
1747
1748        // Loaded events keep their tenant_id — round-trip preserves which
1749        // tenant each event belonged to.
1750        let loaded = storage.load_all_events().unwrap();
1751        let (alice_count, bob_count) =
1752            loaded
1753                .iter()
1754                .fold((0, 0), |(a, b), e| match e.tenant_id_str() {
1755                    "alice" => (a + 1, b),
1756                    "bob" => (a, b + 1),
1757                    _ => (a, b),
1758                });
1759        assert_eq!(alice_count, 2);
1760        assert_eq!(bob_count, 3);
1761    }
1762
1763    #[test]
1764    fn test_size_flush_only_drains_full_tenant() {
1765        // When one tenant exactly hits batch_size, only that tenant
1766        // flushes; the other tenant keeps its events buffered. Prevents
1767        // one noisy tenant from causing fragmented writes for everyone.
1768        let temp_dir = TempDir::new().unwrap();
1769        let config = ParquetStorageConfig {
1770            batch_size: 5,
1771            ..Default::default()
1772        };
1773        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1774
1775        // Alice: 5 events → on the 5th, len == batch_size triggers flush
1776        // which drains all 5. Alice ends empty.
1777        for i in 0..5 {
1778            storage
1779                .append_event(event_with_tenant("alice", &format!("a-{i}")))
1780                .unwrap();
1781        }
1782        // Bob: 2 events → still under threshold, stays pending.
1783        for i in 0..2 {
1784            storage
1785                .append_event(event_with_tenant("bob", &format!("b-{i}")))
1786                .unwrap();
1787        }
1788
1789        assert_eq!(
1790            storage.pending_count(),
1791            2,
1792            "only bob's 2 events should be pending"
1793        );
1794
1795        let parquet_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
1796        assert_eq!(parquet_files.len(), 1, "only alice should have flushed");
1797        assert!(
1798            parquet_files[0]
1799                .to_string_lossy()
1800                .contains(&format!("alice{}", std::path::MAIN_SEPARATOR)),
1801            "expected alice partition, got {}",
1802            parquet_files[0].display()
1803        );
1804    }
1805
1806    #[test]
1807    fn test_tenant_id_from_path_recovers_tenant_for_partitioned_files() {
1808        let root = Path::new("/data/storage");
1809        let f = Path::new("/data/storage/alice/2026-04/events-20260426-120000000-aaaa.parquet");
1810        assert_eq!(tenant_id_from_path(root, f), "alice");
1811    }
1812
1813    #[test]
1814    fn test_tenant_id_from_path_falls_back_to_default_for_legacy_flat_layout() {
1815        let root = Path::new("/data/storage");
1816        let f = Path::new("/data/storage/events-20260426-120000000-aaaa.parquet");
1817        // Legacy single-component path. Pre-tenant data was always
1818        // commingled with tenant=default, so default is the right fallback.
1819        assert_eq!(tenant_id_from_path(root, f), "default");
1820    }
1821
1822    #[test]
1823    fn test_sanitize_tenant_id_for_path_accepts_safe_inputs() {
1824        for ok in [
1825            "default",
1826            "system",
1827            "1e6b2d1c-2f64-4441-9cf9-42f2e451aa17",
1828            "onboard-diagnostic-160-at-example-com",
1829            "tenant_with_underscore",
1830            "v1.0",
1831        ] {
1832            assert!(
1833                sanitize_tenant_id_for_path(ok).is_ok(),
1834                "{ok:?} should be accepted"
1835            );
1836        }
1837    }
1838
1839    #[test]
1840    fn test_sanitize_tenant_id_for_path_rejects_unsafe_inputs() {
1841        for bad in [
1842            "",         // empty
1843            "..",       // parent traversal
1844            ".",        // current dir
1845            "foo/bar",  // path separator
1846            "foo\\bar", // windows-style separator
1847            "foo bar",  // whitespace
1848            "foo\nbar", // newline
1849            "foo\0bar", // null byte
1850            "tenant?",  // shell glob char
1851            "tenant*",  // shell glob char
1852        ] {
1853            assert!(
1854                sanitize_tenant_id_for_path(bad).is_err(),
1855                "{bad:?} should be rejected"
1856            );
1857        }
1858
1859        // Length cap.
1860        let too_long = "a".repeat(129);
1861        assert!(sanitize_tenant_id_for_path(&too_long).is_err());
1862    }
1863
1864    #[test]
1865    fn test_partition_path_for_tenant_shape() {
1866        let root = Path::new("/data");
1867        let when = chrono::DateTime::parse_from_rfc3339("2026-04-26T12:00:00Z")
1868            .unwrap()
1869            .with_timezone(&chrono::Utc);
1870        let path = partition_path_for_tenant(root, "alice", when).unwrap();
1871        assert_eq!(path, Path::new("/data/alice/2026-04"));
1872    }
1873
1874    #[test]
1875    fn test_append_event_rejects_unsafe_tenant_at_flush() {
1876        // Defence in depth: even if some upstream forgets to validate, the
1877        // sanitizer in flush_tenant catches it. Since append doesn't write
1878        // synchronously, the bad tenant is rejected on the first flush
1879        // attempt. We test that flush surfaces an error rather than
1880        // silently writing somewhere weird.
1881        let temp_dir = TempDir::new().unwrap();
1882        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1883
1884        // append accepts whatever tenant_id the event carries — domain
1885        // construction would normally reject this, but if it slipped
1886        // through, flush should refuse to derive a path from it.
1887        storage
1888            .append_event(event_with_tenant("../escape", "e-0"))
1889            .unwrap();
1890        let result = storage.flush();
1891        assert!(result.is_err(), "flush should reject unsafe tenant_id");
1892        let msg = format!("{}", result.unwrap_err());
1893        assert!(
1894            msg.contains("disallowed character") || msg.contains("reserved"),
1895            "expected sanitization error message, got: {msg}"
1896        );
1897    }
1898
1899    // -----------------------------------------------------------------
1900    // Tenant-pruned read tests (Step 1, commit #4).
1901    // -----------------------------------------------------------------
1902
1903    #[test]
1904    fn test_load_events_for_tenant_only_walks_target_subtree() {
1905        // Seed three tenants with distinct event counts. Loading one
1906        // tenant must return only that tenant's events — and the file
1907        // list helper must report only that tenant's files (the strong
1908        // form of "didn't open the others").
1909        let temp_dir = TempDir::new().unwrap();
1910        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1911
1912        for i in 0..2 {
1913            storage
1914                .append_event(event_with_tenant("alice", &format!("a-{i}")))
1915                .unwrap();
1916        }
1917        for i in 0..3 {
1918            storage
1919                .append_event(event_with_tenant("bob", &format!("b-{i}")))
1920                .unwrap();
1921        }
1922        for i in 0..1 {
1923            storage
1924                .append_event(event_with_tenant("carol", &format!("c-{i}")))
1925                .unwrap();
1926        }
1927        storage.flush().unwrap();
1928
1929        let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
1930        assert_eq!(alice_files.len(), 1);
1931        assert!(
1932            alice_files[0]
1933                .to_string_lossy()
1934                .contains(&format!("alice{}", std::path::MAIN_SEPARATOR)),
1935            "expected alice file, got {}",
1936            alice_files[0].display()
1937        );
1938        // The pruned listing must NOT include any bob/carol files — this
1939        // is the property Step 2 will rely on to avoid loading every
1940        // tenant's data on a single-tenant query.
1941        for f in &alice_files {
1942            let s = f.to_string_lossy();
1943            assert!(!s.contains("bob"), "alice listing leaked bob file: {s}");
1944            assert!(!s.contains("carol"), "alice listing leaked carol file: {s}");
1945        }
1946
1947        let alice_events = storage.load_events_for_tenant("alice").unwrap();
1948        assert_eq!(alice_events.len(), 2);
1949        for e in &alice_events {
1950            assert_eq!(e.tenant_id_str(), "alice");
1951        }
1952
1953        let bob_events = storage.load_events_for_tenant("bob").unwrap();
1954        assert_eq!(bob_events.len(), 3);
1955        for e in &bob_events {
1956            assert_eq!(e.tenant_id_str(), "bob");
1957        }
1958
1959        let carol_events = storage.load_events_for_tenant("carol").unwrap();
1960        assert_eq!(carol_events.len(), 1);
1961        assert_eq!(carol_events[0].tenant_id_str(), "carol");
1962    }
1963
1964    #[test]
1965    fn test_load_events_for_tenant_returns_empty_when_subtree_missing() {
1966        // Querying a tenant that has never written must not error — it's
1967        // a normal "no data" case, not a misconfiguration. Important for
1968        // first-query latency on a fresh tenant.
1969        let temp_dir = TempDir::new().unwrap();
1970        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1971
1972        // Seed only alice so the storage_dir isn't empty (rule out the
1973        // empty-dir trivial case).
1974        storage
1975            .append_event(event_with_tenant("alice", "a-0"))
1976            .unwrap();
1977        storage.flush().unwrap();
1978
1979        let files = storage
1980            .list_parquet_files_for_tenant("nobody-here")
1981            .unwrap();
1982        assert!(files.is_empty());
1983
1984        let events = storage.load_events_for_tenant("nobody-here").unwrap();
1985        assert!(events.is_empty());
1986    }
1987
1988    #[test]
1989    fn test_load_events_for_tenant_rejects_unsafe_tenant_id() {
1990        // Path traversal must fail at the API boundary, not after disk
1991        // reads. Same whitelist as the write path.
1992        let temp_dir = TempDir::new().unwrap();
1993        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1994
1995        for unsafe_tid in ["..", "a/b", "a\\b", "", "a..b/.."] {
1996            let result = storage.load_events_for_tenant(unsafe_tid);
1997            assert!(
1998                result.is_err(),
1999                "tenant_id {unsafe_tid:?} should have been rejected"
2000            );
2001        }
2002    }
2003
2004    #[test]
2005    fn test_load_events_for_tenant_ignores_legacy_flat_layout_files() {
2006        // Flat-layout files at the storage root predate partitioning. A
2007        // tenant-scoped load must not pick them up — the migration tool
2008        // is what relocates them under default/. Until it runs, those
2009        // files are invisible to per-tenant queries (correct behavior:
2010        // the system has no way to tell which tenant they belong to
2011        // beyond "default", and pretending otherwise would mis-attribute
2012        // them).
2013        let temp_dir = TempDir::new().unwrap();
2014        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2015
2016        // Seed a flat-layout file (relocates default/<yyyy-mm>/ → root).
2017        let _flat = seed_flat_layout_file(&storage, 4);
2018
2019        // Querying default returns nothing — the flat file at the root
2020        // isn't under default/.
2021        let default_events = storage.load_events_for_tenant("default").unwrap();
2022        assert!(
2023            default_events.is_empty(),
2024            "tenant-scoped load must not pick up flat-layout files; got {} events",
2025            default_events.len()
2026        );
2027
2028        // Sanity: the full loader still sees them via the recursive walk.
2029        let all_events = storage.load_all_events().unwrap();
2030        assert_eq!(all_events.len(), 4);
2031    }
2032
2033    // -----------------------------------------------------------------
2034    // Atomic snapshot write tests (Step 4, commit #1).
2035    // -----------------------------------------------------------------
2036
2037    #[test]
2038    fn test_write_atomic_parquet_emits_file_under_tenant_partition() {
2039        // Happy path: write 3 events for tenant alice, get back a
2040        // path under <root>/alice/<yyyy-mm>/. The .tmp file should
2041        // be gone (rename completed); the final file readable via
2042        // load_events_for_tenant.
2043        let temp_dir = TempDir::new().unwrap();
2044        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2045
2046        let events: Vec<Event> = (0..3)
2047            .map(|i| event_with_tenant("alice", &format!("a-{i}")))
2048            .collect();
2049
2050        let final_path = storage
2051            .write_atomic_parquet("alice", "snapshot.alice.range", &events)
2052            .unwrap();
2053
2054        // Path shape: <root>/alice/<yyyy-mm>/snapshot.alice.range.parquet
2055        let rel = final_path
2056            .strip_prefix(temp_dir.path())
2057            .unwrap()
2058            .to_string_lossy()
2059            .into_owned();
2060        let parts: Vec<&str> = rel.split(std::path::MAIN_SEPARATOR).collect();
2061        assert_eq!(parts.len(), 3, "expected tenant/yyyy-mm/file, got {rel}");
2062        assert_eq!(parts[0], "alice");
2063        assert_eq!(parts[2], "snapshot.alice.range.parquet");
2064
2065        // Final file must exist; .tmp must NOT.
2066        assert!(final_path.is_file());
2067        let tmp = final_path.with_extension("parquet.tmp");
2068        assert!(
2069            !tmp.exists(),
2070            "tmp should have been renamed away; still at {}",
2071            tmp.display()
2072        );
2073
2074        // Loadable via the tenant loader.
2075        let loaded = storage.load_events_for_tenant("alice").unwrap();
2076        assert_eq!(loaded.len(), 3);
2077    }
2078
2079    #[test]
2080    fn test_write_atomic_parquet_rejects_empty_events() {
2081        let temp_dir = TempDir::new().unwrap();
2082        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2083        let result = storage.write_atomic_parquet("alice", "snap", &[]);
2084        assert!(result.is_err());
2085    }
2086
2087    #[test]
2088    fn test_write_atomic_parquet_rejects_unsafe_tenant() {
2089        let temp_dir = TempDir::new().unwrap();
2090        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2091        let events = [event_with_tenant("alice", "e-0")];
2092        for unsafe_tid in ["..", "a/b", ""] {
2093            let result = storage.write_atomic_parquet(unsafe_tid, "snap", &events);
2094            assert!(
2095                result.is_err(),
2096                "unsafe tenant_id {unsafe_tid:?} should have been rejected"
2097            );
2098        }
2099    }
2100
2101    #[test]
2102    fn test_cleanup_partial_writes_removes_orphan_tmps() {
2103        // Simulate a crashed mid-snapshot: pretend we have a leftover
2104        // events-x.parquet.tmp file in a tenant partition. Cleanup
2105        // must delete it without touching real .parquet files.
2106        let temp_dir = TempDir::new().unwrap();
2107        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2108
2109        // Seed a real parquet via a normal flush, so we have one
2110        // legit file we don't want to delete.
2111        for i in 0..2 {
2112            storage
2113                .append_event(event_with_tenant("alice", &format!("a-{i}")))
2114                .unwrap();
2115        }
2116        storage.flush().unwrap();
2117        let real_files_before = find_parquet_files_recursive(temp_dir.path()).unwrap();
2118        assert_eq!(real_files_before.len(), 1);
2119
2120        // Manufacture an orphan .tmp file in alice's partition.
2121        let alice_subtree = temp_dir.path().join("alice");
2122        let orphan_dir = real_files_before[0].parent().unwrap();
2123        let orphan_path = orphan_dir.join("snapshot.alice.crashed.parquet.tmp");
2124        std::fs::write(&orphan_path, b"fake partial parquet").unwrap();
2125        assert!(orphan_path.is_file());
2126
2127        // And one nested deeper, just to confirm recursion.
2128        let nested_dir = alice_subtree.join("2099-01");
2129        std::fs::create_dir_all(&nested_dir).unwrap();
2130        let nested_orphan = nested_dir.join("events-x.parquet.tmp");
2131        std::fs::write(&nested_orphan, b"junk").unwrap();
2132
2133        let removed = storage.cleanup_partial_writes().unwrap();
2134        assert_eq!(removed, 2, "two orphan tmps should have been cleaned");
2135        assert!(!orphan_path.exists());
2136        assert!(!nested_orphan.exists());
2137
2138        // Real parquet untouched.
2139        let real_files_after = find_parquet_files_recursive(temp_dir.path()).unwrap();
2140        assert_eq!(real_files_after, real_files_before);
2141    }
2142
2143    #[test]
2144    fn test_new_calls_cleanup_partial_writes_on_boot() {
2145        // Drop a stale .tmp into a directory, then construct a
2146        // fresh ParquetStorage on it. The constructor must clean
2147        // it up.
2148        let temp_dir = TempDir::new().unwrap();
2149        let stale = temp_dir.path().join("orphan.parquet.tmp");
2150        std::fs::write(&stale, b"crash detritus").unwrap();
2151        assert!(stale.is_file());
2152
2153        let _storage = ParquetStorage::new(temp_dir.path()).unwrap();
2154        assert!(
2155            !stale.exists(),
2156            "stale tmp should have been cleaned by ParquetStorage::new"
2157        );
2158    }
2159
2160    // -----------------------------------------------------------------
2161    // Migration tests (Step 1, commit #3: flat → tenant-tree migration).
2162    // -----------------------------------------------------------------
2163
2164    /// Helper: produce a flat-layout Parquet file at the storage root,
2165    /// matching what pre-#2 deploys wrote. Uses the existing flush path
2166    /// briefly and then relocates the resulting file from
2167    /// default/<yyyy-mm>/ back up to the root, simulating the legacy
2168    /// state.
2169    fn seed_flat_layout_file(storage: &ParquetStorage, count: usize) -> PathBuf {
2170        for i in 0..count {
2171            storage
2172                .append_event(create_test_event(&format!("entity-{i}")))
2173                .unwrap();
2174        }
2175        storage.flush().unwrap();
2176
2177        // create_test_event uses tenant="default", so the just-flushed file
2178        // landed under <root>/default/<yyyy-mm>/. Find the newest file in
2179        // that subtree to avoid picking up files from other tenants seeded
2180        // by the test before us.
2181        let default_subtree = storage.storage_dir().join("default");
2182        let candidates = find_parquet_files_recursive(&default_subtree).unwrap();
2183        assert!(
2184            !candidates.is_empty(),
2185            "seed expected at least one file under default/"
2186        );
2187        let src = candidates.into_iter().max().unwrap();
2188
2189        let dst = storage.storage_dir().join(src.file_name().unwrap());
2190        std::fs::rename(&src, &dst).unwrap();
2191        // Best-effort cleanup of the now-empty intermediate dirs so the
2192        // migration tool only sees the flat file. (`remove_dir` succeeds
2193        // only on empty dirs, which is exactly the safety we want here.)
2194        if let Some(month_dir) = src.parent() {
2195            let _ = std::fs::remove_dir(month_dir);
2196            if let Some(tenant_dir) = month_dir.parent() {
2197                let _ = std::fs::remove_dir(tenant_dir);
2198            }
2199        }
2200        dst
2201    }
2202
2203    #[test]
2204    fn test_migrate_flat_layout_dry_run_touches_nothing() {
2205        let temp_dir = TempDir::new().unwrap();
2206        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2207        let flat = seed_flat_layout_file(&storage, 7);
2208        assert!(flat.is_file(), "test setup: flat file should exist");
2209
2210        let report = storage.migrate_flat_layout(true).unwrap();
2211        assert!(report.dry_run);
2212        assert_eq!(report.flat_files_seen, 1);
2213        assert_eq!(report.events_migrated, 7);
2214        assert_eq!(report.flat_files_removed, 0);
2215        assert_eq!(report.partitions_written, 0);
2216        assert!(
2217            flat.is_file(),
2218            "flat file must still be present after dry run"
2219        );
2220    }
2221
2222    #[test]
2223    fn test_migrate_flat_layout_moves_events_into_default_tree_and_removes_flat() {
2224        let temp_dir = TempDir::new().unwrap();
2225        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2226        let flat = seed_flat_layout_file(&storage, 5);
2227
2228        let report = storage.migrate_flat_layout(false).unwrap();
2229        assert!(!report.dry_run);
2230        assert_eq!(report.flat_files_seen, 1);
2231        assert_eq!(report.flat_files_removed, 1);
2232        assert_eq!(report.events_migrated, 5);
2233        assert!(report.partitions_written >= 1);
2234        assert!(
2235            !flat.exists(),
2236            "flat file should be deleted after migration"
2237        );
2238
2239        let post = find_parquet_files_recursive(temp_dir.path()).unwrap();
2240        assert!(
2241            post.iter().all(|p| {
2242                let rel = p
2243                    .strip_prefix(temp_dir.path())
2244                    .unwrap()
2245                    .to_string_lossy()
2246                    .into_owned();
2247                rel.starts_with(&format!("default{}", std::path::MAIN_SEPARATOR))
2248            }),
2249            "all migrated files should be under default/"
2250        );
2251
2252        let loaded = storage.load_all_events().unwrap();
2253        assert_eq!(loaded.len(), 5);
2254    }
2255
2256    #[test]
2257    fn test_migrate_flat_layout_is_idempotent_when_re_run_after_completion() {
2258        let temp_dir = TempDir::new().unwrap();
2259        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2260        let _flat = seed_flat_layout_file(&storage, 4);
2261
2262        let first = storage.migrate_flat_layout(false).unwrap();
2263        assert_eq!(first.events_migrated, 4);
2264
2265        // Second run sees no flat files at the root, so it's a no-op —
2266        // events do not duplicate even if an operator runs the tool twice.
2267        let second = storage.migrate_flat_layout(false).unwrap();
2268        assert_eq!(second.flat_files_seen, 0);
2269        assert_eq!(second.events_migrated, 0);
2270        assert_eq!(second.flat_files_removed, 0);
2271
2272        let loaded = storage.load_all_events().unwrap();
2273        assert_eq!(loaded.len(), 4, "rerun must not duplicate or lose events");
2274    }
2275
2276    #[test]
2277    fn test_migrate_flat_layout_ignores_already_partitioned_data() {
2278        // Mixed state: a tenant tree already exists alongside one flat file.
2279        // Migration must touch only the flat file.
2280        let temp_dir = TempDir::new().unwrap();
2281        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2282
2283        for i in 0..3 {
2284            storage
2285                .append_event(event_with_tenant("alice", &format!("a-{i}")))
2286                .unwrap();
2287        }
2288        storage.flush().unwrap();
2289
2290        let _flat = seed_flat_layout_file(&storage, 2);
2291
2292        let report = storage.migrate_flat_layout(false).unwrap();
2293        assert_eq!(report.flat_files_seen, 1, "only the flat file is in scope");
2294        assert_eq!(report.events_migrated, 2);
2295
2296        let alice_files = find_parquet_files_recursive(&temp_dir.path().join("alice")).unwrap();
2297        assert_eq!(alice_files.len(), 1, "alice's tree must be untouched");
2298
2299        let loaded = storage.load_all_events().unwrap();
2300        assert_eq!(loaded.len(), 5);
2301        let alice_count = loaded
2302            .iter()
2303            .filter(|e| e.tenant_id_str() == "alice")
2304            .count();
2305        let default_count = loaded
2306            .iter()
2307            .filter(|e| e.tenant_id_str() == "default")
2308            .count();
2309        assert_eq!(alice_count, 3);
2310        assert_eq!(default_count, 2);
2311    }
2312
2313    #[test]
2314    fn test_migrate_flat_layout_with_no_flat_files_is_a_clean_noop() {
2315        let temp_dir = TempDir::new().unwrap();
2316        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2317        let report = storage.migrate_flat_layout(false).unwrap();
2318        assert_eq!(report.flat_files_seen, 0);
2319        assert_eq!(report.events_migrated, 0);
2320    }
2321}