use crate::event::Event;
use crate::store::StoreError;
use serde::{Deserialize, Serialize};
use std::io::{Read, Seek, SeekFrom, Write};
pub const SEGMENT_MAGIC: &[u8; 4] = b"FBAT";
pub const SEGMENT_EXTENSION: &str = "fbat";
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SegmentHeader {
pub version: u16,
pub flags: u16,
pub created_ns: i64,
pub segment_id: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FramePayload<P> {
pub event: Event<P>,
pub entity: String,
pub scope: String,
}
#[derive(Serialize)]
pub(crate) struct FramePayloadRef<'a, P> {
pub event: &'a Event<P>,
pub entity: &'a str,
pub scope: &'a str,
}
pub struct Active;
pub struct Sealed;
pub struct Segment<State> {
pub header: SegmentHeader,
pub path: std::path::PathBuf,
file: Option<std::fs::File>,
written_bytes: u64,
_state: std::marker::PhantomData<State>,
}
#[derive(Debug)]
pub struct CompactionResult {
pub segments_removed: usize,
pub bytes_reclaimed: u64,
}
pub fn frame_encode<T: serde::Serialize>(data: &T) -> Result<Vec<u8>, StoreError> {
let msgpack =
rmp_serde::to_vec_named(data).map_err(|e| StoreError::Serialization(Box::new(e)))?;
let crc = crc32fast::hash(&msgpack);
let len = u32::try_from(msgpack.len()).map_err(|_| StoreError::ser_msg("frame exceeds 4GB"))?;
let mut frame = Vec::with_capacity(8 + msgpack.len());
frame.extend_from_slice(&len.to_be_bytes());
frame.extend_from_slice(&crc.to_be_bytes());
frame.extend_from_slice(&msgpack);
Ok(frame)
}
#[derive(Debug)]
#[non_exhaustive]
pub enum FrameDecodeError {
TooShort,
Truncated {
expected_len: usize,
available: usize,
},
CrcMismatch {
expected: u32,
actual: u32,
},
}
impl std::fmt::Display for FrameDecodeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TooShort => write!(f, "frame too short for header"),
Self::Truncated {
expected_len,
available,
} => {
write!(
f,
"frame truncated: expected {expected_len} bytes, got {available}"
)
}
Self::CrcMismatch { expected, actual } => {
write!(
f,
"CRC mismatch: expected {expected:#010x}, got {actual:#010x}"
)
}
}
}
}
pub fn frame_decode(buf: &[u8]) -> Result<(&[u8], usize), FrameDecodeError> {
if buf.len() < 8 {
return Err(FrameDecodeError::TooShort);
}
let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
let expected_crc = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
if buf.len() < 8 + len {
return Err(FrameDecodeError::Truncated {
expected_len: 8 + len,
available: buf.len(),
});
}
let msgpack = &buf[8..8 + len];
let actual_crc = crc32fast::hash(msgpack);
if actual_crc != expected_crc {
return Err(FrameDecodeError::CrcMismatch {
expected: expected_crc,
actual: actual_crc,
});
}
Ok((msgpack, 8 + len))
}
pub fn segment_filename(segment_id: u64) -> String {
format!("{:06}.{}", segment_id, SEGMENT_EXTENSION)
}
impl Segment<Active> {
pub fn create(dir: &std::path::Path, segment_id: u64) -> Result<Self, StoreError> {
let path = dir.join(segment_filename(segment_id));
let mut file = std::fs::File::create_new(&path).map_err(StoreError::Io)?;
let header = SegmentHeader {
version: 1,
flags: 0,
#[allow(clippy::cast_possible_truncation)] created_ns: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64,
segment_id,
};
file.write_all(SEGMENT_MAGIC).map_err(StoreError::Io)?;
let header_bytes =
rmp_serde::to_vec_named(&header).map_err(|e| StoreError::Serialization(Box::new(e)))?;
#[allow(clippy::cast_possible_truncation)] let header_len = (header_bytes.len() as u32).to_be_bytes();
file.write_all(&header_len).map_err(StoreError::Io)?;
file.write_all(&header_bytes).map_err(StoreError::Io)?;
Ok(Self {
header,
path,
file: Some(file),
written_bytes: (4 + 4 + header_bytes.len()) as u64, _state: std::marker::PhantomData,
})
}
pub fn write_frame(&mut self, frame: &[u8]) -> Result<u64, StoreError> {
let offset = self.written_bytes;
if let Some(ref mut f) = self.file {
f.write_all(frame).map_err(StoreError::Io)?;
}
self.written_bytes += frame.len() as u64;
Ok(offset)
}
pub fn append_frames_from_segment(
&mut self,
path: &std::path::Path,
) -> Result<u64, StoreError> {
let mut source = std::fs::File::open(path).map_err(StoreError::Io)?;
let mut magic = [0u8; 4];
source.read_exact(&mut magic).map_err(StoreError::Io)?;
if &magic != SEGMENT_MAGIC {
return Err(StoreError::corrupt_magic(0));
}
let mut header_len_buf = [0u8; 4];
source
.read_exact(&mut header_len_buf)
.map_err(StoreError::Io)?;
let header_len = u32::from_be_bytes(header_len_buf) as u64;
let frames_start = 8 + header_len;
let file_len = source.seek(SeekFrom::End(0)).map_err(StoreError::Io)?;
let frames_end = detect_sidx_boundary(&mut source, file_len)?.unwrap_or(file_len);
source
.seek(SeekFrom::Start(frames_start))
.map_err(StoreError::Io)?;
let offset = self.written_bytes;
if let Some(ref mut destination) = self.file {
let bytes_to_copy = frames_end.saturating_sub(frames_start);
let copied = std::io::copy(&mut source.take(bytes_to_copy), destination)
.map_err(StoreError::Io)?;
self.written_bytes += copied;
}
Ok(offset)
}
pub fn needs_rotation(&self, max_bytes: u64) -> bool {
self.written_bytes >= max_bytes
}
pub fn sync_with_mode(&mut self, mode: &crate::store::SyncMode) -> Result<(), StoreError> {
if let Some(ref f) = self.file {
match mode {
crate::store::SyncMode::SyncAll => f.sync_all().map_err(StoreError::Io)?,
crate::store::SyncMode::SyncData => f.sync_data().map_err(StoreError::Io)?,
}
}
Ok(())
}
pub(crate) fn write_sidx_footer(
&mut self,
collector: &crate::store::sidx::SidxEntryCollector,
) -> Result<(), StoreError> {
if let Some(ref mut f) = self.file {
collector.write_footer(f)?;
}
Ok(())
}
pub fn seal(mut self) -> Segment<Sealed> {
drop(self.file.take());
Segment {
header: self.header,
path: self.path,
file: None,
written_bytes: self.written_bytes,
_state: std::marker::PhantomData,
}
}
}
fn detect_sidx_boundary<R: Read + Seek>(
source: &mut R,
file_len: u64,
) -> Result<Option<u64>, StoreError> {
const TRAILER_LEN: u64 = 16;
if file_len < TRAILER_LEN {
return Ok(None);
}
source
.seek(SeekFrom::End(-(TRAILER_LEN as i64)))
.map_err(StoreError::Io)?;
let mut trailer = [0u8; 16];
source.read_exact(&mut trailer).map_err(StoreError::Io)?;
if &trailer[12..16] != crate::store::sidx::SIDX_MAGIC {
return Ok(None);
}
let string_table_offset = u64::from_le_bytes([
trailer[0], trailer[1], trailer[2], trailer[3], trailer[4], trailer[5], trailer[6],
trailer[7],
]);
Ok(Some(string_table_offset))
}