xerv_core/wal/
record.rs

1//! WAL record types and serialization.
2
3use crate::types::{ArenaOffset, NodeId, TraceId};
4use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5use std::io::{self, Read, Write};
6
/// Minimum record size (header without payload).
///
/// The fixed header that opens every serialized record is, in order:
/// total record length (`u32`, 4 bytes), CRC32 of the payload (`u32`, 4 bytes),
/// record type (`u8`, 1 byte), trace id (UUID, 16 bytes) and node id
/// (`u32`, 4 bytes). Multi-byte integers are little-endian.
///
/// Payload bytes start at this offset within a serialized record.
pub const MIN_RECORD_SIZE: usize = 4 + 4 + 1 + 16 + 4; // 29 bytes
9
/// Kind of event a WAL record describes.
///
/// The explicit discriminant of each variant is the single byte stored in the
/// record header, so existing values must never be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    /// A trace began.
    TraceStart = 0,
    /// A node began executing.
    NodeStart = 1,
    /// A node finished executing successfully.
    NodeDone = 2,
    /// A node's execution failed.
    NodeError = 3,
    /// A trace finished successfully.
    TraceComplete = 4,
    /// A trace failed.
    TraceFailed = 5,
    /// A trace was suspended (e.g., at a wait node).
    TraceSuspended = 6,
    /// A trace was resumed.
    TraceResumed = 7,
    /// Checkpoint marker.
    Checkpoint = 8,
    /// A loop iteration began.
    LoopIteration = 9,
    /// A loop was exited.
    LoopExit = 10,
}

/// Decodes the header type byte back into a [`WalRecordType`].
impl TryFrom<u8> for WalRecordType {
    type Error = &'static str;

