Skip to main content

rust_supervisor/runtime/
concurrent_gate.rs

//! Concurrent restart throttle gates that prevent restart storms.
//!
//! This module implements instance-global and group-level limits on the number
//! of concurrent restarts, to prevent resource contention during mass-failure scenarios.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::{Arc, Mutex};
9
/// Instance-global concurrent restart gate counter.
///
/// Tracks the number of currently active restart attempts across all children
/// supervised by this supervisor instance. Once the limit is reached,
/// [`try_acquire`](Self::try_acquire) rejects further requests; whether a
/// rejected request is queued or denied is decided by the caller.
///
/// Cloning the gate produces another handle to the *same* counter, because the
/// counter lives behind an [`Arc`].
#[derive(Debug, Clone)]
pub struct SupervisorInstanceGate {
    /// Maximum concurrent restarts allowed at instance level.
    max_concurrent: u32,
    /// Current count of active restart attempts, shared between clones.
    active_count: Arc<AtomicU32>,
}

impl SupervisorInstanceGate {
    /// Creates a new instance-global concurrent restart gate.
    ///
    /// # Arguments
    ///
    /// - `max_concurrent`: Maximum number of concurrent restart attempts allowed.
    ///   A value of `0` yields a gate that never grants a slot.
    ///
    /// # Returns
    ///
    /// Returns a new [`SupervisorInstanceGate`] with zero active count.
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::SupervisorInstanceGate;
    ///
    /// let gate = SupervisorInstanceGate::new(5);
    /// assert_eq!(gate.get_active_count(), 0);
    /// ```
    pub fn new(max_concurrent: u32) -> Self {
        Self {
            max_concurrent,
            active_count: Arc::new(AtomicU32::new(0)),
        }
    }

    /// Attempts to acquire a restart slot from the instance gate.
    ///
    /// The check and the increment happen as one atomic read-modify-write, so
    /// concurrent callers can never push the count above the limit.
    ///
    /// # Returns
    ///
    /// Returns `true` if a slot was successfully acquired (active count < limit),
    /// `false` if the gate is saturated (active count >= limit).
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::SupervisorInstanceGate;
    ///
    /// let gate = SupervisorInstanceGate::new(2);
    /// assert!(gate.try_acquire()); // First acquisition succeeds
    /// assert!(gate.try_acquire()); // Second acquisition succeeds
    /// assert!(!gate.try_acquire()); // Third acquisition fails (limit reached)
    /// ```
    pub fn try_acquire(&self) -> bool {
        let limit = self.max_concurrent;
        self.active_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                // `current < limit <= u32::MAX`, so the increment cannot overflow.
                if current < limit {
                    Some(current + 1)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Releases one previously acquired restart slot.
    ///
    /// The gate performs no automatic reclamation (this type has no timeout or
    /// background cleanup): every successful
    /// [`try_acquire`](Self::try_acquire) must be paired with exactly one
    /// `release`, and the moment at which that happens is up to the caller.
    ///
    /// Releasing more slots than were acquired is a caller bug. It trips a
    /// `debug_assert!` when debug assertions are enabled; in every build the
    /// counter stays at zero instead of wrapping around to `u32::MAX`, which
    /// would leave the gate permanently saturated.
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::SupervisorInstanceGate;
    ///
    /// let gate = SupervisorInstanceGate::new(2);
    /// gate.try_acquire();
    /// gate.release();
    /// assert_eq!(gate.get_active_count(), 0);
    /// ```
    pub fn release(&self) {
        // `checked_sub` refuses to go below zero, so an unbalanced release
        // leaves the counter untouched rather than wrapping it.
        let decremented = self
            .active_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                current.checked_sub(1)
            });
        debug_assert!(decremented.is_ok(), "Released more slots than acquired");
    }

    /// Returns the current number of active restart attempts.
    ///
    /// # Returns
    ///
    /// Returns the current active count for monitoring and diagnostics. The
    /// value is a snapshot and may already be stale when other threads are
    /// acquiring or releasing concurrently.
    pub fn get_active_count(&self) -> u32 {
        self.active_count.load(Ordering::SeqCst)
    }

