//! Change store of the oplog (`loro_internal/oplog/change_store.rs`).
1use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
2use super::{loro_dag::AppDagNodeInner, AppDagNode};
3use crate::sync::Mutex;
4use crate::{
5    arena::SharedArena,
6    change::Change,
7    estimated_size::EstimatedSize,
8    kv_store::KvStore,
9    op::Op,
10    parent::register_container_and_parent_link,
11    version::{Frontiers, ImVersionVector},
12    VersionVector,
13};
14use block_encode::decode_block_range;
15use bytes::Bytes;
16use itertools::Itertools;
17use loro_common::{
18    Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError,
19    LoroResult, PeerID, ID,
20};
21use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
22use once_cell::sync::OnceCell;
23use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
24use std::sync::atomic::AtomicI64;
25use std::{
26    cmp::Ordering,
27    collections::{BTreeMap, VecDeque},
28    ops::{Bound, Deref},
29    sync::Arc,
30};
31use tracing::{info_span, warn};
32mod block_encode;
33mod block_meta_encode;
34pub(super) mod iter;
35
/// Maximum estimated size (in bytes) of a single changes block; a change whose
/// estimated size exceeds this is split before insertion (see `insert_change`).
#[cfg(not(test))]
const MAX_BLOCK_SIZE: usize = 1024 * 4;
/// Deliberately tiny in tests so block-splitting and multi-block paths are
/// exercised frequently.
#[cfg(test)]
const MAX_BLOCK_SIZE: usize = 128;
40
/// # Invariance
///
/// - We don't allow holes in a block or between two blocks with the same peer id.
///   The [Change] should be continuous for each peer.
/// - However, the first block of a peer can have counter > 0 so that we can trim the history.
///
/// # Encoding Schema
///
/// It's based on the underlying KV store.
///
/// The entries of the KV store is made up of the following fields
///
/// |Key                          |Value             |
/// |:--                          |:----             |
/// |b"vv"                        |VersionVector     |
/// |b"fr"                        |Frontiers         |
/// |b"sv"                        |Shallow VV        |
/// |b"sf"                        |Shallow Frontiers |
/// |12 bytes PeerID + Counter    |Encoded Block     |
#[derive(Debug, Clone)]
pub struct ChangeStore {
    /// Parsed in-memory state: the block cache plus the shallow-history start
    /// version. See [`ChangeStoreInner`].
    inner: Arc<Mutex<ChangeStoreInner>>,
    arena: SharedArena,
    /// A change may be in external_kv or in the mem_parsed_kv.
    /// mem_parsed_kv is more up-to-date.
    ///
    /// We cannot directly write into the external_kv except from the initial load
    external_kv: Arc<Mutex<dyn KvStore>>,
    /// The version vector of the external kv store.
    external_vv: Arc<Mutex<VersionVector>>,
    /// Max timestamp gap under which a *local* change may merge into the
    /// previous change of the same block (passed to `push_change`).
    /// Units follow `Change.timestamp` — TODO confirm (seconds vs ms).
    merge_interval: Arc<AtomicI64>,
}
73
/// The mutable, in-memory part of a [`ChangeStore`], guarded by its mutex.
#[derive(Debug, Clone)]
struct ChangeStoreInner {
    /// The start version vector of the first block for each peer.
    /// It allows us to trim the history
    start_vv: ImVersionVector,
    /// The last version of the shallow history.
    start_frontiers: Frontiers,
    /// It's more like a parsed cache for binary_kv.
    /// Keyed by each block's starting ID (peer + first counter in the block).
    mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
}
84
/// State captured during an import so it can be undone via
/// `ChangeStore::rollback_import`.
#[derive(Debug)]
pub(crate) struct ChangeStoreRollback {
    /// The store's version vector before the import started.
    old_vv: VersionVector,
    /// Pre-mutation snapshots of blocks the import appended to,
    /// keyed by the block's starting ID.
    blocks_before_mutation: BTreeMap<ID, Arc<ChangesBlock>>,
}
90
91impl ChangeStoreRollback {
92    pub(crate) fn new(old_vv: VersionVector) -> Self {
93        Self {
94            old_vv,
95            blocks_before_mutation: BTreeMap::new(),
96        }
97    }
98
99    fn record_block_before_mutation(&mut self, id: ID, block: Arc<ChangesBlock>) {
100        let old_end = self.old_vv.get(&id.peer).copied().unwrap_or(0);
101        if id.counter >= old_end {
102            return;
103        }
104
105        self.blocks_before_mutation.entry(id).or_insert(block);
106    }
107}
108
109#[derive(Debug, Clone)]
110pub(crate) struct ChangesBlock {
111    peer: PeerID,
112    counter_range: (Counter, Counter),
113    lamport_range: (Lamport, Lamport),
114    /// Estimated size of the block in bytes
115    estimated_size: usize,
116    flushed: bool,
117    content: ChangesBlockContent,
118}
119
/// The representation(s) currently available for a block's changes.
#[derive(Clone)]
pub(crate) enum ChangesBlockContent {
    /// Only the parsed changes are available.
    Changes(Arc<Vec<Change>>),
    /// Only the encoded bytes are available; parsing is deferred.
    Bytes(ChangesBlockBytes),
    /// Both forms are cached — presumably equivalent; bytes avoid re-encoding.
    Both(Arc<Vec<Change>>, ChangesBlockBytes),
}
126
/// It's cheap to clone this struct because it's cheap to clone the bytes
#[derive(Clone)]
pub(crate) struct ChangesBlockBytes {
    /// The encoded block payload.
    bytes: Bytes,
    /// Lazily decoded header for `bytes`, cached on first access.
    header: OnceCell<Arc<ChangesBlockHeader>>,
}
133
/// KV key of the shallow-history start version vector.
pub const START_VV_KEY: &[u8] = b"sv";
/// KV key of the shallow-history start frontiers.
pub const START_FRONTIERS_KEY: &[u8] = b"sf";
/// KV key of the latest version vector.
pub const VV_KEY: &[u8] = b"vv";
/// KV key of the latest frontiers.
pub const FRONTIERS_KEY: &[u8] = b"fr";
138
139impl ChangeStore {
140    pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
141        Self {
142            inner: Arc::new(Mutex::new(ChangeStoreInner {
143                start_vv: ImVersionVector::new(),
144                start_frontiers: Frontiers::default(),
145                mem_parsed_kv: BTreeMap::new(),
146            })),
147            arena: a.clone(),
148            external_vv: Arc::new(Mutex::new(VersionVector::new())),
149            external_kv: Arc::new(Mutex::new(MemKvStore::new(MemKvConfig::default()))),
150            // external_kv: Arc::new(Mutex::new(BTreeMap::default())),
151            merge_interval,
152        }
153    }
154
    /// Test-only constructor: fresh arena, zero merge interval.
    #[cfg(test)]
    fn new_for_test() -> Self {
        Self::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)))
    }
159
160    pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
161        self.flush_and_compact(vv, frontiers);
162        let mut kv = self.external_kv.lock();
163        kv.export_all()
164    }
165
    /// Export the changes between `start_vv` and `latest_vv` by copying the
    /// (sliced) changes into a fresh in-memory store and encoding that store
    /// together with the shallow-start metadata.
    #[tracing::instrument(skip(self), level = "debug")]
    pub(super) fn export_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in latest_vv.sub_iter(start_vv) {
            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                // Clamp the change to the [start_vv, latest_vv) window — changes
                // at the span boundaries may extend beyond it.
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                // Fully-covered change: nothing new to export.
                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        loro_common::debug!(
            "start_vv={:?} start_frontiers={:?}",
            &start_vv,
            start_frontiers
        );
        new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
    }
