Skip to main content

loro_internal/
loro.rs

1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8    arena::SharedArena,
9    change::Timestamp,
10    configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11    container::{
12        idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13        IntoContainerId,
14    },
15    cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16    dag::{Dag, DagUtils},
17    diff_calc::DiffCalculator,
18    encoding::{
19        self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20        export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21        export_state_only_snapshot,
22        json_schema::{encode_change_to_json, json::JsonSchema},
23        parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24    },
25    event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26    handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27    id::PeerID,
28    json::JsonChange,
29    op::InnerContent,
30    oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31    state::DocState,
32    subscription::{LocalUpdateCallback, Observer, Subscriber},
33    undo::DiffBatch,
34    utils::subscription::{SubscriberSetWithQueue, Subscription},
35    version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36    ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37    VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42    lock::{LoroLockGroup, LoroMutex},
43    txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47    ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48    LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53    borrow::Cow,
54    cmp::Ordering,
55    collections::{hash_map::Entry, BinaryHeap},
56    ops::ControlFlow,
57    sync::{
58        atomic::Ordering::{Acquire, Release},
59        Arc,
60    },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("LoroDoc")
73            .field("config", &self.config)
74            .field("auto_commit", &self.auto_commit)
75            .field("detached", &self.detached)
76            .finish()
77    }
78}
79
80impl LoroDoc {
81    /// Run the provided closure within a commit barrier.
82    ///
83    /// This finalizes any pending auto-commit transaction first (preserving
84    /// options across an empty txn), executes `f`, then renews the transaction
85    /// (carrying preserved options) if auto-commit is enabled. This is the
86    /// common implicit-commit pattern used by internal operations such as
87    /// import/export/checkouts.
88    #[inline]
89    pub fn with_barrier<F, R>(&self, f: F) -> R
90    where
91        F: FnOnce() -> R,
92    {
93        let (options, guard) = self.implicit_commit_then_stop();
94        let result = f();
95        drop(guard);
96        self.renew_txn_if_auto_commit(options);
97        result
98    }
99
100    pub fn new() -> Self {
101        let oplog = OpLog::new();
102        let arena = oplog.arena.clone();
103        let config: Configure = oplog.configure.clone();
104        let lock_group = LoroLockGroup::new();
105        let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
106        let inner = Arc::new_cyclic(|w| {
107            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
108            LoroDocInner {
109                oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
110                state,
111                config,
112                detached: AtomicBool::new(false),
113                auto_commit: AtomicBool::new(false),
114                observer: Arc::new(Observer::new(arena.clone())),
115                diff_calculator: Arc::new(
116                    lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
117                ),
118                txn: global_txn,
119                arena,
120                local_update_subs: SubscriberSetWithQueue::new(),
121                peer_id_change_subs: SubscriberSetWithQueue::new(),
122                pre_commit_subs: SubscriberSetWithQueue::new(),
123                first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
124            }
125        });
126        LoroDoc { inner }
127    }
128
129    pub fn fork(&self) -> Self {
130        if self.is_detached() {
131            return self
132                .fork_at(&self.state_frontiers())
133                .expect("fork_at on detached doc should not fail");
134        }
135
136        let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
137        let doc = Self::new();
138        doc.with_barrier(|| {
139            encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
140        })
141        .unwrap();
142        doc.set_config(&self.config);
143        if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
144            doc.start_auto_commit();
145        }
146        doc
147    }
148    /// Enables editing of the document in detached mode.
149    ///
150    /// By default, the document cannot be edited in detached mode (after calling
151    /// `detach` or checking out a version other than the latest). This method
152    /// allows editing in detached mode.
153    ///
154    /// # Important Notes:
155    ///
156    /// - After enabling this mode, the document will use a different PeerID. Each
157    ///   time you call checkout, a new PeerID will be used.
158    /// - If you set a custom PeerID while this mode is enabled, ensure that
159    ///   concurrent operations with the same PeerID are not possible.
160    /// - On detached mode, importing will not change the state of the document.
161    ///   It also doesn't change the version of the [DocState]. The changes will be
162    ///   recorded into [OpLog] only. You need to call `checkout` to make it take effect.
163    pub fn set_detached_editing(&self, enable: bool) {
164        self.config.set_detached_editing(enable);
165        if enable && self.is_detached() {
166            self.with_barrier(|| {
167                self.renew_peer_id();
168            });
169        }
170    }
171
172    /// Create a doc with auto commit enabled.
173    #[inline]
174    pub fn new_auto_commit() -> Self {
175        let doc = Self::new();
176        doc.start_auto_commit();
177        doc
178    }
179
180    #[inline(always)]
181    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
182        if peer == PeerID::MAX {
183            return Err(LoroError::InvalidPeerID);
184        }
185        let next_id = self.oplog.lock().next_id(peer);
186        if self.auto_commit.load(Acquire) {
187            let doc_state = self.state.lock();
188            doc_state
189                .peer
190                .store(peer, std::sync::atomic::Ordering::Relaxed);
191
192            if doc_state.is_in_txn() {
193                drop(doc_state);
194                // Use implicit-style barrier to avoid swallowing next-commit options
195                self.with_barrier(|| {});
196            }
197            self.peer_id_change_subs.emit(&(), next_id);
198            return Ok(());
199        }
200
201        let doc_state = self.state.lock();
202        if doc_state.is_in_txn() {
203            return Err(LoroError::TransactionError(
204                "Cannot change peer id during transaction"
205                    .to_string()
206                    .into_boxed_str(),
207            ));
208        }
209
210        doc_state
211            .peer
212            .store(peer, std::sync::atomic::Ordering::Relaxed);
213        drop(doc_state);
214        self.peer_id_change_subs.emit(&(), next_id);
215        Ok(())
216    }
217
218    /// Renews the PeerID for the document.
219    pub(crate) fn renew_peer_id(&self) {
220        let peer_id = DefaultRandom.next_u64();
221        self.set_peer_id(peer_id).unwrap();
222    }
223
224    /// Implicitly commit the cumulative auto-commit transaction.
225    /// This method only has effect when `auto_commit` is true.
226    ///
227    /// Follow-ups: the caller is responsible for renewing the transaction
228    /// as needed (e.g., via `renew_txn_if_auto_commit`). Prefer using
229    /// `with_barrier(...)` for most internal flows to handle this safely.
230    ///
231    /// Empty-commit behavior: if the pending transaction is empty, the returned
232    /// `Some(CommitOptions)` preserves next-commit options such as message and
233    /// timestamp so they can carry into the renewed transaction. Transient
234    /// labels like `origin` do not carry across an empty commit.
235    #[inline]
236    #[must_use]
237    pub fn implicit_commit_then_stop(
238        &self,
239    ) -> (
240        Option<CommitOptions>,
241        LoroMutexGuard<'_, Option<Transaction>>,
242    ) {
243        // Implicit commit: preserve options on empty commit
244        let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
245        (a, b.unwrap())
246    }
247
248    /// Commit the cumulative auto commit transaction.
249    /// It will start the next one immediately
250    ///
251    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
252    #[inline]
253    pub fn commit_then_renew(&self) -> Option<CommitOptions> {
254        // Explicit commit: swallow options on empty commit
255        self.commit_internal(CommitOptions::new().immediate_renew(true), false)
256            .0
257    }
258
259    /// This method is called before the commit.
260    /// It can be used to modify the change before it is committed.
261    ///
262    /// It return Some(txn) if the txn is None
263    fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
264        let mut txn_guard = self.txn.lock();
265        let Some(txn) = txn_guard.as_mut() else {
266            return Some(txn_guard);
267        };
268
269        if txn.is_peer_first_appearance {
270            txn.is_peer_first_appearance = false;
271            drop(txn_guard);
272            // First commit from a peer
273            self.first_commit_from_peer_subs.emit(
274                &(),
275                FirstCommitFromPeerPayload {
276                    peer: self.peer_id(),
277                },
278            );
279        }
280
281        None
282    }
283
284    /// Core implementation for committing the cumulative auto-commit transaction.
285    ///
286    /// - When `preserve_on_empty` is true (implicit commits like export/checkout),
287    ///   commit options from an empty transaction are carried over to the next transaction
288    ///   (except `origin`, which never carries across an empty commit).
289    /// - When `preserve_on_empty` is false (explicit commits), commit options from an
290    ///   empty transaction are swallowed and NOT carried over.
291    #[instrument(skip_all)]
292    fn commit_internal(
293        &self,
294        config: CommitOptions,
295        preserve_on_empty: bool,
296    ) -> (
297        Option<CommitOptions>,
298        Option<LoroMutexGuard<'_, Option<Transaction>>>,
299    ) {
300        if !self.auto_commit.load(Acquire) {
301            let txn_guard = self.txn.lock();
302            // if not auto_commit, nothing should happen
303            // because the global txn is not used
304            return (None, Some(txn_guard));
305        }
306
307        loop {
308            if let Some(txn_guard) = self.before_commit() {
309                return (None, Some(txn_guard));
310            }
311
312            let mut txn_guard = self.txn.lock();
313            let txn = txn_guard.take();
314            let Some(mut txn) = txn else {
315                return (None, Some(txn_guard));
316            };
317            let on_commit = txn.take_on_commit();
318            if let Some(origin) = config.origin.clone() {
319                txn.set_origin(origin);
320            }
321
322            if let Some(timestamp) = config.timestamp {
323                txn.set_timestamp(timestamp);
324            }
325
326            if let Some(msg) = config.commit_msg.as_ref() {
327                txn.set_msg(Some(msg.clone()));
328            }
329
330            let id_span = txn.id_span();
331            let mut options = txn.commit().unwrap();
332            // Empty commit returns Some(options). We may preserve parts of it for implicit commits.
333            if let Some(opts) = options.as_mut() {
334                // `origin` is an event-only label and never carries across an empty commit
335                if config.origin.is_some() {
336                    opts.set_origin(None);
337                }
338                // For explicit commits, swallow options from empty commit entirely
339                if !preserve_on_empty {
340                    options = None;
341                }
342            }
343            if config.immediate_renew {
344                assert!(self.can_edit());
345                let mut t = self.txn().unwrap();
346                if let Some(options) = options.as_ref() {
347                    t.set_options(options.clone());
348                }
349                *txn_guard = Some(t);
350            }
351
352            if let Some(on_commit) = on_commit {
353                drop(txn_guard);
354                on_commit(&self.state, &self.oplog, id_span);
355                txn_guard = self.txn.lock();
356                if !config.immediate_renew && txn_guard.is_some() {
357                    // make sure that txn_guard is None when config.immediate_renew is false
358                    continue;
359                }
360            }
361
362            return (
363                options,
364                if !config.immediate_renew {
365                    Some(txn_guard)
366                } else {
367                    None
368                },
369            );
370        }
371    }
372
373    /// Commit the cumulative auto commit transaction (explicit API).
374    ///
375    /// This is used by user-facing explicit commits. If the transaction is empty,
376    /// any provided commit options are swallowed and will NOT carry over.
377    #[instrument(skip_all)]
378    pub fn commit_with(
379        &self,
380        config: CommitOptions,
381    ) -> (
382        Option<CommitOptions>,
383        Option<LoroMutexGuard<'_, Option<Transaction>>>,
384    ) {
385        self.commit_internal(config, false)
386    }
387
388    /// Set the commit message of the next commit
389    pub fn set_next_commit_message(&self, message: &str) {
390        let mut binding = self.txn.lock();
391        let Some(txn) = binding.as_mut() else {
392            return;
393        };
394
395        if message.is_empty() {
396            txn.set_msg(None)
397        } else {
398            txn.set_msg(Some(message.into()))
399        }
400    }
401
402    /// Set the origin of the next commit
403    pub fn set_next_commit_origin(&self, origin: &str) {
404        let mut txn = self.txn.lock();
405        if let Some(txn) = txn.as_mut() {
406            txn.set_origin(origin.into());
407        }
408    }
409
410    /// Set the timestamp of the next commit
411    pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
412        let mut txn = self.txn.lock();
413        if let Some(txn) = txn.as_mut() {
414            txn.set_timestamp(timestamp);
415        }
416    }
417
418    /// Set the options of the next commit
419    pub fn set_next_commit_options(&self, options: CommitOptions) {
420        let mut txn = self.txn.lock();
421        if let Some(txn) = txn.as_mut() {
422            txn.set_options(options);
423        }
424    }
425
426    /// Clear the options of the next commit
427    pub fn clear_next_commit_options(&self) {
428        let mut txn = self.txn.lock();
429        if let Some(txn) = txn.as_mut() {
430            txn.set_options(CommitOptions::new());
431        }
432    }
433
434    /// Set whether to record the timestamp of each change. Default is `false`.
435    ///
436    /// If enabled, the Unix timestamp will be recorded for each change automatically.
437    ///
438    /// You can also set each timestamp manually when you commit a change.
439    /// The timestamp manually set will override the automatic one.
440    ///
441    /// NOTE: Timestamps are forced to be in ascending order.
442    /// If you commit a new change with a timestamp that is less than the existing one,
443    /// the largest existing timestamp will be used instead.
444    #[inline]
445    pub fn set_record_timestamp(&self, record: bool) {
446        self.config.set_record_timestamp(record);
447    }
448
449    /// Set the interval of mergeable changes, in seconds.
450    ///
451    /// If two continuous local changes are within the interval, they will be merged into one change.
452    /// The default value is 1000 seconds.
453    #[inline]
454    pub fn set_change_merge_interval(&self, interval: i64) {
455        self.config.set_merge_interval(interval);
456    }
457
458    pub fn can_edit(&self) -> bool {
459        !self.is_detached() || self.config.detached_editing()
460    }
461
462    pub fn is_detached_editing_enabled(&self) -> bool {
463        self.config.detached_editing()
464    }
465
466    #[inline]
467    pub fn config_text_style(&self, text_style: StyleConfigMap) {
468        self.config.text_style_config.write().map = text_style.map;
469    }
470
471    #[inline]
472    pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
473        self.config.text_style_config.write().default_style = text_style;
474    }
475    pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
476        let doc = Self::new();
477        let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
478        if mode.is_snapshot() {
479            doc.with_barrier(|| -> Result<(), LoroError> {
480                decode_snapshot(&doc, mode, body, Default::default())?;
481                Ok(())
482            })?;
483            Ok(doc)
484        } else {
485            Err(LoroError::DecodeError(
486                "Invalid encode mode".to_string().into(),
487            ))
488        }
489    }
490
491    /// Is the document empty? (no ops)
492    #[inline(always)]
493    pub fn can_reset_with_snapshot(&self) -> bool {
494        let oplog = self.oplog.lock();
495        if oplog.batch_importing {
496            return false;
497        }
498
499        if self.is_detached() {
500            return false;
501        }
502
503        oplog.is_empty() && self.state.lock().can_import_snapshot()
504    }
505
506    /// Whether [OpLog] and [DocState] are detached.
507    ///
508    /// If so, the document is in readonly mode by default and importing will not change the state of the document.
509    /// It also doesn't change the version of the [DocState]. The changes will be recorded into [OpLog] only.
510    /// You need to call `checkout` to make it take effect.
511    #[inline(always)]
512    pub fn is_detached(&self) -> bool {
513        self.detached.load(Acquire)
514    }
515
516    pub(crate) fn set_detached(&self, detached: bool) {
517        self.detached.store(detached, Release);
518    }
519
520    #[inline(always)]
521    pub fn peer_id(&self) -> PeerID {
522        self.state
523            .lock()
524            .peer
525            .load(std::sync::atomic::Ordering::Relaxed)
526    }
527
528    #[inline(always)]
529    pub fn detach(&self) {
530        self.with_barrier(|| self.set_detached(true));
531    }
532
533    #[inline(always)]
534    pub fn attach(&self) {
535        self.checkout_to_latest()
536    }
537
538    /// Get the timestamp of the current state.
539    /// It's the last edit time of the [DocState].
540    pub fn state_timestamp(&self) -> Timestamp {
541        // Acquire locks in correct order: read frontiers first, then query OpLog.
542        let f = { self.state.lock().frontiers.clone() };
543        self.oplog.lock().get_timestamp_of_version(&f)
544    }
545
546    #[inline(always)]
547    pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
548        &self.state
549    }
550
551    #[inline]
552    pub fn get_state_deep_value(&self) -> LoroValue {
553        self.state.lock().get_deep_value()
554    }
555
556    #[inline(always)]
557    pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
558        &self.oplog
559    }
560
561    #[inline(always)]
562    pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
563        let s = debug_span!("import", peer = self.peer_id());
564        let _e = s.enter();
565        self.import_with(bytes, Default::default())
566    }
567
568    #[inline]
569    pub fn import_with(
570        &self,
571        bytes: &[u8],
572        origin: InternalString,
573    ) -> Result<ImportStatus, LoroError> {
574        self.with_barrier(|| self._import_with(bytes, origin))
575    }
576
577    #[tracing::instrument(skip_all)]
578    fn _import_with(
579        &self,
580        bytes: &[u8],
581        origin: InternalString,
582    ) -> Result<ImportStatus, LoroError> {
583        ensure_cov::notify_cov("loro_internal::import");
584        let parsed = parse_header_and_body(bytes, true)?;
585        loro_common::info!("Importing with mode={:?}", &parsed.mode);
586        let result = match parsed.mode {
587            EncodeMode::OutdatedRle => {
588                if self.state.lock().is_in_txn() {
589                    return Err(LoroError::ImportWhenInTxn);
590                }
591
592                let s = tracing::span!(
593                    tracing::Level::INFO,
594                    "Import updates ",
595                    peer = self.peer_id()
596                );
597                let _e = s.enter();
598                self.update_oplog_and_apply_delta_to_state_if_needed(
599                    |oplog| oplog.decode(parsed),
600                    origin,
601                )
602            }
603            EncodeMode::OutdatedSnapshot => {
604                if self.can_reset_with_snapshot() {
605                    loro_common::info!("Init by snapshot {}", self.peer_id());
606                    decode_snapshot(self, parsed.mode, parsed.body, origin)
607                } else {
608                    self.update_oplog_and_apply_delta_to_state_if_needed(
609                        |oplog| oplog.decode(parsed),
610                        origin,
611                    )
612                }
613            }
614            EncodeMode::FastSnapshot => {
615                if self.can_reset_with_snapshot() {
616                    ensure_cov::notify_cov("loro_internal::import::snapshot");
617                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
618                    decode_snapshot(self, parsed.mode, parsed.body, origin)
619                } else {
620                    self.update_oplog_and_apply_delta_to_state_if_needed(
621                        |oplog| oplog.decode(parsed),
622                        origin,
623                    )
624
625                    // let new_doc = LoroDoc::new();
626                    // new_doc.import(bytes)?;
627                    // let updates = new_doc.export(ExportMode::updates(&self.oplog_vv())).unwrap();
628                    // return self.import_with(updates.as_slice(), origin);
629                }
630            }
631            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
632                |oplog| oplog.decode(parsed),
633                origin,
634            ),
635            EncodeMode::Auto => {
636                unreachable!()
637            }
638        };
639
640        self.emit_events();
641
642        result
643    }
644
645    #[tracing::instrument(skip_all)]
646    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
647        &self,
648        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
649        origin: InternalString,
650    ) -> Result<ImportStatus, LoroError> {
651        let mut oplog = self.oplog.lock();
652        if !self.is_detached() {
653            let old_vv = oplog.vv().clone();
654            let old_frontiers = oplog.frontiers().clone();
655            let result = f(&mut oplog);
656            if &old_vv != oplog.vv() {
657                let mut diff = DiffCalculator::new(false);
658                let (diff, diff_mode) = diff.calc_diff_internal(
659                    &oplog,
660                    &old_vv,
661                    &old_frontiers,
662                    oplog.vv(),
663                    oplog.dag.get_frontiers(),
664                    None,
665                );
666                let mut state = self.state.lock();
667                state.apply_diff(
668                    InternalDocDiff {
669                        origin,
670                        diff: (diff).into(),
671                        by: EventTriggerKind::Import,
672                        new_version: Cow::Owned(oplog.frontiers().clone()),
673                    },
674                    diff_mode,
675                )?;
676            }
677            result
678        } else {
679            f(&mut oplog)
680        }
681    }
682
683    fn emit_events(&self) {
684        // we should not hold the lock when emitting events
685        let events = {
686            let mut state = self.state.lock();
687            state.take_events()
688        };
689        for event in events {
690            self.observer.emit(event);
691        }
692    }
693
694    pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
695        let mut state = self.state.lock();
696        state.take_events()
697    }
698
699    /// Import the json schema updates.
700    ///
701    /// only supports backward compatibility but not forward compatibility.
702    #[tracing::instrument(skip_all)]
703    pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
704        let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
705        self.with_barrier(|| {
706            let result = self.update_oplog_and_apply_delta_to_state_if_needed(
707                |oplog| crate::encoding::json_schema::import_json(oplog, json),
708                Default::default(),
709            );
710            self.emit_events();
711            result
712        })
713    }
714
715    pub fn export_json_updates(
716        &self,
717        start_vv: &VersionVector,
718        end_vv: &VersionVector,
719        with_peer_compression: bool,
720    ) -> JsonSchema {
721        self.with_barrier(|| {
722            let oplog = self.oplog.lock();
723            let mut start_vv = start_vv;
724            let _temp: Option<VersionVector>;
725            if !oplog.dag.shallow_since_vv().is_empty() {
726                // Make sure that start_vv >= shallow_since_vv
727                let mut include_all = true;
728                for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
729                    if start_vv.get(peer).unwrap_or(&0) < counter {
730                        include_all = false;
731                        break;
732                    }
733                }
734                if !include_all {
735                    let mut vv = start_vv.clone();
736                    for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
737                        vv.extend_to_include_end_id(ID::new(peer, counter));
738                    }
739                    _temp = Some(vv);
740                    start_vv = _temp.as_ref().unwrap();
741                }
742            }
743
744            crate::encoding::json_schema::export_json(
745                &oplog,
746                start_vv,
747                end_vv,
748                with_peer_compression,
749            )
750        })
751    }
752
753    pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
754        let oplog = self.oplog.lock();
755        let mut changes = export_json_in_id_span(&oplog, id_span);
756        if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
757            let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
758            changes.push(change_json);
759        }
760        changes
761    }
762
763    /// Get the version vector of the current OpLog
764    #[inline]
765    pub fn oplog_vv(&self) -> VersionVector {
766        self.oplog.lock().vv().clone()
767    }
768
769    /// Get the version vector of the current [DocState]
770    #[inline]
771    pub fn state_vv(&self) -> VersionVector {
772        let oplog = self.oplog.lock();
773        let f = &self.state.lock().frontiers;
774        oplog.dag.frontiers_to_vv(f).unwrap()
775    }
776
777    pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
778        let value: LoroValue = self.state.lock().get_value_by_path(path)?;
779        if let LoroValue::Container(c) = value {
780            Some(ValueOrHandler::Handler(Handler::new_attached(
781                c.clone(),
782                self.clone(),
783            )))
784        } else {
785            Some(ValueOrHandler::Value(value))
786        }
787    }
788
789    /// Get the handler by the string path.
790    pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
791        let path = str_to_path(path)?;
792        self.get_by_path(&path)
793    }
794
795    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
796        let arena = &self.arena;
797        let txn = self.txn.lock();
798        let txn = txn.as_ref()?;
799        let ops_ = txn.local_ops();
800        let new_id = ID {
801            peer: *txn.peer(),
802            counter: ops_.first()?.counter,
803        };
804        let change = ChangeRef {
805            id: &new_id,
806            deps: txn.frontiers(),
807            timestamp: &txn
808                .timestamp()
809                .as_ref()
810                .copied()
811                .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
812            commit_msg: txn.msg(),
813            ops: ops_,
814            lamport: txn.lamport(),
815        };
816        let json = encode_change_to_json(change, arena);
817        Some(json)
818    }
819
820    #[inline]
821    pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
822        if self.has_container(&id) {
823            Some(Handler::new_attached(id, self.clone()))
824        } else {
825            None
826        }
827    }
828
829    /// id can be a str, ContainerID, or ContainerIdRaw.
830    /// if it's str it will use Root container, which will not be None
831    #[inline]
832    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
833        let id = id.into_container_id(&self.arena, ContainerType::Text);
834        assert!(self.has_container(&id));
835        Handler::new_attached(id, self.clone()).into_text().unwrap()
836    }
837
838    /// id can be a str, ContainerID, or ContainerIdRaw.
839    /// if it's str it will use Root container, which will not be None
840    #[inline]
841    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
842        let id = id.into_container_id(&self.arena, ContainerType::List);
843        assert!(self.has_container(&id));
844        Handler::new_attached(id, self.clone()).into_list().unwrap()
845    }
846
847    /// id can be a str, ContainerID, or ContainerIdRaw.
848    /// if it's str it will use Root container, which will not be None
849    #[inline]
850    pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
851        let id = id.into_container_id(&self.arena, ContainerType::MovableList);
852        assert!(self.has_container(&id));
853        Handler::new_attached(id, self.clone())
854            .into_movable_list()
855            .unwrap()
856    }
857
858    /// id can be a str, ContainerID, or ContainerIdRaw.
859    /// if it's str it will use Root container, which will not be None
860    #[inline]
861    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
862        let id = id.into_container_id(&self.arena, ContainerType::Map);
863        assert!(self.has_container(&id));
864        Handler::new_attached(id, self.clone()).into_map().unwrap()
865    }
866
867    /// id can be a str, ContainerID, or ContainerIdRaw.
868    /// if it's str it will use Root container, which will not be None
869    #[inline]
870    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
871        let id = id.into_container_id(&self.arena, ContainerType::Tree);
872        assert!(self.has_container(&id));
873        Handler::new_attached(id, self.clone()).into_tree().unwrap()
874    }
875
876    #[cfg(feature = "counter")]
877    pub fn get_counter<I: IntoContainerId>(
878        &self,
879        id: I,
880    ) -> crate::handler::counter::CounterHandler {
881        let id = id.into_container_id(&self.arena, ContainerType::Counter);
882        assert!(self.has_container(&id));
883        Handler::new_attached(id, self.clone())
884            .into_counter()
885            .unwrap()
886    }
887
888    #[must_use]
889    pub fn has_container(&self, id: &ContainerID) -> bool {
890        if id.is_root() {
891            return true;
892        }
893
894        let exist = self.state.lock().does_container_exist(id);
895        exist
896    }
897
    /// Undo the operations between the given id_span. It can be used even in a collaborative environment.
    ///
    /// This is an internal API. You should NOT use it directly.
    ///
    /// # Internal
    ///
    /// This method will use the diff calculator to calculate the diff required to time travel
    /// from the end of id_span to the beginning of the id_span. Then it will convert the diff to
    /// operations and apply them to the OpLog with a dep on the last id of the given id_span.
    ///
    /// This implementation is fairly slow, but it's simple and maintainable. We can optimize it
    /// further when it's needed. The time complexity is O(n + m), n is the ops in the id_span, m is the
    /// distance from id_span to the current latest version.
    ///
    /// # Errors
    ///
    /// - `LoroError::EditWhenDetached` if `can_edit()` is false.
    /// - `LoroError::UndoInvalidIdSpan` if the last id of `id_span` is not included in the
    ///   oplog's version vector.
    /// - Any error returned while checking the state back out to the frontiers it had
    ///   before the undo computation.
    ///
    /// A failure while applying the computed diff is only logged with `warn!`; it is not returned.
    ///
    /// # Panics
    ///
    /// Panics if one of the intermediate checkouts performed while computing the undo diff fails.
    #[instrument(level = "info", skip_all)]
    pub fn undo_internal(
        &self,
        id_span: IdSpan,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        post_transform_base: Option<&DiffBatch>,
        before_diff: &mut dyn FnMut(&DiffBatch),
    ) -> LoroResult<CommitWhenDrop<'_>> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        // Stop the current txn and keep its guard (`txn`) until the state has been restored.
        let (options, txn) = self.implicit_commit_then_stop();
        if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
            self.renew_txn_if_auto_commit(options);
            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
        }

        // Remember whether events were being recorded and where the state currently is,
        // then turn recording off so the checkouts below do not leak events.
        let (was_recording, latest_frontiers) = {
            let mut state = self.state.lock();
            let was_recording = state.is_recording();
            state.stop_and_clear_recording();
            (was_recording, state.frontiers.clone())
        };

        let spans = self.oplog.lock().split_span_based_on_deps(id_span);
        let diff = crate::undo::undo(
            spans,
            match post_transform_base {
                Some(d) => Either::Right(d),
                None => Either::Left(&latest_frontiers),
            },
            // Compute the diff between two versions by checking out to `from`,
            // recording events, and checking out to `to`.
            |from, to| {
                self._checkout_without_emitting(from, false, false).unwrap();
                self.state.lock().start_recording();
                self._checkout_without_emitting(to, false, false).unwrap();
                let mut state = self.state.lock();
                let e = state.take_events();
                state.stop_and_clear_recording();
                DiffBatch::new(e)
            },
            before_diff,
        );

        // println!("\nundo_internal: diff: {:?}", diff);
        // println!("container remap: {:?}", container_remap);

        // Restore the state to where it was before the undo computation and re-attach.
        self._checkout_without_emitting(&latest_frontiers, false, false)?;
        self.set_detached(false);
        if was_recording {
            self.state.lock().start_recording();
        }
        drop(txn);
        self.start_auto_commit();
        // Try applying the diff, but ignore the error if it happens.
        // MovableList's undo behavior is too tricky to handle in a collaborative env
        // so in edge cases this may be an Error
        if let Err(e) = self._apply_diff(diff, container_remap, true) {
            warn!("Undo Failed {:?}", e);
        }

        // Carry the commit options of the interrupted txn over to the next commit.
        if let Some(options) = options {
            self.set_next_commit_options(options);
        }
        // The returned guard carries default commit options with origin "undo".
        Ok(CommitWhenDrop {
            doc: self,
            default_options: CommitOptions::new().origin("undo"),
        })
    }
