Skip to main content

allsource_core/infrastructure/persistence/
storage.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4};
5use arrow::{
6    array::{
7        Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
8        UInt64Builder,
9    },
10    datatypes::{DataType, Field, Schema, TimeUnit},
11    record_batch::RecordBatch,
12};
13use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
14use std::{
15    fs::{self, File},
16    path::{Path, PathBuf},
17    sync::{
18        Arc, Mutex,
19        atomic::{AtomicU64, Ordering},
20    },
21    time::{Duration, Instant},
22};
23
/// Default batch size for Parquet writes (10,000 events as per US-023).
/// A flush is triggered automatically once this many events accumulate.
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default flush timeout in milliseconds (5 s): partial batches older than
/// this are flushed by `check_timeout_flush`.
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;
29
30/// Configuration for ParquetStorage batch processing
/// Configuration for ParquetStorage batch processing.
///
/// Use [`Default`], `high_throughput()`, or `low_latency()` for presets.
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Batch size before automatic flush (default: 10,000)
    pub batch_size: usize,
    /// Timeout before flushing partial batch (default: 5 seconds)
    pub flush_timeout: Duration,
    /// Compression codec for Parquet files (default: SNAPPY)
    pub compression: parquet::basic::Compression,
}
40
41impl Default for ParquetStorageConfig {
42    fn default() -> Self {
43        Self {
44            batch_size: DEFAULT_BATCH_SIZE,
45            flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
46            compression: parquet::basic::Compression::SNAPPY,
47        }
48    }
49}
50
51impl ParquetStorageConfig {
52    /// High-throughput configuration optimized for large batch writes
53    pub fn high_throughput() -> Self {
54        Self {
55            batch_size: 50_000,
56            flush_timeout: Duration::from_secs(10),
57            compression: parquet::basic::Compression::SNAPPY,
58        }
59    }
60
61    /// Low-latency configuration for smaller, more frequent writes
62    pub fn low_latency() -> Self {
63        Self {
64            batch_size: 1_000,
65            flush_timeout: Duration::from_secs(1),
66            compression: parquet::basic::Compression::SNAPPY,
67        }
68    }
69}
70
/// Statistics for batch write operations.
///
/// Snapshot produced by `ParquetStorage::batch_stats`; all counters are
/// cumulative since the storage instance was created.
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Total batches written
    pub batches_written: u64,
    /// Total events written
    pub events_written: u64,
    /// Total bytes written
    pub bytes_written: u64,
    /// Average batch size (events / batches; 0.0 when nothing written yet)
    pub avg_batch_size: f64,
    /// Events per second (throughput; 0.0 when nothing written yet)
    pub events_per_sec: f64,
    /// Total write time in nanoseconds
    pub total_write_time_ns: u64,
    /// Number of timeout-triggered flushes
    pub timeout_flushes: u64,
    /// Number of size-triggered flushes
    pub size_flushes: u64,
}
91
/// Result of a single `batch_write` call.
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Number of events written (accepted into the batch buffer)
    pub events_written: usize,
    /// Number of batches flushed to disk during this call
    pub batches_flushed: usize,
    /// Total duration of the write operation
    pub duration: Duration,
    /// Write throughput in events per second
    pub events_per_sec: f64,
}
104
/// Parquet-based persistent storage for events with batch processing
///
/// Features:
/// - Configurable batch size (default: 10,000 events per US-023)
/// - Timeout-based flushing for partial batches
/// - Thread-safe batch accumulation
/// - SNAPPY compression for efficient storage
/// - Automatic flush on shutdown via Drop
pub struct ParquetStorage {
    /// Base directory for storing parquet files
    storage_dir: PathBuf,

    /// Current batch being accumulated (protected by mutex for thread safety)
    current_batch: Mutex<Vec<Event>>,

    /// Configuration (batch size, flush timeout, compression)
    config: ParquetStorageConfig,

    /// Schema for Arrow/Parquet (fixed; built once in `with_config`)
    schema: Arc<Schema>,

