cola/
replica.rs

1use core::ops::{Range, RangeBounds};
2
3use crate::panic_messages as panic;
4use crate::*;
5
/// A CRDT for text.
///
/// Like all other text CRDTs it allows multiple peers on a distributed
/// network to concurrently edit the same text document, making sure that they
/// all converge to the same final state without relying on a central server to
/// coordinate the edits.
///
/// However, unlike many other CRDTs, a `Replica` doesn't actually store the
/// text contents itself. This decouples the text buffer from the CRDT
/// machinery needed to handle concurrent edits and guarantee convergence.
///
/// Put another way, a `Replica` is a pure CRDT that doesn't know anything
/// about where the text is actually stored. This is great because it makes it
/// very easy to use it together with any text data structure of your choice:
/// simple `String`s, gap buffers, piece tables, ropes, etc.
///
/// # How to distribute `Replica`s between peers.
///
/// When starting a new collaborative editing session, the first peer
/// initializes its `Replica` via the [`new`](Self::new) method,
/// [`encode`](Self::encode)s it and sends the result to the other peers in the
/// session. If a new peer joins the session later on, one of the peers already
/// in the session can [`encode`](Self::encode) their `Replica` and send it to
/// them.
///
/// # How to integrate remote edits.
///
/// Every time a peer performs an edit on their local buffer they must inform
/// their `Replica` by calling either [`inserted`](Self::inserted) or
/// [`deleted`](Self::deleted). This produces [`Insertion`]s and [`Deletion`]s
/// which can be sent over to the other peers using the network layer of your
/// choice.
///
/// When a peer receives a remote `Insertion` or `Deletion` they can integrate
/// it into their own `Replica` by calling either
/// [`integrate_insertion`](Self::integrate_insertion) or
/// [`integrate_deletion`](Self::integrate_deletion), respectively. The output
/// of those methods tells the peer *where* in their local buffer they should
/// apply the edit, taking into account all the other edits that have happened
/// concurrently.
///
/// Basically, you tell your `Replica` how your buffer changes, and it tells
/// you how your buffer *should* change when receiving remote edits.
pub struct Replica {
    /// The unique identifier of this replica.
    id: ReplicaId,

    /// Contains all the [`EditRun`]s that have been applied to this replica so
    /// far. This is the main data structure.
    run_tree: RunTree,

    /// The value of the Lamport clock at this replica.
    lamport_clock: LamportClock,

    /// A local clock that's incremented every time a new insertion run is
    /// created at this replica. If an insertion continues the current run the
    /// clock is not incremented.
    run_clock: RunClock,

    /// Contains the latest character timestamps of all the replicas that this
    /// replica has seen so far.
    version_map: VersionMap,

    /// Contains the latest deletion timestamps of all the replicas that this
    /// replica has seen so far. The entry of this replica is incremented by
    /// one every time a (non-empty) deletion is performed locally.
    deletion_map: DeletionMap,

    /// A collection of remote edits waiting to be merged.
    backlog: Backlog,
}
76
77impl Replica {
    /// Runs the invariant checks of the run tree and of the backlog, the
    /// latter being checked against this `Replica`'s version map and deletion
    /// map.
    ///
    /// NOTE(review): presumably panics if an invariant doesn't hold, as the
    /// name suggests; confirm against the callees.
    #[doc(hidden)]
    pub fn assert_invariants(&self) {
        self.run_tree.assert_invariants();
        self.backlog.assert_invariants(&self.version_map, &self.deletion_map);
    }
83
    /// Returns the average inode occupancy reported by the run tree.
    ///
    /// NOTE(review): presumably only meant as a diagnostic for tests and
    /// benchmarks (the method is hidden from the public docs); confirm.
    #[doc(hidden)]
    pub fn average_gtree_inode_occupancy(&self) -> f32 {
        self.run_tree.average_inode_occupancy()
    }
88
    /// The [`integrate_deletion`](Replica::integrate_deletion) method is not
    /// able to immediately produce the offset range(s) to be deleted if the
    /// `Deletion` is itself dependent on some context that the `Replica`
    /// doesn't yet have. When this happens the `Deletion` is stored in an
    /// internal backlog of edits that can't be processed yet, but may be in
    /// the future.
    ///
    /// This method returns an iterator over all the backlogged deletions
    /// which are now ready to be applied to your buffer.
    ///
    /// The [`BackloggedDeletions`] iterator yields the same kind of offset
    /// ranges that [`integrate_deletion`](Replica::integrate_deletion) would
    /// have produced had the `Deletion` been integrated right away.
    ///
    /// It's very important for the ranges to be deleted in the exact same
    /// order in which they were yielded by the iterator. If you don't, your
    /// buffer could permanently diverge from the other peers.
    ///
    /// # Examples
    ///
    /// ```
    /// # use cola::Replica;
    /// // A buffer with the text "Hello" is replicated between three peers.
    /// let mut replica1 = Replica::new(1, 5);
    /// let mut replica2 = replica1.fork(2);
    /// let mut replica3 = replica2.fork(3);
    ///
    /// // Peer 1 inserts " world!" at the end of the buffer, and after
    /// // integrating the insertion peer 2 deletes "world", leaving only
    /// // "Hello!".
    /// let insert_spc_world_excl = replica1.inserted(5, 7);
    /// let _ = replica2.integrate_insertion(&insert_spc_world_excl);
    /// let delete_world = replica2.deleted(5..11);
    ///
    /// // Peer 3 receives the deletion, but it can't integrate it right away
    /// // because it doesn't have the context it needs. The deletion is stored
    /// // in the backlog.
    /// let ranges = replica3.integrate_deletion(&delete_world);
    ///
    /// assert!(ranges.is_empty());
    ///
    /// // After peer 3 receives the " world!" insertion from peer 1 it can
    /// // finally integrate the deletion.
    /// let _ = replica3.integrate_insertion(&insert_spc_world_excl);
    ///
    /// let mut deletions = replica3.backlogged_deletions();
    /// assert_eq!(deletions.next(), Some(vec![5..11]));
    /// assert_eq!(deletions.next(), None);
    /// ```
    #[inline]
    pub fn backlogged_deletions(&mut self) -> BackloggedDeletions<'_> {
        BackloggedDeletions::from_replica(self)
    }
