1use azoth_core::{
2 error::{AzothError, Result},
3 event_log::{EventLog, EventLogIterator, EventLogStats},
4 types::EventId,
5};
6use serde::{Deserialize, Serialize};
7use std::fs::{File, OpenOptions};
8use std::io::{BufWriter, Read, Write};
9use std::path::{Path, PathBuf};
10use std::sync::{
11 atomic::{AtomicU64, Ordering},
12 Arc, Mutex,
13};
14
/// Tunables for a [`FileEventLog`] instance.
#[derive(Debug, Clone)]
pub struct FileEventLogConfig {
    /// Directory holding the segment files (`events-NNNNNNNN.log`) and `meta.json`.
    pub base_dir: PathBuf,

    /// Size (bytes) a segment file may reach before the log rotates to a new one.
    pub max_file_size: u64,

    /// Capacity of the `BufWriter` wrapping the active segment file.
    pub write_buffer_size: usize,

    /// Batches whose encoded size is at most this are staged in one in-memory
    /// buffer and written with a single call; larger batches are written per event.
    pub batch_buffer_size: usize,

    /// Maximum size (bytes) of a single event payload.
    pub max_event_size: usize,

    /// Maximum total encoded size (bytes) of one batch append.
    pub max_batch_bytes: usize,
}
36
37impl Default for FileEventLogConfig {
38 fn default() -> Self {
39 Self {
40 base_dir: PathBuf::from("./data/event-log"),
41 max_file_size: 512 * 1024 * 1024, write_buffer_size: 256 * 1024, batch_buffer_size: 1024 * 1024, max_event_size: 4 * 1024 * 1024, max_batch_bytes: 64 * 1024 * 1024, }
47 }
48}
49
/// Counters persisted to `meta.json` so the log can resume after a restart.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EventLogMeta {
    /// Id the next appended event is expected to carry.
    next_event_id: EventId,

    /// Number of the segment file currently being appended to.
    current_file_num: u64,

    /// Smallest event id still retained in the log.
    oldest_event_id: EventId,

    /// Running count of appended events.
    total_events: u64,
}
65
/// File-backed, append-only event log with size-based segment rotation.
pub struct FileEventLog {
    config: FileEventLogConfig,
    /// Persisted counters, mirrored to `meta.json` by `save_meta`.
    meta: Arc<Mutex<EventLogMeta>>,
    /// Lock-free copy of `meta.next_event_id` for fast reads.
    next_event_id: Arc<AtomicU64>,
    /// Buffered writer for the active segment file.
    writer: Arc<Mutex<BufWriter<File>>>,
    /// Lock-free copy of `meta.current_file_num` for fast reads.
    current_file_num: Arc<AtomicU64>,
}
74
75impl FileEventLog {
76 pub fn open(config: FileEventLogConfig) -> Result<Self> {
78 std::fs::create_dir_all(&config.base_dir)?;
80
81 let meta_path = config.base_dir.join("meta.json");
83 let meta = if meta_path.exists() {
84 let data = std::fs::read_to_string(&meta_path)?;
85 serde_json::from_str(&data)
86 .map_err(|e| AzothError::Projection(format!("Failed to parse meta.json: {}", e)))?
87 } else {
88 EventLogMeta::default()
89 };
90
91 let next_event_id = Arc::new(AtomicU64::new(meta.next_event_id));
92 let current_file_num = Arc::new(AtomicU64::new(meta.current_file_num));
93
94 let log_path = Self::log_file_path(&config.base_dir, meta.current_file_num);
96 let file = OpenOptions::new()
97 .create(true)
98 .append(true)
99 .open(&log_path)?;
100 let writer = Arc::new(Mutex::new(BufWriter::with_capacity(
101 config.write_buffer_size,
102 file,
103 )));
104
105 Ok(Self {
106 config,
107 meta: Arc::new(Mutex::new(meta)),
108 next_event_id,
109 writer,
110 current_file_num,
111 })
112 }
113
114 fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf {
116 base_dir.join(format!("events-{:08}.log", file_num))
117 }
118
119 fn save_meta(&self) -> Result<()> {
121 let meta = self.meta.lock().unwrap();
122 let meta_path = self.config.base_dir.join("meta.json");
123 let data = serde_json::to_string(&*meta)
125 .map_err(|e| AzothError::Projection(format!("Failed to serialize meta: {}", e)))?;
126 std::fs::write(&meta_path, data)?;
127 Ok(())
128 }
129
130 fn write_event_entry(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
134 if event_bytes.len() > self.config.max_event_size {
135 return Err(AzothError::InvalidState(format!(
136 "Event size {} exceeds max_event_size {}",
137 event_bytes.len(),
138 self.config.max_event_size
139 )));
140 }
141
142 if event_bytes.len() > u32::MAX as usize {
143 return Err(AzothError::InvalidState(format!(
144 "Event size {} exceeds u32 encoding limit",
145 event_bytes.len()
146 )));
147 }
148
149 let mut writer = self.writer.lock().unwrap();
150
151 writer.write_all(&event_id.to_be_bytes())?;
153
154 let size = event_bytes.len() as u32;
156 writer.write_all(&size.to_be_bytes())?;
157
158 writer.write_all(event_bytes)?;
160
161 Ok(())
162 }
163
164 fn check_rotation(&self) -> Result<Option<PathBuf>> {
166 let log_path = Self::log_file_path(
167 &self.config.base_dir,
168 self.current_file_num.load(Ordering::SeqCst),
169 );
170
171 let file_size = std::fs::metadata(&log_path)?.len();
172
173 if file_size >= self.config.max_file_size {
174 self.rotate_internal()
175 } else {
176 Ok(None)
177 }
178 }
179
180 fn rotate_internal(&self) -> Result<Option<PathBuf>> {
182 {
184 let mut writer = self.writer.lock().unwrap();
185 writer.flush()?;
186 }
187
188 let old_file_num = self.current_file_num.load(Ordering::SeqCst);
189 let old_path = Self::log_file_path(&self.config.base_dir, old_file_num);
190
191 let new_file_num = old_file_num + 1;
193 self.current_file_num.store(new_file_num, Ordering::SeqCst);
194
195 {
197 let mut meta = self.meta.lock().unwrap();
198 meta.current_file_num = new_file_num;
199 }
200 self.save_meta()?;
201
202 let new_path = Self::log_file_path(&self.config.base_dir, new_file_num);
204 let file = OpenOptions::new()
205 .create(true)
206 .append(true)
207 .open(&new_path)?;
208
209 {
211 let mut writer = self.writer.lock().unwrap();
212 *writer = BufWriter::with_capacity(self.config.write_buffer_size, file);
213 }
214
215 tracing::info!(
216 "Rotated event log: {} -> {}",
217 old_path.display(),
218 new_path.display()
219 );
220
221 Ok(Some(old_path))
222 }
223}
224
impl EventLog for FileEventLog {
    /// Appends one event under an explicit id and updates the in-memory
    /// counters. Data may stay in the write buffer until `sync`/rotation.
    fn append_with_id(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
        self.write_event_entry(event_id, event_bytes)?;

        {
            let mut meta = self.meta.lock().unwrap();
            meta.next_event_id = event_id + 1;
            meta.total_events += 1;
        }

        self.next_event_id.store(event_id + 1, Ordering::SeqCst);

        // May switch to a new segment if the current one grew past the limit.
        self.check_rotation()?;

        Ok(())
    }

