1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4};
5use arrow::{
6 array::{
7 Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
8 UInt64Builder,
9 },
10 datatypes::{DataType, Field, Schema, TimeUnit},
11 record_batch::RecordBatch,
12};
13use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
14use std::{
15 fs::{self, File},
16 path::{Path, PathBuf},
17 sync::{
18 Arc, Mutex,
19 atomic::{AtomicU64, Ordering},
20 },
21 time::{Duration, Instant},
22};
23
/// Default number of buffered events that triggers an automatic size-based flush.
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default time-based flush interval, in milliseconds (see `check_timeout_flush`).
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;
29
/// Tuning knobs for `ParquetStorage` batching and file output.
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Number of buffered events that triggers an automatic flush.
    pub batch_size: usize,
    /// Maximum age of a non-empty buffer before `check_timeout_flush` flushes it.
    pub flush_timeout: Duration,
    /// Parquet compression codec applied to written files.
    pub compression: parquet::basic::Compression,
}
40
impl Default for ParquetStorageConfig {
    /// Balanced defaults: 10k-event batches, 5s flush timeout, Snappy compression.
    fn default() -> Self {
        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
            compression: parquet::basic::Compression::SNAPPY,
        }
    }
}
50
51impl ParquetStorageConfig {
52 pub fn high_throughput() -> Self {
54 Self {
55 batch_size: 50_000,
56 flush_timeout: Duration::from_secs(10),
57 compression: parquet::basic::Compression::SNAPPY,
58 }
59 }
60
61 pub fn low_latency() -> Self {
63 Self {
64 batch_size: 1_000,
65 flush_timeout: Duration::from_secs(1),
66 compression: parquet::basic::Compression::SNAPPY,
67 }
68 }
69}
70
/// Cumulative counters describing all writes performed by a `ParquetStorage`.
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Total parquet files (batches) written so far.
    pub batches_written: u64,
    /// Total events persisted so far.
    pub events_written: u64,
    /// Total bytes written, as reported by parquet row-group metadata.
    pub bytes_written: u64,
    /// Mean events per written batch (0.0 before the first flush).
    pub avg_batch_size: f64,
    /// Events per second over the accumulated write time (0.0 initially).
    pub events_per_sec: f64,
    /// Total time spent in flush I/O, in nanoseconds.
    pub total_write_time_ns: u64,
    /// Flushes triggered by the flush timeout.
    pub timeout_flushes: u64,
    /// Flushes triggered by the buffer reaching its size limit.
    pub size_flushes: u64,
}
91
/// Outcome of a single `batch_write` call.
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Number of events accepted by the call (flushed or still buffered).
    pub events_written: usize,
    /// Number of size-triggered flushes performed during the call.
    pub batches_flushed: usize,
    /// Wall-clock duration of the call.
    pub duration: Duration,
    /// Throughput over `duration`, in events per second.
    pub events_per_sec: f64,
}
104
/// Buffered, batch-oriented Parquet writer for events.
///
/// Events accumulate in `current_batch`; a flush writes them as one new
/// parquet file in `storage_dir`. Flushes happen when the buffer reaches
/// `config.batch_size`, when `check_timeout_flush` observes the timeout,
/// or explicitly via `flush`/`flush_on_shutdown`. Shared-state access is
/// via `Mutex`es plus atomic counters, so `&self` methods can be called
/// from multiple threads.
pub struct ParquetStorage {
    // Directory that receives the written parquet files.
    storage_dir: PathBuf,

    // In-memory buffer of events awaiting the next flush.
    current_batch: Mutex<Vec<Event>>,

    // Batch size, flush timeout and compression settings.
    config: ParquetStorageConfig,

    // Arrow schema shared by every record batch this storage writes.
    schema: Arc<Schema>,

    // Instant of the most recent flush; read by check_timeout_flush().
    last_flush_time: Mutex<Instant>,

