cola/
backlog.rs

1use alloc::collections::VecDeque;
2use core::ops::Range;
3
4use crate::*;
5
6/// A [`Replica`]'s backlog of remote edits that have been received from other
7/// replicas but have not yet been merged.
8///
9/// See [`Replica::backlogged`] for more information.
10#[derive(Debug, Clone, Default, PartialEq)]
11#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
12pub(crate) struct Backlog {
13    insertions: ReplicaIdMap<InsertionsBacklog>,
14    deletions: ReplicaIdMap<DeletionsBacklog>,
15}
16
17impl Backlog {
18    pub fn assert_invariants(
19        &self,
20        version_map: &VersionMap,
21        deletion_map: &DeletionMap,
22    ) {
23        for (&id, insertions) in self.insertions.iter() {
24            insertions.assert_invariants(id, version_map);
25        }
26        for (&id, deletions) in self.deletions.iter() {
27            deletions.assert_invariants(id, deletion_map);
28        }
29    }
30
31    /// Inserts a new [`Deletion`] into the backlog.
32    ///
33    /// Runs in `O(n)` in the number of deletions already in the backlog, with
34    /// a best-case of `O(log n)`.
35    ///
36    /// # Panics
37    ///
38    /// Panics if the deletion has already been backlogged.
39    #[inline]
40    pub fn insert_deletion(&mut self, deletion: Deletion) {
41        self.deletions
42            .entry(deletion.deleted_by())
43            .or_default()
44            .insert(deletion);
45    }
46
47    /// Inserts a new [`Insertion`] into the backlog.
48    ///
49    /// Runs in `O(n)` in the number of insertions already in the backlog, with
50    /// a best-case of `O(log n)`.
51    ///
52    /// # Panics
53    ///
54    /// Panics if the insertion has already been backlogged.
55    #[inline]
56    pub fn insert_insertion(&mut self, insertion: Insertion) {
57        self.insertions
58            .entry(insertion.inserted_by())
59            .or_default()
60            .insert(insertion);
61    }
62
63    /// Creates a new, empty `Backlog`.
64    #[inline]
65    pub fn new() -> Self {
66        Self::default()
67    }
68}
69
70/// Stores the backlogged [`Insertion`]s of a particular replica.
71#[derive(Clone, Default, PartialEq)]
72#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
73struct InsertionsBacklog {
74    insertions: VecDeque<Insertion>,
75}
76
77impl core::fmt::Debug for InsertionsBacklog {
78    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
79        f.debug_list()
80            .entries(self.insertions.iter().map(|i| i.text().temporal_range()))
81            .finish()
82    }
83}
84
85impl InsertionsBacklog {
86    fn assert_invariants(&self, id: ReplicaId, version_map: &VersionMap) {
87        let Some(first) = self.insertions.front() else {
88            return;
89        };
90
91        assert!(version_map.get(id) <= first.start());
92
93        let mut prev_end = 0;
94
95        for insertion in &self.insertions {
96            assert_eq!(insertion.inserted_by(), id);
97            assert!(insertion.start() >= prev_end);
98            prev_end = insertion.end();
99        }
100    }
101
102    /// # Panics
103    ///
104    /// Panics if the insertion has already been inserted.
105    #[inline]
106    fn insert(&mut self, insertion: Insertion) {
107        let offset = self
108            .insertions
109            .binary_search_by(|probe| probe.start().cmp(&insertion.start()))
110            .unwrap_err();
111
112        self.insertions.insert(offset, insertion);
113    }
114}
115
116/// Stores the backlogged [`Deletion`]s of a particular replica.
117#[derive(Clone, Default, PartialEq)]
118#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
119struct DeletionsBacklog {
120    deletions: VecDeque<Deletion>,
121}
122
123impl core::fmt::Debug for DeletionsBacklog {
124    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
125        f.debug_list()
126            .entries(self.deletions.iter().map(|d| d.deletion_ts()))
127            .finish()
128    }
129}
130
131impl DeletionsBacklog {
132    fn assert_invariants(&self, id: ReplicaId, deletion_map: &DeletionMap) {
133        let Some(first) = self.deletions.front() else {
134            return;
135        };
136
137        assert!(deletion_map.get(id) <= first.deletion_ts());
138
139        let mut prev_ts = 0;
140
141        for deletion in &self.deletions {
142            assert_eq!(deletion.deleted_by(), id);
143            assert!(deletion.deletion_ts() > prev_ts);
144            prev_ts = deletion.deletion_ts();
145        }
146    }
147
148    /// # Panics
149    ///
150    /// Panics if the deletion has already inserted.
151    #[inline]
152    fn insert(&mut self, deletion: Deletion) {
153        let offset = self
154            .deletions
155            .binary_search_by(|probe| {
156                probe.deletion_ts().cmp(&deletion.deletion_ts())
157            })
158            .unwrap_err();
159
160        self.deletions.insert(offset, deletion);
161    }
162}
163
164/// An iterator over the backlogged deletions that are ready to be
165/// applied to a [`Replica`].
166///
167/// This struct is created by the
168/// [`backlogged_deletions`](Replica::backlogged_deletions) method on
169/// [`Replica`]. See its documentation for more information.
170pub struct BackloggedDeletions<'a> {
171    replica: &'a mut Replica,
172    current: Option<&'a mut DeletionsBacklog>,
173    iter: ReplicaIdMapValuesMut<'a, DeletionsBacklog>,
174}
175
176impl<'a> BackloggedDeletions<'a> {
177    #[inline]
178    pub(crate) fn from_replica(replica: &'a mut Replica) -> Self {
179        let backlog = replica.backlog_mut();
180
181        // We transmute the exclusive reference to the backlog into the same
182        // type to get around the borrow checker.
183        //
184        // SAFETY: this is safe because in the `Iterator` implementation we
185        // never access the backlog through the `Replica`, neither directly nor
186        // by calling any methods on `Replica` that would access the backlog.
187        let backlog =
188            unsafe { core::mem::transmute::<_, &mut Backlog>(backlog) };
189
190        let mut iter = backlog.deletions.values_mut();
191
192        let current = iter.next();
193
194        Self { replica, iter, current }
195    }
196}
197
198impl Iterator for BackloggedDeletions<'_> {
199    type Item = Vec<Range<Length>>;
200
201    #[inline]
202    fn next(&mut self) -> Option<Self::Item> {
203        let deletions = self.current.as_mut()?;
204
205        let Some(first) = deletions.deletions.front() else {
206            self.current = self.iter.next();
207            return self.next();
208        };
209
210        if self.replica.can_merge_deletion(first) {
211            let first = deletions.deletions.pop_front().unwrap();
212            let ranges = self.replica.merge_unchecked_deletion(&first);
213            if ranges.is_empty() {
214                self.next()
215            } else {
216                Some(ranges)
217            }
218        } else {
219            self.current = self.iter.next();
220            self.next()
221        }
222    }
223}
224
225impl core::iter::FusedIterator for BackloggedDeletions<'_> {}
226
227/// An iterator over the backlogged insertions that are ready to be
228/// applied to a [`Replica`].
229///
230/// This struct is created by the
231/// [`backlogged_insertion`](Replica::backlogged_insertions) method on
232/// [`Replica`]. See its documentation for more information.
233pub struct BackloggedInsertions<'a> {
234    replica: &'a mut Replica,
235    current: Option<&'a mut InsertionsBacklog>,
236    iter: ReplicaIdMapValuesMut<'a, InsertionsBacklog>,
237}
238
239impl<'a> BackloggedInsertions<'a> {
240    #[inline]
241    pub(crate) fn from_replica(replica: &'a mut Replica) -> Self {
242        let backlog = replica.backlog_mut();
243
244        // We transmute the exclusive reference to the backlog into the same
245        // type to get around the borrow checker.
246        //
247        // SAFETY: this is safe because in the `Iterator` implementation we
248        // never access the backlog through the `Replica`, neither directly nor
249        // by calling any methods on `Replica` that would access the backlog.
250        let backlog =
251            unsafe { core::mem::transmute::<_, &mut Backlog>(backlog) };
252
253        let mut iter = backlog.insertions.values_mut();
254
255        let current = iter.next();
256
257        Self { replica, current, iter }
258    }
259}
260
261impl Iterator for BackloggedInsertions<'_> {
262    type Item = (Text, Length);
263
264    #[inline]
265    fn next(&mut self) -> Option<Self::Item> {
266        let Some(insertions) = self.current.as_mut() else {
267            return None;
268        };
269
270        let Some(first) = insertions.insertions.front() else {
271            self.current = self.iter.next();
272            return self.next();
273        };
274
275        if self.replica.can_merge_insertion(first) {
276            let first = insertions.insertions.pop_front().unwrap();
277            let edit = self.replica.merge_unchecked_insertion(&first);
278            Some((first.text().clone(), edit))
279        } else {
280            self.current = self.iter.next();
281            self.next()
282        }
283    }
284}
285
286impl core::iter::FusedIterator for BackloggedInsertions<'_> {}