// allsource_core/infrastructure/persistence/storage.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4};
5use arrow::{
6    array::{
7        Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
8        UInt64Builder,
9    },
10    datatypes::{DataType, Field, Schema, TimeUnit},
11    record_batch::RecordBatch,
12};
13use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
14use std::{
15    fs::{self, File},
16    path::{Path, PathBuf},
17    sync::{
18        Arc, Mutex,
19        atomic::{AtomicU64, Ordering},
20    },
21    time::{Duration, Instant},
22};
23
/// Default batch size for Parquet writes (10,000 events as per US-023)
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default flush timeout in milliseconds (5 seconds)
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;
29
/// Configuration for ParquetStorage batch processing
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Batch size before automatic flush (default: 10,000)
    pub batch_size: usize,
    /// Timeout before flushing partial batch (default: 5 seconds)
    pub flush_timeout: Duration,
    /// Compression codec for Parquet files (default: SNAPPY)
    pub compression: parquet::basic::Compression,
}
40
41impl Default for ParquetStorageConfig {
42    fn default() -> Self {
43        Self {
44            batch_size: DEFAULT_BATCH_SIZE,
45            flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
46            compression: parquet::basic::Compression::SNAPPY,
47        }
48    }
49}
50
51impl ParquetStorageConfig {
52    /// High-throughput configuration optimized for large batch writes
53    pub fn high_throughput() -> Self {
54        Self {
55            batch_size: 50_000,
56            flush_timeout: Duration::from_secs(10),
57            compression: parquet::basic::Compression::SNAPPY,
58        }
59    }
60
61    /// Low-latency configuration for smaller, more frequent writes
62    pub fn low_latency() -> Self {
63        Self {
64            batch_size: 1_000,
65            flush_timeout: Duration::from_secs(1),
66            compression: parquet::basic::Compression::SNAPPY,
67        }
68    }
69}
70
/// Statistics for batch write operations
///
/// Snapshot of the storage's atomic counters; see `ParquetStorage::batch_stats`.
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Total batches written
    pub batches_written: u64,
    /// Total events written
    pub events_written: u64,
    /// Total bytes written
    pub bytes_written: u64,
    /// Average batch size (0.0 when nothing has been written)
    pub avg_batch_size: f64,
    /// Events per second (throughput; 0.0 when nothing has been written)
    pub events_per_sec: f64,
    /// Total write time in nanoseconds
    pub total_write_time_ns: u64,
    /// Number of timeout-triggered flushes
    pub timeout_flushes: u64,
    /// Number of size-triggered flushes
    pub size_flushes: u64,
}
91
/// Result of a batch write operation
///
/// Returned by `ParquetStorage::batch_write`.
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Number of events written
    pub events_written: usize,
    /// Number of batches flushed to disk
    pub batches_flushed: usize,
    /// Total duration of the write operation
    pub duration: Duration,
    /// Write throughput in events per second
    pub events_per_sec: f64,
}
104
/// Parquet-based persistent storage for events with batch processing
///
/// Features:
/// - Configurable batch size (default: 10,000 events per US-023)
/// - Timeout-based flushing for partial batches
/// - Thread-safe batch accumulation
/// - SNAPPY compression for efficient storage
/// - Automatic flush on shutdown via Drop
pub struct ParquetStorage {
    /// Base directory for storing parquet files
    storage_dir: PathBuf,

    /// Current batch being accumulated (protected by mutex for thread safety)
    current_batch: Mutex<Vec<Event>>,

    /// Configuration
    config: ParquetStorageConfig,

    /// Schema for Arrow/Parquet (column order must match the converters below)
    schema: Arc<Schema>,

    /// Last flush timestamp for timeout tracking
    last_flush_time: Mutex<Instant>,

