1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::Timestamp,
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use fxhash::{FxHashMap, FxHashSet};
47use loro_common::{
48 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
49 LoroValue, ID,
50};
51use rle::HasLength;
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 pub fn new() -> Self {
82 let oplog = OpLog::new();
83 let arena = oplog.arena.clone();
84 let config: Configure = oplog.configure.clone();
85 let lock_group = LoroLockGroup::new();
86 let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
87 let inner = Arc::new_cyclic(|w| {
88 let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
89 LoroDocInner {
90 oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
91 state,
92 config,
93 detached: AtomicBool::new(false),
94 auto_commit: AtomicBool::new(false),
95 observer: Arc::new(Observer::new(arena.clone())),
96 diff_calculator: Arc::new(
97 lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
98 ),
99 txn: global_txn,
100 arena,
101 local_update_subs: SubscriberSetWithQueue::new(),
102 peer_id_change_subs: SubscriberSetWithQueue::new(),
103 pre_commit_subs: SubscriberSetWithQueue::new(),
104 first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
105 }
106 });
107 LoroDoc { inner }
108 }
109
110 pub fn fork(&self) -> Self {
111 if self.is_detached() {
112 return self.fork_at(&self.state_frontiers());
113 }
114
115 let (options, txn) = self.commit_then_stop();
116 drop(txn);
117 let snapshot = encoding::fast_snapshot::encode_snapshot_inner(self);
118 let doc = Self::new();
119 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc).unwrap();
120 doc.set_config(&self.config);
121 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
122 doc.start_auto_commit();
123 }
124 self.renew_txn_if_auto_commit(options);
125 doc
126 }
127 pub fn set_detached_editing(&self, enable: bool) {
143 self.config.set_detached_editing(enable);
144 if enable && self.is_detached() {
145 let (options, txn) = self.commit_then_stop();
146 drop(txn);
147 self.renew_peer_id();
148 self.renew_txn_if_auto_commit(options);
149 }
150 }
151
152 #[inline]
154 pub fn new_auto_commit() -> Self {
155 let doc = Self::new();
156 doc.start_auto_commit();
157 doc
158 }
159
160 #[inline(always)]
161 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
162 if peer == PeerID::MAX {
163 return Err(LoroError::InvalidPeerID);
164 }
165 let next_id = self.oplog.lock().unwrap().next_id(peer);
166 if self.auto_commit.load(Acquire) {
167 let doc_state = self.state.lock().unwrap();
168 doc_state
169 .peer
170 .store(peer, std::sync::atomic::Ordering::Relaxed);
171
172 if doc_state.is_in_txn() {
173 drop(doc_state);
174 self.commit_then_renew();
175 }
176 self.peer_id_change_subs.emit(&(), next_id);
177 return Ok(());
178 }
179
180 let doc_state = self.state.lock().unwrap();
181 if doc_state.is_in_txn() {
182 return Err(LoroError::TransactionError(
183 "Cannot change peer id during transaction"
184 .to_string()
185 .into_boxed_str(),
186 ));
187 }
188
189 doc_state
190 .peer
191 .store(peer, std::sync::atomic::Ordering::Relaxed);
192 drop(doc_state);
193 self.peer_id_change_subs.emit(&(), next_id);
194 Ok(())
195 }
196
197 pub(crate) fn renew_peer_id(&self) {
199 let peer_id = DefaultRandom.next_u64();
200 self.set_peer_id(peer_id).unwrap();
201 }
202
203 #[inline]
210 #[must_use]
211 pub fn commit_then_stop(&self) -> (Option<CommitOptions>, LoroMutexGuard<Option<Transaction>>) {
212 let (a, b) = self.commit_with(CommitOptions::new().immediate_renew(false));
213 (a, b.unwrap())
214 }
215
216 #[inline]
221 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
222 self.commit_with(CommitOptions::new().immediate_renew(true))
223 .0
224 }
225
226 fn before_commit(&self) -> Option<LoroMutexGuard<Option<Transaction>>> {
231 let mut txn_guard = self.txn.lock().unwrap();
232 let Some(txn) = txn_guard.as_mut() else {
233 return Some(txn_guard);
234 };
235
236 if txn.is_peer_first_appearance {
237 txn.is_peer_first_appearance = false;
238 drop(txn_guard);
239 self.first_commit_from_peer_subs.emit(
241 &(),
242 FirstCommitFromPeerPayload {
243 peer: self.peer_id(),
244 },
245 );
246 }
247
248 None
249 }
250
251 #[instrument(skip_all)]
257 pub fn commit_with(
258 &self,
259 config: CommitOptions,
260 ) -> (
261 Option<CommitOptions>,
262 Option<LoroMutexGuard<Option<Transaction>>>,
263 ) {
264 if !self.auto_commit.load(Acquire) {
265 let txn_guard = self.txn.lock().unwrap();
266 return (None, Some(txn_guard));
269 }
270
271 loop {
272 if let Some(txn_guard) = self.before_commit() {
273 return (None, Some(txn_guard));
274 }
275
276 let mut txn_guard = self.txn.lock().unwrap();
277 let txn = txn_guard.take();
278 let Some(mut txn) = txn else {
279 return (None, Some(txn_guard));
280 };
281 let on_commit = txn.take_on_commit();
282 if let Some(origin) = config.origin.clone() {
283 txn.set_origin(origin);
284 }
285
286 if let Some(timestamp) = config.timestamp {
287 txn.set_timestamp(timestamp);
288 }
289
290 if let Some(msg) = config.commit_msg.as_ref() {
291 txn.set_msg(Some(msg.clone()));
292 }
293
294 let id_span = txn.id_span();
295 let options = txn.commit().unwrap();
296 if config.immediate_renew {
297 assert!(self.can_edit());
298 let mut t = self.txn().unwrap();
299 if let Some(options) = options.as_ref() {
300 t.set_options(options.clone());
301 }
302 *txn_guard = Some(t);
303 }
304
305 if let Some(on_commit) = on_commit {
306 drop(txn_guard);
307 on_commit(&self.state, &self.oplog, id_span);
308 txn_guard = self.txn.lock().unwrap();
309 if !config.immediate_renew && txn_guard.is_some() {
310 continue;
312 }
313 }
314
315 return (
316 options,
317 if !config.immediate_renew {
318 Some(txn_guard)
319 } else {
320 None
321 },
322 );
323 }
324 }
325
326 pub fn set_next_commit_message(&self, message: &str) {
328 let mut binding = self.txn.lock().unwrap();
329 let Some(txn) = binding.as_mut() else {
330 return;
331 };
332
333 if message.is_empty() {
334 txn.set_msg(None)
335 } else {
336 txn.set_msg(Some(message.into()))
337 }
338 }
339
340 pub fn set_next_commit_origin(&self, origin: &str) {
342 let mut txn = self.txn.lock().unwrap();
343 if let Some(txn) = txn.as_mut() {
344 txn.set_origin(origin.into());
345 }
346 }
347
348 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
350 let mut txn = self.txn.lock().unwrap();
351 if let Some(txn) = txn.as_mut() {
352 txn.set_timestamp(timestamp);
353 }
354 }
355
356 pub fn set_next_commit_options(&self, options: CommitOptions) {
358 let mut txn = self.txn.lock().unwrap();
359 if let Some(txn) = txn.as_mut() {
360 txn.set_options(options);
361 }
362 }
363
364 pub fn clear_next_commit_options(&self) {
366 let mut txn = self.txn.lock().unwrap();
367 if let Some(txn) = txn.as_mut() {
368 txn.set_options(CommitOptions::new());
369 }
370 }
371
372 #[inline]
383 pub fn set_record_timestamp(&self, record: bool) {
384 self.config.set_record_timestamp(record);
385 }
386
387 #[inline]
392 pub fn set_change_merge_interval(&self, interval: i64) {
393 self.config.set_merge_interval(interval);
394 }
395
396 pub fn can_edit(&self) -> bool {
397 !self.is_detached() || self.config.detached_editing()
398 }
399
400 pub fn is_detached_editing_enabled(&self) -> bool {
401 self.config.detached_editing()
402 }
403
404 #[inline]
405 pub fn config_text_style(&self, text_style: StyleConfigMap) {
406 self.config.text_style_config.try_write().unwrap().map = text_style.map;
407 }
408
409 #[inline]
410 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
411 self.config
412 .text_style_config
413 .try_write()
414 .unwrap()
415 .default_style = text_style;
416 }
417 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
418 let doc = Self::new();
419 let (options, _guard) = doc.commit_then_stop();
420 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
421 if mode.is_snapshot() {
422 decode_snapshot(&doc, mode, body)?;
423 drop(_guard);
424 doc.renew_txn_if_auto_commit(options);
425 Ok(doc)
426 } else {
427 Err(LoroError::DecodeError(
428 "Invalid encode mode".to_string().into(),
429 ))
430 }
431 }
432
433 #[inline(always)]
435 pub fn can_reset_with_snapshot(&self) -> bool {
436 let oplog = self.oplog.lock().unwrap();
437 if oplog.batch_importing {
438 return false;
439 }
440
441 if self.is_detached() {
442 return false;
443 }
444
445 oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
446 }
447
448 #[inline(always)]
454 pub fn is_detached(&self) -> bool {
455 self.detached.load(Acquire)
456 }
457
458 pub(crate) fn set_detached(&self, detached: bool) {
459 self.detached.store(detached, Release);
460 }
461
462 #[inline(always)]
463 pub fn peer_id(&self) -> PeerID {
464 self.state
465 .lock()
466 .unwrap()
467 .peer
468 .load(std::sync::atomic::Ordering::Relaxed)
469 }
470
471 #[inline(always)]
472 pub fn detach(&self) {
473 let (options, txn) = self.commit_then_stop();
474 drop(txn);
475 self.set_detached(true);
476 self.renew_txn_if_auto_commit(options);
477 }
478
479 #[inline(always)]
480 pub fn attach(&self) {
481 self.checkout_to_latest()
482 }
483
484 pub fn state_timestamp(&self) -> Timestamp {
487 let f = &self.state.lock().unwrap().frontiers;
488 self.oplog.lock().unwrap().get_timestamp_of_version(f)
489 }
490
491 #[inline(always)]
492 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
493 &self.state
494 }
495
496 #[inline]
497 pub fn get_state_deep_value(&self) -> LoroValue {
498 self.state.lock().unwrap().get_deep_value()
499 }
500
501 #[inline(always)]
502 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
503 &self.oplog
504 }
505
506 pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
507 let (options, txn) = self.commit_then_stop();
508 let ans = self.oplog.lock().unwrap().export_from(vv);
509 drop(txn);
510 self.renew_txn_if_auto_commit(options);
511 ans
512 }
513
514 #[inline(always)]
515 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
516 let s = debug_span!("import", peer = self.peer_id());
517 let _e = s.enter();
518 self.import_with(bytes, Default::default())
519 }
520
521 #[inline]
522 pub fn import_with(
523 &self,
524 bytes: &[u8],
525 origin: InternalString,
526 ) -> Result<ImportStatus, LoroError> {
527 let (options, txn) = self.commit_then_stop();
528 assert!(txn.is_none());
529 let ans = self._import_with(bytes, origin);
530 drop(txn);
531 self.renew_txn_if_auto_commit(options);
532 ans
533 }
534
535 #[tracing::instrument(skip_all)]
536 fn _import_with(
537 &self,
538 bytes: &[u8],
539 origin: InternalString,
540 ) -> Result<ImportStatus, LoroError> {
541 ensure_cov::notify_cov("loro_internal::import");
542 let parsed = parse_header_and_body(bytes, true)?;
543 loro_common::info!("Importing with mode={:?}", &parsed.mode);
544 let result = match parsed.mode {
545 EncodeMode::OutdatedRle => {
546 if self.state.lock().unwrap().is_in_txn() {
547 return Err(LoroError::ImportWhenInTxn);
548 }
549
550 let s = tracing::span!(
551 tracing::Level::INFO,
552 "Import updates ",
553 peer = self.peer_id()
554 );
555 let _e = s.enter();
556 self.update_oplog_and_apply_delta_to_state_if_needed(
557 |oplog| oplog.decode(parsed),
558 origin,
559 )
560 }
561 EncodeMode::OutdatedSnapshot => {
562 if self.can_reset_with_snapshot() {
563 loro_common::info!("Init by snapshot {}", self.peer_id());
564 decode_snapshot(self, parsed.mode, parsed.body)
565 } else {
566 self.update_oplog_and_apply_delta_to_state_if_needed(
567 |oplog| oplog.decode(parsed),
568 origin,
569 )
570 }
571 }
572 EncodeMode::FastSnapshot => {
573 if self.can_reset_with_snapshot() {
574 ensure_cov::notify_cov("loro_internal::import::snapshot");
575 loro_common::info!("Init by fast snapshot {}", self.peer_id());
576 decode_snapshot(self, parsed.mode, parsed.body)
577 } else {
578 self.update_oplog_and_apply_delta_to_state_if_needed(
579 |oplog| oplog.decode(parsed),
580 origin,
581 )
582
583 }
588 }
589 EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
590 |oplog| oplog.decode(parsed),
591 origin,
592 ),
593 EncodeMode::Auto => {
594 unreachable!()
595 }
596 };
597
598 self.emit_events();
599
600 result
601 }
602
603 #[tracing::instrument(skip_all)]
604 pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
605 &self,
606 f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
607 origin: InternalString,
608 ) -> Result<ImportStatus, LoroError> {
609 let mut oplog = self.oplog.lock().unwrap();
610 if !self.is_detached() {
611 let old_vv = oplog.vv().clone();
612 let old_frontiers = oplog.frontiers().clone();
613 let result = f(&mut oplog);
614 if &old_vv != oplog.vv() {
615 let mut diff = DiffCalculator::new(false);
616 let (diff, diff_mode) = diff.calc_diff_internal(
617 &oplog,
618 &old_vv,
619 &old_frontiers,
620 oplog.vv(),
621 oplog.dag.get_frontiers(),
622 None,
623 );
624 let mut state = self.state.lock().unwrap();
625 state.apply_diff(
626 InternalDocDiff {
627 origin,
628 diff: (diff).into(),
629 by: EventTriggerKind::Import,
630 new_version: Cow::Owned(oplog.frontiers().clone()),
631 },
632 diff_mode,
633 );
634 }
635 result
636 } else {
637 f(&mut oplog)
638 }
639 }
640
641 fn emit_events(&self) {
642 let events = {
644 let mut state = self.state.lock().unwrap();
645 state.take_events()
646 };
647 for event in events {
648 self.observer.emit(event);
649 }
650 }
651
652 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
653 let mut state = self.state.lock().unwrap();
654 state.take_events()
655 }
656
657 #[instrument(skip_all)]
658 pub fn export_snapshot(&self) -> Result<Vec<u8>, LoroEncodeError> {
659 if self.is_shallow() {
660 return Err(LoroEncodeError::ShallowSnapshotIncompatibleWithOldFormat);
661 }
662 let (options, txn) = self.commit_then_stop();
663 drop(txn);
664 let ans = export_snapshot(self);
665 self.renew_txn_if_auto_commit(options);
666 Ok(ans)
667 }
668
669 #[tracing::instrument(skip_all)]
673 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
674 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
675 let (options, txn) = self.commit_then_stop();
676 let result = self.update_oplog_and_apply_delta_to_state_if_needed(
677 |oplog| crate::encoding::json_schema::import_json(oplog, json),
678 Default::default(),
679 );
680 self.emit_events();
681 drop(txn);
682 self.renew_txn_if_auto_commit(options);
683 result
684 }
685
686 pub fn export_json_updates(
687 &self,
688 start_vv: &VersionVector,
689 end_vv: &VersionVector,
690 with_peer_compression: bool,
691 ) -> JsonSchema {
692 let (options, txn) = self.commit_then_stop();
693 drop(txn);
694 let oplog = self.oplog.lock().unwrap();
695 let mut start_vv = start_vv;
696 let _temp: Option<VersionVector>;
697 if !oplog.dag.shallow_since_vv().is_empty() {
698 let mut include_all = true;
700 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
701 if start_vv.get(peer).unwrap_or(&0) < counter {
702 include_all = false;
703 break;
704 }
705 }
706 if !include_all {
707 let mut vv = start_vv.clone();
708 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
709 vv.extend_to_include_end_id(ID::new(peer, counter));
710 }
711 _temp = Some(vv);
712 start_vv = _temp.as_ref().unwrap();
713 }
714 }
715
716 let json = crate::encoding::json_schema::export_json(
717 &oplog,
718 start_vv,
719 end_vv,
720 with_peer_compression,
721 );
722 drop(oplog);
723 self.renew_txn_if_auto_commit(options);
724 json
725 }
726
727 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
728 let oplog = self.oplog.lock().unwrap();
729 let mut changes = export_json_in_id_span(&oplog, id_span);
730 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
731 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
732 changes.push(change_json);
733 }
734 changes
735 }
736
737 #[inline]
739 pub fn oplog_vv(&self) -> VersionVector {
740 self.oplog.lock().unwrap().vv().clone()
741 }
742
743 #[inline]
745 pub fn state_vv(&self) -> VersionVector {
746 let oplog = self.oplog.lock().unwrap();
747 let f = &self.state.lock().unwrap().frontiers;
748 oplog.dag.frontiers_to_vv(f).unwrap()
749 }
750
751 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
752 let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
753 if let LoroValue::Container(c) = value {
754 Some(ValueOrHandler::Handler(Handler::new_attached(
755 c.clone(),
756 self.clone(),
757 )))
758 } else {
759 Some(ValueOrHandler::Value(value))
760 }
761 }
762
763 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
765 let path = str_to_path(path)?;
766 self.get_by_path(&path)
767 }
768
769 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
770 let arena = &self.arena;
771 let txn = self.txn.lock().unwrap();
772 let txn = txn.as_ref()?;
773 let ops_ = txn.local_ops();
774 let new_id = ID {
775 peer: *txn.peer(),
776 counter: ops_.first()?.counter,
777 };
778 let change = ChangeRef {
779 id: &new_id,
780 deps: txn.frontiers(),
781 timestamp: &txn
782 .timestamp()
783 .as_ref()
784 .copied()
785 .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
786 commit_msg: txn.msg(),
787 ops: ops_,
788 lamport: txn.lamport(),
789 };
790 let json = encode_change_to_json(change, arena);
791 Some(json)
792 }
793
794 #[inline]
795 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
796 if self.has_container(&id) {
797 Some(Handler::new_attached(id, self.clone()))
798 } else {
799 None
800 }
801 }
802
803 #[inline]
806 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
807 let id = id.into_container_id(&self.arena, ContainerType::Text);
808 assert!(self.has_container(&id));
809 Handler::new_attached(id, self.clone()).into_text().unwrap()
810 }
811
812 #[inline]
815 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
816 let id = id.into_container_id(&self.arena, ContainerType::List);
817 assert!(self.has_container(&id));
818 Handler::new_attached(id, self.clone()).into_list().unwrap()
819 }
820
821 #[inline]
824 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
825 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
826 assert!(self.has_container(&id));
827 Handler::new_attached(id, self.clone())
828 .into_movable_list()
829 .unwrap()
830 }
831
832 #[inline]
835 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
836 let id = id.into_container_id(&self.arena, ContainerType::Map);
837 assert!(self.has_container(&id));
838 Handler::new_attached(id, self.clone()).into_map().unwrap()
839 }
840
841 #[inline]
844 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
845 let id = id.into_container_id(&self.arena, ContainerType::Tree);
846 assert!(self.has_container(&id));
847 Handler::new_attached(id, self.clone()).into_tree().unwrap()
848 }
849
850 #[cfg(feature = "counter")]
851 pub fn get_counter<I: IntoContainerId>(
852 &self,
853 id: I,
854 ) -> crate::handler::counter::CounterHandler {
855 let id = id.into_container_id(&self.arena, ContainerType::Counter);
856 assert!(self.has_container(&id));
857 Handler::new_attached(id, self.clone())
858 .into_counter()
859 .unwrap()
860 }
861
862 #[must_use]
863 pub fn has_container(&self, id: &ContainerID) -> bool {
864 if id.is_root() {
865 return true;
866 }
867
868 let exist = self.state.lock().unwrap().does_container_exist(id);
869 exist
870 }
871
872 #[instrument(level = "info", skip_all)]
886 pub fn undo_internal(
887 &self,
888 id_span: IdSpan,
889 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
890 post_transform_base: Option<&DiffBatch>,
891 before_diff: &mut dyn FnMut(&DiffBatch),
892 ) -> LoroResult<CommitWhenDrop> {
893 if !self.can_edit() {
894 return Err(LoroError::EditWhenDetached);
895 }
896
897 let (options, txn) = self.commit_then_stop();
898 if !self
899 .oplog()
900 .lock()
901 .unwrap()
902 .vv()
903 .includes_id(id_span.id_last())
904 {
905 self.renew_txn_if_auto_commit(options);
906 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
907 }
908
909 let (was_recording, latest_frontiers) = {
910 let mut state = self.state.lock().unwrap();
911 let was_recording = state.is_recording();
912 state.stop_and_clear_recording();
913 (was_recording, state.frontiers.clone())
914 };
915
916 let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
917 let diff = crate::undo::undo(
918 spans,
919 match post_transform_base {
920 Some(d) => Either::Right(d),
921 None => Either::Left(&latest_frontiers),
922 },
923 |from, to| {
924 self._checkout_without_emitting(from, false, false).unwrap();
925 self.state.lock().unwrap().start_recording();
926 self._checkout_without_emitting(to, false, false).unwrap();
927 let mut state = self.state.lock().unwrap();
928 let e = state.take_events();
929 state.stop_and_clear_recording();
930 DiffBatch::new(e)
931 },
932 before_diff,
933 );
934
935 self._checkout_without_emitting(&latest_frontiers, false, false)?;
939 self.set_detached(false);
940 if was_recording {
941 self.state.lock().unwrap().start_recording();
942 }
943 drop(txn);
944 self.start_auto_commit();
945 if let Err(e) = self._apply_diff(diff, container_remap, true) {
949 warn!("Undo Failed {:?}", e);
950 }
951
952 if let Some(options) = options {
953 self.set_next_commit_options(options);
954 }
955 Ok(CommitWhenDrop {
956 doc: self,
957 default_options: CommitOptions::new().origin("undo"),
958 })
959 }
960
961 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
967 let f = self.state_frontiers();
970 let diff = self.diff(&f, target)?;
971 self._apply_diff(diff, &mut Default::default(), false)
972 }
973
974 pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
979 {
980 let oplog = self.oplog.lock().unwrap();
982 for id in a.iter() {
983 if !oplog.dag.contains(id) {
984 return Err(LoroError::FrontiersNotFound(id));
985 }
986 }
987 for id in b.iter() {
988 if !oplog.dag.contains(id) {
989 return Err(LoroError::FrontiersNotFound(id));
990 }
991 }
992 }
993
994 let (options, txn) = self.commit_then_stop();
995 let was_detached = self.is_detached();
996 let old_frontiers = self.state_frontiers();
997 let was_recording = {
998 let mut state = self.state.lock().unwrap();
999 let is_recording = state.is_recording();
1000 state.stop_and_clear_recording();
1001 is_recording
1002 };
1003 self._checkout_without_emitting(a, true, false).unwrap();
1004 self.state.lock().unwrap().start_recording();
1005 self._checkout_without_emitting(b, true, false).unwrap();
1006 let e = {
1007 let mut state = self.state.lock().unwrap();
1008 let e = state.take_events();
1009 state.stop_and_clear_recording();
1010 e
1011 };
1012 self._checkout_without_emitting(&old_frontiers, false, false)
1013 .unwrap();
1014 drop(txn);
1015 if !was_detached {
1016 self.set_detached(false);
1017 self.renew_txn_if_auto_commit(options);
1018 }
1019 if was_recording {
1020 self.state.lock().unwrap().start_recording();
1021 }
1022 Ok(DiffBatch::new(e))
1023 }
1024
1025 #[inline(always)]
1027 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1028 self._apply_diff(diff, &mut Default::default(), true)
1029 }
1030
1031 pub(crate) fn _apply_diff(
1043 &self,
1044 diff: DiffBatch,
1045 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1046 skip_unreachable: bool,
1047 ) -> LoroResult<()> {
1048 if !self.can_edit() {
1049 return Err(LoroError::EditWhenDetached);
1050 }
1051
1052 let mut ans: LoroResult<()> = Ok(());
1053 let mut missing_containers: Vec<ContainerID> = Vec::new();
1054 for (mut id, diff) in diff.into_iter() {
1055 let mut remapped = false;
1056 while let Some(rid) = container_remap.get(&id) {
1057 remapped = true;
1058 id = rid.clone();
1059 }
1060
1061 if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1062 missing_containers.push(id);
1063 continue;
1064 }
1065
1066 if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
1067 continue;
1068 }
1069
1070 let Some(h) = self.get_handler(id.clone()) else {
1071 return Err(LoroError::ContainersNotFound {
1072 containers: Box::new(vec![id]),
1073 });
1074 };
1075 if let Err(e) = h.apply_diff(diff, container_remap) {
1076 ans = Err(e);
1077 }
1078 }
1079
1080 if !missing_containers.is_empty() {
1081 return Err(LoroError::ContainersNotFound {
1082 containers: Box::new(missing_containers),
1083 });
1084 }
1085
1086 ans
1087 }
1088
1089 #[inline]
1091 pub fn diagnose_size(&self) {
1092 self.oplog().lock().unwrap().diagnose_size();
1093 }
1094
1095 #[inline]
1096 pub fn oplog_frontiers(&self) -> Frontiers {
1097 self.oplog().lock().unwrap().frontiers().clone()
1098 }
1099
1100 #[inline]
1101 pub fn state_frontiers(&self) -> Frontiers {
1102 self.state.lock().unwrap().frontiers.clone()
1103 }
1104
1105 #[inline]
1109 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1110 self.oplog().lock().unwrap().cmp_with_frontiers(other)
1111 }
1112
1113 #[inline]
1117 pub fn cmp_frontiers(
1118 &self,
1119 a: &Frontiers,
1120 b: &Frontiers,
1121 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1122 self.oplog().lock().unwrap().cmp_frontiers(a, b)
1123 }
1124
1125 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1126 let mut state = self.state.lock().unwrap();
1127 if !state.is_recording() {
1128 state.start_recording();
1129 }
1130
1131 self.observer.subscribe_root(callback)
1132 }
1133
1134 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1135 let mut state = self.state.lock().unwrap();
1136 if !state.is_recording() {
1137 state.start_recording();
1138 }
1139
1140 self.observer.subscribe(container_id, callback)
1141 }
1142
1143 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1144 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1145 activate();
1146 sub
1147 }
1148
1149 #[tracing::instrument(skip_all)]
1151 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1152 if bytes.is_empty() {
1153 return Ok(ImportStatus::default());
1154 }
1155
1156 if bytes.len() == 1 {
1157 return self.import(&bytes[0]);
1158 }
1159
1160 let mut success = VersionRange::default();
1161 let mut pending = VersionRange::default();
1162 let mut meta_arr = bytes
1163 .iter()
1164 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1165 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1166 meta_arr.sort_by(|a, b| {
1167 a.0.mode
1168 .cmp(&b.0.mode)
1169 .then(b.0.change_num.cmp(&a.0.change_num))
1170 });
1171
1172 let (options, txn) = self.commit_then_stop();
1173 drop(txn);
1174 let is_detached = self.is_detached();
1175 self.detach();
1176 self.oplog.lock().unwrap().batch_importing = true;
1177 let mut err = None;
1178 for (_meta, data) in meta_arr {
1179 match self.import(data) {
1180 Ok(s) => {
1181 for (peer, (start, end)) in s.success.iter() {
1182 match success.0.entry(*peer) {
1183 Entry::Occupied(mut e) => {
1184 e.get_mut().1 = *end.max(&e.get().1);
1185 }
1186 Entry::Vacant(e) => {
1187 e.insert((*start, *end));
1188 }
1189 }
1190 }
1191
1192 if let Some(p) = s.pending.as_ref() {
1193 for (&peer, &(start, end)) in p.iter() {
1194 match pending.0.entry(peer) {
1195 Entry::Occupied(mut e) => {
1196 e.get_mut().0 = start.min(e.get().0);
1197 e.get_mut().1 = end.min(e.get().1);
1198 }
1199 Entry::Vacant(e) => {
1200 e.insert((start, end));
1201 }
1202 }
1203 }
1204 }
1205 }
1206 Err(e) => {
1207 err = Some(e);
1208 }
1209 }
1210 }
1211
1212 let mut oplog = self.oplog.lock().unwrap();
1213 oplog.batch_importing = false;
1214 drop(oplog);
1215
1216 if !is_detached {
1217 self.checkout_to_latest();
1218 }
1219
1220 self.renew_txn_if_auto_commit(options);
1221 if let Some(err) = err {
1222 return Err(err);
1223 }
1224
1225 Ok(ImportStatus {
1226 success,
1227 pending: if pending.is_empty() {
1228 None
1229 } else {
1230 Some(pending)
1231 },
1232 })
1233 }
1234
1235 #[inline]
1237 pub fn get_value(&self) -> LoroValue {
1238 self.state.lock().unwrap().get_value()
1239 }
1240
1241 #[inline]
1243 pub fn get_deep_value(&self) -> LoroValue {
1244 self.state.lock().unwrap().get_deep_value()
1245 }
1246
1247 #[inline]
1249 pub fn get_deep_value_with_id(&self) -> LoroValue {
1250 self.state.lock().unwrap().get_deep_value_with_id()
1251 }
1252
1253 pub fn checkout_to_latest(&self) {
1254 let (options, _guard) = self.commit_then_stop();
1255 if !self.is_detached() {
1256 drop(_guard);
1257 self.renew_txn_if_auto_commit(options);
1258 return;
1259 }
1260
1261 self._checkout_to_latest_without_commit(true);
1262 drop(_guard);
1263 self.renew_txn_if_auto_commit(options);
1264 }
1265
1266 pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1268 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1269 let f = self.oplog_frontiers();
1270 let this = &self;
1271 let frontiers = &f;
1272 this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1273 .unwrap(); this.emit_events();
1276 if this.config.detached_editing() {
1277 this.renew_peer_id();
1278 }
1279
1280 self.set_detached(false);
1281 });
1282 }
1283
1284 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1289 let (options, guard) = self.commit_then_stop();
1290 self._checkout_without_emitting(frontiers, true, true)?;
1291 self.emit_events();
1292 drop(guard);
1293 if self.config.detached_editing() {
1294 self.renew_peer_id();
1295 self.renew_txn_if_auto_commit(options);
1296 } else if !self.is_detached() {
1297 self.renew_txn_if_auto_commit(options);
1298 }
1299
1300 Ok(())
1301 }
1302
1303 #[instrument(level = "info", skip(self))]
1305 pub(crate) fn _checkout_without_emitting(
1306 &self,
1307 frontiers: &Frontiers,
1308 to_shrink_frontiers: bool,
1309 to_commit_then_renew: bool,
1310 ) -> Result<(), LoroError> {
1311 assert!(self.txn.is_locked());
1312 let from_frontiers = self.state_frontiers();
1313 loro_common::info!(
1314 "checkout from={:?} to={:?} cur_vv={:?}",
1315 from_frontiers,
1316 frontiers,
1317 self.oplog_vv()
1318 );
1319
1320 if &from_frontiers == frontiers {
1321 return Ok(());
1322 }
1323
1324 let oplog = self.oplog.lock().unwrap();
1325 if oplog.dag.is_before_shallow_root(frontiers) {
1326 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1327 }
1328
1329 let frontiers = if to_shrink_frontiers {
1330 shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1331 } else {
1332 frontiers.clone()
1333 };
1334
1335 if from_frontiers == frontiers {
1336 return Ok(());
1337 }
1338
1339 let mut state = self.state.lock().unwrap();
1340 let mut calc = self.diff_calculator.lock().unwrap();
1341 for i in frontiers.iter() {
1342 if !oplog.dag.contains(i) {
1343 return Err(LoroError::FrontiersNotFound(i));
1344 }
1345 }
1346
1347 let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1348 let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1349 return Err(LoroError::NotFoundError(
1350 format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1351 ));
1352 };
1353
1354 self.set_detached(true);
1355 let (diff, diff_mode) =
1356 calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1357 state.apply_diff(
1358 InternalDocDiff {
1359 origin: "checkout".into(),
1360 diff: Cow::Owned(diff),
1361 by: EventTriggerKind::Checkout,
1362 new_version: Cow::Owned(frontiers.clone()),
1363 },
1364 diff_mode,
1365 );
1366
1367 Ok(())
1368 }
1369
1370 #[inline]
1371 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1372 self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
1373 }
1374
1375 #[inline]
1376 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1377 self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
1378 }
1379
1380 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1384 self.import(&other.export_from(&self.oplog_vv()))
1385 }
1386
1387 pub(crate) fn arena(&self) -> &SharedArena {
1388 &self.arena
1389 }
1390
1391 #[inline]
1392 pub fn len_ops(&self) -> usize {
1393 let oplog = self.oplog.lock().unwrap();
1394 let ans = oplog.vv().iter().map(|(_, ops)| *ops).sum::<i32>() as usize;
1395 if oplog.is_shallow() {
1396 let sub = oplog
1397 .shallow_since_vv()
1398 .iter()
1399 .map(|(_, ops)| *ops)
1400 .sum::<i32>() as usize;
1401 ans - sub
1402 } else {
1403 ans
1404 }
1405 }
1406
1407 #[inline]
1408 pub fn len_changes(&self) -> usize {
1409 let oplog = self.oplog.lock().unwrap();
1410 oplog.len_changes()
1411 }
1412
1413 pub fn config(&self) -> &Configure {
1414 &self.config
1415 }
1416
1417 pub fn check_state_diff_calc_consistency_slow(&self) {
1422 {
1424 static IS_CHECKING: std::sync::atomic::AtomicBool =
1425 std::sync::atomic::AtomicBool::new(false);
1426 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1427 return;
1428 }
1429
1430 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1431 let peer_id = self.peer_id();
1432 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1433 let _g = s.enter();
1434 let options = self.commit_then_stop().0;
1435 self.oplog.lock().unwrap().check_dag_correctness();
1436 if self.is_shallow() {
1437 let initial_snapshot = self
1448 .export(ExportMode::state_only(Some(
1449 &self.shallow_since_frontiers(),
1450 )))
1451 .unwrap();
1452
1453 let doc = LoroDoc::new();
1455 doc.import(&initial_snapshot).unwrap();
1456 self.checkout(&self.shallow_since_frontiers()).unwrap();
1457 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1458
1459 let updates = self.export(ExportMode::all_updates()).unwrap();
1461
1462 doc.import(&updates).unwrap();
1464 self.checkout_to_latest();
1465
1466 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1469 let mut calculated_state = doc.app_state().lock().unwrap();
1470 let mut current_state = self.app_state().lock().unwrap();
1471 current_state.check_is_the_same(&mut calculated_state);
1472 } else {
1473 let f = self.state_frontiers();
1474 let vv = self
1475 .oplog()
1476 .lock()
1477 .unwrap()
1478 .dag
1479 .frontiers_to_vv(&f)
1480 .unwrap();
1481 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1482 let doc = Self::new();
1483 doc.import(&bytes).unwrap();
1484 let mut calculated_state = doc.app_state().lock().unwrap();
1485 let mut current_state = self.app_state().lock().unwrap();
1486 current_state.check_is_the_same(&mut calculated_state);
1487 }
1488
1489 self.renew_txn_if_auto_commit(options);
1490 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1491 }
1492 }
1493
1494 #[inline]
1495 pub fn log_estimated_size(&self) {
1496 let state = self.state.lock().unwrap();
1497 state.log_estimated_size();
1498 }
1499
1500 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1501 self.query_pos_internal(pos, true)
1502 }
1503
1504 pub(crate) fn query_pos_internal(
1506 &self,
1507 pos: &Cursor,
1508 ret_event_index: bool,
1509 ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1510 let mut state = self.state.lock().unwrap();
1511 if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1512 Ok(PosQueryResult {
1513 update: None,
1514 current: AbsolutePosition {
1515 pos: ans,
1516 side: pos.side,
1517 },
1518 })
1519 } else {
1520 drop(state);
1532 self.commit_then_renew();
1533 let oplog = self.oplog().lock().unwrap();
1534 if let Some(id) = pos.id {
1536 let idx = oplog
1537 .arena
1538 .id_to_idx(&pos.container)
1539 .ok_or(CannotFindRelativePosition::ContainerDeleted)?;
1540 let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1542 if oplog.shallow_since_vv().includes_id(id) {
1543 return Err(CannotFindRelativePosition::HistoryCleared);
1544 }
1545
1546 tracing::error!("Cannot find id {}", id);
1547 return Err(CannotFindRelativePosition::IdNotFound);
1548 };
1549 let mut diff_calc = DiffCalculator::new(true);
1551 let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1552 let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1553 diff_calc.calc_diff_internal(
1555 &oplog,
1556 before,
1557 &before_frontiers,
1558 oplog.vv(),
1559 oplog.frontiers(),
1560 Some(&|target| idx == target),
1561 );
1562 let depth = self.arena.get_depth(idx);
1564 let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1565 match diff_calc {
1566 crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1567 let c = text.get_id_latest_pos(id).unwrap();
1568 let new_pos = c.pos;
1569 let handler = self.get_text(&pos.container);
1570 let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1571 Ok(PosQueryResult {
1572 update: handler.get_cursor(current_pos, c.side),
1573 current: AbsolutePosition {
1574 pos: current_pos,
1575 side: c.side,
1576 },
1577 })
1578 }
1579 crate::diff_calc::ContainerDiffCalculator::List(list) => {
1580 let c = list.get_id_latest_pos(id).unwrap();
1581 let new_pos = c.pos;
1582 let handler = self.get_list(&pos.container);
1583 Ok(PosQueryResult {
1584 update: handler.get_cursor(new_pos, c.side),
1585 current: AbsolutePosition {
1586 pos: new_pos,
1587 side: c.side,
1588 },
1589 })
1590 }
1591 crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1592 let c = list.get_id_latest_pos(id).unwrap();
1593 let new_pos = c.pos;
1594 let handler = self.get_movable_list(&pos.container);
1595 let new_pos = handler.op_pos_to_user_pos(new_pos);
1596 Ok(PosQueryResult {
1597 update: handler.get_cursor(new_pos, c.side),
1598 current: AbsolutePosition {
1599 pos: new_pos,
1600 side: c.side,
1601 },
1602 })
1603 }
1604 crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1605 crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1606 #[cfg(feature = "counter")]
1607 crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1608 crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1609 }
1610 } else {
1611 match pos.container.container_type() {
1612 ContainerType::Text => {
1613 let text = self.get_text(&pos.container);
1614 Ok(PosQueryResult {
1615 update: Some(Cursor {
1616 id: None,
1617 container: text.id(),
1618 side: pos.side,
1619 origin_pos: text.len_unicode(),
1620 }),
1621 current: AbsolutePosition {
1622 pos: text.len_event(),
1623 side: pos.side,
1624 },
1625 })
1626 }
1627 ContainerType::List => {
1628 let list = self.get_list(&pos.container);
1629 Ok(PosQueryResult {
1630 update: Some(Cursor {
1631 id: None,
1632 container: list.id(),
1633 side: pos.side,
1634 origin_pos: list.len(),
1635 }),
1636 current: AbsolutePosition {
1637 pos: list.len(),
1638 side: pos.side,
1639 },
1640 })
1641 }
1642 ContainerType::MovableList => {
1643 let list = self.get_movable_list(&pos.container);
1644 Ok(PosQueryResult {
1645 update: Some(Cursor {
1646 id: None,
1647 container: list.id(),
1648 side: pos.side,
1649 origin_pos: list.len(),
1650 }),
1651 current: AbsolutePosition {
1652 pos: list.len(),
1653 side: pos.side,
1654 },
1655 })
1656 }
1657 ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1658 unreachable!()
1659 }
1660 #[cfg(feature = "counter")]
1661 ContainerType::Counter => unreachable!(),
1662 }
1663 }
1664 }
1665 }
1666
1667 pub fn free_history_cache(&self) {
1672 self.oplog.lock().unwrap().free_history_cache();
1673 }
1674
1675 pub fn free_diff_calculator(&self) {
1677 *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
1678 }
1679
1680 pub fn has_history_cache(&self) -> bool {
1683 self.oplog.lock().unwrap().has_history_cache()
1684 }
1685
1686 #[inline]
1690 pub fn compact_change_store(&self) {
1691 self.commit_then_renew();
1692 self.oplog.lock().unwrap().compact_change_store();
1693 }
1694
1695 #[inline]
1699 pub fn analyze(&self) -> DocAnalysis {
1700 DocAnalysis::analyze(self)
1701 }
1702
1703 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1705 let mut state = self.state.lock().unwrap();
1706 let idx = state.arena.id_to_idx(id)?;
1707 state.get_path(idx)
1708 }
1709
1710 #[instrument(skip(self))]
1711 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1712 let (options, txn) = self.commit_then_stop();
1713 let ans = match mode {
1714 ExportMode::Snapshot => export_fast_snapshot(self),
1715 ExportMode::Updates { from } => export_fast_updates(self, &from),
1716 ExportMode::UpdatesInRange { spans } => {
1717 export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1718 }
1719 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1720 ExportMode::StateOnly(f) => match f {
1721 Some(f) => export_state_only_snapshot(self, &f)?,
1722 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1723 },
1724 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1725 };
1726
1727 drop(txn);
1728 self.renew_txn_if_auto_commit(options);
1729 Ok(ans)
1730 }
1731
1732 pub fn shallow_since_vv(&self) -> ImVersionVector {
1738 self.oplog().lock().unwrap().shallow_since_vv().clone()
1739 }
1740
1741 pub fn shallow_since_frontiers(&self) -> Frontiers {
1742 self.oplog()
1743 .lock()
1744 .unwrap()
1745 .shallow_since_frontiers()
1746 .clone()
1747 }
1748
1749 pub fn is_shallow(&self) -> bool {
1751 !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
1752 }
1753
1754 pub fn get_pending_txn_len(&self) -> usize {
1759 if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1760 txn.len()
1761 } else {
1762 0
1763 }
1764 }
1765
1766 #[inline]
1767 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1768 self.oplog().lock().unwrap().dag.find_path(from, to)
1769 }
1770
1771 pub fn subscribe_first_commit_from_peer(
1777 &self,
1778 callback: FirstCommitFromPeerCallback,
1779 ) -> Subscription {
1780 let (s, enable) = self
1781 .first_commit_from_peer_subs
1782 .inner()
1783 .insert((), callback);
1784 enable();
1785 s
1786 }
1787
1788 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1793 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1794 enable();
1795 s
1796 }
1797}
1798
1799#[derive(Debug, thiserror::Error)]
1800pub enum ChangeTravelError {
1801 #[error("Target id not found {0:?}")]
1802 TargetIdNotFound(ID),
1803 #[error("The shallow history of the doc doesn't include the target version")]
1804 TargetVersionNotIncluded,
1805}
1806
1807impl LoroDoc {
1808 pub fn travel_change_ancestors(
1809 &self,
1810 ids: &[ID],
1811 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1812 ) -> Result<(), ChangeTravelError> {
1813 self.commit_then_renew();
1814 struct PendingNode(ChangeMeta);
1815 impl PartialEq for PendingNode {
1816 fn eq(&self, other: &Self) -> bool {
1817 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1818 }
1819 }
1820
1821 impl Eq for PendingNode {}
1822 impl PartialOrd for PendingNode {
1823 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1824 Some(self.cmp(other))
1825 }
1826 }
1827
1828 impl Ord for PendingNode {
1829 fn cmp(&self, other: &Self) -> Ordering {
1830 self.0
1831 .lamport_last()
1832 .cmp(&other.0.lamport_last())
1833 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1834 }
1835 }
1836
1837 for id in ids {
1838 let op_log = &self.oplog().lock().unwrap();
1839 if !op_log.vv().includes_id(*id) {
1840 return Err(ChangeTravelError::TargetIdNotFound(*id));
1841 }
1842 if op_log.dag.shallow_since_vv().includes_id(*id) {
1843 return Err(ChangeTravelError::TargetVersionNotIncluded);
1844 }
1845 }
1846
1847 let mut visited = FxHashSet::default();
1848 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1849 for id in ids {
1850 pending.push(PendingNode(ChangeMeta::from_change(
1851 &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1852 )));
1853 }
1854 while let Some(PendingNode(node)) = pending.pop() {
1855 let deps = node.deps.clone();
1856 if f(node).is_break() {
1857 break;
1858 }
1859
1860 for dep in deps.iter() {
1861 let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1862 continue;
1863 };
1864 if visited.contains(&dep_node.id) {
1865 continue;
1866 }
1867
1868 visited.insert(dep_node.id);
1869 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1870 }
1871 }
1872
1873 Ok(())
1874 }
1875
1876 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1877 self.commit_then_renew();
1878 let mut set = FxHashSet::default();
1879 let oplog = &self.oplog().lock().unwrap();
1880 for op in oplog.iter_ops(id.to_span(len)) {
1881 let id = oplog.arena.get_container_id(op.container()).unwrap();
1882 set.insert(id);
1883 }
1884
1885 set
1886 }
1887
1888 pub fn delete_root_container(&self, cid: ContainerID) {
1889 if !cid.is_root() {
1890 return;
1891 }
1892
1893 if self.arena.id_to_idx(&cid).is_none() {
1894 return;
1896 }
1897
1898 let Some(h) = self.get_handler(cid.clone()) else {
1899 return;
1900 };
1901
1902 if let Err(e) = h.clear() {
1903 eprintln!("Failed to clear handler: {:?}", e);
1904 return;
1905 }
1906 self.config
1907 .deleted_root_containers
1908 .lock()
1909 .unwrap()
1910 .insert(cid);
1911 }
1912
1913 pub fn set_hide_empty_root_containers(&self, hide: bool) {
1914 self.config
1915 .hide_empty_root_containers
1916 .store(hide, std::sync::atomic::Ordering::Relaxed);
1917 }
1918}
1919
1920fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
1922 let start_vv = oplog
1923 .dag
1924 .frontiers_to_vv(&id.into())
1925 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
1926 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
1927 for op in change.ops.iter().rev() {
1928 if op.container != idx {
1929 continue;
1930 }
1931 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
1932 if d.id_start.to_span(d.atom_len()).contains(id) {
1933 return Some(ID::new(change.peer(), op.counter));
1934 }
1935 }
1936 }
1937 }
1938
1939 None
1940}
1941
1942#[derive(Debug)]
1943pub struct CommitWhenDrop<'a> {
1944 doc: &'a LoroDoc,
1945 default_options: CommitOptions,
1946}
1947
1948impl Drop for CommitWhenDrop<'_> {
1949 fn drop(&mut self) {
1950 {
1951 let mut guard = self.doc.txn.lock().unwrap();
1952 if let Some(txn) = guard.as_mut() {
1953 txn.set_default_options(std::mem::take(&mut self.default_options));
1954 };
1955 }
1956
1957 self.doc.commit_then_renew();
1958 }
1959}
1960
1961#[derive(Debug, Clone)]
1963pub struct CommitOptions {
1964 pub origin: Option<InternalString>,
1967
1968 pub immediate_renew: bool,
1971
1972 pub timestamp: Option<Timestamp>,
1975
1976 pub commit_msg: Option<Arc<str>>,
1978}
1979
1980impl CommitOptions {
1981 pub fn new() -> Self {
1983 Self {
1984 origin: None,
1985 immediate_renew: true,
1986 timestamp: None,
1987 commit_msg: None,
1988 }
1989 }
1990
1991 pub fn origin(mut self, origin: &str) -> Self {
1993 self.origin = Some(origin.into());
1994 self
1995 }
1996
1997 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
1999 self.immediate_renew = immediate_renew;
2000 self
2001 }
2002
2003 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2007 self.timestamp = Some(timestamp);
2008 self
2009 }
2010
2011 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2013 self.commit_msg = Some(commit_msg.into());
2014 self
2015 }
2016
2017 pub fn set_origin(&mut self, origin: Option<&str>) {
2019 self.origin = origin.map(|x| x.into())
2020 }
2021
2022 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2024 self.timestamp = timestamp;
2025 }
2026}
2027
2028impl Default for CommitOptions {
2029 fn default() -> Self {
2030 Self::new()
2031 }
2032}
2033
2034#[cfg(test)]
2035mod test {
2036 use loro_common::ID;
2037
2038 use crate::{version::Frontiers, LoroDoc, ToJson};
2039
2040 #[test]
2041 fn test_sync() {
2042 fn is_send_sync<T: Send + Sync>(_v: T) {}
2043 let loro = super::LoroDoc::new();
2044 is_send_sync(loro)
2045 }
2046
2047 #[test]
2048 fn test_checkout() {
2049 let loro = LoroDoc::new();
2050 loro.set_peer_id(1).unwrap();
2051 let text = loro.get_text("text");
2052 let map = loro.get_map("map");
2053 let list = loro.get_list("list");
2054 let mut txn = loro.txn().unwrap();
2055 for i in 0..10 {
2056 map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2057 text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
2058 list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2059 }
2060 txn.commit().unwrap();
2061 let b = LoroDoc::new();
2062 b.import(&loro.export_snapshot().unwrap()).unwrap();
2063 loro.checkout(&Frontiers::default()).unwrap();
2064 {
2065 let json = &loro.get_deep_value();
2066 assert_eq!(json.to_json(), r#"{"text":"","list":[],"map":{}}"#);
2067 }
2068
2069 b.checkout(&ID::new(1, 2).into()).unwrap();
2070 {
2071 let json = &b.get_deep_value();
2072 assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":0}}"#);
2073 }
2074
2075 loro.checkout(&ID::new(1, 3).into()).unwrap();
2076 {
2077 let json = &loro.get_deep_value();
2078 assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":1}}"#);
2079 }
2080
2081 b.checkout(&ID::new(1, 29).into()).unwrap();
2082 {
2083 let json = &b.get_deep_value();
2084 assert_eq!(
2085 json.to_json(),
2086 r#"{"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}"#
2087 );
2088 }
2089 }
2090
2091 #[test]
2092 fn import_batch_err_181() {
2093 let a = LoroDoc::new_auto_commit();
2094 let update_a = a.export_snapshot();
2095 let b = LoroDoc::new_auto_commit();
2096 b.import_batch(&[update_a.unwrap()]).unwrap();
2097 b.get_text("text").insert(0, "hello").unwrap();
2098 b.commit_then_renew();
2099 let oplog = b.oplog().lock().unwrap();
2100 drop(oplog);
2101 b.export_from(&Default::default());
2102 }
2103}