use super::ConnectionHandler;
use crate::channel_manager::ChannelManager;
use crate::cleanup::{AuthInfo, ConnectionCleanupInfo, DisconnectTask};
use crate::horizontal_adapter::DeadNodeEvent;
use sockudo_core::app::App;
use sockudo_core::error::{Error, Result};
use sockudo_core::websocket::SocketId;
use sockudo_protocol::messages::{ErrorData, MessageData, PusherMessage};
use sonic_rs::Value;
use sonic_rs::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info, warn};
impl ConnectionHandler {
/// Sends the connection-established handshake to a single socket.
///
/// The message is built by `PusherMessage::connection_established` from the
/// socket id and the server-configured `activity_timeout`.
pub async fn send_connection_established(
    &self,
    app_id: &str,
    socket_id: &SocketId,
) -> Result<()> {
    let socket_label = socket_id.to_string();
    let activity_timeout = self.server_options.activity_timeout;
    let handshake = PusherMessage::connection_established(socket_label, activity_timeout);
    self.send_message_to_socket(app_id, socket_id, handshake)
        .await
}
pub async fn send_error(
&self,
app_id: &str,
socket_id: &SocketId,
error: &Error,
channel: Option<String>,
) -> Result<()> {
let error_data = ErrorData {
message: error.to_string(),
code: Some(u32::from(error.close_code())),
};
let error_message =
PusherMessage::error(error_data.code.unwrap_or(4000), error_data.message, channel);
self.send_message_to_socket(app_id, socket_id, error_message)
.await
}
/// Handles a client request to unsubscribe `socket_id` from one channel.
///
/// Steps, in order:
/// 1. resolve the channel name from `message` and the socket's user id;
/// 2. drop delta-compression state for the channel (when the `delta` feature is on);
/// 3. remove the socket from the channel via `ChannelManager::unsubscribe_local`;
/// 4. update the connection's own subscription/filter/presence state;
/// 5. record unsubscription (and, if vacated locally, deactivation) metrics;
/// 6. for `presence-` channels with a known user, run presence member removal;
/// 7. emit subscription-count notifications; failures here are only logged.
///
/// # Errors
/// Propagates errors from steps 1, 3, 4 and 6.
pub async fn handle_unsubscribe(
    &self,
    socket_id: &SocketId,
    message: &PusherMessage,
    app_config: &App,
) -> Result<()> {
    let channel_name = self.extract_channel_from_unsubscribe_message(message)?;
    let user_id = self.get_user_id_for_socket(socket_id, app_config).await?;

    #[cfg(feature = "delta")]
    self.delta_compression
        .clear_channel_state(socket_id, &channel_name);

    let socket_key = socket_id.to_string();
    let leave_response = ChannelManager::unsubscribe_local(
        &self.connection_manager,
        &socket_key,
        &channel_name,
        &app_config.id,
        user_id.as_deref(),
    )
    .await?;

    self.update_connection_unsubscribe_state(socket_id, app_config, &channel_name)
        .await?;

    if let Some(metrics) = self.metrics.as_ref() {
        let channel_kind = sockudo_core::channel::ChannelType::from_name(&channel_name).as_str();
        metrics.mark_channel_unsubscription(&app_config.id, channel_kind);
        // `vacated_locally` comes from the channel manager; presumably true when
        // this node's last subscriber for the channel just left.
        if leave_response.vacated_locally {
            metrics.mark_channel_deactivated(&app_config.id, channel_kind);
        }
    }

    if channel_name.starts_with("presence-")
        && let Some(user_id_str) = user_id
    {
        let history_policy = app_config
            .resolved_presence_history(&channel_name, &self.server_options().presence_history);
        // NOTE(review): this explicit-unsubscribe path reports the `Disconnect`
        // cause; confirm whether a dedicated unsubscribe cause exists.
        self.presence_manager
            .handle_member_removed(
                &self.connection_manager,
                Arc::clone(self.presence_history_store()),
                history_policy.enabled,
                self.webhook_integration.as_ref(),
                self.metrics.as_ref(),
                app_config,
                &channel_name,
                &user_id_str,
                Some(socket_id),
                sockudo_core::presence_history::PresenceHistoryEventCause::Disconnect,
                None,
                0,
                Some(history_policy.retention()),
            )
            .await?;
    }

    let notify_result = self
        .send_unsubscribe_count_notifications(app_config, &channel_name)
        .await;
    if let Err(e) = notify_result {
        warn!(
            "Failed to emit unsubscribe count notifications for {}: {}",
            channel_name, e
        );
    }
    Ok(())
}
/// Emits subscription-count side effects after a socket left `channel_name`.
///
/// This does nothing for meta channels, and nothing when no consumer is
/// interested. The possible consumers are the `channel_vacated` webhook, the
/// `subscription_count` webhook, and a local subscriber of the channel's meta
/// channel. Otherwise it reads the current socket count once and then:
/// * sends a `subscription_count` webhook (non-presence channels only);
/// * broadcasts `channel_vacated` (count 0) or `subscription_count` on the meta channel;
/// * when the count is 0, schedules a `channel_vacated` webhook that fires after
///   3 seconds only if the channel is still empty at that point.
///
/// Individual delivery failures are discarded; this always returns `Ok(())`.
async fn send_unsubscribe_count_notifications(
    &self,
    app_config: &App,
    channel_name: &str,
) -> Result<()> {
    if sockudo_core::utils::is_meta_channel(channel_name) {
        return Ok(());
    }

    let vacated_webhook =
        self.subscription_count_webhook_configured(app_config, "channel_vacated");
    let count_webhook =
        self.subscription_count_webhook_configured(app_config, "subscription_count");
    let meta_listeners = self
        .subscription_count_meta_channel_has_local_subscriber(app_config, channel_name)
        .await;

    let anyone_interested = vacated_webhook || count_webhook || meta_listeners;
    if !anyone_interested {
        return Ok(());
    }

    let remaining = self
        .connection_manager
        .get_channel_socket_count(&app_config.id, channel_name)
        .await;

    let is_presence = channel_name.starts_with("presence-");
    if count_webhook
        && !is_presence
        && let Some(integration) = self.webhook_integration.as_ref()
    {
        let _ = integration
            .send_subscription_count_changed(app_config, channel_name, remaining)
            .await;
    }

    if meta_listeners {
        let event_name = match remaining {
            0 => "channel_vacated",
            _ => "subscription_count",
        };
        let payload = sonic_rs::json!({
            "channel": channel_name,
            "subscription_count": remaining,
        });
        let _ = self
            .broadcast_metachannel_event(app_config, channel_name, event_name, payload)
            .await;
    }

    if remaining == 0
        && vacated_webhook
        && let Some(integration) = &self.webhook_integration
    {
        let integration = Arc::clone(integration);
        let connections = Arc::clone(&self.connection_manager);
        let app = app_config.clone();
        let channel = channel_name.to_string();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_secs(3)).await;
            if connections.get_channel_socket_count(&app.id, &channel).await == 0 {
                let _ = integration.send_channel_vacated(&app, &channel).await;
            }
        });
    }

    Ok(())
}
/// Reports whether a webhook integration exists and has `event_type`
/// configured for `app_config`. Returns `false` when there is no integration.
pub(crate) fn subscription_count_webhook_configured(
    &self,
    app_config: &App,
    event_type: &str,
) -> bool {
    match &self.webhook_integration {
        Some(integration) => integration.webhook_configured(app_config, event_type),
        None => false,
    }
}
/// Reports whether `channel` has a meta channel with at least one subscriber
/// on this node.
///
/// Returns `false` when `meta_channel_for` yields no meta channel for `channel`.
pub(crate) async fn subscription_count_meta_channel_has_local_subscriber(
    &self,
    app_config: &App,
    channel: &str,
) -> bool {
    let Some(meta_channel) = sockudo_core::utils::meta_channel_for(channel) else {
        return false;
    };
    let local_subscribers = self
        .connection_manager
        .get_local_channel_socket_count(&app_config.id, &meta_channel)
        .await;
    local_subscribers > 0
}
/// Decides whether the next disconnect should go through the async cleanup queue.
///
/// Returns `false` in three cases: no cleanup queue is configured, the queue is
/// closed, or the circuit breaker is open. The breaker trips once
/// `cleanup_consecutive_failures` exceeds `MAX_CONSECUTIVE_FAILURES`. The trip
/// time, in seconds since the Unix epoch, is stored in
/// `cleanup_circuit_breaker_opened_at`, where `0` means "not open".
///
/// After `CIRCUIT_BREAKER_RECOVERY_TIMEOUT_SECS` the breaker is half-open.
/// Exactly one caller wins a compare-and-swap that moves the trip time to "now",
/// and only that caller may try async cleanup as a probe.
/// `handle_disconnect_with_presence_timeout` resets both counters when the probe
/// succeeds. When it fails, the refreshed trip time keeps the breaker open for
/// another full window. Previously the trip time was never refreshed, so after
/// the first window every call retried the failing queue and the breaker never
/// re-opened.
async fn should_use_async_cleanup(&self) -> bool {
    const MAX_CONSECUTIVE_FAILURES: usize = 10;
    const CIRCUIT_BREAKER_RECOVERY_TIMEOUT_SECS: u64 = 30;

    let Some(ref cleanup_queue) = self.cleanup_queue else {
        return false;
    };

    let failures = self.cleanup_consecutive_failures.load(Ordering::Acquire);
    if failures <= MAX_CONSECUTIVE_FAILURES {
        return !cleanup_queue.is_closed();
    }

    // `0` is the "breaker closed" sentinel, so never record 0 as a trip time
    // (only possible if the clock reads before the Unix epoch).
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
        .max(1);
    let opened_at = self
        .cleanup_circuit_breaker_opened_at
        .load(Ordering::Acquire);

    if opened_at == 0 {
        match self.cleanup_circuit_breaker_opened_at.compare_exchange(
            0,
            current_time,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => {
                warn!(
                    "Circuit breaker opened: too many cleanup failures ({}), disabling async cleanup for {} seconds",
                    failures, CIRCUIT_BREAKER_RECOVERY_TIMEOUT_SECS
                );
            }
            Err(_) => {
                debug!("Circuit breaker already opened by another thread");
            }
        }
        return false;
    }

    let recovery_at = opened_at.saturating_add(CIRCUIT_BREAKER_RECOVERY_TIMEOUT_SECS);
    if current_time < recovery_at {
        debug!(
            "Circuit breaker still open, {} seconds remaining until recovery attempt",
            recovery_at - current_time
        );
        return false;
    }

    // Half-open: claim the single probe by pushing the trip time forward. If the
    // probe then fails, the breaker is still open for another recovery window.
    match self.cleanup_circuit_breaker_opened_at.compare_exchange(
        opened_at,
        current_time,
        Ordering::AcqRel,
        Ordering::Acquire,
    ) {
        Ok(_) => {
            debug!(
                "Circuit breaker entering half-open state after {} seconds, attempting recovery",
                current_time - opened_at
            );
            !cleanup_queue.is_closed()
        }
        Err(_) => {
            debug!("Circuit breaker half-open probe already claimed by another thread");
            false
        }
    }
}
/// Handles a graceful disconnect of `socket_id`.
///
/// Delegates to the shared disconnect path with an ungraceful-presence timeout
/// of zero seconds.
pub async fn handle_disconnect(&self, app_id: &str, socket_id: &SocketId) -> Result<()> {
    const GRACEFUL_PRESENCE_TIMEOUT_SECONDS: u64 = 0;
    self.handle_disconnect_with_presence_timeout(
        app_id,
        socket_id,
        GRACEFUL_PRESENCE_TIMEOUT_SECONDS,
    )
    .await
}
/// Handles an ungraceful disconnect of `socket_id`.
///
/// Delegates to the shared disconnect path, passing the configured
/// `presence.ungraceful_timeout_seconds` through to presence member removal.
pub async fn handle_ungraceful_disconnect(
    &self,
    app_id: &str,
    socket_id: &SocketId,
) -> Result<()> {
    let grace_seconds = self.server_options().presence.ungraceful_timeout_seconds;
    self.handle_disconnect_with_presence_timeout(app_id, socket_id, grace_seconds)
        .await
}
async fn handle_disconnect_with_presence_timeout(
&self,
app_id: &str,
socket_id: &SocketId,
presence_ungraceful_timeout_seconds: u64,
) -> Result<()> {
debug!("Handling disconnect for socket: {}", socket_id);
if self.should_use_async_cleanup().await {
let cleanup_queue = self.cleanup_queue.as_ref().unwrap();
match self
.handle_disconnect_async(
app_id,
socket_id,
cleanup_queue,
presence_ungraceful_timeout_seconds,
)
.await
{
Ok(()) => {
let previous_failures =
self.cleanup_consecutive_failures.swap(0, Ordering::Release);
let was_circuit_breaker_open = self
.cleanup_circuit_breaker_opened_at
.swap(0, Ordering::Release);
if was_circuit_breaker_open > 0 {
info!(
"Circuit breaker recovered: async cleanup successful after {} failures",
previous_failures
);
}
return Ok(());
}
Err(e) => {
let new_failure_count = self
.cleanup_consecutive_failures
.fetch_add(1, Ordering::AcqRel)
+ 1;
warn!(
"Async cleanup failed for socket {} (failure #{}: {}), falling back to sync",
socket_id, new_failure_count, e
);
}
}
}
self.handle_disconnect_sync(app_id, socket_id, presence_ungraceful_timeout_seconds)
.await
}
/// Fast-path disconnect that defers the heavy cleanup to a background worker.
///
/// Steps:
/// 1. Under the connection lock, mark the connection as `disconnecting` and
///    snapshot what the worker needs into a `DisconnectTask`.
/// 2. Clear per-socket timers, the client-event rate limiter and delta state
///    inline.
/// 3. Enqueue the task on `cleanup_queue` with `try_send`, without waiting.
///
/// Returns `Ok(())` without doing anything when the connection is unknown or is
/// already marked `disconnecting`. When the queue rejects the task, the
/// `disconnecting` flag is reset and the call falls back to
/// `handle_disconnect_sync`, whose result is returned. The disconnection metric
/// is recorded here only when the task was queued; on the fallback path
/// `handle_disconnect_sync` records it.
async fn handle_disconnect_async(
    &self,
    app_id: &str,
    socket_id: &SocketId,
    cleanup_queue: &crate::cleanup::CleanupSender,
    presence_ungraceful_timeout_seconds: u64,
) -> Result<()> {
    use std::time::Instant;
    debug!("Using async cleanup for socket: {}", socket_id);
    // Snapshot the connection state under its lock; the guard is dropped at the
    // end of this block.
    let disconnect_info = {
        let connection_manager = &self.connection_manager;
        let connection = connection_manager.get_connection(socket_id, app_id).await;
        if let Some(conn_ref) = connection {
            let mut conn_locked = conn_ref.inner.lock().await;
            // Another disconnect already claimed this connection.
            if conn_locked.state.disconnecting {
                debug!("Connection {} already disconnecting, skipping", socket_id);
                return Ok(());
            }
            // Claim the connection so concurrent disconnect calls skip it.
            conn_locked.state.disconnecting = true;
            let channels: Vec<String> =
                conn_locked.state.get_subscribed_channels_list().to_vec();
            let user_id = conn_locked.state.user_id.clone();
            // Presence channels need member-removal handling in the worker.
            let presence_channels: Vec<String> = channels
                .iter()
                .filter(|ch| ch.starts_with("presence-"))
                .cloned()
                .collect();
            Some(DisconnectTask {
                socket_id: *socket_id,
                app_id: app_id.to_string(),
                subscribed_channels: channels,
                user_id: user_id.clone(),
                timestamp: Instant::now(),
                // Presence cleanup info is attached only when the socket was in
                // at least one presence channel.
                connection_info: if !presence_channels.is_empty() {
                    Some(ConnectionCleanupInfo {
                        presence_channels,
                        auth_info: user_id.map(|uid| AuthInfo {
                            user_id: uid,
                            user_info: None,
                        }),
                    })
                } else {
                    None
                },
                presence_ungraceful_timeout_seconds,
            })
        } else {
            debug!("Connection {} not found during disconnect", socket_id);
            return Ok(());
        }
    };
    // Best-effort local teardown; errors from the timeout helpers are ignored.
    self.clear_activity_timeout(app_id, socket_id).await.ok();
    self.clear_user_authentication_timeout(app_id, socket_id)
        .await
        .ok();
    if self.client_event_limiters.remove(socket_id).is_some() {
        debug!(
            "Removed client event rate limiter for socket: {}",
            socket_id
        );
    }
    #[cfg(feature = "delta")]
    self.delta_compression.remove_socket(socket_id);
    // Both `None` cases returned early above, so this is always `Some` here.
    if let Some(task) = disconnect_info {
        if let Err(_send_error) = cleanup_queue.try_send(task) {
            warn!(
                "Failed to queue async cleanup for socket {} (queue full/closed), falling back to sync cleanup",
                socket_id
            );
            // Undo the claim. `handle_disconnect_sync` skips connections that
            // are already marked `disconnecting`, so the flag must be cleared
            // for it to run.
            {
                let connection_manager = &self.connection_manager;
                if let Some(conn_ref) =
                    connection_manager.get_connection(socket_id, app_id).await
                {
                    let mut conn_locked = conn_ref.inner.lock().await;
                    conn_locked.state.disconnecting = false;
                }
            }
            // NOTE(review): the sync path clears the timeouts and rate limiter a
            // second time (and propagates their errors); confirm the clear
            // helpers tolerate that.
            return self
                .handle_disconnect_sync(app_id, socket_id, presence_ungraceful_timeout_seconds)
                .await;
        }
        debug!("Queued async cleanup for socket: {}", socket_id);
    }
    if let Some(ref metrics) = self.metrics {
        metrics.mark_disconnection(app_id, socket_id);
    }
    debug!(
        "Fast disconnect processing completed for socket: {}",
        socket_id
    );
    Ok(())
}
/// Runs the full disconnect cleanup inline on the current task.
///
/// Steps:
/// 1. Mark the connection as `disconnecting`. Skip everything if it was already
///    marked or the connection is unknown.
/// 2. Clear timers, the client-event rate limiter and delta state.
/// 3. Process watchlist offline events.
/// 4. Remove the connection from the connection manager.
/// 5. Unsubscribe it from every channel it was in.
/// 6. Record the disconnection metric.
///
/// # Errors
/// Returns `Error::ApplicationNotFound` when `app_id` cannot be resolved; the
/// connection is still removed from the manager in that case. Also propagates
/// errors from the timeout helpers, the app lookup, state extraction, watchlist
/// processing and channel unsubscription.
async fn handle_disconnect_sync(
    &self,
    app_id: &str,
    socket_id: &SocketId,
    presence_ungraceful_timeout_seconds: u64,
) -> Result<()> {
    debug!("Using synchronous cleanup for socket: {}", socket_id);
    let conn = {
        let connection_manager = &self.connection_manager;
        connection_manager.get_connection(socket_id, app_id).await
    };
    // Wait for the connection lock instead of using `try_lock`. The mutex is
    // also taken for ordinary per-socket work (user-id lookups, unsubscribe
    // state updates), so contention does not mean a disconnect is already in
    // progress. Skipping on contention would leak the connection and its
    // subscriptions.
    // Waiting is safe: this function already awaits the same lock further down
    // (in `extract_connection_state_for_disconnect`). A concurrent disconnect is
    // still detected through the `disconnecting` flag, which is read under the
    // lock.
    let already_disconnecting = match conn {
        Some(conn) => {
            let mut conn_locked = conn.inner.lock().await;
            let was_disconnecting = conn_locked.state.disconnecting;
            conn_locked.state.disconnecting = true;
            was_disconnecting
        }
        None => true,
    };
    if already_disconnecting {
        debug!(
            "Connection {} already disconnecting or doesn't exist, skipping",
            socket_id
        );
        return Ok(());
    }
    self.clear_activity_timeout(app_id, socket_id).await?;
    self.clear_user_authentication_timeout(app_id, socket_id)
        .await?;
    if self.client_event_limiters.remove(socket_id).is_some() {
        debug!(
            "Removed client event rate limiter for socket: {}",
            socket_id
        );
    }
    #[cfg(feature = "delta")]
    self.delta_compression.remove_socket(socket_id);
    let app_config = match self.app_manager.find_by_id(app_id).await? {
        Some(app) => app,
        None => {
            error!("App not found during disconnect: {}", app_id);
            // Still drop the connection so it does not linger without an app.
            self.cleanup_connection_from_manager(socket_id, app_id)
                .await;
            return Err(sockudo_core::error::Error::ApplicationNotFound);
        }
    };
    // Snapshot channels/user/watchlist before the connection is removed below.
    let (subscribed_channels, user_id, user_watchlist) = self
        .extract_connection_state_for_disconnect(socket_id, &app_config)
        .await?;
    if let Some(ref user_id_str) = user_id {
        self.handle_disconnect_watchlist_events(
            &app_config,
            user_id_str,
            socket_id,
            user_watchlist,
        )
        .await?;
    }
    self.cleanup_connection_from_manager(socket_id, app_id)
        .await;
    if !subscribed_channels.is_empty() {
        self.process_channel_unsubscriptions_on_disconnect(
            socket_id,
            &app_config,
            &subscribed_channels,
            &user_id,
            presence_ungraceful_timeout_seconds,
        )
        .await?;
    }
    if let Some(ref metrics) = self.metrics {
        metrics.mark_disconnection(app_id, socket_id);
    }
    debug!(
        "Successfully processed synchronous disconnect for socket: {}",
        socket_id
    );
    Ok(())
}
/// Snapshots the connection state the disconnect path needs.
///
/// Under the connection lock, this clears every timeout registered on the
/// state. It then returns `(subscribed channels, user id, watchlist)`. A missing
/// connection is not an error: it logs a warning and returns an empty set and
/// two `None`s.
async fn extract_connection_state_for_disconnect(
    &self,
    socket_id: &SocketId,
    app_config: &App,
) -> Result<(HashSet<String>, Option<String>, Option<Vec<String>>)> {
    let Some(conn_arc) = self
        .connection_manager
        .get_connection(socket_id, &app_config.id)
        .await
    else {
        warn!(
            "No connection found for socket during disconnect: {}",
            socket_id
        );
        return Ok((HashSet::new(), None, None));
    };

    let mut guard = conn_arc.inner.lock().await;
    let state = &mut guard.state;
    state.timeouts.clear_all();
    let watchlist = state
        .user_info
        .as_ref()
        .and_then(|info| info.watchlist.clone());
    let channels: HashSet<String> = state.get_subscribed_channels_list().into_iter().collect();
    let user_id = state.user_id.clone();
    Ok((channels, user_id, watchlist))
}
/// Removes a disconnecting socket from all of its channels in one batch.
///
/// For each channel result:
/// * locally vacated channels record a deactivation metric;
/// * channels the socket was actually removed from get post-unsubscribe
///   handling (`handle_post_unsubscribe_webhooks`);
/// * per-channel errors are logged and skipped.
///
/// A failure of the whole batch is also only logged.
///
/// # Errors
/// Only propagates errors returned by `handle_post_unsubscribe_webhooks`.
async fn process_channel_unsubscriptions_on_disconnect(
    &self,
    socket_id: &SocketId,
    app_config: &App,
    subscribed_channels: &HashSet<String>,
    user_id: &Option<String>,
    presence_ungraceful_timeout_seconds: u64,
) -> Result<()> {
    if subscribed_channels.is_empty() {
        return Ok(());
    }
    debug!(
        "Processing batch unsubscribe for socket {} from {} channels",
        socket_id,
        subscribed_channels.len()
    );

    let socket_key = socket_id.to_string();
    let operations: Vec<(String, String, String)> = subscribed_channels
        .iter()
        .map(|channel| (socket_key.clone(), channel.clone(), app_config.id.clone()))
        .collect();

    let results =
        match ChannelManager::batch_unsubscribe(&self.connection_manager, operations).await {
            Ok(results) => results,
            Err(e) => {
                error!(
                    "Batch unsubscribe failed for socket {} during disconnect: {}",
                    socket_id, e
                );
                return Ok(());
            }
        };

    for (channel_name, outcome) in &results {
        let (was_removed, remaining_connections, local_vacated) = match outcome {
            Ok(counts) => counts,
            Err(e) => {
                error!(
                    "Error unsubscribing socket {} from channel {} during disconnect: {}",
                    socket_id, channel_name, e
                );
                continue;
            }
        };

        if *local_vacated && let Some(metrics) = self.metrics.as_ref() {
            let channel_type =
                sockudo_core::channel::ChannelType::from_name(channel_name).as_str();
            metrics.mark_channel_deactivated(&app_config.id, channel_type);
        }

        if !*was_removed {
            continue;
        }
        self.handle_post_unsubscribe_webhooks(
            app_config,
            channel_name,
            user_id,
            *remaining_connections,
            socket_id,
            presence_ungraceful_timeout_seconds,
        )
        .await?;
    }
    Ok(())
}
async fn handle_post_unsubscribe_webhooks(
&self,
app_config: &App,
channel_str: &str,
user_id: &Option<String>,
current_sub_count: usize,
socket_id: &SocketId,
presence_ungraceful_timeout_seconds: u64,
) -> Result<()> {
if channel_str.starts_with("presence-") {
if let Some(disconnected_user_id) = user_id {
let presence_history_policy = app_config.resolved_presence_history(
channel_str,
&self.server_options().presence_history,
);
self.presence_manager
.handle_member_removed(
&self.connection_manager,
Arc::clone(self.presence_history_store()),
presence_history_policy.enabled,
self.webhook_integration.as_ref(),
self.metrics.as_ref(),
app_config,
channel_str,
disconnected_user_id,
Some(socket_id),
sockudo_core::presence_history::PresenceHistoryEventCause::Disconnect,
None,
presence_ungraceful_timeout_seconds,
Some(presence_history_policy.retention()),
)
.await
.ok();
}
} else {
if let Some(webhook_integration) = &self.webhook_integration {
webhook_integration
.send_subscription_count_changed(app_config, channel_str, current_sub_count)
.await
.ok();
}
}
if current_sub_count == 0
&& let Some(webhook_integration) = &self.webhook_integration
&& webhook_integration.webhook_configured(app_config, "channel_vacated")
{
let wi = Arc::clone(webhook_integration);
let cm = Arc::clone(&self.connection_manager);
let app = app_config.clone();
let channel = channel_str.to_string();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
let count = cm.get_channel_socket_count(&app.id, &channel).await;
if count == 0 {
wi.send_channel_vacated(&app, &channel).await.ok();
}
});
}
Ok(())
}
/// Notifies watchers that a user's connection on `socket_id` went away.
///
/// Runs only when watchlist events are enabled for the app and the user has a
/// watchlist. The connection is removed from the watchlist manager. Every
/// resulting offline event is then sent to every watcher of the user.
/// Individual send failures are logged and skipped.
///
/// # Errors
/// Propagates errors from the watchlist manager and from `get_watchers_for_user`.
async fn handle_disconnect_watchlist_events(
    &self,
    app_config: &App,
    user_id_str: &str,
    socket_id: &SocketId,
    user_watchlist: Option<Vec<String>>,
) -> Result<()> {
    let tracks_watchlist = app_config.watchlist_events_enabled() && user_watchlist.is_some();
    if !tracks_watchlist {
        return Ok(());
    }

    info!(
        "Processing watchlist disconnect for user {} on socket {}",
        user_id_str, socket_id
    );
    let offline_events = self
        .watchlist_manager
        .remove_user_connection(&app_config.id, user_id_str, socket_id)
        .await?;
    if offline_events.is_empty() {
        return Ok(());
    }

    let watchers = self
        .get_watchers_for_user(&app_config.id, user_id_str)
        .await?;
    for event in offline_events {
        for watcher_socket_id in &watchers {
            let delivery = self
                .send_message_to_socket(&app_config.id, watcher_socket_id, event.clone())
                .await;
            if let Err(e) = delivery {
                warn!(
                    "Failed to send offline notification to watcher {}: {}",
                    watcher_socket_id, e
                );
            }
        }
    }
    Ok(())
}
/// Tears a connection down in the connection manager.
///
/// If the connection is still registered, `cleanup_connection` runs on it
/// first. `remove_connection` is then always attempted, and its error is ignored.
async fn cleanup_connection_from_manager(&self, socket_id: &SocketId, app_id: &str) {
    let manager = &self.connection_manager;
    let existing = manager.get_connection(socket_id, app_id).await;
    if let Some(conn) = existing {
        manager.cleanup_connection(app_id, conn).await;
    }
    let _ = manager.remove_connection(socket_id, app_id).await;
}
fn extract_channel_from_unsubscribe_message(&self, message: &PusherMessage) -> Result<String> {
let message_data = message.data.as_ref().ok_or_else(|| {
Error::InvalidMessageFormat("Missing data in unsubscribe message".into())
})?;
match message_data {
MessageData::String(channel_str) => Ok(channel_str.clone()),
MessageData::Json(data) => data
.get("channel")
.and_then(Value::as_str)
.map(|s| s.to_string())
.ok_or_else(|| {
Error::InvalidMessageFormat("Missing channel in unsubscribe message".into())
}),
MessageData::Structured { channel, .. } => {
channel.as_ref().map(|s| s.to_string()).ok_or_else(|| {
Error::InvalidMessageFormat("Missing channel in unsubscribe message".into())
})
}
}
}
/// Returns the authenticated user id recorded on the socket's connection.
///
/// Returns `Ok(None)` when the connection is unknown or has no user id. This
/// function never returns an error.
async fn get_user_id_for_socket(
    &self,
    socket_id: &SocketId,
    app_config: &App,
) -> Result<Option<String>> {
    let Some(conn) = self
        .connection_manager
        .get_connection(socket_id, &app_config.id)
        .await
    else {
        return Ok(None);
    };
    let user_id = conn.inner.lock().await.state.user_id.clone();
    Ok(user_id)
}
/// Mirrors a socket's presence member info into the local adapter's namespace
/// for `app_id`, as `presence_data[socket_id][channel] = info`.
///
/// Best-effort: when the local adapter or the app's namespace is missing, a
/// warning is logged and nothing is stored.
pub(crate) fn set_namespace_presence(
    &self,
    app_id: &str,
    socket_id: &SocketId,
    channel: &str,
    info: sockudo_core::channel::PresenceMemberInfo,
) {
    match self.local_adapter.as_ref() {
        None => warn!(
            "local_adapter unavailable, namespace presence mirror not updated for socket {socket_id} in channel {channel}"
        ),
        Some(local_adapter) => match local_adapter.namespaces.get(app_id) {
            None => warn!(
                "namespace {app_id} missing, presence mirror not updated for socket {socket_id}"
            ),
            Some(namespace) => {
                namespace
                    .presence_data
                    .entry(*socket_id)
                    .or_default()
                    .insert(channel.to_string(), info);
            }
        },
    }
}
/// Removes `channel` from the socket's presence mirror in the local adapter's
/// namespace for `app_id`. The socket's entry is dropped once it has no
/// channels left.
///
/// A missing local adapter logs a warning. A missing namespace or socket entry
/// is silently ignored.
pub(crate) fn clear_namespace_presence(
    &self,
    app_id: &str,
    socket_id: &SocketId,
    channel: &str,
) {
    let Some(local_adapter) = self.local_adapter.as_ref() else {
        warn!(
            "local_adapter unavailable, namespace presence mirror not cleared for socket {socket_id} in channel {channel}"
        );
        return;
    };
    let Some(namespace) = local_adapter.namespaces.get(app_id) else {
        return;
    };
    let dashmap::mapref::entry::Entry::Occupied(mut per_socket) =
        namespace.presence_data.entry(*socket_id)
    else {
        return;
    };
    let channels = per_socket.get_mut();
    channels.remove(channel);
    if channels.is_empty() {
        per_socket.remove();
    }
}
/// Updates the connection's own bookkeeping after it leaves `channel_name`.
///
/// Steps:
/// 1. With the `tag-filtering` feature and a local adapter, drop the socket's
///    filter from the filter index.
/// 2. Remove the channel filter and the subscription from the connection.
/// 3. For `presence-` channels, also remove the presence info and clear the
///    namespace presence mirror.
///
/// An unknown connection is a no-op. This function always returns `Ok(())`.
async fn update_connection_unsubscribe_state(
    &self,
    socket_id: &SocketId,
    app_config: &App,
    channel_name: &str,
) -> Result<()> {
    let Some(conn_arc) = self
        .connection_manager
        .get_connection(socket_id, &app_config.id)
        .await
    else {
        return Ok(());
    };

    #[cfg(feature = "tag-filtering")]
    if let Some(ref local_adapter) = self.local_adapter {
        let filter_index = local_adapter.get_filter_index();
        let filter_node = conn_arc.get_channel_filter_sync(channel_name);
        filter_index.remove_socket_filter(channel_name, *socket_id, filter_node.as_deref());
    }

    conn_arc.channel_filters.remove(channel_name);
    let mut conn_locked = conn_arc.inner.lock().await;
    conn_locked.unsubscribe_from_channel(channel_name);
    if !channel_name.starts_with("presence-") {
        return Ok(());
    }

    conn_locked.remove_presence_info(channel_name);
    // Release the connection lock before touching the shared namespace mirror.
    drop(conn_locked);
    self.clear_namespace_presence(&app_config.id, socket_id, channel_name);
    Ok(())
}
// Not called anywhere in this file; `dead_code` is allowed so the helper can be
// kept without a compiler warning.
#[allow(dead_code)]
/// Reports whether any of `user_id`'s sockets in `app_id` is subscribed to
/// `channel_name`. Each socket's connection lock is taken in turn.
///
/// # Errors
/// Propagates errors from `get_user_sockets`.
async fn user_has_other_connections_in_presence_channel(
    &self,
    app_id: &str,
    channel_name: &str,
    user_id: &str,
) -> Result<bool> {
    let user_sockets = self
        .connection_manager
        .get_user_sockets(user_id, app_id)
        .await?;
    for ws_ref in user_sockets.iter() {
        let guard = ws_ref.inner.lock().await;
        if guard.state.is_subscribed(channel_name) {
            return Ok(true);
        }
    }
    Ok(false)
}
pub async fn send_missed_cache_if_exists(
&self,
app_id: &str,
socket_id: &SocketId,
channel: &str,
) -> Result<()> {
let cache_key = format!("app:{app_id}:channel:{channel}:cache_miss");
match self.cache_manager.get(&cache_key).await {
Ok(Some(cache_content)) => {
let cache_message: PusherMessage =
sonic_rs::from_str(&cache_content).map_err(|e| {
Error::InvalidMessageFormat(format!("Invalid cached message format: {e}"))
})?;
self.send_message_to_socket(app_id, socket_id, cache_message)
.await?;
info!(
"Sent cached content to socket {} for channel {}",
socket_id, channel
);
}
Ok(None) => {
let cache_miss_message = PusherMessage::cache_miss_event(channel.to_string());
self.send_message_to_socket(app_id, socket_id, cache_miss_message)
.await?;
if let Some(app_config) = self.app_manager.find_by_id(app_id).await?
&& let Some(webhook_integration) = &self.webhook_integration
&& let Err(e) = webhook_integration
.send_cache_missed(&app_config, channel)
.await
{
warn!(
"Failed to send cache_missed webhook for channel {}: {}",
channel, e
);
}
info!(
"No cached content found for channel: {}, sent cache_miss event",
channel
);
}
Err(e) => {
error!("Failed to get cache for channel {}: {}", channel, e);
let cache_miss_message = PusherMessage::cache_miss_event(channel.to_string());
self.send_message_to_socket(app_id, socket_id, cache_miss_message)
.await?;
return Err(Error::Internal(format!(
"Cache retrieval failed for channel {channel}: {e}"
)));
}
}
Ok(())
}
pub async fn store_cache_for_channel(
&self,
app_id: &str,
channel: &str,
message: &PusherMessage,
ttl_seconds: Option<u64>,
) -> Result<()> {
let cache_key = format!("app:{app_id}:channel:{channel}:cache_miss");
let message_json = sonic_rs::to_string(message).map_err(|e| {
Error::InvalidMessageFormat(format!("Failed to serialize message for cache: {e}"))
})?;
match ttl_seconds {
Some(ttl) => {
self.cache_manager
.set(&cache_key, &message_json, ttl)
.await
.map_err(|e| Error::Internal(format!("Failed to store cache with TTL: {e}")))?;
}
None => {
self.cache_manager
.set(&cache_key, &message_json, 0)
.await
.map_err(|e| Error::Internal(format!("Failed to store cache: {e}")))?;
}
}
debug!("Stored cache for channel {} in app {}", channel, app_id);
Ok(())
}
pub async fn clear_cache_for_channel(&self, app_id: &str, channel: &str) -> Result<()> {
let cache_key = format!("app:{app_id}:channel:{channel}:cache_miss");
self.cache_manager.remove(&cache_key).await.map_err(|e| {
Error::Internal(format!("Failed to clear cache for channel {channel}: {e}"))
})?;
debug!("Cleared cache for channel {} in app {}", channel, app_id);
Ok(())
}
/// Reports whether a cached message exists for `channel`.
///
/// A cache backend error is logged and reported as `Ok(false)`. This function
/// never returns an error.
pub async fn has_cache_for_channel(&self, app_id: &str, channel: &str) -> Result<bool> {
    let cache_key = format!("app:{app_id}:channel:{channel}:cache_miss");
    let present = match self.cache_manager.get(&cache_key).await {
        Ok(entry) => entry.is_some(),
        Err(e) => {
            warn!("Error checking cache for channel {}: {}", channel, e);
            false
        }
    };
    Ok(present)
}
pub async fn handle_dead_node_cleanup(&self, event: DeadNodeEvent) -> Result<()> {
let orphaned_members_count = event.orphaned_members.len();
debug!(
"Processing dead node cleanup for node {}, cleaning up {} orphaned members",
event.dead_node_id, orphaned_members_count
);
let mut members_by_app: HashMap<String, Vec<_>> = HashMap::new();
for member in event.orphaned_members {
members_by_app
.entry(member.app_id.clone())
.or_default()
.push(member);
}
debug!(
"Batched {} orphaned members across {} apps for efficient processing",
orphaned_members_count,
members_by_app.len()
);
for (app_id, members) in members_by_app {
let app_config = match self.app_manager.find_by_id(&app_id).await {
Ok(Some(app)) => app,
Ok(None) => {
warn!(
"App {} not found during dead node cleanup, skipping {} members",
app_id,
members.len()
);
continue;
}
Err(e) => {
error!(
"Error fetching app {} during dead node cleanup: {}, skipping {} members",
app_id,
e,
members.len()
);
continue;
}
};
debug!(
"Processing {} orphaned members for app {}",
members.len(),
app_config.id
);
for orphaned_member in members {
let presence_history_policy = app_config.resolved_presence_history(
&orphaned_member.channel,
&self.server_options().presence_history,
);
if let Err(e) = self
.presence_manager
.handle_member_removed(
&self.connection_manager,
Arc::clone(self.presence_history_store()),
presence_history_policy.enabled,
self.webhook_integration.as_ref(),
self.metrics.as_ref(),
&app_config,
&orphaned_member.channel,
&orphaned_member.user_id,
None, sockudo_core::presence_history::PresenceHistoryEventCause::OrphanCleanup,
Some(&event.dead_node_id),
0,
Some(presence_history_policy.retention()),
)
.await
{
error!(
"Failed to handle member removal for user {} in channel {} (app: {}) during dead node cleanup: {}",
orphaned_member.user_id, orphaned_member.channel, orphaned_member.app_id, e
);
} else {
debug!(
"Successfully cleaned up orphaned member {} from channel {} (app: {})",
orphaned_member.user_id, orphaned_member.channel, orphaned_member.app_id
);
}
}
}
info!(
"Completed dead node cleanup for node {}, processed {} orphaned members",
event.dead_node_id, orphaned_members_count
);
Ok(())
}
}