202
203    pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
204        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
205        for span in spans {
206            let mut span = *span;
207            span.normalize_();
208            // PERF: this can be optimized by reusing the current encoded blocks
209            // In the current method, it needs to parse and re-encode the blocks
210            for c in self.iter_changes(span) {
211                let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
212                let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
213                if start == end {
214                    continue;
215                }
216
217                let ch = c.slice(start, end);
218                new_store.insert_change(ch, false, false);
219            }
220        }
221
222        encode_blocks_in_store(new_store, &self.arena, w);
223    }
224
    /// Store the shallow-history start version under the `sv`/`sf` keys,
    /// flush everything up to `latest_vv`/`latest_frontiers`, and export the
    /// whole kv store.
    fn encode_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        {
            let mut store = self.external_kv.lock();
            store.set(START_VV_KEY, start_vv.encode().into());
            store.set(START_FRONTIERS_KEY, start_frontiers.encode().into());
            let mut inner = self.inner.lock();
            inner.start_frontiers = start_frontiers.clone();
            inner.start_vv = ImVersionVector::from_vv(start_vv);
        }
        // Locks are dropped above: flush_and_compact re-acquires both.
        self.flush_and_compact(latest_vv, latest_frontiers);
        self.external_kv.lock().export_all()
    }
243
    /// Decode a full change-store export and return only the changes not yet
    /// covered by `self_vv`, trimming changes that straddle the boundary.
    pub(crate) fn decode_snapshot_for_updates(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> Result<Vec<Change>, LoroError> {
        let change_store = ChangeStore::new_mem(arena, Arc::new(AtomicI64::new(0)));
        let _ = change_store.import_all(bytes)?;
        let mut changes = Vec::new();
        change_store.visit_all_changes(&mut |c| {
            let cnt_threshold = self_vv.get(&c.id.peer).copied().unwrap_or(0);
            if c.id.counter >= cnt_threshold {
                // Entirely new to us: keep whole.
                changes.push(c.clone());
                return;
            }

            // Partially known: keep only the unseen tail, drop fully-known changes.
            let change_end = c.ctr_end();
            if change_end > cnt_threshold {
                changes.push(c.slice((cnt_threshold - c.id.counter) as usize, c.atom_len()));
            }
        });

        Ok(changes)
    }
267
268    pub(crate) fn decode_block_bytes(
269        bytes: Bytes,
270        arena: &SharedArena,
271        self_vv: &VersionVector,
272    ) -> LoroResult<Vec<Change>> {
273        let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
274        if ans.is_empty() {
275            return Ok(ans);
276        }
277
278        let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
279        ans.retain_mut(|c| {
280            if c.id.counter >= start {
281                true
282            } else if c.ctr_end() > start {
283                *c = c.slice((start - c.id.counter) as usize, c.atom_len());
284                true
285            } else {
286                false
287            }
288        });
289
290        Ok(ans)
291    }
292
293    pub(crate) fn rollback_import(&self, rollback: ChangeStoreRollback) {
294        let mut inner = self.inner.lock();
295        inner.mem_parsed_kv.retain(|id, _| {
296            let old_end = rollback.old_vv.get(&id.peer).copied().unwrap_or(0);
297            id.counter < old_end
298        });
299
300        for (id, block) in rollback.blocks_before_mutation {
301            inner.mem_parsed_kv.insert(id, block);
302        }
303    }
304
305    pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
306        let block = self.get_block_that_contains(id)?;
307        Some(block.content.iter_dag_nodes())
308    }
309
310    pub fn get_last_dag_nodes_for_peer(&self, peer: PeerID) -> Option<Vec<AppDagNode>> {
311        let block = self.get_the_last_block_of_peer(peer)?;
312        Some(block.content.iter_dag_nodes())
313    }
314
    /// Call `f` on every change in the store, in (peer, counter) key order.
    ///
    /// Loads and parses every block first, so this is expensive on large stores.
    pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.lock();
        for (_, block) in inner.mem_parsed_kv.iter_mut() {
            // Parse lazily-encoded blocks before reading their changes.
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            for c in block.content.try_changes().unwrap() {
                f(c);
            }
        }
    }
327
    /// Collect the parsed blocks overlapping `id_span`, each paired with the
    /// `[start, end)` change-index range inside the block that falls within the
    /// span. Returns an empty vec when the span is empty or no block of that
    /// peer precedes it.
    pub(crate) fn iter_blocks(&self, id_span: IdSpan) -> Vec<(Arc<ChangesBlock>, usize, usize)> {
        if id_span.counter.start == id_span.counter.end {
            return vec![];
        }

        assert!(id_span.counter.start < id_span.counter.end);
        // Make sure all blocks covering the span are parsed into mem_parsed_kv.
        self.ensure_block_loaded_in_range(
            Bound::Included(id_span.id_start()),
            Bound::Excluded(id_span.id_end()),
        );
        let mut inner = self.inner.lock();
        // The block containing id_span's start may begin at a smaller counter,
        // so look one entry back from the span start.
        let next_back = inner.mem_parsed_kv.range(..=id_span.id_start()).next_back();
        match next_back {
            None => {
                return vec![];
            }
            Some(next_back) => {
                if next_back.0.peer != id_span.peer {
                    // No block for this peer at or before the span start.
                    return vec![];
                }
            }
        }
        let start_counter = next_back.map(|(id, _)| id.counter).unwrap_or(0);
        let ans = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    // Block ends before the span starts.
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                let changes = block.content.try_changes().unwrap();
                let start;
                let end;
                if id_span.counter.start <= block.counter_range.0
                    && id_span.counter.end >= block.counter_range.1
                {
                    // The span fully covers this block: take every change.
                    start = 0;
                    end = changes.len();
                } else {
                    // Partial overlap: locate index bounds by counter.
                    // Err(x) means "not found; would insert at x".
                    start = block
                        .get_change_index_by_counter(id_span.counter.start)
                        .unwrap_or_else(|x| x);

                    match block.get_change_index_by_counter(id_span.counter.end - 1) {
                        Ok(e) => {
                            end = e + 1;
                        }
                        // Span ends before the block's first change.
                        Err(0) => return None,
                        Err(e) => {
                            end = e;
                        }
                    }
                }
                if start == end {
                    return None;
                }

                Some((block.clone(), start, end))
            })
            // TODO: PERF avoid alloc
            .collect_vec();

        ans
    }
398
    /// Iterate over the changes within `id_span`, in order.
    ///
    /// Boundary changes may extend beyond the span (verified by the debug
    /// assertions below); callers are expected to slice them if needed.
    pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
        let v = self.iter_blocks(id_span);
        #[cfg(debug_assertions)]
        {
            if !v.is_empty() {
                assert_eq!(v[0].0.peer, id_span.peer);
                assert_eq!(v.last().unwrap().0.peer, id_span.peer);
                {
                    // Test start: the first yielded change begins at or before the span.
                    let (block, start, _end) = v.first().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*start].id.counter <= id_span.counter.start);
                }
                {
                    // Test end: the last yielded change covers the span's end.
                    let (block, _start, end) = v.last().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*end - 1].ctr_end() >= id_span.counter.end);
                    assert!(changes[*end - 1].id.counter < id_span.counter.end);
                }
            }
        }

        // Flatten (block, start, end) triples into per-change refs.
        v.into_iter().flat_map(move |(block, start, end)| {
            (start..end).map(move |i| BlockChangeRef {
                change_index: i,
                block: block.clone(),
            })
        })
    }
