cola/replica.rs
1use core::ops::{Range, RangeBounds};
2
3use crate::panic_messages as panic;
4use crate::*;
5
6/// A CRDT for text.
7///
8/// Like all other text CRDTs it allows multiple peers on a distributed
9/// network to concurrently edit the same text document, making sure that they
10/// all converge to the same final state without relying on a central server to
11/// coordinate the edits.
12///
/// However, unlike many other CRDTs, a `Replica` doesn't actually store the
/// text contents itself. This makes it possible to decouple the text buffer
/// from the CRDT machinery needed to handle concurrent edits and guarantee
/// convergence.
16///
17/// Put another way, a `Replica` is a pure CRDT that doesn't know anything
18/// about where the text is actually stored. This is great because it makes it
19/// very easy to use it together with any text data structure of your choice:
20/// simple `String`s, gap buffers, piece tables, ropes, etc.
21///
22/// # How to distribute `Replica`s between peers.
23///
24/// When starting a new collaborative editing session, the first peer
25/// initializes its `Replica` via the [`new`](Self::new) method,
26/// [`encode`](Self::encode)s it and sends the result to the other peers in the
27/// session. If a new peer joins the session later on, one of the peers already
28/// in the session can [`encode`](Self::encode) their `Replica` and send it to
29/// them.
30///
31/// # How to integrate remote edits.
32///
33/// Every time a peer performs an edit on their local buffer they must inform
34/// their `Replica` by calling either [`inserted`](Self::inserted) or
35/// [`deleted`](Self::deleted). This produces [`Insertion`]s and [`Deletion`]s
36/// which can be sent over to the other peers using the network layer of your
37/// choice.
38///
39/// When a peer receives a remote `Insertion` or `Deletion` they can integrate
40/// it into their own `Replica` by calling either
41/// [`integrate_insertion`](Self::integrate_insertion) or
42/// [`integrate_deletion`](Self::integrate_deletion), respectively. The output
43/// of those methods tells the peer *where* in their local buffer they should
44/// apply the edit, taking into account all the other edits that have happened
45/// concurrently.
46///
47/// Basically, you tell your `Replica` how your buffer changes, and it tells
48/// you how your buffer *should* change when receiving remote edits.
pub struct Replica {
    /// The unique identifier of this replica.
    id: ReplicaId,

    /// Contains all the [`EditRun`]s that have been applied to this replica so
    /// far. This is the main data structure.
    run_tree: RunTree,

    /// The value of the Lamport clock at this replica.
    lamport_clock: LamportClock,

    /// A local clock that's incremented every time a new insertion run is
    /// created at this replica. If an insertion continues the current run the
    /// clock is not incremented.
    run_clock: RunClock,

    /// Contains the latest character timestamps of all the replicas that this
    /// replica has seen so far.
    version_map: VersionMap,

    /// Contains the latest deletion timestamps of all the replicas that this
    /// replica has seen so far. The entry of this replica is incremented
    /// every time a deletion is created here, while the entries of the other
    /// replicas are updated as their deletions get merged.
    deletion_map: DeletionMap,

