mf_file/zipdoc/formats/
msgpack.rs

1use std::io::{self, Read, Seek, Write};
2use serde::{Serialize, de::DeserializeOwned};
3
4use crate::zipdoc::{ZipDocumentReader, ZipDocumentWriter};
5use crate::zipdoc::snapshot::{
6    SnapshotShardMeta, read_snapshot_shards, for_each_snapshot_shard_raw,
7};
8
9pub fn write_snapshot_shards_msgpack<W, F, T>(
10    zw: &mut ZipDocumentWriter<W>,
11    meta: &SnapshotShardMeta,
12    mut get_shard_value: F,
13    zstd_level: i32,
14) -> io::Result<()>
15where
16    W: Write + Seek,
17    F: FnMut(usize) -> io::Result<T>,
18    T: Serialize,
19{
20    let meta_val = serde_json::to_value(meta).map_err(io::Error::other)?;
21    zw.add_json("snapshot/meta.json", &meta_val)?;
22    for i in 0..meta.num_shards {
23        let v = get_shard_value(i)?;
24        let bytes = rmp_serde::to_vec(&v).map_err(io::Error::other)?;
25        let zst = zstd::stream::encode_all(&bytes[..], zstd_level)
26            .map_err(io::Error::other)?;
27        let name = format!("snapshot/shard-{i:03}.bin.zst");
28        zw.add_stored(&name, &zst)?;
29    }
30    Ok(())
31}
32
33pub fn read_and_decode_snapshot_shards_msgpack<
34    R: Read + Seek,
35    T: DeserializeOwned,
36>(
37    zr: &mut ZipDocumentReader<R>
38) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
39    let (meta, shards_raw) = read_snapshot_shards(zr)?;
40    let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
41    for raw in shards_raw.iter() {
42        let val: T = rmp_serde::from_slice(raw).map_err(io::Error::other)?;
43        out.push(val);
44    }
45    Ok((meta, out))
46}
47
48pub fn for_each_snapshot_shard_msgpack<R: Read + Seek, T, F>(
49    zr: &mut ZipDocumentReader<R>,
50    mut on_shard: F,
51) -> io::Result<SnapshotShardMeta>
52where
53    T: DeserializeOwned,
54    F: FnMut(usize, T) -> io::Result<()>,
55{
56    for_each_snapshot_shard_raw(zr, |i, raw| {
57        let val: T = rmp_serde::from_slice(&raw).map_err(io::Error::other)?;
58        on_shard(i, val)
59    })
60}
61
62pub fn write_parent_map_msgpack<W, T>(
63    zw: &mut ZipDocumentWriter<W>,
64    parent_map: &T,
65    zstd_level: i32,
66) -> io::Result<()>
67where
68    W: Write + Seek,
69    T: Serialize,
70{
71    let bytes = rmp_serde::to_vec(parent_map).map_err(io::Error::other)?;
72    let zst = zstd::stream::encode_all(&bytes[..], zstd_level)
73        .map_err(io::Error::other)?;
74    zw.add_stored("snapshot/parent_map.msgpack.zst", &zst)
75}
76
77pub fn read_parent_map_msgpack<R, T>(
78    zr: &mut ZipDocumentReader<R>
79) -> io::Result<T>
80where
81    R: Read + Seek,
82    T: DeserializeOwned,
83{
84    let zst = zr.read_all("snapshot/parent_map.msgpack.zst")?;
85    let raw = zstd::stream::decode_all(&zst[..]).map_err(io::Error::other)?;
86    let val: T = rmp_serde::from_slice(&raw).map_err(io::Error::other)?;
87    Ok(val)
88}