Skip to main content

loro_internal/
loro.rs

1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::{AtomicBool, AtomicUsize};
6pub(crate) use crate::LoroDocInner;
7use crate::{
8    arena::SharedArena,
9    change::{Change, Timestamp},
10    configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11    container::{
12        idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13        IntoContainerId,
14    },
15    cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16    dag::{Dag, DagUtils},
17    diff_calc::DiffCalculator,
18    encoding::{
19        self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20        export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21        export_state_only_snapshot,
22        json_schema::{encode_change_to_json, json::JsonSchema},
23        parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24    },
25    event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26    handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27    id::PeerID,
28    json::JsonChange,
29    op::InnerContent,
30    oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31    state::DocState,
32    subscription::{LocalUpdateCallback, Observer, Subscriber},
33    undo::DiffBatch,
34    utils::subscription::{SubscriberSetWithQueue, Subscription},
35    version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36    ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37    VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42    lock::{LoroLockGroup, LoroMutex},
43    txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47    ContainerID, ContainerType, HasCounterSpan, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError,
48    LoroResult, LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53    borrow::Cow,
54    cmp::Ordering,
55    collections::{hash_map::Entry, BinaryHeap},
56    ops::ControlFlow,
57    sync::{
58        atomic::Ordering::{Acquire, Release},
59        Arc,
60    },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("LoroDoc")
73            .field("config", &self.config)
74            .field("auto_commit", &self.auto_commit)
75            .field("detached", &self.detached)
76            .finish()
77    }
78}
79
80impl LoroDoc {
81    /// Run the provided closure within a commit barrier.
82    ///
83    /// This finalizes any pending auto-commit transaction first (preserving
84    /// options across an empty txn), executes `f`, then renews the transaction
85    /// (carrying preserved options) if auto-commit is enabled. This is the
86    /// common implicit-commit pattern used by internal operations such as
87    /// import/export/checkouts.
88    #[inline]
89    pub fn with_barrier<F, R>(&self, f: F) -> R
90    where
91        F: FnOnce() -> R,
92    {
93        let (options, guard) = self.implicit_commit_then_stop();
94        let result = f();
95        drop(guard);
96        self.renew_txn_if_auto_commit(options);
97        result
98    }
99
100    pub fn new() -> Self {
101        let visible_op_count = Arc::new(AtomicUsize::new(0));
102        let oplog = OpLog::new(visible_op_count.clone());
103        let arena = oplog.arena.clone();
104        let config: Configure = oplog.configure.clone();
105        let lock_group = LoroLockGroup::new();
106        let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
107        let inner = Arc::new_cyclic(|w| {
108            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
109            LoroDocInner {
110                oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
111                state,
112                config,
113                visible_op_count,
114                detached: AtomicBool::new(false),
115                auto_commit: AtomicBool::new(false),
116                observer: Arc::new(Observer::new(arena.clone())),
117                diff_calculator: Arc::new(
118                    lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
119                ),
120                txn: global_txn,
121                arena,
122                local_update_subs: SubscriberSetWithQueue::new(),
123                peer_id_change_subs: SubscriberSetWithQueue::new(),
124                pre_commit_subs: SubscriberSetWithQueue::new(),
125                first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
126            }
127        });
128        LoroDoc { inner }
129    }
130
131    pub fn fork(&self) -> Self {
132        if self.is_detached() {
133            return self
134                .fork_at(&self.state_frontiers())
135                .expect("fork_at on detached doc should not fail");
136        }
137
138        let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
139        let doc = Self::new();
140        doc.with_barrier(|| {
141            encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
142        })
143        .unwrap();
144        doc.set_config(&self.config);
145        if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
146            doc.start_auto_commit();
147        }
148        doc
149    }
150    /// Enables editing of the document in detached mode.
151    ///
152    /// By default, the document cannot be edited in detached mode (after calling
153    /// `detach` or checking out a version other than the latest). This method
154    /// allows editing in detached mode.
155    ///
156    /// # Important Notes:
157    ///
158    /// - After enabling this mode, the document will use a different PeerID. Each
159    ///   time you call checkout, a new PeerID will be used.
160    /// - If you set a custom PeerID while this mode is enabled, ensure that
161    ///   concurrent operations with the same PeerID are not possible.
162    /// - On detached mode, importing will not change the state of the document.
163    ///   It also doesn't change the version of the [DocState]. The changes will be
164    ///   recorded into [OpLog] only. You need to call `checkout` to make it take effect.
165    pub fn set_detached_editing(&self, enable: bool) {
166        self.config.set_detached_editing(enable);
167        if enable && self.is_detached() {
168            self.with_barrier(|| {
169                self.renew_peer_id();
170            });
171        }
172    }
173
174    /// Create a doc with auto commit enabled.
175    #[inline]
176    pub fn new_auto_commit() -> Self {
177        let doc = Self::new();
178        doc.start_auto_commit();
179        doc
180    }
181
182    #[inline(always)]
183    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
184        if peer == PeerID::MAX {
185            return Err(LoroError::InvalidPeerID);
186        }
187        let next_id = self.oplog.lock().next_id(peer);
188        if self.auto_commit.load(Acquire) {
189            let doc_state = self.state.lock();
190            doc_state
191                .peer
192                .store(peer, std::sync::atomic::Ordering::Relaxed);
193
194            if doc_state.is_in_txn() {
195                drop(doc_state);
196                // Use implicit-style barrier to avoid swallowing next-commit options
197                self.with_barrier(|| {});
198            }
199            self.peer_id_change_subs.emit(&(), next_id);
200            return Ok(());
201        }
202
203        let doc_state = self.state.lock();
204        if doc_state.is_in_txn() {
205            return Err(LoroError::TransactionError(
206                "Cannot change peer id during transaction"
207                    .to_string()
208                    .into_boxed_str(),
209            ));
210        }
211
212        doc_state
213            .peer
214            .store(peer, std::sync::atomic::Ordering::Relaxed);
215        drop(doc_state);
216        self.peer_id_change_subs.emit(&(), next_id);
217        Ok(())
218    }
219
220    /// Renews the PeerID for the document.
221    pub(crate) fn renew_peer_id(&self) {
222        let mut peer_id = DefaultRandom.next_u64();
223        while peer_id == PeerID::MAX {
224            peer_id = DefaultRandom.next_u64();
225        }
226        self.set_peer_id(peer_id).unwrap();
227    }
228
229    /// Implicitly commit the cumulative auto-commit transaction.
230    /// This method only has effect when `auto_commit` is true.
231    ///
232    /// Follow-ups: the caller is responsible for renewing the transaction
233    /// as needed (e.g., via `renew_txn_if_auto_commit`). Prefer using
234    /// `with_barrier(...)` for most internal flows to handle this safely.
235    ///
236    /// Empty-commit behavior: if the pending transaction is empty, the returned
237    /// `Some(CommitOptions)` preserves next-commit options such as message and
238    /// timestamp so they can carry into the renewed transaction. Transient
239    /// labels like `origin` do not carry across an empty commit.
240    #[inline]
241    #[must_use]
242    pub fn implicit_commit_then_stop(
243        &self,
244    ) -> (
245        Option<CommitOptions>,
246        LoroMutexGuard<'_, Option<Transaction>>,
247    ) {
248        // Implicit commit: preserve options on empty commit
249        let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
250        (a, b.unwrap())
251    }
252
253    /// Commit the cumulative auto commit transaction.
254    /// It will start the next one immediately
255    ///
256    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
257    #[inline]
258    pub fn commit_then_renew(&self) -> Option<CommitOptions> {
259        // Explicit commit: swallow options on empty commit
260        self.commit_internal(CommitOptions::new().immediate_renew(true), false)
261            .0
262    }
263
264    /// This method is called before the commit.
265    /// It can be used to modify the change before it is committed.
266    ///
267    /// It return Some(txn) if the txn is None
268    fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
269        let mut txn_guard = self.txn.lock();
270        let Some(txn) = txn_guard.as_mut() else {
271            return Some(txn_guard);
272        };
273
274        if txn.is_peer_first_appearance {
275            txn.is_peer_first_appearance = false;
276            drop(txn_guard);
277            // First commit from a peer
278            self.first_commit_from_peer_subs.emit(
279                &(),
280                FirstCommitFromPeerPayload {
281                    peer: self.peer_id(),
282                },
283            );
284        }
285
286        None
287    }
288
289    /// Core implementation for committing the cumulative auto-commit transaction.
290    ///
291    /// - When `preserve_on_empty` is true (implicit commits like export/checkout),
292    ///   commit options from an empty transaction are carried over to the next transaction
293    ///   (except `origin`, which never carries across an empty commit).
294    /// - When `preserve_on_empty` is false (explicit commits), commit options from an
295    ///   empty transaction are swallowed and NOT carried over.
       ///
       /// `config` supplies an optional origin, timestamp and commit message that are
       /// written onto the pending transaction before it is committed, plus the
       /// `immediate_renew` flag.
       ///
       /// Returns `(options, guard)`:
       /// - `options` is what `Transaction::commit` handed back, after the filtering
       ///   described above; it is `None` on every early return.
       /// - `guard` is the transaction lock guard. It is `Some` when auto-commit is
       ///   off, when there is no pending transaction, or when
       ///   `config.immediate_renew` is false; it is `None` after a commit with
       ///   `immediate_renew` set.
       ///
       /// # Panics
       ///
       /// Panics if `Transaction::commit` returns an error, or if `self.txn()` fails
       /// while renewing.
