Skip to main content

nxs/
segment_reader.rs

1//! Multi-segment span reader — queries across a set of sealed .nxb files
2//! plus an optional live .nxsw WAL.
3//!
4//! # Memory model
5//!
//! Each sealed `.nxb` is mapped with [`memmap2::Mmap`] (no full-file `Vec` at open).
//! Per segment, an in-memory index `trace_id → [absolute offsets]` is built by scanning
//! the tail index once and decoding each referenced record to read its trace id —
//! **O(records)** heap for the index, not O(file size). WAL bytes are still loaded
//! into a `Vec` for the live `.nxsw` path.
10//!
11//! # Usage
12//!
//! ```ignore
//! let reader = SegmentReader::open("traces/")?;
//! // find all spans for a trace
//! let spans = reader.find_by_trace(trace_id)?;
//! // find one span by (trace_id, span_id)
//! let span = reader.find_span(trace_id, span_id)?;
//! ```
18
19use crate::decoder::{decode, decode_record_at, DecodedValue};
20use crate::error::{NxsError, Result};
21use crate::wal::SpanWal;
22use memmap2::Mmap;
23use std::collections::HashMap;
24use std::fs::{self, File};
25use std::path::{Path, PathBuf};
26
/// A decoded span record returned by queries.
///
/// Derives `PartialEq`/`Eq` so query results can be compared directly
/// (e.g. in tests or when de-duplicating).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Span {
    /// 128-bit trace id assembled from the record's `trace_id_hi` / `trace_id_lo` fields.
    pub trace_id: u128,
    /// The record's `span_id` integer, reinterpreted as `u64`.
    pub span_id: u64,
    /// Parent span id; `None` when the stored value is null or `0`.
    pub parent_span_id: Option<u64>,
    /// Span name; empty when the record has no string `name` field.
    pub name: String,
    /// Service name; empty when the record has no string `service` field.
    pub service: String,
    /// Value of the record's `start_time_ns` field (`0` if absent); queries sort by it.
    pub start_time_ns: i64,
    /// Value of the record's `duration_ns` field (`0` if absent).
    pub duration_ns: i64,
    /// Value of the record's `status_code` field (`0` if absent).
    pub status_code: i64,
    /// Raw binary `payload` field, if the record carries one.
    pub payload: Option<Vec<u8>>,
}
40
/// Queries sealed segments + live WAL for span data.
///
/// Built by [`SegmentReader::open`], which loads every `.nxb` segment in a
/// directory plus at most one `.nxsw` WAL.
pub struct SegmentReader {
    /// Sealed `.nxb` segments, sorted by path in `open`.
    segments: Vec<SealedSegment>,
    /// The live `.nxsw` WAL, if one was found in the directory.
    wal: Option<WalReader>,
}
46
/// One memory-mapped sealed `.nxb` segment plus its in-memory trace index.
struct SealedSegment {
    /// Path the segment was loaded from; `SegmentReader::open` sorts segments by it.
    path: PathBuf,
    /// Open handle to the segment file, kept alive alongside the mapping.
    _file: File,
    /// Read-only mapping of the whole segment file; records are decoded from it.
    mmap: Mmap,
    /// (trace_id → [span absolute offsets]) built from the tail-index.
    index: HashMap<u128, Vec<u64>>,
    /// Key table from the segment header, passed to `decode_record_at`.
    keys: Vec<String>,
    /// Key sigils from the segment header, passed to `decode_record_at`
    /// (presumably one per entry in `keys` — confirm against the decoder).
    sigils: Vec<u8>,
}
56
/// Read-only view of a live WAL — data loaded once at open time.
///
/// `data` is read right after `SpanWal::recover()` in `SegmentReader::open`.
/// Queries decode from this snapshot and its matching `wal.index`. Nothing in
/// this reader refreshes either one afterwards.
struct WalReader {
    /// The recovered WAL handle; its `index` lists record offsets into `data`.
    wal: SpanWal,
    /// Raw WAL file bytes cached so queries don't re-read the file each time.
    data: Vec<u8>,
}
63
64impl SegmentReader {
65    /// Open all `.nxb` files and at most one `.nxsw` file found under `dir`.
66    pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
67        let dir = dir.as_ref();
68        let mut segments = Vec::new();
69        let mut wal_path: Option<PathBuf> = None;
70
71        let entries = fs::read_dir(dir).map_err(|e| NxsError::IoError(e.to_string()))?;
72        for entry in entries {
73            let entry = entry.map_err(|e| NxsError::IoError(e.to_string()))?;
74            let path = entry.path();
75            match path.extension().and_then(|e| e.to_str()) {
76                Some("nxb") => {
77                    let seg = SealedSegment::load(path)?;
78                    segments.push(seg);
79                }
80                Some("nxsw") => {
81                    // Last .nxsw wins (there should only be one live WAL)
82                    wal_path = Some(path);
83                }
84                _ => {}
85            }
86        }
87
88        // Sort segments by filename for deterministic ordering (oldest first)
89        segments.sort_by(|a, b| a.path.cmp(&b.path));
90
91        let wal = if let Some(p) = wal_path {
92            let mut w = SpanWal::open(&p)?;
93            w.recover()?;
94            let data = fs::read(w.path()).map_err(|e| NxsError::IoError(e.to_string()))?;
95            Some(WalReader { wal: w, data })
96        } else {
97            None
98        };
99
100        Ok(SegmentReader { segments, wal })
101    }
102
103    /// Return all spans belonging to a trace, sorted by start_time_ns.
104    pub fn find_by_trace(&self, trace_id: u128) -> Result<Vec<Span>> {
105        let mut spans = Vec::new();
106
107        for seg in &self.segments {
108            if let Some(offsets) = seg.index.get(&trace_id) {
109                for &abs_off in offsets {
110                    if let Ok(span) = seg.decode_span_at(abs_off) {
111                        spans.push(span);
112                    }
113                }
114            }
115        }
116
117        if let Some(ref wr) = self.wal {
118            for entry in &wr.wal.index {
119                if entry.trace_id == trace_id {
120                    if let Some(span) = decode_span_from_raw(&wr.data, entry.offset as usize) {
121                        spans.push(span);
122                    }
123                }
124            }
125        }
126
127        spans.sort_by_key(|s| s.start_time_ns);
128        Ok(spans)
129    }
130
131    /// Find a specific span by (trace_id, span_id).
132    pub fn find_span(&self, trace_id: u128, span_id: u64) -> Result<Option<Span>> {
133        let spans = self.find_by_trace(trace_id)?;
134        Ok(spans.into_iter().find(|s| s.span_id == span_id))
135    }
136
137    /// Return spans in a time window across all segments.
138    pub fn find_by_time(&self, start_ns: i64, end_ns: i64) -> Result<Vec<Span>> {
139        let mut spans = Vec::new();
140
141        for seg in &self.segments {
142            for offsets in seg.index.values() {
143                for &abs_off in offsets {
144                    if let Ok(span) = seg.decode_span_at(abs_off) {
145                        if span.start_time_ns >= start_ns && span.start_time_ns <= end_ns {
146                            spans.push(span);
147                        }
148                    }
149                }
150            }
151        }
152
153        if let Some(ref wr) = self.wal {
154            for entry in &wr.wal.index {
155                if let Some(span) = decode_span_from_raw(&wr.data, entry.offset as usize) {
156                    if span.start_time_ns >= start_ns && span.start_time_ns <= end_ns {
157                        spans.push(span);
158                    }
159                }
160            }
161        }
162
163        spans.sort_by_key(|s| s.start_time_ns);
164        Ok(spans)
165    }
166
167    /// Summary: how many segments + records are loaded.
168    pub fn stats(&self) -> ReaderStats {
169        let segment_count = self.segments.len();
170        let sealed_records: u64 = self
171            .segments
172            .iter()
173            .map(|s| s.index.values().map(|v| v.len() as u64).sum::<u64>())
174            .sum();
175        let wal_records = self.wal.as_ref().map(|w| w.wal.record_count()).unwrap_or(0);
176        ReaderStats {
177            segment_count,
178            sealed_records,
179            wal_records,
180        }
181    }
182}
183
/// Summary counts returned by [`SegmentReader::stats`].
///
/// A small plain-data value: `Copy`, comparable, and `Default`-constructible
/// (all counts zero).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ReaderStats {
    /// Number of sealed `.nxb` segments loaded.
    pub segment_count: usize,
    /// Records reachable through the sealed segments' trace indexes.
    pub sealed_records: u64,
    /// Record count reported by the live WAL (`0` when no WAL was found).
    pub wal_records: u64,
}
190
191impl SealedSegment {
192    fn load(path: PathBuf) -> Result<Self> {
193        let file = File::open(&path).map_err(|e| NxsError::IoError(e.to_string()))?;
194        let mmap = unsafe {
195            Mmap::map(&file)
196                .map_err(|e| NxsError::IoError(format!("mmap {}: {e}", path.display())))?
197        };
198        let data = &mmap[..];
199        let decoded = decode(data)?;
200
201        // Build trace_id → [offsets] index from the tail-index.
202        // Each tail-index entry: KeyID(u16) + AbsoluteOffset(u64) = 10 bytes
203        let mut index: HashMap<u128, Vec<u64>> = HashMap::new();
204        let tail_start = decoded.tail_start;
205        let tail_end = data.len().saturating_sub(12); // strip FooterTailPtr + MagicFooter
206
207        let mut pos = tail_start;
208        while pos + 10 <= tail_end {
209            let _key_id = u16::from_le_bytes(
210                data[pos..pos + 2]
211                    .try_into()
212                    .map_err(|_| NxsError::OutOfBounds)?,
213            );
214            let abs_off = u64::from_le_bytes(
215                data[pos + 2..pos + 10]
216                    .try_into()
217                    .map_err(|_| NxsError::OutOfBounds)?,
218            );
219            pos += 10;
220
221            // Extract trace_id from the span at abs_off to build the lookup map
222            if abs_off as usize + 8 <= data.len() {
223                let fields =
224                    decode_record_at(data, abs_off as usize, &decoded.keys, &decoded.key_sigils)
225                        .unwrap_or_default();
226                if let Some(trace_id) = extract_trace_id(&fields) {
227                    index.entry(trace_id).or_default().push(abs_off);
228                }
229            }
230        }
231
232        Ok(SealedSegment {
233            path,
234            _file: file,
235            mmap,
236            index,
237            keys: decoded.keys,
238            sigils: decoded.key_sigils,
239        })
240    }
241
242    fn data(&self) -> &[u8] {
243        &self.mmap
244    }
245
246    fn decode_span_at(&self, abs_off: u64) -> Result<Span> {
247        let fields = decode_record_at(self.data(), abs_off as usize, &self.keys, &self.sigils)?;
248        fields_to_span(&fields).ok_or(NxsError::OutOfBounds)
249    }
250}
251
252// ── Field helpers ─────────────────────────────────────────────────────────────
253
254fn extract_trace_id(fields: &[(String, DecodedValue)]) -> Option<u128> {
255    let hi = get_i64(fields, "trace_id_hi")? as u64;
256    let lo = get_i64(fields, "trace_id_lo")? as u64;
257    Some(((hi as u128) << 64) | lo as u128)
258}
259
260fn fields_to_span(fields: &[(String, DecodedValue)]) -> Option<Span> {
261    let trace_id = extract_trace_id(fields)?;
262    let span_id = get_i64(fields, "span_id")? as u64;
263    // A null parent is stored as either DecodedValue::Null or as Int(0).
264    // Span IDs of 0 are invalid per the OpenTelemetry spec, so 0 == absent.
265    let parent_span_id = if is_null(fields, "parent_span_id") {
266        None
267    } else {
268        get_i64(fields, "parent_span_id")
269            .map(|v| v as u64)
270            .filter(|&v| v != 0)
271    };
272
273    Some(Span {
274        trace_id,
275        span_id,
276        parent_span_id,
277        name: get_str(fields, "name"),
278        service: get_str(fields, "service"),
279        start_time_ns: get_i64(fields, "start_time_ns").unwrap_or(0),
280        duration_ns: get_i64(fields, "duration_ns").unwrap_or(0),
281        status_code: get_i64(fields, "status_code").unwrap_or(0),
282        payload: get_bytes(fields, "payload"),
283    })
284}
285
/// Fixed key schema used to decode live WAL records in `decode_span_from_raw`.
///
/// Index-aligned with [`SPAN_SIGILS`]: entry `i` of both tables describes the same
/// field. The `const _` assertion below enforces equal lengths at compile time.
const SPAN_KEYS: &[&str] = &[
    "trace_id_hi",
    "trace_id_lo",
    "span_id",
    "parent_span_id",
    "name",
    "service",
    "start_time_ns",
    "duration_ns",
    "status_code",
    "payload",
];

