use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollaborationUser {
pub id: String,
pub name: String,
pub email: String,
pub color: String,
pub cursor: Option<CursorPosition>,
pub active_field: Option<String>,
pub joined_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CursorPosition {
pub x: i32,
pub y: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollaborationChange {
pub id: String,
pub user_id: String,
pub timestamp: DateTime<Utc>,
pub change_type: ChangeType,
pub path: String,
pub value: serde_json::Value,
pub previous_value: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ChangeType {
Insert,
Update,
Delete,
}
#[derive(Debug)]
pub struct CollaborationSession {
pub orchestration_id: String,
pub users: Arc<RwLock<HashMap<String, CollaborationUser>>>,
pub changes: Arc<RwLock<Vec<CollaborationChange>>>,
pub broadcast_tx: broadcast::Sender<CollaborationMessage>,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum CollaborationMessage {
#[serde(rename = "user_joined")]
UserJoined { data: UserJoinedData },
#[serde(rename = "user_left")]
UserLeft { data: UserLeftData },
#[serde(rename = "user_presence")]
UserPresence { data: UserPresenceData },
#[serde(rename = "change")]
Change { data: ChangeData },
#[serde(rename = "sync")]
Sync { data: SyncData },
#[serde(rename = "conflict")]
Conflict { data: ConflictData },
#[serde(rename = "users_list")]
UsersList { data: UsersListData },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserJoinedData {
pub user: CollaborationUser,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserLeftData {
pub user_id: String,
pub user_name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserPresenceData {
pub user_id: String,
pub cursor: Option<CursorPosition>,
pub active_field: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeData {
pub change: CollaborationChange,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncData {
pub value: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictData {
pub message: String,
pub conflicting_changes: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsersListData {
pub users: Vec<CollaborationUser>,
}
impl CollaborationSession {
pub fn new(orchestration_id: String) -> Self {
let (broadcast_tx, _) = broadcast::channel(100);
Self {
orchestration_id,
users: Arc::new(RwLock::new(HashMap::new())),
changes: Arc::new(RwLock::new(Vec::new())),
broadcast_tx,
created_at: Utc::now(),
}
}
pub fn add_user(&self, user: CollaborationUser) -> Result<(), String> {
let mut users = self.users.write();
users.insert(user.id.clone(), user.clone());
let _ = self.broadcast_tx.send(CollaborationMessage::UserJoined {
data: UserJoinedData { user },
});
Ok(())
}
pub fn remove_user(&self, user_id: &str) -> Result<(), String> {
let mut users = self.users.write();
if let Some(user) = users.remove(user_id) {
let _ = self.broadcast_tx.send(CollaborationMessage::UserLeft {
data: UserLeftData {
user_id: user_id.to_string(),
user_name: user.name,
},
});
}
Ok(())
}
pub fn update_presence(
&self,
user_id: &str,
cursor: Option<CursorPosition>,
active_field: Option<String>,
) -> Result<(), String> {
let mut users = self.users.write();
if let Some(user) = users.get_mut(user_id) {
user.cursor = cursor.clone();
user.active_field = active_field.clone();
let _ = self.broadcast_tx.send(CollaborationMessage::UserPresence {
data: UserPresenceData {
user_id: user_id.to_string(),
cursor,
active_field,
},
});
}
Ok(())
}
pub fn apply_change(&self, change: CollaborationChange) -> Result<(), String> {
let changes = self.changes.read();
let conflicts: Vec<_> = changes
.iter()
.filter(|c| {
c.path == change.path
&& c.user_id != change.user_id
&& c.timestamp
> change
.timestamp
.checked_sub_signed(chrono::Duration::seconds(5))
.unwrap_or(change.timestamp)
})
.map(|c| c.id.clone())
.collect();
drop(changes);
if !conflicts.is_empty() {
let _ = self.broadcast_tx.send(CollaborationMessage::Conflict {
data: ConflictData {
message: format!("Conflict detected in path: {}", change.path),
conflicting_changes: conflicts,
},
});
}
let mut changes = self.changes.write();
changes.push(change.clone());
let _ = self.broadcast_tx.send(CollaborationMessage::Change {
data: ChangeData { change },
});
Ok(())
}
pub fn get_users(&self) -> Result<Vec<CollaborationUser>, String> {
let users = self.users.read();
Ok(users.values().cloned().collect())
}
pub fn get_changes(&self) -> Result<Vec<CollaborationChange>, String> {
let changes = self.changes.read();
Ok(changes.clone())
}
pub fn subscribe(&self) -> broadcast::Receiver<CollaborationMessage> {
self.broadcast_tx.subscribe()
}
}
pub struct CollaborationManager {
sessions: Arc<RwLock<HashMap<String, Arc<CollaborationSession>>>>,
}
impl CollaborationManager {
pub fn new() -> Self {
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn get_or_create_session(
&self,
orchestration_id: &str,
) -> Result<Arc<CollaborationSession>, String> {
let mut sessions = self.sessions.write();
if let Some(session) = sessions.get(orchestration_id) {
Ok(Arc::clone(session))
} else {
let session = Arc::new(CollaborationSession::new(orchestration_id.to_string()));
sessions.insert(orchestration_id.to_string(), Arc::clone(&session));
Ok(session)
}
}
pub fn remove_session(&self, orchestration_id: &str) -> Result<(), String> {
let mut sessions = self.sessions.write();
sessions.remove(orchestration_id);
Ok(())
}
pub fn active_sessions_count(&self) -> usize {
self.sessions.read().len()
}
}
impl Default for CollaborationManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_session_creation() {
let session = CollaborationSession::new("test-orch".to_string());
assert_eq!(session.orchestration_id, "test-orch");
assert_eq!(session.get_users().unwrap().len(), 0);
}
#[test]
fn test_add_user() {
let session = CollaborationSession::new("test-orch".to_string());
let user = CollaborationUser {
id: "user1".to_string(),
name: "Test User".to_string(),
email: "test@example.com".to_string(),
color: "#FF0000".to_string(),
cursor: None,
active_field: None,
joined_at: Utc::now(),
};
session.add_user(user).unwrap();
assert_eq!(session.get_users().unwrap().len(), 1);
}
#[test]
fn test_remove_user() {
let session = CollaborationSession::new("test-orch".to_string());
let user = CollaborationUser {
id: "user1".to_string(),
name: "Test User".to_string(),
email: "test@example.com".to_string(),
color: "#FF0000".to_string(),
cursor: None,
active_field: None,
joined_at: Utc::now(),
};
session.add_user(user).unwrap();
session.remove_user("user1").unwrap();
assert_eq!(session.get_users().unwrap().len(), 0);
}
#[test]
fn test_manager() {
let manager = CollaborationManager::new();
let session1 = manager.get_or_create_session("orch1").unwrap();
let session2 = manager.get_or_create_session("orch1").unwrap();
assert!(Arc::ptr_eq(&session1, &session2));
assert_eq!(manager.active_sessions_count(), 1);
}
}