cola/
backlog.rs

1use alloc::collections::VecDeque;
2use core::ops::Range;
3
4use crate::*;
5
6/// A [`Replica`]'s backlog of remote edits that have been received from other
7/// replicas but have not yet been merged.
8///
9/// See [`Replica::backlogged`] for more information.
10#[derive(Debug, Clone, Default, PartialEq)]
11pub(crate) struct Backlog {
12    insertions: ReplicaIdMap<InsertionsBacklog>,
13    deletions: ReplicaIdMap<DeletionsBacklog>,
14}
15
16impl Backlog {
17    pub fn assert_invariants(
18        &self,
19        version_map: &VersionMap,
20        deletion_map: &DeletionMap,
21    ) {
22        for (&id, insertions) in self.insertions.iter() {
23            insertions.assert_invariants(id, version_map);
24        }
25        for (&id, deletions) in self.deletions.iter() {
26            deletions.assert_invariants(id, deletion_map);
27        }
28    }
29
30    /// Inserts a new [`Deletion`] into the backlog.
31    ///
32    /// Runs in `O(n)` in the number of deletions already in the backlog, with
33    /// a best-case of `O(log n)`.
34    ///
35    /// # Panics
36    ///
37    /// Panics if the deletion has already been backlogged.
38    #[inline]
39    pub fn insert_deletion(&mut self, deletion: Deletion) {
40        self.deletions
41            .entry(deletion.deleted_by())
42            .or_default()
43            .insert(deletion);
44    }
45
46    /// Inserts a new [`Insertion`] into the backlog.
47    ///
48    /// Runs in `O(n)` in the number of insertions already in the backlog, with
49    /// a best-case of `O(log n)`.
50    ///
51    /// # Panics
52    ///
53    /// Panics if the insertion has already been backlogged.
54    #[inline]
55    pub fn insert_insertion(&mut self, insertion: Insertion) {
56        self.insertions
57            .entry(insertion.inserted_by())
58            .or_default()
59            .insert(insertion);
60    }
61
62    /// Creates a new, empty `Backlog`.
63    #[inline]
64    pub fn new() -> Self {
65        Self::default()
66    }
67}
68
69/// Stores the backlogged [`Insertion`]s of a particular replica.
70#[derive(Clone, Default, PartialEq)]
71struct InsertionsBacklog {
72    insertions: VecDeque<Insertion>,
73}
74
75impl core::fmt::Debug for InsertionsBacklog {
76    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
77        f.debug_list()
78            .entries(self.insertions.iter().map(|i| i.text().temporal_range()))
79            .finish()
80    }
81}
82
83impl InsertionsBacklog {
84    fn assert_invariants(&self, id: ReplicaId, version_map: &VersionMap) {
85        let Some(first) = self.insertions.front() else {
86            return;
87        };
88
89        assert!(version_map.get(id) <= first.start());
90
91        let mut prev_end = 0;
92
93        for insertion in &self.insertions {
94            assert_eq!(insertion.inserted_by(), id);
95            assert!(insertion.start() >= prev_end);
96            prev_end = insertion.end();
97        }
98    }
99
100    /// # Panics
101    ///
102    /// Panics if the insertion has already been inserted.
103    #[inline]
104    fn insert(&mut self, insertion: Insertion) {
105        let offset = self
106            .insertions
107            .binary_search_by(|probe| probe.start().cmp(&insertion.start()))
108            .unwrap_err();
109
110        self.insertions.insert(offset, insertion);
111    }
112}
113
114/// Stores the backlogged [`Deletion`]s of a particular replica.
115#[derive(Clone, Default, PartialEq)]
116struct DeletionsBacklog {
117    deletions: VecDeque<Deletion>,
118}
119
120impl core::fmt::Debug for DeletionsBacklog {
121    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
122        f.debug_list()
123            .entries(self.deletions.iter().map(|d| d.deletion_ts()))
124            .finish()
125    }
126}
127
128impl DeletionsBacklog {
129    fn assert_invariants(&self, id: ReplicaId, deletion_map: &DeletionMap) {
130        let Some(first) = self.deletions.front() else {
131            return;
132        };
133
134        assert!(deletion_map.get(id) <= first.deletion_ts());
135
136        let mut prev_ts = 0;
137
138        for deletion in &self.deletions {
139            assert_eq!(deletion.deleted_by(), id);
140            assert!(deletion.deletion_ts() > prev_ts);
141            prev_ts = deletion.deletion_ts();
142        }
143    }
144
145    /// # Panics
146    ///
147    /// Panics if the deletion has already inserted.
148    #[inline]
149    fn insert(&mut self, deletion: Deletion) {
150        let offset = self
151            .deletions
152            .binary_search_by(|probe| {
153                probe.deletion_ts().cmp(&deletion.deletion_ts())
154            })
155            .unwrap_err();
156
157        self.deletions.insert(offset, deletion);
158    }
159}
160
161/// An iterator over the backlogged deletions that are ready to be
162/// applied to a [`Replica`].
163///
164/// This struct is created by the
165/// [`backlogged_deletions`](Replica::backlogged_deletions) method on
166/// [`Replica`]. See its documentation for more information.
167pub struct BackloggedDeletions<'a> {
168    replica: &'a mut Replica,
169    current: Option<&'a mut DeletionsBacklog>,
170    iter: ReplicaIdMapValuesMut<'a, DeletionsBacklog>,
171}
172
173impl<'a> BackloggedDeletions<'a> {
174    #[inline]
175    pub(crate) fn from_replica(replica: &'a mut Replica) -> Self {
176        let backlog = replica.backlog_mut();
177
178        // We transmute the exclusive reference to the backlog into the same
179        // type to get around the borrow checker.
180        //
181        // SAFETY: this is safe because in the `Iterator` implementation we
182        // never access the backlog through the `Replica`, neither directly nor
183        // by calling any methods on `Replica` that would access the backlog.
184        let backlog = unsafe {
185            core::mem::transmute::<&mut Backlog, &mut Backlog>(backlog)
186        };
187
188        let mut iter = backlog.deletions.values_mut();
189
190        let current = iter.next();
191
192        Self { replica, iter, current }
193    }
194}
195
196impl Iterator for BackloggedDeletions<'_> {
197    type Item = Vec<Range<Length>>;
198
199    #[inline]
200    fn next(&mut self) -> Option<Self::Item> {
201        let deletions = self.current.as_mut()?;
202
203        let Some(first) = deletions.deletions.front() else {
204            self.current = self.iter.next();
205            return self.next();
206        };
207
208        if self.replica.can_merge_deletion(first) {
209            let first = deletions.deletions.pop_front().unwrap();
210            let ranges = self.replica.merge_unchecked_deletion(&first);
211            if ranges.is_empty() {
212                self.next()
213            } else {
214                Some(ranges)
215            }
216        } else {
217            self.current = self.iter.next();
218            self.next()
219        }
220    }
221}
222
223impl core::iter::FusedIterator for BackloggedDeletions<'_> {}
224
225/// An iterator over the backlogged insertions that are ready to be
226/// applied to a [`Replica`].
227///
228/// This struct is created by the
229/// [`backlogged_insertion`](Replica::backlogged_insertions) method on
230/// [`Replica`]. See its documentation for more information.
231pub struct BackloggedInsertions<'a> {
232    replica: &'a mut Replica,
233    current: Option<&'a mut InsertionsBacklog>,
234    iter: ReplicaIdMapValuesMut<'a, InsertionsBacklog>,
235}
236
237impl<'a> BackloggedInsertions<'a> {
238    #[inline]
239    pub(crate) fn from_replica(replica: &'a mut Replica) -> Self {
240        let backlog = replica.backlog_mut();
241
242        // We transmute the exclusive reference to the backlog into the same
243        // type to get around the borrow checker.
244        //
245        // SAFETY: this is safe because in the `Iterator` implementation we
246        // never access the backlog through the `Replica`, neither directly nor
247        // by calling any methods on `Replica` that would access the backlog.
248        let backlog = unsafe {
249            core::mem::transmute::<&mut Backlog, &mut Backlog>(backlog)
250        };
251
252        let mut iter = backlog.insertions.values_mut();
253
254        let current = iter.next();
255
256        Self { replica, current, iter }
257    }
258}
259
260impl Iterator for BackloggedInsertions<'_> {
261    type Item = (Text, Length);
262
263    #[inline]
264    fn next(&mut self) -> Option<Self::Item> {
265        let insertions = self.current.as_mut()?;
266
267        let Some(first) = insertions.insertions.front() else {
268            self.current = self.iter.next();
269            return self.next();
270        };
271
272        if self.replica.can_merge_insertion(first) {
273            let first = insertions.insertions.pop_front().unwrap();
274            let edit = self.replica.merge_unchecked_insertion(&first);
275            Some((first.text().clone(), edit))
276        } else {
277            self.current = self.iter.next();
278            self.next()
279        }
280    }
281}
282
283impl core::iter::FusedIterator for BackloggedInsertions<'_> {}
284
285#[cfg(feature = "encode")]
286pub(crate) mod encode {
287    use super::*;
288    use crate::encode::{Decode, DecodeWithCtx, Encode, IntDecodeError};
289    use crate::version_map::encode::BaseMapDecodeError;
290
291    impl InsertionsBacklog {
292        #[inline(always)]
293        fn iter(&self) -> impl Iterator<Item = &Insertion> + '_ {
294            self.insertions.iter()
295        }
296
297        #[inline(always)]
298        fn len(&self) -> usize {
299            self.insertions.len()
300        }
301
302        #[inline(always)]
303        fn push(&mut self, insertion: Insertion) {
304            self.insertions.push_back(insertion);
305        }
306    }
307
308    impl DeletionsBacklog {
309        #[inline(always)]
310        fn iter(&self) -> impl Iterator<Item = &Deletion> + '_ {
311            self.deletions.iter()
312        }
313
314        #[inline(always)]
315        fn len(&self) -> usize {
316            self.deletions.len()
317        }
318
319        #[inline(always)]
320        fn push(&mut self, deletion: Deletion) {
321            self.deletions.push_back(deletion);
322        }
323    }
324
325    impl Encode for Backlog {
326        #[inline]
327        fn encode(&self, buf: &mut Vec<u8>) {
328            (self.insertions.len() as u64).encode(buf);
329
330            for (id, insertions) in &self.insertions {
331                ReplicaIdInsertions::new(*id, insertions).encode(buf);
332            }
333
334            (self.deletions.len() as u64).encode(buf);
335
336            for (id, deletions) in &self.deletions {
337                ReplicaIdDeletions::new(*id, deletions).encode(buf);
338            }
339        }
340    }
341
342    pub(crate) enum BacklogDecodeError {
343        Int(IntDecodeError),
344        VersionMap(BaseMapDecodeError<Length>),
345    }
346
347    impl From<IntDecodeError> for BacklogDecodeError {
348        #[inline(always)]
349        fn from(err: IntDecodeError) -> Self {
350            Self::Int(err)
351        }
352    }
353
354    impl From<BaseMapDecodeError<Length>> for BacklogDecodeError {
355        #[inline(always)]
356        fn from(err: BaseMapDecodeError<Length>) -> Self {
357            Self::VersionMap(err)
358        }
359    }
360
361    impl core::fmt::Display for BacklogDecodeError {
362        #[inline]
363        fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
364            let err: &dyn core::fmt::Display = match self {
365                Self::VersionMap(err) => err,
366                Self::Int(err) => err,
367            };
368
369            write!(f, "Backlog: couldn't be decoded: {err}")
370        }
371    }
372
373    impl Decode for Backlog {
374        type Value = Self;
375
376        type Error = BacklogDecodeError;
377
378        #[inline]
379        fn decode(buf: &[u8]) -> Result<(Self::Value, &[u8]), Self::Error> {
380            let (num_replicas, mut buf) = u64::decode(buf)?;
381
382            let mut insertions = ReplicaIdMap::default();
383
384            for _ in 0..num_replicas {
385                let ((), new_buf) =
386                    ReplicaIdInsertions::decode(buf, &mut insertions)?;
387                buf = new_buf;
388            }
389
390            let (num_replicas, mut buf) = u64::decode(buf)?;
391
392            let mut deletions = ReplicaIdMap::default();
393
394            for _ in 0..num_replicas {
395                let ((), new_buf) =
396                    ReplicaIdDeletions::decode(buf, &mut deletions)?;
397                buf = new_buf;
398            }
399
400            let this = Self { insertions, deletions };
401
402            Ok((this, buf))
403        }
404    }
405
406    struct ReplicaIdInsertions<'a> {
407        replica_id: ReplicaId,
408        insertions: &'a InsertionsBacklog,
409    }
410
411    impl<'a> ReplicaIdInsertions<'a> {
412        #[inline]
413        fn new(
414            replica_id: ReplicaId,
415            insertions: &'a InsertionsBacklog,
416        ) -> Self {
417            Self { replica_id, insertions }
418        }
419    }
420
421    impl Encode for ReplicaIdInsertions<'_> {
422        #[inline]
423        fn encode(&self, buf: &mut Vec<u8>) {
424            self.replica_id.encode(buf);
425
426            (self.insertions.len() as u64).encode(buf);
427
428            for insertion in self.insertions.iter() {
429                insertion.anchor().encode(buf);
430                let range = insertion.text().temporal_range();
431                range.start.encode(buf);
432                range.len().encode(buf);
433                insertion.run_ts().encode(buf);
434                insertion.lamport_ts().encode(buf);
435            }
436        }
437    }
438
439    impl DecodeWithCtx for ReplicaIdInsertions<'_> {
440        type Value = ();
441
442        type Ctx = ReplicaIdMap<InsertionsBacklog>;
443
444        type Error = BacklogDecodeError;
445
446        #[inline]
447        fn decode<'buf>(
448            buf: &'buf [u8],
449            ctx: &mut Self::Ctx,
450        ) -> Result<((), &'buf [u8]), Self::Error> {
451            let (replica_id, buf) = ReplicaId::decode(buf)?;
452
453            let (num_insertions, mut buf) = u64::decode(buf)?;
454
455            let mut insertions = InsertionsBacklog::default();
456
457            for _ in 0..num_insertions {
458                let (anchor, new_buf) = InnerAnchor::decode(buf)?;
459                let (start, new_buf) = Length::decode(new_buf)?;
460                let (len, new_buf) = Length::decode(new_buf)?;
461                let (run_ts, new_buf) = RunTs::decode(new_buf)?;
462                let (lamport_ts, new_buf) = LamportTs::decode(new_buf)?;
463
464                let insertion = Insertion::new(
465                    anchor,
466                    Text::new(replica_id, start..start + len),
467                    lamport_ts,
468                    run_ts,
469                );
470
471                insertions.push(insertion);
472
473                buf = new_buf;
474            }
475
476            ctx.insert(replica_id, insertions);
477
478            Ok(((), buf))
479        }
480    }
481
482    struct ReplicaIdDeletions<'a> {
483        replica_id: ReplicaId,
484        deletions: &'a DeletionsBacklog,
485    }
486
487    impl<'a> ReplicaIdDeletions<'a> {
488        #[inline]
489        fn new(
490            replica_id: ReplicaId,
491            deletions: &'a DeletionsBacklog,
492        ) -> Self {
493            Self { replica_id, deletions }
494        }
495    }
496
497    impl Encode for ReplicaIdDeletions<'_> {
498        #[inline]
499        fn encode(&self, buf: &mut Vec<u8>) {
500            self.replica_id.encode(buf);
501
502            (self.deletions.len() as u64).encode(buf);
503
504            for deletion in self.deletions.iter() {
505                deletion.start().encode(buf);
506                deletion.end().encode(buf);
507                deletion.version_map().encode(buf);
508                deletion.deletion_ts().encode(buf);
509            }
510        }
511    }
512
513    impl DecodeWithCtx for ReplicaIdDeletions<'_> {
514        type Value = ();
515
516        type Ctx = ReplicaIdMap<DeletionsBacklog>;
517
518        type Error = BacklogDecodeError;
519
520        #[inline]
521        fn decode<'buf>(
522            buf: &'buf [u8],
523            ctx: &mut Self::Ctx,
524        ) -> Result<((), &'buf [u8]), Self::Error> {
525            let (replica_id, buf) = ReplicaId::decode(buf)?;
526
527            let (num_deletions, mut buf) = u64::decode(buf)?;
528
529            let mut deletions = DeletionsBacklog::default();
530
531            for _ in 0..num_deletions {
532                let (start, new_buf) = InnerAnchor::decode(buf)?;
533                let (end, new_buf) = InnerAnchor::decode(new_buf)?;
534                let (version_map, new_buf) = VersionMap::decode(new_buf)?;
535                let (deletion_ts, new_buf) = DeletionTs::decode(new_buf)?;
536
537                let deletion =
538                    Deletion::new(start, end, version_map, deletion_ts);
539
540                deletions.push(deletion);
541
542                buf = new_buf;
543            }
544
545            ctx.insert(replica_id, deletions);
546
547            Ok(((), buf))
548        }
549    }
550}
551
552#[cfg(feature = "serde")]
553mod serde {
554    crate::encode::impl_serialize!(super::Backlog);
555    crate::encode::impl_deserialize!(super::Backlog);
556}