loro_internal/
loro.rs

1use crate::change::ChangeRef;
2pub use crate::encoding::ExportMode;
3pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
4pub(crate) use crate::LoroDocInner;
5use crate::{
6    arena::SharedArena,
7    change::Timestamp,
8    configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
9    container::{
10        idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
11        IntoContainerId,
12    },
13    cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
14    dag::{Dag, DagUtils},
15    diff_calc::DiffCalculator,
16    encoding::{
17        self, decode_snapshot, export_fast_snapshot, export_fast_updates,
18        export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
19        export_state_only_snapshot,
20        json_schema::{encode_change_to_json, json::JsonSchema},
21        parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
22    },
23    event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
24    handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
25    id::PeerID,
26    json::JsonChange,
27    op::InnerContent,
28    oplog::{loro_dag::FrontiersNotIncluded, OpLog},
29    state::DocState,
30    subscription::{LocalUpdateCallback, Observer, Subscriber},
31    undo::DiffBatch,
32    utils::subscription::{SubscriberSetWithQueue, Subscription},
33    version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
34    ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
35    VersionVector,
36};
37use either::Either;
38use fxhash::{FxHashMap, FxHashSet};
39use loro_common::{
40    ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
41    LoroValue, ID,
42};
43use rle::HasLength;
44use std::{
45    borrow::Cow,
46    cmp::Ordering,
47    collections::{hash_map::Entry, BinaryHeap},
48    ops::ControlFlow,
49    sync::{
50        atomic::{
51            AtomicBool,
52            Ordering::{Acquire, Release},
53        },
54        Arc, Mutex,
55    },
56};
57use tracing::{debug_span, info, info_span, instrument, warn};
58
59impl Default for LoroDoc {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl std::fmt::Debug for LoroDocInner {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("LoroDoc")
68            .field("config", &self.config)
69            .field("auto_commit", &self.auto_commit)
70            .field("detached", &self.detached)
71            .finish()
72    }
73}
74
75impl LoroDoc {
76    pub fn new() -> Self {
77        let oplog = OpLog::new();
78        let arena = oplog.arena.clone();
79        let config: Configure = oplog.configure.clone();
80        let global_txn = Arc::new(Mutex::new(None));
81        let inner = Arc::new_cyclic(|w| {
82            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone());
83            LoroDocInner {
84                oplog: Arc::new(Mutex::new(oplog)),
85                state,
86                config,
87                detached: AtomicBool::new(false),
88                auto_commit: AtomicBool::new(false),
89                observer: Arc::new(Observer::new(arena.clone())),
90                diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
91                txn: global_txn,
92                arena,
93                local_update_subs: SubscriberSetWithQueue::new(),
94                peer_id_change_subs: SubscriberSetWithQueue::new(),
95            }
96        });
97        Self { inner }
98    }
99
100    pub fn fork(&self) -> Self {
101        if self.is_detached() {
102            return self.fork_at(&self.state_frontiers());
103        }
104
105        let options = self.commit_then_stop();
106        let snapshot = encoding::fast_snapshot::encode_snapshot_inner(self);
107        let doc = Self::new();
108        encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc).unwrap();
109        doc.set_config(&self.config);
110        if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
111            doc.start_auto_commit();
112        }
113        self.renew_txn_if_auto_commit(options);
114        doc
115    }
116
117    /// Enables editing of the document in detached mode.
118    ///
119    /// By default, the document cannot be edited in detached mode (after calling
120    /// `detach` or checking out a version other than the latest). This method
121    /// allows editing in detached mode.
122    ///
123    /// # Important Notes:
124    ///
125    /// - After enabling this mode, the document will use a different PeerID. Each
126    ///   time you call checkout, a new PeerID will be used.
127    /// - If you set a custom PeerID while this mode is enabled, ensure that
128    ///   concurrent operations with the same PeerID are not possible.
129    /// - On detached mode, importing will not change the state of the document.
130    ///   It also doesn't change the version of the [DocState]. The changes will be
131    ///   recorded into [OpLog] only. You need to call `checkout` to make it take effect.
132    pub fn set_detached_editing(&self, enable: bool) {
133        self.config.set_detached_editing(enable);
134        if enable && self.is_detached() {
135            let options = self.commit_then_stop();
136            self.renew_peer_id();
137            self.renew_txn_if_auto_commit(options);
138        }
139    }
140
141    /// Create a doc with auto commit enabled.
142    #[inline]
143    pub fn new_auto_commit() -> Self {
144        let doc = Self::new();
145        doc.start_auto_commit();
146        doc
147    }
148
149    #[inline(always)]
150    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
151        if peer == PeerID::MAX {
152            return Err(LoroError::InvalidPeerID);
153        }
154        let next_id = self.oplog.try_lock().unwrap().next_id(peer);
155        if self.auto_commit.load(Acquire) {
156            let doc_state = self.state.try_lock().unwrap();
157            doc_state
158                .peer
159                .store(peer, std::sync::atomic::Ordering::Relaxed);
160            drop(doc_state);
161
162            let txn = self.txn.try_lock().unwrap().take();
163            if let Some(txn) = txn {
164                txn.commit().unwrap();
165            }
166
167            let new_txn = self.txn().unwrap();
168            self.txn.try_lock().unwrap().replace(new_txn);
169            self.peer_id_change_subs.emit(&(), next_id);
170            return Ok(());
171        }
172
173        let doc_state = self.state.try_lock().unwrap();
174        if doc_state.is_in_txn() {
175            return Err(LoroError::TransactionError(
176                "Cannot change peer id during transaction"
177                    .to_string()
178                    .into_boxed_str(),
179            ));
180        }
181
182        doc_state
183            .peer
184            .store(peer, std::sync::atomic::Ordering::Relaxed);
185        drop(doc_state);
186        self.peer_id_change_subs.emit(&(), next_id);
187        Ok(())
188    }
189
190    /// Renews the PeerID for the document.
191    pub(crate) fn renew_peer_id(&self) {
192        let peer_id = DefaultRandom.next_u64();
193        self.set_peer_id(peer_id).unwrap();
194    }
195
196    /// Commit the cumulative auto commit transaction.
197    /// This method only has effect when `auto_commit` is true.
198    ///
199    /// Afterwards, the users need to call `self.renew_txn_after_commit()` to resume the continuous transaction.
200    ///
201    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
202    #[inline]
203    #[must_use]
204    pub fn commit_then_stop(&self) -> Option<CommitOptions> {
205        self.commit_with(CommitOptions::new().immediate_renew(false))
206    }
207
208    /// Commit the cumulative auto commit transaction.
209    /// It will start the next one immediately
210    ///
211    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
212    #[inline]
213    pub fn commit_then_renew(&self) -> Option<CommitOptions> {
214        self.commit_with(CommitOptions::new().immediate_renew(true))
215    }
216
217    /// Commit the cumulative auto commit transaction.
218    /// This method only has effect when `auto_commit` is true.
219    /// If `immediate_renew` is true, a new transaction will be created after the old one is committed
220    ///
221    /// It only returns Some(options_of_the_empty_txn) when the txn is empty
222    #[instrument(skip_all)]
223    pub fn commit_with(&self, config: CommitOptions) -> Option<CommitOptions> {
224        if !self.auto_commit.load(Acquire) {
225            // if not auto_commit, nothing should happen
226            // because the global txn is not used
227            return None;
228        }
229
230        let mut txn_guard = self.txn.try_lock().unwrap();
231        let txn = txn_guard.take();
232        drop(txn_guard);
233        let mut txn = txn?;
234        let on_commit = txn.take_on_commit();
235        if let Some(origin) = config.origin {
236            txn.set_origin(origin);
237        }
238
239        if let Some(timestamp) = config.timestamp {
240            txn.set_timestamp(timestamp);
241        }
242
243        if let Some(msg) = config.commit_msg.as_ref() {
244            txn.set_msg(Some(msg.clone()));
245        }
246
247        let id_span = txn.id_span();
248        let options = txn.commit().unwrap();
249        if config.immediate_renew {
250            let mut txn_guard = self.txn.try_lock().unwrap();
251            assert!(self.can_edit());
252            let mut t = self.txn().unwrap();
253            if let Some(options) = options.as_ref() {
254                t.set_options(options.clone());
255            }
256            *txn_guard = Some(t);
257        }
258
259        if let Some(on_commit) = on_commit {
260            on_commit(&self.state, &self.oplog, id_span);
261        }
262
263        options
264    }
265
266    /// Set the commit message of the next commit
267    pub fn set_next_commit_message(&self, message: &str) {
268        let mut binding = self.txn.try_lock().unwrap();
269        let Some(txn) = binding.as_mut() else {
270            return;
271        };
272
273        if message.is_empty() {
274            txn.set_msg(None)
275        } else {
276            txn.set_msg(Some(message.into()))
277        }
278    }
279
280    /// Set the origin of the next commit
281    pub fn set_next_commit_origin(&self, origin: &str) {
282        let mut txn = self.txn.try_lock().unwrap();
283        if let Some(txn) = txn.as_mut() {
284            txn.set_origin(origin.into());
285        }
286    }
287
288    /// Set the timestamp of the next commit
289    pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
290        let mut txn = self.txn.try_lock().unwrap();
291        if let Some(txn) = txn.as_mut() {
292            txn.set_timestamp(timestamp);
293        }
294    }
295
296    /// Set the options of the next commit
297    pub fn set_next_commit_options(&self, options: CommitOptions) {
298        let mut txn = self.txn.try_lock().unwrap();
299        if let Some(txn) = txn.as_mut() {
300            txn.set_options(options);
301        }
302    }
303
304    /// Clear the options of the next commit
305    pub fn clear_next_commit_options(&self) {
306        let mut txn = self.txn.try_lock().unwrap();
307        if let Some(txn) = txn.as_mut() {
308            txn.set_options(CommitOptions::new());
309        }
310    }
311
312    /// Set whether to record the timestamp of each change. Default is `false`.
313    ///
314    /// If enabled, the Unix timestamp will be recorded for each change automatically.
315    ///
316    /// You can also set each timestamp manually when you commit a change.
317    /// The timestamp manually set will override the automatic one.
318    ///
319    /// NOTE: Timestamps are forced to be in ascending order.
320    /// If you commit a new change with a timestamp that is less than the existing one,
321    /// the largest existing timestamp will be used instead.
322    #[inline]
323    pub fn set_record_timestamp(&self, record: bool) {
324        self.config.set_record_timestamp(record);
325    }
326
327    /// Set the interval of mergeable changes, in seconds.
328    ///
329    /// If two continuous local changes are within the interval, they will be merged into one change.
330    /// The default value is 1000 seconds.
331    #[inline]
332    pub fn set_change_merge_interval(&self, interval: i64) {
333        self.config.set_merge_interval(interval);
334    }
335
336    pub fn can_edit(&self) -> bool {
337        !self.is_detached() || self.config.detached_editing()
338    }
339
340    pub fn is_detached_editing_enabled(&self) -> bool {
341        self.config.detached_editing()
342    }
343
344    #[inline]
345    pub fn config_text_style(&self, text_style: StyleConfigMap) {
346        self.config.text_style_config.try_write().unwrap().map = text_style.map;
347    }
348
349    #[inline]
350    pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
351        self.config
352            .text_style_config
353            .try_write()
354            .unwrap()
355            .default_style = text_style;
356    }
357    pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
358        let doc = Self::new();
359        let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
360        if mode.is_snapshot() {
361            decode_snapshot(&doc, mode, body)?;
362            Ok(doc)
363        } else {
364            Err(LoroError::DecodeError(
365                "Invalid encode mode".to_string().into(),
366            ))
367        }
368    }
369
370    /// Is the document empty? (no ops)
371    #[inline(always)]
372    pub fn can_reset_with_snapshot(&self) -> bool {
373        let oplog = self.oplog.try_lock().unwrap();
374        if oplog.batch_importing {
375            return false;
376        }
377
378        if self.is_detached() {
379            return false;
380        }
381
382        oplog.is_empty() && self.state.try_lock().unwrap().can_import_snapshot()
383    }
384
385    /// Whether [OpLog] and [DocState] are detached.
386    ///
387    /// If so, the document is in readonly mode by default and importing will not change the state of the document.
388    /// It also doesn't change the version of the [DocState]. The changes will be recorded into [OpLog] only.
389    /// You need to call `checkout` to make it take effect.
390    #[inline(always)]
391    pub fn is_detached(&self) -> bool {
392        self.detached.load(Acquire)
393    }
394
395    pub(crate) fn set_detached(&self, detached: bool) {
396        self.detached.store(detached, Release);
397    }
398
399    #[inline(always)]
400    pub fn peer_id(&self) -> PeerID {
401        self.state
402            .try_lock()
403            .unwrap()
404            .peer
405            .load(std::sync::atomic::Ordering::Relaxed)
406    }
407
408    #[inline(always)]
409    pub fn detach(&self) {
410        let options = self.commit_then_stop();
411        self.set_detached(true);
412        self.renew_txn_if_auto_commit(options);
413    }
414
415    #[inline(always)]
416    pub fn attach(&self) {
417        self.checkout_to_latest()
418    }
419
420    /// Get the timestamp of the current state.
421    /// It's the last edit time of the [DocState].
422    pub fn state_timestamp(&self) -> Timestamp {
423        let f = &self.state.try_lock().unwrap().frontiers;
424        self.oplog.try_lock().unwrap().get_timestamp_of_version(f)
425    }
426
427    #[inline(always)]
428    pub fn app_state(&self) -> &Arc<Mutex<DocState>> {
429        &self.state
430    }
431
432    #[inline]
433    pub fn get_state_deep_value(&self) -> LoroValue {
434        self.state.try_lock().unwrap().get_deep_value()
435    }
436
437    #[inline(always)]
438    pub fn oplog(&self) -> &Arc<Mutex<OpLog>> {
439        &self.oplog
440    }
441
442    pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
443        let options = self.commit_then_stop();
444        let ans = self.oplog.try_lock().unwrap().export_from(vv);
445        self.renew_txn_if_auto_commit(options);
446        ans
447    }
448
449    #[inline(always)]
450    pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
451        let s = debug_span!("import", peer = self.peer_id());
452        let _e = s.enter();
453        self.import_with(bytes, Default::default())
454    }
455
456    #[inline]
457    pub fn import_with(
458        &self,
459        bytes: &[u8],
460        origin: InternalString,
461    ) -> Result<ImportStatus, LoroError> {
462        let options = self.commit_then_stop();
463        let ans = self._import_with(bytes, origin);
464        self.renew_txn_if_auto_commit(options);
465        ans
466    }
467
468    #[tracing::instrument(skip_all)]
469    fn _import_with(
470        &self,
471        bytes: &[u8],
472        origin: InternalString,
473    ) -> Result<ImportStatus, LoroError> {
474        ensure_cov::notify_cov("loro_internal::import");
475        let parsed = parse_header_and_body(bytes, true)?;
476        info!("Importing with mode={:?}", &parsed.mode);
477        let result = match parsed.mode {
478            EncodeMode::OutdatedRle => {
479                if self.state.try_lock().unwrap().is_in_txn() {
480                    return Err(LoroError::ImportWhenInTxn);
481                }
482
483                let s = tracing::span!(
484                    tracing::Level::INFO,
485                    "Import updates ",
486                    peer = self.peer_id()
487                );
488                let _e = s.enter();
489                self.update_oplog_and_apply_delta_to_state_if_needed(
490                    |oplog| oplog.decode(parsed),
491                    origin,
492                )
493            }
494            EncodeMode::OutdatedSnapshot => {
495                if self.can_reset_with_snapshot() {
496                    tracing::info!("Init by snapshot {}", self.peer_id());
497                    decode_snapshot(self, parsed.mode, parsed.body)
498                } else {
499                    self.update_oplog_and_apply_delta_to_state_if_needed(
500                        |oplog| oplog.decode(parsed),
501                        origin,
502                    )
503                }
504            }
505            EncodeMode::FastSnapshot => {
506                if self.can_reset_with_snapshot() {
507                    ensure_cov::notify_cov("loro_internal::import::snapshot");
508                    tracing::info!("Init by fast snapshot {}", self.peer_id());
509                    decode_snapshot(self, parsed.mode, parsed.body)
510                } else {
511                    self.update_oplog_and_apply_delta_to_state_if_needed(
512                        |oplog| oplog.decode(parsed),
513                        origin,
514                    )
515
516                    // let new_doc = LoroDoc::new();
517                    // new_doc.import(bytes)?;
518                    // let updates = new_doc.export_from(&self.oplog_vv());
519                    // return self.import_with(updates.as_slice(), origin);
520                }
521            }
522            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
523                |oplog| oplog.decode(parsed),
524                origin,
525            ),
526            EncodeMode::Auto => {
527                unreachable!()
528            }
529        };
530
531        self.emit_events();
532        result
533    }
534
535    #[tracing::instrument(skip_all)]
536    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
537        &self,
538        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
539        origin: InternalString,
540    ) -> Result<ImportStatus, LoroError> {
541        let mut oplog = self.oplog.try_lock().unwrap();
542        if !self.is_detached() {
543            let old_vv = oplog.vv().clone();
544            let old_frontiers = oplog.frontiers().clone();
545            let result = f(&mut oplog);
546            if &old_vv != oplog.vv() {
547                let mut diff = DiffCalculator::new(false);
548                let (diff, diff_mode) = diff.calc_diff_internal(
549                    &oplog,
550                    &old_vv,
551                    &old_frontiers,
552                    oplog.vv(),
553                    oplog.dag.get_frontiers(),
554                    None,
555                );
556                let mut state = self.state.try_lock().unwrap();
557                state.apply_diff(
558                    InternalDocDiff {
559                        origin,
560                        diff: (diff).into(),
561                        by: EventTriggerKind::Import,
562                        new_version: Cow::Owned(oplog.frontiers().clone()),
563                    },
564                    diff_mode,
565                );
566            }
567            result
568        } else {
569            f(&mut oplog)
570        }
571    }
572
573    fn emit_events(&self) {
574        // we should not hold the lock when emitting events
575        let events = {
576            let mut state = self.state.try_lock().unwrap();
577            state.take_events()
578        };
579        for event in events {
580            self.observer.emit(event);
581        }
582    }
583
584    pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
585        let mut state = self.state.try_lock().unwrap();
586        state.take_events()
587    }
588
589    #[instrument(skip_all)]
590    pub fn export_snapshot(&self) -> Result<Vec<u8>, LoroEncodeError> {
591        if self.is_shallow() {
592            return Err(LoroEncodeError::ShallowSnapshotIncompatibleWithOldFormat);
593        }
594        let options = self.commit_then_stop();
595        let ans = export_snapshot(self);
596        self.renew_txn_if_auto_commit(options);
597        Ok(ans)
598    }
599
600    /// Import the json schema updates.
601    ///
602    /// only supports backward compatibility but not forward compatibility.
603    #[tracing::instrument(skip_all)]
604    pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
605        let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
606        let options = self.commit_then_stop();
607        let result = self.update_oplog_and_apply_delta_to_state_if_needed(
608            |oplog| crate::encoding::json_schema::import_json(oplog, json),
609            Default::default(),
610        );
611        self.emit_events();
612        self.renew_txn_if_auto_commit(options);
613        result
614    }
615
616    pub fn export_json_updates(
617        &self,
618        start_vv: &VersionVector,
619        end_vv: &VersionVector,
620        with_peer_compression: bool,
621    ) -> JsonSchema {
622        let options = self.commit_then_stop();
623        let oplog = self.oplog.try_lock().unwrap();
624        let mut start_vv = start_vv;
625        let _temp: Option<VersionVector>;
626        if !oplog.dag.shallow_since_vv().is_empty() {
627            // Make sure that start_vv >= shallow_since_vv
628            let mut include_all = true;
629            for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
630                if start_vv.get(peer).unwrap_or(&0) < counter {
631                    include_all = false;
632                    break;
633                }
634            }
635            if !include_all {
636                let mut vv = start_vv.clone();
637                for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
638                    vv.extend_to_include_end_id(ID::new(peer, counter));
639                }
640                _temp = Some(vv);
641                start_vv = _temp.as_ref().unwrap();
642            }
643        }
644
645        let json = crate::encoding::json_schema::export_json(
646            &oplog,
647            start_vv,
648            end_vv,
649            with_peer_compression,
650        );
651        drop(oplog);
652        self.renew_txn_if_auto_commit(options);
653        json
654    }
655
656    pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
657        let options = self.commit_then_stop();
658        let oplog = self.oplog.try_lock().unwrap();
659        let json = crate::encoding::json_schema::export_json_in_id_span(&oplog, id_span);
660        drop(oplog);
661        self.renew_txn_if_auto_commit(options);
662        json
663    }
664
665    /// Get the version vector of the current OpLog
666    #[inline]
667    pub fn oplog_vv(&self) -> VersionVector {
668        self.oplog.try_lock().unwrap().vv().clone()
669    }
670
671    /// Get the version vector of the current [DocState]
672    #[inline]
673    pub fn state_vv(&self) -> VersionVector {
674        let f = &self.state.try_lock().unwrap().frontiers;
675        self.oplog
676            .try_lock()
677            .unwrap()
678            .dag
679            .frontiers_to_vv(f)
680            .unwrap()
681    }
682
683    pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
684        let value: LoroValue = self.state.try_lock().unwrap().get_value_by_path(path)?;
685        if let LoroValue::Container(c) = value {
686            Some(ValueOrHandler::Handler(Handler::new_attached(
687                c.clone(),
688                self.inner.clone(),
689            )))
690        } else {
691            Some(ValueOrHandler::Value(value))
692        }
693    }
694
695    /// Get the handler by the string path.
696    pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
697        let path = str_to_path(path)?;
698        self.get_by_path(&path)
699    }
700
701    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
702        let arena = &self.arena;
703        let txn = self.txn.try_lock().unwrap();
704        let txn = txn.as_ref()?;
705        let ops_ = txn.local_ops();
706        let new_id = ID {
707            peer: *txn.peer(),
708            counter: ops_.first()?.counter,
709        };
710        let change = ChangeRef {
711            id: &new_id,
712            deps: txn.frontiers(),
713            timestamp: &txn
714                .timestamp()
715                .as_ref()
716                .copied()
717                .unwrap_or_else(|| self.oplog.try_lock().unwrap().get_timestamp_for_next_txn()),
718            commit_msg: txn.msg(),
719            ops: ops_,
720            lamport: txn.lamport(),
721        };
722        let json = encode_change_to_json(change, arena);
723        Some(json)
724    }
725
726    #[inline]
727    pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
728        if self.has_container(&id) {
729            Some(Handler::new_attached(id, self.inner.clone()))
730        } else {
731            None
732        }
733    }
734
735    /// id can be a str, ContainerID, or ContainerIdRaw.
736    /// if it's str it will use Root container, which will not be None
737    #[inline]
738    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
739        let id = id.into_container_id(&self.arena, ContainerType::Text);
740        assert!(self.has_container(&id));
741        Handler::new_attached(id, self.inner.clone())
742            .into_text()
743            .unwrap()
744    }
745
746    /// id can be a str, ContainerID, or ContainerIdRaw.
747    /// if it's str it will use Root container, which will not be None
748    #[inline]
749    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
750        let id = id.into_container_id(&self.arena, ContainerType::List);
751        assert!(self.has_container(&id));
752        Handler::new_attached(id, self.inner.clone())
753            .into_list()
754            .unwrap()
755    }
756
757    /// id can be a str, ContainerID, or ContainerIdRaw.
758    /// if it's str it will use Root container, which will not be None
759    #[inline]
760    pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
761        let id = id.into_container_id(&self.arena, ContainerType::MovableList);
762        assert!(self.has_container(&id));
763        Handler::new_attached(id, self.inner.clone())
764            .into_movable_list()
765            .unwrap()
766    }
767
768    /// id can be a str, ContainerID, or ContainerIdRaw.
769    /// if it's str it will use Root container, which will not be None
770    #[inline]
771    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
772        let id = id.into_container_id(&self.arena, ContainerType::Map);
773        assert!(self.has_container(&id));
774        Handler::new_attached(id, self.inner.clone())
775            .into_map()
776            .unwrap()
777    }
778
779    /// id can be a str, ContainerID, or ContainerIdRaw.
780    /// if it's str it will use Root container, which will not be None
781    #[inline]
782    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
783        let id = id.into_container_id(&self.arena, ContainerType::Tree);
784        assert!(self.has_container(&id));
785        Handler::new_attached(id, self.inner.clone())
786            .into_tree()
787            .unwrap()
788    }
789
790    #[cfg(feature = "counter")]
791    pub fn get_counter<I: IntoContainerId>(
792        &self,
793        id: I,
794    ) -> crate::handler::counter::CounterHandler {
795        let id = id.into_container_id(&self.arena, ContainerType::Counter);
796        assert!(self.has_container(&id));
797        Handler::new_attached(id, self.inner.clone())
798            .into_counter()
799            .unwrap()
800    }
801
802    #[must_use]
803    pub fn has_container(&self, id: &ContainerID) -> bool {
804        if id.is_root() {
805            return true;
806        }
807
808        let exist = self.state.try_lock().unwrap().does_container_exist(id);
809        exist
810    }
811
    /// Undo the operations in the given `id_span`. It can be used even in a collaborative
    /// environment.
    ///
    /// This is an internal API. You should NOT use it directly.
    ///
    /// # Arguments
    ///
    /// - `id_span`: the ops to undo. Its last id must be included in the oplog's version vector.
    /// - `container_remap`: container id remapping table, passed mutably to `_apply_diff` when
    ///   the computed undo diff is applied.
    /// - `post_transform_base`: if `Some`, it is handed to `crate::undo::undo` instead of the
    ///   latest state frontiers.
    /// - `before_diff`: callback forwarded to `crate::undo::undo`.
    ///
    /// # Returns
    ///
    /// A [CommitWhenDrop] whose default commit options carry the origin `"undo"`.
    ///
    /// # Errors
    ///
    /// - [LoroError::EditWhenDetached] if the doc cannot be edited.
    /// - [LoroError::UndoInvalidIdSpan] if the last id of `id_span` is not in the oplog.
    /// - Any error from checking the state back out to the latest frontiers.
    ///
    /// Errors from applying the undo diff itself are only logged with `warn!`, not returned.
    ///
    /// # Internal
    ///
    /// This method will use the diff calculator to calculate the diff required to time travel
    /// from the end of id_span to the beginning of the id_span. Then it will convert the diff to
    /// operations and apply them to the OpLog with a dep on the last id of the given id_span.
    ///
    /// This implementation is kinda slow, but it's simple and maintainable. We can optimize it
    /// further when it's needed. The time complexity is O(n + m), n is the ops in the id_span, m is the
    /// distance from id_span to the current latest version.
    #[instrument(level = "info", skip_all)]
    pub fn undo_internal(
        &self,
        id_span: IdSpan,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        post_transform_base: Option<&DiffBatch>,
        before_diff: &mut dyn FnMut(&DiffBatch),
    ) -> LoroResult<CommitWhenDrop> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        // Commit and stop the current transaction; `options` keeps its commit options so they
        // can be restored afterwards.
        let options = self.commit_then_stop();
        if !self
            .oplog()
            .try_lock()
            .unwrap()
            .vv()
            .includes_id(id_span.id_last())
        {
            // The span reaches beyond what the oplog knows: restore the txn and bail out.
            self.renew_txn_if_auto_commit(options);
            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
        }

        // Turn event recording off (clearing anything recorded so far) while the state is moved
        // around; remember whether it was on so it can be re-enabled at the end.
        let (was_recording, latest_frontiers) = {
            let mut state = self.state.try_lock().unwrap();
            let was_recording = state.is_recording();
            state.stop_and_clear_recording();
            (was_recording, state.frontiers.clone())
        };

        let spans = self
            .oplog
            .try_lock()
            .unwrap()
            .split_span_based_on_deps(id_span);
        let diff = crate::undo::undo(
            spans,
            match post_transform_base {
                Some(d) => Either::Right(d),
                None => Either::Left(&latest_frontiers),
            },
            // Diff provider: check out `from`, record events while checking out `to`, and
            // return the recorded events as a DiffBatch.
            |from, to| {
                self.checkout_without_emitting(from, false).unwrap();
                self.state.try_lock().unwrap().start_recording();
                self.checkout_without_emitting(to, false).unwrap();
                let mut state = self.state.try_lock().unwrap();
                let e = state.take_events();
                state.stop_and_clear_recording();
                DiffBatch::new(e)
            },
            before_diff,
        );

        // Go back to the latest version and re-attach before applying the undo as local ops.
        // NOTE(review): if this checkout fails, the `?` returns early and none of the restoration
        // below runs (the doc may stay detached and the transaction stays stopped).
        self.checkout_without_emitting(&latest_frontiers, false)?;
        self.set_detached(false);
        if was_recording {
            self.state.try_lock().unwrap().start_recording();
        }
        self.start_auto_commit();
        // Try applying the diff, but ignore the error if it happens.
        // MovableList's undo behavior is too tricky to handle in a collaborative env
        // so in edge cases this may be an Error
        if let Err(e) = self._apply_diff(diff, container_remap, true) {
            warn!("Undo Failed {:?}", e);
        }

        if let Some(options) = options {
            self.set_next_commit_options(options);
        }
        Ok(CommitWhenDrop {
            doc: self,
            default_options: CommitOptions::new().origin("undo"),
        })
    }
