Skip to main content

allsource_core/infrastructure/persistence/
storage.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4};
5use arrow::{
6    array::{
7        Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
8        UInt64Builder,
9    },
10    datatypes::{DataType, Field, Schema, TimeUnit},
11    record_batch::RecordBatch,
12};
13use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
14use std::{
15    collections::HashMap,
16    fs::{self, File},
17    path::{Path, PathBuf},
18    sync::{
19        Arc, Mutex,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::{Duration, Instant},
23};
24
/// Default batch size for Parquet writes (10,000 events as per US-023).
/// A tenant's batch is flushed as soon as it reaches this many events.
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default flush timeout in milliseconds (5 seconds).
/// After this long with pending events, `check_timeout_flush` flushes every
/// tenant's partial batch — see `ParquetStorageConfig::flush_timeout`.
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;
30
/// Configuration for ParquetStorage batch processing.
///
/// Construct via `Default::default()`, `high_throughput()`, or
/// `low_latency()`, then pass to `ParquetStorage::with_config`.
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Batch size before automatic flush (default: 10,000)
    pub batch_size: usize,
    /// Timeout before flushing partial batch (default: 5 seconds).
    /// Only enforced when `check_timeout_flush` is called periodically.
    pub flush_timeout: Duration,
    /// Compression codec for Parquet files (default: SNAPPY)
    pub compression: parquet::basic::Compression,
}
41
42impl Default for ParquetStorageConfig {
43    fn default() -> Self {
44        Self {
45            batch_size: DEFAULT_BATCH_SIZE,
46            flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
47            compression: parquet::basic::Compression::SNAPPY,
48        }
49    }
50}
51
52impl ParquetStorageConfig {
53    /// High-throughput configuration optimized for large batch writes
54    pub fn high_throughput() -> Self {
55        Self {
56            batch_size: 50_000,
57            flush_timeout: Duration::from_secs(10),
58            compression: parquet::basic::Compression::SNAPPY,
59        }
60    }
61
62    /// Low-latency configuration for smaller, more frequent writes
63    pub fn low_latency() -> Self {
64        Self {
65            batch_size: 1_000,
66            flush_timeout: Duration::from_secs(1),
67            compression: parquet::basic::Compression::SNAPPY,
68        }
69    }
70}
71
/// Statistics for batch write operations.
///
/// Produced by `ParquetStorage::batch_stats` from the storage's atomic
/// counters; the derived fields (`avg_batch_size`, `events_per_sec`) are
/// computed at read time, not stored.
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Total batches written
    pub batches_written: u64,
    /// Total events written
    pub events_written: u64,
    /// Total bytes written
    pub bytes_written: u64,
    /// Average batch size (events_written / batches_written; 0.0 if none)
    pub avg_batch_size: f64,
    /// Events per second (throughput, derived from total_write_time_ns)
    pub events_per_sec: f64,
    /// Total write time in nanoseconds
    pub total_write_time_ns: u64,
    /// Number of timeout-triggered flushes
    pub timeout_flushes: u64,
    /// Number of size-triggered flushes
    pub size_flushes: u64,
}
92
/// Result of a batch write operation (returned by `ParquetStorage::batch_write`).
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Number of events accepted into tenant batches by this call
    pub events_written: usize,
    /// Number of tenant batches flushed to disk during this call
    pub batches_flushed: usize,
    /// Total duration of the write operation
    pub duration: Duration,
    /// Write throughput in events per second
    pub events_per_sec: f64,
}
105
/// Parquet-based persistent storage for events with batch processing
///
/// Features:
/// - Configurable batch size (default: 10,000 events per US-023)
/// - Timeout-based flushing for partial batches
/// - Thread-safe batch accumulation
/// - SNAPPY compression for efficient storage
/// - Automatic flush on shutdown via Drop
///
/// NOTE(review): no `Drop` impl is visible in this chunk — confirm the
/// "flush on Drop" claim holds elsewhere, or that callers invoke
/// `flush_on_shutdown` explicitly.
pub struct ParquetStorage {
    /// Base directory for storing parquet files
    storage_dir: PathBuf,

    /// Buffered events keyed by tenant_id. Each tenant accumulates its own
    /// batch and flushes independently into its partition under
    /// `storage_dir/<tenant_id>/<yyyy-mm>/`. Single outer mutex protects the
    /// whole map: lookup is O(1), tenant cardinality is low (single digits
    /// today; bounded by Step 3's cache budget later), so contention is
    /// fine. We keep the mutex held only for the push, not for disk I/O —
    /// flush takes ownership of a tenant's batch via remove() and writes
    /// after the lock is released.
    current_batches: Mutex<HashMap<String, Vec<Event>>>,

    /// Configuration (batch size, flush timeout, compression)
    config: ParquetStorageConfig,

    /// Schema for Arrow/Parquet; column order must match
    /// `events_to_record_batch`'s array order.
    schema: Arc<Schema>,

    /// Last flush timestamp for timeout tracking; updated by `flush_tenant`
    /// after every successful write, read by `check_timeout_flush`.
    last_flush_time: Mutex<Instant>,

    /// Statistics tracking — all updated with `Ordering::Relaxed`; readers
    /// get eventually-consistent counts, which is fine for metrics.
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}
145
146impl ParquetStorage {
147    /// Create a new ParquetStorage with default configuration (10,000 event batches)
148    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
149        Self::with_config(storage_dir, ParquetStorageConfig::default())
150    }
151
152    /// Create a new ParquetStorage with custom configuration
153    pub fn with_config(
154        storage_dir: impl AsRef<Path>,
155        config: ParquetStorageConfig,
156    ) -> Result<Self> {
157        let storage_dir = storage_dir.as_ref().to_path_buf();
158
159        // Create storage directory if it doesn't exist
160        fs::create_dir_all(&storage_dir).map_err(|e| {
161            AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
162        })?;
163
164        // Define Arrow schema for events
165        let schema = Arc::new(Schema::new(vec![
166            Field::new("event_id", DataType::Utf8, false),
167            Field::new("event_type", DataType::Utf8, false),
168            Field::new("entity_id", DataType::Utf8, false),
169            Field::new("payload", DataType::Utf8, false),
170            Field::new(
171                "timestamp",
172                DataType::Timestamp(TimeUnit::Microsecond, None),
173                false,
174            ),
175            Field::new("metadata", DataType::Utf8, true),
176            Field::new("version", DataType::UInt64, false),
177        ]));
178
179        let storage = Self {
180            storage_dir,
181            current_batches: Mutex::new(HashMap::new()),
182            config,
183            schema,
184            last_flush_time: Mutex::new(Instant::now()),
185            batches_written: AtomicU64::new(0),
186            events_written: AtomicU64::new(0),
187            bytes_written: AtomicU64::new(0),
188            total_write_time_ns: AtomicU64::new(0),
189            timeout_flushes: AtomicU64::new(0),
190            size_flushes: AtomicU64::new(0),
191        };
192
193        // Boot-time crash recovery: heal *.parquet.tmp files from
194        // crashed atomic writes (snapshot/compaction or, post-#166-fix,
195        // checkpoint flushes) and quarantine any 0-byte *.parquet files
196        // left over from pre-fix checkpoint crashes (issue #166).
197        match storage.cleanup_partial_writes() {
198            Ok(0) => {}
199            Ok(n) => tracing::warn!(
200                "cleanup_partial_writes acted on {n} crash-detritus file(s) on boot — \
201                 see preceding logs for per-file detail"
202            ),
203            Err(e) => tracing::error!("cleanup_partial_writes failed on boot: {e}"),
204        }
205
206        Ok(storage)
207    }
208
209    /// Create storage with legacy batch size (1000) for backward compatibility
210    #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
211    pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
212        Self::with_config(
213            storage_dir,
214            ParquetStorageConfig {
215                batch_size: 1000,
216                ..Default::default()
217            },
218        )
219    }
220
221    /// Add an event to the current batch
222    ///
223    /// Events are routed to a per-tenant batch keyed by `event.tenant_id_str()`.
224    /// A tenant's batch is buffered until any of:
225    /// - That tenant's batch hits the configured `batch_size` (default 10,000)
226    ///   — flushes only that tenant, not the whole world
227    /// - The flush timeout elapses — flushes every tenant with pending data
228    /// - `flush()` is called explicitly
229    /// - The process shuts down
230    #[cfg_attr(feature = "hotpath", hotpath::measure)]
231    pub fn append_event(&self, event: Event) -> Result<()> {
232        let tenant = event.tenant_id_str().to_string();
233        let should_flush_tenant = {
234            let mut batches = self.current_batches.lock().unwrap();
235            let entry = batches.entry(tenant.clone()).or_default();
236            entry.push(event);
237            entry.len() >= self.config.batch_size
238        };
239
240        if should_flush_tenant {
241            self.size_flushes.fetch_add(1, Ordering::Relaxed);
242            self.flush_tenant(&tenant)?;
243        }
244
245        Ok(())
246    }
247
248    /// Add multiple events to the batch (optimized batch insertion)
249    ///
250    /// Preferred entry point for high-throughput ingestion. Events are
251    /// grouped by tenant under a single mutex acquisition and any tenant
252    /// that crosses `batch_size` is flushed on the spot.
253    #[cfg_attr(feature = "hotpath", hotpath::measure)]
254    pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
255        let start = Instant::now();
256        let event_count = events.len();
257
258        // Pre-group by tenant to keep the lock window short — one acquire,
259        // one extend per tenant, decide which tenants are over threshold.
260        let mut grouped: HashMap<String, Vec<Event>> = HashMap::new();
261        for event in events {
262            grouped
263                .entry(event.tenant_id_str().to_string())
264                .or_default()
265                .push(event);
266        }
267
268        let mut tenants_to_flush: Vec<String> = Vec::new();
269        {
270            let mut batches = self.current_batches.lock().unwrap();
271            for (tenant, mut new_events) in grouped {
272                let entry = batches.entry(tenant.clone()).or_default();
273                entry.append(&mut new_events);
274                if entry.len() >= self.config.batch_size {
275                    tenants_to_flush.push(tenant);
276                }
277            }
278        }
279
280        let mut batches_flushed = 0;
281        for tenant in tenants_to_flush {
282            self.size_flushes.fetch_add(1, Ordering::Relaxed);
283            self.flush_tenant(&tenant)?;
284            batches_flushed += 1;
285        }
286
287        let duration = start.elapsed();
288
289        Ok(BatchWriteResult {
290            events_written: event_count,
291            batches_flushed,
292            duration,
293            events_per_sec: event_count as f64 / duration.as_secs_f64(),
294        })
295    }
296
297    /// Check if a timeout-based flush is needed and perform it
298    ///
299    /// Call this periodically (e.g., from a background task) to ensure
300    /// partial batches are flushed within the configured timeout. When
301    /// triggered, every tenant with pending events flushes — the timer is
302    /// global, not per-tenant, so a slow-trickle tenant doesn't get
303    /// stranded waiting for its own batch to fill.
304    #[cfg_attr(feature = "hotpath", hotpath::measure)]
305    pub fn check_timeout_flush(&self) -> Result<bool> {
306        let should_flush = {
307            let last_flush = self.last_flush_time.lock().unwrap();
308            let batches = self.current_batches.lock().unwrap();
309            let any_pending = batches.values().any(|v| !v.is_empty());
310            any_pending && last_flush.elapsed() >= self.config.flush_timeout
311        };
312
313        if should_flush {
314            self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
315            self.flush()?;
316            Ok(true)
317        } else {
318            Ok(false)
319        }
320    }
321
322    /// Flush every tenant's pending batch to its partition.
323    ///
324    /// Thread-safe: callable from any thread. A snapshot of which tenants
325    /// have pending data is taken under a short lock; each tenant is then
326    /// flushed individually with its own lock cycle, so disk I/O for one
327    /// tenant doesn't block writes against another.
328    #[cfg_attr(feature = "hotpath", hotpath::measure)]
329    pub fn flush(&self) -> Result<()> {
330        let tenants: Vec<String> = {
331            let batches = self.current_batches.lock().unwrap();
332            batches
333                .iter()
334                .filter(|(_, v)| !v.is_empty())
335                .map(|(k, _)| k.clone())
336                .collect()
337        };
338        if tenants.is_empty() {
339            return Ok(());
340        }
341        for tenant in tenants {
342            self.flush_tenant(&tenant)?;
343        }
344        Ok(())
345    }
346
    /// Flush a single tenant's pending events into its partition file.
    ///
    /// File path: `storage_dir/<sanitized_tenant_id>/<yyyy-mm>/events-<ts>-<uuid>.parquet`.
    /// `<yyyy-mm>` is taken from the wall-clock at flush time (matching the
    /// pre-tenant filename's timestamp semantics) rather than from
    /// individual event timestamps — keeps each flush to a single output
    /// file even when buffered events span months.
    ///
    /// The file is written atomically via `write_record_batch_atomic`:
    /// crash mid-write leaves a `.parquet.tmp` file (cleaned on next boot)
    /// rather than a 0-byte `.parquet` file with the final name. Issue
    /// #166 — pre-fix, a SIGKILL inside this function bricked subsequent
    /// loads because `load_all_events` aborted on the first unreadable
    /// file.
    fn flush_tenant(&self, tenant_id: &str) -> Result<()> {
        // Take ownership of this tenant's batch under the lock, then drop
        // the guard before any Arrow conversion or disk I/O.
        let events_to_write = {
            let mut batches = self.current_batches.lock().unwrap();
            match batches.get_mut(tenant_id) {
                // mem::take leaves an empty Vec behind; the map entry stays.
                Some(v) if !v.is_empty() => std::mem::take(v),
                // Nothing buffered for this tenant — nothing to do.
                _ => return Ok(()),
            }
        };
        // NOTE(review): from here on, the taken events exist only in this
        // local. If conversion or the write below errors (`?`), they are
        // dropped, not re-queued — TODO confirm an upstream WAL/retry path
        // covers that loss.

        let batch_count = events_to_write.len();
        let start = Instant::now();

        let record_batch = self.events_to_record_batch(&events_to_write)?;

        // Wall-clock anchors both the yyyy-mm partition and the file stem;
        // the uuid suffix keeps concurrent flushes collision-free.
        let now = chrono::Utc::now();
        let partition_dir = partition_path_for_tenant(&self.storage_dir, tenant_id, now)?;
        fs::create_dir_all(&partition_dir).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to create tenant partition {}: {e}",
                partition_dir.display()
            ))
        })?;
        let file_stem = format!(
            "events-{}-{}",
            now.format("%Y%m%d-%H%M%S%3f"),
            uuid::Uuid::new_v4().as_simple()
        );

        tracing::info!(
            "Flushing {} events for tenant={} to {}/{}.parquet",
            batch_count,
            tenant_id,
            partition_dir.display(),
            file_stem
        );

        let (file_path, file_metadata) =
            self.write_record_batch_atomic(&partition_dir, &file_stem, &record_batch)?;

        let duration = start.elapsed();

        // Metrics are Relaxed — approximate counts are fine.
        self.batches_written.fetch_add(1, Ordering::Relaxed);
        self.events_written
            .fetch_add(batch_count as u64, Ordering::Relaxed);
        // NOTE(review): only the FIRST row group's byte size is counted —
        // presumably one write() call yields one row group at these batch
        // sizes; TODO confirm for very large batches.
        if let Some(size) = file_metadata
            .row_groups()
            .first()
            .map(parquet::file::metadata::RowGroupMetaData::total_byte_size)
        {
            self.bytes_written.fetch_add(size as u64, Ordering::Relaxed);
        }
        self.total_write_time_ns
            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);

        // Reset the global timeout clock — check_timeout_flush measures
        // elapsed time since the most recent successful flush of any tenant.
        {
            let mut last_flush = self.last_flush_time.lock().unwrap();
            *last_flush = Instant::now();
        }

        tracing::info!(
            "Wrote {} events for tenant={} to {} in {:?}",
            batch_count,
            tenant_id,
            file_path.display(),
            duration
        );

        Ok(())
    }
