reddb-io-file 1.11.0

RedDB file artifact layer: single-file .rdb layout, WAL, snapshots, checkpoints, locks, and recovery.
Documentation
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::{self, Read, Write};
use std::path::{Path, PathBuf};

pub const L2_CONTROL_MAGIC: &[u8; 4] = b"RDB2";
pub const L2_METADATA_MAGIC: &[u8; 4] = b"RDCM";
pub const L2_BLOB_MAGIC: &[u8; 4] = b"RDCB";

pub const L2_FORMAT_V1_RAW: u8 = 0;
pub const L2_FORMAT_V2_FRAMED: u8 = 1;

pub const L2_FRAME_TAG_RAW: u8 = 0;
pub const L2_FRAME_TAG_ZSTD: u8 = 1;

const L2_CONTROL_EXTENSION: &str = "blob-cache.ctl";
const L2_CONTROL_TEMP_EXTENSION: &str = "ctl.tmp";
const L2_DOUBLE_WRITE_EXTENSION: &str = "dwb";
pub const L2_BACKUP_PAGER_SUFFIX: &str = "l2.pager";
pub const L2_BACKUP_CONTROL_SUFFIX: &str = "l2.ctl";

pub fn blob_cache_control_path(l2_path: &Path) -> PathBuf {
    l2_path.with_extension(L2_CONTROL_EXTENSION)
}

pub fn blob_cache_control_temp_path(control_path: &Path) -> PathBuf {
    control_path.with_extension(L2_CONTROL_TEMP_EXTENSION)
}

pub fn blob_cache_double_write_path(l2_path: impl AsRef<Path>) -> PathBuf {
    l2_path.as_ref().with_extension(L2_DOUBLE_WRITE_EXTENSION)
}

pub fn blob_cache_l2_backup_pager_key(prefix: &str) -> String {
    format!(
        "{}{}",
        normalize_backup_prefix(prefix),
        L2_BACKUP_PAGER_SUFFIX
    )
}

pub fn blob_cache_l2_backup_control_key(prefix: &str) -> String {
    format!(
        "{}{}",
        normalize_backup_prefix(prefix),
        L2_BACKUP_CONTROL_SUFFIX
    )
}

fn normalize_backup_prefix(prefix: &str) -> String {
    if prefix.is_empty() || prefix.ends_with('/') {
        prefix.to_string()
    } else {
        format!("{prefix}/")
    }
}

#[derive(Debug, Clone, Default)]
pub struct L2Control {
    pub metadata_root: u32,
    pub bytes_in_use: u64,
}

impl L2Control {
    pub fn read(path: &Path) -> io::Result<Self> {
        if !path.exists() {
            return Ok(Self::default());
        }
        let mut file = File::open(path)?;
        let mut bytes = Vec::new();
        file.read_to_end(&mut bytes)?;
        if bytes.len() < 16 || &bytes[0..4] != L2_CONTROL_MAGIC {
            return Err(invalid_data("invalid blob-cache L2 control file"));
        }
        Ok(Self {
            metadata_root: u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
            bytes_in_use: u64::from_le_bytes([
                bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
                bytes[15],
            ]),
        })
    }

    pub fn write(&self, path: &Path) -> io::Result<()> {
        let mut bytes = Vec::with_capacity(16);
        bytes.extend_from_slice(L2_CONTROL_MAGIC);
        bytes.extend_from_slice(&self.metadata_root.to_le_bytes());
        bytes.extend_from_slice(&self.bytes_in_use.to_le_bytes());
        let tmp = blob_cache_control_temp_path(path);
        {
            let mut file = File::create(&tmp)?;
            file.write_all(&bytes).and_then(|_| file.sync_all())?;
        }
        std::fs::rename(&tmp, path)
    }
}

