mf_file/zipdoc/formats/
msgpack.rs

1use std::io::{self, Read, Seek, Write};
2use serde::{Serialize, de::DeserializeOwned};
3
4use crate::zipdoc::{ZipDocumentReader, ZipDocumentWriter};
5use crate::zipdoc::snapshot::{
6    SnapshotShardMeta, read_snapshot_shards, for_each_snapshot_shard_raw,
7};
8
9pub fn write_snapshot_shards_msgpack<W, F, T>(
10    zw: &mut ZipDocumentWriter<W>,
11    meta: &SnapshotShardMeta,
12    mut get_shard_value: F,
13    zstd_level: i32,
14) -> io::Result<()>
15where
16    W: Write + Seek,
17    F: FnMut(usize) -> io::Result<T>,
18    T: Serialize,
19{
20    let meta_val = serde_json::to_value(meta)
21        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
22    zw.add_json("snapshot/meta.json", &meta_val)?;
23    for i in 0..meta.num_shards {
24        let v = get_shard_value(i)?;
25        let bytes = rmp_serde::to_vec(&v)
26            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
27        let zst = zstd::stream::encode_all(&bytes[..], zstd_level)
28            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
29        let name = format!("snapshot/shard-{:03}.bin.zst", i);
30        zw.add_stored(&name, &zst)?;
31    }
32    Ok(())
33}
34
35pub fn read_and_decode_snapshot_shards_msgpack<
36    R: Read + Seek,
37    T: DeserializeOwned,
38>(
39    zr: &mut ZipDocumentReader<R>
40) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
41    let (meta, shards_raw) = read_snapshot_shards(zr)?;
42    let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
43    for raw in shards_raw.iter() {
44        let val: T = rmp_serde::from_slice(raw)
45            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
46        out.push(val);
47    }
48    Ok((meta, out))
49}
50
51pub fn for_each_snapshot_shard_msgpack<R: Read + Seek, T, F>(
52    zr: &mut ZipDocumentReader<R>,
53    mut on_shard: F,
54) -> io::Result<SnapshotShardMeta>
55where
56    T: DeserializeOwned,
57    F: FnMut(usize, T) -> io::Result<()>,
58{
59    for_each_snapshot_shard_raw(zr, |i, raw| {
60        let val: T = rmp_serde::from_slice(&raw)
61            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
62        on_shard(i, val)
63    })
64}
65
66pub fn write_parent_map_msgpack<W, T>(
67    zw: &mut ZipDocumentWriter<W>,
68    parent_map: &T,
69    zstd_level: i32,
70) -> io::Result<()>
71where
72    W: Write + Seek,
73    T: Serialize,
74{
75    let bytes = rmp_serde::to_vec(parent_map)
76        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
77    let zst = zstd::stream::encode_all(&bytes[..], zstd_level)
78        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
79    zw.add_stored("snapshot/parent_map.msgpack.zst", &zst)
80}
81
82pub fn read_parent_map_msgpack<R, T>(
83    zr: &mut ZipDocumentReader<R>
84) -> io::Result<T>
85where
86    R: Read + Seek,
87    T: DeserializeOwned,
88{
89    let zst = zr.read_all("snapshot/parent_map.msgpack.zst")?;
90    let raw = zstd::stream::decode_all(&zst[..])
91        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
92    let val: T = rmp_serde::from_slice(&raw)
93        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
94    Ok(val)
95}