use crate::dtob_ffi;
use crate::dtob_wire::*;
use std::fs::{File};
use std::os::unix::fs::FileExt;
use std::path::Path;
/// Number of bytes requested per `read_at` call while scanning a DIF file (1 MiB).
const SCAN_CHUNK_SIZE: usize = 1 << 20;
/// Bytes appended after an entry chunk in `decode_entry_chunk` to close the
/// synthetic root array before decoding; presumably the wire encoding of an
/// ArrClose control code — TODO confirm against dtob_wire's code constants.
const ROOT_CLOSE_ARR: [u8; 2] = [0xC0, 0x01];
/// Byte range of a single top-level entry within the DIF file.
#[derive(Clone, Copy, Debug)]
pub struct EntrySpan {
    /// Absolute file offset of the entry's first byte.
    pub offset: u64,
    /// Entry length in bytes.
    pub len: usize,
}
/// Parsed skeleton of a DIF file: the shared file prefix plus the span of
/// every top-level entry that follows it.
struct DifIndex {
    /// The file prefix (magic, root open, optional types section); prepended
    /// to each entry chunk when decoding it in isolation.
    prefix: Vec<u8>,
    /// Byte spans of the top-level entries after the prefix.
    entries: Vec<EntrySpan>,
}
/// Result of replaying diff entries up to a target entry.
pub struct BuildContentResult {
    /// Reconstructed content after applying every diff up to the target entry.
    pub current_content: Vec<u8>,
    /// Decoded root of the target entry. The caller takes ownership and is
    /// responsible for releasing it (presumably via `ffi_dtob_free`, as the
    /// non-target roots are — TODO confirm with callers).
    pub target_root: *mut dtob_ffi::DtobValue,
    /// Total number of entries found in the file.
    pub entry_count: usize,
}
/// Token classes produced by `next_token`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TokenKind {
    /// `DTOB_CODE_OPEN`: starts a collection.
    Open,
    /// `DTOB_CODE_ARR_CLOSE`: closes an array collection.
    ArrClose,
    /// `DTOB_CODE_KV_CLOSE`: closes a key/value collection.
    KvClose,
    /// `DTOB_CODE_TYPES_CLOSE`: closes the types section.
    TypesClose,
    /// A primitive-type control code (raw / float / double / sized ints); carries the code.
    Primitive(u16),
    /// A control code in the `DTOB_CUSTOM_MIN..=DTOB_CUSTOM_MAX` range; carries the code.
    Custom(u16),
    /// A run of non-control data bytes, optionally terminated by padding pairs.
    Data,
}
/// Which close token terminated a collection in `scan_collection`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CloseKind {
    /// Closed by an array-close token.
    Arr,
    /// Closed by a key/value-close token.
    Kv,
    /// Closed by a types-close token.
    Types,
}
/// A scanned token together with the position just past its last byte.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct TokenSpan {
    // What kind of token was scanned.
    kind: TokenKind,
    // Exclusive end index of the token within the scan buffer.
    end: usize,
}
/// Outcome of an incremental scan step over a possibly-partial buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ScanStatus<T> {
    /// Scan succeeded; carries the result (typically an end position).
    Complete(T),
    /// The buffer ended mid-construct; retry after appending more input.
    NeedMore,
    /// The input cannot form a valid construct regardless of further data.
    Invalid,
}
/// True when `byte` carries the control-byte marker: all bits of
/// `DTOB_CTRL_MASK` set. Control bytes introduce 2-byte control codes.
fn is_ctrl_byte(byte: u8) -> bool {
    (byte & DTOB_CTRL_MASK) == DTOB_CTRL_MASK
}
/// Decodes the 2-byte control code starting at `pos`.
///
/// Returns the 16-bit code (low bits of the first byte in the high half,
/// the second byte in the low half) and the position just past it, or
/// `None` if the first byte is not a control byte or fewer than two bytes
/// remain in `buf`.
fn ctrl_code(buf: &[u8], pos: usize) -> Option<(u16, usize)> {
    let first = *buf.get(pos)?;
    let second = *buf.get(pos + 1)?;
    if !is_ctrl_byte(first) {
        return None;
    }
    let code = (((first & DTOB_CODE_MASK) as u16) << 8) | second as u16;
    Some((code, pos + 2))
}
/// Scans the next token starting at `pos`.
///
/// `DTOB_CODE_BLAST` codes are skipped transparently. Returns `NeedMore`
/// when the buffer may merely be truncated mid-token (`eof == false`) and
/// `Invalid` when the bytes cannot form a token even with more input.
fn next_token(buf: &[u8], pos: usize, eof: bool) -> ScanStatus<TokenSpan> {
    if pos >= buf.len() {
        // Nothing left to scan; more input may still arrive unless at EOF.
        return if eof {
            ScanStatus::Invalid
        } else {
            ScanStatus::NeedMore
        };
    }
    let mut p = pos;
    // Phase 1: control codes. Skip BLAST codes, classify anything else.
    loop {
        if p >= buf.len() {
            return if eof {
                ScanStatus::Invalid
            } else {
                ScanStatus::NeedMore
            };
        }
        if let Some((code, end)) = ctrl_code(buf, p) {
            if code == DTOB_CODE_BLAST {
                p = end;
                continue;
            }
            let kind = match code {
                DTOB_CODE_OPEN => TokenKind::Open,
                DTOB_CODE_ARR_CLOSE => TokenKind::ArrClose,
                DTOB_CODE_KV_CLOSE => TokenKind::KvClose,
                DTOB_CODE_TYPES_CLOSE => TokenKind::TypesClose,
                DTOB_CODE_RAW
                | DTOB_CODE_FLOAT
                | DTOB_CODE_DOUBLE
                | DTOB_CODE_INT8
                | DTOB_CODE_INT16
                | DTOB_CODE_INT32
                | DTOB_CODE_INT64
                | DTOB_CODE_UINT8
                | DTOB_CODE_UINT16
                | DTOB_CODE_UINT32
                | DTOB_CODE_UINT64 => TokenKind::Primitive(code),
                DTOB_CUSTOM_MIN..=DTOB_CUSTOM_MAX => TokenKind::Custom(code),
                _ => return ScanStatus::Invalid,
            };
            return ScanStatus::Complete(TokenSpan { kind, end });
        }
        break;
    }
    // `ctrl_code` returned None at `p` although `p < buf.len()`. If `buf[p]`
    // is still a control byte, the 2-byte control code is split at the end of
    // the buffer.
    //
    // BUG FIX: previously this case fell through to the data scanner and
    // returned Invalid even when more input could complete the code — so a
    // control code split across a SCAN_CHUNK_SIZE read boundary aborted the
    // scan. After skipped BLAST codes it could even yield a zero-progress
    // Data token. Now: NeedMore while more input may arrive, Invalid at EOF.
    if is_ctrl_byte(buf[p]) {
        return if eof {
            ScanStatus::Invalid
        } else {
            ScanStatus::NeedMore
        };
    }
    // Phase 2: a run of data bytes, ended by a control byte, the buffer end,
    // or in-byte padding.
    let mut cursor = p;
    let mut saw_padding = false;
    while cursor < buf.len() {
        let byte = buf[cursor];
        if is_ctrl_byte(byte) {
            break;
        }
        // Inspect the four 2-bit pairs from MSB to LSB. 0b11 marks padding,
        // which must run contiguously to the end of the byte.
        let mut in_padding = false;
        for shift in (0..=6).rev().step_by(2) {
            let pair = (byte >> shift) & 0x03;
            if pair == 0x03 {
                in_padding = true;
                saw_padding = true;
            } else if in_padding {
                // A data pair after padding started: malformed byte.
                return ScanStatus::Invalid;
            }
        }
        cursor += 1;
        if in_padding {
            // Padding terminates the data token.
            break;
        }
    }
    // An unterminated data run touching the buffer end may continue in the
    // next chunk.
    if !saw_padding && cursor == buf.len() && !eof {
        return ScanStatus::NeedMore;
    }
    ScanStatus::Complete(TokenSpan {
        kind: TokenKind::Data,
        end: cursor,
    })
}
/// Requires the next token to be a data run; yields its end position.
/// Any other token kind is Invalid; NeedMore/Invalid pass straight through.
fn consume_data_token(buf: &[u8], pos: usize, eof: bool) -> ScanStatus<usize> {
    next_token(buf, pos, eof).and_then(|span| {
        if span.kind == TokenKind::Data {
            ScanStatus::Complete(span.end)
        } else {
            ScanStatus::Invalid
        }
    })
}
/// Consumes the payload that follows a control code of kind `opcode` and
/// returns the position just past it.
///
/// RAW and CCID payloads are optional: when the next token is not a data
/// run, `Complete(pos)` reports a zero-length payload without consuming
/// anything. COPY/ADD carry an Open..ArrClose collection, and OP wraps a
/// nested custom code whose payload is parsed recursively.
fn parse_payload(buf: &[u8], pos: usize, eof: bool, opcode: u16) -> ScanStatus<usize> {
    match opcode {
        // Optional data payload: absent data is an empty payload, not an error.
        DTOB_CODE_RAW | DIF_CCID => match next_token(buf, pos, eof) {
            ScanStatus::Complete(TokenSpan {
                kind: TokenKind::Data,
                end,
            }) => ScanStatus::Complete(end),
            ScanStatus::Complete(_) => ScanStatus::Complete(pos),
            ScanStatus::NeedMore => ScanStatus::NeedMore,
            ScanStatus::Invalid => ScanStatus::Invalid,
        },
        // Mandatory single data payload.
        DTOB_CODE_FLOAT
        | DTOB_CODE_DOUBLE
        | DTOB_CODE_INT8
        | DTOB_CODE_INT16
        | DTOB_CODE_INT32
        | DTOB_CODE_INT64
        | DTOB_CODE_UINT8
        | DTOB_CODE_UINT16
        | DTOB_CODE_UINT32
        | DTOB_CODE_UINT64
        | DIF_START
        | DIF_END
        | DIF_DATA => consume_data_token(buf, pos, eof),
        // Optional array payload: Open must be matched by an ArrClose.
        DIF_COPY | DIF_ADD => match next_token(buf, pos, eof) {
            ScanStatus::Complete(TokenSpan {
                kind: TokenKind::Open,
                end,
            }) => match scan_collection(buf, end, eof) {
                ScanStatus::Complete((end, CloseKind::Arr)) => ScanStatus::Complete(end),
                ScanStatus::Complete(_) => ScanStatus::Invalid,
                ScanStatus::NeedMore => ScanStatus::NeedMore,
                ScanStatus::Invalid => ScanStatus::Invalid,
            },
            ScanStatus::Complete(_) => ScanStatus::Complete(pos),
            ScanStatus::NeedMore => ScanStatus::NeedMore,
            ScanStatus::Invalid => ScanStatus::Invalid,
        },
        // A wrapped custom code; parse its payload recursively.
        DIF_OP => match next_token(buf, pos, eof) {
            ScanStatus::Complete(TokenSpan {
                kind: TokenKind::Custom(inner_opcode),
                end,
            }) => parse_payload(buf, end, eof, inner_opcode),
            ScanStatus::NeedMore => ScanStatus::NeedMore,
            _ => ScanStatus::Invalid,
        },
        _ => {
            eprintln!("[parse_payload] unhandled opcode={} at pos={}", opcode, pos);
            ScanStatus::Invalid
        }
    }
}
/// Scans one complete value starting at `pos` and returns the position just
/// past it: a collection (Open..close), a typed payload, or a bare data run.
/// Close tokens are not valid value starters and are reported as Invalid.
fn scan_value(buf: &[u8], pos: usize, eof: bool) -> ScanStatus<usize> {
    let span = match next_token(buf, pos, eof) {
        ScanStatus::Complete(span) => span,
        ScanStatus::NeedMore => return ScanStatus::NeedMore,
        ScanStatus::Invalid => return ScanStatus::Invalid,
    };
    match span.kind {
        TokenKind::Open => scan_collection(buf, span.end, eof).map_complete(|(end, _)| end),
        TokenKind::Primitive(opcode) | TokenKind::Custom(opcode) => {
            parse_payload(buf, span.end, eof, opcode)
        }
        TokenKind::Data => ScanStatus::Complete(span.end),
        TokenKind::ArrClose | TokenKind::KvClose | TokenKind::TypesClose => {
            eprintln!("[scan_value] unexpected token {:?} at pos={}", span, pos);
            ScanStatus::Invalid
        }
    }
}
/// Scans values from `pos` (just past an Open) until a close token, returning
/// the position past the close and which kind of close ended the collection.
fn scan_collection(buf: &[u8], mut pos: usize, eof: bool) -> ScanStatus<(usize, CloseKind)> {
    loop {
        let span = match next_token(buf, pos, eof) {
            ScanStatus::Complete(span) => span,
            ScanStatus::NeedMore => return ScanStatus::NeedMore,
            ScanStatus::Invalid => return ScanStatus::Invalid,
        };
        // A close token ends the collection immediately.
        let close = match span.kind {
            TokenKind::ArrClose => Some(CloseKind::Arr),
            TokenKind::KvClose => Some(CloseKind::Kv),
            TokenKind::TypesClose => Some(CloseKind::Types),
            _ => None,
        };
        if let Some(kind) = close {
            return ScanStatus::Complete((span.end, kind));
        }
        // Otherwise consume one full value (re-scanned from `pos`) and continue.
        match scan_value(buf, pos, eof) {
            ScanStatus::Complete(end) => pos = end,
            ScanStatus::NeedMore => return ScanStatus::NeedMore,
            ScanStatus::Invalid => return ScanStatus::Invalid,
        }
    }
}
/// Advances token by token until a TypesClose is seen, returning the position
/// just past it. Other tokens are skipped without structural validation.
fn skip_to_types_close(buf: &[u8], mut pos: usize, eof: bool) -> ScanStatus<usize> {
    loop {
        match next_token(buf, pos, eof) {
            ScanStatus::Complete(TokenSpan { kind, end }) => {
                if kind == TokenKind::TypesClose {
                    return ScanStatus::Complete(end);
                }
                pos = end;
            }
            ScanStatus::NeedMore => return ScanStatus::NeedMore,
            ScanStatus::Invalid => return ScanStatus::Invalid,
        }
    }
}
/// Parses the DIF file prefix: magic bytes, the root Open, and — when a
/// second Open follows immediately — an embedded types section up to its
/// TypesClose. Returns the length of the prefix in bytes.
fn parse_prefix(buf: &[u8], eof: bool) -> ScanStatus<usize> {
    // The magic must be fully present before anything else can be judged.
    if buf.len() < DTOB_MAGIC.len() {
        return if eof {
            ScanStatus::Invalid
        } else {
            ScanStatus::NeedMore
        };
    }
    if &buf[..DTOB_MAGIC.len()] != DTOB_MAGIC {
        return ScanStatus::Invalid;
    }
    // The root container must open right after the magic.
    let root_open = match next_token(buf, DTOB_MAGIC.len(), eof) {
        ScanStatus::Complete(TokenSpan {
            kind: TokenKind::Open,
            end,
        }) => end,
        ScanStatus::Complete(ts) => {
            eprintln!("[parse_prefix] expected root Open, got {:?}", ts);
            return ScanStatus::Invalid;
        }
        ScanStatus::NeedMore => return ScanStatus::NeedMore,
        ScanStatus::Invalid => {
            eprintln!("[parse_prefix] next_token for root Open returned Invalid");
            return ScanStatus::Invalid;
        }
    };
    // A second Open marks a types section; anything else ends the prefix at
    // the root Open.
    let types_open = match next_token(buf, root_open, eof) {
        ScanStatus::Complete(TokenSpan {
            kind: TokenKind::Open,
            end,
        }) => end,
        ScanStatus::Complete(_) => return ScanStatus::Complete(root_open),
        ScanStatus::NeedMore => return ScanStatus::NeedMore,
        ScanStatus::Invalid => return ScanStatus::Invalid,
    };
    match skip_to_types_close(buf, types_open, eof) {
        ScanStatus::Invalid => {
            eprintln!("[parse_prefix] skip_to_types_close returned Invalid");
            ScanStatus::Invalid
        }
        other => other,
    }
}
/// Reads up to `SCAN_CHUNK_SIZE` bytes at `*offset` and appends them to
/// `buffer`, advancing `*offset` by the number of bytes read.
///
/// Returns `Some(true)` once everything up to `file_len` has been consumed
/// (or a zero-byte read occurred), `Some(false)` when more remains, and
/// `None` on a read error (leaving `buffer` unchanged).
fn append_scan_chunk(file: &File, file_len: u64, offset: &mut u64, buffer: &mut Vec<u8>) -> Option<bool> {
    if *offset >= file_len {
        return Some(true);
    }
    // Read straight into the tail of `buffer` instead of a temporary 1 MiB
    // chunk — the old version zero-filled and copied a fresh Vec per call.
    let start = buffer.len();
    buffer.resize(start + SCAN_CHUNK_SIZE, 0);
    let read = match file.read_at(&mut buffer[start..], *offset) {
        Ok(n) => n,
        Err(_) => {
            buffer.truncate(start);
            return None;
        }
    };
    buffer.truncate(start + read);
    if read == 0 {
        // File shorter than file_len claimed (e.g. concurrent truncation):
        // treat as EOF rather than spinning.
        return Some(true);
    }
    *offset += read as u64;
    Some(*offset >= file_len)
}
/// Reads chunks from the start of `file` until the DIF prefix parses, then
/// returns the prefix bytes. `None` on read error or invalid prefix.
pub(crate) fn read_dif_prefix(file: &File, file_len: u64) -> Option<Vec<u8>> {
    let mut buffer = Vec::new();
    let mut read_offset = 0u64;
    let mut eof = append_scan_chunk(file, file_len, &mut read_offset, &mut buffer)?;
    loop {
        match parse_prefix(&buffer, eof) {
            ScanStatus::Complete(prefix_len) => return Some(buffer[..prefix_len].to_vec()),
            ScanStatus::NeedMore => {
                // Prefix straddles the chunk boundary; pull in more bytes.
                eof = append_scan_chunk(file, file_len, &mut read_offset, &mut buffer)?;
            }
            ScanStatus::Invalid => {
                eprintln!("[read_dif_prefix] parse_prefix returned Invalid at buf.len={}", buffer.len());
                return None;
            }
        }
    }
}
/// Scans top-level entry spans starting at `start_offset`, reading the file
/// in chunks and sliding the buffer window forward as entries are consumed.
///
/// Stops at an ArrClose token, at EOF, or once `max_entries` spans have been
/// collected. Returns `None` on read errors or malformed input (diagnostics
/// go to stderr).
pub(crate) fn scan_entry_spans_from_offset(
    file: &File,
    file_len: u64,
    start_offset: u64,
    max_entries: Option<usize>,
) -> Option<Vec<EntrySpan>> {
    let mut buffer = Vec::new();
    let mut read_offset = start_offset;
    let mut eof = append_scan_chunk(file, file_len, &mut read_offset, &mut buffer)?;
    // File offset corresponding to buffer index 0; advances as we drain.
    let mut base_offset = start_offset;
    let mut entries = Vec::new();
    let mut cursor = 0usize;
    loop {
        if let Some(limit) = max_entries {
            if entries.len() >= limit {
                break;
            }
        }
        if cursor == buffer.len() && eof {
            break;
        }
        // Peek at the next token, then fold both scan stages into one status
        // so NeedMore/Invalid are handled in a single place. `stage` labels
        // which scanner produced the status for diagnostics.
        let (stage, status) = match next_token(&buffer, cursor, eof) {
            ScanStatus::Complete(TokenSpan {
                kind: TokenKind::ArrClose,
                ..
            }) => break,
            ScanStatus::Complete(TokenSpan {
                kind: TokenKind::KvClose | TokenKind::TypesClose,
                ..
            }) => {
                eprintln!("[scan_entry_spans] unexpected close token at cursor={} (base_offset={}, {} entries so far)", cursor, base_offset, entries.len());
                return None;
            }
            ScanStatus::Complete(_) => ("scan_value", scan_value(&buffer, cursor, eof)),
            other => ("next_token", other.map_complete(|ts| ts.end)),
        };
        match status {
            ScanStatus::Complete(end) => {
                entries.push(EntrySpan {
                    offset: base_offset + cursor as u64,
                    len: end - cursor,
                });
                cursor = end;
            }
            ScanStatus::NeedMore => {
                // Drop consumed bytes so the buffer does not grow unboundedly.
                if cursor > 0 {
                    base_offset += cursor as u64;
                    buffer.drain(..cursor);
                    cursor = 0;
                }
                if eof {
                    eprintln!("[scan_entry_spans] {} NeedMore at EOF, cursor={} base_offset={} ({} entries so far)", stage, cursor, base_offset, entries.len());
                    return None;
                }
                eof = append_scan_chunk(file, file_len, &mut read_offset, &mut buffer)?;
            }
            ScanStatus::Invalid => {
                eprintln!("[scan_entry_spans] {} returned Invalid at cursor={} base_offset={} ({} entries so far)", stage, cursor, base_offset, entries.len());
                return None;
            }
        }
    }
    Some(entries)
}
/// Opens the file at `dif_path`, reads its prefix, and scans every entry
/// span, producing a complete `DifIndex`. `None` on any failure (logged).
fn index_dif_file(dif_path: &Path) -> Option<DifIndex> {
    let file = File::open(dif_path)
        .map_err(|e| eprintln!("[index_dif_file] failed to open {:?}: {}", dif_path, e))
        .ok()?;
    let file_len = file
        .metadata()
        .map_err(|e| eprintln!("[index_dif_file] failed to read metadata for {:?}: {}", dif_path, e))
        .ok()?
        .len();
    let prefix = read_dif_prefix(&file, file_len).or_else(|| {
        eprintln!("[index_dif_file] read_dif_prefix failed for {:?} (file_len={})", dif_path, file_len);
        None
    })?;
    let prefix_len = prefix.len() as u64;
    let entries = scan_entry_spans_from_offset(&file, file_len, prefix_len, None).or_else(|| {
        eprintln!("[index_dif_file] scan_entry_spans failed for {:?} (prefix_len={}, file_len={})", dif_path, prefix_len, file_len);
        None
    })?;
    Some(DifIndex { prefix, entries })
}
/// Returns `(entry_index, file_offset)` for every snapshot entry — i.e.
/// every entry whose index is a multiple of `crate::SNAPSHOT_INTERVAL`.
pub fn snapshot_offsets(dif_path: &Path) -> Option<Vec<(usize, u64)>> {
    let index = index_dif_file(dif_path)?;
    let mut offsets = Vec::new();
    for (idx, span) in index.entries.iter().enumerate() {
        if idx % crate::SNAPSHOT_INTERVAL == 0 {
            offsets.push((idx, span.offset));
        }
    }
    Some(offsets)
}
/// Reads exactly `len` bytes at `offset`, or `None` if that many bytes are
/// not available (or the read fails).
///
/// BUG FIX: the previous version used a single `read_at` and rejected short
/// reads. POSIX `pread` may legitimately return fewer bytes than requested
/// even when more are available, which made this fail spuriously;
/// `read_exact_at` retries until the buffer is full or a real error occurs.
fn read_file_range(file: &File, offset: u64, len: usize) -> Option<Vec<u8>> {
    let mut buf = vec![0u8; len];
    file.read_exact_at(&mut buf, offset).ok()?;
    Some(buf)
}
/// Frames a single entry as a standalone DIF document (prefix + entry bytes
/// + root array close) and decodes it via FFI. Returns `None` when the
/// decoder yields a null root.
fn decode_entry_chunk(prefix: &[u8], chunk: &[u8]) -> Option<*mut dtob_ffi::DtobValue> {
    let total = prefix.len() + chunk.len() + ROOT_CLOSE_ARR.len();
    let mut framed = Vec::with_capacity(total);
    framed.extend_from_slice(prefix);
    framed.extend_from_slice(chunk);
    framed.extend_from_slice(&ROOT_CLOSE_ARR);
    // `framed` stays alive across the call, so the pointer/length pair
    // handed to the decoder remains valid for its duration.
    let root = unsafe { dtob_ffi::ffi_decode_dif(framed.as_ptr(), framed.len()) };
    (!root.is_null()).then(|| root)
}
/// Number of top-level entries in the DIF file, or `None` if it cannot be
/// indexed.
pub fn entry_count(dif_path: &Path) -> Option<usize> {
    index_dif_file(dif_path).map(|index| index.entries.len())
}
/// Rebuilds content up to entry `target_offset` (`None` = last entry),
/// locating the snapshot from the file's own index. Thin delegation to
/// `build_content_from_snapshot_offset` with no explicit snapshot offset.
pub fn build_content(dif_path: &Path, target_offset: Option<usize>, debug: bool) -> Option<BuildContentResult> {
    build_content_from_snapshot_offset(dif_path, target_offset, None, debug)
}
/// Reconstructs content at entry index `target_offset` (`None` = last entry)
/// by replaying diffs from the nearest preceding snapshot entry.
///
/// When `snapshot_file_offset` is given, entry spans are re-scanned from that
/// byte offset instead of sliced from the full index. Returns the rebuilt
/// bytes, the decoded root of the target entry (ownership passes to the
/// caller), and the total entry count. `None` on any failure; diagnostics go
/// to stderr.
pub fn build_content_from_snapshot_offset(
    dif_path: &Path,
    target_offset: Option<usize>,
    snapshot_file_offset: Option<u64>,
    debug: bool,
) -> Option<BuildContentResult> {
    // Early existence check purely for a clearer diagnostic; File::open below
    // would fail anyway.
    if !dif_path.exists() {
        eprintln!("[build_content] dif_path does not exist: {:?}", dif_path);
        return None;
    }
    let file = match File::open(dif_path) {
        Ok(f) => f,
        Err(e) => {
            eprintln!("[build_content] failed to open {:?}: {}", dif_path, e);
            return None;
        }
    };
    let file_len = match file.metadata() {
        Ok(m) => m.len(),
        Err(e) => {
            eprintln!("[build_content] failed to read metadata for {:?}: {}", dif_path, e);
            return None;
        }
    };
    // NOTE(review): index_dif_file re-opens the file it just received a path
    // for; harmless but redundant with the open above.
    let dif_index = index_dif_file(dif_path).or_else(|| {
        eprintln!("[build_content] index_dif_file failed for {:?}", dif_path);
        None
    })?;
    let prefix = &dif_index.prefix;
    let entry_count = dif_index.entries.len();
    if entry_count == 0 {
        eprintln!("[build_content] dif_index had no entries for {:?}", dif_path);
        return None;
    }
    if debug { eprintln!("[build_content] target_offset={:?}, file has {} entries", target_offset, entry_count); }
    // Clamp the requested entry index into range; default to the last entry.
    let target_n = target_offset
        .unwrap_or(entry_count - 1)
        .min(entry_count - 1);
    // Index of the snapshot entry at or before the target.
    let snapshot_start = (target_n / crate::SNAPSHOT_INTERVAL) * crate::SNAPSHOT_INTERVAL;
    let mut current_content = Vec::new();
    let mut target_root = std::ptr::null_mut();
    // Spans to replay: either re-scan from the supplied snapshot file offset,
    // or slice the already-built index.
    let spans = if let Some(start_offset) = snapshot_file_offset {
        scan_entry_spans_from_offset(
            &file,
            file_len,
            start_offset,
            Some(target_n - snapshot_start + 1),
        )?
    } else {
        dif_index.entries[snapshot_start..=target_n].to_vec()
    };
    if spans.len() != target_n - snapshot_start + 1 {
        eprintln!("[build_content] missing spans! fetched {} but expected {}", spans.len(), target_n - snapshot_start + 1);
        return None;
    }
    for (relative_idx, span) in spans.into_iter().enumerate() {
        // Absolute entry index for this span.
        let i = snapshot_start + relative_idx;
        let chunk = read_file_range(&file, span.offset, span.len)?;
        let root = decode_entry_chunk(&prefix, &chunk).or_else(|| {
            eprintln!("[build_content] iteration {} (span offset {}): decode_entry_chunk failed", i, span.offset);
            None
        })?;
        unsafe {
            // Element 0 of the decoded entry is the diff-op list fed to
            // ffi_apply_diff.
            let ops = dtob_ffi::ffi_dtob_array_get(root, 0);
            if ops.is_null() {
                eprintln!("[build_content] ffi_dtob_array_get(root, 0) returned null at iteration {}", i);
                dtob_ffi::ffi_dtob_free(root);
                return None;
            }
            // At the snapshot entry the content restarts from empty —
            // presumably a snapshot's diff encodes the full content; confirm
            // against the diff writer.
            if i == snapshot_start {
                current_content.clear();
            }
            let mut out_len = 0;
            // Apply the diff ops to the current content; the C side returns a
            // malloc'd buffer and writes its length into out_len.
            let patched = dtob_ffi::ffi_apply_diff(
                current_content.as_ptr(),
                current_content.len(),
                ops,
                &mut out_len,
            );
            if !patched.is_null() {
                // Copy the patched bytes out, then free the C allocation.
                current_content = std::slice::from_raw_parts(patched, out_len).to_vec();
                libc::free(patched as *mut libc::c_void);
            } else {
                eprintln!("[build_content] iteration {}: ffi_apply_diff returned null", i);
                dtob_ffi::ffi_dtob_free(root);
                return None;
            }
            // Keep the target entry's root for the caller; free all others.
            if i == target_n {
                target_root = root;
            } else {
                dtob_ffi::ffi_dtob_free(root);
            }
        }
    }
    if target_root.is_null() {
        eprintln!("[build_content] target_root is null after processing all spans (target_n={}, snapshot_start={}, spans processed)",
        target_n, snapshot_start);
        return None;
    }
    Some(BuildContentResult {
        current_content,
        target_root,
        entry_count,
    })
}
/// `Option`/`Result`-style combinators for `ScanStatus`.
trait ScanStatusExt<T> {
    /// Maps the `Complete` payload; `NeedMore` and `Invalid` pass through.
    fn map_complete<U>(self, f: impl FnOnce(T) -> U) -> ScanStatus<U>;
    /// Chains a status-returning continuation onto `Complete`; `NeedMore`
    /// and `Invalid` short-circuit.
    fn and_then<U>(self, f: impl FnOnce(T) -> ScanStatus<U>) -> ScanStatus<U>;
}
impl<T> ScanStatusExt<T> for ScanStatus<T> {
    // map_complete is just and_then with an infallible continuation.
    fn map_complete<U>(self, f: impl FnOnce(T) -> U) -> ScanStatus<U> {
        self.and_then(|v| ScanStatus::Complete(f(v)))
    }
    fn and_then<U>(self, f: impl FnOnce(T) -> ScanStatus<U>) -> ScanStatus<U> {
        match self {
            ScanStatus::NeedMore => ScanStatus::NeedMore,
            ScanStatus::Invalid => ScanStatus::Invalid,
            ScanStatus::Complete(v) => f(v),
        }
    }
}