use crate::align::write_padding;
use crate::bplus::{ArrayBPlusBuilder, ObjectBPlusBuilder, ObjectCascade, ObjectLeafItem};
use crate::encode::{write_f64, write_false, write_integer, write_null, write_string, write_true};
use crate::error::WriteError;
use crate::sink::{Sink, WriteCtx};
use crate::types::{EMPTY_ARRAY, EMPTY_OBJECT, FLAGS, MAGIC, VERSION};
/// How B+ tree node sizes are chosen while building containers.
///
/// Checked by `BuildPolicy::validate`.
#[derive(Debug, Clone)]
pub enum NodeSizing {
    /// Size nodes by entry count; validation requires a value of at least 2.
    Fanout(usize),
    /// Size nodes by a byte budget; validation requires at least
    /// [`BuildPolicy::MIN_TARGET_BYTES`].
    TargetBytes(usize),
}
/// Whether the writer pads its output to page boundaries.
#[derive(Debug, Clone)]
pub enum PageAlignment {
    /// No alignment padding is inserted.
    None,
    /// Pad to multiples of `page_size`. Validation requires a power of two that is >= 64.
    /// At minimum, `Writer::finish` pads so the trailer ends on a page boundary.
    Aligned { page_size: usize },
}
/// Layout policy for the B+ trees that back arrays and objects.
#[derive(Debug, Clone)]
pub struct BuildPolicy {
    /// How large each tree node may grow.
    pub sizing: NodeSizing,
    /// Whether (and to what page size) the output is padded.
    pub align: PageAlignment,
}
impl BuildPolicy {
    /// Smallest byte budget accepted for [`NodeSizing::TargetBytes`].
    pub const MIN_TARGET_BYTES: usize = 64;

    /// Fan-out–sized nodes with no page alignment.
    ///
    /// `fanout` is not checked here; `validate` rejects values below 2.
    pub fn compact(fanout: usize) -> Self {
        let sizing = NodeSizing::Fanout(fanout);
        Self {
            align: PageAlignment::None,
            sizing,
        }
    }

    /// Nodes sized to `page_size` bytes, with output aligned to `page_size`.
    ///
    /// `page_size` is not checked here; `validate` requires a power of two >= 64.
    pub fn disk_aligned(page_size: usize) -> Self {
        Self {
            align: PageAlignment::Aligned { page_size },
            sizing: NodeSizing::TargetBytes(page_size),
        }
    }

