1use serde::{Deserialize, Serialize};
16use uuid::Uuid;
17
18pub type ProtoVersion = u16;
20
21pub const PROTO_MIN: ProtoVersion = 1;
23pub const PROTO_MAX: ProtoVersion = 1;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31pub enum Side {
32 Incumbent,
33 Successor,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub struct HandoffId(pub Uuid);
39
40impl HandoffId {
41 pub fn new() -> Self {
42 Self(Uuid::new_v4())
43 }
44}
45
46impl Default for HandoffId {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52impl std::fmt::Display for HandoffId {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 self.0.fmt(f)
55 }
56}
57
58#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
60pub struct Capabilities {
61 pub reserved: u64,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
67pub enum Message {
68 Hello {
70 role: Side,
71 pid: u32,
72 build_id: Vec<u8>,
75 proto_min: ProtoVersion,
76 proto_max: ProtoVersion,
77 capabilities: Capabilities,
78 },
79 HelloAck {
81 proto_version_chosen: ProtoVersion,
82 handoff_id: HandoffId,
83 },
84 PrepareHandoff {
86 handoff_id: HandoffId,
87 successor_pid: u32,
88 deadline_ms: u64,
89 drain_grace_ms: u64,
90 },
91 Drained {
93 open_conns_remaining: u32,
94 accept_closed: bool,
95 },
96 SealRequest { handoff_id: HandoffId },
98 SealProgress {
100 shards_sealed: u32,
101 shards_total: u32,
102 last_revision: u64,
103 },
104 SealComplete {
106 handoff_id: HandoffId,
107 last_revision_per_shard: Vec<u64>,
108 data_dir_fingerprint: [u8; 32],
109 },
110 SealFailed {
112 handoff_id: HandoffId,
113 error: String,
114 partial_state: String,
115 },
116 Begin { handoff_id: HandoffId },
118 Ready {
120 handoff_id: HandoffId,
121 listening_on: Vec<String>,
122 healthz_ok: bool,
123 advertised_revision_per_shard: Vec<u64>,
124 },
125 Commit { handoff_id: HandoffId },
127 Abort {
129 handoff_id: HandoffId,
130 reason: String,
131 },
132 ResumeAfterAbort { handoff_id: HandoffId },
134 Heartbeat { ts_ms: u64 },
137}
138
139pub fn short_name(msg: &Message) -> &'static str {
144 match msg {
145 Message::Hello { .. } => "Hello",
146 Message::HelloAck { .. } => "HelloAck",
147 Message::PrepareHandoff { .. } => "PrepareHandoff",
148 Message::Drained { .. } => "Drained",
149 Message::SealRequest { .. } => "SealRequest",
150 Message::SealProgress { .. } => "SealProgress",
151 Message::SealComplete { .. } => "SealComplete",
152 Message::SealFailed { .. } => "SealFailed",
153 Message::Begin { .. } => "Begin",
154 Message::Ready { .. } => "Ready",
155 Message::Commit { .. } => "Commit",
156 Message::Abort { .. } => "Abort",
157 Message::ResumeAfterAbort { .. } => "ResumeAfterAbort",
158 Message::Heartbeat { .. } => "Heartbeat",
159 }
160}
161
162pub fn negotiate_version(
165 our_min: ProtoVersion,
166 our_max: ProtoVersion,
167 their_min: ProtoVersion,
168 their_max: ProtoVersion,
169) -> crate::error::Result<ProtoVersion> {
170 let lo = our_min.max(their_min);
171 let hi = our_max.min(their_max);
172 if lo > hi {
173 Err(crate::error::Error::VersionMismatch {
174 our_min,
175 our_max,
176 their_min,
177 their_max,
178 })
179 } else {
180 Ok(hi)
181 }
182}
183
184#[cfg(test)]
185mod tests {
186 use super::*;
187
188 #[test]
189 fn negotiate_picks_highest_overlap() {
190 assert_eq!(negotiate_version(1, 3, 2, 5).unwrap(), 3);
191 assert_eq!(negotiate_version(1, 1, 1, 1).unwrap(), 1);
192 assert_eq!(negotiate_version(1, 5, 3, 4).unwrap(), 4);
193 }
194
195 #[test]
196 fn negotiate_rejects_disjoint() {
197 assert!(matches!(
198 negotiate_version(1, 1, 2, 2),
199 Err(crate::error::Error::VersionMismatch { .. })
200 ));
201 }
202
203 #[test]
204 fn handoff_id_unique() {
205 let a = HandoffId::new();
206 let b = HandoffId::new();
207 assert_ne!(a, b);
208 }
209}