loro_internal/oplog/change_store.rs

use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
use super::{loro_dag::AppDagNodeInner, AppDagNode};
use crate::{
    arena::SharedArena,
    change::Change,
    estimated_size::EstimatedSize,
    kv_store::KvStore,
    op::Op,
    parent::register_container_and_parent_link,
    version::{Frontiers, ImVersionVector},
    VersionVector,
};
use block_encode::decode_block_range;
use bytes::Bytes;
use itertools::Itertools;
use loro_common::{
    Counter, HasCounter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport,
    LoroError, LoroResult, PeerID, ID,
};
use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
use std::{
    cmp::Ordering,
    collections::{BTreeMap, VecDeque},
    ops::{Bound, Deref},
    sync::{atomic::AtomicI64, Arc, Mutex},
};
use tracing::{debug, info_span, trace, warn};

mod block_encode;
mod block_meta_encode;
pub(super) mod iter;

#[cfg(not(test))]
const MAX_BLOCK_SIZE: usize = 1024 * 4;
#[cfg(test)]
const MAX_BLOCK_SIZE: usize = 128;

/// # Invariants
///
/// - We don't allow holes in a block or between two blocks with the same peer id.
///   The [Change]s must be continuous for each peer.
/// - However, the first block of a peer can have counter > 0 so that we can trim the history.
///
/// # Encoding Schema
///
/// It's based on the underlying KV store.
///
/// The entries of the KV store are made up of the following fields:
///
/// |Key                          |Value             |
/// |:--                          |:----             |
/// |b"vv"                        |VersionVector     |
/// |b"fr"                        |Frontiers         |
/// |b"sv"                        |Shallow VV        |
/// |b"sf"                        |Shallow Frontiers |
/// |12 bytes PeerID + Counter    |Encoded Block     |
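///
/// For example, assuming the block key is the big-endian `PeerID` followed by the
/// big-endian `Counter` (the authoritative layout is whatever `ID::to_bytes` produces),
/// a block starting at `ID(peer=7, counter=64)` would be keyed roughly like this:
///
/// ```text
/// key   = 00 00 00 00 00 00 00 07 | 00 00 00 40   // 8-byte peer + 4-byte counter
/// value = <encoded block bytes>                    // produced by the `block_encode` module
/// ```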
#[derive(Debug, Clone)]
pub struct ChangeStore {
    inner: Arc<Mutex<ChangeStoreInner>>,
    arena: SharedArena,
    /// A change may be in `external_kv` or in `mem_parsed_kv`.
    /// `mem_parsed_kv` is more up-to-date.
    ///
    /// We cannot directly write into `external_kv` except during the initial load.
    external_kv: Arc<Mutex<dyn KvStore>>,
    /// The version vector of the external kv store.
    external_vv: Arc<Mutex<VersionVector>>,
    merge_interval: Arc<AtomicI64>,
}

#[derive(Debug, Clone)]
struct ChangeStoreInner {
    /// The start version vector of the first block for each peer.
    /// It allows us to trim the history.
    start_vv: ImVersionVector,
    /// The last version of the shallow history.
    start_frontiers: Frontiers,
    /// It's more like a parsed cache of the binary kv store.
    mem_parsed_kv: BTreeMap<ID, Arc<ChangesBlock>>,
}

#[derive(Debug, Clone)]
pub(crate) struct ChangesBlock {
    peer: PeerID,
    counter_range: (Counter, Counter),
    lamport_range: (Lamport, Lamport),
    /// Estimated size of the block in bytes
    estimated_size: usize,
    flushed: bool,
    content: ChangesBlockContent,
}

#[derive(Clone)]
pub(crate) enum ChangesBlockContent {
    Changes(Arc<Vec<Change>>),
    Bytes(ChangesBlockBytes),
    Both(Arc<Vec<Change>>, ChangesBlockBytes),
}

/// It's cheap to clone this struct because cloning the underlying bytes is cheap.
#[derive(Clone)]
pub(crate) struct ChangesBlockBytes {
    bytes: Bytes,
    header: OnceCell<Arc<ChangesBlockHeader>>,
}

pub const START_VV_KEY: &[u8] = b"sv";
pub const START_FRONTIERS_KEY: &[u8] = b"sf";
pub const VV_KEY: &[u8] = b"vv";
pub const FRONTIERS_KEY: &[u8] = b"fr";

impl ChangeStore {
    pub fn new_mem(a: &SharedArena, merge_interval: Arc<AtomicI64>) -> Self {
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: ImVersionVector::new(),
                start_frontiers: Frontiers::default(),
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena: a.clone(),
            external_vv: Arc::new(Mutex::new(VersionVector::new())),
            external_kv: Arc::new(Mutex::new(MemKvStore::new(MemKvConfig::default()))),
            // external_kv: Arc::new(Mutex::new(BTreeMap::default())),
            merge_interval,
        }
    }

    #[cfg(test)]
    fn new_for_test() -> Self {
        Self::new_mem(&SharedArena::new(), Arc::new(AtomicI64::new(0)))
    }

    pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
        self.flush_and_compact(vv, frontiers);
        let mut kv = self.external_kv.try_lock().unwrap();
        kv.export_all()
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub(super) fn export_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in latest_vv.sub_iter(start_vv) {
            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        debug!(
            "start_vv={:?} start_frontiers={:?}",
            &start_vv, start_frontiers
        );
        new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
    }

    pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for span in spans {
            let mut span = *span;
            span.normalize_();
            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                let start = ((span.counter.start - c.id.counter).max(0) as usize).min(c.atom_len());
                let end = ((span.counter.end - c.id.counter).max(0) as usize).min(c.atom_len());
                if start == end {
                    continue;
                }

                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        encode_blocks_in_store(new_store, &self.arena, w);
    }

    fn encode_from(
        &self,
        start_vv: &VersionVector,
        start_frontiers: &Frontiers,
        latest_vv: &VersionVector,
        latest_frontiers: &Frontiers,
    ) -> Bytes {
        {
            let mut store = self.external_kv.try_lock().unwrap();
            store.set(START_VV_KEY, start_vv.encode().into());
            store.set(START_FRONTIERS_KEY, start_frontiers.encode().into());
            let mut inner = self.inner.try_lock().unwrap();
            inner.start_frontiers = start_frontiers.clone();
            inner.start_vv = ImVersionVector::from_vv(start_vv);
        }
        self.flush_and_compact(latest_vv, latest_frontiers);
        self.external_kv.try_lock().unwrap().export_all()
    }

    pub(crate) fn decode_snapshot_for_updates(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> Result<Vec<Change>, LoroError> {
        let change_store = ChangeStore::new_mem(arena, Arc::new(AtomicI64::new(0)));
        let _ = change_store.import_all(bytes)?;
        let mut changes = Vec::new();
        change_store.visit_all_changes(&mut |c| {
            let cnt_threshold = self_vv.get(&c.id.peer).copied().unwrap_or(0);
            if c.id.counter >= cnt_threshold {
                changes.push(c.clone());
                return;
            }

            let change_end = c.ctr_end();
            if change_end > cnt_threshold {
                changes.push(c.slice((cnt_threshold - c.id.counter) as usize, c.atom_len()));
            }
        });

        Ok(changes)
    }

    pub(crate) fn decode_block_bytes(
        bytes: Bytes,
        arena: &SharedArena,
        self_vv: &VersionVector,
    ) -> LoroResult<Vec<Change>> {
        let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
        if ans.is_empty() {
            return Ok(ans);
        }

        let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
        ans.retain_mut(|c| {
            if c.id.counter >= start {
                true
            } else if c.ctr_end() > start {
                *c = c.slice((start - c.id.counter) as usize, c.atom_len());
                true
            } else {
                false
            }
        });

        Ok(ans)
    }

    pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
        let block = self.get_block_that_contains(id)?;
        Some(block.content.iter_dag_nodes())
    }

    pub fn get_last_dag_nodes_for_peer(&self, peer: PeerID) -> Option<Vec<AppDagNode>> {
        let block = self.get_the_last_block_of_peer(peer)?;
        Some(block.content.iter_dag_nodes())
    }

    pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.try_lock().unwrap();
        for (_, block) in inner.mem_parsed_kv.iter_mut() {
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            for c in block.content.try_changes().unwrap() {
                f(c);
            }
        }
    }

    pub(crate) fn iter_blocks(&self, id_span: IdSpan) -> Vec<(Arc<ChangesBlock>, usize, usize)> {
        if id_span.counter.start == id_span.counter.end {
            return vec![];
        }

        assert!(id_span.counter.start < id_span.counter.end);
        self.ensure_block_loaded_in_range(
            Bound::Included(id_span.id_start()),
            Bound::Excluded(id_span.id_end()),
        );
        let mut inner = self.inner.try_lock().unwrap();
        let next_back = inner.mem_parsed_kv.range(..=id_span.id_start()).next_back();
        match next_back {
            None => {
                return vec![];
            }
            Some(next_back) => {
                if next_back.0.peer != id_span.peer {
                    return vec![];
                }
            }
        }
        let start_counter = next_back.map(|(id, _)| id.counter).unwrap_or(0);
        let ans = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                let changes = block.content.try_changes().unwrap();
                let start;
                let end;
                if id_span.counter.start <= block.counter_range.0
                    && id_span.counter.end >= block.counter_range.1
                {
                    start = 0;
                    end = changes.len();
                } else {
                    start = block
                        .get_change_index_by_counter(id_span.counter.start)
                        .unwrap_or_else(|x| x);

                    match block.get_change_index_by_counter(id_span.counter.end - 1) {
                        Ok(e) => {
                            end = e + 1;
                        }
                        Err(0) => return None,
                        Err(e) => {
                            end = e;
                        }
                    }
                }
                if start == end {
                    return None;
                }

                Some((block.clone(), start, end))
            })
            // TODO: PERF avoid alloc
            .collect_vec();

        ans
    }

    pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
        let v = self.iter_blocks(id_span);
        #[cfg(debug_assertions)]
        {
            if !v.is_empty() {
                assert_eq!(v[0].0.peer, id_span.peer);
                assert_eq!(v.last().unwrap().0.peer, id_span.peer);
                {
                    // Test start
                    let (block, start, _end) = v.first().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*start].id.counter <= id_span.counter.start);
                }
                {
                    // Test end
                    let (block, _start, end) = v.last().unwrap();
                    let changes = block.content.try_changes().unwrap();
                    assert!(changes[*end - 1].ctr_end() >= id_span.counter.end);
                    assert!(changes[*end - 1].ctr_start() < id_span.counter.end);
                }
            }
        }

        v.into_iter().flat_map(move |(block, start, end)| {
            (start..end).map(move |i| BlockChangeRef {
                change_index: i,
                block: block.clone(),
            })
        })
    }

    pub(crate) fn get_blocks_in_range(&self, id_span: IdSpan) -> VecDeque<Arc<ChangesBlock>> {
        let mut inner = self.inner.try_lock().unwrap();
        let start_counter = inner
            .mem_parsed_kv
            .range(..=id_span.id_start())
            .next_back()
            .map(|(id, _)| id.counter)
            .unwrap_or(0);
        let vec = inner
            .mem_parsed_kv
            .range_mut(
                ID::new(id_span.peer, start_counter)..ID::new(id_span.peer, id_span.counter.end),
            )
            .filter_map(|(_id, block)| {
                if block.counter_range.1 < id_span.counter.start {
                    return None;
                }

                block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                Some(block.clone())
            })
            // TODO: PERF avoid alloc
            .collect();
        vec
    }

    pub(crate) fn get_block_that_contains(&self, id: ID) -> Option<Arc<ChangesBlock>> {
        self.ensure_block_loaded_in_range(Bound::Included(id), Bound::Included(id));
        let inner = self.inner.try_lock().unwrap();
        let block = inner
            .mem_parsed_kv
            .range(..=id)
            .next_back()
            .filter(|(_, block)| {
                block.peer == id.peer
                    && block.counter_range.0 <= id.counter
                    && id.counter < block.counter_range.1
            })
            .map(|(_, block)| block.clone());

        block
    }

    pub(crate) fn get_the_last_block_of_peer(&self, peer: PeerID) -> Option<Arc<ChangesBlock>> {
        let end_id = ID::new(peer, Counter::MAX);
        self.ensure_id_lte(end_id);
        let inner = self.inner.try_lock().unwrap();
        let block = inner
            .mem_parsed_kv
            .range(..=end_id)
            .next_back()
            .filter(|(_, block)| block.peer == peer)
            .map(|(_, block)| block.clone());

        block
    }

    pub fn change_num(&self) -> usize {
        self.ensure_block_loaded_in_range(Bound::Unbounded, Bound::Unbounded);
        let mut inner = self.inner.try_lock().unwrap();
        inner
            .mem_parsed_kv
            .iter_mut()
            .map(|(_, block)| block.change_num())
            .sum()
    }

    pub fn fork(
        &self,
        arena: SharedArena,
        merge_interval: Arc<AtomicI64>,
        vv: &VersionVector,
        frontiers: &Frontiers,
    ) -> Self {
        self.flush_and_compact(vv, frontiers);
        let inner = self.inner.try_lock().unwrap();
        Self {
            inner: Arc::new(Mutex::new(ChangeStoreInner {
                start_vv: inner.start_vv.clone(),
                start_frontiers: inner.start_frontiers.clone(),
                mem_parsed_kv: BTreeMap::new(),
            })),
            arena,
            external_vv: Arc::new(Mutex::new(self.external_vv.try_lock().unwrap().clone())),
            external_kv: self.external_kv.try_lock().unwrap().clone_store(),
            merge_interval,
        }
    }

    pub fn kv_size(&self) -> usize {
        self.external_kv
            .try_lock()
            .unwrap()
            .scan(Bound::Unbounded, Bound::Unbounded)
            .map(|(k, v)| k.len() + v.len())
            .sum()
    }

    pub(crate) fn export_blocks_from<W: std::io::Write>(
        &self,
        start_vv: &VersionVector,
        shallow_since_vv: &ImVersionVector,
        latest_vv: &VersionVector,
        w: &mut W,
    ) {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in latest_vv.sub_iter(start_vv) {
            let counter_lower_bound = shallow_since_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        let arena = &self.arena;
        encode_blocks_in_store(new_store, arena, w);
    }

    pub(crate) fn fork_changes_up_to(
        &self,
        start_vv: &ImVersionVector,
        frontiers: &Frontiers,
        vv: &VersionVector,
    ) -> Bytes {
        let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
        for mut span in vv.sub_iter_im(start_vv) {
            let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
            span.counter.start = span.counter.start.max(counter_lower_bound);
            span.counter.end = span.counter.end.max(counter_lower_bound);
            if span.counter.start >= span.counter.end {
                continue;
            }

            // PERF: this can be optimized by reusing the current encoded blocks
            // In the current method, it needs to parse and re-encode the blocks
            for c in self.iter_changes(span) {
                let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());
                let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
                    as usize)
                    .min(c.atom_len());

                assert_ne!(start, end);
                let ch = c.slice(start, end);
                new_store.insert_change(ch, false, false);
            }
        }

        new_store.encode_all(vv, frontiers)
    }
}

fn encode_blocks_in_store<W: std::io::Write>(
    new_store: ChangeStore,
    arena: &SharedArena,
    w: &mut W,
) {
    let mut inner = new_store.inner.try_lock().unwrap();
    for (_id, block) in inner.mem_parsed_kv.iter_mut() {
        let bytes = block.to_bytes(arena);
        leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
        w.write_all(&bytes.bytes).unwrap();
    }
}

mod mut_external_kv {
    //! Only this module contains the code that mutates the external kv store.
    //! All other modules should only read from the external kv store.
    use super::*;

    impl ChangeStore {
        #[tracing::instrument(skip_all, level = "debug", name = "change_store import_all")]
        pub(crate) fn import_all(&self, bytes: Bytes) -> Result<BatchDecodeInfo, LoroError> {
            let mut kv_store = self.external_kv.try_lock().unwrap();
            assert!(
                // 2 because there are vv and frontiers
                kv_store.len() <= 2,
                "kv store should be empty when using decode_all"
            );
            kv_store
                .import_all(bytes)
                .map_err(|e| LoroError::DecodeError(e.into_boxed_str()))?;
            let vv_bytes = kv_store.get(VV_KEY).unwrap_or_default();
            let vv = VersionVector::decode(&vv_bytes).unwrap();
            let start_vv_bytes = kv_store.get(START_VV_KEY).unwrap_or_default();
            let start_vv = if start_vv_bytes.is_empty() {
                Default::default()
            } else {
                VersionVector::decode(&start_vv_bytes).unwrap()
            };

            #[cfg(test)]
            {
                // This is for tests
                drop(kv_store);
                for (peer, cnt) in vv.iter() {
                    self.get_change(ID::new(*peer, *cnt - 1)).unwrap();
                }
                kv_store = self.external_kv.try_lock().unwrap();
            }

            *self.external_vv.try_lock().unwrap() = vv.clone();
            let frontiers_bytes = kv_store.get(FRONTIERS_KEY).unwrap_or_default();
            let frontiers = Frontiers::decode(&frontiers_bytes).unwrap();
            let start_frontiers = kv_store.get(START_FRONTIERS_KEY).unwrap_or_default();
            let start_frontiers = if start_frontiers.is_empty() {
                Default::default()
            } else {
                Frontiers::decode(&start_frontiers).unwrap()
            };

            let mut max_lamport = None;
            let mut max_timestamp = 0;
            drop(kv_store);
            for id in frontiers.iter() {
                let c = self.get_change(id).unwrap();
                debug_assert_ne!(c.atom_len(), 0);
                let l = c.lamport_last();
                if let Some(x) = max_lamport {
                    if l > x {
                        max_lamport = Some(l);
                    }
                } else {
                    max_lamport = Some(l);
                }

                let t = c.timestamp;
                if t > max_timestamp {
                    max_timestamp = t;
                }
            }

            Ok(BatchDecodeInfo {
                vv,
                frontiers,
                start_version: if start_vv.is_empty() {
                    None
                } else {
                    let mut inner = self.inner.try_lock().unwrap();
                    inner.start_frontiers = start_frontiers.clone();
                    inner.start_vv = ImVersionVector::from_vv(&start_vv);
                    Some((start_vv, start_frontiers))
                },
            })
        }

        /// Flush the cached changes to the kv store.
        pub(crate) fn flush_and_compact(&self, vv: &VersionVector, frontiers: &Frontiers) {
            let mut inner = self.inner.try_lock().unwrap();
            let mut store = self.external_kv.try_lock().unwrap();
            let mut external_vv = self.external_vv.try_lock().unwrap();
            for (id, block) in inner.mem_parsed_kv.iter_mut() {
                if !block.flushed {
                    let id_bytes = id.to_bytes();
                    let counter_start = external_vv.get(&id.peer).copied().unwrap_or(0);
                    assert!(
                        counter_start < block.counter_range.1,
                        "Peer={} Block Counter Range={:?}, counter_start={}",
                        id.peer,
                        &block.counter_range,
                        counter_start
                    );
                    if counter_start > block.counter_range.0 {
                        assert!(store.get(&id_bytes).is_some());
                    }
                    external_vv.insert(id.peer, block.counter_range.1);
                    let bytes = block.to_bytes(&self.arena);
                    store.set(&id_bytes, bytes.bytes);
                    Arc::make_mut(block).flushed = true;
                }
            }

            if inner.start_vv.is_empty() {
                assert_eq!(&*external_vv, vv);
            } else {
                #[cfg(debug_assertions)]
                {
                    // TODO: make some assertions here?
                }
            }
            let vv_bytes = vv.encode();
            let frontiers_bytes = frontiers.encode();
            store.set(VV_KEY, vv_bytes.into());
            store.set(FRONTIERS_KEY, frontiers_bytes.into());
        }
    }
}

mod mut_inner_kv {
    //! Only this module contains the code that mutates the internal kv store.
    //! All other modules should only read from the internal kv store.

    use super::*;
    impl ChangeStore {
        /// This method is the **only place** that pushes a new change into the change store.
        ///
        /// The new change either merges with the previous block or is put into a new block.
        /// This method only updates the internal kv store.
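        ///
        /// A sketch of the continuity invariant this method enforces (hypothetical
        /// counters for illustration):
        ///
        /// ```text
        /// insert_change(peer=1, counter=0..3)  // starts a new block
        /// insert_change(peer=1, counter=3..5)  // merged into the previous block (or a new continuous block if it is full)
        /// insert_change(peer=1, counter=7..9)  // panics: the counter gap breaks continuity
        /// ```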
        pub fn insert_change(&self, mut change: Change, split_when_exceeds: bool, is_local: bool) {
            #[cfg(debug_assertions)]
            {
                let vv = self.external_vv.try_lock().unwrap();
                assert!(vv.get(&change.id.peer).copied().unwrap_or(0) <= change.id.counter);
            }

            let s = info_span!("change_store insert_change", id = ?change.id);
            let _e = s.enter();
            let estimated_size = change.estimate_storage_size();
            if estimated_size > MAX_BLOCK_SIZE && split_when_exceeds {
                self.split_change_then_insert(change);
                return;
            }

            let id = change.id;
            let mut inner = self.inner.try_lock().unwrap();

            // try to merge with previous block
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..id).next_back() {
                if block.peer == change.id.peer {
                    if block.counter_range.1 != change.id.counter {
                        panic!("counter should be continuous")
                    }

                    match block.push_change(
                        change,
                        estimated_size,
                        if is_local {
                            // local change should try to merge with previous change when
                            // the timestamp interval <= the `merge_interval`
                            self.merge_interval
                                .load(std::sync::atomic::Ordering::Acquire)
                        } else {
                            0
                        },
                        &self.arena,
                    ) {
                        Ok(_) => {
                            drop(inner);
                            debug_assert!(self.get_change(id).is_some());
                            return;
                        }
                        Err(c) => change = c,
                    }
                }
            }

            inner
                .mem_parsed_kv
                .insert(id, Arc::new(ChangesBlock::new(change, &self.arena)));
            drop(inner);
            debug_assert!(self.get_change(id).is_some());
        }

        pub fn get_change(&self, id: ID) -> Option<BlockChangeRef> {
            let block = self.get_parsed_block(id)?;
            Some(BlockChangeRef {
                change_index: block.get_change_index_by_counter(id.counter).unwrap(),
                block: block.clone(),
            })
        }

        /// Get the change with the given peer and lamport.
        ///
        /// If no change contains the given lamport, return the change with the greatest lamport that is smaller than it.
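        ///
        /// A sketch of the intended lookup semantics (hypothetical lamports for
        /// illustration), assuming the peer's changes start at lamports 0, 10, and 20:
        ///
        /// ```text
        /// get_change_by_lamport_lte(IdLp { peer, lamport: 15 }) // -> change starting at lamport 10
        /// get_change_by_lamport_lte(IdLp { peer, lamport: 20 }) // -> change starting at lamport 20
        /// get_change_by_lamport_lte(IdLp { peer, lamport: 0 })  // -> change starting at lamport 0
        /// ```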
        pub fn get_change_by_lamport_lte(&self, idlp: IdLp) -> Option<BlockChangeRef> {
            // This method is complicated because we implement binary search on top of the range API.
            // It can be simplified.
            let mut inner = self.inner.try_lock().unwrap();
            let mut iter = inner
                .mem_parsed_kv
                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));

            // This won't change, we only adjust upper_bound
            let mut lower_bound = 0;
            let mut upper_bound = i32::MAX;
            let mut is_binary_searching = false;
            loop {
                match iter.next_back() {
                    Some((&id, block)) => {
                        if block.lamport_range.0 <= idlp.lamport
                            && (!is_binary_searching || idlp.lamport < block.lamport_range.1)
                        {
                            if !is_binary_searching
                                && upper_bound != i32::MAX
                                && upper_bound != block.counter_range.1
                            {
                                warn!(
                                    "There is a hole between the last block and the current block"
                                );
                                // There is a hole between the last block and the current block.
                                // We need to load it from the kv store.
                                break;
                            }

                            // Found the block
                            block
                                .ensure_changes(&self.arena)
                                .expect("Parse block error");
                            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
                            return Some(BlockChangeRef {
                                change_index: index,
                                block: block.clone(),
                            });
                        }

                        if is_binary_searching {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            if block.lamport_range.1 <= idlp.lamport {
                                // Target is larger than the current block (pointed to by mid_bound)
                                lower_bound = mid_bound;
                            } else {
                                debug_assert!(
                                    idlp.lamport < block.lamport_range.0,
                                    "{} {:?}",
                                    idlp,
                                    &block.lamport_range
                                );
                                // Target is smaller than the current block (pointed to by mid_bound)
                                upper_bound = mid_bound;
                            }

                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        } else {
                            // Test whether we need to switch to binary search by measuring the gap
                            if block.lamport_range.0 - idlp.lamport > MAX_BLOCK_SIZE as Lamport * 8
                            {
                                // Use binary search to find the block
                                upper_bound = id.counter;
                                let mid_bound = (lower_bound + upper_bound) / 2;
                                iter = inner.mem_parsed_kv.range_mut(
                                    ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound),
                                );
                                trace!("Use binary search");
                                is_binary_searching = true;
                            }

                            upper_bound = id.counter;
                        }
                    }
                    None => {
                        if !is_binary_searching {
                            trace!("No parsed block found, break");
                            break;
                        }

                        let mid_bound = (lower_bound + upper_bound) / 2;
                        lower_bound = mid_bound;
                        if upper_bound - lower_bound <= MAX_BLOCK_SIZE as i32 {
                            // If they are too close, we can just scan the range
                            iter = inner.mem_parsed_kv.range_mut(
                                ID::new(idlp.peer, lower_bound)..ID::new(idlp.peer, upper_bound),
                            );
                            is_binary_searching = false;
                        } else {
                            let mid_bound = (lower_bound + upper_bound) / 2;
                            iter = inner
                                .mem_parsed_kv
                                .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, mid_bound));
                        }
                    }
                }
            }

            let counter_end = upper_bound;
            let scan_end = ID::new(idlp.peer, counter_end).to_bytes();
            trace!(
                "Scanning from {:?} to {:?}",
                &ID::new(idlp.peer, 0),
                &counter_end
            );

            let (id, bytes) = 'block_scan: {
                let kv_store = &self.external_kv.try_lock().unwrap();
                let iter = kv_store
                    .scan(
                        Bound::Included(&ID::new(idlp.peer, 0).to_bytes()),
                        Bound::Excluded(&scan_end),
                    )
                    .rev();

                for (id, bytes) in iter {
                    let mut block = ChangesBlockBytes::new(bytes.clone());
                    trace!(
                        "Scanning block counter_range={:?} lamport_range={:?} id={:?}",
                        &block.counter_range(),
                        &block.lamport_range(),
                        &ID::from_bytes(&id)
                    );
                    let (lamport_start, _lamport_end) = block.lamport_range();
                    if lamport_start <= idlp.lamport {
                        break 'block_scan (id, bytes);
                    }
                }

                return None;
            };

            let block_id = ID::from_bytes(&id);
            let mut block = Arc::new(ChangesBlock::from_bytes(bytes).unwrap());
            block
                .ensure_changes(&self.arena)
                .expect("Parse block error");
            inner.mem_parsed_kv.insert(block_id, block.clone());
            let index = block.get_change_index_by_lamport_lte(idlp.lamport)?;
            Some(BlockChangeRef {
                change_index: index,
                block,
            })
        }

        fn split_change_then_insert(&self, change: Change) {
            let original_len = change.atom_len();
            let mut new_change = Change {
                ops: RleVec::new(),
                deps: change.deps,
                id: change.id,
                lamport: change.lamport,
                timestamp: change.timestamp,
                commit_msg: change.commit_msg.clone(),
            };

            let mut total_len = 0;
            let mut estimated_size = new_change.estimate_storage_size();
            'outer: for mut op in change.ops.into_iter() {
                if op.estimate_storage_size() >= MAX_BLOCK_SIZE - estimated_size {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );
                }

                while let Some(end) =
                    op.check_whether_slice_content_to_fit_in_size(MAX_BLOCK_SIZE - estimated_size)
                {
                    // The new op can take the rest of the room
                    let new = op.slice(0, end);
                    new_change.ops.push(new);
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );

                    if end < op.atom_len() {
                        op = op.slice(end, op.atom_len());
                    } else {
                        continue 'outer;
                    }
                }

                estimated_size += op.estimate_storage_size();
                if estimated_size > MAX_BLOCK_SIZE && !new_change.ops.is_empty() {
                    new_change = self._insert_splitted_change(
                        new_change,
                        &mut total_len,
                        &mut estimated_size,
                    );
                    new_change.ops.push(op);
                } else {
                    new_change.ops.push(op);
                }
            }

            if !new_change.ops.is_empty() {
                total_len += new_change.atom_len();
                self.insert_change(new_change, false, false);
            }

            assert_eq!(total_len, original_len);
        }

        fn _insert_splitted_change(
            &self,
            new_change: Change,
            total_len: &mut usize,
            estimated_size: &mut usize,
        ) -> Change {
            if new_change.atom_len() == 0 {
                return new_change;
            }

            let ctr_end = new_change.id.counter + new_change.atom_len() as Counter;
            let next_lamport = new_change.lamport + new_change.atom_len() as Lamport;
            *total_len += new_change.atom_len();
            let ans = Change {
                ops: RleVec::new(),
                deps: ID::new(new_change.id.peer, ctr_end - 1).into(),
                id: ID::new(new_change.id.peer, ctr_end),
                lamport: next_lamport,
                timestamp: new_change.timestamp,
                commit_msg: new_change.commit_msg.clone(),
            };

            self.insert_change(new_change, false, false);
            *estimated_size = ans.estimate_storage_size();
            ans
        }

        fn get_parsed_block(&self, id: ID) -> Option<Arc<ChangesBlock>> {
            let mut inner = self.inner.try_lock().unwrap();
            // trace!("inner: {:#?}", &inner);
            if let Some((_id, block)) = inner.mem_parsed_kv.range_mut(..=id).next_back() {
                if block.peer == id.peer && block.counter_range.1 > id.counter {
                    block
                        .ensure_changes(&self.arena)
                        .expect("Parse block error");
                    return Some(block.clone());
                }
            }

            let store = self.external_kv.try_lock().unwrap();
            let mut iter = store
                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
                .filter(|(id, _)| id.len() == 12);

            // println!(
            //     "\nkeys {:?}",
            //     store
            //         .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
            //         .filter(|(id, _)| id.len() == 12)
            //         .map(|(k, _v)| ID::from_bytes(&k))
            //         .count()
            // );
            // println!("id {:?}", id);

            let (b_id, b_bytes) = iter.next_back()?;
            let block_id: ID = ID::from_bytes(&b_id[..]);
            let block = ChangesBlock::from_bytes(b_bytes).unwrap();
            trace!(
                "block_id={:?} id={:?} counter_range={:?}",
                block_id,
                id,
                block.counter_range
            );
            if block_id.peer == id.peer
                && block_id.counter <= id.counter
                && block.counter_range.1 > id.counter
            {
                let mut arc_block = Arc::new(block);
                arc_block
                    .ensure_changes(&self.arena)
                    .expect("Parse block error");
                inner.mem_parsed_kv.insert(block_id, arc_block.clone());
                return Some(arc_block);
            }

            None
        }

        /// Load all the blocks that overlap with the given ID range into `inner.mem_parsed_kv`.
        ///
        /// This is fast because we don't actually parse the content.
        // TODO: PERF: This method feels slow.
        pub(super) fn ensure_block_loaded_in_range(&self, start: Bound<ID>, end: Bound<ID>) {
            let mut whether_need_scan_backward = match start {
                Bound::Included(id) => Some(id),
                Bound::Excluded(id) => Some(id.inc(1)),
                Bound::Unbounded => None,
            };

            {
                let start = start.map(|id| id.to_bytes());
                let end = end.map(|id| id.to_bytes());
                let kv = self.external_kv.try_lock().unwrap();
                let mut inner = self.inner.try_lock().unwrap();
                for (id, bytes) in kv
                    .scan(
                        start.as_ref().map(|x| x.as_slice()),
                        end.as_ref().map(|x| x.as_slice()),
                    )
                    .filter(|(id, _)| id.len() == 12)
                {
                    let id = ID::from_bytes(&id);
                    if let Some(expected_start_id) = whether_need_scan_backward {
                        if id == expected_start_id {
                            whether_need_scan_backward = None;
                        }
                    }

                    if inner.mem_parsed_kv.contains_key(&id) {
                        continue;
                    }

                    let block = ChangesBlock::from_bytes(bytes.clone()).unwrap();
                    inner.mem_parsed_kv.insert(id, Arc::new(block));
                }
            }

            if let Some(start_id) = whether_need_scan_backward {
                self.ensure_id_lte(start_id);
            }
        }

        pub(super) fn ensure_id_lte(&self, id: ID) {
            let kv = self.external_kv.try_lock().unwrap();
            let mut inner = self.inner.try_lock().unwrap();
            let Some((next_back_id, next_back_bytes)) = kv
                .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
                .filter(|(id, _)| id.len() == 12)
                .next_back()
            else {
                return;
            };

            let next_back_id = ID::from_bytes(&next_back_id);
            if next_back_id.peer == id.peer {
                if inner.mem_parsed_kv.contains_key(&next_back_id) {
                    return;
                }

                let block = ChangesBlock::from_bytes(next_back_bytes).unwrap();
                inner.mem_parsed_kv.insert(next_back_id, Arc::new(block));
            }
        }
    }
}

1132#[must_use]
1133#[derive(Clone, Debug)]
1134pub(crate) struct BatchDecodeInfo {
1135    pub vv: VersionVector,
1136    pub frontiers: Frontiers,
1137    pub start_version: Option<(VersionVector, Frontiers)>,
1138}
1139
1140#[derive(Clone, Debug)]
1141pub struct BlockChangeRef {
1142    block: Arc<ChangesBlock>,
1143    change_index: usize,
1144}
1145
1146impl Deref for BlockChangeRef {
1147    type Target = Change;
1148    fn deref(&self) -> &Change {
1149        &self.block.content.try_changes().unwrap()[self.change_index]
1150    }
1151}
1152
1153impl BlockChangeRef {
1154    pub(crate) fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
1155        if counter >= self.ctr_end() {
1156            return None;
1157        }
1158
1159        let index = self.ops.search_atom_index(counter);
1160        Some(BlockOpRef {
1161            block: self.block.clone(),
1162            change_index: self.change_index,
1163            op_index: index,
1164        })
1165    }
1166}
1167
1168#[derive(Clone, Debug)]
1169pub(crate) struct BlockOpRef {
1170    pub block: Arc<ChangesBlock>,
1171    pub change_index: usize,
1172    pub op_index: usize,
1173}
1174
1175impl Deref for BlockOpRef {
1176    type Target = Op;
1177
1178    fn deref(&self) -> &Op {
1179        &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index]
1180    }
1181}
1182
1183impl BlockOpRef {
1184    pub fn lamport(&self) -> Lamport {
1185        let change = &self.block.content.try_changes().unwrap()[self.change_index];
1186        let op = &change.ops[self.op_index];
1187        (op.counter - change.id.counter) as Lamport + change.lamport
1188    }
1189}
1190
1191impl ChangesBlock {
1192    fn from_bytes(bytes: Bytes) -> LoroResult<Self> {
1193        let len = bytes.len();
1194        let mut bytes = ChangesBlockBytes::new(bytes);
1195        let peer = bytes.peer();
1196        let counter_range = bytes.counter_range();
1197        let lamport_range = bytes.lamport_range();
1198        let content = ChangesBlockContent::Bytes(bytes);
1199        Ok(Self {
1200            peer,
1201            estimated_size: len,
1202            counter_range,
1203            lamport_range,
1204            flushed: true,
1205            content,
1206        })
1207    }
1208
1209    pub(crate) fn content(&self) -> &ChangesBlockContent {
1210        &self.content
1211    }
1212
1213    fn new(change: Change, _a: &SharedArena) -> Self {
1214        let atom_len = change.atom_len();
1215        let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
1216        let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
1217        let estimated_size = change.estimate_storage_size();
1218        let peer = change.id.peer;
1219        let content = ChangesBlockContent::Changes(Arc::new(vec![change]));
1220        Self {
1221            peer,
1222            counter_range,
1223            lamport_range,
1224            estimated_size,
1225            content,
1226            flushed: false,
1227        }
1228    }
1229
1230    #[allow(unused)]
1231    fn cmp_id(&self, id: ID) -> Ordering {
1232        self.peer.cmp(&id.peer).then_with(|| {
1233            if self.counter_range.0 > id.counter {
1234                Ordering::Greater
1235            } else if self.counter_range.1 <= id.counter {
1236                Ordering::Less
1237            } else {
1238                Ordering::Equal
1239            }
1240        })
1241    }
1242
1243    #[allow(unused)]
1244    fn cmp_idlp(&self, idlp: (PeerID, Lamport)) -> Ordering {
1245        self.peer.cmp(&idlp.0).then_with(|| {
1246            if self.lamport_range.0 > idlp.1 {
1247                Ordering::Greater
1248            } else if self.lamport_range.1 <= idlp.1 {
1249                Ordering::Less
1250            } else {
1251                Ordering::Equal
1252            }
1253        })
1254    }
1255
1256    #[allow(unused)]
1257    fn is_full(&self) -> bool {
1258        self.estimated_size > MAX_BLOCK_SIZE
1259    }
1260
1261    fn push_change(
1262        self: &mut Arc<Self>,
1263        change: Change,
1264        new_change_size: usize,
1265        merge_interval: i64,
1266        a: &SharedArena,
1267    ) -> Result<(), Change> {
1268        if self.counter_range.1 != change.id.counter {
1269            return Err(change);
1270        }
1271
1272        let atom_len = change.atom_len();
1273        let next_lamport = change.lamport + atom_len as Lamport;
1274        let next_counter = change.id.counter + atom_len as Counter;
1275
1276        let is_full = new_change_size + self.estimated_size > MAX_BLOCK_SIZE;
1277        let this = Arc::make_mut(self);
1278        let changes = this.content.changes_mut(a).unwrap();
1279        let changes = Arc::make_mut(changes);
1280        match changes.last_mut() {
1281            Some(last)
1282                if last.can_merge_right(&change, merge_interval)
1283                    && (!is_full
1284                        || (change.ops.len() == 1
1285                            && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
1286            {
1287                for op in change.ops.into_iter() {
1288                    let size = op.estimate_storage_size();
1289                    if !last.ops.push(op) {
1290                        this.estimated_size += size;
1291                    }
1292                }
1293            }
1294            _ => {
1295                if is_full {
1296                    return Err(change);
1297                } else {
1298                    this.estimated_size += new_change_size;
1299                    changes.push(change);
1300                }
1301            }
1302        }
1303
1304        this.flushed = false;
1305        this.counter_range.1 = next_counter;
1306        this.lamport_range.1 = next_lamport;
1307        Ok(())
1308    }
1309
1310    fn to_bytes<'a>(self: &'a mut Arc<Self>, a: &SharedArena) -> ChangesBlockBytes {
1311        match &self.content {
1312            ChangesBlockContent::Bytes(bytes) => bytes.clone(),
1313            ChangesBlockContent::Both(_, bytes) => {
1314                let bytes = bytes.clone();
1315                let this = Arc::make_mut(self);
1316                this.content = ChangesBlockContent::Bytes(bytes.clone());
1317                bytes
1318            }
1319            ChangesBlockContent::Changes(changes) => {
1320                let bytes = ChangesBlockBytes::serialize(changes, a);
1321                let this = Arc::make_mut(self);
1322                this.content = ChangesBlockContent::Bytes(bytes.clone());
1323                bytes
1324            }
1325        }
1326    }
1327
1328    fn ensure_changes(self: &mut Arc<Self>, a: &SharedArena) -> LoroResult<()> {
1329        match &self.content {
1330            ChangesBlockContent::Changes(_) => Ok(()),
1331            ChangesBlockContent::Both(_, _) => Ok(()),
1332            ChangesBlockContent::Bytes(bytes) => {
1333                let changes = bytes.parse(a)?;
1334                let b = bytes.clone();
1335                let this = Arc::make_mut(self);
1336                this.content = ChangesBlockContent::Both(Arc::new(changes), b);
1337                Ok(())
1338            }
1339        }
1340    }
1341
1342    fn get_change_index_by_counter(&self, counter: Counter) -> Result<usize, usize> {
1343        let changes = self.content.try_changes().unwrap();
1344        changes.binary_search_by(|c| {
1345            if c.id.counter > counter {
1346                Ordering::Greater
1347            } else if (c.id.counter + c.content_len() as Counter) <= counter {
1348                Ordering::Less
1349            } else {
1350                Ordering::Equal
1351            }
1352        })
1353    }
1354
1355    fn get_change_index_by_lamport_lte(&self, lamport: Lamport) -> Option<usize> {
1356        let changes = self.content.try_changes().unwrap();
1357        let r = changes.binary_search_by(|c| {
1358            if c.lamport > lamport {
1359                Ordering::Greater
1360            } else if (c.lamport + c.content_len() as Lamport) <= lamport {
1361                Ordering::Less
1362            } else {
1363                Ordering::Equal
1364            }
1365        });
1366
1367        match r {
1368            Ok(found) => Some(found),
1369            Err(idx) => {
1370                if idx == 0 {
1371                    None
1372                } else {
1373                    Some(idx - 1)
1374                }
1375            }
1376        }
1377    }
1378
1379    #[allow(unused)]
1380    fn get_changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
1381        self.content.changes(a)
1382    }
1383
1384    #[allow(unused)]
1385    fn id(&self) -> ID {
1386        ID::new(self.peer, self.counter_range.0)
1387    }
1388
1389    pub fn change_num(&self) -> usize {
1390        match &self.content {
1391            ChangesBlockContent::Changes(c) => c.len(),
1392            ChangesBlockContent::Bytes(b) => b.len_changes(),
1393            ChangesBlockContent::Both(c, _) => c.len(),
1394        }
1395    }
1396}
1397
1398impl ChangesBlockContent {
1399    // TODO: PERF: We can use Iter to replace Vec
1400    pub fn iter_dag_nodes(&self) -> Vec<AppDagNode> {
1401        let mut dag_nodes = Vec::new();
1402        match self {
1403            ChangesBlockContent::Changes(c) | ChangesBlockContent::Both(c, _) => {
1404                for change in c.iter() {
1405                    let new_node = AppDagNodeInner {
1406                        peer: change.id.peer,
1407                        cnt: change.id.counter,
1408                        lamport: change.lamport,
1409                        deps: change.deps.clone(),
1410                        vv: OnceCell::new(),
1411                        has_succ: false,
1412                        len: change.atom_len(),
1413                    }
1414                    .into();
1415
1416                    dag_nodes.push_rle_element(new_node);
1417                }
1418            }
1419            ChangesBlockContent::Bytes(b) => {
1420                b.ensure_header().unwrap();
1421                let header = b.header.get().unwrap();
1422                let n = header.n_changes;
1423                for i in 0..n {
1424                    let new_node = AppDagNodeInner {
1425                        peer: header.peer,
1426                        cnt: header.counters[i],
1427                        lamport: header.lamports[i],
1428                        deps: header.deps_groups[i].clone(),
1429                        vv: OnceCell::new(),
1430                        has_succ: false,
1431                        len: (header.counters[i + 1] - header.counters[i]) as usize,
1432                    }
1433                    .into();
1434
1435                    dag_nodes.push_rle_element(new_node);
1436                }
1437            }
1438        }
1439
1440        dag_nodes
1441    }
1442
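        /// Returns the parsed changes, decoding the bytes on demand and caching the
        /// result as `Both`.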
1443    #[allow(unused)]
1444    pub fn changes(&mut self, a: &SharedArena) -> LoroResult<&Vec<Change>> {
1445        match self {
1446            ChangesBlockContent::Changes(changes) => Ok(changes),
1447            ChangesBlockContent::Both(changes, _) => Ok(changes),
1448            ChangesBlockContent::Bytes(bytes) => {
1449                let changes = bytes.parse(a)?;
1450                *self = ChangesBlockContent::Both(Arc::new(changes), bytes.clone());
1451                self.changes(a)
1452            }
1453        }
1454    }
1455
1456    /// Returns a mutable handle to the changes. Note that this invalidates any cached
1457    /// encoded bytes, since they may no longer match once the changes are mutated.
1457    fn changes_mut(&mut self, a: &SharedArena) -> LoroResult<&mut Arc<Vec<Change>>> {
1458        match self {
1459            ChangesBlockContent::Changes(changes) => Ok(changes),
1460            ChangesBlockContent::Both(changes, _) => {
1461                *self = ChangesBlockContent::Changes(std::mem::take(changes));
1462                self.changes_mut(a)
1463            }
1464            ChangesBlockContent::Bytes(bytes) => {
1465                let changes = bytes.parse(a)?;
1466                *self = ChangesBlockContent::Changes(Arc::new(changes));
1467                self.changes_mut(a)
1468            }
1469        }
1470    }
1471
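        /// Returns the parsed changes if they are already available; returns `None` if
        /// only the encoded bytes are present. Never triggers a decode.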
1472    pub(crate) fn try_changes(&self) -> Option<&Vec<Change>> {
1473        match self {
1474            ChangesBlockContent::Changes(changes) => Some(changes),
1475            ChangesBlockContent::Both(changes, _) => Some(changes),
1476            ChangesBlockContent::Bytes(_) => None,
1477        }
1478    }
1479
1480    pub(crate) fn len_changes(&self) -> usize {
1481        match self {
1482            ChangesBlockContent::Changes(changes) => changes.len(),
1483            ChangesBlockContent::Both(changes, _) => changes.len(),
1484            ChangesBlockContent::Bytes(bytes) => bytes.len_changes(),
1485        }
1486    }
1487}
1488
1489impl std::fmt::Debug for ChangesBlockContent {
1490    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1491        match self {
1492            ChangesBlockContent::Changes(changes) => f
1493                .debug_tuple("ChangesBlockContent::Changes")
1494                .field(changes)
1495                .finish(),
1496            ChangesBlockContent::Bytes(_bytes) => {
1497                f.debug_tuple("ChangesBlockContent::Bytes").finish()
1498            }
1499            ChangesBlockContent::Both(changes, _bytes) => f
1500                .debug_tuple("ChangesBlockContent::Both")
1501                .field(changes)
1502                .finish(),
1503        }
1504    }
1505}
1506
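    // The encoded form of a block. The header (peer, counters, lamports, deps) is decoded
    // lazily into the `OnceCell`, so range and length queries do not force a full decode
    // of the ops themselves.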
1507impl ChangesBlockBytes {
1508    fn new(bytes: Bytes) -> Self {
1509        Self {
1510            header: OnceCell::new(),
1511            bytes,
1512        }
1513    }
1514
1515    fn ensure_header(&self) -> LoroResult<()> {
1516        self.header
1517            .get_or_try_init(|| decode_header(&self.bytes).map(Arc::new))?;
1518        Ok(())
1519    }
1520
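        /// Decodes the whole block into `Change`s and registers each change's containers
        /// and parent links in the arena.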
1521    fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
1522        self.ensure_header()?;
1523        let ans: Vec<Change> = decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))?;
1524        for c in ans.iter() {
1525            // PERF: This can be made faster (low priority)
1526            register_container_and_parent_link(a, c)
1527        }
1528
1529        Ok(ans)
1530    }
1531
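        /// Encodes `changes` into a fresh `ChangesBlockBytes` and eagerly decodes its
        /// header from the produced bytes.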
1532    fn serialize(changes: &[Change], a: &SharedArena) -> Self {
1533        let bytes = encode_block(changes, a);
1534        // TODO: PERF: compute the header directly instead of decoding it from the freshly encoded bytes
1535        let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
1536        bytes.ensure_header().unwrap();
1537        bytes
1538    }
1539
1540    fn peer(&mut self) -> PeerID {
1541        self.ensure_header().unwrap();
1542        self.header.get().unwrap().peer
1543    }
1544
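        /// Counter range covered by this block: taken from the decoded header when
        /// present, otherwise read via `decode_block_range` without decoding the whole
        /// block. `lamport_range` below follows the same pattern.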
1545    fn counter_range(&mut self) -> (Counter, Counter) {
1546        if let Some(header) = self.header.get() {
1547            (header.counter, *header.counters.last().unwrap())
1548        } else {
1549            decode_block_range(&self.bytes).unwrap().0
1550        }
1551    }
1552
1553    fn lamport_range(&mut self) -> (Lamport, Lamport) {
1554        if let Some(header) = self.header.get() {
1555            (header.lamports[0], *header.lamports.last().unwrap())
1556        } else {
1557            decode_block_range(&self.bytes).unwrap().1
1558        }
1559    }
1560
1561    /// Number of changes encoded in this block
1562    fn len_changes(&self) -> usize {
1563        self.ensure_header().unwrap();
1564        self.header.get().unwrap().n_changes
1565    }
1566}
1567
1568#[cfg(test)]
1569mod test {
1570    use crate::{
1571        oplog::convert_change_to_remote, state::TreeParentId, ListHandler, LoroDoc,
1572        MovableListHandler, TextHandler, TreeHandler,
1573    };
1574
1575    use super::*;
1576
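        /// Round-trip helper: exports every change from `doc`, imports the bytes into a
        /// fresh `ChangeStore`, and asserts that both the re-exported KV bytes and the
        /// parsed changes match the original.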
1577    fn test_encode_decode(doc: LoroDoc) {
1578        doc.commit_then_renew();
1579        let oplog = doc.oplog().try_lock().unwrap();
1580        let bytes = oplog
1581            .change_store
1582            .encode_all(oplog.vv(), oplog.dag.frontiers());
1583        let store = ChangeStore::new_for_test();
1584        let _ = store.import_all(bytes.clone()).unwrap();
1585        assert_eq!(store.external_kv.try_lock().unwrap().export_all(), bytes);
1586        let mut changes_parsed = Vec::new();
1587        let a = store.arena.clone();
1588        store.visit_all_changes(&mut |c| {
1589            changes_parsed.push(convert_change_to_remote(&a, c));
1590        });
1591        let mut changes = Vec::new();
1592        oplog.change_store.visit_all_changes(&mut |c| {
1593            changes.push(convert_change_to_remote(&oplog.arena, c));
1594        });
1595        assert_eq!(changes_parsed, changes);
1596    }
1597
1598    #[test]
1599    fn test_change_store() {
1600        let doc = LoroDoc::new_auto_commit();
1601        doc.set_record_timestamp(true);
1602        let t = doc.get_text("t");
1603        t.insert(0, "hello").unwrap();
1604        doc.commit_then_renew();
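            // Reuse the key "t" for a different container type (a list after the text
            // above), presumably to check that mixed container types under one key still
            // round-trip.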
1605        let t = doc.get_list("t");
1606        t.insert(0, "hello").unwrap();
1607        test_encode_decode(doc);
1608    }
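
        // Illustrative extra check (a minimal sketch reusing the helpers above): commit a
        // larger batch of text edits, intended to push the history past the reduced
        // test-mode MAX_BLOCK_SIZE, and verify the encode/import round trip still agrees.
        #[test]
        fn test_change_store_round_trip_many_commits() {
            let doc = LoroDoc::new_auto_commit();
            let t = doc.get_text("t");
            for _ in 0..64 {
                t.insert(0, "hello").unwrap();
                doc.commit_then_renew();
            }
            test_encode_decode(doc);
        }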
1609
1610    #[test]
1611    fn test_synced_doc() -> LoroResult<()> {
1612        let doc_a = LoroDoc::new_auto_commit();
1613        let doc_b = LoroDoc::new_auto_commit();
1614        let doc_c = LoroDoc::new_auto_commit();
1615
1616        {
1617            // A: Create initial structure
1618            let map = doc_a.get_map("root");
1619            map.insert_container("text", TextHandler::new_detached())?;
1620            map.insert_container("list", ListHandler::new_detached())?;
1621            map.insert_container("tree", TreeHandler::new_detached())?;
1622        }
1623
1624        {
1625            // Sync initial state to B and C
1626            let initial_state = doc_a.export_from(&Default::default());
1627            doc_b.import(&initial_state)?;
1628            doc_c.import(&initial_state)?;
1629        }
1630
1631        {
1632            // B: Insert its own text and list containers into the map and edit them
1633            let map = doc_b.get_map("root");
1634            let text = map
1635                .insert_container("text", TextHandler::new_detached())
1636                .unwrap();
1637            text.insert(0, "Hello, ")?;
1638
1639            let list = map
1640                .insert_container("list", ListHandler::new_detached())
1641                .unwrap();
1642            list.push("world")?;
1643        }
1644
1645        {
1646            // C: Insert its own tree and movable-list containers and edit them
1647            let map = doc_c.get_map("root");
1648            let tree = map
1649                .insert_container("tree", TreeHandler::new_detached())
1650                .unwrap();
1651            let node_id = tree.create(TreeParentId::Root)?;
1652            tree.get_meta(node_id)?.insert("key", "value")?;
1653            let node_b = tree.create(TreeParentId::Root)?;
1654            tree.move_to(node_b, TreeParentId::Root, 0).unwrap();
1655
1656            let movable_list = map
1657                .insert_container("movable", MovableListHandler::new_detached())
1658                .unwrap();
1659            movable_list.push("item1".into())?;
1660            movable_list.push("item2".into())?;
1661            movable_list.mov(0, 1)?;
1662        }
1663
1664        // Sync B's changes to A
1665        let b_changes = doc_b.export_from(&doc_a.oplog_vv());
1666        doc_a.import(&b_changes)?;
1667
1668        // Sync C's changes to A
1669        let c_changes = doc_c.export_from(&doc_a.oplog_vv());
1670        doc_a.import(&c_changes)?;
1671
1672        test_encode_decode(doc_a);
1673        Ok(())
1674    }
1675}