Skip to main content

loro_internal/oplog/
change_store.rs

1use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
2use super::{loro_dag::AppDagNodeInner, AppDagNode};
3use crate::sync::Mutex;
4use crate::{
5    arena::SharedArena,
6    change::Change,
7    estimated_size::EstimatedSize,
8    kv_store::KvStore,
9    op::Op,
10    parent::register_container_and_parent_link,
11    version::{Frontiers, ImVersionVector},
12    VersionVector,
13};
14use block_encode::decode_block_range;
15use bytes::Bytes;
16use itertools::Itertools;
17use loro_common::{
18    Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError,
19    LoroResult, PeerID, ID,
20};
21use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
22use once_cell::sync::OnceCell;
23use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
24use std::sync::atomic::AtomicI64;
25use std::{
26    cmp::Ordering,
27    collections::{BTreeMap, VecDeque},
28    ops::{Bound, Deref},
29    sync::Arc,
30};
31use tracing::{info_span, warn};
32mod block_encode;
33mod block_meta_encode;
34pub(super) mod iter;
35
/// Size threshold compared against a change's estimated storage size.
///
/// `insert_change_inner` splits a change before inserting it when the estimate
/// exceeds this value and the caller asked for splitting.
#[cfg(not(test))]
const MAX_BLOCK_SIZE: usize = 1024 * 4;
/// Value of the same threshold in test builds.
// NOTE(review): presumably kept small so tests hit the split path with little data — confirm.
#[cfg(test)]
const MAX_BLOCK_SIZE: usize = 128;
40
/// # Invariants
///
/// - We don't allow holes in a block or between two blocks with the same peer id.
///   The [Change] should be continuous for each peer.
/// - However, the first block of a peer can have counter > 0 so that we can trim the history.
///
/// # Encoding Schema
///
/// It's based on the underlying KV store.
///
/// The entries of the KV store are made up of the following fields
///
/// |Key                          |Value             |
/// |:--                          |:----             |
/// |b"vv"                        |VersionVector     |
/// |b"fr"                        |Frontiers         |
/// |b"sv"                        |Shallow VV        |
/// |b"sf"                        |Shallow Frontiers |
/// |12 bytes PeerID + Counter    |Encoded Block     |
#[derive(Debug, Clone)]
pub struct ChangeStore {
    /// Shared state: the shallow-history start version and the parsed block cache.
    inner: Arc<Mutex<ChangeStoreInner>>,
    /// Arena handed to the block parsing and encoding routines.
    arena: SharedArena,
    /// A change may be in external_kv or in the mem_parsed_kv.
    /// mem_parsed_kv is more up-to-date.
    ///
    /// We cannot directly write into the external_kv except from the initial load
    external_kv: Arc<Mutex<dyn KvStore>>,
    /// The version vector of the external kv store.
    external_vv: Arc<Mutex<VersionVector>>,
    /// Interval passed to `push_change` for local changes when trying to merge
    /// a new change into the previous block (remote changes pass `0`).
    merge_interval: Arc<AtomicI64>,
}
73
/// Mutable state of a [`ChangeStore`], guarded by a single mutex.
#[derive(Debug, Clone)]
struct ChangeStoreInner {
    /// The start version vector of the first block for each peer.
    /// It allows us to trim the history
    start_vv: ImVersionVector,
    /// The last version of the shallow history.
    start_frontiers: Frontiers,
    /// It's more like a parsed cache for binary_kv.
    ///
    /// A newly created block is keyed by the id of the change it was created from.
    mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
}
84
/// Bookkeeping that lets `ChangeStore::rollback_import` undo inserted changes.
#[derive(Debug)]
pub(crate) struct ChangeStoreRollback {
    /// Version vector captured when the rollback record was created; blocks whose
    /// key counter is at or beyond a peer's entry here are dropped on rollback.
    old_vv: VersionVector,
    /// First-seen snapshot of each pre-existing block, taken right before the
    /// block is mutated; re-inserted on rollback.
    blocks_before_mutation: BTreeMap<ID, Arc<ChangesBlock>>,
}
90
91impl ChangeStoreRollback {
92    pub(crate) fn new(old_vv: VersionVector) -> Self {
93        Self {
94            old_vv,
95            blocks_before_mutation: BTreeMap::new(),
96        }
97    }
98
99    fn record_block_before_mutation(&mut self, id: ID, block: Arc<ChangesBlock>) {
100        let old_end = self.old_vv.get(&id.peer).copied().unwrap_or(0);
101        if id.counter >= old_end {
102            return;
103        }
104
105        self.blocks_before_mutation.entry(id).or_insert(block);
106    }
107}
108
/// A run of changes from a single peer, stored parsed, encoded, or both.
#[derive(Debug, Clone)]
pub(crate) struct ChangesBlock {
    /// Peer that authored every change in this block.
    peer: PeerID,
    /// Counter range covered by the block; lookups treat `.0` as inclusive and
    /// `.1` as exclusive.
    counter_range: (Counter, Counter),
    // NOTE(review): presumably the lamport range with the same start/end convention — confirm.
    lamport_range: (Lamport, Lamport),
    /// Estimated size of the block in bytes
    estimated_size: usize,
    /// Set to `true` by `flush_and_compact` once the block has been written to
    /// the external kv store.
    flushed: bool,
    /// The block payload.
    content: ChangesBlockContent,
}
119
/// Payload of a [`ChangesBlock`].
#[derive(Clone)]
pub(crate) enum ChangesBlockContent {
    /// Holds only the list of changes.
    Changes(Arc<Vec<Change>>),
    /// Holds only the encoded bytes.
    Bytes(ChangesBlockBytes),
    /// Holds the list of changes together with the encoded bytes.
    Both(Arc<Vec<Change>>, ChangesBlockBytes),
}
126
/// It's cheap to clone this struct because it's cheap to clone the bytes
#[derive(Clone)]
pub(crate) struct ChangesBlockBytes {
    /// The encoded block.
    bytes: Bytes,
    /// Write-once slot for the block header.
    // NOTE(review): presumably filled lazily the first time the header is decoded — confirm.
    header: OnceCell<Arc<ChangesBlockHeader>>,
}
133
/// KV key under which the shallow (start) version vector is stored.
pub const START_VV_KEY: &[u8] = b"sv";
/// KV key under which the shallow (start) frontiers are stored.
pub const START_FRONTIERS_KEY: &[u8] = b"sf";
/// KV key under which the version vector is stored.
pub const VV_KEY: &[u8] = b"vv";
/// KV key under which the frontiers are stored.
pub const FRONTIERS_KEY: &[u8] = b"fr";
138
139impl ChangeStore {
140    pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
141        Self {
142            inner: Arc::new(Mutex::new(ChangeStoreInner {
143                start_vv: ImVersionVector::new(),
144                start_frontiers: Frontiers::default(),
145                mem_parsed_kv: BTreeMap::new(),
146            })),
147            arena: a.clone(),
148            external_vv: Arc::new(Mutex::new(VersionVector::new())),
149            external_kv: Arc::new(Mutex::new(MemKvStore::new(MemKvConfig::default()))),
150            // external_kv: Arc::new(Mutex::new(BTreeMap::default())),
151            merge_interval,
152        }
153    }
154
155    #[cfg(test)]
156    fn new_for_test() -> Self {
157        Self::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)))
158    }
159
160    pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
161        self.flush_and_compact(vv, frontiers);
162        let mut kv = self.external_kv.lock();
163        kv.export_all()
164    }
165
166    #[tracing::instrument(skip(self), level = "debug")]
167    pub(super) fn export_from(
168        &self,
169        start_vv: &VersionVector,
170        start_frontiers: &Frontiers,
171        latest_vv: &VersionVector,
172        latest_frontiers: &Frontiers,
173    ) -> Bytes {
174        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
175        for span in latest_vv.sub_iter(start_vv) {
176            // PERF: this can be optimized by reusing the current encoded blocks
177            // In the current method, it needs to parse and re-encode the blocks
178            for c in self.iter_changes(span) {
179                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
180                    as usize)
181                    .min(c.atom_len());
182                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
183                    as usize)
184                    .min(c.atom_len());
185
186                if start == end {
187                    continue;
188                }
189
190                let ch = c.slice(start, end);
191                new_store.insert_change(ch, false, false);
192            }
193        }
194
195        loro_common::debug!(
196            "start_vv={:?} start_frontiers={:?}",
197            &start_vv,
198            start_frontiers
199        );
200        new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
201    }
202
203    pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
204        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
205        for span in spans {
206            let mut span = *span;
207            span.normalize_();
208            if span.counter.end <= 0 {
209                continue;
210            }
211
212            span.counter.start = span.counter.start.max(0);
213            span.counter.end = span.counter.end.max(0);
214            if span.counter.start >= span.counter.end {
215                continue;
216            }
217
218            // PERF: this can be optimized by reusing the current encoded blocks
219            // In the current method, it needs to parse and re-encode the blocks
220            for c in self.iter_changes(span) {
221                let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
222                let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
223                if start == end {
224                    continue;
225                }
226
227                let ch = c.slice(start, end);
228                new_store.insert_change(ch, false, false);
229            }
230        }
231
232        encode_blocks_in_store(new_store, &self.arena, w);
233    }
234
235    fn encode_from(
236        &self,
237        start_vv: &VersionVector,
238        start_frontiers: &Frontiers,
239        latest_vv: &VersionVector,
240        latest_frontiers: &Frontiers,
241    ) -> Bytes {
242        {
243            let mut store = self.external_kv.lock();
244            store.set(START_VV_KEY, start_vv.encode().into());
245            store.set(START_FRONTIERS_KEY, start_frontiers.encode().into());
246            let mut inner = self.inner.lock();
247            inner.start_frontiers = start_frontiers.clone();
248            inner.start_vv = ImVersionVector::from_vv(start_vv);
249        }
250        self.flush_and_compact(latest_vv, latest_frontiers);
251        self.external_kv.lock().export_all()
252    }
253
254    pub(crate) fn decode_snapshot_for_updates(
255        bytes: Bytes,
256        arena: &SharedArena,
257        self_vv: &VersionVector,
258    ) -> Result<Vec<Change>, LoroError> {
259        let change_store = ChangeStore::new_mem(arena, Arc::new(AtomicI64::new(0)));
260        let _ = change_store.import_all(bytes)?;
261        let mut changes = Vec::new();
262        change_store.visit_all_changes(&mut |c| {
263            let cnt_threshold = self_vv.get(&c.id.peer).copied().unwrap_or(0);
264            if c.id.counter >= cnt_threshold {
265                changes.push(c.clone());
266                return;
267            }
268
269            let change_end = c.ctr_end();
270            if change_end > cnt_threshold {
271                changes.push(c.slice((cnt_threshold - c.id.counter) as usize, c.atom_len()));
272            }
273        });
274
275        Ok(changes)
276    }
277
278    pub(crate) fn decode_block_bytes(
279        bytes: Bytes,
280        arena: &SharedArena,
281        self_vv: &VersionVector,
282    ) -> LoroResult<Vec<Change>> {
283        let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
284        if ans.is_empty() {
285            return Ok(ans);
286        }
287
288        let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
289        ans.retain_mut(|c| {
290            if c.id.counter >= start {
291                true
292            } else if c.ctr_end() > start {
293                *c = c.slice((start - c.id.counter) as usize, c.atom_len());
294                true
295            } else {
296                false
297            }
298        });
299
300        Ok(ans)
301    }
302
303    pub(crate) fn rollback_import(&self, rollback: ChangeStoreRollback) {
304        let mut inner = self.inner.lock();
305        inner.mem_parsed_kv.retain(|id, _| {
306            let old_end = rollback.old_vv.get(&id.peer).copied().unwrap_or(0);
307            id.counter < old_end
308        });
309
310        for (id, block) in rollback.blocks_before_mutation {
311            inner.mem_parsed_kv.insert(id, block);
312        }
313    }
314
315    pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
316        let block = self.get_block_that_contains(id)?;
317        Some(block.content.iter_dag_nodes())
318    }
319
320    pub fn get_last_dag_nodes_for_peer(&self, peer: PeerID) -> Option<Vec<AppDagNode>> {
321        let block = self.get_the_last_block_of_peer(peer)?;
322        Some(block.content.iter_dag_nodes())
323    }
324
325    pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
326        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
327        let mut inner = self.inner.lock();
328        for (_, block) in inner.mem_parsed_kv.iter_mut() {
329            block
330                .ensure_changes(&self.arena)
331                .expect("Parse block error");
332            for c in block.content.try_changes().unwrap() {
333                f(c);
334            }
335        }
336    }
337
    /// Collects the blocks of `id_span.peer` that overlap `id_span`.
    ///
    /// Each entry is `(block, start, end)` where `start..end` is the index range,
    /// within the block's parsed changes, that is relevant to the span.
    /// Returns an empty vector for an empty span, when no block key is at or
    /// before the span start, or when the closest such block belongs to another
    /// peer.
    ///
    /// # Panics
    ///
    /// Panics if `id_span.counter.start > id_span.counter.end` or if a block
    /// fails to parse.
    pub(crate) fn iter_blocks(&self, id_span: IdSpan) -> Vec<(Arc<ChangesBlock>, usize, usize)> {
        if id_span.counter.start == id_span.counter.end {
            return vec![];
        }

        assert!(id_span.counter.start < id_span.counter.end);
        self.ensure_block_loaded_in_range(
            Bound::Included(id_span.id_start()),
            Bound::Excluded(id_span.id_end()),
        );
        let mut inner = self.inner.lock();
        // The last block whose key is <= the span start is the only candidate
        // that can contain the start of the span.
        let next_back = inner.mem_parsed_kv.range(..=id_span.id_start()).next_back();
        match next_back {
            None => {
                return vec![];
            }
            Some(next_back) => {
                if next_back.0.peer != id_span.peer {
                    return vec![];
                }
            }
        }
        // `next_back` is always `Some` here (the `None` case returned above), so
        // the `unwrap_or(0)` fallback is never taken.
        let start_counter = next_back.map(|(id, _)| id.counter).unwrap_or(0);
        let ans = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                // Skip blocks that end before the span starts.
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                let changes = block.content.try_changes().unwrap();
                let start;
                let end;
                if id_span.counter.start <= block.counter_range.0
                    && id_span.counter.end >= block.counter_range.1
                {
                    // The span covers the whole block: take every change.
                    start = 0;
                    end = changes.len();
                } else {
                    // Either result variant carries an index; both are used as
                    // the start position.
                    // NOTE(review): `Err` presumably carries the position where
                    // the counter would fall, like a binary search — confirm.
                    start = block
                        .get_change_index_by_counter(id_span.counter.start)
                        .unwrap_or_else(|x| x);

                    // Look up the last counter included in the span (the end is
                    // exclusive) and turn it into an exclusive index bound.
                    match block.get_change_index_by_counter(id_span.counter.end - 1) {
                        Ok(e) => {
                            end = e + 1;
                        }
                        Err(0) => return None,
                        Err(e) => {
                            end = e;
                        }
                    }
                }
                if start == end {
                    return None;
                }

                Some((block.clone(), start, end))
            })
            // TODO: PERF avoid alloc
            .collect_vec();

        ans
    }