    // Lifetime counters (Relaxed ordering — monitoring only, not synchronization).
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}
137
138impl ParquetStorage {
139 pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
141 Self::with_config(storage_dir, ParquetStorageConfig::default())
142 }
143
144 pub fn with_config(
146 storage_dir: impl AsRef<Path>,
147 config: ParquetStorageConfig,
148 ) -> Result<Self> {
149 let storage_dir = storage_dir.as_ref().to_path_buf();
150
151 fs::create_dir_all(&storage_dir).map_err(|e| {
153 AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
154 })?;
155
156 let schema = Arc::new(Schema::new(vec![
158 Field::new("event_id", DataType::Utf8, false),
159 Field::new("event_type", DataType::Utf8, false),
160 Field::new("entity_id", DataType::Utf8, false),
161 Field::new("payload", DataType::Utf8, false),
162 Field::new(
163 "timestamp",
164 DataType::Timestamp(TimeUnit::Microsecond, None),
165 false,
166 ),
167 Field::new("metadata", DataType::Utf8, true),
168 Field::new("version", DataType::UInt64, false),
169 ]));
170
171 Ok(Self {
172 storage_dir,
173 current_batch: Mutex::new(Vec::with_capacity(config.batch_size)),
174 config,
175 schema,
176 last_flush_time: Mutex::new(Instant::now()),
177 batches_written: AtomicU64::new(0),
178 events_written: AtomicU64::new(0),
179 bytes_written: AtomicU64::new(0),
180 total_write_time_ns: AtomicU64::new(0),
181 timeout_flushes: AtomicU64::new(0),
182 size_flushes: AtomicU64::new(0),
183 })
184 }
185
    /// Legacy constructor kept for callers that depended on the old 1,000
    /// event batch size; prefer `new()` or `with_config()`.
    #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
    pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(
            storage_dir,
            ParquetStorageConfig {
                batch_size: 1000,
                ..Default::default()
            },
        )
    }
