yrs/
update.rs

1use std::cmp::Ordering;
2use std::collections::hash_map::Entry;
3use std::collections::{HashMap, VecDeque};
4use std::hash::BuildHasherDefault;
5use std::sync::Arc;
6
7use crate::block::{
8    BlockRange, ClientID, Item, ItemContent, ItemPtr, BLOCK_GC_REF_NUMBER, BLOCK_SKIP_REF_NUMBER,
9    HAS_ORIGIN, HAS_PARENT_SUB, HAS_RIGHT_ORIGIN,
10};
11use crate::encoding::read::Error;
12use crate::error::UpdateError;
13use crate::id_set::{DeleteSet, IdSet};
14use crate::slice::ItemSlice;
15#[cfg(test)]
16use crate::store::Store;
17use crate::transaction::TransactionMut;
18use crate::types::TypePtr;
19use crate::updates::decoder::{Decode, Decoder};
20use crate::updates::encoder::{Encode, Encoder};
21use crate::utils::client_hasher::ClientHasher;
22use crate::{OffsetKind, StateVector, ID};
23
24#[derive(Debug, Default, PartialEq)]
25pub(crate) struct UpdateBlocks {
26    clients: HashMap<ClientID, VecDeque<BlockCarrier>, BuildHasherDefault<ClientHasher>>,
27}
28
29impl UpdateBlocks {
30    /**
31    @todo this should be refactored.
32    I'm currently using this to add blocks to the Update
33    */
34    pub(crate) fn add_block(&mut self, block: BlockCarrier) {
35        let e = self.clients.entry(block.id().client).or_default();
36        e.push_back(block);
37    }
38
39    pub fn is_empty(&self) -> bool {
40        self.clients.is_empty()
41    }
42
43    /// Returns an iterator that allows a traversal of all of the blocks
44    /// which consist into this [Update].
45    pub(crate) fn blocks(&self) -> Blocks<'_> {
46        Blocks::new(self)
47    }
48
49    /// Returns an iterator that allows a traversal of all of the blocks
50    /// which consist into this [Update].
51    pub(crate) fn into_blocks(self, ignore_skip: bool) -> IntoBlocks {
52        IntoBlocks::new(self, ignore_skip)
53    }
54}
55
56impl std::fmt::Display for UpdateBlocks {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        writeln!(f, "{{")?;
59        for (client, blocks) in self.clients.iter() {
60            writeln!(f, "\t{} -> [", client)?;
61            for block in blocks {
62                writeln!(f, "\t\t{}", block)?;
63            }
64            write!(f, "\t]")?;
65        }
66        writeln!(f, "}}")
67    }
68}
69
70impl std::fmt::Debug for BlockCarrier {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        std::fmt::Display::fmt(self, f)
73    }
74}
75impl std::fmt::Display for BlockCarrier {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        match self {
78            BlockCarrier::Item(x) => x.fmt(f),
79            BlockCarrier::Skip(x) => write!(f, "Skip{}", x),
80            BlockCarrier::GC(x) => write!(f, "GC{}", x),
81        }
82    }
83}
84
85/// Update type which contains an information about all decoded blocks which are incoming from a
86/// remote peer. Since these blocks are not yet integrated into current document's block store,
87/// they still may require repairing before doing so as they don't contain full data about their
88/// relations.
89///
90/// Update is conceptually similar to a block store itself, however the work patters are different.
91#[derive(Default, PartialEq)]
92pub struct Update {
93    pub(crate) blocks: UpdateBlocks,
94    pub(crate) delete_set: DeleteSet,
95}
96
97impl Update {
98    /// Binary containing an empty update, serialized via lib0 version 1 encoder.
99    pub const EMPTY_V1: &'static [u8] = &[0, 0];
100
101    /// Binary containing an empty update, serialized via lib0 version 2 encoder.
102    pub const EMPTY_V2: &'static [u8] = &[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0];
103
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    /// Check if current update is empty.
109    pub fn is_empty(&self) -> bool {
110        self.blocks.is_empty() && self.delete_set.is_empty()
111    }
112
113    /// Check if current update has changes that add new information to a document with given state.
114    pub fn extends(&self, state_vector: &StateVector) -> bool {
115        for (client_id, blocks) in self.blocks.clients.iter() {
116            let clock = state_vector.get(client_id);
117            let mut iter = blocks.iter();
118            while let Some(block) = iter.next() {
119                let range = block.range();
120                if range.id.clock <= clock && range.id.clock + range.len > clock {
121                    // this block overlaps or extends current state. It must NOT be Skip
122                    // in order to introduce any new changes
123                    if !block.is_skip() {
124                        return true;
125                    }
126                }
127            }
128        }
129        false
130    }
131
132    /// Returns a state vector representing an upper bound of client clocks included by blocks
133    /// stored in current update.
134    pub fn state_vector(&self) -> StateVector {
135        let mut sv = StateVector::default();
136        for (&client, blocks) in self.blocks.clients.iter() {
137            let mut last_clock = 0;
138            if !blocks.is_empty() && blocks[0].id().clock == 0 {
139                // we expect clocks to start from 0, otherwise blocks for this client are not
140                // continuous
141                for block in blocks.iter() {
142                    if let BlockCarrier::Skip(_) = block {
143                        // if we met skip, we stop counting: blocks are not continuous any more
144                        break;
145                    }
146                    last_clock = block.id().clock + block.len();
147                }
148            }
149            if last_clock != 0 {
150                sv.set_max(client, last_clock);
151            }
152        }
153        sv
154    }
155
156    /// Returns a state vector representing a lower bound of items inserted by this update,
157    /// grouped by their respective clients.
158    pub fn state_vector_lower(&self) -> StateVector {
159        let mut sv = StateVector::default();
160        for (&client, blocks) in self.blocks.clients.iter() {
161            for block in blocks.iter() {
162                if !block.is_skip() {
163                    let id = block.id();
164                    sv.set_max(client, id.clock);
165                    break;
166                }
167            }
168        }
169        sv
170    }
171
172    /// Returns an insertion set associated with current update.
173    /// It contains ids of all blocks inserted by this update.
174    /// If `include_deleted` flag is set, result will include GC'ed blocks and ones that were
175    /// inserted but softly deleted.
176    pub fn insertions(&self, include_deleted: bool) -> IdSet {
177        let mut insertions = IdSet::default();
178        for blocks in self.blocks.clients.values() {
179            for block in blocks.iter() {
180                match block {
181                    BlockCarrier::Item(item) if include_deleted || !item.is_deleted() => {
182                        insertions.insert(item.id, item.len);
183                    }
184                    BlockCarrier::GC(range) if include_deleted => {
185                        insertions.insert(range.id, range.len);
186                    }
187                    _ => {}
188                }
189            }
190        }
191        insertions.squash();
192        insertions
193    }
194
195    /// Returns a delete set associated with current update.
196    pub fn delete_set(&self) -> &DeleteSet {
197        &self.delete_set
198    }
199
200    /// Merges another update into current one. Their blocks are deduplicated and reordered.
201    pub fn merge(&mut self, other: Self) {
202        for (client, other_blocks) in other.blocks.clients {
203            match self.blocks.clients.entry(client) {
204                Entry::Occupied(e) => {
205                    let blocks = e.into_mut();
206                    let mut i2 = other_blocks.into_iter();
207                    let mut n2 = i2.next();
208
209                    let mut i1 = 0;
210
211                    while i1 < blocks.len() {
212                        let a = &mut blocks[i1];
213                        if let Some(b) = n2.as_ref() {
214                            if a.try_squash(b) {
215                                n2 = i2.next();
216                                continue;
217                            } else if let BlockCarrier::Item(block) = a {
218                                // we only can split Block::Item
219                                let diff = (block.id().clock + block.len()) as isize
220                                    - b.id().clock as isize;
221                                if diff > 0 {
222                                    // `b`'s clock position is inside of `a` -> we need to split `a`
223                                    if let Some(new) = a.splice(diff as u32) {
224                                        blocks.insert(i1 + 1, new);
225                                    }
226                                    //blocks = self.blocks.clients.get_mut(&client).unwrap();
227                                }
228                            }
229                            i1 += 1;
230                            n2 = i2.next();
231                        } else {
232                            break;
233                        }
234                    }
235
236                    while let Some(b) = n2 {
237                        blocks.push_back(b);
238                        n2 = i2.next();
239                    }
240                }
241                Entry::Vacant(e) => {
242                    e.insert(other_blocks);
243                }
244            }
245        }
246    }
247
248    /// Integrates current update into a block store referenced by a given transaction.
249    /// If entire integration process was successful a `None` value is returned. Otherwise a
250    /// pending update object is returned which contains blocks that couldn't be integrated, most
251    /// likely because there were missing blocks that are used as dependencies of other blocks
252    /// contained in this update.
    ///
    /// The returned pair is `(pending_blocks, pending_deletes)`:
    /// - `pending_blocks` is the pending update described above. It is `None` when every block
    ///   was integrated or the update carried no blocks. Its `missing` state vector is built
    ///   via `set_min` from the clocks at which integration got blocked.
    /// - `pending_deletes` wraps whatever delete set `txn.apply_delete` hands back for this
    ///   update's delete set, or is `None`. NOTE(review): presumably these are the
    ///   deletions that couldn't be applied yet; confirm against `TransactionMut::apply_delete`.
    ///
    /// Blocks are processed client by client, starting from the highest client id. When a
    /// block depends on a not-yet-integrated block of another client, it is put on a stack
    /// and processing switches to that client's queue. Stacked blocks are retried before
    /// continuing with the current queue.
    ///
    /// # Errors
    /// Propagates the error returned by `Item::repair`. Blocks integrated before the error
    /// are not rolled back by this method.
253    pub(crate) fn integrate(
254        mut self,
255        txn: &mut TransactionMut,
256    ) -> Result<(Option<PendingUpdate>, Option<Update>), UpdateError> {
257        let remaining_blocks = if self.blocks.is_empty() {
258            None
259        } else {
260            let mut store = txn.store_mut();
            // Client ids are sorted ascending and popped from the back, so clients are
            // visited from the highest id down to the lowest one.
261            let mut client_block_ref_ids: Vec<ClientID> =
262                self.blocks.clients.keys().cloned().collect();
263            client_block_ref_ids.sort();
264
            // `self.blocks` is non-empty in this branch, so there is at least one client id.
265            let mut current_client_id = client_block_ref_ids.pop().unwrap();
266            let mut current_target = self.blocks.clients.get_mut(&current_client_id);
267            let mut stack_head = if let Some(v) = current_target.as_mut() {
268                v.pop_front()
269            } else {
270                None
271            };
272
            // Clocks already present locally; advanced as blocks get integrated.
273            let mut local_sv = store.blocks.get_state_vector();
            // Per-client clocks at which integration got blocked.
274            let mut missing_sv = StateVector::default();
            // Blocks that couldn't be integrated; returned as a pending update.
275            let mut remaining = UpdateBlocks::default();
            // Blocks put on hold until a dependency from another client gets integrated.
276            let mut stack = Vec::new();
277
278            while let Some(mut block) = stack_head {
                // Skip blocks only mark gaps in the update and are never integrated.
279                if !block.is_skip() {
280                    let id = *block.id();
                    // NOTE(review): presumably `contains` means `id.clock` does not exceed the
                    // local clock of `id.client`, i.e. no earlier block of this client is
                    // missing. The `else` branch below relies on that.
281                    if local_sv.contains(&id) {
                        // Number of the block's leading elements already known locally.
                        // NOTE(review): computed in i32; assumes clocks fit into i32.
282                        let offset = local_sv.get(&id.client) as i32 - id.clock as i32;
283                        if let Some(dep) = Self::missing(&block, &local_sv) {
                            // The block depends on an unknown block of client `dep`:
                            // put it on hold and try to integrate `dep`'s blocks first.
284                            stack.push(block);
285                            // get the struct reader that has the missing struct
286                            match self.blocks.clients.get_mut(&dep) {
287                                Some(block_refs) if !block_refs.is_empty() => {
288                                    stack_head = block_refs.pop_front();
                                    // Re-borrow: `self.blocks` was borrowed mutably above.
289                                    current_target =
290                                        self.blocks.clients.get_mut(&current_client_id);
291                                    continue;
292                                }
293                                _ => {
294                                    // This update message causally depends on another update message that doesn't exist yet
295                                    missing_sv.set_min(dep, local_sv.get(&dep));
296                                    Self::return_stack(stack, &mut self.blocks, &mut remaining);
297                                    current_target =
298                                        self.blocks.clients.get_mut(&current_client_id);
299                                    stack = Vec::new();
300                                }
301                            }
                        // Integrate only when part of the block is still unknown locally;
                        // blocks that are already fully known are silently dropped.
302                        } else if offset == 0 || (offset as u32) < block.len() {
303                            let offset = offset as u32;
304                            let client = id.client;
305                            local_sv.set_max(client, id.clock + block.len());
306                            if let BlockCarrier::Item(item) = &mut block {
307                                item.repair(store)?;
308                            }
                            // A `true` result requests deleting the block right after it is
                            // stored (presumably the block arrived already deleted).
309                            let should_delete = block.integrate(txn, offset);
310                            let mut delete_ptr = if should_delete {
311                                let ptr = block.as_item_ptr();
312                                ptr
313                            } else {
314                                None
315                            };
                            // Re-acquire the store: `txn` was used mutably by `integrate`.
316                            store = txn.store_mut();
317                            match block {
318                                BlockCarrier::Item(item) => {
319                                    if item.parent != TypePtr::Unknown {
320                                        store.blocks.push_block(item)
321                                    } else {
322                                        // parent is not defined. Integrate GC struct instead
323                                        store.blocks.push_gc(BlockRange::new(item.id, item.len));
324                                        delete_ptr = None;
325                                    }
326                                }
327                                BlockCarrier::GC(gc) => store.blocks.push_gc(gc),
328                                BlockCarrier::Skip(_) => { /* do nothing */ }
329                            }
330
                            // The deletion goes through the transaction, after the block is stored.
331                            if let Some(ptr) = delete_ptr {
332                                txn.delete(ptr);
333                            }
334                            store = txn.store_mut();
335                        }
336                    } else {
337                        // update from the same client is missing
                        // NOTE(review): `id.clock - 1` relies on `id.clock > 0`, which holds if
                        // `contains` is false only for clocks beyond the local clock.
338                        let id = block.id();
339                        missing_sv.set_min(id.client, id.clock - 1);
340                        stack.push(block);
341                        // hit a dead wall: move all blocks from the stack into `remaining`
342                        Self::return_stack(stack, &mut self.blocks, &mut remaining);
343                        current_target = self.blocks.clients.get_mut(&current_client_id);
344                        stack = Vec::new();
345                    }
346                }
347
348                // iterate to next stackHead
                // Blocks on hold come first; otherwise continue with the current client's
                // queue and, once it is drained, switch to the next client (if any).
349                if !stack.is_empty() {
350                    stack_head = stack.pop();
351                } else {
352                    match current_target.take() {
353                        Some(v) if !v.is_empty() => {
354                            stack_head = v.pop_front();
355                            current_target = Some(v);
356                        }
357                        _ => {
358                            if let Some((client_id, target)) =
359                                Self::next_target(&mut client_block_ref_ids, &mut self.blocks)
360                            {
361                                stack_head = target.pop_front();
362                                current_client_id = client_id;
363                                current_target = Some(target);
364                            } else {
365                                // we're done
366                                break;
367                            }
368                        }
369                    };
370                }
371            }
372
373            if remaining.is_empty() {
374                None
375            } else {
376                Some(PendingUpdate {
377                    update: Update {
378                        blocks: remaining,
379                        delete_set: DeleteSet::new(),
380                    },
381                    missing: missing_sv,
382                })
383            }
384        };
385
        // Deletions are applied regardless of whether all blocks could be integrated.
386        let remaining_ds = txn.apply_delete(&self.delete_set).map(|ds| {
387            let mut update = Update::new();
388            update.delete_set = ds;
389            update
390        });
391
392        Ok((remaining_blocks, remaining_ds))
393    }
394
395    fn missing(block: &BlockCarrier, local_sv: &StateVector) -> Option<ClientID> {
396        if let BlockCarrier::Item(item) = block {
397            if let Some(origin) = &item.origin {
398                if origin.client != item.id.client && origin.clock >= local_sv.get(&origin.client) {
399                    return Some(origin.client);
400                }
401            }
402
403            if let Some(right_origin) = &item.right_origin {
404                if right_origin.client != item.id.client
405                    && right_origin.clock >= local_sv.get(&right_origin.client)
406                {
407                    return Some(right_origin.client);
408                }
409            }
410
411            match &item.parent {
412                TypePtr::Branch(parent) => {
413                    if let Some(block) = &parent.item {
414                        let parent_id = block.id();
415                        if parent_id.client != item.id.client
416                            && parent_id.clock >= local_sv.get(&parent_id.client)
417                        {
418                            return Some(parent_id.client);
419                        }
420                    }
421                }
422                TypePtr::ID(parent_id) => {
423                    if parent_id.client != item.id.client
424                        && parent_id.clock >= local_sv.get(&parent_id.client)
425                    {
426                        return Some(parent_id.client);
427                    }
428                }
429                _ => {}
430            }
431
432            match &item.content {
433                ItemContent::Move(m) => {
434                    if let Some(start) = m.start.id() {
435                        if start.clock >= local_sv.get(&start.client) {
436                            return Some(start.client);
437                        }
438                    }
439                    if !m.is_collapsed() {
440                        if let Some(end) = m.end.id() {
441                            if end.clock >= local_sv.get(&end.client) {
442                                return Some(end.client);
443                            }
444                        }
445                    }
446                }
447                ItemContent::Type(branch) => {
448                    #[cfg(feature = "weak")]
449                    if let crate::types::TypeRef::WeakLink(source) = &branch.type_ref {
450                        let start = source.quote_start.id();
451                        let end = source.quote_end.id();
452                        if let Some(start) = start {
453                            if start.clock >= local_sv.get(&start.client) {
454                                return Some(start.client);
455                            }
456                        }
457                        if start != end {
458                            if let Some(end) = &source.quote_end.id() {
459                                if end.clock >= local_sv.get(&end.client) {
460                                    return Some(end.client);
461                                }
462                            }
463                        }
464                    }
465                }
466                _ => { /* do nothing */ }
467            }
468        }
469        None
470    }
471
472    fn next_target<'a, 'b>(
473        client_block_ref_ids: &'a mut Vec<ClientID>,
474        blocks: &'b mut UpdateBlocks,
475    ) -> Option<(ClientID, &'b mut VecDeque<BlockCarrier>)> {
476        while let Some(id) = client_block_ref_ids.pop() {
477            match blocks.clients.get(&id) {
478                Some(client_blocks) if !client_blocks.is_empty() => {
479                    // we need to borrow client_blocks in mutable context AND we're
480                    // doing so in a loop at the same time - this combination causes
481                    // Rust borrow checker go nuts. TODO: remove the unsafe block
482                    let client_blocks = unsafe {
483                        (client_blocks as *const VecDeque<BlockCarrier>
484                            as *mut VecDeque<BlockCarrier>)
485                            .as_mut()
486                            .unwrap()
487                    };
488                    return Some((id, client_blocks));
489                }
490                _ => {}
491            }
492        }
493        None
494    }
495
496    fn return_stack(
497        stack: Vec<BlockCarrier>,
498        refs: &mut UpdateBlocks,
499        remaining: &mut UpdateBlocks,
500    ) {
501        for item in stack.into_iter() {
502            let client = item.id().client;
503            // remove client from clientsStructRefsIds to prevent users from applying the same update again
504            if let Some(mut unapplicable_items) = refs.clients.remove(&client) {
505                // decrement because we weren't able to apply previous operation
506                unapplicable_items.push_front(item);
507                remaining.clients.insert(client, unapplicable_items);
508            } else {
509                // item was the last item on clientsStructRefs and the field was already cleared.
510                // Add item to restStructs and continue
511                let mut blocks = VecDeque::with_capacity(1);
512                blocks.push_back(item);
513                remaining.clients.insert(client, blocks);
514            }
515        }
516    }
517
518    fn decode_block<D: Decoder>(id: ID, decoder: &mut D) -> Result<Option<BlockCarrier>, Error> {
519        let info = decoder.read_info()?;
520        match info {
521            BLOCK_SKIP_REF_NUMBER => {
522                let len: u32 = decoder.read_var()?;
523                Ok(Some(BlockCarrier::Skip(BlockRange { id, len })))
524            }
525            BLOCK_GC_REF_NUMBER => {
526                let len: u32 = decoder.read_len()?;
527                Ok(Some(BlockCarrier::GC(BlockRange { id, len })))
528            }
529            info => {
530                let cant_copy_parent_info = info & (HAS_ORIGIN | HAS_RIGHT_ORIGIN) == 0;
531                let origin = if info & HAS_ORIGIN != 0 {
532                    Some(decoder.read_left_id()?)
533                } else {
534                    None
535                };
536                let right_origin = if info & HAS_RIGHT_ORIGIN != 0 {
537                    Some(decoder.read_right_id()?)
538                } else {
539                    None
540                };
541                let parent = if cant_copy_parent_info {
542                    if decoder.read_parent_info()? {
543                        TypePtr::Named(decoder.read_string()?.into())
544                    } else {
545                        TypePtr::ID(decoder.read_left_id()?)
546                    }
547                } else {
548                    TypePtr::Unknown
549                };
550                let parent_sub: Option<Arc<str>> =
551                    if cant_copy_parent_info && (info & HAS_PARENT_SUB != 0) {
552                        Some(decoder.read_string()?.into())
553                    } else {
554                        None
555                    };
556                let content = ItemContent::decode(decoder, info)?;
557                let item = Item::new(
558                    id,
559                    None,
560                    origin,
561                    None,
562                    right_origin,
563                    parent,
564                    parent_sub,
565                    content,
566                );
567                match item {
568                    None => Ok(None),
569                    Some(item) => Ok(Some(BlockCarrier::from(item))),
570                }
571            }
572        }
573    }
574
575    pub(crate) fn encode_diff<E: Encoder>(&self, remote_sv: &StateVector, encoder: &mut E) {
576        let mut clients = HashMap::new();
577        for (client, blocks) in self.blocks.clients.iter() {
578            let remote_clock = remote_sv.get(client);
579            let mut iter = blocks.iter();
580            let mut curr = iter.next();
581            while let Some(block) = curr {
582                if block.is_skip() {
583                    curr = iter.next();
584                } else if block.id().clock + block.len() > remote_clock {
585                    let e = clients.entry(*client).or_insert_with(|| (0, Vec::new()));
586                    e.0 = (remote_clock as i64 - block.id().clock as i64).max(0) as u32;
587                    e.1.push(block);
588                    curr = iter.next();
589                    while let Some(block) = curr {
590                        e.1.push(block);
591                        curr = iter.next();
592                    }
593                } else {
594                    // read until something new comes up
595                    curr = iter.next();
596                }
597            }
598        }
599
600        // Write higher clients first ⇒ sort by clientID & clock and remove decoders without content
601        let mut sorted_clients: Vec<_> =
602            clients.iter().filter(|(_, (_, q))| !q.is_empty()).collect();
603        sorted_clients.sort_by(|&(x_id, _), &(y_id, _)| y_id.cmp(x_id));
604
605        // finish lazy struct writing
606        encoder.write_var(sorted_clients.len());
607        for (&client, (offset, blocks)) in sorted_clients {
608            encoder.write_var(blocks.len());
609            encoder.write_client(client);
610
611            let mut block = blocks[0];
612            encoder.write_var(block.id().clock + offset);
613            block.encode_with_offset(encoder, *offset);
614            for i in 1..blocks.len() {
615                block = blocks[i];
616                block.encode_with_offset(encoder, 0);
617            }
618        }
619        self.delete_set.encode(encoder)
620    }
621
622    pub fn merge_updates<T>(block_stores: T) -> Update
623    where
624        T: IntoIterator<Item = Update>,
625    {
626        let mut result = Update::new();
627        let update_blocks: Vec<UpdateBlocks> = block_stores
628            .into_iter()
629            .map(|update| {
630                result.delete_set.merge(update.delete_set);
631                update.blocks
632            })
633            .collect();
634
635        let mut lazy_struct_decoders: VecDeque<_> = update_blocks
636            .into_iter()
637            .filter(|block_store| !block_store.is_empty())
638            .map(|update_blocks| {
639                let mut memo = update_blocks.into_blocks(true).memoized();
640                memo.move_next();
641                memo
642            })
643            .collect();
644
645        let mut curr_write: Option<BlockCarrier> = None;
646
647        // Note: We need to ensure that all lazyStructDecoders are fully consumed
648        // Note: Should merge document updates whenever possible - even from different updates
649        // Note: Should handle that some operations cannot be applied yet ()
650        loop {
651            {
652                lazy_struct_decoders
653                    .retain(|lazy_struct_decoder| lazy_struct_decoder.current().is_some());
654                // sort
655                lazy_struct_decoders
656                    .make_contiguous()
657                    .sort_by(|dec1, dec2| {
658                        // Write higher clients first ⇒ sort by clientID & clock and remove decoders without content
659                        let left = dec1.current().unwrap();
660                        let right = dec2.current().unwrap();
661                        let lid = left.id();
662                        let rid = right.id();
663                        match lid.client.cmp(&rid.client) {
664                            Ordering::Equal => match lid.clock.cmp(&rid.clock) {
665                                Ordering::Equal if left.same_type(right) => Ordering::Equal,
666                                Ordering::Equal if left.is_skip() => Ordering::Greater,
667                                Ordering::Equal => Ordering::Less,
668                                Ordering::Less if !left.is_skip() || right.is_skip() => {
669                                    Ordering::Less
670                                }
671                                ordering => ordering,
672                            },
673                            ordering => ordering.reverse(),
674                        }
675                    });
676            }
677
678            let curr_decoder = match lazy_struct_decoders.iter_mut().next() {
679                Some(decoder) => decoder,
680                None => break,
681            };
682
683            let curr_block = match curr_decoder.current() {
684                Some(block) => block,
685                None => continue,
686            };
687            // write from currDecoder until the next operation is from another client or if filler-struct
688            // then we need to reorder the decoders and find the next operation to write
689            let first_client = curr_block.id().client;
690            if let Some(curr_write_block) = curr_write.as_mut() {
691                let mut iterated = false;
692
693                // iterate until we find something that we haven't written already
694                // remember: first the high client-ids are written
695                let curr_write_last = curr_write_block.id().clock + curr_write_block.len();
696                while match curr_decoder.current() {
697                    Some(block) => {
698                        let last = block.id().clock + block.len();
699                        last <= curr_write_last && block.id().client >= curr_write_block.id().client
700                    }
701                    None => false,
702                } {
703                    curr_decoder.move_next();
704                    iterated = true;
705                }
706
707                let curr_block = match curr_decoder.current() {
708                    Some(block) => block,
709                    None => continue,
710                };
711                let cid = curr_block.id();
712                if cid.client != first_client || // check whether there is another decoder that has has updates from `firstClient`
713                    (iterated && cid.clock > curr_write_last)
714                // the above while loop was used and we are potentially missing updates
715                {
716                    continue;
717                }
718
719                if first_client != curr_write_block.id().client {
720                    result
721                        .blocks
722                        .add_block(curr_write.unwrap_or_else(|| unreachable!()));
723                    curr_write = curr_decoder.take();
724                    curr_decoder.move_next();
725                } else if curr_write_last < curr_block.id().clock {
726                    //TODO: write currStruct & set currStruct = Skip(clock = currStruct.id.clock + currStruct.length, length = curr.id.clock - self.clock)
727                    let skip = match curr_write.unwrap_or_else(|| unreachable!()) {
728                        BlockCarrier::Skip(mut skip) => {
729                            // extend existing skip
730                            skip.len = curr_block.id().clock + curr_block.len() - skip.id.clock;
731                            skip
732                        }
733                        other => {
734                            result.blocks.add_block(other);
735                            let diff = curr_block.id().clock - curr_write_last;
736                            BlockRange::new(ID::new(first_client, curr_write_last), diff)
737                        }
738                    };
739                    curr_write = Some(BlockCarrier::Skip(skip));
740                } else {
741                    // if (currWrite.struct.id.clock + currWrite.struct.length >= curr.id.clock) {
742                    let diff = curr_write_last.saturating_sub(curr_block.id().clock);
743
744                    let mut block_slice = None;
745                    if diff > 0 {
746                        if let BlockCarrier::Skip(skip) = curr_write_block {
747                            // prefer to slice Skip because the other struct might contain more information
748                            skip.len -= diff as u32;
749                        } else {
750                            block_slice = Some(curr_block.splice(diff as u32).unwrap());
751                        }
752                    }
753
754                    let curr_block = block_slice
755                        .as_ref()
756                        .or(curr_decoder.current())
757                        .unwrap_or_else(|| unreachable!());
758                    if !curr_write_block.try_squash(curr_block) {
759                        result
760                            .blocks
761                            .add_block(curr_write.unwrap_or_else(|| unreachable!()));
762                        curr_write = block_slice.or_else(|| curr_decoder.take());
763                        curr_decoder.move_next();
764                    }
765                }
766            } else {
767                curr_write = curr_decoder.take();
768                curr_decoder.move_next();
769            }
770
771            while let Some(next) = curr_decoder.current() {
772                let block = curr_write.as_ref().unwrap();
773                let nid = next.id();
774                if nid.client == first_client && nid.clock == block.id().clock + block.len() {
775                    result.blocks.add_block(curr_write.unwrap());
776                    curr_write = curr_decoder.take();
777                    curr_decoder.move_next();
778                } else {
779                    break;
780                }
781            }
782        }
783
784        if let Some(block) = curr_write.take() {
785            result.blocks.add_block(block);
786        }
787
788        result
789    }
790}
791
792impl Encode for Update {
793    #[inline]
794    fn encode<E: Encoder>(&self, encoder: &mut E) {
795        self.encode_diff(&StateVector::default(), encoder)
796    }
797}
798
799impl Decode for Update {
800    fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, Error> {
801        // read blocks
802        let clients_len: u32 = decoder.read_var()?;
803        let mut clients = HashMap::with_hasher(BuildHasherDefault::default());
804        clients.try_reserve(clients_len as usize)?;
805
806        let mut blocks = UpdateBlocks { clients };
807        for _ in 0..clients_len {
808            let blocks_len = decoder.read_var::<u32>()? as usize;
809
810            let client = decoder.read_client()?;
811            let mut clock: u32 = decoder.read_var()?;
812            let blocks = blocks
813                .clients
814                .entry(client)
815                .or_insert_with(|| VecDeque::new());
816            // Attempt to pre-allocate memory for the blocks. If the capacity overflows and
817            // allocation fails, return an error.
818            blocks.try_reserve(blocks_len)?;
819
820            for _ in 0..blocks_len {
821                let id = ID::new(client, clock);
822                if let Some(block) = Self::decode_block(id, decoder)? {
823                    // due to bug in the past it was possible for empty bugs to be generated
824                    // even though they had no effect on the document store
825                    clock += block.len();
826                    blocks.push_back(block);
827                }
828            }
829        }
830        // read delete set
831        let delete_set = DeleteSet::decode(decoder)?;
832        Ok(Update { blocks, delete_set })
833    }
834}
835
/// Similar to [std::iter::Peekable], but can be used in situations when
/// [Peekable::peek](std::iter::Peekable::peek) is not allowed due to a lack of a `&mut self`
/// reference. [Memo] can be proactively advanced using [Memo::move_next], which works similarly
/// to [Peekable::peek](std::iter::Peekable::peek), but the advanced-to element can later still
/// be accessed using [Memo::current], which doesn't require a mutable reference.
///
/// A freshly created [Memo] (see [Memoizable::memoized]) holds no current element until
/// [Memo::move_next] is called for the first time.
struct Memo<I: Iterator + ?Sized> {
    /// Element most recently pulled from `iter`; `None` after [Memo::take] or once the
    /// iterator is exhausted.
    current: Option<I::Item>,
    /// Underlying iterator.
    iter: I,
}