142
    /// The [`integrate_insertion`](Replica::integrate_insertion) method is not
    /// able to immediately produce an offset if the `Insertion` is itself
    /// dependent on some context that the `Replica` doesn't yet have. When
    /// this happens the `Insertion` is stored in an internal backlog of edits
    /// that can't be processed yet, but may be in the future.
    ///
    /// This method returns an iterator over all the backlogged insertions
    /// which are now ready to be applied to your buffer.
    ///
    /// The [`BackloggedInsertions`] iterator yields `(Text, Length)` pairs
    /// containing the [`Text`] to be inserted and the offset at which it
    /// should be inserted.
    ///
    /// It's very important for the insertions to be applied in the exact same
    /// order in which they were yielded by the iterator. If you don't, your
    /// buffer could permanently diverge from the other peers.
    ///
    /// # Examples
    ///
    /// ```
    /// # use cola::Replica;
    /// // The buffer at peer 1 is "ab".
    /// let mut replica1 = Replica::new(1, 2);
    ///
    /// // A second peer joins the session.
    /// let mut replica2 = replica1.fork(2);
    ///
    /// // Peer 1 inserts 'c', 'd' and 'e' at the end of the buffer.
    /// let insert_c = replica1.inserted(2, 1);
    /// let insert_d = replica1.inserted(3, 1);
    /// let insert_e = replica1.inserted(4, 1);
    ///
    /// // For some reason, the network layer messes up the order of the edits
    /// // and they get to the second peer in the opposite order. Because each
    /// // edit depends on the previous one, peer 2 can't merge the insertions
    /// // of the 'd' and the 'e' until it sees the 'c'.
    /// let none_e = replica2.integrate_insertion(&insert_e);
    /// let none_d = replica2.integrate_insertion(&insert_d);
    ///
    /// assert!(none_e.is_none());
    /// assert!(none_d.is_none());
    ///
    /// // Finally, peer 2 receives the 'c' and it's able to merge it right
    /// // away.
    /// let offset_c = replica2.integrate_insertion(&insert_c).unwrap();
    ///
    /// assert_eq!(offset_c, 2);
    ///
    /// // Peer 2 now has all the context it needs to merge the rest of the
    /// // edits that were previously backlogged.
    /// let mut backlogged = replica2.backlogged_insertions();
    ///
    /// assert!(matches!(backlogged.next(), Some((_, 3))));
    /// assert!(matches!(backlogged.next(), Some((_, 4))));
    /// ```
    #[inline]
    pub fn backlogged_insertions(&mut self) -> BackloggedInsertions<'_> {
        BackloggedInsertions::from_replica(self)
    }
201
    /// Returns an exclusive reference to the backlog of remote edits that are
    /// waiting to be merged into this `Replica`.
    #[inline]
    pub(crate) fn backlog_mut(&mut self) -> &mut Backlog {
        &mut self.backlog
    }
206
207    /// Returns `true` if this `Replica` is ready to merge the given
208    /// `Deletion`.
209    #[inline]
210    pub(crate) fn can_merge_deletion(&self, deletion: &Deletion) -> bool {
211        debug_assert!(!self.has_merged_deletion(deletion));
212
213        (
214            // Makes sure that we merge deletions in the same order they were
215            // created.
216            self.deletion_map.get(deletion.deleted_by()) + 1
217                == deletion.deletion_ts()
218        ) && (
219            // Makes sure that we have already merged all the insertions that
220            // the remote `Replica` had when it generated the deletion.
221            self.version_map >= *deletion.version_map()
222        )
223    }
224
225    /// Returns `true` if this `Replica` is ready to merge the given
226    /// `Insertion`.
227    #[inline]
228    pub(crate) fn can_merge_insertion(&self, insertion: &Insertion) -> bool {
229        debug_assert!(!self.has_merged_insertion(insertion));
230
231        (
232            // Makes sure that we merge insertions in the same order they were
233            // created.
234            //
235            // This is technically not needed to merge a single insertion (all
236            // that matters is that we know where to anchor the insertion), but
237            // it's needed to correctly increment the chararacter clock inside
238            // this `Replica`'s `VersionMap` without skipping any temporal
239            // range.
240            self.version_map.get(insertion.inserted_by()) == insertion.start()
241        ) && (
242            // Makes sure that we have already merged the insertion containing
243            // the anchor of this insertion.
244            self.version_map.get(insertion.anchor().replica_id())
245                >= insertion.anchor().character_ts()
246        )
247    }
248
    /// Converts a reference to this `Replica` into a
    /// `debug::DebugAsSelf` wrapper.
    ///
    /// NOTE(review): presumably the wrapper's `Debug` output exposes the
    /// internals that the public `Debug` impl of `Replica` hides; confirm
    /// against the `debug` module.
    #[doc(hidden)]
    pub fn debug(&self) -> debug::DebugAsSelf<'_> {
        self.into()
    }
