1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::Timestamp,
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48 LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 #[inline]
89 pub fn with_barrier<F, R>(&self, f: F) -> R
90 where
91 F: FnOnce() -> R,
92 {
93 let (options, guard) = self.implicit_commit_then_stop();
94 let result = f();
95 drop(guard);
96 self.renew_txn_if_auto_commit(options);
97 result
98 }
99
100 pub fn new() -> Self {
101 let oplog = OpLog::new();
102 let arena = oplog.arena.clone();
103 let config: Configure = oplog.configure.clone();
104 let lock_group = LoroLockGroup::new();
105 let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
106 let inner = Arc::new_cyclic(|w| {
107 let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
108 LoroDocInner {
109 oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
110 state,
111 config,
112 detached: AtomicBool::new(false),
113 auto_commit: AtomicBool::new(false),
114 observer: Arc::new(Observer::new(arena.clone())),
115 diff_calculator: Arc::new(
116 lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
117 ),
118 txn: global_txn,
119 arena,
120 local_update_subs: SubscriberSetWithQueue::new(),
121 peer_id_change_subs: SubscriberSetWithQueue::new(),
122 pre_commit_subs: SubscriberSetWithQueue::new(),
123 first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
124 }
125 });
126 LoroDoc { inner }
127 }
128
129 pub fn fork(&self) -> Self {
130 if self.is_detached() {
131 return self.fork_at(&self.state_frontiers());
132 }
133
134 let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
135 let doc = Self::new();
136 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default()).unwrap();
137 doc.set_config(&self.config);
138 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
139 doc.start_auto_commit();
140 }
141 doc
142 }
143 pub fn set_detached_editing(&self, enable: bool) {
159 self.config.set_detached_editing(enable);
160 if enable && self.is_detached() {
161 self.with_barrier(|| {
162 self.renew_peer_id();
163 });
164 }
165 }
166
167 #[inline]
169 pub fn new_auto_commit() -> Self {
170 let doc = Self::new();
171 doc.start_auto_commit();
172 doc
173 }
174
175 #[inline(always)]
176 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
177 if peer == PeerID::MAX {
178 return Err(LoroError::InvalidPeerID);
179 }
180 let next_id = self.oplog.lock().unwrap().next_id(peer);
181 if self.auto_commit.load(Acquire) {
182 let doc_state = self.state.lock().unwrap();
183 doc_state
184 .peer
185 .store(peer, std::sync::atomic::Ordering::Relaxed);
186
187 if doc_state.is_in_txn() {
188 drop(doc_state);
189 self.with_barrier(|| {});
191 }
192 self.peer_id_change_subs.emit(&(), next_id);
193 return Ok(());
194 }
195
196 let doc_state = self.state.lock().unwrap();
197 if doc_state.is_in_txn() {
198 return Err(LoroError::TransactionError(
199 "Cannot change peer id during transaction"
200 .to_string()
201 .into_boxed_str(),
202 ));
203 }
204
205 doc_state
206 .peer
207 .store(peer, std::sync::atomic::Ordering::Relaxed);
208 drop(doc_state);
209 self.peer_id_change_subs.emit(&(), next_id);
210 Ok(())
211 }
212
213 pub(crate) fn renew_peer_id(&self) {
215 let peer_id = DefaultRandom.next_u64();
216 self.set_peer_id(peer_id).unwrap();
217 }
218
219 #[inline]
231 #[must_use]
232 pub fn implicit_commit_then_stop(
233 &self,
234 ) -> (
235 Option<CommitOptions>,
236 LoroMutexGuard<'_, Option<Transaction>>,
237 ) {
238 let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
240 (a, b.unwrap())
241 }
242
243 #[inline]
248 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
249 self.commit_internal(CommitOptions::new().immediate_renew(true), false)
251 .0
252 }
253
254 fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
259 let mut txn_guard = self.txn.lock().unwrap();
260 let Some(txn) = txn_guard.as_mut() else {
261 return Some(txn_guard);
262 };
263
264 if txn.is_peer_first_appearance {
265 txn.is_peer_first_appearance = false;
266 drop(txn_guard);
267 self.first_commit_from_peer_subs.emit(
269 &(),
270 FirstCommitFromPeerPayload {
271 peer: self.peer_id(),
272 },
273 );
274 }
275
276 None
277 }
278
    /// Commits the pending auto-commit transaction, applying the overrides
    /// carried by `config` (origin, timestamp, commit message).
    ///
    /// Returns `(options, guard)`:
    /// - `options` is what `txn.commit()` handed back, with its origin cleared
    ///   when `config.origin` was set, and replaced by `None` unless
    ///   `preserve_on_empty` is `true`.
    /// - `guard` is the lock on the txn slot. It is `None` only on the final
    ///   return path when `config.immediate_renew` is `true`; every other
    ///   path returns `Some(guard)`.
    ///
    /// When auto-commit is off nothing is committed and only the guard is
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if `txn.commit()` fails, or — with `immediate_renew` — if the
    /// document cannot be edited or a new transaction cannot be created.
    #[instrument(skip_all)]
    fn commit_internal(
        &self,
        config: CommitOptions,
        preserve_on_empty: bool,
    ) -> (
        Option<CommitOptions>,
        Option<LoroMutexGuard<'_, Option<Transaction>>>,
    ) {
        if !self.auto_commit.load(Acquire) {
            // Auto-commit is off: just hand out the txn lock.
            let txn_guard = self.txn.lock().unwrap();
            return (None, Some(txn_guard));
        }

        loop {
            // `before_commit` returns a guard only when there is no pending txn.
            if let Some(txn_guard) = self.before_commit() {
                return (None, Some(txn_guard));
            }

            let mut txn_guard = self.txn.lock().unwrap();
            // Take the txn out of the slot; the slot stays `None` unless renewed below.
            let txn = txn_guard.take();
            let Some(mut txn) = txn else {
                return (None, Some(txn_guard));
            };
            let on_commit = txn.take_on_commit();
            if let Some(origin) = config.origin.clone() {
                txn.set_origin(origin);
            }

            if let Some(timestamp) = config.timestamp {
                txn.set_timestamp(timestamp);
            }

            if let Some(msg) = config.commit_msg.as_ref() {
                txn.set_msg(Some(msg.clone()));
            }

            // Read the span before `commit` consumes the txn.
            let id_span = txn.id_span();
            let mut options = txn.commit().unwrap();
            if let Some(opts) = options.as_mut() {
                // An origin supplied through `config` is not carried over.
                if config.origin.is_some() {
                    opts.set_origin(None);
                }
                // Callers that do not preserve options get `None` back.
                if !preserve_on_empty {
                    options = None;
                }
            }
            if config.immediate_renew {
                assert!(self.can_edit());
                let mut t = self.txn().unwrap();
                if let Some(options) = options.as_ref() {
                    t.set_options(options.clone());
                }
                *txn_guard = Some(t);
            }

            if let Some(on_commit) = on_commit {
                // Run the hook without the txn lock held, then re-acquire it.
                drop(txn_guard);
                on_commit(&self.state, &self.oplog, id_span);
                txn_guard = self.txn.lock().unwrap();
                if !config.immediate_renew && txn_guard.is_some() {
                    // A txn appeared while the lock was released; commit it too.
                    continue;
                }
            }

            return (
                options,
                if !config.immediate_renew {
                    Some(txn_guard)
                } else {
                    None
                },
            );
        }
    }
