// loro_internal/src/loro.rs
1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::{AtomicBool, AtomicUsize};
6pub(crate) use crate::LoroDocInner;
7use crate::{
8    arena::SharedArena,
9    change::{Change, Timestamp},
10    configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11    container::{
12        idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13        IntoContainerId,
14    },
15    cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16    dag::{Dag, DagUtils},
17    diff_calc::DiffCalculator,
18    encoding::{
19        self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20        export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21        export_state_only_snapshot,
22        json_schema::{encode_change_to_json, json::JsonSchema},
23        parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24    },
25    event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26    handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27    id::PeerID,
28    json::JsonChange,
29    op::InnerContent,
30    oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31    state::DocState,
32    subscription::{LocalUpdateCallback, Observer, Subscriber},
33    undo::DiffBatch,
34    utils::subscription::{SubscriberSetWithQueue, Subscription},
35    version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36    ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37    VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42    lock::{LoroLockGroup, LoroMutex},
43    txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47    ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48    LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53    borrow::Cow,
54    cmp::Ordering,
55    collections::{hash_map::Entry, BinaryHeap},
56    ops::ControlFlow,
57    sync::{
58        atomic::Ordering::{Acquire, Release},
59        Arc,
60    },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
/// Manual `Debug` impl that prints only the lightweight status fields
/// (`config`, `auto_commit`, `detached`), skipping the heavyweight
/// oplog/state/arena internals.
///
/// NOTE(review): the printed struct name is "LoroDoc" although the type is
/// `LoroDocInner` — presumably intentional since this is the inner payload of
/// `LoroDoc`; confirm before changing the output string.
impl std::fmt::Debug for LoroDocInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LoroDoc")
            .field("config", &self.config)
            .field("auto_commit", &self.auto_commit)
            .field("detached", &self.detached)
            .finish()
    }
}
79
80impl LoroDoc {
81    /// Run the provided closure within a commit barrier.
82    ///
83    /// This finalizes any pending auto-commit transaction first (preserving
84    /// options across an empty txn), executes `f`, then renews the transaction
85    /// (carrying preserved options) if auto-commit is enabled. This is the
86    /// common implicit-commit pattern used by internal operations such as
87    /// import/export/checkouts.
88    #[inline]
89    pub fn with_barrier<F, R>(&self, f: F) -> R
90    where
91        F: FnOnce() -> R,
92    {
93        let (options, guard) = self.implicit_commit_then_stop();
94        let result = f();
95        drop(guard);
96        self.renew_txn_if_auto_commit(options);
97        result
98    }
99
100    pub fn new() -> Self {
101        let visible_op_count = Arc::new(AtomicUsize::new(0));
102        let oplog = OpLog::new(visible_op_count.clone());
103        let arena = oplog.arena.clone();
104        let config: Configure = oplog.configure.clone();
105        let lock_group = LoroLockGroup::new();
106        let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
107        let inner = Arc::new_cyclic(|w| {
108            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
109            LoroDocInner {
110                oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
111                state,
112                config,
113                visible_op_count,
114                detached: AtomicBool::new(false),
115                auto_commit: AtomicBool::new(false),
116                observer: Arc::new(Observer::new(arena.clone())),
117                diff_calculator: Arc::new(
118                    lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
119                ),
120                txn: global_txn,
121                arena,
122                local_update_subs: SubscriberSetWithQueue::new(),
123                peer_id_change_subs: SubscriberSetWithQueue::new(),
124                pre_commit_subs: SubscriberSetWithQueue::new(),
125                first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
126            }
127        });
128        LoroDoc { inner }
129    }
130
131    pub fn fork(&self) -> Self {
132        if self.is_detached() {
133            return self
134                .fork_at(&self.state_frontiers())
135                .expect("fork_at on detached doc should not fail");
136        }
137
138        let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
139        let doc = Self::new();
140        doc.with_barrier(|| {
141            encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
142        })
143        .unwrap();
144        doc.set_config(&self.config);
145        if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
146            doc.start_auto_commit();
147        }
148        doc
149    }
150    /// Enables editing of the document in detached mode.
151    ///
152    /// By default, the document cannot be edited in detached mode (after calling
153    /// `detach` or checking out a version other than the latest). This method
154    /// allows editing in detached mode.
155    ///
156    /// # Important Notes:
157    ///
158    /// - After enabling this mode, the document will use a different PeerID. Each
159    ///   time you call checkout, a new PeerID will be used.
160    /// - If you set a custom PeerID while this mode is enabled, ensure that
161    ///   concurrent operations with the same PeerID are not possible.
162    /// - On detached mode, importing will not change the state of the document.
163    ///   It also doesn't change the version of the [DocState]. The changes will be
164    ///   recorded into [OpLog] only. You need to call `checkout` to make it take effect.
165    pub fn set_detached_editing(&self, enable: bool) {
166        self.config.set_detached_editing(enable);
167        if enable && self.is_detached() {
168            self.with_barrier(|| {
169                self.renew_peer_id();
170            });
171        }
172    }
173
174    /// Create a doc with auto commit enabled.
175    #[inline]
176    pub fn new_auto_commit() -> Self {
177        let doc = Self::new();
178        doc.start_auto_commit();
179        doc
180    }
181
    /// Set the PeerID used for subsequent local operations.
    ///
    /// Returns `Err(LoroError::InvalidPeerID)` for the reserved `PeerID::MAX`.
    /// With auto-commit on, any in-progress transaction is finalized through an
    /// implicit barrier (which preserves next-commit options) before the peer
    /// change is announced. Without auto-commit, changing the peer id in the
    /// middle of a transaction is an error.
    #[inline(always)]
    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
        if peer == PeerID::MAX {
            return Err(LoroError::InvalidPeerID);
        }
        // Next id for the new peer, captured up front so subscribers observe
        // the id the doc will continue from.
        let next_id = self.oplog.lock().next_id(peer);
        if self.auto_commit.load(Acquire) {
            let doc_state = self.state.lock();
            doc_state
                .peer
                .store(peer, std::sync::atomic::Ordering::Relaxed);

            if doc_state.is_in_txn() {
                // Release the state lock before committing (lock-order safety).
                drop(doc_state);
                // Use implicit-style barrier to avoid swallowing next-commit options
                self.with_barrier(|| {});
            }
            self.peer_id_change_subs.emit(&(), next_id);
            return Ok(());
        }

        let doc_state = self.state.lock();
        if doc_state.is_in_txn() {
            return Err(LoroError::TransactionError(
                "Cannot change peer id during transaction"
                    .to_string()
                    .into_boxed_str(),
            ));
        }

        doc_state
            .peer
            .store(peer, std::sync::atomic::Ordering::Relaxed);
        // Emit only after the state lock is released.
        drop(doc_state);
        self.peer_id_change_subs.emit(&(), next_id);
        Ok(())
    }
219
220    /// Renews the PeerID for the document.
221    pub(crate) fn renew_peer_id(&self) {
222        let mut peer_id = DefaultRandom.next_u64();
223        while peer_id == PeerID::MAX {
224            peer_id = DefaultRandom.next_u64();
225        }
226        self.set_peer_id(peer_id).unwrap();
227    }
228
229    /// Implicitly commit the cumulative auto-commit transaction.
230    /// This method only has effect when `auto_commit` is true.
231    ///
232    /// Follow-ups: the caller is responsible for renewing the transaction
233    /// as needed (e.g., via `renew_txn_if_auto_commit`). Prefer using
234    /// `with_barrier(...)` for most internal flows to handle this safely.
235    ///
236    /// Empty-commit behavior: if the pending transaction is empty, the returned
237    /// `Some(CommitOptions)` preserves next-commit options such as message and
238    /// timestamp so they can carry into the renewed transaction. Transient
239    /// labels like `origin` do not carry across an empty commit.
240    #[inline]
241    #[must_use]
242    pub fn implicit_commit_then_stop(
243        &self,
244    ) -> (
245        Option<CommitOptions>,
246        LoroMutexGuard<'_, Option<Transaction>>,
247    ) {
248        // Implicit commit: preserve options on empty commit
249        let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
250        (a, b.unwrap())
251    }
252
253    /// Commit the cumulative auto commit transaction.
254    /// It will start the next one immediately
255    ///
256    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
257    #[inline]
258    pub fn commit_then_renew(&self) -> Option<CommitOptions> {
259        // Explicit commit: swallow options on empty commit
260        self.commit_internal(CommitOptions::new().immediate_renew(true), false)
261            .0
262    }
263
    /// This method is called before the commit.
    /// It can be used to modify the change before it is committed.
    ///
    /// It return Some(txn) if the txn is None
    fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
        let mut txn_guard = self.txn.lock();
        let Some(txn) = txn_guard.as_mut() else {
            // No active transaction: hand the held guard back so the caller
            // can short-circuit without re-locking.
            return Some(txn_guard);
        };

        if txn.is_peer_first_appearance {
            // Clear the flag before releasing the lock so a re-entrant commit
            // triggered by a subscriber cannot fire the event twice.
            txn.is_peer_first_appearance = false;
            drop(txn_guard);
            // First commit from a peer — emitted without holding the txn lock.
            self.first_commit_from_peer_subs.emit(
                &(),
                FirstCommitFromPeerPayload {
                    peer: self.peer_id(),
                },
            );
        }

        None
    }
288
    /// Core implementation for committing the cumulative auto-commit transaction.
    ///
    /// - When `preserve_on_empty` is true (implicit commits like export/checkout),
    ///   commit options from an empty transaction are carried over to the next transaction
    ///   (except `origin`, which never carries across an empty commit).
    /// - When `preserve_on_empty` is false (explicit commits), commit options from an
    ///   empty transaction are swallowed and NOT carried over.
    ///
    /// Returns `(empty_txn_options, txn_guard)`: `empty_txn_options` is `Some`
    /// only for an empty txn whose options survived the rules above;
    /// `txn_guard` is returned only when `config.immediate_renew` is false so
    /// the caller can keep the txn slot locked (see `implicit_commit_then_stop`).
    #[instrument(skip_all)]
    fn commit_internal(
        &self,
        config: CommitOptions,
        preserve_on_empty: bool,
    ) -> (
        Option<CommitOptions>,
        Option<LoroMutexGuard<'_, Option<Transaction>>>,
    ) {
        if !self.auto_commit.load(Acquire) {
            let txn_guard = self.txn.lock();
            // if not auto_commit, nothing should happen
            // because the global txn is not used
            return (None, Some(txn_guard));
        }

        loop {
            // `before_commit` fires first-commit-from-peer hooks; it returns
            // the guard directly when there is no active txn.
            if let Some(txn_guard) = self.before_commit() {
                return (None, Some(txn_guard));
            }

            let mut txn_guard = self.txn.lock();
            let txn = txn_guard.take();
            let Some(mut txn) = txn else {
                // The txn may have been taken between `before_commit` and
                // re-locking; nothing left to commit.
                return (None, Some(txn_guard));
            };
            let on_commit = txn.take_on_commit();
            // Caller-provided options override what the txn accumulated.
            if let Some(origin) = config.origin.clone() {
                txn.set_origin(origin);
            }

            if let Some(timestamp) = config.timestamp {
                txn.set_timestamp(timestamp);
            }

            if let Some(msg) = config.commit_msg.as_ref() {
                txn.set_msg(Some(msg.clone()));
            }

            let id_span = txn.id_span();
            let mut options = txn.commit().unwrap();
            // Empty commit returns Some(options). We may preserve parts of it for implicit commits.
            if let Some(opts) = options.as_mut() {
                // `origin` is an event-only label and never carries across an empty commit
                if config.origin.is_some() {
                    opts.set_origin(None);
                }
                // For explicit commits, swallow options from empty commit entirely
                if !preserve_on_empty {
                    options = None;
                }
            }
            if config.immediate_renew {
                if self.can_edit() {
                    // Start the next txn right away, carrying preserved options.
                    let mut t = self.txn().unwrap();
                    if let Some(options) = options.as_ref() {
                        t.set_options(options.clone());
                    }
                    *txn_guard = Some(t);
                }
            }

            if let Some(on_commit) = on_commit {
                // Run the callback without holding the txn lock — it may
                // itself create a new transaction.
                drop(txn_guard);
                on_commit(&self.state, &self.oplog, id_span);
                txn_guard = self.txn.lock();
                if !config.immediate_renew && txn_guard.is_some() {
                    // make sure that txn_guard is None when config.immediate_renew is false
                    continue;
                }
            }

            return (
                options,
                if !config.immediate_renew {
                    Some(txn_guard)
                } else {
                    None
                },
            );
        }
    }
