// xerv-core 0.1.0
// Workflow orchestration core: memory-mapped arena, write-ahead log, traits, and type system.
//! WAL record types and serialization.

use crate::types::{ArenaOffset, NodeId, TraceId};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Read, Write};

/// Size in bytes of the fixed header that precedes every record payload.
///
/// A record can never be shorter than this. The header is laid out as:
/// total length (`u32`), payload CRC32 (`u32`), record type (`u8`),
/// trace id (16 UUID bytes) and node id (`u32`), i.e. 29 bytes.
pub const MIN_RECORD_SIZE: usize = std::mem::size_of::<u32>() // total record length
    + std::mem::size_of::<u32>() // CRC32 of the payload
    + std::mem::size_of::<u8>() // record type tag
    + std::mem::size_of::<[u8; 16]>() // trace id (raw UUID bytes)
    + std::mem::size_of::<u32>(); // node id

/// Kind of event a WAL record describes.
///
/// The enum is `#[repr(u8)]` and every variant has an explicit discriminant;
/// that byte is what `WalRecord::to_bytes` stores in the record header, so the
/// numeric values must stay stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    /// A trace began executing.
    TraceStart = 0,
    /// A node began executing.
    NodeStart = 1,
    /// A node finished successfully.
    NodeDone = 2,
    /// A node finished with an error.
    NodeError = 3,
    /// The whole trace finished successfully.
    TraceComplete = 4,
    /// The whole trace failed.
    TraceFailed = 5,
    /// The trace was suspended (e.g., at a wait node).
    TraceSuspended = 6,
    /// A previously suspended or interrupted trace was resumed.
    TraceResumed = 7,
    /// Checkpoint marker.
    Checkpoint = 8,
    /// A loop iteration began.
    LoopIteration = 9,
    /// A loop was exited.
    LoopExit = 10,
}

impl TryFrom<u8> for WalRecordType {
    type Error = &'static str;

    /// Map a raw tag byte back to its record type.
    ///
    /// Returns `Err("Unknown WAL record type")` for any byte that is not the
    /// discriminant of a variant.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Every variant, looked up by its actual discriminant so the table
        // order carries no meaning of its own.
        const KNOWN: [WalRecordType; 11] = [
            WalRecordType::TraceStart,
            WalRecordType::NodeStart,
            WalRecordType::NodeDone,
            WalRecordType::NodeError,
            WalRecordType::TraceComplete,
            WalRecordType::TraceFailed,
            WalRecordType::TraceSuspended,
            WalRecordType::TraceResumed,
            WalRecordType::Checkpoint,
            WalRecordType::LoopIteration,
            WalRecordType::LoopExit,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|kind| *kind as u8 == value)
            .ok_or("Unknown WAL record type")
    }
}

/// A single WAL record.
///
/// On disk (see `to_bytes` / `from_bytes`) `record_type`, `trace_id` and
/// `node_id` live in the fixed header; every other field is part of the
/// CRC-protected payload.
#[derive(Debug, Clone)]
pub struct WalRecord {
    /// Type of this record; serialized as a single tag byte.
    pub record_type: WalRecordType,
    /// The trace this record belongs to.
    pub trace_id: TraceId,
    /// The node this record is about (if applicable).
    ///
    /// The constructors for trace-level records set this to `NodeId::new(0)`.
    pub node_id: NodeId,
    /// Timestamp (Unix epoch nanoseconds).
    ///
    /// Filled in by the constructors from the system clock; 0 if the clock
    /// reports a time before the Unix epoch.
    pub timestamp_ns: u64,
    /// Arena offset for output data (if applicable).
    ///
    /// `ArenaOffset::NULL` in records created without output.
    pub output_offset: ArenaOffset,
    /// Output size in bytes.
    pub output_size: u32,
    /// Schema version hash.
    pub schema_hash: u64,
    /// Error message (if applicable).
    ///
    /// Serialized with a length prefix where length 0 means "absent", so
    /// `Some("")` is read back as `None`.
    pub error_message: Option<String>,
    /// Loop iteration number (for loop records).
    pub iteration: u32,
    /// Additional metadata (JSON-encoded).
    ///
    /// Stored and restored as an opaque string; nothing in this type parses
    /// or validates it. As with `error_message`, an empty string is read back
    /// as `None`.
    pub metadata: Option<String>,
}