253
    /// Converts a reference to this `Replica` into a
    /// `debug::DebugAsBtree` wrapper.
    ///
    /// NOTE(review): presumably the wrapper's `Debug` output shows the run
    /// tree laid out as a B-tree; confirm against the `debug` module.
    #[doc(hidden)]
    pub fn debug_as_btree(&self) -> debug::DebugAsBtree<'_> {
        self.into()
    }
258
259    /// Creates a new `Replica` with the given [`ReplicaId`] by decoding the
260    /// contents of the [`EncodedReplica`].
261    ///
262    /// # Panics
263    ///
264    /// Panics if the [`ReplicaId`] is zero.
265    ///
266    /// # Examples
267    ///
268    /// ```
269    /// # use cola::{Replica, EncodedReplica};
270    /// let replica1 = Replica::new(1, 42);
271    ///
272    /// let encoded: EncodedReplica = replica1.encode();
273    ///
274    /// let replica2 = Replica::decode(2, &encoded).unwrap();
275    ///
276    /// assert_eq!(replica2.id(), 2);
277    /// ```
278    #[cfg(feature = "encode")]
279    #[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
280    #[track_caller]
281    #[inline]
282    pub fn decode(
283        id: ReplicaId,
284        encoded: &EncodedReplica,
285    ) -> Result<Self, DecodeError> {
286        if id == 0 {
287            panic::replica_id_is_zero();
288        }
289
290        if encoded.protocol_version() != PROTOCOL_VERSION {
291            return Err(DecodeError::DifferentProtocol {
292                encoded_on: encoded.protocol_version(),
293                decoding_on: PROTOCOL_VERSION,
294            });
295        }
296
297        if encoded.checksum() != &checksum(encoded.bytes()) {
298            return Err(DecodeError::ChecksumFailed);
299        }
300
301        let Some((
302            run_tree,
303            lamport_clock,
304            mut version_map,
305            mut deletion_map,
306            backlog,
307        )) = encode::decode(encoded.bytes())
308        else {
309            return Err(DecodeError::InvalidData);
310        };
311
312        version_map.fork_in_place(id, 0);
313
314        deletion_map.fork_in_place(id, 0);
315
316        let replica = Self {
317            id,
318            run_tree,
319            run_clock: RunClock::new(),
320            lamport_clock,
321            version_map,
322            deletion_map,
323            backlog,
324        };
325
326        Ok(replica)
327    }
328
329    /// Informs the `Replica` that you have deleted the characters in the given
330    /// offset range.
331    ///
332    /// This produces a [`Deletion`] which can be sent to all the other peers
333    /// to integrate the deletion into their own `Replica`s.
334    ///
335    /// # Panics
336    ///
337    /// Panics if the start of the range is greater than the end or if the end
338    /// is out of bounds (i.e. greater than the current length of your buffer).
339    ///
340    /// # Examples
341    ///
342    /// ```
343    /// # use cola::{Replica, Deletion};
344    /// // The buffer at peer 1 is "Hello World".
345    /// let mut replica1 = Replica::new(1, 11);
346    ///
347    /// // Peer 1 deletes "Hello ".
348    /// let deletion: Deletion = replica1.deleted(..6);
349    /// ```
350    #[track_caller]
351    #[must_use]
352    #[inline]
353    pub fn deleted<R>(&mut self, range: R) -> Deletion
354    where
355        R: RangeBounds<Length>,
356    {
357        let (start, end) = range_bounds_to_start_end(range, 0, self.len());
358
359        if end > self.len() {
360            panic::offset_out_of_bounds(end, self.len());
361        }
362
363        if start > end {
364            panic::start_greater_than_end(start, end);
365        }
366
367        if start == end {
368            return Deletion::no_op();
369        }
370
371        let deleted_range = (start..end).into();
372
373        let (start, start_ts, end, end_ts) =
374            self.run_tree.delete(deleted_range);
375
376        *self.deletion_map.this_mut() += 1;
377
378        Deletion::new(
379            start,
380            start_ts,
381            end,
382            end_ts,
383            self.version_map.clone(),
384            self.deletion_map.this(),
385        )
386    }
387
    /// Returns the pair of counters computed by the run tree's
    /// `count_empty_leaves`.
    ///
    /// NOTE(review): presumably `(number of empty leaves, total number of
    /// leaves)`; confirm against `RunTree::count_empty_leaves`.
    #[doc(hidden)]
    pub fn empty_leaves(&self) -> (usize, usize) {
        self.run_tree.count_empty_leaves()
    }
