// mf_file/zipdoc/snapshot.rs

1use std::io::{self, Read, Seek, Write};
2use std::path::Path;
3use rayon::prelude::*;
4use serde::{Serialize, Deserialize, de::DeserializeOwned};
5
6use super::writer::ZipDocumentWriter;
7use super::reader::ZipDocumentReader;
8
// =============== Sharded snapshot helpers (分片快照辅助函数) ===============

11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct SnapshotShardMeta {
13    pub root_id: String,
14    pub num_shards: usize,
15    pub counts: Vec<usize>,
16}
17
18/// 写入分片快照:
19/// - snapshot/meta.json(分片元数据,JSON)
20/// - snapshot/shard-XXX.bin.zst(每个分片的压缩数据)
21pub fn write_snapshot_shards<W, F>(
22    zw: &mut ZipDocumentWriter<W>,
23    meta: &SnapshotShardMeta,
24    mut get_shard_bytes: F,
25    zstd_level: i32,
26) -> io::Result<()>
27where
28    W: Write + Seek,
29    F: FnMut(usize) -> io::Result<Vec<u8>>,
30{
31    // 写 meta
32    let meta_val = serde_json::to_value(meta)
33        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
34    zw.add_json("snapshot/meta.json", &meta_val)?;
35    // 写每个分片
36    for i in 0..meta.num_shards {
37        let raw = get_shard_bytes(i)?;
38        let zst = zstd::stream::encode_all(&raw[..], zstd_level)
39            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
40        let name = format!("snapshot/shard-{:03}.bin.zst", i);
41        zw.add_stored(&name, &zst)?;
42    }
43    Ok(())
44}
45
46/// 读取分片快照:返回元数据和解压后的每个分片字节
47pub fn read_snapshot_shards<R: Read + Seek>(
48    zr: &mut ZipDocumentReader<R>
49) -> io::Result<(SnapshotShardMeta, Vec<Vec<u8>>)> {
50    let meta_bytes = zr.read_all("snapshot/meta.json")?;
51    let meta: SnapshotShardMeta = serde_json::from_slice(&meta_bytes)
52        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
53    // 顺序读取 ZIP 条目(ZIP 读取器不支持并发),但解压并行
54    let mut compressed: Vec<Vec<u8>> = Vec::with_capacity(meta.num_shards);
55    for i in 0..meta.num_shards {
56        let name = format!("snapshot/shard-{:03}.bin.zst", i);
57        let zst = zr.read_all(&name)?;
58        compressed.push(zst);
59    }
60    let shards: Vec<Vec<u8>> = compressed
61        .into_par_iter()
62        .map(|zst| {
63            zstd::stream::decode_all(&zst[..])
64                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
65        })
66        .collect::<Result<Vec<_>, _>>()?;
67    Ok((meta, shards))
68}
69
70/// 读取并用 bincode 反序列化每个分片
71pub fn read_and_decode_snapshot_shards<R: Read + Seek, T: DeserializeOwned>(
72    zr: &mut ZipDocumentReader<R>
73) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
74    let (meta, shards_raw) = read_snapshot_shards(zr)?;
75    // 为避免对 T 施加 Send 约束,这里顺序反序列化
76    let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
77    for raw in shards_raw.iter() {
78        let (val, _): (T, _) =
79            bincode::serde::decode_from_slice(raw, bincode::config::standard())
80                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
81        out.push(val);
82    }
83    Ok((meta, out))
84}
85
86/// 流式:逐个分片解压为原始字节并回调处理,避免一次性加载内存
87pub fn for_each_snapshot_shard_raw<R: Read + Seek, F>(
88    zr: &mut ZipDocumentReader<R>,
89    mut on_shard: F,
90) -> io::Result<SnapshotShardMeta>
91where
92    F: FnMut(usize, Vec<u8>) -> io::Result<()>,
93{
94    let meta_bytes = zr.read_all("snapshot/meta.json")?;
95    let meta: SnapshotShardMeta = serde_json::from_slice(&meta_bytes)
96        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
97    for i in 0..meta.num_shards {
98        let name = format!("snapshot/shard-{:03}.bin.zst", i);
99        let zst = zr.read_all(&name)?;
100        let raw = zstd::stream::decode_all(&zst[..])
101            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
102        on_shard(i, raw)?;
103    }
104    Ok(meta)
105}
106
107/// 高层接口:导出包含 meta.json、schema.xml 和分片快照的 ZIP 文档
108pub fn export_zip_with_shards<P, F>(
109    path: P,
110    meta_json: &serde_json::Value,
111    schema_xml: &[u8],
112    shard_meta: &SnapshotShardMeta,
113    get_shard_bytes: F,
114    zstd_level: i32,
115) -> io::Result<()>
116where
117    P: AsRef<Path>,
118    F: FnMut(usize) -> io::Result<Vec<u8>>,
119{
120    let file = std::fs::File::create(path)?;
121    let mut zw = ZipDocumentWriter::new(file)?;
122    zw.add_json("meta.json", meta_json)?;
123    zw.add_deflated("schema.xml", schema_xml)?;
124    write_snapshot_shards(&mut zw, shard_meta, get_shard_bytes, zstd_level)?;
125    let _ = zw.finalize()?;
126    Ok(())
127}
128
129/// 高层接口:导入 ZIP 文档,返回(meta.json、schema.xml、分片元数据、解码后的分片)
130pub fn import_zip_with_shards<P, T>(
131    path: P
132) -> io::Result<(serde_json::Value, Vec<u8>, SnapshotShardMeta, Vec<T>)>
133where
134    P: AsRef<Path>,
135    T: DeserializeOwned,
136{
137    let file = std::fs::File::open(path)?;
138    let mut zr = ZipDocumentReader::new(file)?;
139    let meta_json = zr.read_all("meta.json")?;
140    let meta_val: serde_json::Value = serde_json::from_slice(&meta_json)
141        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
142    let schema_xml = zr.read_all("schema.xml")?;
143    let (shard_meta, decoded) =
144        read_and_decode_snapshot_shards::<_, T>(&mut zr)?;
145    Ok((meta_val, schema_xml, shard_meta, decoded))
146}