/// Type sigil for each entry of [`SPAN_KEYS`], in the same order.
///
/// NOTE(review): the sigils appear to mean `=` integer, `"` string, `@` time,
/// `<` binary, judging by the field names and the `DecodedValue` variants read
/// back. Confirm against the decoder's sigil table.
const SPAN_SIGILS: &[u8] = &[
    b'=', // trace_id_hi
    b'=', // trace_id_lo
    b'=', // span_id
    b'=', // parent_span_id
    b'"', // name
    b'"', // service
    b'@', // start_time_ns
    b'=', // duration_ns
    b'=', // status_code
    b'<', // payload
];

// Compile-time guard: the two parallel tables must stay the same length.
const _: () = assert!(SPAN_KEYS.len() == SPAN_SIGILS.len());
299
300fn decode_span_from_raw(data: &[u8], offset: usize) -> Option<Span> {
301    if offset + 4 > data.len() {
302        return None;
303    }
304    let magic = u32::from_le_bytes(data[offset..offset + 4].try_into().ok()?);
305    if magic != crate::wal::MAGIC_OBJ {
306        return None;
307    }
308
309    let keys: Vec<String> = SPAN_KEYS.iter().map(|s| s.to_string()).collect();
310    let fields = decode_record_at(data, offset, &keys, SPAN_SIGILS).ok()?;
311    fields_to_span(&fields)
312}
313
314fn is_null(fields: &[(String, DecodedValue)], name: &str) -> bool {
315    fields
316        .iter()
317        .any(|(k, v)| k == name && matches!(v, DecodedValue::Null))
318}
319
320fn get_i64(fields: &[(String, DecodedValue)], name: &str) -> Option<i64> {
321    fields.iter().find_map(|(k, v)| {
322        if k == name {
323            match v {
324                DecodedValue::Int(i) => Some(*i),
325                DecodedValue::Time(i) => Some(*i),
326                _ => None,
327            }
328        } else {
329            None
330        }
331    })
332}
333
334fn get_str(fields: &[(String, DecodedValue)], name: &str) -> String {
335    fields
336        .iter()
337        .find_map(|(k, v)| {
338            if k == name {
339                if let DecodedValue::Str(s) = v {
340                    Some(s.clone())
341                } else {
342                    None
343                }
344            } else {
345                None
346            }
347        })
348        .unwrap_or_default()
349}
350
351fn get_bytes(fields: &[(String, DecodedValue)], name: &str) -> Option<Vec<u8>> {
352    fields.iter().find_map(|(k, v)| {
353        if k == name {
354            if let DecodedValue::Binary(b) = v {
355                Some(b.clone())
356            } else {
357                None
358            }
359        } else {
360            None
361        }
362    })
363}