running_process/broker/server/
broadcast.rs1use std::path::PathBuf;
9use std::time::Duration;
10
11use super::backend_registry::BackendKey;
12
/// Acknowledgement timeout carried by the default broadcast policy: five seconds.
pub const DEFAULT_BROADCAST_ACK_TIMEOUT: Duration = Duration::from_millis(5_000);
15
16#[derive(Clone, Debug, PartialEq, Eq)]
18pub enum BroadcastOperation {
19 ReleaseHandles {
21 path_prefix: PathBuf,
23 },
24 Quiesce {
26 reason: QuiesceReason,
28 },
29}
30
31impl BroadcastOperation {
32 pub fn release_handles(path_prefix: impl Into<PathBuf>) -> Self {
34 Self::ReleaseHandles {
35 path_prefix: path_prefix.into(),
36 }
37 }
38
39 pub fn quiesce(reason: QuiesceReason) -> Self {
41 Self::Quiesce { reason }
42 }
43}
44
/// Reason attached to a quiesce request.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum QuiesceReason {
    /// Quiesce triggered by an idle timeout.
    IdleTimeout,
    /// Quiesce triggered by broker shutdown.
    BrokerShutdown,
    /// Quiesce triggered for maintenance.
    Maintenance,
}
55
/// Settings that govern how a broadcast is evaluated.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BroadcastPolicy {
    /// Timeout value associated with waiting for a backend acknowledgement.
    pub ack_timeout: Duration,
}