430
    /// Write a single record batch atomically to
    /// `<partition_dir>/<file_stem>.parquet`, returning the final path
    /// and the parquet writer's metadata (used for byte-size metrics by
    /// the checkpoint flush path).
    ///
    /// Crash-safety contract — same four steps as `write_atomic_parquet`:
    /// 1. Write to `<final_path>.tmp` first.
    /// 2. fsync the .tmp file so the data is durably on disk.
    /// 3. Rename `.tmp` → final name (atomic POSIX rename).
    /// 4. fsync the parent directory so the rename survives crash.
    ///
    /// Caller is responsible for choosing `partition_dir` (and creating
    /// it). On any failure mid-way, the `.tmp` file gets cleaned up by
    /// `cleanup_partial_writes` on next boot. The final file appears
    /// atomically — readers either see the old state (no file) or the
    /// complete new file, never a half-written one.
    fn write_record_batch_atomic(
        &self,
        partition_dir: &Path,
        file_stem: &str,
        record_batch: &RecordBatch,
    ) -> Result<(PathBuf, parquet::file::metadata::ParquetMetaData)> {
        let final_path = partition_dir.join(format!("{file_stem}.parquet"));
        let tmp_path = partition_dir.join(format!("{file_stem}.parquet.tmp"));

        // 1. Write to .tmp. The block scope ensures the writer (and its
        //    File handle) is fully closed before the fsync below.
        let metadata = {
            let file = File::create(&tmp_path).map_err(|e| {
                AllSourceError::StorageError(format!(
                    "Failed to create parquet tmp file {}: {e}",
                    tmp_path.display()
                ))
            })?;

            let props = WriterProperties::builder()
                .set_compression(self.config.compression)
                .build();

            let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
            writer.write(record_batch)?;
            // close() consumes writer and returns the file metadata; the
            // underlying File is flushed and closed here.
            writer.close()?
        };

        // 2. fsync the .tmp file. Reopen by path because ArrowWriter took
        //    ownership of the original handle and closed it above.
        let tmp_file = File::open(&tmp_path).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to reopen parquet tmp for fsync {}: {e}",
                tmp_path.display()
            ))
        })?;
        tmp_file.sync_all().map_err(|e| {
            AllSourceError::StorageError(format!("fsync on parquet tmp failed: {e}"))
        })?;
        drop(tmp_file);

        // 3. Atomic rename: readers never observe a partially-written
        //    file under the final name.
        fs::rename(&tmp_path, &final_path).map_err(|e| {
            AllSourceError::StorageError(format!(
                "Failed to rename {} → {}: {e}",
                tmp_path.display(),
                final_path.display()
            ))
        })?;

        // 4. fsync the parent directory so the rename survives crash.
        // Linux fsync-on-dir is the canonical way to make a rename
        // durable; macOS no-ops the dir fsync but doesn't error.
        // Errors are deliberately ignored: best-effort durability upgrade,
        // the data itself is already fsync'd.
        if let Ok(dir) = File::open(partition_dir) {
            let _ = dir.sync_all();
        }

        Ok((final_path, metadata))
    }