903
904    /// Generate a series of local operations that can revert the current doc to the target
905    /// version.
906    ///
907    /// Internally, it will calculate the diff between the current state and the target state,
908    /// and apply the diff to the current state.
909    pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
910        // TODO: test when the doc is readonly
911        // TODO: test when the doc is detached but enabled editing
912        let f = self.state_frontiers();
913        let diff = self.diff(&f, target)?;
914        self._apply_diff(diff, &mut Default::default(), false)
915    }
916
    /// Calculate the diff between two versions so that applying the diff on `a` makes the state
    /// the same as `b`.
    ///
    /// NOTE: While computing, the doc is temporarily checked out to `a` and then to `b`, which
    /// enters **detached mode**. Afterwards the state is checked out back to the frontiers it had
    /// before the call; if the doc was attached before, it is re-attached and its transaction is
    /// renewed. Event recording is re-enabled if it was on.
    ///
    /// # Errors
    ///
    /// Returns [LoroError::FrontiersNotFound] if an id in `a` or `b` is not contained in the DAG.
    ///
    /// # Panics
    ///
    /// The internal checkouts are `unwrap`ped, so any checkout error panics. Presumably this can
    /// happen when `a` or `b` is before the shallow root — TODO confirm whether that case can get
    /// past the validation below.
    // FIXME: This method needs testing (no event should be emitted during processing this)
    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
        {
            // check whether a and b are valid
            let oplog = self.oplog.try_lock().unwrap();
            for id in a.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }
            for id in b.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }
        }

        // Commit and stop the current txn.
        // NOTE(review): it is renewed below only if the doc was attached; when the doc was
        // already detached (e.g. detached editing), `options` is dropped — confirm intended.
        let options = self.commit_then_stop();
        let was_detached = self.is_detached();
        let old_frontiers = self.state_frontiers();
        // Turn recording off (clearing pending events) so only the a -> b checkout is recorded.
        let was_recording = {
            let mut state = self.state.try_lock().unwrap();
            let is_recording = state.is_recording();
            state.stop_and_clear_recording();
            is_recording
        };
        self.checkout_without_emitting(a, true).unwrap();
        self.state.try_lock().unwrap().start_recording();
        self.checkout_without_emitting(b, true).unwrap();
        // The events recorded while moving from `a` to `b` form the diff.
        let e = {
            let mut state = self.state.try_lock().unwrap();
            let e = state.take_events();
            state.stop_and_clear_recording();
            e
        };
        // Restore the version the state had before this call.
        self.checkout_without_emitting(&old_frontiers, false)
            .unwrap();
        if !was_detached {
            self.set_detached(false);
            self.renew_txn_if_auto_commit(options);
        }
        if was_recording {
            self.state.try_lock().unwrap().start_recording();
        }
        Ok(DiffBatch::new(e))
    }