408
409    pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
410        let v = self.iter_blocks(id_span);
411        #[cfg(debug_assertions)]
412        {
413            if !v.is_empty() {
414                assert_eq!(v[0].0.peer, id_span.peer);
415                assert_eq!(v.last().unwrap().0.peer, id_span.peer);
416                {
417                    // Test start
418                    let (block, start, _end) = v.first().unwrap();
419                    let changes = block.content.try_changes().unwrap();
420                    assert!(changes[*start].id.counter <= id_span.counter.start);
421                }
422                {
423                    // Test end
424                    let (block, _start, end) = v.last().unwrap();
425                    let changes = block.content.try_changes().unwrap();
426                    assert!(changes[*end - 1].ctr_end() >= id_span.counter.end);
427                    assert!(changes[*end - 1].id.counter < id_span.counter.end);
428                }
429            }
430        }
431
432        v.into_iter().flat_map(move |(block, start, end)| {
433            (start..end).map(move |i| BlockChangeRef {
434                change_index: i,
435                block: block.clone(),
436            })
437        })
438    }
439
440    #[allow(dead_code)]
441    pub(crate) fn get_blocks_in_range(&self, id_span: IdSpan) -> VecDeque<Arc<ChangesBlock>> {
442        let mut inner = self.inner.lock();
443        let start_counter = inner
444            .mem_parsed_kv
445            .range(..=id_span.id_start())
446            .next_back()
447            .map(|(id, _)| id.counter)
448            .unwrap_or(0);
449        let vec = inner
450            .mem_parsed_kv
451            .range_mut(
452                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
453            )
454            .filter_map(|(_id, block)| {
455                if block.counter_range.1 < id_span.counter.start {
456                    return None;
457                }
458
459                block
460                    .ensure_changes(&self.arena)
461                    .expect("Parse block error");
462                Some(block.clone())
463            })
464            // TODO: PERF avoid alloc
465            .collect();
466        vec
467    }
468
469    pub(crate) fn get_block_that_contains(&self, id: ID) -> Option<Arc<ChangesBlock>> {
470        self.ensure_block_loaded_in_range(Bound::Included(id), Bound::Included(id));
471        let inner = self.inner.lock();
472        let block = inner
473            .mem_parsed_kv
474            .range(..=id)
475            .next_back()
476            .filter(|(_, block)| {
477                block.peer == id.peer
478                    && block.counter_range.0 <= id.counter
479                    && id.counter < block.counter_range.1
480            })
481            .map(|(_, block)| block.clone());
482
483        block
484    }
485
486    pub(crate) fn get_the_last_block_of_peer(&self, peer: PeerID) -> Option<Arc<ChangesBlock>> {
487        let end_id = ID::new(peer, Counter::MAX);
488        self.ensure_id_lte(end_id);
489        let inner = self.inner.lock();
490        let block = inner
491            .mem_parsed_kv
492            .range(..=end_id)
493            .next_back()
494            .filter(|(_, block)| block.peer == peer)
495            .map(|(_, block)| block.clone());
496
497        block
498    }
499
500    pub fn change_num(&self) -> usize {
501        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
502        let mut inner = self.inner.lock();
503        inner
504            .mem_parsed_kv
505            .iter_mut()
506            .map(|(_, block)| block.change_num())
507            .sum()
508    }
509
510    pub fn fork(
511        &self,
512        arena: SharedArena,
513        merge_interval: Arc<AtomicI64>,
514        vv: &VersionVector,
515        frontiers: &Frontiers,
516    ) -> Self {
517        self.flush_and_compact(vv, frontiers);
518        let inner = self.inner.lock();
519        Self {
520            inner: Arc::new(Mutex::new(ChangeStoreInner {
521                start_vv: inner.start_vv.clone(),
522                start_frontiers: inner.start_frontiers.clone(),
523                mem_parsed_kv: BTreeMap::new(),
524            })),
525            arena,
526            external_vv: Arc::new(Mutex::new(self.external_vv.lock().clone())),
527            external_kv: self.external_kv.lock().clone_store(),
528            merge_interval,
529        }
530    }
531
532    pub fn kv_size(&self) -> usize {
533        self.external_kv
534            .lock()
535            .scan(Bound::Unbounded, Bound::Unbounded)
536            .map(|(k, v)| k.len() + v.len())
537            .sum()
538    }
539
540    pub(crate) fn export_blocks_from<W: std::io::Write>(
541        &self,
542        start_vv: &VersionVector,
543        shallow_since_vv: &ImVersionVector,
544        latest_vv: &VersionVector,
545        w: &mut W,
546    ) {
547        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
548        for mut span in latest_vv.sub_iter(start_vv) {
549            let counter_lower_bound = shallow_since_vv.get(&span.peer).copied().unwrap_or(0);
550            span.counter.start = span.counter.start.max(counter_lower_bound);
551            span.counter.end = span.counter.end.max(counter_lower_bound);
552            if span.counter.start >= span.counter.end {
553                continue;
554            }
555
556            // PERF: this can be optimized by reusing the current encoded blocks
557            // In the current method, it needs to parse and re-encode the blocks
558            for c in self.iter_changes(span) {
559                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
560                    as usize)
561                    .min(c.atom_len());
562                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
563                    as usize)
564                    .min(c.atom_len());
565
566                assert_ne!(start, end);
567                let ch = c.slice(start, end);
568                new_store.insert_change(ch, false, false);
569            }
570        }
571
572        let arena = &self.arena;
573        encode_blocks_in_store(new_store, arena, w);
574    }
575
576    pub(crate) fn fork_changes_up_to(
577        &self,
578        start_vv: &ImVersionVector,
579        frontiers: &Frontiers,
580        vv: &VersionVector,
581    ) -> Bytes {
582        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
583        for mut span in vv.sub_iter_im(start_vv) {
584            let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
585            span.counter.start = span.counter.start.max(counter_lower_bound);
586            span.counter.end = span.counter.end.max(counter_lower_bound);
587            if span.counter.start >= span.counter.end {
588                continue;
589            }
590
591            // PERF: this can be optimized by reusing the current encoded blocks
592            // In the current method, it needs to parse and re-encode the blocks
593            for c in self.iter_changes(span) {
594                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
595                    as usize)
596                    .min(c.atom_len());
597                let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
598                    as usize)
599                    .min(c.atom_len());
600
601                assert_ne!(start, end);
602                let ch = c.slice(start, end);
603                new_store.insert_change(ch, false, false);
604            }
605        }
606
607        new_store.encode_all(vv, frontiers)
608    }
609}
610
611fn encode_blocks_in_store<W: std::io::Write>(
612    new_store: ChangeStore,
613    arena: &SharedArena,
614    w: &mut W,
615) {
616    let mut inner = new_store.inner.lock();
617    for (_id, block) in inner.mem_parsed_kv.iter_mut() {
618        let bytes = block.to_bytes(arena);
619        leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
620        w.write_all(&bytes.bytes).unwrap();
621    }
622}
623
624mod mut_external_kv {
    //! Only this module contains code that mutates the external kv store.
    //! All other modules should only read from the external kv store.
627    use super::*;
628
629    impl ChangeStore {
630        #[tracing::instrument(skip_all, level = "debug", name = "change_store import_all")]
631        pub(crate) fn import_all(&self, bytes: Bytes) -> Result<BatchDecodeInfo, LoroError> {
632            let mut kv_store = self.external_kv.lock();
633            assert!(
634                // 2 because there are vv and frontiers
635                kv_store.len() <= 2,
636                "kv store should be empty when using decode_all"
637            );
638            kv_store
639                .import_all(bytes)
640                .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
641            drop(kv_store);
642            self.validate_imported_change_blocks()?;
643            let vv_bytes = self.external_kv.lock().get(VV_KEY).unwrap_or_default();
644            let vv = VersionVector::decode(&vv_bytes)
645                .map_err(|_| LoroError::DecodeDataCorruptionError)?;
646            let start_vv_bytes = self
647                .external_kv
648                .lock()
649                .get(START_VV_KEY)
650                .unwrap_or_default();
651            let start_vv = if start_vv_bytes.is_empty() {
652                Default::default()
653            } else {
654                VersionVector::decode(&start_vv_bytes)
655                    .map_err(|_| LoroError::DecodeDataCorruptionError)?
656            };
657
658            #[cfg(test)]
659            {
660                // This is for tests
661                for (peer, cnt) in vv.iter() {
662                    self.get_change(ID::new(*peer, *cnt - 1))
663                        .ok_or(LoroError::DecodeDataCorruptionError)?;
664                }
665            }
666
667            *self.external_vv.lock() = vv.clone();
668            let frontiers_bytes = self
669                .external_kv
670                .lock()
671                .get(FRONTIERS_KEY)
672                .unwrap_or_default();
673            let frontiers = Frontiers::decode(&frontiers_bytes)
674                .map_err(|_| LoroError::DecodeDataCorruptionError)?;
675            let start_frontiers = self
676                .external_kv
677                .lock()
678                .get(START_FRONTIERS_KEY)
679                .unwrap_or_default();
680            let start_frontiers = if start_frontiers.is_empty() {
681                Default::default()
682            } else {
683                Frontiers::decode(&start_frontiers)
684                    .map_err(|_| LoroError::DecodeDataCorruptionError)?
685            };
686
687            let mut max_lamport = None;
688            let mut max_timestamp = 0;
689            for id in frontiers.iter() {
690                let c = self
691                    .get_change(id)
692                    .ok_or(LoroError::DecodeDataCorruptionError)?;
693                debug_assert_ne!(c.atom_len(), 0);
694                let l = c.lamport_last();
695                if let Some(x) = max_lamport {
696                    if l > x {
697                        max_lamport = Some(l);
698                    }
699                } else {
700                    max_lamport = Some(l);
701                }
702
703                let t = c.timestamp;
704                if t > max_timestamp {
705                    max_timestamp = t;
706                }
707            }
708
709            Ok(BatchDecodeInfo {
710                vv,
711                frontiers,
712                start_version: if start_vv.is_empty() {
713                    None
714                } else {
715                    let mut inner = self.inner.lock();
716                    inner.start_frontiers = start_frontiers.clone();
717                    inner.start_vv = ImVersionVector::from_vv(&start_vv);
718                    Some((start_vv, start_frontiers))
719                },
720            })
721        }
722
723        fn validate_imported_change_blocks(&self) -> LoroResult<()> {
724            let blocks: Vec<(ID, Bytes)> = {
725                let kv_store = self.external_kv.lock();
726                kv_store
727                    .scan(Bound::Unbounded, Bound::Unbounded)
728                    .filter(|(id, _)| id.len() == 12)
729                    .map(|(id, bytes)| (ID::from_bytes(&id), bytes))
730                    .collect()
731            };
732
733            for (_id, bytes) in blocks {
734                let mut block = Arc::new(ChangesBlock::from_bytes(bytes)?);
735                block.ensure_changes(&self.arena)?;
736            }
737
738            Ok(())
739        }
740
741        /// Flush the cached change to kv_store
742        pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
743            let mut inner = self.inner.lock();
744            let mut store = self.external_kv.lock();
745            let mut external_vv = self.external_vv.lock();
746            for (id, block) in inner.mem_parsed_kv.iter_mut() {
747                if !block.flushed {
748                    let id_bytes = id.to_bytes();
749                    let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
750                    assert!(
751                        counter_start < block.counter_range.1,
752                        "Peer={} Block Counter Range={:?}, counter_start={}",
753                        id.peer,
754                        &block.counter_range,
755                        counter_start
756                    );
757                    if counter_start > block.counter_range.0 {
758                        assert!(store.get(&id_bytes).is_some());
759                    }
760                    external_vv.insert(id.peer, block.counter_range.1);
761                    let bytes = block.to_bytes(&self.arena);
762                    store.set(&id_bytes, bytes.bytes);
763                    Arc::make_mut(block).flushed = true;
764                }
765            }
766
767            if inner.start_vv.is_empty() {
768                assert_eq!(&*external_vv, vv);
769            } else {
770                #[cfg(debug_assertions)]
771                {
772                    // TODO: makes some assertions here?
773                }
774            }
775            let vv_bytes = vv.encode();
776            let frontiers_bytes = frontiers.encode();
777            store.set(VV_KEY, vv_bytes.into());
778            store.set(FRONTIERS_KEY, frontiers_bytes.into());
779        }
780    }
781}
782
783mod mut_inner_kv {
    //! Only this module contains code that mutates the internal kv store.
    //! All other modules should only read from the internal kv store.
786
787    use super::*;
788    impl ChangeStore {
789        /// This method is the **only place** that push a new change into the change store
790        ///
791        /// The new change either merges with the previous block or is put into a new block.
792        /// This method only updates the internal kv store.
793        pub fn insert_change(&self, change: Change, split_when_exceeds: bool, is_local: bool) {
794            self.insert_change_inner(change, split_when_exceeds, is_local, None);
795        }
796
797        pub(crate) fn insert_change_with_rollback(
798            &self,
799            change: Change,
800            split_when_exceeds: bool,
801            is_local: bool,
802            rollback: &mut ChangeStoreRollback,
803        ) {
804            self.insert_change_inner(change, split_when_exceeds, is_local, Some(rollback));
805        }
806
        /// Shared implementation behind `insert_change` and `insert_change_with_rollback`.
        ///
        /// - When `split_when_exceeds` is set and the estimated size of `change` is larger than
        ///   `MAX_BLOCK_SIZE`, the work is delegated to `split_change_then_insert`.
        /// - Otherwise the change is pushed into the closest block whose key is smaller than
        ///   `change.id` when that block belongs to the same peer and accepts the change;
        ///   if it does not, the change is stored in a new block keyed by `change.id`.
        ///
        /// When `rollback` is provided, the target block is recorded in it before being mutated.
        ///
        /// # Panics
        ///
        /// Panics with "counter should be continuous" if the preceding block of the same peer
        /// does not end exactly at `change.id.counter`.
        fn insert_change_inner(
            &self,
            mut change: Change,
            split_when_exceeds: bool,
            is_local: bool,
            mut rollback: Option<&mut ChangeStoreRollback>,
        ) {
            // Debug-only sanity check: the counter recorded in `external_vv` for this peer
            // must not be greater than the counter of the incoming change.
            #[cfg(debug_assertions)]
            {
                let vv = self.external_vv.lock();
                assert!(vv.get(&change.id.peer).copied().unwrap_or(0) <= change.id.counter);
            }

            let s = info_span!("change_store insert_change", id = ?change.id);
            let _e = s.enter();
            let estimated_size = change.estimate_storage_size();
            if estimated_size > MAX_BLOCK_SIZE && split_when_exceeds {
                self.split_change_then_insert(change, rollback.as_deref_mut());
                return;
            }

            let id = change.id;
            let mut inner = self.inner.lock();

            // try to merge with previous block
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..id).next_back() {
                if block.peer == change.id.peer {
                    if block.counter_range.1 != change.id.counter {
                        panic!("counter should be continuous")
                    }

                    // Snapshot the block before `push_change` gets a chance to mutate it.
                    if let Some(rollback) = &mut rollback {
                        rollback.record_block_before_mutation(*_id, block.clone());
                    }

                    match block.push_change(
                        change,
                        estimated_size,
                        if is_local {
                            // local change should try to merge with previous change when
                            // the timestamp interval <= the `merge_interval`
                            self.merge_interval
                                .load(std::sync::atomic::Ordering::Acquire)
                        } else {
                            0
                        },
                        &self.arena,
                    ) {
                        Ok(_) => {
                            // Release the lock first: `get_change` locks `inner` again.
                            drop(inner);
                            debug_assert!(self.get_change(id).is_some());
                            return;
                        }
                        // The block rejected the change and handed it back; fall through
                        // and store it in a new block.
                        Err(c) => change = c,
                    }
                }
            }

            inner
                .mem_parsed_kv
                .insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
            // Release the lock first: `get_change` locks `inner` again.
            drop(inner);
            debug_assert!(self.get_change(id).is_some());
        }
871
872        pub fn get_change(&self, id: ID) -> Option<BlockChangeRef> {
873            let block = self.get_parsed_block(id)?;
874            Some(BlockChangeRef {
875                change_index: block.get_change_index_by_counter(id.counter).unwrap(),
876                block: block.clone(),
877            })
878        }
879
        /// Get the change with the given peer and lamport.
        ///
        /// If not found, return the change with the greatest lamport that is smaller than the given lamport.
        ///
        /// The lookup first walks the in-memory blocks of the peer from the back. When the
        /// lamport gap to the target is large it switches to a binary search over counters.
        /// If the in-memory blocks cannot answer the query, the external kv store is scanned
        /// backwards and the matching block is parsed and cached in `mem_parsed_kv`.
        ///
        /// # Panics
        ///
        /// Panics if a block fails to decode or parse.
        pub fn get_change_by_lamport_lte(&self, idlp: IdLp) -> Option<BlockChangeRef> {
            // This method is complicated because we impl binary search on top of the range api
            // It can be simplified
            let mut inner = self.inner.lock();
            let mut iter = inner
                .mem_parsed_kv
                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));

            // This won't change, we only adjust upper_bound
            let mut lower_bound = 0;
            // `i32::MAX` doubles as the "no block visited yet" marker (see the hole check below).
            let mut upper_bound = i32::MAX;
            let mut is_binary_searching = false;
            loop {
                match iter.next_back() {
                    Some((&id, block)) => {
                        // While scanning linearly, the first block starting at or before the
                        // target lamport is the answer. While binary searching, the target
                        // must additionally fall before the end of the block's lamport range.
                        if block.lamport_range.0 <= idlp.lamport
                            && (!is_binary_searching || idlp.lamport < block.lamport_range.1)
                        {
                            if !is_binary_searching
                                && upper_bound != i32::MAX
                                && upper_bound != block.counter_range.1
                            {
                                warn!(
                                    "There is a hole between the last block and the current block"
                                );
                                // There is hole between the last block and the current block
                                // We need to load it from the kv store
                                break;
                            }

                            // Found the block
                            block
                                .ensure_changes(&self.arena)
                                .expect("Parse block error");
                            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
                            return Some(BlockChangeRef {
                                change_index: index,
                                block: block.clone(),
                            });
                        }

                        if is_binary_searching {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            if block.lamport_range.1 <= idlp.lamport {
                                // Target is larger than the current block (pointed by mid_bound)
                                lower_bound = mid_bound;
                            } else {
                                debug_assert!(
                                    idlp.lamport < block.lamport_range.0,
                                    "{} {:?}",
                                    idlp,
                                    &block.lamport_range
                                );
                                // Target is smaller than the current block (pointed by mid_bound)
                                upper_bound = mid_bound;
                            }

                            // Restart the iterator so it ends right before the new midpoint.
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        } else {
                            // Test whether we need to switch to binary search by measuring the gap
                            // (this branch is only reached when `block.lamport_range.0 > idlp.lamport`,
                            // so the subtraction below cannot underflow)
                            if block.lamport_range.0 - idlp.lamport > MAX_BLOCK_SIZE as Lamport * 8
                            {
                                // Use binary search to find the block
                                upper_bound = id.counter;
                                let mid_bound = (lower_bound + upper_bound) / 2;
                                iter = inner.mem_parsed_kv.range_mut(
                                    ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound),
                                );
                                is_binary_searching = true;
                            }

                            // Remember where the visited block starts; it bounds both the hole
                            // check above and the external scan below.
                            upper_bound = id.counter;
                        }
                    }
                    None => {
                        // Linear scan exhausted the in-memory blocks: fall back to the kv store.
                        if !is_binary_searching {
                            break;
                        }

                        // No in-memory block starts below the midpoint, so raise the lower bound.
                        let mid_bound = (lower_bound + upper_bound) / 2;
                        lower_bound = mid_bound;
                        if upper_bound - lower_bound <= MAX_BLOCK_SIZE as i32 {
                            // If they are too close, we can just scan the range
                            iter = inner.mem_parsed_kv.range_mut(
                                ID::new(idlp.peer, lower_bound)..ID::new(idlp.peer, upper_bound),
                            );
                            is_binary_searching = false;
                        } else {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        }
                    }
                }
            }

            // Fallback: look for the block in the external kv store, scanning backwards over
            // the keys of this peer that are smaller than `upper_bound`.
            let counter_end = upper_bound;
            let scan_end = ID::new(idlp.peer, counter_end).to_bytes();

            let (id, bytes) = 'block_scan: {
                // NOTE(review): `external_kv` is locked here while `inner` is still held;
                // `ensure_id_lte` acquires the two locks in the opposite order — confirm
                // that this cannot deadlock.
                let kv_store = &self.external_kv.lock();
                let iter = kv_store
                    .scan(
                        Bound::Included(&ID::new(idlp.peer, 0).to_bytes()),
                        Bound::Excluded(&scan_end),
                    )
                    .rev();

                for (id, bytes) in iter {
                    let mut block = ChangesBlockBytes::new(bytes.clone());
                    let (lamport_start, _lamport_end) = block.lamport_range();
                    if lamport_start <= idlp.lamport {
                        break 'block_scan (id, bytes);
                    }
                }

                return None;
            };

            // Parse the block that was found and cache it for later lookups.
            let block_id = ID::from_bytes(&id);
            let mut block = Arc::new(
                ChangesBlock::from_bytes(bytes)
                    .expect("validated external change block should decode"),
            );
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            inner.mem_parsed_kv.insert(block_id, block.clone());
            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
            Some(BlockChangeRef {
                change_index: index,
                block,
            })
        }