378
    /// Commit the cumulative auto commit transaction (explicit API).
    ///
    /// This is used by user-facing explicit commits. If the transaction is empty,
    /// any provided commit options are swallowed and will NOT carry over.
    #[instrument(skip_all)]
    pub fn commit_with(
        &self,
        config: CommitOptions,
    ) -> (
        Option<CommitOptions>,
        Option<LoroMutexGuard<'_, Option<Transaction>>>,
    ) {
        // Explicit path: `preserve_on_empty` is always false here.
        self.commit_internal(config, false)
    }
393
394    /// Set the commit message of the next commit
395    pub fn set_next_commit_message(&self, message: &str) {
396        let mut binding = self.txn.lock();
397        let Some(txn) = binding.as_mut() else {
398            return;
399        };
400
401        if message.is_empty() {
402            txn.set_msg(None)
403        } else {
404            txn.set_msg(Some(message.into()))
405        }
406    }
407
408    /// Set the origin of the next commit
409    pub fn set_next_commit_origin(&self, origin: &str) {
410        let mut txn = self.txn.lock();
411        if let Some(txn) = txn.as_mut() {
412            txn.set_origin(origin.into());
413        }
414    }
415
416    /// Set the timestamp of the next commit
417    pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
418        let mut txn = self.txn.lock();
419        if let Some(txn) = txn.as_mut() {
420            txn.set_timestamp(timestamp);
421        }
422    }
423
424    /// Set the options of the next commit
425    pub fn set_next_commit_options(&self, options: CommitOptions) {
426        let mut txn = self.txn.lock();
427        if let Some(txn) = txn.as_mut() {
428            txn.set_options(options);
429        }
430    }
431
432    /// Clear the options of the next commit
433    pub fn clear_next_commit_options(&self) {
434        let mut txn = self.txn.lock();
435        if let Some(txn) = txn.as_mut() {
436            txn.set_options(CommitOptions::new());
437        }
438    }
439
    /// Set whether to record the timestamp of each change. Default is `false`.
    ///
    /// If enabled, the Unix timestamp will be recorded for each change automatically.
    ///
    /// You can also set each timestamp manually when you commit a change.
    /// The timestamp manually set will override the automatic one.
    ///
    /// NOTE: Timestamps are forced to be in ascending order.
    /// If you commit a new change with a timestamp that is less than the existing one,
    /// the largest existing timestamp will be used instead.
    #[inline]
    pub fn set_record_timestamp(&self, record: bool) {
        // Delegates to the shared Configure; affects future commits only.
        self.config.set_record_timestamp(record);
    }
454
    /// Set the interval of mergeable changes, in seconds.
    ///
    /// If two continuous local changes are within the interval, they will be merged into one change.
    /// The default value is 1000 seconds.
    #[inline]
    pub fn set_change_merge_interval(&self, interval: i64) {
        // Delegates to the shared Configure; affects future commits only.
        self.config.set_merge_interval(interval);
    }
463
464    pub fn can_edit(&self) -> bool {
465        !self.is_detached() || self.config.detached_editing()
466    }
467
    /// Whether editing while detached has been enabled (see `set_detached_editing`).
    pub fn is_detached_editing_enabled(&self) -> bool {
        self.config.detached_editing()
    }
471
472    #[inline]
473    pub fn config_text_style(&self, text_style: StyleConfigMap) {
474        self.config.text_style_config.write().map = text_style.map;
475    }
476
477    #[inline]
478    pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
479        self.config.text_style_config.write().default_style = text_style;
480    }
481    pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
482        let doc = Self::new();
483        let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
484        if mode.is_snapshot() {
485            doc.with_barrier(|| -> Result<(), LoroError> {
486                decode_snapshot(&doc, mode, body, Default::default())?;
487                Ok(())
488            })?;
489            Ok(doc)
490        } else {
491            Err(LoroError::DecodeError(
492                "Invalid encode mode".to_string().into(),
493            ))
494        }
495    }
496
497    /// Is the document empty? (no ops)
498    #[inline(always)]
499    pub fn can_reset_with_snapshot(&self) -> bool {
500        let oplog = self.oplog.lock();
501        if oplog.batch_importing {
502            return false;
503        }
504
505        if self.is_detached() {
506            return false;
507        }
508
509        oplog.is_empty() && self.state.lock().can_import_snapshot()
510    }
511
    /// Whether [OpLog] and [DocState] are detached.
    ///
    /// If so, the document is in readonly mode by default and importing will not change the state of the document.
    /// It also doesn't change the version of the [DocState]. The changes will be recorded into [OpLog] only.
    /// You need to call `checkout` to make it take effect.
    #[inline(always)]
    pub fn is_detached(&self) -> bool {
        // Acquire pairs with the Release store in `set_detached`.
        self.detached.load(Acquire)
    }
521
    /// Flip the detached flag (Release pairs with the Acquire in `is_detached`).
    pub(crate) fn set_detached(&self, detached: bool) {
        self.detached.store(detached, Release);
    }
525
526    #[inline(always)]
527    pub fn peer_id(&self) -> PeerID {
528        self.state
529            .lock()
530            .peer
531            .load(std::sync::atomic::Ordering::Relaxed)
532    }
533
534    #[inline(always)]
535    pub fn detach(&self) {
536        self.with_barrier(|| self.set_detached(true));
537    }
538
    /// Re-attach the state: equivalent to checking out the latest version.
    #[inline(always)]
    pub fn attach(&self) {
        self.checkout_to_latest()
    }
543
544    /// Get the timestamp of the current state.
545    /// It's the last edit time of the [DocState].
546    pub fn state_timestamp(&self) -> Timestamp {
547        // Acquire locks in correct order: read frontiers first, then query OpLog.
548        let f = { self.state.lock().frontiers.clone() };
549        self.oplog.lock().get_timestamp_of_version(&f)
550    }
551
    /// Shared handle to the mutex-protected [DocState].
    #[inline(always)]
    pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
        &self.state
    }
556
    /// Deep (recursively resolved) value of the current document state.
    #[inline]
    pub fn get_state_deep_value(&self) -> LoroValue {
        self.state.lock().get_deep_value()
    }
561
    /// Shared handle to the mutex-protected [OpLog].
    #[inline(always)]
    pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
        &self.oplog
    }
566
567    #[inline(always)]
568    pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
569        let s = debug_span!("import", peer = self.peer_id());
570        let _e = s.enter();
571        self.import_with(bytes, Default::default())
572    }
573
574    #[inline]
575    pub fn import_with(
576        &self,
577        bytes: &[u8],
578        origin: InternalString,
579    ) -> Result<ImportStatus, LoroError> {
580        self.with_barrier(|| self._import_with(bytes, origin))
581    }
582
    /// Dispatch an import to the right decoder based on the blob's encode mode.
    ///
    /// Expects the caller to hold a commit barrier (see `import_with`).
    /// Buffered document events are emitted after the import, on both
    /// success and failure.
    #[tracing::instrument(skip_all)]
    fn _import_with(
        &self,
        bytes: &[u8],
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        ensure_cov::notify_cov("loro_internal::import");
        let parsed = parse_header_and_body(bytes, true)?;
        loro_common::info!("Importing with mode={:?}", &parsed.mode);
        let result = match parsed.mode {
            EncodeMode::OutdatedRle => {
                // Legacy update format cannot be applied mid-transaction.
                if self.state.lock().is_in_txn() {
                    return Err(LoroError::ImportWhenInTxn);
                }

                let s = tracing::span!(
                    tracing::Level::INFO,
                    "Import updates ",
                    peer = self.peer_id()
                );
                let _e = s.enter();
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
            EncodeMode::OutdatedSnapshot => {
                // An empty, attached doc can adopt the snapshot wholesale;
                // otherwise fall back to decoding it as updates.
                if self.can_reset_with_snapshot() {
                    loro_common::info!("Init by snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body, origin)
                } else {
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )
                }
            }
            EncodeMode::FastSnapshot => {
                if self.can_reset_with_snapshot() {
                    ensure_cov::notify_cov("loro_internal::import::snapshot");
                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body, origin)
                } else {
                    // Doc is non-empty/detached: extract the changes from the
                    // snapshot and merge them like an update import.
                    self.import_changes_and_apply_delta_to_state_if_needed(
                        |oplog| encoding::decode_oplog_changes(oplog, parsed),
                        origin,
                    )

                    // let new_doc = LoroDoc::new();
                    // new_doc.import(bytes)?;
                    // let updates = new_doc.export(ExportMode::updates(&self.oplog_vv())).unwrap();
                    // return self.import_with(updates.as_slice(), origin);
                }
            }
            EncodeMode::FastUpdates => self.import_changes_and_apply_delta_to_state_if_needed(
                |oplog| encoding::decode_oplog_changes(oplog, parsed),
                origin,
            ),
            EncodeMode::Auto => {
                // `parse_header_and_body` resolves Auto to a concrete mode.
                unreachable!()
            }
        };

        // Flush events buffered during the import (success or failure).
        self.emit_events();

        result
    }
650
    /// Run an oplog mutation `f` and, when the doc is attached, mirror the
    /// resulting changes onto [DocState] as a calculated diff.
    ///
    /// Rollback protocol: `begin_import_rollback` is always entered first;
    /// each exit path then either commits the rollback scope (progress kept)
    /// or rolls it back (no visible progress / failed state apply).
    #[tracing::instrument(skip_all)]
    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
        &self,
        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        let mut oplog = self.oplog.lock();
        oplog.begin_import_rollback();
        if !self.is_detached() {
            // Snapshot the pre-import version to diff against afterwards.
            let old_vv = oplog.vv().clone();
            let old_frontiers = oplog.frontiers().clone();
            let result = f(&mut oplog);
            if &old_vv != oplog.vv() {
                // Something landed in the oplog: compute the state delta from
                // the old version to the new frontiers and apply it.
                let mut diff = DiffCalculator::new(false);
                let (diff, diff_mode) = diff.calc_diff_internal(
                    &oplog,
                    &old_vv,
                    &old_frontiers,
                    oplog.vv(),
                    oplog.dag.get_frontiers(),
                    None,
                );
                let mut state = self.state.lock();
                if let Err(e) = state.apply_diff(
                    InternalDocDiff {
                        origin,
                        diff: (diff).into(),
                        by: EventTriggerKind::Import,
                        new_version: Cow::Owned(oplog.frontiers().clone()),
                    },
                    diff_mode,
                ) {
                    // State apply failed: undo the oplog progress too.
                    oplog.rollback_import();
                    return Err(e);
                }
            }
            match result {
                Ok(result) => {
                    oplog.commit_import_rollback();
                    Ok(result)
                }
                Err(e) => {
                    // Some import errors are reported after a valid prefix has
                    // been inserted into the oplog. Keep that prefix if it was
                    // also applied to state; rollback is for failed state apply
                    // or decode errors that made no visible oplog progress.
                    if &old_vv == oplog.vv() {
                        oplog.rollback_import();
                    } else {
                        oplog.commit_import_rollback();
                    }
                    Err(e)
                }
            }
        } else {
            // Detached: only the oplog advances; DocState keeps its version.
            match f(&mut oplog) {
                Ok(result) => {
                    oplog.commit_import_rollback();
                    Ok(result)
                }
                Err(e) => {
                    oplog.rollback_import();
                    Err(e)
                }
            }
        }
    }
