use std::collections::HashMap;
use std::sync::Mutex;
/// Default presence TTL in milliseconds (30 seconds).
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// presumably callers pass it as the `ttl_ms` argument of the registry
/// methods — confirm against call sites.
pub const DEFAULT_PRESENCE_TTL_MS: u64 = 30_000;
/// Liveness bucket assigned to a consumer from the age of its last heartbeat.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PresenceState {
    /// The last heartbeat is no older than the TTL.
    Active,
    /// The last heartbeat is older than the TTL but no older than twice the TTL.
    Stale,
    /// The last heartbeat is older than twice the TTL.
    Expired,
}

impl PresenceState {
    /// Returns the lowercase label of this state: `"active"`, `"stale"` or
    /// `"expired"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Expired => "expired",
            Self::Stale => "stale",
            Self::Active => "active",
        }
    }

    /// Buckets a heartbeat age against a TTL; both values are in milliseconds.
    ///
    /// Both boundaries are inclusive on the "healthier" side: an age exactly
    /// equal to `ttl_ms` is `Active`, and an age exactly equal to twice
    /// `ttl_ms` is `Stale`.
    fn classify(age_ms: u64, ttl_ms: u64) -> Self {
        // Saturating doubling keeps a huge TTL from wrapping to a small limit.
        let stale_limit_ms = ttl_ms.saturating_mul(2);
        if age_ms > stale_limit_ms {
            return Self::Expired;
        }
        if age_ms > ttl_ms {
            Self::Stale
        } else {
            Self::Active
        }
    }
}
/// Point-in-time view of one consumer, as produced by
/// `ConsumerPresenceRegistry::snapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerPresence {
/// Queue component of the registry key.
pub queue: String,
/// Group component of the registry key.
pub group: String,
/// Consumer component of the registry key.
pub consumer: String,
/// `now_ns` of the heartbeat that first created the entry.
pub registered_at_ns: u64,
/// `now_ns` of the most recent heartbeat.
pub last_heartbeat_ns: u64,
/// Snapshot time minus `last_heartbeat_ns`, floored at zero and truncated
/// to whole milliseconds.
pub last_seen_age_ms: u64,
/// Lease count reported by the most recent heartbeat.
pub lease_count: u32,
/// Liveness bucket derived from `last_seen_age_ms` and the TTL passed to
/// the snapshot call.
pub state: PresenceState,
}
/// Per-consumer record stored inside the registry map.
#[derive(Debug, Clone)]
struct PresenceEntry {
/// `now_ns` of the heartbeat that created this entry; later heartbeats do
/// not change it.
registered_at_ns: u64,
/// `now_ns` of the most recent heartbeat.
last_heartbeat_ns: u64,
/// Lease count supplied with the most recent heartbeat.
lease_count: u32,
}
/// Registry map key: `(queue, group, consumer)`, in that order.
type PresenceKey = (String, String, String);
/// In-memory registry of consumer heartbeats keyed by
/// `(queue, group, consumer)`.
///
/// All state lives behind a `Mutex`, so every method takes `&self`. The
/// registry never reads a clock: methods that need the current time receive it
/// as `now_ns` (nanoseconds) and receive the TTL as `ttl_ms` (milliseconds).
#[derive(Debug, Default)]
pub struct ConsumerPresenceRegistry {
    /// One entry per registered consumer.
    entries: Mutex<HashMap<PresenceKey, PresenceEntry>>,
}

impl ConsumerPresenceRegistry {
    /// Nanoseconds per millisecond, used when converting heartbeat ages.
    const NS_PER_MS: u64 = 1_000_000;

    /// Creates an empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Locks the entry map, recovering the guard if the mutex is poisoned.
    fn lock_entries(&self) -> std::sync::MutexGuard<'_, HashMap<PresenceKey, PresenceEntry>> {
        // A poisoned mutex still hands back its data; keep serving it rather
        // than propagating another thread's panic.
        self.entries
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Builds the owned map key for a consumer.
    fn make_key(queue: &str, group: &str, consumer: &str) -> PresenceKey {
        (queue.to_owned(), group.to_owned(), consumer.to_owned())
    }

