Skip to main content

loro_internal/
oplog.rs

1mod change_store;
2pub(crate) mod loro_dag;
3mod pending_changes;
4
5use crate::sync::{AtomicUsize, Mutex};
6use bytes::Bytes;
7use std::borrow::Cow;
8use std::cell::RefCell;
9use std::cmp::Ordering;
10use std::rc::Rc;
11use std::sync::Arc;
12use tracing::trace_span;
13
14use self::change_store::iter::MergedChangeIter;
15use self::pending_changes::{PendingChanges, PendingChangesRollback};
16use super::arena::{SharedArena, SharedArenaRollback};
17use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
18use crate::configure::Configure;
19use crate::container::list::list_op;
20use crate::dag::{Dag, DagUtils};
21use crate::diff_calc::DiffMode;
22use crate::encoding::decode_oplog;
23use crate::encoding::{ImportStatus, ParsedHeaderAndBody};
24use crate::history_cache::ContainerHistoryCache;
25use crate::id::{Counter, PeerID, ID};
26use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
27use crate::span::{HasCounterSpan, HasLamportSpan};
28use crate::version::{Frontiers, ImVersionVector, VersionVector};
29use crate::LoroError;
30use change_store::{BlockOpRef, ChangeStoreRollback};
31use loro_common::{ContainerType, HasIdSpan, IdLp, IdSpan};
32use rle::{HasLength, RleVec, Sliceable};
33use smallvec::SmallVec;
34
35pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
36pub use change_store::{BlockChangeRef, ChangeStore};
37
/// [OpLog] store all the ops i.e. the history.
/// It allows multiple [AppState] to attach to it.
/// So you can derive different versions of the state from the [OpLog].
/// It allows us to build a version control system.
///
/// The causal graph should always be a DAG and complete. So we can always find the LCA.
/// If deps are missing, we can't import the change. It will be put into the `pending_changes`.
pub struct OpLog {
    /// The causal DAG over all applied changes.
    pub(crate) dag: AppDag,
    /// Shared arena holding interned container ids, text and values.
    pub(crate) arena: SharedArena,
    /// Cached count of ops visible in this oplog (total minus shallow-trimmed);
    /// shared with other components via the atomic.
    visible_op_count: Arc<AtomicUsize>,
    /// Backing storage for all changes.
    change_store: ChangeStore,
    /// Lazily-built per-container history index, guarded by a mutex.
    history_cache: Mutex<ContainerHistoryCache>,
    /// Pending changes that haven't been applied to the dag.
    /// A change can be imported only when all its deps are already imported.
    /// Key is the ID of the missing dep
    pub(crate) pending_changes: PendingChanges,
    /// Whether we are importing a batch of changes.
    /// If so the Dag's frontiers won't be updated until the batch is finished.
    pub(crate) batch_importing: bool,
    /// Runtime configuration (timestamps, merge interval, detached editing, …).
    pub(crate) configure: Configure,
    /// The uncommitted change, it's a placeholder for the change
    /// that is being edited in pre-commit callback.
    pub(crate) uncommitted_change: Option<Change>,
    /// Active import-rollback checkpoint, when one is in progress.
    pub(crate) import_rollback: Option<ImportRollback>,
}
64
/// Checkpoint data required to undo a (possibly partially applied) import.
pub(crate) struct ImportRollback {
    /// Version vector captured before the import started.
    old_vv: VersionVector,
    /// Arena snapshot to restore on rollback.
    arena: SharedArenaRollback,
    /// Change-store undo information.
    change_store: ChangeStoreRollback,
    /// Pending-changes undo information.
    pending: PendingChangesRollback,
}
71
/// Result of a read-only inspection of changes about to be imported
/// (see `OpLog::preflight_import_changes`).
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ImportChangesPreflight {
    /// At least one change would be applied to the dag right away.
    pub applies_to_dag: bool,
    /// Some change depends on history trimmed away by the shallow root.
    pub has_deps_before_shallow_root: bool,
    /// Some applicable change carries List/Tree ops and therefore needs a
    /// state-apply rollback checkpoint.
    pub needs_state_apply_rollback: bool,
}
78
79impl std::fmt::Debug for OpLog {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("OpLog")
82            .field("dag", &self.dag)
83            .field("pending_changes", &self.pending_changes)
84            .finish()
85    }
86}
87
88impl OpLog {
    /// Create an empty in-memory [OpLog] that publishes its visible-op count
    /// through the shared `visible_op_count` atomic.
    #[inline]
    pub(crate) fn new(visible_op_count: Arc<AtomicUsize>) -> Self {
        let arena = SharedArena::new();
        let cfg = Configure::default();
        let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval_in_s.clone());
        Self {
            visible_op_count,
            // The history cache and the dag share the same change store
            // (ChangeStore clones are shared handles).
            history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)),
            dag: AppDag::new(change_store.clone()),
            change_store,
            arena,
            pending_changes: Default::default(),
            batch_importing: false,
            configure: cfg,
            uncommitted_change: None,
            import_rollback: None,
        }
    }
