1use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
2use super::{loro_dag::AppDagNodeInner, AppDagNode};
3use crate::sync::Mutex;
4use crate::{
5 arena::SharedArena,
6 change::Change,
7 estimated_size::EstimatedSize,
8 kv_store::KvStore,
9 op::Op,
10 parent::register_container_and_parent_link,
11 version::{Frontiers, ImVersionVector},
12 VersionVector,
13};
14use block_encode::decode_block_range;
15use bytes::Bytes;
16use itertools::Itertools;
17use loro_common::{
18 Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError,
19 LoroResult, PeerID, ID,
20};
21use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
22use once_cell::sync::OnceCell;
23use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
24use std::sync::atomic::AtomicI64;
25use std::{
26 cmp::Ordering,
27 collections::{BTreeMap, VecDeque},
28 ops::{Bound, Deref},
29 sync::Arc,
30};
31use tracing::{info_span, warn};
32mod block_encode;
33mod block_meta_encode;
34pub(super) mod iter;
35
/// Size threshold that values returned by `estimate_storage_size()` are compared against
/// when deciding whether a change has to be split before it is stored.
#[cfg(not(test))]
const MAX_BLOCK_SIZE: usize = 1024 * 4;
/// Test builds use a much smaller threshold
/// (presumably so the splitting paths are exercised by small inputs — TODO confirm).
#[cfg(test)]
const MAX_BLOCK_SIZE: usize = 128;
40
/// Storage for changes, split between in-memory blocks and an external key-value store.
///
/// Cloning shares the same underlying state, because every field is reference counted.
#[derive(Debug, Clone)]
pub struct ChangeStore {
    /// In-memory state: the start version and the blocks currently loaded in memory.
    inner: Arc<Mutex<ChangeStoreInner>>,
    /// Arena handed to blocks whenever they are parsed or encoded.
    arena: SharedArena,
    /// Key-value store holding the encoded blocks plus the metadata entries
    /// (`START_VV_KEY`, `START_FRONTIERS_KEY`, `VV_KEY`, `FRONTIERS_KEY`).
    external_kv: Arc<Mutex<dyn KvStore>>,
    /// Per-peer end counter of what has been written to `external_kv`
    /// (updated by `flush_and_compact`, replaced by `import_all`).
    external_vv: Arc<Mutex<VersionVector>>,
    /// Merge interval that is loaded and passed to `push_change` for local changes.
    merge_interval: Arc<AtomicI64>,
}
73
74#[derive(Debug, Clone)]
75struct ChangeStoreInner {
76 start_vv: ImVersionVector,
79 start_frontiers: Frontiers,
81 mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
83}
84
/// Bookkeeping that allows `ChangeStore::rollback_import` to undo insertions.
#[derive(Debug)]
pub(crate) struct ChangeStoreRollback {
    /// Version vector captured before the insertions started.
    old_vv: VersionVector,
    /// Snapshot of every pre-existing block taken right before its first mutation.
    blocks_before_mutation: BTreeMap<ID, Arc<ChangesBlock>>,
}
90
91impl ChangeStoreRollback {
92 pub(crate) fn new(old_vv: VersionVector) -> Self {
93 Self {
94 old_vv,
95 blocks_before_mutation: BTreeMap::new(),
96 }
97 }
98
99 fn record_block_before_mutation(&mut self, id: ID, block: Arc<ChangesBlock>) {
100 let old_end = self.old_vv.get(&id.peer).copied().unwrap_or(0);
101 if id.counter >= old_end {
102 return;
103 }
104
105 self.blocks_before_mutation.entry(id).or_insert(block);
106 }
107}
108
/// A run of consecutive changes from a single peer, stored and encoded as one unit.
#[derive(Debug, Clone)]
pub(crate) struct ChangesBlock {
    /// Peer that authored every change in this block.
    peer: PeerID,
    /// Counter range covered by the block; the end is exclusive
    /// (lookups test `start <= counter && counter < end`).
    counter_range: (Counter, Counter),
    /// Lamport range covered by the block; lookups treat the end as exclusive.
    lamport_range: (Lamport, Lamport),
    /// Estimated storage size of the block (presumably accumulated from the
    /// `estimate_storage_size()` of its changes — TODO confirm).
    estimated_size: usize,
    /// Set to `true` by `flush_and_compact` once the block has been written to the
    /// external kv store.
    flushed: bool,
    /// The block's payload: parsed changes, encoded bytes, or both.
    content: ChangesBlockContent,
}
119
/// Payload of a [`ChangesBlock`].
#[derive(Clone)]
pub(crate) enum ChangesBlockContent {
    /// Only the parsed changes are available.
    Changes(Arc<Vec<Change>>),
    /// Only the encoded bytes are available.
    Bytes(ChangesBlockBytes),
    /// Both the parsed changes and the encoded bytes are available.
    Both(Arc<Vec<Change>>, ChangesBlockBytes),
}
126
/// Encoded form of a block together with its lazily initialized header.
#[derive(Clone)]
pub(crate) struct ChangesBlockBytes {
    /// The encoded block.
    bytes: Bytes,
    /// Header of `bytes`; filled in on demand (see `ensure_header`).
    header: OnceCell<Arc<ChangesBlockHeader>>,
}
133
/// Key under which the encoded start version vector is stored in the kv store.
pub const START_VV_KEY: &[u8] = b"sv";
/// Key under which the encoded start frontiers are stored in the kv store.
pub const START_FRONTIERS_KEY: &[u8] = b"sf";
/// Key under which the encoded version vector is stored in the kv store.
pub const VV_KEY: &[u8] = b"vv";
/// Key under which the encoded frontiers are stored in the kv store.
pub const FRONTIERS_KEY: &[u8] = b"fr";
138
139impl ChangeStore {
140 pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
141 Self {
142 inner: Arc::new(Mutex::new(ChangeStoreInner {
143 start_vv: ImVersionVector::new(),
144 start_frontiers: Frontiers::default(),
145 mem_parsed_kv: BTreeMap::new(),
146 })),
147 arena: a.clone(),
148 external_vv: Arc::new(Mutex::new(VersionVector::new())),
149 external_kv: Arc::new(Mutex::new(MemKvStore::new(MemKvConfig::default()))),
150 merge_interval,
152 }
153 }
154
155 #[cfg(test)]
156 fn new_for_test() -> Self {
157 Self::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)))
158 }
159
160 pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
161 self.flush_and_compact(vv, frontiers);
162 let mut kv = self.external_kv.lock();
163 kv.export_all()
164 }
165
166 #[tracing::instrument(skip(self), level = "debug")]
167 pub(super) fn export_from(
168 &self,
169 start_vv: &VersionVector,
170 start_frontiers: &Frontiers,
171 latest_vv: &VersionVector,
172 latest_frontiers: &Frontiers,
173 ) -> Bytes {
174 let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
175 for span in latest_vv.sub_iter(start_vv) {
176 for c in self.iter_changes(span) {
179 let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
180 as usize)
181 .min(c.atom_len());
182 let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
183 as usize)
184 .min(c.atom_len());
185
186 if start == end {
187 continue;
188 }
189
190 let ch = c.slice(start, end);
191 new_store.insert_change(ch, false, false);
192 }
193 }
194
195 loro_common::debug!(
196 "start_vv={:?} start_frontiers={:?}",
197 &start_vv,
198 start_frontiers
199 );
200 new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
201 }
202
203 pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
204 let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
205 for span in spans {
206 let mut span = *span;
207 span.normalize_();
208 for c in self.iter_changes(span) {
211 let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
212 let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
213 if start == end {
214 continue;
215 }
216
217 let ch = c.slice(start, end);
218 new_store.insert_change(ch, false, false);
219 }
220 }
221
222 encode_blocks_in_store(new_store, &self.arena, w);
223 }
224
225 fn encode_from(
226 &self,
227 start_vv: &VersionVector,
228 start_frontiers: &Frontiers,
229 latest_vv: &VersionVector,
230 latest_frontiers: &Frontiers,
231 ) -> Bytes {
232 {
233 let mut store = self.external_kv.lock();
234 store.set(START_VV_KEY, start_vv.encode().into());
235 store.set(START_FRONTIERS_KEY, start_frontiers.encode().into());
236 let mut inner = self.inner.lock();
237 inner.start_frontiers = start_frontiers.clone();
238 inner.start_vv = ImVersionVector::from_vv(start_vv);
239 }
240 self.flush_and_compact(latest_vv, latest_frontiers);
241 self.external_kv.lock().export_all()
242 }
243
244 pub(crate) fn decode_snapshot_for_updates(
245 bytes: Bytes,
246 arena: &SharedArena,
247 self_vv: &VersionVector,
248 ) -> Result<Vec<Change>, LoroError> {
249 let change_store = ChangeStore::new_mem(arena, Arc::new(AtomicI64::new(0)));
250 let _ = change_store.import_all(bytes)?;
251 let mut changes = Vec::new();
252 change_store.visit_all_changes(&mut |c| {
253 let cnt_threshold = self_vv.get(&c.id.peer).copied().unwrap_or(0);
254 if c.id.counter >= cnt_threshold {
255 changes.push(c.clone());
256 return;
257 }
258
259 let change_end = c.ctr_end();
260 if change_end > cnt_threshold {
261 changes.push(c.slice((cnt_threshold - c.id.counter) as usize, c.atom_len()));
262 }
263 });
264
265 Ok(changes)
266 }
267
268 pub(crate) fn decode_block_bytes(
269 bytes: Bytes,
270 arena: &SharedArena,
271 self_vv: &VersionVector,
272 ) -> LoroResult<Vec<Change>> {
273 let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
274 if ans.is_empty() {
275 return Ok(ans);
276 }
277
278 let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
279 ans.retain_mut(|c| {
280 if c.id.counter >= start {
281 true
282 } else if c.ctr_end() > start {
283 *c = c.slice((start - c.id.counter) as usize, c.atom_len());
284 true
285 } else {
286 false
287 }
288 });
289
290 Ok(ans)
291 }
292
293 pub(crate) fn rollback_import(&self, rollback: ChangeStoreRollback) {
294 let mut inner = self.inner.lock();
295 inner.mem_parsed_kv.retain(|id, _| {
296 let old_end = rollback.old_vv.get(&id.peer).copied().unwrap_or(0);
297 id.counter < old_end
298 });
299
300 for (id, block) in rollback.blocks_before_mutation {
301 inner.mem_parsed_kv.insert(id, block);
302 }
303 }
304
305 pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
306 let block = self.get_block_that_contains(id)?;
307 Some(block.content.iter_dag_nodes())
308 }
309
310 pub fn get_last_dag_nodes_for_peer(&self, peer: PeerID) -> Option<Vec<AppDagNode>> {
311 let block = self.get_the_last_block_of_peer(peer)?;
312 Some(block.content.iter_dag_nodes())
313 }
314
315 pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
316 self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
317 let mut inner = self.inner.lock();
318 for (_, block) in inner.mem_parsed_kv.iter_mut() {
319 block
320 .ensure_changes(&self.arena)
321 .expect("Parse block error");
322 for c in block.content.try_changes().unwrap() {
323 f(c);
324 }
325 }
326 }
327
328 pub(crate) fn iter_blocks(&self, id_span: IdSpan) -> Vec<(Arc<ChangesBlock>, usize, usize)> {
329 if id_span.counter.start == id_span.counter.end {
330 return vec![];
331 }
332
333 assert!(id_span.counter.start < id_span.counter.end);
334 self.ensure_block_loaded_in_range(
335 Bound::Included(id_span.id_start()),
336 Bound::Excluded(id_span.id_end()),
337 );
338 let mut inner = self.inner.lock();
339 let next_back = inner.mem_parsed_kv.range(..=id_span.id_start()).next_back();
340 match next_back {
341 None => {
342 return vec![];
343 }
344 Some(next_back) => {
345 if next_back.0.peer != id_span.peer {
346 return vec![];
347 }
348 }
349 }
350 let start_counter = next_back.map(|(id, _)| id.counter).unwrap_or(0);
351 let ans = inner
352 .mem_parsed_kv
353 .range_mut(
354 ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
355 )
356 .filter_map(|(_id, block)| {
357 if block.counter_range.1 < id_span.counter.start {
358 return None;
359 }
360
361 block
362 .ensure_changes(&self.arena)
363 .expect("Parse block error");
364 let changes = block.content.try_changes().unwrap();
365 let start;
366 let end;
367 if id_span.counter.start <= block.counter_range.0
368 && id_span.counter.end >= block.counter_range.1
369 {
370 start = 0;
371 end = changes.len();
372 } else {
373 start = block
374 .get_change_index_by_counter(id_span.counter.start)
375 .unwrap_or_else(|x| x);
376
377 match block.get_change_index_by_counter(id_span.counter.end - 1) {
378 Ok(e) => {
379 end = e + 1;
380 }
381 Err(0) => return None,
382 Err(e) => {
383 end = e;
384 }
385 }
386 }
387 if start == end {
388 return None;
389 }
390
391 Some((block.clone(), start, end))
392 })
393 .collect_vec();
395
396 ans
397 }
398
399 pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
400 let v = self.iter_blocks(id_span);
401 #[cfg(debug_assertions)]
402 {
403 if !v.is_empty() {
404 assert_eq!(v[0].0.peer, id_span.peer);
405 assert_eq!(v.last().unwrap().0.peer, id_span.peer);
406 {
407 let (block, start, _end) = v.first().unwrap();
409 let changes = block.content.try_changes().unwrap();
410 assert!(changes[*start].id.counter <= id_span.counter.start);
411 }
412 {
413 let (block, _start, end) = v.last().unwrap();
415 let changes = block.content.try_changes().unwrap();
416 assert!(changes[*end - 1].ctr_end() >= id_span.counter.end);
417 assert!(changes[*end - 1].id.counter < id_span.counter.end);
418 }
419 }
420 }
421
422 v.into_iter().flat_map(move |(block, start, end)| {
423 (start..end).map(move |i| BlockChangeRef {
424 change_index: i,
425 block: block.clone(),
426 })
427 })
428 }
429
430 pub(crate) fn get_blocks_in_range(&self, id_span: IdSpan) -> VecDeque<Arc<ChangesBlock>> {
431 let mut inner = self.inner.lock();
432 let start_counter = inner
433 .mem_parsed_kv
434 .range(..=id_span.id_start())
435 .next_back()
436 .map(|(id, _)| id.counter)
437 .unwrap_or(0);
438 let vec = inner
439 .mem_parsed_kv
440 .range_mut(
441 ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
442 )
443 .filter_map(|(_id, block)| {
444 if block.counter_range.1 < id_span.counter.start {
445 return None;
446 }
447
448 block
449 .ensure_changes(&self.arena)
450 .expect("Parse block error");
451 Some(block.clone())
452 })
453 .collect();
455 vec
456 }
457
458 pub(crate) fn get_block_that_contains(&self, id: ID) -> Option<Arc<ChangesBlock>> {
459 self.ensure_block_loaded_in_range(Bound::Included(id), Bound::Included(id));
460 let inner = self.inner.lock();
461 let block = inner
462 .mem_parsed_kv
463 .range(..=id)
464 .next_back()
465 .filter(|(_, block)| {
466 block.peer == id.peer
467 && block.counter_range.0 <= id.counter
468 && id.counter < block.counter_range.1
469 })
470 .map(|(_, block)| block.clone());
471
472 block
473 }
474
475 pub(crate) fn get_the_last_block_of_peer(&self, peer: PeerID) -> Option<Arc<ChangesBlock>> {
476 let end_id = ID::new(peer, Counter::MAX);
477 self.ensure_id_lte(end_id);
478 let inner = self.inner.lock();
479 let block = inner
480 .mem_parsed_kv
481 .range(..=end_id)
482 .next_back()
483 .filter(|(_, block)| block.peer == peer)
484 .map(|(_, block)| block.clone());
485
486 block
487 }
488
489 pub fn change_num(&self) -> usize {
490 self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
491 let mut inner = self.inner.lock();
492 inner
493 .mem_parsed_kv
494 .iter_mut()
495 .map(|(_, block)| block.change_num())
496 .sum()
497 }
498
499 pub fn fork(
500 &self,
501 arena: SharedArena,
502 merge_interval: Arc<AtomicI64>,
503 vv: &VersionVector,
504 frontiers: &Frontiers,
505 ) -> Self {
506 self.flush_and_compact(vv, frontiers);
507 let inner = self.inner.lock();
508 Self {
509 inner: Arc::new(Mutex::new(ChangeStoreInner {
510 start_vv: inner.start_vv.clone(),
511 start_frontiers: inner.start_frontiers.clone(),
512 mem_parsed_kv: BTreeMap::new(),
513 })),
514 arena,
515 external_vv: Arc::new(Mutex::new(self.external_vv.lock().clone())),
516 external_kv: self.external_kv.lock().clone_store(),
517 merge_interval,
518 }
519 }
520
521 pub fn kv_size(&self) -> usize {
522 self.external_kv
523 .lock()
524 .scan(Bound::Unbounded, Bound::Unbounded)
525 .map(|(k, v)| k.len() + v.len())
526 .sum()
527 }
528
529 pub(crate) fn export_blocks_from<W: std::io::Write>(
530 &self,
531 start_vv: &VersionVector,
532 shallow_since_vv: &ImVersionVector,
533 latest_vv: &VersionVector,
534 w: &mut W,
535 ) {
536 let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
537 for mut span in latest_vv.sub_iter(start_vv) {
538 let counter_lower_bound = shallow_since_vv.get(&span.peer).copied().unwrap_or(0);
539 span.counter.start = span.counter.start.max(counter_lower_bound);
540 span.counter.end = span.counter.end.max(counter_lower_bound);
541 if span.counter.start >= span.counter.end {
542 continue;
543 }
544
545 for c in self.iter_changes(span) {
548 let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
549 as usize)
550 .min(c.atom_len());
551 let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
552 as usize)
553 .min(c.atom_len());
554
555 assert_ne!(start, end);
556 let ch = c.slice(start, end);
557 new_store.insert_change(ch, false, false);
558 }
559 }
560
561 let arena = &self.arena;
562 encode_blocks_in_store(new_store, arena, w);
563 }
564
565 pub(crate) fn fork_changes_up_to(
566 &self,
567 start_vv: &ImVersionVector,
568 frontiers: &Frontiers,
569 vv: &VersionVector,
570 ) -> Bytes {
571 let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
572 for mut span in vv.sub_iter_im(start_vv) {
573 let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
574 span.counter.start = span.counter.start.max(counter_lower_bound);
575 span.counter.end = span.counter.end.max(counter_lower_bound);
576 if span.counter.start >= span.counter.end {
577 continue;
578 }
579
580 for c in self.iter_changes(span) {
583 let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
584 as usize)
585 .min(c.atom_len());
586 let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
587 as usize)
588 .min(c.atom_len());
589
590 assert_ne!(start, end);
591 let ch = c.slice(start, end);
592 new_store.insert_change(ch, false, false);
593 }
594 }
595
596 new_store.encode_all(vv, frontiers)
597 }
598}
599
600fn encode_blocks_in_store<W: std::io::Write>(
601 new_store: ChangeStore,
602 arena: &SharedArena,
603 w: &mut W,
604) {
605 let mut inner = new_store.inner.lock();
606 for (_id, block) in inner.mem_parsed_kv.iter_mut() {
607 let bytes = block.to_bytes(arena);
608 leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
609 w.write_all(&bytes.bytes).unwrap();
610 }
611}
612
613mod mut_external_kv {
614 use super::*;
617
618 impl ChangeStore {
619 #[tracing::instrument(skip_all, level = "debug", name = "change_store import_all")]
620 pub(crate) fn import_all(&self, bytes: Bytes) -> Result<BatchDecodeInfo, LoroError> {
621 let mut kv_store = self.external_kv.lock();
622 assert!(
623 kv_store.len() <= 2,
625 "kv store should be empty when using decode_all"
626 );
627 kv_store
628 .import_all(bytes)
629 .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
630 drop(kv_store);
631 self.validate_imported_change_blocks()?;
632 let vv_bytes = self.external_kv.lock().get(VV_KEY).unwrap_or_default();
633 let vv = VersionVector::decode(&vv_bytes)
634 .map_err(|_| LoroError::DecodeDataCorruptionError)?;
635 let start_vv_bytes = self
636 .external_kv
637 .lock()
638 .get(START_VV_KEY)
639 .unwrap_or_default();
640 let start_vv = if start_vv_bytes.is_empty() {
641 Default::default()
642 } else {
643 VersionVector::decode(&start_vv_bytes)
644 .map_err(|_| LoroError::DecodeDataCorruptionError)?
645 };
646
647 #[cfg(test)]
648 {
649 for (peer, cnt) in vv.iter() {
651 self.get_change(ID::new(*peer, *cnt - 1))
652 .ok_or(LoroError::DecodeDataCorruptionError)?;
653 }
654 }
655
656 *self.external_vv.lock() = vv.clone();
657 let frontiers_bytes = self
658 .external_kv
659 .lock()
660 .get(FRONTIERS_KEY)
661 .unwrap_or_default();
662 let frontiers = Frontiers::decode(&frontiers_bytes)
663 .map_err(|_| LoroError::DecodeDataCorruptionError)?;
664 let start_frontiers = self
665 .external_kv
666 .lock()
667 .get(START_FRONTIERS_KEY)
668 .unwrap_or_default();
669 let start_frontiers = if start_frontiers.is_empty() {
670 Default::default()
671 } else {
672 Frontiers::decode(&start_frontiers)
673 .map_err(|_| LoroError::DecodeDataCorruptionError)?
674 };
675
676 let mut max_lamport = None;
677 let mut max_timestamp = 0;
678 for id in frontiers.iter() {
679 let c = self
680 .get_change(id)
681 .ok_or(LoroError::DecodeDataCorruptionError)?;
682 debug_assert_ne!(c.atom_len(), 0);
683 let l = c.lamport_last();
684 if let Some(x) = max_lamport {
685 if l > x {
686 max_lamport = Some(l);
687 }
688 } else {
689 max_lamport = Some(l);
690 }
691
692 let t = c.timestamp;
693 if t > max_timestamp {
694 max_timestamp = t;
695 }
696 }
697
698 Ok(BatchDecodeInfo {
699 vv,
700 frontiers,
701 start_version: if start_vv.is_empty() {
702 None
703 } else {
704 let mut inner = self.inner.lock();
705 inner.start_frontiers = start_frontiers.clone();
706 inner.start_vv = ImVersionVector::from_vv(&start_vv);
707 Some((start_vv, start_frontiers))
708 },
709 })
710 }
711
712 fn validate_imported_change_blocks(&self) -> LoroResult<()> {
713 let blocks: Vec<(ID, Bytes)> = {
714 let kv_store = self.external_kv.lock();
715 kv_store
716 .scan(Bound::Unbounded, Bound::Unbounded)
717 .filter(|(id, _)| id.len() == 12)
718 .map(|(id, bytes)| (ID::from_bytes(&id), bytes))
719 .collect()
720 };
721
722 for (_id, bytes) in blocks {
723 let mut block = Arc::new(ChangesBlock::from_bytes(bytes)?);
724 block.ensure_changes(&self.arena)?;
725 }
726
727 Ok(())
728 }
729
730 pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
732 let mut inner = self.inner.lock();
733 let mut store = self.external_kv.lock();
734 let mut external_vv = self.external_vv.lock();
735 for (id, block) in inner.mem_parsed_kv.iter_mut() {
736 if !block.flushed {
737 let id_bytes = id.to_bytes();
738 let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
739 assert!(
740 counter_start < block.counter_range.1,
741 "Peer={} Block Counter Range={:?}, counter_start={}",
742 id.peer,
743 &block.counter_range,
744 counter_start
745 );
746 if counter_start > block.counter_range.0 {
747 assert!(store.get(&id_bytes).is_some());
748 }
749 external_vv.insert(id.peer, block.counter_range.1);
750 let bytes = block.to_bytes(&self.arena);
751 store.set(&id_bytes, bytes.bytes);
752 Arc::make_mut(block).flushed = true;
753 }
754 }
755
756 if inner.start_vv.is_empty() {
757 assert_eq!(&*external_vv, vv);
758 } else {
759 #[cfg(debug_assertions)]
760 {
761 }
763 }
764 let vv_bytes = vv.encode();
765 let frontiers_bytes = frontiers.encode();
766 store.set(VV_KEY, vv_bytes.into());
767 store.set(FRONTIERS_KEY, frontiers_bytes.into());
768 }
769 }
770}
771
772mod mut_inner_kv {
773 use super::*;
777 impl ChangeStore {
        /// Inserts `change` into the in-memory blocks without recording rollback
        /// information.
        ///
        /// * `split_when_exceeds` - split the change first when its estimated storage
        ///   size exceeds `MAX_BLOCK_SIZE`.
        /// * `is_local` - when `true`, the configured merge interval is passed on when
        ///   the change is appended to an existing block; otherwise `0` is used.
        pub fn insert_change(&self, change: Change, split_when_exceeds: bool, is_local: bool) {
            self.insert_change_inner(change, split_when_exceeds, is_local, None);
        }
785
        /// Same as `insert_change`, but records the previous state of every block that
        /// gets mutated in `rollback`, so the insertion can be undone with
        /// `rollback_import`.
        pub(crate) fn insert_change_with_rollback(
            &self,
            change: Change,
            split_when_exceeds: bool,
            is_local: bool,
            rollback: &mut ChangeStoreRollback,
        ) {
            self.insert_change_inner(change, split_when_exceeds, is_local, Some(rollback));
        }
795
        /// Shared implementation of `insert_change` and `insert_change_with_rollback`.
        ///
        /// The change is appended to the preceding block of the same peer when that block
        /// accepts it; otherwise a new block keyed by the change id is created.
        ///
        /// # Panics
        /// Panics with "counter should be continuous" when the preceding block of the
        /// same peer does not end exactly where `change` starts. Debug builds also assert
        /// that the change does not start before the external version vector and that the
        /// change can be looked up afterwards.
        fn insert_change_inner(
            &self,
            mut change: Change,
            split_when_exceeds: bool,
            is_local: bool,
            mut rollback: Option<&mut ChangeStoreRollback>,
        ) {
            #[cfg(debug_assertions)]
            {
                // The change must not start before what is already recorded in
                // `external_vv` for its peer.
                let vv = self.external_vv.lock();
                assert!(vv.get(&change.id.peer).copied().unwrap_or(0) <= change.id.counter);
            }

            let s = info_span!("change_store insert_change", id = ?change.id);
            let _e = s.enter();
            let estimated_size = change.estimate_storage_size();
            if estimated_size > MAX_BLOCK_SIZE && split_when_exceeds {
                // Too large: split it; the pieces are inserted individually.
                self.split_change_then_insert(change, rollback.as_deref_mut());
                return;
            }

            let id = change.id;
            let mut inner = self.inner.lock();

            // Look at the closest block before `id`.
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..id).next_back() {
                if block.peer == change.id.peer {
                    if block.counter_range.1 != change.id.counter {
                        panic!("counter should be continuous")
                    }

                    // Snapshot the block before it may be mutated by `push_change`.
                    if let Some(rollback) = rollback.as_deref_mut() {
                        rollback.record_block_before_mutation(*_id, block.clone());
                    }

                    match block.push_change(
                        change,
                        estimated_size,
                        // The merge interval is only passed for local changes.
                        if is_local {
                            self.merge_interval
                                .load(std::sync::atomic::Ordering::Acquire)
                        } else {
                            0
                        },
                        &self.arena,
                    ) {
                        Ok(_) => {
                            // `get_change` locks `inner` again, so release the guard first.
                            drop(inner);
                            debug_assert!(self.get_change(id).is_some());
                            return;
                        }
                        // The block rejected the change and handed it back; fall through
                        // and create a new block for it.
                        Err(c) => change = c,
                    }
                }
            }

            inner
                .mem_parsed_kv
                .insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
            drop(inner);
            debug_assert!(self.get_change(id).is_some());
        }
860
861 pub fn get_change(&self, id: ID) -> Option<BlockChangeRef> {
862 let block = self.get_parsed_block(id)?;
863 Some(BlockChangeRef {
864 change_index: block.get_change_index_by_counter(id.counter).unwrap(),
865 block: block.clone(),
866 })
867 }
868
        /// Finds a change of `idlp.peer` for the given lamport via
        /// `get_change_index_by_lamport_lte` on the matching block.
        ///
        /// The in-memory blocks of the peer are searched first, walking backwards from
        /// the newest block and switching to a bisection over counters when the target
        /// lamport is far away. If that does not produce a result, the external kv store
        /// is scanned backwards and the first block whose lamport start is not greater
        /// than the target is loaded into memory.
        ///
        /// Returns `None` when no suitable block or change exists.
        ///
        /// # Panics
        /// Panics when a block fails to decode or parse.
        pub fn get_change_by_lamport_lte(&self, idlp: IdLp) -> Option<BlockChangeRef> {
            let mut inner = self.inner.lock();
            // Start with all in-memory blocks of the peer; they are visited from the back.
            let mut iter = inner
                .mem_parsed_kv
                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));

            // Counter bounds of the window that is still being searched.
            let mut lower_bound = 0;
            let mut upper_bound = i32::MAX;
            let mut is_binary_searching = false;
            loop {
                match iter.next_back() {
                    Some((&id, block)) => {
                        // While scanning linearly, the first block that starts at or
                        // before the target matches. While bisecting, the target must
                        // also lie before the block's lamport end.
                        if block.lamport_range.0 <= idlp.lamport
                            && (!is_binary_searching || idlp.lamport < block.lamport_range.1)
                        {
                            if !is_binary_searching
                                && upper_bound != i32::MAX
                                && upper_bound != block.counter_range.1
                            {
                                // This block does not end where the previously visited
                                // block starts, so a block in between is not in memory.
                                // Fall back to the kv store scan below.
                                warn!(
                                    "There is a hole between the last block and the current block"
                                );
                                break;
                            }

                            block
                                .ensure_changes(&self.arena)
                                .expect("Parse block error");
                            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
                            return Some(BlockChangeRef {
                                change_index: index,
                                block: block.clone(),
                            });
                        }

                        if is_binary_searching {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            if block.lamport_range.1 <= idlp.lamport {
                                // The target lies after this block.
                                lower_bound = mid_bound;
                            } else {
                                debug_assert!(
                                    idlp.lamport < block.lamport_range.0,
                                    "{} {:?}",
                                    idlp,
                                    &block.lamport_range
                                );
                                // The target lies before this block.
                                upper_bound = mid_bound;
                            }

                            // Restart the iteration so that `next_back` yields the last
                            // block before the new midpoint.
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        } else {
                            // Here `block.lamport_range.0 > idlp.lamport`, so the
                            // subtraction cannot underflow. A large gap triggers the
                            // switch to bisection.
                            if block.lamport_range.0 - idlp.lamport > MAX_BLOCK_SIZE as Lamport * 8
                            {
                                upper_bound = id.counter;
                                let mid_bound = (lower_bound + upper_bound) / 2;
                                iter = inner.mem_parsed_kv.range_mut(
                                    ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound),
                                );
                                is_binary_searching = true;
                            }

                            upper_bound = id.counter;
                        }
                    }
                    None => {
                        if !is_binary_searching {
                            // No in-memory block is left; use the kv store scan below.
                            break;
                        }

                        // Nothing in memory before the midpoint: move the lower bound up.
                        let mid_bound = (lower_bound + upper_bound) / 2;
                        lower_bound = mid_bound;
                        if upper_bound - lower_bound <= MAX_BLOCK_SIZE as i32 {
                            // The window is small enough: scan it linearly.
                            iter = inner.mem_parsed_kv.range_mut(
                                ID::new(idlp.peer, lower_bound)..ID::new(idlp.peer, upper_bound),
                            );
                            is_binary_searching = false;
                        } else {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        }
                    }
                }
            }

            // Fallback: search the encoded blocks of the peer below `upper_bound` in the
            // external kv store.
            let counter_end = upper_bound;
            let scan_end = ID::new(idlp.peer, counter_end).to_bytes();

            let (id, bytes) = 'block_scan: {
                let kv_store = &self.external_kv.lock();
                let iter = kv_store
                    .scan(
                        Bound::Included(&ID::new(idlp.peer, 0).to_bytes()),
                        Bound::Excluded(&scan_end),
                    )
                    .rev();

                for (id, bytes) in iter {
                    // Only the lamport range is needed to pick the block.
                    let mut block = ChangesBlockBytes::new(bytes.clone());
                    let (lamport_start, _lamport_end) = block.lamport_range();
                    if lamport_start <= idlp.lamport {
                        break 'block_scan (id, bytes);
                    }
                }

                return None;
            };

            // Parse the block that was found and keep it in memory for later lookups.
            let block_id = ID::from_bytes(&id);
            let mut block = Arc::new(
                ChangesBlock::from_bytes(bytes)
                    .expect("validated external change block should decode"),
            );
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            inner.mem_parsed_kv.insert(block_id, block.clone());
            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
            Some(BlockChangeRef {
                change_index: index,
                block,
            })
        }
