1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4};
5use arrow::{
6 array::{
7 Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
8 UInt64Builder,
9 },
10 datatypes::{DataType, Field, Schema, TimeUnit},
11 record_batch::RecordBatch,
12};
13use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
14use std::{
15 fs::{self, File},
16 path::{Path, PathBuf},
17 sync::{
18 Arc, Mutex,
19 atomic::{AtomicU64, Ordering},
20 },
21 time::{Duration, Instant},
22};
23
/// Default number of buffered events that triggers a size-based flush.
pub const DEFAULT_BATCH_SIZE: usize = 10_000;

/// Default maximum age (milliseconds) of a non-empty batch before a
/// timeout-based flush should occur (see `ParquetStorage::check_timeout_flush`).
pub const DEFAULT_FLUSH_TIMEOUT_MS: u64 = 5_000;
/// Tuning knobs for `ParquetStorage` batching and file output.
#[derive(Debug, Clone)]
pub struct ParquetStorageConfig {
    /// Number of buffered events that triggers a size-based flush.
    pub batch_size: usize,
    /// Maximum age of a non-empty batch before `check_timeout_flush` flushes it.
    pub flush_timeout: Duration,
    /// Parquet column compression applied to every written file.
    pub compression: parquet::basic::Compression,
}
40
impl Default for ParquetStorageConfig {
    /// Default tuning: 10k-event batches, 5-second timeout, Snappy compression.
    fn default() -> Self {
        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            flush_timeout: Duration::from_millis(DEFAULT_FLUSH_TIMEOUT_MS),
            compression: parquet::basic::Compression::SNAPPY,
        }
    }
}
50
51impl ParquetStorageConfig {
52 pub fn high_throughput() -> Self {
54 Self {
55 batch_size: 50_000,
56 flush_timeout: Duration::from_secs(10),
57 compression: parquet::basic::Compression::SNAPPY,
58 }
59 }
60
61 pub fn low_latency() -> Self {
63 Self {
64 batch_size: 1_000,
65 flush_timeout: Duration::from_secs(1),
66 compression: parquet::basic::Compression::SNAPPY,
67 }
68 }
69}
70
/// Cumulative write statistics, as reported by `ParquetStorage::batch_stats`.
#[derive(Debug, Clone, Default)]
pub struct BatchWriteStats {
    /// Number of Parquet files written so far.
    pub batches_written: u64,
    /// Total events persisted across all flushes.
    pub events_written: u64,
    /// Total bytes recorded from written row groups.
    pub bytes_written: u64,
    /// Mean events per flushed batch (0.0 before the first flush).
    pub avg_batch_size: f64,
    /// Events per second of cumulative write time (0.0 before the first flush).
    pub events_per_sec: f64,
    /// Total wall time spent inside `flush`, in nanoseconds.
    pub total_write_time_ns: u64,
    /// Flushes triggered by `check_timeout_flush`.
    pub timeout_flushes: u64,
    /// Flushes triggered by reaching the configured batch size.
    pub size_flushes: u64,
}
91
/// Outcome of a single `ParquetStorage::batch_write` call.
#[derive(Debug, Clone)]
pub struct BatchWriteResult {
    /// Events accepted by the call (flushed or left pending in the buffer).
    pub events_written: usize,
    /// Number of size-triggered flushes performed during the call.
    pub batches_flushed: usize,
    /// Wall-clock duration of the call.
    pub duration: Duration,
    /// `events_written` divided by `duration`.
    pub events_per_sec: f64,
}
104
/// Batched, Parquet-backed event store.
///
/// Events are buffered in memory (`current_batch`) and written out as one
/// Parquet file per flush. Flushes are triggered by batch size (`append_event`
/// / `batch_write`), by timeout (`check_timeout_flush`), explicitly (`flush`),
/// or on drop.
pub struct ParquetStorage {
    // Directory that receives one Parquet file per flushed batch.
    storage_dir: PathBuf,

    // Events accumulated since the last flush.
    current_batch: Mutex<Vec<Event>>,

