mod broadcast;
mod connection_manager;
mod helpers;
use self::helpers::{PendingPresenceMember, pending_presence_channel_key};
#[cfg(feature = "tag-filtering")]
use crate::filter_index::FilterIndex;
use compact_str::CompactString;
use dashmap::DashMap;
#[cfg(feature = "delta")]
use sockudo_core::app::AppManager;
use sockudo_core::namespace::Namespace;
use std::sync::Arc;
#[cfg(feature = "delta")]
use std::sync::OnceLock;
#[cfg(feature = "tag-filtering")]
use std::sync::atomic::AtomicBool;
#[cfg(any(feature = "tag-filtering", feature = "delta"))]
use std::sync::atomic::Ordering;
use tokio::sync::Semaphore;
use tracing::info;
pub(super) type FastDashMap<K, V> = DashMap<K, V, ahash::RandomState>;
pub(super) fn fast_dashmap<K: Eq + std::hash::Hash, V>() -> FastDashMap<K, V> {
DashMap::with_hasher(ahash::RandomState::new())
}
pub struct LocalAdapter {
pub namespaces: Arc<FastDashMap<String, Arc<Namespace>>>,
pending_presence_members: Arc<FastDashMap<CompactString, PendingPresenceMember>>,
pending_presence_by_channel:
Arc<FastDashMap<CompactString, Arc<FastDashMap<CompactString, ()>>>>,
pub buffer_multiplier_per_cpu: usize,
pub max_concurrent: usize,
broadcast_semaphore: Arc<Semaphore>,
#[cfg(feature = "delta")]
delta_compression: Arc<OnceLock<Arc<sockudo_delta::DeltaCompressionManager>>>,
#[cfg(feature = "delta")]
app_manager: Arc<OnceLock<Arc<dyn AppManager + Send + Sync>>>,
#[cfg(feature = "tag-filtering")]
tag_filtering_enabled: Arc<AtomicBool>,
#[cfg(feature = "tag-filtering")]
enable_tags_globally: Arc<AtomicBool>,
#[cfg(feature = "tag-filtering")]
filter_index: Arc<FilterIndex>,
}
impl Clone for LocalAdapter {
fn clone(&self) -> Self {
Self {
namespaces: Arc::clone(&self.namespaces),
pending_presence_members: Arc::clone(&self.pending_presence_members),
pending_presence_by_channel: Arc::clone(&self.pending_presence_by_channel),
buffer_multiplier_per_cpu: self.buffer_multiplier_per_cpu,
max_concurrent: self.max_concurrent,
broadcast_semaphore: Arc::clone(&self.broadcast_semaphore),
#[cfg(feature = "delta")]
delta_compression: Arc::clone(&self.delta_compression),
#[cfg(feature = "delta")]
app_manager: Arc::clone(&self.app_manager),
#[cfg(feature = "tag-filtering")]
tag_filtering_enabled: Arc::clone(&self.tag_filtering_enabled),
#[cfg(feature = "tag-filtering")]
enable_tags_globally: Arc::clone(&self.enable_tags_globally),
#[cfg(feature = "tag-filtering")]
filter_index: Arc::clone(&self.filter_index),
}
}
}
impl Default for LocalAdapter {
fn default() -> Self {
Self::new()
}
}
impl LocalAdapter {
pub fn new() -> Self {
Self::new_with_buffer_multiplier(64)
}
#[cfg(feature = "tag-filtering")]
pub fn set_tag_filtering_enabled(&self, enabled: bool) {
self.tag_filtering_enabled.store(enabled, Ordering::Release);
}
#[cfg(feature = "tag-filtering")]
pub fn is_tag_filtering_enabled(&self) -> bool {
self.tag_filtering_enabled.load(Ordering::Acquire)
}
#[cfg(feature = "tag-filtering")]
pub fn set_enable_tags_globally(&self, enabled: bool) {
self.enable_tags_globally.store(enabled, Ordering::Release);
}
#[cfg(feature = "tag-filtering")]
pub fn get_enable_tags_globally(&self) -> bool {
self.enable_tags_globally.load(Ordering::Acquire)
}
pub fn new_with_buffer_multiplier(multiplier: usize) -> Self {
let cpu_cores = num_cpus::get();
let max_concurrent = cpu_cores * multiplier;
info!(
"LocalAdapter initialized with {} CPU cores, buffer multiplier {}, max concurrent {}",
cpu_cores, multiplier, max_concurrent
);
Self {
namespaces: Arc::new(fast_dashmap()),
pending_presence_members: Arc::new(fast_dashmap()),
pending_presence_by_channel: Arc::new(fast_dashmap()),
buffer_multiplier_per_cpu: multiplier,
max_concurrent,
broadcast_semaphore: Arc::new(Semaphore::new(max_concurrent * 8)),
#[cfg(feature = "delta")]
delta_compression: Arc::new(OnceLock::new()),
#[cfg(feature = "delta")]
app_manager: Arc::new(OnceLock::new()),
#[cfg(feature = "tag-filtering")]
tag_filtering_enabled: Arc::new(AtomicBool::new(false)),
#[cfg(feature = "tag-filtering")]
enable_tags_globally: Arc::new(AtomicBool::new(true)), #[cfg(feature = "tag-filtering")]
filter_index: Arc::new(FilterIndex::new()),
}
}
#[cfg(feature = "tag-filtering")]
pub fn get_filter_index(&self) -> Arc<FilterIndex> {
Arc::clone(&self.filter_index)
}
#[cfg(feature = "delta")]
pub async fn set_delta_compression(
&self,
delta_compression: Arc<sockudo_delta::DeltaCompressionManager>,
app_manager: Arc<dyn AppManager + Send + Sync>,
) {
let _ = self.delta_compression.set(delta_compression);
let _ = self.app_manager.set(app_manager);
}
#[cfg(feature = "delta")]
#[inline]
pub fn get_delta_compression(&self) -> Option<&Arc<sockudo_delta::DeltaCompressionManager>> {
self.delta_compression.get()
}
#[cfg(feature = "delta")]
#[inline]
pub fn get_app_manager(&self) -> Option<&Arc<dyn AppManager + Send + Sync>> {
self.app_manager.get()
}
fn remove_pending_presence_index_entry(&self, app_id: &str, channel: &str, pending_key: &str) {
let channel_key = pending_presence_channel_key(app_id, channel);
let should_remove_channel = self
.pending_presence_by_channel
.get(&channel_key)
.is_some_and(|index| {
index.remove(pending_key);
index.is_empty()
});
if should_remove_channel {
self.pending_presence_by_channel.remove(&channel_key);
}
}
}
#[cfg(test)]
mod tests;