Skip to main content

loro_internal/
loro.rs

1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::{AtomicBool, AtomicUsize};
6pub(crate) use crate::LoroDocInner;
7use crate::{
8    arena::SharedArena,
9    change::{Change, Timestamp},
10    configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11    container::{
12        idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13        IntoContainerId,
14    },
15    cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16    dag::{Dag, DagUtils},
17    diff_calc::DiffCalculator,
18    encoding::{
19        self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20        export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21        export_state_only_snapshot,
22        json_schema::{encode_change_to_json, json::JsonSchema},
23        parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24    },
25    event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26    handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27    id::PeerID,
28    json::JsonChange,
29    op::InnerContent,
30    oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31    state::DocState,
32    subscription::{LocalUpdateCallback, Observer, Subscriber},
33    undo::DiffBatch,
34    utils::subscription::{SubscriberSetWithQueue, Subscription},
35    version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36    ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37    VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42    lock::{LoroLockGroup, LoroMutex},
43    txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47    ContainerID, ContainerType, HasCounterSpan, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError,
48    LoroResult, LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53    borrow::Cow,
54    cmp::Ordering,
55    collections::{hash_map::Entry, BinaryHeap},
56    ops::ControlFlow,
57    sync::{
58        atomic::Ordering::{Acquire, Release},
59        Arc,
60    },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("LoroDoc")
73            .field("config", &self.config)
74            .field("auto_commit", &self.auto_commit)
75            .field("detached", &self.detached)
76            .finish()
77    }
78}
79
80impl LoroDoc {
81    /// Run the provided closure within a commit barrier.
82    ///
83    /// This finalizes any pending auto-commit transaction first (preserving
84    /// options across an empty txn), executes `f`, then renews the transaction
85    /// (carrying preserved options) if auto-commit is enabled. This is the
86    /// common implicit-commit pattern used by internal operations such as
87    /// import/export/checkouts.
88    #[inline]
89    pub fn with_barrier<F, R>(&self, f: F) -> R
90    where
91        F: FnOnce() -> R,
92    {
93        let (options, guard) = self.implicit_commit_then_stop();
94        let result = f();
95        drop(guard);
96        self.renew_txn_if_auto_commit(options);
97        result
98    }
99
100    pub fn new() -> Self {
101        let visible_op_count = Arc::new(AtomicUsize::new(0));
102        let oplog = OpLog::new(visible_op_count.clone());
103        let arena = oplog.arena.clone();
104        let config: Configure = oplog.configure.clone();
105        let lock_group = LoroLockGroup::new();
106        let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
107        let inner = Arc::new_cyclic(|w| {
108            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
109            LoroDocInner {
110                oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
111                state,
112                config,
113                visible_op_count,
114                detached: AtomicBool::new(false),
115                auto_commit: AtomicBool::new(false),
116                observer: Arc::new(Observer::new(arena.clone())),
117                diff_calculator: Arc::new(
118                    lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
119                ),
120                txn: global_txn,
121                arena,
122                local_update_subs: SubscriberSetWithQueue::new(),
123                peer_id_change_subs: SubscriberSetWithQueue::new(),
124                pre_commit_subs: SubscriberSetWithQueue::new(),
125                first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
126            }
127        });
128        LoroDoc { inner }
129    }
130
131    pub fn fork(&self) -> Self {
132        if self.is_detached() {
133            return self
134                .fork_at(&self.state_frontiers())
135                .expect("fork_at on detached doc should not fail");
136        }
137
138        let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
139        let doc = Self::new();
140        doc.with_barrier(|| {
141            encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
142        })
143        .unwrap();
144        doc.set_config(&self.config);
145        if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
146            doc.start_auto_commit();
147        }
148        doc
149    }
150    /// Enables editing of the document in detached mode.
151    ///
152    /// By default, the document cannot be edited in detached mode (after calling
153    /// `detach` or checking out a version other than the latest). This method
154    /// allows editing in detached mode.
155    ///
156    /// # Important Notes:
157    ///
158    /// - After enabling this mode, the document will use a different PeerID. Each
159    ///   time you call checkout, a new PeerID will be used.
160    /// - If you set a custom PeerID while this mode is enabled, ensure that
161    ///   concurrent operations with the same PeerID are not possible.
162    /// - On detached mode, importing will not change the state of the document.
163    ///   It also doesn't change the version of the [DocState]. The changes will be
164    ///   recorded into [OpLog] only. You need to call `checkout` to make it take effect.
165    pub fn set_detached_editing(&self, enable: bool) {
166        self.config.set_detached_editing(enable);
167        if enable && self.is_detached() {
168            self.with_barrier(|| {
169                self.renew_peer_id();
170            });
171        }
172    }
173
174    /// Create a doc with auto commit enabled.
175    #[inline]
176    pub fn new_auto_commit() -> Self {
177        let doc = Self::new();
178        doc.start_auto_commit();
179        doc
180    }
181
182    #[inline(always)]
183    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
184        if peer == PeerID::MAX {
185            return Err(LoroError::InvalidPeerID);
186        }
187        let next_id = self.oplog.lock().next_id(peer);
188        if self.auto_commit.load(Acquire) {
189            let doc_state = self.state.lock();
190            doc_state
191                .peer
192                .store(peer, std::sync::atomic::Ordering::Relaxed);
193
194            if doc_state.is_in_txn() {
195                drop(doc_state);
196                // Use implicit-style barrier to avoid swallowing next-commit options
197                self.with_barrier(|| {});
198            }
199            self.peer_id_change_subs.emit(&(), next_id);
200            return Ok(());
201        }
202
203        let doc_state = self.state.lock();
204        if doc_state.is_in_txn() {
205            return Err(LoroError::TransactionError(
206                "Cannot change peer id during transaction"
207                    .to_string()
208                    .into_boxed_str(),
209            ));
210        }
211
212        doc_state
213            .peer
214            .store(peer, std::sync::atomic::Ordering::Relaxed);
215        drop(doc_state);
216        self.peer_id_change_subs.emit(&(), next_id);
217        Ok(())
218    }
219
220    /// Renews the PeerID for the document.
221    pub(crate) fn renew_peer_id(&self) {
222        let mut peer_id = DefaultRandom.next_u64();
223        while peer_id == PeerID::MAX {
224            peer_id = DefaultRandom.next_u64();
225        }
226        self.set_peer_id(peer_id).unwrap();
227    }
228
229    /// Implicitly commit the cumulative auto-commit transaction.
230    /// This method only has effect when `auto_commit` is true.
231    ///
232    /// Follow-ups: the caller is responsible for renewing the transaction
233    /// as needed (e.g., via `renew_txn_if_auto_commit`). Prefer using
234    /// `with_barrier(...)` for most internal flows to handle this safely.
235    ///
236    /// Empty-commit behavior: if the pending transaction is empty, the returned
237    /// `Some(CommitOptions)` preserves next-commit options such as message and
238    /// timestamp so they can carry into the renewed transaction. Transient
239    /// labels like `origin` do not carry across an empty commit.
240    #[inline]
241    #[must_use]
242    pub fn implicit_commit_then_stop(
243        &self,
244    ) -> (
245        Option<CommitOptions>,
246        LoroMutexGuard<'_, Option<Transaction>>,
247    ) {
248        // Implicit commit: preserve options on empty commit
249        let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
250        (a, b.unwrap())
251    }
252
253    /// Commit the cumulative auto commit transaction.
254    /// It will start the next one immediately
255    ///
256    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
257    #[inline]
258    pub fn commit_then_renew(&self) -> Option<CommitOptions> {
259        // Explicit commit: swallow options on empty commit
260        self.commit_internal(CommitOptions::new().immediate_renew(true), false)
261            .0
262    }
263
264    /// This method is called before the commit.
265    /// It can be used to modify the change before it is committed.
266    ///
267    /// It return Some(txn) if the txn is None
268    fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
269        let mut txn_guard = self.txn.lock();
270        let Some(txn) = txn_guard.as_mut() else {
271            return Some(txn_guard);
272        };
273
274        if txn.is_peer_first_appearance {
275            txn.is_peer_first_appearance = false;
276            drop(txn_guard);
277            // First commit from a peer
278            self.first_commit_from_peer_subs.emit(
279                &(),
280                FirstCommitFromPeerPayload {
281                    peer: self.peer_id(),
282                },
283            );
284        }
285
286        None
287    }
288
289    /// Core implementation for committing the cumulative auto-commit transaction.
290    ///
291    /// - When `preserve_on_empty` is true (implicit commits like export/checkout),
292    ///   commit options from an empty transaction are carried over to the next transaction
293    ///   (except `origin`, which never carries across an empty commit).
294    /// - When `preserve_on_empty` is false (explicit commits), commit options from an
295    ///   empty transaction are swallowed and NOT carried over.
296    #[instrument(skip_all)]
297    fn commit_internal(
298        &self,
299        config: CommitOptions,
300        preserve_on_empty: bool,
301    ) -> (
302        Option<CommitOptions>,
303        Option<LoroMutexGuard<'_, Option<Transaction>>>,
304    ) {
305        if !self.auto_commit.load(Acquire) {
306            let txn_guard = self.txn.lock();
307            // if not auto_commit, nothing should happen
308            // because the global txn is not used
309            return (None, Some(txn_guard));
310        }
311
312        loop {
313            if let Some(txn_guard) = self.before_commit() {
314                return (None, Some(txn_guard));
315            }
316
317            let mut txn_guard = self.txn.lock();
318            let txn = txn_guard.take();
319            let Some(mut txn) = txn else {
320                return (None, Some(txn_guard));
321            };
322            let on_commit = txn.take_on_commit();
323            if let Some(origin) = config.origin.clone() {
324                txn.set_origin(origin);
325            }
326
327            if let Some(timestamp) = config.timestamp {
328                txn.set_timestamp(timestamp);
329            }
330
331            if let Some(msg) = config.commit_msg.as_ref() {
332                txn.set_msg(Some(msg.clone()));
333            }
334
335            let id_span = txn.id_span();
336            let mut options = txn.commit().unwrap();
337            // Empty commit returns Some(options). We may preserve parts of it for implicit commits.
338            if let Some(opts) = options.as_mut() {
339                // `origin` is an event-only label and never carries across an empty commit
340                if config.origin.is_some() {
341                    opts.set_origin(None);
342                }
343                // For explicit commits, swallow options from empty commit entirely
344                if !preserve_on_empty {
345                    options = None;
346                }
347            }
348            if config.immediate_renew && self.can_edit() {
349                let mut t = self.txn().unwrap();
350                if let Some(options) = options.as_ref() {
351                    t.set_options(options.clone());
352                }
353                *txn_guard = Some(t);
354            }
355
356            if let Some(on_commit) = on_commit {
357                drop(txn_guard);
358                on_commit(&self.state, &self.oplog, id_span);
359                txn_guard = self.txn.lock();
360                if !config.immediate_renew && txn_guard.is_some() {
361                    // make sure that txn_guard is None when config.immediate_renew is false
362                    continue;
363                }
364            }
365
366            return (
367                options,
368                if !config.immediate_renew {
369                    Some(txn_guard)
370                } else {
371                    None
372                },
373            );
374        }
375    }
376
377    /// Commit the cumulative auto commit transaction (explicit API).
378    ///
379    /// This is used by user-facing explicit commits. If the transaction is empty,
380    /// any provided commit options are swallowed and will NOT carry over.
381    #[instrument(skip_all)]
382    pub fn commit_with(
383        &self,
384        config: CommitOptions,
385    ) -> (
386        Option<CommitOptions>,
387        Option<LoroMutexGuard<'_, Option<Transaction>>>,
388    ) {
389        self.commit_internal(config, false)
390    }
391
392    /// Set the commit message of the next commit
393    pub fn set_next_commit_message(&self, message: &str) {
394        let mut binding = self.txn.lock();
395        let Some(txn) = binding.as_mut() else {
396            return;
397        };
398
399        if message.is_empty() {
400            txn.set_msg(None)
401        } else {
402            txn.set_msg(Some(message.into()))
403        }
404    }
405
406    /// Set the origin of the next commit
407    pub fn set_next_commit_origin(&self, origin: &str) {
408        let mut txn = self.txn.lock();
409        if let Some(txn) = txn.as_mut() {
410            txn.set_origin(origin.into());
411        }
412    }
413
414    /// Set the timestamp of the next commit
415    pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
416        let mut txn = self.txn.lock();
417        if let Some(txn) = txn.as_mut() {
418            txn.set_timestamp(timestamp);
419        }
420    }
421
422    /// Set the options of the next commit
423    pub fn set_next_commit_options(&self, options: CommitOptions) {
424        let mut txn = self.txn.lock();
425        if let Some(txn) = txn.as_mut() {
426            txn.set_options(options);
427        }
428    }
429
430    /// Clear the options of the next commit
431    pub fn clear_next_commit_options(&self) {
432        let mut txn = self.txn.lock();
433        if let Some(txn) = txn.as_mut() {
434            txn.set_options(CommitOptions::new());
435        }
436    }
437
438    /// Set whether to record the timestamp of each change. Default is `false`.
439    ///
440    /// If enabled, the Unix timestamp will be recorded for each change automatically.
441    ///
442    /// You can also set each timestamp manually when you commit a change.
443    /// The timestamp manually set will override the automatic one.
444    ///
445    /// NOTE: Timestamps are forced to be in ascending order.
446    /// If you commit a new change with a timestamp that is less than the existing one,
447    /// the largest existing timestamp will be used instead.
448    #[inline]
449    pub fn set_record_timestamp(&self, record: bool) {
450        self.config.set_record_timestamp(record);
451    }
452
453    /// Set the interval of mergeable changes, in seconds.
454    ///
455    /// If two continuous local changes are within the interval, they will be merged into one change.
456    /// The default value is 1000 seconds.
457    #[inline]
458    pub fn set_change_merge_interval(&self, interval: i64) {
459        self.config.set_merge_interval(interval);
460    }
461
462    pub fn can_edit(&self) -> bool {
463        !self.is_detached() || self.config.detached_editing()
464    }
465
466    pub fn is_detached_editing_enabled(&self) -> bool {
467        self.config.detached_editing()
468    }
469
470    #[inline]
471    pub fn config_text_style(&self, text_style: StyleConfigMap) {
472        self.config.text_style_config.write().map = text_style.map;
473    }
474
475    #[inline]
476    pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
477        self.config.text_style_config.write().default_style = text_style;
478    }
479    pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
480        let doc = Self::new();
481        let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
482        if mode.is_snapshot() {
483            doc.with_barrier(|| -> Result<(), LoroError> {
484                decode_snapshot(&doc, mode, body, Default::default())?;
485                Ok(())
486            })?;
487            Ok(doc)
488        } else {
489            Err(LoroError::DecodeError(
490                "Invalid encode mode".to_string().into(),
491            ))
492        }
493    }
494
495    /// Is the document empty? (no ops)
496    #[inline(always)]
497    pub fn can_reset_with_snapshot(&self) -> bool {
498        let oplog = self.oplog.lock();
499        if oplog.batch_importing {
500            return false;
501        }
502
503        if self.is_detached() {
504            return false;
505        }
506
507        oplog.is_empty() && self.state.lock().can_import_snapshot()
508    }
509
510    /// Whether [OpLog] and [DocState] are detached.
511    ///
512    /// If so, the document is in readonly mode by default and importing will not change the state of the document.
513    /// It also doesn't change the version of the [DocState]. The changes will be recorded into [OpLog] only.
514    /// You need to call `checkout` to make it take effect.
515    #[inline(always)]
516    pub fn is_detached(&self) -> bool {
517        self.detached.load(Acquire)
518    }
519
520    pub(crate) fn set_detached(&self, detached: bool) {
521        self.detached.store(detached, Release);
522    }
523
524    #[inline(always)]
525    pub fn peer_id(&self) -> PeerID {
526        self.state
527            .lock()
528            .peer
529            .load(std::sync::atomic::Ordering::Relaxed)
530    }
531
532    #[inline(always)]
533    pub fn detach(&self) {
534        self.with_barrier(|| self.set_detached(true));
535    }
536
537    #[inline(always)]
538    pub fn attach(&self) {
539        self.checkout_to_latest()
540    }
541
542    /// Get the timestamp of the current state.
543    /// It's the last edit time of the [DocState].
544    pub fn state_timestamp(&self) -> Timestamp {
545        // Acquire locks in correct order: read frontiers first, then query OpLog.
546        let f = { self.state.lock().frontiers.clone() };
547        self.oplog.lock().get_timestamp_of_version(&f)
548    }
549
550    #[inline(always)]
551    pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
552        &self.state
553    }
554
555    #[inline]
556    pub fn get_state_deep_value(&self) -> LoroValue {
557        self.state.lock().get_deep_value()
558    }
559
560    #[inline(always)]
561    pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
562        &self.oplog
563    }
564
565    #[inline(always)]
566    pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
567        let s = debug_span!("import", peer = self.peer_id());
568        let _e = s.enter();
569        self.import_with(bytes, Default::default())
570    }
571
572    #[inline]
573    pub fn import_with(
574        &self,
575        bytes: &[u8],
576        origin: InternalString,
577    ) -> Result<ImportStatus, LoroError> {
578        self.with_barrier(|| self._import_with(bytes, origin))
579    }
580
581    #[tracing::instrument(skip_all)]
582    fn _import_with(
583        &self,
584        bytes: &[u8],
585        origin: InternalString,
586    ) -> Result<ImportStatus, LoroError> {
587        ensure_cov::notify_cov("loro_internal::import");
588        let parsed = parse_header_and_body(bytes, true)?;
589        loro_common::info!("Importing with mode={:?}", &parsed.mode);
590        let result = match parsed.mode {
591            EncodeMode::OutdatedRle => {
592                if self.state.lock().is_in_txn() {
593                    return Err(LoroError::ImportWhenInTxn);
594                }
595
596                let s = tracing::span!(
597                    tracing::Level::INFO,
598                    "Import updates ",
599                    peer = self.peer_id()
600                );
601                let _e = s.enter();
602                self.update_oplog_and_apply_delta_to_state_if_needed(
603                    |oplog| oplog.decode(parsed),
604                    origin,
605                )
606            }
607            EncodeMode::OutdatedSnapshot => {
608                if self.can_reset_with_snapshot() {
609                    loro_common::info!("Init by snapshot {}", self.peer_id());
610                    decode_snapshot(self, parsed.mode, parsed.body, origin)
611                } else {
612                    self.update_oplog_and_apply_delta_to_state_if_needed(
613                        |oplog| oplog.decode(parsed),
614                        origin,
615                    )
616                }
617            }
618            EncodeMode::FastSnapshot => {
619                if self.can_reset_with_snapshot() {
620                    ensure_cov::notify_cov("loro_internal::import::snapshot");
621                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
622                    decode_snapshot(self, parsed.mode, parsed.body, origin)
623                } else {
624                    self.import_changes_and_apply_delta_to_state_if_needed(
625                        |oplog| encoding::decode_oplog_changes(oplog, parsed),
626                        origin,
627                    )
628
629                    // let new_doc = LoroDoc::new();
630                    // new_doc.import(bytes)?;
631                    // let updates = new_doc.export(ExportMode::updates(&self.oplog_vv())).unwrap();
632                    // return self.import_with(updates.as_slice(), origin);
633                }
634            }
635            EncodeMode::FastUpdates => self.import_changes_and_apply_delta_to_state_if_needed(
636                |oplog| encoding::decode_oplog_changes(oplog, parsed),
637                origin,
638            ),
639            EncodeMode::Auto => {
640                unreachable!()
641            }
642        };
643
644        self.emit_events();
645
646        result
647    }
648
649    #[tracing::instrument(skip_all)]
650    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
651        &self,
652        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
653        origin: InternalString,
654    ) -> Result<ImportStatus, LoroError> {
655        let mut oplog = self.oplog.lock();
656        oplog.begin_import_rollback();
657        if !self.is_detached() {
658            let old_vv = oplog.vv().clone();
659            let old_frontiers = oplog.frontiers().clone();
660            let result = f(&mut oplog);
661            if &old_vv != oplog.vv() {
662                let mut diff = DiffCalculator::new(false);
663                let (diff, diff_mode) = diff.calc_diff_internal(
664                    &oplog,
665                    &old_vv,
666                    &old_frontiers,
667                    oplog.vv(),
668                    oplog.dag.get_frontiers(),
669                    None,
670                );
671                let mut state = self.state.lock();
672                if let Err(e) = state.apply_diff(
673                    InternalDocDiff {
674                        origin,
675                        diff: (diff).into(),
676                        by: EventTriggerKind::Import,
677                        new_version: Cow::Owned(oplog.frontiers().clone()),
678                    },
679                    diff_mode,
680                ) {
681                    oplog.rollback_import();
682                    return Err(e);
683                }
684            }
685            match result {
686                Ok(result) => {
687                    oplog.commit_import_rollback();
688                    Ok(result)
689                }
690                Err(e) => {
691                    // Some import errors are reported after a valid prefix has
692                    // been inserted into the oplog. Keep that prefix if it was
693                    // also applied to state; rollback is for failed state apply
694                    // or decode errors that made no visible oplog progress.
695                    if &old_vv == oplog.vv() {
696                        oplog.rollback_import();
697                    } else {
698                        oplog.commit_import_rollback();
699                    }
700                    Err(e)
701                }
702            }
703        } else {
704            match f(&mut oplog) {
705                Ok(result) => {
706                    oplog.commit_import_rollback();
707                    Ok(result)
708                }
709                Err(e) => {
710                    oplog.rollback_import();
711                    Err(e)
712                }
713            }
714        }
715    }
716
717    #[tracing::instrument(skip_all)]
718    pub(crate) fn import_changes_and_apply_delta_to_state_if_needed(
719        &self,
720        decode_changes: impl FnOnce(&mut OpLog) -> Result<Vec<Change>, LoroError>,
721        origin: InternalString,
722    ) -> Result<ImportStatus, LoroError> {
723        let mut oplog = self.oplog.lock();
724        let arena_checkpoint = oplog.arena.checkpoint_for_rollback();
725        let changes = match decode_changes(&mut oplog) {
726            Ok(changes) => changes,
727            Err(e) => {
728                oplog.arena.rollback(arena_checkpoint);
729                return Err(e);
730            }
731        };
732
733        let preflight = oplog.preflight_import_changes(&changes);
734        if preflight.has_deps_before_shallow_root
735            && (self.is_detached() || !preflight.applies_to_dag)
736        {
737            oplog.arena.rollback(arena_checkpoint);
738            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
739        }
740
741        if self.is_detached() {
742            let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
743            if result.has_deps_before_shallow_root {
744                return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
745            }
746
747            return Ok(result.status);
748        }
749
750        if !preflight.applies_to_dag {
751            let pending_root_containers = pending_root_containers_to_materialize(&oplog, &changes);
752            let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
753            if result.has_deps_before_shallow_root {
754                oplog.arena.rollback(arena_checkpoint);
755                return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
756            }
757
758            if !pending_root_containers.is_empty() {
759                let mut state = self.state.lock();
760                for id in pending_root_containers {
761                    state.ensure_container(&id);
762                }
763            }
764
765            return Ok(result.status);
766        }
767
768        let old_vv = oplog.vv().clone();
769        let old_frontiers = oplog.frontiers().clone();
770        let rollback_enabled = preflight.needs_state_apply_rollback;
771        if rollback_enabled {
772            oplog.begin_import_rollback_with_arena(arena_checkpoint);
773        }
774
775        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
776        if &old_vv != oplog.vv() {
777            let mut diff = DiffCalculator::new(false);
778            let (diff, diff_mode) = diff.calc_diff_internal(
779                &oplog,
780                &old_vv,
781                &old_frontiers,
782                oplog.vv(),
783                oplog.dag.get_frontiers(),
784                None,
785            );
786            let mut state = self.state.lock();
787            if let Err(e) = state.apply_diff(
788                InternalDocDiff {
789                    origin,
790                    diff: (diff).into(),
791                    by: EventTriggerKind::Import,
792                    new_version: Cow::Owned(oplog.frontiers().clone()),
793                },
794                diff_mode,
795            ) {
796                if rollback_enabled {
797                    oplog.rollback_import();
798                    return Err(e);
799                }
800
801                panic!("state apply returned Err for import without rollback guard: {e}");
802            }
803        }
804
805        if result.has_deps_before_shallow_root {
806            if rollback_enabled {
807                oplog.commit_import_rollback();
808            }
809            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
810        }
811
812        if rollback_enabled {
813            oplog.commit_import_rollback();
814        }
815        Ok(result.status)
816    }
817
818    fn emit_events(&self) {
819        // we should not hold the lock when emitting events
820        let events = {
821            let mut state = self.state.lock();
822            state.take_events()
823        };
824        for event in events {
825            self.observer.emit(event);
826        }
827    }
828
829    pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
830        let mut state = self.state.lock();
831        state.take_events()
832    }
833
834    /// Import the json schema updates.
835    ///
836    /// only supports backward compatibility but not forward compatibility.
837    #[tracing::instrument(skip_all)]
838    pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
839        let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
840        self.with_barrier(|| {
841            let result = self.import_changes_and_apply_delta_to_state_if_needed(
842                |oplog| crate::encoding::json_schema::decode_json_changes(json, &oplog.arena),
843                Default::default(),
844            );
845            self.emit_events();
846            result
847        })
848    }
849
850    pub fn export_json_updates(
851        &self,
852        start_vv: &VersionVector,
853        end_vv: &VersionVector,
854        with_peer_compression: bool,
855    ) -> JsonSchema {
856        self.with_barrier(|| {
857            let oplog = self.oplog.lock();
858            let mut start_vv = start_vv;
859            let _temp: Option<VersionVector>;
860            if !oplog.dag.shallow_since_vv().is_empty() {
861                // Make sure that start_vv >= shallow_since_vv
862                let mut include_all = true;
863                for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
864                    if start_vv.get(peer).unwrap_or(&0) < counter {
865                        include_all = false;
866                        break;
867                    }
868                }
869                if !include_all {
870                    let mut vv = start_vv.clone();
871                    for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
872                        vv.extend_to_include_end_id(ID::new(peer, counter));
873                    }
874                    _temp = Some(vv);
875                    start_vv = _temp.as_ref().unwrap();
876                }
877            }
878
879            crate::encoding::json_schema::export_json(
880                &oplog,
881                start_vv,
882                end_vv,
883                with_peer_compression,
884            )
885        })
886    }
887
888    pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
889        let oplog = self.oplog.lock();
890        let mut changes = export_json_in_id_span(&oplog, id_span);
891        if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
892            let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
893            changes.push(change_json);
894        }
895        changes
896    }
897
898    /// Get the version vector of the current OpLog
899    #[inline]
900    pub fn oplog_vv(&self) -> VersionVector {
901        self.oplog.lock().vv().clone()
902    }
903
904    /// Get the version vector of the current [DocState]
905    #[inline]
906    pub fn state_vv(&self) -> VersionVector {
907        let oplog = self.oplog.lock();
908        let f = &self.state.lock().frontiers;
909        oplog.dag.frontiers_to_vv(f).unwrap()
910    }
911
912    pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
913        let value: LoroValue = self.state.lock().get_value_by_path(path)?;
914        if let LoroValue::Container(c) = value {
915            Some(ValueOrHandler::Handler(Handler::new_attached(
916                c.clone(),
917                self.clone(),
918            )))
919        } else {
920            Some(ValueOrHandler::Value(value))
921        }
922    }
923
924    /// Get the handler by the string path.
925    pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
926        let path = str_to_path(path)?;
927        self.get_by_path(&path)
928    }
929
930    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
931        let arena = &self.arena;
932        let txn = self.txn.lock();
933        let txn = txn.as_ref()?;
934        let ops_ = txn.local_ops();
935        let new_id = ID {
936            peer: *txn.peer(),
937            counter: ops_.first()?.counter,
938        };
939        let change = ChangeRef {
940            id: &new_id,
941            deps: txn.frontiers(),
942            timestamp: &txn
943                .timestamp()
944                .as_ref()
945                .copied()
946                .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
947            commit_msg: txn.msg(),
948            ops: ops_,
949            lamport: txn.lamport(),
950        };
951        let json = encode_change_to_json(change, arena);
952        Some(json)
953    }
954
955    #[inline]
956    pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
957        if self.has_container(&id) {
958            self.ensure_root_container(&id);
959            Some(Handler::new_attached(id, self.clone()))
960        } else {
961            None
962        }
963    }
964
965    #[inline]
966    fn ensure_root_container(&self, id: &ContainerID) {
967        // Mergeable roots stay lazily materialized: their existence is derived from the
968        // parent map's child ref, and their state is only created on first op (or at
969        // export via the alive-container walk).
970        if id.is_root() && !id.is_mergeable() {
971            self.state.lock().ensure_container(id);
972        }
973    }
974
975    /// id can be a str, ContainerID, or ContainerIdRaw.
976    /// if it's str it will use Root container, which will not be None
977    #[inline]
978    pub fn try_get_text<I: IntoContainerId>(&self, id: I) -> Option<TextHandler> {
979        let id = id.into_container_id(&self.arena, ContainerType::Text);
980        if !self.has_container(&id) {
981            return None;
982        }
983        self.ensure_root_container(&id);
984        Handler::new_attached(id, self.clone()).into_text().ok()
985    }
986
987    /// id can be a str, ContainerID, or ContainerIdRaw.
988    /// if it's str it will use Root container, which will not be None
989    #[inline]
990    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
991        self.try_get_text(id)
992            .expect("The container does not exist in the document. Use `try_get_text` or `get_container` to check for existence.")
993    }
994
995    /// id can be a str, ContainerID, or ContainerIdRaw.
996    /// if it's str it will use Root container, which will not be None
997    #[inline]
998    pub fn try_get_list<I: IntoContainerId>(&self, id: I) -> Option<ListHandler> {
999        let id = id.into_container_id(&self.arena, ContainerType::List);
1000        if !self.has_container(&id) {
1001            return None;
1002        }
1003        self.ensure_root_container(&id);
1004        Handler::new_attached(id, self.clone()).into_list().ok()
1005    }
1006
1007    /// id can be a str, ContainerID, or ContainerIdRaw.
1008    /// if it's str it will use Root container, which will not be None
1009    #[inline]
1010    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
1011        self.try_get_list(id)
1012            .expect("The container does not exist in the document. Use `try_get_list` or `get_container` to check for existence.")
1013    }
1014
1015    /// id can be a str, ContainerID, or ContainerIdRaw.
1016    /// if it's str it will use Root container, which will not be None
1017    #[inline]
1018    pub fn try_get_movable_list<I: IntoContainerId>(&self, id: I) -> Option<MovableListHandler> {
1019        let id = id.into_container_id(&self.arena, ContainerType::MovableList);
1020        if !self.has_container(&id) {
1021            return None;
1022        }
1023        self.ensure_root_container(&id);
1024        Handler::new_attached(id, self.clone())
1025            .into_movable_list()
1026            .ok()
1027    }
1028
1029    /// id can be a str, ContainerID, or ContainerIdRaw.
1030    /// if it's str it will use Root container, which will not be None
1031    #[inline]
1032    pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
1033        self.try_get_movable_list(id)
1034            .expect("The container does not exist in the document. Use `try_get_movable_list` or `get_container` to check for existence.")
1035    }
1036
1037    /// id can be a str, ContainerID, or ContainerIdRaw.
1038    /// if it's str it will use Root container, which will not be None
1039    #[inline]
1040    pub fn try_get_map<I: IntoContainerId>(&self, id: I) -> Option<MapHandler> {
1041        let id = id.into_container_id(&self.arena, ContainerType::Map);
1042        if !self.has_container(&id) {
1043            return None;
1044        }
1045        self.ensure_root_container(&id);
1046        Handler::new_attached(id, self.clone()).into_map().ok()
1047    }
1048
1049    /// id can be a str, ContainerID, or ContainerIdRaw.
1050    /// if it's str it will use Root container, which will not be None
1051    #[inline]
1052    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
1053        self.try_get_map(id)
1054            .expect("The container does not exist in the document. Use `try_get_map` or `get_container` to check for existence.")
1055    }
1056
1057    /// id can be a str, ContainerID, or ContainerIdRaw.
1058    /// if it's str it will use Root container, which will not be None
1059    #[inline]
1060    pub fn try_get_tree<I: IntoContainerId>(&self, id: I) -> Option<TreeHandler> {
1061        let id = id.into_container_id(&self.arena, ContainerType::Tree);
1062        if !self.has_container(&id) {
1063            return None;
1064        }
1065        self.ensure_root_container(&id);
1066        Handler::new_attached(id, self.clone()).into_tree().ok()
1067    }
1068
1069    /// id can be a str, ContainerID, or ContainerIdRaw.
1070    /// if it's str it will use Root container, which will not be None
1071    #[inline]
1072    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
1073        self.try_get_tree(id)
1074            .expect("The container does not exist in the document. Use `try_get_tree` or `get_container` to check for existence.")
1075    }
1076
1077    #[cfg(feature = "counter")]
1078    pub fn try_get_counter<I: IntoContainerId>(
1079        &self,
1080        id: I,
1081    ) -> Option<crate::handler::counter::CounterHandler> {
1082        let id = id.into_container_id(&self.arena, ContainerType::Counter);
1083        if !self.has_container(&id) {
1084            return None;
1085        }
1086        self.ensure_root_container(&id);
1087        Handler::new_attached(id, self.clone()).into_counter().ok()
1088    }
1089
1090    #[cfg(feature = "counter")]
1091    pub fn get_counter<I: IntoContainerId>(
1092        &self,
1093        id: I,
1094    ) -> crate::handler::counter::CounterHandler {
1095        self.try_get_counter(id)
1096            .expect("The container does not exist in the document. Use `try_get_counter` or `get_container` to check for existence.")
1097    }
1098
1099    #[must_use]
1100    pub fn has_container(&self, id: &ContainerID) -> bool {
1101        if id.is_root() && !id.is_mergeable() {
1102            return true;
1103        }
1104
1105        let exist = self.state.lock().does_container_exist(id);
1106        exist
1107    }
1108
1109    /// Undo the operations between the given id_span. It can be used even in a collaborative environment.
1110    ///
1111    /// This is an internal API. You should NOT use it directly.
1112    ///
1113    /// # Internal
1114    ///
1115    /// This method will use the diff calculator to calculate the diff required to time travel
1116    /// from the end of id_span to the beginning of the id_span. Then it will convert the diff to
1117    /// operations and apply them to the OpLog with a dep on the last id of the given id_span.
1118    ///
1119    /// This implementation is kinda slow, but it's simple and maintainable. We can optimize it
1120    /// further when it's needed. The time complexity is O(n + m), n is the ops in the id_span, m is the
1121    /// distance from id_span to the current latest version.
1122    #[instrument(level = "info", skip_all)]
1123    pub fn undo_internal(
1124        &self,
1125        id_span: IdSpan,
1126        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1127        post_transform_base: Option<&DiffBatch>,
1128        before_diff: &mut dyn FnMut(&DiffBatch),
1129    ) -> LoroResult<CommitWhenDrop<'_>> {
1130        if !self.can_edit() {
1131            return Err(LoroError::EditWhenDetached);
1132        }
1133
1134        let (options, txn) = self.implicit_commit_then_stop();
1135        if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
1136            self.renew_txn_if_auto_commit(options);
1137            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
1138        }
1139
1140        let (was_recording, latest_frontiers) = {
1141            let mut state = self.state.lock();
1142            let was_recording = state.is_recording();
1143            state.stop_and_clear_recording();
1144            (was_recording, state.frontiers.clone())
1145        };
1146
1147        let spans = self.oplog.lock().split_span_based_on_deps(id_span);
1148        let diff = crate::undo::undo(
1149            spans,
1150            match post_transform_base {
1151                Some(d) => Either::Right(d),
1152                None => Either::Left(&latest_frontiers),
1153            },
1154            |from, to| {
1155                self._checkout_without_emitting(from, false, false).unwrap();
1156                self.state.lock().start_recording();
1157                self._checkout_without_emitting(to, false, false).unwrap();
1158                let mut state = self.state.lock();
1159                let e = state.take_events();
1160                state.stop_and_clear_recording();
1161                DiffBatch::new(e)
1162            },
1163            before_diff,
1164        );
1165
1166        // println!("\nundo_internal: diff: {:?}", diff);
1167        // println!("container remap: {:?}", container_remap);
1168
1169        self._checkout_without_emitting(&latest_frontiers, false, false)?;
1170        self.set_detached(false);
1171        if was_recording {
1172            self.state.lock().start_recording();
1173        }
1174        drop(txn);
1175        self.start_auto_commit();
1176        // Try applying the diff, but ignore the error if it happens.
1177        // MovableList's undo behavior is too tricky to handle in a collaborative env
1178        // so in edge cases this may be an Error
1179        if let Err(e) = self._apply_diff(diff, container_remap, true) {
1180            warn!("Undo Failed {:?}", e);
1181        }
1182
1183        if let Some(options) = options {
1184            self.set_next_commit_options(options);
1185        }
1186        Ok(CommitWhenDrop {
1187            doc: self,
1188            default_options: CommitOptions::new().origin("undo"),
1189        })
1190    }
1191
1192    /// Generate a series of local operations that can revert the current doc to the target
1193    /// version.
1194    ///
1195    /// Internally, it will calculate the diff between the current state and the target state,
1196    /// and apply the diff to the current state.
1197    pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
1198        // TODO: test when the doc is readonly
1199        // TODO: test when the doc is detached but enabled editing
1200        let f = self.state_frontiers();
1201        let diff = self.diff(&f, target)?;
1202        self._apply_diff(diff, &mut Default::default(), false)
1203    }
1204
1205    /// Calculate the diff between two versions so that apply diff on a will make the state same as b.
1206    ///
1207    /// NOTE: This method will make the doc enter the **detached mode**.
1208    // FIXME: This method needs testing (no event should be emitted during processing this)
1209    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1210        {
1211            // Check whether a and b are valid before checkout so this returns a normal error
1212            // instead of panicking on shallow docs.
1213            let oplog = self.oplog.lock();
1214            let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
1215                for id in frontiers.iter() {
1216                    if !oplog.dag.contains(id) {
1217                        return Err(LoroError::FrontiersNotFound(id));
1218                    }
1219                }
1220
1221                if oplog.dag.is_before_shallow_root(frontiers) {
1222                    return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1223                }
1224
1225                Ok(())
1226            };
1227
1228            validate_frontiers(a)?;
1229            validate_frontiers(b)?;
1230        }
1231
1232        let (options, txn) = self.implicit_commit_then_stop();
1233        let was_detached = self.is_detached();
1234        let old_frontiers = self.state_frontiers();
1235        let was_recording = {
1236            let mut state = self.state.lock();
1237            let is_recording = state.is_recording();
1238            state.stop_and_clear_recording();
1239            is_recording
1240        };
1241        let result = (|| {
1242            self._checkout_without_emitting(a, true, false)?;
1243            self.state.lock().start_recording();
1244            self._checkout_without_emitting(b, true, false)?;
1245            let mut state = self.state.lock();
1246            let e = state.take_events();
1247            state.stop_and_clear_recording();
1248            Ok::<_, LoroError>(e)
1249        })();
1250
1251        // Always restore state regardless of whether diff calculation succeeded
1252        self._checkout_without_emitting(&old_frontiers, false, false)
1253            .unwrap();
1254        drop(txn);
1255        if !was_detached {
1256            self.set_detached(false);
1257            self.renew_txn_if_auto_commit(options);
1258        }
1259        if was_recording {
1260            self.state.lock().start_recording();
1261        }
1262        result.map(DiffBatch::new)
1263    }
1264
1265    /// Apply a diff to the current state.
1266    #[inline(always)]
1267    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1268        self._apply_diff(diff, &mut Default::default(), true)
1269    }
1270
1271    /// Apply a diff to the current state.
1272    ///
1273    /// This method will not recreate containers with the same [ContainerID]s.
1274    /// While this can be convenient in certain cases, it can break several internal invariants:
1275    ///
1276    /// 1. Each container should appear only once in the document. Allowing containers with the same ID
1277    ///    would result in multiple instances of the same container in the document.
1278    /// 2. Unreachable containers should be removable from the state when necessary.
1279    ///
1280    /// However, the diff may contain operations that depend on container IDs.
1281    /// Therefore, users need to provide a `container_remap` to record and retrieve the container ID remapping.
1282    pub(crate) fn _apply_diff(
1283        &self,
1284        diff: DiffBatch,
1285        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1286        skip_unreachable: bool,
1287    ) -> LoroResult<()> {
1288        if !self.can_edit() {
1289            return Err(LoroError::EditWhenDetached);
1290        }
1291
1292        let mut ans: LoroResult<()> = Ok(());
1293        let mut missing_containers: Vec<ContainerID> = Vec::new();
1294        for (mut id, diff) in diff.into_iter() {
1295            let mut remapped = false;
1296            while let Some(rid) = container_remap.get(&id) {
1297                remapped = true;
1298                id = rid.clone();
1299            }
1300
1301            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1302                // Not in arena does not imply non-existent; consult state/kv and register lazily
1303                let exists = self.state.lock().does_container_exist(&id);
1304                if !exists {
1305                    missing_containers.push(id);
1306                    continue;
1307                }
1308                // Ensure registration so handlers can be created
1309                self.state.lock().ensure_container(&id);
1310            }
1311
1312            if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
1313                continue;
1314            }
1315
1316            let Some(h) = self.get_handler(id.clone()) else {
1317                return Err(LoroError::ContainersNotFound {
1318                    containers: Box::new(vec![id]),
1319                });
1320            };
1321            if let Err(e) = h.apply_diff(diff, container_remap) {
1322                ans = Err(e);
1323            }
1324        }
1325
1326        if !missing_containers.is_empty() {
1327            return Err(LoroError::ContainersNotFound {
1328                containers: Box::new(missing_containers),
1329            });
1330        }
1331
1332        ans
1333    }
1334
1335    /// This is for debugging purpose. It will travel the whole oplog
1336    #[inline]
1337    pub fn diagnose_size(&self) {
1338        self.oplog().lock().diagnose_size();
1339    }
1340
1341    #[inline]
1342    pub fn oplog_frontiers(&self) -> Frontiers {
1343        self.oplog().lock().frontiers().clone()
1344    }
1345
1346    #[inline]
1347    pub fn state_frontiers(&self) -> Frontiers {
1348        self.state.lock().frontiers.clone()
1349    }
1350
1351    /// - Ordering::Less means self is less than target or parallel
1352    /// - Ordering::Equal means versions equal
1353    /// - Ordering::Greater means self's version is greater than target
1354    #[inline]
1355    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1356        self.oplog().lock().cmp_with_frontiers(other)
1357    }
1358
1359    /// Compare two [Frontiers] causally.
1360    ///
1361    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
1362    #[inline]
1363    pub fn cmp_frontiers(
1364        &self,
1365        a: &Frontiers,
1366        b: &Frontiers,
1367    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1368        self.oplog().lock().cmp_frontiers(a, b)
1369    }
1370
1371    pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1372        let mut state = self.state.lock();
1373        if !state.is_recording() {
1374            state.start_recording();
1375        }
1376
1377        self.observer.subscribe_root(callback)
1378    }
1379
1380    pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1381        let mut state = self.state.lock();
1382        if !state.is_recording() {
1383            state.start_recording();
1384        }
1385
1386        self.observer.subscribe(container_id, callback)
1387    }
1388
1389    pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1390        let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1391        activate();
1392        sub
1393    }
1394
1395    // PERF: opt
1396    #[tracing::instrument(skip_all)]
1397    pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1398        if bytes.is_empty() {
1399            return Ok(ImportStatus::default());
1400        }
1401
1402        if bytes.len() == 1 {
1403            return self.import(&bytes[0]);
1404        }
1405
1406        let mut success = VersionRange::default();
1407        let mut meta_arr = bytes
1408            .iter()
1409            .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1410            .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1411        meta_arr.sort_by(|a, b| {
1412            a.0.mode
1413                .cmp(&b.0.mode)
1414                .then(b.0.change_num.cmp(&a.0.change_num))
1415        });
1416
1417        let (options, txn) = self.implicit_commit_then_stop();
1418        // Why we should keep locking `txn` here
1419        //
1420        // In a multi-threaded environment, `import_batch` used to drop the txn lock
1421        // (via `commit_then_stop` + `drop(txn)`) and call `detach()`/`checkout_to_latest()`
1422        // around the batch import. That created a race where another thread could
1423        // start or renew the auto-commit txn and perform local edits while we were
1424        // importing and temporarily detached. Those interleaved local edits could
1425        // violate invariants between `OpLog` and `DocState` (e.g., state being
1426        // updated when we expect it not to, missed events, or inconsistent
1427        // frontiers), as exposed by the loom test `local_edits_during_batch_import`.
1428        //
1429        // The fix is to hold the txn mutex for the entire critical section:
1430        // - Stop the current txn and keep the mutex guard.
1431        // - Force-detach with `set_detached(true)` (avoids `detach()` side effects),
1432        //   then run each `_import_with(...)` while detached so imports only touch
1433        //   the `OpLog`.
1434        // - After importing, reattach by checking out to latest and renew the txn
1435        //   using `_checkout_to_latest_with_guard`, which keeps the mutex held while
1436        //   (re)starting the auto-commit txn.
1437        //
1438        // Holding the lock ensures no concurrent thread can create/renew a txn and
1439        // do local edits in the middle of the batch import, making the whole
1440        // operation atomic with respect to local edits.
1441        let is_detached = self.is_detached();
1442        self.set_detached(true);
1443        self.oplog.lock().batch_importing = true;
1444        let mut err = None;
1445        for (_meta, data) in meta_arr {
1446            match self._import_with(data, Default::default()) {
1447                Ok(s) => {
1448                    for (peer, (start, end)) in s.success.iter() {
1449                        match success.0.entry(*peer) {
1450                            Entry::Occupied(mut e) => {
1451                                e.get_mut().1 = *end.max(&e.get().1);
1452                            }
1453                            Entry::Vacant(e) => {
1454                                e.insert((*start, *end));
1455                            }
1456                        }
1457                    }
1458                }
1459                Err(e) => {
1460                    err = Some(e);
1461                }
1462            }
1463        }
1464
1465        let mut oplog = self.oplog.lock();
1466        oplog.batch_importing = false;
1467        let pending = oplog.pending_changes.version_range();
1468        drop(oplog);
1469        if !is_detached {
1470            self._checkout_to_latest_with_guard(txn);
1471        } else {
1472            drop(txn);
1473        }
1474
1475        self.renew_txn_if_auto_commit(options);
1476        if let Some(err) = err {
1477            return Err(err);
1478        }
1479
1480        Ok(ImportStatus {
1481            success,
1482            pending: if pending.is_empty() {
1483                None
1484            } else {
1485                Some(pending)
1486            },
1487        })
1488    }
1489
1490    /// Get shallow value of the document.
1491    #[inline]
1492    pub fn get_value(&self) -> LoroValue {
1493        self.state.lock().get_value()
1494    }
1495
1496    /// Get deep value of the document.
1497    #[inline]
1498    pub fn get_deep_value(&self) -> LoroValue {
1499        self.state.lock().get_deep_value()
1500    }
1501
1502    /// Get deep value of the document with container id
1503    #[inline]
1504    pub fn get_deep_value_with_id(&self) -> LoroValue {
1505        self.state.lock().get_deep_value_with_id()
1506    }
1507
1508    pub fn checkout_to_latest(&self) {
1509        let (options, _guard) = self.implicit_commit_then_stop();
1510        if !self.is_detached() {
1511            drop(_guard);
1512            self.renew_txn_if_auto_commit(options);
1513            return;
1514        }
1515
1516        self._checkout_to_latest_without_commit(true)
1517            .expect("checkout to oplog frontiers should succeed");
1518        self.emit_events();
1519        drop(_guard);
1520        self.renew_txn_if_auto_commit(options);
1521    }
1522
1523    fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1524        if !self.is_detached() {
1525            self._renew_txn_if_auto_commit_with_guard(None, guard);
1526            return;
1527        }
1528
1529        self._checkout_to_latest_without_commit(true)
1530            .expect("checkout to oplog frontiers should succeed");
1531        self._renew_txn_if_auto_commit_with_guard(None, guard);
1532    }
1533
1534    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1535    pub(crate) fn _checkout_to_latest_without_commit(
1536        &self,
1537        to_commit_then_renew: bool,
1538    ) -> LoroResult<()> {
1539        self._checkout_to_latest_without_commit_with_event(
1540            to_commit_then_renew,
1541            "checkout".into(),
1542            EventTriggerKind::Checkout,
1543        )
1544    }
1545
1546    pub(crate) fn _checkout_to_latest_without_commit_as_import(
1547        &self,
1548        to_commit_then_renew: bool,
1549        origin: InternalString,
1550    ) -> LoroResult<()> {
1551        self._checkout_to_latest_without_commit_with_event(
1552            to_commit_then_renew,
1553            origin,
1554            EventTriggerKind::Import,
1555        )
1556    }
1557
1558    fn _checkout_to_latest_without_commit_with_event(
1559        &self,
1560        to_commit_then_renew: bool,
1561        origin: InternalString,
1562        triggered_by: EventTriggerKind,
1563    ) -> LoroResult<()> {
1564        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1565            let f = self.oplog_frontiers();
1566            let this = &self;
1567            let frontiers = &f;
1568            this._checkout_without_emitting_with_event(
1569                frontiers,
1570                false,
1571                to_commit_then_renew,
1572                origin,
1573                triggered_by,
1574            )?;
1575            // We don't need to shrink frontiers because oplog's frontiers are already shrinked.
1576            this.emit_events();
1577            if this.config.detached_editing() {
1578                this.renew_peer_id();
1579            }
1580
1581            self.set_detached(false);
1582            Ok(())
1583        })
1584    }
1585
1586    /// Checkout [DocState] to a specific version.
1587    ///
1588    /// This will make the current [DocState] detached from the latest version of [OpLog].
1589    /// Any further import will not be reflected on the [DocState], until user call [LoroDoc::attach()]
1590    pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1591        let was_detached = self.is_detached();
1592        let (options, guard) = self.implicit_commit_then_stop();
1593        let result = self._checkout_without_emitting(frontiers, true, true);
1594        if result.is_ok() {
1595            self.emit_events();
1596        }
1597        drop(guard);
1598        if self.config.detached_editing() {
1599            if result.is_ok() {
1600                self.renew_peer_id();
1601            }
1602            self.renew_txn_if_auto_commit(options);
1603        } else if result.is_err() {
1604            if !was_detached {
1605                self.renew_txn_if_auto_commit(options);
1606            }
1607        } else if !self.is_detached() {
1608            self.renew_txn_if_auto_commit(options);
1609        }
1610
1611        result
1612    }
1613
1614    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1615    #[instrument(level = "info", skip(self))]
1616    pub(crate) fn _checkout_without_emitting(
1617        &self,
1618        frontiers: &Frontiers,
1619        to_shrink_frontiers: bool,
1620        to_commit_then_renew: bool,
1621    ) -> Result<(), LoroError> {
1622        self._checkout_without_emitting_with_event(
1623            frontiers,
1624            to_shrink_frontiers,
1625            to_commit_then_renew,
1626            "checkout".into(),
1627            EventTriggerKind::Checkout,
1628        )
1629    }
1630
    /// Core of every checkout: move the [DocState] from its current frontiers to
    /// `frontiers` by calculating a diff against the [OpLog] and applying it to the state.
    ///
    /// - `to_shrink_frontiers`: when `true`, `frontiers` is first normalized with
    ///   `shrink_frontiers`; otherwise it is used as given.
    /// - `_to_commit_then_renew`: currently unused by this method.
    /// - `origin` / `triggered_by`: copied into the `InternalDocDiff` that is applied
    ///   to the state.
    ///
    /// Returns `Ok(())` without touching anything when the state is already at the
    /// requested version. Otherwise the doc is marked as detached; callers that want
    /// an attached doc have to reset the flag themselves.
    ///
    /// # Errors
    ///
    /// - [`LoroError::TransactionError`] when the transaction mutex is not held.
    /// - [`LoroError::SwitchToVersionBeforeShallowRoot`] when `frontiers` is before the
    ///   shallow root of the history.
    /// - [`LoroError::FrontiersNotFound`] when shrinking fails or an id of the target
    ///   frontiers is not in the DAG.
    /// - [`LoroError::NotFoundError`] when the current or the target frontiers cannot be
    ///   converted to a version vector.
    /// - Any error returned by `DocState::apply_diff`.
    fn _checkout_without_emitting_with_event(
        &self,
        frontiers: &Frontiers,
        to_shrink_frontiers: bool,
        _to_commit_then_renew: bool,
        origin: InternalString,
        triggered_by: EventTriggerKind,
    ) -> Result<(), LoroError> {
        // The caller must already hold the txn mutex; refuse to run otherwise.
        if !self.txn.is_locked() {
            return Err(LoroError::TransactionError(
                "checkout requires the transaction mutex to be held"
                    .to_string()
                    .into_boxed_str(),
            ));
        }
        let from_frontiers = self.state_frontiers();
        loro_common::info!(
            "checkout from={:?} to={:?} cur_vv={:?}",
            from_frontiers,
            frontiers,
            self.oplog_vv()
        );

        // Fast path: nothing to do when the state is already at the requested version.
        if &from_frontiers == frontiers {
            return Ok(());
        }

        let oplog = self.oplog.lock();
        if oplog.dag.is_before_shallow_root(frontiers) {
            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
        }

        // Shadow `frontiers` with an owned, optionally shrunk copy.
        let frontiers = if to_shrink_frontiers {
            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
        } else {
            frontiers.clone()
        };

        // Shrinking may have produced the current version; re-check before locking more.
        if from_frontiers == frontiers {
            return Ok(());
        }

        // Locks are taken in the order oplog -> state -> diff calculator.
        let mut state = self.state.lock();
        let mut calc = self.diff_calculator.lock();
        for i in frontiers.iter() {
            if !oplog.dag.contains(i) {
                return Err(LoroError::FrontiersNotFound(i));
            }
        }

        // Version vectors of the current state (`before`) and of the target (`after`).
        let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
            LoroError::NotFoundError(
                format!(
                    "Cannot find the current state version {:?}",
                    state.frontiers
                )
                .into_boxed_str(),
            )
        })?;
        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
            return Err(LoroError::NotFoundError(
                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
            ));
        };

        // NOTE: the detached flag is set before the diff is applied, so it stays set
        // even if `apply_diff` below returns an error.
        self.set_detached(true);
        let (diff, diff_mode) =
            calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
        state.apply_diff(
            InternalDocDiff {
                origin,
                diff: Cow::Owned(diff),
                by: triggered_by,
                new_version: Cow::Owned(frontiers.clone()),
            },
            diff_mode,
        )?;

        Ok(())
    }
