elif_http/websocket/channel/
manager.rs1use super::super::types::{ConnectionId, WebSocketError, WebSocketMessage, WebSocketResult};
4use super::channel::Channel;
5use super::events::ChannelEvent;
6use super::message::ChannelMessage;
7use super::types::{
8 ChannelId, ChannelManagerStats, ChannelMetadata, ChannelPermissions, ChannelStats, ChannelType,
9};
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tracing::info;
14
15pub struct ChannelManager {
17 channels: Arc<RwLock<HashMap<ChannelId, Arc<Channel>>>>,
19 connection_channels: Arc<RwLock<HashMap<ConnectionId, HashSet<ChannelId>>>>,
21 event_handlers: Arc<RwLock<Vec<Box<dyn Fn(ChannelEvent) + Send + Sync>>>>,
23}
24
25impl ChannelManager {
26 pub fn new() -> Self {
28 Self {
29 channels: Arc::new(RwLock::new(HashMap::new())),
30 connection_channels: Arc::new(RwLock::new(HashMap::new())),
31 event_handlers: Arc::new(RwLock::new(Vec::new())),
32 }
33 }
34
35 pub async fn create_channel(
37 &self,
38 name: String,
39 channel_type: ChannelType,
40 created_by: Option<ConnectionId>,
41 ) -> WebSocketResult<ChannelId> {
42 let channel = Channel::new(name.clone(), channel_type, created_by);
43 let channel_id = channel.id;
44
45 if let Some(creator_id) = created_by {
47 channel
48 .add_member(creator_id, ChannelPermissions::admin(), None)
49 .await?;
50
51 let mut connection_channels = self.connection_channels.write().await;
53 connection_channels
54 .entry(creator_id)
55 .or_insert_with(HashSet::new)
56 .insert(channel_id);
57 }
58
59 {
61 let mut channels = self.channels.write().await;
62 channels.insert(channel_id, Arc::new(channel));
63 }
64
65 info!("Created channel '{}' with ID {}", name, channel_id);
66 self.emit_event(ChannelEvent::ChannelCreated(channel_id, name))
67 .await;
68
69 Ok(channel_id)
70 }
71
72 pub async fn create_channel_with_metadata(
74 &self,
75 metadata: ChannelMetadata,
76 ) -> WebSocketResult<ChannelId> {
77 let channel = Channel::with_metadata(metadata.clone());
78 let channel_id = channel.id;
79
80 if let Some(creator_id) = metadata.created_by {
82 channel
83 .add_member(creator_id, ChannelPermissions::admin(), None)
84 .await?;
85
86 let mut connection_channels = self.connection_channels.write().await;
88 connection_channels
89 .entry(creator_id)
90 .or_insert_with(HashSet::new)
91 .insert(channel_id);
92 }
93
94 {
96 let mut channels = self.channels.write().await;
97 channels.insert(channel_id, Arc::new(channel));
98 }
99
100 info!("Created channel '{}' with ID {}", metadata.name, channel_id);
101 self.emit_event(ChannelEvent::ChannelCreated(channel_id, metadata.name))
102 .await;
103
104 Ok(channel_id)
105 }
106
107 pub async fn delete_channel(&self, channel_id: ChannelId) -> WebSocketResult<()> {
109 let channel = {
110 let mut channels = self.channels.write().await;
111 channels.remove(&channel_id)
112 };
113
114 if let Some(channel) = channel {
115 let channel_name = channel.metadata.name.clone();
116 let member_ids = channel.get_member_ids().await;
117
118 if !member_ids.is_empty() {
120 let mut connection_channels = self.connection_channels.write().await;
121 for member_id in member_ids {
122 if let Some(member_channels) = connection_channels.get_mut(&member_id) {
123 member_channels.remove(&channel_id);
124 if member_channels.is_empty() {
125 connection_channels.remove(&member_id);
126 }
127 }
128 }
129 }
130
131 info!("Deleted channel '{}' with ID {}", channel_name, channel_id);
132 self.emit_event(ChannelEvent::ChannelDeleted(channel_id, channel_name))
133 .await;
134 Ok(())
135 } else {
136 Err(WebSocketError::Connection(format!(
137 "Channel {} not found",
138 channel_id
139 )))
140 }
141 }
142
143 pub async fn get_channel(&self, channel_id: ChannelId) -> Option<Arc<Channel>> {
145 let channels = self.channels.read().await;
146 channels.get(&channel_id).cloned()
147 }
148
149 pub async fn get_channel_by_name(&self, name: &str) -> Option<Arc<Channel>> {
151 let channel_id = ChannelId::from_name(name);
152 self.get_channel(channel_id).await
153 }
154
155 pub async fn get_all_channels(&self) -> Vec<Arc<Channel>> {
157 let channels = self.channels.read().await;
158 channels.values().cloned().collect()
159 }
160
161 pub async fn get_connection_channels(&self, connection_id: ConnectionId) -> Vec<Arc<Channel>> {
163 let connection_channels = self.connection_channels.read().await;
164
165 if let Some(channel_ids) = connection_channels.get(&connection_id) {
166 let channels = self.channels.read().await;
167 channel_ids
168 .iter()
169 .filter_map(|id| channels.get(id).cloned())
170 .collect()
171 } else {
172 Vec::new()
173 }
174 }
175
176 pub async fn join_channel(
178 &self,
179 channel_id: ChannelId,
180 connection_id: ConnectionId,
181 password: Option<&str>,
182 nickname: Option<String>,
183 ) -> WebSocketResult<()> {
184 let channel = self
185 .get_channel(channel_id)
186 .await
187 .ok_or(WebSocketError::Connection(format!(
188 "Channel {} not found",
189 channel_id
190 )))?;
191
192 match &channel.metadata.channel_type {
194 ChannelType::Public => {
195 }
197 ChannelType::Private => {
198 return Err(WebSocketError::Connection(
201 "Channel is private and requires invitation".to_string(),
202 ));
203 }
204 ChannelType::Protected { .. } => {
205 let provided_password = password.ok_or(WebSocketError::Connection(
206 "Password required for protected channel".to_string(),
207 ))?;
208
209 if !channel.validate_password(provided_password) {
210 return Err(WebSocketError::Connection("Invalid password".to_string()));
211 }
212 }
213 }
214
215 let permissions = if Some(connection_id) == channel.metadata.created_by {
217 ChannelPermissions::admin()
218 } else {
219 ChannelPermissions::default()
220 };
221
222 channel
223 .add_member(connection_id, permissions, nickname.clone())
224 .await?;
225
226 {
228 let mut connection_channels = self.connection_channels.write().await;
229 connection_channels
230 .entry(connection_id)
231 .or_insert_with(HashSet::new)
232 .insert(channel_id);
233 }
234
235 info!("Connection {} joined channel {}", connection_id, channel_id);
236 self.emit_event(ChannelEvent::MemberJoined(
237 channel_id,
238 connection_id,
239 nickname,
240 ))
241 .await;
242
243 Ok(())
244 }
245
246 pub async fn leave_channel(
248 &self,
249 channel_id: ChannelId,
250 connection_id: ConnectionId,
251 ) -> WebSocketResult<()> {
252 let channel = self
253 .get_channel(channel_id)
254 .await
255 .ok_or(WebSocketError::Connection(format!(
256 "Channel {} not found",
257 channel_id
258 )))?;
259
260 let member = channel.get_member(connection_id).await;
262 let nickname = member.as_ref().and_then(|m| m.nickname.clone());
263
264 channel
266 .remove_member(connection_id)
267 .await
268 .ok_or(WebSocketError::Connection(
269 "Connection not a member of channel".to_string(),
270 ))?;
271
272 {
274 let mut connection_channels = self.connection_channels.write().await;
275 if let Some(member_channels) = connection_channels.get_mut(&connection_id) {
276 member_channels.remove(&channel_id);
277 if member_channels.is_empty() {
278 connection_channels.remove(&connection_id);
279 }
280 }
281 }
282
283 info!("Connection {} left channel {}", connection_id, channel_id);
284 self.emit_event(ChannelEvent::MemberLeft(
285 channel_id,
286 connection_id,
287 nickname,
288 ))
289 .await;
290
291 if channel.is_empty().await && channel.metadata.created_by.is_none() {
293 self.delete_channel(channel_id).await?;
294 }
295
296 Ok(())
297 }
298
299 pub async fn leave_all_channels(&self, connection_id: ConnectionId) -> Vec<ChannelId> {
301 let channel_ids = {
303 let mut connection_channels = self.connection_channels.write().await;
304 connection_channels
305 .remove(&connection_id)
306 .unwrap_or_default()
307 };
308
309 let mut left_channels = Vec::new();
310
311 for channel_id in channel_ids {
313 if let Some(channel) = self.get_channel(channel_id).await {
314 let member = channel.get_member(connection_id).await;
316 let nickname = member.as_ref().and_then(|m| m.nickname.clone());
317
318 if channel.remove_member(connection_id).await.is_some() {
320 left_channels.push(channel_id);
321
322 info!("Connection {} left channel {}", connection_id, channel_id);
323 self.emit_event(ChannelEvent::MemberLeft(
324 channel_id,
325 connection_id,
326 nickname,
327 ))
328 .await;
329
330 if channel.is_empty().await && channel.metadata.created_by.is_none() {
332 let _ = self.delete_channel(channel_id).await;
333 }
334 }
335 }
336 }
337
338 if !left_channels.is_empty() {
339 info!(
340 "Connection {} left {} channels",
341 connection_id,
342 left_channels.len()
343 );
344 }
345
346 left_channels
347 }
348
349 pub async fn send_to_channel(
351 &self,
352 channel_id: ChannelId,
353 sender_id: ConnectionId,
354 message: WebSocketMessage,
355 ) -> WebSocketResult<Vec<ConnectionId>> {
356 let channel = self
357 .get_channel(channel_id)
358 .await
359 .ok_or(WebSocketError::Connection(format!(
360 "Channel {} not found",
361 channel_id
362 )))?;
363
364 let sender_member =
366 channel
367 .get_member(sender_id)
368 .await
369 .ok_or(WebSocketError::Connection(
370 "Sender not a member of channel".to_string(),
371 ))?;
372
373 if !sender_member.permissions.can_send_messages {
374 return Err(WebSocketError::Connection(
375 "No permission to send messages".to_string(),
376 ));
377 }
378
379 let channel_message = ChannelMessage::new(
381 channel_id,
382 sender_id,
383 message.clone(),
384 sender_member.nickname.clone(),
385 );
386
387 channel.add_message(channel_message.clone()).await;
389
390 let member_ids = channel.get_member_ids().await;
392
393 info!(
394 "Message sent to channel {} by {} (broadcasting to {} members)",
395 channel_id,
396 sender_id,
397 member_ids.len()
398 );
399
400 self.emit_event(ChannelEvent::MessageSent(channel_id, channel_message))
401 .await;
402
403 Ok(member_ids)
404 }
405
406 pub async fn get_all_channel_stats(&self) -> Vec<ChannelStats> {
408 let channels = self.channels.read().await;
409 let mut stats = Vec::with_capacity(channels.len());
410
411 for channel in channels.values() {
412 stats.push(channel.stats().await);
413 }
414
415 stats
416 }
417
418 pub async fn get_public_channels(&self) -> Vec<ChannelStats> {
420 let channels = self.channels.read().await;
421 let mut public_channels = Vec::new();
422
423 for channel in channels.values() {
424 if matches!(channel.metadata.channel_type, ChannelType::Public) {
425 public_channels.push(channel.stats().await);
426 }
427 }
428
429 public_channels
430 }
431
432 pub async fn stats(&self) -> ChannelManagerStats {
434 let channels = self.channels.read().await;
435 let connection_channels = self.connection_channels.read().await;
436
437 let mut stats = ChannelManagerStats {
438 total_channels: channels.len(),
439 total_connections_in_channels: connection_channels.len(),
440 public_channels: 0,
441 private_channels: 0,
442 protected_channels: 0,
443 empty_channels: 0,
444 };
445
446 for channel in channels.values() {
447 match channel.metadata.channel_type {
448 ChannelType::Public => stats.public_channels += 1,
449 ChannelType::Private => stats.private_channels += 1,
450 ChannelType::Protected { .. } => stats.protected_channels += 1,
451 }
452
453 if channel.is_empty().await {
454 stats.empty_channels += 1;
455 }
456 }
457
458 stats
459 }
460
461 pub async fn cleanup_empty_channels(&self) -> usize {
463 let channels = self.get_all_channels().await;
464 let mut cleaned_up = 0;
465
466 for channel in channels {
467 if channel.is_empty().await
469 && channel.metadata.created_by.is_none()
470 && self.delete_channel(channel.id).await.is_ok()
471 {
472 cleaned_up += 1;
473 }
474 }
475
476 if cleaned_up > 0 {
477 info!("Cleaned up {} empty channels", cleaned_up);
478 }
479
480 cleaned_up
481 }
482
483 pub async fn add_event_handler<F>(&self, handler: F)
485 where
486 F: Fn(ChannelEvent) + Send + Sync + 'static,
487 {
488 let mut handlers = self.event_handlers.write().await;
489 handlers.push(Box::new(handler));
490 }
491
492 async fn emit_event(&self, event: ChannelEvent) {
494 let handlers = self.event_handlers.read().await;
495 for handler in handlers.iter() {
496 handler(event.clone());
497 }
498 }
499}
500
501impl Default for ChannelManager {
502 fn default() -> Self {
503 Self::new()
504 }
505}