    /// A collection of remote edits waiting to be merged.
    backlog: Backlog,
}
76
77impl Replica {
78 #[doc(hidden)]
79 pub fn assert_invariants(&self) {
80 self.run_tree.assert_invariants();
81 self.backlog.assert_invariants(&self.version_map, &self.deletion_map);
82 }
83
/// Returns the average inode occupancy reported by the underlying run
/// tree.
///
/// This is a thin wrapper around the run tree's own
/// `average_inode_occupancy` and is hidden from the public documentation.
#[doc(hidden)]
pub fn average_gtree_inode_occupancy(&self) -> f32 {
    self.run_tree.average_inode_occupancy()
}
88
/// Returns an iterator over the backlogged [`Deletion`]s that are now
/// ready to be applied to your buffer.
///
/// The [`integrate_deletion`](Replica::integrate_deletion) method is not
/// able to immediately produce the offset range(s) to be deleted if the
/// `Deletion` is itself dependent on some context that the `Replica`
/// doesn't yet have. When this happens the `Deletion` is stored in an
/// internal backlog of edits that can't be processed yet, but may be in
/// the future.
///
/// The [`BackloggedDeletions`] iterator yields the same kind of offset
/// ranges that [`integrate_deletion`](Replica::integrate_deletion) would
/// have produced had the `Deletion` been integrated right away.
///
/// It's very important for the ranges to be deleted in the exact same
/// order in which they were yielded by the iterator. If you don't, your
/// buffer could permanently diverge from the other peers.
///
/// # Examples
///
/// ```
/// # use cola::Replica;
/// // A buffer with the text "Hello" is replicated between three peers.
/// let mut replica1 = Replica::new(1, 5);
/// let mut replica2 = replica1.fork(2);
/// let mut replica3 = replica2.fork(3);
///
/// // Peer 1 inserts " world!" at the end of the buffer, and after
/// // integrating the insertion peer 2 deletes "world", leaving only
/// // "Hello!".
/// let insert_spc_world_excl = replica1.inserted(5, 7);
/// let _ = replica2.integrate_insertion(&insert_spc_world_excl);
/// let delete_world = replica2.deleted(5..11);
///
/// // Peer 3 receives the deletion, but it can't integrate it right away
/// // because it doesn't have the context it needs. The deletion is stored
/// // in the backlog.
/// let ranges = replica3.integrate_deletion(&delete_world);
///
/// assert!(ranges.is_empty());
///
/// // After peer 3 receives the " world!" insertion from peer 1 it can
/// // finally integrate the deletion.
/// let _ = replica3.integrate_insertion(&insert_spc_world_excl);
///
/// let mut deletions = replica3.backlogged_deletions();
/// assert_eq!(deletions.next(), Some(vec![5..11]));
/// assert_eq!(deletions.next(), None);
/// ```
#[inline]
pub fn backlogged_deletions(&mut self) -> BackloggedDeletions<'_> {
    BackloggedDeletions::from_replica(self)
}
142
/// Returns an iterator over the backlogged [`Insertion`]s that are now
/// ready to be applied to your buffer.
///
/// The [`integrate_insertion`](Replica::integrate_insertion) method is not
/// able to immediately produce an offset if the `Insertion` is itself
/// dependent on some context that the `Replica` doesn't yet have. When
/// this happens the `Insertion` is stored in an internal backlog of edits
/// that can't be processed yet, but may be in the future.
///
/// The [`BackloggedInsertions`] iterator yields `(Text, Length)` pairs
/// containing the [`Text`] to be inserted and the offset at which it
/// should be inserted.
///
/// It's very important for the insertions to be applied in the exact same
/// order in which they were yielded by the iterator. If you don't, your
/// buffer could permanently diverge from the other peers.
///
/// # Examples
///
/// ```
/// # use cola::Replica;
/// // The buffer at peer 1 is "ab".
/// let mut replica1 = Replica::new(1, 2);
///
/// // A second peer joins the session.
/// let mut replica2 = replica1.fork(2);
///
/// // Peer 1 inserts 'c', 'd' and 'e' at the end of the buffer.
/// let insert_c = replica1.inserted(2, 1);
/// let insert_d = replica1.inserted(3, 1);
/// let insert_e = replica1.inserted(4, 1);
///
/// // For some reason, the network layer messes up the order of the edits
/// // and they get to the second peer in the opposite order. Because each
/// // edit depends on the previous one, peer 2 can't merge the insertions
/// // of the 'd' and the 'e' until it sees the 'c'.
/// let none_e = replica2.integrate_insertion(&insert_e);
/// let none_d = replica2.integrate_insertion(&insert_d);
///
/// assert!(none_e.is_none());
/// assert!(none_d.is_none());
///
/// // Finally, peer 2 receives the 'c' and it's able to merge it right
/// // away.
/// let offset_c = replica2.integrate_insertion(&insert_c).unwrap();
///
/// assert_eq!(offset_c, 2);
///
/// // Peer 2 now has all the context it needs to merge the rest of the
/// // edits that were previously backlogged.
/// let mut backlogged = replica2.backlogged_insertions();
///
/// assert!(matches!(backlogged.next(), Some((_, 3))));
/// assert!(matches!(backlogged.next(), Some((_, 4))));
/// ```
#[inline]
pub fn backlogged_insertions(&mut self) -> BackloggedInsertions<'_> {
    BackloggedInsertions::from_replica(self)
}
201
/// Returns an exclusive reference to the [`Backlog`] holding the remote
/// edits that this `Replica` hasn't been able to merge yet.
#[inline]
pub(crate) fn backlog_mut(&mut self) -> &mut Backlog {
    &mut self.backlog
}
206
207 /// Returns `true` if this `Replica` is ready to merge the given
208 /// `Deletion`.
209 #[inline]
210 pub(crate) fn can_merge_deletion(&self, deletion: &Deletion) -> bool {
211 debug_assert!(!self.has_merged_deletion(deletion));
212
213 (
214 // Makes sure that we merge deletions in the same order they were
215 // created.
216 self.deletion_map.get(deletion.deleted_by()) + 1
217 == deletion.deletion_ts()
218 ) && (
219 // Makes sure that we have already merged all the insertions that
220 // the remote `Replica` had when it generated the deletion.
221 self.version_map >= *deletion.version_map()
222 )
223 }
224
225 /// Returns `true` if this `Replica` is ready to merge the given
226 /// `Insertion`.
227 #[inline]
228 pub(crate) fn can_merge_insertion(&self, insertion: &Insertion) -> bool {
229 debug_assert!(!self.has_merged_insertion(insertion));
230
231 (
232 // Makes sure that we merge insertions in the same order they were
233 // created.
234 //
235 // This is technically not needed to merge a single insertion (all
236 // that matters is that we know where to anchor the insertion), but
237 // it's needed to correctly increment the chararacter clock inside
238 // this `Replica`'s `VersionMap` without skipping any temporal
239 // range.
240 self.version_map.get(insertion.inserted_by()) == insertion.start()
241 ) && (
242 // Makes sure that we have already merged the insertion containing
243 // the anchor of this insertion.
244 self.version_map.get(insertion.anchor().replica_id())
245 >= insertion.anchor().character_ts()
246 )
247 }
248
249 #[doc(hidden)]
250 pub fn debug(&self) -> debug::DebugAsSelf<'_> {
251 self.into()
252 }
253
254 #[doc(hidden)]
255 pub fn debug_as_btree(&self) -> debug::DebugAsBtree<'_> {
256 self.into()
257 }
258
/// Decodes the contents of the given [`EncodedReplica`] into a new
/// `Replica` with the given [`ReplicaId`].
///
/// # Errors
///
/// Returns:
///
/// - `DecodeError::DifferentProtocol` if the `EncodedReplica` was encoded
///   with a protocol version different from the one used by this crate;
///
/// - `DecodeError::ChecksumFailed` if the checksum stored in the
///   `EncodedReplica` doesn't match the checksum of its bytes;
///
/// - `DecodeError::InvalidData` if the bytes couldn't be decoded.
///
/// # Panics
///
/// Panics if the [`ReplicaId`] is zero.
///
/// # Examples
///
/// ```
/// # use cola::{Replica, EncodedReplica};
/// let replica1 = Replica::new(1, 42);
///
/// let encoded: EncodedReplica = replica1.encode();
///
/// let replica2 = Replica::decode(2, &encoded).unwrap();
///
/// assert_eq!(replica2.id(), 2);
/// ```
#[cfg(feature = "encode")]
#[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
#[track_caller]
#[inline]
pub fn decode(
    id: ReplicaId,
    encoded: &EncodedReplica,
) -> Result<Self, DecodeError> {
    if id == 0 {
        panic::replica_id_is_zero();
    }

    // Refuse to decode anything that was produced by another version of
    // the protocol.
    let encoded_on = encoded.protocol_version();

    if encoded_on != PROTOCOL_VERSION {
        return Err(DecodeError::DifferentProtocol {
            encoded_on,
            decoding_on: PROTOCOL_VERSION,
        });
    }

    // The payload is only decoded once its checksum has been verified.
    if encoded.checksum() != &checksum(encoded.bytes()) {
        return Err(DecodeError::ChecksumFailed);
    }

    let (run_tree, lamport_clock, mut version_map, mut deletion_map, backlog) =
        encode::decode(encoded.bytes()).ok_or(DecodeError::InvalidData)?;

    // Both maps are forked in place with the id of the new replica and an
    // initial value of zero.
    version_map.fork_in_place(id, 0);
    deletion_map.fork_in_place(id, 0);

    Ok(Self {
        id,
        run_tree,
        // The run clock is not part of the encoded state.
        run_clock: RunClock::new(),
        lamport_clock,
        version_map,
        deletion_map,
        backlog,
    })
}
328
329 /// Informs the `Replica` that you have deleted the characters in the given
330 /// offset range.
331 ///
332 /// This produces a [`Deletion`] which can be sent to all the other peers
333 /// to integrate the deletion into their own `Replica`s.
334 ///
335 /// # Panics
336 ///
337 /// Panics if the start of the range is greater than the end or if the end
338 /// is out of bounds (i.e. greater than the current length of your buffer).
339 ///
340 /// # Examples
341 ///
342 /// ```
343 /// # use cola::{Replica, Deletion};
344 /// // The buffer at peer 1 is "Hello World".
345 /// let mut replica1 = Replica::new(1, 11);
346 ///
347 /// // Peer 1 deletes "Hello ".
348 /// let deletion: Deletion = replica1.deleted(..6);
349 /// ```
350 #[track_caller]
351 #[must_use]
352 #[inline]
353 pub fn deleted<R>(&mut self, range: R) -> Deletion
354 where
355 R: RangeBounds<Length>,
356 {
357 let (start, end) = range_bounds_to_start_end(range, 0, self.len());
358
359 if end > self.len() {
360 panic::offset_out_of_bounds(end, self.len());
361 }
362
363 if start > end {
364 panic::start_greater_than_end(start, end);
365 }
366
367 if start == end {
368 return Deletion::no_op();
369 }
370
371 let deleted_range = (start..end).into();
372
373 let (start, start_ts, end, end_ts) =
374 self.run_tree.delete(deleted_range);
375
376 *self.deletion_map.this_mut() += 1;
377
378 Deletion::new(
379 start,
380 start_ts,
381 end,
382 end_ts,
383 self.version_map.clone(),
384 self.deletion_map.this(),
385 )
386 }
387
/// Returns the pair of counts computed by the run tree's
/// `count_empty_leaves`.
///
/// NOTE(review): presumably `(empty leaves, total leaves)`, since
/// [`num_runs`](Replica::num_runs) reads the second element as the number
/// of runs; confirm against `RunTree::count_empty_leaves`.
#[doc(hidden)]
pub fn empty_leaves(&self) -> (usize, usize) {
    self.run_tree.count_empty_leaves()
}
392
393 /// Returns `true` if the given `Replica` shares the same document state as
394 /// this one.
395 ///
396 /// This is used in tests to make sure that an encode-decode roundtrip was
397 /// successful.
398 #[doc(hidden)]
399 pub fn eq_decoded(&self, other: &Self) -> bool {
400 self.run_tree == other.run_tree && self.backlog == other.backlog
401 }
402
/// Serializes this `Replica` into an [`EncodedReplica`].
///
/// The result can be sent to another peer over the network, who can then
/// turn it back into a `Replica` by calling [`decode`](Replica::decode).
/// The `EncodedReplica` stores the protocol version and a checksum of the
/// encoded bytes together with the bytes themselves.
///
/// If all the peers live in the same process there's no need to go
/// through an encode-decode roundtrip: simply [`fork`](Replica::fork) the
/// `Replica` instead.
#[cfg(feature = "encode")]
#[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
#[inline]
pub fn encode(&self) -> EncodedReplica {
    let encoded_bytes = encode::encode(self);

    // The checksum is computed before the bytes are moved into the
    // `EncodedReplica`.
    EncodedReplica::new(
        PROTOCOL_VERSION,
        checksum(&encoded_bytes),
        encoded_bytes,
    )
}
420
421 /// Creates a new `Replica` with the given [`ReplicaId`] but with the same
422 /// internal state as this one.
423 ///
424 /// Note that this method should be used when the collaborative session is
425 /// limited to a single process (e.g. multiple threads working on the same
426 /// document). If you want to collaborate across different processes or
427 /// machines you should [`encode`](Replica::encode) the `Replica` and send
428 /// the result to the other peers.
429 ///
430 /// # Panics
431 ///
432 /// Panics if the [`ReplicaId`] is zero.
433 ///
434 /// # Examples
435 ///
436 /// ```
437 /// # use cola::{Replica, ReplicaId};
438 /// let replica1 = Replica::new(1, 0);
439 /// let replica2 = replica1.fork(2);
440 /// assert_eq!(replica2.id(), 2)
441 /// ```
442 #[track_caller]
443 #[inline]
444 pub fn fork(&self, new_id: ReplicaId) -> Self {
445 if new_id == 0 {
446 panic::replica_id_is_zero();
447 }
448
449 Self {
450 id: new_id,
451 run_tree: self.run_tree.clone(),
452 run_clock: RunClock::new(),
453 lamport_clock: self.lamport_clock,
454 version_map: self.version_map.fork(new_id, 0),
455 deletion_map: self.deletion_map.fork(new_id, 0),
456 backlog: self.backlog.clone(),
457 }
458 }
459
460 /// Returns `true` if this `Replica` has already merged the given
461 /// `Deletion`.
462 #[inline]
463 fn has_merged_deletion(&self, deletion: &Deletion) -> bool {
464 self.deletion_map.get(deletion.deleted_by()) >= deletion.deletion_ts()
465 }
466
467 /// Returns `true` if this `Replica` has already merged the given
468 /// `Insertion`.
469 #[inline]
470 fn has_merged_insertion(&self, insertion: &Insertion) -> bool {
471 self.version_map.get(insertion.inserted_by()) > insertion.start()
472 }
473
/// Returns the id of this `Replica`.
///
/// This is the [`ReplicaId`] that was given to the constructor which
/// created this `Replica` ([`new`](Replica::new),
/// [`fork`](Replica::fork) or `decode`).
#[inline]
pub fn id(&self) -> ReplicaId {
    self.id
}
479
480 /// Informs the `Replica` that you have inserted `len` characters at the
481 /// given offset.
482 ///
483 /// This produces an [`Insertion`] which can be sent to all the other peers
484 /// to integrate the insertion into their own `Replica`s.
485 ///
486 /// # Panics
487 ///
488 /// Panics if the offset is out of bounds (i.e. greater than the current
489 /// length of your buffer).
490 ///
491 /// # Examples
492 ///
493 /// ```
494 /// # use cola::{Replica, Insertion};
495 /// // The buffer at peer 1 is "ab".
496 /// let mut replica1 = Replica::new(1, 2);
497 ///
498 /// // Peer 1 inserts two characters between the 'a' and the 'b'.
499 /// let insertion: Insertion = replica1.inserted(1, 2);
500 /// ```
501 #[track_caller]
502 #[must_use]
503 #[inline]
504 pub fn inserted(&mut self, at_offset: Length, len: Length) -> Insertion {
505 if at_offset > self.len() {
506 panic::offset_out_of_bounds(at_offset, self.len());
507 }
508
509 if len == 0 {
510 return Insertion::no_op();
511 }
512
513 let start = self.version_map.this();
514
515 *self.version_map.this_mut() += len;
516
517 let end = self.version_map.this();
518
519 let text = Text::new(self.id, start..end);
520
521 let (anchor, anchor_ts) = self.run_tree.insert(
522 at_offset,
523 text.clone(),
524 &mut self.run_clock,
525 &mut self.lamport_clock,
526 );
527
528 Insertion::new(
529 anchor,
530 anchor_ts,
531 text,
532 self.lamport_clock.highest(),
533 self.run_clock.last(),
534 )
535 }
536
/// Returns the length of the run tree.
///
/// [`inserted`](Replica::inserted) and [`deleted`](Replica::deleted) use
/// this value as the upper bound for the offsets they accept, i.e. they
/// treat it as the current length of your buffer.
#[allow(clippy::len_without_is_empty)]
#[doc(hidden)]
pub fn len(&self) -> Length {
    self.run_tree.len()
}
542
543 /// Integrates a remote [`Deletion`] into this `Replica`, returning a
544 /// sequence of offset [`Range`]s to be deleted from your buffer.
545 ///
546 /// The number of ranges can be:
547 ///
548 /// - zero, if the `Deletion` has already been integrated by this `Replica`
549 /// or if it depends on some context that this `Replica` doesn't yet have
550 /// (see the [`backlogged_deletions`](Replica::backlogged_deletions) method
551 /// which handles this case);
552 ///
553 /// - one, if there haven't been any concurrent insertions (local or
554 /// remote) within the original range of the deletion;
555 ///
556 /// - more than one, if there have been. In this case the deleted range has
557 /// been split into multiple smaller ranges that "skip over" the newly
558 /// inserted text.
559 ///
560 /// The ranges are guaranteed to be sorted in ascending order and to not
561 /// overlap, i.e. for any two indices `i` and `j` where `i < j` and `j <
562 /// ranges.len()` it holds that `ranges[i].end < ranges[j].start` (and of
563 /// course that `ranges[i].start < ranges[i].end`).
564 ///
565 /// # Examples
566 ///
567 /// ```
568 /// # use cola::Replica;
569 /// // Peer 1 starts with the text "abcd" and sends it to a second peer.
570 /// let mut replica1 = Replica::new(1, 4);
571 ///
572 /// let mut replica2 = replica1.fork(2);
573 ///
574 /// // Peer 1 deletes the "bc" in "abcd".
575 /// let deletion = replica1.deleted(1..3);
576 ///
577 /// // Concurrently, peer 2 inserts a single character at start of the
578 /// // document.
579 /// let _ = replica2.inserted(0, 1);
580 ///
581 /// // Now peer 2 receives the deletion from peer 1. Since the previous
582 /// // insertion was outside of the deleted region the latter is still
583 /// // contiguous at this peer.
584 /// let ranges = replica2.integrate_deletion(&deletion);
585 ///
586 /// assert_eq!(ranges.as_slice(), &[2..4]);
587 /// ```
588 ///
589 /// ```
590 /// # use cola::Replica;
591 /// // Same as before..
592 /// let mut replica1 = Replica::new(1, 4);
593 /// let mut replica2 = replica1.fork(2);
594 ///
595 /// let deletion = replica1.deleted(1..3);
596 ///
597 /// // ..except now peer 2 inserts a single character between the 'b' and
598 /// // the 'c'.
599 /// let _ = replica2.inserted(2, 1);
600 ///
601 /// // Now peer 2 receives the deletion from peer 1. Since the previous
602 /// // insertion was inside the deleted range, the latter has now been
603 /// // split into two separate ranges.
604 /// let ranges = replica2.integrate_deletion(&deletion);
605 ///
606 /// assert_eq!(ranges.as_slice(), &[1..2, 3..4]);
607 /// ```
608 #[must_use]
609 #[inline]
610 pub fn integrate_deletion(
611 &mut self,
612 deletion: &Deletion,
613 ) -> Vec<Range<Length>> {
614 if deletion.is_no_op() || self.has_merged_deletion(deletion) {
615 Vec::new()
616 } else if self.can_merge_deletion(deletion) {
617 self.merge_unchecked_deletion(deletion)
618 } else {
619 self.backlog.insert_deletion(deletion.clone());
620 Vec::new()
621 }
622 }
623
624 /// Integrates a remote [`Insertion`] into this `Replica`, optionally
625 /// returning the offset at which to insert the `Insertion`'s
626 /// [`Text`](Insertion::text) into your buffer.
627 ///
628 /// A `None` value can be returned if the `Insertion` has already been
629 /// integrated by this `Replica` or if it depends on some context that this
630 /// `Replica` doesn't yet have (see the
631 /// [`backlogged_insertions`](Replica::backlogged_insertions) method which
632 /// handles this case).
633 ///
634 /// # Examples
635 ///
636 /// ```
637 /// # use cola::{Replica, Insertion};
638 /// // Peer 1 starts with the text "ab" and sends it to a second peer.
639 /// let mut replica1 = Replica::new(1, 2);
640 ///
641 /// let mut replica2 = replica1.fork(2);
642 ///
643 /// // Peer 1 inserts two characters between the 'a' and the 'b'.
644 /// let insertion_1 = replica1.inserted(1, 2);
645 ///
646 /// // Concurrently, peer 2 inserts a character at the start of the
647 /// // document.
648 /// let insertion_2 = replica2.inserted(0, 1);
649 ///
650 /// // Peer 1 receives this insertion, and since there haven't been any
651 /// // concurrent insertions at the start of the document, its offset
652 /// // hasn't changed.
653 /// let offset_2 = replica1.integrate_insertion(&insertion_2).unwrap();
654 ///
655 /// assert_eq!(offset_2, 0);
656 ///
657 /// // If we try to integrate the same insertion again, we'll get a `None`.
658 /// assert!(replica1.integrate_insertion(&insertion_2).is_none());
659 ///
660 /// // Finally, peer 2 receives the first insertion from peer 1. Its text
661 /// // should be inserted between the 'a' and the 'b', which is at offset
662 /// // 2 at this peer.
663 /// let offset_1 = replica2.integrate_insertion(&insertion_1).unwrap();
664 ///
665 /// assert_eq!(offset_1, 2);
666 /// ```
667 #[must_use]
668 #[inline]
669 pub fn integrate_insertion(
670 &mut self,
671 insertion: &Insertion,
672 ) -> Option<Length> {
673 if insertion.is_no_op() || self.has_merged_insertion(insertion) {
674 None
675 } else if self.can_merge_insertion(insertion) {
676 Some(self.merge_unchecked_insertion(insertion))
677 } else {
678 self.backlog.insert_insertion(insertion.clone());
679 None
680 }
681 }
682
683 /// Merges the given [`Deletion`] without checking whether it can be
684 /// merged.
685 #[inline]
686 pub(crate) fn merge_unchecked_deletion(
687 &mut self,
688 deletion: &Deletion,
689 ) -> Vec<Range<Length>> {
690 debug_assert!(self.can_merge_deletion(deletion));
691
692 let ranges = self.run_tree.merge_deletion(deletion);
693
694 *self.deletion_map.get_mut(deletion.deleted_by()) =
695 deletion.deletion_ts();
696
697 ranges
698 }
699
700 /// Merges the given [`Insertion`] without checking whether it can be
701 /// merged.
702 #[inline]
703 pub(crate) fn merge_unchecked_insertion(
704 &mut self,
705 insertion: &Insertion,
706 ) -> Length {
707 debug_assert!(self.can_merge_insertion(insertion));
708
709 let offset = self.run_tree.merge_insertion(insertion);
710
711 *self.version_map.get_mut(insertion.inserted_by()) += insertion.len();
712
713 self.lamport_clock.merge(insertion.lamport_ts());
714
715 offset
716 }
717
718 /// Creates a new `Replica` with the given [`ReplicaId`] from the initial
719 /// [`Length`] of your buffer.
720 ///
721 /// Note that if you have multiple peers working on the same document you
722 /// should only use this constructor on the first peer, usually the one
723 /// that starts the collaboration session.
724 ///
725 /// The other peers should get their `Replica` from another `Replica`
726 /// already in the session by either:
727 ///
728 /// a) [`fork`](Replica::fork)ing it if the collaboration happens all in
729 /// the same process (e.g. a text editor with plugins running on separate
730 /// threads),
731 ///
732 /// b) [`encode`](Replica::encode)ing it and sending the result over the
733 /// network if the collaboration is between different processes or
734 /// machines.
735 ///
736 /// # Panics
737 ///
738 /// Panics if the [`ReplicaId`] is zero.
739 ///
740 /// # Examples
741 ///
742 /// ```
743 /// # use std::thread;
744 /// # use cola::Replica;
745 /// // A text editor initializes a new Replica on the main thread where the
746 /// // buffer is "foo".
747 /// let replica_main = Replica::new(1, 3);
748 ///
749 /// // It then starts a plugin on a separate thread and wants to give it a
750 /// // Replica to keep its buffer synchronized with the one on the main
751 /// // thread. It does *not* call `new()` again, but instead forks the
752 /// // existing Replica and sends it to the new thread.
753 /// let replica_plugin = replica_main.fork(2);
754 ///
755 /// thread::spawn(move || {
756 /// // The plugin can now use its Replica to exchange edits with the
757 /// // main thread.
758 /// println!("{replica_plugin:?}");
759 /// });
760 /// ```
761 #[track_caller]
762 #[inline]
763 pub fn new(id: ReplicaId, len: Length) -> Self {
764 if id == 0 {
765 panic::replica_id_is_zero();
766 }
767
768 let mut run_clock = RunClock::new();
769
770 let mut lamport_clock = LamportClock::new();
771
772 let initial_text = Text::new(id, 0..len);
773
774 let first_run =
775 EditRun::new(initial_text, run_clock.next(), lamport_clock.next());
776
777 let run_tree = RunTree::new(first_run);
778
779 Self {
780 id,
781 run_tree,
782 run_clock,
783 lamport_clock,
784 version_map: VersionMap::new(id, len),
785 deletion_map: DeletionMap::new(id, 0),
786 backlog: Backlog::new(),
787 }
788 }
789
790 #[doc(hidden)]
791 pub fn num_runs(&self) -> usize {
792 self.run_tree.count_empty_leaves().1
793 }
794}
795
796impl core::fmt::Debug for Replica {
797 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
798 struct DebugHexU64(u64);
799
800 impl core::fmt::Debug for DebugHexU64 {
801 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
802 write!(f, "{:x}", self.0)
803 }
804 }
805
806 // In the public Debug we just print the ReplicaId to avoid leaking
807 // our internals.
808 //
809 // During development the `Replica::debug()` method (which is public
810 // but hidden from the API) can be used to obtain a more useful
811 // representation.
812 f.debug_tuple("Replica").field(&DebugHexU64(self.id)).finish()
813 }
814}
815
/// The type of the timestamps produced by a [`LamportClock`].
pub type LamportTs = u64;