107
    /// Number of ops visible in this oplog: every op counted by the version
    /// vector minus the ops trimmed away by the shallow (GC'd) prefix.
    // NOTE(review): assumes the shallow vv is always a subset of the full vv;
    // otherwise `total - shallow` would underflow — confirm this invariant.
    #[inline]
    fn calc_visible_op_count(&self) -> usize {
        let total = self.dag.vv().values().sum::<i32>() as usize;
        let shallow = self
            .dag
            .shallow_since_vv()
            .iter()
            .map(|(_, ops)| *ops)
            .sum::<i32>() as usize;
        total - shallow
    }
119
    /// Recompute the visible-op count from the dag without touching the
    /// cached atomic.
    #[inline]
    pub(crate) fn visible_op_count_exact(&self) -> usize {
        self.calc_visible_op_count()
    }
124
    /// Recompute the visible-op count and publish it to the shared atomic.
    /// Returns the freshly computed value.
    #[inline]
    pub(crate) fn refresh_visible_op_count(&self) -> usize {
        let count = self.calc_visible_op_count();
        self.visible_op_count
            .store(count, std::sync::atomic::Ordering::Release);
        count
    }
132
    /// Borrow the causal DAG of applied changes.
    #[inline]
    pub fn dag(&self) -> &AppDag {
        &self.dag
    }
137
    /// Borrow the underlying change store.
    pub fn change_store(&self) -> &ChangeStore {
        &self.change_store
    }
141
142    /// Get the change with the given peer and lamport.
143    ///
144    /// If not found, return the change with the greatest lamport that is smaller than the given lamport.
145    pub fn get_change_with_lamport_lte(
146        &self,
147        peer: PeerID,
148        lamport: Lamport,
149    ) -> Option<BlockChangeRef> {
150        let ans = self
151            .change_store
152            .get_change_by_lamport_lte(IdLp::new(peer, lamport))?;
153        debug_assert!(ans.lamport <= lamport);
154        Some(ans)
155    }
156
157    pub fn get_timestamp_of_version(&self, f: &Frontiers) -> Timestamp {
158        let mut timestamp = Timestamp::default();
159        for id in f.iter() {
160            if let Some(change) = self.lookup_change(id) {
161                timestamp = timestamp.max(change.timestamp);
162            }
163        }
164
165        timestamp
166    }
167
    /// Whether this oplog holds no history at all.
    // NOTE(review): also requires `arena.can_import_snapshot()` — presumably
    // meaning the arena has not allocated anything yet; confirm the semantics.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.dag.is_empty() && self.arena.can_import_snapshot()
    }
172
    /// This is the **only** place to update the `OpLog.changes`
    ///
    /// Applies `change` to the dag, the history cache, the container/parent
    /// links and the change store, then refreshes the visible-op counter.
    pub(crate) fn insert_new_change(&mut self, change: Change, from_local: bool) {
        let s = trace_span!(
            "insert_new_change",
            id = ?change.id,
            lamport = change.lamport,
            deps = ?change.deps
        );
        let _enter = s.enter();
        // While an import-rollback checkpoint is active, hand the dag the
        // pre-import version vector (only when it's non-empty).
        let rollback_old_vv = self
            .import_rollback
            .as_ref()
            .and_then(|x| (!x.old_vv.is_empty()).then_some(&x.old_vv));
        self.dag
            .handle_new_change(&change, from_local, rollback_old_vv);
        self.history_cache
            .lock()
            .insert_by_new_change(&change, true, true);
        self.register_container_and_parent_link(&change);
        if let Some(rollback) = self.import_rollback.as_mut() {
            // Record undo information in the active rollback checkpoint.
            self.change_store.insert_change_with_rollback(
                change,
                true,
                from_local,
                &mut rollback.change_store,
            );
        } else {
            self.change_store.insert_change(change, true, from_local);
        }
        self.refresh_visible_op_count();
    }
204
    /// Start an import-rollback checkpoint, snapshotting the arena first.
    pub(crate) fn begin_import_rollback(&mut self) {
        let arena = self.arena.checkpoint_for_rollback();
        self.begin_import_rollback_with_arena(arena);
    }
209
    /// Start an import-rollback checkpoint from an already-taken arena snapshot.
    ///
    /// Must not be called while another checkpoint is active.
    pub(crate) fn begin_import_rollback_with_arena(&mut self, arena: SharedArenaRollback) {
        debug_assert!(self.import_rollback.is_none());
        let old_vv = self.vv().clone();
        self.dag.begin_import_rollback();
        self.import_rollback = Some(ImportRollback {
            old_vv: old_vv.clone(),
            arena,
            change_store: ChangeStoreRollback::new(old_vv),
            pending: Default::default(),
        });
    }
