running_process/broker/server/
idle_coord.rs1use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
10use super::backend_registry::BackendKey;
11use super::broadcast::QuiesceReason;
12
/// Idle timeout used by [`BackendIdlePolicy::default`]: thirty seconds.
pub const DEFAULT_BACKEND_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
15
/// Idle-timeout settings consulted by [`BackendIdleCoordinator`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackendIdlePolicy {
    /// Timeout applied when activity is recorded through
    /// `BackendIdleCoordinator::mark_activity`, i.e. without an explicit
    /// per-call timeout.
    pub default_idle_timeout: Duration,
}
22
23impl BackendIdlePolicy {
24 pub fn new(default_idle_timeout: Duration) -> Self {
26 Self {
27 default_idle_timeout: non_zero_duration(default_idle_timeout),
28 }
29 }
30
31 pub fn default_idle_timeout(&self) -> Duration {
33 self.default_idle_timeout
34 }
35}
36
37impl Default for BackendIdlePolicy {
38 fn default() -> Self {
39 Self {
40 default_idle_timeout: DEFAULT_BACKEND_IDLE_TIMEOUT,
41 }
42 }
43}
44
45#[derive(Debug)]
47pub struct BackendIdleCoordinator {
48 policy: BackendIdlePolicy,
49 entries: HashMap<BackendKey, BackendIdleEntry>,
50}
51
52impl BackendIdleCoordinator {
53 pub fn new() -> Self {
55 Self::with_policy(BackendIdlePolicy::default())
56 }
57
58 pub fn with_policy(policy: BackendIdlePolicy) -> Self {
60 Self {
61 policy,
62 entries: HashMap::new(),
63 }
64 }
65
66 pub fn mark_activity(&mut self, key: BackendKey, now: Instant) {
68 self.mark_activity_with_timeout(key, now, self.policy.default_idle_timeout);
69 }
70
71 pub fn mark_activity_with_timeout(
73 &mut self,
74 key: BackendKey,
75 now: Instant,
76 idle_timeout: Duration,
77 ) {
78 self.entries.insert(
79 key,
80 BackendIdleEntry {
81 last_active_at: now,
82 idle_timeout: non_zero_duration(idle_timeout),
83 state: BackendIdleState::Running,
84 },
85 );
86 }
87
88 pub fn mark_draining(&mut self, key: &BackendKey) -> bool {
92 self.mark_state(key, BackendIdleState::Draining)
93 }
94
95 pub fn mark_quiesced(&mut self, key: &BackendKey) -> bool {
99 self.mark_state(key, BackendIdleState::Quiesced)
100 }
101
102 pub fn remove_backend(&mut self, key: &BackendKey) -> bool {
104 self.entries.remove(key).is_some()
105 }
106
107 pub fn collect_due_for_quiesce(&mut self, now: Instant) -> Vec<BackendIdleDue> {
112 let mut due = Vec::new();
113
114 for (key, entry) in &mut self.entries {
115 if entry.state != BackendIdleState::Running {
116 continue;
117 }
118
119 let idle_for = elapsed_since(entry.last_active_at, now);
120 if idle_for < entry.idle_timeout {
121 continue;
122 }
123
124 entry.state = BackendIdleState::Draining;
125 due.push(BackendIdleDue {
126 key: key.clone(),
127 idle_for,
128 configured_timeout: entry.idle_timeout,
129 reason: QuiesceReason::IdleTimeout,
130 });
131 }
132
133 due
134 }
135
136 fn mark_state(&mut self, key: &BackendKey, state: BackendIdleState) -> bool {
137 let Some(entry) = self.entries.get_mut(key) else {
138 return false;
139 };
140 entry.state = state;
141 true
142 }
143}
144
145impl Default for BackendIdleCoordinator {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151#[derive(Clone, Debug, PartialEq, Eq)]
153pub struct BackendIdleDue {
154 pub key: BackendKey,
156 pub idle_for: Duration,
158 pub configured_timeout: Duration,
160 pub reason: QuiesceReason,
162}
163
/// Idle-tracking state of a single backend entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BackendIdleState {
    /// Set whenever activity is recorded; the only state in which an entry is
    /// checked against its idle timeout.
    Running,
    /// Set by `mark_draining`, and by `collect_due_for_quiesce` for every
    /// entry it reports.
    Draining,
    /// Set by `mark_quiesced`.
    Quiesced,
}
170
171#[derive(Clone, Debug)]
172struct BackendIdleEntry {
173 last_active_at: Instant,
174 idle_timeout: Duration,
175 state: BackendIdleState,
176}
177
/// Returns `duration` unchanged unless it is zero, in which case one
/// millisecond is returned instead.
fn non_zero_duration(duration: Duration) -> Duration {
    if !duration.is_zero() {
        return duration;
    }
    Duration::from_millis(1)
}
185
/// Returns the time from `started_at` to `now`, or zero when `now` is earlier
/// than `started_at`.
fn elapsed_since(started_at: Instant, now: Instant) -> Duration {
    now.saturating_duration_since(started_at)
}