392
393    /// Returns `true` if the given `Replica` shares the same document state as
394    /// this one.
395    ///
396    /// This is used in tests to make sure that an encode-decode roundtrip was
397    /// successful.
398    #[doc(hidden)]
399    pub fn eq_decoded(&self, other: &Self) -> bool {
400        self.run_tree == other.run_tree && self.backlog == other.backlog
401    }
402
403    /// Encodes the `Replica` in a custom binary format.
404    ///
405    /// This can be used to send a `Replica` to another peer over the network.
406    /// Once they have received the [`EncodedReplica`] they can decode it via
407    /// the [`decode`](Replica::decode) method.
408    ///
409    /// Note that if you want to collaborate within a single process you can
410    /// just [`fork`](Replica::fork) the `Replica` without having to encode it
411    /// and decode it again.
412    #[cfg(feature = "encode")]
413    #[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
414    #[inline]
415    pub fn encode(&self) -> EncodedReplica {
416        let bytes = encode::encode(self);
417        let checksum = checksum(&bytes);
418        EncodedReplica::new(PROTOCOL_VERSION, checksum, bytes)
419    }
420
421    /// Creates a new `Replica` with the given [`ReplicaId`] but with the same
422    /// internal state as this one.
423    ///
424    /// Note that this method should be used when the collaborative session is
425    /// limited to a single process (e.g. multiple threads working on the same
426    /// document). If you want to collaborate across different processes or
427    /// machines you should [`encode`](Replica::encode) the `Replica` and send
428    /// the result to the other peers.
429    ///
430    /// # Panics
431    ///
432    /// Panics if the [`ReplicaId`] is zero.
433    ///
434    /// # Examples
435    ///
436    /// ```
437    /// # use cola::{Replica, ReplicaId};
438    /// let replica1 = Replica::new(1, 0);
439    /// let replica2 = replica1.fork(2);
440    /// assert_eq!(replica2.id(), 2)
441    /// ```
442    #[track_caller]
443    #[inline]
444    pub fn fork(&self, new_id: ReplicaId) -> Self {
445        if new_id == 0 {
446            panic::replica_id_is_zero();
447        }
448
449        Self {
450            id: new_id,
451            run_tree: self.run_tree.clone(),
452            run_clock: RunClock::new(),
453            lamport_clock: self.lamport_clock,
454            version_map: self.version_map.fork(new_id, 0),
455            deletion_map: self.deletion_map.fork(new_id, 0),
456            backlog: self.backlog.clone(),
457        }
458    }
459
460    /// Returns `true` if this `Replica` has already merged the given
461    /// `Deletion`.
462    #[inline]
463    fn has_merged_deletion(&self, deletion: &Deletion) -> bool {
464        self.deletion_map.get(deletion.deleted_by()) >= deletion.deletion_ts()
465    }
466
467    /// Returns `true` if this `Replica` has already merged the given
468    /// `Insertion`.
469    #[inline]
470    fn has_merged_insertion(&self, insertion: &Insertion) -> bool {
471        self.version_map.get(insertion.inserted_by()) > insertion.start()
472    }
473
    /// Returns the [`ReplicaId`] this `Replica` was given when it was created
    /// via [`new`](Replica::new), [`fork`](Replica::fork) or `decode`.
    #[inline]
    pub fn id(&self) -> ReplicaId {
        self.id
    }
479
480    /// Informs the `Replica` that you have inserted `len` characters at the
481    /// given offset.
482    ///
483    /// This produces an [`Insertion`] which can be sent to all the other peers
484    /// to integrate the insertion into their own `Replica`s.
485    ///
486    /// # Panics
487    ///
488    /// Panics if the offset is out of bounds (i.e. greater than the current
489    /// length of your buffer).
490    ///
491    /// # Examples
492    ///
493    /// ```
494    /// # use cola::{Replica, Insertion};
495    /// // The buffer at peer 1 is "ab".
496    /// let mut replica1 = Replica::new(1, 2);
497    ///
498    /// // Peer 1 inserts two characters between the 'a' and the 'b'.
499    /// let insertion: Insertion = replica1.inserted(1, 2);
500    /// ```
501    #[track_caller]
502    #[must_use]
503    #[inline]
504    pub fn inserted(&mut self, at_offset: Length, len: Length) -> Insertion {
505        if at_offset > self.len() {
506            panic::offset_out_of_bounds(at_offset, self.len());
507        }
508
509        if len == 0 {
510            return Insertion::no_op();
511        }
512
513        let start = self.version_map.this();
514
515        *self.version_map.this_mut() += len;
516
517        let end = self.version_map.this();
518
519        let text = Text::new(self.id, start..end);
520
521        let (anchor, anchor_ts) = self.run_tree.insert(
522            at_offset,
523            text.clone(),
524            &mut self.run_clock,
525            &mut self.lamport_clock,
526        );
527
528        Insertion::new(
529            anchor,
530            anchor_ts,
531            text,
532            self.lamport_clock.highest(),
533            self.run_clock.last(),
534        )
535    }
536
    /// Returns the length of the run tree.
    ///
    /// This is the value that the offsets passed to
    /// [`inserted`](Replica::inserted) and [`deleted`](Replica::deleted) are
    /// bounds-checked against, i.e. what this `Replica` considers to be the
    /// current length of your buffer.
    // No `is_empty` counterpart is provided, hence the lint exception.
    #[allow(clippy::len_without_is_empty)]
    #[doc(hidden)]
    pub fn len(&self) -> Length {
        self.run_tree.len()
    }