429
    /// Collect the parsed blocks that may overlap `id_span`.
    ///
    /// NOTE(review): unlike `iter_blocks`, this neither calls
    /// `ensure_block_loaded_in_range` nor checks that the `next_back` entry
    /// belongs to `id_span.peer` — presumably callers guarantee the blocks are
    /// loaded and the peer exists; confirm against call sites.
    pub(crate) fn get_blocks_in_range(&self, id_span: IdSpan) -> VecDeque<Arc<ChangesBlock>> {
        let mut inner = self.inner.lock();
        // The containing block may start before the span, so step one entry back.
        let start_counter = inner
            .mem_parsed_kv
            .range(..=id_span.id_start())
            .next_back()
            .map(|(id, _)| id.counter)
            .unwrap_or(0);
        let vec = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    // Ends before the span starts.
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                Some(block.clone())
            })
            // TODO: PERF avoid alloc
            .collect();
        vec
    }
457
458    pub(crate) fn get_block_that_contains(&self, id: ID) -> Option<Arc<ChangesBlock>> {
459        self.ensure_block_loaded_in_range(Bound::Included(id), Bound::Included(id));
460        let inner = self.inner.lock();
461        let block = inner
462            .mem_parsed_kv
463            .range(..=id)
464            .next_back()
465            .filter(|(_, block)| {
466                block.peer == id.peer
467                    && block.counter_range.0 <= id.counter
468                    && id.counter < block.counter_range.1
469            })
470            .map(|(_, block)| block.clone());
471
472        block
473    }
474
475    pub(crate) fn get_the_last_block_of_peer(&self, peer: PeerID) -> Option<Arc<ChangesBlock>> {
476        let end_id = ID::new(peer, Counter::MAX);
477        self.ensure_id_lte(end_id);
478        let inner = self.inner.lock();
479        let block = inner
480            .mem_parsed_kv
481            .range(..=end_id)
482            .next_back()
483            .filter(|(_, block)| block.peer == peer)
484            .map(|(_, block)| block.clone());
485
486        block
487    }
488
489    pub fn change_num(&self) -> usize {
490        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
491        let mut inner = self.inner.lock();
492        inner
493            .mem_parsed_kv
494            .iter_mut()
495            .map(|(_, block)| block.change_num())
496            .sum()
497    }
498
    /// Fork this store into a new one that shares the flushed external kv
    /// content (via `clone_store`) but starts with an empty parsed-block cache.
    ///
    /// Pending blocks are flushed first (tagged with `vv`/`frontiers`) so the
    /// fork observes the complete state.
    pub fn fork(
        &self,
        arena: SharedArena,
        merge_interval: Arc<AtomicI64>,
        vv: &VersionVector,
        frontiers: &Frontiers,
    ) -> Self {
        self.flush_and_compact(vv, frontiers);
        let inner = self.inner.lock();
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: inner.start_vv.clone(),
                start_frontiers: inner.start_frontiers.clone(),
                // Fresh cache: the fork re-parses blocks from external_kv on demand.
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena,
            external_vv: Arc::new(Mutex::new(self.external_vv.lock().clone())),
            external_kv: self.external_kv.lock().clone_store(),
            merge_interval,
        }
    }
520
521    pub fn kv_size(&self) -> usize {
522        self.external_kv
523            .lock()
524            .scan(Bound::Unbounded, Bound::Unbounded)
525            .map(|(k, v)| k.len() + v.len())
526            .sum()
527    }
528
    /// Write the changes between `start_vv` and `latest_vv` — clamped below by
    /// `shallow_since_vv` (history before it is unavailable) — to `w` as
    /// length-prefixed encoded blocks.
    pub(crate) fn export_blocks_from<W: std::io::Write>(
        &self,
        start_vv: &VersionVector,
        shallow_since_vv: &ImVersionVector,
        latest_vv: &VersionVector,
        w: &mut W,
    ) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in latest_vv.sub_iter(start_vv) {
            // Nothing before the shallow start exists to export.
            let counter_lower_bound = shallow_since_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                // Clamp each change to the [start_vv, latest_vv) window.
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                // NOTE(review): sibling `export_from` tolerates start == end with
                // a `continue`; here it's asserted impossible — confirm the
                // invariant that the clamped span can't fully cover a change.
                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        let arena = &self.arena;
        encode_blocks_in_store(new_store, arena, w);
    }
