1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::Timestamp,
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48 LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 #[inline]
89 pub fn with_barrier<F, R>(&self, f: F) -> R
90 where
91 F: FnOnce() -> R,
92 {
93 let (options, guard) = self.implicit_commit_then_stop();
94 let result = f();
95 drop(guard);
96 self.renew_txn_if_auto_commit(options);
97 result
98 }
99
100 pub fn new() -> Self {
101 let oplog = OpLog::new();
102 let arena = oplog.arena.clone();
103 let config: Configure = oplog.configure.clone();
104 let lock_group = LoroLockGroup::new();
105 let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
106 let inner = Arc::new_cyclic(|w| {
107 let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
108 LoroDocInner {
109 oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
110 state,
111 config,
112 detached: AtomicBool::new(false),
113 auto_commit: AtomicBool::new(false),
114 observer: Arc::new(Observer::new(arena.clone())),
115 diff_calculator: Arc::new(
116 lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
117 ),
118 txn: global_txn,
119 arena,
120 local_update_subs: SubscriberSetWithQueue::new(),
121 peer_id_change_subs: SubscriberSetWithQueue::new(),
122 pre_commit_subs: SubscriberSetWithQueue::new(),
123 first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
124 }
125 });
126 LoroDoc { inner }
127 }
128
129 pub fn fork(&self) -> Self {
130 if self.is_detached() {
131 return self.fork_at(&self.state_frontiers());
132 }
133
134 let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
135 let doc = Self::new();
136 doc.with_barrier(|| {
137 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
138 })
139 .unwrap();
140 doc.set_config(&self.config);
141 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
142 doc.start_auto_commit();
143 }
144 doc
145 }
146 pub fn set_detached_editing(&self, enable: bool) {
162 self.config.set_detached_editing(enable);
163 if enable && self.is_detached() {
164 self.with_barrier(|| {
165 self.renew_peer_id();
166 });
167 }
168 }
169
170 #[inline]
172 pub fn new_auto_commit() -> Self {
173 let doc = Self::new();
174 doc.start_auto_commit();
175 doc
176 }
177
178 #[inline(always)]
179 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
180 if peer == PeerID::MAX {
181 return Err(LoroError::InvalidPeerID);
182 }
183 let next_id = self.oplog.lock().unwrap().next_id(peer);
184 if self.auto_commit.load(Acquire) {
185 let doc_state = self.state.lock().unwrap();
186 doc_state
187 .peer
188 .store(peer, std::sync::atomic::Ordering::Relaxed);
189
190 if doc_state.is_in_txn() {
191 drop(doc_state);
192 self.with_barrier(|| {});
194 }
195 self.peer_id_change_subs.emit(&(), next_id);
196 return Ok(());
197 }
198
199 let doc_state = self.state.lock().unwrap();
200 if doc_state.is_in_txn() {
201 return Err(LoroError::TransactionError(
202 "Cannot change peer id during transaction"
203 .to_string()
204 .into_boxed_str(),
205 ));
206 }
207
208 doc_state
209 .peer
210 .store(peer, std::sync::atomic::Ordering::Relaxed);
211 drop(doc_state);
212 self.peer_id_change_subs.emit(&(), next_id);
213 Ok(())
214 }
215
216 pub(crate) fn renew_peer_id(&self) {
218 let peer_id = DefaultRandom.next_u64();
219 self.set_peer_id(peer_id).unwrap();
220 }
221
222 #[inline]
234 #[must_use]
235 pub fn implicit_commit_then_stop(
236 &self,
237 ) -> (
238 Option<CommitOptions>,
239 LoroMutexGuard<'_, Option<Transaction>>,
240 ) {
241 let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
243 (a, b.unwrap())
244 }
245
246 #[inline]
251 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
252 self.commit_internal(CommitOptions::new().immediate_renew(true), false)
254 .0
255 }
256
257 fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
262 let mut txn_guard = self.txn.lock().unwrap();
263 let Some(txn) = txn_guard.as_mut() else {
264 return Some(txn_guard);
265 };
266
267 if txn.is_peer_first_appearance {
268 txn.is_peer_first_appearance = false;
269 drop(txn_guard);
270 self.first_commit_from_peer_subs.emit(
272 &(),
273 FirstCommitFromPeerPayload {
274 peer: self.peer_id(),
275 },
276 );
277 }
278
279 None
280 }
281
    /// Commits the pending auto-commit transaction, if there is one.
    ///
    /// Returns `(options, guard)`:
    /// - `options` is what `txn.commit()` handed back after the filtering
    ///   below; it is `None` on every early return.
    /// - `guard` is the lock on `self.txn`. Every early return yields
    ///   `Some(guard)`; the final return yields `Some(guard)` only when
    ///   `config.immediate_renew` is `false`.
    ///
    /// `config.origin`, `config.timestamp` and `config.commit_msg` are applied
    /// to the transaction before it is committed when they are set.
    ///
    /// # Panics
    /// Panics if `txn.commit()` fails, or if `config.immediate_renew` is set
    /// while `can_edit()` is `false`.
    #[instrument(skip_all)]
    fn commit_internal(
        &self,
        config: CommitOptions,
        preserve_on_empty: bool,
    ) -> (
        Option<CommitOptions>,
        Option<LoroMutexGuard<'_, Option<Transaction>>>,
    ) {
        // Without auto-commit there is nothing to commit here; just hand the
        // transaction lock to the caller.
        if !self.auto_commit.load(Acquire) {
            let txn_guard = self.txn.lock().unwrap();
            return (None, Some(txn_guard));
        }

        loop {
            // `before_commit` returns a guard only when no transaction exists.
            if let Some(txn_guard) = self.before_commit() {
                return (None, Some(txn_guard));
            }

            // `before_commit` released the lock, so re-check for a transaction.
            let mut txn_guard = self.txn.lock().unwrap();
            let txn = txn_guard.take();
            let Some(mut txn) = txn else {
                return (None, Some(txn_guard));
            };
            let on_commit = txn.take_on_commit();
            if let Some(origin) = config.origin.clone() {
                txn.set_origin(origin);
            }

            if let Some(timestamp) = config.timestamp {
                txn.set_timestamp(timestamp);
            }

            if let Some(msg) = config.commit_msg.as_ref() {
                txn.set_msg(Some(msg.clone()));
            }

            let id_span = txn.id_span();
            let mut options = txn.commit().unwrap();
            if let Some(opts) = options.as_mut() {
                // An origin supplied explicitly for this commit is not carried
                // over in the returned options.
                if config.origin.is_some() {
                    opts.set_origin(None);
                }
                // Returned options are kept only when the caller asked for it.
                if !preserve_on_empty {
                    options = None;
                }
            }
            if config.immediate_renew {
                assert!(self.can_edit());
                let mut t = self.txn().unwrap();
                if let Some(options) = options.as_ref() {
                    t.set_options(options.clone());
                }
                *txn_guard = Some(t);
            }

            if let Some(on_commit) = on_commit {
                // The callback runs without the transaction lock held.
                drop(txn_guard);
                on_commit(&self.state, &self.oplog, id_span);
                txn_guard = self.txn.lock().unwrap();
                // A transaction appeared while the lock was released; commit
                // it too so a "stop" request really ends with no transaction.
                if !config.immediate_renew && txn_guard.is_some() {
                    continue;
                }
            }

            return (
                options,
                if !config.immediate_renew {
                    Some(txn_guard)
                } else {
                    None
                },
            );
        }
    }