718
    /// Decode a batch of changes, preflight them against the DAG/shallow root,
    /// append them to the oplog, and (when attached) apply the resulting diff
    /// to [DocState].
    ///
    /// Rollback strategy: the arena checkpoint undoes decode-time allocations
    /// for early failures; the heavier oplog rollback scope is entered only
    /// when `preflight.needs_state_apply_rollback` says a state apply may fail.
    #[tracing::instrument(skip_all)]
    pub(crate) fn import_changes_and_apply_delta_to_state_if_needed(
        &self,
        decode_changes: impl FnOnce(&mut OpLog) -> Result<Vec<Change>, LoroError>,
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        let mut oplog = self.oplog.lock();
        // Checkpoint so decode-time arena allocations can be undone on failure.
        let arena_checkpoint = oplog.arena.checkpoint_for_rollback();
        let changes = match decode_changes(&mut oplog) {
            Ok(changes) => changes,
            Err(e) => {
                oplog.arena.rollback(arena_checkpoint);
                return Err(e);
            }
        };

        let preflight = oplog.preflight_import_changes(&changes);
        // Changes depending on history before the shallow root are only
        // acceptable when they would extend the DAG on an attached doc.
        if preflight.has_deps_before_shallow_root
            && (self.is_detached() || !preflight.applies_to_dag)
        {
            oplog.arena.rollback(arena_checkpoint);
            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
        }

        if self.is_detached() {
            // Detached: oplog only; no state apply, no diff.
            let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
            if result.has_deps_before_shallow_root {
                return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
            }

            return Ok(result.status);
        }

        if !preflight.applies_to_dag {
            // Nothing newly applicable to the DAG → state cannot change.
            let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
            if result.has_deps_before_shallow_root {
                oplog.arena.rollback(arena_checkpoint);
                return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
            }

            return Ok(result.status);
        }

        let old_vv = oplog.vv().clone();
        let old_frontiers = oplog.frontiers().clone();
        let rollback_enabled = preflight.needs_state_apply_rollback;
        if rollback_enabled {
            // Adopt the arena checkpoint into the oplog rollback scope.
            oplog.begin_import_rollback_with_arena(arena_checkpoint);
        }

        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
        if &old_vv != oplog.vv() {
            // Oplog advanced: mirror the delta onto the state.
            let mut diff = DiffCalculator::new(false);
            let (diff, diff_mode) = diff.calc_diff_internal(
                &oplog,
                &old_vv,
                &old_frontiers,
                oplog.vv(),
                oplog.dag.get_frontiers(),
                None,
            );
            let mut state = self.state.lock();
            if let Err(e) = state.apply_diff(
                InternalDocDiff {
                    origin,
                    diff: (diff).into(),
                    by: EventTriggerKind::Import,
                    new_version: Cow::Owned(oplog.frontiers().clone()),
                },
                diff_mode,
            ) {
                if rollback_enabled {
                    oplog.rollback_import();
                    return Err(e);
                }

                // Preflight said no rollback was needed, yet apply failed:
                // this is an invariant violation, not a recoverable error.
                panic!("state apply returned Err for import without rollback guard: {e}");
            }
        }

        if result.has_deps_before_shallow_root {
            // Keep the applied prefix (it is already in state) but report the
            // outdated-dependency error.
            if rollback_enabled {
                oplog.commit_import_rollback();
            }
            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
        }

        if rollback_enabled {
            oplog.commit_import_rollback();
        }
        Ok(result.status)
    }
811
812    fn emit_events(&self) {
813        // we should not hold the lock when emitting events
814        let events = {
815            let mut state = self.state.lock();
816            state.take_events()
817        };
818        for event in events {
819            self.observer.emit(event);
820        }
821    }
822
823    pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
824        let mut state = self.state.lock();
825        state.take_events()
826    }
827
828    /// Import the json schema updates.
829    ///
830    /// only supports backward compatibility but not forward compatibility.
831    #[tracing::instrument(skip_all)]
832    pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
833        let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
834        self.with_barrier(|| {
835            let result = self.import_changes_and_apply_delta_to_state_if_needed(
836                |oplog| crate::encoding::json_schema::decode_json_changes(json, &oplog.arena),
837                Default::default(),
838            );
839            self.emit_events();
840            result
841        })
842    }
843
844    pub fn export_json_updates(
845        &self,
846        start_vv: &VersionVector,
847        end_vv: &VersionVector,
848        with_peer_compression: bool,
849    ) -> JsonSchema {
850        self.with_barrier(|| {
851            let oplog = self.oplog.lock();
852            let mut start_vv = start_vv;
853            let _temp: Option<VersionVector>;
854            if !oplog.dag.shallow_since_vv().is_empty() {
855                // Make sure that start_vv >= shallow_since_vv
856                let mut include_all = true;
857                for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
858                    if start_vv.get(peer).unwrap_or(&0) < counter {
859                        include_all = false;
860                        break;
861                    }
862                }
863                if !include_all {
864                    let mut vv = start_vv.clone();
865                    for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
866                        vv.extend_to_include_end_id(ID::new(peer, counter));
867                    }
868                    _temp = Some(vv);
869                    start_vv = _temp.as_ref().unwrap();
870                }
871            }
872
873            crate::encoding::json_schema::export_json(
874                &oplog,
875                start_vv,
876                end_vv,
877                with_peer_compression,
878            )
879        })
880    }
881
882    pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
883        let oplog = self.oplog.lock();
884        let mut changes = export_json_in_id_span(&oplog, id_span);
885        if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
886            let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
887            changes.push(change_json);
888        }
889        changes
890    }
891
892    /// Get the version vector of the current OpLog
893    #[inline]
894    pub fn oplog_vv(&self) -> VersionVector {
895        self.oplog.lock().vv().clone()
896    }
897
898    /// Get the version vector of the current [DocState]
899    #[inline]
900    pub fn state_vv(&self) -> VersionVector {
901        let oplog = self.oplog.lock();
902        let f = &self.state.lock().frontiers;
903        oplog.dag.frontiers_to_vv(f).unwrap()
904    }
905
906    pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
907        let value: LoroValue = self.state.lock().get_value_by_path(path)?;
908        if let LoroValue::Container(c) = value {
909            Some(ValueOrHandler::Handler(Handler::new_attached(
910                c.clone(),
911                self.clone(),
912            )))
913        } else {
914            Some(ValueOrHandler::Value(value))
915        }
916    }
917
918    /// Get the handler by the string path.
919    pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
920        let path = str_to_path(path)?;
921        self.get_by_path(&path)
922    }
923
    /// Encode the ops of the current, uncommitted transaction as a JSON schema.
    ///
    /// Returns `None` when there is no active transaction or when the
    /// transaction has no local ops yet.
    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
        let arena = &self.arena;
        let txn = self.txn.lock();
        let txn = txn.as_ref()?;
        let ops_ = txn.local_ops();
        // The synthesized change starts at the first uncommitted op's counter.
        let new_id = ID {
            peer: *txn.peer(),
            counter: ops_.first()?.counter,
        };
        let change = ChangeRef {
            id: &new_id,
            deps: txn.frontiers(),
            // If the txn has no explicit timestamp yet, fall back to the
            // timestamp the next txn would be assigned.
            // NOTE: the `&` borrows a temporary; it lives long enough via
            // temporary lifetime extension on the `let` initializer.
            timestamp: &txn
                .timestamp()
                .as_ref()
                .copied()
                .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
            commit_msg: txn.msg(),
            ops: ops_,
            lamport: txn.lamport(),
        };
        let json = encode_change_to_json(change, arena);
        Some(json)
    }
948
949    #[inline]
950    pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
951        if self.has_container(&id) {
952            Some(Handler::new_attached(id, self.clone()))
953        } else {
954            None
955        }
956    }
957
958    /// id can be a str, ContainerID, or ContainerIdRaw.
959    /// if it's str it will use Root container, which will not be None
960    #[inline]
961    pub fn try_get_text<I: IntoContainerId>(&self, id: I) -> Option<TextHandler> {
962        let id = id.into_container_id(&self.arena, ContainerType::Text);
963        if !self.has_container(&id) {
964            return None;
965        }
966        Handler::new_attached(id, self.clone()).into_text().ok()
967    }
968
969    /// id can be a str, ContainerID, or ContainerIdRaw.
970    /// if it's str it will use Root container, which will not be None
971    #[inline]
972    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
973        self.try_get_text(id)
974            .expect("The container does not exist in the document. Use `try_get_text` or `get_container` to check for existence.")
975    }
976
977    /// id can be a str, ContainerID, or ContainerIdRaw.
978    /// if it's str it will use Root container, which will not be None
979    #[inline]
980    pub fn try_get_list<I: IntoContainerId>(&self, id: I) -> Option<ListHandler> {
981        let id = id.into_container_id(&self.arena, ContainerType::List);
982        if !self.has_container(&id) {
983            return None;
984        }
985        Handler::new_attached(id, self.clone()).into_list().ok()
986    }
987
988    /// id can be a str, ContainerID, or ContainerIdRaw.
989    /// if it's str it will use Root container, which will not be None
990    #[inline]
991    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
992        self.try_get_list(id)
993            .expect("The container does not exist in the document. Use `try_get_list` or `get_container` to check for existence.")
994    }
995
996    /// id can be a str, ContainerID, or ContainerIdRaw.
997    /// if it's str it will use Root container, which will not be None
998    #[inline]
999    pub fn try_get_movable_list<I: IntoContainerId>(&self, id: I) -> Option<MovableListHandler> {
1000        let id = id.into_container_id(&self.arena, ContainerType::MovableList);
1001        if !self.has_container(&id) {
1002            return None;
1003        }
1004        Handler::new_attached(id, self.clone())
1005            .into_movable_list()
1006            .ok()
1007    }
1008
1009    /// id can be a str, ContainerID, or ContainerIdRaw.
1010    /// if it's str it will use Root container, which will not be None
1011    #[inline]
1012    pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
1013        self.try_get_movable_list(id)
1014            .expect("The container does not exist in the document. Use `try_get_movable_list` or `get_container` to check for existence.")
1015    }
1016
1017    /// id can be a str, ContainerID, or ContainerIdRaw.
1018    /// if it's str it will use Root container, which will not be None
1019    #[inline]
1020    pub fn try_get_map<I: IntoContainerId>(&self, id: I) -> Option<MapHandler> {
1021        let id = id.into_container_id(&self.arena, ContainerType::Map);
1022        if !self.has_container(&id) {
1023            return None;
1024        }
1025        Handler::new_attached(id, self.clone()).into_map().ok()
1026    }
1027
1028    /// id can be a str, ContainerID, or ContainerIdRaw.
1029    /// if it's str it will use Root container, which will not be None
1030    #[inline]
1031    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
1032        self.try_get_map(id)
1033            .expect("The container does not exist in the document. Use `try_get_map` or `get_container` to check for existence.")
1034    }
1035
1036    /// id can be a str, ContainerID, or ContainerIdRaw.
1037    /// if it's str it will use Root container, which will not be None
1038    #[inline]
1039    pub fn try_get_tree<I: IntoContainerId>(&self, id: I) -> Option<TreeHandler> {
1040        let id = id.into_container_id(&self.arena, ContainerType::Tree);
1041        if !self.has_container(&id) {
1042            return None;
1043        }
1044        Handler::new_attached(id, self.clone()).into_tree().ok()
1045    }
1046
1047    /// id can be a str, ContainerID, or ContainerIdRaw.
1048    /// if it's str it will use Root container, which will not be None
1049    #[inline]
1050    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
1051        self.try_get_tree(id)
1052            .expect("The container does not exist in the document. Use `try_get_tree` or `get_container` to check for existence.")
1053    }
1054
1055    #[cfg(feature = "counter")]
1056    pub fn try_get_counter<I: IntoContainerId>(
1057        &self,
1058        id: I,
1059    ) -> Option<crate::handler::counter::CounterHandler> {
1060        let id = id.into_container_id(&self.arena, ContainerType::Counter);
1061        if !self.has_container(&id) {
1062            return None;
1063        }
1064        Handler::new_attached(id, self.clone()).into_counter().ok()
1065    }
1066
1067    #[cfg(feature = "counter")]
1068    pub fn get_counter<I: IntoContainerId>(
1069        &self,
1070        id: I,
1071    ) -> crate::handler::counter::CounterHandler {
1072        self.try_get_counter(id)
1073            .expect("The container does not exist in the document. Use `try_get_counter` or `get_container` to check for existence.")
1074    }
1075
1076    #[must_use]
1077    pub fn has_container(&self, id: &ContainerID) -> bool {
1078        if id.is_root() {
1079            return true;
1080        }
1081
1082        let exist = self.state.lock().does_container_exist(id);
1083        exist
1084    }
1085
    /// Undo the operations between the given id_span. It can be used even in a collaborative environment.
    ///
    /// This is an internal API. You should NOT use it directly.
    ///
    /// # Internal
    ///
    /// This method will use the diff calculator to calculate the diff required to time travel
    /// from the end of id_span to the beginning of the id_span. Then it will convert the diff to
    /// operations and apply them to the OpLog with a dep on the last id of the given id_span.
    ///
    /// This implementation is kinda slow, but it's simple and maintainable. We can optimize it
    /// further when it's needed. The time complexity is O(n + m), n is the ops in the id_span, m is the
    /// distance from id_span to the current latest version.
    ///
    /// # Errors
    ///
    /// - [`LoroError::EditWhenDetached`] if the doc cannot be edited.
    /// - [`LoroError::UndoInvalidIdSpan`] if the end of `id_span` is not in the oplog yet.
    #[instrument(level = "info", skip_all)]
    pub fn undo_internal(
        &self,
        id_span: IdSpan,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        post_transform_base: Option<&DiffBatch>,
        before_diff: &mut dyn FnMut(&DiffBatch),
    ) -> LoroResult<CommitWhenDrop<'_>> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        // Commit pending local ops and keep holding the txn guard so no
        // concurrent local edits can interleave with the checkouts below.
        let (options, txn) = self.implicit_commit_then_stop();
        if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
            self.renew_txn_if_auto_commit(options);
            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
        }

        // Pause event recording while we time-travel; remember whether it was
        // on so it can be restored afterwards.
        let (was_recording, latest_frontiers) = {
            let mut state = self.state.lock();
            let was_recording = state.is_recording();
            state.stop_and_clear_recording();
            (was_recording, state.frontiers.clone())
        };

        let spans = self.oplog.lock().split_span_based_on_deps(id_span);
        let diff = crate::undo::undo(
            spans,
            match post_transform_base {
                Some(d) => Either::Right(d),
                None => Either::Left(&latest_frontiers),
            },
            // Compute the event diff produced by checking out `from` -> `to`:
            // record events between the two checkouts and package them.
            |from, to| {
                self._checkout_without_emitting(from, false, false).unwrap();
                self.state.lock().start_recording();
                self._checkout_without_emitting(to, false, false).unwrap();
                let mut state = self.state.lock();
                let e = state.take_events();
                state.stop_and_clear_recording();
                DiffBatch::new(e)
            },
            before_diff,
        );

        // Return to the version we started from and re-attach.
        self._checkout_without_emitting(&latest_frontiers, false, false)?;
        self.set_detached(false);
        if was_recording {
            self.state.lock().start_recording();
        }
        drop(txn);
        self.start_auto_commit();
        // Try applying the diff, but ignore the error if it happens.
        // MovableList's undo behavior is too tricky to handle in a collaborative env
        // so in edge cases this may be an Error
        if let Err(e) = self._apply_diff(diff, container_remap, true) {
            warn!("Undo Failed {:?}", e);
        }

        if let Some(options) = options {
            self.set_next_commit_options(options);
        }
        // The returned guard commits with origin "undo" when dropped.
        Ok(CommitWhenDrop {
            doc: self,
            default_options: CommitOptions::new().origin("undo"),
        })
    }
