1use crate::decoder::{decode, decode_record_at, DecodedValue};
20use crate::error::{NxsError, Result};
21use crate::wal::SpanWal;
22use memmap2::Mmap;
23use std::collections::HashMap;
24use std::fs::{self, File};
25use std::path::{Path, PathBuf};
26
/// A single span record in decoded, owned form.
///
/// The notes on individual fields describe how this module fills them in when
/// it decodes a record; other producers of `Span` are not covered here.
#[derive(Debug, Clone)]
pub struct Span {
    /// 128-bit trace identifier, assembled from the record's `trace_id_hi`
    /// (upper 64 bits) and `trace_id_lo` (lower 64 bits) fields.
    pub trace_id: u128,
    /// Identifier of this span, taken from the record's `span_id` field.
    pub span_id: u64,
    /// Identifier of the parent span; `None` when the record's
    /// `parent_span_id` is null, not an integer, missing, or zero.
    pub parent_span_id: Option<u64>,
    /// Value of the record's `name` field; empty when no string is stored.
    pub name: String,
    /// Value of the record's `service` field; empty when no string is stored.
    pub service: String,
    /// Value of the record's `start_time_ns` field; `0` when absent.
    pub start_time_ns: i64,
    /// Value of the record's `duration_ns` field; `0` when absent.
    pub duration_ns: i64,
    /// Value of the record's `status_code` field; `0` when absent.
    pub status_code: i64,
    /// Binary `payload` field of the record, if one is stored.
    pub payload: Option<Vec<u8>>,
}
40
41pub struct SegmentReader {
43 segments: Vec<SealedSegment>,
44 wal: Option<WalReader>,
45}
46
47struct SealedSegment {
48 path: PathBuf,
49 _file: File,
50 mmap: Mmap,
51 index: HashMap<u128, Vec<u64>>,
53 keys: Vec<String>,
54 sigils: Vec<u8>,
55}
56
57struct WalReader {
59 wal: SpanWal,
60 data: Vec<u8>,
62}
63
64impl SegmentReader {
65 pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
67 let dir = dir.as_ref();
68 let mut segments = Vec::new();
69 let mut wal_path: Option<PathBuf> = None;
70
71 let entries = fs::read_dir(dir).map_err(|e| NxsError::IoError(e.to_string()))?;
72 for entry in entries {
73 let entry = entry.map_err(|e| NxsError::IoError(e.to_string()))?;
74 let path = entry.path();
75 match path.extension().and_then(|e| e.to_str()) {
76 Some("nxb") => {
77 let seg = SealedSegment::load(path)?;
78 segments.push(seg);
79 }
80 Some("nxsw") => {
81 wal_path = Some(path);
83 }
84 _ => {}
85 }
86 }
87
88 segments.sort_by(|a, b| a.path.cmp(&b.path));
90
91 let wal = if let Some(p) = wal_path {
92 let mut w = SpanWal::open(&p)?;
93 w.recover()?;
94 let data = fs::read(w.path()).map_err(|e| NxsError::IoError(e.to_string()))?;
95 Some(WalReader { wal: w, data })
96 } else {
97 None
98 };
99
100 Ok(SegmentReader { segments, wal })
101 }
102
103 pub fn find_by_trace(&self, trace_id: u128) -> Result<Vec<Span>> {
105 let mut spans = Vec::new();
106
107 for seg in &self.segments {
108 if let Some(offsets) = seg.index.get(&trace_id) {
109 for &abs_off in offsets {
110 if let Ok(span) = seg.decode_span_at(abs_off) {
111 spans.push(span);
112 }
113 }
114 }
115 }
116
117 if let Some(ref wr) = self.wal {
118 for entry in &wr.wal.index {
119 if entry.trace_id == trace_id {
120 if let Some(span) = decode_span_from_raw(&wr.data, entry.offset as usize) {
121 spans.push(span);
122 }
123 }
124 }
125 }
126
127 spans.sort_by_key(|s| s.start_time_ns);
128 Ok(spans)
129 }
130
131 pub fn find_span(&self, trace_id: u128, span_id: u64) -> Result<Option<Span>> {
133 let spans = self.find_by_trace(trace_id)?;
134 Ok(spans.into_iter().find(|s| s.span_id == span_id))
135 }
136
137 pub fn find_by_time(&self, start_ns: i64, end_ns: i64) -> Result<Vec<Span>> {
139 let mut spans = Vec::new();
140
141 for seg in &self.segments {
142 for offsets in seg.index.values() {
143 for &abs_off in offsets {
144 if let Ok(span) = seg.decode_span_at(abs_off) {
145 if span.start_time_ns >= start_ns && span.start_time_ns <= end_ns {
146 spans.push(span);
147 }
148 }
149 }
150 }
151 }
152
153 if let Some(ref wr) = self.wal {
154 for entry in &wr.wal.index {
155 if let Some(span) = decode_span_from_raw(&wr.data, entry.offset as usize) {
156 if span.start_time_ns >= start_ns && span.start_time_ns <= end_ns {
157 spans.push(span);
158 }
159 }
160 }
161 }
162
163 spans.sort_by_key(|s| s.start_time_ns);
164 Ok(spans)
165 }
166
167 pub fn stats(&self) -> ReaderStats {
169 let segment_count = self.segments.len();
170 let sealed_records: u64 = self
171 .segments
172 .iter()
173 .map(|s| s.index.values().map(|v| v.len() as u64).sum::<u64>())
174 .sum();
175 let wal_records = self.wal.as_ref().map(|w| w.wal.record_count()).unwrap_or(0);
176 ReaderStats {
177 segment_count,
178 sealed_records,
179 wal_records,
180 }
181 }
182}
183
/// Record counts reported by `SegmentReader::stats`.
#[derive(Debug)]
pub struct ReaderStats {
    /// Number of sealed segments currently open.
    pub segment_count: usize,
    /// Total number of record offsets held in the sealed segments' indexes.
    pub sealed_records: u64,
    /// Record count reported by the WAL, or `0` when no WAL is open.
    pub wal_records: u64,
}
190
191impl SealedSegment {
192 fn load(path: PathBuf) -> Result<Self> {
193 let file = File::open(&path).map_err(|e| NxsError::IoError(e.to_string()))?;
194 let mmap = unsafe {
195 Mmap::map(&file)
196 .map_err(|e| NxsError::IoError(format!("mmap {}: {e}", path.display())))?
197 };
198 let data = &mmap[..];
199 let decoded = decode(data)?;
200
201 let mut index: HashMap<u128, Vec<u64>> = HashMap::new();
204 let tail_start = decoded.tail_start;
205 let tail_end = data.len().saturating_sub(12); let mut pos = tail_start;
208 while pos + 10 <= tail_end {
209 let _key_id = u16::from_le_bytes(
210 data[pos..pos + 2]
211 .try_into()
212 .map_err(|_| NxsError::OutOfBounds)?,
213 );
214 let abs_off = u64::from_le_bytes(
215 data[pos + 2..pos + 10]
216 .try_into()
217 .map_err(|_| NxsError::OutOfBounds)?,
218 );
219 pos += 10;
220
221 if abs_off as usize + 8 <= data.len() {
223 let fields =
224 decode_record_at(data, abs_off as usize, &decoded.keys, &decoded.key_sigils)
225 .unwrap_or_default();
226 if let Some(trace_id) = extract_trace_id(&fields) {
227 index.entry(trace_id).or_default().push(abs_off);
228 }
229 }
230 }
231
232 Ok(SealedSegment {
233 path,
234 _file: file,
235 mmap,
236 index,
237 keys: decoded.keys,
238 sigils: decoded.key_sigils,
239 })
240 }
241
242 fn data(&self) -> &[u8] {
243 &self.mmap
244 }
245
246 fn decode_span_at(&self, abs_off: u64) -> Result<Span> {
247 let fields = decode_record_at(self.data(), abs_off as usize, &self.keys, &self.sigils)?;
248 fields_to_span(&fields).ok_or(NxsError::OutOfBounds)
249 }
250}
251
252fn extract_trace_id(fields: &[(String, DecodedValue)]) -> Option<u128> {
255 let hi = get_i64(fields, "trace_id_hi")? as u64;
256 let lo = get_i64(fields, "trace_id_lo")? as u64;
257 Some(((hi as u128) << 64) | lo as u128)
258}
259
260fn fields_to_span(fields: &[(String, DecodedValue)]) -> Option<Span> {
261 let trace_id = extract_trace_id(fields)?;
262 let span_id = get_i64(fields, "span_id")? as u64;
263 let parent_span_id = if is_null(fields, "parent_span_id") {
266 None
267 } else {
268 get_i64(fields, "parent_span_id")
269 .map(|v| v as u64)
270 .filter(|&v| v != 0)
271 };
272
273 Some(Span {
274 trace_id,
275 span_id,
276 parent_span_id,
277 name: get_str(fields, "name"),
278 service: get_str(fields, "service"),
279 start_time_ns: get_i64(fields, "start_time_ns").unwrap_or(0),
280 duration_ns: get_i64(fields, "duration_ns").unwrap_or(0),
281 status_code: get_i64(fields, "status_code").unwrap_or(0),
282 payload: get_bytes(fields, "payload"),
283 })
284}
285
/// Field names passed to the record decoder when decoding spans from WAL bytes.
const SPAN_KEYS: &[&str] = &[
    "trace_id_hi",
    "trace_id_lo",
    "span_id",
    "parent_span_id",
    "name",
    "service",
    "start_time_ns",
    "duration_ns",
    "status_code",
    "payload",
];

