use crate::error::{NxsError, Result};
use crate::writer::{NxsWriter, Schema, Slot};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
/// Magic number at the start of every WAL file; its little-endian bytes spell `NXSW`.
pub const MAGIC_WAL: u32 = 0x5753_584E;
/// Magic number expected (read little-endian) at the start of each object record.
pub const MAGIC_OBJ: u32 = 0x4E58_534F;
/// Format version written into the WAL header.
const WAL_VERSION: u16 = 0x01_00;
/// Header flag bit signalling that the schema block follows the header.
const WAL_FLAG_SCHEMA_EMBEDDED: u16 = 1;
/// Upper bound on a single record's declared byte length (10 MiB).
const MAX_RECORD_BYTES: u64 = 10 << 20;
/// Index entry locating one span record inside the WAL file.
///
/// Derives `PartialEq`/`Eq`/`Hash` so entries can be compared, deduplicated or
/// used as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WalEntry {
    /// Full 128-bit trace id (`trace_id_hi` in the upper 64 bits).
    pub trace_id: u128,
    /// Span id, reinterpreted bit-for-bit from its signed on-disk form.
    pub span_id: u64,
    /// Byte offset of the record from the start of the WAL file.
    pub offset: u64,
}
/// Borrowed view of a single span, as accepted by [`SpanWal::append`].
///
/// The 128-bit trace id is split into two signed halves, with `trace_id_hi` the
/// upper 64 bits. When `parent_span_id` is `None` the field is written as null.
/// When `payload` is `None` the field is omitted from the record.
///
/// Every field is `Copy`, so the view itself is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SpanFields<'a> {
    /// Upper 64 bits of the trace id.
    pub trace_id_hi: i64,
    /// Lower 64 bits of the trace id.
    pub trace_id_lo: i64,
    /// Id of this span.
    pub span_id: i64,
    /// Id of the parent span, if any.
    pub parent_span_id: Option<i64>,
    /// Operation name.
    pub name: &'a str,
    /// Service name.
    pub service: &'a str,
    /// Start timestamp in nanoseconds (encoded as a time value).
    pub start_time_ns: i64,
    /// Duration in nanoseconds.
    pub duration_ns: i64,
    /// Status code.
    pub status_code: i64,
    /// Optional opaque payload bytes.
    pub payload: Option<&'a [u8]>,
}
pub struct SpanSchema {
pub schema: Schema,
}
impl SpanSchema {
pub fn new() -> Self {
SpanSchema {
schema: Schema::new(&[
"trace_id_hi", "trace_id_lo", "span_id", "parent_span_id", "name", "service", "start_time_ns", "duration_ns", "status_code", "payload", ]),
}
}
}
/// Slot numbers for the span schema.
///
/// Slot `n` is the `n`-th key of the span schema, from `trace_id_hi` = 0
/// through `payload` = 9.
pub mod slot {
    use crate::writer::Slot;

    // -- identity --
    /// Upper 64 bits of the trace id.
    pub const TRACE_ID_HI: Slot = Slot(0);
    /// Lower 64 bits of the trace id.
    pub const TRACE_ID_LO: Slot = Slot(1);
    /// Id of this span.
    pub const SPAN_ID: Slot = Slot(2);
    /// Id of the parent span; written as null when the span has no parent.
    pub const PARENT_SPAN_ID: Slot = Slot(3);

    // -- description --
    /// Operation name (string).
    pub const NAME: Slot = Slot(4);
    /// Service name (string).
    pub const SERVICE: Slot = Slot(5);

    // -- timing and outcome --
    /// Start timestamp in nanoseconds (time value).
    pub const START_TIME_NS: Slot = Slot(6);
    /// Duration in nanoseconds.
    pub const DURATION_NS: Slot = Slot(7);
    /// Status code.
    pub const STATUS_CODE: Slot = Slot(8);

