yrs/
id_set.rs

1use crate::block::{ClientID, ID};
2use crate::block_store::BlockStore;
3use crate::encoding::read::Error;
4use crate::iter::TxnIterator;
5use crate::slice::BlockSlice;
6use crate::store::Store;
7use crate::updates::decoder::{Decode, Decoder};
8use crate::updates::encoder::{Encode, Encoder};
9use crate::utils::client_hasher::ClientHasher;
10use crate::ReadTxn;
11use serde::de::{SeqAccess, Visitor};
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use smallvec::{smallvec, SmallVec};
14use std::collections::hash_map::Entry;
15use std::collections::HashMap;
16use std::fmt::Formatter;
17use std::hash::{BuildHasherDefault, Hash, Hasher};
18use std::ops::Range;
19// Note: use native Rust [Range](https://doc.rust-lang.org/std/ops/struct.Range.html)
20// as it's left-inclusive/right-exclusive and defines the exact capabilities we care about here.
21
22impl Encode for Range<u32> {
23    fn encode<E: Encoder>(&self, encoder: &mut E) {
24        encoder.write_ds_clock(self.start);
25        encoder.write_ds_len(self.end - self.start)
26    }
27}
28
29impl Decode for Range<u32> {
30    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, Error> {
31        let clock = decoder.read_ds_clock()?;
32        let len = decoder.read_ds_len()?;
33        Ok(clock..(clock + len))
34    }
35}
36
/// [IdRange] describes the set of elements created by a specific client and included in a
/// corresponding [IdSet]. The elements are represented as ranges of `u32` clock values.
///
/// Note: equality and hashing are derived structurally, so `Continuous(0..5)` and
/// `Fragmented(vec![0..5])` are *not* equal even though they cover the same clocks.
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum IdRange {
    /// [IdRange] variant that can be represented as a single range of clock values. It is an
    /// optimization that avoids allocations when the corresponding set describes a single
    /// continuous range.
    Continuous(Range<u32>),
    /// Multiple ranges containing clock values, separated from each other by clock ranges
    /// not included in this [IdRange].
    ///
    /// Ranges appended through merging are not re-sorted, so they may be unordered or
    /// overlapping until the range is squashed.
    Fragmented(Vec<Range<u32>>),
}
49
50impl IdRange {
51    pub fn with_capacity(capacity: usize) -> Self {
52        IdRange::Fragmented(Vec::with_capacity(capacity))
53    }
54
55    /// Check if range is empty (doesn't cover any clock space).
56    pub fn is_empty(&self) -> bool {
57        match self {
58            IdRange::Continuous(r) => r.start == r.end,
59            IdRange::Fragmented(rs) => rs.is_empty(),
60        }
61    }
62
63    /// Inverts current [IdRange], returning another [IdRange] that contains all
64    /// "holes" (ranges not included in current range). If current range is a continuous space
65    /// starting from the initial clock (eg. [0..5)), then returned range will be empty.
66    pub fn invert(&self) -> IdRange {
67        match self {
68            IdRange::Continuous(range) => IdRange::Continuous(0..range.start),
69            IdRange::Fragmented(ranges) => {
70                let mut inv = Vec::new();
71                let mut start = 0;
72                for range in ranges.iter() {
73                    if range.start > start {
74                        inv.push(start..range.start);
75                    }
76                    start = range.end;
77                }
78                match inv.len() {
79                    0 => IdRange::Continuous(0..0),
80                    1 => IdRange::Continuous(inv[0].clone()),
81                    _ => IdRange::Fragmented(inv),
82                }
83            }
84        }
85    }
86
87    /// Check if given clock exists within current [IdRange].
88    pub fn contains(&self, clock: u32) -> bool {
89        match self {
90            IdRange::Continuous(range) => range.contains(&clock),
91            IdRange::Fragmented(ranges) => ranges.iter().any(|r| r.contains(&clock)),
92        }
93    }
94
95    /// Iterate over ranges described by current [IdRange].
96    pub fn iter(&self) -> IdRangeIter<'_> {
97        let (range, inner) = match self {
98            IdRange::Continuous(range) => (Some(range), None),
99            IdRange::Fragmented(ranges) => (None, Some(ranges.iter())),
100        };
101        IdRangeIter { range, inner }
102    }
103
104    fn push(&mut self, range: Range<u32>) {
105        match self {
106            IdRange::Continuous(r) => {
107                if r.end >= range.start {
108                    if r.start > range.end {
109                        *self = IdRange::Fragmented(vec![range, r.clone()])
110                    } else {
111                        // two ranges overlap - merge them
112                        r.end = range.end.max(r.end);
113                        r.start = range.start.min(r.start);
114                    }
115                } else {
116                    *self = IdRange::Fragmented(vec![r.clone(), range])
117                }
118            }
119            IdRange::Fragmented(ranges) => {
120                if ranges.is_empty() {
121                    *self = IdRange::Continuous(range);
122                } else {
123                    let last_idx = ranges.len() - 1;
124                    let last = &mut ranges[last_idx];
125                    if !Self::try_join(last, &range) {
126                        ranges.push(range);
127                    }
128                }
129            }
130        }
131    }
132
133    /// Alters current [IdRange] by compacting its internal implementation (in fragmented case).
134    /// Example: fragmented space of [0,3), [3,5), [6,7) will be compacted into [0,5), [6,7).
135    fn squash(&mut self) {
136        if let IdRange::Fragmented(ranges) = self {
137            if !ranges.is_empty() {
138                ranges.sort_by(|a, b| a.start.cmp(&b.start));
139                let mut new_len = 1;
140
141                let len = ranges.len() as isize;
142                let head = ranges.as_mut_ptr();
143                let mut current = unsafe { head.as_mut().unwrap() };
144                let mut i = 1;
145                while i < len {
146                    let next = unsafe { head.offset(i).as_ref().unwrap() };
147                    if !Self::try_join(current, next) {
148                        // current and next are disjoined eg. [0,5) & [6,9)
149
150                        // move current pointer one index to the left: by using new_len we
151                        // squash ranges possibly already merged to current
152                        current = unsafe { head.offset(new_len).as_mut().unwrap() };
153
154                        // make next a new current
155                        current.start = next.start;
156                        current.end = next.end;
157                        new_len += 1;
158                    }
159
160                    i += 1;
161                }
162
163                if new_len == 1 {
164                    *self = IdRange::Continuous(ranges[0].clone())
165                } else if ranges.len() != new_len as usize {
166                    ranges.truncate(new_len as usize);
167                }
168            }
169        }
170    }
171
172    fn is_squashed(&self) -> bool {
173        match self {
174            IdRange::Continuous(_) => true,
175            IdRange::Fragmented(ranges) => {
176                let mut i = ranges.iter();
177                if let Some(r) = i.next() {
178                    let mut prev_end = r.end;
179                    while let Some(r) = i.next() {
180                        if r.start < prev_end {
181                            return false;
182                        }
183                        prev_end = r.end;
184                    }
185                    true
186                } else {
187                    true
188                }
189            }
190        }
191    }
192
193    /// Merge `other` ID range into current one.
194    pub fn merge(&mut self, other: IdRange) {
195        let raw = std::mem::take(self);
196        *self = match (raw, other) {
197            (IdRange::Continuous(mut a), IdRange::Continuous(b)) => {
198                if Self::disjoint(&a, &b) {
199                    IdRange::Fragmented(vec![a, b])
200                } else {
201                    a.start = a.start.min(b.start);
202                    a.end = a.end.max(b.end);
203                    IdRange::Continuous(a)
204                }
205            }
206            (IdRange::Fragmented(mut a), IdRange::Continuous(b)) => {
207                a.push(b);
208                IdRange::Fragmented(a)
209            }
210            (IdRange::Continuous(a), IdRange::Fragmented(b)) => {
211                let mut v = b;
212                v.push(a);
213                IdRange::Fragmented(v)
214            }
215            (IdRange::Fragmented(mut a), IdRange::Fragmented(mut b)) => {
216                a.append(&mut b);
217                IdRange::Fragmented(a)
218            }
219        };
220    }
221
222    /// Check if current [IdRange] is a subset of `other`. This means that all the elements
223    /// described by the current [IdRange] can be found within the bounds of `other` [IdRange].
224    /// If there are some clock values not found within the `other` this method will return false.
225    pub fn subset_of(&self, other: &Self) -> bool {
226        for range in self.iter() {
227            if !Self::is_range_covered(range, other) {
228                return false;
229            }
230        }
231        true
232    }
233
234    fn is_range_covered(range: &Range<u32>, other: &Self) -> bool {
235        if range.is_empty() {
236            return true;
237        }
238
239        let mut current = range.start;
240
241        for other_range in other.iter() {
242            // Skip ranges that end before our current position
243            if other_range.end <= current {
244                continue;
245            }
246
247            // If there's a gap before this range, we're not fully covered
248            if other_range.start > current {
249                return false;
250            }
251
252            // This range covers from current to its end
253            current = other_range.end;
254
255            // If we've covered the entire range, we're done
256            if current >= range.end {
257                return true;
258            }
259        }
260
261        // Check if we covered the entire range
262        current >= range.end
263    }
264
265    /// Subtracts `other` from the current [IdRange], producing a new [IdRange] that contains
266    /// all elements from the current range that are not present in `other`.
267    pub fn subtract(&mut self, other: Self) {
268        let mut result = Vec::new();
269
270        for range in self.iter() {
271            let mut current_ranges: SmallVec<[Range<u32>; 2]> = smallvec![range.clone()];
272
273            for other_range in other.iter() {
274                let mut new_ranges = SmallVec::new();
275                for r in current_ranges {
276                    if Self::disjoint(&r, other_range) {
277                        // No overlap, keep the range as is
278                        new_ranges.push(r);
279                    } else {
280                        // There's overlap, subtract the overlapping part
281                        if r.start < other_range.start {
282                            // Keep the part before other_range
283                            new_ranges.push(r.start..other_range.start);
284                        }
285                        if r.end > other_range.end {
286                            // Keep the part after other_range
287                            new_ranges.push(other_range.end..r.end);
288                        }
289                        // The overlapping part is discarded
290                    }
291                }
292                current_ranges = new_ranges;
293            }
294
295            result.extend(current_ranges);
296        }
297
298        *self = match result.len() {
299            0 => IdRange::Continuous(0..0),
300            1 => IdRange::Continuous(result[0].clone()),
301            _ => IdRange::Fragmented(result),
302        };
303    }
304
305    /// Computes the intersection of the current [IdRange] with `other`, modifying the current
306    /// range to contain only elements present in both ranges.
307    pub fn intersect(&mut self, other: Self) {
308        let mut result = Vec::new();
309
310        for self_range in self.iter() {
311            for other_range in other.iter() {
312                if !Self::disjoint(self_range, other_range) {
313                    // Ranges overlap, compute the intersection
314                    let start = self_range.start.max(other_range.start);
315                    let end = self_range.end.min(other_range.end);
316                    result.push(start..end);
317                }
318            }
319        }
320
321        *self = match result.len() {
322            0 => IdRange::Continuous(0..0),
323            1 => IdRange::Continuous(result[0].clone()),
324            _ => IdRange::Fragmented(result),
325        };
326    }
327
328    fn encode_raw<E: Encoder>(&self, encoder: &mut E) {
329        match self {
330            IdRange::Continuous(range) => {
331                encoder.write_var(1u32);
332                range.encode(encoder)
333            }
334            IdRange::Fragmented(ranges) => {
335                encoder.write_var(ranges.len() as u32);
336                for range in ranges.iter() {
337                    range.encode(encoder);
338                }
339            }
340        }
341    }
342
343    #[inline]
344    fn try_join(a: &mut Range<u32>, b: &Range<u32>) -> bool {
345        if Self::disjoint(a, b) {
346            false
347        } else {
348            a.start = a.start.min(b.start);
349            a.end = a.end.max(b.end);
350            true
351        }
352    }
353
354    #[inline]
355    fn disjoint(a: &Range<u32>, b: &Range<u32>) -> bool {
356        a.start > b.end || b.start > a.end
357    }
358}
359
360impl Default for IdRange {
361    fn default() -> Self {
362        IdRange::Continuous(0..0)
363    }
364}
365
366impl Encode for IdRange {
367    fn encode<E: Encoder>(&self, encoder: &mut E) {
368        if self.is_squashed() {
369            self.encode_raw(encoder)
370        } else {
371            let mut clone = self.clone();
372            clone.squash();
373            clone.encode_raw(encoder);
374        }
375    }
376}
377
378impl Decode for IdRange {
379    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, Error> {
380        match decoder.read_var::<u32>()? {
381            1 => {
382                let range = Range::decode(decoder)?;
383                Ok(IdRange::Continuous(range))
384            }
385            len => {
386                let mut ranges = Vec::with_capacity(len as usize);
387                let mut i = 0;
388                while i < len {
389                    ranges.push(Range::decode(decoder)?);
390                    i += 1;
391                }
392                Ok(IdRange::Fragmented(ranges))
393            }
394        }
395    }
396}
397
/// Iterator over the clock ranges stored in an [IdRange].
///
/// Exactly one of the two sources is used: `range` for the continuous variant, `inner`
/// for the fragmented one.
pub struct IdRangeIter<'a> {
    /// The single range of a continuous [IdRange]; yielded once, then taken.
    range: Option<&'a Range<u32>>,
    /// Slice iterator over the ranges of a fragmented [IdRange].
    inner: Option<std::slice::Iter<'a, Range<u32>>>,
}