    /// Returns the configured maximum concurrent restart limit.
    ///
    /// # Returns
    ///
    /// Returns the maximum allowed concurrent restarts.
    pub fn get_max_concurrent(&self) -> u32 {
        self.max_concurrent
    }

    /// Checks if the gate is currently saturated.
    ///
    /// # Returns
    ///
    /// Returns `true` if active count has reached or exceeded the limit.
    pub fn is_saturated(&self) -> bool {
        self.get_active_count() >= self.max_concurrent
    }
}
133
/// Group-level concurrent restart gate for optional per-group throttling.
///
/// Tracks concurrent restarts separately for each restart execution plan
/// group, applying the same limit to every group. Counters are created lazily
/// the first time a group id is seen.
///
/// Cloning the gate produces another handle to the *same* set of counters,
/// because the map lives behind an [`Arc`].
#[derive(Debug, Clone)]
pub struct GroupLevelGate {
    /// Map from group identifier to per-group gate state.
    group_gates: Arc<Mutex<HashMap<String, Arc<AtomicU32>>>>,
    /// Default maximum concurrent restarts per group.
    max_per_group: u32,
}

impl GroupLevelGate {
    /// Creates a new group-level concurrent restart gate manager.
    ///
    /// # Arguments
    ///
    /// - `max_per_group`: Maximum concurrent restarts allowed per group.
    ///
    /// # Returns
    ///
    /// Returns a new [`GroupLevelGate`] with empty group map.
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::GroupLevelGate;
    ///
    /// let gate = GroupLevelGate::new(3);
    /// assert_eq!(gate.get_active_count_for_group("group-a"), 0);
    /// ```
    pub fn new(max_per_group: u32) -> Self {
        Self {
            group_gates: Arc::new(Mutex::new(HashMap::new())),
            max_per_group,
        }
    }

    /// Locks the group map, recovering the guard if the mutex was poisoned.
    ///
    /// The map only holds atomic counters, which cannot be observed in a
    /// half-updated state, so continuing after another thread panicked while
    /// holding the lock is safe and keeps the gate usable.
    fn lock_gates(&self) -> std::sync::MutexGuard<'_, HashMap<String, Arc<AtomicU32>>> {
        self.group_gates
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Atomically increments `counter` if it is still below `limit`.
    fn try_increment(counter: &AtomicU32, limit: u32) -> bool {
        counter
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                // `current < limit <= u32::MAX`, so the increment cannot overflow.
                if current < limit {
                    Some(current + 1)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Attempts to acquire a restart slot for a specific group.
    ///
    /// # Arguments
    ///
    /// - `group_id`: Identifier of the restart execution plan group.
    ///
    /// # Returns
    ///
    /// Returns `true` if a slot was acquired for the group, `false` if saturated.
    ///
    /// # Examples
    ///
    /// ```
    /// use rust_supervisor::runtime::concurrent_gate::GroupLevelGate;
    ///
    /// let gate = GroupLevelGate::new(2);
    /// assert!(gate.try_acquire_for_group("group-a"));
    /// assert!(gate.try_acquire_for_group("group-a"));
    /// assert!(!gate.try_acquire_for_group("group-a")); // Limit reached
    /// ```
    pub fn try_acquire_for_group(&self, group_id: &str) -> bool {
        let mut gates = self.lock_gates();

        // Common case: the group is already known, so no key allocation is needed.
        if let Some(counter) = gates.get(group_id) {
            return Self::try_increment(counter, self.max_per_group);
        }

        // First time this group is seen: register its counter (even when the
        // acquisition itself is rejected, e.g. with a limit of zero).
        let counter = Arc::new(AtomicU32::new(0));
        let acquired = Self::try_increment(&counter, self.max_per_group);
        gates.insert(group_id.to_string(), counter);
        acquired
    }

    /// Releases a restart slot for a specific group.
    ///
    /// Releasing for a group that was never acquired is a no-op. Releasing more
    /// slots than were acquired for a known group is a caller bug: it trips a
    /// `debug_assert!` when debug assertions are enabled, and in every build
    /// the counter stays at zero instead of wrapping around to `u32::MAX`.
    ///
    /// # Arguments
    ///
    /// - `group_id`: Identifier of the restart execution plan group.
    pub fn release_for_group(&self, group_id: &str) {
        let gates = self.lock_gates();
        let balanced = gates.get(group_id).map_or(true, |counter| {
            counter
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                    current.checked_sub(1)
                })
                .is_ok()
        });
        // Drop the guard before asserting so a failed assertion cannot poison
        // the mutex for every other user of the gate.
        drop(gates);
        debug_assert!(balanced, "Released more group slots than acquired");
    }