542
543    /// Integrates a remote [`Deletion`] into this `Replica`, returning a
544    /// sequence of offset [`Range`]s to be deleted from your buffer.
545    ///
546    /// The number of ranges can be:
547    ///
548    /// - zero, if the `Deletion` has already been integrated by this `Replica`
549    /// or if it depends on some context that this `Replica` doesn't yet have
550    /// (see the [`backlogged_deletions`](Replica::backlogged_deletions) method
551    /// which handles this case);
552    ///
553    /// - one, if there haven't been any concurrent insertions (local or
554    /// remote) within the original range of the deletion;
555    ///
556    /// - more than one, if there have been. In this case the deleted range has
557    /// been split into multiple smaller ranges that "skip over" the newly
558    /// inserted text.
559    ///
560    /// The ranges are guaranteed to be sorted in ascending order and to not
561    /// overlap, i.e. for any two indices `i` and `j` where `i < j` and `j <
562    /// ranges.len()` it holds that `ranges[i].end < ranges[j].start` (and of
563    /// course that `ranges[i].start < ranges[i].end`).
564    ///
565    /// # Examples
566    ///
567    /// ```
568    /// # use cola::Replica;
569    /// // Peer 1 starts with the text "abcd" and sends it to a second peer.
570    /// let mut replica1 = Replica::new(1, 4);
571    ///
572    /// let mut replica2 = replica1.fork(2);
573    ///
574    /// // Peer 1 deletes the "bc" in "abcd".
575    /// let deletion = replica1.deleted(1..3);
576    ///
577    /// // Concurrently, peer 2 inserts a single character at start of the
578    /// // document.
579    /// let _ = replica2.inserted(0, 1);
580    ///
581    /// // Now peer 2 receives the deletion from peer 1. Since the previous
582    /// // insertion was outside of the deleted region the latter is still
583    /// // contiguous at this peer.
584    /// let ranges = replica2.integrate_deletion(&deletion);
585    ///
586    /// assert_eq!(ranges.as_slice(), &[2..4]);
587    /// ```
588    ///
589    /// ```
590    /// # use cola::Replica;
591    /// // Same as before..
592    /// let mut replica1 = Replica::new(1, 4);
593    /// let mut replica2 = replica1.fork(2);
594    ///
595    /// let deletion = replica1.deleted(1..3);
596    ///
597    /// // ..except now peer 2 inserts a single character between the 'b' and
598    /// // the 'c'.
599    /// let _ = replica2.inserted(2, 1);
600    ///
601    /// // Now peer 2 receives the deletion from peer 1. Since the previous
602    /// // insertion was inside the deleted range, the latter has now been
603    /// // split into two separate ranges.
604    /// let ranges = replica2.integrate_deletion(&deletion);
605    ///
606    /// assert_eq!(ranges.as_slice(), &[1..2, 3..4]);
607    /// ```
608    #[must_use]
609    #[inline]
610    pub fn integrate_deletion(
611        &mut self,
612        deletion: &Deletion,
613    ) -> Vec<Range<Length>> {
614        if deletion.is_no_op() || self.has_merged_deletion(deletion) {
615            Vec::new()
616        } else if self.can_merge_deletion(deletion) {
617            self.merge_unchecked_deletion(deletion)
618        } else {
619            self.backlog.insert_deletion(deletion.clone());
620            Vec::new()
621        }
622    }
623
624    /// Integrates a remote [`Insertion`] into this `Replica`, optionally
625    /// returning the offset at which to insert the `Insertion`'s
626    /// [`Text`](Insertion::text) into your buffer.
627    ///
628    /// A `None` value can be returned if the `Insertion` has already been
629    /// integrated by this `Replica` or if it depends on some context that this
630    /// `Replica` doesn't yet have (see the
631    /// [`backlogged_insertions`](Replica::backlogged_insertions) method which
632    /// handles this case).
633    ///
634    /// # Examples
635    ///
636    /// ```
637    /// # use cola::{Replica, Insertion};
638    /// // Peer 1 starts with the text "ab" and sends it to a second peer.
639    /// let mut replica1 = Replica::new(1, 2);
640    ///
641    /// let mut replica2 = replica1.fork(2);
642    ///
643    /// // Peer 1 inserts two characters between the 'a' and the 'b'.
644    /// let insertion_1 = replica1.inserted(1, 2);
645    ///
646    /// // Concurrently, peer 2 inserts a character at the start of the
647    /// // document.
648    /// let insertion_2 = replica2.inserted(0, 1);
649    ///
650    /// // Peer 1 receives this insertion, and since there haven't been any
651    /// // concurrent insertions at the start of the document, its offset
652    /// // hasn't changed.
653    /// let offset_2 = replica1.integrate_insertion(&insertion_2).unwrap();
654    ///
655    /// assert_eq!(offset_2, 0);
656    ///
657    /// // If we try to integrate the same insertion again, we'll get a `None`.
658    /// assert!(replica1.integrate_insertion(&insertion_2).is_none());
659    ///
660    /// // Finally, peer 2 receives the first insertion from peer 1. Its text
661    /// // should be inserted between the 'a' and the 'b', which is at offset
662    /// // 2 at this peer.
663    /// let offset_1 = replica2.integrate_insertion(&insertion_1).unwrap();
664    ///
665    /// assert_eq!(offset_1, 2);
666    /// ```
667    #[must_use]
668    #[inline]
669    pub fn integrate_insertion(
670        &mut self,
671        insertion: &Insertion,
672    ) -> Option<Length> {
673        if insertion.is_no_op() || self.has_merged_insertion(insertion) {
674            None
675        } else if self.can_merge_insertion(insertion) {
676            Some(self.merge_unchecked_insertion(insertion))
677        } else {
678            self.backlog.insert_insertion(insertion.clone());
679            None
680        }
681    }
682
683    /// Merges the given [`Deletion`] without checking whether it can be
684    /// merged.
685    #[inline]
686    pub(crate) fn merge_unchecked_deletion(
687        &mut self,
688        deletion: &Deletion,
689    ) -> Vec<Range<Length>> {
690        debug_assert!(self.can_merge_deletion(deletion));
691
692        let ranges = self.run_tree.merge_deletion(deletion);
693
694        *self.deletion_map.get_mut(deletion.deleted_by()) =
695            deletion.deletion_ts();
696
697        ranges
698    }
699
700    /// Merges the given [`Insertion`] without checking whether it can be
701    /// merged.
702    #[inline]
703    pub(crate) fn merge_unchecked_insertion(
704        &mut self,
705        insertion: &Insertion,
706    ) -> Length {
707        debug_assert!(self.can_merge_insertion(insertion));
708
709        let offset = self.run_tree.merge_insertion(insertion);
710
711        *self.version_map.get_mut(insertion.inserted_by()) += insertion.len();
712
713        self.lamport_clock.merge(insertion.lamport_ts());
714
715        offset
716    }
717
718    /// Creates a new `Replica` with the given [`ReplicaId`] from the initial
719    /// [`Length`] of your buffer.
720    ///
721    /// Note that if you have multiple peers working on the same document you
722    /// should only use this constructor on the first peer, usually the one
723    /// that starts the collaboration session.
724    ///
725    /// The other peers should get their `Replica` from another `Replica`
726    /// already in the session by either:
727    ///
728    /// a) [`fork`](Replica::fork)ing it if the collaboration happens all in
729    /// the same process (e.g. a text editor with plugins running on separate
730    /// threads),
731    ///
732    /// b) [`encode`](Replica::encode)ing it and sending the result over the
733    /// network if the collaboration is between different processes or
734    /// machines.
735    ///
736    /// # Panics
737    ///
738    /// Panics if the [`ReplicaId`] is zero.
739    ///
740    /// # Examples
741    ///
742    /// ```
743    /// # use std::thread;
744    /// # use cola::Replica;
745    /// // A text editor initializes a new Replica on the main thread where the
746    /// // buffer is "foo".
747    /// let replica_main = Replica::new(1, 3);
748    ///
749    /// // It then starts a plugin on a separate thread and wants to give it a
750    /// // Replica to keep its buffer synchronized with the one on the main
751    /// // thread. It does *not* call `new()` again, but instead forks the
752    /// // existing Replica and sends it to the new thread.
753    /// let replica_plugin = replica_main.fork(2);
754    ///
755    /// thread::spawn(move || {
756    ///     // The plugin can now use its Replica to exchange edits with the
757    ///     // main thread.
758    ///     println!("{replica_plugin:?}");
759    /// });
760    /// ```
761    #[track_caller]
762    #[inline]
763    pub fn new(id: ReplicaId, len: Length) -> Self {
764        if id == 0 {
765            panic::replica_id_is_zero();
766        }
767
768        let mut run_clock = RunClock::new();
769
770        let mut lamport_clock = LamportClock::new();
771
772        let initial_text = Text::new(id, 0..len);
773
774        let first_run =
775            EditRun::new(initial_text, run_clock.next(), lamport_clock.next());
776
777        let run_tree = RunTree::new(first_run);
778
779        Self {
780            id,
781            run_tree,
782            run_clock,
783            lamport_clock,
784            version_map: VersionMap::new(id, len),
785            deletion_map: DeletionMap::new(id, 0),
786            backlog: Backlog::new(),
787        }
788    }
789
    /// Returns the second element of the pair computed by the run tree's
    /// `count_empty_leaves`.
    ///
    /// NOTE(review): presumably that element is the total number of leaves
    /// of the tree, i.e. the number of runs; confirm against
    /// `RunTree::count_empty_leaves`.
    #[doc(hidden)]
    pub fn num_runs(&self) -> usize {
        self.run_tree.count_empty_leaves().1
    }
