1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::Timestamp,
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48 LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 #[inline]
89 pub fn with_barrier<F, R>(&self, f: F) -> R
90 where
91 F: FnOnce() -> R,
92 {
93 let (options, guard) = self.implicit_commit_then_stop();
94 let result = f();
95 drop(guard);
96 self.renew_txn_if_auto_commit(options);
97 result
98 }
99
100 pub fn new() -> Self {
101 let oplog = OpLog::new();
102 let arena = oplog.arena.clone();
103 let config: Configure = oplog.configure.clone();
104 let lock_group = LoroLockGroup::new();
105 let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
106 let inner = Arc::new_cyclic(|w| {
107 let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
108 LoroDocInner {
109 oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
110 state,
111 config,
112 detached: AtomicBool::new(false),
113 auto_commit: AtomicBool::new(false),
114 observer: Arc::new(Observer::new(arena.clone())),
115 diff_calculator: Arc::new(
116 lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
117 ),
118 txn: global_txn,
119 arena,
120 local_update_subs: SubscriberSetWithQueue::new(),
121 peer_id_change_subs: SubscriberSetWithQueue::new(),
122 pre_commit_subs: SubscriberSetWithQueue::new(),
123 first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
124 }
125 });
126 LoroDoc { inner }
127 }
128
129 pub fn fork(&self) -> Self {
130 if self.is_detached() {
131 return self
132 .fork_at(&self.state_frontiers())
133 .expect("fork_at on detached doc should not fail");
134 }
135
136 let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
137 let doc = Self::new();
138 doc.with_barrier(|| {
139 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
140 })
141 .unwrap();
142 doc.set_config(&self.config);
143 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
144 doc.start_auto_commit();
145 }
146 doc
147 }
148 pub fn set_detached_editing(&self, enable: bool) {
164 self.config.set_detached_editing(enable);
165 if enable && self.is_detached() {
166 self.with_barrier(|| {
167 self.renew_peer_id();
168 });
169 }
170 }
171
172 #[inline]
174 pub fn new_auto_commit() -> Self {
175 let doc = Self::new();
176 doc.start_auto_commit();
177 doc
178 }
179
180 #[inline(always)]
181 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
182 if peer == PeerID::MAX {
183 return Err(LoroError::InvalidPeerID);
184 }
185 let next_id = self.oplog.lock().next_id(peer);
186 if self.auto_commit.load(Acquire) {
187 let doc_state = self.state.lock();
188 doc_state
189 .peer
190 .store(peer, std::sync::atomic::Ordering::Relaxed);
191
192 if doc_state.is_in_txn() {
193 drop(doc_state);
194 self.with_barrier(|| {});
196 }
197 self.peer_id_change_subs.emit(&(), next_id);
198 return Ok(());
199 }
200
201 let doc_state = self.state.lock();
202 if doc_state.is_in_txn() {
203 return Err(LoroError::TransactionError(
204 "Cannot change peer id during transaction"
205 .to_string()
206 .into_boxed_str(),
207 ));
208 }
209
210 doc_state
211 .peer
212 .store(peer, std::sync::atomic::Ordering::Relaxed);
213 drop(doc_state);
214 self.peer_id_change_subs.emit(&(), next_id);
215 Ok(())
216 }
217
218 pub(crate) fn renew_peer_id(&self) {
220 let peer_id = DefaultRandom.next_u64();
221 self.set_peer_id(peer_id).unwrap();
222 }
223
224 #[inline]
236 #[must_use]
237 pub fn implicit_commit_then_stop(
238 &self,
239 ) -> (
240 Option<CommitOptions>,
241 LoroMutexGuard<'_, Option<Transaction>>,
242 ) {
243 let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
245 (a, b.unwrap())
246 }
247
248 #[inline]
253 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
254 self.commit_internal(CommitOptions::new().immediate_renew(true), false)
256 .0
257 }
258
259 fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
264 let mut txn_guard = self.txn.lock();
265 let Some(txn) = txn_guard.as_mut() else {
266 return Some(txn_guard);
267 };
268
269 if txn.is_peer_first_appearance {
270 txn.is_peer_first_appearance = false;
271 drop(txn_guard);
272 self.first_commit_from_peer_subs.emit(
274 &(),
275 FirstCommitFromPeerPayload {
276 peer: self.peer_id(),
277 },
278 );
279 }
280
281 None
282 }
283
284 #[instrument(skip_all)]
292 fn commit_internal(
293 &self,
294 config: CommitOptions,
295 preserve_on_empty: bool,
296 ) -> (
297 Option<CommitOptions>,
298 Option<LoroMutexGuard<'_, Option<Transaction>>>,
299 ) {
300 if !self.auto_commit.load(Acquire) {
301 let txn_guard = self.txn.lock();
302 return (None, Some(txn_guard));
305 }
306
307 loop {
308 if let Some(txn_guard) = self.before_commit() {
309 return (None, Some(txn_guard));
310 }
311
312 let mut txn_guard = self.txn.lock();
313 let txn = txn_guard.take();
314 let Some(mut txn) = txn else {
315 return (None, Some(txn_guard));
316 };
317 let on_commit = txn.take_on_commit();
318 if let Some(origin) = config.origin.clone() {
319 txn.set_origin(origin);
320 }
321
322 if let Some(timestamp) = config.timestamp {
323 txn.set_timestamp(timestamp);
324 }
325
326 if let Some(msg) = config.commit_msg.as_ref() {
327 txn.set_msg(Some(msg.clone()));
328 }
329
330 let id_span = txn.id_span();
331 let mut options = txn.commit().unwrap();
332 if let Some(opts) = options.as_mut() {
334 if config.origin.is_some() {
336 opts.set_origin(None);
337 }
338 if !preserve_on_empty {
340 options = None;
341 }
342 }
343 if config.immediate_renew {
344 assert!(self.can_edit());
345 let mut t = self.txn().unwrap();
346 if let Some(options) = options.as_ref() {
347 t.set_options(options.clone());
348 }
349 *txn_guard = Some(t);
350 }
351
352 if let Some(on_commit) = on_commit {
353 drop(txn_guard);
354 on_commit(&self.state, &self.oplog, id_span);
355 txn_guard = self.txn.lock();
356 if !config.immediate_renew && txn_guard.is_some() {
357 continue;
359 }
360 }
361
362 return (
363 options,
364 if !config.immediate_renew {
365 Some(txn_guard)
366 } else {
367 None
368 },
369 );
370 }
371 }
372
373 #[instrument(skip_all)]
378 pub fn commit_with(
379 &self,
380 config: CommitOptions,
381 ) -> (
382 Option<CommitOptions>,
383 Option<LoroMutexGuard<'_, Option<Transaction>>>,
384 ) {
385 self.commit_internal(config, false)
386 }
387
388 pub fn set_next_commit_message(&self, message: &str) {
390 let mut binding = self.txn.lock();
391 let Some(txn) = binding.as_mut() else {
392 return;
393 };
394
395 if message.is_empty() {
396 txn.set_msg(None)
397 } else {
398 txn.set_msg(Some(message.into()))
399 }
400 }
401
402 pub fn set_next_commit_origin(&self, origin: &str) {
404 let mut txn = self.txn.lock();
405 if let Some(txn) = txn.as_mut() {
406 txn.set_origin(origin.into());
407 }
408 }
409
410 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
412 let mut txn = self.txn.lock();
413 if let Some(txn) = txn.as_mut() {
414 txn.set_timestamp(timestamp);
415 }
416 }
417
418 pub fn set_next_commit_options(&self, options: CommitOptions) {
420 let mut txn = self.txn.lock();
421 if let Some(txn) = txn.as_mut() {
422 txn.set_options(options);
423 }
424 }
425
426 pub fn clear_next_commit_options(&self) {
428 let mut txn = self.txn.lock();
429 if let Some(txn) = txn.as_mut() {
430 txn.set_options(CommitOptions::new());
431 }
432 }
433
434 #[inline]
445 pub fn set_record_timestamp(&self, record: bool) {
446 self.config.set_record_timestamp(record);
447 }
448
449 #[inline]
454 pub fn set_change_merge_interval(&self, interval: i64) {
455 self.config.set_merge_interval(interval);
456 }
457
458 pub fn can_edit(&self) -> bool {
459 !self.is_detached() || self.config.detached_editing()
460 }
461
462 pub fn is_detached_editing_enabled(&self) -> bool {
463 self.config.detached_editing()
464 }
465
466 #[inline]
467 pub fn config_text_style(&self, text_style: StyleConfigMap) {
468 self.config.text_style_config.write().map = text_style.map;
469 }
470
471 #[inline]
472 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
473 self.config.text_style_config.write().default_style = text_style;
474 }
475 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
476 let doc = Self::new();
477 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
478 if mode.is_snapshot() {
479 doc.with_barrier(|| -> Result<(), LoroError> {
480 decode_snapshot(&doc, mode, body, Default::default())?;
481 Ok(())
482 })?;
483 Ok(doc)
484 } else {
485 Err(LoroError::DecodeError(
486 "Invalid encode mode".to_string().into(),
487 ))
488 }
489 }
490
491 #[inline(always)]
493 pub fn can_reset_with_snapshot(&self) -> bool {
494 let oplog = self.oplog.lock();
495 if oplog.batch_importing {
496 return false;
497 }
498
499 if self.is_detached() {
500 return false;
501 }
502
503 oplog.is_empty() && self.state.lock().can_import_snapshot()
504 }
505
506 #[inline(always)]
512 pub fn is_detached(&self) -> bool {
513 self.detached.load(Acquire)
514 }
515
516 pub(crate) fn set_detached(&self, detached: bool) {
517 self.detached.store(detached, Release);
518 }
519
520 #[inline(always)]
521 pub fn peer_id(&self) -> PeerID {
522 self.state
523 .lock()
524 .peer
525 .load(std::sync::atomic::Ordering::Relaxed)
526 }
527
528 #[inline(always)]
529 pub fn detach(&self) {
530 self.with_barrier(|| self.set_detached(true));
531 }
532
533 #[inline(always)]
534 pub fn attach(&self) {
535 self.checkout_to_latest()
536 }
537
538 pub fn state_timestamp(&self) -> Timestamp {
541 let f = { self.state.lock().frontiers.clone() };
543 self.oplog.lock().get_timestamp_of_version(&f)
544 }
545
546 #[inline(always)]
547 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
548 &self.state
549 }
550
551 #[inline]
552 pub fn get_state_deep_value(&self) -> LoroValue {
553 self.state.lock().get_deep_value()
554 }
555
556 #[inline(always)]
557 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
558 &self.oplog
559 }
560
561 #[inline(always)]
562 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
563 let s = debug_span!("import", peer = self.peer_id());
564 let _e = s.enter();
565 self.import_with(bytes, Default::default())
566 }
567
568 #[inline]
569 pub fn import_with(
570 &self,
571 bytes: &[u8],
572 origin: InternalString,
573 ) -> Result<ImportStatus, LoroError> {
574 self.with_barrier(|| self._import_with(bytes, origin))
575 }
576
577 #[tracing::instrument(skip_all)]
578 fn _import_with(
579 &self,
580 bytes: &[u8],
581 origin: InternalString,
582 ) -> Result<ImportStatus, LoroError> {
583 ensure_cov::notify_cov("loro_internal::import");
584 let parsed = parse_header_and_body(bytes, true)?;
585 loro_common::info!("Importing with mode={:?}", &parsed.mode);
586 let result = match parsed.mode {
587 EncodeMode::OutdatedRle => {
588 if self.state.lock().is_in_txn() {
589 return Err(LoroError::ImportWhenInTxn);
590 }
591
592 let s = tracing::span!(
593 tracing::Level::INFO,
594 "Import updates ",
595 peer = self.peer_id()
596 );
597 let _e = s.enter();
598 self.update_oplog_and_apply_delta_to_state_if_needed(
599 |oplog| oplog.decode(parsed),
600 origin,
601 )
602 }
603 EncodeMode::OutdatedSnapshot => {
604 if self.can_reset_with_snapshot() {
605 loro_common::info!("Init by snapshot {}", self.peer_id());
606 decode_snapshot(self, parsed.mode, parsed.body, origin)
607 } else {
608 self.update_oplog_and_apply_delta_to_state_if_needed(
609 |oplog| oplog.decode(parsed),
610 origin,
611 )
612 }
613 }
614 EncodeMode::FastSnapshot => {
615 if self.can_reset_with_snapshot() {
616 ensure_cov::notify_cov("loro_internal::import::snapshot");
617 loro_common::info!("Init by fast snapshot {}", self.peer_id());
618 decode_snapshot(self, parsed.mode, parsed.body, origin)
619 } else {
620 self.update_oplog_and_apply_delta_to_state_if_needed(
621 |oplog| oplog.decode(parsed),
622 origin,
623 )
624
625 }
630 }
631 EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
632 |oplog| oplog.decode(parsed),
633 origin,
634 ),
635 EncodeMode::Auto => {
636 unreachable!()
637 }
638 };
639
640 self.emit_events();
641
642 result
643 }
644
645 #[tracing::instrument(skip_all)]
646 pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
647 &self,
648 f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
649 origin: InternalString,
650 ) -> Result<ImportStatus, LoroError> {
651 let mut oplog = self.oplog.lock();
652 if !self.is_detached() {
653 let old_vv = oplog.vv().clone();
654 let old_frontiers = oplog.frontiers().clone();
655 let result = f(&mut oplog);
656 if &old_vv != oplog.vv() {
657 let mut diff = DiffCalculator::new(false);
658 let (diff, diff_mode) = diff.calc_diff_internal(
659 &oplog,
660 &old_vv,
661 &old_frontiers,
662 oplog.vv(),
663 oplog.dag.get_frontiers(),
664 None,
665 );
666 let mut state = self.state.lock();
667 state.apply_diff(
668 InternalDocDiff {
669 origin,
670 diff: (diff).into(),
671 by: EventTriggerKind::Import,
672 new_version: Cow::Owned(oplog.frontiers().clone()),
673 },
674 diff_mode,
675 )?;
676 }
677 result
678 } else {
679 f(&mut oplog)
680 }
681 }
682
683 fn emit_events(&self) {
684 let events = {
686 let mut state = self.state.lock();
687 state.take_events()
688 };
689 for event in events {
690 self.observer.emit(event);
691 }
692 }
693
694 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
695 let mut state = self.state.lock();
696 state.take_events()
697 }
698
699 #[tracing::instrument(skip_all)]
703 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
704 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
705 self.with_barrier(|| {
706 let result = self.update_oplog_and_apply_delta_to_state_if_needed(
707 |oplog| crate::encoding::json_schema::import_json(oplog, json),
708 Default::default(),
709 );
710 self.emit_events();
711 result
712 })
713 }
714
715 pub fn export_json_updates(
716 &self,
717 start_vv: &VersionVector,
718 end_vv: &VersionVector,
719 with_peer_compression: bool,
720 ) -> JsonSchema {
721 self.with_barrier(|| {
722 let oplog = self.oplog.lock();
723 let mut start_vv = start_vv;
724 let _temp: Option<VersionVector>;
725 if !oplog.dag.shallow_since_vv().is_empty() {
726 let mut include_all = true;
728 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
729 if start_vv.get(peer).unwrap_or(&0) < counter {
730 include_all = false;
731 break;
732 }
733 }
734 if !include_all {
735 let mut vv = start_vv.clone();
736 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
737 vv.extend_to_include_end_id(ID::new(peer, counter));
738 }
739 _temp = Some(vv);
740 start_vv = _temp.as_ref().unwrap();
741 }
742 }
743
744 crate::encoding::json_schema::export_json(
745 &oplog,
746 start_vv,
747 end_vv,
748 with_peer_compression,
749 )
750 })
751 }
752
753 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
754 let oplog = self.oplog.lock();
755 let mut changes = export_json_in_id_span(&oplog, id_span);
756 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
757 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
758 changes.push(change_json);
759 }
760 changes
761 }
762
763 #[inline]
765 pub fn oplog_vv(&self) -> VersionVector {
766 self.oplog.lock().vv().clone()
767 }
768
769 #[inline]
771 pub fn state_vv(&self) -> VersionVector {
772 let oplog = self.oplog.lock();
773 let f = &self.state.lock().frontiers;
774 oplog.dag.frontiers_to_vv(f).unwrap()
775 }
776
777 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
778 let value: LoroValue = self.state.lock().get_value_by_path(path)?;
779 if let LoroValue::Container(c) = value {
780 Some(ValueOrHandler::Handler(Handler::new_attached(
781 c.clone(),
782 self.clone(),
783 )))
784 } else {
785 Some(ValueOrHandler::Value(value))
786 }
787 }
788
789 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
791 let path = str_to_path(path)?;
792 self.get_by_path(&path)
793 }
794
795 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
796 let arena = &self.arena;
797 let txn = self.txn.lock();
798 let txn = txn.as_ref()?;
799 let ops_ = txn.local_ops();
800 let new_id = ID {
801 peer: *txn.peer(),
802 counter: ops_.first()?.counter,
803 };
804 let change = ChangeRef {
805 id: &new_id,
806 deps: txn.frontiers(),
807 timestamp: &txn
808 .timestamp()
809 .as_ref()
810 .copied()
811 .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
812 commit_msg: txn.msg(),
813 ops: ops_,
814 lamport: txn.lamport(),
815 };
816 let json = encode_change_to_json(change, arena);
817 Some(json)
818 }
819
820 #[inline]
821 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
822 if self.has_container(&id) {
823 Some(Handler::new_attached(id, self.clone()))
824 } else {
825 None
826 }
827 }
828
829 #[inline]
832 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
833 let id = id.into_container_id(&self.arena, ContainerType::Text);
834 assert!(self.has_container(&id));
835 Handler::new_attached(id, self.clone()).into_text().unwrap()
836 }
837
838 #[inline]
841 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
842 let id = id.into_container_id(&self.arena, ContainerType::List);
843 assert!(self.has_container(&id));
844 Handler::new_attached(id, self.clone()).into_list().unwrap()
845 }
846
847 #[inline]
850 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
851 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
852 assert!(self.has_container(&id));
853 Handler::new_attached(id, self.clone())
854 .into_movable_list()
855 .unwrap()
856 }
857
858 #[inline]
861 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
862 let id = id.into_container_id(&self.arena, ContainerType::Map);
863 assert!(self.has_container(&id));
864 Handler::new_attached(id, self.clone()).into_map().unwrap()
865 }
866
867 #[inline]
870 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
871 let id = id.into_container_id(&self.arena, ContainerType::Tree);
872 assert!(self.has_container(&id));
873 Handler::new_attached(id, self.clone()).into_tree().unwrap()
874 }
875
876 #[cfg(feature = "counter")]
877 pub fn get_counter<I: IntoContainerId>(
878 &self,
879 id: I,
880 ) -> crate::handler::counter::CounterHandler {
881 let id = id.into_container_id(&self.arena, ContainerType::Counter);
882 assert!(self.has_container(&id));
883 Handler::new_attached(id, self.clone())
884 .into_counter()
885 .unwrap()
886 }
887
888 #[must_use]
889 pub fn has_container(&self, id: &ContainerID) -> bool {
890 if id.is_root() {
891 return true;
892 }
893
894 let exist = self.state.lock().does_container_exist(id);
895 exist
896 }
897
898 #[instrument(level = "info", skip_all)]
912 pub fn undo_internal(
913 &self,
914 id_span: IdSpan,
915 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
916 post_transform_base: Option<&DiffBatch>,
917 before_diff: &mut dyn FnMut(&DiffBatch),
918 ) -> LoroResult<CommitWhenDrop<'_>> {
919 if !self.can_edit() {
920 return Err(LoroError::EditWhenDetached);
921 }
922
923 let (options, txn) = self.implicit_commit_then_stop();
924 if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
925 self.renew_txn_if_auto_commit(options);
926 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
927 }
928
929 let (was_recording, latest_frontiers) = {
930 let mut state = self.state.lock();
931 let was_recording = state.is_recording();
932 state.stop_and_clear_recording();
933 (was_recording, state.frontiers.clone())
934 };
935
936 let spans = self.oplog.lock().split_span_based_on_deps(id_span);
937 let diff = crate::undo::undo(
938 spans,
939 match post_transform_base {
940 Some(d) => Either::Right(d),
941 None => Either::Left(&latest_frontiers),
942 },
943 |from, to| {
944 self._checkout_without_emitting(from, false, false).unwrap();
945 self.state.lock().start_recording();
946 self._checkout_without_emitting(to, false, false).unwrap();
947 let mut state = self.state.lock();
948 let e = state.take_events();
949 state.stop_and_clear_recording();
950 DiffBatch::new(e)
951 },
952 before_diff,
953 );
954
955 self._checkout_without_emitting(&latest_frontiers, false, false)?;
959 self.set_detached(false);
960 if was_recording {
961 self.state.lock().start_recording();
962 }
963 drop(txn);
964 self.start_auto_commit();
965 if let Err(e) = self._apply_diff(diff, container_remap, true) {
969 warn!("Undo Failed {:?}", e);
970 }
971
972 if let Some(options) = options {
973 self.set_next_commit_options(options);
974 }
975 Ok(CommitWhenDrop {
976 doc: self,
977 default_options: CommitOptions::new().origin("undo"),
978 })
979 }
980
981 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
987 let f = self.state_frontiers();
990 let diff = self.diff(&f, target)?;
991 self._apply_diff(diff, &mut Default::default(), false)
992 }
993
994 pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
999 {
1000 let oplog = self.oplog.lock();
1003 let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
1004 for id in frontiers.iter() {
1005 if !oplog.dag.contains(id) {
1006 return Err(LoroError::FrontiersNotFound(id));
1007 }
1008 }
1009
1010 if oplog.dag.is_before_shallow_root(frontiers) {
1011 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1012 }
1013
1014 Ok(())
1015 };
1016
1017 validate_frontiers(a)?;
1018 validate_frontiers(b)?;
1019 }
1020
1021 let (options, txn) = self.implicit_commit_then_stop();
1022 let was_detached = self.is_detached();
1023 let old_frontiers = self.state_frontiers();
1024 let was_recording = {
1025 let mut state = self.state.lock();
1026 let is_recording = state.is_recording();
1027 state.stop_and_clear_recording();
1028 is_recording
1029 };
1030 self._checkout_without_emitting(a, true, false).unwrap();
1031 self.state.lock().start_recording();
1032 self._checkout_without_emitting(b, true, false).unwrap();
1033 let e = {
1034 let mut state = self.state.lock();
1035 let e = state.take_events();
1036 state.stop_and_clear_recording();
1037 e
1038 };
1039 self._checkout_without_emitting(&old_frontiers, false, false)
1040 .unwrap();
1041 drop(txn);
1042 if !was_detached {
1043 self.set_detached(false);
1044 self.renew_txn_if_auto_commit(options);
1045 }
1046 if was_recording {
1047 self.state.lock().start_recording();
1048 }
1049 Ok(DiffBatch::new(e))
1050 }
1051
1052 #[inline(always)]
1054 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1055 self._apply_diff(diff, &mut Default::default(), true)
1056 }
1057
1058 pub(crate) fn _apply_diff(
1070 &self,
1071 diff: DiffBatch,
1072 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1073 skip_unreachable: bool,
1074 ) -> LoroResult<()> {
1075 if !self.can_edit() {
1076 return Err(LoroError::EditWhenDetached);
1077 }
1078
1079 let mut ans: LoroResult<()> = Ok(());
1080 let mut missing_containers: Vec<ContainerID> = Vec::new();
1081 for (mut id, diff) in diff.into_iter() {
1082 let mut remapped = false;
1083 while let Some(rid) = container_remap.get(&id) {
1084 remapped = true;
1085 id = rid.clone();
1086 }
1087
1088 if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1089 let exists = self.state.lock().does_container_exist(&id);
1091 if !exists {
1092 missing_containers.push(id);
1093 continue;
1094 }
1095 self.state.lock().ensure_container(&id);
1097 }
1098
1099 if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
1100 continue;
1101 }
1102
1103 let Some(h) = self.get_handler(id.clone()) else {
1104 return Err(LoroError::ContainersNotFound {
1105 containers: Box::new(vec![id]),
1106 });
1107 };
1108 if let Err(e) = h.apply_diff(diff, container_remap) {
1109 ans = Err(e);
1110 }
1111 }
1112
1113 if !missing_containers.is_empty() {
1114 return Err(LoroError::ContainersNotFound {
1115 containers: Box::new(missing_containers),
1116 });
1117 }
1118
1119 ans
1120 }
1121
1122 #[inline]
1124 pub fn diagnose_size(&self) {
1125 self.oplog().lock().diagnose_size();
1126 }
1127
1128 #[inline]
1129 pub fn oplog_frontiers(&self) -> Frontiers {
1130 self.oplog().lock().frontiers().clone()
1131 }
1132
1133 #[inline]
1134 pub fn state_frontiers(&self) -> Frontiers {
1135 self.state.lock().frontiers.clone()
1136 }
1137
1138 #[inline]
1142 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1143 self.oplog().lock().cmp_with_frontiers(other)
1144 }
1145
1146 #[inline]
1150 pub fn cmp_frontiers(
1151 &self,
1152 a: &Frontiers,
1153 b: &Frontiers,
1154 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1155 self.oplog().lock().cmp_frontiers(a, b)
1156 }
1157
1158 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1159 let mut state = self.state.lock();
1160 if !state.is_recording() {
1161 state.start_recording();
1162 }
1163
1164 self.observer.subscribe_root(callback)
1165 }
1166
1167 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1168 let mut state = self.state.lock();
1169 if !state.is_recording() {
1170 state.start_recording();
1171 }
1172
1173 self.observer.subscribe(container_id, callback)
1174 }
1175
1176 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1177 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1178 activate();
1179 sub
1180 }
1181
1182 #[tracing::instrument(skip_all)]
1184 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1185 if bytes.is_empty() {
1186 return Ok(ImportStatus::default());
1187 }
1188
1189 if bytes.len() == 1 {
1190 return self.import(&bytes[0]);
1191 }
1192
1193 let mut success = VersionRange::default();
1194 let mut pending = VersionRange::default();
1195 let mut meta_arr = bytes
1196 .iter()
1197 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1198 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1199 meta_arr.sort_by(|a, b| {
1200 a.0.mode
1201 .cmp(&b.0.mode)
1202 .then(b.0.change_num.cmp(&a.0.change_num))
1203 });
1204
1205 let (options, txn) = self.implicit_commit_then_stop();
1206 let is_detached = self.is_detached();
1230 self.set_detached(true);
1231 self.oplog.lock().batch_importing = true;
1232 let mut err = None;
1233 for (_meta, data) in meta_arr {
1234 match self._import_with(data, Default::default()) {
1235 Ok(s) => {
1236 for (peer, (start, end)) in s.success.iter() {
1237 match success.0.entry(*peer) {
1238 Entry::Occupied(mut e) => {
1239 e.get_mut().1 = *end.max(&e.get().1);
1240 }
1241 Entry::Vacant(e) => {
1242 e.insert((*start, *end));
1243 }
1244 }
1245 }
1246
1247 if let Some(p) = s.pending.as_ref() {
1248 for (&peer, &(start, end)) in p.iter() {
1249 match pending.0.entry(peer) {
1250 Entry::Occupied(mut e) => {
1251 e.get_mut().0 = start.min(e.get().0);
1252 e.get_mut().1 = end.min(e.get().1);
1253 }
1254 Entry::Vacant(e) => {
1255 e.insert((start, end));
1256 }
1257 }
1258 }
1259 }
1260 }
1261 Err(e) => {
1262 err = Some(e);
1263 }
1264 }
1265 }
1266
1267 let mut oplog = self.oplog.lock();
1268 oplog.batch_importing = false;
1269 drop(oplog);
1270 if !is_detached {
1271 self._checkout_to_latest_with_guard(txn);
1272 } else {
1273 drop(txn);
1274 }
1275
1276 self.renew_txn_if_auto_commit(options);
1277 if let Some(err) = err {
1278 return Err(err);
1279 }
1280
1281 Ok(ImportStatus {
1282 success,
1283 pending: if pending.is_empty() {
1284 None
1285 } else {
1286 Some(pending)
1287 },
1288 })
1289 }
1290
1291 #[inline]
1293 pub fn get_value(&self) -> LoroValue {
1294 self.state.lock().get_value()
1295 }
1296
1297 #[inline]
1299 pub fn get_deep_value(&self) -> LoroValue {
1300 self.state.lock().get_deep_value()
1301 }
1302
1303 #[inline]
1305 pub fn get_deep_value_with_id(&self) -> LoroValue {
1306 self.state.lock().get_deep_value_with_id()
1307 }
1308
1309 pub fn checkout_to_latest(&self) {
1310 let (options, _guard) = self.implicit_commit_then_stop();
1311 if !self.is_detached() {
1312 drop(_guard);
1313 self.renew_txn_if_auto_commit(options);
1314 return;
1315 }
1316
1317 self._checkout_to_latest_without_commit(true);
1318 drop(_guard);
1319 self.renew_txn_if_auto_commit(options);
1320 }
1321
1322 fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1323 if !self.is_detached() {
1324 self._renew_txn_if_auto_commit_with_guard(None, guard);
1325 return;
1326 }
1327
1328 self._checkout_to_latest_without_commit(true);
1329 self._renew_txn_if_auto_commit_with_guard(None, guard);
1330 }
1331
1332 pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1334 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1335 let f = self.oplog_frontiers();
1336 let this = &self;
1337 let frontiers = &f;
1338 this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1339 .unwrap(); this.emit_events();
1342 if this.config.detached_editing() {
1343 this.renew_peer_id();
1344 }
1345
1346 self.set_detached(false);
1347 });
1348 }
1349
1350 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1355 let was_detached = self.is_detached();
1356 let (options, guard) = self.implicit_commit_then_stop();
1357 let result = self._checkout_without_emitting(frontiers, true, true);
1358 if result.is_ok() {
1359 self.emit_events();
1360 }
1361 drop(guard);
1362 if self.config.detached_editing() {
1363 if result.is_ok() {
1364 self.renew_peer_id();
1365 }
1366 self.renew_txn_if_auto_commit(options);
1367 } else if result.is_err() {
1368 if !was_detached {
1369 self.renew_txn_if_auto_commit(options);
1370 }
1371 } else if !self.is_detached() {
1372 self.renew_txn_if_auto_commit(options);
1373 }
1374
1375 result
1376 }
1377
1378 #[instrument(level = "info", skip(self))]
1380 pub(crate) fn _checkout_without_emitting(
1381 &self,
1382 frontiers: &Frontiers,
1383 to_shrink_frontiers: bool,
1384 to_commit_then_renew: bool,
1385 ) -> Result<(), LoroError> {
1386 if !self.txn.is_locked() {
1387 return Err(LoroError::TransactionError(
1388 "checkout requires the transaction mutex to be held"
1389 .to_string()
1390 .into_boxed_str(),
1391 ));
1392 }
1393 let from_frontiers = self.state_frontiers();
1394 loro_common::info!(
1395 "checkout from={:?} to={:?} cur_vv={:?}",
1396 from_frontiers,
1397 frontiers,
1398 self.oplog_vv()
1399 );
1400
1401 if &from_frontiers == frontiers {
1402 return Ok(());
1403 }
1404
1405 let oplog = self.oplog.lock();
1406 if oplog.dag.is_before_shallow_root(frontiers) {
1407 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1408 }
1409
1410 let frontiers = if to_shrink_frontiers {
1411 shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1412 } else {
1413 frontiers.clone()
1414 };
1415
1416 if from_frontiers == frontiers {
1417 return Ok(());
1418 }
1419
1420 let mut state = self.state.lock();
1421 let mut calc = self.diff_calculator.lock();
1422 for i in frontiers.iter() {
1423 if !oplog.dag.contains(i) {
1424 return Err(LoroError::FrontiersNotFound(i));
1425 }
1426 }
1427
1428 let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
1429 LoroError::NotFoundError(
1430 format!(
1431 "Cannot find the current state version {:?}",
1432 state.frontiers
1433 )
1434 .into_boxed_str(),
1435 )
1436 })?;
1437 let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1438 return Err(LoroError::NotFoundError(
1439 format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1440 ));
1441 };
1442
1443 self.set_detached(true);
1444 let (diff, diff_mode) =
1445 calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
1446 state.apply_diff(
1447 InternalDocDiff {
1448 origin: "checkout".into(),
1449 diff: Cow::Owned(diff),
1450 by: EventTriggerKind::Checkout,
1451 new_version: Cow::Owned(frontiers.clone()),
1452 },
1453 diff_mode,
1454 )?;
1455
1456 Ok(())
1457 }
1458
1459 #[inline]
1460 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1461 self.oplog.lock().dag.vv_to_frontiers(vv)
1462 }
1463
1464 #[inline]
1465 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1466 self.oplog.lock().dag.frontiers_to_vv(frontiers)
1467 }
1468
1469 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1473 let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1474 self.import(&updates)
1475 }
1476
1477 pub(crate) fn arena(&self) -> &SharedArena {
1478 &self.arena
1479 }
1480
1481 #[inline]
1482 pub fn len_ops(&self) -> usize {
1483 let oplog = self.oplog.lock();
1484 let ans = oplog.vv().values().sum::<i32>() as usize;
1485 if oplog.is_shallow() {
1486 let sub = oplog
1487 .shallow_since_vv()
1488 .iter()
1489 .map(|(_, ops)| *ops)
1490 .sum::<i32>() as usize;
1491 ans - sub
1492 } else {
1493 ans
1494 }
1495 }
1496
1497 #[inline]
1498 pub fn len_changes(&self) -> usize {
1499 let oplog = self.oplog.lock();
1500 oplog.len_changes()
1501 }
1502
1503 pub fn config(&self) -> &Configure {
1504 &self.config
1505 }
1506
1507 pub fn check_state_diff_calc_consistency_slow(&self) {
1512 {
1514 static IS_CHECKING: std::sync::atomic::AtomicBool =
1515 std::sync::atomic::AtomicBool::new(false);
1516 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1517 return;
1518 }
1519
1520 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1521 let peer_id = self.peer_id();
1522 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1523 let _g = s.enter();
1524 let options = self.implicit_commit_then_stop().0;
1525 self.oplog.lock().check_dag_correctness();
1526 if self.is_shallow() {
1527 let initial_snapshot = self
1538 .export(ExportMode::state_only(Some(
1539 &self.shallow_since_frontiers(),
1540 )))
1541 .unwrap();
1542
1543 let doc = LoroDoc::new();
1545 doc.import(&initial_snapshot).unwrap();
1546 self.checkout(&self.shallow_since_frontiers()).unwrap();
1547 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1548
1549 let updates = self.export(ExportMode::all_updates()).unwrap();
1551
1552 doc.import(&updates).unwrap();
1554 self.checkout_to_latest();
1555
1556 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1559 let mut calculated_state = doc.app_state().lock();
1560 let mut current_state = self.app_state().lock();
1561 current_state.check_is_the_same(&mut calculated_state);
1562 } else {
1563 let f = self.state_frontiers();
1564 let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
1565 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1566 let doc = Self::new();
1567 doc.import(&bytes).unwrap();
1568 let mut calculated_state = doc.app_state().lock();
1569 let mut current_state = self.app_state().lock();
1570 current_state.check_is_the_same(&mut calculated_state);
1571 }
1572
1573 self.renew_txn_if_auto_commit(options);
1574 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1575 }
1576 }
1577
1578 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1579 self.query_pos_internal(pos, true)
1580 }
1581
1582 pub(crate) fn query_pos_internal(
1584 &self,
1585 pos: &Cursor,
1586 ret_event_index: bool,
1587 ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1588 if !self.has_container(&pos.container) {
1589 return Err(CannotFindRelativePosition::IdNotFound);
1590 }
1591
1592 let mut state = self.state.lock();
1593 if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1594 Ok(PosQueryResult {
1595 update: None,
1596 current: AbsolutePosition {
1597 pos: ans,
1598 side: pos.side,
1599 },
1600 })
1601 } else {
1602 drop(state);
1614 let result = self.with_barrier(|| {
1615 let oplog = self.oplog().lock();
1616 if let Some(id) = pos.id {
1618 if oplog.arena.id_to_idx(&pos.container).is_none() {
1620 let mut s = self.state.lock();
1621 if !s.does_container_exist(&pos.container) {
1622 return Err(CannotFindRelativePosition::ContainerDeleted);
1623 }
1624 s.ensure_container(&pos.container);
1625 drop(s);
1626 }
1627 let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1628 let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1630 if oplog.shallow_since_vv().includes_id(id) {
1631 return Err(CannotFindRelativePosition::HistoryCleared);
1632 }
1633
1634 tracing::error!("Cannot find id {}", id);
1635 return Err(CannotFindRelativePosition::IdNotFound);
1636 };
1637 let mut diff_calc = DiffCalculator::new(true);
1639 let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1640 let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1641 diff_calc.calc_diff_internal(
1643 &oplog,
1644 before,
1645 &before_frontiers,
1646 oplog.vv(),
1647 oplog.frontiers(),
1648 Some(&|target| idx == target),
1649 );
1650 let depth = self.arena.get_depth(idx);
1652 let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1653 match diff_calc {
1654 crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1655 let c = text.get_id_latest_pos(id).unwrap();
1656 let new_pos = c.pos;
1657 let handler = self.get_text(&pos.container);
1658 let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1659 Ok(PosQueryResult {
1660 update: handler.get_cursor(current_pos, c.side),
1661 current: AbsolutePosition {
1662 pos: current_pos,
1663 side: c.side,
1664 },
1665 })
1666 }
1667 crate::diff_calc::ContainerDiffCalculator::List(list) => {
1668 let c = list.get_id_latest_pos(id).unwrap();
1669 let new_pos = c.pos;
1670 let handler = self.get_list(&pos.container);
1671 Ok(PosQueryResult {
1672 update: handler.get_cursor(new_pos, c.side),
1673 current: AbsolutePosition {
1674 pos: new_pos,
1675 side: c.side,
1676 },
1677 })
1678 }
1679 crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1680 let c = list.get_id_latest_pos(id).unwrap();
1681 let new_pos = c.pos;
1682 let handler = self.get_movable_list(&pos.container);
1683 let new_pos = handler.op_pos_to_user_pos(new_pos);
1684 Ok(PosQueryResult {
1685 update: handler.get_cursor(new_pos, c.side),
1686 current: AbsolutePosition {
1687 pos: new_pos,
1688 side: c.side,
1689 },
1690 })
1691 }
1692 crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1693 crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1694 #[cfg(feature = "counter")]
1695 crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1696 crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1697 }
1698 } else {
1699 match pos.container.container_type() {
1700 ContainerType::Text => {
1701 let text = self.get_text(&pos.container);
1702 Ok(PosQueryResult {
1703 update: Some(Cursor {
1704 id: None,
1705 container: text.id(),
1706 side: pos.side,
1707 origin_pos: text.len_unicode(),
1708 }),
1709 current: AbsolutePosition {
1710 pos: text.len_event(),
1711 side: pos.side,
1712 },
1713 })
1714 }
1715 ContainerType::List => {
1716 let list = self.get_list(&pos.container);
1717 Ok(PosQueryResult {
1718 update: Some(Cursor {
1719 id: None,
1720 container: list.id(),
1721 side: pos.side,
1722 origin_pos: list.len(),
1723 }),
1724 current: AbsolutePosition {
1725 pos: list.len(),
1726 side: pos.side,
1727 },
1728 })
1729 }
1730 ContainerType::MovableList => {
1731 let list = self.get_movable_list(&pos.container);
1732 Ok(PosQueryResult {
1733 update: Some(Cursor {
1734 id: None,
1735 container: list.id(),
1736 side: pos.side,
1737 origin_pos: list.len(),
1738 }),
1739 current: AbsolutePosition {
1740 pos: list.len(),
1741 side: pos.side,
1742 },
1743 })
1744 }
1745 ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1746 unreachable!()
1747 }
1748 #[cfg(feature = "counter")]
1749 ContainerType::Counter => unreachable!(),
1750 }
1751 }
1752 });
1753 result
1754 }
1755 }
1756
1757 pub fn free_history_cache(&self) {
1762 self.oplog.lock().free_history_cache();
1763 }
1764
1765 pub fn free_diff_calculator(&self) {
1767 *self.diff_calculator.lock() = DiffCalculator::new(true);
1768 }
1769
1770 pub fn has_history_cache(&self) -> bool {
1773 self.oplog.lock().has_history_cache()
1774 }
1775
1776 #[inline]
1780 pub fn compact_change_store(&self) {
1781 self.with_barrier(|| {
1782 self.oplog.lock().compact_change_store();
1783 });
1784 }
1785
1786 #[inline]
1790 pub fn analyze(&self) -> DocAnalysis {
1791 DocAnalysis::analyze(self)
1792 }
1793
1794 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1796 let mut state = self.state.lock();
1797 if state.arena.id_to_idx(id).is_none() {
1798 if !state.does_container_exist(id) {
1799 return None;
1800 }
1801 state.ensure_container(id);
1802 }
1803 let idx = state.arena.id_to_idx(id).unwrap();
1804 state.get_path(idx)
1805 }
1806
1807 #[instrument(skip(self))]
1808 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1809 self.with_barrier(|| {
1810 let ans = match mode {
1811 ExportMode::Snapshot => export_fast_snapshot(self),
1812 ExportMode::Updates { from } => export_fast_updates(self, &from),
1813 ExportMode::UpdatesInRange { spans } => {
1814 export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
1815 }
1816 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1817 ExportMode::StateOnly(f) => match f {
1818 Some(f) => export_state_only_snapshot(self, &f)?,
1819 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1820 },
1821 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1822 };
1823 Ok(ans)
1824 })
1825 }
1826
1827 pub fn shallow_since_vv(&self) -> ImVersionVector {
1833 self.oplog().lock().shallow_since_vv().clone()
1834 }
1835
1836 pub fn shallow_since_frontiers(&self) -> Frontiers {
1837 self.oplog().lock().shallow_since_frontiers().clone()
1838 }
1839
1840 pub fn is_shallow(&self) -> bool {
1842 !self.oplog().lock().shallow_since_vv().is_empty()
1843 }
1844
1845 pub fn get_pending_txn_len(&self) -> usize {
1850 if let Some(txn) = self.txn.lock().as_ref() {
1851 txn.len()
1852 } else {
1853 0
1854 }
1855 }
1856
1857 #[inline]
1858 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1859 self.oplog().lock().dag.find_path(from, to)
1860 }
1861
1862 pub fn subscribe_first_commit_from_peer(
1868 &self,
1869 callback: FirstCommitFromPeerCallback,
1870 ) -> Subscription {
1871 let (s, enable) = self
1872 .first_commit_from_peer_subs
1873 .inner()
1874 .insert((), callback);
1875 enable();
1876 s
1877 }
1878
1879 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1884 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1885 enable();
1886 s
1887 }
1888}
1889
1890#[derive(Debug, thiserror::Error)]
1891pub enum ChangeTravelError {
1892 #[error("Target id not found {0:?}")]
1893 TargetIdNotFound(ID),
1894 #[error("The shallow history of the doc doesn't include the target version")]
1895 TargetVersionNotIncluded,
1896}
1897
1898impl LoroDoc {
1899 pub fn travel_change_ancestors(
1900 &self,
1901 ids: &[ID],
1902 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1903 ) -> Result<(), ChangeTravelError> {
1904 let (options, guard) = self.implicit_commit_then_stop();
1905 drop(guard);
1906 struct PendingNode(ChangeMeta);
1907 impl PartialEq for PendingNode {
1908 fn eq(&self, other: &Self) -> bool {
1909 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1910 }
1911 }
1912
1913 impl Eq for PendingNode {}
1914 impl PartialOrd for PendingNode {
1915 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1916 Some(self.cmp(other))
1917 }
1918 }
1919
1920 impl Ord for PendingNode {
1921 fn cmp(&self, other: &Self) -> Ordering {
1922 self.0
1923 .lamport_last()
1924 .cmp(&other.0.lamport_last())
1925 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1926 }
1927 }
1928
1929 for id in ids {
1930 let op_log = &self.oplog().lock();
1931 if !op_log.vv().includes_id(*id) {
1932 return Err(ChangeTravelError::TargetIdNotFound(*id));
1933 }
1934 if op_log.dag.shallow_since_vv().includes_id(*id) {
1935 return Err(ChangeTravelError::TargetVersionNotIncluded);
1936 }
1937 }
1938
1939 let mut visited = FxHashSet::default();
1940 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1941 for id in ids {
1942 pending.push(PendingNode(ChangeMeta::from_change(
1943 &self.oplog().lock().get_change_at(*id).unwrap(),
1944 )));
1945 }
1946 while let Some(PendingNode(node)) = pending.pop() {
1947 let deps = node.deps.clone();
1948 if f(node).is_break() {
1949 break;
1950 }
1951
1952 for dep in deps.iter() {
1953 let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
1954 continue;
1955 };
1956 if visited.contains(&dep_node.id) {
1957 continue;
1958 }
1959
1960 visited.insert(dep_node.id);
1961 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1962 }
1963 }
1964
1965 let ans = Ok(());
1966 self.renew_txn_if_auto_commit(options);
1967 ans
1968 }
1969
1970 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1971 self.with_barrier(|| {
1972 let mut set = FxHashSet::default();
1973 {
1974 let oplog = self.oplog().lock();
1975 for op in oplog.iter_ops(id.to_span(len)) {
1976 let id = oplog.arena.get_container_id(op.container()).unwrap();
1977 set.insert(id);
1978 }
1979 }
1980 set
1981 })
1982 }
1983
1984 pub fn delete_root_container(&self, cid: ContainerID) {
1985 if !cid.is_root() {
1986 return;
1987 }
1988
1989 if !self.has_container(&cid) {
1991 return;
1992 }
1993
1994 let Some(h) = self.get_handler(cid.clone()) else {
1995 return;
1996 };
1997
1998 if let Err(e) = h.clear() {
1999 eprintln!("Failed to clear handler: {:?}", e);
2000 return;
2001 }
2002 self.config.deleted_root_containers.lock().insert(cid);
2003 }
2004
2005 pub fn set_hide_empty_root_containers(&self, hide: bool) {
2006 self.config
2007 .hide_empty_root_containers
2008 .store(hide, std::sync::atomic::Ordering::Relaxed);
2009 }
2010}
2011
2012fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2014 let start_vv = oplog
2015 .dag
2016 .frontiers_to_vv(&id.into())
2017 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2018 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
2019 for op in change.ops.iter().rev() {
2020 if op.container != idx {
2021 continue;
2022 }
2023 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2024 if d.id_start.to_span(d.atom_len()).contains(id) {
2025 return Some(ID::new(change.peer(), op.counter));
2026 }
2027 }
2028 }
2029 }
2030
2031 None
2032}
2033
2034#[derive(Debug)]
2035pub struct CommitWhenDrop<'a> {
2036 doc: &'a LoroDoc,
2037 default_options: CommitOptions,
2038}
2039
2040impl Drop for CommitWhenDrop<'_> {
2041 fn drop(&mut self) {
2042 {
2043 let mut guard = self.doc.txn.lock();
2044 if let Some(txn) = guard.as_mut() {
2045 txn.set_default_options(std::mem::take(&mut self.default_options));
2046 };
2047 }
2048
2049 self.doc.commit_then_renew();
2050 }
2051}
2052
2053#[derive(Debug, Clone)]
2055pub struct CommitOptions {
2056 pub origin: Option<InternalString>,
2059
2060 pub immediate_renew: bool,
2063
2064 pub timestamp: Option<Timestamp>,
2067
2068 pub commit_msg: Option<Arc<str>>,
2070}
2071
2072impl CommitOptions {
2073 pub fn new() -> Self {
2075 Self {
2076 origin: None,
2077 immediate_renew: true,
2078 timestamp: None,
2079 commit_msg: None,
2080 }
2081 }
2082
2083 pub fn origin(mut self, origin: &str) -> Self {
2085 self.origin = Some(origin.into());
2086 self
2087 }
2088
2089 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2091 self.immediate_renew = immediate_renew;
2092 self
2093 }
2094
2095 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2099 self.timestamp = Some(timestamp);
2100 self
2101 }
2102
2103 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2105 self.commit_msg = Some(commit_msg.into());
2106 self
2107 }
2108
2109 pub fn set_origin(&mut self, origin: Option<&str>) {
2111 self.origin = origin.map(|x| x.into())
2112 }
2113
2114 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2116 self.timestamp = timestamp;
2117 }
2118}
2119
2120impl Default for CommitOptions {
2121 fn default() -> Self {
2122 Self::new()
2123 }
2124}
2125
2126#[cfg(test)]
2127mod test {
2128 use std::panic::AssertUnwindSafe;
2129
2130 use crate::{cursor::PosType, loro::ExportMode, version::Frontiers, LoroDoc, ToJson};
2131 use loro_common::ID;
2132
2133 #[test]
2134 fn test_sync() {
2135 fn is_send_sync<T: Send + Sync>(_v: T) {}
2136 let loro = super::LoroDoc::new();
2137 is_send_sync(loro)
2138 }
2139
2140 #[test]
2141 fn test_checkout() {
2142 let loro = LoroDoc::new();
2143 loro.set_peer_id(1).unwrap();
2144 let text = loro.get_text("text");
2145 let map = loro.get_map("map");
2146 let list = loro.get_list("list");
2147 let mut txn = loro.txn().unwrap();
2148 for i in 0..10 {
2149 map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2150 text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
2151 .unwrap();
2152 list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2153 }
2154 txn.commit().unwrap();
2155 let b = LoroDoc::new();
2156 b.import(&loro.export(ExportMode::Snapshot).unwrap())
2157 .unwrap();
2158 loro.checkout(&Frontiers::default()).unwrap();
2159 {
2160 let json = &loro.get_deep_value();
2161 assert_eq!(
2162 json.to_json_value(),
2163 serde_json::json!({"text":"","list":[],"map":{}})
2164 );
2165 }
2166
2167 b.checkout(&ID::new(1, 2).into()).unwrap();
2168 {
2169 let json = &b.get_deep_value();
2170 assert_eq!(
2171 json.to_json_value(),
2172 serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
2173 );
2174 }
2175
2176 loro.checkout(&ID::new(1, 3).into()).unwrap();
2177 {
2178 let json = &loro.get_deep_value();
2179 assert_eq!(
2180 json.to_json_value(),
2181 serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
2182 );
2183 }
2184
2185 b.checkout(&ID::new(1, 29).into()).unwrap();
2186 {
2187 let json = &b.get_deep_value();
2188 assert_eq!(
2189 json.to_json_value(),
2190 serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
2191 );
2192 }
2193 }
2194
2195 #[test]
2196 fn import_batch_err_181() {
2197 let a = LoroDoc::new_auto_commit();
2198 let update_a = a.export(ExportMode::Snapshot);
2199 let b = LoroDoc::new_auto_commit();
2200 b.import_batch(&[update_a.unwrap()]).unwrap();
2201 b.get_text("text")
2202 .insert(0, "hello", PosType::Unicode)
2203 .unwrap();
2204 b.commit_then_renew();
2205 let oplog = b.oplog().lock();
2206 drop(oplog);
2207 b.export(ExportMode::all_updates()).unwrap();
2208 }
2209
2210 #[test]
2211 fn poisoned_mutex_keeps_follow_up_operations_failed() {
2212 let doc = LoroDoc::new();
2213 let oplog = doc.oplog.clone();
2214 let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
2215 let _guard = oplog.lock();
2216 panic!("poison oplog");
2217 }));
2218
2219 let err = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
2220 .expect_err("poisoned lock should continue to fail fast");
2221 let msg = if let Some(msg) = err.downcast_ref::<&str>() {
2222 (*msg).to_string()
2223 } else if let Some(msg) = err.downcast_ref::<String>() {
2224 msg.clone()
2225 } else {
2226 String::new()
2227 };
2228 assert!(msg.contains("poisoned LoroMutex"), "{msg}");
2229 }
2230}