1168
1169    /// Generate a series of local operations that can revert the current doc to the target
1170    /// version.
1171    ///
1172    /// Internally, it will calculate the diff between the current state and the target state,
1173    /// and apply the diff to the current state.
1174    pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
1175        // TODO: test when the doc is readonly
1176        // TODO: test when the doc is detached but enabled editing
1177        let f = self.state_frontiers();
1178        let diff = self.diff(&f, target)?;
1179        self._apply_diff(diff, &mut Default::default(), false)
1180    }
1181
    /// Calculate the diff between two versions so that apply diff on a will make the state same as b.
    ///
    /// NOTE: This method will make the doc enter the **detached mode**.
    // FIXME: This method needs testing (no event should be emitted during processing this)
    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
        {
            // Check whether a and b are valid before checkout so this returns a normal error
            // instead of panicking on shallow docs.
            let oplog = self.oplog.lock();
            let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
                for id in frontiers.iter() {
                    if !oplog.dag.contains(id) {
                        return Err(LoroError::FrontiersNotFound(id));
                    }
                }

                if oplog.dag.is_before_shallow_root(frontiers) {
                    return Err(LoroError::SwitchToVersionBeforeShallowRoot);
                }

                Ok(())
            };

            validate_frontiers(a)?;
            validate_frontiers(b)?;
        }

        // Hold the txn guard for the whole computation so no concurrent local
        // edit can happen while the state is temporarily checked out elsewhere.
        let (options, txn) = self.implicit_commit_then_stop();
        let was_detached = self.is_detached();
        let old_frontiers = self.state_frontiers();
        // Recording must be off during the internal checkouts; remember the
        // previous setting so it can be restored at the end.
        let was_recording = {
            let mut state = self.state.lock();
            let is_recording = state.is_recording();
            state.stop_and_clear_recording();
            is_recording
        };
        // Check out to `a`, start recording, check out to `b`: the recorded
        // events are exactly the diff a -> b.
        let result = (|| {
            self._checkout_without_emitting(a, true, false)?;
            self.state.lock().start_recording();
            self._checkout_without_emitting(b, true, false)?;
            let mut state = self.state.lock();
            let e = state.take_events();
            state.stop_and_clear_recording();
            Ok::<_, LoroError>(e)
        })();

        // Always restore state regardless of whether diff calculation succeeded
        self._checkout_without_emitting(&old_frontiers, false, false)
            .unwrap();
        drop(txn);
        if !was_detached {
            self.set_detached(false);
            self.renew_txn_if_auto_commit(options);
        }
        if was_recording {
            self.state.lock().start_recording();
        }
        result.map(DiffBatch::new)
    }
1241
1242    /// Apply a diff to the current state.
1243    #[inline(always)]
1244    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1245        self._apply_diff(diff, &mut Default::default(), true)
1246    }
1247
    /// Apply a diff to the current state.
    ///
    /// This method will not recreate containers with the same [ContainerID]s.
    /// While this can be convenient in certain cases, it can break several internal invariants:
    ///
    /// 1. Each container should appear only once in the document. Allowing containers with the same ID
    ///    would result in multiple instances of the same container in the document.
    /// 2. Unreachable containers should be removable from the state when necessary.
    ///
    /// However, the diff may contain operations that depend on container IDs.
    /// Therefore, users need to provide a `container_remap` to record and retrieve the container ID remapping.
    pub(crate) fn _apply_diff(
        &self,
        diff: DiffBatch,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        skip_unreachable: bool,
    ) -> LoroResult<()> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        let mut ans: LoroResult<()> = Ok(());
        let mut missing_containers: Vec<ContainerID> = Vec::new();
        for (mut id, diff) in diff.into_iter() {
            // Follow the remap chain to the container's current identity.
            let mut remapped = false;
            while let Some(rid) = container_remap.get(&id) {
                remapped = true;
                id = rid.clone();
            }

            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
                // Not in arena does not imply non-existent; consult state/kv and register lazily
                let exists = self.state.lock().does_container_exist(&id);
                if !exists {
                    missing_containers.push(id);
                    continue;
                }
                // Ensure registration so handlers can be created
                self.state.lock().ensure_container(&id);
            }

            // Remapped containers are never skipped — presumably they were
            // created by this very apply; TODO(review): confirm.
            if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
                continue;
            }

            let Some(h) = self.get_handler(id.clone()) else {
                return Err(LoroError::ContainersNotFound {
                    containers: Box::new(vec![id]),
                });
            };
            // Keep applying the remaining entries even if one fails; only the
            // last error is reported.
            if let Err(e) = h.apply_diff(diff, container_remap) {
                ans = Err(e);
            }
        }

        if !missing_containers.is_empty() {
            return Err(LoroError::ContainersNotFound {
                containers: Box::new(missing_containers),
            });
        }

        ans
    }
1311
    /// This is for debugging purpose. It will travel the whole oplog
    /// and print size diagnostics.
    #[inline]
    pub fn diagnose_size(&self) {
        self.oplog().lock().diagnose_size();
    }
1317
1318    #[inline]
1319    pub fn oplog_frontiers(&self) -> Frontiers {
1320        self.oplog().lock().frontiers().clone()
1321    }
1322
1323    #[inline]
1324    pub fn state_frontiers(&self) -> Frontiers {
1325        self.state.lock().frontiers.clone()
1326    }
1327
1328    /// - Ordering::Less means self is less than target or parallel
1329    /// - Ordering::Equal means versions equal
1330    /// - Ordering::Greater means self's version is greater than target
1331    #[inline]
1332    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1333        self.oplog().lock().cmp_with_frontiers(other)
1334    }
1335
1336    /// Compare two [Frontiers] causally.
1337    ///
1338    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
1339    #[inline]
1340    pub fn cmp_frontiers(
1341        &self,
1342        a: &Frontiers,
1343        b: &Frontiers,
1344    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1345        self.oplog().lock().cmp_frontiers(a, b)
1346    }
1347
    /// Register a document-level (root) event subscriber.
    ///
    /// Recording is switched on (if it was off) so that subsequent changes
    /// actually produce events; the state lock is held while registering.
    pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
        let mut state = self.state.lock();
        if !state.is_recording() {
            state.start_recording();
        }

        self.observer.subscribe_root(callback)
    }
1356
    /// Register an event subscriber for a specific container.
    ///
    /// Recording is switched on (if it was off) so that subsequent changes
    /// actually produce events; the state lock is held while registering.
    pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
        let mut state = self.state.lock();
        if !state.is_recording() {
            state.start_recording();
        }

        self.observer.subscribe(container_id, callback)
    }
1365
1366    pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1367        let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1368        activate();
1369        sub
1370    }
1371
    // PERF: opt
    /// Import a batch of encoded blobs (updates and/or snapshots) at once.
    ///
    /// Blobs are sorted by encode mode (then by descending change count)
    /// before being imported one by one. The whole batch runs while holding
    /// the txn guard — see the comment below for why. If a blob fails, the
    /// batch continues and the (last) error is returned at the end.
    #[tracing::instrument(skip_all)]
    pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
        if bytes.is_empty() {
            return Ok(ImportStatus::default());
        }

        // Single blob: a plain import already has the right semantics.
        if bytes.len() == 1 {
            return self.import(&bytes[0]);
        }

        let mut success = VersionRange::default();
        let mut meta_arr = bytes
            .iter()
            .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
            .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
        meta_arr.sort_by(|a, b| {
            a.0.mode
                .cmp(&b.0.mode)
                .then(b.0.change_num.cmp(&a.0.change_num))
        });

        let (options, txn) = self.implicit_commit_then_stop();
        // Why we should keep locking `txn` here
        //
        // In a multi-threaded environment, `import_batch` used to drop the txn lock
        // (via `commit_then_stop` + `drop(txn)`) and call `detach()`/`checkout_to_latest()`
        // around the batch import. That created a race where another thread could
        // start or renew the auto-commit txn and perform local edits while we were
        // importing and temporarily detached. Those interleaved local edits could
        // violate invariants between `OpLog` and `DocState` (e.g., state being
        // updated when we expect it not to, missed events, or inconsistent
        // frontiers), as exposed by the loom test `local_edits_during_batch_import`.
        //
        // The fix is to hold the txn mutex for the entire critical section:
        // - Stop the current txn and keep the mutex guard.
        // - Force-detach with `set_detached(true)` (avoids `detach()` side effects),
        //   then run each `_import_with(...)` while detached so imports only touch
        //   the `OpLog`.
        // - After importing, reattach by checking out to latest and renew the txn
        //   using `_checkout_to_latest_with_guard`, which keeps the mutex held while
        //   (re)starting the auto-commit txn.
        //
        // Holding the lock ensures no concurrent thread can create/renew a txn and
        // do local edits in the middle of the batch import, making the whole
        // operation atomic with respect to local edits.
        let is_detached = self.is_detached();
        self.set_detached(true);
        self.oplog.lock().batch_importing = true;
        let mut err = None;
        for (_meta, data) in meta_arr {
            match self._import_with(data, Default::default()) {
                Ok(s) => {
                    // Merge this blob's successfully-imported ranges into the
                    // batch-wide result.
                    for (peer, (start, end)) in s.success.iter() {
                        match success.0.entry(*peer) {
                            Entry::Occupied(mut e) => {
                                e.get_mut().1 = *end.max(&e.get().1);
                            }
                            Entry::Vacant(e) => {
                                e.insert((*start, *end));
                            }
                        }
                    }
                }
                Err(e) => {
                    // Keep importing the remaining blobs; report the error below.
                    err = Some(e);
                }
            }
        }

        let mut oplog = self.oplog.lock();
        oplog.batch_importing = false;
        let pending = oplog.pending_changes.version_range();
        drop(oplog);
        if !is_detached {
            // Reattach while still holding the txn guard (see comment above).
            self._checkout_to_latest_with_guard(txn);
        } else {
            drop(txn);
        }

        self.renew_txn_if_auto_commit(options);
        if let Some(err) = err {
            return Err(err);
        }

        Ok(ImportStatus {
            success,
            pending: if pending.is_empty() {
                None
            } else {
                Some(pending)
            },
        })
    }
1466
    /// Get shallow value of the document.
    ///
    /// Delegates to [DocState]; see [`Self::get_deep_value`] for the
    /// recursive variant.
    #[inline]
    pub fn get_value(&self) -> LoroValue {
        self.state.lock().get_value()
    }
1472
    /// Get deep value of the document.
    ///
    /// Delegates to [DocState].
    #[inline]
    pub fn get_deep_value(&self) -> LoroValue {
        self.state.lock().get_deep_value()
    }
1478
    /// Get deep value of the document with container id
    ///
    /// Delegates to [DocState].
    #[inline]
    pub fn get_deep_value_with_id(&self) -> LoroValue {
        self.state.lock().get_deep_value_with_id()
    }