1010
1011 fn split_change_then_insert(
1012 &self,
1013 change: Change,
1014 mut rollback: Option<&mut ChangeStoreRollback>,
1015 ) {
1016 let original_len = change.atom_len();
1017 let mut new_change = Change {
1018 ops: RleVec::new(),
1019 deps: change.deps,
1020 id: change.id,
1021 lamport: change.lamport,
1022 timestamp: change.timestamp,
1023 commit_msg: change.commit_msg.clone(),
1024 };
1025
1026 let mut total_len = 0;
1027 let mut estimated_size = new_change.estimate_storage_size();
1028 'outer: for mut op in change.ops.into_iter() {
1029 if op.estimate_storage_size() >= MAX_BLOCK_SIZE - estimated_size {
1030 new_change = self._insert_splitted_change(
1031 new_change,
1032 &mut total_len,
1033 &mut estimated_size,
1034 rollback.as_deref_mut(),
1035 );
1036 }
1037
1038 while let Some(end) =
1039 op.check_whether_slice_content_to_fit_in_size(MAX_BLOCK_SIZE - estimated_size)
1040 {
1041 let new = op.slice(0, end);
1043 new_change.ops.push(new);
1044 new_change = self._insert_splitted_change(
1045 new_change,
1046 &mut total_len,
1047 &mut estimated_size,
1048 rollback.as_deref_mut(),
1049 );
1050
1051 if end < op.atom_len() {
1052 op = op.slice(end, op.atom_len());
1053 } else {
1054 continue 'outer;
1055 }
1056 }
1057
1058 estimated_size += op.estimate_storage_size();
1059 if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
1060 new_change = self._insert_splitted_change(
1061 new_change,
1062 &mut total_len,
1063 &mut estimated_size,
1064 rollback.as_deref_mut(),
1065 );
1066 new_change.ops.push(op);
1067 } else {
1068 new_change.ops.push(op);
1069 }
1070 }
1071
1072 if !new_change.ops.is_empty() {
1073 total_len += new_change.atom_len();
1074 self.insert_change_inner(new_change, false, false, rollback.as_deref_mut());
1075 }
1076
1077 assert_eq!(total_len, original_len);
1078 }
1079
1080 fn _insert_splitted_change(
1081 &self,
1082 new_change: Change,
1083 total_len: &mut usize,
1084 estimated_size: &mut usize,
1085 rollback: Option<&mut ChangeStoreRollback>,
1086 ) -> Change {
1087 if new_change.atom_len() == 0 {
1088 return new_change;
1089 }
1090
1091 let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
1092 let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
1093 *total_len += new_change.atom_len();
1094 let ans = Change {
1095 ops: RleVec::new(),
1096 deps: ID::new(new_change.id.peer, ctr_end - 1).into(),
1097 id: ID::new(new_change.id.peer, ctr_end),
1098 lamport: next_lamport,
1099 timestamp: new_change.timestamp,
1100 commit_msg: new_change.commit_msg.clone(),
1101 };
1102
1103 self.insert_change_inner(new_change, false, false, rollback);
1104 *estimated_size = ans.estimate_storage_size();
1105 ans
1106 }
1107
1108 fn get_parsed_block(&self, id: ID) -> Option<Arc<ChangesBlock>> {
1109 let mut inner = self.inner.lock();
1110 if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
1111 if block.peer == id.peer && block.counter_range.1 > id.counter {
1112 block
1113 .ensure_changes(&self.arena)
1114 .expect("Parse block error");
1115 return Some(block.clone());
1116 }
1117 }
1118
1119 let store = self.external_kv.lock();
1120 let mut iter = store
1121 .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1122 .filter(|(id, _)| id.len() == 12);
1123
1124 let (b_id, b_bytes) = iter.next_back()?;
1135 let block_id: ID = ID::from_bytes(&b_id[..]);
1136 let block = ChangesBlock::from_bytes(b_bytes)
1137 .expect("validated external change block should decode");
1138 if block_id.peer == id.peer
1139 && block_id.counter <= id.counter
1140 && block.counter_range.1 > id.counter
1141 {
1142 let mut arc_block = Arc::new(block);
1143 arc_block
1144 .ensure_changes(&self.arena)
1145 .expect("Parse block error");
1146 inner.mem_parsed_kv.insert(block_id, arc_block.clone());
1147 return Some(arc_block);
1148 }
1149
1150 None
1151 }
1152
1153 pub(super) fn ensure_block_loaded_in_range(&self, start: Bound<ID>, end: Bound<ID>) {
1158 let mut whether_need_scan_backward = match start {
1159 Bound::Included(id) => Some(id),
1160 Bound::Excluded(id) => Some(id.inc(1)),
1161 Bound::Unbounded => None,
1162 };
1163
1164 {
1165 let start = start.map(|id| id.to_bytes());
1166 let end = end.map(|id| id.to_bytes());
1167 let kv = self.external_kv.lock();
1168 let mut inner = self.inner.lock();
1169 for (id, bytes) in kv
1170 .scan(
1171 start.as_ref().map(|x| x.as_slice()),
1172 end.as_ref().map(|x| x.as_slice()),
1173 )
1174 .filter(|(id, _)| id.len() == 12)
1175 {
1176 let id = ID::from_bytes(&id);
1177 if let Some(expected_start_id) = whether_need_scan_backward {
1178 if id == expected_start_id {
1179 whether_need_scan_backward = None;
1180 }
1181 }
1182
1183 if inner.mem_parsed_kv.contains_key(&id) {
1184 continue;
1185 }
1186
1187 let block = ChangesBlock::from_bytes(bytes.clone())
1188 .expect("validated external change block should decode");
1189 inner.mem_parsed_kv.insert(id, Arc::new(block));
1190 }
1191 }
1192
1193 if let Some(start_id) = whether_need_scan_backward {
1194 self.ensure_id_lte(start_id);
1195 }
1196 }
1197
1198 pub(super) fn ensure_id_lte(&self, id: ID) {
1199 let kv = self.external_kv.lock();
1200 let mut inner = self.inner.lock();
1201 let Some((next_back_id, next_back_bytes)) = kv
1202 .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1203 .filter(|(id, _)| id.len() == 12)
1204 .next_back()
1205 else {
1206 return;
1207 };
1208
1209 let next_back_id = ID::from_bytes(&next_back_id);
1210 if next_back_id.peer == id.peer {
1211 if inner.mem_parsed_kv.contains_key(&next_back_id) {
1212 return;
1213 }
1214
1215 let block = ChangesBlock::from_bytes(next_back_bytes)
1216 .expect("validated external change block should decode");
1217 inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
1218 }
1219 }
1220 }
1221}
1222
1223#[must_use]
1224#[derive(Clone, Debug)]
1225pub(crate) struct BatchDecodeInfo {
1226 pub vv: VersionVector,
1227 pub frontiers: Frontiers,
1228 pub start_version: Option<(VersionVector, Frontiers)>,
1229}
1230
1231#[derive(Clone, Debug)]
1232pub struct BlockChangeRef {
1233 block: Arc<ChangesBlock>,
1234 change_index: usize,
1235}
1236
1237impl Deref for BlockChangeRef {
1238 type Target = Change;
1239 fn deref(&self) -> &Change {
1240 &self.block.content.try_changes().unwrap()[self.change_index]
1241 }
1242}
1243
1244impl BlockChangeRef {
1245 pub(crate) fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
1246 if counter >= self.ctr_end() {
1247 return None;
1248 }
1249
1250 let index = self.ops.search_atom_index(counter);
1251 Some(BlockOpRef {
1252 block: self.block.clone(),
1253 change_index: self.change_index,
1254 op_index: index,
1255 })
1256 }
1257}
1258
1259#[derive(Clone, Debug)]
1260pub(crate) struct BlockOpRef {
1261 pub block: Arc<ChangesBlock>,
1262 pub change_index: usize,
1263 pub op_index: usize,
1264}
1265
1266impl Deref for BlockOpRef {
1267 type Target = Op;
1268
1269 fn deref(&self) -> &Op {
1270 &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index]
1271 }
1272}
1273
1274impl BlockOpRef {
1275 pub fn lamport(&self) -> Lamport {
1276 let change = &self.block.content.try_changes().unwrap()[self.change_index];
1277 let op = &change.ops[self.op_index];
1278 (op.counter - change.id.counter) as Lamport + change.lamport
1279 }
1280}
1281
1282impl ChangesBlock {
1283 fn from_bytes(bytes: Bytes) -> LoroResult<Self> {
1284 let len = bytes.len();
1285 let bytes = ChangesBlockBytes::new(bytes);
1286 bytes.ensure_header()?;
1287 let header = bytes
1288 .header
1289 .get()
1290 .expect("header should be initialized after ensure_header");
1291 let peer = header.peer;
1292 let counter_range = (
1293 header.counter,
1294 *header.counters.last().ok_or_else(|| {
1295 LoroError::DecodeError("Decode block error: missing counters".into())
1296 })?,
1297 );
1298 let lamport_range = (
1299 *header.lamports.first().ok_or_else(|| {
1300 LoroError::DecodeError("Decode block error: missing lamports".into())
1301 })?,
1302 *header.lamports.last().ok_or_else(|| {
1303 LoroError::DecodeError("Decode block error: missing lamports".into())
1304 })?,
1305 );
1306 let content = ChangesBlockContent::Bytes(bytes);
1307 Ok(Self {
1308 peer,
1309 estimated_size: len,
1310 counter_range,
1311 lamport_range,
1312 flushed: true,
1313 content,
1314 })
1315 }
1316
1317 pub(crate) fn content(&self) -> &ChangesBlockContent {
1318 &self.content
1319 }
1320
1321 fn new(change: Change, _a: &SharedArena) -> Self {
1322 let atom_len = change.atom_len();
1323 let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
1324 let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
1325 let estimated_size = change.estimate_storage_size();
1326 let peer = change.id.peer;
1327 let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
1328 Self {
1329 peer,
1330 counter_range,
1331 lamport_range,
1332 estimated_size,
1333 content,
1334 flushed: false,
1335 }
1336 }
1337
1338 #[allow(unused)]
1339 fn cmp_id(&self, id: ID) -> Ordering {
1340 self.peer.cmp(&id.peer).then_with(|| {
1341 if self.counter_range.0 > id.counter {
1342 Ordering::Greater
1343 } else if self.counter_range.1 <= id.counter {
1344 Ordering::Less
1345 } else {
1346 Ordering::Equal
1347 }
1348 })
1349 }
1350
1351 #[allow(unused)]
1352 fn cmp_idlp(&self, idlp: (PeerID, Lamport)) -> Ordering {
1353 self.peer.cmp(&idlp.0).then_with(|| {
1354 if self.lamport_range.0 > idlp.1 {
1355 Ordering::Greater
1356 } else if self.lamport_range.1 <= idlp.1 {
1357 Ordering::Less
1358 } else {
1359 Ordering::Equal
1360 }
1361 })
1362 }
1363
1364 #[allow(unused)]
1365 fn is_full(&self) -> bool {
1366 self.estimated_size > MAX_BLOCK_SIZE
1367 }
1368
1369 #[allow(clippy::result_large_err)]
1370 fn push_change(
1371 self: &mut Arc<Self>,
1372 change: Change,
1373 new_change_size: usize,
1374 merge_interval: i64,
1375 a: &SharedArena,
1376 ) -> Result<(), Change> {
1377 if self.counter_range.1 != change.id.counter {
1378 return Err(change);
1379 }
1380
1381 let atom_len = change.atom_len();
1382 let next_lamport = change.lamport + atom_len as Lamport;
1383 let next_counter = change.id.counter + atom_len as Counter;
1384
1385 let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
1386 let this = Arc::make_mut(self);
1387 let changes = this.content.changes_mut(a).unwrap();
1388 let changes = Arc::make_mut(changes);
1389 match changes.last_mut() {
1390 Some(last)
1391 if last.can_merge_right(&change, merge_interval)
1392 && (!is_full
1393 || (change.ops.len() == 1
1394 && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
1395 {
1396 for op in change.ops.into_iter() {
1397 let size = op.estimate_storage_size();
1398 if !last.ops.push(op) {
1399 this.estimated_size += size;
1400 }
1401 }
1402 }
1403 _ => {
1404 if is_full {
1405 return Err(change);
1406 } else {
1407 this.estimated_size += new_change_size;
1408 changes.push(change);
1409 }
1410 }
1411 }
1412
1413 this.flushed = false;
1414 this.counter_range.1 = next_counter;
1415 this.lamport_range.1 = next_lamport;
1416 Ok(())
1417 }
1418
1419 fn to_bytes(self: &mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
1420 match &self.content {
1421 ChangesBlockContent::Bytes(bytes) => bytes.clone(),
1422 ChangesBlockContent::Both(_, bytes) => {
1423 let bytes = bytes.clone();
1424 let this = Arc::make_mut(self);
1425 this.content = ChangesBlockContent::Bytes(bytes.clone());
1426 bytes
1427 }
1428 ChangesBlockContent::Changes(changes) => {
1429 let bytes = ChangesBlockBytes::serialize(changes, a);
1430 let this = Arc::make_mut(self);
1431 this.content = ChangesBlockContent::Bytes(bytes.clone());
1432 bytes
1433 }
1434 }
1435 }
1436
1437 fn ensure_changes(self: &mut Arc<Self>, a: &SharedArena) -> LoroResult<()> {
1438 match &self.content {
1439 ChangesBlockContent::Changes(_) => Ok(()),
1440 ChangesBlockContent::Both(_, _) => Ok(()),
1441 ChangesBlockContent::Bytes(bytes) => {
1442 let changes = bytes.parse(a)?;
1443 let b = bytes.clone();
1444 let this = Arc::make_mut(self);
1445 this.content = ChangesBlockContent::Both(Arc::new(changes), b);
1446 Ok(())
1447 }
1448 }
1449 }
1450
1451 fn get_change_index_by_counter(&self, counter: Counter) -> Result<usize, usize> {
1452 let changes = self.content.try_changes().unwrap();
1453 changes.binary_search_by(|c| {
1454 if c.id.counter > counter {
1455 Ordering::Greater
1456 } else if (c.id.counter + c.content_len() as Counter) <= counter {
1457 Ordering::Less
1458 } else {
1459 Ordering::Equal
1460 }
1461 })
1462 }
1463
1464 fn get_change_index_by_lamport_lte(&self, lamport: Lamport) -> Option<usize> {
1465 let changes = self.content.try_changes().unwrap();
1466 let r = changes.binary_search_by(|c| {
1467 if c.lamport > lamport {
1468 Ordering::Greater
1469 } else if (c.lamport + c.content_len() as Lamport) <= lamport {
1470 Ordering::Less
1471 } else {
1472 Ordering::Equal
1473 }
1474 });
1475
1476 match r {
1477 Ok(found) => Some(found),
1478 Err(idx) => {
1479 if idx == 0 {
1480 None
1481 } else {
1482 Some(idx - 1)
1483 }
1484 }
1485 }
1486 }
1487
1488 #[allow(unused)]
1489 fn get_changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
1490 self.content.changes(a)
1491 }
1492
1493 #[allow(unused)]
1494 fn id(&self) -> ID {
1495 ID::new(self.peer, self.counter_range.0)
1496 }
1497
1498 pub fn change_num(&self) -> usize {
1499 match &self.content {
1500 ChangesBlockContent::Changes(c) => c.len(),
1501 ChangesBlockContent::Bytes(b) => b.len_changes(),
1502 ChangesBlockContent::Both(c, _) => c.len(),
1503 }
1504 }
1505}
1506
1507impl ChangesBlockContent {
1508 pub fn iter_dag_nodes(&self) -> Vec<AppDagNode> {
1510 let mut dag_nodes = Vec::new();
1511 match self {
1512 ChangesBlockContent::Changes(c) | ChangesBlockContent::Both(c, _) => {
1513 for change in c.iter() {
1514 let new_node = AppDagNodeInner {
1515 peer: change.id.peer,
1516 cnt: change.id.counter,
1517 lamport: change.lamport,
1518 deps: change.deps.clone(),
1519 vv: OnceCell::new(),
1520 has_succ: false,
1521 len: change.atom_len(),
1522 }
1523 .into();
1524
1525 dag_nodes.push_rle_element(new_node);
1526 }
1527 }
1528 ChangesBlockContent::Bytes(b) => {
1529 b.ensure_header().unwrap();
1530 let header = b.header.get().unwrap();
1531 let n = header.n_changes;
1532 for i in 0..n {
1533 let new_node = AppDagNodeInner {
1534 peer: header.peer,
1535 cnt: header.counters[i],
1536 lamport: header.lamports[i],
1537 deps: header.deps_groups[i].clone(),
1538 vv: OnceCell::new(),
1539 has_succ: false,
1540 len: (header.counters[i + 1] - header.counters[i]) as usize,
1541 }
1542 .into();
1543
1544 dag_nodes.push_rle_element(new_node);
1545 }
1546 }
1547 }
1548
1549 dag_nodes
1550 }
1551
1552 #[allow(unused)]
1553 pub fn changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
1554 match self {
1555 ChangesBlockContent::Changes(changes) => Ok(changes),
1556 ChangesBlockContent::Both(changes, _) => Ok(changes),
1557 ChangesBlockContent::Bytes(bytes) => {
1558 let changes = bytes.parse(a)?;
1559 *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
1560 self.changes(a)
1561 }
1562 }
1563 }
1564
1565 fn changes_mut(&mut self, a: &SharedArena) -> LoroResult<&mut Arc<Vec<Change>>> {
1567 match self {
1568 ChangesBlockContent::Changes(changes) => Ok(changes),
1569 ChangesBlockContent::Both(changes, _) => {
1570 *self = ChangesBlockContent::Changes(std::mem::take(changes));
1571 self.changes_mut(a)
1572 }
1573 ChangesBlockContent::Bytes(bytes) => {
1574 let changes = bytes.parse(a)?;
1575 *self = ChangesBlockContent::Changes(Arc::new(changes));
1576 self.changes_mut(a)
1577 }
1578 }
1579 }
1580
1581 pub(crate) fn try_changes(&self) -> Option<&Vec<Change>> {
1582 match self {
1583 ChangesBlockContent::Changes(changes) => Some(changes),
1584 ChangesBlockContent::Both(changes, _) => Some(changes),
1585 ChangesBlockContent::Bytes(_) => None,
1586 }
1587 }
1588
1589 pub(crate) fn len_changes(&self) -> usize {
1590 match self {
1591 ChangesBlockContent::Changes(changes) => changes.len(),
1592 ChangesBlockContent::Both(changes, _) => changes.len(),
1593 ChangesBlockContent::Bytes(bytes) => bytes.len_changes(),
1594 }
1595 }
1596}
1597
1598impl std::fmt::Debug for ChangesBlockContent {
1599 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1600 match self {
1601 ChangesBlockContent::Changes(changes) => f
1602 .debug_tuple("ChangesBlockContent::Changes")
1603 .field(changes)
1604 .finish(),
1605 ChangesBlockContent::Bytes(_bytes) => {
1606 f.debug_tuple("ChangesBlockContent::Bytes").finish()
1607 }
1608 ChangesBlockContent::Both(changes, _bytes) => f
1609 .debug_tuple("ChangesBlockContent::Both")
1610 .field(changes)
1611 .finish(),
1612 }
1613 }
1614}
1615
1616impl ChangesBlockBytes {
1617 fn new(bytes: Bytes) -> Self {
1618 Self {
1619 header: OnceCell::new(),
1620 bytes,
1621 }
1622 }
1623
1624 fn ensure_header(&self) -> LoroResult<()> {
1625 self.header
1626 .get_or_try_init(|| decode_header(&self.bytes).map(Arc::new))?;
1627 Ok(())
1628 }
1629
1630 fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
1631 self.ensure_header()?;
1632 let ans: Vec<Change> = decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))?;
1633 for c in ans.iter() {
1634 register_container_and_parent_link(a, c)
1636 }
1637
1638 Ok(ans)
1639 }
1640
1641 fn serialize(changes: &[Change], a: &SharedArena) -> Self {
1642 let bytes = encode_block(changes, a);
1643 let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
1645 bytes.ensure_header().unwrap();
1646 bytes
1647 }
1648
1649 fn lamport_range(&mut self) -> (Lamport, Lamport) {
1650 if let Some(header) = self.header.get() {
1651 (header.lamports[0], *header.lamports.last().unwrap())
1652 } else {
1653 decode_block_range(&self.bytes).unwrap().1
1654 }
1655 }
1656
1657 fn len_changes(&self) -> usize {
1659 self.ensure_header().unwrap();
1660 self.header.get().unwrap().n_changes
1661 }
1662}
1663
#[cfg(test)]
mod test {
    use super::*;
    use crate::cursor::PosType;
    use crate::{
        loro::ExportMode, oplog::convert_change_to_remote, state::TreeParentId, ListHandler,
        LoroDoc, MovableListHandler, TextHandler, TreeHandler,
    };

