Skip to main content

oxigdal_websocket/broadcast/
mod.rs

1//! Broadcasting and pub/sub system
2//!
3//! This module provides:
4//! - Pub/sub channels for topic-based messaging
5//! - Room management for group communications
6//! - Selective broadcasting with message filters
7//! - Message routing and distribution
8
9pub mod channel;
10pub mod filter;
11pub mod room;
12pub mod router;
13
14pub use channel::{Channel, ChannelConfig, ChannelStats, TopicChannel};
15pub use filter::{FilterChain, FilterPredicate, MessageFilter};
16pub use room::{Room, RoomManager, RoomStats};
17pub use router::{MessageRouter, RoutingRule, RoutingStrategy};
18
19use crate::error::Result;
20use crate::protocol::message::Message;
21use crate::server::connection::ConnectionId;
22use std::sync::Arc;
23use tokio::sync::RwLock;
24
25/// Broadcast configuration
26#[derive(Debug, Clone)]
27pub struct BroadcastConfig {
28    /// Maximum subscribers per topic
29    pub max_subscribers_per_topic: usize,
30    /// Maximum topics
31    pub max_topics: usize,
32    /// Maximum rooms
33    pub max_rooms: usize,
34    /// Maximum members per room
35    pub max_members_per_room: usize,
36    /// Enable message filtering
37    pub enable_filtering: bool,
38    /// Channel buffer size
39    pub channel_buffer_size: usize,
40}
41
42impl Default for BroadcastConfig {
43    fn default() -> Self {
44        Self {
45            max_subscribers_per_topic: 10_000,
46            max_topics: 1_000,
47            max_rooms: 1_000,
48            max_members_per_room: 10_000,
49            enable_filtering: true,
50            channel_buffer_size: 1000,
51        }
52    }
53}
54
55/// Broadcast system
56pub struct BroadcastSystem {
57    config: BroadcastConfig,
58    channels: Arc<RwLock<std::collections::HashMap<String, Arc<TopicChannel>>>>,
59    room_manager: Arc<RoomManager>,
60    router: Arc<MessageRouter>,
61}
62
63impl BroadcastSystem {
64    /// Create a new broadcast system
65    pub fn new(config: BroadcastConfig) -> Self {
66        Self {
67            config: config.clone(),
68            channels: Arc::new(RwLock::new(std::collections::HashMap::new())),
69            room_manager: Arc::new(RoomManager::new(
70                config.max_rooms,
71                config.max_members_per_room,
72            )),
73            router: Arc::new(MessageRouter::new()),
74        }
75    }
76
77    /// Subscribe to a topic
78    pub async fn subscribe(&self, topic: String, subscriber: ConnectionId) -> Result<()> {
79        let mut channels = self.channels.write().await;
80
81        let channel = channels.entry(topic.clone()).or_insert_with(|| {
82            Arc::new(TopicChannel::new(
83                topic.clone(),
84                ChannelConfig {
85                    max_subscribers: self.config.max_subscribers_per_topic,
86                    buffer_size: self.config.channel_buffer_size,
87                },
88            ))
89        });
90
91        channel.subscribe(subscriber).await
92    }
93
94    /// Unsubscribe from a topic
95    pub async fn unsubscribe(&self, topic: &str, subscriber: &ConnectionId) -> Result<()> {
96        let channels = self.channels.read().await;
97
98        if let Some(channel) = channels.get(topic) {
99            channel.unsubscribe(subscriber).await?;
100        }
101
102        Ok(())
103    }
104
105    /// Publish a message to a topic
106    pub async fn publish(&self, topic: &str, message: Message) -> Result<usize> {
107        let channels = self.channels.read().await;
108
109        if let Some(channel) = channels.get(topic) {
110            channel.publish(message).await
111        } else {
112            Ok(0)
113        }
114    }
115
116    /// Get room manager
117    pub fn room_manager(&self) -> &Arc<RoomManager> {
118        &self.room_manager
119    }
120
121    /// Get message router
122    pub fn router(&self) -> &Arc<MessageRouter> {
123        &self.router
124    }
125
126    /// Get broadcast statistics
127    pub async fn stats(&self) -> BroadcastStats {
128        let channels = self.channels.read().await;
129        let mut total_subscribers = 0;
130        let mut total_messages = 0;
131
132        for channel in channels.values() {
133            let stats = channel.stats().await;
134            total_subscribers += stats.subscriber_count;
135            total_messages += stats.messages_published;
136        }
137
138        let room_stats = self.room_manager.stats().await;
139
140        BroadcastStats {
141            topic_count: channels.len(),
142            total_subscribers,
143            total_messages,
144            room_count: room_stats.total_rooms,
145            total_room_members: room_stats.total_members,
146        }
147    }
148}
149
150/// Broadcast statistics
151#[derive(Debug, Clone)]
152pub struct BroadcastStats {
153    /// Number of topics
154    pub topic_count: usize,
155    /// Total subscribers across all topics
156    pub total_subscribers: usize,
157    /// Total messages published
158    pub total_messages: u64,
159    /// Number of rooms
160    pub room_count: usize,
161    /// Total room members
162    pub total_room_members: usize,
163}
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168
169    #[test]
170    fn test_broadcast_config_default() {
171        let config = BroadcastConfig::default();
172        assert!(config.enable_filtering);
173        assert_eq!(config.max_topics, 1_000);
174    }
175
176    #[tokio::test]
177    async fn test_broadcast_system() {
178        let config = BroadcastConfig::default();
179        let system = BroadcastSystem::new(config);
180
181        let stats = system.stats().await;
182        assert_eq!(stats.topic_count, 0);
183        assert_eq!(stats.total_subscribers, 0);
184    }
185}