1484
    /// Check out to the latest version of [OpLog] and re-attach [DocState].
    ///
    /// When the doc is already attached this only commits the pending txn and
    /// renews it.
    pub fn checkout_to_latest(&self) {
        let (options, _guard) = self.implicit_commit_then_stop();
        if !self.is_detached() {
            // Release the txn guard before renewing the auto-commit txn.
            drop(_guard);
            self.renew_txn_if_auto_commit(options);
            return;
        }

        self._checkout_to_latest_without_commit(true)
            .expect("checkout to oplog frontiers should succeed");
        self.emit_events();
        drop(_guard);
        self.renew_txn_if_auto_commit(options);
    }
1499
    /// Like checking out to latest, but the caller already holds the txn
    /// guard; the guard stays held while the auto-commit txn is renewed so no
    /// other thread can slip in local edits (used by `import_batch`).
    fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
        if !self.is_detached() {
            self._renew_txn_if_auto_commit_with_guard(None, guard);
            return;
        }

        self._checkout_to_latest_without_commit(true)
            .expect("checkout to oplog frontiers should succeed");
        self._renew_txn_if_auto_commit_with_guard(None, guard);
    }
1510
1511    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1512    pub(crate) fn _checkout_to_latest_without_commit(
1513        &self,
1514        to_commit_then_renew: bool,
1515    ) -> LoroResult<()> {
1516        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1517            let f = self.oplog_frontiers();
1518            let this = &self;
1519            let frontiers = &f;
1520            this._checkout_without_emitting(frontiers, false, to_commit_then_renew)?;
1521            // We don't need to shrink frontiers because oplog's frontiers are already shrinked.
1522            this.emit_events();
1523            if this.config.detached_editing() {
1524                this.renew_peer_id();
1525            }
1526
1527            self.set_detached(false);
1528            Ok(())
1529        })
1530    }
1531
    /// Checkout [DocState] to a specific version.
    ///
    /// This will make the current [DocState] detached from the latest version of [OpLog].
    /// Any further import will not be reflected on the [DocState], until user call [LoroDoc::attach()]
    pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
        let was_detached = self.is_detached();
        let (options, guard) = self.implicit_commit_then_stop();
        let result = self._checkout_without_emitting(frontiers, true, true);
        if result.is_ok() {
            self.emit_events();
        }
        drop(guard);
        // Decide whether to restart the auto-commit txn:
        // - with detached editing enabled we always renew (and, on success,
        //   take a fresh peer id);
        // - on failure we only renew if the doc was attached before the call;
        // - on success we renew only if the checkout left the doc attached.
        if self.config.detached_editing() {
            if result.is_ok() {
                self.renew_peer_id();
            }
            self.renew_txn_if_auto_commit(options);
        } else if result.is_err() {
            if !was_detached {
                self.renew_txn_if_auto_commit(options);
            }
        } else if !self.is_detached() {
            self.renew_txn_if_auto_commit(options);
        }

        result
    }
1559
    /// Compute and apply the diff needed to move [DocState] from its current
    /// frontiers to `frontiers`, without emitting the resulting events (they
    /// stay queued in the state; callers emit later). Marks the doc detached
    /// before applying the diff.
    ///
    /// NOTE: The caller of this method should ensure the txn is locked and set to None
    #[instrument(level = "info", skip(self))]
    pub(crate) fn _checkout_without_emitting(
        &self,
        frontiers: &Frontiers,
        to_shrink_frontiers: bool,
        to_commit_then_renew: bool,
    ) -> Result<(), LoroError> {
        // Guard against misuse: the txn mutex must already be held.
        if !self.txn.is_locked() {
            return Err(LoroError::TransactionError(
                "checkout requires the transaction mutex to be held"
                    .to_string()
                    .into_boxed_str(),
            ));
        }
        let from_frontiers = self.state_frontiers();
        loro_common::info!(
            "checkout from={:?} to={:?} cur_vv={:?}",
            from_frontiers,
            frontiers,
            self.oplog_vv()
        );

        // Fast path: already at the target version.
        if &from_frontiers == frontiers {
            return Ok(());
        }

        // Lock order: oplog, then state, then diff_calculator.
        let oplog = self.oplog.lock();
        if oplog.dag.is_before_shallow_root(frontiers) {
            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
        }

        let frontiers = if to_shrink_frontiers {
            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
        } else {
            frontiers.clone()
        };

        // Shrinking may have normalized the target to the current version.
        if from_frontiers == frontiers {
            return Ok(());
        }

        let mut state = self.state.lock();
        let mut calc = self.diff_calculator.lock();
        for i in frontiers.iter() {
            if !oplog.dag.contains(i) {
                return Err(LoroError::FrontiersNotFound(i));
            }
        }

        let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
            LoroError::NotFoundError(
                format!(
                    "Cannot find the current state version {:?}",
                    state.frontiers
                )
                .into_boxed_str(),
            )
        })?;
        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
            return Err(LoroError::NotFoundError(
                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
            ));
        };

        // From here on the state diverges from the oplog head until re-attached.
        self.set_detached(true);
        let (diff, diff_mode) =
            calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
        state.apply_diff(
            InternalDocDiff {
                origin: "checkout".into(),
                diff: Cow::Owned(diff),
                by: EventTriggerKind::Checkout,
                new_version: Cow::Owned(frontiers.clone()),
            },
            diff_mode,
        )?;

        Ok(())
    }
1640
1641    #[inline]
1642    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1643        self.oplog.lock().dag.vv_to_frontiers(vv)
1644    }
1645
1646    #[inline]
1647    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1648        self.oplog.lock().dag.frontiers_to_vv(frontiers)
1649    }
1650
1651    /// Import ops from other doc.
1652    ///
1653    /// After `a.merge(b)` and `b.merge(a)`, `a` and `b` will have the same content if they are in attached mode.
1654    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1655        let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1656        self.import(&updates)
1657    }
1658
    /// Returns the doc's shared arena.
    pub(crate) fn arena(&self) -> &SharedArena {
        &self.arena
    }
1662
1663    #[inline]
1664    pub fn len_ops(&self) -> usize {
1665        if self.oplog.can_lock_in_this_thread() {
1666            return self.oplog.lock().visible_op_count_exact();
1667        }
1668
1669        self.visible_op_count.load(Acquire)
1670    }
1671
1672    #[inline]
1673    pub fn len_changes(&self) -> usize {
1674        let oplog = self.oplog.lock();
1675        oplog.len_changes()
1676    }
1677
    /// Returns a reference to the doc's configuration.
    pub fn config(&self) -> &Configure {
        &self.config
    }
1681
    /// This method compare the consistency between the current doc state
    /// and the state calculated by diff calculator from beginning.
    ///
    /// Panic when it's not consistent
    ///
    /// NOTE(review): if an assertion inside this method panics, `IS_CHECKING`
    /// is never reset to `false`, silently disabling later checks in the same
    /// process — confirm whether that is acceptable for this debug-only path.
    pub fn check_state_diff_calc_consistency_slow(&self) {
        // #[cfg(any(test, debug_assertions, feature = "test_utils"))]
        {
            // Re-entrancy guard: the replay below imports into another LoroDoc,
            // which could trigger this check recursively.
            static IS_CHECKING: std::sync::atomic::AtomicBool =
                std::sync::atomic::AtomicBool::new(false);
            if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
                return;
            }

            IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
            let peer_id = self.peer_id();
            let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
            let _g = s.enter();
            // Pause auto-commit while we replay; options are restored at the end.
            let options = self.implicit_commit_then_stop().0;
            self.oplog.lock().check_dag_correctness();
            if self.is_shallow() {
                // For shallow documents, we cannot replay from the beginning as the history is not complete.
                //
                // Instead, we:
                // 1. Export the initial state from the GC snapshot.
                // 2. Create a new document and import the initial snapshot.
                // 3. Export updates from the shallow start version vector to the current version.
                // 4. Import these updates into the new document.
                // 5. Compare the states of the new document and the current document.

                // Step 1: Export the initial state from the GC snapshot.
                let initial_snapshot = self
                    .export(ExportMode::state_only(Some(
                        &self.shallow_since_frontiers(),
                    )))
                    .unwrap();

                // Step 2: Create a new document and import the initial snapshot.
                let doc = LoroDoc::new();
                doc.import(&initial_snapshot).unwrap();
                self.checkout(&self.shallow_since_frontiers()).unwrap();
                assert_eq!(self.get_deep_value(), doc.get_deep_value());

                // Step 3: Export updates since the shallow start version vector to the current version.
                let updates = self.export(ExportMode::all_updates()).unwrap();

                // Step 4: Import these updates into the new document.
                doc.import(&updates).unwrap();
                self.checkout_to_latest();

                // Step 5: Checkout to the current state's frontiers and compare the states.
                // doc.checkout(&self.state_frontiers()).unwrap();
                assert_eq!(doc.get_deep_value(), self.get_deep_value());
                let mut calculated_state = doc.app_state().lock();
                let mut current_state = self.app_state().lock();
                current_state.check_is_the_same(&mut calculated_state);
            } else {
                // Full history available: replay all updates up to the current
                // state frontiers into a fresh doc and compare states.
                let f = self.state_frontiers();
                let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
                let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
                let doc = Self::new();
                doc.import(&bytes).unwrap();
                let mut calculated_state = doc.app_state().lock();
                let mut current_state = self.app_state().lock();
                current_state.check_is_the_same(&mut calculated_state);
            }

            self.renew_txn_if_auto_commit(options);
            IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
        }
    }
1752
    /// Query the current absolute position of a cursor, using event-index coordinates.
    ///
    /// Thin wrapper over [`Self::query_pos_internal`] with `ret_event_index = true`.
    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
        self.query_pos_internal(pos, true)
    }