564
    /// Encode the changes between `start_vv` and `vv` as a standalone
    /// change-store export tagged with `vv`/`frontiers`.
    pub(crate) fn fork_changes_up_to(
        &self,
        start_vv: &ImVersionVector,
        frontiers: &Frontiers,
        vv: &VersionVector,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in vv.sub_iter_im(start_vv) {
            // Nothing before start_vv is included.
            let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                // Clamp each change to the [start_vv, vv) window.
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                // NOTE(review): sibling `export_from` tolerates start == end with
                // a `continue` — confirm the invariant that it can't happen here.
                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        new_store.encode_all(vv, frontiers)
    }
598}
599
600fn encode_blocks_in_store<W: std::io::Write>(
601    new_store: ChangeStore,
602    arena: &SharedArena,
603    w: &mut W,
604) {
605    let mut inner = new_store.inner.lock();
606    for (_id, block) in inner.mem_parsed_kv.iter_mut() {
607        let bytes = block.to_bytes(arena);
608        leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
609        w.write_all(&bytes.bytes).unwrap();
610    }
611}
612
613mod mut_external_kv {
614    //! Only this module contains the code that mutate the external kv store
615    //! All other modules should only read from the external kv store
616    use super::*;
617
618    impl ChangeStore {
        /// Load a full change-store export into the (empty) external kv store.
        ///
        /// Validates every change block, restores `vv`/`frontiers`, and — when
        /// present — the shallow-history start version (`sv`/`sf` keys).
        ///
        /// # Panics
        ///
        /// Panics if the kv store already holds more than the vv/frontiers keys.
        #[tracing::instrument(skip_all, level = "debug", name = "change_store import_all")]
        pub(crate) fn import_all(&self, bytes: Bytes) -> Result<BatchDecodeInfo, LoroError> {
            let mut kv_store = self.external_kv.lock();
            assert!(
                // 2 because there are vv and frontiers
                kv_store.len() <= 2,
                "kv store should be empty when using decode_all"
            );
            kv_store
                .import_all(bytes)
                .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
            drop(kv_store);
            // Fail fast if any block is corrupted.
            self.validate_imported_change_blocks()?;
            let vv_bytes = self.external_kv.lock().get(VV_KEY).unwrap_or_default();
            let vv = VersionVector::decode(&vv_bytes)
                .map_err(|_| LoroError::DecodeDataCorruptionError)?;
            let start_vv_bytes = self
                .external_kv
                .lock()
                .get(START_VV_KEY)
                .unwrap_or_default();
            // Empty/absent `sv` key means this is not a shallow snapshot.
            let start_vv = if start_vv_bytes.is_empty() {
                Default::default()
            } else {
                VersionVector::decode(&start_vv_bytes)
                    .map_err(|_| LoroError::DecodeDataCorruptionError)?
            };

            #[cfg(test)]
            {
                // This is for tests
                for (peer, cnt) in vv.iter() {
                    self.get_change(ID::new(*peer, *cnt - 1))
                        .ok_or(LoroError::DecodeDataCorruptionError)?;
                }
            }

            *self.external_vv.lock() = vv.clone();
            let frontiers_bytes = self
                .external_kv
                .lock()
                .get(FRONTIERS_KEY)
                .unwrap_or_default();
            let frontiers = Frontiers::decode(&frontiers_bytes)
                .map_err(|_| LoroError::DecodeDataCorruptionError)?;
            let start_frontiers = self
                .external_kv
                .lock()
                .get(START_FRONTIERS_KEY)
                .unwrap_or_default();
            let start_frontiers = if start_frontiers.is_empty() {
                Default::default()
            } else {
                Frontiers::decode(&start_frontiers)
                    .map_err(|_| LoroError::DecodeDataCorruptionError)?
            };

            // Sanity-check that every frontier tip resolves to a change.
            // NOTE(review): `max_lamport`/`max_timestamp` are computed but never
            // returned or stored — presumably leftover or for debugging; confirm
            // whether they can be removed.
            let mut max_lamport = None;
            let mut max_timestamp = 0;
            for id in frontiers.iter() {
                let c = self
                    .get_change(id)
                    .ok_or(LoroError::DecodeDataCorruptionError)?;
                debug_assert_ne!(c.atom_len(), 0);
                let l = c.lamport_last();
                if let Some(x) = max_lamport {
                    if l > x {
                        max_lamport = Some(l);
                    }
                } else {
                    max_lamport = Some(l);
                }

                let t = c.timestamp;
                if t > max_timestamp {
                    max_timestamp = t;
                }
            }

            Ok(BatchDecodeInfo {
                vv,
                frontiers,
                start_version: if start_vv.is_empty() {
                    None
                } else {
                    // Shallow snapshot: remember the trim point in memory too.
                    let mut inner = self.inner.lock();
                    inner.start_frontiers = start_frontiers.clone();
                    inner.start_vv = ImVersionVector::from_vv(&start_vv);
                    Some((start_vv, start_frontiers))
                },
            })
        }
711
712        fn validate_imported_change_blocks(&self) -> LoroResult<()> {
713            let blocks: Vec<(ID, Bytes)> = {
714                let kv_store = self.external_kv.lock();
715                kv_store
716                    .scan(Bound::Unbounded, Bound::Unbounded)
717                    .filter(|(id, _)| id.len() == 12)
718                    .map(|(id, bytes)| (ID::from_bytes(&id), bytes))
719                    .collect()
720            };
721
722            for (_id, bytes) in blocks {
723                let mut block = Arc::new(ChangesBlock::from_bytes(bytes)?);
724                block.ensure_changes(&self.arena)?;
725            }
726
727            Ok(())
728        }
729
        /// Flush the cached change to kv_store
        ///
        /// Writes every unflushed parsed block into the external kv store,
        /// advances `external_vv` to each block's end counter, and records the
        /// given `vv`/`frontiers` under their reserved keys.
        pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
            let mut inner = self.inner.lock();
            let mut store = self.external_kv.lock();
            let mut external_vv = self.external_vv.lock();
            for (id, block) in inner.mem_parsed_kv.iter_mut() {
                if !block.flushed {
                    let id_bytes = id.to_bytes();
                    let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
                    // The flushed history must end strictly before this block's
                    // end — otherwise flushing would rewind the external store.
                    assert!(
                        counter_start < block.counter_range.1,
                        "Peer={} Block Counter Range={:?}, counter_start={}",
                        id.peer,
                        &block.counter_range,
                        counter_start
                    );
                    if counter_start > block.counter_range.0 {
                        // Part of this block was flushed before; the old entry
                        // must already exist and gets overwritten below.
                        assert!(store.get(&id_bytes).is_some());
                    }
                    external_vv.insert(id.peer, block.counter_range.1);
                    let bytes = block.to_bytes(&self.arena);
                    store.set(&id_bytes, bytes.bytes);
                    // Copy-on-write: only this handle's block is marked flushed.
                    Arc::make_mut(block).flushed = true;
                }
            }

            if inner.start_vv.is_empty() {
                // Without shallow history, everything must now be flushed.
                assert_eq!(&*external_vv, vv);
            } else {
                #[cfg(debug_assertions)]
                {
                    // TODO: makes some assertions here?
                }
            }
            let vv_bytes = vv.encode();
            let frontiers_bytes = frontiers.encode();
            store.set(VV_KEY, vv_bytes.into());
            store.set(FRONTIERS_KEY, frontiers_bytes.into());
        }
769    }
770}
771
772mod mut_inner_kv {
773    //! Only this module contains the code that mutate the internal kv store
774    //! All other modules should only read from the internal kv store
775
776    use super::*;
777    impl ChangeStore {
        /// This method is the **only place** that push a new change into the change store
        ///
        /// The new change either merges with the previous block or is put into a new block.
        /// This method only updates the internal kv store.
        ///
        /// - `split_when_exceeds`: split the change first if its estimated size
        ///   exceeds `MAX_BLOCK_SIZE`.
        /// - `is_local`: local changes may merge with the previous change when
        ///   within `merge_interval`.
        pub fn insert_change(&self, change: Change, split_when_exceeds: bool, is_local: bool) {
            self.insert_change_inner(change, split_when_exceeds, is_local, None);
        }
785
        /// Like `insert_change`, but records the pre-mutation state of any
        /// block it appends to into `rollback`, so the insert can be undone via
        /// `rollback_import`.
        pub(crate) fn insert_change_with_rollback(
            &self,
            change: Change,
            split_when_exceeds: bool,
            is_local: bool,
            rollback: &mut ChangeStoreRollback,
        ) {
            self.insert_change_inner(change, split_when_exceeds, is_local, Some(rollback));
        }
795
        /// Shared implementation of `insert_change` / `insert_change_with_rollback`.
        ///
        /// Oversized changes are split first (when allowed); otherwise the
        /// change is appended to the previous block of the same peer if the
        /// counters are continuous, or put into a new block keyed by its ID.
        fn insert_change_inner(
            &self,
            mut change: Change,
            split_when_exceeds: bool,
            is_local: bool,
            mut rollback: Option<&mut ChangeStoreRollback>,
        ) {
            #[cfg(debug_assertions)]
            {
                // The change must not precede what's already flushed for this peer.
                let vv = self.external_vv.lock();
                assert!(vv.get(&change.id.peer).copied().unwrap_or(0) <= change.id.counter);
            }

            let s = info_span!("change_store insert_change", id = ?change.id);
            let _e = s.enter();
            let estimated_size = change.estimate_storage_size();
            if estimated_size > MAX_BLOCK_SIZE && split_when_exceeds {
                self.split_change_then_insert(change, rollback.as_deref_mut());
                return;
            }

            let id = change.id;
            let mut inner = self.inner.lock();

            // try to merge with previous block
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..id).next_back() {
                if block.peer == change.id.peer {
                    if block.counter_range.1 != change.id.counter {
                        // Invariant (see the type docs): a peer's counters must
                        // be continuous across blocks — no holes allowed.
                        panic!("counter should be continuous")
                    }

                    // Snapshot the block before mutating so the import can be
                    // rolled back.
                    if let Some(rollback) = rollback.as_deref_mut() {
                        rollback.record_block_before_mutation(*_id, block.clone());
                    }

                    match block.push_change(
                        change,
                        estimated_size,
                        if is_local {
                            // local change should try to merge with previous change when
                            // the timestamp interval <= the `merge_interval`
                            self.merge_interval
                                .load(std::sync::atomic::Ordering::Acquire)
                        } else {
                            0
                        },
                        &self.arena,
                    ) {
                        Ok(_) => {
                            drop(inner);
                            debug_assert!(self.get_change(id).is_some());
                            return;
                        }
                        // The block gave the change back (couldn't absorb it);
                        // fall through and start a new block.
                        Err(c) => change = c,
                    }
                }
            }

            inner
                .mem_parsed_kv
                .insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
            drop(inner);
            debug_assert!(self.get_change(id).is_some());
        }
860
861        pub fn get_change(&self, id: ID) -> Option<BlockChangeRef> {
862            let block = self.get_parsed_block(id)?;
863            Some(BlockChangeRef {
864                change_index: block.get_change_index_by_counter(id.counter).unwrap(),
865                block: block.clone(),
866            })
867        }
868
        /// Get the change with the given peer and lamport.
        ///
        /// If not found, return the change with the greatest lamport that is smaller than the given lamport.
        ///
        /// Strategy: walk the in-memory parsed blocks for this peer backwards;
        /// switch to a counter-space binary search when the lamport gap is large;
        /// fall back to scanning the external kv store when the in-memory blocks
        /// have a hole or the walk runs out of blocks.
        pub fn get_change_by_lamport_lte(&self, idlp: IdLp) -> Option<BlockChangeRef> {
            // This method is complicated because we impl binary search on top of the range api
            // It can be simplified
            let mut inner = self.inner.lock();
            let mut iter = inner
                .mem_parsed_kv
                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));

            // Counter-space search window; both bounds are narrowed as we search.
            let mut lower_bound = 0;
            let mut upper_bound = i32::MAX;
            let mut is_binary_searching = false;
            loop {
                match iter.next_back() {
                    Some((&id, block)) => {
                        if block.lamport_range.0 <= idlp.lamport
                            && (!is_binary_searching || idlp.lamport < block.lamport_range.1)
                        {
                            if !is_binary_searching
                                && upper_bound != i32::MAX
                                && upper_bound != block.counter_range.1
                            {
                                warn!(
                                    "There is a hole between the last block and the current block"
                                );
                                // There is hole between the last block and the current block
                                // We need to load it from the kv store
                                break;
                            }

                            // Found the block
                            block
                                .ensure_changes(&self.arena)
                                .expect("Parse block error");
                            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
                            return Some(BlockChangeRef {
                                change_index: index,
                                block: block.clone(),
                            });
                        }

                        if is_binary_searching {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            if block.lamport_range.1 <= idlp.lamport {
                                // Target is larger than the current block (pointed by mid_bound)
                                lower_bound = mid_bound;
                            } else {
                                debug_assert!(
                                    idlp.lamport < block.lamport_range.0,
                                    "{} {:?}",
                                    idlp,
                                    &block.lamport_range
                                );
                                // Target is smaller than the current block (pointed by mid_bound)
                                upper_bound = mid_bound;
                            }

                            // Re-aim the scan at the new midpoint.
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        } else {
                            // Test whether we need to switch to binary search by measuring the gap
                            if block.lamport_range.0 - idlp.lamport > MAX_BLOCK_SIZE as Lamport * 8
                            {
                                // Use binary search to find the block
                                upper_bound = id.counter;
                                let mid_bound = (lower_bound + upper_bound) / 2;
                                iter = inner.mem_parsed_kv.range_mut(
                                    ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound),
                                );
                                is_binary_searching = true;
                            }

                            upper_bound = id.counter;
                        }
                    }
                    None => {
                        if !is_binary_searching {
                            // The linear walk exhausted the in-memory blocks;
                            // fall back to the external kv store below.
                            break;
                        }

                        let mid_bound = (lower_bound + upper_bound) / 2;
                        lower_bound = mid_bound;
                        if upper_bound - lower_bound <= MAX_BLOCK_SIZE as i32 {
                            // If they are too close, we can just scan the range
                            iter = inner.mem_parsed_kv.range_mut(
                                ID::new(idlp.peer, lower_bound)..ID::new(idlp.peer, upper_bound),
                            );
                            is_binary_searching = false;
                        } else {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        }
                    }
                }
            }

            // Fallback: scan raw blocks in the external kv store, newest first,
            // for the first block whose lamport range starts at or below target.
            let counter_end = upper_bound;
            let scan_end = ID::new(idlp.peer, counter_end).to_bytes();

            let (id, bytes) = 'block_scan: {
                let kv_store = &self.external_kv.lock();
                let iter = kv_store
                    .scan(
                        Bound::Included(&ID::new(idlp.peer, 0).to_bytes()),
                        Bound::Excluded(&scan_end),
                    )
                    .rev();

                for (id, bytes) in iter {
                    let mut block = ChangesBlockBytes::new(bytes.clone());
                    let (lamport_start, _lamport_end) = block.lamport_range();
                    if lamport_start <= idlp.lamport {
                        break 'block_scan (id, bytes);
                    }
                }

                return None;
            };

            // Cache the found block in memory before answering.
            let block_id = ID::from_bytes(&id);
            let mut block = Arc::new(
                ChangesBlock::from_bytes(bytes)
                    .expect("validated external change block should decode"),
            );
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            inner.mem_parsed_kv.insert(block_id, block.clone());
            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
            Some(BlockChangeRef {
                change_index: index,
                block,
            })
        }