    /// Last flush timestamp for timeout tracking (see `check_timeout_flush`)
    last_flush_time: Mutex<Instant>,

    /// Statistics tracking (all Relaxed atomics; read via `batch_stats`)
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}
137
138impl ParquetStorage {
139    /// Create a new ParquetStorage with default configuration (10,000 event batches)
140    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
141        Self::with_config(storage_dir, ParquetStorageConfig::default())
142    }
143
144    /// Create a new ParquetStorage with custom configuration
145    pub fn with_config(
146        storage_dir: impl AsRef<Path>,
147        config: ParquetStorageConfig,
148    ) -> Result<Self> {
149        let storage_dir = storage_dir.as_ref().to_path_buf();
150
151        // Create storage directory if it doesn't exist
152        fs::create_dir_all(&storage_dir).map_err(|e| {
153            AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
154        })?;
155
156        // Define Arrow schema for events
157        let schema = Arc::new(Schema::new(vec![
158            Field::new("event_id", DataType::Utf8, false),
159            Field::new("event_type", DataType::Utf8, false),
160            Field::new("entity_id", DataType::Utf8, false),
161            Field::new("payload", DataType::Utf8, false),
162            Field::new(
163                "timestamp",
164                DataType::Timestamp(TimeUnit::Microsecond, None),
165                false,
166            ),
167            Field::new("metadata", DataType::Utf8, true),
168            Field::new("version", DataType::UInt64, false),
169        ]));
170
171        Ok(Self {
172            storage_dir,
173            current_batch: Mutex::new(Vec::with_capacity(config.batch_size)),
174            config,
175            schema,
176            last_flush_time: Mutex::new(Instant::now()),
177            batches_written: AtomicU64::new(0),
178            events_written: AtomicU64::new(0),
179            bytes_written: AtomicU64::new(0),
180            total_write_time_ns: AtomicU64::new(0),
181            timeout_flushes: AtomicU64::new(0),
182            size_flushes: AtomicU64::new(0),
183        })
184    }
185
186    /// Create storage with legacy batch size (1000) for backward compatibility
187    #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
188    pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
189        Self::with_config(
190            storage_dir,
191            ParquetStorageConfig {
192                batch_size: 1000,
193                ..Default::default()
194            },
195        )
196    }
197
198    /// Add an event to the current batch
199    ///
200    /// Events are buffered until either:
201    /// - Batch size reaches configured limit (default: 10,000)
202    /// - Flush timeout is exceeded
203    /// - Manual flush() is called
204    #[cfg_attr(feature = "hotpath", hotpath::measure)]
205    pub fn append_event(&self, event: Event) -> Result<()> {
206        let should_flush = {
207            let mut batch = self.current_batch.lock().unwrap();
208            batch.push(event);
209            batch.len() >= self.config.batch_size
210        };
211
212        if should_flush {
213            self.size_flushes.fetch_add(1, Ordering::Relaxed);
214            self.flush()?;
215        }
216
217        Ok(())
218    }
219
220    /// Add multiple events to the batch (optimized batch insertion)
221    ///
222    /// This is the preferred method for high-throughput ingestion.
223    /// Events are added atomically and flushed if batch size is reached.
224    #[cfg_attr(feature = "hotpath", hotpath::measure)]
225    pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
226        let start = Instant::now();
227        let event_count = events.len();
228        let mut batches_flushed = 0;
229
230        // Add events in chunks to avoid holding lock too long
231        let mut remaining_events = events.into_iter().peekable();
232
233        while remaining_events.peek().is_some() {
234            let should_flush = {
235                let mut batch = self.current_batch.lock().unwrap();
236                let available_space = self.config.batch_size.saturating_sub(batch.len());
237
238                if available_space == 0 {
239                    true
240                } else {
241                    // Take up to available_space events
242                    let to_add: Vec<Event> =
243                        remaining_events.by_ref().take(available_space).collect();
244                    batch.extend(to_add);
245                    batch.len() >= self.config.batch_size
246                }
247            };
248
249            if should_flush {
250                self.size_flushes.fetch_add(1, Ordering::Relaxed);
251                self.flush()?;
252                batches_flushed += 1;
253            }
254        }
255
256        let duration = start.elapsed();
257
258        Ok(BatchWriteResult {
259            events_written: event_count,
260            batches_flushed,
261            duration,
262            events_per_sec: event_count as f64 / duration.as_secs_f64(),
263        })
264    }
265
266    /// Check if a timeout-based flush is needed and perform it
267    ///
268    /// Call this periodically (e.g., from a background task) to ensure
269    /// partial batches are flushed within the configured timeout.
270    #[cfg_attr(feature = "hotpath", hotpath::measure)]
271    pub fn check_timeout_flush(&self) -> Result<bool> {
272        let should_flush = {
273            let last_flush = self.last_flush_time.lock().unwrap();
274            let batch = self.current_batch.lock().unwrap();
275            !batch.is_empty() && last_flush.elapsed() >= self.config.flush_timeout
276        };
277
278        if should_flush {
279            self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
280            self.flush()?;
281            Ok(true)
282        } else {
283            Ok(false)
284        }
285    }
286
287    /// Flush current batch to a Parquet file
288    ///
289    /// Thread-safe: Can be called from multiple threads.
290    #[cfg_attr(feature = "hotpath", hotpath::measure)]
291    pub fn flush(&self) -> Result<()> {
292        let events_to_write = {
293            let mut batch = self.current_batch.lock().unwrap();
294            if batch.is_empty() {
295                return Ok(());
296            }
297            std::mem::take(&mut *batch)
298        };
299
300        let batch_count = events_to_write.len();
301        tracing::info!("Flushing {} events to Parquet storage", batch_count);
302
303        let start = Instant::now();
304
305        // Create record batch from events
306        let record_batch = self.events_to_record_batch(&events_to_write)?;
307
308        // Generate filename with timestamp and unique suffix for concurrent writes
309        let filename = format!(
310            "events-{}-{}.parquet",
311            chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
312            uuid::Uuid::new_v4().as_simple()
313        );
314        let file_path = self.storage_dir.join(&filename);
315
316        // Write to Parquet file
317        let file = File::create(&file_path).map_err(|e| {
318            AllSourceError::StorageError(format!("Failed to create parquet file: {e}"))
319        })?;
320
321        let props = WriterProperties::builder()
322            .set_compression(self.config.compression)
323            .build();
324
325        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
326
327        writer.write(&record_batch)?;
328        let file_metadata = writer.close()?;
329
330        let duration = start.elapsed();
331
332        // Update statistics
333        self.batches_written.fetch_add(1, Ordering::Relaxed);
334        self.events_written
335            .fetch_add(batch_count as u64, Ordering::Relaxed);
336        if let Some(size) = file_metadata
337            .row_groups()
338            .first()
339            .map(parquet::file::metadata::RowGroupMetaData::total_byte_size)
340        {
341            self.bytes_written.fetch_add(size as u64, Ordering::Relaxed);
342        }
343        self.total_write_time_ns
344            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
345
346        // Update last flush time
347        {
348            let mut last_flush = self.last_flush_time.lock().unwrap();
349            *last_flush = Instant::now();
350        }
351
352        tracing::info!(
353            "Successfully wrote {} events to {} in {:?}",
354            batch_count,
355            file_path.display(),
356            duration
357        );
358
359        Ok(())
360    }
361
362    /// Force flush any remaining events (for shutdown handling)
363    ///
364    /// This ensures partial batches are persisted on graceful shutdown.
365    #[cfg_attr(feature = "hotpath", hotpath::measure)]
366    pub fn flush_on_shutdown(&self) -> Result<usize> {
367        let batch_size = {
368            let batch = self.current_batch.lock().unwrap();
369            batch.len()
370        };
371
372        if batch_size > 0 {
373            tracing::info!("Shutdown: flushing partial batch of {} events", batch_size);
374            self.flush()?;
375        }
376
377        Ok(batch_size)
378    }
379
380    /// Get batch write statistics
381    pub fn batch_stats(&self) -> BatchWriteStats {
382        let batches = self.batches_written.load(Ordering::Relaxed);
383        let events = self.events_written.load(Ordering::Relaxed);
384        let bytes = self.bytes_written.load(Ordering::Relaxed);
385        let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);
386
387        let time_secs = time_ns as f64 / 1_000_000_000.0;
388
389        BatchWriteStats {
390            batches_written: batches,
391            events_written: events,
392            bytes_written: bytes,
393            avg_batch_size: if batches > 0 {
394                events as f64 / batches as f64
395            } else {
396                0.0
397            },
398            events_per_sec: if time_secs > 0.0 {
399                events as f64 / time_secs
400            } else {
401                0.0
402            },
403            total_write_time_ns: time_ns,
404            timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
405            size_flushes: self.size_flushes.load(Ordering::Relaxed),
406        }
407    }
408
    /// Get current batch size (pending events not yet flushed to disk).
    pub fn pending_count(&self) -> usize {
        self.current_batch.lock().unwrap().len()
    }

    /// Get configured batch size (events per automatic flush).
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }

    /// Get configured flush timeout (used by `check_timeout_flush`).
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }
423
424    /// Convert events to Arrow RecordBatch
425    #[cfg_attr(feature = "hotpath", hotpath::measure)]
426    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
427        let mut event_id_builder = StringBuilder::new();
428        let mut event_type_builder = StringBuilder::new();
429        let mut entity_id_builder = StringBuilder::new();
430        let mut payload_builder = StringBuilder::new();
431        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
432        let mut metadata_builder = StringBuilder::new();
433        let mut version_builder = UInt64Builder::new();
434
435        for event in events {
436            event_id_builder.append_value(event.id.to_string());
437            event_type_builder.append_value(event.event_type_str());
438            entity_id_builder.append_value(event.entity_id_str());
439            payload_builder.append_value(serde_json::to_string(&event.payload)?);
440
441            // Convert timestamp to microseconds
442            let timestamp_micros = event.timestamp.timestamp_micros();
443            timestamp_builder.append_value(timestamp_micros);
444
445            if let Some(ref metadata) = event.metadata {
446                metadata_builder.append_value(serde_json::to_string(metadata)?);
447            } else {
448                metadata_builder.append_null();
449            }
450
451            version_builder.append_value(event.version as u64);
452        }
453
454        let arrays: Vec<ArrayRef> = vec![
455            Arc::new(event_id_builder.finish()),
456            Arc::new(event_type_builder.finish()),
457            Arc::new(entity_id_builder.finish()),
458            Arc::new(payload_builder.finish()),
459            Arc::new(timestamp_builder.finish()),
460            Arc::new(metadata_builder.finish()),
461            Arc::new(version_builder.finish()),
462        ];
463
464        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;
465
466        Ok(record_batch)
467    }
468
469    /// Load events from all Parquet files
470    #[cfg_attr(feature = "hotpath", hotpath::measure)]
471    pub fn load_all_events(&self) -> Result<Vec<Event>> {
472        let mut all_events = Vec::new();
473
474        // Read all parquet files in storage directory
475        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
476            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
477        })?;
478
479        let mut parquet_files: Vec<PathBuf> = entries
480            .filter_map(std::result::Result::ok)
481            .map(|entry| entry.path())
482            .filter(|path| {
483                path.extension()
484                    .and_then(|ext| ext.to_str())
485                    .is_some_and(|ext| ext == "parquet")
486            })
487            .collect();
488
489        // Sort files by name (which includes timestamp)
490        parquet_files.sort();
491
492        for file_path in parquet_files {
493            tracing::info!("Loading events from {}", file_path.display());
494            let file_events = self.load_events_from_file(&file_path)?;
495            all_events.extend(file_events);
496        }
497
498        tracing::info!("Loaded {} total events from storage", all_events.len());
499
500        Ok(all_events)
501    }
502
503    /// Load events from a single Parquet file
504    #[cfg_attr(feature = "hotpath", hotpath::measure)]
505    fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
506        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
507
508        let file = File::open(file_path).map_err(|e| {
509            AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
510        })?;
511
512        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
513        let mut reader = builder.build()?;
514
515        let mut events = Vec::new();
516
517        while let Some(Ok(batch)) = reader.next() {
518            let batch_events = self.record_batch_to_events(&batch)?;
519            events.extend(batch_events);
520        }
521
522        Ok(events)
523    }
524
525    /// Convert Arrow RecordBatch back to events
526    #[cfg_attr(feature = "hotpath", hotpath::measure)]
527    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
528        let event_ids = batch
529            .column(0)
530            .as_any()
531            .downcast_ref::<arrow::array::StringArray>()
532            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;
533
534        let event_types = batch
535            .column(1)
536            .as_any()
537            .downcast_ref::<arrow::array::StringArray>()
538            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;
539
540        let entity_ids = batch
541            .column(2)
542            .as_any()
543            .downcast_ref::<arrow::array::StringArray>()
544            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;
545
546        let payloads = batch
547            .column(3)
548            .as_any()
549            .downcast_ref::<arrow::array::StringArray>()
550            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;
551
552        let timestamps = batch
553            .column(4)
554            .as_any()
555            .downcast_ref::<TimestampMicrosecondArray>()
556            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;
557
558        let metadatas = batch
559            .column(5)
560            .as_any()
561            .downcast_ref::<arrow::array::StringArray>()
562            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;
563
564        let versions = batch
565            .column(6)
566            .as_any()
567            .downcast_ref::<arrow::array::UInt64Array>()
568            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;
569
570        let mut events = Vec::new();
571
572        for i in 0..batch.num_rows() {
573            let id = uuid::Uuid::parse_str(event_ids.value(i))
574                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;
575
576            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
577                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;
578
579            let metadata = if metadatas.is_null(i) {
580                None
581            } else {
582                Some(serde_json::from_str(metadatas.value(i))?)
583            };
584
585            let event = Event::reconstruct_from_strings(
586                id,
587                event_types.value(i).to_string(),
588                entity_ids.value(i).to_string(),
589                "default".to_string(), // Default tenant for backward compatibility
590                serde_json::from_str(payloads.value(i))?,
591                timestamp,
592                metadata,
593                versions.value(i) as i64,
594            );
595
596            events.push(event);
597        }
598
599        Ok(events)
600    }
601
602    /// List all Parquet file paths in the storage directory, sorted by name.
603    ///
604    /// Used by the replication catch-up protocol to stream snapshot files
605    /// to followers that are too far behind for WAL-only catch-up.
606    pub fn list_parquet_files(&self) -> Result<Vec<PathBuf>> {
607        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
608            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
609        })?;
610
611        let mut parquet_files: Vec<PathBuf> = entries
612            .filter_map(std::result::Result::ok)
613            .map(|entry| entry.path())
614            .filter(|path| {
615                path.extension()
616                    .and_then(|ext| ext.to_str())
617                    .is_some_and(|ext| ext == "parquet")
618            })
619            .collect();
620
621        parquet_files.sort();
622        Ok(parquet_files)
623    }
624
    /// Get the storage directory path this store writes Parquet files into.
    pub fn storage_dir(&self) -> &Path {
        &self.storage_dir
    }
