mf_file/zipdoc/
snapshot.rs

1use std::io::{self, Read, Seek, Write};
2use std::path::Path;
3use rayon::prelude::*;
4use serde::{Serialize, Deserialize, de::DeserializeOwned};
5
6use super::writer::ZipDocumentWriter;
7use super::reader::ZipDocumentReader;
8
9// =============== 分片快照辅助函数 ===============
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct SnapshotShardMeta {
13    pub root_id: String,
14    pub num_shards: usize,
15    pub counts: Vec<usize>,
16}
17
18/// 写入分片快照:
19/// - snapshot/meta.json(分片元数据,JSON)
20/// - snapshot/shard-XXX.bin.zst(每个分片的压缩数据)
21#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(zw, meta, get_shard_bytes), fields(
22    crate_name = "file",
23    shard_count = meta.shard_count,
24    zstd_level = zstd_level
25)))]
26pub fn write_snapshot_shards<W, F>(
27    zw: &mut ZipDocumentWriter<W>,
28    meta: &SnapshotShardMeta,
29    mut get_shard_bytes: F,
30    zstd_level: i32,
31) -> io::Result<()>
32where
33    W: Write + Seek,
34    F: FnMut(usize) -> io::Result<Vec<u8>>,
35{
36    // 写 meta
37    let meta_val = serde_json::to_value(meta).map_err(io::Error::other)?;
38    zw.add_json("snapshot/meta.json", &meta_val)?;
39    // 写每个分片
40    for i in 0..meta.num_shards {
41        let raw = get_shard_bytes(i)?;
42        let zst = zstd::stream::encode_all(&raw[..], zstd_level)
43            .map_err(io::Error::other)?;
44        let name = format!("snapshot/shard-{i:03}.bin.zst");
45        zw.add_stored(&name, &zst)?;
46    }
47    Ok(())
48}
49
50/// 读取分片快照:返回元数据和解压后的每个分片字节
51#[cfg_attr(
52    feature = "dev-tracing",
53    tracing::instrument(skip(zr), fields(crate_name = "file"))
54)]
55pub fn read_snapshot_shards<R: Read + Seek>(
56    zr: &mut ZipDocumentReader<R>
57) -> io::Result<(SnapshotShardMeta, Vec<Vec<u8>>)> {
58    let meta_bytes = zr.read_all("snapshot/meta.json")?;
59    let meta: SnapshotShardMeta =
60        serde_json::from_slice(&meta_bytes).map_err(io::Error::other)?;
61    // 顺序读取 ZIP 条目(ZIP 读取器不支持并发),但解压并行
62    let mut compressed: Vec<Vec<u8>> = Vec::with_capacity(meta.num_shards);
63    for i in 0..meta.num_shards {
64        let name = format!("snapshot/shard-{i:03}.bin.zst");
65        let zst = zr.read_all(&name)?;
66        compressed.push(zst);
67    }
68    let shards: Vec<Vec<u8>> = compressed
69        .into_par_iter()
70        .map(|zst| zstd::stream::decode_all(&zst[..]).map_err(io::Error::other))
71        .collect::<Result<Vec<_>, _>>()?;
72    Ok((meta, shards))
73}
74
75/// 读取并用 bincode 反序列化每个分片
76pub fn read_and_decode_snapshot_shards<R: Read + Seek, T: DeserializeOwned>(
77    zr: &mut ZipDocumentReader<R>
78) -> io::Result<(SnapshotShardMeta, Vec<T>)> {
79    let (meta, shards_raw) = read_snapshot_shards(zr)?;
80    // 为避免对 T 施加 Send 约束,这里顺序反序列化
81    let mut out: Vec<T> = Vec::with_capacity(shards_raw.len());
82    for raw in shards_raw.iter() {
83        let (val, _): (T, _) =
84            bincode::serde::decode_from_slice(raw, bincode::config::standard())
85                .map_err(io::Error::other)?;
86        out.push(val);
87    }
88    Ok((meta, out))
89}
90
91/// 流式:逐个分片解压为原始字节并回调处理,避免一次性加载内存
92pub fn for_each_snapshot_shard_raw<R: Read + Seek, F>(
93    zr: &mut ZipDocumentReader<R>,
94    mut on_shard: F,
95) -> io::Result<SnapshotShardMeta>
96where
97    F: FnMut(usize, Vec<u8>) -> io::Result<()>,
98{
99    let meta_bytes = zr.read_all("snapshot/meta.json")?;
100    let meta: SnapshotShardMeta =
101        serde_json::from_slice(&meta_bytes).map_err(io::Error::other)?;
102    for i in 0..meta.num_shards {
103        let name = format!("snapshot/shard-{i:03}.bin.zst");
104        let zst = zr.read_all(&name)?;
105        let raw =
106            zstd::stream::decode_all(&zst[..]).map_err(io::Error::other)?;
107        on_shard(i, raw)?;
108    }
109    Ok(meta)
110}
111
112/// 高层接口:导出包含 meta.json、schema.xml 和分片快照的 ZIP 文档
113#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path, meta_json, schema_xml, shard_meta, get_shard_bytes), fields(
114    crate_name = "file",
115    file_path = %path.as_ref().display(),
116    shard_count = shard_meta.counts,
117    schema_size = schema_xml.len(),
118    zstd_level = zstd_level
119)))]
120pub fn export_zip_with_shards<P, F>(
121    path: P,
122    meta_json: &serde_json::Value,
123    schema_xml: &[u8],
124    shard_meta: &SnapshotShardMeta,
125    get_shard_bytes: F,
126    zstd_level: i32,
127) -> io::Result<()>
128where
129    P: AsRef<Path>,
130    F: FnMut(usize) -> io::Result<Vec<u8>>,
131{
132    let file = std::fs::File::create(path)?;
133    let mut zw = ZipDocumentWriter::new(file)?;
134    zw.add_json("meta.json", meta_json)?;
135    zw.add_deflated("schema.xml", schema_xml)?;
136    write_snapshot_shards(&mut zw, shard_meta, get_shard_bytes, zstd_level)?;
137    let _ = zw.finalize()?;
138    Ok(())
139}
140
141/// 高层接口:导入 ZIP 文档,返回(meta.json、schema.xml、分片元数据、解码后的分片)
142#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path), fields(
143    crate_name = "file",
144    file_path = %path.as_ref().display()
145)))]
146pub fn import_zip_with_shards<P, T>(
147    path: P
148) -> io::Result<(serde_json::Value, Vec<u8>, SnapshotShardMeta, Vec<T>)>
149where
150    P: AsRef<Path>,
151    T: DeserializeOwned,
152{
153    let file = std::fs::File::open(path)?;
154    let mut zr = ZipDocumentReader::new(file)?;
155    let meta_json = zr.read_all("meta.json")?;
156    let meta_val: serde_json::Value =
157        serde_json::from_slice(&meta_json).map_err(io::Error::other)?;
158    let schema_xml = zr.read_all("schema.xml")?;
159    let (shard_meta, decoded) =
160        read_and_decode_snapshot_shards::<_, T>(&mut zr)?;
161    Ok((meta_val, schema_xml, shard_meta, decoded))
162}