// xerv_core/arena/header.rs — arena file header definition and (de)serialization.

use crate::types::{ArenaOffset, TraceId};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Read, Write};

/// Magic number identifying an arena buffer; hex spells ASCII "XERVAREN".
pub const ARENA_MAGIC: u64 = 0x5845_5256_4152_454E;

/// On-disk format version; bump whenever the header layout changes.
pub const ARENA_VERSION: u32 = 1;

/// Size in bytes of the serialized header (fixed 128-byte layout).
pub const HEADER_SIZE: usize = 128;
/// Header describing one arena buffer.
///
/// Serialized to an explicit little-endian, `HEADER_SIZE` (128-byte) layout
/// by `to_bytes` / `from_bytes`; `#[repr(C)]` additionally keeps the
/// in-memory field order predictable.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct ArenaHeader {
    /// Must equal `ARENA_MAGIC`; identifies the buffer as an arena.
    pub magic: u64,
    /// Format version; must equal `ARENA_VERSION`.
    pub version: u32,
    /// Feature/behavior flags (0 for a freshly created arena).
    pub flags: u32,
    /// Trace this arena belongs to (serialized as a 16-byte UUID).
    pub trace_id: TraceId,
    /// Offset of the config region; starts right after the header in `new`.
    pub config_offset: ArenaOffset,
    /// Size of the config region in bytes (0 for a fresh arena).
    pub config_size: u32,
    /// Offset where payload data begins.
    pub data_offset: ArenaOffset,
    /// Current append position; starts equal to `data_offset`.
    pub write_pos: ArenaOffset,
    /// Total arena size in bytes; `validate` requires `write_pos <= capacity`.
    pub capacity: u64,
    /// Creation time, seconds since the Unix epoch (0 if the clock is unavailable).
    pub created_at: u64,
    /// Hash of the payload schema (0 until set by the writer).
    pub schema_hash: u64,
    /// Version of the producing pipeline (0 until set by the writer).
    pub pipeline_version: u32,
    /// Reserved for future use; zero-filled at creation.
    pub _reserved: [u8; 32],
}
50
51impl ArenaHeader {
52 pub fn new(trace_id: TraceId, capacity: u64) -> Self {
54 let now = std::time::SystemTime::now()
55 .duration_since(std::time::UNIX_EPOCH)
56 .map(|d| d.as_secs())
57 .unwrap_or(0);
58
59 Self {
60 magic: ARENA_MAGIC,
61 version: ARENA_VERSION,
62 flags: 0,
63 trace_id,
64 config_offset: ArenaOffset::new(HEADER_SIZE as u64),
65 config_size: 0,
66 data_offset: ArenaOffset::new(HEADER_SIZE as u64),
67 write_pos: ArenaOffset::new(HEADER_SIZE as u64),
68 capacity,
69 created_at: now,
70 schema_hash: 0,
71 pipeline_version: 0,
72 _reserved: [0u8; 32],
73 }
74 }
75
76 pub fn validate(&self) -> Result<(), &'static str> {
78 if self.magic != ARENA_MAGIC {
79 return Err("Invalid magic number");
80 }
81 if self.version != ARENA_VERSION {
82 return Err("Unsupported arena version");
83 }
84 if self.write_pos.as_u64() > self.capacity {
85 return Err("Write position exceeds capacity");
86 }
87 Ok(())
88 }
89
90 pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
92 if bytes.len() < HEADER_SIZE {
93 return Err(io::Error::new(
94 io::ErrorKind::InvalidData,
95 "Buffer too small for header",
96 ));
97 }
98
99 let mut cursor = io::Cursor::new(bytes);
100
101 let magic = cursor.read_u64::<LittleEndian>()?;
102 let version = cursor.read_u32::<LittleEndian>()?;
103 let flags = cursor.read_u32::<LittleEndian>()?;
104
105 let mut uuid_bytes = [0u8; 16];
107 cursor.read_exact(&mut uuid_bytes)?;
108 let trace_id = TraceId::from_uuid(uuid::Uuid::from_bytes(uuid_bytes));
109
110 let config_offset = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
111 let config_size = cursor.read_u32::<LittleEndian>()?;
112 let _padding1 = cursor.read_u32::<LittleEndian>()?; let data_offset = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
114 let write_pos = ArenaOffset::new(cursor.read_u64::<LittleEndian>()?);
115 let capacity = cursor.read_u64::<LittleEndian>()?;
116 let created_at = cursor.read_u64::<LittleEndian>()?;
117 let schema_hash = cursor.read_u64::<LittleEndian>()?;
118 let pipeline_version = cursor.read_u32::<LittleEndian>()?;
119 let _padding2 = cursor.read_u32::<LittleEndian>()?; let mut reserved = [0u8; 32];
122 cursor.read_exact(&mut reserved)?;
123
124 Ok(Self {
125 magic,
126 version,
127 flags,
128 trace_id,
129 config_offset,
130 config_size,
131 data_offset,
132 write_pos,
133 capacity,
134 created_at,
135 schema_hash,
136 pipeline_version,
137 _reserved: reserved,
138 })
139 }
140
141 pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
143 let mut buf = Vec::with_capacity(HEADER_SIZE);
144
145 buf.write_u64::<LittleEndian>(self.magic)?;
146 buf.write_u32::<LittleEndian>(self.version)?;
147 buf.write_u32::<LittleEndian>(self.flags)?;
148
149 buf.write_all(self.trace_id.as_uuid().as_bytes())?;
151
152 buf.write_u64::<LittleEndian>(self.config_offset.as_u64())?;
153 buf.write_u32::<LittleEndian>(self.config_size)?;
154 buf.write_u32::<LittleEndian>(0)?; buf.write_u64::<LittleEndian>(self.data_offset.as_u64())?;
156 buf.write_u64::<LittleEndian>(self.write_pos.as_u64())?;
157 buf.write_u64::<LittleEndian>(self.capacity)?;
158 buf.write_u64::<LittleEndian>(self.created_at)?;
159 buf.write_u64::<LittleEndian>(self.schema_hash)?;
160 buf.write_u32::<LittleEndian>(self.pipeline_version)?;
161 buf.write_u32::<LittleEndian>(0)?; buf.write_all(&self._reserved)?;
164
165 debug_assert_eq!(buf.len(), HEADER_SIZE);
167
168 Ok(buf)
169 }
170
171 pub fn available_space(&self) -> u64 {
173 self.capacity.saturating_sub(self.write_pos.as_u64())
174 }
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    // Serializing then deserializing must preserve every field we set.
    #[test]
    fn header_roundtrip() {
        let trace_id = TraceId::new();
        let original = ArenaHeader::new(trace_id, 1024 * 1024);

        let encoded = original.to_bytes().unwrap();
        assert_eq!(encoded.len(), HEADER_SIZE);

        let decoded = ArenaHeader::from_bytes(&encoded).unwrap();
        assert_eq!(decoded.magic, ARENA_MAGIC);
        assert_eq!(decoded.version, ARENA_VERSION);
        assert_eq!(decoded.trace_id, trace_id);
        assert_eq!(decoded.capacity, 1024 * 1024);
    }

    // validate() accepts a fresh header and rejects corrupted copies.
    #[test]
    fn header_validation() {
        let good = ArenaHeader::new(TraceId::new(), 1024);
        assert!(good.validate().is_ok());

        let mut corrupted = good;
        corrupted.magic = 0xDEADBEEF;
        assert!(corrupted.validate().is_err());

        let mut overrun = good;
        overrun.write_pos = ArenaOffset::new(2048);
        assert!(overrun.validate().is_err());
    }

    // The serialized layout is pinned at 128 bytes.
    #[test]
    fn header_size_is_128() {
        assert_eq!(HEADER_SIZE, 128);
    }
}