    /// Checks the policy's invariants.
    ///
    /// # Errors
    ///
    /// Returns `WriteError::InvalidOption` in any of these cases:
    /// - the fan-out is below 2;
    /// - the byte target is below [`Self::MIN_TARGET_BYTES`];
    /// - an aligned page size is not a power of two or is below 64.
    pub(crate) fn validate(&self) -> Result<(), WriteError> {
        let sizing_problem = match self.sizing {
            NodeSizing::Fanout(fanout) if fanout < 2 => {
                Some("fanout must be >= 2 (spec §8 invariant 3)")
            }
            NodeSizing::TargetBytes(bytes) if bytes < Self::MIN_TARGET_BYTES => {
                Some("target_node_bytes must be >= 64")
            }
            NodeSizing::Fanout(_) | NodeSizing::TargetBytes(_) => None,
        };
        if let Some(message) = sizing_problem {
            return Err(WriteError::InvalidOption(message));
        }

        match self.align {
            PageAlignment::Aligned { page_size }
                if page_size < 64 || !page_size.is_power_of_two() =>
            {
                Err(WriteError::InvalidOption(
                    "page_size must be a power of two and >= 64",
                ))
            }
            PageAlignment::None | PageAlignment::Aligned { .. } => Ok(()),
        }
    }
}
impl Default for BuildPolicy {
fn default() -> Self {
Self::compact(128)
}
}
/// Tunables for [`Writer`].
#[derive(Debug, Clone)]
pub struct WriterOptions {
    /// Number of key/value entries an object buffers before sorting them and writing them
    /// out as one run.
    pub object_sort_window: usize,
    /// Tree layout policy applied to every container.
    pub policy: BuildPolicy,
}
impl Default for WriterOptions {
fn default() -> Self {
Self {
object_sort_window: 16_384,
policy: BuildPolicy::default(),
}
}
}
/// In-progress state for one open object.
#[derive(Default)]
struct ObjectState {
    /// Buffered `(key bytes, key offset, value offset)` entries, in insertion order.
    current_run: Vec<(Vec<u8>, u64, u64)>,
    /// Accumulates the tree roots of runs that have already been flushed.
    runs_cascade: ObjectCascade,
    /// A key that has been written but is still waiting for its value.
    pending_key: Option<(Vec<u8>, u64)>,
    /// Sum of the key-buffer capacities in `current_run`, used for memory accounting.
    current_run_key_bytes: usize,
}
impl ObjectState {
    /// Stores the key that the next value in this object will be paired with.
    ///
    /// `key_bytes` are the key's raw bytes, used for sorting and de-duplication.
    /// `key_off` is the output offset of the already-encoded key.
    ///
    /// # Errors
    ///
    /// Returns [`WriteError::MisuseObjectKey`] if a previous key is still waiting for a value.
    fn set_pending_key(&mut self, key_bytes: Vec<u8>, key_off: u64) -> Result<(), WriteError> {
        if self.pending_key.is_some() {
            return Err(WriteError::MisuseObjectKey);
        }
        self.pending_key = Some((key_bytes, key_off));
        Ok(())
    }

    /// Pairs the pending key with the value written at `val_off` and buffers the entry.
    ///
    /// Once the current run holds `run_buffer` entries, it is flushed. This bounds the
    /// number of entries an object keeps in memory.
    ///
    /// # Errors
    ///
    /// Returns [`WriteError::MisuseObjectValue`] if no key is pending. Also propagates any
    /// error from flushing the run.
    fn accept_value<S: Sink>(
        &mut self,
        val_off: u64,
        run_buffer: usize,
        policy: &BuildPolicy,
        ctx: &mut WriteCtx<'_, S>,
    ) -> Result<(), WriteError> {
        let (kb, koff) = self
            .pending_key
            .take()
            .ok_or(WriteError::MisuseObjectValue)?;
        // Track the heap held by key buffers so memory accounting can report it.
        self.current_run_key_bytes += kb.capacity();
        self.current_run.push((kb, koff, val_off));
        if self.current_run.len() >= run_buffer {
            self.flush_run(policy, ctx)?;
        }
        Ok(())
    }

    /// Sorts the buffered run by key bytes and drops duplicate keys within the run.
    ///
    /// For each duplicate key, the most recently written value is kept. The run is then
    /// written as its own B+ tree, and the tree's root is pushed into `runs_cascade`.
    /// Duplicate keys that fall into different runs are not resolved here.
    ///
    /// Does nothing if the run is empty.
    fn flush_run<S: Sink>(
        &mut self,
        policy: &BuildPolicy,
        ctx: &mut WriteCtx<'_, S>,
    ) -> Result<(), WriteError> {
        let mut run = std::mem::take(&mut self.current_run);
        self.current_run_key_bytes = 0;
        if run.is_empty() {
            return Ok(());
        }
        // Last write wins. Reversing puts newer entries ahead of older ones, the *stable*
        // sort preserves that order among equal keys, and `dedup_by` keeps the first
        // element of each group of equal keys. De-duplicating in place avoids the second
        // run-sized vector the previous implementation allocated on every flush.
        run.reverse();
        run.sort_by(|a, b| a.0.cmp(&b.0));
        run.dedup_by(|a, b| a.0 == b.0);

        let mut builder = ObjectBPlusBuilder::new();
        for (key_bytes, key_off, val_off) in run {
            builder.push(
                ObjectLeafItem {
                    key_off,
                    val_off,
                    key_bytes,
                },
                policy,
                ctx,
            )?;
        }
        let entry = builder
            .finalize(policy, ctx)?
            .expect("non-empty run yields a root");
        // NOTE(review): the `1` passed here looks like the run's level in the cascade;
        // confirm against `ObjectCascade::push`.
        self.runs_cascade.push(1, entry, policy, ctx)
    }

