handoff/supervisor.rs
1//! Supervisor-side orchestration: spawn the successor, drive the protocol,
2//! handle abort/resume.
3//!
4//! This module is sync. The `handoff-supervisor` reference binary wraps it in
5//! tokio for the rest of its orchestration; primitive embedders (guest-agent,
6//! beyond-pg) can run `perform_handoff` from a worker thread.
7
8// `FromRawFd` on the socketpair endpoints in `make_socketpair` is `unsafe`
9// because the safe wrapper assumes exclusive ownership of the FD; the
10// `socketpair(2)` syscall has just produced both FDs and handed us
11// ownership, so the invariant holds.
12#![allow(unsafe_code)]
13
14use std::io::ErrorKind;
15use std::os::fd::{AsRawFd, IntoRawFd, RawFd};
16use std::os::unix::net::UnixStream;
17use std::path::{Path, PathBuf};
18use std::process::{Child, Command, Stdio};
19use std::sync::Mutex;
20use std::time::{Duration, Instant};
21
22use nix::sys::socket::{AddressFamily, SockFlag, SockType, socketpair};
23
24use crate::crash::points;
25use crate::crash_here;
26use crate::error::{Error, Result};
27use crate::fd::pass_listener_fds_on_spawn;
28use crate::frame::{read_message, write_message};
29use crate::metrics::events;
30use crate::protocol::{
31 HandoffId, Message, PROTO_MAX, PROTO_MIN, ProtoVersion, Side, negotiate_version, short_name,
32};
33use crate::state::{Phase, StateJournal};
34use crate::util::now_unix_ms;
35
/// Lower bound applied to every phase read timeout. A budget smaller than
/// this leaves no realistic room to receive even one frame, so such values
/// are treated as a programming slip and rounded up to this floor.
const MIN_READ_TIMEOUT: Duration = Duration::from_millis(100);

/// Longest the supervisor will block on a single socket read before it
/// declares the peer dead. The clock restarts after every frame received —
/// a `Heartbeat` counts. The incumbent emits a heartbeat every 2s while it
/// is blocked in `drain`/`seal`, so this leaves a 5× margin for scheduler
/// hiccups before a live peer is misdiagnosed as dead.
const LIVENESS_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound on the first read after connecting to a peer, which must be
/// its `Hello`. Peers send `Hello` as their very first action, so this
/// should complete almost immediately; the generous value exists only to
/// bound the case of a peer that accepted the connection and then hung.
const HELLO_READ_TIMEOUT: Duration = Duration::from_secs(5);

/// Extra time the supervisor allows beyond any peer-side deadline before it
/// treats a read as timed out.
///
/// Both clocks start at different moments. The supervisor's read deadline
/// starts at `T_s`, when it sends the request; the peer's clock starts only
/// once it has deserialized that request, at `T_s + δ_net`. A reply the
/// peer produces at the very end of its budget then still needs
/// `δ_net + δ_serialize` to arrive. Without slack the supervisor would stop
/// listening `2·δ_net + δ_serialize` too early and discard a reply that is
/// already on the wire.
///
/// One second is roughly two orders of magnitude above a realistic
/// worst-case Unix-socket round trip plus frame serialization. It is a
/// conservative bound on that in-flight window, not a tuning knob.
const WIRE_SLACK: Duration = Duration::from_secs(1);
65
/// Driver for handoffs of a single supervised primitive instance.
///
/// Create one with [`Supervisor::new`], configure it with the `with_*`
/// builder methods, then call [`Supervisor::perform_handoff`] for each swap.
pub struct Supervisor {
    /// Control socket of the current incumbent; `perform_handoff` connects
    /// here first.
    socket_path: PathBuf,
    /// Listener FDs every spawned successor inherits, each paired with the
    /// logical name advertised for it. A `Vec` in registration order, so
    /// the FD slot each listener lands in is the same on every spawn.
    listener_fds: Vec<(String, RawFd)>,
    /// Location of the on-disk handoff journal; `None` disables journaling.
    journal_path: Option<PathBuf>,
    /// Opaque build identifier supplied through `with_build_id`.
    // NOTE(review): nothing in this module reads this field — confirm
    // whether it is meant to be forwarded to peers.
    build_id: Vec<u8>,
    /// Held for the whole of `perform_handoff`, so that two threads can
    /// never drive overlapping handoffs against the same incumbent
    /// (invariant: at most one in-flight swap per primitive).
    in_flight: Mutex<()>,
}
79
/// How to launch a successor, plus the time budgets for the handoff to it.
#[derive(Debug, Clone)]
pub struct SpawnSpec {
    /// Executable to run as the successor.
    pub binary: PathBuf,
    /// Command-line arguments for the successor.
    pub args: Vec<String>,
    /// Additional environment variables for the child, on top of the
    /// handoff envelope the supervisor always sets.
    pub env: Vec<(String, String)>,
    /// Hard wall-clock limit for the whole handoff, measured from the start
    /// of `perform_handoff` through sending `Commit`. Choose it so it
    /// comfortably covers the consumer's p99 `Drainable::drain` +
    /// `Drainable::seal` time under load. The library never interrupts a
    /// hook that is slow but still making progress — incumbent heartbeats
    /// keep the supervisor's liveness clock fresh — yet it aborts once this
    /// limit is exceeded, progress or not.
    ///
    /// The `SealComplete` and `Ready` reads may run up to `WIRE_SLACK`
    /// (1 s) beyond this limit, to catch replies that were already in
    /// flight when it elapsed. Total wall time can therefore exceed
    /// `deadline` by up to that slack.
    ///
    /// Tuning guidance: `p99(drain) + p99(seal) + 30s`.
    /// Default: 5 minutes.
    pub deadline: Duration,
    /// Wall-clock limit for the `drain` phase alone. Use it when the
    /// consumer's `drain` has a known upper bound (for example a
    /// per-connection drain timeout) and you want a tighter cap than the
    /// overall `deadline`. The `Drained` read may run up to `WIRE_SLACK`
    /// (1 s) beyond this limit, to catch a reply that is already in flight.
    /// Default: 60 seconds.
    pub drain_grace: Duration,
}