1021
        /// Splits `change` into several smaller changes and inserts each of them through
        /// `insert_change_inner` (with splitting disabled and `is_local = false`).
        ///
        /// Ops are accumulated into a pending change. The pending change is flushed via
        /// `_insert_splitted_change` whenever the running size estimate would reach
        /// `MAX_BLOCK_SIZE`; ops that report a slice point through
        /// `check_whether_slice_content_to_fit_in_size` are cut at that point.
        ///
        /// # Panics
        ///
        /// Panics if the total atom length of the inserted pieces differs from the atom
        /// length of the original change.
        fn split_change_then_insert(
            &self,
            change: Change,
            mut rollback: Option<&mut ChangeStoreRollback>,
        ) {
            let original_len = change.atom_len();
            // Pending piece: same metadata as the original change, but no ops yet.
            let mut new_change = Change {
                ops: RleVec::new(),
                deps: change.deps,
                id: change.id,
                lamport: change.lamport,
                timestamp: change.timestamp,
                commit_msg: change.commit_msg.clone(),
            };

            // Atom length of all pieces handed to `insert_change_inner` so far.
            let mut total_len = 0;
            // Running size estimate of the pending piece.
            let mut estimated_size = new_change.estimate_storage_size();
            'outer: for mut op in change.ops.into_iter() {
                // NOTE(review): `MAX_BLOCK_SIZE - estimated_size` is an unchecked `usize`
                // subtraction; it underflows if `estimated_size` ever exceeds
                // `MAX_BLOCK_SIZE` at this point (e.g. after the `else` branch at the bottom
                // of the loop pushed an oversized op into an empty change) — confirm this
                // cannot happen or switch to `saturating_sub`.
                if op.estimate_storage_size() >= MAX_BLOCK_SIZE - estimated_size {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                        rollback.as_deref_mut(),
                    );
                }

                while let Some(end) =
                    op.check_whether_slice_content_to_fit_in_size(MAX_BLOCK_SIZE - estimated_size)
                {
                    // The new op can take the rest of the room
                    let new = op.slice(0, end);
                    new_change.ops.push(new);
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                        rollback.as_deref_mut(),
                    );

                    if end < op.atom_len() {
                        // Keep slicing the remainder of the op.
                        op = op.slice(end, op.atom_len());
                    } else {
                        continue 'outer;
                    }
                }

                estimated_size += op.estimate_storage_size();
                if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                        rollback.as_deref_mut(),
                    );
                    // NOTE(review): `estimated_size` was just reset by the flush above, so
                    // the size of `op` is not counted for the new pending change — confirm
                    // this is intended.
                    new_change.ops.push(op);
                } else {
                    new_change.ops.push(op);
                }
            }

            // Insert whatever is left in the pending piece.
            if !new_change.ops.is_empty() {
                total_len += new_change.atom_len();
                self.insert_change_inner(new_change, false, false, rollback);
            }

            assert_eq!(total_len, original_len);
        }
