use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use arrow::array::{
    Array, ArrayRef, StringBuilder, TimestampMicrosecondArray, TimestampMicrosecondBuilder,
    UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;

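/// Append-only event store backed by Parquet files: events are buffered in
/// memory and written out as Snappy-compressed, timestamped files once
/// `batch_size` events accumulate (or on an explicit `flush`).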
pub struct ParquetStorage {
    /// Directory holding the generated `.parquet` files.
    storage_dir: PathBuf,

    /// Events buffered in memory, awaiting the next flush.
    current_batch: Vec<Event>,

    /// Buffer length that triggers an automatic flush.
    batch_size: usize,

    /// Arrow schema shared by every record batch this store writes.
    schema: Arc<Schema>,
}

impl ParquetStorage {
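    /// Creates the storage directory if needed and builds the Arrow schema
    /// used for every record batch. The flush threshold defaults to 1,000
    /// events.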
    pub fn new(storage_dir: impl AsRef<Path>) -> Result<Self> {
        let storage_dir = storage_dir.as_ref().to_path_buf();

        fs::create_dir_all(&storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create storage directory: {}", e))
        })?;

        let schema = Arc::new(Schema::new(vec![
            Field::new("event_id", DataType::Utf8, false),
            Field::new("event_type", DataType::Utf8, false),
            Field::new("entity_id", DataType::Utf8, false),
            Field::new("payload", DataType::Utf8, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            // `metadata` is the only nullable column: events without metadata
            // store a null rather than an empty string.
            Field::new("metadata", DataType::Utf8, true),
            Field::new("version", DataType::UInt64, false),
        ]));

        Ok(Self {
            storage_dir,
            current_batch: Vec::new(),
            batch_size: 1000,
            schema,
        })
    }

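    /// Buffers an event and flushes the batch to disk once it reaches
    /// `batch_size` entries.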
    pub fn append_event(&mut self, event: Event) -> Result<()> {
        self.current_batch.push(event);

        if self.current_batch.len() >= self.batch_size {
            self.flush()?;
        }

        Ok(())
    }

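    /// Writes all buffered events to a new timestamped Parquet file and
    /// clears the buffer. A no-op when the buffer is empty.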
    pub fn flush(&mut self) -> Result<()> {
        if self.current_batch.is_empty() {
            return Ok(());
        }

        let batch_count = self.current_batch.len();
        tracing::info!("Flushing {} events to Parquet storage", batch_count);

        let record_batch = self.events_to_record_batch(&self.current_batch)?;

        // The timestamp alone is only second-granular; add a random suffix so
        // two flushes within the same second cannot overwrite each other.
        let filename = format!(
            "events-{}-{}.parquet",
            chrono::Utc::now().format("%Y%m%d-%H%M%S"),
            uuid::Uuid::new_v4()
        );
        let file_path = self.storage_dir.join(filename);

        let file = File::create(&file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to create parquet file: {}", e))
        })?;

        let props = WriterProperties::builder()
            .set_compression(parquet::basic::Compression::SNAPPY)
            .build();

        let mut writer = ArrowWriter::try_new(file, self.schema.clone(), Some(props))?;

        writer.write(&record_batch)?;
        writer.close()?;

        tracing::info!(
            "Successfully wrote {} events to {}",
            batch_count,
            file_path.display()
        );

        self.current_batch.clear();

        Ok(())
    }

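    /// Converts a slice of events into a single Arrow `RecordBatch` matching
    /// `self.schema`, serializing `payload` and `metadata` as JSON strings.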
    fn events_to_record_batch(&self, events: &[Event]) -> Result<RecordBatch> {
        let mut event_id_builder = StringBuilder::new();
        let mut event_type_builder = StringBuilder::new();
        let mut entity_id_builder = StringBuilder::new();
        let mut payload_builder = StringBuilder::new();
        let mut timestamp_builder = TimestampMicrosecondBuilder::new();
        let mut metadata_builder = StringBuilder::new();
        let mut version_builder = UInt64Builder::new();

        for event in events {
            event_id_builder.append_value(event.id.to_string());
            event_type_builder.append_value(event.event_type_str());
            entity_id_builder.append_value(event.entity_id_str());
            payload_builder.append_value(serde_json::to_string(&event.payload)?);

            let timestamp_micros = event.timestamp.timestamp_micros();
            timestamp_builder.append_value(timestamp_micros);

            if let Some(ref metadata) = event.metadata {
                metadata_builder.append_value(serde_json::to_string(metadata)?);
            } else {
                metadata_builder.append_null();
            }

            // Versions are stored unsigned on disk and cast back to i64 on load.
            version_builder.append_value(event.version as u64);
        }

        // Column order must match the schema declared in `new()`.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(event_id_builder.finish()),
            Arc::new(event_type_builder.finish()),
            Arc::new(entity_id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(timestamp_builder.finish()),
            Arc::new(metadata_builder.finish()),
            Arc::new(version_builder.finish()),
        ];

        let record_batch = RecordBatch::try_new(self.schema.clone(), arrays)?;

        Ok(record_batch)
    }

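    /// Reads every `.parquet` file in the storage directory in lexicographic
    /// filename order (which follows the embedded timestamp prefix) and
    /// returns all events.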
    pub fn load_all_events(&self) -> Result<Vec<Event>> {
        let mut all_events = Vec::new();

        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut parquet_files: Vec<PathBuf> = entries
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.path())
            .filter(|path| {
                path.extension()
                    .and_then(|ext| ext.to_str())
                    .map(|ext| ext == "parquet")
                    .unwrap_or(false)
            })
            .collect();

        // Filenames start with a UTC timestamp, so a lexicographic sort
        // approximates write order.
        parquet_files.sort();

        for file_path in parquet_files {
            tracing::info!("Loading events from {}", file_path.display());
            let file_events = self.load_events_from_file(&file_path)?;
            all_events.extend(file_events);
        }

        tracing::info!("Loaded {} total events from storage", all_events.len());

        Ok(all_events)
    }

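    /// Streams record batches from a single Parquet file and converts each
    /// batch back into events.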
    fn load_events_from_file(&self, file_path: &Path) -> Result<Vec<Event>> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let file = File::open(file_path).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to open parquet file: {}", e))
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let reader = builder.build()?;

        let mut events = Vec::new();

        // Propagate read errors instead of silently stopping at the first
        // failed batch.
        for batch in reader {
            let batch_events = self.record_batch_to_events(&batch?)?;
            events.extend(batch_events);
        }

        Ok(events)
    }

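    /// Rebuilds `Event` values from the columns of a record batch, downcasting
    /// each column to its concrete Arrow array type.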
    fn record_batch_to_events(&self, batch: &RecordBatch) -> Result<Vec<Event>> {
        let event_ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid event_id column".to_string()))?;

        let event_types = batch
            .column(1)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| {
                AllSourceError::StorageError("Invalid event_type column".to_string())
            })?;

        let entity_ids = batch
            .column(2)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid entity_id column".to_string()))?;

        let payloads = batch
            .column(3)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid payload column".to_string()))?;

        let timestamps = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp column".to_string()))?;

        let metadatas = batch
            .column(5)
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid metadata column".to_string()))?;

        let versions = batch
            .column(6)
            .as_any()
            .downcast_ref::<arrow::array::UInt64Array>()
            .ok_or_else(|| AllSourceError::StorageError("Invalid version column".to_string()))?;

        let mut events = Vec::new();

        for i in 0..batch.num_rows() {
            let id = uuid::Uuid::parse_str(event_ids.value(i))
                .map_err(|e| AllSourceError::StorageError(format!("Invalid UUID: {}", e)))?;

            let timestamp = chrono::DateTime::from_timestamp_micros(timestamps.value(i))
                .ok_or_else(|| AllSourceError::StorageError("Invalid timestamp".to_string()))?;

            let metadata = if metadatas.is_null(i) {
                None
            } else {
                Some(serde_json::from_str(metadatas.value(i))?)
            };

            let event = Event::reconstruct_from_strings(
                id,
                event_types.value(i).to_string(),
                entity_ids.value(i).to_string(),
                // This field is not persisted in the Parquet schema, so it is
                // restored with a fixed placeholder.
                "default".to_string(),
                serde_json::from_str(payloads.value(i))?,
                timestamp,
                metadata,
                versions.value(i) as i64,
            );

            events.push(event);
        }

        Ok(events)
    }

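    /// Reports the number and total size of Parquet files on disk, plus how
    /// many events are currently buffered in memory.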
    pub fn stats(&self) -> Result<StorageStats> {
        let entries = fs::read_dir(&self.storage_dir).map_err(|e| {
            AllSourceError::StorageError(format!("Failed to read storage directory: {}", e))
        })?;

        let mut total_files = 0;
        let mut total_size_bytes = 0u64;

        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                total_files += 1;
                if let Ok(metadata) = entry.metadata() {
                    total_size_bytes += metadata.len();
                }
            }
        }

        Ok(StorageStats {
            total_files,
            total_size_bytes,
            storage_dir: self.storage_dir.clone(),
            current_batch_size: self.current_batch.len(),
        })
    }
}

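// Best-effort flush of any buffered events when the store is dropped; errors
// are logged rather than propagated, since `Drop` cannot return a `Result`.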
impl Drop for ParquetStorage {
    fn drop(&mut self) {
        if !self.current_batch.is_empty() {
            if let Err(e) = self.flush() {
                tracing::error!("Failed to flush events on drop: {}", e);
            }
        }
    }
}

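/// Point-in-time snapshot of on-disk files and the in-memory buffer.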
#[derive(Debug, serde::Serialize)]
pub struct StorageStats {
    pub total_files: usize,
    pub total_size_bytes: u64,
    pub storage_dir: PathBuf,
    pub current_batch_size: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    /// Builds a minimal event for the tests below.
    fn create_test_event(entity_id: &str) -> Event {
        Event::reconstruct_from_strings(
            uuid::Uuid::new_v4(),
            "test.event".to_string(),
            entity_id.to_string(),
            "default".to_string(),
            json!({
                "test": "data",
                "value": 42
            }),
            chrono::Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_parquet_storage_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..10 {
            let event = create_test_event(&format!("entity-{}", i));
            storage.append_event(event).unwrap();
        }

        storage.flush().unwrap();

        let loaded_events = storage.load_all_events().unwrap();
        assert_eq!(loaded_events.len(), 10);
    }

    #[test]
    fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        for i in 0..5 {
            storage
                .append_event(create_test_event(&format!("entity-{}", i)))
                .unwrap();
        }
        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 1);
        assert!(stats.total_size_bytes > 0);
    }
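
    // Flushing an empty buffer should be a no-op: `flush` returns early, so
    // no file is created and the buffer stays empty.
    #[test]
    fn test_flush_empty_buffer_is_noop() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ParquetStorage::new(temp_dir.path()).unwrap();

        storage.flush().unwrap();

        let stats = storage.stats().unwrap();
        assert_eq!(stats.total_files, 0);
        assert_eq!(stats.current_batch_size, 0);
    }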
}