reddb-io-file 1.11.0

RedDB file artifact layer: single-file .rdb layout, WAL, snapshots, checkpoints, locks, and recovery.
Documentation
use super::*;
use serde_json::{Map, Value as JsonValue};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaRebootstrapReadyMarker {
    pub pending_path: PathBuf,
    pub checkpoint_lsn: u64,
    pub timeline: TimelineId,
}

pub fn encode_rebootstrap_ready_marker_json(
    ready: &ReplicaRebootstrapReadyMarker,
) -> RdbFileResult<Vec<u8>> {
    let mut object = Map::new();
    object.insert(
        "pending_path".into(),
        JsonValue::String(ready.pending_path.display().to_string()),
    );
    object.insert(
        "checkpoint_lsn".into(),
        JsonValue::Number(ready.checkpoint_lsn.into()),
    );
    object.insert(
        "timeline".into(),
        JsonValue::Number(ready.timeline.0.into()),
    );
    serde_json::to_vec(&JsonValue::Object(object))
        .map_err(|err| RdbFileError::InvalidOperation(format!("encode rebootstrap marker: {err}")))
}

pub fn decode_rebootstrap_ready_marker_json(
    bytes: &[u8],
) -> RdbFileResult<ReplicaRebootstrapReadyMarker> {
    let value: JsonValue = serde_json::from_slice(bytes).map_err(|err| {
        RdbFileError::InvalidOperation(format!("decode rebootstrap marker: {err}"))
    })?;
    Ok(ReplicaRebootstrapReadyMarker {
        pending_path: value
            .get("pending_path")
            .and_then(JsonValue::as_str)
            .map(PathBuf::from)
            .ok_or_else(|| RdbFileError::InvalidOperation("missing pending_path".into()))?,
        checkpoint_lsn: value
            .get("checkpoint_lsn")
            .and_then(JsonValue::as_u64)
            .ok_or_else(|| RdbFileError::InvalidOperation("missing checkpoint_lsn".into()))?,
        timeline: TimelineId(
            value
                .get("timeline")
                .and_then(JsonValue::as_u64)
                .ok_or_else(|| RdbFileError::InvalidOperation("missing timeline".into()))?,
        ),
    })
}

pub fn write_rebootstrap_ready_marker(
    data_path: impl AsRef<Path>,
    ready: &ReplicaRebootstrapReadyMarker,
) -> RdbFileResult<()> {
    let marker_path = crate::layout::rebootstrap_ready_marker_path(data_path.as_ref());
    write_bytes_atomically(&marker_path, &encode_rebootstrap_ready_marker_json(ready)?)
}

pub fn read_rebootstrap_ready_marker(
    data_path: impl AsRef<Path>,
) -> RdbFileResult<ReplicaRebootstrapReadyMarker> {
    let data_path = data_path.as_ref();
    let marker_path = crate::layout::rebootstrap_ready_marker_path(data_path);
    let ready = decode_rebootstrap_ready_marker_json(&fs::read(marker_path)?)?;
    let expected_pending = crate::layout::rebootstrap_pending_path(data_path);
    if ready.pending_path != expected_pending {
        return Err(RdbFileError::InvalidOperation(
            "invalid rebootstrap pending_path".into(),
        ));
    }
    Ok(ready)
}

pub fn cleanup_rebootstrap_artifacts(data_path: impl AsRef<Path>) {
    let data_path = data_path.as_ref();
    let _ = fs::remove_dir_all(crate::layout::rebootstrap_staging_root(data_path));
    let _ = fs::remove_file(crate::layout::rebootstrap_pending_path(data_path));
    let _ = fs::remove_file(crate::layout::rebootstrap_ready_marker_path(data_path));
    let _ = fs::remove_file(crate::layout::rebootstrap_intent_log_path(data_path));
    let _ = fs::remove_file(crate::layout::rebootstrap_previous_path(data_path));
}

pub fn discard_ready_rebootstrap_marker(data_path: impl AsRef<Path>) -> RdbFileResult<()> {
    let marker_path = crate::layout::rebootstrap_ready_marker_path(data_path.as_ref());
    fs::remove_file(&marker_path)?;
    fsync_parent_dir(&marker_path);
    Ok(())
}

pub fn promote_rebootstrap_pending_database(data_path: impl AsRef<Path>) -> RdbFileResult<()> {
    let data_path = data_path.as_ref();
    let pending_path = crate::layout::rebootstrap_pending_path(data_path);
    let previous_path = crate::layout::rebootstrap_previous_path(data_path);

    if let Some(parent) = data_path.parent() {
        fs::create_dir_all(parent)?;
    }

    let _ = fs::remove_file(&previous_path);
    if data_path.exists() {
        fs::rename(data_path, &previous_path)?;
        fsync_parent_dir(data_path);
    }
    fs::rename(&pending_path, data_path)?;
    fsync_parent_dir(data_path);
    discard_ready_rebootstrap_marker(data_path)?;
    let _ = fs::remove_dir_all(crate::layout::rebootstrap_staging_root(data_path));
    Ok(())
}