221
    /// Discard the rollback checkpoint, keeping everything imported since it.
    pub(crate) fn commit_import_rollback(&mut self) {
        self.dag.commit_import_rollback();
        self.import_rollback = None;
    }
226
    /// Inspect `changes` without mutating anything and report how importing
    /// them would behave: whether anything applies to the dag, whether a dep
    /// precedes the shallow root, and whether a state-apply rollback
    /// checkpoint is needed.
    pub(crate) fn preflight_import_changes(&self, changes: &[Change]) -> ImportChangesPreflight {
        let mut ans = ImportChangesPreflight::default();
        let pending_needs_state_apply_rollback =
            self.pending_changes.has_state_apply_rollback_ops();
        for change in changes {
            // Entirely-known changes are no-ops for the import.
            if change.ctr_end() <= self.vv().get(&change.id.peer).copied().unwrap_or(0) {
                continue;
            }

            if self.dag.is_before_shallow_root(&change.deps) {
                ans.has_deps_before_shallow_root = true;
                continue;
            }

            // Deps not present yet: the change would go to the pending queue,
            // not to the dag.
            if self
                .dag
                .get_change_lamport_from_deps(&change.deps)
                .is_none()
            {
                continue;
            }

            ans.applies_to_dag = true;
            // Per the note below, only List/Tree ops can produce a
            // state-apply error, so only they force a rollback checkpoint.
            if change.ops.iter().any(|op| {
                matches!(
                    op.container.get_type(),
                    ContainerType::List | ContainerType::Tree
                )
            }) {
                ans.needs_state_apply_rollback = true;
            }
        }

        // Any newly applied change can unlock pending changes whose ops are not
        // visible in `changes`, so include pending in the rollback decision.
        // Keep this narrow: text/map-only pending changes cannot return a
        // state-apply error, and forcing rollback there adds lock traffic to
        // small sync/import workloads.
        if ans.applies_to_dag && pending_needs_state_apply_rollback {
            ans.needs_state_apply_rollback = true;
        }

        // In tests, always exercise the rollback path for extra coverage.
        #[cfg(test)]
        if ans.applies_to_dag {
            ans.needs_state_apply_rollback = true;
        }

        ans
    }
276
    /// Undo everything imported since the matching `begin_import_rollback`.
    /// No-op when no checkpoint is active.
    pub(crate) fn rollback_import(&mut self) {
        let Some(rollback) = self.import_rollback.take() else {
            return;
        };

        self.change_store.rollback_import(rollback.change_store);
        self.dag.rollback_import();
        rollback.pending.rollback(&mut self.pending_changes);
        // The history cache may reference rolled-back changes: drop it wholesale
        // and let it rebuild lazily.
        self.history_cache.lock().free_all();
        self.arena.rollback(rollback.arena);
        self.refresh_visible_op_count();
    }
289
    /// Reset this oplog to a brand-new empty state after a failed snapshot
    /// import: restore the arena from `arena_checkpoint`, then rebuild the
    /// change store, dag and caches from scratch. The arena and configure are
    /// shared handles, so they are kept (not replaced).
    pub(crate) fn reset_to_empty_for_failed_snapshot_import(
        &mut self,
        arena_checkpoint: SharedArenaRollback,
    ) {
        let arena = self.arena.clone();
        let configure = self.configure.clone();
        arena.rollback(arena_checkpoint);
        let change_store = ChangeStore::new_mem(&arena, configure.merge_interval_in_s.clone());
        self.history_cache = Mutex::new(ContainerHistoryCache::new(change_store.clone(), None));
        self.dag = AppDag::new(change_store.clone());
        self.change_store = change_store;
        self.pending_changes = Default::default();
        self.batch_importing = false;
        self.configure = configure;
        self.uncommitted_change = None;
        self.import_rollback = None;
        self.visible_op_count
            .store(0, std::sync::atomic::Ordering::Release);
    }
309
    /// Run `f` with exclusive access to the container history cache.
    #[inline(always)]
    pub(crate) fn with_history_cache<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut ContainerHistoryCache) -> R,
    {
        let mut history_cache = self.history_cache.lock();
        f(&mut history_cache)
    }
318
    /// Whether the container history cache is currently materialized.
    pub fn has_history_cache(&self) -> bool {
        self.history_cache.lock().has_cache()
    }
322
323    pub fn free_history_cache(&self) {
324        let mut history_cache = self.history_cache.lock();
325        history_cache.free();
326    }
327
    /// Number of queued pending changes (test-only helper).
    #[cfg(test)]
    #[allow(dead_code)]
    pub(crate) fn pending_changes_len(&self) -> usize {
        self.pending_changes.len()
    }