794}
795
796impl core::fmt::Debug for Replica {
797    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
798        struct DebugHexU64(u64);
799
800        impl core::fmt::Debug for DebugHexU64 {
801            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
802                write!(f, "{:x}", self.0)
803            }
804        }
805
806        // In the public Debug we just print the ReplicaId to avoid leaking
807        // our internals.
808        //
809        // During development the `Replica::debug()` method (which is public
810        // but hidden from the API) can be used to obtain a more useful
811        // representation.
812        f.debug_tuple("Replica").field(&DebugHexU64(self.id)).finish()
813    }
814}
815
/// The type of the timestamps produced by a [`LamportClock`].
pub type LamportTs = u64;

/// A distributed logical clock used to determine if a run was in the document
/// when another run was inserted.
///
/// If it was then its [`LamportTs`] is guaranteed to be strictly less than the
/// new run's [`LamportTs`].
///
/// The wrapped value is the next timestamp that the clock will hand out.
///
/// See [this](https://en.wikipedia.org/wiki/Lamport_timestamp) for more.
#[derive(Copy, Clone)]
#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
pub struct LamportClock(LamportTs);

impl core::fmt::Debug for LamportClock {
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        self.0.fmt(f)
    }
}

impl LamportClock {
    /// Returns the highest timestamp handed out or merged so far, or zero if
    /// the clock has never been advanced.
    #[inline]
    pub fn highest(&self) -> LamportTs {
        self.0.saturating_sub(1)
    }

