use crate::decoder::{decode, decode_record_at, DecodedValue};
use crate::error::{NxsError, Result};
use crate::wal::{SpanWal, MAGIC_WAL};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
/// A single decoded span record.
///
/// Instances are produced from decoded record fields; numeric fields that are
/// missing from a record are filled with `0` and missing strings with `""`.
#[derive(Debug, Clone)]
pub struct Span {
    /// 128-bit trace identifier, assembled from the `trace_id_hi` and
    /// `trace_id_lo` record fields.
    pub trace_id: u128,
    /// Identifier of this span.
    pub span_id: u64,
    /// Identifier of the parent span; `None` when the field is null, absent,
    /// or zero.
    pub parent_span_id: Option<u64>,
    /// Value of the `name` record field.
    pub name: String,
    /// Value of the `service` record field.
    pub service: String,
    /// Start timestamp; queries sort and filter on this value.
    pub start_time_ns: i64,
    /// Value of the `duration_ns` record field.
    pub duration_ns: i64,
    /// Value of the `status_code` record field.
    pub status_code: i64,
    /// Optional binary payload attached to the span.
    pub payload: Option<Vec<u8>>,
}
/// Read-side view over a directory of sealed segments plus an optional
/// write-ahead log.
pub struct SegmentReader {
    /// Sealed segments loaded from `.nxb` files, kept sorted by path.
    segments: Vec<SealedSegment>,
    /// Recovered WAL and its raw bytes, when a `.nxsw` file was found.
    wal: Option<WalReader>,
}
/// One sealed segment file held fully in memory together with its trace index.
struct SealedSegment {
    /// Location of the segment file; used to order segments.
    path: PathBuf,
    /// Entire contents of the segment file.
    data: Vec<u8>,
    /// Maps a trace id to the absolute offsets of its records inside `data`.
    index: HashMap<u128, Vec<u64>>,
    /// Key names taken from the decoded segment, passed to the record decoder.
    keys: Vec<String>,
    /// Key sigils taken from the decoded segment, passed to the record decoder.
    sigils: Vec<u8>,
}
/// A recovered write-ahead log paired with the raw bytes of its file.
struct WalReader {
    /// The WAL handle; its `index` lists the records available for lookup.
    wal: SpanWal,
    /// Contents of the WAL file, read after recovery completed.
    data: Vec<u8>,
}
impl SegmentReader {
pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
let dir = dir.as_ref();
let mut segments = Vec::new();
let mut wal_path: Option<PathBuf> = None;
let entries = fs::read_dir(dir).map_err(|e| NxsError::IoError(e.to_string()))?;
for entry in entries {
let entry = entry.map_err(|e| NxsError::IoError(e.to_string()))?;
let path = entry.path();
match path.extension().and_then(|e| e.to_str()) {
Some("nxb") => {
let data = fs::read(&path).map_err(|e| NxsError::IoError(e.to_string()))?;
let seg = SealedSegment::load(path, data)?;
segments.push(seg);
}
Some("nxsw") => {
wal_path = Some(path);
}
_ => {}
}
}
segments.sort_by(|a, b| a.path.cmp(&b.path));
let wal = if let Some(p) = wal_path {
let mut w = SpanWal::open(&p)?;
w.recover()?;
let data = fs::read(w.path()).map_err(|e| NxsError::IoError(e.to_string()))?;
Some(WalReader { wal: w, data })
} else {
None
};
Ok(SegmentReader { segments, wal })
}
pub fn find_by_trace(&self, trace_id: u128) -> Result<Vec<Span>> {
let mut spans = Vec::new();
for seg in &self.segments {
if let Some(offsets) = seg.index.get(&trace_id) {
for &abs_off in offsets {
if let Ok(span) = seg.decode_span_at(abs_off) {
spans.push(span);
}
}
}
}
if let Some(ref wr) = self.wal {
for entry in &wr.wal.index {
if entry.trace_id == trace_id {
if let Some(span) = decode_span_from_raw(&wr.data, entry.offset as usize) {
spans.push(span);
}
}
}
}
spans.sort_by_key(|s| s.start_time_ns);
Ok(spans)
}
pub fn find_span(&self, trace_id: u128, span_id: u64) -> Result<Option<Span>> {
let spans = self.find_by_trace(trace_id)?;
Ok(spans.into_iter().find(|s| s.span_id == span_id))
}
pub fn find_by_time(&self, start_ns: i64, end_ns: i64) -> Result<Vec<Span>> {
let mut spans = Vec::new();
for seg in &self.segments {
for offsets in seg.index.values() {
for &abs_off in offsets {
if let Ok(span) = seg.decode_span_at(abs_off) {
if span.start_time_ns >= start_ns && span.start_time_ns <= end_ns {
spans.push(span);
}
}
}
}
}
if let Some(ref wr) = self.wal {
for entry in &wr.wal.index {
if let Some(span) = decode_span_from_raw(&wr.data, entry.offset as usize) {
if span.start_time_ns >= start_ns && span.start_time_ns <= end_ns {
spans.push(span);
}
}
}
}
spans.sort_by_key(|s| s.start_time_ns);
Ok(spans)
}
pub fn stats(&self) -> ReaderStats {
let segment_count = self.segments.len();
let sealed_records: u64 = self
.segments
.iter()
.map(|s| s.index.values().map(|v| v.len() as u64).sum::<u64>())
.sum();
let wal_records = self.wal.as_ref().map(|w| w.wal.record_count()).unwrap_or(0);
ReaderStats {
segment_count,
sealed_records,
wal_records,
}
}
}
/// Counters describing what a `SegmentReader` has loaded.
#[derive(Debug)]
pub struct ReaderStats {
    /// Number of sealed segments held by the reader.
    pub segment_count: usize,
    /// Total number of record offsets indexed across all sealed segments.
    pub sealed_records: u64,
    /// Record count reported by the WAL, or `0` when no WAL is present.
    pub wal_records: u64,
}
impl SealedSegment {
    /// Builds an in-memory segment from the bytes of a sealed segment file.
    ///
    /// The file is decoded once to obtain its keys, sigils and the start of the
    /// tail section. The tail is then walked as fixed 10-byte entries (2 bytes
    /// that are ignored here, followed by a little-endian `u64` absolute record
    /// offset); the final 8 bytes of the file are excluded from the walk. Each
    /// referenced record is decoded and, when it carries a trace id, its offset
    /// is added to the trace index.
    ///
    /// Offsets that do not fit inside `data`, and records that fail to decode or
    /// lack a trace id, are skipped rather than reported.
    ///
    /// # Errors
    ///
    /// Propagates the error from the initial `decode` of the file.
    fn load(path: PathBuf, data: Vec<u8>) -> Result<Self> {
        // One tail entry: 2 ignored bytes + 8-byte little-endian offset.
        const TAIL_ENTRY_LEN: usize = 10;

        let decoded = decode(&data)?;
        let mut index: HashMap<u128, Vec<u64>> = HashMap::new();

        // `get` yields `None` for an inverted or out-of-range window, which is
        // treated as an empty tail instead of panicking on a corrupt header.
        let tail_end = data.len().saturating_sub(8);
        let tail = data.get(decoded.tail_start..tail_end).unwrap_or(&[]);

        // `chunks_exact` only yields complete entries; a trailing partial entry
        // is ignored.
        for entry in tail.chunks_exact(TAIL_ENTRY_LEN) {
            let mut off_bytes = [0u8; 8];
            off_bytes.copy_from_slice(&entry[2..]);
            let abs_off = u64::from_le_bytes(off_bytes);

            // Compare in u64 with checked arithmetic: the offset comes from the
            // file, so it may be arbitrarily large. This avoids both overflow of
            // `abs_off + 8` and truncation when narrowing to `usize`.
            let record_in_bounds = abs_off
                .checked_add(8)
                .map_or(false, |end| end <= data.len() as u64);
            if !record_in_bounds {
                continue;
            }

            // The bounds check above guarantees `abs_off` fits in `usize`.
            let fields =
                decode_record_at(&data, abs_off as usize, &decoded.keys, &decoded.key_sigils)
                    .unwrap_or_default();
            if let Some(trace_id) = extract_trace_id(&fields) {
                index.entry(trace_id).or_default().push(abs_off);
            }
        }

        Ok(SealedSegment {
            path,
            data,
            index,
            keys: decoded.keys,
            sigils: decoded.key_sigils,
        })
    }

