use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::info;
use uuid::Uuid;
pub struct CollaborationManager {
shared_sessions: Arc<RwLock<HashMap<String, SharedSession>>>,
update_tx: broadcast::Sender<CollaborationUpdate>,
config: CollaborationConfig,
}
#[derive(Debug, Clone)]
pub struct CollaborationConfig {
pub max_users_per_session: usize,
pub enable_cursor_sharing: bool,
pub enable_presence: bool,
pub idle_timeout_secs: u64,
pub broadcast_buffer_size: usize,
}
impl Default for CollaborationConfig {
fn default() -> Self {
Self {
max_users_per_session: 10,
enable_cursor_sharing: true,
enable_presence: true,
idle_timeout_secs: 1800, broadcast_buffer_size: 1000,
}
}
}
#[derive(Debug, Clone)]
pub struct SharedSession {
pub session_id: String,
pub owner_id: String,
pub participants: HashMap<String, Participant>,
pub created_at: DateTime<Utc>,
pub last_activity: DateTime<Utc>,
pub metadata: HashMap<String, serde_json::Value>,
pub access_control: AccessControl,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Participant {
pub user_id: String,
pub display_name: String,
pub joined_at: DateTime<Utc>,
pub last_seen: DateTime<Utc>,
pub cursor_position: Option<CursorPosition>,
pub role: ParticipantRole,
pub status: ParticipantStatus,
pub avatar_color: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CursorPosition {
pub line: usize,
pub column: usize,
pub selection: Option<TextRange>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TextRange {
pub start_line: usize,
pub start_column: usize,
pub end_line: usize,
pub end_column: usize,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ParticipantRole {
Owner,
Editor,
Viewer,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ParticipantStatus {
Active,
Idle,
Away,
Offline,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccessControl {
pub is_public: bool,
pub allowed_users: HashSet<String>,
pub require_approval: bool,
pub max_participants: usize,
}
impl Default for AccessControl {
fn default() -> Self {
Self {
is_public: false,
allowed_users: HashSet::new(),
require_approval: false,
max_participants: 10,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum CollaborationUpdate {
#[serde(rename = "user_joined")]
UserJoined {
session_id: String,
participant: Participant,
},
#[serde(rename = "user_left")]
UserLeft { session_id: String, user_id: String },
#[serde(rename = "cursor_moved")]
CursorMoved {
session_id: String,
user_id: String,
position: CursorPosition,
},
#[serde(rename = "status_changed")]
StatusChanged {
session_id: String,
user_id: String,
status: ParticipantStatus,
},
#[serde(rename = "metadata_updated")]
MetadataUpdated {
session_id: String,
metadata: HashMap<String, serde_json::Value>,
},
#[serde(rename = "query_update")]
QueryUpdate {
session_id: String,
user_id: String,
query_text: String,
cursor_position: Option<CursorPosition>,
},
}
impl CollaborationManager {
pub fn new(config: CollaborationConfig) -> Self {
let (update_tx, _) = broadcast::channel(config.broadcast_buffer_size);
Self {
shared_sessions: Arc::new(RwLock::new(HashMap::new())),
update_tx,
config,
}
}
pub async fn create_shared_session(
&self,
owner_id: String,
access_control: Option<AccessControl>,
) -> Result<String> {
let session_id = Uuid::new_v4().to_string();
let now = Utc::now();
let avatar_color = Self::generate_avatar_color();
let owner = Participant {
user_id: owner_id.clone(),
display_name: format!("User {}", &owner_id[..owner_id.len().min(8)]),
joined_at: now,
last_seen: now,
cursor_position: None,
role: ParticipantRole::Owner,
status: ParticipantStatus::Active,
avatar_color,
};
let mut participants = HashMap::new();
participants.insert(owner_id.clone(), owner.clone());
let session = SharedSession {
session_id: session_id.clone(),
owner_id,
participants,
created_at: now,
last_activity: now,
metadata: HashMap::new(),
access_control: access_control.unwrap_or_default(),
};
let mut sessions = self.shared_sessions.write().await;
sessions.insert(session_id.clone(), session);
info!("Created shared session: {}", session_id);
Ok(session_id)
}
pub async fn join_session(
&self,
session_id: &str,
user_id: String,
display_name: Option<String>,
) -> Result<()> {
let mut sessions = self.shared_sessions.write().await;
let session = sessions.get_mut(session_id).context("Session not found")?;
if !session.access_control.is_public
&& !session.access_control.allowed_users.contains(&user_id)
&& session.owner_id != user_id
{
anyhow::bail!("Access denied to session");
}
if session.participants.len() >= session.access_control.max_participants {
anyhow::bail!("Session has reached maximum participants");
}
let now = Utc::now();
let avatar_color = Self::generate_avatar_color();
let participant = Participant {
user_id: user_id.clone(),
display_name: display_name
.unwrap_or_else(|| format!("User {}", &user_id[..user_id.len().min(8)])),
joined_at: now,
last_seen: now,
cursor_position: None,
role: ParticipantRole::Editor,
status: ParticipantStatus::Active,
avatar_color,
};
session
.participants
.insert(user_id.clone(), participant.clone());
session.last_activity = now;
let _ = self.update_tx.send(CollaborationUpdate::UserJoined {
session_id: session_id.to_string(),
participant,
});
info!("User {} joined session {}", user_id, session_id);
Ok(())
}
pub async fn leave_session(&self, session_id: &str, user_id: &str) -> Result<()> {
let mut sessions = self.shared_sessions.write().await;
let session = sessions.get_mut(session_id).context("Session not found")?;
session.participants.remove(user_id);
session.last_activity = Utc::now();
let _ = self.update_tx.send(CollaborationUpdate::UserLeft {
session_id: session_id.to_string(),
user_id: user_id.to_string(),
});
info!("User {} left session {}", user_id, session_id);
if session.participants.is_empty() {
sessions.remove(session_id);
info!("Removed empty session {}", session_id);
}
Ok(())
}
pub async fn update_cursor(
&self,
session_id: &str,
user_id: &str,
position: CursorPosition,
) -> Result<()> {
if !self.config.enable_cursor_sharing {
return Ok(());
}
let mut sessions = self.shared_sessions.write().await;
let session = sessions.get_mut(session_id).context("Session not found")?;
if let Some(participant) = session.participants.get_mut(user_id) {
participant.cursor_position = Some(position.clone());
participant.last_seen = Utc::now();
session.last_activity = Utc::now();
let _ = self.update_tx.send(CollaborationUpdate::CursorMoved {
session_id: session_id.to_string(),
user_id: user_id.to_string(),
position,
});
}
Ok(())
}
pub async fn update_status(
&self,
session_id: &str,
user_id: &str,
status: ParticipantStatus,
) -> Result<()> {
let mut sessions = self.shared_sessions.write().await;
let session = sessions.get_mut(session_id).context("Session not found")?;
if let Some(participant) = session.participants.get_mut(user_id) {
participant.status = status;
participant.last_seen = Utc::now();
session.last_activity = Utc::now();
let _ = self.update_tx.send(CollaborationUpdate::StatusChanged {
session_id: session_id.to_string(),
user_id: user_id.to_string(),
status,
});
}
Ok(())
}
pub async fn broadcast_query_update(
&self,
session_id: &str,
user_id: &str,
query_text: String,
cursor_position: Option<CursorPosition>,
) -> Result<()> {
let sessions = self.shared_sessions.read().await;
if !sessions.contains_key(session_id) {
anyhow::bail!("Session not found");
}
let _ = self.update_tx.send(CollaborationUpdate::QueryUpdate {
session_id: session_id.to_string(),
user_id: user_id.to_string(),
query_text,
cursor_position,
});
Ok(())
}
pub async fn get_session(&self, session_id: &str) -> Option<SharedSession> {
let sessions = self.shared_sessions.read().await;
sessions.get(session_id).cloned()
}
pub async fn list_sessions(&self) -> Vec<String> {
let sessions = self.shared_sessions.read().await;
sessions.keys().cloned().collect()
}
pub async fn get_participants(&self, session_id: &str) -> Option<Vec<Participant>> {
let sessions = self.shared_sessions.read().await;
sessions
.get(session_id)
.map(|s| s.participants.values().cloned().collect())
}
pub fn subscribe(&self) -> broadcast::Receiver<CollaborationUpdate> {
self.update_tx.subscribe()
}
pub async fn cleanup_idle_sessions(&self) -> usize {
let mut sessions = self.shared_sessions.write().await;
let idle_threshold = chrono::Duration::seconds(self.config.idle_timeout_secs as i64);
let now = Utc::now();
let mut removed_count = 0;
let idle_sessions: Vec<String> = sessions
.iter()
.filter(|(_, session)| {
now.signed_duration_since(session.last_activity) > idle_threshold
})
.map(|(id, _)| id.clone())
.collect();
for session_id in idle_sessions {
sessions.remove(&session_id);
removed_count += 1;
info!("Removed idle session: {}", session_id);
}
removed_count
}
fn generate_avatar_color() -> String {
let colors = [
"#FF6B6B", "#4ECDC4", "#45B7D1", "#FFA07A", "#98D8C8", "#F7DC6F", "#BB8FCE", "#85C1E2",
"#F8B195", "#C06C84",
];
let index = fastrand::usize(..colors.len());
colors[index].to_string()
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CollaborationStats {
pub active_sessions: usize,
pub total_participants: usize,
pub avg_participants_per_session: f64,
pub sessions_by_size: HashMap<usize, usize>,
}
impl CollaborationManager {
pub async fn get_stats(&self) -> CollaborationStats {
let sessions = self.shared_sessions.read().await;
let active_sessions = sessions.len();
let mut total_participants = 0;
let mut sessions_by_size: HashMap<usize, usize> = HashMap::new();
for session in sessions.values() {
let participant_count = session.participants.len();
total_participants += participant_count;
*sessions_by_size.entry(participant_count).or_insert(0) += 1;
}
let avg_participants_per_session = if active_sessions > 0 {
total_participants as f64 / active_sessions as f64
} else {
0.0
};
CollaborationStats {
active_sessions,
total_participants,
avg_participants_per_session,
sessions_by_size,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_create_shared_session() {
let config = CollaborationConfig::default();
let manager = CollaborationManager::new(config);
let session_id = manager
.create_shared_session("user1".to_string(), None)
.await
.expect("should succeed");
assert!(!session_id.is_empty());
let session = manager
.get_session(&session_id)
.await
.expect("should succeed");
assert_eq!(session.owner_id, "user1");
assert_eq!(session.participants.len(), 1);
}
#[tokio::test]
async fn test_join_session() {
let config = CollaborationConfig::default();
let manager = CollaborationManager::new(config);
let session_id = manager
.create_shared_session(
"user1".to_string(),
Some(AccessControl {
is_public: true,
..Default::default()
}),
)
.await
.expect("should succeed");
manager
.join_session(&session_id, "user2".to_string(), Some("User 2".to_string()))
.await
.expect("should succeed");
let participants = manager
.get_participants(&session_id)
.await
.expect("should succeed");
assert_eq!(participants.len(), 2);
}
#[tokio::test]
async fn test_cursor_update() {
let config = CollaborationConfig::default();
let manager = CollaborationManager::new(config);
let session_id = manager
.create_shared_session("user1".to_string(), None)
.await
.expect("should succeed");
let position = CursorPosition {
line: 10,
column: 5,
selection: None,
updated_at: Utc::now(),
};
manager
.update_cursor(&session_id, "user1", position)
.await
.expect("should succeed");
let session = manager
.get_session(&session_id)
.await
.expect("should succeed");
let participant = session.participants.get("user1").expect("should succeed");
assert!(participant.cursor_position.is_some());
}
#[tokio::test]
async fn test_collaboration_stats() {
let config = CollaborationConfig::default();
let manager = CollaborationManager::new(config);
let _session1 = manager
.create_shared_session("user1".to_string(), None)
.await
.expect("should succeed");
let stats = manager.get_stats().await;
assert_eq!(stats.active_sessions, 1);
assert_eq!(stats.total_participants, 1);
}
}