running_process/broker/server/handoff/
pending.rs1use std::collections::VecDeque;
8use std::time::{Duration, Instant};
9
10use super::{HandoffAttemptDecision, HandoffFallbackDecision, HandoffFallbackReason};
11
12pub const DEFAULT_MAX_PENDING_HANDOFFS: usize = 64;
14
15pub const DEFAULT_PENDING_HANDOFF_TTL: Duration = Duration::from_millis(100);
17
18#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub struct PendingHandoffQueueConfig {
21 pub max_pending_handoffs: usize,
23 pub pending_ttl: Duration,
25}
26
27impl PendingHandoffQueueConfig {
28 pub fn new(max_pending_handoffs: usize, pending_ttl: Duration) -> Self {
30 Self {
31 max_pending_handoffs: max_pending_handoffs.max(1),
32 pending_ttl: if pending_ttl.is_zero() {
33 Duration::from_millis(1)
34 } else {
35 pending_ttl
36 },
37 }
38 }
39}
40
41impl Default for PendingHandoffQueueConfig {
42 fn default() -> Self {
43 Self {
44 max_pending_handoffs: DEFAULT_MAX_PENDING_HANDOFFS,
45 pending_ttl: DEFAULT_PENDING_HANDOFF_TTL,
46 }
47 }
48}
49
50#[derive(Debug)]
52pub struct PendingHandoffQueue<T> {
53 config: PendingHandoffQueueConfig,
54 queue: VecDeque<PendingHandoffEntry<T>>,
55}
56
57impl<T> PendingHandoffQueue<T> {
58 pub fn new() -> Self {
60 Self::with_config(PendingHandoffQueueConfig::default())
61 }
62
63 pub fn with_config(config: PendingHandoffQueueConfig) -> Self {
65 Self {
66 config,
67 queue: VecDeque::with_capacity(config.max_pending_handoffs),
68 }
69 }
70
71 pub fn config(&self) -> PendingHandoffQueueConfig {
73 self.config
74 }
75
76 pub fn pending_len(&self) -> usize {
78 self.queue.len()
79 }
80
81 pub fn is_empty(&self) -> bool {
83 self.queue.is_empty()
84 }
85
86 pub fn enqueue(&mut self, handoff: T, now: Instant) -> Result<(), PendingHandoffOverflow> {
91 self.expire(now);
92 if self.queue.len() >= self.config.max_pending_handoffs {
93 return Err(PendingHandoffOverflow {
94 max_pending_handoffs: self.config.max_pending_handoffs,
95 });
96 }
97
98 self.queue.push_back(PendingHandoffEntry {
99 handoff,
100 expires_at: expires_at(now, self.config.pending_ttl),
101 });
102 Ok(())
103 }
104
105 pub fn dequeue(&mut self, now: Instant) -> Option<T> {
107 self.expire(now);
108 self.queue.pop_front().map(|entry| entry.handoff)
109 }
110
111 pub fn expire(&mut self, now: Instant) -> usize {
113 let before = self.queue.len();
114 self.queue.retain(|entry| now < entry.expires_at);
115 before - self.queue.len()
116 }
117}
118
119impl<T> Default for PendingHandoffQueue<T> {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
127#[error("pending handoff queue full ({max_pending_handoffs})")]
128pub struct PendingHandoffOverflow {
129 pub max_pending_handoffs: usize,
131}
132
133impl PendingHandoffOverflow {
134 pub fn fallback_reason(&self) -> HandoffFallbackReason {
136 HandoffFallbackReason::FdPressureDisabled
137 }
138
139 pub fn fallback_decision(&self) -> HandoffFallbackDecision {
141 HandoffFallbackDecision::new(self.fallback_reason())
142 }
143
144 pub fn fallback_attempt_decision(&self) -> HandoffAttemptDecision {
146 HandoffAttemptDecision::FallbackToReconnect(self.fallback_decision())
147 }
148
149 pub fn is_fallback_safe(&self) -> bool {
151 let fallback = self.fallback_decision();
152 fallback.uses_backend_reconnect() && !fallback.sends_client_error()
153 }
154}
155
156#[derive(Debug)]
157struct PendingHandoffEntry<T> {
158 handoff: T,
159 expires_at: Instant,
160}
161
162fn expires_at(now: Instant, ttl: Duration) -> Instant {
163 now.checked_add(ttl).unwrap_or(now)
164}