impl SpawnSpec {
    /// Spec for running `binary` with the recommended defaults: no `args`,
    /// no extra `env`, a 5 minute `deadline` and a 60 second `drain_grace`.
    ///
    /// Adjust fields on the returned value, or build the struct literally,
    /// when other settings are needed. `binary` has no default on purpose:
    /// an empty `PathBuf` would only surface later as an opaque OS error
    /// from `Command::spawn`.
    pub fn new(binary: impl Into<PathBuf>) -> Self {
        let binary = binary.into();
        Self {
            binary,
            args: vec![],
            env: vec![],
            deadline: Duration::from_secs(5 * 60),
            drain_grace: Duration::from_secs(60),
        }
    }
}
128
129#[derive(Debug)]
130pub struct HandoffOutcome {
131 pub handoff_id: HandoffId,
132 pub committed: bool,
133 pub abort_reason: Option<String>,
134 /// On a committed handoff, the `Child` for the new primitive (N). The
135 /// caller owns it from here — the supervisor relinquishes lifecycle
136 /// tracking. `None` on every non-committed outcome (the child was
137 /// killed and reaped before this struct was constructed).
138 pub child: Option<Child>,
139}
140
141/// Kills + reaps the wrapped child on drop unless `disarm()` was called.
142/// Ensures we don't leak a spawned successor if `perform_handoff` returns
143/// via an early `?` after the spawn. The pid is cached at construction so
144/// [`ChildGuard::id`] never panics — it remains readable after the inner
145/// `Child` has been taken by `disarm` or the drop path.
146struct ChildGuard {
147 child: Option<Child>,
148 pid: u32,
149}
150
151impl ChildGuard {
152 fn new(child: Child) -> Self {
153 let pid = child.id();
154 Self {
155 child: Some(child),
156 pid,
157 }
158 }
159
160 fn id(&self) -> u32 {
161 self.pid
162 }
163
164 /// Take the child without killing it. Used on the commit path: the new
165 /// primitive is now legitimately running and we hand it to the caller.
166 fn disarm(mut self) -> Child {
167 // `new` always populates `child` and `disarm` consumes `self` by
168 // value, so this take cannot observe `None`. Treat as an invariant.
169 self.child
170 .take()
171 .expect("BUG: ChildGuard inner Child missing — constructor invariant violated")
172 }
173
174 /// Kill + reap explicitly so the caller can log the result.
175 fn kill_and_reap(mut self) {
176 if let Some(mut c) = self.child.take() {
177 let _ = c.kill();
178 let _ = c.wait();
179 }
180 }
181}
182
183impl Drop for ChildGuard {
184 fn drop(&mut self) {
185 if let Some(mut c) = self.child.take() {
186 tracing::warn!(
187 pid = self.pid,
188 "killing leaked successor child on guard drop"
189 );
190 let _ = c.kill();
191 let _ = c.wait();
192 }
193 }
194}
195
196impl Supervisor {
197 pub fn new(socket_path: &Path) -> Result<Self> {
198 Ok(Self {
199 socket_path: socket_path.to_path_buf(),
200 listener_fds: Vec::new(),
201 journal_path: None,
202 build_id: Vec::new(),
203 in_flight: Mutex::new(()),
204 })
205 }
206
207 /// Register an inherited listener. The supervisor must keep the underlying
208 /// `TcpListener` alive elsewhere; this just records the raw FD and the
209 /// name to advertise via `LISTEN_FDNAMES`. Consuming-builder shape so it
210 /// composes with `with_journal` / `with_build_id`:
211 ///
212 /// ```ignore
213 /// let sup = Supervisor::new(path)?
214 /// .with_listener("http", fd)
215 /// .with_journal(journal_path);
216 /// ```
217 pub fn with_listener(mut self, name: impl Into<String>, fd: RawFd) -> Self {
218 self.listener_fds.push((name.into(), fd));
219 self
220 }
221
222 pub fn with_journal(mut self, path: PathBuf) -> Self {
223 self.journal_path = Some(path);
224 self
225 }
226
227 pub fn with_build_id(mut self, build_id: Vec<u8>) -> Self {
228 self.build_id = build_id;
229 self
230 }
231
232 pub fn perform_handoff(&self, spec: SpawnSpec) -> Result<HandoffOutcome> {
233 let _in_flight = self
234 .in_flight
235 .try_lock()
236 .map_err(|_| Error::HandoffInProgress)?;
237
238 let handoff_id = HandoffId::new();
239 let started_instant = Instant::now();
240 let started_unix_ms = now_unix_ms();
241 let total_deadline_at = started_instant + spec.deadline;
242
243 // 1. Connect to O.
244 let mut o_stream = UnixStream::connect(&self.socket_path)?;
245 let chosen_o =
246 self.exchange_hello_as_supervisor(&mut o_stream, handoff_id, Side::Incumbent, None)?;
247 crash_here!(points::S_AFTER_O_HELLO);
248
249 // 2. Create a socketpair for N's control channel.
250 let (s_end, n_end) = make_socketpair()?;
251 let n_end_raw = n_end.as_raw_fd();
252
253 // 3. Spawn N. We keep s_end; n_end becomes the child's HANDOFF_SOCK_FD.
254 let child = self.spawn_successor(&spec, n_end_raw)?;
255 let child_guard = ChildGuard::new(child);
256 let successor_pid = child_guard.id();
257 // Drop our parent-side copy of n_end now that the child has its own
258 // duplicate. The kernel keeps the socket alive via the child's FD.
259 drop(n_end);
260 crash_here!(points::S_AFTER_SPAWN_SUCCESSOR);
261
262 let mut n_stream = s_end;
263 // 4. Hello/HelloAck with N. Verify the child's announced PID matches
264 // the one we spawned.
265 let chosen_n = self.exchange_hello_as_supervisor(
266 &mut n_stream,
267 handoff_id,
268 Side::Successor,
269 Some(successor_pid),
270 )?;
271 crash_here!(points::S_AFTER_N_HELLO);
272
273 self.journal_set(
274 handoff_id,
275 Phase::Negotiating,
276 successor_pid,
277 started_unix_ms,
278 )?;
279
280 // 5. PrepareHandoff → Drained.
281 let prepare_at = Instant::now();
282 // Send the budget *remaining*, not the raw configured duration: the
283 // hello exchange and successor spawn have already burned wall-clock
284 // time off the overall deadline, and we want O's view of the
285 // deadline to track ours. Floored at MIN_READ_TIMEOUT so O still
286 // has a usable budget if we cut it very close.
287 let deadline_ms = remaining_until(total_deadline_at).as_millis() as u64;
288 let drain_grace_ms = spec.drain_grace.as_millis() as u64;
289 tracing::info!(
290 target: events::PREPARE,
291 %handoff_id, successor_pid,
292 drain_grace_ms,
293 deadline_ms,
294 "prepare handoff"
295 );
296 write_message(
297 &mut o_stream,
298 chosen_o,
299 &Message::PrepareHandoff {
300 handoff_id,
301 successor_pid,
302 deadline_ms,
303 drain_grace_ms,
304 },
305 )?;
306 crash_here!(points::S_AFTER_PREPARE_SENT);
307 // `drain_grace` is the budget we hand O for the entire drain
308 // phase: O may consume up to that long inside `Drainable::drain`
309 // before producing a `Drained` frame. Our read deadline must
310 // therefore exceed `drain_grace` by `WIRE_SLACK` — O's clock for
311 // the grace starts when it deserializes `PrepareHandoff`
312 // (T_s + δ_net), and the `Drained` it sends after running to that
313 // limit still has to traverse δ_net + δ_serialize on the way back.
314 // Per-recv liveness handles peer-dead detection inside
315 // `read_until`; the slack is specifically about not aborting a
316 // reply that's already on the wire.
317 let drained_msg = read_until(
318 &mut o_stream,
319 spec.drain_grace + WIRE_SLACK,
320 "Drained",
321 |m| matches!(m, Message::Drained { .. }),
322 )?;
323 let (drained_open_conns, drained_accept_closed) = match &drained_msg {
324 Message::Drained {
325 open_conns_remaining,
326 accept_closed,
327 } => (*open_conns_remaining, *accept_closed),
328 _ => unreachable!("read_until predicate restricts variant"),
329 };
330 tracing::info!(
331 target: events::DRAINED,
332 %handoff_id,
333 open_conns_remaining = drained_open_conns,
334 accept_closed = drained_accept_closed,
335 drain_seconds = prepare_at.elapsed().as_secs_f64(),
336 "drain complete"
337 );
338 crash_here!(points::S_AFTER_DRAINED_RECV);
339 self.journal_set(handoff_id, Phase::Draining, successor_pid, started_unix_ms)?;
340
341 // 6. SealRequest → SealComplete (or SealFailed).
342 let seal_at = Instant::now();
343 tracing::info!(target: events::SEAL, %handoff_id, "seal request");
344 write_message(
345 &mut o_stream,
346 chosen_o,
347 &Message::SealRequest { handoff_id },
348 )?;
349 crash_here!(points::S_AFTER_SEAL_REQUEST_SENT);
350 // Seal-wait loop. Same two-tier timeout as `read_until`: per-recv
351 // capped at LIVENESS_TIMEOUT (heartbeats reset it), wall-clock
352 // capped by `total_deadline_at + WIRE_SLACK`. The slack covers a
353 // `SealComplete` that's already on the wire when the overall
354 // deadline expires — `seal()` runs without an internal deadline,
355 // so it can land its reply arbitrarily close to the cap.
356 // `SealProgress` and `Heartbeat` frames are both progress signals.
357 let seal_read_deadline = total_deadline_at + WIRE_SLACK;
358 let seal_outcome: std::result::Result<(), String> = loop {
359 let now = Instant::now();
360 if now >= seal_read_deadline {
361 let _ = o_stream.set_read_timeout(None);
362 send_best_effort_abort(
363 &mut n_stream,
364 chosen_n,
365 handoff_id,
366 "seal phase timed out".into(),
367 );
368 child_guard.kill_and_reap();
369 self.journal_clear();
370 return Err(Error::Timeout("SealComplete"));
371 }
372 let remaining = seal_read_deadline - now;
373 let recv_timeout = LIVENESS_TIMEOUT.min(remaining).max(MIN_READ_TIMEOUT);
374 o_stream.set_read_timeout(Some(recv_timeout))?;
375 match read_message(&mut o_stream) {
376 Ok((_, Message::SealProgress { .. })) => continue,
377 Ok((_, Message::Heartbeat { .. })) => continue,
378 Ok((_, Message::SealComplete { handoff_id: id, .. })) if id == handoff_id => {
379 break Ok(());
380 }
381 Ok((
382 _,
383 Message::SealFailed {
384 handoff_id: id,
385 error,
386 ..
387 },
388 )) if id == handoff_id => break Err(error),
389 Ok((_, other)) => {
390 let _ = o_stream.set_read_timeout(None);
391 return Err(Error::UnexpectedMessage(short_name(&other)));
392 }
393 Err(Error::Io(e)) if is_timeout(&e) => {
394 // No frame for `recv_timeout` — peer has gone silent
395 // for longer than LIVENESS_TIMEOUT (or we're at the
396 // overall wall-clock cap; the next loop iteration
397 // detects that explicitly). Treat as peer-dead and
398 // abort.
399 let _ = o_stream.set_read_timeout(None);
400 send_best_effort_abort(
401 &mut n_stream,
402 chosen_n,
403 handoff_id,
404 "incumbent unresponsive during seal".into(),
405 );
406 child_guard.kill_and_reap();
407 self.journal_clear();
408 return Err(Error::Timeout("SealComplete"));
409 }
410 Err(e) => {
411 let _ = o_stream.set_read_timeout(None);
412 return Err(e);
413 }
414 }
415 };
416 let _ = o_stream.set_read_timeout(None);
417
418 if let Err(error) = seal_outcome {
419 // O retains its writer state. Abort N and report failure.
420 tracing::warn!(
421 target: events::ABORT,
422 %handoff_id, error = %error, "seal failed; aborting handoff"
423 );
424 send_best_effort_abort(
425 &mut n_stream,
426 chosen_n,
427 handoff_id,
428 format!("seal failed: {error}"),
429 );
430 child_guard.kill_and_reap();
431 self.journal_clear();
432 return Ok(HandoffOutcome {
433 handoff_id,
434 committed: false,
435 abort_reason: Some(format!("seal failed: {error}")),
436 child: None,
437 });
438 }
439 tracing::info!(
440 target: events::SEAL_COMPLETE,
441 %handoff_id,
442 seal_seconds = seal_at.elapsed().as_secs_f64(),
443 "seal complete; flock released by O"
444 );
445 crash_here!(points::S_AFTER_SEAL_COMPLETE_RECV);
446 self.journal_set(handoff_id, Phase::Sealing, successor_pid, started_unix_ms)?;
447
448 // 7. Begin → Ready (deadline-bounded).
449 //
450 // The Begin write and the Ready read are funneled through one
451 // `Result<Message>` so a failure at either step lands in the same
452 // abort/resume flow below — sending `ResumeAfterAbort` to O is the
453 // correct response in both cases (N is unreachable; O has sealed
454 // and must be told to keep serving).
455 let begin_at = Instant::now();
456 let ready_result =
457 match write_message(&mut n_stream, chosen_n, &Message::Begin { handoff_id }) {
458 Ok(()) => {
459 crash_here!(points::S_AFTER_BEGIN_SENT);
460 self.journal_set(
461 handoff_id,
462 Phase::AwaitingReady,
463 successor_pid,
464 started_unix_ms,
465 )?;
466 // N has no internal deadline for `announce_and_bind`,
467 // so `Ready` can be in flight at the moment
468 // `total_deadline_at` elapses — give the read
469 // `WIRE_SLACK` past that cap for the same reason the
470 // drain and seal reads do.
471 let ready_timeout = remaining_until(total_deadline_at) + WIRE_SLACK;
472 read_until(&mut n_stream, ready_timeout, "Ready", |m| {
473 matches!(m, Message::Ready { .. })
474 })
475 }
476 Err(e) => Err(e),
477 };
478
479 // The `Ready` match arm pulls the handoff_id apart so a mismatched id
480 // produces a precise error rather than the misleading
481 // "expected Ready, got Ready" string the generic `other` arm would
482 // emit when `short_name` was called on a `Message::Ready` with the
483 // wrong id.
484 let ready_result = match ready_result {
485 Ok(Message::Ready { handoff_id: id, .. }) if id != handoff_id => Err(Error::Protocol(
486 format!("Ready carries wrong handoff_id: got {id}, expected {handoff_id}"),
487 )),
488 other => other,
489 };
490
491 match ready_result {
492 Ok(Message::Ready {
493 handoff_id: id,
494 listening_on,
495 healthz_ok,
496 advertised_revision_per_shard,
497 }) if id == handoff_id => {
498 tracing::info!(
499 target: events::READY,
500 %handoff_id,
501 healthz_ok,
502 listeners = ?listening_on,
503 advertised_revisions = ?advertised_revision_per_shard,
504 begin_to_ready_seconds = begin_at.elapsed().as_secs_f64(),
505 "successor ready"
506 );
507 crash_here!(points::S_AFTER_READY_RECV);
508 // N has acknowledged readiness — N has acquired the flock
509 // and (when using `announce_and_bind`) bound the control
510 // socket, so N is the authoritative new incumbent from
511 // this point on regardless of what happens with O.
512 //
513 // Disarm the guard *before* the Commit write so a dead-O
514 // write failure does not propagate via `?` and let the
515 // guard's Drop kill the legitimate new incumbent. Any
516 // I/O after this point is best-effort cleanup of O.
517 let child = child_guard.disarm();
518
519 // 8. Commit O.
520 tracing::info!(
521 target: events::COMMIT,
522 %handoff_id,
523 total_seconds = started_instant.elapsed().as_secs_f64(),
524 "commit"
525 );
526 if let Err(e) =
527 write_message(&mut o_stream, chosen_o, &Message::Commit { handoff_id })
528 {
529 tracing::warn!(
530 %handoff_id, error = %e,
531 "failed to send Commit to incumbent; O may have crashed — \
532 N is the new incumbent regardless, handoff is committed"
533 );
534 }
535 crash_here!(points::S_AFTER_COMMIT_SENT);
536 // Journal updates are best-effort here: the handoff is
537 // committed and N is serving. A failure to journal
538 // `Committed` means the next supervisor sees `AwaitingReady`
539 // and runs its standard recovery — also fine.
540 if let Err(e) =
541 self.journal_set(handoff_id, Phase::Committed, successor_pid, started_unix_ms)
542 {
543 tracing::warn!(%handoff_id, error = %e, "journal Committed failed");
544 }
545 self.journal_clear();
546 crash_here!(points::S_AFTER_JOURNAL_CLEAR);
547 Ok(HandoffOutcome {
548 handoff_id,
549 committed: true,
550 abort_reason: None,
551 child: Some(child),
552 })
553 }
554 other => {
555 let reason = match &other {
556 Ok(m) => format!("expected Ready, got {}", short_name(m)),
557 Err(Error::Timeout(s)) => format!("ready phase timed out waiting for {s}"),
558 Err(e) => format!("ready read failed: {e}"),
559 };
560 tracing::warn!(
561 target: events::ABORT,
562 %handoff_id, reason, "aborting handoff before commit"
563 );
564 // Abort N, resume O.
565 send_best_effort_abort(&mut n_stream, chosen_n, handoff_id, reason.clone());
566 child_guard.kill_and_reap();
567 write_message(
568 &mut o_stream,
569 chosen_o,
570 &Message::ResumeAfterAbort { handoff_id },
571 )?;
572 tracing::info!(
573 target: events::RESUME,
574 %handoff_id, "sent ResumeAfterAbort to O"
575 );
576 self.journal_set(
577 handoff_id,
578 Phase::ResumingAfterAbort,
579 successor_pid,
580 started_unix_ms,
581 )?;
582 self.journal_clear();
583 Ok(HandoffOutcome {
584 handoff_id,
585 committed: false,
586 abort_reason: Some(reason),
587 child: None,
588 })
589 }
590 }
591 }
592
593 /// Read any persisted in-flight handoff state and clear it. Call once
594 /// before the first `perform_handoff` after a supervisor restart.
595 ///
596 /// The current incumbent auto-recovers from disconnect (sealed → re-acquire
597 /// flock + resume; drained → resume), so the supervisor's restart-time
598 /// responsibility is bounded: confirm the incumbent is reachable and
599 /// clear the journal. Returns the persisted state (for logging) or `None`
600 /// if no journal exists.
601 pub fn resume_from_journal(&self) -> Result<Option<StateJournal>> {
602 let Some(path) = self.journal_path.as_deref() else {
603 return Ok(None);
604 };
605 let Some(journal) = StateJournal::read(path)? else {
606 return Ok(None);
607 };
608 tracing::warn!(
609 handoff_id = %journal.handoff_id,
610 phase = ?journal.phase,
611 "found prior handoff state on disk; verifying incumbent then clearing"
612 );
613
614 // Best-effort liveness probe of the incumbent. The incumbent runs its
615 // own EOF-disconnect recovery, so this just confirms we can reach it.
616 match UnixStream::connect(&self.socket_path) {
617 Ok(mut stream) => {
618 // Drain the incumbent's Hello frame so the new session is
619 // clean; then drop the connection — incumbent observes EOF
620 // and its session-close path runs. A read error here is not
621 // fatal (the EOF on drop still triggers the incumbent's
622 // session-close path) but we log it at debug so a corrupted
623 // Hello can be correlated post-mortem.
624 let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
625 if let Err(e) = read_message(&mut stream) {
626 tracing::debug!(
627 error = %e,
628 "incumbent Hello read failed during journal resume probe; \
629 continuing — EOF on drop will reset incumbent session"
630 );
631 }
632 }
633 Err(e) => {
634 tracing::warn!(error = %e, "incumbent unreachable during journal resume");
635 }
636 }
637
638 StateJournal::delete(path)?;
639 Ok(Some(journal))
640 }
641
642 /// Run the Hello/HelloAck exchange where we are the supervisor: receive
643 /// the peer's Hello, send a HelloAck with our chosen version. If
644 /// `expected_pid` is set, the peer's announced pid must match.
645 fn exchange_hello_as_supervisor(
646 &self,
647 stream: &mut UnixStream,
648 handoff_id: HandoffId,
649 expected_role: Side,
650 expected_pid: Option<u32>,
651 ) -> Result<ProtoVersion> {
652 // Bound the initial Hello so a peer that accepted the connection but
653 // hangs before writing can't block `perform_handoff` indefinitely.
654 // Cleared after the read regardless of outcome.
655 stream.set_read_timeout(Some(HELLO_READ_TIMEOUT))?;
656 let read_result = read_message(stream);
657 let _ = stream.set_read_timeout(None);
658 let (_v, peer_hello) = match read_result {
659 Ok(x) => x,
660 Err(Error::Io(e)) if is_timeout(&e) => return Err(Error::Timeout("peer Hello")),
661 Err(e) => return Err(e),
662 };
663 let (their_role, their_pid, their_min, their_max) = match peer_hello {
664 Message::Hello {
665 role,
666 pid,
667 proto_min,
668 proto_max,
669 ..
670 } => (role, pid, proto_min, proto_max),
671 other => return Err(Error::UnexpectedMessage(short_name(&other))),
672 };
673 if their_role != expected_role {
674 return Err(Error::Protocol(format!(
675 "peer announced role {:?}, expected {:?}",
676 their_role, expected_role
677 )));
678 }
679 if let Some(expected) = expected_pid
680 && their_pid != expected
681 {
682 return Err(Error::PidMismatch {
683 expected,
684 announced: their_pid,
685 });
686 }
687 let chosen = negotiate_version(PROTO_MIN, PROTO_MAX, their_min, their_max)?;
688 write_message(
689 stream,
690 chosen,
691 &Message::HelloAck {
692 proto_version_chosen: chosen,
693 handoff_id,
694 },
695 )?;
696 Ok(chosen)
697 }
698
699 fn spawn_successor(&self, spec: &SpawnSpec, n_sock_fd: RawFd) -> Result<Child> {
700 let sock_target_fd = 3 + self.listener_fds.len() as RawFd;
701
702 let mut cmd = Command::new(&spec.binary);
703 cmd.args(&spec.args);
704 for (k, v) in &spec.env {
705 cmd.env(k, v);
706 }
707 cmd.env("HANDOFF_ROLE", "successor");
708 cmd.env("HANDOFF_SOCK_FD", sock_target_fd.to_string());
709 cmd.stdin(Stdio::inherit())
710 .stdout(Stdio::inherit())
711 .stderr(Stdio::inherit());
712
713 // LISTEN_FDS env + FD-shuffle dance. Control socket lands at FD
714 // `3 + listener_count` so the successor reads HANDOFF_SOCK_FD from
715 // its env and finds it there.
716 pass_listener_fds_on_spawn(&mut cmd, &self.listener_fds, Some(n_sock_fd));
717
718 let child = cmd.spawn()?;
719 Ok(child)
720 }
721
722 fn journal_set(
723 &self,
724 handoff_id: HandoffId,
725 phase: Phase,
726 successor_pid: u32,
727 started_at_unix_ms: u64,
728 ) -> Result<()> {
729 if let Some(path) = &self.journal_path {
730 StateJournal {
731 handoff_id,
732 phase,
733 incumbent_pid: std::process::id(),
734 successor_pid: Some(successor_pid),
735 started_at_unix_ms,
736 }
737 .write_atomic(path)?;
738 }
739 Ok(())
740 }
741
742 fn journal_clear(&self) {
743 if let Some(path) = &self.journal_path
744 && let Err(e) = StateJournal::delete(path)
745 {
746 tracing::warn!(
747 error = %e,
748 path = %path.display(),
749 "failed to clear handoff journal; next supervisor start will see stale state"
750 );
751 }
752 }
753}
754
755/// Best-effort send of an `Abort` to the successor on a path where we have
756/// already decided to tear N down (kill+reap follows). Logs at WARN if the
757/// write fails — N may have died or its socket may already be closed, which
758/// is expected on these paths, but a silent discard would hide cases where
759/// the supervisor's view diverges from the child's.
760fn send_best_effort_abort(
761 stream: &mut UnixStream,
762 version: ProtoVersion,
763 handoff_id: HandoffId,
764 reason: String,
765) {
766 let reason_for_log = reason.clone();
767 if let Err(e) = write_message(stream, version, &Message::Abort { handoff_id, reason }) {
768 tracing::warn!(
769 %handoff_id,
770 reason = %reason_for_log,
771 error = %e,
772 "best-effort Abort to successor failed; child will still be killed and reaped"
773 );
774 }
775}
776
777fn make_socketpair() -> Result<(UnixStream, UnixStream)> {
778 let (a, b) = socketpair(
779 AddressFamily::Unix,
780 SockType::Stream,
781 None,
782 SockFlag::SOCK_CLOEXEC,
783 )?;
784 // SAFETY: both ends are freshly owned by us, valid, non-blocking unset.
785 let s_a = unsafe {
786 use std::os::fd::FromRawFd;
787 UnixStream::from_raw_fd(a.into_raw_fd())
788 };
789 let s_b = unsafe {
790 use std::os::fd::FromRawFd;
791 UnixStream::from_raw_fd(b.into_raw_fd())
792 };
793 Ok((s_a, s_b))
794}
795
796/// Read messages from `stream` until `pred` matches, ignoring incoming
797/// `Heartbeat` and `SealProgress` frames (both are pure liveness/progress
798/// signals — never the message a caller is blocking on). Two timeouts
799/// apply:
800///
801/// - **Liveness** (per-recv): each socket read waits at most
802/// [`LIVENESS_TIMEOUT`]. Any incoming frame — including a `Heartbeat`
803/// — resets this clock. A peer that emits heartbeats every 2s while
804/// blocked in a long-running `Drainable::drain` or `Drainable::seal`
805/// is therefore *not* declared dead, no matter how long the hook
806/// takes.
807/// - **Wall-clock** (overall budget): regardless of heartbeats, the
808/// call returns `Error::Timeout(awaiting)` once the `timeout` budget
809/// has elapsed. This bounds catastrophic cases — consumer hook alive
810/// but not making progress — from running forever.
811///
812/// `awaiting` names the message the caller is blocking on; it surfaces
813/// in the timeout error so an operator's log identifies which protocol
814/// step expired (e.g. `Timeout("Drained")` vs `Timeout("Ready")`).
815fn read_until<F>(
816 stream: &mut UnixStream,
817 timeout: Duration,
818 awaiting: &'static str,
819 pred: F,
820) -> Result<Message>
821where
822 F: Fn(&Message) -> bool,
823{
824 let deadline = Instant::now() + timeout.max(MIN_READ_TIMEOUT);
825 loop {
826 let now = Instant::now();
827 if now >= deadline {
828 let _ = stream.set_read_timeout(None);
829 return Err(Error::Timeout(awaiting));
830 }
831 let remaining = deadline - now;
832 // Per-recv timeout: bounded above by LIVENESS_TIMEOUT (peer-dead
833 // detection), below by what's left of the overall budget. Each
834 // successful frame — heartbeat or otherwise — resets this on
835 // the next iteration.
836 let recv_timeout = LIVENESS_TIMEOUT.min(remaining).max(MIN_READ_TIMEOUT);
837 stream.set_read_timeout(Some(recv_timeout))?;
838 match read_message(stream) {
839 Ok((_, Message::Heartbeat { .. })) => continue,
840 Ok((_, Message::SealProgress { .. })) => continue,
841 Ok((_, msg)) if pred(&msg) => {
842 let _ = stream.set_read_timeout(None);
843 return Ok(msg);
844 }
845 Ok((_, other)) => {
846 let _ = stream.set_read_timeout(None);
847 return Err(Error::UnexpectedMessage(short_name(&other)));
848 }
849 Err(Error::Io(e)) if is_timeout(&e) => {
850 let _ = stream.set_read_timeout(None);
851 return Err(Error::Timeout(awaiting));
852 }
853 Err(e) => {
854 let _ = stream.set_read_timeout(None);
855 return Err(e);
856 }
857 }
858 }
859}
860
861fn remaining_until(deadline: Instant) -> Duration {
862 deadline
863 .checked_duration_since(Instant::now())
864 .unwrap_or(MIN_READ_TIMEOUT)
865 .max(MIN_READ_TIMEOUT)
866}
867
/// Whether `e` is how an expired socket read timeout is reported.
///
/// Both `WouldBlock` and `TimedOut` are accepted, since platforms differ in
/// which kind they report.
fn is_timeout(e: &std::io::Error) -> bool {
    let kind = e.kind();
    kind == ErrorKind::WouldBlock || kind == ErrorKind::TimedOut
}