367
368 #[instrument(skip_all)]
373 pub fn commit_with(
374 &self,
375 config: CommitOptions,
376 ) -> (
377 Option<CommitOptions>,
378 Option<LoroMutexGuard<'_, Option<Transaction>>>,
379 ) {
380 self.commit_internal(config, false)
381 }
382
383 pub fn set_next_commit_message(&self, message: &str) {
385 let mut binding = self.txn.lock().unwrap();
386 let Some(txn) = binding.as_mut() else {
387 return;
388 };
389
390 if message.is_empty() {
391 txn.set_msg(None)
392 } else {
393 txn.set_msg(Some(message.into()))
394 }
395 }
396
397 pub fn set_next_commit_origin(&self, origin: &str) {
399 let mut txn = self.txn.lock().unwrap();
400 if let Some(txn) = txn.as_mut() {
401 txn.set_origin(origin.into());
402 }
403 }
404
405 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
407 let mut txn = self.txn.lock().unwrap();
408 if let Some(txn) = txn.as_mut() {
409 txn.set_timestamp(timestamp);
410 }
411 }
412
413 pub fn set_next_commit_options(&self, options: CommitOptions) {
415 let mut txn = self.txn.lock().unwrap();
416 if let Some(txn) = txn.as_mut() {
417 txn.set_options(options);
418 }
419 }
420
421 pub fn clear_next_commit_options(&self) {
423 let mut txn = self.txn.lock().unwrap();
424 if let Some(txn) = txn.as_mut() {
425 txn.set_options(CommitOptions::new());
426 }
427 }
428
429 #[inline]
440 pub fn set_record_timestamp(&self, record: bool) {
441 self.config.set_record_timestamp(record);
442 }
443
444 #[inline]
449 pub fn set_change_merge_interval(&self, interval: i64) {
450 self.config.set_merge_interval(interval);
451 }
452
453 pub fn can_edit(&self) -> bool {
454 !self.is_detached() || self.config.detached_editing()
455 }
456
457 pub fn is_detached_editing_enabled(&self) -> bool {
458 self.config.detached_editing()
459 }
460
461 #[inline]
462 pub fn config_text_style(&self, text_style: StyleConfigMap) {
463 self.config.text_style_config.try_write().unwrap().map = text_style.map;
464 }
465
466 #[inline]
467 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
468 self.config
469 .text_style_config
470 .try_write()
471 .unwrap()
472 .default_style = text_style;
473 }
474 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
475 let doc = Self::new();
476 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
477 if mode.is_snapshot() {
478 doc.with_barrier(|| -> Result<(), LoroError> {
479 decode_snapshot(&doc, mode, body, Default::default())?;
480 Ok(())
481 })?;
482 Ok(doc)
483 } else {
484 Err(LoroError::DecodeError(
485 "Invalid encode mode".to_string().into(),
486 ))
487 }
488 }
489
490 #[inline(always)]
492 pub fn can_reset_with_snapshot(&self) -> bool {
493 let oplog = self.oplog.lock().unwrap();
494 if oplog.batch_importing {
495 return false;
496 }
497
498 if self.is_detached() {
499 return false;
500 }
501
502 oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
503 }
504
505 #[inline(always)]
511 pub fn is_detached(&self) -> bool {
512 self.detached.load(Acquire)
513 }
514
515 pub(crate) fn set_detached(&self, detached: bool) {
516 self.detached.store(detached, Release);
517 }
518
519 #[inline(always)]
520 pub fn peer_id(&self) -> PeerID {
521 self.state
522 .lock()
523 .unwrap()
524 .peer
525 .load(std::sync::atomic::Ordering::Relaxed)
526 }
527
528 #[inline(always)]
529 pub fn detach(&self) {
530 self.with_barrier(|| self.set_detached(true));
531 }
532
533 #[inline(always)]
534 pub fn attach(&self) {
535 self.checkout_to_latest()
536 }
537
538 pub fn state_timestamp(&self) -> Timestamp {
541 let f = { self.state.lock().unwrap().frontiers.clone() };
543 self.oplog.lock().unwrap().get_timestamp_of_version(&f)
544 }
545
546 #[inline(always)]
547 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
548 &self.state
549 }
550
551 #[inline]
552 pub fn get_state_deep_value(&self) -> LoroValue {
553 self.state.lock().unwrap().get_deep_value()
554 }
555
556 #[inline(always)]
557 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
558 &self.oplog
559 }
560
561 pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
562 self.with_barrier(|| self.oplog.lock().unwrap().export_from(vv))
563 }
564
565 #[inline(always)]
566 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
567 let s = debug_span!("import", peer = self.peer_id());
568 let _e = s.enter();
569 self.import_with(bytes, Default::default())
570 }
571
572 #[inline]
573 pub fn import_with(
574 &self,
575 bytes: &[u8],
576 origin: InternalString,
577 ) -> Result<ImportStatus, LoroError> {
578 self.with_barrier(|| self._import_with(bytes, origin))
579 }
580
    /// Parses `bytes` and imports them according to the encoded mode, then
    /// emits the events recorded by the state.
    ///
    /// - `OutdatedRle` and `FastUpdates` are decoded into the oplog through
    ///   `update_oplog_and_apply_delta_to_state_if_needed`.
    /// - `OutdatedSnapshot` and `FastSnapshot` are decoded with
    ///   `decode_snapshot` when `can_reset_with_snapshot()` holds; otherwise
    ///   they take the same oplog path as updates.
    ///
    /// # Errors
    ///
    /// Returns parse/decode errors, and `LoroError::ImportWhenInTxn` for
    /// `OutdatedRle` data while the state is inside a transaction. That early
    /// return happens before `emit_events` is reached.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!()` if the parsed mode is `EncodeMode::Auto`.
    #[tracing::instrument(skip_all)]
    fn _import_with(
        &self,
        bytes: &[u8],
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        ensure_cov::notify_cov("loro_internal::import");
        let parsed = parse_header_and_body(bytes, true)?;
        loro_common::info!("Importing with mode={:?}", &parsed.mode);
        let result = match parsed.mode {
            EncodeMode::OutdatedRle => {
                if self.state.lock().unwrap().is_in_txn() {
                    return Err(LoroError::ImportWhenInTxn);
                }

                let s = tracing::span!(
                    tracing::Level::INFO,
                    "Import updates ",
                    peer = self.peer_id()
                );
                let _e = s.enter();
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
            EncodeMode::OutdatedSnapshot => {
                if self.can_reset_with_snapshot() {
                    // The doc qualifies for a direct snapshot load.
                    loro_common::info!("Init by snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body, origin)
                } else {
                    // Otherwise treat the snapshot like a batch of updates.
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )
                }
            }
            EncodeMode::FastSnapshot => {
                if self.can_reset_with_snapshot() {
                    ensure_cov::notify_cov("loro_internal::import::snapshot");
                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body, origin)
                } else {
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )

                }
            }
            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
                |oplog| oplog.decode(parsed),
                origin,
            ),
            EncodeMode::Auto => {
                unreachable!()
            }
        };

        // Events are flushed whether the import itself succeeded or failed.
        self.emit_events();

        result
    }
