use super::error::JournalError;
use super::journal::{ENTRY_CRC_SIZE, ENTRY_HEADER_SIZE, Journal, JournalEntry, JournalReadIter};
use super::types::SequencerEvent;
use memmap2::MmapMut;
use serde::{Deserialize, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
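/// Default capacity of a single segment file: 256 MiB.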
const DEFAULT_SEGMENT_SIZE: usize = 256 * 1024 * 1024;
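/// An append-only writer over a single memory-mapped segment file. Entries
/// are copied in at `write_pos` and the written range is flushed before the
/// position advances.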
struct SegmentWriter {
mmap: MmapMut,
write_pos: usize,
capacity: usize,
path: PathBuf,
}
impl SegmentWriter {
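    /// Creates (truncating any existing file) a segment of `capacity` bytes
    /// and maps it read-write.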
fn create(path: &Path, capacity: usize) -> Result<Self, JournalError> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(path)
.map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.to_path_buf()),
})?;
file.set_len(capacity as u64)
.map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.to_path_buf()),
})?;
let mmap = unsafe {
MmapMut::map_mut(&file).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.to_path_buf()),
})?
};
Ok(Self {
mmap,
write_pos: 0,
capacity,
path: path.to_path_buf(),
})
}
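    /// Maps an existing segment read-write and recovers its write position by
    /// scanning entry length prefixes (see `scan_write_position`).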
fn open_existing(path: &Path) -> Result<Self, JournalError> {
let file = OpenOptions::new()
.read(true)
.write(true)
.open(path)
.map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.to_path_buf()),
})?;
let metadata = file.metadata().map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.to_path_buf()),
})?;
let capacity = metadata.len() as usize;
let mmap = unsafe {
MmapMut::map_mut(&file).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.to_path_buf()),
})?
};
let write_pos = scan_write_position(&mmap, capacity);
Ok(Self {
mmap,
write_pos,
capacity,
path: path.to_path_buf(),
})
}
#[inline]
fn remaining(&self) -> usize {
self.capacity.saturating_sub(self.write_pos)
}
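    /// Copies an encoded entry into the mmap and flushes exactly that byte
    /// range to disk before advancing `write_pos`.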
fn write_entry(&mut self, entry_bytes: &[u8]) -> Result<(), JournalError> {
let end =
self.write_pos
.checked_add(entry_bytes.len())
.ok_or(JournalError::EntryTooLarge {
entry_bytes: entry_bytes.len(),
segment_size: self.capacity,
})?;
if end > self.capacity {
return Err(JournalError::EntryTooLarge {
entry_bytes: entry_bytes.len(),
segment_size: self.capacity,
});
}
self.mmap[self.write_pos..end].copy_from_slice(entry_bytes);
self.mmap
.flush_range(self.write_pos, entry_bytes.len())
.map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(self.path.clone()),
})?;
self.write_pos = end;
Ok(())
}
}
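/// A file-backed, append-only journal persisting `SequencerEvent<T>`s across
/// a series of memory-mapped segment files named
/// `segment-<start_sequence>.journal`, rotating to a new segment whenever the
/// active one cannot hold the next entry.
///
/// Each entry is framed as:
///
/// ```text
/// [entry_length: u32 LE][sequence_num: u64 LE][timestamp_ns: u64 LE][JSON payload][crc32: u32 LE]
/// ```
///
/// `entry_length` counts everything after itself (8 + 8 + payload + 4), and
/// the CRC32 covers the bytes from `sequence_num` through the end of the
/// payload.
///
/// A minimal usage sketch (ignored doctest; `MyPayload` and `event` are
/// illustrative placeholders, not items defined in this crate):
///
/// ```ignore
/// let journal = FileJournal::<MyPayload>::open("/var/lib/sequencer/journal")?;
/// journal.append(&event)?;
/// for entry in journal.read_from(0)? {
///     println!("replayed seq {}", entry?.event.sequence_num);
/// }
/// ```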
pub struct FileJournal<T> {
dir: PathBuf,
writer: Mutex<SegmentWriter>,
segment_size: usize,
segment_start_seq: Mutex<u64>,
last_seq: Mutex<Option<u64>>,
_phantom: PhantomData<T>,
}
impl<T> FileJournal<T>
where
T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
{
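    /// Opens (creating the directory if needed) a journal in `dir` with the
    /// default 256 MiB segment size.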
pub fn open<P: AsRef<Path>>(dir: P) -> Result<Self, JournalError> {
Self::open_with_segment_size(dir, DEFAULT_SEGMENT_SIZE)
}
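    /// Like [`Self::open`], but with an explicit segment capacity. Resumes
    /// from the newest existing segment, recovering the write position and
    /// last sequence number by scanning it.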
pub fn open_with_segment_size<P: AsRef<Path>>(
dir: P,
segment_size: usize,
) -> Result<Self, JournalError> {
let dir = dir.as_ref().to_path_buf();
fs::create_dir_all(&dir).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(dir.clone()),
})?;
let mut segments = list_segments(&dir)?;
segments.sort();
let (writer, segment_start_seq, last_seq) = if let Some(latest) = segments.last() {
let path = segment_path(&dir, *latest);
let seg = SegmentWriter::open_existing(&path)?;
let last = scan_last_sequence(&seg.mmap, seg.write_pos);
(seg, *latest, last)
} else {
let path = segment_path(&dir, 0);
let seg = SegmentWriter::create(&path, segment_size)?;
(seg, 0, None)
};
Ok(Self {
dir,
writer: Mutex::new(writer),
segment_size,
segment_start_seq: Mutex::new(segment_start_seq),
last_seq: Mutex::new(last_seq),
_phantom: PhantomData,
})
}
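    /// Renames every sealed segment whose start sequence is below
    /// `before_sequence` to `<name>.journal.archived`, never touching the
    /// active segment. Returns the number of segments archived.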
pub fn archive_segments_before(&self, before_sequence: u64) -> Result<usize, JournalError> {
let segments = list_segments(&self.dir)?;
let mut archived = 0usize;
let active_start = self
.segment_start_seq
.lock()
.map_err(|_| JournalError::MutexPoisoned)?;
for start_seq in segments {
if start_seq < before_sequence && start_seq != *active_start {
let src = segment_path(&self.dir, start_seq);
let mut dst = src.clone();
dst.set_extension("journal.archived");
fs::rename(&src, &dst).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(src),
})?;
archived = archived.saturating_add(1);
}
}
Ok(archived)
}
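    /// Seals the active segment (flush, then truncate to its written length)
    /// and swaps in a fresh segment named after `start_seq`.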
fn rotate_segment(
&self,
writer: &mut SegmentWriter,
start_seq: u64,
) -> Result<(), JournalError> {
let old_path = writer.path.clone();
let old_len = writer.write_pos;
writer.mmap.flush().map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(old_path.clone()),
})?;
let new_path = segment_path(&self.dir, start_seq);
let new_writer = SegmentWriter::create(&new_path, self.segment_size)?;
*writer = new_writer;
        // Shrink the sealed segment to its actual data length. Failure is
        // non-fatal: replay stops at the zero-filled tail anyway, so the
        // error is deliberately ignored.
        if let Ok(file) = OpenOptions::new().write(true).open(&old_path) {
            let _ = file.set_len(old_len as u64);
        }
if let Ok(mut start) = self.segment_start_seq.lock() {
*start = start_seq;
}
Ok(())
}
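    /// Serializes `event` into the framed on-disk format described on
    /// [`FileJournal`]: length prefix, sequence, timestamp, JSON payload, and
    /// a trailing CRC32.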
fn encode_entry(event: &SequencerEvent<T>) -> Result<Vec<u8>, JournalError> {
let payload = serde_json::to_vec(event).map_err(|e| JournalError::SerializationError {
message: e.to_string(),
})?;
let payload_len = payload.len();
        let entry_length = u32::try_from(payload_len)
            .ok()
            .and_then(|len| len.checked_add(8 + 8 + 4)) // seq + timestamp + CRC
            .ok_or(JournalError::SerializationError {
                message: "entry size overflow".to_string(),
            })?;
let total_bytes =
(entry_length as usize)
.checked_add(4)
.ok_or(JournalError::SerializationError {
message: "total entry size overflow".to_string(),
})?;
let mut buf = Vec::with_capacity(total_bytes);
buf.write_all(&entry_length.to_le_bytes()).map_err(|e| {
JournalError::SerializationError {
message: e.to_string(),
}
})?;
buf.write_all(&event.sequence_num.to_le_bytes())
.map_err(|e| JournalError::SerializationError {
message: e.to_string(),
})?;
buf.write_all(&event.timestamp_ns.to_le_bytes())
.map_err(|e| JournalError::SerializationError {
message: e.to_string(),
})?;
buf.write_all(&payload)
.map_err(|e| JournalError::SerializationError {
message: e.to_string(),
})?;
        // The CRC covers everything after the length prefix: sequence,
        // timestamp, and payload.
        let crc = crc32fast::hash(&buf[4..]);
buf.write_all(&crc.to_le_bytes())
.map_err(|e| JournalError::SerializationError {
message: e.to_string(),
})?;
Ok(buf)
}
}
impl<T> Journal<T> for FileJournal<T>
where
T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
{
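    /// Encodes the event, rotates to a new segment if the active one lacks
    /// room, writes and flushes the entry, then records the latest sequence.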
fn append(&self, event: &SequencerEvent<T>) -> Result<(), JournalError> {
let entry_bytes = Self::encode_entry(event)?;
let mut writer = self
.writer
.lock()
.map_err(|_| JournalError::MutexPoisoned)?;
if writer.remaining() < entry_bytes.len() {
self.rotate_segment(&mut writer, event.sequence_num)?;
}
        if writer.remaining() < entry_bytes.len() {
            // Still no room after rotating into a fresh segment: the entry is
            // larger than an entire segment can hold.
return Err(JournalError::EntryTooLarge {
entry_bytes: entry_bytes.len(),
segment_size: self.segment_size,
});
}
writer.write_entry(&entry_bytes)?;
if let Ok(mut last) = self.last_seq.lock() {
*last = Some(event.sequence_num);
}
Ok(())
}
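    /// Returns an iterator over all entries with `sequence_num >= sequence`.
    /// The sorted segment start sequences are binary-searched to locate the
    /// first segment that may contain `sequence`; earlier entries inside that
    /// segment are skipped during iteration.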
fn read_from(&self, sequence: u64) -> Result<JournalReadIter<T>, JournalError> {
let mut segments = list_segments(&self.dir)?;
segments.sort();
        // Locate the last segment whose start sequence is <= `sequence`; the
        // requested entry, if present, lives in or after that segment.
        let start_idx = match segments.binary_search(&sequence) {
Ok(idx) => idx,
Err(0) => 0,
Err(idx) => idx.saturating_sub(1),
};
let dir = self.dir.clone();
let segments_from: Vec<u64> = segments.into_iter().skip(start_idx).collect();
let iter = SegmentIterator::<T> {
dir,
segments: segments_from,
segment_idx: 0,
offset: 0,
mmap: None,
mmap_len: 0,
start_sequence: sequence,
started: false,
_phantom: PhantomData,
};
Ok(Box::new(iter))
}
fn last_sequence(&self) -> Option<u64> {
self.last_seq.lock().ok().and_then(|guard| *guard)
}
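    /// Re-reads every segment and recomputes each entry's CRC32, returning an
    /// error for the first corrupt or truncated entry found.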
fn verify_integrity(&self) -> Result<(), JournalError> {
let mut segments = list_segments(&self.dir)?;
segments.sort();
for start_seq in segments {
let path = segment_path(&self.dir, start_seq);
let file = File::open(&path).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.clone()),
})?;
let mmap = unsafe {
memmap2::Mmap::map(&file).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.clone()),
})?
};
let data = &mmap[..];
let mut offset = 0usize;
            while offset
                .checked_add(ENTRY_HEADER_SIZE)
                .is_some_and(|end| end <= data.len())
            {
let el_bytes =
data.get(offset..offset + 4)
.ok_or(JournalError::InvalidEntryHeader {
offset,
message: "truncated entry_length".to_string(),
})?;
let entry_length =
u32::from_le_bytes([el_bytes[0], el_bytes[1], el_bytes[2], el_bytes[3]])
as usize;
                if entry_length == 0 {
                    // A zero length prefix marks the zero-filled tail of the
                    // segment.
                    break;
                }
let entry_end = offset
.checked_add(4)
.and_then(|v| v.checked_add(entry_length));
let entry_end = match entry_end {
Some(end) if end <= data.len() => end,
_ => {
return Err(JournalError::InvalidEntryHeader {
offset,
message: "truncated entry (extends beyond segment data)".to_string(),
});
}
};
let crc_start = entry_end.checked_sub(ENTRY_CRC_SIZE).ok_or(
JournalError::InvalidEntryHeader {
offset,
message: "entry too small for CRC".to_string(),
},
)?;
let payload_start =
offset
.checked_add(4)
.ok_or(JournalError::InvalidEntryHeader {
offset,
message: "offset overflow".to_string(),
})?;
let crc_bytes =
data.get(crc_start..entry_end)
.ok_or(JournalError::InvalidEntryHeader {
offset,
message: "truncated CRC".to_string(),
})?;
let stored_crc =
u32::from_le_bytes([crc_bytes[0], crc_bytes[1], crc_bytes[2], crc_bytes[3]]);
let checksummed_data =
data.get(payload_start..crc_start)
.ok_or(JournalError::InvalidEntryHeader {
offset,
message: "truncated payload".to_string(),
})?;
let computed_crc = crc32fast::hash(checksummed_data);
if stored_crc != computed_crc {
let seq_bytes = data.get(payload_start..payload_start + 8).ok_or(
JournalError::InvalidEntryHeader {
offset,
message: "truncated sequence_num".to_string(),
},
)?;
let seq = u64::from_le_bytes([
seq_bytes[0],
seq_bytes[1],
seq_bytes[2],
seq_bytes[3],
seq_bytes[4],
seq_bytes[5],
seq_bytes[6],
seq_bytes[7],
]);
return Err(JournalError::CorruptEntry {
sequence: seq,
expected_crc: stored_crc,
actual_crc: computed_crc,
});
}
offset = entry_end;
}
}
Ok(())
}
}
impl<T> std::fmt::Debug for FileJournal<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FileJournal")
.field("dir", &self.dir)
.field("segment_size", &self.segment_size)
.field("last_seq", &self.last_seq.lock().ok().and_then(|g| *g))
.finish()
}
}
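/// Streams `JournalEntry<T>`s across segment files in order, lazily mapping
/// each segment read-only and skipping valid entries below `start_sequence`.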
struct SegmentIterator<T> {
dir: PathBuf,
segments: Vec<u64>,
segment_idx: usize,
offset: usize,
mmap: Option<memmap2::Mmap>,
mmap_len: usize,
start_sequence: u64,
started: bool,
_phantom: PhantomData<T>,
}
impl<T> SegmentIterator<T>
where
T: for<'de> Deserialize<'de> + Clone + 'static,
{
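    /// Maps the next segment read-only and resets the read offset. Returns
    /// `Ok(false)` when no segments remain.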
fn load_next_segment(&mut self) -> Result<bool, JournalError> {
if self.segment_idx >= self.segments.len() {
return Ok(false);
}
let start_seq = self.segments[self.segment_idx];
let path = segment_path(&self.dir, start_seq);
self.segment_idx = self.segment_idx.saturating_add(1);
self.offset = 0;
let file = File::open(&path).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path.clone()),
})?;
let mmap = unsafe {
memmap2::Mmap::map(&file).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(path),
})?
};
self.mmap_len = mmap.len();
self.mmap = Some(mmap);
Ok(true)
}
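    /// Decodes the entry at `self.offset` and advances past it. Returns
    /// `None` at end of data or at a zero length prefix; CRC mismatches and
    /// JSON errors are yielded as `Err` so the caller can keep iterating.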
fn decode_next(&mut self) -> Option<Result<JournalEntry<T>, JournalError>> {
let mmap = self.mmap.as_ref()?;
let data = &mmap[..];
        let header_end = self.offset.checked_add(ENTRY_HEADER_SIZE)?;
        if header_end > data.len() {
            return None;
        }
let el_bytes = data.get(self.offset..self.offset + 4)?;
let entry_length =
u32::from_le_bytes([el_bytes[0], el_bytes[1], el_bytes[2], el_bytes[3]]) as usize;
        if entry_length == 0 {
            // Zero-filled tail: no further entries in this segment.
            return None;
        }
let entry_end = self.offset.checked_add(4)?.checked_add(entry_length)?;
        if entry_end > data.len() {
            return None;
        }
let payload_start = self.offset.checked_add(4)?;
let crc_start = entry_end.checked_sub(ENTRY_CRC_SIZE)?;
let crc_bytes = data.get(crc_start..entry_end)?;
let stored_crc =
u32::from_le_bytes([crc_bytes[0], crc_bytes[1], crc_bytes[2], crc_bytes[3]]);
let checksummed_data = data.get(payload_start..crc_start)?;
let computed_crc = crc32fast::hash(checksummed_data);
if stored_crc != computed_crc {
let seq_bytes = data.get(payload_start..payload_start + 8)?;
let seq = u64::from_le_bytes([
seq_bytes[0],
seq_bytes[1],
seq_bytes[2],
seq_bytes[3],
seq_bytes[4],
seq_bytes[5],
seq_bytes[6],
seq_bytes[7],
]);
self.offset = entry_end;
return Some(Err(JournalError::CorruptEntry {
sequence: seq,
expected_crc: stored_crc,
actual_crc: computed_crc,
}));
}
let seq_bytes = data.get(payload_start..payload_start + 8)?;
let sequence_num = u64::from_le_bytes([
seq_bytes[0],
seq_bytes[1],
seq_bytes[2],
seq_bytes[3],
seq_bytes[4],
seq_bytes[5],
seq_bytes[6],
seq_bytes[7],
]);
        // Skip sequence_num (8 bytes) and timestamp_ns (8 bytes) to reach the
        // JSON payload.
        let json_start = payload_start.checked_add(16)?;
let json_data = data.get(json_start..crc_start)?;
let event: SequencerEvent<T> = match serde_json::from_slice(json_data) {
Ok(ev) => ev,
Err(e) => {
self.offset = entry_end;
return Some(Err(JournalError::DeserializationError {
sequence: sequence_num,
message: e.to_string(),
}));
}
};
self.offset = entry_end;
Some(Ok(JournalEntry { event, stored_crc }))
}
}
impl<T> Iterator for SegmentIterator<T>
where
T: for<'de> Deserialize<'de> + Clone + 'static,
{
type Item = Result<JournalEntry<T>, JournalError>;
fn next(&mut self) -> Option<Self::Item> {
if !self.started {
self.started = true;
match self.load_next_segment() {
Ok(true) => {}
Ok(false) => return None,
Err(e) => return Some(Err(e)),
}
}
loop {
if let Some(result) = self.decode_next() {
                if let Ok(entry) = &result {
                    if entry.event.sequence_num < self.start_sequence {
                        // Skip valid entries below the requested start;
                        // corrupt entries are still surfaced to the caller.
                        continue;
                    }
                }
return Some(result);
}
match self.load_next_segment() {
Ok(true) => continue,
Ok(false) => return None,
Err(e) => return Some(Err(e)),
}
}
}
}
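/// Builds the path of the segment whose first entry has `start_sequence`.
/// The number is zero-padded to 20 digits so lexicographic filename order
/// matches numeric order.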
fn segment_path(dir: &Path, start_sequence: u64) -> PathBuf {
dir.join(format!("segment-{start_sequence:020}.journal"))
}
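/// Collects the start sequences of every `segment-*.journal` file in `dir`.
/// Archived `*.journal.archived` files no longer match the suffix and are
/// skipped. The result is unsorted; callers sort as needed.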
fn list_segments(dir: &Path) -> Result<Vec<u64>, JournalError> {
let mut seqs = Vec::new();
let entries = fs::read_dir(dir).map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(dir.to_path_buf()),
})?;
for entry in entries {
let entry = entry.map_err(|e| JournalError::Io {
message: e.to_string(),
path: Some(dir.to_path_buf()),
})?;
let name = entry.file_name();
let name_str = name.to_string_lossy();
if let Some(rest) = name_str.strip_prefix("segment-")
&& let Some(seq_str) = rest.strip_suffix(".journal")
&& let Ok(seq) = seq_str.parse::<u64>()
{
seqs.push(seq);
}
}
Ok(seqs)
}
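/// Walks length prefixes from the start of a segment until a zero length
/// (the untouched, zero-filled remainder of the mapping) or a truncated
/// entry, returning the offset at which the next entry should be written.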
fn scan_write_position(data: &[u8], capacity: usize) -> usize {
let mut offset = 0usize;
while let Some(end) = offset.checked_add(4) {
if end > capacity || end > data.len() {
break;
}
let el_bytes = match data.get(offset..end) {
Some(b) => b,
None => break,
};
let entry_length =
u32::from_le_bytes([el_bytes[0], el_bytes[1], el_bytes[2], el_bytes[3]]) as usize;
if entry_length == 0 {
break;
}
let entry_end = match offset
.checked_add(4)
.and_then(|v| v.checked_add(entry_length))
{
Some(end) if end <= capacity && end <= data.len() => end,
_ => break,
};
offset = entry_end;
}
offset
}
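/// Replays entry headers up to `write_pos` and returns the sequence number
/// of the last complete entry, if any.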
fn scan_last_sequence(data: &[u8], write_pos: usize) -> Option<u64> {
let mut offset = 0usize;
let mut last_seq: Option<u64> = None;
    while offset
        .checked_add(ENTRY_HEADER_SIZE)
        .is_some_and(|end| end <= write_pos)
    {
let el_bytes = data.get(offset..offset + 4)?;
let entry_length =
u32::from_le_bytes([el_bytes[0], el_bytes[1], el_bytes[2], el_bytes[3]]) as usize;
if entry_length == 0 {
break;
}
let entry_end = offset.checked_add(4)?.checked_add(entry_length)?;
if entry_end > write_pos {
break;
}
let seq_start = offset.checked_add(4)?;
let seq_bytes = data.get(seq_start..seq_start + 8)?;
let seq = u64::from_le_bytes([
seq_bytes[0],
seq_bytes[1],
seq_bytes[2],
seq_bytes[3],
seq_bytes[4],
seq_bytes[5],
seq_bytes[6],
seq_bytes[7],
]);
last_seq = Some(seq);
offset = entry_end;
}
last_seq
}
#[cfg(test)]
mod tests {
use super::*;
use crate::orderbook::sequencer::types::{SequencerCommand, SequencerResult};
use pricelevel::Id;
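    /// Builds a test event for sequence `seq` with a timestamp offset by
    /// `seq` nanoseconds.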
fn make_event(seq: u64) -> SequencerEvent<()> {
SequencerEvent {
sequence_num: seq,
timestamp_ns: 1_700_000_000_000_000_000u64.checked_add(seq).unwrap_or(0),
command: SequencerCommand::CancelOrder(Id::new_uuid()),
result: SequencerResult::OrderCancelled {
order_id: Id::new_uuid(),
},
}
}
#[test]
fn test_encode_entry_and_decode() {
let event = make_event(42);
let entry_bytes = FileJournal::<()>::encode_entry(&event);
assert!(entry_bytes.is_ok());
let buf = entry_bytes.unwrap_or_default();
assert!(!buf.is_empty());
let entry_length = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
assert_eq!(entry_length + 4, buf.len());
let seq = u64::from_le_bytes([
buf[4], buf[5], buf[6], buf[7], buf[8], buf[9], buf[10], buf[11],
]);
assert_eq!(seq, 42);
}
#[test]
fn test_write_and_read_single_entry() {
let dir = tempfile::tempdir();
assert!(dir.is_ok());
let dir = dir.unwrap_or_else(|_| panic!("tempdir"));
let journal = FileJournal::<()>::open(dir.path());
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("open"));
let event = make_event(0);
let result = journal.append(&event);
assert!(result.is_ok());
assert_eq!(journal.last_sequence(), Some(0));
let entries: Vec<_> = journal
.read_from(0)
.unwrap_or_else(|_| panic!("read_from"))
.collect();
assert_eq!(entries.len(), 1);
assert!(entries[0].is_ok());
let entry = entries[0].as_ref().unwrap_or_else(|_| panic!("entry"));
assert_eq!(entry.event.sequence_num, 0);
}
#[test]
fn test_write_and_read_multiple_entries() {
let dir = tempfile::tempdir();
assert!(dir.is_ok());
let dir = dir.unwrap_or_else(|_| panic!("tempdir"));
let journal = FileJournal::<()>::open(dir.path());
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("open"));
for i in 0..10 {
let event = make_event(i);
let result = journal.append(&event);
assert!(result.is_ok());
}
assert_eq!(journal.last_sequence(), Some(9));
let entries: Vec<_> = journal
.read_from(5)
.unwrap_or_else(|_| panic!("read_from"))
.collect();
assert_eq!(entries.len(), 5);
for (i, entry) in entries.iter().enumerate() {
assert!(entry.is_ok());
let e = entry.as_ref().unwrap_or_else(|_| panic!("entry"));
assert_eq!(e.event.sequence_num, 5 + i as u64);
}
}
#[test]
fn test_read_from_empty_journal() {
let dir = tempfile::tempdir();
assert!(dir.is_ok());
let dir = dir.unwrap_or_else(|_| panic!("tempdir"));
let journal = FileJournal::<()>::open(dir.path());
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("open"));
assert_eq!(journal.last_sequence(), None);
let entries: Vec<_> = journal
.read_from(0)
.unwrap_or_else(|_| panic!("read_from"))
.collect();
assert!(entries.is_empty());
}
#[test]
fn test_segment_rotation() {
let dir = tempfile::tempdir();
assert!(dir.is_ok());
let dir = dir.unwrap_or_else(|_| panic!("tempdir"));
let journal = FileJournal::<()>::open_with_segment_size(dir.path(), 512);
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("open"));
for i in 0..20 {
let event = make_event(i);
let result = journal.append(&event);
assert!(result.is_ok());
}
assert_eq!(journal.last_sequence(), Some(19));
let entries: Vec<_> = journal
.read_from(0)
.unwrap_or_else(|_| panic!("read_from"))
.collect();
assert_eq!(entries.len(), 20);
for (i, entry) in entries.iter().enumerate() {
assert!(entry.is_ok());
let e = entry.as_ref().unwrap_or_else(|_| panic!("entry"));
assert_eq!(e.event.sequence_num, i as u64);
}
let segments = list_segments(dir.path());
assert!(segments.is_ok());
let segs = segments.unwrap_or_default();
assert!(
segs.len() > 1,
"expected multiple segments, got {}",
segs.len()
);
}
#[test]
fn test_verify_integrity_on_valid_journal() {
let dir = tempfile::tempdir();
assert!(dir.is_ok());
let dir = dir.unwrap_or_else(|_| panic!("tempdir"));
let journal = FileJournal::<()>::open(dir.path());
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("open"));
for i in 0..5 {
let event = make_event(i);
let result = journal.append(&event);
assert!(result.is_ok());
}
let integrity = journal.verify_integrity();
assert!(integrity.is_ok());
}
#[test]
fn test_verify_integrity_detects_corruption() {
let dir = tempfile::tempdir();
assert!(dir.is_ok());
let dir = dir.unwrap_or_else(|_| panic!("tempdir"));
let journal = FileJournal::<()>::open(dir.path());
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("open"));
let event = make_event(0);
let result = journal.append(&event);
assert!(result.is_ok());
assert!(journal.verify_integrity().is_ok());
drop(journal);
let segments = list_segments(dir.path());
assert!(segments.is_ok());
let segs = segments.unwrap_or_default();
assert!(!segs.is_empty());
let seg_path = segment_path(dir.path(), segs[0]);
let mut data = fs::read(&seg_path).unwrap_or_default();
if data.len() > 30 {
data[25] ^= 0xFF;
}
fs::write(&seg_path, &data).unwrap_or_default();
let journal2 = FileJournal::<()>::open(dir.path());
assert!(journal2.is_ok());
let journal2 = journal2.unwrap_or_else(|_| panic!("reopen"));
let integrity = journal2.verify_integrity();
assert!(integrity.is_err());
let err_msg = format!("{}", integrity.unwrap_err());
assert!(err_msg.contains("corrupt journal entry"));
}
#[test]
fn test_archive_segments_before() {
let dir = tempfile::tempdir();
assert!(dir.is_ok());
let dir = dir.unwrap_or_else(|_| panic!("tempdir"));
let journal = FileJournal::<()>::open_with_segment_size(dir.path(), 512);
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("open"));
for i in 0..20 {
let event = make_event(i);
let result = journal.append(&event);
assert!(result.is_ok());
}
let segments_before = list_segments(dir.path()).unwrap_or_default();
assert!(segments_before.len() > 1);
let last_start = *segments_before.iter().max().unwrap_or(&0);
let archived = journal.archive_segments_before(last_start);
assert!(archived.is_ok());
let archived_count = archived.unwrap_or(0);
assert!(archived_count > 0);
let segments_after = list_segments(dir.path()).unwrap_or_default();
assert!(segments_after.len() < segments_before.len());
}
#[test]
fn test_reopen_journal_resumes() {
let dir = tempfile::tempdir();
assert!(dir.is_ok());
let dir = dir.unwrap_or_else(|_| panic!("tempdir"));
{
let journal = FileJournal::<()>::open(dir.path());
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("open"));
for i in 0..5 {
let event = make_event(i);
let result = journal.append(&event);
assert!(result.is_ok());
}
}
{
let journal = FileJournal::<()>::open(dir.path());
assert!(journal.is_ok());
let journal = journal.unwrap_or_else(|_| panic!("reopen"));
assert_eq!(journal.last_sequence(), Some(4));
for i in 5..10 {
let event = make_event(i);
let result = journal.append(&event);
assert!(result.is_ok());
}
assert_eq!(journal.last_sequence(), Some(9));
let entries: Vec<_> = journal
.read_from(0)
.unwrap_or_else(|_| panic!("read_from"))
.collect();
assert_eq!(entries.len(), 10);
}
}
#[test]
fn test_segment_path_format() {
let dir = PathBuf::from("/tmp/journal");
let path = segment_path(&dir, 42);
assert_eq!(
path.to_string_lossy(),
"/tmp/journal/segment-00000000000000000042.journal"
);
}
#[test]
fn test_entry_overhead_constant() {
assert_eq!(super::super::journal::ENTRY_OVERHEAD, 24);
assert_eq!(ENTRY_HEADER_SIZE, 20);
assert_eq!(ENTRY_CRC_SIZE, 4);
}
#[test]
fn test_journal_error_display() {
let err = JournalError::CorruptEntry {
sequence: 42,
expected_crc: 0xDEAD_BEEF,
actual_crc: 0xCAFE_BABE,
};
let display = format!("{err}");
assert!(display.contains("corrupt journal entry"));
assert!(display.contains("42"));
let err2 = JournalError::MutexPoisoned;
let display2 = format!("{err2}");
assert!(display2.contains("mutex poisoned"));
}
#[test]
fn test_sequencer_event_serialize_roundtrip() {
let event = make_event(7);
let json = serde_json::to_vec(&event);
assert!(json.is_ok());
let bytes = json.unwrap_or_default();
let decoded: Result<SequencerEvent<()>, _> = serde_json::from_slice(&bytes);
assert!(decoded.is_ok());
let decoded = decoded.unwrap_or_else(|_| panic!("decode"));
assert_eq!(decoded.sequence_num, 7);
}
}