1#![allow(clippy::disallowed_types, clippy::disallowed_methods)]
17
18use std::{
19 fs::{File, OpenOptions},
20 io::{self, Read, Seek, SeekFrom, Write},
21 path::{Path, PathBuf},
22 sync::Arc,
23};
24
25use obs_proto::{__private::Message, obs::v1::ObsEnvelope};
26use parking_lot::Mutex;
27
28use crate::config::{AuditFailureMode, AuditFsyncMode};
29
/// Bit-reflected form of the CRC-32C (Castagnoli) generator polynomial.
const CRC32C_POLY: u32 = 0x82F63B78;

/// Computes the CRC-32C checksum of `data` with a bitwise (table-free) loop.
///
/// The register starts as all ones and is inverted again on output, so an
/// empty slice yields `0`.
#[must_use]
pub fn crc32c(data: &[u8]) -> u32 {
    let register = data.iter().fold(u32::MAX, |state, &byte| {
        // Mix the byte into the low bits, then shift it out one bit at a time.
        (0..8).fold(state ^ u32::from(byte), |bits, _| {
            // All-ones when the low bit is set, zero otherwise (branch-free select).
            let select = 0u32.wrapping_sub(bits & 1);
            (bits >> 1) ^ (CRC32C_POLY & select)
        })
    });
    register ^ u32::MAX
}
47
/// Append-only writer for a single audit spool batch.
///
/// A batch is a pair of files created at open time: a record file
/// (`<stamp>.audit.bin`) and a checksum sidecar (`<stamp>.audit.bin.crc`).
/// All mutable state lives behind a mutex, so the methods take `&self`.
#[derive(Debug)]
pub struct SpoolWriter {
    /// File handles, paths and byte accounting for the batch, guarded by a mutex.
    inner: Arc<Mutex<SpoolInner>>,
    /// Failure policy supplied at open time; this type only stores it and
    /// hands it back through `on_failure()`.
    on_failure: AuditFailureMode,
    /// Decides when `append` syncs the spool files to disk.
    fsync_mode: AuditFsyncMode,
}
56
/// Mutable state of a spool batch; only accessed while holding the writer's mutex.
#[derive(Debug)]
struct SpoolInner {
    /// Directory the batch files were created in.
    dir: PathBuf,
    /// Handle to the record file; `None` once the writer has been closed.
    bin: Option<File>,
    /// Handle to the checksum sidecar; `None` once the writer has been closed.
    crc: Option<File>,
    /// Full path of the record file.
    bin_path: PathBuf,
    /// Full path of the checksum sidecar.
    crc_path: PathBuf,
    /// Bytes appended to the record file through this writer (length prefixes
    /// plus payloads); sidecar bytes are not counted.
    bytes_written: u64,
    /// Upper bound on `bytes_written`; appends that would exceed it are rejected.
    max_bytes: u64,
    /// Records appended since the last sync.
    pending_records: u32,
}
69
/// Number of pending records at which `AuditFsyncMode::PerBatch` triggers a sync.
const FSYNC_BATCH_SIZE: u32 = 64;
75
76impl SpoolWriter {
77 pub fn open(
85 dir: impl Into<PathBuf>,
86 max_bytes: u64,
87 on_failure: AuditFailureMode,
88 ) -> io::Result<Self> {
89 Self::open_with_fsync(dir, max_bytes, on_failure, AuditFsyncMode::default())
90 }
91
92 pub fn open_with_fsync(
100 dir: impl Into<PathBuf>,
101 max_bytes: u64,
102 on_failure: AuditFailureMode,
103 fsync_mode: AuditFsyncMode,
104 ) -> io::Result<Self> {
105 let dir: PathBuf = dir.into();
106 std::fs::create_dir_all(&dir)?;
107 let stamp = batch_stamp();
108 let bin_path = dir.join(format!("{stamp}.audit.bin"));
109 let crc_path = dir.join(format!("{stamp}.audit.bin.crc"));
110 let bin = OpenOptions::new()
111 .create(true)
112 .append(true)
113 .open(&bin_path)?;
114 let crc = OpenOptions::new()
115 .create(true)
116 .append(true)
117 .open(&crc_path)?;
118 if !matches!(fsync_mode, AuditFsyncMode::None) {
123 let dir_handle = File::open(&dir)?;
124 let _ = dir_handle.sync_all();
125 }
126 Ok(Self {
127 inner: Arc::new(Mutex::new(SpoolInner {
128 dir,
129 bin: Some(bin),
130 crc: Some(crc),
131 bin_path,
132 crc_path,
133 bytes_written: 0,
134 max_bytes,
135 pending_records: 0,
136 })),
137 on_failure,
138 fsync_mode,
139 })
140 }
141
142 pub fn append(&self, env: &ObsEnvelope) -> io::Result<()> {
149 let mut buf = Vec::with_capacity(64 + env.encoded_len() as usize);
150 env.encode(&mut buf);
151 let len = buf.len() as u32;
152 let crc = crc32c(&buf);
153 let mut inner = self.inner.lock();
154 if inner.bytes_written.saturating_add(buf.len() as u64 + 4) > inner.max_bytes {
155 return Err(io::Error::other("audit spool full"));
158 }
159 let bin = inner
160 .bin
161 .as_mut()
162 .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "spool bin file missing"))?;
163 bin.write_all(&len.to_le_bytes())?;
164 bin.write_all(&buf)?;
165 bin.flush()?;
166 let crc_file = inner
167 .crc
168 .as_mut()
169 .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "spool crc file missing"))?;
170 crc_file.write_all(&crc.to_le_bytes())?;
171 crc_file.flush()?;
172 inner.bytes_written += buf.len() as u64 + 4;
173 inner.pending_records += 1;
174 let should_fsync = match self.fsync_mode {
179 AuditFsyncMode::None => false,
180 AuditFsyncMode::PerRecord => true,
181 AuditFsyncMode::PerBatch => inner.pending_records >= FSYNC_BATCH_SIZE,
182 };
183 if should_fsync {
184 if let Some(bin) = inner.bin.as_mut() {
187 bin.sync_data()?;
188 }
189 if let Some(crc) = inner.crc.as_mut() {
190 crc.sync_data()?;
191 }
192 inner.pending_records = 0;
193 }
194 Ok(())
195 }
196
197 pub fn fsync_now(&self) -> io::Result<()> {
205 let mut inner = self.inner.lock();
206 if let Some(bin) = inner.bin.as_mut() {
207 bin.sync_data()?;
208 }
209 if let Some(crc) = inner.crc.as_mut() {
210 crc.sync_data()?;
211 }
212 inner.pending_records = 0;
213 Ok(())
214 }
215
216 pub fn close(&self) {
219 let mut inner = self.inner.lock();
220 inner.bin.take();
221 inner.crc.take();
222 }
223
224 #[must_use]
227 pub fn on_failure(&self) -> AuditFailureMode {
228 self.on_failure
229 }
230
231 pub fn dir(&self) -> PathBuf {
233 self.inner.lock().dir.clone()
234 }
235
236 pub fn bin_path(&self) -> PathBuf {
238 self.inner.lock().bin_path.clone()
239 }
240
241 pub fn crc_path(&self) -> PathBuf {
243 self.inner.lock().crc_path.clone()
244 }
245}
246
/// Outcome of replaying one spool record file during recovery.
#[derive(Debug)]
pub struct RecoveryReport {
    /// Path of the record file this report describes.
    pub path: PathBuf,
    /// Number of records that decoded and were accepted by the consumer.
    pub records: usize,
    /// Number of records discarded: a truncated tail, a checksum mismatch,
    /// or a payload that failed to decode.
    pub dropped: usize,
}
257
258pub fn recover<F>(dir: &Path, mut consume: F) -> io::Result<Vec<RecoveryReport>>
267where
268 F: FnMut(ObsEnvelope) -> io::Result<()>,
269{
270 let mut reports = Vec::new();
271 if !dir.exists() {
272 return Ok(reports);
273 }
274 let entries = std::fs::read_dir(dir)?;
275 let mut bin_files: Vec<_> = entries
276 .filter_map(Result::ok)
277 .filter(|e| {
278 e.file_name()
279 .to_str()
280 .is_some_and(|n| n.ends_with(".audit.bin"))
281 })
282 .collect();
283 bin_files.sort_by_key(|e| {
284 e.metadata()
285 .and_then(|m| m.modified())
286 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
287 });
288 for entry in bin_files {
289 let bin_path = entry.path();
290 let crc_path = with_crc_suffix(&bin_path);
291 let report = recover_one(&bin_path, &crc_path, &mut consume)?;
292 let _ = std::fs::remove_file(&bin_path);
293 let _ = std::fs::remove_file(&crc_path);
294 reports.push(report);
295 }
296 Ok(reports)
297}
298
/// Returns `bin` with `.crc` appended to the whole path (the existing
/// extension is kept, not replaced).
fn with_crc_suffix(bin: &Path) -> PathBuf {
    let mut sidecar = bin.to_path_buf().into_os_string();
    sidecar.push(".crc");
    sidecar.into()
}
304
305fn recover_one<F>(bin_path: &Path, crc_path: &Path, consume: &mut F) -> io::Result<RecoveryReport>
306where
307 F: FnMut(ObsEnvelope) -> io::Result<()>,
308{
309 let mut bin = File::open(bin_path)?;
310 let mut crc = match File::open(crc_path) {
311 Ok(f) => Some(f),
312 Err(e) if e.kind() == io::ErrorKind::NotFound => None,
313 Err(e) => return Err(e),
314 };
315 let mut records = 0;
316 let mut dropped = 0;
317 loop {
318 let pos = bin.stream_position()?;
319 let mut len_buf = [0u8; 4];
320 match bin.read_exact(&mut len_buf) {
321 Ok(()) => {}
322 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
323 Err(e) => return Err(e),
324 }
325 let len = u32::from_le_bytes(len_buf) as usize;
326 let mut record = vec![0u8; len];
327 match bin.read_exact(&mut record) {
328 Ok(()) => {}
329 Err(_) => {
330 dropped += 1;
331 bin.seek(SeekFrom::Start(pos))?;
332 break;
333 }
334 }
335 let mut sidecar_buf = [0u8; 4];
336 let sidecar = if let Some(c) = crc.as_mut() {
337 match c.read_exact(&mut sidecar_buf) {
338 Ok(()) => Some(u32::from_le_bytes(sidecar_buf)),
339 Err(_) => None,
340 }
341 } else {
342 None
343 };
344 let actual = crc32c(&record);
345 if let Some(expected) = sidecar
346 && expected != actual
347 {
348 dropped += 1;
349 continue;
350 }
351 match ObsEnvelope::decode_from_slice(&record) {
352 Ok(env) => {
353 consume(env)?;
354 records += 1;
355 }
356 Err(_) => {
357 dropped += 1;
358 }
359 }
360 }
361 Ok(RecoveryReport {
362 path: bin_path.to_path_buf(),
363 records,
364 dropped,
365 })
366}
367
/// Builds the file-name stem for a new batch: the current wall-clock time in
/// nanoseconds since the Unix epoch (zero-padded to 20 digits), a dash, and
/// the process id. A clock set before the epoch yields `0` for the time part.
fn batch_stamp() -> String {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    let nanos = match since_epoch {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    format!("{:020}-{}", nanos, std::process::id())
}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an otherwise-default envelope with the given name and a fixed timestamp.
    fn env_with_name(name: &str) -> ObsEnvelope {
        ObsEnvelope {
            full_name: name.to_owned(),
            ts_ns: 1_700_000_000_000_000_000,
            ..Default::default()
        }
    }

    /// Checks the implementation against the standard CRC-32C check value.
    #[test]
    fn test_crc32c_canonical_vector() {
        let checksum = crc32c(b"123456789");
        assert_eq!(checksum, 0xE306_9283);
    }

    /// Five appended records come back as five recovered records in one report.
    #[test]
    fn test_round_trip_recovery() {
        let spool_dir = tempfile::tempdir().unwrap();
        let writer =
            SpoolWriter::open(spool_dir.path(), 1 << 20, AuditFailureMode::WarnOnly).unwrap();
        for i in 0..5 {
            let env = env_with_name(&format!("test.v1.Audit{i}"));
            writer.append(&env).unwrap();
        }
        writer.close();

        let mut seen = Vec::new();
        let reports = recover(spool_dir.path(), |env| {
            seen.push(env);
            Ok(())
        })
        .unwrap();

        assert_eq!(seen.len(), 5);
        assert_eq!(reports.len(), 1);
        let report = &reports[0];
        assert_eq!(report.records, 5);
        assert_eq!(report.dropped, 0);
    }

    /// Chopping bytes off the end of the record file must lose the partial record.
    #[test]
    fn test_truncated_tail_is_discarded() {
        let spool_dir = tempfile::tempdir().unwrap();
        let writer =
            SpoolWriter::open(spool_dir.path(), 1 << 20, AuditFailureMode::WarnOnly).unwrap();
        for i in 0..3 {
            let env = env_with_name(&format!("test.v1.Trunc{i}"));
            writer.append(&env).unwrap();
        }
        let bin_path = writer.bin_path();
        writer.close();

        // Rewrite the record file without its final eight bytes.
        let bytes = std::fs::read(&bin_path).unwrap();
        let keep = bytes.len() - 8;
        std::fs::write(&bin_path, &bytes[..keep]).unwrap();

        let mut seen = Vec::new();
        let _ = recover(spool_dir.path(), |env| {
            seen.push(env);
            Ok(())
        })
        .unwrap();

        assert!(seen.len() < 3, "truncation should drop the partial tail");
    }
}