333
    /// Import a change produced by the local peer.
    ///
    /// Unlike remote imports there is no pending-queue handling here: the
    /// change is inserted directly via `insert_new_change`.
    ///
    /// # Err
    ///
    /// Currently this always returns `Ok(())`; the `Result` is kept so the
    /// signature mirrors the fallible remote-import path.
    pub(crate) fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
        self.insert_new_change(change, true);
        Ok(())
    }
348
349    /// Trim the known part of change
350    pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> {
351        let Some(&end) = self.dag.vv().get(&change.id.peer) else {
352            return Some(change);
353        };
354
355        if change.id.counter >= end {
356            return Some(change);
357        }
358
359        if change.ctr_end() <= end {
360            return None;
361        }
362
363        let offset = (end - change.id.counter) as usize;
364        Some(change.slice(offset, change.atom_len()))
365    }
366
367    #[allow(unused)]
368    fn check_id_is_not_duplicated(&self, id: ID) -> Result<(), LoroError> {
369        let cur_end = self.dag.vv().get(&id.peer).cloned().unwrap_or(0);
370        if cur_end > id.counter {
371            return Err(LoroError::UsedOpID { id });
372        }
373
374        Ok(())
375    }
376
    /// Ensure the new change is greater than the last peer's id and the counter is continuous.
    ///
    /// It can be false when users use detached editing mode and use a custom peer id.
    // This method might be slow and can be optimized if needed in the future.
    pub(crate) fn check_change_greater_than_last_peer_id(
        &self,
        peer: PeerID,
        counter: Counter,
        deps: &Frontiers,
    ) -> Result<(), LoroError> {
        if counter == 0 {
            // The peer's very first change can never conflict.
            return Ok(());
        }

        if !self.configure.detached_editing() {
            // The conflict can only occur in detached editing mode.
            return Ok(());
        }

        // Greatest counter of `peer` visible from any dep (-1 when unseen).
        let mut max_last_counter = -1;
        for dep in deps.iter() {
            let dep_vv = self
                .dag
                .get_vv(dep)
                .ok_or(LoroError::FrontiersNotFound(dep))?;
            max_last_counter = max_last_counter.max(dep_vv.get(&peer).cloned().unwrap_or(0) - 1);
        }

        // The new change must continue directly from the last visible counter.
        if counter != max_last_counter + 1 {
            return Err(LoroError::ConcurrentOpsWithSamePeerID {
                peer,
                last_counter: max_last_counter,
                current: counter,
            });
        }

        Ok(())
    }
414
415    pub(crate) fn next_id(&self, peer: PeerID) -> ID {
416        let cnt = self.dag.vv().get(&peer).copied().unwrap_or(0);
417        ID::new(peer, cnt)
418    }
419
    /// Version vector of everything applied to the dag.
    pub(crate) fn vv(&self) -> &VersionVector {
        self.dag.vv()
    }
423
    /// Current frontiers (heads) of the dag.
    pub(crate) fn frontiers(&self) -> &Frontiers {
        self.dag.frontiers()
    }
427
    /// - Ordering::Less means self is less than target or parallel
    /// - Ordering::Equal means versions equal
    /// - Ordering::Greater means self's version is greater than target
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        self.dag.cmp_with_frontiers(other)
    }
434
    /// Compare two [Frontiers] causally.
    ///
    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
    /// `Ok(None)` means the two frontiers are concurrent (not comparable).
    #[inline]
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        self.dag.cmp_frontiers(a, b)
    }
446
447    pub(crate) fn get_min_lamport_at(&self, id: ID) -> Lamport {
448        self.get_change_at(id).map(|c| c.lamport).unwrap_or(0)
449    }
450
451    pub(crate) fn get_lamport_at(&self, id: ID) -> Option<Lamport> {
452        self.get_change_at(id)
453            .map(|c| c.lamport + (id.counter - c.id.counter) as Lamport)
454    }
455
    /// Iterate the ops covered by `id_span` as [RichOp]s, starting from
    /// `id_span.counter`.
    pub(crate) fn iter_ops(&self, id_span: IdSpan) -> impl Iterator<Item = RichOp<'static>> + '_ {
        let change_iter = self.change_store.iter_changes(id_span);
        change_iter.flat_map(move |c| RichOp::new_iter_by_cnt_range(c, id_span.counter))
    }
460
    /// Lamport of the last op in the change containing `id`;
    /// `Lamport::MAX` when `id` is unknown to this oplog.
    // NOTE(review): if a change ever had no ops, `unwrap_or(0) - change_counter`
    // would underflow — presumably every change carries at least one op; confirm.
    pub(crate) fn get_max_lamport_at(&self, id: ID) -> Lamport {
        self.get_change_at(id)
            .map(|c| {
                let change_counter = c.id.counter as u32;
                c.lamport + c.ops().last().map(|op| op.counter).unwrap_or(0) as u32 - change_counter
            })
            .unwrap_or(Lamport::MAX)
    }