    /// Decodes the record at absolute offset `abs_off` into a [`Span`].
    ///
    /// # Errors
    ///
    /// Propagates the record decoder's error, and returns
    /// `NxsError::OutOfBounds` when the record decodes but lacks the fields
    /// required to form a span.
    fn decode_span_at(&self, abs_off: u64) -> Result<Span> {
        let fields = decode_record_at(&self.data, abs_off as usize, &self.keys, &self.sigils)?;
        fields_to_span(&fields).ok_or(NxsError::OutOfBounds)
    }
}
/// Assembles the 128-bit trace id from the `trace_id_hi` and `trace_id_lo`
/// fields.
///
/// Each half is read as an `i64` and reinterpreted as `u64` before being
/// combined. Returns `None` when either half is missing or not an integer.
fn extract_trace_id(fields: &[(String, DecodedValue)]) -> Option<u128> {
    let upper = get_i64(fields, "trace_id_hi")?;
    let lower = get_i64(fields, "trace_id_lo")?;
    // Go through `u64` first so a negative half is not sign-extended.
    let upper = u128::from(upper as u64);
    let lower = u128::from(lower as u64);
    Some((upper << 64) | lower)
}
/// Converts decoded record fields into a [`Span`].
///
/// Returns `None` when the trace id or `span_id` cannot be read. All other
/// fields are optional: missing integers become `0`, missing strings become
/// empty, and a missing payload becomes `None`.
fn fields_to_span(fields: &[(String, DecodedValue)]) -> Option<Span> {
    let trace_id = extract_trace_id(fields)?;
    let span_id = get_i64(fields, "span_id")? as u64;

    // A parent is reported only when the field is not explicitly null and holds
    // a non-zero integer.
    let parent_span_id = match get_i64(fields, "parent_span_id") {
        Some(raw) if raw != 0 && !is_null(fields, "parent_span_id") => Some(raw as u64),
        _ => None,
    };

    let name = get_str(fields, "name");
    let service = get_str(fields, "service");
    let start_time_ns = get_i64(fields, "start_time_ns").unwrap_or(0);
    let duration_ns = get_i64(fields, "duration_ns").unwrap_or(0);
    let status_code = get_i64(fields, "status_code").unwrap_or(0);
    let payload = get_bytes(fields, "payload");

    Some(Span {
        trace_id,
        span_id,
        parent_span_id,
        name,
        service,
        start_time_ns,
        duration_ns,
        status_code,
        payload,
    })
}
/// Field names handed to the record decoder when decoding spans from the WAL.
const SPAN_KEYS: &[&str] = &[
    "trace_id_hi",
    "trace_id_lo",
    "span_id",
    "parent_span_id",
    "name",
    "service",
    "start_time_ns",
    "duration_ns",
    "status_code",
    "payload",
];

