// obs_core/audit_spool.rs

//! AUDIT spool — binary length-prefixed envelope file with CRC32C tail
//! integrity. Spec 11 § 6.4.
//!
//! Format:
//!
//! ```text
//! audit-spool-record := u32_le_length || ObsEnvelope_buffa_bytes
//! audit-spool-file   := record* (no header)
//! audit-spool-crc    := u32_le crc per record (parallel `.crc` file)
//! ```
//!
//! `std::fs` is used synchronously here because the AUDIT path runs
//! on the emit thread (spec 11 § 6.4 documents the blocking trade-off);
//! switching to `tokio::fs` would require a `block_on` round-trip per
//! envelope, defeating the latency budget.
16#![allow(clippy::disallowed_types, clippy::disallowed_methods)]
17
18use std::{
19    fs::{File, OpenOptions},
20    io::{self, Read, Seek, SeekFrom, Write},
21    path::{Path, PathBuf},
22    sync::Arc,
23};
24
25use obs_proto::{__private::Message, obs::v1::ObsEnvelope};
26use parking_lot::Mutex;
27
28use crate::config::{AuditFailureMode, AuditFsyncMode};
29
/// Reflected (LSB-first) form of the CRC-32C / Castagnoli polynomial.
const CRC32C_POLY: u32 = 0x82F6_3B78;

