Skip to main content

handoff/
role.rs

1//! Successor detection via env vars + inherited listener handling.
2//!
3//! Spawned-as-successor processes receive their identity through env vars
4//! (systemd-style):
5//!
6//! - `HANDOFF_ROLE=successor`
7//! - `HANDOFF_SOCK_FD=<n>` — open Unix socket to supervisor
8//! - `LISTEN_FDS=<n>` — count of inherited listener FDs (starting at FD 3)
9//! - `LISTEN_FDNAMES=resp:http:…` — colon-separated logical names in FD order
10//!
11//! [`detect_role`] reads these and consumes them so an accidental double-detect
12//! gives [`Role::ColdStart`] (which is what fresh re-execs should do).
13
14// Env mutation (`env::remove_var`, `env::set_var`) is `unsafe` in Rust 2024
15// because it races with concurrent env reads in other threads; this module
16// is contracted to run before the primitive spawns its serving threads.
17// `FromRawFd` is `unsafe` because the safe wrapper assumes exclusive
18// ownership; the supervisor handed us the FD via fork+exec, so that holds.
19#![allow(unsafe_code)]
20
21use std::collections::HashMap;
22use std::env;
23use std::marker::PhantomData;
24use std::net::TcpListener;
25use std::os::fd::{FromRawFd, RawFd};
26use std::os::unix::net::UnixStream;
27use std::sync::mpsc;
28use std::thread;
29use std::time::Duration;
30
31use crate::drainable::ReadinessSnapshot;
32use crate::error::{Error, Result};
33use crate::frame::{read_message, write_message};
34use crate::protocol::{
35    Capabilities, HandoffId, Message, PROTO_MAX, PROTO_MIN, ProtoVersion, Side, short_name,
36};
37use crate::util::now_unix_ms;
38
/// How often the successor-side heartbeat thread emits a `Heartbeat` frame.
///
/// Kept in step with the incumbent's heartbeat thread. Measured against the
/// supervisor's 10s `LIVENESS_TIMEOUT`, a 2s cadence leaves a 5× margin for
/// scheduler hiccups before the supervisor would declare the successor dead.
const SUCCESSOR_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(2_000);
43
/// Env var naming the role this process was spawned in; the value
/// `successor` selects the successor path.
pub const ENV_HANDOFF_ROLE: &str = "HANDOFF_ROLE";
/// Env var carrying the FD number of the open Unix socket to the supervisor.
pub const ENV_HANDOFF_SOCK_FD: &str = "HANDOFF_SOCK_FD";
/// Env var carrying the count of inherited listener FDs.
pub const ENV_LISTEN_FDS: &str = "LISTEN_FDS";
/// Env var carrying the colon-separated logical listener names, in FD order.
pub const ENV_LISTEN_FDNAMES: &str = "LISTEN_FDNAMES";

/// First inherited listener FD; later listeners follow contiguously. The
/// value matches the systemd convention.
pub const SD_LISTEN_FDS_START: RawFd = 3;
51
52pub enum Role {
53    /// No `HANDOFF_ROLE` env var was set. This is a fresh boot — either a
54    /// supervisor's first spawn (in which case `inherited` may carry listeners)
55    /// or an unsupervised local-dev run (in which case `inherited` is empty
56    /// and the primitive binds its own listeners).
57    ColdStart { inherited: InheritedListeners },
58    /// Spawned as a successor mid-handoff. Use the embedded [`Successor`] to
59    /// drive the protocol (handshake, wait_for_begin, take_listener, announce_ready).
60    Successor(Successor),
61}
62
/// Listener FDs handed in via `LISTEN_FDS` / `LISTEN_FDNAMES`, keyed by
/// logical name. Available on both `ColdStart` (supervisor's first spawn)
/// and inside `Successor`.
///
/// Only raw descriptor numbers are stored. Dropping this struct does not
/// close descriptors that were never [`take`](Self::take)n.
#[derive(Debug, Default)]
pub struct InheritedListeners {
    /// Logical listener name → inherited descriptor number.
    listeners: HashMap<String, RawFd>,
}

impl InheritedListeners {
    /// Consume the inherited listener for `name`.
    ///
    /// Returns `None` if no such listener was passed (or it was already
    /// taken). The descriptor is wrapped as a [`TcpListener`] without any
    /// check that it really is a TCP listening socket.
    pub fn take(&mut self, name: &str) -> Option<TcpListener> {
        let fd = self.listeners.remove(name)?;
        // SAFETY: kernel inherited the FD to us via fork+exec; we own it.
        // Removing the entry first guarantees the same FD is never wrapped
        // twice through this map.
        Some(unsafe { TcpListener::from_raw_fd(fd) })
    }

