eat-rocks 0.1.1

Restore a rocks database from object storage
Documentation
/// Errors returned while parsing a rocks backup meta file.
///
/// Produced by [`BackupMeta::parse`] and the per-entry file line parser.
/// Variants that carry a `String` hold the offending text exactly as it
/// appeared in the input so it can be echoed in the error message.
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum ParseError {
    /// The input contained no lines at all.
    #[error("empty meta file")]
    Empty,
    /// A `schema_version ` line was present but its major version (the text
    /// before the first `.`) was not `2`. Carries the version text as written.
    ///
    /// Schema v1 is only accepted implicitly, i.e. when there is no
    /// `schema_version` line; an explicit `schema_version 1.x` is rejected.
    #[error("could not parse schema version from {0:?} (v1 and v2 supported)")]
    InvalidSchemaVersion(String),
    /// The input ended right after the `schema_version` line.
    #[error("missing timestamp line")]
    MissingTimestamp,
    /// The timestamp line did not parse as a `u64`. Carries the whole line.
    #[error("invalid timestamp: {0:?}")]
    InvalidTimestamp(String),
    /// The input ended right after the timestamp line.
    #[error("missing sequence number line")]
    MissingSequenceNumber,
    /// The sequence number line did not parse as a `u64`. Carries the whole line.
    #[error("invalid sequence number: {0:?}")]
    InvalidSequenceNumber(String),
    /// The input ended before any header line parsed as a file count.
    #[error("unexpected end of meta file while looking for file count")]
    UnexpectedEndBeforeFileCount,
    /// A file count was present but not usable.
    ///
    /// NOTE(review): nothing in the code visible here constructs this variant;
    /// a non-numeric header line is skipped as an unknown field instead.
    #[error("invalid file count: {0:?}")]
    InvalidFileCount(String),
    /// The input ended before the declared number of file entries was read.
    /// `expected` is the declared count, `actual` the number of entries that
    /// were read before the input ran out.
    #[error("expected {expected} file entries but file ended after {actual}")]
    FileCountMismatch { expected: usize, actual: usize },
    /// A file entry line had no tokens (empty or whitespace only). Carries
    /// the 0-based index of the entry.
    #[error("empty file entry at position {0}")]
    EmptyFileEntry(usize),
    /// A file entry ended with a field name that had no value after it.
    /// Carries the field name.
    #[error("field {0:?} missing its value")]
    MissingFieldValue(String),
    /// The value of a `crc32` field did not parse as an unsigned 32-bit
    /// integer. Carries the value text.
    #[error("invalid crc32 value: {0:?}")]
    InvalidCrc32(String),
    /// The value of a `size` field did not parse as an unsigned 64-bit
    /// integer. Carries the value text.
    #[error("invalid size value: {0:?}")]
    InvalidSize(String),
    /// A field whose name starts with `ni::` was not recognized. Such fields
    /// must not be skipped, so parsing stops. Carries the field name.
    #[error("unrecognized non-ignorable field: {0:?}")]
    NonIgnorableField(String),
    /// A recognized field had a value outside its accepted set (currently
    /// `ni::excluded` with anything other than `true` / `false`).
    #[error("unrecognized value {value:?} for field {field:?}")]
    UnrecognizedFieldValue { field: String, value: String },
}

/// Parsed contents of a rocks backup meta file (`meta/<id>`).
///
/// See [`BackupMeta::parse`] and the
/// [backup format docs](https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB).
#[derive(Debug, PartialEq)]
pub struct BackupMeta {
    /// Value of the timestamp line, parsed as an unsigned integer.
    /// (Presumably seconds since the Unix epoch; confirm against the format docs.)
    pub timestamp: u64,
    /// Value of the sequence number line, parsed as an unsigned integer.
    pub sequence_number: u64,
    /// Text following `metadata ` on a header line, stored verbatim (it is not
    /// decoded here; presumably hex-encoded). If the line is repeated the last
    /// one wins. `None` when no such line is present.
    pub metadata: Option<String>,
    /// File entries in the order they appear in the meta file.
    pub files: Vec<BackupFile>,
}