966
967    /// Apply a diff to the current state.
968    #[inline(always)]
969    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
970        self._apply_diff(diff, &mut Default::default(), true)
971    }
972
973    /// Apply a diff to the current state.
974    ///
975    /// This method will not recreate containers with the same [ContainerID]s.
976    /// While this can be convenient in certain cases, it can break several internal invariants:
977    ///
978    /// 1. Each container should appear only once in the document. Allowing containers with the same ID
979    ///    would result in multiple instances of the same container in the document.
980    /// 2. Unreachable containers should be removable from the state when necessary.
981    ///
982    /// However, the diff may contain operations that depend on container IDs.
983    /// Therefore, users need to provide a `container_remap` to record and retrieve the container ID remapping.
984    pub(crate) fn _apply_diff(
985        &self,
986        diff: DiffBatch,
987        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
988        skip_unreachable: bool,
989    ) -> LoroResult<()> {
990        if !self.can_edit() {
991            return Err(LoroError::EditWhenDetached);
992        }
993
994        let mut ans: LoroResult<()> = Ok(());
995        let mut missing_containers: Vec<ContainerID> = Vec::new();
996        for (mut id, diff) in diff.into_iter() {
997            info!(
998                "id: {:?} diff: {:?} remap: {:?}",
999                &id, &diff, container_remap
1000            );
1001            let mut remapped = false;
1002            while let Some(rid) = container_remap.get(&id) {
1003                remapped = true;
1004                id = rid.clone();
1005            }
1006
1007            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1008                missing_containers.push(id);
1009                continue;
1010            }
1011
1012            if skip_unreachable && !remapped && !self.state.try_lock().unwrap().get_reachable(&id) {
1013                continue;
1014            }
1015
1016            let Some(h) = self.get_handler(id.clone()) else {
1017                return Err(LoroError::ContainersNotFound {
1018                    containers: Box::new(vec![id]),
1019                });
1020            };
1021            if let Err(e) = h.apply_diff(diff, container_remap) {
1022                ans = Err(e);
1023            }
1024        }
1025
1026        if !missing_containers.is_empty() {
1027            return Err(LoroError::ContainersNotFound {
1028                containers: Box::new(missing_containers),
1029            });
1030        }
1031
1032        ans
1033    }
1034
    /// This is for debugging purposes. It delegates to `OpLog::diagnose_size`, which travels
    /// the whole oplog, so it can be slow on large documents.
    #[inline]
    pub fn diagnose_size(&self) {
        self.oplog().try_lock().unwrap().diagnose_size();
    }