    /// Appends `events` under consecutive ids starting at `first_event_id`.
    /// Small batches are serialized into one contiguous buffer and written in
    /// a single call; batches larger than `batch_buffer_size` fall back to
    /// per-event writes. The writer is flushed once at the end.
    fn append_batch_with_ids(&self, first_event_id: EventId, events: &[Vec<u8>]) -> Result<()> {
        if events.is_empty() {
            return Err(AzothError::InvalidState("Cannot append empty batch".into()));
        }

        // Encoded size: 8-byte id + 4-byte length prefix per event.
        let total_size: usize = events
            .iter()
            .map(|e| 8 + 4 + e.len())
            .sum();

        if total_size > self.config.max_batch_bytes {
            return Err(AzothError::InvalidState(format!(
                "Batch size {} exceeds max_batch_bytes {}",
                total_size, self.config.max_batch_bytes
            )));
        }

        // Validate every event up front so a mid-batch failure cannot leave
        // a partially written batch behind.
        for event in events {
            if event.len() > self.config.max_event_size {
                return Err(AzothError::InvalidState(format!(
                    "Event size {} exceeds max_event_size {}",
                    event.len(),
                    self.config.max_event_size
                )));
            }
            if event.len() > u32::MAX as usize {
                return Err(AzothError::InvalidState(format!(
                    "Event size {} exceeds u32 encoding limit",
                    event.len()
                )));
            }
        }

        if total_size > self.config.batch_buffer_size {
            // Batch too large to stage in memory: write each entry directly.
            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;
                self.write_event_entry(event_id, event_bytes)?;
            }
        } else {
            // Stage the whole batch in one buffer to minimize writer calls.
            let mut buffer = Vec::with_capacity(total_size);

            for (i, event_bytes) in events.iter().enumerate() {
                let event_id = first_event_id + i as u64;

                buffer.extend_from_slice(&event_id.to_be_bytes());

                let size = event_bytes.len() as u32;
                buffer.extend_from_slice(&size.to_be_bytes());

                buffer.extend_from_slice(event_bytes);
            }

            let mut writer = self.writer.lock().unwrap();
            writer.write_all(&buffer)?;
        }

