allsource_core/storage.rs
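//! Parquet-based persistent event storage.
//!
//! `ParquetStorage` buffers incoming events in memory and writes them out as
//! timestamp-named, Snappy-compressed `.parquet` files in the configured
//! directory, either automatically once the batch reaches `batch_size` events
//! or when `flush()` is called explicitly. `load_all_events()` reads every
//! parquet file back in filename (i.e. write-time) order.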

use crate::error::{AllSourceError, Result};
use crate::domain::entities::Event;
use arrow::array::{
    Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
    UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;

/// Parquet-based persistent storage for events
pub struct ParquetStorage {
    /// Base directory for storing parquet files
    storage_dir: PathBuf,

    /// Current batch being accumulated
    current_batch: Vec<Event>,

    /// Batch size before flushing to disk
    batch_size: usize,

    /// Schema for Arrow/Parquet
    schema: Arc<Schema>,
}

impl ParquetStorage {
    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        // Create storage directory if it doesn't exist
        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {}", e))
        })?;

        // Define Arrow schema for events
        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new("metadata", DataType::Utf8, true),
            Field::new("version", DataType::UInt64, false),
        ]));

        Ok(Self {
            storage_dir,
            current_batch: Vec::new(),
            batch_size: 1000, // Flush every 1000 events
            schema,
        })
    }

    /// Add an event to the current batch
    pub fn append_event(&mut self, event: Event) -> Result<()> {
        self.current_batch.push(event);

        // Auto-flush if batch is full
        if self.current_batch.len() >= self.batch_size {
            self.flush()?;
        }

        Ok(())
    }

    /// Flush current batch to a Parquet file
    pub fn flush(&mut self) -> Result<()> {
        if self.current_batch.is_empty() {
            return Ok(());
        }

        let batch_count = self.current_batch.len();
        tracing::info!("Flushing {} events to Parquet storage", batch_count);

        // Create record batch from events
        let record_batch = self.events_to_record_batch(&self.current_batch)?;

        // Generate filename with timestamp. Microsecond precision keeps the name
        // unique when several flushes happen within the same second, so an
        // earlier file is not silently overwritten.
        let filename = format!(
            "events-{}.parquet",
            chrono::Utc::now().format("%Y%m%d-%H%M%S%.6f")
        );
        let file_path = self.storage_dir.join(filename);

        // Write to Parquet file
        let file = File::create(&file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create parquet file: {}", e))
        })?;

        let props = WriterProperties::builder()
            .set_compression(parquet::basic::Compression::SNAPPY)
            .build();

        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;

        writer.write(&record_batch)?;
        writer.close()?;

        tracing::info!(
            "Successfully wrote {} events to {}",
            batch_count,
            file_path.display()
        );

        // Clear current batch
        self.current_batch.clear();

        Ok(())
    }

    /// Convert events to Arrow RecordBatch
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            // Convert timestamp to microseconds
            let timestamp_micros = event.timestamp.timestamp_micros();
            timestamp_builder.append_value(timestamp_micros);

            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            version_builder.append_value(event.version as u64);
        }

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;

        Ok(record_batch)
    }

    /// Load events from all Parquet files
    pub fn load_all_events(&self) -> Result<Vec<Event>> {
        let mut all_events = Vec::new();

        // Read all parquet files in storage directory
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut parquet_files: Vec<PathBuf> = entries
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path())
            .filter(|path| {
                path.extension()
                    .and_then(|ext| ext.to_str())
                    .map(|ext| ext == "parquet")
                    .unwrap_or(false)
            })
            .collect();

        // Sort files by name (which includes timestamp)
        parquet_files.sort();

        for file_path in parquet_files {
            tracing::info!("Loading events from {}", file_path.display());
            let file_events = self.load_events_from_file(&file_path)?;
            all_events.extend(file_events);
        }

        tracing::info!("Loaded {} total events from storage", all_events.len());

        Ok(all_events)
    }

    /// Load events from a single Parquet file
    fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let file = File::open(file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to open parquet file: {}", e))
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let reader = builder.build()?;

        let mut events = Vec::new();

        for batch in reader {
            // Propagate read errors instead of silently stopping at the first bad batch
            let batch = batch.map_err(|e| {
                AllSourceError::StorageError(format!("Failed to read record batch: {}", e))
            })?;
            let batch_events = self.record_batch_to_events(&batch)?;
            events.extend(batch_events);
        }

        Ok(events)
    }

    /// Convert Arrow RecordBatch back to events
    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| {
                AllSourceError::StorageError("Invalid event_type column".to_string())
            })?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i)).map_err(|e| {
                AllSourceError::StorageError(format!("Invalid UUID: {}", e))
            })?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| {
                    AllSourceError::StorageError("Invalid timestamp".to_string())
                })?;

            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                "default".to_string(), // Default tenant for backward compatibility
                serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }

    /// Get storage statistics
    pub fn stats(&self) -> Result<StorageStats> {
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut total_files = 0;
        let mut total_size_bytes = 0u64;

        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                total_files += 1;
                if let Ok(metadata) = entry.metadata() {
                    total_size_bytes += metadata.len();
                }
            }
        }

        Ok(StorageStats {
            total_files,
            total_size_bytes,
            storage_dir: self.storage_dir.clone(),
            current_batch_size: self.current_batch.len(),
        })
    }
}

impl Drop for ParquetStorage {
    fn drop(&mut self) {
        // Ensure any remaining events are flushed
        if !self.current_batch.is_empty() {
            if let Err(e) = self.flush() {
                tracing::error!("Failed to flush events on drop: {}", e);
            }
        }
    }
}

#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    pub total_files: usize,
    pub total_size_bytes: u64,
    pub storage_dir: PathBuf,
    pub current_batch_size: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add events
        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }

        // Flush to disk
        storage.flush().unwrap();

        // Load back
        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }
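
    /// Sketch of an additional test (not in the original suite): events written
    /// across several flushes should each land in their own file and all be
    /// recovered by `load_all_events`. Assumes `flush` produces a distinct file
    /// per call (see the microsecond-precision filename above).
    #[test]
    fn test_multiple_flushes_round_trip() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // First batch, flushed to its own file
        for i in 0..10 {
            storage.append_event(create_test_event(&format!("a-{}", i))).unwrap();
        }
        storage.flush().unwrap();

        // Second batch, flushed to a second file
        for i in 0..10 {
            storage.append_event(create_test_event(&format!("b-{}", i))).unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 2);

        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 20);
    }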

    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        // Add and flush events
        for i in 0..5 {
            storage.append_event(create_test_event(&format!("entity-{}", i))).unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }
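
    /// Sketch of an additional test (not in the original suite): the `Drop` impl
    /// is expected to flush any still-buffered events, so a fresh handle on the
    /// same directory can load them back.
    #[test]
    fn test_flush_on_drop() {
        let temp_dir = TempDir::new().unwrap();

        {
            let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();
            for i in 0..3 {
                storage.append_event(create_test_event(&format!("entity-{}", i))).unwrap();
            }
            // No explicit flush: dropping `storage` should persist the buffered batch
        }

        let storage = ParquetStorage::new(temp_dir.path()).unwrap();
        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 3);
    }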
}