370
    /// Commits the pending transaction using the caller-supplied `config`.
    ///
    /// Delegates to `commit_internal` with `preserve_on_empty = false` and
    /// returns its `(options, guard)` pair unchanged.
    #[instrument(skip_all)]
    pub fn commit_with(
        &self,
        config: CommitOptions,
    ) -> (
        Option<CommitOptions>,
        Option<LoroMutexGuard<'_, Option<Transaction>>>,
    ) {
        self.commit_internal(config, false)
    }
385
386 pub fn set_next_commit_message(&self, message: &str) {
388 let mut binding = self.txn.lock().unwrap();
389 let Some(txn) = binding.as_mut() else {
390 return;
391 };
392
393 if message.is_empty() {
394 txn.set_msg(None)
395 } else {
396 txn.set_msg(Some(message.into()))
397 }
398 }
399
400 pub fn set_next_commit_origin(&self, origin: &str) {
402 let mut txn = self.txn.lock().unwrap();
403 if let Some(txn) = txn.as_mut() {
404 txn.set_origin(origin.into());
405 }
406 }
407
408 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
410 let mut txn = self.txn.lock().unwrap();
411 if let Some(txn) = txn.as_mut() {
412 txn.set_timestamp(timestamp);
413 }
414 }
415
416 pub fn set_next_commit_options(&self, options: CommitOptions) {
418 let mut txn = self.txn.lock().unwrap();
419 if let Some(txn) = txn.as_mut() {
420 txn.set_options(options);
421 }
422 }
423
424 pub fn clear_next_commit_options(&self) {
426 let mut txn = self.txn.lock().unwrap();
427 if let Some(txn) = txn.as_mut() {
428 txn.set_options(CommitOptions::new());
429 }
430 }
431
    /// Forwards `record` to the configuration's `set_record_timestamp`.
    ///
    /// NOTE(review): presumably toggles whether new changes record a
    /// timestamp — confirm against `Configure`.
    #[inline]
    pub fn set_record_timestamp(&self, record: bool) {
        self.config.set_record_timestamp(record);
    }
446
    /// Forwards `interval` to the configuration's `set_merge_interval`.
    ///
    /// NOTE(review): the unit of `interval` is not visible here — confirm
    /// against `Configure`.
    #[inline]
    pub fn set_change_merge_interval(&self, interval: i64) {
        self.config.set_merge_interval(interval);
    }
455
456 pub fn can_edit(&self) -> bool {
457 !self.is_detached() || self.config.detached_editing()
458 }
459
    /// Returns the configuration's detached-editing flag.
    pub fn is_detached_editing_enabled(&self) -> bool {
        self.config.detached_editing()
    }
463
464 #[inline]
465 pub fn config_text_style(&self, text_style: StyleConfigMap) {
466 self.config.text_style_config.try_write().unwrap().map = text_style.map;
467 }
468
469 #[inline]
470 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
471 self.config
472 .text_style_config
473 .try_write()
474 .unwrap()
475 .default_style = text_style;
476 }
477 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
478 let doc = Self::new();
479 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
480 if mode.is_snapshot() {
481 doc.with_barrier(|| -> Result<(), LoroError> {
482 decode_snapshot(&doc, mode, body, Default::default())?;
483 Ok(())
484 })?;
485 Ok(doc)
486 } else {
487 Err(LoroError::DecodeError(
488 "Invalid encode mode".to_string().into(),
489 ))
490 }
491 }
492
493 #[inline(always)]
495 pub fn can_reset_with_snapshot(&self) -> bool {
496 let oplog = self.oplog.lock().unwrap();
497 if oplog.batch_importing {
498 return false;
499 }
500
501 if self.is_detached() {
502 return false;
503 }
504
505 oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
506 }
507
    /// Returns the `detached` flag (loaded with `Acquire` ordering).
    #[inline(always)]
    pub fn is_detached(&self) -> bool {
        self.detached.load(Acquire)
    }
517
    /// Stores `detached` into the `detached` flag (with `Release` ordering).
    /// Only the flag is written; no checkout or commit is performed here.
    pub(crate) fn set_detached(&self, detached: bool) {
        self.detached.store(detached, Release);
    }
521
522 #[inline(always)]
523 pub fn peer_id(&self) -> PeerID {
524 self.state
525 .lock()
526 .unwrap()
527 .peer
528 .load(std::sync::atomic::Ordering::Relaxed)
529 }
530
    /// Marks the document as detached. The flag is set inside a barrier, so
    /// the pending transaction is committed first.
    #[inline(always)]
    pub fn detach(&self) {
        self.with_barrier(|| self.set_detached(true));
    }
535
    /// Re-attaches the document; an alias for `checkout_to_latest`.
    #[inline(always)]
    pub fn attach(&self) {
        self.checkout_to_latest()
    }
540
541 pub fn state_timestamp(&self) -> Timestamp {
544 let f = { self.state.lock().unwrap().frontiers.clone() };
546 self.oplog.lock().unwrap().get_timestamp_of_version(&f)
547 }
548
    /// Returns the shared, mutex-protected document state.
    #[inline(always)]
    pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
        &self.state
    }
553
    /// Returns the state's deep value (`DocState::get_deep_value`), taking the
    /// state lock for the duration of the call.
    #[inline]
    pub fn get_state_deep_value(&self) -> LoroValue {
        self.state.lock().unwrap().get_deep_value()
    }
558
    /// Returns the shared, mutex-protected oplog.
    #[inline(always)]
    pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
        &self.oplog
    }
563
564 #[inline(always)]
565 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
566 let s = debug_span!("import", peer = self.peer_id());
567 let _e = s.enter();
568 self.import_with(bytes, Default::default())
569 }
570
    /// Imports an encoded blob, tagging the resulting events with `origin`.
    ///
    /// The import itself (`_import_with`) runs inside a barrier, so the
    /// pending transaction is committed before it starts.
    #[inline]
    pub fn import_with(
        &self,
        bytes: &[u8],
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        self.with_barrier(|| self._import_with(bytes, origin))
    }