1090
1091        fn _insert_splitted_change(
1092            &self,
1093            new_change: Change,
1094            total_len: &mut usize,
1095            estimated_size: &mut usize,
1096            rollback: Option<&mut ChangeStoreRollback>,
1097        ) -> Change {
1098            if new_change.atom_len() == 0 {
1099                return new_change;
1100            }
1101
1102            let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
1103            let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
1104            *total_len += new_change.atom_len();
1105            let ans = Change {
1106                ops: RleVec::new(),
1107                deps: ID::new(new_change.id.peer, ctr_end - 1).into(),
1108                id: ID::new(new_change.id.peer, ctr_end),
1109                lamport: next_lamport,
1110                timestamp: new_change.timestamp,
1111                commit_msg: new_change.commit_msg.clone(),
1112            };
1113
1114            self.insert_change_inner(new_change, false, false, rollback);
1115            *estimated_size = ans.estimate_storage_size();
1116            ans
1117        }
1118
1119        fn get_parsed_block(&self, id: ID) -> Option<Arc<ChangesBlock>> {
1120            let mut inner = self.inner.lock();
1121            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
1122                if block.peer == id.peer && block.counter_range.1 > id.counter {
1123                    block
1124                        .ensure_changes(&self.arena)
1125                        .expect("Parse block error");
1126                    return Some(block.clone());
1127                }
1128            }
1129
1130            let store = self.external_kv.lock();
1131            let mut iter = store
1132                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1133                .filter(|(id, _)| id.len() == 12);
1134
1135            // println!(
1136            //     "\nkeys {:?}",
1137            //     store
1138            //         .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1139            //         .filter(|(id, _)| id.len() == 12)
1140            //         .map(|(k, _v)| ID::from_bytes(&k))
1141            //         .count()
1142            // );
1143            // println!("id {:?}", id);
1144
1145            let (b_id, b_bytes) = iter.next_back()?;
1146            let block_id: ID = ID::from_bytes(&b_id[..]);
1147            let block = ChangesBlock::from_bytes(b_bytes)
1148                .expect("validated external change block should decode");
1149            if block_id.peer == id.peer
1150                && block_id.counter <= id.counter
1151                && block.counter_range.1 > id.counter
1152            {
1153                let mut arc_block = Arc::new(block);
1154                arc_block
1155                    .ensure_changes(&self.arena)
1156                    .expect("Parse block error");
1157                inner.mem_parsed_kv.insert(block_id, arc_block.clone());
1158                return Some(arc_block);
1159            }
1160
1161            None
1162        }
1163
1164        /// Load all the blocks that have overlapped with the given ID range into `inner_mem_parsed_kv`
1165        ///
1166        /// This is fast because we don't actually parse the content.
1167        // TODO: PERF: This method feels slow.
1168        pub(super) fn ensure_block_loaded_in_range(&self, start: Bound<ID>, end: Bound<ID>) {
1169            let mut whether_need_scan_backward = match start {
1170                Bound::Included(id) => Some(id),
1171                Bound::Excluded(id) => Some(id.inc(1)),
1172                Bound::Unbounded => None,
1173            };
1174
1175            {
1176                let start = start.map(|id| id.to_bytes());
1177                let end = end.map(|id| id.to_bytes());
1178                let kv = self.external_kv.lock();
1179                let mut inner = self.inner.lock();
1180                for (id, bytes) in kv
1181                    .scan(
1182                        start.as_ref().map(|x| x.as_slice()),
1183                        end.as_ref().map(|x| x.as_slice()),
1184                    )
1185                    .filter(|(id, _)| id.len() == 12)
1186                {
1187                    let id = ID::from_bytes(&id);
1188                    if let Some(expected_start_id) = whether_need_scan_backward {
1189                        if id == expected_start_id {
1190                            whether_need_scan_backward = None;
1191                        }
1192                    }
1193
1194                    if inner.mem_parsed_kv.contains_key(&id) {
1195                        continue;
1196                    }
1197
1198                    let block = ChangesBlock::from_bytes(bytes.clone())
1199                        .expect("validated external change block should decode");
1200                    inner.mem_parsed_kv.insert(id, Arc::new(block));
1201                }
1202            }
1203
1204            if let Some(start_id) = whether_need_scan_backward {
1205                self.ensure_id_lte(start_id);
1206            }
1207        }
1208
1209        pub(super) fn ensure_id_lte(&self, id: ID) {
1210            let kv = self.external_kv.lock();
1211            let mut inner = self.inner.lock();
1212            let Some((next_back_id, next_back_bytes)) = kv
1213                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1214                .rfind(|(id, _)| id.len() == 12)
1215            else {
1216                return;
1217            };
1218
1219            let next_back_id = ID::from_bytes(&next_back_id);
1220            if next_back_id.peer == id.peer {
1221                if inner.mem_parsed_kv.contains_key(&next_back_id) {
1222                    return;
1223                }
1224
1225                let block = ChangesBlock::from_bytes(next_back_bytes)
1226                    .expect("validated external change block should decode");
1227                inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
1228            }
1229        }
1230    }
1231}
1232
/// Version information bundled together as the result of a batch decode.
///
/// NOTE(review): the exact meaning of each field is defined by the code that builds this
/// struct, which is outside this section — confirm before relying on the notes below.
#[must_use]
#[derive(Clone, Debug)]
pub(crate) struct BatchDecodeInfo {
    /// Version vector; presumably the latest version covered by the decoded data.
    pub vv: VersionVector,
    /// Frontiers; presumably matching `vv`.
    pub frontiers: Frontiers,
    /// Optional start version as a `(VersionVector, Frontiers)` pair.
    pub start_version: Option<(VersionVector, Frontiers)>,
}
1240
/// A reference to a single [`Change`] stored inside a shared [`ChangesBlock`].
///
/// It dereferences to the change at `change_index` in the parsed changes of `block`.
#[derive(Clone, Debug)]
pub struct BlockChangeRef {
    /// The block that owns the change.
    block: Arc<ChangesBlock>,
    /// Index of the change within the block's parsed changes.
    change_index: usize,
}
1246
1247impl Deref for BlockChangeRef {
1248    type Target = Change;
1249    fn deref(&self) -> &Change {
1250        &self.block.content.try_changes().unwrap()[self.change_index]
1251    }
1252}
1253
1254impl BlockChangeRef {
1255    pub(crate) fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
1256        if counter >= self.ctr_end() {
1257            return None;
1258        }
1259
1260        let index = self.ops.search_atom_index(counter);
1261        Some(BlockOpRef {
1262            block: self.block.clone(),
1263            change_index: self.change_index,
1264            op_index: index,
1265        })
1266    }
1267}
1268
/// A reference to a single [`Op`] stored inside a shared [`ChangesBlock`].
///
/// It dereferences to `ops[op_index]` of the change at `change_index` in the parsed
/// changes of `block`.
#[derive(Clone, Debug)]
pub(crate) struct BlockOpRef {
    /// The block that owns the op.
    pub block: Arc<ChangesBlock>,
    /// Index of the change within the block's parsed changes.
    pub change_index: usize,
    /// Index of the op within that change's `ops`.
    pub op_index: usize,
}
1275
1276impl Deref for BlockOpRef {
1277    type Target = Op;
1278
1279    fn deref(&self) -> &Op {
1280        &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index]
1281    }
1282}
1283
1284impl BlockOpRef {
1285    pub fn lamport(&self) -> Lamport {
1286        let change = &self.block.content.try_changes().unwrap()[self.change_index];
1287        let op = &change.ops[self.op_index];
1288        (op.counter - change.id.counter) as Lamport + change.lamport
1289    }
1290}
1291
1292impl ChangesBlock {
1293    fn from_bytes(bytes: Bytes) -> LoroResult<Self> {
1294        let len = bytes.len();
1295        let bytes = ChangesBlockBytes::new(bytes);
1296        bytes.ensure_header()?;
1297        let header = bytes
1298            .header
1299            .get()
1300            .expect("header should be initialized after ensure_header");
1301        let peer = header.peer;
1302        let counter_range = (
1303            header.counter,
1304            *header.counters.last().ok_or_else(|| {
1305                LoroError::DecodeError("Decode block error: missing counters".into())
1306            })?,
1307        );
1308        let lamport_range = (
1309            *header.lamports.first().ok_or_else(|| {
1310                LoroError::DecodeError("Decode block error: missing lamports".into())
1311            })?,
1312            *header.lamports.last().ok_or_else(|| {
1313                LoroError::DecodeError("Decode block error: missing lamports".into())
1314            })?,
1315        );
1316        let content = ChangesBlockContent::Bytes(bytes);
1317        Ok(Self {
1318            peer,
1319            estimated_size: len,
1320            counter_range,
1321            lamport_range,
1322            flushed: true,
1323            content,
1324        })
1325    }
1326
1327    #[allow(dead_code)]
1328    pub(crate) fn content(&self) -> &ChangesBlockContent {
1329        &self.content
1330    }
1331
1332    fn new(change: Change, _a: &SharedArena) -> Self {
1333        let atom_len = change.atom_len();
1334        let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
1335        let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
1336        let estimated_size = change.estimate_storage_size();
1337        let peer = change.id.peer;
1338        let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
1339        Self {
1340            peer,
1341            counter_range,
1342            lamport_range,
1343            estimated_size,
1344            content,
1345            flushed: false,
1346        }
1347    }
1348
1349    #[allow(unused)]
1350    fn cmp_id(&self, id: ID) -> Ordering {
1351        self.peer.cmp(&id.peer).then_with(|| {
1352            if self.counter_range.0 > id.counter {
1353                Ordering::Greater
1354            } else if self.counter_range.1 <= id.counter {
1355                Ordering::Less
1356            } else {
1357                Ordering::Equal
1358            }
1359        })
1360    }
1361
1362    #[allow(unused)]
1363    fn cmp_idlp(&self, idlp: (PeerID, Lamport)) -> Ordering {
1364        self.peer.cmp(&idlp.0).then_with(|| {
1365            if self.lamport_range.0 > idlp.1 {
1366                Ordering::Greater
1367            } else if self.lamport_range.1 <= idlp.1 {
1368                Ordering::Less
1369            } else {
1370                Ordering::Equal
1371            }
1372        })
1373    }
1374
1375    #[allow(unused)]
1376    fn is_full(&self) -> bool {
1377        self.estimated_size > MAX_BLOCK_SIZE
1378    }
1379
    /// Tries to append `change` to this block.
    ///
    /// - `new_change_size` is the estimated storage size of `change`.
    /// - `merge_interval` is forwarded to `can_merge_right` to decide whether `change` may be
    ///   merged into the last change of the block.
    ///
    /// On success the block is marked as not flushed and its counter/lamport ranges are
    /// extended to the end of `change`.
    ///
    /// # Errors
    ///
    /// Hands `change` back when it does not start at the end counter of this block, or when
    /// the block would exceed `MAX_BLOCK_SIZE` and the change cannot be merged into the
    /// last change.
    ///
    /// # Panics
    ///
    /// Panics if the stored bytes of the block cannot be parsed.
    #[allow(clippy::result_large_err)]
    fn push_change(
        self: &mut Arc<Self>,
        change: Change,
        new_change_size: usize,
        merge_interval: i64,
        a: &SharedArena,
    ) -> Result<(), Change> {
        if self.counter_range.1 != change.id.counter {
            return Err(change);
        }

        // Compute the new range ends up front; `change` is consumed below.
        let atom_len = change.atom_len();
        let next_lamport = change.lamport + atom_len as Lamport;
        let next_counter = change.id.counter + atom_len as Counter;

        let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
        let this = Arc::make_mut(self);
        // `changes_mut` drops any cached bytes, since the changes are about to be modified.
        let changes = this.content.changes_mut(a).unwrap();
        let changes = Arc::make_mut(changes);
        match changes.last_mut() {
            // Merge into the last change. When the block is already full this is only
            // allowed for a single op that is mergeable with the last op of the block.
            Some(last)
                if last.can_merge_right(&change, merge_interval)
                    && (!is_full
                        || (change.ops.len() == 1
                            && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
            {
                for op in change.ops.into_iter() {
                    let size = op.estimate_storage_size();
                    // Only count the op's size when `push` reports `false`
                    // (presumably meaning the op was appended rather than merged — TODO confirm).
                    if !last.ops.push(op) {
                        this.estimated_size += size;
                    }
                }
            }
            _ => {
                if is_full {
                    return Err(change);
                } else {
                    this.estimated_size += new_change_size;
                    changes.push(change);
                }
            }
        }

        this.flushed = false;
        this.counter_range.1 = next_counter;
        this.lamport_range.1 = next_lamport;
        Ok(())
    }
1429
1430    fn to_bytes(self: &mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
1431        match &self.content {
1432            ChangesBlockContent::Bytes(bytes) => bytes.clone(),
1433            ChangesBlockContent::Both(_, bytes) => {
1434                let bytes = bytes.clone();
1435                let this = Arc::make_mut(self);
1436                this.content = ChangesBlockContent::Bytes(bytes.clone());
1437                bytes
1438            }
1439            ChangesBlockContent::Changes(changes) => {
1440                let bytes = ChangesBlockBytes::serialize(changes, a);
1441                let this = Arc::make_mut(self);
1442                this.content = ChangesBlockContent::Bytes(bytes.clone());
1443                bytes
1444            }
1445        }
1446    }
1447
1448    fn ensure_changes(self: &mut Arc<Self>, a: &SharedArena) -> LoroResult<()> {
1449        match &self.content {
1450            ChangesBlockContent::Changes(_) => Ok(()),
1451            ChangesBlockContent::Both(_, _) => Ok(()),
1452            ChangesBlockContent::Bytes(bytes) => {
1453                let changes = bytes.parse(a)?;
1454                let b = bytes.clone();
1455                let this = Arc::make_mut(self);
1456                this.content = ChangesBlockContent::Both(Arc::new(changes), b);
1457                Ok(())
1458            }
1459        }
1460    }
1461
1462    fn get_change_index_by_counter(&self, counter: Counter) -> Result<usize, usize> {
1463        let changes = self.content.try_changes().unwrap();
1464        changes.binary_search_by(|c| {
1465            if c.id.counter > counter {
1466                Ordering::Greater
1467            } else if (c.id.counter + c.content_len() as Counter) <= counter {
1468                Ordering::Less
1469            } else {
1470                Ordering::Equal
1471            }
1472        })
1473    }
1474
1475    fn get_change_index_by_lamport_lte(&self, lamport: Lamport) -> Option<usize> {
1476        let changes = self.content.try_changes().unwrap();
1477        let r = changes.binary_search_by(|c| {
1478            if c.lamport > lamport {
1479                Ordering::Greater
1480            } else if (c.lamport + c.content_len() as Lamport) <= lamport {
1481                Ordering::Less
1482            } else {
1483                Ordering::Equal
1484            }
1485        });
1486
1487        match r {
1488            Ok(found) => Some(found),
1489            Err(idx) => {
1490                if idx == 0 {
1491                    None
1492                } else {
1493                    Some(idx - 1)
1494                }
1495            }
1496        }
1497    }
1498
1499    #[allow(unused)]
1500    fn get_changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
1501        self.content.changes(a)
1502    }
1503
1504    #[allow(unused)]
1505    fn id(&self) -> ID {
1506        ID::new(self.peer, self.counter_range.0)
1507    }
1508
1509    pub fn change_num(&self) -> usize {
1510        match &self.content {
1511            ChangesBlockContent::Changes(c) => c.len(),
1512            ChangesBlockContent::Bytes(b) => b.len_changes(),
1513            ChangesBlockContent::Both(c, _) => c.len(),
1514        }
1515    }
1516}
1517
1518impl ChangesBlockContent {
1519    // TODO: PERF: We can use Iter to replace Vec
1520    pub fn iter_dag_nodes(&self) -> Vec<AppDagNode> {
1521        let mut dag_nodes = Vec::new();
1522        match self {
1523            ChangesBlockContent::Changes(c) | ChangesBlockContent::Both(c, _) => {
1524                for change in c.iter() {
1525                    let new_node = AppDagNodeInner {
1526                        peer: change.id.peer,
1527                        cnt: change.id.counter,
1528                        lamport: change.lamport,
1529                        deps: change.deps.clone(),
1530                        vv: OnceCell::new(),
1531                        has_succ: false,
1532                        len: change.atom_len(),
1533                    }
1534                    .into();
1535
1536                    dag_nodes.push_rle_element(new_node);
1537                }
1538            }
1539            ChangesBlockContent::Bytes(b) => {
1540                b.ensure_header().unwrap();
1541                let header = b.header.get().unwrap();
1542                let n = header.n_changes;
1543                for i in 0..n {
1544                    let new_node = AppDagNodeInner {
1545                        peer: header.peer,
1546                        cnt: header.counters[i],
1547                        lamport: header.lamports[i],
1548                        deps: header.deps_groups[i].clone(),
1549                        vv: OnceCell::new(),
1550                        has_succ: false,
1551                        len: (header.counters[i + 1] - header.counters[i]) as usize,
1552                    }
1553                    .into();
1554
1555                    dag_nodes.push_rle_element(new_node);
1556                }
1557            }
1558        }
1559
1560        dag_nodes
1561    }
1562
    /// Returns the parsed changes, parsing the stored bytes on first access.
    ///
    /// A bytes-only content is converted into the `Both` form, which keeps the original
    /// bytes next to the parsed changes.
    ///
    /// # Errors
    ///
    /// Fails if the stored bytes cannot be parsed.
    #[allow(unused)]
    pub fn changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => Ok(changes),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
                // `self` is now `Both`, so this call returns through the arm above.
                self.changes(a)
            }
        }
    }