/// One file entry from a rocks backup meta file.
///
/// An entry line is a path followed by whitespace-separated
/// `<field name> <value>` pairs; fields that are absent stay `None` / `false`.
#[derive(Debug, PartialEq)]
pub struct BackupFile {
    /// path is relative to backup root, eg. `shared_checksum/000007_123_456.sst`
    pub path: String,
    /// Value of the entry's `crc32` field, if present. (The field name here
    /// suggests the checksum is CRC32C; confirm against the format docs.)
    pub crc32c: Option<u32>,
    /// Value of the entry's `size` field, if present.
    /// (Presumably a byte count; the parser only checks that it is a `u64`.)
    pub size: Option<u64>,
    /// `true` when the entry carries `ni::excluded true`; `false` when the
    /// field is absent or set to `false`.
    pub excluded: bool,
}

impl BackupMeta {
    /// Parse the text of a RocksDB backup meta file.
    ///
    /// Two layouts are understood:
    ///
    /// - schema v1, which has no version line and starts directly with the
    ///   timestamp;
    /// - schema v2, which starts with a `schema_version 2[.x]` line followed
    ///   by the same layout as v1.
    ///
    /// After the timestamp and sequence number lines come optional header
    /// fields, then the file count (the first line that is a plain unsigned
    /// integer), then that many file entries. Lines after the last declared
    /// entry are not examined.
    ///
    /// # Errors
    ///
    /// Returns a [`ParseError`] describing the first problem found: missing
    /// or non-numeric timestamp / sequence number, an unsupported schema
    /// version, an unrecognized `ni::` header field, input ending before the
    /// file count or before all declared entries were read, or any error from
    /// parsing an individual file entry.
    pub fn parse(content: &str) -> Result<Self, ParseError> {
        let mut remaining = content.lines();

        let Some(head) = remaining.next() else {
            return Err(ParseError::Empty);
        };

        // With a version line (v2) the timestamp is on the following line;
        // without one (v1) the first line already is the timestamp.
        let timestamp_text = match head.strip_prefix("schema_version ") {
            None => head,
            Some(version) => {
                if version.split('.').next() != Some("2") {
                    return Err(ParseError::InvalidSchemaVersion(version.to_string()));
                }
                match remaining.next() {
                    Some(text) => text,
                    None => return Err(ParseError::MissingTimestamp),
                }
            }
        };
        let Ok(timestamp) = timestamp_text.parse::<u64>() else {
            return Err(ParseError::InvalidTimestamp(timestamp_text.to_string()));
        };

        let Some(sequence_text) = remaining.next() else {
            return Err(ParseError::MissingSequenceNumber);
        };
        let Ok(sequence_number) = sequence_text.parse::<u64>() else {
            return Err(ParseError::InvalidSequenceNumber(sequence_text.to_string()));
        };

        // Header fields run until the first purely numeric line, which is the
        // declared number of file entries.
        let mut metadata = None;
        let declared_files = loop {
            let Some(header) = remaining.next() else {
                return Err(ParseError::UnexpectedEndBeforeFileCount);
            };
            if let Ok(count) = header.parse::<usize>() {
                break count;
            }
            match header.strip_prefix("metadata ") {
                // A repeated metadata line overwrites the earlier one.
                Some(payload) => metadata = Some(payload.to_string()),
                // "ni::" marks a field a reader is not allowed to skip.
                None if header.starts_with("ni::") => {
                    let name = header.split_whitespace().next().unwrap_or(header);
                    return Err(ParseError::NonIgnorableField(name.to_string()));
                }
                // Any other unknown field is ignorable.
                None => {}
            }
        };

        // Grow the list one entry at a time rather than reserving
        // `declared_files` slots up front: the count comes from the input and
        // may be far larger than the number of lines actually present.
        let mut files = Vec::new();
        while files.len() < declared_files {
            let index = files.len();
            let Some(entry) = remaining.next() else {
                return Err(ParseError::FileCountMismatch {
                    expected: declared_files,
                    actual: index, // entries successfully read so far
                });
            };
            files.push(parse_file_line(entry, index)?);
        }

        Ok(Self {
            timestamp,
            sequence_number,
            metadata,
            files,
        })
    }
}