#[derive(Debug, Clone)]
pub struct L2Record {
    pub namespace: String,
    pub key: String,
    pub expires_at_unix_ms: Option<u64>,
    pub namespace_generation: u64,
    pub priority: u8,
    pub version: Option<u64>,
    pub root_page: u32,
    pub page_count: u32,
    pub byte_len: u64,
    pub checksum: u32,
    pub format_version: u8,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum L2BlobFrame {
    Raw(Vec<u8>),
    Zstd { bytes: Vec<u8>, original_len: u32 },
}

pub fn encode_l2_v2_frame(frame: &L2BlobFrame) -> Vec<u8> {
    match frame {
        L2BlobFrame::Raw(bytes) => {
            let mut out = Vec::with_capacity(1 + bytes.len());
            out.push(L2_FRAME_TAG_RAW);
            out.extend_from_slice(bytes);
            out
        }
        L2BlobFrame::Zstd {
            bytes,
            original_len,
        } => {
            let mut out = Vec::with_capacity(5 + bytes.len());
            out.push(L2_FRAME_TAG_ZSTD);
            out.extend_from_slice(&original_len.to_le_bytes());
            out.extend_from_slice(bytes);
            out
        }
    }
}

pub fn decode_l2_v2_frame(bytes: &[u8]) -> io::Result<L2BlobFrame> {
    if bytes.is_empty() {
        return Err(invalid_data("empty blob-cache L2 v2 frame"));
    }
    match bytes[0] {
        L2_FRAME_TAG_RAW => Ok(L2BlobFrame::Raw(bytes[1..].to_vec())),
        L2_FRAME_TAG_ZSTD => {
            if bytes.len() < 5 {
                return Err(invalid_data("truncated blob-cache L2 zstd frame"));
            }
            let original_len = u32::from_le_bytes(bytes[1..5].try_into().expect("len checked"));
            Ok(L2BlobFrame::Zstd {
                bytes: bytes[5..].to_vec(),
                original_len,
            })
        }
        other => Err(invalid_data(format!(
            "unknown blob-cache L2 frame tag {other}"
        ))),
    }
}

impl L2Record {
    pub fn encode(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(96 + self.namespace.len() + self.key.len());
        out.extend_from_slice(L2_METADATA_MAGIC);
        write_l2_string(&mut out, &self.namespace);
        write_l2_string(&mut out, &self.key);
        out.extend_from_slice(&self.expires_at_unix_ms.unwrap_or(0).to_le_bytes());
        out.extend_from_slice(&self.namespace_generation.to_le_bytes());
        out.push(self.priority);
        out.extend_from_slice(&self.version.unwrap_or(0).to_le_bytes());
        out.extend_from_slice(&self.root_page.to_le_bytes());
        out.extend_from_slice(&self.page_count.to_le_bytes());
        out.extend_from_slice(&self.byte_len.to_le_bytes());
        out.extend_from_slice(&self.checksum.to_le_bytes());
        out.push(self.format_version);
        out
    }

    pub fn decode(mut bytes: &[u8]) -> io::Result<Self> {
        if bytes.len() < 4 || &bytes[0..4] != L2_METADATA_MAGIC {
            return Err(invalid_data("invalid blob-cache L2 metadata"));
        }
        bytes = &bytes[4..];
        let namespace = read_l2_string(&mut bytes)?;
        let key = read_l2_string(&mut bytes)?;
        if bytes.len() < 41 {
            return Err(invalid_data("truncated blob-cache L2 metadata"));
        }
        let expires_at = u64::from_le_bytes(bytes[0..8].try_into().expect("len checked"));
        let namespace_generation =
            u64::from_le_bytes(bytes[8..16].try_into().expect("len checked"));
        let priority = bytes[16];
        let version = u64::from_le_bytes(bytes[17..25].try_into().expect("len checked"));
        let root_page = u32::from_le_bytes(bytes[25..29].try_into().expect("len checked"));
        let page_count = u32::from_le_bytes(bytes[29..33].try_into().expect("len checked"));
        let byte_len = u64::from_le_bytes(bytes[33..41].try_into().expect("len checked"));
        let checksum = if bytes.len() >= 45 {
            u32::from_le_bytes(bytes[41..45].try_into().expect("len checked"))
        } else {
            0
        };
        let format_version = if bytes.len() >= 46 {
            bytes[45]
        } else {
            L2_FORMAT_V1_RAW
        };
        Ok(Self {
            namespace,
            key,
            expires_at_unix_ms: (expires_at != 0).then_some(expires_at),
            namespace_generation,
            priority,
            version: (version != 0).then_some(version),
            root_page,
            page_count,
            byte_len,
            checksum,
            format_version,
        })
    }