impl<'a> Iterator for IdRangeIter<'a> {
    type Item = &'a Range<u32>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.as_mut() {
            Some(ranges) => ranges.next(),
            None => self.range.take(),
        }
    }
}

/// Lets callers walk an [IdRange] in descending order without collecting it first.
impl<'a> DoubleEndedIterator for IdRangeIter<'a> {
    fn next_back(&mut self) -> Option<Self::Item> {
        match self.inner.as_mut() {
            Some(ranges) => ranges.next_back(),
            None => self.range.take(),
        }
    }
}
425
/// A set of block IDs grouped by client: each [ClientID] maps to an [IdRange] of clock values.
///
/// [IdSet] is also the storage behind [DeleteSet]. When used as a delete set:
/// - a DeleteSet is a temporary object that is created when needed;
/// - when created in a transaction, it must only be accessed after sorting and merging,
///   because that DeleteSet is sent to other clients;
/// - no DeleteSet is created when a sync message is sent. The DeleteSet message is created
///   directly from the StructStore;
/// - a DeleteSet read as a part of a sync/update message is already sorted and merged.
#[derive(Default, Clone, PartialEq, Eq)]
pub struct IdSet(HashMap<ClientID, IdRange, BuildHasherDefault<ClientHasher>>);

/// Borrowing iterator over the `(client, range)` entries of an [IdSet].
pub(crate) type Iter<'a> = std::collections::hash_map::Iter<'a, ClientID, IdRange>;
437
438//TODO: I'd say we should split IdSet and DeleteSet into two structures. While DeleteSet can be
439// implemented in terms of IdSet, it has more specific methods (related to deletion process), while
440// IdSet could contain wider area of use cases.
441impl IdSet {
442    pub fn new() -> Self {
443        Self::default()
444    }
445
446    /// Returns number of clients stored;
447    pub fn len(&self) -> usize {
448        self.0.len()
449    }
450
451    pub(crate) fn iter(&self) -> Iter<'_> {
452        self.0.iter()
453    }
454
455    /// Check if current [IdSet] contains given `id`.
456    pub fn contains(&self, id: &ID) -> bool {
457        if let Some(ranges) = self.0.get(&id.client) {
458            ranges.contains(id.clock)
459        } else {
460            false
461        }
462    }
463
464    /// Checks if current ID set contains any data.
465    pub fn is_empty(&self) -> bool {
466        self.0.is_empty() || self.0.values().all(|r| r.is_empty())
467    }
468
469    /// Compacts an internal ranges representation.
470    pub fn squash(&mut self) {
471        for block in self.0.values_mut() {
472            block.squash();
473        }
474    }
475
476    pub fn insert(&mut self, id: ID, len: u32) {
477        let range = id.clock..(id.clock + len);
478        match self.0.entry(id.client) {
479            Entry::Occupied(r) => {
480                r.into_mut().push(range);
481            }
482            Entry::Vacant(e) => {
483                e.insert(IdRange::Continuous(range));
484            }
485        }
486    }
487
488    /// Inserts a new ID `range` corresponding with a given `client`.
489    pub fn insert_range(&mut self, client: ClientID, range: IdRange) {
490        self.0.insert(client, range);
491    }
492
493    /// Merges another ID set into a current one, combining their information about observed ID
494    /// ranges and squashing them if necessary.
495    pub fn merge(&mut self, other: Self) {
496        other.0.into_iter().for_each(|(client, range)| {
497            if let Some(r) = self.0.get_mut(&client) {
498                r.merge(range)
499            } else {
500                self.0.insert(client, range);
501            }
502        });
503        self.squash()
504    }
505
506    pub fn get(&self, client_id: &ClientID) -> Option<&IdRange> {
507        self.0.get(client_id)
508    }
509}
510
511impl Encode for IdSet {
512    fn encode<E: Encoder>(&self, encoder: &mut E) {
513        encoder.write_var(self.0.len() as u32);
514        for (&client_id, block) in self.0.iter() {
515            encoder.reset_ds_cur_val();
516            encoder.write_var(client_id);
517            block.encode(encoder);
518        }
519    }
520}
521
522impl Decode for IdSet {
523    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, Error> {
524        let mut set = Self::new();
525        let client_len: u32 = decoder.read_var()?;
526        let mut i = 0;
527        while i < client_len {
528            decoder.reset_ds_cur_val();
529            let client: u32 = decoder.read_var()?;
530            let range = IdRange::decode(decoder)?;
531            set.0.insert(client as ClientID, range);
532            i += 1;
533        }
534        Ok(set)
535    }
536}
537
538impl Hash for IdSet {
539    fn hash<H: Hasher>(&self, state: &mut H) {
540        for (client, range) in self.0.iter() {
541            client.hash(state);
542            range.hash(state);
543        }
544    }
545}
546
547impl Serialize for IdSet {
548    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
549    where
550        S: Serializer,
551    {
552        serializer.serialize_bytes(&self.encode_v1())
553    }
554}
555
556impl<'de> Deserialize<'de> for IdSet {
557    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
558    where
559        D: Deserializer<'de>,
560    {
561        struct IdSetVisitor;
562        impl<'de> Visitor<'de> for IdSetVisitor {
563            type Value = IdSet;
564
565            fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
566                write!(formatter, "IdSet")
567            }
568
569            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
570            where
571                E: serde::de::Error,
572            {
573                IdSet::decode_v1(v).map_err(E::custom)
574            }
575
576            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
577            where
578                A: SeqAccess<'de>,
579            {
580                let mut bytes = match seq.size_hint() {
581                    None => Vec::new(),
582                    Some(capacity) => Vec::with_capacity(capacity),
583                };
584                while let Some(x) = seq.next_element()? {
585                    bytes.push(x);
586                }
587                use serde::de::Error;
588                IdSet::decode_v1(&bytes).map_err(A::Error::custom)
589            }
590        }
591
592        deserializer.deserialize_bytes(IdSetVisitor)
593    }
594}
595
/// [DeleteSet] contains information about all blocks (described by clock ranges) that have been
/// subjected to the delete process.
///
/// It is a newtype over [IdSet] (`#[repr(transparent)]`, so it has the same layout); equality,
/// hashing and serde support are all derived from the wrapped set.
#[repr(transparent)]
#[derive(Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct DeleteSet(IdSet);
601
602impl From<IdSet> for DeleteSet {
603    fn from(id_set: IdSet) -> Self {
604        DeleteSet(id_set)
605    }
606}
607
608impl<'a> From<&'a BlockStore> for DeleteSet {
609    /// Creates a [DeleteSet] by reading all deleted blocks and including their clock ranges into
610    /// the delete set itself.
611    fn from(store: &'a BlockStore) -> Self {
612        let mut set = DeleteSet(IdSet::new());
613        for (&client, blocks) in store.iter() {
614            let mut deletes = IdRange::with_capacity(blocks.len());
615            for block in blocks.iter() {
616                if block.is_deleted() {
617                    let (start, end) = block.clock_range();
618                    deletes.push(start..(end + 1));
619                }
620            }
621
622            if !deletes.is_empty() {
623                set.0.insert_range(client, deletes);
624            }
625        }
626        set
627    }
628}
629
630impl Default for DeleteSet {
631    fn default() -> Self {
632        Self::new()
633    }
634}
635
636impl std::fmt::Debug for DeleteSet {
637    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
638        std::fmt::Display::fmt(self, f)
639    }
640}
641impl std::fmt::Display for DeleteSet {
642    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
643        std::fmt::Display::fmt(&self.0, f)
644    }
645}
646
647impl std::fmt::Debug for IdSet {
648    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
649        std::fmt::Display::fmt(self, f)
650    }
651}
652impl std::fmt::Display for IdSet {
653    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654        let mut s = f.debug_struct("");
655        for (k, v) in self.iter() {
656            s.field(&k.to_string(), v);
657        }
658        s.finish()
659    }
660}
661
662impl std::fmt::Debug for IdRange {
663    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
664        std::fmt::Display::fmt(self, f)
665    }
666}
667impl std::fmt::Display for IdRange {
668    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
669        match self {
670            IdRange::Continuous(r) => write!(f, "[{}..{})", r.start, r.end),
671            IdRange::Fragmented(r) => {
672                write!(f, "[")?;
673                for r in r.iter() {
674                    write!(f, " [{}..{})", r.start, r.end)?;
675                }
676                write!(f, " ]")
677            }
678        }
679    }
680}
681
682impl DeleteSet {
683    /// Creates a new empty delete set instance.
684    pub fn new() -> Self {
685        DeleteSet(IdSet::new())
686    }
687
688    /// Inserts an information about delete block (identified by `id` and having a specified length)
689    /// inside of a current delete set.
690    pub fn insert(&mut self, id: ID, len: u32) {
691        self.0.insert(id, len)
692    }
693
694    /// Returns number of clients stored;
695    pub fn len(&self) -> usize {
696        self.0.len()
697    }
698
699    /// Checks if delete set contains any clock ranges.
700    pub fn is_empty(&self) -> bool {
701        self.0.is_empty()
702    }
703
704    /// Checks if given block `id` is considered deleted from the perspective of current delete set.
705    pub fn is_deleted(&self, id: &ID) -> bool {
706        self.0.contains(id)
707    }
708
709    /// Returns an iterator over all client-range pairs registered in this delete set.
710    pub fn iter(&self) -> Iter<'_> {
711        self.0.iter()
712    }
713
714    /// Merges another delete set into a current one, combining their information about deleted
715    /// clock ranges.
716    pub fn merge(&mut self, other: Self) {
717        self.0.merge(other.0)
718    }
719
720    /// Squashes the contents of a current delete set. This operation means, that in case when
721    /// delete set contains any overlapping ranges within, they will be squashed together to
722    /// optimize the space and make future encoding more compact.
723    pub fn squash(&mut self) {
724        self.0.squash()
725    }
726
727    pub fn range(&self, client_id: &ClientID) -> Option<&IdRange> {
728        self.0.get(client_id)
729    }
730
731    pub(crate) fn try_squash_with(&mut self, store: &mut Store) {
732        // try to merge deleted / gc'd items
733        for (&client, range) in self.iter() {
734            let blocks = store.blocks.get_client_blocks_mut(client);
735            for r in range.iter().rev() {
736                // start with merging the item next to the last deleted item
737                let mut si =
738                    (blocks.len() - 1).min(1 + blocks.find_pivot(r.end - 1).unwrap_or_default());
739                let mut block = &blocks[si];
740
741                let mut valid_range = usize::MAX..usize::MIN;
742
743                while si > 0 && block.clock_start() >= r.start {
744                    valid_range.start = valid_range.start.min(si);
745                    valid_range.end = valid_range.end.max(si);
746                    si -= 1;
747                    block = &blocks[si];
748                }
749
750                if valid_range.start != usize::MAX && valid_range.end != usize::MIN {
751                    blocks.squash_left_range_compaction(valid_range.start..=valid_range.end);
752                }
753            }
754        }
755    }
756
757    pub(crate) fn deleted_blocks(&self) -> DeletedBlocks {
758        DeletedBlocks::new(self)
759    }
760}
761
762impl Decode for DeleteSet {
763    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, Error> {
764        Ok(DeleteSet(IdSet::decode(decoder)?))
765    }
766}
767
768impl Encode for DeleteSet {
769    #[inline]
770    fn encode<E: Encoder>(&self, encoder: &mut E) {
771        self.0.encode(encoder)
772    }
773}
774
/// Iterator (driven through [TxnIterator]) over slices of the blocks covered by a [DeleteSet].
/// Slices are trimmed so that they never extend outside of the deleted clock range they
/// belong to.
pub(crate) struct DeletedBlocks<'ds> {
    /// Iterator over the `(client, ranges)` entries of the delete set.
    ds_iter: Iter<'ds>,
    /// Clock range currently being walked; `None` when the next range must be fetched.
    current_range: Option<&'ds Range<u32>>,
    /// Client whose block list is currently being walked.
    current_client_id: Option<ClientID>,
    /// Iterator over the remaining ranges of the current client.
    range_iter: Option<IdRangeIter<'ds>>,
    /// Index of the next block to visit in the current client's block list; `None` until the
    /// first block of `current_range` has been located.
    current_index: Option<usize>,
}
782
783impl<'ds> DeletedBlocks<'ds> {
784    pub(crate) fn new(ds: &'ds DeleteSet) -> Self {
785        let ds_iter = ds.iter();
786        DeletedBlocks {
787            ds_iter,
788            current_client_id: None,
789            current_range: None,
790            range_iter: None,
791            current_index: None,
792        }
793    }
794}
795
796impl<'ds> TxnIterator for DeletedBlocks<'ds> {
797    type Item = BlockSlice;
798
799    fn next<T: ReadTxn>(&mut self, txn: &T) -> Option<Self::Item> {
800        if let Some(r) = self.current_range {
801            let mut block = if let Some(idx) = self.current_index.as_mut() {
802                if let Some(block) = txn
803                    .store()
804                    .blocks
805                    .get_client(&self.current_client_id?)
806                    .unwrap()
807                    .get(*idx)
808                {
809                    *idx += 1;
810                    block.as_slice()
811                } else {
812                    self.current_range = None;
813                    self.current_index = None;
814                    return self.next(txn);
815                }
816            } else {
817                // first block for a particular client
818                let list = txn
819                    .store()
820                    .blocks
821                    .get_client(&self.current_client_id?)
822                    .unwrap();
823                if let Some(idx) = list.find_pivot(r.start) {
824                    let mut block = list[idx].as_slice();
825                    let clock = block.clock_start();
826
827                    // check if we don't need to cut first block
828                    if clock < r.start {
829                        block.trim_start(r.start - clock);
830                    }
831                    self.current_index = Some(idx + 1);
832                    block
833                } else {
834                    self.current_range = None;
835                    self.current_index = None;
836                    return self.next(txn);
837                }
838            };
839
840            // check if this is the last block for a current client
841            let clock = block.clock_start();
842            let block_len = block.len();
843            if clock > r.end {
844                // move to the next range
845                self.current_range = None;
846                self.current_index = None;
847                return self.next(txn);
848            } else if clock < r.end && clock + block_len > r.end {
849                // we need to cut the last block
850                block.trim_end(clock + block_len - r.end);
851                self.current_range = None;
852                self.current_index = None;
853            }
854
855            if clock + block_len >= r.end {
856                self.current_range = None;
857                self.current_index = None;
858            }
859
860            Some(block)
861        } else {
862            let range_iter = if let Some(iter) = self.range_iter.as_mut() {
863                iter
864            } else {
865                let (client_id, range) = self.ds_iter.next()?;
866                self.current_client_id = Some(client_id.clone());
867                self.current_index = None;
868                self.range_iter = Some(range.iter());
869                self.range_iter.as_mut().unwrap()
870            };
871            self.current_range = match range_iter.next() {
872                None => {
873                    let (client_id, range) = self.ds_iter.next()?;
874                    self.current_client_id = Some(client_id.clone());
875                    self.current_index = None;
876                    let mut iter = range.iter();
877                    let range = iter.next();
878                    self.range_iter = Some(iter);
879                    range
880                }
881                other => other,
882            };
883            return self.next(txn);
884        }
885    }
886}
887
/// Unit tests for `IdRange`, `IdSet` and the `DeleteSet::deleted_blocks` iterator.
#[cfg(test)]
mod test {
    use crate::block::ItemContent;
    use crate::id_set::{IdRange, IdSet};
    use crate::iter::TxnIterator;
    use crate::slice::BlockSlice;
    use crate::test_utils::exchange_updates;
    use crate::updates::decoder::{Decode, DecoderV1};
    use crate::updates::encoder::{Encode, Encoder, EncoderV1};
    use crate::{DeleteSet, Doc, Options, ReadTxn, Text, Transact, ID};
    use std::collections::HashSet;
    use std::fmt::Debug;

