rustbasic-core 0.1.26

Core framework logic for RustBasic - A modern web framework for Rust
Documentation
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::RwLock;

pub type Tx = UnboundedSender<String>;

pub struct ClientSession {
    pub id: u64,
    pub tx: Tx,
}

#[derive(Default)]
pub struct BroadcasterState {
    pub channels: RwLock<HashMap<String, Vec<ClientSession>>>,
    pub next_id: std::sync::atomic::AtomicU64,
}

impl BroadcasterState {
    pub fn next_conn_id(&self) -> u64 {
        self.next_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

    pub async fn subscribe(&self, channel_name: &str, session: ClientSession) {
        let mut channels = self.channels.write().await;
        let entry = channels.entry(channel_name.to_string()).or_default();
        entry.retain(|c| c.id != session.id);
        entry.push(session);
    }

    pub async fn unsubscribe(&self, channel_name: &str, session_id: u64) {
        let mut channels = self.channels.write().await;
        if let Some(sessions) = channels.get_mut(channel_name) {
            sessions.retain(|c| c.id != session_id);
        }
    }

    pub async fn remove_session(&self, session_id: u64) {
        let mut channels = self.channels.write().await;
        for sessions in channels.values_mut() {
            sessions.retain(|c| c.id != session_id);
        }
    }
}

pub struct Broadcaster;

static BROADCASTER_STATE: std::sync::OnceLock<Arc<BroadcasterState>> = std::sync::OnceLock::new();

impl Broadcaster {
    pub fn state() -> &'static Arc<BroadcasterState> {
        BROADCASTER_STATE.get_or_init(|| Arc::new(BroadcasterState::default()))
    }

    pub fn to(channel: &str) -> ChannelBroadcaster {
        ChannelBroadcaster {
            channel: channel.to_string(),
        }
    }
}

pub struct ChannelBroadcaster {
    channel: String,
}

impl ChannelBroadcaster {
    pub async fn emit<T: serde::Serialize>(&self, event: &str, payload: T) {
        let state = Broadcaster::state();
        let msg = serde_json::json!({
            "event": event,
            "channel": self.channel,
            "data": payload
        });
        let msg_str = match serde_json::to_string(&msg) {
            Ok(s) => s,
            Err(_) => return,
        };

        let channels = state.channels.read().await;
        if let Some(sessions) = channels.get(&self.channel) {
            for session in sessions {
                let _ = session.tx.send(msg_str.clone());
            }
        }
    }
}