use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use arrow::array::{
    Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
    UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Default number of events buffered in memory before a size-based flush.
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default timeout, in milliseconds, after which a non-empty batch is flushed.
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;

/// Tunable batching parameters for [`ParquetStorage`].
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Number of buffered events that triggers an automatic flush.
    pub batch_size: usize,
    /// Maximum age of a non-empty batch before `check_timeout_flush` flushes it.
    pub flush_timeout: Duration,
    /// Compression codec applied to written Parquet files.
    pub compression: parquet::basic::Compression,
}

impl Default for ParquetStorageConfig {
    fn default() -> Self {
        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
            compression: parquet::basic::Compression::SNAPPY,
        }
    }
}

impl ParquetStorageConfig {
    /// Preset tuned for bulk ingestion: large batches, relaxed flush deadline.
    pub fn high_throughput() -> Self {
        Self {
            batch_size: 50_000,
            flush_timeout: Duration::from_secs(10),
            compression: parquet::basic::Compression::SNAPPY,
        }
    }

    /// Preset tuned for quick durability: small batches, one-second deadline.
    pub fn low_latency() -> Self {
        Self {
            batch_size: 1_000,
            flush_timeout: Duration::from_secs(1),
            compression: parquet::basic::Compression::SNAPPY,
        }
    }
}
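// Illustrative preset selection; the `mode` value and the match arms below are
// hypothetical caller-side code, not part of this module's API:
//
//     let config = match mode {
//         "bulk" => ParquetStorageConfig::high_throughput(),
//         "interactive" => ParquetStorageConfig::low_latency(),
//         _ => ParquetStorageConfig::default(),
//     };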

/// Cumulative write statistics, assembled from the storage's atomic counters.
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    pub batches_written: u64,
    pub events_written: u64,
    pub bytes_written: u64,
    /// Mean events per flushed batch (0.0 before the first flush).
    pub avg_batch_size: f64,
    /// Events per second of accumulated write time (0.0 before the first flush).
    pub events_per_sec: f64,
    pub total_write_time_ns: u64,
    /// Flushes triggered by `flush_timeout` expiring.
    pub timeout_flushes: u64,
    /// Flushes triggered by the batch reaching `batch_size`.
    pub size_flushes: u64,
}

/// Outcome of a single `batch_write` call.
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    pub events_written: usize,
    /// Number of size-based flushes performed during the call.
    pub batches_flushed: usize,
    pub duration: Duration,
    pub events_per_sec: f64,
}

/// Batched, append-only Parquet persistence for events.
///
/// Events accumulate in an in-memory buffer guarded by a `Mutex`; statistics
/// counters are lock-free atomics so reads never contend with writers.
pub struct ParquetStorage {
    storage_dir: PathBuf,

    /// In-memory buffer of events awaiting the next flush.
    current_batch: Mutex<Vec<Event>>,

    config: ParquetStorageConfig,

    /// Arrow schema shared by every written file.
    schema: Arc<Schema>,

    /// Instant of the most recent flush, used for timeout-based flushing.
    last_flush_time: Mutex<Instant>,

    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}

impl ParquetStorage {
    /// Creates storage in `storage_dir` with the default configuration.
    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(storage_dir, ParquetStorageConfig::default())
    }

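    /// Creates storage in `storage_dir` with a custom configuration.
    ///
    /// A minimal usage sketch; the directory path is illustrative and the
    /// `event` value is assumed to come from elsewhere:
    ///
    /// ```ignore
    /// let config = ParquetStorageConfig::low_latency();
    /// let storage = ParquetStorage::with_config("./data/events", config)?;
    /// storage.append_event(event)?;
    /// storage.flush()?;
    /// ```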
    pub fn with_config(
        storage_dir: impl AsRef<Path>,
        config: ParquetStorageConfig,
    ) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
        })?;

        // Column order here must match events_to_record_batch() and
        // record_batch_to_events(), which address columns by position.
        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new("metadata", DataType::Utf8, true),
            Field::new("version", DataType::UInt64, false),
        ]));

        Ok(Self {
            storage_dir,
            current_batch: Mutex::new(Vec::with_capacity(config.batch_size)),
            config,
            schema,
            last_flush_time: Mutex::new(Instant::now()),
            batches_written: AtomicU64::new(0),
            events_written: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            total_write_time_ns: AtomicU64::new(0),
            timeout_flushes: AtomicU64::new(0),
            size_flushes: AtomicU64::new(0),
        })
    }

    /// Legacy constructor that keeps the old 1,000-event batch size.
    #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
    pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(
            storage_dir,
            ParquetStorageConfig {
                batch_size: 1_000,
                ..Default::default()
            },
        )
    }

    /// Buffers a single event, flushing if the buffer reaches `batch_size`.
    pub fn append_event(&self, event: Event) -> Result<()> {
        // Decide under the lock, but flush after releasing it so flush()
        // can re-acquire the buffer without deadlocking.
        let should_flush = {
            let mut batch = self.current_batch.lock().unwrap();
            batch.push(event);
            batch.len() >= self.config.batch_size
        };

        if should_flush {
            self.size_flushes.fetch_add(1, Ordering::Relaxed);
            self.flush()?;
        }

        Ok(())
    }

    /// Appends many events at once, flushing each time the buffer fills.
    pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
        let start = Instant::now();
        let event_count = events.len();
        let mut batches_flushed = 0;

        let mut remaining_events = events.into_iter().peekable();

        while remaining_events.peek().is_some() {
            let should_flush = {
                let mut batch = self.current_batch.lock().unwrap();
                let available_space = self.config.batch_size.saturating_sub(batch.len());

                if available_space == 0 {
                    true
                } else {
                    let to_add: Vec<Event> =
                        remaining_events.by_ref().take(available_space).collect();
                    batch.extend(to_add);
                    batch.len() >= self.config.batch_size
                }
            };

            if should_flush {
                self.size_flushes.fetch_add(1, Ordering::Relaxed);
                self.flush()?;
                batches_flushed += 1;
            }
        }

        let duration = start.elapsed();

        Ok(BatchWriteResult {
            events_written: event_count,
            batches_flushed,
            duration,
            events_per_sec: event_count as f64 / duration.as_secs_f64(),
        })
    }
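    // Worked example (mirrors test_batch_write below): with batch_size = 100,
    // writing 250 events via batch_write flushes twice at the 100-event mark
    // and leaves 50 events pending for a later size, timeout, or shutdown flush.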

    /// Flushes the buffer if it is non-empty and older than `flush_timeout`.
    ///
    /// Returns `Ok(true)` when a timeout flush happened. Intended to be called
    /// periodically from a background task.
    pub fn check_timeout_flush(&self) -> Result<bool> {
        let should_flush = {
            let last_flush = self.last_flush_time.lock().unwrap();
            let batch = self.current_batch.lock().unwrap();
            !batch.is_empty() && last_flush.elapsed() >= self.config.flush_timeout
        };

        if should_flush {
            self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
            self.flush()?;
            Ok(true)
        } else {
            Ok(false)
        }
    }
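    // A minimal background-flush sketch. The Arc, the polling interval, and the
    // absence of a shutdown signal are illustrative choices, not this module's API:
    //
    //     let storage = Arc::new(ParquetStorage::new("./data/events")?);
    //     let flusher = Arc::clone(&storage);
    //     std::thread::spawn(move || loop {
    //         std::thread::sleep(Duration::from_millis(500));
    //         if let Err(e) = flusher.check_timeout_flush() {
    //             tracing::error!("timeout flush failed: {e}");
    //         }
    //     });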

    /// Writes the buffered events to a new Parquet file and clears the buffer.
    ///
    /// A no-op when the buffer is empty.
    pub fn flush(&self) -> Result<()> {
        // Take ownership of the buffer and release the lock before doing I/O.
        let events_to_write = {
            let mut batch = self.current_batch.lock().unwrap();
            if batch.is_empty() {
                return Ok(());
            }
            std::mem::take(&mut *batch)
        };

        let batch_count = events_to_write.len();
        tracing::info!("Flushing {} events to Parquet storage", batch_count);

        let start = Instant::now();

        let record_batch = self.events_to_record_batch(&events_to_write)?;

        // The timestamp prefix keeps lexicographic filename order chronological;
        // the UUID suffix prevents collisions within the same millisecond.
        let filename = format!(
            "events-{}-{}.parquet",
            chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
            uuid::Uuid::new_v4().as_simple()
        );
        let file_path = self.storage_dir.join(&filename);

        let file = File::create(&file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create parquet file: {e}"))
        })?;

        let props = WriterProperties::builder()
            .set_compression(self.config.compression)
            .build();

        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;

        writer.write(&record_batch)?;
        let file_metadata = writer.close()?;

        let duration = start.elapsed();

        self.batches_written.fetch_add(1, Ordering::Relaxed);
        self.events_written
            .fetch_add(batch_count as u64, Ordering::Relaxed);
        // Sum every row group rather than just the first; a large batch can
        // span multiple row groups.
        let bytes: i64 = file_metadata
            .row_groups
            .iter()
            .map(|rg| rg.total_byte_size)
            .sum();
        self.bytes_written.fetch_add(bytes as u64, Ordering::Relaxed);
        self.total_write_time_ns
            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);

        {
            let mut last_flush = self.last_flush_time.lock().unwrap();
            *last_flush = Instant::now();
        }

        tracing::info!(
            "Successfully wrote {} events to {} in {:?}",
            batch_count,
            file_path.display(),
            duration
        );

        Ok(())
    }
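    // Resulting files look like
    // `events-20250101-120000123-0123456789abcdef0123456789abcdef.parquet`
    // (illustrative name): a millisecond-precision timestamp plus a
    // 32-hex-digit simple UUID.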

    /// Flushes any partial batch, returning how many events were written.
    ///
    /// Call this (or rely on `Drop`) before process exit so buffered events
    /// are not lost.
    pub fn flush_on_shutdown(&self) -> Result<usize> {
        let batch_size = {
            let batch = self.current_batch.lock().unwrap();
            batch.len()
        };

        if batch_size > 0 {
            tracing::info!("Shutdown: flushing partial batch of {} events", batch_size);
            self.flush()?;
        }

        Ok(batch_size)
    }
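    // Illustrative shutdown order; `stop_flag` and `flusher_handle` are
    // hypothetical names for whatever drives the background flush thread:
    //
    //     stop_flag.store(true, Ordering::Relaxed);
    //     flusher_handle.join().ok();
    //     let tail = storage.flush_on_shutdown()?;
    //     tracing::info!("flushed {tail} buffered events at shutdown");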

    /// Snapshot of cumulative write statistics.
    pub fn batch_stats(&self) -> BatchWriteStats {
        let batches = self.batches_written.load(Ordering::Relaxed);
        let events = self.events_written.load(Ordering::Relaxed);
        let bytes = self.bytes_written.load(Ordering::Relaxed);
        let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);

        let time_secs = time_ns as f64 / 1_000_000_000.0;

        BatchWriteStats {
            batches_written: batches,
            events_written: events,
            bytes_written: bytes,
            avg_batch_size: if batches > 0 {
                events as f64 / batches as f64
            } else {
                0.0
            },
            events_per_sec: if time_secs > 0.0 {
                events as f64 / time_secs
            } else {
                0.0
            },
            total_write_time_ns: time_ns,
            timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
            size_flushes: self.size_flushes.load(Ordering::Relaxed),
        }
    }
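    // Worked example with assumed counter values: after 3 flushes totaling
    // 29_500 events over 2.0 s of accumulated write time, avg_batch_size is
    // 29_500 / 3 ≈ 9_833 and events_per_sec is 29_500 / 2.0 = 14_750.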

    /// Number of events currently buffered and awaiting flush.
    pub fn pending_count(&self) -> usize {
        self.current_batch.lock().unwrap().len()
    }

    /// Configured batch size.
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }

    /// Configured flush timeout.
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }

    /// Converts events into a `RecordBatch` matching `self.schema`.
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            timestamp_builder.append_value(event.timestamp.timestamp_micros());

            // metadata is the only nullable column.
            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            version_builder.append_value(event.version as u64);
        }

        // Array order must match the schema's field order.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        Ok(RecordBatch::try_new(self.schema.clone(), arrays)?)
    }

    /// Loads every event from every Parquet file in the storage directory.
    pub fn load_all_events(&self) -> Result<Vec<Event>> {
        let mut all_events = Vec::new();

        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
        })?;

        let mut parquet_files: Vec<PathBuf> = entries
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path())
            .filter(|path| {
                path.extension()
                    .and_then(|ext| ext.to_str())
                    .map(|ext| ext == "parquet")
                    .unwrap_or(false)
            })
            .collect();

        // Filenames start with a timestamp, so a lexicographic sort replays
        // files in write order.
        parquet_files.sort();

        for file_path in parquet_files {
            tracing::info!("Loading events from {}", file_path.display());
            let file_events = self.load_events_from_file(&file_path)?;
            all_events.extend(file_events);
        }

        tracing::info!("Loaded {} total events from storage", all_events.len());

        Ok(all_events)
    }
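    // Illustrative recovery loop; `Projection` and `apply` are hypothetical
    // consumer-side names, not part of this module:
    //
    //     let mut projection = Projection::default();
    //     for event in storage.load_all_events()? {
    //         projection.apply(&event);
    //     }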

    /// Reads all events from a single Parquet file.
    fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let file = File::open(file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let reader = builder.build()?;

        let mut events = Vec::new();

        // Propagate read errors instead of silently stopping at the first
        // failed batch.
        for batch in reader {
            let batch_events = self.record_batch_to_events(&batch?)?;
            events.extend(batch_events);
        }

        Ok(events)
    }

    /// Converts a `RecordBatch` back into events; columns are addressed by
    /// position, matching the schema defined in `with_config`.
    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i))
                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;

            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                // This field is not persisted in the Parquet schema, so
                // reloaded events fall back to "default".
                "default".to_string(),
                serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }

    /// File-level statistics for the storage directory.
    pub fn stats(&self) -> Result<StorageStats> {
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
        })?;

        let mut total_files = 0;
        let mut total_size_bytes = 0u64;

        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                total_files += 1;
                if let Ok(metadata) = entry.metadata() {
                    total_size_bytes += metadata.len();
                }
            }
        }

        let current_batch_size = self.current_batch.lock().unwrap().len();

        Ok(StorageStats {
            total_files,
            total_size_bytes,
            storage_dir: self.storage_dir.clone(),
            current_batch_size,
        })
    }
}

impl Drop for ParquetStorage {
    fn drop(&mut self) {
        // Best-effort flush of any buffered tail; errors are logged, not raised.
        if let Err(e) = self.flush_on_shutdown() {
            tracing::error!("Failed to flush events on drop: {}", e);
        }
    }
}
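// Note: Drop makes persistence best-effort on scope exit; explicit shutdown
// paths should still call flush_on_shutdown() to observe the flushed count
// (see the illustrative shutdown order above).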

/// Summary of on-disk storage returned by `ParquetStorage::stats`.
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    pub total_files: usize,
    pub total_size_bytes: u64,
    pub storage_dir: PathBuf,
    pub current_batch_size: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;
    use tempfile::TempDir;

    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }

        storage.flush().unwrap();

        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }

    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }

    #[test]
    fn test_default_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
        assert_eq!(storage.batch_size(), 10_000);
    }

    #[test]
    fn test_custom_config() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 5_000,
            flush_timeout: Duration::from_secs(2),
            compression: parquet::basic::Compression::SNAPPY,
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        assert_eq!(storage.batch_size(), 5_000);
        assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
    }

    #[test]
    fn test_batch_write() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let events: Vec<Event> = (0..250)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        let result = storage.batch_write(events).unwrap();
        assert_eq!(result.events_written, 250);
        assert_eq!(result.batches_flushed, 2);

        assert_eq!(storage.pending_count(), 50);

        storage.flush().unwrap();

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 250);
    }

    #[test]
    fn test_auto_flush_on_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 10,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        for i in 0..15 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        let stats = storage.batch_stats();
        assert_eq!(stats.events_written, 10);
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.size_flushes, 1);
    }

    #[test]
    fn test_flush_on_shutdown() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        let flushed = storage.flush_on_shutdown().unwrap();
        assert_eq!(flushed, 5);
        assert_eq!(storage.pending_count(), 0);

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 5);
    }

    #[test]
    fn test_thread_safe_writes() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());

        let events_per_thread = 50;
        let thread_count = 4;

        std::thread::scope(|s| {
            for t in 0..thread_count {
                let storage_ref = Arc::clone(&storage);
                s.spawn(move || {
                    for i in 0..events_per_thread {
                        let event = create_test_event(&format!("thread-{}-entity-{}", t, i));
                        storage_ref.append_event(event).unwrap();
                    }
                });
            }
        });

        storage.flush().unwrap();

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), events_per_thread * thread_count);
    }

    #[test]
    fn test_batch_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 50,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let events: Vec<Event> = (0..100)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        storage.batch_write(events).unwrap();

        let stats = storage.batch_stats();
        assert_eq!(stats.batches_written, 2);
        assert_eq!(stats.events_written, 100);
        assert!(stats.avg_batch_size > 0.0);
        assert!(stats.events_per_sec > 0.0);
        assert_eq!(stats.size_flushes, 2);
    }

    #[test]
    fn test_config_presets() {
        let high_throughput = ParquetStorageConfig::high_throughput();
        assert_eq!(high_throughput.batch_size, 50_000);
        assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));

        let low_latency = ParquetStorageConfig::low_latency();
        assert_eq!(low_latency.batch_size, 1_000);
        assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));

        let default = ParquetStorageConfig::default();
        assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(default.batch_size, 10_000);
    }

    #[test]
    #[ignore]
    fn test_batch_write_throughput() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        let event_count = 50_000;

        let events: Vec<Event> = (0..event_count)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        let start = std::time::Instant::now();
        let result = storage.batch_write(events).unwrap();
        storage.flush().unwrap();
        let batch_duration = start.elapsed();

        let batch_stats = storage.batch_stats();

        println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
        println!("Events: {}", event_count);
        println!("Duration: {:?}", batch_duration);
        println!("Events/sec: {:.0}", result.events_per_sec);
        println!("Batches written: {}", batch_stats.batches_written);
        println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
        println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);

        assert!(
            result.events_per_sec > 10_000.0,
            "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
            result.events_per_sec
        );
    }

    #[test]
    #[ignore]
    fn test_single_event_write_baseline() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 1,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let event_count = 1_000;

        let start = std::time::Instant::now();
        for i in 0..event_count {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }
        let duration = start.elapsed();

        let events_per_sec = event_count as f64 / duration.as_secs_f64();

        println!("\n=== Single-Event Write Baseline ===");
        println!("Events: {}", event_count);
        println!("Duration: {:?}", duration);
        println!("Events/sec: {:.0}", events_per_sec);
    }
}