// azoth_file_log/store.rs

use azoth_core::{
    error::{AzothError, Result},
    event_log::{EventLog, EventLogIterator, EventLogStats},
    types::EventId,
};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use tokio::sync::Notify;

/// Configuration for file-based event log
#[derive(Debug, Clone)]
pub struct FileEventLogConfig {
    /// Base directory for event log files
    pub base_dir: PathBuf,

    /// Maximum size of a single log file before rotation (bytes)
    pub max_file_size: u64,

    /// Buffer size for writes
    pub write_buffer_size: usize,

    /// Maximum size for batch write buffer (bytes)
    pub batch_buffer_size: usize,

    /// Maximum size of a single event payload (bytes)
    pub max_event_size: usize,

    /// Maximum total size for a single append batch (bytes)
    pub max_batch_bytes: usize,

    /// Whether to flush the write buffer after each append (default: true).
    ///
    /// When `true`, every `append_with_id` / `append_batch_with_ids` call flushes
    /// the `BufWriter`, ensuring data reaches the OS page cache immediately.
    /// Set to `false` for maximum throughput at the cost of losing in-flight
    /// buffered events on a process crash (the OS-level file won't be corrupted,
    /// but unflushed events will be lost).
    pub flush_on_append: bool,
}

impl Default for FileEventLogConfig {
    fn default() -> Self {
        Self {
            base_dir: PathBuf::from("./data/event-log"),
            max_file_size: 512 * 1024 * 1024,  // 512MB
            write_buffer_size: 256 * 1024,     // 256KB
            batch_buffer_size: 1024 * 1024,    // 1MB for batch writes
            max_event_size: 4 * 1024 * 1024,   // 4MB single-event limit
            max_batch_bytes: 64 * 1024 * 1024, // 64MB batch limit
            flush_on_append: true,
        }
    }
}

/// Metadata stored in meta.json
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EventLogMeta {
    /// Next EventId to assign
    next_event_id: EventId,

    /// Current active log file number
    current_file_num: u64,

    /// Oldest event ID still in storage
    oldest_event_id: EventId,

    /// Total number of events across all files
    total_events: u64,
}

/// File-based event log implementation
pub struct FileEventLog {
    config: FileEventLogConfig,
    meta: Arc<Mutex<EventLogMeta>>,
    next_event_id: Arc<AtomicU64>,
    writer: Arc<Mutex<BufWriter<File>>>,
    current_file_num: Arc<AtomicU64>,
    /// Optional notifier that fires after each successful event append.
    ///
    /// Enables push-based projection: the projector can `notify.notified().await`
    /// instead of polling, giving near-zero-latency event processing with zero
    /// CPU waste when idle.
    event_notify: Option<Arc<Notify>>,
}

impl FileEventLog {
    /// Open or create a file-based event log
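    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the default config is acceptable and the
    /// caller propagates `Result` (`next_event_id` comes from the `EventLog`
    /// trait, which must be in scope):
    ///
    /// ```ignore
    /// let log = FileEventLog::open(FileEventLogConfig::default())?;
    /// assert_eq!(log.next_event_id()?, 0); // fresh log starts at 0
    /// ```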
    pub fn open(config: FileEventLogConfig) -> Result<Self> {
        // Create base directory
        std::fs::create_dir_all(&config.base_dir)?;

        // Load or create metadata
        let meta_path = config.base_dir.join("meta.json");
        let meta = if meta_path.exists() {
            let data = std::fs::read_to_string(&meta_path)?;
            serde_json::from_str(&data)
                .map_err(|e| AzothError::Projection(format!("Failed to parse meta.json: {}", e)))?
        } else {
            EventLogMeta::default()
        };

        let next_event_id = Arc::new(AtomicU64::new(meta.next_event_id));
        let current_file_num = Arc::new(AtomicU64::new(meta.current_file_num));

        // Open current log file for appending
        let log_path = Self::log_file_path(&config.base_dir, meta.current_file_num);
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_path)?;
        let writer = Arc::new(Mutex::new(BufWriter::with_capacity(
            config.write_buffer_size,
            file,
        )));

        Ok(Self {
            config,
            meta: Arc::new(Mutex::new(meta)),
            next_event_id,
            writer,
            current_file_num,
            event_notify: None,
        })
    }

    /// Set the event notification handle.
    ///
    /// When set, `notify_waiters()` is called after every successful
    /// `append_with_id` / `append_batch_with_ids`, waking any task that
    /// is awaiting `notified()`.
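    ///
    /// # Example
    ///
    /// A consumer-loop sketch (`drain_new_events` is a hypothetical helper;
    /// error and shutdown handling omitted):
    ///
    /// ```ignore
    /// let notify = Arc::new(Notify::new());
    /// log.set_event_notify(notify.clone());
    ///
    /// loop {
    ///     // Register interest before draining: `notify_waiters()` only
    ///     // wakes futures that already exist, so creating `notified` first
    ///     // avoids missing an append that lands mid-drain.
    ///     let notified = notify.notified();
    ///     drain_new_events(&log)?;
    ///     notified.await;
    /// }
    /// ```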
    pub fn set_event_notify(&mut self, notify: Arc<Notify>) {
        self.event_notify = Some(notify);
    }

    /// Get a clone of the event notification handle (if set).
    pub fn event_notify(&self) -> Option<Arc<Notify>> {
        self.event_notify.clone()
    }

    /// Get path to a log file by number
    fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf {
        base_dir.join(format!("events-{:08}.log", file_num))
    }

    /// Save metadata to disk
    fn save_meta(&self) -> Result<()> {
        let meta = self.meta.lock();
        let meta_path = self.config.base_dir.join("meta.json");
        // Use compact serialization (not pretty) for better performance
        let data = serde_json::to_string(&*meta)
            .map_err(|e| AzothError::Projection(format!("Failed to serialize meta: {}", e)))?;
        std::fs::write(&meta_path, data)?;
        Ok(())
    }

    /// Write a single event entry to the current log file
    ///
    /// Format: [event_id: u64][size: u32][data: bytes]
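    ///
    /// For example, event id 7 with the 2-byte payload `b"hi"` is encoded as
    /// the 14 bytes `00 00 00 00 00 00 00 07 | 00 00 00 02 | 68 69`
    /// (integers big-endian; `|` added here for readability only).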
    fn write_event_entry(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
        if event_bytes.len() > self.config.max_event_size {
            return Err(AzothError::InvalidState(format!(
                "Event size {} exceeds max_event_size {}",
                event_bytes.len(),
                self.config.max_event_size
            )));
        }

        if event_bytes.len() > u32::MAX as usize {
            return Err(AzothError::InvalidState(format!(
                "Event size {} exceeds u32 encoding limit",
                event_bytes.len()
            )));
        }

        let mut writer = self.writer.lock();

        // Write event_id (8 bytes, big-endian)
        writer.write_all(&event_id.to_be_bytes())?;

        // Write size (4 bytes, big-endian)
        let size = event_bytes.len() as u32;
        writer.write_all(&size.to_be_bytes())?;

        // Write event data
        writer.write_all(event_bytes)?;

        Ok(())
    }

    /// Check if rotation is needed and perform it
    fn check_rotation(&self) -> Result<Option<PathBuf>> {
        let log_path = Self::log_file_path(
            &self.config.base_dir,
            self.current_file_num.load(Ordering::SeqCst),
        );

        let file_size = std::fs::metadata(&log_path)?.len();

        if file_size >= self.config.max_file_size {
            self.rotate_internal()
        } else {
            Ok(None)
        }
    }

    /// Perform log rotation
    fn rotate_internal(&self) -> Result<Option<PathBuf>> {
        // Flush current writer
        {
            let mut writer = self.writer.lock();
            writer.flush()?;
        }

        let old_file_num = self.current_file_num.load(Ordering::SeqCst);
        let old_path = Self::log_file_path(&self.config.base_dir, old_file_num);

        // Increment file number
        let new_file_num = old_file_num + 1;
        self.current_file_num.store(new_file_num, Ordering::SeqCst);

        // Update metadata
        {
            let mut meta = self.meta.lock();
            meta.current_file_num = new_file_num;
        }
        self.save_meta()?;

        // Open new log file
        let new_path = Self::log_file_path(&self.config.base_dir, new_file_num);
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&new_path)?;

        // Replace writer
        {
            let mut writer = self.writer.lock();
            *writer = BufWriter::with_capacity(self.config.write_buffer_size, file);
        }

        tracing::info!(
            "Rotated event log: {} -> {}",
            old_path.display(),
            new_path.display()
        );

        Ok(Some(old_path))
    }
}

impl EventLog for FileEventLog {
    fn append_with_id(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
        // Write event entry
        self.write_event_entry(event_id, event_bytes)?;

        // Conditionally flush based on config
        if self.config.flush_on_append {
            let mut writer = self.writer.lock();
            writer.flush()?;
        }

        // Update metadata
        {
            let mut meta = self.meta.lock();
            meta.next_event_id = event_id + 1;
            meta.total_events += 1;
        }

        // Update atomic counter to match
        self.next_event_id.store(event_id + 1, Ordering::SeqCst);

        // Check for rotation
        self.check_rotation()?;

        // Wake any waiting projectors / event processors
        if let Some(notify) = &self.event_notify {
            notify.notify_waiters();
        }

        Ok(())
    }

    fn append_batch_with_ids(&self, first_event_id: EventId, events: &[Vec<u8>]) -> Result<()> {
        if events.is_empty() {
            return Err(AzothError::InvalidState("Cannot append empty batch".into()));
        }

        // Calculate total size needed: (8 + 4 + data_len) per event
        let total_size: usize = events
            .iter()
            .map(|e| 8 + 4 + e.len()) // event_id + size + data
            .sum();

        if total_size > self.config.max_batch_bytes {
            return Err(AzothError::InvalidState(format!(
                "Batch size {} exceeds max_batch_bytes {}",
                total_size, self.config.max_batch_bytes
            )));
        }

        for event in events {
            if event.len() > self.config.max_event_size {
                return Err(AzothError::InvalidState(format!(
                    "Event size {} exceeds max_event_size {}",
                    event.len(),
                    self.config.max_event_size
                )));
            }
            if event.len() > u32::MAX as usize {
                return Err(AzothError::InvalidState(format!(
                    "Event size {} exceeds u32 encoding limit",
                    event.len()
                )));
            }
        }

        // Check if batch exceeds reasonable size
        if total_size > self.config.batch_buffer_size {
            // For very large batches, fall back to individual writes
            // to avoid excessive memory allocation
            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;
                self.write_event_entry(event_id, event_bytes)?;
            }
        } else {
            // Pre-allocate buffer for entire batch
            let mut buffer = Vec::with_capacity(total_size);

            // Serialize all events into buffer
            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;

                // Write event_id (8 bytes, big-endian)
                buffer.extend_from_slice(&event_id.to_be_bytes());

                // Write size (4 bytes, big-endian)
                let size = event_bytes.len() as u32;
                buffer.extend_from_slice(&size.to_be_bytes());

                // Write event data
                buffer.extend_from_slice(event_bytes);
            }

            // Single lock acquisition + single write syscall
            let mut writer = self.writer.lock();
            writer.write_all(&buffer)?;
        }

        // Conditionally flush based on config
        if self.config.flush_on_append {
            let mut writer = self.writer.lock();
            writer.flush()?;
        }

        // Update metadata
        let last_id = first_event_id + events.len() as u64 - 1;
        {
            let mut meta = self.meta.lock();
            meta.next_event_id = last_id + 1;
            meta.total_events += events.len() as u64;
        }

        // Update atomic counter to match
        self.next_event_id.store(last_id + 1, Ordering::SeqCst);

        // Check for rotation
        self.check_rotation()?;

        // Wake any waiting projectors / event processors
        if let Some(notify) = &self.event_notify {
            notify.notify_waiters();
        }

        Ok(())
    }

    fn next_event_id(&self) -> Result<EventId> {
        Ok(self.next_event_id.load(Ordering::SeqCst))
    }

    fn iter_range(
        &self,
        start: EventId,
        end: Option<EventId>,
    ) -> Result<Box<dyn EventLogIterator>> {
        // Flush writer to ensure all data is on disk
        {
            let mut writer = self.writer.lock();
            writer.flush()?;
        }

        let meta = self.meta.lock();
        let end_id = end.unwrap_or(meta.next_event_id);

        Ok(Box::new(FileEventLogIter::new(
            self.config.base_dir.clone(),
            start,
            end_id,
            meta.current_file_num,
            self.config.max_event_size,
        )?))
    }

    fn get(&self, event_id: EventId) -> Result<Option<Vec<u8>>> {
        // Use iter_range to find single event
        let mut iter = self.iter_range(event_id, Some(event_id + 1))?;

        match iter.next() {
            Some(Ok((id, data))) if id == event_id => Ok(Some(data)),
            Some(Ok(_)) => Ok(None),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }

    fn delete_range(&self, start: EventId, end: EventId) -> Result<usize> {
        // For now, deletion is not implemented
        // In production, this would:
        // 1. Identify log files that contain only events in [start, end]
        // 2. Delete those files
        // 3. Update oldest_event_id in metadata
        // 4. Compact remaining files if needed

        tracing::warn!("delete_range not yet implemented: {} to {}", start, end);
        Ok(0)
    }

    fn rotate(&self) -> Result<PathBuf> {
        self.rotate_internal()?
            .ok_or_else(|| AzothError::InvalidState("No rotation needed".into()))
    }

    fn oldest_event_id(&self) -> Result<EventId> {
        let meta = self.meta.lock();
        Ok(meta.oldest_event_id)
    }

    fn newest_event_id(&self) -> Result<EventId> {
        let next = self.next_event_id.load(Ordering::SeqCst);
        if next == 0 {
            Ok(0)
        } else {
            Ok(next - 1)
        }
    }

    fn sync(&self) -> Result<()> {
        let mut writer = self.writer.lock();
        writer.flush()?;
        writer.get_ref().sync_all()?;
        self.save_meta()?;
        Ok(())
    }

    fn stats(&self) -> Result<EventLogStats> {
        let meta = self.meta.lock();

        // Calculate total bytes across all log files
        let mut total_bytes = 0u64;
        let mut file_count = 0usize;

        for file_num in 0..=meta.current_file_num {
            let path = Self::log_file_path(&self.config.base_dir, file_num);
            if path.exists() {
                total_bytes += std::fs::metadata(&path)?.len();
                file_count += 1;
            }
        }

        Ok(EventLogStats {
            event_count: meta.total_events,
            oldest_event_id: meta.oldest_event_id,
            newest_event_id: if meta.next_event_id == 0 {
                0
            } else {
                meta.next_event_id - 1
            },
            total_bytes,
            file_count,
        })
    }
}

/// Ensure metadata is saved on drop
impl Drop for FileEventLog {
    fn drop(&mut self) {
        // Attempt to flush writer and save metadata on drop
        // This ensures meta.json is written even if the user forgets to call sync()
        if let Err(e) = self.sync() {
            eprintln!("Warning: Failed to sync FileEventLog on drop: {}", e);
        }
    }
}

/// Iterator over events in file-based log
struct FileEventLogIter {
    base_dir: PathBuf,
    current_file_num: u64,
    max_file_num: u64,
    current_file: Option<File>,
    next_event_id: EventId,
    end_event_id: EventId,
    max_event_size: usize,
}

impl FileEventLogIter {
    fn new(
        base_dir: PathBuf,
        start: EventId,
        end: EventId,
        max_file_num: u64,
        max_event_size: usize,
    ) -> Result<Self> {
        let mut iter = Self {
            base_dir,
            current_file_num: 0,
            max_file_num,
            current_file: None,
            next_event_id: start,
            end_event_id: end,
            max_event_size,
        };

        // Open first file
        iter.open_next_file()?;

        Ok(iter)
    }

    fn open_next_file(&mut self) -> Result<bool> {
        while self.current_file_num <= self.max_file_num {
            let path = FileEventLog::log_file_path(&self.base_dir, self.current_file_num);

            if path.exists() {
                let file = File::open(&path)?;
                self.current_file = Some(file);
                return Ok(true);
            }

            self.current_file_num += 1;
        }

        Ok(false)
    }

    fn read_next_event(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
        loop {
            if self.next_event_id >= self.end_event_id {
                return Ok(None);
            }

            let file = match self.current_file.as_mut() {
                Some(f) => f,
                None => return Ok(None),
            };

            // Try to read event header: [event_id: u64][size: u32]
            let mut header = [0u8; 12];
            match file.read_exact(&mut header) {
                Ok(_) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    // End of file, try next file
                    self.current_file_num += 1;
                    if !self.open_next_file()? {
                        return Ok(None);
                    }
                    continue;
                }
                Err(e) => return Err(e.into()),
            }

            let event_id = u64::from_be_bytes(header[0..8].try_into().unwrap());
            let size = u32::from_be_bytes(header[8..12].try_into().unwrap());

            if size as usize > self.max_event_size {
                return Err(AzothError::InvalidState(format!(
                    "Event {} size {} exceeds max_event_size {}",
                    event_id, size, self.max_event_size
                )));
            }

            // Read event data
            let mut data = vec![0u8; size as usize];
            file.read_exact(&mut data)?;

            // Skip events before start
            if event_id < self.next_event_id {
                continue;
            }

            self.next_event_id = event_id + 1;
            return Ok(Some((event_id, data)));
        }
    }
}

impl Iterator for FileEventLogIter {
    type Item = Result<(EventId, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.read_next_event() {
            Ok(Some(event)) => Some(Ok(event)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}

// EventLogIterator is automatically implemented via blanket impl in azoth-core

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn setup() -> (FileEventLog, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = FileEventLogConfig {
            base_dir: temp_dir.path().to_path_buf(),
            max_file_size: 1024, // Small for testing rotation
            write_buffer_size: 128,
            batch_buffer_size: 4096,
            max_event_size: 1024 * 1024,
            max_batch_bytes: 16 * 1024 * 1024,
            flush_on_append: true,
        };
        let log = FileEventLog::open(config).unwrap();
        (log, temp_dir)
    }

    #[test]
    fn test_append_and_read() {
        let (log, _temp) = setup();

        // Append events with pre-allocated IDs
        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();
        log.append_with_id(2, b"event 2").unwrap();

        // Read events
        let mut iter = log.iter_range(0, None).unwrap();
        assert_eq!(iter.next().unwrap().unwrap(), (0, b"event 0".to_vec()));
        assert_eq!(iter.next().unwrap().unwrap(), (1, b"event 1".to_vec()));
        assert_eq!(iter.next().unwrap().unwrap(), (2, b"event 2".to_vec()));
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_batch_append() {
        let (log, _temp) = setup();

        let events = vec![
            b"event 0".to_vec(),
            b"event 1".to_vec(),
            b"event 2".to_vec(),
        ];

        log.append_batch_with_ids(0, &events).unwrap();

        let mut iter = log.iter_range(0, None).unwrap();
        assert_eq!(iter.next().unwrap().unwrap(), (0, b"event 0".to_vec()));
        assert_eq!(iter.next().unwrap().unwrap(), (1, b"event 1".to_vec()));
        assert_eq!(iter.next().unwrap().unwrap(), (2, b"event 2".to_vec()));
    }

    #[test]
    fn test_get_single_event() {
        let (log, _temp) = setup();

        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();
        log.append_with_id(2, b"event 2").unwrap();

        assert_eq!(log.get(1).unwrap(), Some(b"event 1".to_vec()));
        assert_eq!(log.get(99).unwrap(), None);
    }

    #[test]
    fn test_stats() {
        let (log, _temp) = setup();

        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();

        // Sync to ensure data is written to disk
        log.sync().unwrap();

        let stats = log.stats().unwrap();
        assert_eq!(stats.event_count, 2);
        assert_eq!(stats.oldest_event_id, 0);
        assert_eq!(stats.newest_event_id, 1);
        assert!(stats.total_bytes > 0);
        assert_eq!(stats.file_count, 1);
    }
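
    // A rotation test sketch: `setup` caps max_file_size at 1024 bytes, and
    // each entry below is 8 + 4 + 100 = 112 bytes, so 20 appends must roll
    // over into at least one new log file. Exact file boundaries depend on
    // the header overhead accounting above.
    #[test]
    fn test_rotation_on_append() {
        let (log, _temp) = setup();

        let payload = vec![0u8; 100];
        for i in 0..20 {
            log.append_with_id(i, &payload).unwrap();
        }
        log.sync().unwrap();

        let stats = log.stats().unwrap();
        assert_eq!(stats.event_count, 20);
        assert!(stats.file_count > 1, "expected at least one rotation");

        // Reads should be seamless across the rotated files
        let count = log.iter_range(0, None).unwrap().count();
        assert_eq!(count, 20);
    }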
}