use crate::{
types::{Callback, ClientId},
LongPollingServiceContext,
};
use ahash::AHashMap;
use axum::http::HeaderMap;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::RwLock;
// Default for `LongPollingServiceContextConsts::timeout_ms` (milliseconds).
const DEFAULT_TIMEOUT_MS: u64 = 20_000;
// Default for `LongPollingServiceContextConsts::interval_ms` (milliseconds).
const DEFAULT_INTERVAL_MS: u64 = 0;
// Default for `LongPollingServiceContextConsts::max_interval_ms` (milliseconds).
const DEFAULT_MAX_INTERVAL_MS: u64 = 60_000;
// Default for both `client_channel_capacity` and `subscription_channel_capacity`.
const DEFAULT_CHANNEL_CAPACITY: usize = 500;
// Default for both storage capacities kept on the builder
// (`subscriptions_storage_capacity` and `client_ids_storage_capacity`).
const DEFAULT_STORAGE_CAPACITY: usize = 10_000;
/// Builder that collects configuration values and session callbacks and
/// turns them into an `Arc<LongPollingServiceContext>` via `build`.
#[derive(Debug)]
pub struct LongPollingServiceContextBuilder {
// Initial capacity of the context's `channels_data` map.
subscriptions_storage_capacity: usize,
// Initial capacity of the context's `client_id_senders` map.
client_ids_storage_capacity: usize,
// Timing and channel-capacity settings, moved into the context unchanged.
consts: LongPollingServiceContextConsts,
// Callback receiving `(context, client id, headers)`.
// NOTE(review): presumably fired when a client session is created — confirm in the service code.
session_added: Callback<(Arc<LongPollingServiceContext>, ClientId, HeaderMap)>,
// Callback receiving `(context, client id)`.
// NOTE(review): presumably fired when a client session is dropped — confirm in the service code.
session_removed: Callback<(Arc<LongPollingServiceContext>, ClientId)>,
}
impl Default for LongPollingServiceContextBuilder {
#[inline(always)]
fn default() -> Self {
Self {
subscriptions_storage_capacity: DEFAULT_STORAGE_CAPACITY,
client_ids_storage_capacity: DEFAULT_STORAGE_CAPACITY,
consts: Default::default(),
session_added: Default::default(),
session_removed: Default::default(),
}
}
}
/// Settings fixed at build time and carried by the context in its `consts`
/// field. The builder setters of the same names write these values.
#[derive(Debug)]
pub(crate) struct LongPollingServiceContextConsts {
// Timeout in milliseconds; defaults to `DEFAULT_TIMEOUT_MS`.
// NOTE(review): presumably the long-poll wait limit — confirm against the consumer of this field.
pub(crate) timeout_ms: u64,
// Interval in milliseconds; defaults to `DEFAULT_INTERVAL_MS`.
pub(crate) interval_ms: u64,
// Maximum interval in milliseconds; defaults to `DEFAULT_MAX_INTERVAL_MS`.
pub(crate) max_interval_ms: u64,
// Capacity used for per-client channels; defaults to `DEFAULT_CHANNEL_CAPACITY`.
// NOTE(review): channel kind (bounded mpsc, broadcast, ...) is not visible here.
pub(crate) client_channel_capacity: usize,
// Capacity used for per-subscription channels; defaults to `DEFAULT_CHANNEL_CAPACITY`.
pub(crate) subscription_channel_capacity: usize,
}
impl Default for LongPollingServiceContextConsts {
#[inline(always)]
fn default() -> Self {
Self {
timeout_ms: DEFAULT_TIMEOUT_MS,
interval_ms: DEFAULT_INTERVAL_MS,
max_interval_ms: DEFAULT_MAX_INTERVAL_MS,
client_channel_capacity: DEFAULT_CHANNEL_CAPACITY,
subscription_channel_capacity: DEFAULT_CHANNEL_CAPACITY,
}
}
}
impl LongPollingServiceContextBuilder {
    /// Creates a builder populated with the default settings.
    ///
    /// Equivalent to [`Default::default`].
    #[inline(always)]
    pub fn new() -> Self {
        Self::default()
    }