    /// Returns the current active count for a specific group.
    ///
    /// # Arguments
    ///
    /// - `group_id`: Identifier of the restart execution plan group.
    ///
    /// # Returns
    ///
    /// Returns the active restart count for the specified group, or `0` for a
    /// group that has never been seen.
    pub fn get_active_count_for_group(&self, group_id: &str) -> u32 {
        let gates = self.lock_gates();
        gates
            .get(group_id)
            .map_or(0, |counter| counter.load(Ordering::SeqCst))
    }

    /// Checks if a specific group's gate is saturated.
    ///
    /// # Arguments
    ///
    /// - `group_id`: Identifier of the restart execution plan group.
    ///
    /// # Returns
    ///
    /// Returns `true` if the group's active count has reached the limit.
    pub fn is_group_saturated(&self, group_id: &str) -> bool {
        self.get_active_count_for_group(group_id) >= self.max_per_group
    }
}
258
259/// Combined throttle gate that enforces both instance and group limits.
260///
261/// When both gates are active, takes the stricter verdict: if either gate
262/// is saturated, the restart request is throttled.
263#[derive(Debug, Clone)]
264pub struct CombinedThrottleGate {
265    /// Instance-global concurrent restart gate.
266    instance_gate: SupervisorInstanceGate,
267    /// Optional group-level concurrent restart gate.
268    group_gate: Option<GroupLevelGate>,
269}
270
271impl CombinedThrottleGate {
272    /// Creates a combined throttle gate with both instance and group limits.
273    ///
274    /// # Arguments
275    ///
276    /// - `instance_gate`: Instance-global concurrent restart gate.
277    /// - `group_gate`: Optional group-level gate for per-group throttling.
278    ///
279    /// # Returns
280    ///
281    /// Returns a new [`CombinedThrottleGate`].
282    ///
283    /// # Examples
284    ///
285    /// ```
286    /// use rust_supervisor::runtime::concurrent_gate::{
287    ///     CombinedThrottleGate, SupervisorInstanceGate, GroupLevelGate,
288    /// };
289    ///
290    /// let instance = SupervisorInstanceGate::new(10);
291    /// let group = GroupLevelGate::new(5);
292    /// let combined = CombinedThrottleGate::new(instance, Some(group));
293    /// ```
294    pub fn new(instance_gate: SupervisorInstanceGate, group_gate: Option<GroupLevelGate>) -> Self {
295        Self {
296            instance_gate,
297            group_gate,
298        }
299    }
300
301    /// Attempts to acquire restart permission through both gates.
302    ///
303    /// Takes the stricter verdict: if either gate is saturated, returns `false`.
304    ///
305    /// # Arguments
306    ///
307    /// - `group_id`: Optional group identifier for group-level gate check.
308    ///
309    /// # Returns
310    ///
311    /// Returns `true` only if both instance and group gates allow the restart.
312    ///
313    /// # Examples
314    ///
315    /// ```
316    /// use rust_supervisor::runtime::concurrent_gate::{
317    ///     CombinedThrottleGate, SupervisorInstanceGate, GroupLevelGate,
318    /// };
319    ///
320    /// let instance = SupervisorInstanceGate::new(2);
321    /// let group = GroupLevelGate::new(1);
322    /// let combined = CombinedThrottleGate::new(instance, Some(group));
323    ///
324    /// assert!(combined.try_acquire(Some("group-a")));
325    /// assert!(!combined.try_acquire(Some("group-a"))); // Group limit reached
326    /// ```
327    pub fn try_acquire(&self, group_id: Option<&str>) -> bool {
328        // Check instance gate first
329        if !self.instance_gate.try_acquire() {
330            return false;
331        }
332
333        // If group gate exists and group_id provided, check group limit
334        if let (Some(group_gate), Some(gid)) = (&self.group_gate, group_id)
335            && !group_gate.try_acquire_for_group(gid)
336        {
337            // Release instance slot since group gate failed
338            self.instance_gate.release();
339            return false;
340        }
341
342        true
343    }
344
345    /// Releases restart slots from both instance and group gates.
346    ///
347    /// # Arguments
348    ///
349    /// - `group_id`: Optional group identifier for group-level release.
350    pub fn release(&self, group_id: Option<&str>) {
351        self.instance_gate.release();
352        if let (Some(group_gate), Some(gid)) = (&self.group_gate, group_id) {
353            group_gate.release_for_group(gid);
354        }
355    }
356
357    /// Returns the instance-global gate reference.
358    ///
359    /// # Returns
360    ///
361    /// Returns a reference to the instance gate for monitoring.
362    pub fn instance_gate(&self) -> &SupervisorInstanceGate {
363        &self.instance_gate
364    }
365
366    /// Returns the group-level gate reference if configured.
367    ///
368    /// # Returns
369    ///
370    /// Returns an optional reference to the group gate.
371    pub fn group_gate(&self) -> Option<&GroupLevelGate> {
372        self.group_gate.as_ref()
373    }
374}
375
#[cfg(test)]
mod tests {
    use crate::runtime::concurrent_gate::{
        CombinedThrottleGate, GroupLevelGate, SupervisorInstanceGate,
    };