        // Unlike single appends, batches are flushed immediately.
        {
            let mut writer = self.writer.lock().unwrap();
            writer.flush()?;
        }

        let last_id = first_event_id + events.len() as u64 - 1;
        {
            let mut meta = self.meta.lock().unwrap();
            meta.next_event_id = last_id + 1;
            meta.total_events += events.len() as u64;
        }

        self.next_event_id.store(last_id + 1, Ordering::SeqCst);

        self.check_rotation()?;

        Ok(())
    }

    /// Id the next appended event should use.
    fn next_event_id(&self) -> Result<EventId> {
        Ok(self.next_event_id.load(Ordering::SeqCst))
    }

    /// Iterates over `[start, end)`; `end = None` means up to the newest
    /// event. Flushes the writer first so buffered events are readable.
    fn iter_range(
        &self,
        start: EventId,
        end: Option<EventId>,
    ) -> Result<Box<dyn EventLogIterator>> {
        {
            let mut writer = self.writer.lock().unwrap();
            writer.flush()?;
        }

        let meta = self.meta.lock().unwrap();
        let end_id = end.unwrap_or(meta.next_event_id);

        Ok(Box::new(FileEventLogIter::new(
            self.config.base_dir.clone(),
            start,
            end_id,
            meta.current_file_num,
            self.config.max_event_size,
        )?))
    }

    /// Fetches a single event by id, `None` when it is not stored.
    fn get(&self, event_id: EventId) -> Result<Option<Vec<u8>>> {
        let mut iter = self.iter_range(event_id, Some(event_id + 1))?;

        match iter.next() {
            Some(Ok((id, data))) if id == event_id => Ok(Some(data)),
            // A different id means the requested event is absent (id gap).
            Some(Ok(_)) => Ok(None),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }

    /// Not implemented yet: logs a warning and reports zero deleted events.
    fn delete_range(&self, start: EventId, end: EventId) -> Result<usize> {
        tracing::warn!("delete_range not yet implemented: {} to {}", start, end);
        Ok(0)
    }

    /// Forces rotation to a new segment, returning the closed segment's path.
    fn rotate(&self) -> Result<PathBuf> {
        self.rotate_internal()?
            .ok_or_else(|| AzothError::InvalidState("No rotation needed".into()))
    }

    /// Smallest event id still retained in the log.
    fn oldest_event_id(&self) -> Result<EventId> {
        let meta = self.meta.lock().unwrap();
        Ok(meta.oldest_event_id)
    }

    /// Id of the most recently appended event. NOTE(review): returns 0 both
    /// for an empty log and when event 0 is the newest — callers must
    /// disambiguate via `stats().event_count` if that matters.
    fn newest_event_id(&self) -> Result<EventId> {
        let next = self.next_event_id.load(Ordering::SeqCst);
        if next == 0 {
            Ok(0)
        } else {
            Ok(next - 1)
        }
    }

    /// Flushes buffered writes, fsyncs the active segment, and persists meta.
    fn sync(&self) -> Result<()> {
        let mut writer = self.writer.lock().unwrap();
        writer.flush()?;
        writer.get_ref().sync_all()?;
        self.save_meta()?;
        Ok(())
    }

    /// Aggregates counters plus on-disk sizes of all existing segment files.
    fn stats(&self) -> Result<EventLogStats> {
        let meta = self.meta.lock().unwrap();

        let mut total_bytes = 0u64;
        let mut file_count = 0usize;

        // Older segments may have been removed; only count those that exist.
        for file_num in 0..=meta.current_file_num {
            let path = Self::log_file_path(&self.config.base_dir, file_num);
            if path.exists() {
                total_bytes += std::fs::metadata(&path)?.len();
                file_count += 1;
            }
        }

        Ok(EventLogStats {
            event_count: meta.total_events,
            oldest_event_id: meta.oldest_event_id,
            newest_event_id: if meta.next_event_id == 0 {
                0
            } else {
                meta.next_event_id - 1
            },
            total_bytes,
            file_count,
        })
    }
}
441
442impl Drop for FileEventLog {
444 fn drop(&mut self) {
445 if let Err(e) = self.sync() {
448 eprintln!("Warning: Failed to sync FileEventLog on drop: {}", e);
449 }
450 }
451}
452
/// Iterator over stored events in `[next_event_id, end_event_id)`, scanning
/// segment files in ascending file-number order.
struct FileEventLogIter {
    base_dir: PathBuf,
    /// Segment number currently being read.
    current_file_num: u64,
    /// Highest segment number that may exist (inclusive).
    max_file_num: u64,
    /// Open handle on the current segment; `None` once all are exhausted.
    current_file: Option<File>,
    /// Id of the next event the caller should receive.
    next_event_id: EventId,
    /// Exclusive upper bound of the iteration range.
    end_event_id: EventId,
    /// Sanity bound applied to each record's length prefix.
    max_event_size: usize,
}
463
464impl FileEventLogIter {
465 fn new(
466 base_dir: PathBuf,
467 start: EventId,
468 end: EventId,
469 max_file_num: u64,
470 max_event_size: usize,
471 ) -> Result<Self> {
472 let mut iter = Self {
473 base_dir,
474 current_file_num: 0,
475 max_file_num,
476 current_file: None,
477 next_event_id: start,
478 end_event_id: end,
479 max_event_size,
480 };
481
482 iter.open_next_file()?;
484
485 Ok(iter)
486 }
487
488 fn open_next_file(&mut self) -> Result<bool> {
489 while self.current_file_num <= self.max_file_num {
490 let path = FileEventLog::log_file_path(&self.base_dir, self.current_file_num);
491
492 if path.exists() {
493 let file = File::open(&path)?;
494 self.current_file = Some(file);
495 return Ok(true);
496 }
497
498 self.current_file_num += 1;
499 }
500
501 Ok(false)
502 }
503
504 fn read_next_event(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
505 loop {
506 if self.next_event_id >= self.end_event_id {
507 return Ok(None);
508 }
509
510 let file = match self.current_file.as_mut() {
511 Some(f) => f,
512 None => return Ok(None),
513 };
514
515 let mut header = [0u8; 12];
517 match file.read_exact(&mut header) {
518 Ok(_) => {}
519 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
520 self.current_file_num += 1;
522 if !self.open_next_file()? {
523 return Ok(None);
524 }
525 continue;
526 }
527 Err(e) => return Err(e.into()),
528 }
529
530 let event_id = u64::from_be_bytes(header[0..8].try_into().unwrap());
531 let size = u32::from_be_bytes(header[8..12].try_into().unwrap());
532
533 if size as usize > self.max_event_size {
534 return Err(AzothError::InvalidState(format!(
535 "Event {} size {} exceeds max_event_size {}",
536 event_id, size, self.max_event_size
537 )));
538 }
539
540 let mut data = vec![0u8; size as usize];
542 file.read_exact(&mut data)?;
543
544 if event_id < self.next_event_id {
546 continue;
547 }
548
549 self.next_event_id = event_id + 1;
550 return Ok(Some((event_id, data)));
551 }
552 }
553}
554
555impl Iterator for FileEventLogIter {
556 type Item = Result<(EventId, Vec<u8>)>;
557
558 fn next(&mut self) -> Option<Self::Item> {
559 match self.read_next_event() {
560 Ok(Some(event)) => Some(Ok(event)),
561 Ok(None) => None,
562 Err(e) => Some(Err(e)),
563 }
564 }
565}
566
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a log in a throwaway directory with deliberately tiny limits
    /// so rotation-related code paths are easy to hit.
    fn setup() -> (FileEventLog, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let log = FileEventLog::open(FileEventLogConfig {
            base_dir: temp_dir.path().to_path_buf(),
            max_file_size: 1024,
            write_buffer_size: 128,
            batch_buffer_size: 4096,
            max_event_size: 1024 * 1024,
            max_batch_bytes: 16 * 1024 * 1024,
        })
        .unwrap();
        (log, temp_dir)
    }

    #[test]
    fn test_append_and_read() {
        let (log, _temp) = setup();

        for id in 0..3u64 {
            log.append_with_id(id, format!("event {}", id).as_bytes())
                .unwrap();
        }

        let mut iter = log.iter_range(0, None).unwrap();
        for expected in 0..3u64 {
            let (id, data) = iter.next().unwrap().unwrap();
            assert_eq!(id, expected);
            assert_eq!(data, format!("event {}", expected).into_bytes());
        }
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_batch_append() {
        let (log, _temp) = setup();

        let events: Vec<Vec<u8>> = (0..3)
            .map(|i| format!("event {}", i).into_bytes())
            .collect();

        log.append_batch_with_ids(0, &events).unwrap();

        let mut iter = log.iter_range(0, None).unwrap();
        for (expected_id, expected_bytes) in events.iter().enumerate() {
            let item = iter.next().unwrap().unwrap();
            assert_eq!(item, (expected_id as u64, expected_bytes.clone()));
        }
    }

    #[test]
    fn test_get_single_event() {
        let (log, _temp) = setup();

        for id in 0..3u64 {
            log.append_with_id(id, format!("event {}", id).as_bytes())
                .unwrap();
        }

        // Present id round-trips; absent id yields None, not an error.
        assert_eq!(log.get(1).unwrap(), Some(b"event 1".to_vec()));
        assert_eq!(log.get(99).unwrap(), None);
    }

    #[test]
    fn test_stats() {
        let (log, _temp) = setup();

        log.append_with_id(0, b"event 0").unwrap();
        log.append_with_id(1, b"event 1").unwrap();

        // Flush so on-disk sizes reflect both events.
        log.sync().unwrap();

        let stats = log.stats().unwrap();
        assert_eq!(stats.event_count, 2);
        assert_eq!(stats.oldest_event_id, 0);
        assert_eq!(stats.newest_event_id, 1);
        assert!(stats.total_bytes > 0);
        assert_eq!(stats.file_count, 1);
    }
}