//! File-backed `EventLog` store (azoth_file_log/store.rs).
1use azoth_core::{
2    error::{AzothError, Result},
3    event_log::{EventLog, EventLogIterator, EventLogStats},
4    types::EventId,
5};
6use serde::{Deserialize, Serialize};
7use std::fs::{File, OpenOptions};
8use std::io::{BufWriter, Read, Write};
9use std::path::{Path, PathBuf};
10use std::sync::{
11    atomic::{AtomicU64, Ordering},
12    Arc, Mutex,
13};
14
/// Tunable parameters for the file-backed event log.
#[derive(Debug, Clone)]
pub struct FileEventLogConfig {
    /// Directory that holds the numbered log files and `meta.json`.
    pub base_dir: PathBuf,

    /// Once the active log file reaches this many bytes, it is rotated.
    pub max_file_size: u64,

    /// Capacity of the `BufWriter` used for appends.
    pub write_buffer_size: usize,

    /// Upper bound on the in-memory buffer assembled for batched writes (bytes).
    pub batch_buffer_size: usize,

    /// Largest payload accepted for a single event (bytes).
    pub max_event_size: usize,

    /// Largest total encoded size accepted for one append batch (bytes).
    pub max_batch_bytes: usize,
}
36
37impl Default for FileEventLogConfig {
38    fn default() -> Self {
39        Self {
40            base_dir: PathBuf::from("./data/event-log"),
41            max_file_size: 512 * 1024 * 1024,  // 512MB
42            write_buffer_size: 256 * 1024,     // 256KB (increased from 64KB)
43            batch_buffer_size: 1024 * 1024,    // 1MB for batch writes
44            max_event_size: 4 * 1024 * 1024,   // 4MB single-event limit
45            max_batch_bytes: 64 * 1024 * 1024, // 64MB batch limit
46        }
47    }
48}
49
50/// Metadata stored in meta.json
51#[derive(Debug, Clone, Serialize, Deserialize, Default)]
52struct EventLogMeta {
53    /// Next EventId to assign
54    next_event_id: EventId,
55
56    /// Current active log file number
57    current_file_num: u64,
58
59    /// Oldest event ID still in storage
60    oldest_event_id: EventId,
61
62    /// Total number of events across all files
63    total_events: u64,
64}
65
66/// File-based event log implementation
67pub struct FileEventLog {
68    config: FileEventLogConfig,
69    meta: Arc<Mutex<EventLogMeta>>,
70    next_event_id: Arc<AtomicU64>,
71    writer: Arc<Mutex<BufWriter<File>>>,
72    current_file_num: Arc<AtomicU64>,
73}
74
75impl FileEventLog {
76    /// Open or create a file-based event log
77    pub fn open(config: FileEventLogConfig) -> Result<Self> {
78        // Create base directory
79        std::fs::create_dir_all(&config.base_dir)?;
80
81        // Load or create metadata
82        let meta_path = config.base_dir.join("meta.json");
83        let meta = if meta_path.exists() {
84            let data = std::fs::read_to_string(&meta_path)?;
85            serde_json::from_str(&data)
86                .map_err(|e| AzothError::Projection(format!("Failed to parse meta.json: {}", e)))?
87        } else {
88            EventLogMeta::default()
89        };
90
91        let next_event_id = Arc::new(AtomicU64::new(meta.next_event_id));
92        let current_file_num = Arc::new(AtomicU64::new(meta.current_file_num));
93
94        // Open current log file for appending
95        let log_path = Self::log_file_path(&config.base_dir, meta.current_file_num);
96        let file = OpenOptions::new()
97            .create(true)
98            .append(true)
99            .open(&log_path)?;
100        let writer = Arc::new(Mutex::new(BufWriter::with_capacity(
101            config.write_buffer_size,
102            file,
103        )));
104
105        Ok(Self {
106            config,
107            meta: Arc::new(Mutex::new(meta)),
108            next_event_id,
109            writer,
110            current_file_num,
111        })
112    }
113
114    /// Get path to a log file by number
115    fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf {
116        base_dir.join(format!("events-{:08}.log", file_num))
117    }
118
119    /// Save metadata to disk
120    fn save_meta(&self) -> Result<()> {
121        let meta = self.meta.lock().unwrap();
122        let meta_path = self.config.base_dir.join("meta.json");
123        // Use compact serialization (not pretty) for better performance
124        let data = serde_json::to_string(&*meta)
125            .map_err(|e| AzothError::Projection(format!("Failed to serialize meta: {}", e)))?;
126        std::fs::write(&meta_path, data)?;
127        Ok(())
128    }
129
130    /// Write a single event entry to the current log file
131    ///
132    /// Format: [event_id: u64][size: u32][data: bytes]
133    fn write_event_entry(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
134        if event_bytes.len() > self.config.max_event_size {
135            return Err(AzothError::InvalidState(format!(
136                "Event size {} exceeds max_event_size {}",
137                event_bytes.len(),
138                self.config.max_event_size
139            )));
140        }
141
142        if event_bytes.len() > u32::MAX as usize {
143            return Err(AzothError::InvalidState(format!(
144                "Event size {} exceeds u32 encoding limit",
145                event_bytes.len()
146            )));
147        }
148
149        let mut writer = self.writer.lock().unwrap();
150
151        // Write event_id (8 bytes, big-endian)
152        writer.write_all(&event_id.to_be_bytes())?;
153
154        // Write size (4 bytes, big-endian)
155        let size = event_bytes.len() as u32;
156        writer.write_all(&size.to_be_bytes())?;
157
158        // Write event data
159        writer.write_all(event_bytes)?;
160
161        Ok(())
162    }
163
164    /// Check if rotation is needed and perform it
165    fn check_rotation(&self) -> Result<Option<PathBuf>> {
166        let log_path = Self::log_file_path(
167            &self.config.base_dir,
168            self.current_file_num.load(Ordering::SeqCst),
169        );
170
171        let file_size = std::fs::metadata(&log_path)?.len();
172
173        if file_size >= self.config.max_file_size {
174            self.rotate_internal()
175        } else {
176            Ok(None)
177        }
178    }
179
180    /// Perform log rotation
181    fn rotate_internal(&self) -> Result<Option<PathBuf>> {
182        // Flush current writer
183        {
184            let mut writer = self.writer.lock().unwrap();
185            writer.flush()?;
186        }
187
188        let old_file_num = self.current_file_num.load(Ordering::SeqCst);
189        let old_path = Self::log_file_path(&self.config.base_dir, old_file_num);
190
191        // Increment file number
192        let new_file_num = old_file_num + 1;
193        self.current_file_num.store(new_file_num, Ordering::SeqCst);
194
195        // Update metadata
196        {
197            let mut meta = self.meta.lock().unwrap();
198            meta.current_file_num = new_file_num;
199        }
200        self.save_meta()?;
201
202        // Open new log file
203        let new_path = Self::log_file_path(&self.config.base_dir, new_file_num);
204        let file = OpenOptions::new()
205            .create(true)
206            .append(true)
207            .open(&new_path)?;
208
209        // Replace writer
210        {
211            let mut writer = self.writer.lock().unwrap();
212            *writer = BufWriter::with_capacity(self.config.write_buffer_size, file);
213        }
214
215        tracing::info!(
216            "Rotated event log: {} -> {}",
217            old_path.display(),
218            new_path.display()
219        );
220
221        Ok(Some(old_path))
222    }
223}
224
/// Trait implementation wiring `FileEventLog` into the core `EventLog` API.
impl EventLog for FileEventLog {
    /// Append a single event under a caller-assigned id.
    ///
    /// NOTE(review): unlike `append_batch_with_ids`, this path does not flush
    /// the buffered writer — bytes reach disk when the buffer fills, on
    /// rotation, `sync()`, or drop.
    fn append_with_id(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
        // Write event entry
        self.write_event_entry(event_id, event_bytes)?;

        // Update metadata (next id advances past this event)
        {
            let mut meta = self.meta.lock().unwrap();
            meta.next_event_id = event_id + 1;
            meta.total_events += 1;
        }

        // Update atomic counter to match
        self.next_event_id.store(event_id + 1, Ordering::SeqCst);

        // Check for rotation
        self.check_rotation()?;

        Ok(())
    }

