1use anyhow::{Context, Result};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use tokio::sync::{broadcast, RwLock};
16use tracing::info;
17use uuid::Uuid;
18
19pub struct CollaborationManager {
21 shared_sessions: Arc<RwLock<HashMap<String, SharedSession>>>,
23 update_tx: broadcast::Sender<CollaborationUpdate>,
25 config: CollaborationConfig,
27}
28
/// Tunable settings for a [`CollaborationManager`].
#[derive(Debug, Clone)]
pub struct CollaborationConfig {
    /// Intended per-session user limit.
    ///
    /// NOTE(review): no code visible in this file reads this field; joins are limited
    /// by `AccessControl::max_participants` instead — confirm whether that is intended.
    pub max_users_per_session: usize,
    /// When `false`, cursor updates are accepted but neither stored nor broadcast.
    pub enable_cursor_sharing: bool,
    /// Presence toggle.
    ///
    /// NOTE(review): not read by any code visible in this file.
    pub enable_presence: bool,
    /// Seconds without activity after which a session is eligible for idle cleanup.
    pub idle_timeout_secs: u64,
    /// Requested capacity of the update broadcast channel.
    pub broadcast_buffer_size: usize,
}
43
44impl Default for CollaborationConfig {
45 fn default() -> Self {
46 Self {
47 max_users_per_session: 10,
48 enable_cursor_sharing: true,
49 enable_presence: true,
50 idle_timeout_secs: 1800, broadcast_buffer_size: 1000,
52 }
53 }
54}
55
56#[derive(Debug, Clone)]
58pub struct SharedSession {
59 pub session_id: String,
61 pub owner_id: String,
63 pub participants: HashMap<String, Participant>,
65 pub created_at: DateTime<Utc>,
67 pub last_activity: DateTime<Utc>,
69 pub metadata: HashMap<String, serde_json::Value>,
71 pub access_control: AccessControl,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct Participant {
78 pub user_id: String,
80 pub display_name: String,
82 pub joined_at: DateTime<Utc>,
84 pub last_seen: DateTime<Utc>,
86 pub cursor_position: Option<CursorPosition>,
88 pub role: ParticipantRole,
90 pub status: ParticipantStatus,
92 pub avatar_color: String,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct CursorPosition {
99 pub line: usize,
101 pub column: usize,
103 pub selection: Option<TextRange>,
105 pub updated_at: DateTime<Utc>,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct TextRange {
112 pub start_line: usize,
113 pub start_column: usize,
114 pub end_line: usize,
115 pub end_column: usize,
116}
117
118#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
120#[serde(rename_all = "snake_case")]
121pub enum ParticipantRole {
122 Owner,
124 Editor,
126 Viewer,
128}
129
130#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
132#[serde(rename_all = "snake_case")]
133pub enum ParticipantStatus {
134 Active,
136 Idle,
138 Away,
140 Offline,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct AccessControl {
147 pub is_public: bool,
149 pub allowed_users: HashSet<String>,
151 pub require_approval: bool,
153 pub max_participants: usize,
155}
156
157impl Default for AccessControl {
158 fn default() -> Self {
159 Self {
160 is_public: false,
161 allowed_users: HashSet::new(),
162 require_approval: false,
163 max_participants: 10,
164 }
165 }
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
170#[serde(tag = "type")]
171pub enum CollaborationUpdate {
172 #[serde(rename = "user_joined")]
174 UserJoined {
175 session_id: String,
176 participant: Participant,
177 },
178 #[serde(rename = "user_left")]
180 UserLeft { session_id: String, user_id: String },
181 #[serde(rename = "cursor_moved")]
183 CursorMoved {
184 session_id: String,
185 user_id: String,
186 position: CursorPosition,
187 },
188 #[serde(rename = "status_changed")]
190 StatusChanged {
191 session_id: String,
192 user_id: String,
193 status: ParticipantStatus,
194 },
195 #[serde(rename = "metadata_updated")]
197 MetadataUpdated {
198 session_id: String,
199 metadata: HashMap<String, serde_json::Value>,
200 },
201 #[serde(rename = "query_update")]
203 QueryUpdate {
204 session_id: String,
205 user_id: String,
206 query_text: String,
207 cursor_position: Option<CursorPosition>,
208 },
209}
210
211impl CollaborationManager {
212 pub fn new(config: CollaborationConfig) -> Self {
214 let (update_tx, _) = broadcast::channel(config.broadcast_buffer_size);
215
216 Self {
217 shared_sessions: Arc::new(RwLock::new(HashMap::new())),
218 update_tx,
219 config,
220 }
221 }
222
223 pub async fn create_shared_session(
225 &self,
226 owner_id: String,
227 access_control: Option<AccessControl>,
228 ) -> Result<String> {
229 let session_id = Uuid::new_v4().to_string();
230 let now = Utc::now();
231
232 let avatar_color = Self::generate_avatar_color();
234
235 let owner = Participant {
236 user_id: owner_id.clone(),
237 display_name: format!("User {}", &owner_id[..owner_id.len().min(8)]),
238 joined_at: now,
239 last_seen: now,
240 cursor_position: None,
241 role: ParticipantRole::Owner,
242 status: ParticipantStatus::Active,
243 avatar_color,
244 };
245
246 let mut participants = HashMap::new();
247 participants.insert(owner_id.clone(), owner.clone());
248
249 let session = SharedSession {
250 session_id: session_id.clone(),
251 owner_id,
252 participants,
253 created_at: now,
254 last_activity: now,
255 metadata: HashMap::new(),
256 access_control: access_control.unwrap_or_default(),
257 };
258
259 let mut sessions = self.shared_sessions.write().await;
260 sessions.insert(session_id.clone(), session);
261
262 info!("Created shared session: {}", session_id);
263
264 Ok(session_id)
265 }
266
267 pub async fn join_session(
269 &self,
270 session_id: &str,
271 user_id: String,
272 display_name: Option<String>,
273 ) -> Result<()> {
274 let mut sessions = self.shared_sessions.write().await;
275
276 let session = sessions.get_mut(session_id).context("Session not found")?;
277
278 if !session.access_control.is_public
280 && !session.access_control.allowed_users.contains(&user_id)
281 && session.owner_id != user_id
282 {
283 anyhow::bail!("Access denied to session");
284 }
285
286 if session.participants.len() >= session.access_control.max_participants {
288 anyhow::bail!("Session has reached maximum participants");
289 }
290
291 let now = Utc::now();
292 let avatar_color = Self::generate_avatar_color();
293
294 let participant = Participant {
295 user_id: user_id.clone(),
296 display_name: display_name
297 .unwrap_or_else(|| format!("User {}", &user_id[..user_id.len().min(8)])),
298 joined_at: now,
299 last_seen: now,
300 cursor_position: None,
301 role: ParticipantRole::Editor,
302 status: ParticipantStatus::Active,
303 avatar_color,
304 };
305
306 session
307 .participants
308 .insert(user_id.clone(), participant.clone());
309 session.last_activity = now;
310
311 let _ = self.update_tx.send(CollaborationUpdate::UserJoined {
313 session_id: session_id.to_string(),
314 participant,
315 });
316
317 info!("User {} joined session {}", user_id, session_id);
318
319 Ok(())
320 }
321
322 pub async fn leave_session(&self, session_id: &str, user_id: &str) -> Result<()> {
324 let mut sessions = self.shared_sessions.write().await;
325
326 let session = sessions.get_mut(session_id).context("Session not found")?;
327
328 session.participants.remove(user_id);
329 session.last_activity = Utc::now();
330
331 let _ = self.update_tx.send(CollaborationUpdate::UserLeft {
333 session_id: session_id.to_string(),
334 user_id: user_id.to_string(),
335 });
336
337 info!("User {} left session {}", user_id, session_id);
338
339 if session.participants.is_empty() {
341 sessions.remove(session_id);
342 info!("Removed empty session {}", session_id);
343 }
344
345 Ok(())
346 }
347
348 pub async fn update_cursor(
350 &self,
351 session_id: &str,
352 user_id: &str,
353 position: CursorPosition,
354 ) -> Result<()> {
355 if !self.config.enable_cursor_sharing {
356 return Ok(());
357 }
358
359 let mut sessions = self.shared_sessions.write().await;
360
361 let session = sessions.get_mut(session_id).context("Session not found")?;
362
363 if let Some(participant) = session.participants.get_mut(user_id) {
364 participant.cursor_position = Some(position.clone());
365 participant.last_seen = Utc::now();
366 session.last_activity = Utc::now();
367
368 let _ = self.update_tx.send(CollaborationUpdate::CursorMoved {
370 session_id: session_id.to_string(),
371 user_id: user_id.to_string(),
372 position,
373 });
374 }
375
376 Ok(())
377 }
378
379 pub async fn update_status(
381 &self,
382 session_id: &str,
383 user_id: &str,
384 status: ParticipantStatus,
385 ) -> Result<()> {
386 let mut sessions = self.shared_sessions.write().await;
387
388 let session = sessions.get_mut(session_id).context("Session not found")?;
389
390 if let Some(participant) = session.participants.get_mut(user_id) {
391 participant.status = status;
392 participant.last_seen = Utc::now();
393 session.last_activity = Utc::now();
394
395 let _ = self.update_tx.send(CollaborationUpdate::StatusChanged {
397 session_id: session_id.to_string(),
398 user_id: user_id.to_string(),
399 status,
400 });
401 }
402
403 Ok(())
404 }
405
406 pub async fn broadcast_query_update(
408 &self,
409 session_id: &str,
410 user_id: &str,
411 query_text: String,
412 cursor_position: Option<CursorPosition>,
413 ) -> Result<()> {
414 let sessions = self.shared_sessions.read().await;
415
416 if !sessions.contains_key(session_id) {
417 anyhow::bail!("Session not found");
418 }
419
420 let _ = self.update_tx.send(CollaborationUpdate::QueryUpdate {
422 session_id: session_id.to_string(),
423 user_id: user_id.to_string(),
424 query_text,
425 cursor_position,
426 });
427
428 Ok(())
429 }
430
431 pub async fn get_session(&self, session_id: &str) -> Option<SharedSession> {
433 let sessions = self.shared_sessions.read().await;
434 sessions.get(session_id).cloned()
435 }
436
437 pub async fn list_sessions(&self) -> Vec<String> {
439 let sessions = self.shared_sessions.read().await;
440 sessions.keys().cloned().collect()
441 }
442
443 pub async fn get_participants(&self, session_id: &str) -> Option<Vec<Participant>> {
445 let sessions = self.shared_sessions.read().await;
446 sessions
447 .get(session_id)
448 .map(|s| s.participants.values().cloned().collect())
449 }
450
451 pub fn subscribe(&self) -> broadcast::Receiver<CollaborationUpdate> {
453 self.update_tx.subscribe()
454 }
455
456 pub async fn cleanup_idle_sessions(&self) -> usize {
458 let mut sessions = self.shared_sessions.write().await;
459 let idle_threshold = chrono::Duration::seconds(self.config.idle_timeout_secs as i64);
460 let now = Utc::now();
461
462 let mut removed_count = 0;
463 let idle_sessions: Vec<String> = sessions
464 .iter()
465 .filter(|(_, session)| {
466 now.signed_duration_since(session.last_activity) > idle_threshold
467 })
468 .map(|(id, _)| id.clone())
469 .collect();
470
471 for session_id in idle_sessions {
472 sessions.remove(&session_id);
473 removed_count += 1;
474 info!("Removed idle session: {}", session_id);
475 }
476
477 removed_count
478 }
479
480 fn generate_avatar_color() -> String {
482 let colors = [
483 "#FF6B6B", "#4ECDC4", "#45B7D1", "#FFA07A", "#98D8C8", "#F7DC6F", "#BB8FCE", "#85C1E2",
484 "#F8B195", "#C06C84",
485 ];
486
487 let index = fastrand::usize(..colors.len());
488 colors[index].to_string()
489 }
490}
491
492#[derive(Debug, Serialize, Deserialize)]
494pub struct CollaborationStats {
495 pub active_sessions: usize,
497 pub total_participants: usize,
499 pub avg_participants_per_session: f64,
501 pub sessions_by_size: HashMap<usize, usize>,
503}
504
505impl CollaborationManager {
506 pub async fn get_stats(&self) -> CollaborationStats {
508 let sessions = self.shared_sessions.read().await;
509
510 let active_sessions = sessions.len();
511 let mut total_participants = 0;
512 let mut sessions_by_size: HashMap<usize, usize> = HashMap::new();
513
514 for session in sessions.values() {
515 let participant_count = session.participants.len();
516 total_participants += participant_count;
517 *sessions_by_size.entry(participant_count).or_insert(0) += 1;
518 }
519
520 let avg_participants_per_session = if active_sessions > 0 {
521 total_participants as f64 / active_sessions as f64
522 } else {
523 0.0
524 };
525
526 CollaborationStats {
527 active_sessions,
528 total_participants,
529 avg_participants_per_session,
530 sessions_by_size,
531 }
532 }
533}
534
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager backed by the default configuration.
    fn default_manager() -> CollaborationManager {
        CollaborationManager::new(CollaborationConfig::default())
    }

    /// A new session has the creator as owner and sole participant.
    #[tokio::test]
    async fn test_create_shared_session() {
        let manager = default_manager();

        let created = manager
            .create_shared_session(String::from("user1"), None)
            .await;
        let session_id = created.unwrap();

        assert!(!session_id.is_empty());

        let session = manager.get_session(&session_id).await.unwrap();
        assert_eq!(session.owner_id, "user1");
        assert_eq!(session.participants.len(), 1);
    }

    /// A second user can join a public session.
    #[tokio::test]
    async fn test_join_session() {
        let manager = default_manager();

        let public_access = AccessControl {
            is_public: true,
            ..Default::default()
        };
        let session_id = manager
            .create_shared_session(String::from("user1"), Some(public_access))
            .await
            .unwrap();

        let joined = manager
            .join_session(&session_id, String::from("user2"), Some(String::from("User 2")))
            .await;
        joined.unwrap();

        let participants = manager.get_participants(&session_id).await.unwrap();
        assert_eq!(participants.len(), 2);
    }

    /// A cursor update is stored on the participant.
    #[tokio::test]
    async fn test_cursor_update() {
        let manager = default_manager();

        let session_id = manager
            .create_shared_session(String::from("user1"), None)
            .await
            .unwrap();

        let position = CursorPosition {
            line: 10,
            column: 5,
            selection: None,
            updated_at: Utc::now(),
        };

        let updated = manager.update_cursor(&session_id, "user1", position).await;
        updated.unwrap();

        let session = manager.get_session(&session_id).await.unwrap();
        let participant = session.participants.get("user1").unwrap();
        assert!(participant.cursor_position.is_some());
    }

    /// Statistics count the single session and its single participant.
    #[tokio::test]
    async fn test_collaboration_stats() {
        let manager = default_manager();

        let _session1 = manager
            .create_shared_session(String::from("user1"), None)
            .await
            .unwrap();

        let stats = manager.get_stats().await;
        assert_eq!(stats.active_sessions, 1);
        assert_eq!(stats.total_participants, 1);
    }
}