1use crate::error::{NxsError, Result};
25use crate::writer::{NxsWriter, Schema};
26use std::fs::{File, OpenOptions};
27use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
28use std::path::{Path, PathBuf};
29
/// Magic number at the start of a WAL file; its little-endian bytes spell `NXSW`.
pub const MAGIC_WAL: u32 = 0x5753584E;
/// Magic number that begins every object record stored in the WAL.
pub const MAGIC_OBJ: u32 = 0x4E59584F;
/// Format version written into the WAL header.
const WAL_VERSION: u16 = 0x0100;
/// Header flag: the span schema bytes follow the header.
const WAL_FLAG_SCHEMA_EMBEDDED: u16 = 0x0001;
/// Largest record length (8-byte record header included) accepted during recovery;
/// longer lengths are treated as corruption.
const MAX_RECORD_BYTES: u64 = 10 * 1024 * 1024;

/// In-memory index entry locating one span record inside the WAL file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WalEntry {
    /// 128-bit trace id, with `trace_id_hi` in the upper 64 bits.
    pub trace_id: u128,
    /// Span id, reinterpreted from the signed value stored in the record.
    pub span_id: u64,
    /// Byte offset of the record's header within the WAL file.
    pub offset: u64,
}
44
/// Borrowed field values of one span, as passed to `SpanWal::append`.
///
/// The 128-bit trace id is split into `trace_id_hi` (upper 64 bits) and
/// `trace_id_lo` (lower 64 bits).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SpanFields<'a> {
    pub trace_id_hi: i64,
    pub trace_id_lo: i64,
    pub span_id: i64,
    /// Parent span id; `None` is written as a null value.
    pub parent_span_id: Option<i64>,
    pub name: &'a str,
    pub service: &'a str,
    /// Start timestamp, encoded with the writer's time encoding.
    pub start_time_ns: i64,
    pub duration_ns: i64,
    pub status_code: i64,
    /// Opaque payload bytes; the field is omitted from the record when `None`.
    pub payload: Option<&'a [u8]>,
}
60
61pub struct SpanSchema {
64 pub schema: Schema,
65}
66
67impl SpanSchema {
68 pub fn new() -> Self {
69 SpanSchema {
70 schema: Schema::new(&[
71 "trace_id_hi", "trace_id_lo", "span_id", "parent_span_id", "name", "service", "start_time_ns", "duration_ns", "status_code", "payload", ]),
82 }
83 }
84}
85
86pub mod slot {
88 use crate::writer::Slot;
89 pub const TRACE_ID_HI: Slot = Slot(0);
90 pub const TRACE_ID_LO: Slot = Slot(1);
91 pub const SPAN_ID: Slot = Slot(2);
92 pub const PARENT_SPAN_ID: Slot = Slot(3);
93 pub const NAME: Slot = Slot(4);
94 pub const SERVICE: Slot = Slot(5);
95 pub const START_TIME_NS: Slot = Slot(6);
96 pub const DURATION_NS: Slot = Slot(7);
97 pub const STATUS_CODE: Slot = Slot(8);
98 pub const PAYLOAD: Slot = Slot(9);
99}
100
101pub struct SpanWal {
103 path: PathBuf,
104 file: BufWriter<File>,
105 pub index: Vec<WalEntry>,
107 pub record_count: u64,
108 schema: SpanSchema,
109 data_start: u64,
111 current_offset: u64,
113}
114
115impl SpanWal {
116 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
119 let path = path.as_ref().to_path_buf();
120 let schema = SpanSchema::new();
121 let schema_bytes = build_wal_schema_bytes(&schema.schema);
122
123 let file_exists = path.exists();
124 let file = OpenOptions::new()
125 .create(true)
126 .append(true)
127 .open(&path)
128 .map_err(|e| NxsError::IoError(e.to_string()))?;
129 let mut writer = BufWriter::new(file);
130
131 let data_start = 8 + 4 + schema_bytes.len() as u64;
133 if !file_exists {
134 writer
136 .write_all(&MAGIC_WAL.to_le_bytes())
137 .map_err(|e| NxsError::IoError(e.to_string()))?;
138 writer
139 .write_all(&WAL_VERSION.to_le_bytes())
140 .map_err(|e| NxsError::IoError(e.to_string()))?;
141 writer
142 .write_all(&WAL_FLAG_SCHEMA_EMBEDDED.to_le_bytes())
143 .map_err(|e| NxsError::IoError(e.to_string()))?;
144 writer
145 .write_all(&(schema_bytes.len() as u32).to_le_bytes())
146 .map_err(|e| NxsError::IoError(e.to_string()))?;
147 writer
148 .write_all(&schema_bytes)
149 .map_err(|e| NxsError::IoError(e.to_string()))?;
150 writer
151 .flush()
152 .map_err(|e| NxsError::IoError(e.to_string()))?;
153 }
154
155 let initial_offset = if file_exists {
158 0 } else {
160 data_start
161 };
162
163 Ok(SpanWal {
164 path,
165 file: writer,
166 index: Vec::new(),
167 record_count: 0,
168 schema,
169 data_start,
170 current_offset: initial_offset,
171 })
172 }
173
174 pub fn append(&mut self, span: &SpanFields) -> Result<u64> {
176 let file_offset = self.current_offset;
177
178 let mut w = NxsWriter::new(&self.schema.schema);
180 w.begin_object();
181 w.write_i64(slot::TRACE_ID_HI, span.trace_id_hi);
182 w.write_i64(slot::TRACE_ID_LO, span.trace_id_lo);
183 w.write_i64(slot::SPAN_ID, span.span_id);
184 match span.parent_span_id {
185 Some(p) => w.write_i64(slot::PARENT_SPAN_ID, p),
186 None => w.write_null(slot::PARENT_SPAN_ID),
187 }
188 w.write_str(slot::NAME, span.name);
189 w.write_str(slot::SERVICE, span.service);
190 w.write_time(slot::START_TIME_NS, span.start_time_ns);
191 w.write_i64(slot::DURATION_NS, span.duration_ns);
192 w.write_i64(slot::STATUS_CODE, span.status_code);
193 if let Some(payload) = span.payload {
194 w.write_bytes(slot::PAYLOAD, payload);
195 }
196 w.end_object();
197
198 let nxb = w.finish();
201 let data_sector = extract_data_sector(&nxb)?;
202
203 self.file
204 .write_all(data_sector)
205 .map_err(|e| NxsError::IoError(e.to_string()))?;
206
207 self.current_offset += data_sector.len() as u64;
208
209 let trace_id =
211 ((span.trace_id_hi as u64 as u128) << 64) | (span.trace_id_lo as u64 as u128);
212 self.index.push(WalEntry {
213 trace_id,
214 span_id: span.span_id as u64,
215 offset: file_offset,
216 });
217 self.record_count += 1;
218
219 Ok(file_offset)
220 }
221
222 pub fn flush(&mut self) -> Result<()> {
224 self.file
225 .flush()
226 .map_err(|e| NxsError::IoError(e.to_string()))
227 }
228
229 pub fn recover(&mut self) -> Result<()> {
232 let mut file = File::open(&self.path).map_err(|e| NxsError::IoError(e.to_string()))?;
233 let file_len = file
234 .metadata()
235 .map_err(|e| NxsError::IoError(e.to_string()))?
236 .len();
237
238 let mut header = [0u8; 8];
240 file.read_exact(&mut header)
241 .map_err(|e| NxsError::IoError(e.to_string()))?;
242 let magic = u32::from_le_bytes(header[0..4].try_into().map_err(|_| NxsError::OutOfBounds)?);
243 if magic != MAGIC_WAL {
244 return Err(NxsError::BadMagic);
245 }
246
247 let mut schema_len_buf = [0u8; 4];
249 file.read_exact(&mut schema_len_buf)
250 .map_err(|e| NxsError::IoError(e.to_string()))?;
251 let schema_len = u32::from_le_bytes(schema_len_buf) as u64;
252 file.seek(SeekFrom::Current(schema_len as i64))
253 .map_err(|e| NxsError::IoError(e.to_string()))?;
254
255 let mut index = Vec::new();
257 let mut record_count = 0u64;
258 loop {
259 let pos = file
260 .stream_position()
261 .map_err(|e| NxsError::IoError(e.to_string()))?;
262 if pos + 8 > file_len {
263 break;
264 }
265
266 let mut rec_header = [0u8; 8];
267 file.read_exact(&mut rec_header)
268 .map_err(|e| NxsError::IoError(e.to_string()))?;
269 let obj_magic = u32::from_le_bytes(
270 rec_header[0..4]
271 .try_into()
272 .map_err(|_| NxsError::OutOfBounds)?,
273 );
274 if obj_magic != MAGIC_OBJ {
275 break; }
277 let obj_len = u32::from_le_bytes(
278 rec_header[4..8]
279 .try_into()
280 .map_err(|_| NxsError::OutOfBounds)?,
281 ) as u64;
282
283 if !(8..=MAX_RECORD_BYTES).contains(&obj_len) || pos + obj_len > file_len {
285 break;
286 }
287 let mut obj_buf = vec![0u8; obj_len as usize];
288 obj_buf[0..8].copy_from_slice(&rec_header);
289 file.read_exact(&mut obj_buf[8..])
290 .map_err(|e| NxsError::IoError(e.to_string()))?;
291
292 if let Some((trace_id, span_id)) = extract_trace_span_id(&obj_buf) {
293 index.push(WalEntry {
294 trace_id,
295 span_id,
296 offset: pos,
297 });
298 }
299 record_count += 1;
300 }
301
302 self.index = index;
303 self.record_count = record_count;
304 self.current_offset = file_len;
306 Ok(())
307 }
308
309 pub fn seal(&mut self, out_path: impl AsRef<Path>) -> Result<SealReport> {
313 self.flush()?;
314
315 let mut file = File::open(&self.path).map_err(|e| NxsError::IoError(e.to_string()))?;
316
317 file.seek(SeekFrom::Start(self.data_start))
319 .map_err(|e| NxsError::IoError(e.to_string()))?;
320
321 let schema_for_seal = SpanSchema::new();
323 let mut w = NxsWriter::with_capacity(&schema_for_seal.schema, 1024 * 1024);
324
325 for entry in &self.index {
327 file.seek(SeekFrom::Start(entry.offset))
328 .map_err(|e| NxsError::IoError(e.to_string()))?;
329
330 let mut hdr = [0u8; 8];
331 file.read_exact(&mut hdr)
332 .map_err(|e| NxsError::IoError(e.to_string()))?;
333 let obj_len = u32::from_le_bytes(hdr[4..8].try_into().unwrap()) as usize;
334 let mut obj_buf = vec![0u8; obj_len];
335 obj_buf[0..8].copy_from_slice(&hdr);
336 file.read_exact(&mut obj_buf[8..])
337 .map_err(|e| NxsError::IoError(e.to_string()))?;
338
339 if let Some(span) = decode_span_object(&obj_buf) {
342 w.begin_object();
343 w.write_i64(slot::TRACE_ID_HI, span.trace_id_hi);
344 w.write_i64(slot::TRACE_ID_LO, span.trace_id_lo);
345 w.write_i64(slot::SPAN_ID, span.span_id);
346 match span.parent_span_id {
347 Some(p) => w.write_i64(slot::PARENT_SPAN_ID, p),
348 None => w.write_null(slot::PARENT_SPAN_ID),
349 }
350 w.write_str(slot::NAME, &span.name_owned);
351 w.write_str(slot::SERVICE, &span.service_owned);
352 w.write_time(slot::START_TIME_NS, span.start_time_ns);
353 w.write_i64(slot::DURATION_NS, span.duration_ns);
354 w.write_i64(slot::STATUS_CODE, span.status_code);
355 if let Some(ref payload) = span.payload_owned {
356 w.write_bytes(slot::PAYLOAD, payload);
357 }
358 w.end_object();
359 }
360 }
361
362 let nxb = w.finish();
363 let bytes_written = nxb.len() as u64;
364 let records = self.record_count;
365
366 std::fs::write(out_path.as_ref(), &nxb).map_err(|e| NxsError::IoError(e.to_string()))?;
367
368 Ok(SealReport {
369 records,
370 bytes_written,
371 segment_path: out_path.as_ref().to_path_buf(),
372 })
373 }
374
375 pub fn record_count(&self) -> u64 {
376 self.record_count
377 }
378
379 pub fn path(&self) -> &Path {
380 &self.path
381 }
382}
383
/// Summary returned by `SpanWal::seal`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SealReport {
    /// Record count reported by the seal.
    pub records: u64,
    /// Size in bytes of the segment written to `segment_path`.
    pub bytes_written: u64,
    /// Path of the sealed segment file.
    pub segment_path: PathBuf,
}
390
391fn extract_data_sector(nxb: &[u8]) -> Result<&[u8]> {
396 if nxb.len() < 32 {
397 return Err(NxsError::OutOfBounds);
398 }
399 let mut tail_ptr =
401 u64::from_le_bytes(nxb[16..24].try_into().map_err(|_| NxsError::OutOfBounds)?) as usize;
402 if tail_ptr == 0 {
403 if nxb.len() < 44 {
404 return Err(NxsError::OutOfBounds);
405 }
406 tail_ptr = u64::from_le_bytes(
407 nxb[nxb.len() - 12..nxb.len() - 4]
408 .try_into()
409 .map_err(|_| NxsError::OutOfBounds)?,
410 ) as usize;
411 }
412 if tail_ptr > nxb.len() {
413 return Err(NxsError::OutOfBounds);
414 }
415
416 let mut pos = 32usize;
419 if pos + 2 > nxb.len() {
420 return Err(NxsError::OutOfBounds);
421 }
422 let key_count = u16::from_le_bytes(nxb[pos..pos + 2].try_into().unwrap()) as usize;
423 pos += 2 + key_count; for _ in 0..key_count {
425 while pos < nxb.len() && nxb[pos] != 0 {
426 pos += 1;
427 }
428 pos += 1; }
430 while pos % 8 != 0 {
431 pos += 1;
432 }
433
434 if pos > tail_ptr {
436 return Err(NxsError::OutOfBounds);
437 }
438 Ok(&nxb[pos..tail_ptr])
439}
440
441fn build_wal_schema_bytes(schema: &Schema) -> Vec<u8> {
443 let keys = schema_keys(schema);
445 let n = keys.len();
446 let mut b = Vec::new();
447 b.extend_from_slice(&(n as u16).to_le_bytes());
448 for _ in 0..n {
449 b.push(b'"'); }
451 for key in &keys {
452 b.extend_from_slice(key.as_bytes());
453 b.push(0x00);
454 }
455 while b.len() % 8 != 0 {
456 b.push(0x00);
457 }
458 b
459}
460
461fn schema_keys(schema: &Schema) -> Vec<&'static str> {
463 let _ = schema; vec![
465 "trace_id_hi",
466 "trace_id_lo",
467 "span_id",
468 "parent_span_id",
469 "name",
470 "service",
471 "start_time_ns",
472 "duration_ns",
473 "status_code",
474 "payload",
475 ]
476}
477
/// Reads the trace id and span id from a raw WAL object record without fully decoding it.
///
/// `obj` is the whole record, including its 8-byte header. Layout as parsed here:
/// - From byte 8, a presence bitmask with 7 bits per byte: bit `i` of mask byte
///   `k` marks slot `7 * k + i`.
/// - A set high bit in a mask byte means another mask byte follows.
/// - Next, one little-endian `u16` per present slot, in slot order, giving that
///   value's byte offset within `obj`.
///
/// Slots 0, 1 and 2 (`trace_id_hi`, `trace_id_lo`, `span_id`) are read as
/// little-endian `i64`s. `trace_id_hi` becomes the upper 64 bits of the returned
/// trace id.
///
/// Returns `None` if any of these hold:
/// - the bitmask runs past the end of `obj` or is longer than 17 bytes,
/// - the offset table does not fit in `obj`,
/// - one of the three slots is absent,
/// - a value would extend past the end of `obj`.
fn extract_trace_span_id(obj: &[u8]) -> Option<(u128, u64)> {
    const BITMASK_START: usize = 8;
    const MAX_CONTINUED_MASK_BYTES: usize = 16;

    // Decode the presence bitmask in a single pass.
    let mut present: Vec<bool> = Vec::new();
    let mut cursor = BITMASK_START;
    let mut mask_len = 0usize;
    loop {
        let mask_byte = *obj.get(cursor)?;
        cursor += 1;
        mask_len += 1;
        present.extend((0..7).map(|bit| (mask_byte >> bit) & 1 == 1));
        if mask_byte & 0x80 == 0 {
            break;
        }
        if mask_len > MAX_CONTINUED_MASK_BYTES {
            return None;
        }
    }

    let table_start = cursor;
    let present_total = present.iter().filter(|&&is_set| is_set).count();
    if table_start + present_total * 2 > obj.len() {
        return None;
    }

    // Little-endian i64 stored for `slot`, located through the offset table.
    let value_of = |slot: usize| -> Option<i64> {
        if !*present.get(slot)? {
            return None;
        }
        // The table only has entries for present slots, so index by rank.
        let rank = present[..slot].iter().filter(|&&is_set| is_set).count();
        let entry = table_start + rank * 2;
        let offset_bytes = obj.get(entry..entry + 2)?;
        let value_at = usize::from(u16::from_le_bytes([offset_bytes[0], offset_bytes[1]]));
        let mut raw = [0u8; 8];
        raw.copy_from_slice(obj.get(value_at..value_at + 8)?);
        Some(i64::from_le_bytes(raw))
    };

    let hi = value_of(0)? as u64;
    let lo = value_of(1)? as u64;
    let span_id = value_of(2)? as u64;
    Some(((u128::from(hi) << 64) | u128::from(lo), span_id))
}
562
/// Owned copy of one span decoded from a WAL record by `decode_span_object`.
///
/// Holds the same values as `SpanFields`, but owns its strings and payload so it
/// does not borrow from the record buffer.
struct DecodedSpan {
    trace_id_hi: i64,
    trace_id_lo: i64,
    span_id: i64,
    // `None` when the stored value was null or 0.
    parent_span_id: Option<i64>,
    name_owned: String,
    service_owned: String,
    start_time_ns: i64,
    duration_ns: i64,
    status_code: i64,
    payload_owned: Option<Vec<u8>>,
}
576
/// Span schema keys in slot order (index `i` corresponds to slot `i`).
const SPAN_KEYS: &[&str] = &[
    "trace_id_hi",
    "trace_id_lo",
    "span_id",
    "parent_span_id",
    "name",
    "service",
    "start_time_ns",
    "duration_ns",
    "status_code",
    "payload",
];