/// Parse one file entry line of a backup meta file.
///
/// The line is split on whitespace: the first token is the path and the rest
/// are consumed as `<field name> <value>` pairs. `position` is the 0-based
/// index of the entry and is only used for error reporting.
///
/// Recognized fields are `crc32`, `size` and `ni::excluded`; when a field is
/// repeated the last occurrence wins. Any other field is skipped unless its
/// name starts with `ni::`, which marks it as non-ignorable.
///
/// # Errors
///
/// - [`ParseError::EmptyFileEntry`] if the line has no tokens.
/// - [`ParseError::MissingFieldValue`] if a field name is the last token.
/// - [`ParseError::InvalidCrc32`] / [`ParseError::InvalidSize`] if the value
///   is not an unsigned integer of the expected width.
/// - [`ParseError::UnrecognizedFieldValue`] if `ni::excluded` is neither
///   `true` nor `false`.
/// - [`ParseError::NonIgnorableField`] for any other `ni::` field.
fn parse_file_line(line: &str, position: usize) -> Result<BackupFile, ParseError> {
    let mut tokens = line.split_whitespace();

    let Some(path) = tokens.next() else {
        return Err(ParseError::EmptyFileEntry(position));
    };

    let mut crc32c = None;
    let mut size = None;
    let mut excluded = false;

    while let Some(name) = tokens.next() {
        // Every field, known or not, must come with a value token.
        let Some(value) = tokens.next() else {
            return Err(ParseError::MissingFieldValue(name.to_string()));
        };

        match (name, value) {
            ("crc32", raw) => {
                let Ok(checksum) = raw.parse::<u32>() else {
                    return Err(ParseError::InvalidCrc32(raw.to_string()));
                };
                crc32c = Some(checksum);
            }
            ("size", raw) => {
                let Ok(length) = raw.parse::<u64>() else {
                    return Err(ParseError::InvalidSize(raw.to_string()));
                };
                size = Some(length);
            }
            ("ni::excluded", "true") => excluded = true,
            ("ni::excluded", "false") => excluded = false,
            ("ni::excluded", other) => {
                return Err(ParseError::UnrecognizedFieldValue {
                    field: name.to_string(),
                    value: other.to_string(),
                });
            }
            (unknown, _) if unknown.starts_with("ni::") => {
                return Err(ParseError::NonIgnorableField(unknown.to_string()));
            }
            // `temp` and any other field without the "ni::" prefix can be
            // skipped safely.
            _ => {}
        }
    }

    Ok(BackupFile {
        path: path.to_string(),
        crc32c,
        size,
        excluded,
    })
}

// Unit tests for `BackupMeta::parse`, covering both schema layouts, header
// field handling, per-file field handling and the error paths.
#[cfg(test)]
mod tests {
    use super::*;

    // Schema v1: no version line, entries carry only a crc32 field.
    #[test]
    fn parse_schema_v1() {
        let content = "\
1498774076
590
3
private/1/CURRENT crc32 123456
private/1/MANIFEST-000008 crc32 789012
shared_checksum/000007_1498774076_590.sst crc32 345678";

        let meta = BackupMeta::parse(content).unwrap();
        assert_eq!(meta.timestamp, 1498774076);
        assert_eq!(meta.sequence_number, 590);
        assert_eq!(meta.files.len(), 3);
        assert_eq!(meta.files[0].path, "private/1/CURRENT");
        assert_eq!(meta.files[0].crc32c, Some(123456));
    }

    // Schema v2: version line, a metadata header field, and entries with
    // size and temp fields in addition to crc32.
    #[test]
    fn parse_schema_v2() {
        let content = "\
schema_version 2.1
1498774076
590
metadata 48656c6c6f
3
private/1/CURRENT crc32 123456 size 16
private/1/MANIFEST-000008 crc32 789012 size 1024
shared_checksum/000007_1498774076_590.sst crc32 345678 size 65536 temp kCold";

        let meta = BackupMeta::parse(content).unwrap();
        assert_eq!(meta.timestamp, 1498774076);
        assert_eq!(meta.sequence_number, 590);
        assert_eq!(meta.metadata.as_deref(), Some("48656c6c6f"));
        assert_eq!(meta.files.len(), 3);
        assert_eq!(meta.files[2].size, Some(65536));
    }

    // Empty input has no first line and is reported as `Empty`.
    #[test]
    fn parse_empty() {
        assert!(matches!(BackupMeta::parse(""), Err(ParseError::Empty)));
    }

    // A major version other than 2 is rejected and echoed back in the error.
    #[test]
    fn parse_unsupported_schema() {
        let content = "schema_version 3.0\n0\n0\n0\n";
        assert_eq!(
            BackupMeta::parse(content),
            Err(ParseError::InvalidSchemaVersion("3.0".to_string()))
        );
    }

