Skip to main content

vectis_crdt/
causal_buffer.rs

1//! Causal ordering buffer.
2//!
3//! In a distributed system, operations can arrive out of causal order.
4//! Example: peer B inserts stroke X with `origin_left = Y.id`, but the
5//! InsertStroke(Y) hasn't arrived yet. Applying X immediately would place
6//! it at the wrong position (appended at the end instead of after Y).
7//!
8//! The CausalBuffer holds such "not yet ready" operations and re-tries them
9//! each time a new operation is successfully integrated.
10//!
11//! ## Causal readiness rules
12//!
13//! | Operation        | Ready when                                            |
14//! |------------------|-------------------------------------------------------|
15//! | InsertStroke     | origin_left is ZERO or in RGA index                   |
16//! | DeleteStroke     | target is in RGA index (insert was applied)           |
17//! | UpdateProperty   | target is in StrokeStore (insert was applied)         |
18//! | UpdateMetadata   | always ready                                          |
19
20use crate::document::{Document, Operation};
21use crate::error::{VectisError, VectisResult};
22use crate::rga::StrokeId;
23use crate::types::OpId;
24
25/// Maximum number of buffered operations before we declare a broken peer.
26const DEFAULT_MAX_CAPACITY: usize = 10_000;
27
28/// Checks whether `op` can be applied to `doc` right now.
29fn is_causally_ready(op: &Operation, doc: &Document) -> bool {
30    match op {
31        Operation::InsertStroke { origin_left, .. } => {
32            origin_left.is_zero() || doc.stroke_order.index.contains_key(origin_left)
33        }
34        Operation::DeleteStroke { target, .. } => {
35            // Apply even if it's already a tombstone (idempotent).
36            doc.stroke_order.index.contains_key(target)
37        }
38        Operation::UpdateProperty { target, .. } => {
39            doc.stroke_store.contains(target)
40        }
41        Operation::UpdateMetadata { .. } => true,
42    }
43}
44
45/// Holds operations that cannot be applied yet due to unresolved
46/// causal dependencies.
47pub struct CausalBuffer {
48    pending: Vec<Operation>,
49    max_capacity: usize,
50}
51
52impl CausalBuffer {
53    pub fn new() -> Self {
54        Self {
55            pending: Vec::new(),
56            max_capacity: DEFAULT_MAX_CAPACITY,
57        }
58    }
59
60    pub fn with_capacity(max_capacity: usize) -> Self {
61        Self { pending: Vec::new(), max_capacity }
62    }
63
64    /// Returns the number of buffered (pending) operations.
65    #[inline]
66    pub fn len(&self) -> usize {
67        self.pending.len()
68    }
69
70    #[inline]
71    pub fn is_empty(&self) -> bool {
72        self.pending.is_empty()
73    }
74
75    /// Enqueues an operation. Returns `Err` if the buffer is full.
76    pub fn push(&mut self, op: Operation) -> VectisResult<()> {
77        if self.pending.len() >= self.max_capacity {
78            return Err(VectisError::CausalBufferOverflow {
79                capacity: self.max_capacity,
80            });
81        }
82        self.pending.push(op);
83        Ok(())
84    }
85
86    /// Drains all operations that are causally ready given the current
87    /// document state. Leaves unready operations in the buffer.
88    ///
89    /// This must be called in a loop until it returns an empty Vec,
90    /// because applying one ready op may unblock others.
91    pub fn drain_ready(&mut self, doc: &Document) -> Vec<Operation> {
92        let mut ready = Vec::new();
93        let mut still_pending = Vec::new();
94
95        for op in self.pending.drain(..) {
96            if is_causally_ready(&op, doc) {
97                ready.push(op);
98            } else {
99                still_pending.push(op);
100            }
101        }
102
103        self.pending = still_pending;
104        ready
105    }
106
107    /// Returns the OpIds of all operations still waiting in the buffer.
108    /// Useful for diagnostics / debugging stuck ops.
109    pub fn pending_ids(&self) -> Vec<OpId> {
110        self.pending.iter().map(|op| op.id()).collect()
111    }
112}
113
114impl Default for CausalBuffer {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120// ─── Document integration ────────────────────────────────────────────────────
121
122impl Document {
123    /// Apply a remote operation through the causal buffer.
124    ///
125    /// Pushes `op` into the buffer, then repeatedly drains + applies
126    /// ready operations until no more can be unblocked.
127    ///
128    /// Returns the StrokeIds that were changed (for incremental re-render).
129    pub fn apply_remote_buffered(
130        &mut self,
131        op: Operation,
132        buffer: &mut CausalBuffer,
133    ) -> VectisResult<Vec<StrokeId>> {
134        buffer.push(op)?;
135        let mut changed: Vec<StrokeId> = Vec::new();
136
137        loop {
138            let ready = buffer.drain_ready(self);
139            if ready.is_empty() {
140                break;
141            }
142            for op in ready {
143                if let Some(id) = self.apply_remote(op) {
144                    if !changed.contains(&id) {
145                        changed.push(id);
146                    }
147                }
148            }
149        }
150
151        Ok(changed)
152    }
153}
154
155// ─── Tests ───────────────────────────────────────────────────────────────────
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160    use crate::document::{Document, Operation};
161    use crate::rga::{ItemState, RgaItem};
162    use crate::stroke::{StrokeData, StrokePoint, StrokeProperties, ToolKind};
163    use crate::types::{ActorId, LamportTs, OpId};
164
165    fn make_doc(actor: u64) -> Document {
166        Document::new(ActorId(actor))
167    }
168
169    fn simple_stroke() -> (StrokeData, StrokeProperties) {
170        let pts: Box<[StrokePoint]> = vec![StrokePoint::basic(1.0, 1.0)].into();
171        let data = StrokeData::new(pts, ToolKind::Pen);
172        let props = StrokeProperties::new(0xFF000000, 2.0, 1.0, OpId::ZERO);
173        (data, props)
174    }
175
176    #[test]
177    fn buffered_delete_waits_for_insert() {
178        let mut doc_src = make_doc(1);
179        let (data, props) = simple_stroke();
180        let stroke_id = doc_src.insert_stroke(data, props);
181        let mut ops: Vec<Operation> = std::mem::take(&mut doc_src.pending_ops);
182        // ops[0] = InsertStroke
183
184        // Also produce a delete op
185        doc_src.delete_stroke(stroke_id);
186        let del_op = doc_src.pending_ops.remove(0); // DeleteStroke
187
188        // Apply DELETE before INSERT to a fresh doc
189        let mut doc = make_doc(99);
190        let mut buf = CausalBuffer::new();
191
192        // Delete arrives first — must be buffered
193        let changed = doc.apply_remote_buffered(del_op, &mut buf).unwrap();
194        assert!(changed.is_empty(), "delete can't apply without its insert");
195        assert_eq!(buf.len(), 1, "delete must be buffered");
196
197        // Now insert arrives — both should apply
198        let insert_op = ops.remove(0);
199        let changed = doc.apply_remote_buffered(insert_op, &mut buf).unwrap();
200        assert_eq!(buf.len(), 0, "buffer must be empty after insert");
201        assert!(doc.visible_stroke_ids().is_empty(), "stroke deleted");
202    }
203
204    #[test]
205    fn buffered_insert_waits_for_origin() {
206        // B.origin_left = A.id, but A arrives after B
207        let mut src = make_doc(1);
208        let (data_a, props_a) = simple_stroke();
209        let id_a = src.insert_stroke(data_a, props_a);
210        let op_a = src.pending_ops.remove(0);
211
212        let (data_b, props_b) = simple_stroke();
213        src.insert_stroke(data_b, props_b);
214        let op_b = src.pending_ops.remove(0); // origin_left = id_a
215
216        let mut doc = make_doc(99);
217        let mut buf = CausalBuffer::new();
218
219        // B arrives first
220        let changed = doc.apply_remote_buffered(op_b, &mut buf).unwrap();
221        assert!(changed.is_empty());
222        assert_eq!(buf.len(), 1);
223
224        // A arrives — both should now apply
225        let changed = doc.apply_remote_buffered(op_a, &mut buf).unwrap();
226        assert_eq!(buf.len(), 0);
227        assert_eq!(doc.visible_stroke_ids().len(), 2);
228        // A must come before B in z-order
229        let ids = doc.visible_stroke_ids();
230        assert_eq!(ids[0], id_a);
231    }
232
233    #[test]
234    fn buffer_overflow_returns_error() {
235        let mut buf = CausalBuffer::with_capacity(2);
236        let mut src = make_doc(1);
237
238        for _ in 0..3 {
239            let (data, props) = simple_stroke();
240            src.insert_stroke(data, props);
241            let op = src.pending_ops.remove(0);
242            let _ = buf.push(op); // we don't check each result here
243        }
244
245        // The 3rd push should fail
246        let mut src2 = make_doc(2);
247        let (data, props) = simple_stroke();
248        src2.insert_stroke(data, props);
249        let op = src2.pending_ops.remove(0);
250        let result = buf.push(op);
251        assert!(result.is_err());
252    }
253}