running_process/broker/server/handoff/ack.rs
1//! Backend ACK deadline tracking for pending handoffs (#354, slice 2).
2//!
3//! Issuing a `Negotiated.handle_passed_token` is not enough to consider a
4//! handoff complete: the backend must report that it adopted the passed
5//! handle before a broker-side deadline. This module owns that contract.
6//!
7//! Transport note: since #354 slice 6 the v1 envelope carries a
8//! backend-to-broker `HandoffAck` frame (see [`super::wire`]); the broker
9//! observes it through
10//! [`WireHandoffDelivery`](super::wire::WireHandoffDelivery) and reports it
11//! here through [`HandoffAckRegistry::acknowledge`]. The backend acceptance
12//! path (`backend_lib::accept_handed_off`) consumes the one-time token
13//! before the ACK is sent. Until an ACK arrives within the deadline,
14//! `Negotiated.backend_pipe` reconnect remains the correctness path; an
15//! expired handoff is abandoned and falls back to reconnect.
16
17use std::collections::HashMap;
18use std::time::{Duration, Instant};
19
20use super::fallback::{HandoffAttemptFailure, HandoffFallbackDecision, HandoffFallbackReason};
21use super::handoff_token::{HandoffToken, HandoffTokenStore};
22
/// How long the broker waits, by default, for a backend to acknowledge a
/// passed handle.
///
/// This is intentionally far below [`super::DEFAULT_HANDOFF_TOKEN_TTL`]
/// (30s). A backend that really received the handle acknowledges within
/// milliseconds, so a longer wait would only postpone the reconnect
/// fallback.
pub const DEFAULT_HANDOFF_ACK_DEADLINE: Duration = Duration::from_secs(5);
29
/// The backend the broker expects an acknowledgement from for one pending
/// handoff.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PendingHandoffBackend {
    /// Name of the service the handoff was negotiated for.
    pub service_name: String,
    /// Process ID of the backend when it is known. `0` is the placeholder
    /// used when the broker only knows the service (the Hello path issues
    /// tokens before resolving a live pid).
    pub backend_pid: u32,
}

impl PendingHandoffBackend {
    /// Build an identity from a service name and a backend process ID.
    pub fn new(service_name: impl Into<String>, backend_pid: u32) -> Self {
        let service_name = service_name.into();
        Self {
            service_name,
            backend_pid,
        }
    }

