use libz_ng_sys::*;
use crate::PathMap;
use crate::TrieValue;
use crate::alloc::Allocator;
use crate::zipper::{
ZipperReadOnlyConditionalIteration,
ZipperWriting,
ZipperIteration,
ZipperValues,
};
#[cfg(feature="nightly")]
#[path="paths_serialization_nightly.rs"]
mod paths_serialization_nightly;
#[cfg(feature="nightly")]
pub use paths_serialization_nightly::*;
#[derive(Debug, Clone, Copy)]
pub struct SerializationStats {
pub bytes_out : usize,
pub bytes_in : usize,
pub path_count : usize
}
#[derive(Debug, Clone, Copy)]
pub struct DeserializationStats {
pub bytes_in : usize,
pub bytes_out : usize,
pub path_count : usize
}
pub fn serialize_paths<'a, V, W, RZ>(rz: RZ, target: &mut W) -> std::io::Result<SerializationStats>
where
V: TrieValue,
RZ: ZipperReadOnlyConditionalIteration<'a, V>,
W: std::io::Write
{
serialize_paths_with_auxdata(rz, target, |_, _, _| {})
}
pub fn serialize_paths_with_auxdata<'a, V : TrieValue, RZ : ZipperValues<V> + ZipperIteration, W: std::io::Write, F: FnMut(usize, &[u8], &V) -> ()>(mut rz: RZ, target: &mut W, mut fv: F) -> std::io::Result<SerializationStats> {
let mut k = 0;
serialize_paths_from_funcs(target, &mut rz, |rz| Ok(rz.to_next_val()), |rz| {
let path = rz.path();
fv(k, path, rz.val().unwrap());
k += 1;
Some(path)
})
}
pub fn serialize_paths_from_funcs<PathSrc, AdvanceF, PathF, W>(target: &mut W, src: &mut PathSrc, mut advance_f: AdvanceF, mut path_f: PathF) -> std::io::Result<SerializationStats>
where
AdvanceF: FnMut(&mut PathSrc) -> std::io::Result<bool>,
PathF: FnMut(&PathSrc) -> Option<&[u8]>,
W: std::io::Write,
{
const CHUNK: usize = 4096; let mut buffer = [0u8; CHUNK];
#[allow(invalid_value)] let mut strm: z_stream = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
let mut ret = unsafe { zng_deflateInit(&mut strm, 7) };
assert_eq!(ret, Z_OK);
let mut total_paths : usize = 0;
while advance_f(src)? {
let p = match path_f(src) {
Some(p) => p,
None => continue,
};
let l = p.len();
let mut lin = (l as u32).to_le_bytes();
strm.avail_in = 4;
strm.next_in = lin.as_mut_ptr();
loop {
strm.avail_out = CHUNK as _;
strm.next_out = buffer.as_mut_ptr();
ret = unsafe { deflate(&mut strm, Z_NO_FLUSH) };
assert_ne!(ret, Z_STREAM_ERROR);
let have = CHUNK - strm.avail_out as usize;
target.write_all(&mut buffer[..have])?;
if strm.avail_out != 0 { break }
}
assert_eq!(strm.avail_in, 0);
strm.avail_in = l as _;
strm.next_in = p.as_ptr().cast_mut();
loop {
strm.avail_out = CHUNK as _;
strm.next_out = buffer.as_mut_ptr();
ret = unsafe { deflate(&mut strm, Z_NO_FLUSH) };
assert_ne!(ret, Z_STREAM_ERROR);
let have = CHUNK - strm.avail_out as usize;
target.write_all(&mut buffer[..have])?;
if strm.avail_out != 0 { break }
}
assert_eq!(strm.avail_in, 0);
total_paths += 1;
}
loop {
strm.avail_out = CHUNK as _;
strm.next_out = buffer.as_mut_ptr();
ret = unsafe { deflate(&mut strm, Z_FINISH) };
let have = CHUNK - strm.avail_out as usize;
target.write_all(&buffer[..have])?;
if ret == Z_STREAM_END { break; }
assert_eq!(ret, Z_OK);
}
ret = unsafe { deflateEnd(&mut strm) };
assert_eq!(ret, Z_OK);
Ok(SerializationStats {
bytes_out : strm.total_out,
bytes_in : strm.total_in,
path_count : total_paths
})
}
pub fn deserialize_paths<V: TrieValue, A: Allocator, WZ : ZipperWriting<V, A>, R: std::io::Read>(wz: WZ, source: R, v: V) -> std::io::Result<DeserializationStats> {
deserialize_paths_with_auxdata(wz, source, |_, _| v.clone())
}
pub fn deserialize_paths_with_auxdata<V: TrieValue, A: Allocator, WZ : ZipperWriting<V, A>, R: std::io::Read, F: Fn(usize, &[u8]) -> V>(mut wz: WZ, source: R, fv: F) -> std::io::Result<DeserializationStats> {
let mut submap = PathMap::new_in(wz.alloc());
let r = for_each_deserialized_path(source, |k, p| {
let v = fv(k, p);
submap.set_val_at(p, v);
Ok(())
});
wz.graft_map(submap);
r
}
pub fn for_each_deserialized_path<R: std::io::Read, F: FnMut(usize, &[u8]) -> std::io::Result<()>>(mut source: R, mut f: F) -> std::io::Result<DeserializationStats> {
use libz_ng_sys::*;
const IN: usize = 1024;
const OUT: usize = 2048;
let mut ibuffer = [0u8; IN];
let mut obuffer = [0u8; OUT];
let mut l = 0u32;
let mut lbuf = [0u8; 4];
let mut lbuf_offset = 0;
let mut finished_path = true;
let mut total_paths : usize = 0usize;
#[allow(invalid_value)] let mut strm: z_stream = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
let mut ret = unsafe { zng_inflateInit(&mut strm) };
if ret != Z_OK { return Err(std::io::Error::new(std::io::ErrorKind::Other, "failed to init zlib-ng inflate")) }
let mut wz_buf = vec![];
'reading: loop {
strm.avail_in = source.read(&mut ibuffer)? as _;
if strm.avail_in == 0 { break; }
strm.next_in = &mut ibuffer as _;
'decompressing: loop {
strm.avail_out = OUT as _;
strm.next_out = obuffer.as_mut_ptr();
let mut pos = 0usize;
ret = unsafe { inflate(&mut strm, Z_NO_FLUSH) };
if ret == Z_STREAM_ERROR { return Err(std::io::Error::new(std::io::ErrorKind::Other, "Z_STREAM_ERROR")) }
if strm.avail_out as usize == OUT {
if ret == Z_STREAM_END { break 'reading }
else { continue 'reading }
}
let end = OUT - strm.avail_out as usize;
'descending: loop {
if finished_path {
let have = (end - pos).min(4-lbuf_offset);
lbuf[lbuf_offset..lbuf_offset+have].copy_from_slice(&obuffer[pos..pos+have]);
pos += have;
lbuf_offset += have;
if lbuf_offset == 4 {
l = u32::from_le_bytes(lbuf);
lbuf_offset = 0;
} else {
if strm.avail_in == 0 { continue 'reading }
else { continue 'decompressing }
}
}
if pos + l as usize <= end {
wz_buf.extend(&obuffer[pos..pos + l as usize]);
f(total_paths, &wz_buf[..])?;
wz_buf.clear();
total_paths += 1;
pos += l as usize;
finished_path = true;
if pos == end { continue 'decompressing }
else { continue 'descending }
} else {
wz_buf.extend(&obuffer[pos..end]);
finished_path = false;
l -= (end-pos) as u32;
if strm.avail_in == 0 { continue 'reading }
else { continue 'decompressing }
}
}
}
}
unsafe { inflateEnd(&mut strm) };
Ok(DeserializationStats {
bytes_in : strm.total_in,
bytes_out : strm.total_out,
path_count : total_paths
})
}
#[cfg(test)]
mod test {
use crate::zipper::{ZipperIteration, ZipperValues, ZipperMoving};
use super::*;
#[cfg(not(miri))] #[test]
fn path_serialize_deserialize() {
let mut btm = PathMap::new();
let rs = ["arrow", "bow", "cannon", "roman", "romane", "romanus", "romulus", "rubens", "ruber", "rubicon", "rubicundus", "rom'i"];
rs.iter().for_each(|r| { btm.set_val_at(r.as_bytes(), ()); });
let mut v = vec![];
match serialize_paths(btm.read_zipper(), &mut v) {
Ok(SerializationStats { bytes_out : c, bytes_in : bw, path_count : pw}) => {
println!("ser {} {} {}", c, bw, pw);
println!("vlen {}", v.len());
let mut restored_btm = PathMap::new();
match deserialize_paths(restored_btm.write_zipper(), v.as_slice(), ()) {
Ok(DeserializationStats { bytes_in : c, bytes_out : bw, path_count : pw}) => {
println!("de {} {} {}", c, bw, pw);
let mut lrz = restored_btm.read_zipper();
while lrz.to_next_val() {
assert!(btm.contains(lrz.path()), "{}", std::str::from_utf8(lrz.path()).unwrap());
}
let mut rrz = btm.read_zipper();
while rrz.to_next_val() {
assert!(restored_btm.contains(rrz.path()));
}
}
Err(e) => { println!("de e {}", e) }
}
}
Err(e) => { println!("ser e {}", e) }
}
}
#[cfg(not(miri))] #[test]
fn path_serialize_deserialize_blow_out_buffer() {
for zeros in 0..10 {
println!("{zeros} zeros");
let mut btm = PathMap::new();
let mut rs = vec![];
for i in 0..400 {
rs.push(format!("{}{}{}{}", "0".repeat(zeros), i/100, (i/10)%10, i%10))
}
rs.iter().for_each(|r| { btm.set_val_at(r.as_bytes(), ()); });
let mut v = vec![];
match serialize_paths(btm.read_zipper(), &mut v) {
Ok(SerializationStats { bytes_out : c, bytes_in : bw, path_count : pw}) => {
println!("ser {} {} {}", c, bw, pw);
println!("vlen {}", v.len());
let mut restored_btm = PathMap::new();
match deserialize_paths(restored_btm.write_zipper(), v.as_slice(), ()) {
Ok(DeserializationStats { bytes_in : c, bytes_out : bw, path_count : pw}) => {
println!("de {} {} {}", c, bw, pw);
let mut lrz = restored_btm.read_zipper();
while lrz.to_next_val() {
assert!(btm.contains(lrz.path()), "{}", std::str::from_utf8(lrz.path()).unwrap());
}
let mut rrz = btm.read_zipper();
while rrz.to_next_val() {
assert!(restored_btm.contains(rrz.path()));
}
}
Err(e) => { println!("de e {}", e) }
}
}
Err(e) => { println!("ser e {}", e) }
}
}
}
#[cfg(not(miri))] #[test]
fn path_serialize_deserialize_values() {
let mut btm = PathMap::new();
let rs = ["arrow", "bow", "cannon", "roman", "romane", "romanus", "romulus", "rubens", "ruber", "rubicon", "rubicundus", "rom'i"];
rs.iter().enumerate().for_each(|(i, r)| { btm.set_val_at(r.as_bytes(), i); });
let mut values = vec![];
let mut v = vec![];
match serialize_paths_with_auxdata(btm.read_zipper(), &mut v,
|c, _p, value| { assert_eq!(values.len(), c); values.push(*value) }) {
Ok(SerializationStats { bytes_out : c, bytes_in : bw, path_count : pw}) => {
println!("ser {} {} {}", c, bw, pw);
println!("vlen {}", v.len());
let mut restored_btm = PathMap::new();
match deserialize_paths_with_auxdata(restored_btm.write_zipper(), v.as_slice(), |c, _p| values[c]) {
Ok(DeserializationStats { bytes_in : c, bytes_out : bw, path_count : pw}) => {
println!("de {} {} {}", c, bw, pw);
let mut lrz = restored_btm.read_zipper();
while lrz.to_next_val() {
assert_eq!(btm.get_val_at(lrz.path()), Some(lrz.val().unwrap()));
}
let mut rrz = btm.read_zipper();
while rrz.to_next_val() {
assert_eq!(restored_btm.get_val_at(rrz.path()), Some(rrz.val().unwrap()));
}
}
Err(e) => { println!("de e {}", e) }
}
}
Err(e) => { println!("ser e {}", e) }
}
}
}