1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::Timestamp,
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use fxhash::{FxHashMap, FxHashSet};
47use loro_common::{
48 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
49 LoroValue, ID,
50};
51use rle::HasLength;
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 pub fn new() -> Self {
82 let oplog = OpLog::new();
83 let arena = oplog.arena.clone();
84 let config: Configure = oplog.configure.clone();
85 let lock_group = LoroLockGroup::new();
86 let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
87 let inner = Arc::new_cyclic(|w| {
88 let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
89 LoroDocInner {
90 oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
91 state,
92 config,
93 detached: AtomicBool::new(false),
94 auto_commit: AtomicBool::new(false),
95 observer: Arc::new(Observer::new(arena.clone())),
96 diff_calculator: Arc::new(
97 lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
98 ),
99 txn: global_txn,
100 arena,
101 local_update_subs: SubscriberSetWithQueue::new(),
102 peer_id_change_subs: SubscriberSetWithQueue::new(),
103 pre_commit_subs: SubscriberSetWithQueue::new(),
104 first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
105 }
106 });
107 LoroDoc { inner }
108 }
109
110 pub fn fork(&self) -> Self {
111 if self.is_detached() {
112 return self.fork_at(&self.state_frontiers());
113 }
114
115 let (options, txn) = self.commit_then_stop();
116 drop(txn);
117 let snapshot = encoding::fast_snapshot::encode_snapshot_inner(self);
118 let doc = Self::new();
119 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default()).unwrap();
120 doc.set_config(&self.config);
121 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
122 doc.start_auto_commit();
123 }
124 self.renew_txn_if_auto_commit(options);
125 doc
126 }
127 pub fn set_detached_editing(&self, enable: bool) {
143 self.config.set_detached_editing(enable);
144 if enable && self.is_detached() {
145 let (options, txn) = self.commit_then_stop();
146 drop(txn);
147 self.renew_peer_id();
148 self.renew_txn_if_auto_commit(options);
149 }
150 }
151
152 #[inline]
154 pub fn new_auto_commit() -> Self {
155 let doc = Self::new();
156 doc.start_auto_commit();
157 doc
158 }
159
160 #[inline(always)]
161 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
162 if peer == PeerID::MAX {
163 return Err(LoroError::InvalidPeerID);
164 }
165 let next_id = self.oplog.lock().unwrap().next_id(peer);
166 if self.auto_commit.load(Acquire) {
167 let doc_state = self.state.lock().unwrap();
168 doc_state
169 .peer
170 .store(peer, std::sync::atomic::Ordering::Relaxed);
171
172 if doc_state.is_in_txn() {
173 drop(doc_state);
174 self.commit_then_renew();
175 }
176 self.peer_id_change_subs.emit(&(), next_id);
177 return Ok(());
178 }
179
180 let doc_state = self.state.lock().unwrap();
181 if doc_state.is_in_txn() {
182 return Err(LoroError::TransactionError(
183 "Cannot change peer id during transaction"
184 .to_string()
185 .into_boxed_str(),
186 ));
187 }
188
189 doc_state
190 .peer
191 .store(peer, std::sync::atomic::Ordering::Relaxed);
192 drop(doc_state);
193 self.peer_id_change_subs.emit(&(), next_id);
194 Ok(())
195 }
196
197 pub(crate) fn renew_peer_id(&self) {
199 let peer_id = DefaultRandom.next_u64();
200 self.set_peer_id(peer_id).unwrap();
201 }
202
203 #[inline]
210 #[must_use]
211 pub fn commit_then_stop(&self) -> (Option<CommitOptions>, LoroMutexGuard<'_, Option<Transaction>>) {
212 let (a, b) = self.commit_with(CommitOptions::new().immediate_renew(false));
213 (a, b.unwrap())
214 }
215
216 #[inline]
221 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
222 self.commit_with(CommitOptions::new().immediate_renew(true))
223 .0
224 }
225
    /// Runs the hook that precedes a commit.
    ///
    /// Returns `Some(guard)` when there is no transaction to commit, handing
    /// the still-held transaction lock back to the caller. Returns `None` when
    /// a transaction exists; in that case the lock has been released.
    ///
    /// If the transaction is flagged as the peer's first appearance, the flag
    /// is cleared and `first_commit_from_peer_subs` is notified with the
    /// current peer id.
    fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
        let mut txn_guard = self.txn.lock().unwrap();
        let Some(txn) = txn_guard.as_mut() else {
            return Some(txn_guard);
        };

        if txn.is_peer_first_appearance {
            txn.is_peer_first_appearance = false;
            // Release the txn lock before running subscriber callbacks.
            drop(txn_guard);
            self.first_commit_from_peer_subs.emit(
                &(),
                FirstCommitFromPeerPayload {
                    peer: self.peer_id(),
                },
            );
        }

        None
    }
250
    /// Commits the pending auto-commit transaction using `config`.
    ///
    /// Returns a pair of:
    /// - the options returned by the committed transaction (`None` when
    ///   nothing was committed), and
    /// - the transaction lock guard, which is `Some` unless a commit happened
    ///   with `config.immediate_renew` set.
    ///
    /// When auto commit is disabled nothing is committed and only the guard
    /// is returned.
    ///
    /// # Panics
    /// Panics if committing the transaction fails, or if
    /// `config.immediate_renew` is set while the document cannot be edited.
    #[instrument(skip_all)]
    pub fn commit_with(
        &self,
        config: CommitOptions,
    ) -> (
        Option<CommitOptions>,
        Option<LoroMutexGuard<'_, Option<Transaction>>>,
    ) {
        if !self.auto_commit.load(Acquire) {
            // Without auto commit there is nothing to commit here; just hand
            // back the lock guard.
            let txn_guard = self.txn.lock().unwrap();
            return (None, Some(txn_guard));
        }

        loop {
            // `before_commit` hands back the guard when there is no transaction.
            if let Some(txn_guard) = self.before_commit() {
                return (None, Some(txn_guard));
            }

            // `before_commit` released the lock, so the transaction may have
            // disappeared in the meantime; re-check after re-locking.
            let mut txn_guard = self.txn.lock().unwrap();
            let txn = txn_guard.take();
            let Some(mut txn) = txn else {
                return (None, Some(txn_guard));
            };
            let on_commit = txn.take_on_commit();
            // Values present in `config` override what the transaction holds.
            if let Some(origin) = config.origin.clone() {
                txn.set_origin(origin);
            }

            if let Some(timestamp) = config.timestamp {
                txn.set_timestamp(timestamp);
            }

            if let Some(msg) = config.commit_msg.as_ref() {
                txn.set_msg(Some(msg.clone()));
            }

            let id_span = txn.id_span();
            let options = txn.commit().unwrap();
            if config.immediate_renew {
                assert!(self.can_edit());
                let mut t = self.txn().unwrap();
                // Carry the options returned by the commit into the new txn.
                if let Some(options) = options.as_ref() {
                    t.set_options(options.clone());
                }
                *txn_guard = Some(t);
            }

            if let Some(on_commit) = on_commit {
                // Run the callback without holding the txn lock.
                drop(txn_guard);
                on_commit(&self.state, &self.oplog, id_span);
                txn_guard = self.txn.lock().unwrap();
                if !config.immediate_renew && txn_guard.is_some() {
                    // A transaction appeared while the lock was released; loop
                    // so the slot is empty when no renewal was requested.
                    continue;
                }
            }

            return (
                options,
                if !config.immediate_renew {
                    Some(txn_guard)
                } else {
                    None
                },
            );
        }
    }