    // Batch size, flush timeout, and compression settings.
    config: ParquetStorageConfig,

    // Arrow schema shared by every written file.
    schema: Arc<Schema>,

    // Completion time of the last flush; drives timeout-based flushing.
    last_flush_time: Mutex<Instant>,

    // Lock-free counters backing `batch_stats()`.
    batches_written: AtomicU64,
    events_written: AtomicU64,
    bytes_written: AtomicU64,
    total_write_time_ns: AtomicU64,
    timeout_flushes: AtomicU64,
    size_flushes: AtomicU64,
}
137
138impl ParquetStorage {
139 pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
141 Self::with_config(storage_dir, ParquetStorageConfig::default())
142 }
143
144 pub fn with_config(
146 storage_dir: impl AsRef<Path>,
147 config: ParquetStorageConfig,
148 ) -> Result<Self> {
149 let storage_dir = storage_dir.as_ref().to_path_buf();
150
151 fs::create_dir_all(&storage_dir).map_err(|e| {
153 AllSourceError::StorageError(format!("Failed to create storage directory: {e}"))
154 })?;
155
156 let schema = Arc::new(Schema::new(vec![
158 Field::new("event_id", DataType::Utf8, false),
159 Field::new("event_type", DataType::Utf8, false),
160 Field::new("entity_id", DataType::Utf8, false),
161 Field::new("payload", DataType::Utf8, false),
162 Field::new(
163 "timestamp",
164 DataType::Timestamp(TimeUnit::Microsecond, None),
165 false,
166 ),
167 Field::new("metadata", DataType::Utf8, true),
168 Field::new("version", DataType::UInt64, false),
169 ]));
170
171 Ok(Self {
172 storage_dir,
173 current_batch: Mutex::new(Vec::with_capacity(config.batch_size)),
174 config,
175 schema,
176 last_flush_time: Mutex::new(Instant::now()),
177 batches_written: AtomicU64::new(0),
178 events_written: AtomicU64::new(0),
179 bytes_written: AtomicU64::new(0),
180 total_write_time_ns: AtomicU64::new(0),
181 timeout_flushes: AtomicU64::new(0),
182 size_flushes: AtomicU64::new(0),
183 })
184 }
185
186 #[deprecated(note = "Use new() or with_config() instead - default batch size is now 10,000")]
188 pub fn with_legacy_batch_size(storage_dir: impl AsRef<Path>) -> Result<Self> {
189 Self::with_config(
190 storage_dir,
191 ParquetStorageConfig {
192 batch_size: 1000,
193 ..Default::default()
194 },
195 )
196 }
197
198 #[cfg_attr(feature = "hotpath", hotpath::measure)]
205 pub fn append_event(&self, event: Event) -> Result<()> {
206 let should_flush = {
207 let mut batch = self.current_batch.lock().unwrap();
208 batch.push(event);
209 batch.len() >= self.config.batch_size
210 };
211
212 if should_flush {
213 self.size_flushes.fetch_add(1, Ordering::Relaxed);
214 self.flush()?;
215 }
216
217 Ok(())
218 }
219
220 #[cfg_attr(feature = "hotpath", hotpath::measure)]
225 pub fn batch_write(&self, events: Vec<Event>) -> Result<BatchWriteResult> {
226 let start = Instant::now();
227 let event_count = events.len();
228 let mut batches_flushed = 0;
229
230 let mut remaining_events = events.into_iter().peekable();
232
233 while remaining_events.peek().is_some() {
234 let should_flush = {
235 let mut batch = self.current_batch.lock().unwrap();
236 let available_space = self.config.batch_size.saturating_sub(batch.len());
237
238 if available_space == 0 {
239 true
240 } else {
241 let to_add: Vec<Event> =
243 remaining_events.by_ref().take(available_space).collect();
244 batch.extend(to_add);
245 batch.len() >= self.config.batch_size
246 }
247 };
248
249 if should_flush {
250 self.size_flushes.fetch_add(1, Ordering::Relaxed);
251 self.flush()?;
252 batches_flushed += 1;
253 }
254 }
255
256 let duration = start.elapsed();
257
258 Ok(BatchWriteResult {
259 events_written: event_count,
260 batches_flushed,
261 duration,
262 events_per_sec: event_count as f64 / duration.as_secs_f64(),
263 })
264 }
265
266 #[cfg_attr(feature = "hotpath", hotpath::measure)]
271 pub fn check_timeout_flush(&self) -> Result<bool> {
272 let should_flush = {
273 let last_flush = self.last_flush_time.lock().unwrap();
274 let batch = self.current_batch.lock().unwrap();
275 !batch.is_empty() && last_flush.elapsed() >= self.config.flush_timeout
276 };
277
278 if should_flush {
279 self.timeout_flushes.fetch_add(1, Ordering::Relaxed);
280 self.flush()?;
281 Ok(true)
282 } else {
283 Ok(false)
284 }
285 }
286
287 #[cfg_attr(feature = "hotpath", hotpath::measure)]
291 pub fn flush(&self) -> Result<()> {
292 let events_to_write = {
293 let mut batch = self.current_batch.lock().unwrap();
294 if batch.is_empty() {
295 return Ok(());
296 }
297 std::mem::take(&mut *batch)
298 };
299
300 let batch_count = events_to_write.len();
301 tracing::info!("Flushing {} events to Parquet storage", batch_count);
302
303 let start = Instant::now();
304
305 let record_batch = self.events_to_record_batch(&events_to_write)?;
307
308 let filename = format!(
310 "events-{}-{}.parquet",
311 chrono::Utc::now().format("%Y%m%d-%H%M%S%3f"),
312 uuid::Uuid::new_v4().as_simple()
313 );
314 let file_path = self.storage_dir.join(&filename);
315
316 let file = File::create(&file_path).map_err(|e| {
318 AllSourceError::StorageError(format!("Failed to create parquet file: {e}"))
319 })?;
320
321 let props = WriterProperties::builder()
322 .set_compression(self.config.compression)
323 .build();
324
325 let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;
326
327 writer.write(&record_batch)?;
328 let file_metadata = writer.close()?;
329
330 let duration = start.elapsed();
331
332 self.batches_written.fetch_add(1, Ordering::Relaxed);
334 self.events_written
335 .fetch_add(batch_count as u64, Ordering::Relaxed);
336 if let Some(size) = file_metadata
337 .row_groups()
338 .first()
339 .map(parquet::file::metadata::RowGroupMetaData::total_byte_size)
340 {
341 self.bytes_written.fetch_add(size as u64, Ordering::Relaxed);
342 }
343 self.total_write_time_ns
344 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
345
346 {
348 let mut last_flush = self.last_flush_time.lock().unwrap();
349 *last_flush = Instant::now();
350 }
351
352 tracing::info!(
353 "Successfully wrote {} events to {} in {:?}",
354 batch_count,
355 file_path.display(),
356 duration
357 );
358
359 Ok(())
360 }
361
362 #[cfg_attr(feature = "hotpath", hotpath::measure)]
366 pub fn flush_on_shutdown(&self) -> Result<usize> {
367 let batch_size = {
368 let batch = self.current_batch.lock().unwrap();
369 batch.len()
370 };
371
372 if batch_size > 0 {
373 tracing::info!("Shutdown: flushing partial batch of {} events", batch_size);
374 self.flush()?;
375 }
376
377 Ok(batch_size)
378 }
379
380 pub fn batch_stats(&self) -> BatchWriteStats {
382 let batches = self.batches_written.load(Ordering::Relaxed);
383 let events = self.events_written.load(Ordering::Relaxed);
384 let bytes = self.bytes_written.load(Ordering::Relaxed);
385 let time_ns = self.total_write_time_ns.load(Ordering::Relaxed);
386
387 let time_secs = time_ns as f64 / 1_000_000_000.0;
388
389 BatchWriteStats {
390 batches_written: batches,
391 events_written: events,
392 bytes_written: bytes,
393 avg_batch_size: if batches > 0 {
394 events as f64 / batches as f64
395 } else {
396 0.0
397 },
398 events_per_sec: if time_secs > 0.0 {
399 events as f64 / time_secs
400 } else {
401 0.0
402 },
403 total_write_time_ns: time_ns,
404 timeout_flushes: self.timeout_flushes.load(Ordering::Relaxed),
405 size_flushes: self.size_flushes.load(Ordering::Relaxed),
406 }
407 }
408
    /// Number of events currently buffered in memory and not yet flushed.
    pub fn pending_count(&self) -> usize {
        self.current_batch.lock().unwrap().len()
    }