1711
1712    #[inline]
1713    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1714        self.oplog.lock().dag.vv_to_frontiers(vv)
1715    }
1716
1717    #[inline]
1718    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1719        self.oplog.lock().dag.frontiers_to_vv(frontiers)
1720    }
1721
1722    /// Import ops from other doc.
1723    ///
1724    /// After `a.merge(b)` and `b.merge(a)`, `a` and `b` will have the same content if they are in attached mode.
1725    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1726        let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1727        self.import(&updates)
1728    }
1729
    /// Borrow the [`SharedArena`] held by this doc.
    pub(crate) fn arena(&self) -> &SharedArena {
        &self.arena
    }
1733
1734    #[inline]
1735    pub fn len_ops(&self) -> usize {
1736        if self.oplog.can_lock_in_this_thread() {
1737            return self.oplog.lock().visible_op_count_exact();
1738        }
1739
1740        self.visible_op_count.load(Acquire)
1741    }
1742
1743    #[inline]
1744    pub fn len_changes(&self) -> usize {
1745        let oplog = self.oplog.lock();
1746        oplog.len_changes()
1747    }
1748
    /// Borrow the [`Configure`] of this doc.
    pub fn config(&self) -> &Configure {
        &self.config
    }
1752
1753    /// This method compare the consistency between the current doc state
1754    /// and the state calculated by diff calculator from beginning.
1755    ///
1756    /// Panic when it's not consistent
1757    pub fn check_state_diff_calc_consistency_slow(&self) {
1758        // #[cfg(any(test, debug_assertions, feature = "test_utils"))]
1759        {
1760            static IS_CHECKING: std::sync::atomic::AtomicBool =
1761                std::sync::atomic::AtomicBool::new(false);
1762            if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1763                return;
1764            }
1765
1766            IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1767            let peer_id = self.peer_id();
1768            let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1769            let _g = s.enter();
1770            let options = self.implicit_commit_then_stop().0;
1771            self.oplog.lock().check_dag_correctness();
1772            if self.is_shallow() {
1773                // For shallow documents, we cannot replay from the beginning as the history is not complete.
1774                //
1775                // Instead, we:
1776                // 1. Export the initial state from the GC snapshot.
1777                // 2. Create a new document and import the initial snapshot.
1778                // 3. Export updates from the shallow start version vector to the current version.
1779                // 4. Import these updates into the new document.
1780                // 5. Compare the states of the new document and the current document.
1781
1782                // Step 1: Export the initial state from the GC snapshot.
1783                let initial_snapshot = self
1784                    .export(ExportMode::state_only(Some(
1785                        &self.shallow_since_frontiers(),
1786                    )))
1787                    .unwrap();
1788
1789                // Step 2: Create a new document and import the initial snapshot.
1790                let doc = LoroDoc::new();
1791                doc.import(&initial_snapshot).unwrap();
1792                self.checkout(&self.shallow_since_frontiers()).unwrap();
1793                assert_eq!(self.get_deep_value(), doc.get_deep_value());
1794
1795                // Step 3: Export updates since the shallow start version vector to the current version.
1796                let updates = self.export(ExportMode::all_updates()).unwrap();
1797
1798                // Step 4: Import these updates into the new document.
1799                doc.import(&updates).unwrap();
1800                self.checkout_to_latest();
1801
1802                // Step 5: Checkout to the current state's frontiers and compare the states.
1803                // doc.checkout(&self.state_frontiers()).unwrap();
1804                assert_eq!(doc.get_deep_value(), self.get_deep_value());
1805                let mut calculated_state = doc.app_state().lock();
1806                let mut current_state = self.app_state().lock();
1807                current_state.check_is_the_same(&mut calculated_state);
1808            } else {
1809                let f = self.state_frontiers();
1810                let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
1811                let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1812                let doc = Self::new();
1813                doc.import(&bytes).unwrap();
1814                let mut calculated_state = doc.app_state().lock();
1815                let mut current_state = self.app_state().lock();
1816                current_state.check_is_the_same(&mut calculated_state);
1817            }
1818
1819            self.renew_txn_if_auto_commit(options);
1820            IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1821        }
1822    }
1823
1824    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1825        self.query_pos_internal(pos, true)
1826    }
1827
    /// Get position in a seq container
    ///
    /// The current [DocState] is asked first. When it can resolve the cursor, the
    /// result carries no cursor update. Otherwise the history is consulted inside
    /// `with_barrier`:
    ///
    /// - If the cursor has an `id`, the last delete op covering that id is located, the
    ///   diff from just before that delete to the latest oplog version is calculated for
    ///   the cursor's container only, and the id's latest position is read from the
    ///   container's diff calculator. The result then also carries an updated cursor.
    /// - If the cursor has no `id`, the position is the end of the container.
    ///
    /// `ret_event_index` is forwarded to `DocState::get_relative_position`.
    ///
    /// # Errors
    ///
    /// - [`CannotFindRelativePosition::IdNotFound`] when the container is unknown to the
    ///   doc, or no delete op covering the id can be found.
    /// - [`CannotFindRelativePosition::ContainerDeleted`] when the container is not in the
    ///   arena and the state reports that it does not exist.
    /// - [`CannotFindRelativePosition::HistoryCleared`] when no delete op is found and the
    ///   id is covered by the shallow-history start version.
    ///
    /// # Panics
    ///
    /// Panics (`unwrap` / `unreachable!`) when the container is not a text, list or
    /// movable list, or when the diff calculator cannot report a position for the id.
    pub(crate) fn query_pos_internal(
        &self,
        pos: &Cursor,
        ret_event_index: bool,
    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
        if !self.has_container(&pos.container) {
            return Err(CannotFindRelativePosition::IdNotFound);
        }

        let mut state = self.state.lock();
        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
            // Fast path: the state still knows the position, so no cursor update is needed.
            Ok(PosQueryResult {
                update: None,
                current: AbsolutePosition {
                    pos: ans,
                    side: pos.side,
                },
            })
        } else {
            // We need to trace back to the version where the relative position is valid.
            // The optimal way to find that version is to have succ info like Automerge.
            //
            // But we don't have that info now, so an alternative way is to trace back
            // to version with frontiers of `[pos.id]`. But this may be very slow even if
            // the target is just deleted a few versions ago.
            //
            // What we need is to trace back to the latest version that deletes the target
            // id.

            // commit the txn to make sure we can query the history correctly, preserving options
            // The state lock is released first; the closure below locks it again itself.
            drop(state);
            let result = self.with_barrier(|| {
                let oplog = self.oplog().lock();
                // TODO: assert pos.id is not unknown
                if let Some(id) = pos.id {
                    // Ensure the container is registered if it exists lazily
                    if oplog.arena.id_to_idx(&pos.container).is_none() {
                        let mut s = self.state.lock();
                        if !s.does_container_exist(&pos.container) {
                            return Err(CannotFindRelativePosition::ContainerDeleted);
                        }
                        s.ensure_container(&pos.container);
                        drop(s);
                    }
                    let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
                    // We know where the target id is when we trace back to the delete_op_id.
                    let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
                        if oplog.shallow_since_vv().includes_id(id) {
                            return Err(CannotFindRelativePosition::HistoryCleared);
                        }

                        tracing::error!("Cannot find id {}", id);
                        return Err(CannotFindRelativePosition::IdNotFound);
                    };
                    // Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
                    let mut diff_calc = DiffCalculator::new(true);
                    // The version right before the delete op: its dependencies.
                    let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
                    let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
                    // TODO: PERF: it doesn't need to calc the effects here
                    // The returned diff is discarded; only the calculator's internal
                    // state for `idx` is read below.
                    diff_calc.calc_diff_internal(
                        &oplog,
                        before,
                        &before_frontiers,
                        oplog.vv(),
                        oplog.frontiers(),
                        Some(&|target| idx == target),
                    );
                    // TODO: remove depth info
                    let depth = self.arena.get_depth(idx);
                    let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
                    match diff_calc {
                        crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
                            let c = text.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_text(&pos.container);
                            // Text positions are converted from entity index to event index.
                            let current_pos = handler.convert_entity_index_to_event_index(new_pos);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(current_pos, c.side),
                                current: AbsolutePosition {
                                    pos: current_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::List(list) => {
                            let c = list.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_list(&pos.container);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(new_pos, c.side),
                                current: AbsolutePosition {
                                    pos: new_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
                            let c = list.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_movable_list(&pos.container);
                            // Movable-list positions are converted from op position to user position.
                            let new_pos = handler.op_pos_to_user_pos(new_pos);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(new_pos, c.side),
                                current: AbsolutePosition {
                                    pos: new_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
                        crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
                        #[cfg(feature = "counter")]
                        crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
                        crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
                    }
                } else {
                    // A cursor without an id resolves to the end of its container.
                    match pos.container.container_type() {
                        ContainerType::Text => {
                            let text = self.get_text(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: text.id(),
                                    side: pos.side,
                                    origin_pos: text.len_unicode(),
                                }),
                                current: AbsolutePosition {
                                    pos: text.len_event(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::List => {
                            let list = self.get_list(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: list.id(),
                                    side: pos.side,
                                    origin_pos: list.len(),
                                }),
                                current: AbsolutePosition {
                                    pos: list.len(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::MovableList => {
                            let list = self.get_movable_list(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: list.id(),
                                    side: pos.side,
                                    origin_pos: list.len(),
                                }),
                                current: AbsolutePosition {
                                    pos: list.len(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
                            unreachable!()
                        }
                        #[cfg(feature = "counter")]
                        ContainerType::Counter => unreachable!(),
                    }
                }
            });
            result
        }
    }
2002
2003    /// Free the history cache that is used for making checkout faster.
2004    ///
2005    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
2006    /// You can free it by calling this method.
2007    pub fn free_history_cache(&self) {
2008        self.oplog.lock().free_history_cache();
2009    }
2010
2011    /// Free the cached diff calculator that is used for checkout.
2012    pub fn free_diff_calculator(&self) {
2013        *self.diff_calculator.lock() = DiffCalculator::new(true);
2014    }
2015
2016    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
2017    /// You can free it by calling `free_history_cache`.
2018    pub fn has_history_cache(&self) -> bool {
2019        self.oplog.lock().has_history_cache()
2020    }
2021
2022    /// Encoded all ops and history cache to bytes and store them in the kv store.
2023    ///
2024    /// The parsed ops will be dropped
2025    #[inline]
2026    pub fn compact_change_store(&self) {
2027        self.with_barrier(|| {
2028            self.oplog.lock().compact_change_store();
2029        });
2030    }
2031
    /// Analyze the container info of the doc
    ///
    /// This is used for development and debugging. It delegates to
    /// [`DocAnalysis::analyze`].
    #[inline]
    pub fn analyze(&self) -> DocAnalysis {
        DocAnalysis::analyze(self)
    }
2039
2040    /// Get the path from the root to the container
2041    pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
2042        let mut state = self.state.lock();
2043        if state.arena.id_to_idx(id).is_none() {
2044            if id.is_mergeable() {
2045                // Mergeable children can be logically active via the parent map marker
2046                // before they have their own encoded state. Register only the arena edge; do not
2047                // create container state or change `has_container` semantics.
2048                state.arena.register_container(id);
2049            } else if !state.does_container_exist(id) {
2050                return None;
2051            } else {
2052                state.ensure_container(id);
2053            }
2054        }
2055        let idx = state.arena.id_to_idx(id).unwrap();
2056        state.get_path(idx)
2057    }
2058
2059    #[instrument(skip(self))]
2060    pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
2061        self.with_barrier(|| {
2062            let ans = match mode {
2063                ExportMode::Snapshot => export_fast_snapshot(self),
2064                ExportMode::Updates { from } => export_fast_updates(self, &from),
2065                ExportMode::UpdatesInRange { spans } => {
2066                    export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
2067                }
2068                ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
2069                ExportMode::StateOnly(f) => match f {
2070                    Some(f) => export_state_only_snapshot(self, &f)?,
2071                    None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
2072                },
2073                ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
2074            };
2075            Ok(ans)
2076        })
2077    }
2078
2079    /// The doc only contains the history since the shallow history start version vector.
2080    ///
2081    /// This is empty if the doc is not shallow.
2082    ///
2083    /// The ops included by the shallow history start version vector are not in the doc.
2084    pub fn shallow_since_vv(&self) -> ImVersionVector {
2085        self.oplog().lock().shallow_since_vv().clone()
2086    }
2087
2088    pub fn shallow_since_frontiers(&self) -> Frontiers {
2089        self.oplog().lock().shallow_since_frontiers().clone()
2090    }
2091
2092    /// Check if the doc contains the full history.
2093    pub fn is_shallow(&self) -> bool {
2094        !self.oplog().lock().shallow_since_vv().is_empty()
2095    }
2096
2097    /// Get the number of operations in the pending transaction.
2098    ///
2099    /// The pending transaction is the one that is not committed yet. It will be committed
2100    /// after calling `doc.commit()`, `doc.export(mode)` or `doc.checkout(version)`.
2101    pub fn get_pending_txn_len(&self) -> usize {
2102        if let Some(txn) = self.txn.lock().as_ref() {
2103            txn.len()
2104        } else {
2105            0
2106        }
2107    }
2108
2109    #[inline]
2110    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
2111        self.oplog().lock().dag.find_path(from, to)
2112    }
2113
2114    /// Subscribe to the first commit from a peer. Operations performed on the `LoroDoc` within this callback
2115    /// will be merged into the current commit.
2116    ///
2117    /// This is useful for managing the relationship between `PeerID` and user information.
2118    /// For example, you could store user names in a `LoroMap` using `PeerID` as the key and the `UserID` as the value.
2119    pub fn subscribe_first_commit_from_peer(
2120        &self,
2121        callback: FirstCommitFromPeerCallback,
2122    ) -> Subscription {
2123        let (s, enable) = self
2124            .first_commit_from_peer_subs
2125            .inner()
2126            .insert((), callback);
2127        enable();
2128        s
2129    }
2130
2131    /// Subscribe to the pre-commit event.
2132    ///
2133    /// The callback will be called when the changes are committed but not yet applied to the OpLog.
2134    /// You can modify the commit message and timestamp in the callback by [`ChangeModifier`].
2135    pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
2136        let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
2137        enable();
2138        s
2139    }
2140}
2141
2142fn pending_root_containers_to_materialize(oplog: &OpLog, changes: &[Change]) -> Vec<ContainerID> {
2143    let mut roots = FxHashSet::default();
2144    for change in changes {
2145        if change.ctr_end() <= oplog.vv().get(&change.id.peer).copied().unwrap_or(0) {
2146            continue;
2147        }
2148
2149        if oplog.dag.is_before_shallow_root(&change.deps)
2150            || oplog
2151                .dag
2152                .get_change_lamport_from_deps(&change.deps)
2153                .is_some()
2154        {
2155            continue;
2156        }
2157
2158        for op in change.ops.iter() {
2159            let id = oplog
2160                .arena
2161                .get_container_id(op.container)
2162                .expect("decoded op container should be registered");
2163            // Mergeable containers share the `ContainerID::Root` namespace but are logical
2164            // *children*: their existence is governed by their parent map's marker, and their
2165            // parent can be any (possibly not-yet-imported) map. Eagerly materializing one
2166            // while its causal dependencies are still pending has no valid parent edge to
2167            // resolve its depth against, which used to panic in `ContainerWrapper::new`.
2168            // They get materialized correctly through the normal diff path once the creating
2169            // change applies, so skip them here (mirrors the `!is_mergeable()` guard in
2170            // `ensure_root_container`). See the `mergeable_container::pending` regression test.
2171            if id.is_root() && !id.is_mergeable() {
2172                roots.insert(id);
2173            }
2174        }
2175    }
2176
2177    roots.into_iter().collect()
2178}
2179
/// Errors returned by [`LoroDoc::travel_change_ancestors`].
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// The given id is not included in the version vector of the oplog.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// The given id is covered by the shallow-history start version, so its change
    /// is not available in this doc.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
2187
2188impl LoroDoc {
2189    pub fn travel_change_ancestors(
2190        &self,
2191        ids: &[ID],
2192        f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
2193    ) -> Result<(), ChangeTravelError> {
2194        let (options, guard) = self.implicit_commit_then_stop();
2195        drop(guard);
2196        struct PendingNode(ChangeMeta);
2197        impl PartialEq for PendingNode {
2198            fn eq(&self, other: &Self) -> bool {
2199                self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
2200            }
2201        }
2202
2203        impl Eq for PendingNode {}
2204        impl PartialOrd for PendingNode {
2205            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2206                Some(self.cmp(other))
2207            }
2208        }
2209
2210        impl Ord for PendingNode {
2211            fn cmp(&self, other: &Self) -> Ordering {
2212                self.0
2213                    .lamport_last()
2214                    .cmp(&other.0.lamport_last())
2215                    .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
2216            }
2217        }
2218
2219        for id in ids {
2220            let op_log = &self.oplog().lock();
2221            if !op_log.vv().includes_id(*id) {
2222                return Err(ChangeTravelError::TargetIdNotFound(*id));
2223            }
2224            if op_log.dag.shallow_since_vv().includes_id(*id) {
2225                return Err(ChangeTravelError::TargetVersionNotIncluded);
2226            }
2227        }
2228
2229        let mut visited = FxHashSet::default();
2230        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
2231        for id in ids {
2232            pending.push(PendingNode(ChangeMeta::from_change(
2233                &self.oplog().lock().get_change_at(*id).unwrap(),
2234            )));
2235        }
2236        while let Some(PendingNode(node)) = pending.pop() {
2237            let deps = node.deps.clone();
2238            if f(node).is_break() {
2239                break;
2240            }
2241
2242            for dep in deps.iter() {
2243                let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
2244                    continue;
2245                };
2246                if visited.contains(&dep_node.id) {
2247                    continue;
2248                }
2249
2250                visited.insert(dep_node.id);
2251                pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
2252            }
2253        }
2254
2255        let ans = Ok(());
2256        self.renew_txn_if_auto_commit(options);
2257        ans
2258    }
2259
2260    pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
2261        self.with_barrier(|| {
2262            let mut set = FxHashSet::default();
2263            let len = i64::try_from(len).unwrap_or(i64::MAX);
2264            let start = i64::from(id.counter);
2265            let end = start.saturating_add(len);
2266            if end <= 0 {
2267                return set;
2268            }
2269
2270            let start = start.max(0).min(i64::from(i32::MAX));
2271            let end = end.max(0).min(i64::from(i32::MAX));
2272            if start >= end {
2273                return set;
2274            }
2275
2276            {
2277                let oplog = self.oplog().lock();
2278                let span = IdSpan::new(id.peer, start as i32, end as i32);
2279                for op in oplog.iter_ops(span) {
2280                    let id = oplog.arena.get_container_id(op.container()).unwrap();
2281                    set.insert(id);
2282                }
2283            }
2284            set
2285        })
2286    }
2287
2288    pub fn delete_root_container(&self, cid: ContainerID) {
2289        if !cid.is_root() {
2290            return;
2291        }
2292
2293        // Do not treat "not in arena" as non-existence; consult state/kv
2294        if !self.has_container(&cid) {
2295            return;
2296        }
2297
2298        let Some(h) = self.get_handler(cid.clone()) else {
2299            return;
2300        };
2301
2302        self.config
2303            .deleted_root_containers
2304            .lock()
2305            .insert(cid.clone());
2306        if let Err(e) = h.clear() {
2307            self.config.deleted_root_containers.lock().remove(&cid);
2308            eprintln!("Failed to clear handler: {:?}", e);
2309        }
2310    }
2311
2312    pub fn set_hide_empty_root_containers(&self, hide: bool) {
2313        self.config
2314            .hide_empty_root_containers
2315            .store(hide, std::sync::atomic::Ordering::Relaxed);
2316    }
2317}
2318
2319fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2320    // Any delete op that covers `id` must have observed it, so its peer's counter
2321    // at delete time was > id.counter. start_vv (the vv at `id`) is therefore a
2322    // valid lower bound: changes at or before start_vv[peer] predate `id` and can
2323    // be skipped. We scan peer-by-peer rather than using the DAG-ordered
2324    // iter_changes_causally_rev, which is O(total changes).
2325    //
2326    // We choose the matching delete op with the greatest (op_lamport, peer, counter)
2327    // ordering. op_lamport is the Lamport of the specific op within the change
2328    // (change.lamport + op offset), not just the change's starting Lamport, so
2329    // concurrent deletes with equal change Lamports are broken deterministically.
2330    let start_vv = oplog
2331        .dag
2332        .frontiers_to_vv(&id.into())
2333        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2334
2335    // (op_lamport, peer) gives a deterministic total order for concurrent deletes.
2336    // A single peer cannot produce two ops with the same lamport, so peer suffices
2337    // as a tie-breaker.
2338    let mut best: Option<((loro_common::Lamport, loro_common::PeerID), ID)> = None;
2339
2340    for change in oplog.iter_changes_peer_by_peer(&start_vv, oplog.vv()) {
2341        let peer = change.peer();
2342        for op in change.ops.iter() {
2343            if op.container != idx {
2344                continue;
2345            }
2346            if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2347                if d.id_start.to_span(d.atom_len()).contains(id) {
2348                    debug_assert!(op.counter >= change.id().counter);
2349                    let op_lamport =
2350                        change.lamport + (op.counter - change.id().counter) as loro_common::Lamport;
2351                    let key = (op_lamport, peer);
2352                    if best.is_none_or(|(bk, _)| key > bk) {
2353                        best = Some((key, ID::new(peer, op.counter)));
2354                    }
2355                }
2356            }
2357        }
2358    }
2359
2360    best.map(|(_, op_id)| op_id)
2361}
2362
/// Guard that commits the pending transaction of `doc` when it is dropped.
///
/// On drop, `default_options` is handed to the document's active transaction
/// (if there is one) and then `commit_then_renew` is called on the document.
#[derive(Debug)]
pub struct CommitWhenDrop<'a> {
    /// Document whose transaction is committed when the guard is dropped.
    doc: &'a LoroDoc,
    /// Options installed as the transaction's default options on drop.
    default_options: CommitOptions,
}
2368
2369impl Drop for CommitWhenDrop<'_> {
2370    fn drop(&mut self) {
2371        {
2372            let mut guard = self.doc.txn.lock();
2373            if let Some(txn) = guard.as_mut() {
2374                txn.set_default_options(std::mem::take(&mut self.default_options));
2375            };
2376        }
2377
2378        self.doc.commit_then_renew();
2379    }
2380}
2381
/// Options for configuring a commit operation.
///
/// [`CommitOptions::new`] (and therefore `Default`) leaves every optional
/// field unset and turns `immediate_renew` on.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// Origin identifier for the commit event, used to track the source of changes.
    /// It is not persisted.
    pub origin: Option<InternalString>,

    /// Whether to immediately start a new transaction after committing.
    /// Defaults to `true`.
    pub immediate_renew: bool,

    /// Custom timestamp for the commit, in seconds since the Unix epoch.
    /// If `None`, the current time will be used.
    pub timestamp: Option<Timestamp>,

    /// Optional commit message to attach to the changes. It is persisted.
    pub commit_msg: Option<Arc<str>>,
}
2400
2401impl CommitOptions {
2402    /// Creates a new CommitOptions with default values.
2403    pub fn new() -> Self {
2404        Self {
2405            origin: None,
2406            immediate_renew: true,
2407            timestamp: None,
2408            commit_msg: None,
2409        }
2410    }
2411
2412    /// Sets the origin identifier for this commit.
2413    pub fn origin(mut self, origin: &str) -> Self {
2414        self.origin = Some(origin.into());
2415        self
2416    }
2417
2418    /// Sets whether to immediately start a new transaction after committing.
2419    pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2420        self.immediate_renew = immediate_renew;
2421        self
2422    }
2423
2424    /// Set the timestamp of the commit.
2425    ///
2426    /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
2427    pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2428        self.timestamp = Some(timestamp);
2429        self
2430    }
2431
2432    /// Sets a commit message to be attached to the changes.
2433    pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2434        self.commit_msg = Some(commit_msg.into());
2435        self
2436    }
2437
2438    /// Sets the origin identifier for this commit.
2439    pub fn set_origin(&mut self, origin: Option<&str>) {
2440        self.origin = origin.map(|x| x.into())
2441    }
2442
2443    /// Sets the timestamp for this commit.
2444    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2445        self.timestamp = timestamp;
2446    }
2447}
2448
2449impl Default for CommitOptions {
2450    fn default() -> Self {
2451        Self::new()
2452    }
2453}
2454
2455#[cfg(test)]
2456mod test {
2457    use std::{
2458        panic::AssertUnwindSafe,
2459        sync::{
2460            atomic::{AtomicUsize, Ordering},
2461            Arc,
2462        },
2463    };
2464
2465    use crate::{
2466        cursor::PosType,
2467        encoding::json_schema::json::{JsonOpContent, JsonSchema, ListOp},
2468        encoding::{fast_snapshot::EMPTY_MARK, EncodeMode},
2469        loro::ExportMode,
2470        version::{Frontiers, VersionVector},
2471        LoroDoc, ToJson, TreeParentId,
2472    };
2473    use bytes::{BufMut, Bytes};
2474    use loro_common::ID;
2475    use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
2476
    /// Seed passed to `xxh32` by the blob-building helpers below: the bytes
    /// `"LORO"` read as a little-endian `u32`.
    const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
2478
2479    fn encode_import_blob(mode: EncodeMode, body: &[u8]) -> Vec<u8> {
2480        let mut ans = Vec::new();
2481        ans.extend_from_slice(b"loro");
2482        ans.extend_from_slice(&[0; 16]);
2483        ans.extend_from_slice(&mode.to_bytes());
2484        ans.extend_from_slice(body);
2485        let checksum = xxhash_rust::xxh32::xxh32(&ans[20..], XXH_SEED);
2486        ans[16..20].copy_from_slice(&checksum.to_le_bytes());
2487        ans
2488    }
2489
2490    fn encode_fast_snapshot_import(oplog_bytes: &[u8]) -> Vec<u8> {
2491        let mut body = Vec::new();
2492        body.put_u32_le(oplog_bytes.len() as u32);
2493        body.extend_from_slice(oplog_bytes);
2494        body.put_u32_le(EMPTY_MARK.len() as u32);
2495        body.extend_from_slice(EMPTY_MARK);
2496        body.put_u32_le(0);
2497        encode_import_blob(EncodeMode::FastSnapshot, &body)
2498    }
2499
2500    fn sstable_with_huge_meta_block_count() -> Vec<u8> {
2501        let mut bytes = Vec::new();
2502        bytes.extend_from_slice(b"LORO");
2503        bytes.push(0);
2504        bytes.put_u32_le(10_000_000);
2505        bytes.put_u32_le(xxhash_rust::xxh32::xxh32(&[], XXH_SEED));
2506        bytes.put_u32_le(5);
2507        bytes
2508    }
2509
2510    fn snapshot_oplog_with_malformed_block() -> Vec<u8> {
2511        let peer = 1;
2512        let id = ID::new(peer, 0);
2513        let vv = VersionVector::from_iter([(peer, 1)]);
2514        let frontiers = Frontiers::from_id(id);
2515        let mut store = MemKvStore::new(MemKvConfig::default());
2516        store.set(b"vv", vv.encode().into());
2517        store.set(b"fr", frontiers.encode().into());
2518        store.set(&id.to_bytes(), Bytes::from_static(&[0]));
2519        store.export_all().to_vec()
2520    }
2521
2522    fn make_json_import_stress_doc(peer: u64) -> LoroDoc {
2523        let doc = LoroDoc::new_auto_commit();
2524        doc.set_peer_id(peer).unwrap();
2525
2526        let text = doc.get_text("text");
2527        let mut text_pos = 0;
2528        for i in 0..32 {
2529            let chunk = format!("segment-{i}-abcdefghijklmnopqrstuvwxyz;");
2530            text.insert_unicode(text_pos, &chunk).unwrap();
2531            text_pos += chunk.chars().count();
2532        }
2533
2534        let list = doc.get_list("list");
2535        for i in 0..32 {
2536            list.insert(i, format!("item-{i}")).unwrap();
2537        }
2538
2539        let map = doc.get_map("map");
2540        for i in 0..32 {
2541            let key = format!("key-{i}");
2542            map.insert(&key, format!("value-{i}")).unwrap();
2543        }
2544
2545        let tree = doc.get_tree("tree");
2546        let mut parent = TreeParentId::Root;
2547        for i in 0..16 {
2548            let node = tree.create(parent).unwrap();
2549            let meta = tree.get_meta(node).unwrap();
2550            meta.insert("name", format!("node-{i}")).unwrap();
2551            meta.insert("payload", format!("payload-{i}-{}", "x".repeat(16)))
2552                .unwrap();
2553            parent = TreeParentId::Node(node);
2554        }
2555
2556        doc
2557    }
2558
2559    fn make_json_list_update_with_four_ops(peer: u64) -> (LoroDoc, JsonSchema) {
2560        let doc = LoroDoc::new();
2561        doc.set_peer_id(peer).unwrap();
2562        let map = doc.get_map("map");
2563        let list = doc.get_list("list");
2564        let text = doc.get_text("text");
2565
2566        let mut txn = doc.txn().unwrap();
2567        map.insert_with_txn(&mut txn, "prefix", "map-value".into())
2568            .unwrap();
2569        list.insert_with_txn(&mut txn, 0, "seed".into()).unwrap();
2570        text.insert_with_txn(&mut txn, 0, "text-value", PosType::Unicode)
2571            .unwrap();
2572        list.insert_with_txn(&mut txn, 1, "tail".into()).unwrap();
2573        txn.commit().unwrap();
2574
2575        let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv(), false);
2576        assert_eq!(json.changes.len(), 1);
2577        assert_eq!(json.changes[0].ops.len(), 4);
2578        (doc, json)
2579    }
2580
2581    fn move_last_list_insert_far_out_of_bounds(json: &mut JsonSchema) {
2582        let last_change = json.changes.last_mut().unwrap();
2583        let last_op = last_change.ops.last_mut().unwrap();
2584        match &mut last_op.content {
2585            JsonOpContent::List(ListOp::Insert { pos, .. }) => {
2586                *pos = 1_000;
2587            }
2588            other => panic!("expected list insert op, got {other:?}"),
2589        }
2590    }
2591
2592    #[test]
2593    fn test_sync() {
2594        fn is_send_sync<T: Send + Sync>(_v: T) {}
2595        let loro = super::LoroDoc::new();
2596        is_send_sync(loro)
2597    }
2598
2599    #[test]
2600    fn import_rejects_huge_sstable_meta_block_count_without_panic() {
2601        let bytes = encode_fast_snapshot_import(&sstable_with_huge_meta_block_count());
2602
2603        let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2604        assert!(result.is_ok(), "malformed import should not panic");
2605        assert!(result.unwrap().is_err());
2606    }
2607
2608    #[test]
2609    fn import_rejects_malformed_change_block_without_panic() {
2610        let bytes = encode_fast_snapshot_import(&snapshot_oplog_with_malformed_block());
2611
2612        let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2613        assert!(result.is_ok(), "malformed import should not panic");
2614        assert!(result.unwrap().is_err());
2615    }
2616
2617    #[test]
2618    fn failed_import_rolls_back_oplog_and_arena() {
2619        let src = LoroDoc::new();
2620        src.set_peer_id(1).unwrap();
2621        let text = src.get_text("text");
2622        let mut txn = src.txn().unwrap();
2623        text.insert_with_txn(&mut txn, 0, "hello", PosType::Unicode)
2624            .unwrap();
2625        txn.commit().unwrap();
2626        let update = src.export(ExportMode::all_updates()).unwrap();
2627
2628        let dst = LoroDoc::new();
2629        let vv_before_import = dst.oplog_vv();
2630        let state_before_import = dst.get_deep_value();
2631        let err = dst
2632            .import_with(&update, "__loro_fail_import_state_apply".into())
2633            .unwrap_err();
2634        assert!(err.to_string().contains("state apply failpoint"));
2635        assert_eq!(dst.oplog_vv(), vv_before_import);
2636        assert_eq!(dst.get_deep_value(), state_before_import);
2637        assert!(dst.oplog().lock().is_empty());
2638
2639        dst.import(&update).unwrap();
2640        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2641    }
2642
2643    #[test]
2644    fn failed_incremental_import_restores_previous_change_store_block() {
2645        let src = LoroDoc::new();
2646        src.set_peer_id(1).unwrap();
2647        let text = src.get_text("text");
2648        let mut txn = src.txn().unwrap();
2649        text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
2650            .unwrap();
2651        txn.commit().unwrap();
2652        let first_update = src.export(ExportMode::all_updates()).unwrap();
2653        let first_vv = src.oplog_vv();
2654
2655        let mut txn = src.txn().unwrap();
2656        text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
2657            .unwrap();
2658        txn.commit().unwrap();
2659        let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
2660
2661        let dst = LoroDoc::new();
2662        dst.import(&first_update).unwrap();
2663        let vv_before_import = dst.oplog_vv();
2664        let state_before_import = dst.get_deep_value();
2665        dst.import_with(&second_update, "__loro_fail_import_state_apply".into())
2666            .unwrap_err();
2667        assert_eq!(dst.oplog_vv(), vv_before_import);
2668        assert_eq!(dst.get_deep_value(), state_before_import);
2669
2670        dst.import(&second_update).unwrap();
2671        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2672    }
2673
2674    #[test]
2675    fn failed_import_json_updates_rolls_back_complex_empty_doc() {
2676        let src = make_json_import_stress_doc(11);
2677        let json = src.export_json_updates(&Default::default(), &src.oplog_vv(), false);
2678
2679        let dst = LoroDoc::new();
2680        let vv_before_import = dst.oplog_vv();
2681        let frontiers_before_import = dst.oplog_frontiers();
2682        let state_before_import = dst.get_deep_value();
2683        for _ in 0..3 {
2684            crate::state::fail_next_import_state_apply_for_test();
2685            let err = dst.import_json_updates(json.clone()).unwrap_err();
2686            assert!(err.to_string().contains("state apply failpoint"));
2687            assert_eq!(dst.oplog_vv(), vv_before_import);
2688            assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2689            assert_eq!(dst.get_deep_value(), state_before_import);
2690            assert!(dst.oplog().lock().is_empty());
2691        }
2692
2693        dst.import_json_updates(json).unwrap();
2694        assert_eq!(dst.oplog_vv(), src.oplog_vv());
2695        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2696        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2697    }
2698
    /// A failed incremental JSON import must leave the version vector,
    /// frontiers and state from the first JSON import intact, and the same
    /// update must import successfully once the failpoint is no longer armed.
    #[test]
    fn failed_incremental_import_json_updates_restores_previous_change_store_block() {
        // Seed an auto-commit source doc with one entry in each container kind.
        let src = LoroDoc::new_auto_commit();
        src.set_peer_id(12).unwrap();
        let text = src.get_text("text");
        text.insert_unicode(0, "a").unwrap();
        let list = src.get_list("list");
        list.push("seed").unwrap();
        let map = src.get_map("map");
        map.insert("seed", "value").unwrap();
        let tree = src.get_tree("tree");
        let root = tree.create(TreeParentId::Root).unwrap();
        tree.get_meta(root).unwrap().insert("name", "root").unwrap();

        // First export: everything up to this point.
        let first_vv = src.oplog_vv();
        let first_json = src.export_json_updates(&Default::default(), &first_vv, false);

        // Add a larger batch of edits on top of the seeded content.
        let mut text_pos = text.len_unicode();
        for i in 0..64 {
            let chunk = format!("chunk-{i};");
            text.insert_unicode(text_pos, &chunk).unwrap();
            text_pos += chunk.chars().count();
        }
        for i in 0..32 {
            list.push(format!("after-{i}")).unwrap();
            let key = format!("after-{i}");
            map.insert(&key, format!("value-{i}")).unwrap();
        }
        let child = tree.create(TreeParentId::Node(root)).unwrap();
        tree.get_meta(child)
            .unwrap()
            .insert("name", "child")
            .unwrap();

        // Second export: only what was added after `first_vv`.
        let second_json = src.export_json_updates(&first_vv, &src.oplog_vv(), false);

        // Apply the first export and capture the resulting baseline.
        let dst = LoroDoc::new();
        dst.import_json_updates(first_json).unwrap();
        let vv_before_import = dst.oplog_vv();
        let frontiers_before_import = dst.oplog_frontiers();
        let state_before_import = dst.get_deep_value();

        // Each failed attempt must restore the baseline exactly.
        for _ in 0..2 {
            crate::state::fail_next_import_state_apply_for_test();
            let err = dst.import_json_updates(second_json.clone()).unwrap_err();
            assert!(err.to_string().contains("state apply failpoint"));
            assert_eq!(dst.oplog_vv(), vv_before_import);
            assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
            assert_eq!(dst.get_deep_value(), state_before_import);
        }

        // Without the failpoint the same update brings `dst` in line with `src`.
        dst.import_json_updates(second_json).unwrap();
        assert_eq!(dst.oplog_vv(), src.oplog_vv());
        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
        assert_eq!(dst.get_deep_value(), src.get_deep_value());
    }