/// A distributed logical clock that makes it possible to tell whether a run
/// was already in the document when another run was inserted.
///
/// When that's the case, the [`LamportTs`] of the older run is guaranteed to
/// be strictly less than the [`LamportTs`] of the newer one.
///
/// See [this](https://en.wikipedia.org/wiki/Lamport_timestamp) for more.
#[derive(Copy, Clone)]
#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
pub struct LamportClock(LamportTs);

impl core::fmt::Debug for LamportClock {
    /// Formats the clock exactly like its inner timestamp.
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        self.0.fmt(f)
    }
}

impl LamportClock {
    /// Returns the highest timestamp that this clock has either produced or
    /// merged so far, or zero if it has never been advanced.
    #[inline]
    pub fn highest(&self) -> LamportTs {
        match self.0 {
            0 => 0,
            next_ts => next_ts - 1,
        }
    }

    /// Makes sure that the next timestamp produced by this clock is strictly
    /// greater than the given remote timestamp.
    #[inline]
    fn merge(&mut self, remote_ts: LamportTs) {
        self.0 = self.0.max(remote_ts + 1);
    }

    /// Creates a new clock whose first timestamp is zero.
    #[inline]
    fn new() -> Self {
        Self(0)
    }

    /// Returns the current timestamp and advances the clock by one.
    #[inline]
    pub fn next(&mut self) -> LamportTs {
        let current_ts = self.0;
        self.0 = current_ts + 1;
        current_ts
    }
}
860
/// The type of the timestamps produced by a [`RunClock`].
pub type RunTs = u64;