    /// Statistics tracking (relaxed-ordering counters; reads may lag
    /// slightly behind concurrent writers)
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}
137
138impl ParquetStorage {
139    /// Create a new ParquetStorage with default configuration (10,000 event batches)
140    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
141        Self::with_config(storage_dir, ParquetStorageConfig::default())
142    }
143
    /// Create a new ParquetStorage with custom configuration
    ///
    /// # Errors
    /// Returns `StorageError` if the storage directory cannot be created.
    pub fn with_config(
        storage_dir: impl AsRef<Path>,
        config: ParquetStorageConfig,
    ) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        // Create storage directory if it doesn't exist
        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
        })?;

        // Define Arrow schema for events.
        // NOTE: column order is load-bearing — events_to_record_batch and
        // record_batch_to_events access columns positionally in this order.
        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            // metadata is the only nullable column
            Field::new("metadata", DataType::Utf8, true),
            Field::new("version", DataType::UInt64, false),
        ]));

        Ok(Self {
            storage_dir,
            // Pre-size the buffer so a full batch never reallocates
            current_batch: Mutex::new(Vec::with_capacity(config.batch_size)),
            config,
            schema,
            last_flush_time: Mutex::new(Instant::now()),
            batches_written: AtomicU64::new(0),
            events_written: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            total_write_time_ns: AtomicU64::new(0),
            timeout_flushes: AtomicU64::new(0),
            size_flushes: AtomicU64::new(0),
        })
    }
