1use std::collections::BTreeMap;
12use std::fs;
13use std::io::Write;
14use std::path::{Path, PathBuf};
15
16use fsqlite_error::{FrankenError, Result};
17use fsqlite_types::{ObjectId, Oti, SymbolRecord, SymbolRecordFlags, source_symbol_count};
18use tracing::{debug, error, info, warn};
19use xxhash_rust::xxh3::xxh3_64;
20
/// Bead identifier attached (as `bead_id`) to every tracing event emitted by this module.
const BEAD_ID: &str = "bd-1hi.24";
/// Bead identifier of the logging standard (emitted as `logging_standard`) these events follow.
const LOGGING_STANDARD_BEAD: &str = "bd-1fpm";

/// Magic bytes stored at offset 0 of every symbol segment file.
pub const SYMBOL_SEGMENT_MAGIC: [u8; 4] = *b"FSSY";
/// Segment header format version; `SymbolSegmentHeader::decode` rejects any other value.
pub const SYMBOL_SEGMENT_VERSION: u32 = 1;
/// Size of an encoded segment header:
/// magic (4) + version (4) + segment_id (8) + epoch_id (8) + created_at (8) + xxh3 checksum (8).
pub const SYMBOL_SEGMENT_HEADER_BYTES: usize = 40;

/// Number of leading header bytes covered by the xxh3 checksum (everything before it).
const SYMBOL_SEGMENT_HASH_INPUT_BYTES: usize = 32;