impl<I: Iterator> Memo<I> {
    /// Borrows the current element without advancing the iterator.
    fn current(&self) -> Option<&I::Item> {
        self.current.as_ref()
    }

    /// Moves the current element out, leaving `None` in its place. Does not advance the
    /// underlying iterator.
    fn take(&mut self) -> Option<I::Item> {
        self.current.take()
    }

    /// Replaces the current element with the next one from the underlying iterator
    /// (dropping the previous current element if it was not taken).
    fn move_next(&mut self) {
        self.current = self.iter.next();
    }
}

/// Extension trait which wraps any iterator into a [Memo].
trait Memoizable: Iterator {
    fn memoized(self) -> Memo<Self>;
}

impl<T: Iterator> Memoizable for T {
    fn memoized(self) -> Memo<Self> {
        Memo {
            iter: self,
            current: None,
        }
    }
}
871
/// A single block carried by an [Update] (as opposed to a block already living in a
/// document store).
#[derive(PartialEq)]
pub(crate) enum BlockCarrier {
    /// A regular item block, owned by this carrier.
    Item(Box<Item>),
    /// A range of clock values encoded with `BLOCK_GC_REF_NUMBER` (presumably
    /// garbage-collected content).
    GC(BlockRange),
    /// A range of clock values encoded with `BLOCK_SKIP_REF_NUMBER`, standing in for blocks
    /// that this update does not contain.
    Skip(BlockRange),
}
878
879impl BlockCarrier {
880    pub(crate) fn splice(&self, offset: u32) -> Option<Self> {
881        match self {
882            BlockCarrier::Item(x) => {
883                let next = ItemPtr::from(x).splice(offset, OffsetKind::Utf16)?;
884                Some(BlockCarrier::Item(next))
885            }
886            BlockCarrier::Skip(x) => {
887                if offset == 0 {
888                    None
889                } else {
890                    Some(BlockCarrier::Skip(x.slice(offset)))
891                }
892            }
893            BlockCarrier::GC(x) => {
894                if offset == 0 {
895                    None
896                } else {
897                    Some(BlockCarrier::GC(x.slice(offset)))
898                }
899            }
900        }
901    }
902    pub(crate) fn same_type(&self, other: &BlockCarrier) -> bool {
903        match (self, other) {
904            (BlockCarrier::Skip(_), BlockCarrier::Skip(_)) => true,
905            (BlockCarrier::Item(_), BlockCarrier::Item(_)) => true,
906            (BlockCarrier::GC(_), BlockCarrier::GC(_)) => true,
907            (_, _) => false,
908        }
909    }
910    pub(crate) fn id(&self) -> &ID {
911        match self {
912            BlockCarrier::Item(x) => x.id(),
913            BlockCarrier::Skip(x) => &x.id,
914            BlockCarrier::GC(x) => &x.id,
915        }
916    }
917
918    pub(crate) fn len(&self) -> u32 {
919        match self {
920            BlockCarrier::Item(x) => x.len(),
921            BlockCarrier::Skip(x) => x.len,
922            BlockCarrier::GC(x) => x.len,
923        }
924    }
925
926    pub(crate) fn range(&self) -> BlockRange {
927        match self {
928            BlockCarrier::Item(item) => BlockRange::new(item.id, item.len),
929            BlockCarrier::GC(gc) => gc.clone(),
930            BlockCarrier::Skip(skip) => skip.clone(),
931        }
932    }
933
934    pub(crate) fn last_id(&self) -> ID {
935        match self {
936            BlockCarrier::Item(x) => x.last_id(),
937            BlockCarrier::Skip(x) => x.last_id(),
938            BlockCarrier::GC(x) => x.last_id(),
939        }
940    }
941
942    pub(crate) fn try_squash(&mut self, other: &BlockCarrier) -> bool {
943        match (self, other) {
944            (BlockCarrier::Item(a), BlockCarrier::Item(b)) => {
945                ItemPtr::from(a).try_squash(ItemPtr::from(b))
946            }
947            (BlockCarrier::Skip(a), BlockCarrier::Skip(b)) => {
948                a.merge(b);
949                true
950            }
951            _ => false,
952        }
953    }
954
955    pub fn as_item_ptr(&mut self) -> Option<ItemPtr> {
956        if let BlockCarrier::Item(block) = self {
957            Some(ItemPtr::from(block))
958        } else {
959            None
960        }
961    }
962
963    pub fn into_block(self) -> Option<Box<Item>> {
964        if let BlockCarrier::Item(block) = self {
965            Some(block)
966        } else {
967            None
968        }
969    }
970
971    #[inline]
972    pub fn is_skip(&self) -> bool {
973        if let BlockCarrier::Skip(_) = self {
974            true
975        } else {
976            false
977        }
978    }
979    pub fn encode_with_offset<E: Encoder>(&self, encoder: &mut E, offset: u32) {
980        match self {
981            BlockCarrier::Item(x) => {
982                let slice = ItemSlice::new(x.into(), offset, x.len() - 1);
983                slice.encode(encoder)
984            }
985            BlockCarrier::Skip(x) => {
986                encoder.write_info(BLOCK_SKIP_REF_NUMBER);
987                encoder.write_var(x.len - offset);
988            }
989            BlockCarrier::GC(x) => {
990                encoder.write_info(BLOCK_GC_REF_NUMBER);
991                encoder.write_len(x.len - offset);
992            }
993        }
994    }
995
996    pub fn integrate(&mut self, txn: &mut TransactionMut, offset: u32) -> bool {
997        match self {
998            BlockCarrier::Item(x) => ItemPtr::from(x).integrate(txn, offset),
999            BlockCarrier::Skip(x) => x.integrate(offset),
1000            BlockCarrier::GC(x) => x.integrate(offset),
1001        }
1002    }
1003}
1004
1005impl From<Box<Item>> for BlockCarrier {
1006    fn from(block: Box<Item>) -> Self {
1007        BlockCarrier::Item(block)
1008    }
1009}
1010
1011impl Encode for BlockCarrier {
1012    fn encode<E: Encoder>(&self, encoder: &mut E) {
1013        match self {
1014            BlockCarrier::Item(block) => block.encode(encoder),
1015            BlockCarrier::Skip(skip) => {
1016                encoder.write_info(BLOCK_SKIP_REF_NUMBER);
1017                encoder.write_len(skip.len)
1018            }
1019            BlockCarrier::GC(gc) => {
1020                encoder.write_info(BLOCK_GC_REF_NUMBER);
1021                encoder.write_len(gc.len)
1022            }
1023        }
1024    }
1025}
1026
/// A pending update which contains unapplied blocks from the update which created it.
///
/// `missing` records which client clocks must be reached before `update` can be applied.
#[derive(Debug, PartialEq)]
pub struct PendingUpdate {
    /// Collection of unapplied blocks.
    pub update: Update,
    /// A state vector that informs about minimal client clock values that need to be satisfied
    /// in order to successfully apply corresponding `update`.
    pub missing: StateVector,
}
1036
1037impl std::fmt::Debug for Update {
1038    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1039        std::fmt::Display::fmt(self, f)
1040    }
1041}
1042
1043impl std::fmt::Display for Update {
1044    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1045        let mut s = f.debug_struct("");
1046        if !self.blocks.is_empty() {
1047            s.field("blocks", &self.blocks);
1048        }
1049        if !self.delete_set.is_empty() {
1050            s.field("delete set", &self.delete_set);
1051        }
1052        s.finish()
1053    }
1054}
1055
/// Conversion for tests only.
///
/// Implemented as `From<Update> for Store`, so `update.into()` keeps working through the
/// blanket `Into` impl.
///
/// # Panics
/// Panics when the update contains any non-item block (Skip or GC). The panic message names
/// the kind of block encountered.
#[cfg(test)]
impl From<Update> for Store {
    fn from(update: Update) -> Self {
        use crate::doc::Options;

        let mut store = Store::new(&Options::with_client_id(0));
        for (_, blocks) in update.blocks.clients {
            for block in blocks {
                match block {
                    BlockCarrier::Item(item) => {
                        store.blocks.push_block(item);
                    }
                    BlockCarrier::Skip(_) => {
                        panic!("Cannot convert Update into block store - Skip block detected")
                    }
                    BlockCarrier::GC(_) => {
                        panic!("Cannot convert Update into block store - GC block detected")
                    }
                }
            }
        }
        store
    }
}
1075
/// Borrowing iterator over all blocks of an [UpdateBlocks]. Clients are visited in the order
/// produced by `Blocks::new` (highest client id first), and each client's blocks in their
/// stored order.
pub(crate) struct Blocks<'a> {
    /// Remaining `(client, blocks)` pairs, already sorted.
    current_client: std::vec::IntoIter<(&'a ClientID, &'a VecDeque<BlockCarrier>)>,
    /// Iterator over the blocks of the client currently being traversed.
    current_block: Option<std::collections::vec_deque::Iter<'a, BlockCarrier>>,
}
1080
1081impl<'a> Blocks<'a> {
1082    fn new(update: &'a UpdateBlocks) -> Self {
1083        let mut client_blocks: Vec<(&'a ClientID, &'a VecDeque<BlockCarrier>)> =
1084            update.clients.iter().collect();
1085        // sorting to return higher client ids first
1086        client_blocks.sort_by(|a, b| b.0.cmp(a.0));
1087        let mut current_client = client_blocks.into_iter();
1088
1089        let current_block = current_client.next().map(|(_, v)| v.iter());
1090        Blocks {
1091            current_client,
1092            current_block,
1093        }
1094    }
1095}
1096
1097impl<'a> Iterator for Blocks<'a> {
1098    type Item = &'a BlockCarrier;
1099
1100    fn next(&mut self) -> Option<Self::Item> {
1101        if let Some(blocks) = self.current_block.as_mut() {
1102            let block = blocks.next();
1103            if block.is_some() {
1104                return block;
1105            }
1106        }
1107
1108        if let Some(entry) = self.current_client.next() {
1109            self.current_block = Some(entry.1.iter());
1110            self.next()
1111        } else {
1112            None
1113        }
1114    }
1115}
1116
/// Owning iterator over all blocks of an [UpdateBlocks]. Clients are visited in the order
/// produced by `IntoBlocks::new` (highest client id first).
pub(crate) struct IntoBlocks {
    /// Remaining `(client, blocks)` pairs, already sorted.
    current_client: std::vec::IntoIter<(ClientID, VecDeque<BlockCarrier>)>,
    /// Iterator over the blocks of the client currently being traversed.
    current_block: Option<std::collections::vec_deque::IntoIter<BlockCarrier>>,
    /// When `true`, [BlockCarrier::Skip] blocks are not yielded.
    ignore_skip: bool,
}
1122
1123impl IntoBlocks {
1124    fn new(update: UpdateBlocks, ignore_skip: bool) -> Self {
1125        let mut client_blocks: Vec<(ClientID, VecDeque<BlockCarrier>)> =
1126            update.clients.into_iter().collect();
1127        // sorting to return higher client ids first
1128        client_blocks.sort_by(|a, b| b.0.cmp(&a.0));
1129        let mut current_client = client_blocks.into_iter();
1130
1131        let current_block = current_client.next().map(|(_, v)| v.into_iter());
1132        IntoBlocks {
1133            current_client,
1134            current_block,
1135            ignore_skip,
1136        }
1137    }
1138}
1139
1140impl Iterator for IntoBlocks {
1141    type Item = BlockCarrier;
1142
1143    fn next(&mut self) -> Option<Self::Item> {
1144        if let Some(blocks) = self.current_block.as_mut() {
1145            let block = blocks.next();
1146            match block {
1147                Some(BlockCarrier::Skip(_)) if self.ignore_skip => return self.next(),
1148                Some(block) => return Some(block),
1149                None => {}
1150            }
1151        }
1152
1153        if let Some(entry) = self.current_client.next() {
1154            self.current_block = Some(entry.1.into_iter());
1155            self.next()
1156        } else {
1157            None
1158        }
1159    }
1160}
1161
/// Unit tests for update decoding, encoding, merging and pending-update handling.
#[cfg(test)]
mod test {
    use std::collections::{HashMap, VecDeque};
    use std::iter::FromIterator;
    use std::sync::{Arc, Mutex};