impl WalRecord {
    /// Create a trace start record.
    pub fn trace_start(trace_id: TraceId) -> Self {
        Self {
            record_type: WalRecordType::TraceStart,
            trace_id,
            node_id: NodeId::new(0),
            timestamp_ns: current_timestamp_ns(),
            output_offset: ArenaOffset::NULL,
            output_size: 0,
            schema_hash: 0,
            error_message: None,
            iteration: 0,
            metadata: None,
        }
    }

    /// Create a node start record.
    pub fn node_start(trace_id: TraceId, node_id: NodeId) -> Self {
        Self {
            record_type: WalRecordType::NodeStart,
            trace_id,
            node_id,
            timestamp_ns: current_timestamp_ns(),
            output_offset: ArenaOffset::NULL,
            output_size: 0,
            schema_hash: 0,
            error_message: None,
            iteration: 0,
            metadata: None,
        }
    }

    /// Create a node done record.
    pub fn node_done(
        trace_id: TraceId,
        node_id: NodeId,
        output_offset: ArenaOffset,
        output_size: u32,
        schema_hash: u64,
    ) -> Self {
        Self {
            record_type: WalRecordType::NodeDone,
            trace_id,
            node_id,
            timestamp_ns: current_timestamp_ns(),
            output_offset,
            output_size,
            schema_hash,
            error_message: None,
            iteration: 0,
            metadata: None,
        }
    }

    /// Create a node error record.
    pub fn node_error(trace_id: TraceId, node_id: NodeId, error: impl ToString) -> Self {
        Self {
            record_type: WalRecordType::NodeError,
            trace_id,
            node_id,
            timestamp_ns: current_timestamp_ns(),
            output_offset: ArenaOffset::NULL,
            output_size: 0,
            schema_hash: 0,
            error_message: Some(error.to_string()),
            iteration: 0,
            metadata: None,
        }
    }

    /// Create a trace complete record.
    pub fn trace_complete(trace_id: TraceId) -> Self {
        Self {
            record_type: WalRecordType::TraceComplete,
            trace_id,
            node_id: NodeId::new(0),
            timestamp_ns: current_timestamp_ns(),
            output_offset: ArenaOffset::NULL,
            output_size: 0,
            schema_hash: 0,
            error_message: None,
            iteration: 0,
            metadata: None,
        }
    }

    /// Create a trace failed record.
    pub fn trace_failed(trace_id: TraceId, error: impl ToString) -> Self {
        Self {
            record_type: WalRecordType::TraceFailed,
            trace_id,
            node_id: NodeId::new(0),
            timestamp_ns: current_timestamp_ns(),
            output_offset: ArenaOffset::NULL,
            output_size: 0,
            schema_hash: 0,
            error_message: Some(error.to_string()),
            iteration: 0,
            metadata: None,
        }
    }

    /// Create a trace suspended record.
    pub fn trace_suspended(trace_id: TraceId, node_id: NodeId) -> Self {
        Self {
            record_type: WalRecordType::TraceSuspended,
            trace_id,
            node_id,
            timestamp_ns: current_timestamp_ns(),
            output_offset: ArenaOffset::NULL,
            output_size: 0,
            schema_hash: 0,
            error_message: None,
            iteration: 0,
            metadata: None,
        }
    }

    /// Create a trace resumed record (for crash recovery or wait resumption).
    pub fn trace_resumed(trace_id: TraceId) -> Self {
        Self {
            record_type: WalRecordType::TraceResumed,
            trace_id,
            node_id: NodeId::new(0),
            timestamp_ns: current_timestamp_ns(),
            output_offset: ArenaOffset::NULL,
            output_size: 0,
            schema_hash: 0,
            error_message: None,
            iteration: 0,
            metadata: None,
        }
    }