/// Checksum `data` with CRC32C (Castagnoli).
///
/// Plain bit-at-a-time software implementation: the register starts at
/// all-ones, each input byte is XORed into the low bits and shifted out
/// one bit at a time, and the final register is inverted.
#[must_use]
pub fn crc32c(data: &[u8]) -> u32 {
    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |reg, _| {
            // Fold the polynomial in only when the bit shifted out is set.
            if reg & 1 == 1 {
                (reg >> 1) ^ CRC32C_POLY
            } else {
                reg >> 1
            }
        })
    });
    !register
}
47
48/// One spool batch is bounded by record count or wall-clock time;
49/// larger of the two is kept simple here.
50#[derive(Debug)]
51pub struct SpoolWriter {
52    inner: Arc<Mutex<SpoolInner>>,
53    on_failure: AuditFailureMode,
54    fsync_mode: AuditFsyncMode,
55}
56
/// Lock-protected state behind a `SpoolWriter`.
#[derive(Debug)]
struct SpoolInner {
    /// Directory that holds the spool files.
    dir: PathBuf,
    /// Record file handle; `None` once the batch has been closed.
    bin: Option<File>,
    /// CRC sidecar handle; `None` once the batch has been closed.
    crc: Option<File>,
    /// Path of the record file.
    bin_path: PathBuf,
    /// Path of the CRC sidecar file.
    crc_path: PathBuf,
    /// Bytes appended to the record file so far (length prefixes included).
    bytes_written: u64,
    /// Upper bound on `bytes_written`; appends beyond it are rejected.
    max_bytes: u64,
    /// Records appended since the last fsync; reset to 0 on flush.
    pending_records: u32,
}
69
/// Records per fsync window when `AuditFsyncMode::PerBatch` is in
/// effect. 64 was picked to balance throughput against the size of
/// the durability window — at 1 KiB/record, a host crash can lose
/// roughly 64 KiB of appended-but-not-yet-synced records at most.
const FSYNC_BATCH_SIZE: u32 = 64;
75
76impl SpoolWriter {
77    /// Open a fresh batch in `dir`. Files are named
78    /// `<batch_id>.audit.bin` / `<batch_id>.audit.bin.crc`.
79    ///
80    /// # Errors
81    ///
82    /// Returns `io::Error` when the directory cannot be created or the
83    /// files cannot be opened for append.
84    pub fn open(
85        dir: impl Into<PathBuf>,
86        max_bytes: u64,
87        on_failure: AuditFailureMode,
88    ) -> io::Result<Self> {
89        Self::open_with_fsync(dir, max_bytes, on_failure, AuditFsyncMode::default())
90    }
91
92    /// Same as [`Self::open`] but with an explicit fsync policy. Spec
93    /// 11 § 6.4 / decision D6-5.
94    ///
95    /// # Errors
96    ///
97    /// Returns `io::Error` when the directory cannot be created or the
98    /// files cannot be opened for append.
99    pub fn open_with_fsync(
100        dir: impl Into<PathBuf>,
101        max_bytes: u64,
102        on_failure: AuditFailureMode,
103        fsync_mode: AuditFsyncMode,
104    ) -> io::Result<Self> {
105        let dir: PathBuf = dir.into();
106        std::fs::create_dir_all(&dir)?;
107        let stamp = batch_stamp();
108        let bin_path = dir.join(format!("{stamp}.audit.bin"));
109        let crc_path = dir.join(format!("{stamp}.audit.bin.crc"));
110        let bin = OpenOptions::new()
111            .create(true)
112            .append(true)
113            .open(&bin_path)?;
114        let crc = OpenOptions::new()
115            .create(true)
116            .append(true)
117            .open(&crc_path)?;
118        // After creating new spool files, fsync the parent directory
119        // so the directory entries themselves are durable. Without
120        // this, an immediate host crash can leave the entries
121        // unrecorded on ext4/APFS even though the file open succeeded.
122        if !matches!(fsync_mode, AuditFsyncMode::None) {
123            let dir_handle = File::open(&dir)?;
124            let _ = dir_handle.sync_all();
125        }
126        Ok(Self {
127            inner: Arc::new(Mutex::new(SpoolInner {
128                dir,
129                bin: Some(bin),
130                crc: Some(crc),
131                bin_path,
132                crc_path,
133                bytes_written: 0,
134                max_bytes,
135                pending_records: 0,
136            })),
137            on_failure,
138            fsync_mode,
139        })
140    }
141
142    /// Append one envelope to the spool. Returns `Err` only when the
143    /// underlying write or flush fails.
144    ///
145    /// # Errors
146    ///
147    /// I/O errors propagate from the underlying file writes.
148    pub fn append(&self, env: &ObsEnvelope) -> io::Result<()> {
149        let mut buf = Vec::with_capacity(64 + env.encoded_len() as usize);
150        env.encode(&mut buf);
151        let len = buf.len() as u32;
152        let crc = crc32c(&buf);
153        let mut inner = self.inner.lock();
154        if inner.bytes_written.saturating_add(buf.len() as u64 + 4) > inner.max_bytes {
155            // Surface as a write error; caller decides how to react
156            // per `audit.on_failure`.
157            return Err(io::Error::other("audit spool full"));
158        }
159        let bin = inner
160            .bin
161            .as_mut()
162            .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "spool bin file missing"))?;
163        bin.write_all(&len.to_le_bytes())?;
164        bin.write_all(&buf)?;
165        bin.flush()?;
166        let crc_file = inner
167            .crc
168            .as_mut()
169            .ok_or_else(|| io::Error::new(io::ErrorKind::BrokenPipe, "spool crc file missing"))?;
170        crc_file.write_all(&crc.to_le_bytes())?;
171        crc_file.flush()?;
172        inner.bytes_written += buf.len() as u64 + 4;
173        inner.pending_records += 1;
174        // Spec 11 § 6.4 / decision D6-5: apply the configured fsync
175        // policy. `flush()` only pushes data into the kernel buffer;
176        // `sync_data()` blocks until the disk acknowledges so a host
177        // crash cannot lose recently-appended records.
178        let should_fsync = match self.fsync_mode {
179            AuditFsyncMode::None => false,
180            AuditFsyncMode::PerRecord => true,
181            AuditFsyncMode::PerBatch => inner.pending_records >= FSYNC_BATCH_SIZE,
182        };
183        if should_fsync {
184            // Borrow each file separately to avoid holding two mutable
185            // borrows on `inner`.
186            if let Some(bin) = inner.bin.as_mut() {
187                bin.sync_data()?;
188            }
189            if let Some(crc) = inner.crc.as_mut() {
190                crc.sync_data()?;
191            }
192            inner.pending_records = 0;
193        }
194        Ok(())
195    }
196
197    /// Force an immediate fsync regardless of the configured policy.
198    /// Used by `Sink::flush` / `shutdown` paths to guarantee that all
199    /// AUDIT records on disk are durable before returning.
200    ///
201    /// # Errors
202    ///
203    /// Returns the underlying I/O error if `sync_data` fails.
204    pub fn fsync_now(&self) -> io::Result<()> {
205        let mut inner = self.inner.lock();
206        if let Some(bin) = inner.bin.as_mut() {
207            bin.sync_data()?;
208        }
209        if let Some(crc) = inner.crc.as_mut() {
210            crc.sync_data()?;
211        }
212        inner.pending_records = 0;
213        Ok(())
214    }
215
216    /// Close the current batch (the `.audit.bin` is left intact for
217    /// the drainer / a later process to recover).
218    pub fn close(&self) {
219        let mut inner = self.inner.lock();
220        inner.bin.take();
221        inner.crc.take();
222    }
223
224    /// Configured failure mode (used by the AUDIT path to decide
225    /// `panic` / `abort` / `warn_only` on append failure).
226    #[must_use]
227    pub fn on_failure(&self) -> AuditFailureMode {
228        self.on_failure
229    }
230
231    /// Spool dir (used by tests and the drainer).
232    pub fn dir(&self) -> PathBuf {
233        self.inner.lock().dir.clone()
234    }
235
236    /// Path of the active `.audit.bin` file (test helper).
237    pub fn bin_path(&self) -> PathBuf {
238        self.inner.lock().bin_path.clone()
239    }
240
241    /// Path of the active `.audit.bin.crc` file (test helper).
242    pub fn crc_path(&self) -> PathBuf {
243        self.inner.lock().crc_path.clone()
244    }
245}
246
/// Summary of what recovery found in a single spool file.
#[derive(Debug)]
pub struct RecoveryReport {
    /// The `.audit.bin` file this report describes.
    pub path: PathBuf,
    /// How many records were valid.
    pub records: usize,
    /// How many records were discarded (CRC mismatch / truncation).
    pub dropped: usize,
}
257
258/// Walk `dir` for any `*.audit.bin` files, validate each record's
259/// CRC32C, and feed valid records to `consume`. CRC-mismatched tails
260/// are discarded; the `.audit.bin` and `.crc` files are deleted only
261/// after `consume` returns `Ok(())` for every valid record.
262///
263/// # Errors
264///
265/// I/O errors propagate from the underlying directory + file reads.
266pub fn recover<F>(dir: &Path, mut consume: F) -> io::Result<Vec<RecoveryReport>>
267where
268    F: FnMut(ObsEnvelope) -> io::Result<()>,
269{
270    let mut reports = Vec::new();
271    if !dir.exists() {
272        return Ok(reports);
273    }
274    let entries = std::fs::read_dir(dir)?;
275    let mut bin_files: Vec<_> = entries
276        .filter_map(Result::ok)
277        .filter(|e| {
278            e.file_name()
279                .to_str()
280                .is_some_and(|n| n.ends_with(".audit.bin"))
281        })
282        .collect();
283    bin_files.sort_by_key(|e| {
284        e.metadata()
285            .and_then(|m| m.modified())
286            .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
287    });
288    for entry in bin_files {
289        let bin_path = entry.path();
290        let crc_path = with_crc_suffix(&bin_path);
291        let report = recover_one(&bin_path, &crc_path, &mut consume)?;
292        let _ = std::fs::remove_file(&bin_path);
293        let _ = std::fs::remove_file(&crc_path);
294        reports.push(report);
295    }
296    Ok(reports)
297}
298
/// Build the sidecar path for `bin` by appending `.crc` to the complete
/// path (the existing extension is kept, not replaced).
fn with_crc_suffix(bin: &Path) -> PathBuf {
    let mut sidecar = bin.to_path_buf().into_os_string();
    sidecar.push(".crc");
    sidecar.into()
}
304
305fn recover_one<F>(bin_path: &Path, crc_path: &Path, consume: &mut F) -> io::Result<RecoveryReport>
306where
307    F: FnMut(ObsEnvelope) -> io::Result<()>,
308{
309    let mut bin = File::open(bin_path)?;
310    let mut crc = match File::open(crc_path) {
311        Ok(f) => Some(f),
312        Err(e) if e.kind() == io::ErrorKind::NotFound => None,
313        Err(e) => return Err(e),
314    };
315    let mut records = 0;
316    let mut dropped = 0;
317    loop {
318        let pos = bin.stream_position()?;
319        let mut len_buf = [0u8; 4];
320        match bin.read_exact(&mut len_buf) {
321            Ok(()) => {}
322            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
323            Err(e) => return Err(e),
324        }
325        let len = u32::from_le_bytes(len_buf) as usize;
326        let mut record = vec![0u8; len];
327        match bin.read_exact(&mut record) {
328            Ok(()) => {}
329            Err(_) => {
330                dropped += 1;
331                bin.seek(SeekFrom::Start(pos))?;
332                break;
333            }
334        }
335        let mut sidecar_buf = [0u8; 4];
336        let sidecar = if let Some(c) = crc.as_mut() {
337            match c.read_exact(&mut sidecar_buf) {
338                Ok(()) => Some(u32::from_le_bytes(sidecar_buf)),
339                Err(_) => None,
340            }
341        } else {
342            None
343        };
344        let actual = crc32c(&record);
345        if let Some(expected) = sidecar
346            && expected != actual
347        {
348            dropped += 1;
349            continue;
350        }
351        match ObsEnvelope::decode_from_slice(&record) {
352            Ok(env) => {
353                consume(env)?;
354                records += 1;
355            }
356            Err(_) => {
357                dropped += 1;
358            }
359        }
360    }
361    Ok(RecoveryReport {
362        path: bin_path.to_path_buf(),
363        records,
364        dropped,
365    })
366}
367
/// Build a batch identifier of the form `<nanos>-<pid>-<seq>`.
///
/// * `nanos` — wall-clock nanoseconds since the Unix epoch, zero-padded
///   to 20 digits (0 if the clock reads before the epoch).
/// * `pid` — the current process id.
/// * `seq` — a process-wide counter. It keeps two batches opened by the
///   same process within one clock tick (or with a pre-epoch clock) from
///   getting the same name, which would make them share one spool file.
fn batch_stamp() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Saturate instead of wrapping if the value ever outgrows u64.
        .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
        .unwrap_or(0);
    let pid = std::process::id();
    // Only uniqueness matters, so relaxed ordering is sufficient.
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{nanos:020}-{pid}-{seq}")
}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal envelope carrying only a name and a fixed timestamp.
    fn env_with_name(name: &str) -> ObsEnvelope {
        ObsEnvelope {
            full_name: name.to_string(),
            ts_ns: 1_700_000_000_000_000_000,
            ..Default::default()
        }
    }

    #[test]
    fn test_crc32c_canonical_vector() {
        assert_eq!(crc32c(b"123456789"), 0xE306_9283);
    }

    #[test]
    fn test_round_trip_recovery() {
        let dir = tempfile::tempdir().unwrap();
        let writer = SpoolWriter::open(dir.path(), 1 << 20, AuditFailureMode::WarnOnly).unwrap();
        let envs = (0..5)
            .map(|i| env_with_name(&format!("test.v1.Audit{i}")))
            .collect::<Vec<_>>();
        for env in &envs {
            writer.append(env).unwrap();
        }
        let bin_path = writer.bin_path();
        let crc_path = writer.crc_path();
        writer.close();
        let mut recovered = Vec::new();
        let reports = recover(dir.path(), |env| {
            recovered.push(env);
            Ok(())
        })
        .unwrap();
        assert_eq!(reports.len(), 1);
        assert_eq!(reports[0].records, 5);
        assert_eq!(reports[0].dropped, 0);
        // Records must come back complete and in append order.
        let got: Vec<&str> = recovered.iter().map(|e| e.full_name.as_str()).collect();
        let want: Vec<&str> = envs.iter().map(|e| e.full_name.as_str()).collect();
        assert_eq!(got, want);
        // A fully consumed batch is removed from the spool dir.
        assert!(!bin_path.exists(), "consumed .audit.bin should be deleted");
        assert!(!crc_path.exists(), "consumed .crc should be deleted");
    }

    #[test]
    fn test_truncated_tail_is_discarded() {
        let dir = tempfile::tempdir().unwrap();
        let writer = SpoolWriter::open(dir.path(), 1 << 20, AuditFailureMode::WarnOnly).unwrap();
        for i in 0..3 {
            writer
                .append(&env_with_name(&format!("test.v1.Trunc{i}")))
                .unwrap();
        }
        let bin_path = writer.bin_path();
        writer.close();
        // Truncate the last record by chopping off the last 8 bytes —
        // simulates a kill -9 between buffa.encode and fsync. Each record
        // is longer than 8 bytes, so only the final one is damaged.
        let mut data = std::fs::read(&bin_path).unwrap();
        data.truncate(data.len() - 8);
        std::fs::write(&bin_path, data).unwrap();
        let mut recovered = Vec::new();
        let reports = recover(dir.path(), |env| {
            recovered.push(env);
            Ok(())
        })
        .unwrap();
        // Exactly the two intact records survive; the torn tail is
        // reported as one dropped record.
        let got: Vec<&str> = recovered.iter().map(|e| e.full_name.as_str()).collect();
        assert_eq!(got, ["test.v1.Trunc0", "test.v1.Trunc1"]);
        assert_eq!(reports.len(), 1);
        assert_eq!(reports[0].records, 2);
        assert_eq!(reports[0].dropped, 1);
    }
}
445}