use crate::app::manager::AppManager;
use crate::channel::PresenceMemberInfo;
use crate::error::Result;
use crate::namespace::Namespace;
use crate::protocol::messages::PusherMessage;
use crate::websocket::{SocketId, WebSocketBufferConfig, WebSocketRef};
use ahash::AHashMap as HashMap;
use async_trait::async_trait;
use crossfire::mpsc;
use sockudo_ws::axum_integration::WebSocketWriter;
use std::any::Any;
use std::sync::Arc;
pub type DeadNodeEventBusFlavor = mpsc::List<crate::adapter::horizontal_adapter::DeadNodeEvent>;
pub type DeadNodeEventBusSender = crossfire::MTx<DeadNodeEventBusFlavor>;
pub type DeadNodeEventBusReceiver = crossfire::AsyncRx<DeadNodeEventBusFlavor>;
pub struct CompressionParams<'a> {
pub delta_compression: Arc<crate::delta_compression::DeltaCompressionManager>,
pub channel_settings: Option<&'a crate::delta_compression::ChannelDeltaSettings>,
}
#[async_trait]
pub trait HorizontalAdapterInterface: Send + Sync {
async fn broadcast_presence_join(
&self,
app_id: &str,
channel: &str,
user_id: &str,
socket_id: &str,
user_info: Option<sonic_rs::Value>,
) -> Result<()>;
async fn broadcast_presence_leave(
&self,
app_id: &str,
channel: &str,
user_id: &str,
socket_id: &str,
) -> Result<()>;
}
#[async_trait]
pub trait ConnectionManager: Send + Sync {
async fn init(&self);
async fn get_namespace(&self, app_id: &str) -> Option<Arc<Namespace>>;
async fn add_socket(
&self,
socket_id: SocketId,
socket: WebSocketWriter,
app_id: &str,
app_manager: Arc<dyn AppManager + Send + Sync>,
buffer_config: WebSocketBufferConfig,
) -> Result<()>;
async fn get_connection(&self, socket_id: &SocketId, app_id: &str) -> Option<WebSocketRef>;
async fn remove_connection(&self, socket_id: &SocketId, app_id: &str) -> Result<()>;
async fn send_message(
&self,
app_id: &str,
socket_id: &SocketId,
message: PusherMessage,
) -> Result<()>;
async fn send(
&self,
channel: &str,
message: PusherMessage,
except: Option<&SocketId>,
app_id: &str,
start_time_ms: Option<f64>,
) -> Result<()>;
async fn send_with_compression(
&self,
channel: &str,
message: PusherMessage,
except: Option<&SocketId>,
app_id: &str,
start_time_ms: Option<f64>,
compression: CompressionParams<'_>,
) -> Result<()> {
let _ = compression;
self.send(channel, message, except, app_id, start_time_ms)
.await
}
async fn get_channel_members(
&self,
app_id: &str,
channel: &str,
) -> Result<HashMap<String, PresenceMemberInfo>>;
async fn get_channel_sockets(&self, app_id: &str, channel: &str) -> Result<Vec<SocketId>>;
async fn remove_channel(&self, app_id: &str, channel: &str);
async fn is_in_channel(
&self,
app_id: &str,
channel: &str,
socket_id: &SocketId,
) -> Result<bool>;
async fn get_user_sockets(&self, user_id: &str, app_id: &str) -> Result<Vec<WebSocketRef>>;
async fn cleanup_connection(&self, app_id: &str, ws: WebSocketRef);
async fn terminate_connection(&self, app_id: &str, user_id: &str) -> Result<()>;
async fn add_channel_to_sockets(&self, app_id: &str, channel: &str, socket_id: &SocketId);
async fn get_channel_socket_count(&self, app_id: &str, channel: &str) -> usize;
async fn add_to_channel(
&self,
app_id: &str,
channel: &str,
socket_id: &SocketId,
) -> Result<bool>;
async fn remove_from_channel(
&self,
app_id: &str,
channel: &str,
socket_id: &SocketId,
) -> Result<bool>;
async fn get_presence_member(
&self,
app_id: &str,
channel: &str,
socket_id: &SocketId,
) -> Option<PresenceMemberInfo>;
async fn terminate_user_connections(&self, app_id: &str, user_id: &str) -> Result<()>;
async fn add_user(&self, ws: WebSocketRef) -> Result<()>;
async fn remove_user(&self, ws: WebSocketRef) -> Result<()>;
async fn remove_user_socket(
&self,
user_id: &str,
socket_id: &SocketId,
app_id: &str,
) -> Result<()>;
async fn count_user_connections_in_channel(
&self,
user_id: &str,
app_id: &str,
channel: &str,
excluding_socket: Option<&SocketId>,
) -> Result<usize>;
async fn get_channels_with_socket_count(&self, app_id: &str) -> Result<HashMap<String, usize>>;
async fn get_sockets_count(&self, app_id: &str) -> Result<usize>;
async fn get_namespaces(&self) -> Result<Vec<(String, Arc<Namespace>)>>;
fn as_any_mut(&mut self) -> &mut dyn Any;
async fn check_health(&self) -> Result<()>;
fn get_node_id(&self) -> String;
fn as_horizontal_adapter(&self) -> Option<&dyn HorizontalAdapterInterface>;
fn configure_dead_node_events(&self) -> Option<DeadNodeEventBusReceiver> {
None }
}