Skip to main content

reddb_file/
blob_cache.rs

1use std::fs::File;
2use std::hash::{Hash, Hasher};
3use std::io::{self, Read, Write};
4use std::path::{Path, PathBuf};
5
/// Magic bytes at the start of an L2 control file (see `L2Control`).
pub const L2_CONTROL_MAGIC: &[u8; 4] = b"RDB2";
/// Magic bytes at the start of an encoded `L2Record`.
pub const L2_METADATA_MAGIC: &[u8; 4] = b"RDCM";
/// Magic bytes for blob data. Not referenced in this file; presumably
/// consumed by the blob writer elsewhere (TODO confirm).
pub const L2_BLOB_MAGIC: &[u8; 4] = b"RDCB";

/// `L2Record::format_version` value assumed when a decoded record carries no
/// format byte.
pub const L2_FORMAT_V1_RAW: u8 = 0;
/// `L2Record::format_version` value for the newer format; presumably the
/// payload is wrapped in an `L2BlobFrame` (TODO confirm against the writer).
pub const L2_FORMAT_V2_FRAMED: u8 = 1;

/// Leading tag byte of an encoded `L2BlobFrame::Raw`.
pub const L2_FRAME_TAG_RAW: u8 = 0;
/// Leading tag byte of an encoded `L2BlobFrame::Zstd`.
pub const L2_FRAME_TAG_ZSTD: u8 = 1;

