Skip to main content

running_process/broker/server/
idle_coord.rs

1//! Broker-side backend idle coordination model.
2//!
3//! This module is deliberately a pure state model. It tracks monotonic
4//! activity timestamps by backend key and reports which running backends should
5//! receive a future `Quiesce(IdleTimeout)` lifecycle broadcast.
6
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
10use super::backend_registry::BackendKey;
11use super::broadcast::QuiesceReason;
12
13/// Default idle timeout for broker-managed backends.
14pub const DEFAULT_BACKEND_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
15
16/// Idle timeout policy for broker-managed backends.
17#[derive(Clone, Copy, Debug, PartialEq, Eq)]
18pub struct BackendIdlePolicy {
19    /// Idle duration after which a running backend should be quiesced.
20    pub default_idle_timeout: Duration,
21}
22
23impl BackendIdlePolicy {
24    /// Build a policy, clamping zero timeout to a non-zero floor.
25    pub fn new(default_idle_timeout: Duration) -> Self {
26        Self {
27            default_idle_timeout: non_zero_duration(default_idle_timeout),
28        }
29    }
30
31    /// Return the configured default idle timeout.
32    pub fn default_idle_timeout(&self) -> Duration {
33        self.default_idle_timeout
34    }
35}
36
37impl Default for BackendIdlePolicy {
38    fn default() -> Self {
39        Self {
40            default_idle_timeout: DEFAULT_BACKEND_IDLE_TIMEOUT,
41        }
42    }
43}
44
45/// Coordinates idle deadlines for backend keys.
46#[derive(Debug)]
47pub struct BackendIdleCoordinator {
48    policy: BackendIdlePolicy,
49    entries: HashMap<BackendKey, BackendIdleEntry>,
50}
51
52impl BackendIdleCoordinator {
53    /// Create an empty coordinator with the default idle policy.
54    pub fn new() -> Self {
55        Self::with_policy(BackendIdlePolicy::default())
56    }
57
58    /// Create an empty coordinator with an explicit idle policy.
59    pub fn with_policy(policy: BackendIdlePolicy) -> Self {
60        Self {
61            policy,
62            entries: HashMap::new(),
63        }
64    }
65
66    /// Record backend activity and reset its idle deadline.
67    pub fn mark_activity(&mut self, key: BackendKey, now: Instant) {
68        self.mark_activity_with_timeout(key, now, self.policy.default_idle_timeout);
69    }
70
71    /// Record backend activity with a backend-specific timeout.
72    pub fn mark_activity_with_timeout(
73        &mut self,
74        key: BackendKey,
75        now: Instant,
76        idle_timeout: Duration,
77    ) {
78        self.entries.insert(
79            key,
80            BackendIdleEntry {
81                last_active_at: now,
82                idle_timeout: non_zero_duration(idle_timeout),
83                state: BackendIdleState::Running,
84            },
85        );
86    }
87
88    /// Mark a backend as draining after a quiesce request has been emitted.
89    ///
90    /// Returns true when the backend key was tracked.
91    pub fn mark_draining(&mut self, key: &BackendKey) -> bool {
92        self.mark_state(key, BackendIdleState::Draining)
93    }
94
95    /// Mark a backend as quiesced after it has drained.
96    ///
97    /// Returns true when the backend key was tracked.
98    pub fn mark_quiesced(&mut self, key: &BackendKey) -> bool {
99        self.mark_state(key, BackendIdleState::Quiesced)
100    }
101
102    /// Remove a backend from idle tracking.
103    pub fn remove_backend(&mut self, key: &BackendKey) -> bool {
104        self.entries.remove(key).is_some()
105    }
106
107    /// Collect running backends whose idle timeout has elapsed.
108    ///
109    /// Collected backends are moved to `draining` so repeated collection does
110    /// not emit duplicate quiesce requests unless fresh activity is recorded.
111    pub fn collect_due_for_quiesce(&mut self, now: Instant) -> Vec<BackendIdleDue> {
112        let mut due = Vec::new();
113
114        for (key, entry) in &mut self.entries {
115            if entry.state != BackendIdleState::Running {
116                continue;
117            }
118
119            let idle_for = elapsed_since(entry.last_active_at, now);
120            if idle_for < entry.idle_timeout {
121                continue;
122            }
123
124            entry.state = BackendIdleState::Draining;
125            due.push(BackendIdleDue {
126                key: key.clone(),
127                idle_for,
128                configured_timeout: entry.idle_timeout,
129                reason: QuiesceReason::IdleTimeout,
130            });
131        }
132
133        due
134    }
135
136    fn mark_state(&mut self, key: &BackendKey, state: BackendIdleState) -> bool {
137        let Some(entry) = self.entries.get_mut(key) else {
138            return false;
139        };
140        entry.state = state;
141        true
142    }
143}
144
145impl Default for BackendIdleCoordinator {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151/// Backend due for idle quiesce.
152#[derive(Clone, Debug, PartialEq, Eq)]
153pub struct BackendIdleDue {
154    /// Backend key that crossed its idle deadline.
155    pub key: BackendKey,
156    /// Monotonic elapsed time since the backend was last marked active.
157    pub idle_for: Duration,
158    /// Timeout configured for this backend entry.
159    pub configured_timeout: Duration,
160    /// Quiesce reason for the future lifecycle broadcast call.
161    pub reason: QuiesceReason,
162}
163
164#[derive(Clone, Copy, Debug, PartialEq, Eq)]
165enum BackendIdleState {
166    Running,
167    Draining,
168    Quiesced,
169}
170
171#[derive(Clone, Debug)]
172struct BackendIdleEntry {
173    last_active_at: Instant,
174    idle_timeout: Duration,
175    state: BackendIdleState,
176}
177
178fn non_zero_duration(duration: Duration) -> Duration {
179    if duration.is_zero() {
180        Duration::from_millis(1)
181    } else {
182        duration
183    }
184}
185
186fn elapsed_since(started_at: Instant, now: Instant) -> Duration {
187    now.checked_duration_since(started_at)
188        .unwrap_or(Duration::ZERO)
189}