// mf_file/record.rs

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, ErrorKind, Read, Seek, SeekFrom, Write};
use std::path::Path;

use crc32fast::Hasher as Crc32;
use memmap2::{Mmap, MmapOptions};

use crate::error::{FileError, Result};

/// Magic bytes that every record file starts with (offset 0).
pub const MAGIC: &[u8; 8] = b"MFFILE01";
/// Size of the file header: the 8-byte magic followed by 8 reserved bytes.
pub const HEADER_LEN: usize = MAGIC.len() + 8;
/// Size of a record header: a u32 payload length followed by a u32 CRC32.
pub const REC_HDR: usize = 2 * std::mem::size_of::<u32>();

13#[inline]
14pub fn crc32(data: &[u8]) -> u32 {
15    let mut h = Crc32::new();
16    h.update(data);
17    h.finalize()
18}
/// Decodes a little-endian `u32` from `buf`.
///
/// # Panics
/// Panics if `buf` is not exactly 4 bytes long.
#[inline]
pub fn read_u32_le(buf: &[u8]) -> u32 {
    let word: [u8; 4] = buf.try_into().unwrap();
    u32::from_le_bytes(word)
}

/// Encodes `v` as little-endian bytes into `out`.
///
/// # Panics
/// Panics if `out` is not exactly 4 bytes long.
#[inline]
pub fn write_u32_le(out: &mut [u8], v: u32) {
    let bytes = v.to_le_bytes();
    out.copy_from_slice(&bytes);
}