2755
    /// A JSON update whose last list insert is out of bounds must be rejected
    /// with a "list diff" error and leave the target document unchanged, both
    /// when it is imported on top of a valid prefix and when it is imported
    /// into an empty document.
    #[test]
    fn malformed_later_import_json_update_rolls_back_after_valid_prefix_enters_oplog() {
        let peer = 13;
        let (src, good_json) = make_json_list_update_with_four_ops(peer);
        let mut bad_json = good_json.clone();
        move_last_list_insert_far_out_of_bounds(&mut bad_json);

        // Sanity check: the unmodified update imports and reproduces `src`.
        let good_dst = LoroDoc::new();
        good_dst.import_json_updates(good_json.clone()).unwrap();
        assert_eq!(good_dst.get_deep_value(), src.get_deep_value());

        // Split the change in two: a prefix with every op except the last one
        // (checked by the assertion below), and a suffix with only the last op.
        let last_op_counter = good_json.changes[0].ops.last().unwrap().counter;
        let prefix_vv = VersionVector::from_iter([(peer, last_op_counter)]);
        let prefix_json = src.export_json_updates(&Default::default(), &prefix_vv, false);
        assert_eq!(
            prefix_json.changes[0].ops.len(),
            good_json.changes[0].ops.len() - 1
        );
        let good_suffix_json = src.export_json_updates(&prefix_vv, &src.oplog_vv(), false);
        assert_eq!(good_suffix_json.changes[0].ops.len(), 1);
        let mut bad_suffix_json = good_suffix_json.clone();
        move_last_list_insert_far_out_of_bounds(&mut bad_suffix_json);

        // Case 1: the prefix is already imported, then the bad suffix arrives.
        let prefix_dst = LoroDoc::new();
        prefix_dst.import_json_updates(prefix_json.clone()).unwrap();
        let vv_before_bad_suffix = prefix_dst.oplog_vv();
        let frontiers_before_bad_suffix = prefix_dst.oplog_frontiers();
        let state_before_bad_suffix = prefix_dst.get_deep_value();

        // The bad suffix is imported from its serialized string form.
        let bad_suffix_json = serde_json::to_string(&bad_suffix_json).unwrap();
        let err = prefix_dst
            .import_json_updates(&bad_suffix_json)
            .unwrap_err();
        assert!(
            err.to_string().contains("list diff"),
            "expected state list bounds validation, got {err:?}"
        );
        assert_eq!(prefix_dst.oplog_vv(), vv_before_bad_suffix);
        assert_eq!(prefix_dst.oplog_frontiers(), frontiers_before_bad_suffix);
        assert_eq!(prefix_dst.get_deep_value(), state_before_bad_suffix);

        // The well-formed suffix still applies after the failure.
        prefix_dst.import_json_updates(good_suffix_json).unwrap();
        assert_eq!(prefix_dst.get_deep_value(), src.get_deep_value());
        assert_eq!(prefix_dst.oplog_vv(), src.oplog_vv());

        // Case 2: the whole bad update is imported into an empty document.
        let dst = LoroDoc::new();
        let vv_before_import = dst.oplog_vv();
        let frontiers_before_import = dst.oplog_frontiers();
        let state_before_import = dst.get_deep_value();
        let bad_json = serde_json::to_string(&bad_json).unwrap();
        let err = dst.import_json_updates(&bad_json).unwrap_err();
        assert!(
            err.to_string().contains("list diff"),
            "expected state list bounds validation, got {err:?}"
        );
        assert_eq!(dst.oplog_vv(), vv_before_import);
        assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
        assert_eq!(dst.get_deep_value(), state_before_import);
        assert!(dst.oplog().lock().is_empty());
    }
