mockforge_chaos/
collaboration.rs

1//! Collaborative editing support for orchestrations
2
3use chrono::{DateTime, Utc};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::broadcast;
9
10/// User in a collaboration session
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct CollaborationUser {
13    pub id: String,
14    pub name: String,
15    pub email: String,
16    pub color: String,
17    pub cursor: Option<CursorPosition>,
18    pub active_field: Option<String>,
19    pub joined_at: DateTime<Utc>,
20}
21
22/// Cursor position
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct CursorPosition {
25    pub x: i32,
26    pub y: i32,
27}
28
29/// Collaboration change
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct CollaborationChange {
32    pub id: String,
33    pub user_id: String,
34    pub timestamp: DateTime<Utc>,
35    pub change_type: ChangeType,
36    pub path: String,
37    pub value: serde_json::Value,
38    pub previous_value: Option<serde_json::Value>,
39}
40
41/// Type of change
42#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(rename_all = "lowercase")]
44pub enum ChangeType {
45    Insert,
46    Update,
47    Delete,
48}
49
50/// Collaboration session
51#[derive(Debug)]
52pub struct CollaborationSession {
53    pub orchestration_id: String,
54    pub users: Arc<RwLock<HashMap<String, CollaborationUser>>>,
55    pub changes: Arc<RwLock<Vec<CollaborationChange>>>,
56    pub broadcast_tx: broadcast::Sender<CollaborationMessage>,
57    pub created_at: DateTime<Utc>,
58}
59
60/// Collaboration message
61#[derive(Debug, Clone, Serialize, Deserialize)]
62#[serde(tag = "type")]
63pub enum CollaborationMessage {
64    #[serde(rename = "user_joined")]
65    UserJoined { data: UserJoinedData },
66    #[serde(rename = "user_left")]
67    UserLeft { data: UserLeftData },
68    #[serde(rename = "user_presence")]
69    UserPresence { data: UserPresenceData },
70    #[serde(rename = "change")]
71    Change { data: ChangeData },
72    #[serde(rename = "sync")]
73    Sync { data: SyncData },
74    #[serde(rename = "conflict")]
75    Conflict { data: ConflictData },
76    #[serde(rename = "users_list")]
77    UsersList { data: UsersListData },
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct UserJoinedData {
82    pub user: CollaborationUser,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct UserLeftData {
87    pub user_id: String,
88    pub user_name: String,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct UserPresenceData {
93    pub user_id: String,
94    pub cursor: Option<CursorPosition>,
95    pub active_field: Option<String>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct ChangeData {
100    pub change: CollaborationChange,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct SyncData {
105    pub value: serde_json::Value,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ConflictData {
110    pub message: String,
111    pub conflicting_changes: Vec<String>,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct UsersListData {
116    pub users: Vec<CollaborationUser>,
117}
118
119impl CollaborationSession {
120    /// Create a new collaboration session
121    pub fn new(orchestration_id: String) -> Self {
122        let (broadcast_tx, _) = broadcast::channel(100);
123
124        Self {
125            orchestration_id,
126            users: Arc::new(RwLock::new(HashMap::new())),
127            changes: Arc::new(RwLock::new(Vec::new())),
128            broadcast_tx,
129            created_at: Utc::now(),
130        }
131    }
132
133    /// Add a user to the session
134    pub fn add_user(&self, user: CollaborationUser) -> Result<(), String> {
135        let mut users = self.users.write();
136        users.insert(user.id.clone(), user.clone());
137
138        // Broadcast user joined
139        let _ = self.broadcast_tx.send(CollaborationMessage::UserJoined {
140            data: UserJoinedData { user },
141        });
142
143        Ok(())
144    }
145
146    /// Remove a user from the session
147    pub fn remove_user(&self, user_id: &str) -> Result<(), String> {
148        let mut users = self.users.write();
149
150        if let Some(user) = users.remove(user_id) {
151            // Broadcast user left
152            let _ = self.broadcast_tx.send(CollaborationMessage::UserLeft {
153                data: UserLeftData {
154                    user_id: user_id.to_string(),
155                    user_name: user.name,
156                },
157            });
158        }
159
160        Ok(())
161    }
162
163    /// Update user presence
164    pub fn update_presence(
165        &self,
166        user_id: &str,
167        cursor: Option<CursorPosition>,
168        active_field: Option<String>,
169    ) -> Result<(), String> {
170        let mut users = self.users.write();
171
172        if let Some(user) = users.get_mut(user_id) {
173            user.cursor = cursor.clone();
174            user.active_field = active_field.clone();
175
176            // Broadcast presence update
177            let _ = self.broadcast_tx.send(CollaborationMessage::UserPresence {
178                data: UserPresenceData {
179                    user_id: user_id.to_string(),
180                    cursor,
181                    active_field,
182                },
183            });
184        }
185
186        Ok(())
187    }
188
189    /// Apply a change
190    pub fn apply_change(&self, change: CollaborationChange) -> Result<(), String> {
191        // Check for conflicts
192        let changes = self.changes.read();
193        let conflicts: Vec<_> = changes
194            .iter()
195            .filter(|c| {
196                c.path == change.path
197                    && c.user_id != change.user_id
198                    && c.timestamp
199                        > change
200                            .timestamp
201                            .checked_sub_signed(chrono::Duration::seconds(5))
202                            .unwrap_or(change.timestamp)
203            })
204            .map(|c| c.id.clone())
205            .collect();
206
207        drop(changes);
208
209        if !conflicts.is_empty() {
210            // Broadcast conflict
211            let _ = self.broadcast_tx.send(CollaborationMessage::Conflict {
212                data: ConflictData {
213                    message: format!("Conflict detected in path: {}", change.path),
214                    conflicting_changes: conflicts,
215                },
216            });
217        }
218
219        // Store change
220        let mut changes = self.changes.write();
221        changes.push(change.clone());
222
223        // Broadcast change
224        let _ = self.broadcast_tx.send(CollaborationMessage::Change {
225            data: ChangeData { change },
226        });
227
228        Ok(())
229    }
230
231    /// Get all active users
232    pub fn get_users(&self) -> Result<Vec<CollaborationUser>, String> {
233        let users = self.users.read();
234        Ok(users.values().cloned().collect())
235    }
236
237    /// Get change history
238    pub fn get_changes(&self) -> Result<Vec<CollaborationChange>, String> {
239        let changes = self.changes.read();
240        Ok(changes.clone())
241    }
242
243    /// Subscribe to updates
244    pub fn subscribe(&self) -> broadcast::Receiver<CollaborationMessage> {
245        self.broadcast_tx.subscribe()
246    }
247}
248
249/// Collaboration manager
250pub struct CollaborationManager {
251    sessions: Arc<RwLock<HashMap<String, Arc<CollaborationSession>>>>,
252}
253
254impl CollaborationManager {
255    /// Create a new collaboration manager
256    pub fn new() -> Self {
257        Self {
258            sessions: Arc::new(RwLock::new(HashMap::new())),
259        }
260    }
261
262    /// Get or create a session
263    pub fn get_or_create_session(
264        &self,
265        orchestration_id: &str,
266    ) -> Result<Arc<CollaborationSession>, String> {
267        let mut sessions = self.sessions.write();
268
269        if let Some(session) = sessions.get(orchestration_id) {
270            Ok(Arc::clone(session))
271        } else {
272            let session = Arc::new(CollaborationSession::new(orchestration_id.to_string()));
273            sessions.insert(orchestration_id.to_string(), Arc::clone(&session));
274            Ok(session)
275        }
276    }
277
278    /// Remove a session
279    pub fn remove_session(&self, orchestration_id: &str) -> Result<(), String> {
280        let mut sessions = self.sessions.write();
281        sessions.remove(orchestration_id);
282        Ok(())
283    }
284
285    /// Get active sessions count
286    pub fn active_sessions_count(&self) -> usize {
287        self.sessions.read().len()
288    }
289}
290
291impl Default for CollaborationManager {
292    fn default() -> Self {
293        Self::new()
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture user shared by the user-management tests.
    fn sample_user() -> CollaborationUser {
        CollaborationUser {
            id: String::from("user1"),
            name: String::from("Test User"),
            email: String::from("test@example.com"),
            color: String::from("#FF0000"),
            cursor: None,
            active_field: None,
            joined_at: Utc::now(),
        }
    }

    /// A new session keeps its orchestration id and starts without users.
    #[test]
    fn test_session_creation() {
        let session = CollaborationSession::new(String::from("test-orch"));

        assert_eq!(session.orchestration_id, "test-orch");
        assert_eq!(session.get_users().unwrap().len(), 0);
    }

    /// Adding a user makes it visible through `get_users`.
    #[test]
    fn test_add_user() {
        let session = CollaborationSession::new(String::from("test-orch"));

        session.add_user(sample_user()).unwrap();

        assert_eq!(session.get_users().unwrap().len(), 1);
    }

    /// Removing a previously added user leaves the session empty again.
    #[test]
    fn test_remove_user() {
        let session = CollaborationSession::new(String::from("test-orch"));

        session.add_user(sample_user()).unwrap();
        session.remove_user("user1").unwrap();

        assert_eq!(session.get_users().unwrap().len(), 0);
    }

    /// Asking twice for the same orchestration yields the same session.
    #[test]
    fn test_manager() {
        let manager = CollaborationManager::new();

        let first = manager.get_or_create_session("orch1").unwrap();
        let second = manager.get_or_create_session("orch1").unwrap();

        assert!(Arc::ptr_eq(&first, &second));
        assert_eq!(manager.active_sessions_count(), 1);
    }
}