    /// Makes sure that every timestamp produced from now on is strictly
    /// greater than the given remote timestamp.
    ///
    /// The clock never goes backwards: merging a timestamp lower than the
    /// ones we've already produced is a no-op.
    #[inline]
    fn merge(&mut self, remote_ts: LamportTs) {
        // The timestamp comes from a remote peer, so it can't be trusted to
        // be far from `LamportTs::MAX`: saturate instead of overflowing,
        // which would panic in debug builds and silently reset the clock to
        // zero in release builds.
        self.0 = self.0.max(remote_ts.saturating_add(1));
    }

    /// Creates a new clock whose first timestamp will be zero.
    #[inline]
    fn new() -> Self {
        Self(0)
    }

    /// Returns the next timestamp, advancing the clock by one.
    #[inline]
    pub fn next(&mut self) -> LamportTs {
        let next = self.0;
        self.0 += 1;
        next
    }
}
860
/// The timestamp type handed out by a [`RunClock`].
pub type RunTs = u64;

/// A local clock that is increased every time a new insertion run is started.
///
/// The wrapped value is the timestamp that the next call to
/// [`next`](Self::next) will return.
#[derive(Copy, Clone)]
pub struct RunClock(RunTs);

impl core::fmt::Debug for RunClock {
    /// Formats the clock exactly like its inner counter.
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        core::fmt::Debug::fmt(&self.0, f)
    }
}

impl RunClock {
    /// Creates a clock whose first issued timestamp is `0`.
    #[inline]
    fn new() -> Self {
        Self(0)
    }

    /// Returns the current timestamp and advances the clock by one.
    #[inline]
    pub fn next(&mut self) -> RunTs {
        let issued = self.0;
        self.0 = issued + 1;
        issued
    }

    /// Returns the counter minus one, i.e. the timestamp most recently
    /// returned by [`next`](Self::next). A clock that hasn't advanced yet
    /// reports `0`.
    #[inline]
    fn last(&self) -> RunTs {
        match self.0 {
            0 => 0,
            upcoming => upcoming - 1,
        }
    }
}
891
/// Integer type used for deletion timestamps.
///
/// NOTE(review): presumably these are the values tracked by `DeletionMap` --
/// confirm against its definition.
pub type DeletionTs = u64;
893
#[cfg(feature = "encode")]
mod encode {
    use serde::{de, ser};

    use super::*;

    /// The fields of a [`Replica`] that get encoded, listed in the same order
    /// in which they are laid out in the encoded buffer.
    type EncodedFields =
        (RunTree, LamportClock, VersionMap, DeletionMap, Backlog);

    /// Size in bytes of the length prefix that precedes every encoded field.
    ///
    /// The prefix is always a little-endian `u64`, regardless of the pointer
    /// width of the target. Using `usize` here would make the format (and the
    /// decoder, which reads exactly 8 bytes) architecture-dependent: on
    /// 32-bit targets like `wasm32` a `usize` is only 4 bytes wide.
    const LEN_PREFIX_SIZE: usize = core::mem::size_of::<u64>();

    /// Encodes the given replica into a byte buffer.
    ///
    /// The buffer is the concatenation of the replica's run tree, Lamport
    /// clock, version map, deletion map and backlog, each one serialized on
    /// its own and prefixed by its length (see [`encode_field`]).
    #[inline]
    pub(super) fn encode(replica: &Replica) -> Vec<u8> {
        let mut encoded = Vec::new();

        encode_field(&mut encoded, &replica.run_tree);
        encode_field(&mut encoded, &replica.lamport_clock);
        encode_field(&mut encoded, &replica.version_map);
        encode_field(&mut encoded, &replica.deletion_map);
        encode_field(&mut encoded, &replica.backlog);

        encoded
    }

