elif_http/websocket/channel/
manager.rs

1//! Channel manager for WebSocket channel operations
2
3use super::super::types::{ConnectionId, WebSocketError, WebSocketMessage, WebSocketResult};
4use super::channel::Channel;
5use super::events::ChannelEvent;
6use super::message::ChannelMessage;
7use super::types::{
8    ChannelId, ChannelManagerStats, ChannelMetadata, ChannelPermissions, ChannelStats, ChannelType,
9};
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tracing::info;
14
15/// High-performance channel manager for WebSocket channel operations
16pub struct ChannelManager {
17    /// Active channels
18    channels: Arc<RwLock<HashMap<ChannelId, Arc<Channel>>>>,
19    /// Connection to channel mapping for quick lookup
20    connection_channels: Arc<RwLock<HashMap<ConnectionId, HashSet<ChannelId>>>>,
21    /// Event handlers
22    event_handlers: Arc<RwLock<Vec<Box<dyn Fn(ChannelEvent) + Send + Sync>>>>,
23}
24
25impl ChannelManager {
26    /// Create a new channel manager
27    pub fn new() -> Self {
28        Self {
29            channels: Arc::new(RwLock::new(HashMap::new())),
30            connection_channels: Arc::new(RwLock::new(HashMap::new())),
31            event_handlers: Arc::new(RwLock::new(Vec::new())),
32        }
33    }
34
35    /// Create a new channel
36    pub async fn create_channel(
37        &self,
38        name: String,
39        channel_type: ChannelType,
40        created_by: Option<ConnectionId>,
41    ) -> WebSocketResult<ChannelId> {
42        let channel = Channel::new(name.clone(), channel_type, created_by);
43        let channel_id = channel.id;
44
45        // Add creator as admin if specified
46        if let Some(creator_id) = created_by {
47            channel
48                .add_member(creator_id, ChannelPermissions::admin(), None)
49                .await?;
50
51            // Track the connection's channel membership
52            let mut connection_channels = self.connection_channels.write().await;
53            connection_channels
54                .entry(creator_id)
55                .or_insert_with(HashSet::new)
56                .insert(channel_id);
57        }
58
59        // Store the channel
60        {
61            let mut channels = self.channels.write().await;
62            channels.insert(channel_id, Arc::new(channel));
63        }
64
65        info!("Created channel '{}' with ID {}", name, channel_id);
66        self.emit_event(ChannelEvent::ChannelCreated(channel_id, name))
67            .await;
68
69        Ok(channel_id)
70    }
71
72    /// Create a channel with custom metadata
73    pub async fn create_channel_with_metadata(
74        &self,
75        metadata: ChannelMetadata,
76    ) -> WebSocketResult<ChannelId> {
77        let channel = Channel::with_metadata(metadata.clone());
78        let channel_id = channel.id;
79
80        // Add creator as admin if specified
81        if let Some(creator_id) = metadata.created_by {
82            channel
83                .add_member(creator_id, ChannelPermissions::admin(), None)
84                .await?;
85
86            // Track the connection's channel membership
87            let mut connection_channels = self.connection_channels.write().await;
88            connection_channels
89                .entry(creator_id)
90                .or_insert_with(HashSet::new)
91                .insert(channel_id);
92        }
93
94        // Store the channel
95        {
96            let mut channels = self.channels.write().await;
97            channels.insert(channel_id, Arc::new(channel));
98        }
99
100        info!("Created channel '{}' with ID {}", metadata.name, channel_id);
101        self.emit_event(ChannelEvent::ChannelCreated(channel_id, metadata.name))
102            .await;
103
104        Ok(channel_id)
105    }
106
107    /// Delete a channel
108    pub async fn delete_channel(&self, channel_id: ChannelId) -> WebSocketResult<()> {
109        let channel = {
110            let mut channels = self.channels.write().await;
111            channels.remove(&channel_id)
112        };
113
114        if let Some(channel) = channel {
115            let channel_name = channel.metadata.name.clone();
116            let member_ids = channel.get_member_ids().await;
117
118            // Remove channel from all members' tracking
119            if !member_ids.is_empty() {
120                let mut connection_channels = self.connection_channels.write().await;
121                for member_id in member_ids {
122                    if let Some(member_channels) = connection_channels.get_mut(&member_id) {
123                        member_channels.remove(&channel_id);
124                        if member_channels.is_empty() {
125                            connection_channels.remove(&member_id);
126                        }
127                    }
128                }
129            }
130
131            info!("Deleted channel '{}' with ID {}", channel_name, channel_id);
132            self.emit_event(ChannelEvent::ChannelDeleted(channel_id, channel_name))
133                .await;
134            Ok(())
135        } else {
136            Err(WebSocketError::Connection(format!(
137                "Channel {} not found",
138                channel_id
139            )))
140        }
141    }
142
143    /// Get a channel by ID
144    pub async fn get_channel(&self, channel_id: ChannelId) -> Option<Arc<Channel>> {
145        let channels = self.channels.read().await;
146        channels.get(&channel_id).cloned()
147    }
148
149    /// Get a channel by name
150    pub async fn get_channel_by_name(&self, name: &str) -> Option<Arc<Channel>> {
151        let channel_id = ChannelId::from_name(name);
152        self.get_channel(channel_id).await
153    }
154
155    /// Get all channels
156    pub async fn get_all_channels(&self) -> Vec<Arc<Channel>> {
157        let channels = self.channels.read().await;
158        channels.values().cloned().collect()
159    }
160
161    /// Get channels that a connection is a member of
162    pub async fn get_connection_channels(&self, connection_id: ConnectionId) -> Vec<Arc<Channel>> {
163        let connection_channels = self.connection_channels.read().await;
164
165        if let Some(channel_ids) = connection_channels.get(&connection_id) {
166            let channels = self.channels.read().await;
167            channel_ids
168                .iter()
169                .filter_map(|id| channels.get(id).cloned())
170                .collect()
171        } else {
172            Vec::new()
173        }
174    }
175
176    /// Join a connection to a channel
177    pub async fn join_channel(
178        &self,
179        channel_id: ChannelId,
180        connection_id: ConnectionId,
181        password: Option<&str>,
182        nickname: Option<String>,
183    ) -> WebSocketResult<()> {
184        let channel = self
185            .get_channel(channel_id)
186            .await
187            .ok_or(WebSocketError::Connection(format!(
188                "Channel {} not found",
189                channel_id
190            )))?;
191
192        // Check access permissions
193        match &channel.metadata.channel_type {
194            ChannelType::Public => {
195                // Anyone can join public channels
196            }
197            ChannelType::Private => {
198                // For private channels, we'd need invitation logic
199                // For now, reject all attempts
200                return Err(WebSocketError::Connection(
201                    "Channel is private and requires invitation".to_string(),
202                ));
203            }
204            ChannelType::Protected { .. } => {
205                let provided_password = password.ok_or(WebSocketError::Connection(
206                    "Password required for protected channel".to_string(),
207                ))?;
208
209                if !channel.validate_password(provided_password) {
210                    return Err(WebSocketError::Connection("Invalid password".to_string()));
211                }
212            }
213        }
214
215        // Add member to channel
216        let permissions = if Some(connection_id) == channel.metadata.created_by {
217            ChannelPermissions::admin()
218        } else {
219            ChannelPermissions::default()
220        };
221
222        channel
223            .add_member(connection_id, permissions, nickname.clone())
224            .await?;
225
226        // Track the connection's channel membership
227        {
228            let mut connection_channels = self.connection_channels.write().await;
229            connection_channels
230                .entry(connection_id)
231                .or_insert_with(HashSet::new)
232                .insert(channel_id);
233        }
234
235        info!("Connection {} joined channel {}", connection_id, channel_id);
236        self.emit_event(ChannelEvent::MemberJoined(
237            channel_id,
238            connection_id,
239            nickname,
240        ))
241        .await;
242
243        Ok(())
244    }
245
246    /// Remove a connection from a channel
247    pub async fn leave_channel(
248        &self,
249        channel_id: ChannelId,
250        connection_id: ConnectionId,
251    ) -> WebSocketResult<()> {
252        let channel = self
253            .get_channel(channel_id)
254            .await
255            .ok_or(WebSocketError::Connection(format!(
256                "Channel {} not found",
257                channel_id
258            )))?;
259
260        // Get member info before removal
261        let member = channel.get_member(connection_id).await;
262        let nickname = member.as_ref().and_then(|m| m.nickname.clone());
263
264        // Remove member from channel
265        channel
266            .remove_member(connection_id)
267            .await
268            .ok_or(WebSocketError::Connection(
269                "Connection not a member of channel".to_string(),
270            ))?;
271
272        // Remove from connection tracking
273        {
274            let mut connection_channels = self.connection_channels.write().await;
275            if let Some(member_channels) = connection_channels.get_mut(&connection_id) {
276                member_channels.remove(&channel_id);
277                if member_channels.is_empty() {
278                    connection_channels.remove(&connection_id);
279                }
280            }
281        }
282
283        info!("Connection {} left channel {}", connection_id, channel_id);
284        self.emit_event(ChannelEvent::MemberLeft(
285            channel_id,
286            connection_id,
287            nickname,
288        ))
289        .await;
290
291        // Auto-delete empty channels (except those with explicit creators)
292        if channel.is_empty().await && channel.metadata.created_by.is_none() {
293            self.delete_channel(channel_id).await?;
294        }
295
296        Ok(())
297    }
298
299    /// Remove a connection from all channels (useful for cleanup on disconnect)
300    pub async fn leave_all_channels(&self, connection_id: ConnectionId) -> Vec<ChannelId> {
301        // Acquire write lock once and remove all channel entries for this connection
302        let channel_ids = {
303            let mut connection_channels = self.connection_channels.write().await;
304            connection_channels
305                .remove(&connection_id)
306                .unwrap_or_default()
307        };
308
309        let mut left_channels = Vec::new();
310
311        // Now handle cleanup for each channel without repeated lock acquisitions
312        for channel_id in channel_ids {
313            if let Some(channel) = self.get_channel(channel_id).await {
314                // Get member info before removal for event logging
315                let member = channel.get_member(connection_id).await;
316                let nickname = member.as_ref().and_then(|m| m.nickname.clone());
317
318                // Remove member from channel
319                if channel.remove_member(connection_id).await.is_some() {
320                    left_channels.push(channel_id);
321
322                    info!("Connection {} left channel {}", connection_id, channel_id);
323                    self.emit_event(ChannelEvent::MemberLeft(
324                        channel_id,
325                        connection_id,
326                        nickname,
327                    ))
328                    .await;
329
330                    // Auto-delete empty channels (except those with explicit creators)
331                    if channel.is_empty().await && channel.metadata.created_by.is_none() {
332                        let _ = self.delete_channel(channel_id).await;
333                    }
334                }
335            }
336        }
337
338        if !left_channels.is_empty() {
339            info!(
340                "Connection {} left {} channels",
341                connection_id,
342                left_channels.len()
343            );
344        }
345
346        left_channels
347    }
348
349    /// Send a message to a channel
350    pub async fn send_to_channel(
351        &self,
352        channel_id: ChannelId,
353        sender_id: ConnectionId,
354        message: WebSocketMessage,
355    ) -> WebSocketResult<Vec<ConnectionId>> {
356        let channel = self
357            .get_channel(channel_id)
358            .await
359            .ok_or(WebSocketError::Connection(format!(
360                "Channel {} not found",
361                channel_id
362            )))?;
363
364        // Check if sender is a member and has permission to send messages
365        let sender_member =
366            channel
367                .get_member(sender_id)
368                .await
369                .ok_or(WebSocketError::Connection(
370                    "Sender not a member of channel".to_string(),
371                ))?;
372
373        if !sender_member.permissions.can_send_messages {
374            return Err(WebSocketError::Connection(
375                "No permission to send messages".to_string(),
376            ));
377        }
378
379        // Create channel message
380        let channel_message = ChannelMessage::new(
381            channel_id,
382            sender_id,
383            message.clone(),
384            sender_member.nickname.clone(),
385        );
386
387        // Add to channel history
388        channel.add_message(channel_message.clone()).await;
389
390        // Get all member IDs for broadcasting
391        let member_ids = channel.get_member_ids().await;
392
393        info!(
394            "Message sent to channel {} by {} (broadcasting to {} members)",
395            channel_id,
396            sender_id,
397            member_ids.len()
398        );
399
400        self.emit_event(ChannelEvent::MessageSent(channel_id, channel_message))
401            .await;
402
403        Ok(member_ids)
404    }
405
406    /// Get channel statistics for all channels
407    pub async fn get_all_channel_stats(&self) -> Vec<ChannelStats> {
408        let channels = self.channels.read().await;
409        let mut stats = Vec::with_capacity(channels.len());
410
411        for channel in channels.values() {
412            stats.push(channel.stats().await);
413        }
414
415        stats
416    }
417
418    /// Get public channels for discovery
419    pub async fn get_public_channels(&self) -> Vec<ChannelStats> {
420        let channels = self.channels.read().await;
421        let mut public_channels = Vec::new();
422
423        for channel in channels.values() {
424            if matches!(channel.metadata.channel_type, ChannelType::Public) {
425                public_channels.push(channel.stats().await);
426            }
427        }
428
429        public_channels
430    }
431
432    /// Get manager statistics
433    pub async fn stats(&self) -> ChannelManagerStats {
434        let channels = self.channels.read().await;
435        let connection_channels = self.connection_channels.read().await;
436
437        let mut stats = ChannelManagerStats {
438            total_channels: channels.len(),
439            total_connections_in_channels: connection_channels.len(),
440            public_channels: 0,
441            private_channels: 0,
442            protected_channels: 0,
443            empty_channels: 0,
444        };
445
446        for channel in channels.values() {
447            match channel.metadata.channel_type {
448                ChannelType::Public => stats.public_channels += 1,
449                ChannelType::Private => stats.private_channels += 1,
450                ChannelType::Protected { .. } => stats.protected_channels += 1,
451            }
452
453            if channel.is_empty().await {
454                stats.empty_channels += 1;
455            }
456        }
457
458        stats
459    }
460
461    /// Clean up empty channels
462    pub async fn cleanup_empty_channels(&self) -> usize {
463        let channels = self.get_all_channels().await;
464        let mut cleaned_up = 0;
465
466        for channel in channels {
467            // Only auto-delete channels without explicit creators
468            if channel.is_empty().await
469                && channel.metadata.created_by.is_none()
470                && self.delete_channel(channel.id).await.is_ok()
471            {
472                cleaned_up += 1;
473            }
474        }
475
476        if cleaned_up > 0 {
477            info!("Cleaned up {} empty channels", cleaned_up);
478        }
479
480        cleaned_up
481    }
482
483    /// Add an event handler
484    pub async fn add_event_handler<F>(&self, handler: F)
485    where
486        F: Fn(ChannelEvent) + Send + Sync + 'static,
487    {
488        let mut handlers = self.event_handlers.write().await;
489        handlers.push(Box::new(handler));
490    }
491
492    /// Emit an event to all handlers
493    async fn emit_event(&self, event: ChannelEvent) {
494        let handlers = self.event_handlers.read().await;
495        for handler in handlers.iter() {
496            handler(event.clone());
497        }
498    }
499}
500
501impl Default for ChannelManager {
502    fn default() -> Self {
503        Self::new()
504    }
505}