xerv_core/arena/
header.rs

1//! Arena header structure.
2
3use crate::types::{ArenaOffset, TraceId};
4use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5use std::io::{self, Read, Write};
6
/// Magic number for XERV arena files.
///
/// The constant's big-endian byte sequence is the 8 ASCII characters
/// "XERVAREN" (0x58 'X', 0x45 'E', 0x52 'R', 0x56 'V', 0x41 'A',
/// 0x52 'R', 0x45 'E', 0x4E 'N') — a u64 holds exactly 8 bytes.
pub const ARENA_MAGIC: u64 = 0x5845_5256_4152_454E; // ASCII "XERVAREN"

/// Current arena format version.
///
/// `ArenaHeader::validate` rejects any header whose version differs,
/// so bump this whenever the serialized layout changes.
pub const ARENA_VERSION: u32 = 1;

/// Fixed size of the arena header in bytes.
///
/// `to_bytes` emits exactly this many bytes (including explicit padding
/// words) and `from_bytes` requires a buffer of at least this length.
pub const HEADER_SIZE: usize = 128;
15
/// Arena file header.
///
/// This is stored at the beginning of every arena file and contains
/// metadata needed to interpret the rest of the file.
///
/// On-disk serialization is explicit (see `to_bytes` / `from_bytes`):
/// every field is written little-endian, with 4-byte padding words after
/// `config_size` and `pipeline_version`, for a fixed total of
/// `HEADER_SIZE` (128) bytes. The `#[repr(C)]` in-memory layout is never
/// copied to disk directly, so adding `derive`s here cannot corrupt files.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct ArenaHeader {
    /// Magic number for file identification. Must equal [`ARENA_MAGIC`].
    pub magic: u64,
    /// Arena format version. Must equal [`ARENA_VERSION`] to be readable.
    pub version: u32,
    /// Flags (reserved for future use; always 0 for headers built by `new`).
    pub flags: u32,
    /// Trace ID that owns this arena (serialized as a 16-byte UUID).
    pub trace_id: TraceId,
    /// Offset to the pipeline configuration data.
    /// `new` places this immediately after the header (`HEADER_SIZE`).
    pub config_offset: ArenaOffset,
    /// Size of the pipeline configuration in bytes (0 until config is written).
    pub config_size: u32,
    /// Offset to the start of the data region.
    pub data_offset: ArenaOffset,
    /// Current write position (end of valid data).
    /// Starts at `HEADER_SIZE` for a fresh arena.
    pub write_pos: ArenaOffset,
    /// Total capacity of the arena file.
    pub capacity: u64,
    /// Creation timestamp (Unix epoch seconds; 0 if the clock read failed).
    pub created_at: u64,
    /// Schema version string hash (for compatibility checking).
    pub schema_hash: u64,
    /// Pipeline version number.
    pub pipeline_version: u32,
    /// Reserved for alignment and future use (written as-is to disk).
    pub _reserved: [u8; 32],
}
50
51impl ArenaHeader {
52    /// Create a new arena header for the given trace.
53    pub fn new(trace_id: TraceId, capacity: u64) -> Self {
54        let now = std::time::SystemTime::now()
55            .duration_since(std::time::UNIX_EPOCH)
56            .map(|d| d.as_secs())
57            .unwrap_or(0);
58
59        Self {
60            magic: ARENA_MAGIC,
61            version: ARENA_VERSION,
62            flags: 0,
63            trace_id,
64            config_offset: ArenaOffset::new(HEADER_SIZE as u64),
65            config_size: 0,
66            data_offset: ArenaOffset::new(HEADER_SIZE as u64),
67            write_pos: ArenaOffset::new(HEADER_SIZE as u64),
68            capacity,
69            created_at: now,
70            schema_hash: 0,
71            pipeline_version: 0,
72            _reserved: [0u8; 32],
73        }
74    }
75
76    /// Validate the header.
77    pub fn validate(&self) -> Result<(), &'static str> {
78        if self.magic != ARENA_MAGIC {
79            return Err("Invalid magic number");
80        }
81        if self.version != ARENA_VERSION {
82            return Err("Unsupported arena version");
83        }
84        if self.write_pos.as_u64() > self.capacity {
85            return Err("Write position exceeds capacity");
86        }
87        Ok(())
88    }
89
90    /// Read header from a byte slice.
91    pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
92        if bytes.len() < HEADER_SIZE {
93            return Err(io::Error::new(
94                io::ErrorKind::InvalidData,
95                "Buffer too small for header",
96            ));
97        }
98
99        let mut cursor = io::Cursor::new(bytes);
100
101        let magic = cursor.read_u64::<LittleEndian>()?;
102        let version = cursor.read_u32::<LittleEndian>()?;
103        let flags = cursor.read_u32::<LittleEndian>()?;
104
105        // Read trace_id (16 bytes UUID)
106        let mut uuid_bytes = [0u8; 16];
107        cursor.read_exact(&mut uuid_bytes)?;
108        let trace_id = TraceId::from_uuid(uuid::Uuid::from_bytes(uuid_bytes));
109
110        let config_offset = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
111        let config_size = cursor.read_u32::<LittleEndian>()?;
112        let _padding1 = cursor.read_u32::<LittleEndian>()?; // alignment padding
113        let data_offset = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
114        let write_pos = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
115        let capacity = cursor.read_u64::<LittleEndian>()?;
116        let created_at = cursor.read_u64::<LittleEndian>()?;
117        let schema_hash = cursor.read_u64::<LittleEndian>()?;
118        let pipeline_version = cursor.read_u32::<LittleEndian>()?;
119        let _padding2 = cursor.read_u32::<LittleEndian>()?; // alignment padding after pipeline_version
120
121        let mut reserved = [0u8; 32];
122        cursor.read_exact(&mut reserved)?;
123
124        Ok(Self {
125            magic,
126            version,
127            flags,
128            trace_id,
129            config_offset,
130            config_size,
131            data_offset,
132            write_pos,
133            capacity,
134            created_at,
135            schema_hash,
136            pipeline_version,
137            _reserved: reserved,
138        })
139    }
140
141    /// Write header to a byte buffer.
142    pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
143        let mut buf = Vec::with_capacity(HEADER_SIZE);
144
145        buf.write_u64::<LittleEndian>(self.magic)?;
146        buf.write_u32::<LittleEndian>(self.version)?;
147        buf.write_u32::<LittleEndian>(self.flags)?;
148
149        // Write trace_id (16 bytes UUID)
150        buf.write_all(self.trace_id.as_uuid().as_bytes())?;
151
152        buf.write_u64::<LittleEndian>(self.config_offset.as_u64())?;
153        buf.write_u32::<LittleEndian>(self.config_size)?;
154        buf.write_u32::<LittleEndian>(0)?; // alignment padding
155        buf.write_u64::<LittleEndian>(self.data_offset.as_u64())?;
156        buf.write_u64::<LittleEndian>(self.write_pos.as_u64())?;
157        buf.write_u64::<LittleEndian>(self.capacity)?;
158        buf.write_u64::<LittleEndian>(self.created_at)?;
159        buf.write_u64::<LittleEndian>(self.schema_hash)?;
160        buf.write_u32::<LittleEndian>(self.pipeline_version)?;
161        buf.write_u32::<LittleEndian>(0)?; // alignment padding after pipeline_version
162
163        buf.write_all(&self._reserved)?;
164
165        // Ensure we wrote exactly HEADER_SIZE bytes
166        debug_assert_eq!(buf.len(), HEADER_SIZE);
167
168        Ok(buf)
169    }
170
171    /// Get available space for data.
172    pub fn available_space(&self) -> u64 {
173        self.capacity.saturating_sub(self.write_pos.as_u64())
174    }
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn header_roundtrip() {
        let trace_id = TraceId::new();
        let header = ArenaHeader::new(trace_id, 1024 * 1024);

        let bytes = header.to_bytes().unwrap();
        assert_eq!(bytes.len(), HEADER_SIZE);

        // Every serialized field must survive the roundtrip; checking only a
        // few would let a field-order bug in (de)serialization slip through.
        let restored = ArenaHeader::from_bytes(&bytes).unwrap();
        assert_eq!(restored.magic, ARENA_MAGIC);
        assert_eq!(restored.version, ARENA_VERSION);
        assert_eq!(restored.flags, header.flags);
        assert_eq!(restored.trace_id, trace_id);
        assert_eq!(restored.config_offset.as_u64(), header.config_offset.as_u64());
        assert_eq!(restored.config_size, header.config_size);
        assert_eq!(restored.data_offset.as_u64(), header.data_offset.as_u64());
        assert_eq!(restored.write_pos.as_u64(), header.write_pos.as_u64());
        assert_eq!(restored.capacity, 1024 * 1024);
        assert_eq!(restored.created_at, header.created_at);
        assert_eq!(restored.schema_hash, header.schema_hash);
        assert_eq!(restored.pipeline_version, header.pipeline_version);
        assert_eq!(restored._reserved, header._reserved);
    }

    #[test]
    fn from_bytes_rejects_short_buffer() {
        let bytes = ArenaHeader::new(TraceId::new(), 1024).to_bytes().unwrap();
        assert!(ArenaHeader::from_bytes(&bytes[..HEADER_SIZE - 1]).is_err());
    }

    #[test]
    fn header_validation() {
        let header = ArenaHeader::new(TraceId::new(), 1024);
        assert!(header.validate().is_ok());

        let mut bad_magic = header;
        bad_magic.magic = 0xDEAD_BEEF;
        assert!(bad_magic.validate().is_err());

        let mut bad_version = header;
        bad_version.version = ARENA_VERSION + 1;
        assert!(bad_version.validate().is_err());

        let mut bad_pos = header;
        bad_pos.write_pos = ArenaOffset::new(2048);
        assert!(bad_pos.validate().is_err());
    }

    #[test]
    fn available_space_accounts_for_header() {
        let header = ArenaHeader::new(TraceId::new(), 1024);
        assert_eq!(header.available_space(), 1024 - HEADER_SIZE as u64);

        // A zero-capacity arena must saturate to 0, not underflow.
        let empty = ArenaHeader::new(TraceId::new(), 0);
        assert_eq!(empty.available_space(), 0);
    }

    #[test]
    fn serialized_size_is_header_size() {
        // Serialize a real header rather than comparing the HEADER_SIZE
        // constant with itself; this pins the actual emitted byte count.
        let header = ArenaHeader::new(TraceId::new(), 1024);
        assert_eq!(header.to_bytes().unwrap().len(), HEADER_SIZE);
    }
}
215}