oxirs_chat/
collaboration.rs

1//! Real-Time Collaboration Module
2//!
3//! Provides real-time collaboration features for multi-user chat sessions including:
4//! - Shared session support with multi-user participation
5//! - Real-time presence indicators
6//! - Live cursor position sharing
7//! - Collaborative query building
8//! - Synchronized message streaming
9
10use anyhow::{Context, Result};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use tokio::sync::{broadcast, RwLock};
16use tracing::info;
17use uuid::Uuid;
18
19/// Collaborative session manager
20pub struct CollaborationManager {
21    /// Shared sessions indexed by session ID
22    shared_sessions: Arc<RwLock<HashMap<String, SharedSession>>>,
23    /// Broadcast channel for real-time updates
24    update_tx: broadcast::Sender<CollaborationUpdate>,
25    /// Configuration
26    config: CollaborationConfig,
27}
28
/// Tunable settings for the collaboration subsystem.
#[derive(Clone, Debug)]
pub struct CollaborationConfig {
    /// Upper bound on users in one shared session.
    ///
    /// NOTE(review): not read by the manager code visible in this file — confirm
    /// where (or whether) it is enforced.
    pub max_users_per_session: usize,
    /// Whether cursor positions are shared between participants
    pub enable_cursor_sharing: bool,
    /// Whether presence indicators are enabled.
    ///
    /// NOTE(review): not read by the manager code visible in this file — confirm
    /// where it is consulted.
    pub enable_presence: bool,
    /// Seconds of inactivity after which a session counts as idle
    pub idle_timeout_secs: u64,
    /// Capacity requested for the update broadcast channel
    pub broadcast_buffer_size: usize,
}