296    #[instrument(skip_all)]
297    fn commit_internal(
298        &self,
299        config: CommitOptions,
300        preserve_on_empty: bool,
301    ) -> (
302        Option<CommitOptions>,
303        Option<LoroMutexGuard<'_, Option<Transaction>>>,
304    ) {
305        if !self.auto_commit.load(Acquire) {
306            let txn_guard = self.txn.lock();
307            // if not auto_commit, nothing should happen
308            // because the global txn is not used
309            return (None, Some(txn_guard));
310        }
311
           // A loop, because the `on_commit` hook below runs without the txn lock and
           // a new transaction may be present once the lock is taken again.
312        loop {
               // `before_commit` returns a guard only when the txn slot is empty;
               // otherwise its own guard has been released by the time it returns.
313            if let Some(txn_guard) = self.before_commit() {
314                return (None, Some(txn_guard));
315            }
316
               // Lock again and move the transaction out of the slot.
317            let mut txn_guard = self.txn.lock();
318            let txn = txn_guard.take();
319            let Some(mut txn) = txn else {
320                return (None, Some(txn_guard));
321            };
               // The hook is taken out before committing and invoked further down.
322            let on_commit = txn.take_on_commit();
323            if let Some(origin) = config.origin.clone() {
324                txn.set_origin(origin);
325            }
326
327            if let Some(timestamp) = config.timestamp {
328                txn.set_timestamp(timestamp);
329            }
330
331            if let Some(msg) = config.commit_msg.as_ref() {
332                txn.set_msg(Some(msg.clone()));
333            }
334
               // The id span is read before committing and passed to the hook later.
335            let id_span = txn.id_span();
336            let mut options = txn.commit().unwrap();
337            // Empty commit returns Some(options). We may preserve parts of it for implicit commits.
338            if let Some(opts) = options.as_mut() {
339                // `origin` is an event-only label and never carries across an empty commit
                   // NOTE(review): the origin is only cleared here when `config` itself
                   // carried one; confirm that an origin set directly on the transaction
                   // is dropped elsewhere.
340                if config.origin.is_some() {
341                    opts.set_origin(None);
342                }
343                // For explicit commits, swallow options from empty commit entirely
344                if !preserve_on_empty {
345                    options = None;
346                }
347            }
               // Renewal happens while the txn lock is still held; any preserved
               // options are copied onto the new transaction.
348            if config.immediate_renew && self.can_edit() {
349                let mut t = self.txn().unwrap();
350                if let Some(options) = options.as_ref() {
351                    t.set_options(options.clone());
352                }
353                *txn_guard = Some(t);
354            }
355
356            if let Some(on_commit) = on_commit {
                   // Release the txn lock while the hook runs, then take it again.
357                drop(txn_guard);
358                on_commit(&self.state, &self.oplog, id_span);
359                txn_guard = self.txn.lock();
360                if !config.immediate_renew && txn_guard.is_some() {
361                    // make sure that txn_guard is None when config.immediate_renew is false
362                    continue;
363                }
364            }
365
366            return (
367                options,
368                if !config.immediate_renew {
369                    Some(txn_guard)
370                } else {
371                    None
372                },
373            );
374        }
375    }
376
377    /// Commit the cumulative auto commit transaction (explicit API).
378    ///
379    /// This is used by user-facing explicit commits. If the transaction is empty,
380    /// any provided commit options are swallowed and will NOT carry over.
381    #[instrument(skip_all)]
382    pub fn commit_with(
383        &self,
384        config: CommitOptions,
385    ) -> (
386        Option<CommitOptions>,
387        Option<LoroMutexGuard<'_, Option<Transaction>>>,
388    ) {
389        self.commit_internal(config, false)
390    }
391
392    /// Set the commit message of the next commit
393    pub fn set_next_commit_message(&self, message: &str) {
394        let mut binding = self.txn.lock();
395        let Some(txn) = binding.as_mut() else {
396            return;
397        };
398
399        if message.is_empty() {
400            txn.set_msg(None)
401        } else {
402            txn.set_msg(Some(message.into()))
403        }
404    }
405
406    /// Set the origin of the next commit
407    pub fn set_next_commit_origin(&self, origin: &str) {
408        let mut txn = self.txn.lock();
409        if let Some(txn) = txn.as_mut() {
410            txn.set_origin(origin.into());
411        }
412    }
413
414    /// Set the timestamp of the next commit
415    pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
416        let mut txn = self.txn.lock();
417        if let Some(txn) = txn.as_mut() {
418            txn.set_timestamp(timestamp);
419        }
420    }
421
422    /// Set the options of the next commit
423    pub fn set_next_commit_options(&self, options: CommitOptions) {
424        let mut txn = self.txn.lock();
425        if let Some(txn) = txn.as_mut() {
426            txn.set_options(options);
427        }
428    }
429
430    /// Clear the options of the next commit
431    pub fn clear_next_commit_options(&self) {
432        let mut txn = self.txn.lock();
433        if let Some(txn) = txn.as_mut() {
434            txn.set_options(CommitOptions::new());
435        }
436    }
437
438    /// Set whether to record the timestamp of each change. Default is `false`.
439    ///
440    /// If enabled, the Unix timestamp will be recorded for each change automatically.
441    ///
442    /// You can also set each timestamp manually when you commit a change.
443    /// The timestamp manually set will override the automatic one.
444    ///
445    /// NOTE: Timestamps are forced to be in ascending order.
446    /// If you commit a new change with a timestamp that is less than the existing one,
447    /// the largest existing timestamp will be used instead.
448    #[inline]
449    pub fn set_record_timestamp(&self, record: bool) {
450        self.config.set_record_timestamp(record);
451    }
452
453    /// Set the interval of mergeable changes, in seconds.
454    ///
455    /// If two continuous local changes are within the interval, they will be merged into one change.
456    /// The default value is 1000 seconds.
457    #[inline]
458    pub fn set_change_merge_interval(&self, interval: i64) {
459        self.config.set_merge_interval(interval);
460    }
461
462    pub fn can_edit(&self) -> bool {
463        !self.is_detached() || self.config.detached_editing()
464    }
465
466    pub fn is_detached_editing_enabled(&self) -> bool {
467        self.config.detached_editing()
468    }
469
470    #[inline]
471    pub fn config_text_style(&self, text_style: StyleConfigMap) {
472        self.config.text_style_config.write().map = text_style.map;
473    }
474
475    #[inline]
476    pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
477        self.config.text_style_config.write().default_style = text_style;
478    }
479    pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
480        let doc = Self::new();
481        let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
482        if mode.is_snapshot() {
483            doc.with_barrier(|| -> Result<(), LoroError> {
484                decode_snapshot(&doc, mode, body, Default::default())?;
485                Ok(())
486            })?;
487            Ok(doc)
488        } else {
489            Err(LoroError::DecodeError(
490                "Invalid encode mode".to_string().into(),
491            ))
492        }
493    }
494
495    /// Is the document empty? (no ops)
496    #[inline(always)]
497    pub fn can_reset_with_snapshot(&self) -> bool {
498        let oplog = self.oplog.lock();
499        if oplog.batch_importing {
500            return false;
501        }
502
503        if self.is_detached() {
504            return false;
505        }
506
507        oplog.is_empty() && self.state.lock().can_import_snapshot()
508    }
509
510    /// Whether [OpLog] and [DocState] are detached.
511    ///
512    /// If so, the document is in readonly mode by default and importing will not change the state of the document.
513    /// It also doesn't change the version of the [DocState]. The changes will be recorded into [OpLog] only.
514    /// You need to call `checkout` to make it take effect.
515    #[inline(always)]
516    pub fn is_detached(&self) -> bool {
517        self.detached.load(Acquire)
518    }
519
520    pub(crate) fn set_detached(&self, detached: bool) {
521        self.detached.store(detached, Release);
522    }
523
524    #[inline(always)]
525    pub fn peer_id(&self) -> PeerID {
526        self.state
527            .lock()
528            .peer
529            .load(std::sync::atomic::Ordering::Relaxed)
530    }
531
532    #[inline(always)]
533    pub fn detach(&self) {
534        self.with_barrier(|| self.set_detached(true));
535    }
536
537    #[inline(always)]
538    pub fn attach(&self) {
539        self.checkout_to_latest()
540    }
541
542    /// Get the timestamp of the current state.
543    /// It's the last edit time of the [DocState].
544    pub fn state_timestamp(&self) -> Timestamp {
545        // Acquire locks in correct order: read frontiers first, then query OpLog.
546        let f = { self.state.lock().frontiers.clone() };
547        self.oplog.lock().get_timestamp_of_version(&f)
548    }
549
550    #[inline(always)]
551    pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
552        &self.state
553    }
554
555    #[inline]
556    pub fn get_state_deep_value(&self) -> LoroValue {
557        self.state.lock().get_deep_value()
558    }
559
560    #[inline(always)]
561    pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
562        &self.oplog
563    }
564
565    #[inline(always)]
566    pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
567        let s = debug_span!("import", peer = self.peer_id());
568        let _e = s.enter();
569        self.import_with(bytes, Default::default())
570    }
571
572    #[inline]
573    pub fn import_with(
574        &self,
575        bytes: &[u8],
576        origin: InternalString,
577    ) -> Result<ImportStatus, LoroError> {
578        self.with_barrier(|| self._import_with(bytes, origin))
579    }
580
581    #[tracing::instrument(skip_all)]
       /// Parses `bytes` and dispatches the import according to the encode mode.
       ///
       /// - `OutdatedRle`: refused with `LoroError::ImportWhenInTxn` while the state
       ///   is in a transaction; otherwise decoded through
       ///   `update_oplog_and_apply_delta_to_state_if_needed`.
       /// - `OutdatedSnapshot` / `FastSnapshot`: decoded as a snapshot when
       ///   `can_reset_with_snapshot` allows it; otherwise imported as updates.
       /// - `FastUpdates`: decoded into changes and imported through
       ///   `import_changes_and_apply_delta_to_state_if_needed`.
       ///
       /// Pending events are emitted after the dispatch, whether it succeeded or
       /// not. The two early returns (header parse failure and `ImportWhenInTxn`)
       /// skip that step.
582    fn _import_with(
583        &self,
584        bytes: &[u8],
585        origin: InternalString,
586    ) -> Result<ImportStatus, LoroError> {
587        ensure_cov::notify_cov("loro_internal::import");
588        let parsed = parse_header_and_body(bytes, true)?;
589        loro_common::info!("Importing with mode={:?}", &parsed.mode);
590        let result = match parsed.mode {
591            EncodeMode::OutdatedRle => {
592                if self.state.lock().is_in_txn() {
593                    return Err(LoroError::ImportWhenInTxn);
594                }
595
596                let s = tracing::span!(
597                    tracing::Level::INFO,
598                    "Import updates ",
599                    peer = self.peer_id()
600                );
601                let _e = s.enter();
602                self.update_oplog_and_apply_delta_to_state_if_needed(
603                    |oplog| oplog.decode(parsed),
604                    origin,
605                )
606            }
607            EncodeMode::OutdatedSnapshot => {
608                if self.can_reset_with_snapshot() {
609                    loro_common::info!("Init by snapshot {}", self.peer_id());
610                    decode_snapshot(self, parsed.mode, parsed.body, origin)
611                } else {
                       // The snapshot cannot replace the document, so it is decoded into
                       // the oplog like an update.
612                    self.update_oplog_and_apply_delta_to_state_if_needed(
613                        |oplog| oplog.decode(parsed),
614                        origin,
615                    )
616                }
617            }
618            EncodeMode::FastSnapshot => {
619                if self.can_reset_with_snapshot() {
620                    ensure_cov::notify_cov("loro_internal::import::snapshot");
621                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
622                    decode_snapshot(self, parsed.mode, parsed.body, origin)
623                } else {
                       // Same fallback as above, through the change-based import path.
624                    self.import_changes_and_apply_delta_to_state_if_needed(
625                        |oplog| encoding::decode_oplog_changes(oplog, parsed),
626                        origin,
627                    )
628
                       // Earlier alternative, kept for reference: import into a temporary
                       // doc and re-export only the missing updates.
629                    // let new_doc = LoroDoc::new();
630                    // new_doc.import(bytes)?;
631                    // let updates = new_doc.export(ExportMode::updates(&self.oplog_vv())).unwrap();
632                    // return self.import_with(updates.as_slice(), origin);
633                }
634            }
635            EncodeMode::FastUpdates => self.import_changes_and_apply_delta_to_state_if_needed(
636                |oplog| encoding::decode_oplog_changes(oplog, parsed),
637                origin,
638            ),
               // NOTE(review): presumably header parsing never yields `Auto`; confirm
               // against `parse_header_and_body`.
639            EncodeMode::Auto => {
640                unreachable!()
641            }
642        };
643
644        self.emit_events();
645
646        result
647    }
648
649    #[tracing::instrument(skip_all)]
       /// Runs `f` against the locked oplog inside an import-rollback scope and,
       /// when the document is attached, applies the resulting version change to
       /// the state.
       ///
       /// Attached: the version vector and frontiers are captured, `f` runs, and if
       /// the version vector moved, a diff from the old to the new version is
       /// calculated and applied to the state with `origin` as an import event.
       /// Detached: only the oplog is touched.
       ///
       /// # Errors
       ///
       /// - The error of `state.apply_diff`, after rolling the import back. In this
       ///   case the result of `f` is discarded.
       /// - The error returned by `f`. When attached, the import is rolled back only
       ///   if the version vector did not move; otherwise the progress is kept.
       ///   When detached, the import is always rolled back.
650    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
651        &self,
652        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
653        origin: InternalString,
654    ) -> Result<ImportStatus, LoroError> {
655        let mut oplog = self.oplog.lock();
656        oplog.begin_import_rollback();
657        if !self.is_detached() {
               // Remember where the oplog stood so that progress can be detected.
658            let old_vv = oplog.vv().clone();
659            let old_frontiers = oplog.frontiers().clone();
660            let result = f(&mut oplog);
               // The diff is applied even if `f` failed, as long as the oplog advanced.
661            if &old_vv != oplog.vv() {
662                let mut diff = DiffCalculator::new(false);
663                let (diff, diff_mode) = diff.calc_diff_internal(
664                    &oplog,
665                    &old_vv,
666                    &old_frontiers,
667                    oplog.vv(),
668                    oplog.dag.get_frontiers(),
669                    None,
670                );
                   // The state lock is taken while the oplog lock is still held.
671                let mut state = self.state.lock();
672                if let Err(e) = state.apply_diff(
673                    InternalDocDiff {
674                        origin,
675                        diff: (diff).into(),
676                        by: EventTriggerKind::Import,
677                        new_version: Cow::Owned(oplog.frontiers().clone()),
678                    },
679                    diff_mode,
680                ) {
681                    oplog.rollback_import();
682                    return Err(e);
683                }
684            }
685            match result {
686                Ok(result) => {
687                    oplog.commit_import_rollback();
688                    Ok(result)
689                }
690                Err(e) => {
691                    // Some import errors are reported after a valid prefix has
692                    // been inserted into the oplog. Keep that prefix if it was
693                    // also applied to state; rollback is for failed state apply
694                    // or decode errors that made no visible oplog progress.
695                    if &old_vv == oplog.vv() {
696                        oplog.rollback_import();
697                    } else {
698                        oplog.commit_import_rollback();
699                    }
700                    Err(e)
701                }
702            }
703        } else {
               // Detached: the state is not updated, so the outcome of `f` alone
               // decides between commit and rollback.
704            match f(&mut oplog) {
705                Ok(result) => {
706                    oplog.commit_import_rollback();
707                    Ok(result)
708                }
709                Err(e) => {
710                    oplog.rollback_import();
711                    Err(e)
712                }
713            }
714        }
715    }
716
717    #[tracing::instrument(skip_all)]
       /// Decodes changes with `decode_changes`, records them in the oplog and, when
       /// the document is attached and the changes apply to the DAG, applies the
       /// resulting diff to the state.
       ///
       /// Flow:
       /// 1. An arena checkpoint is taken; a decode failure rolls the arena back.
       /// 2. A preflight check rejects changes that depend on versions before the
       ///    shallow root when the document is detached or the changes would not
       ///    apply to the DAG.
       /// 3. Detached: changes go into the oplog only.
       /// 4. Attached but not applicable to the DAG: changes go into the oplog, and
       ///    the containers returned by `pending_root_containers_to_materialize`
       ///    are ensured in the state.
       /// 5. Otherwise: changes go into the oplog and the diff between the old and
       ///    the new version is applied to the state as an import event with
       ///    `origin`.
       ///
       /// # Errors
       ///
       /// - The error returned by `decode_changes`.
       /// - `LoroError::ImportUpdatesThatDependsOnOutdatedVersion` when the changes
       ///   depend on versions before the shallow root.
       /// - The error of `state.apply_diff` when the rollback guard is active.
       ///
       /// # Panics
       ///
       /// Panics if `state.apply_diff` fails while no rollback guard is active.
