1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::{AtomicBool, AtomicUsize};
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::{Change, Timestamp},
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47 ContainerID, ContainerType, HasCounterSpan, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError,
48 LoroResult, LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 #[inline]
89 pub fn with_barrier<F, R>(&self, f: F) -> R
90 where
91 F: FnOnce() -> R,
92 {
93 let (options, guard) = self.implicit_commit_then_stop();
94 let result = f();
95 drop(guard);
96 self.renew_txn_if_auto_commit(options);
97 result
98 }
99
100 pub fn new() -> Self {
101 let visible_op_count = Arc::new(AtomicUsize::new(0));
102 let oplog = OpLog::new(visible_op_count.clone());
103 let arena = oplog.arena.clone();
104 let config: Configure = oplog.configure.clone();
105 let lock_group = LoroLockGroup::new();
106 let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
107 let inner = Arc::new_cyclic(|w| {
108 let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
109 LoroDocInner {
110 oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
111 state,
112 config,
113 visible_op_count,
114 detached: AtomicBool::new(false),
115 auto_commit: AtomicBool::new(false),
116 observer: Arc::new(Observer::new(arena.clone())),
117 diff_calculator: Arc::new(
118 lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
119 ),
120 txn: global_txn,
121 arena,
122 local_update_subs: SubscriberSetWithQueue::new(),
123 peer_id_change_subs: SubscriberSetWithQueue::new(),
124 pre_commit_subs: SubscriberSetWithQueue::new(),
125 first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
126 }
127 });
128 LoroDoc { inner }
129 }
130
131 pub fn fork(&self) -> Self {
132 if self.is_detached() {
133 return self
134 .fork_at(&self.state_frontiers())
135 .expect("fork_at on detached doc should not fail");
136 }
137
138 let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
139 let doc = Self::new();
140 doc.with_barrier(|| {
141 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
142 })
143 .unwrap();
144 doc.set_config(&self.config);
145 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
146 doc.start_auto_commit();
147 }
148 doc
149 }
150 pub fn set_detached_editing(&self, enable: bool) {
166 self.config.set_detached_editing(enable);
167 if enable && self.is_detached() {
168 self.with_barrier(|| {
169 self.renew_peer_id();
170 });
171 }
172 }
173
174 #[inline]
176 pub fn new_auto_commit() -> Self {
177 let doc = Self::new();
178 doc.start_auto_commit();
179 doc
180 }
181
182 #[inline(always)]
183 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
184 if peer == PeerID::MAX {
185 return Err(LoroError::InvalidPeerID);
186 }
187 let next_id = self.oplog.lock().next_id(peer);
188 if self.auto_commit.load(Acquire) {
189 let doc_state = self.state.lock();
190 doc_state
191 .peer
192 .store(peer, std::sync::atomic::Ordering::Relaxed);
193
194 if doc_state.is_in_txn() {
195 drop(doc_state);
196 self.with_barrier(|| {});
198 }
199 self.peer_id_change_subs.emit(&(), next_id);
200 return Ok(());
201 }
202
203 let doc_state = self.state.lock();
204 if doc_state.is_in_txn() {
205 return Err(LoroError::TransactionError(
206 "Cannot change peer id during transaction"
207 .to_string()
208 .into_boxed_str(),
209 ));
210 }
211
212 doc_state
213 .peer
214 .store(peer, std::sync::atomic::Ordering::Relaxed);
215 drop(doc_state);
216 self.peer_id_change_subs.emit(&(), next_id);
217 Ok(())
218 }
219
220 pub(crate) fn renew_peer_id(&self) {
222 let mut peer_id = DefaultRandom.next_u64();
223 while peer_id == PeerID::MAX {
224 peer_id = DefaultRandom.next_u64();
225 }
226 self.set_peer_id(peer_id).unwrap();
227 }
228
229 #[inline]
241 #[must_use]
242 pub fn implicit_commit_then_stop(
243 &self,
244 ) -> (
245 Option<CommitOptions>,
246 LoroMutexGuard<'_, Option<Transaction>>,
247 ) {
248 let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
250 (a, b.unwrap())
251 }
252
253 #[inline]
258 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
259 self.commit_internal(CommitOptions::new().immediate_renew(true), false)
261 .0
262 }
263
264 fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
269 let mut txn_guard = self.txn.lock();
270 let Some(txn) = txn_guard.as_mut() else {
271 return Some(txn_guard);
272 };
273
274 if txn.is_peer_first_appearance {
275 txn.is_peer_first_appearance = false;
276 drop(txn_guard);
277 self.first_commit_from_peer_subs.emit(
279 &(),
280 FirstCommitFromPeerPayload {
281 peer: self.peer_id(),
282 },
283 );
284 }
285
286 None
287 }
288
289 #[instrument(skip_all)]
297 fn commit_internal(
298 &self,
299 config: CommitOptions,
300 preserve_on_empty: bool,
301 ) -> (
302 Option<CommitOptions>,
303 Option<LoroMutexGuard<'_, Option<Transaction>>>,
304 ) {
305 if !self.auto_commit.load(Acquire) {
306 let txn_guard = self.txn.lock();
307 return (None, Some(txn_guard));
310 }
311
312 loop {
313 if let Some(txn_guard) = self.before_commit() {
314 return (None, Some(txn_guard));
315 }
316
317 let mut txn_guard = self.txn.lock();
318 let txn = txn_guard.take();
319 let Some(mut txn) = txn else {
320 return (None, Some(txn_guard));
321 };
322 let on_commit = txn.take_on_commit();
323 if let Some(origin) = config.origin.clone() {
324 txn.set_origin(origin);
325 }
326
327 if let Some(timestamp) = config.timestamp {
328 txn.set_timestamp(timestamp);
329 }
330
331 if let Some(msg) = config.commit_msg.as_ref() {
332 txn.set_msg(Some(msg.clone()));
333 }
334
335 let id_span = txn.id_span();
336 let mut options = txn.commit().unwrap();
337 if let Some(opts) = options.as_mut() {
339 if config.origin.is_some() {
341 opts.set_origin(None);
342 }
343 if !preserve_on_empty {
345 options = None;
346 }
347 }
348 if config.immediate_renew && self.can_edit() {
349 let mut t = self.txn().unwrap();
350 if let Some(options) = options.as_ref() {
351 t.set_options(options.clone());
352 }
353 *txn_guard = Some(t);
354 }
355
356 if let Some(on_commit) = on_commit {
357 drop(txn_guard);
358 on_commit(&self.state, &self.oplog, id_span);
359 txn_guard = self.txn.lock();
360 if !config.immediate_renew && txn_guard.is_some() {
361 continue;
363 }
364 }
365
366 return (
367 options,
368 if !config.immediate_renew {
369 Some(txn_guard)
370 } else {
371 None
372 },
373 );
374 }
375 }
376
377 #[instrument(skip_all)]
382 pub fn commit_with(
383 &self,
384 config: CommitOptions,
385 ) -> (
386 Option<CommitOptions>,
387 Option<LoroMutexGuard<'_, Option<Transaction>>>,
388 ) {
389 self.commit_internal(config, false)
390 }
391
392 pub fn set_next_commit_message(&self, message: &str) {
394 let mut binding = self.txn.lock();
395 let Some(txn) = binding.as_mut() else {
396 return;
397 };
398
399 if message.is_empty() {
400 txn.set_msg(None)
401 } else {
402 txn.set_msg(Some(message.into()))
403 }
404 }
405
406 pub fn set_next_commit_origin(&self, origin: &str) {
408 let mut txn = self.txn.lock();
409 if let Some(txn) = txn.as_mut() {
410 txn.set_origin(origin.into());
411 }
412 }
413
414 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
416 let mut txn = self.txn.lock();
417 if let Some(txn) = txn.as_mut() {
418 txn.set_timestamp(timestamp);
419 }
420 }
421
422 pub fn set_next_commit_options(&self, options: CommitOptions) {
424 let mut txn = self.txn.lock();
425 if let Some(txn) = txn.as_mut() {
426 txn.set_options(options);
427 }
428 }
429
430 pub fn clear_next_commit_options(&self) {
432 let mut txn = self.txn.lock();
433 if let Some(txn) = txn.as_mut() {
434 txn.set_options(CommitOptions::new());
435 }
436 }
437
438 #[inline]
449 pub fn set_record_timestamp(&self, record: bool) {
450 self.config.set_record_timestamp(record);
451 }
452
453 #[inline]
458 pub fn set_change_merge_interval(&self, interval: i64) {
459 self.config.set_merge_interval(interval);
460 }
461
462 pub fn can_edit(&self) -> bool {
463 !self.is_detached() || self.config.detached_editing()
464 }
465
466 pub fn is_detached_editing_enabled(&self) -> bool {
467 self.config.detached_editing()
468 }
469
470 #[inline]
471 pub fn config_text_style(&self, text_style: StyleConfigMap) {
472 self.config.text_style_config.write().map = text_style.map;
473 }
474
475 #[inline]
476 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
477 self.config.text_style_config.write().default_style = text_style;
478 }
479 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
480 let doc = Self::new();
481 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
482 if mode.is_snapshot() {
483 doc.with_barrier(|| -> Result<(), LoroError> {
484 decode_snapshot(&doc, mode, body, Default::default())?;
485 Ok(())
486 })?;
487 Ok(doc)
488 } else {
489 Err(LoroError::DecodeError(
490 "Invalid encode mode".to_string().into(),
491 ))
492 }
493 }
494
495 #[inline(always)]
497 pub fn can_reset_with_snapshot(&self) -> bool {
498 let oplog = self.oplog.lock();
499 if oplog.batch_importing {
500 return false;
501 }
502
503 if self.is_detached() {
504 return false;
505 }
506
507 oplog.is_empty() && self.state.lock().can_import_snapshot()
508 }
509
510 #[inline(always)]
516 pub fn is_detached(&self) -> bool {
517 self.detached.load(Acquire)
518 }
519
520 pub(crate) fn set_detached(&self, detached: bool) {
521 self.detached.store(detached, Release);
522 }
523
524 #[inline(always)]
525 pub fn peer_id(&self) -> PeerID {
526 self.state
527 .lock()
528 .peer
529 .load(std::sync::atomic::Ordering::Relaxed)
530 }
531
532 #[inline(always)]
533 pub fn detach(&self) {
534 self.with_barrier(|| self.set_detached(true));
535 }
536
537 #[inline(always)]
538 pub fn attach(&self) {
539 self.checkout_to_latest()
540 }
541
542 pub fn state_timestamp(&self) -> Timestamp {
545 let f = { self.state.lock().frontiers.clone() };
547 self.oplog.lock().get_timestamp_of_version(&f)
548 }
549
550 #[inline(always)]
551 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
552 &self.state
553 }
554
555 #[inline]
556 pub fn get_state_deep_value(&self) -> LoroValue {
557 self.state.lock().get_deep_value()
558 }
559
560 #[inline(always)]
561 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
562 &self.oplog
563 }
564
565 #[inline(always)]
566 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
567 let s = debug_span!("import", peer = self.peer_id());
568 let _e = s.enter();
569 self.import_with(bytes, Default::default())
570 }
571
572 #[inline]
573 pub fn import_with(
574 &self,
575 bytes: &[u8],
576 origin: InternalString,
577 ) -> Result<ImportStatus, LoroError> {
578 self.with_barrier(|| self._import_with(bytes, origin))
579 }
580
581 #[tracing::instrument(skip_all)]
582 fn _import_with(
583 &self,
584 bytes: &[u8],
585 origin: InternalString,
586 ) -> Result<ImportStatus, LoroError> {
587 ensure_cov::notify_cov("loro_internal::import");
588 let parsed = parse_header_and_body(bytes, true)?;
589 loro_common::info!("Importing with mode={:?}", &parsed.mode);
590 let result = match parsed.mode {
591 EncodeMode::OutdatedRle => {
592 if self.state.lock().is_in_txn() {
593 return Err(LoroError::ImportWhenInTxn);
594 }
595
596 let s = tracing::span!(
597 tracing::Level::INFO,
598 "Import updates ",
599 peer = self.peer_id()
600 );
601 let _e = s.enter();
602 self.update_oplog_and_apply_delta_to_state_if_needed(
603 |oplog| oplog.decode(parsed),
604 origin,
605 )
606 }
607 EncodeMode::OutdatedSnapshot => {
608 if self.can_reset_with_snapshot() {
609 loro_common::info!("Init by snapshot {}", self.peer_id());
610 decode_snapshot(self, parsed.mode, parsed.body, origin)
611 } else {
612 self.update_oplog_and_apply_delta_to_state_if_needed(
613 |oplog| oplog.decode(parsed),
614 origin,
615 )
616 }
617 }
618 EncodeMode::FastSnapshot => {
619 if self.can_reset_with_snapshot() {
620 ensure_cov::notify_cov("loro_internal::import::snapshot");
621 loro_common::info!("Init by fast snapshot {}", self.peer_id());
622 decode_snapshot(self, parsed.mode, parsed.body, origin)
623 } else {
624 self.import_changes_and_apply_delta_to_state_if_needed(
625 |oplog| encoding::decode_oplog_changes(oplog, parsed),
626 origin,
627 )
628
629 }
634 }
635 EncodeMode::FastUpdates => self.import_changes_and_apply_delta_to_state_if_needed(
636 |oplog| encoding::decode_oplog_changes(oplog, parsed),
637 origin,
638 ),
639 EncodeMode::Auto => {
640 unreachable!()
641 }
642 };
643
644 self.emit_events();
645
646 result
647 }
648
649 #[tracing::instrument(skip_all)]
650 pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
651 &self,
652 f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
653 origin: InternalString,
654 ) -> Result<ImportStatus, LoroError> {
655 let mut oplog = self.oplog.lock();
656 oplog.begin_import_rollback();
657 if !self.is_detached() {
658 let old_vv = oplog.vv().clone();
659 let old_frontiers = oplog.frontiers().clone();
660 let result = f(&mut oplog);
661 if &old_vv != oplog.vv() {
662 let mut diff = DiffCalculator::new(false);
663 let (diff, diff_mode) = diff.calc_diff_internal(
664 &oplog,
665 &old_vv,
666 &old_frontiers,
667 oplog.vv(),
668 oplog.dag.get_frontiers(),
669 None,
670 );
671 let mut state = self.state.lock();
672 if let Err(e) = state.apply_diff(
673 InternalDocDiff {
674 origin,
675 diff: (diff).into(),
676 by: EventTriggerKind::Import,
677 new_version: Cow::Owned(oplog.frontiers().clone()),
678 },
679 diff_mode,
680 ) {
681 oplog.rollback_import();
682 return Err(e);
683 }
684 }
685 match result {
686 Ok(result) => {
687 oplog.commit_import_rollback();
688 Ok(result)
689 }
690 Err(e) => {
691 if &old_vv == oplog.vv() {
696 oplog.rollback_import();
697 } else {
698 oplog.commit_import_rollback();
699 }
700 Err(e)
701 }
702 }
703 } else {
704 match f(&mut oplog) {
705 Ok(result) => {
706 oplog.commit_import_rollback();
707 Ok(result)
708 }
709 Err(e) => {
710 oplog.rollback_import();
711 Err(e)
712 }
713 }
714 }
715 }
716
717 #[tracing::instrument(skip_all)]
718 pub(crate) fn import_changes_and_apply_delta_to_state_if_needed(
719 &self,
720 decode_changes: impl FnOnce(&mut OpLog) -> Result<Vec<Change>, LoroError>,
721 origin: InternalString,
722 ) -> Result<ImportStatus, LoroError> {
723 let mut oplog = self.oplog.lock();
724 let arena_checkpoint = oplog.arena.checkpoint_for_rollback();
725 let changes = match decode_changes(&mut oplog) {
726 Ok(changes) => changes,
727 Err(e) => {
728 oplog.arena.rollback(arena_checkpoint);
729 return Err(e);
730 }
731 };
732
733 let preflight = oplog.preflight_import_changes(&changes);
734 if preflight.has_deps_before_shallow_root
735 && (self.is_detached() || !preflight.applies_to_dag)
736 {
737 oplog.arena.rollback(arena_checkpoint);
738 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
739 }
740
741 if self.is_detached() {
742 let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
743 if result.has_deps_before_shallow_root {
744 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
745 }
746
747 return Ok(result.status);
748 }
749
750 if !preflight.applies_to_dag {
751 let pending_root_containers = pending_root_containers_to_materialize(&oplog, &changes);
752 let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
753 if result.has_deps_before_shallow_root {
754 oplog.arena.rollback(arena_checkpoint);
755 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
756 }
757
758 if !pending_root_containers.is_empty() {
759 let mut state = self.state.lock();
760 for id in pending_root_containers {
761 state.ensure_container(&id);
762 }
763 }
764
765 return Ok(result.status);
766 }
767
768 let old_vv = oplog.vv().clone();
769 let old_frontiers = oplog.frontiers().clone();
770 let rollback_enabled = preflight.needs_state_apply_rollback;
771 if rollback_enabled {
772 oplog.begin_import_rollback_with_arena(arena_checkpoint);
773 }
774
775 let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
776 if &old_vv != oplog.vv() {
777 let mut diff = DiffCalculator::new(false);
778 let (diff, diff_mode) = diff.calc_diff_internal(
779 &oplog,
780 &old_vv,
781 &old_frontiers,
782 oplog.vv(),
783 oplog.dag.get_frontiers(),
784 None,
785 );
786 let mut state = self.state.lock();
787 if let Err(e) = state.apply_diff(
788 InternalDocDiff {
789 origin,
790 diff: (diff).into(),
791 by: EventTriggerKind::Import,
792 new_version: Cow::Owned(oplog.frontiers().clone()),
793 },
794 diff_mode,
795 ) {
796 if rollback_enabled {
797 oplog.rollback_import();
798 return Err(e);
799 }
800
801 panic!("state apply returned Err for import without rollback guard: {e}");
802 }
803 }
804
805 if result.has_deps_before_shallow_root {
806 if rollback_enabled {
807 oplog.commit_import_rollback();
808 }
809 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
810 }
811
812 if rollback_enabled {
813 oplog.commit_import_rollback();
814 }
815 Ok(result.status)
816 }
817
818 fn emit_events(&self) {
819 let events = {
821 let mut state = self.state.lock();
822 state.take_events()
823 };
824 for event in events {
825 self.observer.emit(event);
826 }
827 }
828
829 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
830 let mut state = self.state.lock();
831 state.take_events()
832 }
833
834 #[tracing::instrument(skip_all)]
838 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
839 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
840 self.with_barrier(|| {
841 let result = self.import_changes_and_apply_delta_to_state_if_needed(
842 |oplog| crate::encoding::json_schema::decode_json_changes(json, &oplog.arena),
843 Default::default(),
844 );
845 self.emit_events();
846 result
847 })
848 }
849
850 pub fn export_json_updates(
851 &self,
852 start_vv: &VersionVector,
853 end_vv: &VersionVector,
854 with_peer_compression: bool,
855 ) -> JsonSchema {
856 self.with_barrier(|| {
857 let oplog = self.oplog.lock();
858 let mut start_vv = start_vv;
859 let _temp: Option<VersionVector>;
860 if !oplog.dag.shallow_since_vv().is_empty() {
861 let mut include_all = true;
863 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
864 if start_vv.get(peer).unwrap_or(&0) < counter {
865 include_all = false;
866 break;
867 }
868 }
869 if !include_all {
870 let mut vv = start_vv.clone();
871 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
872 vv.extend_to_include_end_id(ID::new(peer, counter));
873 }
874 _temp = Some(vv);
875 start_vv = _temp.as_ref().unwrap();
876 }
877 }
878
879 crate::encoding::json_schema::export_json(
880 &oplog,
881 start_vv,
882 end_vv,
883 with_peer_compression,
884 )
885 })
886 }
887
888 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
889 let oplog = self.oplog.lock();
890 let mut changes = export_json_in_id_span(&oplog, id_span);
891 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
892 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
893 changes.push(change_json);
894 }
895 changes
896 }
897
898 #[inline]
900 pub fn oplog_vv(&self) -> VersionVector {
901 self.oplog.lock().vv().clone()
902 }
903
904 #[inline]
906 pub fn state_vv(&self) -> VersionVector {
907 let oplog = self.oplog.lock();
908 let f = &self.state.lock().frontiers;
909 oplog.dag.frontiers_to_vv(f).unwrap()
910 }
911
912 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
913 let value: LoroValue = self.state.lock().get_value_by_path(path)?;
914 if let LoroValue::Container(c) = value {
915 Some(ValueOrHandler::Handler(Handler::new_attached(
916 c.clone(),
917 self.clone(),
918 )))
919 } else {
920 Some(ValueOrHandler::Value(value))
921 }
922 }
923
924 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
926 let path = str_to_path(path)?;
927 self.get_by_path(&path)
928 }
929
930 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
931 let arena = &self.arena;
932 let txn = self.txn.lock();
933 let txn = txn.as_ref()?;
934 let ops_ = txn.local_ops();
935 let new_id = ID {
936 peer: *txn.peer(),
937 counter: ops_.first()?.counter,
938 };
939 let change = ChangeRef {
940 id: &new_id,
941 deps: txn.frontiers(),
942 timestamp: &txn
943 .timestamp()
944 .as_ref()
945 .copied()
946 .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
947 commit_msg: txn.msg(),
948 ops: ops_,
949 lamport: txn.lamport(),
950 };
951 let json = encode_change_to_json(change, arena);
952 Some(json)
953 }
954
955 #[inline]
956 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
957 if self.has_container(&id) {
958 self.ensure_root_container(&id);
959 Some(Handler::new_attached(id, self.clone()))
960 } else {
961 None
962 }
963 }
964
965 #[inline]
966 fn ensure_root_container(&self, id: &ContainerID) {
967 if id.is_root() {
968 self.state.lock().ensure_container(id);
969 }
970 }
971
972 #[inline]
975 pub fn try_get_text<I: IntoContainerId>(&self, id: I) -> Option<TextHandler> {
976 let id = id.into_container_id(&self.arena, ContainerType::Text);
977 if !self.has_container(&id) {
978 return None;
979 }
980 self.ensure_root_container(&id);
981 Handler::new_attached(id, self.clone()).into_text().ok()
982 }
983
984 #[inline]
987 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
988 self.try_get_text(id)
989 .expect("The container does not exist in the document. Use `try_get_text` or `get_container` to check for existence.")
990 }
991
992 #[inline]
995 pub fn try_get_list<I: IntoContainerId>(&self, id: I) -> Option<ListHandler> {
996 let id = id.into_container_id(&self.arena, ContainerType::List);
997 if !self.has_container(&id) {
998 return None;
999 }
1000 self.ensure_root_container(&id);
1001 Handler::new_attached(id, self.clone()).into_list().ok()
1002 }
1003
1004 #[inline]
1007 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
1008 self.try_get_list(id)
1009 .expect("The container does not exist in the document. Use `try_get_list` or `get_container` to check for existence.")
1010 }
1011
1012 #[inline]
1015 pub fn try_get_movable_list<I: IntoContainerId>(&self, id: I) -> Option<MovableListHandler> {
1016 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
1017 if !self.has_container(&id) {
1018 return None;
1019 }
1020 self.ensure_root_container(&id);
1021 Handler::new_attached(id, self.clone())
1022 .into_movable_list()
1023 .ok()
1024 }
1025
1026 #[inline]
1029 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
1030 self.try_get_movable_list(id)
1031 .expect("The container does not exist in the document. Use `try_get_movable_list` or `get_container` to check for existence.")
1032 }
1033
1034 #[inline]
1037 pub fn try_get_map<I: IntoContainerId>(&self, id: I) -> Option<MapHandler> {
1038 let id = id.into_container_id(&self.arena, ContainerType::Map);
1039 if !self.has_container(&id) {
1040 return None;
1041 }
1042 self.ensure_root_container(&id);
1043 Handler::new_attached(id, self.clone()).into_map().ok()
1044 }
1045
1046 #[inline]
1049 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
1050 self.try_get_map(id)
1051 .expect("The container does not exist in the document. Use `try_get_map` or `get_container` to check for existence.")
1052 }
1053
1054 #[inline]
1057 pub fn try_get_tree<I: IntoContainerId>(&self, id: I) -> Option<TreeHandler> {
1058 let id = id.into_container_id(&self.arena, ContainerType::Tree);
1059 if !self.has_container(&id) {
1060 return None;
1061 }
1062 self.ensure_root_container(&id);
1063 Handler::new_attached(id, self.clone()).into_tree().ok()
1064 }
1065
1066 #[inline]
1069 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
1070 self.try_get_tree(id)
1071 .expect("The container does not exist in the document. Use `try_get_tree` or `get_container` to check for existence.")
1072 }
1073
1074 #[cfg(feature = "counter")]
1075 pub fn try_get_counter<I: IntoContainerId>(
1076 &self,
1077 id: I,
1078 ) -> Option<crate::handler::counter::CounterHandler> {
1079 let id = id.into_container_id(&self.arena, ContainerType::Counter);
1080 if !self.has_container(&id) {
1081 return None;
1082 }
1083 self.ensure_root_container(&id);
1084 Handler::new_attached(id, self.clone()).into_counter().ok()
1085 }
1086
1087 #[cfg(feature = "counter")]
1088 pub fn get_counter<I: IntoContainerId>(
1089 &self,
1090 id: I,
1091 ) -> crate::handler::counter::CounterHandler {
1092 self.try_get_counter(id)
1093 .expect("The container does not exist in the document. Use `try_get_counter` or `get_container` to check for existence.")
1094 }
1095
1096 #[must_use]
1097 pub fn has_container(&self, id: &ContainerID) -> bool {
1098 if id.is_root() {
1099 return true;
1100 }
1101
1102 let exist = self.state.lock().does_container_exist(id);
1103 exist
1104 }
1105
1106 #[instrument(level = "info", skip_all)]
1120 pub fn undo_internal(
1121 &self,
1122 id_span: IdSpan,
1123 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1124 post_transform_base: Option<&DiffBatch>,
1125 before_diff: &mut dyn FnMut(&DiffBatch),
1126 ) -> LoroResult<CommitWhenDrop<'_>> {
1127 if !self.can_edit() {
1128 return Err(LoroError::EditWhenDetached);
1129 }
1130
1131 let (options, txn) = self.implicit_commit_then_stop();
1132 if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
1133 self.renew_txn_if_auto_commit(options);
1134 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
1135 }
1136
1137 let (was_recording, latest_frontiers) = {
1138 let mut state = self.state.lock();
1139 let was_recording = state.is_recording();
1140 state.stop_and_clear_recording();
1141 (was_recording, state.frontiers.clone())
1142 };
1143
1144 let spans = self.oplog.lock().split_span_based_on_deps(id_span);
1145 let diff = crate::undo::undo(
1146 spans,
1147 match post_transform_base {
1148 Some(d) => Either::Right(d),
1149 None => Either::Left(&latest_frontiers),
1150 },
1151 |from, to| {
1152 self._checkout_without_emitting(from, false, false).unwrap();
1153 self.state.lock().start_recording();
1154 self._checkout_without_emitting(to, false, false).unwrap();
1155 let mut state = self.state.lock();
1156 let e = state.take_events();
1157 state.stop_and_clear_recording();
1158 DiffBatch::new(e)
1159 },
1160 before_diff,
1161 );
1162
1163 self._checkout_without_emitting(&latest_frontiers, false, false)?;
1167 self.set_detached(false);
1168 if was_recording {
1169 self.state.lock().start_recording();
1170 }
1171 drop(txn);
1172 self.start_auto_commit();
1173 if let Err(e) = self._apply_diff(diff, container_remap, true) {
1177 warn!("Undo Failed {:?}", e);
1178 }
1179
1180 if let Some(options) = options {
1181 self.set_next_commit_options(options);
1182 }
1183 Ok(CommitWhenDrop {
1184 doc: self,
1185 default_options: CommitOptions::new().origin("undo"),
1186 })
1187 }
1188
1189 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
1195 let f = self.state_frontiers();
1198 let diff = self.diff(&f, target)?;
1199 self._apply_diff(diff, &mut Default::default(), false)
1200 }
1201
1202 pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1207 {
1208 let oplog = self.oplog.lock();
1211 let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
1212 for id in frontiers.iter() {
1213 if !oplog.dag.contains(id) {
1214 return Err(LoroError::FrontiersNotFound(id));
1215 }
1216 }
1217
1218 if oplog.dag.is_before_shallow_root(frontiers) {
1219 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1220 }
1221
1222 Ok(())
1223 };
1224
1225 validate_frontiers(a)?;
1226 validate_frontiers(b)?;
1227 }
1228
1229 let (options, txn) = self.implicit_commit_then_stop();
1230 let was_detached = self.is_detached();
1231 let old_frontiers = self.state_frontiers();
1232 let was_recording = {
1233 let mut state = self.state.lock();
1234 let is_recording = state.is_recording();
1235 state.stop_and_clear_recording();
1236 is_recording
1237 };
1238 let result = (|| {
1239 self._checkout_without_emitting(a, true, false)?;
1240 self.state.lock().start_recording();
1241 self._checkout_without_emitting(b, true, false)?;
1242 let mut state = self.state.lock();
1243 let e = state.take_events();
1244 state.stop_and_clear_recording();
1245 Ok::<_, LoroError>(e)
1246 })();
1247
1248 self._checkout_without_emitting(&old_frontiers, false, false)
1250 .unwrap();
1251 drop(txn);
1252 if !was_detached {
1253 self.set_detached(false);
1254 self.renew_txn_if_auto_commit(options);
1255 }
1256 if was_recording {
1257 self.state.lock().start_recording();
1258 }
1259 result.map(DiffBatch::new)
1260 }
1261
1262 #[inline(always)]
1264 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1265 self._apply_diff(diff, &mut Default::default(), true)
1266 }
1267
1268 pub(crate) fn _apply_diff(
1280 &self,
1281 diff: DiffBatch,
1282 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1283 skip_unreachable: bool,
1284 ) -> LoroResult<()> {
1285 if !self.can_edit() {
1286 return Err(LoroError::EditWhenDetached);
1287 }
1288
1289 let mut ans: LoroResult<()> = Ok(());
1290 let mut missing_containers: Vec<ContainerID> = Vec::new();
1291 for (mut id, diff) in diff.into_iter() {
1292 let mut remapped = false;
1293 while let Some(rid) = container_remap.get(&id) {
1294 remapped = true;
1295 id = rid.clone();
1296 }
1297
1298 if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1299 let exists = self.state.lock().does_container_exist(&id);
1301 if !exists {
1302 missing_containers.push(id);
1303 continue;
1304 }
1305 self.state.lock().ensure_container(&id);
1307 }
1308
1309 if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
1310 continue;
1311 }
1312
1313 let Some(h) = self.get_handler(id.clone()) else {
1314 return Err(LoroError::ContainersNotFound {
1315 containers: Box::new(vec![id]),
1316 });
1317 };
1318 if let Err(e) = h.apply_diff(diff, container_remap) {
1319 ans = Err(e);
1320 }
1321 }
1322
1323 if !missing_containers.is_empty() {
1324 return Err(LoroError::ContainersNotFound {
1325 containers: Box::new(missing_containers),
1326 });
1327 }
1328
1329 ans
1330 }
1331
1332 #[inline]
1334 pub fn diagnose_size(&self) {
1335 self.oplog().lock().diagnose_size();
1336 }
1337
1338 #[inline]
1339 pub fn oplog_frontiers(&self) -> Frontiers {
1340 self.oplog().lock().frontiers().clone()
1341 }
1342
1343 #[inline]
1344 pub fn state_frontiers(&self) -> Frontiers {
1345 self.state.lock().frontiers.clone()
1346 }
1347
1348 #[inline]
1352 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1353 self.oplog().lock().cmp_with_frontiers(other)
1354 }
1355
1356 #[inline]
1360 pub fn cmp_frontiers(
1361 &self,
1362 a: &Frontiers,
1363 b: &Frontiers,
1364 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1365 self.oplog().lock().cmp_frontiers(a, b)
1366 }
1367
1368 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1369 let mut state = self.state.lock();
1370 if !state.is_recording() {
1371 state.start_recording();
1372 }
1373
1374 self.observer.subscribe_root(callback)
1375 }
1376
1377 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1378 let mut state = self.state.lock();
1379 if !state.is_recording() {
1380 state.start_recording();
1381 }
1382
1383 self.observer.subscribe(container_id, callback)
1384 }
1385
1386 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1387 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1388 activate();
1389 sub
1390 }
1391
1392 #[tracing::instrument(skip_all)]
1394 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1395 if bytes.is_empty() {
1396 return Ok(ImportStatus::default());
1397 }
1398
1399 if bytes.len() == 1 {
1400 return self.import(&bytes[0]);
1401 }
1402
1403 let mut success = VersionRange::default();
1404 let mut meta_arr = bytes
1405 .iter()
1406 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1407 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1408 meta_arr.sort_by(|a, b| {
1409 a.0.mode
1410 .cmp(&b.0.mode)
1411 .then(b.0.change_num.cmp(&a.0.change_num))
1412 });
1413
1414 let (options, txn) = self.implicit_commit_then_stop();
1415 let is_detached = self.is_detached();
1439 self.set_detached(true);
1440 self.oplog.lock().batch_importing = true;
1441 let mut err = None;
1442 for (_meta, data) in meta_arr {
1443 match self._import_with(data, Default::default()) {
1444 Ok(s) => {
1445 for (peer, (start, end)) in s.success.iter() {
1446 match success.0.entry(*peer) {
1447 Entry::Occupied(mut e) => {
1448 e.get_mut().1 = *end.max(&e.get().1);
1449 }
1450 Entry::Vacant(e) => {
1451 e.insert((*start, *end));
1452 }
1453 }
1454 }
1455 }
1456 Err(e) => {
1457 err = Some(e);
1458 }
1459 }
1460 }
1461
1462 let mut oplog = self.oplog.lock();
1463 oplog.batch_importing = false;
1464 let pending = oplog.pending_changes.version_range();
1465 drop(oplog);
1466 if !is_detached {
1467 self._checkout_to_latest_with_guard(txn);
1468 } else {
1469 drop(txn);
1470 }
1471
1472 self.renew_txn_if_auto_commit(options);
1473 if let Some(err) = err {
1474 return Err(err);
1475 }
1476
1477 Ok(ImportStatus {
1478 success,
1479 pending: if pending.is_empty() {
1480 None
1481 } else {
1482 Some(pending)
1483 },
1484 })
1485 }
1486
1487 #[inline]
1489 pub fn get_value(&self) -> LoroValue {
1490 self.state.lock().get_value()
1491 }
1492
1493 #[inline]
1495 pub fn get_deep_value(&self) -> LoroValue {
1496 self.state.lock().get_deep_value()
1497 }
1498
1499 #[inline]
1501 pub fn get_deep_value_with_id(&self) -> LoroValue {
1502 self.state.lock().get_deep_value_with_id()
1503 }
1504
1505 pub fn checkout_to_latest(&self) {
1506 let (options, _guard) = self.implicit_commit_then_stop();
1507 if !self.is_detached() {
1508 drop(_guard);
1509 self.renew_txn_if_auto_commit(options);
1510 return;
1511 }
1512
1513 self._checkout_to_latest_without_commit(true)
1514 .expect("checkout to oplog frontiers should succeed");
1515 self.emit_events();
1516 drop(_guard);
1517 self.renew_txn_if_auto_commit(options);
1518 }
1519
1520 fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1521 if !self.is_detached() {
1522 self._renew_txn_if_auto_commit_with_guard(None, guard);
1523 return;
1524 }
1525
1526 self._checkout_to_latest_without_commit(true)
1527 .expect("checkout to oplog frontiers should succeed");
1528 self._renew_txn_if_auto_commit_with_guard(None, guard);
1529 }
1530
1531 pub(crate) fn _checkout_to_latest_without_commit(
1533 &self,
1534 to_commit_then_renew: bool,
1535 ) -> LoroResult<()> {
1536 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1537 let f = self.oplog_frontiers();
1538 let this = &self;
1539 let frontiers = &f;
1540 this._checkout_without_emitting(frontiers, false, to_commit_then_renew)?;
1541 this.emit_events();
1543 if this.config.detached_editing() {
1544 this.renew_peer_id();
1545 }
1546
1547 self.set_detached(false);
1548 Ok(())
1549 })
1550 }
1551
1552 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1557 let was_detached = self.is_detached();
1558 let (options, guard) = self.implicit_commit_then_stop();
1559 let result = self._checkout_without_emitting(frontiers, true, true);
1560 if result.is_ok() {
1561 self.emit_events();
1562 }
1563 drop(guard);
1564 if self.config.detached_editing() {
1565 if result.is_ok() {
1566 self.renew_peer_id();
1567 }
1568 self.renew_txn_if_auto_commit(options);
1569 } else if result.is_err() {
1570 if !was_detached {
1571 self.renew_txn_if_auto_commit(options);
1572 }
1573 } else if !self.is_detached() {
1574 self.renew_txn_if_auto_commit(options);
1575 }
1576
1577 result
1578 }
1579
1580 #[instrument(level = "info", skip(self))]
1582 pub(crate) fn _checkout_without_emitting(
1583 &self,
1584 frontiers: &Frontiers,
1585 to_shrink_frontiers: bool,
1586 to_commit_then_renew: bool,
1587 ) -> Result<(), LoroError> {
1588 if !self.txn.is_locked() {
1589 return Err(LoroError::TransactionError(
1590 "checkout requires the transaction mutex to be held"
1591 .to_string()
1592 .into_boxed_str(),
1593 ));
1594 }
1595 let from_frontiers = self.state_frontiers();
1596 loro_common::info!(
1597 "checkout from={:?} to={:?} cur_vv={:?}",
1598 from_frontiers,
1599 frontiers,
1600 self.oplog_vv()
1601 );
1602
1603 if &from_frontiers == frontiers {
1604 return Ok(());
1605 }
1606
1607 let oplog = self.oplog.lock();
1608 if oplog.dag.is_before_shallow_root(frontiers) {
1609 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1610 }
1611
1612 let frontiers = if to_shrink_frontiers {
1613 shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
1614 } else {
1615 frontiers.clone()
1616 };
1617
1618 if from_frontiers == frontiers {
1619 return Ok(());
1620 }
1621
1622 let mut state = self.state.lock();
1623 let mut calc = self.diff_calculator.lock();
1624 for i in frontiers.iter() {
1625 if !oplog.dag.contains(i) {
1626 return Err(LoroError::FrontiersNotFound(i));
1627 }
1628 }
1629
1630 let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
1631 LoroError::NotFoundError(
1632 format!(
1633 "Cannot find the current state version {:?}",
1634 state.frontiers
1635 )
1636 .into_boxed_str(),
1637 )
1638 })?;
1639 let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1640 return Err(LoroError::NotFoundError(
1641 format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1642 ));
1643 };
1644
1645 self.set_detached(true);
1646 let (diff, diff_mode) =
1647 calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
1648 state.apply_diff(
1649 InternalDocDiff {
1650 origin: "checkout".into(),
1651 diff: Cow::Owned(diff),
1652 by: EventTriggerKind::Checkout,
1653 new_version: Cow::Owned(frontiers.clone()),
1654 },
1655 diff_mode,
1656 )?;
1657
1658 Ok(())
1659 }
1660
1661 #[inline]
1662 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1663 self.oplog.lock().dag.vv_to_frontiers(vv)
1664 }
1665
1666 #[inline]
1667 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1668 self.oplog.lock().dag.frontiers_to_vv(frontiers)
1669 }
1670
1671 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1675 let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1676 self.import(&updates)
1677 }
1678
1679 pub(crate) fn arena(&self) -> &SharedArena {
1680 &self.arena
1681 }
1682
1683 #[inline]
1684 pub fn len_ops(&self) -> usize {
1685 if self.oplog.can_lock_in_this_thread() {
1686 return self.oplog.lock().visible_op_count_exact();
1687 }
1688
1689 self.visible_op_count.load(Acquire)
1690 }
1691
1692 #[inline]
1693 pub fn len_changes(&self) -> usize {
1694 let oplog = self.oplog.lock();
1695 oplog.len_changes()
1696 }
1697
1698 pub fn config(&self) -> &Configure {
1699 &self.config
1700 }
1701
1702 pub fn check_state_diff_calc_consistency_slow(&self) {
1707 {
1709 static IS_CHECKING: std::sync::atomic::AtomicBool =
1710 std::sync::atomic::AtomicBool::new(false);
1711 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1712 return;
1713 }
1714
1715 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1716 let peer_id = self.peer_id();
1717 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1718 let _g = s.enter();
1719 let options = self.implicit_commit_then_stop().0;
1720 self.oplog.lock().check_dag_correctness();
1721 if self.is_shallow() {
1722 let initial_snapshot = self
1733 .export(ExportMode::state_only(Some(
1734 &self.shallow_since_frontiers(),
1735 )))
1736 .unwrap();
1737
1738 let doc = LoroDoc::new();
1740 doc.import(&initial_snapshot).unwrap();
1741 self.checkout(&self.shallow_since_frontiers()).unwrap();
1742 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1743
1744 let updates = self.export(ExportMode::all_updates()).unwrap();
1746
1747 doc.import(&updates).unwrap();
1749 self.checkout_to_latest();
1750
1751 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1754 let mut calculated_state = doc.app_state().lock();
1755 let mut current_state = self.app_state().lock();
1756 current_state.check_is_the_same(&mut calculated_state);
1757 } else {
1758 let f = self.state_frontiers();
1759 let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
1760 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1761 let doc = Self::new();
1762 doc.import(&bytes).unwrap();
1763 let mut calculated_state = doc.app_state().lock();
1764 let mut current_state = self.app_state().lock();
1765 current_state.check_is_the_same(&mut calculated_state);
1766 }
1767
1768 self.renew_txn_if_auto_commit(options);
1769 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1770 }
1771 }
1772
1773 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1774 self.query_pos_internal(pos, true)
1775 }
1776
1777 pub(crate) fn query_pos_internal(
1779 &self,
1780 pos: &Cursor,
1781 ret_event_index: bool,
1782 ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1783 if !self.has_container(&pos.container) {
1784 return Err(CannotFindRelativePosition::IdNotFound);
1785 }
1786
1787 let mut state = self.state.lock();
1788 if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1789 Ok(PosQueryResult {
1790 update: None,
1791 current: AbsolutePosition {
1792 pos: ans,
1793 side: pos.side,
1794 },
1795 })
1796 } else {
1797 drop(state);
1809 let result = self.with_barrier(|| {
1810 let oplog = self.oplog().lock();
1811 if let Some(id) = pos.id {
1813 if oplog.arena.id_to_idx(&pos.container).is_none() {
1815 let mut s = self.state.lock();
1816 if !s.does_container_exist(&pos.container) {
1817 return Err(CannotFindRelativePosition::ContainerDeleted);
1818 }
1819 s.ensure_container(&pos.container);
1820 drop(s);
1821 }
1822 let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1823 let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1825 if oplog.shallow_since_vv().includes_id(id) {
1826 return Err(CannotFindRelativePosition::HistoryCleared);
1827 }
1828
1829 tracing::error!("Cannot find id {}", id);
1830 return Err(CannotFindRelativePosition::IdNotFound);
1831 };
1832 let mut diff_calc = DiffCalculator::new(true);
1834 let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1835 let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1836 diff_calc.calc_diff_internal(
1838 &oplog,
1839 before,
1840 &before_frontiers,
1841 oplog.vv(),
1842 oplog.frontiers(),
1843 Some(&|target| idx == target),
1844 );
1845 let depth = self.arena.get_depth(idx);
1847 let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1848 match diff_calc {
1849 crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1850 let c = text.get_id_latest_pos(id).unwrap();
1851 let new_pos = c.pos;
1852 let handler = self.get_text(&pos.container);
1853 let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1854 Ok(PosQueryResult {
1855 update: handler.get_cursor(current_pos, c.side),
1856 current: AbsolutePosition {
1857 pos: current_pos,
1858 side: c.side,
1859 },
1860 })
1861 }
1862 crate::diff_calc::ContainerDiffCalculator::List(list) => {
1863 let c = list.get_id_latest_pos(id).unwrap();
1864 let new_pos = c.pos;
1865 let handler = self.get_list(&pos.container);
1866 Ok(PosQueryResult {
1867 update: handler.get_cursor(new_pos, c.side),
1868 current: AbsolutePosition {
1869 pos: new_pos,
1870 side: c.side,
1871 },
1872 })
1873 }
1874 crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1875 let c = list.get_id_latest_pos(id).unwrap();
1876 let new_pos = c.pos;
1877 let handler = self.get_movable_list(&pos.container);
1878 let new_pos = handler.op_pos_to_user_pos(new_pos);
1879 Ok(PosQueryResult {
1880 update: handler.get_cursor(new_pos, c.side),
1881 current: AbsolutePosition {
1882 pos: new_pos,
1883 side: c.side,
1884 },
1885 })
1886 }
1887 crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1888 crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1889 #[cfg(feature = "counter")]
1890 crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1891 crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1892 }
1893 } else {
1894 match pos.container.container_type() {
1895 ContainerType::Text => {
1896 let text = self.get_text(&pos.container);
1897 Ok(PosQueryResult {
1898 update: Some(Cursor {
1899 id: None,
1900 container: text.id(),
1901 side: pos.side,
1902 origin_pos: text.len_unicode(),
1903 }),
1904 current: AbsolutePosition {
1905 pos: text.len_event(),
1906 side: pos.side,
1907 },
1908 })
1909 }
1910 ContainerType::List => {
1911 let list = self.get_list(&pos.container);
1912 Ok(PosQueryResult {
1913 update: Some(Cursor {
1914 id: None,
1915 container: list.id(),
1916 side: pos.side,
1917 origin_pos: list.len(),
1918 }),
1919 current: AbsolutePosition {
1920 pos: list.len(),
1921 side: pos.side,
1922 },
1923 })
1924 }
1925 ContainerType::MovableList => {
1926 let list = self.get_movable_list(&pos.container);
1927 Ok(PosQueryResult {
1928 update: Some(Cursor {
1929 id: None,
1930 container: list.id(),
1931 side: pos.side,
1932 origin_pos: list.len(),
1933 }),
1934 current: AbsolutePosition {
1935 pos: list.len(),
1936 side: pos.side,
1937 },
1938 })
1939 }
1940 ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1941 unreachable!()
1942 }
1943 #[cfg(feature = "counter")]
1944 ContainerType::Counter => unreachable!(),
1945 }
1946 }
1947 });
1948 result
1949 }
1950 }
1951
1952 pub fn free_history_cache(&self) {
1957 self.oplog.lock().free_history_cache();
1958 }
1959
1960 pub fn free_diff_calculator(&self) {
1962 *self.diff_calculator.lock() = DiffCalculator::new(true);
1963 }
1964
1965 pub fn has_history_cache(&self) -> bool {
1968 self.oplog.lock().has_history_cache()
1969 }
1970
1971 #[inline]
1975 pub fn compact_change_store(&self) {
1976 self.with_barrier(|| {
1977 self.oplog.lock().compact_change_store();
1978 });
1979 }
1980
1981 #[inline]
1985 pub fn analyze(&self) -> DocAnalysis {
1986 DocAnalysis::analyze(self)
1987 }
1988
1989 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1991 let mut state = self.state.lock();
1992 if state.arena.id_to_idx(id).is_none() {
1993 if !state.does_container_exist(id) {
1994 return None;
1995 }
1996 state.ensure_container(id);
1997 }
1998 let idx = state.arena.id_to_idx(id).unwrap();
1999 state.get_path(idx)
2000 }
2001
2002 #[instrument(skip(self))]
2003 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
2004 self.with_barrier(|| {
2005 let ans = match mode {
2006 ExportMode::Snapshot => export_fast_snapshot(self),
2007 ExportMode::Updates { from } => export_fast_updates(self, &from),
2008 ExportMode::UpdatesInRange { spans } => {
2009 export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
2010 }
2011 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
2012 ExportMode::StateOnly(f) => match f {
2013 Some(f) => export_state_only_snapshot(self, &f)?,
2014 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
2015 },
2016 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
2017 };
2018 Ok(ans)
2019 })
2020 }
2021
2022 pub fn shallow_since_vv(&self) -> ImVersionVector {
2028 self.oplog().lock().shallow_since_vv().clone()
2029 }
2030
2031 pub fn shallow_since_frontiers(&self) -> Frontiers {
2032 self.oplog().lock().shallow_since_frontiers().clone()
2033 }
2034
2035 pub fn is_shallow(&self) -> bool {
2037 !self.oplog().lock().shallow_since_vv().is_empty()
2038 }
2039
2040 pub fn get_pending_txn_len(&self) -> usize {
2045 if let Some(txn) = self.txn.lock().as_ref() {
2046 txn.len()
2047 } else {
2048 0
2049 }
2050 }
2051
2052 #[inline]
2053 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
2054 self.oplog().lock().dag.find_path(from, to)
2055 }
2056
2057 pub fn subscribe_first_commit_from_peer(
2063 &self,
2064 callback: FirstCommitFromPeerCallback,
2065 ) -> Subscription {
2066 let (s, enable) = self
2067 .first_commit_from_peer_subs
2068 .inner()
2069 .insert((), callback);
2070 enable();
2071 s
2072 }
2073
2074 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
2079 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
2080 enable();
2081 s
2082 }
2083}
2084
2085fn pending_root_containers_to_materialize(oplog: &OpLog, changes: &[Change]) -> Vec<ContainerID> {
2086 let mut roots = FxHashSet::default();
2087 for change in changes {
2088 if change.ctr_end() <= oplog.vv().get(&change.id.peer).copied().unwrap_or(0) {
2089 continue;
2090 }
2091
2092 if oplog.dag.is_before_shallow_root(&change.deps)
2093 || oplog
2094 .dag
2095 .get_change_lamport_from_deps(&change.deps)
2096 .is_some()
2097 {
2098 continue;
2099 }
2100
2101 for op in change.ops.iter() {
2102 let id = oplog
2103 .arena
2104 .get_container_id(op.container)
2105 .expect("decoded op container should be registered");
2106 if id.is_root() {
2107 roots.insert(id);
2108 }
2109 }
2110 }
2111
2112 roots.into_iter().collect()
2113}
2114
2115#[derive(Debug, thiserror::Error)]
2116pub enum ChangeTravelError {
2117 #[error("Target id not found {0:?}")]
2118 TargetIdNotFound(ID),
2119 #[error("The shallow history of the doc doesn't include the target version")]
2120 TargetVersionNotIncluded,
2121}
2122
2123impl LoroDoc {
2124 pub fn travel_change_ancestors(
2125 &self,
2126 ids: &[ID],
2127 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
2128 ) -> Result<(), ChangeTravelError> {
2129 let (options, guard) = self.implicit_commit_then_stop();
2130 drop(guard);
2131 struct PendingNode(ChangeMeta);
2132 impl PartialEq for PendingNode {
2133 fn eq(&self, other: &Self) -> bool {
2134 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
2135 }
2136 }
2137
2138 impl Eq for PendingNode {}
2139 impl PartialOrd for PendingNode {
2140 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2141 Some(self.cmp(other))
2142 }
2143 }
2144
2145 impl Ord for PendingNode {
2146 fn cmp(&self, other: &Self) -> Ordering {
2147 self.0
2148 .lamport_last()
2149 .cmp(&other.0.lamport_last())
2150 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
2151 }
2152 }
2153
2154 for id in ids {
2155 let op_log = &self.oplog().lock();
2156 if !op_log.vv().includes_id(*id) {
2157 return Err(ChangeTravelError::TargetIdNotFound(*id));
2158 }
2159 if op_log.dag.shallow_since_vv().includes_id(*id) {
2160 return Err(ChangeTravelError::TargetVersionNotIncluded);
2161 }
2162 }
2163
2164 let mut visited = FxHashSet::default();
2165 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
2166 for id in ids {
2167 pending.push(PendingNode(ChangeMeta::from_change(
2168 &self.oplog().lock().get_change_at(*id).unwrap(),
2169 )));
2170 }
2171 while let Some(PendingNode(node)) = pending.pop() {
2172 let deps = node.deps.clone();
2173 if f(node).is_break() {
2174 break;
2175 }
2176
2177 for dep in deps.iter() {
2178 let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
2179 continue;
2180 };
2181 if visited.contains(&dep_node.id) {
2182 continue;
2183 }
2184
2185 visited.insert(dep_node.id);
2186 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
2187 }
2188 }
2189
2190 let ans = Ok(());
2191 self.renew_txn_if_auto_commit(options);
2192 ans
2193 }
2194
2195 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
2196 self.with_barrier(|| {
2197 let mut set = FxHashSet::default();
2198 let len = i64::try_from(len).unwrap_or(i64::MAX);
2199 let start = i64::from(id.counter);
2200 let end = start.saturating_add(len);
2201 if end <= 0 {
2202 return set;
2203 }
2204
2205 let start = start.max(0).min(i64::from(i32::MAX));
2206 let end = end.max(0).min(i64::from(i32::MAX));
2207 if start >= end {
2208 return set;
2209 }
2210
2211 {
2212 let oplog = self.oplog().lock();
2213 let span = IdSpan::new(id.peer, start as i32, end as i32);
2214 for op in oplog.iter_ops(span) {
2215 let id = oplog.arena.get_container_id(op.container()).unwrap();
2216 set.insert(id);
2217 }
2218 }
2219 set
2220 })
2221 }
2222
2223 pub fn delete_root_container(&self, cid: ContainerID) {
2224 if !cid.is_root() {
2225 return;
2226 }
2227
2228 if !self.has_container(&cid) {
2230 return;
2231 }
2232
2233 let Some(h) = self.get_handler(cid.clone()) else {
2234 return;
2235 };
2236
2237 self.config
2238 .deleted_root_containers
2239 .lock()
2240 .insert(cid.clone());
2241 if let Err(e) = h.clear() {
2242 self.config.deleted_root_containers.lock().remove(&cid);
2243 eprintln!("Failed to clear handler: {:?}", e);
2244 }
2245 }
2246
2247 pub fn set_hide_empty_root_containers(&self, hide: bool) {
2248 self.config
2249 .hide_empty_root_containers
2250 .store(hide, std::sync::atomic::Ordering::Relaxed);
2251 }
2252}
2253
2254fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2255 let start_vv = oplog
2266 .dag
2267 .frontiers_to_vv(&id.into())
2268 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2269
2270 let mut best: Option<((loro_common::Lamport, loro_common::PeerID), ID)> = None;
2274
2275 for change in oplog.iter_changes_peer_by_peer(&start_vv, oplog.vv()) {
2276 let peer = change.peer();
2277 for op in change.ops.iter() {
2278 if op.container != idx {
2279 continue;
2280 }
2281 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2282 if d.id_start.to_span(d.atom_len()).contains(id) {
2283 debug_assert!(op.counter >= change.id().counter);
2284 let op_lamport =
2285 change.lamport + (op.counter - change.id().counter) as loro_common::Lamport;
2286 let key = (op_lamport, peer);
2287 if best.is_none_or(|(bk, _)| key > bk) {
2288 best = Some((key, ID::new(peer, op.counter)));
2289 }
2290 }
2291 }
2292 }
2293 }
2294
2295 best.map(|(_, op_id)| op_id)
2296}
2297
2298#[derive(Debug)]
2299pub struct CommitWhenDrop<'a> {
2300 doc: &'a LoroDoc,
2301 default_options: CommitOptions,
2302}
2303
2304impl Drop for CommitWhenDrop<'_> {
2305 fn drop(&mut self) {
2306 {
2307 let mut guard = self.doc.txn.lock();
2308 if let Some(txn) = guard.as_mut() {
2309 txn.set_default_options(std::mem::take(&mut self.default_options));
2310 };
2311 }
2312
2313 self.doc.commit_then_renew();
2314 }
2315}
2316
2317#[derive(Debug, Clone)]
2319pub struct CommitOptions {
2320 pub origin: Option<InternalString>,
2323
2324 pub immediate_renew: bool,
2327
2328 pub timestamp: Option<Timestamp>,
2331
2332 pub commit_msg: Option<Arc<str>>,
2334}
2335
2336impl CommitOptions {
2337 pub fn new() -> Self {
2339 Self {
2340 origin: None,
2341 immediate_renew: true,
2342 timestamp: None,
2343 commit_msg: None,
2344 }
2345 }
2346
2347 pub fn origin(mut self, origin: &str) -> Self {
2349 self.origin = Some(origin.into());
2350 self
2351 }
2352
2353 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2355 self.immediate_renew = immediate_renew;
2356 self
2357 }
2358
2359 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2363 self.timestamp = Some(timestamp);
2364 self
2365 }
2366
2367 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2369 self.commit_msg = Some(commit_msg.into());
2370 self
2371 }
2372
2373 pub fn set_origin(&mut self, origin: Option<&str>) {
2375 self.origin = origin.map(|x| x.into())
2376 }
2377
2378 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2380 self.timestamp = timestamp;
2381 }
2382}
2383
2384impl Default for CommitOptions {
2385 fn default() -> Self {
2386 Self::new()
2387 }
2388}
2389
2390#[cfg(test)]
2391mod test {
2392 use std::{
2393 panic::AssertUnwindSafe,
2394 sync::{
2395 atomic::{AtomicUsize, Ordering},
2396 Arc,
2397 },
2398 };
2399
2400 use crate::{
2401 cursor::PosType,
2402 encoding::json_schema::json::{JsonOpContent, JsonSchema, ListOp},
2403 encoding::{fast_snapshot::EMPTY_MARK, EncodeMode},
2404 loro::ExportMode,
2405 version::{Frontiers, VersionVector},
2406 LoroDoc, ToJson, TreeParentId,
2407 };
2408 use bytes::{BufMut, Bytes};
2409 use loro_common::ID;
2410 use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
2411
2412 const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
2413
2414 fn encode_import_blob(mode: EncodeMode, body: &[u8]) -> Vec<u8> {
2415 let mut ans = Vec::new();
2416 ans.extend_from_slice(b"loro");
2417 ans.extend_from_slice(&[0; 16]);
2418 ans.extend_from_slice(&mode.to_bytes());
2419 ans.extend_from_slice(body);
2420 let checksum = xxhash_rust::xxh32::xxh32(&ans[20..], XXH_SEED);
2421 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
2422 ans
2423 }
2424
2425 fn encode_fast_snapshot_import(oplog_bytes: &[u8]) -> Vec<u8> {
2426 let mut body = Vec::new();
2427 body.put_u32_le(oplog_bytes.len() as u32);
2428 body.extend_from_slice(oplog_bytes);
2429 body.put_u32_le(EMPTY_MARK.len() as u32);
2430 body.extend_from_slice(EMPTY_MARK);
2431 body.put_u32_le(0);
2432 encode_import_blob(EncodeMode::FastSnapshot, &body)
2433 }
2434
2435 fn sstable_with_huge_meta_block_count() -> Vec<u8> {
2436 let mut bytes = Vec::new();
2437 bytes.extend_from_slice(b"LORO");
2438 bytes.push(0);
2439 bytes.put_u32_le(10_000_000);
2440 bytes.put_u32_le(xxhash_rust::xxh32::xxh32(&[], XXH_SEED));
2441 bytes.put_u32_le(5);
2442 bytes
2443 }
2444
2445 fn snapshot_oplog_with_malformed_block() -> Vec<u8> {
2446 let peer = 1;
2447 let id = ID::new(peer, 0);
2448 let vv = VersionVector::from_iter([(peer, 1)]);
2449 let frontiers = Frontiers::from_id(id);
2450 let mut store = MemKvStore::new(MemKvConfig::default());
2451 store.set(b"vv", vv.encode().into());
2452 store.set(b"fr", frontiers.encode().into());
2453 store.set(&id.to_bytes(), Bytes::from_static(&[0]));
2454 store.export_all().to_vec()
2455 }
2456
2457 fn make_json_import_stress_doc(peer: u64) -> LoroDoc {
2458 let doc = LoroDoc::new_auto_commit();
2459 doc.set_peer_id(peer).unwrap();
2460
2461 let text = doc.get_text("text");
2462 let mut text_pos = 0;
2463 for i in 0..32 {
2464 let chunk = format!("segment-{i}-abcdefghijklmnopqrstuvwxyz;");
2465 text.insert_unicode(text_pos, &chunk).unwrap();
2466 text_pos += chunk.chars().count();
2467 }
2468
2469 let list = doc.get_list("list");
2470 for i in 0..32 {
2471 list.insert(i, format!("item-{i}")).unwrap();
2472 }
2473
2474 let map = doc.get_map("map");
2475 for i in 0..32 {
2476 let key = format!("key-{i}");
2477 map.insert(&key, format!("value-{i}")).unwrap();
2478 }
2479
2480 let tree = doc.get_tree("tree");
2481 let mut parent = TreeParentId::Root;
2482 for i in 0..16 {
2483 let node = tree.create(parent).unwrap();
2484 let meta = tree.get_meta(node).unwrap();
2485 meta.insert("name", format!("node-{i}")).unwrap();
2486 meta.insert("payload", format!("payload-{i}-{}", "x".repeat(16)))
2487 .unwrap();
2488 parent = TreeParentId::Node(node);
2489 }
2490
2491 doc
2492 }
2493
2494 fn make_json_list_update_with_four_ops(peer: u64) -> (LoroDoc, JsonSchema) {
2495 let doc = LoroDoc::new();
2496 doc.set_peer_id(peer).unwrap();
2497 let map = doc.get_map("map");
2498 let list = doc.get_list("list");
2499 let text = doc.get_text("text");
2500
2501 let mut txn = doc.txn().unwrap();
2502 map.insert_with_txn(&mut txn, "prefix", "map-value".into())
2503 .unwrap();
2504 list.insert_with_txn(&mut txn, 0, "seed".into()).unwrap();
2505 text.insert_with_txn(&mut txn, 0, "text-value", PosType::Unicode)
2506 .unwrap();
2507 list.insert_with_txn(&mut txn, 1, "tail".into()).unwrap();
2508 txn.commit().unwrap();
2509
2510 let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv(), false);
2511 assert_eq!(json.changes.len(), 1);
2512 assert_eq!(json.changes[0].ops.len(), 4);
2513 (doc, json)
2514 }
2515
2516 fn move_last_list_insert_far_out_of_bounds(json: &mut JsonSchema) {
2517 let last_change = json.changes.last_mut().unwrap();
2518 let last_op = last_change.ops.last_mut().unwrap();
2519 match &mut last_op.content {
2520 JsonOpContent::List(ListOp::Insert { pos, .. }) => {
2521 *pos = 1_000;
2522 }
2523 other => panic!("expected list insert op, got {other:?}"),
2524 }
2525 }
2526
2527 #[test]
2528 fn test_sync() {
2529 fn is_send_sync<T: Send + Sync>(_v: T) {}
2530 let loro = super::LoroDoc::new();
2531 is_send_sync(loro)
2532 }
2533
2534 #[test]
2535 fn import_rejects_huge_sstable_meta_block_count_without_panic() {
2536 let bytes = encode_fast_snapshot_import(&sstable_with_huge_meta_block_count());
2537
2538 let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2539 assert!(result.is_ok(), "malformed import should not panic");
2540 assert!(result.unwrap().is_err());
2541 }
2542
2543 #[test]
2544 fn import_rejects_malformed_change_block_without_panic() {
2545 let bytes = encode_fast_snapshot_import(&snapshot_oplog_with_malformed_block());
2546
2547 let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2548 assert!(result.is_ok(), "malformed import should not panic");
2549 assert!(result.unwrap().is_err());
2550 }
2551
2552 #[test]
2553 fn failed_import_rolls_back_oplog_and_arena() {
2554 let src = LoroDoc::new();
2555 src.set_peer_id(1).unwrap();
2556 let text = src.get_text("text");
2557 let mut txn = src.txn().unwrap();
2558 text.insert_with_txn(&mut txn, 0, "hello", PosType::Unicode)
2559 .unwrap();
2560 txn.commit().unwrap();
2561 let update = src.export(ExportMode::all_updates()).unwrap();
2562
2563 let dst = LoroDoc::new();
2564 let vv_before_import = dst.oplog_vv();
2565 let state_before_import = dst.get_deep_value();
2566 let err = dst
2567 .import_with(&update, "__loro_fail_import_state_apply".into())
2568 .unwrap_err();
2569 assert!(err.to_string().contains("state apply failpoint"));
2570 assert_eq!(dst.oplog_vv(), vv_before_import);
2571 assert_eq!(dst.get_deep_value(), state_before_import);
2572 assert!(dst.oplog().lock().is_empty());
2573
2574 dst.import(&update).unwrap();
2575 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2576 }
2577
2578 #[test]
2579 fn failed_incremental_import_restores_previous_change_store_block() {
2580 let src = LoroDoc::new();
2581 src.set_peer_id(1).unwrap();
2582 let text = src.get_text("text");
2583 let mut txn = src.txn().unwrap();
2584 text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
2585 .unwrap();
2586 txn.commit().unwrap();
2587 let first_update = src.export(ExportMode::all_updates()).unwrap();
2588 let first_vv = src.oplog_vv();
2589
2590 let mut txn = src.txn().unwrap();
2591 text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
2592 .unwrap();
2593 txn.commit().unwrap();
2594 let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
2595
2596 let dst = LoroDoc::new();
2597 dst.import(&first_update).unwrap();
2598 let vv_before_import = dst.oplog_vv();
2599 let state_before_import = dst.get_deep_value();
2600 dst.import_with(&second_update, "__loro_fail_import_state_apply".into())
2601 .unwrap_err();
2602 assert_eq!(dst.oplog_vv(), vv_before_import);
2603 assert_eq!(dst.get_deep_value(), state_before_import);
2604
2605 dst.import(&second_update).unwrap();
2606 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2607 }
2608
2609 #[test]
2610 fn failed_import_json_updates_rolls_back_complex_empty_doc() {
2611 let src = make_json_import_stress_doc(11);
2612 let json = src.export_json_updates(&Default::default(), &src.oplog_vv(), false);
2613
2614 let dst = LoroDoc::new();
2615 let vv_before_import = dst.oplog_vv();
2616 let frontiers_before_import = dst.oplog_frontiers();
2617 let state_before_import = dst.get_deep_value();
2618 for _ in 0..3 {
2619 crate::state::fail_next_import_state_apply_for_test();
2620 let err = dst.import_json_updates(json.clone()).unwrap_err();
2621 assert!(err.to_string().contains("state apply failpoint"));
2622 assert_eq!(dst.oplog_vv(), vv_before_import);
2623 assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2624 assert_eq!(dst.get_deep_value(), state_before_import);
2625 assert!(dst.oplog().lock().is_empty());
2626 }
2627
2628 dst.import_json_updates(json).unwrap();
2629 assert_eq!(dst.oplog_vv(), src.oplog_vv());
2630 assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2631 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2632 }
2633
2634 #[test]
2635 fn failed_incremental_import_json_updates_restores_previous_change_store_block() {
2636 let src = LoroDoc::new_auto_commit();
2637 src.set_peer_id(12).unwrap();
2638 let text = src.get_text("text");
2639 text.insert_unicode(0, "a").unwrap();
2640 let list = src.get_list("list");
2641 list.push("seed").unwrap();
2642 let map = src.get_map("map");
2643 map.insert("seed", "value").unwrap();
2644 let tree = src.get_tree("tree");
2645 let root = tree.create(TreeParentId::Root).unwrap();
2646 tree.get_meta(root).unwrap().insert("name", "root").unwrap();
2647
2648 let first_vv = src.oplog_vv();
2649 let first_json = src.export_json_updates(&Default::default(), &first_vv, false);
2650
2651 let mut text_pos = text.len_unicode();
2652 for i in 0..64 {
2653 let chunk = format!("chunk-{i};");
2654 text.insert_unicode(text_pos, &chunk).unwrap();
2655 text_pos += chunk.chars().count();
2656 }
2657 for i in 0..32 {
2658 list.push(format!("after-{i}")).unwrap();
2659 let key = format!("after-{i}");
2660 map.insert(&key, format!("value-{i}")).unwrap();
2661 }
2662 let child = tree.create(TreeParentId::Node(root)).unwrap();
2663 tree.get_meta(child)
2664 .unwrap()
2665 .insert("name", "child")
2666 .unwrap();
2667
2668 let second_json = src.export_json_updates(&first_vv, &src.oplog_vv(), false);
2669
2670 let dst = LoroDoc::new();
2671 dst.import_json_updates(first_json).unwrap();
2672 let vv_before_import = dst.oplog_vv();
2673 let frontiers_before_import = dst.oplog_frontiers();
2674 let state_before_import = dst.get_deep_value();
2675
2676 for _ in 0..2 {
2677 crate::state::fail_next_import_state_apply_for_test();
2678 let err = dst.import_json_updates(second_json.clone()).unwrap_err();
2679 assert!(err.to_string().contains("state apply failpoint"));
2680 assert_eq!(dst.oplog_vv(), vv_before_import);
2681 assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2682 assert_eq!(dst.get_deep_value(), state_before_import);
2683 }
2684
2685 dst.import_json_updates(second_json).unwrap();
2686 assert_eq!(dst.oplog_vv(), src.oplog_vv());
2687 assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2688 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2689 }
2690
2691 #[test]
2692 fn malformed_later_import_json_update_rolls_back_after_valid_prefix_enters_oplog() {
2693 let peer = 13;
2694 let (src, good_json) = make_json_list_update_with_four_ops(peer);
2695 let mut bad_json = good_json.clone();
2696 move_last_list_insert_far_out_of_bounds(&mut bad_json);
2697
2698 let good_dst = LoroDoc::new();
2699 good_dst.import_json_updates(good_json.clone()).unwrap();
2700 assert_eq!(good_dst.get_deep_value(), src.get_deep_value());
2701
2702 let last_op_counter = good_json.changes[0].ops.last().unwrap().counter;
2703 let prefix_vv = VersionVector::from_iter([(peer, last_op_counter)]);
2704 let prefix_json = src.export_json_updates(&Default::default(), &prefix_vv, false);
2705 assert_eq!(
2706 prefix_json.changes[0].ops.len(),
2707 good_json.changes[0].ops.len() - 1
2708 );
2709 let good_suffix_json = src.export_json_updates(&prefix_vv, &src.oplog_vv(), false);
2710 assert_eq!(good_suffix_json.changes[0].ops.len(), 1);
2711 let mut bad_suffix_json = good_suffix_json.clone();
2712 move_last_list_insert_far_out_of_bounds(&mut bad_suffix_json);
2713
2714 let prefix_dst = LoroDoc::new();
2715 prefix_dst.import_json_updates(prefix_json.clone()).unwrap();
2716 let vv_before_bad_suffix = prefix_dst.oplog_vv();
2717 let frontiers_before_bad_suffix = prefix_dst.oplog_frontiers();
2718 let state_before_bad_suffix = prefix_dst.get_deep_value();
2719
2720 let bad_suffix_json = serde_json::to_string(&bad_suffix_json).unwrap();
2721 let err = prefix_dst
2722 .import_json_updates(&bad_suffix_json)
2723 .unwrap_err();
2724 assert!(
2725 err.to_string().contains("list diff"),
2726 "expected state list bounds validation, got {err:?}"
2727 );
2728 assert_eq!(prefix_dst.oplog_vv(), vv_before_bad_suffix);
2729 assert_eq!(prefix_dst.oplog_frontiers(), frontiers_before_bad_suffix);
2730 assert_eq!(prefix_dst.get_deep_value(), state_before_bad_suffix);
2731
2732 prefix_dst.import_json_updates(good_suffix_json).unwrap();
2733 assert_eq!(prefix_dst.get_deep_value(), src.get_deep_value());
2734 assert_eq!(prefix_dst.oplog_vv(), src.oplog_vv());
2735
2736 let dst = LoroDoc::new();
2737 let vv_before_import = dst.oplog_vv();
2738 let frontiers_before_import = dst.oplog_frontiers();
2739 let state_before_import = dst.get_deep_value();
2740 let bad_json = serde_json::to_string(&bad_json).unwrap();
2741 let err = dst.import_json_updates(&bad_json).unwrap_err();
2742 assert!(
2743 err.to_string().contains("list diff"),
2744 "expected state list bounds validation, got {err:?}"
2745 );
2746 assert_eq!(dst.oplog_vv(), vv_before_import);
2747 assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2748 assert_eq!(dst.get_deep_value(), state_before_import);
2749 assert!(dst.oplog().lock().is_empty());
2750 }
2751
2752 #[test]
2753 fn failed_import_restores_pending_changes_that_were_applied_during_import() {
2754 let src = LoroDoc::new();
2755 src.set_peer_id(14).unwrap();
2756 let text = src.get_text("text");
2757
2758 let mut txn = src.txn().unwrap();
2759 text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
2760 .unwrap();
2761 txn.commit().unwrap();
2762 let first_update = src.export(ExportMode::all_updates()).unwrap();
2763 let first_vv = src.oplog_vv();
2764
2765 let mut txn = src.txn().unwrap();
2766 text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
2767 .unwrap();
2768 txn.commit().unwrap();
2769 let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
2770
2771 let dst = LoroDoc::new();
2772 let status = dst.import(&second_update).unwrap();
2773 assert!(status.success.is_empty());
2774 assert!(status.pending.is_some());
2775 let vv_before_dependency = dst.oplog_vv();
2776 let frontiers_before_dependency = dst.oplog_frontiers();
2777 let state_before_dependency = dst.get_deep_value();
2778
2779 crate::state::fail_next_import_state_apply_for_test();
2780 let err = dst.import(&first_update).unwrap_err();
2781 assert!(err.to_string().contains("state apply failpoint"));
2782 assert_eq!(dst.oplog_vv(), vv_before_dependency);
2783 assert_eq!(dst.oplog_frontiers(), frontiers_before_dependency);
2784 assert_eq!(dst.get_deep_value(), state_before_dependency);
2785
2786 dst.import(&first_update).unwrap();
2787 assert_eq!(dst.oplog_vv(), src.oplog_vv());
2788 assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2789 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2790 }
2791
2792 #[test]
2793 fn failed_import_json_updates_does_not_emit_or_leave_events() {
2794 let (src, good_json) = make_json_list_update_with_four_ops(15);
2795 let mut bad_json = good_json.clone();
2796 move_last_list_insert_far_out_of_bounds(&mut bad_json);
2797
2798 let dst = LoroDoc::new();
2799 let event_count = Arc::new(AtomicUsize::new(0));
2800 let event_count_cloned = event_count.clone();
2801 let _sub = dst.subscribe_root(Arc::new(move |_| {
2802 event_count_cloned.fetch_add(1, Ordering::SeqCst);
2803 }));
2804
2805 let bad_json = serde_json::to_string(&bad_json).unwrap();
2806 let err = dst.import_json_updates(&bad_json).unwrap_err();
2807 assert!(
2808 err.to_string().contains("list diff"),
2809 "expected state list bounds validation, got {err:?}"
2810 );
2811 assert_eq!(event_count.load(Ordering::SeqCst), 0);
2812 assert!(dst.drop_pending_events().is_empty());
2813 assert!(dst.oplog().lock().is_empty());
2814
2815 dst.import_json_updates(good_json).unwrap();
2816 assert_eq!(event_count.load(Ordering::SeqCst), 1);
2817 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2818 }
2819
2820 #[test]
2821 fn test_checkout() {
2822 let loro = LoroDoc::new();
2823 loro.set_peer_id(1).unwrap();
2824 let text = loro.get_text("text");
2825 let map = loro.get_map("map");
2826 let list = loro.get_list("list");
2827 let mut txn = loro.txn().unwrap();
2828 for i in 0..10 {
2829 map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2830 text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
2831 .unwrap();
2832 list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2833 }
2834 txn.commit().unwrap();
2835 let b = LoroDoc::new();
2836 b.import(&loro.export(ExportMode::Snapshot).unwrap())
2837 .unwrap();
2838 loro.checkout(&Frontiers::default()).unwrap();
2839 {
2840 let json = &loro.get_deep_value();
2841 assert_eq!(
2842 json.to_json_value(),
2843 serde_json::json!({"text":"","list":[],"map":{}})
2844 );
2845 }
2846
2847 b.checkout(&ID::new(1, 2).into()).unwrap();
2848 {
2849 let json = &b.get_deep_value();
2850 assert_eq!(
2851 json.to_json_value(),
2852 serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
2853 );
2854 }
2855
2856 loro.checkout(&ID::new(1, 3).into()).unwrap();
2857 {
2858 let json = &loro.get_deep_value();
2859 assert_eq!(
2860 json.to_json_value(),
2861 serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
2862 );
2863 }
2864
2865 b.checkout(&ID::new(1, 29).into()).unwrap();
2866 {
2867 let json = &b.get_deep_value();
2868 assert_eq!(
2869 json.to_json_value(),
2870 serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
2871 );
2872 }
2873 }
2874
2875 #[test]
2876 fn import_batch_err_181() {
2877 let a = LoroDoc::new_auto_commit();
2878 let update_a = a.export(ExportMode::Snapshot);
2879 let b = LoroDoc::new_auto_commit();
2880 b.import_batch(&[update_a.unwrap()]).unwrap();
2881 b.get_text("text")
2882 .insert(0, "hello", PosType::Unicode)
2883 .unwrap();
2884 b.commit_then_renew();
2885 let oplog = b.oplog().lock();
2886 drop(oplog);
2887 b.export(ExportMode::all_updates()).unwrap();
2888 }
2889
2890 #[test]
2891 fn poisoned_mutex_keeps_follow_up_operations_failed() {
2892 let doc = LoroDoc::new();
2893 let oplog = doc.oplog.clone();
2894 let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
2895 let _guard = oplog.lock();
2896 panic!("poison oplog");
2897 }));
2898
2899 let err = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
2900 .expect_err("poisoned lock should continue to fail fast");
2901 let msg = if let Some(msg) = err.downcast_ref::<&str>() {
2902 (*msg).to_string()
2903 } else if let Some(msg) = err.downcast_ref::<String>() {
2904 msg.clone()
2905 } else {
2906 String::new()
2907 };
2908 assert!(msg.contains("poisoned LoroMutex"), "{msg}");
2909 }
2910}