    /// Consumes the builder and returns the shared service context.
    ///
    /// The context's `channels_data` and `client_id_senders` maps are
    /// pre-allocated with the configured subscription and client storage
    /// capacities respectively; callbacks and settings are moved in as-is.
    #[inline(always)]
    pub fn build(self) -> Arc<LongPollingServiceContext> {
        let Self {
            subscriptions_storage_capacity,
            client_ids_storage_capacity,
            consts,
            session_added,
            session_removed,
        } = self;

        let channels_data = RwLock::new(AHashMap::with_capacity(subscriptions_storage_capacity));
        let client_id_senders = Arc::new(RwLock::new(AHashMap::with_capacity(
            client_ids_storage_capacity,
        )));

        Arc::new(LongPollingServiceContext {
            session_added,
            session_removed,
            consts,
            channels_data,
            client_id_senders,
        })
    }

    /// Sets the `timeout_ms` setting (milliseconds).
    #[inline(always)]
    pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
        self.consts.timeout_ms = timeout_ms;
        self
    }

    /// Sets the `interval_ms` setting (milliseconds).
    ///
    /// The value is stored in the context settings exactly like the other
    /// timing values.
    // NOTE(review): this setter used to be an `unimplemented!()` stub that
    // panicked on every call. Confirm that the service actually honours a
    // non-zero `interval_ms`.
    #[inline(always)]
    pub fn interval_ms(mut self, interval_ms: u64) -> Self {
        self.consts.interval_ms = interval_ms;
        self
    }

    /// Sets the `max_interval_ms` setting (milliseconds).
    #[inline(always)]
    pub fn max_interval_ms(mut self, max_interval_ms: u64) -> Self {
        self.consts.max_interval_ms = max_interval_ms;
        self
    }

    /// Sets the capacity used for client channels.
    #[inline(always)]
    pub fn client_channel_capacity(mut self, capacity: usize) -> Self {
        self.consts.client_channel_capacity = capacity;
        self
    }

    /// Sets the initial capacity of the map that stores per-client senders.
    #[inline(always)]
    pub fn client_storage_capacity(mut self, capacity: usize) -> Self {
        self.client_ids_storage_capacity = capacity;
        self
    }

    /// Sets the capacity used for subscription channels.
    #[inline(always)]
    pub fn subscription_channel_capacity(mut self, capacity: usize) -> Self {
        self.consts.subscription_channel_capacity = capacity;
        self
    }

    /// Sets the initial capacity of the map that stores channel data.
    #[inline(always)]
    pub fn subscription_storage_capacity(mut self, capacity: usize) -> Self {
        self.subscriptions_storage_capacity = capacity;
        self
    }

    /// Registers a synchronous `session_added` callback.
    ///
    /// The callback receives `(context, client id, request headers)`.
    /// Any previously registered `session_added` callback (sync or async) is
    /// replaced.
    #[inline(always)]
    pub fn session_added<F>(mut self, callback: F) -> Self
    where
        F: Fn((Arc<LongPollingServiceContext>, ClientId, HeaderMap)) + Send + Sync + 'static,
    {
        self.session_added = Callback::new_sync(callback);
        self
    }

    /// Registers an asynchronous `session_added` callback.
    ///
    /// The callback receives `(context, client id, request headers)` and
    /// returns a future. Any previously registered `session_added` callback
    /// (sync or async) is replaced.
    #[inline(always)]
    pub fn async_session_added<F, Fut>(mut self, callback: F) -> Self
    where
        F: Fn((Arc<LongPollingServiceContext>, ClientId, HeaderMap)) -> Fut + Sync + Send + 'static,
        Fut: Future<Output = ()> + Sync + Send + 'static,
    {
        self.session_added = Callback::new_async(callback);
        self
    }

    /// Registers a synchronous `session_removed` callback.
    ///
    /// The callback receives `(context, client id)`. Any previously
    /// registered `session_removed` callback (sync or async) is replaced.
    #[inline(always)]
    pub fn session_removed<F>(mut self, callback: F) -> Self
    where
        F: Fn((Arc<LongPollingServiceContext>, ClientId)) + Send + Sync + 'static,
    {
        self.session_removed = Callback::new_sync(callback);
        self
    }

    /// Registers an asynchronous `session_removed` callback.
    ///
    /// The callback receives `(context, client id)` and returns a future.
    /// Any previously registered `session_removed` callback (sync or async)
    /// is replaced.
    #[inline(always)]
    pub fn async_session_removed<F, Fut>(mut self, callback: F) -> Self
    where
        F: Fn((Arc<LongPollingServiceContext>, ClientId)) -> Fut + Sync + Send + 'static,
        Fut: Future<Output = ()> + Sync + Send + 'static,
    {
        self.session_removed = Callback::new_async(callback);
        self
    }
}