980
981    /// Generate a series of local operations that can revert the current doc to the target
982    /// version.
983    ///
984    /// Internally, it will calculate the diff between the current state and the target state,
985    /// and apply the diff to the current state.
986    pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
987        // TODO: test when the doc is readonly
988        // TODO: test when the doc is detached but enabled editing
989        let f = self.state_frontiers();
990        let diff = self.diff(&f, target)?;
991        self._apply_diff(diff, &mut Default::default(), false)
992    }
993
    /// Calculate the diff between two versions, so that applying the diff on `a` makes the state the same as `b`.
    ///
    /// NOTE: While computing, the doc state is checked out to `a` and then to `b`.
    /// Afterwards it is checked out back to the frontiers it had before the call, and
    /// if the doc was not detached before the call it is re-attached and the auto-commit
    /// txn is renewed.
    ///
    /// # Errors
    ///
    /// - `LoroError::FrontiersNotFound` if an id in `a` or `b` is not in the oplog's DAG.
    /// - `LoroError::SwitchToVersionBeforeShallowRoot` if `a` or `b` is before the shallow root.
    ///
    /// # Panics
    ///
    /// Panics if any of the internal checkouts fails after the validation above.
    // FIXME: This method needs testing (no event should be emitted during processing this)
    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
        {
            // Check whether a and b are valid before checkout so this returns a normal error
            // instead of panicking on shallow docs.
            let oplog = self.oplog.lock();
            let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
                for id in frontiers.iter() {
                    if !oplog.dag.contains(id) {
                        return Err(LoroError::FrontiersNotFound(id));
                    }
                }

                if oplog.dag.is_before_shallow_root(frontiers) {
                    return Err(LoroError::SwitchToVersionBeforeShallowRoot);
                }

                Ok(())
            };

            validate_frontiers(a)?;
            validate_frontiers(b)?;
        }

        // Stop the current txn and keep its guard (`txn`) during the checkouts.
        let (options, txn) = self.implicit_commit_then_stop();
        let was_detached = self.is_detached();
        let old_frontiers = self.state_frontiers();
        // Turn event recording off so moving to `a` does not record anything.
        let was_recording = {
            let mut state = self.state.lock();
            let is_recording = state.is_recording();
            state.stop_and_clear_recording();
            is_recording
        };
        self._checkout_without_emitting(a, true, false).unwrap();
        // Only the transition from `a` to `b` is recorded; its events form the diff.
        self.state.lock().start_recording();
        self._checkout_without_emitting(b, true, false).unwrap();
        let e = {
            let mut state = self.state.lock();
            let e = state.take_events();
            state.stop_and_clear_recording();
            e
        };
        // Go back to where the state was before this call.
        self._checkout_without_emitting(&old_frontiers, false, false)
            .unwrap();
        drop(txn);
        if !was_detached {
            self.set_detached(false);
            self.renew_txn_if_auto_commit(options);
        }
        if was_recording {
            self.state.lock().start_recording();
        }
        Ok(DiffBatch::new(e))
    }
