// mf_file/zipdoc/snapshot.rs
use std::io::{self, Read, Seek, Write};
use std::path::Path;

use rayon::prelude::*;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use super::reader::ZipDocumentReader;
use super::writer::ZipDocumentWriter;
9#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct SnapshotShardMeta {
13 pub root_id: String,
14 pub num_shards: usize,
15 pub counts: Vec<usize>,
16}
17
18pub fn write_snapshot_shards<W, F>(
22 zw: &mut ZipDocumentWriter<W>,
23 meta: &SnapshotShardMeta,
24 mut get_shard_bytes: F,
25 zstd_level: i32,
26) -> io::Result<()>
27where
28 W: Write + Seek,
29 F: FnMut(usize) -> io::Result<Vec<u8>>,
30{
31 let meta_val = serde_json::to_value(meta).map_err(io::Error::other)?;
33 zw.add_json("snapshot/meta.json", &meta_val)?;
34 for i in 0..meta.num_shards {
36 let raw = get_shard_bytes(i)?;
37 let zst = zstd::stream::encode_all(&raw[..], zstd_level)
38 .map_err(io::Error::other)?;
39 let name = format!("snapshot/shard-{i:03}.bin.zst");
40 zw.add_stored(&name, &zst)?;
41 }
42 Ok(())
43}
44
45pub fn read_snapshot_shards<R: Read + Seek>(
47 zr: &mut ZipDocumentReader<R>
48) -> io::Result<(SnapshotShardMeta, Vec<Vec<u8>>)> {
49 let meta_bytes = zr.read_all("snapshot/meta.json")?;
50 let meta: SnapshotShardMeta =
51 serde_json::from_slice(&meta_bytes).map_err(io::Error::other)?;
52 let mut compressed: Vec<Vec<u8>> = Vec::with_capacity(meta.num_shards);
54 for i in 0..meta.num_shards {
55 let name = format!("snapshot/shard-{i:03}.bin.zst");
56 let zst = zr.read_all(&name)?;
57 compressed.push(zst);
58 }
59 let shards: Vec<Vec<u8>> = compressed
60 .into_par_iter()
61 .map(|zst| zstd::stream::decode_all(&zst[..]).map_err(io::Error::other))
62 .collect::<Result<Vec<_>, _>>()?;
63 Ok((meta, shards))
64}
65
66pub fn read_and_decode_snapshot_shards<R: Read + Seek, T: DeserializeOwned>(
68 zr: &mut ZipDocumentReader<R>
69) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
70 let (meta, shards_raw) = read_snapshot_shards(zr)?;
71 let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
73 for raw in shards_raw.iter() {
74 let (val, _): (T, _) =
75 bincode::serde::decode_from_slice(raw, bincode::config::standard())
76 .map_err(io::Error::other)?;
77 out.push(val);
78 }
79 Ok((meta, out))
80}
81
82pub fn for_each_snapshot_shard_raw<R: Read + Seek, F>(
84 zr: &mut ZipDocumentReader<R>,
85 mut on_shard: F,
86) -> io::Result<SnapshotShardMeta>
87where
88 F: FnMut(usize, Vec<u8>) -> io::Result<()>,
89{
90 let meta_bytes = zr.read_all("snapshot/meta.json")?;
91 let meta: SnapshotShardMeta =
92 serde_json::from_slice(&meta_bytes).map_err(io::Error::other)?;
93 for i in 0..meta.num_shards {
94 let name = format!("snapshot/shard-{i:03}.bin.zst");
95 let zst = zr.read_all(&name)?;
96 let raw =
97 zstd::stream::decode_all(&zst[..]).map_err(io::Error::other)?;
98 on_shard(i, raw)?;
99 }
100 Ok(meta)
101}
102
103pub fn export_zip_with_shards<P, F>(
105 path: P,
106 meta_json: &serde_json::Value,
107 schema_xml: &[u8],
108 shard_meta: &SnapshotShardMeta,
109 get_shard_bytes: F,
110 zstd_level: i32,
111) -> io::Result<()>
112where
113 P: AsRef<Path>,
114 F: FnMut(usize) -> io::Result<Vec<u8>>,
115{
116 let file = std::fs::File::create(path)?;
117 let mut zw = ZipDocumentWriter::new(file)?;
118 zw.add_json("meta.json", meta_json)?;
119 zw.add_deflated("schema.xml", schema_xml)?;
120 write_snapshot_shards(&mut zw, shard_meta, get_shard_bytes, zstd_level)?;
121 let _ = zw.finalize()?;
122 Ok(())
123}
124
125pub fn import_zip_with_shards<P, T>(
127 path: P
128) -> io::Result<(serde_json::Value, Vec<u8>, SnapshotShardMeta, Vec<T>)>
129where
130 P: AsRef<Path>,
131 T: DeserializeOwned,
132{
133 let file = std::fs::File::open(path)?;
134 let mut zr = ZipDocumentReader::new(file)?;
135 let meta_json = zr.read_all("meta.json")?;
136 let meta_val: serde_json::Value =
137 serde_json::from_slice(&meta_json).map_err(io::Error::other)?;
138 let schema_xml = zr.read_all("schema.xml")?;
139 let (shard_meta, decoded) =
140 read_and_decode_snapshot_shards::<_, T>(&mut zr)?;
141 Ok((meta_val, schema_xml, shard_meta, decoded))
142}