648
649 #[tracing::instrument(skip_all)]
650 pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
651 &self,
652 f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
653 origin: InternalString,
654 ) -> Result<ImportStatus, LoroError> {
655 let mut oplog = self.oplog.lock().unwrap();
656 if !self.is_detached() {
657 let old_vv = oplog.vv().clone();
658 let old_frontiers = oplog.frontiers().clone();
659 let result = f(&mut oplog);
660 if &old_vv != oplog.vv() {
661 let mut diff = DiffCalculator::new(false);
662 let (diff, diff_mode) = diff.calc_diff_internal(
663 &oplog,
664 &old_vv,
665 &old_frontiers,
666 oplog.vv(),
667 oplog.dag.get_frontiers(),
668 None,
669 );
670 let mut state = self.state.lock().unwrap();
671 state.apply_diff(
672 InternalDocDiff {
673 origin,
674 diff: (diff).into(),
675 by: EventTriggerKind::Import,
676 new_version: Cow::Owned(oplog.frontiers().clone()),
677 },
678 diff_mode,
679 );
680 }
681 result
682 } else {
683 f(&mut oplog)
684 }
685 }
686
687 fn emit_events(&self) {
688 let events = {
690 let mut state = self.state.lock().unwrap();
691 state.take_events()
692 };
693 for event in events {
694 self.observer.emit(event);
695 }
696 }
697
698 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
699 let mut state = self.state.lock().unwrap();
700 state.take_events()
701 }
702
703 #[instrument(skip_all)]
704 pub fn export_snapshot(&self) -> Result<Vec<u8>, LoroEncodeError> {
705 if self.is_shallow() {
706 return Err(LoroEncodeError::ShallowSnapshotIncompatibleWithOldFormat);
707 }
708 let ans = self.with_barrier(|| export_snapshot(self));
709 Ok(ans)
710 }
711
712 #[tracing::instrument(skip_all)]
716 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
717 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
718 self.with_barrier(|| {
719 let result = self.update_oplog_and_apply_delta_to_state_if_needed(
720 |oplog| crate::encoding::json_schema::import_json(oplog, json),
721 Default::default(),
722 );
723 self.emit_events();
724 result
725 })
726 }
727
728 pub fn export_json_updates(
729 &self,
730 start_vv: &VersionVector,
731 end_vv: &VersionVector,
732 with_peer_compression: bool,
733 ) -> JsonSchema {
734 self.with_barrier(|| {
735 let oplog = self.oplog.lock().unwrap();
736 let mut start_vv = start_vv;
737 let _temp: Option<VersionVector>;
738 if !oplog.dag.shallow_since_vv().is_empty() {
739 let mut include_all = true;
741 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
742 if start_vv.get(peer).unwrap_or(&0) < counter {
743 include_all = false;
744 break;
745 }
746 }
747 if !include_all {
748 let mut vv = start_vv.clone();
749 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
750 vv.extend_to_include_end_id(ID::new(peer, counter));
751 }
752 _temp = Some(vv);
753 start_vv = _temp.as_ref().unwrap();
754 }
755 }
756
757 crate::encoding::json_schema::export_json(
758 &oplog,
759 start_vv,
760 end_vv,
761 with_peer_compression,
762 )
763 })
764 }
765
766 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
767 let oplog = self.oplog.lock().unwrap();
768 let mut changes = export_json_in_id_span(&oplog, id_span);
769 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
770 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
771 changes.push(change_json);
772 }
773 changes
774 }
775
776 #[inline]
778 pub fn oplog_vv(&self) -> VersionVector {
779 self.oplog.lock().unwrap().vv().clone()
780 }
781
782 #[inline]
784 pub fn state_vv(&self) -> VersionVector {
785 let oplog = self.oplog.lock().unwrap();
786 let f = &self.state.lock().unwrap().frontiers;
787 oplog.dag.frontiers_to_vv(f).unwrap()
788 }
789
790 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
791 let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
792 if let LoroValue::Container(c) = value {
793 Some(ValueOrHandler::Handler(Handler::new_attached(
794 c.clone(),
795 self.clone(),
796 )))
797 } else {
798 Some(ValueOrHandler::Value(value))
799 }
800 }
801
802 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
804 let path = str_to_path(path)?;
805 self.get_by_path(&path)
806 }
807
808 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
809 let arena = &self.arena;
810 let txn = self.txn.lock().unwrap();
811 let txn = txn.as_ref()?;
812 let ops_ = txn.local_ops();
813 let new_id = ID {
814 peer: *txn.peer(),
815 counter: ops_.first()?.counter,
816 };
817 let change = ChangeRef {
818 id: &new_id,
819 deps: txn.frontiers(),
820 timestamp: &txn
821 .timestamp()
822 .as_ref()
823 .copied()
824 .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
825 commit_msg: txn.msg(),
826 ops: ops_,
827 lamport: txn.lamport(),
828 };
829 let json = encode_change_to_json(change, arena);
830 Some(json)
831 }
832
833 #[inline]
834 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
835 if self.has_container(&id) {
836 Some(Handler::new_attached(id, self.clone()))
837 } else {
838 None
839 }
840 }
841
842 #[inline]
845 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
846 let id = id.into_container_id(&self.arena, ContainerType::Text);
847 assert!(self.has_container(&id));
848 Handler::new_attached(id, self.clone()).into_text().unwrap()
849 }
850
851 #[inline]
854 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
855 let id = id.into_container_id(&self.arena, ContainerType::List);
856 assert!(self.has_container(&id));
857 Handler::new_attached(id, self.clone()).into_list().unwrap()
858 }
859
860 #[inline]
863 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
864 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
865 assert!(self.has_container(&id));
866 Handler::new_attached(id, self.clone())
867 .into_movable_list()
868 .unwrap()
869 }
870
871 #[inline]
874 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
875 let id = id.into_container_id(&self.arena, ContainerType::Map);
876 assert!(self.has_container(&id));
877 Handler::new_attached(id, self.clone()).into_map().unwrap()
878 }
879
880 #[inline]
883 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
884 let id = id.into_container_id(&self.arena, ContainerType::Tree);
885 assert!(self.has_container(&id));
886 Handler::new_attached(id, self.clone()).into_tree().unwrap()
887 }
888
889 #[cfg(feature = "counter")]
890 pub fn get_counter<I: IntoContainerId>(
891 &self,
892 id: I,
893 ) -> crate::handler::counter::CounterHandler {
894 let id = id.into_container_id(&self.arena, ContainerType::Counter);
895 assert!(self.has_container(&id));
896 Handler::new_attached(id, self.clone())
897 .into_counter()
898 .unwrap()
899 }
900
901 #[must_use]
902 pub fn has_container(&self, id: &ContainerID) -> bool {
903 if id.is_root() {
904 return true;
905 }
906
907 let exist = self.state.lock().unwrap().does_container_exist(id);
908 exist
909 }
910
911 #[instrument(level = "info", skip_all)]
925 pub fn undo_internal(
926 &self,
927 id_span: IdSpan,
928 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
929 post_transform_base: Option<&DiffBatch>,
930 before_diff: &mut dyn FnMut(&DiffBatch),
931 ) -> LoroResult<CommitWhenDrop<'_>> {
932 if !self.can_edit() {
933 return Err(LoroError::EditWhenDetached);
934 }
935
936 let (options, txn) = self.implicit_commit_then_stop();
937 if !self
938 .oplog()
939 .lock()
940 .unwrap()
941 .vv()
942 .includes_id(id_span.id_last())
943 {
944 self.renew_txn_if_auto_commit(options);
945 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
946 }
947
948 let (was_recording, latest_frontiers) = {
949 let mut state = self.state.lock().unwrap();
950 let was_recording = state.is_recording();
951 state.stop_and_clear_recording();
952 (was_recording, state.frontiers.clone())
953 };
954
955 let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
956 let diff = crate::undo::undo(
957 spans,
958 match post_transform_base {
959 Some(d) => Either::Right(d),
960 None => Either::Left(&latest_frontiers),
961 },
962 |from, to| {
963 self._checkout_without_emitting(from, false, false).unwrap();
964 self.state.lock().unwrap().start_recording();
965 self._checkout_without_emitting(to, false, false).unwrap();
966 let mut state = self.state.lock().unwrap();
967 let e = state.take_events();
968 state.stop_and_clear_recording();
969 DiffBatch::new(e)
970 },
971 before_diff,
972 );
973
974 self._checkout_without_emitting(&latest_frontiers, false, false)?;
978 self.set_detached(false);
979 if was_recording {
980 self.state.lock().unwrap().start_recording();
981 }
982 drop(txn);
983 self.start_auto_commit();
984 if let Err(e) = self._apply_diff(diff, container_remap, true) {
988 warn!("Undo Failed {:?}", e);
989 }
990
991 if let Some(options) = options {
992 self.set_next_commit_options(options);
993 }
994 Ok(CommitWhenDrop {
995 doc: self,
996 default_options: CommitOptions::new().origin("undo"),
997 })
998 }
999
1000 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
1006 let f = self.state_frontiers();
1009 let diff = self.diff(&f, target)?;
1010 self._apply_diff(diff, &mut Default::default(), false)
1011 }
1012
1013 pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1018 {
1019 let oplog = self.oplog.lock().unwrap();
1021 for id in a.iter() {
1022 if !oplog.dag.contains(id) {
1023 return Err(LoroError::FrontiersNotFound(id));
1024 }
1025 }
1026 for id in b.iter() {
1027 if !oplog.dag.contains(id) {
1028 return Err(LoroError::FrontiersNotFound(id));
1029 }
1030 }
1031 }
1032
1033 let (options, txn) = self.implicit_commit_then_stop();
1034 let was_detached = self.is_detached();
1035 let old_frontiers = self.state_frontiers();
1036 let was_recording = {
1037 let mut state = self.state.lock().unwrap();
1038 let is_recording = state.is_recording();
1039 state.stop_and_clear_recording();
1040 is_recording
1041 };
1042 self._checkout_without_emitting(a, true, false).unwrap();
1043 self.state.lock().unwrap().start_recording();
1044 self._checkout_without_emitting(b, true, false).unwrap();
1045 let e = {
1046 let mut state = self.state.lock().unwrap();
1047 let e = state.take_events();
1048 state.stop_and_clear_recording();
1049 e
1050 };
1051 self._checkout_without_emitting(&old_frontiers, false, false)
1052 .unwrap();
1053 drop(txn);
1054 if !was_detached {
1055 self.set_detached(false);
1056 self.renew_txn_if_auto_commit(options);
1057 }
1058 if was_recording {
1059 self.state.lock().unwrap().start_recording();
1060 }
1061 Ok(DiffBatch::new(e))
1062 }
1063
1064 #[inline(always)]
1066 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1067 self._apply_diff(diff, &mut Default::default(), true)
1068 }
1069
1070 pub(crate) fn _apply_diff(
1082 &self,
1083 diff: DiffBatch,
1084 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1085 skip_unreachable: bool,
1086 ) -> LoroResult<()> {
1087 if !self.can_edit() {
1088 return Err(LoroError::EditWhenDetached);
1089 }
1090
1091 let mut ans: LoroResult<()> = Ok(());
1092 let mut missing_containers: Vec<ContainerID> = Vec::new();
1093 for (mut id, diff) in diff.into_iter() {
1094 let mut remapped = false;
1095 while let Some(rid) = container_remap.get(&id) {
1096 remapped = true;
1097 id = rid.clone();
1098 }
1099
1100 if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1101 let exists = self.state.lock().unwrap().does_container_exist(&id);
1103 if !exists {
1104 missing_containers.push(id);
1105 continue;
1106 }
1107 self.state.lock().unwrap().ensure_container(&id);
1109 }
1110
1111 if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
1112 continue;
1113 }
1114
1115 let Some(h) = self.get_handler(id.clone()) else {
1116 return Err(LoroError::ContainersNotFound {
1117 containers: Box::new(vec![id]),
1118 });
1119 };
1120 if let Err(e) = h.apply_diff(diff, container_remap) {
1121 ans = Err(e);
1122 }
1123 }
1124
1125 if !missing_containers.is_empty() {
1126 return Err(LoroError::ContainersNotFound {
1127 containers: Box::new(missing_containers),
1128 });
1129 }
1130
1131 ans
1132 }
1133
1134 #[inline]
1136 pub fn diagnose_size(&self) {
1137 self.oplog().lock().unwrap().diagnose_size();
1138 }
1139
1140 #[inline]
1141 pub fn oplog_frontiers(&self) -> Frontiers {
1142 self.oplog().lock().unwrap().frontiers().clone()
1143 }
1144
1145 #[inline]
1146 pub fn state_frontiers(&self) -> Frontiers {
1147 self.state.lock().unwrap().frontiers.clone()
1148 }
1149
1150 #[inline]
1154 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1155 self.oplog().lock().unwrap().cmp_with_frontiers(other)
1156 }
1157
1158 #[inline]
1162 pub fn cmp_frontiers(
1163 &self,
1164 a: &Frontiers,
1165 b: &Frontiers,
1166 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1167 self.oplog().lock().unwrap().cmp_frontiers(a, b)
1168 }
1169
1170 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1171 let mut state = self.state.lock().unwrap();
1172 if !state.is_recording() {
1173 state.start_recording();
1174 }
1175
1176 self.observer.subscribe_root(callback)
1177 }
1178
1179 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1180 let mut state = self.state.lock().unwrap();
1181 if !state.is_recording() {
1182 state.start_recording();
1183 }
1184
1185 self.observer.subscribe(container_id, callback)
1186 }
1187
1188 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1189 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1190 activate();
1191 sub
1192 }
1193
1194 #[tracing::instrument(skip_all)]
1196 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1197 if bytes.is_empty() {
1198 return Ok(ImportStatus::default());
1199 }
1200
1201 if bytes.len() == 1 {
1202 return self.import(&bytes[0]);
1203 }
1204
1205 let mut success = VersionRange::default();
1206 let mut pending = VersionRange::default();
1207 let mut meta_arr = bytes
1208 .iter()
1209 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1210 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1211 meta_arr.sort_by(|a, b| {
1212 a.0.mode
1213 .cmp(&b.0.mode)
1214 .then(b.0.change_num.cmp(&a.0.change_num))
1215 });
1216
1217 let (options, txn) = self.implicit_commit_then_stop();
1218 let is_detached = self.is_detached();
1242 self.set_detached(true);
1243 self.oplog.lock().unwrap().batch_importing = true;
1244 let mut err = None;
1245 for (_meta, data) in meta_arr {
1246 match self._import_with(data, Default::default()) {
1247 Ok(s) => {
1248 for (peer, (start, end)) in s.success.iter() {
1249 match success.0.entry(*peer) {
1250 Entry::Occupied(mut e) => {
1251 e.get_mut().1 = *end.max(&e.get().1);
1252 }
1253 Entry::Vacant(e) => {
1254 e.insert((*start, *end));
1255 }
1256 }
1257 }
1258
1259 if let Some(p) = s.pending.as_ref() {
1260 for (&peer, &(start, end)) in p.iter() {
1261 match pending.0.entry(peer) {
1262 Entry::Occupied(mut e) => {
1263 e.get_mut().0 = start.min(e.get().0);
1264 e.get_mut().1 = end.min(e.get().1);
1265 }
1266 Entry::Vacant(e) => {
1267 e.insert((start, end));
1268 }
1269 }
1270 }
1271 }
1272 }
1273 Err(e) => {
1274 err = Some(e);
1275 }
1276 }
1277 }
1278
1279 let mut oplog = self.oplog.lock().unwrap();
1280 oplog.batch_importing = false;
1281 drop(oplog);
1282 if !is_detached {
1283 self._checkout_to_latest_with_guard(txn);
1284 } else {
1285 drop(txn);
1286 }
1287
1288 self.renew_txn_if_auto_commit(options);
1289 if let Some(err) = err {
1290 return Err(err);
1291 }
1292
1293 Ok(ImportStatus {
1294 success,
1295 pending: if pending.is_empty() {
1296 None
1297 } else {
1298 Some(pending)
1299 },
1300 })
1301 }
1302
1303 #[inline]
1305 pub fn get_value(&self) -> LoroValue {
1306 self.state.lock().unwrap().get_value()
1307 }
1308
1309 #[inline]
1311 pub fn get_deep_value(&self) -> LoroValue {
1312 self.state.lock().unwrap().get_deep_value()
1313 }
1314
1315 #[inline]
1317 pub fn get_deep_value_with_id(&self) -> LoroValue {
1318 self.state.lock().unwrap().get_deep_value_with_id()
1319 }
1320
1321 pub fn checkout_to_latest(&self) {
1322 let (options, _guard) = self.implicit_commit_then_stop();
1323 if !self.is_detached() {
1324 drop(_guard);
1325 self.renew_txn_if_auto_commit(options);
1326 return;
1327 }
1328
1329 self._checkout_to_latest_without_commit(true);
1330 drop(_guard);
1331 self.renew_txn_if_auto_commit(options);
1332 }
1333
1334 fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1335 if !self.is_detached() {
1336 self._renew_txn_if_auto_commit_with_guard(None, guard);
1337 return;
1338 }
1339
1340 self._checkout_to_latest_without_commit(true);
1341 self._renew_txn_if_auto_commit_with_guard(None, guard);
1342 }
1343
1344 pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1346 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1347 let f = self.oplog_frontiers();
1348 let this = &self;
1349 let frontiers = &f;
1350 this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1351 .unwrap(); this.emit_events();
1354 if this.config.detached_editing() {
1355 this.renew_peer_id();
1356 }
1357
1358 self.set_detached(false);
1359 });
1360 }
1361
1362 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1367 let (options, guard) = self.implicit_commit_then_stop();
1368 self._checkout_without_emitting(frontiers, true, true)?;
1369 self.emit_events();
1370 drop(guard);
1371 if self.config.detached_editing() {
1372 self.renew_peer_id();
1373 self.renew_txn_if_auto_commit(options);
1374 } else if !self.is_detached() {
1375 self.renew_txn_if_auto_commit(options);
1376 }
1377
1378 Ok(())
1379 }
1380
    /// Moves the document state to `frontiers` without emitting events.
    ///
    /// The caller must hold the txn lock (asserted on entry). On success with an
    /// actual version change the doc is marked as detached and the diff between
    /// the current state version and the target is applied to the state.
    ///
    /// * `frontiers` - target version.
    /// * `to_shrink_frontiers` - when `true`, the target is first normalised with
    ///   `shrink_frontiers`; otherwise it is used as given.
    /// * `to_commit_then_renew` - not read by the body visible here.
    ///
    /// # Errors
    ///
    /// * `LoroError::SwitchToVersionBeforeShallowRoot` when the target is before
    ///   the shallow root of the DAG.
    /// * `LoroError::FrontiersNotFound` when shrinking fails or a target id is
    ///   not contained in the DAG.
    /// * `LoroError::NotFoundError` when the target cannot be converted to a
    ///   version vector.
    ///
    /// All error returns happen before the detached flag or the state is changed.
    ///
    /// # Panics
    ///
    /// Panics if the txn lock is not held, if a lock cannot be acquired, or if
    /// the current state frontiers cannot be converted to a version vector.
    #[instrument(level = "info", skip(self))]
    pub(crate) fn _checkout_without_emitting(
        &self,
        frontiers: &Frontiers,
        to_shrink_frontiers: bool,
        to_commit_then_renew: bool,
    ) -> Result<(), LoroError> {
        assert!(self.txn.is_locked());
        let from_frontiers = self.state_frontiers();
        loro_common::info!(
            "checkout from={:?} to={:?} cur_vv={:?}",
            from_frontiers,
            frontiers,
            self.oplog_vv()
        );

        // Already at the requested version: nothing to do.
        if &from_frontiers == frontiers {
            return Ok(());
        }

        let oplog = self.oplog.lock().unwrap();
        if oplog.dag.is_before_shallow_root(frontiers) {
            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
        }

        let frontiers = if to_shrink_frontiers {
            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
        } else {
            frontiers.clone()
        };

        // Compare again: shrinking may have reduced the target to the current version.
        if from_frontiers == frontiers {
            return Ok(());
        }

        // Locks are taken in the order oplog -> state -> diff calculator.
        let mut state = self.state.lock().unwrap();
        let mut calc = self.diff_calculator.lock().unwrap();
        for i in frontiers.iter() {
            if !oplog.dag.contains(i) {
                return Err(LoroError::FrontiersNotFound(i));
            }
        }

        let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
            return Err(LoroError::NotFoundError(
                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
            ));
        };

        // From here on the operation cannot fail; mark the doc detached and
        // apply the diff from the current state version to the target.
        self.set_detached(true);
        let (diff, diff_mode) =
            calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
        state.apply_diff(
            InternalDocDiff {
                origin: "checkout".into(),
                diff: Cow::Owned(diff),
                by: EventTriggerKind::Checkout,
                new_version: Cow::Owned(frontiers.clone()),
            },
            diff_mode,
        );

        Ok(())
    }