1051
1052    /// Apply a diff to the current state.
1053    #[inline(always)]
1054    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1055        self._apply_diff(diff, &mut Default::default(), true)
1056    }
1057
    /// Apply a diff to the current state.
    ///
    /// This method will not recreate containers with the same [ContainerID]s.
    /// While this can be convenient in certain cases, it can break several internal invariants:
    ///
    /// 1. Each container should appear only once in the document. Allowing containers with the same ID
    ///    would result in multiple instances of the same container in the document.
    /// 2. Unreachable containers should be removable from the state when necessary.
    ///
    /// However, the diff may contain operations that depend on container IDs.
    /// Therefore, users need to provide a `container_remap` to record and retrieve the container ID remapping.
    ///
    /// When `skip_unreachable` is true, diffs for containers that were not remapped and that the
    /// state reports as unreachable are skipped.
    ///
    /// # Errors
    ///
    /// - `LoroError::EditWhenDetached` if `can_edit()` is false.
    /// - `LoroError::ContainersNotFound` listing every normal container of the diff that does not
    ///   exist in the state (reported after the other containers have been processed), or the
    ///   single container for which no handler could be created (returned immediately).
    /// - Otherwise, the last error returned by a handler's `apply_diff`, if any.
    pub(crate) fn _apply_diff(
        &self,
        diff: DiffBatch,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        skip_unreachable: bool,
    ) -> LoroResult<()> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        // Handler errors do not stop the loop; the last one is returned at the end.
        let mut ans: LoroResult<()> = Ok(());
        let mut missing_containers: Vec<ContainerID> = Vec::new();
        for (mut id, diff) in diff.into_iter() {
            // Follow the remap chain until an id without a further mapping is reached.
            let mut remapped = false;
            while let Some(rid) = container_remap.get(&id) {
                remapped = true;
                id = rid.clone();
            }

            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
                // Not in arena does not imply non-existent; consult state/kv and register lazily
                let exists = self.state.lock().does_container_exist(&id);
                if !exists {
                    missing_containers.push(id);
                    continue;
                }
                // Ensure registration so handlers can be created
                self.state.lock().ensure_container(&id);
            }

            if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
                continue;
            }

            let Some(h) = self.get_handler(id.clone()) else {
                return Err(LoroError::ContainersNotFound {
                    containers: Box::new(vec![id]),
                });
            };
            if let Err(e) = h.apply_diff(diff, container_remap) {
                ans = Err(e);
            }
        }

        // Missing containers take precedence over handler errors.
        if !missing_containers.is_empty() {
            return Err(LoroError::ContainersNotFound {
                containers: Box::new(missing_containers),
            });
        }

        ans
    }