/// One type sigil per entry of `SPAN_KEYS`, passed to the decoder.
///
/// Each sigil matches the writer call used for that slot when a span is encoded.
const SPAN_SIGILS: &[u8] = &[
    b'=',  // trace_id_hi    (write_i64)
    b'=',  // trace_id_lo    (write_i64)
    b'=',  // span_id        (write_i64)
    b'=',  // parent_span_id (write_i64 / write_null)
    b'"',  // name           (write_str)
    b'"',  // service        (write_str)
    b'@',  // start_time_ns  (write_time)
    b'=',  // duration_ns    (write_i64)
    b'=',  // status_code    (write_i64)
    b'<',  // payload        (write_bytes)
];
590
591fn decode_span_object(obj: &[u8]) -> Option<DecodedSpan> {
592 use crate::decoder::{decode_record_at, DecodedValue};
593
594 let keys: Vec<String> = SPAN_KEYS.iter().map(|s| s.to_string()).collect();
595 let sigils = SPAN_SIGILS;
596
597 let fields = decode_record_at(obj, 0, &keys, sigils).ok()?;
598 let get_i64 = |name: &str| -> Option<i64> {
599 fields.iter().find_map(|(k, v)| {
600 if k == name {
601 match v {
602 DecodedValue::Int(i) => Some(*i),
603 DecodedValue::Time(i) => Some(*i),
604 _ => None,
605 }
606 } else {
607 None
608 }
609 })
610 };
611 let get_str = |name: &str| -> String {
612 fields
613 .iter()
614 .find_map(|(k, v)| {
615 if k == name {
616 if let DecodedValue::Str(s) = v {
617 Some(s.clone())
618 } else {
619 None
620 }
621 } else {
622 None
623 }
624 })
625 .unwrap_or_default()
626 };
627 let get_bytes = |name: &str| -> Option<Vec<u8>> {
628 fields.iter().find_map(|(k, v)| {
629 if k == name {
630 if let DecodedValue::Binary(b) = v {
631 Some(b.clone())
632 } else {
633 None
634 }
635 } else {
636 None
637 }
638 })
639 };
640 let get_null = |name: &str| -> bool {
641 fields
642 .iter()
643 .any(|(k, v)| k == name && *v == DecodedValue::Null)
644 };
645
646 Some(DecodedSpan {
647 trace_id_hi: get_i64("trace_id_hi")?,
648 trace_id_lo: get_i64("trace_id_lo")?,
649 span_id: get_i64("span_id")?,
650 parent_span_id: if get_null("parent_span_id") {
651 None
652 } else {
653 get_i64("parent_span_id").filter(|&v| v != 0)
654 },
655 name_owned: get_str("name"),
656 service_owned: get_str("service"),
657 start_time_ns: get_i64("start_time_ns").unwrap_or(0),
658 duration_ns: get_i64("duration_ns").unwrap_or(0),
659 status_code: get_i64("status_code").unwrap_or(0),
660 payload_owned: get_bytes("payload"),
661 })
662}