    use crate::block::{BlockRange, ClientID, Item, ItemContent};
    use crate::encoding::read::Cursor;
    use crate::types::{Delta, TypePtr};
    use crate::update::{BlockCarrier, Update, UpdateBlocks};
    use crate::updates::decoder::{Decode, DecoderV1};
    use crate::updates::encoder::Encode;
    use crate::{
        merge_updates_v1, Any, DeleteSet, Doc, GetString, Options, ReadTxn, StateVector, Text,
        Transact, WriteTxn, XmlFragment, XmlOut, ID,
    };

    /// Decodes a V1 update produced by Yjs and checks it yields the single expected map item.
    #[test]
    fn update_decode() {
        /* Generated with:

           ```js
           var Y = require('yjs');

           var doc = new Y.Doc()
           var map = doc.getMap()
           map.set('keyB', 'valueB')

           // Merge changes from remote
           var update = Y.encodeStateAsUpdate(doc)
           ```
        */
        let update: &[u8] = &[
            1, 1, 176, 249, 159, 198, 7, 0, 40, 1, 0, 4, 107, 101, 121, 66, 1, 119, 6, 118, 97,
            108, 117, 101, 66, 0,
        ];
        let mut decoder = DecoderV1::from(update);
        let u = Update::decode(&mut decoder).unwrap();

        let id = ID::new(2026372272, 0);
        let block = u.blocks.clients.get(&id.client).unwrap();
        let mut expected: Vec<BlockCarrier> = Vec::new();
        expected.push(
            Item::new(
                id,
                None,
                None,
                None,
                None,
                TypePtr::Named("".into()),
                Some("keyB".into()),
                ItemContent::Any(vec!["valueB".into()]),
            )
            .unwrap()
            .into(),
        );
        assert_eq!(block, &expected);
    }

