handoff/role.rs
1//! Successor detection via env vars + inherited listener handling.
2//!
3//! Spawned-as-successor processes receive their identity through env vars
4//! (systemd-style):
5//!
6//! - `HANDOFF_ROLE=successor`
7//! - `HANDOFF_SOCK_FD=<n>` — open Unix socket to supervisor
8//! - `LISTEN_FDS=<n>` — count of inherited listener FDs (starting at FD 3)
9//! - `LISTEN_FDNAMES=resp:http:…` — colon-separated logical names in FD order
10//!
11//! [`detect_role`] reads these and consumes them so an accidental double-detect
12//! gives [`Role::ColdStart`] (which is what fresh re-execs should do).
13
14// Env mutation (`env::remove_var`, `env::set_var`) is `unsafe` in Rust 2024
15// because it races with concurrent env reads in other threads; this module
16// is contracted to run before the primitive spawns its serving threads.
17// `FromRawFd` is `unsafe` because the safe wrapper assumes exclusive
18// ownership; the supervisor handed us the FD via fork+exec, so that holds.
19#![allow(unsafe_code)]
20
21use std::collections::HashMap;
22use std::env;
23use std::marker::PhantomData;
24use std::net::TcpListener;
25use std::os::fd::{FromRawFd, RawFd};
26use std::os::unix::net::UnixStream;
27use std::sync::mpsc;
28use std::thread;
29use std::time::Duration;
30
31use crate::drainable::ReadinessSnapshot;
32use crate::error::{Error, Result};
33use crate::frame::{read_message, write_message};
34use crate::protocol::{
35 Capabilities, HandoffId, Message, PROTO_MAX, PROTO_MIN, ProtoVersion, Side, short_name,
36};
37use crate::util::now_unix_ms;
38
/// How often the successor emits `Heartbeat` frames while it initialises.
///
/// Matches the incumbent's heartbeat cadence. Against the supervisor's 10s
/// `LIVENESS_TIMEOUT` this leaves a 5× margin for scheduler hiccups before
/// the supervisor would declare the successor dead.
const SUCCESSOR_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(2_000);

/// Holds `successor` when the supervisor spawned this process mid-handoff.
pub const ENV_HANDOFF_ROLE: &str = "HANDOFF_ROLE";
/// Decimal FD number of the Unix control socket to the supervisor.
pub const ENV_HANDOFF_SOCK_FD: &str = "HANDOFF_SOCK_FD";
/// Count of inherited listener FDs (systemd socket-activation style).
pub const ENV_LISTEN_FDS: &str = "LISTEN_FDS";
/// Colon-separated logical names of the inherited listeners, in FD order.
pub const ENV_LISTEN_FDNAMES: &str = "LISTEN_FDNAMES";

/// First inherited listener FD, following the systemd convention (FDs 0–2
/// being stdin/stdout/stderr).
pub const SD_LISTEN_FDS_START: RawFd = 3;
51
52pub enum Role {
53 /// No `HANDOFF_ROLE` env var was set. This is a fresh boot — either a
54 /// supervisor's first spawn (in which case `inherited` may carry listeners)
55 /// or an unsupervised local-dev run (in which case `inherited` is empty
56 /// and the primitive binds its own listeners).
57 ColdStart { inherited: InheritedListeners },
58 /// Spawned as a successor mid-handoff. Use the embedded [`Successor`] to
59 /// drive the protocol (handshake, wait_for_begin, take_listener, announce_ready).
60 Successor(Successor),
61}
62
/// Listener FDs handed in via `LISTEN_FDS` / `LISTEN_FDNAMES`, keyed by
/// logical name. Available on both `ColdStart` (supervisor's first spawn)
/// and inside `Successor`.
///
/// Only raw FD numbers are stored: an FD that is never
/// [`take`](Self::take)n is not closed when this value is dropped and stays
/// open for the life of the process.
#[derive(Default)]
pub struct InheritedListeners {
    listeners: HashMap<String, RawFd>,
}

impl InheritedListeners {
    /// Consume the inherited listener for `name`. Returns `None` if no such
    /// listener was passed (or it was already taken).
    ///
    /// The FD is trusted to be a listening TCP socket; that is not checked.
    pub fn take(&mut self, name: &str) -> Option<TcpListener> {
        let fd = self.listeners.remove(name)?;
        // SAFETY: kernel inherited the FD to us via fork+exec; we own it.
        // Removing it from the map first guarantees ownership is handed out
        // at most once.
        Some(unsafe { TcpListener::from_raw_fd(fd) })
    }

