1use chrono::{DateTime, Utc};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::broadcast;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct CollaborationUser {
13 pub id: String,
14 pub name: String,
15 pub email: String,
16 pub color: String,
17 pub cursor: Option<CursorPosition>,
18 pub active_field: Option<String>,
19 pub joined_at: DateTime<Utc>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct CursorPosition {
25 pub x: i32,
26 pub y: i32,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct CollaborationChange {
32 pub id: String,
33 pub user_id: String,
34 pub timestamp: DateTime<Utc>,
35 pub change_type: ChangeType,
36 pub path: String,
37 pub value: serde_json::Value,
38 pub previous_value: Option<serde_json::Value>,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(rename_all = "lowercase")]
44pub enum ChangeType {
45 Insert,
46 Update,
47 Delete,
48}
49
50#[derive(Debug)]
52pub struct CollaborationSession {
53 pub orchestration_id: String,
54 pub users: Arc<RwLock<HashMap<String, CollaborationUser>>>,
55 pub changes: Arc<RwLock<Vec<CollaborationChange>>>,
56 pub broadcast_tx: broadcast::Sender<CollaborationMessage>,
57 pub created_at: DateTime<Utc>,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62#[serde(tag = "type")]
63pub enum CollaborationMessage {
64 #[serde(rename = "user_joined")]
65 UserJoined { data: UserJoinedData },
66 #[serde(rename = "user_left")]
67 UserLeft { data: UserLeftData },
68 #[serde(rename = "user_presence")]
69 UserPresence { data: UserPresenceData },
70 #[serde(rename = "change")]
71 Change { data: ChangeData },
72 #[serde(rename = "sync")]
73 Sync { data: SyncData },
74 #[serde(rename = "conflict")]
75 Conflict { data: ConflictData },
76 #[serde(rename = "users_list")]
77 UsersList { data: UsersListData },
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct UserJoinedData {
82 pub user: CollaborationUser,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct UserLeftData {
87 pub user_id: String,
88 pub user_name: String,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct UserPresenceData {
93 pub user_id: String,
94 pub cursor: Option<CursorPosition>,
95 pub active_field: Option<String>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct ChangeData {
100 pub change: CollaborationChange,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct SyncData {
105 pub value: serde_json::Value,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ConflictData {
110 pub message: String,
111 pub conflicting_changes: Vec<String>,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct UsersListData {
116 pub users: Vec<CollaborationUser>,
117}
118
119impl CollaborationSession {
120 pub fn new(orchestration_id: String) -> Self {
122 let (broadcast_tx, _) = broadcast::channel(100);
123
124 Self {
125 orchestration_id,
126 users: Arc::new(RwLock::new(HashMap::new())),
127 changes: Arc::new(RwLock::new(Vec::new())),
128 broadcast_tx,
129 created_at: Utc::now(),
130 }
131 }
132
133 pub fn add_user(&self, user: CollaborationUser) -> Result<(), String> {
135 let mut users = self.users.write();
136 users.insert(user.id.clone(), user.clone());
137
138 let _ = self.broadcast_tx.send(CollaborationMessage::UserJoined {
140 data: UserJoinedData { user },
141 });
142
143 Ok(())
144 }
145
146 pub fn remove_user(&self, user_id: &str) -> Result<(), String> {
148 let mut users = self.users.write();
149
150 if let Some(user) = users.remove(user_id) {
151 let _ = self.broadcast_tx.send(CollaborationMessage::UserLeft {
153 data: UserLeftData {
154 user_id: user_id.to_string(),
155 user_name: user.name,
156 },
157 });
158 }
159
160 Ok(())
161 }
162
163 pub fn update_presence(
165 &self,
166 user_id: &str,
167 cursor: Option<CursorPosition>,
168 active_field: Option<String>,
169 ) -> Result<(), String> {
170 let mut users = self.users.write();
171
172 if let Some(user) = users.get_mut(user_id) {
173 user.cursor = cursor.clone();
174 user.active_field = active_field.clone();
175
176 let _ = self.broadcast_tx.send(CollaborationMessage::UserPresence {
178 data: UserPresenceData {
179 user_id: user_id.to_string(),
180 cursor,
181 active_field,
182 },
183 });
184 }
185
186 Ok(())
187 }
188
189 pub fn apply_change(&self, change: CollaborationChange) -> Result<(), String> {
191 let changes = self.changes.read();
193 let conflicts: Vec<_> = changes
194 .iter()
195 .filter(|c| {
196 c.path == change.path
197 && c.user_id != change.user_id
198 && c.timestamp
199 > change
200 .timestamp
201 .checked_sub_signed(chrono::Duration::seconds(5))
202 .unwrap_or(change.timestamp)
203 })
204 .map(|c| c.id.clone())
205 .collect();
206
207 drop(changes);
208
209 if !conflicts.is_empty() {
210 let _ = self.broadcast_tx.send(CollaborationMessage::Conflict {
212 data: ConflictData {
213 message: format!("Conflict detected in path: {}", change.path),
214 conflicting_changes: conflicts,
215 },
216 });
217 }
218
219 let mut changes = self.changes.write();
221 changes.push(change.clone());
222
223 let _ = self.broadcast_tx.send(CollaborationMessage::Change {
225 data: ChangeData { change },
226 });
227
228 Ok(())
229 }
230
231 pub fn get_users(&self) -> Result<Vec<CollaborationUser>, String> {
233 let users = self.users.read();
234 Ok(users.values().cloned().collect())
235 }
236
237 pub fn get_changes(&self) -> Result<Vec<CollaborationChange>, String> {
239 let changes = self.changes.read();
240 Ok(changes.clone())
241 }
242
243 pub fn subscribe(&self) -> broadcast::Receiver<CollaborationMessage> {
245 self.broadcast_tx.subscribe()
246 }
247}
248
249pub struct CollaborationManager {
251 sessions: Arc<RwLock<HashMap<String, Arc<CollaborationSession>>>>,
252}
253
254impl CollaborationManager {
255 pub fn new() -> Self {
257 Self {
258 sessions: Arc::new(RwLock::new(HashMap::new())),
259 }
260 }
261
262 pub fn get_or_create_session(
264 &self,
265 orchestration_id: &str,
266 ) -> Result<Arc<CollaborationSession>, String> {
267 let mut sessions = self.sessions.write();
268
269 if let Some(session) = sessions.get(orchestration_id) {
270 Ok(Arc::clone(session))
271 } else {
272 let session = Arc::new(CollaborationSession::new(orchestration_id.to_string()));
273 sessions.insert(orchestration_id.to_string(), Arc::clone(&session));
274 Ok(session)
275 }
276 }
277
278 pub fn remove_session(&self, orchestration_id: &str) -> Result<(), String> {
280 let mut sessions = self.sessions.write();
281 sessions.remove(orchestration_id);
282 Ok(())
283 }
284
285 pub fn active_sessions_count(&self) -> usize {
287 self.sessions.read().len()
288 }
289}
290
291impl Default for CollaborationManager {
292 fn default() -> Self {
293 Self::new()
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh session with the id used by the session tests.
    fn new_session() -> CollaborationSession {
        CollaborationSession::new("test-orch".to_string())
    }

    /// Builds the fixture user shared by the user-management tests.
    fn sample_user() -> CollaborationUser {
        CollaborationUser {
            id: "user1".to_string(),
            name: "Test User".to_string(),
            email: "test@example.com".to_string(),
            color: "#FF0000".to_string(),
            cursor: None,
            active_field: None,
            joined_at: Utc::now(),
        }
    }

    /// A new session keeps its id and starts without users.
    #[test]
    fn test_session_creation() {
        let session = new_session();

        assert_eq!(session.orchestration_id, "test-orch");
        assert_eq!(session.get_users().unwrap().len(), 0);
    }

    /// Adding a user makes it visible through `get_users`.
    #[test]
    fn test_add_user() {
        let session = new_session();

        session.add_user(sample_user()).unwrap();

        assert_eq!(session.get_users().unwrap().len(), 1);
    }

    /// Removing a previously added user leaves the session empty.
    #[test]
    fn test_remove_user() {
        let session = new_session();

        session.add_user(sample_user()).unwrap();
        session.remove_user("user1").unwrap();

        assert_eq!(session.get_users().unwrap().len(), 0);
    }

    /// Requesting the same id twice yields the same session instance.
    #[test]
    fn test_manager() {
        let manager = CollaborationManager::new();

        let first = manager.get_or_create_session("orch1").unwrap();
        let second = manager.get_or_create_session("orch1").unwrap();

        assert!(Arc::ptr_eq(&first, &second));
        assert_eq!(manager.active_sessions_count(), 1);
    }
}