rust_supervisor/runtime/concurrent_gate.rs
//! Concurrent restart throttle gates for preventing restart storms.
//!
//! This module implements instance-global and group-level concurrent restart
//! limits to prevent resource contention during mass failure scenarios.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::{Arc, Mutex};
9
/// Instance-global concurrent restart gate counter.
///
/// Tracks the number of currently active restart attempts across all children
/// supervised by this supervisor instance. The gate only grants or denies
/// slots; how a denied request is handled (queued, dropped, retried) is left
/// to the caller.
///
/// Cloning a gate yields another handle to the same counter, because the
/// count is stored behind an [`Arc`].
#[derive(Debug, Clone)]
pub struct SupervisorInstanceGate {
    /// Maximum concurrent restarts allowed at instance level.
    max_concurrent: u32,
    /// Current count of active restart attempts, shared between clones.
    active_count: Arc<AtomicU32>,
}

impl SupervisorInstanceGate {
    /// Creates a new instance-global concurrent restart gate.
    ///
    /// # Arguments
    ///
    /// - `max_concurrent`: Maximum number of concurrent restart attempts allowed.
    ///   A value of `0` produces a gate that never grants a slot.
    ///
    /// # Returns
    ///
    /// Returns a new [`SupervisorInstanceGate`] with zero active count.
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::SupervisorInstanceGate;
    ///
    /// let gate = SupervisorInstanceGate::new(5);
    /// assert_eq!(gate.get_active_count(), 0);
    /// ```
    pub fn new(max_concurrent: u32) -> Self {
        Self {
            max_concurrent,
            active_count: Arc::new(AtomicU32::new(0)),
        }
    }