    #[test]
    fn id_range_merge_continuous() {
        // `b` entirely within `a`
        let mut a = IdRange::Continuous(0..5);
        a.merge(IdRange::Continuous(2..4));
        assert_eq!(a, IdRange::Continuous(0..5));

        // the tail of `a` crosses the head of `b`
        let mut a = IdRange::Continuous(0..5);
        a.merge(IdRange::Continuous(4..9));
        assert_eq!(a, IdRange::Continuous(0..9));

        // `b` is immediately adjacent to the end of `a`
        let mut a = IdRange::Continuous(0..5);
        a.merge(IdRange::Continuous(5..9));
        assert_eq!(a, IdRange::Continuous(0..9));

        // `a` does not intersect with `b`
        let mut a = IdRange::Continuous(0..4);
        a.merge(IdRange::Continuous(6..9));
        assert_eq!(a, IdRange::Fragmented(vec![0..4, 6..9]));
    }

    #[test]
    fn id_range_squash() {
        // `0..3` and `3..5` touch, so they are fused into `0..5`; `6..7` is separated
        // by a gap at clock 5 and stays a distinct range.
        let mut r = IdRange::Fragmented(vec![(0..3), (3..5), (6..7)]);
        r.squash();
        assert_eq!(r, IdRange::Fragmented(vec![(0..5), (6..7)]));
    }

