use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
use super::{loro_dag::AppDagNodeInner, AppDagNode};
use crate::sync::Mutex;
use crate::{
    arena::SharedArena,
    change::Change,
    estimated_size::EstimatedSize,
    kv_store::KvStore,
    op::Op,
    parent::register_container_and_parent_link,
    version::{Frontiers, ImVersionVector},
    VersionVector,
};
use block_encode::decode_block_range;
use bytes::Bytes;
use itertools::Itertools;
use loro_common::{
    Counter, HasCounter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport,
    LoroError, LoroResult, PeerID, ID,
};
use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
use std::sync::atomic::AtomicI64;
use std::{
    cmp::Ordering,
    collections::{BTreeMap, VecDeque},
    ops::{Bound, Deref},
    sync::Arc,
};
use tracing::{info_span, warn};

mod block_encode;
mod block_meta_encode;
pub(super) mod iter;

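// The maximum estimated size of a block. Once a block grows past this size it
// stops accepting new changes and a new block is started; tests use a tiny
// limit so that block splitting is exercised frequently.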
#[cfg(not(test))]
const MAX_BLOCK_SIZE: usize = 1024 * 4;
#[cfg(test)]
const MAX_BLOCK_SIZE: usize = 128;

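/// Stores the full history of [`Change`]s, grouped into per-peer [`ChangesBlock`]s.
///
/// Recently touched blocks are kept parsed in memory (`inner.mem_parsed_kv`),
/// while flushed blocks are persisted in encoded form in `external_kv`.
/// `external_vv` records how much of the history has already been written to
/// the external store, and `merge_interval` bounds the timestamp gap within
/// which consecutive local changes may be merged.
///
/// A rough usage sketch of the internal API (identifiers such as `arena`,
/// `change`, and `peer` are placeholders):
///
/// ```ignore
/// let store = ChangeStore::new_mem(&arena, Arc::new(AtomicI64::new(0)));
/// store.insert_change(change, true, true);
/// let c = store.get_change(ID::new(peer, 0)).unwrap();
/// ```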
#[derive(Debug, Clone)]
pub struct ChangeStore {
    inner: Arc<Mutex<ChangeStoreInner>>,
    arena: SharedArena,
    external_kv: Arc<Mutex<dyn KvStore>>,
    external_vv: Arc<Mutex<VersionVector>>,
    merge_interval: Arc<AtomicI64>,
}

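/// In-memory state of a [`ChangeStore`].
///
/// `start_vv` and `start_frontiers` mark the version at which the stored
/// history begins; they are empty unless the history was trimmed (e.g. a
/// partial export or a shallow snapshot). `mem_parsed_kv` maps the first
/// [`ID`] of each block to its parsed form.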
#[derive(Debug, Clone)]
struct ChangeStoreInner {
    start_vv: ImVersionVector,
    start_frontiers: Frontiers,
    mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
}

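/// A run of contiguous changes from a single peer.
///
/// `counter_range` and `lamport_range` are half-open ranges covering all the
/// changes in the block; `flushed` records whether the current content has
/// been written to the external kv store.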
#[derive(Debug, Clone)]
pub(crate) struct ChangesBlock {
    peer: PeerID,
    counter_range: (Counter, Counter),
    lamport_range: (Lamport, Lamport),
    estimated_size: usize,
    flushed: bool,
    content: ChangesBlockContent,
}

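/// The payload of a block: parsed changes, encoded bytes, or both.
///
/// Blocks are decoded lazily. A block loaded from the kv store starts as
/// `Bytes`, becomes `Both` once its changes are needed, and drops back to
/// `Changes` when mutated, since mutation invalidates the encoded form.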
#[derive(Clone)]
pub(crate) enum ChangesBlockContent {
    Changes(Arc<Vec<Change>>),
    Bytes(ChangesBlockBytes),
    Both(Arc<Vec<Change>>, ChangesBlockBytes),
}

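/// An encoded block together with its lazily decoded header.
///
/// The header carries enough metadata (peer, counters, lamports, deps) to
/// answer range queries without decoding the whole block.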
#[derive(Clone)]
pub(crate) struct ChangesBlockBytes {
    bytes: Bytes,
    header: OnceCell<Arc<ChangesBlockHeader>>,
}

pub const START_VV_KEY: &[u8] = b"sv";
pub const START_FRONTIERS_KEY: &[u8] = b"sf";
pub const VV_KEY: &[u8] = b"vv";
pub const FRONTIERS_KEY: &[u8] = b"fr";

impl ChangeStore {
    pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: ImVersionVector::new(),
                start_frontiers: Frontiers::default(),
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena: a.clone(),
            external_vv: Arc::new(Mutex::new(VersionVector::new())),
            external_kv: Arc::new(Mutex::new(MemKvStore::new(MemKvConfig::default()))),
            merge_interval,
        }
    }

    #[cfg(test)]
    fn new_for_test() -> Self {
        Self::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)))
    }

    pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
        self.flush_and_compact(vv, frontiers);
        let mut kv = self.external_kv.lock().unwrap();
        kv.export_all()
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub(super) fn export_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in latest_vv.sub_iter(start_vv) {
            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        loro_common::debug!(
            "start_vv={:?} start_frontiers={:?}",
            &start_vv,
            start_frontiers
        );
        new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
    }

    pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in spans {
            let mut span = *span;
            span.normalize_();
            for c in self.iter_changes(span) {
                let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
                let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        encode_blocks_in_store(new_store, &self.arena, w);
    }

    fn encode_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        {
            let mut store = self.external_kv.lock().unwrap();
            store.set(START_VV_KEY, start_vv.encode().into());
            store.set(START_FRONTIERS_KEY, start_frontiers.encode().into());
            let mut inner = self.inner.lock().unwrap();
            inner.start_frontiers = start_frontiers.clone();
            inner.start_vv = ImVersionVector::from_vv(start_vv);
        }
        self.flush_and_compact(latest_vv, latest_frontiers);
        self.external_kv.lock().unwrap().export_all()
    }

    pub(crate) fn decode_snapshot_for_updates(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> Result<Vec<Change>, LoroError> {
        let change_store = ChangeStore::new_mem(arena, Arc::new(AtomicI64::new(0)));
        let _ = change_store.import_all(bytes)?;
        let mut changes = Vec::new();
        change_store.visit_all_changes(&mut |c| {
            let cnt_threshold = self_vv.get(&c.id.peer).copied().unwrap_or(0);
            if c.id.counter >= cnt_threshold {
                changes.push(c.clone());
                return;
            }

            let change_end = c.ctr_end();
            if change_end > cnt_threshold {
                changes.push(c.slice((cnt_threshold - c.id.counter) as usize, c.atom_len()));
            }
        });

        Ok(changes)
    }

    pub(crate) fn decode_block_bytes(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> LoroResult<Vec<Change>> {
        let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
        if ans.is_empty() {
            return Ok(ans);
        }

        let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
        ans.retain_mut(|c| {
            if c.id.counter >= start {
                true
            } else if c.ctr_end() > start {
                *c = c.slice((start - c.id.counter) as usize, c.atom_len());
                true
            } else {
                false
            }
        });

        Ok(ans)
    }

    pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
        let block = self.get_block_that_contains(id)?;
        Some(block.content.iter_dag_nodes())
    }

    pub fn get_last_dag_nodes_for_peer(&self, peer: PeerID) -> Option<Vec<AppDagNode>> {
        let block = self.get_the_last_block_of_peer(peer)?;
        Some(block.content.iter_dag_nodes())
    }

    pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.lock().unwrap();
        for (_, block) in inner.mem_parsed_kv.iter_mut() {
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            for c in block.content.try_changes().unwrap() {
                f(c);
            }
        }
    }

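    /// Returns the blocks overlapping `id_span`, each paired with the
    /// `[start, end)` range of change indices that fall inside the span.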
    pub(crate) fn iter_blocks(&self, id_span: IdSpan) -> Vec<(Arc<ChangesBlock>, usize, usize)> {
        if id_span.counter.start == id_span.counter.end {
            return vec![];
        }

        assert!(id_span.counter.start < id_span.counter.end);
        self.ensure_block_loaded_in_range(
            Bound::Included(id_span.id_start()),
            Bound::Excluded(id_span.id_end()),
        );
        let mut inner = self.inner.lock().unwrap();
        let next_back = inner.mem_parsed_kv.range(..=id_span.id_start()).next_back();
        match next_back {
            None => {
                return vec![];
            }
            Some(next_back) => {
                if next_back.0.peer != id_span.peer {
                    return vec![];
                }
            }
        }
        let start_counter = next_back.map(|(id, _)| id.counter).unwrap_or(0);
        let ans = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                let changes = block.content.try_changes().unwrap();
                let start;
                let end;
                if id_span.counter.start <= block.counter_range.0
                    && id_span.counter.end >= block.counter_range.1
                {
                    start = 0;
                    end = changes.len();
                } else {
                    start = block
                        .get_change_index_by_counter(id_span.counter.start)
                        .unwrap_or_else(|x| x);

                    match block.get_change_index_by_counter(id_span.counter.end - 1) {
                        Ok(e) => {
                            end = e + 1;
                        }
                        Err(0) => return None,
                        Err(e) => {
                            end = e;
                        }
                    }
                }
                if start == end {
                    return None;
                }

                Some((block.clone(), start, end))
            })
            .collect_vec();

        ans
    }

    pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
        let v = self.iter_blocks(id_span);
        #[cfg(debug_assertions)]
        {
            if !v.is_empty() {
                assert_eq!(v[0].0.peer, id_span.peer);
                assert_eq!(v.last().unwrap().0.peer, id_span.peer);
                {
                    let (block, start, _end) = v.first().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*start].id.counter <= id_span.counter.start);
                }
                {
                    let (block, _start, end) = v.last().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*end - 1].ctr_end() >= id_span.counter.end);
                    assert!(changes[*end - 1].ctr_start() < id_span.counter.end);
                }
            }
        }

        v.into_iter().flat_map(move |(block, start, end)| {
            (start..end).map(move |i| BlockChangeRef {
                change_index: i,
                block: block.clone(),
            })
        })
    }

    pub(crate) fn get_blocks_in_range(&self, id_span: IdSpan) -> VecDeque<Arc<ChangesBlock>> {
        let mut inner = self.inner.lock().unwrap();
        let start_counter = inner
            .mem_parsed_kv
            .range(..=id_span.id_start())
            .next_back()
            .map(|(id, _)| id.counter)
            .unwrap_or(0);
        let vec = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                Some(block.clone())
            })
            .collect();

        vec
    }

    pub(crate) fn get_block_that_contains(&self, id: ID) -> Option<Arc<ChangesBlock>> {
        self.ensure_block_loaded_in_range(Bound::Included(id), Bound::Included(id));
        let inner = self.inner.lock().unwrap();
        let block = inner
            .mem_parsed_kv
            .range(..=id)
            .next_back()
            .filter(|(_, block)| {
                block.peer == id.peer
                    && block.counter_range.0 <= id.counter
                    && id.counter < block.counter_range.1
            })
            .map(|(_, block)| block.clone());

        block
    }

    pub(crate) fn get_the_last_block_of_peer(&self, peer: PeerID) -> Option<Arc<ChangesBlock>> {
        let end_id = ID::new(peer, Counter::MAX);
        self.ensure_id_lte(end_id);
        let inner = self.inner.lock().unwrap();
        let block = inner
            .mem_parsed_kv
            .range(..=end_id)
            .next_back()
            .filter(|(_, block)| block.peer == peer)
            .map(|(_, block)| block.clone());

        block
    }

    pub fn change_num(&self) -> usize {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.lock().unwrap();
        inner
            .mem_parsed_kv
            .iter_mut()
            .map(|(_, block)| block.change_num())
            .sum()
    }

    pub fn fork(
        &self,
        arena: SharedArena,
        merge_interval: Arc<AtomicI64>,
        vv: &VersionVector,
        frontiers: &Frontiers,
    ) -> Self {
        self.flush_and_compact(vv, frontiers);
        let inner = self.inner.lock().unwrap();
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: inner.start_vv.clone(),
                start_frontiers: inner.start_frontiers.clone(),
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena,
            external_vv: Arc::new(Mutex::new(self.external_vv.lock().unwrap().clone())),
            external_kv: self.external_kv.lock().unwrap().clone_store(),
            merge_interval,
        }
    }

    pub fn kv_size(&self) -> usize {
        self.external_kv
            .lock()
            .unwrap()
            .scan(Bound::Unbounded, Bound::Unbounded)
            .map(|(k, v)| k.len() + v.len())
            .sum()
    }

    pub(crate) fn export_blocks_from<W: std::io::Write>(
        &self,
        start_vv: &VersionVector,
        shallow_since_vv: &ImVersionVector,
        latest_vv: &VersionVector,
        w: &mut W,
    ) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in latest_vv.sub_iter(start_vv) {
            let counter_lower_bound = shallow_since_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        let arena = &self.arena;
        encode_blocks_in_store(new_store, arena, w);
    }

    pub(crate) fn fork_changes_up_to(
        &self,
        start_vv: &ImVersionVector,
        frontiers: &Frontiers,
        vv: &VersionVector,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in vv.sub_iter_im(start_vv) {
            let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        new_store.encode_all(vv, frontiers)
    }
}

fn encode_blocks_in_store<W: std::io::Write>(
    new_store: ChangeStore,
    arena: &SharedArena,
    w: &mut W,
) {
    let mut inner = new_store.inner.lock().unwrap();
    for (_id, block) in inner.mem_parsed_kv.iter_mut() {
        let bytes = block.to_bytes(arena);
        leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
        w.write_all(&bytes.bytes).unwrap();
    }
}

mod mut_external_kv {
    use super::*;

    impl ChangeStore {
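        /// Imports a previously exported snapshot into the (still empty)
        /// external kv store and returns the version info recorded in it.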
        #[tracing::instrument(skip_all, level = "debug", name = "change_store import_all")]
        pub(crate) fn import_all(&self, bytes: Bytes) -> Result<BatchDecodeInfo, LoroError> {
            let mut kv_store = self.external_kv.lock().unwrap();
            assert!(
                kv_store.len() <= 2,
                "kv store should be empty when calling import_all"
            );
            kv_store
                .import_all(bytes)
                .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
            let vv_bytes = kv_store.get(VV_KEY).unwrap_or_default();
            let vv = VersionVector::decode(&vv_bytes).unwrap();
            let start_vv_bytes = kv_store.get(START_VV_KEY).unwrap_or_default();
            let start_vv = if start_vv_bytes.is_empty() {
                Default::default()
            } else {
                VersionVector::decode(&start_vv_bytes).unwrap()
            };

            #[cfg(test)]
            {
                drop(kv_store);
                for (peer, cnt) in vv.iter() {
                    self.get_change(ID::new(*peer, *cnt - 1)).unwrap();
                }
                kv_store = self.external_kv.lock().unwrap();
            }

            *self.external_vv.lock().unwrap() = vv.clone();
            let frontiers_bytes = kv_store.get(FRONTIERS_KEY).unwrap_or_default();
            let frontiers = Frontiers::decode(&frontiers_bytes).unwrap();
            let start_frontiers = kv_store.get(START_FRONTIERS_KEY).unwrap_or_default();
            let start_frontiers = if start_frontiers.is_empty() {
                Default::default()
            } else {
                Frontiers::decode(&start_frontiers).unwrap()
            };

            let mut max_lamport = None;
            let mut max_timestamp = 0;
            drop(kv_store);
            for id in frontiers.iter() {
                let c = self.get_change(id).unwrap();
                debug_assert_ne!(c.atom_len(), 0);
                let l = c.lamport_last();
                if let Some(x) = max_lamport {
                    if l > x {
                        max_lamport = Some(l);
                    }
                } else {
                    max_lamport = Some(l);
                }

                let t = c.timestamp;
                if t > max_timestamp {
                    max_timestamp = t;
                }
            }

            Ok(BatchDecodeInfo {
                vv,
                frontiers,
                start_version: if start_vv.is_empty() {
                    None
                } else {
                    let mut inner = self.inner.lock().unwrap();
                    inner.start_frontiers = start_frontiers.clone();
                    inner.start_vv = ImVersionVector::from_vv(&start_vv);
                    Some((start_vv, start_frontiers))
                },
            })
        }

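        /// Writes all unflushed in-memory blocks to the external kv store and
        /// records `vv`/`frontiers` as its latest version.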
        pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
            let mut inner = self.inner.lock().unwrap();
            let mut store = self.external_kv.lock().unwrap();
            let mut external_vv = self.external_vv.lock().unwrap();
            for (id, block) in inner.mem_parsed_kv.iter_mut() {
                if !block.flushed {
                    let id_bytes = id.to_bytes();
                    let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
                    assert!(
                        counter_start < block.counter_range.1,
                        "Peer={} Block Counter Range={:?}, counter_start={}",
                        id.peer,
                        &block.counter_range,
                        counter_start
                    );
                    if counter_start > block.counter_range.0 {
                        assert!(store.get(&id_bytes).is_some());
                    }
                    external_vv.insert(id.peer, block.counter_range.1);
                    let bytes = block.to_bytes(&self.arena);
                    store.set(&id_bytes, bytes.bytes);
                    Arc::make_mut(block).flushed = true;
                }
            }

            if inner.start_vv.is_empty() {
                assert_eq!(&*external_vv, vv);
            }
            let vv_bytes = vv.encode();
            let frontiers_bytes = frontiers.encode();
            store.set(VV_KEY, vv_bytes.into());
            store.set(FRONTIERS_KEY, frontiers_bytes.into());
        }
    }
}

mod mut_inner_kv {
    use super::*;

    impl ChangeStore {
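        /// Inserts a change into the store.
        ///
        /// The change is appended to the previous block of the same peer when
        /// the counters are contiguous and the block has room; otherwise a new
        /// block is started. If `split_when_exceeds` is set, changes larger
        /// than `MAX_BLOCK_SIZE` are split first. `is_local` allows merging
        /// with the previous change within `merge_interval`.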
        pub fn insert_change(&self, mut change: Change, split_when_exceeds: bool, is_local: bool) {
            #[cfg(debug_assertions)]
            {
                let vv = self.external_vv.lock().unwrap();
                assert!(vv.get(&change.id.peer).copied().unwrap_or(0) <= change.id.counter);
            }

            let s = info_span!("change_store insert_change", id = ?change.id);
            let _e = s.enter();
            let estimated_size = change.estimate_storage_size();
            if estimated_size > MAX_BLOCK_SIZE && split_when_exceeds {
                self.split_change_then_insert(change);
                return;
            }

            let id = change.id;
            let mut inner = self.inner.lock().unwrap();

            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..id).next_back() {
                if block.peer == change.id.peer {
                    if block.counter_range.1 != change.id.counter {
                        panic!("counter should be continuous")
                    }

                    match block.push_change(
                        change,
                        estimated_size,
                        if is_local {
                            self.merge_interval
                                .load(std::sync::atomic::Ordering::Acquire)
                        } else {
                            0
                        },
                        &self.arena,
                    ) {
                        Ok(_) => {
                            drop(inner);
                            debug_assert!(self.get_change(id).is_some());
                            return;
                        }
                        Err(c) => change = c,
                    }
                }
            }

            inner
                .mem_parsed_kv
                .insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
            drop(inner);
            debug_assert!(self.get_change(id).is_some());
        }

        pub fn get_change(&self, id: ID) -> Option<BlockChangeRef> {
            let block = self.get_parsed_block(id)?;
            Some(BlockChangeRef {
                change_index: block.get_change_index_by_counter(id.counter).unwrap(),
                block: block.clone(),
            })
        }

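        /// Finds the change from `idlp.peer` with the greatest Lamport
        /// timestamp `<= idlp.lamport`.
        ///
        /// It walks the parsed blocks backwards, switching to a binary search
        /// over the counter range when the Lamport gap suggests the target is
        /// far away, and finally falls back to scanning the encoded blocks in
        /// the external kv store.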
        pub fn get_change_by_lamport_lte(&self, idlp: IdLp) -> Option<BlockChangeRef> {
            let mut inner = self.inner.lock().unwrap();
            let mut iter = inner
                .mem_parsed_kv
                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));

            let mut lower_bound = 0;
            let mut upper_bound = i32::MAX;
            let mut is_binary_searching = false;
            loop {
                match iter.next_back() {
                    Some((&id, block)) => {
                        if block.lamport_range.0 <= idlp.lamport
                            && (!is_binary_searching || idlp.lamport < block.lamport_range.1)
                        {
                            if !is_binary_searching
                                && upper_bound != i32::MAX
                                && upper_bound != block.counter_range.1
                            {
                                warn!(
                                    "There is a hole between the last block and the current block"
                                );
                                break;
                            }

                            block
                                .ensure_changes(&self.arena)
                                .expect("Parse block error");
                            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
                            return Some(BlockChangeRef {
                                change_index: index,
                                block: block.clone(),
                            });
                        }

                        if is_binary_searching {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            if block.lamport_range.1 <= idlp.lamport {
                                lower_bound = mid_bound;
                            } else {
                                debug_assert!(
                                    idlp.lamport < block.lamport_range.0,
                                    "{} {:?}",
                                    idlp,
                                    &block.lamport_range
                                );
                                upper_bound = mid_bound;
                            }

                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        } else {
                            if block.lamport_range.0 - idlp.lamport > MAX_BLOCK_SIZE as Lamport * 8
                            {
                                upper_bound = id.counter;
                                let mid_bound = (lower_bound + upper_bound) / 2;
                                iter = inner.mem_parsed_kv.range_mut(
                                    ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound),
                                );
                                is_binary_searching = true;
                            }

                            upper_bound = id.counter;
                        }
                    }
                    None => {
                        if !is_binary_searching {
                            break;
                        }

                        let mid_bound = (lower_bound + upper_bound) / 2;
                        lower_bound = mid_bound;
                        if upper_bound - lower_bound <= MAX_BLOCK_SIZE as i32 {
                            iter = inner.mem_parsed_kv.range_mut(
                                ID::new(idlp.peer, lower_bound)..ID::new(idlp.peer, upper_bound),
                            );
                            is_binary_searching = false;
                        } else {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        }
                    }
                }
            }

            let counter_end = upper_bound;
            let scan_end = ID::new(idlp.peer, counter_end).to_bytes();

            let (id, bytes) = 'block_scan: {
                let kv_store = &self.external_kv.lock().unwrap();
                let iter = kv_store
                    .scan(
                        Bound::Included(&ID::new(idlp.peer, 0).to_bytes()),
                        Bound::Excluded(&scan_end),
                    )
                    .rev();

                for (id, bytes) in iter {
                    let mut block = ChangesBlockBytes::new(bytes.clone());
                    let (lamport_start, _lamport_end) = block.lamport_range();
                    if lamport_start <= idlp.lamport {
                        break 'block_scan (id, bytes);
                    }
                }

                return None;
            };

            let block_id = ID::from_bytes(&id);
            let mut block = Arc::new(ChangesBlock::from_bytes(bytes).unwrap());
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            inner.mem_parsed_kv.insert(block_id, block.clone());
            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
            Some(BlockChangeRef {
                change_index: index,
                block,
            })
        }

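        /// Splits an oversized change into pieces that each fit in
        /// `MAX_BLOCK_SIZE` (slicing individual ops when necessary) and
        /// inserts them in order.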
        fn split_change_then_insert(&self, change: Change) {
            let original_len = change.atom_len();
            let mut new_change = Change {
                ops: RleVec::new(),
                deps: change.deps,
                id: change.id,
                lamport: change.lamport,
                timestamp: change.timestamp,
                commit_msg: change.commit_msg.clone(),
            };

            let mut total_len = 0;
            let mut estimated_size = new_change.estimate_storage_size();
            'outer: for mut op in change.ops.into_iter() {
                if op.estimate_storage_size() >= MAX_BLOCK_SIZE - estimated_size {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );
                }

                while let Some(end) =
                    op.check_whether_slice_content_to_fit_in_size(MAX_BLOCK_SIZE - estimated_size)
                {
                    let new = op.slice(0, end);
                    new_change.ops.push(new);
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );

                    if end < op.atom_len() {
                        op = op.slice(end, op.atom_len());
                    } else {
                        continue 'outer;
                    }
                }

                estimated_size += op.estimate_storage_size();
                if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );
                    new_change.ops.push(op);
                } else {
                    new_change.ops.push(op);
                }
            }

            if !new_change.ops.is_empty() {
                total_len += new_change.atom_len();
                self.insert_change(new_change, false, false);
            }

            assert_eq!(total_len, original_len);
        }

        fn _insert_splitted_change(
            &self,
            new_change: Change,
            total_len: &mut usize,
            estimated_size: &mut usize,
        ) -> Change {
            if new_change.atom_len() == 0 {
                return new_change;
            }

            let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
            let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
            *total_len += new_change.atom_len();
            let ans = Change {
                ops: RleVec::new(),
                deps: ID::new(new_change.id.peer, ctr_end - 1).into(),
                id: ID::new(new_change.id.peer, ctr_end),
                lamport: next_lamport,
                timestamp: new_change.timestamp,
                commit_msg: new_change.commit_msg.clone(),
            };

            self.insert_change(new_change, false, false);
            *estimated_size = ans.estimate_storage_size();
            ans
        }

        fn get_parsed_block(&self, id: ID) -> Option<Arc<ChangesBlock>> {
            let mut inner = self.inner.lock().unwrap();
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
                if block.peer == id.peer && block.counter_range.1 > id.counter {
                    block
                        .ensure_changes(&self.arena)
                        .expect("Parse block error");
                    return Some(block.clone());
                }
            }

            let store = self.external_kv.lock().unwrap();
            let mut iter = store
                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
                .filter(|(id, _)| id.len() == 12);

            let (b_id, b_bytes) = iter.next_back()?;
            let block_id: ID = ID::from_bytes(&b_id[..]);
            let block = ChangesBlock::from_bytes(b_bytes).unwrap();
            if block_id.peer == id.peer
                && block_id.counter <= id.counter
                && block.counter_range.1 > id.counter
            {
                let mut arc_block = Arc::new(block);
                arc_block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                inner.mem_parsed_kv.insert(block_id, arc_block.clone());
                return Some(arc_block);
            }

            None
        }

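        /// Ensures every encoded block intersecting the given bound range is
        /// parsed into `mem_parsed_kv`. Blocks are keyed by their first ID, so
        /// the block covering `start` may begin before it; in that case a
        /// backward lookup via `ensure_id_lte` is needed.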
        pub(super) fn ensure_block_loaded_in_range(&self, start: Bound<ID>, end: Bound<ID>) {
            let mut whether_need_scan_backward = match start {
                Bound::Included(id) => Some(id),
                Bound::Excluded(id) => Some(id.inc(1)),
                Bound::Unbounded => None,
            };

            {
                let start = start.map(|id| id.to_bytes());
                let end = end.map(|id| id.to_bytes());
                let kv = self.external_kv.lock().unwrap();
                let mut inner = self.inner.lock().unwrap();
                for (id, bytes) in kv
                    .scan(
                        start.as_ref().map(|x| x.as_slice()),
                        end.as_ref().map(|x| x.as_slice()),
                    )
                    .filter(|(id, _)| id.len() == 12)
                {
                    let id = ID::from_bytes(&id);
                    if let Some(expected_start_id) = whether_need_scan_backward {
                        if id == expected_start_id {
                            whether_need_scan_backward = None;
                        }
                    }

                    if inner.mem_parsed_kv.contains_key(&id) {
                        continue;
                    }

                    let block = ChangesBlock::from_bytes(bytes.clone()).unwrap();
                    inner.mem_parsed_kv.insert(id, Arc::new(block));
                }
            }

            if let Some(start_id) = whether_need_scan_backward {
                self.ensure_id_lte(start_id);
            }
        }

        pub(super) fn ensure_id_lte(&self, id: ID) {
            let kv = self.external_kv.lock().unwrap();
            let mut inner = self.inner.lock().unwrap();
            let Some((next_back_id, next_back_bytes)) = kv
                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
                .filter(|(id, _)| id.len() == 12)
                .next_back()
            else {
                return;
            };

            let next_back_id = ID::from_bytes(&next_back_id);
            if next_back_id.peer == id.peer {
                if inner.mem_parsed_kv.contains_key(&next_back_id) {
                    return;
                }

                let block = ChangesBlock::from_bytes(next_back_bytes).unwrap();
                inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
            }
        }
    }
}

#[must_use]
#[derive(Clone, Debug)]
pub(crate) struct BatchDecodeInfo {
    pub vv: VersionVector,
    pub frontiers: Frontiers,
    pub start_version: Option<(VersionVector, Frontiers)>,
}

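/// A cheap handle to a change: the owning block plus the change's index
/// within it. `Deref`s to the underlying [`Change`].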
#[derive(Clone, Debug)]
pub struct BlockChangeRef {
    block: Arc<ChangesBlock>,
    change_index: usize,
}

impl Deref for BlockChangeRef {
    type Target = Change;
    fn deref(&self) -> &Change {
        &self.block.content.try_changes().unwrap()[self.change_index]
    }
}

impl BlockChangeRef {
    pub(crate) fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
        if counter >= self.ctr_end() {
            return None;
        }

        let index = self.ops.search_atom_index(counter);
        Some(BlockOpRef {
            block: self.block.clone(),
            change_index: self.change_index,
            op_index: index,
        })
    }
}

#[derive(Clone, Debug)]
pub(crate) struct BlockOpRef {
    pub block: Arc<ChangesBlock>,
    pub change_index: usize,
    pub op_index: usize,
}

impl Deref for BlockOpRef {
    type Target = Op;

    fn deref(&self) -> &Op {
        &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index]
    }
}

impl BlockOpRef {
    pub fn lamport(&self) -> Lamport {
        let change = &self.block.content.try_changes().unwrap()[self.change_index];
        let op = &change.ops[self.op_index];
        (op.counter - change.id.counter) as Lamport + change.lamport
    }
}

impl ChangesBlock {
    fn from_bytes(bytes: Bytes) -> LoroResult<Self> {
        let len = bytes.len();
        let mut bytes = ChangesBlockBytes::new(bytes);
        let peer = bytes.peer();
        let counter_range = bytes.counter_range();
        let lamport_range = bytes.lamport_range();
        let content = ChangesBlockContent::Bytes(bytes);
        Ok(Self {
            peer,
            estimated_size: len,
            counter_range,
            lamport_range,
            flushed: true,
            content,
        })
    }

    pub(crate) fn content(&self) -> &ChangesBlockContent {
        &self.content
    }

    fn new(change: Change, _a: &SharedArena) -> Self {
        let atom_len = change.atom_len();
        let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
        let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
        let estimated_size = change.estimate_storage_size();
        let peer = change.id.peer;
        let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
        Self {
            peer,
            counter_range,
            lamport_range,
            estimated_size,
            content,
            flushed: false,
        }
    }

    #[allow(unused)]
    fn cmp_id(&self, id: ID) -> Ordering {
        self.peer.cmp(&id.peer).then_with(|| {
            if self.counter_range.0 > id.counter {
                Ordering::Greater
            } else if self.counter_range.1 <= id.counter {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    #[allow(unused)]
    fn cmp_idlp(&self, idlp: (PeerID, Lamport)) -> Ordering {
        self.peer.cmp(&idlp.0).then_with(|| {
            if self.lamport_range.0 > idlp.1 {
                Ordering::Greater
            } else if self.lamport_range.1 <= idlp.1 {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    #[allow(unused)]
    fn is_full(&self) -> bool {
        self.estimated_size > MAX_BLOCK_SIZE
    }

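    /// Tries to append `change` to this block, merging it into the last change
    /// when possible. Returns the change back as `Err` when the block is full
    /// or the counters are not contiguous, so the caller can start a new block.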
    #[allow(clippy::result_large_err)]
    fn push_change(
        self: &mut Arc<Self>,
        change: Change,
        new_change_size: usize,
        merge_interval: i64,
        a: &SharedArena,
    ) -> Result<(), Change> {
        if self.counter_range.1 != change.id.counter {
            return Err(change);
        }

        let atom_len = change.atom_len();
        let next_lamport = change.lamport + atom_len as Lamport;
        let next_counter = change.id.counter + atom_len as Counter;

        let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
        let this = Arc::make_mut(self);
        let changes = this.content.changes_mut(a).unwrap();
        let changes = Arc::make_mut(changes);
        match changes.last_mut() {
            Some(last)
                if last.can_merge_right(&change, merge_interval)
                    && (!is_full
                        || (change.ops.len() == 1
                            && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
            {
                for op in change.ops.into_iter() {
                    let size = op.estimate_storage_size();
                    if !last.ops.push(op) {
                        this.estimated_size += size;
                    }
                }
            }
            _ => {
                if is_full {
                    return Err(change);
                } else {
                    this.estimated_size += new_change_size;
                    changes.push(change);
                }
            }
        }

        this.flushed = false;
        this.counter_range.1 = next_counter;
        this.lamport_range.1 = next_lamport;
        Ok(())
    }

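    /// Returns the encoded form of the block, serializing (and caching) it if
    /// only the parsed form is present.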
    fn to_bytes(self: &mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
        match &self.content {
            ChangesBlockContent::Bytes(bytes) => bytes.clone(),
            ChangesBlockContent::Both(_, bytes) => {
                let bytes = bytes.clone();
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Bytes(bytes.clone());
                bytes
            }
            ChangesBlockContent::Changes(changes) => {
                let bytes = ChangesBlockBytes::serialize(changes, a);
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Bytes(bytes.clone());
                bytes
            }
        }
    }

    fn ensure_changes(self: &mut Arc<Self>, a: &SharedArena) -> LoroResult<()> {
        match &self.content {
            ChangesBlockContent::Changes(_) => Ok(()),
            ChangesBlockContent::Both(_, _) => Ok(()),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                let b = bytes.clone();
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Both(Arc::new(changes), b);
                Ok(())
            }
        }
    }

    fn get_change_index_by_counter(&self, counter: Counter) -> Result<usize, usize> {
        let changes = self.content.try_changes().unwrap();
        changes.binary_search_by(|c| {
            if c.id.counter > counter {
                Ordering::Greater
            } else if (c.id.counter + c.content_len() as Counter) <= counter {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    fn get_change_index_by_lamport_lte(&self, lamport: Lamport) -> Option<usize> {
        let changes = self.content.try_changes().unwrap();
        let r = changes.binary_search_by(|c| {
            if c.lamport > lamport {
                Ordering::Greater
            } else if (c.lamport + c.content_len() as Lamport) <= lamport {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        });

        match r {
            Ok(found) => Some(found),
            Err(idx) => {
                if idx == 0 {
                    None
                } else {
                    Some(idx - 1)
                }
            }
        }
    }

    #[allow(unused)]
    fn get_changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
        self.content.changes(a)
    }

    #[allow(unused)]
    fn id(&self) -> ID {
        ID::new(self.peer, self.counter_range.0)
    }

    pub fn change_num(&self) -> usize {
        match &self.content {
            ChangesBlockContent::Changes(c) => c.len(),
            ChangesBlockContent::Bytes(b) => b.len_changes(),
            ChangesBlockContent::Both(c, _) => c.len(),
        }
    }
}

impl ChangesBlockContent {
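    /// Collects the DAG nodes described by this block. For an encoded block
    /// only the header is decoded, so this stays cheap for unparsed blocks.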
    pub fn iter_dag_nodes(&self) -> Vec<AppDagNode> {
        let mut dag_nodes = Vec::new();
        match self {
            ChangesBlockContent::Changes(c) | ChangesBlockContent::Both(c, _) => {
                for change in c.iter() {
                    let new_node = AppDagNodeInner {
                        peer: change.id.peer,
                        cnt: change.id.counter,
                        lamport: change.lamport,
                        deps: change.deps.clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len: change.atom_len(),
                    }
                    .into();

                    dag_nodes.push_rle_element(new_node);
                }
            }
            ChangesBlockContent::Bytes(b) => {
                b.ensure_header().unwrap();
                let header = b.header.get().unwrap();
                let n = header.n_changes;
                for i in 0..n {
                    let new_node = AppDagNodeInner {
                        peer: header.peer,
                        cnt: header.counters[i],
                        lamport: header.lamports[i],
                        deps: header.deps_groups[i].clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len: (header.counters[i + 1] - header.counters[i]) as usize,
                    }
                    .into();

                    dag_nodes.push_rle_element(new_node);
                }
            }
        }

        dag_nodes
    }

    #[allow(unused)]
    pub fn changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => Ok(changes),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
                self.changes(a)
            }
        }
    }

    fn changes_mut(&mut self, a: &SharedArena) -> LoroResult<&mut Arc<Vec<Change>>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => {
                *self = ChangesBlockContent::Changes(std::mem::take(changes));
                self.changes_mut(a)
            }
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Changes(Arc::new(changes));
                self.changes_mut(a)
            }
        }
    }

    pub(crate) fn try_changes(&self) -> Option<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Some(changes),
            ChangesBlockContent::Both(changes, _) => Some(changes),
            ChangesBlockContent::Bytes(_) => None,
        }
    }

    pub(crate) fn len_changes(&self) -> usize {
        match self {
            ChangesBlockContent::Changes(changes) => changes.len(),
            ChangesBlockContent::Both(changes, _) => changes.len(),
            ChangesBlockContent::Bytes(bytes) => bytes.len_changes(),
        }
    }
}

impl std::fmt::Debug for ChangesBlockContent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChangesBlockContent::Changes(changes) => f
                .debug_tuple("ChangesBlockContent::Changes")
                .field(changes)
                .finish(),
            ChangesBlockContent::Bytes(_bytes) => {
                f.debug_tuple("ChangesBlockContent::Bytes").finish()
            }
            ChangesBlockContent::Both(changes, _bytes) => f
                .debug_tuple("ChangesBlockContent::Both")
                .field(changes)
                .finish(),
        }
    }
}

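// The accessors below decode only the block header on demand and cache it in
// the `OnceCell`, so metadata queries avoid parsing the full block.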
impl ChangesBlockBytes {
    fn new(bytes: Bytes) -> Self {
        Self {
            header: OnceCell::new(),
            bytes,
        }
    }

    fn ensure_header(&self) -> LoroResult<()> {
        self.header
            .get_or_init(|| Arc::new(decode_header(&self.bytes).unwrap()));
        Ok(())
    }

    fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
        self.ensure_header()?;
        let ans: Vec<Change> = decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))?;
        for c in ans.iter() {
            register_container_and_parent_link(a, c)
        }

        Ok(ans)
    }

    fn serialize(changes: &[Change], a: &SharedArena) -> Self {
        let bytes = encode_block(changes, a);
        let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
        bytes.ensure_header().unwrap();
        bytes
    }

    fn peer(&mut self) -> PeerID {
        self.ensure_header().unwrap();
        self.header.get().as_ref().unwrap().peer
    }

    fn counter_range(&mut self) -> (Counter, Counter) {
        if let Some(header) = self.header.get() {
            (header.counter, *header.counters.last().unwrap())
        } else {
            decode_block_range(&self.bytes).unwrap().0
        }
    }

    fn lamport_range(&mut self) -> (Lamport, Lamport) {
        if let Some(header) = self.header.get() {
            (header.lamports[0], *header.lamports.last().unwrap())
        } else {
            decode_block_range(&self.bytes).unwrap().1
        }
    }

    fn len_changes(&self) -> usize {
        self.ensure_header().unwrap();
        self.header.get().unwrap().n_changes
    }
}

#[cfg(test)]
mod test {
    use crate::{
        loro::ExportMode, oplog::convert_change_to_remote, state::TreeParentId, ListHandler,
        LoroDoc, MovableListHandler, TextHandler, TreeHandler,
    };

    use super::*;

    fn test_encode_decode(doc: LoroDoc) {
        doc.commit_then_renew();
        let oplog = doc.oplog().lock().unwrap();
        let bytes = oplog
            .change_store
            .encode_all(oplog.vv(), oplog.dag.frontiers());
        let store = ChangeStore::new_for_test();
        let _ = store.import_all(bytes.clone()).unwrap();
        assert_eq!(store.external_kv.lock().unwrap().export_all(), bytes);
        let mut changes_parsed = Vec::new();
        let a = store.arena.clone();
        store.visit_all_changes(&mut |c| {
            changes_parsed.push(convert_change_to_remote(&a, c));
        });
        let mut changes = Vec::new();
        oplog.change_store.visit_all_changes(&mut |c| {
            changes.push(convert_change_to_remote(&oplog.arena, c));
        });
        assert_eq!(changes_parsed, changes);
    }

    #[test]
    fn test_change_store() {
        let doc = LoroDoc::new_auto_commit();
        doc.set_record_timestamp(true);
        let t = doc.get_text("t");
        t.insert(0, "hello").unwrap();
        doc.commit_then_renew();
        let t = doc.get_list("t");
        t.insert(0, "hello").unwrap();
        test_encode_decode(doc);
    }

    #[test]
    fn test_synced_doc() -> LoroResult<()> {
        let doc_a = LoroDoc::new_auto_commit();
        let doc_b = LoroDoc::new_auto_commit();
        let doc_c = LoroDoc::new_auto_commit();

        {
            let map = doc_a.get_map("root");
            map.insert_container("text", TextHandler::new_detached())?;
            map.insert_container("list", ListHandler::new_detached())?;
            map.insert_container("tree", TreeHandler::new_detached())?;
        }

        {
            let initial_state = doc_a.export(ExportMode::all_updates()).unwrap();
            doc_b.import(&initial_state)?;
            doc_c.import(&initial_state)?;
        }

        {
            let map = doc_b.get_map("root");
            let text = map
                .insert_container("text", TextHandler::new_detached())
                .unwrap();
            text.insert(0, "Hello, ")?;

            let list = map
                .insert_container("list", ListHandler::new_detached())
                .unwrap();
            list.push("world")?;
        }

        {
            let map = doc_c.get_map("root");
            let tree = map
                .insert_container("tree", TreeHandler::new_detached())
                .unwrap();
            let node_id = tree.create(TreeParentId::Root)?;
            tree.get_meta(node_id)?.insert("key", "value")?;
            let node_b = tree.create(TreeParentId::Root)?;
            tree.move_to(node_b, TreeParentId::Root, 0).unwrap();

            let movable_list = map
                .insert_container("movable", MovableListHandler::new_detached())
                .unwrap();
            movable_list.push("item1".into())?;
            movable_list.push("item2".into())?;
            movable_list.mov(0, 1)?;
        }

        let b_changes = doc_b.export(ExportMode::updates(&doc_a.oplog_vv())).unwrap();
        doc_a.import(&b_changes)?;

        let c_changes = doc_c.export(ExportMode::updates(&doc_a.oplog_vv())).unwrap();
        doc_a.import(&c_changes)?;

        test_encode_decode(doc_a);
        Ok(())
    }
}