    /// Names of all listeners that haven't yet been taken.
    ///
    /// The names are returned in ascending FD order — i.e. the order in
    /// which they were listed in `LISTEN_FDNAMES` — so diagnostics built
    /// from this list are deterministic rather than following `HashMap`
    /// iteration order.
    pub fn names(&self) -> Vec<String> {
        let mut by_fd: Vec<(RawFd, &String)> = self
            .listeners
            .iter()
            .map(|(name, fd)| (*fd, name))
            .collect();
        by_fd.sort_unstable_by_key(|&(fd, _)| fd);
        by_fd.into_iter().map(|(_, name)| name.clone()).collect()
    }

    /// True if no listeners were inherited (or all have been taken).
    pub fn is_empty(&self) -> bool {
        self.listeners.is_empty()
    }
}
89
90/// Successor-side state machine, encoded as three concrete types so the
91/// compiler enforces protocol ordering. Lifecycle:
92///
93/// 1. [`detect_role`] returns [`Role::Successor(Successor)`] — initial state.
94/// 2. [`Successor::handshake`] consumes self and returns [`HandshookSuccessor`].
95/// 3. [`HandshookSuccessor::wait_for_begin`] consumes self and returns
96///    [`BegunSuccessor`].
97/// 4. From [`BegunSuccessor`] the consumer takes inherited listeners,
98///    opens its state, then calls
99///    [`BegunSuccessor::announce_and_bind`] (preferred) or
100///    [`BegunSuccessor::announce_ready`].
101///
102/// Out-of-order calls don't compile: there is no path from `Successor` to
103/// `take_listener` or `announce_ready` that doesn't pass through every
104/// preceding state.
105pub struct Successor {
106    control: UnixStream,
107    inherited: InheritedListeners,
108}
109
110/// `Hello`/`HelloAck` exchanged with the supervisor; waiting for `Begin`.
111/// Created by [`Successor::handshake`].
112pub struct HandshookSuccessor {
113    control: UnixStream,
114    inherited: InheritedListeners,
115    handoff_id: HandoffId,
116    proto_version: ProtoVersion,
117}
118
119/// `Begin` received from the supervisor; the consumer may now take its
120/// inherited listeners, open state, and announce readiness. Created by
121/// [`HandshookSuccessor::wait_for_begin`].
122pub struct BegunSuccessor {
123    control: UnixStream,
124    inherited: InheritedListeners,
125    handoff_id: HandoffId,
126    proto_version: ProtoVersion,
127}
128
129/// Inspect the environment and decide whether this process is a fresh start
130/// or a successor of a running supervisor.
131///
132/// In both cases, any `LISTEN_FDS`/`LISTEN_FDNAMES` are consumed into the
133/// returned struct so the caller can take ownership of the inherited
134/// listeners. Env vars are removed so re-entry yields a clean state.
135pub fn detect_role() -> Result<Role> {
136    let inherited = read_inherited_listeners();
137    // SAFETY: `env::remove_var` races with concurrent env reads on other
138    // threads (`std::env::set_var` / `getenv` from libc). `detect_role` is
139    // contracted to run during single-threaded startup before the primitive
140    // spawns its serving threads — see the module docstring. Callers that
141    // violate that contract are responsible for the data race.
142    unsafe {
143        env::remove_var(ENV_LISTEN_FDS);
144        env::remove_var(ENV_LISTEN_FDNAMES);
145    }
146
147    match env::var(ENV_HANDOFF_ROLE) {
148        Ok(s) if s == "successor" => {}
149        _ => return Ok(Role::ColdStart { inherited }),
150    }
151
152    let sock_raw =
153        env::var(ENV_HANDOFF_SOCK_FD).map_err(|_| Error::MissingEnv(ENV_HANDOFF_SOCK_FD))?;
154    let sock_fd: RawFd = sock_raw.parse().map_err(|_| Error::BadEnv {
155        var: ENV_HANDOFF_SOCK_FD,
156        value: sock_raw,
157    })?;
158
159    // SAFETY: same single-threaded-startup invariant as the listener env
160    // removal above; clearing these vars makes a re-entry take the
161    // ColdStart branch instead of trying to re-attach to a consumed FD.
162    unsafe {
163        env::remove_var(ENV_HANDOFF_ROLE);
164        env::remove_var(ENV_HANDOFF_SOCK_FD);
165    }
166
167    // SAFETY: the supervisor handed us this FD via `fork+exec`. It's open and
168    // owned by us from here on.
169    let control = unsafe { UnixStream::from_raw_fd(sock_fd) };
170    Ok(Role::Successor(Successor { control, inherited }))
171}
172
173fn read_inherited_listeners() -> InheritedListeners {
174    let count: usize = env::var(ENV_LISTEN_FDS)
175        .ok()
176        .and_then(|s| s.parse().ok())
177        .unwrap_or(0);
178    if count == 0 {
179        return InheritedListeners::default();
180    }
181    let names: Vec<String> = env::var(ENV_LISTEN_FDNAMES)
182        .ok()
183        .map(|s| s.split(':').map(|s| s.to_string()).collect())
184        .unwrap_or_default();
185    let mut map = HashMap::with_capacity(count);
186    for i in 0..count {
187        let fd = SD_LISTEN_FDS_START + i as RawFd;
188        let name = names.get(i).cloned().unwrap_or_else(|| i.to_string());
189        map.insert(name, fd);
190    }
191    InheritedListeners { listeners: map }
192}
193
194impl Successor {
195    /// Send `Hello`, receive `HelloAck`. Consumes self and returns a
196    /// [`HandshookSuccessor`] from which the next phase can proceed. On
197    /// protocol error the underlying `UnixStream` and inherited listeners
198    /// are dropped — the caller has no usable post-error state.
199    pub fn handshake(mut self, build_id: Vec<u8>) -> Result<HandshookSuccessor> {
200        let hello = Message::Hello {
201            role: Side::Successor,
202            pid: std::process::id(),
203            build_id,
204            proto_min: PROTO_MIN,
205            proto_max: PROTO_MAX,
206            capabilities: Capabilities::default(),
207        };
208        write_message(&mut self.control, PROTO_MAX, &hello)?;
209        let (_ver, ack) = read_message(&mut self.control)?;
210        match ack {
211            Message::HelloAck {
212                proto_version_chosen,
213                handoff_id,
214            } => Ok(HandshookSuccessor {
215                control: self.control,
216                inherited: self.inherited,
217                handoff_id,
218                proto_version: proto_version_chosen,
219            }),
220            other => Err(Error::UnexpectedMessage(short_name(&other))),
221        }
222    }
223
224    /// Names of all inherited listeners. Safe to call before `handshake` for
225    /// diagnostic / sanity checks (e.g. asserting the supervisor passed the
226    /// expected listeners). Listener consumption only happens post-`Begin`
227    /// via [`BegunSuccessor::take_listener`].
228    pub fn listener_names(&self) -> Vec<String> {
229        self.inherited.names()
230    }
231}
232
233impl HandshookSuccessor {
234    /// Block until the supervisor sends `Begin`. Consumes self and returns a
235    /// [`BegunSuccessor`] on success. `Abort` from the supervisor surfaces
236    /// as [`Error::Aborted`]; `Heartbeat` frames are skipped silently. A
237    /// `Begin` with a different handoff id than the one negotiated in
238    /// [`Successor::handshake`] returns [`Error::Protocol`] — that's a
239    /// supervisor bug, not a recoverable condition.
240    pub fn wait_for_begin(mut self) -> Result<BegunSuccessor> {
241        let expected = self.handoff_id;
242        loop {
243            let (_ver, msg) = read_message(&mut self.control)?;
244            match msg {
245                Message::Begin { handoff_id } if handoff_id == expected => {
246                    return Ok(BegunSuccessor {
247                        control: self.control,
248                        inherited: self.inherited,
249                        handoff_id,
250                        proto_version: self.proto_version,
251                    });
252                }
253                Message::Begin { handoff_id } => {
254                    return Err(Error::Protocol(format!(
255                        "Begin handoff_id {handoff_id} does not match \
256                         handshake id {expected}"
257                    )));
258                }
259                Message::Abort { reason, .. } => return Err(Error::Aborted(reason)),
260                Message::Heartbeat { .. } => continue,
261                other => return Err(Error::UnexpectedMessage(short_name(&other))),
262            }
263        }
264    }
265
266    /// Names of all inherited listeners. Diagnostic accessor.
267    pub fn listener_names(&self) -> Vec<String> {
268        self.inherited.names()
269    }
270
271    /// The handoff id negotiated in [`Successor::handshake`].
272    pub fn handoff_id(&self) -> HandoffId {
273        self.handoff_id
274    }
275}
276
277impl BegunSuccessor {
278    /// Consume the inherited listener for `name`. Returns `None` if no such
279    /// listener was passed (or has already been taken).
280    pub fn take_listener(&mut self, name: &str) -> Option<TcpListener> {
281        self.inherited.take(name)
282    }
283
284    /// Names of inherited listeners that haven't yet been taken.
285    pub fn listener_names(&self) -> Vec<String> {
286        self.inherited.names()
287    }
288
289    /// The handoff id negotiated in [`Successor::handshake`].
290    pub fn handoff_id(&self) -> HandoffId {
291        self.handoff_id
292    }
293
294    /// Send `Ready`. Consumes self because once the supervisor knows we're
295    /// ready, the main serving loop takes over and this object's job is done.
296    ///
297    /// # Caveat: don't bind the control socket immediately after this
298    ///
299    /// After `Ready` is sent there is a brief window during which the
300    /// supervisor still has to deliver `Commit` to the prior incumbent and
301    /// that incumbent still has to exit. During that window the prior
302    /// incumbent is the authoritative owner of the control-socket path;
303    /// rebinding from the successor here would unlink its path-binding and
304    /// break the abort path if the successor then crashes.
305    ///
306    /// Prefer [`announce_and_bind`](Self::announce_and_bind) — it combines
307    /// `Ready` + bind into one call and is the only path that orders them
308    /// correctly by construction. Use this lower-level entry point only
309    /// when you genuinely need to delay binding (e.g. for additional
310    /// post-`Ready` setup that does not require the control socket).
311    pub fn announce_ready(mut self, snapshot: ReadinessSnapshot) -> Result<()> {
312        let ready = Message::Ready {
313            handoff_id: self.handoff_id,
314            listening_on: snapshot.listening_on,
315            healthz_ok: snapshot.healthz_ok,
316            advertised_revision_per_shard: snapshot.advertised_revision_per_shard,
317        };
318        write_message(&mut self.control, self.proto_version, &ready)?;
319        Ok(())
320    }
321
322    /// Send `Ready` to the supervisor, then bind this process as the new
323    /// incumbent on `socket_path`. The two operations are combined to
324    /// enforce ordering: by the time the bind runs, the supervisor has
325    /// observed `Ready` and is about to (or has just) committed the prior
326    /// incumbent, so the path-binding takeover is safe.
327    ///
328    /// This is the safe path for successor processes; cold-start callers
329    /// use [`crate::Incumbent::bind_cold_start`] directly.
330    pub fn announce_and_bind(
331        self,
332        snapshot: ReadinessSnapshot,
333        socket_path: &std::path::Path,
334        lock: crate::DataDirLock,
335    ) -> Result<crate::Incumbent> {
336        self.announce_ready(snapshot)?;
337        crate::Incumbent::bind_after_ready(socket_path, lock)
338    }
339
340    /// Spawn a background thread that emits `Heartbeat` frames on the
341    /// control socket every ~2s. The returned guard stops + joins the
342    /// thread on drop. Use this to keep the supervisor's per-recv
343    /// `LIVENESS_TIMEOUT` (10s) from tripping while the successor is
344    /// doing slow synchronous init work between `wait_for_begin` and
345    /// `announce_and_bind` — DB pool warm-up, state rebuild, TLS load,
346    /// etc. Mirrors the incumbent's existing heartbeat thread which
347    /// covers `drain` / `seal`.
348    ///
349    /// # Ordering contract
350    ///
351    /// **The guard must be dropped before `announce_ready` /
352    /// `announce_and_bind`.** While the guard is live, the heartbeat
353    /// thread is the sole writer to the control socket; interleaving
354    /// the main thread's `Ready` frame with a heartbeat would corrupt
355    /// the wire. The borrow of `&self` enforces this: the compiler
356    /// rejects any path that calls a consuming method while the guard
357    /// is still alive.
358    ///
359    /// # Failure mode
360    ///
361    /// On `try_clone` failure (rare; FD table exhausted) the returned
362    /// guard is inert — no thread spawned — and a warning is logged.
363    /// The supervisor's liveness timer then bounds init in wall-clock
364    /// terms exactly as before this API existed.
365    pub fn start_heartbeats(&self) -> HeartbeatGuard<'_> {
366        HeartbeatGuard::start(&self.control, self.proto_version)
367    }
368}
369
/// RAII handle for a successor-side heartbeat thread. See
/// [`BegunSuccessor::start_heartbeats`].
///
/// Dropping the guard signals the thread to stop and joins it, so the guard
/// has to be kept alive (bound to a variable) for as long as heartbeats are
/// wanted. An unused guard is dropped at the end of its statement, which
/// would stop heartbeats immediately — hence `#[must_use]`.
///
/// An inert guard (both options `None`) is produced when no thread could be
/// started.
#[must_use = "heartbeats stop as soon as the guard is dropped; keep it bound during init"]
pub struct HeartbeatGuard<'a> {
    /// Sender used to tell the heartbeat thread to stop; `None` when inert
    /// or already signalled.
    stop_tx: Option<mpsc::Sender<()>>,
    /// Join handle of the heartbeat thread; `None` when inert or already
    /// joined.
    thread: Option<thread::JoinHandle<()>>,
    /// Ties the guard's lifetime to the borrowed control stream so the
    /// owner cannot be consumed while the guard is alive.
    _borrow: PhantomData<&'a UnixStream>,
}
377
378impl<'a> HeartbeatGuard<'a> {
379    fn start(stream: &'a UnixStream, chosen: ProtoVersion) -> Self {
380        let writer = match stream.try_clone() {
381            Ok(w) => w,
382            Err(e) => {
383                tracing::warn!(
384                    error = %e,
385                    "could not clone control stream for successor heartbeats; running without"
386                );
387                return Self {
388                    stop_tx: None,
389                    thread: None,
390                    _borrow: PhantomData,
391                };
392            }
393        };
394        let (stop_tx, stop_rx) = mpsc::channel::<()>();
395        let thread = thread::spawn(move || {
396            let mut writer = writer;
397            // `recv_timeout` returns Err on timeout — "no stop yet, send
398            // another heartbeat". `Ok(())` or any other Err (sender
399            // dropped) is the stop signal.
400            while stop_rx.recv_timeout(SUCCESSOR_HEARTBEAT_INTERVAL).is_err() {
401                let msg = Message::Heartbeat {
402                    ts_ms: now_unix_ms(),
403                };
404                if write_message(&mut writer, chosen, &msg).is_err() {
405                    // Supervisor gone or socket broken — no point continuing.
406                    return;
407                }
408            }
409        });
410        Self {
411            stop_tx: Some(stop_tx),
412            thread: Some(thread),
413            _borrow: PhantomData,
414        }
415    }
416}
417
418impl Drop for HeartbeatGuard<'_> {
419    fn drop(&mut self) {
420        if let Some(tx) = self.stop_tx.take() {
421            let _ = tx.send(());
422        }
423        if let Some(h) = self.thread.take() {
424            let _ = h.join();
425        }
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the two cold-start branches of `detect_role`: no handoff
    /// variables at all, and a `HANDOFF_ROLE` value other than `successor`.
    ///
    /// Env mutation is process-global, so every env-touching check lives in
    /// this one function and runs sequentially, avoiding races with the
    /// cargo test thread pool.
    #[test]
    fn detect_env_branches() {
        let handoff_vars = [
            ENV_HANDOFF_ROLE,
            ENV_HANDOFF_SOCK_FD,
            ENV_LISTEN_FDS,
            ENV_LISTEN_FDNAMES,
        ];
        for var in handoff_vars {
            // SAFETY: env mutation is process-global and unsafe under
            // concurrent reads. This whole test runs as a single function
            // on one thread; no other code in the test process touches
            // these handoff-specific vars, so there is no concurrent
            // reader to race.
            unsafe { env::remove_var(var) };
        }
        let with_clean_env = detect_role().unwrap();
        assert!(matches!(with_clean_env, Role::ColdStart { .. }));

        // SAFETY: same single-threaded-test invariant as above.
        unsafe { env::set_var(ENV_HANDOFF_ROLE, "other") };
        let with_foreign_role = detect_role().unwrap();
        assert!(matches!(with_foreign_role, Role::ColdStart { .. }));

        // The cold-start path leaves `HANDOFF_ROLE` untouched; clean it up.
        // SAFETY: same single-threaded-test invariant as above.
        unsafe { env::remove_var(ENV_HANDOFF_ROLE) };
    }
}