use crate::adapter::connection_manager::ConnectionManager;
use crate::app::config::App;
use crate::error::Result;
use crate::protocol::messages::PusherMessage;
use crate::webhook::integration::WebhookIntegration;
use crate::websocket::SocketId;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error};
pub struct PresenceManager;
impl PresenceManager {
pub async fn handle_member_added(
connection_manager: Arc<Mutex<dyn ConnectionManager + Send + Sync>>,
webhook_integration: Option<&Arc<WebhookIntegration>>,
app_config: &App,
channel: &str,
user_id: &str,
user_info: Option<&serde_json::Value>,
excluding_socket: Option<&SocketId>,
) -> Result<()> {
debug!(
"Processing presence member addition for user {} in channel {} (app: {})",
user_id, channel, app_config.id
);
let had_other_connections = Self::user_has_other_connections_in_presence_channel(
Arc::clone(&connection_manager),
&app_config.id,
channel,
user_id,
excluding_socket,
)
.await?;
if !had_other_connections {
debug!(
"User {} is joining channel {} for the first time, sending member_added events",
user_id, channel
);
if let Some(webhook_integration) = webhook_integration {
webhook_integration
.send_member_added(app_config, channel, user_id)
.await
.ok(); }
let member_added_msg = crate::protocol::messages::PusherMessage::member_added(
channel.to_string(),
user_id.to_string(),
user_info.cloned(),
);
Self::broadcast_to_channel(
Arc::clone(&connection_manager),
&app_config.id,
channel,
member_added_msg,
excluding_socket,
)
.await?;
debug!(
"Successfully processed member_added for user {} in channel {}",
user_id, channel
);
} else {
debug!(
"User {} already has connections in channel {}, skipping member_added events",
user_id, channel
);
}
if let Some(excluding_socket) = excluding_socket
&& let Some(horizontal_adapter) =
connection_manager.lock().await.as_horizontal_adapter()
{
horizontal_adapter
.broadcast_presence_join(
&app_config.id,
channel,
user_id,
excluding_socket.as_ref(),
user_info.cloned(),
)
.await
.map_err(|e| {
error!("Failed to broadcast presence join: {}", e);
e
})
.ok(); }
Ok(())
}
pub async fn handle_member_removed(
connection_manager: &Arc<Mutex<dyn ConnectionManager + Send + Sync>>,
webhook_integration: Option<&Arc<WebhookIntegration>>,
app_config: &App,
channel: &str,
user_id: &str,
excluding_socket: Option<&SocketId>,
) -> Result<()> {
debug!(
"Processing presence member removal for user {} in channel {} (app: {})",
user_id, channel, app_config.id
);
let has_other_connections = Self::user_has_other_connections_in_presence_channel(
Arc::clone(connection_manager),
&app_config.id,
channel,
user_id,
excluding_socket,
)
.await?;
if !has_other_connections {
debug!(
"User {} has no other connections in channel {}, sending member_removed events",
user_id, channel
);
if let Some(webhook_integration) = webhook_integration {
webhook_integration
.send_member_removed(app_config, channel, user_id)
.await
.ok(); }
let member_removed_msg =
PusherMessage::member_removed(channel.to_string(), user_id.to_string());
Self::broadcast_to_channel(
Arc::clone(connection_manager),
&app_config.id,
channel,
member_removed_msg,
excluding_socket,
)
.await?;
debug!(
"Successfully processed member_removed for user {} in channel {}",
user_id, channel
);
} else {
debug!(
"User {} has other connections in channel {}, skipping member_removed events",
user_id, channel
);
}
if let Some(excluding_socket) = excluding_socket
&& let Some(horizontal_adapter) =
connection_manager.lock().await.as_horizontal_adapter()
{
horizontal_adapter
.broadcast_presence_leave(
&app_config.id,
channel,
user_id,
excluding_socket.as_ref(),
)
.await
.map_err(|e| {
error!("Failed to broadcast presence leave: {}", e);
e
})
.ok(); }
Ok(())
}
async fn user_has_other_connections_in_presence_channel(
connection_manager: Arc<Mutex<dyn ConnectionManager + Send + Sync>>,
app_id: &str,
channel: &str,
user_id: &str,
excluding_socket: Option<&SocketId>,
) -> Result<bool> {
let mut connection_manager = connection_manager.lock().await;
let subscribed_count = connection_manager
.count_user_connections_in_channel(user_id, app_id, channel, excluding_socket)
.await?;
let has_other_connections = subscribed_count > 0;
Ok(has_other_connections)
}
async fn broadcast_to_channel(
connection_manager: Arc<Mutex<dyn ConnectionManager + Send + Sync>>,
app_id: &str,
channel: &str,
message: PusherMessage,
excluding_socket: Option<&SocketId>,
) -> Result<()> {
let mut connection_manager = connection_manager.lock().await;
connection_manager
.send(channel, message, excluding_socket, app_id, None)
.await
}
}