325
326 pub fn set_next_commit_message(&self, message: &str) {
328 let mut binding = self.txn.lock().unwrap();
329 let Some(txn) = binding.as_mut() else {
330 return;
331 };
332
333 if message.is_empty() {
334 txn.set_msg(None)
335 } else {
336 txn.set_msg(Some(message.into()))
337 }
338 }
339
340 pub fn set_next_commit_origin(&self, origin: &str) {
342 let mut txn = self.txn.lock().unwrap();
343 if let Some(txn) = txn.as_mut() {
344 txn.set_origin(origin.into());
345 }
346 }
347
348 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
350 let mut txn = self.txn.lock().unwrap();
351 if let Some(txn) = txn.as_mut() {
352 txn.set_timestamp(timestamp);
353 }
354 }
355
356 pub fn set_next_commit_options(&self, options: CommitOptions) {
358 let mut txn = self.txn.lock().unwrap();
359 if let Some(txn) = txn.as_mut() {
360 txn.set_options(options);
361 }
362 }
363
364 pub fn clear_next_commit_options(&self) {
366 let mut txn = self.txn.lock().unwrap();
367 if let Some(txn) = txn.as_mut() {
368 txn.set_options(CommitOptions::new());
369 }
370 }
371
372 #[inline]
383 pub fn set_record_timestamp(&self, record: bool) {
384 self.config.set_record_timestamp(record);
385 }
386
387 #[inline]
392 pub fn set_change_merge_interval(&self, interval: i64) {
393 self.config.set_merge_interval(interval);
394 }
395
396 pub fn can_edit(&self) -> bool {
397 !self.is_detached() || self.config.detached_editing()
398 }
399
400 pub fn is_detached_editing_enabled(&self) -> bool {
401 self.config.detached_editing()
402 }
403
404 #[inline]
405 pub fn config_text_style(&self, text_style: StyleConfigMap) {
406 self.config.text_style_config.try_write().unwrap().map = text_style.map;
407 }
408
409 #[inline]
410 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
411 self.config
412 .text_style_config
413 .try_write()
414 .unwrap()
415 .default_style = text_style;
416 }
417 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
418 let doc = Self::new();
419 let (options, _guard) = doc.commit_then_stop();
420 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
421 if mode.is_snapshot() {
422 decode_snapshot(&doc, mode, body, Default::default())?;
423 drop(_guard);
424 doc.renew_txn_if_auto_commit(options);
425 Ok(doc)
426 } else {
427 Err(LoroError::DecodeError(
428 "Invalid encode mode".to_string().into(),
429 ))
430 }
431 }
432
433 #[inline(always)]
435 pub fn can_reset_with_snapshot(&self) -> bool {
436 let oplog = self.oplog.lock().unwrap();
437 if oplog.batch_importing {
438 return false;
439 }
440
441 if self.is_detached() {
442 return false;
443 }
444
445 oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
446 }
447
448 #[inline(always)]
454 pub fn is_detached(&self) -> bool {
455 self.detached.load(Acquire)
456 }
457
458 pub(crate) fn set_detached(&self, detached: bool) {
459 self.detached.store(detached, Release);
460 }
461
462 #[inline(always)]
463 pub fn peer_id(&self) -> PeerID {
464 self.state
465 .lock()
466 .unwrap()
467 .peer
468 .load(std::sync::atomic::Ordering::Relaxed)
469 }
470
471 #[inline(always)]
472 pub fn detach(&self) {
473 let (options, txn) = self.commit_then_stop();
474 drop(txn);
475 self.set_detached(true);
476 self.renew_txn_if_auto_commit(options);
477 }
478
479 #[inline(always)]
480 pub fn attach(&self) {
481 self.checkout_to_latest()
482 }
483
484 pub fn state_timestamp(&self) -> Timestamp {
487 let f = &self.state.lock().unwrap().frontiers;
488 self.oplog.lock().unwrap().get_timestamp_of_version(f)
489 }
490
491 #[inline(always)]
492 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
493 &self.state
494 }
495
496 #[inline]
497 pub fn get_state_deep_value(&self) -> LoroValue {
498 self.state.lock().unwrap().get_deep_value()
499 }
500
501 #[inline(always)]
502 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
503 &self.oplog
504 }
505
506 pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
507 let (options, txn) = self.commit_then_stop();
508 let ans = self.oplog.lock().unwrap().export_from(vv);
509 drop(txn);
510 self.renew_txn_if_auto_commit(options);
511 ans
512 }
513
514 #[inline(always)]
515 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
516 let s = debug_span!("import", peer = self.peer_id());
517 let _e = s.enter();
518 self.import_with(bytes, Default::default())
519 }
520
521 #[inline]
522 pub fn import_with(
523 &self,
524 bytes: &[u8],
525 origin: InternalString,
526 ) -> Result<ImportStatus, LoroError> {
527 let (options, txn) = self.commit_then_stop();
528 assert!(txn.is_none());
529 let ans = self._import_with(bytes, origin);
530 drop(txn);
531 self.renew_txn_if_auto_commit(options);
532 ans
533 }
534
    /// Parses `bytes` and imports them according to their encode mode, then
    /// emits the events recorded by the state.
    ///
    /// Snapshot modes initialise the document directly when
    /// `can_reset_with_snapshot` allows it; in every other case the blob is
    /// decoded into the oplog and the resulting diff is applied to the state.
    ///
    /// # Errors
    /// Returns the parse/decode error, or `LoroError::ImportWhenInTxn` when an
    /// `OutdatedRle` blob is imported while the state is in a transaction.
    ///
    /// # Panics
    /// Panics (`unreachable!`) if the parsed mode is `EncodeMode::Auto`.
    #[tracing::instrument(skip_all)]
    fn _import_with(
        &self,
        bytes: &[u8],
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        ensure_cov::notify_cov("loro_internal::import");
        let parsed = parse_header_and_body(bytes, true)?;
        loro_common::info!("Importing with mode={:?}", &parsed.mode);
        let result = match parsed.mode {
            EncodeMode::OutdatedRle => {
                if self.state.lock().unwrap().is_in_txn() {
                    return Err(LoroError::ImportWhenInTxn);
                }

                let s = tracing::span!(
                    tracing::Level::INFO,
                    "Import updates ",
                    peer = self.peer_id()
                );
                let _e = s.enter();
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
            EncodeMode::OutdatedSnapshot => {
                if self.can_reset_with_snapshot() {
                    loro_common::info!("Init by snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body, origin)
                } else {
                    // The document cannot be reset; import the blob as updates.
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )
                }
            }
            EncodeMode::FastSnapshot => {
                if self.can_reset_with_snapshot() {
                    ensure_cov::notify_cov("loro_internal::import::snapshot");
                    loro_common::info!("Init by fast snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body, origin)
                } else {
                    // The document cannot be reset; import the blob as updates.
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )

                }
            }
            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
                |oplog| oplog.decode(parsed),
                origin,
            ),
            EncodeMode::Auto => {
                unreachable!()
            }
        };

        // Events are emitted even when the import returned an error.
        self.emit_events();

        result
    }
