1use crc32fast::Hasher as Crc32;
2use memmap2::{Mmap, MmapOptions};
3use std::fs::{File, OpenOptions};
4use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
5use std::path::Path;
6
7use crate::error::{FileError, Result};
8
9pub const MAGIC: &[u8; 8] = b"MFFILE01";
10pub const HEADER_LEN: usize = 16; pub const REC_HDR: usize = 8; #[inline]
14pub fn crc32(data: &[u8]) -> u32 {
15 let mut h = Crc32::new();
16 h.update(data);
17 h.finalize()
18}
/// Decodes a little-endian `u32` from `buf`.
///
/// # Panics
/// Panics if `buf` is not exactly four bytes long.
#[inline]
pub fn read_u32_le(buf: &[u8]) -> u32 {
    let bytes = <[u8; 4]>::try_from(buf).unwrap();
    u32::from_le_bytes(bytes)
}
/// Encodes `v` as little-endian into `out`.
///
/// # Panics
/// Panics if `out` is not exactly four bytes long.
#[inline]
pub fn write_u32_le(out: &mut [u8], v: u32) {
    let encoded = v.to_le_bytes();
    out.copy_from_slice(&encoded);
}
30
31fn write_header(file: &mut File) -> Result<()> {
33 file.seek(SeekFrom::Start(0))?;
34 let mut buf = [0u8; HEADER_LEN];
35 buf[..8].copy_from_slice(MAGIC);
36 file.write_all(&buf)?;
37 Ok(())
38}
39
40fn check_header(file: &mut File) -> Result<()> {
42 file.seek(SeekFrom::Start(0))?;
43 let mut hdr = [0u8; HEADER_LEN];
44 file.read_exact(&mut hdr)?;
45 if &hdr[..8] != MAGIC {
46 return Err(FileError::BadHeader);
47 }
48 Ok(())
49}
50
/// Appending writer for a record file.
///
/// Records are buffered in `buf` and placed at `logical_end`. `Writer::flush`
/// pushes them to the file and syncs its data.
#[derive(Debug)]
pub struct Writer {
    /// Handle to the open file; used for resizing and syncing.
    pub(crate) file: File,
    /// Buffered writer over a clone of `file`'s handle (see `Writer::create`).
    buf: BufWriter<File>,
    /// Byte offset at which the next record will be written. This is also the
    /// logical length of the file, header included.
    pub(crate) logical_end: u64,
    /// File length reserved by preallocation. Only maintained when
    /// `prealloc_chunk > 0`.
    prealloc_until: u64,
    /// Preallocation growth step in bytes; `0` disables preallocation.
    prealloc_chunk: u64,
}
59
60impl Writer {
61 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path), fields(
63 crate_name = "file",
64 file_path = %path.as_ref().display(),
65 prealloc_chunk = prealloc_chunk
66 )))]
67 pub fn create<P: AsRef<Path>>(
68 path: P,
69 prealloc_chunk: u64,
70 ) -> Result<Self> {
71 let mut file = OpenOptions::new()
72 .create(true)
73 .truncate(false)
74 .read(true)
75 .write(true)
76 .open(&path)?;
77
78 let meta_len = file.metadata()?.len();
79 if meta_len == 0 {
80 write_header(&mut file)?;
81 } else {
82 check_header(&mut file)?;
83 }
84
85 let (logical_end, file_len) = {
87 let mmap = unsafe { MmapOptions::new().map(&file)? };
88 let l = scan_logical_end(&mmap)?;
89 (l, mmap.len() as u64)
90 };
91
92 let mut prealloc_until = file_len.max(logical_end);
93 if prealloc_chunk > 0 && prealloc_until < logical_end + prealloc_chunk {
94 prealloc_until =
95 (logical_end + prealloc_chunk).max(HEADER_LEN as u64);
96 file.set_len(prealloc_until)?;
97 }
98
99 file.seek(SeekFrom::Start(logical_end))?;
100 let buf = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone()?);
101
102 Ok(Self { file, buf, logical_end, prealloc_until, prealloc_chunk })
103 }
104
105 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, payload), fields(
107 crate_name = "file",
108 payload_size = payload.len(),
109 current_offset = self.logical_end
110 )))]
111 pub fn append(
112 &mut self,
113 payload: &[u8],
114 ) -> Result<u64> {
115 if payload.is_empty() {
116 return Err(FileError::EmptyRecord);
117 }
118 if payload.len() > (u32::MAX as usize) {
119 return Err(FileError::RecordTooLarge(payload.len()));
120 }
121 let need = REC_HDR as u64 + payload.len() as u64;
122 self.ensure_capacity(need)?;
123
124 let offset = self.logical_end;
125 let mut hdr = [0u8; REC_HDR];
126 write_u32_le(&mut hdr[0..4], payload.len() as u32);
127 write_u32_le(&mut hdr[4..8], crc32(payload));
128 self.buf.write_all(&hdr)?;
129 self.buf.write_all(payload)?;
130 self.logical_end += need;
131 Ok(offset)
132 }
133
134 pub fn flush(&mut self) -> Result<()> {
136 self.buf.flush()?;
137 self.file.sync_data()?;
138 Ok(())
139 }
140 pub fn len(&self) -> u64 {
142 self.logical_end
143 }
144
145 pub fn is_empty(&self) -> bool {
147 self.logical_end == HEADER_LEN as u64
148 }
149
150 fn ensure_capacity(
152 &mut self,
153 need: u64,
154 ) -> Result<()> {
155 if self.prealloc_chunk == 0 {
156 return Ok(());
157 }
158 let want = self.logical_end + need;
159 if want <= self.prealloc_until {
160 return Ok(());
161 }
162 let mut new_size = self.prealloc_until;
163 while new_size < want {
164 new_size += self.prealloc_chunk;
165 }
166 self.buf.flush()?;
167 self.file.set_len(new_size)?;
168 self.prealloc_until = new_size;
169 Ok(())
170 }
171}
172
173#[derive(Debug)]
174pub struct Reader {
175 pub(crate) _file: File, pub(crate) mmap: Mmap,
177 pub(crate) logical_end: u64,
178}
179
180impl Reader {
181 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(path), fields(
183 crate_name = "file",
184 file_path = %path.as_ref().display()
185 )))]
186 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
187 let mut file = OpenOptions::new().read(true).open(path)?;
188 check_header(&mut file)?;
189 let mmap = unsafe { MmapOptions::new().map(&file)? };
190 let logical_end = scan_logical_end(&mmap)?;
191 Ok(Self { _file: file, mmap, logical_end })
192 }
193 pub fn logical_len(&self) -> u64 {
195 self.logical_end
196 }
197 pub fn get_at(
199 &self,
200 offset: u64,
201 ) -> Result<&[u8]> {
202 let end = usize::try_from(self.logical_end)
203 .map_err(|_| FileError::BadHeader)?;
204 let p = usize::try_from(offset).map_err(|_| FileError::BadHeader)?;
205 if p + REC_HDR > end {
206 return Err(FileError::BadHeader);
207 }
208 let len: usize = read_u32_le(&self.mmap[p..p + 4]) as usize;
209 let stored_crc = read_u32_le(&self.mmap[p + 4..p + 8]);
210 if len == 0 {
211 return Err(FileError::BadHeader);
212 }
213 let s = p + REC_HDR;
214 let e = s + len;
215 if e > end {
216 return Err(FileError::BadHeader);
217 }
218 let payload = &self.mmap[s..e];
219 if crc32(payload) != stored_crc {
220 return Err(FileError::CrcMismatch(offset));
221 }
222 Ok(payload)
223 }
224 pub fn iter(&self) -> Iter<'_> {
226 Iter { mmap: &self.mmap, p: HEADER_LEN, end: self.logical_end as usize }
227 }
228}
229
230pub struct Iter<'a> {
231 mmap: &'a Mmap,
232 p: usize,
233 end: usize,
234}
235impl<'a> Iterator for Iter<'a> {
236 type Item = &'a [u8];
237 fn next(&mut self) -> Option<Self::Item> {
238 if self.p + REC_HDR > self.end {
239 return None;
240 }
241 let len = read_u32_le(&self.mmap[self.p..self.p + 4]) as usize;
242 let stored_crc = read_u32_le(&self.mmap[self.p + 4..self.p + 8]);
243 if len == 0 {
244 return None;
245 }
246 let s = self.p + REC_HDR;
247 let e = s + len;
248 if e > self.end {
249 return None;
250 }
251 let payload = &self.mmap[s..e];
252 if crc32(payload) != stored_crc {
253 return None;
254 }
255 self.p = e;
256 Some(payload)
257 }
258}
259
260pub fn scan_logical_end(mmap: &Mmap) -> Result<u64> {
262 if mmap.len() < HEADER_LEN {
263 return Err(FileError::BadHeader);
264 }
265 if &mmap[..8] != MAGIC {
266 return Err(FileError::BadHeader);
267 }
268 let mut p = HEADER_LEN;
269 let n = mmap.len();
270 while p + REC_HDR <= n {
271 let len = read_u32_le(&mmap[p..p + 4]) as usize;
272 if len == 0 {
273 break;
274 }
275 let s = p + REC_HDR;
276 let e = s + len;
277 if e > n {
278 break;
279 }
280 let stored_crc = read_u32_le(&mmap[p + 4..p + 8]);
281 if crc32(&mmap[s..e]) != stored_crc {
282 break;
283 }
284 p = e;
285 }
286 Ok(p as u64)
287}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// An empty payload is rejected with `EmptyRecord` and nothing is written:
    /// reopening the file shows only the header and no records.
    #[test]
    fn reject_zero_length_records() {
        let tmp = tempdir().unwrap();
        let file_path = tmp.path().join("zero.mff");

        {
            let mut w = Writer::create(&file_path, 0).unwrap();
            let res = w.append(&[]);
            assert!(matches!(res, Err(FileError::EmptyRecord)));
            w.flush().unwrap();
        } // `w` is dropped here, before the reader opens the file.

        let r = Reader::open(&file_path).unwrap();
        assert_eq!(r.logical_len(), HEADER_LEN as u64);
        assert_eq!(r.iter().count(), 0);
    }
}