/// A local clock that's advanced every time a new insertion run is started.
#[derive(Copy, Clone)]
pub struct RunClock(RunTs);

impl core::fmt::Debug for RunClock {
    /// Formats the clock exactly like its inner timestamp.
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        self.0.fmt(f)
    }
}

impl RunClock {
    /// Returns the last timestamp produced by this clock, or zero if it has
    /// never been advanced.
    #[inline]
    fn last(&self) -> RunTs {
        match self.0 {
            0 => 0,
            next_ts => next_ts - 1,
        }
    }

    /// Creates a new clock whose first timestamp is zero.
    #[inline]
    fn new() -> Self {
        Self(0)
    }

    /// Returns the current timestamp and advances the clock by one.
    #[inline]
    pub fn next(&mut self) -> RunTs {
        let current_ts = self.0;
        self.0 = current_ts + 1;
        current_ts
    }
}
891
/// The type of the timestamps attached to deletions.
pub type DeletionTs = u64;
893
/// Encoding and decoding of the fields of a [`Replica`].
///
/// Every field is serialized on its own and written as a length prefix
/// followed by the serialized bytes. The prefix is always a little-endian
/// `u64`, so the format doesn't depend on the pointer width of the machine
/// that produced it.
#[cfg(feature = "encode")]
mod encode {
    use serde::{de, ser};