impl Default for CollaborationConfig {
    /// Defaults: 10 users per session, cursor sharing and presence enabled,
    /// a 30-minute idle timeout and a 1000-slot broadcast buffer.
    fn default() -> Self {
        CollaborationConfig {
            max_users_per_session: 10,
            enable_cursor_sharing: true,
            enable_presence: true,
            // 30 minutes, expressed in seconds
            idle_timeout_secs: 1800,
            broadcast_buffer_size: 1000,
        }
    }
}
55
56/// Shared session allowing multiple users to collaborate
57#[derive(Debug, Clone)]
58pub struct SharedSession {
59    /// Unique session ID
60    pub session_id: String,
61    /// Session owner/creator
62    pub owner_id: String,
63    /// Currently connected participants
64    pub participants: HashMap<String, Participant>,
65    /// Session creation time
66    pub created_at: DateTime<Utc>,
67    /// Last activity timestamp
68    pub last_activity: DateTime<Utc>,
69    /// Session metadata
70    pub metadata: HashMap<String, serde_json::Value>,
71    /// Access control settings
72    pub access_control: AccessControl,
73}
74
75/// Participant in a shared session
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct Participant {
78    /// User ID
79    pub user_id: String,
80    /// Display name
81    pub display_name: String,
82    /// Joined timestamp
83    pub joined_at: DateTime<Utc>,
84    /// Last seen timestamp
85    pub last_seen: DateTime<Utc>,
86    /// Current cursor position (if cursor sharing enabled)
87    pub cursor_position: Option<CursorPosition>,
88    /// User role in the session
89    pub role: ParticipantRole,
90    /// Current status
91    pub status: ParticipantStatus,
92    /// User avatar/color for UI display
93    pub avatar_color: String,
94}
95
96/// Cursor position in collaborative editing
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct CursorPosition {
99    /// Line number (0-indexed)
100    pub line: usize,
101    /// Column number (0-indexed)
102    pub column: usize,
103    /// Selected text range (if any)
104    pub selection: Option<TextRange>,
105    /// Last update timestamp
106    pub updated_at: DateTime<Utc>,
107}
108
109/// Text range for selections
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct TextRange {
112    pub start_line: usize,
113    pub start_column: usize,
114    pub end_line: usize,
115    pub end_column: usize,
116}
117
118/// Participant role
119#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
120#[serde(rename_all = "snake_case")]
121pub enum ParticipantRole {
122    /// Session owner with full control
123    Owner,
124    /// Editor with write permissions
125    Editor,
126    /// Viewer with read-only access
127    Viewer,
128}
129
130/// Participant status
131#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
132#[serde(rename_all = "snake_case")]
133pub enum ParticipantStatus {
134    /// Actively engaged
135    Active,
136    /// Connected but idle
137    Idle,
138    /// Temporarily away
139    Away,
140    /// Disconnected
141    Offline,
142}
143
144/// Access control for shared sessions
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct AccessControl {
147    /// Is session public or private
148    pub is_public: bool,
149    /// Allowed user IDs (if private)
150    pub allowed_users: HashSet<String>,
151    /// Require approval for new participants
152    pub require_approval: bool,
153    /// Maximum number of participants
154    pub max_participants: usize,
155}
156
157impl Default for AccessControl {
158    fn default() -> Self {
159        Self {
160            is_public: false,
161            allowed_users: HashSet::new(),
162            require_approval: false,
163            max_participants: 10,
164        }
165    }
166}
167
168/// Collaboration update event
169#[derive(Debug, Clone, Serialize, Deserialize)]
170#[serde(tag = "type")]
171pub enum CollaborationUpdate {
172    /// User joined session
173    #[serde(rename = "user_joined")]
174    UserJoined {
175        session_id: String,
176        participant: Participant,
177    },
178    /// User left session
179    #[serde(rename = "user_left")]
180    UserLeft { session_id: String, user_id: String },
181    /// Cursor position updated
182    #[serde(rename = "cursor_moved")]
183    CursorMoved {
184        session_id: String,
185        user_id: String,
186        position: CursorPosition,
187    },
188    /// Participant status changed
189    #[serde(rename = "status_changed")]
190    StatusChanged {
191        session_id: String,
192        user_id: String,
193        status: ParticipantStatus,
194    },
195    /// Session metadata updated
196    #[serde(rename = "metadata_updated")]
197    MetadataUpdated {
198        session_id: String,
199        metadata: HashMap<String, serde_json::Value>,
200    },
201    /// Query being collaboratively built
202    #[serde(rename = "query_update")]
203    QueryUpdate {
204        session_id: String,
205        user_id: String,
206        query_text: String,
207        cursor_position: Option<CursorPosition>,
208    },
209}
210
211impl CollaborationManager {
212    /// Create a new collaboration manager
213    pub fn new(config: CollaborationConfig) -> Self {
214        let (update_tx, _) = broadcast::channel(config.broadcast_buffer_size);
215
216        Self {
217            shared_sessions: Arc::new(RwLock::new(HashMap::new())),
218            update_tx,
219            config,
220        }
221    }
222
223    /// Create a new shared session
224    pub async fn create_shared_session(
225        &self,
226        owner_id: String,
227        access_control: Option<AccessControl>,
228    ) -> Result<String> {
229        let session_id = Uuid::new_v4().to_string();
230        let now = Utc::now();
231
232        // Generate a random avatar color for the owner
233        let avatar_color = Self::generate_avatar_color();
234
235        let owner = Participant {
236            user_id: owner_id.clone(),
237            display_name: format!("User {}", &owner_id[..owner_id.len().min(8)]),
238            joined_at: now,
239            last_seen: now,
240            cursor_position: None,
241            role: ParticipantRole::Owner,
242            status: ParticipantStatus::Active,
243            avatar_color,
244        };
245
246        let mut participants = HashMap::new();
247        participants.insert(owner_id.clone(), owner.clone());
248
249        let session = SharedSession {
250            session_id: session_id.clone(),
251            owner_id,
252            participants,
253            created_at: now,
254            last_activity: now,
255            metadata: HashMap::new(),
256            access_control: access_control.unwrap_or_default(),
257        };
258
259        let mut sessions = self.shared_sessions.write().await;
260        sessions.insert(session_id.clone(), session);
261
262        info!("Created shared session: {}", session_id);
263
264        Ok(session_id)
265    }
266
267    /// Add a participant to a shared session
268    pub async fn join_session(
269        &self,
270        session_id: &str,
271        user_id: String,
272        display_name: Option<String>,
273    ) -> Result<()> {
274        let mut sessions = self.shared_sessions.write().await;
275
276        let session = sessions.get_mut(session_id).context("Session not found")?;
277
278        // Check access control
279        if !session.access_control.is_public
280            && !session.access_control.allowed_users.contains(&user_id)
281            && session.owner_id != user_id
282        {
283            anyhow::bail!("Access denied to session");
284        }
285
286        // Check participant limit
287        if session.participants.len() >= session.access_control.max_participants {
288            anyhow::bail!("Session has reached maximum participants");
289        }
290
291        let now = Utc::now();
292        let avatar_color = Self::generate_avatar_color();
293
294        let participant = Participant {
295            user_id: user_id.clone(),
296            display_name: display_name
297                .unwrap_or_else(|| format!("User {}", &user_id[..user_id.len().min(8)])),
298            joined_at: now,
299            last_seen: now,
300            cursor_position: None,
301            role: ParticipantRole::Editor,
302            status: ParticipantStatus::Active,
303            avatar_color,
304        };
305
306        session
307            .participants
308            .insert(user_id.clone(), participant.clone());
309        session.last_activity = now;
310
311        // Broadcast join event
312        let _ = self.update_tx.send(CollaborationUpdate::UserJoined {
313            session_id: session_id.to_string(),
314            participant,
315        });
316
317        info!("User {} joined session {}", user_id, session_id);
318
319        Ok(())
320    }
321
322    /// Remove a participant from a session
323    pub async fn leave_session(&self, session_id: &str, user_id: &str) -> Result<()> {
324        let mut sessions = self.shared_sessions.write().await;
325
326        let session = sessions.get_mut(session_id).context("Session not found")?;
327
328        session.participants.remove(user_id);
329        session.last_activity = Utc::now();
330
331        // Broadcast leave event
332        let _ = self.update_tx.send(CollaborationUpdate::UserLeft {
333            session_id: session_id.to_string(),
334            user_id: user_id.to_string(),
335        });
336
337        info!("User {} left session {}", user_id, session_id);
338
339        // Remove session if empty
340        if session.participants.is_empty() {
341            sessions.remove(session_id);
342            info!("Removed empty session {}", session_id);
343        }
344
345        Ok(())
346    }
347
348    /// Update cursor position for a user
349    pub async fn update_cursor(
350        &self,
351        session_id: &str,
352        user_id: &str,
353        position: CursorPosition,
354    ) -> Result<()> {
355        if !self.config.enable_cursor_sharing {
356            return Ok(());
357        }
358
359        let mut sessions = self.shared_sessions.write().await;
360
361        let session = sessions.get_mut(session_id).context("Session not found")?;
362
363        if let Some(participant) = session.participants.get_mut(user_id) {
364            participant.cursor_position = Some(position.clone());
365            participant.last_seen = Utc::now();
366            session.last_activity = Utc::now();
367
368            // Broadcast cursor update
369            let _ = self.update_tx.send(CollaborationUpdate::CursorMoved {
370                session_id: session_id.to_string(),
371                user_id: user_id.to_string(),
372                position,
373            });
374        }
375
376        Ok(())
377    }
378
379    /// Update participant status
380    pub async fn update_status(
381        &self,
382        session_id: &str,
383        user_id: &str,
384        status: ParticipantStatus,
385    ) -> Result<()> {
386        let mut sessions = self.shared_sessions.write().await;
387
388        let session = sessions.get_mut(session_id).context("Session not found")?;
389
390        if let Some(participant) = session.participants.get_mut(user_id) {
391            participant.status = status;
392            participant.last_seen = Utc::now();
393            session.last_activity = Utc::now();
394
395            // Broadcast status update
396            let _ = self.update_tx.send(CollaborationUpdate::StatusChanged {
397                session_id: session_id.to_string(),
398                user_id: user_id.to_string(),
399                status,
400            });
401        }
402
403        Ok(())
404    }
405
406    /// Broadcast a collaborative query update
407    pub async fn broadcast_query_update(
408        &self,
409        session_id: &str,
410        user_id: &str,
411        query_text: String,
412        cursor_position: Option<CursorPosition>,
413    ) -> Result<()> {
414        let sessions = self.shared_sessions.read().await;
415
416        if !sessions.contains_key(session_id) {
417            anyhow::bail!("Session not found");
418        }
419
420        // Broadcast query update
421        let _ = self.update_tx.send(CollaborationUpdate::QueryUpdate {
422            session_id: session_id.to_string(),
423            user_id: user_id.to_string(),
424            query_text,
425            cursor_position,
426        });
427
428        Ok(())
429    }
430
431    /// Get session information
432    pub async fn get_session(&self, session_id: &str) -> Option<SharedSession> {
433        let sessions = self.shared_sessions.read().await;
434        sessions.get(session_id).cloned()
435    }
436
437    /// List all active sessions
438    pub async fn list_sessions(&self) -> Vec<String> {
439        let sessions = self.shared_sessions.read().await;
440        sessions.keys().cloned().collect()
441    }
442
443    /// Get participants in a session
444    pub async fn get_participants(&self, session_id: &str) -> Option<Vec<Participant>> {
445        let sessions = self.shared_sessions.read().await;
446        sessions
447            .get(session_id)
448            .map(|s| s.participants.values().cloned().collect())
449    }
450
451    /// Subscribe to collaboration updates
452    pub fn subscribe(&self) -> broadcast::Receiver<CollaborationUpdate> {
453        self.update_tx.subscribe()
454    }
455
456    /// Clean up idle sessions
457    pub async fn cleanup_idle_sessions(&self) -> usize {
458        let mut sessions = self.shared_sessions.write().await;
459        let idle_threshold = chrono::Duration::seconds(self.config.idle_timeout_secs as i64);
460        let now = Utc::now();
461
462        let mut removed_count = 0;
463        let idle_sessions: Vec<String> = sessions
464            .iter()
465            .filter(|(_, session)| {
466                now.signed_duration_since(session.last_activity) > idle_threshold
467            })
468            .map(|(id, _)| id.clone())
469            .collect();
470
471        for session_id in idle_sessions {
472            sessions.remove(&session_id);
473            removed_count += 1;
474            info!("Removed idle session: {}", session_id);
475        }
476
477        removed_count
478    }
479
480    /// Generate a random avatar color for visual distinction
481    fn generate_avatar_color() -> String {
482        let colors = [
483            "#FF6B6B", "#4ECDC4", "#45B7D1", "#FFA07A", "#98D8C8", "#F7DC6F", "#BB8FCE", "#85C1E2",
484            "#F8B195", "#C06C84",
485        ];
486
487        let index = fastrand::usize(..colors.len());
488        colors[index].to_string()
489    }
490}
491
492/// Statistics for collaboration features
493#[derive(Debug, Serialize, Deserialize)]
494pub struct CollaborationStats {
495    /// Total number of active shared sessions
496    pub active_sessions: usize,
497    /// Total number of participants across all sessions
498    pub total_participants: usize,
499    /// Average participants per session
500    pub avg_participants_per_session: f64,
501    /// Sessions by participant count
502    pub sessions_by_size: HashMap<usize, usize>,
503}
504
505impl CollaborationManager {
506    /// Get collaboration statistics
507    pub async fn get_stats(&self) -> CollaborationStats {
508        let sessions = self.shared_sessions.read().await;
509
510        let active_sessions = sessions.len();
511        let mut total_participants = 0;
512        let mut sessions_by_size: HashMap<usize, usize> = HashMap::new();
513
514        for session in sessions.values() {
515            let participant_count = session.participants.len();
516            total_participants += participant_count;
517            *sessions_by_size.entry(participant_count).or_insert(0) += 1;
518        }
519
520        let avg_participants_per_session = if active_sessions > 0 {
521            total_participants as f64 / active_sessions as f64
522        } else {
523            0.0
524        };
525
526        CollaborationStats {
527            active_sessions,
528            total_participants,
529            avg_participants_per_session,
530            sessions_by_size,
531        }
532    }
533}
534
#[cfg(test)]
mod tests {
    use super::*;