602
    /// Runs `f` on the locked oplog and, when the document is attached and the
    /// oplog's version vector changed, applies the resulting diff to the state.
    ///
    /// The diff is computed from the oplog version before `f` ran to the
    /// version after it, and is applied as an `EventTriggerKind::Import` diff
    /// carrying `origin`. When the document is detached only `f` is run.
    ///
    /// Returns whatever `f` returned. The diff is applied whenever the version
    /// changed, even if `f` returned an error.
    #[tracing::instrument(skip_all)]
    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
        &self,
        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        let mut oplog = self.oplog.lock().unwrap();
        if !self.is_detached() {
            // Remember the version before the import so the diff can be computed.
            let old_vv = oplog.vv().clone();
            let old_frontiers = oplog.frontiers().clone();
            let result = f(&mut oplog);
            if &old_vv != oplog.vv() {
                let mut diff = DiffCalculator::new(false);
                let (diff, diff_mode) = diff.calc_diff_internal(
                    &oplog,
                    &old_vv,
                    &old_frontiers,
                    oplog.vv(),
                    oplog.dag.get_frontiers(),
                    None,
                );
                // The state lock is taken while the oplog lock is still held.
                let mut state = self.state.lock().unwrap();
                state.apply_diff(
                    InternalDocDiff {
                        origin,
                        diff: (diff).into(),
                        by: EventTriggerKind::Import,
                        new_version: Cow::Owned(oplog.frontiers().clone()),
                    },
                    diff_mode,
                );
            }
            result
        } else {
            f(&mut oplog)
        }
    }
640
641 fn emit_events(&self) {
642 let events = {
644 let mut state = self.state.lock().unwrap();
645 state.take_events()
646 };
647 for event in events {
648 self.observer.emit(event);
649 }
650 }
651
652 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
653 let mut state = self.state.lock().unwrap();
654 state.take_events()
655 }
656
657 #[instrument(skip_all)]
658 pub fn export_snapshot(&self) -> Result<Vec<u8>, LoroEncodeError> {
659 if self.is_shallow() {
660 return Err(LoroEncodeError::ShallowSnapshotIncompatibleWithOldFormat);
661 }
662 let (options, txn) = self.commit_then_stop();
663 drop(txn);
664 let ans = export_snapshot(self);
665 self.renew_txn_if_auto_commit(options);
666 Ok(ans)
667 }
668
669 #[tracing::instrument(skip_all)]
673 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
674 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
675 let (options, txn) = self.commit_then_stop();
676 let result = self.update_oplog_and_apply_delta_to_state_if_needed(
677 |oplog| crate::encoding::json_schema::import_json(oplog, json),
678 Default::default(),
679 );
680 self.emit_events();
681 drop(txn);
682 self.renew_txn_if_auto_commit(options);
683 result
684 }
685
    /// Exports the changes between `start_vv` and `end_vv` in the JSON schema
    /// form.
    ///
    /// If the oplog has a non-empty `shallow_since_vv` and `start_vv` is
    /// behind it for any peer, the start version is extended to include it
    /// before exporting.
    pub fn export_json_updates(
        &self,
        start_vv: &VersionVector,
        end_vv: &VersionVector,
        with_peer_compression: bool,
    ) -> JsonSchema {
        let (options, txn) = self.commit_then_stop();
        drop(txn);
        let oplog = self.oplog.lock().unwrap();
        let mut start_vv = start_vv;
        // Owns the adjusted version vector, if one is needed, so `start_vv`
        // can borrow from it for the rest of the function.
        let _temp: Option<VersionVector>;
        if !oplog.dag.shallow_since_vv().is_empty() {
            // Check whether `start_vv` already covers `shallow_since_vv`.
            let mut include_all = true;
            for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
                if start_vv.get(peer).unwrap_or(&0) < counter {
                    include_all = false;
                    break;
                }
            }
            if !include_all {
                let mut vv = start_vv.clone();
                for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
                    vv.extend_to_include_end_id(ID::new(peer, counter));
                }
                _temp = Some(vv);
                start_vv = _temp.as_ref().unwrap();
            }
        }

        let json = crate::encoding::json_schema::export_json(
            &oplog,
            start_vv,
            end_vv,
            with_peer_compression,
        );
        // Release the oplog lock before renewing the transaction.
        drop(oplog);
        self.renew_txn_if_auto_commit(options);
        json
    }
726
727 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
728 let oplog = self.oplog.lock().unwrap();
729 let mut changes = export_json_in_id_span(&oplog, id_span);
730 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
731 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
732 changes.push(change_json);
733 }
734 changes
735 }
736
737 #[inline]
739 pub fn oplog_vv(&self) -> VersionVector {
740 self.oplog.lock().unwrap().vv().clone()
741 }
742
743 #[inline]
745 pub fn state_vv(&self) -> VersionVector {
746 let oplog = self.oplog.lock().unwrap();
747 let f = &self.state.lock().unwrap().frontiers;
748 oplog.dag.frontiers_to_vv(f).unwrap()
749 }
750
751 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
752 let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
753 if let LoroValue::Container(c) = value {
754 Some(ValueOrHandler::Handler(Handler::new_attached(
755 c.clone(),
756 self.clone(),
757 )))
758 } else {
759 Some(ValueOrHandler::Value(value))
760 }
761 }
762
763 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
765 let path = str_to_path(path)?;
766 self.get_by_path(&path)
767 }
768
    /// Encodes the operations of the pending transaction as a JSON schema.
    ///
    /// Returns `None` when there is no pending transaction or it has no local
    /// operations.
    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
        let arena = &self.arena;
        let txn = self.txn.lock().unwrap();
        let txn = txn.as_ref()?;
        let ops_ = txn.local_ops();
        // The change id is the transaction's peer plus the counter of its
        // first local op.
        let new_id = ID {
            peer: *txn.peer(),
            counter: ops_.first()?.counter,
        };
        let change = ChangeRef {
            id: &new_id,
            deps: txn.frontiers(),
            // Fall back to the oplog's timestamp for the next transaction when
            // the transaction has no timestamp of its own.
            timestamp: &txn
                .timestamp()
                .as_ref()
                .copied()
                .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
            commit_msg: txn.msg(),
            ops: ops_,
            lamport: txn.lamport(),
        };
        let json = encode_change_to_json(change, arena);
        Some(json)
    }
793
794 #[inline]
795 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
796 if self.has_container(&id) {
797 Some(Handler::new_attached(id, self.clone()))
798 } else {
799 None
800 }
801 }
802
803 #[inline]
806 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
807 let id = id.into_container_id(&self.arena, ContainerType::Text);
808 assert!(self.has_container(&id));
809 Handler::new_attached(id, self.clone()).into_text().unwrap()
810 }
811
812 #[inline]
815 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
816 let id = id.into_container_id(&self.arena, ContainerType::List);
817 assert!(self.has_container(&id));
818 Handler::new_attached(id, self.clone()).into_list().unwrap()
819 }
820
821 #[inline]
824 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
825 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
826 assert!(self.has_container(&id));
827 Handler::new_attached(id, self.clone())
828 .into_movable_list()
829 .unwrap()
830 }
831
832 #[inline]
835 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
836 let id = id.into_container_id(&self.arena, ContainerType::Map);
837 assert!(self.has_container(&id));
838 Handler::new_attached(id, self.clone()).into_map().unwrap()
839 }
840
841 #[inline]
844 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
845 let id = id.into_container_id(&self.arena, ContainerType::Tree);
846 assert!(self.has_container(&id));
847 Handler::new_attached(id, self.clone()).into_tree().unwrap()
848 }
849
850 #[cfg(feature = "counter")]
851 pub fn get_counter<I: IntoContainerId>(
852 &self,
853 id: I,
854 ) -> crate::handler::counter::CounterHandler {
855 let id = id.into_container_id(&self.arena, ContainerType::Counter);
856 assert!(self.has_container(&id));
857 Handler::new_attached(id, self.clone())
858 .into_counter()
859 .unwrap()
860 }
861
862 #[must_use]
863 pub fn has_container(&self, id: &ContainerID) -> bool {
864 if id.is_root() {
865 return true;
866 }
867
868 let exist = self.state.lock().unwrap().does_container_exist(id);
869 exist
870 }
871
872 #[instrument(level = "info", skip_all)]
886 pub fn undo_internal(
887 &self,
888 id_span: IdSpan,
889 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
890 post_transform_base: Option<&DiffBatch>,
891 before_diff: &mut dyn FnMut(&DiffBatch),
892 ) -> LoroResult<CommitWhenDrop<'_>> {
893 if !self.can_edit() {
894 return Err(LoroError::EditWhenDetached);
895 }
896
897 let (options, txn) = self.commit_then_stop();
898 if !self
899 .oplog()
900 .lock()
901 .unwrap()
902 .vv()
903 .includes_id(id_span.id_last())
904 {
905 self.renew_txn_if_auto_commit(options);
906 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
907 }
908
909 let (was_recording, latest_frontiers) = {
910 let mut state = self.state.lock().unwrap();
911 let was_recording = state.is_recording();
912 state.stop_and_clear_recording();
913 (was_recording, state.frontiers.clone())
914 };
915
916 let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
917 let diff = crate::undo::undo(
918 spans,
919 match post_transform_base {
920 Some(d) => Either::Right(d),
921 None => Either::Left(&latest_frontiers),
922 },
923 |from, to| {
924 self._checkout_without_emitting(from, false, false).unwrap();
925 self.state.lock().unwrap().start_recording();
926 self._checkout_without_emitting(to, false, false).unwrap();
927 let mut state = self.state.lock().unwrap();
928 let e = state.take_events();
929 state.stop_and_clear_recording();
930 DiffBatch::new(e)
931 },
932 before_diff,
933 );
934
935 self._checkout_without_emitting(&latest_frontiers, false, false)?;
939 self.set_detached(false);
940 if was_recording {
941 self.state.lock().unwrap().start_recording();
942 }
943 drop(txn);
944 self.start_auto_commit();
945 if let Err(e) = self._apply_diff(diff, container_remap, true) {
949 warn!("Undo Failed {:?}", e);
950 }
951
952 if let Some(options) = options {
953 self.set_next_commit_options(options);
954 }
955 Ok(CommitWhenDrop {
956 doc: self,
957 default_options: CommitOptions::new().origin("undo"),
958 })
959 }
960
961 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
967 let f = self.state_frontiers();
970 let diff = self.diff(&f, target)?;
971 self._apply_diff(diff, &mut Default::default(), false)
972 }
973
    /// Computes the diff that transforms the document at version `a` into the
    /// document at version `b`.
    ///
    /// The state is temporarily checked out to `a` and then to `b` with event
    /// recording on, and finally restored to the version it was at before the
    /// call.
    ///
    /// # Errors
    /// Returns `LoroError::FrontiersNotFound` when an id of `a` or `b` is not
    /// in the oplog's DAG.
    ///
    /// # Panics
    /// Panics if any of the internal checkouts fails.
    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
        {
            // Validate both versions before touching the transaction or state.
            let oplog = self.oplog.lock().unwrap();
            for id in a.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }
            for id in b.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }
        }

        let (options, txn) = self.commit_then_stop();
        let was_detached = self.is_detached();
        let old_frontiers = self.state_frontiers();
        // Pause recording so moving to `a` produces no events.
        let was_recording = {
            let mut state = self.state.lock().unwrap();
            let is_recording = state.is_recording();
            state.stop_and_clear_recording();
            is_recording
        };
        self._checkout_without_emitting(a, true, false).unwrap();
        // Record only the events produced by moving from `a` to `b`.
        self.state.lock().unwrap().start_recording();
        self._checkout_without_emitting(b, true, false).unwrap();
        let e = {
            let mut state = self.state.lock().unwrap();
            let e = state.take_events();
            state.stop_and_clear_recording();
            e
        };
        // Restore the version the state was at before the call.
        self._checkout_without_emitting(&old_frontiers, false, false)
            .unwrap();
        drop(txn);
        if !was_detached {
            self.set_detached(false);
            self.renew_txn_if_auto_commit(options);
        }
        if was_recording {
            self.state.lock().unwrap().start_recording();
        }
        Ok(DiffBatch::new(e))
    }
1024
1025 #[inline(always)]
1027 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1028 self._apply_diff(diff, &mut Default::default(), true)
1029 }
1030
    /// Applies each container diff in `diff` through the container's handler.
    ///
    /// - `container_remap` maps container ids in the diff to the ids they
    ///   should be applied to; chains of remappings are followed to the end.
    /// - `skip_unreachable` skips containers the state reports as unreachable,
    ///   unless they were remapped.
    ///
    /// # Errors
    /// - `LoroError::EditWhenDetached` when the document cannot be edited.
    /// - `LoroError::ContainersNotFound` listing the containers that do not
    ///   exist (reported after the other diffs have been applied), or the
    ///   single container for which no handler could be obtained.
    /// - Otherwise the last error returned by a handler, if any.
    pub(crate) fn _apply_diff(
        &self,
        diff: DiffBatch,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        skip_unreachable: bool,
    ) -> LoroResult<()> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        let mut ans: LoroResult<()> = Ok(());
        let mut missing_containers: Vec<ContainerID> = Vec::new();
        for (mut id, diff) in diff.into_iter() {
            let mut remapped = false;
            // Follow the remap chain to its final target.
            while let Some(rid) = container_remap.get(&id) {
                remapped = true;
                id = rid.clone();
            }

            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
                // The arena does not know the id; ask the state whether the
                // container exists before giving up on it.
                let exists = self.state.lock().unwrap().does_container_exist(&id);
                if !exists {
                    missing_containers.push(id);
                    continue;
                }
                self.state.lock().unwrap().ensure_container(&id);
            }

            if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
                continue;
            }

            let Some(h) = self.get_handler(id.clone()) else {
                return Err(LoroError::ContainersNotFound {
                    containers: Box::new(vec![id]),
                });
            };
            // Keep going after a failure; the last error is reported at the end.
            if let Err(e) = h.apply_diff(diff, container_remap) {
                ans = Err(e);
            }
        }

        if !missing_containers.is_empty() {
            return Err(LoroError::ContainersNotFound {
                containers: Box::new(missing_containers),
            });
        }

        ans
    }