    /// Two documents edit the same text concurrently. Applying the merged update to a third
    /// document must produce the same text as exchanging the updates directly.
    #[test]
    fn update_merge() {
        let d1 = Doc::with_client_id(1);
        let txt1 = d1.get_or_insert_text("test");
        let mut t1 = d1.transact_mut();

        let d2 = Doc::with_client_id(2);
        let txt2 = d2.get_or_insert_text("test");
        let mut t2 = d2.transact_mut();

        txt1.insert(&mut t1, 0, "aaa");
        txt1.insert(&mut t1, 0, "aaa");

        txt2.insert(&mut t2, 0, "bbb");
        txt2.insert(&mut t2, 2, "bbb");

        let binary1 = t1.encode_update_v1();
        let binary2 = t2.encode_update_v1();

        t1.apply_update(Update::decode_v1(binary2.as_slice()).unwrap())
            .unwrap();
        t2.apply_update(Update::decode_v1(binary1.as_slice()).unwrap())
            .unwrap();

        let u1 = Update::decode(&mut DecoderV1::new(Cursor::new(binary1.as_slice()))).unwrap();
        let u2 = Update::decode(&mut DecoderV1::new(Cursor::new(binary2.as_slice()))).unwrap();

        // a crux of our test: merged update upon applying should produce
        // the same output as sequence of updates applied individually
        let u12 = Update::merge_updates(vec![u1, u2]);

        let d3 = Doc::with_client_id(3);
        let txt3 = d3.get_or_insert_text("test");
        let mut t3 = d3.transact_mut();
        t3.apply_update(u12).unwrap();

        let str1 = txt1.get_string(&t1);
        let str2 = txt2.get_string(&t2);
        let str3 = txt3.get_string(&t3);

        assert_eq!(str1, str2);
        assert_eq!(str2, str3);
    }