31// 写入文件头(包含魔数)
32fn write_header(file: &mut File) -> Result<()> {
33    file.seek(SeekFrom::Start(0))?;
34    let mut buf = [0u8; HEADER_LEN];
35    buf[..8].copy_from_slice(MAGIC);
36    file.write_all(&buf)?;
37    Ok(())
38}
39
40// 校验文件头(校验魔数)
41fn check_header(file: &mut File) -> Result<()> {
42    file.seek(SeekFrom::Start(0))?;
43    let mut hdr = [0u8; HEADER_LEN];
44    file.read_exact(&mut hdr)?;
45    if &hdr[..8] != MAGIC {
46        return Err(FileError::BadHeader);
47    }
48    Ok(())
49}
50
51#[derive(Debug)]
52pub struct Writer {
53    pub(crate) file: File,
54    buf: BufWriter<File>,
55    pub(crate) logical_end: u64,
56    prealloc_until: u64,
57    prealloc_chunk: u64,
58}
59
60impl Writer {
61    // 创建写入器; prealloc_chunk 为预分配块大小(0 表示不预分配)
62    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path), fields(
63        crate_name = "file",
64        file_path = %path.as_ref().display(),
65        prealloc_chunk = prealloc_chunk
66    )))]
67    pub fn create<P: AsRef<Path>>(
68        path: P,
69        prealloc_chunk: u64,
70    ) -> Result<Self> {
71        let mut file = OpenOptions::new()
72            .create(true)
73            .truncate(false)
74            .read(true)
75            .write(true)
76            .open(&path)?;
77
78        let meta_len = file.metadata()?.len();
79        if meta_len == 0 {
80            write_header(&mut file)?;
81        } else {
82            check_header(&mut file)?;
83        }
84
85        // 通过 mmap 扫描逻辑结尾(容忍尾部不完整记录)
86        let (logical_end, file_len) = {
87            let mmap = unsafe { MmapOptions::new().map(&file)? };
88            let l = scan_logical_end(&mmap)?;
89            (l, mmap.len() as u64)
90        };
91
92        let mut prealloc_until = file_len.max(logical_end);
93        if prealloc_chunk > 0 && prealloc_until < logical_end + prealloc_chunk {
94            prealloc_until =
95                (logical_end + prealloc_chunk).max(HEADER_LEN as u64);
96            file.set_len(prealloc_until)?;
97        }
98
99        file.seek(SeekFrom::Start(logical_end))?;
100        let buf = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone()?);
101
102        Ok(Self { file, buf, logical_end, prealloc_until, prealloc_chunk })
103    }
104
105    // 追加一条记录,返回该记录的起始偏移
106    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, payload), fields(
107        crate_name = "file",
108        payload_size = payload.len(),
109        current_offset = self.logical_end
110    )))]
111    pub fn append(
112        &mut self,
113        payload: &[u8],
114    ) -> Result<u64> {
115        if payload.is_empty() {
116            return Err(FileError::EmptyRecord);
117        }
118        if payload.len() > (u32::MAX as usize) {
119            return Err(FileError::RecordTooLarge(payload.len()));
120        }
121        let need = REC_HDR as u64 + payload.len() as u64;
122        self.ensure_capacity(need)?;
123
124        let offset = self.logical_end;
125        let mut hdr = [0u8; REC_HDR];
126        write_u32_le(&mut hdr[0..4], payload.len() as u32);
127        write_u32_le(&mut hdr[4..8], crc32(payload));
128        self.buf.write_all(&hdr)?;
129        self.buf.write_all(payload)?;
130        self.logical_end += need;
131        Ok(offset)
132    }
133
134    // 刷新缓冲区并同步到磁盘
135    pub fn flush(&mut self) -> Result<()> {
136        self.buf.flush()?;
137        self.file.sync_data()?;
138        Ok(())
139    }
140    // 当前逻辑长度
141    pub fn len(&self) -> u64 {
142        self.logical_end
143    }
144
145    // 检查是否为空
146    pub fn is_empty(&self) -> bool {
147        self.logical_end == HEADER_LEN as u64
148    }
149
150    // 确保物理空间足够; 按块扩容
151    fn ensure_capacity(
152        &mut self,
153        need: u64,
154    ) -> Result<()> {
155        if self.prealloc_chunk == 0 {
156            return Ok(());
157        }
158        let want = self.logical_end + need;
159        if want <= self.prealloc_until {
160            return Ok(());
161        }
162        let mut new_size = self.prealloc_until;
163        while new_size < want {
164            new_size += self.prealloc_chunk;
165        }
166        self.buf.flush()?;
167        self.file.set_len(new_size)?;
168        self.prealloc_until = new_size;
169        Ok(())
170    }
171}
172
173#[derive(Debug)]
174pub struct Reader {
175    pub(crate) _file: File, // 保持文件句柄存活以维持 mmap 有效性
176    pub(crate) mmap: Mmap,
177    pub(crate) logical_end: u64,
178}
179
180impl Reader {
181    // 打开只读映射
182    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path), fields(
183        crate_name = "file",
184        file_path = %path.as_ref().display()
185    )))]
186    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
187        let mut file = OpenOptions::new().read(true).open(path)?;
188        check_header(&mut file)?;
189        let mmap = unsafe { MmapOptions::new().map(&file)? };
190        let logical_end = scan_logical_end(&mmap)?;
191        Ok(Self { _file: file, mmap, logical_end })
192    }
193    // 逻辑结尾
194    pub fn logical_len(&self) -> u64 {
195        self.logical_end
196    }
197    // 读取指定偏移的记录负载
198    pub fn get_at(
199        &self,
200        offset: u64,
201    ) -> Result<&[u8]> {
202        let end = usize::try_from(self.logical_end)
203            .map_err(|_| FileError::BadHeader)?;
204        let p = usize::try_from(offset).map_err(|_| FileError::BadHeader)?;
205        if p + REC_HDR > end {
206            return Err(FileError::BadHeader);
207        }
208        let len: usize = read_u32_le(&self.mmap[p..p + 4]) as usize;
209        let stored_crc = read_u32_le(&self.mmap[p + 4..p + 8]);
210        if len == 0 {
211            return Err(FileError::BadHeader);
212        }
213        let s = p + REC_HDR;
214        let e = s + len;
215        if e > end {
216            return Err(FileError::BadHeader);
217        }
218        let payload = &self.mmap[s..e];
219        if crc32(payload) != stored_crc {
220            return Err(FileError::CrcMismatch(offset));
221        }
222        Ok(payload)
223    }
224    // 迭代所有记录(校验 CRC,遇到损坏或不完整即停止)
225    pub fn iter(&self) -> Iter<'_> {
226        Iter { mmap: &self.mmap, p: HEADER_LEN, end: self.logical_end as usize }
227    }
228}
229
230pub struct Iter<'a> {
231    mmap: &'a Mmap,
232    p: usize,
233    end: usize,
234}
235impl<'a> Iterator for Iter<'a> {
236    type Item = &'a [u8];
237    fn next(&mut self) -> Option<Self::Item> {
238        if self.p + REC_HDR > self.end {
239            return None;
240        }
241        let len = read_u32_le(&self.mmap[self.p..self.p + 4]) as usize;
242        let stored_crc = read_u32_le(&self.mmap[self.p + 4..self.p + 8]);
243        if len == 0 {
244            return None;
245        }
246        let s = self.p + REC_HDR;
247        let e = s + len;
248        if e > self.end {
249            return None;
250        }
251        let payload = &self.mmap[s..e];
252        if crc32(payload) != stored_crc {
253            return None;
254        }
255        self.p = e;
256        Some(payload)
257    }
258}
259
260// 扫描逻辑结尾:从文件头开始按记录推进,直到遇到越界/校验失败/零长度
261pub fn scan_logical_end(mmap: &Mmap) -> Result<u64> {
262    if mmap.len() < HEADER_LEN {
263        return Err(FileError::BadHeader);
264    }
265    if &mmap[..8] != MAGIC {
266        return Err(FileError::BadHeader);
267    }
268    let mut p = HEADER_LEN;
269    let n = mmap.len();
270    while p + REC_HDR <= n {
271        let len = read_u32_le(&mmap[p..p + 4]) as usize;
272        if len == 0 {
273            break;
274        }
275        let s = p + REC_HDR;
276        let e = s + len;
277        if e > n {
278            break;
279        }
280        let stored_crc = read_u32_le(&mmap[p + 4..p + 8]);
281        if crc32(&mmap[s..e]) != stored_crc {
282            break;
283        }
284        p = e;
285    }
286    Ok(p as u64)
287}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// An empty payload must be rejected, leaving the file with only its
    /// header, so a reader sees no records.
    #[test]
    fn reject_zero_length_records() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("zero.mff");

        {
            let mut writer = Writer::create(&path, 0).unwrap();
            assert!(matches!(writer.append(&[]), Err(FileError::EmptyRecord)));
            writer.flush().unwrap();
        }

        let reader = Reader::open(&path).unwrap();
        assert_eq!(reader.logical_len(), HEADER_LEN as u64);
        assert_eq!(reader.iter().count(), 0);
    }
}