1010
        /// Split an oversized change into multiple changes, each fitting within
        /// `MAX_BLOCK_SIZE`, and insert them one by one.
        ///
        /// Ops that are individually too large are sliced via
        /// `check_whether_slice_content_to_fit_in_size`. The total atom length of
        /// the inserted pieces must equal the original change's length
        /// (asserted at the end).
        fn split_change_then_insert(
            &self,
            change: Change,
            mut rollback: Option<&mut ChangeStoreRollback>,
        ) {
            let original_len = change.atom_len();
            // Accumulates ops until the estimated size would exceed the limit.
            let mut new_change = Change {
                ops: RleVec::new(),
                deps: change.deps,
                id: change.id,
                lamport: change.lamport,
                timestamp: change.timestamp,
                commit_msg: change.commit_msg.clone(),
            };

            let mut total_len = 0;
            let mut estimated_size = new_change.estimate_storage_size();
            'outer: for mut op in change.ops.into_iter() {
                if op.estimate_storage_size() >= MAX_BLOCK_SIZE - estimated_size {
                    // Not enough room left for this op: flush the current piece.
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                        rollback.as_deref_mut(),
                    );
                }

                // Slice the op while only a prefix of it fits in the remaining room.
                while let Some(end) =
                    op.check_whether_slice_content_to_fit_in_size(MAX_BLOCK_SIZE - estimated_size)
                {
                    // The new op can take the rest of the room
                    let new = op.slice(0, end);
                    new_change.ops.push(new);
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                        rollback.as_deref_mut(),
                    );

                    if end < op.atom_len() {
                        op = op.slice(end, op.atom_len());
                    } else {
                        continue 'outer;
                    }
                }

                estimated_size += op.estimate_storage_size();
                if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
                    // Flush the accumulated piece, then start the next one with
                    // this op.
                    // NOTE(review): after this flush `estimated_size` is reset and
                    // no longer includes this op's size — confirm whether that is
                    // intended (the next piece may end up slightly over the limit).
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                        rollback.as_deref_mut(),
                    );
                    new_change.ops.push(op);
                } else {
                    new_change.ops.push(op);
                }
            }

            // Flush the trailing piece, if any.
            if !new_change.ops.is_empty() {
                total_len += new_change.atom_len();
                self.insert_change_inner(new_change, false, false, rollback.as_deref_mut());
            }

            assert_eq!(total_len, original_len);
        }
