loro_internal/
loro.rs

1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8    arena::SharedArena,
9    change::Timestamp,
10    configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11    container::{
12        idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13        IntoContainerId,
14    },
15    cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16    dag::{Dag, DagUtils},
17    diff_calc::DiffCalculator,
18    encoding::{
19        self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20        export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
21        export_state_only_snapshot,
22        json_schema::{encode_change_to_json, json::JsonSchema},
23        parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24    },
25    event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26    handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27    id::PeerID,
28    json::JsonChange,
29    op::InnerContent,
30    oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31    state::DocState,
32    subscription::{LocalUpdateCallback, Observer, Subscriber},
33    undo::DiffBatch,
34    utils::subscription::{SubscriberSetWithQueue, Subscription},
35    version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36    ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37    VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42    lock::{LoroLockGroup, LoroMutex},
43    txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47    ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48    LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53    borrow::Cow,
54    cmp::Ordering,
55    collections::{hash_map::Entry, BinaryHeap},
56    ops::ControlFlow,
57    sync::{
58        atomic::Ordering::{Acquire, Release},
59        Arc,
60    },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("LoroDoc")
73            .field("config", &self.config)
74            .field("auto_commit", &self.auto_commit)
75            .field("detached", &self.detached)
76            .finish()
77    }
78}
79
80impl LoroDoc {
81    /// Run the provided closure within a commit barrier.
82    ///
83    /// This finalizes any pending auto-commit transaction first (preserving
84    /// options across an empty txn), executes `f`, then renews the transaction
85    /// (carrying preserved options) if auto-commit is enabled. This is the
86    /// common implicit-commit pattern used by internal operations such as
87    /// import/export/checkouts.
88    #[inline]
89    pub fn with_barrier<F, R>(&self, f: F) -> R
90    where
91        F: FnOnce() -> R,
92    {
93        let (options, guard) = self.implicit_commit_then_stop();
94        let result = f();
95        drop(guard);
96        self.renew_txn_if_auto_commit(options);
97        result
98    }
99
100    pub fn new() -> Self {
101        let oplog = OpLog::new();
102        let arena = oplog.arena.clone();
103        let config: Configure = oplog.configure.clone();
104        let lock_group = LoroLockGroup::new();
105        let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
106        let inner = Arc::new_cyclic(|w| {
107            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
108            LoroDocInner {
109                oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
110                state,
111                config,
112                detached: AtomicBool::new(false),
113                auto_commit: AtomicBool::new(false),
114                observer: Arc::new(Observer::new(arena.clone())),
115                diff_calculator: Arc::new(
116                    lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
117                ),
118                txn: global_txn,
119                arena,
120                local_update_subs: SubscriberSetWithQueue::new(),
121                peer_id_change_subs: SubscriberSetWithQueue::new(),
122                pre_commit_subs: SubscriberSetWithQueue::new(),
123                first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
124            }
125        });
126        LoroDoc { inner }
127    }
128
129    pub fn fork(&self) -> Self {
130        if self.is_detached() {
131            return self.fork_at(&self.state_frontiers());
132        }
133
134        let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
135        let doc = Self::new();
136        encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default()).unwrap();
137        doc.set_config(&self.config);
138        if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
139            doc.start_auto_commit();
140        }
141        doc
142    }
143    /// Enables editing of the document in detached mode.
144    ///
145    /// By default, the document cannot be edited in detached mode (after calling
146    /// `detach` or checking out a version other than the latest). This method
147    /// allows editing in detached mode.
148    ///
149    /// # Important Notes:
150    ///
151    /// - After enabling this mode, the document will use a different PeerID. Each
152    ///   time you call checkout, a new PeerID will be used.
153    /// - If you set a custom PeerID while this mode is enabled, ensure that
154    ///   concurrent operations with the same PeerID are not possible.
155    /// - On detached mode, importing will not change the state of the document.
156    ///   It also doesn't change the version of the [DocState]. The changes will be
157    ///   recorded into [OpLog] only. You need to call `checkout` to make it take effect.
158    pub fn set_detached_editing(&self, enable: bool) {
159        self.config.set_detached_editing(enable);
160        if enable && self.is_detached() {
161            self.with_barrier(|| {
162                self.renew_peer_id();
163            });
164        }
165    }
166
167    /// Create a doc with auto commit enabled.
168    #[inline]
169    pub fn new_auto_commit() -> Self {
170        let doc = Self::new();
171        doc.start_auto_commit();
172        doc
173    }
174
175    #[inline(always)]
176    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
177        if peer == PeerID::MAX {
178            return Err(LoroError::InvalidPeerID);
179        }
180        let next_id = self.oplog.lock().unwrap().next_id(peer);
181        if self.auto_commit.load(Acquire) {
182            let doc_state = self.state.lock().unwrap();
183            doc_state
184                .peer
185                .store(peer, std::sync::atomic::Ordering::Relaxed);
186
187            if doc_state.is_in_txn() {
188                drop(doc_state);
189                // Use implicit-style barrier to avoid swallowing next-commit options
190                self.with_barrier(|| {});
191            }
192            self.peer_id_change_subs.emit(&(), next_id);
193            return Ok(());
194        }
195
196        let doc_state = self.state.lock().unwrap();
197        if doc_state.is_in_txn() {
198            return Err(LoroError::TransactionError(
199                "Cannot change peer id during transaction"
200                    .to_string()
201                    .into_boxed_str(),
202            ));
203        }
204
205        doc_state
206            .peer
207            .store(peer, std::sync::atomic::Ordering::Relaxed);
208        drop(doc_state);
209        self.peer_id_change_subs.emit(&(), next_id);
210        Ok(())
211    }
212
213    /// Renews the PeerID for the document.
214    pub(crate) fn renew_peer_id(&self) {
215        let peer_id = DefaultRandom.next_u64();
216        self.set_peer_id(peer_id).unwrap();
217    }
218
219    /// Implicitly commit the cumulative auto-commit transaction.
220    /// This method only has effect when `auto_commit` is true.
221    ///
222    /// Follow-ups: the caller is responsible for renewing the transaction
223    /// as needed (e.g., via `renew_txn_if_auto_commit`). Prefer using
224    /// `with_barrier(...)` for most internal flows to handle this safely.
225    ///
226    /// Empty-commit behavior: if the pending transaction is empty, the returned
227    /// `Some(CommitOptions)` preserves next-commit options such as message and
228    /// timestamp so they can carry into the renewed transaction. Transient
229    /// labels like `origin` do not carry across an empty commit.
230    #[inline]
231    #[must_use]
232    pub fn implicit_commit_then_stop(
233        &self,
234    ) -> (
235        Option<CommitOptions>,
236        LoroMutexGuard<'_, Option<Transaction>>,
237    ) {
238        // Implicit commit: preserve options on empty commit
239        let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
240        (a, b.unwrap())
241    }
242
243    /// Commit the cumulative auto commit transaction.
244    /// It will start the next one immediately
245    ///
246    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
247    #[inline]
248    pub fn commit_then_renew(&self) -> Option<CommitOptions> {
249        // Explicit commit: swallow options on empty commit
250        self.commit_internal(CommitOptions::new().immediate_renew(true), false)
251            .0
252    }
253
254    /// This method is called before the commit.
255    /// It can be used to modify the change before it is committed.
256    ///
257    /// It return Some(txn) if the txn is None
258    fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
259        let mut txn_guard = self.txn.lock().unwrap();
260        let Some(txn) = txn_guard.as_mut() else {
261            return Some(txn_guard);
262        };
263
264        if txn.is_peer_first_appearance {
265            txn.is_peer_first_appearance = false;
266            drop(txn_guard);
267            // First commit from a peer
268            self.first_commit_from_peer_subs.emit(
269                &(),
270                FirstCommitFromPeerPayload {
271                    peer: self.peer_id(),
272                },
273            );
274        }
275
276        None
277    }
278
279    /// Core implementation for committing the cumulative auto-commit transaction.
280    ///
281    /// - When `preserve_on_empty` is true (implicit commits like export/checkout),
282    ///   commit options from an empty transaction are carried over to the next transaction
283    ///   (except `origin`, which never carries across an empty commit).
284    /// - When `preserve_on_empty` is false (explicit commits), commit options from an
285    ///   empty transaction are swallowed and NOT carried over.
286    #[instrument(skip_all)]
287    fn commit_internal(
288        &self,
289        config: CommitOptions,
290        preserve_on_empty: bool,
291    ) -> (
292        Option<CommitOptions>,
293        Option<LoroMutexGuard<'_, Option<Transaction>>>,
294    ) {
295        if !self.auto_commit.load(Acquire) {
296            let txn_guard = self.txn.lock().unwrap();
297            // if not auto_commit, nothing should happen
298            // because the global txn is not used
299            return (None, Some(txn_guard));
300        }
301
302        loop {
303            if let Some(txn_guard) = self.before_commit() {
304                return (None, Some(txn_guard));
305            }
306
307            let mut txn_guard = self.txn.lock().unwrap();
308            let txn = txn_guard.take();
309            let Some(mut txn) = txn else {
310                return (None, Some(txn_guard));
311            };
312            let on_commit = txn.take_on_commit();
313            if let Some(origin) = config.origin.clone() {
314                txn.set_origin(origin);
315            }
316
317            if let Some(timestamp) = config.timestamp {
318                txn.set_timestamp(timestamp);
319            }
320
321            if let Some(msg) = config.commit_msg.as_ref() {
322                txn.set_msg(Some(msg.clone()));
323            }
324
325            let id_span = txn.id_span();
326            let mut options = txn.commit().unwrap();
327            // Empty commit returns Some(options). We may preserve parts of it for implicit commits.
328            if let Some(opts) = options.as_mut() {
329                // `origin` is an event-only label and never carries across an empty commit
330                if config.origin.is_some() {
331                    opts.set_origin(None);
332                }
333                // For explicit commits, swallow options from empty commit entirely
334                if !preserve_on_empty {
335                    options = None;
336                }
337            }
338            if config.immediate_renew {
339                assert!(self.can_edit());
340                let mut t = self.txn().unwrap();
341                if let Some(options) = options.as_ref() {
342                    t.set_options(options.clone());
343                }
344                *txn_guard = Some(t);
345            }
346
347            if let Some(on_commit) = on_commit {
348                drop(txn_guard);
349                on_commit(&self.state, &self.oplog, id_span);
350                txn_guard = self.txn.lock().unwrap();
351                if !config.immediate_renew && txn_guard.is_some() {
352                    // make sure that txn_guard is None when config.immediate_renew is false
353                    continue;
354                }
355            }
356
357            return (
358                options,
359                if !config.immediate_renew {
360                    Some(txn_guard)
361                } else {
362                    None
363                },
364            );
365        }
366    }
367
368    /// Commit the cumulative auto commit transaction (explicit API).
369    ///
370    /// This is used by user-facing explicit commits. If the transaction is empty,
371    /// any provided commit options are swallowed and will NOT carry over.
372    #[instrument(skip_all)]
373    pub fn commit_with(
374        &self,
375        config: CommitOptions,
376    ) -> (
377        Option<CommitOptions>,
378        Option<LoroMutexGuard<'_, Option<Transaction>>>,
379    ) {
380        self.commit_internal(config, false)
381    }
382
383    /// Set the commit message of the next commit
384    pub fn set_next_commit_message(&self, message: &str) {
385        let mut binding = self.txn.lock().unwrap();
386        let Some(txn) = binding.as_mut() else {
387            return;
388        };
389
390        if message.is_empty() {
391            txn.set_msg(None)
392        } else {
393            txn.set_msg(Some(message.into()))
394        }
395    }
396
397    /// Set the origin of the next commit
398    pub fn set_next_commit_origin(&self, origin: &str) {
399        let mut txn = self.txn.lock().unwrap();
400        if let Some(txn) = txn.as_mut() {
401            txn.set_origin(origin.into());
402        }
403    }
404
405    /// Set the timestamp of the next commit
406    pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
407        let mut txn = self.txn.lock().unwrap();
408        if let Some(txn) = txn.as_mut() {
409            txn.set_timestamp(timestamp);
410        }
411    }
412
413    /// Set the options of the next commit
414    pub fn set_next_commit_options(&self, options: CommitOptions) {
415        let mut txn = self.txn.lock().unwrap();
416        if let Some(txn) = txn.as_mut() {
417            txn.set_options(options);
418        }
419    }
420
421    /// Clear the options of the next commit
422    pub fn clear_next_commit_options(&self) {
423        let mut txn = self.txn.lock().unwrap();
424        if let Some(txn) = txn.as_mut() {
425            txn.set_options(CommitOptions::new());
426        }
427    }
428
429    /// Set whether to record the timestamp of each change. Default is `false`.
430    ///
431    /// If enabled, the Unix timestamp will be recorded for each change automatically.
432    ///
433    /// You can also set each timestamp manually when you commit a change.
434    /// The timestamp manually set will override the automatic one.
435    ///
436    /// NOTE: Timestamps are forced to be in ascending order.
437    /// If you commit a new change with a timestamp that is less than the existing one,
438    /// the largest existing timestamp will be used instead.
439    #[inline]
440    pub fn set_record_timestamp(&self, record: bool) {
441        self.config.set_record_timestamp(record);
442    }
443
444    /// Set the interval of mergeable changes, in seconds.
445    ///
446    /// If two continuous local changes are within the interval, they will be merged into one change.
447    /// The default value is 1000 seconds.
448    #[inline]
449    pub fn set_change_merge_interval(&self, interval: i64) {
450        self.config.set_merge_interval(interval);
451    }
452
453    pub fn can_edit(&self) -> bool {
454        !self.is_detached() || self.config.detached_editing()
455    }
456
457    pub fn is_detached_editing_enabled(&self) -> bool {
458        self.config.detached_editing()
459    }
460
461    #[inline]
462    pub fn config_text_style(&self, text_style: StyleConfigMap) {
463        self.config.text_style_config.try_write().unwrap().map = text_style.map;
464    }
465
466    #[inline]
467    pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
468        self.config
469            .text_style_config
470            .try_write()
471            .unwrap()
472            .default_style = text_style;
473    }
474    pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
475        let doc = Self::new();
476        let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
477        if mode.is_snapshot() {
478            doc.with_barrier(|| -> Result<(), LoroError> {
479                decode_snapshot(&doc, mode, body, Default::default())?;
480                Ok(())
481            })?;
482            Ok(doc)
483        } else {
484            Err(LoroError::DecodeError(
485                "Invalid encode mode".to_string().into(),
486            ))
487        }
488    }
489
490    /// Is the document empty? (no ops)
491    #[inline(always)]
492    pub fn can_reset_with_snapshot(&self) -> bool {
493        let oplog = self.oplog.lock().unwrap();
494        if oplog.batch_importing {
495            return false;
496        }
497
498        if self.is_detached() {
499            return false;
500        }
501
502        oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
503    }
504
505    /// Whether [OpLog] and [DocState] are detached.
506    ///
507    /// If so, the document is in readonly mode by default and importing will not change the state of the document.
508    /// It also doesn't change the version of the [DocState]. The changes will be recorded into [OpLog] only.
509    /// You need to call `checkout` to make it take effect.
510    #[inline(always)]
511    pub fn is_detached(&self) -> bool {
512        self.detached.load(Acquire)
513    }
514
515    pub(crate) fn set_detached(&self, detached: bool) {
516        self.detached.store(detached, Release);
517    }
518
519    #[inline(always)]
520    pub fn peer_id(&self) -> PeerID {
521        self.state
522            .lock()
523            .unwrap()
524            .peer
525            .load(std::sync::atomic::Ordering::Relaxed)
526    }
527
528    #[inline(always)]
529    pub fn detach(&self) {
530        self.with_barrier(|| self.set_detached(true));
531    }
532
533    #[inline(always)]
534    pub fn attach(&self) {
535        self.checkout_to_latest()
536    }
537
538    /// Get the timestamp of the current state.
539    /// It's the last edit time of the [DocState].
540    pub fn state_timestamp(&self) -> Timestamp {
541        // Acquire locks in correct order: read frontiers first, then query OpLog.
542        let f = { self.state.lock().unwrap().frontiers.clone() };
543        self.oplog.lock().unwrap().get_timestamp_of_version(&f)
544    }
545
546    #[inline(always)]
547    pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
548        &self.state
549    }
550
551    #[inline]
552    pub fn get_state_deep_value(&self) -> LoroValue {
553        self.state.lock().unwrap().get_deep_value()
554    }
555
556    #[inline(always)]
557    pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
558        &self.oplog
559    }
560
561    pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
562        self.with_barrier(|| self.oplog.lock().unwrap().export_from(vv))
563    }
564
565    #[inline(always)]
566    pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
567        let s = debug_span!("import", peer = self.peer_id());
568        let _e = s.enter();
569        self.import_with(bytes, Default::default())
570    }
571
572    #[inline]
573    pub fn import_with(
574        &self,
575        bytes: &[u8],
576        origin: InternalString,
577    ) -> Result<ImportStatus, LoroError> {
578        self.with_barrier(|| self._import_with(bytes, origin))
579    }
580
581    #[tracing::instrument(skip_all)]
582    fn _import_with(
583        &self,
584        bytes: &[u8],
585        origin: InternalString,
586    ) -> Result<ImportStatus, LoroError> {
587        ensure_cov::notify_cov("loro_internal::import");
588        let parsed = parse_header_and_body(bytes, true)?;
589        loro_common::info!("Importing with mode={:?}", &parsed.mode);
590        let result = match parsed.mode {
591            EncodeMode::OutdatedRle => {
592                if self.state.lock().unwrap().is_in_txn() {
593                    return Err(LoroError::ImportWhenInTxn);
594                }
595
596                let s = tracing::span!(
597                    tracing::Level::INFO,
598                    "Import updates ",
599                    peer = self.peer_id()
600                );
601                let _e = s.enter();
602                self.update_oplog_and_apply_delta_to_state_if_needed(
603                    |oplog| oplog.decode(parsed),
604                    origin,
605                )
606            }
607            EncodeMode::OutdatedSnapshot => {
608                if self.can_reset_with_snapshot() {
609                    loro_common::info!("Init by snapshot {}", self.peer_id());
610                    decode_snapshot(self, parsed.mode, parsed.body, origin)
611                } else {
612                    self.update_oplog_and_apply_delta_to_state_if_needed(
613                        |oplog| oplog.decode(parsed),
614                        origin,
615                    )
616                }
617            }
618            EncodeMode::FastSnapshot => {
619                if self.can_reset_with_snapshot() {
620                    ensure_cov::notify_cov("loro_internal::import::snapshot");
621                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
622                    decode_snapshot(self, parsed.mode, parsed.body, origin)
623                } else {
624                    self.update_oplog_and_apply_delta_to_state_if_needed(
625                        |oplog| oplog.decode(parsed),
626                        origin,
627                    )
628
629                    // let new_doc = LoroDoc::new();
630                    // new_doc.import(bytes)?;
631                    // let updates = new_doc.export_from(&self.oplog_vv());
632                    // return self.import_with(updates.as_slice(), origin);
633                }
634            }
635            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
636                |oplog| oplog.decode(parsed),
637                origin,
638            ),
639            EncodeMode::Auto => {
640                unreachable!()
641            }
642        };
643
644        self.emit_events();
645
646        result
647    }
648
649    #[tracing::instrument(skip_all)]
650    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
651        &self,
652        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
653        origin: InternalString,
654    ) -> Result<ImportStatus, LoroError> {
655        let mut oplog = self.oplog.lock().unwrap();
656        if !self.is_detached() {
657            let old_vv = oplog.vv().clone();
658            let old_frontiers = oplog.frontiers().clone();
659            let result = f(&mut oplog);
660            if &old_vv != oplog.vv() {
661                let mut diff = DiffCalculator::new(false);
662                let (diff, diff_mode) = diff.calc_diff_internal(
663                    &oplog,
664                    &old_vv,
665                    &old_frontiers,
666                    oplog.vv(),
667                    oplog.dag.get_frontiers(),
668                    None,
669                );
670                let mut state = self.state.lock().unwrap();
671                state.apply_diff(
672                    InternalDocDiff {
673                        origin,
674                        diff: (diff).into(),
675                        by: EventTriggerKind::Import,
676                        new_version: Cow::Owned(oplog.frontiers().clone()),
677                    },
678                    diff_mode,
679                );
680            }
681            result
682        } else {
683            f(&mut oplog)
684        }
685    }
686
687    fn emit_events(&self) {
688        // we should not hold the lock when emitting events
689        let events = {
690            let mut state = self.state.lock().unwrap();
691            state.take_events()
692        };
693        for event in events {
694            self.observer.emit(event);
695        }
696    }
697
698    pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
699        let mut state = self.state.lock().unwrap();
700        state.take_events()
701    }
702
703    #[instrument(skip_all)]
704    pub fn export_snapshot(&self) -> Result<Vec<u8>, LoroEncodeError> {
705        if self.is_shallow() {
706            return Err(LoroEncodeError::ShallowSnapshotIncompatibleWithOldFormat);
707        }
708        let ans = self.with_barrier(|| export_snapshot(self));
709        Ok(ans)
710    }
711
712    /// Import the json schema updates.
713    ///
714    /// only supports backward compatibility but not forward compatibility.
715    #[tracing::instrument(skip_all)]
716    pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
717        let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
718        self.with_barrier(|| {
719            let result = self.update_oplog_and_apply_delta_to_state_if_needed(
720                |oplog| crate::encoding::json_schema::import_json(oplog, json),
721                Default::default(),
722            );
723            self.emit_events();
724            result
725        })
726    }
727
728    pub fn export_json_updates(
729        &self,
730        start_vv: &VersionVector,
731        end_vv: &VersionVector,
732        with_peer_compression: bool,
733    ) -> JsonSchema {
734        self.with_barrier(|| {
735            let oplog = self.oplog.lock().unwrap();
736            let mut start_vv = start_vv;
737            let _temp: Option<VersionVector>;
738            if !oplog.dag.shallow_since_vv().is_empty() {
739                // Make sure that start_vv >= shallow_since_vv
740                let mut include_all = true;
741                for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
742                    if start_vv.get(peer).unwrap_or(&0) < counter {
743                        include_all = false;
744                        break;
745                    }
746                }
747                if !include_all {
748                    let mut vv = start_vv.clone();
749                    for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
750                        vv.extend_to_include_end_id(ID::new(peer, counter));
751                    }
752                    _temp = Some(vv);
753                    start_vv = _temp.as_ref().unwrap();
754                }
755            }
756
757            crate::encoding::json_schema::export_json(
758                &oplog,
759                start_vv,
760                end_vv,
761                with_peer_compression,
762            )
763        })
764    }
765
766    pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
767        let oplog = self.oplog.lock().unwrap();
768        let mut changes = export_json_in_id_span(&oplog, id_span);
769        if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
770            let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
771            changes.push(change_json);
772        }
773        changes
774    }
775
776    /// Get the version vector of the current OpLog
777    #[inline]
778    pub fn oplog_vv(&self) -> VersionVector {
779        self.oplog.lock().unwrap().vv().clone()
780    }
781
782    /// Get the version vector of the current [DocState]
783    #[inline]
784    pub fn state_vv(&self) -> VersionVector {
785        let oplog = self.oplog.lock().unwrap();
786        let f = &self.state.lock().unwrap().frontiers;
787        oplog.dag.frontiers_to_vv(f).unwrap()
788    }
789
790    pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
791        let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
792        if let LoroValue::Container(c) = value {
793            Some(ValueOrHandler::Handler(Handler::new_attached(
794                c.clone(),
795                self.clone(),
796            )))
797        } else {
798            Some(ValueOrHandler::Value(value))
799        }
800    }
801
802    /// Get the handler by the string path.
803    pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
804        let path = str_to_path(path)?;
805        self.get_by_path(&path)
806    }
807
808    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
809        let arena = &self.arena;
810        let txn = self.txn.lock().unwrap();
811        let txn = txn.as_ref()?;
812        let ops_ = txn.local_ops();
813        let new_id = ID {
814            peer: *txn.peer(),
815            counter: ops_.first()?.counter,
816        };
817        let change = ChangeRef {
818            id: &new_id,
819            deps: txn.frontiers(),
820            timestamp: &txn
821                .timestamp()
822                .as_ref()
823                .copied()
824                .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
825            commit_msg: txn.msg(),
826            ops: ops_,
827            lamport: txn.lamport(),
828        };
829        let json = encode_change_to_json(change, arena);
830        Some(json)
831    }
832
833    #[inline]
834    pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
835        if self.has_container(&id) {
836            Some(Handler::new_attached(id, self.clone()))
837        } else {
838            None
839        }
840    }
841
842    /// id can be a str, ContainerID, or ContainerIdRaw.
843    /// if it's str it will use Root container, which will not be None
844    #[inline]
845    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
846        let id = id.into_container_id(&self.arena, ContainerType::Text);
847        assert!(self.has_container(&id));
848        Handler::new_attached(id, self.clone()).into_text().unwrap()
849    }
850
851    /// id can be a str, ContainerID, or ContainerIdRaw.
852    /// if it's str it will use Root container, which will not be None
853    #[inline]
854    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
855        let id = id.into_container_id(&self.arena, ContainerType::List);
856        assert!(self.has_container(&id));
857        Handler::new_attached(id, self.clone()).into_list().unwrap()
858    }
859
860    /// id can be a str, ContainerID, or ContainerIdRaw.
861    /// if it's str it will use Root container, which will not be None
862    #[inline]
863    pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
864        let id = id.into_container_id(&self.arena, ContainerType::MovableList);
865        assert!(self.has_container(&id));
866        Handler::new_attached(id, self.clone())
867            .into_movable_list()
868            .unwrap()
869    }
870
871    /// id can be a str, ContainerID, or ContainerIdRaw.
872    /// if it's str it will use Root container, which will not be None
873    #[inline]
874    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
875        let id = id.into_container_id(&self.arena, ContainerType::Map);
876        assert!(self.has_container(&id));
877        Handler::new_attached(id, self.clone()).into_map().unwrap()
878    }
879
880    /// id can be a str, ContainerID, or ContainerIdRaw.
881    /// if it's str it will use Root container, which will not be None
882    #[inline]
883    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
884        let id = id.into_container_id(&self.arena, ContainerType::Tree);
885        assert!(self.has_container(&id));
886        Handler::new_attached(id, self.clone()).into_tree().unwrap()
887    }
888
889    #[cfg(feature = "counter")]
890    pub fn get_counter<I: IntoContainerId>(
891        &self,
892        id: I,
893    ) -> crate::handler::counter::CounterHandler {
894        let id = id.into_container_id(&self.arena, ContainerType::Counter);
895        assert!(self.has_container(&id));
896        Handler::new_attached(id, self.clone())
897            .into_counter()
898            .unwrap()
899    }
900
901    #[must_use]
902    pub fn has_container(&self, id: &ContainerID) -> bool {
903        if id.is_root() {
904            return true;
905        }
906
907        let exist = self.state.lock().unwrap().does_container_exist(id);
908        exist
909    }
910
911    /// Undo the operations between the given id_span. It can be used even in a collaborative environment.
912    ///
913    /// This is an internal API. You should NOT use it directly.
914    ///
915    /// # Internal
916    ///
917    /// This method will use the diff calculator to calculate the diff required to time travel
918    /// from the end of id_span to the beginning of the id_span. Then it will convert the diff to
919    /// operations and apply them to the OpLog with a dep on the last id of the given id_span.
920    ///
921    /// This implementation is kinda slow, but it's simple and maintainable. We can optimize it
922    /// further when it's needed. The time complexity is O(n + m), n is the ops in the id_span, m is the
923    /// distance from id_span to the current latest version.
924    #[instrument(level = "info", skip_all)]
925    pub fn undo_internal(
926        &self,
927        id_span: IdSpan,
928        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
929        post_transform_base: Option<&DiffBatch>,
930        before_diff: &mut dyn FnMut(&DiffBatch),
931    ) -> LoroResult<CommitWhenDrop<'_>> {
932        if !self.can_edit() {
933            return Err(LoroError::EditWhenDetached);
934        }
935
936        let (options, txn) = self.implicit_commit_then_stop();
937        if !self
938            .oplog()
939            .lock()
940            .unwrap()
941            .vv()
942            .includes_id(id_span.id_last())
943        {
944            self.renew_txn_if_auto_commit(options);
945            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
946        }
947
948        let (was_recording, latest_frontiers) = {
949            let mut state = self.state.lock().unwrap();
950            let was_recording = state.is_recording();
951            state.stop_and_clear_recording();
952            (was_recording, state.frontiers.clone())
953        };
954
955        let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
956        let diff = crate::undo::undo(
957            spans,
958            match post_transform_base {
959                Some(d) => Either::Right(d),
960                None => Either::Left(&latest_frontiers),
961            },
962            |from, to| {
963                self._checkout_without_emitting(from, false, false).unwrap();
964                self.state.lock().unwrap().start_recording();
965                self._checkout_without_emitting(to, false, false).unwrap();
966                let mut state = self.state.lock().unwrap();
967                let e = state.take_events();
968                state.stop_and_clear_recording();
969                DiffBatch::new(e)
970            },
971            before_diff,
972        );
973
974        // println!("\nundo_internal: diff: {:?}", diff);
975        // println!("container remap: {:?}", container_remap);
976
977        self._checkout_without_emitting(&latest_frontiers, false, false)?;
978        self.set_detached(false);
979        if was_recording {
980            self.state.lock().unwrap().start_recording();
981        }
982        drop(txn);
983        self.start_auto_commit();
984        // Try applying the diff, but ignore the error if it happens.
985        // MovableList's undo behavior is too tricky to handle in a collaborative env
986        // so in edge cases this may be an Error
987        if let Err(e) = self._apply_diff(diff, container_remap, true) {
988            warn!("Undo Failed {:?}", e);
989        }
990
991        if let Some(options) = options {
992            self.set_next_commit_options(options);
993        }
994        Ok(CommitWhenDrop {
995            doc: self,
996            default_options: CommitOptions::new().origin("undo"),
997        })
998    }
999
1000    /// Generate a series of local operations that can revert the current doc to the target
1001    /// version.
1002    ///
1003    /// Internally, it will calculate the diff between the current state and the target state,
1004    /// and apply the diff to the current state.
1005    pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
1006        // TODO: test when the doc is readonly
1007        // TODO: test when the doc is detached but enabled editing
1008        let f = self.state_frontiers();
1009        let diff = self.diff(&f, target)?;
1010        self._apply_diff(diff, &mut Default::default(), false)
1011    }
1012
1013    /// Calculate the diff between two versions so that apply diff on a will make the state same as b.
1014    ///
1015    /// NOTE: This method will make the doc enter the **detached mode**.
1016    // FIXME: This method needs testing (no event should be emitted during processing this)
1017    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1018        {
1019            // check whether a and b are valid
1020            let oplog = self.oplog.lock().unwrap();
1021            for id in a.iter() {
1022                if !oplog.dag.contains(id) {
1023                    return Err(LoroError::FrontiersNotFound(id));
1024                }
1025            }
1026            for id in b.iter() {
1027                if !oplog.dag.contains(id) {
1028                    return Err(LoroError::FrontiersNotFound(id));
1029                }
1030            }
1031        }
1032
1033        let (options, txn) = self.implicit_commit_then_stop();
1034        let was_detached = self.is_detached();
1035        let old_frontiers = self.state_frontiers();
1036        let was_recording = {
1037            let mut state = self.state.lock().unwrap();
1038            let is_recording = state.is_recording();
1039            state.stop_and_clear_recording();
1040            is_recording
1041        };
1042        self._checkout_without_emitting(a, true, false).unwrap();
1043        self.state.lock().unwrap().start_recording();
1044        self._checkout_without_emitting(b, true, false).unwrap();
1045        let e = {
1046            let mut state = self.state.lock().unwrap();
1047            let e = state.take_events();
1048            state.stop_and_clear_recording();
1049            e
1050        };
1051        self._checkout_without_emitting(&old_frontiers, false, false)
1052            .unwrap();
1053        drop(txn);
1054        if !was_detached {
1055            self.set_detached(false);
1056            self.renew_txn_if_auto_commit(options);
1057        }
1058        if was_recording {
1059            self.state.lock().unwrap().start_recording();
1060        }
1061        Ok(DiffBatch::new(e))
1062    }
1063
1064    /// Apply a diff to the current state.
1065    #[inline(always)]
1066    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1067        self._apply_diff(diff, &mut Default::default(), true)
1068    }
1069
1070    /// Apply a diff to the current state.
1071    ///
1072    /// This method will not recreate containers with the same [ContainerID]s.
1073    /// While this can be convenient in certain cases, it can break several internal invariants:
1074    ///
1075    /// 1. Each container should appear only once in the document. Allowing containers with the same ID
1076    ///    would result in multiple instances of the same container in the document.
1077    /// 2. Unreachable containers should be removable from the state when necessary.
1078    ///
1079    /// However, the diff may contain operations that depend on container IDs.
1080    /// Therefore, users need to provide a `container_remap` to record and retrieve the container ID remapping.
1081    pub(crate) fn _apply_diff(
1082        &self,
1083        diff: DiffBatch,
1084        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1085        skip_unreachable: bool,
1086    ) -> LoroResult<()> {
1087        if !self.can_edit() {
1088            return Err(LoroError::EditWhenDetached);
1089        }
1090
1091        let mut ans: LoroResult<()> = Ok(());
1092        let mut missing_containers: Vec<ContainerID> = Vec::new();
1093        for (mut id, diff) in diff.into_iter() {
1094            let mut remapped = false;
1095            while let Some(rid) = container_remap.get(&id) {
1096                remapped = true;
1097                id = rid.clone();
1098            }
1099
1100            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1101                // Not in arena does not imply non-existent; consult state/kv and register lazily
1102                let exists = self.state.lock().unwrap().does_container_exist(&id);
1103                if !exists {
1104                    missing_containers.push(id);
1105                    continue;
1106                }
1107                // Ensure registration so handlers can be created
1108                self.state.lock().unwrap().ensure_container(&id);
1109            }
1110
1111            if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
1112                continue;
1113            }
1114
1115            let Some(h) = self.get_handler(id.clone()) else {
1116                return Err(LoroError::ContainersNotFound {
1117                    containers: Box::new(vec![id]),
1118                });
1119            };
1120            if let Err(e) = h.apply_diff(diff, container_remap) {
1121                ans = Err(e);
1122            }
1123        }
1124
1125        if !missing_containers.is_empty() {
1126            return Err(LoroError::ContainersNotFound {
1127                containers: Box::new(missing_containers),
1128            });
1129        }
1130
1131        ans
1132    }
1133
1134    /// This is for debugging purpose. It will travel the whole oplog
1135    #[inline]
1136    pub fn diagnose_size(&self) {
1137        self.oplog().lock().unwrap().diagnose_size();
1138    }
1139
1140    #[inline]
1141    pub fn oplog_frontiers(&self) -> Frontiers {
1142        self.oplog().lock().unwrap().frontiers().clone()
1143    }
1144
1145    #[inline]
1146    pub fn state_frontiers(&self) -> Frontiers {
1147        self.state.lock().unwrap().frontiers.clone()
1148    }
1149
1150    /// - Ordering::Less means self is less than target or parallel
1151    /// - Ordering::Equal means versions equal
1152    /// - Ordering::Greater means self's version is greater than target
1153    #[inline]
1154    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1155        self.oplog().lock().unwrap().cmp_with_frontiers(other)
1156    }
1157
1158    /// Compare two [Frontiers] causally.
1159    ///
1160    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
1161    #[inline]
1162    pub fn cmp_frontiers(
1163        &self,
1164        a: &Frontiers,
1165        b: &Frontiers,
1166    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1167        self.oplog().lock().unwrap().cmp_frontiers(a, b)
1168    }
1169
1170    pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1171        let mut state = self.state.lock().unwrap();
1172        if !state.is_recording() {
1173            state.start_recording();
1174        }
1175
1176        self.observer.subscribe_root(callback)
1177    }
1178
1179    pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1180        let mut state = self.state.lock().unwrap();
1181        if !state.is_recording() {
1182            state.start_recording();
1183        }
1184
1185        self.observer.subscribe(container_id, callback)
1186    }
1187
1188    pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1189        let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1190        activate();
1191        sub
1192    }
1193
1194    // PERF: opt
1195    #[tracing::instrument(skip_all)]
1196    pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1197        if bytes.is_empty() {
1198            return Ok(ImportStatus::default());
1199        }
1200
1201        if bytes.len() == 1 {
1202            return self.import(&bytes[0]);
1203        }
1204
1205        let mut success = VersionRange::default();
1206        let mut pending = VersionRange::default();
1207        let mut meta_arr = bytes
1208            .iter()
1209            .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1210            .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1211        meta_arr.sort_by(|a, b| {
1212            a.0.mode
1213                .cmp(&b.0.mode)
1214                .then(b.0.change_num.cmp(&a.0.change_num))
1215        });
1216
1217        let (options, txn) = self.implicit_commit_then_stop();
1218        // Why we should keep locking `txn` here
1219        //
1220        // In a multi-threaded environment, `import_batch` used to drop the txn lock
1221        // (via `commit_then_stop` + `drop(txn)`) and call `detach()`/`checkout_to_latest()`
1222        // around the batch import. That created a race where another thread could
1223        // start or renew the auto-commit txn and perform local edits while we were
1224        // importing and temporarily detached. Those interleaved local edits could
1225        // violate invariants between `OpLog` and `DocState` (e.g., state being
1226        // updated when we expect it not to, missed events, or inconsistent
1227        // frontiers), as exposed by the loom test `local_edits_during_batch_import`.
1228        //
1229        // The fix is to hold the txn mutex for the entire critical section:
1230        // - Stop the current txn and keep the mutex guard.
1231        // - Force-detach with `set_detached(true)` (avoids `detach()` side effects),
1232        //   then run each `_import_with(...)` while detached so imports only touch
1233        //   the `OpLog`.
1234        // - After importing, reattach by checking out to latest and renew the txn
1235        //   using `_checkout_to_latest_with_guard`, which keeps the mutex held while
1236        //   (re)starting the auto-commit txn.
1237        //
1238        // Holding the lock ensures no concurrent thread can create/renew a txn and
1239        // do local edits in the middle of the batch import, making the whole
1240        // operation atomic with respect to local edits.
1241        let is_detached = self.is_detached();
1242        self.set_detached(true);
1243        self.oplog.lock().unwrap().batch_importing = true;
1244        let mut err = None;
1245        for (_meta, data) in meta_arr {
1246            match self._import_with(data, Default::default()) {
1247                Ok(s) => {
1248                    for (peer, (start, end)) in s.success.iter() {
1249                        match success.0.entry(*peer) {
1250                            Entry::Occupied(mut e) => {
1251                                e.get_mut().1 = *end.max(&e.get().1);
1252                            }
1253                            Entry::Vacant(e) => {
1254                                e.insert((*start, *end));
1255                            }
1256                        }
1257                    }
1258
1259                    if let Some(p) = s.pending.as_ref() {
1260                        for (&peer, &(start, end)) in p.iter() {
1261                            match pending.0.entry(peer) {
1262                                Entry::Occupied(mut e) => {
1263                                    e.get_mut().0 = start.min(e.get().0);
1264                                    e.get_mut().1 = end.min(e.get().1);
1265                                }
1266                                Entry::Vacant(e) => {
1267                                    e.insert((start, end));
1268                                }
1269                            }
1270                        }
1271                    }
1272                }
1273                Err(e) => {
1274                    err = Some(e);
1275                }
1276            }
1277        }
1278
1279        let mut oplog = self.oplog.lock().unwrap();
1280        oplog.batch_importing = false;
1281        drop(oplog);
1282        if !is_detached {
1283            self._checkout_to_latest_with_guard(txn);
1284        } else {
1285            drop(txn);
1286        }
1287
1288        self.renew_txn_if_auto_commit(options);
1289        if let Some(err) = err {
1290            return Err(err);
1291        }
1292
1293        Ok(ImportStatus {
1294            success,
1295            pending: if pending.is_empty() {
1296                None
1297            } else {
1298                Some(pending)
1299            },
1300        })
1301    }
1302
1303    /// Get shallow value of the document.
1304    #[inline]
1305    pub fn get_value(&self) -> LoroValue {
1306        self.state.lock().unwrap().get_value()
1307    }
1308
1309    /// Get deep value of the document.
1310    #[inline]
1311    pub fn get_deep_value(&self) -> LoroValue {
1312        self.state.lock().unwrap().get_deep_value()
1313    }
1314
1315    /// Get deep value of the document with container id
1316    #[inline]
1317    pub fn get_deep_value_with_id(&self) -> LoroValue {
1318        self.state.lock().unwrap().get_deep_value_with_id()
1319    }
1320
1321    pub fn checkout_to_latest(&self) {
1322        let (options, _guard) = self.implicit_commit_then_stop();
1323        if !self.is_detached() {
1324            drop(_guard);
1325            self.renew_txn_if_auto_commit(options);
1326            return;
1327        }
1328
1329        self._checkout_to_latest_without_commit(true);
1330        drop(_guard);
1331        self.renew_txn_if_auto_commit(options);
1332    }
1333
1334    fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1335        if !self.is_detached() {
1336            self._renew_txn_if_auto_commit_with_guard(None, guard);
1337            return;
1338        }
1339
1340        self._checkout_to_latest_without_commit(true);
1341        self._renew_txn_if_auto_commit_with_guard(None, guard);
1342    }
1343
1344    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1345    pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1346        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1347            let f = self.oplog_frontiers();
1348            let this = &self;
1349            let frontiers = &f;
1350            this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1351                .unwrap(); // we don't need to shrink frontiers
1352                           // because oplog's frontiers are already shrinked
1353            this.emit_events();
1354            if this.config.detached_editing() {
1355                this.renew_peer_id();
1356            }
1357
1358            self.set_detached(false);
1359        });
1360    }
1361
1362    /// Checkout [DocState] to a specific version.
1363    ///
1364    /// This will make the current [DocState] detached from the latest version of [OpLog].
1365    /// Any further import will not be reflected on the [DocState], until user call [LoroDoc::attach()]
1366    pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1367        let (options, guard) = self.implicit_commit_then_stop();
1368        self._checkout_without_emitting(frontiers, true, true)?;
1369        self.emit_events();
1370        drop(guard);
1371        if self.config.detached_editing() {
1372            self.renew_peer_id();
1373            self.renew_txn_if_auto_commit(options);
1374        } else if !self.is_detached() {
1375            self.renew_txn_if_auto_commit(options);
1376        }
1377
1378        Ok(())
1379    }
1380
1381    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1382    #[instrument(level = "info", skip(self))]
1383    pub(crate) fn _checkout_without_emitting(
1384        &self,
1385        frontiers: &Frontiers,
1386        to_shrink_frontiers: bool,
1387        to_commit_then_renew: bool,
1388    ) -> Result<(), LoroError> {
1389        assert!(self.txn.is_locked());
1390        let from_frontiers = self.state_frontiers();
1391        loro_common::info!(
1392            "checkout from={:?} to={:?} cur_vv={:?}",
1393            from_frontiers,
1394            frontiers,
1395            self.oplog_vv()
1396        );
1397
1398        if &from_frontiers == frontiers {
1399            return Ok(());
1400        }
1401
1402        let oplog = self.oplog.lock().unwrap();
1403        if oplog.dag.is_before_shallow_root(frontiers) {
1404            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1405        }
1406
1407        let frontiers = if to_shrink_frontiers {
1408            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1409        } else {
1410            frontiers.clone()
1411        };
1412
1413        if from_frontiers == frontiers {
1414            return Ok(());
1415        }
1416
1417        let mut state = self.state.lock().unwrap();
1418        let mut calc = self.diff_calculator.lock().unwrap();
1419        for i in frontiers.iter() {
1420            if !oplog.dag.contains(i) {
1421                return Err(LoroError::FrontiersNotFound(i));
1422            }
1423        }
1424
1425        let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1426        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1427            return Err(LoroError::NotFoundError(
1428                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1429            ));
1430        };
1431
1432        self.set_detached(true);
1433        let (diff, diff_mode) =
1434            calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1435        state.apply_diff(
1436            InternalDocDiff {
1437                origin: "checkout".into(),
1438                diff: Cow::Owned(diff),
1439                by: EventTriggerKind::Checkout,
1440                new_version: Cow::Owned(frontiers.clone()),
1441            },
1442            diff_mode,
1443        );
1444
1445        Ok(())
1446    }
1447
1448    #[inline]
1449    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1450        self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
1451    }
1452
1453    #[inline]
1454    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1455        self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
1456    }
1457
1458    /// Import ops from other doc.
1459    ///
1460    /// After `a.merge(b)` and `b.merge(a)`, `a` and `b` will have the same content if they are in attached mode.
1461    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1462        self.import(&other.export_from(&self.oplog_vv()))
1463    }
1464
1465    pub(crate) fn arena(&self) -> &SharedArena {
1466        &self.arena
1467    }
1468
1469    #[inline]
1470    pub fn len_ops(&self) -> usize {
1471        let oplog = self.oplog.lock().unwrap();
1472        let ans = oplog.vv().values().sum::<i32>() as usize;
1473        if oplog.is_shallow() {
1474            let sub = oplog
1475                .shallow_since_vv()
1476                .iter()
1477                .map(|(_, ops)| *ops)
1478                .sum::<i32>() as usize;
1479            ans - sub
1480        } else {
1481            ans
1482        }
1483    }
1484
1485    #[inline]
1486    pub fn len_changes(&self) -> usize {
1487        let oplog = self.oplog.lock().unwrap();
1488        oplog.len_changes()
1489    }
1490
1491    pub fn config(&self) -> &Configure {
1492        &self.config
1493    }
1494
1495    /// This method compare the consistency between the current doc state
1496    /// and the state calculated by diff calculator from beginning.
1497    ///
1498    /// Panic when it's not consistent
1499    pub fn check_state_diff_calc_consistency_slow(&self) {
1500        // #[cfg(any(test, debug_assertions, feature = "test_utils"))]
1501        {
1502            static IS_CHECKING: std::sync::atomic::AtomicBool =
1503                std::sync::atomic::AtomicBool::new(false);
1504            if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1505                return;
1506            }
1507
1508            IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1509            let peer_id = self.peer_id();
1510            let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1511            let _g = s.enter();
1512            let options = self.implicit_commit_then_stop().0;
1513            self.oplog.lock().unwrap().check_dag_correctness();
1514            if self.is_shallow() {
1515                // For shallow documents, we cannot replay from the beginning as the history is not complete.
1516                //
1517                // Instead, we:
1518                // 1. Export the initial state from the GC snapshot.
1519                // 2. Create a new document and import the initial snapshot.
1520                // 3. Export updates from the shallow start version vector to the current version.
1521                // 4. Import these updates into the new document.
1522                // 5. Compare the states of the new document and the current document.
1523
1524                // Step 1: Export the initial state from the GC snapshot.
1525                let initial_snapshot = self
1526                    .export(ExportMode::state_only(Some(
1527                        &self.shallow_since_frontiers(),
1528                    )))
1529                    .unwrap();
1530
1531                // Step 2: Create a new document and import the initial snapshot.
1532                let doc = LoroDoc::new();
1533                doc.import(&initial_snapshot).unwrap();
1534                self.checkout(&self.shallow_since_frontiers()).unwrap();
1535                assert_eq!(self.get_deep_value(), doc.get_deep_value());
1536
1537                // Step 3: Export updates since the shallow start version vector to the current version.
1538                let updates = self.export(ExportMode::all_updates()).unwrap();
1539
1540                // Step 4: Import these updates into the new document.
1541                doc.import(&updates).unwrap();
1542                self.checkout_to_latest();
1543
1544                // Step 5: Checkout to the current state's frontiers and compare the states.
1545                // doc.checkout(&self.state_frontiers()).unwrap();
1546                assert_eq!(doc.get_deep_value(), self.get_deep_value());
1547                let mut calculated_state = doc.app_state().lock().unwrap();
1548                let mut current_state = self.app_state().lock().unwrap();
1549                current_state.check_is_the_same(&mut calculated_state);
1550            } else {
1551                let f = self.state_frontiers();
1552                let vv = self
1553                    .oplog()
1554                    .lock()
1555                    .unwrap()
1556                    .dag
1557                    .frontiers_to_vv(&f)
1558                    .unwrap();
1559                let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1560                let doc = Self::new();
1561                doc.import(&bytes).unwrap();
1562                let mut calculated_state = doc.app_state().lock().unwrap();
1563                let mut current_state = self.app_state().lock().unwrap();
1564                current_state.check_is_the_same(&mut calculated_state);
1565            }
1566
1567            self.renew_txn_if_auto_commit(options);
1568            IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1569        }
1570    }
1571
1572    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1573        self.query_pos_internal(pos, true)
1574    }
1575
1576    /// Get position in a seq container
1577    pub(crate) fn query_pos_internal(
1578        &self,
1579        pos: &Cursor,
1580        ret_event_index: bool,
1581    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1582        let mut state = self.state.lock().unwrap();
1583        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1584            Ok(PosQueryResult {
1585                update: None,
1586                current: AbsolutePosition {
1587                    pos: ans,
1588                    side: pos.side,
1589                },
1590            })
1591        } else {
1592            // We need to trace back to the version where the relative position is valid.
1593            // The optimal way to find that version is to have succ info like Automerge.
1594            //
1595            // But we don't have that info now, so an alternative way is to trace back
1596            // to version with frontiers of `[pos.id]`. But this may be very slow even if
1597            // the target is just deleted a few versions ago.
1598            //
1599            // What we need is to trace back to the latest version that deletes the target
1600            // id.
1601
1602            // commit the txn to make sure we can query the history correctly, preserving options
1603            drop(state);
1604            let result = self.with_barrier(|| {
1605                let oplog = self.oplog().lock().unwrap();
1606                // TODO: assert pos.id is not unknown
1607                if let Some(id) = pos.id {
1608                    // Ensure the container is registered if it exists lazily
1609                    if oplog.arena.id_to_idx(&pos.container).is_none() {
1610                        let mut s = self.state.lock().unwrap();
1611                        if !s.does_container_exist(&pos.container) {
1612                            return Err(CannotFindRelativePosition::ContainerDeleted);
1613                        }
1614                        s.ensure_container(&pos.container);
1615                        drop(s);
1616                    }
1617                    let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1618                    // We know where the target id is when we trace back to the delete_op_id.
1619                    let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1620                        if oplog.shallow_since_vv().includes_id(id) {
1621                            return Err(CannotFindRelativePosition::HistoryCleared);
1622                        }
1623
1624                        tracing::error!("Cannot find id {}", id);
1625                        return Err(CannotFindRelativePosition::IdNotFound);
1626                    };
1627                    // Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
1628                    let mut diff_calc = DiffCalculator::new(true);
1629                    let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1630                    let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1631                    // TODO: PERF: it doesn't need to calc the effects here
1632                    diff_calc.calc_diff_internal(
1633                        &oplog,
1634                        before,
1635                        &before_frontiers,
1636                        oplog.vv(),
1637                        oplog.frontiers(),
1638                        Some(&|target| idx == target),
1639                    );
1640                    // TODO: remove depth info
1641                    let depth = self.arena.get_depth(idx);
1642                    let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1643                    match diff_calc {
1644                        crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1645                            let c = text.get_id_latest_pos(id).unwrap();
1646                            let new_pos = c.pos;
1647                            let handler = self.get_text(&pos.container);
1648                            let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1649                            Ok(PosQueryResult {
1650                                update: handler.get_cursor(current_pos, c.side),
1651                                current: AbsolutePosition {
1652                                    pos: current_pos,
1653                                    side: c.side,
1654                                },
1655                            })
1656                        }
1657                        crate::diff_calc::ContainerDiffCalculator::List(list) => {
1658                            let c = list.get_id_latest_pos(id).unwrap();
1659                            let new_pos = c.pos;
1660                            let handler = self.get_list(&pos.container);
1661                            Ok(PosQueryResult {
1662                                update: handler.get_cursor(new_pos, c.side),
1663                                current: AbsolutePosition {
1664                                    pos: new_pos,
1665                                    side: c.side,
1666                                },
1667                            })
1668                        }
1669                        crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1670                            let c = list.get_id_latest_pos(id).unwrap();
1671                            let new_pos = c.pos;
1672                            let handler = self.get_movable_list(&pos.container);
1673                            let new_pos = handler.op_pos_to_user_pos(new_pos);
1674                            Ok(PosQueryResult {
1675                                update: handler.get_cursor(new_pos, c.side),
1676                                current: AbsolutePosition {
1677                                    pos: new_pos,
1678                                    side: c.side,
1679                                },
1680                            })
1681                        }
1682                        crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1683                        crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1684                        #[cfg(feature = "counter")]
1685                        crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1686                        crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1687                    }
1688                } else {
1689                    match pos.container.container_type() {
1690                        ContainerType::Text => {
1691                            let text = self.get_text(&pos.container);
1692                            Ok(PosQueryResult {
1693                                update: Some(Cursor {
1694                                    id: None,
1695                                    container: text.id(),
1696                                    side: pos.side,
1697                                    origin_pos: text.len_unicode(),
1698                                }),
1699                                current: AbsolutePosition {
1700                                    pos: text.len_event(),
1701                                    side: pos.side,
1702                                },
1703                            })
1704                        }
1705                        ContainerType::List => {
1706                            let list = self.get_list(&pos.container);
1707                            Ok(PosQueryResult {
1708                                update: Some(Cursor {
1709                                    id: None,
1710                                    container: list.id(),
1711                                    side: pos.side,
1712                                    origin_pos: list.len(),
1713                                }),
1714                                current: AbsolutePosition {
1715                                    pos: list.len(),
1716                                    side: pos.side,
1717                                },
1718                            })
1719                        }
1720                        ContainerType::MovableList => {
1721                            let list = self.get_movable_list(&pos.container);
1722                            Ok(PosQueryResult {
1723                                update: Some(Cursor {
1724                                    id: None,
1725                                    container: list.id(),
1726                                    side: pos.side,
1727                                    origin_pos: list.len(),
1728                                }),
1729                                current: AbsolutePosition {
1730                                    pos: list.len(),
1731                                    side: pos.side,
1732                                },
1733                            })
1734                        }
1735                        ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1736                            unreachable!()
1737                        }
1738                        #[cfg(feature = "counter")]
1739                        ContainerType::Counter => unreachable!(),
1740                    }
1741                }
1742            });
1743            result
1744        }
1745    }
1746
1747    /// Free the history cache that is used for making checkout faster.
1748    ///
1749    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1750    /// You can free it by calling this method.
1751    pub fn free_history_cache(&self) {
1752        self.oplog.lock().unwrap().free_history_cache();
1753    }
1754
1755    /// Free the cached diff calculator that is used for checkout.
1756    pub fn free_diff_calculator(&self) {
1757        *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
1758    }
1759
1760    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1761    /// You can free it by calling `free_history_cache`.
1762    pub fn has_history_cache(&self) -> bool {
1763        self.oplog.lock().unwrap().has_history_cache()
1764    }
1765
1766    /// Encoded all ops and history cache to bytes and store them in the kv store.
1767    ///
1768    /// The parsed ops will be dropped
1769    #[inline]
1770    pub fn compact_change_store(&self) {
1771        self.with_barrier(|| {
1772            self.oplog.lock().unwrap().compact_change_store();
1773        });
1774    }
1775
1776    /// Analyze the container info of the doc
1777    ///
1778    /// This is used for development and debugging
1779    #[inline]
1780    pub fn analyze(&self) -> DocAnalysis {
1781        DocAnalysis::analyze(self)
1782    }
1783
1784    /// Get the path from the root to the container
1785    pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1786        let mut state = self.state.lock().unwrap();
1787        if state.arena.id_to_idx(id).is_none() {
1788            if !state.does_container_exist(id) {
1789                return None;
1790            }
1791            state.ensure_container(id);
1792        }
1793        let idx = state.arena.id_to_idx(id).unwrap();
1794        state.get_path(idx)
1795    }
1796
1797    #[instrument(skip(self))]
1798    pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1799        self.with_barrier(|| {
1800            let ans = match mode {
1801                ExportMode::Snapshot => export_fast_snapshot(self),
1802                ExportMode::Updates { from } => export_fast_updates(self, &from),
1803                ExportMode::UpdatesInRange { spans } => {
1804                    export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1805                }
1806                ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1807                ExportMode::StateOnly(f) => match f {
1808                    Some(f) => export_state_only_snapshot(self, &f)?,
1809                    None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1810                },
1811                ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1812            };
1813            Ok(ans)
1814        })
1815    }
1816
1817    /// The doc only contains the history since the shallow history start version vector.
1818    ///
1819    /// This is empty if the doc is not shallow.
1820    ///
1821    /// The ops included by the shallow history start version vector are not in the doc.
1822    pub fn shallow_since_vv(&self) -> ImVersionVector {
1823        self.oplog().lock().unwrap().shallow_since_vv().clone()
1824    }
1825
1826    pub fn shallow_since_frontiers(&self) -> Frontiers {
1827        self.oplog()
1828            .lock()
1829            .unwrap()
1830            .shallow_since_frontiers()
1831            .clone()
1832    }
1833
1834    /// Check if the doc contains the full history.
1835    pub fn is_shallow(&self) -> bool {
1836        !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
1837    }
1838
1839    /// Get the number of operations in the pending transaction.
1840    ///
1841    /// The pending transaction is the one that is not committed yet. It will be committed
1842    /// after calling `doc.commit()`, `doc.export(mode)` or `doc.checkout(version)`.
1843    pub fn get_pending_txn_len(&self) -> usize {
1844        if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1845            txn.len()
1846        } else {
1847            0
1848        }
1849    }
1850
1851    #[inline]
1852    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1853        self.oplog().lock().unwrap().dag.find_path(from, to)
1854    }
1855
1856    /// Subscribe to the first commit from a peer. Operations performed on the `LoroDoc` within this callback
1857    /// will be merged into the current commit.
1858    ///
1859    /// This is useful for managing the relationship between `PeerID` and user information.
1860    /// For example, you could store user names in a `LoroMap` using `PeerID` as the key and the `UserID` as the value.
1861    pub fn subscribe_first_commit_from_peer(
1862        &self,
1863        callback: FirstCommitFromPeerCallback,
1864    ) -> Subscription {
1865        let (s, enable) = self
1866            .first_commit_from_peer_subs
1867            .inner()
1868            .insert((), callback);
1869        enable();
1870        s
1871    }
1872
1873    /// Subscribe to the pre-commit event.
1874    ///
1875    /// The callback will be called when the changes are committed but not yet applied to the OpLog.
1876    /// You can modify the commit message and timestamp in the callback by [`ChangeModifier`].
1877    pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1878        let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1879        enable();
1880        s
1881    }
1882}
1883
1884#[derive(Debug, thiserror::Error)]
1885pub enum ChangeTravelError {
1886    #[error("Target id not found {0:?}")]
1887    TargetIdNotFound(ID),
1888    #[error("The shallow history of the doc doesn't include the target version")]
1889    TargetVersionNotIncluded,
1890}
1891
1892impl LoroDoc {
1893    pub fn travel_change_ancestors(
1894        &self,
1895        ids: &[ID],
1896        f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1897    ) -> Result<(), ChangeTravelError> {
1898        let (options, guard) = self.implicit_commit_then_stop();
1899        drop(guard);
1900        struct PendingNode(ChangeMeta);
1901        impl PartialEq for PendingNode {
1902            fn eq(&self, other: &Self) -> bool {
1903                self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1904            }
1905        }
1906
1907        impl Eq for PendingNode {}
1908        impl PartialOrd for PendingNode {
1909            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1910                Some(self.cmp(other))
1911            }
1912        }
1913
1914        impl Ord for PendingNode {
1915            fn cmp(&self, other: &Self) -> Ordering {
1916                self.0
1917                    .lamport_last()
1918                    .cmp(&other.0.lamport_last())
1919                    .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1920            }
1921        }
1922
1923        for id in ids {
1924            let op_log = &self.oplog().lock().unwrap();
1925            if !op_log.vv().includes_id(*id) {
1926                return Err(ChangeTravelError::TargetIdNotFound(*id));
1927            }
1928            if op_log.dag.shallow_since_vv().includes_id(*id) {
1929                return Err(ChangeTravelError::TargetVersionNotIncluded);
1930            }
1931        }
1932
1933        let mut visited = FxHashSet::default();
1934        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1935        for id in ids {
1936            pending.push(PendingNode(ChangeMeta::from_change(
1937                &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1938            )));
1939        }
1940        while let Some(PendingNode(node)) = pending.pop() {
1941            let deps = node.deps.clone();
1942            if f(node).is_break() {
1943                break;
1944            }
1945
1946            for dep in deps.iter() {
1947                let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1948                    continue;
1949                };
1950                if visited.contains(&dep_node.id) {
1951                    continue;
1952                }
1953
1954                visited.insert(dep_node.id);
1955                pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1956            }
1957        }
1958
1959        let ans = Ok(());
1960        self.renew_txn_if_auto_commit(options);
1961        ans
1962    }
1963
1964    pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1965        self.with_barrier(|| {
1966            let mut set = FxHashSet::default();
1967            {
1968                let oplog = self.oplog().lock().unwrap();
1969                for op in oplog.iter_ops(id.to_span(len)) {
1970                    let id = oplog.arena.get_container_id(op.container()).unwrap();
1971                    set.insert(id);
1972                }
1973            }
1974            set
1975        })
1976    }
1977
1978    pub fn delete_root_container(&self, cid: ContainerID) {
1979        if !cid.is_root() {
1980            return;
1981        }
1982
1983        // Do not treat "not in arena" as non-existence; consult state/kv
1984        if !self.has_container(&cid) {
1985            return;
1986        }
1987
1988        let Some(h) = self.get_handler(cid.clone()) else {
1989            return;
1990        };
1991
1992        if let Err(e) = h.clear() {
1993            eprintln!("Failed to clear handler: {:?}", e);
1994            return;
1995        }
1996        self.config
1997            .deleted_root_containers
1998            .lock()
1999            .unwrap()
2000            .insert(cid);
2001    }
2002
2003    pub fn set_hide_empty_root_containers(&self, hide: bool) {
2004        self.config
2005            .hide_empty_root_containers
2006            .store(hide, std::sync::atomic::Ordering::Relaxed);
2007    }
2008}
2009
2010// FIXME: PERF: This method is quite slow because it iterates all the changes
2011fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2012    let start_vv = oplog
2013        .dag
2014        .frontiers_to_vv(&id.into())
2015        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2016    for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
2017        for op in change.ops.iter().rev() {
2018            if op.container != idx {
2019                continue;
2020            }
2021            if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2022                if d.id_start.to_span(d.atom_len()).contains(id) {
2023                    return Some(ID::new(change.peer(), op.counter));
2024                }
2025            }
2026        }
2027    }
2028
2029    None
2030}
2031
2032#[derive(Debug)]
2033pub struct CommitWhenDrop<'a> {
2034    doc: &'a LoroDoc,
2035    default_options: CommitOptions,
2036}
2037
2038impl Drop for CommitWhenDrop<'_> {
2039    fn drop(&mut self) {
2040        {
2041            let mut guard = self.doc.txn.lock().unwrap();
2042            if let Some(txn) = guard.as_mut() {
2043                txn.set_default_options(std::mem::take(&mut self.default_options));
2044            };
2045        }
2046
2047        self.doc.commit_then_renew();
2048    }
2049}
2050
2051/// Options for configuring a commit operation.
2052#[derive(Debug, Clone)]
2053pub struct CommitOptions {
2054    /// Origin identifier for the commit event, used to track the source of changes.
2055    /// It doesn't persist.
2056    pub origin: Option<InternalString>,
2057
2058    /// Whether to immediately start a new transaction after committing.
2059    /// Defaults to true.
2060    pub immediate_renew: bool,
2061
2062    /// Custom timestamp for the commit in seconds since Unix epoch.
2063    /// If None, the current time will be used.
2064    pub timestamp: Option<Timestamp>,
2065
2066    /// Optional commit message to attach to the changes. It will be persisted.
2067    pub commit_msg: Option<Arc<str>>,
2068}
2069
2070impl CommitOptions {
2071    /// Creates a new CommitOptions with default values.
2072    pub fn new() -> Self {
2073        Self {
2074            origin: None,
2075            immediate_renew: true,
2076            timestamp: None,
2077            commit_msg: None,
2078        }
2079    }
2080
2081    /// Sets the origin identifier for this commit.
2082    pub fn origin(mut self, origin: &str) -> Self {
2083        self.origin = Some(origin.into());
2084        self
2085    }
2086
2087    /// Sets whether to immediately start a new transaction after committing.
2088    pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2089        self.immediate_renew = immediate_renew;
2090        self
2091    }
2092
2093    /// Set the timestamp of the commit.
2094    ///
2095    /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
2096    pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2097        self.timestamp = Some(timestamp);
2098        self
2099    }
2100
2101    /// Sets a commit message to be attached to the changes.
2102    pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2103        self.commit_msg = Some(commit_msg.into());
2104        self
2105    }
2106
2107    /// Sets the origin identifier for this commit.
2108    pub fn set_origin(&mut self, origin: Option<&str>) {
2109        self.origin = origin.map(|x| x.into())
2110    }
2111
2112    /// Sets the timestamp for this commit.
2113    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2114        self.timestamp = timestamp;
2115    }
2116}
2117
2118impl Default for CommitOptions {
2119    fn default() -> Self {
2120        Self::new()
2121    }
2122}
2123
2124#[cfg(test)]
2125mod test {
2126    use loro_common::ID;
2127
2128    use crate::{version::Frontiers, LoroDoc, ToJson};
2129
2130    #[test]
2131    fn test_sync() {
2132        fn is_send_sync<T: Send + Sync>(_v: T) {}
2133        let loro = super::LoroDoc::new();
2134        is_send_sync(loro)
2135    }
2136
2137    #[test]
2138    fn test_checkout() {
2139        let loro = LoroDoc::new();
2140        loro.set_peer_id(1).unwrap();
2141        let text = loro.get_text("text");
2142        let map = loro.get_map("map");
2143        let list = loro.get_list("list");
2144        let mut txn = loro.txn().unwrap();
2145        for i in 0..10 {
2146            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2147            text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
2148            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2149        }
2150        txn.commit().unwrap();
2151        let b = LoroDoc::new();
2152        b.import(&loro.export_snapshot().unwrap()).unwrap();
2153        loro.checkout(&Frontiers::default()).unwrap();
2154        {
2155            let json = &loro.get_deep_value();
2156            assert_eq!(
2157                json.to_json_value(),
2158                serde_json::json!({"text":"","list":[],"map":{}})
2159            );
2160        }
2161
2162        b.checkout(&ID::new(1, 2).into()).unwrap();
2163        {
2164            let json = &b.get_deep_value();
2165            assert_eq!(
2166                json.to_json_value(),
2167                serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
2168            );
2169        }
2170
2171        loro.checkout(&ID::new(1, 3).into()).unwrap();
2172        {
2173            let json = &loro.get_deep_value();
2174            assert_eq!(
2175                json.to_json_value(),
2176                serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
2177            );
2178        }
2179
2180        b.checkout(&ID::new(1, 29).into()).unwrap();
2181        {
2182            let json = &b.get_deep_value();
2183            assert_eq!(
2184                json.to_json_value(),
2185                serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
2186            );
2187        }
2188    }
2189
2190    #[test]
2191    fn import_batch_err_181() {
2192        let a = LoroDoc::new_auto_commit();
2193        let update_a = a.export_snapshot();
2194        let b = LoroDoc::new_auto_commit();
2195        b.import_batch(&[update_a.unwrap()]).unwrap();
2196        b.get_text("text").insert(0, "hello").unwrap();
2197        b.commit_then_renew();
2198        let oplog = b.oplog().lock().unwrap();
2199        drop(oplog);
2200        b.export_from(&Default::default());
2201    }
2202}