/// Fixed-size prefix of an encoded `SymbolRecord` that precedes the symbol data.
/// Fewer remaining bytes than this means the record is incomplete.
const SYMBOL_RECORD_HEADER_BYTES: usize = 51;
/// Fixed-size suffix of an encoded `SymbolRecord` that follows the symbol data.
const SYMBOL_RECORD_TRAILER_BYTES: usize = 25;
/// Offset, within an encoded record, of the `u32` symbol-data length (read via `read_u32_at`).
const SYMBOL_SIZE_FIELD_OFFSET: usize = 47;
/// Width in bytes of the symbol-data length field.
const SYMBOL_SIZE_FIELD_BYTES: usize = 4;
39
/// Header stored at the start of every symbol segment file.
///
/// [`SymbolSegmentHeader::encode`] serializes it into `SYMBOL_SEGMENT_HEADER_BYTES` bytes,
/// together with the magic, the format version and an xxh3 checksum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SymbolSegmentHeader {
    /// Segment identifier; also selects the file name via `symbol_segment_path`.
    pub segment_id: u64,
    /// Epoch identifier supplied when the segment was created.
    pub epoch_id: u64,
    /// Creation timestamp as supplied by the caller (units are not defined in this module).
    pub created_at: u64,
}
58
59impl SymbolSegmentHeader {
60 #[must_use]
62 pub const fn new(segment_id: u64, epoch_id: u64, created_at: u64) -> Self {
63 Self {
64 segment_id,
65 epoch_id,
66 created_at,
67 }
68 }
69
70 #[must_use]
72 pub fn encode(&self) -> [u8; SYMBOL_SEGMENT_HEADER_BYTES] {
73 let mut out = [0_u8; SYMBOL_SEGMENT_HEADER_BYTES];
74 out[0..4].copy_from_slice(&SYMBOL_SEGMENT_MAGIC);
75 out[4..8].copy_from_slice(&SYMBOL_SEGMENT_VERSION.to_le_bytes());
76 out[8..16].copy_from_slice(&self.segment_id.to_le_bytes());
77 out[16..24].copy_from_slice(&self.epoch_id.to_le_bytes());
78 out[24..32].copy_from_slice(&self.created_at.to_le_bytes());
79 let checksum = xxh3_64(&out[..SYMBOL_SEGMENT_HASH_INPUT_BYTES]);
80 out[32..40].copy_from_slice(&checksum.to_le_bytes());
81 out
82 }
83
84 pub fn decode(bytes: &[u8]) -> Result<Self> {
86 if bytes.len() < SYMBOL_SEGMENT_HEADER_BYTES {
87 return Err(FrankenError::DatabaseCorrupt {
88 detail: format!(
89 "symbol segment header too short: expected {SYMBOL_SEGMENT_HEADER_BYTES}, got {}",
90 bytes.len()
91 ),
92 });
93 }
94
95 if bytes[0..4] != SYMBOL_SEGMENT_MAGIC {
96 return Err(FrankenError::DatabaseCorrupt {
97 detail: format!("invalid symbol segment magic: {:02X?}", &bytes[0..4]),
98 });
99 }
100
101 let version = read_u32_at(bytes, 4, "version")?;
102 if version != SYMBOL_SEGMENT_VERSION {
103 return Err(FrankenError::DatabaseCorrupt {
104 detail: format!(
105 "unsupported symbol segment version {version}, expected {SYMBOL_SEGMENT_VERSION}"
106 ),
107 });
108 }
109
110 let segment_id = read_u64_at(bytes, 8, "segment_id")?;
111 let epoch_id = read_u64_at(bytes, 16, "epoch_id")?;
112 let created_at = read_u64_at(bytes, 24, "created_at")?;
113 let stored_checksum = read_u64_at(bytes, 32, "header_xxh3")?;
114 let computed_checksum = xxh3_64(&bytes[..SYMBOL_SEGMENT_HASH_INPUT_BYTES]);
115
116 if stored_checksum != computed_checksum {
117 return Err(FrankenError::DatabaseCorrupt {
118 detail: format!(
119 "symbol segment header checksum mismatch: stored {stored_checksum:#018X}, computed {computed_checksum:#018X}"
120 ),
121 });
122 }
123
124 Ok(Self {
125 segment_id,
126 epoch_id,
127 created_at,
128 })
129 }
130}
131
/// Location of a record inside a symbol segment.
///
/// The derived ordering compares `segment_id` first, then `offset_bytes`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct SymbolLogOffset {
    /// Segment that holds the record.
    pub segment_id: u64,
    /// Byte offset of the record measured from the end of the segment header
    /// (absolute file offset minus `SYMBOL_SEGMENT_HEADER_BYTES`).
    pub offset_bytes: u64,
}
140
/// A decoded `SymbolRecord` together with the location it was read from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SymbolLogRecord {
    /// Where the record lives in the log.
    pub offset: SymbolLogOffset,
    /// The decoded record.
    pub record: SymbolRecord,
}
149
/// Result of scanning a whole segment with `scan_symbol_segment`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SymbolSegmentScan {
    /// The segment's validated header.
    pub header: SymbolSegmentHeader,
    /// Every complete record, in file order.
    pub records: Vec<SymbolLogRecord>,
    /// `true` if the scan stopped at trailing bytes too short to form a complete record.
    pub torn_tail: bool,
}
160
/// Index entry produced by `append_symbol_record_aligned` and consumed by
/// `read_aligned_symbol_record`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AlignedSymbolIndexEntry {
    /// Where the record's slot begins.
    pub offset: SymbolLogOffset,
    /// Length of the encoded record itself.
    pub logical_len: u32,
    /// Length of the on-disk slot: `logical_len` rounded up with `align_up` to the sector
    /// size. The gap after the record is zero-filled.
    pub padded_len: u32,
}
171
/// Appends symbol records to the single active (writable) segment of a directory.
///
/// Other segments are treated as immutable: `append_to_segment` rejects them, and `rotate`
/// only moves forward to a strictly larger segment id.
#[derive(Debug, Clone)]
pub struct SymbolLogManager {
    /// Directory holding the segment files.
    symbols_dir: PathBuf,
    /// Header of the segment currently receiving appends.
    active_header: SymbolSegmentHeader,
}
178
179impl SymbolLogManager {
180 pub fn new(
182 symbols_dir: &Path,
183 active_segment_id: u64,
184 epoch_id: u64,
185 created_at: u64,
186 ) -> Result<Self> {
187 let active_header = SymbolSegmentHeader::new(active_segment_id, epoch_id, created_at);
188 let segment_path = symbol_segment_path(symbols_dir, active_segment_id);
189 ensure_symbol_segment(&segment_path, active_header)?;
190
191 info!(
192 bead_id = BEAD_ID,
193 logging_standard = LOGGING_STANDARD_BEAD,
194 segment_id = active_segment_id,
195 epoch_id,
196 "opened symbol log manager"
197 );
198
199 Ok(Self {
200 symbols_dir: symbols_dir.to_path_buf(),
201 active_header,
202 })
203 }
204
205 #[must_use]
207 pub const fn active_segment_id(&self) -> u64 {
208 self.active_header.segment_id
209 }
210
211 #[must_use]
213 pub fn active_segment_path(&self) -> PathBuf {
214 symbol_segment_path(&self.symbols_dir, self.active_header.segment_id)
215 }
216
217 pub fn append(&self, record: &SymbolRecord) -> Result<SymbolLogOffset> {
219 append_symbol_record(&self.symbols_dir, self.active_header, record)
220 }
221
222 pub fn append_to_segment(
226 &self,
227 segment_id: u64,
228 record: &SymbolRecord,
229 ) -> Result<SymbolLogOffset> {
230 if segment_id != self.active_header.segment_id {
231 warn!(
232 bead_id = BEAD_ID,
233 logging_standard = LOGGING_STANDARD_BEAD,
234 requested_segment = segment_id,
235 active_segment = self.active_header.segment_id,
236 "append rejected because segment is immutable"
237 );
238 return Err(FrankenError::Internal(format!(
239 "segment {segment_id} is immutable; active segment is {}",
240 self.active_header.segment_id
241 )));
242 }
243 self.append(record)
244 }
245
246 pub fn rotate(
248 &mut self,
249 next_segment_id: u64,
250 next_epoch_id: u64,
251 next_created_at: u64,
252 ) -> Result<()> {
253 if next_segment_id <= self.active_header.segment_id {
254 return Err(FrankenError::Internal(format!(
255 "next segment id {next_segment_id} must be greater than current {}",
256 self.active_header.segment_id
257 )));
258 }
259
260 let next_header = SymbolSegmentHeader::new(next_segment_id, next_epoch_id, next_created_at);
261 let next_path = symbol_segment_path(&self.symbols_dir, next_segment_id);
262 ensure_symbol_segment(&next_path, next_header)?;
263 self.active_header = next_header;
264
265 info!(
266 bead_id = BEAD_ID,
267 logging_standard = LOGGING_STANDARD_BEAD,
268 segment_id = next_segment_id,
269 epoch_id = next_epoch_id,
270 "rotated symbol log segment"
271 );
272
273 Ok(())
274 }
275}
276
/// Returns the path of segment `segment_id` inside `symbols_dir`.
///
/// The file name is `segment-`, then the id zero-padded to at least six digits, then
/// `.log` (e.g. `segment-000042.log`).
#[must_use]
pub fn symbol_segment_path(symbols_dir: &Path, segment_id: u64) -> PathBuf {
    let file_name = format!("segment-{segment_id:06}.log");
    let mut path = symbols_dir.to_path_buf();
    path.push(file_name);
    path
}
282
283pub fn ensure_symbol_segment(segment_path: &Path, header: SymbolSegmentHeader) -> Result<()> {
285 if let Some(parent) = segment_path.parent() {
286 if !parent.as_os_str().is_empty() {
287 fs::create_dir_all(parent)?;
288 }
289 }
290
291 if !segment_path.exists() {
292 let encoded = header.encode();
293 fs::write(segment_path, encoded)?;
294 info!(
295 bead_id = BEAD_ID,
296 logging_standard = LOGGING_STANDARD_BEAD,
297 path = %segment_path.display(),
298 segment_id = header.segment_id,
299 epoch_id = header.epoch_id,
300 "created symbol segment"
301 );
302 return Ok(());
303 }
304
305 let bytes = fs::read(segment_path)?;
306 if bytes.len() < SYMBOL_SEGMENT_HEADER_BYTES {
307 return Err(FrankenError::DatabaseCorrupt {
308 detail: format!(
309 "existing segment {} shorter than header: {} bytes",
310 segment_path.display(),
311 bytes.len()
312 ),
313 });
314 }
315
316 let existing = SymbolSegmentHeader::decode(&bytes[..SYMBOL_SEGMENT_HEADER_BYTES])?;
317 if existing != header {
318 return Err(FrankenError::DatabaseCorrupt {
319 detail: format!(
320 "segment header mismatch for {}: existing={existing:?}, requested={header:?}",
321 segment_path.display()
322 ),
323 });
324 }
325
326 Ok(())
327}
328
329pub fn append_symbol_record(
331 symbols_dir: &Path,
332 header: SymbolSegmentHeader,
333 record: &SymbolRecord,
334) -> Result<SymbolLogOffset> {
335 let segment_path = symbol_segment_path(symbols_dir, header.segment_id);
336 ensure_symbol_segment(&segment_path, header)?;
337
338 let current_len = file_len_usize(&segment_path)?;
339 if current_len < SYMBOL_SEGMENT_HEADER_BYTES {
340 return Err(FrankenError::DatabaseCorrupt {
341 detail: format!(
342 "segment {} length {} shorter than header",
343 segment_path.display(),
344 current_len
345 ),
346 });
347 }
348
349 let offset_bytes = usize_to_u64(
350 current_len - SYMBOL_SEGMENT_HEADER_BYTES,
351 "symbol log offset",
352 )?;
353
354 let mut file = fs::OpenOptions::new().append(true).open(&segment_path)?;
355 let record_bytes = record.to_bytes();
356 file.write_all(&record_bytes)?;
357 file.sync_data()?;
358
359 debug!(
360 bead_id = BEAD_ID,
361 logging_standard = LOGGING_STANDARD_BEAD,
362 path = %segment_path.display(),
363 segment_id = header.segment_id,
364 offset_bytes,
365 logical_len = record_bytes.len(),
366 "appended packed symbol record"
367 );
368
369 Ok(SymbolLogOffset {
370 segment_id: header.segment_id,
371 offset_bytes,
372 })
373}
374
375pub fn append_symbol_record_aligned(
379 symbols_dir: &Path,
380 header: SymbolSegmentHeader,
381 record: &SymbolRecord,
382 sector_size: u32,
383) -> Result<AlignedSymbolIndexEntry> {
384 if sector_size == 0 {
385 return Err(FrankenError::Internal(
386 "sector_size must be non-zero for aligned symbol append".to_owned(),
387 ));
388 }
389
390 let segment_path = symbol_segment_path(symbols_dir, header.segment_id);
391 ensure_symbol_segment(&segment_path, header)?;
392
393 let current_len = file_len_usize(&segment_path)?;
394 if current_len < SYMBOL_SEGMENT_HEADER_BYTES {
395 return Err(FrankenError::DatabaseCorrupt {
396 detail: format!(
397 "segment {} length {} shorter than header",
398 segment_path.display(),
399 current_len
400 ),
401 });
402 }
403
404 let record_bytes = record.to_bytes();
405 let logical_len = record_bytes.len();
406 let alignment_bytes = u32_to_usize(sector_size, "sector_size")?;
407 let padded_len = align_up(logical_len, alignment_bytes)?;
408 let padding = padded_len.saturating_sub(logical_len);
409
410 let offset = SymbolLogOffset {
411 segment_id: header.segment_id,
412 offset_bytes: usize_to_u64(
413 current_len - SYMBOL_SEGMENT_HEADER_BYTES,
414 "symbol log offset",
415 )?,
416 };
417
418 let mut file = fs::OpenOptions::new().append(true).open(&segment_path)?;
419 file.write_all(&record_bytes)?;
420 if padding > 0 {
421 file.write_all(&vec![0_u8; padding])?;
422 }
423 file.sync_data()?;
424
425 let entry = AlignedSymbolIndexEntry {
426 offset,
427 logical_len: usize_to_u32(logical_len, "logical_len")?,
428 padded_len: usize_to_u32(padded_len, "padded_len")?,
429 };
430
431 debug!(
432 bead_id = BEAD_ID,
433 logging_standard = LOGGING_STANDARD_BEAD,
434 path = %segment_path.display(),
435 segment_id = header.segment_id,
436 offset_bytes = offset.offset_bytes,
437 logical_len = entry.logical_len,
438 padded_len = entry.padded_len,
439 sector_size,
440 "appended aligned symbol record"
441 );
442
443 Ok(entry)
444}
445
446pub fn scan_symbol_segment(segment_path: &Path) -> Result<SymbolSegmentScan> {
448 let bytes = fs::read(segment_path)?;
449 if bytes.len() < SYMBOL_SEGMENT_HEADER_BYTES {
450 return Err(FrankenError::DatabaseCorrupt {
451 detail: format!(
452 "segment {} shorter than header: {} bytes",
453 segment_path.display(),
454 bytes.len()
455 ),
456 });
457 }
458
459 let header = SymbolSegmentHeader::decode(&bytes[..SYMBOL_SEGMENT_HEADER_BYTES])?;
460 let mut cursor = SYMBOL_SEGMENT_HEADER_BYTES;
461 let mut records = Vec::new();
462 let mut torn_tail = false;
463
464 while cursor < bytes.len() {
465 let parsed = parse_symbol_record_at(&bytes, header.segment_id, cursor)?;
466 let Some((record, len)) = parsed else {
467 torn_tail = true;
468 warn!(
469 bead_id = BEAD_ID,
470 logging_standard = LOGGING_STANDARD_BEAD,
471 path = %segment_path.display(),
472 segment_id = header.segment_id,
473 absolute_offset = cursor,
474 "detected torn tail while scanning symbol segment"
475 );
476 break;
477 };
478 records.push(record);
479 cursor = cursor
480 .checked_add(len)
481 .ok_or_else(|| FrankenError::DatabaseCorrupt {
482 detail: "cursor overflow while scanning symbol segment".to_owned(),
483 })?;
484 }
485
486 info!(
487 bead_id = BEAD_ID,
488 logging_standard = LOGGING_STANDARD_BEAD,
489 path = %segment_path.display(),
490 segment_id = header.segment_id,
491 record_count = records.len(),
492 torn_tail,
493 "scanned symbol segment"
494 );
495
496 Ok(SymbolSegmentScan {
497 header,
498 records,
499 torn_tail,
500 })
501}
502
503pub fn read_symbol_record_at_offset(
505 segment_path: &Path,
506 offset: SymbolLogOffset,
507) -> Result<SymbolRecord> {
508 let bytes = fs::read(segment_path)?;
509 if bytes.len() < SYMBOL_SEGMENT_HEADER_BYTES {
510 return Err(FrankenError::DatabaseCorrupt {
511 detail: format!(
512 "segment {} shorter than header: {} bytes",
513 segment_path.display(),
514 bytes.len()
515 ),
516 });
517 }
518
519 let header = SymbolSegmentHeader::decode(&bytes[..SYMBOL_SEGMENT_HEADER_BYTES])?;
520 if header.segment_id != offset.segment_id {
521 return Err(FrankenError::DatabaseCorrupt {
522 detail: format!(
523 "segment id mismatch: locator={}, header={}",
524 offset.segment_id, header.segment_id
525 ),
526 });
527 }
528
529 let offset_usize = u64_to_usize(offset.offset_bytes, "offset_bytes")?;
530 let absolute_offset = SYMBOL_SEGMENT_HEADER_BYTES
531 .checked_add(offset_usize)
532 .ok_or_else(|| FrankenError::DatabaseCorrupt {
533 detail: "absolute offset overflow while reading symbol record".to_owned(),
534 })?;
535
536 let Some((record, _)) = parse_symbol_record_at(&bytes, header.segment_id, absolute_offset)?
537 else {
538 return Err(FrankenError::DatabaseCorrupt {
539 detail: format!(
540 "no complete symbol record at offset {} in {}",
541 offset.offset_bytes,
542 segment_path.display()
543 ),
544 });
545 };
546
547 Ok(record.record)
548}
549
550pub fn read_aligned_symbol_record(
552 segment_path: &Path,
553 entry: AlignedSymbolIndexEntry,
554) -> Result<SymbolRecord> {
555 let bytes = fs::read(segment_path)?;
556 if bytes.len() < SYMBOL_SEGMENT_HEADER_BYTES {
557 return Err(FrankenError::DatabaseCorrupt {
558 detail: format!(
559 "segment {} shorter than header: {} bytes",
560 segment_path.display(),
561 bytes.len()
562 ),
563 });
564 }
565
566 let header = SymbolSegmentHeader::decode(&bytes[..SYMBOL_SEGMENT_HEADER_BYTES])?;
567 if header.segment_id != entry.offset.segment_id {
568 return Err(FrankenError::DatabaseCorrupt {
569 detail: format!(
570 "segment id mismatch: locator={}, header={}",
571 entry.offset.segment_id, header.segment_id
572 ),
573 });
574 }
575
576 let offset_usize = u64_to_usize(entry.offset.offset_bytes, "offset_bytes")?;
577 let absolute_offset = SYMBOL_SEGMENT_HEADER_BYTES
578 .checked_add(offset_usize)
579 .ok_or_else(|| FrankenError::DatabaseCorrupt {
580 detail: "absolute offset overflow while reading aligned symbol".to_owned(),
581 })?;
582 let logical_len = u32_to_usize(entry.logical_len, "logical_len")?;
583 let padded_len = u32_to_usize(entry.padded_len, "padded_len")?;
584 if logical_len > padded_len {
585 return Err(FrankenError::DatabaseCorrupt {
586 detail: format!(
587 "invalid aligned index entry: logical_len {} exceeds padded_len {}",
588 entry.logical_len, entry.padded_len
589 ),
590 });
591 }
592 let padded_end =
593 absolute_offset
594 .checked_add(padded_len)
595 .ok_or_else(|| FrankenError::DatabaseCorrupt {
596 detail: "aligned padded read overflow".to_owned(),
597 })?;
598 if padded_end > bytes.len() {
599 return Err(FrankenError::DatabaseCorrupt {
600 detail: format!(
601 "aligned symbol padded range out of bounds: end={}, file_len={}",
602 padded_end,
603 bytes.len()
604 ),
605 });
606 }
607 let end =
608 absolute_offset
609 .checked_add(logical_len)
610 .ok_or_else(|| FrankenError::DatabaseCorrupt {
611 detail: "aligned logical read overflow".to_owned(),
612 })?;
613 if end > padded_end {
614 return Err(FrankenError::DatabaseCorrupt {
615 detail: format!(
616 "aligned logical range exceeds padded slot: logical_end={}, padded_end={}",
617 end, padded_end
618 ),
619 });
620 }
621
622 SymbolRecord::from_bytes(&bytes[absolute_offset..end]).map_err(|err| {
623 error!(
624 bead_id = BEAD_ID,
625 logging_standard = LOGGING_STANDARD_BEAD,
626 path = %segment_path.display(),
627 offset_bytes = entry.offset.offset_bytes,
628 error = %err,
629 "failed to decode aligned symbol record"
630 );
631 FrankenError::DatabaseCorrupt {
632 detail: format!(
633 "invalid aligned SymbolRecord at offset {}: {err}",
634 entry.offset.offset_bytes
635 ),
636 }
637 })
638}
639
640pub fn rebuild_object_locator(
642 symbols_dir: &Path,
643) -> Result<BTreeMap<ObjectId, Vec<SymbolLogOffset>>> {
644 let mut locator: BTreeMap<ObjectId, Vec<SymbolLogOffset>> = BTreeMap::new();
645 let segments = sorted_segment_paths(symbols_dir)?;
646
647 for (segment_id, path) in segments {
648 let scan = scan_symbol_segment(&path)?;
649 for row in scan.records {
650 locator
651 .entry(row.record.object_id)
652 .or_default()
653 .push(row.offset);
654 }
655 if scan.torn_tail {
656 warn!(
657 bead_id = BEAD_ID,
658 logging_standard = LOGGING_STANDARD_BEAD,
659 segment_id,
660 path = %path.display(),
661 "locator rebuild ignored torn tail in segment"
662 );
663 }
664 }
665
666 for offsets in locator.values_mut() {
667 offsets.sort_unstable();
668 }
669
670 info!(
671 bead_id = BEAD_ID,
672 logging_standard = LOGGING_STANDARD_BEAD,
673 objects = locator.len(),
674 "rebuilt object locator from symbol segments"
675 );
676
677 Ok(locator)
678}
679
/// Locator for one object's systematic run.
///
/// The run is the consecutive records holding ESIs `esi_start..=esi_end_inclusive` of that
/// object, all within one segment. It is built by `rebuild_systematic_run_locator` and
/// consumed by `read_systematic_fast_path`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SystematicRunLocator {
    /// Object whose source symbols the run holds.
    pub object_id: ObjectId,
    /// Segment containing the run's first record.
    pub segment_id: u64,
    /// First ESI of the run (always 0 when built by `build_systematic_run_locator`).
    pub esi_start: u32,
    /// Last ESI of the run, inclusive.
    pub esi_end_inclusive: u32,
    /// Record offsets; element `i` is the record holding ESI `i`.
    pub offsets: Vec<SymbolLogOffset>,
}

