Skip to main content

handoff/
supervisor.rs

1//! Supervisor-side orchestration: spawn the successor, drive the protocol,
2//! handle abort/resume.
3//!
4//! This module is sync. The `handoff-supervisor` reference binary wraps it in
5//! tokio for the rest of its orchestration; primitive embedders (guest-agent,
6//! beyond-pg) can run `perform_handoff` from a worker thread.
7
8// `FromRawFd` on the socketpair endpoints in `make_socketpair` is `unsafe`
9// because the safe wrapper assumes exclusive ownership of the FD; the
10// `socketpair(2)` syscall has just produced both FDs and handed us
11// ownership, so the invariant holds.
12#![allow(unsafe_code)]
13
14use std::io::ErrorKind;
15use std::os::fd::{AsRawFd, IntoRawFd, RawFd};
16use std::os::unix::net::UnixStream;
17use std::path::{Path, PathBuf};
18use std::process::{Child, Command, Stdio};
19use std::sync::Mutex;
20use std::time::{Duration, Instant};
21
22use nix::sys::socket::{AddressFamily, SockFlag, SockType, socketpair};
23
24use crate::crash::points;
25use crate::crash_here;
26use crate::error::{Error, Result};
27use crate::fd::pass_listener_fds_on_spawn;
28use crate::frame::{read_message, write_message};
29use crate::metrics::events;
30use crate::protocol::{
31    HandoffId, Message, PROTO_MAX, PROTO_MIN, ProtoVersion, Side, negotiate_version, short_name,
32};
33use crate::state::{Phase, StateJournal};
34use crate::util::now_unix_ms;
35
/// Lower bound for every per-phase read timeout.
///
/// A read budget below this leaves no realistic time to receive even a
/// single frame, so a shorter value almost certainly indicates a
/// programming error rather than a deliberate choice.
const MIN_READ_TIMEOUT: Duration = Duration::from_millis(100);
/// Longest the supervisor waits on a single socket read before treating
/// the peer as dead.
///
/// Every frame that arrives — `Heartbeat` included — restarts this clock.
/// The incumbent sends a heartbeat every 2s while blocked in
/// `drain`/`seal`, so 10s leaves a 5× margin for scheduler jitter before
/// the supervisor wrongly concludes the peer is gone.
const LIVENESS_TIMEOUT: Duration = Duration::from_secs(10);
/// Budget for the first `Hello` after connecting to a peer.
///
/// Sending `Hello` is the peer's very first action, so it should arrive
/// almost immediately. The generous allowance still guarantees that a peer
/// stuck before writing cannot hang the supervisor forever.
const HELLO_READ_TIMEOUT: Duration = Duration::from_secs(5);
/// Extra time the supervisor allows beyond a peer-side deadline before
/// calling a read timed out.
///
/// The supervisor's read deadline starts at `T_s`, the moment it sends a
/// request. The peer only starts its own clock once the request has been
/// delivered and decoded (`T_s + δ_net`), and its reply must then cross
/// `δ_net + δ_serialize` to come back. With no slack, the supervisor's
/// deadline would fire `2·δ_net + δ_serialize` before a peer that used its
/// whole budget could get its reply onto the wire. The supervisor would
/// then abandon a reply that is already in flight.
///
/// One second is roughly two orders of magnitude larger than any plausible
/// Unix-socket round trip plus frame encoding. It is a conservative bound on
/// that in-flight window, not a tuning knob.
const WIRE_SLACK: Duration = Duration::from_millis(1_000);
65
/// Drives handoffs for a single supervised primitive instance.
pub struct Supervisor {
    /// Unix socket on which the current incumbent accepts supervisor
    /// connections.
    socket_path: PathBuf,
    /// `(logical name, raw fd)` pairs that every successor inherits. A `Vec`
    /// rather than a map so that registration order, and therefore the
    /// child-side FD numbering, is identical on every spawn.
    listener_fds: Vec<(String, RawFd)>,
    /// Where in-flight handoff state is persisted; `None` disables the
    /// journal.
    journal_path: Option<PathBuf>,
    /// Opaque build identifier supplied by the embedder.
    build_id: Vec<u8>,
    /// Held for the full duration of a handoff. Correctness requires at most
    /// one swap in flight per primitive, so a second concurrent
    /// `perform_handoff` against the same incumbent must be rejected.
    in_flight: Mutex<()>,
}
79
/// How to launch the successor (N), plus the time budgets the supervisor
/// enforces while handing off to it.
#[derive(Debug, Clone)]
pub struct SpawnSpec {
    /// Path of the successor executable.
    pub binary: PathBuf,
    /// Command-line arguments passed to `binary`.
    pub args: Vec<String>,
    /// Additional environment variables for the child. They are set
    /// alongside the handoff envelope variables, not instead of them.
    pub env: Vec<(String, String)>,
    /// Hard wall-clock limit on the whole handoff, measured from the start
    /// of `perform_handoff` until `Commit` is sent.
    ///
    /// Size it well above the consumer's p99 `Drainable::drain` +
    /// `Drainable::seal` under load. A hook that is slow but still making
    /// progress is never interrupted by the library, because incumbent
    /// heartbeats keep the liveness clock fresh. Once this cap passes,
    /// however, the handoff aborts regardless of progress.
    ///
    /// The `SealComplete` and `Ready` reads may run up to `WIRE_SLACK` (1 s)
    /// beyond this cap, to collect replies that were already on the wire at
    /// the deadline. Total wall time can therefore exceed `deadline` by that
    /// much.
    ///
    /// Recommended value: `p99(drain) + p99(seal) + 30s`.
    /// Default: 5 minutes.
    pub deadline: Duration,
    /// Wall-clock limit specific to the `drain` phase.
    ///
    /// Useful when the consumer's `drain` has a known upper bound (e.g. a
    /// per-connection drain timeout) and a tighter cap than `deadline` is
    /// wanted. The supervisor's `Drained` read may run up to `WIRE_SLACK`
    /// (1 s) past this cap to collect an in-flight reply.
    /// Default: 60 seconds.
    pub drain_grace: Duration,
}