1040
1041    #[inline]
1042    pub fn oplog_frontiers(&self) -> Frontiers {
1043        self.oplog().try_lock().unwrap().frontiers().clone()
1044    }
1045
1046    #[inline]
1047    pub fn state_frontiers(&self) -> Frontiers {
1048        self.state.try_lock().unwrap().frontiers.clone()
1049    }
1050
    /// Compares the oplog's version with `other`.
    ///
    /// - Ordering::Less means the oplog's version is less than `other`, or the two are concurrent
    /// - Ordering::Equal means the versions are equal
    /// - Ordering::Greater means the oplog's version is greater than `other`
    #[inline]
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        self.oplog().try_lock().unwrap().cmp_with_frontiers(other)
    }
1058
    /// Compare two [Frontiers] causally, delegating to the oplog.
    ///
    /// `Ok(None)` presumably means `a` and `b` are concurrent — TODO confirm against
    /// `OpLog::cmp_frontiers`.
    ///
    /// If either of the [Frontiers] is not included in the oplog, it returns [FrontiersNotIncluded].
    #[inline]
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        self.oplog().try_lock().unwrap().cmp_frontiers(a, b)
    }
1070
1071    pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1072        let mut state = self.state.try_lock().unwrap();
1073        if !state.is_recording() {
1074            state.start_recording();
1075        }
1076
1077        self.observer.subscribe_root(callback)
1078    }
1079
1080    pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1081        let mut state = self.state.try_lock().unwrap();
1082        if !state.is_recording() {
1083            state.start_recording();
1084        }
1085
1086        self.observer.subscribe(container_id, callback)
1087    }
1088
1089    pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1090        let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1091        activate();
1092        sub
1093    }
1094
1095    // PERF: opt
1096    #[tracing::instrument(skip_all)]
1097    pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1098        if bytes.is_empty() {
1099            return Ok(ImportStatus::default());
1100        }
1101
1102        if bytes.len() == 1 {
1103            return self.import(&bytes[0]);
1104        }
1105
1106        let mut success = VersionRange::default();
1107        let mut pending = VersionRange::default();
1108        let mut meta_arr = bytes
1109            .iter()
1110            .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1111            .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1112        meta_arr.sort_by(|a, b| {
1113            a.0.mode
1114                .cmp(&b.0.mode)
1115                .then(b.0.change_num.cmp(&a.0.change_num))
1116        });
1117
1118        let options = self.commit_then_stop();
1119        let is_detached = self.is_detached();
1120        self.detach();
1121        self.oplog.try_lock().unwrap().batch_importing = true;
1122        let mut err = None;
1123        for (_meta, data) in meta_arr {
1124            match self.import(data) {
1125                Ok(s) => {
1126                    for (peer, (start, end)) in s.success.iter() {
1127                        match success.0.entry(*peer) {
1128                            Entry::Occupied(mut e) => {
1129                                e.get_mut().1 = *end.max(&e.get().1);
1130                            }
1131                            Entry::Vacant(e) => {
1132                                e.insert((*start, *end));
1133                            }
1134                        }
1135                    }
1136
1137                    if let Some(p) = s.pending.as_ref() {
1138                        for (&peer, &(start, end)) in p.iter() {
1139                            match pending.0.entry(peer) {
1140                                Entry::Occupied(mut e) => {
1141                                    e.get_mut().0 = start.min(e.get().0);
1142                                    e.get_mut().1 = end.min(e.get().1);
1143                                }
1144                                Entry::Vacant(e) => {
1145                                    e.insert((start, end));
1146                                }
1147                            }
1148                        }
1149                    }
1150                }
1151                Err(e) => {
1152                    err = Some(e);
1153                }
1154            }
1155        }
1156
1157        let mut oplog = self.oplog.try_lock().unwrap();
1158        oplog.batch_importing = false;
1159        drop(oplog);
1160
1161        if !is_detached {
1162            self.checkout_to_latest();
1163        }
1164
1165        self.renew_txn_if_auto_commit(options);
1166        if let Some(err) = err {
1167            return Err(err);
1168        }
1169
1170        Ok(ImportStatus {
1171            success,
1172            pending: if pending.is_empty() {
1173                None
1174            } else {
1175                Some(pending)
1176            },
1177        })
1178    }
1179
    /// Get the shallow value of the document, as produced by `DocState::get_value`.
    ///
    /// Unlike [`LoroDoc::get_deep_value`], this presumably does not resolve child containers
    /// into their values — TODO confirm against `DocState::get_value`.
    #[inline]
    pub fn get_value(&self) -> LoroValue {
        self.state.try_lock().unwrap().get_value()
    }
