allsource_core/storage.rs

use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use arrow::array::{
    Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
    UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;

/// Parquet-based persistent storage for events
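///
/// # Example
///
/// A minimal usage sketch (the `./data/events` path and the `event` value are
/// illustrative, and error handling is elided):
///
/// ```ignore
/// let mut storage = ParquetStorage::new("./data/events")?;
/// storage.append_event(event)?;
/// storage.flush()?;
/// let events = storage.load_all_events()?;
/// ```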
pub struct ParquetStorage {
    /// Base directory for storing parquet files
    storage_dir: PathBuf,

    /// Current batch being accumulated
    current_batch: Vec<Event>,

    /// Batch size before flushing to disk
    batch_size: usize,

    /// Schema for Arrow/Parquet
    schema: Arc<Schema>,
}

impl ParquetStorage {
    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        // Create storage directory if it doesn't exist
        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {}", e))
        })?;

        // Define Arrow schema for events
        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new("metadata", DataType::Utf8, true),
            Field::new("version", DataType::UInt64, false),
        ]));

        Ok(Self {
            storage_dir,
            current_batch: Vec::new(),
            batch_size: 1000, // Flush every 1000 events
            schema,
        })
    }

    /// Add an event to the current batch, flushing automatically once
    /// `batch_size` events have accumulated
    pub fn append_event(&mut self, event: Event) -> Result<()> {
        self.current_batch.push(event);

        // Auto-flush if batch is full
        if self.current_batch.len() >= self.batch_size {
            self.flush()?;
        }

        Ok(())
    }

    /// Flush current batch to a Parquet file
    pub fn flush(&mut self) -> Result<()> {
        if self.current_batch.is_empty() {
            return Ok(());
        }

        let batch_count = self.current_batch.len();
        tracing::info!("Flushing {} events to Parquet storage", batch_count);

        // Create record batch from events
        let record_batch = self.events_to_record_batch(&self.current_batch)?;

        // Generate filename with a microsecond-precision timestamp so that two
        // flushes within the same second do not overwrite each other
        let filename = format!(
            "events-{}.parquet",
            chrono::Utc::now().format("%Y%m%d-%H%M%S%.6f")
        );
        let file_path = self.storage_dir.join(filename);

        // Write to Parquet file
        let file = File::create(&file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create parquet file: {}", e))
        })?;

        let props = WriterProperties::builder()
            .set_compression(parquet::basic::Compression::SNAPPY)
            .build();

        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;

        writer.write(&record_batch)?;
        writer.close()?;

        tracing::info!(
            "Successfully wrote {} events to {}",
            batch_count,
            file_path.display()
        );

        // Clear current batch
        self.current_batch.clear();

        Ok(())
    }

    /// Convert events to Arrow RecordBatch
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            // Convert timestamp to microseconds
            let timestamp_micros = event.timestamp.timestamp_micros();
            timestamp_builder.append_value(timestamp_micros);

            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            version_builder.append_value(event.version as u64);
        }

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;

        Ok(record_batch)
    }

    /// Load events from all Parquet files
    pub fn load_all_events(&self) -> Result<Vec<Event>> {
        let mut all_events = Vec::new();

        // Read all parquet files in storage directory
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut parquet_files: Vec<PathBuf> = entries
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path())
            .filter(|path| {
                path.extension()
                    .and_then(|ext| ext.to_str())
                    .map(|ext| ext == "parquet")
                    .unwrap_or(false)
            })
            .collect();

        // Sort files by name (which includes timestamp)
        parquet_files.sort();

        for file_path in parquet_files {
            tracing::info!("Loading events from {}", file_path.display());
            let file_events = self.load_events_from_file(&file_path)?;
            all_events.extend(file_events);
        }

        tracing::info!("Loaded {} total events from storage", all_events.len());

        Ok(all_events)
    }

    /// Load events from a single Parquet file
    fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let file = File::open(file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to open parquet file: {}", e))
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let reader = builder.build()?;

        let mut events = Vec::new();

        // Propagate read errors instead of silently stopping at the first bad batch
        for batch in reader {
            let batch = batch?;
            let batch_events = self.record_batch_to_events(&batch)?;
            events.extend(batch_events);
        }

        Ok(events)
    }

    /// Convert Arrow RecordBatch back to events
    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i))
                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {}", e)))?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;

            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                "default".to_string(), // Default tenant for backward compatibility
                serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }

    /// Get storage statistics
    pub fn stats(&self) -> Result<StorageStats> {
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut total_files = 0;
        let mut total_size_bytes = 0u64;

        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                total_files += 1;
                if let Ok(metadata) = entry.metadata() {
                    total_size_bytes += metadata.len();
                }
            }
        }

        Ok(StorageStats {
            total_files,
            total_size_bytes,
            storage_dir: self.storage_dir.clone(),
            current_batch_size: self.current_batch.len(),
        })
    }
}

impl Drop for ParquetStorage {
    fn drop(&mut self) {
        // Ensure any remaining events are flushed
        if !self.current_batch.is_empty() {
            if let Err(e) = self.flush() {
                tracing::error!("Failed to flush events on drop: {}", e);
            }
        }
    }
}

#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    pub total_files: usize,
    pub total_size_bytes: u64,
    pub storage_dir: PathBuf,
    pub current_batch_size: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add events
        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }

        // Flush to disk
        storage.flush().unwrap();

        // Load back
        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }
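
    // Added sketch: exercises the auto-flush in `append_event`, which writes the
    // batch to disk once `batch_size` (1000 by default) events have accumulated.
    #[test]
    fn test_auto_flush_at_batch_size() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Appending exactly `batch_size` events should flush without an explicit call
        for i in 0..storage.batch_size {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }

        assert!(storage.current_batch.is_empty());
        assert_eq!(storage.stats().unwrap().total_files, 1);
        assert_eq!(storage.load_all_events().unwrap().len(), storage.batch_size);
    }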

    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add and flush events
        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }
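
    // Added sketch: verifies that the `Drop` implementation flushes buffered
    // events, so nothing is lost when a storage handle goes out of scope.
    #[test]
    fn test_drop_flushes_remaining_events() {
        let temp_dir = TempDir::new().unwrap();

        {
            let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();
            for i in 0..3 {
                storage
                    .append_event(create_test_event(&format!("entity-{}", i)))
                    .unwrap();
            }
            // No explicit flush; Drop should persist the buffered events
        }

        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 3);
    }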
}