629
630    /// Get storage statistics
631    pub fn stats(&self) -> Result<StorageStats> {
632        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
633            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
634        })?;
635
636        let mut total_files = 0;
637        let mut total_size_bytes = 0u64;
638
639        for entry in entries.flatten() {
640            let path = entry.path();
641            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
642                total_files += 1;
643                if let Ok(metadata) = entry.metadata() {
644                    total_size_bytes += metadata.len();
645                }
646            }
647        }
648
649        let current_batch_size = self.current_batch.lock().unwrap().len();
650
651        Ok(StorageStats {
652            total_files,
653            total_size_bytes,
654            storage_dir: self.storage_dir.clone(),
655            current_batch_size,
656        })
657    }
658}
659
660impl Drop for ParquetStorage {
661    fn drop(&mut self) {
662        // Ensure any remaining events are flushed on shutdown
663        if let Err(e) = self.flush_on_shutdown() {
664            tracing::error!("Failed to flush events on drop: {}", e);
665        }
666    }
667}
668
/// Point-in-time storage statistics returned by `ParquetStorage::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    /// Number of Parquet files on disk
    pub total_files: usize,
    /// Combined size of those files in bytes
    pub total_size_bytes: u64,
    /// Directory the files live in
    pub storage_dir: PathBuf,
    /// Events currently buffered in memory (not yet flushed)
    pub current_batch_size: usize,
}
676
677#[cfg(test)]
678mod tests {
679    use super::*;
680    use serde_json::json;
681    use std::sync::Arc;
682    use tempfile::TempDir;
683
    // Builds a minimal event (fresh UUID, fixed JSON payload, version 1)
    // in the "default" tenant for use across the tests below.
    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }
699
    // Round-trip: append 10 events, flush, and read them all back.
    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add events
        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"));
            storage.append_event(event).unwrap();
        }

        // Flush to disk
        storage.flush().unwrap();

        // Load back
        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }
718
    // stats() should report one file with a non-zero size after one flush.
    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add and flush events
        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }
736
    // Pins the US-023 requirement: default batch size is 10,000.
    #[test]
    fn test_default_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Default batch size should be 10,000 as per US-023
        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
        assert_eq!(storage.batch_size(), 10_000);
    }
746
    // Custom configuration values should be reflected by the accessors.
    #[test]
    fn test_custom_config() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 5_000,
            flush_timeout: Duration::from_secs(2),
            compression: parquet::basic::Compression::SNAPPY,
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        assert_eq!(storage.batch_size(), 5_000);
        assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
    }
760
    // batch_write with 250 events and batch_size 100 should flush twice,
    // leave 50 pending, and round-trip all 250 after a manual flush.
    #[test]
    fn test_batch_write() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100, // Small batch for testing
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Create 250 events (should trigger 2 flushes with 50 remaining)
        let events: Vec<Event> = (0..250)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        let result = storage.batch_write(events).unwrap();
        assert_eq!(result.events_written, 250);
        assert_eq!(result.batches_flushed, 2);

        // 50 events should be pending
        assert_eq!(storage.pending_count(), 50);

        // Flush remaining
        storage.flush().unwrap();

        // Load all events back
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 250);
    }