1185
    /// Get the deep value of the document, as produced by `DocState::get_deep_value`.
    #[inline]
    pub fn get_deep_value(&self) -> LoroValue {
        self.state.try_lock().unwrap().get_deep_value()
    }
1191
    /// Get the deep value of the document with container ids included, as produced by
    /// `DocState::get_deep_value_with_id`.
    #[inline]
    pub fn get_deep_value_with_id(&self) -> LoroValue {
        self.state.try_lock().unwrap().get_deep_value_with_id()
    }
1197
1198    pub fn checkout_to_latest(&self) {
1199        let options = self.commit_then_renew();
1200        if !self.is_detached() {
1201            return;
1202        }
1203
1204        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1205            let f = self.oplog_frontiers();
1206            let this = &self;
1207            let frontiers = &f;
1208            this.checkout_without_emitting(frontiers, false).unwrap(); // we don't need to shrink frontiers
1209                                                                       // because oplog's frontiers are already shrinked
1210            this.emit_events();
1211            if this.config.detached_editing() {
1212                this.renew_peer_id();
1213            }
1214
1215            self.set_detached(false);
1216            self.renew_txn_if_auto_commit(options);
1217        });
1218    }
1219
1220    /// Checkout [DocState] to a specific version.
1221    ///
1222    /// This will make the current [DocState] detached from the latest version of [OpLog].
1223    /// Any further import will not be reflected on the [DocState], until user call [LoroDoc::attach()]
1224    pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1225        let options = self.checkout_without_emitting(frontiers, true)?;
1226        self.emit_events();
1227        if self.config.detached_editing() {
1228            self.renew_peer_id();
1229            self.renew_txn_if_auto_commit(options);
1230        }
1231
1232        Ok(())
1233    }
1234
1235    #[instrument(level = "info", skip(self))]
1236    pub(crate) fn checkout_without_emitting(
1237        &self,
1238        frontiers: &Frontiers,
1239        to_shrink_frontiers: bool,
1240    ) -> Result<Option<CommitOptions>, LoroError> {
1241        let mut options = None;
1242        let had_txn = self.txn.try_lock().unwrap().is_some();
1243        if had_txn {
1244            options = self.commit_then_stop();
1245        }
1246        let from_frontiers = self.state_frontiers();
1247        info!(
1248            "checkout from={:?} to={:?} cur_vv={:?}",
1249            from_frontiers,
1250            frontiers,
1251            self.oplog_vv()
1252        );
1253
1254        if &from_frontiers == frontiers {
1255            if had_txn {
1256                self.renew_txn_if_auto_commit(options);
1257            }
1258            return Ok(None);
1259        }
1260
1261        let oplog = self.oplog.try_lock().unwrap();
1262        if oplog.dag.is_before_shallow_root(frontiers) {
1263            drop(oplog);
1264            if had_txn {
1265                self.renew_txn_if_auto_commit(options);
1266            }
1267            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1268        }
1269
1270        let frontiers = if to_shrink_frontiers {
1271            shrink_frontiers(frontiers, &oplog.dag)
1272                .map_err(|_| LoroError::SwitchToVersionBeforeShallowRoot)?
1273        } else {
1274            frontiers.clone()
1275        };
1276        if from_frontiers == frontiers {
1277            drop(oplog);
1278            if had_txn {
1279                self.renew_txn_if_auto_commit(options);
1280            }
1281            return Ok(None);
1282        }
1283
1284        let mut state = self.state.try_lock().unwrap();
1285        let mut calc = self.diff_calculator.try_lock().unwrap();
1286        for i in frontiers.iter() {
1287            if !oplog.dag.contains(i) {
1288                drop(oplog);
1289                drop(state);
1290                if had_txn {
1291                    self.renew_txn_if_auto_commit(options);
1292                }
1293                return Err(LoroError::FrontiersNotFound(i));
1294            }
1295        }
1296
1297        let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1298        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1299            drop(oplog);
1300            drop(state);
1301            if had_txn {
1302                self.renew_txn_if_auto_commit(options);
1303            }
1304            return Err(LoroError::NotFoundError(
1305                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1306            ));
1307        };
1308
1309        self.set_detached(true);
1310        let (diff, diff_mode) =
1311            calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1312        state.apply_diff(
1313            InternalDocDiff {
1314                origin: "checkout".into(),
1315                diff: Cow::Owned(diff),
1316                by: EventTriggerKind::Checkout,
1317                new_version: Cow::Owned(frontiers.clone()),
1318            },
1319            diff_mode,
1320        );
1321
1322        drop(state);
1323        drop(oplog);
1324        Ok(options)
1325    }
1326
    /// Converts a version vector into the equivalent [Frontiers] using the oplog's DAG.
    #[inline]
    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
        self.oplog.try_lock().unwrap().dag.vv_to_frontiers(vv)
    }