    /// Flushes any buffered entries and returns the offset of the finished object.
    ///
    /// An object with no entries is written as the single `EMPTY_OBJECT` byte, and that
    /// byte's offset is returned.
    ///
    /// # Errors
    ///
    /// Returns [`WriteError::MisuseObjectValue`] if a key is still waiting for its value.
    /// Also propagates errors from flushing or from writing to the sink.
    fn finalize<S: Sink>(
        mut self,
        policy: &BuildPolicy,
        ctx: &mut WriteCtx<'_, S>,
    ) -> Result<u64, WriteError> {
        if self.pending_key.is_some() {
            return Err(WriteError::MisuseObjectValue);
        }
        if !self.current_run.is_empty() {
            self.flush_run(policy, ctx)?;
        }
        match self.runs_cascade.finalize(policy, ctx)? {
            None => {
                let off = *ctx.pos;
                ctx.sink.write_all(&[EMPTY_OBJECT])?;
                *ctx.pos += 1;
                Ok(off)
            }
            Some(root) => Ok(root.node_off),
        }
    }
}
/// One open container on the writer's frame stack.
enum Frame {
    /// An array whose elements are being collected into a B+ tree.
    Array(ArrayBPlusBuilder),
    /// An object whose key/value entries are being collected.
    Object(ObjectState),
}
/// Streaming document encoder that appends its output to a [`Sink`].
pub struct Writer<S: Sink> {
    /// Destination for every encoded byte.
    sink: S,
    /// Bytes written so far; this is also the offset the next value is written at.
    pos: u64,
    /// Reusable buffer in which a scalar is encoded before it is written.
    scratch: Vec<u8>,
    /// Running total of alignment padding bytes.
    padding_written: u64,
    /// Open containers, innermost last.
    frames: Vec<Frame>,
    /// Options supplied at construction.
    opts: WriterOptions,
    /// Offset of the top-level value, once it has been written.
    root_offset: Option<u64>,
    /// Set when the output can no longer be trusted (e.g. the header write failed).
    /// Crate-visible, presumably so builders can poison the writer too; verify.
    pub(crate) poisoned: bool,
    /// Set by `finish`.
    finished: bool,
}
impl<S: Sink> Writer<S> {
    /// Creates a writer over `sink` with [`WriterOptions::default`] and writes the header.
    ///
    /// If writing the header fails, the writer is returned poisoned, and every later
    /// operation fails with [`WriteError::Poisoned`].
    ///
    /// # Panics
    ///
    /// Panics only if the default options fail validation, which would be a bug in
    /// this crate.
    pub fn new(sink: S) -> Self {
        Self::with_options(sink, WriterOptions::default())
            .expect("default WriterOptions must validate")
    }

    /// Creates a writer over `sink` using `opts` and immediately writes the file header.
    ///
    /// # Errors
    ///
    /// Returns an error if `opts.policy` fails validation. A failure while writing the
    /// header is *not* returned here. Instead the writer is marked poisoned, and the
    /// failure surfaces as [`WriteError::Poisoned`] on the next operation.
    pub fn with_options(sink: S, opts: WriterOptions) -> Result<Self, WriteError> {
        opts.policy.validate()?;
        let mut w = Self {
            sink,
            pos: 0,
            scratch: Vec::with_capacity(64),
            padding_written: 0,
            frames: Vec::new(),
            opts,
            root_offset: None,
            poisoned: false,
            finished: false,
        };
        // Header I/O errors are deliberately deferred via the poisoned flag. This keeps
        // `new`, which `expect`s this result, from panicking on an I/O failure.
        if w.write_header().is_err() {
            w.poisoned = true;
        }
        Ok(w)
    }

