// beyond-handoff 0.1.2
//
// Zero-downtime atomic binary handoff for long-running daemons
// Documentation
//! Wire protocol message types and constants.
//!
//! The codec is [`postcard`] over a length-prefixed framing layer (see
//! [`crate::frame`]). The on-wire format of a single message is:
//!
//! ```text
//! u32 frame_len   (little-endian; covers everything after this field)
//! u16 proto_version (little-endian)
//! [u8] postcard-encoded `Message`
//! ```
//!
//! The `Message` enum's variant discriminant is encoded by postcard as part of
//! the payload, so we don't carry a separate `msg_type` byte on the wire.

use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// Wire-protocol version. Bumped on any breaking payload change.
///
/// A `u16`, matching the `u16 proto_version` field of the frame layout shown
/// in the module-level docs.
pub type ProtoVersion = u16;

/// Minimum version this build can speak.
///
/// Presumably what callers pass as `our_min` to [`negotiate_version`] and
/// advertise as `proto_min` in [`Message::Hello`] — confirm at the call sites.
pub const PROTO_MIN: ProtoVersion = 1;
/// Maximum version this build can speak. Equal to `PROTO_MIN` until we ship a v2.
///
/// Presumably what callers pass as `our_max` to [`negotiate_version`] and
/// advertise as `proto_max` in [`Message::Hello`] — confirm at the call sites.
pub const PROTO_MAX: ProtoVersion = 1;

/// The role a peer announces in `Hello`.
///
/// Distinct from [`crate::role::Role`] (which signals cold-start vs successor
/// on the primitive side). `Side` is the protocol-level identity over the wire.
///
/// Carried in the `role` field of [`Message::Hello`]. Because this type is
/// serialized as part of that message, the order of its variants is presumably
/// part of the wire format — verify against the postcard encoding rules
/// before reordering or inserting variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Side {
    /// Presumably the currently running process that is being replaced (the
    /// peer the [`Message`] docs abbreviate as `O`) — TODO confirm.
    Incumbent,
    /// Presumably the newly started process that takes over (the peer the
    /// [`Message`] docs abbreviate as `N`) — TODO confirm.
    Successor,
}

/// Opaque per-handoff identifier. Generated by the supervisor in `HelloAck`.
///
/// A newtype around a [`Uuid`]; the inner value is public. The type is `Copy`
/// and `Hash`, so it can be passed by value and used as a map or set key.
/// Most [`Message`] variants carry it in a `handoff_id` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct HandoffId(pub Uuid);

impl HandoffId {
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }
}

impl Default for HandoffId {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Display for HandoffId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

/// Reserved space for future feature negotiation flags.
///
/// Sent in the `capabilities` field of [`Message::Hello`]. The derived
/// `Default` yields `reserved == 0`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Capabilities {
    /// Placeholder bits. No individual flag is defined in this file.
    pub reserved: u64,
}

/// One control-channel message. Variants correspond 1:1 to the rows of the
/// wire-protocol table in `ARCHITECTURE.md`.
///
/// The variant docs abbreviate the participants as `S`, `O` and `N`. These
/// presumably stand for the supervisor, the old (incumbent) process and the
/// new (successor) process respectively — TODO confirm against
/// `ARCHITECTURE.md`.
///
/// The module docs note that postcard encodes the variant discriminant as
/// part of the payload, so the order of variants (and of fields within a
/// variant) is presumably part of the wire format: treat any reordering,
/// insertion or removal as a breaking payload change that requires a
/// [`ProtoVersion`] bump — verify against the postcard encoding rules.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
    /// Peer announces its identity and supported protocol range.
    Hello {
        /// Which side of the handoff the sender claims to be.
        role: Side,
        /// Presumably the sender's OS process id — TODO confirm.
        pid: u32,
        /// Implementation-defined build identifier (e.g. GNU build-id, length 20).
        /// Variable length to accommodate sha-1 / sha-256 / git-sha hex.
        build_id: Vec<u8>,
        /// Lower bound of the protocol versions the sender supports.
        proto_min: ProtoVersion,
        /// Upper bound of the protocol versions the sender supports.
        proto_max: ProtoVersion,
        /// Feature flags; currently only reserved bits (see [`Capabilities`]).
        capabilities: Capabilities,
    },
    /// Supervisor confirms negotiated version and assigns the handoff id.
    HelloAck {
        /// The single version selected for this session.
        proto_version_chosen: ProtoVersion,
        /// Identifier that later messages of this handoff carry.
        handoff_id: HandoffId,
    },
    /// S asks O to begin draining.
    PrepareHandoff {
        handoff_id: HandoffId,
        successor_pid: u32,
        // NOTE(review): the `_ms` suffix suggests milliseconds; whether the
        // deadline is absolute or relative is not visible here — confirm.
        deadline_ms: u64,
        drain_grace_ms: u64,
    },
    /// O reports drain complete.
    Drained {
        open_conns_remaining: u32,
        accept_closed: bool,
    },
    /// S asks O to seal.
    SealRequest { handoff_id: HandoffId },
    /// Optional progress heartbeat during a long-running seal.
    SealProgress {
        shards_sealed: u32,
        shards_total: u32,
        last_revision: u64,
    },
    /// O reports seal complete and has released the flock.
    SealComplete {
        handoff_id: HandoffId,
        // Presumably one entry per shard, indexed by shard number — confirm.
        last_revision_per_shard: Vec<u64>,
        // 32 raw bytes; the fingerprint algorithm is not specified in this file.
        data_dir_fingerprint: [u8; 32],
    },
    /// O reports seal failure; flock is still held; O can resume.
    SealFailed {
        handoff_id: HandoffId,
        error: String,
        partial_state: String,
    },
    /// S cues N to acquire the flock, open state, and start serving.
    Begin { handoff_id: HandoffId },
    /// N declares readiness; supervisor may now commit O.
    Ready {
        handoff_id: HandoffId,
        listening_on: Vec<String>,
        healthz_ok: bool,
        advertised_revision_per_shard: Vec<u64>,
    },
    /// S tells O it may exit.
    Commit { handoff_id: HandoffId },
    /// S tells O or N to abort the in-flight handoff and roll back.
    Abort {
        handoff_id: HandoffId,
        /// Free-form explanation of why the handoff is being aborted.
        reason: String,
    },
    /// S tells O to reopen its writer state after a post-seal abort.
    ResumeAfterAbort { handoff_id: HandoffId },
    /// Liveness heartbeat. Each side emits one every second during a handoff;
    /// a 5s gap is treated as peer-died.
    // NOTE(review): `ts_ms` looks like a millisecond timestamp; its epoch
    // (wall clock vs. monotonic) is not visible here — confirm.
    Heartbeat { ts_ms: u64 },
}