    /// Decodes a buffer produced by [`encode`] back into its fields.
    ///
    /// Returns `None` if any of the fields is missing, truncated or fails to
    /// deserialize, or if there are trailing bytes left after the last field.
    #[inline]
    pub(super) fn decode(bytes: &[u8]) -> Option<EncodedFields> {
        let (run_tree, bytes) = decode_field(bytes)?;
        let (lamport_clock, bytes) = decode_field(bytes)?;
        let (version_map, bytes) = decode_field(bytes)?;
        let (deletion_map, bytes) = decode_field(bytes)?;
        let (backlog, bytes) = decode_field(bytes)?;

        if bytes.is_empty() {
            Some((run_tree, lamport_clock, version_map, deletion_map, backlog))
        } else {
            None
        }
    }

    /// Serializes `field` and appends it to `buf`, preceded by the length of
    /// its serialized form as an 8-byte little-endian integer.
    #[inline]
    fn encode_field<T>(buf: &mut Vec<u8>, field: &T)
    where
        T: ser::Serialize,
    {
        let field_bytes = serialize(field);

        // Widening a `usize` to a `u64` is lossless on every target whose
        // pointers are at most 64 bits wide, and it yields the exact same
        // bytes that `usize::to_le_bytes` produces on 64-bit targets.
        let len_bytes = (field_bytes.len() as u64).to_le_bytes();

        buf.extend_from_slice(&len_bytes);
        buf.extend_from_slice(&field_bytes);
    }

    /// Decodes a single length-prefixed field from the start of `buf`.
    ///
    /// On success it returns the decoded field together with the bytes that
    /// follow it. Returns `None` if `buf` is too short to contain the length
    /// prefix or the field itself, if the length doesn't fit in a `usize` on
    /// this target, or if the field fails to deserialize.
    #[inline]
    fn decode_field<'a, T>(buf: &'a [u8]) -> Option<(T, &'a [u8])>
    where
        T: de::Deserialize<'a>,
    {
        // The first 8 bytes represent the length of the encoded field.
        if buf.len() < LEN_PREFIX_SIZE {
            return None;
        }

        let (len_bytes, rest) = buf.split_at(LEN_PREFIX_SIZE);

        let len_bytes: [u8; LEN_PREFIX_SIZE] = len_bytes.try_into().ok()?;

        // The length was written as a `u64`. On targets where `usize` is
        // narrower it might not fit, in which case the buffer can't possibly
        // contain the whole field.
        let len: usize = u64::from_le_bytes(len_bytes).try_into().ok()?;

        if rest.len() < len {
            return None;
        }

        let (encoded_field, rest) = rest.split_at(len);

        deserialize::<T>(encoded_field).map(|field| (field, rest))
    }

    /// Serializes `value` with `bincode`.
    ///
    /// # Panics
    ///
    /// Panics if `bincode` fails to serialize the value.
    #[inline]
    fn serialize<T>(value: &T) -> Vec<u8>
    where
        T: ser::Serialize,
    {
        bincode::serialize(value).expect("failed to serialize")
    }

    /// Deserializes a `T` from `bytes` with `bincode`, returning `None` on
    /// failure.
    #[inline]
    fn deserialize<'a, T>(bytes: &'a [u8]) -> Option<T>
    where
        T: de::Deserialize<'a>,
    {
        bincode::deserialize(bytes).ok()
    }
}
983
984mod debug {
985    use core::fmt::Debug;
986
987    use super::*;
988
989    pub struct DebugAsSelf<'a>(BaseDebug<'a, run_tree::DebugAsSelf<'a>>);
990
991    impl<'a> From<&'a Replica> for DebugAsSelf<'a> {
992        #[inline]
993        fn from(replica: &'a Replica) -> DebugAsSelf<'a> {
994            let base = BaseDebug {
995                replica,
996                debug_run_tree: replica.run_tree.debug_as_self(),
997            };
998
999            Self(base)
1000        }
1001    }
1002
1003    impl<'a> core::fmt::Debug for DebugAsSelf<'a> {
1004        fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
1005            self.0.fmt(f)
1006        }
1007    }
1008
1009    pub struct DebugAsBtree<'a>(BaseDebug<'a, run_tree::DebugAsBtree<'a>>);
1010
1011    impl<'a> From<&'a Replica> for DebugAsBtree<'a> {
1012        #[inline]
1013        fn from(replica: &'a Replica) -> DebugAsBtree<'a> {
1014            let base = BaseDebug {
1015                replica,
1016                debug_run_tree: replica.run_tree.debug_as_btree(),
1017            };
1018
1019            Self(base)
1020        }
1021    }
1022
1023    impl<'a> core::fmt::Debug for DebugAsBtree<'a> {
1024        fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
1025            self.0.fmt(f)
1026        }
1027    }
1028
1029    struct BaseDebug<'a, T: Debug> {
1030        replica: &'a Replica,
1031        debug_run_tree: T,
1032    }
1033
1034    impl<'a, T: Debug> Debug for BaseDebug<'a, T> {
1035        fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
1036            let replica = &self.replica;
1037
1038            f.debug_struct("Replica")
1039                .field("id", &replica.id)
1040                .field("run_tree", &self.debug_run_tree)
1041                .field("run_indices", &replica.run_tree.run_indices())
1042                .field("lamport_clock", &replica.lamport_clock)
1043                .field("run_clock", &replica.run_clock)
1044                .field("version_map", &replica.version_map)
1045                .field("deletion_map", &replica.deletion_map)
1046                .field("backlog", &replica.backlog)
1047                .finish()
1048        }
1049    }
1050}