impl SystematicRunLocator {
    /// Number of source symbols in the run, i.e. the number of recorded offsets.
    #[must_use]
    pub fn source_symbol_count(&self) -> usize {
        self.offsets.len()
    }
}
706
707pub fn rebuild_systematic_run_locator(
718 symbols_dir: &Path,
719) -> Result<BTreeMap<ObjectId, SystematicRunLocator>> {
720 let mut locator: BTreeMap<ObjectId, SystematicRunLocator> = BTreeMap::new();
721 let segments = sorted_segment_paths(symbols_dir)?;
722
723 for (segment_id, path) in segments {
724 let scan = scan_symbol_segment(&path)?;
725 let rows = &scan.records;
726
727 for start_idx in 0..rows.len() {
728 let start = &rows[start_idx].record;
729 if start.esi != 0
730 || !start
731 .flags
732 .contains(SymbolRecordFlags::SYSTEMATIC_RUN_START)
733 {
734 continue;
735 }
736
737 match build_systematic_run_locator(rows, start_idx) {
738 Ok(run) => {
739 locator.insert(run.object_id, run);
740 }
741 Err(detail) => {
742 warn!(
743 bead_id = BEAD_ID,
744 logging_standard = LOGGING_STANDARD_BEAD,
745 segment_id,
746 path = %path.display(),
747 start_offset = rows[start_idx].offset.offset_bytes,
748 start_object_id = %start.object_id,
749 reason = %detail,
750 "invalid systematic run start; fast-path must fall back"
751 );
752 }
753 }
754 }
755
756 if scan.torn_tail {
757 warn!(
758 bead_id = BEAD_ID,
759 logging_standard = LOGGING_STANDARD_BEAD,
760 segment_id,
761 path = %path.display(),
762 "systematic-run locator rebuild ignored torn tail in segment"
763 );
764 }
765 }
766
767 info!(
768 bead_id = BEAD_ID,
769 logging_standard = LOGGING_STANDARD_BEAD,
770 objects = locator.len(),
771 "rebuilt systematic run locator from symbol segments"
772 );
773
774 Ok(locator)
775}
776
/// Sizes derived from an object's OTI for the systematic fast path.
#[derive(Debug, Clone, Copy)]
struct SystematicFastPathPlan {
    /// Number of source symbols `K`, from `source_symbol_count(oti)`.
    source_symbols: usize,
    /// Bytes per symbol (`OTI.t`).
    symbol_size: usize,
    /// Object length in bytes (`OTI.f`); the reassembled output is truncated to this.
    transfer_len: usize,
    /// `source_symbols * symbol_size`: size of the reassembly buffer.
    total_len: usize,
}
794
/// Per-record expectations checked by `read_systematic_fast_path_record`.
#[derive(Debug, Clone, Copy)]
struct SystematicFastPathExpectations<'a> {
    /// Run being read; every record offset must be in its `segment_id`.
    run: &'a SystematicRunLocator,
    /// Object every record must belong to.
    object_id: ObjectId,
    /// OTI every record must carry.
    oti: Oti,
    /// Exact `symbol_data` length every record must have.
    symbol_size: usize,
    /// Key used to verify non-zero auth tags. If `None`, any record with a non-zero auth
    /// tag makes the fast path unavailable.
    auth_epoch_key: Option<&'a [u8; 32]>,
}
803
/// Logs, at `warn`, why the systematic fast path cannot be used for `object_id`.
fn fast_path_unavailable(object_id: ObjectId, detail: &str) {
    warn!(
        bead_id = BEAD_ID,
        logging_standard = LOGGING_STANDARD_BEAD,
        object_id = %object_id,
        detail,
        "systematic fast path unavailable"
    );
}

