mf_file/zipdoc/formats/
msgpack.rs1use std::io::{self, Read, Seek, Write};
2use serde::{Serialize, de::DeserializeOwned};
3
4use crate::zipdoc::{ZipDocumentReader, ZipDocumentWriter};
5use crate::zipdoc::snapshot::{
6 SnapshotShardMeta, read_snapshot_shards, for_each_snapshot_shard_raw,
7};
8
9pub fn write_snapshot_shards_msgpack<W, F, T>(
10 zw: &mut ZipDocumentWriter<W>,
11 meta: &SnapshotShardMeta,
12 mut get_shard_value: F,
13 zstd_level: i32,
14) -> io::Result<()>
15where
16 W: Write + Seek,
17 F: FnMut(usize) -> io::Result<T>,
18 T: Serialize,
19{
20 let meta_val = serde_json::to_value(meta).map_err(io::Error::other)?;
21 zw.add_json("snapshot/meta.json", &meta_val)?;
22 for i in 0..meta.num_shards {
23 let v = get_shard_value(i)?;
24 let bytes = rmp_serde::to_vec(&v).map_err(io::Error::other)?;
25 let zst = zstd::stream::encode_all(&bytes[..], zstd_level)
26 .map_err(io::Error::other)?;
27 let name = format!("snapshot/shard-{i:03}.bin.zst");
28 zw.add_stored(&name, &zst)?;
29 }
30 Ok(())
31}
32
33pub fn read_and_decode_snapshot_shards_msgpack<
34 R: Read + Seek,
35 T: DeserializeOwned,
36>(
37 zr: &mut ZipDocumentReader<R>
38) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
39 let (meta, shards_raw) = read_snapshot_shards(zr)?;
40 let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
41 for raw in shards_raw.iter() {
42 let val: T = rmp_serde::from_slice(raw).map_err(io::Error::other)?;
43 out.push(val);
44 }
45 Ok((meta, out))
46}
47
48pub fn for_each_snapshot_shard_msgpack<R: Read + Seek, T, F>(
49 zr: &mut ZipDocumentReader<R>,
50 mut on_shard: F,
51) -> io::Result<SnapshotShardMeta>
52where
53 T: DeserializeOwned,
54 F: FnMut(usize, T) -> io::Result<()>,
55{
56 for_each_snapshot_shard_raw(zr, |i, raw| {
57 let val: T = rmp_serde::from_slice(&raw).map_err(io::Error::other)?;
58 on_shard(i, val)
59 })
60}
61
62pub fn write_parent_map_msgpack<W, T>(
63 zw: &mut ZipDocumentWriter<W>,
64 parent_map: &T,
65 zstd_level: i32,
66) -> io::Result<()>
67where
68 W: Write + Seek,
69 T: Serialize,
70{
71 let bytes = rmp_serde::to_vec(parent_map).map_err(io::Error::other)?;
72 let zst = zstd::stream::encode_all(&bytes[..], zstd_level)
73 .map_err(io::Error::other)?;
74 zw.add_stored("snapshot/parent_map.msgpack.zst", &zst)
75}
76
77pub fn read_parent_map_msgpack<R, T>(
78 zr: &mut ZipDocumentReader<R>
79) -> io::Result<T>
80where
81 R: Read + Seek,
82 T: DeserializeOwned,
83{
84 let zst = zr.read_all("snapshot/parent_map.msgpack.zst")?;
85 let raw = zstd::stream::decode_all(&zst[..]).map_err(io::Error::other)?;
86 let val: T = rmp_serde::from_slice(&raw).map_err(io::Error::other)?;
87 Ok(val)
88}