loro_internal/
loro.rs

1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8    arena::SharedArena,
9    change::Timestamp,
10    configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11    container::{
12        idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13        IntoContainerId,
14    },
15    cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16    dag::{Dag, DagUtils},
17    diff_calc::DiffCalculator,
18    encoding::{
19        self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20        export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
21        export_state_only_snapshot,
22        json_schema::{encode_change_to_json, json::JsonSchema},
23        parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24    },
25    event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26    handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27    id::PeerID,
28    json::JsonChange,
29    op::InnerContent,
30    oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31    state::DocState,
32    subscription::{LocalUpdateCallback, Observer, Subscriber},
33    undo::DiffBatch,
34    utils::subscription::{SubscriberSetWithQueue, Subscription},
35    version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36    ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37    VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42    lock::{LoroLockGroup, LoroMutex},
43    txn::Transaction,
44};
45use either::Either;
46use fxhash::{FxHashMap, FxHashSet};
47use loro_common::{
48    ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
49    LoroValue, ID,
50};
51use rle::HasLength;
52use std::{
53    borrow::Cow,
54    cmp::Ordering,
55    collections::{hash_map::Entry, BinaryHeap},
56    ops::ControlFlow,
57    sync::{
58        atomic::Ordering::{Acquire, Release},
59        Arc,
60    },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("LoroDoc")
73            .field("config", &self.config)
74            .field("auto_commit", &self.auto_commit)
75            .field("detached", &self.detached)
76            .finish()
77    }
78}
79
80impl LoroDoc {
81    pub fn new() -> Self {
82        let oplog = OpLog::new();
83        let arena = oplog.arena.clone();
84        let config: Configure = oplog.configure.clone();
85        let lock_group = LoroLockGroup::new();
86        let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
87        let inner = Arc::new_cyclic(|w| {
88            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
89            LoroDocInner {
90                oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
91                state,
92                config,
93                detached: AtomicBool::new(false),
94                auto_commit: AtomicBool::new(false),
95                observer: Arc::new(Observer::new(arena.clone())),
96                diff_calculator: Arc::new(
97                    lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
98                ),
99                txn: global_txn,
100                arena,
101                local_update_subs: SubscriberSetWithQueue::new(),
102                peer_id_change_subs: SubscriberSetWithQueue::new(),
103                pre_commit_subs: SubscriberSetWithQueue::new(),
104                first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
105            }
106        });
107        LoroDoc { inner }
108    }
109
110    pub fn fork(&self) -> Self {
111        if self.is_detached() {
112            return self.fork_at(&self.state_frontiers());
113        }
114
115        let (options, txn) = self.commit_then_stop();
116        drop(txn);
117        let snapshot = encoding::fast_snapshot::encode_snapshot_inner(self);
118        let doc = Self::new();
119        encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default()).unwrap();
120        doc.set_config(&self.config);
121        if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
122            doc.start_auto_commit();
123        }
124        self.renew_txn_if_auto_commit(options);
125        doc
126    }
127    /// Enables editing of the document in detached mode.
128    ///
129    /// By default, the document cannot be edited in detached mode (after calling
130    /// `detach` or checking out a version other than the latest). This method
131    /// allows editing in detached mode.
132    ///
133    /// # Important Notes:
134    ///
135    /// - After enabling this mode, the document will use a different PeerID. Each
136    ///   time you call checkout, a new PeerID will be used.
137    /// - If you set a custom PeerID while this mode is enabled, ensure that
138    ///   concurrent operations with the same PeerID are not possible.
139    /// - On detached mode, importing will not change the state of the document.
140    ///   It also doesn't change the version of the [DocState]. The changes will be
141    ///   recorded into [OpLog] only. You need to call `checkout` to make it take effect.
142    pub fn set_detached_editing(&self, enable: bool) {
143        self.config.set_detached_editing(enable);
144        if enable && self.is_detached() {
145            let (options, txn) = self.commit_then_stop();
146            drop(txn);
147            self.renew_peer_id();
148            self.renew_txn_if_auto_commit(options);
149        }
150    }
151
152    /// Create a doc with auto commit enabled.
153    #[inline]
154    pub fn new_auto_commit() -> Self {
155        let doc = Self::new();
156        doc.start_auto_commit();
157        doc
158    }
159
160    #[inline(always)]
161    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
162        if peer == PeerID::MAX {
163            return Err(LoroError::InvalidPeerID);
164        }
165        let next_id = self.oplog.lock().unwrap().next_id(peer);
166        if self.auto_commit.load(Acquire) {
167            let doc_state = self.state.lock().unwrap();
168            doc_state
169                .peer
170                .store(peer, std::sync::atomic::Ordering::Relaxed);
171
172            if doc_state.is_in_txn() {
173                drop(doc_state);
174                self.commit_then_renew();
175            }
176            self.peer_id_change_subs.emit(&(), next_id);
177            return Ok(());
178        }
179
180        let doc_state = self.state.lock().unwrap();
181        if doc_state.is_in_txn() {
182            return Err(LoroError::TransactionError(
183                "Cannot change peer id during transaction"
184                    .to_string()
185                    .into_boxed_str(),
186            ));
187        }
188
189        doc_state
190            .peer
191            .store(peer, std::sync::atomic::Ordering::Relaxed);
192        drop(doc_state);
193        self.peer_id_change_subs.emit(&(), next_id);
194        Ok(())
195    }
196
197    /// Renews the PeerID for the document.
198    pub(crate) fn renew_peer_id(&self) {
199        let peer_id = DefaultRandom.next_u64();
200        self.set_peer_id(peer_id).unwrap();
201    }
202
203    /// Commit the cumulative auto commit transaction.
204    /// This method only has effect when `auto_commit` is true.
205    ///
206    /// Afterwards, the users need to call `self.renew_txn_after_commit()` to resume the continuous transaction.
207    ///
208    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
209    #[inline]
210    #[must_use]
211    pub fn commit_then_stop(&self) -> (Option<CommitOptions>, LoroMutexGuard<'_, Option<Transaction>>) {
212        let (a, b) = self.commit_with(CommitOptions::new().immediate_renew(false));
213        (a, b.unwrap())
214    }
215
216    /// Commit the cumulative auto commit transaction.
217    /// It will start the next one immediately
218    ///
219    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
220    #[inline]
221    pub fn commit_then_renew(&self) -> Option<CommitOptions> {
222        self.commit_with(CommitOptions::new().immediate_renew(true))
223            .0
224    }
225
226    /// This method is called before the commit.
227    /// It can be used to modify the change before it is committed.
228    ///
229    /// It return Some(txn) if the txn is None
230    fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
231        let mut txn_guard = self.txn.lock().unwrap();
232        let Some(txn) = txn_guard.as_mut() else {
233            return Some(txn_guard);
234        };
235
236        if txn.is_peer_first_appearance {
237            txn.is_peer_first_appearance = false;
238            drop(txn_guard);
239            // First commit from a peer
240            self.first_commit_from_peer_subs.emit(
241                &(),
242                FirstCommitFromPeerPayload {
243                    peer: self.peer_id(),
244                },
245            );
246        }
247
248        None
249    }
250
251    /// Commit the cumulative auto commit transaction.
252    /// This method only has effect when `auto_commit` is true.
253    /// If `immediate_renew` is true, a new transaction will be created after the old one is committed
254    ///
255    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
256    #[instrument(skip_all)]
257    pub fn commit_with(
258        &self,
259        config: CommitOptions,
260    ) -> (
261        Option<CommitOptions>,
262        Option<LoroMutexGuard<'_, Option<Transaction>>>,
263    ) {
264        if !self.auto_commit.load(Acquire) {
265            let txn_guard = self.txn.lock().unwrap();
266            // if not auto_commit, nothing should happen
267            // because the global txn is not used
268            return (None, Some(txn_guard));
269        }
270
271        loop {
272            if let Some(txn_guard) = self.before_commit() {
273                return (None, Some(txn_guard));
274            }
275
276            let mut txn_guard = self.txn.lock().unwrap();
277            let txn = txn_guard.take();
278            let Some(mut txn) = txn else {
279                return (None, Some(txn_guard));
280            };
281            let on_commit = txn.take_on_commit();
282            if let Some(origin) = config.origin.clone() {
283                txn.set_origin(origin);
284            }
285
286            if let Some(timestamp) = config.timestamp {
287                txn.set_timestamp(timestamp);
288            }
289
290            if let Some(msg) = config.commit_msg.as_ref() {
291                txn.set_msg(Some(msg.clone()));
292            }
293
294            let id_span = txn.id_span();
295            let options = txn.commit().unwrap();
296            if config.immediate_renew {
297                assert!(self.can_edit());
298                let mut t = self.txn().unwrap();
299                if let Some(options) = options.as_ref() {
300                    t.set_options(options.clone());
301                }
302                *txn_guard = Some(t);
303            }
304
305            if let Some(on_commit) = on_commit {
306                drop(txn_guard);
307                on_commit(&self.state, &self.oplog, id_span);
308                txn_guard = self.txn.lock().unwrap();
309                if !config.immediate_renew && txn_guard.is_some() {
310                    // make sure that txn_guard is None when config.immediate_renew is false
311                    continue;
312                }
313            }
314
315            return (
316                options,
317                if !config.immediate_renew {
318                    Some(txn_guard)
319                } else {
320                    None
321                },
322            );
323        }
324    }
325
326    /// Set the commit message of the next commit
327    pub fn set_next_commit_message(&self, message: &str) {
328        let mut binding = self.txn.lock().unwrap();
329        let Some(txn) = binding.as_mut() else {
330            return;
331        };
332
333        if message.is_empty() {
334            txn.set_msg(None)
335        } else {
336            txn.set_msg(Some(message.into()))
337        }
338    }
339
340    /// Set the origin of the next commit
341    pub fn set_next_commit_origin(&self, origin: &str) {
342        let mut txn = self.txn.lock().unwrap();
343        if let Some(txn) = txn.as_mut() {
344            txn.set_origin(origin.into());
345        }
346    }
347
348    /// Set the timestamp of the next commit
349    pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
350        let mut txn = self.txn.lock().unwrap();
351        if let Some(txn) = txn.as_mut() {
352            txn.set_timestamp(timestamp);
353        }
354    }
355
356    /// Set the options of the next commit
357    pub fn set_next_commit_options(&self, options: CommitOptions) {
358        let mut txn = self.txn.lock().unwrap();
359        if let Some(txn) = txn.as_mut() {
360            txn.set_options(options);
361        }
362    }
363
364    /// Clear the options of the next commit
365    pub fn clear_next_commit_options(&self) {
366        let mut txn = self.txn.lock().unwrap();
367        if let Some(txn) = txn.as_mut() {
368            txn.set_options(CommitOptions::new());
369        }
370    }
371
372    /// Set whether to record the timestamp of each change. Default is `false`.
373    ///
374    /// If enabled, the Unix timestamp will be recorded for each change automatically.
375    ///
376    /// You can also set each timestamp manually when you commit a change.
377    /// The timestamp manually set will override the automatic one.
378    ///
379    /// NOTE: Timestamps are forced to be in ascending order.
380    /// If you commit a new change with a timestamp that is less than the existing one,
381    /// the largest existing timestamp will be used instead.
382    #[inline]
383    pub fn set_record_timestamp(&self, record: bool) {
384        self.config.set_record_timestamp(record);
385    }
386
387    /// Set the interval of mergeable changes, in seconds.
388    ///
389    /// If two continuous local changes are within the interval, they will be merged into one change.
390    /// The default value is 1000 seconds.
391    #[inline]
392    pub fn set_change_merge_interval(&self, interval: i64) {
393        self.config.set_merge_interval(interval);
394    }
395
396    pub fn can_edit(&self) -> bool {
397        !self.is_detached() || self.config.detached_editing()
398    }
399
400    pub fn is_detached_editing_enabled(&self) -> bool {
401        self.config.detached_editing()
402    }
403
404    #[inline]
405    pub fn config_text_style(&self, text_style: StyleConfigMap) {
406        self.config.text_style_config.try_write().unwrap().map = text_style.map;
407    }
408
409    #[inline]
410    pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
411        self.config
412            .text_style_config
413            .try_write()
414            .unwrap()
415            .default_style = text_style;
416    }
417    pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
418        let doc = Self::new();
419        let (options, _guard) = doc.commit_then_stop();
420        let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
421        if mode.is_snapshot() {
422            decode_snapshot(&doc, mode, body, Default::default())?;
423            drop(_guard);
424            doc.renew_txn_if_auto_commit(options);
425            Ok(doc)
426        } else {
427            Err(LoroError::DecodeError(
428                "Invalid encode mode".to_string().into(),
429            ))
430        }
431    }
432
433    /// Is the document empty? (no ops)
434    #[inline(always)]
435    pub fn can_reset_with_snapshot(&self) -> bool {
436        let oplog = self.oplog.lock().unwrap();
437        if oplog.batch_importing {
438            return false;
439        }
440
441        if self.is_detached() {
442            return false;
443        }
444
445        oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
446    }
447
448    /// Whether [OpLog] and [DocState] are detached.
449    ///
450    /// If so, the document is in readonly mode by default and importing will not change the state of the document.
451    /// It also doesn't change the version of the [DocState]. The changes will be recorded into [OpLog] only.
452    /// You need to call `checkout` to make it take effect.
453    #[inline(always)]
454    pub fn is_detached(&self) -> bool {
455        self.detached.load(Acquire)
456    }
457
458    pub(crate) fn set_detached(&self, detached: bool) {
459        self.detached.store(detached, Release);
460    }
461
462    #[inline(always)]
463    pub fn peer_id(&self) -> PeerID {
464        self.state
465            .lock()
466            .unwrap()
467            .peer
468            .load(std::sync::atomic::Ordering::Relaxed)
469    }
470
471    #[inline(always)]
472    pub fn detach(&self) {
473        let (options, txn) = self.commit_then_stop();
474        drop(txn);
475        self.set_detached(true);
476        self.renew_txn_if_auto_commit(options);
477    }
478
479    #[inline(always)]
480    pub fn attach(&self) {
481        self.checkout_to_latest()
482    }
483
484    /// Get the timestamp of the current state.
485    /// It's the last edit time of the [DocState].
486    pub fn state_timestamp(&self) -> Timestamp {
487        let f = &self.state.lock().unwrap().frontiers;
488        self.oplog.lock().unwrap().get_timestamp_of_version(f)
489    }
490
491    #[inline(always)]
492    pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
493        &self.state
494    }
495
496    #[inline]
497    pub fn get_state_deep_value(&self) -> LoroValue {
498        self.state.lock().unwrap().get_deep_value()
499    }
500
501    #[inline(always)]
502    pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
503        &self.oplog
504    }
505
506    pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
507        let (options, txn) = self.commit_then_stop();
508        let ans = self.oplog.lock().unwrap().export_from(vv);
509        drop(txn);
510        self.renew_txn_if_auto_commit(options);
511        ans
512    }
513
514    #[inline(always)]
515    pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
516        let s = debug_span!("import", peer = self.peer_id());
517        let _e = s.enter();
518        self.import_with(bytes, Default::default())
519    }
520
521    #[inline]
522    pub fn import_with(
523        &self,
524        bytes: &[u8],
525        origin: InternalString,
526    ) -> Result<ImportStatus, LoroError> {
527        let (options, txn) = self.commit_then_stop();
528        assert!(txn.is_none());
529        let ans = self._import_with(bytes, origin);
530        drop(txn);
531        self.renew_txn_if_auto_commit(options);
532        ans
533    }
534
535    #[tracing::instrument(skip_all)]
536    fn _import_with(
537        &self,
538        bytes: &[u8],
539        origin: InternalString,
540    ) -> Result<ImportStatus, LoroError> {
541        ensure_cov::notify_cov("loro_internal::import");
542        let parsed = parse_header_and_body(bytes, true)?;
543        loro_common::info!("Importing with mode={:?}", &parsed.mode);
544        let result = match parsed.mode {
545            EncodeMode::OutdatedRle => {
546                if self.state.lock().unwrap().is_in_txn() {
547                    return Err(LoroError::ImportWhenInTxn);
548                }
549
550                let s = tracing::span!(
551                    tracing::Level::INFO,
552                    "Import updates ",
553                    peer = self.peer_id()
554                );
555                let _e = s.enter();
556                self.update_oplog_and_apply_delta_to_state_if_needed(
557                    |oplog| oplog.decode(parsed),
558                    origin,
559                )
560            }
561            EncodeMode::OutdatedSnapshot => {
562                if self.can_reset_with_snapshot() {
563                    loro_common::info!("Init by snapshot {}", self.peer_id());
564                    decode_snapshot(self, parsed.mode, parsed.body, origin)
565                } else {
566                    self.update_oplog_and_apply_delta_to_state_if_needed(
567                        |oplog| oplog.decode(parsed),
568                        origin,
569                    )
570                }
571            }
572            EncodeMode::FastSnapshot => {
573                if self.can_reset_with_snapshot() {
574                    ensure_cov::notify_cov("loro_internal::import::snapshot");
575                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
576                    decode_snapshot(self, parsed.mode, parsed.body, origin)
577                } else {
578                    self.update_oplog_and_apply_delta_to_state_if_needed(
579                        |oplog| oplog.decode(parsed),
580                        origin,
581                    )
582
583                    // let new_doc = LoroDoc::new();
584                    // new_doc.import(bytes)?;
585                    // let updates = new_doc.export_from(&self.oplog_vv());
586                    // return self.import_with(updates.as_slice(), origin);
587                }
588            }
589            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
590                |oplog| oplog.decode(parsed),
591                origin,
592            ),
593            EncodeMode::Auto => {
594                unreachable!()
595            }
596        };
597
598        self.emit_events();
599
600        result
601    }
602
603    #[tracing::instrument(skip_all)]
604    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
605        &self,
606        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
607        origin: InternalString,
608    ) -> Result<ImportStatus, LoroError> {
609        let mut oplog = self.oplog.lock().unwrap();
610        if !self.is_detached() {
611            let old_vv = oplog.vv().clone();
612            let old_frontiers = oplog.frontiers().clone();
613            let result = f(&mut oplog);
614            if &old_vv != oplog.vv() {
615                let mut diff = DiffCalculator::new(false);
616                let (diff, diff_mode) = diff.calc_diff_internal(
617                    &oplog,
618                    &old_vv,
619                    &old_frontiers,
620                    oplog.vv(),
621                    oplog.dag.get_frontiers(),
622                    None,
623                );
624                let mut state = self.state.lock().unwrap();
625                state.apply_diff(
626                    InternalDocDiff {
627                        origin,
628                        diff: (diff).into(),
629                        by: EventTriggerKind::Import,
630                        new_version: Cow::Owned(oplog.frontiers().clone()),
631                    },
632                    diff_mode,
633                );
634            }
635            result
636        } else {
637            f(&mut oplog)
638        }
639    }
640
641    fn emit_events(&self) {
642        // we should not hold the lock when emitting events
643        let events = {
644            let mut state = self.state.lock().unwrap();
645            state.take_events()
646        };
647        for event in events {
648            self.observer.emit(event);
649        }
650    }
651
652    pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
653        let mut state = self.state.lock().unwrap();
654        state.take_events()
655    }
656
657    #[instrument(skip_all)]
658    pub fn export_snapshot(&self) -> Result<Vec<u8>, LoroEncodeError> {
659        if self.is_shallow() {
660            return Err(LoroEncodeError::ShallowSnapshotIncompatibleWithOldFormat);
661        }
662        let (options, txn) = self.commit_then_stop();
663        drop(txn);
664        let ans = export_snapshot(self);
665        self.renew_txn_if_auto_commit(options);
666        Ok(ans)
667    }
668
669    /// Import the json schema updates.
670    ///
671    /// only supports backward compatibility but not forward compatibility.
672    #[tracing::instrument(skip_all)]
673    pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
674        let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
675        let (options, txn) = self.commit_then_stop();
676        let result = self.update_oplog_and_apply_delta_to_state_if_needed(
677            |oplog| crate::encoding::json_schema::import_json(oplog, json),
678            Default::default(),
679        );
680        self.emit_events();
681        drop(txn);
682        self.renew_txn_if_auto_commit(options);
683        result
684    }
685
686    pub fn export_json_updates(
687        &self,
688        start_vv: &VersionVector,
689        end_vv: &VersionVector,
690        with_peer_compression: bool,
691    ) -> JsonSchema {
692        let (options, txn) = self.commit_then_stop();
693        drop(txn);
694        let oplog = self.oplog.lock().unwrap();
695        let mut start_vv = start_vv;
696        let _temp: Option<VersionVector>;
697        if !oplog.dag.shallow_since_vv().is_empty() {
698            // Make sure that start_vv >= shallow_since_vv
699            let mut include_all = true;
700            for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
701                if start_vv.get(peer).unwrap_or(&0) < counter {
702                    include_all = false;
703                    break;
704                }
705            }
706            if !include_all {
707                let mut vv = start_vv.clone();
708                for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
709                    vv.extend_to_include_end_id(ID::new(peer, counter));
710                }
711                _temp = Some(vv);
712                start_vv = _temp.as_ref().unwrap();
713            }
714        }
715
716        let json = crate::encoding::json_schema::export_json(
717            &oplog,
718            start_vv,
719            end_vv,
720            with_peer_compression,
721        );
722        drop(oplog);
723        self.renew_txn_if_auto_commit(options);
724        json
725    }
726
727    pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
728        let oplog = self.oplog.lock().unwrap();
729        let mut changes = export_json_in_id_span(&oplog, id_span);
730        if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
731            let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
732            changes.push(change_json);
733        }
734        changes
735    }
736
737    /// Get the version vector of the current OpLog
738    #[inline]
739    pub fn oplog_vv(&self) -> VersionVector {
740        self.oplog.lock().unwrap().vv().clone()
741    }
742
743    /// Get the version vector of the current [DocState]
744    #[inline]
745    pub fn state_vv(&self) -> VersionVector {
746        let oplog = self.oplog.lock().unwrap();
747        let f = &self.state.lock().unwrap().frontiers;
748        oplog.dag.frontiers_to_vv(f).unwrap()
749    }
750
751    pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
752        let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
753        if let LoroValue::Container(c) = value {
754            Some(ValueOrHandler::Handler(Handler::new_attached(
755                c.clone(),
756                self.clone(),
757            )))
758        } else {
759            Some(ValueOrHandler::Value(value))
760        }
761    }
762
763    /// Get the handler by the string path.
764    pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
765        let path = str_to_path(path)?;
766        self.get_by_path(&path)
767    }
768
769    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
770        let arena = &self.arena;
771        let txn = self.txn.lock().unwrap();
772        let txn = txn.as_ref()?;
773        let ops_ = txn.local_ops();
774        let new_id = ID {
775            peer: *txn.peer(),
776            counter: ops_.first()?.counter,
777        };
778        let change = ChangeRef {
779            id: &new_id,
780            deps: txn.frontiers(),
781            timestamp: &txn
782                .timestamp()
783                .as_ref()
784                .copied()
785                .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
786            commit_msg: txn.msg(),
787            ops: ops_,
788            lamport: txn.lamport(),
789        };
790        let json = encode_change_to_json(change, arena);
791        Some(json)
792    }
793
794    #[inline]
795    pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
796        if self.has_container(&id) {
797            Some(Handler::new_attached(id, self.clone()))
798        } else {
799            None
800        }
801    }
802
803    /// id can be a str, ContainerID, or ContainerIdRaw.
804    /// if it's str it will use Root container, which will not be None
805    #[inline]
806    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
807        let id = id.into_container_id(&self.arena, ContainerType::Text);
808        assert!(self.has_container(&id));
809        Handler::new_attached(id, self.clone()).into_text().unwrap()
810    }
811
812    /// id can be a str, ContainerID, or ContainerIdRaw.
813    /// if it's str it will use Root container, which will not be None
814    #[inline]
815    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
816        let id = id.into_container_id(&self.arena, ContainerType::List);
817        assert!(self.has_container(&id));
818        Handler::new_attached(id, self.clone()).into_list().unwrap()
819    }
820
821    /// id can be a str, ContainerID, or ContainerIdRaw.
822    /// if it's str it will use Root container, which will not be None
823    #[inline]
824    pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
825        let id = id.into_container_id(&self.arena, ContainerType::MovableList);
826        assert!(self.has_container(&id));
827        Handler::new_attached(id, self.clone())
828            .into_movable_list()
829            .unwrap()
830    }
831
832    /// id can be a str, ContainerID, or ContainerIdRaw.
833    /// if it's str it will use Root container, which will not be None
834    #[inline]
835    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
836        let id = id.into_container_id(&self.arena, ContainerType::Map);
837        assert!(self.has_container(&id));
838        Handler::new_attached(id, self.clone()).into_map().unwrap()
839    }
840
841    /// id can be a str, ContainerID, or ContainerIdRaw.
842    /// if it's str it will use Root container, which will not be None
843    #[inline]
844    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
845        let id = id.into_container_id(&self.arena, ContainerType::Tree);
846        assert!(self.has_container(&id));
847        Handler::new_attached(id, self.clone()).into_tree().unwrap()
848    }
849
850    #[cfg(feature = "counter")]
851    pub fn get_counter<I: IntoContainerId>(
852        &self,
853        id: I,
854    ) -> crate::handler::counter::CounterHandler {
855        let id = id.into_container_id(&self.arena, ContainerType::Counter);
856        assert!(self.has_container(&id));
857        Handler::new_attached(id, self.clone())
858            .into_counter()
859            .unwrap()
860    }
861
862    #[must_use]
863    pub fn has_container(&self, id: &ContainerID) -> bool {
864        if id.is_root() {
865            return true;
866        }
867
868        let exist = self.state.lock().unwrap().does_container_exist(id);
869        exist
870    }
871
872    /// Undo the operations between the given id_span. It can be used even in a collaborative environment.
873    ///
874    /// This is an internal API. You should NOT use it directly.
875    ///
876    /// # Internal
877    ///
878    /// This method will use the diff calculator to calculate the diff required to time travel
879    /// from the end of id_span to the beginning of the id_span. Then it will convert the diff to
880    /// operations and apply them to the OpLog with a dep on the last id of the given id_span.
881    ///
882    /// This implementation is kinda slow, but it's simple and maintainable. We can optimize it
883    /// further when it's needed. The time complexity is O(n + m), n is the ops in the id_span, m is the
884    /// distance from id_span to the current latest version.
885    #[instrument(level = "info", skip_all)]
886    pub fn undo_internal(
887        &self,
888        id_span: IdSpan,
889        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
890        post_transform_base: Option<&DiffBatch>,
891        before_diff: &mut dyn FnMut(&DiffBatch),
892    ) -> LoroResult<CommitWhenDrop<'_>> {
893        if !self.can_edit() {
894            return Err(LoroError::EditWhenDetached);
895        }
896
897        let (options, txn) = self.commit_then_stop();
898        if !self
899            .oplog()
900            .lock()
901            .unwrap()
902            .vv()
903            .includes_id(id_span.id_last())
904        {
905            self.renew_txn_if_auto_commit(options);
906            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
907        }
908
909        let (was_recording, latest_frontiers) = {
910            let mut state = self.state.lock().unwrap();
911            let was_recording = state.is_recording();
912            state.stop_and_clear_recording();
913            (was_recording, state.frontiers.clone())
914        };
915
916        let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
917        let diff = crate::undo::undo(
918            spans,
919            match post_transform_base {
920                Some(d) => Either::Right(d),
921                None => Either::Left(&latest_frontiers),
922            },
923            |from, to| {
924                self._checkout_without_emitting(from, false, false).unwrap();
925                self.state.lock().unwrap().start_recording();
926                self._checkout_without_emitting(to, false, false).unwrap();
927                let mut state = self.state.lock().unwrap();
928                let e = state.take_events();
929                state.stop_and_clear_recording();
930                DiffBatch::new(e)
931            },
932            before_diff,
933        );
934
935        // println!("\nundo_internal: diff: {:?}", diff);
936        // println!("container remap: {:?}", container_remap);
937
938        self._checkout_without_emitting(&latest_frontiers, false, false)?;
939        self.set_detached(false);
940        if was_recording {
941            self.state.lock().unwrap().start_recording();
942        }
943        drop(txn);
944        self.start_auto_commit();
945        // Try applying the diff, but ignore the error if it happens.
946        // MovableList's undo behavior is too tricky to handle in a collaborative env
947        // so in edge cases this may be an Error
948        if let Err(e) = self._apply_diff(diff, container_remap, true) {
949            warn!("Undo Failed {:?}", e);
950        }
951
952        if let Some(options) = options {
953            self.set_next_commit_options(options);
954        }
955        Ok(CommitWhenDrop {
956            doc: self,
957            default_options: CommitOptions::new().origin("undo"),
958        })
959    }
960
961    /// Generate a series of local operations that can revert the current doc to the target
962    /// version.
963    ///
964    /// Internally, it will calculate the diff between the current state and the target state,
965    /// and apply the diff to the current state.
966    pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
967        // TODO: test when the doc is readonly
968        // TODO: test when the doc is detached but enabled editing
969        let f = self.state_frontiers();
970        let diff = self.diff(&f, target)?;
971        self._apply_diff(diff, &mut Default::default(), false)
972    }
973
974    /// Calculate the diff between two versions so that apply diff on a will make the state same as b.
975    ///
976    /// NOTE: This method will make the doc enter the **detached mode**.
977    // FIXME: This method needs testing (no event should be emitted during processing this)
978    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
979        {
980            // check whether a and b are valid
981            let oplog = self.oplog.lock().unwrap();
982            for id in a.iter() {
983                if !oplog.dag.contains(id) {
984                    return Err(LoroError::FrontiersNotFound(id));
985                }
986            }
987            for id in b.iter() {
988                if !oplog.dag.contains(id) {
989                    return Err(LoroError::FrontiersNotFound(id));
990                }
991            }
992        }
993
994        let (options, txn) = self.commit_then_stop();
995        let was_detached = self.is_detached();
996        let old_frontiers = self.state_frontiers();
997        let was_recording = {
998            let mut state = self.state.lock().unwrap();
999            let is_recording = state.is_recording();
1000            state.stop_and_clear_recording();
1001            is_recording
1002        };
1003        self._checkout_without_emitting(a, true, false).unwrap();
1004        self.state.lock().unwrap().start_recording();
1005        self._checkout_without_emitting(b, true, false).unwrap();
1006        let e = {
1007            let mut state = self.state.lock().unwrap();
1008            let e = state.take_events();
1009            state.stop_and_clear_recording();
1010            e
1011        };
1012        self._checkout_without_emitting(&old_frontiers, false, false)
1013            .unwrap();
1014        drop(txn);
1015        if !was_detached {
1016            self.set_detached(false);
1017            self.renew_txn_if_auto_commit(options);
1018        }
1019        if was_recording {
1020            self.state.lock().unwrap().start_recording();
1021        }
1022        Ok(DiffBatch::new(e))
1023    }
1024
1025    /// Apply a diff to the current state.
1026    #[inline(always)]
1027    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1028        self._apply_diff(diff, &mut Default::default(), true)
1029    }
1030
1031    /// Apply a diff to the current state.
1032    ///
1033    /// This method will not recreate containers with the same [ContainerID]s.
1034    /// While this can be convenient in certain cases, it can break several internal invariants:
1035    ///
1036    /// 1. Each container should appear only once in the document. Allowing containers with the same ID
1037    ///    would result in multiple instances of the same container in the document.
1038    /// 2. Unreachable containers should be removable from the state when necessary.
1039    ///
1040    /// However, the diff may contain operations that depend on container IDs.
1041    /// Therefore, users need to provide a `container_remap` to record and retrieve the container ID remapping.
1042    pub(crate) fn _apply_diff(
1043        &self,
1044        diff: DiffBatch,
1045        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1046        skip_unreachable: bool,
1047    ) -> LoroResult<()> {
1048        if !self.can_edit() {
1049            return Err(LoroError::EditWhenDetached);
1050        }
1051
1052        let mut ans: LoroResult<()> = Ok(());
1053        let mut missing_containers: Vec<ContainerID> = Vec::new();
1054        for (mut id, diff) in diff.into_iter() {
1055            let mut remapped = false;
1056            while let Some(rid) = container_remap.get(&id) {
1057                remapped = true;
1058                id = rid.clone();
1059            }
1060
1061            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1062                // Not in arena does not imply non-existent; consult state/kv and register lazily
1063                let exists = self.state.lock().unwrap().does_container_exist(&id);
1064                if !exists {
1065                    missing_containers.push(id);
1066                    continue;
1067                }
1068                // Ensure registration so handlers can be created
1069                self.state.lock().unwrap().ensure_container(&id);
1070            }
1071
1072            if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
1073                continue;
1074            }
1075
1076            let Some(h) = self.get_handler(id.clone()) else {
1077                return Err(LoroError::ContainersNotFound {
1078                    containers: Box::new(vec![id]),
1079                });
1080            };
1081            if let Err(e) = h.apply_diff(diff, container_remap) {
1082                ans = Err(e);
1083            }
1084        }
1085
1086        if !missing_containers.is_empty() {
1087            return Err(LoroError::ContainersNotFound {
1088                containers: Box::new(missing_containers),
1089            });
1090        }
1091
1092        ans
1093    }
1094
1095    /// This is for debugging purpose. It will travel the whole oplog
1096    #[inline]
1097    pub fn diagnose_size(&self) {
1098        self.oplog().lock().unwrap().diagnose_size();
1099    }
1100
1101    #[inline]
1102    pub fn oplog_frontiers(&self) -> Frontiers {
1103        self.oplog().lock().unwrap().frontiers().clone()
1104    }
1105
1106    #[inline]
1107    pub fn state_frontiers(&self) -> Frontiers {
1108        self.state.lock().unwrap().frontiers.clone()
1109    }
1110
1111    /// - Ordering::Less means self is less than target or parallel
1112    /// - Ordering::Equal means versions equal
1113    /// - Ordering::Greater means self's version is greater than target
1114    #[inline]
1115    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1116        self.oplog().lock().unwrap().cmp_with_frontiers(other)
1117    }
1118
1119    /// Compare two [Frontiers] causally.
1120    ///
1121    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
1122    #[inline]
1123    pub fn cmp_frontiers(
1124        &self,
1125        a: &Frontiers,
1126        b: &Frontiers,
1127    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1128        self.oplog().lock().unwrap().cmp_frontiers(a, b)
1129    }
1130
1131    pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1132        let mut state = self.state.lock().unwrap();
1133        if !state.is_recording() {
1134            state.start_recording();
1135        }
1136
1137        self.observer.subscribe_root(callback)
1138    }
1139
1140    pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1141        let mut state = self.state.lock().unwrap();
1142        if !state.is_recording() {
1143            state.start_recording();
1144        }
1145
1146        self.observer.subscribe(container_id, callback)
1147    }
1148
1149    pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1150        let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1151        activate();
1152        sub
1153    }
1154
1155    // PERF: opt
1156    #[tracing::instrument(skip_all)]
1157    pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1158        if bytes.is_empty() {
1159            return Ok(ImportStatus::default());
1160        }
1161
1162        if bytes.len() == 1 {
1163            return self.import(&bytes[0]);
1164        }
1165
1166        let mut success = VersionRange::default();
1167        let mut pending = VersionRange::default();
1168        let mut meta_arr = bytes
1169            .iter()
1170            .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1171            .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1172        meta_arr.sort_by(|a, b| {
1173            a.0.mode
1174                .cmp(&b.0.mode)
1175                .then(b.0.change_num.cmp(&a.0.change_num))
1176        });
1177
1178        let (options, txn) = self.commit_then_stop();
1179        drop(txn);
1180        let is_detached = self.is_detached();
1181        self.detach();
1182        self.oplog.lock().unwrap().batch_importing = true;
1183        let mut err = None;
1184        for (_meta, data) in meta_arr {
1185            match self.import(data) {
1186                Ok(s) => {
1187                    for (peer, (start, end)) in s.success.iter() {
1188                        match success.0.entry(*peer) {
1189                            Entry::Occupied(mut e) => {
1190                                e.get_mut().1 = *end.max(&e.get().1);
1191                            }
1192                            Entry::Vacant(e) => {
1193                                e.insert((*start, *end));
1194                            }
1195                        }
1196                    }
1197
1198                    if let Some(p) = s.pending.as_ref() {
1199                        for (&peer, &(start, end)) in p.iter() {
1200                            match pending.0.entry(peer) {
1201                                Entry::Occupied(mut e) => {
1202                                    e.get_mut().0 = start.min(e.get().0);
1203                                    e.get_mut().1 = end.min(e.get().1);
1204                                }
1205                                Entry::Vacant(e) => {
1206                                    e.insert((start, end));
1207                                }
1208                            }
1209                        }
1210                    }
1211                }
1212                Err(e) => {
1213                    err = Some(e);
1214                }
1215            }
1216        }
1217
1218        let mut oplog = self.oplog.lock().unwrap();
1219        oplog.batch_importing = false;
1220        drop(oplog);
1221
1222        if !is_detached {
1223            self.checkout_to_latest();
1224        }
1225
1226        self.renew_txn_if_auto_commit(options);
1227        if let Some(err) = err {
1228            return Err(err);
1229        }
1230
1231        Ok(ImportStatus {
1232            success,
1233            pending: if pending.is_empty() {
1234                None
1235            } else {
1236                Some(pending)
1237            },
1238        })
1239    }
1240
1241    /// Get shallow value of the document.
1242    #[inline]
1243    pub fn get_value(&self) -> LoroValue {
1244        self.state.lock().unwrap().get_value()
1245    }
1246
1247    /// Get deep value of the document.
1248    #[inline]
1249    pub fn get_deep_value(&self) -> LoroValue {
1250        self.state.lock().unwrap().get_deep_value()
1251    }
1252
1253    /// Get deep value of the document with container id
1254    #[inline]
1255    pub fn get_deep_value_with_id(&self) -> LoroValue {
1256        self.state.lock().unwrap().get_deep_value_with_id()
1257    }
1258
1259    pub fn checkout_to_latest(&self) {
1260        let (options, _guard) = self.commit_then_stop();
1261        if !self.is_detached() {
1262            drop(_guard);
1263            self.renew_txn_if_auto_commit(options);
1264            return;
1265        }
1266
1267        self._checkout_to_latest_without_commit(true);
1268        drop(_guard);
1269        self.renew_txn_if_auto_commit(options);
1270    }
1271
1272    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1273    pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1274        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1275            let f = self.oplog_frontiers();
1276            let this = &self;
1277            let frontiers = &f;
1278            this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1279                .unwrap(); // we don't need to shrink frontiers
1280                           // because oplog's frontiers are already shrinked
1281            this.emit_events();
1282            if this.config.detached_editing() {
1283                this.renew_peer_id();
1284            }
1285
1286            self.set_detached(false);
1287        });
1288    }
1289
1290    /// Checkout [DocState] to a specific version.
1291    ///
1292    /// This will make the current [DocState] detached from the latest version of [OpLog].
1293    /// Any further import will not be reflected on the [DocState], until user call [LoroDoc::attach()]
1294    pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1295        let (options, guard) = self.commit_then_stop();
1296        self._checkout_without_emitting(frontiers, true, true)?;
1297        self.emit_events();
1298        drop(guard);
1299        if self.config.detached_editing() {
1300            self.renew_peer_id();
1301            self.renew_txn_if_auto_commit(options);
1302        } else if !self.is_detached() {
1303            self.renew_txn_if_auto_commit(options);
1304        }
1305
1306        Ok(())
1307    }
1308
1309    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1310    #[instrument(level = "info", skip(self))]
1311    pub(crate) fn _checkout_without_emitting(
1312        &self,
1313        frontiers: &Frontiers,
1314        to_shrink_frontiers: bool,
1315        to_commit_then_renew: bool,
1316    ) -> Result<(), LoroError> {
1317        assert!(self.txn.is_locked());
1318        let from_frontiers = self.state_frontiers();
1319        loro_common::info!(
1320            "checkout from={:?} to={:?} cur_vv={:?}",
1321            from_frontiers,
1322            frontiers,
1323            self.oplog_vv()
1324        );
1325
1326        if &from_frontiers == frontiers {
1327            return Ok(());
1328        }
1329
1330        let oplog = self.oplog.lock().unwrap();
1331        if oplog.dag.is_before_shallow_root(frontiers) {
1332            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1333        }
1334
1335        let frontiers = if to_shrink_frontiers {
1336            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1337        } else {
1338            frontiers.clone()
1339        };
1340
1341        if from_frontiers == frontiers {
1342            return Ok(());
1343        }
1344
1345        let mut state = self.state.lock().unwrap();
1346        let mut calc = self.diff_calculator.lock().unwrap();
1347        for i in frontiers.iter() {
1348            if !oplog.dag.contains(i) {
1349                return Err(LoroError::FrontiersNotFound(i));
1350            }
1351        }
1352
1353        let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1354        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1355            return Err(LoroError::NotFoundError(
1356                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1357            ));
1358        };
1359
1360        self.set_detached(true);
1361        let (diff, diff_mode) =
1362            calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1363        state.apply_diff(
1364            InternalDocDiff {
1365                origin: "checkout".into(),
1366                diff: Cow::Owned(diff),
1367                by: EventTriggerKind::Checkout,
1368                new_version: Cow::Owned(frontiers.clone()),
1369            },
1370            diff_mode,
1371        );
1372
1373        Ok(())
1374    }
1375
1376    #[inline]
1377    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1378        self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
1379    }
1380
1381    #[inline]
1382    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1383        self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
1384    }
1385
1386    /// Import ops from other doc.
1387    ///
1388    /// After `a.merge(b)` and `b.merge(a)`, `a` and `b` will have the same content if they are in attached mode.
1389    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1390        self.import(&other.export_from(&self.oplog_vv()))
1391    }
1392
1393    pub(crate) fn arena(&self) -> &SharedArena {
1394        &self.arena
1395    }
1396
1397    #[inline]
1398    pub fn len_ops(&self) -> usize {
1399        let oplog = self.oplog.lock().unwrap();
1400        let ans = oplog.vv().values().sum::<i32>() as usize;
1401        if oplog.is_shallow() {
1402            let sub = oplog
1403                .shallow_since_vv()
1404                .iter()
1405                .map(|(_, ops)| *ops)
1406                .sum::<i32>() as usize;
1407            ans - sub
1408        } else {
1409            ans
1410        }
1411    }
1412
1413    #[inline]
1414    pub fn len_changes(&self) -> usize {
1415        let oplog = self.oplog.lock().unwrap();
1416        oplog.len_changes()
1417    }
1418
1419    pub fn config(&self) -> &Configure {
1420        &self.config
1421    }
1422
1423    /// This method compare the consistency between the current doc state
1424    /// and the state calculated by diff calculator from beginning.
1425    ///
1426    /// Panic when it's not consistent
1427    pub fn check_state_diff_calc_consistency_slow(&self) {
1428        // #[cfg(any(test, debug_assertions, feature = "test_utils"))]
1429        {
1430            static IS_CHECKING: std::sync::atomic::AtomicBool =
1431                std::sync::atomic::AtomicBool::new(false);
1432            if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1433                return;
1434            }
1435
1436            IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1437            let peer_id = self.peer_id();
1438            let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1439            let _g = s.enter();
1440            let options = self.commit_then_stop().0;
1441            self.oplog.lock().unwrap().check_dag_correctness();
1442            if self.is_shallow() {
1443                // For shallow documents, we cannot replay from the beginning as the history is not complete.
1444                //
1445                // Instead, we:
1446                // 1. Export the initial state from the GC snapshot.
1447                // 2. Create a new document and import the initial snapshot.
1448                // 3. Export updates from the shallow start version vector to the current version.
1449                // 4. Import these updates into the new document.
1450                // 5. Compare the states of the new document and the current document.
1451
1452                // Step 1: Export the initial state from the GC snapshot.
1453                let initial_snapshot = self
1454                    .export(ExportMode::state_only(Some(
1455                        &self.shallow_since_frontiers(),
1456                    )))
1457                    .unwrap();
1458
1459                // Step 2: Create a new document and import the initial snapshot.
1460                let doc = LoroDoc::new();
1461                doc.import(&initial_snapshot).unwrap();
1462                self.checkout(&self.shallow_since_frontiers()).unwrap();
1463                assert_eq!(self.get_deep_value(), doc.get_deep_value());
1464
1465                // Step 3: Export updates since the shallow start version vector to the current version.
1466                let updates = self.export(ExportMode::all_updates()).unwrap();
1467
1468                // Step 4: Import these updates into the new document.
1469                doc.import(&updates).unwrap();
1470                self.checkout_to_latest();
1471
1472                // Step 5: Checkout to the current state's frontiers and compare the states.
1473                // doc.checkout(&self.state_frontiers()).unwrap();
1474                assert_eq!(doc.get_deep_value(), self.get_deep_value());
1475                let mut calculated_state = doc.app_state().lock().unwrap();
1476                let mut current_state = self.app_state().lock().unwrap();
1477                current_state.check_is_the_same(&mut calculated_state);
1478            } else {
1479                let f = self.state_frontiers();
1480                let vv = self
1481                    .oplog()
1482                    .lock()
1483                    .unwrap()
1484                    .dag
1485                    .frontiers_to_vv(&f)
1486                    .unwrap();
1487                let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1488                let doc = Self::new();
1489                doc.import(&bytes).unwrap();
1490                let mut calculated_state = doc.app_state().lock().unwrap();
1491                let mut current_state = self.app_state().lock().unwrap();
1492                current_state.check_is_the_same(&mut calculated_state);
1493            }
1494
1495            self.renew_txn_if_auto_commit(options);
1496            IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1497        }
1498    }
1499
1500    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1501        self.query_pos_internal(pos, true)
1502    }
1503
1504    /// Get position in a seq container
1505    pub(crate) fn query_pos_internal(
1506        &self,
1507        pos: &Cursor,
1508        ret_event_index: bool,
1509    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1510        let mut state = self.state.lock().unwrap();
1511        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1512            Ok(PosQueryResult {
1513                update: None,
1514                current: AbsolutePosition {
1515                    pos: ans,
1516                    side: pos.side,
1517                },
1518            })
1519        } else {
1520            // We need to trace back to the version where the relative position is valid.
1521            // The optimal way to find that version is to have succ info like Automerge.
1522            //
1523            // But we don't have that info now, so an alternative way is to trace back
1524            // to version with frontiers of `[pos.id]`. But this may be very slow even if
1525            // the target is just deleted a few versions ago.
1526            //
1527            // What we need is to trace back to the latest version that deletes the target
1528            // id.
1529
1530            // commit the txn to make sure we can query the history correctly
1531            drop(state);
1532            self.commit_then_renew();
1533            let oplog = self.oplog().lock().unwrap();
1534            // TODO: assert pos.id is not unknown
1535            if let Some(id) = pos.id {
1536                // Ensure the container is registered if it exists lazily
1537                if oplog.arena.id_to_idx(&pos.container).is_none() {
1538                    let mut s = self.state.lock().unwrap();
1539                    if !s.does_container_exist(&pos.container) {
1540                        return Err(CannotFindRelativePosition::ContainerDeleted);
1541                    }
1542                    s.ensure_container(&pos.container);
1543                    drop(s);
1544                }
1545                let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1546                // We know where the target id is when we trace back to the delete_op_id.
1547                let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1548                    if oplog.shallow_since_vv().includes_id(id) {
1549                        return Err(CannotFindRelativePosition::HistoryCleared);
1550                    }
1551
1552                    tracing::error!("Cannot find id {}", id);
1553                    return Err(CannotFindRelativePosition::IdNotFound);
1554                };
1555                // Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
1556                let mut diff_calc = DiffCalculator::new(true);
1557                let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1558                let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1559                // TODO: PERF: it doesn't need to calc the effects here
1560                diff_calc.calc_diff_internal(
1561                    &oplog,
1562                    before,
1563                    &before_frontiers,
1564                    oplog.vv(),
1565                    oplog.frontiers(),
1566                    Some(&|target| idx == target),
1567                );
1568                // TODO: remove depth info
1569                let depth = self.arena.get_depth(idx);
1570                let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1571                match diff_calc {
1572                    crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1573                        let c = text.get_id_latest_pos(id).unwrap();
1574                        let new_pos = c.pos;
1575                        let handler = self.get_text(&pos.container);
1576                        let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1577                        Ok(PosQueryResult {
1578                            update: handler.get_cursor(current_pos, c.side),
1579                            current: AbsolutePosition {
1580                                pos: current_pos,
1581                                side: c.side,
1582                            },
1583                        })
1584                    }
1585                    crate::diff_calc::ContainerDiffCalculator::List(list) => {
1586                        let c = list.get_id_latest_pos(id).unwrap();
1587                        let new_pos = c.pos;
1588                        let handler = self.get_list(&pos.container);
1589                        Ok(PosQueryResult {
1590                            update: handler.get_cursor(new_pos, c.side),
1591                            current: AbsolutePosition {
1592                                pos: new_pos,
1593                                side: c.side,
1594                            },
1595                        })
1596                    }
1597                    crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1598                        let c = list.get_id_latest_pos(id).unwrap();
1599                        let new_pos = c.pos;
1600                        let handler = self.get_movable_list(&pos.container);
1601                        let new_pos = handler.op_pos_to_user_pos(new_pos);
1602                        Ok(PosQueryResult {
1603                            update: handler.get_cursor(new_pos, c.side),
1604                            current: AbsolutePosition {
1605                                pos: new_pos,
1606                                side: c.side,
1607                            },
1608                        })
1609                    }
1610                    crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1611                    crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1612                    #[cfg(feature = "counter")]
1613                    crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1614                    crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1615                }
1616            } else {
1617                match pos.container.container_type() {
1618                    ContainerType::Text => {
1619                        let text = self.get_text(&pos.container);
1620                        Ok(PosQueryResult {
1621                            update: Some(Cursor {
1622                                id: None,
1623                                container: text.id(),
1624                                side: pos.side,
1625                                origin_pos: text.len_unicode(),
1626                            }),
1627                            current: AbsolutePosition {
1628                                pos: text.len_event(),
1629                                side: pos.side,
1630                            },
1631                        })
1632                    }
1633                    ContainerType::List => {
1634                        let list = self.get_list(&pos.container);
1635                        Ok(PosQueryResult {
1636                            update: Some(Cursor {
1637                                id: None,
1638                                container: list.id(),
1639                                side: pos.side,
1640                                origin_pos: list.len(),
1641                            }),
1642                            current: AbsolutePosition {
1643                                pos: list.len(),
1644                                side: pos.side,
1645                            },
1646                        })
1647                    }
1648                    ContainerType::MovableList => {
1649                        let list = self.get_movable_list(&pos.container);
1650                        Ok(PosQueryResult {
1651                            update: Some(Cursor {
1652                                id: None,
1653                                container: list.id(),
1654                                side: pos.side,
1655                                origin_pos: list.len(),
1656                            }),
1657                            current: AbsolutePosition {
1658                                pos: list.len(),
1659                                side: pos.side,
1660                            },
1661                        })
1662                    }
1663                    ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1664                        unreachable!()
1665                    }
1666                    #[cfg(feature = "counter")]
1667                    ContainerType::Counter => unreachable!(),
1668                }
1669            }
1670        }
1671    }
1672
1673    /// Free the history cache that is used for making checkout faster.
1674    ///
1675    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1676    /// You can free it by calling this method.
1677    pub fn free_history_cache(&self) {
1678        self.oplog.lock().unwrap().free_history_cache();
1679    }
1680
1681    /// Free the cached diff calculator that is used for checkout.
1682    pub fn free_diff_calculator(&self) {
1683        *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
1684    }
1685
1686    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1687    /// You can free it by calling `free_history_cache`.
1688    pub fn has_history_cache(&self) -> bool {
1689        self.oplog.lock().unwrap().has_history_cache()
1690    }
1691
1692    /// Encoded all ops and history cache to bytes and store them in the kv store.
1693    ///
1694    /// The parsed ops will be dropped
1695    #[inline]
1696    pub fn compact_change_store(&self) {
1697        self.commit_then_renew();
1698        self.oplog.lock().unwrap().compact_change_store();
1699    }
1700
1701    /// Analyze the container info of the doc
1702    ///
1703    /// This is used for development and debugging
1704    #[inline]
1705    pub fn analyze(&self) -> DocAnalysis {
1706        DocAnalysis::analyze(self)
1707    }
1708
1709    /// Get the path from the root to the container
1710    pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1711        let mut state = self.state.lock().unwrap();
1712        if state.arena.id_to_idx(id).is_none() {
1713            if !state.does_container_exist(id) {
1714                return None;
1715            }
1716            state.ensure_container(id);
1717        }
1718        let idx = state.arena.id_to_idx(id).unwrap();
1719        state.get_path(idx)
1720    }
1721
1722    #[instrument(skip(self))]
1723    pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1724        let (options, txn) = self.commit_then_stop();
1725        let ans = match mode {
1726            ExportMode::Snapshot => export_fast_snapshot(self),
1727            ExportMode::Updates { from } => export_fast_updates(self, &from),
1728            ExportMode::UpdatesInRange { spans } => {
1729                export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1730            }
1731            ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1732            ExportMode::StateOnly(f) => match f {
1733                Some(f) => export_state_only_snapshot(self, &f)?,
1734                None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1735            },
1736            ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1737        };
1738
1739        drop(txn);
1740        self.renew_txn_if_auto_commit(options);
1741        Ok(ans)
1742    }
1743
1744    /// The doc only contains the history since the shallow history start version vector.
1745    ///
1746    /// This is empty if the doc is not shallow.
1747    ///
1748    /// The ops included by the shallow history start version vector are not in the doc.
1749    pub fn shallow_since_vv(&self) -> ImVersionVector {
1750        self.oplog().lock().unwrap().shallow_since_vv().clone()
1751    }
1752
1753    pub fn shallow_since_frontiers(&self) -> Frontiers {
1754        self.oplog()
1755            .lock()
1756            .unwrap()
1757            .shallow_since_frontiers()
1758            .clone()
1759    }
1760
1761    /// Check if the doc contains the full history.
1762    pub fn is_shallow(&self) -> bool {
1763        !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
1764    }
1765
1766    /// Get the number of operations in the pending transaction.
1767    ///
1768    /// The pending transaction is the one that is not committed yet. It will be committed
1769    /// after calling `doc.commit()`, `doc.export(mode)` or `doc.checkout(version)`.
1770    pub fn get_pending_txn_len(&self) -> usize {
1771        if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1772            txn.len()
1773        } else {
1774            0
1775        }
1776    }
1777
1778    #[inline]
1779    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1780        self.oplog().lock().unwrap().dag.find_path(from, to)
1781    }
1782
1783    /// Subscribe to the first commit from a peer. Operations performed on the `LoroDoc` within this callback
1784    /// will be merged into the current commit.
1785    ///
1786    /// This is useful for managing the relationship between `PeerID` and user information.
1787    /// For example, you could store user names in a `LoroMap` using `PeerID` as the key and the `UserID` as the value.
1788    pub fn subscribe_first_commit_from_peer(
1789        &self,
1790        callback: FirstCommitFromPeerCallback,
1791    ) -> Subscription {
1792        let (s, enable) = self
1793            .first_commit_from_peer_subs
1794            .inner()
1795            .insert((), callback);
1796        enable();
1797        s
1798    }
1799
1800    /// Subscribe to the pre-commit event.
1801    ///
1802    /// The callback will be called when the changes are committed but not yet applied to the OpLog.
1803    /// You can modify the commit message and timestamp in the callback by [`ChangeModifier`].
1804    pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1805        let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1806        enable();
1807        s
1808    }
1809}
1810
1811#[derive(Debug, thiserror::Error)]
1812pub enum ChangeTravelError {
1813    #[error("Target id not found {0:?}")]
1814    TargetIdNotFound(ID),
1815    #[error("The shallow history of the doc doesn't include the target version")]
1816    TargetVersionNotIncluded,
1817}
1818
1819impl LoroDoc {
1820    pub fn travel_change_ancestors(
1821        &self,
1822        ids: &[ID],
1823        f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1824    ) -> Result<(), ChangeTravelError> {
1825        self.commit_then_renew();
1826        struct PendingNode(ChangeMeta);
1827        impl PartialEq for PendingNode {
1828            fn eq(&self, other: &Self) -> bool {
1829                self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1830            }
1831        }
1832
1833        impl Eq for PendingNode {}
1834        impl PartialOrd for PendingNode {
1835            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1836                Some(self.cmp(other))
1837            }
1838        }
1839
1840        impl Ord for PendingNode {
1841            fn cmp(&self, other: &Self) -> Ordering {
1842                self.0
1843                    .lamport_last()
1844                    .cmp(&other.0.lamport_last())
1845                    .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1846            }
1847        }
1848
1849        for id in ids {
1850            let op_log = &self.oplog().lock().unwrap();
1851            if !op_log.vv().includes_id(*id) {
1852                return Err(ChangeTravelError::TargetIdNotFound(*id));
1853            }
1854            if op_log.dag.shallow_since_vv().includes_id(*id) {
1855                return Err(ChangeTravelError::TargetVersionNotIncluded);
1856            }
1857        }
1858
1859        let mut visited = FxHashSet::default();
1860        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1861        for id in ids {
1862            pending.push(PendingNode(ChangeMeta::from_change(
1863                &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1864            )));
1865        }
1866        while let Some(PendingNode(node)) = pending.pop() {
1867            let deps = node.deps.clone();
1868            if f(node).is_break() {
1869                break;
1870            }
1871
1872            for dep in deps.iter() {
1873                let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1874                    continue;
1875                };
1876                if visited.contains(&dep_node.id) {
1877                    continue;
1878                }
1879
1880                visited.insert(dep_node.id);
1881                pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1882            }
1883        }
1884
1885        Ok(())
1886    }
1887
1888    pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1889        self.commit_then_renew();
1890        let mut set = FxHashSet::default();
1891        let oplog = &self.oplog().lock().unwrap();
1892        for op in oplog.iter_ops(id.to_span(len)) {
1893            let id = oplog.arena.get_container_id(op.container()).unwrap();
1894            set.insert(id);
1895        }
1896
1897        set
1898    }
1899
1900    pub fn delete_root_container(&self, cid: ContainerID) {
1901        if !cid.is_root() {
1902            return;
1903        }
1904
1905        // Do not treat "not in arena" as non-existence; consult state/kv
1906        if !self.has_container(&cid) {
1907            return;
1908        }
1909
1910        let Some(h) = self.get_handler(cid.clone()) else {
1911            return;
1912        };
1913
1914        if let Err(e) = h.clear() {
1915            eprintln!("Failed to clear handler: {:?}", e);
1916            return;
1917        }
1918        self.config
1919            .deleted_root_containers
1920            .lock()
1921            .unwrap()
1922            .insert(cid);
1923    }
1924
1925    pub fn set_hide_empty_root_containers(&self, hide: bool) {
1926        self.config
1927            .hide_empty_root_containers
1928            .store(hide, std::sync::atomic::Ordering::Relaxed);
1929    }
1930}
1931
1932// FIXME: PERF: This method is quite slow because it iterates all the changes
1933fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
1934    let start_vv = oplog
1935        .dag
1936        .frontiers_to_vv(&id.into())
1937        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
1938    for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
1939        for op in change.ops.iter().rev() {
1940            if op.container != idx {
1941                continue;
1942            }
1943            if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
1944                if d.id_start.to_span(d.atom_len()).contains(id) {
1945                    return Some(ID::new(change.peer(), op.counter));
1946                }
1947            }
1948        }
1949    }
1950
1951    None
1952}
1953
1954#[derive(Debug)]
1955pub struct CommitWhenDrop<'a> {
1956    doc: &'a LoroDoc,
1957    default_options: CommitOptions,
1958}
1959
1960impl Drop for CommitWhenDrop<'_> {
1961    fn drop(&mut self) {
1962        {
1963            let mut guard = self.doc.txn.lock().unwrap();
1964            if let Some(txn) = guard.as_mut() {
1965                txn.set_default_options(std::mem::take(&mut self.default_options));
1966            };
1967        }
1968
1969        self.doc.commit_then_renew();
1970    }
1971}
1972
1973/// Options for configuring a commit operation.
1974#[derive(Debug, Clone)]
1975pub struct CommitOptions {
1976    /// Origin identifier for the commit event, used to track the source of changes.
1977    /// It doesn't persist.
1978    pub origin: Option<InternalString>,
1979
1980    /// Whether to immediately start a new transaction after committing.
1981    /// Defaults to true.
1982    pub immediate_renew: bool,
1983
1984    /// Custom timestamp for the commit in seconds since Unix epoch.
1985    /// If None, the current time will be used.
1986    pub timestamp: Option<Timestamp>,
1987
1988    /// Optional commit message to attach to the changes. It will be persisted.
1989    pub commit_msg: Option<Arc<str>>,
1990}
1991
1992impl CommitOptions {
1993    /// Creates a new CommitOptions with default values.
1994    pub fn new() -> Self {
1995        Self {
1996            origin: None,
1997            immediate_renew: true,
1998            timestamp: None,
1999            commit_msg: None,
2000        }
2001    }
2002
2003    /// Sets the origin identifier for this commit.
2004    pub fn origin(mut self, origin: &str) -> Self {
2005        self.origin = Some(origin.into());
2006        self
2007    }
2008
2009    /// Sets whether to immediately start a new transaction after committing.
2010    pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2011        self.immediate_renew = immediate_renew;
2012        self
2013    }
2014
2015    /// Set the timestamp of the commit.
2016    ///
2017    /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
2018    pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2019        self.timestamp = Some(timestamp);
2020        self
2021    }
2022
2023    /// Sets a commit message to be attached to the changes.
2024    pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2025        self.commit_msg = Some(commit_msg.into());
2026        self
2027    }
2028
2029    /// Sets the origin identifier for this commit.
2030    pub fn set_origin(&mut self, origin: Option<&str>) {
2031        self.origin = origin.map(|x| x.into())
2032    }
2033
2034    /// Sets the timestamp for this commit.
2035    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2036        self.timestamp = timestamp;
2037    }
2038}
2039
2040impl Default for CommitOptions {
2041    fn default() -> Self {
2042        Self::new()
2043    }
2044}
2045
2046#[cfg(test)]
2047mod test {
2048    use loro_common::ID;
2049
2050    use crate::{version::Frontiers, LoroDoc, ToJson};
2051
2052    #[test]
2053    fn test_sync() {
2054        fn is_send_sync<T: Send + Sync>(_v: T) {}
2055        let loro = super::LoroDoc::new();
2056        is_send_sync(loro)
2057    }
2058
2059    #[test]
2060    fn test_checkout() {
2061        let loro = LoroDoc::new();
2062        loro.set_peer_id(1).unwrap();
2063        let text = loro.get_text("text");
2064        let map = loro.get_map("map");
2065        let list = loro.get_list("list");
2066        let mut txn = loro.txn().unwrap();
2067        for i in 0..10 {
2068            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2069            text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
2070            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2071        }
2072        txn.commit().unwrap();
2073        let b = LoroDoc::new();
2074        b.import(&loro.export_snapshot().unwrap()).unwrap();
2075        loro.checkout(&Frontiers::default()).unwrap();
2076        {
2077            let json = &loro.get_deep_value();
2078            assert_eq!(json.to_json(), r#"{"text":"","list":[],"map":{}}"#);
2079        }
2080
2081        b.checkout(&ID::new(1, 2).into()).unwrap();
2082        {
2083            let json = &b.get_deep_value();
2084            assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":0}}"#);
2085        }
2086
2087        loro.checkout(&ID::new(1, 3).into()).unwrap();
2088        {
2089            let json = &loro.get_deep_value();
2090            assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":1}}"#);
2091        }
2092
2093        b.checkout(&ID::new(1, 29).into()).unwrap();
2094        {
2095            let json = &b.get_deep_value();
2096            assert_eq!(
2097                json.to_json(),
2098                r#"{"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}"#
2099            );
2100        }
2101    }
2102
2103    #[test]
2104    fn import_batch_err_181() {
2105        let a = LoroDoc::new_auto_commit();
2106        let update_a = a.export_snapshot();
2107        let b = LoroDoc::new_auto_commit();
2108        b.import_batch(&[update_a.unwrap()]).unwrap();
2109        b.get_text("text").insert(0, "hello").unwrap();
2110        b.commit_then_renew();
2111        let oplog = b.oplog().lock().unwrap();
2112        drop(oplog);
2113        b.export_from(&Default::default());
2114    }
2115}