413
    /// The configured size-based flush threshold, in events.
    pub fn batch_size(&self) -> usize {
        self.config.batch_size
    }
418
    /// The configured maximum age of a non-empty batch before a timeout flush.
    pub fn flush_timeout(&self) -> Duration {
        self.config.flush_timeout
    }
423
    /// Converts a slice of events into a single Arrow `RecordBatch`.
    ///
    /// The array order pushed into `arrays` must match the column order of the
    /// schema built in `with_config`, since columns are matched by position.
    ///
    /// # Errors
    /// Returns an error if a payload or metadata value cannot be serialized to
    /// JSON, or if the batch fails Arrow's schema validation.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            // Payloads are stored as JSON text rather than a nested Arrow type.
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            let timestamp_micros = event.timestamp.timestamp_micros();
            timestamp_builder.append_value(timestamp_micros);

            // metadata is the only nullable column in the schema.
            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            version_builder.append_value(event.version as u64);
        }

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;

        Ok(record_batch)
    }
468
469 #[cfg_attr(feature = "hotpath", hotpath::measure)]
471 pub fn load_all_events(&self) -> Result<Vec<Event>> {
472 let mut all_events = Vec::new();
473
474 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
476 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
477 })?;
478
479 let mut parquet_files: Vec<PathBuf> = entries
480 .filter_map(std::result::Result::ok)
481 .map(|entry| entry.path())
482 .filter(|path| {
483 path.extension()
484 .and_then(|ext| ext.to_str())
485 .is_some_and(|ext| ext == "parquet")
486 })
487 .collect();
488
489 parquet_files.sort();
491
492 for file_path in parquet_files {
493 tracing::info!("Loading events from {}", file_path.display());
494 let file_events = self.load_events_from_file(&file_path)?;
495 all_events.extend(file_events);
496 }
497
498 tracing::info!("Loaded {} total events from storage", all_events.len());
499
500 Ok(all_events)
501 }
502
503 #[cfg_attr(feature = "hotpath", hotpath::measure)]
505 fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
506 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
507
508 let file = File::open(file_path).map_err(|e| {
509 AllSourceError::StorageError(format!("Failed to open parquet file: {e}"))
510 })?;
511
512 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
513 let mut reader = builder.build()?;
514
515 let mut events = Vec::new();
516
517 while let Some(Ok(batch)) = reader.next() {
518 let batch_events = self.record_batch_to_events(&batch)?;
519 events.extend(batch_events);
520 }
521
522 Ok(events)
523 }
524
    /// Reconstructs events from one Arrow `RecordBatch`, downcasting each
    /// column by position (must mirror the schema order in `with_config`).
    ///
    /// # Errors
    /// Returns a `StorageError` on column-type mismatch, invalid UUID or
    /// timestamp values, or JSON deserialization failure.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i))
                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {e}")))?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;

            // metadata is the only nullable column; null round-trips to None.
            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            // NOTE(review): "default" looks like a tenant/namespace placeholder
            // that is not persisted in the file — confirm against
            // Event::reconstruct_from_strings. The u64->i64 version cast would
            // wrap for values above i64::MAX; presumably versions stay small.
            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                "default".to_string(), serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }
601
602 pub fn list_parquet_files(&self) -> Result<Vec<PathBuf>> {
607 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
608 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
609 })?;
610
611 let mut parquet_files: Vec<PathBuf> = entries
612 .filter_map(std::result::Result::ok)
613 .map(|entry| entry.path())
614 .filter(|path| {
615 path.extension()
616 .and_then(|ext| ext.to_str())
617 .is_some_and(|ext| ext == "parquet")
618 })
619 .collect();
620
621 parquet_files.sort();
622 Ok(parquet_files)
623 }
624
    /// The directory this storage writes Parquet files into.
    pub fn storage_dir(&self) -> &Path {
        &self.storage_dir
    }
629
630 pub fn stats(&self) -> Result<StorageStats> {
632 let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
633 AllSourceError::StorageError(format!("Failed to read storage directory: {e}"))
634 })?;
635
636 let mut total_files = 0;
637 let mut total_size_bytes = 0u64;
638
639 for entry in entries.flatten() {
640 let path = entry.path();
641 if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
642 total_files += 1;
643 if let Ok(metadata) = entry.metadata() {
644 total_size_bytes += metadata.len();
645 }
646 }
647 }
648
649 let current_batch_size = self.current_batch.lock().unwrap().len();
650
651 Ok(StorageStats {
652 total_files,
653 total_size_bytes,
654 storage_dir: self.storage_dir.clone(),
655 current_batch_size,
656 })
657 }
658}
659
impl Drop for ParquetStorage {
    fn drop(&mut self) {
        // Best-effort flush of any buffered events; Drop cannot propagate
        // errors, so failures are only logged.
        if let Err(e) = self.flush_on_shutdown() {
            tracing::error!("Failed to flush events on drop: {}", e);
        }
    }
}
668
/// Point-in-time view of on-disk and in-memory storage state.
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    /// Number of `.parquet` files currently in the storage directory.
    pub total_files: usize,
    /// Combined size of those files, in bytes.
    pub total_size_bytes: u64,
    /// Directory that was scanned.
    pub storage_dir: PathBuf,
    /// Events buffered in memory awaiting flush at the time of the call.
    pub current_batch_size: usize,
}
676
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Builds a minimal event with a fixed JSON payload for the given entity id.
    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }

    // Round-trip: append + explicit flush, then read everything back.
    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{i}"));
            storage.append_event(event).unwrap();
        }

        storage.flush().unwrap();

        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }

    // One flush should produce exactly one non-empty file on disk.
    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }

    #[test]
    fn test_default_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        assert_eq!(storage.batch_size(), DEFAULT_BATCH_SIZE);
        assert_eq!(storage.batch_size(), 10_000);
    }

    #[test]
    fn test_custom_config() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 5_000,
            flush_timeout: Duration::from_secs(2),
            compression: parquet::basic::Compression::SNAPPY,
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        assert_eq!(storage.batch_size(), 5_000);
        assert_eq!(storage.flush_timeout(), Duration::from_secs(2));
    }

    // 250 events with batch_size 100: two full batches flush, 50 stay pending.
    #[test]
    fn test_batch_write() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100, ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let events: Vec<Event> = (0..250)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        let result = storage.batch_write(events).unwrap();
        assert_eq!(result.events_written, 250);
        assert_eq!(result.batches_flushed, 2);

        assert_eq!(storage.pending_count(), 50);

        storage.flush().unwrap();

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 250);
    }

    // 15 appends with batch_size 10: one size-based auto-flush, 5 left pending.
    #[test]
    fn test_auto_flush_on_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 10, ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        for i in 0..15 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        let stats = storage.batch_stats();
        assert_eq!(stats.events_written, 10);
        assert_eq!(stats.batches_written, 1);
        assert_eq!(stats.size_flushes, 1);
    }

    #[test]
    fn test_flush_on_shutdown() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{i}")))
                .unwrap();
        }

        assert_eq!(storage.pending_count(), 5);

        let flushed = storage.flush_on_shutdown().unwrap();
        assert_eq!(flushed, 5);
        assert_eq!(storage.pending_count(), 0);

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), 5);
    }

    // Concurrent appends from scoped threads must not lose events.
    #[test]
    fn test_thread_safe_writes() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 100,
            ..Default::default()
        };
        let storage = Arc::new(ParquetStorage::with_config(temp_dir.path(), config).unwrap());

        let events_per_thread = 50;
        let thread_count = 4;

        std::thread::scope(|s| {
            for t in 0..thread_count {
                let storage_ref = Arc::clone(&storage);
                s.spawn(move || {
                    for i in 0..events_per_thread {
                        let event = create_test_event(&format!("thread-{t}-entity-{i}"));
                        storage_ref.append_event(event).unwrap();
                    }
                });
            }
        });

        storage.flush().unwrap();

        let loaded = storage.load_all_events().unwrap();
        assert_eq!(loaded.len(), events_per_thread * thread_count);
    }

    #[test]
    fn test_batch_stats() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 50,
            ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let events: Vec<Event> = (0..100)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        storage.batch_write(events).unwrap();

        let stats = storage.batch_stats();
        assert_eq!(stats.batches_written, 2);
        assert_eq!(stats.events_written, 100);
        assert!(stats.avg_batch_size > 0.0);
        assert!(stats.events_per_sec > 0.0);
        assert_eq!(stats.size_flushes, 2);
    }

    #[test]
    fn test_config_presets() {
        let high_throughput = ParquetStorageConfig::high_throughput();
        assert_eq!(high_throughput.batch_size, 50_000);
        assert_eq!(high_throughput.flush_timeout, Duration::from_secs(10));

        let low_latency = ParquetStorageConfig::low_latency();
        assert_eq!(low_latency.batch_size, 1_000);
        assert_eq!(low_latency.flush_timeout, Duration::from_secs(1));

        let default = ParquetStorageConfig::default();
        assert_eq!(default.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(default.batch_size, 10_000);
    }

    // Benchmark-style test; ignored in normal runs (timing-sensitive).
    #[test]
    #[ignore]
    fn test_batch_write_throughput() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ParquetStorage::new(temp_dir.path()).unwrap();

        let event_count = 50_000;

        let events: Vec<Event> = (0..event_count)
            .map(|i| create_test_event(&format!("entity-{i}")))
            .collect();

        let start = std::time::Instant::now();
        let result = storage.batch_write(events).unwrap();
        storage.flush().unwrap(); let batch_duration = start.elapsed();

        let batch_stats = storage.batch_stats();

        println!("\n=== Parquet Batch Write Performance (BATCH_SIZE=10,000) ===");
        println!("Events: {event_count}");
        println!("Duration: {batch_duration:?}");
        println!("Events/sec: {:.0}", result.events_per_sec);
        println!("Batches written: {}", batch_stats.batches_written);
        println!("Avg batch size: {:.0}", batch_stats.avg_batch_size);
        println!("Bytes written: {} KB", batch_stats.bytes_written / 1024);

        assert!(
            result.events_per_sec > 10_000.0,
            "Batch write throughput too low: {:.0} events/sec (expected >10K in debug, >100K in release)",
            result.events_per_sec
        );
    }

    // Worst-case baseline: batch_size 1 means one file per event; ignored.
    #[test]
    #[ignore]
    fn test_single_event_write_baseline() {
        let temp_dir = TempDir::new().unwrap();
        let config = ParquetStorageConfig {
            batch_size: 1, ..Default::default()
        };
        let storage = ParquetStorage::with_config(temp_dir.path(), config).unwrap();

        let event_count = 1_000; let start = std::time::Instant::now();
        for i in 0..event_count {
            let event = create_test_event(&format!("entity-{i}"));
            storage.append_event(event).unwrap();
        }
        let duration = start.elapsed();

        let events_per_sec = f64::from(event_count) / duration.as_secs_f64();

        println!("\n=== Single-Event Write Baseline ===");
        println!("Events: {event_count}");
        println!("Duration: {duration:?}");
        println!("Events/sec: {events_per_sec:.0}");

    }
}