1079
1080        fn _insert_splitted_change(
1081            &self,
1082            new_change: Change,
1083            total_len: &mut usize,
1084            estimated_size: &mut usize,
1085            rollback: Option<&mut ChangeStoreRollback>,
1086        ) -> Change {
1087            if new_change.atom_len() == 0 {
1088                return new_change;
1089            }
1090
1091            let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
1092            let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
1093            *total_len += new_change.atom_len();
1094            let ans = Change {
1095                ops: RleVec::new(),
1096                deps: ID::new(new_change.id.peer, ctr_end - 1).into(),
1097                id: ID::new(new_change.id.peer, ctr_end),
1098                lamport: next_lamport,
1099                timestamp: new_change.timestamp,
1100                commit_msg: new_change.commit_msg.clone(),
1101            };
1102
1103            self.insert_change_inner(new_change, false, false, rollback);
1104            *estimated_size = ans.estimate_storage_size();
1105            ans
1106        }
1107
1108        fn get_parsed_block(&self, id: ID) -> Option<Arc<ChangesBlock>> {
1109            let mut inner = self.inner.lock();
1110            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
1111                if block.peer == id.peer && block.counter_range.1 > id.counter {
1112                    block
1113                        .ensure_changes(&self.arena)
1114                        .expect("Parse block error");
1115                    return Some(block.clone());
1116                }
1117            }
1118
1119            let store = self.external_kv.lock();
1120            let mut iter = store
1121                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1122                .filter(|(id, _)| id.len() == 12);
1123
1124            // println!(
1125            //     "\nkeys {:?}",
1126            //     store
1127            //         .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1128            //         .filter(|(id, _)| id.len() == 12)
1129            //         .map(|(k, _v)| ID::from_bytes(&k))
1130            //         .count()
1131            // );
1132            // println!("id {:?}", id);
1133
1134            let (b_id, b_bytes) = iter.next_back()?;
1135            let block_id: ID = ID::from_bytes(&b_id[..]);
1136            let block = ChangesBlock::from_bytes(b_bytes)
1137                .expect("validated external change block should decode");
1138            if block_id.peer == id.peer
1139                && block_id.counter <= id.counter
1140                && block.counter_range.1 > id.counter
1141            {
1142                let mut arc_block = Arc::new(block);
1143                arc_block
1144                    .ensure_changes(&self.arena)
1145                    .expect("Parse block error");
1146                inner.mem_parsed_kv.insert(block_id, arc_block.clone());
1147                return Some(arc_block);
1148            }
1149
1150            None
1151        }
1152
        /// Load all the blocks that have overlapped with the given ID range into `inner_mem_parsed_kv`
        ///
        /// This is fast because we don't actually parse the content.
        // TODO: PERF: This method feels slow.
        pub(super) fn ensure_block_loaded_in_range(&self, start: Bound<ID>, end: Bound<ID>) {
            // A block keyed below `start` can still overlap the range, because a
            // block covers a counter span. If the forward scan doesn't begin
            // exactly at the expected start ID, we also load the preceding block
            // afterwards (via `ensure_id_lte`).
            let mut whether_need_scan_backward = match start {
                Bound::Included(id) => Some(id),
                Bound::Excluded(id) => Some(id.inc(1)),
                Bound::Unbounded => None,
            };

            {
                let start = start.map(|id| id.to_bytes());
                let end = end.map(|id| id.to_bytes());
                let kv = self.external_kv.lock();
                let mut inner = self.inner.lock();
                for (id, bytes) in kv
                    .scan(
                        start.as_ref().map(|x| x.as_slice()),
                        end.as_ref().map(|x| x.as_slice()),
                    )
                    // Block keys are 12-byte encoded IDs; skip metadata keys.
                    .filter(|(id, _)| id.len() == 12)
                {
                    let id = ID::from_bytes(&id);
                    if let Some(expected_start_id) = whether_need_scan_backward {
                        if id == expected_start_id {
                            // A block starts exactly at `start`; no backward scan needed.
                            whether_need_scan_backward = None;
                        }
                    }

                    if inner.mem_parsed_kv.contains_key(&id) {
                        continue;
                    }

                    // `from_bytes` decodes only the header; change bodies stay lazy.
                    let block = ChangesBlock::from_bytes(bytes.clone())
                        .expect("validated external change block should decode");
                    inner.mem_parsed_kv.insert(id, Arc::new(block));
                }
            }

            if let Some(start_id) = whether_need_scan_backward {
                self.ensure_id_lte(start_id);
            }
        }
1197
1198        pub(super) fn ensure_id_lte(&self, id: ID) {
1199            let kv = self.external_kv.lock();
1200            let mut inner = self.inner.lock();
1201            let Some((next_back_id, next_back_bytes)) = kv
1202                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
1203                .filter(|(id, _)| id.len() == 12)
1204                .next_back()
1205            else {
1206                return;
1207            };
1208
1209            let next_back_id = ID::from_bytes(&next_back_id);
1210            if next_back_id.peer == id.peer {
1211                if inner.mem_parsed_kv.contains_key(&next_back_id) {
1212                    return;
1213                }
1214
1215                let block = ChangesBlock::from_bytes(next_back_bytes)
1216                    .expect("validated external change block should decode");
1217                inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
1218            }
1219        }
1220    }
1221}
1222
/// Version information resulting from decoding a batch of changes.
#[must_use]
#[derive(Clone, Debug)]
pub(crate) struct BatchDecodeInfo {
    pub vv: VersionVector,
    pub frontiers: Frontiers,
    /// `(vv, frontiers)` of the start version, present when the decoded data
    /// does not begin at the empty version (e.g. shallow/trimmed history).
    /// NOTE(review): inferred from field types/names — confirm at call sites.
    pub start_version: Option<(VersionVector, Frontiers)>,
}
1230
/// A reference to a single [`Change`] stored inside a parsed [`ChangesBlock`].
///
/// Derefs to the change at `change_index`. The block's content must be in a
/// parsed state (`Changes` or `Both`); otherwise deref panics.
#[derive(Clone, Debug)]
pub struct BlockChangeRef {
    block: Arc<ChangesBlock>,
    change_index: usize,
}
1236
impl Deref for BlockChangeRef {
    type Target = Change;
    fn deref(&self) -> &Change {
        // Blocks are parsed (`ensure_changes`) before a BlockChangeRef is handed
        // out (see `get_change`), so `try_changes` cannot be `None` here.
        &self.block.content.try_changes().unwrap()[self.change_index]
    }
}
1243
1244impl BlockChangeRef {
1245    pub(crate) fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
1246        if counter >= self.ctr_end() {
1247            return None;
1248        }
1249
1250        let index = self.ops.search_atom_index(counter);
1251        Some(BlockOpRef {
1252            block: self.block.clone(),
1253            change_index: self.change_index,
1254            op_index: index,
1255        })
1256    }
1257}
1258
/// A reference to a single [`Op`] inside a parsed [`ChangesBlock`]:
/// the op `ops[op_index]` of the change at `change_index`.
#[derive(Clone, Debug)]
pub(crate) struct BlockOpRef {
    pub block: Arc<ChangesBlock>,
    pub change_index: usize,
    pub op_index: usize,
}
1265
impl Deref for BlockOpRef {
    type Target = Op;

    fn deref(&self) -> &Op {
        // Panics if the block's changes were never parsed; refs are only built
        // from parsed blocks.
        &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index]
    }
}
1273
1274impl BlockOpRef {
1275    pub fn lamport(&self) -> Lamport {
1276        let change = &self.block.content.try_changes().unwrap()[self.change_index];
1277        let op = &change.ops[self.op_index];
1278        (op.counter - change.id.counter) as Lamport + change.lamport
1279    }
1280}
1281
impl ChangesBlock {
    /// Decode a block from raw bytes, eagerly decoding only the header
    /// (peer, counter/lamport ranges); change bodies stay lazy.
    fn from_bytes(bytes: Bytes) -> LoroResult<Self> {
        let len = bytes.len();
        let bytes = ChangesBlockBytes::new(bytes);
        bytes.ensure_header()?;
        let header = bytes
            .header
            .get()
            .expect("header should be initialized after ensure_header");
        let peer = header.peer;
        let counter_range = (
            header.counter,
            *header.counters.last().ok_or_else(|| {
                LoroError::DecodeError("Decode block error: missing counters".into())
            })?,
        );
        let lamport_range = (
            *header.lamports.first().ok_or_else(|| {
                LoroError::DecodeError("Decode block error: missing lamports".into())
            })?,
            *header.lamports.last().ok_or_else(|| {
                LoroError::DecodeError("Decode block error: missing lamports".into())
            })?,
        );
        let content = ChangesBlockContent::Bytes(bytes);
        Ok(Self {
            peer,
            estimated_size: len,
            counter_range,
            lamport_range,
            // Came straight from storage, so it is already persisted.
            flushed: true,
            content,
        })
    }

    pub(crate) fn content(&self) -> &ChangesBlockContent {
        &self.content
    }