impl BroadcastPolicy {
    /// Creates a policy with the given acknowledgement timeout.
    ///
    /// A zero `ack_timeout` is raised to one millisecond, so a policy built
    /// through this constructor never carries a zero timeout. The field is
    /// public, so constructing the struct directly bypasses this clamp.
    ///
    /// This is a `const fn`, so it can initialise `const` and `static` items.
    pub const fn new(ack_timeout: Duration) -> Self {
        let ack_timeout = if ack_timeout.is_zero() {
            Duration::from_millis(1)
        } else {
            ack_timeout
        };
        Self { ack_timeout }
    }
}
75
76impl Default for BroadcastPolicy {
77 fn default() -> Self {
78 Self {
79 ack_timeout: DEFAULT_BROADCAST_ACK_TIMEOUT,
80 }
81 }
82}
83
84#[derive(Clone, Debug, PartialEq, Eq)]
86pub struct BroadcastBackend {
87 pub key: BackendKey,
89 live: bool,
90 response: BroadcastBackendResponse,
91 received: Vec<BroadcastOperation>,
92}
93
94impl BroadcastBackend {
95 pub fn live(key: BackendKey) -> Self {
97 Self {
98 key,
99 live: true,
100 response: BroadcastBackendResponse::Ack,
101 received: Vec::new(),
102 }
103 }
104
105 pub fn dead(key: BackendKey) -> Self {
107 Self {
108 key,
109 live: false,
110 response: BroadcastBackendResponse::Ack,
111 received: Vec::new(),
112 }
113 }
114
115 pub fn with_response(mut self, response: BroadcastBackendResponse) -> Self {
117 self.response = response;
118 self
119 }
120
121 pub fn set_live(&mut self, live: bool) {
123 self.live = live;
124 }
125
126 pub fn is_live(&self) -> bool {
128 self.live
129 }
130
131 pub fn received_operations(&self) -> &[BroadcastOperation] {
133 &self.received
134 }
135}
136
137#[derive(Clone, Copy, Debug, PartialEq, Eq)]
139pub enum BroadcastBackendResponse {
140 Ack,
142 Timeout,
144 Failure(BroadcastFailureReason),
146}
147
/// Reason a backend gives when it fails a broadcast.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum BroadcastFailureReason {
    /// The backend does not support the operation.
    UnsupportedOperation,
    /// The backend rejected the operation.
    Rejected,
    /// The backend hit an error of its own.
    BackendError,
}
158
159#[derive(Debug)]
161pub struct LifecycleBroadcastModel {
162 policy: BroadcastPolicy,
163 backends: Vec<BroadcastBackend>,
164}
165
166impl LifecycleBroadcastModel {
167 pub fn new() -> Self {
169 Self::with_policy(BroadcastPolicy::default())
170 }
171
172 pub fn with_policy(policy: BroadcastPolicy) -> Self {
174 Self {
175 policy,
176 backends: Vec::new(),
177 }
178 }
179
180 pub fn register_backend(&mut self, backend: BroadcastBackend) -> Option<BroadcastBackend> {
182 if let Some(existing) = self
183 .backends
184 .iter_mut()
185 .find(|existing| existing.key == backend.key)
186 {
187 return Some(std::mem::replace(existing, backend));
188 }
189
190 self.backends.push(backend);
191 None
192 }
193
194 pub fn backend(&self, key: &BackendKey) -> Option<&BroadcastBackend> {
196 self.backends.iter().find(|backend| &backend.key == key)
197 }
198
199 pub fn backends(&self) -> &[BroadcastBackend] {
201 &self.backends
202 }
203
204 pub fn release_handles_under_path(
206 &mut self,
207 path_prefix: impl Into<PathBuf>,
208 ) -> BroadcastResult {
209 self.broadcast(BroadcastOperation::release_handles(path_prefix))
210 }
211
212 pub fn quiesce(&mut self, reason: QuiesceReason) -> BroadcastResult {
214 self.broadcast(BroadcastOperation::quiesce(reason))
215 }
216
217 pub fn broadcast(&mut self, operation: BroadcastOperation) -> BroadcastResult {
219 let mut result = BroadcastResult::new(operation.clone());
220
221 for backend in &mut self.backends {
222 if !backend.live {
223 result.skipped_dead.push(backend.key.clone());
224 continue;
225 }
226
227 backend.received.push(operation.clone());
228 match backend.response {
229 BroadcastBackendResponse::Ack => {
230 result.acks.push(BroadcastAck {
231 key: backend.key.clone(),
232 });
233 }
234 BroadcastBackendResponse::Timeout => {
235 result.timeouts.push(BroadcastTimeout {
236 key: backend.key.clone(),
237 timeout: self.policy.ack_timeout,
238 });
239 }
240 BroadcastBackendResponse::Failure(reason) => {
241 result.failures.push(BroadcastFailure {
242 key: backend.key.clone(),
243 reason,
244 });
245 }
246 }
247 }
248
249 result
250 }
251}
252
253impl Default for LifecycleBroadcastModel {
254 fn default() -> Self {
255 Self::new()
256 }
257}
258
259#[derive(Clone, Debug, PartialEq, Eq)]
261pub struct BroadcastResult {
262 pub operation: BroadcastOperation,
264 pub acks: Vec<BroadcastAck>,
266 pub timeouts: Vec<BroadcastTimeout>,
268 pub failures: Vec<BroadcastFailure>,
270 pub skipped_dead: Vec<BackendKey>,
272}
273
274impl BroadcastResult {
275 fn new(operation: BroadcastOperation) -> Self {
276 Self {
277 operation,
278 acks: Vec::new(),
279 timeouts: Vec::new(),
280 failures: Vec::new(),
281 skipped_dead: Vec::new(),
282 }
283 }
284
285 pub fn sent_count(&self) -> usize {
287 self.acks.len() + self.timeouts.len() + self.failures.len()
288 }
289
290 pub fn all_live_backends_acked(&self) -> bool {
292 self.timeouts.is_empty() && self.failures.is_empty()
293 }
294}
295
296#[derive(Clone, Debug, PartialEq, Eq)]
298pub struct BroadcastAck {
299 pub key: BackendKey,
301}
302
303#[derive(Clone, Debug, PartialEq, Eq)]
305pub struct BroadcastTimeout {
306 pub key: BackendKey,
308 pub timeout: Duration,
310}
311
312#[derive(Clone, Debug, PartialEq, Eq)]
314pub struct BroadcastFailure {
315 pub key: BackendKey,
317 pub reason: BroadcastFailureReason,
319}