/// Same as `fast_path_unavailable`, but also records the ESI being processed.
fn fast_path_unavailable_esi(object_id: ObjectId, expected_esi: u32, detail: &str) {
    warn!(
        bead_id = BEAD_ID,
        logging_standard = LOGGING_STANDARD_BEAD,
        object_id = %object_id,
        expected_esi,
        detail,
        "systematic fast path unavailable"
    );
}
824
825pub fn read_systematic_fast_path(
826 symbols_dir: &Path,
827 run: &SystematicRunLocator,
828 object_id: ObjectId,
829 oti: Oti,
830 auth_epoch_key: Option<&[u8; 32]>,
831) -> Result<Option<Vec<u8>>> {
832 let Some(plan) = build_systematic_fast_path_plan(run, object_id, oti) else {
833 return Ok(None);
834 };
835 if plan.source_symbols == 0 {
836 return Ok(Some(Vec::new()));
837 }
838
839 let Some((bytes, _header)) = load_systematic_fast_path_segment(symbols_dir, run, object_id)?
840 else {
841 return Ok(None);
842 };
843
844 let expectations = SystematicFastPathExpectations {
845 run,
846 object_id,
847 oti,
848 symbol_size: plan.symbol_size,
849 auth_epoch_key,
850 };
851 let mut out = vec![0_u8; plan.total_len];
852
853 for (index, offset) in run.offsets.iter().copied().enumerate() {
854 let Ok(expected_esi) = u32::try_from(index) else {
855 fast_path_unavailable(object_id, "index does not fit ESI");
856 return Ok(None);
857 };
858 let Some(parsed) =
859 read_systematic_fast_path_record(&bytes, &expectations, offset, expected_esi)
860 else {
861 return Ok(None);
862 };
863
864 let Some(start) = index.checked_mul(plan.symbol_size) else {
865 fast_path_unavailable_esi(object_id, expected_esi, "output offset overflow");
866 return Ok(None);
867 };
868 let Some(end) = start.checked_add(plan.symbol_size) else {
869 fast_path_unavailable_esi(object_id, expected_esi, "output end overflow");
870 return Ok(None);
871 };
872 if end > out.len() {
873 fast_path_unavailable_esi(object_id, expected_esi, "output bounds check failed");
874 return Ok(None);
875 }
876 out[start..end].copy_from_slice(&parsed.symbol_data);
877 }
878
879 out.truncate(plan.transfer_len);
880 Ok(Some(out))
881}
882
883fn build_systematic_fast_path_plan(
884 run: &SystematicRunLocator,
885 object_id: ObjectId,
886 oti: Oti,
887) -> Option<SystematicFastPathPlan> {
888 let source_symbols = match source_symbol_count(oti) {
889 Ok(value) => value,
890 Err(err) => {
891 let detail = format!("invalid source symbol count: {err}");
892 fast_path_unavailable(object_id, &detail);
893 return None;
894 }
895 };
896 if source_symbols == 0 {
897 return Some(SystematicFastPathPlan {
898 source_symbols,
899 symbol_size: 0,
900 transfer_len: 0,
901 total_len: 0,
902 });
903 }
904 if run.object_id != object_id {
905 fast_path_unavailable(object_id, "locator object mismatch");
906 return None;
907 }
908 if run.esi_start != 0 {
909 fast_path_unavailable(object_id, "run does not start at ESI 0");
910 return None;
911 }
912 if run.offsets.len() != source_symbols {
913 let detail = format!(
914 "locator offset count mismatch: expected={source_symbols} found={}",
915 run.offsets.len()
916 );
917 fast_path_unavailable(object_id, &detail);
918 return None;
919 }
920 let Ok(expected_end) = u32::try_from(source_symbols.saturating_sub(1)) else {
921 fast_path_unavailable(object_id, "source symbol count exceeds ESI range");
922 return None;
923 };
924 if run.esi_end_inclusive != expected_end {
925 fast_path_unavailable(object_id, "locator ESI range mismatch");
926 return None;
927 }
928
929 let Ok(symbol_size) = usize::try_from(oti.t) else {
930 fast_path_unavailable(object_id, "invalid OTI.t");
931 return None;
932 };
933 let Ok(transfer_len) = usize::try_from(oti.f) else {
934 fast_path_unavailable(object_id, "invalid OTI.f");
935 return None;
936 };
937 let Some(total_len) = source_symbols.checked_mul(symbol_size) else {
938 fast_path_unavailable(object_id, "reconstruction size overflow");
939 return None;
940 };
941
942 Some(SystematicFastPathPlan {
943 source_symbols,
944 symbol_size,
945 transfer_len,
946 total_len,
947 })
948}
949
950fn load_systematic_fast_path_segment(
951 symbols_dir: &Path,
952 run: &SystematicRunLocator,
953 object_id: ObjectId,
954) -> Result<Option<(Vec<u8>, SymbolSegmentHeader)>> {
955 let segment_path = symbol_segment_path(symbols_dir, run.segment_id);
956 if !segment_path.exists() {
957 fast_path_unavailable(object_id, "locator segment missing");
958 return Ok(None);
959 }
960
961 let bytes = fs::read(&segment_path)?;
962 if bytes.len() < SYMBOL_SEGMENT_HEADER_BYTES {
963 fast_path_unavailable(object_id, "segment shorter than header");
964 return Ok(None);
965 }
966
967 let header = match SymbolSegmentHeader::decode(&bytes[..SYMBOL_SEGMENT_HEADER_BYTES]) {
968 Ok(value) => value,
969 Err(err) => {
970 let detail = format!("invalid segment header: {err}");
971 fast_path_unavailable(object_id, &detail);
972 return Ok(None);
973 }
974 };
975 if header.segment_id != run.segment_id {
976 fast_path_unavailable(object_id, "segment id mismatch");
977 return Ok(None);
978 }
979
980 Ok(Some((bytes, header)))
981}
982
983fn read_systematic_fast_path_record(
984 bytes: &[u8],
985 expectations: &SystematicFastPathExpectations<'_>,
986 offset: SymbolLogOffset,
987 expected_esi: u32,
988) -> Option<SymbolRecord> {
989 if offset.segment_id != expectations.run.segment_id {
990 fast_path_unavailable_esi(
991 expectations.object_id,
992 expected_esi,
993 "wrong segment in offset",
994 );
995 return None;
996 }
997
998 let Ok(offset_usize) = usize::try_from(offset.offset_bytes) else {
999 fast_path_unavailable_esi(expectations.object_id, expected_esi, "bad record offset");
1000 return None;
1001 };
1002 let Some(absolute_offset) = SYMBOL_SEGMENT_HEADER_BYTES.checked_add(offset_usize) else {
1003 fast_path_unavailable_esi(
1004 expectations.object_id,
1005 expected_esi,
1006 "absolute offset overflow",
1007 );
1008 return None;
1009 };
1010
1011 let parsed = match parse_symbol_record_at(bytes, expectations.run.segment_id, absolute_offset) {
1012 Ok(Some((row, _))) => row.record,
1013 Ok(None) => {
1014 fast_path_unavailable_esi(
1015 expectations.object_id,
1016 expected_esi,
1017 "missing symbol record at offset",
1018 );
1019 return None;
1020 }
1021 Err(err) => {
1022 let detail = format!("invalid symbol record: {err}");
1023 fast_path_unavailable_esi(expectations.object_id, expected_esi, &detail);
1024 return None;
1025 }
1026 };
1027
1028 if parsed.object_id != expectations.object_id {
1029 fast_path_unavailable_esi(expectations.object_id, expected_esi, "object mismatch");
1030 return None;
1031 }
1032 if parsed.oti != expectations.oti {
1033 fast_path_unavailable_esi(expectations.object_id, expected_esi, "OTI mismatch");
1034 return None;
1035 }
1036 if parsed.esi != expected_esi {
1037 fast_path_unavailable_esi(expectations.object_id, expected_esi, "non-contiguous ESI");
1038 return None;
1039 }
1040 if parsed.symbol_data.len() != expectations.symbol_size {
1041 fast_path_unavailable_esi(expectations.object_id, expected_esi, "symbol size mismatch");
1042 return None;
1043 }
1044 if !parsed.verify_integrity() {
1045 fast_path_unavailable_esi(
1046 expectations.object_id,
1047 expected_esi,
1048 "integrity check failed",
1049 );
1050 return None;
1051 }
1052 if parsed.auth_tag != [0_u8; 16] {
1053 let Some(epoch_key) = expectations.auth_epoch_key else {
1054 fast_path_unavailable_esi(
1055 expectations.object_id,
1056 expected_esi,
1057 "auth tag present but no epoch key provided",
1058 );
1059 return None;
1060 };
1061 if !parsed.verify_auth(epoch_key) {
1062 fast_path_unavailable_esi(expectations.object_id, expected_esi, "auth check failed");
1063 return None;
1064 }
1065 }
1066
1067 Some(parsed)
1068}
1069
1070fn build_systematic_run_locator(
1071 rows: &[SymbolLogRecord],
1072 start_idx: usize,
1073) -> std::result::Result<SystematicRunLocator, String> {
1074 let start_row = rows
1075 .get(start_idx)
1076 .ok_or_else(|| format!("run start index {start_idx} out of bounds"))?;
1077 let start = &start_row.record;
1078 let source_symbols = source_symbol_count(start.oti)
1079 .map_err(|err| format!("invalid source symbol count at run start: {err}"))?;
1080 if source_symbols == 0 {
1081 return Err("source symbol count is zero".to_owned());
1082 }
1083 let source_symbols_u32 = u32::try_from(source_symbols)
1084 .map_err(|_| format!("source symbol count does not fit u32: {source_symbols}"))?;
1085 let end_exclusive = start_idx
1086 .checked_add(source_symbols)
1087 .ok_or_else(|| "systematic run index overflow".to_owned())?;
1088 if end_exclusive > rows.len() {
1089 return Err(format!(
1090 "incomplete systematic run: need {} rows from index {}, have {}",
1091 source_symbols,
1092 start_idx,
1093 rows.len().saturating_sub(start_idx)
1094 ));
1095 }
1096
1097 let mut offsets = Vec::with_capacity(source_symbols);
1098 for relative in 0..source_symbols {
1099 let row = &rows[start_idx + relative];
1100 let rec = &row.record;
1101 let expected_esi = u32::try_from(relative).expect("relative index fits u32");
1102
1103 if rec.object_id != start.object_id {
1104 return Err(format!(
1105 "object boundary at relative={} expected={} found={}",
1106 relative, start.object_id, rec.object_id
1107 ));
1108 }
1109 if rec.oti != start.oti {
1110 return Err(format!(
1111 "OTI mismatch at relative={} expected={:?} found={:?}",
1112 relative, start.oti, rec.oti
1113 ));
1114 }
1115 if rec.esi != expected_esi {
1116 return Err(format!(
1117 "non-contiguous ESI at relative={} expected={} found={}",
1118 relative, expected_esi, rec.esi
1119 ));
1120 }
1121 if relative == 0 {
1122 if !rec.flags.contains(SymbolRecordFlags::SYSTEMATIC_RUN_START) {
1123 return Err("missing SYSTEMATIC_RUN_START on ESI 0".to_owned());
1124 }
1125 } else if rec.flags.contains(SymbolRecordFlags::SYSTEMATIC_RUN_START) {
1126 return Err(format!(
1127 "unexpected SYSTEMATIC_RUN_START on non-zero ESI {}",
1128 rec.esi
1129 ));
1130 }
1131
1132 offsets.push(row.offset);
1133 }
1134
1135 Ok(SystematicRunLocator {
1136 object_id: start.object_id,
1137 segment_id: start_row.offset.segment_id,
1138 esi_start: 0,
1139 esi_end_inclusive: source_symbols_u32.saturating_sub(1),
1140 offsets,
1141 })
1142}
1143
1144fn parse_symbol_record_at(
1145 bytes: &[u8],
1146 segment_id: u64,
1147 absolute_offset: usize,
1148) -> Result<Option<(SymbolLogRecord, usize)>> {
1149 if absolute_offset >= bytes.len() {
1150 return Ok(None);
1151 }
1152
1153 let Some(record_len) = record_wire_len_at(bytes, absolute_offset)? else {
1154 return Ok(None);
1155 };
1156
1157 let end =
1158 absolute_offset
1159 .checked_add(record_len)
1160 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1161 detail: "record end overflow while parsing symbol record".to_owned(),
1162 })?;
1163 let record = SymbolRecord::from_bytes(&bytes[absolute_offset..end]).map_err(|err| {
1164 error!(
1165 bead_id = BEAD_ID,
1166 logging_standard = LOGGING_STANDARD_BEAD,
1167 segment_id,
1168 absolute_offset,
1169 error = %err,
1170 "failed to decode SymbolRecord during scan"
1171 );
1172 FrankenError::DatabaseCorrupt {
1173 detail: format!("invalid SymbolRecord at absolute offset {absolute_offset}: {err}"),
1174 }
1175 })?;
1176
1177 let offset_without_header = absolute_offset
1178 .checked_sub(SYMBOL_SEGMENT_HEADER_BYTES)
1179 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1180 detail: format!(
1181 "record offset {absolute_offset} precedes segment header of {SYMBOL_SEGMENT_HEADER_BYTES} bytes"
1182 ),
1183 })?;
1184
1185 let offset = SymbolLogOffset {
1186 segment_id,
1187 offset_bytes: usize_to_u64(offset_without_header, "offset_without_header")?,
1188 };
1189
1190 Ok(Some((SymbolLogRecord { offset, record }, record_len)))
1191}
1192
1193fn record_wire_len_at(bytes: &[u8], absolute_offset: usize) -> Result<Option<usize>> {
1194 let remaining = bytes.len().saturating_sub(absolute_offset);
1195 if remaining < SYMBOL_RECORD_HEADER_BYTES {
1196 return Ok(None);
1197 }
1198
1199 let size_start = absolute_offset
1200 .checked_add(SYMBOL_SIZE_FIELD_OFFSET)
1201 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1202 detail: "symbol size field offset overflow".to_owned(),
1203 })?;
1204 let size_end = size_start
1205 .checked_add(SYMBOL_SIZE_FIELD_BYTES)
1206 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1207 detail: "symbol size field end overflow".to_owned(),
1208 })?;
1209 let symbol_size_u32 = read_u32_at(bytes, size_start, "symbol_size")?;
1210 let symbol_size = u32_to_usize(symbol_size_u32, "symbol_size")?;
1211
1212 let total_len = SYMBOL_RECORD_HEADER_BYTES
1213 .checked_add(symbol_size)
1214 .and_then(|v| v.checked_add(SYMBOL_RECORD_TRAILER_BYTES))
1215 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1216 detail: "symbol record size overflow".to_owned(),
1217 })?;
1218 if remaining < total_len {
1219 return Ok(None);
1220 }
1221
1222 if size_end > bytes.len() {
1223 return Err(FrankenError::DatabaseCorrupt {
1224 detail: format!(
1225 "symbol size field out of bounds: end={}, file_len={}",
1226 size_end,
1227 bytes.len()
1228 ),
1229 });
1230 }
1231
1232 Ok(Some(total_len))
1233}
1234
1235fn sorted_segment_paths(symbols_dir: &Path) -> Result<Vec<(u64, PathBuf)>> {
1236 if !symbols_dir.exists() {
1237 return Ok(Vec::new());
1238 }
1239
1240 let mut segments = Vec::new();
1241 for entry in fs::read_dir(symbols_dir)? {
1242 let entry = entry?;
1243 if !entry.file_type()?.is_file() {
1244 continue;
1245 }
1246 let name = entry.file_name();
1247 let Some(name) = name.to_str() else {
1248 continue;
1249 };
1250 let Some(segment_id) = parse_segment_id_from_name(name) else {
1251 continue;
1252 };
1253 segments.push((segment_id, entry.path()));
1254 }
1255 segments.sort_by_key(|(segment_id, _)| *segment_id);
1256 Ok(segments)
1257}
1258
/// Extracts the numeric segment id from a file name of the form `segment-<id>.log`.
///
/// Returns `None` for any other name, and for ids that are empty, contain anything other than
/// ASCII digits, or overflow `u64`. Leading zeros are accepted, so `segment-007.log` yields `7`.
///
/// `u64::from_str` alone would also accept a leading `+`. Restricting the id to digits stops a
/// stray `segment-+7.log` from aliasing `segment-7.log` during directory scans.
fn parse_segment_id_from_name(file_name: &str) -> Option<u64> {
    let id_text = file_name.strip_prefix("segment-")?.strip_suffix(".log")?;
    if !id_text.bytes().all(|byte| byte.is_ascii_digit()) {
        return None;
    }
    // An empty `id_text` passes the digit check but fails to parse, yielding `None`.
    id_text.parse::<u64>().ok()
}
1268
1269fn file_len_usize(path: &Path) -> Result<usize> {
1270 let len = fs::metadata(path)?.len();
1271 u64_to_usize(len, "file length")
1272}
1273
1274fn align_up(value: usize, alignment: usize) -> Result<usize> {
1275 if alignment == 0 {
1276 return Err(FrankenError::Internal(
1277 "alignment must be non-zero".to_owned(),
1278 ));
1279 }
1280 let remainder = value % alignment;
1281 if remainder == 0 {
1282 return Ok(value);
1283 }
1284 value
1285 .checked_add(alignment - remainder)
1286 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1287 detail: "alignment overflow".to_owned(),
1288 })
1289}
1290
1291fn read_u32_at(bytes: &[u8], start: usize, field: &str) -> Result<u32> {
1292 let end = start
1293 .checked_add(4)
1294 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1295 detail: format!("overflow while reading field {field}"),
1296 })?;
1297 let slice = bytes
1298 .get(start..end)
1299 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1300 detail: format!(
1301 "field {field} out of bounds: start={start}, end={end}, len={}",
1302 bytes.len()
1303 ),
1304 })?;
1305 let array: [u8; 4] = slice
1306 .try_into()
1307 .map_err(|_| FrankenError::DatabaseCorrupt {
1308 detail: format!("failed to parse field {field}"),
1309 })?;
1310 Ok(u32::from_le_bytes(array))
1311}
1312
1313fn read_u64_at(bytes: &[u8], start: usize, field: &str) -> Result<u64> {
1314 let end = start
1315 .checked_add(8)
1316 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1317 detail: format!("overflow while reading field {field}"),
1318 })?;
1319 let slice = bytes
1320 .get(start..end)
1321 .ok_or_else(|| FrankenError::DatabaseCorrupt {
1322 detail: format!(
1323 "field {field} out of bounds: start={start}, end={end}, len={}",
1324 bytes.len()
1325 ),
1326 })?;
1327 let array: [u8; 8] = slice
1328 .try_into()
1329 .map_err(|_| FrankenError::DatabaseCorrupt {
1330 detail: format!("failed to parse field {field}"),
1331 })?;
1332 Ok(u64::from_le_bytes(array))
1333}
1334
1335fn u64_to_usize(value: u64, what: &str) -> Result<usize> {
1336 usize::try_from(value).map_err(|_| FrankenError::DatabaseCorrupt {
1337 detail: format!("{what} does not fit in usize: {value}"),
1338 })
1339}
1340
1341fn usize_to_u64(value: usize, what: &str) -> Result<u64> {
1342 u64::try_from(value).map_err(|_| FrankenError::DatabaseCorrupt {
1343 detail: format!("{what} does not fit in u64: {value}"),
1344 })
1345}
1346
1347fn u32_to_usize(value: u32, what: &str) -> Result<usize> {
1348 usize::try_from(value).map_err(|_| FrankenError::DatabaseCorrupt {
1349 detail: format!("{what} does not fit in usize: {value}"),
1350 })
1351}
1352
1353fn usize_to_u32(value: usize, what: &str) -> Result<u32> {
1354 u32::try_from(value).map_err(|_| FrankenError::DatabaseCorrupt {
1355 detail: format!("{what} does not fit in u32: {value}"),
1356 })
1357}
1358
// Tests for the symbol segment log, covering:
// - segment header encoding and decoding;
// - record append and scan, including torn-tail handling;
// - object and systematic-run locator rebuilds;
// - the systematic fast-path reader and its fallback cases;
// - the aligned-record variant.
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::fs::OpenOptions;
    use std::io::Write;

    use fsqlite_types::{ObjectId, Oti, SymbolRecordFlags};
    use tempfile::tempdir;

    use super::*;

    // Tokens that `test_bd_1hi_24_unit_compliance_gate` asserts are present. The trailing `\`
    // continues the string literal, so the line break and the next line's leading whitespace
    // are not part of the value.
    const BD_1HI_24_COMPLIANCE_SENTINEL: &str = "test_bd_1hi_24_unit_compliance_gate prop_bd_1hi_24_structure_compliance \
         test_e2e_bd_1hi_24_compliance DEBUG INFO WARN ERROR bd-1fpm";

    /// Builds a flag-free record whose OTI describes an object of exactly one symbol
    /// (`f == t == symbol_size`).
    ///
    /// The object id is `[object_seed; 16]`. The payload is `symbol_size` bytes of `fill`, with
    /// the first byte overwritten by `object_seed`. Panics if `symbol_size` is zero, because
    /// `data[0]` is indexed.
    fn test_record(object_seed: u8, esi: u32, symbol_size: u32, fill: u8) -> SymbolRecord {
        let symbol_len = usize::try_from(symbol_size).expect("symbol_size fits usize for tests");
        let oti = Oti {
            f: u64::from(symbol_size),
            al: 1,
            t: symbol_size,
            z: 1,
            n: 1,
        };
        let mut data = vec![fill; symbol_len];
        data[0] = object_seed;
        SymbolRecord::new(
            ObjectId::from_bytes([object_seed; 16]),
            oti,
            esi,
            data,
            SymbolRecordFlags::empty(),
        )
    }

    /// Builds a record for object `[object_seed; 16]` using the caller's `oti`, with an
    /// `oti.t`-byte payload of `fill`.
    ///
    /// The first payload byte is `object_seed + esi` (wrapping), so different ESIs carry
    /// distinguishable data. An ESI above 255 contributes 0 to that byte. When
    /// `systematic_start` is true, the record carries `SYSTEMATIC_RUN_START`.
    fn systematic_record(
        object_seed: u8,
        oti: Oti,
        esi: u32,
        fill: u8,
        systematic_start: bool,
    ) -> SymbolRecord {
        let symbol_len = usize::try_from(oti.t).expect("OTI.t fits usize for tests");
        let mut data = vec![fill; symbol_len];
        if let Some(first) = data.first_mut() {
            let esi_tag = u8::try_from(esi).unwrap_or(0);
            *first = object_seed.wrapping_add(esi_tag);
        }
        let flags = if systematic_start {
            SymbolRecordFlags::SYSTEMATIC_RUN_START
        } else {
            SymbolRecordFlags::empty()
        };
        SymbolRecord::new(
            ObjectId::from_bytes([object_seed; 16]),
            oti,
            esi,
            data,
            flags,
        )
    }

    /// An encoded segment header is exactly `SYMBOL_SEGMENT_HEADER_BYTES` long and decodes
    /// back to the same value.
    #[test]
    fn test_symbol_segment_header_encode_decode() {
        let header = SymbolSegmentHeader::new(17, 42, 1_731_000_000);
        let bytes = header.encode();
        assert_eq!(bytes.len(), SYMBOL_SEGMENT_HEADER_BYTES);
        let decoded = SymbolSegmentHeader::decode(&bytes).expect("decode header");
        assert_eq!(decoded, header);
    }

    /// Corrupting the first magic byte makes header decoding fail with a magic error.
    #[test]
    fn test_symbol_segment_header_magic() {
        let header = SymbolSegmentHeader::new(3, 7, 99);
        let mut bytes = header.encode();
        bytes[0] = b'X';
        let err = SymbolSegmentHeader::decode(&bytes).expect_err("bad magic must fail");
        assert!(err.to_string().contains("invalid symbol segment magic"));
    }

    /// Five records of increasing sizes appended to one segment scan back in order with no
    /// torn tail. Rotation afterwards also succeeds.
    #[test]
    fn test_symbol_log_append_records() {
        let dir = tempdir().expect("tempdir");
        let mut manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let sizes = [1024_u32, 1536, 2048, 3072, 4096];
        for (idx, size) in sizes.into_iter().enumerate() {
            let idx_u32 = u32::try_from(idx).expect("test index fits u32");
            let seed = u8::try_from(idx + 1).expect("test index fits u8");
            let rec = test_record(seed, idx_u32, size, 0xA0);
            manager.append(&rec).expect("append record");
        }

        let scan = scan_symbol_segment(&manager.active_segment_path()).expect("scan segment");
        assert_eq!(scan.records.len(), 5);
        assert!(!scan.torn_tail);
        assert_eq!(scan.records[0].record.symbol_data.len(), 1024);
        assert_eq!(scan.records[4].record.symbol_data.len(), 4096);
        manager.rotate(2, 43, 200).expect("rotation succeeds");
    }

    /// Appending half of a serialized record after three complete ones leaves the three
    /// complete records readable and sets `torn_tail`.
    #[test]
    fn test_symbol_log_torn_tail_recovery() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        for idx in 0_u32..3_u32 {
            let seed = u8::try_from(idx + 1).expect("small index fits u8");
            let rec = test_record(seed, idx, 1024, 0xB0);
            manager.append(&rec).expect("append record");
        }

        // Simulate an interrupted append by writing only the first half of a record.
        let partial = test_record(9, 9, 1024, 0xCC).to_bytes();
        let partial_len = partial.len() / 2;
        let mut file = OpenOptions::new()
            .append(true)
            .open(manager.active_segment_path())
            .expect("open for append");
        file.write_all(&partial[..partial_len])
            .expect("write partial record");
        file.sync_data().expect("sync partial tail");

        let scan = scan_symbol_segment(&manager.active_segment_path()).expect("scan segment");
        assert_eq!(scan.records.len(), 3);
        assert!(scan.torn_tail);
    }

    /// The offset returned by `append` reads back the same record.
    #[test]
    fn test_locator_offset_computation() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let record = test_record(7, 11, 2048, 0x44);
        let offset = manager.append(&record).expect("append record");

        let loaded = read_symbol_record_at_offset(&manager.active_segment_path(), offset)
            .expect("read by offset");
        assert_eq!(loaded.object_id, record.object_id);
        assert_eq!(loaded.esi, record.esi);
        assert_eq!(loaded.symbol_data, record.symbol_data);
    }

    /// Rebuilding the object locator across two segments collects every offset per object.
    /// Object 1 has one record in each segment, so it ends up with two offsets.
    #[test]
    fn test_locator_cache_rebuild() {
        let dir = tempdir().expect("tempdir");
        let mut manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");

        let record_alpha_first = test_record(1, 0, 1024, 0x01);
        let record_bravo_first = test_record(2, 1, 1024, 0x02);
        manager.append(&record_alpha_first).expect("append a1");
        manager.append(&record_bravo_first).expect("append b1");

        manager.rotate(2, 43, 200).expect("rotate");
        let record_alpha_second = test_record(1, 2, 1024, 0x03);
        let record_charlie_second = test_record(3, 3, 1024, 0x04);
        manager.append(&record_alpha_second).expect("append a2");
        manager.append(&record_charlie_second).expect("append c2");

        let locator = rebuild_object_locator(dir.path()).expect("rebuild locator");
        assert_eq!(locator.len(), 3);
        assert_eq!(
            locator
                .get(&ObjectId::from_bytes([1_u8; 16]))
                .expect("object 1 exists")
                .len(),
            2
        );
        assert_eq!(
            locator
                .get(&ObjectId::from_bytes([2_u8; 16]))
                .expect("object 2 exists")
                .len(),
            1
        );
        assert_eq!(
            locator
                .get(&ObjectId::from_bytes([3_u8; 16]))
                .expect("object 3 exists")
                .len(),
            1
        );
    }

    /// The object locator can be rebuilt purely by scanning the segment files. Presumably this
    /// models the "locator cache missing" case; confirm against the spec if that matters.
    #[test]
    fn test_locator_cache_missing() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let rec = test_record(9, 0, 1024, 0x55);
        manager.append(&rec).expect("append");

        let locator = rebuild_object_locator(dir.path()).expect("rebuild from scan");
        assert_eq!(locator.len(), 1);
        assert!(locator.contains_key(&ObjectId::from_bytes([9_u8; 16])));
    }

    /// With `f == 3 * t`, ESIs 0..=2 (the first flagged `SYSTEMATIC_RUN_START`) form the run.
    /// The extra ESI 3 record is not included in the run's offsets.
    #[test]
    fn test_systematic_run_locator_rebuild_happy_path() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let oti = Oti {
            f: 64_u64 * 3,
            al: 1,
            t: 64,
            z: 1,
            n: 1,
        };
        let object_id = ObjectId::from_bytes([7_u8; 16]);

        let r0 = systematic_record(7, oti, 0, 0xA1, true);
        let r1 = systematic_record(7, oti, 1, 0xA2, false);
        let r2 = systematic_record(7, oti, 2, 0xA3, false);
        let repair = systematic_record(7, oti, 3, 0xAF, false);

        let o0 = manager.append(&r0).expect("append esi0");
        let o1 = manager.append(&r1).expect("append esi1");
        let o2 = manager.append(&r2).expect("append esi2");
        let _o3 = manager.append(&repair).expect("append repair");

        let locator =
            rebuild_systematic_run_locator(dir.path()).expect("rebuild systematic locator");
        let run = locator.get(&object_id).expect("run must exist");
        assert_eq!(run.segment_id, 1);
        assert_eq!(run.esi_start, 0);
        assert_eq!(run.esi_end_inclusive, 2);
        assert_eq!(run.source_symbol_count(), 3);
        assert_eq!(run.offsets, vec![o0, o1, o2]);
    }

    /// A run with a gap (ESI 1 never written) is not indexed.
    #[test]
    fn test_systematic_run_locator_missing_symbol_is_ignored() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let oti = Oti {
            f: 64_u64 * 3,
            al: 1,
            t: 64,
            z: 1,
            n: 1,
        };
        let object_id = ObjectId::from_bytes([8_u8; 16]);

        manager
            .append(&systematic_record(8, oti, 0, 0xB1, true))
            .expect("append esi0");
        manager
            .append(&systematic_record(8, oti, 2, 0xB3, false))
            .expect("append esi2");

        let locator =
            rebuild_systematic_run_locator(dir.path()).expect("rebuild systematic locator");
        assert!(
            !locator.contains_key(&object_id),
            "incomplete run must not be indexed as fast-path eligible"
        );
    }

    /// A record for object B between A's ESI 0 and ESI 1 breaks A's run. B's single record is
    /// also incomplete for K = 2, so neither object is indexed.
    #[test]
    fn test_systematic_run_locator_interleaved_object_is_ignored() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let oti = Oti {
            f: 64_u64 * 2,
            al: 1,
            t: 64,
            z: 1,
            n: 1,
        };
        let object_a = ObjectId::from_bytes([11_u8; 16]);
        let object_b = ObjectId::from_bytes([12_u8; 16]);

        manager
            .append(&systematic_record(11, oti, 0, 0xC1, true))
            .expect("append A esi0");
        manager
            .append(&systematic_record(12, oti, 0, 0xD1, true))
            .expect("append B esi0");
        manager
            .append(&systematic_record(11, oti, 1, 0xC2, false))
            .expect("append A esi1");

        let locator =
            rebuild_systematic_run_locator(dir.path()).expect("rebuild systematic locator");
        assert!(
            !locator.contains_key(&object_a),
            "interleaved run must be rejected for fast-path"
        );
        assert!(
            !locator.contains_key(&object_b),
            "single-symbol run with K=2 must be rejected as incomplete"
        );
    }

    /// When complete runs exist in segment 1 and segment 2, the rebuilt locator points at the
    /// later one (segment 2).
    #[test]
    fn test_systematic_run_locator_prefers_newest_complete_run() {
        let dir = tempdir().expect("tempdir");
        let mut manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let oti = Oti {
            f: 64_u64 * 2,
            al: 1,
            t: 64,
            z: 1,
            n: 1,
        };
        let object_id = ObjectId::from_bytes([13_u8; 16]);

        manager
            .append(&systematic_record(13, oti, 0, 0xE1, true))
            .expect("append seg1 esi0");
        manager
            .append(&systematic_record(13, oti, 1, 0xE2, false))
            .expect("append seg1 esi1");

        manager.rotate(2, 43, 200).expect("rotate");
        let newer_o0 = manager
            .append(&systematic_record(13, oti, 0, 0xF1, true))
            .expect("append seg2 esi0");
        let newer_o1 = manager
            .append(&systematic_record(13, oti, 1, 0xF2, false))
            .expect("append seg2 esi1");

        let locator =
            rebuild_systematic_run_locator(dir.path()).expect("rebuild systematic locator");
        let run = locator.get(&object_id).expect("run exists");
        assert_eq!(
            run.segment_id, 2,
            "newest complete run should win in append-order locator rebuild"
        );
        assert_eq!(run.offsets, vec![newer_o0, newer_o1]);
    }

    /// The fast path concatenates the source symbols and truncates to `oti.f`. Here `f` is
    /// deliberately not a multiple of `t`, so the truncation is exercised.
    #[test]
    fn test_systematic_fast_path_success() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let oti = Oti {
            f: 64_u64 * 3 - 11,
            al: 1,
            t: 64,
            z: 1,
            n: 1,
        };
        let object_id = ObjectId::from_bytes([21_u8; 16]);

        let r0 = systematic_record(21, oti, 0, 0x11, true);
        let r1 = systematic_record(21, oti, 1, 0x22, false);
        let r2 = systematic_record(21, oti, 2, 0x33, false);
        manager.append(&r0).expect("append esi0");
        manager.append(&r1).expect("append esi1");
        manager.append(&r2).expect("append esi2");
        manager
            .append(&systematic_record(21, oti, 3, 0x44, false))
            .expect("append repair");

        let runs = rebuild_systematic_run_locator(dir.path()).expect("rebuild runs");
        let run = runs.get(&object_id).expect("run exists");
        let maybe_payload = read_systematic_fast_path(dir.path(), run, object_id, oti, None)
            .expect("fast-path read");
        let payload = maybe_payload.expect("fast path should reconstruct");

        let mut expected = Vec::new();
        expected.extend_from_slice(&r0.symbol_data);
        expected.extend_from_slice(&r1.symbol_data);
        expected.extend_from_slice(&r2.symbol_data);
        expected.truncate(usize::try_from(oti.f).expect("f fits usize"));
        assert_eq!(payload, expected);
    }

    /// Flipping a payload byte of ESI 1 on disk makes the fast path return `Ok(None)`
    /// (fall back) instead of an error.
    #[test]
    fn test_systematic_fast_path_corrupt_symbol_requires_fallback() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let oti = Oti {
            f: 64_u64 * 3 - 7,
            al: 1,
            t: 64,
            z: 1,
            n: 1,
        };
        let object_id = ObjectId::from_bytes([22_u8; 16]);

        let r0 = systematic_record(22, oti, 0, 0x51, true);
        let r1 = systematic_record(22, oti, 1, 0x52, false);
        let r2 = systematic_record(22, oti, 2, 0x53, false);
        manager.append(&r0).expect("append esi0");
        let r1_offset = manager.append(&r1).expect("append esi1");
        manager.append(&r2).expect("append esi2");

        let runs = rebuild_systematic_run_locator(dir.path()).expect("rebuild runs");
        let run = runs.get(&object_id).expect("run exists").clone();

        // Offsets are relative to the end of the segment header. The byte flipped sits
        // `SYMBOL_RECORD_HEADER_BYTES` past the start of record ESI 1.
        let segment_path = symbol_segment_path(dir.path(), r1_offset.segment_id);
        let mut bytes = fs::read(&segment_path).expect("read segment bytes");
        let record_offset = usize::try_from(r1_offset.offset_bytes).expect("offset fits usize");
        let absolute_record_offset = SYMBOL_SEGMENT_HEADER_BYTES
            .checked_add(record_offset)
            .expect("absolute offset");
        let data_byte_offset = absolute_record_offset
            .checked_add(SYMBOL_RECORD_HEADER_BYTES)
            .expect("data offset");
        bytes[data_byte_offset] ^= 0xFF;
        fs::write(&segment_path, bytes).expect("write corrupted segment");

        let result = read_systematic_fast_path(dir.path(), &run, object_id, oti, None)
            .expect("fast-path read should not hard-fail on corrupt symbol");
        assert!(
            result.is_none(),
            "corrupt symbol should force fallback path"
        );
    }

    /// Pointing ESI 1's locator offset far past the end of the segment makes the fast path
    /// return `Ok(None)` (fall back) instead of an error.
    #[test]
    fn test_systematic_fast_path_missing_symbol_requires_fallback() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let oti = Oti {
            f: 64_u64 * 3 - 3,
            al: 1,
            t: 64,
            z: 1,
            n: 1,
        };
        let object_id = ObjectId::from_bytes([23_u8; 16]);

        manager
            .append(&systematic_record(23, oti, 0, 0x61, true))
            .expect("append esi0");
        manager
            .append(&systematic_record(23, oti, 1, 0x62, false))
            .expect("append esi1");
        manager
            .append(&systematic_record(23, oti, 2, 0x63, false))
            .expect("append esi2");

        let runs = rebuild_systematic_run_locator(dir.path()).expect("rebuild runs");
        let mut run = runs.get(&object_id).expect("run exists").clone();
        run.offsets[1].offset_bytes = run.offsets[1].offset_bytes.saturating_add(1_000_000);

        let result = read_systematic_fast_path(dir.path(), &run, object_id, oti, None)
            .expect("fast-path read should not hard-fail on missing symbol");
        assert!(
            result.is_none(),
            "missing symbol should force fallback path"
        );
    }

    /// Records tagged with one epoch key are rejected (`Ok(None)`) when read with a different
    /// key, and accepted when read with the signing key.
    #[test]
    fn test_systematic_fast_path_auth_failure_requires_fallback() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let oti = Oti {
            f: 64_u64 * 3 - 5,
            al: 1,
            t: 64,
            z: 1,
            n: 1,
        };
        let object_id = ObjectId::from_bytes([24_u8; 16]);
        let auth_epoch_key = [0xA5_u8; 32];
        let wrong_epoch_key = [0x5A_u8; 32];

        let r0 = systematic_record(24, oti, 0, 0x71, true).with_auth_tag(&auth_epoch_key);
        let r1 = systematic_record(24, oti, 1, 0x72, false).with_auth_tag(&auth_epoch_key);
        let r2 = systematic_record(24, oti, 2, 0x73, false).with_auth_tag(&auth_epoch_key);
        manager.append(&r0).expect("append esi0");
        manager.append(&r1).expect("append esi1");
        manager.append(&r2).expect("append esi2");

        let runs = rebuild_systematic_run_locator(dir.path()).expect("rebuild runs");
        let run = runs.get(&object_id).expect("run exists");

        let wrong_key_result =
            read_systematic_fast_path(dir.path(), run, object_id, oti, Some(&wrong_epoch_key))
                .expect("fast-path read with wrong key");
        assert!(
            wrong_key_result.is_none(),
            "auth mismatch should force fallback path"
        );

        let correct_key_result =
            read_systematic_fast_path(dir.path(), run, object_id, oti, Some(&auth_epoch_key))
                .expect("fast-path read with correct key");
        assert!(
            correct_key_result.is_some(),
            "correct auth key should keep fast path eligible"
        );
    }

    /// The epoch id passed to `SymbolLogManager::new` is persisted in the segment header.
    #[test]
    fn test_epoch_id_stored() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 123_456).expect("manager");
        let bytes = fs::read(manager.active_segment_path()).expect("read segment bytes");
        let header = SymbolSegmentHeader::decode(&bytes[..SYMBOL_SEGMENT_HEADER_BYTES])
            .expect("decode header");
        assert_eq!(header.epoch_id, 42);
    }

    /// After rotation, appending to the previous segment fails with an "immutable" error.
    #[test]
    fn test_immutable_rotated_segments() {
        let dir = tempdir().expect("tempdir");
        let mut manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        manager
            .append(&test_record(1, 0, 1024, 0x11))
            .expect("append segment 1");
        manager.rotate(2, 43, 200).expect("rotate");

        let err = manager
            .append_to_segment(1, &test_record(2, 1, 1024, 0x22))
            .expect_err("rotated segment should be immutable");
        assert!(err.to_string().contains("immutable"));
    }

    /// Records of 1 KiB, 4 KiB and 64 KiB coexist in one segment and scan back intact.
    #[test]
    fn test_variable_size_records() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        for (idx, size) in [1024_u32, 4096, 65_536].into_iter().enumerate() {
            let idx_u32 = u32::try_from(idx).expect("small test index fits u32");
            let seed = u8::try_from(idx + 1).expect("small test index fits u8");
            let rec = test_record(seed, idx_u32, size, 0x66);
            manager.append(&rec).expect("append variable-size record");
        }

        let scan = scan_symbol_segment(&manager.active_segment_path()).expect("scan");
        assert_eq!(scan.records.len(), 3);
        assert_eq!(scan.records[0].record.symbol_data.len(), 1024);
        assert_eq!(scan.records[1].record.symbol_data.len(), 4096);
        assert_eq!(scan.records[2].record.symbol_data.len(), 65_536);
    }

    /// A plain append followed by a scan works without any special open flags being set here.
    #[test]
    fn test_no_o_direct_requirement() {
        let dir = tempdir().expect("tempdir");
        let manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        manager
            .append(&test_record(4, 0, 1024, 0x77))
            .expect("buffered append succeeds");
        let scan =
            scan_symbol_segment(&manager.active_segment_path()).expect("buffered scan succeeds");
        assert_eq!(scan.records.len(), 1);
        assert!(!scan.torn_tail);
    }

    /// An aligned append pads the record to a multiple of 4096 bytes (never shorter than the
    /// logical length). The record reads back with matching identity and checksum.
    #[test]
    fn test_aligned_variant_optional() {
        let dir = tempdir().expect("tempdir");
        let header = SymbolSegmentHeader::new(1, 42, 100);
        let record = test_record(5, 0, 1024, 0x88);
        let entry = append_symbol_record_aligned(dir.path(), header, &record, 4096)
            .expect("aligned append");

        assert_eq!(u64::from(entry.padded_len) % 4096, 0);
        assert!(entry.padded_len >= entry.logical_len);

        let segment_path = symbol_segment_path(dir.path(), 1);
        let loaded = read_aligned_symbol_record(&segment_path, entry).expect("read aligned");
        assert_eq!(loaded.object_id, record.object_id);
        assert_eq!(loaded.esi, record.esi);
        assert_eq!(loaded.frame_xxh3, record.frame_xxh3);
        assert!(loaded.verify_integrity());
    }

    /// An index entry whose `padded_len` is smaller than its `logical_len` is rejected with a
    /// `DatabaseCorrupt` error that mentions both fields.
    #[test]
    fn test_aligned_read_rejects_inconsistent_index_entry() {
        let dir = tempdir().expect("tempdir");
        let header = SymbolSegmentHeader::new(1, 42, 100);
        let record = test_record(6, 0, 1024, 0x99);
        let entry = append_symbol_record_aligned(dir.path(), header, &record, 4096)
            .expect("aligned append");

        let segment_path = symbol_segment_path(dir.path(), 1);
        let mut bad = entry;
        bad.padded_len = bad.logical_len.saturating_sub(1);

        let err =
            read_aligned_symbol_record(&segment_path, bad).expect_err("must reject bad index");
        let FrankenError::DatabaseCorrupt { detail } = err else {
            panic!("expected DatabaseCorrupt for inconsistent aligned index");
        };
        assert!(
            detail.contains("logical_len") && detail.contains("padded_len"),
            "unexpected detail: {detail}"
        );
    }

    /// Pins the header size and magic, and checks that the sentinel lists every required
    /// compliance token.
    #[test]
    fn test_bd_1hi_24_unit_compliance_gate() {
        assert_eq!(SYMBOL_SEGMENT_HEADER_BYTES, 40);
        assert_eq!(SYMBOL_SEGMENT_MAGIC, *b"FSSY");
        for token in [
            "test_bd_1hi_24_unit_compliance_gate",
            "prop_bd_1hi_24_structure_compliance",
            "test_e2e_bd_1hi_24_compliance",
            "DEBUG",
            "INFO",
            "WARN",
            "ERROR",
            "bd-1fpm",
        ] {
            assert!(BD_1HI_24_COMPLIANCE_SENTINEL.contains(token));
        }
    }

    /// Writes 6 records into each of 3 segments, cycling through 4 object ids. Checks that the
    /// rebuilt locator equals the offsets `append` returned, sorted per object.
    #[test]
    fn prop_bd_1hi_24_structure_compliance() {
        let dir = tempdir().expect("tempdir");
        let mut manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let mut expected: BTreeMap<ObjectId, Vec<SymbolLogOffset>> = BTreeMap::new();

        for segment_index in 0_u64..3_u64 {
            if segment_index > 0 {
                manager
                    .rotate(segment_index + 1, 42 + segment_index, 100 + segment_index)
                    .expect("rotate");
            }
            for record_index in 0_u32..6_u32 {
                let object_seed = u8::try_from((segment_index + u64::from(record_index)) % 4)
                    .expect("small seed");
                let fill = u8::try_from(0x90_u64 + segment_index + u64::from(record_index))
                    .expect("small fill");
                let rec = test_record(object_seed, record_index, 1024, fill);
                let offset = manager.append(&rec).expect("append");
                expected.entry(rec.object_id).or_default().push(offset);
            }
        }

        for offsets in expected.values_mut() {
            offsets.sort_unstable();
        }

        let rebuilt = rebuild_object_locator(dir.path()).expect("rebuild locator");
        assert_eq!(rebuilt, expected);
    }

    /// End to end, across two segments:
    /// - rebuild the locator;
    /// - read every written offset back directly;
    /// - simulate a crash by appending half a record to the active segment, which must leave
    ///   the record count unchanged and set `torn_tail`.
    #[test]
    fn test_e2e_bd_1hi_24_compliance() {
        let dir = tempdir().expect("tempdir");
        let mut manager = SymbolLogManager::new(dir.path(), 1, 42, 100).expect("manager");
        let mut written = Vec::new();

        let rec_a = test_record(1, 0, 1024, 0x11);
        let rec_b = test_record(2, 1, 2048, 0x22);
        written.push((
            rec_a.object_id,
            manager.append(&rec_a).expect("append rec_a to segment 1"),
        ));
        written.push((
            rec_b.object_id,
            manager.append(&rec_b).expect("append rec_b to segment 1"),
        ));

        manager.rotate(2, 43, 200).expect("rotate");
        let rec_c = test_record(1, 2, 4096, 0x33);
        let rec_d = test_record(3, 3, 1024, 0x44);
        written.push((
            rec_c.object_id,
            manager.append(&rec_c).expect("append rec_c to segment 2"),
        ));
        written.push((
            rec_d.object_id,
            manager.append(&rec_d).expect("append rec_d to segment 2"),
        ));

        let locator = rebuild_object_locator(dir.path()).expect("rebuild locator");
        assert_eq!(locator.len(), 3);

        for (object_id, offset) in &written {
            let path = symbol_segment_path(dir.path(), offset.segment_id);
            let loaded = read_symbol_record_at_offset(&path, *offset).expect("direct offset read");
            assert_eq!(&loaded.object_id, object_id);
        }

        let active_scan_before =
            scan_symbol_segment(&manager.active_segment_path()).expect("scan active before crash");
        let active_count_before = active_scan_before.records.len();

        // Simulated crash: only half of the next record reaches the active segment.
        let crash_partial = test_record(9, 99, 1024, 0xEE).to_bytes();
        let partial_len = crash_partial.len() / 2;
        let mut file = OpenOptions::new()
            .append(true)
            .open(manager.active_segment_path())
            .expect("open active segment for crash tail");
        file.write_all(&crash_partial[..partial_len])
            .expect("append torn tail");
        file.sync_data().expect("sync torn tail");

        let active_scan_after =
            scan_symbol_segment(&manager.active_segment_path()).expect("scan active after crash");
        assert_eq!(active_scan_after.records.len(), active_count_before);
        assert!(active_scan_after.torn_tail);
    }
}