/// Return the bare variant identifier of a [`Message`] as a static string.
///
/// Meant for error text and tracing output where dumping the whole payload
/// would only be noise. The result is the variant name and nothing else;
/// callers that want framing such as "unexpected for state X" build that
/// around it themselves.
pub fn short_name(msg: &Message) -> &'static str {
    // There is no wildcard arm, so adding a `Message` variant makes this
    // match fail to compile until the new variant is given a name.
    match *msg {
        // Handshake.
        Message::Hello { .. } => "Hello",
        Message::HelloAck { .. } => "HelloAck",

        // Drain.
        Message::PrepareHandoff { .. } => "PrepareHandoff",
        Message::Drained { .. } => "Drained",

        // Seal.
        Message::SealRequest { .. } => "SealRequest",
        Message::SealProgress { .. } => "SealProgress",
        Message::SealComplete { .. } => "SealComplete",
        Message::SealFailed { .. } => "SealFailed",

        // Cut-over.
        Message::Begin { .. } => "Begin",
        Message::Ready { .. } => "Ready",
        Message::Commit { .. } => "Commit",

        // Roll-back.
        Message::Abort { .. } => "Abort",
        Message::ResumeAfterAbort { .. } => "ResumeAfterAbort",

        // Liveness.
        Message::Heartbeat { .. } => "Heartbeat",
    }
}

/// Pick the protocol version to speak, given our supported range and the
/// peer's.
///
/// Both ranges are inclusive (`min..=max`). The chosen version is the upper
/// end of their intersection, i.e. the highest version both sides support.
///
/// # Errors
///
/// Returns [`crate::error::Error::VersionMismatch`], carrying all four
/// bounds, when the two ranges have no version in common.
pub fn negotiate_version(
    our_min: ProtoVersion,
    our_max: ProtoVersion,
    their_min: ProtoVersion,
    their_max: ProtoVersion,
) -> crate::error::Result<ProtoVersion> {
    // The intersection of the two inclusive ranges is `floor..=ceiling`.
    let floor = std::cmp::max(our_min, their_min);
    let ceiling = std::cmp::min(our_max, their_max);

    if floor <= ceiling {
        return Ok(ceiling);
    }

    // Empty intersection: report every bound so the mismatch can be diagnosed.
    Err(crate::error::Error::VersionMismatch {
        our_min,
        our_max,
        their_min,
        their_max,
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Overlapping ranges resolve to the top of their intersection.
    #[test]
    fn negotiate_picks_highest_overlap() {
        // (our_min, our_max, their_min, their_max, expected)
        let cases: [(ProtoVersion, ProtoVersion, ProtoVersion, ProtoVersion, ProtoVersion); 3] =
            [(1, 3, 2, 5, 3), (1, 1, 1, 1, 1), (1, 5, 3, 4, 4)];

        for &(our_min, our_max, their_min, their_max, expected) in &cases {
            let chosen = negotiate_version(our_min, our_max, their_min, their_max).unwrap();
            assert_eq!(chosen, expected);
        }
    }

    /// Ranges with no common version are rejected with `VersionMismatch`.
    #[test]
    fn negotiate_rejects_disjoint() {
        let outcome = negotiate_version(1, 1, 2, 2);
        assert!(matches!(
            outcome,
            Err(crate::error::Error::VersionMismatch { .. })
        ));
    }

    /// Two freshly minted ids must differ.
    #[test]
    fn handoff_id_unique() {
        let first = HandoffId::new();
        let second = HandoffId::new();
        assert_ne!(first, second);
    }
}