506
507    /// Atomically write `events` to a Parquet file under the tenant's
508    /// partition. Step 4 of the sustainable data strategy uses this to
509    /// emit per-tenant snapshot/compaction files (`snapshot.<tenant>.<from>-<to>`)
510    /// without risking partial files on crash.
511    ///
512    /// Crash-safety contract:
513    /// 1. Write to `<final_path>.tmp` first.
514    /// 2. fsync the .tmp file so the data is durably on disk.
515    /// 3. Rename `.tmp` → final name (atomic POSIX rename).
516    /// 4. fsync the parent directory so the rename is durable.
517    ///
518    /// On any failure mid-way, the .tmp file gets cleaned up by
519    /// `cleanup_partial_writes` on next boot. The final file appears
520    /// atomically — readers either see the old state (no file) or
521    /// the complete new file, never a half-written one.
522    ///
523    /// `file_stem` is the filename without extension or partition path
524    /// — caller-controlled so the snapshot naming convention
525    /// (`snapshot.<tenant>.<from>-<to>`) lives in the compaction layer,
526    /// not here. The `.parquet` extension is appended automatically.
527    ///
528    /// Returns the final (post-rename) path so the caller can
529    /// confirm the file landed where expected.
530    pub fn write_atomic_parquet(
531        &self,
532        tenant_id: &str,
533        file_stem: &str,
534        events: &[Event],
535    ) -> Result<PathBuf> {
536        if events.is_empty() {
537            return Err(AllSourceError::StorageError(
538                "write_atomic_parquet called with empty event slice".to_string(),
539            ));
540        }
541        // Anchor partition by the earliest event's month — keeps the
542        // file in the same yyyy-mm bucket as the data it represents.
543        // Callers that span multiple months can use the earliest
544        // event's month and trust the recursive walker to find it.
545        let anchor_ts = events
546            .iter()
547            .map(|e| e.timestamp)
548            .min()
549            .unwrap_or_else(chrono::Utc::now);
550        let partition_dir = partition_path_for_tenant(&self.storage_dir, tenant_id, anchor_ts)?;
551        fs::create_dir_all(&partition_dir).map_err(|e| {
552            AllSourceError::StorageError(format!(
553                "Failed to create tenant partition {}: {e}",
554                partition_dir.display()
555            ))
556        })?;
557
558        let record_batch = self.events_to_record_batch(events)?;
559        let (final_path, _meta) =
560            self.write_record_batch_atomic(&partition_dir, file_stem, &record_batch)?;
561
562        tracing::info!(
563            tenant_id = tenant_id,
564            file = %final_path.display(),
565            event_count = events.len(),
566            "wrote atomic snapshot file"
567        );
568
569        Ok(final_path)
570    }
571
572    /// Sweep the storage tree for crash detritus and heal it on boot.
573    /// Called on boot from `ParquetStorage::new` so every cold start
574    /// starts clean. Two kinds of leftovers are handled:
575    ///
576    /// 1. **`*.parquet.tmp` files** — a snapshot or flush write that
577    ///    crashed between fsync and rename. Safe to delete: the data is
578    ///    either still in the WAL (for checkpoint flushes) or in the
579    ///    constituent raw files (for snapshot/compaction, which only
580    ///    deletes the inputs AFTER the rename succeeds). Either way, the
581    ///    failed write will be retried.
582    ///
583    /// 2. **0-byte `*.parquet` files** — issue #166. Pre-fix releases
584    ///    (≤ 0.20.0 for the checkpoint path) wrote directly to the
585    ///    final filename, so SIGKILL between `File::create` and the
586    ///    first flush left a 0-byte `events-*.parquet` that bricked
587    ///    every subsequent load. Rather than delete, we rename to
588    ///    `<original>.parquet.corrupt-<unix-ts>` so an operator can
589    ///    inspect/forensicate before destroying the evidence — most
590    ///    will just `rm` after seeing the size.
591    ///
592    /// Returns the total number of files acted on, for boot-log
593    /// observability.
594    pub fn cleanup_partial_writes(&self) -> Result<usize> {
595        let mut acted = 0usize;
596        let mut stack: Vec<PathBuf> = vec![self.storage_dir.clone()];
597        while let Some(dir) = stack.pop() {
598            let Ok(entries) = fs::read_dir(&dir) else {
599                continue;
600            };
601            for entry in entries.flatten() {
602                let path = entry.path();
603                let Ok(ft) = entry.file_type() else { continue };
604                if ft.is_dir() {
605                    stack.push(path);
606                    continue;
607                }
608                if !ft.is_file() {
609                    continue;
610                }
611                let path_str = path.to_string_lossy();
612                if path_str.ends_with(".parquet.tmp") {
613                    match fs::remove_file(&path) {
614                        Ok(()) => {
615                            tracing::warn!(
616                                file = %path.display(),
617                                "cleaned up orphan snapshot tmp file (crash recovery)"
618                            );
619                            acted += 1;
620                        }
621                        Err(e) => {
622                            tracing::error!(
623                                file = %path.display(),
624                                "failed to remove orphan snapshot tmp file: {e}"
625                            );
626                        }
627                    }
628                } else if path_str.ends_with(".parquet")
629                    && fs::metadata(&path).is_ok_and(|m| m.len() == 0)
630                {
631                    // Issue #166: pre-fix checkpoint write left a 0-byte
632                    // .parquet file with the final name on crash. Quarantine
633                    // (rename) rather than delete — operator can inspect.
634                    let ts = chrono::Utc::now().timestamp();
635                    let quarantine_path = path.with_extension(format!("parquet.corrupt-{ts}"));
636                    match fs::rename(&path, &quarantine_path) {
637                        Ok(()) => {
638                            tracing::error!(
639                                from = %path.display(),
640                                to = %quarantine_path.display(),
641                                "quarantined 0-byte parquet file (issue #166 pre-fix crash). \
642                                 Operator: inspect and rm if not needed."
643                            );
644                            acted += 1;
645                        }
646                        Err(e) => {
647                            tracing::error!(
648                                file = %path.display(),
649                                "failed to quarantine 0-byte parquet file: {e}"
650                            );
651                        }
652                    }
653                }
654            }
655        }
656        Ok(acted)
657    }
658
659    /// Force flush any remaining events (for shutdown handling).
660    ///
661    /// Sums pending counts across every tenant's batch so the caller can
662    /// log "we flushed N events on shutdown" without caring about
663    /// per-tenant breakdown.
664    #[cfg_attr(feature = "hotpath", hotpath::measure)]
665    pub fn flush_on_shutdown(&self) -> Result<usize> {
666        let total_pending: usize = {
667            let batches = self.current_batches.lock().unwrap();
668            batches.values().map(Vec::len).sum()
669        };
670
671        if total_pending > 0 {
672            tracing::info!(
673                "Shutdown: flushing {} pending events across all tenants",
674                total_pending
675            );
676            self.flush()?;
677        }
678
679        Ok(total_pending)
680    }
681
682    /// Get batch write statistics
683    pub fn batch_stats(&self) -> BatchWriteStats {
684        let batches = self.batches_written.load(Ordering::Relaxed);
685        let events = self.events_written.load(Ordering::Relaxed);
686        let bytes = self.bytes_written.load(Ordering::Relaxed);
687        let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);
688
689        let time_secs = time_ns as f64 / 1_000_000_000.0;
690
691        BatchWriteStats {
692            batches_written: batches,
693            events_written: events,
694            bytes_written: bytes,
695            avg_batch_size: if batches > 0 {
696                events as f64 / batches as f64
697            } else {
698                0.0
699            },
700            events_per_sec: if time_secs > 0.0 {
701                events as f64 / time_secs
702            } else {
703                0.0
704            },
705            total_write_time_ns: time_ns,
706            timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
707            size_flushes: self.size_flushes.load(Ordering::Relaxed),
708        }
709    }
710
711    /// Total pending events across all tenant batches.
712    pub fn pending_count(&self) -> usize {
713        self.current_batches
714            .lock()
715            .unwrap()
716            .values()
717            .map(Vec::len)
718            .sum()
719    }
720
    /// Get configured batch size (events per tenant batch before a
    /// size-triggered flush).
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }

    /// Get configured flush timeout (used by `check_timeout_flush`).
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }
730
731    /// Convert events to Arrow RecordBatch
732    #[cfg_attr(feature = "hotpath", hotpath::measure)]
733    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
734        let mut event_id_builder = StringBuilder::new();
735        let mut event_type_builder = StringBuilder::new();
736        let mut entity_id_builder = StringBuilder::new();
737        let mut payload_builder = StringBuilder::new();
738        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
739        let mut metadata_builder = StringBuilder::new();
740        let mut version_builder = UInt64Builder::new();
741
742        for event in events {
743            event_id_builder.append_value(event.id.to_string());
744            event_type_builder.append_value(event.event_type_str());
745            entity_id_builder.append_value(event.entity_id_str());
746            payload_builder.append_value(serde_json::to_string(&event.payload)?);
747
748            // Convert timestamp to microseconds
749            let timestamp_micros = event.timestamp.timestamp_micros();
750            timestamp_builder.append_value(timestamp_micros);
751
752            if let Some(ref metadata) = event.metadata {
753                metadata_builder.append_value(serde_json::to_string(metadata)?);
754            } else {
755                metadata_builder.append_null();
756            }
757
758            version_builder.append_value(event.version as u64);
759        }
760
761        let arrays: Vec<ArrayRef> = vec![
762            Arc::new(event_id_builder.finish()),
763            Arc::new(event_type_builder.finish()),
764            Arc::new(entity_id_builder.finish()),
765            Arc::new(payload_builder.finish()),
766            Arc::new(timestamp_builder.finish()),
767            Arc::new(metadata_builder.finish()),
768            Arc::new(version_builder.finish()),
769        ];
770
771        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;
772
773        Ok(record_batch)
774    }
775
776    /// Load events from all Parquet files under the storage directory.
777    ///
778    /// Walks the tree recursively so both layouts work: legacy flat
779    /// (`storage_dir/events-*.parquet`) and the tenant-partitioned tree
780    /// introduced by the data-strategy work (`storage_dir/<tenant>/<yyyy-mm>/
781    /// events-*.parquet`). The two coexist on disk during migration.
782    #[cfg_attr(feature = "hotpath", hotpath::measure)]
783    pub fn load_all_events(&self) -> Result<Vec<Event>> {
784        let parquet_files = find_parquet_files_recursive(&self.storage_dir)?;
785
786        let mut all_events = Vec::with_capacity(parquet_files.len() * self.config.batch_size);
787        let mut skipped = 0usize;
788        for file_path in parquet_files {
789            tracing::info!("Loading events from {}", file_path.display());
790            let tenant_id = tenant_id_from_path(&self.storage_dir, &file_path);
791            // Issue #166: a single corrupt/0-byte parquet file (crash mid-write
792            // on pre-fix releases) would propagate up and abort the whole load,
793            // bricking access to every healthy file alongside it. Log and skip
794            // instead so one bad file can't take the rest down.
795            match self.load_events_from_file(&file_path, &tenant_id) {
796                Ok(file_events) => all_events.extend(file_events),
797                Err(e) => {
798                    tracing::error!(
799                        file = %file_path.display(),
800                        error = %e,
801                        "Skipping unreadable parquet file — other files will still load. \
802                         Likely a 0-byte or truncated file from an unclean shutdown; \
803                         inspect and remove manually after confirming."
804                    );
805                    skipped += 1;
806                }
807            }
808        }
809
810        if skipped > 0 {
811            tracing::warn!(
812                "Loaded {} events from storage; skipped {} unreadable file(s)",
813                all_events.len(),
814                skipped
815            );
816        } else {
817            tracing::info!("Loaded {} total events from storage", all_events.len());
818        }
819
820        Ok(all_events)
821    }
822
823    /// Load events from a single Parquet file. `tenant_id` is the value to
824    /// stamp onto each loaded event — derived from the file's location in
825    /// the tree by `load_all_events`. The Parquet schema doesn't include
826    /// tenant_id today (path is the source of truth), so this is how
827    /// per-tenant identity survives the round trip.
828    #[cfg_attr(feature = "hotpath", hotpath::measure)]
829    fn load_events_from_file(&self, file_path: &Path, tenant_id: &str) -> Result<Vec<Event>> {
830        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
831
832        let file = File::open(file_path).map_err(|e| {
833            AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
834        })?;
835
836        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
837        let mut reader = builder.build()?;
838
839        let mut events = Vec::new();
840
841        while let Some(Ok(batch)) = reader.next() {
842            let batch_events = self.record_batch_to_events(&batch, tenant_id)?;
843            events.extend(batch_events);
844        }
845
846        Ok(events)
847    }
848
849    /// Public wrapper around the internal single-file loader. Used
850    /// by the per-tenant compaction (Step 4) which needs to read a
851    /// specific candidate set rather than the whole tenant subtree.
852    /// Caller passes the tenant_id explicitly because the schema
853    /// doesn't carry it.
854    pub fn load_events_from_file_path(
855        &self,
856        file_path: &Path,
857        tenant_id: &str,
858    ) -> Result<Vec<Event>> {
859        self.load_events_from_file(file_path, tenant_id)
860    }
861
862    /// Convert Arrow RecordBatch back to events. `tenant_id` is stamped onto
863    /// each reconstructed event — the schema doesn't carry it today, so the
864    /// caller passes the value derived from the file path.
865    #[cfg_attr(feature = "hotpath", hotpath::measure)]
866    fn record_batch_to_events(&self, batch: &RecordBatch, tenant_id: &str) -> Result<Vec<Event>> {
867        let event_ids = batch
868            .column(0)
869            .as_any()
870            .downcast_ref::<arrow::array::StringArray>()
871            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;
872
873        let event_types = batch
874            .column(1)
875            .as_any()
876            .downcast_ref::<arrow::array::StringArray>()
877            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;
878
879        let entity_ids = batch
880            .column(2)
881            .as_any()
882            .downcast_ref::<arrow::array::StringArray>()
883            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;
884
885        let payloads = batch
886            .column(3)
887            .as_any()
888            .downcast_ref::<arrow::array::StringArray>()
889            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;
890
891        let timestamps = batch
892            .column(4)
893            .as_any()
894            .downcast_ref::<TimestampMicrosecondArray>()
895            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;
896
897        let metadatas = batch
898            .column(5)
899            .as_any()
900            .downcast_ref::<arrow::array::StringArray>()
901            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;
902
903        let versions = batch
904            .column(6)
905            .as_any()
906            .downcast_ref::<arrow::array::UInt64Array>()
907            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;
908
909        let mut events = Vec::new();
910
911        for i in 0..batch.num_rows() {
912            let id = uuid::Uuid::parse_str(event_ids.value(i))
913                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;
914
915            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
916                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;
917
918            let metadata = if metadatas.is_null(i) {
919                None
920            } else {
921                Some(serde_json::from_str(metadatas.value(i))?)
922            };
923
924            let event = Event::reconstruct_from_strings(
925                id,
926                event_types.value(i).to_string(),
927                entity_ids.value(i).to_string(),
928                tenant_id.to_string(),
929                serde_json::from_str(payloads.value(i))?,
930                timestamp,
931                metadata,
932                versions.value(i) as i64,
933            );
934
935            events.push(event);
936        }
937
938        Ok(events)
939    }
940
941    /// List all Parquet file paths under the storage directory, sorted by
942    /// the relative path so files in the same partition stay grouped.
943    ///
944    /// Used by the replication catch-up protocol to stream snapshot files
945    /// to followers that are too far behind for WAL-only catch-up.
946    pub fn list_parquet_files(&self) -> Result<Vec<PathBuf>> {
947        find_parquet_files_recursive(&self.storage_dir)
948    }
949
950    /// List Parquet files belonging to a single tenant — i.e. only files
951    /// under `<storage_dir>/<tenant>/...`. Legacy flat-layout files at the
952    /// root are intentionally excluded; the migration tool moves them under
953    /// `default/` so once it has run a `tenant=default` query sees them.
954    ///
955    /// Returns an empty vec when the tenant subtree doesn't exist (no data
956    /// for that tenant yet). Returns an error only if `tenant_id` fails the
957    /// path-safety whitelist.
958    ///
959    /// This is the building block for tenant-scoped reads: the caller knows
960    /// which files might contain the tenant's data without opening any of
961    /// the others.
962    pub fn list_parquet_files_for_tenant(&self, tenant_id: &str) -> Result<Vec<PathBuf>> {
963        let safe = sanitize_tenant_id_for_path(tenant_id)?;
964        let tenant_root = self.storage_dir.join(safe);
965        if !tenant_root.is_dir() {
966            return Ok(Vec::new());
967        }
968        find_parquet_files_recursive(&tenant_root)
969    }
970
971    /// Load only the events belonging to `tenant_id`, walking just that
972    /// tenant's subtree on disk. The full-storage loader
973    /// (`load_all_events`) opens every Parquet file regardless of tenant;
974    /// this one only opens files under `<storage_dir>/<tenant>/`.
975    ///
976    /// Returns an empty vec when the tenant has no on-disk data. Returns
977    /// an error if the tenant_id fails the path-safety whitelist or any
978    /// individual file fails to load.
979    ///
980    /// This is the read-side complement to per-tenant flushing. It's the
981    /// foundation Step 2 (lazy per-tenant load on demand) needs: a way to
982    /// hydrate one tenant without paying the cost of loading every other
983    /// tenant's data into memory.
984    ///
985    /// Tenant identity for loaded events comes from the file path, the
986    /// same as `load_all_events` — `record_batch_to_events` stamps the
987    /// passed `tenant_id` onto every reconstructed event.
988    #[cfg_attr(feature = "hotpath", hotpath::measure)]
989    pub fn load_events_for_tenant(&self, tenant_id: &str) -> Result<Vec<Event>> {
990        let parquet_files = self.list_parquet_files_for_tenant(tenant_id)?;
991        tracing::info!(
992            tenant_id = tenant_id,
993            file_count = parquet_files.len(),
994            "load_events_for_tenant: walking tenant subtree only"
995        );
996
997        let mut events = Vec::with_capacity(parquet_files.len() * self.config.batch_size);
998        let mut skipped = 0usize;
999        for file_path in parquet_files {
1000            tracing::debug!(
1001                tenant_id = tenant_id,
1002                file = %file_path.display(),
1003                "load_events_for_tenant: opening file"
1004            );
1005            // Issue #166: same defensive log-and-skip as load_all_events.
1006            // The lazy-load path can hit a corrupt file just as easily as
1007            // boot can — failing the whole tenant load on one bad file
1008            // would brick every query for that tenant.
1009            match self.load_events_from_file(&file_path, tenant_id) {
1010                Ok(file_events) => events.extend(file_events),
1011                Err(e) => {
1012                    tracing::error!(
1013                        tenant_id = tenant_id,
1014                        file = %file_path.display(),
1015                        error = %e,
1016                        "Skipping unreadable parquet file in tenant subtree"
1017                    );
1018                    skipped += 1;
1019                }
1020            }
1021        }
1022
1023        tracing::info!(
1024            tenant_id = tenant_id,
1025            event_count = events.len(),
1026            skipped_files = skipped,
1027            "load_events_for_tenant: complete"
1028        );
1029        Ok(events)
1030    }
1031
1032    /// Get the storage directory path.
1033    pub fn storage_dir(&self) -> &Path {
1034        &self.storage_dir
1035    }
1036
1037    /// One-shot migration of flat-layout files into the tenant-partitioned
1038    /// tree. Run with Core stopped (no concurrent writes).
1039    ///
1040    /// Walks `storage_dir`'s top level (non-recursive) for the legacy
1041    /// `events-*.parquet` files. For each one it loads the events,
1042    /// regroups them by (tenant_id, yyyy-mm) — events from a flat file
1043    /// take the path-derived "default" tenant, since pre-partitioning
1044    /// data carried no tenant in its on-disk form — writes a fresh
1045    /// Parquet under the corresponding partition directory, and deletes
1046    /// the original flat file once the new file is closed.
1047    ///
1048    /// `dry_run = true` reports what would happen without touching disk.
1049    /// Run dry first; production data deserves the rehearsal.
1050    ///
1051    /// Crash safety: this writes the new partition file before deleting
1052    /// the flat file, so a crash between the two leaves both on disk.
1053    /// The recursive loader (`load_all_events`) will then return both,
1054    /// duplicating those events on next boot. Mitigation: stop Core
1055    /// before running, and re-run the migration after any crash so the
1056    /// flat file gets deleted. A future commit can add atomic rename +
1057    /// fsync semantics; for the one-time migration the stop-Core
1058    /// constraint is enough.
1059    pub fn migrate_flat_layout(&self, dry_run: bool) -> Result<MigrationReport> {
1060        let flat_files = list_flat_layout_files(&self.storage_dir)?;
1061        let mut report = MigrationReport {
1062            dry_run,
1063            ..Default::default()
1064        };
1065
1066        for flat_file in flat_files {
1067            // Pre-partition events used path-derived tenant. For flat-layout
1068            // files that's always "default" (`tenant_id_from_path` falls back
1069            // to "default" for single-component paths).
1070            let events = self.load_events_from_file(&flat_file, "default")?;
1071            report.flat_files_seen += 1;
1072
1073            if events.is_empty() {
1074                // Stale empty file (zero rows). Just remove it.
1075                if !dry_run {
1076                    fs::remove_file(&flat_file).map_err(|e| {
1077                        AllSourceError::StorageError(format!(
1078                            "Failed to remove empty flat file {}: {e}",
1079                            flat_file.display()
1080                        ))
1081                    })?;
1082                }
1083                report.flat_files_removed += 1;
1084                continue;
1085            }
1086
1087            // Group by (tenant, yyyy-mm-from-event-timestamp). The
1088            // partition month tracks the event's wall-clock time so that
1089            // post-migration the layout reflects when data happened, not
1090            // when migration ran. Step 4 (per-tenant snapshots) and Step
1091            // 5 (retention) will key on that.
1092            let mut groups: HashMap<(String, String), Vec<Event>> = HashMap::new();
1093            for event in events {
1094                let key = (
1095                    event.tenant_id_str().to_string(),
1096                    event.timestamp().format("%Y-%m").to_string(),
1097                );
1098                groups.entry(key).or_default().push(event);
1099            }
1100
1101            for ((tenant, yyyy_mm), group_events) in groups {
1102                let count = group_events.len();
1103                if !dry_run {
1104                    let safe_tenant = sanitize_tenant_id_for_path(&tenant)?;
1105                    let target_dir = self.storage_dir.join(safe_tenant).join(&yyyy_mm);
1106                    fs::create_dir_all(&target_dir).map_err(|e| {
1107                        AllSourceError::StorageError(format!(
1108                            "Failed to create partition {}: {e}",
1109                            target_dir.display()
1110                        ))
1111                    })?;
1112                    let filename = format!(
1113                        "events-{}-{}.parquet",
1114                        chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
1115                        uuid::Uuid::new_v4().as_simple()
1116                    );
1117                    let target_path = target_dir.join(&filename);
1118                    let record_batch = self.events_to_record_batch(&group_events)?;
1119                    let file = File::create(&target_path).map_err(|e| {
1120                        AllSourceError::StorageError(format!(
1121                            "Failed to create migration target {}: {e}",
1122                            target_path.display()
1123                        ))
1124                    })?;
1125                    let props = WriterProperties::builder()
1126                        .set_compression(self.config.compression)
1127                        .build();
1128                    let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
1129                    writer.write(&record_batch)?;
1130                    writer.close()?;
1131                    report.partitions_written += 1;
1132                }
1133                report.events_migrated += count;
1134            }
1135
1136            if !dry_run {
1137                fs::remove_file(&flat_file).map_err(|e| {
1138                    AllSourceError::StorageError(format!(
1139                        "Failed to remove flat file {} after migration: {e}",
1140                        flat_file.display()
1141                    ))
1142                })?;
1143                report.flat_files_removed += 1;
1144            }
1145        }
1146
1147        Ok(report)
1148    }
1149
1150    /// Get storage statistics
1151    pub fn stats(&self) -> Result<StorageStats> {
1152        let parquet_files = find_parquet_files_recursive(&self.storage_dir)?;
1153        let mut total_size_bytes = 0u64;
1154        for path in &parquet_files {
1155            if let Ok(metadata) = fs::metadata(path) {
1156                total_size_bytes += metadata.len();
1157            }
1158        }
1159
1160        let current_batch_size: usize = self
1161            .current_batches
1162            .lock()
1163            .unwrap()
1164            .values()
1165            .map(Vec::len)
1166            .sum();
1167
1168        Ok(StorageStats {
1169            total_files: parquet_files.len(),
1170            total_size_bytes,
1171            storage_dir: self.storage_dir.clone(),
1172            current_batch_size,
1173        })
1174    }
1175}
1176
1177/// Validate a tenant ID for use as a filesystem path component.
1178///
1179/// Whitelist: ASCII letters, digits, `-`, `_`, `.` — covers UUIDs, the
1180/// hyphen-and-lowercase tenant strings the onboarding flow produces, and
1181/// the `system` tenant the heartbeat emitter uses. Rejects empty input,
1182/// any path separator (`/`, `\`), and any "..". The whitelist is the
1183/// primary defence against path traversal; the explicit ".." check is
1184/// belt-and-braces in case the whitelist ever loosens.
1185///
1186/// Length capped at 128 bytes — comfortably above the 36-byte UUID and
1187/// the longest onboarding tenant the system has produced, well below
1188/// every common filesystem's NAME_MAX (typically 255).
1189fn sanitize_tenant_id_for_path(tenant_id: &str) -> Result<&str> {
1190    if tenant_id.is_empty() {
1191        return Err(AllSourceError::StorageError(
1192            "tenant_id is empty (cannot derive partition path)".to_string(),
1193        ));
1194    }
1195    if tenant_id.len() > 128 {
1196        return Err(AllSourceError::StorageError(format!(
1197            "tenant_id is too long for partition path: {} bytes (max 128)",
1198            tenant_id.len()
1199        )));
1200    }
1201    if tenant_id == "." || tenant_id == ".." {
1202        return Err(AllSourceError::StorageError(format!(
1203            "tenant_id {tenant_id:?} is reserved"
1204        )));
1205    }
1206    for c in tenant_id.chars() {
1207        let ok = c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.';
1208        if !ok {
1209            return Err(AllSourceError::StorageError(format!(
1210                "tenant_id {tenant_id:?} contains disallowed character {c:?} for partition path"
1211            )));
1212        }
1213    }
1214    Ok(tenant_id)
1215}
1216
1217/// Resolve the directory a flush should write into for `(tenant, when)`.
1218///
1219/// Returns `<root>/<tenant>/<yyyy-mm>/`. Caller is responsible for
1220/// `create_dir_all`-ing the result before opening files in it.
1221fn partition_path_for_tenant(
1222    root: &Path,
1223    tenant_id: &str,
1224    when: chrono::DateTime<chrono::Utc>,
1225) -> Result<PathBuf> {
1226    let safe = sanitize_tenant_id_for_path(tenant_id)?;
1227    Ok(root.join(safe).join(when.format("%Y-%m").to_string()))
1228}
1229
/// Reverse of `partition_path_for_tenant` — given a parquet file's full
/// path and the storage root, return the tenant_id stored in the path.
///
/// Tenant-partitioned shape: `<root>/<tenant>/<yyyy-mm>/events-*.parquet`
/// → first component after root is the tenant.
///
/// Legacy flat shape: `<root>/events-*.parquet` → no tenant in path, fall
/// back to `"default"` so events written before the partitioning change
/// keep loading with their original (and only ever) tenant identity.
fn tenant_id_from_path(root: &Path, file_path: &Path) -> String {
    file_path
        .strip_prefix(root)
        .ok()
        .and_then(|rel| {
            let mut parts = rel.components();
            match (parts.next(), parts.next()) {
                // Two or more components: <tenant>/<rest>... → tenant.
                (Some(std::path::Component::Normal(tenant)), Some(_)) => {
                    Some(tenant.to_string_lossy().into_owned())
                }
                // Single component (the parquet file itself): legacy flat
                // layout — no tenant encoded in the path.
                _ => None,
            }
        })
        .unwrap_or_else(|| "default".to_string())
}
1255
1256/// List Parquet files at the top level of `root` only — i.e. the legacy
1257/// flat-layout files. Used by the one-shot migration tool to find data
1258/// that needs moving into the tenant-partitioned tree. The opposite of
1259/// `find_parquet_files_recursive`: stops at the first directory level so
1260/// already-partitioned data isn't included.
1261fn list_flat_layout_files(root: &Path) -> Result<Vec<PathBuf>> {
1262    let entries = fs::read_dir(root).map_err(|e| {
1263        AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
1264    })?;
1265    let mut out: Vec<PathBuf> = entries
1266        .filter_map(std::result::Result::ok)
1267        .filter_map(|entry| {
1268            let ft = entry.file_type().ok()?;
1269            if !ft.is_file() {
1270                return None;
1271            }
1272            let path = entry.path();
1273            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
1274                Some(path)
1275            } else {
1276                None
1277            }
1278        })
1279        .collect();
1280    out.sort();
1281    Ok(out)
1282}
1283
1284/// Recursively collect all `*.parquet` files under `root`, sorted by path so
1285/// callers see a deterministic, tenant-grouped order.
1286///
1287/// Existence rationale: the storage layout is moving from a flat
1288/// `storage_dir/events-*.parquet` pile to a tenant-partitioned tree of the
1289/// shape `storage_dir/<tenant>/<yyyy-mm>/events-*.parquet`. During the
1290/// migration both shapes coexist, so every code path that asks "what
1291/// parquet files do we have?" needs to walk subdirectories. Symlinks are
1292/// not followed — the storage tree is mounted from a single volume and
1293/// chasing symlinks invites cycles.
1294fn find_parquet_files_recursive(root: &Path) -> Result<Vec<PathBuf>> {
1295    let mut out = Vec::new();
1296    let mut stack: Vec<PathBuf> = vec![root.to_path_buf()];
1297
1298    while let Some(dir) = stack.pop() {
1299        let entries = match fs::read_dir(&dir) {
1300            Ok(e) => e,
1301            // Root must exist (we created it in `new`); subdirectories may
1302            // race a delete from compaction. Skip vanished subdirs rather
1303            // than failing the whole load.
1304            Err(e) if dir == root => {
1305                return Err(AllSourceError::StorageError(format!(
1306                    "Failed to read storage directory: {e}"
1307                )));
1308            }
1309            Err(_) => continue,
1310        };
1311
1312        for entry in entries.flatten() {
1313            let path = entry.path();
1314            // Use file_type() rather than metadata() so symlinks don't get
1315            // followed by accident (metadata() resolves symlinks, file_type()
1316            // doesn't).
1317            let Ok(ft) = entry.file_type() else {
1318                continue;
1319            };
1320            if ft.is_dir() {
1321                stack.push(path);
1322            } else if ft.is_file()
1323                && path
1324                    .extension()
1325                    .and_then(|ext| ext.to_str())
1326                    .is_some_and(|ext| ext == "parquet")
1327            {
1328                out.push(path);
1329            }
1330        }
1331    }
1332
1333    out.sort();
1334    Ok(out)
1335}
1336
1337impl Drop for ParquetStorage {
1338    fn drop(&mut self) {
1339        // Ensure any remaining events are flushed on shutdown
1340        if let Err(e) = self.flush_on_shutdown() {
1341            tracing::error!("Failed to flush events on drop: {}", e);
1342        }
1343    }
1344}
1345
/// Outcome of a `migrate_flat_layout` run. Derives `serde::Serialize` so
/// it can be emitted as structured output.
#[derive(Debug, Default, Clone, serde::Serialize)]
pub struct MigrationReport {
    /// Whether the run was a rehearsal (no disk changes).
    pub dry_run: bool,
    /// Number of legacy flat-layout files discovered at the storage root.
    pub flat_files_seen: usize,
    /// Number of legacy flat files deleted (always 0 when `dry_run`).
    pub flat_files_removed: usize,
    /// Number of new partition files written under the tenant tree.
    pub partitions_written: usize,
    /// Total events copied into the new tree (counted in dry-run too).
    pub events_migrated: usize,
}
1360
/// Point-in-time snapshot of storage state, as returned by
/// `ParquetStorage::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    /// Number of Parquet files found under the storage directory.
    pub total_files: usize,
    /// Combined size in bytes of those files (unreadable files excluded).
    pub total_size_bytes: u64,
    /// Root directory the storage instance operates on.
    pub storage_dir: PathBuf,
    /// Events currently buffered in memory, awaiting flush.
    pub current_batch_size: usize,
}
1368
1369#[cfg(test)]
1370mod tests {
1371    use super::*;
1372    use serde_json::json;
1373    use std::sync::Arc;
1374    use tempfile::TempDir;
1375
1376    fn create_test_event(entity_id: &str) -> Event {
1377        Event::reconstruct_from_strings(
1378            uuid::Uuid::new_v4(),
1379            "test.event".to_string(),
1380            entity_id.to_string(),
1381            "default".to_string(),
1382            json!({
1383                "test": "data",
1384                "value": 42
1385            }),
1386            chrono::Utc::now(),
1387            None,
1388            1,
1389        )
1390    }
1391
1392    #[test]
1393    fn test_parquet_storage_write_read() {
1394        let temp_dir = TempDir::new().unwrap();
1395        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1396
1397        // Add events
1398        for i in 0..10 {
1399            let event = create_test_event(&format!("entity-{i}"));
1400            storage.append_event(event).unwrap();
1401        }
1402
1403        // Flush to disk
1404        storage.flush().unwrap();
1405
1406        // Load back
1407        let loaded_events = storage.load_all_events().unwrap();
1408        assert_eq!(loaded_events.len(), 10);
1409    }
1410
1411    #[test]
1412    fn test_storage_stats() {
1413        let temp_dir = TempDir::new().unwrap();
1414        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1415
1416        // Add and flush events
1417        for i in 0..5 {
1418            storage
1419                .append_event(create_test_event(&format!("entity-{i}")))
1420                .unwrap();
1421        }
1422        storage.flush().unwrap();
1423
1424        let stats = storage.stats().unwrap();
1425        assert_eq!(stats.total_files, 1);
1426        assert!(stats.total_size_bytes > 0);
1427    }
1428
1429    #[test]
1430    fn test_default_batch_size() {
1431        let temp_dir = TempDir::new().unwrap();
1432        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1433
1434        // Default batch size should be 10,000 as per US-023
1435        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
1436        assert_eq!(storage.batch_size(), 10_000);
1437    }
1438
1439    #[test]
1440    fn test_custom_config() {
1441        let temp_dir = TempDir::new().unwrap();
1442        let config = ParquetStorageConfig {
1443            batch_size: 5_000,
1444            flush_timeout: Duration::from_secs(2),
1445            compression: parquet::basic::Compression::SNAPPY,
1446        };
1447        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1448
1449        assert_eq!(storage.batch_size(), 5_000);
1450        assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
1451    }
1452
1453    #[test]
1454    fn test_batch_write() {
1455        let temp_dir = TempDir::new().unwrap();
1456        let config = ParquetStorageConfig {
1457            batch_size: 100, // Small batch for testing
1458            ..Default::default()
1459        };
1460        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1461
1462        // 250 events for a single tenant. With per-tenant flush, when the
1463        // tenant's pending batch crosses batch_size we drain the whole
1464        // tenant in one flush — not chunk at exactly batch_size like the
1465        // old global-batch path did. So 250 events triggers exactly one
1466        // size-flush (the appender pushes all 250 onto the tenant's batch
1467        // under one lock, sees length >= 100, schedules a flush which
1468        // drains everything). 0 left pending.
1469        let events: Vec<Event> = (0..250)
1470            .map(|i| create_test_event(&format!("entity-{i}")))
1471            .collect();
1472
1473        let result = storage.batch_write(events).unwrap();
1474        assert_eq!(result.events_written, 250);
1475        assert_eq!(result.batches_flushed, 1);
1476        assert_eq!(storage.pending_count(), 0);
1477
1478        // Manual flush is a no-op since nothing's pending.
1479        storage.flush().unwrap();
1480
1481        // All 250 events round-trip through the tenant-partitioned tree.
1482        let loaded = storage.load_all_events().unwrap();
1483        assert_eq!(loaded.len(), 250);
1484    }
1485
1486    #[test]
1487    fn test_auto_flush_on_batch_size() {
1488        let temp_dir = TempDir::new().unwrap();
1489        let config = ParquetStorageConfig {
1490            batch_size: 10, // Very small for testing
1491            ..Default::default()
1492        };
1493        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1494
1495        // Add 15 events - should auto-flush at 10
1496        for i in 0..15 {
1497            storage
1498                .append_event(create_test_event(&format!("entity-{i}")))
1499                .unwrap();
1500        }
1501
1502        // Should have 5 pending, 10 written
1503        assert_eq!(storage.pending_count(), 5);
1504
1505        let stats = storage.batch_stats();
1506        assert_eq!(stats.events_written, 10);
1507        assert_eq!(stats.batches_written, 1);
1508        assert_eq!(stats.size_flushes, 1);
1509    }
1510
1511    #[test]
1512    fn test_flush_on_shutdown() {
1513        let temp_dir = TempDir::new().unwrap();
1514        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1515
1516        // Add some events without reaching batch size
1517        for i in 0..5 {
1518            storage
1519                .append_event(create_test_event(&format!("entity-{i}")))
1520                .unwrap();
1521        }
1522
1523        assert_eq!(storage.pending_count(), 5);
1524
1525        // Manually trigger shutdown flush
1526        let flushed = storage.flush_on_shutdown().unwrap();
1527        assert_eq!(flushed, 5);
1528        assert_eq!(storage.pending_count(), 0);
1529
1530        // Verify events are persisted
1531        let loaded = storage.load_all_events().unwrap();
1532        assert_eq!(loaded.len(), 5);
1533    }
1534
    #[test]
    fn test_thread_safe_writes() {
        // Concurrency check: 4 threads each append 50 events through a
        // shared Arc. batch_size=100 means flushes can interleave with
        // appends from other threads; no event may be lost or duplicated.
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());

        let events_per_thread = 50;
        let thread_count = 4;

        // thread::scope joins all spawned threads before returning, so
        // every append has completed by the time we flush below.
        std::thread::scope(|s| {
            for t in 0..thread_count {
                let storage_ref = Arc::clone(&storage);
                s.spawn(move || {
                    for i in 0..events_per_thread {
                        // Per-thread entity ids keep the events distinct.
                        let event = create_test_event(&format!("thread-{t}-entity-{i}"));
                        storage_ref.append_event(event).unwrap();
                    }
                });
            }
        });

        // Flush remaining
        storage.flush().unwrap();

        // All events should be written
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), events_per_thread * thread_count);
    }