469
    /// Get the change that contains the op `id`, if any.
    pub fn get_change_at(&self, id: ID) -> Option<BlockChangeRef> {
        self.change_store.get_change(id)
    }
473
    /// Install the placeholder change being edited by the pre-commit callback.
    pub(crate) fn set_uncommitted_change(&mut self, change: Change) {
        self.uncommitted_change = Some(change);
    }
477
    /// Return the part of the uncommitted change that covers `id_span`, if any.
    ///
    /// Borrows the whole change when the span matches it exactly; otherwise
    /// returns an owned slice of the overlapping range.
    pub(crate) fn get_uncommitted_change_in_span(
        &self,
        id_span: IdSpan,
    ) -> Option<Cow<'_, Change>> {
        self.uncommitted_change.as_ref().and_then(|c| {
            if c.id_span() == id_span {
                Some(Cow::Borrowed(c))
            } else if let Some((start, end)) = id_span.get_slice_range_on(&c.id_span()) {
                Some(Cow::Owned(c.slice(start, end)))
            } else {
                None
            }
        })
    }
492
    /// Deps of the op `id`: the containing change's deps when `id` is the
    /// change's first op, otherwise just the directly preceding id.
    pub fn get_deps_of(&self, id: ID) -> Option<Frontiers> {
        self.get_change_at(id).map(|c| {
            if c.id.counter == id.counter {
                c.deps.clone()
            } else {
                Frontiers::from_id(id.inc(-1))
            }
        })
    }
502
503    pub fn get_remote_change_at(&self, id: ID) -> Option<Change<RemoteOp<'static>>> {
504        let change = self.get_change_at(id)?;
505        Some(convert_change_to_remote(&self.arena, &change))
506    }
507
    /// Queue remote changes whose lamports are still unknown; they are applied
    /// once their deps become available.
    pub(crate) fn import_unknown_lamport_pending_changes(
        &mut self,
        remote_changes: Vec<Change>,
    ) -> Result<(), LoroError> {
        self.extend_pending_changes_with_unknown_lamport(remote_changes)
    }
514
515    /// lookup change by id.
516    ///
517    /// if id does not included in this oplog, return None
518    pub(crate) fn lookup_change(&self, id: ID) -> Option<BlockChangeRef> {
519        self.change_store.get_change(id)
520    }
521
    /// Export all changes after version (`vv`, `f`) up to the current version.
    #[inline(always)]
    pub(crate) fn export_change_store_from(&self, vv: &VersionVector, f: &Frontiers) -> Bytes {
        self.change_store
            .export_from(vv, f, self.vv(), self.frontiers())
    }
527
    /// Export the changes between version (`vv`, `f`) and (`to_vv`, `to_frontiers`).
    #[inline(always)]
    pub(crate) fn export_change_store_in_range(
        &self,
        vv: &VersionVector,
        f: &Frontiers,
        to_vv: &VersionVector,
        to_frontiers: &Frontiers,
    ) -> Bytes {
        self.change_store.export_from(vv, f, to_vv, to_frontiers)
    }
538
    /// Stream the encoded change blocks newer than `vv` into the writer `w`.
    #[inline(always)]
    pub(crate) fn export_blocks_from<W: std::io::Write>(&self, vv: &VersionVector, w: &mut W) {
        self.change_store
            .export_blocks_from(vv, self.shallow_since_vv(), self.vv(), w)
    }
544
    /// Stream the encoded blocks covering exactly `spans` into the writer `w`.
    #[inline(always)]
    pub(crate) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
        self.change_store.export_blocks_in_range(spans, w)
    }
549
550    pub(crate) fn fork_changes_up_to(&self, frontiers: &Frontiers) -> Option<Bytes> {
551        let vv = self.dag.frontiers_to_vv(frontiers)?;
552        Some(
553            self.change_store
554                .fork_changes_up_to(self.dag.shallow_since_vv(), frontiers, &vv),
555        )
556    }
557
    /// Decode and import an already-parsed update/snapshot payload.
    #[inline(always)]
    pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<ImportStatus, LoroError> {
        decode_oplog(self, data)
    }
