1use std::collections::HashMap;
45use std::sync::Mutex;
46
/// Default presence time-to-live, in milliseconds (30 seconds).
///
/// NOTE(review): no code visible in this file reads the constant; it is presumably the
/// value callers pass as a `ttl_ms` argument — confirm against call sites.
pub const DEFAULT_PRESENCE_TTL_MS: u64 = 30 * 1_000;
52
/// Liveness bucket derived from how long ago a heartbeat was last seen, relative to a TTL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PresenceState {
    /// Heartbeat age is at most one TTL.
    Active,
    /// Heartbeat age is above one TTL but at most two TTLs.
    Stale,
    /// Heartbeat age is above two TTLs.
    Expired,
}

impl PresenceState {
    /// Returns the lowercase label of the state: `"active"`, `"stale"` or `"expired"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Active => "active",
            Self::Stale => "stale",
            Self::Expired => "expired",
        }
    }

    /// Buckets a heartbeat age against a TTL; both values are in milliseconds.
    ///
    /// Both boundaries are inclusive: an age of exactly `ttl_ms` is still `Active` and an
    /// age of exactly `2 * ttl_ms` is still `Stale`. The doubling saturates at `u64::MAX`
    /// instead of overflowing.
    fn classify(age_ms: u64, ttl_ms: u64) -> Self {
        let stale_limit_ms = ttl_ms.saturating_mul(2);

        // Check the oldest bucket first; `stale_limit_ms >= ttl_ms` always holds, so an
        // age within one TTL can never be caught by this guard.
        if age_ms > stale_limit_ms {
            return Self::Expired;
        }
        if age_ms > ttl_ms {
            return Self::Stale;
        }
        Self::Active
    }
}
88
89#[derive(Debug, Clone, PartialEq, Eq)]
91pub struct ConsumerPresence {
92 pub queue: String,
93 pub group: String,
94 pub consumer: String,
95 pub registered_at_ns: u64,
96 pub last_heartbeat_ns: u64,
97 pub last_seen_age_ms: u64,
100 pub lease_count: u32,
106 pub state: PresenceState,
107}
108
/// Heartbeat bookkeeping stored per consumer inside the registry.
#[derive(Clone, Debug)]
struct PresenceEntry {
    /// Timestamp (ns) of the heartbeat that created this entry.
    registered_at_ns: u64,
    /// Timestamp (ns) of the latest heartbeat.
    last_heartbeat_ns: u64,
    /// Lease count reported with the latest heartbeat.
    lease_count: u32,
}

/// Registry map key: `(queue, group, consumer)`.
type PresenceKey = (String, String, String);