    /// Age of a heartbeat in whole milliseconds at `now_ns`.
    ///
    /// A heartbeat stamped later than `now_ns` has age zero. The result is
    /// truncated, not rounded.
    fn age_ms(now_ns: u64, last_heartbeat_ns: u64) -> u64 {
        now_ns.saturating_sub(last_heartbeat_ns) / Self::NS_PER_MS
    }

    /// Records a heartbeat for `(queue, group, consumer)` at `now_ns`.
    ///
    /// The first heartbeat for a key creates the entry and fixes its
    /// registration time. Later heartbeats overwrite only the last-heartbeat
    /// time and `lease_count`.
    pub fn heartbeat(
        &self,
        queue: &str,
        group: &str,
        consumer: &str,
        lease_count: u32,
        now_ns: u64,
    ) {
        // Allocate the key before taking the lock to keep the critical
        // section short.
        let key = Self::make_key(queue, group, consumer);
        let mut map = self.lock_entries();
        map.entry(key)
            .and_modify(|existing| {
                existing.last_heartbeat_ns = now_ns;
                existing.lease_count = lease_count;
            })
            .or_insert(PresenceEntry {
                registered_at_ns: now_ns,
                last_heartbeat_ns: now_ns,
                lease_count,
            });
    }

    /// Removes a consumer from the registry.
    ///
    /// Returns `true` if an entry existed and was removed, `false` otherwise.
    pub fn deregister(&self, queue: &str, group: &str, consumer: &str) -> bool {
        let key = Self::make_key(queue, group, consumer);
        self.lock_entries().remove(&key).is_some()
    }

    /// Returns one row per registered consumer, classified at `now_ns` against
    /// `ttl_ms` and sorted by `(queue, group, consumer)`.
    pub fn snapshot(&self, now_ns: u64, ttl_ms: u64) -> Vec<ConsumerPresence> {
        let map = self.lock_entries();
        let mut rows: Vec<ConsumerPresence> = map
            .iter()
            .map(|((queue, group, consumer), entry)| {
                let age_ms = Self::age_ms(now_ns, entry.last_heartbeat_ns);
                ConsumerPresence {
                    queue: queue.clone(),
                    group: group.clone(),
                    consumer: consumer.clone(),
                    registered_at_ns: entry.registered_at_ns,
                    last_heartbeat_ns: entry.last_heartbeat_ns,
                    last_seen_age_ms: age_ms,
                    lease_count: entry.lease_count,
                    state: PresenceState::classify(age_ms, ttl_ms),
                }
            })
            .collect();
        // Sorting touches only the copied rows, so release the lock first.
        drop(map);

        // Map keys are unique, so no two rows compare equal and an unstable
        // sort yields the same order as a stable one.
        rows.sort_unstable_by(|a, b| {
            (&a.queue, &a.group, &a.consumer).cmp(&(&b.queue, &b.group, &b.consumer))
        });
        rows
    }

    /// Counts consumers that are `Active` at `now_ns`, grouped by
    /// `(queue, group)`.
    ///
    /// Groups with no active consumer are absent from the result. The age and
    /// classification are computed exactly as in `snapshot`.
    pub fn count_active_by_group(
        &self,
        now_ns: u64,
        ttl_ms: u64,
    ) -> HashMap<(String, String), u32> {
        let map = self.lock_entries();
        let mut by_group: HashMap<(String, String), u32> = HashMap::new();
        // Count straight from the map: only active consumers pay for a key
        // clone, and no intermediate sorted vector is built.
        for ((queue, group, _consumer), entry) in map.iter() {
            let age_ms = Self::age_ms(now_ns, entry.last_heartbeat_ns);
            if PresenceState::classify(age_ms, ttl_ms) == PresenceState::Active {
                *by_group.entry((queue.clone(), group.clone())).or_insert(0) += 1;
            }
        }
        by_group
    }