562
    /// iterates over all changes between LCA(common ancestors) to the merged version of (`from` and `to`) causally
    ///
    /// The iterator also yields a version vector describing the version at which each change is applied
    ///
    /// returns: (common_ancestor_vv, diff_mode, iterator)
    ///
    /// Note: the change returned by the iterator may include redundant ops at the beginning, you should trim it by yourself.
    /// You can trim it by the provided counter value. It should start with the counter.
    ///
    /// If frontiers are provided, it will be faster (because we don't need to calculate it from version vector)
    #[allow(clippy::type_complexity)]
    pub(crate) fn iter_from_lca_causally(
        &self,
        from: &VersionVector,
        from_frontiers: &Frontiers,
        to: &VersionVector,
        to_frontiers: &Frontiers,
    ) -> (
        VersionVector,
        DiffMode,
        impl Iterator<
                Item = (
                    BlockChangeRef,
                    (Counter, Counter),
                    Rc<RefCell<VersionVector>>,
                ),
            > + '_,
    ) {
        let mut merged_vv = from.clone();
        merged_vv.merge(to);
        loro_common::debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to);
        let (common_ancestors, mut diff_mode) =
            self.dag.find_common_ancestor(from_frontiers, to_frontiers);
        // A strictly forward move can use the cheaper import diff mode.
        if diff_mode == DiffMode::Checkout && to > from {
            diff_mode = DiffMode::Import;
        }

        let common_ancestors_vv = self.dag.frontiers_to_vv(&common_ancestors).unwrap();
        // go from lca to merged_vv
        let diff = common_ancestors_vv.diff(&merged_vv).forward;
        let mut iter = self.dag.iter_causal(common_ancestors, diff);
        let mut node = iter.next();
        let mut cur_cnt = 0;
        let vv = Rc::new(RefCell::new(VersionVector::default()));
        (
            common_ancestors_vv.clone(),
            diff_mode,
            std::iter::from_fn(move || {
                if let Some(inner) = &node {
                    let mut inner_vv = vv.borrow_mut();
                    // FIXME: PERF: it looks slow for large vv, like 10000+ entries
                    inner_vv.clear();
                    self.dag.ensure_vv_for(&inner.data);
                    inner_vv.extend_to_include_vv(inner.data.vv.get().unwrap().iter());
                    let peer = inner.data.peer;
                    // Start counter: the resume point inside the node, clamped
                    // to not fall before the LCA version.
                    let cnt = inner
                        .data
                        .cnt
                        .max(cur_cnt)
                        .max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
                    let dag_node_end = (inner.data.cnt + inner.data.len as Counter)
                        .min(merged_vv.get(&peer).copied().unwrap_or(0));
                    let change = self.change_store.get_change(ID::new(peer, cnt)).unwrap();

                    // A dag node may span several changes; only advance to the
                    // next node once the current node's range is exhausted.
                    if change.ctr_end() < dag_node_end {
                        cur_cnt = change.ctr_end();
                    } else {
                        node = iter.next();
                        cur_cnt = 0;
                    }

                    inner_vv.extend_to_include_end_id(change.id);

                    Some((change, (cnt, dag_node_end), vv.clone()))
                } else {
                    None
                }
            }),
        )
    }
643
    /// Total number of changes stored in this oplog.
    pub fn len_changes(&self) -> usize {
        self.change_store.change_num()
    }
647
    /// Count changes / ops / atom ops / dag nodes, print them to stdout, and
    /// return them as a [SizeInfo]. Intended for debugging and diagnostics.
    pub fn diagnose_size(&self) -> SizeInfo {
        let mut total_changes = 0;
        let mut total_ops = 0;
        let mut total_atom_ops = 0;
        let total_dag_node = self.dag.total_parsed_dag_node();
        self.change_store.visit_all_changes(&mut |change| {
            total_changes += 1;
            total_ops += change.ops.len();
            total_atom_ops += change.atom_len();
        });

        println!("total changes: {}", total_changes);
        println!("total ops: {}", total_ops);
        println!("total atom ops: {}", total_atom_ops);
        println!("total dag node: {}", total_dag_node);
        SizeInfo {
            total_changes,
            total_ops,
            total_atom_ops,
            total_dag_node,
        }
    }
670
    /// Iterate the changes in `to` but not in `from`, grouped peer by peer
    /// (i.e. NOT in causal order).
    pub(crate) fn iter_changes_peer_by_peer<'a>(
        &'a self,
        from: &VersionVector,
        to: &VersionVector,
    ) -> impl Iterator<Item = BlockChangeRef> + 'a {
        // Collect the spans eagerly so the returned iterator does not borrow
        // `from`/`to`.
        let spans: Vec<_> = from.diff_iter(to).1.collect();
        spans
            .into_iter()
            .flat_map(move |span| self.change_store.iter_changes(span))
    }
681
    /// Iterate the changes between `from` and `to` in reverse causal order.
    pub(crate) fn iter_changes_causally_rev<'a>(
        &'a self,
        from: &VersionVector,
        to: &VersionVector,
    ) -> impl Iterator<Item = BlockChangeRef> + 'a {
        MergedChangeIter::new_change_iter_rev(self, from, to)
    }