1094
1095 #[inline]
1097 pub fn diagnose_size(&self) {
1098 self.oplog().lock().unwrap().diagnose_size();
1099 }
1100
1101 #[inline]
1102 pub fn oplog_frontiers(&self) -> Frontiers {
1103 self.oplog().lock().unwrap().frontiers().clone()
1104 }
1105
1106 #[inline]
1107 pub fn state_frontiers(&self) -> Frontiers {
1108 self.state.lock().unwrap().frontiers.clone()
1109 }
1110
1111 #[inline]
1115 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1116 self.oplog().lock().unwrap().cmp_with_frontiers(other)
1117 }
1118
1119 #[inline]
1123 pub fn cmp_frontiers(
1124 &self,
1125 a: &Frontiers,
1126 b: &Frontiers,
1127 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1128 self.oplog().lock().unwrap().cmp_frontiers(a, b)
1129 }
1130
1131 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1132 let mut state = self.state.lock().unwrap();
1133 if !state.is_recording() {
1134 state.start_recording();
1135 }
1136
1137 self.observer.subscribe_root(callback)
1138 }
1139
1140 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1141 let mut state = self.state.lock().unwrap();
1142 if !state.is_recording() {
1143 state.start_recording();
1144 }
1145
1146 self.observer.subscribe(container_id, callback)
1147 }
1148
1149 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1150 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1151 activate();
1152 sub
1153 }
1154
1155 #[tracing::instrument(skip_all)]
1157 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1158 if bytes.is_empty() {
1159 return Ok(ImportStatus::default());
1160 }
1161
1162 if bytes.len() == 1 {
1163 return self.import(&bytes[0]);
1164 }
1165
1166 let mut success = VersionRange::default();
1167 let mut pending = VersionRange::default();
1168 let mut meta_arr = bytes
1169 .iter()
1170 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1171 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1172 meta_arr.sort_by(|a, b| {
1173 a.0.mode
1174 .cmp(&b.0.mode)
1175 .then(b.0.change_num.cmp(&a.0.change_num))
1176 });
1177
1178 let (options, txn) = self.commit_then_stop();
1179 drop(txn);
1180 let is_detached = self.is_detached();
1181 self.detach();
1182 self.oplog.lock().unwrap().batch_importing = true;
1183 let mut err = None;
1184 for (_meta, data) in meta_arr {
1185 match self.import(data) {
1186 Ok(s) => {
1187 for (peer, (start, end)) in s.success.iter() {
1188 match success.0.entry(*peer) {
1189 Entry::Occupied(mut e) => {
1190 e.get_mut().1 = *end.max(&e.get().1);
1191 }
1192 Entry::Vacant(e) => {
1193 e.insert((*start, *end));
1194 }
1195 }
1196 }
1197
1198 if let Some(p) = s.pending.as_ref() {
1199 for (&peer, &(start, end)) in p.iter() {
1200 match pending.0.entry(peer) {
1201 Entry::Occupied(mut e) => {
1202 e.get_mut().0 = start.min(e.get().0);
1203 e.get_mut().1 = end.min(e.get().1);
1204 }
1205 Entry::Vacant(e) => {
1206 e.insert((start, end));
1207 }
1208 }
1209 }
1210 }
1211 }
1212 Err(e) => {
1213 err = Some(e);
1214 }
1215 }
1216 }
1217
1218 let mut oplog = self.oplog.lock().unwrap();
1219 oplog.batch_importing = false;
1220 drop(oplog);
1221
1222 if !is_detached {
1223 self.checkout_to_latest();
1224 }
1225
1226 self.renew_txn_if_auto_commit(options);
1227 if let Some(err) = err {
1228 return Err(err);
1229 }
1230
1231 Ok(ImportStatus {
1232 success,
1233 pending: if pending.is_empty() {
1234 None
1235 } else {
1236 Some(pending)
1237 },
1238 })
1239 }
1240
1241 #[inline]
1243 pub fn get_value(&self) -> LoroValue {
1244 self.state.lock().unwrap().get_value()
1245 }
1246
1247 #[inline]
1249 pub fn get_deep_value(&self) -> LoroValue {
1250 self.state.lock().unwrap().get_deep_value()
1251 }
1252
1253 #[inline]
1255 pub fn get_deep_value_with_id(&self) -> LoroValue {
1256 self.state.lock().unwrap().get_deep_value_with_id()
1257 }
1258
1259 pub fn checkout_to_latest(&self) {
1260 let (options, _guard) = self.commit_then_stop();
1261 if !self.is_detached() {
1262 drop(_guard);
1263 self.renew_txn_if_auto_commit(options);
1264 return;
1265 }
1266
1267 self._checkout_to_latest_without_commit(true);
1268 drop(_guard);
1269 self.renew_txn_if_auto_commit(options);
1270 }
1271
    /// Checks the state out to the oplog's frontiers, emits the resulting
    /// events and marks the document as attached, all inside a
    /// `CheckoutToLatest` info span.
    ///
    /// When detached editing is enabled a new peer id is generated as well.
    /// `to_commit_then_renew` is forwarded to `_checkout_without_emitting`.
    ///
    /// # Panics
    /// Panics if the checkout fails.
    pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
            let f = self.oplog_frontiers();
            let this = &self;
            let frontiers = &f;
            // The frontiers are passed without asking for them to be shrunk.
            this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
                .unwrap();
            this.emit_events();
            if this.config.detached_editing() {
                this.renew_peer_id();
            }

            self.set_detached(false);
        });
    }
1289
1290 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1295 let (options, guard) = self.commit_then_stop();
1296 self._checkout_without_emitting(frontiers, true, true)?;
1297 self.emit_events();
1298 drop(guard);
1299 if self.config.detached_editing() {
1300 self.renew_peer_id();
1301 self.renew_txn_if_auto_commit(options);
1302 } else if !self.is_detached() {
1303 self.renew_txn_if_auto_commit(options);
1304 }
1305
1306 Ok(())
1307 }
1308
    /// Moves the document state to `frontiers` by computing and applying a
    /// diff, without emitting events.
    ///
    /// * `frontiers` — the target version.
    /// * `to_shrink_frontiers` — when `true`, the target is first passed
    ///   through `shrink_frontiers` against the oplog DAG.
    /// * `to_commit_then_renew` — not read by the body of this function.
    ///
    /// Returns `Ok(())` without touching the state when the target equals the
    /// current state frontiers (checked both before and after shrinking).
    /// Otherwise the document is marked detached and the diff is applied.
    ///
    /// # Errors
    ///
    /// * `LoroError::SwitchToVersionBeforeShallowRoot` if the DAG reports the
    ///   target as being before the shallow root.
    /// * `LoroError::FrontiersNotFound` if shrinking fails or the DAG does not
    ///   contain one of the target ids.
    /// * `LoroError::NotFoundError` if the target cannot be converted to a
    ///   version vector.
    ///
    /// # Panics
    ///
    /// Panics if the transaction lock is not held by the caller, if a lock
    /// cannot be acquired, or if the current state frontiers cannot be
    /// converted to a version vector.
    #[instrument(level = "info", skip(self))]
    pub(crate) fn _checkout_without_emitting(
        &self,
        frontiers: &Frontiers,
        to_shrink_frontiers: bool,
        to_commit_then_renew: bool,
    ) -> Result<(), LoroError> {
        // The caller must already hold the txn lock (i.e. the transaction is stopped).
        assert!(self.txn.is_locked());
        let from_frontiers = self.state_frontiers();
        loro_common::info!(
            "checkout from={:?} to={:?} cur_vv={:?}",
            from_frontiers,
            frontiers,
            self.oplog_vv()
        );

        // Fast path: already at the requested version.
        if &from_frontiers == frontiers {
            return Ok(());
        }

        let oplog = self.oplog.lock().unwrap();
        if oplog.dag.is_before_shallow_root(frontiers) {
            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
        }

        // Shadow `frontiers` with an owned, optionally shrunk, copy.
        let frontiers = if to_shrink_frontiers {
            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
        } else {
            frontiers.clone()
        };

        // Shrinking may have produced the current version after all.
        if from_frontiers == frontiers {
            return Ok(());
        }

        // Locks are taken in the order oplog -> state -> diff calculator.
        let mut state = self.state.lock().unwrap();
        let mut calc = self.diff_calculator.lock().unwrap();
        for i in frontiers.iter() {
            if !oplog.dag.contains(i) {
                return Err(LoroError::FrontiersNotFound(i));
            }
        }

        let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
            return Err(LoroError::NotFoundError(
                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
            ));
        };

        // All fallible checks are done; from here on the state is modified.
        self.set_detached(true);
        let (diff, diff_mode) =
            calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
        state.apply_diff(
            InternalDocDiff {
                origin: "checkout".into(),
                diff: Cow::Owned(diff),
                by: EventTriggerKind::Checkout,
                new_version: Cow::Owned(frontiers.clone()),
            },
            diff_mode,
        );

        Ok(())
    }