197
198 pub fn append_event(&self, event: Event) -> Result<()> {
205 let should_flush = {
206 let mut batch = self.current_batch.lock().unwrap();
207 batch.push(event);
208 batch.len() >= self.config.batch_size
209 };
210
211 if should_flush {
212 self.size_flushes.fetch_add(1, Ordering::Relaxed);
213 self.flush()?;
214 }
215
216 Ok(())
217 }
218
219 pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
224 let start = Instant::now();
225 let event_count = events.len();
226 let mut batches_flushed = 0;
227
228 let mut remaining_events = events.into_iter().peekable();
230
231 while remaining_events.peek().is_some() {
232 let should_flush = {
233 let mut batch = self.current_batch.lock().unwrap();
234 let available_space = self.config.batch_size.saturating_sub(batch.len());
235
236 if available_space == 0 {
237 true
238 } else {
239 let to_add: Vec<Event> =
241 remaining_events.by_ref().take(available_space).collect();
242 batch.extend(to_add);
243 batch.len() >= self.config.batch_size
244 }
245 };
246
247 if should_flush {
248 self.size_flushes.fetch_add(1, Ordering::Relaxed);
249 self.flush()?;
250 batches_flushed += 1;
251 }
252 }
253
254 let duration = start.elapsed();
255
256 Ok(BatchWriteResult {
257 events_written: event_count,
258 batches_flushed,
259 duration,
260 events_per_sec: event_count as f64 / duration.as_secs_f64(),
261 })
262 }
263
264 pub fn check_timeout_flush(&self) -> Result<bool> {
269 let should_flush = {
270 let last_flush = self.last_flush_time.lock().unwrap();
271 let batch = self.current_batch.lock().unwrap();
272 !batch.is_empty() && last_flush.elapsed() >= self.config.flush_timeout
273 };
274
275 if should_flush {
276 self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
277 self.flush()?;
278 Ok(true)
279 } else {
280 Ok(false)
281 }
282 }
283
284 pub fn flush(&self) -> Result<()> {
288 let events_to_write = {
289 let mut batch = self.current_batch.lock().unwrap();
290 if batch.is_empty() {
291 return Ok(());
292 }
293 std::mem::take(&mut *batch)
294 };
295
296 let batch_count = events_to_write.len();
297 tracing::info!("Flushing {} events to Parquet storage", batch_count);
298
299 let start = Instant::now();
300
301 let record_batch = self.events_to_record_batch(&events_to_write)?;
303
304 let filename = format!(
306 "events-{}-{}.parquet",
307 chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
308 uuid::Uuid::new_v4().as_simple()
309 );
310 let file_path = self.storage_dir.join(&filename);
311
312 let file = File::create(&file_path).map_err(|e| {
314 AllSourceError::StorageError(format!("Failed to create parquet file: {e}"))
315 })?;
316
317 let props = WriterProperties::builder()
318 .set_compression(self.config.compression)
319 .build();
320
321 let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
322
323 writer.write(&record_batch)?;
324 let file_metadata = writer.close()?;
325
326 let duration = start.elapsed();
327
328 self.batches_written.fetch_add(1, Ordering::Relaxed);
330 self.events_written
331 .fetch_add(batch_count as u64, Ordering::Relaxed);
332 if let Some(size) = file_metadata
333 .row_groups()
334 .first()
335 .map(|rg| rg.total_byte_size())
336 {
337 self.bytes_written.fetch_add(size as u64, Ordering::Relaxed);
338 }
339 self.total_write_time_ns
340 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
341
342 {
344 let mut last_flush = self.last_flush_time.lock().unwrap();
345 *last_flush = Instant::now();
346 }
347
348 tracing::info!(
349 "Successfully wrote {} events to {} in {:?}",
350 batch_count,
351 file_path.display(),
352 duration
353 );
354
355 Ok(())
356 }
357
358 pub fn flush_on_shutdown(&self) -> Result<usize> {
362 let batch_size = {
363 let batch = self.current_batch.lock().unwrap();
364 batch.len()
365 };
366
367 if batch_size > 0 {
368 tracing::info!("Shutdown: flushing partial batch of {} events", batch_size);
369 self.flush()?;
370 }
371
372 Ok(batch_size)
373 }
374
375 pub fn batch_stats(&self) -> BatchWriteStats {
377 let batches = self.batches_written.load(Ordering::Relaxed);
378 let events = self.events_written.load(Ordering::Relaxed);
379 let bytes = self.bytes_written.load(Ordering::Relaxed);
380 let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);
381
382 let time_secs = time_ns as f64 / 1_000_000_000.0;
383
384 BatchWriteStats {
385 batches_written: batches,
386 events_written: events,
387 bytes_written: bytes,
388 avg_batch_size: if batches > 0 {
389 events as f64 / batches as f64
390 } else {
391 0.0
392 },
393 events_per_sec: if time_secs > 0.0 {
394 events as f64 / time_secs
395 } else {
396 0.0
397 },
398 total_write_time_ns: time_ns,
399 timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
400 size_flushes: self.size_flushes.load(Ordering::Relaxed),
401 }
402 }
403
    /// Number of events currently buffered and not yet flushed.
    pub fn pending_count(&self) -> usize {
        self.current_batch.lock().unwrap().len()
    }
