mf_file/
document.rs

1use std::io;
2use std::io::{Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4
5use blake3::Hasher as Blake3;
6use serde::{Deserialize, Serialize};
7
8use crate::error::{FileError, Result};
9use crate::record::{crc32, read_u32_le, Reader, Writer, HEADER_LEN, REC_HDR};
10
/// Magic bytes that open the fixed-size tail pointer written by
/// `DocumentWriter::finalize`.
///
/// The tail pointer is 16 bytes: this 8-byte magic followed by the 8-byte
/// little-endian offset of the directory record. It lets a reader locate the
/// directory from the physical end of the file without scanning every record.
const TAIL_MAGIC: &[u8; 8] = b"MFFTAIL1";
13
14// 段类型:用于描述容器中存储的数据类别
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16pub struct SegmentType(String);
17// 段目录项:记录段的类型、偏移、长度与 CRC
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct SegmentEntry {
20    pub kind: SegmentType,
21    pub offset: u64,
22    pub length: u64,
23    pub crc32: u32,
24}
25
26// 总目录:包含所有段的索引及文件级哈希
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Directory {
29    pub entries: Vec<SegmentEntry>,
30    pub flags: u32,
31    pub file_hash: [u8; 32],
32}
33
34// 文档写入器:基于 append-only 文件写入段,并在末尾写目录
35pub struct DocumentWriter {
36    w: Writer,
37    segments: Vec<SegmentEntry>,
38    path: PathBuf,
39}
40impl DocumentWriter {
41    // 开始写入
42    pub fn begin<P: AsRef<Path>>(path: P) -> Result<Self> {
43        let p = path.as_ref().to_path_buf();
44        Ok(Self { w: Writer::create(&p, 0)?, segments: Vec::new(), path: p })
45    }
46    // 追加一个段
47    pub fn add_segment(
48        &mut self,
49        kind: SegmentType,
50        payload: &[u8],
51    ) -> Result<()> {
52        let off = self.w.len();
53        let _ = self.w.append(payload)?;
54        let crc = crc32(payload);
55        self.segments.push(SegmentEntry {
56            kind,
57            offset: off,
58            length: (REC_HDR as u64) + payload.len() as u64,
59            crc32: crc,
60        });
61        Ok(())
62    }
63    // 完成写入:生成并写入目录,计算全文件哈希
64    pub fn finalize(mut self) -> Result<()> {
65        // 计算数据哈希
66        self.w.flush()?;
67        let mut hasher = Blake3::new();
68        let r = Reader::open(&self.path)?;
69        for bytes in r.iter() {
70            hasher.update(bytes);
71        }
72        let hash = *hasher.finalize().as_bytes();
73        // 写入目录记录
74        let dir =
75            Directory { entries: self.segments, flags: 0, file_hash: hash };
76        let bytes =
77            bincode::serde::encode_to_vec(&dir, bincode::config::standard())
78                .map_err(io::Error::other)
79                .map_err(FileError::Io)?;
80        let dir_off = self.w.append(&bytes)?;
81        self.w.flush()?;
82
83        // 写入尾指针,不计入逻辑长度:MAGIC(8) + dir_off(8)
84        // 这样 Reader 扫描逻辑结尾仍停在目录记录处,但可通过物理文件尾部快速读取目录偏移
85        {
86            // 直接使用底层文件写入尾部,不更新 logical_end
87            let file = &mut self.w.file;
88            file.seek(SeekFrom::Start(self.w.logical_end))?;
89            file.write_all(TAIL_MAGIC)?;
90            file.write_all(&dir_off.to_le_bytes())?;
91            file.sync_data()?;
92        }
93        Ok(())
94    }
95}
96
97// 文档读取器:读取末尾目录并提供段访问
98pub struct DocumentReader {
99    r: Reader,
100    dir: Directory,
101}
102impl DocumentReader {
103    // 打开并读取目录
104    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
105        let r = Reader::open(path)?;
106        // 优先通过尾指针快速定位目录偏移
107        let mut last_off = HEADER_LEN as u64;
108        let phys_len = r.mmap.len();
109        if phys_len >= 16 {
110            let tail = &r.mmap[phys_len - 16..phys_len];
111            if &tail[..8] == TAIL_MAGIC {
112                let mut off_bytes = [0u8; 8];
113                off_bytes.copy_from_slice(&tail[8..16]);
114                let off = u64::from_le_bytes(off_bytes);
115                // 基本校验:offset 落在逻辑区间内且指向一条有效记录
116                if (off as usize) + REC_HDR <= r.logical_end as usize {
117                    let len =
118                        read_u32_le(&r.mmap[off as usize..off as usize + 4])
119                            as usize;
120                    let s = off as usize + REC_HDR;
121                    let e = s + len;
122                    if e <= r.logical_end as usize {
123                        let stored_crc = read_u32_le(
124                            &r.mmap[off as usize + 4..off as usize + 8],
125                        );
126                        if crc32(&r.mmap[s..e]) == stored_crc {
127                            last_off = off;
128                        }
129                    }
130                }
131            }
132        }
133        // 如尾指针缺失/非法,回退到顺序扫描
134        if last_off == (HEADER_LEN as u64) {
135            let mut p = HEADER_LEN;
136            let end = r.logical_end as usize;
137            let mut fallback_last = HEADER_LEN as u64;
138            while p + REC_HDR <= end {
139                let len = read_u32_le(&r.mmap[p..p + 4]) as usize;
140                if len == 0 {
141                    break;
142                }
143                let s = p + REC_HDR;
144                let e = s + len;
145                if e > end {
146                    break;
147                }
148                let stored_crc = read_u32_le(&r.mmap[p + 4..p + 8]);
149                if crc32(&r.mmap[s..e]) != stored_crc {
150                    break;
151                }
152                fallback_last = p as u64;
153                p = e;
154            }
155            last_off = fallback_last;
156        }
157        let dir_bytes = r.get_at(last_off)?;
158        let (dir, _) = bincode::serde::decode_from_slice::<Directory, _>(
159            dir_bytes,
160            bincode::config::standard(),
161        )
162        .map_err(io::Error::other)
163        .map_err(FileError::Io)?;
164        // 校验除目录外的数据哈希
165        let mut hasher = Blake3::new();
166        let mut q = HEADER_LEN;
167        let end2 = last_off as usize;
168        while q + REC_HDR <= end2 {
169            let len = read_u32_le(&r.mmap[q..q + 4]) as usize;
170            if len == 0 {
171                break;
172            }
173            let s = q + REC_HDR;
174            let e = s + len;
175            if e > end2 {
176                break;
177            }
178            let stored_crc = read_u32_le(&r.mmap[q + 4..q + 8]);
179            if crc32(&r.mmap[s..e]) != stored_crc {
180                break;
181            }
182            hasher.update(&r.mmap[s..e]);
183            q = e;
184        }
185        let calc = *hasher.finalize().as_bytes();
186        if calc != dir.file_hash {
187            return Err(FileError::BadHeader);
188        }
189        Ok(Self { r, dir })
190    }
191
192    // 读取所有指定类型的段
193    pub fn read_segments<F>(
194        &self,
195        kind: SegmentType,
196        mut callback: F,
197    ) -> Result<()>
198    where
199        F: FnMut(usize, &[u8]) -> Result<()>,
200    {
201        for (index, entry) in self.dir.entries.iter().enumerate() {
202            if entry.kind == kind {
203                let bytes = self.r.get_at(entry.offset)?;
204                if crc32(bytes) != entry.crc32 {
205                    return Err(FileError::CrcMismatch(entry.offset));
206                }
207                callback(index, bytes)?;
208            }
209        }
210        Ok(())
211    }
212}