    #[test]
    fn id_range_invert() {
        // Inverting yields the gaps between clock 0 and the end of the last range,
        // so a range starting at 0 with no holes inverts to nothing.
        assert!(IdRange::Continuous(0..3).invert().is_empty());

        assert_eq!(
            IdRange::Continuous(3..5).invert(),
            IdRange::Continuous(0..3)
        );

        assert_eq!(
            IdRange::Fragmented(vec![0..3, 4..5]).invert(),
            IdRange::Continuous(3..4)
        );

        assert_eq!(
            IdRange::Fragmented(vec![3..4, 7..9]).invert(),
            IdRange::Fragmented(vec![0..3, 4..7])
        );
    }

    #[test]
    fn id_range_contains() {
        // ranges are left-inclusive and right-exclusive
        assert!(!IdRange::Continuous(1..3).contains(0));
        assert!(IdRange::Continuous(1..3).contains(1));
        assert!(IdRange::Continuous(1..3).contains(2));
        assert!(!IdRange::Continuous(1..3).contains(3));

        assert!(!IdRange::Fragmented(vec![1..3, 4..5]).contains(0));
        assert!(IdRange::Fragmented(vec![1..3, 4..5]).contains(1));
        assert!(IdRange::Fragmented(vec![1..3, 4..5]).contains(2));
        assert!(!IdRange::Fragmented(vec![1..3, 4..5]).contains(3));
        assert!(IdRange::Fragmented(vec![1..3, 4..5]).contains(4));
        assert!(!IdRange::Fragmented(vec![1..3, 4..5]).contains(5));
        assert!(!IdRange::Fragmented(vec![1..3, 4..5]).contains(6));
    }