fn fsync_parent_dir(path: &Path) {
    if let Some(parent) = path.parent() {
        let _ = fs::File::open(parent).and_then(|dir| dir.sync_all());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rebootstrap_ready_marker_json_round_trips() {
        let ready = ReplicaRebootstrapReadyMarker {
            pending_path: PathBuf::from("/tmp/main.rebootstrap.pending.rdb"),
            checkpoint_lsn: 42,
            timeline: TimelineId(3),
        };

        let body = encode_rebootstrap_ready_marker_json(&ready).unwrap();
        let text = String::from_utf8(body.clone()).unwrap();
        assert!(text.contains("\"pending_path\""));
        assert!(text.contains("\"checkpoint_lsn\":42"));
        assert!(text.contains("\"timeline\":3"));
        assert_eq!(decode_rebootstrap_ready_marker_json(&body).unwrap(), ready);
    }

    #[test]
    fn rebootstrap_ready_marker_file_round_trips_and_validates_pending_path() {
        let root = std::env::temp_dir().join(format!(
            "reddb-file-rebootstrap-ready-{}",
            std::process::id()
        ));
        let _ = fs::remove_dir_all(&root);
        fs::create_dir_all(&root).unwrap();
        let data_path = root.join("main.rdb");
        let ready = ReplicaRebootstrapReadyMarker {
            pending_path: crate::layout::rebootstrap_pending_path(&data_path),
            checkpoint_lsn: 99,
            timeline: TimelineId(4),
        };

        write_rebootstrap_ready_marker(&data_path, &ready).unwrap();
        assert_eq!(read_rebootstrap_ready_marker(&data_path).unwrap(), ready);

        let bad = ReplicaRebootstrapReadyMarker {
            pending_path: root.join("other.rdb"),
            checkpoint_lsn: 99,
            timeline: TimelineId(4),
        };
        fs::write(
            crate::layout::rebootstrap_ready_marker_path(&data_path),
            encode_rebootstrap_ready_marker_json(&bad).unwrap(),
        )
        .unwrap();
        assert!(read_rebootstrap_ready_marker(&data_path).is_err());

        let _ = fs::remove_dir_all(root);
    }

    #[test]
    fn cleanup_rebootstrap_artifacts_removes_all_sidecars() {
        let root = std::env::temp_dir().join(format!(
            "reddb-file-rebootstrap-cleanup-{}",
            std::process::id()
        ));
        let _ = fs::remove_dir_all(&root);
        fs::create_dir_all(&root).unwrap();
        let data_path = root.join("main.rdb");
        let staging_root = crate::layout::rebootstrap_staging_root(&data_path);
        fs::create_dir_all(&staging_root).unwrap();
        fs::write(
            crate::layout::rebootstrap_pending_path(&data_path),
            b"pending",
        )
        .unwrap();
        fs::write(
            crate::layout::rebootstrap_ready_marker_path(&data_path),
            b"ready",
        )
        .unwrap();
        fs::write(
            crate::layout::rebootstrap_intent_log_path(&data_path),
            b"intent",
        )
        .unwrap();
        fs::write(
            crate::layout::rebootstrap_previous_path(&data_path),
            b"previous",
        )
        .unwrap();

        cleanup_rebootstrap_artifacts(&data_path);

        assert!(!staging_root.exists());
        assert!(!crate::layout::rebootstrap_pending_path(&data_path).exists());
        assert!(!crate::layout::rebootstrap_ready_marker_path(&data_path).exists());
        assert!(!crate::layout::rebootstrap_intent_log_path(&data_path).exists());
        assert!(!crate::layout::rebootstrap_previous_path(&data_path).exists());

        let _ = fs::remove_dir_all(root);
    }

    #[test]
    fn promote_rebootstrap_pending_database_swaps_pending_and_cleans_ready_state() {
        let root = std::env::temp_dir().join(format!(
            "reddb-file-rebootstrap-promote-{}",
            std::process::id()
        ));
        let _ = fs::remove_dir_all(&root);
        fs::create_dir_all(&root).unwrap();
        let data_path = root.join("main.rdb");
        fs::write(&data_path, b"old").unwrap();
        fs::write(crate::layout::rebootstrap_pending_path(&data_path), b"new").unwrap();
        fs::write(
            crate::layout::rebootstrap_ready_marker_path(&data_path),
            b"ready",
        )
        .unwrap();
        let staging_root = crate::layout::rebootstrap_staging_root(&data_path);
        fs::create_dir_all(&staging_root).unwrap();

        promote_rebootstrap_pending_database(&data_path).unwrap();

        assert_eq!(fs::read(&data_path).unwrap(), b"new");
        assert_eq!(
            fs::read(crate::layout::rebootstrap_previous_path(&data_path)).unwrap(),
            b"old"
        );
        assert!(!crate::layout::rebootstrap_pending_path(&data_path).exists());
        assert!(!crate::layout::rebootstrap_ready_marker_path(&data_path).exists());
        assert!(!staging_root.exists());

        let _ = fs::remove_dir_all(root);
    }
}