mf_file/zipdoc/
snapshot.rs

1use std::io::{self, Read, Seek, Write};
2use std::path::Path;
3use rayon::prelude::*;
4use serde::{Serialize, Deserialize, de::DeserializeOwned};
5
6use super::writer::ZipDocumentWriter;
7use super::reader::ZipDocumentReader;
8
// =============== Sharded snapshot helpers ===============
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct SnapshotShardMeta {
13    pub root_id: String,
14    pub num_shards: usize,
15    pub counts: Vec<usize>,
16}
17
18/// 写入分片快照:
19/// - snapshot/meta.json(分片元数据,JSON)
20/// - snapshot/shard-XXX.bin.zst(每个分片的压缩数据)
21pub fn write_snapshot_shards<W, F>(
22    zw: &mut ZipDocumentWriter<W>,
23    meta: &SnapshotShardMeta,
24    mut get_shard_bytes: F,
25    zstd_level: i32,
26) -> io::Result<()>
27where
28    W: Write + Seek,
29    F: FnMut(usize) -> io::Result<Vec<u8>>,
30{
31    // 写 meta
32    let meta_val = serde_json::to_value(meta).map_err(io::Error::other)?;
33    zw.add_json("snapshot/meta.json", &meta_val)?;
34    // 写每个分片
35    for i in 0..meta.num_shards {
36        let raw = get_shard_bytes(i)?;
37        let zst = zstd::stream::encode_all(&raw[..], zstd_level)
38            .map_err(io::Error::other)?;
39        let name = format!("snapshot/shard-{i:03}.bin.zst");
40        zw.add_stored(&name, &zst)?;
41    }
42    Ok(())
43}
44
45/// 读取分片快照:返回元数据和解压后的每个分片字节
46pub fn read_snapshot_shards<R: Read + Seek>(
47    zr: &mut ZipDocumentReader<R>
48) -> io::Result<(SnapshotShardMeta, Vec<Vec<u8>>)> {
49    let meta_bytes = zr.read_all("snapshot/meta.json")?;
50    let meta: SnapshotShardMeta =
51        serde_json::from_slice(&meta_bytes).map_err(io::Error::other)?;
52    // 顺序读取 ZIP 条目(ZIP 读取器不支持并发),但解压并行
53    let mut compressed: Vec<Vec<u8>> = Vec::with_capacity(meta.num_shards);
54    for i in 0..meta.num_shards {
55        let name = format!("snapshot/shard-{i:03}.bin.zst");
56        let zst = zr.read_all(&name)?;
57        compressed.push(zst);
58    }
59    let shards: Vec<Vec<u8>> = compressed
60        .into_par_iter()
61        .map(|zst| zstd::stream::decode_all(&zst[..]).map_err(io::Error::other))
62        .collect::<Result<Vec<_>, _>>()?;
63    Ok((meta, shards))
64}
65
66/// 读取并用 bincode 反序列化每个分片
67pub fn read_and_decode_snapshot_shards<R: Read + Seek, T: DeserializeOwned>(
68    zr: &mut ZipDocumentReader<R>
69) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
70    let (meta, shards_raw) = read_snapshot_shards(zr)?;
71    // 为避免对 T 施加 Send 约束,这里顺序反序列化
72    let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
73    for raw in shards_raw.iter() {
74        let (val, _): (T, _) =
75            bincode::serde::decode_from_slice(raw, bincode::config::standard())
76                .map_err(io::Error::other)?;
77        out.push(val);
78    }
79    Ok((meta, out))
80}
81
82/// 流式:逐个分片解压为原始字节并回调处理,避免一次性加载内存
83pub fn for_each_snapshot_shard_raw<R: Read + Seek, F>(
84    zr: &mut ZipDocumentReader<R>,
85    mut on_shard: F,
86) -> io::Result<SnapshotShardMeta>
87where
88    F: FnMut(usize, Vec<u8>) -> io::Result<()>,
89{
90    let meta_bytes = zr.read_all("snapshot/meta.json")?;
91    let meta: SnapshotShardMeta =
92        serde_json::from_slice(&meta_bytes).map_err(io::Error::other)?;
93    for i in 0..meta.num_shards {
94        let name = format!("snapshot/shard-{i:03}.bin.zst");
95        let zst = zr.read_all(&name)?;
96        let raw =
97            zstd::stream::decode_all(&zst[..]).map_err(io::Error::other)?;
98        on_shard(i, raw)?;
99    }
100    Ok(meta)
101}
102
103/// 高层接口:导出包含 meta.json、schema.xml 和分片快照的 ZIP 文档
104pub fn export_zip_with_shards<P, F>(
105    path: P,
106    meta_json: &serde_json::Value,
107    schema_xml: &[u8],
108    shard_meta: &SnapshotShardMeta,
109    get_shard_bytes: F,
110    zstd_level: i32,
111) -> io::Result<()>
112where
113    P: AsRef<Path>,
114    F: FnMut(usize) -> io::Result<Vec<u8>>,
115{
116    let file = std::fs::File::create(path)?;
117    let mut zw = ZipDocumentWriter::new(file)?;
118    zw.add_json("meta.json", meta_json)?;
119    zw.add_deflated("schema.xml", schema_xml)?;
120    write_snapshot_shards(&mut zw, shard_meta, get_shard_bytes, zstd_level)?;
121    let _ = zw.finalize()?;
122    Ok(())
123}
124
125/// 高层接口:导入 ZIP 文档,返回(meta.json、schema.xml、分片元数据、解码后的分片)
126pub fn import_zip_with_shards<P, T>(
127    path: P
128) -> io::Result<(serde_json::Value, Vec<u8>, SnapshotShardMeta, Vec<T>)>
129where
130    P: AsRef<Path>,
131    T: DeserializeOwned,
132{
133    let file = std::fs::File::open(path)?;
134    let mut zr = ZipDocumentReader::new(file)?;
135    let meta_json = zr.read_all("meta.json")?;
136    let meta_val: serde_json::Value =
137        serde_json::from_slice(&meta_json).map_err(io::Error::other)?;
138    let schema_xml = zr.read_all("schema.xml")?;
139    let (shard_meta, decoded) =
140        read_and_decode_snapshot_shards::<_, T>(&mut zr)?;
141    Ok((meta_val, schema_xml, shard_meta, decoded))
142}