1375
    /// Converts a version vector into frontiers using the oplog's DAG.
    ///
    /// # Panics
    ///
    /// Panics if locking `self.oplog` returns an error.
    #[inline]
    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
        self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
    }
1380
    /// Converts frontiers into a version vector using the oplog's DAG.
    ///
    /// Returns whatever the DAG's `frontiers_to_vv` returns.
    /// NOTE(review): `None` presumably means the frontiers are not known to
    /// this document — confirm against the DAG implementation.
    ///
    /// # Panics
    ///
    /// Panics if locking `self.oplog` returns an error.
    #[inline]
    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
        self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
    }
1385
    /// Merges `other` into `self`.
    ///
    /// `other` exports its data starting from `self`'s current oplog version
    /// vector, and the exported bytes are imported into `self`. The result of
    /// that import is returned unchanged.
    pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
        self.import(&other.export_from(&self.oplog_vv()))
    }
1392
    /// Returns a reference to the document's shared arena.
    pub(crate) fn arena(&self) -> &SharedArena {
        &self.arena
    }
1396
1397 #[inline]
1398 pub fn len_ops(&self) -> usize {
1399 let oplog = self.oplog.lock().unwrap();
1400 let ans = oplog.vv().values().sum::<i32>() as usize;
1401 if oplog.is_shallow() {
1402 let sub = oplog
1403 .shallow_since_vv()
1404 .iter()
1405 .map(|(_, ops)| *ops)
1406 .sum::<i32>() as usize;
1407 ans - sub
1408 } else {
1409 ans
1410 }
1411 }
1412
1413 #[inline]
1414 pub fn len_changes(&self) -> usize {
1415 let oplog = self.oplog.lock().unwrap();
1416 oplog.len_changes()
1417 }
1418
    /// Returns a reference to the document's configuration.
    pub fn config(&self) -> &Configure {
        &self.config
    }
1422
1423 pub fn check_state_diff_calc_consistency_slow(&self) {
1428 {
1430 static IS_CHECKING: std::sync::atomic::AtomicBool =
1431 std::sync::atomic::AtomicBool::new(false);
1432 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1433 return;
1434 }
1435
1436 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1437 let peer_id = self.peer_id();
1438 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1439 let _g = s.enter();
1440 let options = self.commit_then_stop().0;
1441 self.oplog.lock().unwrap().check_dag_correctness();
1442 if self.is_shallow() {
1443 let initial_snapshot = self
1454 .export(ExportMode::state_only(Some(
1455 &self.shallow_since_frontiers(),
1456 )))
1457 .unwrap();
1458
1459 let doc = LoroDoc::new();
1461 doc.import(&initial_snapshot).unwrap();
1462 self.checkout(&self.shallow_since_frontiers()).unwrap();
1463 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1464
1465 let updates = self.export(ExportMode::all_updates()).unwrap();
1467
1468 doc.import(&updates).unwrap();
1470 self.checkout_to_latest();
1471
1472 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1475 let mut calculated_state = doc.app_state().lock().unwrap();
1476 let mut current_state = self.app_state().lock().unwrap();
1477 current_state.check_is_the_same(&mut calculated_state);
1478 } else {
1479 let f = self.state_frontiers();
1480 let vv = self
1481 .oplog()
1482 .lock()
1483 .unwrap()
1484 .dag
1485 .frontiers_to_vv(&f)
1486 .unwrap();
1487 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1488 let doc = Self::new();
1489 doc.import(&bytes).unwrap();
1490 let mut calculated_state = doc.app_state().lock().unwrap();
1491 let mut current_state = self.app_state().lock().unwrap();
1492 current_state.check_is_the_same(&mut calculated_state);
1493 }
1494
1495 self.renew_txn_if_auto_commit(options);
1496 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1497 }
1498 }
1499
    /// Resolves a cursor to its current absolute position.
    ///
    /// Delegates to `query_pos_internal` with `ret_event_index` set to `true`.
    pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
        self.query_pos_internal(pos, true)
    }