    /// Names of all listeners that haven't yet been taken, sorted so that
    /// diagnostics and assertions see a stable order (the backing map's
    /// iteration order is unspecified).
    pub fn names(&self) -> Vec<String> {
        let mut names: Vec<String> = self.listeners.keys().cloned().collect();
        names.sort_unstable();
        names
    }

    /// True if no listeners were inherited (or all have been taken).
    pub fn is_empty(&self) -> bool {
        self.listeners.is_empty()
    }
}
89
90/// Successor-side state machine, encoded as three concrete types so the
91/// compiler enforces protocol ordering. Lifecycle:
92///
93/// 1. [`detect_role`] returns [`Role::Successor(Successor)`] — initial state.
94/// 2. [`Successor::handshake`] consumes self and returns [`HandshookSuccessor`].
95/// 3. [`HandshookSuccessor::wait_for_begin`] consumes self and returns
96/// [`BegunSuccessor`].
97/// 4. From [`BegunSuccessor`] the consumer takes inherited listeners,
98/// opens its state, then calls
99/// [`BegunSuccessor::announce_and_bind`] (preferred) or
100/// [`BegunSuccessor::announce_ready`].
101///
102/// Out-of-order calls don't compile: there is no path from `Successor` to
103/// `take_listener` or `announce_ready` that doesn't pass through every
104/// preceding state.
105pub struct Successor {
106 control: UnixStream,
107 inherited: InheritedListeners,
108}
109
110/// `Hello`/`HelloAck` exchanged with the supervisor; waiting for `Begin`.
111/// Created by [`Successor::handshake`].
112pub struct HandshookSuccessor {
113 control: UnixStream,
114 inherited: InheritedListeners,
115 handoff_id: HandoffId,
116 proto_version: ProtoVersion,
117}
118
119/// `Begin` received from the supervisor; the consumer may now take its
120/// inherited listeners, open state, and announce readiness. Created by
121/// [`HandshookSuccessor::wait_for_begin`].
122pub struct BegunSuccessor {
123 control: UnixStream,
124 inherited: InheritedListeners,
125 handoff_id: HandoffId,
126 proto_version: ProtoVersion,
127}
128
129/// Inspect the environment and decide whether this process is a fresh start
130/// or a successor of a running supervisor.
131///
132/// In both cases, any `LISTEN_FDS`/`LISTEN_FDNAMES` are consumed into the
133/// returned struct so the caller can take ownership of the inherited
134/// listeners. Env vars are removed so re-entry yields a clean state.
135pub fn detect_role() -> Result<Role> {
136 let inherited = read_inherited_listeners();
137 // SAFETY: `env::remove_var` races with concurrent env reads on other
138 // threads (`std::env::set_var` / `getenv` from libc). `detect_role` is
139 // contracted to run during single-threaded startup before the primitive
140 // spawns its serving threads — see the module docstring. Callers that
141 // violate that contract are responsible for the data race.
142 unsafe {
143 env::remove_var(ENV_LISTEN_FDS);
144 env::remove_var(ENV_LISTEN_FDNAMES);
145 }
146
147 match env::var(ENV_HANDOFF_ROLE) {
148 Ok(s) if s == "successor" => {}
149 _ => return Ok(Role::ColdStart { inherited }),
150 }
151
152 let sock_raw =
153 env::var(ENV_HANDOFF_SOCK_FD).map_err(|_| Error::MissingEnv(ENV_HANDOFF_SOCK_FD))?;
154 let sock_fd: RawFd = sock_raw.parse().map_err(|_| Error::BadEnv {
155 var: ENV_HANDOFF_SOCK_FD,
156 value: sock_raw,
157 })?;
158
159 // SAFETY: same single-threaded-startup invariant as the listener env
160 // removal above; clearing these vars makes a re-entry take the
161 // ColdStart branch instead of trying to re-attach to a consumed FD.
162 unsafe {
163 env::remove_var(ENV_HANDOFF_ROLE);
164 env::remove_var(ENV_HANDOFF_SOCK_FD);
165 }
166
167 // SAFETY: the supervisor handed us this FD via `fork+exec`. It's open and
168 // owned by us from here on.
169 let control = unsafe { UnixStream::from_raw_fd(sock_fd) };
170 Ok(Role::Successor(Successor { control, inherited }))
171}
172
173fn read_inherited_listeners() -> InheritedListeners {
174 let count: usize = env::var(ENV_LISTEN_FDS)
175 .ok()
176 .and_then(|s| s.parse().ok())
177 .unwrap_or(0);
178 if count == 0 {
179 return InheritedListeners::default();
180 }
181 let names: Vec<String> = env::var(ENV_LISTEN_FDNAMES)
182 .ok()
183 .map(|s| s.split(':').map(|s| s.to_string()).collect())
184 .unwrap_or_default();
185 let mut map = HashMap::with_capacity(count);
186 for i in 0..count {
187 let fd = SD_LISTEN_FDS_START + i as RawFd;
188 let name = names.get(i).cloned().unwrap_or_else(|| i.to_string());
189 map.insert(name, fd);
190 }
191 InheritedListeners { listeners: map }
192}
193
194impl Successor {
195 /// Send `Hello`, receive `HelloAck`. Consumes self and returns a
196 /// [`HandshookSuccessor`] from which the next phase can proceed. On
197 /// protocol error the underlying `UnixStream` and inherited listeners
198 /// are dropped — the caller has no usable post-error state.
199 pub fn handshake(mut self, build_id: Vec<u8>) -> Result<HandshookSuccessor> {
200 let hello = Message::Hello {
201 role: Side::Successor,
202 pid: std::process::id(),
203 build_id,
204 proto_min: PROTO_MIN,
205 proto_max: PROTO_MAX,
206 capabilities: Capabilities::default(),
207 };
208 write_message(&mut self.control, PROTO_MAX, &hello)?;
209 let (_ver, ack) = read_message(&mut self.control)?;
210 match ack {
211 Message::HelloAck {
212 proto_version_chosen,
213 handoff_id,
214 } => Ok(HandshookSuccessor {
215 control: self.control,
216 inherited: self.inherited,
217 handoff_id,
218 proto_version: proto_version_chosen,
219 }),
220 other => Err(Error::UnexpectedMessage(short_name(&other))),
221 }
222 }
223
224 /// Names of all inherited listeners. Safe to call before `handshake` for
225 /// diagnostic / sanity checks (e.g. asserting the supervisor passed the
226 /// expected listeners). Listener consumption only happens post-`Begin`
227 /// via [`BegunSuccessor::take_listener`].
228 pub fn listener_names(&self) -> Vec<String> {
229 self.inherited.names()
230 }
231}
232
233impl HandshookSuccessor {
234 /// Block until the supervisor sends `Begin`. Consumes self and returns a
235 /// [`BegunSuccessor`] on success. `Abort` from the supervisor surfaces
236 /// as [`Error::Aborted`]; `Heartbeat` frames are skipped silently. A
237 /// `Begin` with a different handoff id than the one negotiated in
238 /// [`Successor::handshake`] returns [`Error::Protocol`] — that's a
239 /// supervisor bug, not a recoverable condition.
240 pub fn wait_for_begin(mut self) -> Result<BegunSuccessor> {
241 let expected = self.handoff_id;
242 loop {
243 let (_ver, msg) = read_message(&mut self.control)?;
244 match msg {
245 Message::Begin { handoff_id } if handoff_id == expected => {
246 return Ok(BegunSuccessor {
247 control: self.control,
248 inherited: self.inherited,
249 handoff_id,
250 proto_version: self.proto_version,
251 });
252 }
253 Message::Begin { handoff_id } => {
254 return Err(Error::Protocol(format!(
255 "Begin handoff_id {handoff_id} does not match \
256 handshake id {expected}"
257 )));
258 }
259 Message::Abort { reason, .. } => return Err(Error::Aborted(reason)),
260 Message::Heartbeat { .. } => continue,
261 other => return Err(Error::UnexpectedMessage(short_name(&other))),
262 }
263 }
264 }
265
266 /// Names of all inherited listeners. Diagnostic accessor.
267 pub fn listener_names(&self) -> Vec<String> {
268 self.inherited.names()
269 }
270
271 /// The handoff id negotiated in [`Successor::handshake`].
272 pub fn handoff_id(&self) -> HandoffId {
273 self.handoff_id
274 }
275}
276
277impl BegunSuccessor {
278 /// Consume the inherited listener for `name`. Returns `None` if no such
279 /// listener was passed (or has already been taken).
280 pub fn take_listener(&mut self, name: &str) -> Option<TcpListener> {
281 self.inherited.take(name)
282 }
283
284 /// Names of inherited listeners that haven't yet been taken.
285 pub fn listener_names(&self) -> Vec<String> {
286 self.inherited.names()
287 }
288
289 /// The handoff id negotiated in [`Successor::handshake`].
290 pub fn handoff_id(&self) -> HandoffId {
291 self.handoff_id
292 }
293
294 /// Send `Ready`. Consumes self because once the supervisor knows we're
295 /// ready, the main serving loop takes over and this object's job is done.
296 ///
297 /// # Caveat: don't bind the control socket immediately after this
298 ///
299 /// After `Ready` is sent there is a brief window during which the
300 /// supervisor still has to deliver `Commit` to the prior incumbent and
301 /// that incumbent still has to exit. During that window the prior
302 /// incumbent is the authoritative owner of the control-socket path;
303 /// rebinding from the successor here would unlink its path-binding and
304 /// break the abort path if the successor then crashes.
305 ///
306 /// Prefer [`announce_and_bind`](Self::announce_and_bind) — it combines
307 /// `Ready` + bind into one call and is the only path that orders them
308 /// correctly by construction. Use this lower-level entry point only
309 /// when you genuinely need to delay binding (e.g. for additional
310 /// post-`Ready` setup that does not require the control socket).
311 pub fn announce_ready(mut self, snapshot: ReadinessSnapshot) -> Result<()> {
312 let ready = Message::Ready {
313 handoff_id: self.handoff_id,
314 listening_on: snapshot.listening_on,
315 healthz_ok: snapshot.healthz_ok,
316 advertised_revision_per_shard: snapshot.advertised_revision_per_shard,
317 };
318 write_message(&mut self.control, self.proto_version, &ready)?;
319 Ok(())
320 }
321
322 /// Send `Ready` to the supervisor, then bind this process as the new
323 /// incumbent on `socket_path`. The two operations are combined to
324 /// enforce ordering: by the time the bind runs, the supervisor has
325 /// observed `Ready` and is about to (or has just) committed the prior
326 /// incumbent, so the path-binding takeover is safe.
327 ///
328 /// This is the safe path for successor processes; cold-start callers
329 /// use [`crate::Incumbent::bind_cold_start`] directly.
330 pub fn announce_and_bind(
331 self,
332 snapshot: ReadinessSnapshot,
333 socket_path: &std::path::Path,
334 lock: crate::DataDirLock,
335 ) -> Result<crate::Incumbent> {
336 self.announce_ready(snapshot)?;
337 crate::Incumbent::bind_after_ready(socket_path, lock)
338 }
339
340 /// Spawn a background thread that emits `Heartbeat` frames on the
341 /// control socket every ~2s. The returned guard stops + joins the
342 /// thread on drop. Use this to keep the supervisor's per-recv
343 /// `LIVENESS_TIMEOUT` (10s) from tripping while the successor is
344 /// doing slow synchronous init work between `wait_for_begin` and
345 /// `announce_and_bind` — DB pool warm-up, state rebuild, TLS load,
346 /// etc. Mirrors the incumbent's existing heartbeat thread which
347 /// covers `drain` / `seal`.
348 ///
349 /// # Ordering contract
350 ///
351 /// **The guard must be dropped before `announce_ready` /
352 /// `announce_and_bind`.** While the guard is live, the heartbeat
353 /// thread is the sole writer to the control socket; interleaving
354 /// the main thread's `Ready` frame with a heartbeat would corrupt
355 /// the wire. The borrow of `&self` enforces this: the compiler
356 /// rejects any path that calls a consuming method while the guard
357 /// is still alive.
358 ///
359 /// # Failure mode
360 ///
361 /// On `try_clone` failure (rare; FD table exhausted) the returned
362 /// guard is inert — no thread spawned — and a warning is logged.
363 /// The supervisor's liveness timer then bounds init in wall-clock
364 /// terms exactly as before this API existed.
365 pub fn start_heartbeats(&self) -> HeartbeatGuard<'_> {
366 HeartbeatGuard::start(&self.control, self.proto_version)
367 }
368}
369
370/// RAII handle for a successor-side heartbeat thread. See
371/// [`BegunSuccessor::start_heartbeats`].
372pub struct HeartbeatGuard<'a> {
373 stop_tx: Option<mpsc::Sender<()>>,
374 thread: Option<thread::JoinHandle<()>>,
375 _borrow: PhantomData<&'a UnixStream>,
376}
377
378impl<'a> HeartbeatGuard<'a> {
379 fn start(stream: &'a UnixStream, chosen: ProtoVersion) -> Self {
380 let writer = match stream.try_clone() {
381 Ok(w) => w,
382 Err(e) => {
383 tracing::warn!(
384 error = %e,
385 "could not clone control stream for successor heartbeats; running without"
386 );
387 return Self {
388 stop_tx: None,
389 thread: None,
390 _borrow: PhantomData,
391 };
392 }
393 };
394 let (stop_tx, stop_rx) = mpsc::channel::<()>();
395 let thread = thread::spawn(move || {
396 let mut writer = writer;
397 // `recv_timeout` returns Err on timeout — "no stop yet, send
398 // another heartbeat". `Ok(())` or any other Err (sender
399 // dropped) is the stop signal.
400 while stop_rx.recv_timeout(SUCCESSOR_HEARTBEAT_INTERVAL).is_err() {
401 let msg = Message::Heartbeat {
402 ts_ms: now_unix_ms(),
403 };
404 if write_message(&mut writer, chosen, &msg).is_err() {
405 // Supervisor gone or socket broken — no point continuing.
406 return;
407 }
408 }
409 });
410 Self {
411 stop_tx: Some(stop_tx),
412 thread: Some(thread),
413 _borrow: PhantomData,
414 }
415 }
416}
417
418impl Drop for HeartbeatGuard<'_> {
419 fn drop(&mut self) {
420 if let Some(tx) = self.stop_tx.take() {
421 let _ = tx.send(());
422 }
423 if let Some(h) = self.thread.take() {
424 let _ = h.join();
425 }
426 }
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// Remove every env var `detect_role` reads.
    fn clear_handoff_env() {
        // SAFETY: env mutation is process-global and unsafe under concurrent
        // reads. Only `detect_env_branches` calls this; it runs as a single
        // function on one thread, and no other code in the test process
        // touches these handoff-specific vars.
        unsafe {
            for var in [
                ENV_HANDOFF_ROLE,
                ENV_HANDOFF_SOCK_FD,
                ENV_LISTEN_FDS,
                ENV_LISTEN_FDNAMES,
            ] {
                env::remove_var(var);
            }
        }
    }

    /// Set one handoff env var for the current test step.
    fn set_env(var: &str, value: &str) {
        // SAFETY: same single-threaded-test invariant as `clear_handoff_env`.
        unsafe { env::set_var(var, value) }
    }

    /// Run `detect_role` and return the inherited listeners of a
    /// `ColdStart`, panicking on any other outcome (`Role` has no `Debug`,
    /// so it can't be printed via `unwrap`).
    fn expect_cold_start() -> InheritedListeners {
        match detect_role() {
            Ok(Role::ColdStart { inherited }) => inherited,
            Ok(Role::Successor(_)) => panic!("expected ColdStart, got Successor"),
            Err(e) => panic!("expected ColdStart, got error: {e:?}"),
        }
    }

    // Env mutation is process-global; run env-touching tests sequentially in
    // one function to avoid races with the cargo test thread pool.
    #[test]
    fn detect_env_branches() {
        clear_handoff_env();
        assert!(expect_cold_start().is_empty());

        // Any role other than `successor` is a cold start.
        set_env(ENV_HANDOFF_ROLE, "other");
        let _ = expect_cold_start();
        clear_handoff_env();

        // A successor without a usable control FD is an error, not a silent
        // cold start.
        set_env(ENV_HANDOFF_ROLE, "successor");
        assert!(matches!(
            detect_role(),
            Err(Error::MissingEnv(var)) if var == ENV_HANDOFF_SOCK_FD
        ));
        set_env(ENV_HANDOFF_SOCK_FD, "not-a-number");
        assert!(matches!(
            detect_role(),
            Err(Error::BadEnv { var, .. }) if var == ENV_HANDOFF_SOCK_FD
        ));
        clear_handoff_env();

        // Listener vars are decoded (a missing name falls back to the
        // positional index) and consumed, so re-detection sees none. The
        // FDs are never taken, so nothing is adopted or closed here.
        set_env(ENV_LISTEN_FDS, "3");
        set_env(ENV_LISTEN_FDNAMES, "resp:http");
        let mut names = expect_cold_start().names();
        names.sort();
        assert_eq!(names, ["2", "http", "resp"]);
        assert!(env::var(ENV_LISTEN_FDS).is_err());
        assert!(env::var(ENV_LISTEN_FDNAMES).is_err());
        assert!(expect_cold_start().is_empty());

        clear_handoff_env();
    }
}