1121
    /// This is for debugging purposes. It will travel the whole oplog.
    ///
    /// Locks the oplog and delegates to its `diagnose_size`.
    #[inline]
    pub fn diagnose_size(&self) {
        self.oplog().lock().diagnose_size();
    }
1127
1128    #[inline]
1129    pub fn oplog_frontiers(&self) -> Frontiers {
1130        self.oplog().lock().frontiers().clone()
1131    }
1132
1133    #[inline]
1134    pub fn state_frontiers(&self) -> Frontiers {
1135        self.state.lock().frontiers.clone()
1136    }
1137
    /// Compare the oplog's version with `other` (delegates to the oplog's `cmp_with_frontiers`).
    ///
    /// - Ordering::Less means self is less than target or parallel
    /// - Ordering::Equal means versions equal
    /// - Ordering::Greater means self's version is greater than target
    #[inline]
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        self.oplog().lock().cmp_with_frontiers(other)
    }
1145
    /// Compare two [Frontiers] causally.
    ///
    /// If one of the [Frontiers] is not included, it will return [FrontiersNotIncluded].
    ///
    /// Locks the oplog and delegates to its `cmp_frontiers`.
    #[inline]
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        self.oplog().lock().cmp_frontiers(a, b)
    }
1157
1158    pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1159        let mut state = self.state.lock();
1160        if !state.is_recording() {
1161            state.start_recording();
1162        }
1163
1164        self.observer.subscribe_root(callback)
1165    }
1166
1167    pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1168        let mut state = self.state.lock();
1169        if !state.is_recording() {
1170            state.start_recording();
1171        }
1172
1173        self.observer.subscribe(container_id, callback)
1174    }
1175
1176    pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1177        let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1178        activate();
1179        sub
1180    }
1181
1182    // PERF: opt
1183    #[tracing::instrument(skip_all)]
1184    pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1185        if bytes.is_empty() {
1186            return Ok(ImportStatus::default());
1187        }
1188
1189        if bytes.len() == 1 {
1190            return self.import(&bytes[0]);
1191        }
1192
1193        let mut success = VersionRange::default();
1194        let mut pending = VersionRange::default();
1195        let mut meta_arr = bytes
1196            .iter()
1197            .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1198            .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1199        meta_arr.sort_by(|a, b| {
1200            a.0.mode
1201                .cmp(&b.0.mode)
1202                .then(b.0.change_num.cmp(&a.0.change_num))
1203        });
1204
1205        let (options, txn) = self.implicit_commit_then_stop();
1206        // Why we should keep locking `txn` here
1207        //
1208        // In a multi-threaded environment, `import_batch` used to drop the txn lock
1209        // (via `commit_then_stop` + `drop(txn)`) and call `detach()`/`checkout_to_latest()`
1210        // around the batch import. That created a race where another thread could
1211        // start or renew the auto-commit txn and perform local edits while we were
1212        // importing and temporarily detached. Those interleaved local edits could
1213        // violate invariants between `OpLog` and `DocState` (e.g., state being
1214        // updated when we expect it not to, missed events, or inconsistent
1215        // frontiers), as exposed by the loom test `local_edits_during_batch_import`.
1216        //
1217        // The fix is to hold the txn mutex for the entire critical section:
1218        // - Stop the current txn and keep the mutex guard.
1219        // - Force-detach with `set_detached(true)` (avoids `detach()` side effects),
1220        //   then run each `_import_with(...)` while detached so imports only touch
1221        //   the `OpLog`.
1222        // - After importing, reattach by checking out to latest and renew the txn
1223        //   using `_checkout_to_latest_with_guard`, which keeps the mutex held while
1224        //   (re)starting the auto-commit txn.
1225        //
1226        // Holding the lock ensures no concurrent thread can create/renew a txn and
1227        // do local edits in the middle of the batch import, making the whole
1228        // operation atomic with respect to local edits.
1229        let is_detached = self.is_detached();
1230        self.set_detached(true);
1231        self.oplog.lock().batch_importing = true;
1232        let mut err = None;
1233        for (_meta, data) in meta_arr {
1234            match self._import_with(data, Default::default()) {
1235                Ok(s) => {
1236                    for (peer, (start, end)) in s.success.iter() {
1237                        match success.0.entry(*peer) {
1238                            Entry::Occupied(mut e) => {
1239                                e.get_mut().1 = *end.max(&e.get().1);
1240                            }
1241                            Entry::Vacant(e) => {
1242                                e.insert((*start, *end));
1243                            }
1244                        }
1245                    }
1246
1247                    if let Some(p) = s.pending.as_ref() {
1248                        for (&peer, &(start, end)) in p.iter() {
1249                            match pending.0.entry(peer) {
1250                                Entry::Occupied(mut e) => {
1251                                    e.get_mut().0 = start.min(e.get().0);
1252                                    e.get_mut().1 = end.min(e.get().1);
1253                                }
1254                                Entry::Vacant(e) => {
1255                                    e.insert((start, end));
1256                                }
1257                            }
1258                        }
1259                    }
1260                }
1261                Err(e) => {
1262                    err = Some(e);
1263                }
1264            }
1265        }
1266
1267        let mut oplog = self.oplog.lock();
1268        oplog.batch_importing = false;
1269        drop(oplog);
1270        if !is_detached {
1271            self._checkout_to_latest_with_guard(txn);
1272        } else {
1273            drop(txn);
1274        }
1275
1276        self.renew_txn_if_auto_commit(options);
1277        if let Some(err) = err {
1278            return Err(err);
1279        }
1280
1281        Ok(ImportStatus {
1282            success,
1283            pending: if pending.is_empty() {
1284                None
1285            } else {
1286                Some(pending)
1287            },
1288        })
1289    }
1290
    /// Get shallow value of the document.
    ///
    /// Locks the doc state and returns the result of its `get_value`.
    #[inline]
    pub fn get_value(&self) -> LoroValue {
        self.state.lock().get_value()
    }