1566
1567    #[test]
1568    fn test_batch_stats() {
1569        let temp_dir = TempDir::new().unwrap();
1570        let config = ParquetStorageConfig {
1571            batch_size: 50,
1572            ..Default::default()
1573        };
1574        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1575
1576        // 100 events, single tenant, batch_size=50. Per-tenant flush
1577        // drains the whole tenant on the first size trigger, so this
1578        // produces exactly one size-flush and one batches_written event
1579        // (vs. the pre-tenant world's two).
1580        let events: Vec<Event> = (0..100)
1581            .map(|i| create_test_event(&format!("entity-{i}")))
1582            .collect();
1583
1584        storage.batch_write(events).unwrap();
1585
1586        let stats = storage.batch_stats();
1587        assert_eq!(stats.batches_written, 1);
1588        assert_eq!(stats.events_written, 100);
1589        assert!(stats.avg_batch_size > 0.0);
1590        assert!(stats.events_per_sec > 0.0);
1591        assert_eq!(stats.size_flushes, 1);
1592    }
1593
1594    #[test]
1595    fn test_config_presets() {
1596        let high_throughput = ParquetStorageConfig::high_throughput();
1597        assert_eq!(high_throughput.batch_size, 50_000);
1598        assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));
1599
1600        let low_latency = ParquetStorageConfig::low_latency();
1601        assert_eq!(low_latency.batch_size, 1_000);
1602        assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));
1603
1604        let default = ParquetStorageConfig::default();
1605        assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
1606        assert_eq!(default.batch_size, 10_000);
1607    }
1608
    /// Benchmark: Compare single-event writes vs batch writes
    /// Run with: cargo test --release -- --ignored test_batch_write_throughput
    #[test]
    #[ignore]
    fn test_batch_write_throughput() {
        let temp_dir = TempDir::new().unwrap();
        // Default config: batch_size 10_000, SNAPPY compression.
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        let event_count = 50_000;

        // Benchmark batch write
        let events: Vec<Event> = (0..event_count)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        let start = std::time::Instant::now();
        let result = storage.batch_write(events).unwrap();
        storage.flush().unwrap(); // Flush any remaining
        let batch_duration = start.elapsed();

        let batch_stats = storage.batch_stats();

        println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
        println!("Events: {event_count}");
        println!("Duration: {batch_duration:?}");
        println!("Events/sec: {:.0}", result.events_per_sec);
        println!("Batches written: {}", batch_stats.batches_written);
        println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
        println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);

        // Target: Batch writes should achieve at least 100K events/sec in release mode
        // This represents 40%+ improvement over single-event writes
        // The assert deliberately uses the looser debug-mode floor so
        // `--ignored` runs don't spuriously fail on unoptimized builds.
        assert!(
            result.events_per_sec > 10_000.0,
            "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
            result.events_per_sec
        );
    }
