mf_file/zipdoc/
snapshot.rs1use std::io::{self, Read, Seek, Write};
2use std::path::Path;
3use rayon::prelude::*;
4use serde::{Serialize, Deserialize, de::DeserializeOwned};
5
6use super::writer::ZipDocumentWriter;
7use super::reader::ZipDocumentReader;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct SnapshotShardMeta {
13 pub root_id: String,
14 pub num_shards: usize,
15 pub counts: Vec<usize>,
16}
17
18#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(zw, meta, get_shard_bytes), fields(
22 crate_name = "file",
23 shard_count = meta.shard_count,
24 zstd_level = zstd_level
25)))]
26pub fn write_snapshot_shards<W, F>(
27 zw: &mut ZipDocumentWriter<W>,
28 meta: &SnapshotShardMeta,
29 mut get_shard_bytes: F,
30 zstd_level: i32,
31) -> io::Result<()>
32where
33 W: Write + Seek,
34 F: FnMut(usize) -> io::Result<Vec<u8>>,
35{
36 let meta_val = serde_json::to_value(meta).map_err(io::Error::other)?;
38 zw.add_json("snapshot/meta.json", &meta_val)?;
39 for i in 0..meta.num_shards {
41 let raw = get_shard_bytes(i)?;
42 let zst = zstd::stream::encode_all(&raw[..], zstd_level)
43 .map_err(io::Error::other)?;
44 let name = format!("snapshot/shard-{i:03}.bin.zst");
45 zw.add_stored(&name, &zst)?;
46 }
47 Ok(())
48}
49
50#[cfg_attr(
52 feature = "dev-tracing",
53 tracing::instrument(skip(zr), fields(crate_name = "file"))
54)]
55pub fn read_snapshot_shards<R: Read + Seek>(
56 zr: &mut ZipDocumentReader<R>
57) -> io::Result<(SnapshotShardMeta, Vec<Vec<u8>>)> {
58 let meta_bytes = zr.read_all("snapshot/meta.json")?;
59 let meta: SnapshotShardMeta =
60 serde_json::from_slice(&meta_bytes).map_err(io::Error::other)?;
61 let mut compressed: Vec<Vec<u8>> = Vec::with_capacity(meta.num_shards);
63 for i in 0..meta.num_shards {
64 let name = format!("snapshot/shard-{i:03}.bin.zst");
65 let zst = zr.read_all(&name)?;
66 compressed.push(zst);
67 }
68 let shards: Vec<Vec<u8>> = compressed
69 .into_par_iter()
70 .map(|zst| zstd::stream::decode_all(&zst[..]).map_err(io::Error::other))
71 .collect::<Result<Vec<_>, _>>()?;
72 Ok((meta, shards))
73}
74
75pub fn read_and_decode_snapshot_shards<R: Read + Seek, T: DeserializeOwned>(
77 zr: &mut ZipDocumentReader<R>
78) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
79 let (meta, shards_raw) = read_snapshot_shards(zr)?;
80 let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
82 for raw in shards_raw.iter() {
83 let (val, _): (T, _) =
84 bincode::serde::decode_from_slice(raw, bincode::config::standard())
85 .map_err(io::Error::other)?;
86 out.push(val);
87 }
88 Ok((meta, out))
89}
90
91pub fn for_each_snapshot_shard_raw<R: Read + Seek, F>(
93 zr: &mut ZipDocumentReader<R>,
94 mut on_shard: F,
95) -> io::Result<SnapshotShardMeta>
96where
97 F: FnMut(usize, Vec<u8>) -> io::Result<()>,
98{
99 let meta_bytes = zr.read_all("snapshot/meta.json")?;
100 let meta: SnapshotShardMeta =
101 serde_json::from_slice(&meta_bytes).map_err(io::Error::other)?;
102 for i in 0..meta.num_shards {
103 let name = format!("snapshot/shard-{i:03}.bin.zst");
104 let zst = zr.read_all(&name)?;
105 let raw =
106 zstd::stream::decode_all(&zst[..]).map_err(io::Error::other)?;
107 on_shard(i, raw)?;
108 }
109 Ok(meta)
110}
111
112#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path, meta_json, schema_xml, shard_meta, get_shard_bytes), fields(
114 crate_name = "file",
115 file_path = %path.as_ref().display(),
116 shard_count = shard_meta.counts,
117 schema_size = schema_xml.len(),
118 zstd_level = zstd_level
119)))]
120pub fn export_zip_with_shards<P, F>(
121 path: P,
122 meta_json: &serde_json::Value,
123 schema_xml: &[u8],
124 shard_meta: &SnapshotShardMeta,
125 get_shard_bytes: F,
126 zstd_level: i32,
127) -> io::Result<()>
128where
129 P: AsRef<Path>,
130 F: FnMut(usize) -> io::Result<Vec<u8>>,
131{
132 let file = std::fs::File::create(path)?;
133 let mut zw = ZipDocumentWriter::new(file)?;
134 zw.add_json("meta.json", meta_json)?;
135 zw.add_deflated("schema.xml", schema_xml)?;
136 write_snapshot_shards(&mut zw, shard_meta, get_shard_bytes, zstd_level)?;
137 let _ = zw.finalize()?;
138 Ok(())
139}
140
141#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path), fields(
143 crate_name = "file",
144 file_path = %path.as_ref().display()
145)))]
146pub fn import_zip_with_shards<P, T>(
147 path: P
148) -> io::Result<(serde_json::Value, Vec<u8>, SnapshotShardMeta, Vec<T>)>
149where
150 P: AsRef<Path>,
151 T: DeserializeOwned,
152{
153 let file = std::fs::File::open(path)?;
154 let mut zr = ZipDocumentReader::new(file)?;
155 let meta_json = zr.read_all("meta.json")?;
156 let meta_val: serde_json::Value =
157 serde_json::from_slice(&meta_json).map_err(io::Error::other)?;
158 let schema_xml = zr.read_all("schema.xml")?;
159 let (shard_meta, decoded) =
160 read_and_decode_snapshot_shards::<_, T>(&mut zr)?;
161 Ok((meta_val, schema_xml, shard_meta, decoded))
162}