    /// Append a batch of events with contiguous ids starting at
    /// `first_event_id`, then flush so the whole batch is handed to the OS.
    fn append_batch_with_ids(&self, first_event_id: EventId, events: &[Vec<u8>]) -> Result<()> {
        if events.is_empty() {
            return Err(AzothError::InvalidState("Cannot append empty batch".into()));
        }

        // Calculate total size needed: (8 + 4 + data_len) per event
        let total_size: usize = events
            .iter()
            .map(|e| 8 + 4 + e.len()) // event_id + size + data
            .sum();

        if total_size > self.config.max_batch_bytes {
            return Err(AzothError::InvalidState(format!(
                "Batch size {} exceeds max_batch_bytes {}",
                total_size, self.config.max_batch_bytes
            )));
        }

        // Validate every event up front so size-limit failures happen before
        // any bytes are encoded (all-or-nothing with respect to validation).
        for event in events {
            if event.len() > self.config.max_event_size {
                return Err(AzothError::InvalidState(format!(
                    "Event size {} exceeds max_event_size {}",
                    event.len(),
                    self.config.max_event_size
                )));
            }
            if event.len() > u32::MAX as usize {
                return Err(AzothError::InvalidState(format!(
                    "Event size {} exceeds u32 encoding limit",
                    event.len()
                )));
            }
        }

        // Check if batch exceeds reasonable size
        if total_size > self.config.batch_buffer_size {
            // For very large batches, fall back to individual writes
            // to avoid excessive memory allocation
            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;
                self.write_event_entry(event_id, event_bytes)?;
            }
        } else {
            // Pre-allocate buffer for entire batch
            let mut buffer = Vec::with_capacity(total_size);

            // Serialize all events into buffer
            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;

                // Write event_id (8 bytes, big-endian)
                buffer.extend_from_slice(&event_id.to_be_bytes());

                // Write size (4 bytes, big-endian)
                let size = event_bytes.len() as u32;
                buffer.extend_from_slice(&size.to_be_bytes());

                // Write event data
                buffer.extend_from_slice(event_bytes);
            }