1647
1648    /// Benchmark: Single-event write baseline (for comparison)
1649    #[test]
1650    #[ignore]
1651    fn test_single_event_write_baseline() {
1652        let temp_dir = TempDir::new().unwrap();
1653        let config = ParquetStorageConfig {
1654            batch_size: 1, // Force flush after each event
1655            ..Default::default()
1656        };
1657        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1658
1659        let event_count = 1_000; // Fewer events since this is slow
1660
1661        let start = std::time::Instant::now();
1662        for i in 0..event_count {
1663            let event = create_test_event(&format!("entity-{i}"));
1664            storage.append_event(event).unwrap();
1665        }
1666        let duration = start.elapsed();
1667
1668        let events_per_sec = f64::from(event_count) / duration.as_secs_f64();
1669
1670        println!("\n=== Single-Event Write Baseline ===");
1671        println!("Events: {event_count}");
1672        println!("Duration: {duration:?}");
1673        println!("Events/sec: {events_per_sec:.0}");
1674
1675        // This should be significantly slower than batch writes
1676        // Used as a baseline to demonstrate 40%+ improvement
1677    }
1678
1679    // -----------------------------------------------------------------
1680    // Tests for the recursive parquet walker (Step 1, commit #1: read-side
1681    // bidirectional layout support — see SUSTAINABLE_DATA_STRATEGY.md).
1682    // -----------------------------------------------------------------
1683
1684    /// Helper: write a tiny placeholder parquet file at an arbitrary path so
1685    /// the walker has something concrete to find. We only care that the
1686    /// walker discovers the path, not that the file is loadable here — the
1687    /// load path is exercised by the existing read tests.
1688    fn touch_parquet(path: &Path) {
1689        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
1690        std::fs::write(path, b"").unwrap();
1691    }
1692
1693    #[test]
1694    fn test_walker_finds_files_in_flat_layout() {
1695        let temp_dir = TempDir::new().unwrap();
1696        let root = temp_dir.path();
1697        touch_parquet(&root.join("events-20260101-120000000-aaaa.parquet"));
1698        touch_parquet(&root.join("events-20260101-130000000-bbbb.parquet"));
1699
1700        let mut found = find_parquet_files_recursive(root).unwrap();
1701        found.sort();
1702        assert_eq!(found.len(), 2);
1703        assert!(
1704            found[0]
1705                .file_name()
1706                .unwrap()
1707                .to_str()
1708                .unwrap()
1709                .starts_with("events-"),
1710            "expected events-* file, got {found:?}"
1711        );
1712    }
1713
1714    #[test]
1715    fn test_walker_finds_files_in_tenant_partitioned_tree() {
1716        let temp_dir = TempDir::new().unwrap();
1717        let root = temp_dir.path();
1718        // Tenant-partitioned shape: storage_dir/<tenant>/<yyyy-mm>/events-*.parquet
1719        touch_parquet(&root.join("tenant-a/2026-01/events-20260101-120000000-aaaa.parquet"));
1720        touch_parquet(&root.join("tenant-a/2026-02/events-20260201-120000000-bbbb.parquet"));
1721        touch_parquet(&root.join("tenant-b/2026-01/events-20260103-120000000-cccc.parquet"));
1722
1723        let found = find_parquet_files_recursive(root).unwrap();
1724        assert_eq!(found.len(), 3);
1725        // Sort places tenant-a files before tenant-b — that's the
1726        // tenant-grouping the docs claim.
1727        assert!(found[0].to_str().unwrap().contains("tenant-a"));
1728        assert!(found[1].to_str().unwrap().contains("tenant-a"));
1729        assert!(found[2].to_str().unwrap().contains("tenant-b"));
1730    }
1731
1732    #[test]
1733    fn test_walker_handles_mixed_legacy_and_partitioned_layouts() {
1734        // The migration window: some tenants have been moved into the tree,
1735        // some flat files still sit at the root. The walker must surface
1736        // both so load_all_events sees every event regardless of where it
1737        // currently lives.
1738        let temp_dir = TempDir::new().unwrap();
1739        let root = temp_dir.path();
1740        touch_parquet(&root.join("events-legacy-aaaa.parquet"));
1741        touch_parquet(&root.join("tenant-a/2026-01/events-new-bbbb.parquet"));
1742
1743        let found = find_parquet_files_recursive(root).unwrap();
1744        assert_eq!(found.len(), 2);
1745    }
1746
1747    #[test]
1748    fn test_walker_ignores_non_parquet_files() {
1749        let temp_dir = TempDir::new().unwrap();
1750        let root = temp_dir.path();
1751        std::fs::write(root.join("README.md"), b"hello").unwrap();
1752        std::fs::write(root.join("events.json"), b"[]").unwrap();
1753        touch_parquet(&root.join("events-20260101-120000000-aaaa.parquet"));
1754        // Files that just happen to have "parquet" in the name but no
1755        // .parquet extension stay out — extension-only filter, no name match.
1756        std::fs::write(root.join("not-a-parquet-file.bin"), b"").unwrap();
1757
1758        let found = find_parquet_files_recursive(root).unwrap();
1759        assert_eq!(found.len(), 1);
1760        assert_eq!(
1761            found[0].extension().and_then(|s| s.to_str()),
1762            Some("parquet")
1763        );
1764    }
1765
    /// Build an event whose tenant_id and entity_id we control, so tests
    /// can verify per-tenant routing without depending on the helper that
    /// hardcodes "default".
    fn event_with_tenant(tenant: &str, entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),     // fresh event id
            "test.event".to_string(), // event type
            entity_id.to_string(),
            tenant.to_string(),
            json!({"k": "v"}),  // minimal payload
            chrono::Utc::now(), // event timestamp
            None,               // NOTE(review): presumably optional metadata/correlation — confirm signature
            1,                  // NOTE(review): presumably version/sequence — confirm signature
        )
    }
