Skip to main content

loro_internal/
loro.rs

1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8    arena::SharedArena,
9    change::Timestamp,
10    configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11    container::{
12        idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13        IntoContainerId,
14    },
15    cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16    dag::{Dag, DagUtils},
17    diff_calc::DiffCalculator,
18    encoding::{
19        self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20        export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21        export_state_only_snapshot,
22        json_schema::{encode_change_to_json, json::JsonSchema},
23        parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24    },
25    event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26    handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27    id::PeerID,
28    json::JsonChange,
29    op::InnerContent,
30    oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31    state::DocState,
32    subscription::{LocalUpdateCallback, Observer, Subscriber},
33    undo::DiffBatch,
34    utils::subscription::{SubscriberSetWithQueue, Subscription},
35    version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36    ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37    VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42    lock::{LoroLockGroup, LoroMutex},
43    txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47    ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48    LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53    borrow::Cow,
54    cmp::Ordering,
55    collections::{hash_map::Entry, BinaryHeap},
56    ops::ControlFlow,
57    sync::{
58        atomic::Ordering::{Acquire, Release},
59        Arc,
60    },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("LoroDoc")
73            .field("config", &self.config)
74            .field("auto_commit", &self.auto_commit)
75            .field("detached", &self.detached)
76            .finish()
77    }
78}
79
80impl LoroDoc {
81    /// Run the provided closure within a commit barrier.
82    ///
83    /// This finalizes any pending auto-commit transaction first (preserving
84    /// options across an empty txn), executes `f`, then renews the transaction
85    /// (carrying preserved options) if auto-commit is enabled. This is the
86    /// common implicit-commit pattern used by internal operations such as
87    /// import/export/checkouts.
88    #[inline]
89    pub fn with_barrier<F, R>(&self, f: F) -> R
90    where
91        F: FnOnce() -> R,
92    {
93        let (options, guard) = self.implicit_commit_then_stop();
94        let result = f();
95        drop(guard);
96        self.renew_txn_if_auto_commit(options);
97        result
98    }
99
100    pub fn new() -> Self {
101        let oplog = OpLog::new();
102        let arena = oplog.arena.clone();
103        let config: Configure = oplog.configure.clone();
104        let lock_group = LoroLockGroup::new();
105        let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
106        let inner = Arc::new_cyclic(|w| {
107            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
108            LoroDocInner {
109                oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
110                state,
111                config,
112                detached: AtomicBool::new(false),
113                auto_commit: AtomicBool::new(false),
114                observer: Arc::new(Observer::new(arena.clone())),
115                diff_calculator: Arc::new(
116                    lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
117                ),
118                txn: global_txn,
119                arena,
120                local_update_subs: SubscriberSetWithQueue::new(),
121                peer_id_change_subs: SubscriberSetWithQueue::new(),
122                pre_commit_subs: SubscriberSetWithQueue::new(),
123                first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
124            }
125        });
126        LoroDoc { inner }
127    }
128
129    pub fn fork(&self) -> Self {
130        if self.is_detached() {
131            return self.fork_at(&self.state_frontiers());
132        }
133
134        let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
135        let doc = Self::new();
136        doc.with_barrier(|| {
137            encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
138        })
139        .unwrap();
140        doc.set_config(&self.config);
141        if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
142            doc.start_auto_commit();
143        }
144        doc
145    }
146    /// Enables editing of the document in detached mode.
147    ///
148    /// By default, the document cannot be edited in detached mode (after calling
149    /// `detach` or checking out a version other than the latest). This method
150    /// allows editing in detached mode.
151    ///
152    /// # Important Notes:
153    ///
154    /// - After enabling this mode, the document will use a different PeerID. Each
155    ///   time you call checkout, a new PeerID will be used.
156    /// - If you set a custom PeerID while this mode is enabled, ensure that
157    ///   concurrent operations with the same PeerID are not possible.
158    /// - On detached mode, importing will not change the state of the document.
159    ///   It also doesn't change the version of the [DocState]. The changes will be
160    ///   recorded into [OpLog] only. You need to call `checkout` to make it take effect.
161    pub fn set_detached_editing(&self, enable: bool) {
162        self.config.set_detached_editing(enable);
163        if enable && self.is_detached() {
164            self.with_barrier(|| {
165                self.renew_peer_id();
166            });
167        }
168    }
169
170    /// Create a doc with auto commit enabled.
171    #[inline]
172    pub fn new_auto_commit() -> Self {
173        let doc = Self::new();
174        doc.start_auto_commit();
175        doc
176    }
177
178    #[inline(always)]
179    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
180        if peer == PeerID::MAX {
181            return Err(LoroError::InvalidPeerID);
182        }
183        let next_id = self.oplog.lock().unwrap().next_id(peer);
184        if self.auto_commit.load(Acquire) {
185            let doc_state = self.state.lock().unwrap();
186            doc_state
187                .peer
188                .store(peer, std::sync::atomic::Ordering::Relaxed);
189
190            if doc_state.is_in_txn() {
191                drop(doc_state);
192                // Use implicit-style barrier to avoid swallowing next-commit options
193                self.with_barrier(|| {});
194            }
195            self.peer_id_change_subs.emit(&(), next_id);
196            return Ok(());
197        }
198
199        let doc_state = self.state.lock().unwrap();
200        if doc_state.is_in_txn() {
201            return Err(LoroError::TransactionError(
202                "Cannot change peer id during transaction"
203                    .to_string()
204                    .into_boxed_str(),
205            ));
206        }
207
208        doc_state
209            .peer
210            .store(peer, std::sync::atomic::Ordering::Relaxed);
211        drop(doc_state);
212        self.peer_id_change_subs.emit(&(), next_id);
213        Ok(())
214    }
215
216    /// Renews the PeerID for the document.
217    pub(crate) fn renew_peer_id(&self) {
218        let peer_id = DefaultRandom.next_u64();
219        self.set_peer_id(peer_id).unwrap();
220    }
221
222    /// Implicitly commit the cumulative auto-commit transaction.
223    /// This method only has effect when `auto_commit` is true.
224    ///
225    /// Follow-ups: the caller is responsible for renewing the transaction
226    /// as needed (e.g., via `renew_txn_if_auto_commit`). Prefer using
227    /// `with_barrier(...)` for most internal flows to handle this safely.
228    ///
229    /// Empty-commit behavior: if the pending transaction is empty, the returned
230    /// `Some(CommitOptions)` preserves next-commit options such as message and
231    /// timestamp so they can carry into the renewed transaction. Transient
232    /// labels like `origin` do not carry across an empty commit.
233    #[inline]
234    #[must_use]
235    pub fn implicit_commit_then_stop(
236        &self,
237    ) -> (
238        Option<CommitOptions>,
239        LoroMutexGuard<'_, Option<Transaction>>,
240    ) {
241        // Implicit commit: preserve options on empty commit
242        let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
243        (a, b.unwrap())
244    }
245
246    /// Commit the cumulative auto commit transaction.
247    /// It will start the next one immediately
248    ///
249    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
250    #[inline]
251    pub fn commit_then_renew(&self) -> Option<CommitOptions> {
252        // Explicit commit: swallow options on empty commit
253        self.commit_internal(CommitOptions::new().immediate_renew(true), false)
254            .0
255    }
256
257    /// This method is called before the commit.
258    /// It can be used to modify the change before it is committed.
259    ///
260    /// It return Some(txn) if the txn is None
261    fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
262        let mut txn_guard = self.txn.lock().unwrap();
263        let Some(txn) = txn_guard.as_mut() else {
264            return Some(txn_guard);
265        };
266
267        if txn.is_peer_first_appearance {
268            txn.is_peer_first_appearance = false;
269            drop(txn_guard);
270            // First commit from a peer
271            self.first_commit_from_peer_subs.emit(
272                &(),
273                FirstCommitFromPeerPayload {
274                    peer: self.peer_id(),
275                },
276            );
277        }
278
279        None
280    }
281
282    /// Core implementation for committing the cumulative auto-commit transaction.
283    ///
284    /// - When `preserve_on_empty` is true (implicit commits like export/checkout),
285    ///   commit options from an empty transaction are carried over to the next transaction
286    ///   (except `origin`, which never carries across an empty commit).
287    /// - When `preserve_on_empty` is false (explicit commits), commit options from an
288    ///   empty transaction are swallowed and NOT carried over.
289    #[instrument(skip_all)]
290    fn commit_internal(
291        &self,
292        config: CommitOptions,
293        preserve_on_empty: bool,
294    ) -> (
295        Option<CommitOptions>,
296        Option<LoroMutexGuard<'_, Option<Transaction>>>,
297    ) {
298        if !self.auto_commit.load(Acquire) {
299            let txn_guard = self.txn.lock().unwrap();
300            // if not auto_commit, nothing should happen
301            // because the global txn is not used
302            return (None, Some(txn_guard));
303        }
304
305        loop {
306            if let Some(txn_guard) = self.before_commit() {
307                return (None, Some(txn_guard));
308            }
309
310            let mut txn_guard = self.txn.lock().unwrap();
311            let txn = txn_guard.take();
312            let Some(mut txn) = txn else {
313                return (None, Some(txn_guard));
314            };
315            let on_commit = txn.take_on_commit();
316            if let Some(origin) = config.origin.clone() {
317                txn.set_origin(origin);
318            }
319
320            if let Some(timestamp) = config.timestamp {
321                txn.set_timestamp(timestamp);
322            }
323
324            if let Some(msg) = config.commit_msg.as_ref() {
325                txn.set_msg(Some(msg.clone()));
326            }
327
328            let id_span = txn.id_span();
329            let mut options = txn.commit().unwrap();
330            // Empty commit returns Some(options). We may preserve parts of it for implicit commits.
331            if let Some(opts) = options.as_mut() {
332                // `origin` is an event-only label and never carries across an empty commit
333                if config.origin.is_some() {
334                    opts.set_origin(None);
335                }
336                // For explicit commits, swallow options from empty commit entirely
337                if !preserve_on_empty {
338                    options = None;
339                }
340            }
341            if config.immediate_renew {
342                assert!(self.can_edit());
343                let mut t = self.txn().unwrap();
344                if let Some(options) = options.as_ref() {
345                    t.set_options(options.clone());
346                }
347                *txn_guard = Some(t);
348            }
349
350            if let Some(on_commit) = on_commit {
351                drop(txn_guard);
352                on_commit(&self.state, &self.oplog, id_span);
353                txn_guard = self.txn.lock().unwrap();
354                if !config.immediate_renew && txn_guard.is_some() {
355                    // make sure that txn_guard is None when config.immediate_renew is false
356                    continue;
357                }
358            }
359
360            return (
361                options,
362                if !config.immediate_renew {
363                    Some(txn_guard)
364                } else {
365                    None
366                },
367            );
368        }
369    }
370
371    /// Commit the cumulative auto commit transaction (explicit API).
372    ///
373    /// This is used by user-facing explicit commits. If the transaction is empty,
374    /// any provided commit options are swallowed and will NOT carry over.
375    #[instrument(skip_all)]
376    pub fn commit_with(
377        &self,
378        config: CommitOptions,
379    ) -> (
380        Option<CommitOptions>,
381        Option<LoroMutexGuard<'_, Option<Transaction>>>,
382    ) {
383        self.commit_internal(config, false)
384    }
385
386    /// Set the commit message of the next commit
387    pub fn set_next_commit_message(&self, message: &str) {
388        let mut binding = self.txn.lock().unwrap();
389        let Some(txn) = binding.as_mut() else {
390            return;
391        };
392
393        if message.is_empty() {
394            txn.set_msg(None)
395        } else {
396            txn.set_msg(Some(message.into()))
397        }
398    }
399
400    /// Set the origin of the next commit
401    pub fn set_next_commit_origin(&self, origin: &str) {
402        let mut txn = self.txn.lock().unwrap();
403        if let Some(txn) = txn.as_mut() {
404            txn.set_origin(origin.into());
405        }
406    }
407
408    /// Set the timestamp of the next commit
409    pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
410        let mut txn = self.txn.lock().unwrap();
411        if let Some(txn) = txn.as_mut() {
412            txn.set_timestamp(timestamp);
413        }
414    }
415
416    /// Set the options of the next commit
417    pub fn set_next_commit_options(&self, options: CommitOptions) {
418        let mut txn = self.txn.lock().unwrap();
419        if let Some(txn) = txn.as_mut() {
420            txn.set_options(options);
421        }
422    }
423
424    /// Clear the options of the next commit
425    pub fn clear_next_commit_options(&self) {
426        let mut txn = self.txn.lock().unwrap();
427        if let Some(txn) = txn.as_mut() {
428            txn.set_options(CommitOptions::new());
429        }
430    }
431
432    /// Set whether to record the timestamp of each change. Default is `false`.
433    ///
434    /// If enabled, the Unix timestamp will be recorded for each change automatically.
435    ///
436    /// You can also set each timestamp manually when you commit a change.
437    /// The timestamp manually set will override the automatic one.
438    ///
439    /// NOTE: Timestamps are forced to be in ascending order.
440    /// If you commit a new change with a timestamp that is less than the existing one,
441    /// the largest existing timestamp will be used instead.
442    #[inline]
443    pub fn set_record_timestamp(&self, record: bool) {
444        self.config.set_record_timestamp(record);
445    }
446
447    /// Set the interval of mergeable changes, in seconds.
448    ///
449    /// If two continuous local changes are within the interval, they will be merged into one change.
450    /// The default value is 1000 seconds.
451    #[inline]
452    pub fn set_change_merge_interval(&self, interval: i64) {
453        self.config.set_merge_interval(interval);
454    }
455
456    pub fn can_edit(&self) -> bool {
457        !self.is_detached() || self.config.detached_editing()
458    }
459
460    pub fn is_detached_editing_enabled(&self) -> bool {
461        self.config.detached_editing()
462    }
463
464    #[inline]
465    pub fn config_text_style(&self, text_style: StyleConfigMap) {
466        self.config.text_style_config.try_write().unwrap().map = text_style.map;
467    }
468
469    #[inline]
470    pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
471        self.config
472            .text_style_config
473            .try_write()
474            .unwrap()
475            .default_style = text_style;
476    }
477    pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
478        let doc = Self::new();
479        let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
480        if mode.is_snapshot() {
481            doc.with_barrier(|| -> Result<(), LoroError> {
482                decode_snapshot(&doc, mode, body, Default::default())?;
483                Ok(())
484            })?;
485            Ok(doc)
486        } else {
487            Err(LoroError::DecodeError(
488                "Invalid encode mode".to_string().into(),
489            ))
490        }
491    }
492
493    /// Is the document empty? (no ops)
494    #[inline(always)]
495    pub fn can_reset_with_snapshot(&self) -> bool {
496        let oplog = self.oplog.lock().unwrap();
497        if oplog.batch_importing {
498            return false;
499        }
500
501        if self.is_detached() {
502            return false;
503        }
504
505        oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
506    }
507
508    /// Whether [OpLog] and [DocState] are detached.
509    ///
510    /// If so, the document is in readonly mode by default and importing will not change the state of the document.
511    /// It also doesn't change the version of the [DocState]. The changes will be recorded into [OpLog] only.
512    /// You need to call `checkout` to make it take effect.
513    #[inline(always)]
514    pub fn is_detached(&self) -> bool {
515        self.detached.load(Acquire)
516    }
517
518    pub(crate) fn set_detached(&self, detached: bool) {
519        self.detached.store(detached, Release);
520    }
521
522    #[inline(always)]
523    pub fn peer_id(&self) -> PeerID {
524        self.state
525            .lock()
526            .unwrap()
527            .peer
528            .load(std::sync::atomic::Ordering::Relaxed)
529    }
530
531    #[inline(always)]
532    pub fn detach(&self) {
533        self.with_barrier(|| self.set_detached(true));
534    }
535
536    #[inline(always)]
537    pub fn attach(&self) {
538        self.checkout_to_latest()
539    }
540
541    /// Get the timestamp of the current state.
542    /// It's the last edit time of the [DocState].
543    pub fn state_timestamp(&self) -> Timestamp {
544        // Acquire locks in correct order: read frontiers first, then query OpLog.
545        let f = { self.state.lock().unwrap().frontiers.clone() };
546        self.oplog.lock().unwrap().get_timestamp_of_version(&f)
547    }
548
549    #[inline(always)]
550    pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
551        &self.state
552    }
553
554    #[inline]
555    pub fn get_state_deep_value(&self) -> LoroValue {
556        self.state.lock().unwrap().get_deep_value()
557    }
558
559    #[inline(always)]
560    pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
561        &self.oplog
562    }
563
564    #[inline(always)]
565    pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
566        let s = debug_span!("import", peer = self.peer_id());
567        let _e = s.enter();
568        self.import_with(bytes, Default::default())
569    }
570
571    #[inline]
572    pub fn import_with(
573        &self,
574        bytes: &[u8],
575        origin: InternalString,
576    ) -> Result<ImportStatus, LoroError> {
577        self.with_barrier(|| self._import_with(bytes, origin))
578    }
579
580    #[tracing::instrument(skip_all)]
581    fn _import_with(
582        &self,
583        bytes: &[u8],
584        origin: InternalString,
585    ) -> Result<ImportStatus, LoroError> {
586        ensure_cov::notify_cov("loro_internal::import");
587        let parsed = parse_header_and_body(bytes, true)?;
588        loro_common::info!("Importing with mode={:?}", &parsed.mode);
589        let result = match parsed.mode {
590            EncodeMode::OutdatedRle => {
591                if self.state.lock().unwrap().is_in_txn() {
592                    return Err(LoroError::ImportWhenInTxn);
593                }
594
595                let s = tracing::span!(
596                    tracing::Level::INFO,
597                    "Import updates ",
598                    peer = self.peer_id()
599                );
600                let _e = s.enter();
601                self.update_oplog_and_apply_delta_to_state_if_needed(
602                    |oplog| oplog.decode(parsed),
603                    origin,
604                )
605            }
606            EncodeMode::OutdatedSnapshot => {
607                if self.can_reset_with_snapshot() {
608                    loro_common::info!("Init by snapshot {}", self.peer_id());
609                    decode_snapshot(self, parsed.mode, parsed.body, origin)
610                } else {
611                    self.update_oplog_and_apply_delta_to_state_if_needed(
612                        |oplog| oplog.decode(parsed),
613                        origin,
614                    )
615                }
616            }
617            EncodeMode::FastSnapshot => {
618                if self.can_reset_with_snapshot() {
619                    ensure_cov::notify_cov("loro_internal::import::snapshot");
620                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
621                    decode_snapshot(self, parsed.mode, parsed.body, origin)
622                } else {
623                    self.update_oplog_and_apply_delta_to_state_if_needed(
624                        |oplog| oplog.decode(parsed),
625                        origin,
626                    )
627
628                    // let new_doc = LoroDoc::new();
629                    // new_doc.import(bytes)?;
630                    // let updates = new_doc.export(ExportMode::updates(&self.oplog_vv())).unwrap();
631                    // return self.import_with(updates.as_slice(), origin);
632                }
633            }
634            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
635                |oplog| oplog.decode(parsed),
636                origin,
637            ),
638            EncodeMode::Auto => {
639                unreachable!()
640            }
641        };
642
643        self.emit_events();
644
645        result
646    }
647
648    #[tracing::instrument(skip_all)]
649    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
650        &self,
651        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
652        origin: InternalString,
653    ) -> Result<ImportStatus, LoroError> {
654        let mut oplog = self.oplog.lock().unwrap();
655        if !self.is_detached() {
656            let old_vv = oplog.vv().clone();
657            let old_frontiers = oplog.frontiers().clone();
658            let result = f(&mut oplog);
659            if &old_vv != oplog.vv() {
660                let mut diff = DiffCalculator::new(false);
661                let (diff, diff_mode) = diff.calc_diff_internal(
662                    &oplog,
663                    &old_vv,
664                    &old_frontiers,
665                    oplog.vv(),
666                    oplog.dag.get_frontiers(),
667                    None,
668                );
669                let mut state = self.state.lock().unwrap();
670                state.apply_diff(
671                    InternalDocDiff {
672                        origin,
673                        diff: (diff).into(),
674                        by: EventTriggerKind::Import,
675                        new_version: Cow::Owned(oplog.frontiers().clone()),
676                    },
677                    diff_mode,
678                );
679            }
680            result
681        } else {
682            f(&mut oplog)
683        }
684    }
685
686    fn emit_events(&self) {
687        // we should not hold the lock when emitting events
688        let events = {
689            let mut state = self.state.lock().unwrap();
690            state.take_events()
691        };
692        for event in events {
693            self.observer.emit(event);
694        }
695    }
696
697    pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
698        let mut state = self.state.lock().unwrap();
699        state.take_events()
700    }
701
702    /// Import the json schema updates.
703    ///
704    /// only supports backward compatibility but not forward compatibility.
705    #[tracing::instrument(skip_all)]
706    pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
707        let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
708        self.with_barrier(|| {
709            let result = self.update_oplog_and_apply_delta_to_state_if_needed(
710                |oplog| crate::encoding::json_schema::import_json(oplog, json),
711                Default::default(),
712            );
713            self.emit_events();
714            result
715        })
716    }
717
718    pub fn export_json_updates(
719        &self,
720        start_vv: &VersionVector,
721        end_vv: &VersionVector,
722        with_peer_compression: bool,
723    ) -> JsonSchema {
724        self.with_barrier(|| {
725            let oplog = self.oplog.lock().unwrap();
726            let mut start_vv = start_vv;
727            let _temp: Option<VersionVector>;
728            if !oplog.dag.shallow_since_vv().is_empty() {
729                // Make sure that start_vv >= shallow_since_vv
730                let mut include_all = true;
731                for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
732                    if start_vv.get(peer).unwrap_or(&0) < counter {
733                        include_all = false;
734                        break;
735                    }
736                }
737                if !include_all {
738                    let mut vv = start_vv.clone();
739                    for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
740                        vv.extend_to_include_end_id(ID::new(peer, counter));
741                    }
742                    _temp = Some(vv);
743                    start_vv = _temp.as_ref().unwrap();
744                }
745            }
746
747            crate::encoding::json_schema::export_json(
748                &oplog,
749                start_vv,
750                end_vv,
751                with_peer_compression,
752            )
753        })
754    }
755
756    pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
757        let oplog = self.oplog.lock().unwrap();
758        let mut changes = export_json_in_id_span(&oplog, id_span);
759        if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
760            let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
761            changes.push(change_json);
762        }
763        changes
764    }
765
766    /// Get the version vector of the current OpLog
767    #[inline]
768    pub fn oplog_vv(&self) -> VersionVector {
769        self.oplog.lock().unwrap().vv().clone()
770    }
771
772    /// Get the version vector of the current [DocState]
773    #[inline]
774    pub fn state_vv(&self) -> VersionVector {
775        let oplog = self.oplog.lock().unwrap();
776        let f = &self.state.lock().unwrap().frontiers;
777        oplog.dag.frontiers_to_vv(f).unwrap()
778    }
779
780    pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
781        let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
782        if let LoroValue::Container(c) = value {
783            Some(ValueOrHandler::Handler(Handler::new_attached(
784                c.clone(),
785                self.clone(),
786            )))
787        } else {
788            Some(ValueOrHandler::Value(value))
789        }
790    }
791
792    /// Get the handler by the string path.
793    pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
794        let path = str_to_path(path)?;
795        self.get_by_path(&path)
796    }
797
798    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
799        let arena = &self.arena;
800        let txn = self.txn.lock().unwrap();
801        let txn = txn.as_ref()?;
802        let ops_ = txn.local_ops();
803        let new_id = ID {
804            peer: *txn.peer(),
805            counter: ops_.first()?.counter,
806        };
807        let change = ChangeRef {
808            id: &new_id,
809            deps: txn.frontiers(),
810            timestamp: &txn
811                .timestamp()
812                .as_ref()
813                .copied()
814                .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
815            commit_msg: txn.msg(),
816            ops: ops_,
817            lamport: txn.lamport(),
818        };
819        let json = encode_change_to_json(change, arena);
820        Some(json)
821    }
822
823    #[inline]
824    pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
825        if self.has_container(&id) {
826            Some(Handler::new_attached(id, self.clone()))
827        } else {
828            None
829        }
830    }
831
832    /// id can be a str, ContainerID, or ContainerIdRaw.
833    /// if it's str it will use Root container, which will not be None
834    #[inline]
835    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
836        let id = id.into_container_id(&self.arena, ContainerType::Text);
837        assert!(self.has_container(&id));
838        Handler::new_attached(id, self.clone()).into_text().unwrap()
839    }
840
841    /// id can be a str, ContainerID, or ContainerIdRaw.
842    /// if it's str it will use Root container, which will not be None
843    #[inline]
844    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
845        let id = id.into_container_id(&self.arena, ContainerType::List);
846        assert!(self.has_container(&id));
847        Handler::new_attached(id, self.clone()).into_list().unwrap()
848    }
849
850    /// id can be a str, ContainerID, or ContainerIdRaw.
851    /// if it's str it will use Root container, which will not be None
852    #[inline]
853    pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
854        let id = id.into_container_id(&self.arena, ContainerType::MovableList);
855        assert!(self.has_container(&id));
856        Handler::new_attached(id, self.clone())
857            .into_movable_list()
858            .unwrap()
859    }
860
861    /// id can be a str, ContainerID, or ContainerIdRaw.
862    /// if it's str it will use Root container, which will not be None
863    #[inline]
864    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
865        let id = id.into_container_id(&self.arena, ContainerType::Map);
866        assert!(self.has_container(&id));
867        Handler::new_attached(id, self.clone()).into_map().unwrap()
868    }
869
870    /// id can be a str, ContainerID, or ContainerIdRaw.
871    /// if it's str it will use Root container, which will not be None
872    #[inline]
873    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
874        let id = id.into_container_id(&self.arena, ContainerType::Tree);
875        assert!(self.has_container(&id));
876        Handler::new_attached(id, self.clone()).into_tree().unwrap()
877    }
878
879    #[cfg(feature = "counter")]
880    pub fn get_counter<I: IntoContainerId>(
881        &self,
882        id: I,
883    ) -> crate::handler::counter::CounterHandler {
884        let id = id.into_container_id(&self.arena, ContainerType::Counter);
885        assert!(self.has_container(&id));
886        Handler::new_attached(id, self.clone())
887            .into_counter()
888            .unwrap()
889    }
890
891    #[must_use]
892    pub fn has_container(&self, id: &ContainerID) -> bool {
893        if id.is_root() {
894            return true;
895        }
896
897        let exist = self.state.lock().unwrap().does_container_exist(id);
898        exist
899    }
900
901    /// Undo the operations between the given id_span. It can be used even in a collaborative environment.
902    ///
903    /// This is an internal API. You should NOT use it directly.
904    ///
905    /// # Internal
906    ///
907    /// This method will use the diff calculator to calculate the diff required to time travel
908    /// from the end of id_span to the beginning of the id_span. Then it will convert the diff to
909    /// operations and apply them to the OpLog with a dep on the last id of the given id_span.
910    ///
911    /// This implementation is kinda slow, but it's simple and maintainable. We can optimize it
912    /// further when it's needed. The time complexity is O(n + m), n is the ops in the id_span, m is the
913    /// distance from id_span to the current latest version.
914    #[instrument(level = "info", skip_all)]
915    pub fn undo_internal(
916        &self,
917        id_span: IdSpan,
918        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
919        post_transform_base: Option<&DiffBatch>,
920        before_diff: &mut dyn FnMut(&DiffBatch),
921    ) -> LoroResult<CommitWhenDrop<'_>> {
922        if !self.can_edit() {
923            return Err(LoroError::EditWhenDetached);
924        }
925
926        let (options, txn) = self.implicit_commit_then_stop();
927        if !self
928            .oplog()
929            .lock()
930            .unwrap()
931            .vv()
932            .includes_id(id_span.id_last())
933        {
934            self.renew_txn_if_auto_commit(options);
935            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
936        }
937
938        let (was_recording, latest_frontiers) = {
939            let mut state = self.state.lock().unwrap();
940            let was_recording = state.is_recording();
941            state.stop_and_clear_recording();
942            (was_recording, state.frontiers.clone())
943        };
944
945        let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
946        let diff = crate::undo::undo(
947            spans,
948            match post_transform_base {
949                Some(d) => Either::Right(d),
950                None => Either::Left(&latest_frontiers),
951            },
952            |from, to| {
953                self._checkout_without_emitting(from, false, false).unwrap();
954                self.state.lock().unwrap().start_recording();
955                self._checkout_without_emitting(to, false, false).unwrap();
956                let mut state = self.state.lock().unwrap();
957                let e = state.take_events();
958                state.stop_and_clear_recording();
959                DiffBatch::new(e)
960            },
961            before_diff,
962        );
963
964        // println!("\nundo_internal: diff: {:?}", diff);
965        // println!("container remap: {:?}", container_remap);
966
967        self._checkout_without_emitting(&latest_frontiers, false, false)?;
968        self.set_detached(false);
969        if was_recording {
970            self.state.lock().unwrap().start_recording();
971        }
972        drop(txn);
973        self.start_auto_commit();
974        // Try applying the diff, but ignore the error if it happens.
975        // MovableList's undo behavior is too tricky to handle in a collaborative env
976        // so in edge cases this may be an Error
977        if let Err(e) = self._apply_diff(diff, container_remap, true) {
978            warn!("Undo Failed {:?}", e);
979        }
980
981        if let Some(options) = options {
982            self.set_next_commit_options(options);
983        }
984        Ok(CommitWhenDrop {
985            doc: self,
986            default_options: CommitOptions::new().origin("undo"),
987        })
988    }
989
990    /// Generate a series of local operations that can revert the current doc to the target
991    /// version.
992    ///
993    /// Internally, it will calculate the diff between the current state and the target state,
994    /// and apply the diff to the current state.
995    pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
996        // TODO: test when the doc is readonly
997        // TODO: test when the doc is detached but enabled editing
998        let f = self.state_frontiers();
999        let diff = self.diff(&f, target)?;
1000        self._apply_diff(diff, &mut Default::default(), false)
1001    }
1002
1003    /// Calculate the diff between two versions so that apply diff on a will make the state same as b.
1004    ///
1005    /// NOTE: This method will make the doc enter the **detached mode**.
1006    // FIXME: This method needs testing (no event should be emitted during processing this)
1007    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1008        {
1009            // Check whether a and b are valid before checkout so this returns a normal error
1010            // instead of panicking on shallow docs.
1011            let oplog = self.oplog.lock().unwrap();
1012            let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
1013                for id in frontiers.iter() {
1014                    if !oplog.dag.contains(id) {
1015                        return Err(LoroError::FrontiersNotFound(id));
1016                    }
1017                }
1018
1019                if oplog.dag.is_before_shallow_root(frontiers) {
1020                    return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1021                }
1022
1023                Ok(())
1024            };
1025
1026            validate_frontiers(a)?;
1027            validate_frontiers(b)?;
1028        }
1029
1030        let (options, txn) = self.implicit_commit_then_stop();
1031        let was_detached = self.is_detached();
1032        let old_frontiers = self.state_frontiers();
1033        let was_recording = {
1034            let mut state = self.state.lock().unwrap();
1035            let is_recording = state.is_recording();
1036            state.stop_and_clear_recording();
1037            is_recording
1038        };
1039        self._checkout_without_emitting(a, true, false).unwrap();
1040        self.state.lock().unwrap().start_recording();
1041        self._checkout_without_emitting(b, true, false).unwrap();
1042        let e = {
1043            let mut state = self.state.lock().unwrap();
1044            let e = state.take_events();
1045            state.stop_and_clear_recording();
1046            e
1047        };
1048        self._checkout_without_emitting(&old_frontiers, false, false)
1049            .unwrap();
1050        drop(txn);
1051        if !was_detached {
1052            self.set_detached(false);
1053            self.renew_txn_if_auto_commit(options);
1054        }
1055        if was_recording {
1056            self.state.lock().unwrap().start_recording();
1057        }
1058        Ok(DiffBatch::new(e))
1059    }
1060
1061    /// Apply a diff to the current state.
1062    #[inline(always)]
1063    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1064        self._apply_diff(diff, &mut Default::default(), true)
1065    }
1066
1067    /// Apply a diff to the current state.
1068    ///
1069    /// This method will not recreate containers with the same [ContainerID]s.
1070    /// While this can be convenient in certain cases, it can break several internal invariants:
1071    ///
1072    /// 1. Each container should appear only once in the document. Allowing containers with the same ID
1073    ///    would result in multiple instances of the same container in the document.
1074    /// 2. Unreachable containers should be removable from the state when necessary.
1075    ///
1076    /// However, the diff may contain operations that depend on container IDs.
1077    /// Therefore, users need to provide a `container_remap` to record and retrieve the container ID remapping.
1078    pub(crate) fn _apply_diff(
1079        &self,
1080        diff: DiffBatch,
1081        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1082        skip_unreachable: bool,
1083    ) -> LoroResult<()> {
1084        if !self.can_edit() {
1085            return Err(LoroError::EditWhenDetached);
1086        }
1087
1088        let mut ans: LoroResult<()> = Ok(());
1089        let mut missing_containers: Vec<ContainerID> = Vec::new();
1090        for (mut id, diff) in diff.into_iter() {
1091            let mut remapped = false;
1092            while let Some(rid) = container_remap.get(&id) {
1093                remapped = true;
1094                id = rid.clone();
1095            }
1096
1097            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1098                // Not in arena does not imply non-existent; consult state/kv and register lazily
1099                let exists = self.state.lock().unwrap().does_container_exist(&id);
1100                if !exists {
1101                    missing_containers.push(id);
1102                    continue;
1103                }
1104                // Ensure registration so handlers can be created
1105                self.state.lock().unwrap().ensure_container(&id);
1106            }
1107
1108            if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
1109                continue;
1110            }
1111
1112            let Some(h) = self.get_handler(id.clone()) else {
1113                return Err(LoroError::ContainersNotFound {
1114                    containers: Box::new(vec![id]),
1115                });
1116            };
1117            if let Err(e) = h.apply_diff(diff, container_remap) {
1118                ans = Err(e);
1119            }
1120        }
1121
1122        if !missing_containers.is_empty() {
1123            return Err(LoroError::ContainersNotFound {
1124                containers: Box::new(missing_containers),
1125            });
1126        }
1127
1128        ans
1129    }
1130
1131    /// This is for debugging purpose. It will travel the whole oplog
1132    #[inline]
1133    pub fn diagnose_size(&self) {
1134        self.oplog().lock().unwrap().diagnose_size();
1135    }
1136
1137    #[inline]
1138    pub fn oplog_frontiers(&self) -> Frontiers {
1139        self.oplog().lock().unwrap().frontiers().clone()
1140    }
1141
1142    #[inline]
1143    pub fn state_frontiers(&self) -> Frontiers {
1144        self.state.lock().unwrap().frontiers.clone()
1145    }
1146
1147    /// - Ordering::Less means self is less than target or parallel
1148    /// - Ordering::Equal means versions equal
1149    /// - Ordering::Greater means self's version is greater than target
1150    #[inline]
1151    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1152        self.oplog().lock().unwrap().cmp_with_frontiers(other)
1153    }
1154
1155    /// Compare two [Frontiers] causally.
1156    ///
1157    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
1158    #[inline]
1159    pub fn cmp_frontiers(
1160        &self,
1161        a: &Frontiers,
1162        b: &Frontiers,
1163    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1164        self.oplog().lock().unwrap().cmp_frontiers(a, b)
1165    }
1166
1167    pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1168        let mut state = self.state.lock().unwrap();
1169        if !state.is_recording() {
1170            state.start_recording();
1171        }
1172
1173        self.observer.subscribe_root(callback)
1174    }
1175
1176    pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1177        let mut state = self.state.lock().unwrap();
1178        if !state.is_recording() {
1179            state.start_recording();
1180        }
1181
1182        self.observer.subscribe(container_id, callback)
1183    }
1184
1185    pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1186        let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1187        activate();
1188        sub
1189    }
1190
1191    // PERF: opt
1192    #[tracing::instrument(skip_all)]
1193    pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1194        if bytes.is_empty() {
1195            return Ok(ImportStatus::default());
1196        }
1197
1198        if bytes.len() == 1 {
1199            return self.import(&bytes[0]);
1200        }
1201
1202        let mut success = VersionRange::default();
1203        let mut pending = VersionRange::default();
1204        let mut meta_arr = bytes
1205            .iter()
1206            .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1207            .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1208        meta_arr.sort_by(|a, b| {
1209            a.0.mode
1210                .cmp(&b.0.mode)
1211                .then(b.0.change_num.cmp(&a.0.change_num))
1212        });
1213
1214        let (options, txn) = self.implicit_commit_then_stop();
1215        // Why we should keep locking `txn` here
1216        //
1217        // In a multi-threaded environment, `import_batch` used to drop the txn lock
1218        // (via `commit_then_stop` + `drop(txn)`) and call `detach()`/`checkout_to_latest()`
1219        // around the batch import. That created a race where another thread could
1220        // start or renew the auto-commit txn and perform local edits while we were
1221        // importing and temporarily detached. Those interleaved local edits could
1222        // violate invariants between `OpLog` and `DocState` (e.g., state being
1223        // updated when we expect it not to, missed events, or inconsistent
1224        // frontiers), as exposed by the loom test `local_edits_during_batch_import`.
1225        //
1226        // The fix is to hold the txn mutex for the entire critical section:
1227        // - Stop the current txn and keep the mutex guard.
1228        // - Force-detach with `set_detached(true)` (avoids `detach()` side effects),
1229        //   then run each `_import_with(...)` while detached so imports only touch
1230        //   the `OpLog`.
1231        // - After importing, reattach by checking out to latest and renew the txn
1232        //   using `_checkout_to_latest_with_guard`, which keeps the mutex held while
1233        //   (re)starting the auto-commit txn.
1234        //
1235        // Holding the lock ensures no concurrent thread can create/renew a txn and
1236        // do local edits in the middle of the batch import, making the whole
1237        // operation atomic with respect to local edits.
1238        let is_detached = self.is_detached();
1239        self.set_detached(true);
1240        self.oplog.lock().unwrap().batch_importing = true;
1241        let mut err = None;
1242        for (_meta, data) in meta_arr {
1243            match self._import_with(data, Default::default()) {
1244                Ok(s) => {
1245                    for (peer, (start, end)) in s.success.iter() {
1246                        match success.0.entry(*peer) {
1247                            Entry::Occupied(mut e) => {
1248                                e.get_mut().1 = *end.max(&e.get().1);
1249                            }
1250                            Entry::Vacant(e) => {
1251                                e.insert((*start, *end));
1252                            }
1253                        }
1254                    }
1255
1256                    if let Some(p) = s.pending.as_ref() {
1257                        for (&peer, &(start, end)) in p.iter() {
1258                            match pending.0.entry(peer) {
1259                                Entry::Occupied(mut e) => {
1260                                    e.get_mut().0 = start.min(e.get().0);
1261                                    e.get_mut().1 = end.min(e.get().1);
1262                                }
1263                                Entry::Vacant(e) => {
1264                                    e.insert((start, end));
1265                                }
1266                            }
1267                        }
1268                    }
1269                }
1270                Err(e) => {
1271                    err = Some(e);
1272                }
1273            }
1274        }
1275
1276        let mut oplog = self.oplog.lock().unwrap();
1277        oplog.batch_importing = false;
1278        drop(oplog);
1279        if !is_detached {
1280            self._checkout_to_latest_with_guard(txn);
1281        } else {
1282            drop(txn);
1283        }
1284
1285        self.renew_txn_if_auto_commit(options);
1286        if let Some(err) = err {
1287            return Err(err);
1288        }
1289
1290        Ok(ImportStatus {
1291            success,
1292            pending: if pending.is_empty() {
1293                None
1294            } else {
1295                Some(pending)
1296            },
1297        })
1298    }
1299
1300    /// Get shallow value of the document.
1301    #[inline]
1302    pub fn get_value(&self) -> LoroValue {
1303        self.state.lock().unwrap().get_value()
1304    }
1305
1306    /// Get deep value of the document.
1307    #[inline]
1308    pub fn get_deep_value(&self) -> LoroValue {
1309        self.state.lock().unwrap().get_deep_value()
1310    }
1311
1312    /// Get deep value of the document with container id
1313    #[inline]
1314    pub fn get_deep_value_with_id(&self) -> LoroValue {
1315        self.state.lock().unwrap().get_deep_value_with_id()
1316    }
1317
1318    pub fn checkout_to_latest(&self) {
1319        let (options, _guard) = self.implicit_commit_then_stop();
1320        if !self.is_detached() {
1321            drop(_guard);
1322            self.renew_txn_if_auto_commit(options);
1323            return;
1324        }
1325
1326        self._checkout_to_latest_without_commit(true);
1327        drop(_guard);
1328        self.renew_txn_if_auto_commit(options);
1329    }
1330
1331    fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1332        if !self.is_detached() {
1333            self._renew_txn_if_auto_commit_with_guard(None, guard);
1334            return;
1335        }
1336
1337        self._checkout_to_latest_without_commit(true);
1338        self._renew_txn_if_auto_commit_with_guard(None, guard);
1339    }
1340
1341    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1342    pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1343        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1344            let f = self.oplog_frontiers();
1345            let this = &self;
1346            let frontiers = &f;
1347            this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1348                .unwrap(); // we don't need to shrink frontiers
1349                           // because oplog's frontiers are already shrinked
1350            this.emit_events();
1351            if this.config.detached_editing() {
1352                this.renew_peer_id();
1353            }
1354
1355            self.set_detached(false);
1356        });
1357    }
1358
1359    /// Checkout [DocState] to a specific version.
1360    ///
1361    /// This will make the current [DocState] detached from the latest version of [OpLog].
1362    /// Any further import will not be reflected on the [DocState], until user call [LoroDoc::attach()]
1363    pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1364        let was_detached = self.is_detached();
1365        let (options, guard) = self.implicit_commit_then_stop();
1366        let result = self._checkout_without_emitting(frontiers, true, true);
1367        if result.is_ok() {
1368            self.emit_events();
1369        }
1370        drop(guard);
1371        if self.config.detached_editing() {
1372            if result.is_ok() {
1373                self.renew_peer_id();
1374            }
1375            self.renew_txn_if_auto_commit(options);
1376        } else if result.is_err() {
1377            if !was_detached {
1378                self.renew_txn_if_auto_commit(options);
1379            }
1380        } else if !self.is_detached() {
1381            self.renew_txn_if_auto_commit(options);
1382        }
1383
1384        result
1385    }
1386
1387    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1388    #[instrument(level = "info", skip(self))]
1389    pub(crate) fn _checkout_without_emitting(
1390        &self,
1391        frontiers: &Frontiers,
1392        to_shrink_frontiers: bool,
1393        to_commit_then_renew: bool,
1394    ) -> Result<(), LoroError> {
1395        assert!(self.txn.is_locked());
1396        let from_frontiers = self.state_frontiers();
1397        loro_common::info!(
1398            "checkout from={:?} to={:?} cur_vv={:?}",
1399            from_frontiers,
1400            frontiers,
1401            self.oplog_vv()
1402        );
1403
1404        if &from_frontiers == frontiers {
1405            return Ok(());
1406        }
1407
1408        let oplog = self.oplog.lock().unwrap();
1409        if oplog.dag.is_before_shallow_root(frontiers) {
1410            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1411        }
1412
1413        let frontiers = if to_shrink_frontiers {
1414            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1415        } else {
1416            frontiers.clone()
1417        };
1418
1419        if from_frontiers == frontiers {
1420            return Ok(());
1421        }
1422
1423        let mut state = self.state.lock().unwrap();
1424        let mut calc = self.diff_calculator.lock().unwrap();
1425        for i in frontiers.iter() {
1426            if !oplog.dag.contains(i) {
1427                return Err(LoroError::FrontiersNotFound(i));
1428            }
1429        }
1430
1431        let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1432        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1433            return Err(LoroError::NotFoundError(
1434                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1435            ));
1436        };
1437
1438        self.set_detached(true);
1439        let (diff, diff_mode) =
1440            calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1441        state.apply_diff(
1442            InternalDocDiff {
1443                origin: "checkout".into(),
1444                diff: Cow::Owned(diff),
1445                by: EventTriggerKind::Checkout,
1446                new_version: Cow::Owned(frontiers.clone()),
1447            },
1448            diff_mode,
1449        );
1450
1451        Ok(())
1452    }
1453
1454    #[inline]
1455    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1456        self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
1457    }
1458
1459    #[inline]
1460    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1461        self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
1462    }
1463
1464    /// Import ops from other doc.
1465    ///
1466    /// After `a.merge(b)` and `b.merge(a)`, `a` and `b` will have the same content if they are in attached mode.
1467    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1468        let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1469        self.import(&updates)
1470    }
1471
1472    pub(crate) fn arena(&self) -> &SharedArena {
1473        &self.arena
1474    }
1475
1476    #[inline]
1477    pub fn len_ops(&self) -> usize {
1478        let oplog = self.oplog.lock().unwrap();
1479        let ans = oplog.vv().values().sum::<i32>() as usize;
1480        if oplog.is_shallow() {
1481            let sub = oplog
1482                .shallow_since_vv()
1483                .iter()
1484                .map(|(_, ops)| *ops)
1485                .sum::<i32>() as usize;
1486            ans - sub
1487        } else {
1488            ans
1489        }
1490    }
1491
1492    #[inline]
1493    pub fn len_changes(&self) -> usize {
1494        let oplog = self.oplog.lock().unwrap();
1495        oplog.len_changes()
1496    }
1497
1498    pub fn config(&self) -> &Configure {
1499        &self.config
1500    }
1501
1502    /// This method compare the consistency between the current doc state
1503    /// and the state calculated by diff calculator from beginning.
1504    ///
1505    /// Panic when it's not consistent
1506    pub fn check_state_diff_calc_consistency_slow(&self) {
1507        // #[cfg(any(test, debug_assertions, feature = "test_utils"))]
1508        {
1509            static IS_CHECKING: std::sync::atomic::AtomicBool =
1510                std::sync::atomic::AtomicBool::new(false);
1511            if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1512                return;
1513            }
1514
1515            IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1516            let peer_id = self.peer_id();
1517            let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1518            let _g = s.enter();
1519            let options = self.implicit_commit_then_stop().0;
1520            self.oplog.lock().unwrap().check_dag_correctness();
1521            if self.is_shallow() {
1522                // For shallow documents, we cannot replay from the beginning as the history is not complete.
1523                //
1524                // Instead, we:
1525                // 1. Export the initial state from the GC snapshot.
1526                // 2. Create a new document and import the initial snapshot.
1527                // 3. Export updates from the shallow start version vector to the current version.
1528                // 4. Import these updates into the new document.
1529                // 5. Compare the states of the new document and the current document.
1530
1531                // Step 1: Export the initial state from the GC snapshot.
1532                let initial_snapshot = self
1533                    .export(ExportMode::state_only(Some(
1534                        &self.shallow_since_frontiers(),
1535                    )))
1536                    .unwrap();
1537
1538                // Step 2: Create a new document and import the initial snapshot.
1539                let doc = LoroDoc::new();
1540                doc.import(&initial_snapshot).unwrap();
1541                self.checkout(&self.shallow_since_frontiers()).unwrap();
1542                assert_eq!(self.get_deep_value(), doc.get_deep_value());
1543
1544                // Step 3: Export updates since the shallow start version vector to the current version.
1545                let updates = self.export(ExportMode::all_updates()).unwrap();
1546
1547                // Step 4: Import these updates into the new document.
1548                doc.import(&updates).unwrap();
1549                self.checkout_to_latest();
1550
1551                // Step 5: Checkout to the current state's frontiers and compare the states.
1552                // doc.checkout(&self.state_frontiers()).unwrap();
1553                assert_eq!(doc.get_deep_value(), self.get_deep_value());
1554                let mut calculated_state = doc.app_state().lock().unwrap();
1555                let mut current_state = self.app_state().lock().unwrap();
1556                current_state.check_is_the_same(&mut calculated_state);
1557            } else {
1558                let f = self.state_frontiers();
1559                let vv = self
1560                    .oplog()
1561                    .lock()
1562                    .unwrap()
1563                    .dag
1564                    .frontiers_to_vv(&f)
1565                    .unwrap();
1566                let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1567                let doc = Self::new();
1568                doc.import(&bytes).unwrap();
1569                let mut calculated_state = doc.app_state().lock().unwrap();
1570                let mut current_state = self.app_state().lock().unwrap();
1571                current_state.check_is_the_same(&mut calculated_state);
1572            }
1573
1574            self.renew_txn_if_auto_commit(options);
1575            IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1576        }
1577    }
1578
1579    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1580        self.query_pos_internal(pos, true)
1581    }
1582
1583    /// Get position in a seq container
1584    pub(crate) fn query_pos_internal(
1585        &self,
1586        pos: &Cursor,
1587        ret_event_index: bool,
1588    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1589        if !self.has_container(&pos.container) {
1590            return Err(CannotFindRelativePosition::IdNotFound);
1591        }
1592
1593        let mut state = self.state.lock().unwrap();
1594        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1595            Ok(PosQueryResult {
1596                update: None,
1597                current: AbsolutePosition {
1598                    pos: ans,
1599                    side: pos.side,
1600                },
1601            })
1602        } else {
1603            // We need to trace back to the version where the relative position is valid.
1604            // The optimal way to find that version is to have succ info like Automerge.
1605            //
1606            // But we don't have that info now, so an alternative way is to trace back
1607            // to version with frontiers of `[pos.id]`. But this may be very slow even if
1608            // the target is just deleted a few versions ago.
1609            //
1610            // What we need is to trace back to the latest version that deletes the target
1611            // id.
1612
1613            // commit the txn to make sure we can query the history correctly, preserving options
1614            drop(state);
1615            let result = self.with_barrier(|| {
1616                let oplog = self.oplog().lock().unwrap();
1617                // TODO: assert pos.id is not unknown
1618                if let Some(id) = pos.id {
1619                    // Ensure the container is registered if it exists lazily
1620                    if oplog.arena.id_to_idx(&pos.container).is_none() {
1621                        let mut s = self.state.lock().unwrap();
1622                        if !s.does_container_exist(&pos.container) {
1623                            return Err(CannotFindRelativePosition::ContainerDeleted);
1624                        }
1625                        s.ensure_container(&pos.container);
1626                        drop(s);
1627                    }
1628                    let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1629                    // We know where the target id is when we trace back to the delete_op_id.
1630                    let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1631                        if oplog.shallow_since_vv().includes_id(id) {
1632                            return Err(CannotFindRelativePosition::HistoryCleared);
1633                        }
1634
1635                        tracing::error!("Cannot find id {}", id);
1636                        return Err(CannotFindRelativePosition::IdNotFound);
1637                    };
1638                    // Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
1639                    let mut diff_calc = DiffCalculator::new(true);
1640                    let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1641                    let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1642                    // TODO: PERF: it doesn't need to calc the effects here
1643                    diff_calc.calc_diff_internal(
1644                        &oplog,
1645                        before,
1646                        &before_frontiers,
1647                        oplog.vv(),
1648                        oplog.frontiers(),
1649                        Some(&|target| idx == target),
1650                    );
1651                    // TODO: remove depth info
1652                    let depth = self.arena.get_depth(idx);
1653                    let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1654                    match diff_calc {
1655                        crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1656                            let c = text.get_id_latest_pos(id).unwrap();
1657                            let new_pos = c.pos;
1658                            let handler = self.get_text(&pos.container);
1659                            let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1660                            Ok(PosQueryResult {
1661                                update: handler.get_cursor(current_pos, c.side),
1662                                current: AbsolutePosition {
1663                                    pos: current_pos,
1664                                    side: c.side,
1665                                },
1666                            })
1667                        }
1668                        crate::diff_calc::ContainerDiffCalculator::List(list) => {
1669                            let c = list.get_id_latest_pos(id).unwrap();
1670                            let new_pos = c.pos;
1671                            let handler = self.get_list(&pos.container);
1672                            Ok(PosQueryResult {
1673                                update: handler.get_cursor(new_pos, c.side),
1674                                current: AbsolutePosition {
1675                                    pos: new_pos,
1676                                    side: c.side,
1677                                },
1678                            })
1679                        }
1680                        crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1681                            let c = list.get_id_latest_pos(id).unwrap();
1682                            let new_pos = c.pos;
1683                            let handler = self.get_movable_list(&pos.container);
1684                            let new_pos = handler.op_pos_to_user_pos(new_pos);
1685                            Ok(PosQueryResult {
1686                                update: handler.get_cursor(new_pos, c.side),
1687                                current: AbsolutePosition {
1688                                    pos: new_pos,
1689                                    side: c.side,
1690                                },
1691                            })
1692                        }
1693                        crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1694                        crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1695                        #[cfg(feature = "counter")]
1696                        crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1697                        crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1698                    }
1699                } else {
1700                    match pos.container.container_type() {
1701                        ContainerType::Text => {
1702                            let text = self.get_text(&pos.container);
1703                            Ok(PosQueryResult {
1704                                update: Some(Cursor {
1705                                    id: None,
1706                                    container: text.id(),
1707                                    side: pos.side,
1708                                    origin_pos: text.len_unicode(),
1709                                }),
1710                                current: AbsolutePosition {
1711                                    pos: text.len_event(),
1712                                    side: pos.side,
1713                                },
1714                            })
1715                        }
1716                        ContainerType::List => {
1717                            let list = self.get_list(&pos.container);
1718                            Ok(PosQueryResult {
1719                                update: Some(Cursor {
1720                                    id: None,
1721                                    container: list.id(),
1722                                    side: pos.side,
1723                                    origin_pos: list.len(),
1724                                }),
1725                                current: AbsolutePosition {
1726                                    pos: list.len(),
1727                                    side: pos.side,
1728                                },
1729                            })
1730                        }
1731                        ContainerType::MovableList => {
1732                            let list = self.get_movable_list(&pos.container);
1733                            Ok(PosQueryResult {
1734                                update: Some(Cursor {
1735                                    id: None,
1736                                    container: list.id(),
1737                                    side: pos.side,
1738                                    origin_pos: list.len(),
1739                                }),
1740                                current: AbsolutePosition {
1741                                    pos: list.len(),
1742                                    side: pos.side,
1743                                },
1744                            })
1745                        }
1746                        ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1747                            unreachable!()
1748                        }
1749                        #[cfg(feature = "counter")]
1750                        ContainerType::Counter => unreachable!(),
1751                    }
1752                }
1753            });
1754            result
1755        }
1756    }
1757
1758    /// Free the history cache that is used for making checkout faster.
1759    ///
1760    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1761    /// You can free it by calling this method.
1762    pub fn free_history_cache(&self) {
1763        self.oplog.lock().unwrap().free_history_cache();
1764    }
1765
1766    /// Free the cached diff calculator that is used for checkout.
1767    pub fn free_diff_calculator(&self) {
1768        *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
1769    }
1770
1771    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1772    /// You can free it by calling `free_history_cache`.
1773    pub fn has_history_cache(&self) -> bool {
1774        self.oplog.lock().unwrap().has_history_cache()
1775    }
1776
1777    /// Encoded all ops and history cache to bytes and store them in the kv store.
1778    ///
1779    /// The parsed ops will be dropped
1780    #[inline]
1781    pub fn compact_change_store(&self) {
1782        self.with_barrier(|| {
1783            self.oplog.lock().unwrap().compact_change_store();
1784        });
1785    }
1786
1787    /// Analyze the container info of the doc
1788    ///
1789    /// This is used for development and debugging
1790    #[inline]
1791    pub fn analyze(&self) -> DocAnalysis {
1792        DocAnalysis::analyze(self)
1793    }
1794
1795    /// Get the path from the root to the container
1796    pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1797        let mut state = self.state.lock().unwrap();
1798        if state.arena.id_to_idx(id).is_none() {
1799            if !state.does_container_exist(id) {
1800                return None;
1801            }
1802            state.ensure_container(id);
1803        }
1804        let idx = state.arena.id_to_idx(id).unwrap();
1805        state.get_path(idx)
1806    }
1807
1808    #[instrument(skip(self))]
1809    pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1810        self.with_barrier(|| {
1811            let ans = match mode {
1812                ExportMode::Snapshot => export_fast_snapshot(self),
1813                ExportMode::Updates { from } => export_fast_updates(self, &from),
1814                ExportMode::UpdatesInRange { spans } => {
1815                    export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1816                }
1817                ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1818                ExportMode::StateOnly(f) => match f {
1819                    Some(f) => export_state_only_snapshot(self, &f)?,
1820                    None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1821                },
1822                ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1823            };
1824            Ok(ans)
1825        })
1826    }
1827
1828    /// The doc only contains the history since the shallow history start version vector.
1829    ///
1830    /// This is empty if the doc is not shallow.
1831    ///
1832    /// The ops included by the shallow history start version vector are not in the doc.
1833    pub fn shallow_since_vv(&self) -> ImVersionVector {
1834        self.oplog().lock().unwrap().shallow_since_vv().clone()
1835    }
1836
1837    pub fn shallow_since_frontiers(&self) -> Frontiers {
1838        self.oplog()
1839            .lock()
1840            .unwrap()
1841            .shallow_since_frontiers()
1842            .clone()
1843    }
1844
1845    /// Check if the doc contains the full history.
1846    pub fn is_shallow(&self) -> bool {
1847        !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
1848    }
1849
1850    /// Get the number of operations in the pending transaction.
1851    ///
1852    /// The pending transaction is the one that is not committed yet. It will be committed
1853    /// after calling `doc.commit()`, `doc.export(mode)` or `doc.checkout(version)`.
1854    pub fn get_pending_txn_len(&self) -> usize {
1855        if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1856            txn.len()
1857        } else {
1858            0
1859        }
1860    }
1861
1862    #[inline]
1863    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1864        self.oplog().lock().unwrap().dag.find_path(from, to)
1865    }
1866
1867    /// Subscribe to the first commit from a peer. Operations performed on the `LoroDoc` within this callback
1868    /// will be merged into the current commit.
1869    ///
1870    /// This is useful for managing the relationship between `PeerID` and user information.
1871    /// For example, you could store user names in a `LoroMap` using `PeerID` as the key and the `UserID` as the value.
1872    pub fn subscribe_first_commit_from_peer(
1873        &self,
1874        callback: FirstCommitFromPeerCallback,
1875    ) -> Subscription {
1876        let (s, enable) = self
1877            .first_commit_from_peer_subs
1878            .inner()
1879            .insert((), callback);
1880        enable();
1881        s
1882    }
1883
1884    /// Subscribe to the pre-commit event.
1885    ///
1886    /// The callback will be called when the changes are committed but not yet applied to the OpLog.
1887    /// You can modify the commit message and timestamp in the callback by [`ChangeModifier`].
1888    pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1889        let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1890        enable();
1891        s
1892    }
1893}
1894
1895#[derive(Debug, thiserror::Error)]
1896pub enum ChangeTravelError {
1897    #[error("Target id not found {0:?}")]
1898    TargetIdNotFound(ID),
1899    #[error("The shallow history of the doc doesn't include the target version")]
1900    TargetVersionNotIncluded,
1901}
1902
1903impl LoroDoc {
1904    pub fn travel_change_ancestors(
1905        &self,
1906        ids: &[ID],
1907        f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1908    ) -> Result<(), ChangeTravelError> {
1909        let (options, guard) = self.implicit_commit_then_stop();
1910        drop(guard);
1911        struct PendingNode(ChangeMeta);
1912        impl PartialEq for PendingNode {
1913            fn eq(&self, other: &Self) -> bool {
1914                self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1915            }
1916        }
1917
1918        impl Eq for PendingNode {}
1919        impl PartialOrd for PendingNode {
1920            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1921                Some(self.cmp(other))
1922            }
1923        }
1924
1925        impl Ord for PendingNode {
1926            fn cmp(&self, other: &Self) -> Ordering {
1927                self.0
1928                    .lamport_last()
1929                    .cmp(&other.0.lamport_last())
1930                    .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1931            }
1932        }
1933
1934        for id in ids {
1935            let op_log = &self.oplog().lock().unwrap();
1936            if !op_log.vv().includes_id(*id) {
1937                return Err(ChangeTravelError::TargetIdNotFound(*id));
1938            }
1939            if op_log.dag.shallow_since_vv().includes_id(*id) {
1940                return Err(ChangeTravelError::TargetVersionNotIncluded);
1941            }
1942        }
1943
1944        let mut visited = FxHashSet::default();
1945        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1946        for id in ids {
1947            pending.push(PendingNode(ChangeMeta::from_change(
1948                &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1949            )));
1950        }
1951        while let Some(PendingNode(node)) = pending.pop() {
1952            let deps = node.deps.clone();
1953            if f(node).is_break() {
1954                break;
1955            }
1956
1957            for dep in deps.iter() {
1958                let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1959                    continue;
1960                };
1961                if visited.contains(&dep_node.id) {
1962                    continue;
1963                }
1964
1965                visited.insert(dep_node.id);
1966                pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1967            }
1968        }
1969
1970        let ans = Ok(());
1971        self.renew_txn_if_auto_commit(options);
1972        ans
1973    }
1974
1975    pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1976        self.with_barrier(|| {
1977            let mut set = FxHashSet::default();
1978            {
1979                let oplog = self.oplog().lock().unwrap();
1980                for op in oplog.iter_ops(id.to_span(len)) {
1981                    let id = oplog.arena.get_container_id(op.container()).unwrap();
1982                    set.insert(id);
1983                }
1984            }
1985            set
1986        })
1987    }
1988
1989    pub fn delete_root_container(&self, cid: ContainerID) {
1990        if !cid.is_root() {
1991            return;
1992        }
1993
1994        // Do not treat "not in arena" as non-existence; consult state/kv
1995        if !self.has_container(&cid) {
1996            return;
1997        }
1998
1999        let Some(h) = self.get_handler(cid.clone()) else {
2000            return;
2001        };
2002
2003        if let Err(e) = h.clear() {
2004            eprintln!("Failed to clear handler: {:?}", e);
2005            return;
2006        }
2007        self.config
2008            .deleted_root_containers
2009            .lock()
2010            .unwrap()
2011            .insert(cid);
2012    }
2013
2014    pub fn set_hide_empty_root_containers(&self, hide: bool) {
2015        self.config
2016            .hide_empty_root_containers
2017            .store(hide, std::sync::atomic::Ordering::Relaxed);
2018    }
2019}
2020
2021// FIXME: PERF: This method is quite slow because it iterates all the changes
2022fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2023    let start_vv = oplog
2024        .dag
2025        .frontiers_to_vv(&id.into())
2026        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2027    for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
2028        for op in change.ops.iter().rev() {
2029            if op.container != idx {
2030                continue;
2031            }
2032            if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2033                if d.id_start.to_span(d.atom_len()).contains(id) {
2034                    return Some(ID::new(change.peer(), op.counter));
2035                }
2036            }
2037        }
2038    }
2039
2040    None
2041}
2042
2043#[derive(Debug)]
2044pub struct CommitWhenDrop<'a> {
2045    doc: &'a LoroDoc,
2046    default_options: CommitOptions,
2047}
2048
2049impl Drop for CommitWhenDrop<'_> {
2050    fn drop(&mut self) {
2051        {
2052            let mut guard = self.doc.txn.lock().unwrap();
2053            if let Some(txn) = guard.as_mut() {
2054                txn.set_default_options(std::mem::take(&mut self.default_options));
2055            };
2056        }
2057
2058        self.doc.commit_then_renew();
2059    }
2060}
2061
2062/// Options for configuring a commit operation.
2063#[derive(Debug, Clone)]
2064pub struct CommitOptions {
2065    /// Origin identifier for the commit event, used to track the source of changes.
2066    /// It doesn't persist.
2067    pub origin: Option<InternalString>,
2068
2069    /// Whether to immediately start a new transaction after committing.
2070    /// Defaults to true.
2071    pub immediate_renew: bool,
2072
2073    /// Custom timestamp for the commit in seconds since Unix epoch.
2074    /// If None, the current time will be used.
2075    pub timestamp: Option<Timestamp>,
2076
2077    /// Optional commit message to attach to the changes. It will be persisted.
2078    pub commit_msg: Option<Arc<str>>,
2079}
2080
2081impl CommitOptions {
2082    /// Creates a new CommitOptions with default values.
2083    pub fn new() -> Self {
2084        Self {
2085            origin: None,
2086            immediate_renew: true,
2087            timestamp: None,
2088            commit_msg: None,
2089        }
2090    }
2091
2092    /// Sets the origin identifier for this commit.
2093    pub fn origin(mut self, origin: &str) -> Self {
2094        self.origin = Some(origin.into());
2095        self
2096    }
2097
2098    /// Sets whether to immediately start a new transaction after committing.
2099    pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2100        self.immediate_renew = immediate_renew;
2101        self
2102    }
2103
2104    /// Set the timestamp of the commit.
2105    ///
2106    /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
2107    pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2108        self.timestamp = Some(timestamp);
2109        self
2110    }
2111
2112    /// Sets a commit message to be attached to the changes.
2113    pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2114        self.commit_msg = Some(commit_msg.into());
2115        self
2116    }
2117
2118    /// Sets the origin identifier for this commit.
2119    pub fn set_origin(&mut self, origin: Option<&str>) {
2120        self.origin = origin.map(|x| x.into())
2121    }
2122
2123    /// Sets the timestamp for this commit.
2124    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2125        self.timestamp = timestamp;
2126    }
2127}
2128
2129impl Default for CommitOptions {
2130    fn default() -> Self {
2131        Self::new()
2132    }
2133}
2134
2135#[cfg(test)]
2136mod test {
2137    use crate::{cursor::PosType, loro::ExportMode, version::Frontiers, LoroDoc, ToJson};
2138    use loro_common::ID;
2139
2140    #[test]
2141    fn test_sync() {
2142        fn is_send_sync<T: Send + Sync>(_v: T) {}
2143        let loro = super::LoroDoc::new();
2144        is_send_sync(loro)
2145    }
2146
2147    #[test]
2148    fn test_checkout() {
2149        let loro = LoroDoc::new();
2150        loro.set_peer_id(1).unwrap();
2151        let text = loro.get_text("text");
2152        let map = loro.get_map("map");
2153        let list = loro.get_list("list");
2154        let mut txn = loro.txn().unwrap();
2155        for i in 0..10 {
2156            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2157            text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
2158                .unwrap();
2159            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2160        }
2161        txn.commit().unwrap();
2162        let b = LoroDoc::new();
2163        b.import(&loro.export(ExportMode::Snapshot).unwrap())
2164            .unwrap();
2165        loro.checkout(&Frontiers::default()).unwrap();
2166        {
2167            let json = &loro.get_deep_value();
2168            assert_eq!(
2169                json.to_json_value(),
2170                serde_json::json!({"text":"","list":[],"map":{}})
2171            );
2172        }
2173
2174        b.checkout(&ID::new(1, 2).into()).unwrap();
2175        {
2176            let json = &b.get_deep_value();
2177            assert_eq!(
2178                json.to_json_value(),
2179                serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
2180            );
2181        }
2182
2183        loro.checkout(&ID::new(1, 3).into()).unwrap();
2184        {
2185            let json = &loro.get_deep_value();
2186            assert_eq!(
2187                json.to_json_value(),
2188                serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
2189            );
2190        }
2191
2192        b.checkout(&ID::new(1, 29).into()).unwrap();
2193        {
2194            let json = &b.get_deep_value();
2195            assert_eq!(
2196                json.to_json_value(),
2197                serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
2198            );
2199        }
2200    }
2201
2202    #[test]
2203    fn import_batch_err_181() {
2204        let a = LoroDoc::new_auto_commit();
2205        let update_a = a.export(ExportMode::Snapshot);
2206        let b = LoroDoc::new_auto_commit();
2207        b.import_batch(&[update_a.unwrap()]).unwrap();
2208        b.get_text("text")
2209            .insert(0, "hello", PosType::Unicode)
2210            .unwrap();
2211        b.commit_then_renew();
2212        let oplog = b.oplog().lock().unwrap();
2213        drop(oplog);
2214        b.export(ExportMode::all_updates()).unwrap();
2215    }
2216}