    /// Number of bytes written to the sink so far.
    ///
    /// This is also the offset at which the next value will be written.
    pub fn bytes_written(&self) -> u64 {
        self.pos
    }

    /// Number of alignment padding bytes written so far.
    pub fn padding_bytes_written(&self) -> u64 {
        self.padding_written
    }

    /// Estimates the memory the writer currently holds for buffering.
    ///
    /// The estimate counts allocated *capacity*, not used length, of:
    /// - the scratch buffer;
    /// - the frame stack;
    /// - each open container's buffered entries, including key bytes.
    pub fn buffered_bytes(&self) -> usize {
        let mut total = self.scratch.capacity();
        total += self.frames.capacity() * std::mem::size_of::<Frame>();
        for f in &self.frames {
            total += match f {
                Frame::Array(b) => b.buffered_bytes(),
                Frame::Object(o) => {
                    let mut t =
                        o.current_run.capacity() * std::mem::size_of::<(Vec<u8>, u64, u64)>();
                    t += o.current_run_key_bytes;
                    t += o.runs_cascade.buffered_bytes();
                    if let Some((kb, _)) = &o.pending_key {
                        t += kb.capacity();
                    }
                    t
                }
            };
        }
        total
    }

    /// Writes the trailer and returns the underlying sink.
    ///
    /// The trailer is the root offset as 8 little-endian bytes, followed by `MAGIC`. With
    /// page alignment, padding is inserted first so the trailer ends exactly on a page
    /// boundary.
    ///
    /// # Errors
    ///
    /// - [`WriteError::Poisoned`] if the writer is poisoned or a container is still open.
    /// - [`WriteError::AlreadyFinished`]: a defensive check; `finish` consumes `self`.
    /// - [`WriteError::EmptyDocument`] if no top-level value was written.
    /// - Any error raised while writing the padding or the trailer to the sink.
    pub fn finish(mut self) -> Result<S, WriteError> {
        /// Root offset (8 bytes) plus magic (4 bytes).
        const TRAILER_LEN: usize = 12;

        if self.poisoned {
            return Err(WriteError::Poisoned);
        }
        if self.finished {
            return Err(WriteError::AlreadyFinished);
        }
        if !self.frames.is_empty() {
            self.poisoned = true;
            return Err(WriteError::Poisoned);
        }
        let root = self.root_offset.ok_or(WriteError::EmptyDocument)?;
        if let PageAlignment::Aligned { page_size } = self.opts.policy.align {
            // Choose `pad` so that (pos + pad + TRAILER_LEN) is a multiple of the page size.
            // `validate` guarantees page_size >= 64, so the subtraction cannot underflow.
            let ps = page_size as u64;
            let target_mod = ps - TRAILER_LEN as u64;
            let cur_mod = self.pos % ps;
            let pad = (target_mod + ps - cur_mod) % ps;
            if pad > 0 {
                write_padding(&mut self.ctx(), pad as usize)?;
            }
        }
        let mut trailer = [0u8; TRAILER_LEN];
        trailer[..8].copy_from_slice(&root.to_le_bytes());
        trailer[8..].copy_from_slice(&MAGIC);
        self.sink.write_all(&trailer)?;
        self.pos += trailer.len() as u64;
        self.finished = true;
        Ok(self.sink)
    }

    /// Writes a `null` value in the current position.
    pub fn push_null(&mut self) -> Result<(), WriteError> {
        self.push_scalar(|b| {
            write_null(b);
            Ok(())
        })
    }

    /// Writes a boolean value in the current position.
    pub fn push_bool(&mut self, v: bool) -> Result<(), WriteError> {
        self.push_scalar(|b| {
            if v {
                write_true(b);
            } else {
                write_false(b);
            }
            Ok(())
        })
    }

    /// Writes a signed integer value in the current position.
    pub fn push_i64(&mut self, v: i64) -> Result<(), WriteError> {
        self.push_scalar(|b| write_integer(b, i128::from(v)))
    }