1781
    #[test]
    fn test_flush_writes_into_per_tenant_partition() {
        // End-to-end check that the new write path produces
        // <root>/<tenant>/<yyyy-mm>/events-*.parquet — no flat file at the
        // root, no cross-tenant mixing.
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..3 {
            storage
                .append_event(event_with_tenant("default", &format!("entity-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();

        let parquet_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
        assert_eq!(parquet_files.len(), 1);

        let rel = parquet_files[0]
            .strip_prefix(temp_dir.path())
            .unwrap()
            .to_string_lossy()
            .into_owned();
        // Path shape: default/<yyyy-mm>/events-*.parquet
        let parts: Vec<&str> = rel.split(std::path::MAIN_SEPARATOR).collect();
        assert_eq!(parts.len(), 3, "expected tenant/yyyy-mm/file, got {rel}");
        assert_eq!(parts[0], "default");
        // yyyy-mm is four digits, a dash, then two digits — loose check
        // (length + dash position) because the exact month varies with
        // wall-clock at test runtime.
        assert!(
            parts[1].len() == 7 && parts[1].as_bytes()[4] == b'-',
            "expected yyyy-mm, got {}",
            parts[1]
        );
        assert!(parts[2].starts_with("events-") && parts[2].ends_with(".parquet"));

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 3);
    }
1821
1822    #[test]
1823    fn test_multiple_tenants_get_isolated_subtrees() {
1824        // Per-tenant flush must not mix tenants into the same Parquet file
1825        // and must put each tenant under its own subdirectory.
1826        let temp_dir = TempDir::new().unwrap();
1827        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
1828
1829        for i in 0..2 {
1830            storage
1831                .append_event(event_with_tenant("alice", &format!("a-{i}")))
1832                .unwrap();
1833        }
1834        for i in 0..3 {
1835            storage
1836                .append_event(event_with_tenant("bob", &format!("b-{i}")))
1837                .unwrap();
1838        }
1839        storage.flush().unwrap();
1840
1841        let alice_subtree = temp_dir.path().join("alice");
1842        let bob_subtree = temp_dir.path().join("bob");
1843        assert!(alice_subtree.is_dir(), "alice should have its own subtree");
1844        assert!(bob_subtree.is_dir(), "bob should have its own subtree");
1845
1846        let alice_files = find_parquet_files_recursive(&alice_subtree).unwrap();
1847        let bob_files = find_parquet_files_recursive(&bob_subtree).unwrap();
1848        assert_eq!(alice_files.len(), 1);
1849        assert_eq!(bob_files.len(), 1);
1850
1851        // Loaded events keep their tenant_id — round-trip preserves which
1852        // tenant each event belonged to.
1853        let loaded = storage.load_all_events().unwrap();
1854        let (alice_count, bob_count) =
1855            loaded
1856                .iter()
1857                .fold((0, 0), |(a, b), e| match e.tenant_id_str() {
1858                    "alice" => (a + 1, b),
1859                    "bob" => (a, b + 1),
1860                    _ => (a, b),
1861                });
1862        assert_eq!(alice_count, 2);
1863        assert_eq!(bob_count, 3);
1864    }
1865
1866    #[test]
1867    fn test_size_flush_only_drains_full_tenant() {
1868        // When one tenant exactly hits batch_size, only that tenant
1869        // flushes; the other tenant keeps its events buffered. Prevents
1870        // one noisy tenant from causing fragmented writes for everyone.
1871        let temp_dir = TempDir::new().unwrap();
1872        let config = ParquetStorageConfig {
1873            batch_size: 5,
1874            ..Default::default()
1875        };
1876        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
1877
1878        // Alice: 5 events → on the 5th, len == batch_size triggers flush
1879        // which drains all 5. Alice ends empty.
1880        for i in 0..5 {
1881            storage
1882                .append_event(event_with_tenant("alice", &format!("a-{i}")))
1883                .unwrap();
1884        }
1885        // Bob: 2 events → still under threshold, stays pending.
1886        for i in 0..2 {
1887            storage
1888                .append_event(event_with_tenant("bob", &format!("b-{i}")))
1889                .unwrap();
1890        }
1891
1892        assert_eq!(
1893            storage.pending_count(),
1894            2,
1895            "only bob's 2 events should be pending"
1896        );
1897
1898        let parquet_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
1899        assert_eq!(parquet_files.len(), 1, "only alice should have flushed");
1900        assert!(
1901            parquet_files[0]
1902                .to_string_lossy()
1903                .contains(&format!("alice{}", std::path::MAIN_SEPARATOR)),
1904            "expected alice partition, got {}",
1905            parquet_files[0].display()
1906        );
1907    }
1908
1909    #[test]
1910    fn test_tenant_id_from_path_recovers_tenant_for_partitioned_files() {
1911        let root = Path::new("/data/storage");
1912        let f = Path::new("/data/storage/alice/2026-04/events-20260426-120000000-aaaa.parquet");
1913        assert_eq!(tenant_id_from_path(root, f), "alice");
1914    }
1915
1916    #[test]
1917    fn test_tenant_id_from_path_falls_back_to_default_for_legacy_flat_layout() {
1918        let root = Path::new("/data/storage");
1919        let f = Path::new("/data/storage/events-20260426-120000000-aaaa.parquet");
1920        // Legacy single-component path. Pre-tenant data was always
1921        // commingled with tenant=default, so default is the right fallback.
1922        assert_eq!(tenant_id_from_path(root, f), "default");
1923    }
1924
1925    #[test]
1926    fn test_sanitize_tenant_id_for_path_accepts_safe_inputs() {
1927        for ok in [
1928            "default",
1929            "system",
1930            "1e6b2d1c-2f64-4441-9cf9-42f2e451aa17",
1931            "onboard-diagnostic-160-at-example-com",
1932            "tenant_with_underscore",
1933            "v1.0",
1934        ] {
1935            assert!(
1936                sanitize_tenant_id_for_path(ok).is_ok(),
1937                "{ok:?} should be accepted"
1938            );
1939        }
1940    }
1941
1942    #[test]
1943    fn test_sanitize_tenant_id_for_path_rejects_unsafe_inputs() {
1944        for bad in [
1945            "",         // empty
1946            "..",       // parent traversal
1947            ".",        // current dir
1948            "foo/bar",  // path separator
1949            "foo\\bar", // windows-style separator
1950            "foo bar",  // whitespace
1951            "foo\nbar", // newline
1952            "foo\0bar", // null byte
1953            "tenant?",  // shell glob char
1954            "tenant*",  // shell glob char
1955        ] {
1956            assert!(
1957                sanitize_tenant_id_for_path(bad).is_err(),
1958                "{bad:?} should be rejected"
1959            );
1960        }
1961
1962        // Length cap.
1963        let too_long = "a".repeat(129);
1964        assert!(sanitize_tenant_id_for_path(&too_long).is_err());
1965    }
1966
1967    #[test]
1968    fn test_partition_path_for_tenant_shape() {
1969        let root = Path::new("/data");
1970        let when = chrono::DateTime::parse_from_rfc3339("2026-04-26T12:00:00Z")
1971            .unwrap()
1972            .with_timezone(&chrono::Utc);
1973        let path = partition_path_for_tenant(root, "alice", when).unwrap();
1974        assert_eq!(path, Path::new("/data/alice/2026-04"));
1975    }
1976
    #[test]
    fn test_append_event_rejects_unsafe_tenant_at_flush() {
        // Defence in depth: even if some upstream forgets to validate, the
        // sanitizer in flush_tenant catches it. Since append doesn't write
        // synchronously, the bad tenant is rejected on the first flush
        // attempt. We test that flush surfaces an error rather than
        // silently writing somewhere weird.
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // append accepts whatever tenant_id the event carries — domain
        // construction would normally reject this, but if it slipped
        // through, flush should refuse to derive a path from it.
        storage
            .append_event(event_with_tenant("../escape", "e-0"))
            .unwrap();
        let result = storage.flush();
        assert!(result.is_err(), "flush should reject unsafe tenant_id");
        // NOTE(review): either wording is accepted — presumably '/' trips
        // the disallowed-character branch and a bare ".." the reserved-name
        // branch; confirm against sanitize_tenant_id_for_path.
        let msg = format!("{}", result.unwrap_err());
        assert!(
            msg.contains("disallowed character") || msg.contains("reserved"),
            "expected sanitization error message, got: {msg}"
        );
    }
2001
2002    // -----------------------------------------------------------------
2003    // Tenant-pruned read tests (Step 1, commit #4).
2004    // -----------------------------------------------------------------
2005
2006    #[test]
2007    fn test_load_events_for_tenant_only_walks_target_subtree() {
2008        // Seed three tenants with distinct event counts. Loading one
2009        // tenant must return only that tenant's events — and the file
2010        // list helper must report only that tenant's files (the strong
2011        // form of "didn't open the others").
2012        let temp_dir = TempDir::new().unwrap();
2013        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2014
2015        for i in 0..2 {
2016            storage
2017                .append_event(event_with_tenant("alice", &format!("a-{i}")))
2018                .unwrap();
2019        }
2020        for i in 0..3 {
2021            storage
2022                .append_event(event_with_tenant("bob", &format!("b-{i}")))
2023                .unwrap();
2024        }
2025        for i in 0..1 {
2026            storage
2027                .append_event(event_with_tenant("carol", &format!("c-{i}")))
2028                .unwrap();
2029        }
2030        storage.flush().unwrap();
2031
2032        let alice_files = storage.list_parquet_files_for_tenant("alice").unwrap();
2033        assert_eq!(alice_files.len(), 1);
2034        assert!(
2035            alice_files[0]
2036                .to_string_lossy()
2037                .contains(&format!("alice{}", std::path::MAIN_SEPARATOR)),
2038            "expected alice file, got {}",
2039            alice_files[0].display()
2040        );
2041        // The pruned listing must NOT include any bob/carol files — this
2042        // is the property Step 2 will rely on to avoid loading every
2043        // tenant's data on a single-tenant query.
2044        for f in &alice_files {
2045            let s = f.to_string_lossy();
2046            assert!(!s.contains("bob"), "alice listing leaked bob file: {s}");
2047            assert!(!s.contains("carol"), "alice listing leaked carol file: {s}");
2048        }
2049
2050        let alice_events = storage.load_events_for_tenant("alice").unwrap();
2051        assert_eq!(alice_events.len(), 2);
2052        for e in &alice_events {
2053            assert_eq!(e.tenant_id_str(), "alice");
2054        }
2055
2056        let bob_events = storage.load_events_for_tenant("bob").unwrap();
2057        assert_eq!(bob_events.len(), 3);
2058        for e in &bob_events {
2059            assert_eq!(e.tenant_id_str(), "bob");
2060        }
2061
2062        let carol_events = storage.load_events_for_tenant("carol").unwrap();
2063        assert_eq!(carol_events.len(), 1);
2064        assert_eq!(carol_events[0].tenant_id_str(), "carol");
2065    }
2066
2067    #[test]
2068    fn test_load_events_for_tenant_returns_empty_when_subtree_missing() {
2069        // Querying a tenant that has never written must not error — it's
2070        // a normal "no data" case, not a misconfiguration. Important for
2071        // first-query latency on a fresh tenant.
2072        let temp_dir = TempDir::new().unwrap();
2073        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2074
2075        // Seed only alice so the storage_dir isn't empty (rule out the
2076        // empty-dir trivial case).
2077        storage
2078            .append_event(event_with_tenant("alice", "a-0"))
2079            .unwrap();
2080        storage.flush().unwrap();
2081
2082        let files = storage
2083            .list_parquet_files_for_tenant("nobody-here")
2084            .unwrap();
2085        assert!(files.is_empty());
2086
2087        let events = storage.load_events_for_tenant("nobody-here").unwrap();
2088        assert!(events.is_empty());
2089    }
2090
2091    #[test]
2092    fn test_load_events_for_tenant_rejects_unsafe_tenant_id() {
2093        // Path traversal must fail at the API boundary, not after disk
2094        // reads. Same whitelist as the write path.
2095        let temp_dir = TempDir::new().unwrap();
2096        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2097
2098        for unsafe_tid in ["..", "a/b", "a\\b", "", "a..b/.."] {
2099            let result = storage.load_events_for_tenant(unsafe_tid);
2100            assert!(
2101                result.is_err(),
2102                "tenant_id {unsafe_tid:?} should have been rejected"
2103            );
2104        }
2105    }
2106
2107    #[test]
2108    fn test_load_events_for_tenant_ignores_legacy_flat_layout_files() {
2109        // Flat-layout files at the storage root predate partitioning. A
2110        // tenant-scoped load must not pick them up — the migration tool
2111        // is what relocates them under default/. Until it runs, those
2112        // files are invisible to per-tenant queries (correct behavior:
2113        // the system has no way to tell which tenant they belong to
2114        // beyond "default", and pretending otherwise would mis-attribute
2115        // them).
2116        let temp_dir = TempDir::new().unwrap();
2117        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2118
2119        // Seed a flat-layout file (relocates default/<yyyy-mm>/ → root).
2120        let _flat = seed_flat_layout_file(&storage, 4);
2121
2122        // Querying default returns nothing — the flat file at the root
2123        // isn't under default/.
2124        let default_events = storage.load_events_for_tenant("default").unwrap();
2125        assert!(
2126            default_events.is_empty(),
2127            "tenant-scoped load must not pick up flat-layout files; got {} events",
2128            default_events.len()
2129        );
2130
2131        // Sanity: the full loader still sees them via the recursive walk.
2132        let all_events = storage.load_all_events().unwrap();
2133        assert_eq!(all_events.len(), 4);
2134    }
2135
2136    // -----------------------------------------------------------------
2137    // Atomic snapshot write tests (Step 4, commit #1).
2138    // -----------------------------------------------------------------
2139
    #[test]
    fn test_write_atomic_parquet_emits_file_under_tenant_partition() {
        // Happy path: write 3 events for tenant alice, get back a
        // path under <root>/alice/<yyyy-mm>/. The .tmp file should
        // be gone (rename completed); the final file readable via
        // load_events_for_tenant.
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        let events: Vec<Event> = (0..3)
            .map(|i| event_with_tenant("alice", &format!("a-{i}")))
            .collect();

        let final_path = storage
            .write_atomic_parquet("alice", "snapshot.alice.range", &events)
            .unwrap();

        // Path shape: <root>/alice/<yyyy-mm>/snapshot.alice.range.parquet
        let rel = final_path
            .strip_prefix(temp_dir.path())
            .unwrap()
            .to_string_lossy()
            .into_owned();
        let parts: Vec<&str> = rel.split(std::path::MAIN_SEPARATOR).collect();
        assert_eq!(parts.len(), 3, "expected tenant/yyyy-mm/file, got {rel}");
        assert_eq!(parts[0], "alice");
        assert_eq!(parts[2], "snapshot.alice.range.parquet");

        // Final file must exist; .tmp must NOT.
        // with_extension replaces the final "parquet" extension, so this
        // maps x.parquet → x.parquet.tmp — the staging name used pre-rename.
        assert!(final_path.is_file());
        let tmp = final_path.with_extension("parquet.tmp");
        assert!(
            !tmp.exists(),
            "tmp should have been renamed away; still at {}",
            tmp.display()
        );

        // Loadable via the tenant loader.
        let loaded = storage.load_events_for_tenant("alice").unwrap();
        assert_eq!(loaded.len(), 3);
    }
2181
2182    #[test]
2183    fn test_write_atomic_parquet_rejects_empty_events() {
2184        let temp_dir = TempDir::new().unwrap();
2185        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2186        let result = storage.write_atomic_parquet("alice", "snap", &[]);
2187        assert!(result.is_err());
2188    }
2189
2190    #[test]
2191    fn test_write_atomic_parquet_rejects_unsafe_tenant() {
2192        let temp_dir = TempDir::new().unwrap();
2193        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2194        let events = [event_with_tenant("alice", "e-0")];
2195        for unsafe_tid in ["..", "a/b", ""] {
2196            let result = storage.write_atomic_parquet(unsafe_tid, "snap", &events);
2197            assert!(
2198                result.is_err(),
2199                "unsafe tenant_id {unsafe_tid:?} should have been rejected"
2200            );
2201        }
2202    }
2203
    #[test]
    fn test_cleanup_partial_writes_removes_orphan_tmps() {
        // Simulate a crashed mid-snapshot: pretend we have a leftover
        // events-x.parquet.tmp file in a tenant partition. Cleanup
        // must delete it without touching real .parquet files.
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Seed a real parquet via a normal flush, so we have one
        // legit file we don't want to delete.
        for i in 0..2 {
            storage
                .append_event(event_with_tenant("alice", &format!("a-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();
        let real_files_before = find_parquet_files_recursive(temp_dir.path()).unwrap();
        assert_eq!(real_files_before.len(), 1);

        // Manufacture an orphan .tmp file in alice's partition — placed
        // next to the real file so both live in the same directory.
        let alice_subtree = temp_dir.path().join("alice");
        let orphan_dir = real_files_before[0].parent().unwrap();
        let orphan_path = orphan_dir.join("snapshot.alice.crashed.parquet.tmp");
        std::fs::write(&orphan_path, b"fake partial parquet").unwrap();
        assert!(orphan_path.is_file());

        // And one nested deeper, just to confirm recursion.
        let nested_dir = alice_subtree.join("2099-01");
        std::fs::create_dir_all(&nested_dir).unwrap();
        let nested_orphan = nested_dir.join("events-x.parquet.tmp");
        std::fs::write(&nested_orphan, b"junk").unwrap();

        let removed = storage.cleanup_partial_writes().unwrap();
        assert_eq!(removed, 2, "two orphan tmps should have been cleaned");
        assert!(!orphan_path.exists());
        assert!(!nested_orphan.exists());

        // Real parquet untouched.
        let real_files_after = find_parquet_files_recursive(temp_dir.path()).unwrap();
        assert_eq!(real_files_after, real_files_before);
    }
2245
2246    #[test]
2247    fn test_cleanup_partial_writes_quarantines_zero_byte_parquet() {
2248        // Issue #166 regression: a pre-fix checkpoint crash left a 0-byte
2249        // events-*.parquet file with the FINAL extension (no .tmp). The
2250        // cleanup pass must rename it aside rather than delete, so an
2251        // operator can inspect.
2252        let temp_dir = TempDir::new().unwrap();
2253        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2254
2255        // Seed a real parquet alongside, to confirm we don't molest healthy data.
2256        for i in 0..2 {
2257            storage
2258                .append_event(event_with_tenant("alice", &format!("a-{i}")))
2259                .unwrap();
2260        }
2261        storage.flush().unwrap();
2262        let healthy_files_before = find_parquet_files_recursive(temp_dir.path()).unwrap();
2263        assert_eq!(healthy_files_before.len(), 1);
2264        let healthy_file = &healthy_files_before[0];
2265        let healthy_dir = healthy_file.parent().unwrap();
2266
2267        // Drop a 0-byte parquet next to the healthy one (the issue's failure mode).
2268        let bricked = healthy_dir.join("events-bricked-deadbeef.parquet");
2269        std::fs::write(&bricked, b"").unwrap();
2270        assert_eq!(std::fs::metadata(&bricked).unwrap().len(), 0);
2271
2272        let acted = storage.cleanup_partial_writes().unwrap();
2273        assert_eq!(acted, 1, "only the 0-byte file should have been acted on");
2274
2275        // 0-byte file is gone from its original name, but a quarantined
2276        // sibling exists.
2277        assert!(!bricked.exists(), "0-byte file should have been renamed");
2278        let quarantined: Vec<_> = std::fs::read_dir(healthy_dir)
2279            .unwrap()
2280            .flatten()
2281            .map(|e| e.path())
2282            .filter(|p| {
2283                p.file_name()
2284                    .and_then(|n| n.to_str())
2285                    .is_some_and(|n| n.starts_with("events-bricked-deadbeef.parquet.corrupt-"))
2286            })
2287            .collect();
2288        assert_eq!(
2289            quarantined.len(),
2290            1,
2291            "expected one .parquet.corrupt-<ts> sibling"
2292        );
2293
2294        // Healthy file untouched.
2295        assert!(
2296            healthy_file.exists(),
2297            "healthy parquet must not be molested"
2298        );
2299    }
2300
2301    #[test]
2302    fn test_load_all_events_skips_zero_byte_parquet() {
2303        // Issue #166 defensive layer: a single bad file must not abort
2304        // the whole load. With t-d8b1's match-and-continue, the healthy
2305        // events still come back and the bad file is logged-and-skipped.
2306        let temp_dir = TempDir::new().unwrap();
2307        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2308
2309        // Two healthy events, flushed.
2310        for i in 0..2 {
2311            storage
2312                .append_event(event_with_tenant("alice", &format!("a-{i}")))
2313                .unwrap();
2314        }
2315        storage.flush().unwrap();
2316
2317        // Now drop a 0-byte parquet alongside (sneaking it in AFTER
2318        // construction so cleanup_partial_writes doesn't rename it
2319        // before we get to test the load path).
2320        let healthy_files = find_parquet_files_recursive(temp_dir.path()).unwrap();
2321        let bricked = healthy_files[0]
2322            .parent()
2323            .unwrap()
2324            .join("events-bricked-cafef00d.parquet");
2325        std::fs::write(&bricked, b"").unwrap();
2326
2327        let loaded = storage.load_all_events().unwrap();
2328        assert_eq!(
2329            loaded.len(),
2330            2,
2331            "all healthy events should still load despite the 0-byte file"
2332        );
2333    }
2334
2335    #[test]
2336    fn test_load_events_for_tenant_skips_zero_byte_parquet() {
2337        // Same defensive layer for the lazy-load path. A bad file in
2338        // the tenant subtree must not poison subsequent queries.
2339        let temp_dir = TempDir::new().unwrap();
2340        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2341
2342        for i in 0..3 {
2343            storage
2344                .append_event(event_with_tenant("alice", &format!("a-{i}")))
2345                .unwrap();
2346        }
2347        storage.flush().unwrap();
2348
2349        let healthy = find_parquet_files_recursive(temp_dir.path()).unwrap();
2350        let bricked = healthy[0]
2351            .parent()
2352            .unwrap()
2353            .join("events-bricked-feedface.parquet");
2354        std::fs::write(&bricked, b"").unwrap();
2355
2356        let events = storage.load_events_for_tenant("alice").unwrap();
2357        assert_eq!(events.len(), 3, "lazy load must skip the 0-byte file");
2358    }
2359
2360    #[test]
2361    fn test_flush_tenant_leaves_no_zero_byte_parquet_after_normal_flush() {
2362        // Direct verification of t-c1d3: a successful flush_tenant
2363        // leaves exactly one healthy *.parquet — no .tmp survivors,
2364        // no 0-byte files.
2365        let temp_dir = TempDir::new().unwrap();
2366        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2367
2368        for i in 0..5 {
2369            storage
2370                .append_event(event_with_tenant("alice", &format!("a-{i}")))
2371                .unwrap();
2372        }
2373        storage.flush().unwrap();
2374
2375        let mut tmp_count = 0;
2376        let mut zero_byte_count = 0;
2377        let mut healthy_count = 0;
2378        let mut stack = vec![temp_dir.path().to_path_buf()];
2379        while let Some(d) = stack.pop() {
2380            for entry in std::fs::read_dir(&d).unwrap().flatten() {
2381                let p = entry.path();
2382                if p.is_dir() {
2383                    stack.push(p);
2384                    continue;
2385                }
2386                let name = p.file_name().unwrap().to_string_lossy().into_owned();
2387                if name.ends_with(".parquet.tmp") {
2388                    tmp_count += 1;
2389                } else if name.ends_with(".parquet") {
2390                    if std::fs::metadata(&p).unwrap().len() == 0 {
2391                        zero_byte_count += 1;
2392                    } else {
2393                        healthy_count += 1;
2394                    }
2395                }
2396            }
2397        }
2398        assert_eq!(tmp_count, 0, ".parquet.tmp survivors after flush");
2399        assert_eq!(zero_byte_count, 0, "0-byte .parquet survivors after flush");
2400        assert_eq!(healthy_count, 1, "expected exactly one healthy parquet");
2401    }
2402
2403    #[test]
2404    fn test_new_calls_cleanup_partial_writes_on_boot() {
2405        // Drop a stale .tmp into a directory, then construct a
2406        // fresh ParquetStorage on it. The constructor must clean
2407        // it up.
2408        let temp_dir = TempDir::new().unwrap();
2409        let stale = temp_dir.path().join("orphan.parquet.tmp");
2410        std::fs::write(&stale, b"crash detritus").unwrap();
2411        assert!(stale.is_file());
2412
2413        let _storage = ParquetStorage::new(temp_dir.path()).unwrap();
2414        assert!(
2415            !stale.exists(),
2416            "stale tmp should have been cleaned by ParquetStorage::new"
2417        );
2418    }
2419
2420    // -----------------------------------------------------------------
2421    // Migration tests (Step 1, commit #3: flat → tenant-tree migration).
2422    // -----------------------------------------------------------------
2423
2424    /// Helper: produce a flat-layout Parquet file at the storage root,
2425    /// matching what pre-#2 deploys wrote. Uses the existing flush path
2426    /// briefly and then relocates the resulting file from
2427    /// default/<yyyy-mm>/ back up to the root, simulating the legacy
2428    /// state.
2429    fn seed_flat_layout_file(storage: &ParquetStorage, count: usize) -> PathBuf {
2430        for i in 0..count {
2431            storage
2432                .append_event(create_test_event(&format!("entity-{i}")))
2433                .unwrap();
2434        }
2435        storage.flush().unwrap();
2436
2437        // create_test_event uses tenant="default", so the just-flushed file
2438        // landed under <root>/default/<yyyy-mm>/. Find the newest file in
2439        // that subtree to avoid picking up files from other tenants seeded
2440        // by the test before us.
2441        let default_subtree = storage.storage_dir().join("default");
2442        let candidates = find_parquet_files_recursive(&default_subtree).unwrap();
2443        assert!(
2444            !candidates.is_empty(),
2445            "seed expected at least one file under default/"
2446        );
2447        let src = candidates.into_iter().max().unwrap();
2448
2449        let dst = storage.storage_dir().join(src.file_name().unwrap());
2450        std::fs::rename(&src, &dst).unwrap();
2451        // Best-effort cleanup of the now-empty intermediate dirs so the
2452        // migration tool only sees the flat file. (`remove_dir` succeeds
2453        // only on empty dirs, which is exactly the safety we want here.)
2454        if let Some(month_dir) = src.parent() {
2455            let _ = std::fs::remove_dir(month_dir);
2456            if let Some(tenant_dir) = month_dir.parent() {
2457                let _ = std::fs::remove_dir(tenant_dir);
2458            }
2459        }
2460        dst
2461    }
2462
2463    #[test]
2464    fn test_migrate_flat_layout_dry_run_touches_nothing() {
2465        let temp_dir = TempDir::new().unwrap();
2466        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2467        let flat = seed_flat_layout_file(&storage, 7);
2468        assert!(flat.is_file(), "test setup: flat file should exist");
2469
2470        let report = storage.migrate_flat_layout(true).unwrap();
2471        assert!(report.dry_run);
2472        assert_eq!(report.flat_files_seen, 1);
2473        assert_eq!(report.events_migrated, 7);
2474        assert_eq!(report.flat_files_removed, 0);
2475        assert_eq!(report.partitions_written, 0);
2476        assert!(
2477            flat.is_file(),
2478            "flat file must still be present after dry run"
2479        );
2480    }
2481
2482    #[test]
2483    fn test_migrate_flat_layout_moves_events_into_default_tree_and_removes_flat() {
2484        let temp_dir = TempDir::new().unwrap();
2485        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2486        let flat = seed_flat_layout_file(&storage, 5);
2487
2488        let report = storage.migrate_flat_layout(false).unwrap();
2489        assert!(!report.dry_run);
2490        assert_eq!(report.flat_files_seen, 1);
2491        assert_eq!(report.flat_files_removed, 1);
2492        assert_eq!(report.events_migrated, 5);
2493        assert!(report.partitions_written >= 1);
2494        assert!(
2495            !flat.exists(),
2496            "flat file should be deleted after migration"
2497        );
2498
2499        let post = find_parquet_files_recursive(temp_dir.path()).unwrap();
2500        assert!(
2501            post.iter().all(|p| {
2502                let rel = p
2503                    .strip_prefix(temp_dir.path())
2504                    .unwrap()
2505                    .to_string_lossy()
2506                    .into_owned();
2507                rel.starts_with(&format!("default{}", std::path::MAIN_SEPARATOR))
2508            }),
2509            "all migrated files should be under default/"
2510        );
2511
2512        let loaded = storage.load_all_events().unwrap();
2513        assert_eq!(loaded.len(), 5);
2514    }
2515
2516    #[test]
2517    fn test_migrate_flat_layout_is_idempotent_when_re_run_after_completion() {
2518        let temp_dir = TempDir::new().unwrap();
2519        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2520        let _flat = seed_flat_layout_file(&storage, 4);
2521
2522        let first = storage.migrate_flat_layout(false).unwrap();
2523        assert_eq!(first.events_migrated, 4);
2524
2525        // Second run sees no flat files at the root, so it's a no-op —
2526        // events do not duplicate even if an operator runs the tool twice.
2527        let second = storage.migrate_flat_layout(false).unwrap();
2528        assert_eq!(second.flat_files_seen, 0);
2529        assert_eq!(second.events_migrated, 0);
2530        assert_eq!(second.flat_files_removed, 0);
2531
2532        let loaded = storage.load_all_events().unwrap();
2533        assert_eq!(loaded.len(), 4, "rerun must not duplicate or lose events");
2534    }
2535
2536    #[test]
2537    fn test_migrate_flat_layout_ignores_already_partitioned_data() {
2538        // Mixed state: a tenant tree already exists alongside one flat file.
2539        // Migration must touch only the flat file.
2540        let temp_dir = TempDir::new().unwrap();
2541        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2542
2543        for i in 0..3 {
2544            storage
2545                .append_event(event_with_tenant("alice", &format!("a-{i}")))
2546                .unwrap();
2547        }
2548        storage.flush().unwrap();
2549
2550        let _flat = seed_flat_layout_file(&storage, 2);
2551
2552        let report = storage.migrate_flat_layout(false).unwrap();
2553        assert_eq!(report.flat_files_seen, 1, "only the flat file is in scope");
2554        assert_eq!(report.events_migrated, 2);
2555
2556        let alice_files = find_parquet_files_recursive(&temp_dir.path().join("alice")).unwrap();
2557        assert_eq!(alice_files.len(), 1, "alice's tree must be untouched");
2558
2559        let loaded = storage.load_all_events().unwrap();
2560        assert_eq!(loaded.len(), 5);
2561        let alice_count = loaded
2562            .iter()
2563            .filter(|e| e.tenant_id_str() == "alice")
2564            .count();
2565        let default_count = loaded
2566            .iter()
2567            .filter(|e| e.tenant_id_str() == "default")
2568            .count();
2569        assert_eq!(alice_count, 3);
2570        assert_eq!(default_count, 2);
2571    }
2572
2573    #[test]
2574    fn test_migrate_flat_layout_with_no_flat_files_is_a_clean_noop() {
2575        let temp_dir = TempDir::new().unwrap();
2576        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
2577        let report = storage.migrate_flat_layout(false).unwrap();
2578        assert_eq!(report.flat_files_seen, 0);
2579        assert_eq!(report.events_migrated, 0);
2580    }
2581}