789
    // append_event should auto-flush exactly once when the batch fills,
    // leaving the overflow events pending.
    #[test]
    fn test_auto_flush_on_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 10, // Very small for testing
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Add 15 events - should auto-flush at 10
        for i in 0..15 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }

        // Should have 5 pending, 10 written
        assert_eq!(storage.pending_count(), 5);

        let stats = storage.batch_stats();
        assert_eq!(stats.events_written, 10);
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.size_flushes, 1);
    }
814
    // flush_on_shutdown must persist a partial (below-batch-size) batch and
    // report how many events it flushed.
    #[test]
    fn test_flush_on_shutdown() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add some events without reaching batch size
        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        // Manually trigger shutdown flush
        let flushed = storage.flush_on_shutdown().unwrap();
        assert_eq!(flushed, 5);
        assert_eq!(storage.pending_count(), 0);

        // Verify events are persisted
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 5);
    }
838
    // Concurrent append_event from 4 scoped threads must not lose events.
    #[test]
    fn test_thread_safe_writes() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());

        let events_per_thread = 50;
        let thread_count = 4;

        // Scoped threads let us borrow `storage` without 'static bounds.
        std::thread::scope(|s| {
            for t in 0..thread_count {
                let storage_ref = Arc::clone(&storage);
                s.spawn(move || {
                    for i in 0..events_per_thread {
                        let event = create_test_event(&format!("thread-{t}-entity-{i}"));
                        storage_ref.append_event(event).unwrap();
                    }
                });
            }
        });

        // Flush remaining
        storage.flush().unwrap();

        // All events should be written
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), events_per_thread * thread_count);
    }