579
    /// Decodes `bytes` and applies it according to the blob's encode mode,
    /// then emits the events recorded in the state.
    ///
    /// Snapshot modes initialise the document directly through
    /// `decode_snapshot` when `can_reset_with_snapshot()` allows it; in every
    /// other case the blob is decoded into the oplog and the state is updated
    /// by `update_oplog_and_apply_delta_to_state_if_needed`.
    ///
    /// # Errors
    /// - Any error from `parse_header_and_body` or from decoding.
    /// - `LoroError::ImportWhenInTxn` for `OutdatedRle` blobs while the state
    ///   reports an open transaction (returned before any event is emitted).
    ///
    /// # Panics
    /// Panics on `EncodeMode::Auto`, which is treated as unreachable here.
    #[tracing::instrument(skip_all)]
    fn _import_with(
        &self,
        bytes: &[u8],
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        ensure_cov::notify_cov("loro_internal::import");
        let parsed = parse_header_and_body(bytes, true)?;
        loro_common::info!("Importing with mode={:?}", &parsed.mode);
        let result = match parsed.mode {
            EncodeMode::OutdatedRle => {
                // Only this mode checks for an open transaction itself.
                if self.state.lock().unwrap().is_in_txn() {
                    return Err(LoroError::ImportWhenInTxn);
                }

                let s = tracing::span!(
                    tracing::Level::INFO,
                    "Import updates ",
                    peer = self.peer_id()
                );
                let _e = s.enter();
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
            EncodeMode::OutdatedSnapshot => {
                if self.can_reset_with_snapshot() {
                    loro_common::info!("Init by snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body, origin)
                } else {
                    // The snapshot cannot replace the document, so it is
                    // decoded into the oplog like an update.
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )
                }
            }
            EncodeMode::FastSnapshot => {
                if self.can_reset_with_snapshot() {
                    ensure_cov::notify_cov("loro_internal::import::snapshot");
                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body, origin)
                } else {
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )

                }
            }
            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
                |oplog| oplog.decode(parsed),
                origin,
            ),
            EncodeMode::Auto => {
                unreachable!()
            }
        };

        // Events are emitted whether or not the import succeeded.
        self.emit_events();

        result
    }
647
648 #[tracing::instrument(skip_all)]
649 pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
650 &self,
651 f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
652 origin: InternalString,
653 ) -> Result<ImportStatus, LoroError> {
654 let mut oplog = self.oplog.lock().unwrap();
655 if !self.is_detached() {
656 let old_vv = oplog.vv().clone();
657 let old_frontiers = oplog.frontiers().clone();
658 let result = f(&mut oplog);
659 if &old_vv != oplog.vv() {
660 let mut diff = DiffCalculator::new(false);
661 let (diff, diff_mode) = diff.calc_diff_internal(
662 &oplog,
663 &old_vv,
664 &old_frontiers,
665 oplog.vv(),
666 oplog.dag.get_frontiers(),
667 None,
668 );
669 let mut state = self.state.lock().unwrap();
670 state.apply_diff(
671 InternalDocDiff {
672 origin,
673 diff: (diff).into(),
674 by: EventTriggerKind::Import,
675 new_version: Cow::Owned(oplog.frontiers().clone()),
676 },
677 diff_mode,
678 );
679 }
680 result
681 } else {
682 f(&mut oplog)
683 }
684 }
685
686 fn emit_events(&self) {
687 let events = {
689 let mut state = self.state.lock().unwrap();
690 state.take_events()
691 };
692 for event in events {
693 self.observer.emit(event);
694 }
695 }
696
697 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
698 let mut state = self.state.lock().unwrap();
699 state.take_events()
700 }
701
702 #[tracing::instrument(skip_all)]
706 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
707 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
708 self.with_barrier(|| {
709 let result = self.update_oplog_and_apply_delta_to_state_if_needed(
710 |oplog| crate::encoding::json_schema::import_json(oplog, json),
711 Default::default(),
712 );
713 self.emit_events();
714 result
715 })
716 }
717
718 pub fn export_json_updates(
719 &self,
720 start_vv: &VersionVector,
721 end_vv: &VersionVector,
722 with_peer_compression: bool,
723 ) -> JsonSchema {
724 self.with_barrier(|| {
725 let oplog = self.oplog.lock().unwrap();
726 let mut start_vv = start_vv;
727 let _temp: Option<VersionVector>;
728 if !oplog.dag.shallow_since_vv().is_empty() {
729 let mut include_all = true;
731 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
732 if start_vv.get(peer).unwrap_or(&0) < counter {
733 include_all = false;
734 break;
735 }
736 }
737 if !include_all {
738 let mut vv = start_vv.clone();
739 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
740 vv.extend_to_include_end_id(ID::new(peer, counter));
741 }
742 _temp = Some(vv);
743 start_vv = _temp.as_ref().unwrap();
744 }
745 }
746
747 crate::encoding::json_schema::export_json(
748 &oplog,
749 start_vv,
750 end_vv,
751 with_peer_compression,
752 )
753 })
754 }
755
756 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
757 let oplog = self.oplog.lock().unwrap();
758 let mut changes = export_json_in_id_span(&oplog, id_span);
759 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
760 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
761 changes.push(change_json);
762 }
763 changes
764 }
765
    /// Returns a clone of the oplog's version vector.
    #[inline]
    pub fn oplog_vv(&self) -> VersionVector {
        self.oplog.lock().unwrap().vv().clone()
    }