408
    /// Configured number of events per size-triggered flush.
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }
413
    /// Configured maximum buffer age used by `check_timeout_flush`.
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }
418
419 fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
421 let mut event_id_builder = StringBuilder::new();
422 let mut event_type_builder = StringBuilder::new();
423 let mut entity_id_builder = StringBuilder::new();
424 let mut payload_builder = StringBuilder::new();
425 let mut timestamp_builder = TimestampMicrosecondBuilder::new();
426 let mut metadata_builder = StringBuilder::new();
427 let mut version_builder = UInt64Builder::new();
428
429 for event in events {
430 event_id_builder.append_value(event.id.to_string());
431 event_type_builder.append_value(event.event_type_str());
432 entity_id_builder.append_value(event.entity_id_str());
433 payload_builder.append_value(serde_json::to_string(&event.payload)?);
434
435 let timestamp_micros = event.timestamp.timestamp_micros();
437 timestamp_builder.append_value(timestamp_micros);
438
439 if let Some(ref metadata) = event.metadata {
440 metadata_builder.append_value(serde_json::to_string(metadata)?);
441 } else {
442 metadata_builder.append_null();
443 }
444
445 version_builder.append_value(event.version as u64);
446 }
447
448 let arrays: Vec<ArrayRef> = vec![
449 Arc::new(event_id_builder.finish()),
450 Arc::new(event_type_builder.finish()),
451 Arc::new(entity_id_builder.finish()),
452 Arc::new(payload_builder.finish()),
453 Arc::new(timestamp_builder.finish()),
454 Arc::new(metadata_builder.finish()),
455 Arc::new(version_builder.finish()),
456 ];
457
458 let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;
459
460 Ok(record_batch)
461 }
462
463 pub fn load_all_events(&self) -> Result<Vec<Event>> {
465 let mut all_events = Vec::new();
466
467 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
469 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
470 })?;
471
472 let mut parquet_files: Vec<PathBuf> = entries
473 .filter_map(|entry| entry.ok())
474 .map(|entry| entry.path())
475 .filter(|path| {
476 path.extension()
477 .and_then(|ext| ext.to_str())
478 .map(|ext| ext == "parquet")
479 .unwrap_or(false)
480 })
481 .collect();
482
483 parquet_files.sort();
485
486 for file_path in parquet_files {
487 tracing::info!("Loading events from {}", file_path.display());
488 let file_events = self.load_events_from_file(&file_path)?;
489 all_events.extend(file_events);
490 }
491
492 tracing::info!("Loaded {} total events from storage", all_events.len());
493
494 Ok(all_events)
495 }
496
497 fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
499 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
500
501 let file = File::open(file_path).map_err(|e| {
502 AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
503 })?;
504
505 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
506 let mut reader = builder.build()?;
507
508 let mut events = Vec::new();
509
510 while let Some(Ok(batch)) = reader.next() {
511 let batch_events = self.record_batch_to_events(&batch)?;
512 events.extend(batch_events);
513 }
514
515 Ok(events)
516 }
517
518 fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
520 let event_ids = batch
521 .column(0)
522 .as_any()
523 .downcast_ref::<arrow::array::StringArray>()
524 .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;
525
526 let event_types = batch
527 .column(1)
528 .as_any()
529 .downcast_ref::<arrow::array::StringArray>()
530 .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;
531
532 let entity_ids = batch
533 .column(2)
534 .as_any()
535 .downcast_ref::<arrow::array::StringArray>()
536 .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;
537
538 let payloads = batch
539 .column(3)
540 .as_any()
541 .downcast_ref::<arrow::array::StringArray>()
542 .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;
543
544 let timestamps = batch
545 .column(4)
546 .as_any()
547 .downcast_ref::<TimestampMicrosecondArray>()
548 .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;
549
550 let metadatas = batch
551 .column(5)
552 .as_any()
553 .downcast_ref::<arrow::array::StringArray>()
554 .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;
555
556 let versions = batch
557 .column(6)
558 .as_any()
559 .downcast_ref::<arrow::array::UInt64Array>()
560 .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;
561
562 let mut events = Vec::new();
563
564 for i in 0..batch.num_rows() {
565 let id = uuid::Uuid::parse_str(event_ids.value(i))
566 .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;
567
568 let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
569 .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;
570
571 let metadata = if metadatas.is_null(i) {
572 None
573 } else {
574 Some(serde_json::from_str(metadatas.value(i))?)
575 };
576
577 let event = Event::reconstruct_from_strings(
578 id,
579 event_types.value(i).to_string(),
580 entity_ids.value(i).to_string(),
581 "default".to_string(), serde_json::from_str(payloads.value(i))?,
583 timestamp,
584 metadata,
585 versions.value(i) as i64,
586 );
587
588 events.push(event);
589 }
590
591 Ok(events)
592 }
593
594 pub fn list_parquet_files(&self) -> Result<Vec<PathBuf>> {
599 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
600 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
601 })?;
602
603 let mut parquet_files: Vec<PathBuf> = entries
604 .filter_map(|entry| entry.ok())
605 .map(|entry| entry.path())
606 .filter(|path| {
607 path.extension()
608 .and_then(|ext| ext.to_str())
609 .map(|ext| ext == "parquet")
610 .unwrap_or(false)
611 })
612 .collect();
613
614 parquet_files.sort();
615 Ok(parquet_files)
616 }
617
    /// The directory this storage writes parquet files into.
    pub fn storage_dir(&self) -> &Path {
        &self.storage_dir
    }