1756
    /// Get position in a seq container
    ///
    /// Fast path: ask the current state for the cursor's position directly.
    /// Slow path (target id no longer present in state): trace history back to
    /// the op that deleted the target and recompute its latest position via a
    /// fresh diff calculator. Cursors with `id == None` denote the container
    /// end and are resolved from the container length instead.
    pub(crate) fn query_pos_internal(
        &self,
        pos: &Cursor,
        ret_event_index: bool,
    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
        if !self.has_container(&pos.container) {
            return Err(CannotFindRelativePosition::IdNotFound);
        }

        let mut state = self.state.lock();
        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
            // Fast path: the id still exists in the current state.
            Ok(PosQueryResult {
                update: None,
                current: AbsolutePosition {
                    pos: ans,
                    side: pos.side,
                },
            })
        } else {
            // We need to trace back to the version where the relative position is valid.
            // The optimal way to find that version is to have succ info like Automerge.
            //
            // But we don't have that info now, so an alternative way is to trace back
            // to version with frontiers of `[pos.id]`. But this may be very slow even if
            // the target is just deleted a few versions ago.
            //
            // What we need is to trace back to the latest version that deletes the target
            // id.

            // commit the txn to make sure we can query the history correctly, preserving options
            drop(state);
            let result = self.with_barrier(|| {
                let oplog = self.oplog().lock();
                // TODO: assert pos.id is not unknown
                if let Some(id) = pos.id {
                    // Ensure the container is registered if it exists lazily
                    if oplog.arena.id_to_idx(&pos.container).is_none() {
                        let mut s = self.state.lock();
                        if !s.does_container_exist(&pos.container) {
                            return Err(CannotFindRelativePosition::ContainerDeleted);
                        }
                        s.ensure_container(&pos.container);
                        drop(s);
                    }
                    let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
                    // We know where the target id is when we trace back to the delete_op_id.
                    let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
                        // No delete op found: either the history was trimmed
                        // (shallow doc) or the id genuinely does not exist.
                        if oplog.shallow_since_vv().includes_id(id) {
                            return Err(CannotFindRelativePosition::HistoryCleared);
                        }

                        tracing::error!("Cannot find id {}", id);
                        return Err(CannotFindRelativePosition::IdNotFound);
                    };
                    // Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
                    let mut diff_calc = DiffCalculator::new(true);
                    let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
                    let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
                    // TODO: PERF: it doesn't need to calc the effects here
                    diff_calc.calc_diff_internal(
                        &oplog,
                        before,
                        &before_frontiers,
                        oplog.vv(),
                        oplog.frontiers(),
                        // Only the target container's diff is needed.
                        Some(&|target| idx == target),
                    );
                    // TODO: remove depth info
                    let depth = self.arena.get_depth(idx);
                    let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
                    // Ask the container-specific calculator for the id's latest
                    // position, then convert to user-facing coordinates.
                    match diff_calc {
                        crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
                            let c = text.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_text(&pos.container);
                            let current_pos = handler.convert_entity_index_to_event_index(new_pos);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(current_pos, c.side),
                                current: AbsolutePosition {
                                    pos: current_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::List(list) => {
                            let c = list.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_list(&pos.container);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(new_pos, c.side),
                                current: AbsolutePosition {
                                    pos: new_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
                            let c = list.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_movable_list(&pos.container);
                            let new_pos = handler.op_pos_to_user_pos(new_pos);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(new_pos, c.side),
                                current: AbsolutePosition {
                                    pos: new_pos,
                                    side: c.side,
                                },
                            })
                        }
                        // Cursors only exist in sequence containers; other
                        // calculators can never be reached here.
                        crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
                        crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
                        #[cfg(feature = "counter")]
                        crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
                        crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
                    }
                } else {
                    // `id == None` means the cursor points at the container end:
                    // resolve to the current length of the container.
                    match pos.container.container_type() {
                        ContainerType::Text => {
                            let text = self.get_text(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: text.id(),
                                    side: pos.side,
                                    origin_pos: text.len_unicode(),
                                }),
                                current: AbsolutePosition {
                                    pos: text.len_event(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::List => {
                            let list = self.get_list(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: list.id(),
                                    side: pos.side,
                                    origin_pos: list.len(),
                                }),
                                current: AbsolutePosition {
                                    pos: list.len(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::MovableList => {
                            let list = self.get_movable_list(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: list.id(),
                                    side: pos.side,
                                    origin_pos: list.len(),
                                }),
                                current: AbsolutePosition {
                                    pos: list.len(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
                            unreachable!()
                        }
                        #[cfg(feature = "counter")]
                        ContainerType::Counter => unreachable!(),
                    }
                }
            });
            result
        }
    }
1931
    /// Free the history cache that is used for making checkout faster.
    ///
    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
    /// You can free it by calling this method.
    pub fn free_history_cache(&self) {
        self.oplog.lock().free_history_cache();
    }
1939
    /// Free the cached diff calculator that is used for checkout.
    ///
    /// Replaces the cached calculator with a fresh one (persist mode).
    pub fn free_diff_calculator(&self) {
        *self.diff_calculator.lock() = DiffCalculator::new(true);
    }
1944
    /// Whether the oplog currently holds a history cache.
    ///
    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
    /// You can free it by calling `free_history_cache`.
    pub fn has_history_cache(&self) -> bool {
        self.oplog.lock().has_history_cache()
    }
1950
    /// Encoded all ops and history cache to bytes and store them in the kv store.
    ///
    /// The parsed ops will be dropped
    ///
    /// Runs inside `with_barrier` so pending work is settled before compaction.
    #[inline]
    pub fn compact_change_store(&self) {
        self.with_barrier(|| {
            self.oplog.lock().compact_change_store();
        });
    }
1960
    /// Analyze the container info of the doc
    ///
    /// This is used for development and debugging
    #[inline]
    pub fn analyze(&self) -> DocAnalysis {
        DocAnalysis::analyze(self)
    }
1968
1969    /// Get the path from the root to the container
1970    pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1971        let mut state = self.state.lock();
1972        if state.arena.id_to_idx(id).is_none() {
1973            if !state.does_container_exist(id) {
1974                return None;
1975            }
1976            state.ensure_container(id);
1977        }
1978        let idx = state.arena.id_to_idx(id).unwrap();
1979        state.get_path(idx)
1980    }
1981
    /// Export the doc as bytes in the requested [`ExportMode`].
    ///
    /// Executed inside `with_barrier` so the exported data reflects a settled
    /// document state. Only the shallow/state-only/snapshot-at modes are
    /// fallible (`?`); the fast snapshot/update modes always succeed.
    #[instrument(skip(self))]
    pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
        self.with_barrier(|| {
            let ans = match mode {
                ExportMode::Snapshot => export_fast_snapshot(self),
                ExportMode::Updates { from } => export_fast_updates(self, &from),
                ExportMode::UpdatesInRange { spans } => {
                    export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
                }
                ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
                ExportMode::StateOnly(f) => match f {
                    Some(f) => export_state_only_snapshot(self, &f)?,
                    // Default to the latest oplog frontiers when no version is given.
                    None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
                },
                ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
            };
            Ok(ans)
        })
    }
2001
    /// The doc only contains the history since the shallow history start version vector.
    ///
    /// This is empty if the doc is not shallow.
    ///
    /// The ops included by the shallow history start version vector are not in the doc.
    pub fn shallow_since_vv(&self) -> ImVersionVector {
        self.oplog().lock().shallow_since_vv().clone()
    }
2010
    /// The frontiers marking the start of the doc's (possibly trimmed) history.
    pub fn shallow_since_frontiers(&self) -> Frontiers {
        self.oplog().lock().shallow_since_frontiers().clone()
    }
2014
    /// Check if the doc contains the full history.
    ///
    /// A non-empty shallow-since version vector means history was trimmed.
    pub fn is_shallow(&self) -> bool {
        !self.oplog().lock().shallow_since_vv().is_empty()
    }
2019
2020    /// Get the number of operations in the pending transaction.
2021    ///
2022    /// The pending transaction is the one that is not committed yet. It will be committed
2023    /// after calling `doc.commit()`, `doc.export(mode)` or `doc.checkout(version)`.
2024    pub fn get_pending_txn_len(&self) -> usize {
2025        if let Some(txn) = self.txn.lock().as_ref() {
2026            txn.len()
2027        } else {
2028            0
2029        }
2030    }
2031
    /// Find the id spans that differ between two versions via a DAG path search.
    #[inline]
    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
        self.oplog().lock().dag.find_path(from, to)
    }
2036
    /// Subscribe to the first commit from a peer. Operations performed on the `LoroDoc` within this callback
    /// will be merged into the current commit.
    ///
    /// This is useful for managing the relationship between `PeerID` and user information.
    /// For example, you could store user names in a `LoroMap` using `PeerID` as the key and the `UserID` as the value.
    ///
    /// Returns a [`Subscription`]; dropping it unsubscribes the callback.
    pub fn subscribe_first_commit_from_peer(
        &self,
        callback: FirstCommitFromPeerCallback,
    ) -> Subscription {
        let (s, enable) = self
            .first_commit_from_peer_subs
            .inner()
            .insert((), callback);
        // The subscription is inert until enabled.
        enable();
        s
    }
2053
    /// Subscribe to the pre-commit event.
    ///
    /// The callback will be called when the changes are committed but not yet applied to the OpLog.
    /// You can modify the commit message and timestamp in the callback by [`ChangeModifier`].
    ///
    /// Returns a [`Subscription`]; dropping it unsubscribes the callback.
    pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
        let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
        enable();
        s
    }
2063}
2064
/// Errors returned by [`LoroDoc::travel_change_ancestors`].
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// A starting id is not included in the oplog's version vector.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// The doc is shallow and the target version lies before the trimmed history.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
2072
2073impl LoroDoc {
    /// Visit the ancestor changes of `ids` in descending (lamport, peer) order.
    ///
    /// Traversal uses a max-heap keyed on the change's last lamport timestamp,
    /// so callbacks see newer changes before their dependencies. The callback
    /// can stop traversal early by returning [`ControlFlow::Break`].
    ///
    /// Errors if any starting id is unknown to the oplog or falls before the
    /// shallow history start.
    pub fn travel_change_ancestors(
        &self,
        ids: &[ID],
        f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
    ) -> Result<(), ChangeTravelError> {
        // Commit any pending txn first so the oplog contains all local ops.
        let (options, guard) = self.implicit_commit_then_stop();
        drop(guard);
        // Heap node ordered by (last lamport, peer) — newest change pops first.
        struct PendingNode(ChangeMeta);
        impl PartialEq for PendingNode {
            fn eq(&self, other: &Self) -> bool {
                self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
            }
        }

        impl Eq for PendingNode {}
        impl PartialOrd for PendingNode {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }

        impl Ord for PendingNode {
            fn cmp(&self, other: &Self) -> Ordering {
                self.0
                    .lamport_last()
                    .cmp(&other.0.lamport_last())
                    .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
            }
        }

        // Validate all starting points before traversal begins.
        for id in ids {
            let op_log = &self.oplog().lock();
            if !op_log.vv().includes_id(*id) {
                return Err(ChangeTravelError::TargetIdNotFound(*id));
            }
            if op_log.dag.shallow_since_vv().includes_id(*id) {
                return Err(ChangeTravelError::TargetVersionNotIncluded);
            }
        }

        let mut visited = FxHashSet::default();
        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
        for id in ids {
            pending.push(PendingNode(ChangeMeta::from_change(
                &self.oplog().lock().get_change_at(*id).unwrap(),
            )));
        }
        while let Some(PendingNode(node)) = pending.pop() {
            let deps = node.deps.clone();
            if f(node).is_break() {
                break;
            }

            // Enqueue each unvisited dependency for later visits.
            for dep in deps.iter() {
                let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
                    continue;
                };
                if visited.contains(&dep_node.id) {
                    continue;
                }

                visited.insert(dep_node.id);
                pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
            }
        }

        // Restore auto-commit behavior with the options saved above.
        let ans = Ok(());
        self.renew_txn_if_auto_commit(options);
        ans
    }
2144
2145    pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
2146        self.with_barrier(|| {
2147            let mut set = FxHashSet::default();
2148            {
2149                let oplog = self.oplog().lock();
2150                for op in oplog.iter_ops(id.to_span(len)) {
2151                    let id = oplog.arena.get_container_id(op.container()).unwrap();
2152                    set.insert(id);
2153                }
2154            }
2155            set
2156        })
2157    }
2158
2159    pub fn delete_root_container(&self, cid: ContainerID) {
2160        if !cid.is_root() {
2161            return;
2162        }
2163
2164        // Do not treat "not in arena" as non-existence; consult state/kv
2165        if !self.has_container(&cid) {
2166            return;
2167        }
2168
2169        let Some(h) = self.get_handler(cid.clone()) else {
2170            return;
2171        };
2172
2173        if let Err(e) = h.clear() {
2174            eprintln!("Failed to clear handler: {:?}", e);
2175            return;
2176        }
2177        self.config.deleted_root_containers.lock().insert(cid);
2178    }
2179
    /// Toggle whether empty root containers are hidden from views.
    pub fn set_hide_empty_root_containers(&self, hide: bool) {
        self.config
            .hide_empty_root_containers
            .store(hide, std::sync::atomic::Ordering::Relaxed);
    }