    /// Create a loop iteration record.
    pub fn loop_iteration(trace_id: TraceId, node_id: NodeId, iteration: u32) -> Self {
        Self {
            record_type: WalRecordType::LoopIteration,
            trace_id,
            node_id,
            timestamp_ns: current_timestamp_ns(),
            output_offset: ArenaOffset::NULL,
            output_size: 0,
            schema_hash: 0,
            error_message: None,
            iteration,
            metadata: None,
        }
    }

    /// Set metadata on this record.
    pub fn with_metadata(mut self, metadata: impl ToString) -> Self {
        self.metadata = Some(metadata.to_string());
        self
    }

    /// Serialize the record to bytes.
    pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
        let mut payload = Vec::new();

        // Write fixed fields
        payload.write_u64::<LittleEndian>(self.timestamp_ns)?;
        payload.write_u64::<LittleEndian>(self.output_offset.as_u64())?;
        payload.write_u32::<LittleEndian>(self.output_size)?;
        payload.write_u64::<LittleEndian>(self.schema_hash)?;
        payload.write_u32::<LittleEndian>(self.iteration)?;

        // Write error message (length-prefixed)
        if let Some(ref msg) = self.error_message {
            let bytes = msg.as_bytes();
            payload.write_u32::<LittleEndian>(bytes.len() as u32)?;
            payload.write_all(bytes)?;
        } else {
            payload.write_u32::<LittleEndian>(0)?;
        }

        // Write metadata (length-prefixed)
        if let Some(ref meta) = self.metadata {
            let bytes = meta.as_bytes();
            payload.write_u32::<LittleEndian>(bytes.len() as u32)?;
            payload.write_all(bytes)?;
        } else {
            payload.write_u32::<LittleEndian>(0)?;
        }

        // Calculate CRC32 of the payload
        let crc = crc32fast::hash(&payload);

        // Build final record
        let total_len = MIN_RECORD_SIZE + payload.len();
        let mut record = Vec::with_capacity(total_len);

        record.write_u32::<LittleEndian>(total_len as u32)?;
        record.write_u32::<LittleEndian>(crc)?;
        record.write_u8(self.record_type as u8)?;
        record.write_all(self.trace_id.as_uuid().as_bytes())?;
        record.write_u32::<LittleEndian>(self.node_id.as_u32())?;
        record.write_all(&payload)?;

        Ok(record)
    }

    /// Deserialize a record from bytes.
    pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
        if bytes.len() < MIN_RECORD_SIZE {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Record too small",
            ));
        }

        let mut cursor = io::Cursor::new(bytes);

        let total_len = cursor.read_u32::<LittleEndian>()? as usize;
        let stored_crc = cursor.read_u32::<LittleEndian>()?;
        let record_type_byte = cursor.read_u8()?;

        let record_type = WalRecordType::try_from(record_type_byte)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

        let mut uuid_bytes = [0u8; 16];
        cursor.read_exact(&mut uuid_bytes)?;
        let trace_id = TraceId::from_uuid(uuid::Uuid::from_bytes(uuid_bytes));

        let node_id = NodeId::new(cursor.read_u32::<LittleEndian>()?);

        // Read payload
        let payload_start = MIN_RECORD_SIZE;
        let payload_end = total_len;

        if bytes.len() < payload_end {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "Record truncated: expected {} bytes, got {}",
                    total_len,
                    bytes.len()
                ),
            ));
        }

        let payload = &bytes[payload_start..payload_end];

        // Verify CRC
        let computed_crc = crc32fast::hash(payload);
        if computed_crc != stored_crc {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "CRC mismatch: expected {}, got {}",
                    stored_crc, computed_crc
                ),
            ));
        }

        // Parse payload
        let mut payload_cursor = io::Cursor::new(payload);

        let timestamp_ns = payload_cursor.read_u64::<LittleEndian>()?;
        let output_offset = ArenaOffset::new(payload_cursor.read_u64::<LittleEndian>()?);
        let output_size = payload_cursor.read_u32::<LittleEndian>()?;
        let schema_hash = payload_cursor.read_u64::<LittleEndian>()?;
        let iteration = payload_cursor.read_u32::<LittleEndian>()?;

        // Read error message
        let error_len = payload_cursor.read_u32::<LittleEndian>()? as usize;
        let error_message = if error_len > 0 {
            let mut buf = vec![0u8; error_len];
            payload_cursor.read_exact(&mut buf)?;
            Some(String::from_utf8_lossy(&buf).into_owned())
        } else {
            None
        };

        // Read metadata
        let meta_len = payload_cursor.read_u32::<LittleEndian>()? as usize;
        let metadata = if meta_len > 0 {
            let mut buf = vec![0u8; meta_len];
            payload_cursor.read_exact(&mut buf)?;
            Some(String::from_utf8_lossy(&buf).into_owned())
        } else {
            None
        };

        Ok(Self {
            record_type,
            trace_id,
            node_id,
            timestamp_ns,
            output_offset,
            output_size,
            schema_hash,
            error_message,
            iteration,
            metadata,
        })
    }

    /// Get the total serialized size of this record.
    pub fn serialized_size(&self) -> usize {
        let mut size = MIN_RECORD_SIZE;
        size += 8 + 8 + 4 + 8 + 4; // Fixed payload fields
        size += 4; // Error message length
        if let Some(ref msg) = self.error_message {
            size += msg.len();
        }
        size += 4; // Metadata length
        if let Some(ref meta) = self.metadata {
            size += meta.len();
        }
        size
    }
}