622
623 pub fn stats(&self) -> Result<StorageStats> {
625 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
626 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
627 })?;
628
629 let mut total_files = 0;
630 let mut total_size_bytes = 0u64;
631
632 for entry in entries.flatten() {
633 let path = entry.path();
634 if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
635 total_files += 1;
636 if let Ok(metadata) = entry.metadata() {
637 total_size_bytes += metadata.len();
638 }
639 }
640 }
641
642 let current_batch_size = self.current_batch.lock().unwrap().len();
643
644 Ok(StorageStats {
645 total_files,
646 total_size_bytes,
647 storage_dir: self.storage_dir.clone(),
648 current_batch_size,
649 })
650 }
651}
652
impl Drop for ParquetStorage {
    /// Best-effort flush of any buffered events when the storage is dropped.
    fn drop(&mut self) {
        // NOTE(review): flush_on_shutdown() unwraps internal mutexes; if a
        // lock was poisoned this panics inside drop — confirm acceptable.
        if let Err(e) = self.flush_on_shutdown() {
            tracing::error!("Failed to flush events on drop: {}", e);
        }
    }
}
661
/// Snapshot of the on-disk state of a `ParquetStorage` (see `stats()`).
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    /// Number of `.parquet` files in the storage directory.
    pub total_files: usize,
    /// Combined size of those files, in bytes.
    pub total_size_bytes: u64,
    /// Directory that was scanned.
    pub storage_dir: PathBuf,
    /// Events currently buffered in memory and not yet flushed.
    pub current_batch_size: usize,
}
669
670#[cfg(test)]
671mod tests {
672 use super::*;
673 use serde_json::json;
674 use std::sync::Arc;
675 use tempfile::TempDir;
676
    // Build a minimal version-1 event with a fixed JSON payload for the
    // given entity id.
    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }
692
693 #[test]
694 fn test_parquet_storage_write_read() {
695 let temp_dir = TempDir::new().unwrap();
696 let storage = ParquetStorage::new(temp_dir.path()).unwrap();
697
698 for i in 0..10 {
700 let event = create_test_event(&format!("entity-{}", i));
701 storage.append_event(event).unwrap();
702 }
703
704 storage.flush().unwrap();
706
707 let loaded_events = storage.load_all_events().unwrap();
709 assert_eq!(loaded_events.len(), 10);
710 }
711
    // stats() reports one file with a non-zero size after a single flush.
    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }
729
    // A default-constructed storage uses DEFAULT_BATCH_SIZE (10,000).
    #[test]
    fn test_default_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
        assert_eq!(storage.batch_size(), 10_000);
    }
739
740 #[test]
741 fn test_custom_config() {
742 let temp_dir = TempDir::new().unwrap();
743 let config = ParquetStorageConfig {
744 batch_size: 5_000,
745 flush_timeout: Duration::from_secs(2),
746 compression: parquet::basic::Compression::SNAPPY,
747 };
748 let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();
749
750 assert_eq!(storage.batch_size(), 5_000);
751 assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
752 }
753
    // 250 events at batch_size=100: two full flushes, 50 left buffered,
    // and all 250 readable after a final manual flush.
    #[test]
    fn test_batch_write() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100, ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let events: Vec<Event> = (0..250)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        let result = storage.batch_write(events).unwrap();
        assert_eq!(result.events_written, 250);
        assert_eq!(result.batches_flushed, 2);

        assert_eq!(storage.pending_count(), 50);

        storage.flush().unwrap();

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 250);
    }
