Skip to main content

handoff/
protocol.rs

//! Wire protocol message types and constants.
//!
//! The codec is [`postcard`] over a length-prefixed framing layer (see
//! [`crate::frame`]). The on-wire format of a single message is:
//!
//! ```text
//! u32   frame_len      (little-endian; covers everything after this field)
//! u16   proto_version  (little-endian)
//! [u8]  postcard-encoded `Message`
//! ```
//!
//! The `Message` enum's variant discriminant is encoded by postcard as part of
//! the payload, so we don't carry a separate `msg_type` byte on the wire.
//! Because postcard encodes that discriminant as the variant's declaration
//! index, the order of `Message` variants is itself part of the wire format.
14
15use serde::{Deserialize, Serialize};
16use uuid::Uuid;
17
/// Numeric wire-protocol version, sent as a little-endian `u16` in every
/// frame header. Incremented whenever the payload format changes in a
/// backward-incompatible way.
pub type ProtoVersion = u16;
20
21/// Minimum version this build can speak.
22pub const PROTO_MIN: ProtoVersion = 1;
23/// Maximum version this build can speak. Equal to `PROTO_MIN` until we ship a v2.
24pub const PROTO_MAX: ProtoVersion = 1;
25
26/// The role a peer announces in `Hello`.
27///
28/// Distinct from [`crate::role::Role`] (which signals cold-start vs successor
29/// on the primitive side). `Side` is the protocol-level identity over the wire.
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31pub enum Side {
32    Incumbent,
33    Successor,
34}
35
36/// Opaque per-handoff identifier. Generated by the supervisor in `HelloAck`.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub struct HandoffId(pub Uuid);
39
40impl HandoffId {
41    pub fn new() -> Self {
42        Self(Uuid::new_v4())
43    }
44}
45
46impl Default for HandoffId {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl std::fmt::Display for HandoffId {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        self.0.fmt(f)
55    }
56}
57
58/// Reserved space for future feature negotiation flags.
59#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
60pub struct Capabilities {
61    pub reserved: u64,
62}
63
64/// One control-channel message. Variants correspond 1:1 to the rows of the
65/// wire-protocol table in `ARCHITECTURE.md`.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub enum Message {
68    /// Peer announces its identity and supported protocol range.
69    Hello {
70        role: Side,
71        pid: u32,
72        /// Implementation-defined build identifier (e.g. GNU build-id, length 20).
73        /// Variable length to accommodate sha-1 / sha-256 / git-sha hex.
74        build_id: Vec<u8>,
75        proto_min: ProtoVersion,
76        proto_max: ProtoVersion,
77        capabilities: Capabilities,
78    },
79    /// Supervisor confirms negotiated version and assigns the handoff id.
80    HelloAck {
81        proto_version_chosen: ProtoVersion,
82        handoff_id: HandoffId,
83    },
84    /// S asks O to begin draining.
85    PrepareHandoff {
86        handoff_id: HandoffId,
87        successor_pid: u32,
88        deadline_ms: u64,
89        drain_grace_ms: u64,
90    },
91    /// O reports drain complete.
92    Drained {
93        open_conns_remaining: u32,
94        accept_closed: bool,
95    },
96    /// S asks O to seal.
97    SealRequest { handoff_id: HandoffId },
98    /// Optional progress heartbeat during a long-running seal.
99    SealProgress {
100        shards_sealed: u32,
101        shards_total: u32,
102        last_revision: u64,
103    },
104    /// O reports seal complete and has released the flock.
105    SealComplete {
106        handoff_id: HandoffId,
107        last_revision_per_shard: Vec<u64>,
108        data_dir_fingerprint: [u8; 32],
109    },
110    /// O reports seal failure; flock is still held; O can resume.
111    SealFailed {
112        handoff_id: HandoffId,
113        error: String,
114        partial_state: String,
115    },
116    /// S cues N to acquire the flock, open state, and start serving.
117    Begin { handoff_id: HandoffId },
118    /// N declares readiness; supervisor may now commit O.
119    Ready {
120        handoff_id: HandoffId,
121        listening_on: Vec<String>,
122        healthz_ok: bool,
123        advertised_revision_per_shard: Vec<u64>,
124    },
125    /// S tells O it may exit.
126    Commit { handoff_id: HandoffId },
127    /// S tells O or N to abort the in-flight handoff and roll back.
128    Abort {
129        handoff_id: HandoffId,
130        reason: String,
131    },
132    /// S tells O to reopen its writer state after a post-seal abort.
133    ResumeAfterAbort { handoff_id: HandoffId },
134    /// Liveness heartbeat. Each side emits one every second during a handoff;
135    /// a 5s gap is treated as peer-died.
136    Heartbeat { ts_ms: u64 },
137}
138
139/// Short, stable name for a [`Message`] variant. Used in error messages
140/// and tracing where the full payload is noise. The name is the variant
141/// identifier, nothing more — callers that want "unexpected for state X"
142/// framing should compose it at the call site.
143pub fn short_name(msg: &Message) -> &'static str {
144    match msg {
145        Message::Hello { .. } => "Hello",
146        Message::HelloAck { .. } => "HelloAck",
147        Message::PrepareHandoff { .. } => "PrepareHandoff",
148        Message::Drained { .. } => "Drained",
149        Message::SealRequest { .. } => "SealRequest",
150        Message::SealProgress { .. } => "SealProgress",
151        Message::SealComplete { .. } => "SealComplete",
152        Message::SealFailed { .. } => "SealFailed",
153        Message::Begin { .. } => "Begin",
154        Message::Ready { .. } => "Ready",
155        Message::Commit { .. } => "Commit",
156        Message::Abort { .. } => "Abort",
157        Message::ResumeAfterAbort { .. } => "ResumeAfterAbort",
158        Message::Heartbeat { .. } => "Heartbeat",
159    }
160}
161
162/// Negotiate a single protocol version from two `proto_min..=proto_max` ranges.
163/// Returns the highest version both sides can speak, or [`Error::VersionMismatch`].
164pub fn negotiate_version(
165    our_min: ProtoVersion,
166    our_max: ProtoVersion,
167    their_min: ProtoVersion,
168    their_max: ProtoVersion,
169) -> crate::error::Result<ProtoVersion> {
170    let lo = our_min.max(their_min);
171    let hi = our_max.min(their_max);
172    if lo > hi {
173        Err(crate::error::Error::VersionMismatch {
174            our_min,
175            our_max,
176            their_min,
177            their_max,
178        })
179    } else {
180        Ok(hi)
181    }
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn negotiate_picks_highest_overlap() {
        assert_eq!(negotiate_version(1, 3, 2, 5).unwrap(), 3);
        assert_eq!(negotiate_version(1, 1, 1, 1).unwrap(), 1);
        assert_eq!(negotiate_version(1, 5, 3, 4).unwrap(), 4);
    }

    #[test]
    fn negotiate_rejects_disjoint() {
        assert!(matches!(
            negotiate_version(1, 1, 2, 2),
            Err(crate::error::Error::VersionMismatch { .. })
        ));
        // Disjoint in the other direction (peer is older than us).
        assert!(matches!(
            negotiate_version(2, 2, 1, 1),
            Err(crate::error::Error::VersionMismatch { .. })
        ));
    }

    #[test]
    fn negotiate_rejects_inverted_peer_range() {
        // A malformed peer range (min > max) must never yield a version.
        assert!(matches!(
            negotiate_version(1, 3, 3, 1),
            Err(crate::error::Error::VersionMismatch { .. })
        ));
    }

    #[test]
    fn negotiate_own_range_with_itself() {
        assert_eq!(
            negotiate_version(PROTO_MIN, PROTO_MAX, PROTO_MIN, PROTO_MAX).unwrap(),
            PROTO_MAX
        );
    }

    #[test]
    fn short_name_is_variant_identifier() {
        assert_eq!(short_name(&Message::Heartbeat { ts_ms: 0 }), "Heartbeat");
        let id = HandoffId::new();
        assert_eq!(short_name(&Message::Commit { handoff_id: id }), "Commit");
        assert_eq!(
            short_name(&Message::ResumeAfterAbort { handoff_id: id }),
            "ResumeAfterAbort"
        );
    }

    #[test]
    fn handoff_id_unique() {
        let a = HandoffId::new();
        let b = HandoffId::new();
        assert_ne!(a, b);
    }

    #[test]
    fn handoff_id_display_matches_uuid() {
        let id = HandoffId::new();
        assert_eq!(id.to_string(), id.0.to_string());
    }
}