    #[test]
    fn id_range_push() {
        let mut range = IdRange::Continuous(0..0);

        // pushing onto an empty range replaces it
        range.push(0..4);
        assert_eq!(range, IdRange::Continuous(0..4));

        // an adjacent range extends the current one
        range.push(4..6);
        assert_eq!(range, IdRange::Continuous(0..6));

        // a range after a gap fragments the set
        range.push(7..9);
        assert_eq!(range, IdRange::Fragmented(vec![0..6, 7..9]));
    }

    #[test]
    fn id_range_subset_of() {
        assert!(IdRange::Continuous(1..2).subset_of(&IdRange::Continuous(1..2)));
        assert!(IdRange::Continuous(1..2).subset_of(&IdRange::Continuous(0..2)));
        assert!(IdRange::Continuous(1..2).subset_of(&IdRange::Continuous(1..3)));

        assert!(IdRange::Fragmented(vec![1..2, 3..4]).subset_of(&IdRange::Continuous(1..4)));
        assert!(IdRange::Fragmented(vec![1..2, 3..4, 5..6])
            .subset_of(&IdRange::Fragmented(vec![1..2, 3..6])));
        assert!(
            IdRange::Fragmented(vec![3..4, 5..6]).subset_of(&IdRange::Fragmented(vec![0..1, 3..6]))
        );
        assert!(
            IdRange::Fragmented(vec![3..4, 5..6]).subset_of(&IdRange::Fragmented(vec![3..4, 5..6]))
        );

        assert!(!IdRange::Continuous(1..3).subset_of(&IdRange::Continuous(0..2)));
        assert!(!IdRange::Continuous(1..3).subset_of(&IdRange::Continuous(1..2)));
        assert!(!IdRange::Continuous(1..3).subset_of(&IdRange::Continuous(2..3)));
        assert!(!IdRange::Continuous(1..3).subset_of(&IdRange::Continuous(2..4)));

        assert!(!IdRange::Fragmented(vec![1..2, 3..4]).subset_of(&IdRange::Continuous(1..3)));
        assert!(!IdRange::Fragmented(vec![1..2, 3..6])
            .subset_of(&IdRange::Fragmented(vec![1..2, 3..5])));
        assert!(!IdRange::Fragmented(vec![1..2, 3..6])
            .subset_of(&IdRange::Fragmented(vec![1..2, 4..7])));
    }