2185}
2186
// Find the id of the op that deleted `id` inside container `idx`, or `None`
// when no such delete is found in the available history.
//
// Iterates changes in reverse causal order starting from the version right
// after `id` (or from the shallow start when `id`'s frontiers can't be
// resolved), scanning each change's ops back-to-front for a list delete
// whose span covers `id`.
//
// FIXME: PERF: This method is quite slow because it iterates all the changes
fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
    let start_vv = oplog
        .dag
        .frontiers_to_vv(&id.into())
        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
    for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
        for op in change.ops.iter().rev() {
            // Only ops in the target container can delete the id.
            if op.container != idx {
                continue;
            }
            if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
                if d.id_start.to_span(d.atom_len()).contains(id) {
                    return Some(ID::new(change.peer(), op.counter));
                }
            }
        }
    }

    None
}
2208
/// Guard that commits the doc's pending transaction when dropped,
/// applying `default_options` to the txn right before the commit.
#[derive(Debug)]
pub struct CommitWhenDrop<'a> {
    // Doc whose pending txn is committed on drop.
    doc: &'a LoroDoc,
    // Options handed to the active txn just before committing.
    default_options: CommitOptions,
}
2214
impl Drop for CommitWhenDrop<'_> {
    fn drop(&mut self) {
        {
            // Scoped so the txn lock is released before committing below.
            let mut guard = self.doc.txn.lock();
            if let Some(txn) = guard.as_mut() {
                // Move the stored options into the active txn (leaving defaults behind).
                txn.set_default_options(std::mem::take(&mut self.default_options));
            };
        }

        self.doc.commit_then_renew();
    }
}
2227
/// Options for configuring a commit operation.
///
/// Construct via [`CommitOptions::new`] or [`Default::default`], then chain
/// the builder methods.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// Origin identifier for the commit event, used to track the source of changes.
    /// It doesn't persist.
    pub origin: Option<InternalString>,

    /// Whether to immediately start a new transaction after committing.
    /// Defaults to true.
    pub immediate_renew: bool,

    /// Custom timestamp for the commit in seconds since Unix epoch.
    /// If None, the current time will be used.
    pub timestamp: Option<Timestamp>,

    /// Optional commit message to attach to the changes. It will be persisted.
    pub commit_msg: Option<Arc<str>>,
}
2246
2247impl CommitOptions {
2248    /// Creates a new CommitOptions with default values.
2249    pub fn new() -> Self {
2250        Self {
2251            origin: None,
2252            immediate_renew: true,
2253            timestamp: None,
2254            commit_msg: None,
2255        }
2256    }
2257
2258    /// Sets the origin identifier for this commit.
2259    pub fn origin(mut self, origin: &str) -> Self {
2260        self.origin = Some(origin.into());
2261        self
2262    }
2263
2264    /// Sets whether to immediately start a new transaction after committing.
2265    pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2266        self.immediate_renew = immediate_renew;
2267        self
2268    }
2269
2270    /// Set the timestamp of the commit.
2271    ///
2272    /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
2273    pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2274        self.timestamp = Some(timestamp);
2275        self
2276    }
2277
2278    /// Sets a commit message to be attached to the changes.
2279    pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2280        self.commit_msg = Some(commit_msg.into());
2281        self
2282    }
2283
2284    /// Sets the origin identifier for this commit.
2285    pub fn set_origin(&mut self, origin: Option<&str>) {
2286        self.origin = origin.map(|x| x.into())
2287    }
2288
2289    /// Sets the timestamp for this commit.
2290    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2291        self.timestamp = timestamp;
2292    }
2293}
2294
impl Default for CommitOptions {
    /// Equivalent to [`CommitOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}
2300
2301#[cfg(test)]
2302mod test {
2303    use std::{
2304        panic::AssertUnwindSafe,
2305        sync::{
2306            atomic::{AtomicUsize, Ordering},
2307            Arc,
2308        },
2309    };
2310
2311    use crate::{
2312        cursor::PosType,
2313        encoding::json_schema::json::{JsonOpContent, JsonSchema, ListOp},
2314        encoding::{fast_snapshot::EMPTY_MARK, EncodeMode},
2315        loro::ExportMode,
2316        version::{Frontiers, VersionVector},
2317        LoroDoc, ToJson, TreeParentId,
2318    };
2319    use bytes::{BufMut, Bytes};
2320    use loro_common::ID;
2321    use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
2322
2323    const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
2324
2325    fn encode_import_blob(mode: EncodeMode, body: &[u8]) -> Vec<u8> {
2326        let mut ans = Vec::new();
2327        ans.extend_from_slice(b"loro");
2328        ans.extend_from_slice(&[0; 16]);
2329        ans.extend_from_slice(&mode.to_bytes());
2330        ans.extend_from_slice(body);
2331        let checksum = xxhash_rust::xxh32::xxh32(&ans[20..], XXH_SEED);
2332        ans[16..20].copy_from_slice(&checksum.to_le_bytes());
2333        ans
2334    }
2335
2336    fn encode_fast_snapshot_import(oplog_bytes: &[u8]) -> Vec<u8> {
2337        let mut body = Vec::new();
2338        body.put_u32_le(oplog_bytes.len() as u32);
2339        body.extend_from_slice(oplog_bytes);
2340        body.put_u32_le(EMPTY_MARK.len() as u32);
2341        body.extend_from_slice(EMPTY_MARK);
2342        body.put_u32_le(0);
2343        encode_import_blob(EncodeMode::FastSnapshot, &body)
2344    }
2345
    /// Build malformed kv-store bytes: a b"LORO" header that claims an absurdly
    /// large block count (10,000,000), used to exercise import validation.
    // NOTE(review): field meanings (count, checksum, trailing value) are assumed
    // from the loro_kv_store format — confirm against its encoder.
    fn sstable_with_huge_meta_block_count() -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(b"LORO");
        bytes.push(0);
        bytes.put_u32_le(10_000_000);
        bytes.put_u32_le(xxhash_rust::xxh32::xxh32(&[], XXH_SEED));
        bytes.put_u32_le(5);
        bytes
    }
2355
    /// Build oplog kv bytes whose `vv`/`fr` entries claim op `1@1` exists while
    /// the change block stored under that id is a single junk byte — a fixture
    /// for malformed-snapshot handling.
    fn snapshot_oplog_with_malformed_block() -> Vec<u8> {
        let peer = 1;
        let id = ID::new(peer, 0);
        let vv = VersionVector::from_iter([(peer, 1)]);
        let frontiers = Frontiers::from_id(id);
        let mut store = MemKvStore::new(MemKvConfig::default());
        store.set(b"vv", vv.encode().into());
        store.set(b"fr", frontiers.encode().into());
        // Deliberately invalid change-block payload.
        store.set(&id.to_bytes(), Bytes::from_static(&[0]));
        store.export_all().to_vec()
    }
2367
2368    fn make_json_import_stress_doc(peer: u64) -> LoroDoc {
2369        let doc = LoroDoc::new_auto_commit();
2370        doc.set_peer_id(peer).unwrap();
2371
2372        let text = doc.get_text("text");
2373        let mut text_pos = 0;
2374        for i in 0..32 {
2375            let chunk = format!("segment-{i}-abcdefghijklmnopqrstuvwxyz;");
2376            text.insert_unicode(text_pos, &chunk).unwrap();
2377            text_pos += chunk.chars().count();
2378        }
2379
2380        let list = doc.get_list("list");
2381        for i in 0..32 {
2382            list.insert(i, format!("item-{i}")).unwrap();
2383        }
2384
2385        let map = doc.get_map("map");
2386        for i in 0..32 {
2387            let key = format!("key-{i}");
2388            map.insert(&key, format!("value-{i}")).unwrap();
2389        }
2390
2391        let tree = doc.get_tree("tree");
2392        let mut parent = TreeParentId::Root;
2393        for i in 0..16 {
2394            let node = tree.create(parent).unwrap();
2395            let meta = tree.get_meta(node).unwrap();
2396            meta.insert("name", format!("node-{i}")).unwrap();
2397            meta.insert("payload", format!("payload-{i}-{}", "x".repeat(16)))
2398                .unwrap();
2399            parent = TreeParentId::Node(node);
2400        }
2401
2402        doc
2403    }
2404
2405    fn make_json_list_update_with_four_ops(peer: u64) -> (LoroDoc, JsonSchema) {
2406        let doc = LoroDoc::new();
2407        doc.set_peer_id(peer).unwrap();
2408        let map = doc.get_map("map");
2409        let list = doc.get_list("list");
2410        let text = doc.get_text("text");
2411
2412        let mut txn = doc.txn().unwrap();
2413        map.insert_with_txn(&mut txn, "prefix", "map-value".into())
2414            .unwrap();
2415        list.insert_with_txn(&mut txn, 0, "seed".into()).unwrap();
2416        text.insert_with_txn(&mut txn, 0, "text-value", PosType::Unicode)
2417            .unwrap();
2418        list.insert_with_txn(&mut txn, 1, "tail".into()).unwrap();
2419        txn.commit().unwrap();
2420
2421        let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv(), false);
2422        assert_eq!(json.changes.len(), 1);
2423        assert_eq!(json.changes[0].ops.len(), 4);
2424        (doc, json)
2425    }
2426
2427    fn move_last_list_insert_far_out_of_bounds(json: &mut JsonSchema) {
2428        let last_change = json.changes.last_mut().unwrap();
2429        let last_op = last_change.ops.last_mut().unwrap();
2430        match &mut last_op.content {
2431            JsonOpContent::List(ListOp::Insert { pos, .. }) => {
2432                *pos = 1_000;
2433            }
2434            other => panic!("expected list insert op, got {other:?}"),
2435        }
2436    }
2437
2438    #[test]
2439    fn test_sync() {
2440        fn is_send_sync<T: Send + Sync>(_v: T) {}
2441        let loro = super::LoroDoc::new();
2442        is_send_sync(loro)
2443    }
2444
2445    #[test]
2446    fn import_rejects_huge_sstable_meta_block_count_without_panic() {
2447        let bytes = encode_fast_snapshot_import(&sstable_with_huge_meta_block_count());
2448
2449        let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2450        assert!(result.is_ok(), "malformed import should not panic");
2451        assert!(result.unwrap().is_err());
2452    }
2453
2454    #[test]
2455    fn import_rejects_malformed_change_block_without_panic() {
2456        let bytes = encode_fast_snapshot_import(&snapshot_oplog_with_malformed_block());
2457
2458        let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2459        assert!(result.is_ok(), "malformed import should not panic");
2460        assert!(result.unwrap().is_err());
2461    }
2462
    /// A failed import must roll back the oplog and document state completely,
    /// and a plain retry of the same update must then succeed.
    #[test]
    fn failed_import_rolls_back_oplog_and_arena() {
        let src = LoroDoc::new();
        src.set_peer_id(1).unwrap();
        let text = src.get_text("text");
        let mut txn = src.txn().unwrap();
        text.insert_with_txn(&mut txn, 0, "hello", PosType::Unicode)
            .unwrap();
        txn.commit().unwrap();
        let update = src.export(ExportMode::all_updates()).unwrap();

        let dst = LoroDoc::new();
        let vv_before_import = dst.oplog_vv();
        let state_before_import = dst.get_deep_value();
        // This origin string triggers a test-only failpoint that makes the
        // state-apply phase fail (the error message names the failpoint).
        let err = dst
            .import_with(&update, "__loro_fail_import_state_apply".into())
            .unwrap_err();
        assert!(err.to_string().contains("state apply failpoint"));
        // Nothing from the failed import may remain visible.
        assert_eq!(dst.oplog_vv(), vv_before_import);
        assert_eq!(dst.get_deep_value(), state_before_import);
        assert!(dst.oplog().lock().is_empty());

        // After rollback, the very same update must import cleanly.
        dst.import(&update).unwrap();
        assert_eq!(dst.get_deep_value(), src.get_deep_value());
    }
2488
    /// After a successful first import, a failed incremental import must not
    /// disturb the already-stored data, and retrying the same incremental
    /// update must then converge with the source.
    #[test]
    fn failed_incremental_import_restores_previous_change_store_block() {
        let src = LoroDoc::new();
        src.set_peer_id(1).unwrap();
        let text = src.get_text("text");
        let mut txn = src.txn().unwrap();
        text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
            .unwrap();
        txn.commit().unwrap();
        let first_update = src.export(ExportMode::all_updates()).unwrap();
        let first_vv = src.oplog_vv();

        // A second commit so there is an incremental update on top of the
        // first one.
        let mut txn = src.txn().unwrap();
        text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
            .unwrap();
        txn.commit().unwrap();
        let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();

        let dst = LoroDoc::new();
        dst.import(&first_update).unwrap();
        let vv_before_import = dst.oplog_vv();
        let state_before_import = dst.get_deep_value();
        // Failpoint origin (see failed_import_rolls_back_oplog_and_arena):
        // the incremental import must fail and roll back.
        dst.import_with(&second_update, "__loro_fail_import_state_apply".into())
            .unwrap_err();
        assert_eq!(dst.oplog_vv(), vv_before_import);
        assert_eq!(dst.get_deep_value(), state_before_import);

        dst.import(&second_update).unwrap();
        assert_eq!(dst.get_deep_value(), src.get_deep_value());
    }
2519
    /// Repeated failpoint-triggered JSON imports into an empty doc must leave
    /// it fully empty every time; a final clean import must then converge.
    #[test]
    fn failed_import_json_updates_rolls_back_complex_empty_doc() {
        let src = make_json_import_stress_doc(11);
        let json = src.export_json_updates(&Default::default(), &src.oplog_vv(), false);

        let dst = LoroDoc::new();
        let vv_before_import = dst.oplog_vv();
        let frontiers_before_import = dst.oplog_frontiers();
        let state_before_import = dst.get_deep_value();
        for _ in 0..3 {
            // Arm the test-only failpoint so the next import's state-apply
            // phase fails.
            crate::state::fail_next_import_state_apply_for_test();
            let err = dst.import_json_updates(json.clone()).unwrap_err();
            assert!(err.to_string().contains("state apply failpoint"));
            // Every failed attempt must leave the doc untouched.
            assert_eq!(dst.oplog_vv(), vv_before_import);
            assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
            assert_eq!(dst.get_deep_value(), state_before_import);
            assert!(dst.oplog().lock().is_empty());
        }

        dst.import_json_updates(json).unwrap();
        assert_eq!(dst.oplog_vv(), src.oplog_vv());
        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
        assert_eq!(dst.get_deep_value(), src.get_deep_value());
    }