/// Sigil bytes handed to the record decoder alongside [`SPAN_KEYS`].
///
/// Holds ten bytes, the same count as `SPAN_KEYS`.
// NOTE(review): presumably one sigil per key, in the same order — confirm
// against the decoder.
const SPAN_SIGILS: &[u8] = b"====\"\"@==<";
/// Decodes a span from raw WAL bytes at `offset`.
///
/// The four bytes at `offset` must hold the little-endian object magic
/// (`crate::wal::MAGIC_OBJ`); the record is then decoded with the fixed
/// [`SPAN_KEYS`] / [`SPAN_SIGILS`] tables.
///
/// Returns `None` when `offset` does not leave room for the magic inside
/// `data` (including offsets so large that `offset + 4` would overflow), when
/// the magic does not match, when the record fails to decode, or when the
/// decoded fields do not form a span.
fn decode_span_from_raw(data: &[u8], offset: usize) -> Option<Span> {
    // Checked arithmetic plus `get` keeps a bogus offset from overflowing or
    // panicking on the slice index.
    let magic_end = offset.checked_add(4)?;
    let magic_bytes: [u8; 4] = data.get(offset..magic_end)?.try_into().ok()?;
    if u32::from_le_bytes(magic_bytes) != crate::wal::MAGIC_OBJ {
        return None;
    }

    let keys: Vec<String> = SPAN_KEYS.iter().map(|&key| key.to_string()).collect();
    let fields = decode_record_at(data, offset, &keys, SPAN_SIGILS).ok()?;
    fields_to_span(&fields)
}
/// Reports whether any field called `name` holds an explicit null value.
fn is_null(fields: &[(String, DecodedValue)], name: &str) -> bool {
    for (key, value) in fields {
        if key == name && matches!(value, DecodedValue::Null) {
            return true;
        }
    }
    false
}
/// Returns the integer stored under `name`, if any.
///
/// Both `Int` and `Time` values are accepted. Fields with a matching name but
/// a different value kind are passed over, so the first matching field that
/// holds an integer wins.
fn get_i64(fields: &[(String, DecodedValue)], name: &str) -> Option<i64> {
    fields
        .iter()
        .filter(|(key, _)| key == name)
        .find_map(|(_, value)| match value {
            DecodedValue::Int(n) | DecodedValue::Time(n) => Some(*n),
            _ => None,
        })
}
/// Returns a copy of the string stored under `name`, or an empty string when
/// no field with that name holds a string value.
fn get_str(fields: &[(String, DecodedValue)], name: &str) -> String {
    for (key, value) in fields {
        if key != name {
            continue;
        }
        // A same-named field of another kind does not end the search.
        if let DecodedValue::Str(text) = value {
            return text.clone();
        }
    }
    String::new()
}
/// Returns a copy of the binary value stored under `name`, or `None` when no
/// field with that name holds binary data.
fn get_bytes(fields: &[(String, DecodedValue)], name: &str) -> Option<Vec<u8>> {
    fields
        .iter()
        .filter(|(key, _)| key == name)
        .find_map(|(_, value)| match value {
            DecodedValue::Binary(bytes) => Some(bytes.clone()),
            _ => None,
        })
}