    /// Merging an update with an identical copy of itself must yield that same update.
    #[test]
    fn test_duplicate_updates() {
        let doc = Doc::with_client_id(1);
        let txt = doc.get_or_insert_text("test");
        let mut tr = doc.transact_mut();
        txt.insert(&mut tr, 0, "aaa");

        let binary = tr.encode_update_v1();
        let u1 = decode_update(&binary);
        let u2 = decode_update(&binary);
        let u3 = decode_update(&binary);

        let merged_update = Update::merge_updates(vec![u1, u2]);
        assert_eq!(merged_update, u3);
    }

    /// Merging a single update that already contains two clients must leave it unchanged.
    #[test]
    fn test_multiple_clients_in_one_update() {
        let binary1 = {
            let doc = Doc::with_client_id(1);
            let txt = doc.get_or_insert_text("test");
            let mut tr = doc.transact_mut();
            txt.insert(&mut tr, 0, "aaa");
            tr.encode_update_v1()
        };
        let binary2 = {
            let doc = Doc::with_client_id(2);
            let txt = doc.get_or_insert_text("test");
            let mut tr = doc.transact_mut();
            txt.insert(&mut tr, 0, "bbb");
            tr.encode_update_v1()
        };

        let u12 = Update::merge_updates(vec![decode_update(&binary1), decode_update(&binary2)]);
        let u12_copy =
            Update::merge_updates(vec![decode_update(&binary1), decode_update(&binary2)]);

        assert_eq!(2, u12.blocks.clients.keys().len());

        let merged_update = Update::merge_updates(vec![u12]);
        assert_eq!(merged_update, u12_copy);
    }

