1use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
2use super::{loro_dag::AppDagNodeInner, AppDagNode};
3use crate::sync::Mutex;
4use crate::{
5 arena::SharedArena,
6 change::Change,
7 estimated_size::EstimatedSize,
8 kv_store::KvStore,
9 op::Op,
10 parent::register_container_and_parent_link,
11 version::{Frontiers, ImVersionVector},
12 VersionVector,
13};
14use block_encode::decode_block_range;
15use bytes::Bytes;
16use itertools::Itertools;
17use loro_common::{
18 Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError,
19 LoroResult, PeerID, ID,
20};
21use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
22use once_cell::sync::OnceCell;
23use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
24use std::sync::atomic::AtomicI64;
25use std::{
26 cmp::Ordering,
27 collections::{BTreeMap, VecDeque},
28 ops::{Bound, Deref},
29 sync::Arc,
30};
31use tracing::{info_span, warn};
32mod block_encode;
33mod block_meta_encode;
34pub(super) mod iter;
35
36#[cfg(not(test))]
37const MAX_BLOCK_SIZE: usize = 1024 * 4;
38#[cfg(test)]
39const MAX_BLOCK_SIZE: usize = 128;
40
41#[derive(Debug, Clone)]
61pub struct ChangeStore {
62 inner: Arc<Mutex<ChangeStoreInner>>,
63 arena: SharedArena,
64 external_kv: Arc<Mutex<dyn KvStore>>,
69 external_vv: Arc<Mutex<VersionVector>>,
71 merge_interval: Arc<AtomicI64>,
72}
73
74#[derive(Debug, Clone)]
75struct ChangeStoreInner {
76 start_vv: ImVersionVector,
79 start_frontiers: Frontiers,
81 mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
83}
84
85#[derive(Debug)]
86pub(crate) struct ChangeStoreRollback {
87 old_vv: VersionVector,
88 blocks_before_mutation: BTreeMap<ID, Arc<ChangesBlock>>,
89}
90
91impl ChangeStoreRollback {
92 pub(crate) fn new(old_vv: VersionVector) -> Self {
93 Self {
94 old_vv,
95 blocks_before_mutation: BTreeMap::new(),
96 }
97 }
98
99 fn record_block_before_mutation(&mut self, id: ID, block: Arc<ChangesBlock>) {
100 let old_end = self.old_vv.get(&id.peer).copied().unwrap_or(0);
101 if id.counter >= old_end {
102 return;
103 }
104
105 self.blocks_before_mutation.entry(id).or_insert(block);
106 }
107}
108
109#[derive(Debug, Clone)]
110pub(crate) struct ChangesBlock {
111 peer: PeerID,
112 counter_range: (Counter, Counter),
113 lamport_range: (Lamport, Lamport),
114 estimated_size: usize,
116 flushed: bool,
117 content: ChangesBlockContent,
118}
119
120#[derive(Clone)]
121pub(crate) enum ChangesBlockContent {
122 Changes(Arc<Vec<Change>>),
123 Bytes(ChangesBlockBytes),
124 Both(Arc<Vec<Change>>, ChangesBlockBytes),
125}
126
127#[derive(Clone)]
129pub(crate) struct ChangesBlockBytes {
130 bytes: Bytes,
131 header: OnceCell<Arc<ChangesBlockHeader>>,
132}
133
134pub const START_VV_KEY: &[u8] = b"sv";
135pub const START_FRONTIERS_KEY: &[u8] = b"sf";
136pub const VV_KEY: &[u8] = b"vv";
137pub const FRONTIERS_KEY: &[u8] = b"fr";
138
139impl ChangeStore {
140 pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
141 Self {
142 inner: Arc::new(Mutex::new(ChangeStoreInner {
143 start_vv: ImVersionVector::new(),
144 start_frontiers: Frontiers::default(),
145 mem_parsed_kv: BTreeMap::new(),
146 })),
147 arena: a.clone(),
148 external_vv: Arc::new(Mutex::new(VersionVector::new())),
149 external_kv: Arc::new(Mutex::new(MemKvStore::new(MemKvConfig::default()))),
150 merge_interval,
152 }
153 }
154
155 #[cfg(test)]
156 fn new_for_test() -> Self {
157 Self::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)))
158 }
159
160 pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
161 self.flush_and_compact(vv, frontiers);
162 let mut kv = self.external_kv.lock();
163 kv.export_all()
164 }
165
166 #[tracing::instrument(skip(self), level = "debug")]
167 pub(super) fn export_from(
168 &self,
169 start_vv: &VersionVector,
170 start_frontiers: &Frontiers,
171 latest_vv: &VersionVector,
172 latest_frontiers: &Frontiers,
173 ) -> Bytes {
174 let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
175 for span in latest_vv.sub_iter(start_vv) {
176 for c in self.iter_changes(span) {
179 let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
180 as usize)
181 .min(c.atom_len());
182 let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
183 as usize)
184 .min(c.atom_len());
185
186 if start == end {
187 continue;
188 }
189
190 let ch = c.slice(start, end);
191 new_store.insert_change(ch, false, false);
192 }
193 }
194
195 loro_common::debug!(
196 "start_vv={:?} start_frontiers={:?}",
197 &start_vv,
198 start_frontiers
199 );
200 new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
201 }
202
203 pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
204 let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
205 for span in spans {
206 let mut span = *span;
207 span.normalize_();
208 if span.counter.end <= 0 {
209 continue;
210 }
211
212 span.counter.start = span.counter.start.max(0);
213 span.counter.end = span.counter.end.max(0);
214 if span.counter.start >= span.counter.end {
215 continue;
216 }
217
218 for c in self.iter_changes(span) {
221 let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
222 let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
223 if start == end {
224 continue;
225 }
226
227 let ch = c.slice(start, end);
228 new_store.insert_change(ch, false, false);
229 }
230 }
231
232 encode_blocks_in_store(new_store, &self.arena, w);
233 }
234
235 fn encode_from(
236 &self,
237 start_vv: &VersionVector,
238 start_frontiers: &Frontiers,
239 latest_vv: &VersionVector,
240 latest_frontiers: &Frontiers,
241 ) -> Bytes {
242 {
243 let mut store = self.external_kv.lock();
244 store.set(START_VV_KEY, start_vv.encode().into());
245 store.set(START_FRONTIERS_KEY, start_frontiers.encode().into());
246 let mut inner = self.inner.lock();
247 inner.start_frontiers = start_frontiers.clone();
248 inner.start_vv = ImVersionVector::from_vv(start_vv);
249 }
250 self.flush_and_compact(latest_vv, latest_frontiers);
251 self.external_kv.lock().export_all()
252 }
253
254 pub(crate) fn decode_snapshot_for_updates(
255 bytes: Bytes,
256 arena: &SharedArena,
257 self_vv: &VersionVector,
258 ) -> Result<Vec<Change>, LoroError> {
259 let change_store = ChangeStore::new_mem(arena, Arc::new(AtomicI64::new(0)));
260 let _ = change_store.import_all(bytes)?;
261 let mut changes = Vec::new();
262 change_store.visit_all_changes(&mut |c| {
263 let cnt_threshold = self_vv.get(&c.id.peer).copied().unwrap_or(0);
264 if c.id.counter >= cnt_threshold {
265 changes.push(c.clone());
266 return;
267 }
268
269 let change_end = c.ctr_end();
270 if change_end > cnt_threshold {
271 changes.push(c.slice((cnt_threshold - c.id.counter) as usize, c.atom_len()));
272 }
273 });
274
275 Ok(changes)
276 }
277
278 pub(crate) fn decode_block_bytes(
279 bytes: Bytes,
280 arena: &SharedArena,
281 self_vv: &VersionVector,
282 ) -> LoroResult<Vec<Change>> {
283 let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
284 if ans.is_empty() {
285 return Ok(ans);
286 }
287
288 let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
289 ans.retain_mut(|c| {
290 if c.id.counter >= start {
291 true
292 } else if c.ctr_end() > start {
293 *c = c.slice((start - c.id.counter) as usize, c.atom_len());
294 true
295 } else {
296 false
297 }
298 });
299
300 Ok(ans)
301 }
302
303 pub(crate) fn rollback_import(&self, rollback: ChangeStoreRollback) {
304 let mut inner = self.inner.lock();
305 inner.mem_parsed_kv.retain(|id, _| {
306 let old_end = rollback.old_vv.get(&id.peer).copied().unwrap_or(0);
307 id.counter < old_end
308 });
309
310 for (id, block) in rollback.blocks_before_mutation {
311 inner.mem_parsed_kv.insert(id, block);
312 }
313 }
314
315 pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
316 let block = self.get_block_that_contains(id)?;
317 Some(block.content.iter_dag_nodes())
318 }
319
320 pub fn get_last_dag_nodes_for_peer(&self, peer: PeerID) -> Option<Vec<AppDagNode>> {
321 let block = self.get_the_last_block_of_peer(peer)?;
322 Some(block.content.iter_dag_nodes())
323 }
324
325 pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
326 self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
327 let mut inner = self.inner.lock();
328 for (_, block) in inner.mem_parsed_kv.iter_mut() {
329 block
330 .ensure_changes(&self.arena)
331 .expect("Parse block error");
332 for c in block.content.try_changes().unwrap() {
333 f(c);
334 }
335 }
336 }
337
338 pub(crate) fn iter_blocks(&self, id_span: IdSpan) -> Vec<(Arc<ChangesBlock>, usize, usize)> {
339 if id_span.counter.start == id_span.counter.end {
340 return vec![];
341 }
342
343 assert!(id_span.counter.start < id_span.counter.end);
344 self.ensure_block_loaded_in_range(
345 Bound::Included(id_span.id_start()),
346 Bound::Excluded(id_span.id_end()),
347 );
348 let mut inner = self.inner.lock();
349 let next_back = inner.mem_parsed_kv.range(..=id_span.id_start()).next_back();
350 match next_back {
351 None => {
352 return vec![];
353 }
354 Some(next_back) => {
355 if next_back.0.peer != id_span.peer {
356 return vec![];
357 }
358 }
359 }
360 let start_counter = next_back.map(|(id, _)| id.counter).unwrap_or(0);
361 let ans = inner
362 .mem_parsed_kv
363 .range_mut(
364 ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
365 )
366 .filter_map(|(_id, block)| {
367 if block.counter_range.1 < id_span.counter.start {
368 return None;
369 }
370
371 block
372 .ensure_changes(&self.arena)
373 .expect("Parse block error");
374 let changes = block.content.try_changes().unwrap();
375 let start;
376 let end;
377 if id_span.counter.start <= block.counter_range.0
378 && id_span.counter.end >= block.counter_range.1
379 {
380 start = 0;
381 end = changes.len();
382 } else {
383 start = block
384 .get_change_index_by_counter(id_span.counter.start)
385 .unwrap_or_else(|x| x);
386
387 match block.get_change_index_by_counter(id_span.counter.end - 1) {
388 Ok(e) => {
389 end = e + 1;
390 }
391 Err(0) => return None,
392 Err(e) => {
393 end = e;
394 }
395 }
396 }
397 if start == end {
398 return None;
399 }
400
401 Some((block.clone(), start, end))
402 })
403 .collect_vec();
405
406 ans
407 }
408
409 pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
410 let v = self.iter_blocks(id_span);
411 #[cfg(debug_assertions)]
412 {
413 if !v.is_empty() {
414 assert_eq!(v[0].0.peer, id_span.peer);
415 assert_eq!(v.last().unwrap().0.peer, id_span.peer);
416 {
417 let (block, start, _end) = v.first().unwrap();
419 let changes = block.content.try_changes().unwrap();
420 assert!(changes[*start].id.counter <= id_span.counter.start);
421 }
422 {
423 let (block, _start, end) = v.last().unwrap();
425 let changes = block.content.try_changes().unwrap();
426 assert!(changes[*end - 1].ctr_end() >= id_span.counter.end);
427 assert!(changes[*end - 1].id.counter < id_span.counter.end);
428 }
429 }
430 }
431
432 v.into_iter().flat_map(move |(block, start, end)| {
433 (start..end).map(move |i| BlockChangeRef {
434 change_index: i,
435 block: block.clone(),
436 })
437 })
438 }
439
440 #[allow(dead_code)]
441 pub(crate) fn get_blocks_in_range(&self, id_span: IdSpan) -> VecDeque<Arc<ChangesBlock>> {
442 let mut inner = self.inner.lock();
443 let start_counter = inner
444 .mem_parsed_kv
445 .range(..=id_span.id_start())
446 .next_back()
447 .map(|(id, _)| id.counter)
448 .unwrap_or(0);
449 let vec = inner
450 .mem_parsed_kv
451 .range_mut(
452 ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
453 )
454 .filter_map(|(_id, block)| {
455 if block.counter_range.1 < id_span.counter.start {
456 return None;
457 }
458
459 block
460 .ensure_changes(&self.arena)
461 .expect("Parse block error");
462 Some(block.clone())
463 })
464 .collect();
466 vec
467 }
468
469 pub(crate) fn get_block_that_contains(&self, id: ID) -> Option<Arc<ChangesBlock>> {
470 self.ensure_block_loaded_in_range(Bound::Included(id), Bound::Included(id));
471 let inner = self.inner.lock();
472 let block = inner
473 .mem_parsed_kv
474 .range(..=id)
475 .next_back()
476 .filter(|(_, block)| {
477 block.peer == id.peer
478 && block.counter_range.0 <= id.counter
479 && id.counter < block.counter_range.1
480 })
481 .map(|(_, block)| block.clone());
482
483 block
484 }
485
486 pub(crate) fn get_the_last_block_of_peer(&self, peer: PeerID) -> Option<Arc<ChangesBlock>> {
487 let end_id = ID::new(peer, Counter::MAX);
488 self.ensure_id_lte(end_id);
489 let inner = self.inner.lock();
490 let block = inner
491 .mem_parsed_kv
492 .range(..=end_id)
493 .next_back()
494 .filter(|(_, block)| block.peer == peer)
495 .map(|(_, block)| block.clone());
496
497 block
498 }
499
500 pub fn change_num(&self) -> usize {
501 self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
502 let mut inner = self.inner.lock();
503 inner
504 .mem_parsed_kv
505 .iter_mut()
506 .map(|(_, block)| block.change_num())
507 .sum()
508 }
509
510 pub fn fork(
511 &self,
512 arena: SharedArena,
513 merge_interval: Arc<AtomicI64>,
514 vv: &VersionVector,
515 frontiers: &Frontiers,
516 ) -> Self {
517 self.flush_and_compact(vv, frontiers);
518 let inner = self.inner.lock();
519 Self {
520 inner: Arc::new(Mutex::new(ChangeStoreInner {
521 start_vv: inner.start_vv.clone(),
522 start_frontiers: inner.start_frontiers.clone(),
523 mem_parsed_kv: BTreeMap::new(),
524 })),
525 arena,
526 external_vv: Arc::new(Mutex::new(self.external_vv.lock().clone())),
527 external_kv: self.external_kv.lock().clone_store(),
528 merge_interval,
529 }
530 }
531
532 pub fn kv_size(&self) -> usize {
533 self.external_kv
534 .lock()
535 .scan(Bound::Unbounded, Bound::Unbounded)
536 .map(|(k, v)| k.len() + v.len())
537 .sum()
538 }
539
540 pub(crate) fn export_blocks_from<W: std::io::Write>(
541 &self,
542 start_vv: &VersionVector,
543 shallow_since_vv: &ImVersionVector,
544 latest_vv: &VersionVector,
545 w: &mut W,
546 ) {
547 let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
548 for mut span in latest_vv.sub_iter(start_vv) {
549 let counter_lower_bound = shallow_since_vv.get(&span.peer).copied().unwrap_or(0);
550 span.counter.start = span.counter.start.max(counter_lower_bound);
551 span.counter.end = span.counter.end.max(counter_lower_bound);
552 if span.counter.start >= span.counter.end {
553 continue;
554 }
555
556 for c in self.iter_changes(span) {
559 let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
560 as usize)
561 .min(c.atom_len());
562 let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
563 as usize)
564 .min(c.atom_len());
565
566 assert_ne!(start, end);
567 let ch = c.slice(start, end);
568 new_store.insert_change(ch, false, false);
569 }
570 }
571
572 let arena = &self.arena;
573 encode_blocks_in_store(new_store, arena, w);
574 }
575
576 pub(crate) fn fork_changes_up_to(
577 &self,
578 start_vv: &ImVersionVector,
579 frontiers: &Frontiers,
580 vv: &VersionVector,
581 ) -> Bytes {
582 let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
583 for mut span in vv.sub_iter_im(start_vv) {
584 let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
585 span.counter.start = span.counter.start.max(counter_lower_bound);
586 span.counter.end = span.counter.end.max(counter_lower_bound);
587 if span.counter.start >= span.counter.end {
588 continue;
589 }
590
591 for c in self.iter_changes(span) {
594 let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
595 as usize)
596 .min(c.atom_len());
597 let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
598 as usize)
599 .min(c.atom_len());
600
601 assert_ne!(start, end);
602 let ch = c.slice(start, end);
603 new_store.insert_change(ch, false, false);
604 }
605 }
606
607 new_store.encode_all(vv, frontiers)
608 }
609}
610
611fn encode_blocks_in_store<W: std::io::Write>(
612 new_store: ChangeStore,
613 arena: &SharedArena,
614 w: &mut W,
615) {
616 let mut inner = new_store.inner.lock();
617 for (_id, block) in inner.mem_parsed_kv.iter_mut() {
618 let bytes = block.to_bytes(arena);
619 leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
620 w.write_all(&bytes.bytes).unwrap();
621 }
622}
623
624mod mut_external_kv {
625 use super::*;
628
629 impl ChangeStore {
630 #[tracing::instrument(skip_all, level = "debug", name = "change_store import_all")]
631 pub(crate) fn import_all(&self, bytes: Bytes) -> Result<BatchDecodeInfo, LoroError> {
632 let mut kv_store = self.external_kv.lock();
633 assert!(
634 kv_store.len() <= 2,
636 "kv store should be empty when using decode_all"
637 );
638 kv_store
639 .import_all(bytes)
640 .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
641 drop(kv_store);
642 self.validate_imported_change_blocks()?;
643 let vv_bytes = self.external_kv.lock().get(VV_KEY).unwrap_or_default();
644 let vv = VersionVector::decode(&vv_bytes)
645 .map_err(|_| LoroError::DecodeDataCorruptionError)?;
646 let start_vv_bytes = self
647 .external_kv
648 .lock()
649 .get(START_VV_KEY)
650 .unwrap_or_default();
651 let start_vv = if start_vv_bytes.is_empty() {
652 Default::default()
653 } else {
654 VersionVector::decode(&start_vv_bytes)
655 .map_err(|_| LoroError::DecodeDataCorruptionError)?
656 };
657
658 #[cfg(test)]
659 {
660 for (peer, cnt) in vv.iter() {
662 self.get_change(ID::new(*peer, *cnt - 1))
663 .ok_or(LoroError::DecodeDataCorruptionError)?;
664 }
665 }
666
667 *self.external_vv.lock() = vv.clone();
668 let frontiers_bytes = self
669 .external_kv
670 .lock()
671 .get(FRONTIERS_KEY)
672 .unwrap_or_default();
673 let frontiers = Frontiers::decode(&frontiers_bytes)
674 .map_err(|_| LoroError::DecodeDataCorruptionError)?;
675 let start_frontiers = self
676 .external_kv
677 .lock()
678 .get(START_FRONTIERS_KEY)
679 .unwrap_or_default();
680 let start_frontiers = if start_frontiers.is_empty() {
681 Default::default()
682 } else {
683 Frontiers::decode(&start_frontiers)
684 .map_err(|_| LoroError::DecodeDataCorruptionError)?
685 };
686
687 let mut max_lamport = None;
688 let mut max_timestamp = 0;
689 for id in frontiers.iter() {
690 let c = self
691 .get_change(id)
692 .ok_or(LoroError::DecodeDataCorruptionError)?;
693 debug_assert_ne!(c.atom_len(), 0);
694 let l = c.lamport_last();
695 if let Some(x) = max_lamport {
696 if l > x {
697 max_lamport = Some(l);
698 }
699 } else {
700 max_lamport = Some(l);
701 }
702
703 let t = c.timestamp;
704 if t > max_timestamp {
705 max_timestamp = t;
706 }
707 }
708
709 Ok(BatchDecodeInfo {
710 vv,
711 frontiers,
712 start_version: if start_vv.is_empty() {
713 None
714 } else {
715 let mut inner = self.inner.lock();
716 inner.start_frontiers = start_frontiers.clone();
717 inner.start_vv = ImVersionVector::from_vv(&start_vv);
718 Some((start_vv, start_frontiers))
719 },
720 })
721 }
722
723 fn validate_imported_change_blocks(&self) -> LoroResult<()> {
724 let blocks: Vec<(ID, Bytes)> = {
725 let kv_store = self.external_kv.lock();
726 kv_store
727 .scan(Bound::Unbounded, Bound::Unbounded)
728 .filter(|(id, _)| id.len() == 12)
729 .map(|(id, bytes)| (ID::from_bytes(&id), bytes))
730 .collect()
731 };
732
733 for (_id, bytes) in blocks {
734 let mut block = Arc::new(ChangesBlock::from_bytes(bytes)?);
735 block.ensure_changes(&self.arena)?;
736 }
737
738 Ok(())
739 }
740
741 pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
743 let mut inner = self.inner.lock();
744 let mut store = self.external_kv.lock();
745 let mut external_vv = self.external_vv.lock();
746 for (id, block) in inner.mem_parsed_kv.iter_mut() {
747 if !block.flushed {
748 let id_bytes = id.to_bytes();
749 let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
750 assert!(
751 counter_start < block.counter_range.1,
752 "Peer={} Block Counter Range={:?}, counter_start={}",
753 id.peer,
754 &block.counter_range,
755 counter_start
756 );
757 if counter_start > block.counter_range.0 {
758 assert!(store.get(&id_bytes).is_some());
759 }
760 external_vv.insert(id.peer, block.counter_range.1);
761 let bytes = block.to_bytes(&self.arena);
762 store.set(&id_bytes, bytes.bytes);
763 Arc::make_mut(block).flushed = true;
764 }
765 }
766
767 if inner.start_vv.is_empty() {
768 assert_eq!(&*external_vv, vv);
769 } else {
770 #[cfg(debug_assertions)]
771 {
772 }
774 }
775 let vv_bytes = vv.encode();
776 let frontiers_bytes = frontiers.encode();
777 store.set(VV_KEY, vv_bytes.into());
778 store.set(FRONTIERS_KEY, frontiers_bytes.into());
779 }
780 }
781}
782
783mod mut_inner_kv {
784 use super::*;
788 impl ChangeStore {
789 pub fn insert_change(&self, change: Change, split_when_exceeds: bool, is_local: bool) {
794 self.insert_change_inner(change, split_when_exceeds, is_local, None);
795 }
796
797 pub(crate) fn insert_change_with_rollback(
798 &self,
799 change: Change,
800 split_when_exceeds: bool,
801 is_local: bool,
802 rollback: &mut ChangeStoreRollback,
803 ) {
804 self.insert_change_inner(change, split_when_exceeds, is_local, Some(rollback));
805 }
806
807 fn insert_change_inner(
808 &self,
809 mut change: Change,
810 split_when_exceeds: bool,
811 is_local: bool,
812 mut rollback: Option<&mut ChangeStoreRollback>,
813 ) {
814 #[cfg(debug_assertions)]
815 {
816 let vv = self.external_vv.lock();
817 assert!(vv.get(&change.id.peer).copied().unwrap_or(0) <= change.id.counter);
818 }
819
820 let s = info_span!("change_store insert_change", id = ?change.id);
821 let _e = s.enter();
822 let estimated_size = change.estimate_storage_size();
823 if estimated_size > MAX_BLOCK_SIZE && split_when_exceeds {
824 self.split_change_then_insert(change, rollback.as_deref_mut());
825 return;
826 }
827
828 let id = change.id;
829 let mut inner = self.inner.lock();
830
831 if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..id).next_back() {
833 if block.peer == change.id.peer {
834 if block.counter_range.1 != change.id.counter {
835 panic!("counter should be continuous")
836 }
837
838 if let Some(rollback) = &mut rollback {
839 rollback.record_block_before_mutation(*_id, block.clone());
840 }
841
842 match block.push_change(
843 change,
844 estimated_size,
845 if is_local {
846 self.merge_interval
849 .load(std::sync::atomic::Ordering::Acquire)
850 } else {
851 0
852 },
853 &self.arena,
854 ) {
855 Ok(_) => {
856 drop(inner);
857 debug_assert!(self.get_change(id).is_some());
858 return;
859 }
860 Err(c) => change = c,
861 }
862 }
863 }
864
865 inner
866 .mem_parsed_kv
867 .insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
868 drop(inner);
869 debug_assert!(self.get_change(id).is_some());
870 }
871
872 pub fn get_change(&self, id: ID) -> Option<BlockChangeRef> {
873 let block = self.get_parsed_block(id)?;
874 Some(BlockChangeRef {
875 change_index: block.get_change_index_by_counter(id.counter).unwrap(),
876 block: block.clone(),
877 })
878 }
879
880 pub fn get_change_by_lamport_lte(&self, idlp: IdLp) -> Option<BlockChangeRef> {
884 let mut inner = self.inner.lock();
887 let mut iter = inner
888 .mem_parsed_kv
889 .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));
890
891 let mut lower_bound = 0;
893 let mut upper_bound = i32::MAX;
894 let mut is_binary_searching = false;
895 loop {
896 match iter.next_back() {
897 Some((&id, block)) => {
898 if block.lamport_range.0 <= idlp.lamport
899 && (!is_binary_searching || idlp.lamport < block.lamport_range.1)
900 {
901 if !is_binary_searching
902 && upper_bound != i32::MAX
903 && upper_bound != block.counter_range.1
904 {
905 warn!(
906 "There is a hole between the last block and the current block"
907 );
908 break;
911 }
912
913 block
915 .ensure_changes(&self.arena)
916 .expect("Parse block error");
917 let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
918 return Some(BlockChangeRef {
919 change_index: index,
920 block: block.clone(),
921 });
922 }
923
924 if is_binary_searching {
925 let mid_bound = (lower_bound + upper_bound) / 2;
926 if block.lamport_range.1 <= idlp.lamport {
927 lower_bound = mid_bound;
929 } else {
930 debug_assert!(
931 idlp.lamport < block.lamport_range.0,
932 "{} {:?}",
933 idlp,
934 &block.lamport_range
935 );
936 upper_bound = mid_bound;
938 }
939
940 let mid_bound = (lower_bound + upper_bound) / 2;
941 iter = inner
942 .mem_parsed_kv
943 .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
944 } else {
945 if block.lamport_range.0 - idlp.lamport > MAX_BLOCK_SIZE as Lamport * 8
947 {
948 upper_bound = id.counter;
950 let mid_bound = (lower_bound + upper_bound) / 2;
951 iter = inner.mem_parsed_kv.range_mut(
952 ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound),
953 );
954 is_binary_searching = true;
955 }
956
957 upper_bound = id.counter;
958 }
959 }
960 None => {
961 if !is_binary_searching {
962 break;
963 }
964
965 let mid_bound = (lower_bound + upper_bound) / 2;
966 lower_bound = mid_bound;
967 if upper_bound - lower_bound <= MAX_BLOCK_SIZE as i32 {
968 iter = inner.mem_parsed_kv.range_mut(
970 ID::new(idlp.peer, lower_bound)..ID::new(idlp.peer, upper_bound),
971 );
972 is_binary_searching = false;
973 } else {
974 let mid_bound = (lower_bound + upper_bound) / 2;
975 iter = inner
976 .mem_parsed_kv
977 .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
978 }
979 }
980 }
981 }
982
983 let counter_end = upper_bound;
984 let scan_end = ID::new(idlp.peer, counter_end).to_bytes();
985
986 let (id, bytes) = 'block_scan: {
987 let kv_store = &self.external_kv.lock();
988 let iter = kv_store
989 .scan(
990 Bound::Included(&ID::new(idlp.peer, 0).to_bytes()),
991 Bound::Excluded(&scan_end),
992 )
993 .rev();
994
995 for (id, bytes) in iter {
996 let mut block = ChangesBlockBytes::new(bytes.clone());
997 let (lamport_start, _lamport_end) = block.lamport_range();
998 if lamport_start <= idlp.lamport {
999 break 'block_scan (id, bytes);
1000 }
1001 }
1002
1003 return None;
1004 };
1005
1006 let block_id = ID::from_bytes(&id);
1007 let mut block = Arc::new(
1008 ChangesBlock::from_bytes(bytes)
1009 .expect("validated external change block should decode"),
1010 );
1011 block
1012 .ensure_changes(&self.arena)
1013 .expect("Parse block error");
1014 inner.mem_parsed_kv.insert(block_id, block.clone());
1015 let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
1016 Some(BlockChangeRef {
1017 change_index: index,
1018 block,
1019 })
1020 }
1021
1022 fn split_change_then_insert(
1023 &self,
1024 change: Change,
1025 mut rollback: Option<&mut ChangeStoreRollback>,
1026 ) {
1027 let original_len = change.atom_len();
1028 let mut new_change = Change {
1029 ops: RleVec::new(),
1030 deps: change.deps,
1031 id: change.id,
1032 lamport: change.lamport,
1033 timestamp: change.timestamp,
1034 commit_msg: change.commit_msg.clone(),
1035 };
1036
1037 let mut total_len = 0;
1038 let mut estimated_size = new_change.estimate_storage_size();
1039 'outer: for mut op in change.ops.into_iter() {
1040 if op.estimate_storage_size() >= MAX_BLOCK_SIZE - estimated_size {
1041 new_change = self._insert_splitted_change(
1042 new_change,
1043 &mut total_len,
1044 &mut estimated_size,
1045 rollback.as_deref_mut(),
1046 );
1047 }
1048
1049 while let Some(end) =
1050 op.check_whether_slice_content_to_fit_in_size(MAX_BLOCK_SIZE - estimated_size)
1051 {
1052 let new = op.slice(0, end);
1054 new_change.ops.push(new);
1055 new_change = self._insert_splitted_change(
1056 new_change,
1057 &mut total_len,
1058 &mut estimated_size,
1059 rollback.as_deref_mut(),
1060 );
1061
1062 if end < op.atom_len() {
1063 op = op.slice(end, op.atom_len());
1064 } else {
1065 continue 'outer;
1066 }
1067 }
1068
1069 estimated_size += op.estimate_storage_size();
1070 if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
1071 new_change = self._insert_splitted_change(
1072 new_change,
1073 &mut total_len,
1074 &mut estimated_size,
1075 rollback.as_deref_mut(),
1076 );
1077 new_change.ops.push(op);
1078 } else {
1079 new_change.ops.push(op);
1080 }
1081 }
1082
1083 if !new_change.ops.is_empty() {
1084 total_len += new_change.atom_len();
1085 self.insert_change_inner(new_change, false, false, rollback);
1086 }
1087
1088 assert_eq!(total_len, original_len);
1089 }
1090
1091 fn _insert_splitted_change(
1092 &self,
1093 new_change: Change,
1094 total_len: &mut usize,
1095 estimated_size: &mut usize,
1096 rollback: Option<&mut ChangeStoreRollback>,
1097 ) -> Change {
1098 if new_change.atom_len() == 0 {
1099 return new_change;
1100 }
1101
1102 let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
1103 let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
1104 *total_len += new_change.atom_len();
1105 let ans = Change {
1106 ops: RleVec::new(),
1107 deps: ID::new(new_change.id.peer, ctr_end - 1).into(),
1108 id: ID::new(new_change.id.peer, ctr_end),
1109 lamport: next_lamport,
1110 timestamp: new_change.timestamp,
1111 commit_msg: new_change.commit_msg.clone(),
1112 };
1113
1114 self.insert_change_inner(new_change, false, false, rollback);
1115 *estimated_size = ans.estimate_storage_size();
1116 ans
1117 }
1118
1119 fn get_parsed_block(&self, id: ID) -> Option<Arc<ChangesBlock>> {
1120 let mut inner = self.inner.lock();
1121 if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
1122 if block.peer == id.peer && block.counter_range.1 > id.counter {
1123 block
1124 .ensure_changes(&self.arena)
1125 .expect("Parse block error");
1126 return Some(block.clone());
1127 }
1128 }
1129
1130 let store = self.external_kv.lock();
1131 let mut iter = store
1132 .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1133 .filter(|(id, _)| id.len() == 12);
1134
1135 let (b_id, b_bytes) = iter.next_back()?;
1146 let block_id: ID = ID::from_bytes(&b_id[..]);
1147 let block = ChangesBlock::from_bytes(b_bytes)
1148 .expect("validated external change block should decode");
1149 if block_id.peer == id.peer
1150 && block_id.counter <= id.counter
1151 && block.counter_range.1 > id.counter
1152 {
1153 let mut arc_block = Arc::new(block);
1154 arc_block
1155 .ensure_changes(&self.arena)
1156 .expect("Parse block error");
1157 inner.mem_parsed_kv.insert(block_id, arc_block.clone());
1158 return Some(arc_block);
1159 }
1160
1161 None
1162 }
1163
1164 pub(super) fn ensure_block_loaded_in_range(&self, start: Bound<ID>, end: Bound<ID>) {
1169 let mut whether_need_scan_backward = match start {
1170 Bound::Included(id) => Some(id),
1171 Bound::Excluded(id) => Some(id.inc(1)),
1172 Bound::Unbounded => None,
1173 };
1174
1175 {
1176 let start = start.map(|id| id.to_bytes());
1177 let end = end.map(|id| id.to_bytes());
1178 let kv = self.external_kv.lock();
1179 let mut inner = self.inner.lock();
1180 for (id, bytes) in kv
1181 .scan(
1182 start.as_ref().map(|x| x.as_slice()),
1183 end.as_ref().map(|x| x.as_slice()),
1184 )
1185 .filter(|(id, _)| id.len() == 12)
1186 {
1187 let id = ID::from_bytes(&id);
1188 if let Some(expected_start_id) = whether_need_scan_backward {
1189 if id == expected_start_id {
1190 whether_need_scan_backward = None;
1191 }
1192 }
1193
1194 if inner.mem_parsed_kv.contains_key(&id) {
1195 continue;
1196 }
1197
1198 let block = ChangesBlock::from_bytes(bytes.clone())
1199 .expect("validated external change block should decode");
1200 inner.mem_parsed_kv.insert(id, Arc::new(block));
1201 }
1202 }
1203
1204 if let Some(start_id) = whether_need_scan_backward {
1205 self.ensure_id_lte(start_id);
1206 }
1207 }
1208
1209 pub(super) fn ensure_id_lte(&self, id: ID) {
1210 let kv = self.external_kv.lock();
1211 let mut inner = self.inner.lock();
1212 let Some((next_back_id, next_back_bytes)) = kv
1213 .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1214 .rfind(|(id, _)| id.len() == 12)
1215 else {
1216 return;
1217 };
1218
1219 let next_back_id = ID::from_bytes(&next_back_id);
1220 if next_back_id.peer == id.peer {
1221 if inner.mem_parsed_kv.contains_key(&next_back_id) {
1222 return;
1223 }
1224
1225 let block = ChangesBlock::from_bytes(next_back_bytes)
1226 .expect("validated external change block should decode");
1227 inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
1228 }
1229 }
1230 }
1231}
1232
1233#[must_use]
1234#[derive(Clone, Debug)]
1235pub(crate) struct BatchDecodeInfo {
1236 pub vv: VersionVector,
1237 pub frontiers: Frontiers,
1238 pub start_version: Option<(VersionVector, Frontiers)>,
1239}
1240
1241#[derive(Clone, Debug)]
1242pub struct BlockChangeRef {
1243 block: Arc<ChangesBlock>,
1244 change_index: usize,
1245}
1246
1247impl Deref for BlockChangeRef {
1248 type Target = Change;
1249 fn deref(&self) -> &Change {
1250 &self.block.content.try_changes().unwrap()[self.change_index]
1251 }
1252}
1253
1254impl BlockChangeRef {
1255 pub(crate) fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
1256 if counter >= self.ctr_end() {
1257 return None;
1258 }
1259
1260 let index = self.ops.search_atom_index(counter);
1261 Some(BlockOpRef {
1262 block: self.block.clone(),
1263 change_index: self.change_index,
1264 op_index: index,
1265 })
1266 }
1267}
1268
1269#[derive(Clone, Debug)]
1270pub(crate) struct BlockOpRef {
1271 pub block: Arc<ChangesBlock>,
1272 pub change_index: usize,
1273 pub op_index: usize,
1274}
1275
1276impl Deref for BlockOpRef {
1277 type Target = Op;
1278
1279 fn deref(&self) -> &Op {
1280 &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index]
1281 }
1282}
1283
1284impl BlockOpRef {
1285 pub fn lamport(&self) -> Lamport {
1286 let change = &self.block.content.try_changes().unwrap()[self.change_index];
1287 let op = &change.ops[self.op_index];
1288 (op.counter - change.id.counter) as Lamport + change.lamport
1289 }
1290}
1291
1292impl ChangesBlock {
1293 fn from_bytes(bytes: Bytes) -> LoroResult<Self> {
1294 let len = bytes.len();
1295 let bytes = ChangesBlockBytes::new(bytes);
1296 bytes.ensure_header()?;
1297 let header = bytes
1298 .header
1299 .get()
1300 .expect("header should be initialized after ensure_header");
1301 let peer = header.peer;
1302 let counter_range = (
1303 header.counter,
1304 *header.counters.last().ok_or_else(|| {
1305 LoroError::DecodeError("Decode block error: missing counters".into())
1306 })?,
1307 );
1308 let lamport_range = (
1309 *header.lamports.first().ok_or_else(|| {
1310 LoroError::DecodeError("Decode block error: missing lamports".into())
1311 })?,
1312 *header.lamports.last().ok_or_else(|| {
1313 LoroError::DecodeError("Decode block error: missing lamports".into())
1314 })?,
1315 );
1316 let content = ChangesBlockContent::Bytes(bytes);
1317 Ok(Self {
1318 peer,
1319 estimated_size: len,
1320 counter_range,
1321 lamport_range,
1322 flushed: true,
1323 content,
1324 })
1325 }
1326
1327 #[allow(dead_code)]
1328 pub(crate) fn content(&self) -> &ChangesBlockContent {
1329 &self.content
1330 }
1331
1332 fn new(change: Change, _a: &SharedArena) -> Self {
1333 let atom_len = change.atom_len();
1334 let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
1335 let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
1336 let estimated_size = change.estimate_storage_size();
1337 let peer = change.id.peer;
1338 let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
1339 Self {
1340 peer,
1341 counter_range,
1342 lamport_range,
1343 estimated_size,
1344 content,
1345 flushed: false,
1346 }
1347 }
1348
1349 #[allow(unused)]
1350 fn cmp_id(&self, id: ID) -> Ordering {
1351 self.peer.cmp(&id.peer).then_with(|| {
1352 if self.counter_range.0 > id.counter {
1353 Ordering::Greater
1354 } else if self.counter_range.1 <= id.counter {
1355 Ordering::Less
1356 } else {
1357 Ordering::Equal
1358 }
1359 })
1360 }
1361
1362 #[allow(unused)]
1363 fn cmp_idlp(&self, idlp: (PeerID, Lamport)) -> Ordering {
1364 self.peer.cmp(&idlp.0).then_with(|| {
1365 if self.lamport_range.0 > idlp.1 {
1366 Ordering::Greater
1367 } else if self.lamport_range.1 <= idlp.1 {
1368 Ordering::Less
1369 } else {
1370 Ordering::Equal
1371 }
1372 })
1373 }
1374
1375 #[allow(unused)]
1376 fn is_full(&self) -> bool {
1377 self.estimated_size > MAX_BLOCK_SIZE
1378 }
1379
1380 #[allow(clippy::result_large_err)]
1381 fn push_change(
1382 self: &mut Arc<Self>,
1383 change: Change,
1384 new_change_size: usize,
1385 merge_interval: i64,
1386 a: &SharedArena,
1387 ) -> Result<(), Change> {
1388 if self.counter_range.1 != change.id.counter {
1389 return Err(change);
1390 }
1391
1392 let atom_len = change.atom_len();
1393 let next_lamport = change.lamport + atom_len as Lamport;
1394 let next_counter = change.id.counter + atom_len as Counter;
1395
1396 let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
1397 let this = Arc::make_mut(self);
1398 let changes = this.content.changes_mut(a).unwrap();
1399 let changes = Arc::make_mut(changes);
1400 match changes.last_mut() {
1401 Some(last)
1402 if last.can_merge_right(&change, merge_interval)
1403 && (!is_full
1404 || (change.ops.len() == 1
1405 && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
1406 {
1407 for op in change.ops.into_iter() {
1408 let size = op.estimate_storage_size();
1409 if !last.ops.push(op) {
1410 this.estimated_size += size;
1411 }
1412 }
1413 }
1414 _ => {
1415 if is_full {
1416 return Err(change);
1417 } else {
1418 this.estimated_size += new_change_size;
1419 changes.push(change);
1420 }
1421 }
1422 }
1423
1424 this.flushed = false;
1425 this.counter_range.1 = next_counter;
1426 this.lamport_range.1 = next_lamport;
1427 Ok(())
1428 }
1429
1430 fn to_bytes(self: &mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
1431 match &self.content {
1432 ChangesBlockContent::Bytes(bytes) => bytes.clone(),
1433 ChangesBlockContent::Both(_, bytes) => {
1434 let bytes = bytes.clone();
1435 let this = Arc::make_mut(self);
1436 this.content = ChangesBlockContent::Bytes(bytes.clone());
1437 bytes
1438 }
1439 ChangesBlockContent::Changes(changes) => {
1440 let bytes = ChangesBlockBytes::serialize(changes, a);
1441 let this = Arc::make_mut(self);
1442 this.content = ChangesBlockContent::Bytes(bytes.clone());
1443 bytes
1444 }
1445 }
1446 }
1447
1448 fn ensure_changes(self: &mut Arc<Self>, a: &SharedArena) -> LoroResult<()> {
1449 match &self.content {
1450 ChangesBlockContent::Changes(_) => Ok(()),
1451 ChangesBlockContent::Both(_, _) => Ok(()),
1452 ChangesBlockContent::Bytes(bytes) => {
1453 let changes = bytes.parse(a)?;
1454 let b = bytes.clone();
1455 let this = Arc::make_mut(self);
1456 this.content = ChangesBlockContent::Both(Arc::new(changes), b);
1457 Ok(())
1458 }
1459 }
1460 }
1461
1462 fn get_change_index_by_counter(&self, counter: Counter) -> Result<usize, usize> {
1463 let changes = self.content.try_changes().unwrap();
1464 changes.binary_search_by(|c| {
1465 if c.id.counter > counter {
1466 Ordering::Greater
1467 } else if (c.id.counter + c.content_len() as Counter) <= counter {
1468 Ordering::Less
1469 } else {
1470 Ordering::Equal
1471 }
1472 })
1473 }
1474
1475 fn get_change_index_by_lamport_lte(&self, lamport: Lamport) -> Option<usize> {
1476 let changes = self.content.try_changes().unwrap();
1477 let r = changes.binary_search_by(|c| {
1478 if c.lamport > lamport {
1479 Ordering::Greater
1480 } else if (c.lamport + c.content_len() as Lamport) <= lamport {
1481 Ordering::Less
1482 } else {
1483 Ordering::Equal
1484 }
1485 });
1486
1487 match r {
1488 Ok(found) => Some(found),
1489 Err(idx) => {
1490 if idx == 0 {
1491 None
1492 } else {
1493 Some(idx - 1)
1494 }
1495 }
1496 }
1497 }
1498
1499 #[allow(unused)]
1500 fn get_changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
1501 self.content.changes(a)
1502 }
1503
1504 #[allow(unused)]
1505 fn id(&self) -> ID {
1506 ID::new(self.peer, self.counter_range.0)
1507 }
1508
1509 pub fn change_num(&self) -> usize {
1510 match &self.content {
1511 ChangesBlockContent::Changes(c) => c.len(),
1512 ChangesBlockContent::Bytes(b) => b.len_changes(),
1513 ChangesBlockContent::Both(c, _) => c.len(),
1514 }
1515 }
1516}
1517
1518impl ChangesBlockContent {
1519 pub fn iter_dag_nodes(&self) -> Vec<AppDagNode> {
1521 let mut dag_nodes = Vec::new();
1522 match self {
1523 ChangesBlockContent::Changes(c) | ChangesBlockContent::Both(c, _) => {
1524 for change in c.iter() {
1525 let new_node = AppDagNodeInner {
1526 peer: change.id.peer,
1527 cnt: change.id.counter,
1528 lamport: change.lamport,
1529 deps: change.deps.clone(),
1530 vv: OnceCell::new(),
1531 has_succ: false,
1532 len: change.atom_len(),
1533 }
1534 .into();
1535
1536 dag_nodes.push_rle_element(new_node);
1537 }
1538 }
1539 ChangesBlockContent::Bytes(b) => {
1540 b.ensure_header().unwrap();
1541 let header = b.header.get().unwrap();
1542 let n = header.n_changes;
1543 for i in 0..n {
1544 let new_node = AppDagNodeInner {
1545 peer: header.peer,
1546 cnt: header.counters[i],
1547 lamport: header.lamports[i],
1548 deps: header.deps_groups[i].clone(),
1549 vv: OnceCell::new(),
1550 has_succ: false,
1551 len: (header.counters[i + 1] - header.counters[i]) as usize,
1552 }
1553 .into();
1554
1555 dag_nodes.push_rle_element(new_node);
1556 }
1557 }
1558 }
1559
1560 dag_nodes
1561 }
1562
1563 #[allow(unused)]
1564 pub fn changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
1565 match self {
1566 ChangesBlockContent::Changes(changes) => Ok(changes),
1567 ChangesBlockContent::Both(changes, _) => Ok(changes),
1568 ChangesBlockContent::Bytes(bytes) => {
1569 let changes = bytes.parse(a)?;
1570 *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
1571 self.changes(a)
1572 }
1573 }
1574 }
1575
1576 fn changes_mut(&mut self, a: &SharedArena) -> LoroResult<&mut Arc<Vec<Change>>> {
1578 match self {
1579 ChangesBlockContent::Changes(changes) => Ok(changes),
1580 ChangesBlockContent::Both(changes, _) => {
1581 *self = ChangesBlockContent::Changes(std::mem::take(changes));
1582 self.changes_mut(a)
1583 }
1584 ChangesBlockContent::Bytes(bytes) => {
1585 let changes = bytes.parse(a)?;
1586 *self = ChangesBlockContent::Changes(Arc::new(changes));
1587 self.changes_mut(a)
1588 }
1589 }
1590 }
1591
1592 pub(crate) fn try_changes(&self) -> Option<&Vec<Change>> {
1593 match self {
1594 ChangesBlockContent::Changes(changes) => Some(changes),
1595 ChangesBlockContent::Both(changes, _) => Some(changes),
1596 ChangesBlockContent::Bytes(_) => None,
1597 }
1598 }
1599
1600 #[allow(dead_code)]
1601 pub(crate) fn len_changes(&self) -> usize {
1602 match self {
1603 ChangesBlockContent::Changes(changes) => changes.len(),
1604 ChangesBlockContent::Both(changes, _) => changes.len(),
1605 ChangesBlockContent::Bytes(bytes) => bytes.len_changes(),
1606 }
1607 }
1608}
1609
1610impl std::fmt::Debug for ChangesBlockContent {
1611 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1612 match self {
1613 ChangesBlockContent::Changes(changes) => f
1614 .debug_tuple("ChangesBlockContent::Changes")
1615 .field(changes)
1616 .finish(),
1617 ChangesBlockContent::Bytes(_bytes) => {
1618 f.debug_tuple("ChangesBlockContent::Bytes").finish()
1619 }
1620 ChangesBlockContent::Both(changes, _bytes) => f
1621 .debug_tuple("ChangesBlockContent::Both")
1622 .field(changes)
1623 .finish(),
1624 }
1625 }
1626}
1627
1628impl ChangesBlockBytes {
1629 fn new(bytes: Bytes) -> Self {
1630 Self {
1631 header: OnceCell::new(),
1632 bytes,
1633 }
1634 }
1635
1636 fn ensure_header(&self) -> LoroResult<()> {
1637 self.header
1638 .get_or_try_init(|| decode_header(&self.bytes).map(Arc::new))?;
1639 Ok(())
1640 }
1641
1642 fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
1643 self.ensure_header()?;
1644 let ans: Vec<Change> = decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))?;
1645 for c in ans.iter() {
1646 register_container_and_parent_link(a, c)
1648 }
1649
1650 Ok(ans)
1651 }
1652
1653 fn serialize(changes: &[Change], a: &SharedArena) -> Self {
1654 let bytes = encode_block(changes, a);
1655 let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
1657 bytes.ensure_header().unwrap();
1658 bytes
1659 }
1660
1661 fn lamport_range(&mut self) -> (Lamport, Lamport) {
1662 if let Some(header) = self.header.get() {
1663 (header.lamports[0], *header.lamports.last().unwrap())
1664 } else {
1665 decode_block_range(&self.bytes).unwrap().1
1666 }
1667 }
1668
1669 fn len_changes(&self) -> usize {
1671 self.ensure_header().unwrap();
1672 self.header.get().unwrap().n_changes
1673 }
1674}
1675
1676#[cfg(test)]
1677mod test {
1678 use crate::cursor::PosType;
1679 use crate::{
1680 loro::ExportMode, oplog::convert_change_to_remote, state::TreeParentId, ListHandler,
1681 LoroDoc, MovableListHandler, TextHandler, TreeHandler,
1682 };
1683
1684 use super::*;
1685
1686 fn test_encode_decode(doc: LoroDoc) {
1687 doc.commit_then_renew();
1688 let oplog = doc.oplog().lock();
1689 let bytes = oplog
1690 .change_store
1691 .encode_all(oplog.vv(), oplog.dag.frontiers());
1692 let store = ChangeStore::new_for_test();
1693 let _ = store.import_all(bytes.clone()).unwrap();
1694 assert_eq!(store.external_kv.lock().export_all(), bytes);
1695 let mut changes_parsed = Vec::new();
1696 let a = store.arena.clone();
1697 store.visit_all_changes(&mut |c| {
1698 changes_parsed.push(convert_change_to_remote(&a, c));
1699 });
1700 let mut changes = Vec::new();
1701 oplog.change_store.visit_all_changes(&mut |c| {
1702 changes.push(convert_change_to_remote(&oplog.arena, c));
1703 });
1704 assert_eq!(changes_parsed, changes);
1705 }
1706
1707 #[test]
1708 fn test_change_store() {
1709 let doc = LoroDoc::new_auto_commit();
1710 doc.set_record_timestamp(true);
1711 let t = doc.get_text("t");
1712 t.insert(0, "hello", PosType::Unicode).unwrap();
1713 doc.commit_then_renew();
1714 let t = doc.get_list("t");
1715 t.insert(0, "hello").unwrap();
1716 test_encode_decode(doc);
1717 }
1718
1719 #[test]
1720 fn test_synced_doc() -> LoroResult<()> {
1721 let doc_a = LoroDoc::new_auto_commit();
1722 let doc_b = LoroDoc::new_auto_commit();
1723 let doc_c = LoroDoc::new_auto_commit();
1724
1725 {
1726 let map = doc_a.get_map("root");
1728 map.insert_container("text", TextHandler::new_detached())?;
1729 map.insert_container("list", ListHandler::new_detached())?;
1730 map.insert_container("tree", TreeHandler::new_detached())?;
1731 }
1732
1733 {
1734 let initial_state = doc_a.export(ExportMode::all_updates()).unwrap();
1736 doc_b.import(&initial_state)?;
1737 doc_c.import(&initial_state)?;
1738 }
1739
1740 {
1741 let map = doc_b.get_map("root");
1743 let text = map
1744 .insert_container("text", TextHandler::new_detached())
1745 .unwrap();
1746 text.insert(0, "Hello, ", PosType::Unicode)?;
1747
1748 let list = map
1749 .insert_container("list", ListHandler::new_detached())
1750 .unwrap();
1751 list.push("world")?;
1752 }
1753
1754 {
1755 let map = doc_c.get_map("root");
1757 let tree = map
1758 .insert_container("tree", TreeHandler::new_detached())
1759 .unwrap();
1760 let node_id = tree.create(TreeParentId::Root)?;
1761 tree.get_meta(node_id)?.insert("key", "value")?;
1762 let node_b = tree.create(TreeParentId::Root)?;
1763 tree.move_to(node_b, TreeParentId::Root, 0).unwrap();
1764
1765 let movable_list = map
1766 .insert_container("movable", MovableListHandler::new_detached())
1767 .unwrap();
1768 movable_list.push("item1".into())?;
1769 movable_list.push("item2".into())?;
1770 movable_list.mov(0, 1)?;
1771 }
1772
1773 let b_changes = doc_b
1775 .export(ExportMode::updates(&doc_a.oplog_vv()))
1776 .unwrap();
1777 doc_a.import(&b_changes)?;
1778
1779 let c_changes = doc_c
1781 .export(ExportMode::updates(&doc_a.oplog_vv()))
1782 .unwrap();
1783 doc_a.import(&c_changes)?;
1784
1785 test_encode_decode(doc_a);
1786 Ok(())
1787 }
1788}