    // An unknown "ni::" field on a file entry must fail the parse.
    #[test]
    fn non_ignorable_field_rejected() {
        let content = "\
1498774076
590
1
private/1/CURRENT ni::unknown_field true";

        assert!(matches!(
            BackupMeta::parse(content),
            Err(ParseError::NonIgnorableField(_))
        ));
    }

    // A huge declared file count with no entries must fail with a count
    // mismatch instead of reserving memory for the declared count.
    #[test]
    fn bogus_file_count_does_not_allocate() {
        // Regression: "2\n2\n64406400" claimed 64M files with 0 lines remaining,
        // causing a multi-GB Vec::with_capacity before the loop could fail.
        let content = "2\n2\n64406400";
        assert!(matches!(
            BackupMeta::parse(content),
            Err(ParseError::FileCountMismatch {
                expected: 64406400,
                actual: 0
            })
        ));
    }

    // v1 is only supported implicitly; an explicit "schema_version 1.0" line
    // is treated as an unsupported version.
    #[test]
    fn explicit_schema_v1_rejected() {
        let content = "schema_version 1.0\n1000\n100\n0\n";
        assert_eq!(
            BackupMeta::parse(content),
            Err(ParseError::InvalidSchemaVersion("1.0".to_string()))
        );
    }

    // An unknown "ni::" field in the header must fail the parse.
    #[test]
    fn non_ignorable_header_field_rejected() {
        let content = "\
schema_version 2.1
1498774076
590
ni::future_breaking_field something
0";
        assert!(matches!(
            BackupMeta::parse(content),
            Err(ParseError::NonIgnorableField(_))
        ));
    }

    // An unknown header field without the "ni::" prefix is skipped.
    #[test]
    fn unknown_ignorable_header_field_skipped() {
        let content = "\
schema_version 2.1
1498774076
590
some_future_field data
0";
        let meta = BackupMeta::parse(content).unwrap();
        assert_eq!(meta.timestamp, 1498774076);
        assert_eq!(meta.files.len(), 0);
    }

    // When the metadata header line is repeated, the last one is kept.
    #[test]
    fn duplicate_metadata_uses_last() {
        let content = "\
schema_version 2.1
1498774076
590
metadata aaa
metadata bbb
0";
        let meta = BackupMeta::parse(content).unwrap();
        assert_eq!(meta.metadata.as_deref(), Some("bbb"));
    }

    // Field order within a file entry does not matter.
    #[test]
    fn file_fields_in_any_order() {
        let content = "\
1498774076
590
1
private/1/CURRENT size 16 crc32 123456 temp kHot";
        let meta = BackupMeta::parse(content).unwrap();
        assert_eq!(meta.files[0].crc32c, Some(123456));
        assert_eq!(meta.files[0].size, Some(16));
    }

    // When a file field is repeated, the last value is kept.
    #[test]
    fn duplicate_file_fields_use_last() {
        let content = "\
1498774076
590
1
private/1/CURRENT crc32 111 crc32 222";
        let meta = BackupMeta::parse(content).unwrap();
        assert_eq!(meta.files[0].crc32c, Some(222));
    }

    // An unknown file field without the "ni::" prefix is skipped.
    #[test]
    fn unknown_ignorable_file_field_skipped() {
        let content = "\
1498774076
590
1
private/1/CURRENT crc32 123 future_field value123";
        let meta = BackupMeta::parse(content).unwrap();
        assert_eq!(meta.files[0].crc32c, Some(123));
    }

    // "ni::excluded" accepts only "true" or "false".
    #[test]
    fn excluded_bad_value_rejected() {
        let content = "\
1498774076
590
1
private/1/CURRENT ni::excluded banana";
        assert!(matches!(
            BackupMeta::parse(content),
            Err(ParseError::UnrecognizedFieldValue { .. })
        ));
    }

    // "ni::excluded true" sets the `excluded` flag on the entry.
    #[test]
    fn excluded_file_parsed() {
        let content = "\
schema_version 2.1
1498774076
590
1
shared_checksum/000007_123_456.sst crc32 999 ni::excluded true";

        let meta = BackupMeta::parse(content).unwrap();
        assert!(meta.files[0].excluded);
    }
}