    /// Manager built from the default configuration, used by every test below.
    fn default_manager() -> CollaborationManager {
        CollaborationManager::new(CollaborationConfig::default())
    }

    /// A new session records its creator as owner and sole participant.
    #[tokio::test]
    async fn test_create_shared_session() {
        let manager = default_manager();

        let created = manager
            .create_shared_session("user1".to_string(), None)
            .await;
        let session_id = created.unwrap();
        assert!(!session_id.is_empty());

        let session = manager.get_session(&session_id).await.unwrap();
        assert_eq!(session.owner_id, "user1");
        assert_eq!(session.participants.len(), 1);
    }

    /// A second user can join a public session.
    #[tokio::test]
    async fn test_join_session() {
        let manager = default_manager();

        let public_access = AccessControl {
            is_public: true,
            ..Default::default()
        };
        let session_id = manager
            .create_shared_session("user1".to_string(), Some(public_access))
            .await
            .unwrap();

        let joined = manager
            .join_session(&session_id, "user2".to_string(), Some("User 2".to_string()))
            .await;
        joined.unwrap();

        let participants = manager.get_participants(&session_id).await.unwrap();
        assert_eq!(participants.len(), 2);
    }

    /// Updating the cursor stores a position on the participant.
    #[tokio::test]
    async fn test_cursor_update() {
        let manager = default_manager();

        let session_id = manager
            .create_shared_session("user1".to_string(), None)
            .await
            .unwrap();

        let position = CursorPosition {
            line: 10,
            column: 5,
            selection: None,
            updated_at: Utc::now(),
        };
        let updated = manager.update_cursor(&session_id, "user1", position).await;
        updated.unwrap();

        let session = manager.get_session(&session_id).await.unwrap();
        let participant = session.participants.get("user1").unwrap();
        assert!(participant.cursor_position.is_some());
    }

    /// Stats reflect a single session with a single participant.
    #[tokio::test]
    async fn test_collaboration_stats() {
        let manager = default_manager();

        let _session1 = manager
            .create_shared_session("user1".to_string(), None)
            .await
            .unwrap();

        let stats = manager.get_stats().await;
        assert_eq!(stats.active_sessions, 1);
        assert_eq!(stats.total_participants, 1);
    }
}