1use std::time::SystemTime;
7
8use crate::clip::ClipId;
9use crate::error::{EditError, EditResult};
10
11pub type PeerId = String;
13
14#[derive(Clone, Debug, PartialEq)]
20pub enum EditOpType {
21 MoveClip {
23 clip_id: ClipId,
25 new_start: i64,
27 },
28 TrimClip {
30 clip_id: ClipId,
32 new_in: i64,
34 new_out: i64,
36 },
37 DeleteClip {
39 clip_id: ClipId,
41 },
42 InsertClip {
44 clip_id: ClipId,
46 track_index: usize,
48 start: i64,
50 duration: i64,
52 },
53 NoOp,
55}
56
57#[derive(Clone, Debug)]
63pub struct EditOperation {
64 pub op_id: String,
66 pub peer_id: PeerId,
68 pub revision: u64,
70 pub op_type: EditOpType,
72 pub timestamp: SystemTime,
74}
75
76impl EditOperation {
77 #[must_use]
79 pub fn new(
80 op_id: impl Into<String>,
81 peer_id: PeerId,
82 revision: u64,
83 op_type: EditOpType,
84 ) -> Self {
85 Self {
86 op_id: op_id.into(),
87 peer_id,
88 revision,
89 op_type,
90 timestamp: SystemTime::now(),
91 }
92 }
93
94 #[must_use]
96 pub fn is_noop(&self) -> bool {
97 matches!(self.op_type, EditOpType::NoOp)
98 }
99
100 #[must_use]
102 pub fn affected_clip(&self) -> Option<ClipId> {
103 match &self.op_type {
104 EditOpType::MoveClip { clip_id, .. }
105 | EditOpType::TrimClip { clip_id, .. }
106 | EditOpType::DeleteClip { clip_id }
107 | EditOpType::InsertClip { clip_id, .. } => Some(*clip_id),
108 EditOpType::NoOp => None,
109 }
110 }
111}
112
113#[derive(Debug)]
119pub struct SharedEditState {
120 revision: u64,
122 operations: Vec<EditOperation>,
124 last_modified: SystemTime,
126}
127
128impl SharedEditState {
129 #[must_use]
131 pub fn new() -> Self {
132 Self {
133 revision: 0,
134 operations: Vec::new(),
135 last_modified: SystemTime::now(),
136 }
137 }
138
139 pub fn apply(&mut self, op: &EditOperation) {
141 self.revision += 1;
142 self.operations.push(op.clone());
143 self.last_modified = SystemTime::now();
144 }
145
146 #[must_use]
148 pub fn revision(&self) -> u64 {
149 self.revision
150 }
151
152 #[must_use]
154 pub fn operations(&self) -> &[EditOperation] {
155 &self.operations
156 }
157
158 #[must_use]
160 pub fn last_modified(&self) -> SystemTime {
161 self.last_modified
162 }
163}
164
165impl Default for SharedEditState {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171#[derive(Clone, Debug)]
177pub enum CollabEditEvent {
178 PeerJoined(PeerId),
180 PeerLeft(PeerId),
182 EditApplied(EditOperation),
184 ConflictResolved {
186 local: EditOperation,
188 remote: EditOperation,
190 result: EditOperation,
192 },
193}
194
195pub struct CollabSession {
204 pub session_id: String,
206 pub peers: Vec<PeerId>,
208 pub shared_state: SharedEditState,
210 pub local_peer: PeerId,
212 pub pending_ops: Vec<EditOperation>,
214 pub acknowledged_ops: Vec<EditOperation>,
216 pending_events: Vec<CollabEditEvent>,
218}
219
220impl CollabSession {
221 #[must_use]
223 pub fn new(session_id: impl Into<String>, local_peer: PeerId) -> Self {
224 Self {
225 session_id: session_id.into(),
226 peers: Vec::new(),
227 shared_state: SharedEditState::new(),
228 local_peer,
229 pending_ops: Vec::new(),
230 acknowledged_ops: Vec::new(),
231 pending_events: Vec::new(),
232 }
233 }
234
235 pub fn add_peer(&mut self, peer: PeerId) {
237 if !self.peers.contains(&peer) {
238 self.pending_events
239 .push(CollabEditEvent::PeerJoined(peer.clone()));
240 self.peers.push(peer);
241 }
242 }
243
244 pub fn remove_peer(&mut self, peer: &PeerId) {
246 if let Some(pos) = self.peers.iter().position(|p| p == peer) {
247 self.pending_events
248 .push(CollabEditEvent::PeerLeft(peer.clone()));
249 self.peers.remove(pos);
250 }
251 }
252
253 pub fn apply_edit(&mut self, op: EditOperation) -> EditResult<Vec<EditOperation>> {
259 let mut op_to_apply = op.clone();
261 let mut conflicts_resolved = false;
262
263 let mut transformed_pending: Vec<EditOperation> = Vec::new();
265 for pending in &self.pending_ops {
266 if pending.revision == op.revision {
267 let resolved = self.ot_transform(pending.clone(), op.clone())?;
268 transformed_pending.push(resolved);
269 conflicts_resolved = true;
270 }
271 }
272
273 if conflicts_resolved {
275 if let Some(last_resolved) = transformed_pending.last() {
276 let result = last_resolved.clone();
278 self.pending_events.push(CollabEditEvent::ConflictResolved {
279 local: self
280 .pending_ops
281 .last()
282 .cloned()
283 .unwrap_or_else(|| op.clone()),
284 remote: op.clone(),
285 result: result.clone(),
286 });
287 op_to_apply = result;
288 }
289 }
290
291 if !op_to_apply.is_noop() {
292 self.shared_state.apply(&op_to_apply);
293 self.acknowledged_ops.push(op_to_apply.clone());
294 self.pending_events
295 .push(CollabEditEvent::EditApplied(op_to_apply.clone()));
296 }
297
298 Ok(vec![op_to_apply])
299 }
300
301 pub fn merge_concurrent(
307 &mut self,
308 local: EditOperation,
309 remote: EditOperation,
310 ) -> EditResult<EditOperation> {
311 self.ot_transform(local, remote)
312 }
313
314 pub fn broadcast(&mut self) -> Vec<CollabEditEvent> {
316 std::mem::take(&mut self.pending_events)
317 }
318
319 #[must_use]
321 pub fn pending_events(&self) -> &[CollabEditEvent] {
322 &self.pending_events
323 }
324
325 pub fn acknowledge_local(&mut self, op_id: &str) -> EditResult<()> {
328 let pos = self
329 .pending_ops
330 .iter()
331 .position(|o| o.op_id == op_id)
332 .ok_or_else(|| EditError::InvalidEdit(format!("op {op_id} not in pending list")))?;
333 let op = self.pending_ops.remove(pos);
334 self.shared_state.apply(&op);
335 self.acknowledged_ops.push(op.clone());
336 self.pending_events.push(CollabEditEvent::EditApplied(op));
337 Ok(())
338 }
339
340 pub fn enqueue_local(&mut self, op: EditOperation) {
342 self.pending_ops.push(op);
343 }
344
345 fn ot_transform(
349 &self,
350 local: EditOperation,
351 remote: EditOperation,
352 ) -> EditResult<EditOperation> {
353 if local.revision != remote.revision {
355 return Ok(remote);
356 }
357
358 let transformed_type = match (&local.op_type, &remote.op_type) {
359 (
361 EditOpType::MoveClip { clip_id: l_id, .. },
362 EditOpType::MoveClip {
363 clip_id: r_id,
364 new_start,
365 },
366 ) if l_id == r_id => EditOpType::MoveClip {
367 clip_id: *r_id,
368 new_start: *new_start,
369 },
370
371 (
373 EditOpType::DeleteClip { clip_id: l_id },
374 EditOpType::MoveClip { clip_id: r_id, .. },
375 ) if l_id == r_id => EditOpType::NoOp,
376
377 (
379 EditOpType::MoveClip { clip_id: l_id, .. },
380 EditOpType::DeleteClip { clip_id: r_id },
381 ) if l_id == r_id => EditOpType::NoOp,
382
383 (
385 EditOpType::TrimClip { clip_id: l_id, .. },
386 EditOpType::TrimClip { clip_id: r_id, .. },
387 ) if l_id == r_id => EditOpType::NoOp,
388
389 (
391 EditOpType::InsertClip { clip_id: l_id, .. },
392 EditOpType::InsertClip {
393 clip_id: r_id,
394 track_index,
395 start,
396 duration,
397 },
398 ) if l_id == r_id => {
399 let new_id = r_id.wrapping_add(0x8000_0000);
401 EditOpType::InsertClip {
402 clip_id: new_id,
403 track_index: *track_index,
404 start: *start,
405 duration: *duration,
406 }
407 }
408
409 _ => remote.op_type.clone(),
411 };
412
413 Ok(EditOperation {
414 op_id: remote.op_id.clone(),
415 peer_id: remote.peer_id.clone(),
416 revision: remote.revision + 1,
417 op_type: transformed_type,
418 timestamp: remote.timestamp,
419 })
420 }
421}
422
423#[cfg(test)]
428mod tests {
429 use super::*;
430
431 fn make_op(id: &str, peer: &str, rev: u64, op_type: EditOpType) -> EditOperation {
432 EditOperation::new(id, peer.to_string(), rev, op_type)
433 }
434
435 #[test]
436 fn test_new_session() {
437 let session = CollabSession::new("session-1", "peer-A".to_string());
438 assert_eq!(session.session_id, "session-1");
439 assert_eq!(session.local_peer, "peer-A");
440 assert_eq!(session.shared_state.revision(), 0);
441 assert!(session.peers.is_empty());
442 }
443
444 #[test]
445 fn test_add_remove_peer() {
446 let mut session = CollabSession::new("s1", "A".to_string());
447 session.add_peer("B".to_string());
448 session.add_peer("C".to_string());
449 assert_eq!(session.peers.len(), 2);
450
451 session.add_peer("B".to_string());
453 assert_eq!(session.peers.len(), 2);
454
455 session.remove_peer(&"B".to_string());
456 assert_eq!(session.peers.len(), 1);
457 assert_eq!(session.peers[0], "C");
458 }
459
460 #[test]
461 fn test_apply_edit_increments_revision() {
462 let mut session = CollabSession::new("s1", "A".to_string());
463 let op = make_op(
464 "op1",
465 "B",
466 0,
467 EditOpType::MoveClip {
468 clip_id: 1,
469 new_start: 500,
470 },
471 );
472 session.apply_edit(op).expect("apply_edit should succeed");
473 assert_eq!(session.shared_state.revision(), 1);
474 }
475
476 #[test]
477 fn test_broadcast_drains_events() {
478 let mut session = CollabSession::new("s1", "A".to_string());
479 session.add_peer("B".to_string());
480 let events = session.broadcast();
481 assert_eq!(events.len(), 1);
482 assert!(matches!(events[0], CollabEditEvent::PeerJoined(_)));
483 assert!(session.broadcast().is_empty());
485 }
486
487 #[test]
488 fn test_ot_delete_wins_over_move() {
489 let mut session = CollabSession::new("s1", "A".to_string());
490 let local = make_op("l1", "A", 0, EditOpType::DeleteClip { clip_id: 42 });
491 let remote = make_op(
492 "r1",
493 "B",
494 0,
495 EditOpType::MoveClip {
496 clip_id: 42,
497 new_start: 1000,
498 },
499 );
500 let result = session.merge_concurrent(local, remote).expect("merge ok");
501 assert!(result.is_noop(), "delete should suppress concurrent move");
502 }
503
504 #[test]
505 fn test_ot_concurrent_trims_suppressed() {
506 let mut session = CollabSession::new("s1", "A".to_string());
507 let local = make_op(
508 "l1",
509 "A",
510 5,
511 EditOpType::TrimClip {
512 clip_id: 7,
513 new_in: 0,
514 new_out: 100,
515 },
516 );
517 let remote = make_op(
518 "r1",
519 "B",
520 5,
521 EditOpType::TrimClip {
522 clip_id: 7,
523 new_in: 10,
524 new_out: 90,
525 },
526 );
527 let result = session.merge_concurrent(local, remote).expect("merge ok");
528 assert!(
529 result.is_noop(),
530 "local trim should suppress concurrent remote trim"
531 );
532 }
533
534 #[test]
535 fn test_ot_non_concurrent_ops_pass_through() {
536 let mut session = CollabSession::new("s1", "A".to_string());
537 let local = make_op(
538 "l1",
539 "A",
540 3,
541 EditOpType::MoveClip {
542 clip_id: 1,
543 new_start: 0,
544 },
545 );
546 let remote = make_op(
547 "r1",
548 "B",
549 5,
550 EditOpType::MoveClip {
551 clip_id: 1,
552 new_start: 200,
553 },
554 );
555 let result = session.merge_concurrent(local, remote).expect("merge ok");
556 assert!(!result.is_noop());
558 assert_eq!(result.revision, 5);
559 }
560
561 #[test]
562 fn test_shared_edit_state_operations() {
563 let mut state = SharedEditState::new();
564 assert_eq!(state.revision(), 0);
565 let op = make_op("o1", "A", 0, EditOpType::NoOp);
566 state.apply(&op);
567 assert_eq!(state.revision(), 1);
568 assert_eq!(state.operations().len(), 1);
569 }
570}