    pub fn is_expired_at(&self, now_ms: u64) -> bool {
        self.expires_at_unix_ms
            .is_some_and(|expires_at| now_ms >= expires_at)
    }
}

pub fn encode_l2_key(namespace: &str, key: &str) -> Vec<u8> {
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    namespace.hash(&mut hasher);
    let namespace_hash = hasher.finish();
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    key.hash(&mut hasher);
    let key_hash = hasher.finish();
    let mut out = Vec::with_capacity(20 + namespace.len() + key.len());
    out.extend_from_slice(&namespace_hash.to_be_bytes());
    out.extend_from_slice(&key_hash.to_be_bytes());
    write_l2_string(&mut out, namespace);
    write_l2_string(&mut out, key);
    out
}

fn write_l2_string(out: &mut Vec<u8>, value: &str) {
    out.extend_from_slice(&(value.len() as u16).to_le_bytes());
    out.extend_from_slice(value.as_bytes());
}

fn read_l2_string(bytes: &mut &[u8]) -> io::Result<String> {
    if bytes.len() < 2 {
        return Err(invalid_data("truncated blob-cache L2 string"));
    }
    let len = u16::from_le_bytes([bytes[0], bytes[1]]) as usize;
    *bytes = &bytes[2..];
    if bytes.len() < len {
        return Err(invalid_data("truncated blob-cache L2 string"));
    }
    let value = std::str::from_utf8(&bytes[..len])
        .map_err(|err| invalid_data(err.to_string()))?
        .to_string();
    *bytes = &bytes[len..];
    Ok(value)
}

fn invalid_data(message: impl Into<String>) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, message.into())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn control_round_trips() {
        let path = std::env::temp_dir().join(format!(
            "reddb-file-blob-cache-control-{}-{}.ctl",
            std::process::id(),
            unique_nanos()
        ));
        let control = L2Control {
            metadata_root: 42,
            bytes_in_use: 8192,
        };
        control.write(&path).unwrap();
        assert_eq!(L2Control::read(&path).unwrap().metadata_root, 42);
        assert_eq!(L2Control::read(&path).unwrap().bytes_in_use, 8192);
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn record_round_trips_and_keeps_legacy_format_default() {
        let record = L2Record {
            namespace: "ns".into(),
            key: "key".into(),
            expires_at_unix_ms: Some(123),
            namespace_generation: 7,
            priority: 9,
            version: Some(11),
            root_page: 13,
            page_count: 17,
            byte_len: 19,
            checksum: 23,
            format_version: L2_FORMAT_V2_FRAMED,
        };
        let decoded = L2Record::decode(&record.encode()).unwrap();
        assert_eq!(decoded.namespace, "ns");
        assert_eq!(decoded.key, "key");
        assert_eq!(decoded.expires_at_unix_ms, Some(123));
        assert_eq!(decoded.namespace_generation, 7);
        assert_eq!(decoded.priority, 9);
        assert_eq!(decoded.version, Some(11));
        assert_eq!(decoded.root_page, 13);
        assert_eq!(decoded.page_count, 17);
        assert_eq!(decoded.byte_len, 19);
        assert_eq!(decoded.checksum, 23);
        assert_eq!(decoded.format_version, L2_FORMAT_V2_FRAMED);

        let mut legacy = record.encode();
        legacy.pop();
        assert_eq!(
            L2Record::decode(&legacy).unwrap().format_version,
            L2_FORMAT_V1_RAW
        );
    }

    #[test]
    fn v2_frame_round_trips_raw_and_zstd_payloads() {
        let raw = L2BlobFrame::Raw(b"payload".to_vec());
        assert_eq!(decode_l2_v2_frame(&encode_l2_v2_frame(&raw)).unwrap(), raw);

        let zstd = L2BlobFrame::Zstd {
            bytes: b"compressed".to_vec(),
            original_len: 1024,
        };
        assert_eq!(
            decode_l2_v2_frame(&encode_l2_v2_frame(&zstd)).unwrap(),
            zstd
        );
    }

    #[test]
    fn blob_cache_control_path_uses_stable_sidecar_extension() {
        assert_eq!(
            blob_cache_control_path(Path::new("/tmp/cache.rdb")),
            PathBuf::from("/tmp/cache.blob-cache.ctl")
        );
        assert_eq!(
            blob_cache_double_write_path(Path::new("/tmp/cache.rdb")),
            PathBuf::from("/tmp/cache.dwb")
        );
    }

    fn unique_nanos() -> u128 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos()
    }
}