
// allsource_core/infrastructure/persistence/storage.rs

use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use arrow::array::{
    Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
    UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Default batch size for Parquet writes (10,000 events as per US-023)
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default flush timeout in milliseconds
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;

/// Configuration for ParquetStorage batch processing
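///
/// # Example
///
/// A minimal sketch of the three ways to obtain a config (assumes this module
/// is reachable from your crate root; adjust the `use` path as needed):
///
/// ```ignore
/// use allsource_core::infrastructure::persistence::ParquetStorageConfig;
/// use std::time::Duration;
///
/// let default_cfg = ParquetStorageConfig::default();      // 10,000-event batches
/// let bulk_cfg = ParquetStorageConfig::high_throughput(); // 50,000-event batches
/// let custom_cfg = ParquetStorageConfig {
///     batch_size: 2_500,
///     flush_timeout: Duration::from_millis(500),
///     ..Default::default()
/// };
/// ```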
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Batch size before automatic flush (default: 10,000)
    pub batch_size: usize,
    /// Timeout before flushing partial batch (default: 5 seconds)
    pub flush_timeout: Duration,
    /// Compression codec for Parquet files
    pub compression: parquet::basic::Compression,
}

impl Default for ParquetStorageConfig {
    fn default() -> Self {
        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
            compression: parquet::basic::Compression::SNAPPY,
        }
    }
}

impl ParquetStorageConfig {
    /// High-throughput configuration optimized for large batch writes
    pub fn high_throughput() -> Self {
        Self {
            batch_size: 50_000,
            flush_timeout: Duration::from_secs(10),
            compression: parquet::basic::Compression::SNAPPY,
        }
    }

    /// Low-latency configuration for smaller, more frequent writes
    pub fn low_latency() -> Self {
        Self {
            batch_size: 1_000,
            flush_timeout: Duration::from_secs(1),
            compression: parquet::basic::Compression::SNAPPY,
        }
    }
}

/// Statistics for batch write operations
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Total batches written
    pub batches_written: u64,
    /// Total events written
    pub events_written: u64,
    /// Total bytes written
    pub bytes_written: u64,
    /// Average batch size
    pub avg_batch_size: f64,
    /// Events per second (throughput)
    pub events_per_sec: f64,
    /// Total write time in nanoseconds
    pub total_write_time_ns: u64,
    /// Number of timeout-triggered flushes
    pub timeout_flushes: u64,
    /// Number of size-triggered flushes
    pub size_flushes: u64,
}

/// Result of a batch write operation
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Number of events written
    pub events_written: usize,
    /// Number of batches flushed to disk
    pub batches_flushed: usize,
    /// Total duration of the write operation
    pub duration: Duration,
    /// Write throughput in events per second
    pub events_per_sec: f64,
}

/// Parquet-based persistent storage for events with batch processing
///
/// Features:
/// - Configurable batch size (default: 10,000 events per US-023)
/// - Timeout-based flushing for partial batches
/// - Thread-safe batch accumulation
/// - SNAPPY compression for efficient storage
/// - Automatic flush on shutdown via Drop
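///
/// # Example
///
/// An end-to-end sketch (the path and `some_event` value are illustrative;
/// use whatever constructor your `Event` entity actually exposes):
///
/// ```ignore
/// let storage = ParquetStorage::new("/var/lib/allsource/events")?;
///
/// storage.append_event(some_event)?; // buffered until a flush trigger fires
/// storage.flush()?;                  // force-persist the partial batch
///
/// let recovered = storage.load_all_events()?;
/// ```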
pub struct ParquetStorage {
    /// Base directory for storing parquet files
    storage_dir: PathBuf,

    /// Current batch being accumulated (protected by mutex for thread safety)
    current_batch: Mutex<Vec<Event>>,

    /// Configuration
    config: ParquetStorageConfig,

    /// Schema for Arrow/Parquet
    schema: Arc<Schema>,

    /// Last flush timestamp for timeout tracking
    last_flush_time: Mutex<Instant>,

    /// Statistics tracking
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}

impl ParquetStorage {
    /// Create a new ParquetStorage with default configuration (10,000 event batches)
    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(storage_dir, ParquetStorageConfig::default())
    }

    /// Create a new ParquetStorage with custom configuration
    pub fn with_config(storage_dir: impl AsRef<Path>, config: ParquetStorageConfig) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        // Create storage directory if it doesn't exist
        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
        })?;

        // Define Arrow schema for events
        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new("metadata", DataType::Utf8, true),
            Field::new("version", DataType::UInt64, false),
        ]));

        Ok(Self {
            storage_dir,
            current_batch: Mutex::new(Vec::with_capacity(config.batch_size)),
            config,
            schema,
            last_flush_time: Mutex::new(Instant::now()),
            batches_written: AtomicU64::new(0),
            events_written: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            total_write_time_ns: AtomicU64::new(0),
            timeout_flushes: AtomicU64::new(0),
            size_flushes: AtomicU64::new(0),
        })
    }

    /// Create storage with legacy batch size (1000) for backward compatibility
    #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
    pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(
            storage_dir,
            ParquetStorageConfig {
                batch_size: 1000,
                ..Default::default()
            },
        )
    }

    /// Add an event to the current batch
    ///
    /// Events are buffered until one of the following triggers a flush:
    /// - The batch reaches the configured size limit (default: 10,000)
    /// - The flush timeout is exceeded (via `check_timeout_flush`)
    /// - `flush()` is called manually
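    ///
    /// # Example
    ///
    /// A minimal sketch (the `event` value is illustrative):
    ///
    /// ```ignore
    /// storage.append_event(event)?; // may trigger a size-based flush internally
    /// assert!(storage.pending_count() < storage.batch_size());
    /// ```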
    pub fn append_event(&self, event: Event) -> Result<()> {
        let should_flush = {
            let mut batch = self.current_batch.lock().unwrap();
            batch.push(event);
            batch.len() >= self.config.batch_size
        };

        if should_flush {
            self.size_flushes.fetch_add(1, Ordering::Relaxed);
            self.flush()?;
        }

        Ok(())
    }

    /// Add multiple events to the batch (optimized batch insertion)
    ///
    /// This is the preferred method for high-throughput ingestion.
    /// Events are appended in batch-sized chunks (so the lock is never held
    /// for long), and each full chunk is flushed to disk as it fills.
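    ///
    /// # Example
    ///
    /// A minimal sketch (the `events` vector is illustrative):
    ///
    /// ```ignore
    /// let result = storage.batch_write(events)?;
    /// println!(
    ///     "{} events in {:?} ({:.0} events/sec, {} batches flushed)",
    ///     result.events_written, result.duration, result.events_per_sec,
    ///     result.batches_flushed,
    /// );
    /// ```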
    pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
        let start = Instant::now();
        let event_count = events.len();
        let mut batches_flushed = 0;

        // Add events in chunks to avoid holding the lock too long
        let mut remaining_events = events.into_iter().peekable();

        while remaining_events.peek().is_some() {
            let should_flush = {
                let mut batch = self.current_batch.lock().unwrap();
                let available_space = self.config.batch_size.saturating_sub(batch.len());

                if available_space == 0 {
                    true
                } else {
                    // Take up to available_space events
                    let to_add: Vec<Event> = remaining_events.by_ref().take(available_space).collect();
                    batch.extend(to_add);
                    batch.len() >= self.config.batch_size
                }
            };

            if should_flush {
                self.size_flushes.fetch_add(1, Ordering::Relaxed);
                self.flush()?;
                batches_flushed += 1;
            }
        }

        let duration = start.elapsed();
        let secs = duration.as_secs_f64();

        Ok(BatchWriteResult {
            events_written: event_count,
            batches_flushed,
            duration,
            // Guard against a zero-duration measurement (e.g., an empty input)
            events_per_sec: if secs > 0.0 { event_count as f64 / secs } else { 0.0 },
        })
    }

    /// Check if a timeout-based flush is needed and perform it
    ///
    /// Call this periodically (e.g., from a background task) to ensure
    /// partial batches are flushed within the configured timeout.
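    ///
    /// # Example
    ///
    /// A sketch of a background flusher thread (assumes the storage is shared
    /// behind an `Arc` and that you supply your own shutdown signal):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use std::time::Duration;
    ///
    /// let storage: Arc<ParquetStorage> = Arc::new(ParquetStorage::new("/tmp/events")?);
    /// let flusher = Arc::clone(&storage);
    /// std::thread::spawn(move || loop {
    ///     std::thread::sleep(Duration::from_millis(500));
    ///     if let Err(e) = flusher.check_timeout_flush() {
    ///         tracing::error!("timeout flush failed: {e}");
    ///     }
    /// });
    /// ```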
    pub fn check_timeout_flush(&self) -> Result<bool> {
        let should_flush = {
            // Take both locks together for a consistent snapshot of batch + timer
            let last_flush = self.last_flush_time.lock().unwrap();
            let batch = self.current_batch.lock().unwrap();
            !batch.is_empty() && last_flush.elapsed() >= self.config.flush_timeout
        };

        if should_flush {
            self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
            self.flush()?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Flush current batch to a Parquet file
    ///
    /// Thread-safe: can be called from multiple threads.
    pub fn flush(&self) -> Result<()> {
        let events_to_write = {
            let mut batch = self.current_batch.lock().unwrap();
            if batch.is_empty() {
                return Ok(());
            }
            std::mem::take(&mut *batch)
        };

        let event_count = events_to_write.len();
        tracing::info!("Flushing {} events to Parquet storage", event_count);

        let start = Instant::now();

        // Create record batch from events
        let record_batch = self.events_to_record_batch(&events_to_write)?;

        // Generate filename with timestamp and unique suffix for concurrent writes
        let filename = format!(
            "events-{}-{}.parquet",
            chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
            uuid::Uuid::new_v4().as_simple()
        );
        let file_path = self.storage_dir.join(&filename);

        // Write to Parquet file
        let file = File::create(&file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create parquet file: {e}"))
        })?;

        let props = WriterProperties::builder()
            .set_compression(self.config.compression)
            .build();

        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;

        writer.write(&record_batch)?;
        let file_metadata = writer.close()?;

        let duration = start.elapsed();

        // Update statistics
        self.batches_written.fetch_add(1, Ordering::Relaxed);
        self.events_written
            .fetch_add(event_count as u64, Ordering::Relaxed);
        // Sum the sizes of all row groups (a large batch may span several)
        let file_bytes: i64 = file_metadata
            .row_groups
            .iter()
            .map(|rg| rg.total_byte_size)
            .sum();
        self.bytes_written
            .fetch_add(file_bytes as u64, Ordering::Relaxed);
        self.total_write_time_ns
            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);

        // Update last flush time
        {
            let mut last_flush = self.last_flush_time.lock().unwrap();
            *last_flush = Instant::now();
        }

        tracing::info!(
            "Successfully wrote {} events to {} in {:?}",
            event_count,
            file_path.display(),
            duration
        );

        Ok(())
    }

    /// Force flush any remaining events (for shutdown handling)
    ///
    /// This ensures partial batches are persisted on graceful shutdown.
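    ///
    /// # Example
    ///
    /// A graceful-shutdown sketch (the shutdown hook shown is illustrative;
    /// `Drop` also calls this automatically as a safety net):
    ///
    /// ```ignore
    /// let flushed = storage.flush_on_shutdown()?;
    /// tracing::info!("persisted {flushed} pending events before exit");
    /// ```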
    pub fn flush_on_shutdown(&self) -> Result<usize> {
        // Snapshot of the pending count; events appended concurrently after
        // this point are still flushed, but not reflected in the return value
        let batch_size = {
            let batch = self.current_batch.lock().unwrap();
            batch.len()
        };

        if batch_size > 0 {
            tracing::info!(
                "Shutdown: flushing partial batch of {} events",
                batch_size
            );
            self.flush()?;
        }

        Ok(batch_size)
    }

    /// Get batch write statistics
    pub fn batch_stats(&self) -> BatchWriteStats {
        let batches = self.batches_written.load(Ordering::Relaxed);
        let events = self.events_written.load(Ordering::Relaxed);
        let bytes = self.bytes_written.load(Ordering::Relaxed);
        let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);

        let time_secs = time_ns as f64 / 1_000_000_000.0;

        BatchWriteStats {
            batches_written: batches,
            events_written: events,
            bytes_written: bytes,
            avg_batch_size: if batches > 0 {
                events as f64 / batches as f64
            } else {
                0.0
            },
            events_per_sec: if time_secs > 0.0 {
                events as f64 / time_secs
            } else {
                0.0
            },
            total_write_time_ns: time_ns,
            timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
            size_flushes: self.size_flushes.load(Ordering::Relaxed),
        }
    }

    /// Get current batch size (pending events)
    pub fn pending_count(&self) -> usize {
        self.current_batch.lock().unwrap().len()
    }

    /// Get configured batch size
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }

    /// Get configured flush timeout
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }

    /// Convert events to Arrow RecordBatch
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            // Convert timestamp to microseconds
            let timestamp_micros = event.timestamp.timestamp_micros();
            timestamp_builder.append_value(timestamp_micros);

            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            version_builder.append_value(event.version as u64);
        }

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;

        Ok(record_batch)
    }

    /// Load events from all Parquet files in the storage directory,
    /// in filename order (filenames embed the flush timestamp)
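    ///
    /// # Example
    ///
    /// A startup-recovery sketch (what you rebuild from the replayed events
    /// is application-specific; the loop body here is illustrative):
    ///
    /// ```ignore
    /// let events = storage.load_all_events()?;
    /// for event in &events {
    ///     // replay into in-memory projections / read models
    /// }
    /// ```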
    pub fn load_all_events(&self) -> Result<Vec<Event>> {
        let mut all_events = Vec::new();

        // Read all parquet files in storage directory
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
        })?;

        let mut parquet_files: Vec<PathBuf> = entries
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path())
            .filter(|path| {
                path.extension()
                    .and_then(|ext| ext.to_str())
                    .map(|ext| ext == "parquet")
                    .unwrap_or(false)
            })
            .collect();

        // Sort files by name (which includes timestamp)
        parquet_files.sort();

        for file_path in parquet_files {
            tracing::info!("Loading events from {}", file_path.display());
            let file_events = self.load_events_from_file(&file_path)?;
            all_events.extend(file_events);
        }

        tracing::info!("Loaded {} total events from storage", all_events.len());

        Ok(all_events)
    }

    /// Load events from a single Parquet file
    fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let file = File::open(file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let reader = builder.build()?;

        let mut events = Vec::new();

        // Propagate read errors instead of silently stopping at the first bad batch
        for batch in reader {
            let batch_events = self.record_batch_to_events(&batch?)?;
            events.extend(batch_events);
        }

        Ok(events)
    }

    /// Convert Arrow RecordBatch back to events
    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i))
                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;

            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                "default".to_string(), // Default tenant for backward compatibility
                serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }

    /// Get storage statistics
    pub fn stats(&self) -> Result<StorageStats> {
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
        })?;

        let mut total_files = 0;
        let mut total_size_bytes = 0u64;

        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                total_files += 1;
                if let Ok(metadata) = entry.metadata() {
                    total_size_bytes += metadata.len();
                }
            }
        }

        let current_batch_size = self.current_batch.lock().unwrap().len();

        Ok(StorageStats {
            total_files,
            total_size_bytes,
            storage_dir: self.storage_dir.clone(),
            current_batch_size,
        })
    }
}

impl Drop for ParquetStorage {
    fn drop(&mut self) {
        // Ensure any remaining events are flushed on shutdown
        if let Err(e) = self.flush_on_shutdown() {
            tracing::error!("Failed to flush events on drop: {}", e);
        }
    }
}

#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    pub total_files: usize,
    pub total_size_bytes: u64,
    pub storage_dir: PathBuf,
    pub current_batch_size: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;
    use tempfile::TempDir;

    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add events
        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }

        // Flush to disk
        storage.flush().unwrap();

        // Load back
        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }

    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add and flush events
        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }

    #[test]
    fn test_default_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Default batch size should be 10,000 as per US-023
        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
        assert_eq!(storage.batch_size(), 10_000);
    }

    #[test]
    fn test_custom_config() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 5_000,
            flush_timeout: Duration::from_secs(2),
            compression: parquet::basic::Compression::SNAPPY,
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        assert_eq!(storage.batch_size(), 5_000);
        assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
    }

    #[test]
    fn test_batch_write() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100, // Small batch for testing
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Create 250 events (should trigger 2 flushes with 50 remaining)
        let events: Vec<Event> = (0..250)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        let result = storage.batch_write(events).unwrap();
        assert_eq!(result.events_written, 250);
        assert_eq!(result.batches_flushed, 2);

        // 50 events should be pending
        assert_eq!(storage.pending_count(), 50);

        // Flush remaining
        storage.flush().unwrap();

        // Load all events back
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 250);
    }

    #[test]
    fn test_auto_flush_on_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 10, // Very small for testing
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Add 15 events - should auto-flush at 10
        for i in 0..15 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        // Should have 5 pending, 10 written
        assert_eq!(storage.pending_count(), 5);

        let stats = storage.batch_stats();
        assert_eq!(stats.events_written, 10);
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.size_flushes, 1);
    }

    #[test]
    fn test_flush_on_shutdown() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add some events without reaching batch size
        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        // Manually trigger shutdown flush
        let flushed = storage.flush_on_shutdown().unwrap();
        assert_eq!(flushed, 5);
        assert_eq!(storage.pending_count(), 0);

        // Verify events are persisted
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 5);
    }

    #[test]
    fn test_thread_safe_writes() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());

        let events_per_thread = 50;
        let thread_count = 4;

        std::thread::scope(|s| {
            for t in 0..thread_count {
                let storage_ref = Arc::clone(&storage);
                s.spawn(move || {
                    for i in 0..events_per_thread {
                        let event = create_test_event(&format!("thread-{}-entity-{}", t, i));
                        storage_ref.append_event(event).unwrap();
                    }
                });
            }
        });

        // Flush remaining
        storage.flush().unwrap();

        // All events should be written
        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), events_per_thread * thread_count);
    }

    #[test]
    fn test_batch_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 50,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        // Write 100 events (2 batches)
        let events: Vec<Event> = (0..100)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        storage.batch_write(events).unwrap();

        let stats = storage.batch_stats();
        assert_eq!(stats.batches_written, 2);
        assert_eq!(stats.events_written, 100);
        assert!(stats.avg_batch_size > 0.0);
        assert!(stats.events_per_sec > 0.0);
        assert_eq!(stats.size_flushes, 2);
    }

    #[test]
    fn test_config_presets() {
        let high_throughput = ParquetStorageConfig::high_throughput();
        assert_eq!(high_throughput.batch_size, 50_000);
        assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));

        let low_latency = ParquetStorageConfig::low_latency();
        assert_eq!(low_latency.batch_size, 1_000);
        assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));

        let default = ParquetStorageConfig::default();
        assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(default.batch_size, 10_000);
    }

    /// Benchmark: Compare single-event writes vs batch writes
    /// Run with: cargo test --release -- --ignored test_batch_write_throughput
    #[test]
    #[ignore]
    fn test_batch_write_throughput() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        let event_count = 50_000;

        // Benchmark batch write
        let events: Vec<Event> = (0..event_count)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        let start = std::time::Instant::now();
        let result = storage.batch_write(events).unwrap();
        storage.flush().unwrap(); // Flush any remaining
        let batch_duration = start.elapsed();

        let batch_stats = storage.batch_stats();

        println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
        println!("Events: {}", event_count);
        println!("Duration: {:?}", batch_duration);
        println!("Events/sec: {:.0}", result.events_per_sec);
        println!("Batches written: {}", batch_stats.batches_written);
        println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
        println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);

        // Target: batch writes should achieve at least 100K events/sec in release
        // mode, a 40%+ improvement over single-event writes
        assert!(
            result.events_per_sec > 10_000.0,
            "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
            result.events_per_sec
        );
    }

    /// Benchmark: Single-event write baseline (for comparison)
    #[test]
    #[ignore]
    fn test_single_event_write_baseline() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 1, // Force flush after each event
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let event_count = 1_000; // Fewer events since this is slow

        let start = std::time::Instant::now();
        for i in 0..event_count {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }
        let duration = start.elapsed();

        let events_per_sec = event_count as f64 / duration.as_secs_f64();

        println!("\n=== Single-Event Write Baseline ===");
        println!("Events: {}", event_count);
        println!("Duration: {:?}", duration);
        println!("Events/sec: {:.0}", events_per_sec);

        // This should be significantly slower than batch writes;
        // used as a baseline to demonstrate the 40%+ improvement
    }
}