771
772 #[inline]
774 pub fn state_vv(&self) -> VersionVector {
775 let oplog = self.oplog.lock().unwrap();
776 let f = &self.state.lock().unwrap().frontiers;
777 oplog.dag.frontiers_to_vv(f).unwrap()
778 }
779
780 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
781 let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
782 if let LoroValue::Container(c) = value {
783 Some(ValueOrHandler::Handler(Handler::new_attached(
784 c.clone(),
785 self.clone(),
786 )))
787 } else {
788 Some(ValueOrHandler::Value(value))
789 }
790 }
791
792 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
794 let path = str_to_path(path)?;
795 self.get_by_path(&path)
796 }
797
798 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
799 let arena = &self.arena;
800 let txn = self.txn.lock().unwrap();
801 let txn = txn.as_ref()?;
802 let ops_ = txn.local_ops();
803 let new_id = ID {
804 peer: *txn.peer(),
805 counter: ops_.first()?.counter,
806 };
807 let change = ChangeRef {
808 id: &new_id,
809 deps: txn.frontiers(),
810 timestamp: &txn
811 .timestamp()
812 .as_ref()
813 .copied()
814 .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
815 commit_msg: txn.msg(),
816 ops: ops_,
817 lamport: txn.lamport(),
818 };
819 let json = encode_change_to_json(change, arena);
820 Some(json)
821 }
822
823 #[inline]
824 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
825 if self.has_container(&id) {
826 Some(Handler::new_attached(id, self.clone()))
827 } else {
828 None
829 }
830 }
831
832 #[inline]
835 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
836 let id = id.into_container_id(&self.arena, ContainerType::Text);
837 assert!(self.has_container(&id));
838 Handler::new_attached(id, self.clone()).into_text().unwrap()
839 }
840
841 #[inline]
844 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
845 let id = id.into_container_id(&self.arena, ContainerType::List);
846 assert!(self.has_container(&id));
847 Handler::new_attached(id, self.clone()).into_list().unwrap()
848 }
849
850 #[inline]
853 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
854 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
855 assert!(self.has_container(&id));
856 Handler::new_attached(id, self.clone())
857 .into_movable_list()
858 .unwrap()
859 }
860
861 #[inline]
864 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
865 let id = id.into_container_id(&self.arena, ContainerType::Map);
866 assert!(self.has_container(&id));
867 Handler::new_attached(id, self.clone()).into_map().unwrap()
868 }
869
870 #[inline]
873 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
874 let id = id.into_container_id(&self.arena, ContainerType::Tree);
875 assert!(self.has_container(&id));
876 Handler::new_attached(id, self.clone()).into_tree().unwrap()
877 }
878
879 #[cfg(feature = "counter")]
880 pub fn get_counter<I: IntoContainerId>(
881 &self,
882 id: I,
883 ) -> crate::handler::counter::CounterHandler {
884 let id = id.into_container_id(&self.arena, ContainerType::Counter);
885 assert!(self.has_container(&id));
886 Handler::new_attached(id, self.clone())
887 .into_counter()
888 .unwrap()
889 }
890
891 #[must_use]
892 pub fn has_container(&self, id: &ContainerID) -> bool {
893 if id.is_root() {
894 return true;
895 }
896
897 let exist = self.state.lock().unwrap().does_container_exist(id);
898 exist
899 }
900
901 #[instrument(level = "info", skip_all)]
915 pub fn undo_internal(
916 &self,
917 id_span: IdSpan,
918 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
919 post_transform_base: Option<&DiffBatch>,
920 before_diff: &mut dyn FnMut(&DiffBatch),
921 ) -> LoroResult<CommitWhenDrop<'_>> {
922 if !self.can_edit() {
923 return Err(LoroError::EditWhenDetached);
924 }
925
926 let (options, txn) = self.implicit_commit_then_stop();
927 if !self
928 .oplog()
929 .lock()
930 .unwrap()
931 .vv()
932 .includes_id(id_span.id_last())
933 {
934 self.renew_txn_if_auto_commit(options);
935 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
936 }
937
938 let (was_recording, latest_frontiers) = {
939 let mut state = self.state.lock().unwrap();
940 let was_recording = state.is_recording();
941 state.stop_and_clear_recording();
942 (was_recording, state.frontiers.clone())
943 };
944
945 let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
946 let diff = crate::undo::undo(
947 spans,
948 match post_transform_base {
949 Some(d) => Either::Right(d),
950 None => Either::Left(&latest_frontiers),
951 },
952 |from, to| {
953 self._checkout_without_emitting(from, false, false).unwrap();
954 self.state.lock().unwrap().start_recording();
955 self._checkout_without_emitting(to, false, false).unwrap();
956 let mut state = self.state.lock().unwrap();
957 let e = state.take_events();
958 state.stop_and_clear_recording();
959 DiffBatch::new(e)
960 },
961 before_diff,
962 );
963
964 self._checkout_without_emitting(&latest_frontiers, false, false)?;
968 self.set_detached(false);
969 if was_recording {
970 self.state.lock().unwrap().start_recording();
971 }
972 drop(txn);
973 self.start_auto_commit();
974 if let Err(e) = self._apply_diff(diff, container_remap, true) {
978 warn!("Undo Failed {:?}", e);
979 }
980
981 if let Some(options) = options {
982 self.set_next_commit_options(options);
983 }
984 Ok(CommitWhenDrop {
985 doc: self,
986 default_options: CommitOptions::new().origin("undo"),
987 })
988 }
989
990 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
996 let f = self.state_frontiers();
999 let diff = self.diff(&f, target)?;
1000 self._apply_diff(diff, &mut Default::default(), false)
1001 }
1002
    /// Computes the diff that turns version `a` into version `b`.
    ///
    /// The document is temporarily checked out to `a`, recording is switched
    /// on, the document is checked out to `b`, and the recorded events become
    /// the result. Afterwards the state is checked out back to its previous
    /// frontiers and the previous recording flag is restored.
    ///
    /// # Errors
    /// - `LoroError::FrontiersNotFound` if an id in `a` or `b` is not in the
    ///   oplog's DAG.
    /// - `LoroError::SwitchToVersionBeforeShallowRoot` if `a` or `b` lies
    ///   before the shallow root.
    ///
    /// # Panics
    /// Panics if any of the three internal checkouts fails.
    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
        {
            // Validate both versions up front, before any state is touched.
            let oplog = self.oplog.lock().unwrap();
            let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
                for id in frontiers.iter() {
                    if !oplog.dag.contains(id) {
                        return Err(LoroError::FrontiersNotFound(id));
                    }
                }

                if oplog.dag.is_before_shallow_root(frontiers) {
                    return Err(LoroError::SwitchToVersionBeforeShallowRoot);
                }

                Ok(())
            };

            validate_frontiers(a)?;
            validate_frontiers(b)?;
        }

        let (options, txn) = self.implicit_commit_then_stop();
        let was_detached = self.is_detached();
        let old_frontiers = self.state_frontiers();
        // Pause recording so the checkout to `a` does not produce events.
        let was_recording = {
            let mut state = self.state.lock().unwrap();
            let is_recording = state.is_recording();
            state.stop_and_clear_recording();
            is_recording
        };
        self._checkout_without_emitting(a, true, false).unwrap();
        // Only the transition a -> b is recorded.
        self.state.lock().unwrap().start_recording();
        self._checkout_without_emitting(b, true, false).unwrap();
        let e = {
            let mut state = self.state.lock().unwrap();
            let e = state.take_events();
            state.stop_and_clear_recording();
            e
        };
        // Restore the version the state was at before the diff started.
        self._checkout_without_emitting(&old_frontiers, false, false)
            .unwrap();
        drop(txn);
        // The attached flag and the transaction are restored only if the
        // document was attached on entry.
        if !was_detached {
            self.set_detached(false);
            self.renew_txn_if_auto_commit(options);
        }
        if was_recording {
            self.state.lock().unwrap().start_recording();
        }
        Ok(DiffBatch::new(e))
    }
1060
    /// Applies `diff` to the document as new operations.
    ///
    /// Delegates to `_apply_diff` with an empty container remapping and
    /// `skip_unreachable = true`.
    #[inline(always)]
    pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
        self._apply_diff(diff, &mut Default::default(), true)
    }
1066
1067 pub(crate) fn _apply_diff(
1079 &self,
1080 diff: DiffBatch,
1081 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1082 skip_unreachable: bool,
1083 ) -> LoroResult<()> {
1084 if !self.can_edit() {
1085 return Err(LoroError::EditWhenDetached);
1086 }
1087
1088 let mut ans: LoroResult<()> = Ok(());
1089 let mut missing_containers: Vec<ContainerID> = Vec::new();
1090 for (mut id, diff) in diff.into_iter() {
1091 let mut remapped = false;
1092 while let Some(rid) = container_remap.get(&id) {
1093 remapped = true;
1094 id = rid.clone();
1095 }
1096
1097 if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1098 let exists = self.state.lock().unwrap().does_container_exist(&id);
1100 if !exists {
1101 missing_containers.push(id);
1102 continue;
1103 }
1104 self.state.lock().unwrap().ensure_container(&id);
1106 }
1107
1108 if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
1109 continue;
1110 }
1111
1112 let Some(h) = self.get_handler(id.clone()) else {
1113 return Err(LoroError::ContainersNotFound {
1114 containers: Box::new(vec![id]),
1115 });
1116 };
1117 if let Err(e) = h.apply_diff(diff, container_remap) {
1118 ans = Err(e);
1119 }
1120 }
1121
1122 if !missing_containers.is_empty() {
1123 return Err(LoroError::ContainersNotFound {
1124 containers: Box::new(missing_containers),
1125 });
1126 }
1127
1128 ans
1129 }
1130
    /// Runs the oplog's `diagnose_size` while holding the oplog lock.
    #[inline]
    pub fn diagnose_size(&self) {
        self.oplog().lock().unwrap().diagnose_size();
    }