    /// Create a fresh, unflushed block holding a single change.
    fn new(change: Change, _a: &SharedArena) -> Self {
        let atom_len = change.atom_len();
        let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
        let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
        let estimated_size = change.estimate_storage_size();
        let peer = change.id.peer;
        let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
        Self {
            peer,
            counter_range,
            lamport_range,
            estimated_size,
            content,
            flushed: false,
        }
    }

    /// Compare this block's (peer, counter span) with `id`;
    /// `Equal` means the block covers the id.
    #[allow(unused)]
    fn cmp_id(&self, id: ID) -> Ordering {
        self.peer.cmp(&id.peer).then_with(|| {
            if self.counter_range.0 > id.counter {
                Ordering::Greater
            } else if self.counter_range.1 <= id.counter {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    /// Compare this block's (peer, lamport span) with `idlp`;
    /// `Equal` means the block covers the lamport.
    #[allow(unused)]
    fn cmp_idlp(&self, idlp: (PeerID, Lamport)) -> Ordering {
        self.peer.cmp(&idlp.0).then_with(|| {
            if self.lamport_range.0 > idlp.1 {
                Ordering::Greater
            } else if self.lamport_range.1 <= idlp.1 {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    #[allow(unused)]
    fn is_full(&self) -> bool {
        self.estimated_size > MAX_BLOCK_SIZE
    }

    /// Append `change` to this block, or hand it back as `Err` when it can't
    /// be appended.
    ///
    /// Fails when the change is not counter-contiguous with the block, or when
    /// the block would exceed `MAX_BLOCK_SIZE` and the change can't collapse
    /// into the previous change's last op. On success the block is marked dirty
    /// (`flushed = false`) and its counter/lamport ranges are extended.
    #[allow(clippy::result_large_err)]
    fn push_change(
        self: &mut Arc<Self>,
        change: Change,
        new_change_size: usize,
        merge_interval: i64,
        a: &SharedArena,
    ) -> Result<(), Change> {
        if self.counter_range.1 != change.id.counter {
            return Err(change);
        }

        let atom_len = change.atom_len();
        let next_lamport = change.lamport + atom_len as Lamport;
        let next_counter = change.id.counter + atom_len as Counter;

        let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
        let this = Arc::make_mut(self);
        let changes = this.content.changes_mut(a).unwrap();
        let changes = Arc::make_mut(changes);
        match changes.last_mut() {
            // Merge into the previous change when allowed; a full block only
            // accepts the merge when it collapses into the last op (so the
            // block doesn't grow a new change entry).
            Some(last)
                if last.can_merge_right(&change, merge_interval)
                    && (!is_full
                        || (change.ops.len() == 1
                            && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
            {
                for op in change.ops.into_iter() {
                    let size = op.estimate_storage_size();
                    if !last.ops.push(op) {
                        // The op didn't merge with the previous one, so it
                        // occupies new space in the block.
                        this.estimated_size += size;
                    }
                }
            }
            _ => {
                if is_full {
                    return Err(change);
                } else {
                    this.estimated_size += new_change_size;
                    changes.push(change);
                }
            }
        }

        this.flushed = false;
        this.counter_range.1 = next_counter;
        this.lamport_range.1 = next_lamport;
        Ok(())
    }

    /// Serialize to bytes, caching the encoded form in `content` so repeated
    /// calls don't re-encode.
    fn to_bytes(self: &mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
        match &self.content {
            ChangesBlockContent::Bytes(bytes) => bytes.clone(),
            ChangesBlockContent::Both(_, bytes) => {
                // Drop the parsed form; keep only the bytes.
                let bytes = bytes.clone();
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Bytes(bytes.clone());
                bytes
            }
            ChangesBlockContent::Changes(changes) => {
                let bytes = ChangesBlockBytes::serialize(changes, a);
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Bytes(bytes.clone());
                bytes
            }
        }
    }

    /// Ensure the changes are parsed so that `try_changes` succeeds afterwards.
    fn ensure_changes(self: &mut Arc<Self>, a: &SharedArena) -> LoroResult<()> {
        match &self.content {
            ChangesBlockContent::Changes(_) => Ok(()),
            ChangesBlockContent::Both(_, _) => Ok(()),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                let b = bytes.clone();
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Both(Arc::new(changes), b);
                Ok(())
            }
        }
    }

    /// Binary-search the parsed changes for the one covering `counter`;
    /// `Err(i)` is the insertion point when none covers it.
    ///
    /// Panics if the changes are not parsed (call `ensure_changes` first).
    fn get_change_index_by_counter(&self, counter: Counter) -> Result<usize, usize> {
        let changes = self.content.try_changes().unwrap();
        changes.binary_search_by(|c| {
            if c.id.counter > counter {
                Ordering::Greater
            } else if (c.id.counter + c.content_len() as Counter) <= counter {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    /// Index of the change covering `lamport`, or of the change with the
    /// greatest lamport below it; `None` if every change starts above it.
    ///
    /// Panics if the changes are not parsed (call `ensure_changes` first).
    fn get_change_index_by_lamport_lte(&self, lamport: Lamport) -> Option<usize> {
        let changes = self.content.try_changes().unwrap();
        let r = changes.binary_search_by(|c| {
            if c.lamport > lamport {
                Ordering::Greater
            } else if (c.lamport + c.content_len() as Lamport) <= lamport {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        });

        match r {
            Ok(found) => Some(found),
            Err(idx) => {
                if idx == 0 {
                    None
                } else {
                    Some(idx - 1)
                }
            }
        }
    }

    #[allow(unused)]
    fn get_changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
        self.content.changes(a)
    }

    /// The block's key: its peer and start counter.
    #[allow(unused)]
    fn id(&self) -> ID {
        ID::new(self.peer, self.counter_range.0)
    }

    /// Number of changes in this block; works for both parsed and raw content.
    pub fn change_num(&self) -> usize {
        match &self.content {
            ChangesBlockContent::Changes(c) => c.len(),
            ChangesBlockContent::Bytes(b) => b.len_changes(),
            ChangesBlockContent::Both(c, _) => c.len(),
        }
    }
}
1506
impl ChangesBlockContent {
    /// Build the DAG nodes described by this block, one per change, appended
    /// via `push_rle_element`.
    ///
    /// For raw bytes only the header is decoded; change bodies stay lazy.
    // TODO: PERF: We can use Iter to replace Vec
    pub fn iter_dag_nodes(&self) -> Vec<AppDagNode> {
        let mut dag_nodes = Vec::new();
        match self {
            ChangesBlockContent::Changes(c) | ChangesBlockContent::Both(c, _) => {
                for change in c.iter() {
                    let new_node = AppDagNodeInner {
                        peer: change.id.peer,
                        cnt: change.id.counter,
                        lamport: change.lamport,
                        deps: change.deps.clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len: change.atom_len(),
                    }
                    .into();

                    dag_nodes.push_rle_element(new_node);
                }
            }
            ChangesBlockContent::Bytes(b) => {
                b.ensure_header().unwrap();
                let header = b.header.get().unwrap();
                let n = header.n_changes;
                for i in 0..n {
                    let new_node = AppDagNodeInner {
                        peer: header.peer,
                        cnt: header.counters[i],
                        lamport: header.lamports[i],
                        deps: header.deps_groups[i].clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        // `counters` has n + 1 entries; entry i + 1 is the
                        // exclusive counter end of change i.
                        len: (header.counters[i + 1] - header.counters[i]) as usize,
                    }
                    .into();

                    dag_nodes.push_rle_element(new_node);
                }
            }
        }

        dag_nodes
    }

    /// Parsed changes, decoding from bytes (and caching both forms) if needed.
    #[allow(unused)]
    pub fn changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => Ok(changes),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
                self.changes(a)
            }
        }
    }

    /// Note that this method will invalidate the stored bytes
    fn changes_mut(&mut self, a: &SharedArena) -> LoroResult<&mut Arc<Vec<Change>>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => {
                // The caller may mutate the changes, which would make the cached
                // bytes stale: downgrade to the parsed-only variant.
                *self = ChangesBlockContent::Changes(std::mem::take(changes));
                self.changes_mut(a)
            }
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Changes(Arc::new(changes));
                self.changes_mut(a)
            }
        }
    }

    /// Parsed changes if available; `None` when only raw bytes are stored.
    pub(crate) fn try_changes(&self) -> Option<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Some(changes),
            ChangesBlockContent::Both(changes, _) => Some(changes),
            ChangesBlockContent::Bytes(_) => None,
        }
    }

    /// Number of changes, without forcing a full parse of the bytes form.
    pub(crate) fn len_changes(&self) -> usize {
        match self {
            ChangesBlockContent::Changes(changes) => changes.len(),
            ChangesBlockContent::Both(changes, _) => changes.len(),
            ChangesBlockContent::Bytes(bytes) => bytes.len_changes(),
        }
    }
}
1597
1598impl std::fmt::Debug for ChangesBlockContent {
1599    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1600        match self {
1601            ChangesBlockContent::Changes(changes) => f
1602                .debug_tuple("ChangesBlockContent::Changes")
1603                .field(changes)
1604                .finish(),
1605            ChangesBlockContent::Bytes(_bytes) => {
1606                f.debug_tuple("ChangesBlockContent::Bytes").finish()
1607            }
1608            ChangesBlockContent::Both(changes, _bytes) => f
1609                .debug_tuple("ChangesBlockContent::Both")
1610                .field(changes)
1611                .finish(),
1612        }
1613    }
1614}
1615
1616impl ChangesBlockBytes {
1617    fn new(bytes: Bytes) -> Self {
1618        Self {
1619            header: OnceCell::new(),
1620            bytes,
1621        }
1622    }
1623
1624    fn ensure_header(&self) -> LoroResult<()> {
1625        self.header
1626            .get_or_try_init(|| decode_header(&self.bytes).map(Arc::new))?;
1627        Ok(())
1628    }
1629
1630    fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
1631        self.ensure_header()?;
1632        let ans: Vec<Change> = decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))?;
1633        for c in ans.iter() {
1634            // PERF: This can be made faster (low priority)
1635            register_container_and_parent_link(a, c)
1636        }
1637
1638        Ok(ans)
1639    }
1640
1641    fn serialize(changes: &[Change], a: &SharedArena) -> Self {
1642        let bytes = encode_block(changes, a);
1643        // TODO: Perf we can calculate header directly without parsing the bytes
1644        let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
1645        bytes.ensure_header().unwrap();
1646        bytes
1647    }
1648
1649    fn lamport_range(&mut self) -> (Lamport, Lamport) {
1650        if let Some(header) = self.header.get() {
1651            (header.lamports[0], *header.lamports.last().unwrap())
1652        } else {
1653            decode_block_range(&self.bytes).unwrap().1
1654        }
1655    }
1656
1657    /// Length of the changes
1658    fn len_changes(&self) -> usize {
1659        self.ensure_header().unwrap();
1660        self.header.get().unwrap().n_changes
1661    }
1662}
1663
#[cfg(test)]
mod test {
    use crate::cursor::PosType;
    use crate::{
        loro::ExportMode, oplog::convert_change_to_remote, state::TreeParentId, ListHandler,
        LoroDoc, MovableListHandler, TextHandler, TreeHandler,
    };

