mf_file/
document.rs

1use std::io;
2use std::io::{Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4
5use blake3::Hasher as Blake3;
6use serde::{Deserialize, Serialize};
7
8use crate::error::{FileError, Result};
9use crate::record::{crc32, read_u32_le, Reader, Writer, HEADER_LEN, REC_HDR};
10
/// Magic bytes that open the fixed tail pointer written by `DocumentWriter::finalize`.
///
/// The tail occupies the last 16 bytes of the physical file: these 8 magic bytes
/// followed by the directory offset as a little-endian `u64`. Readers use it to
/// locate the directory without scanning every record.
const TAIL_MAGIC: &[u8; 8] = b"MFFTAIL1";
13
14// 段类型:用于描述容器中存储的数据类别
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16pub struct SegmentType(pub String);
17// 段目录项:记录段的类型、偏移、长度与 CRC
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct SegmentEntry {
20    pub kind: SegmentType,
21    pub offset: u64,
22    pub length: u64,
23    pub crc32: u32,
24}
25
26// 总目录:包含所有段的索引及文件级哈希
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Directory {
29    pub entries: Vec<SegmentEntry>,
30    pub flags: u32,
31    pub file_hash: [u8; 32],
32}
33
34// 文档写入器:基于 append-only 文件写入段,并在末尾写目录
35pub struct DocumentWriter {
36    w: Writer,
37    segments: Vec<SegmentEntry>,
38    path: PathBuf,
39}
40impl DocumentWriter {
41    // 开始写入
42    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path), fields(
43        crate_name = "file",
44        file_path = %path.as_ref().display()
45    )))]
46    pub fn begin<P: AsRef<Path>>(path: P) -> Result<Self> {
47        let p = path.as_ref().to_path_buf();
48        Ok(Self { w: Writer::create(&p, 0)?, segments: Vec::new(), path: p })
49    }
50    // 追加一个段
51    pub fn add_segment(
52        &mut self,
53        kind: SegmentType,
54        payload: &[u8],
55    ) -> Result<()> {
56        let off = self.w.len();
57        let _ = self.w.append(payload)?;
58        let crc = crc32(payload);
59        self.segments.push(SegmentEntry {
60            kind,
61            offset: off,
62            length: (REC_HDR as u64) + payload.len() as u64,
63            crc32: crc,
64        });
65        Ok(())
66    }
67    // 完成写入:生成并写入目录,计算全文件哈希
68    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self), fields(
69        crate_name = "file",
70        segment_count = self.segments.len(),
71        file_path = %self.path.display()
72    )))]
73    pub fn finalize(mut self) -> Result<()> {
74        // 计算数据哈希
75        self.w.flush()?;
76        let mut hasher = Blake3::new();
77        let r = Reader::open(&self.path)?;
78        for bytes in r.iter() {
79            hasher.update(bytes);
80        }
81        let hash = *hasher.finalize().as_bytes();
82        // 写入目录记录
83        let dir =
84            Directory { entries: self.segments, flags: 0, file_hash: hash };
85        let bytes =
86            bincode::serde::encode_to_vec(&dir, bincode::config::standard())
87                .map_err(io::Error::other)
88                .map_err(FileError::Io)?;
89        let dir_off = self.w.append(&bytes)?;
90        self.w.flush()?;
91
92        // 写入尾指针,不计入逻辑长度:MAGIC(8) + dir_off(8)
93        // 这样 Reader 扫描逻辑结尾仍停在目录记录处,但可通过物理文件尾部快速读取目录偏移
94        {
95            // 直接使用底层文件写入尾部,不更新 logical_end
96            let file = &mut self.w.file;
97            file.seek(SeekFrom::Start(self.w.logical_end))?;
98            file.write_all(TAIL_MAGIC)?;
99            file.write_all(&dir_off.to_le_bytes())?;
100            file.sync_data()?;
101        }
102        Ok(())
103    }
104}
105
106// 文档读取器:读取末尾目录并提供段访问
107pub struct DocumentReader {
108    r: Reader,
109    dir: Directory,
110}
111impl DocumentReader {
112    // 打开并读取目录
113    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path), fields(
114        crate_name = "file",
115        file_path = %path.as_ref().display()
116    )))]
117    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
118        let r = Reader::open(path)?;
119        // 优先通过尾指针快速定位目录偏移
120        let mut last_off = HEADER_LEN as u64;
121        let phys_len = r.mmap.len();
122        if phys_len >= 16 {
123            let tail = &r.mmap[phys_len - 16..phys_len];
124            if &tail[..8] == TAIL_MAGIC {
125                let mut off_bytes = [0u8; 8];
126                off_bytes.copy_from_slice(&tail[8..16]);
127                let off = u64::from_le_bytes(off_bytes);
128                // 基本校验:offset 落在逻辑区间内且指向一条有效记录
129                if (off as usize) + REC_HDR <= r.logical_end as usize {
130                    let len =
131                        read_u32_le(&r.mmap[off as usize..off as usize + 4])
132                            as usize;
133                    let s = off as usize + REC_HDR;
134                    let e = s + len;
135                    if e <= r.logical_end as usize {
136                        let stored_crc = read_u32_le(
137                            &r.mmap[off as usize + 4..off as usize + 8],
138                        );
139                        if crc32(&r.mmap[s..e]) == stored_crc {
140                            last_off = off;
141                        }
142                    }
143                }
144            }
145        }
146        // 如尾指针缺失/非法,回退到顺序扫描
147        if last_off == (HEADER_LEN as u64) {
148            let mut p = HEADER_LEN;
149            let end = r.logical_end as usize;
150            let mut fallback_last = HEADER_LEN as u64;
151            while p + REC_HDR <= end {
152                let len = read_u32_le(&r.mmap[p..p + 4]) as usize;
153                if len == 0 {
154                    break;
155                }
156                let s = p + REC_HDR;
157                let e = s + len;
158                if e > end {
159                    break;
160                }
161                let stored_crc = read_u32_le(&r.mmap[p + 4..p + 8]);
162                if crc32(&r.mmap[s..e]) != stored_crc {
163                    break;
164                }
165                fallback_last = p as u64;
166                p = e;
167            }
168            last_off = fallback_last;
169        }
170        let dir_bytes = r.get_at(last_off)?;
171        let (dir, _) = bincode::serde::decode_from_slice::<Directory, _>(
172            dir_bytes,
173            bincode::config::standard(),
174        )
175        .map_err(io::Error::other)
176        .map_err(FileError::Io)?;
177        // 校验除目录外的数据哈希
178        let mut hasher = Blake3::new();
179        let mut q = HEADER_LEN;
180        let end2 = last_off as usize;
181        while q + REC_HDR <= end2 {
182            let len = read_u32_le(&r.mmap[q..q + 4]) as usize;
183            if len == 0 {
184                break;
185            }
186            let s = q + REC_HDR;
187            let e = s + len;
188            if e > end2 {
189                break;
190            }
191            let stored_crc = read_u32_le(&r.mmap[q + 4..q + 8]);
192            if crc32(&r.mmap[s..e]) != stored_crc {
193                break;
194            }
195            hasher.update(&r.mmap[s..e]);
196            q = e;
197        }
198        let calc = *hasher.finalize().as_bytes();
199        if calc != dir.file_hash {
200            return Err(FileError::BadHeader);
201        }
202        Ok(Self { r, dir })
203    }
204
205    // 读取所有指定类型的段
206    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, callback), fields(
207        crate_name = "file",
208        segment_type = ?kind,
209        total_segments = self.dir.segments.len()
210    )))]
211    pub fn read_segments<F>(
212        &self,
213        kind: SegmentType,
214        mut callback: F,
215    ) -> Result<()>
216    where
217        F: FnMut(usize, &[u8]) -> Result<()>,
218    {
219        for (index, entry) in self.dir.entries.iter().enumerate() {
220            if entry.kind == kind {
221                let bytes = self.r.get_at(entry.offset)?;
222                if crc32(bytes) != entry.crc32 {
223                    return Err(FileError::CrcMismatch(entry.offset));
224                }
225                callback(index, bytes)?;
226            }
227        }
228        Ok(())
229    }
230}