782
    // 15 appends at batch_size=10: one size-triggered auto-flush of 10
    // events, leaving 5 buffered.
    #[test]
    fn test_auto_flush_on_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 10, ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        for i in 0..15 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        let stats = storage.batch_stats();
        assert_eq!(stats.events_written, 10);
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.size_flushes, 1);
    }
807
    // flush_on_shutdown() persists a partial batch and reports its size.
    #[test]
    fn test_flush_on_shutdown() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        let flushed = storage.flush_on_shutdown().unwrap();
        assert_eq!(flushed, 5);
        assert_eq!(storage.pending_count(), 0);

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 5);
    }
831
    // Concurrent appends from four scoped threads: no events are lost.
    #[test]
    fn test_thread_safe_writes() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());

        let events_per_thread = 50;
        let thread_count = 4;

        std::thread::scope(|s| {
            for t in 0..thread_count {
                let storage_ref = Arc::clone(&storage);
                s.spawn(move || {
                    for i in 0..events_per_thread {
                        let event = create_test_event(&format!("thread-{}-entity-{}", t, i));
                        storage_ref.append_event(event).unwrap();
                    }
                });
            }
        });

        storage.flush().unwrap();

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), events_per_thread * thread_count);
    }
863
    // batch_write of 100 events at batch_size=50 produces two flushes and
    // sensible derived statistics.
    #[test]
    fn test_batch_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 50,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let events: Vec<Event> = (0..100)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        storage.batch_write(events).unwrap();

        let stats = storage.batch_stats();
        assert_eq!(stats.batches_written, 2);
        assert_eq!(stats.events_written, 100);
        assert!(stats.avg_batch_size > 0.0);
        assert!(stats.events_per_sec > 0.0);
        assert_eq!(stats.size_flushes, 2);
    }
887
888 #[test]
889 fn test_config_presets() {
890 let high_throughput = ParquetStorageConfig::high_throughput();
891 assert_eq!(high_throughput.batch_size, 50_000);
892 assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));
893
894 let low_latency = ParquetStorageConfig::low_latency();
895 assert_eq!(low_latency.batch_size, 1_000);
896 assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));
897
898 let default = ParquetStorageConfig::default();
899 assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
900 assert_eq!(default.batch_size, 10_000);
901 }
902
    // Throughput benchmark; #[ignore]d so it only runs on demand
    // (e.g. `cargo test -- --ignored`).
    #[test]
    #[ignore]
    fn test_batch_write_throughput() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        let event_count = 50_000;

        let events: Vec<Event> = (0..event_count)
            .map(|i| create_test_event(&format!("entity-{}", i)))
            .collect();

        let start = std::time::Instant::now();
        let result = storage.batch_write(events).unwrap();
        storage.flush().unwrap(); let batch_duration = start.elapsed();

        let batch_stats = storage.batch_stats();

        println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
        println!("Events: {}", event_count);
        println!("Duration: {:?}", batch_duration);
        println!("Events/sec: {:.0}", result.events_per_sec);
        println!("Batches written: {}", batch_stats.batches_written);
        println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
        println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);

        // Threshold is deliberately loose to tolerate debug builds.
        assert!(
            result.events_per_sec > 10_000.0,
            "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
            result.events_per_sec
        );
    }
941
    // Baseline: one flush per event (batch_size=1), for comparison against
    // the batched path. #[ignore]d benchmark — run on demand; prints only.
    #[test]
    #[ignore]
    fn test_single_event_write_baseline() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 1, ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let event_count = 1_000; let start = std::time::Instant::now();
        for i in 0..event_count {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }
        let duration = start.elapsed();

        let events_per_sec = event_count as f64 / duration.as_secs_f64();

        println!("\n=== Single-Event Write Baseline ===");
        println!("Events: {}", event_count);
        println!("Duration: {:?}", duration);
        println!("Events/sec: {:.0}", events_per_sec);

    }
972}