1136
    /// Returns a clone of the oplog's frontiers.
    #[inline]
    pub fn oplog_frontiers(&self) -> Frontiers {
        self.oplog().lock().unwrap().frontiers().clone()
    }
1141
    /// Returns a clone of the state's frontiers.
    #[inline]
    pub fn state_frontiers(&self) -> Frontiers {
        self.state.lock().unwrap().frontiers.clone()
    }
1146
    /// Compares the oplog with `other` using the oplog's
    /// `cmp_with_frontiers` and returns the resulting ordering.
    #[inline]
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        self.oplog().lock().unwrap().cmp_with_frontiers(other)
    }
1154
    /// Compares two frontiers using the oplog's `cmp_frontiers`.
    ///
    /// The `Option<Ordering>` and the `FrontiersNotIncluded` error are passed
    /// through from the oplog unchanged.
    #[inline]
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        self.oplog().lock().unwrap().cmp_frontiers(a, b)
    }
1166
1167 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1168 let mut state = self.state.lock().unwrap();
1169 if !state.is_recording() {
1170 state.start_recording();
1171 }
1172
1173 self.observer.subscribe_root(callback)
1174 }
1175
1176 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1177 let mut state = self.state.lock().unwrap();
1178 if !state.is_recording() {
1179 state.start_recording();
1180 }
1181
1182 self.observer.subscribe(container_id, callback)
1183 }
1184
1185 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1186 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1187 activate();
1188 sub
1189 }
1190
1191 #[tracing::instrument(skip_all)]
1193 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1194 if bytes.is_empty() {
1195 return Ok(ImportStatus::default());
1196 }
1197
1198 if bytes.len() == 1 {
1199 return self.import(&bytes[0]);
1200 }
1201
1202 let mut success = VersionRange::default();
1203 let mut pending = VersionRange::default();
1204 let mut meta_arr = bytes
1205 .iter()
1206 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1207 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1208 meta_arr.sort_by(|a, b| {
1209 a.0.mode
1210 .cmp(&b.0.mode)
1211 .then(b.0.change_num.cmp(&a.0.change_num))
1212 });
1213
1214 let (options, txn) = self.implicit_commit_then_stop();
1215 let is_detached = self.is_detached();
1239 self.set_detached(true);
1240 self.oplog.lock().unwrap().batch_importing = true;
1241 let mut err = None;
1242 for (_meta, data) in meta_arr {
1243 match self._import_with(data, Default::default()) {
1244 Ok(s) => {
1245 for (peer, (start, end)) in s.success.iter() {
1246 match success.0.entry(*peer) {
1247 Entry::Occupied(mut e) => {
1248 e.get_mut().1 = *end.max(&e.get().1);
1249 }
1250 Entry::Vacant(e) => {
1251 e.insert((*start, *end));
1252 }
1253 }
1254 }
1255
1256 if let Some(p) = s.pending.as_ref() {
1257 for (&peer, &(start, end)) in p.iter() {
1258 match pending.0.entry(peer) {
1259 Entry::Occupied(mut e) => {
1260 e.get_mut().0 = start.min(e.get().0);
1261 e.get_mut().1 = end.min(e.get().1);
1262 }
1263 Entry::Vacant(e) => {
1264 e.insert((start, end));
1265 }
1266 }
1267 }
1268 }
1269 }
1270 Err(e) => {
1271 err = Some(e);
1272 }
1273 }
1274 }
1275
1276 let mut oplog = self.oplog.lock().unwrap();
1277 oplog.batch_importing = false;
1278 drop(oplog);
1279 if !is_detached {
1280 self._checkout_to_latest_with_guard(txn);
1281 } else {
1282 drop(txn);
1283 }
1284
1285 self.renew_txn_if_auto_commit(options);
1286 if let Some(err) = err {
1287 return Err(err);
1288 }
1289
1290 Ok(ImportStatus {
1291 success,
1292 pending: if pending.is_empty() {
1293 None
1294 } else {
1295 Some(pending)
1296 },
1297 })
1298 }
1299
    /// Returns the state's value (`DocState::get_value`), taking the state
    /// lock for the duration of the call.
    #[inline]
    pub fn get_value(&self) -> LoroValue {
        self.state.lock().unwrap().get_value()
    }
1305
    /// Returns the state's deep value (`DocState::get_deep_value`), taking the
    /// state lock for the duration of the call.
    #[inline]
    pub fn get_deep_value(&self) -> LoroValue {
        self.state.lock().unwrap().get_deep_value()
    }
1311
    /// Returns the result of `DocState::get_deep_value_with_id`, taking the
    /// state lock for the duration of the call.
    #[inline]
    pub fn get_deep_value_with_id(&self) -> LoroValue {
        self.state.lock().unwrap().get_deep_value_with_id()
    }
