// nxs/wal.rs

//! Span WAL — streaming append layer for NXS span/trace data.
//!
//! # Architecture
//!
//! A `.nxsw` (WAL) file stores spans as they arrive without a tail-index,
//! because rewriting the index on every append would be O(N). Instead:
//!
//!   open()     → write NXSW header + schema once
//!   append()   → write one NXS object; record (trace_id, span_id, offset) in RAM
//!   seal()     → replay the in-memory index, emit a full .nxb with tail-index
//!   recover()  → linear scan to rebuild the in-memory index after a crash
//!
//! The WAL file is valid enough to decode span-by-span in order even without the
//! index: NYXO magic allows readers to skip forward record-by-record.
//!
//! # WAL file layout
//!
//!   [NXSW magic 4B][version 2B][flags 2B]   -- 8-byte header
//!   [schema_len u32][schema bytes (padded)]  -- same schema encoding as .nxb
//!   [NYXO record 0 bytes]
//!   [NYXO record 1 bytes]
//!   ...
23
24use crate::error::{NxsError, Result};
25use crate::writer::{NxsWriter, Schema};
26use std::fs::{File, OpenOptions};
27use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
28use std::path::{Path, PathBuf};
29
/// Magic word at the start of a WAL file; its little-endian bytes spell `NXSW`.
pub const MAGIC_WAL: u32 = 0x5753_584E;
/// Magic word that starts every record; the hex digits read `NYXO` when taken
/// most-significant byte first.
pub const MAGIC_OBJ: u32 = 0x4E59_584F;
/// Version word written into the WAL header by `open()`.
const WAL_VERSION: u16 = 0x0100;
/// Flags word written into the WAL header by `open()`.
const WAL_FLAG_SCHEMA_EMBEDDED: u16 = 0x0001;
/// Largest record length `recover()` will accept (10 MiB) — an OOM guard
/// against corrupt length fields.
const MAX_RECORD_BYTES: u64 = 10 << 20;
35
/// Span identity extracted from each WAL record for the in-memory index.
///
/// The type is plain data (three integers), so it is `Copy` and can be
/// compared and hashed directly, e.g. to check a recovered index against the
/// one built while appending.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WalEntry {
    /// 128-bit trace id: high 64 bits from `trace_id_hi`, low 64 bits from
    /// `trace_id_lo`.
    pub trace_id: u128,
    /// Span id, stored as the unsigned bit pattern of the record's `span_id`.
    pub span_id: u64,
    /// Absolute byte offset of this record's NYXO magic in the WAL file.
    pub offset: u64,
}
44
/// One span to append, expressed by field name so callers never deal with
/// slot indices.
///
/// All members are integers or borrows, so the struct is cheap to copy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SpanFields<'a> {
    /// High 64 bits of the trace id (bit pattern; may be negative as `i64`).
    pub trace_id_hi: i64,
    /// Low 64 bits of the trace id (bit pattern; may be negative as `i64`).
    pub trace_id_lo: i64,
    /// Span id (bit pattern; indexed as `u64`).
    pub span_id: i64,
    /// Parent span id; `None` is written as an explicit null.
    pub parent_span_id: Option<i64>,
    /// Span name.
    pub name: &'a str,
    /// Name of the emitting service.
    pub service: &'a str,
    /// Start time, written with the schema's time type.
    // NOTE(review): the name suggests nanoseconds since some epoch — confirm the epoch.
    pub start_time_ns: i64,
    /// Span duration.
    pub duration_ns: i64,
    /// Status code of the span.
    pub status_code: i64,
    /// Arbitrary JSON payload (inputs/outputs for LLM spans, tags, etc.) stored as bytes.
    /// `None` leaves the payload slot unwritten.
    pub payload: Option<&'a [u8]>,
}
60
61/// Canonical schema for a span record.
62/// Slot indices are stable — do not reorder.
63pub struct SpanSchema {
64    pub schema: Schema,
65}
66
67impl SpanSchema {
68    pub fn new() -> Self {
69        SpanSchema {
70            schema: Schema::new(&[
71                "trace_id_hi",    // 0  =i64
72                "trace_id_lo",    // 1  =i64
73                "span_id",        // 2  =i64
74                "parent_span_id", // 3  =i64 / null
75                "name",           // 4  "str
76                "service",        // 5  "str
77                "start_time_ns",  // 6  @time
78                "duration_ns",    // 7  =i64
79                "status_code",    // 8  =i64
80                "payload",        // 9  <binary (opaque JSON)
81            ]),
82        }
83    }
84}
85
86/// Slot constants — compile-time checked indices into `SpanSchema`.
87pub mod slot {
88    use crate::writer::Slot;
89    pub const TRACE_ID_HI: Slot = Slot(0);
90    pub const TRACE_ID_LO: Slot = Slot(1);
91    pub const SPAN_ID: Slot = Slot(2);
92    pub const PARENT_SPAN_ID: Slot = Slot(3);
93    pub const NAME: Slot = Slot(4);
94    pub const SERVICE: Slot = Slot(5);
95    pub const START_TIME_NS: Slot = Slot(6);
96    pub const DURATION_NS: Slot = Slot(7);
97    pub const STATUS_CODE: Slot = Slot(8);
98    pub const PAYLOAD: Slot = Slot(9);
99}
100
101/// Streaming WAL writer. Not `Send` — use one per thread or wrap in a Mutex.
102pub struct SpanWal {
103    path: PathBuf,
104    file: BufWriter<File>,
105    /// In-memory index rebuilt on crash recovery via `recover()`.
106    pub index: Vec<WalEntry>,
107    pub record_count: u64,
108    schema: SpanSchema,
109    /// Byte offset of the first record (right after the WAL header).
110    data_start: u64,
111    /// Tracked in-process so append() never calls metadata() for a syscall.
112    current_offset: u64,
113}
114
115impl SpanWal {
116    /// Open (or create) a WAL file. If the file already exists and is non-empty,
117    /// call `recover()` to rebuild the in-memory index before appending.
118    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
119        let path = path.as_ref().to_path_buf();
120        let schema = SpanSchema::new();
121        let schema_bytes = build_wal_schema_bytes(&schema.schema);
122
123        let file_exists = path.exists();
124        let file = OpenOptions::new()
125            .create(true)
126            .append(true)
127            .open(&path)
128            .map_err(|e| NxsError::IoError(e.to_string()))?;
129        let mut writer = BufWriter::new(file);
130
131        // 8-byte fixed header + 4-byte schema_len field + schema bytes
132        let data_start = 8 + 4 + schema_bytes.len() as u64;
133        if !file_exists {
134            // Write WAL header
135            writer
136                .write_all(&MAGIC_WAL.to_le_bytes())
137                .map_err(|e| NxsError::IoError(e.to_string()))?;
138            writer
139                .write_all(&WAL_VERSION.to_le_bytes())
140                .map_err(|e| NxsError::IoError(e.to_string()))?;
141            writer
142                .write_all(&WAL_FLAG_SCHEMA_EMBEDDED.to_le_bytes())
143                .map_err(|e| NxsError::IoError(e.to_string()))?;
144            writer
145                .write_all(&(schema_bytes.len() as u32).to_le_bytes())
146                .map_err(|e| NxsError::IoError(e.to_string()))?;
147            writer
148                .write_all(&schema_bytes)
149                .map_err(|e| NxsError::IoError(e.to_string()))?;
150            writer
151                .flush()
152                .map_err(|e| NxsError::IoError(e.to_string()))?;
153        }
154
155        // For an existing file we don't know the true end yet — recover() will
156        // set current_offset after scanning.  For a new file it equals data_start.
157        let initial_offset = if file_exists {
158            0 // will be corrected by recover()
159        } else {
160            data_start
161        };
162
163        Ok(SpanWal {
164            path,
165            file: writer,
166            index: Vec::new(),
167            record_count: 0,
168            schema,
169            data_start,
170            current_offset: initial_offset,
171        })
172    }
173
174    /// Append a span to the WAL. Returns the absolute byte offset of the record.
175    pub fn append(&mut self, span: &SpanFields) -> Result<u64> {
176        let file_offset = self.current_offset;
177
178        // Encode one NXS object
179        let mut w = NxsWriter::new(&self.schema.schema);
180        w.begin_object();
181        w.write_i64(slot::TRACE_ID_HI, span.trace_id_hi);
182        w.write_i64(slot::TRACE_ID_LO, span.trace_id_lo);
183        w.write_i64(slot::SPAN_ID, span.span_id);
184        match span.parent_span_id {
185            Some(p) => w.write_i64(slot::PARENT_SPAN_ID, p),
186            None => w.write_null(slot::PARENT_SPAN_ID),
187        }
188        w.write_str(slot::NAME, span.name);
189        w.write_str(slot::SERVICE, span.service);
190        w.write_time(slot::START_TIME_NS, span.start_time_ns);
191        w.write_i64(slot::DURATION_NS, span.duration_ns);
192        w.write_i64(slot::STATUS_CODE, span.status_code);
193        if let Some(payload) = span.payload {
194            w.write_bytes(slot::PAYLOAD, payload);
195        }
196        w.end_object();
197
198        // finish() emits a full .nxb file — we only want the data sector bytes
199        // (skip preamble + schema, stop before tail-index).
200        let nxb = w.finish();
201        let data_sector = extract_data_sector(&nxb)?;
202
203        self.file
204            .write_all(data_sector)
205            .map_err(|e| NxsError::IoError(e.to_string()))?;
206
207        self.current_offset += data_sector.len() as u64;
208
209        // Cast via u64 to preserve bit pattern and avoid sign-extension into the high half.
210        let trace_id =
211            ((span.trace_id_hi as u64 as u128) << 64) | (span.trace_id_lo as u64 as u128);
212        self.index.push(WalEntry {
213            trace_id,
214            span_id: span.span_id as u64,
215            offset: file_offset,
216        });
217        self.record_count += 1;
218
219        Ok(file_offset)
220    }
221
222    /// Flush write buffer to OS.
223    pub fn flush(&mut self) -> Result<()> {
224        self.file
225            .flush()
226            .map_err(|e| NxsError::IoError(e.to_string()))
227    }
228
229    /// Rebuild `self.index` by scanning the WAL file linearly.
230    /// Call this after opening an existing WAL (crash recovery path).
231    pub fn recover(&mut self) -> Result<()> {
232        let mut file = File::open(&self.path).map_err(|e| NxsError::IoError(e.to_string()))?;
233        let file_len = file
234            .metadata()
235            .map_err(|e| NxsError::IoError(e.to_string()))?
236            .len();
237
238        // Validate WAL magic
239        let mut header = [0u8; 8];
240        file.read_exact(&mut header)
241            .map_err(|e| NxsError::IoError(e.to_string()))?;
242        let magic = u32::from_le_bytes(header[0..4].try_into().map_err(|_| NxsError::OutOfBounds)?);
243        if magic != MAGIC_WAL {
244            return Err(NxsError::BadMagic);
245        }
246
247        // Skip schema
248        let mut schema_len_buf = [0u8; 4];
249        file.read_exact(&mut schema_len_buf)
250            .map_err(|e| NxsError::IoError(e.to_string()))?;
251        let schema_len = u32::from_le_bytes(schema_len_buf) as u64;
252        file.seek(SeekFrom::Current(schema_len as i64))
253            .map_err(|e| NxsError::IoError(e.to_string()))?;
254
255        // Walk NYXO records
256        let mut index = Vec::new();
257        let mut record_count = 0u64;
258        loop {
259            let pos = file
260                .stream_position()
261                .map_err(|e| NxsError::IoError(e.to_string()))?;
262            if pos + 8 > file_len {
263                break;
264            }
265
266            let mut rec_header = [0u8; 8];
267            file.read_exact(&mut rec_header)
268                .map_err(|e| NxsError::IoError(e.to_string()))?;
269            let obj_magic = u32::from_le_bytes(
270                rec_header[0..4]
271                    .try_into()
272                    .map_err(|_| NxsError::OutOfBounds)?,
273            );
274            if obj_magic != MAGIC_OBJ {
275                break; // truncated or corrupt tail — stop here
276            }
277            let obj_len = u32::from_le_bytes(
278                rec_header[4..8]
279                    .try_into()
280                    .map_err(|_| NxsError::OutOfBounds)?,
281            ) as u64;
282
283            // Read the full object to extract trace_id / span_id from bitmask + offsets
284            if !(8..=MAX_RECORD_BYTES).contains(&obj_len) || pos + obj_len > file_len {
285                break;
286            }
287            let mut obj_buf = vec![0u8; obj_len as usize];
288            obj_buf[0..8].copy_from_slice(&rec_header);
289            file.read_exact(&mut obj_buf[8..])
290                .map_err(|e| NxsError::IoError(e.to_string()))?;
291
292            if let Some((trace_id, span_id)) = extract_trace_span_id(&obj_buf) {
293                index.push(WalEntry {
294                    trace_id,
295                    span_id,
296                    offset: pos,
297                });
298            }
299            record_count += 1;
300        }
301
302        self.index = index;
303        self.record_count = record_count;
304        // Sync offset so subsequent appends don't call metadata().
305        self.current_offset = file_len;
306        Ok(())
307    }
308
309    /// Seal the WAL: write a complete `.nxb` segment file and return its path.
310    /// The WAL file is left in place — rotate/delete it externally once the
311    /// segment is durably written.
312    pub fn seal(&mut self, out_path: impl AsRef<Path>) -> Result<SealReport> {
313        self.flush()?;
314
315        let mut file = File::open(&self.path).map_err(|e| NxsError::IoError(e.to_string()))?;
316
317        // Skip WAL header + schema
318        file.seek(SeekFrom::Start(self.data_start))
319            .map_err(|e| NxsError::IoError(e.to_string()))?;
320
321        // Read all record bytes into NxsWriter
322        let schema_for_seal = SpanSchema::new();
323        let mut w = NxsWriter::with_capacity(&schema_for_seal.schema, 1024 * 1024);
324
325        // Replay by re-reading each span from the WAL using the in-memory index
326        for entry in &self.index {
327            file.seek(SeekFrom::Start(entry.offset))
328                .map_err(|e| NxsError::IoError(e.to_string()))?;
329
330            let mut hdr = [0u8; 8];
331            file.read_exact(&mut hdr)
332                .map_err(|e| NxsError::IoError(e.to_string()))?;
333            let obj_len = u32::from_le_bytes(hdr[4..8].try_into().unwrap()) as usize;
334            let mut obj_buf = vec![0u8; obj_len];
335            obj_buf[0..8].copy_from_slice(&hdr);
336            file.read_exact(&mut obj_buf[8..])
337                .map_err(|e| NxsError::IoError(e.to_string()))?;
338
339            // Decode the object back to SpanFields and re-encode via NxsWriter
340            // so the sealed file has a proper preamble + tail-index.
341            if let Some(span) = decode_span_object(&obj_buf) {
342                w.begin_object();
343                w.write_i64(slot::TRACE_ID_HI, span.trace_id_hi);
344                w.write_i64(slot::TRACE_ID_LO, span.trace_id_lo);
345                w.write_i64(slot::SPAN_ID, span.span_id);
346                match span.parent_span_id {
347                    Some(p) => w.write_i64(slot::PARENT_SPAN_ID, p),
348                    None => w.write_null(slot::PARENT_SPAN_ID),
349                }
350                w.write_str(slot::NAME, &span.name_owned);
351                w.write_str(slot::SERVICE, &span.service_owned);
352                w.write_time(slot::START_TIME_NS, span.start_time_ns);
353                w.write_i64(slot::DURATION_NS, span.duration_ns);
354                w.write_i64(slot::STATUS_CODE, span.status_code);
355                if let Some(ref payload) = span.payload_owned {
356                    w.write_bytes(slot::PAYLOAD, payload);
357                }
358                w.end_object();
359            }
360        }
361
362        let nxb = w.finish();
363        let bytes_written = nxb.len() as u64;
364        let records = self.record_count;
365
366        std::fs::write(out_path.as_ref(), &nxb).map_err(|e| NxsError::IoError(e.to_string()))?;
367
368        Ok(SealReport {
369            records,
370            bytes_written,
371            segment_path: out_path.as_ref().to_path_buf(),
372        })
373    }
374
375    pub fn record_count(&self) -> u64 {
376        self.record_count
377    }
378
379    pub fn path(&self) -> &Path {
380        &self.path
381    }
382}
383
/// Summary returned by `SpanWal::seal()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SealReport {
    /// Record count reported by `seal()`.
    pub records: u64,
    /// Size in bytes of the segment file that was written.
    pub bytes_written: u64,
    /// Path of the segment file that was written.
    pub segment_path: PathBuf,
}
390
391// ── Private helpers ───────────────────────────────────────────────────────────
392
393/// Extract just the data sector bytes from a complete .nxb buffer.
394/// Skips the 32-byte preamble and embedded schema, strips the tail-index.
395fn extract_data_sector(nxb: &[u8]) -> Result<&[u8]> {
396    if nxb.len() < 32 {
397        return Err(NxsError::OutOfBounds);
398    }
399    // Read tail_ptr from preamble bytes 16..24
400    let mut tail_ptr =
401        u64::from_le_bytes(nxb[16..24].try_into().map_err(|_| NxsError::OutOfBounds)?) as usize;
402    if tail_ptr == 0 {
403        if nxb.len() < 44 {
404            return Err(NxsError::OutOfBounds);
405        }
406        tail_ptr = u64::from_le_bytes(
407            nxb[nxb.len() - 12..nxb.len() - 4]
408                .try_into()
409                .map_err(|_| NxsError::OutOfBounds)?,
410        ) as usize;
411    }
412    if tail_ptr > nxb.len() {
413        return Err(NxsError::OutOfBounds);
414    }
415
416    // Schema header starts at byte 32. Find its end (it is 8-byte aligned).
417    // Schema: u16 key_count + key_count sigil bytes + NUL-terminated strings, padded to 8.
418    let mut pos = 32usize;
419    if pos + 2 > nxb.len() {
420        return Err(NxsError::OutOfBounds);
421    }
422    let key_count = u16::from_le_bytes(nxb[pos..pos + 2].try_into().unwrap()) as usize;
423    pos += 2 + key_count; // skip sigil bytes
424    for _ in 0..key_count {
425        while pos < nxb.len() && nxb[pos] != 0 {
426            pos += 1;
427        }
428        pos += 1; // skip NUL
429    }
430    while pos % 8 != 0 {
431        pos += 1;
432    }
433
434    // `pos` is now the data sector start; `tail_ptr` is its end.
435    if pos > tail_ptr {
436        return Err(NxsError::OutOfBounds);
437    }
438    Ok(&nxb[pos..tail_ptr])
439}
440
441/// Build the schema bytes portion written in the WAL header (same encoding as .nxb).
442fn build_wal_schema_bytes(schema: &Schema) -> Vec<u8> {
443    // We reproduce build_schema() from writer.rs since it is private there.
444    let keys = schema_keys(schema);
445    let n = keys.len();
446    let mut b = Vec::new();
447    b.extend_from_slice(&(n as u16).to_le_bytes());
448    for _ in 0..n {
449        b.push(b'"'); // default sigil — updated on first real write
450    }
451    for key in &keys {
452        b.extend_from_slice(key.as_bytes());
453        b.push(0x00);
454    }
455    while b.len() % 8 != 0 {
456        b.push(0x00);
457    }
458    b
459}
460
461/// Extract the canonical key list from a Schema by re-constructing via SpanSchema.
462fn schema_keys(schema: &Schema) -> Vec<&'static str> {
463    let _ = schema; // Schema doesn't expose keys publicly; we hard-code for SpanSchema.
464    vec![
465        "trace_id_hi",
466        "trace_id_lo",
467        "span_id",
468        "parent_span_id",
469        "name",
470        "service",
471        "start_time_ns",
472        "duration_ns",
473        "status_code",
474        "payload",
475    ]
476}
477
/// Decode `(trace_id, span_id)` from a raw NYXO object buffer for use in the
/// recovery index.
///
/// Minimal parse of the object: 8 header bytes (magic + length), a LEB128-style
/// presence bitmask (7 slot bits per byte, high bit = "more bytes follow"),
/// then a table of `u16` little-endian offsets — one per present slot, each
/// relative to the start of the object — pointing at the values.
///
/// Returns `None` when the bitmask is unterminated or too long, when the
/// offset table does not fit, when any of slots 0, 1, 2 is absent, or when a
/// value would be read out of bounds.
fn extract_trace_span_id(obj: &[u8]) -> Option<(u128, u64)> {
    const HEADER_LEN: usize = 8;
    // A byte with the continuation bit set is rejected once this many bytes
    // have already been read.
    const MAX_CONTINUED_BYTES: usize = 16;

    // Single pass over the bitmask: find its end and count the present slots.
    let mask = obj.get(HEADER_LEN..)?;
    let mut present_count = 0usize;
    let mut mask_len = None;
    for (i, &byte) in mask.iter().enumerate() {
        present_count += (byte & 0x7F).count_ones() as usize;
        if byte & 0x80 == 0 {
            mask_len = Some(i + 1);
            break;
        }
        if i + 1 > MAX_CONTINUED_BYTES {
            return None;
        }
    }
    let mask_len = mask_len?; // ran off the end without a terminating byte

    // Offset table follows bitmask and must fit entirely.
    let ot_start = HEADER_LEN + mask_len;
    if ot_start + present_count * 2 > obj.len() {
        return None;
    }

    // Slots 0..=2 (trace_id_hi, trace_id_lo, span_id) live in the first mask
    // byte. When all three are present they are the first three present slots,
    // so their offset-table indices are 0, 1 and 2.
    if mask[0] & 0b0000_0111 != 0b0000_0111 {
        return None;
    }

    let read_u64_at_entry = |ot_index: usize| -> Option<u64> {
        let ot_off = ot_start + ot_index * 2;
        let entry = obj.get(ot_off..ot_off + 2)?;
        let value_off = usize::from(u16::from_le_bytes([entry[0], entry[1]]));
        let mut raw = [0u8; 8];
        raw.copy_from_slice(obj.get(value_off..value_off + 8)?);
        // The values are stored as i64; reading the same bytes as u64 keeps
        // the bit pattern without any sign extension.
        Some(u64::from_le_bytes(raw))
    };

    let hi = read_u64_at_entry(0)?;
    let lo = read_u64_at_entry(1)?;
    let span_id = read_u64_at_entry(2)?;
    let trace_id = (u128::from(hi) << 64) | u128::from(lo);
    Some((trace_id, span_id))
}
562
/// Decoded span fields (owned strings/bytes) for use during seal replay.
///
/// Derives `Debug`/`PartialEq` so decoded records can be logged and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DecodedSpan {
    /// High 64 bits of the trace id.
    trace_id_hi: i64,
    /// Low 64 bits of the trace id.
    trace_id_lo: i64,
    /// Span id.
    span_id: i64,
    /// Parent span id, or `None` when the record has no parent.
    parent_span_id: Option<i64>,
    /// Span name (owned copy).
    name_owned: String,
    /// Service name (owned copy).
    service_owned: String,
    /// Start time value.
    start_time_ns: i64,
    /// Span duration.
    duration_ns: i64,
    /// Status code.
    status_code: i64,
    /// Payload bytes (owned copy), if the record carried any.
    payload_owned: Option<Vec<u8>>,
}
576
/// Span field names in slot order, as handed to the record decoder.
const SPAN_KEYS: &[&str] = &[
    "trace_id_hi",    // slot 0
    "trace_id_lo",    // slot 1
    "span_id",        // slot 2
    "parent_span_id", // slot 3
    "name",           // slot 4
    "service",        // slot 5
    "start_time_ns",  // slot 6
    "duration_ns",    // slot 7
    "status_code",    // slot 8
    "payload",        // slot 9
];
/// One sigil byte per entry of `SPAN_KEYS`, in the same order.
const SPAN_SIGILS: &[u8] = &[
    b'=', b'=', b'=', b'=', // trace_id_hi, trace_id_lo, span_id, parent_span_id
    b'"', b'"', // name, service
    b'@', // start_time_ns
    b'=', b'=', // duration_ns, status_code
    b'<', // payload
];
590
591fn decode_span_object(obj: &[u8]) -> Option<DecodedSpan> {
592    use crate::decoder::{decode_record_at, DecodedValue};
593
594    let keys: Vec<String> = SPAN_KEYS.iter().map(|s| s.to_string()).collect();
595    let sigils = SPAN_SIGILS;
596
597    let fields = decode_record_at(obj, 0, &keys, sigils).ok()?;
598    let get_i64 = |name: &str| -> Option<i64> {
599        fields.iter().find_map(|(k, v)| {
600            if k == name {
601                match v {
602                    DecodedValue::Int(i) => Some(*i),
603                    DecodedValue::Time(i) => Some(*i),
604                    _ => None,
605                }
606            } else {
607                None
608            }
609        })
610    };
611    let get_str = |name: &str| -> String {
612        fields
613            .iter()
614            .find_map(|(k, v)| {
615                if k == name {
616                    if let DecodedValue::Str(s) = v {
617                        Some(s.clone())
618                    } else {
619                        None
620                    }
621                } else {
622                    None
623                }
624            })
625            .unwrap_or_default()
626    };
627    let get_bytes = |name: &str| -> Option<Vec<u8>> {
628        fields.iter().find_map(|(k, v)| {
629            if k == name {
630                if let DecodedValue::Binary(b) = v {
631                    Some(b.clone())
632                } else {
633                    None
634                }
635            } else {
636                None
637            }
638        })
639    };
640    let get_null = |name: &str| -> bool {
641        fields
642            .iter()
643            .any(|(k, v)| k == name && *v == DecodedValue::Null)
644    };
645
646    Some(DecodedSpan {
647        trace_id_hi: get_i64("trace_id_hi")?,
648        trace_id_lo: get_i64("trace_id_lo")?,
649        span_id: get_i64("span_id")?,
650        parent_span_id: if get_null("parent_span_id") {
651            None
652        } else {
653            get_i64("parent_span_id").filter(|&v| v != 0)
654        },
655        name_owned: get_str("name"),
656        service_owned: get_str("service"),
657        start_time_ns: get_i64("start_time_ns").unwrap_or(0),
658        duration_ns: get_i64("duration_ns").unwrap_or(0),
659        status_code: get_i64("status_code").unwrap_or(0),
660        payload_owned: get_bytes("payload"),
661    })
662}