1575
    /// Returns mutable access to the parsed changes.
    ///
    /// Note that this method will invalidate the stored bytes: the content always ends up
    /// in the `Changes` form, dropping any bytes kept alongside the parsed changes.
    ///
    /// # Errors
    ///
    /// Fails if the stored bytes cannot be parsed.
    fn changes_mut(&mut self, a: &SharedArena) -> LoroResult<&mut Arc<Vec<Change>>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => {
                // Move the parsed changes out and drop the bytes that are about to go stale.
                *self = ChangesBlockContent::Changes(std::mem::take(changes));
                self.changes_mut(a)
            }
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Changes(Arc::new(changes));
                // `self` is now `Changes`, so this call returns through the first arm.
                self.changes_mut(a)
            }
        }
    }
1591
1592    pub(crate) fn try_changes(&self) -> Option<&Vec<Change>> {
1593        match self {
1594            ChangesBlockContent::Changes(changes) => Some(changes),
1595            ChangesBlockContent::Both(changes, _) => Some(changes),
1596            ChangesBlockContent::Bytes(_) => None,
1597        }
1598    }
1599
1600    #[allow(dead_code)]
1601    pub(crate) fn len_changes(&self) -> usize {
1602        match self {
1603            ChangesBlockContent::Changes(changes) => changes.len(),
1604            ChangesBlockContent::Both(changes, _) => changes.len(),
1605            ChangesBlockContent::Bytes(bytes) => bytes.len_changes(),
1606        }
1607    }
1608}
1609
1610impl std::fmt::Debug for ChangesBlockContent {
1611    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1612        match self {
1613            ChangesBlockContent::Changes(changes) => f
1614                .debug_tuple("ChangesBlockContent::Changes")
1615                .field(changes)
1616                .finish(),
1617            ChangesBlockContent::Bytes(_bytes) => {
1618                f.debug_tuple("ChangesBlockContent::Bytes").finish()
1619            }
1620            ChangesBlockContent::Both(changes, _bytes) => f
1621                .debug_tuple("ChangesBlockContent::Both")
1622                .field(changes)
1623                .finish(),
1624        }
1625    }
1626}
1627
1628impl ChangesBlockBytes {
1629    fn new(bytes: Bytes) -> Self {
1630        Self {
1631            header: OnceCell::new(),
1632            bytes,
1633        }
1634    }
1635
1636    fn ensure_header(&self) -> LoroResult<()> {
1637        self.header
1638            .get_or_try_init(|| decode_header(&self.bytes).map(Arc::new))?;
1639        Ok(())
1640    }
1641
1642    fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
1643        self.ensure_header()?;
1644        let ans: Vec<Change> = decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))?;
1645        for c in ans.iter() {
1646            // PERF: This can be made faster (low priority)
1647            register_container_and_parent_link(a, c)
1648        }
1649
1650        Ok(ans)
1651    }
1652
1653    fn serialize(changes: &[Change], a: &SharedArena) -> Self {
1654        let bytes = encode_block(changes, a);
1655        // TODO: Perf we can calculate header directly without parsing the bytes
1656        let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
1657        bytes.ensure_header().unwrap();
1658        bytes
1659    }
1660
1661    fn lamport_range(&mut self) -> (Lamport, Lamport) {
1662        if let Some(header) = self.header.get() {
1663            (header.lamports[0], *header.lamports.last().unwrap())
1664        } else {
1665            decode_block_range(&self.bytes).unwrap().1
1666        }
1667    }
1668
1669    /// Length of the changes
1670    fn len_changes(&self) -> usize {
1671        self.ensure_header().unwrap();
1672        self.header.get().unwrap().n_changes
1673    }
1674}
1675
#[cfg(test)]
mod test {
    use crate::cursor::PosType;
    use crate::{
        loro::ExportMode, oplog::convert_change_to_remote, state::TreeParentId, ListHandler,
        LoroDoc, MovableListHandler, TextHandler, TreeHandler,
    };