    use super::*;

    /// The number of bytes taken by the length prefix of every field.
    const LEN_PREFIX_BYTES: usize = core::mem::size_of::<u64>();

    /// The fields of a [`Replica`] that are part of its encoding, in the
    /// order in which they are written.
    type EncodedFields =
        (RunTree, LamportClock, VersionMap, DeletionMap, Backlog);

    /// Encodes the run tree, the Lamport clock, the version map, the deletion
    /// map and the backlog of the given `Replica`, in this order.
    ///
    /// Neither the id nor the run clock of the `Replica` are encoded.
    #[inline]
    pub(super) fn encode(replica: &Replica) -> Vec<u8> {
        let mut encoded = Vec::new();

        encode_field(&mut encoded, &replica.run_tree);
        encode_field(&mut encoded, &replica.lamport_clock);
        encode_field(&mut encoded, &replica.version_map);
        encode_field(&mut encoded, &replica.deletion_map);
        encode_field(&mut encoded, &replica.backlog);

        encoded
    }

    /// Decodes the fields written by [`encode`].
    ///
    /// Returns `None` if any of the fields can't be decoded or if there are
    /// bytes left over after the last field.
    #[inline]
    pub(super) fn decode(bytes: &[u8]) -> Option<EncodedFields> {
        let (run_tree, bytes) = decode_field(bytes)?;
        let (lamport_clock, bytes) = decode_field(bytes)?;
        let (version_map, bytes) = decode_field(bytes)?;
        let (deletion_map, bytes) = decode_field(bytes)?;
        let (backlog, bytes) = decode_field(bytes)?;

        if bytes.is_empty() {
            Some((run_tree, lamport_clock, version_map, deletion_map, backlog))
        } else {
            None
        }
    }

