use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
use super::{loro_dag::AppDagNodeInner, AppDagNode};
use crate::{
    arena::SharedArena,
    change::Change,
    estimated_size::EstimatedSize,
    kv_store::KvStore,
    op::Op,
    parent::register_container_and_parent_link,
    version::{Frontiers, ImVersionVector},
    VersionVector,
};
use block_encode::decode_block_range;
use bytes::Bytes;
use itertools::Itertools;
use loro_common::{
    Counter, HasCounter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport,
    LoroError, LoroResult, PeerID, ID,
};
use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
use std::{
    cmp::Ordering,
    collections::{BTreeMap, VecDeque},
    ops::{Bound, Deref},
    sync::{atomic::AtomicI64, Arc, Mutex},
};
use tracing::{debug, info_span, trace, warn};

mod block_encode;
mod block_meta_encode;
pub(super) mod iter;

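// The soft size limit for a single changes block: oversized changes are split
// (see `split_change_then_insert`) and full blocks reject further pushes. The
// small `cfg(test)` value makes the splitting and search paths easy to
// exercise in tests.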
#[cfg(not(test))]
const MAX_BLOCK_SIZE: usize = 1024 * 4;
#[cfg(test)]
const MAX_BLOCK_SIZE: usize = 128;

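/// Stores the document's change history.
///
/// Recently touched blocks are kept parsed in memory (`inner`), while encoded
/// blocks live in `external_kv`, keyed by each block's start `ID`.
/// `external_vv` tracks how much of the history has been flushed to the kv
/// store.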
#[derive(Debug, Clone)]
pub struct ChangeStore {
    inner: Arc<Mutex<ChangeStoreInner>>,
    arena: SharedArena,
    external_kv: Arc<Mutex<dyn KvStore>>,
    external_vv: Arc<Mutex<VersionVector>>,
    merge_interval: Arc<AtomicI64>,
}

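/// Mutable state behind the `ChangeStore` lock: the version/frontiers the
/// stored history starts from (typically non-empty only for shallow
/// documents) and the cache of parsed blocks keyed by their start `ID`.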
#[derive(Debug, Clone)]
struct ChangeStoreInner {
    start_vv: ImVersionVector,
    start_frontiers: Frontiers,
    mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
}

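/// A block of consecutive changes from a single peer. Both `counter_range`
/// and `lamport_range` are half-open; `flushed` marks whether the current
/// content has already been written to the external kv store.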
#[derive(Debug, Clone)]
pub(crate) struct ChangesBlock {
    peer: PeerID,
    counter_range: (Counter, Counter),
    lamport_range: (Lamport, Lamport),
    estimated_size: usize,
    flushed: bool,
    content: ChangesBlockContent,
}

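/// A block's payload may be held parsed, encoded, or both. `Bytes` is
/// upgraded to `Both` on first parse; mutation collapses it back to a single
/// representation (see `changes_mut` and `to_bytes`).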
#[derive(Clone)]
pub(crate) enum ChangesBlockContent {
    Changes(Arc<Vec<Change>>),
    Bytes(ChangesBlockBytes),
    Both(Arc<Vec<Change>>, ChangesBlockBytes),
}

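/// Encoded block bytes with a lazily decoded header (peer, counter and
/// Lamport ranges, per-change metadata).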
#[derive(Clone)]
pub(crate) struct ChangesBlockBytes {
    bytes: Bytes,
    header: OnceCell<Arc<ChangesBlockHeader>>,
}

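// Reserved metadata keys in the external kv store. Block keys are 12-byte
// encoded `ID`s (8-byte peer + 4-byte counter), which is what the
// `id.len() == 12` filters below rely on.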
pub const START_VV_KEY: &[u8] = b"sv";
pub const START_FRONTIERS_KEY: &[u8] = b"sf";
pub const VV_KEY: &[u8] = b"vv";
pub const FRONTIERS_KEY: &[u8] = b"fr";

impl ChangeStore {
    pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: ImVersionVector::new(),
                start_frontiers: Frontiers::default(),
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena: a.clone(),
            external_vv: Arc::new(Mutex::new(VersionVector::new())),
            external_kv: Arc::new(Mutex::new(MemKvStore::new(MemKvConfig::default()))),
            merge_interval,
        }
    }

    #[cfg(test)]
    fn new_for_test() -> Self {
        Self::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)))
    }

    pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
        self.flush_and_compact(vv, frontiers);
        let mut kv = self.external_kv.try_lock().unwrap();
        kv.export_all()
    }

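    /// Exports the changes in `(start_vv, latest_vv]` into a fresh in-memory
    /// store and encodes that store together with its start/latest version
    /// metadata.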
    #[tracing::instrument(skip(self), level = "debug")]
    pub(super) fn export_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in latest_vv.sub_iter(start_vv) {
            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter)
                    .max(0) as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter)
                    .max(0) as usize)
                    .min(c.atom_len());

                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        debug!(
            "start_vv={:?} start_frontiers={:?}",
            &start_vv, start_frontiers
        );
        new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
    }

    pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in spans {
            let mut span = *span;
            span.normalize_();
            for c in self.iter_changes(span) {
                let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
                let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        encode_blocks_in_store(new_store, &self.arena, w);
    }

    fn encode_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        {
            let mut store = self.external_kv.try_lock().unwrap();
            store.set(START_VV_KEY, start_vv.encode().into());
            store.set(START_FRONTIERS_KEY, start_frontiers.encode().into());
            let mut inner = self.inner.try_lock().unwrap();
            inner.start_frontiers = start_frontiers.clone();
            inner.start_vv = ImVersionVector::from_vv(start_vv);
        }
        self.flush_and_compact(latest_vv, latest_frontiers);
        self.external_kv.try_lock().unwrap().export_all()
    }

    pub(crate) fn decode_snapshot_for_updates(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> Result<Vec<Change>, LoroError> {
        let change_store = ChangeStore::new_mem(arena, Arc::new(AtomicI64::new(0)));
        let _ = change_store.import_all(bytes)?;
        let mut changes = Vec::new();
        change_store.visit_all_changes(&mut |c| {
            let cnt_threshold = self_vv.get(&c.id.peer).copied().unwrap_or(0);
            if c.id.counter >= cnt_threshold {
                changes.push(c.clone());
                return;
            }

            let change_end = c.ctr_end();
            if change_end > cnt_threshold {
                changes.push(c.slice((cnt_threshold - c.id.counter) as usize, c.atom_len()));
            }
        });

        Ok(changes)
    }

    pub(crate) fn decode_block_bytes(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> LoroResult<Vec<Change>> {
        let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
        if ans.is_empty() {
            return Ok(ans);
        }

        let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
        ans.retain_mut(|c| {
            if c.id.counter >= start {
                true
            } else if c.ctr_end() > start {
                *c = c.slice((start - c.id.counter) as usize, c.atom_len());
                true
            } else {
                false
            }
        });

        Ok(ans)
    }

    pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
        let block = self.get_block_that_contains(id)?;
        Some(block.content.iter_dag_nodes())
    }

    pub fn get_last_dag_nodes_for_peer(&self, peer: PeerID) -> Option<Vec<AppDagNode>> {
        let block = self.get_the_last_block_of_peer(peer)?;
        Some(block.content.iter_dag_nodes())
    }

    pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.try_lock().unwrap();
        for (_, block) in inner.mem_parsed_kv.iter_mut() {
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            for c in block.content.try_changes().unwrap() {
                f(c);
            }
        }
    }

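    /// Collects the blocks overlapping `id_span`, parsing them on demand, and
    /// returns each block together with the `[start, end)` range of change
    /// indexes that fall inside the span.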
    pub(crate) fn iter_blocks(&self, id_span: IdSpan) -> Vec<(Arc<ChangesBlock>, usize, usize)> {
        if id_span.counter.start == id_span.counter.end {
            return vec![];
        }

        assert!(id_span.counter.start < id_span.counter.end);
        self.ensure_block_loaded_in_range(
            Bound::Included(id_span.id_start()),
            Bound::Excluded(id_span.id_end()),
        );
        let mut inner = self.inner.try_lock().unwrap();
        let next_back = inner.mem_parsed_kv.range(..=id_span.id_start()).next_back();
        match next_back {
            None => {
                return vec![];
            }
            Some(next_back) => {
                if next_back.0.peer != id_span.peer {
                    return vec![];
                }
            }
        }
        let start_counter = next_back.map(|(id, _)| id.counter).unwrap_or(0);
        let ans = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                let changes = block.content.try_changes().unwrap();
                let start;
                let end;
                if id_span.counter.start <= block.counter_range.0
                    && id_span.counter.end >= block.counter_range.1
                {
                    start = 0;
                    end = changes.len();
                } else {
                    start = block
                        .get_change_index_by_counter(id_span.counter.start)
                        .unwrap_or_else(|x| x);

                    match block.get_change_index_by_counter(id_span.counter.end - 1) {
                        Ok(e) => {
                            end = e + 1;
                        }
                        Err(0) => return None,
                        Err(e) => {
                            end = e;
                        }
                    }
                }
                if start == end {
                    return None;
                }

                Some((block.clone(), start, end))
            })
            .collect_vec();

        ans
    }

    pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
        let v = self.iter_blocks(id_span);
        #[cfg(debug_assertions)]
        {
            if !v.is_empty() {
                assert_eq!(v[0].0.peer, id_span.peer);
                assert_eq!(v.last().unwrap().0.peer, id_span.peer);
                {
                    let (block, start, _end) = v.first().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*start].id.counter <= id_span.counter.start);
                }
                {
                    let (block, _start, end) = v.last().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*end - 1].ctr_end() >= id_span.counter.end);
                    assert!(changes[*end - 1].ctr_start() < id_span.counter.end);
                }
            }
        }

        v.into_iter().flat_map(move |(block, start, end)| {
            (start..end).map(move |i| BlockChangeRef {
                change_index: i,
                block: block.clone(),
            })
        })
    }

    pub(crate) fn get_blocks_in_range(&self, id_span: IdSpan) -> VecDeque<Arc<ChangesBlock>> {
        let mut inner = self.inner.try_lock().unwrap();
        let start_counter = inner
            .mem_parsed_kv
            .range(..=id_span.id_start())
            .next_back()
            .map(|(id, _)| id.counter)
            .unwrap_or(0);
        let vec = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                Some(block.clone())
            })
            .collect();
        vec
    }

    pub(crate) fn get_block_that_contains(&self, id: ID) -> Option<Arc<ChangesBlock>> {
        self.ensure_block_loaded_in_range(Bound::Included(id), Bound::Included(id));
        let inner = self.inner.try_lock().unwrap();
        let block = inner
            .mem_parsed_kv
            .range(..=id)
            .next_back()
            .filter(|(_, block)| {
                block.peer == id.peer
                    && block.counter_range.0 <= id.counter
                    && id.counter < block.counter_range.1
            })
            .map(|(_, block)| block.clone());

        block
    }

    pub(crate) fn get_the_last_block_of_peer(&self, peer: PeerID) -> Option<Arc<ChangesBlock>> {
        let end_id = ID::new(peer, Counter::MAX);
        self.ensure_id_lte(end_id);
        let inner = self.inner.try_lock().unwrap();
        let block = inner
            .mem_parsed_kv
            .range(..=end_id)
            .next_back()
            .filter(|(_, block)| block.peer == peer)
            .map(|(_, block)| block.clone());

        block
    }

    pub fn change_num(&self) -> usize {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.try_lock().unwrap();
        inner
            .mem_parsed_kv
            .iter_mut()
            .map(|(_, block)| block.change_num())
            .sum()
    }

    pub fn fork(
        &self,
        arena: SharedArena,
        merge_interval: Arc<AtomicI64>,
        vv: &VersionVector,
        frontiers: &Frontiers,
    ) -> Self {
        self.flush_and_compact(vv, frontiers);
        let inner = self.inner.try_lock().unwrap();
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: inner.start_vv.clone(),
                start_frontiers: inner.start_frontiers.clone(),
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena,
            external_vv: Arc::new(Mutex::new(self.external_vv.try_lock().unwrap().clone())),
            external_kv: self.external_kv.try_lock().unwrap().clone_store(),
            merge_interval,
        }
    }

    pub fn kv_size(&self) -> usize {
        self.external_kv
            .try_lock()
            .unwrap()
            .scan(Bound::Unbounded, Bound::Unbounded)
            .map(|(k, v)| k.len() + v.len())
            .sum()
    }

    pub(crate) fn export_blocks_from<W: std::io::Write>(
        &self,
        start_vv: &VersionVector,
        shallow_since_vv: &ImVersionVector,
        latest_vv: &VersionVector,
        w: &mut W,
    ) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in latest_vv.sub_iter(start_vv) {
            let counter_lower_bound = shallow_since_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter)
                    .max(0) as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter)
                    .max(0) as usize)
                    .min(c.atom_len());

                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        let arena = &self.arena;
        encode_blocks_in_store(new_store, arena, w);
    }

    pub(crate) fn fork_changes_up_to(
        &self,
        start_vv: &ImVersionVector,
        frontiers: &Frontiers,
        vv: &VersionVector,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in vv.sub_iter_im(start_vv) {
            let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter)
                    .max(0) as usize)
                    .min(c.atom_len());
                let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        new_store.encode_all(vv, frontiers)
    }
}

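/// Writes every block in `new_store` as a LEB128-encoded length prefix
/// followed by the block's encoded bytes.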
fn encode_blocks_in_store<W: std::io::Write>(
    new_store: ChangeStore,
    arena: &SharedArena,
    w: &mut W,
) {
    let mut inner = new_store.inner.try_lock().unwrap();
    for (_id, block) in inner.mem_parsed_kv.iter_mut() {
        let bytes = block.to_bytes(arena);
        leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
        w.write_all(&bytes.bytes).unwrap();
    }
}

mod mut_external_kv {
    use super::*;

    impl ChangeStore {
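        /// Imports a whole exported kv snapshot into this (still empty) store
        /// and returns the decoded version info, restoring the shallow start
        /// version when one is recorded.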
        #[tracing::instrument(skip_all, level = "debug", name = "change_store import_all")]
        pub(crate) fn import_all(&self, bytes: Bytes) -> Result<BatchDecodeInfo, LoroError> {
            let mut kv_store = self.external_kv.try_lock().unwrap();
            assert!(
                kv_store.len() <= 2,
                "kv store should be empty when using import_all"
            );
            kv_store
                .import_all(bytes)
                .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
            let vv_bytes = kv_store.get(VV_KEY).unwrap_or_default();
            let vv = VersionVector::decode(&vv_bytes).unwrap();
            let start_vv_bytes = kv_store.get(START_VV_KEY).unwrap_or_default();
            let start_vv = if start_vv_bytes.is_empty() {
                Default::default()
            } else {
                VersionVector::decode(&start_vv_bytes).unwrap()
            };

            #[cfg(test)]
            {
                drop(kv_store);
                for (peer, cnt) in vv.iter() {
                    self.get_change(ID::new(*peer, *cnt - 1)).unwrap();
                }
                kv_store = self.external_kv.try_lock().unwrap();
            }

            *self.external_vv.try_lock().unwrap() = vv.clone();
            let frontiers_bytes = kv_store.get(FRONTIERS_KEY).unwrap_or_default();
            let frontiers = Frontiers::decode(&frontiers_bytes).unwrap();
            let start_frontiers = kv_store.get(START_FRONTIERS_KEY).unwrap_or_default();
            let start_frontiers = if start_frontiers.is_empty() {
                Default::default()
            } else {
                Frontiers::decode(&start_frontiers).unwrap()
            };

            let mut max_lamport = None;
            let mut max_timestamp = 0;
            drop(kv_store);
            for id in frontiers.iter() {
                let c = self.get_change(id).unwrap();
                debug_assert_ne!(c.atom_len(), 0);
                let l = c.lamport_last();
                if let Some(x) = max_lamport {
                    if l > x {
                        max_lamport = Some(l);
                    }
                } else {
                    max_lamport = Some(l);
                }

                let t = c.timestamp;
                if t > max_timestamp {
                    max_timestamp = t;
                }
            }

            Ok(BatchDecodeInfo {
                vv,
                frontiers,
                start_version: if start_vv.is_empty() {
                    None
                } else {
                    let mut inner = self.inner.try_lock().unwrap();
                    inner.start_frontiers = start_frontiers.clone();
                    inner.start_vv = ImVersionVector::from_vv(&start_vv);
                    Some((start_vv, start_frontiers))
                },
            })
        }

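        /// Writes all unflushed in-memory blocks to the external kv store,
        /// advancing `external_vv` to match, and records the latest version
        /// vector and frontiers under their reserved keys.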
        pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
            let mut inner = self.inner.try_lock().unwrap();
            let mut store = self.external_kv.try_lock().unwrap();
            let mut external_vv = self.external_vv.try_lock().unwrap();
            for (id, block) in inner.mem_parsed_kv.iter_mut() {
                if !block.flushed {
                    let id_bytes = id.to_bytes();
                    let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
                    assert!(
                        counter_start < block.counter_range.1,
                        "Peer={} Block Counter Range={:?}, counter_start={}",
                        id.peer,
                        &block.counter_range,
                        counter_start
                    );
                    if counter_start > block.counter_range.0 {
                        assert!(store.get(&id_bytes).is_some());
                    }
                    external_vv.insert(id.peer, block.counter_range.1);
                    let bytes = block.to_bytes(&self.arena);
                    store.set(&id_bytes, bytes.bytes);
                    Arc::make_mut(block).flushed = true;
                }
            }

            if inner.start_vv.is_empty() {
                assert_eq!(&*external_vv, vv);
            } else {
                #[cfg(debug_assertions)]
                {}
            }
            let vv_bytes = vv.encode();
            let frontiers_bytes = frontiers.encode();
            store.set(VV_KEY, vv_bytes.into());
            store.set(FRONTIERS_KEY, frontiers_bytes.into());
        }
    }
}

mod mut_inner_kv {
    use super::*;

    impl ChangeStore {
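        /// Appends `change` to the store. If the peer's last block is
        /// counter-continuous with the change and has room, the change is
        /// pushed (or merged) into it; otherwise a new block is created.
        /// Oversized changes are split first when `split_when_exceeds` is set.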
        pub fn insert_change(&self, mut change: Change, split_when_exceeds: bool, is_local: bool) {
            #[cfg(debug_assertions)]
            {
                let vv = self.external_vv.try_lock().unwrap();
                assert!(vv.get(&change.id.peer).copied().unwrap_or(0) <= change.id.counter);
            }

            let s = info_span!("change_store insert_change", id = ?change.id);
            let _e = s.enter();
            let estimated_size = change.estimate_storage_size();
            if estimated_size > MAX_BLOCK_SIZE && split_when_exceeds {
                self.split_change_then_insert(change);
                return;
            }

            let id = change.id;
            let mut inner = self.inner.try_lock().unwrap();

            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..id).next_back() {
                if block.peer == change.id.peer {
                    if block.counter_range.1 != change.id.counter {
                        panic!("counter should be continuous")
                    }

                    match block.push_change(
                        change,
                        estimated_size,
                        if is_local {
                            self.merge_interval
                                .load(std::sync::atomic::Ordering::Acquire)
                        } else {
                            0
                        },
                        &self.arena,
                    ) {
                        Ok(_) => {
                            drop(inner);
                            debug_assert!(self.get_change(id).is_some());
                            return;
                        }
                        Err(c) => change = c,
                    }
                }
            }

            inner
                .mem_parsed_kv
                .insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
            drop(inner);
            debug_assert!(self.get_change(id).is_some());
        }

        pub fn get_change(&self, id: ID) -> Option<BlockChangeRef> {
            let block = self.get_parsed_block(id)?;
            Some(BlockChangeRef {
                change_index: block.get_change_index_by_counter(id.counter).unwrap(),
                block: block.clone(),
            })
        }

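        /// Finds the last change from `idlp.peer` whose Lamport start is
        /// `<= idlp.lamport`. Walks the parsed blocks backwards, switching to
        /// a binary search over the counter space when the Lamport gap is
        /// large, and finally falls back to scanning the external kv store.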
        pub fn get_change_by_lamport_lte(&self, idlp: IdLp) -> Option<BlockChangeRef> {
            let mut inner = self.inner.try_lock().unwrap();
            let mut iter = inner
                .mem_parsed_kv
                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));

            let mut lower_bound = 0;
            let mut upper_bound = i32::MAX;
            let mut is_binary_searching = false;
            loop {
                match iter.next_back() {
                    Some((&id, block)) => {
                        if block.lamport_range.0 <= idlp.lamport
                            && (!is_binary_searching || idlp.lamport < block.lamport_range.1)
                        {
                            if !is_binary_searching
                                && upper_bound != i32::MAX
                                && upper_bound != block.counter_range.1
                            {
                                warn!(
                                    "There is a hole between the last block and the current block"
                                );
                                break;
                            }

                            block
                                .ensure_changes(&self.arena)
                                .expect("Parse block error");
                            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
                            return Some(BlockChangeRef {
                                change_index: index,
                                block: block.clone(),
                            });
                        }

                        if is_binary_searching {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            if block.lamport_range.1 <= idlp.lamport {
                                lower_bound = mid_bound;
                            } else {
                                debug_assert!(
                                    idlp.lamport < block.lamport_range.0,
                                    "{} {:?}",
                                    idlp,
                                    &block.lamport_range
                                );
                                upper_bound = mid_bound;
                            }

                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        } else {
                            if block.lamport_range.0 - idlp.lamport > MAX_BLOCK_SIZE as Lamport * 8
                            {
                                upper_bound = id.counter;
                                let mid_bound = (lower_bound + upper_bound) / 2;
                                iter = inner.mem_parsed_kv.range_mut(
                                    ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound),
                                );
                                trace!("Use binary search");
                                is_binary_searching = true;
                            }

                            upper_bound = id.counter;
                        }
                    }
                    None => {
                        if !is_binary_searching {
                            trace!("No parsed block found, break");
                            break;
                        }

                        let mid_bound = (lower_bound + upper_bound) / 2;
                        lower_bound = mid_bound;
                        if upper_bound - lower_bound <= MAX_BLOCK_SIZE as i32 {
                            iter = inner.mem_parsed_kv.range_mut(
                                ID::new(idlp.peer, lower_bound)..ID::new(idlp.peer, upper_bound),
                            );
                            is_binary_searching = false;
                        } else {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        }
                    }
                }
            }

            let counter_end = upper_bound;
            let scan_end = ID::new(idlp.peer, counter_end).to_bytes();
            trace!(
                "Scanning from {:?} to {:?}",
                &ID::new(idlp.peer, 0),
                &counter_end
            );

            let (id, bytes) = 'block_scan: {
                let kv_store = &self.external_kv.try_lock().unwrap();
                let iter = kv_store
                    .scan(
                        Bound::Included(&ID::new(idlp.peer, 0).to_bytes()),
                        Bound::Excluded(&scan_end),
                    )
                    .rev();

                for (id, bytes) in iter {
                    let mut block = ChangesBlockBytes::new(bytes.clone());
                    trace!(
                        "Scanning block counter_range={:?} lamport_range={:?} id={:?}",
                        &block.counter_range(),
                        &block.lamport_range(),
                        &ID::from_bytes(&id)
                    );
                    let (lamport_start, _lamport_end) = block.lamport_range();
                    if lamport_start <= idlp.lamport {
                        break 'block_scan (id, bytes);
                    }
                }

                return None;
            };

            let block_id = ID::from_bytes(&id);
            let mut block = Arc::new(ChangesBlock::from_bytes(bytes).unwrap());
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            inner.mem_parsed_kv.insert(block_id, block.clone());
            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
            Some(BlockChangeRef {
                change_index: index,
                block,
            })
        }

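        /// Splits an oversized change into consecutive pieces whose estimated
        /// sizes fit within `MAX_BLOCK_SIZE` and inserts each piece; every
        /// piece depends on the last ID of the previous one. The final
        /// assertion checks that no atoms were lost in the split.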
        fn split_change_then_insert(&self, change: Change) {
            let original_len = change.atom_len();
            let mut new_change = Change {
                ops: RleVec::new(),
                deps: change.deps,
                id: change.id,
                lamport: change.lamport,
                timestamp: change.timestamp,
                commit_msg: change.commit_msg.clone(),
            };

            let mut total_len = 0;
            let mut estimated_size = new_change.estimate_storage_size();
            'outer: for mut op in change.ops.into_iter() {
                if op.estimate_storage_size() >= MAX_BLOCK_SIZE - estimated_size {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );
                }

                while let Some(end) =
                    op.check_whether_slice_content_to_fit_in_size(MAX_BLOCK_SIZE - estimated_size)
                {
                    let new = op.slice(0, end);
                    new_change.ops.push(new);
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );

                    if end < op.atom_len() {
                        op = op.slice(end, op.atom_len());
                    } else {
                        continue 'outer;
                    }
                }

                estimated_size += op.estimate_storage_size();
                if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );
                    new_change.ops.push(op);
                } else {
                    new_change.ops.push(op);
                }
            }

            if !new_change.ops.is_empty() {
                total_len += new_change.atom_len();
                self.insert_change(new_change, false, false);
            }

            assert_eq!(total_len, original_len);
        }

        fn _insert_splitted_change(
            &self,
            new_change: Change,
            total_len: &mut usize,
            estimated_size: &mut usize,
        ) -> Change {
            if new_change.atom_len() == 0 {
                return new_change;
            }

            let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
            let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
            *total_len += new_change.atom_len();
            let ans = Change {
                ops: RleVec::new(),
                deps: ID::new(new_change.id.peer, ctr_end - 1).into(),
                id: ID::new(new_change.id.peer, ctr_end),
                lamport: next_lamport,
                timestamp: new_change.timestamp,
                commit_msg: new_change.commit_msg.clone(),
            };

            self.insert_change(new_change, false, false);
            *estimated_size = ans.estimate_storage_size();
            ans
        }

        fn get_parsed_block(&self, id: ID) -> Option<Arc<ChangesBlock>> {
            let mut inner = self.inner.try_lock().unwrap();
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
                if block.peer == id.peer && block.counter_range.1 > id.counter {
                    block
                        .ensure_changes(&self.arena)
                        .expect("Parse block error");
                    return Some(block.clone());
                }
            }

            let store = self.external_kv.try_lock().unwrap();
            let mut iter = store
                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
                .filter(|(id, _)| id.len() == 12);

            let (b_id, b_bytes) = iter.next_back()?;
            let block_id: ID = ID::from_bytes(&b_id[..]);
            let block = ChangesBlock::from_bytes(b_bytes).unwrap();
            trace!(
                "block_id={:?} id={:?} counter_range={:?}",
                block_id,
                id,
                block.counter_range
            );
            if block_id.peer == id.peer
                && block_id.counter <= id.counter
                && block.counter_range.1 > id.counter
            {
                let mut arc_block = Arc::new(block);
                arc_block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                inner.mem_parsed_kv.insert(block_id, arc_block.clone());
                return Some(arc_block);
            }

            None
        }

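        /// Ensures every block whose key falls in the given range is parsed
        /// into `mem_parsed_kv`. Because the block covering `start` may begin
        /// before it, a backward lookup via `ensure_id_lte` is performed when
        /// the scan did not yield a block starting exactly at `start`.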
        pub(super) fn ensure_block_loaded_in_range(&self, start: Bound<ID>, end: Bound<ID>) {
            let mut whether_need_scan_backward = match start {
                Bound::Included(id) => Some(id),
                Bound::Excluded(id) => Some(id.inc(1)),
                Bound::Unbounded => None,
            };

            {
                let start = start.map(|id| id.to_bytes());
                let end = end.map(|id| id.to_bytes());
                let kv = self.external_kv.try_lock().unwrap();
                let mut inner = self.inner.try_lock().unwrap();
                for (id, bytes) in kv
                    .scan(
                        start.as_ref().map(|x| x.as_slice()),
                        end.as_ref().map(|x| x.as_slice()),
                    )
                    .filter(|(id, _)| id.len() == 12)
                {
                    let id = ID::from_bytes(&id);
                    if let Some(expected_start_id) = whether_need_scan_backward {
                        if id == expected_start_id {
                            whether_need_scan_backward = None;
                        }
                    }

                    if inner.mem_parsed_kv.contains_key(&id) {
                        continue;
                    }

                    let block = ChangesBlock::from_bytes(bytes.clone()).unwrap();
                    inner.mem_parsed_kv.insert(id, Arc::new(block));
                }
            }

            if let Some(start_id) = whether_need_scan_backward {
                self.ensure_id_lte(start_id);
            }
        }

        pub(super) fn ensure_id_lte(&self, id: ID) {
            let kv = self.external_kv.try_lock().unwrap();
            let mut inner = self.inner.try_lock().unwrap();
            let Some((next_back_id, next_back_bytes)) = kv
                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
                .filter(|(id, _)| id.len() == 12)
                .next_back()
            else {
                return;
            };

            let next_back_id = ID::from_bytes(&next_back_id);
            if next_back_id.peer == id.peer {
                if inner.mem_parsed_kv.contains_key(&next_back_id) {
                    return;
                }

                let block = ChangesBlock::from_bytes(next_back_bytes).unwrap();
                inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
            }
        }
    }
}

#[must_use]
#[derive(Clone, Debug)]
pub(crate) struct BatchDecodeInfo {
    pub vv: VersionVector,
    pub frontiers: Frontiers,
    pub start_version: Option<(VersionVector, Frontiers)>,
}

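/// A cheap, cloneable reference to a single change inside a shared parsed
/// block; derefs to the underlying `Change`.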
#[derive(Clone, Debug)]
pub struct BlockChangeRef {
    block: Arc<ChangesBlock>,
    change_index: usize,
}

impl Deref for BlockChangeRef {
    type Target = Change;
    fn deref(&self) -> &Change {
        &self.block.content.try_changes().unwrap()[self.change_index]
    }
}

impl BlockChangeRef {
    pub(crate) fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
        if counter >= self.ctr_end() {
            return None;
        }

        let index = self.ops.search_atom_index(counter);
        Some(BlockOpRef {
            block: self.block.clone(),
            change_index: self.change_index,
            op_index: index,
        })
    }
}

#[derive(Clone, Debug)]
pub(crate) struct BlockOpRef {
    pub block: Arc<ChangesBlock>,
    pub change_index: usize,
    pub op_index: usize,
}

impl Deref for BlockOpRef {
    type Target = Op;

    fn deref(&self) -> &Op {
        &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index]
    }
}

impl BlockOpRef {
    pub fn lamport(&self) -> Lamport {
        let change = &self.block.content.try_changes().unwrap()[self.change_index];
        let op = &change.ops[self.op_index];
        (op.counter - change.id.counter) as Lamport + change.lamport
    }
}

impl ChangesBlock {
    fn from_bytes(bytes: Bytes) -> LoroResult<Self> {
        let len = bytes.len();
        let mut bytes = ChangesBlockBytes::new(bytes);
        let peer = bytes.peer();
        let counter_range = bytes.counter_range();
        let lamport_range = bytes.lamport_range();
        let content = ChangesBlockContent::Bytes(bytes);
        Ok(Self {
            peer,
            estimated_size: len,
            counter_range,
            lamport_range,
            flushed: true,
            content,
        })
    }

    pub(crate) fn content(&self) -> &ChangesBlockContent {
        &self.content
    }

    fn new(change: Change, _a: &SharedArena) -> Self {
        let atom_len = change.atom_len();
        let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
        let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
        let estimated_size = change.estimate_storage_size();
        let peer = change.id.peer;
        let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
        Self {
            peer,
            counter_range,
            lamport_range,
            estimated_size,
            content,
            flushed: false,
        }
    }

    #[allow(unused)]
    fn cmp_id(&self, id: ID) -> Ordering {
        self.peer.cmp(&id.peer).then_with(|| {
            if self.counter_range.0 > id.counter {
                Ordering::Greater
            } else if self.counter_range.1 <= id.counter {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    #[allow(unused)]
    fn cmp_idlp(&self, idlp: (PeerID, Lamport)) -> Ordering {
        self.peer.cmp(&idlp.0).then_with(|| {
            if self.lamport_range.0 > idlp.1 {
                Ordering::Greater
            } else if self.lamport_range.1 <= idlp.1 {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    #[allow(unused)]
    fn is_full(&self) -> bool {
        self.estimated_size > MAX_BLOCK_SIZE
    }

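    /// Tries to append `change` to this block, merging it into the last
    /// change when possible. Returns `Err(change)`, handing the change back,
    /// if the counters are not continuous or the block is already full.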
    fn push_change(
        self: &mut Arc<Self>,
        change: Change,
        new_change_size: usize,
        merge_interval: i64,
        a: &SharedArena,
    ) -> Result<(), Change> {
        if self.counter_range.1 != change.id.counter {
            return Err(change);
        }

        let atom_len = change.atom_len();
        let next_lamport = change.lamport + atom_len as Lamport;
        let next_counter = change.id.counter + atom_len as Counter;

        let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
        let this = Arc::make_mut(self);
        let changes = this.content.changes_mut(a).unwrap();
        let changes = Arc::make_mut(changes);
        match changes.last_mut() {
            Some(last)
                if last.can_merge_right(&change, merge_interval)
                    && (!is_full
                        || (change.ops.len() == 1
                            && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
            {
                for op in change.ops.into_iter() {
                    let size = op.estimate_storage_size();
                    if !last.ops.push(op) {
                        this.estimated_size += size;
                    }
                }
            }
            _ => {
                if is_full {
                    return Err(change);
                } else {
                    this.estimated_size += new_change_size;
                    changes.push(change);
                }
            }
        }

        this.flushed = false;
        this.counter_range.1 = next_counter;
        this.lamport_range.1 = next_lamport;
        Ok(())
    }

    fn to_bytes<'a>(self: &'a mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
        match &self.content {
            ChangesBlockContent::Bytes(bytes) => bytes.clone(),
            ChangesBlockContent::Both(_, bytes) => {
                let bytes = bytes.clone();
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Bytes(bytes.clone());
                bytes
            }
            ChangesBlockContent::Changes(changes) => {
                let bytes = ChangesBlockBytes::serialize(changes, a);
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Bytes(bytes.clone());
                bytes
            }
        }
    }

    fn ensure_changes(self: &mut Arc<Self>, a: &SharedArena) -> LoroResult<()> {
        match &self.content {
            ChangesBlockContent::Changes(_) => Ok(()),
            ChangesBlockContent::Both(_, _) => Ok(()),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                let b = bytes.clone();
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Both(Arc::new(changes), b);
                Ok(())
            }
        }
    }

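    /// Binary-searches for the change that contains `counter`; on a miss the
    /// `Err` carries the insertion index, mirroring `slice::binary_search`.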
    fn get_change_index_by_counter(&self, counter: Counter) -> Result<usize, usize> {
        let changes = self.content.try_changes().unwrap();
        changes.binary_search_by(|c| {
            if c.id.counter > counter {
                Ordering::Greater
            } else if (c.id.counter + c.content_len() as Counter) <= counter {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    fn get_change_index_by_lamport_lte(&self, lamport: Lamport) -> Option<usize> {
        let changes = self.content.try_changes().unwrap();
        let r = changes.binary_search_by(|c| {
            if c.lamport > lamport {
                Ordering::Greater
            } else if (c.lamport + c.content_len() as Lamport) <= lamport {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        });

        match r {
            Ok(found) => Some(found),
            Err(idx) => {
                if idx == 0 {
                    None
                } else {
                    Some(idx - 1)
                }
            }
        }
    }

    #[allow(unused)]
    fn get_changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
        self.content.changes(a)
    }

    #[allow(unused)]
    fn id(&self) -> ID {
        ID::new(self.peer, self.counter_range.0)
    }

    pub fn change_num(&self) -> usize {
        match &self.content {
            ChangesBlockContent::Changes(c) => c.len(),
            ChangesBlockContent::Bytes(b) => b.len_changes(),
            ChangesBlockContent::Both(c, _) => c.len(),
        }
    }
}

impl ChangesBlockContent {
    pub fn iter_dag_nodes(&self) -> Vec<AppDagNode> {
        let mut dag_nodes = Vec::new();
        match self {
            ChangesBlockContent::Changes(c) | ChangesBlockContent::Both(c, _) => {
                for change in c.iter() {
                    let new_node = AppDagNodeInner {
                        peer: change.id.peer,
                        cnt: change.id.counter,
                        lamport: change.lamport,
                        deps: change.deps.clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len: change.atom_len(),
                    }
                    .into();

                    dag_nodes.push_rle_element(new_node);
                }
            }
            ChangesBlockContent::Bytes(b) => {
                b.ensure_header().unwrap();
                let header = b.header.get().unwrap();
                let n = header.n_changes;
                for i in 0..n {
                    let new_node = AppDagNodeInner {
                        peer: header.peer,
                        cnt: header.counters[i],
                        lamport: header.lamports[i],
                        deps: header.deps_groups[i].clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len: (header.counters[i + 1] - header.counters[i]) as usize,
                    }
                    .into();

                    dag_nodes.push_rle_element(new_node);
                }
            }
        }

        dag_nodes
    }

    #[allow(unused)]
    pub fn changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => Ok(changes),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
                self.changes(a)
            }
        }
    }

    fn changes_mut(&mut self, a: &SharedArena) -> LoroResult<&mut Arc<Vec<Change>>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => {
                *self = ChangesBlockContent::Changes(std::mem::take(changes));
                self.changes_mut(a)
            }
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Changes(Arc::new(changes));
                self.changes_mut(a)
            }
        }
    }

    pub(crate) fn try_changes(&self) -> Option<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Some(changes),
            ChangesBlockContent::Both(changes, _) => Some(changes),
            ChangesBlockContent::Bytes(_) => None,
        }
    }

    pub(crate) fn len_changes(&self) -> usize {
        match self {
            ChangesBlockContent::Changes(changes) => changes.len(),
            ChangesBlockContent::Both(changes, _) => changes.len(),
            ChangesBlockContent::Bytes(bytes) => bytes.len_changes(),
        }
    }
}

impl std::fmt::Debug for ChangesBlockContent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChangesBlockContent::Changes(changes) => f
                .debug_tuple("ChangesBlockContent::Changes")
                .field(changes)
                .finish(),
            ChangesBlockContent::Bytes(_bytes) => {
                f.debug_tuple("ChangesBlockContent::Bytes").finish()
            }
            ChangesBlockContent::Both(changes, _bytes) => f
                .debug_tuple("ChangesBlockContent::Both")
                .field(changes)
                .finish(),
        }
    }
}

impl ChangesBlockBytes {
    fn new(bytes: Bytes) -> Self {
        Self {
            header: OnceCell::new(),
            bytes,
        }
    }

    fn ensure_header(&self) -> LoroResult<()> {
        self.header
            .get_or_init(|| Arc::new(decode_header(&self.bytes).unwrap()));
        Ok(())
    }

    fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
        self.ensure_header()?;
        let ans: Vec<Change> = decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))?;
        for c in ans.iter() {
            register_container_and_parent_link(a, c)
        }

        Ok(ans)
    }

    fn serialize(changes: &[Change], a: &SharedArena) -> Self {
        let bytes = encode_block(changes, a);
        let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
        bytes.ensure_header().unwrap();
        bytes
    }

    fn peer(&mut self) -> PeerID {
        self.ensure_header().unwrap();
        self.header.get().as_ref().unwrap().peer
    }

    fn counter_range(&mut self) -> (Counter, Counter) {
        if let Some(header) = self.header.get() {
            (header.counter, *header.counters.last().unwrap())
        } else {
            decode_block_range(&self.bytes).unwrap().0
        }
    }

    fn lamport_range(&mut self) -> (Lamport, Lamport) {
        if let Some(header) = self.header.get() {
            (header.lamports[0], *header.lamports.last().unwrap())
        } else {
            decode_block_range(&self.bytes).unwrap().1
        }
    }

    fn len_changes(&self) -> usize {
        self.ensure_header().unwrap();
        self.header.get().unwrap().n_changes
    }
}

#[cfg(test)]
mod test {
    use crate::{
        oplog::convert_change_to_remote, state::TreeParentId, ListHandler, LoroDoc,
        MovableListHandler, TextHandler, TreeHandler,
    };

    use super::*;

    fn test_encode_decode(doc: LoroDoc) {
        doc.commit_then_renew();
        let oplog = doc.oplog().try_lock().unwrap();
        let bytes = oplog
            .change_store
            .encode_all(oplog.vv(), oplog.dag.frontiers());
        let store = ChangeStore::new_for_test();
        let _ = store.import_all(bytes.clone()).unwrap();
        assert_eq!(store.external_kv.try_lock().unwrap().export_all(), bytes);
        let mut changes_parsed = Vec::new();
        let a = store.arena.clone();
        store.visit_all_changes(&mut |c| {
            changes_parsed.push(convert_change_to_remote(&a, c));
        });
        let mut changes = Vec::new();
        oplog.change_store.visit_all_changes(&mut |c| {
            changes.push(convert_change_to_remote(&oplog.arena, c));
        });
        assert_eq!(changes_parsed, changes);
    }

    #[test]
    fn test_change_store() {
        let doc = LoroDoc::new_auto_commit();
        doc.set_record_timestamp(true);
        let t = doc.get_text("t");
        t.insert(0, "hello").unwrap();
        doc.commit_then_renew();
        let t = doc.get_list("t");
        t.insert(0, "hello").unwrap();
        test_encode_decode(doc);
    }

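    // Sketch: with the test-only MAX_BLOCK_SIZE of 128 bytes, a single large
    // change is expected to be split across multiple blocks; either way, the
    // history must still round-trip through encode/decode unchanged.
    #[test]
    fn test_block_split_on_large_change() {
        let doc = LoroDoc::new_auto_commit();
        let t = doc.get_text("t");
        t.insert(0, &"a".repeat(1024)).unwrap();
        test_encode_decode(doc);
    }
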
    #[test]
    fn test_synced_doc() -> LoroResult<()> {
        let doc_a = LoroDoc::new_auto_commit();
        let doc_b = LoroDoc::new_auto_commit();
        let doc_c = LoroDoc::new_auto_commit();

        {
            let map = doc_a.get_map("root");
            map.insert_container("text", TextHandler::new_detached())?;
            map.insert_container("list", ListHandler::new_detached())?;
            map.insert_container("tree", TreeHandler::new_detached())?;
        }

        {
            let initial_state = doc_a.export_from(&Default::default());
            doc_b.import(&initial_state)?;
            doc_c.import(&initial_state)?;
        }

        {
            let map = doc_b.get_map("root");
            let text = map
                .insert_container("text", TextHandler::new_detached())
                .unwrap();
            text.insert(0, "Hello, ")?;

            let list = map
                .insert_container("list", ListHandler::new_detached())
                .unwrap();
            list.push("world")?;
        }

        {
            let map = doc_c.get_map("root");
            let tree = map
                .insert_container("tree", TreeHandler::new_detached())
                .unwrap();
            let node_id = tree.create(TreeParentId::Root)?;
            tree.get_meta(node_id)?.insert("key", "value")?;
            let node_b = tree.create(TreeParentId::Root)?;
            tree.move_to(node_b, TreeParentId::Root, 0).unwrap();

            let movable_list = map
                .insert_container("movable", MovableListHandler::new_detached())
                .unwrap();
            movable_list.push("item1".into())?;
            movable_list.push("item2".into())?;
            movable_list.mov(0, 1)?;
        }

        let b_changes = doc_b.export_from(&doc_a.oplog_vv());
        doc_a.import(&b_changes)?;

        let c_changes = doc_c.export_from(&doc_a.oplog_vv());
        doc_a.import(&c_changes)?;

        test_encode_decode(doc_a);
        Ok(())
    }
}