use std::{
collections::HashMap,
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
use serde::{Deserialize, Serialize};
use tokio::{sync::RwLock, time::Instant};
use tracing::debug;
/// Tunable limits applied by the presence manager.
#[derive(Debug, Clone)]
pub struct PresenceConfig {
    /// Maximum number of distinct members a single room may hold; a join by a
    /// member that is not already in a room at this size is rejected.
    pub max_members_per_room: usize,
    /// Maximum number of rooms tracked at once; creating a room beyond this
    /// limit is rejected.
    pub max_rooms: usize,
    /// How long a member may go without its `last_seen` being refreshed
    /// before `evict_stale` removes it.
    pub heartbeat_timeout: Duration,
}

impl PresenceConfig {
    /// Default per-room member limit.
    const DEFAULT_MAX_MEMBERS_PER_ROOM: usize = 500;
    /// Default limit on the number of rooms.
    const DEFAULT_MAX_ROOMS: usize = 10_000;
    /// Default heartbeat timeout.
    const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);

    /// Builds the default configuration: 500 members per room, 10 000 rooms
    /// and a 30 second heartbeat timeout.
    ///
    /// This is a `const fn`, so it can be used to initialise constants and
    /// statics.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            max_members_per_room: Self::DEFAULT_MAX_MEMBERS_PER_ROOM,
            max_rooms: Self::DEFAULT_MAX_ROOMS,
            heartbeat_timeout: Self::DEFAULT_HEARTBEAT_TIMEOUT,
        }
    }
}

impl Default for PresenceConfig {
    /// Equivalent to [`PresenceConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A single participant tracked inside a presence room.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresenceMember {
/// Identifier of the member; rooms key their member map by this value.
pub id: String,
/// Arbitrary JSON payload attached to the member by `join` / `update_state`.
pub state: serde_json::Value,
/// Instant at which the member was last refreshed (set by `join`,
/// `heartbeat` and `update_state`).
///
/// Never serialized; when a member is deserialized this field is filled
/// with `Instant::now()`.
#[serde(skip, default = "Instant::now")]
pub last_seen: Instant,
}
/// Snapshot of every member present in one room at a point in time.
#[derive(Debug, Clone, Serialize)]
pub struct PresenceState {
/// Name of the room the snapshot was taken from.
pub room: String,
/// Cloned copies of the room's members; collected from a `HashMap`, so the
/// order is unspecified.
pub members: Vec<PresenceMember>,
}
/// Incremental change to a room's membership.
#[derive(Debug, Clone, Serialize)]
pub struct PresenceDiff {
/// Name of the room the change applies to.
pub room: String,
/// Members that joined, or whose state was replaced via `update_state`.
pub joins: Vec<PresenceMember>,
/// Ids of members that left or were evicted as stale.
pub leaves: Vec<String>,
}
/// Internal bookkeeping for one room.
#[derive(Debug)]
struct PresenceRoom {
    /// Members currently present, keyed by member id.
    members: HashMap<String, PresenceMember>,
}