    #[test]
    fn id_range_subtract() {
        // subtract from the left side
        let mut a = IdRange::Continuous(0..4);
        a.subtract(IdRange::Continuous(0..2));
        assert_eq!(a, IdRange::Continuous(2..4));

        // subtract from the right side
        let mut a = IdRange::Continuous(0..4);
        a.subtract(IdRange::Continuous(2..4));
        assert_eq!(a, IdRange::Continuous(0..2));

        // subtract in the middle - splitting the continuous block in two
        let mut a = IdRange::Continuous(0..4);
        a.subtract(IdRange::Continuous(1..3));
        assert_eq!(a, IdRange::Fragmented(vec![0..1, 3..4]));

        // subtract with fragmented block - splitting single range into >2 ranges
        let mut a = IdRange::Continuous(0..10);
        a.subtract(IdRange::Fragmented(vec![1..3, 4..5]));
        assert_eq!(a, IdRange::Fragmented(vec![0..1, 3..4, 5..10]));

        // subtract continuous range overlapping with more than one range
        let mut a = IdRange::Fragmented(vec![0..4, 5..6, 7..10]);
        a.subtract(IdRange::Continuous(3..8));
        assert_eq!(a, IdRange::Fragmented(vec![0..3, 8..10]));

        // subtract two fragmented ranges with partially overlapping boundaries
        let mut a = IdRange::Fragmented(vec![0..4, 7..10]);
        a.subtract(IdRange::Fragmented(vec![3..5, 6..9]));
        assert_eq!(a, IdRange::Fragmented(vec![0..3, 9..10]));

        // subtract fragmented ranges, when one gets split into 2+, and another overlaps at the end
        let mut a = IdRange::Fragmented(vec![0..4, 7..10]);
        a.subtract(IdRange::Fragmented(vec![2..3, 5..6, 9..10]));
        assert_eq!(a, IdRange::Fragmented(vec![0..2, 3..4, 7..9]));
    }

