1use std::cmp::Ordering;
2use std::collections::hash_map::Entry;
3use std::collections::{HashMap, VecDeque};
4use std::hash::BuildHasherDefault;
5use std::sync::Arc;
6
7use crate::block::{
8 BlockRange, ClientID, Item, ItemContent, ItemPtr, BLOCK_GC_REF_NUMBER, BLOCK_SKIP_REF_NUMBER,
9 HAS_ORIGIN, HAS_PARENT_SUB, HAS_RIGHT_ORIGIN,
10};
11use crate::encoding::read::Error;
12use crate::error::UpdateError;
13use crate::id_set::{DeleteSet, IdSet};
14use crate::slice::ItemSlice;
15#[cfg(test)]
16use crate::store::Store;
17use crate::transaction::TransactionMut;
18use crate::types::TypePtr;
19use crate::updates::decoder::{Decode, Decoder};
20use crate::updates::encoder::{Encode, Encoder};
21use crate::utils::client_hasher::ClientHasher;
22use crate::{OffsetKind, StateVector, ID};
23
/// Blocks carried by an update, grouped by the client they belong to.
///
/// Every client maps to a queue of [`BlockCarrier`]s; new blocks are appended at the back of
/// that queue (see `add_block`).
#[derive(Debug, Default, PartialEq)]
pub(crate) struct UpdateBlocks {
    /// Per-client block queues, hashed with `ClientHasher`.
    clients: HashMap<ClientID, VecDeque<BlockCarrier>, BuildHasherDefault<ClientHasher>>,
}
28
29impl UpdateBlocks {
30 pub(crate) fn add_block(&mut self, block: BlockCarrier) {
35 let e = self.clients.entry(block.id().client).or_default();
36 e.push_back(block);
37 }
38
39 pub fn is_empty(&self) -> bool {
40 self.clients.is_empty()
41 }
42
43 pub(crate) fn blocks(&self) -> Blocks<'_> {
46 Blocks::new(self)
47 }
48
49 pub(crate) fn into_blocks(self, ignore_skip: bool) -> IntoBlocks {
52 IntoBlocks::new(self, ignore_skip)
53 }
54}
55
56impl std::fmt::Display for UpdateBlocks {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 writeln!(f, "{{")?;
59 for (client, blocks) in self.clients.iter() {
60 writeln!(f, "\t{} -> [", client)?;
61 for block in blocks {
62 writeln!(f, "\t\t{}", block)?;
63 }
64 write!(f, "\t]")?;
65 }
66 writeln!(f, "}}")
67 }
68}
69
70impl std::fmt::Debug for BlockCarrier {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 std::fmt::Display::fmt(self, f)
73 }
74}
75impl std::fmt::Display for BlockCarrier {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 match self {
78 BlockCarrier::Item(x) => x.fmt(f),
79 BlockCarrier::Skip(x) => write!(f, "Skip{}", x),
80 BlockCarrier::GC(x) => write!(f, "GC{}", x),
81 }
82 }
83}
84
/// A decoded update: a collection of blocks together with a delete set.
#[derive(Default, PartialEq)]
pub struct Update {
    /// Blocks carried by this update, grouped by client.
    pub(crate) blocks: UpdateBlocks,
    /// Delete set carried by this update.
    pub(crate) delete_set: DeleteSet,
}
96
97impl Update {
/// Byte pattern exposed as the V1 binary form of an empty update.
/// NOTE(review): presumably a zero client count followed by an empty delete set - confirm
/// against the V1 encoder.
pub const EMPTY_V1: &'static [u8] = &[0, 0];

/// Byte pattern exposed as the V2 binary form of an empty update.
/// NOTE(review): the meaning of the individual bytes is defined by the V2 encoder, which is
/// not visible from here.
pub const EMPTY_V2: &'static [u8] = &[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0];
103
104 pub fn new() -> Self {
105 Self::default()
106 }
107
108 pub fn is_empty(&self) -> bool {
110 self.blocks.is_empty() && self.delete_set.is_empty()
111 }
112
113 pub fn extends(&self, state_vector: &StateVector) -> bool {
115 for (client_id, blocks) in self.blocks.clients.iter() {
116 let clock = state_vector.get(client_id);
117 let mut iter = blocks.iter();
118 while let Some(block) = iter.next() {
119 let range = block.range();
120 if range.id.clock <= clock && range.id.clock + range.len > clock {
121 if !block.is_skip() {
124 return true;
125 }
126 }
127 }
128 }
129 false
130 }
131
132 pub fn state_vector(&self) -> StateVector {
135 let mut sv = StateVector::default();
136 for (&client, blocks) in self.blocks.clients.iter() {
137 let mut last_clock = 0;
138 if !blocks.is_empty() && blocks[0].id().clock == 0 {
139 for block in blocks.iter() {
142 if let BlockCarrier::Skip(_) = block {
143 break;
145 }
146 last_clock = block.id().clock + block.len();
147 }
148 }
149 if last_clock != 0 {
150 sv.set_max(client, last_clock);
151 }
152 }
153 sv
154 }
155
156 pub fn state_vector_lower(&self) -> StateVector {
159 let mut sv = StateVector::default();
160 for (&client, blocks) in self.blocks.clients.iter() {
161 for block in blocks.iter() {
162 if !block.is_skip() {
163 let id = block.id();
164 sv.set_max(client, id.clock);
165 break;
166 }
167 }
168 }
169 sv
170 }
171
172 pub fn insertions(&self, include_deleted: bool) -> IdSet {
177 let mut insertions = IdSet::default();
178 for blocks in self.blocks.clients.values() {
179 for block in blocks.iter() {
180 match block {
181 BlockCarrier::Item(item) if include_deleted || !item.is_deleted() => {
182 insertions.insert(item.id, item.len);
183 }
184 BlockCarrier::GC(range) if include_deleted => {
185 insertions.insert(range.id, range.len);
186 }
187 _ => {}
188 }
189 }
190 }
191 insertions.squash();
192 insertions
193 }
194
195 pub fn delete_set(&self) -> &DeleteSet {
197 &self.delete_set
198 }
199
/// Merges the blocks of `other` into this update, one client at a time.
///
/// Clients that are not present in `self` take over the queue of `other` unchanged. For
/// clients present on both sides, the existing queue is walked by index while incoming blocks
/// are pulled one by one; whatever is left of the incoming queue after the walk is appended.
///
/// NOTE(review): `other.delete_set` is never read here - confirm that callers merge delete
/// sets on their own.
pub fn merge(&mut self, other: Self) {
    for (client, other_blocks) in other.blocks.clients {
        match self.blocks.clients.entry(client) {
            Entry::Occupied(e) => {
                let blocks = e.into_mut();
                // `i2`/`n2`: iterator over incoming blocks and the incoming block under review
                let mut i2 = other_blocks.into_iter();
                let mut n2 = i2.next();

                // `i1`: index of the existing block under review
                let mut i1 = 0;

                while i1 < blocks.len() {
                    let a = &mut blocks[i1];
                    if let Some(b) = n2.as_ref() {
                        if a.try_squash(b) {
                            // `b` was absorbed by `a`: stay on `a` and try the next incoming block
                            n2 = i2.next();
                            continue;
                        } else if let BlockCarrier::Item(block) = a {
                            // distance between the end of `a` and the start of `b`
                            let diff = (block.id().clock + block.len()) as isize
                                - b.id().clock as isize;
                            if diff > 0 {
                                // `b` starts before `a` ends: split `a` and keep the split-off
                                // part right behind it
                                if let Some(new) = a.splice(diff as u32) {
                                    blocks.insert(i1 + 1, new);
                                }
                            }
                        }
                        // NOTE(review): an incoming block that was not squashed is replaced here
                        // without being stored anywhere - confirm this is intended
                        i1 += 1;
                        n2 = i2.next();
                    } else {
                        // no incoming blocks left
                        break;
                    }
                }

                // append incoming blocks that were not consumed by the walk above
                while let Some(b) = n2 {
                    blocks.push_back(b);
                    n2 = i2.next();
                }
            }
            Entry::Vacant(e) => {
                e.insert(other_blocks);
            }
        }
    }
}
247
248 pub(crate) fn integrate(
254 mut self,
255 txn: &mut TransactionMut,
256 ) -> Result<(Option<PendingUpdate>, Option<Update>), UpdateError> {
257 let remaining_blocks = if self.blocks.is_empty() {
258 None
259 } else {
260 let mut store = txn.store_mut();
261 let mut client_block_ref_ids: Vec<ClientID> =
262 self.blocks.clients.keys().cloned().collect();
263 client_block_ref_ids.sort();
264
265 let mut current_client_id = client_block_ref_ids.pop().unwrap();
266 let mut current_target = self.blocks.clients.get_mut(¤t_client_id);
267 let mut stack_head = if let Some(v) = current_target.as_mut() {
268 v.pop_front()
269 } else {
270 None
271 };
272
273 let mut local_sv = store.blocks.get_state_vector();
274 let mut missing_sv = StateVector::default();
275 let mut remaining = UpdateBlocks::default();
276 let mut stack = Vec::new();
277
278 while let Some(mut block) = stack_head {
279 if !block.is_skip() {
280 let id = *block.id();
281 if local_sv.contains(&id) {
282 let offset = local_sv.get(&id.client) as i32 - id.clock as i32;
283 if let Some(dep) = Self::missing(&block, &local_sv) {
284 stack.push(block);
285 match self.blocks.clients.get_mut(&dep) {
287 Some(block_refs) if !block_refs.is_empty() => {
288 stack_head = block_refs.pop_front();
289 current_target =
290 self.blocks.clients.get_mut(¤t_client_id);
291 continue;
292 }
293 _ => {
294 missing_sv.set_min(dep, local_sv.get(&dep));
296 Self::return_stack(stack, &mut self.blocks, &mut remaining);
297 current_target =
298 self.blocks.clients.get_mut(¤t_client_id);
299 stack = Vec::new();
300 }
301 }
302 } else if offset == 0 || (offset as u32) < block.len() {
303 let offset = offset as u32;
304 let client = id.client;
305 local_sv.set_max(client, id.clock + block.len());
306 if let BlockCarrier::Item(item) = &mut block {
307 item.repair(store)?;
308 }
309 let should_delete = block.integrate(txn, offset);
310 let mut delete_ptr = if should_delete {
311 let ptr = block.as_item_ptr();
312 ptr
313 } else {
314 None
315 };
316 store = txn.store_mut();
317 match block {
318 BlockCarrier::Item(item) => {
319 if item.parent != TypePtr::Unknown {
320 store.blocks.push_block(item)
321 } else {
322 store.blocks.push_gc(BlockRange::new(item.id, item.len));
324 delete_ptr = None;
325 }
326 }
327 BlockCarrier::GC(gc) => store.blocks.push_gc(gc),
328 BlockCarrier::Skip(_) => { }
329 }
330
331 if let Some(ptr) = delete_ptr {
332 txn.delete(ptr);
333 }
334 store = txn.store_mut();
335 }
336 } else {
337 let id = block.id();
339 missing_sv.set_min(id.client, id.clock - 1);
340 stack.push(block);
341 Self::return_stack(stack, &mut self.blocks, &mut remaining);
343 current_target = self.blocks.clients.get_mut(¤t_client_id);
344 stack = Vec::new();
345 }
346 }
347
348 if !stack.is_empty() {
350 stack_head = stack.pop();
351 } else {
352 match current_target.take() {
353 Some(v) if !v.is_empty() => {
354 stack_head = v.pop_front();
355 current_target = Some(v);
356 }
357 _ => {
358 if let Some((client_id, target)) =
359 Self::next_target(&mut client_block_ref_ids, &mut self.blocks)
360 {
361 stack_head = target.pop_front();
362 current_client_id = client_id;
363 current_target = Some(target);
364 } else {
365 break;
367 }
368 }
369 };
370 }
371 }
372
373 if remaining.is_empty() {
374 None
375 } else {
376 Some(PendingUpdate {
377 update: Update {
378 blocks: remaining,
379 delete_set: DeleteSet::new(),
380 },
381 missing: missing_sv,
382 })
383 }
384 };
385
386 let remaining_ds = txn.apply_delete(&self.delete_set).map(|ds| {
387 let mut update = Update::new();
388 update.delete_set = ds;
389 update
390 });
391
392 Ok((remaining_blocks, remaining_ds))
393 }
394
395 fn missing(block: &BlockCarrier, local_sv: &StateVector) -> Option<ClientID> {
396 if let BlockCarrier::Item(item) = block {
397 if let Some(origin) = &item.origin {
398 if origin.client != item.id.client && origin.clock >= local_sv.get(&origin.client) {
399 return Some(origin.client);
400 }
401 }
402
403 if let Some(right_origin) = &item.right_origin {
404 if right_origin.client != item.id.client
405 && right_origin.clock >= local_sv.get(&right_origin.client)
406 {
407 return Some(right_origin.client);
408 }
409 }
410
411 match &item.parent {
412 TypePtr::Branch(parent) => {
413 if let Some(block) = &parent.item {
414 let parent_id = block.id();
415 if parent_id.client != item.id.client
416 && parent_id.clock >= local_sv.get(&parent_id.client)
417 {
418 return Some(parent_id.client);
419 }
420 }
421 }
422 TypePtr::ID(parent_id) => {
423 if parent_id.client != item.id.client
424 && parent_id.clock >= local_sv.get(&parent_id.client)
425 {
426 return Some(parent_id.client);
427 }
428 }
429 _ => {}
430 }
431
432 match &item.content {
433 ItemContent::Move(m) => {
434 if let Some(start) = m.start.id() {
435 if start.clock >= local_sv.get(&start.client) {
436 return Some(start.client);
437 }
438 }
439 if !m.is_collapsed() {
440 if let Some(end) = m.end.id() {
441 if end.clock >= local_sv.get(&end.client) {
442 return Some(end.client);
443 }
444 }
445 }
446 }
447 ItemContent::Type(branch) => {
448 #[cfg(feature = "weak")]
449 if let crate::types::TypeRef::WeakLink(source) = &branch.type_ref {
450 let start = source.quote_start.id();
451 let end = source.quote_end.id();
452 if let Some(start) = start {
453 if start.clock >= local_sv.get(&start.client) {
454 return Some(start.client);
455 }
456 }
457 if start != end {
458 if let Some(end) = &source.quote_end.id() {
459 if end.clock >= local_sv.get(&end.client) {
460 return Some(end.client);
461 }
462 }
463 }
464 }
465 }
466 _ => { }
467 }
468 }
469 None
470 }
471
472 fn next_target<'a, 'b>(
473 client_block_ref_ids: &'a mut Vec<ClientID>,
474 blocks: &'b mut UpdateBlocks,
475 ) -> Option<(ClientID, &'b mut VecDeque<BlockCarrier>)> {
476 while let Some(id) = client_block_ref_ids.pop() {
477 match blocks.clients.get(&id) {
478 Some(client_blocks) if !client_blocks.is_empty() => {
479 let client_blocks = unsafe {
483 (client_blocks as *const VecDeque<BlockCarrier>
484 as *mut VecDeque<BlockCarrier>)
485 .as_mut()
486 .unwrap()
487 };
488 return Some((id, client_blocks));
489 }
490 _ => {}
491 }
492 }
493 None
494 }
495
496 fn return_stack(
497 stack: Vec<BlockCarrier>,
498 refs: &mut UpdateBlocks,
499 remaining: &mut UpdateBlocks,
500 ) {
501 for item in stack.into_iter() {
502 let client = item.id().client;
503 if let Some(mut unapplicable_items) = refs.clients.remove(&client) {
505 unapplicable_items.push_front(item);
507 remaining.clients.insert(client, unapplicable_items);
508 } else {
509 let mut blocks = VecDeque::with_capacity(1);
512 blocks.push_back(item);
513 remaining.clients.insert(client, blocks);
514 }
515 }
516 }
517
518 fn decode_block<D: Decoder>(id: ID, decoder: &mut D) -> Result<Option<BlockCarrier>, Error> {
519 let info = decoder.read_info()?;
520 match info {
521 BLOCK_SKIP_REF_NUMBER => {
522 let len: u32 = decoder.read_var()?;
523 Ok(Some(BlockCarrier::Skip(BlockRange { id, len })))
524 }
525 BLOCK_GC_REF_NUMBER => {
526 let len: u32 = decoder.read_len()?;
527 Ok(Some(BlockCarrier::GC(BlockRange { id, len })))
528 }
529 info => {
530 let cant_copy_parent_info = info & (HAS_ORIGIN | HAS_RIGHT_ORIGIN) == 0;
531 let origin = if info & HAS_ORIGIN != 0 {
532 Some(decoder.read_left_id()?)
533 } else {
534 None
535 };
536 let right_origin = if info & HAS_RIGHT_ORIGIN != 0 {
537 Some(decoder.read_right_id()?)
538 } else {
539 None
540 };
541 let parent = if cant_copy_parent_info {
542 if decoder.read_parent_info()? {
543 TypePtr::Named(decoder.read_string()?.into())
544 } else {
545 TypePtr::ID(decoder.read_left_id()?)
546 }
547 } else {
548 TypePtr::Unknown
549 };
550 let parent_sub: Option<Arc<str>> =
551 if cant_copy_parent_info && (info & HAS_PARENT_SUB != 0) {
552 Some(decoder.read_string()?.into())
553 } else {
554 None
555 };
556 let content = ItemContent::decode(decoder, info)?;
557 let item = Item::new(
558 id,
559 None,
560 origin,
561 None,
562 right_origin,
563 parent,
564 parent_sub,
565 content,
566 );
567 match item {
568 None => Ok(None),
569 Some(item) => Ok(Some(BlockCarrier::from(item))),
570 }
571 }
572 }
573 }
574
575 pub(crate) fn encode_diff<E: Encoder>(&self, remote_sv: &StateVector, encoder: &mut E) {
576 let mut clients = HashMap::new();
577 for (client, blocks) in self.blocks.clients.iter() {
578 let remote_clock = remote_sv.get(client);
579 let mut iter = blocks.iter();
580 let mut curr = iter.next();
581 while let Some(block) = curr {
582 if block.is_skip() {
583 curr = iter.next();
584 } else if block.id().clock + block.len() > remote_clock {
585 let e = clients.entry(*client).or_insert_with(|| (0, Vec::new()));
586 e.0 = (remote_clock as i64 - block.id().clock as i64).max(0) as u32;
587 e.1.push(block);
588 curr = iter.next();
589 while let Some(block) = curr {
590 e.1.push(block);
591 curr = iter.next();
592 }
593 } else {
594 curr = iter.next();
596 }
597 }
598 }
599
600 let mut sorted_clients: Vec<_> =
602 clients.iter().filter(|(_, (_, q))| !q.is_empty()).collect();
603 sorted_clients.sort_by(|&(x_id, _), &(y_id, _)| y_id.cmp(x_id));
604
605 encoder.write_var(sorted_clients.len());
607 for (&client, (offset, blocks)) in sorted_clients {
608 encoder.write_var(blocks.len());
609 encoder.write_client(client);
610
611 let mut block = blocks[0];
612 encoder.write_var(block.id().clock + offset);
613 block.encode_with_offset(encoder, *offset);
614 for i in 1..blocks.len() {
615 block = blocks[i];
616 block.encode_with_offset(encoder, 0);
617 }
618 }
619 self.delete_set.encode(encoder)
620 }
621
/// Merges a sequence of updates into a single update.
///
/// Delete sets of all inputs are merged into the delete set of the result. Blocks of every
/// non-empty input are turned into a memoized iterator (skip blocks of the inputs are
/// ignored) and the iterators are repeatedly sorted by their current block, so that the block
/// to be written next is always found at the front. `curr_write` holds the block that is
/// about to be written; it is squashed with, extended by or replaced with the following block
/// and, where blocks of the same client are not adjacent, a skip block is produced to cover
/// the gap.
pub fn merge_updates<T>(block_stores: T) -> Update
where
    T: IntoIterator<Item = Update>,
{
    let mut result = Update::new();
    // merge delete sets right away, keep the blocks for the merge loop below
    let update_blocks: Vec<UpdateBlocks> = block_stores
        .into_iter()
        .map(|update| {
            result.delete_set.merge(update.delete_set);
            update.blocks
        })
        .collect();

    // one memoized iterator per non-empty input, already positioned at its first block
    let mut lazy_struct_decoders: VecDeque<_> = update_blocks
        .into_iter()
        .filter(|block_store| !block_store.is_empty())
        .map(|update_blocks| {
            let mut memo = update_blocks.into_blocks(true).memoized();
            memo.move_next();
            memo
        })
        .collect();

    // block waiting to be written into `result`
    let mut curr_write: Option<BlockCarrier> = None;

    loop {
        {
            // drop exhausted iterators, then order the rest by their current block:
            // higher client ids first; within a client lower clocks first; at the same clock
            // blocks of the same kind are equal and a skip sorts behind a non-skip
            lazy_struct_decoders
                .retain(|lazy_struct_decoder| lazy_struct_decoder.current().is_some());
            lazy_struct_decoders
                .make_contiguous()
                .sort_by(|dec1, dec2| {
                    // `unwrap` cannot fail: iterators without a current block were removed above
                    let left = dec1.current().unwrap();
                    let right = dec2.current().unwrap();
                    let lid = left.id();
                    let rid = right.id();
                    match lid.client.cmp(&rid.client) {
                        Ordering::Equal => match lid.clock.cmp(&rid.clock) {
                            Ordering::Equal if left.same_type(right) => Ordering::Equal,
                            Ordering::Equal if left.is_skip() => Ordering::Greater,
                            Ordering::Equal => Ordering::Less,
                            Ordering::Less if !left.is_skip() || right.is_skip() => {
                                Ordering::Less
                            }
                            ordering => ordering,
                        },
                        ordering => ordering.reverse(),
                    }
                });
        }

        // the front iterator provides the next block; no iterator left means we are done
        let curr_decoder = match lazy_struct_decoders.iter_mut().next() {
            Some(decoder) => decoder,
            None => break,
        };

        let curr_block = match curr_decoder.current() {
            Some(block) => block,
            None => continue,
        };
        let first_client = curr_block.id().client;
        if let Some(curr_write_block) = curr_write.as_mut() {
            let mut iterated = false;

            // advance past blocks that end at or before the end of the pending block and whose
            // client id is not lower than the pending block's client id
            let curr_write_last = curr_write_block.id().clock + curr_write_block.len();
            while match curr_decoder.current() {
                Some(block) => {
                    let last = block.id().clock + block.len();
                    last <= curr_write_last && block.id().client >= curr_write_block.id().client
                }
                None => false,
            } {
                curr_decoder.move_next();
                iterated = true;
            }

            let curr_block = match curr_decoder.current() {
                Some(block) => block,
                None => continue,
            };
            let cid = curr_block.id();
            // the iterator moved on to another client, or it skipped ahead and now leaves a
            // gap: re-sort the iterators before deciding what to write
            if cid.client != first_client || (iterated && cid.clock > curr_write_last)
            {
                continue;
            }

            if first_client != curr_write_block.id().client {
                // client changed: flush the pending block and start with the current one
                result
                    .blocks
                    .add_block(curr_write.unwrap_or_else(|| unreachable!()));
                curr_write = curr_decoder.take();
                curr_decoder.move_next();
            } else if curr_write_last < curr_block.id().clock {
                // gap between the pending block and the current one (the iterator is not
                // advanced in this branch)
                let skip = match curr_write.unwrap_or_else(|| unreachable!()) {
                    BlockCarrier::Skip(mut skip) => {
                        // pending block is a skip already: stretch it to the end of the
                        // current block
                        skip.len = curr_block.id().clock + curr_block.len() - skip.id.clock;
                        skip
                    }
                    other => {
                        // flush the pending block and cover the gap with a new skip
                        result.blocks.add_block(other);
                        let diff = curr_block.id().clock - curr_write_last;
                        BlockRange::new(ID::new(first_client, curr_write_last), diff)
                    }
                };
                curr_write = Some(BlockCarrier::Skip(skip));
            } else {
                // the current block starts at or before the end of the pending block;
                // `diff` is the length of the overlap
                let diff = curr_write_last.saturating_sub(curr_block.id().clock);

                let mut block_slice = None;
                if diff > 0 {
                    if let BlockCarrier::Skip(skip) = curr_write_block {
                        // shorten the pending skip rather than cutting the current block
                        skip.len -= diff as u32;
                    } else {
                        block_slice = Some(curr_block.splice(diff as u32).unwrap());
                    }
                }

                // continue with the cut-off part if there is one, the current block otherwise
                let curr_block = block_slice
                    .as_ref()
                    .or(curr_decoder.current())
                    .unwrap_or_else(|| unreachable!());
                if !curr_write_block.try_squash(curr_block) {
                    result
                        .blocks
                        .add_block(curr_write.unwrap_or_else(|| unreachable!()));
                    curr_write = block_slice.or_else(|| curr_decoder.take());
                    curr_decoder.move_next();
                }
            }
        } else {
            // nothing pending yet: the current block becomes the pending one
            curr_write = curr_decoder.take();
            curr_decoder.move_next();
        }

        // keep writing as long as the next block of this iterator belongs to the same client
        // and starts exactly where the pending block ends
        while let Some(next) = curr_decoder.current() {
            let block = curr_write.as_ref().unwrap();
            let nid = next.id();
            if nid.client == first_client && nid.clock == block.id().clock + block.len() {
                result.blocks.add_block(curr_write.unwrap());
                curr_write = curr_decoder.take();
                curr_decoder.move_next();
            } else {
                break;
            }
        }
    }

    // flush the block that is still pending
    if let Some(block) = curr_write.take() {
        result.blocks.add_block(block);
    }

    result
}
790}
791
792impl Encode for Update {
793 #[inline]
794 fn encode<E: Encoder>(&self, encoder: &mut E) {
795 self.encode_diff(&StateVector::default(), encoder)
796 }
797}
798
799impl Decode for Update {
800 fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, Error> {
801 let clients_len: u32 = decoder.read_var()?;
803 let mut clients = HashMap::with_hasher(BuildHasherDefault::default());
804 clients.try_reserve(clients_len as usize)?;
805
806 let mut blocks = UpdateBlocks { clients };
807 for _ in 0..clients_len {
808 let blocks_len = decoder.read_var::<u32>()? as usize;
809
810 let client = decoder.read_client()?;
811 let mut clock: u32 = decoder.read_var()?;
812 let blocks = blocks
813 .clients
814 .entry(client)
815 .or_insert_with(|| VecDeque::new());
816 blocks.try_reserve(blocks_len)?;
819
820 for _ in 0..blocks_len {
821 let id = ID::new(client, clock);
822 if let Some(block) = Self::decode_block(id, decoder)? {
823 clock += block.len();
826 blocks.push_back(block);
827 }
828 }
829 }
830 let delete_set = DeleteSet::decode(decoder)?;
832 Ok(Update { blocks, delete_set })
833 }
834}
835
/// Iterator wrapper which keeps the most recently pulled element around, so that it can be
/// inspected repeatedly before being taken.
struct Memo<I: Iterator + ?Sized> {
    /// Element pulled by the latest `move_next` call; `None` before the first call, after
    /// `take`, or once the iterator is exhausted.
    current: Option<I::Item>,
    /// The wrapped iterator.
    iter: I,
}

