loro_internal/oplog/
change_store.rs

use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
use super::{loro_dag::AppDagNodeInner, AppDagNode};
use crate::sync::Mutex;
use crate::{
    arena::SharedArena,
    change::Change,
    estimated_size::EstimatedSize,
    kv_store::KvStore,
    op::Op,
    parent::register_container_and_parent_link,
    version::{Frontiers, ImVersionVector},
    VersionVector,
};
use block_encode::decode_block_range;
use bytes::Bytes;
use itertools::Itertools;
use loro_common::{
    Counter, HasCounter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport,
    LoroError, LoroResult, PeerID, ID,
};
use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
use std::sync::atomic::AtomicI64;
use std::{
    cmp::Ordering,
    collections::{BTreeMap, VecDeque},
    ops::{Bound, Deref},
    sync::Arc,
};
use tracing::{info_span, warn};
mod block_encode;
mod block_meta_encode;
pub(super) mod iter;

#[cfg(not(test))]
const MAX_BLOCK_SIZE: usize = 1024 * 4;
#[cfg(test)]
const MAX_BLOCK_SIZE: usize = 128;

/// # Invariants
///
/// - We don't allow holes within a block or between two blocks with the same peer id.
///   The [Change]s must be contiguous for each peer.
/// - However, the first block of a peer can start at counter > 0 so that we can trim the history.
///
/// # Encoding Schema
///
/// It's based on the underlying KV store.
///
/// The entries of the KV store are made up of the following fields:
///
/// |Key                          |Value             |
/// |:--                          |:----             |
/// |b"vv"                        |VersionVector     |
/// |b"fr"                        |Frontiers         |
/// |b"sv"                        |Shallow VV        |
/// |b"sf"                        |Shallow Frontiers |
/// |12 bytes PeerID + Counter    |Encoded Block     |
#[derive(Debug, Clone)]
pub struct ChangeStore {
    inner: Arc<Mutex<ChangeStoreInner>>,
    arena: SharedArena,
    /// A change may be in external_kv or in mem_parsed_kv.
    /// mem_parsed_kv is more up-to-date.
    ///
    /// We cannot write directly into external_kv except during the initial load.
    external_kv: Arc<Mutex<dyn KvStore>>,
    /// The version vector of the external kv store.
    external_vv: Arc<Mutex<VersionVector>>,
    merge_interval: Arc<AtomicI64>,
}
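
// Illustrative sketch, not used by the store itself: block keys are 12 bytes
// (8-byte PeerID + 4-byte Counter) and must sort lexicographically in
// (peer, counter) order for the range scans below to work. Assuming a
// big-endian layout like the one `ID::to_bytes` presumably uses, a key would
// look like this (`example_block_key` is a hypothetical helper):
#[cfg(test)]
#[allow(unused)]
fn example_block_key(peer: PeerID, counter: Counter) -> [u8; 12] {
    let mut key = [0u8; 12];
    key[..8].copy_from_slice(&peer.to_be_bytes());
    // Counters are non-negative here, so casting to u32 preserves the order.
    key[8..].copy_from_slice(&(counter as u32).to_be_bytes());
    key
}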

#[derive(Debug, Clone)]
struct ChangeStoreInner {
    /// The start version vector of the first block for each peer.
    /// It allows us to trim the history.
    start_vv: ImVersionVector,
    /// The last version of the shallow history.
    start_frontiers: Frontiers,
    /// It's more like a parsed cache for binary_kv.
    mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
}

#[derive(Debug, Clone)]
pub(crate) struct ChangesBlock {
    peer: PeerID,
    counter_range: (Counter, Counter),
    lamport_range: (Lamport, Lamport),
    /// Estimated size of the block in bytes
    estimated_size: usize,
    flushed: bool,
    content: ChangesBlockContent,
}

#[derive(Clone)]
pub(crate) enum ChangesBlockContent {
    Changes(Arc<Vec<Change>>),
    Bytes(ChangesBlockBytes),
    Both(Arc<Vec<Change>>, ChangesBlockBytes),
}

/// It's cheap to clone this struct because it's cheap to clone the bytes
#[derive(Clone)]
pub(crate) struct ChangesBlockBytes {
    bytes: Bytes,
    header: OnceCell<Arc<ChangesBlockHeader>>,
}

pub const START_VV_KEY: &[u8] = b"sv";
pub const START_FRONTIERS_KEY: &[u8] = b"sf";
pub const VV_KEY: &[u8] = b"vv";
pub const FRONTIERS_KEY: &[u8] = b"fr";

impl ChangeStore {
    pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: ImVersionVector::new(),
                start_frontiers: Frontiers::default(),
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena: a.clone(),
            external_vv: Arc::new(Mutex::new(VersionVector::new())),
            external_kv: Arc::new(Mutex::new(MemKvStore::new(MemKvConfig::default()))),
            // external_kv: Arc::new(Mutex::new(BTreeMap::default())),
            merge_interval,
        }
    }

    #[cfg(test)]
    fn new_for_test() -> Self {
        Self::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)))
    }

    pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
        self.flush_and_compact(vv, frontiers);
        let mut kv = self.external_kv.lock().unwrap();
        kv.export_all()
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub(super) fn export_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in latest_vv.sub_iter(start_vv) {
            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        loro_common::debug!(
            "start_vv={:?} start_frontiers={:?}",
            &start_vv,
            start_frontiers
        );
        new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
    }

    pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in spans {
            let mut span = *span;
            span.normalize_();
            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
                let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        encode_blocks_in_store(new_store, &self.arena, w);
    }

    fn encode_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        {
            let mut store = self.external_kv.lock().unwrap();
            store.set(START_VV_KEY, start_vv.encode().into());
            store.set(START_FRONTIERS_KEY, start_frontiers.encode().into());
            let mut inner = self.inner.lock().unwrap();
            inner.start_frontiers = start_frontiers.clone();
            inner.start_vv = ImVersionVector::from_vv(start_vv);
        }
        self.flush_and_compact(latest_vv, latest_frontiers);
        self.external_kv.lock().unwrap().export_all()
    }

    pub(crate) fn decode_snapshot_for_updates(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> Result<Vec<Change>, LoroError> {
        let change_store = ChangeStore::new_mem(arena, Arc::new(AtomicI64::new(0)));
        let _ = change_store.import_all(bytes)?;
        let mut changes = Vec::new();
        change_store.visit_all_changes(&mut |c| {
            let cnt_threshold = self_vv.get(&c.id.peer).copied().unwrap_or(0);
            if c.id.counter >= cnt_threshold {
                changes.push(c.clone());
                return;
            }

            let change_end = c.ctr_end();
            if change_end > cnt_threshold {
                changes.push(c.slice((cnt_threshold - c.id.counter) as usize, c.atom_len()));
            }
        });

        Ok(changes)
    }

    pub(crate) fn decode_block_bytes(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> LoroResult<Vec<Change>> {
        let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
        if ans.is_empty() {
            return Ok(ans);
        }

        let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
        ans.retain_mut(|c| {
            if c.id.counter >= start {
                true
            } else if c.ctr_end() > start {
                *c = c.slice((start - c.id.counter) as usize, c.atom_len());
                true
            } else {
                false
            }
        });

        Ok(ans)
    }
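
    // Example of the trimming in `decode_block_bytes` above: if `self_vv`
    // already covers counters [0, 20) for the block's peer, a decoded change
    // spanning [15, 30) is sliced down to [20, 30), and a change spanning
    // [5, 12) is dropped entirely.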

    pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
        let block = self.get_block_that_contains(id)?;
        Some(block.content.iter_dag_nodes())
    }

    pub fn get_last_dag_nodes_for_peer(&self, peer: PeerID) -> Option<Vec<AppDagNode>> {
        let block = self.get_the_last_block_of_peer(peer)?;
        Some(block.content.iter_dag_nodes())
    }

    pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.lock().unwrap();
        for (_, block) in inner.mem_parsed_kv.iter_mut() {
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            for c in block.content.try_changes().unwrap() {
                f(c);
            }
        }
    }

    pub(crate) fn iter_blocks(&self, id_span: IdSpan) -> Vec<(Arc<ChangesBlock>, usize, usize)> {
        if id_span.counter.start == id_span.counter.end {
            return vec![];
        }

        assert!(id_span.counter.start < id_span.counter.end);
        self.ensure_block_loaded_in_range(
            Bound::Included(id_span.id_start()),
            Bound::Excluded(id_span.id_end()),
        );
        let mut inner = self.inner.lock().unwrap();
        let next_back = inner.mem_parsed_kv.range(..=id_span.id_start()).next_back();
        match next_back {
            None => {
                return vec![];
            }
            Some(next_back) => {
                if next_back.0.peer != id_span.peer {
                    return vec![];
                }
            }
        }
        let start_counter = next_back.map(|(id, _)| id.counter).unwrap_or(0);
        let ans = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                let changes = block.content.try_changes().unwrap();
                let start;
                let end;
                if id_span.counter.start <= block.counter_range.0
                    && id_span.counter.end >= block.counter_range.1
                {
                    start = 0;
                    end = changes.len();
                } else {
                    start = block
                        .get_change_index_by_counter(id_span.counter.start)
                        .unwrap_or_else(|x| x);

                    match block.get_change_index_by_counter(id_span.counter.end - 1) {
                        Ok(e) => {
                            end = e + 1;
                        }
                        Err(0) => return None,
                        Err(e) => {
                            end = e;
                        }
                    }
                }
                if start == end {
                    return None;
                }

                Some((block.clone(), start, end))
            })
            // TODO: PERF avoid alloc
            .collect_vec();

        ans
    }

    pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
        let v = self.iter_blocks(id_span);
        #[cfg(debug_assertions)]
        {
            if !v.is_empty() {
                assert_eq!(v[0].0.peer, id_span.peer);
                assert_eq!(v.last().unwrap().0.peer, id_span.peer);
                {
                    // Test start
                    let (block, start, _end) = v.first().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*start].id.counter <= id_span.counter.start);
                }
                {
                    // Test end
                    let (block, _start, end) = v.last().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*end - 1].ctr_end() >= id_span.counter.end);
                    assert!(changes[*end - 1].ctr_start() < id_span.counter.end);
                }
            }
        }

        v.into_iter().flat_map(move |(block, start, end)| {
            (start..end).map(move |i| BlockChangeRef {
                change_index: i,
                block: block.clone(),
            })
        })
    }
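
    // Hypothetical usage sketch for `iter_changes` above; it assumes an
    // `IdSpan::new(peer, from, to)` constructor from loro_common and a store
    // that already holds changes for `peer`.
    #[cfg(test)]
    #[allow(unused)]
    fn example_iter_changes(&self, peer: PeerID) {
        for change in self.iter_changes(IdSpan::new(peer, 0, 10)) {
            // `BlockChangeRef` derefs to `&Change`; only changes overlapping
            // counters [0, 10) are yielded, so each must start before 10.
            assert!(change.id.counter < 10);
        }
    }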

    pub(crate) fn get_blocks_in_range(&self, id_span: IdSpan) -> VecDeque<Arc<ChangesBlock>> {
        let mut inner = self.inner.lock().unwrap();
        let start_counter = inner
            .mem_parsed_kv
            .range(..=id_span.id_start())
            .next_back()
            .map(|(id, _)| id.counter)
            .unwrap_or(0);
        let vec = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                Some(block.clone())
            })
            // TODO: PERF avoid alloc
            .collect();
        vec
    }

    pub(crate) fn get_block_that_contains(&self, id: ID) -> Option<Arc<ChangesBlock>> {
        self.ensure_block_loaded_in_range(Bound::Included(id), Bound::Included(id));
        let inner = self.inner.lock().unwrap();
        let block = inner
            .mem_parsed_kv
            .range(..=id)
            .next_back()
            .filter(|(_, block)| {
                block.peer == id.peer
                    && block.counter_range.0 <= id.counter
                    && id.counter < block.counter_range.1
            })
            .map(|(_, block)| block.clone());

        block
    }

    pub(crate) fn get_the_last_block_of_peer(&self, peer: PeerID) -> Option<Arc<ChangesBlock>> {
        let end_id = ID::new(peer, Counter::MAX);
        self.ensure_id_lte(end_id);
        let inner = self.inner.lock().unwrap();
        let block = inner
            .mem_parsed_kv
            .range(..=end_id)
            .next_back()
            .filter(|(_, block)| block.peer == peer)
            .map(|(_, block)| block.clone());

        block
    }

    pub fn change_num(&self) -> usize {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.lock().unwrap();
        inner
            .mem_parsed_kv
            .iter_mut()
            .map(|(_, block)| block.change_num())
            .sum()
    }

    pub fn fork(
        &self,
        arena: SharedArena,
        merge_interval: Arc<AtomicI64>,
        vv: &VersionVector,
        frontiers: &Frontiers,
    ) -> Self {
        self.flush_and_compact(vv, frontiers);
        let inner = self.inner.lock().unwrap();
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: inner.start_vv.clone(),
                start_frontiers: inner.start_frontiers.clone(),
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena,
            external_vv: Arc::new(Mutex::new(self.external_vv.lock().unwrap().clone())),
            external_kv: self.external_kv.lock().unwrap().clone_store(),
            merge_interval,
        }
    }

    pub fn kv_size(&self) -> usize {
        self.external_kv
            .lock()
            .unwrap()
            .scan(Bound::Unbounded, Bound::Unbounded)
            .map(|(k, v)| k.len() + v.len())
            .sum()
    }

    pub(crate) fn export_blocks_from<W: std::io::Write>(
        &self,
        start_vv: &VersionVector,
        shallow_since_vv: &ImVersionVector,
        latest_vv: &VersionVector,
        w: &mut W,
    ) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in latest_vv.sub_iter(start_vv) {
            let counter_lower_bound = shallow_since_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        let arena = &self.arena;
        encode_blocks_in_store(new_store, arena, w);
    }

    pub(crate) fn fork_changes_up_to(
        &self,
        start_vv: &ImVersionVector,
        frontiers: &Frontiers,
        vv: &VersionVector,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in vv.sub_iter_im(start_vv) {
            let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        new_store.encode_all(vv, frontiers)
    }
}

fn encode_blocks_in_store<W: std::io::Write>(
    new_store: ChangeStore,
    arena: &SharedArena,
    w: &mut W,
) {
    let mut inner = new_store.inner.lock().unwrap();
    for (_id, block) in inner.mem_parsed_kv.iter_mut() {
        let bytes = block.to_bytes(arena);
        leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
        w.write_all(&bytes.bytes).unwrap();
    }
}
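
// Hypothetical sketch of the reverse of `encode_blocks_in_store` above: the
// stream is a sequence of (LEB128 length prefix, encoded block bytes) pairs,
// so a reader would look roughly like this (`example_read_blocks` is not part
// of the real API):
#[cfg(test)]
#[allow(unused)]
fn example_read_blocks(mut r: &[u8]) -> Vec<Bytes> {
    let mut blocks = Vec::new();
    while !r.is_empty() {
        // Read the LEB128-encoded length, then take that many bytes.
        let len = leb128::read::unsigned(&mut r).unwrap() as usize;
        blocks.push(Bytes::copy_from_slice(&r[..len]));
        r = &r[len..];
    }
    blocks
}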

mod mut_external_kv {
    //! Only this module contains code that mutates the external kv store.
    //! All other modules should only read from the external kv store.
    use super::*;

    impl ChangeStore {
        #[tracing::instrument(skip_all, level = "debug", name = "change_store import_all")]
        pub(crate) fn import_all(&self, bytes: Bytes) -> Result<BatchDecodeInfo, LoroError> {
            let mut kv_store = self.external_kv.lock().unwrap();
            assert!(
                // 2 because there are vv and frontiers
                kv_store.len() <= 2,
                "kv store should be empty when using decode_all"
            );
            kv_store
                .import_all(bytes)
                .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
            let vv_bytes = kv_store.get(VV_KEY).unwrap_or_default();
            let vv = VersionVector::decode(&vv_bytes).unwrap();
            let start_vv_bytes = kv_store.get(START_VV_KEY).unwrap_or_default();
            let start_vv = if start_vv_bytes.is_empty() {
                Default::default()
            } else {
                VersionVector::decode(&start_vv_bytes).unwrap()
            };

            #[cfg(test)]
            {
                // This is for tests
                drop(kv_store);
                for (peer, cnt) in vv.iter() {
                    self.get_change(ID::new(*peer, *cnt - 1)).unwrap();
                }
                kv_store = self.external_kv.lock().unwrap();
            }

            *self.external_vv.lock().unwrap() = vv.clone();
            let frontiers_bytes = kv_store.get(FRONTIERS_KEY).unwrap_or_default();
            let frontiers = Frontiers::decode(&frontiers_bytes).unwrap();
            let start_frontiers = kv_store.get(START_FRONTIERS_KEY).unwrap_or_default();
            let start_frontiers = if start_frontiers.is_empty() {
                Default::default()
            } else {
                Frontiers::decode(&start_frontiers).unwrap()
            };

            let mut max_lamport = None;
            let mut max_timestamp = 0;
            drop(kv_store);
            for id in frontiers.iter() {
                let c = self.get_change(id).unwrap();
                debug_assert_ne!(c.atom_len(), 0);
                let l = c.lamport_last();
                if let Some(x) = max_lamport {
                    if l > x {
                        max_lamport = Some(l);
                    }
                } else {
                    max_lamport = Some(l);
                }

                let t = c.timestamp;
                if t > max_timestamp {
                    max_timestamp = t;
                }
            }

            Ok(BatchDecodeInfo {
                vv,
                frontiers,
                start_version: if start_vv.is_empty() {
                    None
                } else {
                    let mut inner = self.inner.lock().unwrap();
                    inner.start_frontiers = start_frontiers.clone();
                    inner.start_vv = ImVersionVector::from_vv(&start_vv);
                    Some((start_vv, start_frontiers))
                },
            })
        }

        /// Flush the cached changes to the kv_store
        pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
            let mut inner = self.inner.lock().unwrap();
            let mut store = self.external_kv.lock().unwrap();
            let mut external_vv = self.external_vv.lock().unwrap();
            for (id, block) in inner.mem_parsed_kv.iter_mut() {
                if !block.flushed {
                    let id_bytes = id.to_bytes();
                    let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
                    assert!(
                        counter_start < block.counter_range.1,
                        "Peer={} Block Counter Range={:?}, counter_start={}",
                        id.peer,
                        &block.counter_range,
                        counter_start
                    );
                    if counter_start > block.counter_range.0 {
                        assert!(store.get(&id_bytes).is_some());
                    }
                    external_vv.insert(id.peer, block.counter_range.1);
                    let bytes = block.to_bytes(&self.arena);
                    store.set(&id_bytes, bytes.bytes);
                    Arc::make_mut(block).flushed = true;
                }
            }

            if inner.start_vv.is_empty() {
                assert_eq!(&*external_vv, vv);
            } else {
                #[cfg(debug_assertions)]
                {
                    // TODO: make some assertions here?
                }
            }
            let vv_bytes = vv.encode();
            let frontiers_bytes = frontiers.encode();
            store.set(VV_KEY, vv_bytes.into());
            store.set(FRONTIERS_KEY, frontiers_bytes.into());
        }
    }
}

mod mut_inner_kv {
    //! Only this module contains code that mutates the internal kv store.
    //! All other modules should only read from the internal kv store.

    use super::*;
    impl ChangeStore {
        /// This method is the **only place** that pushes a new change into the change store.
        ///
        /// The new change either merges with the previous block or is put into a new block.
        /// This method only updates the internal kv store.
        pub fn insert_change(&self, mut change: Change, split_when_exceeds: bool, is_local: bool) {
            #[cfg(debug_assertions)]
            {
                let vv = self.external_vv.lock().unwrap();
                assert!(vv.get(&change.id.peer).copied().unwrap_or(0) <= change.id.counter);
            }

            let s = info_span!("change_store insert_change", id = ?change.id);
            let _e = s.enter();
            let estimated_size = change.estimate_storage_size();
            if estimated_size > MAX_BLOCK_SIZE && split_when_exceeds {
                self.split_change_then_insert(change);
                return;
            }

            let id = change.id;
            let mut inner = self.inner.lock().unwrap();

            // try to merge with the previous block
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..id).next_back() {
                if block.peer == change.id.peer {
                    if block.counter_range.1 != change.id.counter {
                        panic!("counter should be continuous")
                    }

                    match block.push_change(
                        change,
                        estimated_size,
                        if is_local {
                            // a local change should try to merge with the previous
                            // change when the timestamp gap is <= `merge_interval`
                            self.merge_interval
                                .load(std::sync::atomic::Ordering::Acquire)
                        } else {
                            0
                        },
                        &self.arena,
                    ) {
                        Ok(_) => {
                            drop(inner);
                            debug_assert!(self.get_change(id).is_some());
                            return;
                        }
                        Err(c) => change = c,
                    }
                }
            }

            inner
                .mem_parsed_kv
                .insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
            drop(inner);
            debug_assert!(self.get_change(id).is_some());
        }

        pub fn get_change(&self, id: ID) -> Option<BlockChangeRef> {
            let block = self.get_parsed_block(id)?;
            Some(BlockChangeRef {
                change_index: block.get_change_index_by_counter(id.counter).unwrap(),
                block: block.clone(),
            })
        }

        /// Get the change with the given peer and lamport.
        ///
        /// If not found, return the change with the greatest lamport that is smaller than the given lamport.
        pub fn get_change_by_lamport_lte(&self, idlp: IdLp) -> Option<BlockChangeRef> {
            // This method is complicated because we implement binary search on
            // top of the range API. It can be simplified.
            let mut inner = self.inner.lock().unwrap();
            let mut iter = inner
                .mem_parsed_kv
                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));

            // This won't change, we only adjust upper_bound
            let mut lower_bound = 0;
            let mut upper_bound = i32::MAX;
            let mut is_binary_searching = false;
            loop {
                match iter.next_back() {
                    Some((&id, block)) => {
                        if block.lamport_range.0 <= idlp.lamport
                            && (!is_binary_searching || idlp.lamport < block.lamport_range.1)
                        {
                            if !is_binary_searching
                                && upper_bound != i32::MAX
                                && upper_bound != block.counter_range.1
                            {
                                warn!(
                                    "There is a hole between the last block and the current block"
                                );
                                // There is a hole between the last block and the current block.
                                // We need to load it from the kv store.
                                break;
                            }

                            // Found the block
                            block
                                .ensure_changes(&self.arena)
                                .expect("Parse block error");
                            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
                            return Some(BlockChangeRef {
                                change_index: index,
                                block: block.clone(),
                            });
                        }

                        if is_binary_searching {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            if block.lamport_range.1 <= idlp.lamport {
                                // Target is larger than the current block (pointed by mid_bound)
                                lower_bound = mid_bound;
                            } else {
                                debug_assert!(
                                    idlp.lamport < block.lamport_range.0,
                                    "{} {:?}",
                                    idlp,
                                    &block.lamport_range
                                );
                                // Target is smaller than the current block (pointed by mid_bound)
                                upper_bound = mid_bound;
                            }

                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        } else {
                            // Test whether we need to switch to binary search by measuring the gap
                            if block.lamport_range.0 - idlp.lamport > MAX_BLOCK_SIZE as Lamport * 8
                            {
                                // Use binary search to find the block
                                upper_bound = id.counter;
                                let mid_bound = (lower_bound + upper_bound) / 2;
                                iter = inner.mem_parsed_kv.range_mut(
                                    ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound),
                                );
                                is_binary_searching = true;
                            }

                            upper_bound = id.counter;
                        }
                    }
                    None => {
                        if !is_binary_searching {
                            break;
                        }

                        let mid_bound = (lower_bound + upper_bound) / 2;
                        lower_bound = mid_bound;
                        if upper_bound - lower_bound <= MAX_BLOCK_SIZE as i32 {
                            // If they are too close, we can just scan the range
                            iter = inner.mem_parsed_kv.range_mut(
                                ID::new(idlp.peer, lower_bound)..ID::new(idlp.peer, upper_bound),
                            );
                            is_binary_searching = false;
                        } else {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        }
                    }
                }
            }

            let counter_end = upper_bound;
            let scan_end = ID::new(idlp.peer, counter_end).to_bytes();

            let (id, bytes) = 'block_scan: {
                let kv_store = &self.external_kv.lock().unwrap();
                let iter = kv_store
                    .scan(
                        Bound::Included(&ID::new(idlp.peer, 0).to_bytes()),
                        Bound::Excluded(&scan_end),
                    )
                    .rev();

                for (id, bytes) in iter {
                    let mut block = ChangesBlockBytes::new(bytes.clone());
                    let (lamport_start, _lamport_end) = block.lamport_range();
                    if lamport_start <= idlp.lamport {
                        break 'block_scan (id, bytes);
                    }
                }

                return None;
            };

            let block_id = ID::from_bytes(&id);
            let mut block = Arc::new(ChangesBlock::from_bytes(bytes).unwrap());
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            inner.mem_parsed_kv.insert(block_id, block.clone());
            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
            Some(BlockChangeRef {
                change_index: index,
                block,
            })
        }

        fn split_change_then_insert(&self, change: Change) {
            let original_len = change.atom_len();
            let mut new_change = Change {
                ops: RleVec::new(),
                deps: change.deps,
                id: change.id,
                lamport: change.lamport,
                timestamp: change.timestamp,
                commit_msg: change.commit_msg.clone(),
            };

            let mut total_len = 0;
            let mut estimated_size = new_change.estimate_storage_size();
            'outer: for mut op in change.ops.into_iter() {
                if op.estimate_storage_size() >= MAX_BLOCK_SIZE - estimated_size {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );
                }

                while let Some(end) =
                    op.check_whether_slice_content_to_fit_in_size(MAX_BLOCK_SIZE - estimated_size)
                {
                    // The new op can take the rest of the room
                    let new = op.slice(0, end);
                    new_change.ops.push(new);
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );

                    if end < op.atom_len() {
                        op = op.slice(end, op.atom_len());
                    } else {
                        continue 'outer;
                    }
                }

                estimated_size += op.estimate_storage_size();
                if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );
                    new_change.ops.push(op);
                } else {
                    new_change.ops.push(op);
                }
            }

            if !new_change.ops.is_empty() {
                total_len += new_change.atom_len();
                self.insert_change(new_change, false, false);
            }

            assert_eq!(total_len, original_len);
        }
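
        // Example of the splitting above: a change whose estimated size is
        // several times MAX_BLOCK_SIZE is cut into pieces that each fit in a
        // block; every new piece's `deps` points at the last id of the
        // previous piece, so the pieces form a linear causal chain, and
        // `total_len` verifies that no atom is lost in the process.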

        fn _insert_splitted_change(
            &self,
            new_change: Change,
            total_len: &mut usize,
            estimated_size: &mut usize,
        ) -> Change {
            if new_change.atom_len() == 0 {
                return new_change;
            }

            let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
            let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
            *total_len += new_change.atom_len();
            let ans = Change {
                ops: RleVec::new(),
                deps: ID::new(new_change.id.peer, ctr_end - 1).into(),
                id: ID::new(new_change.id.peer, ctr_end),
                lamport: next_lamport,
                timestamp: new_change.timestamp,
                commit_msg: new_change.commit_msg.clone(),
            };

            self.insert_change(new_change, false, false);
            *estimated_size = ans.estimate_storage_size();
            ans
        }

        fn get_parsed_block(&self, id: ID) -> Option<Arc<ChangesBlock>> {
            let mut inner = self.inner.lock().unwrap();
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
                if block.peer == id.peer && block.counter_range.1 > id.counter {
                    block
                        .ensure_changes(&self.arena)
                        .expect("Parse block error");
                    return Some(block.clone());
                }
            }

            let store = self.external_kv.lock().unwrap();
            let mut iter = store
                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
                .filter(|(id, _)| id.len() == 12);

            let (b_id, b_bytes) = iter.next_back()?;
            let block_id: ID = ID::from_bytes(&b_id[..]);
            let block = ChangesBlock::from_bytes(b_bytes).unwrap();
            if block_id.peer == id.peer
                && block_id.counter <= id.counter
                && block.counter_range.1 > id.counter
            {
                let mut arc_block = Arc::new(block);
                arc_block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                inner.mem_parsed_kv.insert(block_id, arc_block.clone());
                return Some(arc_block);
            }

            None
        }

        /// Load all the blocks that overlap with the given ID range into `inner_mem_parsed_kv`
        ///
        /// This is fast because we don't actually parse the content.
        // TODO: PERF: This method feels slow.
        pub(super) fn ensure_block_loaded_in_range(&self, start: Bound<ID>, end: Bound<ID>) {
            let mut whether_need_scan_backward = match start {
                Bound::Included(id) => Some(id),
                Bound::Excluded(id) => Some(id.inc(1)),
                Bound::Unbounded => None,
            };

            {
                let start = start.map(|id| id.to_bytes());
                let end = end.map(|id| id.to_bytes());
                let kv = self.external_kv.lock().unwrap();
                let mut inner = self.inner.lock().unwrap();
                for (id, bytes) in kv
                    .scan(
                        start.as_ref().map(|x| x.as_slice()),
                        end.as_ref().map(|x| x.as_slice()),
                    )
                    .filter(|(id, _)| id.len() == 12)
                {
                    let id = ID::from_bytes(&id);
                    if let Some(expected_start_id) = whether_need_scan_backward {
                        if id == expected_start_id {
                            whether_need_scan_backward = None;
                        }
                    }

                    if inner.mem_parsed_kv.contains_key(&id) {
                        continue;
                    }

                    let block = ChangesBlock::from_bytes(bytes.clone()).unwrap();
                    inner.mem_parsed_kv.insert(id, Arc::new(block));
                }
            }

            if let Some(start_id) = whether_need_scan_backward {
                self.ensure_id_lte(start_id);
            }
        }

        pub(super) fn ensure_id_lte(&self, id: ID) {
            let kv = self.external_kv.lock().unwrap();
            let mut inner = self.inner.lock().unwrap();
            let Some((next_back_id, next_back_bytes)) = kv
                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
                .filter(|(id, _)| id.len() == 12)
                .next_back()
            else {
                return;
            };

            let next_back_id = ID::from_bytes(&next_back_id);
            if next_back_id.peer == id.peer {
                if inner.mem_parsed_kv.contains_key(&next_back_id) {
                    return;
                }

                let block = ChangesBlock::from_bytes(next_back_bytes).unwrap();
                inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
            }
        }
    }
}

#[must_use]
#[derive(Clone, Debug)]
pub(crate) struct BatchDecodeInfo {
    pub vv: VersionVector,
    pub frontiers: Frontiers,
    pub start_version: Option<(VersionVector, Frontiers)>,
}

#[derive(Clone, Debug)]
pub struct BlockChangeRef {
    block: Arc<ChangesBlock>,
    change_index: usize,
}

impl Deref for BlockChangeRef {
    type Target = Change;
    fn deref(&self) -> &Change {
        &self.block.content.try_changes().unwrap()[self.change_index]
    }
}

impl BlockChangeRef {
    pub(crate) fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
        if counter >= self.ctr_end() {
            return None;
        }

        let index = self.ops.search_atom_index(counter);
        Some(BlockOpRef {
            block: self.block.clone(),
            change_index: self.change_index,
            op_index: index,
        })
    }
}

#[derive(Clone, Debug)]
pub(crate) struct BlockOpRef {
    pub block: Arc<ChangesBlock>,
    pub change_index: usize,
    pub op_index: usize,
}

impl Deref for BlockOpRef {
    type Target = Op;

    fn deref(&self) -> &Op {
        &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index]
    }
}

impl BlockOpRef {
    pub fn lamport(&self) -> Lamport {
        let change = &self.block.content.try_changes().unwrap()[self.change_index];
        let op = &change.ops[self.op_index];
        (op.counter - change.id.counter) as Lamport + change.lamport
    }
}
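
// Worked example for `BlockOpRef::lamport` above: if a change starts at
// counter 10 with lamport 100, the op whose counter is 13 gets lamport
// (13 - 10) + 100 = 103, since the lamport value increases by one per
// atomic op within a change.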

impl ChangesBlock {
    fn from_bytes(bytes: Bytes) -> LoroResult<Self> {
        let len = bytes.len();
        let mut bytes = ChangesBlockBytes::new(bytes);
        let peer = bytes.peer();
        let counter_range = bytes.counter_range();
        let lamport_range = bytes.lamport_range();
        let content = ChangesBlockContent::Bytes(bytes);
        Ok(Self {
            peer,
            estimated_size: len,
            counter_range,
            lamport_range,
            flushed: true,
            content,
        })
    }

    pub(crate) fn content(&self) -> &ChangesBlockContent {
        &self.content
    }

    fn new(change: Change, _a: &SharedArena) -> Self {
        let atom_len = change.atom_len();
        let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
        let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
        let estimated_size = change.estimate_storage_size();
        let peer = change.id.peer;
        let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
        Self {
            peer,
            counter_range,
            lamport_range,
            estimated_size,
            content,
            flushed: false,
        }
    }

    #[allow(unused)]
    fn cmp_id(&self, id: ID) -> Ordering {
        self.peer.cmp(&id.peer).then_with(|| {
            if self.counter_range.0 > id.counter {
                Ordering::Greater
            } else if self.counter_range.1 <= id.counter {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    #[allow(unused)]
    fn cmp_idlp(&self, idlp: (PeerID, Lamport)) -> Ordering {
        self.peer.cmp(&idlp.0).then_with(|| {
            if self.lamport_range.0 > idlp.1 {
                Ordering::Greater
            } else if self.lamport_range.1 <= idlp.1 {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    #[allow(unused)]
    fn is_full(&self) -> bool {
        self.estimated_size > MAX_BLOCK_SIZE
    }

    #[allow(clippy::result_large_err)]
    fn push_change(
        self: &mut Arc<Self>,
        change: Change,
        new_change_size: usize,
        merge_interval: i64,
        a: &SharedArena,
    ) -> Result<(), Change> {
        if self.counter_range.1 != change.id.counter {
            return Err(change);
        }

        let atom_len = change.atom_len();
        let next_lamport = change.lamport + atom_len as Lamport;
        let next_counter = change.id.counter + atom_len as Counter;

        let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
        let this = Arc::make_mut(self);
        let changes = this.content.changes_mut(a).unwrap();
        let changes = Arc::make_mut(changes);
        match changes.last_mut() {
            Some(last)
                if last.can_merge_right(&change, merge_interval)
                    && (!is_full
                        || (change.ops.len() == 1
                            && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
            {
                for op in change.ops.into_iter() {
                    let size = op.estimate_storage_size();
                    if !last.ops.push(op) {
                        this.estimated_size += size;
                    }
                }
            }
            _ => {
                if is_full {
                    return Err(change);
                } else {
                    this.estimated_size += new_change_size;
                    changes.push(change);
                }
            }
        }

        this.flushed = false;
        this.counter_range.1 = next_counter;
        this.lamport_range.1 = next_lamport;
        Ok(())
    }

    fn to_bytes(self: &mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
        match &self.content {
            ChangesBlockContent::Bytes(bytes) => bytes.clone(),
            ChangesBlockContent::Both(_, bytes) => {
                let bytes = bytes.clone();
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Bytes(bytes.clone());
                bytes
            }
            ChangesBlockContent::Changes(changes) => {
                let bytes = ChangesBlockBytes::serialize(changes, a);
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Bytes(bytes.clone());
                bytes
            }
        }
    }

    fn ensure_changes(self: &mut Arc<Self>, a: &SharedArena) -> LoroResult<()> {
        match &self.content {
            ChangesBlockContent::Changes(_) => Ok(()),
            ChangesBlockContent::Both(_, _) => Ok(()),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                let b = bytes.clone();
                let this = Arc::make_mut(self);
                this.content = ChangesBlockContent::Both(Arc::new(changes), b);
                Ok(())
            }
        }
    }

    fn get_change_index_by_counter(&self, counter: Counter) -> Result<usize, usize> {
        let changes = self.content.try_changes().unwrap();
        changes.binary_search_by(|c| {
            if c.id.counter > counter {
                Ordering::Greater
            } else if (c.id.counter + c.content_len() as Counter) <= counter {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        })
    }

    fn get_change_index_by_lamport_lte(&self, lamport: Lamport) -> Option<usize> {
        let changes = self.content.try_changes().unwrap();
        let r = changes.binary_search_by(|c| {
            if c.lamport > lamport {
                Ordering::Greater
            } else if (c.lamport + c.content_len() as Lamport) <= lamport {
                Ordering::Less
            } else {
                Ordering::Equal
            }
        });

        match r {
            Ok(found) => Some(found),
            Err(idx) => {
                if idx == 0 {
                    None
                } else {
                    Some(idx - 1)
                }
            }
        }
    }
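
    // Example of the `lte` lookup above: with two changes covering lamports
    // [0, 5) and [5, 9), lamport 4 resolves to index 0, lamport 7 to index 1,
    // and lamport 12 also to index 1 (the last change whose start lamport is
    // <= 12); only a lamport below the first change returns None.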
1361
1362    #[allow(unused)]
1363    fn get_changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
1364        self.content.changes(a)
1365    }
1366
1367    #[allow(unused)]
1368    fn id(&self) -> ID {
1369        ID::new(self.peer, self.counter_range.0)
1370    }
1371
1372    pub fn change_num(&self) -> usize {
1373        match &self.content {
1374            ChangesBlockContent::Changes(c) => c.len(),
1375            ChangesBlockContent::Bytes(b) => b.len_changes(),
1376            ChangesBlockContent::Both(c, _) => c.len(),
1377        }
1378    }
1379}
1380
1381impl ChangesBlockContent {
    // TODO: PERF: Return an iterator instead of allocating a Vec
    pub fn iter_dag_nodes(&self) -> Vec<AppDagNode> {
        let mut dag_nodes = Vec::new();
        match self {
            ChangesBlockContent::Changes(c) | ChangesBlockContent::Both(c, _) => {
                for change in c.iter() {
                    let new_node = AppDagNodeInner {
                        peer: change.id.peer,
                        cnt: change.id.counter,
                        lamport: change.lamport,
                        deps: change.deps.clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        len: change.atom_len(),
                    }
                    .into();

                    dag_nodes.push_rle_element(new_node);
                }
            }
            ChangesBlockContent::Bytes(b) => {
                b.ensure_header().unwrap();
                let header = b.header.get().unwrap();
                let n = header.n_changes;
                for i in 0..n {
                    let new_node = AppDagNodeInner {
                        peer: header.peer,
                        cnt: header.counters[i],
                        lamport: header.lamports[i],
                        deps: header.deps_groups[i].clone(),
                        vv: OnceCell::new(),
                        has_succ: false,
                        // `counters[i + 1]` is the end counter of change i
                        len: (header.counters[i + 1] - header.counters[i]) as usize,
                    }
                    .into();

                    dag_nodes.push_rle_element(new_node);
                }
            }
        }

        dag_nodes
    }

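    /// Return the parsed changes, decoding them from the stored bytes (and
    /// caching the result) when necessary.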
    #[allow(unused)]
    pub fn changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => Ok(changes),
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
                self.changes(a)
            }
        }
    }

    /// Get mutable access to the changes.
    ///
    /// Note that this method invalidates the stored bytes: once the changes
    /// can be mutated, the cached encoding may no longer match them.
    fn changes_mut(&mut self, a: &SharedArena) -> LoroResult<&mut Arc<Vec<Change>>> {
        match self {
            ChangesBlockContent::Changes(changes) => Ok(changes),
            ChangesBlockContent::Both(changes, _) => {
                *self = ChangesBlockContent::Changes(std::mem::take(changes));
                self.changes_mut(a)
            }
            ChangesBlockContent::Bytes(bytes) => {
                let changes = bytes.parse(a)?;
                *self = ChangesBlockContent::Changes(Arc::new(changes));
                self.changes_mut(a)
            }
        }
    }

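    /// Return the changes if they are already parsed; `None` when only the
    /// encoded bytes are available. This never triggers decoding.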
    pub(crate) fn try_changes(&self) -> Option<&Vec<Change>> {
        match self {
            ChangesBlockContent::Changes(changes) => Some(changes),
            ChangesBlockContent::Both(changes, _) => Some(changes),
            ChangesBlockContent::Bytes(_) => None,
        }
    }

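    /// Number of changes in this content, reading only the header when the
    /// content is still encoded.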
    pub(crate) fn len_changes(&self) -> usize {
        match self {
            ChangesBlockContent::Changes(changes) => changes.len(),
            ChangesBlockContent::Both(changes, _) => changes.len(),
            ChangesBlockContent::Bytes(bytes) => bytes.len_changes(),
        }
    }
}

impl std::fmt::Debug for ChangesBlockContent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChangesBlockContent::Changes(changes) => f
                .debug_tuple("ChangesBlockContent::Changes")
                .field(changes)
                .finish(),
            ChangesBlockContent::Bytes(_bytes) => {
                f.debug_tuple("ChangesBlockContent::Bytes").finish()
            }
            ChangesBlockContent::Both(changes, _bytes) => f
                .debug_tuple("ChangesBlockContent::Both")
                .field(changes)
                .finish(),
        }
    }
}

impl ChangesBlockBytes {
    fn new(bytes: Bytes) -> Self {
        Self {
            header: OnceCell::new(),
            bytes,
        }
    }

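    /// Decode and cache the block header on first use.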
    fn ensure_header(&self) -> LoroResult<()> {
        self.header
            .get_or_try_init(|| decode_header(&self.bytes).map(Arc::new))?;
        Ok(())
    }

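    /// Decode the whole block into [`Change`]s and register their containers
    /// and parent links in the arena.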
    fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
        self.ensure_header()?;
        let ans: Vec<Change> = decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))?;
        for c in ans.iter() {
            // PERF: This can be made faster (low priority)
            register_container_and_parent_link(a, c)
        }

        Ok(ans)
    }

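    /// Encode the changes (which must come from a single peer and be
    /// continuous) into block bytes.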
    fn serialize(changes: &[Change], a: &SharedArena) -> Self {
        let bytes = encode_block(changes, a);
        // TODO: PERF: We could compute the header directly instead of
        // decoding it back from the encoded bytes
        let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
        bytes.ensure_header().unwrap();
        bytes
    }

    fn peer(&mut self) -> PeerID {
        self.ensure_header().unwrap();
        self.header.get().as_ref().unwrap().peer
    }

    fn counter_range(&mut self) -> (Counter, Counter) {
        if let Some(header) = self.header.get() {
            (header.counter, *header.counters.last().unwrap())
        } else {
            decode_block_range(&self.bytes).unwrap().0
        }
    }

    fn lamport_range(&mut self) -> (Lamport, Lamport) {
        if let Some(header) = self.header.get() {
            (header.lamports[0], *header.lamports.last().unwrap())
        } else {
            decode_block_range(&self.bytes).unwrap().1
        }
    }

    /// Number of changes encoded in this block
    fn len_changes(&self) -> usize {
        self.ensure_header().unwrap();
        self.header.get().unwrap().n_changes
    }
}

#[cfg(test)]
mod test {
    use crate::{
        loro::ExportMode, oplog::convert_change_to_remote, state::TreeParentId, ListHandler,
        LoroDoc, MovableListHandler, TextHandler, TreeHandler,
    };

    use super::*;

    fn test_encode_decode(doc: LoroDoc) {
        doc.commit_then_renew();
        let oplog = doc.oplog().lock().unwrap();
        let bytes = oplog
            .change_store
            .encode_all(oplog.vv(), oplog.dag.frontiers());
        let store = ChangeStore::new_for_test();
        let _ = store.import_all(bytes.clone()).unwrap();
        assert_eq!(store.external_kv.lock().unwrap().export_all(), bytes);
        let mut changes_parsed = Vec::new();
        let a = store.arena.clone();
        store.visit_all_changes(&mut |c| {
            changes_parsed.push(convert_change_to_remote(&a, c));
        });
        let mut changes = Vec::new();
        oplog.change_store.visit_all_changes(&mut |c| {
            changes.push(convert_change_to_remote(&oplog.arena, c));
        });
        assert_eq!(changes_parsed, changes);
    }

    #[test]
    fn test_change_store() {
        let doc = LoroDoc::new_auto_commit();
        doc.set_record_timestamp(true);
        let t = doc.get_text("t");
        t.insert(0, "hello").unwrap();
        doc.commit_then_renew();
        let t = doc.get_list("t");
        t.insert(0, "hello").unwrap();
        test_encode_decode(doc);
    }

    #[test]
    fn test_synced_doc() -> LoroResult<()> {
        let doc_a = LoroDoc::new_auto_commit();
        let doc_b = LoroDoc::new_auto_commit();
        let doc_c = LoroDoc::new_auto_commit();

        {
            // A: Create initial structure
            let map = doc_a.get_map("root");
            map.insert_container("text", TextHandler::new_detached())?;
            map.insert_container("list", ListHandler::new_detached())?;
            map.insert_container("tree", TreeHandler::new_detached())?;
        }

        {
            // Sync initial state to B and C
            let initial_state = doc_a.export(ExportMode::all_updates()).unwrap();
            doc_b.import(&initial_state)?;
            doc_c.import(&initial_state)?;
        }

        {
            // B: Edit text and list
            let map = doc_b.get_map("root");
            let text = map
                .insert_container("text", TextHandler::new_detached())
                .unwrap();
            text.insert(0, "Hello, ")?;

            let list = map
                .insert_container("list", ListHandler::new_detached())
                .unwrap();
            list.push("world")?;
        }

        {
            // C: Edit tree and movable list
            let map = doc_c.get_map("root");
            let tree = map
                .insert_container("tree", TreeHandler::new_detached())
                .unwrap();
            let node_id = tree.create(TreeParentId::Root)?;
            tree.get_meta(node_id)?.insert("key", "value")?;
            let node_b = tree.create(TreeParentId::Root)?;
            tree.move_to(node_b, TreeParentId::Root, 0).unwrap();

            let movable_list = map
                .insert_container("movable", MovableListHandler::new_detached())
                .unwrap();
            movable_list.push("item1".into())?;
            movable_list.push("item2".into())?;
            movable_list.mov(0, 1)?;
        }

        // Sync B's changes to A
        let b_changes = doc_b.export(ExportMode::updates(&doc_a.oplog_vv())).unwrap();
        doc_a.import(&b_changes)?;

        // Sync C's changes to A
        let c_changes = doc_c.export(ExportMode::updates(&doc_a.oplog_vv())).unwrap();
        doc_a.import(&c_changes)?;

        test_encode_decode(doc_a);
        Ok(())
    }
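
    // A minimal round-trip sketch for the per-block encoding, using only
    // items defined in this file. It assumes `Change: Clone` and that a
    // single-peer auto-commit doc yields the continuous, single-peer run of
    // changes that a block requires.
    #[test]
    fn test_block_bytes_roundtrip() {
        let doc = LoroDoc::new_auto_commit();
        doc.get_text("t").insert(0, "hello").unwrap();
        doc.commit_then_renew();
        let oplog = doc.oplog().lock().unwrap();
        let mut changes: Vec<Change> = Vec::new();
        oplog
            .change_store
            .visit_all_changes(&mut |c| changes.push(c.clone()));
        // Encode into block bytes, then decode and compare the change count.
        let bytes = ChangesBlockBytes::serialize(&changes, &oplog.arena);
        assert_eq!(bytes.len_changes(), changes.len());
        let parsed = bytes.parse(&oplog.arena).unwrap();
        assert_eq!(parsed.len(), changes.len());
    }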
}