Skip to main content

reddb_server/storage/queue/
presence.rs

//! Queue consumer presence — issue #742.
//!
//! Tracks consumer liveness as an explicit heartbeat contract that is
//! orthogonal to pending-delivery state. Red UI needs to render "is
//! anyone reading queue X / group Y right now?" without inferring it
//! from PEL entries: a worker can be alive on a quiet queue with zero
//! pending deliveries, and a worker with stuck PEL entries may itself
//! be dead. Presence is the answer to that question.
//!
//! Contract surface (process-local registry):
//!
//! - `heartbeat(queue, group, consumer, lease_count, now_ns)` — record
//!   or refresh the last-seen timestamp for a `(queue, group, consumer)`
//!   triple. Called on every `QUEUE READ` (whether or not a message
//!   was returned) so an idle poller still counts as alive, and may
//!   be called explicitly by a `QUEUE HEARTBEAT` command in a
//!   follow-up slice.
//! - `snapshot(now_ns, ttl_ms)` — list every tracked consumer with
//!   the derived `last_seen_age_ms`, `lease_count`, and lifecycle
//!   flags (`active` / `stale` / `expired`). Stale means the consumer
//!   missed at least one heartbeat budget but is still tracked;
//!   expired means it crossed the prune horizon and is queued for
//!   removal on the next sweep.
//! - `count_active_by_group(now_ns, ttl_ms)` — per-`(queue, group)`
//!   active consumer count, the field the operator-facing metadata
//!   surfaces consume.
//! - `prune_expired(now_ns, ttl_ms)` — drop entries whose age exceeds
//!   `2 * ttl_ms` (the expiry horizon). Safe to call on every
//!   snapshot path or on a background timer.
//!
//! Aliveness model:
//!
//! - `age_ms <= ttl_ms`                      → **active**
//! - `ttl_ms < age_ms <= 2 * ttl_ms`         → **stale**  (one missed beat)
//! - `age_ms > 2 * ttl_ms`                   → **expired** (prune-eligible)
//!
//! Durability follow-up: this slice is the typed contract + the
//! metadata snapshot every consumer of presence (Red UI, `red.*`
//! virtual tables, drivers) talks to. Mirroring writes into
//! `red_queue_meta` rows so presence survives restart is the
//! immediately-next slice; the public surface here does not change
//! when that lands.
43
44use std::collections::HashMap;
45use std::sync::Mutex;
46
/// Default heartbeat budget, in milliseconds (30 seconds).
///
/// A consumer counts as active for this long after its most recent
/// beat. The registry is agnostic to this value: every read path takes
/// the budget as an explicit argument, and operators can override the
/// default per server via the runtime config.
pub const DEFAULT_PRESENCE_TTL_MS: u64 = 30 * 1_000;
52
/// Lifecycle bucket computed from a consumer's `last_seen_age_ms` and
/// the configured `ttl_ms`.
///
/// Snapshot consumers (Red UI, virtual tables) read this flag as-is
/// and never re-derive the rule themselves.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PresenceState {
    /// The last heartbeat is at most `ttl_ms` old. Worker is alive.
    Active,
    /// Older than `ttl_ms` but not older than `2 * ttl_ms`. The worker
    /// missed a beat; Red UI should warn but keep the row visible.
    Stale,
    /// Older than `2 * ttl_ms`. The worker is assumed dead and will be
    /// pruned on the next sweep; until then it stays in the snapshot
    /// so the UI can show "last seen 5 minutes ago" rather than a gap.
    Expired,
}