// Extensions substituted for a path's final extension by the
// `blob_cache_*_path` helpers.
const L2_CONTROL_EXTENSION: &str = "blob-cache.ctl";
const L2_CONTROL_TEMP_EXTENSION: &str = "ctl.tmp";
const L2_DOUBLE_WRITE_EXTENSION: &str = "dwb";
/// Suffix appended to the normalized prefix by `blob_cache_l2_backup_pager_key`.
pub const L2_BACKUP_PAGER_SUFFIX: &str = "l2.pager";
/// Suffix appended to the normalized prefix by `blob_cache_l2_backup_control_key`.
pub const L2_BACKUP_CONTROL_SUFFIX: &str = "l2.ctl";
21
22pub fn blob_cache_control_path(l2_path: &Path) -> PathBuf {
23    l2_path.with_extension(L2_CONTROL_EXTENSION)
24}
25
26pub fn blob_cache_control_temp_path(control_path: &Path) -> PathBuf {
27    control_path.with_extension(L2_CONTROL_TEMP_EXTENSION)
28}
29
30pub fn blob_cache_double_write_path(l2_path: impl AsRef<Path>) -> PathBuf {
31    l2_path.as_ref().with_extension(L2_DOUBLE_WRITE_EXTENSION)
32}
33
34pub fn blob_cache_l2_backup_pager_key(prefix: &str) -> String {
35    format!(
36        "{}{}",
37        normalize_backup_prefix(prefix),
38        L2_BACKUP_PAGER_SUFFIX
39    )
40}
41
42pub fn blob_cache_l2_backup_control_key(prefix: &str) -> String {
43    format!(
44        "{}{}",
45        normalize_backup_prefix(prefix),
46        L2_BACKUP_CONTROL_SUFFIX
47    )
48}
49
/// Returns `prefix` with a trailing `/` guaranteed, except that an empty
/// prefix stays empty.
fn normalize_backup_prefix(prefix: &str) -> String {
    // Reserve room for the separator that may be appended.
    let mut normalized = String::with_capacity(prefix.len() + 1);
    normalized.push_str(prefix);
    let needs_separator = !normalized.is_empty() && !normalized.ends_with('/');
    if needs_separator {
        normalized.push('/');
    }
    normalized
}
57
58#[derive(Debug, Clone, Default)]
59pub struct L2Control {
60    pub metadata_root: u32,
61    pub bytes_in_use: u64,
62}
63
64impl L2Control {
65    pub fn read(path: &Path) -> io::Result<Self> {
66        if !path.exists() {
67            return Ok(Self::default());
68        }
69        let mut file = File::open(path)?;
70        let mut bytes = Vec::new();
71        file.read_to_end(&mut bytes)?;
72        if bytes.len() < 16 || &bytes[0..4] != L2_CONTROL_MAGIC {
73            return Err(invalid_data("invalid blob-cache L2 control file"));
74        }
75        Ok(Self {
76            metadata_root: u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
77            bytes_in_use: u64::from_le_bytes([
78                bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
79                bytes[15],
80            ]),
81        })
82    }
83
84    pub fn write(&self, path: &Path) -> io::Result<()> {
85        let mut bytes = Vec::with_capacity(16);
86        bytes.extend_from_slice(L2_CONTROL_MAGIC);
87        bytes.extend_from_slice(&self.metadata_root.to_le_bytes());
88        bytes.extend_from_slice(&self.bytes_in_use.to_le_bytes());
89        let tmp = blob_cache_control_temp_path(path);
90        {
91            let mut file = File::create(&tmp)?;
92            file.write_all(&bytes).and_then(|_| file.sync_all())?;
93        }
94        std::fs::rename(&tmp, path)
95    }
96}
97
/// Metadata describing one blob-cache L2 entry, as serialized by
/// `L2Record::encode` and parsed by `L2Record::decode`.
#[derive(Debug, Clone)]
pub struct L2Record {
    /// Namespace of the entry; serialized as a length-prefixed string.
    pub namespace: String,
    /// Key of the entry; serialized as a length-prefixed string.
    pub key: String,
    /// Expiry instant compared against `now_ms` by `is_expired_at`; `None`
    /// never expires. Serialized as `0` when `None`, so `Some(0)` does not
    /// survive a round trip.
    pub expires_at_unix_ms: Option<u64>,
    /// Serialized verbatim as a little-endian `u64`.
    pub namespace_generation: u64,
    /// Serialized verbatim as a single byte.
    pub priority: u8,
    /// Optional version; serialized as `0` when `None`, so `Some(0)` does not
    /// survive a round trip.
    pub version: Option<u64>,
    /// Presumably the first page holding the payload (TODO confirm).
    pub root_page: u32,
    /// Presumably the number of pages holding the payload (TODO confirm).
    pub page_count: u32,
    /// Presumably the payload length in bytes (TODO confirm).
    pub byte_len: u64,
    /// Decodes as `0` when the serialized record ends before this field.
    pub checksum: u32,
    /// Decodes as `L2_FORMAT_V1_RAW` when the serialized record carries no
    /// format byte.
    pub format_version: u8,
}
112
/// Payload wrapper handled by `encode_l2_v2_frame` / `decode_l2_v2_frame`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum L2BlobFrame {
    /// Payload bytes carried as-is.
    Raw(Vec<u8>),
    /// Payload bytes accompanied by a recorded `original_len`. The name
    /// suggests zstd-compressed data and its uncompressed size, but nothing
    /// in this file compresses or validates it.
    Zstd { bytes: Vec<u8>, original_len: u32 },
}
118
119pub fn encode_l2_v2_frame(frame: &L2BlobFrame) -> Vec<u8> {
120    match frame {
121        L2BlobFrame::Raw(bytes) => {
122            let mut out = Vec::with_capacity(1 + bytes.len());
123            out.push(L2_FRAME_TAG_RAW);
124            out.extend_from_slice(bytes);
125            out
126        }
127        L2BlobFrame::Zstd {
128            bytes,
129            original_len,
130        } => {
131            let mut out = Vec::with_capacity(5 + bytes.len());
132            out.push(L2_FRAME_TAG_ZSTD);
133            out.extend_from_slice(&original_len.to_le_bytes());
134            out.extend_from_slice(bytes);
135            out
136        }
137    }
138}
139
140pub fn decode_l2_v2_frame(bytes: &[u8]) -> io::Result<L2BlobFrame> {
141    if bytes.is_empty() {
142        return Err(invalid_data("empty blob-cache L2 v2 frame"));
143    }
144    match bytes[0] {
145        L2_FRAME_TAG_RAW => Ok(L2BlobFrame::Raw(bytes[1..].to_vec())),
146        L2_FRAME_TAG_ZSTD => {
147            if bytes.len() < 5 {
148                return Err(invalid_data("truncated blob-cache L2 zstd frame"));
149            }
150            let original_len = u32::from_le_bytes(bytes[1..5].try_into().expect("len checked"));
151            Ok(L2BlobFrame::Zstd {
152                bytes: bytes[5..].to_vec(),
153                original_len,
154            })
155        }
156        other => Err(invalid_data(format!(
157            "unknown blob-cache L2 frame tag {other}"
158        ))),
159    }
160}
161
162impl L2Record {
163    pub fn encode(&self) -> Vec<u8> {
164        let mut out = Vec::with_capacity(96 + self.namespace.len() + self.key.len());
165        out.extend_from_slice(L2_METADATA_MAGIC);
166        write_l2_string(&mut out, &self.namespace);
167        write_l2_string(&mut out, &self.key);
168        out.extend_from_slice(&self.expires_at_unix_ms.unwrap_or(0).to_le_bytes());
169        out.extend_from_slice(&self.namespace_generation.to_le_bytes());
170        out.push(self.priority);
171        out.extend_from_slice(&self.version.unwrap_or(0).to_le_bytes());
172        out.extend_from_slice(&self.root_page.to_le_bytes());
173        out.extend_from_slice(&self.page_count.to_le_bytes());
174        out.extend_from_slice(&self.byte_len.to_le_bytes());
175        out.extend_from_slice(&self.checksum.to_le_bytes());
176        out.push(self.format_version);
177        out
178    }
179
180    pub fn decode(mut bytes: &[u8]) -> io::Result<Self> {
181        if bytes.len() < 4 || &bytes[0..4] != L2_METADATA_MAGIC {
182            return Err(invalid_data("invalid blob-cache L2 metadata"));
183        }
184        bytes = &bytes[4..];
185        let namespace = read_l2_string(&mut bytes)?;
186        let key = read_l2_string(&mut bytes)?;
187        if bytes.len() < 41 {
188            return Err(invalid_data("truncated blob-cache L2 metadata"));
189        }
190        let expires_at = u64::from_le_bytes(bytes[0..8].try_into().expect("len checked"));
191        let namespace_generation =
192            u64::from_le_bytes(bytes[8..16].try_into().expect("len checked"));
193        let priority = bytes[16];
194        let version = u64::from_le_bytes(bytes[17..25].try_into().expect("len checked"));
195        let root_page = u32::from_le_bytes(bytes[25..29].try_into().expect("len checked"));
196        let page_count = u32::from_le_bytes(bytes[29..33].try_into().expect("len checked"));
197        let byte_len = u64::from_le_bytes(bytes[33..41].try_into().expect("len checked"));
198        let checksum = if bytes.len() >= 45 {
199            u32::from_le_bytes(bytes[41..45].try_into().expect("len checked"))
200        } else {
201            0
202        };
203        let format_version = if bytes.len() >= 46 {
204            bytes[45]
205        } else {
206            L2_FORMAT_V1_RAW
207        };
208        Ok(Self {
209            namespace,
210            key,
211            expires_at_unix_ms: (expires_at != 0).then_some(expires_at),
212            namespace_generation,
213            priority,
214            version: (version != 0).then_some(version),
215            root_page,
216            page_count,
217            byte_len,
218            checksum,
219            format_version,
220        })
221    }
222
223    pub fn is_expired_at(&self, now_ms: u64) -> bool {
224        self.expires_at_unix_ms
225            .is_some_and(|expires_at| now_ms >= expires_at)
226    }
227}
228
229pub fn encode_l2_key(namespace: &str, key: &str) -> Vec<u8> {
230    let mut hasher = std::collections::hash_map::DefaultHasher::new();
231    namespace.hash(&mut hasher);
232    let namespace_hash = hasher.finish();
233    let mut hasher = std::collections::hash_map::DefaultHasher::new();
234    key.hash(&mut hasher);
235    let key_hash = hasher.finish();
236    let mut out = Vec::with_capacity(20 + namespace.len() + key.len());
237    out.extend_from_slice(&namespace_hash.to_be_bytes());
238    out.extend_from_slice(&key_hash.to_be_bytes());
239    write_l2_string(&mut out, namespace);
240    write_l2_string(&mut out, key);
241    out
242}
243
/// Appends `value` to `out` as a little-endian `u16` byte length followed by
/// the UTF-8 bytes.
///
/// The length prefix can describe at most `u16::MAX` bytes. A longer `value`
/// is cut at the last character boundary that fits, so the prefix always
/// matches the bytes actually written and `read_l2_string` can parse the
/// result. Previously the prefix alone wrapped modulo 65536 while every byte
/// was still written, producing an undecodable encoding.
fn write_l2_string(out: &mut Vec<u8>, value: &str) {
    let mut end = value.len().min(usize::from(u16::MAX));
    // Back off to a char boundary so the stored bytes remain valid UTF-8.
    // Offset 0 is always a boundary, so this terminates.
    while !value.is_char_boundary(end) {
        end -= 1;
    }
    let len = u16::try_from(end).expect("clamped to u16::MAX above");
    out.extend_from_slice(&len.to_le_bytes());
    out.extend_from_slice(&value.as_bytes()[..end]);
}
248
249fn read_l2_string(bytes: &mut &[u8]) -> io::Result<String> {
250    if bytes.len() < 2 {
251        return Err(invalid_data("truncated blob-cache L2 string"));
252    }
253    let len = u16::from_le_bytes([bytes[0], bytes[1]]) as usize;
254    *bytes = &bytes[2..];
255    if bytes.len() < len {
256        return Err(invalid_data("truncated blob-cache L2 string"));
257    }
258    let value = std::str::from_utf8(&bytes[..len])
259        .map_err(|err| invalid_data(err.to_string()))?
260        .to_string();
261    *bytes = &bytes[len..];
262    Ok(value)
263}
264
/// Wraps `message` in an `io::Error` of kind `InvalidData`.
fn invalid_data(message: impl Into<String>) -> io::Error {
    let text: String = message.into();
    io::Error::new(io::ErrorKind::InvalidData, text)
}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// Nanoseconds since the Unix epoch; keeps temp-file names unique per run.
    fn unique_nanos() -> u128 {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap();
        since_epoch.as_nanos()
    }

    /// A control file written to disk reads back with the same field values.
    #[test]
    fn control_round_trips() {
        let file_name = format!(
            "reddb-file-blob-cache-control-{}-{}.ctl",
            std::process::id(),
            unique_nanos()
        );
        let path = std::env::temp_dir().join(file_name);

        let written = L2Control {
            metadata_root: 42,
            bytes_in_use: 8192,
        };
        written.write(&path).unwrap();

        let root = L2Control::read(&path).unwrap().metadata_root;
        assert_eq!(root, 42);
        let in_use = L2Control::read(&path).unwrap().bytes_in_use;
        assert_eq!(in_use, 8192);

        let _ = std::fs::remove_file(path);
    }

    /// Every field survives encode/decode, and a record missing the trailing
    /// format byte decodes as the v1 raw format.
    #[test]
    fn record_round_trips_and_keeps_legacy_format_default() {
        let original = L2Record {
            namespace: "ns".into(),
            key: "key".into(),
            expires_at_unix_ms: Some(123),
            namespace_generation: 7,
            priority: 9,
            version: Some(11),
            root_page: 13,
            page_count: 17,
            byte_len: 19,
            checksum: 23,
            format_version: L2_FORMAT_V2_FRAMED,
        };
        let encoded = original.encode();

        let decoded = L2Record::decode(&encoded).unwrap();
        assert_eq!(decoded.namespace, "ns");
        assert_eq!(decoded.key, "key");
        assert_eq!(decoded.expires_at_unix_ms, Some(123));
        assert_eq!(decoded.namespace_generation, 7);
        assert_eq!(decoded.priority, 9);
        assert_eq!(decoded.version, Some(11));
        assert_eq!(decoded.root_page, 13);
        assert_eq!(decoded.page_count, 17);
        assert_eq!(decoded.byte_len, 19);
        assert_eq!(decoded.checksum, 23);
        assert_eq!(decoded.format_version, L2_FORMAT_V2_FRAMED);

        // Dropping the final byte emulates a record written before the
        // format byte existed.
        let mut legacy = encoded;
        legacy.pop();
        let legacy_record = L2Record::decode(&legacy).unwrap();
        assert_eq!(legacy_record.format_version, L2_FORMAT_V1_RAW);
    }

    /// Both frame variants come back unchanged after an encode/decode cycle.
    #[test]
    fn v2_frame_round_trips_raw_and_zstd_payloads() {
        let raw = L2BlobFrame::Raw(b"payload".to_vec());
        let raw_bytes = encode_l2_v2_frame(&raw);
        assert_eq!(decode_l2_v2_frame(&raw_bytes).unwrap(), raw);

        let zstd = L2BlobFrame::Zstd {
            bytes: b"compressed".to_vec(),
            original_len: 1024,
        };
        let zstd_bytes = encode_l2_v2_frame(&zstd);
        assert_eq!(decode_l2_v2_frame(&zstd_bytes).unwrap(), zstd);
    }

    /// Sidecar paths replace the L2 file's extension with the fixed suffixes.
    #[test]
    fn blob_cache_control_path_uses_stable_sidecar_extension() {
        let l2_path = Path::new("/tmp/cache.rdb");
        assert_eq!(
            blob_cache_control_path(l2_path),
            PathBuf::from("/tmp/cache.blob-cache.ctl")
        );
        assert_eq!(
            blob_cache_double_write_path(l2_path),
            PathBuf::from("/tmp/cache.dwb")
        );
    }
}