1447
1448 #[inline]
1449 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1450 self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
1451 }
1452
1453 #[inline]
1454 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1455 self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
1456 }
1457
1458 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1462 self.import(&other.export_from(&self.oplog_vv()))
1463 }
1464
    /// Returns a reference to the arena held by this document.
    pub(crate) fn arena(&self) -> &SharedArena {
        &self.arena
    }
1468
1469 #[inline]
1470 pub fn len_ops(&self) -> usize {
1471 let oplog = self.oplog.lock().unwrap();
1472 let ans = oplog.vv().values().sum::<i32>() as usize;
1473 if oplog.is_shallow() {
1474 let sub = oplog
1475 .shallow_since_vv()
1476 .iter()
1477 .map(|(_, ops)| *ops)
1478 .sum::<i32>() as usize;
1479 ans - sub
1480 } else {
1481 ans
1482 }
1483 }
1484
1485 #[inline]
1486 pub fn len_changes(&self) -> usize {
1487 let oplog = self.oplog.lock().unwrap();
1488 oplog.len_changes()
1489 }
1490
    /// Returns a reference to this document's [`Configure`].
    pub fn config(&self) -> &Configure {
        &self.config
    }
1494
1495 pub fn check_state_diff_calc_consistency_slow(&self) {
1500 {
1502 static IS_CHECKING: std::sync::atomic::AtomicBool =
1503 std::sync::atomic::AtomicBool::new(false);
1504 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1505 return;
1506 }
1507
1508 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1509 let peer_id = self.peer_id();
1510 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1511 let _g = s.enter();
1512 let options = self.implicit_commit_then_stop().0;
1513 self.oplog.lock().unwrap().check_dag_correctness();
1514 if self.is_shallow() {
1515 let initial_snapshot = self
1526 .export(ExportMode::state_only(Some(
1527 &self.shallow_since_frontiers(),
1528 )))
1529 .unwrap();
1530
1531 let doc = LoroDoc::new();
1533 doc.import(&initial_snapshot).unwrap();
1534 self.checkout(&self.shallow_since_frontiers()).unwrap();
1535 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1536
1537 let updates = self.export(ExportMode::all_updates()).unwrap();
1539
1540 doc.import(&updates).unwrap();
1542 self.checkout_to_latest();
1543
1544 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1547 let mut calculated_state = doc.app_state().lock().unwrap();
1548 let mut current_state = self.app_state().lock().unwrap();
1549 current_state.check_is_the_same(&mut calculated_state);
1550 } else {
1551 let f = self.state_frontiers();
1552 let vv = self
1553 .oplog()
1554 .lock()
1555 .unwrap()
1556 .dag
1557 .frontiers_to_vv(&f)
1558 .unwrap();
1559 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1560 let doc = Self::new();
1561 doc.import(&bytes).unwrap();
1562 let mut calculated_state = doc.app_state().lock().unwrap();
1563 let mut current_state = self.app_state().lock().unwrap();
1564 current_state.check_is_the_same(&mut calculated_state);
1565 }
1566
1567 self.renew_txn_if_auto_commit(options);
1568 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1569 }
1570 }
1571
1572 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1573 self.query_pos_internal(pos, true)
1574 }
1575
    /// Resolves a cursor to an absolute position in its container.
    ///
    /// Fast path: if the state can resolve the cursor directly, that position is
    /// returned with `update: None`.
    ///
    /// Slow path (the state cannot resolve it):
    /// * If the cursor carries an id, the last op that deleted that id is looked
    ///   up, a diff from just before that deletion to the latest oplog version is
    ///   computed for the cursor's container only, and the container's diff
    ///   calculator is asked for the id's latest position. The result carries a
    ///   refreshed cursor in `update`.
    /// * If the cursor has no id, the position is the current length of the
    ///   container and `update` holds a new id-less cursor at that length.
    ///
    /// `ret_event_index` is forwarded to the state's `get_relative_position`.
    ///
    /// # Errors
    ///
    /// * `ContainerDeleted` when the container is unknown to the arena and does
    ///   not exist in the state.
    /// * `HistoryCleared` when no delete op is found and the id is covered by the
    ///   shallow-since version vector.
    /// * `IdNotFound` when no delete op is found otherwise.
    ///
    /// # Panics
    ///
    /// Panics on lock failure, on failed `unwrap`s of internal lookups, and
    /// (via `unreachable!`) when the container is not a text, list or movable
    /// list.
    pub(crate) fn query_pos_internal(
        &self,
        pos: &Cursor,
        ret_event_index: bool,
    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
        let mut state = self.state.lock().unwrap();
        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
            Ok(PosQueryResult {
                update: None,
                current: AbsolutePosition {
                    pos: ans,
                    side: pos.side,
                },
            })
        } else {
            // Release the state lock: the closure below locks the oplog first
            // and may lock the state again.
            drop(state);
            let result = self.with_barrier(|| {
                let oplog = self.oplog().lock().unwrap();
                if let Some(id) = pos.id {
                    // Make sure the container is registered in the arena so
                    // that an index can be obtained for it.
                    if oplog.arena.id_to_idx(&pos.container).is_none() {
                        let mut s = self.state.lock().unwrap();
                        if !s.does_container_exist(&pos.container) {
                            return Err(CannotFindRelativePosition::ContainerDeleted);
                        }
                        s.ensure_container(&pos.container);
                        drop(s);
                    }
                    let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
                    let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
                        if oplog.shallow_since_vv().includes_id(id) {
                            return Err(CannotFindRelativePosition::HistoryCleared);
                        }

                        tracing::error!("Cannot find id {}", id);
                        return Err(CannotFindRelativePosition::IdNotFound);
                    };
                    // Diff from the version right before the deletion up to the
                    // latest version, restricted to the cursor's container.
                    let mut diff_calc = DiffCalculator::new(true);
                    let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
                    let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
                    diff_calc.calc_diff_internal(
                        &oplog,
                        before,
                        &before_frontiers,
                        oplog.vv(),
                        oplog.frontiers(),
                        Some(&|target| idx == target),
                    );
                    let depth = self.arena.get_depth(idx);
                    let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
                    match diff_calc {
                        crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
                            let c = text.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_text(&pos.container);
                            // The calculator reports an entity index; convert it
                            // to an event index before returning.
                            let current_pos = handler.convert_entity_index_to_event_index(new_pos);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(current_pos, c.side),
                                current: AbsolutePosition {
                                    pos: current_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::List(list) => {
                            let c = list.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_list(&pos.container);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(new_pos, c.side),
                                current: AbsolutePosition {
                                    pos: new_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
                            let c = list.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_movable_list(&pos.container);
                            // Convert the op-level position to a user-level one.
                            let new_pos = handler.op_pos_to_user_pos(new_pos);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(new_pos, c.side),
                                current: AbsolutePosition {
                                    pos: new_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
                        crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
                        #[cfg(feature = "counter")]
                        crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
                        crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
                    }
                } else {
                    // A cursor without an id resolves to the current length of
                    // its container.
                    match pos.container.container_type() {
                        ContainerType::Text => {
                            let text = self.get_text(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: text.id(),
                                    side: pos.side,
                                    origin_pos: text.len_unicode(),
                                }),
                                current: AbsolutePosition {
                                    pos: text.len_event(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::List => {
                            let list = self.get_list(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: list.id(),
                                    side: pos.side,
                                    origin_pos: list.len(),
                                }),
                                current: AbsolutePosition {
                                    pos: list.len(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::MovableList => {
                            let list = self.get_movable_list(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: list.id(),
                                    side: pos.side,
                                    origin_pos: list.len(),
                                }),
                                current: AbsolutePosition {
                                    pos: list.len(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
                            unreachable!()
                        }
                        #[cfg(feature = "counter")]
                        ContainerType::Counter => unreachable!(),
                    }
                }
            });
            result
        }
    }
1746
1747 pub fn free_history_cache(&self) {
1752 self.oplog.lock().unwrap().free_history_cache();
1753 }
1754
1755 pub fn free_diff_calculator(&self) {
1757 *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
1758 }
1759
1760 pub fn has_history_cache(&self) -> bool {
1763 self.oplog.lock().unwrap().has_history_cache()
1764 }
1765
1766 #[inline]
1770 pub fn compact_change_store(&self) {
1771 self.with_barrier(|| {
1772 self.oplog.lock().unwrap().compact_change_store();
1773 });
1774 }
1775
    /// Runs [`DocAnalysis::analyze`] on this document and returns the result.
    #[inline]
    pub fn analyze(&self) -> DocAnalysis {
        DocAnalysis::analyze(self)
    }
1783
1784 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1786 let mut state = self.state.lock().unwrap();
1787 if state.arena.id_to_idx(id).is_none() {
1788 if !state.does_container_exist(id) {
1789 return None;
1790 }
1791 state.ensure_container(id);
1792 }
1793 let idx = state.arena.id_to_idx(id).unwrap();
1794 state.get_path(idx)
1795 }
1796
1797 #[instrument(skip(self))]
1798 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1799 self.with_barrier(|| {
1800 let ans = match mode {
1801 ExportMode::Snapshot => export_fast_snapshot(self),
1802 ExportMode::Updates { from } => export_fast_updates(self, &from),
1803 ExportMode::UpdatesInRange { spans } => {
1804 export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1805 }
1806 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1807 ExportMode::StateOnly(f) => match f {
1808 Some(f) => export_state_only_snapshot(self, &f)?,
1809 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1810 },
1811 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1812 };
1813 Ok(ans)
1814 })
1815 }
1816
1817 pub fn shallow_since_vv(&self) -> ImVersionVector {
1823 self.oplog().lock().unwrap().shallow_since_vv().clone()
1824 }
1825
1826 pub fn shallow_since_frontiers(&self) -> Frontiers {
1827 self.oplog()
1828 .lock()
1829 .unwrap()
1830 .shallow_since_frontiers()
1831 .clone()
1832 }
1833
1834 pub fn is_shallow(&self) -> bool {
1836 !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
1837 }
1838
1839 pub fn get_pending_txn_len(&self) -> usize {
1844 if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1845 txn.len()
1846 } else {
1847 0
1848 }
1849 }
1850
1851 #[inline]
1852 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1853 self.oplog().lock().unwrap().dag.find_path(from, to)
1854 }
1855
1856 pub fn subscribe_first_commit_from_peer(
1862 &self,
1863 callback: FirstCommitFromPeerCallback,
1864 ) -> Subscription {
1865 let (s, enable) = self
1866 .first_commit_from_peer_subs
1867 .inner()
1868 .insert((), callback);
1869 enable();
1870 s
1871 }
1872
1873 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1878 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1879 enable();
1880 s
1881 }
1882}
1883
/// Errors returned by `LoroDoc::travel_change_ancestors`.
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// A requested id is not included in the oplog's version vector.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// A requested id is included in the shallow-since version vector of the
    /// DAG, so it is not part of the retained history.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
1891
1892impl LoroDoc {
1893 pub fn travel_change_ancestors(
1894 &self,
1895 ids: &[ID],
1896 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1897 ) -> Result<(), ChangeTravelError> {
1898 let (options, guard) = self.implicit_commit_then_stop();
1899 drop(guard);
1900 struct PendingNode(ChangeMeta);
1901 impl PartialEq for PendingNode {
1902 fn eq(&self, other: &Self) -> bool {
1903 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1904 }
1905 }
1906
1907 impl Eq for PendingNode {}
1908 impl PartialOrd for PendingNode {
1909 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1910 Some(self.cmp(other))
1911 }
1912 }
1913
1914 impl Ord for PendingNode {
1915 fn cmp(&self, other: &Self) -> Ordering {
1916 self.0
1917 .lamport_last()
1918 .cmp(&other.0.lamport_last())
1919 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1920 }
1921 }
1922
1923 for id in ids {
1924 let op_log = &self.oplog().lock().unwrap();
1925 if !op_log.vv().includes_id(*id) {
1926 return Err(ChangeTravelError::TargetIdNotFound(*id));
1927 }
1928 if op_log.dag.shallow_since_vv().includes_id(*id) {
1929 return Err(ChangeTravelError::TargetVersionNotIncluded);
1930 }
1931 }
1932
1933 let mut visited = FxHashSet::default();
1934 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1935 for id in ids {
1936 pending.push(PendingNode(ChangeMeta::from_change(
1937 &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1938 )));
1939 }
1940 while let Some(PendingNode(node)) = pending.pop() {
1941 let deps = node.deps.clone();
1942 if f(node).is_break() {
1943 break;
1944 }
1945
1946 for dep in deps.iter() {
1947 let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1948 continue;
1949 };
1950 if visited.contains(&dep_node.id) {
1951 continue;
1952 }
1953
1954 visited.insert(dep_node.id);
1955 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1956 }
1957 }
1958
1959 let ans = Ok(());
1960 self.renew_txn_if_auto_commit(options);
1961 ans
1962 }
1963
1964 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1965 self.with_barrier(|| {
1966 let mut set = FxHashSet::default();
1967 {
1968 let oplog = self.oplog().lock().unwrap();
1969 for op in oplog.iter_ops(id.to_span(len)) {
1970 let id = oplog.arena.get_container_id(op.container()).unwrap();
1971 set.insert(id);
1972 }
1973 }
1974 set
1975 })
1976 }
1977
1978 pub fn delete_root_container(&self, cid: ContainerID) {
1979 if !cid.is_root() {
1980 return;
1981 }
1982
1983 if !self.has_container(&cid) {
1985 return;
1986 }
1987
1988 let Some(h) = self.get_handler(cid.clone()) else {
1989 return;
1990 };
1991
1992 if let Err(e) = h.clear() {
1993 eprintln!("Failed to clear handler: {:?}", e);
1994 return;
1995 }
1996 self.config
1997 .deleted_root_containers
1998 .lock()
1999 .unwrap()
2000 .insert(cid);
2001 }
2002
2003 pub fn set_hide_empty_root_containers(&self, hide: bool) {
2004 self.config
2005 .hide_empty_root_containers
2006 .store(hide, std::sync::atomic::Ordering::Relaxed);
2007 }
2008}
2009
2010fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2012 let start_vv = oplog
2013 .dag
2014 .frontiers_to_vv(&id.into())
2015 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2016 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
2017 for op in change.ops.iter().rev() {
2018 if op.container != idx {
2019 continue;
2020 }
2021 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2022 if d.id_start.to_span(d.atom_len()).contains(id) {
2023 return Some(ID::new(change.peer(), op.counter));
2024 }
2025 }
2026 }
2027 }
2028
2029 None
2030}
2031
/// Guard that commits the document's transaction when it is dropped.
///
/// On drop, `default_options` are installed on the active transaction (if
/// any) and `commit_then_renew` is called on the document.
#[derive(Debug)]
pub struct CommitWhenDrop<'a> {
    /// Document whose transaction is committed on drop.
    doc: &'a LoroDoc,
    /// Options handed to the active transaction right before committing.
    default_options: CommitOptions,
}
2037
2038impl Drop for CommitWhenDrop<'_> {
2039 fn drop(&mut self) {
2040 {
2041 let mut guard = self.doc.txn.lock().unwrap();
2042 if let Some(txn) = guard.as_mut() {
2043 txn.set_default_options(std::mem::take(&mut self.default_options));
2044 };
2045 }
2046
2047 self.doc.commit_then_renew();
2048 }
2049}
2050
/// Options attached to a transaction commit.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// Origin label for the commit; `None` when not set.
    pub origin: Option<InternalString>,

    /// Defaults to `true` in [`CommitOptions::new`].
    /// NOTE(review): presumably controls whether a new transaction is started
    /// right after the commit; confirm at the commit site.
    pub immediate_renew: bool,

    /// Explicit timestamp for the commit; `None` when not set.
    /// NOTE(review): the unit is not visible here; confirm against `Timestamp`.
    pub timestamp: Option<Timestamp>,

    /// Commit message; `None` when not set.
    pub commit_msg: Option<Arc<str>>,
}
2069
2070impl CommitOptions {
2071 pub fn new() -> Self {
2073 Self {
2074 origin: None,
2075 immediate_renew: true,
2076 timestamp: None,
2077 commit_msg: None,
2078 }
2079 }
2080
2081 pub fn origin(mut self, origin: &str) -> Self {
2083 self.origin = Some(origin.into());
2084 self
2085 }
2086
2087 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2089 self.immediate_renew = immediate_renew;
2090 self
2091 }
2092
2093 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2097 self.timestamp = Some(timestamp);
2098 self
2099 }
2100
2101 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2103 self.commit_msg = Some(commit_msg.into());
2104 self
2105 }
2106
2107 pub fn set_origin(&mut self, origin: Option<&str>) {
2109 self.origin = origin.map(|x| x.into())
2110 }
2111
2112 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2114 self.timestamp = timestamp;
2115 }
2116}
2117
impl Default for CommitOptions {
    /// Same as [`CommitOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}
2123
#[cfg(test)]
mod test {
    use loro_common::ID;