    use super::*;

    /// Encodes the doc's change store, imports the bytes into a fresh store,
    /// and checks that both the re-exported bytes and the visited changes
    /// match the original.
    fn test_encode_decode(doc: LoroDoc) {
        doc.commit_then_renew();
        let oplog = doc.oplog().lock();
        let encoded = oplog
            .change_store
            .encode_all(oplog.vv(), oplog.dag.frontiers());

        // Importing and exporting again must reproduce the same bytes.
        let restored = ChangeStore::new_for_test();
        let _ = restored.import_all(encoded.clone()).unwrap();
        assert_eq!(restored.external_kv.lock().export_all(), encoded);

        // Changes seen through the restored store ...
        let restored_arena = restored.arena.clone();
        let mut restored_changes = Vec::new();
        restored.visit_all_changes(&mut |change| {
            restored_changes.push(convert_change_to_remote(&restored_arena, change));
        });

        // ... must equal the changes seen through the original store.
        let mut original_changes = Vec::new();
        oplog.change_store.visit_all_changes(&mut |change| {
            original_changes.push(convert_change_to_remote(&oplog.arena, change));
        });

        assert_eq!(restored_changes, original_changes);
    }

    /// A text edit and a list edit, separated by a commit, survive the round trip.
    #[test]
    fn test_change_store() {
        let doc = LoroDoc::new_auto_commit();
        doc.set_record_timestamp(true);

        let text = doc.get_text("t");
        text.insert(0, "hello", PosType::Unicode).unwrap();
        doc.commit_then_renew();

        let list = doc.get_list("t");
        list.insert(0, "hello").unwrap();

        test_encode_decode(doc);
    }