2544
    /// A failed incremental JSON import on top of previously imported data
    /// must roll back cleanly, repeatedly, and a clean retry must converge.
    #[test]
    fn failed_incremental_import_json_updates_restores_previous_change_store_block() {
        // Seed the source with one op per container kind.
        let src = LoroDoc::new_auto_commit();
        src.set_peer_id(12).unwrap();
        let text = src.get_text("text");
        text.insert_unicode(0, "a").unwrap();
        let list = src.get_list("list");
        list.push("seed").unwrap();
        let map = src.get_map("map");
        map.insert("seed", "value").unwrap();
        let tree = src.get_tree("tree");
        let root = tree.create(TreeParentId::Root).unwrap();
        tree.get_meta(root).unwrap().insert("name", "root").unwrap();

        let first_vv = src.oplog_vv();
        let first_json = src.export_json_updates(&Default::default(), &first_vv, false);

        // Pile on a much larger second batch so the incremental update is
        // substantial.
        let mut text_pos = text.len_unicode();
        for i in 0..64 {
            let chunk = format!("chunk-{i};");
            text.insert_unicode(text_pos, &chunk).unwrap();
            text_pos += chunk.chars().count();
        }
        for i in 0..32 {
            list.push(format!("after-{i}")).unwrap();
            let key = format!("after-{i}");
            map.insert(&key, format!("value-{i}")).unwrap();
        }
        let child = tree.create(TreeParentId::Node(root)).unwrap();
        tree.get_meta(child)
            .unwrap()
            .insert("name", "child")
            .unwrap();

        let second_json = src.export_json_updates(&first_vv, &src.oplog_vv(), false);

        let dst = LoroDoc::new();
        dst.import_json_updates(first_json).unwrap();
        let vv_before_import = dst.oplog_vv();
        let frontiers_before_import = dst.oplog_frontiers();
        let state_before_import = dst.get_deep_value();

        for _ in 0..2 {
            // Arm the test-only failpoint; the next import must fail and
            // leave the previously imported batch intact.
            crate::state::fail_next_import_state_apply_for_test();
            let err = dst.import_json_updates(second_json.clone()).unwrap_err();
            assert!(err.to_string().contains("state apply failpoint"));
            assert_eq!(dst.oplog_vv(), vv_before_import);
            assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
            assert_eq!(dst.get_deep_value(), state_before_import);
        }

        dst.import_json_updates(second_json).unwrap();
        assert_eq!(dst.oplog_vv(), src.oplog_vv());
        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
        assert_eq!(dst.get_deep_value(), src.get_deep_value());
    }
2601
    /// A JSON update whose *last* op is malformed must roll back entirely,
    /// both when imported into an empty doc and when imported on top of a
    /// valid prefix of the same change that already entered the oplog.
    #[test]
    fn malformed_later_import_json_update_rolls_back_after_valid_prefix_enters_oplog() {
        let peer = 13;
        let (src, good_json) = make_json_list_update_with_four_ops(peer);
        let mut bad_json = good_json.clone();
        move_last_list_insert_far_out_of_bounds(&mut bad_json);

        // Sanity: the untampered update imports and converges.
        let good_dst = LoroDoc::new();
        good_dst.import_json_updates(good_json.clone()).unwrap();
        assert_eq!(good_dst.get_deep_value(), src.get_deep_value());

        // Slice the change at the last op's counter: `prefix_json` holds the
        // first three ops, the suffix holds only the final (tampered) one.
        let last_op_counter = good_json.changes[0].ops.last().unwrap().counter;
        let prefix_vv = VersionVector::from_iter([(peer, last_op_counter)]);
        let prefix_json = src.export_json_updates(&Default::default(), &prefix_vv, false);
        assert_eq!(
            prefix_json.changes[0].ops.len(),
            good_json.changes[0].ops.len() - 1
        );
        let good_suffix_json = src.export_json_updates(&prefix_vv, &src.oplog_vv(), false);
        assert_eq!(good_suffix_json.changes[0].ops.len(), 1);
        let mut bad_suffix_json = good_suffix_json.clone();
        move_last_list_insert_far_out_of_bounds(&mut bad_suffix_json);

        // Case 1: valid prefix already in the oplog, then a bad suffix.
        let prefix_dst = LoroDoc::new();
        prefix_dst.import_json_updates(prefix_json.clone()).unwrap();
        let vv_before_bad_suffix = prefix_dst.oplog_vv();
        let frontiers_before_bad_suffix = prefix_dst.oplog_frontiers();
        let state_before_bad_suffix = prefix_dst.get_deep_value();

        let bad_suffix_json = serde_json::to_string(&bad_suffix_json).unwrap();
        let err = prefix_dst
            .import_json_updates(&bad_suffix_json)
            .unwrap_err();
        assert!(
            err.to_string().contains("list diff"),
            "expected state list bounds validation, got {err:?}"
        );
        // The bad suffix must not have moved the doc past the prefix.
        assert_eq!(prefix_dst.oplog_vv(), vv_before_bad_suffix);
        assert_eq!(prefix_dst.oplog_frontiers(), frontiers_before_bad_suffix);
        assert_eq!(prefix_dst.get_deep_value(), state_before_bad_suffix);

        // The untampered suffix must still apply and converge.
        prefix_dst.import_json_updates(good_suffix_json).unwrap();
        assert_eq!(prefix_dst.get_deep_value(), src.get_deep_value());
        assert_eq!(prefix_dst.oplog_vv(), src.oplog_vv());

        // Case 2: the whole tampered change into an empty doc — everything,
        // including the valid leading ops, must roll back.
        let dst = LoroDoc::new();
        let vv_before_import = dst.oplog_vv();
        let frontiers_before_import = dst.oplog_frontiers();
        let state_before_import = dst.get_deep_value();
        let bad_json = serde_json::to_string(&bad_json).unwrap();
        let err = dst.import_json_updates(&bad_json).unwrap_err();
        assert!(
            err.to_string().contains("list diff"),
            "expected state list bounds validation, got {err:?}"
        );
        assert_eq!(dst.oplog_vv(), vv_before_import);
        assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
        assert_eq!(dst.get_deep_value(), state_before_import);
        assert!(dst.oplog().lock().is_empty());
    }
2662
    /// Importing an out-of-order update first parks it as pending; a later
    /// failed import of its dependency must restore that pending state, and a
    /// clean retry must apply both and converge.
    #[test]
    fn failed_import_restores_pending_changes_that_were_applied_during_import() {
        let src = LoroDoc::new();
        src.set_peer_id(14).unwrap();
        let text = src.get_text("text");

        let mut txn = src.txn().unwrap();
        text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
            .unwrap();
        txn.commit().unwrap();
        let first_update = src.export(ExportMode::all_updates()).unwrap();
        let first_vv = src.oplog_vv();

        let mut txn = src.txn().unwrap();
        text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
            .unwrap();
        txn.commit().unwrap();
        let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();

        // Import the second update first: it depends on the (absent) first
        // update, so nothing applies and it is queued as pending.
        let dst = LoroDoc::new();
        let status = dst.import(&second_update).unwrap();
        assert!(status.success.is_empty());
        assert!(status.pending.is_some());
        let vv_before_dependency = dst.oplog_vv();
        let frontiers_before_dependency = dst.oplog_frontiers();
        let state_before_dependency = dst.get_deep_value();

        // Arm the test-only failpoint: importing the dependency will fail
        // mid-apply, after the pending change could have been pulled in.
        crate::state::fail_next_import_state_apply_for_test();
        let err = dst.import(&first_update).unwrap_err();
        assert!(err.to_string().contains("state apply failpoint"));
        assert_eq!(dst.oplog_vv(), vv_before_dependency);
        assert_eq!(dst.oplog_frontiers(), frontiers_before_dependency);
        assert_eq!(dst.get_deep_value(), state_before_dependency);

        // Retrying must apply the dependency AND the restored pending change.
        dst.import(&first_update).unwrap();
        assert_eq!(dst.oplog_vv(), src.oplog_vv());
        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
        assert_eq!(dst.get_deep_value(), src.get_deep_value());
    }
2702
    /// A failed JSON import must neither fire root-subscription events nor
    /// leave queued events behind; a later clean import fires exactly one.
    #[test]
    fn failed_import_json_updates_does_not_emit_or_leave_events() {
        let (src, good_json) = make_json_list_update_with_four_ops(15);
        let mut bad_json = good_json.clone();
        move_last_list_insert_far_out_of_bounds(&mut bad_json);

        // Count every event delivered to the root subscriber.
        let dst = LoroDoc::new();
        let event_count = Arc::new(AtomicUsize::new(0));
        let event_count_cloned = event_count.clone();
        let _sub = dst.subscribe_root(Arc::new(move |_| {
            event_count_cloned.fetch_add(1, Ordering::SeqCst);
        }));

        let bad_json = serde_json::to_string(&bad_json).unwrap();
        let err = dst.import_json_updates(&bad_json).unwrap_err();
        assert!(
            err.to_string().contains("list diff"),
            "expected state list bounds validation, got {err:?}"
        );
        // No events fired, none queued, and the oplog stayed empty.
        assert_eq!(event_count.load(Ordering::SeqCst), 0);
        assert!(dst.drop_pending_events().is_empty());
        assert!(dst.oplog().lock().is_empty());

        // The clean import fires exactly one root event.
        dst.import_json_updates(good_json).unwrap();
        assert_eq!(event_count.load(Ordering::SeqCst), 1);
        assert_eq!(dst.get_deep_value(), src.get_deep_value());
    }
2730
    /// Checkout to historical versions: an empty frontier yields the empty
    /// state, and mid-history IDs yield the exact partial states, both on the
    /// original doc and on a snapshot-imported replica.
    #[test]
    fn test_checkout() {
        let loro = LoroDoc::new();
        loro.set_peer_id(1).unwrap();
        let text = loro.get_text("text");
        let map = loro.get_map("map");
        let list = loro.get_list("list");
        // One transaction, 10 iterations × 3 ops each (map, text, list),
        // i.e. counters 0..=29 on peer 1.
        let mut txn = loro.txn().unwrap();
        for i in 0..10 {
            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
            text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
                .unwrap();
            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
        }
        txn.commit().unwrap();
        // Replica built from a full snapshot.
        let b = LoroDoc::new();
        b.import(&loro.export(ExportMode::Snapshot).unwrap())
            .unwrap();
        // Empty frontiers => the empty document state.
        loro.checkout(&Frontiers::default()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(
                json.to_json_value(),
                serde_json::json!({"text":"","list":[],"map":{}})
            );
        }

        // Counter 2 = end of the first iteration's three ops.
        b.checkout(&ID::new(1, 2).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(
                json.to_json_value(),
                serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
            );
        }

        // Counter 3 = first op of the second iteration (the map insert).
        loro.checkout(&ID::new(1, 3).into()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(
                json.to_json_value(),
                serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
            );
        }

        // Counter 29 = the final op: full document state.
        b.checkout(&ID::new(1, 29).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(
                json.to_json_value(),
                serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
            );
        }
    }
2785
    /// Regression test for issue #181: after batch-importing an empty
    /// snapshot and committing local edits, exporting updates must not fail.
    #[test]
    fn import_batch_err_181() {
        let a = LoroDoc::new_auto_commit();
        let update_a = a.export(ExportMode::Snapshot);
        let b = LoroDoc::new_auto_commit();
        b.import_batch(&[update_a.unwrap()]).unwrap();
        b.get_text("text")
            .insert(0, "hello", PosType::Unicode)
            .unwrap();
        b.commit_then_renew();
        // NOTE(review): this acquire-and-drop of the oplog lock looks like
        // part of the original repro — confirm before simplifying.
        let oplog = b.oplog().lock();
        drop(oplog);
        // The export itself must succeed (it panicked/errored in #181).
        b.export(ExportMode::all_updates()).unwrap();
    }
2800
    /// Once a doc's internal mutex is poisoned by a panic while held, later
    /// operations must fail fast with a "poisoned LoroMutex" panic instead of
    /// proceeding on possibly-corrupt state.
    #[test]
    fn poisoned_mutex_keeps_follow_up_operations_failed() {
        let doc = LoroDoc::new();
        let oplog = doc.oplog.clone();
        // Poison the oplog mutex: panic while the guard is alive.
        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            let _guard = oplog.lock();
            panic!("poison oplog");
        }));

        let err = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
            .expect_err("poisoned lock should continue to fail fast");
        // Panic payloads may be &str or String; normalize to a String for
        // the message check.
        let msg = if let Some(msg) = err.downcast_ref::<&str>() {
            (*msg).to_string()
        } else if let Some(msg) = err.downcast_ref::<String>() {
            msg.clone()
        } else {
            String::new()
        };
        assert!(msg.contains("poisoned LoroMutex"), "{msg}");
    }
2821}