Skip to main content

reddb_file/
spill.rs

1//! Spill file frame contract.
2//!
3//! Runtime cache policy lives in `reddb-server`; the durable spill frame lives
4//! here so the server does not define persisted bytes directly.
5
6use std::fmt;
7use std::path::{Path, PathBuf};
8
/// Four-byte tag written at the start of every spill file frame.
pub const SPILL_FILE_MAGIC: [u8; 4] = *b"SPIL";
/// Legacy frame version; the decoder verifies it with the additive byte-sum checksum.
pub const SPILL_FILE_VERSION_V1: u8 = 1;
/// Current frame version written by the encoder; verified with CRC-32.
pub const SPILL_FILE_VERSION_V2: u8 = 2;
/// Header size in bytes: magic (4) + version (1) + checksum (4) + payload length (8).
pub const SPILL_FILE_HEADER_LEN: usize = 4 + 1 + 4 + 8;
/// File extension (without the leading dot) used for spill files.
pub const SPILL_FILE_EXTENSION: &str = "spill";
/// Directory name appended to the system temp dir by `default_spill_dir`.
pub const DEFAULT_SPILL_DIR_NAME: &str = "reddb-spill";
15
/// Reasons a byte buffer can be rejected by `decode_spill_file_frame`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpillFileFrameError {
    /// The first four bytes are not `SPILL_FILE_MAGIC`.
    BadMagic,
    /// The version byte (carried in the variant) is not one the decoder knows.
    UnsupportedVersion(u8),
    /// The checksum stored in the header (`expected`) differs from the one
    /// computed over the payload (`actual`).
    ChecksumMismatch { expected: u32, actual: u32 },
    /// The buffer is shorter than the header, or shorter than the header plus
    /// the payload length declared in it.
    Truncated,
    /// The declared payload length does not fit in `usize`, or adding the
    /// header length to it overflows.
    SizeOverflow,
}
24
25impl fmt::Display for SpillFileFrameError {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        match self {
28            Self::BadMagic => write!(f, "bad spill file magic"),
29            Self::UnsupportedVersion(version) => {
30                write!(f, "unsupported spill file version {version}")
31            }
32            Self::ChecksumMismatch { expected, actual } => write!(
33                f,
34                "spill file checksum mismatch: expected {expected:#010x}, got {actual:#010x}"
35            ),
36            Self::Truncated => write!(f, "truncated spill file frame"),
37            Self::SizeOverflow => write!(f, "spill file payload size overflows this platform"),
38        }
39    }
40}
41
// Marker impl: the default `Error` methods are used as-is; no variant wraps
// an underlying source error.
impl std::error::Error for SpillFileFrameError {}
43
44pub fn spill_file_name(segment: &str, pid: u32) -> String {
45    format!("{segment}-{pid}.{SPILL_FILE_EXTENSION}")
46}
47
48pub fn default_spill_dir() -> PathBuf {
49    std::env::temp_dir().join(DEFAULT_SPILL_DIR_NAME)
50}
51
52pub fn is_spill_file_path(path: &Path) -> bool {
53    path.extension()
54        .and_then(|extension| extension.to_str())
55        .is_some_and(|extension| extension == SPILL_FILE_EXTENSION)
56}
57
58pub fn encode_spill_file_frame(data: &[u8]) -> Vec<u8> {
59    let mut out = Vec::with_capacity(SPILL_FILE_HEADER_LEN + data.len());
60    out.extend_from_slice(&SPILL_FILE_MAGIC);
61    out.push(SPILL_FILE_VERSION_V2);
62    out.extend_from_slice(&crc32(data).to_le_bytes());
63    out.extend_from_slice(&(data.len() as u64).to_le_bytes());
64    out.extend_from_slice(data);
65    out
66}
67
68pub fn decode_spill_file_frame(bytes: &[u8]) -> Result<Vec<u8>, SpillFileFrameError> {
69    if bytes.len() < SPILL_FILE_HEADER_LEN {
70        return Err(SpillFileFrameError::Truncated);
71    }
72    if bytes[..4] != SPILL_FILE_MAGIC {
73        return Err(SpillFileFrameError::BadMagic);
74    }
75
76    let version = bytes[4];
77    let expected_checksum = u32::from_le_bytes(bytes[5..9].try_into().expect("checksum slice"));
78    let payload_len_u64 = u64::from_le_bytes(bytes[9..17].try_into().expect("size slice"));
79    let payload_len: usize = payload_len_u64
80        .try_into()
81        .map_err(|_| SpillFileFrameError::SizeOverflow)?;
82
83    let payload_end = SPILL_FILE_HEADER_LEN
84        .checked_add(payload_len)
85        .ok_or(SpillFileFrameError::SizeOverflow)?;
86    if bytes.len() < payload_end {
87        return Err(SpillFileFrameError::Truncated);
88    }
89
90    let payload = &bytes[SPILL_FILE_HEADER_LEN..payload_end];
91    let actual_checksum = match version {
92        SPILL_FILE_VERSION_V1 => legacy_v1_checksum(payload),
93        SPILL_FILE_VERSION_V2 => crc32(payload),
94        other => return Err(SpillFileFrameError::UnsupportedVersion(other)),
95    };
96    if actual_checksum != expected_checksum {
97        return Err(SpillFileFrameError::ChecksumMismatch {
98            expected: expected_checksum,
99            actual: actual_checksum,
100        });
101    }
102
103    Ok(payload.to_vec())
104}
105
/// Version-1 checksum: the wrapping 32-bit sum of all payload bytes.
///
/// Kept only so frames written with the v1 format can still be verified.
fn legacy_v1_checksum(data: &[u8]) -> u32 {
    let mut sum = 0u32;
    for &byte in data {
        sum = sum.wrapping_add(u32::from(byte));
    }
    sum
}
110
111fn crc32(data: &[u8]) -> u32 {
112    let mut hasher = crc32fast::Hasher::new();
113    hasher.update(data);
114    hasher.finalize()
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a frame by hand with an arbitrary version byte and checksum so
    /// tests can produce frames the encoder itself never emits.
    fn raw_frame(version: u8, checksum: u32, data: &[u8]) -> Vec<u8> {
        let mut frame = Vec::new();
        frame.extend_from_slice(&SPILL_FILE_MAGIC);
        frame.push(version);
        frame.extend_from_slice(&checksum.to_le_bytes());
        frame.extend_from_slice(&(data.len() as u64).to_le_bytes());
        frame.extend_from_slice(data);
        frame
    }

    #[test]
    fn v2_round_trip() {
        let data: Vec<u8> = (0u8..=127).collect();
        let frame = encode_spill_file_frame(&data);

        assert_eq!(frame.len(), SPILL_FILE_HEADER_LEN + data.len());
        assert_eq!(decode_spill_file_frame(&frame).unwrap(), data);
    }

    #[test]
    fn empty_payload_round_trip() {
        let frame = encode_spill_file_frame(&[]);

        assert_eq!(frame.len(), SPILL_FILE_HEADER_LEN);
        assert_eq!(decode_spill_file_frame(&frame), Ok(Vec::new()));
    }

    #[test]
    fn reads_legacy_v1_checksum() {
        let data = b"legacy spill";
        let frame = raw_frame(SPILL_FILE_VERSION_V1, legacy_v1_checksum(data), data);

        assert_eq!(decode_spill_file_frame(&frame).unwrap(), data);
    }

    #[test]
    fn rejects_single_byte_mutation() {
        let data = b"hello world mutation test data";
        let mut frame = encode_spill_file_frame(data);
        frame[SPILL_FILE_HEADER_LEN] ^= 0xff;

        assert!(matches!(
            decode_spill_file_frame(&frame),
            Err(SpillFileFrameError::ChecksumMismatch { .. })
        ));
    }

    // A plain byte sum cannot see reordering; CRC-32 must.
    #[test]
    fn rejects_byte_permutation() {
        let data = b"abcdefghij";
        let mut frame = encode_spill_file_frame(data);
        frame.swap(SPILL_FILE_HEADER_LEN, SPILL_FILE_HEADER_LEN + 1);

        assert!(matches!(
            decode_spill_file_frame(&frame),
            Err(SpillFileFrameError::ChecksumMismatch { .. })
        ));
    }

    #[test]
    fn rejects_bad_magic() {
        let mut frame = encode_spill_file_frame(b"payload");
        frame[0] ^= 0xff;

        assert_eq!(
            decode_spill_file_frame(&frame),
            Err(SpillFileFrameError::BadMagic)
        );
    }

    #[test]
    fn rejects_truncated_header() {
        assert_eq!(
            decode_spill_file_frame(&[]),
            Err(SpillFileFrameError::Truncated)
        );
        assert_eq!(
            decode_spill_file_frame(&SPILL_FILE_MAGIC),
            Err(SpillFileFrameError::Truncated)
        );
    }

    #[test]
    fn rejects_truncated_payload() {
        let mut frame = encode_spill_file_frame(b"payload");
        frame.pop();

        assert_eq!(
            decode_spill_file_frame(&frame),
            Err(SpillFileFrameError::Truncated)
        );
    }

    #[test]
    fn rejects_unsupported_version() {
        let data = b"future";
        let frame = raw_frame(3, crc32(data), data);

        assert_eq!(
            decode_spill_file_frame(&frame),
            Err(SpillFileFrameError::UnsupportedVersion(3))
        );
    }
}