    /// Three docs edit concurrently; after A imports the updates of B and C,
    /// A's change store must survive the round trip.
    #[test]
    fn test_synced_doc() -> LoroResult<()> {
        let doc_a = LoroDoc::new_auto_commit();
        let doc_b = LoroDoc::new_auto_commit();
        let doc_c = LoroDoc::new_auto_commit();

        {
            // A: Create initial structure
            let root = doc_a.get_map("root");
            root.insert_container("text", TextHandler::new_detached())?;
            root.insert_container("list", ListHandler::new_detached())?;
            root.insert_container("tree", TreeHandler::new_detached())?;
        }

        {
            // Sync initial state to B and C
            let snapshot = doc_a.export(ExportMode::all_updates()).unwrap();
            for peer in [&doc_b, &doc_c] {
                peer.import(&snapshot)?;
            }
        }

        {
            // B: Edit text and list
            let root = doc_b.get_map("root");

            let text = root
                .insert_container("text", TextHandler::new_detached())
                .unwrap();
            text.insert(0, "Hello, ", PosType::Unicode)?;

            let list = root
                .insert_container("list", ListHandler::new_detached())
                .unwrap();
            list.push("world")?;
        }

        {
            // C: Edit tree and movable list
            let root = doc_c.get_map("root");

            let tree = root
                .insert_container("tree", TreeHandler::new_detached())
                .unwrap();
            let first_node = tree.create(TreeParentId::Root)?;
            tree.get_meta(first_node)?.insert("key", "value")?;
            let second_node = tree.create(TreeParentId::Root)?;
            tree.move_to(second_node, TreeParentId::Root, 0).unwrap();

            let movable = root
                .insert_container("movable", MovableListHandler::new_detached())
                .unwrap();
            movable.push("item1".into())?;
            movable.push("item2".into())?;
            movable.mov(0, 1)?;
        }

        // Sync B's changes to A, then C's changes to A. Each export only
        // contains what A has not seen yet at that moment.
        for peer in [&doc_b, &doc_c] {
            let updates = peer
                .export(ExportMode::updates(&doc_a.oplog_vv()))
                .unwrap();
            doc_a.import(&updates)?;
        }

        test_encode_decode(doc_a);
        Ok(())
    }
}