1use azoth_core::{
2 error::{AzothError, Result},
3 event_log::{EventLog, EventLogIterator, EventLogStats},
4 types::EventId,
5};
6use parking_lot::Mutex;
7use serde::{Deserialize, Serialize};
8use std::fs::{File, OpenOptions};
9use std::io::{BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{
12 atomic::{AtomicU64, Ordering},
13 Arc,
14};
15use tokio::sync::Notify;
16
/// Configuration for [`FileEventLog`].
#[derive(Debug, Clone)]
pub struct FileEventLogConfig {
    /// Directory holding the log segment files and the `meta.json` sidecar.
    pub base_dir: PathBuf,

    /// Maximum size in bytes of one segment file before rotation is triggered.
    pub max_file_size: u64,

    /// Capacity of the `BufWriter` wrapping the current segment file.
    pub write_buffer_size: usize,

    /// Batches serialized up to this many bytes are staged in one in-memory
    /// buffer and written with a single lock acquisition; larger batches fall
    /// back to per-event writes.
    pub batch_buffer_size: usize,

    /// Maximum size in bytes of a single event payload.
    pub max_event_size: usize,

    /// Maximum total serialized size (headers + payloads) of one batch append.
    pub max_batch_bytes: usize,

    /// When true, the write buffer is flushed after every append call.
    pub flush_on_append: bool,
}
47
48impl Default for FileEventLogConfig {
49 fn default() -> Self {
50 Self {
51 base_dir: PathBuf::from("./data/event-log"),
52 max_file_size: 512 * 1024 * 1024, write_buffer_size: 256 * 1024, batch_buffer_size: 1024 * 1024, max_event_size: 4 * 1024 * 1024, max_batch_bytes: 64 * 1024 * 1024, flush_on_append: true,
58 }
59 }
60}
61
/// Durable metadata persisted as `meta.json` next to the log segments.
/// `Default` yields a brand-new, empty log (all zeroes).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EventLogMeta {
    // Id that will be assigned to the next appended event.
    next_event_id: EventId,

    // Number of the segment file currently open for appends.
    current_file_num: u64,

    // Oldest event id still retained in the log.
    oldest_event_id: EventId,

    // Total number of events appended over the log's lifetime.
    total_events: u64,
}
77
/// File-backed append-only event log with size-based segment rotation.
///
/// Entries are length-prefixed records (8-byte BE id, 4-byte BE size, payload)
/// written through a buffered writer; metadata is mirrored in `meta.json`.
pub struct FileEventLog {
    config: FileEventLogConfig,
    // Durable metadata; persisted on sync() and on rotation.
    meta: Arc<Mutex<EventLogMeta>>,
    // Fast-path copy of meta.next_event_id for lock-free reads.
    next_event_id: Arc<AtomicU64>,
    // Buffered writer over the current segment file.
    writer: Arc<Mutex<BufWriter<File>>>,
    // Fast-path copy of meta.current_file_num.
    current_file_num: Arc<AtomicU64>,
    // Optional waker signalled after every successful append.
    event_notify: Option<Arc<Notify>>,
}
92
93impl FileEventLog {
94 pub fn open(config: FileEventLogConfig) -> Result<Self> {
96 std::fs::create_dir_all(&config.base_dir)?;
98
99 let meta_path = config.base_dir.join("meta.json");
101 let meta = if meta_path.exists() {
102 let data = std::fs::read_to_string(&meta_path)?;
103 serde_json::from_str(&data)
104 .map_err(|e| AzothError::Projection(format!("Failed to parse meta.json: {}", e)))?
105 } else {
106 EventLogMeta::default()
107 };
108
109 let next_event_id = Arc::new(AtomicU64::new(meta.next_event_id));
110 let current_file_num = Arc::new(AtomicU64::new(meta.current_file_num));
111
112 let log_path = Self::log_file_path(&config.base_dir, meta.current_file_num);
114 let file = OpenOptions::new()
115 .create(true)
116 .append(true)
117 .open(&log_path)?;
118 let writer = Arc::new(Mutex::new(BufWriter::with_capacity(
119 config.write_buffer_size,
120 file,
121 )));
122
123 Ok(Self {
124 config,
125 meta: Arc::new(Mutex::new(meta)),
126 next_event_id,
127 writer,
128 current_file_num,
129 event_notify: None,
130 })
131 }
132
133 pub fn set_event_notify(&mut self, notify: Arc<Notify>) {
139 self.event_notify = Some(notify);
140 }
141
142 pub fn event_notify(&self) -> Option<Arc<Notify>> {
144 self.event_notify.clone()
145 }
146
147 fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf {
149 base_dir.join(format!("events-{:08}.log", file_num))
150 }
151
152 fn save_meta(&self) -> Result<()> {
154 let meta = self.meta.lock();
155 let meta_path = self.config.base_dir.join("meta.json");
156 let data = serde_json::to_string(&*meta)
158 .map_err(|e| AzothError::Projection(format!("Failed to serialize meta: {}", e)))?;
159 std::fs::write(&meta_path, data)?;
160 Ok(())
161 }
162
163 fn write_event_entry(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
167 if event_bytes.len() > self.config.max_event_size {
168 return Err(AzothError::InvalidState(format!(
169 "Event size {} exceeds max_event_size {}",
170 event_bytes.len(),
171 self.config.max_event_size
172 )));
173 }
174
175 if event_bytes.len() > u32::MAX as usize {
176 return Err(AzothError::InvalidState(format!(
177 "Event size {} exceeds u32 encoding limit",
178 event_bytes.len()
179 )));
180 }
181
182 let mut writer = self.writer.lock();
183
184 writer.write_all(&event_id.to_be_bytes())?;
186
187 let size = event_bytes.len() as u32;
189 writer.write_all(&size.to_be_bytes())?;
190
191 writer.write_all(event_bytes)?;
193
194 Ok(())
195 }
196
197 fn check_rotation(&self) -> Result<Option<PathBuf>> {
199 let log_path = Self::log_file_path(
200 &self.config.base_dir,
201 self.current_file_num.load(Ordering::SeqCst),
202 );
203
204 let file_size = std::fs::metadata(&log_path)?.len();
205
206 if file_size >= self.config.max_file_size {
207 self.rotate_internal()
208 } else {
209 Ok(None)
210 }
211 }
212
213 fn rotate_internal(&self) -> Result<Option<PathBuf>> {
215 {
217 let mut writer = self.writer.lock();
218 writer.flush()?;
219 }
220
221 let old_file_num = self.current_file_num.load(Ordering::SeqCst);
222 let old_path = Self::log_file_path(&self.config.base_dir, old_file_num);
223
224 let new_file_num = old_file_num + 1;
226 self.current_file_num.store(new_file_num, Ordering::SeqCst);
227
228 {
230 let mut meta = self.meta.lock();
231 meta.current_file_num = new_file_num;
232 }
233 self.save_meta()?;
234
235 let new_path = Self::log_file_path(&self.config.base_dir, new_file_num);
237 let file = OpenOptions::new()
238 .create(true)
239 .append(true)
240 .open(&new_path)?;
241
242 {
244 let mut writer = self.writer.lock();
245 *writer = BufWriter::with_capacity(self.config.write_buffer_size, file);
246 }
247
248 tracing::info!(
249 "Rotated event log: {} -> {}",
250 old_path.display(),
251 new_path.display()
252 );
253
254 Ok(Some(old_path))
255 }
256}
257
impl EventLog for FileEventLog {
    /// Appends one event under an explicit, caller-assigned id.
    ///
    /// Writes the entry, optionally flushes, advances the in-memory counters
    /// (meta.json itself is persisted only on sync/rotation), rotates the
    /// segment if it grew past the limit, then wakes any registered waiters.
    fn append_with_id(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
        self.write_event_entry(event_id, event_bytes)?;

        if self.config.flush_on_append {
            let mut writer = self.writer.lock();
            writer.flush()?;
        }

        // Scoped so the meta lock is released before the atomic store below.
        {
            let mut meta = self.meta.lock();
            meta.next_event_id = event_id + 1;
            meta.total_events += 1;
        }

        self.next_event_id.store(event_id + 1, Ordering::SeqCst);

        self.check_rotation()?;

        // Signal readers blocked waiting for new events.
        if let Some(notify) = &self.event_notify {
            notify.notify_waiters();
        }

        Ok(())
    }