impl PresenceState {
    /// Lowercase label for this state: `"active"`, `"stale"` or
    /// `"expired"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Active => "active",
            Self::Stale => "stale",
            Self::Expired => "expired",
        }
    }

    /// Buckets an age against the heartbeat budget.
    ///
    /// Both boundaries are inclusive: an age of exactly `ttl_ms` is
    /// still `Active`, and exactly `2 * ttl_ms` is still `Stale`. The
    /// doubling saturates, so a huge `ttl_ms` cannot wrap around and
    /// produce a tiny expiry horizon.
    fn classify(age_ms: u64, ttl_ms: u64) -> Self {
        let expiry_horizon_ms = ttl_ms.saturating_mul(2);
        match age_ms {
            age if age <= ttl_ms => Self::Active,
            age if age <= expiry_horizon_ms => Self::Stale,
            _ => Self::Expired,
        }
    }
}
88
89/// One row of presence state, returned by `snapshot`.
90#[derive(Debug, Clone, PartialEq, Eq)]
91pub struct ConsumerPresence {
92    pub queue: String,
93    pub group: String,
94    pub consumer: String,
95    pub registered_at_ns: u64,
96    pub last_heartbeat_ns: u64,
97    /// `now_ns - last_heartbeat_ns`, milliseconds. Snapshotted at
98    /// read time so the UI does not have to derive it.
99    pub last_seen_age_ms: u64,
100    /// Caller-reported number of in-flight (locked but unacked)
101    /// messages for this consumer. Stored verbatim — the registry
102    /// does not cross-check it against the live PEL because the
103    /// presence contract is intentionally independent of pending
104    /// delivery state.
105    pub lease_count: u32,
106    pub state: PresenceState,
107}
108
/// Stored per-consumer state; the value side of the registry map.
///
/// Holds only what heartbeats write. Age and lifecycle state are
/// derived at read time and never stored here.
#[derive(Clone, Debug)]
struct PresenceEntry {
    /// Timestamp (ns) of the heartbeat that created this entry.
    registered_at_ns: u64,
    /// Timestamp (ns) of the most recent heartbeat.
    last_heartbeat_ns: u64,
    /// Caller-reported in-flight message count from the latest beat.
    lease_count: u32,
}
115
/// Registry map key: the `(queue, group, consumer)` triple.
///
/// Private on purpose, so callers can only reach entries through the
/// typed registry surface.
type PresenceKey = (String, String, String);
119
120/// Process-local registry of consumer presence. Cheap mutex + small
121/// hashmap is the right shape: writes are O(1), reads are a single
122/// snapshot copy, and the cardinality is bounded by the operator's
123/// worker fleet (typically dozens, not thousands).
124#[derive(Debug, Default)]
125pub struct ConsumerPresenceRegistry {
126    entries: Mutex<HashMap<PresenceKey, PresenceEntry>>,
127}
128
129impl ConsumerPresenceRegistry {
130    pub fn new() -> Self {
131        Self::default()
132    }
133
134    /// Record or refresh a heartbeat. `now_ns` is taken from the
135    /// caller so tests can drive a deterministic clock and so the
136    /// runtime can reuse a wall-clock it already captured.
137    pub fn heartbeat(
138        &self,
139        queue: &str,
140        group: &str,
141        consumer: &str,
142        lease_count: u32,
143        now_ns: u64,
144    ) {
145        let key = (queue.to_string(), group.to_string(), consumer.to_string());
146        let mut map = self.entries.lock().unwrap_or_else(|p| p.into_inner());
147        map.entry(key)
148            .and_modify(|e| {
149                e.last_heartbeat_ns = now_ns;
150                e.lease_count = lease_count;
151            })
152            .or_insert(PresenceEntry {
153                registered_at_ns: now_ns,
154                last_heartbeat_ns: now_ns,
155                lease_count,
156            });
157    }
158
159    /// Explicitly drop a consumer (e.g. on graceful shutdown). Returns
160    /// whether an entry was actually removed.
161    pub fn deregister(&self, queue: &str, group: &str, consumer: &str) -> bool {
162        let key = (queue.to_string(), group.to_string(), consumer.to_string());
163        let mut map = self.entries.lock().unwrap_or_else(|p| p.into_inner());
164        map.remove(&key).is_some()
165    }
166
167    /// Full snapshot, deterministically ordered by `(queue, group,
168    /// consumer)` so test assertions and Red UI tables both see a
169    /// stable shape.
170    pub fn snapshot(&self, now_ns: u64, ttl_ms: u64) -> Vec<ConsumerPresence> {
171        let map = self.entries.lock().unwrap_or_else(|p| p.into_inner());
172        let mut rows: Vec<ConsumerPresence> = map
173            .iter()
174            .map(|((queue, group, consumer), entry)| {
175                let age_ms = now_ns.saturating_sub(entry.last_heartbeat_ns) / 1_000_000;
176                ConsumerPresence {
177                    queue: queue.clone(),
178                    group: group.clone(),
179                    consumer: consumer.clone(),
180                    registered_at_ns: entry.registered_at_ns,
181                    last_heartbeat_ns: entry.last_heartbeat_ns,
182                    last_seen_age_ms: age_ms,
183                    lease_count: entry.lease_count,
184                    state: PresenceState::classify(age_ms, ttl_ms),
185                }
186            })
187            .collect();
188        rows.sort_by(|a, b| {
189            a.queue
190                .cmp(&b.queue)
191                .then_with(|| a.group.cmp(&b.group))
192                .then_with(|| a.consumer.cmp(&b.consumer))
193        });
194        rows
195    }
196
197    /// Active-consumer count per `(queue, group)`. Only entries whose
198    /// derived state is `Active` are counted — Red UI surfaces this
199    /// as the "workers alive on this group right now" number, so
200    /// stale/expired must not inflate it.
201    pub fn count_active_by_group(
202        &self,
203        now_ns: u64,
204        ttl_ms: u64,
205    ) -> HashMap<(String, String), u32> {
206        let mut by_group: HashMap<(String, String), u32> = HashMap::new();
207        for row in self.snapshot(now_ns, ttl_ms) {
208            if row.state == PresenceState::Active {
209                *by_group.entry((row.queue, row.group)).or_insert(0) += 1;
210            }
211        }
212        by_group
213    }
214
215    /// Drop entries whose `last_seen_age_ms` exceeds `2 * ttl_ms`.
216    /// Returns the number of entries removed. Safe to call on any
217    /// metadata-read path; not strictly required for correctness
218    /// (snapshot already classifies them as `Expired`), but bounds
219    /// memory after worker churn.
220    pub fn prune_expired(&self, now_ns: u64, ttl_ms: u64) -> usize {
221        let mut map = self.entries.lock().unwrap_or_else(|p| p.into_inner());
222        let horizon_ns = ttl_ms.saturating_mul(2).saturating_mul(1_000_000);
223        let before = map.len();
224        map.retain(|_, entry| now_ns.saturating_sub(entry.last_heartbeat_ns) <= horizon_ns);
225        before - map.len()
226    }
227
228    /// Total entry count (active + stale + expired). Mostly useful
229    /// for tests and debug surfaces.
230    pub fn len(&self) -> usize {
231        self.entries.lock().unwrap_or_else(|p| p.into_inner()).len()
232    }
233
234    pub fn is_empty(&self) -> bool {
235        self.len() == 0
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    const TTL_MS: u64 = 30_000;
    const MS_NS: u64 = 1_000_000;
    /// Arbitrary base instant (ns) used as "now" by every test. Large
    /// enough that several TTLs can be subtracted without underflow.
    const T0: u64 = 1_000_000_000_000;

    /// Converts a millisecond count into nanoseconds.
    fn ms(millis: u64) -> u64 {
        millis * MS_NS
    }

    /// Consumer names from a snapshot taken at `now_ns`, in snapshot
    /// order.
    fn consumer_names(reg: &ConsumerPresenceRegistry, now_ns: u64) -> Vec<String> {
        reg.snapshot(now_ns, TTL_MS)
            .into_iter()
            .map(|row| row.consumer)
            .collect()
    }

    /// Finds the snapshot row for `consumer`, panicking if it is
    /// missing.
    fn row_for<'a>(rows: &'a [ConsumerPresence], consumer: &str) -> &'a ConsumerPresence {
        rows.iter()
            .find(|row| row.consumer == consumer)
            .expect("consumer present in snapshot")
    }

    /// Acceptance: "Tests cover active consumer registration".
    #[test]
    fn first_heartbeat_registers_consumer_as_active() {
        let reg = ConsumerPresenceRegistry::new();
        reg.heartbeat("orders", "workers", "w1", 0, T0);

        let expected = ConsumerPresence {
            queue: "orders".to_string(),
            group: "workers".to_string(),
            consumer: "w1".to_string(),
            registered_at_ns: T0,
            last_heartbeat_ns: T0,
            last_seen_age_ms: 0,
            lease_count: 0,
            state: PresenceState::Active,
        };
        assert_eq!(reg.snapshot(T0, TTL_MS), vec![expected]);
    }

    /// Acceptance: "Tests cover ... heartbeat update".
    #[test]
    fn heartbeat_refreshes_last_seen_but_preserves_registered_at() {
        let reg = ConsumerPresenceRegistry::new();
        let refreshed_at = T0 + ms(5_000);
        reg.heartbeat("orders", "workers", "w1", 0, T0);
        reg.heartbeat("orders", "workers", "w1", 3, refreshed_at);

        let rows = reg.snapshot(refreshed_at, TTL_MS);
        assert_eq!(
            rows.len(),
            1,
            "heartbeat must update in place, not duplicate"
        );
        let only = &rows[0];
        assert_eq!(only.registered_at_ns, T0, "registered_at is sticky");
        assert_eq!(only.last_heartbeat_ns, refreshed_at);
        assert_eq!(only.last_seen_age_ms, 0);
        assert_eq!(only.lease_count, 3);
        assert_eq!(only.state, PresenceState::Active);
    }

    /// Acceptance: "Tests cover ... expiry".
    #[test]
    fn state_transitions_active_then_stale_then_expired() {
        let reg = ConsumerPresenceRegistry::new();
        reg.heartbeat("orders", "workers", "w1", 0, T0);

        // Elapsed time since the single beat → expected bucket:
        // just inside the TTL, just past it, and just past 2*TTL.
        let cases = [
            (TTL_MS - 1, PresenceState::Active),
            (TTL_MS + 1, PresenceState::Stale),
            (TTL_MS * 2 + 1, PresenceState::Expired),
        ];
        for &(elapsed_ms, expected) in &cases {
            let rows = reg.snapshot(T0 + ms(elapsed_ms), TTL_MS);
            assert_eq!(rows[0].state, expected);
        }

        // The derived age is reported in whole milliseconds.
        let stale_rows = reg.snapshot(T0 + ms(TTL_MS + 1), TTL_MS);
        assert_eq!(stale_rows[0].last_seen_age_ms, TTL_MS + 1);
    }

    #[test]
    fn prune_expired_removes_only_beyond_horizon() {
        let reg = ConsumerPresenceRegistry::new();
        // Fresh beat → active.
        reg.heartbeat("q", "g", "alive", 0, T0);
        // Last beat 1.5 * TTL ago → stale.
        reg.heartbeat("q", "g", "stale", 0, T0 - ms(TTL_MS + TTL_MS / 2));
        // Last beat 3 * TTL ago → expired.
        reg.heartbeat("q", "g", "expired", 0, T0 - ms(TTL_MS * 3));
        assert_eq!(reg.len(), 3);

        let removed = reg.prune_expired(T0, TTL_MS);
        assert_eq!(removed, 1, "only the >2*TTL entry is dropped");
        assert_eq!(
            consumer_names(&reg, T0),
            vec!["alive".to_string(), "stale".to_string()]
        );
    }

    /// Acceptance: "Queue metadata includes active consumer count by
    /// queue and group" and "Tests cover ... queue/group visibility".
    #[test]
    fn count_active_by_group_segregates_queue_and_group() {
        let reg = ConsumerPresenceRegistry::new();
        let fresh = [
            ("orders", "workers", "w1"),
            ("orders", "workers", "w2"),
            ("orders", "audit", "a1"),
            ("billing", "workers", "b1"),
        ];
        for &(queue, group, consumer) in &fresh {
            reg.heartbeat(queue, group, consumer, 0, T0);
        }
        // Stale — must not be counted as active.
        reg.heartbeat("orders", "workers", "ghost", 0, T0 - ms(TTL_MS + 1));

        let mut expected: HashMap<(String, String), u32> = HashMap::new();
        expected.insert(("orders".to_string(), "workers".to_string()), 2);
        expected.insert(("orders".to_string(), "audit".to_string()), 1);
        expected.insert(("billing".to_string(), "workers".to_string()), 1);

        assert_eq!(
            reg.count_active_by_group(T0, TTL_MS),
            expected,
            "stale ghost does not create a new bucket"
        );
    }

    /// Acceptance: "The presence contract does not infer aliveness
    /// solely from pending deliveries."
    ///
    /// Expressed as a property: a consumer reporting `lease_count == 0`
    /// (nothing pending) that keeps beating stays active, while a
    /// consumer reporting `lease_count > 0` (PEL entries) whose last
    /// beat is older than the TTL is *not* active. Presence follows
    /// heartbeats, not the PEL.
    #[test]
    fn aliveness_is_heartbeat_driven_not_pending_driven() {
        let reg = ConsumerPresenceRegistry::new();

        // Idle poller: nothing pending, but it just beat.
        reg.heartbeat("q", "g", "idle_poller", 0, T0);
        // Worker that grabbed messages and then died: it still reports
        // leases, but its last beat is ancient.
        reg.heartbeat("q", "g", "stuck_with_leases", 5, T0 - ms(TTL_MS * 3));

        let rows = reg.snapshot(T0, TTL_MS);
        let idle = row_for(&rows, "idle_poller");
        let stuck = row_for(&rows, "stuck_with_leases");

        assert_eq!(
            idle.state,
            PresenceState::Active,
            "zero pending must not demote an actively-heartbeating consumer"
        );
        assert_eq!(idle.lease_count, 0);
        assert_eq!(
            stuck.state,
            PresenceState::Expired,
            "non-zero pending must not promote a consumer that stopped beating"
        );
        assert_eq!(stuck.lease_count, 5);

        let active_in_group = reg
            .count_active_by_group(T0, TTL_MS)
            .get(&("q".to_string(), "g".to_string()))
            .copied()
            .unwrap_or(0);
        assert_eq!(
            active_in_group, 1,
            "active count must reflect heartbeats, not pending deliveries"
        );
    }

    #[test]
    fn deregister_removes_consumer() {
        let reg = ConsumerPresenceRegistry::new();
        for name in &["w1", "w2"] {
            reg.heartbeat("q", "g", name, 0, T0);
        }

        assert!(reg.deregister("q", "g", "w1"));
        assert!(!reg.deregister("q", "g", "w1"), "second deregister no-ops");
        assert_eq!(consumer_names(&reg, T0), vec!["w2".to_string()]);
    }

    #[test]
    fn snapshot_is_deterministically_ordered() {
        let reg = ConsumerPresenceRegistry::new();
        // Deliberately inserted out of order.
        let shuffled = [
            ("zeta", "g", "c"),
            ("alpha", "z", "a"),
            ("alpha", "a", "z"),
            ("alpha", "a", "a"),
        ];
        for &(queue, group, consumer) in &shuffled {
            reg.heartbeat(queue, group, consumer, 0, T0);
        }

        let observed: Vec<(String, String, String)> = reg
            .snapshot(T0, TTL_MS)
            .into_iter()
            .map(|row| (row.queue, row.group, row.consumer))
            .collect();
        let expected: Vec<(String, String, String)> = [
            ("alpha", "a", "a"),
            ("alpha", "a", "z"),
            ("alpha", "z", "a"),
            ("zeta", "g", "c"),
        ]
        .iter()
        .map(|&(queue, group, consumer)| {
            (queue.to_string(), group.to_string(), consumer.to_string())
        })
        .collect();
        assert_eq!(observed, expected);
    }
}