Skip to main content

running_process/broker/server/
broadcast.rs

//! Backend lifecycle broadcast model.
//!
//! This module only models the fanout of lifecycle control operations from the
//! broker to its backends. It does not open sockets, send frames, or define
//! backend RPC. Callers can use the result shape here when later Phase 5 work
//! wires real maintenance requests into live backend connections.
7
8use std::path::PathBuf;
9use std::time::Duration;
10
11use super::backend_registry::BackendKey;
12
/// Default acknowledgement window granted to one backend per lifecycle broadcast.
///
/// Five seconds, expressed in milliseconds. `BroadcastPolicy::default()` uses
/// this value as its `ack_timeout`.
pub const DEFAULT_BROADCAST_ACK_TIMEOUT: Duration = Duration::from_millis(5_000);
15
16/// Broker lifecycle broadcast operation.
17#[derive(Clone, Debug, PartialEq, Eq)]
18pub enum BroadcastOperation {
19    /// Ask backends to release file handles below a path prefix.
20    ReleaseHandles {
21        /// Path prefix whose handles should be released.
22        path_prefix: PathBuf,
23    },
24    /// Ask backends to stop accepting new work and drain.
25    Quiesce {
26        /// Reason the backend is being asked to drain.
27        reason: QuiesceReason,
28    },
29}
30
31impl BroadcastOperation {
32    /// Build a release-handles operation for `path_prefix`.
33    pub fn release_handles(path_prefix: impl Into<PathBuf>) -> Self {
34        Self::ReleaseHandles {
35            path_prefix: path_prefix.into(),
36        }
37    }
38
39    /// Build a quiesce operation.
40    pub fn quiesce(reason: QuiesceReason) -> Self {
41        Self::Quiesce { reason }
42    }
43}
44
/// Why the broker is asking backends to quiesce.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QuiesceReason {
    /// The backend crossed its configured idle threshold.
    IdleTimeout,
    /// The broker itself is shutting down gracefully.
    BrokerShutdown,
    /// An operator or a maintenance policy requested a drain.
    Maintenance,
}
55
/// Timeout policy applied to lifecycle broadcasts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BroadcastPolicy {
    /// Longest time to wait for each backend's acknowledgement.
    pub ack_timeout: Duration,
}