718    pub(crate) fn import_changes_and_apply_delta_to_state_if_needed(
719        &self,
720        decode_changes: impl FnOnce(&mut OpLog) -> Result<Vec<Change>, LoroError>,
721        origin: InternalString,
722    ) -> Result<ImportStatus, LoroError> {
723        let mut oplog = self.oplog.lock();
724        let arena_checkpoint = oplog.arena.checkpoint_for_rollback();
725        let changes = match decode_changes(&mut oplog) {
726            Ok(changes) => changes,
727            Err(e) => {
728                oplog.arena.rollback(arena_checkpoint);
729                return Err(e);
730            }
731        };
732
           // Reject up front, before the oplog is modified.
733        let preflight = oplog.preflight_import_changes(&changes);
734        if preflight.has_deps_before_shallow_root
735            && (self.is_detached() || !preflight.applies_to_dag)
736        {
737            oplog.arena.rollback(arena_checkpoint);
738            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
739        }
740
741        if self.is_detached() {
742            let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
               // NOTE(review): unlike the branch below, this error path does not roll
               // the arena back. It looks unreachable when the preflight flag agrees
               // with `result`, since that case already returned above; confirm.
743            if result.has_deps_before_shallow_root {
744                return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
745            }
746
747            return Ok(result.status);
748        }
749
750        if !preflight.applies_to_dag {
               // Collected before `changes` is moved into the oplog.
751            let pending_root_containers = pending_root_containers_to_materialize(&oplog, &changes);
752            let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
753            if result.has_deps_before_shallow_root {
754                oplog.arena.rollback(arena_checkpoint);
755                return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
756            }
757
758            if !pending_root_containers.is_empty() {
759                let mut state = self.state.lock();
760                for id in pending_root_containers {
761                    state.ensure_container(&id);
762                }
763            }
764
765            return Ok(result.status);
766        }
767
           // Remember where the oplog stood so that progress can be detected.
768        let old_vv = oplog.vv().clone();
769        let old_frontiers = oplog.frontiers().clone();
           // The rollback scope is only opened when the preflight asks for it.
770        let rollback_enabled = preflight.needs_state_apply_rollback;
771        if rollback_enabled {
772            oplog.begin_import_rollback_with_arena(arena_checkpoint);
773        }
774
775        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
776        if &old_vv != oplog.vv() {
777            let mut diff = DiffCalculator::new(false);
778            let (diff, diff_mode) = diff.calc_diff_internal(
779                &oplog,
780                &old_vv,
781                &old_frontiers,
782                oplog.vv(),
783                oplog.dag.get_frontiers(),
784                None,
785            );
               // The state lock is taken while the oplog lock is still held.
786            let mut state = self.state.lock();
787            if let Err(e) = state.apply_diff(
788                InternalDocDiff {
789                    origin,
790                    diff: (diff).into(),
791                    by: EventTriggerKind::Import,
792                    new_version: Cow::Owned(oplog.frontiers().clone()),
793                },
794                diff_mode,
795            ) {
796                if rollback_enabled {
797                    oplog.rollback_import();
798                    return Err(e);
799                }
800
                   // Without a rollback scope there is no way to undo the oplog changes.
801                panic!("state apply returned Err for import without rollback guard: {e}");
802            }
803        }
804
           // The error is still reported here, but what was imported is kept: the
           // rollback scope is committed, not rolled back.
805        if result.has_deps_before_shallow_root {
806            if rollback_enabled {
807                oplog.commit_import_rollback();
808            }
809            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
810        }
811
812        if rollback_enabled {
813            oplog.commit_import_rollback();
814        }
815        Ok(result.status)
816    }
817
818    fn emit_events(&self) {
819        // we should not hold the lock when emitting events
820        let events = {
821            let mut state = self.state.lock();
822            state.take_events()
823        };
824        for event in events {
825            self.observer.emit(event);
826        }
827    }
828
829    pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
830        let mut state = self.state.lock();
831        state.take_events()
832    }
833
834    /// Import the json schema updates.
835    ///
836    /// only supports backward compatibility but not forward compatibility.
837    #[tracing::instrument(skip_all)]
838    pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
839        let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
840        self.with_barrier(|| {
841            let result = self.import_changes_and_apply_delta_to_state_if_needed(
842                |oplog| crate::encoding::json_schema::decode_json_changes(json, &oplog.arena),
843                Default::default(),
844            );
845            self.emit_events();
846            result
847        })
848    }
849
850    pub fn export_json_updates(
851        &self,
852        start_vv: &VersionVector,
853        end_vv: &VersionVector,
854        with_peer_compression: bool,
855    ) -> JsonSchema {
856        self.with_barrier(|| {
857            let oplog = self.oplog.lock();
858            let mut start_vv = start_vv;
859            let _temp: Option<VersionVector>;
860            if !oplog.dag.shallow_since_vv().is_empty() {
861                // Make sure that start_vv >= shallow_since_vv
862                let mut include_all = true;
863                for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
864                    if start_vv.get(peer).unwrap_or(&0) < counter {
865                        include_all = false;
866                        break;
867                    }
868                }
869                if !include_all {
870                    let mut vv = start_vv.clone();
871                    for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
872                        vv.extend_to_include_end_id(ID::new(peer, counter));
873                    }
874                    _temp = Some(vv);
875                    start_vv = _temp.as_ref().unwrap();
876                }
877            }
878
879            crate::encoding::json_schema::export_json(
880                &oplog,
881                start_vv,
882                end_vv,
883                with_peer_compression,
884            )
885        })
886    }
887
888    pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
889        let oplog = self.oplog.lock();
890        let mut changes = export_json_in_id_span(&oplog, id_span);
891        if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
892            let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
893            changes.push(change_json);
894        }
895        changes
896    }
897
898    /// Get the version vector of the current OpLog
899    #[inline]
900    pub fn oplog_vv(&self) -> VersionVector {
901        self.oplog.lock().vv().clone()
902    }
903
904    /// Get the version vector of the current [DocState]
905    #[inline]
906    pub fn state_vv(&self) -> VersionVector {
907        let oplog = self.oplog.lock();
908        let f = &self.state.lock().frontiers;
909        oplog.dag.frontiers_to_vv(f).unwrap()
910    }
911
912    pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
913        let value: LoroValue = self.state.lock().get_value_by_path(path)?;
914        if let LoroValue::Container(c) = value {
915            Some(ValueOrHandler::Handler(Handler::new_attached(
916                c.clone(),
917                self.clone(),
918            )))
919        } else {
920            Some(ValueOrHandler::Value(value))
921        }
922    }
923
924    /// Get the handler by the string path.
925    pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
926        let path = str_to_path(path)?;
927        self.get_by_path(&path)
928    }
929
930    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
931        let arena = &self.arena;
932        let txn = self.txn.lock();
933        let txn = txn.as_ref()?;
934        let ops_ = txn.local_ops();
935        let new_id = ID {
936            peer: *txn.peer(),
937            counter: ops_.first()?.counter,
938        };
939        let change = ChangeRef {
940            id: &new_id,
941            deps: txn.frontiers(),
942            timestamp: &txn
943                .timestamp()
944                .as_ref()
945                .copied()
946                .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
947            commit_msg: txn.msg(),
948            ops: ops_,
949            lamport: txn.lamport(),
950        };
951        let json = encode_change_to_json(change, arena);
952        Some(json)
953    }
954
955    #[inline]
956    pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
957        if self.has_container(&id) {
958            self.ensure_root_container(&id);
959            Some(Handler::new_attached(id, self.clone()))
960        } else {
961            None
962        }
963    }
964
965    #[inline]
966    fn ensure_root_container(&self, id: &ContainerID) {
967        if id.is_root() {
968            self.state.lock().ensure_container(id);
969        }
970    }
971
972    /// id can be a str, ContainerID, or ContainerIdRaw.
973    /// if it's str it will use Root container, which will not be None
974    #[inline]
975    pub fn try_get_text<I: IntoContainerId>(&self, id: I) -> Option<TextHandler> {
976        let id = id.into_container_id(&self.arena, ContainerType::Text);
977        if !self.has_container(&id) {
978            return None;
979        }
980        self.ensure_root_container(&id);
981        Handler::new_attached(id, self.clone()).into_text().ok()
982    }
983
984    /// id can be a str, ContainerID, or ContainerIdRaw.
985    /// if it's str it will use Root container, which will not be None
986    #[inline]
987    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
988        self.try_get_text(id)
989            .expect("The container does not exist in the document. Use `try_get_text` or `get_container` to check for existence.")
990    }
991
992    /// id can be a str, ContainerID, or ContainerIdRaw.
993    /// if it's str it will use Root container, which will not be None
994    #[inline]
995    pub fn try_get_list<I: IntoContainerId>(&self, id: I) -> Option<ListHandler> {
996        let id = id.into_container_id(&self.arena, ContainerType::List);
997        if !self.has_container(&id) {
998            return None;
999        }
1000        self.ensure_root_container(&id);
1001        Handler::new_attached(id, self.clone()).into_list().ok()
1002    }
1003
1004    /// id can be a str, ContainerID, or ContainerIdRaw.
1005    /// if it's str it will use Root container, which will not be None
1006    #[inline]
1007    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
1008        self.try_get_list(id)
1009            .expect("The container does not exist in the document. Use `try_get_list` or `get_container` to check for existence.")
1010    }
1011
1012    /// id can be a str, ContainerID, or ContainerIdRaw.
1013    /// if it's str it will use Root container, which will not be None
1014    #[inline]
1015    pub fn try_get_movable_list<I: IntoContainerId>(&self, id: I) -> Option<MovableListHandler> {
1016        let id = id.into_container_id(&self.arena, ContainerType::MovableList);
1017        if !self.has_container(&id) {
1018            return None;
1019        }
1020        self.ensure_root_container(&id);
1021        Handler::new_attached(id, self.clone())
1022            .into_movable_list()
1023            .ok()
1024    }
1025
1026    /// id can be a str, ContainerID, or ContainerIdRaw.
1027    /// if it's str it will use Root container, which will not be None
1028    #[inline]
1029    pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
1030        self.try_get_movable_list(id)
1031            .expect("The container does not exist in the document. Use `try_get_movable_list` or `get_container` to check for existence.")
1032    }
1033
1034    /// id can be a str, ContainerID, or ContainerIdRaw.
1035    /// if it's str it will use Root container, which will not be None
1036    #[inline]
1037    pub fn try_get_map<I: IntoContainerId>(&self, id: I) -> Option<MapHandler> {
1038        let id = id.into_container_id(&self.arena, ContainerType::Map);
1039        if !self.has_container(&id) {
1040            return None;
1041        }
1042        self.ensure_root_container(&id);
1043        Handler::new_attached(id, self.clone()).into_map().ok()
1044    }
1045
1046    /// id can be a str, ContainerID, or ContainerIdRaw.
1047    /// if it's str it will use Root container, which will not be None
1048    #[inline]
1049    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
1050        self.try_get_map(id)
1051            .expect("The container does not exist in the document. Use `try_get_map` or `get_container` to check for existence.")
1052    }
1053
1054    /// id can be a str, ContainerID, or ContainerIdRaw.
1055    /// if it's str it will use Root container, which will not be None
1056    #[inline]
1057    pub fn try_get_tree<I: IntoContainerId>(&self, id: I) -> Option<TreeHandler> {
1058        let id = id.into_container_id(&self.arena, ContainerType::Tree);
1059        if !self.has_container(&id) {
1060            return None;
1061        }
1062        self.ensure_root_container(&id);
1063        Handler::new_attached(id, self.clone()).into_tree().ok()
1064    }
1065
1066    /// id can be a str, ContainerID, or ContainerIdRaw.
1067    /// if it's str it will use Root container, which will not be None
1068    #[inline]
1069    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
1070        self.try_get_tree(id)
1071            .expect("The container does not exist in the document. Use `try_get_tree` or `get_container` to check for existence.")
1072    }
1073
1074    #[cfg(feature = "counter")]
1075    pub fn try_get_counter<I: IntoContainerId>(
1076        &self,
1077        id: I,
1078    ) -> Option<crate::handler::counter::CounterHandler> {
1079        let id = id.into_container_id(&self.arena, ContainerType::Counter);
1080        if !self.has_container(&id) {
1081            return None;
1082        }
1083        self.ensure_root_container(&id);
1084        Handler::new_attached(id, self.clone()).into_counter().ok()
1085    }
1086
1087    #[cfg(feature = "counter")]
1088    pub fn get_counter<I: IntoContainerId>(
1089        &self,
1090        id: I,
1091    ) -> crate::handler::counter::CounterHandler {
1092        self.try_get_counter(id)
1093            .expect("The container does not exist in the document. Use `try_get_counter` or `get_container` to check for existence.")
1094    }
1095
1096    #[must_use]
1097    pub fn has_container(&self, id: &ContainerID) -> bool {
1098        if id.is_root() {
1099            return true;
1100        }
1101
1102        let exist = self.state.lock().does_container_exist(id);
1103        exist
1104    }
1105
1106    /// Undo the operations between the given id_span. It can be used even in a collaborative environment.
1107    ///
1108    /// This is an internal API. You should NOT use it directly.
1109    ///
1110    /// # Internal
1111    ///
1112    /// This method will use the diff calculator to calculate the diff required to time travel
1113    /// from the end of id_span to the beginning of the id_span. Then it will convert the diff to
1114    /// operations and apply them to the OpLog with a dep on the last id of the given id_span.
1115    ///
1116    /// This implementation is kinda slow, but it's simple and maintainable. We can optimize it
1117    /// further when it's needed. The time complexity is O(n + m), n is the ops in the id_span, m is the
1118    /// distance from id_span to the current latest version.
1119    #[instrument(level = "info", skip_all)]
1120    pub fn undo_internal(
1121        &self,
1122        id_span: IdSpan,
1123        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1124        post_transform_base: Option<&DiffBatch>,
1125        before_diff: &mut dyn FnMut(&DiffBatch),
1126    ) -> LoroResult<CommitWhenDrop<'_>> {
1127        if !self.can_edit() {
1128            return Err(LoroError::EditWhenDetached);
1129        }
1130
1131        let (options, txn) = self.implicit_commit_then_stop();
1132        if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
1133            self.renew_txn_if_auto_commit(options);
1134            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
1135        }
1136
1137        let (was_recording, latest_frontiers) = {
1138            let mut state = self.state.lock();
1139            let was_recording = state.is_recording();
1140            state.stop_and_clear_recording();
1141            (was_recording, state.frontiers.clone())
1142        };
1143
1144        let spans = self.oplog.lock().split_span_based_on_deps(id_span);
1145        let diff = crate::undo::undo(
1146            spans,
1147            match post_transform_base {
1148                Some(d) => Either::Right(d),
1149                None => Either::Left(&latest_frontiers),
1150            },
1151            |from, to| {
1152                self._checkout_without_emitting(from, false, false).unwrap();
1153                self.state.lock().start_recording();
1154                self._checkout_without_emitting(to, false, false).unwrap();
1155                let mut state = self.state.lock();
1156                let e = state.take_events();
1157                state.stop_and_clear_recording();
1158                DiffBatch::new(e)
1159            },
1160            before_diff,
1161        );
1162
1163        // println!("\nundo_internal: diff: {:?}", diff);
1164        // println!("container remap: {:?}", container_remap);
1165
1166        self._checkout_without_emitting(&latest_frontiers, false, false)?;
1167        self.set_detached(false);
1168        if was_recording {
1169            self.state.lock().start_recording();
1170        }
1171        drop(txn);
1172        self.start_auto_commit();
1173        // Try applying the diff, but ignore the error if it happens.
1174        // MovableList's undo behavior is too tricky to handle in a collaborative env
1175        // so in edge cases this may be an Error
1176        if let Err(e) = self._apply_diff(diff, container_remap, true) {
1177            warn!("Undo Failed {:?}", e);
1178        }
1179
1180        if let Some(options) = options {
1181            self.set_next_commit_options(options);
1182        }
1183        Ok(CommitWhenDrop {
1184            doc: self,
1185            default_options: CommitOptions::new().origin("undo"),
1186        })
1187    }
1188
1189    /// Generate a series of local operations that can revert the current doc to the target
1190    /// version.
1191    ///
1192    /// Internally, it will calculate the diff between the current state and the target state,
1193    /// and apply the diff to the current state.
1194    pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
1195        // TODO: test when the doc is readonly
1196        // TODO: test when the doc is detached but enabled editing
1197        let f = self.state_frontiers();
1198        let diff = self.diff(&f, target)?;
1199        self._apply_diff(diff, &mut Default::default(), false)
1200    }
1201
1202    /// Calculate the diff between two versions so that apply diff on a will make the state same as b.
1203    ///
1204    /// NOTE: This method will make the doc enter the **detached mode**.
1205    // FIXME: This method needs testing (no event should be emitted during processing this)
1206    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1207        {
1208            // Check whether a and b are valid before checkout so this returns a normal error
1209            // instead of panicking on shallow docs.
1210            let oplog = self.oplog.lock();
1211            let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
1212                for id in frontiers.iter() {
1213                    if !oplog.dag.contains(id) {
1214                        return Err(LoroError::FrontiersNotFound(id));
1215                    }
1216                }
1217
1218                if oplog.dag.is_before_shallow_root(frontiers) {
1219                    return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1220                }
1221
1222                Ok(())
1223            };
1224
1225            validate_frontiers(a)?;
1226            validate_frontiers(b)?;
1227        }
1228
1229        let (options, txn) = self.implicit_commit_then_stop();
1230        let was_detached = self.is_detached();
1231        let old_frontiers = self.state_frontiers();
1232        let was_recording = {
1233            let mut state = self.state.lock();
1234            let is_recording = state.is_recording();
1235            state.stop_and_clear_recording();
1236            is_recording
1237        };
1238        let result = (|| {
1239            self._checkout_without_emitting(a, true, false)?;
1240            self.state.lock().start_recording();
1241            self._checkout_without_emitting(b, true, false)?;
1242            let mut state = self.state.lock();
1243            let e = state.take_events();
1244            state.stop_and_clear_recording();
1245            Ok::<_, LoroError>(e)
1246        })();
1247
1248        // Always restore state regardless of whether diff calculation succeeded
1249        self._checkout_without_emitting(&old_frontiers, false, false)
1250            .unwrap();
1251        drop(txn);
1252        if !was_detached {
1253            self.set_detached(false);
1254            self.renew_txn_if_auto_commit(options);
1255        }
1256        if was_recording {
1257            self.state.lock().start_recording();
1258        }
1259        result.map(DiffBatch::new)
1260    }
1261
1262    /// Apply a diff to the current state.
1263    #[inline(always)]
1264    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1265        self._apply_diff(diff, &mut Default::default(), true)
1266    }
1267
1268    /// Apply a diff to the current state.
1269    ///
1270    /// This method will not recreate containers with the same [ContainerID]s.
1271    /// While this can be convenient in certain cases, it can break several internal invariants:
1272    ///
1273    /// 1. Each container should appear only once in the document. Allowing containers with the same ID
1274    ///    would result in multiple instances of the same container in the document.
1275    /// 2. Unreachable containers should be removable from the state when necessary.
1276    ///
1277    /// However, the diff may contain operations that depend on container IDs.
1278    /// Therefore, users need to provide a `container_remap` to record and retrieve the container ID remapping.
1279    pub(crate) fn _apply_diff(
1280        &self,
1281        diff: DiffBatch,
1282        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1283        skip_unreachable: bool,
1284    ) -> LoroResult<()> {
1285        if !self.can_edit() {
1286            return Err(LoroError::EditWhenDetached);
1287        }
1288
1289        let mut ans: LoroResult<()> = Ok(());
1290        let mut missing_containers: Vec<ContainerID> = Vec::new();
1291        for (mut id, diff) in diff.into_iter() {
1292            let mut remapped = false;
1293            while let Some(rid) = container_remap.get(&id) {
1294                remapped = true;
1295                id = rid.clone();
1296            }
1297
1298            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1299                // Not in arena does not imply non-existent; consult state/kv and register lazily
1300                let exists = self.state.lock().does_container_exist(&id);
1301                if !exists {
1302                    missing_containers.push(id);
1303                    continue;
1304                }
1305                // Ensure registration so handlers can be created
1306                self.state.lock().ensure_container(&id);
1307            }
1308
1309            if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
1310                continue;
1311            }
1312
1313            let Some(h) = self.get_handler(id.clone()) else {
1314                return Err(LoroError::ContainersNotFound {
1315                    containers: Box::new(vec![id]),
1316                });
1317            };
1318            if let Err(e) = h.apply_diff(diff, container_remap) {
1319                ans = Err(e);
1320            }
1321        }
1322
1323        if !missing_containers.is_empty() {
1324            return Err(LoroError::ContainersNotFound {
1325                containers: Box::new(missing_containers),
1326            });
1327        }
1328
1329        ans
1330    }
1331
1332    /// This is for debugging purpose. It will travel the whole oplog
1333    #[inline]
1334    pub fn diagnose_size(&self) {
1335        self.oplog().lock().diagnose_size();
1336    }
1337
1338    #[inline]
1339    pub fn oplog_frontiers(&self) -> Frontiers {
1340        self.oplog().lock().frontiers().clone()
1341    }
1342
1343    #[inline]
1344    pub fn state_frontiers(&self) -> Frontiers {
1345        self.state.lock().frontiers.clone()
1346    }
1347
1348    /// - Ordering::Less means self is less than target or parallel
1349    /// - Ordering::Equal means versions equal
1350    /// - Ordering::Greater means self's version is greater than target
1351    #[inline]
1352    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1353        self.oplog().lock().cmp_with_frontiers(other)
1354    }
1355
1356    /// Compare two [Frontiers] causally.
1357    ///
1358    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
1359    #[inline]
1360    pub fn cmp_frontiers(
1361        &self,
1362        a: &Frontiers,
1363        b: &Frontiers,
1364    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1365        self.oplog().lock().cmp_frontiers(a, b)
1366    }
1367
1368    pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1369        let mut state = self.state.lock();
1370        if !state.is_recording() {
1371            state.start_recording();
1372        }
1373
1374        self.observer.subscribe_root(callback)
1375    }
1376
1377    pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1378        let mut state = self.state.lock();
1379        if !state.is_recording() {
1380            state.start_recording();
1381        }
1382
1383        self.observer.subscribe(container_id, callback)
1384    }
1385
1386    pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1387        let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1388        activate();
1389        sub
1390    }
1391
1392    // PERF: opt
1393    #[tracing::instrument(skip_all)]
1394    pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1395        if bytes.is_empty() {
1396            return Ok(ImportStatus::default());
1397        }
1398
1399        if bytes.len() == 1 {
1400            return self.import(&bytes[0]);
1401        }
1402
1403        let mut success = VersionRange::default();
1404        let mut meta_arr = bytes
1405            .iter()
1406            .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1407            .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1408        meta_arr.sort_by(|a, b| {
1409            a.0.mode
1410                .cmp(&b.0.mode)
1411                .then(b.0.change_num.cmp(&a.0.change_num))
1412        });
1413
1414        let (options, txn) = self.implicit_commit_then_stop();
1415        // Why we should keep locking `txn` here
1416        //
1417        // In a multi-threaded environment, `import_batch` used to drop the txn lock
1418        // (via `commit_then_stop` + `drop(txn)`) and call `detach()`/`checkout_to_latest()`
1419        // around the batch import. That created a race where another thread could
1420        // start or renew the auto-commit txn and perform local edits while we were
1421        // importing and temporarily detached. Those interleaved local edits could
1422        // violate invariants between `OpLog` and `DocState` (e.g., state being
1423        // updated when we expect it not to, missed events, or inconsistent
1424        // frontiers), as exposed by the loom test `local_edits_during_batch_import`.
1425        //
1426        // The fix is to hold the txn mutex for the entire critical section:
1427        // - Stop the current txn and keep the mutex guard.
1428        // - Force-detach with `set_detached(true)` (avoids `detach()` side effects),
1429        //   then run each `_import_with(...)` while detached so imports only touch
1430        //   the `OpLog`.
1431        // - After importing, reattach by checking out to latest and renew the txn
1432        //   using `_checkout_to_latest_with_guard`, which keeps the mutex held while
1433        //   (re)starting the auto-commit txn.
1434        //
1435        // Holding the lock ensures no concurrent thread can create/renew a txn and
1436        // do local edits in the middle of the batch import, making the whole
1437        // operation atomic with respect to local edits.
1438        let is_detached = self.is_detached();
1439        self.set_detached(true);
1440        self.oplog.lock().batch_importing = true;
1441        let mut err = None;
1442        for (_meta, data) in meta_arr {
1443            match self._import_with(data, Default::default()) {
1444                Ok(s) => {
1445                    for (peer, (start, end)) in s.success.iter() {
1446                        match success.0.entry(*peer) {
1447                            Entry::Occupied(mut e) => {
1448                                e.get_mut().1 = *end.max(&e.get().1);
1449                            }
1450                            Entry::Vacant(e) => {
1451                                e.insert((*start, *end));
1452                            }
1453                        }
1454                    }
1455                }
1456                Err(e) => {
1457                    err = Some(e);
1458                }
1459            }
1460        }
1461
1462        let mut oplog = self.oplog.lock();
1463        oplog.batch_importing = false;
1464        let pending = oplog.pending_changes.version_range();
1465        drop(oplog);
1466        if !is_detached {
1467            self._checkout_to_latest_with_guard(txn);
1468        } else {
1469            drop(txn);
1470        }
1471
1472        self.renew_txn_if_auto_commit(options);
1473        if let Some(err) = err {
1474            return Err(err);
1475        }
1476
1477        Ok(ImportStatus {
1478            success,
1479            pending: if pending.is_empty() {
1480                None
1481            } else {
1482                Some(pending)
1483            },
1484        })
1485    }
1486
1487    /// Get shallow value of the document.
1488    #[inline]
1489    pub fn get_value(&self) -> LoroValue {
1490        self.state.lock().get_value()
1491    }
1492
1493    /// Get deep value of the document.
1494    #[inline]
1495    pub fn get_deep_value(&self) -> LoroValue {
1496        self.state.lock().get_deep_value()
1497    }
1498
1499    /// Get deep value of the document with container id
1500    #[inline]
1501    pub fn get_deep_value_with_id(&self) -> LoroValue {
1502        self.state.lock().get_deep_value_with_id()
1503    }
1504
1505    pub fn checkout_to_latest(&self) {
1506        let (options, _guard) = self.implicit_commit_then_stop();
1507        if !self.is_detached() {
1508            drop(_guard);
1509            self.renew_txn_if_auto_commit(options);
1510            return;
1511        }
1512
1513        self._checkout_to_latest_without_commit(true)
1514            .expect("checkout to oplog frontiers should succeed");
1515        self.emit_events();
1516        drop(_guard);
1517        self.renew_txn_if_auto_commit(options);
1518    }
1519
1520    fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1521        if !self.is_detached() {
1522            self._renew_txn_if_auto_commit_with_guard(None, guard);
1523            return;
1524        }
1525
1526        self._checkout_to_latest_without_commit(true)
1527            .expect("checkout to oplog frontiers should succeed");
1528        self._renew_txn_if_auto_commit_with_guard(None, guard);
1529    }
1530
1531    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1532    pub(crate) fn _checkout_to_latest_without_commit(
1533        &self,
1534        to_commit_then_renew: bool,
1535    ) -> LoroResult<()> {
1536        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1537            let f = self.oplog_frontiers();
1538            let this = &self;
1539            let frontiers = &f;
1540            this._checkout_without_emitting(frontiers, false, to_commit_then_renew)?;
1541            // We don't need to shrink frontiers because oplog's frontiers are already shrinked.
1542            this.emit_events();
1543            if this.config.detached_editing() {
1544                this.renew_peer_id();
1545            }
1546
1547            self.set_detached(false);
1548            Ok(())
1549        })
1550    }
1551
1552    /// Checkout [DocState] to a specific version.
1553    ///
1554    /// This will make the current [DocState] detached from the latest version of [OpLog].
1555    /// Any further import will not be reflected on the [DocState], until user call [LoroDoc::attach()]
1556    pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1557        let was_detached = self.is_detached();
1558        let (options, guard) = self.implicit_commit_then_stop();
1559        let result = self._checkout_without_emitting(frontiers, true, true);
1560        if result.is_ok() {
1561            self.emit_events();
1562        }
1563        drop(guard);
1564        if self.config.detached_editing() {
1565            if result.is_ok() {
1566                self.renew_peer_id();
1567            }
1568            self.renew_txn_if_auto_commit(options);
1569        } else if result.is_err() {
1570            if !was_detached {
1571                self.renew_txn_if_auto_commit(options);
1572            }
1573        } else if !self.is_detached() {
1574            self.renew_txn_if_auto_commit(options);
1575        }
1576
1577        result
1578    }
1579
1580    /// NOTE: The caller of this method should ensure the txn is locked and set to None
1581    #[instrument(level = "info", skip(self))]
1582    pub(crate) fn _checkout_without_emitting(
1583        &self,
1584        frontiers: &Frontiers,
1585        to_shrink_frontiers: bool,
1586        to_commit_then_renew: bool,
1587    ) -> Result<(), LoroError> {
1588        if !self.txn.is_locked() {
1589            return Err(LoroError::TransactionError(
1590                "checkout requires the transaction mutex to be held"
1591                    .to_string()
1592                    .into_boxed_str(),
1593            ));
1594        }
1595        let from_frontiers = self.state_frontiers();
1596        loro_common::info!(
1597            "checkout from={:?} to={:?} cur_vv={:?}",
1598            from_frontiers,
1599            frontiers,
1600            self.oplog_vv()
1601        );
1602
1603        if &from_frontiers == frontiers {
1604            return Ok(());
1605        }
1606
1607        let oplog = self.oplog.lock();
1608        if oplog.dag.is_before_shallow_root(frontiers) {
1609            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1610        }
1611
1612        let frontiers = if to_shrink_frontiers {
1613            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1614        } else {
1615            frontiers.clone()
1616        };
1617
1618        if from_frontiers == frontiers {
1619            return Ok(());
1620        }
1621
1622        let mut state = self.state.lock();
1623        let mut calc = self.diff_calculator.lock();
1624        for i in frontiers.iter() {
1625            if !oplog.dag.contains(i) {
1626                return Err(LoroError::FrontiersNotFound(i));
1627            }
1628        }
1629
1630        let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
1631            LoroError::NotFoundError(
1632                format!(
1633                    "Cannot find the current state version {:?}",
1634                    state.frontiers
1635                )
1636                .into_boxed_str(),
1637            )
1638        })?;
1639        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1640            return Err(LoroError::NotFoundError(
1641                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1642            ));
1643        };
1644
1645        self.set_detached(true);
1646        let (diff, diff_mode) =
1647            calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
1648        state.apply_diff(
1649            InternalDocDiff {
1650                origin: "checkout".into(),
1651                diff: Cow::Owned(diff),
1652                by: EventTriggerKind::Checkout,
1653                new_version: Cow::Owned(frontiers.clone()),
1654            },
1655            diff_mode,
1656        )?;
1657
1658        Ok(())
1659    }
1660
1661    #[inline]
1662    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1663        self.oplog.lock().dag.vv_to_frontiers(vv)
1664    }
1665
1666    #[inline]
1667    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1668        self.oplog.lock().dag.frontiers_to_vv(frontiers)
1669    }
1670
1671    /// Import ops from other doc.
1672    ///
1673    /// After `a.merge(b)` and `b.merge(a)`, `a` and `b` will have the same content if they are in attached mode.
1674    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1675        let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1676        self.import(&updates)
1677    }
1678
1679    pub(crate) fn arena(&self) -> &SharedArena {
1680        &self.arena
1681    }
1682
1683    #[inline]
1684    pub fn len_ops(&self) -> usize {
1685        if self.oplog.can_lock_in_this_thread() {
1686            return self.oplog.lock().visible_op_count_exact();
1687        }
1688
1689        self.visible_op_count.load(Acquire)
1690    }
1691
1692    #[inline]
1693    pub fn len_changes(&self) -> usize {
1694        let oplog = self.oplog.lock();
1695        oplog.len_changes()
1696    }
1697
1698    pub fn config(&self) -> &Configure {
1699        &self.config
1700    }
1701
1702    /// This method compare the consistency between the current doc state
1703    /// and the state calculated by diff calculator from beginning.
1704    ///
1705    /// Panic when it's not consistent
1706    pub fn check_state_diff_calc_consistency_slow(&self) {
1707        // #[cfg(any(test, debug_assertions, feature = "test_utils"))]
1708        {
1709            static IS_CHECKING: std::sync::atomic::AtomicBool =
1710                std::sync::atomic::AtomicBool::new(false);
1711            if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1712                return;
1713            }
1714
1715            IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1716            let peer_id = self.peer_id();
1717            let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1718            let _g = s.enter();
1719            let options = self.implicit_commit_then_stop().0;
1720            self.oplog.lock().check_dag_correctness();
1721            if self.is_shallow() {
1722                // For shallow documents, we cannot replay from the beginning as the history is not complete.
1723                //
1724                // Instead, we:
1725                // 1. Export the initial state from the GC snapshot.
1726                // 2. Create a new document and import the initial snapshot.
1727                // 3. Export updates from the shallow start version vector to the current version.
1728                // 4. Import these updates into the new document.
1729                // 5. Compare the states of the new document and the current document.
1730
1731                // Step 1: Export the initial state from the GC snapshot.
1732                let initial_snapshot = self
1733                    .export(ExportMode::state_only(Some(
1734                        &self.shallow_since_frontiers(),
1735                    )))
1736                    .unwrap();
1737
1738                // Step 2: Create a new document and import the initial snapshot.
1739                let doc = LoroDoc::new();
1740                doc.import(&initial_snapshot).unwrap();
1741                self.checkout(&self.shallow_since_frontiers()).unwrap();
1742                assert_eq!(self.get_deep_value(), doc.get_deep_value());
1743
1744                // Step 3: Export updates since the shallow start version vector to the current version.
1745                let updates = self.export(ExportMode::all_updates()).unwrap();
1746
1747                // Step 4: Import these updates into the new document.
1748                doc.import(&updates).unwrap();
1749                self.checkout_to_latest();
1750
1751                // Step 5: Checkout to the current state's frontiers and compare the states.
1752                // doc.checkout(&self.state_frontiers()).unwrap();
1753                assert_eq!(doc.get_deep_value(), self.get_deep_value());
1754                let mut calculated_state = doc.app_state().lock();
1755                let mut current_state = self.app_state().lock();
1756                current_state.check_is_the_same(&mut calculated_state);
1757            } else {
1758                let f = self.state_frontiers();
1759                let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
1760                let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1761                let doc = Self::new();
1762                doc.import(&bytes).unwrap();
1763                let mut calculated_state = doc.app_state().lock();
1764                let mut current_state = self.app_state().lock();
1765                current_state.check_is_the_same(&mut calculated_state);
1766            }
1767
1768            self.renew_txn_if_auto_commit(options);
1769            IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1770        }
1771    }
1772
1773    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1774        self.query_pos_internal(pos, true)
1775    }
1776
1777    /// Get position in a seq container
1778    pub(crate) fn query_pos_internal(
1779        &self,
1780        pos: &Cursor,
1781        ret_event_index: bool,
1782    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1783        if !self.has_container(&pos.container) {
1784            return Err(CannotFindRelativePosition::IdNotFound);
1785        }
1786
1787        let mut state = self.state.lock();
1788        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1789            Ok(PosQueryResult {
1790                update: None,
1791                current: AbsolutePosition {
1792                    pos: ans,
1793                    side: pos.side,
1794                },
1795            })
1796        } else {
1797            // We need to trace back to the version where the relative position is valid.
1798            // The optimal way to find that version is to have succ info like Automerge.
1799            //
1800            // But we don't have that info now, so an alternative way is to trace back
1801            // to version with frontiers of `[pos.id]`. But this may be very slow even if
1802            // the target is just deleted a few versions ago.
1803            //
1804            // What we need is to trace back to the latest version that deletes the target
1805            // id.
1806
1807            // commit the txn to make sure we can query the history correctly, preserving options
1808            drop(state);
1809            let result = self.with_barrier(|| {
1810                let oplog = self.oplog().lock();
1811                // TODO: assert pos.id is not unknown
1812                if let Some(id) = pos.id {
1813                    // Ensure the container is registered if it exists lazily
1814                    if oplog.arena.id_to_idx(&pos.container).is_none() {
1815                        let mut s = self.state.lock();
1816                        if !s.does_container_exist(&pos.container) {
1817                            return Err(CannotFindRelativePosition::ContainerDeleted);
1818                        }
1819                        s.ensure_container(&pos.container);
1820                        drop(s);
1821                    }
1822                    let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1823                    // We know where the target id is when we trace back to the delete_op_id.
1824                    let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1825                        if oplog.shallow_since_vv().includes_id(id) {
1826                            return Err(CannotFindRelativePosition::HistoryCleared);
1827                        }
1828
1829                        tracing::error!("Cannot find id {}", id);
1830                        return Err(CannotFindRelativePosition::IdNotFound);
1831                    };
1832                    // Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
1833                    let mut diff_calc = DiffCalculator::new(true);
1834                    let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1835                    let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1836                    // TODO: PERF: it doesn't need to calc the effects here
1837                    diff_calc.calc_diff_internal(
1838                        &oplog,
1839                        before,
1840                        &before_frontiers,
1841                        oplog.vv(),
1842                        oplog.frontiers(),
1843                        Some(&|target| idx == target),
1844                    );
1845                    // TODO: remove depth info
1846                    let depth = self.arena.get_depth(idx);
1847                    let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1848                    match diff_calc {
1849                        crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1850                            let c = text.get_id_latest_pos(id).unwrap();
1851                            let new_pos = c.pos;
1852                            let handler = self.get_text(&pos.container);
1853                            let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1854                            Ok(PosQueryResult {
1855                                update: handler.get_cursor(current_pos, c.side),
1856                                current: AbsolutePosition {
1857                                    pos: current_pos,
1858                                    side: c.side,
1859                                },
1860                            })
1861                        }
1862                        crate::diff_calc::ContainerDiffCalculator::List(list) => {
1863                            let c = list.get_id_latest_pos(id).unwrap();
1864                            let new_pos = c.pos;
1865                            let handler = self.get_list(&pos.container);
1866                            Ok(PosQueryResult {
1867                                update: handler.get_cursor(new_pos, c.side),
1868                                current: AbsolutePosition {
1869                                    pos: new_pos,
1870                                    side: c.side,
1871                                },
1872                            })
1873                        }
1874                        crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1875                            let c = list.get_id_latest_pos(id).unwrap();
1876                            let new_pos = c.pos;
1877                            let handler = self.get_movable_list(&pos.container);
1878                            let new_pos = handler.op_pos_to_user_pos(new_pos);
1879                            Ok(PosQueryResult {
1880                                update: handler.get_cursor(new_pos, c.side),
1881                                current: AbsolutePosition {
1882                                    pos: new_pos,
1883                                    side: c.side,
1884                                },
1885                            })
1886                        }
1887                        crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1888                        crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1889                        #[cfg(feature = "counter")]
1890                        crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1891                        crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1892                    }
1893                } else {
1894                    match pos.container.container_type() {
1895                        ContainerType::Text => {
1896                            let text = self.get_text(&pos.container);
1897                            Ok(PosQueryResult {
1898                                update: Some(Cursor {
1899                                    id: None,
1900                                    container: text.id(),
1901                                    side: pos.side,
1902                                    origin_pos: text.len_unicode(),
1903                                }),
1904                                current: AbsolutePosition {
1905                                    pos: text.len_event(),
1906                                    side: pos.side,
1907                                },
1908                            })
1909                        }
1910                        ContainerType::List => {
1911                            let list = self.get_list(&pos.container);
1912                            Ok(PosQueryResult {
1913                                update: Some(Cursor {
1914                                    id: None,
1915                                    container: list.id(),
1916                                    side: pos.side,
1917                                    origin_pos: list.len(),
1918                                }),
1919                                current: AbsolutePosition {
1920                                    pos: list.len(),
1921                                    side: pos.side,
1922                                },
1923                            })
1924                        }
1925                        ContainerType::MovableList => {
1926                            let list = self.get_movable_list(&pos.container);
1927                            Ok(PosQueryResult {
1928                                update: Some(Cursor {
1929                                    id: None,
1930                                    container: list.id(),
1931                                    side: pos.side,
1932                                    origin_pos: list.len(),
1933                                }),
1934                                current: AbsolutePosition {
1935                                    pos: list.len(),
1936                                    side: pos.side,
1937                                },
1938                            })
1939                        }
1940                        ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1941                            unreachable!()
1942                        }
1943                        #[cfg(feature = "counter")]
1944                        ContainerType::Counter => unreachable!(),
1945                    }
1946                }
1947            });
1948            result
1949        }
1950    }
1951
1952    /// Free the history cache that is used for making checkout faster.
1953    ///
1954    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1955    /// You can free it by calling this method.
1956    pub fn free_history_cache(&self) {
1957        self.oplog.lock().free_history_cache();
1958    }
1959
1960    /// Free the cached diff calculator that is used for checkout.
1961    pub fn free_diff_calculator(&self) {
1962        *self.diff_calculator.lock() = DiffCalculator::new(true);
1963    }
1964
1965    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1966    /// You can free it by calling `free_history_cache`.
1967    pub fn has_history_cache(&self) -> bool {
1968        self.oplog.lock().has_history_cache()
1969    }
1970
1971    /// Encoded all ops and history cache to bytes and store them in the kv store.
1972    ///
1973    /// The parsed ops will be dropped
1974    #[inline]
1975    pub fn compact_change_store(&self) {
1976        self.with_barrier(|| {
1977            self.oplog.lock().compact_change_store();
1978        });
1979    }
1980
1981    /// Analyze the container info of the doc
1982    ///
1983    /// This is used for development and debugging
1984    #[inline]
1985    pub fn analyze(&self) -> DocAnalysis {
1986        DocAnalysis::analyze(self)
1987    }
1988
1989    /// Get the path from the root to the container
1990    pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1991        let mut state = self.state.lock();
1992        if state.arena.id_to_idx(id).is_none() {
1993            if !state.does_container_exist(id) {
1994                return None;
1995            }
1996            state.ensure_container(id);
1997        }
1998        let idx = state.arena.id_to_idx(id).unwrap();
1999        state.get_path(idx)
2000    }
2001
2002    #[instrument(skip(self))]
2003    pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
2004        self.with_barrier(|| {
2005            let ans = match mode {
2006                ExportMode::Snapshot => export_fast_snapshot(self),
2007                ExportMode::Updates { from } => export_fast_updates(self, &from),
2008                ExportMode::UpdatesInRange { spans } => {
2009                    export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
2010                }
2011                ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
2012                ExportMode::StateOnly(f) => match f {
2013                    Some(f) => export_state_only_snapshot(self, &f)?,
2014                    None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
2015                },
2016                ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
2017            };
2018            Ok(ans)
2019        })
2020    }
2021
2022    /// The doc only contains the history since the shallow history start version vector.
2023    ///
2024    /// This is empty if the doc is not shallow.
2025    ///
2026    /// The ops included by the shallow history start version vector are not in the doc.
2027    pub fn shallow_since_vv(&self) -> ImVersionVector {
2028        self.oplog().lock().shallow_since_vv().clone()
2029    }
2030
2031    pub fn shallow_since_frontiers(&self) -> Frontiers {
2032        self.oplog().lock().shallow_since_frontiers().clone()
2033    }
2034
2035    /// Check if the doc contains the full history.
2036    pub fn is_shallow(&self) -> bool {
2037        !self.oplog().lock().shallow_since_vv().is_empty()
2038    }
2039
2040    /// Get the number of operations in the pending transaction.
2041    ///
2042    /// The pending transaction is the one that is not committed yet. It will be committed
2043    /// after calling `doc.commit()`, `doc.export(mode)` or `doc.checkout(version)`.
2044    pub fn get_pending_txn_len(&self) -> usize {
2045        if let Some(txn) = self.txn.lock().as_ref() {
2046            txn.len()
2047        } else {
2048            0
2049        }
2050    }
2051
2052    #[inline]
2053    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
2054        self.oplog().lock().dag.find_path(from, to)
2055    }
2056
2057    /// Subscribe to the first commit from a peer. Operations performed on the `LoroDoc` within this callback
2058    /// will be merged into the current commit.
2059    ///
2060    /// This is useful for managing the relationship between `PeerID` and user information.
2061    /// For example, you could store user names in a `LoroMap` using `PeerID` as the key and the `UserID` as the value.
2062    pub fn subscribe_first_commit_from_peer(
2063        &self,
2064        callback: FirstCommitFromPeerCallback,
2065    ) -> Subscription {
2066        let (s, enable) = self
2067            .first_commit_from_peer_subs
2068            .inner()
2069            .insert((), callback);
2070        enable();
2071        s
2072    }
2073
2074    /// Subscribe to the pre-commit event.
2075    ///
2076    /// The callback will be called when the changes are committed but not yet applied to the OpLog.
2077    /// You can modify the commit message and timestamp in the callback by [`ChangeModifier`].
2078    pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
2079        let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
2080        enable();
2081        s
2082    }
2083}
2084
2085fn pending_root_containers_to_materialize(oplog: &OpLog, changes: &[Change]) -> Vec<ContainerID> {
2086    let mut roots = FxHashSet::default();
2087    for change in changes {
2088        if change.ctr_end() <= oplog.vv().get(&change.id.peer).copied().unwrap_or(0) {
2089            continue;
2090        }
2091
2092        if oplog.dag.is_before_shallow_root(&change.deps)
2093            || oplog
2094                .dag
2095                .get_change_lamport_from_deps(&change.deps)
2096                .is_some()
2097        {
2098            continue;
2099        }
2100
2101        for op in change.ops.iter() {
2102            let id = oplog
2103                .arena
2104                .get_container_id(op.container)
2105                .expect("decoded op container should be registered");
2106            if id.is_root() {
2107                roots.insert(id);
2108            }
2109        }
2110    }
2111
2112    roots.into_iter().collect()
2113}
2114
2115#[derive(Debug, thiserror::Error)]
2116pub enum ChangeTravelError {
2117    #[error("Target id not found {0:?}")]
2118    TargetIdNotFound(ID),
2119    #[error("The shallow history of the doc doesn't include the target version")]
2120    TargetVersionNotIncluded,
2121}
2122
2123impl LoroDoc {
2124    pub fn travel_change_ancestors(
2125        &self,
2126        ids: &[ID],
2127        f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
2128    ) -> Result<(), ChangeTravelError> {
2129        let (options, guard) = self.implicit_commit_then_stop();
2130        drop(guard);
2131        struct PendingNode(ChangeMeta);
2132        impl PartialEq for PendingNode {
2133            fn eq(&self, other: &Self) -> bool {
2134                self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
2135            }
2136        }
2137
2138        impl Eq for PendingNode {}
2139        impl PartialOrd for PendingNode {
2140            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2141                Some(self.cmp(other))
2142            }
2143        }
2144
2145        impl Ord for PendingNode {
2146            fn cmp(&self, other: &Self) -> Ordering {
2147                self.0
2148                    .lamport_last()
2149                    .cmp(&other.0.lamport_last())
2150                    .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
2151            }
2152        }
2153
2154        for id in ids {
2155            let op_log = &self.oplog().lock();
2156            if !op_log.vv().includes_id(*id) {
2157                return Err(ChangeTravelError::TargetIdNotFound(*id));
2158            }
2159            if op_log.dag.shallow_since_vv().includes_id(*id) {
2160                return Err(ChangeTravelError::TargetVersionNotIncluded);
2161            }
2162        }
2163
2164        let mut visited = FxHashSet::default();
2165        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
2166        for id in ids {
2167            pending.push(PendingNode(ChangeMeta::from_change(
2168                &self.oplog().lock().get_change_at(*id).unwrap(),
2169            )));
2170        }
2171        while let Some(PendingNode(node)) = pending.pop() {
2172            let deps = node.deps.clone();
2173            if f(node).is_break() {
2174                break;
2175            }
2176
2177            for dep in deps.iter() {
2178                let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
2179                    continue;
2180                };
2181                if visited.contains(&dep_node.id) {
2182                    continue;
2183                }
2184
2185                visited.insert(dep_node.id);
2186                pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
2187            }
2188        }
2189
2190        let ans = Ok(());
2191        self.renew_txn_if_auto_commit(options);
2192        ans
2193    }
2194
2195    pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
2196        self.with_barrier(|| {
2197            let mut set = FxHashSet::default();
2198            let len = i64::try_from(len).unwrap_or(i64::MAX);
2199            let start = i64::from(id.counter);
2200            let end = start.saturating_add(len);
2201            if end <= 0 {
2202                return set;
2203            }
2204
2205            let start = start.max(0).min(i64::from(i32::MAX));
2206            let end = end.max(0).min(i64::from(i32::MAX));
2207            if start >= end {
2208                return set;
2209            }
2210
2211            {
2212                let oplog = self.oplog().lock();
2213                let span = IdSpan::new(id.peer, start as i32, end as i32);
2214                for op in oplog.iter_ops(span) {
2215                    let id = oplog.arena.get_container_id(op.container()).unwrap();
2216                    set.insert(id);
2217                }
2218            }
2219            set
2220        })
2221    }
2222
2223    pub fn delete_root_container(&self, cid: ContainerID) {
2224        if !cid.is_root() {
2225            return;
2226        }
2227
2228        // Do not treat "not in arena" as non-existence; consult state/kv
2229        if !self.has_container(&cid) {
2230            return;
2231        }
2232
2233        let Some(h) = self.get_handler(cid.clone()) else {
2234            return;
2235        };
2236
2237        self.config
2238            .deleted_root_containers
2239            .lock()
2240            .insert(cid.clone());
2241        if let Err(e) = h.clear() {
2242            self.config.deleted_root_containers.lock().remove(&cid);
2243            eprintln!("Failed to clear handler: {:?}", e);
2244        }
2245    }
2246
2247    pub fn set_hide_empty_root_containers(&self, hide: bool) {
2248        self.config
2249            .hide_empty_root_containers
2250            .store(hide, std::sync::atomic::Ordering::Relaxed);
2251    }
2252}
2253
2254fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2255    // Any delete op that covers `id` must have observed it, so its peer's counter
2256    // at delete time was > id.counter. start_vv (the vv at `id`) is therefore a
2257    // valid lower bound: changes at or before start_vv[peer] predate `id` and can
2258    // be skipped. We scan peer-by-peer rather than using the DAG-ordered
2259    // iter_changes_causally_rev, which is O(total changes).
2260    //
2261    // We choose the matching delete op with the greatest (op_lamport, peer, counter)
2262    // ordering. op_lamport is the Lamport of the specific op within the change
2263    // (change.lamport + op offset), not just the change's starting Lamport, so
2264    // concurrent deletes with equal change Lamports are broken deterministically.
2265    let start_vv = oplog
2266        .dag
2267        .frontiers_to_vv(&id.into())
2268        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2269
2270    // (op_lamport, peer) gives a deterministic total order for concurrent deletes.
2271    // A single peer cannot produce two ops with the same lamport, so peer suffices
2272    // as a tie-breaker.
2273    let mut best: Option<((loro_common::Lamport, loro_common::PeerID), ID)> = None;
2274
2275    for change in oplog.iter_changes_peer_by_peer(&start_vv, oplog.vv()) {
2276        let peer = change.peer();
2277        for op in change.ops.iter() {
2278            if op.container != idx {
2279                continue;
2280            }
2281            if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2282                if d.id_start.to_span(d.atom_len()).contains(id) {
2283                    debug_assert!(op.counter >= change.id().counter);
2284                    let op_lamport =
2285                        change.lamport + (op.counter - change.id().counter) as loro_common::Lamport;
2286                    let key = (op_lamport, peer);
2287                    if best.is_none_or(|(bk, _)| key > bk) {
2288                        best = Some((key, ID::new(peer, op.counter)));
2289                    }
2290                }
2291            }
2292        }
2293    }
2294
2295    best.map(|(_, op_id)| op_id)
2296}
2297
2298#[derive(Debug)]
2299pub struct CommitWhenDrop<'a> {
2300    doc: &'a LoroDoc,
2301    default_options: CommitOptions,
2302}
2303
2304impl Drop for CommitWhenDrop<'_> {
2305    fn drop(&mut self) {
2306        {
2307            let mut guard = self.doc.txn.lock();
2308            if let Some(txn) = guard.as_mut() {
2309                txn.set_default_options(std::mem::take(&mut self.default_options));
2310            };
2311        }
2312
2313        self.doc.commit_then_renew();
2314    }
2315}
2316
2317/// Options for configuring a commit operation.
2318#[derive(Debug, Clone)]
2319pub struct CommitOptions {
2320    /// Origin identifier for the commit event, used to track the source of changes.
2321    /// It doesn't persist.
2322    pub origin: Option<InternalString>,
2323
2324    /// Whether to immediately start a new transaction after committing.
2325    /// Defaults to true.
2326    pub immediate_renew: bool,
2327
2328    /// Custom timestamp for the commit in seconds since Unix epoch.
2329    /// If None, the current time will be used.
2330    pub timestamp: Option<Timestamp>,
2331
2332    /// Optional commit message to attach to the changes. It will be persisted.
2333    pub commit_msg: Option<Arc<str>>,
2334}
2335
2336impl CommitOptions {
2337    /// Creates a new CommitOptions with default values.
2338    pub fn new() -> Self {
2339        Self {
2340            origin: None,
2341            immediate_renew: true,
2342            timestamp: None,
2343            commit_msg: None,
2344        }
2345    }
2346
2347    /// Sets the origin identifier for this commit.
2348    pub fn origin(mut self, origin: &str) -> Self {
2349        self.origin = Some(origin.into());
2350        self
2351    }
2352
2353    /// Sets whether to immediately start a new transaction after committing.
2354    pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2355        self.immediate_renew = immediate_renew;
2356        self
2357    }
2358
2359    /// Set the timestamp of the commit.
2360    ///
2361    /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
2362    pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2363        self.timestamp = Some(timestamp);
2364        self
2365    }
2366
2367    /// Sets a commit message to be attached to the changes.
2368    pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2369        self.commit_msg = Some(commit_msg.into());
2370        self
2371    }
2372
2373    /// Sets the origin identifier for this commit.
2374    pub fn set_origin(&mut self, origin: Option<&str>) {
2375        self.origin = origin.map(|x| x.into())
2376    }
2377
2378    /// Sets the timestamp for this commit.
2379    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2380        self.timestamp = timestamp;
2381    }
2382}
2383
2384impl Default for CommitOptions {
2385    fn default() -> Self {
2386        Self::new()
2387    }
2388}
2389
2390#[cfg(test)]
2391mod test {
2392    use std::{
2393        panic::AssertUnwindSafe,
2394        sync::{
2395            atomic::{AtomicUsize, Ordering},
2396            Arc,
2397        },
2398    };
2399
2400    use crate::{
2401        cursor::PosType,
2402        encoding::json_schema::json::{JsonOpContent, JsonSchema, ListOp},
2403        encoding::{fast_snapshot::EMPTY_MARK, EncodeMode},
2404        loro::ExportMode,
2405        version::{Frontiers, VersionVector},
2406        LoroDoc, ToJson, TreeParentId,
2407    };
2408    use bytes::{BufMut, Bytes};
2409    use loro_common::ID;
2410    use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
2411
2412    const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
2413
2414    fn encode_import_blob(mode: EncodeMode, body: &[u8]) -> Vec<u8> {
2415        let mut ans = Vec::new();
2416        ans.extend_from_slice(b"loro");
2417        ans.extend_from_slice(&[0; 16]);
2418        ans.extend_from_slice(&mode.to_bytes());
2419        ans.extend_from_slice(body);
2420        let checksum = xxhash_rust::xxh32::xxh32(&ans[20..], XXH_SEED);
2421        ans[16..20].copy_from_slice(&checksum.to_le_bytes());
2422        ans
2423    }
2424
2425    fn encode_fast_snapshot_import(oplog_bytes: &[u8]) -> Vec<u8> {
2426        let mut body = Vec::new();
2427        body.put_u32_le(oplog_bytes.len() as u32);
2428        body.extend_from_slice(oplog_bytes);
2429        body.put_u32_le(EMPTY_MARK.len() as u32);
2430        body.extend_from_slice(EMPTY_MARK);
2431        body.put_u32_le(0);
2432        encode_import_blob(EncodeMode::FastSnapshot, &body)
2433    }
2434
2435    fn sstable_with_huge_meta_block_count() -> Vec<u8> {
2436        let mut bytes = Vec::new();
2437        bytes.extend_from_slice(b"LORO");
2438        bytes.push(0);
2439        bytes.put_u32_le(10_000_000);
2440        bytes.put_u32_le(xxhash_rust::xxh32::xxh32(&[], XXH_SEED));
2441        bytes.put_u32_le(5);
2442        bytes
2443    }
2444
2445    fn snapshot_oplog_with_malformed_block() -> Vec<u8> {
2446        let peer = 1;
2447        let id = ID::new(peer, 0);
2448        let vv = VersionVector::from_iter([(peer, 1)]);
2449        let frontiers = Frontiers::from_id(id);
2450        let mut store = MemKvStore::new(MemKvConfig::default());
2451        store.set(b"vv", vv.encode().into());
2452        store.set(b"fr", frontiers.encode().into());
2453        store.set(&id.to_bytes(), Bytes::from_static(&[0]));
2454        store.export_all().to_vec()
2455    }
2456
2457    fn make_json_import_stress_doc(peer: u64) -> LoroDoc {
2458        let doc = LoroDoc::new_auto_commit();
2459        doc.set_peer_id(peer).unwrap();
2460
2461        let text = doc.get_text("text");
2462        let mut text_pos = 0;
2463        for i in 0..32 {
2464            let chunk = format!("segment-{i}-abcdefghijklmnopqrstuvwxyz;");
2465            text.insert_unicode(text_pos, &chunk).unwrap();
2466            text_pos += chunk.chars().count();
2467        }
2468
2469        let list = doc.get_list("list");
2470        for i in 0..32 {
2471            list.insert(i, format!("item-{i}")).unwrap();
2472        }
2473
2474        let map = doc.get_map("map");
2475        for i in 0..32 {
2476            let key = format!("key-{i}");
2477            map.insert(&key, format!("value-{i}")).unwrap();
2478        }
2479
2480        let tree = doc.get_tree("tree");
2481        let mut parent = TreeParentId::Root;
2482        for i in 0..16 {
2483            let node = tree.create(parent).unwrap();
2484            let meta = tree.get_meta(node).unwrap();
2485            meta.insert("name", format!("node-{i}")).unwrap();
2486            meta.insert("payload", format!("payload-{i}-{}", "x".repeat(16)))
2487                .unwrap();
2488            parent = TreeParentId::Node(node);
2489        }
2490
2491        doc
2492    }
2493
2494    fn make_json_list_update_with_four_ops(peer: u64) -> (LoroDoc, JsonSchema) {
2495        let doc = LoroDoc::new();
2496        doc.set_peer_id(peer).unwrap();
2497        let map = doc.get_map("map");
2498        let list = doc.get_list("list");
2499        let text = doc.get_text("text");
2500
2501        let mut txn = doc.txn().unwrap();
2502        map.insert_with_txn(&mut txn, "prefix", "map-value".into())
2503            .unwrap();
2504        list.insert_with_txn(&mut txn, 0, "seed".into()).unwrap();
2505        text.insert_with_txn(&mut txn, 0, "text-value", PosType::Unicode)
2506            .unwrap();
2507        list.insert_with_txn(&mut txn, 1, "tail".into()).unwrap();
2508        txn.commit().unwrap();
2509
2510        let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv(), false);
2511        assert_eq!(json.changes.len(), 1);
2512        assert_eq!(json.changes[0].ops.len(), 4);
2513        (doc, json)
2514    }
2515
2516    fn move_last_list_insert_far_out_of_bounds(json: &mut JsonSchema) {
2517        let last_change = json.changes.last_mut().unwrap();
2518        let last_op = last_change.ops.last_mut().unwrap();
2519        match &mut last_op.content {
2520            JsonOpContent::List(ListOp::Insert { pos, .. }) => {
2521                *pos = 1_000;
2522            }
2523            other => panic!("expected list insert op, got {other:?}"),
2524        }
2525    }
2526
2527    #[test]
2528    fn test_sync() {
2529        fn is_send_sync<T: Send + Sync>(_v: T) {}
2530        let loro = super::LoroDoc::new();
2531        is_send_sync(loro)
2532    }
2533
2534    #[test]
2535    fn import_rejects_huge_sstable_meta_block_count_without_panic() {
2536        let bytes = encode_fast_snapshot_import(&sstable_with_huge_meta_block_count());
2537
2538        let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2539        assert!(result.is_ok(), "malformed import should not panic");
2540        assert!(result.unwrap().is_err());
2541    }
2542
2543    #[test]
2544    fn import_rejects_malformed_change_block_without_panic() {
2545        let bytes = encode_fast_snapshot_import(&snapshot_oplog_with_malformed_block());
2546
2547        let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2548        assert!(result.is_ok(), "malformed import should not panic");
2549        assert!(result.unwrap().is_err());
2550    }
2551
2552    #[test]
2553    fn failed_import_rolls_back_oplog_and_arena() {
2554        let src = LoroDoc::new();
2555        src.set_peer_id(1).unwrap();
2556        let text = src.get_text("text");
2557        let mut txn = src.txn().unwrap();
2558        text.insert_with_txn(&mut txn, 0, "hello", PosType::Unicode)
2559            .unwrap();
2560        txn.commit().unwrap();
2561        let update = src.export(ExportMode::all_updates()).unwrap();
2562
2563        let dst = LoroDoc::new();
2564        let vv_before_import = dst.oplog_vv();
2565        let state_before_import = dst.get_deep_value();
2566        let err = dst
2567            .import_with(&update, "__loro_fail_import_state_apply".into())
2568            .unwrap_err();
2569        assert!(err.to_string().contains("state apply failpoint"));
2570        assert_eq!(dst.oplog_vv(), vv_before_import);
2571        assert_eq!(dst.get_deep_value(), state_before_import);
2572        assert!(dst.oplog().lock().is_empty());
2573
2574        dst.import(&update).unwrap();
2575        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2576    }
2577
2578    #[test]
2579    fn failed_incremental_import_restores_previous_change_store_block() {
2580        let src = LoroDoc::new();
2581        src.set_peer_id(1).unwrap();
2582        let text = src.get_text("text");
2583        let mut txn = src.txn().unwrap();
2584        text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
2585            .unwrap();
2586        txn.commit().unwrap();
2587        let first_update = src.export(ExportMode::all_updates()).unwrap();
2588        let first_vv = src.oplog_vv();
2589
2590        let mut txn = src.txn().unwrap();
2591        text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
2592            .unwrap();
2593        txn.commit().unwrap();
2594        let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
2595
2596        let dst = LoroDoc::new();
2597        dst.import(&first_update).unwrap();
2598        let vv_before_import = dst.oplog_vv();
2599        let state_before_import = dst.get_deep_value();
2600        dst.import_with(&second_update, "__loro_fail_import_state_apply".into())
2601            .unwrap_err();
2602        assert_eq!(dst.oplog_vv(), vv_before_import);
2603        assert_eq!(dst.get_deep_value(), state_before_import);
2604
2605        dst.import(&second_update).unwrap();
2606        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2607    }
2608
2609    #[test]
2610    fn failed_import_json_updates_rolls_back_complex_empty_doc() {
2611        let src = make_json_import_stress_doc(11);
2612        let json = src.export_json_updates(&Default::default(), &src.oplog_vv(), false);
2613
2614        let dst = LoroDoc::new();
2615        let vv_before_import = dst.oplog_vv();
2616        let frontiers_before_import = dst.oplog_frontiers();
2617        let state_before_import = dst.get_deep_value();
2618        for _ in 0..3 {
2619            crate::state::fail_next_import_state_apply_for_test();
2620            let err = dst.import_json_updates(json.clone()).unwrap_err();
2621            assert!(err.to_string().contains("state apply failpoint"));
2622            assert_eq!(dst.oplog_vv(), vv_before_import);
2623            assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2624            assert_eq!(dst.get_deep_value(), state_before_import);
2625            assert!(dst.oplog().lock().is_empty());
2626        }
2627
2628        dst.import_json_updates(json).unwrap();
2629        assert_eq!(dst.oplog_vv(), src.oplog_vv());
2630        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2631        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2632    }
2633
2634    #[test]
2635    fn failed_incremental_import_json_updates_restores_previous_change_store_block() {
2636        let src = LoroDoc::new_auto_commit();
2637        src.set_peer_id(12).unwrap();
2638        let text = src.get_text("text");
2639        text.insert_unicode(0, "a").unwrap();
2640        let list = src.get_list("list");
2641        list.push("seed").unwrap();
2642        let map = src.get_map("map");
2643        map.insert("seed", "value").unwrap();
2644        let tree = src.get_tree("tree");
2645        let root = tree.create(TreeParentId::Root).unwrap();
2646        tree.get_meta(root).unwrap().insert("name", "root").unwrap();
2647
2648        let first_vv = src.oplog_vv();
2649        let first_json = src.export_json_updates(&Default::default(), &first_vv, false);
2650
2651        let mut text_pos = text.len_unicode();
2652        for i in 0..64 {
2653            let chunk = format!("chunk-{i};");
2654            text.insert_unicode(text_pos, &chunk).unwrap();
2655            text_pos += chunk.chars().count();
2656        }
2657        for i in 0..32 {
2658            list.push(format!("after-{i}")).unwrap();
2659            let key = format!("after-{i}");
2660            map.insert(&key, format!("value-{i}")).unwrap();
2661        }
2662        let child = tree.create(TreeParentId::Node(root)).unwrap();
2663        tree.get_meta(child)
2664            .unwrap()
2665            .insert("name", "child")
2666            .unwrap();
2667
2668        let second_json = src.export_json_updates(&first_vv, &src.oplog_vv(), false);
2669
2670        let dst = LoroDoc::new();
2671        dst.import_json_updates(first_json).unwrap();
2672        let vv_before_import = dst.oplog_vv();
2673        let frontiers_before_import = dst.oplog_frontiers();
2674        let state_before_import = dst.get_deep_value();
2675
2676        for _ in 0..2 {
2677            crate::state::fail_next_import_state_apply_for_test();
2678            let err = dst.import_json_updates(second_json.clone()).unwrap_err();
2679            assert!(err.to_string().contains("state apply failpoint"));
2680            assert_eq!(dst.oplog_vv(), vv_before_import);
2681            assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2682            assert_eq!(dst.get_deep_value(), state_before_import);
2683        }
2684
2685        dst.import_json_updates(second_json).unwrap();
2686        assert_eq!(dst.oplog_vv(), src.oplog_vv());
2687        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2688        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2689    }
2690
2691    #[test]
2692    fn malformed_later_import_json_update_rolls_back_after_valid_prefix_enters_oplog() {
2693        let peer = 13;
2694        let (src, good_json) = make_json_list_update_with_four_ops(peer);
2695        let mut bad_json = good_json.clone();
2696        move_last_list_insert_far_out_of_bounds(&mut bad_json);
2697
2698        let good_dst = LoroDoc::new();
2699        good_dst.import_json_updates(good_json.clone()).unwrap();
2700        assert_eq!(good_dst.get_deep_value(), src.get_deep_value());
2701
2702        let last_op_counter = good_json.changes[0].ops.last().unwrap().counter;
2703        let prefix_vv = VersionVector::from_iter([(peer, last_op_counter)]);
2704        let prefix_json = src.export_json_updates(&Default::default(), &prefix_vv, false);
2705        assert_eq!(
2706            prefix_json.changes[0].ops.len(),
2707            good_json.changes[0].ops.len() - 1
2708        );
2709        let good_suffix_json = src.export_json_updates(&prefix_vv, &src.oplog_vv(), false);
2710        assert_eq!(good_suffix_json.changes[0].ops.len(), 1);
2711        let mut bad_suffix_json = good_suffix_json.clone();
2712        move_last_list_insert_far_out_of_bounds(&mut bad_suffix_json);
2713
2714        let prefix_dst = LoroDoc::new();
2715        prefix_dst.import_json_updates(prefix_json.clone()).unwrap();
2716        let vv_before_bad_suffix = prefix_dst.oplog_vv();
2717        let frontiers_before_bad_suffix = prefix_dst.oplog_frontiers();
2718        let state_before_bad_suffix = prefix_dst.get_deep_value();
2719
2720        let bad_suffix_json = serde_json::to_string(&bad_suffix_json).unwrap();
2721        let err = prefix_dst
2722            .import_json_updates(&bad_suffix_json)
2723            .unwrap_err();
2724        assert!(
2725            err.to_string().contains("list diff"),
2726            "expected state list bounds validation, got {err:?}"
2727        );
2728        assert_eq!(prefix_dst.oplog_vv(), vv_before_bad_suffix);
2729        assert_eq!(prefix_dst.oplog_frontiers(), frontiers_before_bad_suffix);
2730        assert_eq!(prefix_dst.get_deep_value(), state_before_bad_suffix);
2731
2732        prefix_dst.import_json_updates(good_suffix_json).unwrap();
2733        assert_eq!(prefix_dst.get_deep_value(), src.get_deep_value());
2734        assert_eq!(prefix_dst.oplog_vv(), src.oplog_vv());
2735
2736        let dst = LoroDoc::new();
2737        let vv_before_import = dst.oplog_vv();
2738        let frontiers_before_import = dst.oplog_frontiers();
2739        let state_before_import = dst.get_deep_value();
2740        let bad_json = serde_json::to_string(&bad_json).unwrap();
2741        let err = dst.import_json_updates(&bad_json).unwrap_err();
2742        assert!(
2743            err.to_string().contains("list diff"),
2744            "expected state list bounds validation, got {err:?}"
2745        );
2746        assert_eq!(dst.oplog_vv(), vv_before_import);
2747        assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2748        assert_eq!(dst.get_deep_value(), state_before_import);
2749        assert!(dst.oplog().lock().is_empty());
2750    }
2751
2752    #[test]
2753    fn failed_import_restores_pending_changes_that_were_applied_during_import() {
2754        let src = LoroDoc::new();
2755        src.set_peer_id(14).unwrap();
2756        let text = src.get_text("text");
2757
2758        let mut txn = src.txn().unwrap();
2759        text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
2760            .unwrap();
2761        txn.commit().unwrap();
2762        let first_update = src.export(ExportMode::all_updates()).unwrap();
2763        let first_vv = src.oplog_vv();
2764
2765        let mut txn = src.txn().unwrap();
2766        text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
2767            .unwrap();
2768        txn.commit().unwrap();
2769        let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
2770
2771        let dst = LoroDoc::new();
2772        let status = dst.import(&second_update).unwrap();
2773        assert!(status.success.is_empty());
2774        assert!(status.pending.is_some());
2775        let vv_before_dependency = dst.oplog_vv();
2776        let frontiers_before_dependency = dst.oplog_frontiers();
2777        let state_before_dependency = dst.get_deep_value();
2778
2779        crate::state::fail_next_import_state_apply_for_test();
2780        let err = dst.import(&first_update).unwrap_err();
2781        assert!(err.to_string().contains("state apply failpoint"));
2782        assert_eq!(dst.oplog_vv(), vv_before_dependency);
2783        assert_eq!(dst.oplog_frontiers(), frontiers_before_dependency);
2784        assert_eq!(dst.get_deep_value(), state_before_dependency);
2785
2786        dst.import(&first_update).unwrap();
2787        assert_eq!(dst.oplog_vv(), src.oplog_vv());
2788        assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2789        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2790    }
2791
2792    #[test]
2793    fn failed_import_json_updates_does_not_emit_or_leave_events() {
2794        let (src, good_json) = make_json_list_update_with_four_ops(15);
2795        let mut bad_json = good_json.clone();
2796        move_last_list_insert_far_out_of_bounds(&mut bad_json);
2797
2798        let dst = LoroDoc::new();
2799        let event_count = Arc::new(AtomicUsize::new(0));
2800        let event_count_cloned = event_count.clone();
2801        let _sub = dst.subscribe_root(Arc::new(move |_| {
2802            event_count_cloned.fetch_add(1, Ordering::SeqCst);
2803        }));
2804
2805        let bad_json = serde_json::to_string(&bad_json).unwrap();
2806        let err = dst.import_json_updates(&bad_json).unwrap_err();
2807        assert!(
2808            err.to_string().contains("list diff"),
2809            "expected state list bounds validation, got {err:?}"
2810        );
2811        assert_eq!(event_count.load(Ordering::SeqCst), 0);
2812        assert!(dst.drop_pending_events().is_empty());
2813        assert!(dst.oplog().lock().is_empty());
2814
2815        dst.import_json_updates(good_json).unwrap();
2816        assert_eq!(event_count.load(Ordering::SeqCst), 1);
2817        assert_eq!(dst.get_deep_value(), src.get_deep_value());
2818    }
2819
2820    #[test]
2821    fn test_checkout() {
2822        let loro = LoroDoc::new();
2823        loro.set_peer_id(1).unwrap();
2824        let text = loro.get_text("text");
2825        let map = loro.get_map("map");
2826        let list = loro.get_list("list");
2827        let mut txn = loro.txn().unwrap();
2828        for i in 0..10 {
2829            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2830            text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
2831                .unwrap();
2832            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2833        }
2834        txn.commit().unwrap();
2835        let b = LoroDoc::new();
2836        b.import(&loro.export(ExportMode::Snapshot).unwrap())
2837            .unwrap();
2838        loro.checkout(&Frontiers::default()).unwrap();
2839        {
2840            let json = &loro.get_deep_value();
2841            assert_eq!(
2842                json.to_json_value(),
2843                serde_json::json!({"text":"","list":[],"map":{}})
2844            );
2845        }
2846
2847        b.checkout(&ID::new(1, 2).into()).unwrap();
2848        {
2849            let json = &b.get_deep_value();
2850            assert_eq!(
2851                json.to_json_value(),
2852                serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
2853            );
2854        }
2855
2856        loro.checkout(&ID::new(1, 3).into()).unwrap();
2857        {
2858            let json = &loro.get_deep_value();
2859            assert_eq!(
2860                json.to_json_value(),
2861                serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
2862            );
2863        }
2864
2865        b.checkout(&ID::new(1, 29).into()).unwrap();
2866        {
2867            let json = &b.get_deep_value();
2868            assert_eq!(
2869                json.to_json_value(),
2870                serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
2871            );
2872        }
2873    }
2874
2875    #[test]
2876    fn import_batch_err_181() {
2877        let a = LoroDoc::new_auto_commit();
2878        let update_a = a.export(ExportMode::Snapshot);
2879        let b = LoroDoc::new_auto_commit();
2880        b.import_batch(&[update_a.unwrap()]).unwrap();
2881        b.get_text("text")
2882            .insert(0, "hello", PosType::Unicode)
2883            .unwrap();
2884        b.commit_then_renew();
2885        let oplog = b.oplog().lock();
2886        drop(oplog);
2887        b.export(ExportMode::all_updates()).unwrap();
2888    }
2889
2890    #[test]
2891    fn poisoned_mutex_keeps_follow_up_operations_failed() {
2892        let doc = LoroDoc::new();
2893        let oplog = doc.oplog.clone();
2894        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
2895            let _guard = oplog.lock();
2896            panic!("poison oplog");
2897        }));
2898
2899        let err = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
2900            .expect_err("poisoned lock should continue to fail fast");
2901        let msg = if let Some(msg) = err.downcast_ref::<&str>() {
2902            (*msg).to_string()
2903        } else if let Some(msg) = err.downcast_ref::<String>() {
2904            msg.clone()
2905        } else {
2906            String::new()
2907        };
2908        assert!(msg.contains("poisoned LoroMutex"), "{msg}");
2909    }
2910}