    #[test]
    fn id_range_intersect() {
        // Basic continuous range intersection
        let mut a = IdRange::Continuous(0..10);
        a.intersect(IdRange::Continuous(5..15));
        assert_eq!(a, IdRange::Continuous(5..10));

        // Fragmented intersecting with continuous
        let mut a = IdRange::Fragmented(vec![0..5, 10..15]);
        a.intersect(IdRange::Continuous(3..12));
        assert_eq!(a, IdRange::Fragmented(vec![3..5, 10..12]));

        // No overlap - empty result
        let mut a = IdRange::Continuous(0..5);
        a.intersect(IdRange::Continuous(10..15));
        assert_eq!(a, IdRange::Continuous(0..0));

        // Multiple ranges with multiple intersections
        let mut a = IdRange::Fragmented(vec![1..4, 6..9]);
        a.intersect(IdRange::Continuous(2..7));
        assert_eq!(a, IdRange::Fragmented(vec![2..4, 6..7]));

        // Complete overlap
        let mut a = IdRange::Continuous(2..8);
        a.intersect(IdRange::Continuous(0..10));
        assert_eq!(a, IdRange::Continuous(2..8));

        // Partial overlap on both sides
        let mut a = IdRange::Continuous(5..15);
        a.intersect(IdRange::Continuous(0..10));
        assert_eq!(a, IdRange::Continuous(5..10));

        // Fragmented with fragmented
        let mut a = IdRange::Fragmented(vec![0..5, 10..15, 20..25]);
        a.intersect(IdRange::Fragmented(vec![3..12, 22..30]));
        assert_eq!(a, IdRange::Fragmented(vec![3..5, 10..12, 22..25]));

        // Exact match
        let mut a = IdRange::Continuous(5..10);
        a.intersect(IdRange::Continuous(5..10));
        assert_eq!(a, IdRange::Continuous(5..10));

        // Single element overlap
        let mut a = IdRange::Continuous(0..5);
        a.intersect(IdRange::Continuous(4..10));
        assert_eq!(a, IdRange::Continuous(4..5));
    }