870
    // batch_stats counters should reflect two size-triggered flushes after
    // writing exactly two full batches.
    #[test]
    fn test_batch_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 50,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Write 100 events (2 batches)
        let events: Vec<Event> = (0..100)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        storage.batch_write(events).unwrap();

        let stats = storage.batch_stats();
        assert_eq!(stats.batches_written, 2);
        assert_eq!(stats.events_written, 100);
        assert!(stats.avg_batch_size > 0.0);
        assert!(stats.events_per_sec > 0.0);
        assert_eq!(stats.size_flushes, 2);
    }
894
895    #[test]
896    fn test_config_presets() {
897        let high_throughput = ParquetStorageConfig::high_throughput();
898        assert_eq!(high_throughput.batch_size, 50_000);
899        assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));
900
901        let low_latency = ParquetStorageConfig::low_latency();
902        assert_eq!(low_latency.batch_size, 1_000);
903        assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));
904
905        let default = ParquetStorageConfig::default();
906        assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
907        assert_eq!(default.batch_size, 10_000);
908    }
909
910    /// Benchmark: Compare single-event writes vs batch writes
911    /// Run with: cargo test --release -- --ignored test_batch_write_throughput
912    #[test]
913    #[ignore]
914    fn test_batch_write_throughput() {
915        let temp_dir = TempDir::new().unwrap();
916        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
917
918        let event_count = 50_000;
919
920        // Benchmark batch write
921        let events: Vec<Event> = (0..event_count)
922            .map(|i| create_test_event(&format!("entity-{i}")))
923            .collect();
924
925        let start = std::time::Instant::now();
926        let result = storage.batch_write(events).unwrap();
927        storage.flush().unwrap(); // Flush any remaining
928        let batch_duration = start.elapsed();
929
930        let batch_stats = storage.batch_stats();
931
932        println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
933        println!("Events: {event_count}");
934        println!("Duration: {batch_duration:?}");
935        println!("Events/sec: {:.0}", result.events_per_sec);
936        println!("Batches written: {}", batch_stats.batches_written);
937        println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
938        println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);
939
940        // Target: Batch writes should achieve at least 100K events/sec in release mode
941        // This represents 40%+ improvement over single-event writes
942        assert!(
943            result.events_per_sec > 10_000.0,
944            "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
945            result.events_per_sec
946        );
947    }
948
949    /// Benchmark: Single-event write baseline (for comparison)
950    #[test]
951    #[ignore]
952    fn test_single_event_write_baseline() {
953        let temp_dir = TempDir::new().unwrap();
954        let config = ParquetStorageConfig {
955            batch_size: 1, // Force flush after each event
956            ..Default::default()
957        };
958        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
959
960        let event_count = 1_000; // Fewer events since this is slow
961
962        let start = std::time::Instant::now();
963        for i in 0..event_count {
964            let event = create_test_event(&format!("entity-{i}"));
965            storage.append_event(event).unwrap();
966        }
967        let duration = start.elapsed();
968
969        let events_per_sec = f64::from(event_count) / duration.as_secs_f64();
970
971        println!("\n=== Single-Event Write Baseline ===");
972        println!("Events: {event_count}");
973        println!("Duration: {duration:?}");
974        println!("Events/sec: {events_per_sec:.0}");
975
976        // This should be significantly slower than batch writes
977        // Used as a baseline to demonstrate 40%+ improvement
978    }
979}