1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::Timestamp,
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48 LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 #[inline]
89 pub fn with_barrier<F, R>(&self, f: F) -> R
90 where
91 F: FnOnce() -> R,
92 {
93 let (options, guard) = self.implicit_commit_then_stop();
94 let result = f();
95 drop(guard);
96 self.renew_txn_if_auto_commit(options);
97 result
98 }
99
100 pub fn new() -> Self {
101 let oplog = OpLog::new();
102 let arena = oplog.arena.clone();
103 let config: Configure = oplog.configure.clone();
104 let lock_group = LoroLockGroup::new();
105 let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
106 let inner = Arc::new_cyclic(|w| {
107 let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
108 LoroDocInner {
109 oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
110 state,
111 config,
112 detached: AtomicBool::new(false),
113 auto_commit: AtomicBool::new(false),
114 observer: Arc::new(Observer::new(arena.clone())),
115 diff_calculator: Arc::new(
116 lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
117 ),
118 txn: global_txn,
119 arena,
120 local_update_subs: SubscriberSetWithQueue::new(),
121 peer_id_change_subs: SubscriberSetWithQueue::new(),
122 pre_commit_subs: SubscriberSetWithQueue::new(),
123 first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
124 }
125 });
126 LoroDoc { inner }
127 }
128
129 pub fn fork(&self) -> Self {
130 if self.is_detached() {
131 return self.fork_at(&self.state_frontiers());
132 }
133
134 let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
135 let doc = Self::new();
136 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default()).unwrap();
137 doc.set_config(&self.config);
138 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
139 doc.start_auto_commit();
140 }
141 doc
142 }
143 pub fn set_detached_editing(&self, enable: bool) {
159 self.config.set_detached_editing(enable);
160 if enable && self.is_detached() {
161 self.with_barrier(|| {
162 self.renew_peer_id();
163 });
164 }
165 }
166
167 #[inline]
169 pub fn new_auto_commit() -> Self {
170 let doc = Self::new();
171 doc.start_auto_commit();
172 doc
173 }
174
175 #[inline(always)]
176 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
177 if peer == PeerID::MAX {
178 return Err(LoroError::InvalidPeerID);
179 }
180 let next_id = self.oplog.lock().unwrap().next_id(peer);
181 if self.auto_commit.load(Acquire) {
182 let doc_state = self.state.lock().unwrap();
183 doc_state
184 .peer
185 .store(peer, std::sync::atomic::Ordering::Relaxed);
186
187 if doc_state.is_in_txn() {
188 drop(doc_state);
189 self.with_barrier(|| {});
191 }
192 self.peer_id_change_subs.emit(&(), next_id);
193 return Ok(());
194 }
195
196 let doc_state = self.state.lock().unwrap();
197 if doc_state.is_in_txn() {
198 return Err(LoroError::TransactionError(
199 "Cannot change peer id during transaction"
200 .to_string()
201 .into_boxed_str(),
202 ));
203 }
204
205 doc_state
206 .peer
207 .store(peer, std::sync::atomic::Ordering::Relaxed);
208 drop(doc_state);
209 self.peer_id_change_subs.emit(&(), next_id);
210 Ok(())
211 }
212
213 pub(crate) fn renew_peer_id(&self) {
215 let peer_id = DefaultRandom.next_u64();
216 self.set_peer_id(peer_id).unwrap();
217 }
218
219 #[inline]
231 #[must_use]
232 pub fn implicit_commit_then_stop(
233 &self,
234 ) -> (
235 Option<CommitOptions>,
236 LoroMutexGuard<'_, Option<Transaction>>,
237 ) {
238 let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
240 (a, b.unwrap())
241 }
242
243 #[inline]
248 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
249 self.commit_internal(CommitOptions::new().immediate_renew(true), false)
251 .0
252 }
253
254 fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
259 let mut txn_guard = self.txn.lock().unwrap();
260 let Some(txn) = txn_guard.as_mut() else {
261 return Some(txn_guard);
262 };
263
264 if txn.is_peer_first_appearance {
265 txn.is_peer_first_appearance = false;
266 drop(txn_guard);
267 self.first_commit_from_peer_subs.emit(
269 &(),
270 FirstCommitFromPeerPayload {
271 peer: self.peer_id(),
272 },
273 );
274 }
275
276 None
277 }
278
279 #[instrument(skip_all)]
287 fn commit_internal(
288 &self,
289 config: CommitOptions,
290 preserve_on_empty: bool,
291 ) -> (
292 Option<CommitOptions>,
293 Option<LoroMutexGuard<'_, Option<Transaction>>>,
294 ) {
295 if !self.auto_commit.load(Acquire) {
296 let txn_guard = self.txn.lock().unwrap();
297 return (None, Some(txn_guard));
300 }
301
302 loop {
303 if let Some(txn_guard) = self.before_commit() {
304 return (None, Some(txn_guard));
305 }
306
307 let mut txn_guard = self.txn.lock().unwrap();
308 let txn = txn_guard.take();
309 let Some(mut txn) = txn else {
310 return (None, Some(txn_guard));
311 };
312 let on_commit = txn.take_on_commit();
313 if let Some(origin) = config.origin.clone() {
314 txn.set_origin(origin);
315 }
316
317 if let Some(timestamp) = config.timestamp {
318 txn.set_timestamp(timestamp);
319 }
320
321 if let Some(msg) = config.commit_msg.as_ref() {
322 txn.set_msg(Some(msg.clone()));
323 }
324
325 let id_span = txn.id_span();
326 let mut options = txn.commit().unwrap();
327 if let Some(opts) = options.as_mut() {
329 if config.origin.is_some() {
331 opts.set_origin(None);
332 }
333 if !preserve_on_empty {
335 options = None;
336 }
337 }
338 if config.immediate_renew {
339 assert!(self.can_edit());
340 let mut t = self.txn().unwrap();
341 if let Some(options) = options.as_ref() {
342 t.set_options(options.clone());
343 }
344 *txn_guard = Some(t);
345 }
346
347 if let Some(on_commit) = on_commit {
348 drop(txn_guard);
349 on_commit(&self.state, &self.oplog, id_span);
350 txn_guard = self.txn.lock().unwrap();
351 if !config.immediate_renew && txn_guard.is_some() {
352 continue;
354 }
355 }
356
357 return (
358 options,
359 if !config.immediate_renew {
360 Some(txn_guard)
361 } else {
362 None
363 },
364 );
365 }
366 }
367
368 #[instrument(skip_all)]
373 pub fn commit_with(
374 &self,
375 config: CommitOptions,
376 ) -> (
377 Option<CommitOptions>,
378 Option<LoroMutexGuard<'_, Option<Transaction>>>,
379 ) {
380 self.commit_internal(config, false)
381 }
382
383 pub fn set_next_commit_message(&self, message: &str) {
385 let mut binding = self.txn.lock().unwrap();
386 let Some(txn) = binding.as_mut() else {
387 return;
388 };
389
390 if message.is_empty() {
391 txn.set_msg(None)
392 } else {
393 txn.set_msg(Some(message.into()))
394 }
395 }
396
397 pub fn set_next_commit_origin(&self, origin: &str) {
399 let mut txn = self.txn.lock().unwrap();
400 if let Some(txn) = txn.as_mut() {
401 txn.set_origin(origin.into());
402 }
403 }
404
405 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
407 let mut txn = self.txn.lock().unwrap();
408 if let Some(txn) = txn.as_mut() {
409 txn.set_timestamp(timestamp);
410 }
411 }
412
413 pub fn set_next_commit_options(&self, options: CommitOptions) {
415 let mut txn = self.txn.lock().unwrap();
416 if let Some(txn) = txn.as_mut() {
417 txn.set_options(options);
418 }
419 }
420
421 pub fn clear_next_commit_options(&self) {
423 let mut txn = self.txn.lock().unwrap();
424 if let Some(txn) = txn.as_mut() {
425 txn.set_options(CommitOptions::new());
426 }
427 }
428
429 #[inline]
440 pub fn set_record_timestamp(&self, record: bool) {
441 self.config.set_record_timestamp(record);
442 }
443
444 #[inline]
449 pub fn set_change_merge_interval(&self, interval: i64) {
450 self.config.set_merge_interval(interval);
451 }
452
453 pub fn can_edit(&self) -> bool {
454 !self.is_detached() || self.config.detached_editing()
455 }
456
457 pub fn is_detached_editing_enabled(&self) -> bool {
458 self.config.detached_editing()
459 }
460
461 #[inline]
462 pub fn config_text_style(&self, text_style: StyleConfigMap) {
463 self.config.text_style_config.try_write().unwrap().map = text_style.map;
464 }
465
466 #[inline]
467 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
468 self.config
469 .text_style_config
470 .try_write()
471 .unwrap()
472 .default_style = text_style;
473 }
474 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
475 let doc = Self::new();
476 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
477 if mode.is_snapshot() {
478 doc.with_barrier(|| -> Result<(), LoroError> {
479 decode_snapshot(&doc, mode, body, Default::default())?;
480 Ok(())
481 })?;
482 Ok(doc)
483 } else {
484 Err(LoroError::DecodeError(
485 "Invalid encode mode".to_string().into(),
486 ))
487 }
488 }
489
490 #[inline(always)]
492 pub fn can_reset_with_snapshot(&self) -> bool {
493 let oplog = self.oplog.lock().unwrap();
494 if oplog.batch_importing {
495 return false;
496 }
497
498 if self.is_detached() {
499 return false;
500 }
501
502 oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
503 }
504
505 #[inline(always)]
511 pub fn is_detached(&self) -> bool {
512 self.detached.load(Acquire)
513 }
514
515 pub(crate) fn set_detached(&self, detached: bool) {
516 self.detached.store(detached, Release);
517 }
518
519 #[inline(always)]
520 pub fn peer_id(&self) -> PeerID {
521 self.state
522 .lock()
523 .unwrap()
524 .peer
525 .load(std::sync::atomic::Ordering::Relaxed)
526 }
527
528 #[inline(always)]
529 pub fn detach(&self) {
530 self.with_barrier(|| self.set_detached(true));
531 }
532
533 #[inline(always)]
534 pub fn attach(&self) {
535 self.checkout_to_latest()
536 }
537
538 pub fn state_timestamp(&self) -> Timestamp {
541 let f = { self.state.lock().unwrap().frontiers.clone() };
543 self.oplog.lock().unwrap().get_timestamp_of_version(&f)
544 }
545
546 #[inline(always)]
547 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
548 &self.state
549 }
550
551 #[inline]
552 pub fn get_state_deep_value(&self) -> LoroValue {
553 self.state.lock().unwrap().get_deep_value()
554 }
555
556 #[inline(always)]
557 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
558 &self.oplog
559 }
560
561 #[inline(always)]
562 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
563 let s = debug_span!("import", peer = self.peer_id());
564 let _e = s.enter();
565 self.import_with(bytes, Default::default())
566 }
567
568 #[inline]
569 pub fn import_with(
570 &self,
571 bytes: &[u8],
572 origin: InternalString,
573 ) -> Result<ImportStatus, LoroError> {
574 self.with_barrier(|| self._import_with(bytes, origin))
575 }
576
577 #[tracing::instrument(skip_all)]
578 fn _import_with(
579 &self,
580 bytes: &[u8],
581 origin: InternalString,
582 ) -> Result<ImportStatus, LoroError> {
583 ensure_cov::notify_cov("loro_internal::import");
584 let parsed = parse_header_and_body(bytes, true)?;
585 loro_common::info!("Importing with mode={:?}", &parsed.mode);
586 let result = match parsed.mode {
587 EncodeMode::OutdatedRle => {
588 if self.state.lock().unwrap().is_in_txn() {
589 return Err(LoroError::ImportWhenInTxn);
590 }
591
592 let s = tracing::span!(
593 tracing::Level::INFO,
594 "Import updates ",
595 peer = self.peer_id()
596 );
597 let _e = s.enter();
598 self.update_oplog_and_apply_delta_to_state_if_needed(
599 |oplog| oplog.decode(parsed),
600 origin,
601 )
602 }
603 EncodeMode::OutdatedSnapshot => {
604 if self.can_reset_with_snapshot() {
605 loro_common::info!("Init by snapshot {}", self.peer_id());
606 decode_snapshot(self, parsed.mode, parsed.body, origin)
607 } else {
608 self.update_oplog_and_apply_delta_to_state_if_needed(
609 |oplog| oplog.decode(parsed),
610 origin,
611 )
612 }
613 }
614 EncodeMode::FastSnapshot => {
615 if self.can_reset_with_snapshot() {
616 ensure_cov::notify_cov("loro_internal::import::snapshot");
617 loro_common::info!("Init by fast snapshot {}", self.peer_id());
618 decode_snapshot(self, parsed.mode, parsed.body, origin)
619 } else {
620 self.update_oplog_and_apply_delta_to_state_if_needed(
621 |oplog| oplog.decode(parsed),
622 origin,
623 )
624
625 }
630 }
631 EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
632 |oplog| oplog.decode(parsed),
633 origin,
634 ),
635 EncodeMode::Auto => {
636 unreachable!()
637 }
638 };
639
640 self.emit_events();
641
642 result
643 }
644
645 #[tracing::instrument(skip_all)]
646 pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
647 &self,
648 f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
649 origin: InternalString,
650 ) -> Result<ImportStatus, LoroError> {
651 let mut oplog = self.oplog.lock().unwrap();
652 if !self.is_detached() {
653 let old_vv = oplog.vv().clone();
654 let old_frontiers = oplog.frontiers().clone();
655 let result = f(&mut oplog);
656 if &old_vv != oplog.vv() {
657 let mut diff = DiffCalculator::new(false);
658 let (diff, diff_mode) = diff.calc_diff_internal(
659 &oplog,
660 &old_vv,
661 &old_frontiers,
662 oplog.vv(),
663 oplog.dag.get_frontiers(),
664 None,
665 );
666 let mut state = self.state.lock().unwrap();
667 state.apply_diff(
668 InternalDocDiff {
669 origin,
670 diff: (diff).into(),
671 by: EventTriggerKind::Import,
672 new_version: Cow::Owned(oplog.frontiers().clone()),
673 },
674 diff_mode,
675 );
676 }
677 result
678 } else {
679 f(&mut oplog)
680 }
681 }
682
683 fn emit_events(&self) {
684 let events = {
686 let mut state = self.state.lock().unwrap();
687 state.take_events()
688 };
689 for event in events {
690 self.observer.emit(event);
691 }
692 }
693
694 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
695 let mut state = self.state.lock().unwrap();
696 state.take_events()
697 }
698
699 #[tracing::instrument(skip_all)]
703 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
704 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
705 self.with_barrier(|| {
706 let result = self.update_oplog_and_apply_delta_to_state_if_needed(
707 |oplog| crate::encoding::json_schema::import_json(oplog, json),
708 Default::default(),
709 );
710 self.emit_events();
711 result
712 })
713 }
714
715 pub fn export_json_updates(
716 &self,
717 start_vv: &VersionVector,
718 end_vv: &VersionVector,
719 with_peer_compression: bool,
720 ) -> JsonSchema {
721 self.with_barrier(|| {
722 let oplog = self.oplog.lock().unwrap();
723 let mut start_vv = start_vv;
724 let _temp: Option<VersionVector>;
725 if !oplog.dag.shallow_since_vv().is_empty() {
726 let mut include_all = true;
728 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
729 if start_vv.get(peer).unwrap_or(&0) < counter {
730 include_all = false;
731 break;
732 }
733 }
734 if !include_all {
735 let mut vv = start_vv.clone();
736 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
737 vv.extend_to_include_end_id(ID::new(peer, counter));
738 }
739 _temp = Some(vv);
740 start_vv = _temp.as_ref().unwrap();
741 }
742 }
743
744 crate::encoding::json_schema::export_json(
745 &oplog,
746 start_vv,
747 end_vv,
748 with_peer_compression,
749 )
750 })
751 }
752
753 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
754 let oplog = self.oplog.lock().unwrap();
755 let mut changes = export_json_in_id_span(&oplog, id_span);
756 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
757 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
758 changes.push(change_json);
759 }
760 changes
761 }
762
763 #[inline]
765 pub fn oplog_vv(&self) -> VersionVector {
766 self.oplog.lock().unwrap().vv().clone()
767 }
768
769 #[inline]
771 pub fn state_vv(&self) -> VersionVector {
772 let oplog = self.oplog.lock().unwrap();
773 let f = &self.state.lock().unwrap().frontiers;
774 oplog.dag.frontiers_to_vv(f).unwrap()
775 }
776
777 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
778 let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
779 if let LoroValue::Container(c) = value {
780 Some(ValueOrHandler::Handler(Handler::new_attached(
781 c.clone(),
782 self.clone(),
783 )))
784 } else {
785 Some(ValueOrHandler::Value(value))
786 }
787 }
788
789 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
791 let path = str_to_path(path)?;
792 self.get_by_path(&path)
793 }
794
795 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
796 let arena = &self.arena;
797 let txn = self.txn.lock().unwrap();
798 let txn = txn.as_ref()?;
799 let ops_ = txn.local_ops();
800 let new_id = ID {
801 peer: *txn.peer(),
802 counter: ops_.first()?.counter,
803 };
804 let change = ChangeRef {
805 id: &new_id,
806 deps: txn.frontiers(),
807 timestamp: &txn
808 .timestamp()
809 .as_ref()
810 .copied()
811 .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
812 commit_msg: txn.msg(),
813 ops: ops_,
814 lamport: txn.lamport(),
815 };
816 let json = encode_change_to_json(change, arena);
817 Some(json)
818 }
819
820 #[inline]
821 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
822 if self.has_container(&id) {
823 Some(Handler::new_attached(id, self.clone()))
824 } else {
825 None
826 }
827 }
828
829 #[inline]
832 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
833 let id = id.into_container_id(&self.arena, ContainerType::Text);
834 assert!(self.has_container(&id));
835 Handler::new_attached(id, self.clone()).into_text().unwrap()
836 }
837
838 #[inline]
841 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
842 let id = id.into_container_id(&self.arena, ContainerType::List);
843 assert!(self.has_container(&id));
844 Handler::new_attached(id, self.clone()).into_list().unwrap()
845 }
846
847 #[inline]
850 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
851 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
852 assert!(self.has_container(&id));
853 Handler::new_attached(id, self.clone())
854 .into_movable_list()
855 .unwrap()
856 }
857
858 #[inline]
861 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
862 let id = id.into_container_id(&self.arena, ContainerType::Map);
863 assert!(self.has_container(&id));
864 Handler::new_attached(id, self.clone()).into_map().unwrap()
865 }
866
867 #[inline]
870 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
871 let id = id.into_container_id(&self.arena, ContainerType::Tree);
872 assert!(self.has_container(&id));
873 Handler::new_attached(id, self.clone()).into_tree().unwrap()
874 }
875
876 #[cfg(feature = "counter")]
877 pub fn get_counter<I: IntoContainerId>(
878 &self,
879 id: I,
880 ) -> crate::handler::counter::CounterHandler {
881 let id = id.into_container_id(&self.arena, ContainerType::Counter);
882 assert!(self.has_container(&id));
883 Handler::new_attached(id, self.clone())
884 .into_counter()
885 .unwrap()
886 }
887
888 #[must_use]
889 pub fn has_container(&self, id: &ContainerID) -> bool {
890 if id.is_root() {
891 return true;
892 }
893
894 let exist = self.state.lock().unwrap().does_container_exist(id);
895 exist
896 }
897
898 #[instrument(level = "info", skip_all)]
912 pub fn undo_internal(
913 &self,
914 id_span: IdSpan,
915 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
916 post_transform_base: Option<&DiffBatch>,
917 before_diff: &mut dyn FnMut(&DiffBatch),
918 ) -> LoroResult<CommitWhenDrop<'_>> {
919 if !self.can_edit() {
920 return Err(LoroError::EditWhenDetached);
921 }
922
923 let (options, txn) = self.implicit_commit_then_stop();
924 if !self
925 .oplog()
926 .lock()
927 .unwrap()
928 .vv()
929 .includes_id(id_span.id_last())
930 {
931 self.renew_txn_if_auto_commit(options);
932 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
933 }
934
935 let (was_recording, latest_frontiers) = {
936 let mut state = self.state.lock().unwrap();
937 let was_recording = state.is_recording();
938 state.stop_and_clear_recording();
939 (was_recording, state.frontiers.clone())
940 };
941
942 let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
943 let diff = crate::undo::undo(
944 spans,
945 match post_transform_base {
946 Some(d) => Either::Right(d),
947 None => Either::Left(&latest_frontiers),
948 },
949 |from, to| {
950 self._checkout_without_emitting(from, false, false).unwrap();
951 self.state.lock().unwrap().start_recording();
952 self._checkout_without_emitting(to, false, false).unwrap();
953 let mut state = self.state.lock().unwrap();
954 let e = state.take_events();
955 state.stop_and_clear_recording();
956 DiffBatch::new(e)
957 },
958 before_diff,
959 );
960
961 self._checkout_without_emitting(&latest_frontiers, false, false)?;
965 self.set_detached(false);
966 if was_recording {
967 self.state.lock().unwrap().start_recording();
968 }
969 drop(txn);
970 self.start_auto_commit();
971 if let Err(e) = self._apply_diff(diff, container_remap, true) {
975 warn!("Undo Failed {:?}", e);
976 }
977
978 if let Some(options) = options {
979 self.set_next_commit_options(options);
980 }
981 Ok(CommitWhenDrop {
982 doc: self,
983 default_options: CommitOptions::new().origin("undo"),
984 })
985 }
986
987 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
993 let f = self.state_frontiers();
996 let diff = self.diff(&f, target)?;
997 self._apply_diff(diff, &mut Default::default(), false)
998 }
999
1000 pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1005 {
1006 let oplog = self.oplog.lock().unwrap();
1008 for id in a.iter() {
1009 if !oplog.dag.contains(id) {
1010 return Err(LoroError::FrontiersNotFound(id));
1011 }
1012 }
1013 for id in b.iter() {
1014 if !oplog.dag.contains(id) {
1015 return Err(LoroError::FrontiersNotFound(id));
1016 }
1017 }
1018 }
1019
1020 let (options, txn) = self.implicit_commit_then_stop();
1021 let was_detached = self.is_detached();
1022 let old_frontiers = self.state_frontiers();
1023 let was_recording = {
1024 let mut state = self.state.lock().unwrap();
1025 let is_recording = state.is_recording();
1026 state.stop_and_clear_recording();
1027 is_recording
1028 };
1029 self._checkout_without_emitting(a, true, false).unwrap();
1030 self.state.lock().unwrap().start_recording();
1031 self._checkout_without_emitting(b, true, false).unwrap();
1032 let e = {
1033 let mut state = self.state.lock().unwrap();
1034 let e = state.take_events();
1035 state.stop_and_clear_recording();
1036 e
1037 };
1038 self._checkout_without_emitting(&old_frontiers, false, false)
1039 .unwrap();
1040 drop(txn);
1041 if !was_detached {
1042 self.set_detached(false);
1043 self.renew_txn_if_auto_commit(options);
1044 }
1045 if was_recording {
1046 self.state.lock().unwrap().start_recording();
1047 }
1048 Ok(DiffBatch::new(e))
1049 }
1050
1051 #[inline(always)]
1053 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1054 self._apply_diff(diff, &mut Default::default(), true)
1055 }
1056
1057 pub(crate) fn _apply_diff(
1069 &self,
1070 diff: DiffBatch,
1071 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1072 skip_unreachable: bool,
1073 ) -> LoroResult<()> {
1074 if !self.can_edit() {
1075 return Err(LoroError::EditWhenDetached);
1076 }
1077
1078 let mut ans: LoroResult<()> = Ok(());
1079 let mut missing_containers: Vec<ContainerID> = Vec::new();
1080 for (mut id, diff) in diff.into_iter() {
1081 let mut remapped = false;
1082 while let Some(rid) = container_remap.get(&id) {
1083 remapped = true;
1084 id = rid.clone();
1085 }
1086
1087 if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1088 let exists = self.state.lock().unwrap().does_container_exist(&id);
1090 if !exists {
1091 missing_containers.push(id);
1092 continue;
1093 }
1094 self.state.lock().unwrap().ensure_container(&id);
1096 }
1097
1098 if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
1099 continue;
1100 }
1101
1102 let Some(h) = self.get_handler(id.clone()) else {
1103 return Err(LoroError::ContainersNotFound {
1104 containers: Box::new(vec![id]),
1105 });
1106 };
1107 if let Err(e) = h.apply_diff(diff, container_remap) {
1108 ans = Err(e);
1109 }
1110 }
1111
1112 if !missing_containers.is_empty() {
1113 return Err(LoroError::ContainersNotFound {
1114 containers: Box::new(missing_containers),
1115 });
1116 }
1117
1118 ans
1119 }
1120
1121 #[inline]
1123 pub fn diagnose_size(&self) {
1124 self.oplog().lock().unwrap().diagnose_size();
1125 }
1126
1127 #[inline]
1128 pub fn oplog_frontiers(&self) -> Frontiers {
1129 self.oplog().lock().unwrap().frontiers().clone()
1130 }
1131
1132 #[inline]
1133 pub fn state_frontiers(&self) -> Frontiers {
1134 self.state.lock().unwrap().frontiers.clone()
1135 }
1136
1137 #[inline]
1141 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1142 self.oplog().lock().unwrap().cmp_with_frontiers(other)
1143 }
1144
1145 #[inline]
1149 pub fn cmp_frontiers(
1150 &self,
1151 a: &Frontiers,
1152 b: &Frontiers,
1153 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1154 self.oplog().lock().unwrap().cmp_frontiers(a, b)
1155 }
1156
1157 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1158 let mut state = self.state.lock().unwrap();
1159 if !state.is_recording() {
1160 state.start_recording();
1161 }
1162
1163 self.observer.subscribe_root(callback)
1164 }
1165
1166 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1167 let mut state = self.state.lock().unwrap();
1168 if !state.is_recording() {
1169 state.start_recording();
1170 }
1171
1172 self.observer.subscribe(container_id, callback)
1173 }
1174
1175 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1176 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1177 activate();
1178 sub
1179 }
1180
1181 #[tracing::instrument(skip_all)]
1183 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1184 if bytes.is_empty() {
1185 return Ok(ImportStatus::default());
1186 }
1187
1188 if bytes.len() == 1 {
1189 return self.import(&bytes[0]);
1190 }
1191
1192 let mut success = VersionRange::default();
1193 let mut pending = VersionRange::default();
1194 let mut meta_arr = bytes
1195 .iter()
1196 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1197 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1198 meta_arr.sort_by(|a, b| {
1199 a.0.mode
1200 .cmp(&b.0.mode)
1201 .then(b.0.change_num.cmp(&a.0.change_num))
1202 });
1203
1204 let (options, txn) = self.implicit_commit_then_stop();
1205 let is_detached = self.is_detached();
1229 self.set_detached(true);
1230 self.oplog.lock().unwrap().batch_importing = true;
1231 let mut err = None;
1232 for (_meta, data) in meta_arr {
1233 match self._import_with(data, Default::default()) {
1234 Ok(s) => {
1235 for (peer, (start, end)) in s.success.iter() {
1236 match success.0.entry(*peer) {
1237 Entry::Occupied(mut e) => {
1238 e.get_mut().1 = *end.max(&e.get().1);
1239 }
1240 Entry::Vacant(e) => {
1241 e.insert((*start, *end));
1242 }
1243 }
1244 }
1245
1246 if let Some(p) = s.pending.as_ref() {
1247 for (&peer, &(start, end)) in p.iter() {
1248 match pending.0.entry(peer) {
1249 Entry::Occupied(mut e) => {
1250 e.get_mut().0 = start.min(e.get().0);
1251 e.get_mut().1 = end.min(e.get().1);
1252 }
1253 Entry::Vacant(e) => {
1254 e.insert((start, end));
1255 }
1256 }
1257 }
1258 }
1259 }
1260 Err(e) => {
1261 err = Some(e);
1262 }
1263 }
1264 }
1265
1266 let mut oplog = self.oplog.lock().unwrap();
1267 oplog.batch_importing = false;
1268 drop(oplog);
1269 if !is_detached {
1270 self._checkout_to_latest_with_guard(txn);
1271 } else {
1272 drop(txn);
1273 }
1274
1275 self.renew_txn_if_auto_commit(options);
1276 if let Some(err) = err {
1277 return Err(err);
1278 }
1279
1280 Ok(ImportStatus {
1281 success,
1282 pending: if pending.is_empty() {
1283 None
1284 } else {
1285 Some(pending)
1286 },
1287 })
1288 }
1289
1290 #[inline]
1292 pub fn get_value(&self) -> LoroValue {
1293 self.state.lock().unwrap().get_value()
1294 }
1295
1296 #[inline]
1298 pub fn get_deep_value(&self) -> LoroValue {
1299 self.state.lock().unwrap().get_deep_value()
1300 }
1301
1302 #[inline]
1304 pub fn get_deep_value_with_id(&self) -> LoroValue {
1305 self.state.lock().unwrap().get_deep_value_with_id()
1306 }
1307
1308 pub fn checkout_to_latest(&self) {
1309 let (options, _guard) = self.implicit_commit_then_stop();
1310 if !self.is_detached() {
1311 drop(_guard);
1312 self.renew_txn_if_auto_commit(options);
1313 return;
1314 }
1315
1316 self._checkout_to_latest_without_commit(true);
1317 drop(_guard);
1318 self.renew_txn_if_auto_commit(options);
1319 }
1320
1321 fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1322 if !self.is_detached() {
1323 self._renew_txn_if_auto_commit_with_guard(None, guard);
1324 return;
1325 }
1326
1327 self._checkout_to_latest_without_commit(true);
1328 self._renew_txn_if_auto_commit_with_guard(None, guard);
1329 }
1330
1331 pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1333 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1334 let f = self.oplog_frontiers();
1335 let this = &self;
1336 let frontiers = &f;
1337 this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1338 .unwrap(); this.emit_events();
1341 if this.config.detached_editing() {
1342 this.renew_peer_id();
1343 }
1344
1345 self.set_detached(false);
1346 });
1347 }
1348
1349 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1354 let (options, guard) = self.implicit_commit_then_stop();
1355 self._checkout_without_emitting(frontiers, true, true)?;
1356 self.emit_events();
1357 drop(guard);
1358 if self.config.detached_editing() {
1359 self.renew_peer_id();
1360 self.renew_txn_if_auto_commit(options);
1361 } else if !self.is_detached() {
1362 self.renew_txn_if_auto_commit(options);
1363 }
1364
1365 Ok(())
1366 }
1367
1368 #[instrument(level = "info", skip(self))]
1370 pub(crate) fn _checkout_without_emitting(
1371 &self,
1372 frontiers: &Frontiers,
1373 to_shrink_frontiers: bool,
1374 to_commit_then_renew: bool,
1375 ) -> Result<(), LoroError> {
1376 assert!(self.txn.is_locked());
1377 let from_frontiers = self.state_frontiers();
1378 loro_common::info!(
1379 "checkout from={:?} to={:?} cur_vv={:?}",
1380 from_frontiers,
1381 frontiers,
1382 self.oplog_vv()
1383 );
1384
1385 if &from_frontiers == frontiers {
1386 return Ok(());
1387 }
1388
1389 let oplog = self.oplog.lock().unwrap();
1390 if oplog.dag.is_before_shallow_root(frontiers) {
1391 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1392 }
1393
1394 let frontiers = if to_shrink_frontiers {
1395 shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1396 } else {
1397 frontiers.clone()
1398 };
1399
1400 if from_frontiers == frontiers {
1401 return Ok(());
1402 }
1403
1404 let mut state = self.state.lock().unwrap();
1405 let mut calc = self.diff_calculator.lock().unwrap();
1406 for i in frontiers.iter() {
1407 if !oplog.dag.contains(i) {
1408 return Err(LoroError::FrontiersNotFound(i));
1409 }
1410 }
1411
1412 let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1413 let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1414 return Err(LoroError::NotFoundError(
1415 format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1416 ));
1417 };
1418
1419 self.set_detached(true);
1420 let (diff, diff_mode) =
1421 calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1422 state.apply_diff(
1423 InternalDocDiff {
1424 origin: "checkout".into(),
1425 diff: Cow::Owned(diff),
1426 by: EventTriggerKind::Checkout,
1427 new_version: Cow::Owned(frontiers.clone()),
1428 },
1429 diff_mode,
1430 );
1431
1432 Ok(())
1433 }
1434
1435 #[inline]
1436 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1437 self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
1438 }
1439
1440 #[inline]
1441 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1442 self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
1443 }
1444
1445 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1449 let updates = other
1450 .export(ExportMode::updates(&self.oplog_vv()))
1451 .unwrap();
1452 self.import(&updates)
1453 }
1454
1455 pub(crate) fn arena(&self) -> &SharedArena {
1456 &self.arena
1457 }
1458
1459 #[inline]
1460 pub fn len_ops(&self) -> usize {
1461 let oplog = self.oplog.lock().unwrap();
1462 let ans = oplog.vv().values().sum::<i32>() as usize;
1463 if oplog.is_shallow() {
1464 let sub = oplog
1465 .shallow_since_vv()
1466 .iter()
1467 .map(|(_, ops)| *ops)
1468 .sum::<i32>() as usize;
1469 ans - sub
1470 } else {
1471 ans
1472 }
1473 }
1474
1475 #[inline]
1476 pub fn len_changes(&self) -> usize {
1477 let oplog = self.oplog.lock().unwrap();
1478 oplog.len_changes()
1479 }
1480
1481 pub fn config(&self) -> &Configure {
1482 &self.config
1483 }
1484
1485 pub fn check_state_diff_calc_consistency_slow(&self) {
1490 {
1492 static IS_CHECKING: std::sync::atomic::AtomicBool =
1493 std::sync::atomic::AtomicBool::new(false);
1494 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1495 return;
1496 }
1497
1498 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1499 let peer_id = self.peer_id();
1500 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1501 let _g = s.enter();
1502 let options = self.implicit_commit_then_stop().0;
1503 self.oplog.lock().unwrap().check_dag_correctness();
1504 if self.is_shallow() {
1505 let initial_snapshot = self
1516 .export(ExportMode::state_only(Some(
1517 &self.shallow_since_frontiers(),
1518 )))
1519 .unwrap();
1520
1521 let doc = LoroDoc::new();
1523 doc.import(&initial_snapshot).unwrap();
1524 self.checkout(&self.shallow_since_frontiers()).unwrap();
1525 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1526
1527 let updates = self.export(ExportMode::all_updates()).unwrap();
1529
1530 doc.import(&updates).unwrap();
1532 self.checkout_to_latest();
1533
1534 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1537 let mut calculated_state = doc.app_state().lock().unwrap();
1538 let mut current_state = self.app_state().lock().unwrap();
1539 current_state.check_is_the_same(&mut calculated_state);
1540 } else {
1541 let f = self.state_frontiers();
1542 let vv = self
1543 .oplog()
1544 .lock()
1545 .unwrap()
1546 .dag
1547 .frontiers_to_vv(&f)
1548 .unwrap();
1549 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1550 let doc = Self::new();
1551 doc.import(&bytes).unwrap();
1552 let mut calculated_state = doc.app_state().lock().unwrap();
1553 let mut current_state = self.app_state().lock().unwrap();
1554 current_state.check_is_the_same(&mut calculated_state);
1555 }
1556
1557 self.renew_txn_if_auto_commit(options);
1558 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1559 }
1560 }
1561
1562 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1563 self.query_pos_internal(pos, true)
1564 }
1565
1566 pub(crate) fn query_pos_internal(
1568 &self,
1569 pos: &Cursor,
1570 ret_event_index: bool,
1571 ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1572 if !self.has_container(&pos.container) {
1573 return Err(CannotFindRelativePosition::IdNotFound);
1574 }
1575
1576 let mut state = self.state.lock().unwrap();
1577 if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1578 Ok(PosQueryResult {
1579 update: None,
1580 current: AbsolutePosition {
1581 pos: ans,
1582 side: pos.side,
1583 },
1584 })
1585 } else {
1586 drop(state);
1598 let result = self.with_barrier(|| {
1599 let oplog = self.oplog().lock().unwrap();
1600 if let Some(id) = pos.id {
1602 if oplog.arena.id_to_idx(&pos.container).is_none() {
1604 let mut s = self.state.lock().unwrap();
1605 if !s.does_container_exist(&pos.container) {
1606 return Err(CannotFindRelativePosition::ContainerDeleted);
1607 }
1608 s.ensure_container(&pos.container);
1609 drop(s);
1610 }
1611 let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1612 let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1614 if oplog.shallow_since_vv().includes_id(id) {
1615 return Err(CannotFindRelativePosition::HistoryCleared);
1616 }
1617
1618 tracing::error!("Cannot find id {}", id);
1619 return Err(CannotFindRelativePosition::IdNotFound);
1620 };
1621 let mut diff_calc = DiffCalculator::new(true);
1623 let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1624 let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1625 diff_calc.calc_diff_internal(
1627 &oplog,
1628 before,
1629 &before_frontiers,
1630 oplog.vv(),
1631 oplog.frontiers(),
1632 Some(&|target| idx == target),
1633 );
1634 let depth = self.arena.get_depth(idx);
1636 let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1637 match diff_calc {
1638 crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1639 let c = text.get_id_latest_pos(id).unwrap();
1640 let new_pos = c.pos;
1641 let handler = self.get_text(&pos.container);
1642 let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1643 Ok(PosQueryResult {
1644 update: handler.get_cursor(current_pos, c.side),
1645 current: AbsolutePosition {
1646 pos: current_pos,
1647 side: c.side,
1648 },
1649 })
1650 }
1651 crate::diff_calc::ContainerDiffCalculator::List(list) => {
1652 let c = list.get_id_latest_pos(id).unwrap();
1653 let new_pos = c.pos;
1654 let handler = self.get_list(&pos.container);
1655 Ok(PosQueryResult {
1656 update: handler.get_cursor(new_pos, c.side),
1657 current: AbsolutePosition {
1658 pos: new_pos,
1659 side: c.side,
1660 },
1661 })
1662 }
1663 crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1664 let c = list.get_id_latest_pos(id).unwrap();
1665 let new_pos = c.pos;
1666 let handler = self.get_movable_list(&pos.container);
1667 let new_pos = handler.op_pos_to_user_pos(new_pos);
1668 Ok(PosQueryResult {
1669 update: handler.get_cursor(new_pos, c.side),
1670 current: AbsolutePosition {
1671 pos: new_pos,
1672 side: c.side,
1673 },
1674 })
1675 }
1676 crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1677 crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1678 #[cfg(feature = "counter")]
1679 crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1680 crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1681 }
1682 } else {
1683 match pos.container.container_type() {
1684 ContainerType::Text => {
1685 let text = self.get_text(&pos.container);
1686 Ok(PosQueryResult {
1687 update: Some(Cursor {
1688 id: None,
1689 container: text.id(),
1690 side: pos.side,
1691 origin_pos: text.len_unicode(),
1692 }),
1693 current: AbsolutePosition {
1694 pos: text.len_event(),
1695 side: pos.side,
1696 },
1697 })
1698 }
1699 ContainerType::List => {
1700 let list = self.get_list(&pos.container);
1701 Ok(PosQueryResult {
1702 update: Some(Cursor {
1703 id: None,
1704 container: list.id(),
1705 side: pos.side,
1706 origin_pos: list.len(),
1707 }),
1708 current: AbsolutePosition {
1709 pos: list.len(),
1710 side: pos.side,
1711 },
1712 })
1713 }
1714 ContainerType::MovableList => {
1715 let list = self.get_movable_list(&pos.container);
1716 Ok(PosQueryResult {
1717 update: Some(Cursor {
1718 id: None,
1719 container: list.id(),
1720 side: pos.side,
1721 origin_pos: list.len(),
1722 }),
1723 current: AbsolutePosition {
1724 pos: list.len(),
1725 side: pos.side,
1726 },
1727 })
1728 }
1729 ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1730 unreachable!()
1731 }
1732 #[cfg(feature = "counter")]
1733 ContainerType::Counter => unreachable!(),
1734 }
1735 }
1736 });
1737 result
1738 }
1739 }
1740
1741 pub fn free_history_cache(&self) {
1746 self.oplog.lock().unwrap().free_history_cache();
1747 }
1748
1749 pub fn free_diff_calculator(&self) {
1751 *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
1752 }
1753
1754 pub fn has_history_cache(&self) -> bool {
1757 self.oplog.lock().unwrap().has_history_cache()
1758 }
1759
1760 #[inline]
1764 pub fn compact_change_store(&self) {
1765 self.with_barrier(|| {
1766 self.oplog.lock().unwrap().compact_change_store();
1767 });
1768 }
1769
1770 #[inline]
1774 pub fn analyze(&self) -> DocAnalysis {
1775 DocAnalysis::analyze(self)
1776 }
1777
1778 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1780 let mut state = self.state.lock().unwrap();
1781 if state.arena.id_to_idx(id).is_none() {
1782 if !state.does_container_exist(id) {
1783 return None;
1784 }
1785 state.ensure_container(id);
1786 }
1787 let idx = state.arena.id_to_idx(id).unwrap();
1788 state.get_path(idx)
1789 }
1790
1791 #[instrument(skip(self))]
1792 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1793 self.with_barrier(|| {
1794 let ans = match mode {
1795 ExportMode::Snapshot => export_fast_snapshot(self),
1796 ExportMode::Updates { from } => export_fast_updates(self, &from),
1797 ExportMode::UpdatesInRange { spans } => {
1798 export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1799 }
1800 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1801 ExportMode::StateOnly(f) => match f {
1802 Some(f) => export_state_only_snapshot(self, &f)?,
1803 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1804 },
1805 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1806 };
1807 Ok(ans)
1808 })
1809 }
1810
1811 pub fn shallow_since_vv(&self) -> ImVersionVector {
1817 self.oplog().lock().unwrap().shallow_since_vv().clone()
1818 }
1819
1820 pub fn shallow_since_frontiers(&self) -> Frontiers {
1821 self.oplog()
1822 .lock()
1823 .unwrap()
1824 .shallow_since_frontiers()
1825 .clone()
1826 }
1827
1828 pub fn is_shallow(&self) -> bool {
1830 !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
1831 }
1832
1833 pub fn get_pending_txn_len(&self) -> usize {
1838 if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1839 txn.len()
1840 } else {
1841 0
1842 }
1843 }
1844
1845 #[inline]
1846 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1847 self.oplog().lock().unwrap().dag.find_path(from, to)
1848 }
1849
1850 pub fn subscribe_first_commit_from_peer(
1856 &self,
1857 callback: FirstCommitFromPeerCallback,
1858 ) -> Subscription {
1859 let (s, enable) = self
1860 .first_commit_from_peer_subs
1861 .inner()
1862 .insert((), callback);
1863 enable();
1864 s
1865 }
1866
1867 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1872 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1873 enable();
1874 s
1875 }
1876}
1877
1878#[derive(Debug, thiserror::Error)]
1879pub enum ChangeTravelError {
1880 #[error("Target id not found {0:?}")]
1881 TargetIdNotFound(ID),
1882 #[error("The shallow history of the doc doesn't include the target version")]
1883 TargetVersionNotIncluded,
1884}
1885
1886impl LoroDoc {
1887 pub fn travel_change_ancestors(
1888 &self,
1889 ids: &[ID],
1890 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1891 ) -> Result<(), ChangeTravelError> {
1892 let (options, guard) = self.implicit_commit_then_stop();
1893 drop(guard);
1894 struct PendingNode(ChangeMeta);
1895 impl PartialEq for PendingNode {
1896 fn eq(&self, other: &Self) -> bool {
1897 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1898 }
1899 }
1900
1901 impl Eq for PendingNode {}
1902 impl PartialOrd for PendingNode {
1903 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1904 Some(self.cmp(other))
1905 }
1906 }
1907
1908 impl Ord for PendingNode {
1909 fn cmp(&self, other: &Self) -> Ordering {
1910 self.0
1911 .lamport_last()
1912 .cmp(&other.0.lamport_last())
1913 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1914 }
1915 }
1916
1917 for id in ids {
1918 let op_log = &self.oplog().lock().unwrap();
1919 if !op_log.vv().includes_id(*id) {
1920 return Err(ChangeTravelError::TargetIdNotFound(*id));
1921 }
1922 if op_log.dag.shallow_since_vv().includes_id(*id) {
1923 return Err(ChangeTravelError::TargetVersionNotIncluded);
1924 }
1925 }
1926
1927 let mut visited = FxHashSet::default();
1928 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1929 for id in ids {
1930 pending.push(PendingNode(ChangeMeta::from_change(
1931 &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1932 )));
1933 }
1934 while let Some(PendingNode(node)) = pending.pop() {
1935 let deps = node.deps.clone();
1936 if f(node).is_break() {
1937 break;
1938 }
1939
1940 for dep in deps.iter() {
1941 let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1942 continue;
1943 };
1944 if visited.contains(&dep_node.id) {
1945 continue;
1946 }
1947
1948 visited.insert(dep_node.id);
1949 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1950 }
1951 }
1952
1953 let ans = Ok(());
1954 self.renew_txn_if_auto_commit(options);
1955 ans
1956 }
1957
1958 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1959 self.with_barrier(|| {
1960 let mut set = FxHashSet::default();
1961 {
1962 let oplog = self.oplog().lock().unwrap();
1963 for op in oplog.iter_ops(id.to_span(len)) {
1964 let id = oplog.arena.get_container_id(op.container()).unwrap();
1965 set.insert(id);
1966 }
1967 }
1968 set
1969 })
1970 }
1971
1972 pub fn delete_root_container(&self, cid: ContainerID) {
1973 if !cid.is_root() {
1974 return;
1975 }
1976
1977 if !self.has_container(&cid) {
1979 return;
1980 }
1981
1982 let Some(h) = self.get_handler(cid.clone()) else {
1983 return;
1984 };
1985
1986 if let Err(e) = h.clear() {
1987 eprintln!("Failed to clear handler: {:?}", e);
1988 return;
1989 }
1990 self.config
1991 .deleted_root_containers
1992 .lock()
1993 .unwrap()
1994 .insert(cid);
1995 }
1996
1997 pub fn set_hide_empty_root_containers(&self, hide: bool) {
1998 self.config
1999 .hide_empty_root_containers
2000 .store(hide, std::sync::atomic::Ordering::Relaxed);
2001 }
2002}
2003
2004fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2006 let start_vv = oplog
2007 .dag
2008 .frontiers_to_vv(&id.into())
2009 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2010 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
2011 for op in change.ops.iter().rev() {
2012 if op.container != idx {
2013 continue;
2014 }
2015 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2016 if d.id_start.to_span(d.atom_len()).contains(id) {
2017 return Some(ID::new(change.peer(), op.counter));
2018 }
2019 }
2020 }
2021 }
2022
2023 None
2024}
2025
2026#[derive(Debug)]
2027pub struct CommitWhenDrop<'a> {
2028 doc: &'a LoroDoc,
2029 default_options: CommitOptions,
2030}
2031
2032impl Drop for CommitWhenDrop<'_> {
2033 fn drop(&mut self) {
2034 {
2035 let mut guard = self.doc.txn.lock().unwrap();
2036 if let Some(txn) = guard.as_mut() {
2037 txn.set_default_options(std::mem::take(&mut self.default_options));
2038 };
2039 }
2040
2041 self.doc.commit_then_renew();
2042 }
2043}
2044
2045#[derive(Debug, Clone)]
2047pub struct CommitOptions {
2048 pub origin: Option<InternalString>,
2051
2052 pub immediate_renew: bool,
2055
2056 pub timestamp: Option<Timestamp>,
2059
2060 pub commit_msg: Option<Arc<str>>,
2062}
2063
2064impl CommitOptions {
2065 pub fn new() -> Self {
2067 Self {
2068 origin: None,
2069 immediate_renew: true,
2070 timestamp: None,
2071 commit_msg: None,
2072 }
2073 }
2074
2075 pub fn origin(mut self, origin: &str) -> Self {
2077 self.origin = Some(origin.into());
2078 self
2079 }
2080
2081 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2083 self.immediate_renew = immediate_renew;
2084 self
2085 }
2086
2087 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2091 self.timestamp = Some(timestamp);
2092 self
2093 }
2094
2095 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2097 self.commit_msg = Some(commit_msg.into());
2098 self
2099 }
2100
2101 pub fn set_origin(&mut self, origin: Option<&str>) {
2103 self.origin = origin.map(|x| x.into())
2104 }
2105
2106 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2108 self.timestamp = timestamp;
2109 }
2110}
2111
2112impl Default for CommitOptions {
2113 fn default() -> Self {
2114 Self::new()
2115 }
2116}
2117
2118#[cfg(test)]
2119mod test {
2120 use loro_common::ID;
2121
2122 use crate::{loro::ExportMode, version::Frontiers, LoroDoc, ToJson};
2123
2124 #[test]
2125 fn test_sync() {
2126 fn is_send_sync<T: Send + Sync>(_v: T) {}
2127 let loro = super::LoroDoc::new();
2128 is_send_sync(loro)
2129 }
2130
2131 #[test]
2132 fn test_checkout() {
2133 let loro = LoroDoc::new();
2134 loro.set_peer_id(1).unwrap();
2135 let text = loro.get_text("text");
2136 let map = loro.get_map("map");
2137 let list = loro.get_list("list");
2138 let mut txn = loro.txn().unwrap();
2139 for i in 0..10 {
2140 map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2141 text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
2142 list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2143 }
2144 txn.commit().unwrap();
2145 let b = LoroDoc::new();
2146 b.import(&loro.export(ExportMode::Snapshot).unwrap()).unwrap();
2147 loro.checkout(&Frontiers::default()).unwrap();
2148 {
2149 let json = &loro.get_deep_value();
2150 assert_eq!(
2151 json.to_json_value(),
2152 serde_json::json!({"text":"","list":[],"map":{}})
2153 );
2154 }
2155
2156 b.checkout(&ID::new(1, 2).into()).unwrap();
2157 {
2158 let json = &b.get_deep_value();
2159 assert_eq!(
2160 json.to_json_value(),
2161 serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
2162 );
2163 }
2164
2165 loro.checkout(&ID::new(1, 3).into()).unwrap();
2166 {
2167 let json = &loro.get_deep_value();
2168 assert_eq!(
2169 json.to_json_value(),
2170 serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
2171 );
2172 }
2173
2174 b.checkout(&ID::new(1, 29).into()).unwrap();
2175 {
2176 let json = &b.get_deep_value();
2177 assert_eq!(
2178 json.to_json_value(),
2179 serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
2180 );
2181 }
2182 }
2183
2184 #[test]
2185 fn import_batch_err_181() {
2186 let a = LoroDoc::new_auto_commit();
2187 let update_a = a.export(ExportMode::Snapshot);
2188 let b = LoroDoc::new_auto_commit();
2189 b.import_batch(&[update_a.unwrap()]).unwrap();
2190 b.get_text("text").insert(0, "hello").unwrap();
2191 b.commit_then_renew();
2192 let oplog = b.oplog().lock().unwrap();
2193 drop(oplog);
2194 b.export(ExportMode::all_updates()).unwrap();
2195 }
2196}