Skip to main content

running_process/broker/server/handoff/
ack.rs

1//! Backend ACK deadline tracking for pending handoffs (#354, slice 2).
2//!
3//! Issuing a `Negotiated.handle_passed_token` is not enough to consider a
4//! handoff complete: the backend must report that it adopted the passed
5//! handle before a broker-side deadline. This module owns that contract.
6//!
7//! Transport note: since #354 slice 6 the v1 envelope carries a
8//! backend-to-broker `HandoffAck` frame (see [`super::wire`]); the broker
9//! observes it through
10//! [`WireHandoffDelivery`](super::wire::WireHandoffDelivery) and reports it
11//! here through [`HandoffAckRegistry::acknowledge`]. The backend acceptance
12//! path (`backend_lib::accept_handed_off`) consumes the one-time token
13//! before the ACK is sent. Until an ACK arrives within the deadline,
14//! `Negotiated.backend_pipe` reconnect remains the correctness path; an
15//! expired handoff is abandoned and falls back to reconnect.
16
17use std::collections::HashMap;
18use std::time::{Duration, Instant};
19
20use super::fallback::{HandoffAttemptFailure, HandoffFallbackDecision, HandoffFallbackReason};
21use super::handoff_token::{HandoffToken, HandoffTokenStore};
22
/// How long the broker waits, by default, for a backend to acknowledge a
/// passed handle.
///
/// This is intentionally far below [`super::DEFAULT_HANDOFF_TOKEN_TTL`]
/// (30s). A backend that really received the handle acknowledges within
/// milliseconds, so a longer wait would only postpone the reconnect
/// fallback.
pub const DEFAULT_HANDOFF_ACK_DEADLINE: Duration = Duration::from_millis(5_000);
29
/// Names the backend that is expected to acknowledge one pending handoff.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingHandoffBackend {
    /// Name of the service this handoff was negotiated for.
    pub service_name: String,
    /// Process ID of the backend, or `0` when the broker only knows the
    /// service (the Hello path issues tokens before resolving a live pid).
    pub backend_pid: u32,
}

impl PendingHandoffBackend {
    /// Build an identity from a service name and a backend process ID.
    pub fn new(service_name: impl Into<String>, backend_pid: u32) -> Self {
        let service_name = service_name.into();
        Self {
            service_name,
            backend_pid,
        }
    }