    /// Removes every entry that is `Expired` at `now_ns` and returns how many
    /// were removed.
    ///
    /// The decision uses the same millisecond-truncated age and the same
    /// classification as `snapshot`, so an entry that `snapshot` reports as
    /// `Stale` is never pruned.
    pub fn prune_expired(&self, now_ns: u64, ttl_ms: u64) -> usize {
        let mut map = self.lock_entries();
        let before = map.len();
        map.retain(|_, entry| {
            let age_ms = Self::age_ms(now_ns, entry.last_heartbeat_ns);
            PresenceState::classify(age_ms, ttl_ms) != PresenceState::Expired
        });
        before - map.len()
    }

    /// Number of registered consumers, regardless of their state.
    pub fn len(&self) -> usize {
        self.lock_entries().len()
    }

    /// Returns `true` when no consumer is registered.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// TTL used by every test, in milliseconds.
    const TTL_MS: u64 = 30_000;
    /// Nanoseconds per millisecond, for building timestamps.
    const MS_NS: u64 = 1_000_000;

    /// The first heartbeat creates an entry that is immediately `Active`.
    #[test]
    fn first_heartbeat_registers_consumer_as_active() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("orders", "workers", "w1", 0, t0);
        let snap = reg.snapshot(t0, TTL_MS);
        assert_eq!(snap.len(), 1);
        let row = &snap[0];
        assert_eq!(row.queue, "orders");
        assert_eq!(row.group, "workers");
        assert_eq!(row.consumer, "w1");
        assert_eq!(row.registered_at_ns, t0);
        assert_eq!(row.last_heartbeat_ns, t0);
        assert_eq!(row.last_seen_age_ms, 0);
        assert_eq!(row.lease_count, 0);
        assert_eq!(row.state, PresenceState::Active);
    }

    /// A repeat heartbeat updates the entry in place and keeps the original
    /// registration time.
    #[test]
    fn heartbeat_refreshes_last_seen_but_preserves_registered_at() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        let t1 = t0 + 5_000 * MS_NS;
        reg.heartbeat("orders", "workers", "w1", 0, t0);
        reg.heartbeat("orders", "workers", "w1", 3, t1);
        let snap = reg.snapshot(t1, TTL_MS);
        assert_eq!(
            snap.len(),
            1,
            "heartbeat must update in place, not duplicate"
        );
        let row = &snap[0];
        assert_eq!(row.registered_at_ns, t0, "registered_at is sticky");
        assert_eq!(row.last_heartbeat_ns, t1);
        assert_eq!(row.last_seen_age_ms, 0);
        assert_eq!(row.lease_count, 3);
        assert_eq!(row.state, PresenceState::Active);
    }

    /// As time passes without a heartbeat the state moves from `Active` to
    /// `Stale` to `Expired`.
    #[test]
    fn state_transitions_active_then_stale_then_expired() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("orders", "workers", "w1", 0, t0);
        let in_ttl = t0 + (TTL_MS - 1) * MS_NS;
        assert_eq!(reg.snapshot(in_ttl, TTL_MS)[0].state, PresenceState::Active);
        let in_stale = t0 + (TTL_MS + 1) * MS_NS;
        // Borrowing the indexed element directly in the `let` extends the
        // temporary snapshot vector to the end of this scope.
        let row = &reg.snapshot(in_stale, TTL_MS)[0];
        assert_eq!(row.state, PresenceState::Stale);
        assert_eq!(row.last_seen_age_ms, TTL_MS + 1);
        let in_expired = t0 + (TTL_MS * 2 + 1) * MS_NS;
        assert_eq!(
            reg.snapshot(in_expired, TTL_MS)[0].state,
            PresenceState::Expired
        );
    }

    /// Pruning drops only entries older than twice the TTL.
    #[test]
    fn prune_expired_removes_only_beyond_horizon() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("q", "g", "alive", 0, t0);
        reg.heartbeat("q", "g", "stale", 0, t0 - (TTL_MS + TTL_MS / 2) * MS_NS);
        reg.heartbeat("q", "g", "expired", 0, t0 - TTL_MS * 3 * MS_NS);
        assert_eq!(reg.len(), 3);
        let pruned = reg.prune_expired(t0, TTL_MS);
        assert_eq!(pruned, 1, "only the >2*TTL entry is dropped");
        let names: Vec<_> = reg
            .snapshot(t0, TTL_MS)
            .into_iter()
            .map(|p| p.consumer)
            .collect();
        assert_eq!(names, vec!["alive".to_string(), "stale".to_string()]);
    }

    /// Active counts are bucketed per `(queue, group)` and ignore stale
    /// consumers.
    #[test]
    fn count_active_by_group_segregates_queue_and_group() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("orders", "workers", "w1", 0, t0);
        reg.heartbeat("orders", "workers", "w2", 0, t0);
        reg.heartbeat("orders", "audit", "a1", 0, t0);
        reg.heartbeat("billing", "workers", "b1", 0, t0);
        reg.heartbeat("orders", "workers", "ghost", 0, t0 - (TTL_MS + 1) * MS_NS);
        let counts = reg.count_active_by_group(t0, TTL_MS);
        assert_eq!(counts[&("orders".into(), "workers".into())], 2);
        assert_eq!(counts[&("orders".into(), "audit".into())], 1);
        assert_eq!(counts[&("billing".into(), "workers".into())], 1);
        assert_eq!(counts.len(), 3, "stale ghost does not create a new bucket");
    }

    /// Liveness depends only on heartbeat age, never on the reported lease
    /// count.
    #[test]
    fn aliveness_is_heartbeat_driven_not_pending_driven() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("q", "g", "idle_poller", 0, t0);
        reg.heartbeat("q", "g", "stuck_with_leases", 5, t0 - (TTL_MS * 3) * MS_NS);
        let snap = reg.snapshot(t0, TTL_MS);
        let by_consumer: HashMap<String, ConsumerPresence> =
            snap.into_iter().map(|p| (p.consumer.clone(), p)).collect();
        assert_eq!(
            by_consumer["idle_poller"].state,
            PresenceState::Active,
            "zero pending must not demote an actively-heartbeating consumer"
        );
        assert_eq!(by_consumer["idle_poller"].lease_count, 0);
        assert_eq!(
            by_consumer["stuck_with_leases"].state,
            PresenceState::Expired,
            "non-zero pending must not promote a consumer that stopped beating"
        );
        assert_eq!(by_consumer["stuck_with_leases"].lease_count, 5);
        let counts = reg.count_active_by_group(t0, TTL_MS);
        assert_eq!(
            counts.get(&("q".into(), "g".into())).copied().unwrap_or(0),
            1,
            "active count must reflect heartbeats, not pending deliveries"
        );
    }

    /// Deregistering removes exactly the named consumer and reports whether
    /// it existed.
    #[test]
    fn deregister_removes_consumer() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("q", "g", "w1", 0, t0);
        reg.heartbeat("q", "g", "w2", 0, t0);
        assert!(reg.deregister("q", "g", "w1"));
        assert!(!reg.deregister("q", "g", "w1"), "second deregister no-ops");
        let names: Vec<_> = reg
            .snapshot(t0, TTL_MS)
            .into_iter()
            .map(|p| p.consumer)
            .collect();
        assert_eq!(names, vec!["w2".to_string()]);
    }

    /// Snapshot rows come back sorted by queue, then group, then consumer.
    #[test]
    fn snapshot_is_deterministically_ordered() {
        let reg = ConsumerPresenceRegistry::new();
        let t0 = 1_000_000_000_000_u64;
        reg.heartbeat("zeta", "g", "c", 0, t0);
        reg.heartbeat("alpha", "z", "a", 0, t0);
        reg.heartbeat("alpha", "a", "z", 0, t0);
        reg.heartbeat("alpha", "a", "a", 0, t0);
        let snap = reg.snapshot(t0, TTL_MS);
        let shape: Vec<_> = snap
            .into_iter()
            .map(|p| (p.queue, p.group, p.consumer))
            .collect();
        assert_eq!(
            shape,
            vec![
                ("alpha".into(), "a".into(), "a".into()),
                ("alpha".into(), "a".into(), "z".into()),
                ("alpha".into(), "z".into(), "a".into()),
                ("zeta".into(), "g".into(), "c".into()),
            ]
        );
    }
}