    /// Appends `events` with contiguous ids starting at `first_event_id`.
    ///
    /// Rejects empty batches, batches over `max_batch_bytes`, and any event
    /// over `max_event_size` or the u32 length-encoding limit. Batches that
    /// fit in `batch_buffer_size` are staged in memory and written with a
    /// single writer-lock acquisition; larger ones fall back to per-event
    /// writes.
    fn append_batch_with_ids(&self, first_event_id: EventId, events: &[Vec<u8>]) -> Result<()> {
        if events.is_empty() {
            return Err(AzothError::InvalidState("Cannot append empty batch".into()));
        }

        // 8-byte id + 4-byte length header per entry, plus the payload.
        let total_size: usize = events
            .iter()
            .map(|e| 8 + 4 + e.len())
            .sum();

        if total_size > self.config.max_batch_bytes {
            return Err(AzothError::InvalidState(format!(
                "Batch size {} exceeds max_batch_bytes {}",
                total_size, self.config.max_batch_bytes
            )));
        }

        // Validate every event up front so a mid-batch failure cannot leave
        // a partially written batch behind.
        for event in events {
            if event.len() > self.config.max_event_size {
                return Err(AzothError::InvalidState(format!(
                    "Event size {} exceeds max_event_size {}",
                    event.len(),
                    self.config.max_event_size
                )));
            }
            if event.len() > u32::MAX as usize {
                return Err(AzothError::InvalidState(format!(
                    "Event size {} exceeds u32 encoding limit",
                    event.len()
                )));
            }
        }

        if total_size > self.config.batch_buffer_size {
            // Too large to stage in memory: write each entry directly.
            // NOTE(review): this path takes the writer lock once per event,
            // so entries from a concurrent appender could interleave between
            // them — presumably callers serialize id assignment; confirm.
            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;
                self.write_event_entry(event_id, event_bytes)?;
            }
        } else {
            // Stage the whole batch, then write it under one lock acquisition.
            let mut buffer = Vec::with_capacity(total_size);

            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;

                buffer.extend_from_slice(&event_id.to_be_bytes());

                let size = event_bytes.len() as u32;
                buffer.extend_from_slice(&size.to_be_bytes());

                buffer.extend_from_slice(event_bytes);
            }