impl<I: Iterator> Memo<I> {
    /// Borrows the current element, if there is one.
    fn current(&self) -> Option<&I::Item> {
        let Memo { current, .. } = self;
        current.as_ref()
    }

    /// Moves the current element out, leaving `None` in its place.
    fn take(&mut self) -> Option<I::Item> {
        std::mem::take(&mut self.current)
    }

    /// Pulls the next element from the wrapped iterator and makes it the current one,
    /// discarding whatever was current before.
    fn move_next(&mut self) {
        let upcoming = self.iter.next();
        self.current = upcoming;
    }
}
858
859trait Memoizable: Iterator {
860 fn memoized(self) -> Memo<Self>;
861}
862
863impl<T: Iterator> Memoizable for T {
864 fn memoized(self) -> Memo<Self> {
865 Memo {
866 iter: self,
867 current: None,
868 }
869 }
870}
871
/// A single block as carried by an [`Update`].
#[derive(PartialEq)]
pub(crate) enum BlockCarrier {
    /// A full item.
    Item(Box<Item>),
    /// An ID range decoded from a block marked with `BLOCK_GC_REF_NUMBER`.
    GC(BlockRange),
    /// An ID range decoded from a block marked with `BLOCK_SKIP_REF_NUMBER`.
    Skip(BlockRange),
}
878
879impl BlockCarrier {
880 pub(crate) fn splice(&self, offset: u32) -> Option<Self> {
881 match self {
882 BlockCarrier::Item(x) => {
883 let next = ItemPtr::from(x).splice(offset, OffsetKind::Utf16)?;
884 Some(BlockCarrier::Item(next))
885 }
886 BlockCarrier::Skip(x) => {
887 if offset == 0 {
888 None
889 } else {
890 Some(BlockCarrier::Skip(x.slice(offset)))
891 }
892 }
893 BlockCarrier::GC(x) => {
894 if offset == 0 {
895 None
896 } else {
897 Some(BlockCarrier::GC(x.slice(offset)))
898 }
899 }
900 }
901 }
902 pub(crate) fn same_type(&self, other: &BlockCarrier) -> bool {
903 match (self, other) {
904 (BlockCarrier::Skip(_), BlockCarrier::Skip(_)) => true,
905 (BlockCarrier::Item(_), BlockCarrier::Item(_)) => true,
906 (BlockCarrier::GC(_), BlockCarrier::GC(_)) => true,
907 (_, _) => false,
908 }
909 }
910 pub(crate) fn id(&self) -> &ID {
911 match self {
912 BlockCarrier::Item(x) => x.id(),
913 BlockCarrier::Skip(x) => &x.id,
914 BlockCarrier::GC(x) => &x.id,
915 }
916 }
917
918 pub(crate) fn len(&self) -> u32 {
919 match self {
920 BlockCarrier::Item(x) => x.len(),
921 BlockCarrier::Skip(x) => x.len,
922 BlockCarrier::GC(x) => x.len,
923 }
924 }
925
926 pub(crate) fn range(&self) -> BlockRange {
927 match self {
928 BlockCarrier::Item(item) => BlockRange::new(item.id, item.len),
929 BlockCarrier::GC(gc) => gc.clone(),
930 BlockCarrier::Skip(skip) => skip.clone(),
931 }
932 }
933
934 pub(crate) fn last_id(&self) -> ID {
935 match self {
936 BlockCarrier::Item(x) => x.last_id(),
937 BlockCarrier::Skip(x) => x.last_id(),
938 BlockCarrier::GC(x) => x.last_id(),
939 }
940 }
941
942 pub(crate) fn try_squash(&mut self, other: &BlockCarrier) -> bool {
943 match (self, other) {
944 (BlockCarrier::Item(a), BlockCarrier::Item(b)) => {
945 ItemPtr::from(a).try_squash(ItemPtr::from(b))
946 }
947 (BlockCarrier::Skip(a), BlockCarrier::Skip(b)) => {
948 a.merge(b);
949 true
950 }
951 _ => false,
952 }
953 }
954
955 pub fn as_item_ptr(&mut self) -> Option<ItemPtr> {
956 if let BlockCarrier::Item(block) = self {
957 Some(ItemPtr::from(block))
958 } else {
959 None
960 }
961 }
962
963 pub fn into_block(self) -> Option<Box<Item>> {
964 if let BlockCarrier::Item(block) = self {
965 Some(block)
966 } else {
967 None
968 }
969 }
970
971 #[inline]
972 pub fn is_skip(&self) -> bool {
973 if let BlockCarrier::Skip(_) = self {
974 true
975 } else {
976 false
977 }
978 }
979 pub fn encode_with_offset<E: Encoder>(&self, encoder: &mut E, offset: u32) {
980 match self {
981 BlockCarrier::Item(x) => {
982 let slice = ItemSlice::new(x.into(), offset, x.len() - 1);
983 slice.encode(encoder)
984 }
985 BlockCarrier::Skip(x) => {
986 encoder.write_info(BLOCK_SKIP_REF_NUMBER);
987 encoder.write_var(x.len - offset);
988 }
989 BlockCarrier::GC(x) => {
990 encoder.write_info(BLOCK_GC_REF_NUMBER);
991 encoder.write_len(x.len - offset);
992 }
993 }
994 }
995
996 pub fn integrate(&mut self, txn: &mut TransactionMut, offset: u32) -> bool {
997 match self {
998 BlockCarrier::Item(x) => ItemPtr::from(x).integrate(txn, offset),
999 BlockCarrier::Skip(x) => x.integrate(offset),
1000 BlockCarrier::GC(x) => x.integrate(offset),
1001 }
1002 }
1003}
1004
1005impl From<Box<Item>> for BlockCarrier {
1006 fn from(block: Box<Item>) -> Self {
1007 BlockCarrier::Item(block)
1008 }
1009}
1010
1011impl Encode for BlockCarrier {
1012 fn encode<E: Encoder>(&self, encoder: &mut E) {
1013 match self {
1014 BlockCarrier::Item(block) => block.encode(encoder),
1015 BlockCarrier::Skip(skip) => {
1016 encoder.write_info(BLOCK_SKIP_REF_NUMBER);
1017 encoder.write_len(skip.len)
1018 }
1019 BlockCarrier::GC(gc) => {
1020 encoder.write_info(BLOCK_GC_REF_NUMBER);
1021 encoder.write_len(gc.len)
1022 }
1023 }
1024 }
1025}
1026
/// Part of an update which could not be integrated yet (see `Update::integrate`).
#[derive(Debug, PartialEq)]
pub struct PendingUpdate {
    /// Update holding the blocks that are waiting for integration.
    pub update: Update,
    /// State vector collected during integration which describes what was missing.
    pub missing: StateVector,
}
1036
1037impl std::fmt::Debug for Update {
1038 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1039 std::fmt::Display::fmt(self, f)
1040 }
1041}
1042
1043impl std::fmt::Display for Update {
1044 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1045 let mut s = f.debug_struct("");
1046 if !self.blocks.is_empty() {
1047 s.field("blocks", &self.blocks);
1048 }
1049 if !self.delete_set.is_empty() {
1050 s.field("delete set", &self.delete_set);
1051 }
1052 s.finish()
1053 }
1054}
1055
/// Test-only conversion of an update into a block store created for client id 0.
///
/// Implemented as `From`, so that `Into<Store>` for `Update` is still available through the
/// standard blanket implementation.
///
/// # Panics
///
/// Panics when the update contains anything other than items.
#[cfg(test)]
impl From<Update> for Store {
    fn from(update: Update) -> Self {
        use crate::doc::Options;

        let mut store = Store::new(&Options::with_client_id(0));
        for (_, vec) in update.blocks.clients {
            for block in vec {
                if let BlockCarrier::Item(block) = block {
                    store.blocks.push_block(block);
                } else {
                    panic!("Cannot convert Update into block store - Skip block detected");
                }
            }
        }
        store
    }
}
1075
/// Borrowing iterator over all blocks of an [`UpdateBlocks`], visiting clients in descending
/// client id order.
pub(crate) struct Blocks<'a> {
    /// Clients (with their queues) that have not been started yet.
    current_client: std::vec::IntoIter<(&'a ClientID, &'a VecDeque<BlockCarrier>)>,
    /// Iterator over the queue of the client being visited; `None` when there were no clients.
    current_block: Option<std::collections::vec_deque::Iter<'a, BlockCarrier>>,
}
1080
1081impl<'a> Blocks<'a> {
1082 fn new(update: &'a UpdateBlocks) -> Self {
1083 let mut client_blocks: Vec<(&'a ClientID, &'a VecDeque<BlockCarrier>)> =
1084 update.clients.iter().collect();
1085 client_blocks.sort_by(|a, b| b.0.cmp(a.0));
1087 let mut current_client = client_blocks.into_iter();
1088
1089 let current_block = current_client.next().map(|(_, v)| v.iter());
1090 Blocks {
1091 current_client,
1092 current_block,
1093 }
1094 }
1095}
1096
1097impl<'a> Iterator for Blocks<'a> {
1098 type Item = &'a BlockCarrier;
1099
1100 fn next(&mut self) -> Option<Self::Item> {
1101 if let Some(blocks) = self.current_block.as_mut() {
1102 let block = blocks.next();
1103 if block.is_some() {
1104 return block;
1105 }
1106 }
1107
1108 if let Some(entry) = self.current_client.next() {
1109 self.current_block = Some(entry.1.iter());
1110 self.next()
1111 } else {
1112 None
1113 }
1114 }
1115}
1116
/// Owning iterator over all blocks of an [`UpdateBlocks`], visiting clients in descending
/// client id order.
pub(crate) struct IntoBlocks {
    /// Clients (with their queues) that have not been started yet.
    current_client: std::vec::IntoIter<(ClientID, VecDeque<BlockCarrier>)>,
    /// Iterator over the queue of the client being visited; `None` when there were no clients.
    current_block: Option<std::collections::vec_deque::IntoIter<BlockCarrier>>,
    /// When set, skip blocks are not yielded.
    ignore_skip: bool,
}
1122
1123impl IntoBlocks {
1124 fn new(update: UpdateBlocks, ignore_skip: bool) -> Self {
1125 let mut client_blocks: Vec<(ClientID, VecDeque<BlockCarrier>)> =
1126 update.clients.into_iter().collect();
1127 client_blocks.sort_by(|a, b| b.0.cmp(&a.0));
1129 let mut current_client = client_blocks.into_iter();
1130
1131 let current_block = current_client.next().map(|(_, v)| v.into_iter());
1132 IntoBlocks {
1133 current_client,
1134 current_block,
1135 ignore_skip,
1136 }
1137 }
1138}
1139
1140impl Iterator for IntoBlocks {
1141 type Item = BlockCarrier;
1142
1143 fn next(&mut self) -> Option<Self::Item> {
1144 if let Some(blocks) = self.current_block.as_mut() {
1145 let block = blocks.next();
1146 match block {
1147 Some(BlockCarrier::Skip(_)) if self.ignore_skip => return self.next(),
1148 Some(block) => return Some(block),
1149 None => {}
1150 }
1151 }
1152
1153 if let Some(entry) = self.current_client.next() {
1154 self.current_block = Some(entry.1.into_iter());
1155 self.next()
1156 } else {
1157 None
1158 }
1159 }
1160}
1161
1162#[cfg(test)]
1163mod test {
1164 use std::collections::{HashMap, VecDeque};
1165 use std::iter::FromIterator;
1166 use std::sync::{Arc, Mutex};
1167
1168 use crate::block::{BlockRange, ClientID, Item, ItemContent};
1169 use crate::encoding::read::Cursor;
1170 use crate::types::{Delta, TypePtr};
1171 use crate::update::{BlockCarrier, Update, UpdateBlocks};
1172 use crate::updates::decoder::{Decode, DecoderV1};
1173 use crate::updates::encoder::Encode;
1174 use crate::{
1175 merge_updates_v1, Any, DeleteSet, Doc, GetString, Options, ReadTxn, StateVector, Text,
1176 Transact, WriteTxn, XmlFragment, XmlOut, ID,
1177 };
1178
/// Decodes a V1 payload holding a single map entry and compares the decoded blocks with a
/// hand-built item.
#[test]
fn update_decode() {
    let payload: &[u8] = &[
        1, 1, 176, 249, 159, 198, 7, 0, 40, 1, 0, 4, 107, 101, 121, 66, 1, 119, 6, 118, 97,
        108, 117, 101, 66, 0,
    ];
    let decoded = Update::decode(&mut DecoderV1::from(payload)).unwrap();

    let id = ID::new(2026372272, 0);
    let expected: Vec<BlockCarrier> = vec![Item::new(
        id,
        None,
        None,
        None,
        None,
        TypePtr::Named("".into()),
        Some("keyB".into()),
        ItemContent::Any(vec!["valueB".into()]),
    )
    .unwrap()
    .into()];

    let block = decoded.blocks.clients.get(&id.client).unwrap();
    assert_eq!(block, &expected);
}
1220
/// Two documents edit the same text concurrently and exchange their updates; a third
/// document which applies the merged update has to end up with the same text.
#[test]
fn update_merge() {
    let d1 = Doc::with_client_id(1);
    let txt1 = d1.get_or_insert_text("test");
    let mut t1 = d1.transact_mut();

    let d2 = Doc::with_client_id(2);
    let txt2 = d2.get_or_insert_text("test");
    let mut t2 = d2.transact_mut();

    // concurrent edits on both documents
    txt1.insert(&mut t1, 0, "aaa");
    txt1.insert(&mut t1, 0, "aaa");

    txt2.insert(&mut t2, 0, "bbb");
    txt2.insert(&mut t2, 2, "bbb");

    // encode before the exchange, so that each binary only holds the document's own changes
    let binary1 = t1.encode_update_v1();
    let binary2 = t2.encode_update_v1();

    // exchange the updates between both documents
    t1.apply_update(Update::decode_v1(binary2.as_slice()).unwrap())
        .unwrap();
    t2.apply_update(Update::decode_v1(binary1.as_slice()).unwrap())
        .unwrap();

    let u1 = Update::decode(&mut DecoderV1::new(Cursor::new(binary1.as_slice()))).unwrap();
    let u2 = Update::decode(&mut DecoderV1::new(Cursor::new(binary2.as_slice()))).unwrap();

    // merge both updates and apply the result to a fresh document
    let u12 = Update::merge_updates(vec![u1, u2]);

    let d3 = Doc::with_client_id(3);
    let txt3 = d3.get_or_insert_text("test");
    let mut t3 = d3.transact_mut();
    t3.apply_update(u12).unwrap();

    let str1 = txt1.get_string(&t1);
    let str2 = txt2.get_string(&t2);
    let str3 = txt3.get_string(&t3);

    assert_eq!(str1, str2);
    assert_eq!(str2, str3);
}
1264
/// Merging an update with a copy of itself has to produce that very update.
#[test]
fn test_duplicate_updates() {
    let doc = Doc::with_client_id(1);
    let text = doc.get_or_insert_text("test");
    let mut txn = doc.transact_mut();
    text.insert(&mut txn, 0, "aaa");

    let encoded = txn.encode_update_v1();
    let duplicates = vec![decode_update(&encoded), decode_update(&encoded)];
    let expected = decode_update(&encoded);

    let merged_update = Update::merge_updates(duplicates);
    assert_eq!(merged_update, expected);
}
1280
/// An update holding blocks of two clients has to survive another merge round unchanged.
#[test]
fn test_multiple_clients_in_one_update() {
    // encodes a document of the given client after inserting `content` into its text
    let encode_insert = |client_id: ClientID, content: &str| {
        let doc = Doc::with_client_id(client_id);
        let txt = doc.get_or_insert_text("test");
        let mut tr = doc.transact_mut();
        txt.insert(&mut tr, 0, content);
        tr.encode_update_v1()
    };
    let binary1 = encode_insert(1, "aaa");
    let binary2 = encode_insert(2, "bbb");

    let merge_both =
        || Update::merge_updates(vec![decode_update(&binary1), decode_update(&binary2)]);
    let u12 = merge_both();
    let u12_copy = merge_both();

    assert_eq!(2, u12.blocks.clients.keys().len());

    let merged_update = Update::merge_updates(vec![u12]);
    assert_eq!(merged_update, u12_copy);
}
1307
/// Applies a v2-encoded document state and then a second v2-encoded update, checking that
/// the first child of the `prosemirror` fragment is a `linknote` element before the second
/// update and absent after it, and that the transaction's own v2 encoding decodes again.
#[test]
fn test_v2_encoding_of_fragmented_delete_set() {
    let initial_state = vec![
        0, 1, 0, 11, 129, 215, 239, 201, 16, 198, 237, 152, 220, 8, 4, 4, 0, 4, 1, 1, 0, 11,
        40, 3, 39, 0, 4, 0, 7, 0, 40, 3, 8, 163, 1, 142, 1, 110, 111, 116, 101, 46, 103, 117,
        105, 100, 110, 111, 116, 101, 71, 117, 105, 100, 110, 111, 116, 101, 46, 111, 119, 110,
        101, 114, 111, 119, 110, 101, 114, 110, 111, 116, 101, 46, 116, 121, 112, 101, 110,
        111, 116, 101, 84, 121, 112, 101, 110, 111, 116, 101, 46, 99, 114, 101, 97, 116, 101,
        84, 105, 109, 101, 99, 114, 101, 97, 116, 101, 84, 105, 109, 101, 110, 111, 116, 101,
        46, 116, 105, 116, 108, 101, 116, 105, 116, 108, 101, 49, 112, 114, 111, 115, 101, 109,
        105, 114, 114, 111, 114, 108, 105, 110, 107, 110, 111, 116, 101, 103, 117, 105, 100,
        115, 108, 111, 116, 71, 117, 105, 100, 116, 121, 112, 101, 108, 105, 110, 107, 84, 121,
        112, 101, 99, 104, 105, 108, 100, 114, 101, 110, 98, 9, 8, 10, 5, 9, 8, 15, 74, 0, 5,
        1, 11, 8, 4, 8, 4, 72, 0, 1, 9, 1, 4, 0, 0, 1, 0, 0, 3, 1, 2, 2, 3, 2, 65, 8, 2, 4, 0,
        119, 22, 97, 99, 70, 120, 85, 89, 68, 76, 82, 104, 101, 114, 107, 74, 97, 66, 101, 99,
        115, 99, 51, 103, 125, 136, 57, 125, 0, 119, 13, 49, 54, 56, 53, 53, 51, 48, 50, 56,
        54, 54, 53, 54, 9, 0, 119, 22, 66, 67, 100, 81, 112, 112, 119, 69, 84, 48, 105, 82, 86,
        66, 81, 45, 56, 69, 87, 50, 87, 103, 119, 22, 106, 114, 69, 109, 73, 77, 112, 86, 84,
        101, 45, 99, 114, 78, 50, 86, 76, 51, 99, 97, 72, 81, 119, 8, 108, 105, 110, 107, 110,
        111, 116, 101, 119, 1, 49, 118, 2, 4, 103, 117, 105, 100, 119, 22, 66, 67, 100, 81,
        112, 112, 119, 69, 84, 48, 105, 82, 86, 66, 81, 45, 56, 69, 87, 50, 87, 103, 8, 115,
        108, 111, 116, 71, 117, 105, 100, 119, 22, 106, 114, 69, 109, 73, 77, 112, 86, 84, 101,
        45, 99, 114, 78, 50, 86, 76, 51, 99, 97, 72, 81, 119, 22, 66, 67, 100, 81, 112, 112,
        119, 69, 84, 48, 105, 82, 86, 66, 81, 45, 56, 69, 87, 50, 87, 103, 0,
    ];
    let follow_up = vec![
        0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 198, 182, 140, 174, 4, 1, 2, 0, 0, 5,
    ];

    let doc = Doc::with_options(Options {
        skip_gc: true,
        client_id: 1,
        ..Default::default()
    });
    let fragment = doc.get_or_insert_xml_fragment("prosemirror");

    // Phase 1: load the initial state; the fragment's first child is a `linknote` element.
    {
        let mut txn = doc.transact_mut();
        txn.apply_update(Update::decode_v2(&initial_state).unwrap())
            .unwrap();

        let first_tag = match fragment.get(&txn, 0) {
            Some(XmlOut::Element(elem)) => Some(elem.tag().clone()),
            _ => None,
        };
        assert_eq!(first_tag, Some("linknote".into()));
    }

    // Phase 2: apply the follow-up update; the fragment no longer has a first child.
    {
        let mut txn = doc.transact_mut();
        txn.apply_update(Update::decode_v2(&follow_up).unwrap())
            .unwrap();

        // The v2 encoding produced by this transaction must itself be decodable.
        let reencoded = txn.encode_update_v2();
        let _ = Update::decode_v2(&reencoded).unwrap();

        assert!(fragment.get(&txn, 0).is_none());
    }
}
1366
/// Records five single-character inserts made on one document, then relays them through a
/// chain of fresh replicas. Each replica receives the full state of its predecessor plus one
/// recorded update, with the recorded updates delivered in the order 0, 1, 3, 2, 4. The last
/// replica must read "nenor".
#[test]
fn merge_pending_updates() {
    let origin = Doc::with_client_id(0);

    // Collect every v1 update emitted by the origin document.
    let recorded = Arc::new(Mutex::new(vec![]));
    let sink = Arc::clone(&recorded);
    let subscription = origin
        .observe_update_v1(move |_, event| {
            sink.lock().unwrap().push(event.update.clone());
        })
        .unwrap();

    // One transaction per inserted character.
    let text = origin.get_or_insert_text("textBlock");
    for chunk in ["r", "o", "n", "e", "n"] {
        text.apply_delta(&mut origin.transact_mut(), [Delta::insert(chunk)]);
    }

    // The observer closure owns a clone of `recorded`; drop the subscription before
    // unwrapping the `Arc`.
    drop(subscription);
    let updates = Arc::into_inner(recorded).unwrap().into_inner().unwrap();

    // Decodes `bytes` as a v1 update and applies it to `doc` in its own transaction.
    let apply = |doc: &Doc, bytes: &[u8]| {
        doc.transact_mut()
            .apply_update(Update::decode_v1(bytes).unwrap())
            .unwrap();
    };
    // Encodes the whole state of `doc` as a v1 update (diffed against an empty state vector).
    let full_state = |doc: &Doc| {
        doc.transact()
            .encode_state_as_update_v1(&StateVector::default())
    };

    let replica1 = Doc::with_client_id(1);
    apply(&replica1, &updates[0]);
    let state1 = full_state(&replica1);

    let replica2 = Doc::with_client_id(2);
    apply(&replica2, &state1);
    apply(&replica2, &updates[1]);
    let state2 = full_state(&replica2);

    // Recorded updates 3 and 2 are delivered in swapped order from here on.
    let replica3 = Doc::with_client_id(3);
    apply(&replica3, &state2);
    apply(&replica3, &updates[3]);
    let state3 = full_state(&replica3);

    let replica4 = Doc::with_client_id(4);
    apply(&replica4, &state3);
    apply(&replica4, &updates[2]);
    let state4 = full_state(&replica4);

    let replica5 = Doc::with_client_id(5);
    apply(&replica5, &state4);
    apply(&replica5, &updates[4]);

    let final_text = replica5.get_or_insert_text("textBlock");
    let content = final_text.get_string(&replica5.transact());
    assert_eq!(content, "nenor");
}
1443
/// Checks `Update::state_vector` on an update that mixes skip ranges and items for three
/// clients: only client 3, whose first block is an item at clock 0, is expected to appear.
#[test]
fn update_state_vector_with_skips() {
    let carriers = [
        // client 1: a skip at clock 0, then an item at clock 1
        BlockCarrier::Skip(BlockRange::new(ID::new(1, 0), 1)),
        test_item(1, 1, 1),
        // client 2: a single item that starts at clock 1
        test_item(2, 1, 1),
        // client 3: an item at clock 0, a skip at clock 1, an item at clock 2
        test_item(3, 0, 1),
        BlockCarrier::Skip(BlockRange::new(ID::new(3, 1), 1)),
        test_item(3, 2, 1),
    ];

    let mut update = Update::new();
    for carrier in carriers {
        update.blocks.add_block(carrier);
    }

    let expected = StateVector::from_iter([(3, 1)]);
    assert_eq!(update.state_vector(), expected);
}
1464
/// Exercises `Update::extends` against a state vector holding clock 1 for client 1,
/// using updates whose blocks start before, at, and after that clock.
#[test]
fn test_extends() {
    // Builds an update out of `blocks` and reports whether it extends the state vector {1: 1}.
    let extends_known = |blocks: Vec<BlockCarrier>| -> bool {
        let mut candidate = Update::new();
        for block in blocks {
            candidate.blocks.add_block(block);
        }
        let known = StateVector::from_iter([(1, 1)]);
        candidate.extends(&known)
    };

    // item at clock 0 with 2 elements
    assert!(extends_known(vec![test_item(1, 0, 2)]));

    // item at clock 0 with 1 element
    assert!(!extends_known(vec![test_item(1, 0, 1)]));

    // skip over clocks 0..2 followed by an item at clock 2
    assert!(!extends_known(vec![
        BlockCarrier::Skip(BlockRange::new(ID::new(1, 0), 2)),
        test_item(1, 2, 1),
    ]));

    // item starting exactly at clock 1
    assert!(extends_known(vec![test_item(1, 1, 1)]));

    // item starting at clock 2
    assert!(!extends_known(vec![test_item(1, 2, 1)]));
}
1489
/// A freshly created update must encode (v1) to the bytes of `Update::EMPTY_V1`.
#[test]
fn empty_update_v1() {
    let encoded = Update::new().encode_v1();
    assert_eq!(&encoded, Update::EMPTY_V1);
}
1496
/// A freshly created update must encode (v2) to the bytes of `Update::EMPTY_V2`.
#[test]
fn empty_update_v2() {
    let encoded = Update::new().encode_v2();
    assert_eq!(&encoded, Update::EMPTY_V2);
}
1503
/// An update containing a skip block must survive a v2 encode/decode round trip unchanged.
#[test]
fn update_v2_with_skips() {
    let original = update_with_skips();
    let round_tripped = Update::decode_v2(&original.encode_v2()).unwrap();
    assert_eq!(original, round_tripped);
}
1511
/// Applies an update containing a skip block and checks how the pending part shows up in
/// the encoded state before and after `prune_pending`, and that merging the pruned part
/// back reproduces the original bytes.
#[test]
fn pending_update_check() {
    let incoming = update_with_skips();
    let expected = incoming.encode_v1();

    let doc = Doc::with_client_id(2);
    let mut txn = doc.transact_mut();
    let text = txn.get_or_insert_text("test");
    txn.apply_update(incoming).unwrap();

    // Only "hello" is readable, and the transaction reports missing updates.
    let visible = text.get_string(&txn);
    assert_eq!(visible, "hello");
    assert!(txn.has_missing_updates());

    // Before pruning, the encoded state equals the bytes of the original update.
    let with_pending = txn.encode_state_as_update_v1(&Default::default());
    assert_eq!(with_pending, expected);

    // Pruning hands back the pending part...
    let pruned = txn.prune_pending();
    assert!(pruned.is_some());

    // ...after which the encoded state no longer equals the original bytes.
    let without_pending = txn.encode_state_as_update_v1(&Default::default());
    assert_ne!(without_pending, expected);

    // Merging the pruned part back into the current state restores the original bytes.
    let rejoined = merge_updates_v1([without_pending, pruned.unwrap().encode_v1()]).unwrap();
    assert_eq!(rejoined, expected);
}
1532
1533 fn update_with_skips() -> Update {
1534 Update {
1535 blocks: UpdateBlocks {
1536 clients: HashMap::from_iter([(
1537 1,
1538 VecDeque::from_iter([
1539 BlockCarrier::Item(
1540 Item::new(
1541 ID::new(1, 0),
1542 None,
1543 None,
1544 None,
1545 None,
1546 TypePtr::Named("test".into()),
1547 None,
1548 ItemContent::String("hello".into()),
1549 )
1550 .unwrap(),
1551 ),
1552 BlockCarrier::Skip(BlockRange::new(ID::new(1, 5), 3)),
1553 BlockCarrier::Item(
1554 Item::new(
1555 ID::new(1, 8),
1556 None,
1557 Some(ID::new(1, 7)),
1558 None,
1559 None,
1560 TypePtr::Unknown,
1561 None,
1562 ItemContent::String("world".into()),
1563 )
1564 .unwrap(),
1565 ),
1566 ]),
1567 )]),
1568 },
1569 delete_set: DeleteSet::default(),
1570 }
1571 }
1572
1573 fn test_item(client_id: ClientID, clock: u32, len: u32) -> BlockCarrier {
1574 assert!(len > 0);
1575 let any: Vec<_> = (0..len).into_iter().map(Any::from).collect();
1576 BlockCarrier::Item(
1577 Item::new(
1578 ID::new(client_id, clock),
1579 None,
1580 None,
1581 None,
1582 None,
1583 TypePtr::Named("test".into()),
1584 None,
1585 ItemContent::Any(any),
1586 )
1587 .unwrap(),
1588 )
1589 }
1590
1591 fn decode_update(bin: &[u8]) -> Update {
1592 Update::decode(&mut DecoderV1::new(Cursor::new(bin))).unwrap()
1593 }
1594}