1317
1318 pub fn checkout_to_latest(&self) {
1319 let (options, _guard) = self.implicit_commit_then_stop();
1320 if !self.is_detached() {
1321 drop(_guard);
1322 self.renew_txn_if_auto_commit(options);
1323 return;
1324 }
1325
1326 self._checkout_to_latest_without_commit(true);
1327 drop(_guard);
1328 self.renew_txn_if_auto_commit(options);
1329 }
1330
1331 fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1332 if !self.is_detached() {
1333 self._renew_txn_if_auto_commit_with_guard(None, guard);
1334 return;
1335 }
1336
1337 self._checkout_to_latest_without_commit(true);
1338 self._renew_txn_if_auto_commit_with_guard(None, guard);
1339 }
1340
1341 pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1343 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1344 let f = self.oplog_frontiers();
1345 let this = &self;
1346 let frontiers = &f;
1347 this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1348 .unwrap(); this.emit_events();
1351 if this.config.detached_editing() {
1352 this.renew_peer_id();
1353 }
1354
1355 self.set_detached(false);
1356 });
1357 }
1358
1359 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1364 let was_detached = self.is_detached();
1365 let (options, guard) = self.implicit_commit_then_stop();
1366 let result = self._checkout_without_emitting(frontiers, true, true);
1367 if result.is_ok() {
1368 self.emit_events();
1369 }
1370 drop(guard);
1371 if self.config.detached_editing() {
1372 if result.is_ok() {
1373 self.renew_peer_id();
1374 }
1375 self.renew_txn_if_auto_commit(options);
1376 } else if result.is_err() {
1377 if !was_detached {
1378 self.renew_txn_if_auto_commit(options);
1379 }
1380 } else if !self.is_detached() {
1381 self.renew_txn_if_auto_commit(options);
1382 }
1383
1384 result
1385 }
1386
1387 #[instrument(level = "info", skip(self))]
1389 pub(crate) fn _checkout_without_emitting(
1390 &self,
1391 frontiers: &Frontiers,
1392 to_shrink_frontiers: bool,
1393 to_commit_then_renew: bool,
1394 ) -> Result<(), LoroError> {
1395 assert!(self.txn.is_locked());
1396 let from_frontiers = self.state_frontiers();
1397 loro_common::info!(
1398 "checkout from={:?} to={:?} cur_vv={:?}",
1399 from_frontiers,
1400 frontiers,
1401 self.oplog_vv()
1402 );
1403
1404 if &from_frontiers == frontiers {
1405 return Ok(());
1406 }
1407
1408 let oplog = self.oplog.lock().unwrap();
1409 if oplog.dag.is_before_shallow_root(frontiers) {
1410 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1411 }
1412
1413 let frontiers = if to_shrink_frontiers {
1414 shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1415 } else {
1416 frontiers.clone()
1417 };
1418
1419 if from_frontiers == frontiers {
1420 return Ok(());
1421 }
1422
1423 let mut state = self.state.lock().unwrap();
1424 let mut calc = self.diff_calculator.lock().unwrap();
1425 for i in frontiers.iter() {
1426 if !oplog.dag.contains(i) {
1427 return Err(LoroError::FrontiersNotFound(i));
1428 }
1429 }
1430
1431 let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1432 let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1433 return Err(LoroError::NotFoundError(
1434 format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1435 ));
1436 };
1437
1438 self.set_detached(true);
1439 let (diff, diff_mode) =
1440 calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1441 state.apply_diff(
1442 InternalDocDiff {
1443 origin: "checkout".into(),
1444 diff: Cow::Owned(diff),
1445 by: EventTriggerKind::Checkout,
1446 new_version: Cow::Owned(frontiers.clone()),
1447 },
1448 diff_mode,
1449 );
1450
1451 Ok(())
1452 }
1453
1454 #[inline]
1455 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1456 self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
1457 }
1458
1459 #[inline]
1460 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1461 self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
1462 }
1463
1464 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1468 let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1469 self.import(&updates)
1470 }
1471
1472 pub(crate) fn arena(&self) -> &SharedArena {
1473 &self.arena
1474 }
1475
1476 #[inline]
1477 pub fn len_ops(&self) -> usize {
1478 let oplog = self.oplog.lock().unwrap();
1479 let ans = oplog.vv().values().sum::<i32>() as usize;
1480 if oplog.is_shallow() {
1481 let sub = oplog
1482 .shallow_since_vv()
1483 .iter()
1484 .map(|(_, ops)| *ops)
1485 .sum::<i32>() as usize;
1486 ans - sub
1487 } else {
1488 ans
1489 }
1490 }
1491
1492 #[inline]
1493 pub fn len_changes(&self) -> usize {
1494 let oplog = self.oplog.lock().unwrap();
1495 oplog.len_changes()
1496 }
1497
1498 pub fn config(&self) -> &Configure {
1499 &self.config
1500 }
1501
1502 pub fn check_state_diff_calc_consistency_slow(&self) {
1507 {
1509 static IS_CHECKING: std::sync::atomic::AtomicBool =
1510 std::sync::atomic::AtomicBool::new(false);
1511 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1512 return;
1513 }
1514
1515 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1516 let peer_id = self.peer_id();
1517 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1518 let _g = s.enter();
1519 let options = self.implicit_commit_then_stop().0;
1520 self.oplog.lock().unwrap().check_dag_correctness();
1521 if self.is_shallow() {
1522 let initial_snapshot = self
1533 .export(ExportMode::state_only(Some(
1534 &self.shallow_since_frontiers(),
1535 )))
1536 .unwrap();
1537
1538 let doc = LoroDoc::new();
1540 doc.import(&initial_snapshot).unwrap();
1541 self.checkout(&self.shallow_since_frontiers()).unwrap();
1542 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1543
1544 let updates = self.export(ExportMode::all_updates()).unwrap();
1546
1547 doc.import(&updates).unwrap();
1549 self.checkout_to_latest();
1550
1551 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1554 let mut calculated_state = doc.app_state().lock().unwrap();
1555 let mut current_state = self.app_state().lock().unwrap();
1556 current_state.check_is_the_same(&mut calculated_state);
1557 } else {
1558 let f = self.state_frontiers();
1559 let vv = self
1560 .oplog()
1561 .lock()
1562 .unwrap()
1563 .dag
1564 .frontiers_to_vv(&f)
1565 .unwrap();
1566 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1567 let doc = Self::new();
1568 doc.import(&bytes).unwrap();
1569 let mut calculated_state = doc.app_state().lock().unwrap();
1570 let mut current_state = self.app_state().lock().unwrap();
1571 current_state.check_is_the_same(&mut calculated_state);
1572 }
1573
1574 self.renew_txn_if_auto_commit(options);
1575 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1576 }
1577 }
1578
1579 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1580 self.query_pos_internal(pos, true)
1581 }
1582
1583 pub(crate) fn query_pos_internal(
1585 &self,
1586 pos: &Cursor,
1587 ret_event_index: bool,
1588 ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1589 if !self.has_container(&pos.container) {
1590 return Err(CannotFindRelativePosition::IdNotFound);
1591 }
1592
1593 let mut state = self.state.lock().unwrap();
1594 if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1595 Ok(PosQueryResult {
1596 update: None,
1597 current: AbsolutePosition {
1598 pos: ans,
1599 side: pos.side,
1600 },
1601 })
1602 } else {
1603 drop(state);
1615 let result = self.with_barrier(|| {
1616 let oplog = self.oplog().lock().unwrap();
1617 if let Some(id) = pos.id {
1619 if oplog.arena.id_to_idx(&pos.container).is_none() {
1621 let mut s = self.state.lock().unwrap();
1622 if !s.does_container_exist(&pos.container) {
1623 return Err(CannotFindRelativePosition::ContainerDeleted);
1624 }
1625 s.ensure_container(&pos.container);
1626 drop(s);
1627 }
1628 let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1629 let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1631 if oplog.shallow_since_vv().includes_id(id) {
1632 return Err(CannotFindRelativePosition::HistoryCleared);
1633 }
1634
1635 tracing::error!("Cannot find id {}", id);
1636 return Err(CannotFindRelativePosition::IdNotFound);
1637 };
1638 let mut diff_calc = DiffCalculator::new(true);
1640 let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1641 let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1642 diff_calc.calc_diff_internal(
1644 &oplog,
1645 before,
1646 &before_frontiers,
1647 oplog.vv(),
1648 oplog.frontiers(),
1649 Some(&|target| idx == target),
1650 );
1651 let depth = self.arena.get_depth(idx);
1653 let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1654 match diff_calc {
1655 crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1656 let c = text.get_id_latest_pos(id).unwrap();
1657 let new_pos = c.pos;
1658 let handler = self.get_text(&pos.container);
1659 let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1660 Ok(PosQueryResult {
1661 update: handler.get_cursor(current_pos, c.side),
1662 current: AbsolutePosition {
1663 pos: current_pos,
1664 side: c.side,
1665 },
1666 })
1667 }
1668 crate::diff_calc::ContainerDiffCalculator::List(list) => {
1669 let c = list.get_id_latest_pos(id).unwrap();
1670 let new_pos = c.pos;
1671 let handler = self.get_list(&pos.container);
1672 Ok(PosQueryResult {
1673 update: handler.get_cursor(new_pos, c.side),
1674 current: AbsolutePosition {
1675 pos: new_pos,
1676 side: c.side,
1677 },
1678 })
1679 }
1680 crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1681 let c = list.get_id_latest_pos(id).unwrap();
1682 let new_pos = c.pos;
1683 let handler = self.get_movable_list(&pos.container);
1684 let new_pos = handler.op_pos_to_user_pos(new_pos);
1685 Ok(PosQueryResult {
1686 update: handler.get_cursor(new_pos, c.side),
1687 current: AbsolutePosition {
1688 pos: new_pos,
1689 side: c.side,
1690 },
1691 })
1692 }
1693 crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1694 crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1695 #[cfg(feature = "counter")]
1696 crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1697 crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1698 }
1699 } else {
1700 match pos.container.container_type() {
1701 ContainerType::Text => {
1702 let text = self.get_text(&pos.container);
1703 Ok(PosQueryResult {
1704 update: Some(Cursor {
1705 id: None,
1706 container: text.id(),
1707 side: pos.side,
1708 origin_pos: text.len_unicode(),
1709 }),
1710 current: AbsolutePosition {
1711 pos: text.len_event(),
1712 side: pos.side,
1713 },
1714 })
1715 }
1716 ContainerType::List => {
1717 let list = self.get_list(&pos.container);
1718 Ok(PosQueryResult {
1719 update: Some(Cursor {
1720 id: None,
1721 container: list.id(),
1722 side: pos.side,
1723 origin_pos: list.len(),
1724 }),
1725 current: AbsolutePosition {
1726 pos: list.len(),
1727 side: pos.side,
1728 },
1729 })
1730 }
1731 ContainerType::MovableList => {
1732 let list = self.get_movable_list(&pos.container);
1733 Ok(PosQueryResult {
1734 update: Some(Cursor {
1735 id: None,
1736 container: list.id(),
1737 side: pos.side,
1738 origin_pos: list.len(),
1739 }),
1740 current: AbsolutePosition {
1741 pos: list.len(),
1742 side: pos.side,
1743 },
1744 })
1745 }
1746 ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1747 unreachable!()
1748 }
1749 #[cfg(feature = "counter")]
1750 ContainerType::Counter => unreachable!(),
1751 }
1752 }
1753 });
1754 result
1755 }
1756 }
1757
1758 pub fn free_history_cache(&self) {
1763 self.oplog.lock().unwrap().free_history_cache();
1764 }
1765
1766 pub fn free_diff_calculator(&self) {
1768 *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
1769 }
1770
1771 pub fn has_history_cache(&self) -> bool {
1774 self.oplog.lock().unwrap().has_history_cache()
1775 }
1776
1777 #[inline]
1781 pub fn compact_change_store(&self) {
1782 self.with_barrier(|| {
1783 self.oplog.lock().unwrap().compact_change_store();
1784 });
1785 }
1786
1787 #[inline]
1791 pub fn analyze(&self) -> DocAnalysis {
1792 DocAnalysis::analyze(self)
1793 }
1794
1795 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1797 let mut state = self.state.lock().unwrap();
1798 if state.arena.id_to_idx(id).is_none() {
1799 if !state.does_container_exist(id) {
1800 return None;
1801 }
1802 state.ensure_container(id);
1803 }
1804 let idx = state.arena.id_to_idx(id).unwrap();
1805 state.get_path(idx)
1806 }
1807
1808 #[instrument(skip(self))]
1809 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1810 self.with_barrier(|| {
1811 let ans = match mode {
1812 ExportMode::Snapshot => export_fast_snapshot(self),
1813 ExportMode::Updates { from } => export_fast_updates(self, &from),
1814 ExportMode::UpdatesInRange { spans } => {
1815 export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1816 }
1817 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1818 ExportMode::StateOnly(f) => match f {
1819 Some(f) => export_state_only_snapshot(self, &f)?,
1820 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1821 },
1822 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1823 };
1824 Ok(ans)
1825 })
1826 }
1827
1828 pub fn shallow_since_vv(&self) -> ImVersionVector {
1834 self.oplog().lock().unwrap().shallow_since_vv().clone()
1835 }
1836
1837 pub fn shallow_since_frontiers(&self) -> Frontiers {
1838 self.oplog()
1839 .lock()
1840 .unwrap()
1841 .shallow_since_frontiers()
1842 .clone()
1843 }
1844
1845 pub fn is_shallow(&self) -> bool {
1847 !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
1848 }
1849
1850 pub fn get_pending_txn_len(&self) -> usize {
1855 if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1856 txn.len()
1857 } else {
1858 0
1859 }
1860 }
1861
1862 #[inline]
1863 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1864 self.oplog().lock().unwrap().dag.find_path(from, to)
1865 }
1866
1867 pub fn subscribe_first_commit_from_peer(
1873 &self,
1874 callback: FirstCommitFromPeerCallback,
1875 ) -> Subscription {
1876 let (s, enable) = self
1877 .first_commit_from_peer_subs
1878 .inner()
1879 .insert((), callback);
1880 enable();
1881 s
1882 }
1883
1884 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1889 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1890 enable();
1891 s
1892 }
1893}
1894
1895#[derive(Debug, thiserror::Error)]
1896pub enum ChangeTravelError {
1897 #[error("Target id not found {0:?}")]
1898 TargetIdNotFound(ID),
1899 #[error("The shallow history of the doc doesn't include the target version")]
1900 TargetVersionNotIncluded,
1901}
1902
1903impl LoroDoc {
1904 pub fn travel_change_ancestors(
1905 &self,
1906 ids: &[ID],
1907 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1908 ) -> Result<(), ChangeTravelError> {
1909 let (options, guard) = self.implicit_commit_then_stop();
1910 drop(guard);
1911 struct PendingNode(ChangeMeta);
1912 impl PartialEq for PendingNode {
1913 fn eq(&self, other: &Self) -> bool {
1914 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1915 }
1916 }
1917
1918 impl Eq for PendingNode {}
1919 impl PartialOrd for PendingNode {
1920 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1921 Some(self.cmp(other))
1922 }
1923 }
1924
1925 impl Ord for PendingNode {
1926 fn cmp(&self, other: &Self) -> Ordering {
1927 self.0
1928 .lamport_last()
1929 .cmp(&other.0.lamport_last())
1930 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1931 }
1932 }
1933
1934 for id in ids {
1935 let op_log = &self.oplog().lock().unwrap();
1936 if !op_log.vv().includes_id(*id) {
1937 return Err(ChangeTravelError::TargetIdNotFound(*id));
1938 }
1939 if op_log.dag.shallow_since_vv().includes_id(*id) {
1940 return Err(ChangeTravelError::TargetVersionNotIncluded);
1941 }
1942 }
1943
1944 let mut visited = FxHashSet::default();
1945 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1946 for id in ids {
1947 pending.push(PendingNode(ChangeMeta::from_change(
1948 &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1949 )));
1950 }
1951 while let Some(PendingNode(node)) = pending.pop() {
1952 let deps = node.deps.clone();
1953 if f(node).is_break() {
1954 break;
1955 }
1956
1957 for dep in deps.iter() {
1958 let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1959 continue;
1960 };
1961 if visited.contains(&dep_node.id) {
1962 continue;
1963 }
1964
1965 visited.insert(dep_node.id);
1966 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1967 }
1968 }
1969
1970 let ans = Ok(());
1971 self.renew_txn_if_auto_commit(options);
1972 ans
1973 }
1974
1975 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1976 self.with_barrier(|| {
1977 let mut set = FxHashSet::default();
1978 {
1979 let oplog = self.oplog().lock().unwrap();
1980 for op in oplog.iter_ops(id.to_span(len)) {
1981 let id = oplog.arena.get_container_id(op.container()).unwrap();
1982 set.insert(id);
1983 }
1984 }
1985 set
1986 })
1987 }
1988
1989 pub fn delete_root_container(&self, cid: ContainerID) {
1990 if !cid.is_root() {
1991 return;
1992 }
1993
1994 if !self.has_container(&cid) {
1996 return;
1997 }
1998
1999 let Some(h) = self.get_handler(cid.clone()) else {
2000 return;
2001 };
2002
2003 if let Err(e) = h.clear() {
2004 eprintln!("Failed to clear handler: {:?}", e);
2005 return;
2006 }
2007 self.config
2008 .deleted_root_containers
2009 .lock()
2010 .unwrap()
2011 .insert(cid);
2012 }
2013
2014 pub fn set_hide_empty_root_containers(&self, hide: bool) {
2015 self.config
2016 .hide_empty_root_containers
2017 .store(hide, std::sync::atomic::Ordering::Relaxed);
2018 }
2019}
2020
2021fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2023 let start_vv = oplog
2024 .dag
2025 .frontiers_to_vv(&id.into())
2026 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2027 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
2028 for op in change.ops.iter().rev() {
2029 if op.container != idx {
2030 continue;
2031 }
2032 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2033 if d.id_start.to_span(d.atom_len()).contains(id) {
2034 return Some(ID::new(change.peer(), op.counter));
2035 }
2036 }
2037 }
2038 }
2039
2040 None
2041}
2042
2043#[derive(Debug)]
2044pub struct CommitWhenDrop<'a> {
2045 doc: &'a LoroDoc,
2046 default_options: CommitOptions,
2047}
2048
2049impl Drop for CommitWhenDrop<'_> {
2050 fn drop(&mut self) {
2051 {
2052 let mut guard = self.doc.txn.lock().unwrap();
2053 if let Some(txn) = guard.as_mut() {
2054 txn.set_default_options(std::mem::take(&mut self.default_options));
2055 };
2056 }
2057
2058 self.doc.commit_then_renew();
2059 }
2060}
2061
2062#[derive(Debug, Clone)]
2064pub struct CommitOptions {
2065 pub origin: Option<InternalString>,
2068
2069 pub immediate_renew: bool,
2072
2073 pub timestamp: Option<Timestamp>,
2076
2077 pub commit_msg: Option<Arc<str>>,
2079}
2080
2081impl CommitOptions {
2082 pub fn new() -> Self {
2084 Self {
2085 origin: None,
2086 immediate_renew: true,
2087 timestamp: None,
2088 commit_msg: None,
2089 }
2090 }
2091
2092 pub fn origin(mut self, origin: &str) -> Self {
2094 self.origin = Some(origin.into());
2095 self
2096 }
2097
2098 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2100 self.immediate_renew = immediate_renew;
2101 self
2102 }
2103
2104 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2108 self.timestamp = Some(timestamp);
2109 self
2110 }
2111
2112 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2114 self.commit_msg = Some(commit_msg.into());
2115 self
2116 }
2117
2118 pub fn set_origin(&mut self, origin: Option<&str>) {
2120 self.origin = origin.map(|x| x.into())
2121 }
2122
2123 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2125 self.timestamp = timestamp;
2126 }
2127}
2128
2129impl Default for CommitOptions {
2130 fn default() -> Self {
2131 Self::new()
2132 }
2133}
2134
#[cfg(test)]
mod test {
    use crate::{cursor::PosType, loro::ExportMode, version::Frontiers, LoroDoc, ToJson};
    use loro_common::ID;

