use crate::align::write_padding;
use crate::bplus::ArrayBPlusBuilder;
use crate::config::{PageAlignment, WriterOptions};
use crate::encode::{write_f64, write_false, write_integer, write_null, write_string, write_true};
use crate::error::WriteError;
use crate::frame::{close_frame, Frame, ObjectState};
use crate::sink::{Sink, WriteCtx};
use crate::types::{FLAGS, MAGIC, VERSION};
/// Streaming document writer that encodes values into a [`Sink`].
///
/// The writer emits a short header on construction, appends every value as it
/// is pushed, and emits a trailer holding the root value's offset when
/// [`Writer::finish`] is called.
pub struct Writer<S: Sink> {
/// Destination that receives every encoded byte.
pub(crate) sink: S,
/// Number of bytes written to `sink` so far; also the offset at which the
/// next value will start.
pub(crate) pos: u64,
/// Reusable buffer in which a scalar is encoded before being written out.
scratch: Vec<u8>,
/// Running count of padding bytes, reported by `padding_bytes_written`.
pub(crate) padding_written: u64,
/// Stack of containers that are currently open; the last entry is the
/// innermost one and receives newly pushed values.
pub(crate) frames: Vec<Frame>,
/// Options supplied at construction time.
pub(crate) opts: WriterOptions,
/// Offset of the root value, or `None` while no root has been registered.
pub(crate) root_offset: Option<u64>,
/// When set, value-producing operations fail with `WriteError::Poisoned`.
pub(crate) poisoned: bool,
}
impl<S: Sink> Writer<S> {
/// Creates a writer over `sink` configured with `WriterOptions::default()`.
///
/// # Panics
///
/// Panics if [`Writer::with_options`] rejects the default options.
pub fn new(sink: S) -> Self {
    let defaults = WriterOptions::default();
    let built = Self::with_options(sink, defaults);
    built.expect("default WriterOptions must validate")
}
/// Creates a writer over `sink` using the caller-supplied `opts`.
///
/// The file header is written immediately. A failure while writing the
/// header is not returned from this function; instead the writer is marked
/// poisoned, so later operations report `WriteError::Poisoned`.
///
/// # Errors
///
/// Returns the validation error if `opts.policy` does not validate.
pub fn with_options(sink: S, opts: WriterOptions) -> Result<Self, WriteError> {
    opts.policy.validate()?;

    let mut writer = Self {
        sink,
        pos: 0,
        scratch: Vec::with_capacity(64),
        padding_written: 0,
        frames: Vec::new(),
        opts,
        root_offset: None,
        poisoned: false,
    };

    // The header error itself is dropped on purpose: the constructor only
    // surfaces option-validation failures.
    writer.poisoned = writer.write_header().is_err();
    Ok(writer)
}
pub fn bytes_written(&self) -> u64 {
self.pos
}
pub fn padding_bytes_written(&self) -> u64 {
self.padding_written
}
/// Estimates how many bytes the writer currently holds in memory.
///
/// The estimate adds up the scratch buffer's capacity, the frame stack's
/// allocated slots, and what each open frame reports or holds: array
/// builders report their own figure, while object frames contribute their
/// current-run slots, the key bytes of that run, their run cascade, and the
/// capacity of a pending key if one is set.
pub fn buffered_bytes(&self) -> usize {
    let frame_stack = self.frames.capacity() * std::mem::size_of::<Frame>();

    let frame_contents: usize = self
        .frames
        .iter()
        .map(|frame| match frame {
            Frame::Array(builder) => builder.buffered_bytes(),
            Frame::Object(state) => {
                let run_slots = state.current_run.capacity()
                    * std::mem::size_of::<(Vec<u8>, u64, u64)>();
                let pending = state
                    .pending_key
                    .as_ref()
                    .map_or(0, |(key_bytes, _)| key_bytes.capacity());
                run_slots
                    + state.current_run_key_bytes
                    + state.runs_cascade.buffered_bytes()
                    + pending
            }
        })
        .sum();

    self.scratch.capacity() + frame_stack + frame_contents
}
/// Finalises the document and hands the sink back to the caller.
///
/// The trailer is 12 bytes: the root value's offset as a little-endian
/// `u64`, followed by `MAGIC`. With `PageAlignment::Aligned`, padding is
/// written first so that the trailer ends exactly on a page boundary.
///
/// # Errors
///
/// * `WriteError::Poisoned` if the writer is poisoned or a container frame
///   is still open.
/// * `WriteError::EmptyDocument` if no root value was ever registered.
/// * Any error raised while writing the padding or the trailer.
pub fn finish(mut self) -> Result<S, WriteError> {
    const TRAILER_LEN: usize = 12;

    if self.poisoned || !self.frames.is_empty() {
        return Err(WriteError::Poisoned);
    }
    let Some(root) = self.root_offset else {
        return Err(WriteError::EmptyDocument);
    };

    if let PageAlignment::Aligned { page_size } = self.opts.policy.align {
        let page = page_size as u64;
        // Position within a page at which the trailer has to start so that
        // its last byte is the last byte of the page.
        let wanted = page - TRAILER_LEN as u64;
        let current = self.pos % page;
        let pad = match wanted.checked_sub(current) {
            Some(gap) => gap,
            // Already past the slot in this page: pad into the next page.
            None => page - current + wanted,
        };
        if pad != 0 {
            let mut ctx = WriteCtx {
                sink: &mut self.sink,
                pos: &mut self.pos,
                scratch: &mut self.scratch,
                padding_written: &mut self.padding_written,
            };
            write_padding(&mut ctx, pad as usize)?;
        }
    }

    let mut trailer = [0u8; TRAILER_LEN];
    let (offset_part, magic_part) = trailer.split_at_mut(8);
    offset_part.copy_from_slice(&root.to_le_bytes());
    magic_part.copy_from_slice(&MAGIC);

    self.sink.write_all(&trailer)?;
    self.pos += trailer.len() as u64;
    Ok(self.sink)
}
/// Writes a null value and registers it with the innermost open container,
/// or as the document root when no container is open.
///
/// # Errors
///
/// Fails with `WriteError::Poisoned` on a poisoned writer, and otherwise
/// propagates errors from writing or registering the value.
pub fn push_null(&mut self) -> Result<(), WriteError> {
    self.check_ready()?;
    let encode = |buf: &mut Vec<u8>| -> Result<(), WriteError> {
        write_null(buf);
        Ok(())
    };
    let offset = self.emit_scalar(encode)?;
    self.register_value(offset)
}
/// Writes the boolean `v` and registers it with the innermost open
/// container, or as the document root when no container is open.
///
/// # Errors
///
/// Fails with `WriteError::Poisoned` on a poisoned writer, and otherwise
/// propagates errors from writing or registering the value.
pub fn push_bool(&mut self, v: bool) -> Result<(), WriteError> {
    self.check_ready()?;
    let encode = |buf: &mut Vec<u8>| -> Result<(), WriteError> {
        match v {
            true => {
                write_true(buf);
            }
            false => {
                write_false(buf);
            }
        }
        Ok(())
    };
    let offset = self.emit_scalar(encode)?;
    self.register_value(offset)
}
/// Writes the signed integer `v` and registers it with the innermost open
/// container, or as the document root when no container is open.
///
/// # Errors
///
/// Fails with `WriteError::Poisoned` on a poisoned writer, and otherwise
/// propagates errors from encoding, writing or registering the value.
pub fn push_i64(&mut self, v: i64) -> Result<(), WriteError> {
    self.check_ready()?;
    // The integer encoder takes an `i128`; widening from `i64` is lossless.
    let wide = i128::from(v);
    let offset = self.emit_scalar(|buf| write_integer(buf, wide))?;
    self.register_value(offset)
}
/// Writes the unsigned integer `v` and registers it with the innermost open
/// container, or as the document root when no container is open.
///
/// # Errors
///
/// Fails with `WriteError::Poisoned` on a poisoned writer, and otherwise
/// propagates errors from encoding, writing or registering the value.
pub fn push_u64(&mut self, v: u64) -> Result<(), WriteError> {
    self.check_ready()?;
    // The integer encoder takes an `i128`; widening from `u64` is lossless.
    let wide = i128::from(v);
    let offset = self.emit_scalar(|buf| write_integer(buf, wide))?;
    self.register_value(offset)
}
/// Writes the floating-point number `v` and registers it with the innermost
/// open container, or as the document root when no container is open.
///
/// # Errors
///
/// Fails with `WriteError::Poisoned` on a poisoned writer, and otherwise
/// propagates errors from encoding, writing or registering the value.
pub fn push_f64(&mut self, v: f64) -> Result<(), WriteError> {
    self.check_ready()?;
    let encode = |buf: &mut Vec<u8>| write_f64(buf, v);
    let offset = self.emit_scalar(encode)?;
    self.register_value(offset)
}
/// Writes the string `s` and registers it with the innermost open
/// container, or as the document root when no container is open.
///
/// # Errors
///
/// Fails with `WriteError::Poisoned` on a poisoned writer, and otherwise
/// propagates errors from writing or registering the value.
pub fn push_str(&mut self, s: &str) -> Result<(), WriteError> {
    self.check_ready()?;
    let encode = |buf: &mut Vec<u8>| -> Result<(), WriteError> {
        write_string(buf, s);
        Ok(())
    };
    let offset = self.emit_scalar(encode)?;
    self.register_value(offset)
}
/// Opens a new array container: a fresh array builder becomes the innermost
/// frame and receives the values pushed from now on.
pub(crate) fn push_array_frame(&mut self) {
    let frame = Frame::Array(ArrayBPlusBuilder::new());
    self.frames.push(frame);
}
/// Opens a new object container: a default object state becomes the
/// innermost frame and receives the keys and values pushed from now on.
pub(crate) fn push_object_frame(&mut self) {
    let frame = Frame::Object(ObjectState::default());
    self.frames.push(frame);
}
pub(crate) fn set_pending_key(&mut self, key: &str) -> Result<(), WriteError> {
let off = self.emit_scalar(|b| {
write_string(b, key);
Ok(())
})?;
match self.frames.last_mut() {
Some(Frame::Object(o)) => {
o.set_pending_key(key.as_bytes().to_vec(), off);
Ok(())
}
_ => unreachable!(
"set_pending_key is pub(crate) and only called from ObjectBuilder \
with an Object frame on top"
),
}
}
/// Closes the innermost array frame, writes it out, and registers the
/// resulting offset with the enclosing container (or as the document root).
///
/// # Errors
///
/// Fails with `WriteError::Poisoned` on a poisoned writer. If writing the
/// closed frame fails, the error is returned and the writer is poisoned: the
/// frame has already been removed from the stack and was never registered,
/// so continuing would yield a document that silently lacks this array.
/// Errors from registering the finished array are propagated as-is.
///
/// # Panics
///
/// Panics if the innermost frame is not an array frame.
pub(crate) fn close_array_frame(&mut self) -> Result<(), WriteError> {
    if self.poisoned {
        return Err(WriteError::Poisoned);
    }
    let Some(frame @ Frame::Array(_)) = self.frames.pop() else {
        unreachable!("top frame is not an array")
    };
    let mut ctx = WriteCtx {
        sink: &mut self.sink,
        pos: &mut self.pos,
        scratch: &mut self.scratch,
        padding_written: &mut self.padding_written,
    };
    let closed = close_frame(frame, &self.opts.policy, &mut ctx);
    match closed {
        Ok(root_off) => self.register_value(root_off),
        Err(err) => {
            // The popped frame is gone for good; refuse further use.
            self.poisoned = true;
            Err(err.into())
        }
    }
}
/// Closes the innermost object frame, writes it out, and registers the
/// resulting offset with the enclosing container (or as the document root).
///
/// # Errors
///
/// Fails with `WriteError::Poisoned` on a poisoned writer. If writing the
/// closed frame fails, the error is returned and the writer is poisoned: the
/// frame has already been removed from the stack and was never registered,
/// so continuing would yield a document that silently lacks this object.
/// Errors from registering the finished object are propagated as-is.
///
/// # Panics
///
/// Panics if the innermost frame is not an object frame.
pub(crate) fn close_object_frame(&mut self) -> Result<(), WriteError> {
    if self.poisoned {
        return Err(WriteError::Poisoned);
    }
    let Some(frame @ Frame::Object(_)) = self.frames.pop() else {
        unreachable!("top frame is not an object")
    };
    let mut ctx = WriteCtx {
        sink: &mut self.sink,
        pos: &mut self.pos,
        scratch: &mut self.scratch,
        padding_written: &mut self.padding_written,
    };
    let closed = close_frame(frame, &self.opts.policy, &mut ctx);
    match closed {
        Ok(root_off) => self.register_value(root_off),
        Err(err) => {
            // The popped frame is gone for good; refuse further use.
            self.poisoned = true;
            Err(err.into())
        }
    }
}
/// Writes the six-byte file header (the first four bytes of `MAGIC`, then
/// `VERSION`, then `FLAGS`) and advances the write position past it.
fn write_header(&mut self) -> Result<(), WriteError> {
    let mut header = [0u8; 6];
    header[..4].copy_from_slice(&MAGIC[..4]);
    header[4] = VERSION;
    header[5] = FLAGS;

    self.sink.write_all(&header)?;
    self.pos += header.len() as u64;
    Ok(())
}
/// Verifies that the writer can accept another value before any bytes for
/// that value are written.
///
/// # Errors
///
/// * `WriteError::Poisoned` if the writer is poisoned.
/// * `WriteError::MultipleRootValues` if no container is open and a root
///   value has already been registered. Rejecting this case up front keeps
///   the bytes of a doomed second root out of the sink.
fn check_ready(&self) -> Result<(), WriteError> {
    if self.poisoned {
        return Err(WriteError::Poisoned);
    }
    if self.frames.is_empty() && self.root_offset.is_some() {
        return Err(WriteError::MultipleRootValues);
    }
    Ok(())
}
/// Encodes one scalar into the scratch buffer via `build`, writes it to the
/// sink, and returns the offset at which it was written.
///
/// # Errors
///
/// An error from `build` is returned before anything is written. If the sink
/// write fails, the error is returned and the writer is poisoned: the sink
/// may hold a partial write while `pos` was not advanced, so offsets
/// recorded afterwards could no longer be trusted.
fn emit_scalar(
    &mut self,
    build: impl FnOnce(&mut Vec<u8>) -> Result<(), WriteError>,
) -> Result<u64, WriteError> {
    self.scratch.clear();
    build(&mut self.scratch)?;

    let off = self.pos;
    if let Err(err) = self.sink.write_all(&self.scratch) {
        self.poisoned = true;
        return Err(err.into());
    }
    self.pos += self.scratch.len() as u64;
    Ok(off)
}
/// Records a value that was written at offset `off`.
///
/// With no container open, the value becomes the document root. Otherwise
/// it is handed to the innermost frame: array frames append it, object
/// frames accept it as a value.
///
/// # Errors
///
/// Returns `WriteError::MultipleRootValues` if a root value is already set
/// and no container is open; otherwise propagates the frame's own result.
fn register_value(&mut self, off: u64) -> Result<(), WriteError> {
    let Some(top) = self.frames.last_mut() else {
        // Root level: only a single value may ever be registered here.
        if self.root_offset.is_some() {
            return Err(WriteError::MultipleRootValues);
        }
        self.root_offset = Some(off);
        return Ok(());
    };

    let run_buffer = self.opts.object_sort_window;
    let policy = &self.opts.policy;
    let mut ctx = WriteCtx {
        sink: &mut self.sink,
        pos: &mut self.pos,
        scratch: &mut self.scratch,
        padding_written: &mut self.padding_written,
    };
    match top {
        Frame::Array(array) => array.push(off, policy, &mut ctx),
        Frame::Object(object) => object.accept_value(off, run_buffer, policy, &mut ctx),
    }
}
}