impl SpawnSpec {
    /// Overall handoff cap used by [`SpawnSpec::new`].
    const DEFAULT_DEADLINE: Duration = Duration::from_secs(5 * 60);
    /// Drain-phase cap used by [`SpawnSpec::new`].
    const DEFAULT_DRAIN_GRACE: Duration = Duration::from_secs(60);

    /// Create a spec that launches `binary` with the recommended defaults.
    ///
    /// The defaults are no `args`, no extra `env`, a 5-minute `deadline`,
    /// and a 60-second `drain_grace`. Adjust fields on the returned value,
    /// or build the struct literally, when other values are needed.
    ///
    /// `binary` has no default because no fallback would be useful: an
    /// empty path would only surface later as a confusing OS error from
    /// `Command::spawn`.
    pub fn new(binary: impl Into<PathBuf>) -> Self {
        let binary: PathBuf = binary.into();
        Self {
            binary,
            args: vec![],
            env: vec![],
            deadline: Self::DEFAULT_DEADLINE,
            drain_grace: Self::DEFAULT_DRAIN_GRACE,
        }
    }
}
128
129#[derive(Debug)]
130pub struct HandoffOutcome {
131    pub handoff_id: HandoffId,
132    pub committed: bool,
133    pub abort_reason: Option<String>,
134    /// On a committed handoff, the `Child` for the new primitive (N). The
135    /// caller owns it from here — the supervisor relinquishes lifecycle
136    /// tracking. `None` on every non-committed outcome (the child was
137    /// killed and reaped before this struct was constructed).
138    pub child: Option<Child>,
139}
140
141/// Kills + reaps the wrapped child on drop unless `disarm()` was called.
142/// Ensures we don't leak a spawned successor if `perform_handoff` returns
143/// via an early `?` after the spawn. The pid is cached at construction so
144/// [`ChildGuard::id`] never panics — it remains readable after the inner
145/// `Child` has been taken by `disarm` or the drop path.
146struct ChildGuard {
147    child: Option<Child>,
148    pid: u32,
149}
150
151impl ChildGuard {
152    fn new(child: Child) -> Self {
153        let pid = child.id();
154        Self {
155            child: Some(child),
156            pid,
157        }
158    }
159
160    fn id(&self) -> u32 {
161        self.pid
162    }
163
164    /// Take the child without killing it. Used on the commit path: the new
165    /// primitive is now legitimately running and we hand it to the caller.
166    fn disarm(mut self) -> Child {
167        // `new` always populates `child` and `disarm` consumes `self` by
168        // value, so this take cannot observe `None`. Treat as an invariant.
169        self.child
170            .take()
171            .expect("BUG: ChildGuard inner Child missing — constructor invariant violated")
172    }
173
174    /// Kill + reap explicitly so the caller can log the result.
175    fn kill_and_reap(mut self) {
176        if let Some(mut c) = self.child.take() {
177            let _ = c.kill();
178            let _ = c.wait();
179        }
180    }
181}
182
183impl Drop for ChildGuard {
184    fn drop(&mut self) {
185        if let Some(mut c) = self.child.take() {
186            tracing::warn!(
187                pid = self.pid,
188                "killing leaked successor child on guard drop"
189            );
190            let _ = c.kill();
191            let _ = c.wait();
192        }
193    }
194}
195
196impl Supervisor {
197    pub fn new(socket_path: &Path) -> Result<Self> {
198        Ok(Self {
199            socket_path: socket_path.to_path_buf(),
200            listener_fds: Vec::new(),
201            journal_path: None,
202            build_id: Vec::new(),
203            in_flight: Mutex::new(()),
204        })
205    }
206
207    /// Register an inherited listener. The supervisor must keep the underlying
208    /// `TcpListener` alive elsewhere; this just records the raw FD and the
209    /// name to advertise via `LISTEN_FDNAMES`. Consuming-builder shape so it
210    /// composes with `with_journal` / `with_build_id`:
211    ///
212    /// ```ignore
213    /// let sup = Supervisor::new(path)?
214    ///     .with_listener("http", fd)
215    ///     .with_journal(journal_path);
216    /// ```
217    pub fn with_listener(mut self, name: impl Into<String>, fd: RawFd) -> Self {
218        self.listener_fds.push((name.into(), fd));
219        self
220    }
221
222    pub fn with_journal(mut self, path: PathBuf) -> Self {
223        self.journal_path = Some(path);
224        self
225    }
226
227    pub fn with_build_id(mut self, build_id: Vec<u8>) -> Self {
228        self.build_id = build_id;
229        self
230    }
231
232    pub fn perform_handoff(&self, spec: SpawnSpec) -> Result<HandoffOutcome> {
233        let _in_flight = self
234            .in_flight
235            .try_lock()
236            .map_err(|_| Error::HandoffInProgress)?;
237
238        let handoff_id = HandoffId::new();
239        let started_instant = Instant::now();
240        let started_unix_ms = now_unix_ms();
241        let total_deadline_at = started_instant + spec.deadline;
242
243        // 1. Connect to O.
244        let mut o_stream = UnixStream::connect(&self.socket_path)?;
245        let chosen_o =
246            self.exchange_hello_as_supervisor(&mut o_stream, handoff_id, Side::Incumbent, None)?;
247        crash_here!(points::S_AFTER_O_HELLO);
248
249        // 2. Create a socketpair for N's control channel.
250        let (s_end, n_end) = make_socketpair()?;
251        let n_end_raw = n_end.as_raw_fd();
252
253        // 3. Spawn N. We keep s_end; n_end becomes the child's HANDOFF_SOCK_FD.
254        let child = self.spawn_successor(&spec, n_end_raw)?;
255        let child_guard = ChildGuard::new(child);
256        let successor_pid = child_guard.id();
257        // Drop our parent-side copy of n_end now that the child has its own
258        // duplicate. The kernel keeps the socket alive via the child's FD.
259        drop(n_end);
260        crash_here!(points::S_AFTER_SPAWN_SUCCESSOR);
261
262        let mut n_stream = s_end;
263        // 4. Hello/HelloAck with N. Verify the child's announced PID matches
264        // the one we spawned.
265        let chosen_n = self.exchange_hello_as_supervisor(
266            &mut n_stream,
267            handoff_id,
268            Side::Successor,
269            Some(successor_pid),
270        )?;
271        crash_here!(points::S_AFTER_N_HELLO);
272
273        self.journal_set(
274            handoff_id,
275            Phase::Negotiating,
276            successor_pid,
277            started_unix_ms,
278        )?;
279
280        // 5. PrepareHandoff → Drained.
281        let prepare_at = Instant::now();
282        // Send the budget *remaining*, not the raw configured duration: the
283        // hello exchange and successor spawn have already burned wall-clock
284        // time off the overall deadline, and we want O's view of the
285        // deadline to track ours. Floored at MIN_READ_TIMEOUT so O still
286        // has a usable budget if we cut it very close.
287        let deadline_ms = remaining_until(total_deadline_at).as_millis() as u64;
288        let drain_grace_ms = spec.drain_grace.as_millis() as u64;
289        tracing::info!(
290            target: events::PREPARE,
291            %handoff_id, successor_pid,
292            drain_grace_ms,
293            deadline_ms,
294            "prepare handoff"
295        );
296        write_message(
297            &mut o_stream,
298            chosen_o,
299            &Message::PrepareHandoff {
300                handoff_id,
301                successor_pid,
302                deadline_ms,
303                drain_grace_ms,
304            },
305        )?;
306        crash_here!(points::S_AFTER_PREPARE_SENT);
307        // `drain_grace` is the budget we hand O for the entire drain
308        // phase: O may consume up to that long inside `Drainable::drain`
309        // before producing a `Drained` frame. Our read deadline must
310        // therefore exceed `drain_grace` by `WIRE_SLACK` — O's clock for
311        // the grace starts when it deserializes `PrepareHandoff`
312        // (T_s + δ_net), and the `Drained` it sends after running to that
313        // limit still has to traverse δ_net + δ_serialize on the way back.
314        // Per-recv liveness handles peer-dead detection inside
315        // `read_until`; the slack is specifically about not aborting a
316        // reply that's already on the wire.
317        let drained_msg = read_until(
318            &mut o_stream,
319            spec.drain_grace + WIRE_SLACK,
320            "Drained",
321            |m| matches!(m, Message::Drained { .. }),
322        )?;
323        let (drained_open_conns, drained_accept_closed) = match &drained_msg {
324            Message::Drained {
325                open_conns_remaining,
326                accept_closed,
327            } => (*open_conns_remaining, *accept_closed),
328            _ => unreachable!("read_until predicate restricts variant"),
329        };
330        tracing::info!(
331            target: events::DRAINED,
332            %handoff_id,
333            open_conns_remaining = drained_open_conns,
334            accept_closed = drained_accept_closed,
335            drain_seconds = prepare_at.elapsed().as_secs_f64(),
336            "drain complete"
337        );
338        crash_here!(points::S_AFTER_DRAINED_RECV);
339        self.journal_set(handoff_id, Phase::Draining, successor_pid, started_unix_ms)?;
340
341        // 6. SealRequest → SealComplete (or SealFailed).
342        let seal_at = Instant::now();
343        tracing::info!(target: events::SEAL, %handoff_id, "seal request");
344        write_message(
345            &mut o_stream,
346            chosen_o,
347            &Message::SealRequest { handoff_id },
348        )?;
349        crash_here!(points::S_AFTER_SEAL_REQUEST_SENT);
350        // Seal-wait loop. Same two-tier timeout as `read_until`: per-recv
351        // capped at LIVENESS_TIMEOUT (heartbeats reset it), wall-clock
352        // capped by `total_deadline_at + WIRE_SLACK`. The slack covers a
353        // `SealComplete` that's already on the wire when the overall
354        // deadline expires — `seal()` runs without an internal deadline,
355        // so it can land its reply arbitrarily close to the cap.
356        // `SealProgress` and `Heartbeat` frames are both progress signals.
357        let seal_read_deadline = total_deadline_at + WIRE_SLACK;
358        let seal_outcome: std::result::Result<(), String> = loop {
359            let now = Instant::now();
360            if now >= seal_read_deadline {
361                let _ = o_stream.set_read_timeout(None);
362                send_best_effort_abort(
363                    &mut n_stream,
364                    chosen_n,
365                    handoff_id,
366                    "seal phase timed out".into(),
367                );
368                child_guard.kill_and_reap();
369                self.journal_clear();
370                return Err(Error::Timeout("SealComplete"));
371            }
372            let remaining = seal_read_deadline - now;
373            let recv_timeout = LIVENESS_TIMEOUT.min(remaining).max(MIN_READ_TIMEOUT);
374            o_stream.set_read_timeout(Some(recv_timeout))?;
375            match read_message(&mut o_stream) {
376                Ok((_, Message::SealProgress { .. })) => continue,
377                Ok((_, Message::Heartbeat { .. })) => continue,
378                Ok((_, Message::SealComplete { handoff_id: id, .. })) if id == handoff_id => {
379                    break Ok(());
380                }
381                Ok((
382                    _,
383                    Message::SealFailed {
384                        handoff_id: id,
385                        error,
386                        ..
387                    },
388                )) if id == handoff_id => break Err(error),
389                Ok((_, other)) => {
390                    let _ = o_stream.set_read_timeout(None);
391                    return Err(Error::UnexpectedMessage(short_name(&other)));
392                }
393                Err(Error::Io(e)) if is_timeout(&e) => {
394                    // No frame for `recv_timeout` — peer has gone silent
395                    // for longer than LIVENESS_TIMEOUT (or we're at the
396                    // overall wall-clock cap; the next loop iteration
397                    // detects that explicitly). Treat as peer-dead and
398                    // abort.
399                    let _ = o_stream.set_read_timeout(None);
400                    send_best_effort_abort(
401                        &mut n_stream,
402                        chosen_n,
403                        handoff_id,
404                        "incumbent unresponsive during seal".into(),
405                    );
406                    child_guard.kill_and_reap();
407                    self.journal_clear();
408                    return Err(Error::Timeout("SealComplete"));
409                }
410                Err(e) => {
411                    let _ = o_stream.set_read_timeout(None);
412                    return Err(e);
413                }
414            }
415        };
416        let _ = o_stream.set_read_timeout(None);
417
418        if let Err(error) = seal_outcome {
419            // O retains its writer state. Abort N and report failure.
420            tracing::warn!(
421                target: events::ABORT,
422                %handoff_id, error = %error, "seal failed; aborting handoff"
423            );
424            send_best_effort_abort(
425                &mut n_stream,
426                chosen_n,
427                handoff_id,
428                format!("seal failed: {error}"),
429            );
430            child_guard.kill_and_reap();
431            self.journal_clear();
432            return Ok(HandoffOutcome {
433                handoff_id,
434                committed: false,
435                abort_reason: Some(format!("seal failed: {error}")),
436                child: None,
437            });
438        }
439        tracing::info!(
440            target: events::SEAL_COMPLETE,
441            %handoff_id,
442            seal_seconds = seal_at.elapsed().as_secs_f64(),
443            "seal complete; flock released by O"
444        );
445        crash_here!(points::S_AFTER_SEAL_COMPLETE_RECV);
446        self.journal_set(handoff_id, Phase::Sealing, successor_pid, started_unix_ms)?;
447
448        // 7. Begin → Ready (deadline-bounded).
449        //
450        // The Begin write and the Ready read are funneled through one
451        // `Result<Message>` so a failure at either step lands in the same
452        // abort/resume flow below — sending `ResumeAfterAbort` to O is the
453        // correct response in both cases (N is unreachable; O has sealed
454        // and must be told to keep serving).
455        let begin_at = Instant::now();
456        let ready_result =
457            match write_message(&mut n_stream, chosen_n, &Message::Begin { handoff_id }) {
458                Ok(()) => {
459                    crash_here!(points::S_AFTER_BEGIN_SENT);
460                    self.journal_set(
461                        handoff_id,
462                        Phase::AwaitingReady,
463                        successor_pid,
464                        started_unix_ms,
465                    )?;
466                    // N has no internal deadline for `announce_and_bind`,
467                    // so `Ready` can be in flight at the moment
468                    // `total_deadline_at` elapses — give the read
469                    // `WIRE_SLACK` past that cap for the same reason the
470                    // drain and seal reads do.
471                    let ready_timeout = remaining_until(total_deadline_at) + WIRE_SLACK;
472                    read_until(&mut n_stream, ready_timeout, "Ready", |m| {
473                        matches!(m, Message::Ready { .. })
474                    })
475                }
476                Err(e) => Err(e),
477            };
478
479        // The `Ready` match arm pulls the handoff_id apart so a mismatched id
480        // produces a precise error rather than the misleading
481        // "expected Ready, got Ready" string the generic `other` arm would
482        // emit when `short_name` was called on a `Message::Ready` with the
483        // wrong id.
484        let ready_result = match ready_result {
485            Ok(Message::Ready { handoff_id: id, .. }) if id != handoff_id => Err(Error::Protocol(
486                format!("Ready carries wrong handoff_id: got {id}, expected {handoff_id}"),
487            )),
488            other => other,
489        };
490
491        match ready_result {
492            Ok(Message::Ready {
493                handoff_id: id,
494                listening_on,
495                healthz_ok,
496                advertised_revision_per_shard,
497            }) if id == handoff_id => {
498                tracing::info!(
499                    target: events::READY,
500                    %handoff_id,
501                    healthz_ok,
502                    listeners = ?listening_on,
503                    advertised_revisions = ?advertised_revision_per_shard,
504                    begin_to_ready_seconds = begin_at.elapsed().as_secs_f64(),
505                    "successor ready"
506                );
507                crash_here!(points::S_AFTER_READY_RECV);
508                // N has acknowledged readiness — N has acquired the flock
509                // and (when using `announce_and_bind`) bound the control
510                // socket, so N is the authoritative new incumbent from
511                // this point on regardless of what happens with O.
512                //
513                // Disarm the guard *before* the Commit write so a dead-O
514                // write failure does not propagate via `?` and let the
515                // guard's Drop kill the legitimate new incumbent. Any
516                // I/O after this point is best-effort cleanup of O.
517                let child = child_guard.disarm();
518
519                // 8. Commit O.
520                tracing::info!(
521                    target: events::COMMIT,
522                    %handoff_id,
523                    total_seconds = started_instant.elapsed().as_secs_f64(),
524                    "commit"
525                );
526                if let Err(e) =
527                    write_message(&mut o_stream, chosen_o, &Message::Commit { handoff_id })
528                {
529                    tracing::warn!(
530                        %handoff_id, error = %e,
531                        "failed to send Commit to incumbent; O may have crashed — \
532                         N is the new incumbent regardless, handoff is committed"
533                    );
534                }
535                crash_here!(points::S_AFTER_COMMIT_SENT);
536                // Journal updates are best-effort here: the handoff is
537                // committed and N is serving. A failure to journal
538                // `Committed` means the next supervisor sees `AwaitingReady`
539                // and runs its standard recovery — also fine.
540                if let Err(e) =
541                    self.journal_set(handoff_id, Phase::Committed, successor_pid, started_unix_ms)
542                {
543                    tracing::warn!(%handoff_id, error = %e, "journal Committed failed");
544                }
545                self.journal_clear();
546                crash_here!(points::S_AFTER_JOURNAL_CLEAR);
547                Ok(HandoffOutcome {
548                    handoff_id,
549                    committed: true,
550                    abort_reason: None,
551                    child: Some(child),
552                })
553            }
554            other => {
555                let reason = match &other {
556                    Ok(m) => format!("expected Ready, got {}", short_name(m)),
557                    Err(Error::Timeout(s)) => format!("ready phase timed out waiting for {s}"),
558                    Err(e) => format!("ready read failed: {e}"),
559                };
560                tracing::warn!(
561                    target: events::ABORT,
562                    %handoff_id, reason, "aborting handoff before commit"
563                );
564                // Abort N, resume O.
565                send_best_effort_abort(&mut n_stream, chosen_n, handoff_id, reason.clone());
566                child_guard.kill_and_reap();
567                write_message(
568                    &mut o_stream,
569                    chosen_o,
570                    &Message::ResumeAfterAbort { handoff_id },
571                )?;
572                tracing::info!(
573                    target: events::RESUME,
574                    %handoff_id, "sent ResumeAfterAbort to O"
575                );
576                self.journal_set(
577                    handoff_id,
578                    Phase::ResumingAfterAbort,
579                    successor_pid,
580                    started_unix_ms,
581                )?;
582                self.journal_clear();
583                Ok(HandoffOutcome {
584                    handoff_id,
585                    committed: false,
586                    abort_reason: Some(reason),
587                    child: None,
588                })
589            }
590        }
591    }
592
593    /// Read any persisted in-flight handoff state and clear it. Call once
594    /// before the first `perform_handoff` after a supervisor restart.
595    ///
596    /// The current incumbent auto-recovers from disconnect (sealed → re-acquire
597    /// flock + resume; drained → resume), so the supervisor's restart-time
598    /// responsibility is bounded: confirm the incumbent is reachable and
599    /// clear the journal. Returns the persisted state (for logging) or `None`
600    /// if no journal exists.
601    pub fn resume_from_journal(&self) -> Result<Option<StateJournal>> {
602        let Some(path) = self.journal_path.as_deref() else {
603            return Ok(None);
604        };
605        let Some(journal) = StateJournal::read(path)? else {
606            return Ok(None);
607        };
608        tracing::warn!(
609            handoff_id = %journal.handoff_id,
610            phase = ?journal.phase,
611            "found prior handoff state on disk; verifying incumbent then clearing"
612        );
613
614        // Best-effort liveness probe of the incumbent. The incumbent runs its
615        // own EOF-disconnect recovery, so this just confirms we can reach it.
616        match UnixStream::connect(&self.socket_path) {
617            Ok(mut stream) => {
618                // Drain the incumbent's Hello frame so the new session is
619                // clean; then drop the connection — incumbent observes EOF
620                // and its session-close path runs. A read error here is not
621                // fatal (the EOF on drop still triggers the incumbent's
622                // session-close path) but we log it at debug so a corrupted
623                // Hello can be correlated post-mortem.
624                let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
625                if let Err(e) = read_message(&mut stream) {
626                    tracing::debug!(
627                        error = %e,
628                        "incumbent Hello read failed during journal resume probe; \
629                         continuing — EOF on drop will reset incumbent session"
630                    );
631                }
632            }
633            Err(e) => {
634                tracing::warn!(error = %e, "incumbent unreachable during journal resume");
635            }
636        }
637
638        StateJournal::delete(path)?;
639        Ok(Some(journal))
640    }
641
642    /// Run the Hello/HelloAck exchange where we are the supervisor: receive
643    /// the peer's Hello, send a HelloAck with our chosen version. If
644    /// `expected_pid` is set, the peer's announced pid must match.
645    fn exchange_hello_as_supervisor(
646        &self,
647        stream: &mut UnixStream,
648        handoff_id: HandoffId,
649        expected_role: Side,
650        expected_pid: Option<u32>,
651    ) -> Result<ProtoVersion> {
652        // Bound the initial Hello so a peer that accepted the connection but
653        // hangs before writing can't block `perform_handoff` indefinitely.
654        // Cleared after the read regardless of outcome.
655        stream.set_read_timeout(Some(HELLO_READ_TIMEOUT))?;
656        let read_result = read_message(stream);
657        let _ = stream.set_read_timeout(None);
658        let (_v, peer_hello) = match read_result {
659            Ok(x) => x,
660            Err(Error::Io(e)) if is_timeout(&e) => return Err(Error::Timeout("peer Hello")),
661            Err(e) => return Err(e),
662        };
663        let (their_role, their_pid, their_min, their_max) = match peer_hello {
664            Message::Hello {
665                role,
666                pid,
667                proto_min,
668                proto_max,
669                ..
670            } => (role, pid, proto_min, proto_max),
671            other => return Err(Error::UnexpectedMessage(short_name(&other))),
672        };
673        if their_role != expected_role {
674            return Err(Error::Protocol(format!(
675                "peer announced role {:?}, expected {:?}",
676                their_role, expected_role
677            )));
678        }
679        if let Some(expected) = expected_pid
680            && their_pid != expected
681        {
682            return Err(Error::PidMismatch {
683                expected,
684                announced: their_pid,
685            });
686        }
687        let chosen = negotiate_version(PROTO_MIN, PROTO_MAX, their_min, their_max)?;
688        write_message(
689            stream,
690            chosen,
691            &Message::HelloAck {
692                proto_version_chosen: chosen,
693                handoff_id,
694            },
695        )?;
696        Ok(chosen)
697    }
698
699    fn spawn_successor(&self, spec: &SpawnSpec, n_sock_fd: RawFd) -> Result<Child> {
700        let sock_target_fd = 3 + self.listener_fds.len() as RawFd;
701
702        let mut cmd = Command::new(&spec.binary);
703        cmd.args(&spec.args);
704        for (k, v) in &spec.env {
705            cmd.env(k, v);
706        }
707        cmd.env("HANDOFF_ROLE", "successor");
708        cmd.env("HANDOFF_SOCK_FD", sock_target_fd.to_string());
709        cmd.stdin(Stdio::inherit())
710            .stdout(Stdio::inherit())
711            .stderr(Stdio::inherit());
712
713        // LISTEN_FDS env + FD-shuffle dance. Control socket lands at FD
714        // `3 + listener_count` so the successor reads HANDOFF_SOCK_FD from
715        // its env and finds it there.
716        pass_listener_fds_on_spawn(&mut cmd, &self.listener_fds, Some(n_sock_fd));
717
718        let child = cmd.spawn()?;
719        Ok(child)
720    }
721
722    fn journal_set(
723        &self,
724        handoff_id: HandoffId,
725        phase: Phase,
726        successor_pid: u32,
727        started_at_unix_ms: u64,
728    ) -> Result<()> {
729        if let Some(path) = &self.journal_path {
730            StateJournal {
731                handoff_id,
732                phase,
733                incumbent_pid: std::process::id(),
734                successor_pid: Some(successor_pid),
735                started_at_unix_ms,
736            }
737            .write_atomic(path)?;
738        }
739        Ok(())
740    }
741
742    fn journal_clear(&self) {
743        if let Some(path) = &self.journal_path
744            && let Err(e) = StateJournal::delete(path)
745        {
746            tracing::warn!(
747                error = %e,
748                path = %path.display(),
749                "failed to clear handoff journal; next supervisor start will see stale state"
750            );
751        }
752    }
753}
754
755/// Best-effort send of an `Abort` to the successor on a path where we have
756/// already decided to tear N down (kill+reap follows). Logs at WARN if the
757/// write fails — N may have died or its socket may already be closed, which
758/// is expected on these paths, but a silent discard would hide cases where
759/// the supervisor's view diverges from the child's.
760fn send_best_effort_abort(
761    stream: &mut UnixStream,
762    version: ProtoVersion,
763    handoff_id: HandoffId,
764    reason: String,
765) {
766    let reason_for_log = reason.clone();
767    if let Err(e) = write_message(stream, version, &Message::Abort { handoff_id, reason }) {
768        tracing::warn!(
769            %handoff_id,
770            reason = %reason_for_log,
771            error = %e,
772            "best-effort Abort to successor failed; child will still be killed and reaped"
773        );
774    }
775}
776
777fn make_socketpair() -> Result<(UnixStream, UnixStream)> {
778    let (a, b) = socketpair(
779        AddressFamily::Unix,
780        SockType::Stream,
781        None,
782        SockFlag::SOCK_CLOEXEC,
783    )?;
784    // SAFETY: both ends are freshly owned by us, valid, non-blocking unset.
785    let s_a = unsafe {
786        use std::os::fd::FromRawFd;
787        UnixStream::from_raw_fd(a.into_raw_fd())
788    };
789    let s_b = unsafe {
790        use std::os::fd::FromRawFd;
791        UnixStream::from_raw_fd(b.into_raw_fd())
792    };
793    Ok((s_a, s_b))
794}
795
796/// Read messages from `stream` until `pred` matches, ignoring incoming
797/// `Heartbeat` and `SealProgress` frames (both are pure liveness/progress
798/// signals — never the message a caller is blocking on). Two timeouts
799/// apply:
800///
801/// - **Liveness** (per-recv): each socket read waits at most
802///   [`LIVENESS_TIMEOUT`]. Any incoming frame — including a `Heartbeat`
803///   — resets this clock. A peer that emits heartbeats every 2s while
804///   blocked in a long-running `Drainable::drain` or `Drainable::seal`
805///   is therefore *not* declared dead, no matter how long the hook
806///   takes.
807/// - **Wall-clock** (overall budget): regardless of heartbeats, the
808///   call returns `Error::Timeout(awaiting)` once the `timeout` budget
809///   has elapsed. This bounds catastrophic cases — consumer hook alive
810///   but not making progress — from running forever.
811///
812/// `awaiting` names the message the caller is blocking on; it surfaces
813/// in the timeout error so an operator's log identifies which protocol
814/// step expired (e.g. `Timeout("Drained")` vs `Timeout("Ready")`).
815fn read_until<F>(
816    stream: &mut UnixStream,
817    timeout: Duration,
818    awaiting: &'static str,
819    pred: F,
820) -> Result<Message>
821where
822    F: Fn(&Message) -> bool,
823{
824    let deadline = Instant::now() + timeout.max(MIN_READ_TIMEOUT);
825    loop {
826        let now = Instant::now();
827        if now >= deadline {
828            let _ = stream.set_read_timeout(None);
829            return Err(Error::Timeout(awaiting));
830        }
831        let remaining = deadline - now;
832        // Per-recv timeout: bounded above by LIVENESS_TIMEOUT (peer-dead
833        // detection), below by what's left of the overall budget. Each
834        // successful frame — heartbeat or otherwise — resets this on
835        // the next iteration.
836        let recv_timeout = LIVENESS_TIMEOUT.min(remaining).max(MIN_READ_TIMEOUT);
837        stream.set_read_timeout(Some(recv_timeout))?;
838        match read_message(stream) {
839            Ok((_, Message::Heartbeat { .. })) => continue,
840            Ok((_, Message::SealProgress { .. })) => continue,
841            Ok((_, msg)) if pred(&msg) => {
842                let _ = stream.set_read_timeout(None);
843                return Ok(msg);
844            }
845            Ok((_, other)) => {
846                let _ = stream.set_read_timeout(None);
847                return Err(Error::UnexpectedMessage(short_name(&other)));
848            }
849            Err(Error::Io(e)) if is_timeout(&e) => {
850                let _ = stream.set_read_timeout(None);
851                return Err(Error::Timeout(awaiting));
852            }
853            Err(e) => {
854                let _ = stream.set_read_timeout(None);
855                return Err(e);
856            }
857        }
858    }
859}
860
861fn remaining_until(deadline: Instant) -> Duration {
862    deadline
863        .checked_duration_since(Instant::now())
864        .unwrap_or(MIN_READ_TIMEOUT)
865        .max(MIN_READ_TIMEOUT)
866}
867
/// Returns `true` if `e` is the error a socket read reports when its read
/// timeout expires.
///
/// std's socket timeout docs note the kind is platform-dependent (typically
/// `WouldBlock` on Unix, `TimedOut` on Windows), so both are accepted.
fn is_timeout(e: &std::io::Error) -> bool {
    let kind = e.kind();
    kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut
}