    // -- optional data --
    /// Opaque payload bytes; omitted when absent.
    pub const PAYLOAD: Slot = Slot(9);
}
/// Append-only write-ahead log of span records backed by a single file.
///
/// A fixed header is followed by NXS object records. `index` maps each record's
/// trace and span ids to its byte offset so the log can later be sealed into a
/// segment.
pub struct SpanWal {
    /// Location of the WAL file on disk.
    path: PathBuf,
    /// Schema used to encode appended spans.
    schema: SpanSchema,
    /// Buffered writer over the WAL file; data reaches the file only when flushed.
    file: BufWriter<File>,
    /// Byte offset just past the header, where the first record begins.
    data_start: u64,
    /// Byte offset that the next appended record is expected to occupy.
    current_offset: u64,
    /// Offsets of the records whose trace and span ids are known.
    pub index: Vec<WalEntry>,
    /// Number of records appended, or found by the last recovery.
    pub record_count: u64,
}
impl SpanWal {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let schema = SpanSchema::new();
let schema_bytes = build_wal_schema_bytes(&schema.schema);
let file_exists = path.exists();
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.map_err(|e| NxsError::IoError(e.to_string()))?;
let mut writer = BufWriter::new(file);
let data_start = 8 + 4 + schema_bytes.len() as u64;
if !file_exists {
writer
.write_all(&MAGIC_WAL.to_le_bytes())
.map_err(|e| NxsError::IoError(e.to_string()))?;
writer
.write_all(&WAL_VERSION.to_le_bytes())
.map_err(|e| NxsError::IoError(e.to_string()))?;
writer
.write_all(&WAL_FLAG_SCHEMA_EMBEDDED.to_le_bytes())
.map_err(|e| NxsError::IoError(e.to_string()))?;
writer
.write_all(&(schema_bytes.len() as u32).to_le_bytes())
.map_err(|e| NxsError::IoError(e.to_string()))?;
writer
.write_all(&schema_bytes)
.map_err(|e| NxsError::IoError(e.to_string()))?;
writer
.flush()
.map_err(|e| NxsError::IoError(e.to_string()))?;
}
let initial_offset = if file_exists {
0 } else {
data_start
};
Ok(SpanWal {
path,
file: writer,
index: Vec::new(),
record_count: 0,
schema,
data_start,
current_offset: initial_offset,
})
}
pub fn append(&mut self, span: &SpanFields) -> Result<u64> {
let file_offset = self.current_offset;
let mut w = NxsWriter::new(&self.schema.schema);
w.begin_object();
w.write_i64(slot::TRACE_ID_HI, span.trace_id_hi);
w.write_i64(slot::TRACE_ID_LO, span.trace_id_lo);
w.write_i64(slot::SPAN_ID, span.span_id);
match span.parent_span_id {
Some(p) => w.write_i64(slot::PARENT_SPAN_ID, p),
None => w.write_null(slot::PARENT_SPAN_ID),
}
w.write_str(slot::NAME, span.name);
w.write_str(slot::SERVICE, span.service);
w.write_time(slot::START_TIME_NS, span.start_time_ns);
w.write_i64(slot::DURATION_NS, span.duration_ns);
w.write_i64(slot::STATUS_CODE, span.status_code);
if let Some(payload) = span.payload {
w.write_bytes(slot::PAYLOAD, payload);
}
w.end_object();
let nxb = w.finish();
let data_sector = extract_data_sector(&nxb)?;
self.file
.write_all(data_sector)
.map_err(|e| NxsError::IoError(e.to_string()))?;
self.current_offset += data_sector.len() as u64;
let trace_id =
((span.trace_id_hi as u64 as u128) << 64) | (span.trace_id_lo as u64 as u128);
self.index.push(WalEntry {
trace_id,
span_id: span.span_id as u64,
offset: file_offset,
});
self.record_count += 1;
Ok(file_offset)
}
pub fn flush(&mut self) -> Result<()> {
self.file
.flush()
.map_err(|e| NxsError::IoError(e.to_string()))
}
pub fn recover(&mut self) -> Result<()> {
let mut file = File::open(&self.path).map_err(|e| NxsError::IoError(e.to_string()))?;
let file_len = file
.metadata()
.map_err(|e| NxsError::IoError(e.to_string()))?
.len();
let mut header = [0u8; 8];
file.read_exact(&mut header)
.map_err(|e| NxsError::IoError(e.to_string()))?;
let magic = u32::from_le_bytes(header[0..4].try_into().unwrap());
if magic != MAGIC_WAL {
return Err(NxsError::BadMagic);
}
let mut schema_len_buf = [0u8; 4];
file.read_exact(&mut schema_len_buf)
.map_err(|e| NxsError::IoError(e.to_string()))?;
let schema_len = u32::from_le_bytes(schema_len_buf) as u64;
file.seek(SeekFrom::Current(schema_len as i64))
.map_err(|e| NxsError::IoError(e.to_string()))?;
let mut index = Vec::new();
let mut record_count = 0u64;
loop {
let pos = file
.stream_position()
.map_err(|e| NxsError::IoError(e.to_string()))?;
if pos + 8 > file_len {
break;
}
let mut rec_header = [0u8; 8];
file.read_exact(&mut rec_header)
.map_err(|e| NxsError::IoError(e.to_string()))?;
let obj_magic = u32::from_le_bytes(rec_header[0..4].try_into().unwrap());
if obj_magic != MAGIC_OBJ {
break; }
let obj_len = u32::from_le_bytes(rec_header[4..8].try_into().unwrap()) as u64;
if !(8..=MAX_RECORD_BYTES).contains(&obj_len) || pos + obj_len > file_len {
break;
}
let mut obj_buf = vec![0u8; obj_len as usize];
obj_buf[0..8].copy_from_slice(&rec_header);
file.read_exact(&mut obj_buf[8..])
.map_err(|e| NxsError::IoError(e.to_string()))?;
if let Some((trace_id, span_id)) = extract_trace_span_id(&obj_buf) {
index.push(WalEntry {
trace_id,
span_id,
offset: pos,
});
}
record_count += 1;
}
self.index = index;
self.record_count = record_count;
self.current_offset = file_len;
Ok(())
}
pub fn seal(&mut self, out_path: impl AsRef<Path>) -> Result<SealReport> {
self.flush()?;
let mut file = File::open(&self.path).map_err(|e| NxsError::IoError(e.to_string()))?;
file.seek(SeekFrom::Start(self.data_start))
.map_err(|e| NxsError::IoError(e.to_string()))?;
let schema_for_seal = SpanSchema::new();
let mut w = NxsWriter::with_capacity(&schema_for_seal.schema, 1024 * 1024);
for entry in &self.index {
file.seek(SeekFrom::Start(entry.offset))
.map_err(|e| NxsError::IoError(e.to_string()))?;
let mut hdr = [0u8; 8];
file.read_exact(&mut hdr)
.map_err(|e| NxsError::IoError(e.to_string()))?;
let obj_len = u32::from_le_bytes(hdr[4..8].try_into().unwrap()) as usize;
let mut obj_buf = vec![0u8; obj_len];
obj_buf[0..8].copy_from_slice(&hdr);
file.read_exact(&mut obj_buf[8..])
.map_err(|e| NxsError::IoError(e.to_string()))?;
if let Some(span) = decode_span_object(&obj_buf) {
w.begin_object();
w.write_i64(slot::TRACE_ID_HI, span.trace_id_hi);
w.write_i64(slot::TRACE_ID_LO, span.trace_id_lo);
w.write_i64(slot::SPAN_ID, span.span_id);
match span.parent_span_id {
Some(p) => w.write_i64(slot::PARENT_SPAN_ID, p),
None => w.write_null(slot::PARENT_SPAN_ID),
}
w.write_str(slot::NAME, &span.name_owned);
w.write_str(slot::SERVICE, &span.service_owned);
w.write_time(slot::START_TIME_NS, span.start_time_ns);
w.write_i64(slot::DURATION_NS, span.duration_ns);
w.write_i64(slot::STATUS_CODE, span.status_code);
if let Some(ref payload) = span.payload_owned {
w.write_bytes(slot::PAYLOAD, payload);
}
w.end_object();
}
}
let nxb = w.finish();
let bytes_written = nxb.len() as u64;
let records = self.record_count;
std::fs::write(out_path.as_ref(), &nxb).map_err(|e| NxsError::IoError(e.to_string()))?;
Ok(SealReport {
records,
bytes_written,
segment_path: out_path.as_ref().to_path_buf(),
})
}
pub fn record_count(&self) -> u64 {
self.record_count
}
pub fn path(&self) -> &Path {
&self.path
}
}
/// Summary returned by [`SpanWal::seal`].
///
/// Derives `Clone`/`PartialEq`/`Eq` so reports can be stored and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SealReport {
    /// Record count reported by the seal.
    pub records: u64,
    /// Size in bytes of the segment file that was written.
    pub bytes_written: u64,
    /// Path of the segment file that was written.
    pub segment_path: PathBuf,
}
/// Returns the data sector of a complete NXB buffer.
///
/// Layout as parsed here:
/// - a 32-byte preamble whose bytes 16..24 hold the little-endian tail pointer
///   (the end of the data sector);
/// - a schema block: a `u16` key count, one sigil byte per key, then
///   NUL-terminated key names;
/// - padding up to the next 8-byte boundary, where the data sector begins.
///
/// # Errors
/// [`NxsError::OutOfBounds`] if the buffer is shorter than the preamble, the
/// tail pointer lies past the end, or the schema block runs past the tail pointer.
fn extract_data_sector(nxb: &[u8]) -> Result<&[u8]> {
    const PREAMBLE_LEN: usize = 32;

    if nxb.len() < PREAMBLE_LEN {
        return Err(NxsError::OutOfBounds);
    }
    let tail_raw: [u8; 8] = nxb[16..24].try_into().map_err(|_| NxsError::OutOfBounds)?;
    let tail = u64::from_le_bytes(tail_raw) as usize;
    if tail > nxb.len() {
        return Err(NxsError::OutOfBounds);
    }

    let count_raw: [u8; 2] = match nxb.get(PREAMBLE_LEN..PREAMBLE_LEN + 2) {
        Some(bytes) => [bytes[0], bytes[1]],
        None => return Err(NxsError::OutOfBounds),
    };
    let key_count = usize::from(u16::from_le_bytes(count_raw));

    // Skip the count and the one-byte-per-key sigil table, then each
    // NUL-terminated key name. A missing terminator can only end in an
    // out-of-bounds sector, so bail out immediately.
    let mut cursor = PREAMBLE_LEN + 2 + key_count;
    for _ in 0..key_count {
        let name_len = nxb
            .get(cursor..)
            .and_then(|rest| rest.iter().position(|&b| b == 0))
            .ok_or(NxsError::OutOfBounds)?;
        cursor += name_len + 1;
    }

    // The data sector starts at the next 8-byte boundary.
    let sector_start = (cursor + 7) / 8 * 8;
    if sector_start > tail {
        return Err(NxsError::OutOfBounds);
    }
    Ok(&nxb[sector_start..tail])
}
/// Serialises the schema block embedded in the WAL header.
///
/// Layout:
/// - the key count as a little-endian `u16`;
/// - one sigil byte per key;
/// - each key name followed by a NUL byte;
/// - zero padding up to a multiple of 8 bytes.
///
/// NOTE(review): every sigil is written as `"` regardless of the field's type.
/// `SPAN_SIGILS`, which is used when decoding records, uses `=`, `@` and `<` for
/// the non-string fields. Confirm whether anything reads these embedded sigils
/// before relying on them.
fn build_wal_schema_bytes(schema: &Schema) -> Vec<u8> {
    let names = schema_keys(schema);
    let count = names.len();
    let mut out = (count as u16).to_le_bytes().to_vec();
    out.resize(out.len() + count, b'"');
    for name in &names {
        out.extend(name.bytes());
        out.push(0x00);
    }
    let padded_len = (out.len() + 7) / 8 * 8;
    out.resize(padded_len, 0x00);
    out
}
fn schema_keys(schema: &Schema) -> Vec<&'static str> {
let _ = schema; vec![
"trace_id_hi",
"trace_id_lo",
"span_id",
"parent_span_id",
"name",
"service",
"start_time_ns",
"duration_ns",
"status_code",
"payload",
]
}
/// Pulls `(trace_id, span_id)` out of one encoded WAL object without a full decode.
///
/// Layout read here:
/// - an 8-byte object header;
/// - a presence bitmask at 7 bits per byte, where the high bit means "another
///   mask byte follows" (at most 17 bytes are accepted);
/// - a table of little-endian `u16` value offsets from the start of `obj`, one
///   per present slot in slot order;
/// - the values themselves.
///
/// Slots 0, 1 and 2 (`trace_id_hi`, `trace_id_lo`, `span_id`) are read as
/// little-endian `i64`s and reinterpreted as unsigned.
///
/// Returns `None` if the bitmask is unterminated or too long, any of the three
/// slots is absent, or an offset points outside `obj`.
fn extract_trace_span_id(obj: &[u8]) -> Option<(u128, u64)> {
    const BITMASK_START: usize = 8;
    const MAX_CONTINUED_BYTES: usize = 16;

    // Bit `n` set <=> slot `n` present. 17 bytes * 7 bits = 119 bits fits in u128.
    let mut presence: u128 = 0;
    let mut cursor = BITMASK_START;
    let mut mask_len = 0usize;
    loop {
        let byte = *obj.get(cursor)?;
        cursor += 1;
        presence |= u128::from(byte & 0x7F) << (7 * mask_len);
        mask_len += 1;
        if byte & 0x80 == 0 {
            break;
        }
        if mask_len > MAX_CONTINUED_BYTES {
            return None;
        }
    }

    let table_start = cursor;
    let present_total = presence.count_ones() as usize;
    if table_start + present_total * 2 > obj.len() {
        return None;
    }

    let slot_value = |slot: u32| -> Option<i64> {
        if (presence >> slot) & 1 == 0 {
            return None;
        }
        // A slot's table index is the number of present slots before it.
        let rank = (presence & ((1u128 << slot) - 1)).count_ones() as usize;
        let entry_at = table_start + rank * 2;
        let entry: [u8; 2] = obj.get(entry_at..entry_at + 2)?.try_into().ok()?;
        let value_at = usize::from(u16::from_le_bytes(entry));
        let raw: [u8; 8] = obj.get(value_at..value_at + 8)?.try_into().ok()?;
        Some(i64::from_le_bytes(raw))
    };

    let hi = slot_value(0)? as u64;
    let lo = slot_value(1)? as u64;
    let span_id = slot_value(2)? as u64;
    Some(((u128::from(hi) << 64) | u128::from(lo), span_id))
}
/// Owned copy of one span decoded from a WAL record.
///
/// Mirrors `SpanFields`, but owns its strings and payload so it can outlive the
/// read buffer.
struct DecodedSpan {
    /// Upper 64 bits of the trace id.
    trace_id_hi: i64,
    /// Lower 64 bits of the trace id.
    trace_id_lo: i64,
    /// Id of this span.
    span_id: i64,
    /// Id of the parent span, if any.
    parent_span_id: Option<i64>,
    /// Operation name.
    name_owned: String,
    /// Service name.
    service_owned: String,
    /// Start timestamp in nanoseconds.
    start_time_ns: i64,
    /// Duration in nanoseconds.
    duration_ns: i64,
    /// Status code.
    status_code: i64,
    /// Optional opaque payload bytes.
    payload_owned: Option<Vec<u8>>,
}
/// Field names of a span record, in slot order (index = slot number).
const SPAN_KEYS: &[&str] = &[
    "trace_id_hi",    // 0
    "trace_id_lo",    // 1
    "span_id",        // 2
    "parent_span_id", // 3
    "name",           // 4
    "service",        // 5
    "start_time_ns",  // 6
    "duration_ns",    // 7
    "status_code",    // 8
    "payload",        // 9
];