    /// Map `value` to the variant with that discriminant.
    ///
    /// # Errors
    ///
    /// Returns `"Unknown WAL record type"` for any byte that is not the
    /// discriminant of a variant.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        let decoded = match value {
            0 => Self::TraceStart,
            1 => Self::NodeStart,
            2 => Self::NodeDone,
            3 => Self::NodeError,
            4 => Self::TraceComplete,
            5 => Self::TraceFailed,
            6 => Self::TraceSuspended,
            7 => Self::TraceResumed,
            8 => Self::Checkpoint,
            9 => Self::LoopIteration,
            10 => Self::LoopExit,
            _ => return Err("Unknown WAL record type"),
        };
        Ok(decoded)
    }
}
58
/// A single WAL record.
///
/// Every record carries the full set of fields below regardless of its type;
/// fields that do not apply to a given record type hold a neutral value
/// (`0`, `ArenaOffset::NULL` or `None`) as set by the constructors.
#[derive(Debug, Clone)]
pub struct WalRecord {
    /// Type of this record.
    pub record_type: WalRecordType,
    /// The trace this record belongs to.
    pub trace_id: TraceId,
    /// The node this record is about (if applicable).
    ///
    /// Trace-level constructors store `NodeId::new(0)` here.
    pub node_id: NodeId,
    /// Timestamp (Unix epoch nanoseconds).
    ///
    /// Constructors fill this from the system clock at creation time.
    pub timestamp_ns: u64,
    /// Arena offset for output data (if applicable).
    ///
    /// `ArenaOffset::NULL` when the record has no output.
    pub output_offset: ArenaOffset,
    /// Output size in bytes.
    pub output_size: u32,
    /// Schema version hash.
    pub schema_hash: u64,
    /// Error message (if applicable).
    ///
    /// Serialized with a length prefix where length 0 means "absent", so
    /// `Some("")` is read back as `None`.
    pub error_message: Option<String>,
    /// Loop iteration number (for loop records).
    pub iteration: u32,
    /// Additional metadata (JSON-encoded).
    ///
    /// Stored and serialized as an opaque string; this type does not parse or
    /// validate it. As with `error_message`, an empty string is read back as
    /// `None`.
    pub metadata: Option<String>,
}
83
84impl WalRecord {
85    /// Create a trace start record.
86    pub fn trace_start(trace_id: TraceId) -> Self {
87        Self {
88            record_type: WalRecordType::TraceStart,
89            trace_id,
90            node_id: NodeId::new(0),
91            timestamp_ns: current_timestamp_ns(),
92            output_offset: ArenaOffset::NULL,
93            output_size: 0,
94            schema_hash: 0,
95            error_message: None,
96            iteration: 0,
97            metadata: None,
98        }
99    }
100
101    /// Create a node start record.
102    pub fn node_start(trace_id: TraceId, node_id: NodeId) -> Self {
103        Self {
104            record_type: WalRecordType::NodeStart,
105            trace_id,
106            node_id,
107            timestamp_ns: current_timestamp_ns(),
108            output_offset: ArenaOffset::NULL,
109            output_size: 0,
110            schema_hash: 0,
111            error_message: None,
112            iteration: 0,
113            metadata: None,
114        }
115    }
116
117    /// Create a node done record.
118    pub fn node_done(
119        trace_id: TraceId,
120        node_id: NodeId,
121        output_offset: ArenaOffset,
122        output_size: u32,
123        schema_hash: u64,
124    ) -> Self {
125        Self {
126            record_type: WalRecordType::NodeDone,
127            trace_id,
128            node_id,
129            timestamp_ns: current_timestamp_ns(),
130            output_offset,
131            output_size,
132            schema_hash,
133            error_message: None,
134            iteration: 0,
135            metadata: None,
136        }
137    }
138
139    /// Create a node error record.
140    pub fn node_error(trace_id: TraceId, node_id: NodeId, error: impl ToString) -> Self {
141        Self {
142            record_type: WalRecordType::NodeError,
143            trace_id,
144            node_id,
145            timestamp_ns: current_timestamp_ns(),
146            output_offset: ArenaOffset::NULL,
147            output_size: 0,
148            schema_hash: 0,
149            error_message: Some(error.to_string()),
150            iteration: 0,
151            metadata: None,
152        }
153    }
154
155    /// Create a trace complete record.
156    pub fn trace_complete(trace_id: TraceId) -> Self {
157        Self {
158            record_type: WalRecordType::TraceComplete,
159            trace_id,
160            node_id: NodeId::new(0),
161            timestamp_ns: current_timestamp_ns(),
162            output_offset: ArenaOffset::NULL,
163            output_size: 0,
164            schema_hash: 0,
165            error_message: None,
166            iteration: 0,
167            metadata: None,
168        }
169    }
170
171    /// Create a trace failed record.
172    pub fn trace_failed(trace_id: TraceId, error: impl ToString) -> Self {
173        Self {
174            record_type: WalRecordType::TraceFailed,
175            trace_id,
176            node_id: NodeId::new(0),
177            timestamp_ns: current_timestamp_ns(),
178            output_offset: ArenaOffset::NULL,
179            output_size: 0,
180            schema_hash: 0,
181            error_message: Some(error.to_string()),
182            iteration: 0,
183            metadata: None,
184        }
185    }
186
187    /// Create a trace suspended record.
188    pub fn trace_suspended(trace_id: TraceId, node_id: NodeId) -> Self {
189        Self {
190            record_type: WalRecordType::TraceSuspended,
191            trace_id,
192            node_id,
193            timestamp_ns: current_timestamp_ns(),
194            output_offset: ArenaOffset::NULL,
195            output_size: 0,
196            schema_hash: 0,
197            error_message: None,
198            iteration: 0,
199            metadata: None,
200        }
201    }
202
203    /// Create a trace resumed record (for crash recovery or wait resumption).
204    pub fn trace_resumed(trace_id: TraceId) -> Self {
205        Self {
206            record_type: WalRecordType::TraceResumed,
207            trace_id,
208            node_id: NodeId::new(0),
209            timestamp_ns: current_timestamp_ns(),
210            output_offset: ArenaOffset::NULL,
211            output_size: 0,
212            schema_hash: 0,
213            error_message: None,
214            iteration: 0,
215            metadata: None,
216        }
217    }
218
219    /// Create a loop iteration record.
220    pub fn loop_iteration(trace_id: TraceId, node_id: NodeId, iteration: u32) -> Self {
221        Self {
222            record_type: WalRecordType::LoopIteration,
223            trace_id,
224            node_id,
225            timestamp_ns: current_timestamp_ns(),
226            output_offset: ArenaOffset::NULL,
227            output_size: 0,
228            schema_hash: 0,
229            error_message: None,
230            iteration,
231            metadata: None,
232        }
233    }
234
235    /// Set metadata on this record.
236    pub fn with_metadata(mut self, metadata: impl ToString) -> Self {
237        self.metadata = Some(metadata.to_string());
238        self
239    }
240
241    /// Serialize the record to bytes.
242    pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
243        let mut payload = Vec::new();
244
245        // Write fixed fields
246        payload.write_u64::<LittleEndian>(self.timestamp_ns)?;
247        payload.write_u64::<LittleEndian>(self.output_offset.as_u64())?;
248        payload.write_u32::<LittleEndian>(self.output_size)?;
249        payload.write_u64::<LittleEndian>(self.schema_hash)?;
250        payload.write_u32::<LittleEndian>(self.iteration)?;
251
252        // Write error message (length-prefixed)
253        if let Some(ref msg) = self.error_message {
254            let bytes = msg.as_bytes();
255            payload.write_u32::<LittleEndian>(bytes.len() as u32)?;
256            payload.write_all(bytes)?;
257        } else {
258            payload.write_u32::<LittleEndian>(0)?;
259        }
260
261        // Write metadata (length-prefixed)
262        if let Some(ref meta) = self.metadata {
263            let bytes = meta.as_bytes();
264            payload.write_u32::<LittleEndian>(bytes.len() as u32)?;
265            payload.write_all(bytes)?;
266        } else {
267            payload.write_u32::<LittleEndian>(0)?;
268        }
269
270        // Calculate CRC32 of the payload
271        let crc = crc32fast::hash(&payload);
272
273        // Build final record
274        let total_len = MIN_RECORD_SIZE + payload.len();
275        let mut record = Vec::with_capacity(total_len);
276
277        record.write_u32::<LittleEndian>(total_len as u32)?;
278        record.write_u32::<LittleEndian>(crc)?;
279        record.write_u8(self.record_type as u8)?;
280        record.write_all(self.trace_id.as_uuid().as_bytes())?;
281        record.write_u32::<LittleEndian>(self.node_id.as_u32())?;
282        record.write_all(&payload)?;
283
284        Ok(record)
285    }
286
287    /// Deserialize a record from bytes.
288    pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
289        if bytes.len() < MIN_RECORD_SIZE {
290            return Err(io::Error::new(
291                io::ErrorKind::InvalidData,
292                "Record too small",
293            ));
294        }
295
296        let mut cursor = io::Cursor::new(bytes);
297
298        let total_len = cursor.read_u32::<LittleEndian>()? as usize;
299        let stored_crc = cursor.read_u32::<LittleEndian>()?;
300        let record_type_byte = cursor.read_u8()?;
301
302        let record_type = WalRecordType::try_from(record_type_byte)
303            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
304
305        let mut uuid_bytes = [0u8; 16];
306        cursor.read_exact(&mut uuid_bytes)?;
307        let trace_id = TraceId::from_uuid(uuid::Uuid::from_bytes(uuid_bytes));
308
309        let node_id = NodeId::new(cursor.read_u32::<LittleEndian>()?);
310
311        // Read payload
312        let payload_start = MIN_RECORD_SIZE;
313        let payload_end = total_len;
314
315        if bytes.len() < payload_end {
316            return Err(io::Error::new(
317                io::ErrorKind::InvalidData,
318                format!(
319                    "Record truncated: expected {} bytes, got {}",
320                    total_len,
321                    bytes.len()
322                ),
323            ));
324        }
325
326        let payload = &bytes[payload_start..payload_end];
327
328        // Verify CRC
329        let computed_crc = crc32fast::hash(payload);
330        if computed_crc != stored_crc {
331            return Err(io::Error::new(
332                io::ErrorKind::InvalidData,
333                format!(
334                    "CRC mismatch: expected {}, got {}",
335                    stored_crc, computed_crc
336                ),
337            ));
338        }
339
340        // Parse payload
341        let mut payload_cursor = io::Cursor::new(payload);
342
343        let timestamp_ns = payload_cursor.read_u64::<LittleEndian>()?;
344        let output_offset = ArenaOffset::new(payload_cursor.read_u64::<LittleEndian>()?);
345        let output_size = payload_cursor.read_u32::<LittleEndian>()?;
346        let schema_hash = payload_cursor.read_u64::<LittleEndian>()?;
347        let iteration = payload_cursor.read_u32::<LittleEndian>()?;
348
349        // Read error message
350        let error_len = payload_cursor.read_u32::<LittleEndian>()? as usize;
351        let error_message = if error_len > 0 {
352            let mut buf = vec![0u8; error_len];
353            payload_cursor.read_exact(&mut buf)?;
354            Some(String::from_utf8_lossy(&buf).into_owned())
355        } else {
356            None
357        };
358
359        // Read metadata
360        let meta_len = payload_cursor.read_u32::<LittleEndian>()? as usize;
361        let metadata = if meta_len > 0 {
362            let mut buf = vec![0u8; meta_len];
363            payload_cursor.read_exact(&mut buf)?;
364            Some(String::from_utf8_lossy(&buf).into_owned())
365        } else {
366            None
367        };
368
369        Ok(Self {
370            record_type,
371            trace_id,
372            node_id,
373            timestamp_ns,
374            output_offset,
375            output_size,
376            schema_hash,
377            error_message,
378            iteration,
379            metadata,
380        })
381    }
382
383    /// Get the total serialized size of this record.
384    pub fn serialized_size(&self) -> usize {
385        let mut size = MIN_RECORD_SIZE;
386        size += 8 + 8 + 4 + 8 + 4; // Fixed payload fields
387        size += 4; // Error message length
388        if let Some(ref msg) = self.error_message {
389            size += msg.len();
390        }
391        size += 4; // Metadata length
392        if let Some(ref meta) = self.metadata {
393            size += meta.len();
394        }
395        size
396    }
397}
398
/// Nanoseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields 0 when the system clock reads earlier than the epoch. The
/// nanosecond count is narrowed to `u64` with an `as` cast.
fn current_timestamp_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Every field of a node-done record survives a serialize/deserialize
    /// cycle, and the encoded length matches `serialized_size`.
    #[test]
    fn record_roundtrip() {
        let trace_id = TraceId::new();
        let node_id = NodeId::new(42);

        let record =
            WalRecord::node_done(trace_id, node_id, ArenaOffset::new(0x1000), 256, 0xDEADBEEF);

        let bytes = record.to_bytes().unwrap();
        assert_eq!(bytes.len(), record.serialized_size());

        let restored = WalRecord::from_bytes(&bytes).unwrap();

        assert_eq!(restored.record_type, WalRecordType::NodeDone);
        assert_eq!(restored.trace_id, trace_id);
        assert_eq!(restored.node_id, node_id);
        assert_eq!(restored.timestamp_ns, record.timestamp_ns);
        assert_eq!(restored.output_offset, ArenaOffset::new(0x1000));
        assert_eq!(restored.output_size, 256);
        assert_eq!(restored.schema_hash, 0xDEADBEEF);
        assert_eq!(restored.iteration, 0);
        assert!(restored.error_message.is_none());
        assert!(restored.metadata.is_none());
    }

    /// The error message of a node-error record is preserved.
    #[test]
    fn record_with_error() {
        let trace_id = TraceId::new();
        let node_id = NodeId::new(1);

        let record = WalRecord::node_error(trace_id, node_id, "Something went wrong");

        let bytes = record.to_bytes().unwrap();
        assert_eq!(bytes.len(), record.serialized_size());

        let restored = WalRecord::from_bytes(&bytes).unwrap();

        assert_eq!(restored.record_type, WalRecordType::NodeError);
        assert_eq!(
            restored.error_message.as_deref(),
            Some("Something went wrong")
        );
    }

    /// Metadata attached with `with_metadata` is preserved.
    #[test]
    fn record_with_metadata() {
        let trace_id = TraceId::new();

        let record =
            WalRecord::trace_start(trace_id).with_metadata(r#"{"pipeline": "order_processing"}"#);

        let bytes = record.to_bytes().unwrap();
        assert_eq!(bytes.len(), record.serialized_size());

        let restored = WalRecord::from_bytes(&bytes).unwrap();

        assert_eq!(
            restored.metadata.as_deref(),
            Some(r#"{"pipeline": "order_processing"}"#)
        );
    }

    /// The iteration counter of a loop record is preserved.
    #[test]
    fn loop_iteration_roundtrip() {
        let trace_id = TraceId::new();
        let node_id = NodeId::new(7);

        let record = WalRecord::loop_iteration(trace_id, node_id, 3);

        let bytes = record.to_bytes().unwrap();
        let restored = WalRecord::from_bytes(&bytes).unwrap();

        assert_eq!(restored.record_type, WalRecordType::LoopIteration);
        assert_eq!(restored.node_id, node_id);
        assert_eq!(restored.iteration, 3);
    }

    /// Flipping a payload byte is caught by the CRC check.
    #[test]
    fn crc_verification() {
        let record = WalRecord::trace_start(TraceId::new());
        let mut bytes = record.to_bytes().unwrap();

        // The untouched record must decode, so the failure below can only be
        // caused by the corruption.
        assert!(WalRecord::from_bytes(&bytes).is_ok());
        assert!(bytes.len() > MIN_RECORD_SIZE);

        // Corrupt the first byte of the payload
        bytes[MIN_RECORD_SIZE] ^= 0xFF;

        // Should fail CRC check
        assert!(WalRecord::from_bytes(&bytes).is_err());
    }

    /// Inputs shorter than the record they claim to hold are rejected.
    #[test]
    fn truncated_record_is_rejected() {
        let bytes = WalRecord::trace_start(TraceId::new()).to_bytes().unwrap();

        assert!(WalRecord::from_bytes(&bytes[..bytes.len() - 1]).is_err());
        assert!(WalRecord::from_bytes(&bytes[..MIN_RECORD_SIZE - 1]).is_err());
        assert!(WalRecord::from_bytes(&[]).is_err());
    }

    /// A header whose type byte is not a known discriminant is rejected.
    #[test]
    fn unknown_record_type_is_rejected() {
        let mut bytes = WalRecord::trace_start(TraceId::new()).to_bytes().unwrap();

        // Offset 8 is the record type: it follows the u32 length and u32 CRC.
        bytes[8] = 0xFF;

        assert!(WalRecord::from_bytes(&bytes).is_err());
    }
}
476}