vectis_crdt/
causal_buffer.rs1use crate::document::{Document, Operation};
21use crate::error::{VectisError, VectisResult};
22use crate::rga::StrokeId;
23use crate::types::OpId;
24
25const DEFAULT_MAX_CAPACITY: usize = 10_000;
27
28fn is_causally_ready(op: &Operation, doc: &Document) -> bool {
30 match op {
31 Operation::InsertStroke { origin_left, .. } => {
32 origin_left.is_zero() || doc.stroke_order.index.contains_key(origin_left)
33 }
34 Operation::DeleteStroke { target, .. } => {
35 doc.stroke_order.index.contains_key(target)
37 }
38 Operation::UpdateProperty { target, .. } => {
39 doc.stroke_store.contains(target)
40 }
41 Operation::UpdateMetadata { .. } => true,
42 }
43}
44
45pub struct CausalBuffer {
48 pending: Vec<Operation>,
49 max_capacity: usize,
50}
51
52impl CausalBuffer {
53 pub fn new() -> Self {
54 Self {
55 pending: Vec::new(),
56 max_capacity: DEFAULT_MAX_CAPACITY,
57 }
58 }
59
60 pub fn with_capacity(max_capacity: usize) -> Self {
61 Self { pending: Vec::new(), max_capacity }
62 }
63
64 #[inline]
66 pub fn len(&self) -> usize {
67 self.pending.len()
68 }
69
70 #[inline]
71 pub fn is_empty(&self) -> bool {
72 self.pending.is_empty()
73 }
74
75 pub fn push(&mut self, op: Operation) -> VectisResult<()> {
77 if self.pending.len() >= self.max_capacity {
78 return Err(VectisError::CausalBufferOverflow {
79 capacity: self.max_capacity,
80 });
81 }
82 self.pending.push(op);
83 Ok(())
84 }
85
86 pub fn drain_ready(&mut self, doc: &Document) -> Vec<Operation> {
92 let mut ready = Vec::new();
93 let mut still_pending = Vec::new();
94
95 for op in self.pending.drain(..) {
96 if is_causally_ready(&op, doc) {
97 ready.push(op);
98 } else {
99 still_pending.push(op);
100 }
101 }
102
103 self.pending = still_pending;
104 ready
105 }
106
107 pub fn pending_ids(&self) -> Vec<OpId> {
110 self.pending.iter().map(|op| op.id()).collect()
111 }
112}
113
114impl Default for CausalBuffer {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl Document {
123 pub fn apply_remote_buffered(
130 &mut self,
131 op: Operation,
132 buffer: &mut CausalBuffer,
133 ) -> VectisResult<Vec<StrokeId>> {
134 buffer.push(op)?;
135 let mut changed: Vec<StrokeId> = Vec::new();
136
137 loop {
138 let ready = buffer.drain_ready(self);
139 if ready.is_empty() {
140 break;
141 }
142 for op in ready {
143 if let Some(id) = self.apply_remote(op) {
144 if !changed.contains(&id) {
145 changed.push(id);
146 }
147 }
148 }
149 }
150
151 Ok(changed)
152 }
153}
154
155#[cfg(test)]
158mod tests {
159 use super::*;
160 use crate::document::{Document, Operation};
161 use crate::rga::{ItemState, RgaItem};
162 use crate::stroke::{StrokeData, StrokePoint, StrokeProperties, ToolKind};
163 use crate::types::{ActorId, LamportTs, OpId};
164
165 fn make_doc(actor: u64) -> Document {
166 Document::new(ActorId(actor))
167 }
168
169 fn simple_stroke() -> (StrokeData, StrokeProperties) {
170 let pts: Box<[StrokePoint]> = vec![StrokePoint::basic(1.0, 1.0)].into();
171 let data = StrokeData::new(pts, ToolKind::Pen);
172 let props = StrokeProperties::new(0xFF000000, 2.0, 1.0, OpId::ZERO);
173 (data, props)
174 }
175
176 #[test]
177 fn buffered_delete_waits_for_insert() {
178 let mut doc_src = make_doc(1);
179 let (data, props) = simple_stroke();
180 let stroke_id = doc_src.insert_stroke(data, props);
181 let mut ops: Vec<Operation> = std::mem::take(&mut doc_src.pending_ops);
182 doc_src.delete_stroke(stroke_id);
186 let del_op = doc_src.pending_ops.remove(0); let mut doc = make_doc(99);
190 let mut buf = CausalBuffer::new();
191
192 let changed = doc.apply_remote_buffered(del_op, &mut buf).unwrap();
194 assert!(changed.is_empty(), "delete can't apply without its insert");
195 assert_eq!(buf.len(), 1, "delete must be buffered");
196
197 let insert_op = ops.remove(0);
199 let changed = doc.apply_remote_buffered(insert_op, &mut buf).unwrap();
200 assert_eq!(buf.len(), 0, "buffer must be empty after insert");
201 assert!(doc.visible_stroke_ids().is_empty(), "stroke deleted");
202 }
203
204 #[test]
205 fn buffered_insert_waits_for_origin() {
206 let mut src = make_doc(1);
208 let (data_a, props_a) = simple_stroke();
209 let id_a = src.insert_stroke(data_a, props_a);
210 let op_a = src.pending_ops.remove(0);
211
212 let (data_b, props_b) = simple_stroke();
213 src.insert_stroke(data_b, props_b);
214 let op_b = src.pending_ops.remove(0); let mut doc = make_doc(99);
217 let mut buf = CausalBuffer::new();
218
219 let changed = doc.apply_remote_buffered(op_b, &mut buf).unwrap();
221 assert!(changed.is_empty());
222 assert_eq!(buf.len(), 1);
223
224 let changed = doc.apply_remote_buffered(op_a, &mut buf).unwrap();
226 assert_eq!(buf.len(), 0);
227 assert_eq!(doc.visible_stroke_ids().len(), 2);
228 let ids = doc.visible_stroke_ids();
230 assert_eq!(ids[0], id_a);
231 }
232
233 #[test]
234 fn buffer_overflow_returns_error() {
235 let mut buf = CausalBuffer::with_capacity(2);
236 let mut src = make_doc(1);
237
238 for _ in 0..3 {
239 let (data, props) = simple_stroke();
240 src.insert_stroke(data, props);
241 let op = src.pending_ops.remove(0);
242 let _ = buf.push(op); }
244
245 let mut src2 = make_doc(2);
247 let (data, props) = simple_stroke();
248 src2.insert_stroke(data, props);
249 let op = src2.pending_ops.remove(0);
250 let result = buf.push(op);
251 assert!(result.is_err());
252 }
253}