1503
    /// Resolves a cursor to its current absolute position.
    ///
    /// The state is asked first (`get_relative_position`); when it can answer,
    /// the result carries no cursor update. Otherwise the position is
    /// recovered from history:
    ///
    /// * If the cursor has an `id`, the delete op that removed that id is
    ///   located, a diff from just before that delete to the latest version is
    ///   calculated for the cursor's container only, and the id's latest
    ///   position is read from the container's diff calculator. The result
    ///   includes an updated cursor obtained from the handler.
    /// * If the cursor has no `id`, the position is the end of the container
    ///   (its current length).
    ///
    /// `ret_event_index` is forwarded to `get_relative_position`; the fallback
    /// paths do not consult it.
    ///
    /// # Errors
    ///
    /// * `ContainerDeleted` — the container is not registered in the arena and
    ///   the state reports that it does not exist.
    /// * `HistoryCleared` — no delete op was found and the id is covered by
    ///   the oplog's `shallow_since_vv`.
    /// * `IdNotFound` — no delete op was found for the id otherwise.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) for container kinds other than text, list and
    /// movable list, and on the `unwrap` calls below.
    pub(crate) fn query_pos_internal(
        &self,
        pos: &Cursor,
        ret_event_index: bool,
    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
        let mut state = self.state.lock().unwrap();
        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
            Ok(PosQueryResult {
                update: None,
                current: AbsolutePosition {
                    pos: ans,
                    side: pos.side,
                },
            })
        } else {
            // Release the state lock before committing and locking the oplog.
            drop(state);
            self.commit_then_renew();
            let oplog = self.oplog().lock().unwrap();
            if let Some(id) = pos.id {
                // The container may exist without being registered in the arena yet.
                if oplog.arena.id_to_idx(&pos.container).is_none() {
                    let mut s = self.state.lock().unwrap();
                    if !s.does_container_exist(&pos.container) {
                        return Err(CannotFindRelativePosition::ContainerDeleted);
                    }
                    s.ensure_container(&pos.container);
                    drop(s);
                }
                let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
                // Find the op that deleted the element the cursor was anchored to.
                let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
                    if oplog.shallow_since_vv().includes_id(id) {
                        return Err(CannotFindRelativePosition::HistoryCleared);
                    }

                    tracing::error!("Cannot find id {}", id);
                    return Err(CannotFindRelativePosition::IdNotFound);
                };
                let mut diff_calc = DiffCalculator::new(true);
                // Diff from the version right before the delete up to the latest version.
                let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
                let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
                // Only the cursor's container is included in the calculation;
                // the returned diff itself is not used.
                diff_calc.calc_diff_internal(
                    &oplog,
                    before,
                    &before_frontiers,
                    oplog.vv(),
                    oplog.frontiers(),
                    Some(&|target| idx == target),
                );
                let depth = self.arena.get_depth(idx);
                let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
                match diff_calc {
                    crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
                        let c = text.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_text(&pos.container);
                        // Text positions are converted from entity index to event index.
                        let current_pos = handler.convert_entity_index_to_event_index(new_pos);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(current_pos, c.side),
                            current: AbsolutePosition {
                                pos: current_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::List(list) => {
                        let c = list.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_list(&pos.container);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(new_pos, c.side),
                            current: AbsolutePosition {
                                pos: new_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
                        let c = list.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_movable_list(&pos.container);
                        // Movable-list positions are converted from op position to user position.
                        let new_pos = handler.op_pos_to_user_pos(new_pos);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(new_pos, c.side),
                            current: AbsolutePosition {
                                pos: new_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
                    crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
                    #[cfg(feature = "counter")]
                    crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
                    crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
                }
            } else {
                // A cursor without an id resolves to the end of its container.
                match pos.container.container_type() {
                    ContainerType::Text => {
                        let text = self.get_text(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: text.id(),
                                side: pos.side,
                                origin_pos: text.len_unicode(),
                            }),
                            current: AbsolutePosition {
                                pos: text.len_event(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::List => {
                        let list = self.get_list(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: list.id(),
                                side: pos.side,
                                origin_pos: list.len(),
                            }),
                            current: AbsolutePosition {
                                pos: list.len(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::MovableList => {
                        let list = self.get_movable_list(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: list.id(),
                                side: pos.side,
                                origin_pos: list.len(),
                            }),
                            current: AbsolutePosition {
                                pos: list.len(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
                        unreachable!()
                    }
                    #[cfg(feature = "counter")]
                    ContainerType::Counter => unreachable!(),
                }
            }
        }
    }
1672
    /// Asks the oplog to free its history cache.
    ///
    /// NOTE(review): presumably the cache is rebuilt lazily when needed again —
    /// confirm against `OpLog::free_history_cache`.
    pub fn free_history_cache(&self) {
        self.oplog.lock().unwrap().free_history_cache();
    }
1680
    /// Discards the current diff calculator by replacing it with a fresh
    /// `DiffCalculator::new(true)`.
    pub fn free_diff_calculator(&self) {
        *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
    }
1685
    /// Returns whether the oplog currently reports having a history cache.
    pub fn has_history_cache(&self) -> bool {
        self.oplog.lock().unwrap().has_history_cache()
    }
1691
    /// Commits the pending transaction (via `commit_then_renew`) and then asks
    /// the oplog to compact its change store.
    #[inline]
    pub fn compact_change_store(&self) {
        self.commit_then_renew();
        self.oplog.lock().unwrap().compact_change_store();
    }
1700
    /// Produces a [`DocAnalysis`] for this document by delegating to
    /// `DocAnalysis::analyze`.
    #[inline]
    pub fn analyze(&self) -> DocAnalysis {
        DocAnalysis::analyze(self)
    }
1708
1709 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1711 let mut state = self.state.lock().unwrap();
1712 if state.arena.id_to_idx(id).is_none() {
1713 if !state.does_container_exist(id) {
1714 return None;
1715 }
1716 state.ensure_container(id);
1717 }
1718 let idx = state.arena.id_to_idx(id).unwrap();
1719 state.get_path(idx)
1720 }
1721
1722 #[instrument(skip(self))]
1723 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1724 let (options, txn) = self.commit_then_stop();
1725 let ans = match mode {
1726 ExportMode::Snapshot => export_fast_snapshot(self),
1727 ExportMode::Updates { from } => export_fast_updates(self, &from),
1728 ExportMode::UpdatesInRange { spans } => {
1729 export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1730 }
1731 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1732 ExportMode::StateOnly(f) => match f {
1733 Some(f) => export_state_only_snapshot(self, &f)?,
1734 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1735 },
1736 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1737 };
1738
1739 drop(txn);
1740 self.renew_txn_if_auto_commit(options);
1741 Ok(ans)
1742 }
1743
    /// Returns a clone of the oplog's `shallow_since_vv`.
    ///
    /// NOTE(review): presumably the version vector at which the shallow
    /// history starts; it is empty for a non-shallow document (see
    /// [`Self::is_shallow`]).
    pub fn shallow_since_vv(&self) -> ImVersionVector {
        self.oplog().lock().unwrap().shallow_since_vv().clone()
    }
1752
    /// Returns a clone of the oplog's `shallow_since_frontiers`.
    ///
    /// NOTE(review): presumably the frontiers at which the shallow history
    /// starts — confirm against `OpLog`.
    pub fn shallow_since_frontiers(&self) -> Frontiers {
        self.oplog()
            .lock()
            .unwrap()
            .shallow_since_frontiers()
            .clone()
    }
1760
    /// Returns `true` when the oplog's `shallow_since_vv` is non-empty.
    pub fn is_shallow(&self) -> bool {
        !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
    }
1765
1766 pub fn get_pending_txn_len(&self) -> usize {
1771 if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1772 txn.len()
1773 } else {
1774 0
1775 }
1776 }
1777
    /// Returns the result of the oplog DAG's `find_path(from, to)`.
    ///
    /// NOTE(review): presumably the id spans that differ between the two
    /// versions in each direction — confirm against `VersionVectorDiff`.
    #[inline]
    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
        self.oplog().lock().unwrap().dag.find_path(from, to)
    }
1782
1783 pub fn subscribe_first_commit_from_peer(
1789 &self,
1790 callback: FirstCommitFromPeerCallback,
1791 ) -> Subscription {
1792 let (s, enable) = self
1793 .first_commit_from_peer_subs
1794 .inner()
1795 .insert((), callback);
1796 enable();
1797 s
1798 }
1799
1800 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1805 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1806 enable();
1807 s
1808 }
1809}
1810
/// Errors returned by `LoroDoc::travel_change_ancestors`.
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// The given id is not included in the oplog's version vector.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// The given id is covered by the DAG's `shallow_since_vv`, i.e. it lies
    /// in the part of history that a shallow document does not keep.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
1818
1819impl LoroDoc {
1820 pub fn travel_change_ancestors(
1821 &self,
1822 ids: &[ID],
1823 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1824 ) -> Result<(), ChangeTravelError> {
1825 self.commit_then_renew();
1826 struct PendingNode(ChangeMeta);
1827 impl PartialEq for PendingNode {
1828 fn eq(&self, other: &Self) -> bool {
1829 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1830 }
1831 }
1832
1833 impl Eq for PendingNode {}
1834 impl PartialOrd for PendingNode {
1835 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1836 Some(self.cmp(other))
1837 }
1838 }
1839
1840 impl Ord for PendingNode {
1841 fn cmp(&self, other: &Self) -> Ordering {
1842 self.0
1843 .lamport_last()
1844 .cmp(&other.0.lamport_last())
1845 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1846 }
1847 }
1848
1849 for id in ids {
1850 let op_log = &self.oplog().lock().unwrap();
1851 if !op_log.vv().includes_id(*id) {
1852 return Err(ChangeTravelError::TargetIdNotFound(*id));
1853 }
1854 if op_log.dag.shallow_since_vv().includes_id(*id) {
1855 return Err(ChangeTravelError::TargetVersionNotIncluded);
1856 }
1857 }
1858
1859 let mut visited = FxHashSet::default();
1860 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1861 for id in ids {
1862 pending.push(PendingNode(ChangeMeta::from_change(
1863 &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1864 )));
1865 }
1866 while let Some(PendingNode(node)) = pending.pop() {
1867 let deps = node.deps.clone();
1868 if f(node).is_break() {
1869 break;
1870 }
1871
1872 for dep in deps.iter() {
1873 let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1874 continue;
1875 };
1876 if visited.contains(&dep_node.id) {
1877 continue;
1878 }
1879
1880 visited.insert(dep_node.id);
1881 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1882 }
1883 }
1884
1885 Ok(())
1886 }
1887
1888 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1889 self.commit_then_renew();
1890 let mut set = FxHashSet::default();
1891 let oplog = &self.oplog().lock().unwrap();
1892 for op in oplog.iter_ops(id.to_span(len)) {
1893 let id = oplog.arena.get_container_id(op.container()).unwrap();
1894 set.insert(id);
1895 }
1896
1897 set
1898 }
1899
1900 pub fn delete_root_container(&self, cid: ContainerID) {
1901 if !cid.is_root() {
1902 return;
1903 }
1904
1905 if !self.has_container(&cid) {
1907 return;
1908 }
1909
1910 let Some(h) = self.get_handler(cid.clone()) else {
1911 return;
1912 };
1913
1914 if let Err(e) = h.clear() {
1915 eprintln!("Failed to clear handler: {:?}", e);
1916 return;
1917 }
1918 self.config
1919 .deleted_root_containers
1920 .lock()
1921 .unwrap()
1922 .insert(cid);
1923 }
1924
    /// Stores `hide` into the configuration's `hide_empty_root_containers`
    /// flag (relaxed atomic store).
    ///
    /// NOTE(review): presumably controls whether empty root containers are
    /// omitted from exported values — confirm where the flag is read.
    pub fn set_hide_empty_root_containers(&self, hide: bool) {
        self.config
            .hide_empty_root_containers
            .store(hide, std::sync::atomic::Ordering::Relaxed);
    }