    /// Build an identity for a backend known only by its service name; the
    /// process ID is recorded as `0`.
    pub fn for_service(service_name: impl Into<String>) -> Self {
        Self::new(service_name, 0)
    }
}
57
58/// Broker-side registry of handoffs awaiting backend acknowledgement.
59///
60/// Keyed by the issued one-time token. Entries leave the registry exactly
61/// once: through [`acknowledge`](Self::acknowledge) (completed) or through
62/// deadline expiry (failed, token revoked, reconnect fallback).
63#[derive(Debug)]
64pub struct HandoffAckRegistry {
65    ack_deadline: Duration,
66    pending: HashMap<HandoffToken, PendingAckEntry>,
67}
68
69impl HandoffAckRegistry {
70    /// Create an empty registry with the default ACK deadline.
71    pub fn new() -> Self {
72        Self::with_ack_deadline(DEFAULT_HANDOFF_ACK_DEADLINE)
73    }
74
75    /// Create an empty registry with an explicit ACK deadline.
76    ///
77    /// A zero deadline is clamped to one millisecond so every registered
78    /// handoff gets a non-empty acknowledgement window.
79    pub fn with_ack_deadline(ack_deadline: Duration) -> Self {
80        Self {
81            ack_deadline: if ack_deadline.is_zero() {
82                Duration::from_millis(1)
83            } else {
84                ack_deadline
85            },
86            pending: HashMap::new(),
87        }
88    }
89
90    /// Return the configured ACK deadline.
91    pub fn ack_deadline(&self) -> Duration {
92        self.ack_deadline
93    }
94
95    /// Return the number of handoffs still awaiting acknowledgement.
96    pub fn pending_len(&self) -> usize {
97        self.pending.len()
98    }
99
100    /// Return the backend identity recorded for a pending handoff.
101    pub fn pending_backend(&self, token: &HandoffToken) -> Option<&PendingHandoffBackend> {
102        self.pending.get(token).map(|entry| &entry.backend)
103    }
104
105    /// Register one issued token as awaiting backend acknowledgement.
106    ///
107    /// Re-registering the same token replaces the previous entry; the token
108    /// store already guarantees issued tokens are unique while pending.
109    pub fn register(&mut self, token: HandoffToken, backend: PendingHandoffBackend, now: Instant) {
110        let ack_deadline_at = now.checked_add(self.ack_deadline).unwrap_or(now);
111        self.pending.insert(
112            token,
113            PendingAckEntry {
114                backend,
115                issued_at: now,
116                ack_deadline_at,
117            },
118        );
119    }
120
121    /// Record that the backend adopted the handed-off connection.
122    ///
123    /// On success the pending entry transitions to completed (removed) and
124    /// the one-time token is revoked from `tokens` so it can never be
125    /// presented again. A second ACK for the same token, an ACK for an
126    /// unknown token, or an ACK after expiry sweep is rejected with
127    /// [`HandoffAckError::TokenNotPending`]. An ACK past the deadline (when
128    /// no sweep ran yet) is rejected with
129    /// [`HandoffAckError::AckDeadlineExceeded`] and the token is revoked.
130    pub fn acknowledge(
131        &mut self,
132        tokens: &mut HandoffTokenStore,
133        token: &HandoffToken,
134        now: Instant,
135    ) -> Result<AcknowledgedHandoff, HandoffAckError> {
136        let Some(entry) = self.pending.remove(token) else {
137            return Err(HandoffAckError::TokenNotPending);
138        };
139        if now >= entry.ack_deadline_at {
140            tokens.revoke(token);
141            return Err(HandoffAckError::AckDeadlineExceeded {
142                backend: entry.backend,
143                deadline: self.ack_deadline,
144            });
145        }
146
147        tokens.revoke(token);
148        Ok(AcknowledgedHandoff {
149            token: *token,
150            backend: entry.backend,
151            waited: now.saturating_duration_since(entry.issued_at),
152        })
153    }
154
155    /// Abandon one pending handoff before its ACK deadline.
156    ///
157    /// Used when a broker-side step (handle duplication or delivery) fails
158    /// after issuance: the pending entry is removed and the one-time token
159    /// is revoked from `tokens` even when the entry was already gone, so a
160    /// late backend presentation of the token is always rejected. Returns
161    /// the backend identity when an entry was pending.
162    pub fn abandon(
163        &mut self,
164        tokens: &mut HandoffTokenStore,
165        token: &HandoffToken,
166    ) -> Option<PendingHandoffBackend> {
167        let entry = self.pending.remove(token);
168        tokens.revoke(token);
169        entry.map(|entry| entry.backend)
170    }
171
172    /// Expire every pending handoff whose ACK deadline has passed.
173    ///
174    /// Each expired handoff has its one-time token revoked from `tokens`
175    /// (so a late backend ACK or token presentation is rejected) and is
176    /// returned to the caller, which must fall back to `backend_pipe`
177    /// reconnect for the affected connection.
178    pub fn expire_overdue(
179        &mut self,
180        tokens: &mut HandoffTokenStore,
181        now: Instant,
182    ) -> Vec<ExpiredHandoff> {
183        let overdue: Vec<HandoffToken> = self
184            .pending
185            .iter()
186            .filter(|(_, entry)| now >= entry.ack_deadline_at)
187            .map(|(token, _)| *token)
188            .collect();
189
190        let mut expired = Vec::with_capacity(overdue.len());
191        for token in overdue {
192            let Some(entry) = self.pending.remove(&token) else {
193                continue;
194            };
195            tokens.revoke(&token);
196            expired.push(ExpiredHandoff {
197                token,
198                backend: entry.backend,
199                deadline: self.ack_deadline,
200            });
201        }
202        expired
203    }
204}
205
206impl Default for HandoffAckRegistry {
207    fn default() -> Self {
208        Self::new()
209    }
210}
211
212/// A handoff completed by a timely backend acknowledgement.
213#[derive(Clone, Debug, PartialEq, Eq)]
214pub struct AcknowledgedHandoff {
215    /// The consumed one-time token.
216    pub token: HandoffToken,
217    /// Backend that acknowledged the handoff.
218    pub backend: PendingHandoffBackend,
219    /// Time the broker waited between issuance and acknowledgement.
220    pub waited: Duration,
221}
222
223/// A pending handoff abandoned because the backend never acknowledged it.
224#[derive(Clone, Debug, PartialEq, Eq)]
225pub struct ExpiredHandoff {
226    /// The revoked one-time token.
227    pub token: HandoffToken,
228    /// Backend that failed to acknowledge in time.
229    pub backend: PendingHandoffBackend,
230    /// Deadline that was exceeded.
231    pub deadline: Duration,
232}
233
234impl ExpiredHandoff {
235    /// Map this expiry onto the shared handoff failure classification.
236    pub fn attempt_failure(&self) -> HandoffAttemptFailure {
237        HandoffAttemptFailure::BackendAckTimeout
238    }
239
240    /// Return the silent reconnect fallback decision for this expiry.
241    pub fn fallback_decision(&self) -> HandoffFallbackDecision {
242        HandoffFallbackDecision::new(HandoffFallbackReason::BackendAckTimeout)
243    }
244}
245
246/// Errors raised when a backend acknowledgement cannot complete a handoff.
247#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
248pub enum HandoffAckError {
249    /// The token is unknown, already acknowledged, or already expired.
250    #[error("handoff ACK token is not pending")]
251    TokenNotPending,
252    /// The acknowledgement arrived after the broker ACK deadline.
253    #[error("backend ACK deadline ({deadline:?}) exceeded for {backend:?}")]
254    AckDeadlineExceeded {
255        /// Backend that acknowledged too late.
256        backend: PendingHandoffBackend,
257        /// Deadline that was exceeded.
258        deadline: Duration,
259    },
260}
261
262impl HandoffAckError {
263    /// Map this error onto the shared handoff failure classification, when
264    /// it represents a backend ACK timeout.
265    pub fn attempt_failure(&self) -> Option<HandoffAttemptFailure> {
266        match self {
267            Self::TokenNotPending => None,
268            Self::AckDeadlineExceeded { .. } => Some(HandoffAttemptFailure::BackendAckTimeout),
269        }
270    }
271}
272
273#[derive(Clone, Debug)]
274struct PendingAckEntry {
275    backend: PendingHandoffBackend,
276    issued_at: Instant,
277    ack_deadline_at: Instant,
278}