185
186    /// Create storage with legacy batch size (1000) for backward compatibility
187    #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
188    pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
189        Self::with_config(
190            storage_dir,
191            ParquetStorageConfig {
192                batch_size: 1000,
193                ..Default::default()
194            },
195        )
196    }
197
198    /// Add an event to the current batch
199    ///
200    /// Events are buffered until either:
201    /// - Batch size reaches configured limit (default: 10,000)
202    /// - Flush timeout is exceeded
203    /// - Manual flush() is called
204    pub fn append_event(&self, event: Event) -> Result<()> {
205        let should_flush = {
206            let mut batch = self.current_batch.lock().unwrap();
207            batch.push(event);
208            batch.len() >= self.config.batch_size
209        };
210
211        if should_flush {
212            self.size_flushes.fetch_add(1, Ordering::Relaxed);
213            self.flush()?;
214        }
215
216        Ok(())
217    }
218
219    /// Add multiple events to the batch (optimized batch insertion)
220    ///
221    /// This is the preferred method for high-throughput ingestion.
222    /// Events are added atomically and flushed if batch size is reached.
223    pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
224        let start = Instant::now();
225        let event_count = events.len();
226        let mut batches_flushed = 0;
227
228        // Add events in chunks to avoid holding lock too long
229        let mut remaining_events = events.into_iter().peekable();
230
231        while remaining_events.peek().is_some() {
232            let should_flush = {
233                let mut batch = self.current_batch.lock().unwrap();
234                let available_space = self.config.batch_size.saturating_sub(batch.len());
235
236                if available_space == 0 {
237                    true
238                } else {
239                    // Take up to available_space events
240                    let to_add: Vec<Event> =
241                        remaining_events.by_ref().take(available_space).collect();
242                    batch.extend(to_add);
243                    batch.len() >= self.config.batch_size
244                }
245            };
246
247            if should_flush {
248                self.size_flushes.fetch_add(1, Ordering::Relaxed);
249                self.flush()?;
250                batches_flushed += 1;
251            }
252        }
253
254        let duration = start.elapsed();
255
256        Ok(BatchWriteResult {
257            events_written: event_count,
258            batches_flushed,
259            duration,
260            events_per_sec: event_count as f64 / duration.as_secs_f64(),
261        })
262    }
263
264    /// Check if a timeout-based flush is needed and perform it
265    ///
266    /// Call this periodically (e.g., from a background task) to ensure
267    /// partial batches are flushed within the configured timeout.
268    pub fn check_timeout_flush(&self) -> Result<bool> {
269        let should_flush = {
270            let last_flush = self.last_flush_time.lock().unwrap();
271            let batch = self.current_batch.lock().unwrap();
272            !batch.is_empty() && last_flush.elapsed() >= self.config.flush_timeout
273        };
274
275        if should_flush {
276            self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
277            self.flush()?;
278            Ok(true)
279        } else {
280            Ok(false)
281        }
282    }
283
284    /// Flush current batch to a Parquet file
285    ///
286    /// Thread-safe: Can be called from multiple threads.
287    pub fn flush(&self) -> Result<()> {
288        let events_to_write = {
289            let mut batch = self.current_batch.lock().unwrap();
290            if batch.is_empty() {
291                return Ok(());
292            }
293            std::mem::take(&mut *batch)
294        };
295
296        let batch_count = events_to_write.len();
297        tracing::info!("Flushing {} events to Parquet storage", batch_count);
298
299        let start = Instant::now();
300
301        // Create record batch from events
302        let record_batch = self.events_to_record_batch(&events_to_write)?;
303
304        // Generate filename with timestamp and unique suffix for concurrent writes
305        let filename = format!(
306            "events-{}-{}.parquet",
307            chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
308            uuid::Uuid::new_v4().as_simple()
309        );
310        let file_path = self.storage_dir.join(&filename);
311
312        // Write to Parquet file
313        let file = File::create(&file_path).map_err(|e| {
314            AllSourceError::StorageError(format!("Failed to create parquet file: {e}"))
315        })?;
316
317        let props = WriterProperties::builder()
318            .set_compression(self.config.compression)
319            .build();
320
321        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
322
323        writer.write(&record_batch)?;
324        let file_metadata = writer.close()?;
325
326        let duration = start.elapsed();
327
328        // Update statistics
329        self.batches_written.fetch_add(1, Ordering::Relaxed);
330        self.events_written
331            .fetch_add(batch_count as u64, Ordering::Relaxed);
332        if let Some(size) = file_metadata
333            .row_groups()
334            .first()
335            .map(|rg| rg.total_byte_size())
336        {
337            self.bytes_written.fetch_add(size as u64, Ordering::Relaxed);
338        }
339        self.total_write_time_ns
340            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
341
342        // Update last flush time
343        {
344            let mut last_flush = self.last_flush_time.lock().unwrap();
345            *last_flush = Instant::now();
346        }
347
348        tracing::info!(
349            "Successfully wrote {} events to {} in {:?}",
350            batch_count,
351            file_path.display(),
352            duration
353        );
354
355        Ok(())
356    }
357
358    /// Force flush any remaining events (for shutdown handling)
359    ///
360    /// This ensures partial batches are persisted on graceful shutdown.
361    pub fn flush_on_shutdown(&self) -> Result<usize> {
362        let batch_size = {
363            let batch = self.current_batch.lock().unwrap();
364            batch.len()
365        };
366
367        if batch_size > 0 {
368            tracing::info!("Shutdown: flushing partial batch of {} events", batch_size);
369            self.flush()?;
370        }
371
372        Ok(batch_size)
373    }
374
375    /// Get batch write statistics
376    pub fn batch_stats(&self) -> BatchWriteStats {
377        let batches = self.batches_written.load(Ordering::Relaxed);
378        let events = self.events_written.load(Ordering::Relaxed);
379        let bytes = self.bytes_written.load(Ordering::Relaxed);
380        let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);
381
382        let time_secs = time_ns as f64 / 1_000_000_000.0;
383
384        BatchWriteStats {
385            batches_written: batches,
386            events_written: events,
387            bytes_written: bytes,
388            avg_batch_size: if batches > 0 {
389                events as f64 / batches as f64
390            } else {
391                0.0
392            },
393            events_per_sec: if time_secs > 0.0 {
394                events as f64 / time_secs
395            } else {
396                0.0
397            },
398            total_write_time_ns: time_ns,
399            timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
400            size_flushes: self.size_flushes.load(Ordering::Relaxed),
401        }
402    }
403
    /// Get current batch size (events buffered in memory, not yet flushed)
    pub fn pending_count(&self) -> usize {
        self.current_batch.lock().unwrap().len()
    }

    /// Get configured batch size (events per flush)
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }

    /// Get configured flush timeout for partial batches
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }
418
419    /// Convert events to Arrow RecordBatch
420    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
421        let mut event_id_builder = StringBuilder::new();
422        let mut event_type_builder = StringBuilder::new();
423        let mut entity_id_builder = StringBuilder::new();
424        let mut payload_builder = StringBuilder::new();
425        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
426        let mut metadata_builder = StringBuilder::new();
427        let mut version_builder = UInt64Builder::new();
428
429        for event in events {
430            event_id_builder.append_value(event.id.to_string());
431            event_type_builder.append_value(event.event_type_str());
432            entity_id_builder.append_value(event.entity_id_str());
433            payload_builder.append_value(serde_json::to_string(&event.payload)?);
434
435            // Convert timestamp to microseconds
436            let timestamp_micros = event.timestamp.timestamp_micros();
437            timestamp_builder.append_value(timestamp_micros);
438
439            if let Some(ref metadata) = event.metadata {
440                metadata_builder.append_value(serde_json::to_string(metadata)?);
441            } else {
442                metadata_builder.append_null();
443            }
444
445            version_builder.append_value(event.version as u64);
446        }
447
448        let arrays: Vec<ArrayRef> = vec![
449            Arc::new(event_id_builder.finish()),
450            Arc::new(event_type_builder.finish()),
451            Arc::new(entity_id_builder.finish()),
452            Arc::new(payload_builder.finish()),
453            Arc::new(timestamp_builder.finish()),
454            Arc::new(metadata_builder.finish()),
455            Arc::new(version_builder.finish()),
456        ];
457
458        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;
459
460        Ok(record_batch)
461    }
462
463    /// Load events from all Parquet files
464    pub fn load_all_events(&self) -> Result<Vec<Event>> {
465        let mut all_events = Vec::new();
466
467        // Read all parquet files in storage directory
468        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
469            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
470        })?;
471
472        let mut parquet_files: Vec<PathBuf> = entries
473            .filter_map(|entry| entry.ok())
474            .map(|entry| entry.path())
475            .filter(|path| {
476                path.extension()
477                    .and_then(|ext| ext.to_str())
478                    .map(|ext| ext == "parquet")
479                    .unwrap_or(false)
480            })
481            .collect();
482
483        // Sort files by name (which includes timestamp)
484        parquet_files.sort();
485
486        for file_path in parquet_files {
487            tracing::info!("Loading events from {}", file_path.display());
488            let file_events = self.load_events_from_file(&file_path)?;
489            all_events.extend(file_events);
490        }
491
492        tracing::info!("Loaded {} total events from storage", all_events.len());
493
494        Ok(all_events)
495    }
496
497    /// Load events from a single Parquet file
498    fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
499        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
500
501        let file = File::open(file_path).map_err(|e| {
502            AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
503        })?;
504
505        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
506        let mut reader = builder.build()?;
507
508        let mut events = Vec::new();
509
510        while let Some(Ok(batch)) = reader.next() {
511            let batch_events = self.record_batch_to_events(&batch)?;
512            events.extend(batch_events);
513        }
514
515        Ok(events)
516    }
517
518    /// Convert Arrow RecordBatch back to events
519    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
520        let event_ids = batch
521            .column(0)
522            .as_any()
523            .downcast_ref::<arrow::array::StringArray>()
524            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;
525
526        let event_types = batch
527            .column(1)
528            .as_any()
529            .downcast_ref::<arrow::array::StringArray>()
530            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;
531
532        let entity_ids = batch
533            .column(2)
534            .as_any()
535            .downcast_ref::<arrow::array::StringArray>()
536            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;
537
538        let payloads = batch
539            .column(3)
540            .as_any()
541            .downcast_ref::<arrow::array::StringArray>()
542            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;
543
544        let timestamps = batch
545            .column(4)
546            .as_any()
547            .downcast_ref::<TimestampMicrosecondArray>()
548            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;
549
550        let metadatas = batch
551            .column(5)
552            .as_any()
553            .downcast_ref::<arrow::array::StringArray>()
554            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;
555
556        let versions = batch
557            .column(6)
558            .as_any()
559            .downcast_ref::<arrow::array::UInt64Array>()
560            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;
561
562        let mut events = Vec::new();
563
564        for i in 0..batch.num_rows() {
565            let id = uuid::Uuid::parse_str(event_ids.value(i))
566                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;
567
568            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
569                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;
570
571            let metadata = if metadatas.is_null(i) {
572                None
573            } else {
574                Some(serde_json::from_str(metadatas.value(i))?)
575            };
576
577            let event = Event::reconstruct_from_strings(
578                id,
579                event_types.value(i).to_string(),
580                entity_ids.value(i).to_string(),
581                "default".to_string(), // Default tenant for backward compatibility
582                serde_json::from_str(payloads.value(i))?,
583                timestamp,
584                metadata,
585                versions.value(i) as i64,
586            );
587
588            events.push(event);
589        }
590
591        Ok(events)
592    }
593
594    /// List all Parquet file paths in the storage directory, sorted by name.
595    ///
596    /// Used by the replication catch-up protocol to stream snapshot files
597    /// to followers that are too far behind for WAL-only catch-up.
598    pub fn list_parquet_files(&self) -> Result<Vec<PathBuf>> {
599        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
600            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
601        })?;
602
603        let mut parquet_files: Vec<PathBuf> = entries
604            .filter_map(|entry| entry.ok())
605            .map(|entry| entry.path())
606            .filter(|path| {
607                path.extension()
608                    .and_then(|ext| ext.to_str())
609                    .map(|ext| ext == "parquet")
610                    .unwrap_or(false)
611            })
612            .collect();
613
614        parquet_files.sort();
615        Ok(parquet_files)
616    }
617
    /// Get the storage directory path (the directory all parquet files
    /// are written into).
    pub fn storage_dir(&self) -> &Path {
        &self.storage_dir
    }