    /// Encodes the document's change store, imports the bytes into a fresh
    /// store, and checks that both the re-exported bytes and the visited
    /// changes match the source.
    fn test_encode_decode(doc: LoroDoc) {
        doc.commit_then_renew();
        let oplog = doc.oplog().lock();
        let encoded = oplog
            .change_store
            .encode_all(oplog.vv(), oplog.dag.frontiers());

        let restored = ChangeStore::new_for_test();
        let _ = restored.import_all(encoded.clone()).unwrap();
        assert_eq!(restored.external_kv.lock().export_all(), encoded);

        let restored_arena = restored.arena.clone();
        let mut restored_changes = Vec::new();
        restored.visit_all_changes(&mut |c| {
            restored_changes.push(convert_change_to_remote(&restored_arena, c));
        });

        let mut source_changes = Vec::new();
        oplog.change_store.visit_all_changes(&mut |c| {
            source_changes.push(convert_change_to_remote(&oplog.arena, c));
        });

        assert_eq!(restored_changes, source_changes);
    }

    #[test]
    fn test_change_store() {
        let doc = LoroDoc::new_auto_commit();
        doc.set_record_timestamp(true);

        let text = doc.get_text("t");
        text.insert(0, "hello", PosType::Unicode).unwrap();
        doc.commit_then_renew();

        let list = doc.get_list("t");
        list.insert(0, "hello").unwrap();

        test_encode_decode(doc);
    }

