// azoth_file_log/store.rs

use azoth_core::{
    error::{AzothError, Result},
    event_log::{EventLog, EventLogIterator, EventLogStats},
    types::EventId,
};
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc, Mutex,
};

/// Configuration for file-based event log
#[derive(Debug, Clone)]
pub struct FileEventLogConfig {
    /// Base directory for event log files
    pub base_dir: PathBuf,

    /// Maximum size of a single log file before rotation (bytes)
    pub max_file_size: u64,

    /// Buffer size for writes
    pub write_buffer_size: usize,

    /// Maximum size for batch write buffer (bytes)
    pub batch_buffer_size: usize,
}

impl Default for FileEventLogConfig {
    fn default() -> Self {
        Self {
            base_dir: PathBuf::from("./data/event-log"),
            max_file_size: 512 * 1024 * 1024, // 512MB
            write_buffer_size: 256 * 1024,    // 256KB (increased from 64KB)
            batch_buffer_size: 1024 * 1024,   // 1MB for batch writes
        }
    }
}

/// Metadata stored in meta.json
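///
/// Serialized compactly by `save_meta`. Assuming `EventId` is a plain `u64`
/// (which the `AtomicU64` usage below suggests), a typical meta.json looks
/// like:
///
/// ```json
/// {"next_event_id":3,"current_file_num":1,"oldest_event_id":0,"total_events":3}
/// ```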
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EventLogMeta {
    /// Next EventId to assign
    next_event_id: EventId,

    /// Current active log file number
    current_file_num: u64,

    /// Oldest event ID still in storage
    oldest_event_id: EventId,

    /// Total number of events across all files
    total_events: u64,
}

/// File-based event log implementation
pub struct FileEventLog {
    config: FileEventLogConfig,
    meta: Arc<Mutex<EventLogMeta>>,
    next_event_id: Arc<AtomicU64>,
    writer: Arc<Mutex<BufWriter<File>>>,
    current_file_num: Arc<AtomicU64>,
}

impl FileEventLog {
    /// Open or create a file-based event log
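    ///
    /// A minimal usage sketch; the `azoth_file_log::store` module path and
    /// the base directory are assumptions, not fixed by this file:
    ///
    /// ```no_run
    /// use azoth_file_log::store::{FileEventLog, FileEventLogConfig};
    ///
    /// let config = FileEventLogConfig {
    ///     base_dir: "./data/event-log".into(),
    ///     ..Default::default()
    /// };
    /// let log = FileEventLog::open(config).expect("failed to open event log");
    /// ```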
    pub fn open(config: FileEventLogConfig) -> Result<Self> {
        // Create base directory
        std::fs::create_dir_all(&config.base_dir)?;

        // Load or create metadata
        let meta_path = config.base_dir.join("meta.json");
        let meta = if meta_path.exists() {
            let data = std::fs::read_to_string(&meta_path)?;
            serde_json::from_str(&data)
                .map_err(|e| AzothError::Projection(format!("Failed to parse meta.json: {}", e)))?
        } else {
            EventLogMeta::default()
        };

        let next_event_id = Arc::new(AtomicU64::new(meta.next_event_id));
        let current_file_num = Arc::new(AtomicU64::new(meta.current_file_num));

        // Open current log file for appending
        let log_path = Self::log_file_path(&config.base_dir, meta.current_file_num);
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path)?;
        let writer = Arc::new(Mutex::new(BufWriter::with_capacity(
            config.write_buffer_size,
            file,
        )));

        Ok(Self {
            config,
            meta: Arc::new(Mutex::new(meta)),
            next_event_id,
            writer,
            current_file_num,
        })
    }

    /// Get path to a log file by number
    fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf {
        base_dir.join(format!("events-{:08}.log", file_num))
    }

    /// Save metadata to disk
    fn save_meta(&self) -> Result<()> {
        let meta = self.meta.lock().unwrap();
        let meta_path = self.config.base_dir.join("meta.json");
        // Use compact serialization (not pretty) for better performance.
        // Note: std::fs::write is not atomic here; a crash mid-write can
        // leave a truncated meta.json (a temp-file + rename would avoid that).
        let data = serde_json::to_string(&*meta)
            .map_err(|e| AzothError::Projection(format!("Failed to serialize meta: {}", e)))?;
        std::fs::write(&meta_path, data)?;
        Ok(())
    }

    /// Write a single event entry to the current log file
    ///
    /// Format: [event_id: u64][size: u32][data: bytes]
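    ///
    /// Both integers are big-endian. For example, event_id = 1 with payload
    /// b"hi" is framed as:
    ///
    /// ```text
    /// 00 00 00 00 00 00 00 01   event_id (u64, big-endian)
    /// 00 00 00 02               size (u32, big-endian)
    /// 68 69                     payload ("hi")
    /// ```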
    fn write_event_entry(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
        let mut writer = self.writer.lock().unwrap();

        // Write event_id (8 bytes, big-endian)
        writer.write_all(&event_id.to_be_bytes())?;

        // Write size (4 bytes, big-endian)
        let size = event_bytes.len() as u32;
        writer.write_all(&size.to_be_bytes())?;

        // Write event data
        writer.write_all(event_bytes)?;

        Ok(())
    }

    /// Check if rotation is needed and perform it
    fn check_rotation(&self) -> Result<Option<PathBuf>> {
        let log_path = Self::log_file_path(
            &self.config.base_dir,
            self.current_file_num.load(Ordering::SeqCst),
        );

        // Note: this measures on-disk size only; bytes still sitting in the
        // BufWriter are not counted, so rotation can lag by up to one
        // buffer's worth of data.
        let file_size = std::fs::metadata(&log_path)?.len();

        if file_size >= self.config.max_file_size {
            self.rotate_internal()
        } else {
            Ok(None)
        }
    }

    /// Perform log rotation
    fn rotate_internal(&self) -> Result<Option<PathBuf>> {
        // Flush current writer
        {
            let mut writer = self.writer.lock().unwrap();
            writer.flush()?;
        }

        let old_file_num = self.current_file_num.load(Ordering::SeqCst);
        let old_path = Self::log_file_path(&self.config.base_dir, old_file_num);

        // Increment file number
        let new_file_num = old_file_num + 1;
        self.current_file_num.store(new_file_num, Ordering::SeqCst);

        // Update metadata
        {
            let mut meta = self.meta.lock().unwrap();
            meta.current_file_num = new_file_num;
        }
        self.save_meta()?;

        // Open new log file
        let new_path = Self::log_file_path(&self.config.base_dir, new_file_num);
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&new_path)?;

        // Replace writer
        {
            let mut writer = self.writer.lock().unwrap();
            *writer = BufWriter::with_capacity(self.config.write_buffer_size, file);
        }

        tracing::info!(
            "Rotated event log: {} -> {}",
            old_path.display(),
            new_path.display()
        );

        Ok(Some(old_path))
    }
}

impl EventLog for FileEventLog {
    fn append_with_id(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
        // Write event entry
        self.write_event_entry(event_id, event_bytes)?;

        // Update metadata
        {
            let mut meta = self.meta.lock().unwrap();
            meta.next_event_id = event_id + 1;
            meta.total_events += 1;
        }

        // Update atomic counter to match
        self.next_event_id.store(event_id + 1, Ordering::SeqCst);

        // Check for rotation
        self.check_rotation()?;

        Ok(())
    }

    fn append_batch_with_ids(&self, first_event_id: EventId, events: &[Vec<u8>]) -> Result<()> {
        if events.is_empty() {
            return Err(AzothError::InvalidState("Cannot append empty batch".into()));
        }

        // Calculate total size needed: (8 + 4 + data_len) per event
        let total_size: usize = events
            .iter()
            .map(|e| 8 + 4 + e.len()) // event_id + size + data
            .sum();

        // Check if batch exceeds reasonable size
        if total_size > self.config.batch_buffer_size {
            // For very large batches, fall back to individual writes
            // to avoid excessive memory allocation
            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;
                self.write_event_entry(event_id, event_bytes)?;
            }
        } else {
            // Pre-allocate buffer for entire batch
            let mut buffer = Vec::with_capacity(total_size);

            // Serialize all events into buffer
            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;

                // Write event_id (8 bytes, big-endian)
                buffer.extend_from_slice(&event_id.to_be_bytes());

                // Write size (4 bytes, big-endian)
                let size = event_bytes.len() as u32;
                buffer.extend_from_slice(&size.to_be_bytes());

                // Write event data
                buffer.extend_from_slice(event_bytes);
            }

            // Single lock acquisition + single buffered write
            let mut writer = self.writer.lock().unwrap();
            writer.write_all(&buffer)?;
        }

        // Flush after batch
        {
            let mut writer = self.writer.lock().unwrap();
            writer.flush()?;
        }

        // Update metadata
        let last_id = first_event_id + events.len() as u64 - 1;
        {
            let mut meta = self.meta.lock().unwrap();
            meta.next_event_id = last_id + 1;
            meta.total_events += events.len() as u64;
        }

        // Update atomic counter to match
        self.next_event_id.store(last_id + 1, Ordering::SeqCst);

        // Check for rotation
        self.check_rotation()?;

        Ok(())
    }

    fn next_event_id(&self) -> Result<EventId> {
        Ok(self.next_event_id.load(Ordering::SeqCst))
    }

    fn iter_range(
        &self,
        start: EventId,
        end: Option<EventId>,
    ) -> Result<Box<dyn EventLogIterator>> {
        // Flush writer to ensure all data is on disk
        {
            let mut writer = self.writer.lock().unwrap();
            writer.flush()?;
        }

        let meta = self.meta.lock().unwrap();
        let end_id = end.unwrap_or(meta.next_event_id);

        Ok(Box::new(FileEventLogIter::new(
            self.config.base_dir.clone(),
            start,
            end_id,
            meta.current_file_num,
        )?))
    }

    fn get(&self, event_id: EventId) -> Result<Option<Vec<u8>>> {
        // Use iter_range to find single event
        let mut iter = self.iter_range(event_id, Some(event_id + 1))?;

        match iter.next() {
            Some(Ok((id, data))) if id == event_id => Ok(Some(data)),
            Some(Ok(_)) => Ok(None),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }

    fn delete_range(&self, start: EventId, end: EventId) -> Result<usize> {
        // For now, deletion is not implemented
        // In production, this would:
        // 1. Identify log files that contain only events in [start, end]
        // 2. Delete those files
        // 3. Update oldest_event_id in metadata
        // 4. Compact remaining files if needed
        // (A sketch of steps 1-3 appears after this impl block.)

        tracing::warn!("delete_range not yet implemented: {} to {}", start, end);
        Ok(0)
    }

    fn rotate(&self) -> Result<PathBuf> {
        self.rotate_internal()?
            .ok_or_else(|| AzothError::InvalidState("No rotation needed".into()))
    }

    fn oldest_event_id(&self) -> Result<EventId> {
        let meta = self.meta.lock().unwrap();
        Ok(meta.oldest_event_id)
    }

    fn newest_event_id(&self) -> Result<EventId> {
        let next = self.next_event_id.load(Ordering::SeqCst);
        if next == 0 {
            Ok(0)
        } else {
            Ok(next - 1)
        }
    }

    fn sync(&self) -> Result<()> {
        let mut writer = self.writer.lock().unwrap();
        writer.flush()?;
        writer.get_ref().sync_all()?;
        self.save_meta()?;
        Ok(())
    }

    fn stats(&self) -> Result<EventLogStats> {
        let meta = self.meta.lock().unwrap();

        // Calculate total bytes across all log files
        let mut total_bytes = 0u64;
        let mut file_count = 0usize;

        for file_num in 0..=meta.current_file_num {
            let path = Self::log_file_path(&self.config.base_dir, file_num);
            if path.exists() {
                total_bytes += std::fs::metadata(&path)?.len();
                file_count += 1;
            }
        }

        Ok(EventLogStats {
            event_count: meta.total_events,
            oldest_event_id: meta.oldest_event_id,
            newest_event_id: if meta.next_event_id == 0 {
                0
            } else {
                meta.next_event_id - 1
            },
            total_bytes,
            file_count,
        })
    }
}
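
// A sketch of whole-file deletion (steps 1-3 of the `delete_range` note
// above). Not wired into the trait yet; treat it as an illustration under
// the assumption that event ids are contiguous and ascending across files.
impl FileEventLog {
    /// Delete every non-active log file whose events all fall strictly
    /// below `end`. Returns the number of events removed.
    #[allow(dead_code)]
    fn delete_files_below(&self, end: EventId) -> Result<usize> {
        let current = self.current_file_num.load(Ordering::SeqCst);
        let mut deleted_events = 0usize;

        for file_num in 0..current {
            let path = Self::log_file_path(&self.config.base_dir, file_num);
            if !path.exists() {
                continue;
            }

            // Scan the file with the same [event_id][size][data] framing as
            // write_event_entry to count entries and find the highest id.
            let mut file = File::open(&path)?;
            let mut header = [0u8; 12];
            let mut count = 0u64;
            let mut max_id = 0u64;
            loop {
                match file.read_exact(&mut header) {
                    Ok(_) => {}
                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                    Err(e) => return Err(e.into()),
                }
                max_id = u64::from_be_bytes(header[0..8].try_into().unwrap());
                let size = u32::from_be_bytes(header[8..12].try_into().unwrap());
                // Skip over the payload without materializing it
                std::io::copy(&mut file.by_ref().take(size as u64), &mut std::io::sink())?;
                count += 1;
            }

            if count > 0 && max_id < end {
                std::fs::remove_file(&path)?;
                deleted_events += count as usize;
                let mut meta = self.meta.lock().unwrap();
                meta.oldest_event_id = max_id + 1;
                meta.total_events = meta.total_events.saturating_sub(count);
            }
        }

        if deleted_events > 0 {
            self.save_meta()?;
        }
        Ok(deleted_events)
    }
}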

/// Ensure metadata is saved on drop
impl Drop for FileEventLog {
    fn drop(&mut self) {
        // Attempt to flush writer and save metadata on drop
        // This ensures meta.json is written even if the user forgets to call sync()
        if let Err(e) = self.sync() {
            eprintln!("Warning: Failed to sync FileEventLog on drop: {}", e);
        }
    }
}

/// Iterator over events in file-based log
struct FileEventLogIter {
    base_dir: PathBuf,
    current_file_num: u64,
    max_file_num: u64,
    current_file: Option<File>,
    next_event_id: EventId,
    end_event_id: EventId,
}

impl FileEventLogIter {
    fn new(base_dir: PathBuf, start: EventId, end: EventId, max_file_num: u64) -> Result<Self> {
        let mut iter = Self {
            base_dir,
            current_file_num: 0,
            max_file_num,
            current_file: None,
            next_event_id: start,
            end_event_id: end,
        };

        // Open first file (scanning always starts at file 0; entries before
        // `start` are skipped during reads)
        iter.open_next_file()?;

        Ok(iter)
    }

    fn open_next_file(&mut self) -> Result<bool> {
        while self.current_file_num <= self.max_file_num {
            let path = FileEventLog::log_file_path(&self.base_dir, self.current_file_num);

            if path.exists() {
                let file = File::open(&path)?;
                self.current_file = Some(file);
                return Ok(true);
            }

            self.current_file_num += 1;
        }

        Ok(false)
    }

    fn read_next_event(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
        loop {
            if self.next_event_id >= self.end_event_id {
                return Ok(None);
            }

            let file = match self.current_file.as_mut() {
                Some(f) => f,
                None => return Ok(None),
            };

            // Try to read event header: [event_id: u64][size: u32]
            let mut header = [0u8; 12];
            match file.read_exact(&mut header) {
                Ok(_) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    // End of file, try next file
                    self.current_file_num += 1;
                    if !self.open_next_file()? {
                        return Ok(None);
                    }
                    continue;
                }
                Err(e) => return Err(e.into()),
            }

            let event_id = u64::from_be_bytes(header[0..8].try_into().unwrap());
            let size = u32::from_be_bytes(header[8..12].try_into().unwrap());

            // Read event data
            let mut data = vec![0u8; size as usize];
            file.read_exact(&mut data)?;

            // Skip events before start
            if event_id < self.next_event_id {
                continue;
            }

            // Stop at the end of the requested range (guards against
            // non-contiguous ids jumping past end_event_id)
            if event_id >= self.end_event_id {
                return Ok(None);
            }

            self.next_event_id = event_id + 1;
            return Ok(Some((event_id, data)));
        }
    }
}

impl Iterator for FileEventLogIter {
    type Item = Result<(EventId, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.read_next_event() {
            Ok(Some(event)) => Some(Ok(event)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}

// EventLogIterator is automatically implemented via blanket impl in azoth-core

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn setup() -> (FileEventLog, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = FileEventLogConfig {
            base_dir: temp_dir.path().to_path_buf(),
            max_file_size: 1024, // Small for testing rotation
            write_buffer_size: 128,
            batch_buffer_size: 4096,
        };
        let log = FileEventLog::open(config).unwrap();
        (log, temp_dir)
    }

    #[test]
    fn test_append_and_read() {
        let (log, _temp) = setup();

        // Append events with pre-allocated IDs
        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();
        log.append_with_id(2, b"event 2").unwrap();

        // Read events
        let mut iter = log.iter_range(0, None).unwrap();
        assert_eq!(iter.next().unwrap().unwrap(), (0, b"event 0".to_vec()));
        assert_eq!(iter.next().unwrap().unwrap(), (1, b"event 1".to_vec()));
        assert_eq!(iter.next().unwrap().unwrap(), (2, b"event 2".to_vec()));
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_batch_append() {
        let (log, _temp) = setup();

        let events = vec![
            b"event 0".to_vec(),
            b"event 1".to_vec(),
            b"event 2".to_vec(),
        ];

        log.append_batch_with_ids(0, &events).unwrap();

        let mut iter = log.iter_range(0, None).unwrap();
        assert_eq!(iter.next().unwrap().unwrap(), (0, b"event 0".to_vec()));
        assert_eq!(iter.next().unwrap().unwrap(), (1, b"event 1".to_vec()));
        assert_eq!(iter.next().unwrap().unwrap(), (2, b"event 2".to_vec()));
    }

    #[test]
    fn test_get_single_event() {
        let (log, _temp) = setup();

        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();
        log.append_with_id(2, b"event 2").unwrap();

        assert_eq!(log.get(1).unwrap(), Some(b"event 1".to_vec()));
        assert_eq!(log.get(99).unwrap(), None);
    }

    #[test]
    fn test_stats() {
        let (log, _temp) = setup();

        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();

        // Sync to ensure data is written to disk
        log.sync().unwrap();

        let stats = log.stats().unwrap();
        assert_eq!(stats.event_count, 2);
        assert_eq!(stats.oldest_event_id, 0);
        assert_eq!(stats.newest_event_id, 1);
        assert!(stats.total_bytes > 0);
        assert_eq!(stats.file_count, 1);
    }
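
    // Added sketch (not part of the original suite): forces a rotation and
    // checks that iteration and stats span both log files.
    #[test]
    fn test_rotate_and_iterate_across_files() {
        let (log, _temp) = setup();

        log.append_with_id(0, b"before rotation").unwrap();

        // Force a rotation regardless of file size
        let old_path = log.rotate().unwrap();
        assert!(old_path.exists());

        log.append_with_id(1, b"after rotation").unwrap();

        // The iterator should move transparently from file 0 to file 1
        let mut iter = log.iter_range(0, None).unwrap();
        assert_eq!(
            iter.next().unwrap().unwrap(),
            (0, b"before rotation".to_vec())
        );
        assert_eq!(
            iter.next().unwrap().unwrap(),
            (1, b"after rotation".to_vec())
        );
        assert!(iter.next().is_none());

        let stats = log.stats().unwrap();
        assert_eq!(stats.file_count, 2);
    }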
}