    /// Applies a V2 update creating an XML element, then a V2 update whose content is mostly a
    /// delete set. Re-encoding the resulting state as V2 must decode again, and the element must
    /// be gone.
    #[test]
    fn test_v2_encoding_of_fragmented_delete_set() {
        let before = vec![
            0, 1, 0, 11, 129, 215, 239, 201, 16, 198, 237, 152, 220, 8, 4, 4, 0, 4, 1, 1, 0, 11,
            40, 3, 39, 0, 4, 0, 7, 0, 40, 3, 8, 163, 1, 142, 1, 110, 111, 116, 101, 46, 103, 117,
            105, 100, 110, 111, 116, 101, 71, 117, 105, 100, 110, 111, 116, 101, 46, 111, 119, 110,
            101, 114, 111, 119, 110, 101, 114, 110, 111, 116, 101, 46, 116, 121, 112, 101, 110,
            111, 116, 101, 84, 121, 112, 101, 110, 111, 116, 101, 46, 99, 114, 101, 97, 116, 101,
            84, 105, 109, 101, 99, 114, 101, 97, 116, 101, 84, 105, 109, 101, 110, 111, 116, 101,
            46, 116, 105, 116, 108, 101, 116, 105, 116, 108, 101, 49, 112, 114, 111, 115, 101, 109,
            105, 114, 114, 111, 114, 108, 105, 110, 107, 110, 111, 116, 101, 103, 117, 105, 100,
            115, 108, 111, 116, 71, 117, 105, 100, 116, 121, 112, 101, 108, 105, 110, 107, 84, 121,
            112, 101, 99, 104, 105, 108, 100, 114, 101, 110, 98, 9, 8, 10, 5, 9, 8, 15, 74, 0, 5,
            1, 11, 8, 4, 8, 4, 72, 0, 1, 9, 1, 4, 0, 0, 1, 0, 0, 3, 1, 2, 2, 3, 2, 65, 8, 2, 4, 0,
            119, 22, 97, 99, 70, 120, 85, 89, 68, 76, 82, 104, 101, 114, 107, 74, 97, 66, 101, 99,
            115, 99, 51, 103, 125, 136, 57, 125, 0, 119, 13, 49, 54, 56, 53, 53, 51, 48, 50, 56,
            54, 54, 53, 54, 9, 0, 119, 22, 66, 67, 100, 81, 112, 112, 119, 69, 84, 48, 105, 82, 86,
            66, 81, 45, 56, 69, 87, 50, 87, 103, 119, 22, 106, 114, 69, 109, 73, 77, 112, 86, 84,
            101, 45, 99, 114, 78, 50, 86, 76, 51, 99, 97, 72, 81, 119, 8, 108, 105, 110, 107, 110,
            111, 116, 101, 119, 1, 49, 118, 2, 4, 103, 117, 105, 100, 119, 22, 66, 67, 100, 81,
            112, 112, 119, 69, 84, 48, 105, 82, 86, 66, 81, 45, 56, 69, 87, 50, 87, 103, 8, 115,
            108, 111, 116, 71, 117, 105, 100, 119, 22, 106, 114, 69, 109, 73, 77, 112, 86, 84, 101,
            45, 99, 114, 78, 50, 86, 76, 51, 99, 97, 72, 81, 119, 22, 66, 67, 100, 81, 112, 112,
            119, 69, 84, 48, 105, 82, 86, 66, 81, 45, 56, 69, 87, 50, 87, 103, 0,
        ];
        let update = vec![
            0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 198, 182, 140, 174, 4, 1, 2, 0, 0, 5,
        ];
        let doc = Doc::with_options(Options {
            skip_gc: true,
            client_id: 1,
            ..Default::default()
        });
        let prosemirror = doc.get_or_insert_xml_fragment("prosemirror");
        {
            let mut txn = doc.transact_mut();
            let u = Update::decode_v2(&before).unwrap();
            txn.apply_update(u).unwrap();
            let linknote = prosemirror.get(&txn, 0);
            let actual = linknote.and_then(|xml| match xml {
                XmlOut::Element(elem) => Some(elem.tag().clone()),
                _ => None,
            });
            assert_eq!(actual, Some("linknote".into()));
        }
        {
            let mut txn = doc.transact_mut();
            let u = Update::decode_v2(&update).unwrap();
            txn.apply_update(u).unwrap();

            // this should not panic
            let binary = txn.encode_update_v2();
            let _ = Update::decode_v2(&binary).unwrap();

            let linknote = prosemirror.get(&txn, 0);
            assert!(linknote.is_none());
        }
    }