impl BroadcastPolicy {
    /// Create a policy with the given acknowledgement timeout.
    ///
    /// A zero `ack_timeout` is replaced with a one-millisecond floor; every
    /// non-zero value, including ones shorter than a millisecond, is kept as is.
    pub fn new(ack_timeout: Duration) -> Self {
        let floor = Duration::from_millis(1);
        // `Duration` is unsigned, so "greater than zero" is exactly "not zero".
        let ack_timeout = if ack_timeout > Duration::ZERO {
            ack_timeout
        } else {
            floor
        };
        Self { ack_timeout }
    }
}
75
76impl Default for BroadcastPolicy {
77    fn default() -> Self {
78        Self {
79            ack_timeout: DEFAULT_BROADCAST_ACK_TIMEOUT,
80        }
81    }
82}
83
84/// Testable model of one backend's lifecycle-broadcast endpoint.
85#[derive(Clone, Debug, PartialEq, Eq)]
86pub struct BroadcastBackend {
87    /// Backend key receiving broadcasts.
88    pub key: BackendKey,
89    live: bool,
90    response: BroadcastBackendResponse,
91    received: Vec<BroadcastOperation>,
92}
93
94impl BroadcastBackend {
95    /// Build a live backend endpoint that acknowledges broadcasts.
96    pub fn live(key: BackendKey) -> Self {
97        Self {
98            key,
99            live: true,
100            response: BroadcastBackendResponse::Ack,
101            received: Vec::new(),
102        }
103    }
104
105    /// Build a dead backend endpoint that should be skipped.
106    pub fn dead(key: BackendKey) -> Self {
107        Self {
108            key,
109            live: false,
110            response: BroadcastBackendResponse::Ack,
111            received: Vec::new(),
112        }
113    }
114
115    /// Set the modeled response returned when this backend receives a request.
116    pub fn with_response(mut self, response: BroadcastBackendResponse) -> Self {
117        self.response = response;
118        self
119    }
120
121    /// Mark the endpoint live or dead.
122    pub fn set_live(&mut self, live: bool) {
123        self.live = live;
124    }
125
126    /// Return true when this backend should receive broadcasts.
127    pub fn is_live(&self) -> bool {
128        self.live
129    }
130
131    /// Operations that reached this backend in the model.
132    pub fn received_operations(&self) -> &[BroadcastOperation] {
133        &self.received
134    }
135}
136
137/// Modeled backend response to a lifecycle broadcast.
138#[derive(Clone, Copy, Debug, PartialEq, Eq)]
139pub enum BroadcastBackendResponse {
140    /// Backend acknowledged the operation.
141    Ack,
142    /// Backend did not acknowledge before the broadcast timeout.
143    Timeout,
144    /// Backend rejected or failed the operation.
145    Failure(BroadcastFailureReason),
146}
147
/// Reason a modeled backend endpoint reports when a broadcast fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BroadcastFailureReason {
    /// The backend does not support the requested lifecycle operation.
    UnsupportedOperation,
    /// The backend rejected the operation.
    Rejected,
    /// The backend failed while processing the operation.
    BackendError,
}
158
159/// In-repo lifecycle broadcast model for broker-managed backends.
160#[derive(Debug)]
161pub struct LifecycleBroadcastModel {
162    policy: BroadcastPolicy,
163    backends: Vec<BroadcastBackend>,
164}
165
166impl LifecycleBroadcastModel {
167    /// Create an empty model with the default timeout policy.
168    pub fn new() -> Self {
169        Self::with_policy(BroadcastPolicy::default())
170    }
171
172    /// Create an empty model with an explicit timeout policy.
173    pub fn with_policy(policy: BroadcastPolicy) -> Self {
174        Self {
175            policy,
176            backends: Vec::new(),
177        }
178    }
179
180    /// Register or replace one backend endpoint.
181    pub fn register_backend(&mut self, backend: BroadcastBackend) -> Option<BroadcastBackend> {
182        if let Some(existing) = self
183            .backends
184            .iter_mut()
185            .find(|existing| existing.key == backend.key)
186        {
187            return Some(std::mem::replace(existing, backend));
188        }
189
190        self.backends.push(backend);
191        None
192    }
193
194    /// Return a registered backend by key.
195    pub fn backend(&self, key: &BackendKey) -> Option<&BroadcastBackend> {
196        self.backends.iter().find(|backend| &backend.key == key)
197    }
198
199    /// Return all registered backend endpoints in insertion order.
200    pub fn backends(&self) -> &[BroadcastBackend] {
201        &self.backends
202    }
203
204    /// Broadcast a release-handles operation to all live backends.
205    pub fn release_handles_under_path(
206        &mut self,
207        path_prefix: impl Into<PathBuf>,
208    ) -> BroadcastResult {
209        self.broadcast(BroadcastOperation::release_handles(path_prefix))
210    }
211
212    /// Broadcast a quiesce operation to all live backends.
213    pub fn quiesce(&mut self, reason: QuiesceReason) -> BroadcastResult {
214        self.broadcast(BroadcastOperation::quiesce(reason))
215    }
216
217    /// Broadcast one lifecycle operation to all live backends.
218    pub fn broadcast(&mut self, operation: BroadcastOperation) -> BroadcastResult {
219        let mut result = BroadcastResult::new(operation.clone());
220
221        for backend in &mut self.backends {
222            if !backend.live {
223                result.skipped_dead.push(backend.key.clone());
224                continue;
225            }
226
227            backend.received.push(operation.clone());
228            match backend.response {
229                BroadcastBackendResponse::Ack => {
230                    result.acks.push(BroadcastAck {
231                        key: backend.key.clone(),
232                    });
233                }
234                BroadcastBackendResponse::Timeout => {
235                    result.timeouts.push(BroadcastTimeout {
236                        key: backend.key.clone(),
237                        timeout: self.policy.ack_timeout,
238                    });
239                }
240                BroadcastBackendResponse::Failure(reason) => {
241                    result.failures.push(BroadcastFailure {
242                        key: backend.key.clone(),
243                        reason,
244                    });
245                }
246            }
247        }
248
249        result
250    }
251}
252
253impl Default for LifecycleBroadcastModel {
254    fn default() -> Self {
255        Self::new()
256    }
257}
258
259/// Broadcast result across all registered model endpoints.
260#[derive(Clone, Debug, PartialEq, Eq)]
261pub struct BroadcastResult {
262    /// Operation that was broadcast.
263    pub operation: BroadcastOperation,
264    /// Backends that acknowledged the operation.
265    pub acks: Vec<BroadcastAck>,
266    /// Live backends that timed out.
267    pub timeouts: Vec<BroadcastTimeout>,
268    /// Live backends that failed the operation.
269    pub failures: Vec<BroadcastFailure>,
270    /// Dead backends skipped before fanout.
271    pub skipped_dead: Vec<BackendKey>,
272}
273
274impl BroadcastResult {
275    fn new(operation: BroadcastOperation) -> Self {
276        Self {
277            operation,
278            acks: Vec::new(),
279            timeouts: Vec::new(),
280            failures: Vec::new(),
281            skipped_dead: Vec::new(),
282        }
283    }
284
285    /// Number of live backends that received the broadcast.
286    pub fn sent_count(&self) -> usize {
287        self.acks.len() + self.timeouts.len() + self.failures.len()
288    }
289
290    /// Return true when all live backends acknowledged the operation.
291    pub fn all_live_backends_acked(&self) -> bool {
292        self.timeouts.is_empty() && self.failures.is_empty()
293    }
294}
295
296/// Successful backend acknowledgement.
297#[derive(Clone, Debug, PartialEq, Eq)]
298pub struct BroadcastAck {
299    /// Backend that acknowledged the operation.
300    pub key: BackendKey,
301}
302
303/// Backend timeout during broadcast acknowledgement.
304#[derive(Clone, Debug, PartialEq, Eq)]
305pub struct BroadcastTimeout {
306    /// Backend that timed out.
307    pub key: BackendKey,
308    /// Timeout from the active broadcast policy.
309    pub timeout: Duration,
310}
311
312/// Backend failure during broadcast handling.
313#[derive(Clone, Debug, PartialEq, Eq)]
314pub struct BroadcastFailure {
315    /// Backend that failed the operation.
316    pub key: BackendKey,
317    /// Failure reason.
318    pub reason: BroadcastFailureReason,
319}