    /// Appends the given field to the buffer as a little-endian `u64` length
    /// prefix followed by the serialized field.
    #[inline]
    fn encode_field<T>(buf: &mut Vec<u8>, field: &T)
    where
        T: ser::Serialize,
    {
        let field_bytes = serialize(field);

        // The prefix has a fixed width of 8 bytes on every platform. A
        // `usize` is never wider than 64 bits on the targets supported by
        // Rust, so the cast can't truncate.
        let len_bytes = (field_bytes.len() as u64).to_le_bytes();

        buf.extend_from_slice(&len_bytes);
        buf.extend_from_slice(&field_bytes);
    }

    /// Decodes a single field written by [`encode_field`] from the start of
    /// the buffer, returning it together with the rest of the buffer.
    ///
    /// Returns `None` if the buffer is too short to contain the length prefix
    /// or the field itself, if the length doesn't fit in a `usize` or if the
    /// field can't be deserialized.
    #[inline]
    fn decode_field<'a, T>(buf: &'a [u8]) -> Option<(T, &'a [u8])>
    where
        T: de::Deserialize<'a>,
    {
        // The first 8 bytes represent the length of the encoded field.
        if buf.len() < LEN_PREFIX_BYTES {
            return None;
        }

        let (len_bytes, rest) = buf.split_at(LEN_PREFIX_BYTES);

        let len_bytes: [u8; LEN_PREFIX_BYTES] = len_bytes.try_into().ok()?;

        // On platforms where a `usize` is narrower than 64 bits a length that
        // doesn't fit can't possibly describe a slice of this buffer.
        let len = usize::try_from(u64::from_le_bytes(len_bytes)).ok()?;

        if rest.len() < len {
            return None;
        }

        let (encoded_field, rest) = rest.split_at(len);

        deserialize::<T>(encoded_field).map(|field| (field, rest))
    }

    /// Serializes the given value with `bincode`.
    ///
    /// # Panics
    ///
    /// Panics if the serialization fails.
    #[inline]
    fn serialize<T>(value: &T) -> Vec<u8>
    where
        T: ser::Serialize,
    {
        bincode::serialize(value).expect("failed to serialize")
    }

    /// Deserializes a value with `bincode`, returning `None` on failure.
    #[inline]
    fn deserialize<'a, T>(bytes: &'a [u8]) -> Option<T>
    where
        T: de::Deserialize<'a>,
    {
        bincode::deserialize(bytes).ok()
    }
}
983
984mod debug {
985 use core::fmt::Debug;
986
987 use super::*;
988
989 pub struct DebugAsSelf<'a>(BaseDebug<'a, run_tree::DebugAsSelf<'a>>);
990
991 impl<'a> From<&'a Replica> for DebugAsSelf<'a> {
992 #[inline]
993 fn from(replica: &'a Replica) -> DebugAsSelf<'a> {
994 let base = BaseDebug {
995 replica,
996 debug_run_tree: replica.run_tree.debug_as_self(),
997 };
998
999 Self(base)
1000 }
1001 }
1002
1003 impl<'a> core::fmt::Debug for DebugAsSelf<'a> {
1004 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
1005 self.0.fmt(f)
1006 }
1007 }
1008
1009 pub struct DebugAsBtree<'a>(BaseDebug<'a, run_tree::DebugAsBtree<'a>>);
1010
1011 impl<'a> From<&'a Replica> for DebugAsBtree<'a> {
1012 #[inline]
1013 fn from(replica: &'a Replica) -> DebugAsBtree<'a> {
1014 let base = BaseDebug {
1015 replica,
1016 debug_run_tree: replica.run_tree.debug_as_btree(),
1017 };
1018
1019 Self(base)
1020 }
1021 }
1022
1023 impl<'a> core::fmt::Debug for DebugAsBtree<'a> {
1024 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
1025 self.0.fmt(f)
1026 }
1027 }
1028
1029 struct BaseDebug<'a, T: Debug> {
1030 replica: &'a Replica,
1031 debug_run_tree: T,
1032 }
1033
1034 impl<'a, T: Debug> Debug for BaseDebug<'a, T> {
1035 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
1036 let replica = &self.replica;
1037
1038 f.debug_struct("Replica")
1039 .field("id", &replica.id)
1040 .field("run_tree", &self.debug_run_tree)
1041 .field("run_indices", &replica.run_tree.run_indices())
1042 .field("lamport_clock", &replica.lamport_clock)
1043 .field("run_clock", &replica.run_clock)
1044 .field("version_map", &replica.version_map)
1045 .field("deletion_map", &replica.deletion_map)
1046 .field("backlog", &replica.backlog)
1047 .finish()
1048 }
1049 }
1050}