/// In-memory table of consumer heartbeats.
///
/// The table lives behind a [`Mutex`], so it is mutated through `&self`. `Default`
/// yields an empty registry.
#[derive(Default, Debug)]
pub struct ConsumerPresenceRegistry {
    /// Latest bookkeeping per `(queue, group, consumer)`.
    entries: Mutex<HashMap<PresenceKey, PresenceEntry>>,
}
128
129impl ConsumerPresenceRegistry {
130 pub fn new() -> Self {
131 Self::default()
132 }
133
134 pub fn heartbeat(
138 &self,
139 queue: &str,
140 group: &str,
141 consumer: &str,
142 lease_count: u32,
143 now_ns: u64,
144 ) {
145 let key = (queue.to_string(), group.to_string(), consumer.to_string());
146 let mut map = self.entries.lock().unwrap_or_else(|p| p.into_inner());
147 map.entry(key)
148 .and_modify(|e| {
149 e.last_heartbeat_ns = now_ns;
150 e.lease_count = lease_count;
151 })
152 .or_insert(PresenceEntry {
153 registered_at_ns: now_ns,
154 last_heartbeat_ns: now_ns,
155 lease_count,
156 });
157 }
158
159 pub fn deregister(&self, queue: &str, group: &str, consumer: &str) -> bool {
162 let key = (queue.to_string(), group.to_string(), consumer.to_string());
163 let mut map = self.entries.lock().unwrap_or_else(|p| p.into_inner());
164 map.remove(&key).is_some()
165 }
166
167 pub fn snapshot(&self, now_ns: u64, ttl_ms: u64) -> Vec<ConsumerPresence> {
171 let map = self.entries.lock().unwrap_or_else(|p| p.into_inner());
172 let mut rows: Vec<ConsumerPresence> = map
173 .iter()
174 .map(|((queue, group, consumer), entry)| {
175 let age_ms = now_ns.saturating_sub(entry.last_heartbeat_ns) / 1_000_000;
176 ConsumerPresence {
177 queue: queue.clone(),
178 group: group.clone(),
179 consumer: consumer.clone(),
180 registered_at_ns: entry.registered_at_ns,
181 last_heartbeat_ns: entry.last_heartbeat_ns,
182 last_seen_age_ms: age_ms,
183 lease_count: entry.lease_count,
184 state: PresenceState::classify(age_ms, ttl_ms),
185 }
186 })
187 .collect();
188 rows.sort_by(|a, b| {
189 a.queue
190 .cmp(&b.queue)
191 .then_with(|| a.group.cmp(&b.group))
192 .then_with(|| a.consumer.cmp(&b.consumer))
193 });
194 rows
195 }
196
197 pub fn count_active_by_group(
202 &self,
203 now_ns: u64,
204 ttl_ms: u64,
205 ) -> HashMap<(String, String), u32> {
206 let mut by_group: HashMap<(String, String), u32> = HashMap::new();
207 for row in self.snapshot(now_ns, ttl_ms) {
208 if row.state == PresenceState::Active {
209 *by_group.entry((row.queue, row.group)).or_insert(0) += 1;
210 }
211 }
212 by_group
213 }
214
215 pub fn prune_expired(&self, now_ns: u64, ttl_ms: u64) -> usize {
221 let mut map = self.entries.lock().unwrap_or_else(|p| p.into_inner());
222 let horizon_ns = ttl_ms.saturating_mul(2).saturating_mul(1_000_000);
223 let before = map.len();
224 map.retain(|_, entry| now_ns.saturating_sub(entry.last_heartbeat_ns) <= horizon_ns);
225 before - map.len()
226 }
227
228 pub fn len(&self) -> usize {
231 self.entries.lock().unwrap_or_else(|p| p.into_inner()).len()
232 }
233
234 pub fn is_empty(&self) -> bool {
235 self.len() == 0
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// TTL used throughout the tests, in milliseconds.
    const TTL_MS: u64 = 30_000;
    /// Nanoseconds per millisecond, for building `now_ns` values.
    const MS_NS: u64 = 1_000_000;

    /// A single heartbeat creates one active row whose timestamps all equal `t0`.
    #[test]
    fn first_heartbeat_registers_consumer_as_active() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("orders", "workers", "w1", 0, t0);

        let snap = reg.snapshot(t0, TTL_MS);
        assert_eq!(snap.len(), 1);
        let row = &snap[0];
        assert_eq!(row.queue, "orders");
        assert_eq!(row.group, "workers");
        assert_eq!(row.consumer, "w1");
        assert_eq!(row.registered_at_ns, t0);
        assert_eq!(row.last_heartbeat_ns, t0);
        assert_eq!(row.last_seen_age_ms, 0);
        assert_eq!(row.lease_count, 0);
        assert_eq!(row.state, PresenceState::Active);
    }

    /// A second heartbeat updates the existing row in place and keeps `registered_at_ns`.
    #[test]
    fn heartbeat_refreshes_last_seen_but_preserves_registered_at() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        let t1 = t0 + 5_000 * MS_NS;
        reg.heartbeat("orders", "workers", "w1", 0, t0);
        reg.heartbeat("orders", "workers", "w1", 3, t1);

        let snap = reg.snapshot(t1, TTL_MS);
        assert_eq!(
            snap.len(),
            1,
            "heartbeat must update in place, not duplicate"
        );
        let row = &snap[0];
        assert_eq!(row.registered_at_ns, t0, "registered_at is sticky");
        assert_eq!(row.last_heartbeat_ns, t1);
        assert_eq!(row.last_seen_age_ms, 0);
        assert_eq!(row.lease_count, 3);
        assert_eq!(row.state, PresenceState::Active);
    }

    /// The same entry moves Active -> Stale -> Expired as the snapshot time advances.
    #[test]
    fn state_transitions_active_then_stale_then_expired() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("orders", "workers", "w1", 0, t0);

        // Just inside one TTL.
        let in_ttl = t0 + (TTL_MS - 1) * MS_NS;
        assert_eq!(reg.snapshot(in_ttl, TTL_MS)[0].state, PresenceState::Active);

        // Just past one TTL.
        let in_stale = t0 + (TTL_MS + 1) * MS_NS;
        let row = &reg.snapshot(in_stale, TTL_MS)[0];
        assert_eq!(row.state, PresenceState::Stale);
        assert_eq!(row.last_seen_age_ms, TTL_MS + 1);

        // Just past two TTLs.
        let in_expired = t0 + (TTL_MS * 2 + 1) * MS_NS;
        assert_eq!(
            reg.snapshot(in_expired, TTL_MS)[0].state,
            PresenceState::Expired
        );
    }

    /// Pruning drops only the entry older than two TTLs.
    #[test]
    fn prune_expired_removes_only_beyond_horizon() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("q", "g", "alive", 0, t0);
        // 1.5 x TTL old: stale, but inside the pruning horizon.
        reg.heartbeat("q", "g", "stale", 0, t0 - (TTL_MS + TTL_MS / 2) * MS_NS);
        // 3 x TTL old: beyond the pruning horizon.
        reg.heartbeat("q", "g", "expired", 0, t0 - TTL_MS * 3 * MS_NS);

        assert_eq!(reg.len(), 3);
        let pruned = reg.prune_expired(t0, TTL_MS);
        assert_eq!(pruned, 1, "only the >2*TTL entry is dropped");
        let names: Vec<_> = reg
            .snapshot(t0, TTL_MS)
            .into_iter()
            .map(|p| p.consumer)
            .collect();
        assert_eq!(names, vec!["alive".to_string(), "stale".to_string()]);
    }

    /// Active counts are bucketed per `(queue, group)` and ignore non-active consumers.
    #[test]
    fn count_active_by_group_segregates_queue_and_group() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;

        reg.heartbeat("orders", "workers", "w1", 0, t0);
        reg.heartbeat("orders", "workers", "w2", 0, t0);
        reg.heartbeat("orders", "audit", "a1", 0, t0);
        reg.heartbeat("billing", "workers", "b1", 0, t0);
        // One millisecond past the TTL: stale, so it must not be counted.
        reg.heartbeat("orders", "workers", "ghost", 0, t0 - (TTL_MS + 1) * MS_NS);

        let counts = reg.count_active_by_group(t0, TTL_MS);
        assert_eq!(counts[&("orders".into(), "workers".into())], 2);
        assert_eq!(counts[&("orders".into(), "audit".into())], 1);
        assert_eq!(counts[&("billing".into(), "workers".into())], 1);
        assert_eq!(counts.len(), 3, "stale ghost does not create a new bucket");
    }

    /// State depends only on heartbeat age; `lease_count` is carried through untouched.
    #[test]
    fn aliveness_is_heartbeat_driven_not_pending_driven() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;

        // Fresh heartbeat, no leases.
        reg.heartbeat("q", "g", "idle_poller", 0, t0);
        // Holds leases, but its only heartbeat is 3 x TTL old.
        reg.heartbeat("q", "g", "stuck_with_leases", 5, t0 - (TTL_MS * 3) * MS_NS);

        let snap = reg.snapshot(t0, TTL_MS);
        let by_consumer: HashMap<String, ConsumerPresence> =
            snap.into_iter().map(|p| (p.consumer.clone(), p)).collect();

        assert_eq!(
            by_consumer["idle_poller"].state,
            PresenceState::Active,
            "zero pending must not demote an actively-heartbeating consumer"
        );
        assert_eq!(by_consumer["idle_poller"].lease_count, 0);
        assert_eq!(
            by_consumer["stuck_with_leases"].state,
            PresenceState::Expired,
            "non-zero pending must not promote a consumer that stopped beating"
        );
        assert_eq!(by_consumer["stuck_with_leases"].lease_count, 5);

        let counts = reg.count_active_by_group(t0, TTL_MS);
        assert_eq!(
            counts.get(&("q".into(), "g".into())).copied().unwrap_or(0),
            1,
            "active count must reflect heartbeats, not pending deliveries"
        );
    }

    /// Deregistering removes exactly the named consumer and reports whether it existed.
    #[test]
    fn deregister_removes_consumer() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("q", "g", "w1", 0, t0);
        reg.heartbeat("q", "g", "w2", 0, t0);
        assert!(reg.deregister("q", "g", "w1"));
        assert!(!reg.deregister("q", "g", "w1"), "second deregister no-ops");
        let names: Vec<_> = reg
            .snapshot(t0, TTL_MS)
            .into_iter()
            .map(|p| p.consumer)
            .collect();
        assert_eq!(names, vec!["w2".to_string()]);
    }

    /// Snapshot rows come back sorted by queue, then group, then consumer.
    #[test]
    fn snapshot_is_deterministically_ordered() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        // Inserted deliberately out of order.
        reg.heartbeat("zeta", "g", "c", 0, t0);
        reg.heartbeat("alpha", "z", "a", 0, t0);
        reg.heartbeat("alpha", "a", "z", 0, t0);
        reg.heartbeat("alpha", "a", "a", 0, t0);

        let snap = reg.snapshot(t0, TTL_MS);
        let shape: Vec<_> = snap
            .into_iter()
            .map(|p| (p.queue, p.group, p.consumer))
            .collect();
        assert_eq!(
            shape,
            vec![
                ("alpha".into(), "a".into(), "a".into()),
                ("alpha".into(), "a".into(), "z".into()),
                ("alpha".into(), "z".into(), "a".into()),
                ("zeta".into(), "g".into(), "c".into()),
            ]
        );
    }
}