1331
    /// Converts [Frontiers] into the equivalent [VersionVector] using the oplog's DAG.
    ///
    /// Returns `None` when the DAG cannot perform the conversion (presumably when the frontiers
    /// refer to unknown versions — TODO confirm).
    #[inline]
    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
        self.oplog
            .try_lock()
            .unwrap()
            .dag
            .frontiers_to_vv(frontiers)
    }
1340
1341    /// Import ops from other doc.
1342    ///
1343    /// After `a.merge(b)` and `b.merge(a)`, `a` and `b` will have the same content if they are in attached mode.
1344    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1345        self.import(&other.export_from(&self.oplog_vv()))
1346    }
1347
    /// Returns the arena shared with this document's internals.
    pub(crate) fn arena(&self) -> &SharedArena {
        &self.arena
    }
1351
1352    #[inline]
1353    pub fn len_ops(&self) -> usize {
1354        let oplog = self.oplog.try_lock().unwrap();
1355        let ans = oplog.vv().iter().map(|(_, ops)| *ops).sum::<i32>() as usize;
1356        if oplog.is_shallow() {
1357            let sub = oplog
1358                .shallow_since_vv()
1359                .iter()
1360                .map(|(_, ops)| *ops)
1361                .sum::<i32>() as usize;
1362            ans - sub
1363        } else {
1364            ans
1365        }
1366    }
1367
1368    #[inline]
1369    pub fn len_changes(&self) -> usize {
1370        let oplog = self.oplog.try_lock().unwrap();
1371        oplog.len_changes()
1372    }
1373
    /// Returns the document's [Configure] settings.
    pub fn config(&self) -> &Configure {
        &self.config
    }
1377
    /// This method compares the consistency between the current doc state
    /// and the state calculated by the diff calculator from the beginning.
    ///
    /// For a non-shallow doc, the updates up to the current state frontiers are exported,
    /// imported into a fresh [LoroDoc], and the two states are compared. A shallow doc cannot be
    /// replayed from the beginning, so its state-only snapshot at the shallow root plus all
    /// updates are replayed instead (see the step comments below).
    ///
    /// While one check is running, other calls return immediately (guarded by the static
    /// `IS_CHECKING` flag).
    ///
    /// NOTE(review): for shallow docs this leaves the doc checked out to the latest version
    /// (`checkout_to_latest`), even if it was detached at another version before.
    ///
    /// # Panics
    ///
    /// Panics when the states are not consistent, or when any internal export/import/checkout
    /// step fails. NOTE(review): a panic skips resetting `IS_CHECKING`, so every later call in the
    /// same process becomes a no-op.
    pub fn check_state_diff_calc_consistency_slow(&self) {
        // #[cfg(any(test, debug_assertions, feature = "test_utils"))]
        {
            static IS_CHECKING: AtomicBool = AtomicBool::new(false);
            if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
                return;
            }

            IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
            let peer_id = self.peer_id();
            let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
            let _g = s.enter();
            // Commit and stop the txn; it is renewed at the end of the check.
            let options = self.commit_then_stop();
            self.oplog.try_lock().unwrap().check_dag_correctness();
            if self.is_shallow() {
                // For shallow documents, we cannot replay from the beginning as the history is not complete.
                //
                // Instead, we:
                // 1. Export the initial state from the GC snapshot.
                // 2. Create a new document and import the initial snapshot.
                // 3. Export updates from the shallow start version vector to the current version.
                // 4. Import these updates into the new document.
                // 5. Compare the states of the new document and the current document.

                // Step 1: Export the initial state from the GC snapshot.
                let initial_snapshot = self
                    .export(ExportMode::state_only(Some(
                        &self.shallow_since_frontiers(),
                    )))
                    .unwrap();

                // Step 2: Create a new document and import the initial snapshot.
                let doc = LoroDoc::new();
                doc.import(&initial_snapshot).unwrap();
                self.checkout(&self.shallow_since_frontiers()).unwrap();
                assert_eq!(self.get_deep_value(), doc.get_deep_value());

                // Step 3: Export updates since the shallow start version vector to the current version.
                let updates = self.export(ExportMode::all_updates()).unwrap();

                // Step 4: Import these updates into the new document.
                doc.import(&updates).unwrap();
                self.checkout_to_latest();

                // Step 5: Checkout to the current state's frontiers and compare the states.
                // doc.checkout(&self.state_frontiers()).unwrap();
                assert_eq!(doc.get_deep_value(), self.get_deep_value());
                let mut calculated_state = doc.app_state().try_lock().unwrap();
                let mut current_state = self.app_state().try_lock().unwrap();
                current_state.check_is_the_same(&mut calculated_state);
            } else {
                // Replay every update up to the current state version into a fresh doc and
                // compare its state with ours.
                let f = self.state_frontiers();
                let vv = self
                    .oplog()
                    .try_lock()
                    .unwrap()
                    .dag
                    .frontiers_to_vv(&f)
                    .unwrap();
                let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
                let doc = Self::new();
                doc.import(&bytes).unwrap();
                let mut calculated_state = doc.app_state().try_lock().unwrap();
                let mut current_state = self.app_state().try_lock().unwrap();
                current_state.check_is_the_same(&mut calculated_state);
            }

            self.renew_txn_if_auto_commit(options);
            IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
        }
    }
