Skip to main content

oximedia_edit/
collab_edit.rs

1//! Collaborative editing support for the timeline editor.
2//!
3//! Provides operational-transform-based concurrent edit merging across multiple
4//! peers sharing a single timeline session.
5
6use std::time::SystemTime;
7
8use crate::clip::ClipId;
9use crate::error::{EditError, EditResult};
10
11/// Unique identifier for a collaborating peer.
12pub type PeerId = String;
13
14// ─────────────────────────────────────────────────────────────────────────────
15// EditOpType
16// ─────────────────────────────────────────────────────────────────────────────
17
/// The kind of atomic edit operation applied to the shared timeline.
///
/// Every variant except [`EditOpType::NoOp`] targets exactly one clip through
/// its `clip_id` field (see [`EditOperation::affected_clip`]).
#[derive(Clone, Debug, PartialEq)]
pub enum EditOpType {
    /// Move a clip to a new timeline start position.
    MoveClip {
        /// Target clip.
        clip_id: ClipId,
        /// New timeline start position (timebase units).
        new_start: i64,
    },
    /// Trim a clip's source in/out points.
    TrimClip {
        /// Target clip.
        clip_id: ClipId,
        /// New source-in point.
        new_in: i64,
        /// New source-out point.
        new_out: i64,
    },
    /// Delete a clip from the timeline.
    DeleteClip {
        /// Target clip.
        clip_id: ClipId,
    },
    /// Insert a new clip at the given track and position.
    InsertClip {
        /// New clip identifier.
        ///
        /// The operational transform may rewrite this value when a concurrent
        /// local insert used the same identifier.
        clip_id: ClipId,
        /// Target track index.
        track_index: usize,
        /// Timeline start position.
        start: i64,
        /// Clip duration.
        duration: i64,
    },
    /// No-op placeholder used when an operation is suppressed by OT.
    ///
    /// [`CollabSession::apply_edit`] does not commit an operation of this
    /// kind to the shared state.
    NoOp,
}
56
57// ─────────────────────────────────────────────────────────────────────────────
58// EditOperation
59// ─────────────────────────────────────────────────────────────────────────────
60
/// A single atomic edit operation carrying authorship and revision metadata.
///
/// When an operation is rewritten by the operational transform, the result
/// keeps the `op_id`, `peer_id` and `timestamp` of the incoming operation; only
/// `revision` and `op_type` may differ.
#[derive(Clone, Debug)]
pub struct EditOperation {
    /// Globally unique operation identifier.
    ///
    /// Used by [`CollabSession::acknowledge_local`] to find the operation in
    /// the pending list.
    pub op_id: String,
    /// The peer that originated this operation.
    pub peer_id: PeerId,
    /// Revision of the shared state this operation was generated against.
    ///
    /// Two operations with the same revision are treated as concurrent.
    pub revision: u64,
    /// The actual edit to apply.
    pub op_type: EditOpType,
    /// Wall-clock time at which the operation was created.
    ///
    /// Filled in with `SystemTime::now()` by [`EditOperation::new`].
    pub timestamp: SystemTime,
}
75
76impl EditOperation {
77    /// Create a new edit operation.
78    #[must_use]
79    pub fn new(
80        op_id: impl Into<String>,
81        peer_id: PeerId,
82        revision: u64,
83        op_type: EditOpType,
84    ) -> Self {
85        Self {
86            op_id: op_id.into(),
87            peer_id,
88            revision,
89            op_type,
90            timestamp: SystemTime::now(),
91        }
92    }
93
94    /// Returns `true` when this operation is a no-op.
95    #[must_use]
96    pub fn is_noop(&self) -> bool {
97        matches!(self.op_type, EditOpType::NoOp)
98    }
99
100    /// Clip ID targeted by this operation, if any.
101    #[must_use]
102    pub fn affected_clip(&self) -> Option<ClipId> {
103        match &self.op_type {
104            EditOpType::MoveClip { clip_id, .. }
105            | EditOpType::TrimClip { clip_id, .. }
106            | EditOpType::DeleteClip { clip_id }
107            | EditOpType::InsertClip { clip_id, .. } => Some(*clip_id),
108            EditOpType::NoOp => None,
109        }
110    }
111}
112
113// ─────────────────────────────────────────────────────────────────────────────
114// SharedEditState
115// ─────────────────────────────────────────────────────────────────────────────
116
/// Shared state tracking the authoritative operation log and revision counter.
///
/// All fields are private; they change only through [`SharedEditState::apply`],
/// which bumps the revision by one, appends to the log and refreshes the
/// modification time on every call.
#[derive(Debug)]
pub struct SharedEditState {
    /// Monotonically increasing revision number.
    ///
    /// Starts at 0 and equals the number of `apply` calls made so far.
    revision: u64,
    /// Committed operations in order.
    operations: Vec<EditOperation>,
    /// Wall-clock time of the last applied operation.
    ///
    /// Before any operation is applied this holds the creation time of the
    /// state.
    last_modified: SystemTime,
}
127
128impl SharedEditState {
129    /// Create a fresh shared state at revision 0.
130    #[must_use]
131    pub fn new() -> Self {
132        Self {
133            revision: 0,
134            operations: Vec::new(),
135            last_modified: SystemTime::now(),
136        }
137    }
138
139    /// Apply an operation, advancing the revision.
140    pub fn apply(&mut self, op: &EditOperation) {
141        self.revision += 1;
142        self.operations.push(op.clone());
143        self.last_modified = SystemTime::now();
144    }
145
146    /// Current revision number.
147    #[must_use]
148    pub fn revision(&self) -> u64 {
149        self.revision
150    }
151
152    /// Committed operation log.
153    #[must_use]
154    pub fn operations(&self) -> &[EditOperation] {
155        &self.operations
156    }
157
158    /// Wall-clock time of the most recent applied operation.
159    #[must_use]
160    pub fn last_modified(&self) -> SystemTime {
161        self.last_modified
162    }
163}
164
165impl Default for SharedEditState {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171// ─────────────────────────────────────────────────────────────────────────────
172// CollabEditEvent
173// ─────────────────────────────────────────────────────────────────────────────
174
/// Events emitted by a [`CollabSession`] to notify subscribers of activity.
///
/// Events accumulate inside the session until they are drained with
/// [`CollabSession::broadcast`].
#[derive(Clone, Debug)]
pub enum CollabEditEvent {
    /// A new peer joined the session.
    ///
    /// Not emitted when the peer was already present.
    PeerJoined(PeerId),
    /// A peer left the session.
    ///
    /// Not emitted when the peer was not part of the session.
    PeerLeft(PeerId),
    /// An operation was successfully applied to the shared state.
    ///
    /// Emitted both for incoming operations that were committed and for local
    /// operations that were acknowledged.
    EditApplied(EditOperation),
    /// Two concurrent operations were merged by operational transform.
    ///
    /// Emitted when an incoming operation shares its base revision with at
    /// least one pending local operation.
    ConflictResolved {
        /// The local operation.
        local: EditOperation,
        /// The incoming remote operation.
        remote: EditOperation,
        /// The operation produced by the resolution.  It is committed unless
        /// it is a no-op.
        result: EditOperation,
    },
}
194
195// ─────────────────────────────────────────────────────────────────────────────
196// CollabSession
197// ─────────────────────────────────────────────────────────────────────────────
198
/// A collaborative editing session shared between multiple peers.
///
/// Uses operational transform (OT) to merge concurrent edits that were
/// generated against the same revision of the shared state.
///
/// Most fields are public and can be read or modified directly; only the
/// event queue is private and is accessed through
/// [`CollabSession::pending_events`] and [`CollabSession::broadcast`].
pub struct CollabSession {
    /// Human-readable session identifier.
    pub session_id: String,
    /// Connected peers.
    ///
    /// Starts empty; the local peer is not inserted automatically.
    pub peers: Vec<PeerId>,
    /// Authoritative shared edit state.
    pub shared_state: SharedEditState,
    /// The peer representing the local user.
    pub local_peer: PeerId,
    /// Operations submitted locally but not yet acknowledged by the server.
    ///
    /// Filled by [`CollabSession::enqueue_local`] and emptied one entry at a
    /// time by [`CollabSession::acknowledge_local`].
    pub pending_ops: Vec<EditOperation>,
    /// Operations that have been applied to the shared state.
    pub acknowledged_ops: Vec<EditOperation>,
    /// Pending events to be consumed by the caller.
    pending_events: Vec<CollabEditEvent>,
}
219
220impl CollabSession {
221    /// Create a new collaborative session.
222    #[must_use]
223    pub fn new(session_id: impl Into<String>, local_peer: PeerId) -> Self {
224        Self {
225            session_id: session_id.into(),
226            peers: Vec::new(),
227            shared_state: SharedEditState::new(),
228            local_peer,
229            pending_ops: Vec::new(),
230            acknowledged_ops: Vec::new(),
231            pending_events: Vec::new(),
232        }
233    }
234
235    /// Add a peer to the session.
236    pub fn add_peer(&mut self, peer: PeerId) {
237        if !self.peers.contains(&peer) {
238            self.pending_events
239                .push(CollabEditEvent::PeerJoined(peer.clone()));
240            self.peers.push(peer);
241        }
242    }
243
244    /// Remove a peer from the session.
245    pub fn remove_peer(&mut self, peer: &PeerId) {
246        if let Some(pos) = self.peers.iter().position(|p| p == peer) {
247            self.pending_events
248                .push(CollabEditEvent::PeerLeft(peer.clone()));
249            self.peers.remove(pos);
250        }
251    }
252
253    /// Apply an incoming operation to the shared state.
254    ///
255    /// If the incoming operation was generated against the same revision as any
256    /// pending local operations, OT is applied to resolve the conflict.
257    /// Returns the list of transformed operations actually committed.
258    pub fn apply_edit(&mut self, op: EditOperation) -> EditResult<Vec<EditOperation>> {
259        // OT: if there are pending local ops at the same revision, transform them
260        let mut op_to_apply = op.clone();
261        let mut conflicts_resolved = false;
262
263        // Collect indices of pending ops at the same revision to transform
264        let mut transformed_pending: Vec<EditOperation> = Vec::new();
265        for pending in &self.pending_ops {
266            if pending.revision == op.revision {
267                let resolved = self.ot_transform(pending.clone(), op.clone())?;
268                transformed_pending.push(resolved);
269                conflicts_resolved = true;
270            }
271        }
272
273        // If we resolved conflicts, synthesise a merged op
274        if conflicts_resolved {
275            if let Some(last_resolved) = transformed_pending.last() {
276                // The remote op becomes the result of the last resolution
277                let result = last_resolved.clone();
278                self.pending_events.push(CollabEditEvent::ConflictResolved {
279                    local: self
280                        .pending_ops
281                        .last()
282                        .cloned()
283                        .unwrap_or_else(|| op.clone()),
284                    remote: op.clone(),
285                    result: result.clone(),
286                });
287                op_to_apply = result;
288            }
289        }
290
291        if !op_to_apply.is_noop() {
292            self.shared_state.apply(&op_to_apply);
293            self.acknowledged_ops.push(op_to_apply.clone());
294            self.pending_events
295                .push(CollabEditEvent::EditApplied(op_to_apply.clone()));
296        }
297
298        Ok(vec![op_to_apply])
299    }
300
301    /// Merge two concurrent operations using operational transform.
302    ///
303    /// Both `local` and `remote` must have been generated against the same
304    /// revision.  Returns the transformed remote operation that can be applied
305    /// on top of the local one.
306    pub fn merge_concurrent(
307        &mut self,
308        local: EditOperation,
309        remote: EditOperation,
310    ) -> EditResult<EditOperation> {
311        self.ot_transform(local, remote)
312    }
313
314    /// Drain and return pending events.
315    pub fn broadcast(&mut self) -> Vec<CollabEditEvent> {
316        std::mem::take(&mut self.pending_events)
317    }
318
319    /// Peek at pending events without draining them.
320    #[must_use]
321    pub fn pending_events(&self) -> &[CollabEditEvent] {
322        &self.pending_events
323    }
324
325    /// Acknowledge a locally-generated operation (moves it from pending to
326    /// acknowledged and commits it to the shared state).
327    pub fn acknowledge_local(&mut self, op_id: &str) -> EditResult<()> {
328        let pos = self
329            .pending_ops
330            .iter()
331            .position(|o| o.op_id == op_id)
332            .ok_or_else(|| EditError::InvalidEdit(format!("op {op_id} not in pending list")))?;
333        let op = self.pending_ops.remove(pos);
334        self.shared_state.apply(&op);
335        self.acknowledged_ops.push(op.clone());
336        self.pending_events.push(CollabEditEvent::EditApplied(op));
337        Ok(())
338    }
339
340    /// Enqueue a locally-generated operation (does not apply it yet).
341    pub fn enqueue_local(&mut self, op: EditOperation) {
342        self.pending_ops.push(op);
343    }
344
345    // ── Operational-transform core ────────────────────────────────────────────
346
347    /// Core OT function: transform `remote` so it can be applied after `local`.
348    fn ot_transform(
349        &self,
350        local: EditOperation,
351        remote: EditOperation,
352    ) -> EditResult<EditOperation> {
353        // Only OT for concurrent ops (same base revision)
354        if local.revision != remote.revision {
355            return Ok(remote);
356        }
357
358        let transformed_type = match (&local.op_type, &remote.op_type) {
359            // Two moves on the same clip: remote wins (idempotent last-writer-wins)
360            (
361                EditOpType::MoveClip { clip_id: l_id, .. },
362                EditOpType::MoveClip {
363                    clip_id: r_id,
364                    new_start,
365                },
366            ) if l_id == r_id => EditOpType::MoveClip {
367                clip_id: *r_id,
368                new_start: *new_start,
369            },
370
371            // Delete beats Move on same clip — suppress the Move
372            (
373                EditOpType::DeleteClip { clip_id: l_id },
374                EditOpType::MoveClip { clip_id: r_id, .. },
375            ) if l_id == r_id => EditOpType::NoOp,
376
377            // Move beats Delete: suppress the Delete if local already moved it
378            (
379                EditOpType::MoveClip { clip_id: l_id, .. },
380                EditOpType::DeleteClip { clip_id: r_id },
381            ) if l_id == r_id => EditOpType::NoOp,
382
383            // Concurrent trims on same clip: local wins, suppress remote
384            (
385                EditOpType::TrimClip { clip_id: l_id, .. },
386                EditOpType::TrimClip { clip_id: r_id, .. },
387            ) if l_id == r_id => EditOpType::NoOp,
388
389            // InsertClip id collision: bump the incoming clip_id via a simple hash
390            (
391                EditOpType::InsertClip { clip_id: l_id, .. },
392                EditOpType::InsertClip {
393                    clip_id: r_id,
394                    track_index,
395                    start,
396                    duration,
397                },
398            ) if l_id == r_id => {
399                // Use a deterministic but distinct id
400                let new_id = r_id.wrapping_add(0x8000_0000);
401                EditOpType::InsertClip {
402                    clip_id: new_id,
403                    track_index: *track_index,
404                    start: *start,
405                    duration: *duration,
406                }
407            }
408
409            // Default: compose (keep remote as-is)
410            _ => remote.op_type.clone(),
411        };
412
413        Ok(EditOperation {
414            op_id: remote.op_id.clone(),
415            peer_id: remote.peer_id.clone(),
416            revision: remote.revision + 1,
417            op_type: transformed_type,
418            timestamp: remote.timestamp,
419        })
420    }
421}
422
423// ─────────────────────────────────────────────────────────────────────────────
424// Tests
425// ─────────────────────────────────────────────────────────────────────────────
426
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an operation authored by `peer` against revision `rev`.
    fn make_op(id: &str, peer: &str, rev: u64, op_type: EditOpType) -> EditOperation {
        EditOperation::new(id, String::from(peer), rev, op_type)
    }

    /// Fresh session named `"s1"` whose local peer is `"A"`.
    fn session_s1() -> CollabSession {
        CollabSession::new("s1", String::from("A"))
    }

    /// Shorthand for a move of `clip_id` to `new_start`.
    fn move_clip(clip_id: ClipId, new_start: i64) -> EditOpType {
        EditOpType::MoveClip { clip_id, new_start }
    }

    /// Shorthand for a trim of `clip_id` to the given in/out points.
    fn trim_clip(clip_id: ClipId, new_in: i64, new_out: i64) -> EditOpType {
        EditOpType::TrimClip {
            clip_id,
            new_in,
            new_out,
        }
    }

    #[test]
    fn test_new_session() {
        let fresh = CollabSession::new("session-1", String::from("peer-A"));

        assert_eq!(fresh.session_id, "session-1");
        assert_eq!(fresh.local_peer, "peer-A");
        assert_eq!(fresh.shared_state.revision(), 0);
        assert!(fresh.peers.is_empty());
    }

    #[test]
    fn test_add_remove_peer() {
        let mut collab = session_s1();
        for name in ["B", "C"] {
            collab.add_peer(name.to_owned());
        }
        assert_eq!(collab.peers.len(), 2);

        // Adding an already-known peer must not grow the list.
        collab.add_peer("B".to_owned());
        assert_eq!(collab.peers.len(), 2);

        let leaving: PeerId = "B".to_owned();
        collab.remove_peer(&leaving);
        assert_eq!(collab.peers.len(), 1);
        assert_eq!(collab.peers[0], "C");
    }

    #[test]
    fn test_apply_edit_increments_revision() {
        let mut collab = session_s1();
        let incoming = make_op("op1", "B", 0, move_clip(1, 500));

        collab
            .apply_edit(incoming)
            .expect("apply_edit should succeed");

        assert_eq!(collab.shared_state.revision(), 1);
    }

    #[test]
    fn test_broadcast_drains_events() {
        let mut collab = session_s1();
        collab.add_peer("B".to_owned());

        let drained = collab.broadcast();
        assert_eq!(drained.len(), 1);
        assert!(matches!(drained[0], CollabEditEvent::PeerJoined(_)));

        // Nothing is left once the queue has been drained.
        assert!(collab.broadcast().is_empty());
    }

    #[test]
    fn test_ot_delete_wins_over_move() {
        let mut collab = session_s1();
        let ours = make_op("l1", "A", 0, EditOpType::DeleteClip { clip_id: 42 });
        let theirs = make_op("r1", "B", 0, move_clip(42, 1000));

        let merged = collab.merge_concurrent(ours, theirs).expect("merge ok");

        assert!(merged.is_noop(), "delete should suppress concurrent move");
    }

    #[test]
    fn test_ot_concurrent_trims_suppressed() {
        let mut collab = session_s1();
        let ours = make_op("l1", "A", 5, trim_clip(7, 0, 100));
        let theirs = make_op("r1", "B", 5, trim_clip(7, 10, 90));

        let merged = collab.merge_concurrent(ours, theirs).expect("merge ok");

        assert!(
            merged.is_noop(),
            "local trim should suppress concurrent remote trim"
        );
    }

    #[test]
    fn test_ot_non_concurrent_ops_pass_through() {
        let mut collab = session_s1();
        let ours = make_op("l1", "A", 3, move_clip(1, 0));
        let theirs = make_op("r1", "B", 5, move_clip(1, 200));

        let merged = collab.merge_concurrent(ours, theirs).expect("merge ok");

        // Base revisions differ, so the remote operation comes back untouched.
        assert!(!merged.is_noop());
        assert_eq!(merged.revision, 5);
    }

    #[test]
    fn test_shared_edit_state_operations() {
        let mut log = SharedEditState::new();
        assert_eq!(log.revision(), 0);

        let placeholder = make_op("o1", "A", 0, EditOpType::NoOp);
        log.apply(&placeholder);

        assert_eq!(log.revision(), 1);
        assert_eq!(log.operations().len(), 1);
    }
}