    /// Feeds the individual updates of one document through a chain of documents, out of
    /// order (indices 0, 1, 3, 2, 4). Each document re-encodes its full state for the next one.
    /// The last document must end up with the complete text.
    #[test]
    fn merge_pending_updates() {
        let d0 = Doc::with_client_id(0);
        let server_updates = Arc::new(Mutex::new(vec![]));
        let sub = {
            let server_updates = server_updates.clone();
            d0.observe_update_v1(move |_, update| {
                let mut lock = server_updates.lock().unwrap();
                lock.push(update.update.clone());
            })
            .unwrap()
        };
        let txt = d0.get_or_insert_text("textBlock");
        txt.apply_delta(&mut d0.transact_mut(), [Delta::insert("r")]);
        txt.apply_delta(&mut d0.transact_mut(), [Delta::insert("o")]);
        txt.apply_delta(&mut d0.transact_mut(), [Delta::insert("n")]);
        txt.apply_delta(&mut d0.transact_mut(), [Delta::insert("e")]);
        txt.apply_delta(&mut d0.transact_mut(), [Delta::insert("n")]);
        drop(sub);

        let updates = Arc::into_inner(server_updates).unwrap();
        let updates = updates.into_inner().unwrap();

        let d1 = Doc::with_client_id(1);
        d1.transact_mut()
            .apply_update(Update::decode_v1(&updates[0]).unwrap())
            .unwrap();
        let u1 = d1
            .transact()
            .encode_state_as_update_v1(&StateVector::default());

        let d2 = Doc::with_client_id(2);
        d2.transact_mut()
            .apply_update(Update::decode_v1(&u1).unwrap())
            .unwrap();
        d2.transact_mut()
            .apply_update(Update::decode_v1(&updates[1]).unwrap())
            .unwrap();
        let u2 = d2
            .transact()
            .encode_state_as_update_v1(&StateVector::default());

        let d3 = Doc::with_client_id(3);
        d3.transact_mut()
            .apply_update(Update::decode_v1(&u2).unwrap())
            .unwrap();
        d3.transact_mut()
            .apply_update(Update::decode_v1(&updates[3]).unwrap())
            .unwrap();
        let u3 = d3
            .transact()
            .encode_state_as_update_v1(&StateVector::default());

        let d4 = Doc::with_client_id(4);
        d4.transact_mut()
            .apply_update(Update::decode_v1(&u3).unwrap())
            .unwrap();
        d4.transact_mut()
            .apply_update(Update::decode_v1(&updates[2]).unwrap())
            .unwrap();
        let u4 = d4
            .transact()
            .encode_state_as_update_v1(&StateVector::default());

        let d5 = Doc::with_client_id(5);
        d5.transact_mut()
            .apply_update(Update::decode_v1(&u4).unwrap())
            .unwrap();
        d5.transact_mut()
            .apply_update(Update::decode_v1(&updates[4]).unwrap())
            .unwrap();

        let txt5 = d5.get_or_insert_text("textBlock");
        let str = txt5.get_string(&d5.transact());
        assert_eq!(str, "nenor");
    }

