reddb-io-wire 1.12.0

RedDB wire protocol vocabulary: connection-string parser, RedWire frames, payload codecs, topology, sanitizers, and replication messages.
Documentation
use serde_json::Value as JsonValue;

use super::util::{get_opt_string, get_opt_u64, get_string, get_u64, object_from_slice, Result};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineForkNotice {
    pub parent_timeline: u64,
    pub new_timeline: u64,
    pub fork_lsn: u64,
    pub promoted_replica_id: Option<String>,
    pub created_at_unix_ms: Option<u64>,
}

impl TimelineForkNotice {
    pub fn encode_json(&self) -> Vec<u8> {
        let mut obj = serde_json::Map::new();
        obj.insert(
            "parent_timeline".to_string(),
            JsonValue::Number(self.parent_timeline.into()),
        );
        obj.insert(
            "new_timeline".to_string(),
            JsonValue::Number(self.new_timeline.into()),
        );
        obj.insert(
            "fork_lsn".to_string(),
            JsonValue::Number(self.fork_lsn.into()),
        );
        if let Some(replica_id) = &self.promoted_replica_id {
            obj.insert(
                "promoted_replica_id".to_string(),
                JsonValue::String(replica_id.clone()),
            );
        }
        if let Some(created_at) = self.created_at_unix_ms {
            obj.insert(
                "created_at_unix_ms".to_string(),
                JsonValue::Number(created_at.into()),
            );
        }
        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
    }

    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
        let obj = object_from_slice(bytes)?;
        Ok(Self {
            parent_timeline: get_u64(&obj, "parent_timeline")?,
            new_timeline: get_u64(&obj, "new_timeline")?,
            fork_lsn: get_u64(&obj, "fork_lsn")?,
            promoted_replica_id: get_opt_string(&obj, "promoted_replica_id"),
            created_at_unix_ms: get_opt_u64(&obj, "created_at_unix_ms"),
        })
    }

    pub fn decode_legacy_rejoin_plan(bytes: &[u8]) -> Result<Self> {
        let obj = object_from_slice(bytes)?;
        Ok(Self {
            parent_timeline: get_opt_u64(&obj, "rejoin_node_timeline").unwrap_or(0),
            new_timeline: get_u64(&obj, "rejoin_target_timeline")?,
            fork_lsn: get_u64(&obj, "rejoin_start_lsn")
                .or_else(|_| get_u64(&obj, "rejoin_rewind_to_lsn"))?,
            promoted_replica_id: get_opt_string(&obj, "promoted_replica_id")
                .or_else(|| get_opt_string(&obj, "replica_id")),
            created_at_unix_ms: None,
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RejoinPlanNotice {
    pub state: String,
    pub node_timeline: u64,
    pub node_flushed_lsn: u64,
    pub available_from_lsn: u64,
    pub target_timeline: u64,
    pub rewind_to_lsn: Option<u64>,
    pub start_lsn: u64,
}

impl RejoinPlanNotice {
    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
        let obj = object_from_slice(bytes)?;
        Ok(Self {
            state: get_string(&obj, "state")?,
            node_timeline: get_u64(&obj, "rejoin_node_timeline")?,
            node_flushed_lsn: get_u64(&obj, "rejoin_node_flushed_lsn")?,
            available_from_lsn: get_u64(&obj, "rejoin_available_from_lsn")?,
            target_timeline: get_u64(&obj, "rejoin_target_timeline")?,
            rewind_to_lsn: get_opt_u64(&obj, "rejoin_rewind_to_lsn"),
            start_lsn: get_u64(&obj, "rejoin_start_lsn")?,
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RejoinRewindConfirmation {
    pub target_timeline: u64,
    pub rewind_to_lsn: u64,
}

impl RejoinRewindConfirmation {
    pub fn encode_json(&self) -> Vec<u8> {
        let mut obj = serde_json::Map::new();
        obj.insert(
            "target_timeline".to_string(),
            JsonValue::Number(self.target_timeline.into()),
        );
        obj.insert(
            "rewind_to_lsn".to_string(),
            JsonValue::Number(self.rewind_to_lsn.into()),
        );
        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
    }

    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
        let obj = object_from_slice(bytes)?;
        Ok(Self {
            target_timeline: get_u64(&obj, "target_timeline")?,
            rewind_to_lsn: get_u64(&obj, "rewind_to_lsn")?,
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RejoinRewindConfirmationReply {
    pub ok: bool,
    pub target_timeline: u64,
    pub rewind_to_lsn: u64,
    pub next_step: String,
}

impl RejoinRewindConfirmationReply {
    pub fn confirmed(target_timeline: u64, rewind_to_lsn: u64) -> Self {
        Self {
            ok: true,
            target_timeline,
            rewind_to_lsn,
            next_step: "restart or resume replica apply from the confirmed LSN".to_string(),
        }
    }

    pub fn encode_json(&self) -> Vec<u8> {
        let mut obj = serde_json::Map::new();
        obj.insert("ok".to_string(), JsonValue::Bool(self.ok));
        obj.insert(
            "target_timeline".to_string(),
            JsonValue::Number(self.target_timeline.into()),
        );
        obj.insert(
            "rewind_to_lsn".to_string(),
            JsonValue::Number(self.rewind_to_lsn.into()),
        );
        obj.insert(
            "next_step".to_string(),
            JsonValue::String(self.next_step.clone()),
        );
        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
    }

    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
        let obj = object_from_slice(bytes)?;
        Ok(Self {
            ok: obj.get("ok").and_then(JsonValue::as_bool).unwrap_or(false),
            target_timeline: get_u64(&obj, "target_timeline")?,
            rewind_to_lsn: get_u64(&obj, "rewind_to_lsn")?,
            next_step: get_string(&obj, "next_step")?,
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailoverPromotionRequest {
    pub holder_id: Option<String>,
    pub ttl_ms: Option<u64>,
}

impl FailoverPromotionRequest {
    pub fn encode_json(&self) -> Vec<u8> {
        let mut obj = serde_json::Map::new();
        if let Some(holder_id) = &self.holder_id {
            obj.insert(
                "holder_id".to_string(),
                JsonValue::String(holder_id.clone()),
            );
        }
        if let Some(ttl_ms) = self.ttl_ms.filter(|ttl| *ttl > 0) {
            obj.insert("ttl_ms".to_string(), JsonValue::Number(ttl_ms.into()));
        }
        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
    }

    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
        let obj = object_from_slice(bytes)?;
        Ok(Self {
            holder_id: get_opt_string(&obj, "holder_id"),
            ttl_ms: get_opt_u64(&obj, "ttl_ms").filter(|ttl| *ttl > 0),
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailoverPromotionReply {
    pub ok: bool,
    pub holder_id: String,
    pub generation: u64,
    pub acquired_at_ms: u64,
    pub expires_at_ms: u64,
    pub timeline: u64,
    pub applied_lsn: u64,
    pub next_step: String,
}

impl FailoverPromotionReply {
    pub fn promoted(
        holder_id: impl Into<String>,
        generation: u64,
        acquired_at_ms: u64,
        expires_at_ms: u64,
        timeline: u64,
        applied_lsn: u64,
    ) -> Self {
        Self {
            ok: true,
            holder_id: holder_id.into(),
            generation,
            acquired_at_ms,
            expires_at_ms,
            timeline,
            applied_lsn,
            next_step: "restart with RED_REPLICATION_MODE=primary to start accepting writes"
                .to_string(),
        }
    }

    pub fn encode_json(&self) -> Vec<u8> {
        let mut obj = serde_json::Map::new();
        obj.insert("ok".to_string(), JsonValue::Bool(self.ok));
        obj.insert(
            "holder_id".to_string(),
            JsonValue::String(self.holder_id.clone()),
        );
        obj.insert(
            "generation".to_string(),
            JsonValue::Number(self.generation.into()),
        );
        obj.insert(
            "acquired_at_ms".to_string(),
            JsonValue::Number(self.acquired_at_ms.into()),
        );
        obj.insert(
            "expires_at_ms".to_string(),
            JsonValue::Number(self.expires_at_ms.into()),
        );
        obj.insert(
            "timeline".to_string(),
            JsonValue::Number(self.timeline.into()),
        );
        obj.insert(
            "applied_lsn".to_string(),
            JsonValue::Number(self.applied_lsn.into()),
        );
        obj.insert(
            "next_step".to_string(),
            JsonValue::String(self.next_step.clone()),
        );
        serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
    }

    pub fn decode_json(bytes: &[u8]) -> Result<Self> {
        let obj = object_from_slice(bytes)?;
        Ok(Self {
            ok: obj.get("ok").and_then(JsonValue::as_bool).unwrap_or(false),
            holder_id: get_string(&obj, "holder_id")?,
            generation: get_u64(&obj, "generation")?,
            acquired_at_ms: get_u64(&obj, "acquired_at_ms")?,
            expires_at_ms: get_u64(&obj, "expires_at_ms")?,
            timeline: get_u64(&obj, "timeline")?,
            applied_lsn: get_u64(&obj, "applied_lsn")?,
            next_step: get_string(&obj, "next_step")?,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn timeline_fork_notice_round_trips() {
        let notice = TimelineForkNotice {
            parent_timeline: 1,
            new_timeline: 2,
            fork_lsn: 42,
            promoted_replica_id: Some("replica-a".to_string()),
            created_at_unix_ms: Some(1000),
        };
        assert_eq!(
            TimelineForkNotice::decode_json(&notice.encode_json()).unwrap(),
            notice
        );
    }

    #[test]
    fn rejoin_plan_notice_decodes_existing_runtime_shape() {
        let plan = RejoinPlanNotice::decode_json(
            br#"{
                "state":"rejoin_rewind_required",
                "rejoin_node_timeline":1,
                "rejoin_node_flushed_lsn":60,
                "rejoin_available_from_lsn":40,
                "rejoin_target_timeline":3,
                "rejoin_rewind_to_lsn":42,
                "rejoin_start_lsn":42
            }"#,
        )
        .unwrap();
        assert_eq!(plan.target_timeline, 3);
        assert_eq!(plan.rewind_to_lsn, Some(42));
    }

    #[test]
    fn rejoin_rewind_confirmation_contract_round_trips() {
        let request = RejoinRewindConfirmation {
            target_timeline: 3,
            rewind_to_lsn: 42,
        };
        let request = RejoinRewindConfirmation::decode_json(&request.encode_json()).unwrap();
        assert_eq!(request.target_timeline, 3);
        assert_eq!(request.rewind_to_lsn, 42);

        let reply = RejoinRewindConfirmationReply::confirmed(3, 42);
        assert_eq!(
            RejoinRewindConfirmationReply::decode_json(&reply.encode_json()).unwrap(),
            reply
        );
    }

    #[test]
    fn failover_promotion_payloads_round_trip() {
        let request = FailoverPromotionRequest {
            holder_id: Some("replica-a".to_string()),
            ttl_ms: Some(30_000),
        };
        let request = FailoverPromotionRequest::decode_json(&request.encode_json()).unwrap();
        assert_eq!(request.holder_id.as_deref(), Some("replica-a"));
        assert_eq!(request.ttl_ms, Some(30_000));

        let reply = FailoverPromotionReply::promoted("replica-a", 7, 100, 200, 2, 42);
        assert_eq!(
            FailoverPromotionReply::decode_json(&reply.encode_json()).unwrap(),
            reply
        );
    }
}