/// Sigil bytes passed alongside [`SPAN_KEYS`], listed one per key in the same
/// order.
const SPAN_SIGILS: &[u8] = &[
    b'=',  // trace_id_hi
    b'=',  // trace_id_lo
    b'=',  // span_id
    b'=',  // parent_span_id
    b'"',  // name
    b'"',  // service
    b'@',  // start_time_ns
    b'=',  // duration_ns
    b'=',  // status_code
    b'<',  // payload
];
299
300fn decode_span_from_raw(data: &[u8], offset: usize) -> Option<Span> {
301 if offset + 4 > data.len() {
302 return None;
303 }
304 let magic = u32::from_le_bytes(data[offset..offset + 4].try_into().ok()?);
305 if magic != crate::wal::MAGIC_OBJ {
306 return None;
307 }
308
309 let keys: Vec<String> = SPAN_KEYS.iter().map(|s| s.to_string()).collect();
310 let fields = decode_record_at(data, offset, &keys, SPAN_SIGILS).ok()?;
311 fields_to_span(&fields)
312}
313
314fn is_null(fields: &[(String, DecodedValue)], name: &str) -> bool {
315 fields
316 .iter()
317 .any(|(k, v)| k == name && matches!(v, DecodedValue::Null))
318}
319
320fn get_i64(fields: &[(String, DecodedValue)], name: &str) -> Option<i64> {
321 fields.iter().find_map(|(k, v)| {
322 if k == name {
323 match v {
324 DecodedValue::Int(i) => Some(*i),
325 DecodedValue::Time(i) => Some(*i),
326 _ => None,
327 }
328 } else {
329 None
330 }
331 })
332}
333
334fn get_str(fields: &[(String, DecodedValue)], name: &str) -> String {
335 fields
336 .iter()
337 .find_map(|(k, v)| {
338 if k == name {
339 if let DecodedValue::Str(s) = v {
340 Some(s.clone())
341 } else {
342 None
343 }
344 } else {
345 None
346 }
347 })
348 .unwrap_or_default()
349}
350
351fn get_bytes(fields: &[(String, DecodedValue)], name: &str) -> Option<Vec<u8>> {
352 fields.iter().find_map(|(k, v)| {
353 if k == name {
354 if let DecodedValue::Binary(b) = v {
355 Some(b.clone())
356 } else {
357 None
358 }
359 } else {
360 None
361 }
362 })
363}