#![allow(dead_code)]
mod operations;
mod utils;
use crate::local_adapter::LocalAdapter;
use ahash::AHashMap;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use sockudo_core::channel::PresenceMemberInfo;
use sockudo_core::metrics::MetricsInterface;
use sockudo_core::websocket::SocketId;
use std::collections::HashSet;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, OnceLock};
use std::time::Instant;
use tokio::sync::{Notify, RwLock};
use uuid::Uuid;
/// `DashMap` alias whose hasher is `ahash::RandomState` rather than `DashMap`'s default hasher.
type FastDashMap<K, V> = DashMap<K, V, ahash::RandomState>;
/// Creates an empty [`FastDashMap`] using a newly constructed `ahash::RandomState` hasher.
fn fast_dashmap<K, V>() -> FastDashMap<K, V>
where
    K: Eq + std::hash::Hash,
{
    let hasher = ahash::RandomState::new();
    DashMap::with_hasher(hasher)
}
pub use utils::{current_timestamp, generate_request_id};
/// Discriminates the kind of request carried by a [`RequestBody`].
///
/// Every variant is field-less, so `Eq` is derived alongside `PartialEq`.
///
/// The enum derives `Serialize`/`Deserialize`, so variant names (and, for
/// index-based formats, variant order) are part of the serialized form.
/// Append new variants instead of reordering existing ones unless the wire
/// format in use has been checked.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum RequestType {
    // `is_fire_and_forget` returns `false` for the variants in this group.
    ChannelMembers,
    ChannelSockets,
    ChannelSocketsCount,
    SocketExistsInChannel,
    TerminateUserConnections,
    ChannelsWithSocketsCount,
    Sockets,
    Channels,
    SocketsCount,
    ChannelMembersCount,
    CountUserConnectionsInChannel,
    // Presence membership notifications; `is_fire_and_forget` returns `true`.
    PresenceMemberJoined,
    PresenceMemberLeft,
    PresenceMemberUpdated,
    // Node liveness notifications; `is_fire_and_forget` returns `true`.
    Heartbeat,
    NodeDead,
    // `is_fire_and_forget` returns `true`.
    PresenceStateSync,
    // `is_fire_and_forget` returns `false`.
    BatchChannelSocketsCount,
    // Channel count notifications; `is_fire_and_forget` returns `true`.
    ChannelCountUpdate,
    ChannelCountSync,
}
impl RequestType {
    /// Reports whether this request type is classified as "fire and forget".
    ///
    /// Returns `true` for the presence-member, heartbeat, node-dead,
    /// presence-state-sync and channel-count variants, and `false` for every
    /// other variant.
    ///
    /// NOTE(review): presumably callers do not wait for a response to these
    /// request types — confirm against the request-dispatch code.
    pub fn is_fire_and_forget(&self) -> bool {
        // The match is exhaustive on purpose: adding a variant forces an
        // explicit classification here.
        match self {
            Self::PresenceMemberJoined
            | Self::PresenceMemberLeft
            | Self::PresenceMemberUpdated
            | Self::Heartbeat
            | Self::NodeDead
            | Self::PresenceStateSync
            | Self::ChannelCountUpdate
            | Self::ChannelCountSync => true,
            Self::ChannelMembers
            | Self::ChannelSockets
            | Self::ChannelSocketsCount
            | Self::SocketExistsInChannel
            | Self::TerminateUserConnections
            | Self::ChannelsWithSocketsCount
            | Self::Sockets
            | Self::Channels
            | Self::SocketsCount
            | Self::ChannelMembersCount
            | Self::CountUserConnectionsInChannel
            | Self::BatchChannelSocketsCount => false,
        }
    }
}
/// Request payload identified by `request_id` and tagged with a [`RequestType`].
///
/// NOTE(review): presumably exchanged between adapter nodes — confirm against
/// the transport code. Field order is kept exactly as declared because the
/// struct derives `Serialize`/`Deserialize`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestBody {
    /// Identifier of this request.
    pub request_id: String,
    /// Node identifier; presumably the node that issued the request — TODO confirm.
    pub node_id: String,
    /// Application identifier the request refers to.
    pub app_id: String,
    /// Kind of request.
    pub request_type: RequestType,
    /// Optional channel name.
    pub channel: Option<String>,
    /// Optional socket identifier.
    pub socket_id: Option<String>,
    /// Optional user identifier.
    pub user_id: Option<String>,
    /// Optional user info as a `sonic_rs` JSON value.
    pub user_info: Option<sonic_rs::Value>,
    /// Optional timestamp; the unit is not visible here — TODO confirm.
    pub timestamp: Option<u64>,
    /// Optional identifier of a dead node.
    pub dead_node_id: Option<String>,
    /// Optional identifier of a target node.
    pub target_node_id: Option<String>,
    /// Optional list of channel names. Omitted from serialized output when
    /// `None`, and defaults to `None` when absent from the input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub channels: Option<Vec<String>>,
    /// Optional reply destination. Omitted from serialized output when
    /// `None`, and defaults to `None` when absent from the input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reply_to: Option<String>,
}
/// Response payload carrying a `request_id` plus one slot per kind of result.
///
/// NOTE(review): presumably `request_id` echoes the id of the originating
/// `RequestBody` and only the slots relevant to that request type are
/// populated — confirm against the code that builds responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResponseBody {
    /// Identifier of the request this response belongs to.
    pub request_id: String,
    /// Node identifier; presumably the responding node — TODO confirm.
    pub node_id: String,
    /// Application identifier.
    pub app_id: String,
    /// Presence member info keyed by string (presumably user id — TODO confirm).
    pub members: AHashMap<String, PresenceMemberInfo>,
    /// Socket counts keyed by string (presumably channel name — TODO confirm).
    pub channels_with_sockets_count: AHashMap<String, usize>,
    /// Socket identifiers.
    pub socket_ids: Vec<String>,
    /// Number of sockets.
    pub sockets_count: usize,
    /// Boolean result of an existence check.
    pub exists: bool,
    /// Set of channel names.
    pub channels: HashSet<String>,
    /// Number of members.
    pub members_count: usize,
    /// Number of responses received; defaults to `0` when absent from the input.
    #[serde(default)]
    pub responses_received: usize,
    /// Number of responses expected; defaults to `0` when absent from the input.
    #[serde(default)]
    pub expected_responses: usize,
    /// Completion flag; defaults to `true` (via `default_true`) when absent
    /// from the input.
    #[serde(default = "default_true")]
    pub complete: bool,
}
/// Serde default provider that always yields `true`; referenced by
/// `#[serde(default = "default_true")]`.
fn default_true() -> bool {
    true
}
/// Serializable message addressed to a channel of an application.
///
/// NOTE(review): presumably published to other nodes so they can deliver it
/// to their local sockets — confirm against the broadcast code path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BroadcastMessage {
    /// Node identifier; presumably the originating node — TODO confirm.
    pub node_id: String,
    /// Application identifier.
    pub app_id: String,
    /// Channel name.
    pub channel: String,
    /// Message content as a string.
    pub message: String,
    /// Optional socket identifier; the name suggests a socket to exclude from
    /// delivery — TODO confirm.
    pub except_socket_id: Option<String>,
    /// Optional timestamp (the name suggests milliseconds). Omitted from
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp_ms: Option<f64>,
    /// Optional compression metadata. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub compression_metadata: Option<CompressionMetadata>,
    /// Optional idempotency key. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// Ephemeral flag. Omitted from serialized output when `false`, and
    /// defaults to `false` when absent from the input.
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub ephemeral: bool,
}
/// Serializable compression-related metadata, embedded in `BroadcastMessage`
/// through its optional `compression_metadata` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionMetadata {
    /// Optional conflation key.
    pub conflation_key: Option<String>,
    /// Enabled flag.
    pub enabled: bool,
    /// Optional sequence number. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sequence: Option<u32>,
    /// Flag whose name suggests the payload is a full message rather than a
    /// delta — TODO confirm.
    pub is_full_message: bool,
    /// Optional event name. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_name: Option<String>,
}
/// Bookkeeping for a request whose responses are being collected.
///
/// All fields are crate-private. Cloning copies the collected responses and
/// shares the same `Notify` handle through the `Arc`.
#[derive(Clone)]
pub struct PendingRequest {
    /// Instant recorded for the request (presumably when it was issued — TODO confirm).
    pub(crate) start_time: Instant,
    /// Application identifier of the request.
    pub(crate) app_id: String,
    /// Responses collected so far.
    pub(crate) responses: Vec<ResponseBody>,
    /// Shared notifier; presumably signalled when responses arrive — TODO confirm.
    pub(crate) notify: Arc<Notify>,
}
/// Small `Copy` summary of response-aggregation progress.
#[derive(Debug, Clone, Copy)]
pub struct AggregationStats {
    /// Number of responses received.
    pub responses_received: usize,
    /// Number of responses expected.
    pub expected_responses: usize,
    /// Completion flag.
    pub complete: bool,
}
/// Serializable record for one presence member; the leaf value stored in
/// `ClusterPresenceRegistry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresenceEntry {
    /// Optional user info, held behind an `Arc` so clones share the JSON value.
    pub user_info: Option<Arc<sonic_rs::Value>>,
    /// Node identifier.
    pub node_id: String,
    /// Application identifier.
    pub app_id: String,
    /// User identifier.
    pub user_id: String,
    /// Socket identifier.
    pub socket_id: String,
    /// Sequence number; how it is assigned is not visible here — TODO confirm.
    pub sequence_number: u64,
}
/// Three-level map of `String` keys ending in a [`PresenceEntry`].
///
/// NOTE(review): the meaning of each key level is not visible in this file —
/// check the code that populates the registry before relying on a particular
/// nesting order.
pub type ClusterPresenceRegistry =
AHashMap<String, AHashMap<String, AHashMap<String, PresenceEntry>>>;
/// Event describing a dead node together with the members it left behind.
#[derive(Debug, Clone)]
pub struct DeadNodeEvent {
    /// Identifier of the dead node.
    pub dead_node_id: String,
    /// Members recorded as orphaned for this event.
    pub orphaned_members: Vec<OrphanedMember>,
}
/// One member listed in `DeadNodeEvent::orphaned_members`.
#[derive(Debug, Clone)]
pub struct OrphanedMember {
    /// Application identifier.
    pub app_id: String,
    /// Channel name.
    pub channel: String,
    /// User identifier.
    pub user_id: String,
    /// Optional user info as a `sonic_rs` JSON value.
    pub user_info: Option<sonic_rs::Value>,
}
/// Adapter state combining a local adapter with cluster-wide bookkeeping.
///
/// Most shared state sits behind `Arc`, so handles to it can be cloned
/// cheaply and shared.
pub struct HorizontalAdapter {
    /// Identifier of this node.
    pub node_id: String,
    /// Shared handle to the local adapter.
    pub local_adapter: Arc<LocalAdapter>,
    /// Pending requests keyed by string (presumably the request id — TODO confirm).
    pub pending_requests: Arc<FastDashMap<String, PendingRequest>>,
    /// Request timeout stored atomically; the unit is not visible here — TODO confirm.
    pub requests_timeout: AtomicU64,
    /// Metrics sink; `OnceLock` means it can be set at most once.
    pub metrics: OnceLock<Arc<dyn MetricsInterface + Send + Sync>>,
    /// Presence registry guarded by an async `RwLock`.
    pub cluster_presence_registry: Arc<RwLock<ClusterPresenceRegistry>>,
    /// An `Instant` per string key (presumably last heartbeat per node id —
    /// TODO confirm), guarded by an async `RwLock`.
    pub node_heartbeats: Arc<RwLock<AHashMap<String, Instant>>>,
    /// Shared atomic counter for sequence numbers.
    pub sequence_counter: Arc<AtomicU64>,
    /// Per-key count maps, keyed by a `(String, String)` pair (presumably
    /// `(app_id, channel)` mapping to per-node counts — TODO confirm).
    pub cluster_channel_counts: Arc<FastDashMap<(String, String), AHashMap<String, usize>>>,
    /// Map with `()` values, so only the `(String, String)` keys carry information.
    pub dirty_channel_counts: Arc<FastDashMap<(String, String), ()>>,
}
/// `Default` support for [`HorizontalAdapter`].
impl Default for HorizontalAdapter {
/// Returns the value produced by `HorizontalAdapter::new()` (defined outside this section).
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests;