Skip to main content

running_process/broker/server/handoff/
pending.rs

1//! Capacity-bounded pending handoff backlog.
2//!
3//! The real platform transports may need to park accepted client handles while
4//! a backend handoff socket or acknowledgement catches up. This model keeps
5//! that backlog finite and maps overload into the existing reconnect fallback.
6
7use std::collections::VecDeque;
8use std::time::{Duration, Instant};
9
10use super::{HandoffAttemptDecision, HandoffFallbackDecision, HandoffFallbackReason};
11
12/// Default number of pending handoffs retained by one broker process.
13pub const DEFAULT_MAX_PENDING_HANDOFFS: usize = 64;
14
15/// Default maximum age for a pending handoff before it is expired.
16pub const DEFAULT_PENDING_HANDOFF_TTL: Duration = Duration::from_millis(100);
17
18/// Runtime bounds for the pending handoff backlog.
19#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub struct PendingHandoffQueueConfig {
21    /// Maximum handoff attempts allowed to wait for backend handoff progress.
22    pub max_pending_handoffs: usize,
23    /// Maximum age of an unprocessed pending handoff.
24    pub pending_ttl: Duration,
25}
26
27impl PendingHandoffQueueConfig {
28    /// Build a config, clamping zero values to safe non-zero defaults.
29    pub fn new(max_pending_handoffs: usize, pending_ttl: Duration) -> Self {
30        Self {
31            max_pending_handoffs: max_pending_handoffs.max(1),
32            pending_ttl: if pending_ttl.is_zero() {
33                Duration::from_millis(1)
34            } else {
35                pending_ttl
36            },
37        }
38    }
39}
40
41impl Default for PendingHandoffQueueConfig {
42    fn default() -> Self {
43        Self {
44            max_pending_handoffs: DEFAULT_MAX_PENDING_HANDOFFS,
45            pending_ttl: DEFAULT_PENDING_HANDOFF_TTL,
46        }
47    }
48}
49
50/// FIFO queue for pending handoffs that cannot grow past its configured bound.
51#[derive(Debug)]
52pub struct PendingHandoffQueue<T> {
53    config: PendingHandoffQueueConfig,
54    queue: VecDeque<PendingHandoffEntry<T>>,
55}
56
57impl<T> PendingHandoffQueue<T> {
58    /// Create an empty queue with default bounds.
59    pub fn new() -> Self {
60        Self::with_config(PendingHandoffQueueConfig::default())
61    }
62
63    /// Create an empty queue with explicit bounds.
64    pub fn with_config(config: PendingHandoffQueueConfig) -> Self {
65        Self {
66            config,
67            queue: VecDeque::with_capacity(config.max_pending_handoffs),
68        }
69    }
70
71    /// Return the active queue bounds.
72    pub fn config(&self) -> PendingHandoffQueueConfig {
73        self.config
74    }
75
76    /// Return the number of currently pending, non-pruned handoffs.
77    pub fn pending_len(&self) -> usize {
78        self.queue.len()
79    }
80
81    /// Return true when no handoffs are pending.
82    pub fn is_empty(&self) -> bool {
83        self.queue.is_empty()
84    }
85
86    /// Enqueue one handoff in FIFO order.
87    ///
88    /// Expired entries are pruned first. If the backlog is still full, the
89    /// caller must use the returned overflow decision to fall back to reconnect.
90    pub fn enqueue(&mut self, handoff: T, now: Instant) -> Result<(), PendingHandoffOverflow> {
91        self.expire(now);
92        if self.queue.len() >= self.config.max_pending_handoffs {
93            return Err(PendingHandoffOverflow {
94                max_pending_handoffs: self.config.max_pending_handoffs,
95            });
96        }
97
98        self.queue.push_back(PendingHandoffEntry {
99            handoff,
100            expires_at: expires_at(now, self.config.pending_ttl),
101        });
102        Ok(())
103    }
104
105    /// Dequeue the oldest non-expired handoff.
106    pub fn dequeue(&mut self, now: Instant) -> Option<T> {
107        self.expire(now);
108        self.queue.pop_front().map(|entry| entry.handoff)
109    }
110
111    /// Drop all expired pending handoffs and return the number removed.
112    pub fn expire(&mut self, now: Instant) -> usize {
113        let before = self.queue.len();
114        self.queue.retain(|entry| now < entry.expires_at);
115        before - self.queue.len()
116    }
117}
118
119impl<T> Default for PendingHandoffQueue<T> {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125/// Overflow raised when the pending handoff queue reaches its configured cap.
126#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
127#[error("pending handoff queue full ({max_pending_handoffs})")]
128pub struct PendingHandoffOverflow {
129    /// Maximum pending handoffs allowed before reconnect fallback is required.
130    pub max_pending_handoffs: usize,
131}
132
133impl PendingHandoffOverflow {
134    /// Return the existing fallback reason used for handoff pressure.
135    pub fn fallback_reason(&self) -> HandoffFallbackReason {
136        HandoffFallbackReason::FdPressureDisabled
137    }
138
139    /// Return the silent reconnect fallback for this overload condition.
140    pub fn fallback_decision(&self) -> HandoffFallbackDecision {
141        HandoffFallbackDecision::new(self.fallback_reason())
142    }
143
144    /// Return the full attempt decision for callers that operate on broker decisions.
145    pub fn fallback_attempt_decision(&self) -> HandoffAttemptDecision {
146        HandoffAttemptDecision::FallbackToReconnect(self.fallback_decision())
147    }
148
149    /// Return true when overflow is safe to hide behind reconnect fallback.
150    pub fn is_fallback_safe(&self) -> bool {
151        let fallback = self.fallback_decision();
152        fallback.uses_backend_reconnect() && !fallback.sends_client_error()
153    }
154}
155
156#[derive(Debug)]
157struct PendingHandoffEntry<T> {
158    handoff: T,
159    expires_at: Instant,
160}
161
162fn expires_at(now: Instant, ttl: Duration) -> Instant {
163    now.checked_add(ttl).unwrap_or(now)
164}