1296
    /// Get deep value of the document.
    ///
    /// Locks the doc state and returns the result of its `get_deep_value`.
    #[inline]
    pub fn get_deep_value(&self) -> LoroValue {
        self.state.lock().get_deep_value()
    }
1302
    /// Get deep value of the document with container ids.
    ///
    /// Locks the doc state and returns the result of its `get_deep_value_with_id`.
    #[inline]
    pub fn get_deep_value_with_id(&self) -> LoroValue {
        self.state.lock().get_deep_value_with_id()
    }
1308
1309    pub fn checkout_to_latest(&self) {
1310        let (options, _guard) = self.implicit_commit_then_stop();
1311        if !self.is_detached() {
1312            drop(_guard);
1313            self.renew_txn_if_auto_commit(options);
1314            return;
1315        }
1316
1317        self._checkout_to_latest_without_commit(true);
1318        drop(_guard);
1319        self.renew_txn_if_auto_commit(options);
1320    }
1321
1322    fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1323        if !self.is_detached() {
1324            self._renew_txn_if_auto_commit_with_guard(None, guard);
1325            return;
1326        }
1327
1328        self._checkout_to_latest_without_commit(true);
1329        self._renew_txn_if_auto_commit_with_guard(None, guard);
1330    }
1331
1332    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1333    pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1334        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1335            let f = self.oplog_frontiers();
1336            let this = &self;
1337            let frontiers = &f;
1338            this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1339                .unwrap(); // we don't need to shrink frontiers
1340                           // because oplog's frontiers are already shrinked
1341            this.emit_events();
1342            if this.config.detached_editing() {
1343                this.renew_peer_id();
1344            }
1345
1346            self.set_detached(false);
1347        });
1348    }
1349
1350    /// Checkout [DocState] to a specific version.
1351    ///
1352    /// This will make the current [DocState] detached from the latest version of [OpLog].
1353    /// Any further import will not be reflected on the [DocState], until user call [LoroDoc::attach()]
1354    pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1355        let was_detached = self.is_detached();
1356        let (options, guard) = self.implicit_commit_then_stop();
1357        let result = self._checkout_without_emitting(frontiers, true, true);
1358        if result.is_ok() {
1359            self.emit_events();
1360        }
1361        drop(guard);
1362        if self.config.detached_editing() {
1363            if result.is_ok() {
1364                self.renew_peer_id();
1365            }
1366            self.renew_txn_if_auto_commit(options);
1367        } else if result.is_err() {
1368            if !was_detached {
1369                self.renew_txn_if_auto_commit(options);
1370            }
1371        } else if !self.is_detached() {
1372            self.renew_txn_if_auto_commit(options);
1373        }
1374
1375        result
1376    }
1377
    /// Move the doc state to `frontiers` by applying the diff between the current state
    /// version and the target version, without emitting events.
    ///
    /// NOTE: The caller of this method should ensure the txn is locked and set to None
    ///
    /// - `to_shrink_frontiers`: when true, `frontiers` is first passed through
    ///   `shrink_frontiers`; otherwise it is used as given.
    /// - `to_commit_then_renew`: not read anywhere in this body.
    ///
    /// Returns `Ok(())` without touching anything when the state is already at the target.
    /// Otherwise the doc is marked as detached before the diff is applied.
    ///
    /// # Errors
    ///
    /// - `LoroError::TransactionError` if the txn mutex is not locked.
    /// - `LoroError::SwitchToVersionBeforeShallowRoot` if `frontiers` is before the shallow root.
    /// - `LoroError::FrontiersNotFound` if shrinking fails or an id of the target is not in the DAG.
    /// - `LoroError::NotFoundError` if the current or the target frontiers cannot be converted
    ///   to a version vector.
    /// - Any error returned by applying the diff to the state.
    #[instrument(level = "info", skip(self))]
    pub(crate) fn _checkout_without_emitting(
        &self,
        frontiers: &Frontiers,
        to_shrink_frontiers: bool,
        to_commit_then_renew: bool,
    ) -> Result<(), LoroError> {
        if !self.txn.is_locked() {
            return Err(LoroError::TransactionError(
                "checkout requires the transaction mutex to be held"
                    .to_string()
                    .into_boxed_str(),
            ));
        }
        let from_frontiers = self.state_frontiers();
        loro_common::info!(
            "checkout from={:?} to={:?} cur_vv={:?}",
            from_frontiers,
            frontiers,
            self.oplog_vv()
        );

        // Fast path: already at the requested version.
        if &from_frontiers == frontiers {
            return Ok(());
        }

        let oplog = self.oplog.lock();
        if oplog.dag.is_before_shallow_root(frontiers) {
            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
        }

        let frontiers = if to_shrink_frontiers {
            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
        } else {
            frontiers.clone()
        };

        // Check again: shrinking may have produced the current version.
        if from_frontiers == frontiers {
            return Ok(());
        }

        let mut state = self.state.lock();
        let mut calc = self.diff_calculator.lock();
        for i in frontiers.iter() {
            if !oplog.dag.contains(i) {
                return Err(LoroError::FrontiersNotFound(i));
            }
        }

        let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
            LoroError::NotFoundError(
                format!(
                    "Cannot find the current state version {:?}",
                    state.frontiers
                )
                .into_boxed_str(),
            )
        })?;
        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
            return Err(LoroError::NotFoundError(
                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
            ));
        };

        // From here on the state no longer follows the oplog's latest version.
        self.set_detached(true);
        let (diff, diff_mode) =
            calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
        state.apply_diff(
            InternalDocDiff {
                origin: "checkout".into(),
                diff: Cow::Owned(diff),
                by: EventTriggerKind::Checkout,
                new_version: Cow::Owned(frontiers.clone()),
            },
            diff_mode,
        )?;

        Ok(())
    }
