mf_file/
record.rs

1use crc32fast::Hasher as Crc32;
2use memmap2::{Mmap, MmapOptions};
3use std::fs::{File, OpenOptions};
4use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
5use std::path::Path;
6
7use crate::error::{FileError, Result};
8
/// Magic bytes stored at the very start of every record file.
pub const MAGIC: &[u8; 8] = b"MFFILE01";
/// Total file-header length: the 8-byte magic followed by 8 reserved bytes.
pub const HEADER_LEN: usize = MAGIC.len() + 8;
/// Per-record header length: a little-endian `u32` payload length followed by a
/// little-endian `u32` CRC32 of the payload.
pub const REC_HDR: usize = 2 * std::mem::size_of::<u32>();
12
13#[inline]
14pub fn crc32(data: &[u8]) -> u32 {
15    let mut h = Crc32::new();
16    h.update(data);
17    h.finalize()
18}
/// Decodes a little-endian `u32` from the first four bytes of `buf`.
///
/// Bytes past index 3 are ignored, so callers may pass a longer slice.
///
/// # Panics
/// Panics if `buf` is shorter than four bytes.
#[inline]
pub fn read_u32_le(buf: &[u8]) -> u32 {
    u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]])
}
/// Encodes `v` as little-endian into the first four bytes of `out`.
///
/// Bytes of `out` past index 3 are left untouched, so callers may pass a longer slice.
///
/// # Panics
/// Panics if `out` is shorter than four bytes.
#[inline]
pub fn write_u32_le(out: &mut [u8], v: u32) {
    out[..4].copy_from_slice(&v.to_le_bytes());
}
30
31// 写入文件头(包含魔数)
32fn write_header(file: &mut File) -> Result<()> {
33    file.seek(SeekFrom::Start(0))?;
34    let mut buf = [0u8; HEADER_LEN];
35    buf[..8].copy_from_slice(MAGIC);
36    file.write_all(&buf)?;
37    Ok(())
38}
39
40// 校验文件头(校验魔数)
41fn check_header(file: &mut File) -> Result<()> {
42    file.seek(SeekFrom::Start(0))?;
43    let mut hdr = [0u8; HEADER_LEN];
44    file.read_exact(&mut hdr)?;
45    if &hdr[..8] != MAGIC {
46        return Err(FileError::BadHeader);
47    }
48    Ok(())
49}
50
51#[derive(Debug)]
52pub struct Writer {
53    pub(crate) file: File,
54    buf: BufWriter<File>,
55    pub(crate) logical_end: u64,
56    prealloc_until: u64,
57    prealloc_chunk: u64,
58}
59
60impl Writer {
61    // 创建写入器; prealloc_chunk 为预分配块大小(0 表示不预分配)
62    pub fn create<P: AsRef<Path>>(
63        path: P,
64        prealloc_chunk: u64,
65    ) -> Result<Self> {
66        let mut file = OpenOptions::new()
67            .create(true)
68            .read(true)
69            .write(true)
70            .open(&path)?;
71
72        let meta_len = file.metadata()?.len();
73        if meta_len == 0 {
74            write_header(&mut file)?;
75        } else {
76            check_header(&mut file)?;
77        }
78
79        // 通过 mmap 扫描逻辑结尾(容忍尾部不完整记录)
80        let (logical_end, file_len) = {
81            let mmap = unsafe { MmapOptions::new().map(&file)? };
82            let l = scan_logical_end(&mmap)?;
83            (l, mmap.len() as u64)
84        };
85
86        let mut prealloc_until = file_len.max(logical_end);
87        let prealloc_chunk = prealloc_chunk;
88        if prealloc_chunk > 0 {
89            if prealloc_until < logical_end + prealloc_chunk {
90                prealloc_until =
91                    (logical_end + prealloc_chunk).max(HEADER_LEN as u64);
92                file.set_len(prealloc_until)?;
93            }
94        }
95
96        file.seek(SeekFrom::Start(logical_end))?;
97        let buf = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone()?);
98
99        Ok(Self { file, buf, logical_end, prealloc_until, prealloc_chunk })
100    }
101
102    // 追加一条记录,返回该记录的起始偏移
103    pub fn append(
104        &mut self,
105        payload: &[u8],
106    ) -> Result<u64> {
107        if payload.len() > (u32::MAX as usize) {
108            return Err(FileError::RecordTooLarge(payload.len()));
109        }
110        let need = REC_HDR as u64 + payload.len() as u64;
111        self.ensure_capacity(need)?;
112
113        let offset = self.logical_end;
114        let mut hdr = [0u8; REC_HDR];
115        write_u32_le(&mut hdr[0..4], payload.len() as u32);
116        write_u32_le(&mut hdr[4..8], crc32(payload));
117        self.buf.write_all(&hdr)?;
118        self.buf.write_all(payload)?;
119        self.logical_end += need;
120        Ok(offset)
121    }
122
123    // 刷新缓冲区并同步到磁盘
124    pub fn flush(&mut self) -> Result<()> {
125        self.buf.flush()?;
126        self.file.sync_data()?;
127        Ok(())
128    }
129    // 当前逻辑长度
130    pub fn len(&self) -> u64 {
131        self.logical_end
132    }
133
134    // 确保物理空间足够; 按块扩容
135    fn ensure_capacity(
136        &mut self,
137        need: u64,
138    ) -> Result<()> {
139        if self.prealloc_chunk == 0 {
140            return Ok(());
141        }
142        let want = self.logical_end + need;
143        if want <= self.prealloc_until {
144            return Ok(());
145        }
146        let mut new_size = self.prealloc_until;
147        while new_size < want {
148            new_size += self.prealloc_chunk;
149        }
150        self.buf.flush()?;
151        self.file.set_len(new_size)?;
152        self.prealloc_until = new_size;
153        Ok(())
154    }
155}
156
157#[derive(Debug)]
158pub struct Reader {
159    pub(crate) file: File,
160    pub(crate) mmap: Mmap,
161    pub(crate) logical_end: u64,
162}
163
164impl Reader {
165    // 打开只读映射
166    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
167        let mut file = OpenOptions::new().read(true).open(path)?;
168        check_header(&mut file)?;
169        let mmap = unsafe { MmapOptions::new().map(&file)? };
170        let logical_end = scan_logical_end(&mmap)?;
171        Ok(Self { file, mmap, logical_end })
172    }
173    // 逻辑结尾
174    pub fn logical_len(&self) -> u64 {
175        self.logical_end
176    }
177    // 读取指定偏移的记录负载
178    pub fn get_at(
179        &self,
180        offset: u64,
181    ) -> Result<&[u8]> {
182        let end = self.logical_end as usize;
183        let p = offset as usize;
184        if p + REC_HDR > end {
185            return Err(FileError::BadHeader);
186        }
187        let len = read_u32_le(&self.mmap[p..p + 4]) as usize;
188        let stored_crc = read_u32_le(&self.mmap[p + 4..p + 8]);
189        if len == 0 {
190            return Err(FileError::BadHeader);
191        }
192        let s = p + REC_HDR;
193        let e = s + len;
194        if e > end {
195            return Err(FileError::BadHeader);
196        }
197        let payload = &self.mmap[s..e];
198        if crc32(payload) != stored_crc {
199            return Err(FileError::CrcMismatch(offset));
200        }
201        Ok(payload)
202    }
203    // 迭代所有记录(校验 CRC,遇到损坏或不完整即停止)
204    pub fn iter(&self) -> Iter<'_> {
205        Iter { mmap: &self.mmap, p: HEADER_LEN, end: self.logical_end as usize }
206    }
207}
208
209pub struct Iter<'a> {
210    mmap: &'a Mmap,
211    p: usize,
212    end: usize,
213}
214impl<'a> Iterator for Iter<'a> {
215    type Item = &'a [u8];
216    fn next(&mut self) -> Option<Self::Item> {
217        if self.p + REC_HDR > self.end {
218            return None;
219        }
220        let len = read_u32_le(&self.mmap[self.p..self.p + 4]) as usize;
221        let stored_crc = read_u32_le(&self.mmap[self.p + 4..self.p + 8]);
222        if len == 0 {
223            return None;
224        }
225        let s = self.p + REC_HDR;
226        let e = s + len;
227        if e > self.end {
228            return None;
229        }
230        let payload = &self.mmap[s..e];
231        if crc32(payload) != stored_crc {
232            return None;
233        }
234        self.p = e;
235        Some(payload)
236    }
237}
238
239// 扫描逻辑结尾:从文件头开始按记录推进,直到遇到越界/校验失败/零长度
240pub fn scan_logical_end(mmap: &Mmap) -> Result<u64> {
241    if mmap.len() < HEADER_LEN {
242        return Err(FileError::BadHeader);
243    }
244    if &mmap[..8] != MAGIC {
245        return Err(FileError::BadHeader);
246    }
247    let mut p = HEADER_LEN;
248    let n = mmap.len();
249    while p + REC_HDR <= n {
250        let len = read_u32_le(&mmap[p..p + 4]) as usize;
251        if len == 0 {
252            break;
253        }
254        let s = p + REC_HDR;
255        let e = s + len;
256        if e > n {
257            break;
258        }
259        let stored_crc = read_u32_le(&mmap[p + 4..p + 8]);
260        if crc32(&mmap[s..e]) != stored_crc {
261            break;
262        }
263        p = e;
264    }
265    Ok(p as u64)
266}