1use crate::types::{ArenaOffset, NodeId, TraceId};
4use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5use std::io::{self, Read, Write};
6
/// Size in bytes of the fixed header that precedes every record's payload.
///
/// Despite the name, this is the header size rather than the smallest
/// possible encoded record: the payload that follows always carries
/// additional fixed-width fields.
pub const MIN_RECORD_SIZE: usize = std::mem::size_of::<u32>() // total record length
    + std::mem::size_of::<u32>() // CRC32 of the payload
    + std::mem::size_of::<u8>() // record type tag
    + std::mem::size_of::<[u8; 16]>() // trace id as raw UUID bytes
    + std::mem::size_of::<u32>(); // node id

/// One-byte tag identifying what kind of event a WAL record describes.
///
/// The explicit discriminants are the values written into the record header
/// and matched again when decoding, so existing variants must never be
/// renumbered; new variants have to take the next unused value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    // Trace lifecycle.
    TraceStart = 0,
    // Node lifecycle.
    NodeStart = 1,
    NodeDone = 2,
    NodeError = 3,
    // Terminal and suspended trace states.
    TraceComplete = 4,
    TraceFailed = 5,
    TraceSuspended = 6,
    TraceResumed = 7,
    Checkpoint = 8,
    // Loop bookkeeping.
    LoopIteration = 9,
    LoopExit = 10,
}
37
38impl TryFrom<u8> for WalRecordType {
39 type Error = &'static str;
40
41 fn try_from(value: u8) -> Result<Self, Self::Error> {
42 match value {
43 0 => Ok(Self::TraceStart),
44 1 => Ok(Self::NodeStart),
45 2 => Ok(Self::NodeDone),
46 3 => Ok(Self::NodeError),
47 4 => Ok(Self::TraceComplete),
48 5 => Ok(Self::TraceFailed),
49 6 => Ok(Self::TraceSuspended),
50 7 => Ok(Self::TraceResumed),
51 8 => Ok(Self::Checkpoint),
52 9 => Ok(Self::LoopIteration),
53 10 => Ok(Self::LoopExit),
54 _ => Err("Unknown WAL record type"),
55 }
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct WalRecord {
62 pub record_type: WalRecordType,
64 pub trace_id: TraceId,
66 pub node_id: NodeId,
68 pub timestamp_ns: u64,
70 pub output_offset: ArenaOffset,
72 pub output_size: u32,
74 pub schema_hash: u64,
76 pub error_message: Option<String>,
78 pub iteration: u32,
80 pub metadata: Option<String>,
82}
83
84impl WalRecord {
85 pub fn trace_start(trace_id: TraceId) -> Self {
87 Self {
88 record_type: WalRecordType::TraceStart,
89 trace_id,
90 node_id: NodeId::new(0),
91 timestamp_ns: current_timestamp_ns(),
92 output_offset: ArenaOffset::NULL,
93 output_size: 0,
94 schema_hash: 0,
95 error_message: None,
96 iteration: 0,
97 metadata: None,
98 }
99 }
100
101 pub fn node_start(trace_id: TraceId, node_id: NodeId) -> Self {
103 Self {
104 record_type: WalRecordType::NodeStart,
105 trace_id,
106 node_id,
107 timestamp_ns: current_timestamp_ns(),
108 output_offset: ArenaOffset::NULL,
109 output_size: 0,
110 schema_hash: 0,
111 error_message: None,
112 iteration: 0,
113 metadata: None,
114 }
115 }
116
117 pub fn node_done(
119 trace_id: TraceId,
120 node_id: NodeId,
121 output_offset: ArenaOffset,
122 output_size: u32,
123 schema_hash: u64,
124 ) -> Self {
125 Self {
126 record_type: WalRecordType::NodeDone,
127 trace_id,
128 node_id,
129 timestamp_ns: current_timestamp_ns(),
130 output_offset,
131 output_size,
132 schema_hash,
133 error_message: None,
134 iteration: 0,
135 metadata: None,
136 }
137 }
138
139 pub fn node_error(trace_id: TraceId, node_id: NodeId, error: impl ToString) -> Self {
141 Self {
142 record_type: WalRecordType::NodeError,
143 trace_id,
144 node_id,
145 timestamp_ns: current_timestamp_ns(),
146 output_offset: ArenaOffset::NULL,
147 output_size: 0,
148 schema_hash: 0,
149 error_message: Some(error.to_string()),
150 iteration: 0,
151 metadata: None,
152 }
153 }
154
155 pub fn trace_complete(trace_id: TraceId) -> Self {
157 Self {
158 record_type: WalRecordType::TraceComplete,
159 trace_id,
160 node_id: NodeId::new(0),
161 timestamp_ns: current_timestamp_ns(),
162 output_offset: ArenaOffset::NULL,
163 output_size: 0,
164 schema_hash: 0,
165 error_message: None,
166 iteration: 0,
167 metadata: None,
168 }
169 }
170
171 pub fn trace_failed(trace_id: TraceId, error: impl ToString) -> Self {
173 Self {
174 record_type: WalRecordType::TraceFailed,
175 trace_id,
176 node_id: NodeId::new(0),
177 timestamp_ns: current_timestamp_ns(),
178 output_offset: ArenaOffset::NULL,
179 output_size: 0,
180 schema_hash: 0,
181 error_message: Some(error.to_string()),
182 iteration: 0,
183 metadata: None,
184 }
185 }
186
187 pub fn trace_suspended(trace_id: TraceId, node_id: NodeId) -> Self {
189 Self {
190 record_type: WalRecordType::TraceSuspended,
191 trace_id,
192 node_id,
193 timestamp_ns: current_timestamp_ns(),
194 output_offset: ArenaOffset::NULL,
195 output_size: 0,
196 schema_hash: 0,
197 error_message: None,
198 iteration: 0,
199 metadata: None,
200 }
201 }
202
203 pub fn trace_resumed(trace_id: TraceId) -> Self {
205 Self {
206 record_type: WalRecordType::TraceResumed,
207 trace_id,
208 node_id: NodeId::new(0),
209 timestamp_ns: current_timestamp_ns(),
210 output_offset: ArenaOffset::NULL,
211 output_size: 0,
212 schema_hash: 0,
213 error_message: None,
214 iteration: 0,
215 metadata: None,
216 }
217 }
218
219 pub fn loop_iteration(trace_id: TraceId, node_id: NodeId, iteration: u32) -> Self {
221 Self {
222 record_type: WalRecordType::LoopIteration,
223 trace_id,
224 node_id,
225 timestamp_ns: current_timestamp_ns(),
226 output_offset: ArenaOffset::NULL,
227 output_size: 0,
228 schema_hash: 0,
229 error_message: None,
230 iteration,
231 metadata: None,
232 }
233 }
234
235 pub fn with_metadata(mut self, metadata: impl ToString) -> Self {
237 self.metadata = Some(metadata.to_string());
238 self
239 }
240
241 pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
243 let mut payload = Vec::new();
244
245 payload.write_u64::<LittleEndian>(self.timestamp_ns)?;
247 payload.write_u64::<LittleEndian>(self.output_offset.as_u64())?;
248 payload.write_u32::<LittleEndian>(self.output_size)?;
249 payload.write_u64::<LittleEndian>(self.schema_hash)?;
250 payload.write_u32::<LittleEndian>(self.iteration)?;
251
252 if let Some(ref msg) = self.error_message {
254 let bytes = msg.as_bytes();
255 payload.write_u32::<LittleEndian>(bytes.len() as u32)?;
256 payload.write_all(bytes)?;
257 } else {
258 payload.write_u32::<LittleEndian>(0)?;
259 }
260
261 if let Some(ref meta) = self.metadata {
263 let bytes = meta.as_bytes();
264 payload.write_u32::<LittleEndian>(bytes.len() as u32)?;
265 payload.write_all(bytes)?;
266 } else {
267 payload.write_u32::<LittleEndian>(0)?;
268 }
269
270 let crc = crc32fast::hash(&payload);
272
273 let total_len = MIN_RECORD_SIZE + payload.len();
275 let mut record = Vec::with_capacity(total_len);
276
277 record.write_u32::<LittleEndian>(total_len as u32)?;
278 record.write_u32::<LittleEndian>(crc)?;
279 record.write_u8(self.record_type as u8)?;
280 record.write_all(self.trace_id.as_uuid().as_bytes())?;
281 record.write_u32::<LittleEndian>(self.node_id.as_u32())?;
282 record.write_all(&payload)?;
283
284 Ok(record)
285 }
286
287 pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
289 if bytes.len() < MIN_RECORD_SIZE {
290 return Err(io::Error::new(
291 io::ErrorKind::InvalidData,
292 "Record too small",
293 ));
294 }
295
296 let mut cursor = io::Cursor::new(bytes);
297
298 let total_len = cursor.read_u32::<LittleEndian>()? as usize;
299 let stored_crc = cursor.read_u32::<LittleEndian>()?;
300 let record_type_byte = cursor.read_u8()?;
301
302 let record_type = WalRecordType::try_from(record_type_byte)
303 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
304
305 let mut uuid_bytes = [0u8; 16];
306 cursor.read_exact(&mut uuid_bytes)?;
307 let trace_id = TraceId::from_uuid(uuid::Uuid::from_bytes(uuid_bytes));
308
309 let node_id = NodeId::new(cursor.read_u32::<LittleEndian>()?);
310
311 let payload_start = MIN_RECORD_SIZE;
313 let payload_end = total_len;
314
315 if bytes.len() < payload_end {
316 return Err(io::Error::new(
317 io::ErrorKind::InvalidData,
318 format!(
319 "Record truncated: expected {} bytes, got {}",
320 total_len,
321 bytes.len()
322 ),
323 ));
324 }
325
326 let payload = &bytes[payload_start..payload_end];
327
328 let computed_crc = crc32fast::hash(payload);
330 if computed_crc != stored_crc {
331 return Err(io::Error::new(
332 io::ErrorKind::InvalidData,
333 format!(
334 "CRC mismatch: expected {}, got {}",
335 stored_crc, computed_crc
336 ),
337 ));
338 }
339
340 let mut payload_cursor = io::Cursor::new(payload);
342
343 let timestamp_ns = payload_cursor.read_u64::<LittleEndian>()?;
344 let output_offset = ArenaOffset::new(payload_cursor.read_u64::<LittleEndian>()?);
345 let output_size = payload_cursor.read_u32::<LittleEndian>()?;
346 let schema_hash = payload_cursor.read_u64::<LittleEndian>()?;
347 let iteration = payload_cursor.read_u32::<LittleEndian>()?;
348
349 let error_len = payload_cursor.read_u32::<LittleEndian>()? as usize;
351 let error_message = if error_len > 0 {
352 let mut buf = vec![0u8; error_len];
353 payload_cursor.read_exact(&mut buf)?;
354 Some(String::from_utf8_lossy(&buf).into_owned())
355 } else {
356 None
357 };
358
359 let meta_len = payload_cursor.read_u32::<LittleEndian>()? as usize;
361 let metadata = if meta_len > 0 {
362 let mut buf = vec![0u8; meta_len];
363 payload_cursor.read_exact(&mut buf)?;
364 Some(String::from_utf8_lossy(&buf).into_owned())
365 } else {
366 None
367 };
368
369 Ok(Self {
370 record_type,
371 trace_id,
372 node_id,
373 timestamp_ns,
374 output_offset,
375 output_size,
376 schema_hash,
377 error_message,
378 iteration,
379 metadata,
380 })
381 }
382
383 pub fn serialized_size(&self) -> usize {
385 let mut size = MIN_RECORD_SIZE;
386 size += 8 + 8 + 4 + 8 + 4; size += 4; if let Some(ref msg) = self.error_message {
389 size += msg.len();
390 }
391 size += 4; if let Some(ref meta) = self.metadata {
393 size += meta.len();
394 }
395 size
396 }
397}
398
/// Returns the system clock reading as nanoseconds since the Unix epoch.
///
/// Yields `0` if the clock reports a time before the epoch. The nanosecond
/// count is narrowed from `u128` to `u64` with a plain cast.
fn current_timestamp_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `record` and decodes the result, panicking if either step fails.
    fn roundtrip(record: &WalRecord) -> WalRecord {
        let encoded = record.to_bytes().unwrap();
        WalRecord::from_bytes(&encoded).unwrap()
    }

    /// A `NodeDone` record keeps its identifying and output fields across a
    /// serialize/deserialize cycle.
    #[test]
    fn record_roundtrip() {
        let trace_id = TraceId::new();
        let node_id = NodeId::new(42);

        let restored = roundtrip(&WalRecord::node_done(
            trace_id,
            node_id,
            ArenaOffset::new(0x1000),
            256,
            0xDEADBEEF,
        ));

        assert_eq!(restored.record_type, WalRecordType::NodeDone);
        assert_eq!(restored.trace_id, trace_id);
        assert_eq!(restored.node_id, node_id);
        assert_eq!(restored.output_offset, ArenaOffset::new(0x1000));
        assert_eq!(restored.output_size, 256);
        assert_eq!(restored.schema_hash, 0xDEADBEEF);
    }

    /// The error text of a `NodeError` record survives encoding.
    #[test]
    fn record_with_error() {
        let trace_id = TraceId::new();
        let node_id = NodeId::new(1);

        let restored = roundtrip(&WalRecord::node_error(
            trace_id,
            node_id,
            "Something went wrong",
        ));

        assert_eq!(restored.record_type, WalRecordType::NodeError);
        assert_eq!(
            restored.error_message.as_deref(),
            Some("Something went wrong")
        );
    }

    /// Metadata attached with `with_metadata` survives encoding.
    #[test]
    fn record_with_metadata() {
        let trace_id = TraceId::new();
        let record =
            WalRecord::trace_start(trace_id).with_metadata(r#"{"pipeline": "order_processing"}"#);

        let restored = roundtrip(&record);

        assert_eq!(
            restored.metadata.as_deref(),
            Some(r#"{"pipeline": "order_processing"}"#)
        );
    }

    /// Flipping the first payload byte must make decoding fail.
    #[test]
    fn crc_verification() {
        let mut bytes = WalRecord::trace_start(TraceId::new()).to_bytes().unwrap();

        // The payload begins immediately after the fixed-size header.
        if let Some(first_payload_byte) = bytes.get_mut(MIN_RECORD_SIZE) {
            *first_payload_byte ^= 0xFF;
        }

        assert!(WalRecord::from_bytes(&bytes).is_err());
    }
}