use std::sync::Mutex;
use flume::{Receiver, Sender, TryRecvError};
use crate::websocket::events::{SessionEvent, WsUpdate};
pub struct UpdateStream {
rx: Receiver<WsUpdate>,
}
impl UpdateStream {
#[inline]
pub(crate) fn new(rx: Receiver<WsUpdate>) -> Self {
Self { rx }
}
#[inline]
pub async fn next(&self) -> Option<WsUpdate> {
self.rx.recv_async().await.ok()
}
#[inline]
pub fn try_next(&self) -> Result<Option<WsUpdate>, TryRecvError> {
match self.rx.try_recv() {
Ok(v) => Ok(Some(v)),
Err(TryRecvError::Empty) => Ok(None),
Err(e @ TryRecvError::Disconnected) => Err(e),
}
}
#[inline]
pub fn len(&self) -> usize {
self.rx.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.rx.is_empty()
}
#[inline]
pub fn is_closed(&self) -> bool {
self.rx.is_disconnected()
}
}
pub struct SessionStream {
rx: Receiver<SessionEvent>,
}
impl SessionStream {
#[inline]
pub(crate) fn new(rx: Receiver<SessionEvent>) -> Self {
Self { rx }
}
#[inline]
pub async fn next(&self) -> Option<SessionEvent> {
self.rx.recv_async().await.ok()
}
#[inline]
pub fn try_next(&self) -> Result<Option<SessionEvent>, TryRecvError> {
match self.rx.try_recv() {
Ok(v) => Ok(Some(v)),
Err(TryRecvError::Empty) => Ok(None),
Err(e @ TryRecvError::Disconnected) => Err(e),
}
}
#[inline]
pub fn is_closed(&self) -> bool {
self.rx.is_disconnected()
}
}
pub struct WsDispatcher {
updates_tx: Sender<WsUpdate>,
updates_rx: Mutex<Option<Receiver<WsUpdate>>>,
session_tx: Sender<SessionEvent>,
session_rx: Mutex<Option<Receiver<SessionEvent>>>,
}
#[derive(Debug, Clone, Copy)]
pub struct WsDispatcherConfig {
pub updates_capacity: usize,
pub session_capacity: usize,
}
impl Default for WsDispatcherConfig {
fn default() -> Self {
Self {
updates_capacity: 4096,
session_capacity: 256,
}
}
}
impl WsDispatcher {
pub fn new(config: WsDispatcherConfig) -> Self {
let (updates_tx, updates_rx) = flume::bounded(config.updates_capacity);
let (session_tx, session_rx) = flume::bounded(config.session_capacity);
Self {
updates_tx,
updates_rx: Mutex::new(Some(updates_rx)),
session_tx,
session_rx: Mutex::new(Some(session_rx)),
}
}
#[inline]
pub fn take_updates(&self) -> Option<UpdateStream> {
self.updates_rx
.lock()
.ok()
.and_then(|mut g| g.take())
.map(UpdateStream::new)
}
#[inline]
pub fn take_session_events(&self) -> Option<SessionStream> {
self.session_rx
.lock()
.ok()
.and_then(|mut g| g.take())
.map(SessionStream::new)
}
#[inline]
pub fn try_send_update(&self, update: WsUpdate) -> bool {
self.updates_tx.try_send(update).is_ok()
}
#[inline]
pub async fn send_session(&self, event: SessionEvent) {
let _ = self.session_tx.send_async(event).await;
}
#[inline]
pub fn is_updates_full(&self) -> bool {
self.updates_tx.is_full()
}
}