1453
1454    #[inline]
1455    pub fn log_estimated_size(&self) {
1456        let state = self.state.try_lock().unwrap();
1457        state.log_estimated_size();
1458    }
1459
1460    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1461        self.query_pos_internal(pos, true)
1462    }
1463
    /// Get the current position of the cursor `pos` in a seq container (Text, List or
    /// MovableList).
    ///
    /// Fast path: if the [DocState] can resolve the cursor directly, that position is returned
    /// with `update: None`.
    ///
    /// Slow path: otherwise the pending transaction is committed, and the history of the cursor's
    /// container is replayed from the deps of the last op that deleted `pos.id` up to the latest
    /// oplog version, to find where that id sits now. `update` then carries a refreshed [Cursor].
    /// If `pos.id` is `None`, the cursor resolves to the end of the container.
    ///
    /// NOTE(review): `ret_event_index` is only forwarded in the fast path; in the slow path Text
    /// positions are always converted to event indices.
    ///
    /// # Errors
    ///
    /// - [CannotFindRelativePosition::ContainerDeleted] if the container is not in the arena.
    /// - [CannotFindRelativePosition::HistoryCleared] if no delete op is found and the id is
    ///   included in `shallow_since_vv`.
    /// - [CannotFindRelativePosition::IdNotFound] if no delete op is found otherwise.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!()` for Map, Tree, Counter and Unknown containers.
    pub(crate) fn query_pos_internal(
        &self,
        pos: &Cursor,
        ret_event_index: bool,
    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
        let mut state = self.state.try_lock().unwrap();
        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
            Ok(PosQueryResult {
                update: None,
                current: AbsolutePosition {
                    pos: ans,
                    side: pos.side,
                },
            })
        } else {
            // We need to trace back to the version where the relative position is valid.
            // The optimal way to find that version is to have succ info like Automerge.
            //
            // But we don't have that info now, so an alternative way is to trace back
            // to version with frontiers of `[pos.id]`. But this may be very slow even if
            // the target is just deleted a few versions ago.
            //
            // What we need is to trace back to the latest version that deletes the target
            // id.

            // commit the txn to make sure we can query the history correctly
            drop(state);
            self.commit_then_renew();
            let oplog = self.oplog().try_lock().unwrap();
            // TODO: assert pos.id is not unknown
            if let Some(id) = pos.id {
                let idx = oplog
                    .arena
                    .id_to_idx(&pos.container)
                    .ok_or(CannotFindRelativePosition::ContainerDeleted)?;
                // We know where the target id is when we trace back to the delete_op_id.
                let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
                    if oplog.shallow_since_vv().includes_id(id) {
                        return Err(CannotFindRelativePosition::HistoryCleared);
                    }

                    tracing::error!("Cannot find id {}", id);
                    return Err(CannotFindRelativePosition::IdNotFound);
                };
                // Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
                let mut diff_calc = DiffCalculator::new(true);
                let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
                let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
                // Replay from just before the delete op to the latest version, restricted by the
                // filter closure to the cursor's container.
                // TODO: PERF: it doesn't need to calc the effects here
                diff_calc.calc_diff_internal(
                    &oplog,
                    before,
                    &before_frontiers,
                    oplog.vv(),
                    oplog.frontiers(),
                    Some(&|target| idx == target),
                );
                // TODO: remove depth info
                let depth = self.arena.get_depth(idx);
                let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
                match diff_calc {
                    crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
                        let c = text.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_text(&pos.container);
                        // The calculator reports an entity index; convert it to an event index.
                        let current_pos = handler.convert_entity_index_to_event_index(new_pos);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(current_pos, c.side),
                            current: AbsolutePosition {
                                pos: current_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::List(list) => {
                        let c = list.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_list(&pos.container);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(new_pos, c.side),
                            current: AbsolutePosition {
                                pos: new_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
                        let c = list.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_movable_list(&pos.container);
                        // The calculator reports an op position; convert it to a user position.
                        let new_pos = handler.op_pos_to_user_pos(new_pos);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(new_pos, c.side),
                            current: AbsolutePosition {
                                pos: new_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
                    crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
                    #[cfg(feature = "counter")]
                    crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
                    crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
                }
            } else {
                // No id: the cursor points at the end of the container.
                match pos.container.container_type() {
                    ContainerType::Text => {
                        let text = self.get_text(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: text.id(),
                                side: pos.side,
                                origin_pos: text.len_unicode(),
                            }),
                            current: AbsolutePosition {
                                pos: text.len_event(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::List => {
                        let list = self.get_list(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: list.id(),
                                side: pos.side,
                                origin_pos: list.len(),
                            }),
                            current: AbsolutePosition {
                                pos: list.len(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::MovableList => {
                        let list = self.get_movable_list(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: list.id(),
                                side: pos.side,
                                origin_pos: list.len(),
                            }),
                            current: AbsolutePosition {
                                pos: list.len(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
                        unreachable!()
                    }
                    #[cfg(feature = "counter")]
                    ContainerType::Counter => unreachable!(),
                }
            }
        }
    }
1626
    /// Free the history cache that is used for making checkout faster.
    ///
    /// Checking out an old or concurrent version builds the history cache;
    /// call this method to release it again.
    ///
    /// # Panics
    ///
    /// Panics if the oplog lock is already held (`try_lock().unwrap()`).
    pub fn free_history_cache(&self) {
        self.oplog.try_lock().unwrap().free_history_cache();
    }
1634
1635    /// Free the cached diff calculator that is used for checkout.
1636    pub fn free_diff_calculator(&self) {
1637        *self.diff_calculator.try_lock().unwrap() = DiffCalculator::new(true);
1638    }
1639
1640    /// If you use checkout that switching to an old/concurrent version, the history cache will be built.
1641    /// You can free it by calling `free_history_cache`.
1642    pub fn has_history_cache(&self) -> bool {
1643        self.oplog.try_lock().unwrap().has_history_cache()
1644    }
1645
    /// Encodes all ops and the history cache to bytes and stores them in the kv store.
    ///
    /// The parsed ops will be dropped. The pending transaction is committed
    /// (via `commit_then_renew`) before the oplog is compacted.
    #[inline]
    pub fn compact_change_store(&self) {
        self.commit_then_renew();
        self.oplog.try_lock().unwrap().compact_change_store();
    }
1654
    /// Analyze the container info of the doc.
    ///
    /// This is used for development and debugging. It simply delegates to
    /// [`DocAnalysis::analyze`].
    #[inline]
    pub fn analyze(&self) -> DocAnalysis {
        DocAnalysis::analyze(self)
    }
1662
1663    /// Get the path from the root to the container
1664    pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1665        let mut state = self.state.try_lock().unwrap();
1666        let idx = state.arena.id_to_idx(id)?;
1667        state.get_path(idx)
1668    }
1669
1670    #[instrument(skip(self))]
1671    pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1672        let options = self.commit_then_stop();
1673        let ans = match mode {
1674            ExportMode::Snapshot => export_fast_snapshot(self),
1675            ExportMode::Updates { from } => export_fast_updates(self, &from),
1676            ExportMode::UpdatesInRange { spans } => {
1677                export_fast_updates_in_range(&self.oplog.try_lock().unwrap(), spans.as_ref())
1678            }
1679            ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1680            ExportMode::StateOnly(f) => match f {
1681                Some(f) => export_state_only_snapshot(self, &f)?,
1682                None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1683            },
1684            ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1685        };
1686
1687        self.renew_txn_if_auto_commit(options);
1688        Ok(ans)
1689    }
1690
1691    /// The doc only contains the history since the shallow history start version vector.
1692    ///
1693    /// This is empty if the doc is not shallow.
1694    ///
1695    /// The ops included by the shallow history start version vector are not in the doc.
1696    pub fn shallow_since_vv(&self) -> ImVersionVector {
1697        self.oplog().try_lock().unwrap().shallow_since_vv().clone()
1698    }
1699
1700    pub fn shallow_since_frontiers(&self) -> Frontiers {
1701        self.oplog()
1702            .try_lock()
1703            .unwrap()
1704            .shallow_since_frontiers()
1705            .clone()
1706    }
1707
1708    /// Check if the doc contains the full history.
1709    pub fn is_shallow(&self) -> bool {
1710        !self
1711            .oplog()
1712            .try_lock()
1713            .unwrap()
1714            .shallow_since_vv()
1715            .is_empty()
1716    }
1717
1718    /// Get the number of operations in the pending transaction.
1719    ///
1720    /// The pending transaction is the one that is not committed yet. It will be committed
1721    /// after calling `doc.commit()`, `doc.export(mode)` or `doc.checkout(version)`.
1722    pub fn get_pending_txn_len(&self) -> usize {
1723        if let Some(txn) = self.txn.try_lock().unwrap().as_ref() {
1724            txn.len()
1725        } else {
1726            0
1727        }
1728    }
1729
1730    #[inline]
1731    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1732        self.oplog().try_lock().unwrap().dag.find_path(from, to)
1733    }
1734}
1735
/// Errors returned by [`LoroDoc::travel_change_ancestors`].
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// A requested id is not included in the document's version vector.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// A requested id is covered by the shallow-history start version vector,
    /// i.e. it belongs to history that is not stored in this doc.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
1743
1744impl LoroDoc {
1745    pub fn travel_change_ancestors(
1746        &self,
1747        ids: &[ID],
1748        f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1749    ) -> Result<(), ChangeTravelError> {
1750        self.commit_then_renew();
1751        struct PendingNode(ChangeMeta);
1752        impl PartialEq for PendingNode {
1753            fn eq(&self, other: &Self) -> bool {
1754                self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1755            }
1756        }
1757
1758        impl Eq for PendingNode {}
1759        impl PartialOrd for PendingNode {
1760            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1761                Some(self.cmp(other))
1762            }
1763        }
1764
1765        impl Ord for PendingNode {
1766            fn cmp(&self, other: &Self) -> Ordering {
1767                self.0
1768                    .lamport_last()
1769                    .cmp(&other.0.lamport_last())
1770                    .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1771            }
1772        }
1773
1774        for id in ids {
1775            let op_log = &self.oplog().try_lock().unwrap();
1776            if !op_log.vv().includes_id(*id) {
1777                return Err(ChangeTravelError::TargetIdNotFound(*id));
1778            }
1779            if op_log.dag.shallow_since_vv().includes_id(*id) {
1780                return Err(ChangeTravelError::TargetVersionNotIncluded);
1781            }
1782        }
1783
1784        let mut visited = FxHashSet::default();
1785        let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1786        for id in ids {
1787            pending.push(PendingNode(ChangeMeta::from_change(
1788                &self.oplog().try_lock().unwrap().get_change_at(*id).unwrap(),
1789            )));
1790        }
1791        while let Some(PendingNode(node)) = pending.pop() {
1792            let deps = node.deps.clone();
1793            if f(node).is_break() {
1794                break;
1795            }
1796
1797            for dep in deps.iter() {
1798                let Some(dep_node) = self.oplog().try_lock().unwrap().get_change_at(dep) else {
1799                    continue;
1800                };
1801                if visited.contains(&dep_node.id) {
1802                    continue;
1803                }
1804
1805                visited.insert(dep_node.id);
1806                pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1807            }
1808        }
1809
1810        Ok(())
1811    }
1812
1813    pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1814        self.commit_then_renew();
1815        let mut set = FxHashSet::default();
1816        let oplog = &self.oplog().try_lock().unwrap();
1817        for op in oplog.iter_ops(id.to_span(len)) {
1818            let id = oplog.arena.get_container_id(op.container()).unwrap();
1819            set.insert(id);
1820        }
1821
1822        set
1823    }
1824}
1825
1826// FIXME: PERF: This method is quite slow because it iterates all the changes
1827fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
1828    let start_vv = oplog
1829        .dag
1830        .frontiers_to_vv(&id.into())
1831        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
1832    for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
1833        for op in change.ops.iter().rev() {
1834            if op.container != idx {
1835                continue;
1836            }
1837            if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
1838                if d.id_start.to_span(d.atom_len()).contains(id) {
1839                    return Some(ID::new(change.peer(), op.counter));
1840                }
1841            }
1842        }
1843    }
1844
1845    None
1846}
1847
/// Guard that commits `doc`'s pending transaction when it is dropped.
///
/// On drop, `default_options` is first installed as the active transaction's
/// default options (if a transaction exists), and then `commit_then_renew` is
/// called on the doc.
#[derive(Debug)]
pub struct CommitWhenDrop<'a> {
    // The document whose transaction is committed on drop.
    doc: &'a LoroDoc,
    // Options handed to the transaction right before the commit.
    default_options: CommitOptions,
}
1853
1854impl Drop for CommitWhenDrop<'_> {
1855    fn drop(&mut self) {
1856        {
1857            let mut guard = self.doc.txn.try_lock().unwrap();
1858            if let Some(txn) = guard.as_mut() {
1859                txn.set_default_options(std::mem::take(&mut self.default_options));
1860            };
1861        }
1862
1863        self.doc.commit_then_renew();
1864    }
1865}
1866
/// Options for configuring a commit operation.
///
/// Construct with [`CommitOptions::new`] (or `Default`), then chain the
/// builder-style setters such as `origin`, `immediate_renew`, `timestamp`
/// and `commit_msg`.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// Origin identifier for the commit event, used to track the source of changes.
    /// It is not persisted.
    pub origin: Option<InternalString>,

    /// Whether to immediately start a new transaction after committing.
    /// Defaults to `true`.
    pub immediate_renew: bool,

    /// Custom timestamp for the commit, in seconds since the Unix epoch.
    /// If `None`, the current time will be used.
    pub timestamp: Option<Timestamp>,

    /// Optional commit message to attach to the changes. It is persisted.
    pub commit_msg: Option<Arc<str>>,
}
1888    pub fn new() -> Self {
1889        Self {
1890            origin: None,
1891            immediate_renew: true,
1892            timestamp: None,
1893            commit_msg: None,
1894        }
1895    }
1896
1897    /// Sets the origin identifier for this commit.
1898    pub fn origin(mut self, origin: &str) -> Self {
1899        self.origin = Some(origin.into());
1900        self
1901    }
1902
1903    /// Sets whether to immediately start a new transaction after committing.
1904    pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
1905        self.immediate_renew = immediate_renew;
1906        self
1907    }
1908
1909    /// Set the timestamp of the commit.
1910    ///
1911    /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
1912    pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
1913        self.timestamp = Some(timestamp);
1914        self
1915    }
1916
1917    /// Sets a commit message to be attached to the changes.
1918    pub fn commit_msg(mut self, commit_msg: &str) -> Self {
1919        self.commit_msg = Some(commit_msg.into());
1920        self
1921    }
1922
1923    /// Sets the origin identifier for this commit.
1924    pub fn set_origin(&mut self, origin: Option<&str>) {
1925        self.origin = origin.map(|x| x.into())
1926    }
1927
1928    /// Sets the timestamp for this commit.
1929    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
1930        self.timestamp = timestamp;
1931    }
1932}
1933
1934impl Default for CommitOptions {
1935    fn default() -> Self {
1936        Self::new()
1937    }
1938}
1939
#[cfg(test)]
mod test {
    use loro_common::ID;