    /// Writes an unsigned integer value in the current position.
    pub fn push_u64(&mut self, v: u64) -> Result<(), WriteError> {
        self.push_scalar(|b| write_integer(b, i128::from(v)))
    }

    /// Writes a floating-point value in the current position.
    pub fn push_f64(&mut self, v: f64) -> Result<(), WriteError> {
        self.push_scalar(|b| write_f64(b, v))
    }

    /// Writes a string value in the current position.
    pub fn push_str(&mut self, s: &str) -> Result<(), WriteError> {
        self.push_scalar(|b| {
            write_string(b, s);
            Ok(())
        })
    }

    /// Opens a nested array and returns a builder for its elements.
    pub fn start_array(&mut self) -> crate::builder::ArrayBuilder<'_, S> {
        self.push_array_frame();
        crate::builder::ArrayBuilder::new(self)
    }

    /// Opens a nested object and returns a builder for its entries.
    pub fn start_object(&mut self) -> crate::builder::ObjectBuilder<'_, S> {
        self.push_object_frame();
        crate::builder::ObjectBuilder::new(self)
    }

    /// Pushes an empty array frame onto the frame stack.
    pub(crate) fn push_array_frame(&mut self) {
        self.frames.push(Frame::Array(ArrayBPlusBuilder::new()));
    }

    /// Pushes an empty object frame onto the frame stack.
    pub(crate) fn push_object_frame(&mut self) {
        self.frames.push(Frame::Object(ObjectState::default()));
    }

    /// Writes `key` and records it as the pending key of the innermost object.
    ///
    /// The frame is validated *before* anything is written, so misuse does not leave
    /// orphan key bytes in the output.
    ///
    /// # Errors
    ///
    /// - [`WriteError::Poisoned`] if the writer is poisoned (consistent with `push_*`).
    /// - [`WriteError::MisuseObjectKey`] if the innermost frame is not an object, or if
    ///   that object already has a key waiting for its value.
    /// - Any error from writing the key.
    pub(crate) fn set_pending_key(&mut self, key: &str) -> Result<(), WriteError> {
        self.check_ready()?;
        match self.frames.last() {
            Some(Frame::Object(o)) if o.pending_key.is_none() => {}
            _ => return Err(WriteError::MisuseObjectKey),
        }
        let off = self.emit_scalar(|b| {
            write_string(b, key);
            Ok(())
        })?;
        match self.frames.last_mut() {
            Some(Frame::Object(o)) => o.set_pending_key(key.as_bytes().to_vec(), off),
            _ => Err(WriteError::MisuseObjectKey),
        }
    }

    /// Closes the innermost frame, which must be an array.
    ///
    /// The array's tree is written, or the single `EMPTY_ARRAY` byte if it has no
    /// elements. The result is then registered as a value in the parent.
    ///
    /// # Panics
    ///
    /// Panics if the innermost frame is not an array.
    pub(crate) fn close_array_frame(&mut self) -> Result<(), WriteError> {
        if self.poisoned {
            return Err(WriteError::Poisoned);
        }
        let Some(Frame::Array(builder)) = self.frames.pop() else {
            unreachable!("top frame is not an array")
        };
        let (policy, mut ctx) = self.policy_and_ctx();
        let finalized = builder.finalize(policy, &mut ctx)?;
        let root_off = match finalized {
            Some((_, off)) => off,
            None => {
                let off = self.pos;
                self.sink.write_all(&[EMPTY_ARRAY])?;
                self.pos += 1;
                off
            }
        };
        self.register_value(root_off)
    }

    /// Closes the innermost frame, which must be an object.
    ///
    /// The object is written, and its offset is registered as a value in the parent.
    ///
    /// # Panics
    ///
    /// Panics if the innermost frame is not an object.
    pub(crate) fn close_object_frame(&mut self) -> Result<(), WriteError> {
        if self.poisoned {
            return Err(WriteError::Poisoned);
        }
        let Some(Frame::Object(obj)) = self.frames.pop() else {
            unreachable!("top frame is not an object")
        };
        let (policy, mut ctx) = self.policy_and_ctx();
        let root_off = obj.finalize(policy, &mut ctx)?;
        self.register_value(root_off)
    }

    /// Writes the file header: the four `MAGIC` bytes, then `VERSION`, then `FLAGS`.
    fn write_header(&mut self) -> Result<(), WriteError> {
        let header = [MAGIC[0], MAGIC[1], MAGIC[2], MAGIC[3], VERSION, FLAGS];
        self.sink.write_all(&header)?;
        self.pos += header.len() as u64;
        Ok(())
    }

    /// Fails if the writer is poisoned or already finished.
    fn check_ready(&self) -> Result<(), WriteError> {
        if self.poisoned {
            return Err(WriteError::Poisoned);
        }
        if self.finished {
            return Err(WriteError::AlreadyFinished);
        }
        Ok(())
    }

    /// Borrows the output-related fields as a [`WriteCtx`].
    fn ctx(&mut self) -> WriteCtx<'_, S> {
        WriteCtx {
            sink: &mut self.sink,
            pos: &mut self.pos,
            scratch: &mut self.scratch,
            padding_written: &mut self.padding_written,
        }
    }

    /// Borrows the build policy together with a [`WriteCtx`].
    ///
    /// The field borrows are disjoint, so callers do not need to clone the policy.
    fn policy_and_ctx(&mut self) -> (&BuildPolicy, WriteCtx<'_, S>) {
        (
            &self.opts.policy,
            WriteCtx {
                sink: &mut self.sink,
                pos: &mut self.pos,
                scratch: &mut self.scratch,
                padding_written: &mut self.padding_written,
            },
        )
    }

    /// Shared body of the `push_*` methods.
    ///
    /// Checks readiness, encodes and writes the scalar, then registers it with the
    /// innermost frame.
    fn push_scalar(
        &mut self,
        build: impl FnOnce(&mut Vec<u8>) -> Result<(), WriteError>,
    ) -> Result<(), WriteError> {
        self.check_ready()?;
        let off = self.emit_scalar(build)?;
        self.register_value(off)
    }

    /// Encodes a scalar into `scratch` with `build`, writes it, and returns its offset.
    ///
    /// Nothing is written if `build` fails.
    fn emit_scalar(
        &mut self,
        build: impl FnOnce(&mut Vec<u8>) -> Result<(), WriteError>,
    ) -> Result<u64, WriteError> {
        self.scratch.clear();
        build(&mut self.scratch)?;
        let off = self.pos;
        self.sink.write_all(&self.scratch)?;
        self.pos += self.scratch.len() as u64;
        Ok(off)
    }

    /// Attaches the value at `off` to the innermost open container.
    ///
    /// With no container open, the value becomes the document root.
    ///
    /// # Errors
    ///
    /// - [`WriteError::MultipleRootValues`] if a root value already exists.
    /// - Errors from the array builder or object state (e.g. a value without a key).
    fn register_value(&mut self, off: u64) -> Result<(), WriteError> {
        let run_buffer = self.opts.object_sort_window;
        // Disjoint field borrows: the policy is read while the sink and frames are mutated.
        let policy = &self.opts.policy;
        let mut ctx = WriteCtx {
            sink: &mut self.sink,
            pos: &mut self.pos,
            scratch: &mut self.scratch,
            padding_written: &mut self.padding_written,
        };
        match self.frames.last_mut() {
            None => {
                if self.root_offset.is_some() {
                    return Err(WriteError::MultipleRootValues);
                }
                self.root_offset = Some(off);
                Ok(())
            }
            Some(Frame::Array(a)) => a.push(off, policy, &mut ctx),
            Some(Frame::Object(o)) => o.accept_value(off, run_buffer, policy, &mut ctx),
        }
    }
}