    /// Tests basic acquire and release operations on supervisor instance gate.
    #[test]
    fn test_instance_gate_basic_acquire_release() {
        let gate = SupervisorInstanceGate::new(3);
        assert_eq!(gate.get_active_count(), 0);

        assert!(gate.try_acquire());
        assert_eq!(gate.get_active_count(), 1);

        assert!(gate.try_acquire());
        assert_eq!(gate.get_active_count(), 2);

        gate.release();
        assert_eq!(gate.get_active_count(), 1);

        gate.release();
        assert_eq!(gate.get_active_count(), 0);
    }

    /// Tests that instance gate correctly reports saturation when limit is reached.
    #[test]
    fn test_instance_gate_saturation() {
        let gate = SupervisorInstanceGate::new(2);

        assert!(gate.try_acquire());
        assert!(gate.try_acquire());
        assert!(!gate.try_acquire()); // Saturated

        assert!(gate.is_saturated());
    }

    /// Tests that releasing a slot reopens a saturated instance gate.
    #[test]
    fn test_instance_gate_release_reopens_saturated_gate() {
        let gate = SupervisorInstanceGate::new(1);

        assert!(gate.try_acquire());
        assert!(gate.is_saturated());

        gate.release();
        assert!(!gate.is_saturated());
        assert!(gate.try_acquire());
    }

    /// Tests that group-level gates isolate concurrency limits per group independently.
    #[test]
    fn test_group_gate_isolation() {
        let gate = GroupLevelGate::new(2);

        // Group A can acquire up to limit
        assert!(gate.try_acquire_for_group("group-a"));
        assert!(gate.try_acquire_for_group("group-a"));
        assert!(!gate.try_acquire_for_group("group-a"));

        // Group B is independent and unaffected
        assert!(gate.try_acquire_for_group("group-b"));
        assert_eq!(gate.get_active_count_for_group("group-b"), 1);
        assert_eq!(gate.get_active_count_for_group("group-a"), 2);
    }