1930}
1931
1932fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
1934 let start_vv = oplog
1935 .dag
1936 .frontiers_to_vv(&id.into())
1937 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
1938 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
1939 for op in change.ops.iter().rev() {
1940 if op.container != idx {
1941 continue;
1942 }
1943 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
1944 if d.id_start.to_span(d.atom_len()).contains(id) {
1945 return Some(ID::new(change.peer(), op.counter));
1946 }
1947 }
1948 }
1949 }
1950
1951 None
1952}
1953
/// Guard that commits the document's pending transaction when dropped.
///
/// On drop, `default_options` is installed as the default options of the
/// active transaction (if any) before `commit_then_renew` is called.
#[derive(Debug)]
pub struct CommitWhenDrop<'a> {
    /// The document whose transaction is committed on drop.
    doc: &'a LoroDoc,
    /// Options handed to the active transaction right before the commit.
    default_options: CommitOptions,
}
1959
1960impl Drop for CommitWhenDrop<'_> {
1961 fn drop(&mut self) {
1962 {
1963 let mut guard = self.doc.txn.lock().unwrap();
1964 if let Some(txn) = guard.as_mut() {
1965 txn.set_default_options(std::mem::take(&mut self.default_options));
1966 };
1967 }
1968
1969 self.doc.commit_then_renew();
1970 }
1971}
1972
/// Options that accompany a commit.
///
/// Build with [`CommitOptions::new`] and the chained setters.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// Optional origin label for the commit.
    /// NOTE(review): presumably surfaced on the events triggered by the commit — confirm.
    pub origin: Option<InternalString>,

    /// Defaults to `true` in [`CommitOptions::new`].
    /// NOTE(review): presumably whether a new transaction is started right
    /// after the commit — confirm where this flag is read.
    pub immediate_renew: bool,

    /// Optional explicit timestamp for the commit.
    /// NOTE(review): the unit is not visible here — confirm against `Timestamp`.
    pub timestamp: Option<Timestamp>,

    /// Optional commit message.
    pub commit_msg: Option<Arc<str>>,
}
1991
1992impl CommitOptions {
1993 pub fn new() -> Self {
1995 Self {
1996 origin: None,
1997 immediate_renew: true,
1998 timestamp: None,
1999 commit_msg: None,
2000 }
2001 }
2002
2003 pub fn origin(mut self, origin: &str) -> Self {
2005 self.origin = Some(origin.into());
2006 self
2007 }
2008
2009 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2011 self.immediate_renew = immediate_renew;
2012 self
2013 }
2014
2015 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2019 self.timestamp = Some(timestamp);
2020 self
2021 }
2022
2023 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2025 self.commit_msg = Some(commit_msg.into());
2026 self
2027 }
2028
2029 pub fn set_origin(&mut self, origin: Option<&str>) {
2031 self.origin = origin.map(|x| x.into())
2032 }
2033
2034 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2036 self.timestamp = timestamp;
2037 }
2038}
2039
2040impl Default for CommitOptions {
2041 fn default() -> Self {
2042 Self::new()
2043 }
2044}
2045
#[cfg(test)]
mod test {
    use loro_common::ID;

    use crate::{version::Frontiers, LoroDoc, ToJson};

    /// Compile-time check that `LoroDoc` is `Send + Sync`.
    #[test]
    fn test_sync() {
        fn is_send_sync<T: Send + Sync>(_v: T) {}
        let loro = super::LoroDoc::new();
        is_send_sync(loro)
    }

    /// Checks out several historical versions and compares the deep value
    /// against the expected JSON at each of them.
    #[test]
    fn test_checkout() {
        let loro = LoroDoc::new();
        loro.set_peer_id(1).unwrap();
        let text = loro.get_text("text");
        let map = loro.get_map("map");
        let list = loro.get_list("list");
        let mut txn = loro.txn().unwrap();
        // Each iteration performs three operations: map, text, then list.
        for i in 0..10 {
            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
            text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
        }
        txn.commit().unwrap();
        // `b` is a second document loaded from `loro`'s snapshot.
        let b = LoroDoc::new();
        b.import(&loro.export_snapshot().unwrap()).unwrap();
        // Empty frontiers: every container is expected to be empty.
        loro.checkout(&Frontiers::default()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"","list":[],"map":{}}"#);
        }

        // Counter 2 of peer 1: the state after the first loop iteration.
        b.checkout(&ID::new(1, 2).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":0}}"#);
        }

        // Counter 3 of peer 1: only the map insert of the second iteration is visible.
        loro.checkout(&ID::new(1, 3).into()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":1}}"#);
        }

        // Counter 29 of peer 1: the state after all ten iterations.
        b.checkout(&ID::new(1, 29).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(
                json.to_json(),
                r#"{"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}"#
            );
        }
    }

    /// Regression test (named after issue 181): importing a snapshot through
    /// `import_batch`, editing, committing and exporting must not panic.
    #[test]
    fn import_batch_err_181() {
        let a = LoroDoc::new_auto_commit();
        let update_a = a.export_snapshot();
        let b = LoroDoc::new_auto_commit();
        b.import_batch(&[update_a.unwrap()]).unwrap();
        b.get_text("text").insert(0, "hello").unwrap();
        b.commit_then_renew();
        // The oplog lock must be obtainable here; it is released right away.
        let oplog = b.oplog().lock().unwrap();
        drop(oplog);
        b.export_from(&Default::default());
    }
}