mf_file/zipdoc/
snapshot.rs1use std::io::{self, Read, Seek, Write};
2use std::path::Path;
3use rayon::prelude::*;
4use serde::{Serialize, Deserialize, de::DeserializeOwned};
5
6use super::writer::ZipDocumentWriter;
7use super::reader::ZipDocumentReader;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct SnapshotShardMeta {
13 pub root_id: String,
14 pub num_shards: usize,
15 pub counts: Vec<usize>,
16}
17
18pub fn write_snapshot_shards<W, F>(
22 zw: &mut ZipDocumentWriter<W>,
23 meta: &SnapshotShardMeta,
24 mut get_shard_bytes: F,
25 zstd_level: i32,
26) -> io::Result<()>
27where
28 W: Write + Seek,
29 F: FnMut(usize) -> io::Result<Vec<u8>>,
30{
31 let meta_val = serde_json::to_value(meta)
33 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
34 zw.add_json("snapshot/meta.json", &meta_val)?;
35 for i in 0..meta.num_shards {
37 let raw = get_shard_bytes(i)?;
38 let zst = zstd::stream::encode_all(&raw[..], zstd_level)
39 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
40 let name = format!("snapshot/shard-{:03}.bin.zst", i);
41 zw.add_stored(&name, &zst)?;
42 }
43 Ok(())
44}
45
46pub fn read_snapshot_shards<R: Read + Seek>(
48 zr: &mut ZipDocumentReader<R>
49) -> io::Result<(SnapshotShardMeta, Vec<Vec<u8>>)> {
50 let meta_bytes = zr.read_all("snapshot/meta.json")?;
51 let meta: SnapshotShardMeta = serde_json::from_slice(&meta_bytes)
52 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
53 let mut compressed: Vec<Vec<u8>> = Vec::with_capacity(meta.num_shards);
55 for i in 0..meta.num_shards {
56 let name = format!("snapshot/shard-{:03}.bin.zst", i);
57 let zst = zr.read_all(&name)?;
58 compressed.push(zst);
59 }
60 let shards: Vec<Vec<u8>> = compressed
61 .into_par_iter()
62 .map(|zst| {
63 zstd::stream::decode_all(&zst[..])
64 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
65 })
66 .collect::<Result<Vec<_>, _>>()?;
67 Ok((meta, shards))
68}
69
70pub fn read_and_decode_snapshot_shards<R: Read + Seek, T: DeserializeOwned>(
72 zr: &mut ZipDocumentReader<R>
73) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
74 let (meta, shards_raw) = read_snapshot_shards(zr)?;
75 let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
77 for raw in shards_raw.iter() {
78 let (val, _): (T, _) =
79 bincode::serde::decode_from_slice(raw, bincode::config::standard())
80 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
81 out.push(val);
82 }
83 Ok((meta, out))
84}
85
86pub fn for_each_snapshot_shard_raw<R: Read + Seek, F>(
88 zr: &mut ZipDocumentReader<R>,
89 mut on_shard: F,
90) -> io::Result<SnapshotShardMeta>
91where
92 F: FnMut(usize, Vec<u8>) -> io::Result<()>,
93{
94 let meta_bytes = zr.read_all("snapshot/meta.json")?;
95 let meta: SnapshotShardMeta = serde_json::from_slice(&meta_bytes)
96 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
97 for i in 0..meta.num_shards {
98 let name = format!("snapshot/shard-{:03}.bin.zst", i);
99 let zst = zr.read_all(&name)?;
100 let raw = zstd::stream::decode_all(&zst[..])
101 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
102 on_shard(i, raw)?;
103 }
104 Ok(meta)
105}
106
107pub fn export_zip_with_shards<P, F>(
109 path: P,
110 meta_json: &serde_json::Value,
111 schema_xml: &[u8],
112 shard_meta: &SnapshotShardMeta,
113 get_shard_bytes: F,
114 zstd_level: i32,
115) -> io::Result<()>
116where
117 P: AsRef<Path>,
118 F: FnMut(usize) -> io::Result<Vec<u8>>,
119{
120 let file = std::fs::File::create(path)?;
121 let mut zw = ZipDocumentWriter::new(file)?;
122 zw.add_json("meta.json", meta_json)?;
123 zw.add_deflated("schema.xml", schema_xml)?;
124 write_snapshot_shards(&mut zw, shard_meta, get_shard_bytes, zstd_level)?;
125 let _ = zw.finalize()?;
126 Ok(())
127}
128
129pub fn import_zip_with_shards<P, T>(
131 path: P
132) -> io::Result<(serde_json::Value, Vec<u8>, SnapshotShardMeta, Vec<T>)>
133where
134 P: AsRef<Path>,
135 T: DeserializeOwned,
136{
137 let file = std::fs::File::open(path)?;
138 let mut zr = ZipDocumentReader::new(file)?;
139 let meta_json = zr.read_all("meta.json")?;
140 let meta_val: serde_json::Value = serde_json::from_slice(&meta_json)
141 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
142 let schema_xml = zr.read_all("schema.xml")?;
143 let (shard_meta, decoded) =
144 read_and_decode_snapshot_shards::<_, T>(&mut zr)?;
145 Ok((meta_val, schema_xml, shard_meta, decoded))
146}