use crate::bplus::{ArrayBPlusBuilder, ObjectBPlusBuilder, ObjectCascade, ObjectLeafItem};
use crate::config::{BuildPolicy, WriterOptions};
use crate::error::WriteError;
use crate::sink::{Sink, WriteCtx};
use crate::types::{EMPTY_ARRAY, EMPTY_OBJECT};
/// Build state for one JSON-style object that is still being written.
///
/// Members are buffered into a run, and each full run is flushed into
/// `runs_cascade`. The cascade is finalized when the object is closed.
#[derive(Default, Clone)]
pub(crate) struct ObjectState {
    /// Buffered members of the current run, as `(key_bytes, key_off, val_off)`.
    pub(crate) current_run: Vec<(Vec<u8>, u64, u64)>,
    /// Receives the root entry of every flushed run.
    pub(crate) runs_cascade: ObjectCascade,
    /// Key `(bytes, offset)` recorded by `set_pending_key` and consumed by
    /// `accept_value`.
    pub(crate) pending_key: Option<(Vec<u8>, u64)>,
    /// Sum of `capacity()` of the key buffers held in `current_run`; reset on flush.
    pub(crate) current_run_key_bytes: usize,
}
impl ObjectState {
    /// Records the key of the next object member.
    ///
    /// `key_bytes` are the raw key bytes, used to order and de-duplicate members
    /// when the run is flushed. `key_off` is carried through to
    /// `ObjectLeafItem::key_off`. The pair is held until the matching value
    /// arrives via [`Self::accept_value`].
    pub(crate) fn set_pending_key(&mut self, key_bytes: Vec<u8>, key_off: u64) {
        // Checked in debug builds only. In release builds a second call would
        // replace the earlier key.
        debug_assert!(
            self.pending_key.is_none(),
            "ObjectBuilder consumes pending_key on every push; double-set is unreachable"
        );
        self.pending_key = Some((key_bytes, key_off));
    }

    /// Pairs `val_off` with the pending key and appends the member to the current run.
    ///
    /// When the run holds `run_buffer` or more members, it is flushed immediately
    /// (see [`Self::flush_run`]).
    ///
    /// # Errors
    /// Propagates any [`WriteError`] raised while flushing the run.
    ///
    /// # Panics
    /// Panics if no key is pending, i.e. `set_pending_key` was not called first.
    pub(crate) fn accept_value<S: Sink>(
        &mut self,
        val_off: u64,
        run_buffer: usize,
        policy: &BuildPolicy,
        ctx: &mut WriteCtx<'_, S>,
    ) -> Result<(), WriteError> {
        let (kb, koff) = self.pending_key.take().expect(
            "ObjectBuilder always sets pending_key before producing a value; None is unreachable",
        );
        // Counts the allocated capacity (not the length) of each buffered key.
        self.current_run_key_bytes += kb.capacity();
        self.current_run.push((kb, koff, val_off));
        if self.current_run.len() >= run_buffer {
            self.flush_run(policy, ctx)?;
        }
        Ok(())
    }

    /// Writes the buffered run through an `ObjectBPlusBuilder` and pushes the
    /// resulting root into `runs_cascade`.
    ///
    /// Entries are sorted by key bytes. Among entries with equal keys, only the most
    /// recently inserted one is kept (last write wins). The run buffer is sorted,
    /// de-duplicated and drained in place, so no second buffer is allocated and the
    /// buffer's capacity is reused by the next run. Does nothing for an empty run.
    ///
    /// # Errors
    /// Propagates any [`WriteError`] from the leaf builder or the cascade.
    fn flush_run<S: Sink>(
        &mut self,
        policy: &BuildPolicy,
        ctx: &mut WriteCtx<'_, S>,
    ) -> Result<(), WriteError> {
        self.current_run_key_bytes = 0;
        if self.current_run.is_empty() {
            return Ok(());
        }
        // Stable sort: equal keys stay in insertion order, which the last-wins
        // de-duplication below depends on.
        self.current_run.sort_by(|a, b| a.0.cmp(&b.0));
        // `dedup_by` passes (later, retained) and removes `later` when the closure
        // returns true. Swapping first moves the newer entry into the retained
        // slot, so the stale one is what gets removed.
        self.current_run.dedup_by(|later, kept| {
            if later.0 == kept.0 {
                std::mem::swap(later, kept);
                true
            } else {
                false
            }
        });
        let mut builder = ObjectBPlusBuilder::new();
        // `drain(..)` empties the buffer but keeps its allocation. If `push` fails,
        // dropping the iterator still removes the remaining entries.
        for (key_bytes, key_off, val_off) in self.current_run.drain(..) {
            builder.push(
                ObjectLeafItem {
                    key_off,
                    val_off,
                    key_bytes,
                },
                policy,
                ctx,
            )?;
        }
        let entry = builder
            .finalize(policy, ctx)?
            .expect("non-empty run yields a root");
        self.runs_cascade.push(1, entry, policy, ctx)
    }