2816
2817    #[test]
2818    fn failed_import_restores_pending_changes_that_were_applied_during_import() {
2819        let src = LoroDoc::new();
2820        src.set_peer_id(14).unwrap();
2821        let text = src.get_text("text");
2822
2823        let mut txn = src.txn().unwrap();
2824        text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
2825            .unwrap();
2826        txn.commit().unwrap();
2827        let first_update = src.export(ExportMode::all_updates()).unwrap();
2828        let first_vv = src.oplog_vv();
2829
2830        let mut txn = src.txn().unwrap();
2831        text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
2832            .unwrap();
2833        txn.commit().unwrap();
2834        let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
2835
2836        let dst = LoroDoc::new();
2837        let status = dst.import(&second_update).unwrap();
2838        assert!(status.success.is_empty());
2839        assert!(status.pending.is_some());
2840        let vv_before_dependency = dst.oplog_vv();
2841        let frontiers_before_dependency = dst.oplog_frontiers();
2842        let state_before_dependency = dst.get_deep_value();
2843
2844        crate::state::fail_next_import_state_apply_for_test();
2845        let err = dst.import(&first_update).unwrap_err();
2846        assert!(err.to_string().contains("state apply failpoint"));
2847        assert_eq!(dst.oplog_vv(), vv_before_dependency);
2848        assert_eq!(dst.oplog_frontiers(), frontiers_before_dependency);
2849        assert_eq!(dst.get_deep_value(), state_before_dependency);
2850
2851        dst.import(&first_update).unwrap();
2852        assert_eq!(dst.oplog_vv(), src.oplog_vv());
2853        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2854        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2855    }
2856
2857    #[test]
2858    fn failed_import_json_updates_does_not_emit_or_leave_events() {
2859        let (src, good_json) = make_json_list_update_with_four_ops(15);
2860        let mut bad_json = good_json.clone();
2861        move_last_list_insert_far_out_of_bounds(&mut bad_json);
2862
2863        let dst = LoroDoc::new();
2864        let event_count = Arc::new(AtomicUsize::new(0));
2865        let event_count_cloned = event_count.clone();
2866        let _sub = dst.subscribe_root(Arc::new(move |_| {
2867            event_count_cloned.fetch_add(1, Ordering::SeqCst);
2868        }));
2869
2870        let bad_json = serde_json::to_string(&bad_json).unwrap();
2871        let err = dst.import_json_updates(&bad_json).unwrap_err();
2872        assert!(
2873            err.to_string().contains("list diff"),
2874            "expected state list bounds validation, got {err:?}"
2875        );
2876        assert_eq!(event_count.load(Ordering::SeqCst), 0);
2877        assert!(dst.drop_pending_events().is_empty());
2878        assert!(dst.oplog().lock().is_empty());
2879
2880        dst.import_json_updates(good_json).unwrap();
2881        assert_eq!(event_count.load(Ordering::SeqCst), 1);
2882        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2883    }
2884
2885    #[test]
2886    fn test_checkout() {
2887        let loro = LoroDoc::new();
2888        loro.set_peer_id(1).unwrap();
2889        let text = loro.get_text("text");
2890        let map = loro.get_map("map");
2891        let list = loro.get_list("list");
2892        let mut txn = loro.txn().unwrap();
2893        for i in 0..10 {
2894            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2895            text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
2896                .unwrap();
2897            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2898        }
2899        txn.commit().unwrap();
2900        let b = LoroDoc::new();
2901        b.import(&loro.export(ExportMode::Snapshot).unwrap())
2902            .unwrap();
2903        loro.checkout(&Frontiers::default()).unwrap();
2904        {
2905            let json = &loro.get_deep_value();
2906            assert_eq!(
2907                json.to_json_value(),
2908                serde_json::json!({"text":"","list":[],"map":{}})
2909            );
2910        }
2911
2912        b.checkout(&ID::new(1, 2).into()).unwrap();
2913        {
2914            let json = &b.get_deep_value();
2915            assert_eq!(
2916                json.to_json_value(),
2917                serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
2918            );
2919        }
2920
2921        loro.checkout(&ID::new(1, 3).into()).unwrap();
2922        {
2923            let json = &loro.get_deep_value();
2924            assert_eq!(
2925                json.to_json_value(),
2926                serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
2927            );
2928        }
2929
2930        b.checkout(&ID::new(1, 29).into()).unwrap();
2931        {
2932            let json = &b.get_deep_value();
2933            assert_eq!(
2934                json.to_json_value(),
2935                serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
2936            );
2937        }
2938    }
2939
2940    #[test]
2941    fn import_batch_err_181() {
2942        let a = LoroDoc::new_auto_commit();
2943        let update_a = a.export(ExportMode::Snapshot);
2944        let b = LoroDoc::new_auto_commit();
2945        b.import_batch(&[update_a.unwrap()]).unwrap();
2946        b.get_text("text")
2947            .insert(0, "hello", PosType::Unicode)
2948            .unwrap();
2949        b.commit_then_renew();
2950        let oplog = b.oplog().lock();
2951        drop(oplog);
2952        b.export(ExportMode::all_updates()).unwrap();
2953    }
2954
2955    #[test]
2956    fn poisoned_mutex_keeps_follow_up_operations_failed() {
2957        let doc = LoroDoc::new();
2958        let oplog = doc.oplog.clone();
2959        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
2960            let _guard = oplog.lock();
2961            panic!("poison oplog");
2962        }));
2963
2964        let err = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
2965            .expect_err("poisoned lock should continue to fail fast");
2966        let msg = if let Some(msg) = err.downcast_ref::<&str>() {
2967            (*msg).to_string()
2968        } else if let Some(msg) = err.downcast_ref::<String>() {
2969            msg.clone()
2970        } else {
2971            String::new()
2972        };
2973        assert!(msg.contains("poisoned LoroMutex"), "{msg}");
2974    }
2975}