    #[test]
    fn test_synced_doc() -> LoroResult<()> {
        let doc_a = LoroDoc::new_auto_commit();
        let doc_b = LoroDoc::new_auto_commit();
        let doc_c = LoroDoc::new_auto_commit();

        // Seed doc_a with the shared root containers.
        {
            let root = doc_a.get_map("root");
            root.insert_container("text", TextHandler::new_detached())?;
            root.insert_container("list", ListHandler::new_detached())?;
            root.insert_container("tree", TreeHandler::new_detached())?;
        }

        // Bring doc_b and doc_c up to doc_a's state.
        {
            let seed = doc_a.export(ExportMode::all_updates()).unwrap();
            doc_b.import(&seed)?;
            doc_c.import(&seed)?;
        }

        // Edits made on doc_b: text and list.
        {
            let root = doc_b.get_map("root");
            let text = root
                .insert_container("text", TextHandler::new_detached())
                .unwrap();
            text.insert(0, "Hello, ", PosType::Unicode)?;

            let list = root
                .insert_container("list", ListHandler::new_detached())
                .unwrap();
            list.push("world")?;
        }

        // Edits made on doc_c: tree and movable list.
        {
            let root = doc_c.get_map("root");
            let tree = root
                .insert_container("tree", TreeHandler::new_detached())
                .unwrap();
            let first_node = tree.create(TreeParentId::Root)?;
            tree.get_meta(first_node)?.insert("key", "value")?;
            let second_node = tree.create(TreeParentId::Root)?;
            tree.move_to(second_node, TreeParentId::Root, 0).unwrap();

            let movable = root
                .insert_container("movable", MovableListHandler::new_detached())
                .unwrap();
            movable.push("item1".into())?;
            movable.push("item2".into())?;
            movable.mov(0, 1)?;
        }

        // Merge both peers' updates back into doc_a, one after the other.
        let updates_from_b = doc_b
            .export(ExportMode::updates(&doc_a.oplog_vv()))
            .unwrap();
        doc_a.import(&updates_from_b)?;

        let updates_from_c = doc_c
            .export(ExportMode::updates(&doc_a.oplog_vv()))
            .unwrap();
        doc_a.import(&updates_from_c)?;

        test_encode_decode(doc_a);
        Ok(())
    }
}