1use azoth_core::{
2 error::{AzothError, Result},
3 event_log::{EventLog, EventLogIterator, EventLogStats},
4 types::EventId,
5};
6use serde::{Deserialize, Serialize};
7use std::fs::{File, OpenOptions};
8use std::io::{BufWriter, Read, Write};
9use std::path::{Path, PathBuf};
10use std::sync::{
11 atomic::{AtomicU64, Ordering},
12 Arc, Mutex,
13};
14
15#[derive(Debug, Clone)]
17pub struct FileEventLogConfig {
18 pub base_dir: PathBuf,
20
21 pub max_file_size: u64,
23
24 pub write_buffer_size: usize,
26
27 pub batch_buffer_size: usize,
29}
30
31impl Default for FileEventLogConfig {
32 fn default() -> Self {
33 Self {
34 base_dir: PathBuf::from("./data/event-log"),
35 max_file_size: 512 * 1024 * 1024, write_buffer_size: 256 * 1024, batch_buffer_size: 1024 * 1024, }
39 }
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, Default)]
44struct EventLogMeta {
45 next_event_id: EventId,
47
48 current_file_num: u64,
50
51 oldest_event_id: EventId,
53
54 total_events: u64,
56}
57
58pub struct FileEventLog {
60 config: FileEventLogConfig,
61 meta: Arc<Mutex<EventLogMeta>>,
62 next_event_id: Arc<AtomicU64>,
63 writer: Arc<Mutex<BufWriter<File>>>,
64 current_file_num: Arc<AtomicU64>,
65}
66
67impl FileEventLog {
68 pub fn open(config: FileEventLogConfig) -> Result<Self> {
70 std::fs::create_dir_all(&config.base_dir)?;
72
73 let meta_path = config.base_dir.join("meta.json");
75 let meta = if meta_path.exists() {
76 let data = std::fs::read_to_string(&meta_path)?;
77 serde_json::from_str(&data)
78 .map_err(|e| AzothError::Projection(format!("Failed to parse meta.json: {}", e)))?
79 } else {
80 EventLogMeta::default()
81 };
82
83 let next_event_id = Arc::new(AtomicU64::new(meta.next_event_id));
84 let current_file_num = Arc::new(AtomicU64::new(meta.current_file_num));
85
86 let log_path = Self::log_file_path(&config.base_dir, meta.current_file_num);
88 let file = OpenOptions::new()
89 .create(true)
90 .append(true)
91 .open(&log_path)?;
92 let writer = Arc::new(Mutex::new(BufWriter::with_capacity(
93 config.write_buffer_size,
94 file,
95 )));
96
97 Ok(Self {
98 config,
99 meta: Arc::new(Mutex::new(meta)),
100 next_event_id,
101 writer,
102 current_file_num,
103 })
104 }
105
106 fn log_file_path(base_dir: &Path, file_num: u64) -> PathBuf {
108 base_dir.join(format!("events-{:08}.log", file_num))
109 }
110
111 fn save_meta(&self) -> Result<()> {
113 let meta = self.meta.lock().unwrap();
114 let meta_path = self.config.base_dir.join("meta.json");
115 let data = serde_json::to_string(&*meta)
117 .map_err(|e| AzothError::Projection(format!("Failed to serialize meta: {}", e)))?;
118 std::fs::write(&meta_path, data)?;
119 Ok(())
120 }
121
122 fn write_event_entry(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
126 let mut writer = self.writer.lock().unwrap();
127
128 writer.write_all(&event_id.to_be_bytes())?;
130
131 let size = event_bytes.len() as u32;
133 writer.write_all(&size.to_be_bytes())?;
134
135 writer.write_all(event_bytes)?;
137
138 Ok(())
139 }
140
141 fn check_rotation(&self) -> Result<Option<PathBuf>> {
143 let log_path = Self::log_file_path(
144 &self.config.base_dir,
145 self.current_file_num.load(Ordering::SeqCst),
146 );
147
148 let file_size = std::fs::metadata(&log_path)?.len();
149
150 if file_size >= self.config.max_file_size {
151 self.rotate_internal()
152 } else {
153 Ok(None)
154 }
155 }
156
157 fn rotate_internal(&self) -> Result<Option<PathBuf>> {
159 {
161 let mut writer = self.writer.lock().unwrap();
162 writer.flush()?;
163 }
164
165 let old_file_num = self.current_file_num.load(Ordering::SeqCst);
166 let old_path = Self::log_file_path(&self.config.base_dir, old_file_num);
167
168 let new_file_num = old_file_num + 1;
170 self.current_file_num.store(new_file_num, Ordering::SeqCst);
171
172 {
174 let mut meta = self.meta.lock().unwrap();
175 meta.current_file_num = new_file_num;
176 }
177 self.save_meta()?;
178
179 let new_path = Self::log_file_path(&self.config.base_dir, new_file_num);
181 let file = OpenOptions::new()
182 .create(true)
183 .append(true)
184 .open(&new_path)?;
185
186 {
188 let mut writer = self.writer.lock().unwrap();
189 *writer = BufWriter::with_capacity(self.config.write_buffer_size, file);
190 }
191
192 tracing::info!(
193 "Rotated event log: {} -> {}",
194 old_path.display(),
195 new_path.display()
196 );
197
198 Ok(Some(old_path))
199 }
200}
201
202impl EventLog for FileEventLog {
203 fn append_with_id(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
204 self.write_event_entry(event_id, event_bytes)?;
206
207 {
209 let mut meta = self.meta.lock().unwrap();
210 meta.next_event_id = event_id + 1;
211 meta.total_events += 1;
212 }
213
214 self.next_event_id.store(event_id + 1, Ordering::SeqCst);
216
217 self.check_rotation()?;
219
220 Ok(())
221 }
222
223 fn append_batch_with_ids(&self, first_event_id: EventId, events: &[Vec<u8>]) -> Result<()> {
224 if events.is_empty() {
225 return Err(AzothError::InvalidState("Cannot append empty batch".into()));
226 }
227
228 let total_size: usize = events
230 .iter()
231 .map(|e| 8 + 4 + e.len()) .sum();
233
234 if total_size > self.config.batch_buffer_size {
236 for (i, event_bytes) in events.iter().enumerate() {
239 let event_id = first_event_id + i as u64;
240 self.write_event_entry(event_id, event_bytes)?;
241 }
242 } else {
243 let mut buffer = Vec::with_capacity(total_size);
245
246 for (i, event_bytes) in events.iter().enumerate() {
248 let event_id = first_event_id + i as u64;
249
250 buffer.extend_from_slice(&event_id.to_be_bytes());
252
253 let size = event_bytes.len() as u32;
255 buffer.extend_from_slice(&size.to_be_bytes());
256
257 buffer.extend_from_slice(event_bytes);
259 }
260
261 let mut writer = self.writer.lock().unwrap();
263 writer.write_all(&buffer)?;
264 }
265
266 {
268 let mut writer = self.writer.lock().unwrap();
269 writer.flush()?;
270 }
271
272 let last_id = first_event_id + events.len() as u64 - 1;
274 {
275 let mut meta = self.meta.lock().unwrap();
276 meta.next_event_id = last_id + 1;
277 meta.total_events += events.len() as u64;
278 }
279
280 self.next_event_id.store(last_id + 1, Ordering::SeqCst);
282
283 self.check_rotation()?;
285
286 Ok(())
287 }
288
289 fn next_event_id(&self) -> Result<EventId> {
290 Ok(self.next_event_id.load(Ordering::SeqCst))
291 }
292
293 fn iter_range(
294 &self,
295 start: EventId,
296 end: Option<EventId>,
297 ) -> Result<Box<dyn EventLogIterator>> {
298 {
300 let mut writer = self.writer.lock().unwrap();
301 writer.flush()?;
302 }
303
304 let meta = self.meta.lock().unwrap();
305 let end_id = end.unwrap_or(meta.next_event_id);
306
307 Ok(Box::new(FileEventLogIter::new(
308 self.config.base_dir.clone(),
309 start,
310 end_id,
311 meta.current_file_num,
312 )?))
313 }
314
315 fn get(&self, event_id: EventId) -> Result<Option<Vec<u8>>> {
316 let mut iter = self.iter_range(event_id, Some(event_id + 1))?;
318
319 match iter.next() {
320 Some(Ok((id, data))) if id == event_id => Ok(Some(data)),
321 Some(Ok(_)) => Ok(None),
322 Some(Err(e)) => Err(e),
323 None => Ok(None),
324 }
325 }
326
327 fn delete_range(&self, start: EventId, end: EventId) -> Result<usize> {
328 tracing::warn!("delete_range not yet implemented: {} to {}", start, end);
336 Ok(0)
337 }
338
339 fn rotate(&self) -> Result<PathBuf> {
340 self.rotate_internal()?
341 .ok_or_else(|| AzothError::InvalidState("No rotation needed".into()))
342 }
343
344 fn oldest_event_id(&self) -> Result<EventId> {
345 let meta = self.meta.lock().unwrap();
346 Ok(meta.oldest_event_id)
347 }
348
349 fn newest_event_id(&self) -> Result<EventId> {
350 let next = self.next_event_id.load(Ordering::SeqCst);
351 if next == 0 {
352 Ok(0)
353 } else {
354 Ok(next - 1)
355 }
356 }
357
358 fn sync(&self) -> Result<()> {
359 let mut writer = self.writer.lock().unwrap();
360 writer.flush()?;
361 writer.get_ref().sync_all()?;
362 self.save_meta()?;
363 Ok(())
364 }
365
366 fn stats(&self) -> Result<EventLogStats> {
367 let meta = self.meta.lock().unwrap();
368
369 let mut total_bytes = 0u64;
371 let mut file_count = 0usize;
372
373 for file_num in 0..=meta.current_file_num {
374 let path = Self::log_file_path(&self.config.base_dir, file_num);
375 if path.exists() {
376 total_bytes += std::fs::metadata(&path)?.len();
377 file_count += 1;
378 }
379 }
380
381 Ok(EventLogStats {
382 event_count: meta.total_events,
383 oldest_event_id: meta.oldest_event_id,
384 newest_event_id: if meta.next_event_id == 0 {
385 0
386 } else {
387 meta.next_event_id - 1
388 },
389 total_bytes,
390 file_count,
391 })
392 }
393}
394
395impl Drop for FileEventLog {
397 fn drop(&mut self) {
398 if let Err(e) = self.sync() {
401 eprintln!("Warning: Failed to sync FileEventLog on drop: {}", e);
402 }
403 }
404}
405
406struct FileEventLogIter {
408 base_dir: PathBuf,
409 current_file_num: u64,
410 max_file_num: u64,
411 current_file: Option<File>,
412 next_event_id: EventId,
413 end_event_id: EventId,
414}
415
416impl FileEventLogIter {
417 fn new(base_dir: PathBuf, start: EventId, end: EventId, max_file_num: u64) -> Result<Self> {
418 let mut iter = Self {
419 base_dir,
420 current_file_num: 0,
421 max_file_num,
422 current_file: None,
423 next_event_id: start,
424 end_event_id: end,
425 };
426
427 iter.open_next_file()?;
429
430 Ok(iter)
431 }
432
433 fn open_next_file(&mut self) -> Result<bool> {
434 while self.current_file_num <= self.max_file_num {
435 let path = FileEventLog::log_file_path(&self.base_dir, self.current_file_num);
436
437 if path.exists() {
438 let file = File::open(&path)?;
439 self.current_file = Some(file);
440 return Ok(true);
441 }
442
443 self.current_file_num += 1;
444 }
445
446 Ok(false)
447 }
448
449 fn read_next_event(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
450 loop {
451 if self.next_event_id >= self.end_event_id {
452 return Ok(None);
453 }
454
455 let file = match self.current_file.as_mut() {
456 Some(f) => f,
457 None => return Ok(None),
458 };
459
460 let mut header = [0u8; 12];
462 match file.read_exact(&mut header) {
463 Ok(_) => {}
464 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
465 self.current_file_num += 1;
467 if !self.open_next_file()? {
468 return Ok(None);
469 }
470 continue;
471 }
472 Err(e) => return Err(e.into()),
473 }
474
475 let event_id = u64::from_be_bytes(header[0..8].try_into().unwrap());
476 let size = u32::from_be_bytes(header[8..12].try_into().unwrap());
477
478 let mut data = vec![0u8; size as usize];
480 file.read_exact(&mut data)?;
481
482 if event_id < self.next_event_id {
484 continue;
485 }
486
487 self.next_event_id = event_id + 1;
488 return Ok(Some((event_id, data)));
489 }
490 }
491}
492
493impl Iterator for FileEventLogIter {
494 type Item = Result<(EventId, Vec<u8>)>;
495
496 fn next(&mut self) -> Option<Self::Item> {
497 match self.read_next_event() {
498 Ok(Some(event)) => Some(Ok(event)),
499 Ok(None) => None,
500 Err(e) => Some(Err(e)),
501 }
502 }
503}
504
505#[cfg(test)]
508mod tests {
509 use super::*;
510 use tempfile::TempDir;
511
512 fn setup() -> (FileEventLog, TempDir) {
513 let temp_dir = TempDir::new().unwrap();
514 let config = FileEventLogConfig {
515 base_dir: temp_dir.path().to_path_buf(),
516 max_file_size: 1024, write_buffer_size: 128,
518 batch_buffer_size: 4096,
519 };
520 let log = FileEventLog::open(config).unwrap();
521 (log, temp_dir)
522 }
523
524 #[test]
525 fn test_append_and_read() {
526 let (log, _temp) = setup();
527
528 log.append_with_id(0, b"event 0").unwrap();
530 log.append_with_id(1, b"event 1").unwrap();
531 log.append_with_id(2, b"event 2").unwrap();
532
533 let mut iter = log.iter_range(0, None).unwrap();
535 assert_eq!(iter.next().unwrap().unwrap(), (0, b"event 0".to_vec()));
536 assert_eq!(iter.next().unwrap().unwrap(), (1, b"event 1".to_vec()));
537 assert_eq!(iter.next().unwrap().unwrap(), (2, b"event 2".to_vec()));
538 assert!(iter.next().is_none());
539 }
540
541 #[test]
542 fn test_batch_append() {
543 let (log, _temp) = setup();
544
545 let events = vec![
546 b"event 0".to_vec(),
547 b"event 1".to_vec(),
548 b"event 2".to_vec(),
549 ];
550
551 log.append_batch_with_ids(0, &events).unwrap();
552
553 let mut iter = log.iter_range(0, None).unwrap();
554 assert_eq!(iter.next().unwrap().unwrap(), (0, b"event 0".to_vec()));
555 assert_eq!(iter.next().unwrap().unwrap(), (1, b"event 1".to_vec()));
556 assert_eq!(iter.next().unwrap().unwrap(), (2, b"event 2".to_vec()));
557 }
558
559 #[test]
560 fn test_get_single_event() {
561 let (log, _temp) = setup();
562
563 log.append_with_id(0, b"event 0").unwrap();
564 log.append_with_id(1, b"event 1").unwrap();
565 log.append_with_id(2, b"event 2").unwrap();
566
567 assert_eq!(log.get(1).unwrap(), Some(b"event 1".to_vec()));
568 assert_eq!(log.get(99).unwrap(), None);
569 }
570
571 #[test]
572 fn test_stats() {
573 let (log, _temp) = setup();
574
575 log.append_with_id(0, b"event 0").unwrap();
576 log.append_with_id(1, b"event 1").unwrap();
577
578 log.sync().unwrap();
580
581 let stats = log.stats().unwrap();
582 assert_eq!(stats.event_count, 2);
583 assert_eq!(stats.oldest_event_id, 0);
584 assert_eq!(stats.newest_event_id, 1);
585 assert!(stats.total_bytes > 0);
586 assert_eq!(stats.file_count, 1);
587 }
588}