1458
1459    #[inline]
1460    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1461        self.oplog.lock().dag.vv_to_frontiers(vv)
1462    }
1463
1464    #[inline]
1465    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1466        self.oplog.lock().dag.frontiers_to_vv(frontiers)
1467    }
1468
1469    /// Import ops from other doc.
1470    ///
1471    /// After `a.merge(b)` and `b.merge(a)`, `a` and `b` will have the same content if they are in attached mode.
1472    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1473        let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1474        self.import(&updates)
1475    }
1476
    /// Borrow the shared arena of this doc.
    pub(crate) fn arena(&self) -> &SharedArena {
        &self.arena
    }
1480
1481    #[inline]
1482    pub fn len_ops(&self) -> usize {
1483        let oplog = self.oplog.lock();
1484        let ans = oplog.vv().values().sum::<i32>() as usize;
1485        if oplog.is_shallow() {
1486            let sub = oplog
1487                .shallow_since_vv()
1488                .iter()
1489                .map(|(_, ops)| *ops)
1490                .sum::<i32>() as usize;
1491            ans - sub
1492        } else {
1493            ans
1494        }
1495    }
1496
1497    #[inline]
1498    pub fn len_changes(&self) -> usize {
1499        let oplog = self.oplog.lock();
1500        oplog.len_changes()
1501    }
1502
    /// Borrow the configuration of this doc.
    pub fn config(&self) -> &Configure {
        &self.config
    }
1506
1507    /// This method compare the consistency between the current doc state
1508    /// and the state calculated by diff calculator from beginning.
1509    ///
1510    /// Panic when it's not consistent
1511    pub fn check_state_diff_calc_consistency_slow(&self) {
1512        // #[cfg(any(test, debug_assertions, feature = "test_utils"))]
1513        {
1514            static IS_CHECKING: std::sync::atomic::AtomicBool =
1515                std::sync::atomic::AtomicBool::new(false);
1516            if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1517                return;
1518            }
1519
1520            IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1521            let peer_id = self.peer_id();
1522            let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1523            let _g = s.enter();
1524            let options = self.implicit_commit_then_stop().0;
1525            self.oplog.lock().check_dag_correctness();
1526            if self.is_shallow() {
1527                // For shallow documents, we cannot replay from the beginning as the history is not complete.
1528                //
1529                // Instead, we:
1530                // 1. Export the initial state from the GC snapshot.
1531                // 2. Create a new document and import the initial snapshot.
1532                // 3. Export updates from the shallow start version vector to the current version.
1533                // 4. Import these updates into the new document.
1534                // 5. Compare the states of the new document and the current document.
1535
1536                // Step 1: Export the initial state from the GC snapshot.
1537                let initial_snapshot = self
1538                    .export(ExportMode::state_only(Some(
1539                        &self.shallow_since_frontiers(),
1540                    )))
1541                    .unwrap();
1542
1543                // Step 2: Create a new document and import the initial snapshot.
1544                let doc = LoroDoc::new();
1545                doc.import(&initial_snapshot).unwrap();
1546                self.checkout(&self.shallow_since_frontiers()).unwrap();
1547                assert_eq!(self.get_deep_value(), doc.get_deep_value());
1548
1549                // Step 3: Export updates since the shallow start version vector to the current version.
1550                let updates = self.export(ExportMode::all_updates()).unwrap();
1551
1552                // Step 4: Import these updates into the new document.
1553                doc.import(&updates).unwrap();
1554                self.checkout_to_latest();
1555
1556                // Step 5: Checkout to the current state's frontiers and compare the states.
1557                // doc.checkout(&self.state_frontiers()).unwrap();
1558                assert_eq!(doc.get_deep_value(), self.get_deep_value());
1559                let mut calculated_state = doc.app_state().lock();
1560                let mut current_state = self.app_state().lock();
1561                current_state.check_is_the_same(&mut calculated_state);
1562            } else {
1563                let f = self.state_frontiers();
1564                let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
1565                let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1566                let doc = Self::new();
1567                doc.import(&bytes).unwrap();
1568                let mut calculated_state = doc.app_state().lock();
1569                let mut current_state = self.app_state().lock();
1570                current_state.check_is_the_same(&mut calculated_state);
1571            }
1572
1573            self.renew_txn_if_auto_commit(options);
1574            IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1575        }
1576    }
1577
    /// Resolve a cursor to its position in a sequence container.
    ///
    /// Delegates to `query_pos_internal` with `ret_event_index` set to `true`.
    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
        self.query_pos_internal(pos, true)
    }
