oxigdal_websocket/broadcast/
mod.rs1pub mod channel;
10pub mod filter;
11pub mod room;
12pub mod router;
13
14pub use channel::{Channel, ChannelConfig, ChannelStats, TopicChannel};
15pub use filter::{FilterChain, FilterPredicate, MessageFilter};
16pub use room::{Room, RoomManager, RoomStats};
17pub use router::{MessageRouter, RoutingRule, RoutingStrategy};
18
19use crate::error::Result;
20use crate::protocol::message::Message;
21use crate::server::connection::ConnectionId;
22use std::sync::Arc;
23use tokio::sync::RwLock;
24
/// Tunable limits and switches for the broadcast subsystem.
///
/// All fields are public so callers can build a configuration with struct
/// literal syntax or start from [`Default`] and override individual values.
#[derive(Clone, Debug)]
pub struct BroadcastConfig {
    /// Subscriber limit applied to each individual topic channel.
    pub max_subscribers_per_topic: usize,
    /// Limit on the number of distinct topics.
    pub max_topics: usize,
    /// Limit on the number of rooms.
    pub max_rooms: usize,
    /// Member limit applied to each individual room.
    pub max_members_per_room: usize,
    /// Switch for message filtering.
    pub enable_filtering: bool,
    /// Buffer size used for each topic channel.
    pub channel_buffer_size: usize,
}
41
42impl Default for BroadcastConfig {
43 fn default() -> Self {
44 Self {
45 max_subscribers_per_topic: 10_000,
46 max_topics: 1_000,
47 max_rooms: 1_000,
48 max_members_per_room: 10_000,
49 enable_filtering: true,
50 channel_buffer_size: 1000,
51 }
52 }
53}
54
55pub struct BroadcastSystem {
57 config: BroadcastConfig,
58 channels: Arc<RwLock<std::collections::HashMap<String, Arc<TopicChannel>>>>,
59 room_manager: Arc<RoomManager>,
60 router: Arc<MessageRouter>,
61}
62
63impl BroadcastSystem {
64 pub fn new(config: BroadcastConfig) -> Self {
66 Self {
67 config: config.clone(),
68 channels: Arc::new(RwLock::new(std::collections::HashMap::new())),
69 room_manager: Arc::new(RoomManager::new(
70 config.max_rooms,
71 config.max_members_per_room,
72 )),
73 router: Arc::new(MessageRouter::new()),
74 }
75 }
76
77 pub async fn subscribe(&self, topic: String, subscriber: ConnectionId) -> Result<()> {
79 let mut channels = self.channels.write().await;
80
81 let channel = channels.entry(topic.clone()).or_insert_with(|| {
82 Arc::new(TopicChannel::new(
83 topic.clone(),
84 ChannelConfig {
85 max_subscribers: self.config.max_subscribers_per_topic,
86 buffer_size: self.config.channel_buffer_size,
87 },
88 ))
89 });
90
91 channel.subscribe(subscriber).await
92 }
93
94 pub async fn unsubscribe(&self, topic: &str, subscriber: &ConnectionId) -> Result<()> {
96 let channels = self.channels.read().await;
97
98 if let Some(channel) = channels.get(topic) {
99 channel.unsubscribe(subscriber).await?;
100 }
101
102 Ok(())
103 }
104
105 pub async fn publish(&self, topic: &str, message: Message) -> Result<usize> {
107 let channels = self.channels.read().await;
108
109 if let Some(channel) = channels.get(topic) {
110 channel.publish(message).await
111 } else {
112 Ok(0)
113 }
114 }
115
116 pub fn room_manager(&self) -> &Arc<RoomManager> {
118 &self.room_manager
119 }
120
121 pub fn router(&self) -> &Arc<MessageRouter> {
123 &self.router
124 }
125
126 pub async fn stats(&self) -> BroadcastStats {
128 let channels = self.channels.read().await;
129 let mut total_subscribers = 0;
130 let mut total_messages = 0;
131
132 for channel in channels.values() {
133 let stats = channel.stats().await;
134 total_subscribers += stats.subscriber_count;
135 total_messages += stats.messages_published;
136 }
137
138 let room_stats = self.room_manager.stats().await;
139
140 BroadcastStats {
141 topic_count: channels.len(),
142 total_subscribers,
143 total_messages,
144 room_count: room_stats.total_rooms,
145 total_room_members: room_stats.total_members,
146 }
147 }
148}
149
/// Aggregate counters produced by `BroadcastSystem::stats`.
#[derive(Clone, Debug)]
pub struct BroadcastStats {
    /// Number of topic channels currently registered.
    pub topic_count: usize,
    /// Sum of the subscriber counts reported by every topic channel.
    pub total_subscribers: usize,
    /// Sum of the published-message counts reported by every topic channel.
    pub total_messages: u64,
    /// Room total reported by the room manager.
    pub room_count: usize,
    /// Member total reported by the room manager.
    pub total_room_members: usize,
}
164
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables filtering and allows 1,000 topics.
    #[test]
    fn test_broadcast_config_default() {
        let defaults = BroadcastConfig::default();
        assert!(defaults.enable_filtering);
        assert_eq!(defaults.max_topics, 1_000);
    }

    /// A freshly constructed system reports no topics and no subscribers.
    #[tokio::test]
    async fn test_broadcast_system() {
        let system = BroadcastSystem::new(BroadcastConfig::default());

        let snapshot = system.stats().await;
        assert_eq!(snapshot.topic_count, 0);
        assert_eq!(snapshot.total_subscribers, 0);
    }
}