mf_file/zipdoc/formats/
msgpack.rs1use std::io::{self, Read, Seek, Write};
2use serde::{Serialize, de::DeserializeOwned};
3
4use crate::zipdoc::{ZipDocumentReader, ZipDocumentWriter};
5use crate::zipdoc::snapshot::{
6 SnapshotShardMeta, read_snapshot_shards, for_each_snapshot_shard_raw,
7};
8
9pub fn write_snapshot_shards_msgpack<W, F, T>(
10 zw: &mut ZipDocumentWriter<W>,
11 meta: &SnapshotShardMeta,
12 mut get_shard_value: F,
13 zstd_level: i32,
14) -> io::Result<()>
15where
16 W: Write + Seek,
17 F: FnMut(usize) -> io::Result<T>,
18 T: Serialize,
19{
20 let meta_val = serde_json::to_value(meta)
21 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
22 zw.add_json("snapshot/meta.json", &meta_val)?;
23 for i in 0..meta.num_shards {
24 let v = get_shard_value(i)?;
25 let bytes = rmp_serde::to_vec(&v)
26 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
27 let zst = zstd::stream::encode_all(&bytes[..], zstd_level)
28 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
29 let name = format!("snapshot/shard-{:03}.bin.zst", i);
30 zw.add_stored(&name, &zst)?;
31 }
32 Ok(())
33}
34
35pub fn read_and_decode_snapshot_shards_msgpack<
36 R: Read + Seek,
37 T: DeserializeOwned,
38>(
39 zr: &mut ZipDocumentReader<R>
40) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
41 let (meta, shards_raw) = read_snapshot_shards(zr)?;
42 let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
43 for raw in shards_raw.iter() {
44 let val: T = rmp_serde::from_slice(raw)
45 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
46 out.push(val);
47 }
48 Ok((meta, out))
49}
50
51pub fn for_each_snapshot_shard_msgpack<R: Read + Seek, T, F>(
52 zr: &mut ZipDocumentReader<R>,
53 mut on_shard: F,
54) -> io::Result<SnapshotShardMeta>
55where
56 T: DeserializeOwned,
57 F: FnMut(usize, T) -> io::Result<()>,
58{
59 for_each_snapshot_shard_raw(zr, |i, raw| {
60 let val: T = rmp_serde::from_slice(&raw)
61 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
62 on_shard(i, val)
63 })
64}
65
66pub fn write_parent_map_msgpack<W, T>(
67 zw: &mut ZipDocumentWriter<W>,
68 parent_map: &T,
69 zstd_level: i32,
70) -> io::Result<()>
71where
72 W: Write + Seek,
73 T: Serialize,
74{
75 let bytes = rmp_serde::to_vec(parent_map)
76 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
77 let zst = zstd::stream::encode_all(&bytes[..], zstd_level)
78 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
79 zw.add_stored("snapshot/parent_map.msgpack.zst", &zst)
80}
81
82pub fn read_parent_map_msgpack<R, T>(
83 zr: &mut ZipDocumentReader<R>
84) -> io::Result<T>
85where
86 R: Read + Seek,
87 T: DeserializeOwned,
88{
89 let zst = zr.read_all("snapshot/parent_map.msgpack.zst")?;
90 let raw = zstd::stream::decode_all(&zst[..])
91 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
92 let val: T = rmp_serde::from_slice(&raw)
93 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
94 Ok(val)
95}