    /// Build an identity for a backend known only by its service name.
    ///
    /// The process ID is recorded as the `0` placeholder.
    pub fn for_service(service_name: impl Into<String>) -> Self {
        Self::new(service_name, 0)
    }
}
57
/// Broker-side registry of handoffs awaiting backend acknowledgement.
///
/// Keyed by the issued one-time token. An entry leaves the registry
/// through one of three paths:
///
/// - [`acknowledge`](Self::acknowledge): completed when the ACK is timely,
///   rejected when it arrives at or after the deadline;
/// - [`abandon`](Self::abandon): a broker-side step failed after issuance;
/// - [`expire_overdue`](Self::expire_overdue): the deadline passed without
///   an ACK (failed, reconnect fallback).
///
/// Each of those paths also revokes the token from the supplied
/// `HandoffTokenStore`. Re-registering a token overwrites its entry in
/// place.
#[derive(Debug)]
pub struct HandoffAckRegistry {
    /// Acknowledgement window applied to every registered handoff; never
    /// zero (see [`with_ack_deadline`](Self::with_ack_deadline)).
    ack_deadline: Duration,
    /// Handoffs still awaiting an ACK, keyed by their one-time token.
    pending: HashMap<HandoffToken, PendingAckEntry>,
}
68
69impl HandoffAckRegistry {
70 /// Create an empty registry with the default ACK deadline.
71 pub fn new() -> Self {
72 Self::with_ack_deadline(DEFAULT_HANDOFF_ACK_DEADLINE)
73 }
74
75 /// Create an empty registry with an explicit ACK deadline.
76 ///
77 /// A zero deadline is clamped to one millisecond so every registered
78 /// handoff gets a non-empty acknowledgement window.
79 pub fn with_ack_deadline(ack_deadline: Duration) -> Self {
80 Self {
81 ack_deadline: if ack_deadline.is_zero() {
82 Duration::from_millis(1)
83 } else {
84 ack_deadline
85 },
86 pending: HashMap::new(),
87 }
88 }
89
90 /// Return the configured ACK deadline.
91 pub fn ack_deadline(&self) -> Duration {
92 self.ack_deadline
93 }
94
95 /// Return the number of handoffs still awaiting acknowledgement.
96 pub fn pending_len(&self) -> usize {
97 self.pending.len()
98 }
99
100 /// Return the backend identity recorded for a pending handoff.
101 pub fn pending_backend(&self, token: &HandoffToken) -> Option<&PendingHandoffBackend> {
102 self.pending.get(token).map(|entry| &entry.backend)
103 }
104
105 /// Register one issued token as awaiting backend acknowledgement.
106 ///
107 /// Re-registering the same token replaces the previous entry; the token
108 /// store already guarantees issued tokens are unique while pending.
109 pub fn register(&mut self, token: HandoffToken, backend: PendingHandoffBackend, now: Instant) {
110 let ack_deadline_at = now.checked_add(self.ack_deadline).unwrap_or(now);
111 self.pending.insert(
112 token,
113 PendingAckEntry {
114 backend,
115 issued_at: now,
116 ack_deadline_at,
117 },
118 );
119 }
120
121 /// Record that the backend adopted the handed-off connection.
122 ///
123 /// On success the pending entry transitions to completed (removed) and
124 /// the one-time token is revoked from `tokens` so it can never be
125 /// presented again. A second ACK for the same token, an ACK for an
126 /// unknown token, or an ACK after expiry sweep is rejected with
127 /// [`HandoffAckError::TokenNotPending`]. An ACK past the deadline (when
128 /// no sweep ran yet) is rejected with
129 /// [`HandoffAckError::AckDeadlineExceeded`] and the token is revoked.
130 pub fn acknowledge(
131 &mut self,
132 tokens: &mut HandoffTokenStore,
133 token: &HandoffToken,
134 now: Instant,
135 ) -> Result<AcknowledgedHandoff, HandoffAckError> {
136 let Some(entry) = self.pending.remove(token) else {
137 return Err(HandoffAckError::TokenNotPending);
138 };
139 if now >= entry.ack_deadline_at {
140 tokens.revoke(token);
141 return Err(HandoffAckError::AckDeadlineExceeded {
142 backend: entry.backend,
143 deadline: self.ack_deadline,
144 });
145 }
146
147 tokens.revoke(token);
148 Ok(AcknowledgedHandoff {
149 token: *token,
150 backend: entry.backend,
151 waited: now.saturating_duration_since(entry.issued_at),
152 })
153 }
154
155 /// Abandon one pending handoff before its ACK deadline.
156 ///
157 /// Used when a broker-side step (handle duplication or delivery) fails
158 /// after issuance: the pending entry is removed and the one-time token
159 /// is revoked from `tokens` even when the entry was already gone, so a
160 /// late backend presentation of the token is always rejected. Returns
161 /// the backend identity when an entry was pending.
162 pub fn abandon(
163 &mut self,
164 tokens: &mut HandoffTokenStore,
165 token: &HandoffToken,
166 ) -> Option<PendingHandoffBackend> {
167 let entry = self.pending.remove(token);
168 tokens.revoke(token);
169 entry.map(|entry| entry.backend)
170 }
171
172 /// Expire every pending handoff whose ACK deadline has passed.
173 ///
174 /// Each expired handoff has its one-time token revoked from `tokens`
175 /// (so a late backend ACK or token presentation is rejected) and is
176 /// returned to the caller, which must fall back to `backend_pipe`
177 /// reconnect for the affected connection.
178 pub fn expire_overdue(
179 &mut self,
180 tokens: &mut HandoffTokenStore,
181 now: Instant,
182 ) -> Vec<ExpiredHandoff> {
183 let overdue: Vec<HandoffToken> = self
184 .pending
185 .iter()
186 .filter(|(_, entry)| now >= entry.ack_deadline_at)
187 .map(|(token, _)| *token)
188 .collect();
189
190 let mut expired = Vec::with_capacity(overdue.len());
191 for token in overdue {
192 let Some(entry) = self.pending.remove(&token) else {
193 continue;
194 };
195 tokens.revoke(&token);
196 expired.push(ExpiredHandoff {
197 token,
198 backend: entry.backend,
199 deadline: self.ack_deadline,
200 });
201 }
202 expired
203 }
204}
205
206impl Default for HandoffAckRegistry {
207 fn default() -> Self {
208 Self::new()
209 }
210}
211
/// A handoff completed by a timely backend acknowledgement.
///
/// Returned by [`HandoffAckRegistry::acknowledge`] when the ACK arrives
/// before the pending entry's deadline.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AcknowledgedHandoff {
    /// The consumed one-time token; it has been revoked from the token
    /// store by the time this value is returned.
    pub token: HandoffToken,
    /// Backend that acknowledged the handoff, as recorded at registration.
    pub backend: PendingHandoffBackend,
    /// Time the broker waited between registration and acknowledgement
    /// (saturates to zero if the ACK instant precedes registration).
    pub waited: Duration,
}
222
/// A pending handoff abandoned because the backend never acknowledged it.
///
/// Produced by [`HandoffAckRegistry::expire_overdue`], one per overdue
/// entry.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpiredHandoff {
    /// The revoked one-time token.
    pub token: HandoffToken,
    /// Backend that failed to acknowledge in time.
    pub backend: PendingHandoffBackend,
    /// Deadline that was exceeded: the registry's configured ACK window
    /// (a duration, not the instant at which it elapsed).
    pub deadline: Duration,
}
233
234impl ExpiredHandoff {
235 /// Map this expiry onto the shared handoff failure classification.
236 pub fn attempt_failure(&self) -> HandoffAttemptFailure {
237 HandoffAttemptFailure::BackendAckTimeout
238 }
239
240 /// Return the silent reconnect fallback decision for this expiry.
241 pub fn fallback_decision(&self) -> HandoffFallbackDecision {
242 HandoffFallbackDecision::new(HandoffFallbackReason::BackendAckTimeout)
243 }
244}
245
/// Errors raised when a backend acknowledgement cannot complete a handoff.
///
/// Returned by [`HandoffAckRegistry::acknowledge`].
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum HandoffAckError {
    /// The token has no pending entry: it is unknown, already acknowledged,
    /// abandoned, already expired by a sweep, or already rejected for a
    /// late ACK.
    #[error("handoff ACK token is not pending")]
    TokenNotPending,
    /// The acknowledgement arrived at or after the broker ACK deadline
    /// while the entry was still pending. The entry is removed and the
    /// token revoked before this error is returned.
    #[error("backend ACK deadline ({deadline:?}) exceeded for {backend:?}")]
    AckDeadlineExceeded {
        /// Backend that acknowledged too late.
        backend: PendingHandoffBackend,
        /// Deadline that was exceeded (the registry's configured window).
        deadline: Duration,
    },
}
261
262impl HandoffAckError {
263 /// Map this error onto the shared handoff failure classification, when
264 /// it represents a backend ACK timeout.
265 pub fn attempt_failure(&self) -> Option<HandoffAttemptFailure> {
266 match self {
267 Self::TokenNotPending => None,
268 Self::AckDeadlineExceeded { .. } => Some(HandoffAttemptFailure::BackendAckTimeout),
269 }
270 }
271}
272
/// Bookkeeping for one handoff that is still awaiting its backend ACK.
#[derive(Clone, Debug)]
struct PendingAckEntry {
    /// Backend expected to acknowledge this handoff.
    backend: PendingHandoffBackend,
    /// Instant passed to `register`; used to compute how long the broker
    /// waited when the ACK arrives.
    issued_at: Instant,
    /// Instant at or after which the handoff counts as overdue
    /// (`issued_at + ack_deadline`, or `issued_at` itself if that sum is
    /// not representable).
    ack_deadline_at: Instant,
}