    /// Flushes any buffered members and returns the offset of the object's root node.
    ///
    /// If `runs_cascade` yields no root (nothing was ever flushed), a single
    /// `EMPTY_OBJECT` byte is written at the current position, and its offset is
    /// returned instead.
    ///
    /// # Errors
    /// Propagates any [`WriteError`] from flushing, cascade finalization, or the sink.
    fn finalize<S: Sink>(
        mut self,
        policy: &BuildPolicy,
        ctx: &mut WriteCtx<'_, S>,
    ) -> Result<u64, WriteError> {
        debug_assert!(
            self.pending_key.is_none(),
            "ObjectBuilder consumes pending_key before close; Some is unreachable"
        );
        // `flush_run` is a no-op on an empty run, so no emptiness guard is needed.
        self.flush_run(policy, ctx)?;
        match self.runs_cascade.finalize(policy, ctx)? {
            None => {
                let off = *ctx.pos;
                ctx.sink.write_all(&[EMPTY_OBJECT])?;
                *ctx.pos += 1;
                Ok(off)
            }
            Some(root) => Ok(root.node_off),
        }
    }
}
/// Build state for a container that has been opened but not yet passed to
/// `close_frame`.
#[derive(Clone)]
pub(crate) enum Frame {
    /// Array under construction; element offsets are pushed into the builder.
    Array(ArrayBPlusBuilder),
    /// Object under construction; members are buffered in sorted runs.
    Object(ObjectState),
}
/// Finishes an open container and returns the offset of its root.
///
/// Objects are delegated to `ObjectState::finalize`. For arrays, the builder's
/// root offset is returned. If the builder yields no root, a single `EMPTY_ARRAY`
/// byte is written at the current position, and its offset is returned.
///
/// # Errors
/// Propagates any [`WriteError`] from finalization or from writing to the sink.
pub(crate) fn close_frame<S: Sink>(
    frame: Frame,
    policy: &BuildPolicy,
    ctx: &mut WriteCtx<'_, S>,
) -> Result<u64, WriteError> {
    let array_builder = match frame {
        Frame::Object(state) => return state.finalize(policy, ctx),
        Frame::Array(array_builder) => array_builder,
    };
    if let Some((_, root_off)) = array_builder.finalize(policy, ctx)? {
        return Ok(root_off);
    }
    // Nothing was pushed into the array: write the one-byte empty-array marker.
    let marker_off = *ctx.pos;
    ctx.sink.write_all(&[EMPTY_ARRAY])?;
    *ctx.pos += 1;
    Ok(marker_off)
}
/// Adds the value written at `off` to the open container `frame`.
///
/// For an array, the offset is pushed into the array builder. For an object, it is
/// paired with the pending key, and `opts.object_sort_window` is the run size that
/// triggers a flush.
///
/// # Errors
/// Propagates any [`WriteError`] raised by the underlying builder.
pub(crate) fn register_into_frame<S: Sink>(
    frame: &mut Frame,
    off: u64,
    opts: &WriterOptions,
    ctx: &mut WriteCtx<'_, S>,
) -> Result<(), WriteError> {
    let policy = &opts.policy;
    match frame {
        Frame::Array(array_builder) => array_builder.push(off, policy, ctx),
        Frame::Object(object_state) => {
            object_state.accept_value(off, opts.object_sort_window, policy, ctx)
        }
    }
}