1use crc32fast::Hasher as Crc32;
2use memmap2::{Mmap, MmapOptions};
3use std::fs::{File, OpenOptions};
4use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
5use std::path::Path;
6
7use crate::error::{FileError, Result};
8
9pub const MAGIC: &[u8; 8] = b"MFFILE01";
10pub const HEADER_LEN: usize = 16; pub const REC_HDR: usize = 8; #[inline]
14pub fn crc32(data: &[u8]) -> u32 {
15 let mut h = Crc32::new();
16 h.update(data);
17 h.finalize()
18}
/// Decodes a little-endian `u32` from `buf`.
///
/// # Panics
///
/// Panics if `buf` is not exactly 4 bytes long.
#[inline]
pub fn read_u32_le(buf: &[u8]) -> u32 {
    let bytes: [u8; 4] = buf.try_into().unwrap();
    u32::from_le_bytes(bytes)
}
/// Encodes `v` as little-endian bytes into `out`.
///
/// # Panics
///
/// Panics if `out` is not exactly 4 bytes long.
#[inline]
pub fn write_u32_le(out: &mut [u8], v: u32) {
    let encoded = v.to_le_bytes();
    out.copy_from_slice(&encoded);
}
30
31fn write_header(file: &mut File) -> Result<()> {
33 file.seek(SeekFrom::Start(0))?;
34 let mut buf = [0u8; HEADER_LEN];
35 buf[..8].copy_from_slice(MAGIC);
36 file.write_all(&buf)?;
37 Ok(())
38}
39
40fn check_header(file: &mut File) -> Result<()> {
42 file.seek(SeekFrom::Start(0))?;
43 let mut hdr = [0u8; HEADER_LEN];
44 file.read_exact(&mut hdr)?;
45 if &hdr[..8] != MAGIC {
46 return Err(FileError::BadHeader);
47 }
48 Ok(())
49}
50
/// Appending writer for a record file.
///
/// Records are buffered in memory and reach the file when the buffer fills
/// or when `flush` is called.
#[derive(Debug)]
pub struct Writer {
    /// Handle used for resizing (`set_len`) and syncing the file.
    pub(crate) file: File,
    /// Buffered writer over a clone of `file`; all record bytes go through it.
    buf: BufWriter<File>,
    /// Offset at which the next record will be written (header plus all
    /// records appended so far, including ones still buffered).
    pub(crate) logical_end: u64,
    /// File length that has been reserved so far; appends that fit below it
    /// need no resize.
    prealloc_until: u64,
    /// Growth step in bytes for preallocation; `0` disables preallocation.
    prealloc_chunk: u64,
}
59
60impl Writer {
61 pub fn create<P: AsRef<Path>>(
63 path: P,
64 prealloc_chunk: u64,
65 ) -> Result<Self> {
66 let mut file = OpenOptions::new()
67 .create(true)
68 .read(true)
69 .write(true)
70 .open(&path)?;
71
72 let meta_len = file.metadata()?.len();
73 if meta_len == 0 {
74 write_header(&mut file)?;
75 } else {
76 check_header(&mut file)?;
77 }
78
79 let (logical_end, file_len) = {
81 let mmap = unsafe { MmapOptions::new().map(&file)? };
82 let l = scan_logical_end(&mmap)?;
83 (l, mmap.len() as u64)
84 };
85
86 let mut prealloc_until = file_len.max(logical_end);
87 let prealloc_chunk = prealloc_chunk;
88 if prealloc_chunk > 0 {
89 if prealloc_until < logical_end + prealloc_chunk {
90 prealloc_until =
91 (logical_end + prealloc_chunk).max(HEADER_LEN as u64);
92 file.set_len(prealloc_until)?;
93 }
94 }
95
96 file.seek(SeekFrom::Start(logical_end))?;
97 let buf = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone()?);
98
99 Ok(Self { file, buf, logical_end, prealloc_until, prealloc_chunk })
100 }
101
102 pub fn append(
104 &mut self,
105 payload: &[u8],
106 ) -> Result<u64> {
107 if payload.len() > (u32::MAX as usize) {
108 return Err(FileError::RecordTooLarge(payload.len()));
109 }
110 let need = REC_HDR as u64 + payload.len() as u64;
111 self.ensure_capacity(need)?;
112
113 let offset = self.logical_end;
114 let mut hdr = [0u8; REC_HDR];
115 write_u32_le(&mut hdr[0..4], payload.len() as u32);
116 write_u32_le(&mut hdr[4..8], crc32(payload));
117 self.buf.write_all(&hdr)?;
118 self.buf.write_all(payload)?;
119 self.logical_end += need;
120 Ok(offset)
121 }
122
123 pub fn flush(&mut self) -> Result<()> {
125 self.buf.flush()?;
126 self.file.sync_data()?;
127 Ok(())
128 }
129 pub fn len(&self) -> u64 {
131 self.logical_end
132 }
133
134 fn ensure_capacity(
136 &mut self,
137 need: u64,
138 ) -> Result<()> {
139 if self.prealloc_chunk == 0 {
140 return Ok(());
141 }
142 let want = self.logical_end + need;
143 if want <= self.prealloc_until {
144 return Ok(());
145 }
146 let mut new_size = self.prealloc_until;
147 while new_size < want {
148 new_size += self.prealloc_chunk;
149 }
150 self.buf.flush()?;
151 self.file.set_len(new_size)?;
152 self.prealloc_until = new_size;
153 Ok(())
154 }
155}
156
/// Read-only view over a record file, backed by a memory map.
#[derive(Debug)]
pub struct Reader {
    /// Handle of the opened file that `mmap` was created from.
    pub(crate) file: File,
    /// Memory map of the file contents; record payloads are borrowed from it.
    pub(crate) mmap: Mmap,
    /// Offset just past the last valid record found when the file was opened.
    pub(crate) logical_end: u64,
}
163
164impl Reader {
165 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
167 let mut file = OpenOptions::new().read(true).open(path)?;
168 check_header(&mut file)?;
169 let mmap = unsafe { MmapOptions::new().map(&file)? };
170 let logical_end = scan_logical_end(&mmap)?;
171 Ok(Self { file, mmap, logical_end })
172 }
173 pub fn logical_len(&self) -> u64 {
175 self.logical_end
176 }
177 pub fn get_at(
179 &self,
180 offset: u64,
181 ) -> Result<&[u8]> {
182 let end = self.logical_end as usize;
183 let p = offset as usize;
184 if p + REC_HDR > end {
185 return Err(FileError::BadHeader);
186 }
187 let len = read_u32_le(&self.mmap[p..p + 4]) as usize;
188 let stored_crc = read_u32_le(&self.mmap[p + 4..p + 8]);
189 if len == 0 {
190 return Err(FileError::BadHeader);
191 }
192 let s = p + REC_HDR;
193 let e = s + len;
194 if e > end {
195 return Err(FileError::BadHeader);
196 }
197 let payload = &self.mmap[s..e];
198 if crc32(payload) != stored_crc {
199 return Err(FileError::CrcMismatch(offset));
200 }
201 Ok(payload)
202 }
203 pub fn iter(&self) -> Iter<'_> {
205 Iter { mmap: &self.mmap, p: HEADER_LEN, end: self.logical_end as usize }
206 }
207}
208
/// Iterator over the record payloads of a memory-mapped record file.
pub struct Iter<'a> {
    /// Memory map the payload slices are borrowed from.
    mmap: &'a Mmap,
    /// Offset of the next record header to read.
    p: usize,
    /// Offset at which iteration stops; no record may extend past it.
    end: usize,
}
214impl<'a> Iterator for Iter<'a> {
215 type Item = &'a [u8];
216 fn next(&mut self) -> Option<Self::Item> {
217 if self.p + REC_HDR > self.end {
218 return None;
219 }
220 let len = read_u32_le(&self.mmap[self.p..self.p + 4]) as usize;
221 let stored_crc = read_u32_le(&self.mmap[self.p + 4..self.p + 8]);
222 if len == 0 {
223 return None;
224 }
225 let s = self.p + REC_HDR;
226 let e = s + len;
227 if e > self.end {
228 return None;
229 }
230 let payload = &self.mmap[s..e];
231 if crc32(payload) != stored_crc {
232 return None;
233 }
234 self.p = e;
235 Some(payload)
236 }
237}
238
239pub fn scan_logical_end(mmap: &Mmap) -> Result<u64> {
241 if mmap.len() < HEADER_LEN {
242 return Err(FileError::BadHeader);
243 }
244 if &mmap[..8] != MAGIC {
245 return Err(FileError::BadHeader);
246 }
247 let mut p = HEADER_LEN;
248 let n = mmap.len();
249 while p + REC_HDR <= n {
250 let len = read_u32_le(&mmap[p..p + 4]) as usize;
251 if len == 0 {
252 break;
253 }
254 let s = p + REC_HDR;
255 let e = s + len;
256 if e > n {
257 break;
258 }
259 let stored_crc = read_u32_le(&mmap[p + 4..p + 8]);
260 if crc32(&mmap[s..e]) != stored_crc {
261 break;
262 }
263 p = e;
264 }
265 Ok(p as u64)
266}