            // Single lock acquisition + single write syscall
            let mut writer = self.writer.lock().unwrap();
            writer.write_all(&buffer)?;
        }

        // Flush after batch
        {
            let mut writer = self.writer.lock().unwrap();
            writer.flush()?;
        }

        // Update metadata
        let last_id = first_event_id + events.len() as u64 - 1;
        {
            let mut meta = self.meta.lock().unwrap();
            meta.next_event_id = last_id + 1;
            meta.total_events += events.len() as u64;
        }

        // Update atomic counter to match
        self.next_event_id.store(last_id + 1, Ordering::SeqCst);

        // Check for rotation
        self.check_rotation()?;

        Ok(())
    }

    /// Id that will be assigned to the next appended event (lock-free read).
    fn next_event_id(&self) -> Result<EventId> {
        Ok(self.next_event_id.load(Ordering::SeqCst))
    }

    /// Iterate events in `[start, end)`; `end = None` means "up to the
    /// current next id". Flushes first so buffered appends are visible.
    fn iter_range(
        &self,
        start: EventId,
        end: Option<EventId>,
    ) -> Result<Box<dyn EventLogIterator>> {
        // Flush writer to ensure all data is on disk
        {
            let mut writer = self.writer.lock().unwrap();
            writer.flush()?;
        }

        let meta = self.meta.lock().unwrap();
        let end_id = end.unwrap_or(meta.next_event_id);

        Ok(Box::new(FileEventLogIter::new(
            self.config.base_dir.clone(),
            start,
            end_id,
            meta.current_file_num,
            self.config.max_event_size,
        )?))
    }

    /// Fetch a single event's payload by id, or `None` if absent.
    fn get(&self, event_id: EventId) -> Result<Option<Vec<u8>>> {
        // Use iter_range to find single event
        let mut iter = self.iter_range(event_id, Some(event_id + 1))?;

        match iter.next() {
            Some(Ok((id, data))) if id == event_id => Ok(Some(data)),
            // Iterator produced some other id (sparse ids): treat as missing.
            Some(Ok(_)) => Ok(None),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }

    /// Delete events in `[start, end)`. Currently a no-op returning 0.
    fn delete_range(&self, start: EventId, end: EventId) -> Result<usize> {
        // For now, deletion is not implemented
        // In production, this would:
        // 1. Identify log files that contain only events in [start, end]
        // 2. Delete those files
        // 3. Update oldest_event_id in metadata
        // 4. Compact remaining files if needed

        tracing::warn!("delete_range not yet implemented: {} to {}", start, end);
        Ok(0)
    }

    /// Force a rotation regardless of file size; returns the retired file's
    /// path. NOTE(review): `rotate_internal` always returns `Some`, so the
    /// "No rotation needed" error branch is unreachable as written.
    fn rotate(&self) -> Result<PathBuf> {
        self.rotate_internal()?
            .ok_or_else(|| AzothError::InvalidState("No rotation needed".into()))
    }

    /// Smallest event id still retained on disk (from metadata).
    fn oldest_event_id(&self) -> Result<EventId> {
        let meta = self.meta.lock().unwrap();
        Ok(meta.oldest_event_id)
    }

    /// Id of the most recently appended event.
    ///
    /// NOTE(review): returns 0 both for an empty log and when the newest
    /// event genuinely has id 0 — callers needing to distinguish should
    /// consult `next_event_id` as well.
    fn newest_event_id(&self) -> Result<EventId> {
        let next = self.next_event_id.load(Ordering::SeqCst);
        if next == 0 {
            Ok(0)
        } else {
            Ok(next - 1)
        }
    }

    /// Flush the write buffer, fsync the active file, and persist metadata.
    fn sync(&self) -> Result<()> {
        let mut writer = self.writer.lock().unwrap();
        writer.flush()?;
        writer.get_ref().sync_all()?;
        self.save_meta()?;
        Ok(())
    }

    /// Summarize the log: event count, id bounds, on-disk bytes, file count.
    /// Files already deleted (e.g. by retention) are simply skipped.
    fn stats(&self) -> Result<EventLogStats> {
        let meta = self.meta.lock().unwrap();

        // Calculate total bytes across all log files
        let mut total_bytes = 0u64;
        let mut file_count = 0usize;

        for file_num in 0..=meta.current_file_num {
            let path = Self::log_file_path(&self.config.base_dir, file_num);
            if path.exists() {
                total_bytes += std::fs::metadata(&path)?.len();
                file_count += 1;
            }
        }

        Ok(EventLogStats {
            event_count: meta.total_events,
            oldest_event_id: meta.oldest_event_id,
            // Same 0-sentinel convention as newest_event_id() above.
            newest_event_id: if meta.next_event_id == 0 {
                0
            } else {
                meta.next_event_id - 1
            },
            total_bytes,
            file_count,
        })
    }
}
441
442/// Ensure metadata is saved on drop
443impl Drop for FileEventLog {
444    fn drop(&mut self) {
445        // Attempt to flush writer and save metadata on drop
446        // This ensures meta.json is written even if the user forgets to call sync()
447        if let Err(e) = self.sync() {
448            eprintln!("Warning: Failed to sync FileEventLog on drop: {}", e);
449        }
450    }
451}
452
453/// Iterator over events in file-based log
454struct FileEventLogIter {
455    base_dir: PathBuf,
456    current_file_num: u64,
457    max_file_num: u64,
458    current_file: Option<File>,
459    next_event_id: EventId,
460    end_event_id: EventId,
461    max_event_size: usize,
462}
463
464impl FileEventLogIter {
465    fn new(
466        base_dir: PathBuf,
467        start: EventId,
468        end: EventId,
469        max_file_num: u64,
470        max_event_size: usize,
471    ) -> Result<Self> {
472        let mut iter = Self {
473            base_dir,
474            current_file_num: 0,
475            max_file_num,
476            current_file: None,
477            next_event_id: start,
478            end_event_id: end,
479            max_event_size,
480        };
481
482        // Open first file
483        iter.open_next_file()?;
484
485        Ok(iter)
486    }
487
488    fn open_next_file(&mut self) -> Result<bool> {
489        while self.current_file_num <= self.max_file_num {
490            let path = FileEventLog::log_file_path(&self.base_dir, self.current_file_num);
491
492            if path.exists() {
493                let file = File::open(&path)?;
494                self.current_file = Some(file);
495                return Ok(true);
496            }
497
498            self.current_file_num += 1;
499        }
500
501        Ok(false)
502    }
503
504    fn read_next_event(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
505        loop {
506            if self.next_event_id >= self.end_event_id {
507                return Ok(None);
508            }
509
510            let file = match self.current_file.as_mut() {
511                Some(f) => f,
512                None => return Ok(None),
513            };
514
515            // Try to read event header: [event_id: u64][size: u32]
516            let mut header = [0u8; 12];
517            match file.read_exact(&mut header) {
518                Ok(_) => {}
519                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
520                    // End of file, try next file
521                    self.current_file_num += 1;
522                    if !self.open_next_file()? {
523                        return Ok(None);
524                    }
525                    continue;
526                }
527                Err(e) => return Err(e.into()),
528            }
529
530            let event_id = u64::from_be_bytes(header[0..8].try_into().unwrap());
531            let size = u32::from_be_bytes(header[8..12].try_into().unwrap());
532
533            if size as usize > self.max_event_size {
534                return Err(AzothError::InvalidState(format!(
535                    "Event {} size {} exceeds max_event_size {}",
536                    event_id, size, self.max_event_size
537                )));
538            }
539
540            // Read event data
541            let mut data = vec![0u8; size as usize];
542            file.read_exact(&mut data)?;
543
544            // Skip events before start
545            if event_id < self.next_event_id {
546                continue;
547            }
548
549            self.next_event_id = event_id + 1;
550            return Ok(Some((event_id, data)));
551        }
552    }
553}
554
555impl Iterator for FileEventLogIter {
556    type Item = Result<(EventId, Vec<u8>)>;
557
558    fn next(&mut self) -> Option<Self::Item> {
559        match self.read_next_event() {
560            Ok(Some(event)) => Some(Ok(event)),
561            Ok(None) => None,
562            Err(e) => Some(Err(e)),
563        }
564    }
565}
566
567// EventLogIterator is automatically implemented via blanket impl in azoth-core
568
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a log in a throwaway directory with a tiny rotation threshold.
    fn setup() -> (FileEventLog, TempDir) {
        let dir = TempDir::new().unwrap();
        let config = FileEventLogConfig {
            base_dir: dir.path().to_path_buf(),
            max_file_size: 1024, // Small for testing rotation
            write_buffer_size: 128,
            batch_buffer_size: 4096,
            max_event_size: 1024 * 1024,
            max_batch_bytes: 16 * 1024 * 1024,
        };
        (FileEventLog::open(config).unwrap(), dir)
    }

    #[test]
    fn test_append_and_read() {
        let (log, _temp) = setup();

        // Append events with pre-allocated IDs, then read them back in order.
        for id in 0..3u64 {
            log.append_with_id(id, format!("event {}", id).as_bytes())
                .unwrap();
        }

        let mut iter = log.iter_range(0, None).unwrap();
        for id in 0..3u64 {
            assert_eq!(
                iter.next().unwrap().unwrap(),
                (id, format!("event {}", id).into_bytes())
            );
        }
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_batch_append() {
        let (log, _temp) = setup();

        let events: Vec<Vec<u8>> = (0..3)
            .map(|i| format!("event {}", i).into_bytes())
            .collect();

        log.append_batch_with_ids(0, &events).unwrap();

        let mut iter = log.iter_range(0, None).unwrap();
        for id in 0..3u64 {
            assert_eq!(
                iter.next().unwrap().unwrap(),
                (id, format!("event {}", id).into_bytes())
            );
        }
    }

    #[test]
    fn test_get_single_event() {
        let (log, _temp) = setup();

        for id in 0..3u64 {
            log.append_with_id(id, format!("event {}", id).as_bytes())
                .unwrap();
        }

        // Present id returns its payload; absent id returns None.
        assert_eq!(log.get(1).unwrap(), Some(b"event 1".to_vec()));
        assert_eq!(log.get(99).unwrap(), None);
    }

    #[test]
    fn test_stats() {
        let (log, _temp) = setup();

        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();

        // Sync to ensure data is written to disk before sizing files.
        log.sync().unwrap();

        let stats = log.stats().unwrap();
        assert_eq!(
            (stats.event_count, stats.oldest_event_id, stats.newest_event_id),
            (2, 0, 1)
        );
        assert!(stats.total_bytes > 0);
        assert_eq!(stats.file_count, 1);
    }
}