    /// Tests that releasing a group slot frees capacity for that group only.
    #[test]
    fn test_group_gate_release() {
        let gate = GroupLevelGate::new(1);

        assert!(gate.try_acquire_for_group("group-a"));
        assert!(gate.try_acquire_for_group("group-b"));
        assert!(gate.is_group_saturated("group-a"));

        gate.release_for_group("group-a");
        assert_eq!(gate.get_active_count_for_group("group-a"), 0);
        assert_eq!(gate.get_active_count_for_group("group-b"), 1);
        assert!(gate.try_acquire_for_group("group-a"));
    }

    /// Tests that combined gate takes the stricter verdict between instance and group gates.
    #[test]
    fn test_combined_gate_takes_stricter_verdict() {
        let instance = SupervisorInstanceGate::new(5);
        let group = GroupLevelGate::new(2);
        let combined = CombinedThrottleGate::new(instance, Some(group));

        // Group limit is stricter (2 vs 5)
        assert!(combined.try_acquire(Some("test-group")));
        assert!(combined.try_acquire(Some("test-group")));
        assert!(!combined.try_acquire(Some("test-group"))); // Group saturated
    }

    /// Tests that a group-level rejection hands the instance slot back.
    #[test]
    fn test_combined_gate_rolls_back_instance_slot_on_group_rejection() {
        let instance = SupervisorInstanceGate::new(5);
        let group = GroupLevelGate::new(1);
        let combined = CombinedThrottleGate::new(instance, Some(group));

        assert!(combined.try_acquire(Some("group-a")));
        assert!(!combined.try_acquire(Some("group-a"))); // Group saturated

        // The rejected attempt must not leave an instance slot reserved.
        assert_eq!(combined.instance_gate().get_active_count(), 1);
        let groups = combined.group_gate().expect("group gate was configured");
        assert_eq!(groups.get_active_count_for_group("group-a"), 1);

        // Other groups are still admitted against the instance limit.
        assert!(combined.try_acquire(Some("group-b")));
        assert_eq!(combined.instance_gate().get_active_count(), 2);
    }

    /// Tests that releasing through the combined gate frees both gates.
    #[test]
    fn test_combined_gate_release_frees_both_gates() {
        let instance = SupervisorInstanceGate::new(1);
        let group = GroupLevelGate::new(1);
        let combined = CombinedThrottleGate::new(instance, Some(group));

        assert!(combined.try_acquire(Some("group-a")));
        assert!(!combined.try_acquire(Some("group-b"))); // Instance saturated

        combined.release(Some("group-a"));
        assert_eq!(combined.instance_gate().get_active_count(), 0);
        let groups = combined.group_gate().expect("group gate was configured");
        assert_eq!(groups.get_active_count_for_group("group-a"), 0);

        assert!(combined.try_acquire(Some("group-a")));
    }

    /// Tests that combined gate works correctly without a group gate configured.
    #[test]
    fn test_combined_gate_without_group() {
        let instance = SupervisorInstanceGate::new(2);
        let combined = CombinedThrottleGate::new(instance, None);

        // Only instance gate applies
        assert!(combined.try_acquire(None));
        assert!(combined.try_acquire(None));
        assert!(!combined.try_acquire(None)); // Instance saturated
    }

    /// Tests that a group id is ignored when no group gate is configured.
    #[test]
    fn test_combined_gate_ignores_group_id_without_group_gate() {
        let instance = SupervisorInstanceGate::new(1);
        let combined = CombinedThrottleGate::new(instance, None);

        assert!(combined.group_gate().is_none());
        assert!(combined.try_acquire(Some("group-a")));
        assert_eq!(combined.instance_gate().get_active_count(), 1);

        combined.release(Some("group-a"));
        assert_eq!(combined.instance_gate().get_active_count(), 0);
    }
}
453}