    #[test]
    fn id_range_encode_decode() {
        roundtrip(&IdRange::Continuous(0..4));
        roundtrip(&IdRange::Fragmented(vec![1..4, 5..8]));
    }

    #[test]
    fn id_set_encode_decode() {
        let mut set = IdSet::new();
        set.insert(ID::new(124, 0), 1);
        set.insert(ID::new(1337, 0), 12);
        set.insert(ID::new(124, 1), 3);

        roundtrip(&set);
    }

    /// Encodes `value` with the v1 encoder, decodes it back, and asserts that the
    /// decoded value equals the original.
    fn roundtrip<T>(value: &T)
    where
        T: Encode + Decode + PartialEq + Debug,
    {
        let mut encoder = EncoderV1::new();
        value.encode(&mut encoder);
        let buf = encoder.to_vec();
        let mut decoder = DecoderV1::from(buf.as_slice());
        let decoded = T::decode(&mut decoder).unwrap();

        assert_eq!(value, &decoded);
    }

    #[test]
    fn deleted_blocks() {
        let mut o = Options::default();
        o.client_id = 1;
        o.skip_gc = true;
        let d1 = Doc::with_options(o.clone());
        let t1 = d1.get_or_insert_text("test");

        o.client_id = 2;
        let d2 = Doc::with_options(o);
        let t2 = d2.get_or_insert_text("test");

        // client 1: "aaaaa" gets clocks 0..5, then "bbb" (prepended) gets clocks 5..8
        t1.insert(&mut d1.transact_mut(), 0, "aaaaa");
        t1.insert(&mut d1.transact_mut(), 0, "bbb");

        exchange_updates(&[&d1, &d2]);

        // client 2: "cccc" gets clocks 0..4, inserted after the first 'a'
        t2.insert(&mut d2.transact_mut(), 4, "cccc");

        exchange_updates(&[&d1, &d2]);

        // t1: 'bbbaccccaaaa'
        t1.remove_range(&mut d1.transact_mut(), 2, 2); // => 'bbccccaaaa'
        t1.remove_range(&mut d1.transact_mut(), 3, 1); // => 'bbcccaaaa'
        t1.remove_range(&mut d1.transact_mut(), 3, 1); // => 'bbccaaaa'
        t1.remove_range(&mut d1.transact_mut(), 7, 1); // => 'bbccaaa'

        let blocks = {
            let mut txn = d1.transact_mut();
            let s = txn.snapshot();

            // Cap on the number of slices pulled from the iterator, so a misbehaving
            // iterator cannot hang the test. It is one more than the number of expected
            // blocks, so a single unexpected extra block still fails the assertion below.
            const MAX_VISITED: usize = 5;

            let mut blocks = HashSet::new();
            let mut visited = 0;
            let mut deleted = s.delete_set.deleted_blocks();
            while let Some(BlockSlice::Item(b)) = deleted.next(&txn) {
                let item = txn.store.materialize(b);
                if let ItemContent::String(str) = &item.content {
                    let t = (
                        item.is_deleted(),
                        item.id,
                        item.len(),
                        str.as_str().to_string(),
                    );
                    blocks.insert(t);
                }
                visited += 1;
                if visited == MAX_VISITED {
                    break;
                }
            }
            blocks
        };

        // deleted: client 1 clocks 0 ('a'), 4 (last 'a'), 7 (last 'b');
        // client 2 clocks 1..3 ("cc")
        let expected = HashSet::from([
            (true, ID::new(1, 0), 1, "a".to_owned()),
            (true, ID::new(1, 4), 1, "a".to_owned()),
            (true, ID::new(1, 7), 1, "b".to_owned()),
            (true, ID::new(2, 1), 2, "cc".to_owned()),
        ]);

        assert_eq!(blocks, expected);
    }

    #[test]
    fn deleted_blocks2() {
        let mut ds = DeleteSet::new();
        let doc = Doc::with_client_id(1);
        let txt = doc.get_or_insert_text("test");
        txt.push(&mut doc.transact_mut(), "testab");
        // mark only the last character (clock 5) as deleted
        ds.insert(ID::new(1, 5), 1);
        let txn = doc.transact_mut();
        let mut i = ds.deleted_blocks();
        // the iterator must yield exactly one slice trimmed to the single deleted clock;
        // a one-element slice has equal start and end clocks
        let ptr = i.next(&txn).unwrap();
        let start = ptr.clock_start();
        let end = ptr.clock_end();
        assert_eq!(start, 5);
        assert_eq!(end, 5);
        assert!(i.next(&txn).is_none());
    }
}