/// Sigil passed to the decoder for each entry of `SPAN_KEYS`, position for position.
///
/// `=` marks the fields written with `write_i64`, `"` those written with
/// `write_str`, `@` the one written with `write_time`, and `<` the one written
/// with `write_bytes`.
const SPAN_SIGILS: &[u8] = &[
    b'=', b'=', b'=', b'=', // trace_id_hi, trace_id_lo, span_id, parent_span_id
    b'"', b'"', // name, service
    b'@', // start_time_ns
    b'=', b'=', // duration_ns, status_code
    b'<', // payload
];
/// Decodes a single WAL object back into an owned [`DecodedSpan`].
///
/// The object is decoded with the fixed `SPAN_KEYS` / `SPAN_SIGILS` schema via
/// `decode_record_at`, starting at offset 0.
///
/// Returns `None` if decoding fails, or if any of `trace_id_hi`, `trace_id_lo`
/// or `span_id` is missing.
///
/// Other fields fall back to defaults when they are absent or of an unexpected
/// kind:
/// - empty strings for `name` and `service`;
/// - `0` for `start_time_ns`, `duration_ns` and `status_code`;
/// - `None` for `payload`.
///
/// `parent_span_id` is `None` when stored as null, and also when it decodes to `0`.
fn decode_span_object(obj: &[u8]) -> Option<DecodedSpan> {
    use crate::decoder::{decode_record_at, DecodedValue};

    let key_names: Vec<String> = SPAN_KEYS.iter().map(|&key| String::from(key)).collect();
    let fields = decode_record_at(obj, 0, &key_names, SPAN_SIGILS).ok()?;

    // Each accessor yields the first field with the requested key whose value is
    // of the expected variant.
    let int_field = |wanted: &str| -> Option<i64> {
        fields.iter().find_map(|(key, value)| match value {
            DecodedValue::Int(n) | DecodedValue::Time(n) if key == wanted => Some(*n),
            _ => None,
        })
    };
    let text_field = |wanted: &str| -> String {
        fields
            .iter()
            .find_map(|(key, value)| match value {
                DecodedValue::Str(s) if key == wanted => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_default()
    };
    let binary_field = |wanted: &str| -> Option<Vec<u8>> {
        fields.iter().find_map(|(key, value)| match value {
            DecodedValue::Binary(b) if key == wanted => Some(b.clone()),
            _ => None,
        })
    };
    let is_null = |wanted: &str| -> bool {
        fields
            .iter()
            .any(|(key, value)| *value == DecodedValue::Null && key == wanted)
    };

    // A zero parent id is normalised to "no parent".
    let parent_span_id = if is_null("parent_span_id") {
        None
    } else {
        int_field("parent_span_id").filter(|&id| id != 0)
    };

    Some(DecodedSpan {
        trace_id_hi: int_field("trace_id_hi")?,
        trace_id_lo: int_field("trace_id_lo")?,
        span_id: int_field("span_id")?,
        parent_span_id,
        name_owned: text_field("name"),
        service_owned: text_field("service"),
        start_time_ns: int_field("start_time_ns").unwrap_or(0),
        duration_ns: int_field("duration_ns").unwrap_or(0),
        status_code: int_field("status_code").unwrap_or(0),
        payload_owned: binary_field("payload"),
    })
}