            let mut writer = self.writer.lock();
            writer.write_all(&buffer)?;
        }

        if self.config.flush_on_append {
            let mut writer = self.writer.lock();
            writer.flush()?;
        }

        let last_id = first_event_id + events.len() as u64 - 1;
        {
            let mut meta = self.meta.lock();
            meta.next_event_id = last_id + 1;
            meta.total_events += events.len() as u64;
        }

        self.next_event_id.store(last_id + 1, Ordering::SeqCst);

        self.check_rotation()?;

        if let Some(notify) = &self.event_notify {
            notify.notify_waiters();
        }

        Ok(())
    }

    /// Id that will be assigned to the next appended event.
    fn next_event_id(&self) -> Result<EventId> {
        Ok(self.next_event_id.load(Ordering::SeqCst))
    }

    /// Returns an iterator over ids in `[start, end)`; `end = None` means
    /// "up to the current next_event_id". Flushes first so buffered entries
    /// are visible to the reader.
    fn iter_range(
        &self,
        start: EventId,
        end: Option<EventId>,
    ) -> Result<Box<dyn EventLogIterator>> {
        {
            let mut writer = self.writer.lock();
            writer.flush()?;
        }

        let meta = self.meta.lock();
        let end_id = end.unwrap_or(meta.next_event_id);

        Ok(Box::new(FileEventLogIter::new(
            self.config.base_dir.clone(),
            start,
            end_id,
            meta.current_file_num,
            self.config.max_event_size,
        )?))
    }

    /// Fetches one event by id via a one-element range scan.
    /// Returns `Ok(None)` when the id is not present.
    fn get(&self, event_id: EventId) -> Result<Option<Vec<u8>>> {
        let mut iter = self.iter_range(event_id, Some(event_id + 1))?;

        match iter.next() {
            Some(Ok((id, data))) if id == event_id => Ok(Some(data)),
            Some(Ok(_)) => Ok(None),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }

    /// Not yet implemented: logs a warning and reports zero deletions.
    fn delete_range(&self, start: EventId, end: EventId) -> Result<usize> {
        tracing::warn!("delete_range not yet implemented: {} to {}", start, end);
        Ok(0)
    }

    /// Forces a rotation, erroring if the internal rotation reports that
    /// none was performed.
    fn rotate(&self) -> Result<PathBuf> {
        self.rotate_internal()?
            .ok_or_else(|| AzothError::InvalidState("No rotation needed".into()))
    }

    /// Oldest retained event id, per in-memory metadata.
    fn oldest_event_id(&self) -> Result<EventId> {
        let meta = self.meta.lock();
        Ok(meta.oldest_event_id)
    }

    /// Newest assigned event id.
    ///
    /// NOTE(review): returns 0 both for an empty log and for a log whose
    /// newest event has id 0 — callers must not treat 0 as "non-empty".
    fn newest_event_id(&self) -> Result<EventId> {
        let next = self.next_event_id.load(Ordering::SeqCst);
        if next == 0 {
            Ok(0)
        } else {
            Ok(next - 1)
        }
    }

    /// Durability point: flushes the buffer, fsyncs the segment file, and
    /// persists meta.json.
    fn sync(&self) -> Result<()> {
        let mut writer = self.writer.lock();
        writer.flush()?;
        writer.get_ref().sync_all()?;
        self.save_meta()?;
        Ok(())
    }

    /// Aggregates event counts and on-disk sizes across all existing segments
    /// (0 through current_file_num; already-deleted segments are skipped).
    fn stats(&self) -> Result<EventLogStats> {
        let meta = self.meta.lock();

        let mut total_bytes = 0u64;
        let mut file_count = 0usize;

        for file_num in 0..=meta.current_file_num {
            let path = Self::log_file_path(&self.config.base_dir, file_num);
            if path.exists() {
                total_bytes += std::fs::metadata(&path)?.len();
                file_count += 1;
            }
        }

        Ok(EventLogStats {
            event_count: meta.total_events,
            oldest_event_id: meta.oldest_event_id,
            newest_event_id: if meta.next_event_id == 0 {
                0
            } else {
                meta.next_event_id - 1
            },
            total_bytes,
            file_count,
        })
    }
}
490
491impl Drop for FileEventLog {
493 fn drop(&mut self) {
494 if let Err(e) = self.sync() {
497 eprintln!("Warning: Failed to sync FileEventLog on drop: {}", e);
498 }
499 }
500}
501
/// Forward iterator over log entries in `[next_event_id, end_event_id)`,
/// scanning segment files in ascending number order.
struct FileEventLogIter {
    base_dir: PathBuf,
    // Segment currently being read.
    current_file_num: u64,
    // Highest segment number to visit (inclusive).
    max_file_num: u64,
    // Open handle for the current segment; None once exhausted.
    current_file: Option<File>,
    // Next id the caller expects; entries below this are skipped.
    next_event_id: EventId,
    // Exclusive upper bound of the range.
    end_event_id: EventId,
    // Sanity limit for the decoded length field.
    max_event_size: usize,
}
512
513impl FileEventLogIter {
514 fn new(
515 base_dir: PathBuf,
516 start: EventId,
517 end: EventId,
518 max_file_num: u64,
519 max_event_size: usize,
520 ) -> Result<Self> {
521 let mut iter = Self {
522 base_dir,
523 current_file_num: 0,
524 max_file_num,
525 current_file: None,
526 next_event_id: start,
527 end_event_id: end,
528 max_event_size,
529 };
530
531 iter.open_next_file()?;
533
534 Ok(iter)
535 }
536
537 fn open_next_file(&mut self) -> Result<bool> {
538 while self.current_file_num <= self.max_file_num {
539 let path = FileEventLog::log_file_path(&self.base_dir, self.current_file_num);
540
541 if path.exists() {
542 let file = File::open(&path)?;
543 self.current_file = Some(file);
544 return Ok(true);
545 }
546
547 self.current_file_num += 1;
548 }
549
550 Ok(false)
551 }
552
553 fn read_next_event(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
554 loop {
555 if self.next_event_id >= self.end_event_id {
556 return Ok(None);
557 }
558
559 let file = match self.current_file.as_mut() {
560 Some(f) => f,
561 None => return Ok(None),
562 };
563
564 let mut header = [0u8; 12];
566 match file.read_exact(&mut header) {
567 Ok(_) => {}
568 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
569 self.current_file_num += 1;
571 if !self.open_next_file()? {
572 return Ok(None);
573 }
574 continue;
575 }
576 Err(e) => return Err(e.into()),
577 }
578
579 let event_id = u64::from_be_bytes(header[0..8].try_into().unwrap());
580 let size = u32::from_be_bytes(header[8..12].try_into().unwrap());
581
582 if size as usize > self.max_event_size {
583 return Err(AzothError::InvalidState(format!(
584 "Event {} size {} exceeds max_event_size {}",
585 event_id, size, self.max_event_size
586 )));
587 }
588
589 let mut data = vec![0u8; size as usize];
591 file.read_exact(&mut data)?;
592
593 if event_id < self.next_event_id {
595 continue;
596 }
597
598 self.next_event_id = event_id + 1;
599 return Ok(Some((event_id, data)));
600 }
601 }
602}
603
604impl Iterator for FileEventLogIter {
605 type Item = Result<(EventId, Vec<u8>)>;
606
607 fn next(&mut self) -> Option<Self::Item> {
608 match self.read_next_event() {
609 Ok(Some(event)) => Some(Ok(event)),
610 Ok(None) => None,
611 Err(e) => Some(Err(e)),
612 }
613 }
614}
615
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a log with deliberately small limits inside a fresh temp dir.
    /// The TempDir must be kept alive for the duration of each test.
    fn setup() -> (FileEventLog, TempDir) {
        let dir = TempDir::new().unwrap();
        let cfg = FileEventLogConfig {
            base_dir: dir.path().to_path_buf(),
            max_file_size: 1024,
            write_buffer_size: 128,
            batch_buffer_size: 4096,
            max_event_size: 1024 * 1024,
            max_batch_bytes: 16 * 1024 * 1024,
            flush_on_append: true,
        };
        (FileEventLog::open(cfg).unwrap(), dir)
    }

    #[test]
    fn test_append_and_read() {
        let (log, _temp) = setup();

        for id in 0..3u64 {
            let payload = format!("event {}", id);
            log.append_with_id(id, payload.as_bytes()).unwrap();
        }

        let mut iter = log.iter_range(0, None).unwrap();
        for id in 0..3u64 {
            let expected = format!("event {}", id).into_bytes();
            assert_eq!(iter.next().unwrap().unwrap(), (id, expected));
        }
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_batch_append() {
        let (log, _temp) = setup();

        let batch: Vec<Vec<u8>> = (0..3u64)
            .map(|id| format!("event {}", id).into_bytes())
            .collect();

        log.append_batch_with_ids(0, &batch).unwrap();

        let mut iter = log.iter_range(0, None).unwrap();
        for id in 0..3u64 {
            let expected = format!("event {}", id).into_bytes();
            assert_eq!(iter.next().unwrap().unwrap(), (id, expected));
        }
    }

    #[test]
    fn test_get_single_event() {
        let (log, _temp) = setup();

        for id in 0..3u64 {
            let payload = format!("event {}", id);
            log.append_with_id(id, payload.as_bytes()).unwrap();
        }

        assert_eq!(log.get(1).unwrap(), Some(b"event 1".to_vec()));
        assert_eq!(log.get(99).unwrap(), None);
    }

    #[test]
    fn test_stats() {
        let (log, _temp) = setup();

        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();

        log.sync().unwrap();

        let stats = log.stats().unwrap();
        assert_eq!(stats.event_count, 2);
        assert_eq!(stats.oldest_event_id, 0);
        assert_eq!(stats.newest_event_id, 1);
        assert!(stats.total_bytes > 0);
        assert_eq!(stats.file_count, 1);
    }
}