use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use arrow::array::{
    Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
    UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;

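/// Append-only event storage backed by Parquet files.
///
/// Events are buffered in memory and written out as a new timestamped
/// `.parquet` file once `batch_size` events have accumulated, or when
/// `flush` is called (also on `Drop`, best-effort).
///
/// Illustrative usage sketch (`create_event` is a hypothetical constructor,
/// not part of this module):
///
/// ```ignore
/// let mut storage = ParquetStorage::new("./data/events")?;
/// storage.append_event(create_event())?;   // buffered in memory
/// storage.flush()?;                        // forces a write to disk
/// let events = storage.load_all_events()?; // replays every stored event
/// ```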
pub struct ParquetStorage {
    /// Directory that holds the written `.parquet` files.
    storage_dir: PathBuf,

    /// Events buffered in memory until the next flush.
    current_batch: Vec<Event>,

    /// Buffered-event count that triggers an automatic flush.
    batch_size: usize,

    /// Arrow schema shared by every written record batch.
    schema: Arc<Schema>,
}

impl ParquetStorage {
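    /// Creates a storage instance rooted at `storage_dir`, creating the
    /// directory if it does not yet exist.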
    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {}", e))
        })?;

        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new("metadata", DataType::Utf8, true),
            Field::new("version", DataType::UInt64, false),
        ]));

        Ok(Self {
            storage_dir,
            current_batch: Vec::new(),
            batch_size: 1000,
            schema,
        })
    }

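    /// Buffers an event, flushing to disk once `batch_size` events have
    /// accumulated.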
    pub fn append_event(&mut self, event: Event) -> Result<()> {
        self.current_batch.push(event);

        if self.current_batch.len() >= self.batch_size {
            self.flush()?;
        }

        Ok(())
    }

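    /// Writes all buffered events to a new Snappy-compressed Parquet file.
    /// A no-op when the buffer is empty; the buffer is cleared only after a
    /// successful write.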
    pub fn flush(&mut self) -> Result<()> {
        if self.current_batch.is_empty() {
            return Ok(());
        }

        let batch_count = self.current_batch.len();
        tracing::info!("Flushing {} events to Parquet storage", batch_count);

        let record_batch = self.events_to_record_batch(&self.current_batch)?;

        // Include fractional seconds so two flushes within the same second
        // cannot silently overwrite each other's file.
        let filename = format!(
            "events-{}.parquet",
            chrono::Utc::now().format("%Y%m%d-%H%M%S%.6f")
        );
        let file_path = self.storage_dir.join(filename);

        let file = File::create(&file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create parquet file: {}", e))
        })?;

        let props = WriterProperties::builder()
            .set_compression(parquet::basic::Compression::SNAPPY)
            .build();

        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;

        writer.write(&record_batch)?;
        writer.close()?;

        tracing::info!(
            "Successfully wrote {} events to {}",
            batch_count,
            file_path.display()
        );

        self.current_batch.clear();

        Ok(())
    }

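    /// Converts a slice of events into a single Arrow `RecordBatch` matching
    /// `self.schema`, serializing payload and metadata as JSON strings.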
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            let timestamp_micros = event.timestamp.timestamp_micros();
            timestamp_builder.append_value(timestamp_micros);

            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            version_builder.append_value(event.version as u64);
        }

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;

        Ok(record_batch)
    }

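    /// Loads every event from all `.parquet` files in the storage directory,
    /// visiting files in lexicographic (and, given the zero-padded timestamp
    /// filenames, chronological) order.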
    pub fn load_all_events(&self) -> Result<Vec<Event>> {
        let mut all_events = Vec::new();

        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut parquet_files: Vec<PathBuf> = entries
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path())
            .filter(|path| {
                path.extension()
                    .and_then(|ext| ext.to_str())
                    .map(|ext| ext == "parquet")
                    .unwrap_or(false)
            })
            .collect();

        parquet_files.sort();

        for file_path in parquet_files {
            tracing::info!("Loading events from {}", file_path.display());
            let file_events = self.load_events_from_file(&file_path)?;
            all_events.extend(file_events);
        }

        tracing::info!("Loaded {} total events from storage", all_events.len());

        Ok(all_events)
    }

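    /// Reads a single Parquet file and converts each record batch back into
    /// events.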
    fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let file = File::open(file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to open parquet file: {}", e))
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let reader = builder.build()?;

        let mut events = Vec::new();

        // Propagate read errors instead of silently stopping at the first
        // corrupt batch.
        for batch in reader {
            let batch = batch?;
            let batch_events = self.record_batch_to_events(&batch)?;
            events.extend(batch_events);
        }

        Ok(events)
    }

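    /// Reconstructs events from a record batch, downcasting each column to
    /// its expected Arrow array type.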
    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_type column".to_string()))?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i))
                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {}", e)))?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;

            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                "default".to_string(),
                serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }

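    /// Returns file-count and size statistics for the storage directory,
    /// plus the number of events currently buffered in memory.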
    pub fn stats(&self) -> Result<StorageStats> {
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut total_files = 0;
        let mut total_size_bytes = 0u64;

        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                total_files += 1;
                if let Ok(metadata) = entry.metadata() {
                    total_size_bytes += metadata.len();
                }
            }
        }

        Ok(StorageStats {
            total_files,
            total_size_bytes,
            storage_dir: self.storage_dir.clone(),
            current_batch_size: self.current_batch.len(),
        })
    }
}
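/// Best-effort flush of any still-buffered events; failures are logged
/// rather than propagated, since `drop` cannot return an error.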
impl Drop for ParquetStorage {
    fn drop(&mut self) {
        if !self.current_batch.is_empty() {
            if let Err(e) = self.flush() {
                tracing::error!("Failed to flush events on drop: {}", e);
            }
        }
    }
}

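/// Summary of on-disk and in-memory storage state, as reported by
/// `ParquetStorage::stats`.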
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    pub total_files: usize,
    pub total_size_bytes: u64,
    pub storage_dir: PathBuf,
    pub current_batch_size: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }

        storage.flush().unwrap();

        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }

    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }
}