impl PresenceRoom {
    /// Creates a room that has no members yet.
    fn new() -> Self {
        let members = HashMap::default();
        Self { members }
    }
}
/// Point-in-time counters reported by `PresenceManager::stats`.
#[derive(Debug, Clone)]
pub struct PresenceStats {
/// Number of rooms currently tracked.
pub active_rooms: usize,
/// Sum of the member counts of all tracked rooms.
pub total_members: usize,
/// Number of successful `join` calls since the manager was created
/// (a re-join of an existing member is counted as well).
pub joins_total: u64,
/// Number of members removed through `leave` since the manager was created.
pub leaves_total: u64,
/// Number of members removed by `evict_stale` since the manager was created.
pub evictions_total: u64,
}
/// In-memory registry of presence rooms and their members.
///
/// Room data lives behind an async `RwLock`; the lifetime counters are
/// atomics updated with relaxed ordering.
#[derive(Debug)]
pub struct PresenceManager {
/// All tracked rooms, keyed by room name.
rooms: RwLock<HashMap<String, PresenceRoom>>,
/// Limits enforced by `join` and the timeout used by `evict_stale`.
config: PresenceConfig,
/// Count of successful joins.
joins_total: AtomicU64,
/// Count of members removed via `leave`.
leaves_total: AtomicU64,
/// Count of members removed via `evict_stale`.
evictions_total: AtomicU64,
}
impl PresenceManager {
/// Creates a manager that tracks no rooms and has all counters at zero.
///
/// `config` supplies the room/member limits and the heartbeat timeout.
#[must_use]
pub fn new(config: PresenceConfig) -> Self {
    let rooms = RwLock::new(HashMap::default());
    Self {
        rooms,
        config,
        joins_total: AtomicU64::default(),
        leaves_total: AtomicU64::default(),
        evictions_total: AtomicU64::default(),
    }
}
/// Adds `member_id` to `room`, creating the room on first use.
///
/// If the member is already present its `state` is replaced and its
/// `last_seen` is refreshed; this is still reported as a join and still
/// counted in `joins_total`.
///
/// On success returns a snapshot of the room after the join together with a
/// diff that lists the joining member.
///
/// # Errors
///
/// * [`PresenceError::TooManyRooms`] — `room` does not exist yet and
///   `max_rooms` rooms are already tracked.
/// * [`PresenceError::RoomFull`] — `member_id` is not yet in the room and the
///   room already holds `max_members_per_room` members.
///
/// A rejected join never modifies the room table.
pub async fn join(
    &self,
    room: &str,
    member_id: &str,
    state: serde_json::Value,
) -> Result<(PresenceState, PresenceDiff), PresenceError> {
    let max_rooms = self.config.max_rooms;
    let max_members = self.config.max_members_per_room;
    let mut rooms = self.rooms.write().await;

    // Validate every limit *before* mutating anything, so that a rejected
    // join cannot leave an empty room behind that would count against
    // `max_rooms`.
    match rooms.get(room) {
        Some(existing) => {
            let is_new_member = !existing.members.contains_key(member_id);
            if is_new_member && existing.members.len() >= max_members {
                return Err(PresenceError::RoomFull {
                    room: room.to_string(),
                    max: max_members,
                });
            }
        }
        None => {
            if rooms.len() >= max_rooms {
                return Err(PresenceError::TooManyRooms { max: max_rooms });
            }
            // A brand-new room is empty, so it is only "full" when the
            // configured limit is zero.
            if max_members == 0 {
                return Err(PresenceError::RoomFull {
                    room: room.to_string(),
                    max: max_members,
                });
            }
        }
    }

    // All checks passed: fetch the room, creating it if necessary.
    let presence_room = rooms
        .entry(room.to_string())
        .or_insert_with(PresenceRoom::new);

    let member = PresenceMember {
        id: member_id.to_string(),
        state,
        last_seen: Instant::now(),
    };
    presence_room
        .members
        .insert(member_id.to_string(), member.clone());
    self.joins_total.fetch_add(1, Ordering::Relaxed);

    let presence_state = PresenceState {
        room: room.to_string(),
        members: presence_room.members.values().cloned().collect(),
    };
    let diff = PresenceDiff {
        room: room.to_string(),
        joins: vec![member],
        leaves: vec![],
    };
    debug!(
        room,
        member_id,
        members = presence_room.members.len(),
        "presence: member joined"
    );
    Ok((presence_state, diff))
}
/// Removes `member_id` from `room`.
///
/// Returns `None` when either the room or the member is unknown. Otherwise
/// returns a diff listing the member under `leaves`, increments
/// `leaves_total`, and drops the room altogether if it became empty.
pub async fn leave(&self, room: &str, member_id: &str) -> Option<PresenceDiff> {
    let mut rooms = self.rooms.write().await;

    let presence_room = rooms.get_mut(room)?;
    presence_room.members.remove(member_id)?;
    self.leaves_total.fetch_add(1, Ordering::Relaxed);

    let remaining = presence_room.members.len();
    debug!(room, member_id, members = remaining, "presence: member left");

    // Empty rooms are not kept around.
    if remaining == 0 {
        rooms.remove(room);
        debug!(room, "presence: room removed (empty)");
    }

    Some(PresenceDiff {
        room: room.to_owned(),
        joins: Vec::new(),
        leaves: vec![member_id.to_owned()],
    })
}
/// Refreshes the `last_seen` timestamp of `member_id` in `room`.
///
/// Returns `true` when the member was found and refreshed, `false` when the
/// room or the member does not exist.
pub async fn heartbeat(&self, room: &str, member_id: &str) -> bool {
    let mut rooms = self.rooms.write().await;

    let Some(member) = rooms
        .get_mut(room)
        .and_then(|presence_room| presence_room.members.get_mut(member_id))
    else {
        return false;
    };

    member.last_seen = Instant::now();
    true
}
/// Replaces the state payload of an existing member and refreshes its
/// `last_seen` timestamp.
///
/// Returns `None` when the room or the member is unknown; otherwise a diff
/// whose `joins` contains the updated member.
pub async fn update_state(
    &self,
    room: &str,
    member_id: &str,
    new_state: serde_json::Value,
) -> Option<PresenceDiff> {
    let mut rooms = self.rooms.write().await;
    let member = rooms.get_mut(room)?.members.get_mut(member_id)?;

    member.state = new_state;
    member.last_seen = Instant::now();

    let updated = member.clone();
    Some(PresenceDiff {
        room: room.to_owned(),
        joins: vec![updated],
        leaves: Vec::new(),
    })
}
pub async fn evict_stale(&self) -> Vec<PresenceDiff> {
let timeout = self.config.heartbeat_timeout;
let mut rooms = self.rooms.write().await;
let mut diffs = Vec::new();
let mut empty_rooms = Vec::new();
for (room_name, room) in rooms.iter_mut() {
let mut evicted = Vec::new();
room.members.retain(|id, member| {
if member.last_seen.elapsed() > timeout {
evicted.push(id.clone());
false
} else {
true
}
});
if !evicted.is_empty() {
let count = evicted.len();
self.evictions_total.fetch_add(count as u64, Ordering::Relaxed);
debug!(room = %room_name, evicted = count, "presence: evicted stale members");
diffs.push(PresenceDiff {
room: room_name.clone(),
joins: vec![],
leaves: evicted,
});
}
if room.members.is_empty() {
empty_rooms.push(room_name.clone());
}
}
for room_name in empty_rooms {
rooms.remove(&room_name);
}
diffs
}
/// Returns a snapshot of `room`, or `None` if no such room is tracked.
///
/// The members are cloned while a read lock is held; their order is
/// unspecified.
pub async fn get_room(&self, room: &str) -> Option<PresenceState> {
    let rooms = self.rooms.read().await;
    rooms.get(room).map(|presence_room| {
        let members = presence_room.members.values().cloned().collect();
        PresenceState {
            room: room.to_owned(),
            members,
        }
    })
}
/// Reports the current room/member counts together with the lifetime
/// join, leave and eviction counters.
///
/// Room and member counts are taken under a read lock; the counters are
/// loaded with relaxed ordering.
pub async fn stats(&self) -> PresenceStats {
    let rooms = self.rooms.read().await;

    let active_rooms = rooms.len();
    let mut total_members: usize = 0;
    for presence_room in rooms.values() {
        total_members += presence_room.members.len();
    }

    let joins_total = self.joins_total.load(Ordering::Relaxed);
    let leaves_total = self.leaves_total.load(Ordering::Relaxed);
    let evictions_total = self.evictions_total.load(Ordering::Relaxed);

    PresenceStats {
        active_rooms,
        total_members,
        joins_total,
        leaves_total,
        evictions_total,
    }
}
}
/// Reasons a `join` can be rejected.
#[derive(Debug, thiserror::Error)]
pub enum PresenceError {
/// The target room already holds the maximum number of members and the
/// joining member is not one of them.
#[error("room '{room}' is full: max {max} members")]
RoomFull {
/// Name of the room that rejected the join.
room: String,
/// The configured per-room member limit.
max: usize,
},
/// A new room would have to be created but the room limit is reached.
#[error("room limit exceeded: max {max} rooms")]
TooManyRooms {
/// The configured limit on the number of rooms.
max: usize,
},
}