use crate::adapter::connection_manager::ConnectionManager;
use crate::app::config::App;
use crate::error::Result;
use crate::protocol::messages::PusherMessage;
use crate::webhook::integration::WebhookIntegration;
use crate::websocket::SocketId;
use dashmap::DashMap;
use std::sync::Arc;
use tracing::{debug, error, warn};
/// Builds the key used to serialise presence operations for one user in one
/// channel of one app: the three parts joined with `:` in the order
/// `app_id`, `channel`, `user_id`.
fn presence_lock_key(app_id: &str, channel: &str, user_id: &str) -> String {
    [app_id, channel, user_id].join(":")
}
/// Handles presence-channel member join/leave processing.
///
/// The manager itself only owns the map used to serialise those operations;
/// connection lookups, webhooks and broadcasts go through the collaborators
/// passed to each method.
pub struct PresenceManager {
/// Entries keyed by the string produced by `presence_lock_key`
/// (`app_id:channel:user_id`). The value carries no data; the entry is
/// only taken for the guard it yields while an operation is in progress.
presence_locks: DashMap<String, ()>,
}
impl Default for PresenceManager {
fn default() -> Self {
Self::new()
}
}
impl PresenceManager {
/// Creates a manager whose lock map starts out empty.
pub fn new() -> Self {
    let presence_locks = DashMap::new();
    Self { presence_locks }
}
#[allow(clippy::too_many_arguments)]
pub async fn handle_member_added(
&self,
connection_manager: Arc<dyn ConnectionManager + Send + Sync>,
webhook_integration: Option<&Arc<WebhookIntegration>>,
app_config: &App,
channel: &str,
user_id: &str,
user_info: Option<&sonic_rs::Value>,
excluding_socket: Option<&SocketId>,
) -> Result<()> {
debug!(
"Processing presence member addition for user {} in channel {} (app: {})",
user_id, channel, app_config.id
);
let lock_key = presence_lock_key(&app_config.id, channel, user_id);
let _lock_guard = self.presence_locks.entry(lock_key.clone()).or_insert(());
let had_other_connections = Self::user_has_other_connections_in_presence_channel(
Arc::clone(&connection_manager),
&app_config.id,
channel,
user_id,
excluding_socket,
)
.await?;
if !had_other_connections {
debug!(
"User {} is joining channel {} for the first time, sending member_added events",
user_id, channel
);
if let Some(webhook_integration) = webhook_integration
&& let Err(e) = webhook_integration
.send_member_added(app_config, channel, user_id)
.await
{
warn!(
"Failed to send member_added webhook for user {} in channel {}: {}",
user_id, channel, e
);
}
let member_added_msg = crate::protocol::messages::PusherMessage::member_added(
channel.to_string(),
user_id.to_string(),
user_info.cloned(),
);
Self::broadcast_to_channel(
Arc::clone(&connection_manager),
&app_config.id,
channel,
member_added_msg,
excluding_socket,
)
.await?;
debug!(
"Successfully processed member_added for user {} in channel {}",
user_id, channel
);
} else {
debug!(
"User {} already has {} connections in channel {}, skipping member_added events",
user_id,
if had_other_connections { "other" } else { "no" },
channel
);
}
if let Some(excluding_socket) = excluding_socket
&& let Some(horizontal_adapter) = connection_manager.as_horizontal_adapter()
{
horizontal_adapter
.broadcast_presence_join(
&app_config.id,
channel,
user_id,
&excluding_socket.to_string(),
user_info.cloned(),
)
.await
.map_err(|e| {
error!("Failed to broadcast presence join: {}", e);
e
})
.ok(); }
Ok(())
}
/// Processes a user leaving a presence channel.
///
/// Steps, in order:
/// 1. Takes the `app_id:channel:user_id` entry in `presence_locks` and keeps
///    the returned guard alive until the function returns.
/// 2. Asks `connection_manager` how many connections the user still has in
///    `channel`, not counting `excluding_socket`.
/// 3. If that count is zero, sends the `member_removed` webhook (when
///    `webhook_integration` is provided) and broadcasts a `member_removed`
///    message to the channel, skipping `excluding_socket`.
/// 4. If `excluding_socket` is provided and the connection manager exposes a
///    horizontal adapter, forwards the leave through
///    `broadcast_presence_leave`.
///
/// # Errors
///
/// Returns the error from the connection-count lookup or from the channel
/// broadcast. Webhook and horizontal-adapter failures are logged and
/// otherwise ignored.
pub async fn handle_member_removed(
    &self,
    connection_manager: &Arc<dyn ConnectionManager + Send + Sync>,
    webhook_integration: Option<&Arc<WebhookIntegration>>,
    app_config: &App,
    channel: &str,
    user_id: &str,
    excluding_socket: Option<&SocketId>,
) -> Result<()> {
    debug!(
        "Processing presence member removal for user {} in channel {} (app: {})",
        user_id, channel, app_config.id
    );

    // Serialise concurrent join/leave handling for the same user and channel.
    // NOTE(review): the guard returned by `entry(..).or_insert(..)` stays alive
    // across every `.await` below. DashMap documents that map access may
    // deadlock while a reference into the map is held, so other tasks touching
    // this map could block their worker thread meanwhile. TODO confirm whether
    // this should become an async-aware lock.
    let lock_key = presence_lock_key(&app_config.id, channel, user_id);
    let _lock_guard = self.presence_locks.entry(lock_key).or_insert(());

    let has_other_connections = Self::user_has_other_connections_in_presence_channel(
        Arc::clone(connection_manager),
        &app_config.id,
        channel,
        user_id,
        excluding_socket,
    )
    .await?;

    if has_other_connections {
        debug!(
            "User {} has other connections in channel {}, skipping member_removed events",
            user_id, channel
        );
    } else {
        debug!(
            "User {} has no other connections in channel {}, sending member_removed events",
            user_id, channel
        );

        // Webhook delivery is best-effort: a failure must not block the leave.
        if let Some(webhook_integration) = webhook_integration
            && let Err(e) = webhook_integration
                .send_member_removed(app_config, channel, user_id)
                .await
        {
            warn!(
                "Failed to send member_removed webhook for user {} in channel {}: {}",
                user_id, channel, e
            );
        }

        let member_removed_msg =
            PusherMessage::member_removed(channel.to_string(), user_id.to_string());
        Self::broadcast_to_channel(
            Arc::clone(connection_manager),
            &app_config.id,
            channel,
            member_removed_msg,
            excluding_socket,
        )
        .await?;

        debug!(
            "Successfully processed member_removed for user {} in channel {}",
            user_id, channel
        );
    }

    // The horizontal adapter is notified on every leave that names a socket,
    // whether or not the local member_removed events were sent. Failures here
    // are logged only.
    if let Some(excluding_socket) = excluding_socket
        && let Some(horizontal_adapter) = connection_manager.as_horizontal_adapter()
        && let Err(e) = horizontal_adapter
            .broadcast_presence_leave(
                &app_config.id,
                channel,
                user_id,
                &excluding_socket.to_string(),
            )
            .await
    {
        error!("Failed to broadcast presence leave: {}", e);
    }

    Ok(())
}
/// Empties the lock map once it holds more than 100,000 entries.
///
/// Below or at that size the call does nothing. Above it, a warning with
/// the current entry count is logged and every entry is removed.
pub fn cleanup_stale_locks(&self) {
    // Size above which the whole map is dropped.
    const MAX_TRACKED_LOCKS: usize = 100_000;

    if self.presence_locks.len() <= MAX_TRACKED_LOCKS {
        return;
    }

    warn!(
        "Presence locks map has {} entries, clearing stale entries",
        self.presence_locks.len()
    );
    self.presence_locks.clear();
}
/// Reports whether `user_id` has at least one connection in `channel` of
/// `app_id`, as counted by
/// `connection_manager.count_user_connections_in_channel` with
/// `excluding_socket` passed through.
///
/// # Errors
///
/// Propagates any error returned by the count lookup.
async fn user_has_other_connections_in_presence_channel(
    connection_manager: Arc<dyn ConnectionManager + Send + Sync>,
    app_id: &str,
    channel: &str,
    user_id: &str,
    excluding_socket: Option<&SocketId>,
) -> Result<bool> {
    let other_connections = connection_manager
        .count_user_connections_in_channel(user_id, app_id, channel, excluding_socket)
        .await?;

    Ok(other_connections > 0)
}
/// Forwards `message` to `connection_manager.send` for `channel` in `app_id`,
/// passing `excluding_socket` through unchanged.
///
/// The last argument to `send` is always `None` here; its meaning is defined
/// by the `ConnectionManager` trait — TODO confirm against that trait.
///
/// Returns whatever `send` returns.
async fn broadcast_to_channel(
connection_manager: Arc<dyn ConnectionManager + Send + Sync>,
app_id: &str,
channel: &str,
message: PusherMessage,
excluding_socket: Option<&SocketId>,
) -> Result<()> {
connection_manager
.send(channel, message, excluding_socket, app_id, None)
.await
}
}
mod static_helpers {
use super::*;
use std::sync::LazyLock;
static GLOBAL_PRESENCE_MANAGER: LazyLock<PresenceManager> = LazyLock::new(PresenceManager::new);
pub fn global() -> &'static PresenceManager {
&GLOBAL_PRESENCE_MANAGER
}
}
/// Returns the shared `'static` [`PresenceManager`] provided by
/// `static_helpers::global`.
pub fn global_presence_manager() -> &'static PresenceManager {
static_helpers::global()
}
#[cfg(test)]
mod tests {
    use super::{presence_lock_key, PresenceManager};

    /// The key is the three parts joined by `:` in app/channel/user order.
    #[test]
    fn test_presence_lock_key_format() {
        let expected = "app123:presence-room:user456";
        assert_eq!(
            presence_lock_key("app123", "presence-room", "user456"),
            expected
        );
    }

    /// A freshly built manager holds no lock entries.
    #[test]
    fn test_presence_manager_creation() {
        let fresh = PresenceManager::new();
        assert_eq!(fresh.presence_locks.len(), 0);
    }

    /// With only 100 entries the cleanup threshold is not reached, so
    /// nothing is removed.
    #[test]
    fn test_cleanup_stale_locks_threshold() {
        let manager = PresenceManager::new();
        (0..100).for_each(|i| {
            manager.presence_locks.insert(format!("key_{i}"), ());
        });

        manager.cleanup_stale_locks();

        assert_eq!(manager.presence_locks.len(), 100);
    }
}