    /// Attempts to acquire a restart slot from the instance gate.
    ///
    /// The check and the increment happen as one atomic update, so concurrent
    /// callers can never push the count above the configured limit.
    ///
    /// # Returns
    ///
    /// Returns `true` if a slot was successfully acquired (active count < limit),
    /// `false` if the gate is saturated (active count >= limit).
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::SupervisorInstanceGate;
    ///
    /// let gate = SupervisorInstanceGate::new(2);
    /// assert!(gate.try_acquire()); // First acquisition succeeds
    /// assert!(gate.try_acquire()); // Second acquisition succeeds
    /// assert!(!gate.try_acquire()); // Third acquisition fails (limit reached)
    /// ```
    pub fn try_acquire(&self) -> bool {
        let limit = self.max_concurrent;
        self.active_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |active| {
                // `active < limit <= u32::MAX`, so the increment cannot overflow.
                if active < limit {
                    Some(active + 1)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Releases a previously acquired restart slot.
    ///
    /// The gate does not track slot ownership and never reclaims slots on its
    /// own: every successful [`try_acquire`](Self::try_acquire) must be paired
    /// with exactly one call to this method, at whatever point of the restart
    /// lifecycle the caller considers the attempt finished.
    ///
    /// Releasing more slots than were acquired is a caller bug. In every build
    /// the counter stays at zero instead of wrapping around to `u32::MAX`,
    /// which would leave the gate permanently saturated.
    ///
    /// # Panics
    ///
    /// Panics in builds with debug assertions enabled if no slot is currently
    /// held.
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::SupervisorInstanceGate;
    ///
    /// let gate = SupervisorInstanceGate::new(2);
    /// gate.try_acquire();
    /// gate.release();
    /// assert_eq!(gate.get_active_count(), 0);
    /// ```
    pub fn release(&self) {
        // `checked_sub` refuses to go below zero, so an unmatched release
        // leaves the counter untouched rather than wrapping it.
        let released = self
            .active_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |active| {
                active.checked_sub(1)
            })
            .is_ok();
        debug_assert!(released, "Released more slots than acquired");
    }

    /// Returns the current number of active restart attempts.
    ///
    /// # Returns
    ///
    /// Returns the current active count for monitoring and diagnostics. The
    /// value is a snapshot and may be stale as soon as it is returned when
    /// other threads use the gate concurrently.
    pub fn get_active_count(&self) -> u32 {
        self.active_count.load(Ordering::SeqCst)
    }

    /// Returns the configured maximum concurrent restart limit.
    ///
    /// # Returns
    ///
    /// Returns the maximum allowed concurrent restarts.
    pub fn get_max_concurrent(&self) -> u32 {
        self.max_concurrent
    }

    /// Checks if the gate is currently saturated.
    ///
    /// # Returns
    ///
    /// Returns `true` if active count has reached or exceeded the limit.
    pub fn is_saturated(&self) -> bool {
        self.get_active_count() >= self.max_concurrent
    }
}
133
/// Group-level concurrent restart gate for optional per-group throttling.
///
/// Tracks concurrent restarts separately for each restart execution plan
/// group, applying the same limit to every group. A group's counter is created
/// the first time a slot is requested for it and is kept for the lifetime of
/// the gate.
///
/// Cloning the gate yields another handle to the same set of counters.
#[derive(Debug, Clone)]
pub struct GroupLevelGate {
    /// Map from group identifier to per-group gate state.
    group_gates: Arc<Mutex<HashMap<String, Arc<AtomicU32>>>>,
    /// Default maximum concurrent restarts per group.
    max_per_group: u32,
}

impl GroupLevelGate {
    /// Creates a new group-level concurrent restart gate manager.
    ///
    /// # Arguments
    ///
    /// - `max_per_group`: Maximum concurrent restarts allowed per group.
    ///
    /// # Returns
    ///
    /// Returns a new [`GroupLevelGate`] with empty group map.
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::GroupLevelGate;
    ///
    /// let gate = GroupLevelGate::new(3);
    /// assert_eq!(gate.get_active_count_for_group("group-a"), 0);
    /// ```
    pub fn new(max_per_group: u32) -> Self {
        Self {
            group_gates: Arc::new(Mutex::new(HashMap::new())),
            max_per_group,
        }
    }

    /// Returns the counter of `group_id` if that group has been seen before.
    ///
    /// The map lock is held only for the lookup; callers work on the returned
    /// handle without blocking other groups.
    fn existing_counter(&self, group_id: &str) -> Option<Arc<AtomicU32>> {
        // The map only ever gains fully constructed entries, so its contents
        // remain usable even if a thread panicked while holding the lock.
        let gates = self
            .group_gates
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        gates.get(group_id).cloned()
    }

    /// Returns the counter of `group_id`, creating it at zero when missing.
    fn counter_or_insert(&self, group_id: &str) -> Arc<AtomicU32> {
        let mut gates = self
            .group_gates
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Look up by `&str` first so the key is only allocated for new groups.
        if let Some(existing) = gates.get(group_id) {
            return Arc::clone(existing);
        }
        let created = Arc::new(AtomicU32::new(0));
        gates.insert(group_id.to_owned(), Arc::clone(&created));
        created
    }

    /// Attempts to acquire a restart slot for a specific group.
    ///
    /// The group's counter is created on first use, even when the request is
    /// then denied.
    ///
    /// # Arguments
    ///
    /// - `group_id`: Identifier of the restart execution plan group.
    ///
    /// # Returns
    ///
    /// Returns `true` if a slot was acquired for the group, `false` if saturated.
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::GroupLevelGate;
    ///
    /// let gate = GroupLevelGate::new(2);
    /// assert!(gate.try_acquire_for_group("group-a"));
    /// assert!(gate.try_acquire_for_group("group-a"));
    /// assert!(!gate.try_acquire_for_group("group-a")); // Limit reached
    /// ```
    pub fn try_acquire_for_group(&self, group_id: &str) -> bool {
        let limit = self.max_per_group;
        self.counter_or_insert(group_id)
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |active| {
                // `active < limit <= u32::MAX`, so the increment cannot overflow.
                if active < limit {
                    Some(active + 1)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Releases a restart slot for a specific group.
    ///
    /// Releasing for a group that has never requested a slot is a no-op.
    /// Releasing more slots than a known group holds is a caller bug: the
    /// counter stays at zero instead of wrapping around to `u32::MAX`, and the
    /// gate remains usable afterwards.
    ///
    /// # Arguments
    ///
    /// - `group_id`: Identifier of the restart execution plan group.
    ///
    /// # Panics
    ///
    /// Panics in builds with debug assertions enabled if the group is known
    /// but currently holds no slot.
    pub fn release_for_group(&self, group_id: &str) {
        if let Some(counter) = self.existing_counter(group_id) {
            // The map lock is already released here, so a failing assertion
            // cannot poison it for other callers.
            let released = counter
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |active| {
                    active.checked_sub(1)
                })
                .is_ok();
            debug_assert!(released, "Released more group slots than acquired");
        }
    }

    /// Returns the current active count for a specific group.
    ///
    /// # Arguments
    ///
    /// - `group_id`: Identifier of the restart execution plan group.
    ///
    /// # Returns
    ///
    /// Returns the active restart count for the specified group, or `0` for a
    /// group that has never requested a slot.
    pub fn get_active_count_for_group(&self, group_id: &str) -> u32 {
        self.existing_counter(group_id)
            .map_or(0, |counter| counter.load(Ordering::SeqCst))
    }

    /// Checks if a specific group's gate is saturated.
    ///
    /// # Arguments
    ///
    /// - `group_id`: Identifier of the restart execution plan group.
    ///
    /// # Returns
    ///
    /// Returns `true` if the group's active count has reached the limit.
    pub fn is_group_saturated(&self, group_id: &str) -> bool {
        self.get_active_count_for_group(group_id) >= self.max_per_group
    }
}
258
259/// Combined throttle gate that enforces both instance and group limits.
260///
261/// When both gates are active, takes the stricter verdict: if either gate
262/// is saturated, the restart request is throttled.
263#[derive(Debug, Clone)]
264pub struct CombinedThrottleGate {
265 /// Instance-global concurrent restart gate.
266 instance_gate: SupervisorInstanceGate,
267 /// Optional group-level concurrent restart gate.
268 group_gate: Option<GroupLevelGate>,
269}
270
271impl CombinedThrottleGate {
272 /// Creates a combined throttle gate with both instance and group limits.
273 ///
274 /// # Arguments
275 ///
276 /// - `instance_gate`: Instance-global concurrent restart gate.
277 /// - `group_gate`: Optional group-level gate for per-group throttling.
278 ///
279 /// # Returns
280 ///
281 /// Returns a new [`CombinedThrottleGate`].
282 ///
283 /// # Examples
284 ///
285 /// ```
286 /// use rust_supervisor::runtime::concurrent_gate::{
287 /// CombinedThrottleGate, SupervisorInstanceGate, GroupLevelGate,
288 /// };
289 ///
290 /// let instance = SupervisorInstanceGate::new(10);
291 /// let group = GroupLevelGate::new(5);
292 /// let combined = CombinedThrottleGate::new(instance, Some(group));
293 /// ```
294 pub fn new(instance_gate: SupervisorInstanceGate, group_gate: Option<GroupLevelGate>) -> Self {
295 Self {
296 instance_gate,
297 group_gate,
298 }
299 }
300
301 /// Attempts to acquire restart permission through both gates.
302 ///
303 /// Takes the stricter verdict: if either gate is saturated, returns `false`.
304 ///
305 /// # Arguments
306 ///
307 /// - `group_id`: Optional group identifier for group-level gate check.
308 ///
309 /// # Returns
310 ///
311 /// Returns `true` only if both instance and group gates allow the restart.
312 ///
313 /// # Examples
314 ///
315 /// ```
316 /// use rust_supervisor::runtime::concurrent_gate::{
317 /// CombinedThrottleGate, SupervisorInstanceGate, GroupLevelGate,
318 /// };
319 ///
320 /// let instance = SupervisorInstanceGate::new(2);
321 /// let group = GroupLevelGate::new(1);
322 /// let combined = CombinedThrottleGate::new(instance, Some(group));
323 ///
324 /// assert!(combined.try_acquire(Some("group-a")));
325 /// assert!(!combined.try_acquire(Some("group-a"))); // Group limit reached
326 /// ```
327 pub fn try_acquire(&self, group_id: Option<&str>) -> bool {
328 // Check instance gate first
329 if !self.instance_gate.try_acquire() {
330 return false;
331 }
332
333 // If group gate exists and group_id provided, check group limit
334 if let (Some(group_gate), Some(gid)) = (&self.group_gate, group_id)
335 && !group_gate.try_acquire_for_group(gid)
336 {
337 // Release instance slot since group gate failed
338 self.instance_gate.release();
339 return false;
340 }
341
342 true
343 }
344
345 /// Releases restart slots from both instance and group gates.
346 ///
347 /// # Arguments
348 ///
349 /// - `group_id`: Optional group identifier for group-level release.
350 pub fn release(&self, group_id: Option<&str>) {
351 self.instance_gate.release();
352 if let (Some(group_gate), Some(gid)) = (&self.group_gate, group_id) {
353 group_gate.release_for_group(gid);
354 }
355 }
356
357 /// Returns the instance-global gate reference.
358 ///
359 /// # Returns
360 ///
361 /// Returns a reference to the instance gate for monitoring.
362 pub fn instance_gate(&self) -> &SupervisorInstanceGate {
363 &self.instance_gate
364 }
365
366 /// Returns the group-level gate reference if configured.
367 ///
368 /// # Returns
369 ///
370 /// Returns an optional reference to the group gate.
371 pub fn group_gate(&self) -> Option<&GroupLevelGate> {
372 self.group_gate.as_ref()
373 }
374}
375
#[cfg(test)]
mod tests {
    use crate::runtime::concurrent_gate::{
        CombinedThrottleGate, GroupLevelGate, SupervisorInstanceGate,
    };

    /// Tests basic acquire and release operations on supervisor instance gate.
    #[test]
    fn test_instance_gate_basic_acquire_release() {
        let gate = SupervisorInstanceGate::new(3);
        assert_eq!(gate.get_active_count(), 0);

        assert!(gate.try_acquire());
        assert_eq!(gate.get_active_count(), 1);

        assert!(gate.try_acquire());
        assert_eq!(gate.get_active_count(), 2);

        gate.release();
        assert_eq!(gate.get_active_count(), 1);

        gate.release();
        assert_eq!(gate.get_active_count(), 0);
    }

    /// Tests that instance gate correctly reports saturation when limit is reached.
    #[test]
    fn test_instance_gate_saturation() {
        let gate = SupervisorInstanceGate::new(2);

        assert!(gate.try_acquire());
        assert!(gate.try_acquire());
        assert!(!gate.try_acquire()); // Saturated

        assert!(gate.is_saturated());
    }

    /// Tests that group-level gates isolate concurrency limits per group independently.
    #[test]
    fn test_group_gate_isolation() {
        let gate = GroupLevelGate::new(2);

        // Group A can acquire up to limit
        assert!(gate.try_acquire_for_group("group-a"));
        assert!(gate.try_acquire_for_group("group-a"));
        assert!(!gate.try_acquire_for_group("group-a"));

        // Group B is independent and unaffected
        assert!(gate.try_acquire_for_group("group-b"));
        assert_eq!(gate.get_active_count_for_group("group-b"), 1);
        assert_eq!(gate.get_active_count_for_group("group-a"), 2);
    }

    /// Tests that releasing a group slot makes it available again.
    #[test]
    fn test_group_gate_release_frees_slot() {
        let gate = GroupLevelGate::new(1);

        assert!(gate.try_acquire_for_group("group-a"));
        assert!(gate.is_group_saturated("group-a"));
        assert!(!gate.try_acquire_for_group("group-a"));

        gate.release_for_group("group-a");
        assert_eq!(gate.get_active_count_for_group("group-a"), 0);
        assert!(!gate.is_group_saturated("group-a"));
        assert!(gate.try_acquire_for_group("group-a"));
    }

    /// Tests that combined gate takes the stricter verdict between instance and group gates.
    #[test]
    fn test_combined_gate_takes_stricter_verdict() {
        let instance = SupervisorInstanceGate::new(5);
        let group = GroupLevelGate::new(2);
        let combined = CombinedThrottleGate::new(instance, Some(group));

        // Group limit is stricter (2 vs 5)
        assert!(combined.try_acquire(Some("test-group")));
        assert!(combined.try_acquire(Some("test-group")));
        assert!(!combined.try_acquire(Some("test-group"))); // Group saturated
    }

    /// Tests that a group-level denial does not leak the instance slot taken first.
    #[test]
    fn test_combined_gate_rolls_back_instance_slot_on_group_denial() {
        let instance = SupervisorInstanceGate::new(5);
        let group = GroupLevelGate::new(1);
        let combined = CombinedThrottleGate::new(instance, Some(group));

        assert!(combined.try_acquire(Some("group-a")));
        assert_eq!(combined.instance_gate().get_active_count(), 1);

        // Denied by the group gate: the instance count must be unchanged.
        assert!(!combined.try_acquire(Some("group-a")));
        assert_eq!(combined.instance_gate().get_active_count(), 1);

        // A different group is still admitted.
        assert!(combined.try_acquire(Some("group-b")));
        assert_eq!(combined.instance_gate().get_active_count(), 2);
    }

    /// Tests that releasing through the combined gate returns both slots.
    #[test]
    fn test_combined_gate_release_returns_both_slots() {
        let instance = SupervisorInstanceGate::new(1);
        let group = GroupLevelGate::new(1);
        let combined = CombinedThrottleGate::new(instance, Some(group));

        assert!(combined.try_acquire(Some("group-a")));
        assert!(!combined.try_acquire(Some("group-a")));

        combined.release(Some("group-a"));
        assert_eq!(combined.instance_gate().get_active_count(), 0);
        let groups = combined
            .group_gate()
            .expect("group gate was configured above");
        assert_eq!(groups.get_active_count_for_group("group-a"), 0);

        assert!(combined.try_acquire(Some("group-a")));
    }

    /// Tests that combined gate works correctly without a group gate configured.
    #[test]
    fn test_combined_gate_without_group() {
        let instance = SupervisorInstanceGate::new(2);
        let combined = CombinedThrottleGate::new(instance, None);
        assert!(combined.group_gate().is_none());

        // Only instance gate applies
        assert!(combined.try_acquire(None));
        assert!(combined.try_acquire(None));
        assert!(!combined.try_acquire(None)); // Instance saturated
    }
}