1581
1582    /// Get position in a seq container
1583    pub(crate) fn query_pos_internal(
1584        &self,
1585        pos: &Cursor,
1586        ret_event_index: bool,
1587    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1588        if !self.has_container(&pos.container) {
1589            return Err(CannotFindRelativePosition::IdNotFound);
1590        }
1591
1592        let mut state = self.state.lock();
1593        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1594            Ok(PosQueryResult {
1595                update: None,
1596                current: AbsolutePosition {
1597                    pos: ans,
1598                    side: pos.side,
1599                },
1600            })
1601        } else {
1602            // We need to trace back to the version where the relative position is valid.
1603            // The optimal way to find that version is to have succ info like Automerge.
1604            //
1605            // But we don't have that info now, so an alternative way is to trace back
1606            // to version with frontiers of `[pos.id]`. But this may be very slow even if
1607            // the target is just deleted a few versions ago.
1608            //
1609            // What we need is to trace back to the latest version that deletes the target
1610            // id.
1611
1612            // commit the txn to make sure we can query the history correctly, preserving options
1613            drop(state);
1614            let result = self.with_barrier(|| {
1615                let oplog = self.oplog().lock();
1616                // TODO: assert pos.id is not unknown
1617                if let Some(id) = pos.id {
1618                    // Ensure the container is registered if it exists lazily
1619                    if oplog.arena.id_to_idx(&pos.container).is_none() {
1620                        let mut s = self.state.lock();
1621                        if !s.does_container_exist(&pos.container) {
1622                            return Err(CannotFindRelativePosition::ContainerDeleted);
1623                        }
1624                        s.ensure_container(&pos.container);
1625                        drop(s);
1626                    }
1627                    let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1628                    // We know where the target id is when we trace back to the delete_op_id.
1629                    let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1630                        if oplog.shallow_since_vv().includes_id(id) {
1631                            return Err(CannotFindRelativePosition::HistoryCleared);
1632                        }
1633
1634                        tracing::error!("Cannot find id {}", id);
1635                        return Err(CannotFindRelativePosition::IdNotFound);
1636                    };
1637                    // Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
1638                    let mut diff_calc = DiffCalculator::new(true);
1639                    let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1640                    let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1641                    // TODO: PERF: it doesn't need to calc the effects here
1642                    diff_calc.calc_diff_internal(
1643                        &oplog,
1644                        before,
1645                        &before_frontiers,
1646                        oplog.vv(),
1647                        oplog.frontiers(),
1648                        Some(&|target| idx == target),
1649                    );
1650                    // TODO: remove depth info
1651                    let depth = self.arena.get_depth(idx);
1652                    let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1653                    match diff_calc {
1654                        crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1655                            let c = text.get_id_latest_pos(id).unwrap();
1656                            let new_pos = c.pos;
1657                            let handler = self.get_text(&pos.container);
1658                            let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1659                            Ok(PosQueryResult {
1660                                update: handler.get_cursor(current_pos, c.side),
1661                                current: AbsolutePosition {
1662                                    pos: current_pos,
1663                                    side: c.side,
1664                                },
1665                            })
1666                        }
1667                        crate::diff_calc::ContainerDiffCalculator::List(list) => {
1668                            let c = list.get_id_latest_pos(id).unwrap();
1669                            let new_pos = c.pos;
1670                            let handler = self.get_list(&pos.container);
1671                            Ok(PosQueryResult {
1672                                update: handler.get_cursor(new_pos, c.side),
1673                                current: AbsolutePosition {
1674                                    pos: new_pos,
1675                                    side: c.side,
1676                                },
1677                            })
1678                        }
1679                        crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1680                            let c = list.get_id_latest_pos(id).unwrap();
1681                            let new_pos = c.pos;
1682                            let handler = self.get_movable_list(&pos.container);
1683                            let new_pos = handler.op_pos_to_user_pos(new_pos);
1684                            Ok(PosQueryResult {
1685                                update: handler.get_cursor(new_pos, c.side),
1686                                current: AbsolutePosition {
1687                                    pos: new_pos,
1688                                    side: c.side,
1689                                },
1690                            })
1691                        }
1692                        crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1693                        crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1694                        #[cfg(feature = "counter")]
1695                        crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1696                        crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1697                    }
1698                } else {
1699                    match pos.container.container_type() {
1700                        ContainerType::Text => {
1701                            let text = self.get_text(&pos.container);
1702                            Ok(PosQueryResult {
1703                                update: Some(Cursor {
1704                                    id: None,
1705                                    container: text.id(),
1706                                    side: pos.side,
1707                                    origin_pos: text.len_unicode(),
1708                                }),
1709                                current: AbsolutePosition {
1710                                    pos: text.len_event(),
1711                                    side: pos.side,
1712                                },
1713                            })
1714                        }
1715                        ContainerType::List => {
1716                            let list = self.get_list(&pos.container);
1717                            Ok(PosQueryResult {
1718                                update: Some(Cursor {
1719                                    id: None,
1720                                    container: list.id(),
1721                                    side: pos.side,
1722                                    origin_pos: list.len(),
1723                                }),
1724                                current: AbsolutePosition {
1725                                    pos: list.len(),
1726                                    side: pos.side,
1727                                },
1728                            })
1729                        }
1730                        ContainerType::MovableList => {
1731                            let list = self.get_movable_list(&pos.container);
1732                            Ok(PosQueryResult {
1733                                update: Some(Cursor {
1734                                    id: None,
1735                                    container: list.id(),
1736                                    side: pos.side,
1737                                    origin_pos: list.len(),
1738                                }),
1739                                current: AbsolutePosition {
1740                                    pos: list.len(),
1741                                    side: pos.side,
1742                                },
1743                            })
1744                        }
1745                        ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1746                            unreachable!()
1747                        }
1748                        #[cfg(feature = "counter")]
1749                        ContainerType::Counter => unreachable!(),
1750                    }
1751                }
1752            });
1753            result
1754        }
1755    }
1756
1757    /// Free the history cache that is used for making checkout faster.
1758    ///
1759    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1760    /// You can free it by calling this method.
1761    pub fn free_history_cache(&self) {
1762        self.oplog.lock().free_history_cache();
1763    }
1764
1765    /// Free the cached diff calculator that is used for checkout.
1766    pub fn free_diff_calculator(&self) {
1767        *self.diff_calculator.lock() = DiffCalculator::new(true);
1768    }
1769
1770    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1771    /// You can free it by calling `free_history_cache`.
1772    pub fn has_history_cache(&self) -> bool {
1773        self.oplog.lock().has_history_cache()
1774    }
1775
1776    /// Encoded all ops and history cache to bytes and store them in the kv store.
1777    ///
1778    /// The parsed ops will be dropped
1779    #[inline]
1780    pub fn compact_change_store(&self) {
1781        self.with_barrier(|| {
1782            self.oplog.lock().compact_change_store();
1783        });
1784    }
1785
    /// Analyze the container info of the doc.
    ///
    /// This is used for development and debugging. The work is delegated to
    /// `DocAnalysis::analyze`, which receives this document.
    #[inline]
    pub fn analyze(&self) -> DocAnalysis {
        DocAnalysis::analyze(self)
    }
1793
1794    /// Get the path from the root to the container
1795    pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1796        let mut state = self.state.lock();
1797        if state.arena.id_to_idx(id).is_none() {
1798            if !state.does_container_exist(id) {
1799                return None;
1800            }
1801            state.ensure_container(id);
1802        }
1803        let idx = state.arena.id_to_idx(id).unwrap();
1804        state.get_path(idx)
1805    }
1806
1807    #[instrument(skip(self))]
1808    pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1809        self.with_barrier(|| {
1810            let ans = match mode {
1811                ExportMode::Snapshot => export_fast_snapshot(self),
1812                ExportMode::Updates { from } => export_fast_updates(self, &from),
1813                ExportMode::UpdatesInRange { spans } => {
1814                    export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
1815                }
1816                ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1817                ExportMode::StateOnly(f) => match f {
1818                    Some(f) => export_state_only_snapshot(self, &f)?,
1819                    None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1820                },
1821                ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1822            };
1823            Ok(ans)
1824        })
1825    }
1826
1827    /// The doc only contains the history since the shallow history start version vector.
1828    ///
1829    /// This is empty if the doc is not shallow.
1830    ///
1831    /// The ops included by the shallow history start version vector are not in the doc.
1832    pub fn shallow_since_vv(&self) -> ImVersionVector {
1833        self.oplog().lock().shallow_since_vv().clone()
1834    }
1835
1836    pub fn shallow_since_frontiers(&self) -> Frontiers {
1837        self.oplog().lock().shallow_since_frontiers().clone()
1838    }
1839
1840    /// Check if the doc contains the full history.
1841    pub fn is_shallow(&self) -> bool {
1842        !self.oplog().lock().shallow_since_vv().is_empty()
1843    }
1844
1845    /// Get the number of operations in the pending transaction.
1846    ///
1847    /// The pending transaction is the one that is not committed yet. It will be committed
1848    /// after calling `doc.commit()`, `doc.export(mode)` or `doc.checkout(version)`.
1849    pub fn get_pending_txn_len(&self) -> usize {
1850        if let Some(txn) = self.txn.lock().as_ref() {
1851            txn.len()
1852        } else {
1853            0
1854        }
1855    }
1856
1857    #[inline]
1858    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1859        self.oplog().lock().dag.find_path(from, to)
1860    }
1861
1862    /// Subscribe to the first commit from a peer. Operations performed on the `LoroDoc` within this callback
1863    /// will be merged into the current commit.
1864    ///
1865    /// This is useful for managing the relationship between `PeerID` and user information.
1866    /// For example, you could store user names in a `LoroMap` using `PeerID` as the key and the `UserID` as the value.
1867    pub fn subscribe_first_commit_from_peer(
1868        &self,
1869        callback: FirstCommitFromPeerCallback,
1870    ) -> Subscription {
1871        let (s, enable) = self
1872            .first_commit_from_peer_subs
1873            .inner()
1874            .insert((), callback);
1875        enable();
1876        s
1877    }
1878
1879    /// Subscribe to the pre-commit event.
1880    ///
1881    /// The callback will be called when the changes are committed but not yet applied to the OpLog.
1882    /// You can modify the commit message and timestamp in the callback by [`ChangeModifier`].
1883    pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1884        let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1885        enable();
1886        s
1887    }
1888}
1889
/// Errors returned by [`LoroDoc::travel_change_ancestors`].
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// A requested id is not included in the oplog's version vector.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// A requested id is covered by the shallow-history start version vector,
    /// so the doc does not hold that part of the history.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