    /// An update's state vector only counts blocks that are continuous from clock 0.
    /// A Skip block breaks that continuity.
    #[test]
    fn update_state_vector_with_skips() {
        let mut update = Update::new();
        // skip followed by item => not included in state vector as it's not continuous from 0
        update
            .blocks
            .add_block(BlockCarrier::Skip(BlockRange::new(ID::new(1, 0), 1)));
        update.blocks.add_block(test_item(1, 1, 1));
        // item starting from non-0 => not included
        update.blocks.add_block(test_item(2, 1, 1));
        // item => skip => item : second item not included
        update.blocks.add_block(test_item(3, 0, 1));
        update
            .blocks
            .add_block(BlockCarrier::Skip(BlockRange::new(ID::new(3, 1), 1)));
        update.blocks.add_block(test_item(3, 2, 1));

        let sv = update.state_vector();
        assert_eq!(sv, StateVector::from_iter([(3, 1)]));
    }

    /// `Update::extends` cases: overlapping, duplicate, skip-separated, adjacent and disjoint
    /// updates relative to a state vector.
    #[test]
    fn test_extends() {
        let mut u = Update::new();
        u.blocks.add_block(test_item(1, 0, 2)); // new data with partial duplicate
        assert!(u.extends(&StateVector::from_iter([(1, 1)])));

        let mut u = Update::new();
        u.blocks.add_block(test_item(1, 0, 1)); // duplicate
        assert!(!u.extends(&StateVector::from_iter([(1, 1)])));

        let mut u = Update::new();
        u.blocks
            .add_block(BlockCarrier::Skip(BlockRange::new(ID::new(1, 0), 2)));
        u.blocks.add_block(test_item(1, 2, 1)); // skip causes updates to be disjoint
        assert!(!u.extends(&StateVector::from_iter([(1, 1)])));

        let mut u = Update::new();
        u.blocks.add_block(test_item(1, 1, 1)); // adjacent
        assert!(u.extends(&StateVector::from_iter([(1, 1)])));

        let mut u = Update::new();
        u.blocks.add_block(test_item(1, 2, 1)); // disjoint
        assert!(!u.extends(&StateVector::from_iter([(1, 1)])));
    }

    /// An empty update encodes to the `Update::EMPTY_V1` constant.
    #[test]
    fn empty_update_v1() {
        let u = Update::new();
        let binary = u.encode_v1();
        assert_eq!(&binary, Update::EMPTY_V1)
    }

    /// An empty update encodes to the `Update::EMPTY_V2` constant.
    #[test]
    fn empty_update_v2() {
        let u = Update::new();
        let binary = u.encode_v2();
        assert_eq!(&binary, Update::EMPTY_V2)
    }

    /// An update containing a Skip block must round-trip through the V2 encoding.
    #[test]
    fn update_v2_with_skips() {
        let u1 = update_with_skips();
        let encoded = u1.encode_v2();
        let u2 = Update::decode_v2(&encoded).unwrap();
        assert_eq!(u1, u2);
    }

    /// Blocks after a Skip stay pending. Encoding the state includes them until they are
    /// pruned, and merging the pruned state with the pending update restores the original.
    #[test]
    fn pending_update_check() {
        let update = update_with_skips();
        let expected = update.encode_v1();
        let doc = Doc::with_client_id(2);
        let mut txn = doc.transact_mut();
        let txt = txn.get_or_insert_text("test");
        txn.apply_update(update).unwrap();
        let str = txt.get_string(&txn);
        assert_eq!(str, "hello"); // 'world' is missing because of skip block
        assert!(txn.has_missing_updates());
        let state = txn.encode_state_as_update_v1(&Default::default());
        assert_eq!(state, expected); // we include pending update
        let pending = txn.prune_pending();
        assert!(pending.is_some());
        let state = txn.encode_state_as_update_v1(&Default::default());
        assert_ne!(state, expected); // we pruned pending update
        let joined = merge_updates_v1([state, pending.unwrap().encode_v1()]).unwrap();
        assert_eq!(joined, expected); // we joined current and pending state, they should be equal
    }

    /// Update for client 1 containing: "hello" at clocks 0..5, a Skip over clocks 5..8, and
    /// "world" at clock 8 whose origin (1, 7) lies inside the skipped range.
    fn update_with_skips() -> Update {
        Update {
            blocks: UpdateBlocks {
                clients: HashMap::from_iter([(
                    1,
                    VecDeque::from_iter([
                        BlockCarrier::Item(
                            Item::new(
                                ID::new(1, 0),
                                None,
                                None,
                                None,
                                None,
                                TypePtr::Named("test".into()),
                                None,
                                ItemContent::String("hello".into()),
                            )
                            .unwrap(),
                        ),
                        BlockCarrier::Skip(BlockRange::new(ID::new(1, 5), 3)),
                        BlockCarrier::Item(
                            Item::new(
                                ID::new(1, 8),
                                None,
                                Some(ID::new(1, 7)),
                                None,
                                None,
                                TypePtr::Unknown,
                                None,
                                ItemContent::String("world".into()),
                            )
                            .unwrap(),
                        ),
                    ]),
                )]),
            },
            delete_set: DeleteSet::default(),
        }
    }

    /// Builds an item block at `(client_id, clock)` holding `len` `Any` values (0..len), so the
    /// block has length `len`. Panics if `len == 0`.
    fn test_item(client_id: ClientID, clock: u32, len: u32) -> BlockCarrier {
        assert!(len > 0);
        let any: Vec<_> = (0..len).into_iter().map(Any::from).collect();
        BlockCarrier::Item(
            Item::new(
                ID::new(client_id, clock),
                None,
                None,
                None,
                None,
                TypePtr::Named("test".into()),
                None,
                ItemContent::Any(any),
            )
            .unwrap(),
        )
    }

    /// Decodes a V1-encoded update, panicking on malformed input.
    fn decode_update(bin: &[u8]) -> Update {
        Update::decode(&mut DecoderV1::new(Cursor::new(bin))).unwrap()
    }
}