689
690    pub fn get_timestamp_for_next_txn(&self) -> Timestamp {
691        if self.configure.record_timestamp() {
692            get_timestamp_now_txn()
693        } else {
694            0
695        }
696    }
697
    /// Map an [IdLp] to the concrete [ID] of the op carrying that lamport, or
    /// `None` when no op with exactly that lamport exists for the peer.
    #[inline(never)]
    pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
        let change = self.change_store.get_change_by_lamport_lte(id)?;

        // The found change has the greatest lamport <= id.lamport; the target
        // must fall inside this change's own lamport range.
        if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
            return None;
        }

        Some(ID::new(
            change.id.peer,
            (id.lamport - change.lamport) as Counter + change.id.counter,
        ))
    }
711
    /// Map an [ID] to its [IdLp].
    ///
    /// # Panics
    ///
    /// Panics if `id_start` is not contained in this oplog.
    #[allow(unused)]
    pub(crate) fn id_to_idlp(&self, id_start: ID) -> IdLp {
        let change = self.get_change_at(id_start).unwrap();
        let lamport = change.lamport + (id_start.counter - change.id.counter) as Lamport;
        let peer = id_start.peer;
        loro_common::IdLp { peer, lamport }
    }
719
    /// NOTE: This may return an op that merely includes the given id; it does
    /// not necessarily start at the given id.
    pub(crate) fn get_op_that_includes(&self, id: ID) -> Option<BlockOpRef> {
        let change = self.get_change_at(id)?;
        change.get_op_with_counter(id.counter)
    }
725
    /// Split `id_span` at every dag-node boundary, pairing each sub-span with
    /// the frontiers (deps) it starts from.
    pub(crate) fn split_span_based_on_deps(&self, id_span: IdSpan) -> Vec<(IdSpan, Frontiers)> {
        let peer = id_span.peer;
        let mut counter = id_span.counter.min();
        let span_end = id_span.counter.norm_end();
        let mut ans = Vec::new();

        while counter < span_end {
            let id = ID::new(peer, counter);
            let node = self.dag.get(id).unwrap();

            // At a node's first counter use the node's own deps; inside a node
            // the dep is simply the directly preceding id.
            let f = if node.cnt == counter {
                node.deps.clone()
            } else if counter > 0 {
                id.inc(-1).into()
            } else {
                unreachable!()
            };

            let cur_end = node.cnt + node.len as Counter;
            let len = cur_end.min(span_end) - counter;
            ans.push((id.to_span(len as usize), f));
            counter += len;
        }

        ans
    }
752
    /// Flush buffered changes and compact the change store's backing storage.
    #[inline]
    pub fn compact_change_store(&mut self) {
        self.change_store
            .flush_and_compact(self.dag.vv(), self.dag.frontiers());
    }
758
    /// Size in bytes of the change store's key-value backing.
    #[inline]
    pub fn change_store_kv_size(&self) -> usize {
        self.change_store.kv_size()
    }
763
    /// Encode the entire change store at the current version.
    pub fn encode_change_store(&self) -> bytes::Bytes {
        self.change_store
            .encode_all(self.dag.vv(), self.dag.frontiers())
    }
768
    /// Run the dag's internal consistency checks (debug/diagnostic helper).
    pub fn check_dag_correctness(&self) {
        self.dag.check_dag_correctness();
    }
772
    /// Version vector marking the start of the shallow (trimmed) history.
    pub fn shallow_since_vv(&self) -> &ImVersionVector {
        self.dag.shallow_since_vv()
    }
776
    /// Frontiers marking the start of the shallow (trimmed) history.
    pub fn shallow_since_frontiers(&self) -> &Frontiers {
        self.dag.shallow_since_frontiers()
    }
780
    /// Whether part of the history has been trimmed away (shallow document).
    pub fn is_shallow(&self) -> bool {
        !self.dag.shallow_since_vv().is_empty()
    }