1897
1898impl LoroDoc {
1899    pub fn travel_change_ancestors(
1900        &self,
1901        ids: &[ID],
1902        f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1903    ) -> Result<(), ChangeTravelError> {
1904        let (options, guard) = self.implicit_commit_then_stop();
1905        drop(guard);
1906        struct PendingNode(ChangeMeta);
1907        impl PartialEq for PendingNode {
1908            fn eq(&self, other: &Self) -> bool {
1909                self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1910            }
1911        }
1912
1913        impl Eq for PendingNode {}
1914        impl PartialOrd for PendingNode {
1915            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1916                Some(self.cmp(other))
1917            }
1918        }
1919
1920        impl Ord for PendingNode {
1921            fn cmp(&self, other: &Self) -> Ordering {
1922                self.0
1923                    .lamport_last()
1924                    .cmp(&other.0.lamport_last())
1925                    .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1926            }
1927        }
1928
1929        for id in ids {
1930            let op_log = &self.oplog().lock();
1931            if !op_log.vv().includes_id(*id) {
1932                return Err(ChangeTravelError::TargetIdNotFound(*id));
1933            }
1934            if op_log.dag.shallow_since_vv().includes_id(*id) {
1935                return Err(ChangeTravelError::TargetVersionNotIncluded);
1936            }
1937        }
1938
1939        let mut visited = FxHashSet::default();
1940        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1941        for id in ids {
1942            pending.push(PendingNode(ChangeMeta::from_change(
1943                &self.oplog().lock().get_change_at(*id).unwrap(),
1944            )));
1945        }
1946        while let Some(PendingNode(node)) = pending.pop() {
1947            let deps = node.deps.clone();
1948            if f(node).is_break() {
1949                break;
1950            }
1951
1952            for dep in deps.iter() {
1953                let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
1954                    continue;
1955                };
1956                if visited.contains(&dep_node.id) {
1957                    continue;
1958                }
1959
1960                visited.insert(dep_node.id);
1961                pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1962            }
1963        }
1964
1965        let ans = Ok(());
1966        self.renew_txn_if_auto_commit(options);
1967        ans
1968    }
1969
1970    pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1971        self.with_barrier(|| {
1972            let mut set = FxHashSet::default();
1973            {
1974                let oplog = self.oplog().lock();
1975                for op in oplog.iter_ops(id.to_span(len)) {
1976                    let id = oplog.arena.get_container_id(op.container()).unwrap();
1977                    set.insert(id);
1978                }
1979            }
1980            set
1981        })
1982    }
1983
1984    pub fn delete_root_container(&self, cid: ContainerID) {
1985        if !cid.is_root() {
1986            return;
1987        }
1988
1989        // Do not treat "not in arena" as non-existence; consult state/kv
1990        if !self.has_container(&cid) {
1991            return;
1992        }
1993
1994        let Some(h) = self.get_handler(cid.clone()) else {
1995            return;
1996        };
1997
1998        if let Err(e) = h.clear() {
1999            eprintln!("Failed to clear handler: {:?}", e);
2000            return;
2001        }
2002        self.config.deleted_root_containers.lock().insert(cid);
2003    }
2004
2005    pub fn set_hide_empty_root_containers(&self, hide: bool) {
2006        self.config
2007            .hide_empty_root_containers
2008            .store(hide, std::sync::atomic::Ordering::Relaxed);
2009    }
2010}
2011
2012// FIXME: PERF: This method is quite slow because it iterates all the changes
2013fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2014    let start_vv = oplog
2015        .dag
2016        .frontiers_to_vv(&id.into())
2017        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2018    for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
2019        for op in change.ops.iter().rev() {
2020            if op.container != idx {
2021                continue;
2022            }
2023            if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2024                if d.id_start.to_span(d.atom_len()).contains(id) {
2025                    return Some(ID::new(change.peer(), op.counter));
2026                }
2027            }
2028        }
2029    }
2030
2031    None
2032}
2033
/// Guard that commits the document's pending transaction when it is dropped.
///
/// On drop, `default_options` is installed as the default options of the
/// pending transaction (if there is one) and `commit_then_renew` is called on
/// `doc`.
#[derive(Debug)]
pub struct CommitWhenDrop<'a> {
    /// The document that is committed on drop.
    doc: &'a LoroDoc,
    /// Options handed to the pending transaction right before the commit.
    default_options: CommitOptions,
}
2039
2040impl Drop for CommitWhenDrop<'_> {
2041    fn drop(&mut self) {
2042        {
2043            let mut guard = self.doc.txn.lock();
2044            if let Some(txn) = guard.as_mut() {
2045                txn.set_default_options(std::mem::take(&mut self.default_options));
2046            };
2047        }
2048
2049        self.doc.commit_then_renew();
2050    }
2051}
2052
/// Options for configuring a commit operation.
///
/// Build one with [`CommitOptions::new`] and the chainable setters, or adjust
/// an existing value through `set_origin` / `set_timestamp`.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// Origin identifier for the commit event, used to track the source of changes.
    /// It doesn't persist.
    pub origin: Option<InternalString>,

    /// Whether to immediately start a new transaction after committing.
    /// Defaults to true.
    pub immediate_renew: bool,

    /// Custom timestamp for the commit in seconds since Unix epoch.
    /// If None, the current time will be used.
    pub timestamp: Option<Timestamp>,

    /// Optional commit message to attach to the changes. It will be persisted.
    pub commit_msg: Option<Arc<str>>,
}
2071
2072impl CommitOptions {
2073    /// Creates a new CommitOptions with default values.
2074    pub fn new() -> Self {
2075        Self {
2076            origin: None,
2077            immediate_renew: true,
2078            timestamp: None,
2079            commit_msg: None,
2080        }
2081    }
2082
2083    /// Sets the origin identifier for this commit.
2084    pub fn origin(mut self, origin: &str) -> Self {
2085        self.origin = Some(origin.into());
2086        self
2087    }
2088
2089    /// Sets whether to immediately start a new transaction after committing.
2090    pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2091        self.immediate_renew = immediate_renew;
2092        self
2093    }
2094
2095    /// Set the timestamp of the commit.
2096    ///
2097    /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
2098    pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2099        self.timestamp = Some(timestamp);
2100        self
2101    }
2102
2103    /// Sets a commit message to be attached to the changes.
2104    pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2105        self.commit_msg = Some(commit_msg.into());
2106        self
2107    }
2108
2109    /// Sets the origin identifier for this commit.
2110    pub fn set_origin(&mut self, origin: Option<&str>) {
2111        self.origin = origin.map(|x| x.into())
2112    }
2113
2114    /// Sets the timestamp for this commit.
2115    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2116        self.timestamp = timestamp;
2117    }
2118}
2119
impl Default for CommitOptions {
    /// Same as [`CommitOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}
2125
#[cfg(test)]
mod test {
    use std::panic::AssertUnwindSafe;

    use crate::{cursor::PosType, loro::ExportMode, version::Frontiers, LoroDoc, ToJson};
    use loro_common::ID;

    /// Compile-time check that `LoroDoc` is `Send + Sync`.
    #[test]
    fn test_sync() {
        fn is_send_sync<T: Send + Sync>(_v: T) {}
        let loro = super::LoroDoc::new();
        is_send_sync(loro)
    }

    /// Checks the deep value of two docs after checking out several versions.
    #[test]
    fn test_checkout() {
        let loro = LoroDoc::new();
        loro.set_peer_id(1).unwrap();
        let text = loro.get_text("text");
        let map = loro.get_map("map");
        let list = loro.get_list("list");
        let mut txn = loro.txn().unwrap();
        // Each iteration performs a map insert, a text insert and a list insert,
        // in that order; the ids checked out below rely on this ordering.
        for i in 0..10 {
            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
            text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
                .unwrap();
            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
        }
        txn.commit().unwrap();
        // `b` gets the same history through a snapshot.
        let b = LoroDoc::new();
        b.import(&loro.export(ExportMode::Snapshot).unwrap())
            .unwrap();
        // Empty frontiers: every container is expected to be empty.
        loro.checkout(&Frontiers::default()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(
                json.to_json_value(),
                serde_json::json!({"text":"","list":[],"map":{}})
            );
        }

        // Counter 2 of peer 1: the state after the first loop iteration.
        b.checkout(&ID::new(1, 2).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(
                json.to_json_value(),
                serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
            );
        }

        // Counter 3 of peer 1: only the map insert of the second iteration is applied.
        loro.checkout(&ID::new(1, 3).into()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(
                json.to_json_value(),
                serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
            );
        }

        // Counter 29 of peer 1: the state after all ten iterations.
        b.checkout(&ID::new(1, 29).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(
                json.to_json_value(),
                serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
            );
        }
    }

    /// Imports a snapshot of an empty doc via `import_batch`, edits, and then
    /// exports all updates. NOTE(review): presumably a regression test for
    /// issue 181, going by the name; confirm against the tracker.
    #[test]
    fn import_batch_err_181() {
        let a = LoroDoc::new_auto_commit();
        let update_a = a.export(ExportMode::Snapshot);
        let b = LoroDoc::new_auto_commit();
        b.import_batch(&[update_a.unwrap()]).unwrap();
        b.get_text("text")
            .insert(0, "hello", PosType::Unicode)
            .unwrap();
        b.commit_then_renew();
        // Acquire and immediately release the oplog lock before exporting.
        let oplog = b.oplog().lock();
        drop(oplog);
        b.export(ExportMode::all_updates()).unwrap();
    }

    /// After a panic while the oplog lock is held, later operations that need
    /// the lock must panic with a message mentioning the poisoned mutex.
    #[test]
    fn poisoned_mutex_keeps_follow_up_operations_failed() {
        let doc = LoroDoc::new();
        let oplog = doc.oplog.clone();
        // Panic while holding the guard.
        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            let _guard = oplog.lock();
            panic!("poison oplog");
        }));

        let err = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
            .expect_err("poisoned lock should continue to fail fast");
        // A panic payload is either a `&str` or a `String`.
        let msg = if let Some(msg) = err.downcast_ref::<&str>() {
            (*msg).to_string()
        } else if let Some(msg) = err.downcast_ref::<String>() {
            msg.clone()
        } else {
            String::new()
        };
        assert!(msg.contains("poisoned LoroMutex"), "{msg}");
    }
}