1use alloc::collections::VecDeque;
2use core::ops::Range;
3
4use crate::*;
5
6#[derive(Debug, Clone, Default, PartialEq)]
11pub(crate) struct Backlog {
12 insertions: ReplicaIdMap<InsertionsBacklog>,
13 deletions: ReplicaIdMap<DeletionsBacklog>,
14}
15
16impl Backlog {
17 pub fn assert_invariants(
18 &self,
19 version_map: &VersionMap,
20 deletion_map: &DeletionMap,
21 ) {
22 for (&id, insertions) in self.insertions.iter() {
23 insertions.assert_invariants(id, version_map);
24 }
25 for (&id, deletions) in self.deletions.iter() {
26 deletions.assert_invariants(id, deletion_map);
27 }
28 }
29
30 #[inline]
39 pub fn insert_deletion(&mut self, deletion: Deletion) {
40 self.deletions
41 .entry(deletion.deleted_by())
42 .or_default()
43 .insert(deletion);
44 }
45
46 #[inline]
55 pub fn insert_insertion(&mut self, insertion: Insertion) {
56 self.insertions
57 .entry(insertion.inserted_by())
58 .or_default()
59 .insert(insertion);
60 }
61
62 #[inline]
64 pub fn new() -> Self {
65 Self::default()
66 }
67}
68
69#[derive(Clone, Default, PartialEq)]
71struct InsertionsBacklog {
72 insertions: VecDeque<Insertion>,
73}
74
75impl core::fmt::Debug for InsertionsBacklog {
76 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
77 f.debug_list()
78 .entries(self.insertions.iter().map(|i| i.text().temporal_range()))
79 .finish()
80 }
81}
82
83impl InsertionsBacklog {
84 fn assert_invariants(&self, id: ReplicaId, version_map: &VersionMap) {
85 let Some(first) = self.insertions.front() else {
86 return;
87 };
88
89 assert!(version_map.get(id) <= first.start());
90
91 let mut prev_end = 0;
92
93 for insertion in &self.insertions {
94 assert_eq!(insertion.inserted_by(), id);
95 assert!(insertion.start() >= prev_end);
96 prev_end = insertion.end();
97 }
98 }
99
100 #[inline]
104 fn insert(&mut self, insertion: Insertion) {
105 let offset = self
106 .insertions
107 .binary_search_by(|probe| probe.start().cmp(&insertion.start()))
108 .unwrap_err();
109
110 self.insertions.insert(offset, insertion);
111 }
112}
113
114#[derive(Clone, Default, PartialEq)]
116struct DeletionsBacklog {
117 deletions: VecDeque<Deletion>,
118}
119
120impl core::fmt::Debug for DeletionsBacklog {
121 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
122 f.debug_list()
123 .entries(self.deletions.iter().map(|d| d.deletion_ts()))
124 .finish()
125 }
126}
127
128impl DeletionsBacklog {
129 fn assert_invariants(&self, id: ReplicaId, deletion_map: &DeletionMap) {
130 let Some(first) = self.deletions.front() else {
131 return;
132 };
133
134 assert!(deletion_map.get(id) <= first.deletion_ts());
135
136 let mut prev_ts = 0;
137
138 for deletion in &self.deletions {
139 assert_eq!(deletion.deleted_by(), id);
140 assert!(deletion.deletion_ts() > prev_ts);
141 prev_ts = deletion.deletion_ts();
142 }
143 }
144
145 #[inline]
149 fn insert(&mut self, deletion: Deletion) {
150 let offset = self
151 .deletions
152 .binary_search_by(|probe| {
153 probe.deletion_ts().cmp(&deletion.deletion_ts())
154 })
155 .unwrap_err();
156
157 self.deletions.insert(offset, deletion);
158 }
159}
160
161pub struct BackloggedDeletions<'a> {
168 replica: &'a mut Replica,
169 current: Option<&'a mut DeletionsBacklog>,
170 iter: ReplicaIdMapValuesMut<'a, DeletionsBacklog>,
171}
172
173impl<'a> BackloggedDeletions<'a> {
174 #[inline]
175 pub(crate) fn from_replica(replica: &'a mut Replica) -> Self {
176 let backlog = replica.backlog_mut();
177
178 let backlog = unsafe {
185 core::mem::transmute::<&mut Backlog, &mut Backlog>(backlog)
186 };
187
188 let mut iter = backlog.deletions.values_mut();
189
190 let current = iter.next();
191
192 Self { replica, iter, current }
193 }
194}
195
196impl Iterator for BackloggedDeletions<'_> {
197 type Item = Vec<Range<Length>>;
198
199 #[inline]
200 fn next(&mut self) -> Option<Self::Item> {
201 let deletions = self.current.as_mut()?;
202
203 let Some(first) = deletions.deletions.front() else {
204 self.current = self.iter.next();
205 return self.next();
206 };
207
208 if self.replica.can_merge_deletion(first) {
209 let first = deletions.deletions.pop_front().unwrap();
210 let ranges = self.replica.merge_unchecked_deletion(&first);
211 if ranges.is_empty() {
212 self.next()
213 } else {
214 Some(ranges)
215 }
216 } else {
217 self.current = self.iter.next();
218 self.next()
219 }
220 }
221}
222
223impl core::iter::FusedIterator for BackloggedDeletions<'_> {}
224
225pub struct BackloggedInsertions<'a> {
232 replica: &'a mut Replica,
233 current: Option<&'a mut InsertionsBacklog>,
234 iter: ReplicaIdMapValuesMut<'a, InsertionsBacklog>,
235}
236
237impl<'a> BackloggedInsertions<'a> {
238 #[inline]
239 pub(crate) fn from_replica(replica: &'a mut Replica) -> Self {
240 let backlog = replica.backlog_mut();
241
242 let backlog = unsafe {
249 core::mem::transmute::<&mut Backlog, &mut Backlog>(backlog)
250 };
251
252 let mut iter = backlog.insertions.values_mut();
253
254 let current = iter.next();
255
256 Self { replica, current, iter }
257 }
258}
259
260impl Iterator for BackloggedInsertions<'_> {
261 type Item = (Text, Length);
262
263 #[inline]
264 fn next(&mut self) -> Option<Self::Item> {
265 let insertions = self.current.as_mut()?;
266
267 let Some(first) = insertions.insertions.front() else {
268 self.current = self.iter.next();
269 return self.next();
270 };
271
272 if self.replica.can_merge_insertion(first) {
273 let first = insertions.insertions.pop_front().unwrap();
274 let edit = self.replica.merge_unchecked_insertion(&first);
275 Some((first.text().clone(), edit))
276 } else {
277 self.current = self.iter.next();
278 self.next()
279 }
280 }
281}
282
283impl core::iter::FusedIterator for BackloggedInsertions<'_> {}
284
285#[cfg(feature = "encode")]
286pub(crate) mod encode {
287 use super::*;
288 use crate::encode::{Decode, DecodeWithCtx, Encode, IntDecodeError};
289 use crate::version_map::encode::BaseMapDecodeError;
290
291 impl InsertionsBacklog {
292 #[inline(always)]
293 fn iter(&self) -> impl Iterator<Item = &Insertion> + '_ {
294 self.insertions.iter()
295 }
296
297 #[inline(always)]
298 fn len(&self) -> usize {
299 self.insertions.len()
300 }
301
302 #[inline(always)]
303 fn push(&mut self, insertion: Insertion) {
304 self.insertions.push_back(insertion);
305 }
306 }
307
308 impl DeletionsBacklog {
309 #[inline(always)]
310 fn iter(&self) -> impl Iterator<Item = &Deletion> + '_ {
311 self.deletions.iter()
312 }
313
314 #[inline(always)]
315 fn len(&self) -> usize {
316 self.deletions.len()
317 }
318
319 #[inline(always)]
320 fn push(&mut self, deletion: Deletion) {
321 self.deletions.push_back(deletion);
322 }
323 }
324
325 impl Encode for Backlog {
326 #[inline]
327 fn encode(&self, buf: &mut Vec<u8>) {
328 (self.insertions.len() as u64).encode(buf);
329
330 for (id, insertions) in &self.insertions {
331 ReplicaIdInsertions::new(*id, insertions).encode(buf);
332 }
333
334 (self.deletions.len() as u64).encode(buf);
335
336 for (id, deletions) in &self.deletions {
337 ReplicaIdDeletions::new(*id, deletions).encode(buf);
338 }
339 }
340 }
341
342 pub(crate) enum BacklogDecodeError {
343 Int(IntDecodeError),
344 VersionMap(BaseMapDecodeError<Length>),
345 }
346
347 impl From<IntDecodeError> for BacklogDecodeError {
348 #[inline(always)]
349 fn from(err: IntDecodeError) -> Self {
350 Self::Int(err)
351 }
352 }
353
354 impl From<BaseMapDecodeError<Length>> for BacklogDecodeError {
355 #[inline(always)]
356 fn from(err: BaseMapDecodeError<Length>) -> Self {
357 Self::VersionMap(err)
358 }
359 }
360
361 impl core::fmt::Display for BacklogDecodeError {
362 #[inline]
363 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
364 let err: &dyn core::fmt::Display = match self {
365 Self::VersionMap(err) => err,
366 Self::Int(err) => err,
367 };
368
369 write!(f, "Backlog: couldn't be decoded: {err}")
370 }
371 }
372
373 impl Decode for Backlog {
374 type Value = Self;
375
376 type Error = BacklogDecodeError;
377
378 #[inline]
379 fn decode(buf: &[u8]) -> Result<(Self::Value, &[u8]), Self::Error> {
380 let (num_replicas, mut buf) = u64::decode(buf)?;
381
382 let mut insertions = ReplicaIdMap::default();
383
384 for _ in 0..num_replicas {
385 let ((), new_buf) =
386 ReplicaIdInsertions::decode(buf, &mut insertions)?;
387 buf = new_buf;
388 }
389
390 let (num_replicas, mut buf) = u64::decode(buf)?;
391
392 let mut deletions = ReplicaIdMap::default();
393
394 for _ in 0..num_replicas {
395 let ((), new_buf) =
396 ReplicaIdDeletions::decode(buf, &mut deletions)?;
397 buf = new_buf;
398 }
399
400 let this = Self { insertions, deletions };
401
402 Ok((this, buf))
403 }
404 }
405
406 struct ReplicaIdInsertions<'a> {
407 replica_id: ReplicaId,
408 insertions: &'a InsertionsBacklog,
409 }
410
411 impl<'a> ReplicaIdInsertions<'a> {
412 #[inline]
413 fn new(
414 replica_id: ReplicaId,
415 insertions: &'a InsertionsBacklog,
416 ) -> Self {
417 Self { replica_id, insertions }
418 }
419 }
420
421 impl Encode for ReplicaIdInsertions<'_> {
422 #[inline]
423 fn encode(&self, buf: &mut Vec<u8>) {
424 self.replica_id.encode(buf);
425
426 (self.insertions.len() as u64).encode(buf);
427
428 for insertion in self.insertions.iter() {
429 insertion.anchor().encode(buf);
430 let range = insertion.text().temporal_range();
431 range.start.encode(buf);
432 range.len().encode(buf);
433 insertion.run_ts().encode(buf);
434 insertion.lamport_ts().encode(buf);
435 }
436 }
437 }
438
439 impl DecodeWithCtx for ReplicaIdInsertions<'_> {
440 type Value = ();
441
442 type Ctx = ReplicaIdMap<InsertionsBacklog>;
443
444 type Error = BacklogDecodeError;
445
446 #[inline]
447 fn decode<'buf>(
448 buf: &'buf [u8],
449 ctx: &mut Self::Ctx,
450 ) -> Result<((), &'buf [u8]), Self::Error> {
451 let (replica_id, buf) = ReplicaId::decode(buf)?;
452
453 let (num_insertions, mut buf) = u64::decode(buf)?;
454
455 let mut insertions = InsertionsBacklog::default();
456
457 for _ in 0..num_insertions {
458 let (anchor, new_buf) = InnerAnchor::decode(buf)?;
459 let (start, new_buf) = Length::decode(new_buf)?;
460 let (len, new_buf) = Length::decode(new_buf)?;
461 let (run_ts, new_buf) = RunTs::decode(new_buf)?;
462 let (lamport_ts, new_buf) = LamportTs::decode(new_buf)?;
463
464 let insertion = Insertion::new(
465 anchor,
466 Text::new(replica_id, start..start + len),
467 lamport_ts,
468 run_ts,
469 );
470
471 insertions.push(insertion);
472
473 buf = new_buf;
474 }
475
476 ctx.insert(replica_id, insertions);
477
478 Ok(((), buf))
479 }
480 }
481
482 struct ReplicaIdDeletions<'a> {
483 replica_id: ReplicaId,
484 deletions: &'a DeletionsBacklog,
485 }
486
487 impl<'a> ReplicaIdDeletions<'a> {
488 #[inline]
489 fn new(
490 replica_id: ReplicaId,
491 deletions: &'a DeletionsBacklog,
492 ) -> Self {
493 Self { replica_id, deletions }
494 }
495 }
496
497 impl Encode for ReplicaIdDeletions<'_> {
498 #[inline]
499 fn encode(&self, buf: &mut Vec<u8>) {
500 self.replica_id.encode(buf);
501
502 (self.deletions.len() as u64).encode(buf);
503
504 for deletion in self.deletions.iter() {
505 deletion.start().encode(buf);
506 deletion.end().encode(buf);
507 deletion.version_map().encode(buf);
508 deletion.deletion_ts().encode(buf);
509 }
510 }
511 }
512
513 impl DecodeWithCtx for ReplicaIdDeletions<'_> {
514 type Value = ();
515
516 type Ctx = ReplicaIdMap<DeletionsBacklog>;
517
518 type Error = BacklogDecodeError;
519
520 #[inline]
521 fn decode<'buf>(
522 buf: &'buf [u8],
523 ctx: &mut Self::Ctx,
524 ) -> Result<((), &'buf [u8]), Self::Error> {
525 let (replica_id, buf) = ReplicaId::decode(buf)?;
526
527 let (num_deletions, mut buf) = u64::decode(buf)?;
528
529 let mut deletions = DeletionsBacklog::default();
530
531 for _ in 0..num_deletions {
532 let (start, new_buf) = InnerAnchor::decode(buf)?;
533 let (end, new_buf) = InnerAnchor::decode(new_buf)?;
534 let (version_map, new_buf) = VersionMap::decode(new_buf)?;
535 let (deletion_ts, new_buf) = DeletionTs::decode(new_buf)?;
536
537 let deletion =
538 Deletion::new(start, end, version_map, deletion_ts);
539
540 deletions.push(deletion);
541
542 buf = new_buf;
543 }
544
545 ctx.insert(replica_id, deletions);
546
547 Ok(((), buf))
548 }
549 }
550}
551
/// Serde support for [`Backlog`](super::Backlog), generated by the
/// `impl_serialize!` and `impl_deserialize!` macros of `crate::encode`.
#[cfg(feature = "serde")]
mod serde {
    crate::encode::impl_serialize! { super::Backlog }
    crate::encode::impl_deserialize! { super::Backlog }
}