    use super::*;

    /// Round-trip check: encode all changes from `doc`'s change store, import
    /// them into a fresh `ChangeStore`, then assert that
    /// 1. re-exporting the fresh store reproduces the encoded bytes exactly, and
    /// 2. the parsed changes (converted to remote form) match the original's.
    fn test_encode_decode(doc: LoroDoc) {
        // Flush any pending ops into the oplog before exporting.
        doc.commit_then_renew();
        let oplog = doc.oplog().lock();
        let bytes = oplog
            .change_store
            .encode_all(oplog.vv(), oplog.dag.frontiers());
        let store = ChangeStore::new_for_test();
        let _ = store.import_all(bytes.clone()).unwrap();
        // Byte-level equality: exporting the imported store must be identical.
        assert_eq!(store.external_kv.lock().export_all(), bytes);
        let mut changes_parsed = Vec::new();
        let a = store.arena.clone();
        store.visit_all_changes(&mut |c| {
            changes_parsed.push(convert_change_to_remote(&a, c));
        });
        let mut changes = Vec::new();
        oplog.change_store.visit_all_changes(&mut |c| {
            changes.push(convert_change_to_remote(&oplog.arena, c));
        });
        // Semantic equality: both stores must report the same change list.
        assert_eq!(changes_parsed, changes);
    }

    #[test]
    fn test_change_store() {
        let doc = LoroDoc::new_auto_commit();
        doc.set_record_timestamp(true);
        let t = doc.get_text("t");
        t.insert(0, "hello", PosType::Unicode).unwrap();
        doc.commit_then_renew();
        // Reuse the id "t" with a different container type (text, then list)
        // to exercise encoding of mixed container kinds in one store.
        let t = doc.get_list("t");
        t.insert(0, "hello").unwrap();
        test_encode_decode(doc);
    }

    /// Three peers editing concurrently: A creates the structure, B and C make
    /// divergent edits, then both are merged back into A before the round-trip
    /// encode/decode check.
    #[test]
    fn test_synced_doc() -> LoroResult<()> {
        let doc_a = LoroDoc::new_auto_commit();
        let doc_b = LoroDoc::new_auto_commit();
        let doc_c = LoroDoc::new_auto_commit();

        {
            // A: Create initial structure
            let map = doc_a.get_map("root");
            map.insert_container("text", TextHandler::new_detached())?;
            map.insert_container("list", ListHandler::new_detached())?;
            map.insert_container("tree", TreeHandler::new_detached())?;
        }

        {
            // Sync initial state to B and C
            let initial_state = doc_a.export(ExportMode::all_updates()).unwrap();
            doc_b.import(&initial_state)?;
            doc_c.import(&initial_state)?;
        }

        {
            // B: Edit text and list
            // NOTE(review): inserting a container under an existing key replaces
            // it, so B's edits land in fresh containers that shadow A's.
            let map = doc_b.get_map("root");
            let text = map
                .insert_container("text", TextHandler::new_detached())
                .unwrap();
            text.insert(0, "Hello, ", PosType::Unicode)?;

            let list = map
                .insert_container("list", ListHandler::new_detached())
                .unwrap();
            list.push("world")?;
        }

        {
            // C: Edit tree and movable list
            let map = doc_c.get_map("root");
            let tree = map
                .insert_container("tree", TreeHandler::new_detached())
                .unwrap();
            let node_id = tree.create(TreeParentId::Root)?;
            tree.get_meta(node_id)?.insert("key", "value")?;
            let node_b = tree.create(TreeParentId::Root)?;
            tree.move_to(node_b, TreeParentId::Root, 0).unwrap();

            let movable_list = map
                .insert_container("movable", MovableListHandler::new_detached())
                .unwrap();
            movable_list.push("item1".into())?;
            movable_list.push("item2".into())?;
            movable_list.mov(0, 1)?;
        }

        // Sync B's changes to A
        let b_changes = doc_b
            .export(ExportMode::updates(&doc_a.oplog_vv()))
            .unwrap();
        doc_a.import(&b_changes)?;

        // Sync C's changes to A
        let c_changes = doc_c
            .export(ExportMode::updates(&doc_a.oplog_vv()))
            .unwrap();
        doc_a.import(&c_changes)?;

        test_encode_decode(doc_a);
        Ok(())
    }
}