    /// Asserts that the deep value of `doc`, converted to JSON, equals
    /// `expected`.
    fn assert_deep_json(doc: &LoroDoc, expected: serde_json::Value) {
        let actual = doc.get_deep_value();
        assert_eq!(actual.to_json_value(), expected);
    }

    /// `LoroDoc` must be usable across threads.
    #[test]
    fn test_sync() {
        fn is_send_sync<T: Send + Sync>(_v: T) {}
        is_send_sync(super::LoroDoc::new())
    }

    /// Checks out several historical versions on the original document and on
    /// a copy created from a snapshot.
    #[test]
    fn test_checkout() {
        let loro = LoroDoc::new();
        loro.set_peer_id(1).unwrap();
        let text = loro.get_text("text");
        let map = loro.get_map("map");
        let list = loro.get_list("list");
        let mut txn = loro.txn().unwrap();
        // Each round performs three ops: map insert, text insert, list insert.
        for i in 0..10 {
            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
            text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
                .unwrap();
            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
        }
        txn.commit().unwrap();

        let snapshot = loro.export(ExportMode::Snapshot).unwrap();
        let b = LoroDoc::new();
        b.import(&snapshot).unwrap();

        loro.checkout(&Frontiers::default()).unwrap();
        assert_deep_json(&loro, serde_json::json!({"text":"","list":[],"map":{}}));

        b.checkout(&ID::new(1, 2).into()).unwrap();
        assert_deep_json(
            &b,
            serde_json::json!({"text":"0","list":[0],"map":{"key":0}}),
        );

        loro.checkout(&ID::new(1, 3).into()).unwrap();
        assert_deep_json(
            &loro,
            serde_json::json!({"text":"0","list":[0],"map":{"key":1}}),
        );

        b.checkout(&ID::new(1, 29).into()).unwrap();
        assert_deep_json(
            &b,
            serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}),
        );
    }

    /// Batch-importing a snapshot, editing, and then exporting all updates
    /// must not fail.
    #[test]
    fn import_batch_err_181() {
        let a = LoroDoc::new_auto_commit();
        let update_a = a.export(ExportMode::Snapshot);
        let b = LoroDoc::new_auto_commit();
        b.import_batch(&[update_a.unwrap()]).unwrap();
        b.get_text("text")
            .insert(0, "hello", PosType::Unicode)
            .unwrap();
        b.commit_then_renew();
        // Acquire and immediately release the oplog lock.
        drop(b.oplog().lock().unwrap());
        b.export(ExportMode::all_updates()).unwrap();
    }
}