622
623    /// Get storage statistics
624    pub fn stats(&self) -> Result<StorageStats> {
625        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
626            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
627        })?;
628
629        let mut total_files = 0;
630        let mut total_size_bytes = 0u64;
631
632        for entry in entries.flatten() {
633            let path = entry.path();
634            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
635                total_files += 1;
636                if let Ok(metadata) = entry.metadata() {
637                    total_size_bytes += metadata.len();
638                }
639            }
640        }
641
642        let current_batch_size = self.current_batch.lock().unwrap().len();
643
644        Ok(StorageStats {
645            total_files,
646            total_size_bytes,
647            storage_dir: self.storage_dir.clone(),
648            current_batch_size,
649        })
650    }
651}
652
653impl Drop for ParquetStorage {
654    fn drop(&mut self) {
655        // Ensure any remaining events are flushed on shutdown
656        if let Err(e) = self.flush_on_shutdown() {
657            tracing::error!("Failed to flush events on drop: {}", e);
658        }
659    }
660}
661
/// On-disk and in-memory storage statistics, returned by `ParquetStorage::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    /// Number of parquet files in the storage directory
    pub total_files: usize,
    /// Combined size of those files in bytes
    pub total_size_bytes: u64,
    /// Directory the files live in
    pub storage_dir: PathBuf,
    /// Events currently buffered in memory, not yet flushed
    pub current_batch_size: usize,
}
669
#[cfg(test)]
mod tests {
    //! Unit tests covering persistence round-trips, batch accumulation and
    //! auto-flush, statistics, thread safety, and ignored throughput
    //! benchmarks for `ParquetStorage`.
    use super::*;
    use serde_json::json;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Build a minimal test event (fixed payload, no metadata, version 1).
    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add events
        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }

        // Flush to disk
        storage.flush().unwrap();

        // Load back
        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }

    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add and flush events
        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }
        storage.flush().unwrap();

        // One flush produces exactly one parquet file
        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }

    #[test]
    fn test_default_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Default batch size should be 10,000 as per US-023
        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
        assert_eq!(storage.batch_size(), 10_000);
    }

    #[test]
    fn test_custom_config() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 5_000,
            flush_timeout: Duration::from_secs(2),
            compression: parquet::basic::Compression::SNAPPY,
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        assert_eq!(storage.batch_size(), 5_000);
        assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
    }

    #[test]
    fn test_batch_write() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100, // Small batch for testing
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Create 250 events (should trigger 2 flushes with 50 remaining)
        let events: Vec<Event> = (0..250)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        let result = storage.batch_write(events).unwrap();
        assert_eq!(result.events_written, 250);
        assert_eq!(result.batches_flushed, 2);

        // 50 events should be pending
        assert_eq!(storage.pending_count(), 50);

        // Flush remaining
        storage.flush().unwrap();

        // Load all events back
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 250);
    }

    #[test]
    fn test_auto_flush_on_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 10, // Very small for testing
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Add 15 events - should auto-flush at 10
        for i in 0..15 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        // Should have 5 pending, 10 written
        assert_eq!(storage.pending_count(), 5);

        let stats = storage.batch_stats();
        assert_eq!(stats.events_written, 10);
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.size_flushes, 1);
    }

    #[test]
    fn test_flush_on_shutdown() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add some events without reaching batch size
        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        // Manually trigger shutdown flush
        let flushed = storage.flush_on_shutdown().unwrap();
        assert_eq!(flushed, 5);
        assert_eq!(storage.pending_count(), 0);

        // Verify events are persisted
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 5);
    }

    #[test]
    fn test_thread_safe_writes() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());

        let events_per_thread = 50;
        let thread_count = 4;

        // Scoped threads so the Arc clones don't need 'static lifetimes
        std::thread::scope(|s| {
            for t in 0..thread_count {
                let storage_ref = Arc::clone(&storage);
                s.spawn(move || {
                    for i in 0..events_per_thread {
                        let event = create_test_event(&format!("thread-{}-entity-{}", t, i));
                        storage_ref.append_event(event).unwrap();
                    }
                });
            }
        });

        // Flush remaining
        storage.flush().unwrap();

        // All events should be written
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), events_per_thread * thread_count);
    }

    #[test]
    fn test_batch_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 50,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Write 100 events (2 batches)
        let events: Vec<Event> = (0..100)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        storage.batch_write(events).unwrap();

        let stats = storage.batch_stats();
        assert_eq!(stats.batches_written, 2);
        assert_eq!(stats.events_written, 100);
        assert!(stats.avg_batch_size > 0.0);
        assert!(stats.events_per_sec > 0.0);
        assert_eq!(stats.size_flushes, 2);
    }

    #[test]
    fn test_config_presets() {
        let high_throughput = ParquetStorageConfig::high_throughput();
        assert_eq!(high_throughput.batch_size, 50_000);
        assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));

        let low_latency = ParquetStorageConfig::low_latency();
        assert_eq!(low_latency.batch_size, 1_000);
        assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));

        let default = ParquetStorageConfig::default();
        assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(default.batch_size, 10_000);
    }

    /// Benchmark: Compare single-event writes vs batch writes
    /// Run with: cargo test --release -- --ignored test_batch_write_throughput
    #[test]
    #[ignore]
    fn test_batch_write_throughput() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        let event_count = 50_000;

        // Benchmark batch write
        let events: Vec<Event> = (0..event_count)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        let start = std::time::Instant::now();
        let result = storage.batch_write(events).unwrap();
        storage.flush().unwrap(); // Flush any remaining
        let batch_duration = start.elapsed();

        let batch_stats = storage.batch_stats();

        println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
        println!("Events: {}", event_count);
        println!("Duration: {:?}", batch_duration);
        println!("Events/sec: {:.0}", result.events_per_sec);
        println!("Batches written: {}", batch_stats.batches_written);
        println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
        println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);

        // Target: Batch writes should achieve at least 100K events/sec in release mode
        // This represents 40%+ improvement over single-event writes
        assert!(
            result.events_per_sec > 10_000.0,
            "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
            result.events_per_sec
        );
    }

    /// Benchmark: Single-event write baseline (for comparison)
    #[test]
    #[ignore]
    fn test_single_event_write_baseline() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 1, // Force flush after each event
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let event_count = 1_000; // Fewer events since this is slow

        let start = std::time::Instant::now();
        for i in 0..event_count {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }
        let duration = start.elapsed();

        let events_per_sec = event_count as f64 / duration.as_secs_f64();

        println!("\n=== Single-Event Write Baseline ===");
        println!("Events: {}", event_count);
        println!("Duration: {:?}", duration);
        println!("Events/sec: {:.0}", events_per_sec);

        // This should be significantly slower than batch writes
        // Used as a baseline to demonstrate 40%+ improvement
    }
}