784
785    pub fn get_greatest_timestamp(&self, frontiers: &Frontiers) -> Timestamp {
786        let mut max_timestamp = Timestamp::default();
787        for id in frontiers.iter() {
788            let change = self.get_change_at(id).unwrap();
789            if change.timestamp > max_timestamp {
790                max_timestamp = change.timestamp;
791            }
792        }
793
794        max_timestamp
795    }
796}
797
/// Aggregate size statistics produced by `OpLog::diagnose_size`.
#[derive(Debug)]
pub struct SizeInfo {
    /// Number of stored changes.
    pub total_changes: usize,
    /// Number of (merged) ops across all changes.
    pub total_ops: usize,
    /// Number of atom-level ops across all changes.
    pub total_atom_ops: usize,
    /// Number of parsed dag nodes.
    pub total_dag_node: usize,
}
805
/// Convert an internal [Change] (with arena-interned op payloads) into a
/// self-contained remote change whose ops own their data.
pub(crate) fn convert_change_to_remote(
    arena: &SharedArena,
    change: &Change,
) -> Change<RemoteOp<'static>> {
    let mut ops = RleVec::new();
    for op in change.ops.iter() {
        // One internal op may expand into several remote ops.
        for op in local_op_to_remote(arena, op) {
            ops.push(op);
        }
    }

    Change {
        ops,
        id: change.id,
        deps: change.deps.clone(),
        lamport: change.lamport,
        timestamp: change.timestamp,
        commit_msg: change.commit_msg.clone(),
    }
}
826
/// Convert one internal op into its remote representation(s), resolving
/// arena-interned slices (text / values) into owned payloads.
pub(crate) fn local_op_to_remote(
    arena: &SharedArena,
    op: &crate::op::Op,
) -> SmallVec<[RemoteOp<'static>; 1]> {
    let container = arena.get_container_id(op.container).unwrap();
    let mut contents: SmallVec<[_; 1]> = SmallVec::new();
    match &op.content {
        crate::op::InnerContent::List(list) => match list {
            // Generic insert: resolve the arena slice as text or as values
            // depending on the container type.
            list_op::InnerListOp::Insert { slice, pos } => match container.container_type() {
                loro_common::ContainerType::Text => {
                    let str = arena
                        .slice_str_by_unicode_range(slice.0.start as usize..slice.0.end as usize);
                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
                        slice: ListSlice::RawStr {
                            unicode_len: str.chars().count(),
                            str: Cow::Owned(str),
                        },
                        pos: *pos,
                    }));
                }
                loro_common::ContainerType::List | loro_common::ContainerType::MovableList => {
                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
                        slice: ListSlice::RawData(Cow::Owned(
                            arena.get_values(slice.0.start as usize..slice.0.end as usize),
                        )),
                        pos: *pos,
                    }))
                }
                _ => unreachable!(),
            },
            // Text insert that already carries its utf-8 bytes inline.
            list_op::InnerListOp::InsertText {
                slice,
                unicode_len: len,
                unicode_start: _,
                pos,
            } => match container.container_type() {
                loro_common::ContainerType::Text => {
                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
                        slice: ListSlice::RawStr {
                            unicode_len: *len as usize,
                            str: Cow::Owned(std::str::from_utf8(slice).unwrap().to_owned()),
                        },
                        pos: *pos as usize,
                    }));
                }
                _ => unreachable!(),
            },
            list_op::InnerListOp::Delete(del) => {
                contents.push(RawOpContent::List(list_op::ListOp::Delete(*del)))
            }
            list_op::InnerListOp::StyleStart {
                start,
                end,
                key,
                value,
                info,
            } => contents.push(RawOpContent::List(list_op::ListOp::StyleStart {
                start: *start,
                end: *end,
                key: key.clone(),
                value: value.clone(),
                info: *info,
            })),
            list_op::InnerListOp::StyleEnd => {
                contents.push(RawOpContent::List(list_op::ListOp::StyleEnd))
            }
            list_op::InnerListOp::Move {
                from,
                elem_id: from_id,
                to,
            } => contents.push(RawOpContent::List(list_op::ListOp::Move {
                from: *from,
                elem_id: *from_id,
                to: *to,
            })),
            list_op::InnerListOp::Set { elem_id, value } => {
                contents.push(RawOpContent::List(list_op::ListOp::Set {
                    elem_id: *elem_id,
                    value: value.clone(),
                }))
            }
        },
        crate::op::InnerContent::Map(map) => {
            let value = map.value.clone();
            contents.push(RawOpContent::Map(crate::container::map::MapSet {
                key: map.key.clone(),
                value,
            }))
        }
        crate::op::InnerContent::Tree(tree) => contents.push(RawOpContent::Tree(tree.clone())),
        crate::op::InnerContent::Future(f) => match f {
            #[cfg(feature = "counter")]
            crate::op::FutureInnerContent::Counter(c) => contents.push(RawOpContent::Counter(*c)),
            FutureInnerContent::Unknown { prop, value } => {
                contents.push(crate::op::RawOpContent::Unknown {
                    prop: *prop,
                    value: (**value).clone(),
                })
            }
        },
    };

    // All produced contents share the same container id and starting counter.
    let mut ans = SmallVec::with_capacity(contents.len());
    for content in contents {
        ans.push(RemoteOp {
            container: container.clone(),
            content,
            counter: op.counter,
        })
    }
    ans
}
939
/// Current system time as a transaction [Timestamp] in seconds.
// assumes `get_sys_timestamp()` returns milliseconds — the `+ 500` then rounds
// to the nearest second instead of truncating; TODO confirm the unit.
pub(crate) fn get_timestamp_now_txn() -> Timestamp {
    (get_sys_timestamp() as Timestamp + 500) / 1000
}