    use crate::{version::Frontiers, LoroDoc, ToJson};

    /// Asserts that the deep value of `doc`, converted to JSON, equals
    /// `expected`.
    #[track_caller]
    fn assert_deep_json(doc: &LoroDoc, expected: serde_json::Value) {
        assert_eq!(doc.get_deep_value().to_json_value(), expected);
    }

    /// `LoroDoc` must be usable across threads.
    #[test]
    fn test_sync() {
        fn assert_send_sync<T: Send + Sync>(_value: T) {}
        assert_send_sync(super::LoroDoc::new());
    }

    /// Checks out several historical versions on two replicas and compares the
    /// resulting deep values.
    #[test]
    fn test_checkout() {
        let loro = LoroDoc::new();
        loro.set_peer_id(1).unwrap();
        let text = loro.get_text("text");
        let map = loro.get_map("map");
        let list = loro.get_list("list");
        let mut txn = loro.txn().unwrap();
        // Each round adds three ops: a map insert, a text insert, a list insert.
        for i in 0..10 {
            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
            text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
        }
        txn.commit().unwrap();

        let b = LoroDoc::new();
        b.import(&loro.export_snapshot().unwrap()).unwrap();

        loro.checkout(&Frontiers::default()).unwrap();
        assert_deep_json(&loro, serde_json::json!({"text":"","list":[],"map":{}}));

        b.checkout(&ID::new(1, 2).into()).unwrap();
        assert_deep_json(
            &b,
            serde_json::json!({"text":"0","list":[0],"map":{"key":0}}),
        );

        loro.checkout(&ID::new(1, 3).into()).unwrap();
        assert_deep_json(
            &loro,
            serde_json::json!({"text":"0","list":[0],"map":{"key":1}}),
        );

        b.checkout(&ID::new(1, 29).into()).unwrap();
        assert_deep_json(
            &b,
            serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}),
        );
    }

    /// After `import_batch`, further edits, a commit, locking the oplog and an
    /// export must all still work.
    #[test]
    fn import_batch_err_181() {
        let a = LoroDoc::new_auto_commit();
        let snapshot = a.export_snapshot();
        let b = LoroDoc::new_auto_commit();
        b.import_batch(&[snapshot.unwrap()]).unwrap();
        b.get_text("text").insert(0, "hello").unwrap();
        b.commit_then_renew();
        // The oplog lock must be acquirable at this point.
        drop(b.oplog().lock().unwrap());
        b.export_from(&Default::default());
    }
}