    use crate::{version::Frontiers, LoroDoc, ToJson};

    /// Compile-time check that `LoroDoc` is `Send + Sync`.
    #[test]
    fn test_sync() {
        fn is_send_sync<T: Send + Sync>(_v: T) {}
        let loro = super::LoroDoc::new();
        is_send_sync(loro)
    }

    /// Creates 30 ops on peer 1: ten rounds of (map insert, text insert, list
    /// insert), i.e. counters 0..=29. It then checks `checkout` at several versions,
    /// both on the original doc and on a copy imported from its snapshot.
    #[test]
    fn test_checkout() {
        let loro = LoroDoc::new();
        loro.set_peer_id(1).unwrap();
        let text = loro.get_text("text");
        let map = loro.get_map("map");
        let list = loro.get_list("list");
        let mut txn = loro.txn().unwrap();
        for i in 0..10 {
            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
            text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
        }
        txn.commit().unwrap();
        let b = LoroDoc::new();
        b.import(&loro.export_snapshot().unwrap()).unwrap();
        // Empty frontiers: nothing is visible.
        loro.checkout(&Frontiers::default()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"","list":[],"map":{}}"#);
        }

        // Counters 0..=2 are exactly the first round.
        b.checkout(&ID::new(1, 2).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":0}}"#);
        }

        // Counter 3 is the second round's map insert, so only the map advances.
        loro.checkout(&ID::new(1, 3).into()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":1}}"#);
        }

        // Counter 29 is the last op: the complete final state.
        b.checkout(&ID::new(1, 29).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(
                json.to_json(),
                r#"{"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}"#
            );
        }
    }

    /// Regression test (presumably for issue #181): importing an empty snapshot
    /// via `import_batch`, editing, and exporting must not panic.
    #[test]
    fn import_batch_err_181() {
        let a = LoroDoc::new_auto_commit();
        let update_a = a.export_snapshot();
        let b = LoroDoc::new_auto_commit();
        b.import_batch(&[update_a.unwrap()]).unwrap();
        b.get_text("text").insert(0, "hello").unwrap();
        b.commit_then_renew();
        // `try_lock().unwrap()` panics if the oplog lock is still held here.
        let oplog = b.oplog().try_lock().unwrap();
        drop(oplog);
        b.export_from(&Default::default());
    }
}