use crate::types::{ArenaOffset, TraceId};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Read, Write};
pub const ARENA_MAGIC: u64 = 0x5845_5256_4152_454E;
pub const ARENA_VERSION: u32 = 1;
pub const HEADER_SIZE: usize = 128;
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct ArenaHeader {
pub magic: u64,
pub version: u32,
pub flags: u32,
pub trace_id: TraceId,
pub config_offset: ArenaOffset,
pub config_size: u32,
pub data_offset: ArenaOffset,
pub write_pos: ArenaOffset,
pub capacity: u64,
pub created_at: u64,
pub schema_hash: u64,
pub pipeline_version: u32,
pub _reserved: [u8; 32],
}
impl ArenaHeader {
pub fn new(trace_id: TraceId, capacity: u64) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
Self {
magic: ARENA_MAGIC,
version: ARENA_VERSION,
flags: 0,
trace_id,
config_offset: ArenaOffset::new(HEADER_SIZE as u64),
config_size: 0,
data_offset: ArenaOffset::new(HEADER_SIZE as u64),
write_pos: ArenaOffset::new(HEADER_SIZE as u64),
capacity,
created_at: now,
schema_hash: 0,
pipeline_version: 0,
_reserved: [0u8; 32],
}
}
pub fn validate(&self) -> Result<(), &'static str> {
if self.magic != ARENA_MAGIC {
return Err("Invalid magic number");
}
if self.version != ARENA_VERSION {
return Err("Unsupported arena version");
}
if self.write_pos.as_u64() > self.capacity {
return Err("Write position exceeds capacity");
}
Ok(())
}
pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
if bytes.len() < HEADER_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Buffer too small for header",
));
}
let mut cursor = io::Cursor::new(bytes);
let magic = cursor.read_u64::<LittleEndian>()?;
let version = cursor.read_u32::<LittleEndian>()?;
let flags = cursor.read_u32::<LittleEndian>()?;
let mut uuid_bytes = [0u8; 16];
cursor.read_exact(&mut uuid_bytes)?;
let trace_id = TraceId::from_uuid(uuid::Uuid::from_bytes(uuid_bytes));
let config_offset = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
let config_size = cursor.read_u32::<LittleEndian>()?;
let _padding1 = cursor.read_u32::<LittleEndian>()?; let data_offset = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
let write_pos = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
let capacity = cursor.read_u64::<LittleEndian>()?;
let created_at = cursor.read_u64::<LittleEndian>()?;
let schema_hash = cursor.read_u64::<LittleEndian>()?;
let pipeline_version = cursor.read_u32::<LittleEndian>()?;
let _padding2 = cursor.read_u32::<LittleEndian>()?;
let mut reserved = [0u8; 32];
cursor.read_exact(&mut reserved)?;
Ok(Self {
magic,
version,
flags,
trace_id,
config_offset,
config_size,
data_offset,
write_pos,
capacity,
created_at,
schema_hash,
pipeline_version,
_reserved: reserved,
})
}
pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
let mut buf = Vec::with_capacity(HEADER_SIZE);
buf.write_u64::<LittleEndian>(self.magic)?;
buf.write_u32::<LittleEndian>(self.version)?;
buf.write_u32::<LittleEndian>(self.flags)?;
buf.write_all(self.trace_id.as_uuid().as_bytes())?;
buf.write_u64::<LittleEndian>(self.config_offset.as_u64())?;
buf.write_u32::<LittleEndian>(self.config_size)?;
buf.write_u32::<LittleEndian>(0)?; buf.write_u64::<LittleEndian>(self.data_offset.as_u64())?;
buf.write_u64::<LittleEndian>(self.write_pos.as_u64())?;
buf.write_u64::<LittleEndian>(self.capacity)?;
buf.write_u64::<LittleEndian>(self.created_at)?;
buf.write_u64::<LittleEndian>(self.schema_hash)?;
buf.write_u32::<LittleEndian>(self.pipeline_version)?;
buf.write_u32::<LittleEndian>(0)?;
buf.write_all(&self._reserved)?;
debug_assert_eq!(buf.len(), HEADER_SIZE);
Ok(buf)
}
pub fn available_space(&self) -> u64 {
self.capacity.saturating_sub(self.write_pos.as_u64())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn header_roundtrip() {
let trace_id = TraceId::new();
let header = ArenaHeader::new(trace_id, 1024 * 1024);
let bytes = header.to_bytes().unwrap();
assert_eq!(bytes.len(), HEADER_SIZE);
let restored = ArenaHeader::from_bytes(&bytes).unwrap();
assert_eq!(restored.magic, ARENA_MAGIC);
assert_eq!(restored.version, ARENA_VERSION);
assert_eq!(restored.trace_id, trace_id);
assert_eq!(restored.capacity, 1024 * 1024);
}
#[test]
fn header_validation() {
let header = ArenaHeader::new(TraceId::new(), 1024);
assert!(header.validate().is_ok());
let mut bad_magic = header;
bad_magic.magic = 0xDEADBEEF;
assert!(bad_magic.validate().is_err());
let mut bad_pos = header;
bad_pos.write_pos = ArenaOffset::new(2048);
assert!(bad_pos.validate().is_err());
}
#[test]
fn header_size_is_128() {
assert_eq!(HEADER_SIZE, 128);
}
}