/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Falls back to 0 when the system clock reports a time before the epoch.
fn current_timestamp_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `record` and parse the resulting bytes straight back.
    fn roundtrip(record: &WalRecord) -> WalRecord {
        let encoded = record.to_bytes().unwrap();
        WalRecord::from_bytes(&encoded).unwrap()
    }

    /// Header fields and output description survive a serialize/parse cycle.
    #[test]
    fn record_roundtrip() {
        let trace_id = TraceId::new();
        let node_id = NodeId::new(42);
        let original =
            WalRecord::node_done(trace_id, node_id, ArenaOffset::new(0x1000), 256, 0xDEADBEEF);

        let decoded = roundtrip(&original);

        assert_eq!(decoded.record_type, WalRecordType::NodeDone);
        assert_eq!(decoded.trace_id, trace_id);
        assert_eq!(decoded.node_id, node_id);
        assert_eq!(decoded.output_offset, ArenaOffset::new(0x1000));
        assert_eq!(decoded.output_size, 256);
        assert_eq!(decoded.schema_hash, 0xDEADBEEF);
    }

    /// The error message of a node error record is preserved.
    #[test]
    fn record_with_error() {
        let original =
            WalRecord::node_error(TraceId::new(), NodeId::new(1), "Something went wrong");

        let decoded = roundtrip(&original);

        assert_eq!(decoded.record_type, WalRecordType::NodeError);
        assert_eq!(
            decoded.error_message.as_deref(),
            Some("Something went wrong")
        );
    }

    /// Metadata attached with `with_metadata` is preserved.
    #[test]
    fn record_with_metadata() {
        let original = WalRecord::trace_start(TraceId::new())
            .with_metadata(r#"{"pipeline": "order_processing"}"#);

        let decoded = roundtrip(&original);

        assert_eq!(
            decoded.metadata.as_deref(),
            Some(r#"{"pipeline": "order_processing"}"#)
        );
    }

    /// Flipping a payload byte must be caught by the CRC check.
    #[test]
    fn crc_verification() {
        let mut encoded = WalRecord::trace_start(TraceId::new()).to_bytes().unwrap();

        // Corrupt the first payload byte (the one right after the header).
        if let Some(first_payload_byte) = encoded.get_mut(MIN_RECORD_SIZE) {
            *first_payload_byte ^= 0xFF;
        }

        assert!(WalRecord::from_bytes(&encoded).is_err());
    }
}