1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::{AtomicBool, AtomicUsize};
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::{Change, Timestamp},
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
48 LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 #[inline]
89 pub fn with_barrier<F, R>(&self, f: F) -> R
90 where
91 F: FnOnce() -> R,
92 {
93 let (options, guard) = self.implicit_commit_then_stop();
94 let result = f();
95 drop(guard);
96 self.renew_txn_if_auto_commit(options);
97 result
98 }
99
/// Creates an empty document.
///
/// Both the `detached` and `auto_commit` flags start as `false`, and no
/// transaction is open (the txn slot is created with `None`).
pub fn new() -> Self {
    // One counter shared by the oplog and the doc inner.
    let visible_op_count = Arc::new(AtomicUsize::new(0));
    let oplog = OpLog::new(visible_op_count.clone());
    // The arena and configuration come from the oplog and are shared with the state.
    let arena = oplog.arena.clone();
    let config: Configure = oplog.configure.clone();
    // Every doc-level lock is created from the same group, each tagged with a LockKind
    // (presumably for lock-order checking — confirm in `crate::lock`).
    let lock_group = LoroLockGroup::new();
    let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
    // `new_cyclic` gives DocState a weak reference (`w`) back to the inner doc
    // before the inner doc itself exists.
    let inner = Arc::new_cyclic(|w| {
        let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
        LoroDocInner {
            oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
            state,
            config,
            visible_op_count,
            detached: AtomicBool::new(false),
            auto_commit: AtomicBool::new(false),
            observer: Arc::new(Observer::new(arena.clone())),
            diff_calculator: Arc::new(
                lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
            ),
            txn: global_txn,
            arena,
            local_update_subs: SubscriberSetWithQueue::new(),
            peer_id_change_subs: SubscriberSetWithQueue::new(),
            pre_commit_subs: SubscriberSetWithQueue::new(),
            first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
        }
    });
    LoroDoc { inner }
}
130
131 pub fn fork(&self) -> Self {
132 if self.is_detached() {
133 return self
134 .fork_at(&self.state_frontiers())
135 .expect("fork_at on detached doc should not fail");
136 }
137
138 let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
139 let doc = Self::new();
140 doc.with_barrier(|| {
141 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
142 })
143 .unwrap();
144 doc.set_config(&self.config);
145 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
146 doc.start_auto_commit();
147 }
148 doc
149 }
150 pub fn set_detached_editing(&self, enable: bool) {
166 self.config.set_detached_editing(enable);
167 if enable && self.is_detached() {
168 self.with_barrier(|| {
169 self.renew_peer_id();
170 });
171 }
172 }
173
174 #[inline]
176 pub fn new_auto_commit() -> Self {
177 let doc = Self::new();
178 doc.start_auto_commit();
179 doc
180 }
181
182 #[inline(always)]
183 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
184 if peer == PeerID::MAX {
185 return Err(LoroError::InvalidPeerID);
186 }
187 let next_id = self.oplog.lock().next_id(peer);
188 if self.auto_commit.load(Acquire) {
189 let doc_state = self.state.lock();
190 doc_state
191 .peer
192 .store(peer, std::sync::atomic::Ordering::Relaxed);
193
194 if doc_state.is_in_txn() {
195 drop(doc_state);
196 self.with_barrier(|| {});
198 }
199 self.peer_id_change_subs.emit(&(), next_id);
200 return Ok(());
201 }
202
203 let doc_state = self.state.lock();
204 if doc_state.is_in_txn() {
205 return Err(LoroError::TransactionError(
206 "Cannot change peer id during transaction"
207 .to_string()
208 .into_boxed_str(),
209 ));
210 }
211
212 doc_state
213 .peer
214 .store(peer, std::sync::atomic::Ordering::Relaxed);
215 drop(doc_state);
216 self.peer_id_change_subs.emit(&(), next_id);
217 Ok(())
218 }
219
220 pub(crate) fn renew_peer_id(&self) {
222 let mut peer_id = DefaultRandom.next_u64();
223 while peer_id == PeerID::MAX {
224 peer_id = DefaultRandom.next_u64();
225 }
226 self.set_peer_id(peer_id).unwrap();
227 }
228
229 #[inline]
241 #[must_use]
242 pub fn implicit_commit_then_stop(
243 &self,
244 ) -> (
245 Option<CommitOptions>,
246 LoroMutexGuard<'_, Option<Transaction>>,
247 ) {
248 let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
250 (a, b.unwrap())
251 }
252
253 #[inline]
258 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
259 self.commit_internal(CommitOptions::new().immediate_renew(true), false)
261 .0
262 }
263
264 fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
269 let mut txn_guard = self.txn.lock();
270 let Some(txn) = txn_guard.as_mut() else {
271 return Some(txn_guard);
272 };
273
274 if txn.is_peer_first_appearance {
275 txn.is_peer_first_appearance = false;
276 drop(txn_guard);
277 self.first_commit_from_peer_subs.emit(
279 &(),
280 FirstCommitFromPeerPayload {
281 peer: self.peer_id(),
282 },
283 );
284 }
285
286 None
287 }
288
/// Shared commit routine behind `commit_with`, `commit_then_renew` and
/// `implicit_commit_then_stop`.
///
/// Returns `(options, guard)`:
/// - `options` are the commit options produced by `txn.commit()`. The origin is
///   cleared when `config` supplied one, and `options` becomes `None` unless
///   `preserve_on_empty` is set.
/// - `guard` is the locked txn slot when auto-commit is off, when there is no
///   transaction, or when `config.immediate_renew` is false; otherwise `None`.
///
/// # Panics
/// Panics if `txn.commit()` returns `Err`.
#[instrument(skip_all)]
fn commit_internal(
    &self,
    config: CommitOptions,
    preserve_on_empty: bool,
) -> (
    Option<CommitOptions>,
    Option<LoroMutexGuard<'_, Option<Transaction>>>,
) {
    // Without auto-commit nothing is committed here; just hand back the slot.
    if !self.auto_commit.load(Acquire) {
        let txn_guard = self.txn.lock();
        return (None, Some(txn_guard));
    }

    loop {
        // No transaction to commit: return the locked, empty slot.
        if let Some(txn_guard) = self.before_commit() {
            return (None, Some(txn_guard));
        }

        let mut txn_guard = self.txn.lock();
        let txn = txn_guard.take();
        let Some(mut txn) = txn else {
            return (None, Some(txn_guard));
        };
        let on_commit = txn.take_on_commit();
        // Options from `config` override whatever was set on the transaction.
        if let Some(origin) = config.origin.clone() {
            txn.set_origin(origin);
        }

        if let Some(timestamp) = config.timestamp {
            txn.set_timestamp(timestamp);
        }

        if let Some(msg) = config.commit_msg.as_ref() {
            txn.set_msg(Some(msg.clone()));
        }

        let id_span = txn.id_span();
        let mut options = txn.commit().unwrap();
        if let Some(opts) = options.as_mut() {
            // An origin supplied by `config` is not carried into the next transaction.
            if config.origin.is_some() {
                opts.set_origin(None);
            }
            // Only preserving callers get the carried-over options back.
            if !preserve_on_empty {
                options = None;
            }
        }
        if config.immediate_renew {
            if self.can_edit() {
                let mut t = self.txn().unwrap();
                if let Some(options) = options.as_ref() {
                    t.set_options(options.clone());
                }
                *txn_guard = Some(t);
            }
        }

        if let Some(on_commit) = on_commit {
            // Run the callback without the txn lock so it may touch the doc.
            drop(txn_guard);
            on_commit(&self.state, &self.oplog, id_span);
            txn_guard = self.txn.lock();
            // The callback opened a new transaction while we were stopping:
            // loop to commit it too. NOTE(review): `options` from this
            // iteration are discarded on `continue` — confirm that is intended.
            if !config.immediate_renew && txn_guard.is_some() {
                continue;
            }
        }

        return (
            options,
            if !config.immediate_renew {
                Some(txn_guard)
            } else {
                None
            },
        );
    }
}
378
379 #[instrument(skip_all)]
384 pub fn commit_with(
385 &self,
386 config: CommitOptions,
387 ) -> (
388 Option<CommitOptions>,
389 Option<LoroMutexGuard<'_, Option<Transaction>>>,
390 ) {
391 self.commit_internal(config, false)
392 }
393
394 pub fn set_next_commit_message(&self, message: &str) {
396 let mut binding = self.txn.lock();
397 let Some(txn) = binding.as_mut() else {
398 return;
399 };
400
401 if message.is_empty() {
402 txn.set_msg(None)
403 } else {
404 txn.set_msg(Some(message.into()))
405 }
406 }
407
408 pub fn set_next_commit_origin(&self, origin: &str) {
410 let mut txn = self.txn.lock();
411 if let Some(txn) = txn.as_mut() {
412 txn.set_origin(origin.into());
413 }
414 }
415
416 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
418 let mut txn = self.txn.lock();
419 if let Some(txn) = txn.as_mut() {
420 txn.set_timestamp(timestamp);
421 }
422 }
423
424 pub fn set_next_commit_options(&self, options: CommitOptions) {
426 let mut txn = self.txn.lock();
427 if let Some(txn) = txn.as_mut() {
428 txn.set_options(options);
429 }
430 }
431
432 pub fn clear_next_commit_options(&self) {
434 let mut txn = self.txn.lock();
435 if let Some(txn) = txn.as_mut() {
436 txn.set_options(CommitOptions::new());
437 }
438 }
439
440 #[inline]
451 pub fn set_record_timestamp(&self, record: bool) {
452 self.config.set_record_timestamp(record);
453 }
454
455 #[inline]
460 pub fn set_change_merge_interval(&self, interval: i64) {
461 self.config.set_merge_interval(interval);
462 }
463
464 pub fn can_edit(&self) -> bool {
465 !self.is_detached() || self.config.detached_editing()
466 }
467
468 pub fn is_detached_editing_enabled(&self) -> bool {
469 self.config.detached_editing()
470 }
471
472 #[inline]
473 pub fn config_text_style(&self, text_style: StyleConfigMap) {
474 self.config.text_style_config.write().map = text_style.map;
475 }
476
477 #[inline]
478 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
479 self.config.text_style_config.write().default_style = text_style;
480 }
481 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
482 let doc = Self::new();
483 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
484 if mode.is_snapshot() {
485 doc.with_barrier(|| -> Result<(), LoroError> {
486 decode_snapshot(&doc, mode, body, Default::default())?;
487 Ok(())
488 })?;
489 Ok(doc)
490 } else {
491 Err(LoroError::DecodeError(
492 "Invalid encode mode".to_string().into(),
493 ))
494 }
495 }
496
497 #[inline(always)]
499 pub fn can_reset_with_snapshot(&self) -> bool {
500 let oplog = self.oplog.lock();
501 if oplog.batch_importing {
502 return false;
503 }
504
505 if self.is_detached() {
506 return false;
507 }
508
509 oplog.is_empty() && self.state.lock().can_import_snapshot()
510 }
511
512 #[inline(always)]
518 pub fn is_detached(&self) -> bool {
519 self.detached.load(Acquire)
520 }
521
522 pub(crate) fn set_detached(&self, detached: bool) {
523 self.detached.store(detached, Release);
524 }
525
526 #[inline(always)]
527 pub fn peer_id(&self) -> PeerID {
528 self.state
529 .lock()
530 .peer
531 .load(std::sync::atomic::Ordering::Relaxed)
532 }
533
534 #[inline(always)]
535 pub fn detach(&self) {
536 self.with_barrier(|| self.set_detached(true));
537 }
538
539 #[inline(always)]
540 pub fn attach(&self) {
541 self.checkout_to_latest()
542 }
543
544 pub fn state_timestamp(&self) -> Timestamp {
547 let f = { self.state.lock().frontiers.clone() };
549 self.oplog.lock().get_timestamp_of_version(&f)
550 }
551
552 #[inline(always)]
553 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
554 &self.state
555 }
556
557 #[inline]
558 pub fn get_state_deep_value(&self) -> LoroValue {
559 self.state.lock().get_deep_value()
560 }
561
562 #[inline(always)]
563 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
564 &self.oplog
565 }
566
567 #[inline(always)]
568 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
569 let s = debug_span!("import", peer = self.peer_id());
570 let _e = s.enter();
571 self.import_with(bytes, Default::default())
572 }
573
574 #[inline]
575 pub fn import_with(
576 &self,
577 bytes: &[u8],
578 origin: InternalString,
579 ) -> Result<ImportStatus, LoroError> {
580 self.with_barrier(|| self._import_with(bytes, origin))
581 }
582
/// Parses the blob header and dispatches on the encode mode.
///
/// - `OutdatedRle`: rejected with `ImportWhenInTxn` while the state is in a
///   transaction; otherwise decoded into the oplog.
/// - `OutdatedSnapshot` / `FastSnapshot`: used to initialize the document when
///   `can_reset_with_snapshot()` holds; otherwise merged like updates.
/// - `FastUpdates`: decoded into changes and merged.
///
/// Pending state events are emitted after the dispatch whether it succeeded or
/// not. Early exits (header parse error, `ImportWhenInTxn`) skip emission.
#[tracing::instrument(skip_all)]
fn _import_with(
    &self,
    bytes: &[u8],
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    ensure_cov::notify_cov("loro_internal::import");
    let parsed = parse_header_and_body(bytes, true)?;
    loro_common::info!("Importing with mode={:?}", &parsed.mode);
    let result = match parsed.mode {
        EncodeMode::OutdatedRle => {
            if self.state.lock().is_in_txn() {
                return Err(LoroError::ImportWhenInTxn);
            }

            let s = tracing::span!(
                tracing::Level::INFO,
                "Import updates ",
                peer = self.peer_id()
            );
            let _e = s.enter();
            self.update_oplog_and_apply_delta_to_state_if_needed(
                |oplog| oplog.decode(parsed),
                origin,
            )
        }
        EncodeMode::OutdatedSnapshot => {
            if self.can_reset_with_snapshot() {
                loro_common::info!("Init by snapshot {}", self.peer_id());
                decode_snapshot(self, parsed.mode, parsed.body, origin)
            } else {
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
        }
        EncodeMode::FastSnapshot => {
            if self.can_reset_with_snapshot() {
                ensure_cov::notify_cov("loro_internal::import::snapshot");
                loro_common::info!("Init by fast snapshot {}", self.peer_id());
                decode_snapshot(self, parsed.mode, parsed.body, origin)
            } else {
                // The doc already has history: treat the snapshot as a batch of changes.
                self.import_changes_and_apply_delta_to_state_if_needed(
                    |oplog| encoding::decode_oplog_changes(oplog, parsed),
                    origin,
                )
            }
        }
        EncodeMode::FastUpdates => self.import_changes_and_apply_delta_to_state_if_needed(
            |oplog| encoding::decode_oplog_changes(oplog, parsed),
            origin,
        ),
        // Presumably `parse_header_and_body` never yields `Auto` — confirm there.
        EncodeMode::Auto => {
            unreachable!()
        }
    };

    self.emit_events();

    result
}
650
/// Runs `f` to mutate the oplog and, when attached, applies the resulting
/// version delta to the state, all inside an oplog import-rollback scope.
///
/// Rollback policy:
/// - If applying the diff to the state fails, the import is rolled back and
///   the state error is returned.
/// - If `f` fails without changing the oplog version vector, it is rolled back.
///   If it fails after changing it, the partial import is kept and `f`'s
///   error is still returned.
/// - When detached, only the oplog is touched; `f` errors roll back.
#[tracing::instrument(skip_all)]
pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
    &self,
    f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    let mut oplog = self.oplog.lock();
    oplog.begin_import_rollback();
    if !self.is_detached() {
        let old_vv = oplog.vv().clone();
        let old_frontiers = oplog.frontiers().clone();
        let result = f(&mut oplog);
        // Only recompute the state when the import actually added ops.
        if &old_vv != oplog.vv() {
            let mut diff = DiffCalculator::new(false);
            let (diff, diff_mode) = diff.calc_diff_internal(
                &oplog,
                &old_vv,
                &old_frontiers,
                oplog.vv(),
                oplog.dag.get_frontiers(),
                None,
            );
            let mut state = self.state.lock();
            if let Err(e) = state.apply_diff(
                InternalDocDiff {
                    origin,
                    diff: (diff).into(),
                    by: EventTriggerKind::Import,
                    new_version: Cow::Owned(oplog.frontiers().clone()),
                },
                diff_mode,
            ) {
                oplog.rollback_import();
                return Err(e);
            }
        }
        match result {
            Ok(result) => {
                oplog.commit_import_rollback();
                Ok(result)
            }
            Err(e) => {
                // Nothing was imported: undo any partial oplog/arena bookkeeping.
                // Otherwise keep what was imported (the state already reflects it).
                if &old_vv == oplog.vv() {
                    oplog.rollback_import();
                } else {
                    oplog.commit_import_rollback();
                }
                Err(e)
            }
        }
    } else {
        // Detached: the state is not updated here.
        match f(&mut oplog) {
            Ok(result) => {
                oplog.commit_import_rollback();
                Ok(result)
            }
            Err(e) => {
                oplog.rollback_import();
                Err(e)
            }
        }
    }
}
718
/// Decodes changes with `decode_changes`, appends them to the oplog and, when
/// attached and the changes apply to the DAG, applies the version delta to
/// the state.
///
/// Decode failures roll back the arena to its pre-decode checkpoint. Changes
/// that depend on history before the shallow root are rejected with
/// `ImportUpdatesThatDependsOnOutdatedVersion`. That happens up front when the
/// preflight detects it (detached, or not applicable to the DAG), and
/// afterwards when the apply reports it.
///
/// # Panics
/// Panics if the state rejects the diff while no rollback scope was opened
/// (`preflight.needs_state_apply_rollback == false`).
#[tracing::instrument(skip_all)]
pub(crate) fn import_changes_and_apply_delta_to_state_if_needed(
    &self,
    decode_changes: impl FnOnce(&mut OpLog) -> Result<Vec<Change>, LoroError>,
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    let mut oplog = self.oplog.lock();
    let arena_checkpoint = oplog.arena.checkpoint_for_rollback();
    let changes = match decode_changes(&mut oplog) {
        Ok(changes) => changes,
        Err(e) => {
            oplog.arena.rollback(arena_checkpoint);
            return Err(e);
        }
    };

    let preflight = oplog.preflight_import_changes(&changes);
    if preflight.has_deps_before_shallow_root
        && (self.is_detached() || !preflight.applies_to_dag)
    {
        oplog.arena.rollback(arena_checkpoint);
        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
    }

    // Detached: only the oplog is updated.
    // NOTE(review): unlike the `!applies_to_dag` branch below, this path does
    // not roll back the arena when `has_deps_before_shallow_root` is reported
    // after applying — confirm whether that is intentional.
    if self.is_detached() {
        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
        if result.has_deps_before_shallow_root {
            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
        }

        return Ok(result.status);
    }

    // Changes that do not reach the DAG (presumably queued as pending until
    // their deps arrive — confirm) leave the state untouched.
    if !preflight.applies_to_dag {
        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
        if result.has_deps_before_shallow_root {
            oplog.arena.rollback(arena_checkpoint);
            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
        }

        return Ok(result.status);
    }

    let old_vv = oplog.vv().clone();
    let old_frontiers = oplog.frontiers().clone();
    // A rollback scope is only opened when the preflight says the state apply needs one.
    let rollback_enabled = preflight.needs_state_apply_rollback;
    if rollback_enabled {
        oplog.begin_import_rollback_with_arena(arena_checkpoint);
    }

    let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
    if &old_vv != oplog.vv() {
        let mut diff = DiffCalculator::new(false);
        let (diff, diff_mode) = diff.calc_diff_internal(
            &oplog,
            &old_vv,
            &old_frontiers,
            oplog.vv(),
            oplog.dag.get_frontiers(),
            None,
        );
        let mut state = self.state.lock();
        if let Err(e) = state.apply_diff(
            InternalDocDiff {
                origin,
                diff: (diff).into(),
                by: EventTriggerKind::Import,
                new_version: Cow::Owned(oplog.frontiers().clone()),
            },
            diff_mode,
        ) {
            if rollback_enabled {
                oplog.rollback_import();
                return Err(e);
            }

            // Without a rollback scope the oplog and state would now disagree.
            panic!("state apply returned Err for import without rollback guard: {e}");
        }
    }

    // The applicable part stays imported; the error reports the rest.
    if result.has_deps_before_shallow_root {
        if rollback_enabled {
            oplog.commit_import_rollback();
        }
        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
    }

    if rollback_enabled {
        oplog.commit_import_rollback();
    }
    Ok(result.status)
}
811
812 fn emit_events(&self) {
813 let events = {
815 let mut state = self.state.lock();
816 state.take_events()
817 };
818 for event in events {
819 self.observer.emit(event);
820 }
821 }
822
823 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
824 let mut state = self.state.lock();
825 state.take_events()
826 }
827
828 #[tracing::instrument(skip_all)]
832 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
833 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
834 self.with_barrier(|| {
835 let result = self.import_changes_and_apply_delta_to_state_if_needed(
836 |oplog| crate::encoding::json_schema::decode_json_changes(json, &oplog.arena),
837 Default::default(),
838 );
839 self.emit_events();
840 result
841 })
842 }
843
844 pub fn export_json_updates(
845 &self,
846 start_vv: &VersionVector,
847 end_vv: &VersionVector,
848 with_peer_compression: bool,
849 ) -> JsonSchema {
850 self.with_barrier(|| {
851 let oplog = self.oplog.lock();
852 let mut start_vv = start_vv;
853 let _temp: Option<VersionVector>;
854 if !oplog.dag.shallow_since_vv().is_empty() {
855 let mut include_all = true;
857 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
858 if start_vv.get(peer).unwrap_or(&0) < counter {
859 include_all = false;
860 break;
861 }
862 }
863 if !include_all {
864 let mut vv = start_vv.clone();
865 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
866 vv.extend_to_include_end_id(ID::new(peer, counter));
867 }
868 _temp = Some(vv);
869 start_vv = _temp.as_ref().unwrap();
870 }
871 }
872
873 crate::encoding::json_schema::export_json(
874 &oplog,
875 start_vv,
876 end_vv,
877 with_peer_compression,
878 )
879 })
880 }
881
882 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
883 let oplog = self.oplog.lock();
884 let mut changes = export_json_in_id_span(&oplog, id_span);
885 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
886 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
887 changes.push(change_json);
888 }
889 changes
890 }
891
892 #[inline]
894 pub fn oplog_vv(&self) -> VersionVector {
895 self.oplog.lock().vv().clone()
896 }
897
898 #[inline]
900 pub fn state_vv(&self) -> VersionVector {
901 let oplog = self.oplog.lock();
902 let f = &self.state.lock().frontiers;
903 oplog.dag.frontiers_to_vv(f).unwrap()
904 }
905
906 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
907 let value: LoroValue = self.state.lock().get_value_by_path(path)?;
908 if let LoroValue::Container(c) = value {
909 Some(ValueOrHandler::Handler(Handler::new_attached(
910 c.clone(),
911 self.clone(),
912 )))
913 } else {
914 Some(ValueOrHandler::Value(value))
915 }
916 }
917
918 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
920 let path = str_to_path(path)?;
921 self.get_by_path(&path)
922 }
923
924 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
925 let arena = &self.arena;
926 let txn = self.txn.lock();
927 let txn = txn.as_ref()?;
928 let ops_ = txn.local_ops();
929 let new_id = ID {
930 peer: *txn.peer(),
931 counter: ops_.first()?.counter,
932 };
933 let change = ChangeRef {
934 id: &new_id,
935 deps: txn.frontiers(),
936 timestamp: &txn
937 .timestamp()
938 .as_ref()
939 .copied()
940 .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
941 commit_msg: txn.msg(),
942 ops: ops_,
943 lamport: txn.lamport(),
944 };
945 let json = encode_change_to_json(change, arena);
946 Some(json)
947 }
948
949 #[inline]
950 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
951 if self.has_container(&id) {
952 Some(Handler::new_attached(id, self.clone()))
953 } else {
954 None
955 }
956 }
957
958 #[inline]
961 pub fn try_get_text<I: IntoContainerId>(&self, id: I) -> Option<TextHandler> {
962 let id = id.into_container_id(&self.arena, ContainerType::Text);
963 if !self.has_container(&id) {
964 return None;
965 }
966 Handler::new_attached(id, self.clone()).into_text().ok()
967 }
968
969 #[inline]
972 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
973 self.try_get_text(id)
974 .expect("The container does not exist in the document. Use `try_get_text` or `get_container` to check for existence.")
975 }
976
977 #[inline]
980 pub fn try_get_list<I: IntoContainerId>(&self, id: I) -> Option<ListHandler> {
981 let id = id.into_container_id(&self.arena, ContainerType::List);
982 if !self.has_container(&id) {
983 return None;
984 }
985 Handler::new_attached(id, self.clone()).into_list().ok()
986 }
987
988 #[inline]
991 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
992 self.try_get_list(id)
993 .expect("The container does not exist in the document. Use `try_get_list` or `get_container` to check for existence.")
994 }
995
996 #[inline]
999 pub fn try_get_movable_list<I: IntoContainerId>(&self, id: I) -> Option<MovableListHandler> {
1000 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
1001 if !self.has_container(&id) {
1002 return None;
1003 }
1004 Handler::new_attached(id, self.clone())
1005 .into_movable_list()
1006 .ok()
1007 }
1008
1009 #[inline]
1012 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
1013 self.try_get_movable_list(id)
1014 .expect("The container does not exist in the document. Use `try_get_movable_list` or `get_container` to check for existence.")
1015 }
1016
1017 #[inline]
1020 pub fn try_get_map<I: IntoContainerId>(&self, id: I) -> Option<MapHandler> {
1021 let id = id.into_container_id(&self.arena, ContainerType::Map);
1022 if !self.has_container(&id) {
1023 return None;
1024 }
1025 Handler::new_attached(id, self.clone()).into_map().ok()
1026 }
1027
1028 #[inline]
1031 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
1032 self.try_get_map(id)
1033 .expect("The container does not exist in the document. Use `try_get_map` or `get_container` to check for existence.")
1034 }
1035
1036 #[inline]
1039 pub fn try_get_tree<I: IntoContainerId>(&self, id: I) -> Option<TreeHandler> {
1040 let id = id.into_container_id(&self.arena, ContainerType::Tree);
1041 if !self.has_container(&id) {
1042 return None;
1043 }
1044 Handler::new_attached(id, self.clone()).into_tree().ok()
1045 }
1046
1047 #[inline]
1050 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
1051 self.try_get_tree(id)
1052 .expect("The container does not exist in the document. Use `try_get_tree` or `get_container` to check for existence.")
1053 }
1054
/// Returns the counter container for `id`, or `None` if it does not exist or
/// is not a counter.
#[cfg(feature = "counter")]
pub fn try_get_counter<I: IntoContainerId>(
    &self,
    id: I,
) -> Option<crate::handler::counter::CounterHandler> {
    let container_id = id.into_container_id(&self.arena, ContainerType::Counter);
    self.has_container(&container_id)
        .then(|| Handler::new_attached(container_id, self.clone()))
        .and_then(|handler| handler.into_counter().ok())
}
1066
/// Returns the counter container for `id`.
///
/// # Panics
/// Panics when `try_get_counter` returns `None`.
#[cfg(feature = "counter")]
pub fn get_counter<I: IntoContainerId>(
    &self,
    id: I,
) -> crate::handler::counter::CounterHandler {
    let maybe_counter = self.try_get_counter(id);
    maybe_counter.expect("The container does not exist in the document. Use `try_get_counter` or `get_container` to check for existence.")
}
1075
1076 #[must_use]
1077 pub fn has_container(&self, id: &ContainerID) -> bool {
1078 if id.is_root() {
1079 return true;
1080 }
1081
1082 let exist = self.state.lock().does_container_exist(id);
1083 exist
1084 }
1085
1086 #[instrument(level = "info", skip_all)]
1100 pub fn undo_internal(
1101 &self,
1102 id_span: IdSpan,
1103 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1104 post_transform_base: Option<&DiffBatch>,
1105 before_diff: &mut dyn FnMut(&DiffBatch),
1106 ) -> LoroResult<CommitWhenDrop<'_>> {
1107 if !self.can_edit() {
1108 return Err(LoroError::EditWhenDetached);
1109 }
1110
1111 let (options, txn) = self.implicit_commit_then_stop();
1112 if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
1113 self.renew_txn_if_auto_commit(options);
1114 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
1115 }
1116
1117 let (was_recording, latest_frontiers) = {
1118 let mut state = self.state.lock();
1119 let was_recording = state.is_recording();
1120 state.stop_and_clear_recording();
1121 (was_recording, state.frontiers.clone())
1122 };
1123
1124 let spans = self.oplog.lock().split_span_based_on_deps(id_span);
1125 let diff = crate::undo::undo(
1126 spans,
1127 match post_transform_base {
1128 Some(d) => Either::Right(d),
1129 None => Either::Left(&latest_frontiers),
1130 },
1131 |from, to| {
1132 self._checkout_without_emitting(from, false, false).unwrap();
1133 self.state.lock().start_recording();
1134 self._checkout_without_emitting(to, false, false).unwrap();
1135 let mut state = self.state.lock();
1136 let e = state.take_events();
1137 state.stop_and_clear_recording();
1138 DiffBatch::new(e)
1139 },
1140 before_diff,
1141 );
1142
1143 self._checkout_without_emitting(&latest_frontiers, false, false)?;
1147 self.set_detached(false);
1148 if was_recording {
1149 self.state.lock().start_recording();
1150 }
1151 drop(txn);
1152 self.start_auto_commit();
1153 if let Err(e) = self._apply_diff(diff, container_remap, true) {
1157 warn!("Undo Failed {:?}", e);
1158 }
1159
1160 if let Some(options) = options {
1161 self.set_next_commit_options(options);
1162 }
1163 Ok(CommitWhenDrop {
1164 doc: self,
1165 default_options: CommitOptions::new().origin("undo"),
1166 })
1167 }
1168
1169 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
1175 let f = self.state_frontiers();
1178 let diff = self.diff(&f, target)?;
1179 self._apply_diff(diff, &mut Default::default(), false)
1180 }
1181
1182 pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1187 {
1188 let oplog = self.oplog.lock();
1191 let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
1192 for id in frontiers.iter() {
1193 if !oplog.dag.contains(id) {
1194 return Err(LoroError::FrontiersNotFound(id));
1195 }
1196 }
1197
1198 if oplog.dag.is_before_shallow_root(frontiers) {
1199 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1200 }
1201
1202 Ok(())
1203 };
1204
1205 validate_frontiers(a)?;
1206 validate_frontiers(b)?;
1207 }
1208
1209 let (options, txn) = self.implicit_commit_then_stop();
1210 let was_detached = self.is_detached();
1211 let old_frontiers = self.state_frontiers();
1212 let was_recording = {
1213 let mut state = self.state.lock();
1214 let is_recording = state.is_recording();
1215 state.stop_and_clear_recording();
1216 is_recording
1217 };
1218 let result = (|| {
1219 self._checkout_without_emitting(a, true, false)?;
1220 self.state.lock().start_recording();
1221 self._checkout_without_emitting(b, true, false)?;
1222 let mut state = self.state.lock();
1223 let e = state.take_events();
1224 state.stop_and_clear_recording();
1225 Ok::<_, LoroError>(e)
1226 })();
1227
1228 self._checkout_without_emitting(&old_frontiers, false, false)
1230 .unwrap();
1231 drop(txn);
1232 if !was_detached {
1233 self.set_detached(false);
1234 self.renew_txn_if_auto_commit(options);
1235 }
1236 if was_recording {
1237 self.state.lock().start_recording();
1238 }
1239 result.map(DiffBatch::new)
1240 }
1241
1242 #[inline(always)]
1244 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1245 self._apply_diff(diff, &mut Default::default(), true)
1246 }
1247
/// Applies each per-container diff in `diff` through the container's handler.
///
/// Container ids are first followed through `container_remap`, repeatedly, so
/// chains of remaps resolve to the final id. Normal containers unknown to both
/// the arena and the state are collected and reported together. With
/// `skip_unreachable`, diffs for unremapped containers that are not reachable
/// in the current state are skipped.
///
/// # Errors
/// - `LoroError::EditWhenDetached` if editing is not allowed.
/// - `LoroError::ContainersNotFound` for missing containers (checked after
///   all diffs are attempted), or immediately if no handler can be created.
/// - Otherwise the last error returned by a handler's `apply_diff`.
pub(crate) fn _apply_diff(
    &self,
    diff: DiffBatch,
    container_remap: &mut FxHashMap<ContainerID, ContainerID>,
    skip_unreachable: bool,
) -> LoroResult<()> {
    if !self.can_edit() {
        return Err(LoroError::EditWhenDetached);
    }

    let mut ans: LoroResult<()> = Ok(());
    let mut missing_containers: Vec<ContainerID> = Vec::new();
    for (mut id, diff) in diff.into_iter() {
        let mut remapped = false;
        // NOTE(review): a cycle in `container_remap` would loop forever here.
        while let Some(rid) = container_remap.get(&id) {
            remapped = true;
            id = rid.clone();
        }

        // A normal container not yet registered in the arena may still exist
        // in the state; if so, register it, otherwise record it as missing.
        if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
            let exists = self.state.lock().does_container_exist(&id);
            if !exists {
                missing_containers.push(id);
                continue;
            }
            self.state.lock().ensure_container(&id);
        }

        // Remapped containers are always applied, even if unreachable.
        if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
            continue;
        }

        let Some(h) = self.get_handler(id.clone()) else {
            return Err(LoroError::ContainersNotFound {
                containers: Box::new(vec![id]),
            });
        };
        // Keep going after a failure; only the last error is reported.
        if let Err(e) = h.apply_diff(diff, container_remap) {
            ans = Err(e);
        }
    }

    if !missing_containers.is_empty() {
        return Err(LoroError::ContainersNotFound {
            containers: Box::new(missing_containers),
        });
    }

    ans
}
1311
    /// Forwards to the oplog's `diagnose_size`.
    ///
    /// NOTE(review): presumably prints size diagnostics for debugging; confirm in
    /// `OpLog::diagnose_size`.
    #[inline]
    pub fn diagnose_size(&self) {
        self.oplog().lock().diagnose_size();
    }
1317
    /// Returns a clone of the oplog's frontiers.
    ///
    /// This is the version that `checkout_to_latest` checks out to.
    #[inline]
    pub fn oplog_frontiers(&self) -> Frontiers {
        self.oplog().lock().frontiers().clone()
    }
1322
1323 #[inline]
1324 pub fn state_frontiers(&self) -> Frontiers {
1325 self.state.lock().frontiers.clone()
1326 }
1327
    /// Compares the oplog's version with `other`, delegating to
    /// `OpLog::cmp_with_frontiers`.
    #[inline]
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        self.oplog().lock().cmp_with_frontiers(other)
    }
1335
    /// Compares two versions `a` and `b` using the oplog.
    ///
    /// NOTE(review): `Ok(None)` presumably means the versions are concurrent, and
    /// `FrontiersNotIncluded` means one of them is not in the oplog. Confirm in
    /// `OpLog::cmp_frontiers`.
    #[inline]
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        self.oplog().lock().cmp_frontiers(a, b)
    }
1347
1348 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1349 let mut state = self.state.lock();
1350 if !state.is_recording() {
1351 state.start_recording();
1352 }
1353
1354 self.observer.subscribe_root(callback)
1355 }
1356
1357 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1358 let mut state = self.state.lock();
1359 if !state.is_recording() {
1360 state.start_recording();
1361 }
1362
1363 self.observer.subscribe(container_id, callback)
1364 }
1365
1366 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1367 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1368 activate();
1369 sub
1370 }
1371
1372 #[tracing::instrument(skip_all)]
1374 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1375 if bytes.is_empty() {
1376 return Ok(ImportStatus::default());
1377 }
1378
1379 if bytes.len() == 1 {
1380 return self.import(&bytes[0]);
1381 }
1382
1383 let mut success = VersionRange::default();
1384 let mut meta_arr = bytes
1385 .iter()
1386 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1387 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1388 meta_arr.sort_by(|a, b| {
1389 a.0.mode
1390 .cmp(&b.0.mode)
1391 .then(b.0.change_num.cmp(&a.0.change_num))
1392 });
1393
1394 let (options, txn) = self.implicit_commit_then_stop();
1395 let is_detached = self.is_detached();
1419 self.set_detached(true);
1420 self.oplog.lock().batch_importing = true;
1421 let mut err = None;
1422 for (_meta, data) in meta_arr {
1423 match self._import_with(data, Default::default()) {
1424 Ok(s) => {
1425 for (peer, (start, end)) in s.success.iter() {
1426 match success.0.entry(*peer) {
1427 Entry::Occupied(mut e) => {
1428 e.get_mut().1 = *end.max(&e.get().1);
1429 }
1430 Entry::Vacant(e) => {
1431 e.insert((*start, *end));
1432 }
1433 }
1434 }
1435 }
1436 Err(e) => {
1437 err = Some(e);
1438 }
1439 }
1440 }
1441
1442 let mut oplog = self.oplog.lock();
1443 oplog.batch_importing = false;
1444 let pending = oplog.pending_changes.version_range();
1445 drop(oplog);
1446 if !is_detached {
1447 self._checkout_to_latest_with_guard(txn);
1448 } else {
1449 drop(txn);
1450 }
1451
1452 self.renew_txn_if_auto_commit(options);
1453 if let Some(err) = err {
1454 return Err(err);
1455 }
1456
1457 Ok(ImportStatus {
1458 success,
1459 pending: if pending.is_empty() {
1460 None
1461 } else {
1462 Some(pending)
1463 },
1464 })
1465 }
1466
    /// Returns the value of the current document state, via `DocState::get_value`.
    ///
    /// NOTE(review): presumably the shallow counterpart of `get_deep_value`
    /// (child containers are not expanded). Confirm in `DocState`.
    #[inline]
    pub fn get_value(&self) -> LoroValue {
        self.state.lock().get_value()
    }
1472
    /// Returns the deep value of the current document state, via
    /// `DocState::get_deep_value`.
    #[inline]
    pub fn get_deep_value(&self) -> LoroValue {
        self.state.lock().get_deep_value()
    }
1478
    /// Returns the deep value of the current state, including container ids, via
    /// `DocState::get_deep_value_with_id`.
    #[inline]
    pub fn get_deep_value_with_id(&self) -> LoroValue {
        self.state.lock().get_deep_value_with_id()
    }
1484
1485 pub fn checkout_to_latest(&self) {
1486 let (options, _guard) = self.implicit_commit_then_stop();
1487 if !self.is_detached() {
1488 drop(_guard);
1489 self.renew_txn_if_auto_commit(options);
1490 return;
1491 }
1492
1493 self._checkout_to_latest_without_commit(true)
1494 .expect("checkout to oplog frontiers should succeed");
1495 self.emit_events();
1496 drop(_guard);
1497 self.renew_txn_if_auto_commit(options);
1498 }
1499
1500 fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1501 if !self.is_detached() {
1502 self._renew_txn_if_auto_commit_with_guard(None, guard);
1503 return;
1504 }
1505
1506 self._checkout_to_latest_without_commit(true)
1507 .expect("checkout to oplog frontiers should succeed");
1508 self._renew_txn_if_auto_commit_with_guard(None, guard);
1509 }
1510
1511 pub(crate) fn _checkout_to_latest_without_commit(
1513 &self,
1514 to_commit_then_renew: bool,
1515 ) -> LoroResult<()> {
1516 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1517 let f = self.oplog_frontiers();
1518 let this = &self;
1519 let frontiers = &f;
1520 this._checkout_without_emitting(frontiers, false, to_commit_then_renew)?;
1521 this.emit_events();
1523 if this.config.detached_editing() {
1524 this.renew_peer_id();
1525 }
1526
1527 self.set_detached(false);
1528 Ok(())
1529 })
1530 }
1531
1532 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1537 let was_detached = self.is_detached();
1538 let (options, guard) = self.implicit_commit_then_stop();
1539 let result = self._checkout_without_emitting(frontiers, true, true);
1540 if result.is_ok() {
1541 self.emit_events();
1542 }
1543 drop(guard);
1544 if self.config.detached_editing() {
1545 if result.is_ok() {
1546 self.renew_peer_id();
1547 }
1548 self.renew_txn_if_auto_commit(options);
1549 } else if result.is_err() {
1550 if !was_detached {
1551 self.renew_txn_if_auto_commit(options);
1552 }
1553 } else if !self.is_detached() {
1554 self.renew_txn_if_auto_commit(options);
1555 }
1556
1557 result
1558 }
1559
    /// Moves the document state to `frontiers` without emitting events.
    /// Callers call `emit_events` afterwards.
    ///
    /// * `to_shrink_frontiers`: when true, `frontiers` is first reduced with
    ///   `shrink_frontiers` against the oplog DAG.
    /// * `to_commit_then_renew`: not used in this body; it is only recorded by the
    ///   `instrument` span.
    ///
    /// Returns `Ok(())` without changing anything if the state is already at the
    /// (possibly shrunk) target. Otherwise the document is marked detached and the
    /// checkout diff is applied to the state.
    ///
    /// # Errors
    /// * `TransactionError` if the transaction mutex is not locked. Callers are
    ///   expected to hold it.
    /// * `SwitchToVersionBeforeShallowRoot` if the target is before the shallow root.
    /// * `FrontiersNotFound` if shrinking fails or a target id is not in the DAG.
    /// * `NotFoundError` if the current or target version cannot be converted to a
    ///   version vector.
    /// * Any error returned by `DocState::apply_diff`.
    #[instrument(level = "info", skip(self))]
    pub(crate) fn _checkout_without_emitting(
        &self,
        frontiers: &Frontiers,
        to_shrink_frontiers: bool,
        to_commit_then_renew: bool,
    ) -> Result<(), LoroError> {
        if !self.txn.is_locked() {
            return Err(LoroError::TransactionError(
                "checkout requires the transaction mutex to be held"
                    .to_string()
                    .into_boxed_str(),
            ));
        }
        let from_frontiers = self.state_frontiers();
        loro_common::info!(
            "checkout from={:?} to={:?} cur_vv={:?}",
            from_frontiers,
            frontiers,
            self.oplog_vv()
        );

        // Cheap early exit before taking the oplog lock.
        if &from_frontiers == frontiers {
            return Ok(());
        }

        let oplog = self.oplog.lock();
        if oplog.dag.is_before_shallow_root(frontiers) {
            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
        }

        let frontiers = if to_shrink_frontiers {
            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
        } else {
            frontiers.clone()
        };

        // Shrinking may have made the target equal to the current version.
        if from_frontiers == frontiers {
            return Ok(());
        }

        // Locks are taken in the order: oplog, state, diff calculator.
        let mut state = self.state.lock();
        let mut calc = self.diff_calculator.lock();
        for i in frontiers.iter() {
            if !oplog.dag.contains(i) {
                return Err(LoroError::FrontiersNotFound(i));
            }
        }

        let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
            LoroError::NotFoundError(
                format!(
                    "Cannot find the current state version {:?}",
                    state.frontiers
                )
                .into_boxed_str(),
            )
        })?;
        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
            return Err(LoroError::NotFoundError(
                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
            ));
        };

        // NOTE(review): the document is marked detached before the diff is applied,
        // so it stays detached if `state.apply_diff` below fails.
        self.set_detached(true);
        let (diff, diff_mode) =
            calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
        state.apply_diff(
            InternalDocDiff {
                origin: "checkout".into(),
                diff: Cow::Owned(diff),
                by: EventTriggerKind::Checkout,
                new_version: Cow::Owned(frontiers.clone()),
            },
            diff_mode,
        )?;

        Ok(())
    }
1640
    /// Converts a version vector to frontiers using the oplog DAG.
    #[inline]
    pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
        self.oplog.lock().dag.vv_to_frontiers(vv)
    }
1645
    /// Converts frontiers to a version vector using the oplog DAG.
    ///
    /// Returns `None` when the DAG cannot resolve `frontiers`. The checkout code
    /// treats this case as "version not found".
    #[inline]
    pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
        self.oplog.lock().dag.frontiers_to_vv(frontiers)
    }
1650
1651 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1655 let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1656 self.import(&updates)
1657 }
1658
    /// Borrows the document's shared arena.
    pub(crate) fn arena(&self) -> &SharedArena {
        &self.arena
    }
1662
1663 #[inline]
1664 pub fn len_ops(&self) -> usize {
1665 if self.oplog.can_lock_in_this_thread() {
1666 return self.oplog.lock().visible_op_count_exact();
1667 }
1668
1669 self.visible_op_count.load(Acquire)
1670 }
1671
1672 #[inline]
1673 pub fn len_changes(&self) -> usize {
1674 let oplog = self.oplog.lock();
1675 oplog.len_changes()
1676 }
1677
    /// Borrows the document's configuration.
    pub fn config(&self) -> &Configure {
        &self.config
    }
1681
1682 pub fn check_state_diff_calc_consistency_slow(&self) {
1687 {
1689 static IS_CHECKING: std::sync::atomic::AtomicBool =
1690 std::sync::atomic::AtomicBool::new(false);
1691 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1692 return;
1693 }
1694
1695 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1696 let peer_id = self.peer_id();
1697 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1698 let _g = s.enter();
1699 let options = self.implicit_commit_then_stop().0;
1700 self.oplog.lock().check_dag_correctness();
1701 if self.is_shallow() {
1702 let initial_snapshot = self
1713 .export(ExportMode::state_only(Some(
1714 &self.shallow_since_frontiers(),
1715 )))
1716 .unwrap();
1717
1718 let doc = LoroDoc::new();
1720 doc.import(&initial_snapshot).unwrap();
1721 self.checkout(&self.shallow_since_frontiers()).unwrap();
1722 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1723
1724 let updates = self.export(ExportMode::all_updates()).unwrap();
1726
1727 doc.import(&updates).unwrap();
1729 self.checkout_to_latest();
1730
1731 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1734 let mut calculated_state = doc.app_state().lock();
1735 let mut current_state = self.app_state().lock();
1736 current_state.check_is_the_same(&mut calculated_state);
1737 } else {
1738 let f = self.state_frontiers();
1739 let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
1740 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1741 let doc = Self::new();
1742 doc.import(&bytes).unwrap();
1743 let mut calculated_state = doc.app_state().lock();
1744 let mut current_state = self.app_state().lock();
1745 current_state.check_is_the_same(&mut calculated_state);
1746 }
1747
1748 self.renew_txn_if_auto_commit(options);
1749 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1750 }
1751 }
1752
1753 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1754 self.query_pos_internal(pos, true)
1755 }
1756
1757 pub(crate) fn query_pos_internal(
1759 &self,
1760 pos: &Cursor,
1761 ret_event_index: bool,
1762 ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1763 if !self.has_container(&pos.container) {
1764 return Err(CannotFindRelativePosition::IdNotFound);
1765 }
1766
1767 let mut state = self.state.lock();
1768 if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1769 Ok(PosQueryResult {
1770 update: None,
1771 current: AbsolutePosition {
1772 pos: ans,
1773 side: pos.side,
1774 },
1775 })
1776 } else {
1777 drop(state);
1789 let result = self.with_barrier(|| {
1790 let oplog = self.oplog().lock();
1791 if let Some(id) = pos.id {
1793 if oplog.arena.id_to_idx(&pos.container).is_none() {
1795 let mut s = self.state.lock();
1796 if !s.does_container_exist(&pos.container) {
1797 return Err(CannotFindRelativePosition::ContainerDeleted);
1798 }
1799 s.ensure_container(&pos.container);
1800 drop(s);
1801 }
1802 let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
1803 let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1805 if oplog.shallow_since_vv().includes_id(id) {
1806 return Err(CannotFindRelativePosition::HistoryCleared);
1807 }
1808
1809 tracing::error!("Cannot find id {}", id);
1810 return Err(CannotFindRelativePosition::IdNotFound);
1811 };
1812 let mut diff_calc = DiffCalculator::new(true);
1814 let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1815 let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1816 diff_calc.calc_diff_internal(
1818 &oplog,
1819 before,
1820 &before_frontiers,
1821 oplog.vv(),
1822 oplog.frontiers(),
1823 Some(&|target| idx == target),
1824 );
1825 let depth = self.arena.get_depth(idx);
1827 let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1828 match diff_calc {
1829 crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1830 let c = text.get_id_latest_pos(id).unwrap();
1831 let new_pos = c.pos;
1832 let handler = self.get_text(&pos.container);
1833 let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1834 Ok(PosQueryResult {
1835 update: handler.get_cursor(current_pos, c.side),
1836 current: AbsolutePosition {
1837 pos: current_pos,
1838 side: c.side,
1839 },
1840 })
1841 }
1842 crate::diff_calc::ContainerDiffCalculator::List(list) => {
1843 let c = list.get_id_latest_pos(id).unwrap();
1844 let new_pos = c.pos;
1845 let handler = self.get_list(&pos.container);
1846 Ok(PosQueryResult {
1847 update: handler.get_cursor(new_pos, c.side),
1848 current: AbsolutePosition {
1849 pos: new_pos,
1850 side: c.side,
1851 },
1852 })
1853 }
1854 crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1855 let c = list.get_id_latest_pos(id).unwrap();
1856 let new_pos = c.pos;
1857 let handler = self.get_movable_list(&pos.container);
1858 let new_pos = handler.op_pos_to_user_pos(new_pos);
1859 Ok(PosQueryResult {
1860 update: handler.get_cursor(new_pos, c.side),
1861 current: AbsolutePosition {
1862 pos: new_pos,
1863 side: c.side,
1864 },
1865 })
1866 }
1867 crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1868 crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1869 #[cfg(feature = "counter")]
1870 crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1871 crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1872 }
1873 } else {
1874 match pos.container.container_type() {
1875 ContainerType::Text => {
1876 let text = self.get_text(&pos.container);
1877 Ok(PosQueryResult {
1878 update: Some(Cursor {
1879 id: None,
1880 container: text.id(),
1881 side: pos.side,
1882 origin_pos: text.len_unicode(),
1883 }),
1884 current: AbsolutePosition {
1885 pos: text.len_event(),
1886 side: pos.side,
1887 },
1888 })
1889 }
1890 ContainerType::List => {
1891 let list = self.get_list(&pos.container);
1892 Ok(PosQueryResult {
1893 update: Some(Cursor {
1894 id: None,
1895 container: list.id(),
1896 side: pos.side,
1897 origin_pos: list.len(),
1898 }),
1899 current: AbsolutePosition {
1900 pos: list.len(),
1901 side: pos.side,
1902 },
1903 })
1904 }
1905 ContainerType::MovableList => {
1906 let list = self.get_movable_list(&pos.container);
1907 Ok(PosQueryResult {
1908 update: Some(Cursor {
1909 id: None,
1910 container: list.id(),
1911 side: pos.side,
1912 origin_pos: list.len(),
1913 }),
1914 current: AbsolutePosition {
1915 pos: list.len(),
1916 side: pos.side,
1917 },
1918 })
1919 }
1920 ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1921 unreachable!()
1922 }
1923 #[cfg(feature = "counter")]
1924 ContainerType::Counter => unreachable!(),
1925 }
1926 }
1927 });
1928 result
1929 }
1930 }
1931
    /// Forwards to `OpLog::free_history_cache`.
    ///
    /// NOTE(review): presumably drops cached history data to reclaim memory.
    /// Confirm in the oplog.
    pub fn free_history_cache(&self) {
        self.oplog.lock().free_history_cache();
    }
1939
1940 pub fn free_diff_calculator(&self) {
1942 *self.diff_calculator.lock() = DiffCalculator::new(true);
1943 }
1944
    /// Reports whether the oplog currently holds a history cache
    /// (via `OpLog::has_history_cache`).
    pub fn has_history_cache(&self) -> bool {
        self.oplog.lock().has_history_cache()
    }
1950
    /// Runs `OpLog::compact_change_store` inside `with_barrier`.
    #[inline]
    pub fn compact_change_store(&self) {
        self.with_barrier(|| {
            self.oplog.lock().compact_change_store();
        });
    }
1960
    /// Builds a `DocAnalysis` report for this document.
    #[inline]
    pub fn analyze(&self) -> DocAnalysis {
        DocAnalysis::analyze(self)
    }
1968
1969 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1971 let mut state = self.state.lock();
1972 if state.arena.id_to_idx(id).is_none() {
1973 if !state.does_container_exist(id) {
1974 return None;
1975 }
1976 state.ensure_container(id);
1977 }
1978 let idx = state.arena.id_to_idx(id).unwrap();
1979 state.get_path(idx)
1980 }
1981
1982 #[instrument(skip(self))]
1983 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1984 self.with_barrier(|| {
1985 let ans = match mode {
1986 ExportMode::Snapshot => export_fast_snapshot(self),
1987 ExportMode::Updates { from } => export_fast_updates(self, &from),
1988 ExportMode::UpdatesInRange { spans } => {
1989 export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
1990 }
1991 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1992 ExportMode::StateOnly(f) => match f {
1993 Some(f) => export_state_only_snapshot(self, &f)?,
1994 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1995 },
1996 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1997 };
1998 Ok(ans)
1999 })
2000 }
2001
    /// Returns a clone of the oplog's shallow-since version vector.
    ///
    /// `is_shallow` treats an empty vector as "not shallow".
    pub fn shallow_since_vv(&self) -> ImVersionVector {
        self.oplog().lock().shallow_since_vv().clone()
    }
2010
    /// Returns a clone of the oplog's shallow-since frontiers.
    pub fn shallow_since_frontiers(&self) -> Frontiers {
        self.oplog().lock().shallow_since_frontiers().clone()
    }
2014
    /// Returns `true` when the oplog's shallow-since version vector is non-empty.
    ///
    /// NOTE(review): presumably this means part of the history was trimmed by a
    /// shallow snapshot. Confirm in `OpLog`.
    pub fn is_shallow(&self) -> bool {
        !self.oplog().lock().shallow_since_vv().is_empty()
    }
2019
2020 pub fn get_pending_txn_len(&self) -> usize {
2025 if let Some(txn) = self.txn.lock().as_ref() {
2026 txn.len()
2027 } else {
2028 0
2029 }
2030 }
2031
    /// Returns the DAG path from `from` to `to` as a `VersionVectorDiff`
    /// (via `Dag::find_path`).
    #[inline]
    pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
        self.oplog().lock().dag.find_path(from, to)
    }
2036
2037 pub fn subscribe_first_commit_from_peer(
2043 &self,
2044 callback: FirstCommitFromPeerCallback,
2045 ) -> Subscription {
2046 let (s, enable) = self
2047 .first_commit_from_peer_subs
2048 .inner()
2049 .insert((), callback);
2050 enable();
2051 s
2052 }
2053
2054 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
2059 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
2060 enable();
2061 s
2062 }
2063}
2064
/// Errors returned by `LoroDoc::travel_change_ancestors`.
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// The requested id is not included in the oplog's version vector.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// The requested id is covered by the oplog's shallow-since version vector.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
2072
2073impl LoroDoc {
2074 pub fn travel_change_ancestors(
2075 &self,
2076 ids: &[ID],
2077 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
2078 ) -> Result<(), ChangeTravelError> {
2079 let (options, guard) = self.implicit_commit_then_stop();
2080 drop(guard);
2081 struct PendingNode(ChangeMeta);
2082 impl PartialEq for PendingNode {
2083 fn eq(&self, other: &Self) -> bool {
2084 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
2085 }
2086 }
2087
2088 impl Eq for PendingNode {}
2089 impl PartialOrd for PendingNode {
2090 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2091 Some(self.cmp(other))
2092 }
2093 }
2094
2095 impl Ord for PendingNode {
2096 fn cmp(&self, other: &Self) -> Ordering {
2097 self.0
2098 .lamport_last()
2099 .cmp(&other.0.lamport_last())
2100 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
2101 }
2102 }
2103
2104 for id in ids {
2105 let op_log = &self.oplog().lock();
2106 if !op_log.vv().includes_id(*id) {
2107 return Err(ChangeTravelError::TargetIdNotFound(*id));
2108 }
2109 if op_log.dag.shallow_since_vv().includes_id(*id) {
2110 return Err(ChangeTravelError::TargetVersionNotIncluded);
2111 }
2112 }
2113
2114 let mut visited = FxHashSet::default();
2115 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
2116 for id in ids {
2117 pending.push(PendingNode(ChangeMeta::from_change(
2118 &self.oplog().lock().get_change_at(*id).unwrap(),
2119 )));
2120 }
2121 while let Some(PendingNode(node)) = pending.pop() {
2122 let deps = node.deps.clone();
2123 if f(node).is_break() {
2124 break;
2125 }
2126
2127 for dep in deps.iter() {
2128 let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
2129 continue;
2130 };
2131 if visited.contains(&dep_node.id) {
2132 continue;
2133 }
2134
2135 visited.insert(dep_node.id);
2136 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
2137 }
2138 }
2139
2140 let ans = Ok(());
2141 self.renew_txn_if_auto_commit(options);
2142 ans
2143 }
2144
2145 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
2146 self.with_barrier(|| {
2147 let mut set = FxHashSet::default();
2148 {
2149 let oplog = self.oplog().lock();
2150 for op in oplog.iter_ops(id.to_span(len)) {
2151 let id = oplog.arena.get_container_id(op.container()).unwrap();
2152 set.insert(id);
2153 }
2154 }
2155 set
2156 })
2157 }
2158
2159 pub fn delete_root_container(&self, cid: ContainerID) {
2160 if !cid.is_root() {
2161 return;
2162 }
2163
2164 if !self.has_container(&cid) {
2166 return;
2167 }
2168
2169 let Some(h) = self.get_handler(cid.clone()) else {
2170 return;
2171 };
2172
2173 if let Err(e) = h.clear() {
2174 eprintln!("Failed to clear handler: {:?}", e);
2175 return;
2176 }
2177 self.config.deleted_root_containers.lock().insert(cid);
2178 }
2179
2180 pub fn set_hide_empty_root_containers(&self, hide: bool) {
2181 self.config
2182 .hide_empty_root_containers
2183 .store(hide, std::sync::atomic::Ordering::Relaxed);
2184 }
2185}
2186
2187fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2189 let start_vv = oplog
2190 .dag
2191 .frontiers_to_vv(&id.into())
2192 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2193 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
2194 for op in change.ops.iter().rev() {
2195 if op.container != idx {
2196 continue;
2197 }
2198 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2199 if d.id_start.to_span(d.atom_len()).contains(id) {
2200 return Some(ID::new(change.peer(), op.counter));
2201 }
2202 }
2203 }
2204 }
2205
2206 None
2207}
2208
2209#[derive(Debug)]
2210pub struct CommitWhenDrop<'a> {
2211 doc: &'a LoroDoc,
2212 default_options: CommitOptions,
2213}
2214
2215impl Drop for CommitWhenDrop<'_> {
2216 fn drop(&mut self) {
2217 {
2218 let mut guard = self.doc.txn.lock();
2219 if let Some(txn) = guard.as_mut() {
2220 txn.set_default_options(std::mem::take(&mut self.default_options));
2221 };
2222 }
2223
2224 self.doc.commit_then_renew();
2225 }
2226}
2227
/// Options for a commit: origin, renew behaviour, timestamp and message.
///
/// Build with `CommitOptions::new()` (or `Default`) and the chained setters.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// Origin label for the commit, if any.
    pub origin: Option<InternalString>,

    /// Presumably whether a new transaction is started right after the commit.
    /// `CommitOptions::new` sets it to `true`. TODO confirm against the commit code.
    pub immediate_renew: bool,

    /// Timestamp explicitly set for the commit, if any.
    pub timestamp: Option<Timestamp>,

    /// Commit message, if any.
    pub commit_msg: Option<Arc<str>>,
}
2246
2247impl CommitOptions {
2248 pub fn new() -> Self {
2250 Self {
2251 origin: None,
2252 immediate_renew: true,
2253 timestamp: None,
2254 commit_msg: None,
2255 }
2256 }
2257
2258 pub fn origin(mut self, origin: &str) -> Self {
2260 self.origin = Some(origin.into());
2261 self
2262 }
2263
2264 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2266 self.immediate_renew = immediate_renew;
2267 self
2268 }
2269
2270 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2274 self.timestamp = Some(timestamp);
2275 self
2276 }
2277
2278 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2280 self.commit_msg = Some(commit_msg.into());
2281 self
2282 }
2283
2284 pub fn set_origin(&mut self, origin: Option<&str>) {
2286 self.origin = origin.map(|x| x.into())
2287 }
2288
2289 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2291 self.timestamp = timestamp;
2292 }
2293}
2294
2295impl Default for CommitOptions {
2296 fn default() -> Self {
2297 Self::new()
2298 }
2299}
2300
2301#[cfg(test)]
2302mod test {
2303 use std::{
2304 panic::AssertUnwindSafe,
2305 sync::{
2306 atomic::{AtomicUsize, Ordering},
2307 Arc,
2308 },
2309 };
2310
2311 use crate::{
2312 cursor::PosType,
2313 encoding::json_schema::json::{JsonOpContent, JsonSchema, ListOp},
2314 encoding::{fast_snapshot::EMPTY_MARK, EncodeMode},
2315 loro::ExportMode,
2316 version::{Frontiers, VersionVector},
2317 LoroDoc, ToJson, TreeParentId,
2318 };
2319 use bytes::{BufMut, Bytes};
2320 use loro_common::ID;
2321 use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
2322
2323 const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
2324
2325 fn encode_import_blob(mode: EncodeMode, body: &[u8]) -> Vec<u8> {
2326 let mut ans = Vec::new();
2327 ans.extend_from_slice(b"loro");
2328 ans.extend_from_slice(&[0; 16]);
2329 ans.extend_from_slice(&mode.to_bytes());
2330 ans.extend_from_slice(body);
2331 let checksum = xxhash_rust::xxh32::xxh32(&ans[20..], XXH_SEED);
2332 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
2333 ans
2334 }
2335
2336 fn encode_fast_snapshot_import(oplog_bytes: &[u8]) -> Vec<u8> {
2337 let mut body = Vec::new();
2338 body.put_u32_le(oplog_bytes.len() as u32);
2339 body.extend_from_slice(oplog_bytes);
2340 body.put_u32_le(EMPTY_MARK.len() as u32);
2341 body.extend_from_slice(EMPTY_MARK);
2342 body.put_u32_le(0);
2343 encode_import_blob(EncodeMode::FastSnapshot, &body)
2344 }
2345
2346 fn sstable_with_huge_meta_block_count() -> Vec<u8> {
2347 let mut bytes = Vec::new();
2348 bytes.extend_from_slice(b"LORO");
2349 bytes.push(0);
2350 bytes.put_u32_le(10_000_000);
2351 bytes.put_u32_le(xxhash_rust::xxh32::xxh32(&[], XXH_SEED));
2352 bytes.put_u32_le(5);
2353 bytes
2354 }
2355
2356 fn snapshot_oplog_with_malformed_block() -> Vec<u8> {
2357 let peer = 1;
2358 let id = ID::new(peer, 0);
2359 let vv = VersionVector::from_iter([(peer, 1)]);
2360 let frontiers = Frontiers::from_id(id);
2361 let mut store = MemKvStore::new(MemKvConfig::default());
2362 store.set(b"vv", vv.encode().into());
2363 store.set(b"fr", frontiers.encode().into());
2364 store.set(&id.to_bytes(), Bytes::from_static(&[0]));
2365 store.export_all().to_vec()
2366 }
2367
2368 fn make_json_import_stress_doc(peer: u64) -> LoroDoc {
2369 let doc = LoroDoc::new_auto_commit();
2370 doc.set_peer_id(peer).unwrap();
2371
2372 let text = doc.get_text("text");
2373 let mut text_pos = 0;
2374 for i in 0..32 {
2375 let chunk = format!("segment-{i}-abcdefghijklmnopqrstuvwxyz;");
2376 text.insert_unicode(text_pos, &chunk).unwrap();
2377 text_pos += chunk.chars().count();
2378 }
2379
2380 let list = doc.get_list("list");
2381 for i in 0..32 {
2382 list.insert(i, format!("item-{i}")).unwrap();
2383 }
2384
2385 let map = doc.get_map("map");
2386 for i in 0..32 {
2387 let key = format!("key-{i}");
2388 map.insert(&key, format!("value-{i}")).unwrap();
2389 }
2390
2391 let tree = doc.get_tree("tree");
2392 let mut parent = TreeParentId::Root;
2393 for i in 0..16 {
2394 let node = tree.create(parent).unwrap();
2395 let meta = tree.get_meta(node).unwrap();
2396 meta.insert("name", format!("node-{i}")).unwrap();
2397 meta.insert("payload", format!("payload-{i}-{}", "x".repeat(16)))
2398 .unwrap();
2399 parent = TreeParentId::Node(node);
2400 }
2401
2402 doc
2403 }
2404
2405 fn make_json_list_update_with_four_ops(peer: u64) -> (LoroDoc, JsonSchema) {
2406 let doc = LoroDoc::new();
2407 doc.set_peer_id(peer).unwrap();
2408 let map = doc.get_map("map");
2409 let list = doc.get_list("list");
2410 let text = doc.get_text("text");
2411
2412 let mut txn = doc.txn().unwrap();
2413 map.insert_with_txn(&mut txn, "prefix", "map-value".into())
2414 .unwrap();
2415 list.insert_with_txn(&mut txn, 0, "seed".into()).unwrap();
2416 text.insert_with_txn(&mut txn, 0, "text-value", PosType::Unicode)
2417 .unwrap();
2418 list.insert_with_txn(&mut txn, 1, "tail".into()).unwrap();
2419 txn.commit().unwrap();
2420
2421 let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv(), false);
2422 assert_eq!(json.changes.len(), 1);
2423 assert_eq!(json.changes[0].ops.len(), 4);
2424 (doc, json)
2425 }
2426
2427 fn move_last_list_insert_far_out_of_bounds(json: &mut JsonSchema) {
2428 let last_change = json.changes.last_mut().unwrap();
2429 let last_op = last_change.ops.last_mut().unwrap();
2430 match &mut last_op.content {
2431 JsonOpContent::List(ListOp::Insert { pos, .. }) => {
2432 *pos = 1_000;
2433 }
2434 other => panic!("expected list insert op, got {other:?}"),
2435 }
2436 }
2437
2438 #[test]
2439 fn test_sync() {
2440 fn is_send_sync<T: Send + Sync>(_v: T) {}
2441 let loro = super::LoroDoc::new();
2442 is_send_sync(loro)
2443 }
2444
2445 #[test]
2446 fn import_rejects_huge_sstable_meta_block_count_without_panic() {
2447 let bytes = encode_fast_snapshot_import(&sstable_with_huge_meta_block_count());
2448
2449 let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2450 assert!(result.is_ok(), "malformed import should not panic");
2451 assert!(result.unwrap().is_err());
2452 }
2453
2454 #[test]
2455 fn import_rejects_malformed_change_block_without_panic() {
2456 let bytes = encode_fast_snapshot_import(&snapshot_oplog_with_malformed_block());
2457
2458 let result = std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&bytes)));
2459 assert!(result.is_ok(), "malformed import should not panic");
2460 assert!(result.unwrap().is_err());
2461 }
2462
2463 #[test]
2464 fn failed_import_rolls_back_oplog_and_arena() {
2465 let src = LoroDoc::new();
2466 src.set_peer_id(1).unwrap();
2467 let text = src.get_text("text");
2468 let mut txn = src.txn().unwrap();
2469 text.insert_with_txn(&mut txn, 0, "hello", PosType::Unicode)
2470 .unwrap();
2471 txn.commit().unwrap();
2472 let update = src.export(ExportMode::all_updates()).unwrap();
2473
2474 let dst = LoroDoc::new();
2475 let vv_before_import = dst.oplog_vv();
2476 let state_before_import = dst.get_deep_value();
2477 let err = dst
2478 .import_with(&update, "__loro_fail_import_state_apply".into())
2479 .unwrap_err();
2480 assert!(err.to_string().contains("state apply failpoint"));
2481 assert_eq!(dst.oplog_vv(), vv_before_import);
2482 assert_eq!(dst.get_deep_value(), state_before_import);
2483 assert!(dst.oplog().lock().is_empty());
2484
2485 dst.import(&update).unwrap();
2486 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2487 }
2488
2489 #[test]
2490 fn failed_incremental_import_restores_previous_change_store_block() {
2491 let src = LoroDoc::new();
2492 src.set_peer_id(1).unwrap();
2493 let text = src.get_text("text");
2494 let mut txn = src.txn().unwrap();
2495 text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
2496 .unwrap();
2497 txn.commit().unwrap();
2498 let first_update = src.export(ExportMode::all_updates()).unwrap();
2499 let first_vv = src.oplog_vv();
2500
2501 let mut txn = src.txn().unwrap();
2502 text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
2503 .unwrap();
2504 txn.commit().unwrap();
2505 let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
2506
2507 let dst = LoroDoc::new();
2508 dst.import(&first_update).unwrap();
2509 let vv_before_import = dst.oplog_vv();
2510 let state_before_import = dst.get_deep_value();
2511 dst.import_with(&second_update, "__loro_fail_import_state_apply".into())
2512 .unwrap_err();
2513 assert_eq!(dst.oplog_vv(), vv_before_import);
2514 assert_eq!(dst.get_deep_value(), state_before_import);
2515
2516 dst.import(&second_update).unwrap();
2517 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2518 }
2519
2520 #[test]
2521 fn failed_import_json_updates_rolls_back_complex_empty_doc() {
2522 let src = make_json_import_stress_doc(11);
2523 let json = src.export_json_updates(&Default::default(), &src.oplog_vv(), false);
2524
2525 let dst = LoroDoc::new();
2526 let vv_before_import = dst.oplog_vv();
2527 let frontiers_before_import = dst.oplog_frontiers();
2528 let state_before_import = dst.get_deep_value();
2529 for _ in 0..3 {
2530 crate::state::fail_next_import_state_apply_for_test();
2531 let err = dst.import_json_updates(json.clone()).unwrap_err();
2532 assert!(err.to_string().contains("state apply failpoint"));
2533 assert_eq!(dst.oplog_vv(), vv_before_import);
2534 assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2535 assert_eq!(dst.get_deep_value(), state_before_import);
2536 assert!(dst.oplog().lock().is_empty());
2537 }
2538
2539 dst.import_json_updates(json).unwrap();
2540 assert_eq!(dst.oplog_vv(), src.oplog_vv());
2541 assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2542 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2543 }
2544
2545 #[test]
2546 fn failed_incremental_import_json_updates_restores_previous_change_store_block() {
2547 let src = LoroDoc::new_auto_commit();
2548 src.set_peer_id(12).unwrap();
2549 let text = src.get_text("text");
2550 text.insert_unicode(0, "a").unwrap();
2551 let list = src.get_list("list");
2552 list.push("seed").unwrap();
2553 let map = src.get_map("map");
2554 map.insert("seed", "value").unwrap();
2555 let tree = src.get_tree("tree");
2556 let root = tree.create(TreeParentId::Root).unwrap();
2557 tree.get_meta(root).unwrap().insert("name", "root").unwrap();
2558
2559 let first_vv = src.oplog_vv();
2560 let first_json = src.export_json_updates(&Default::default(), &first_vv, false);
2561
2562 let mut text_pos = text.len_unicode();
2563 for i in 0..64 {
2564 let chunk = format!("chunk-{i};");
2565 text.insert_unicode(text_pos, &chunk).unwrap();
2566 text_pos += chunk.chars().count();
2567 }
2568 for i in 0..32 {
2569 list.push(format!("after-{i}")).unwrap();
2570 let key = format!("after-{i}");
2571 map.insert(&key, format!("value-{i}")).unwrap();
2572 }
2573 let child = tree.create(TreeParentId::Node(root)).unwrap();
2574 tree.get_meta(child)
2575 .unwrap()
2576 .insert("name", "child")
2577 .unwrap();
2578
2579 let second_json = src.export_json_updates(&first_vv, &src.oplog_vv(), false);
2580
2581 let dst = LoroDoc::new();
2582 dst.import_json_updates(first_json).unwrap();
2583 let vv_before_import = dst.oplog_vv();
2584 let frontiers_before_import = dst.oplog_frontiers();
2585 let state_before_import = dst.get_deep_value();
2586
2587 for _ in 0..2 {
2588 crate::state::fail_next_import_state_apply_for_test();
2589 let err = dst.import_json_updates(second_json.clone()).unwrap_err();
2590 assert!(err.to_string().contains("state apply failpoint"));
2591 assert_eq!(dst.oplog_vv(), vv_before_import);
2592 assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2593 assert_eq!(dst.get_deep_value(), state_before_import);
2594 }
2595
2596 dst.import_json_updates(second_json).unwrap();
2597 assert_eq!(dst.oplog_vv(), src.oplog_vv());
2598 assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2599 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2600 }
2601
2602 #[test]
2603 fn malformed_later_import_json_update_rolls_back_after_valid_prefix_enters_oplog() {
2604 let peer = 13;
2605 let (src, good_json) = make_json_list_update_with_four_ops(peer);
2606 let mut bad_json = good_json.clone();
2607 move_last_list_insert_far_out_of_bounds(&mut bad_json);
2608
2609 let good_dst = LoroDoc::new();
2610 good_dst.import_json_updates(good_json.clone()).unwrap();
2611 assert_eq!(good_dst.get_deep_value(), src.get_deep_value());
2612
2613 let last_op_counter = good_json.changes[0].ops.last().unwrap().counter;
2614 let prefix_vv = VersionVector::from_iter([(peer, last_op_counter)]);
2615 let prefix_json = src.export_json_updates(&Default::default(), &prefix_vv, false);
2616 assert_eq!(
2617 prefix_json.changes[0].ops.len(),
2618 good_json.changes[0].ops.len() - 1
2619 );
2620 let good_suffix_json = src.export_json_updates(&prefix_vv, &src.oplog_vv(), false);
2621 assert_eq!(good_suffix_json.changes[0].ops.len(), 1);
2622 let mut bad_suffix_json = good_suffix_json.clone();
2623 move_last_list_insert_far_out_of_bounds(&mut bad_suffix_json);
2624
2625 let prefix_dst = LoroDoc::new();
2626 prefix_dst.import_json_updates(prefix_json.clone()).unwrap();
2627 let vv_before_bad_suffix = prefix_dst.oplog_vv();
2628 let frontiers_before_bad_suffix = prefix_dst.oplog_frontiers();
2629 let state_before_bad_suffix = prefix_dst.get_deep_value();
2630
2631 let bad_suffix_json = serde_json::to_string(&bad_suffix_json).unwrap();
2632 let err = prefix_dst
2633 .import_json_updates(&bad_suffix_json)
2634 .unwrap_err();
2635 assert!(
2636 err.to_string().contains("list diff"),
2637 "expected state list bounds validation, got {err:?}"
2638 );
2639 assert_eq!(prefix_dst.oplog_vv(), vv_before_bad_suffix);
2640 assert_eq!(prefix_dst.oplog_frontiers(), frontiers_before_bad_suffix);
2641 assert_eq!(prefix_dst.get_deep_value(), state_before_bad_suffix);
2642
2643 prefix_dst.import_json_updates(good_suffix_json).unwrap();
2644 assert_eq!(prefix_dst.get_deep_value(), src.get_deep_value());
2645 assert_eq!(prefix_dst.oplog_vv(), src.oplog_vv());
2646
2647 let dst = LoroDoc::new();
2648 let vv_before_import = dst.oplog_vv();
2649 let frontiers_before_import = dst.oplog_frontiers();
2650 let state_before_import = dst.get_deep_value();
2651 let bad_json = serde_json::to_string(&bad_json).unwrap();
2652 let err = dst.import_json_updates(&bad_json).unwrap_err();
2653 assert!(
2654 err.to_string().contains("list diff"),
2655 "expected state list bounds validation, got {err:?}"
2656 );
2657 assert_eq!(dst.oplog_vv(), vv_before_import);
2658 assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
2659 assert_eq!(dst.get_deep_value(), state_before_import);
2660 assert!(dst.oplog().lock().is_empty());
2661 }
2662
2663 #[test]
2664 fn failed_import_restores_pending_changes_that_were_applied_during_import() {
2665 let src = LoroDoc::new();
2666 src.set_peer_id(14).unwrap();
2667 let text = src.get_text("text");
2668
2669 let mut txn = src.txn().unwrap();
2670 text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
2671 .unwrap();
2672 txn.commit().unwrap();
2673 let first_update = src.export(ExportMode::all_updates()).unwrap();
2674 let first_vv = src.oplog_vv();
2675
2676 let mut txn = src.txn().unwrap();
2677 text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
2678 .unwrap();
2679 txn.commit().unwrap();
2680 let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
2681
2682 let dst = LoroDoc::new();
2683 let status = dst.import(&second_update).unwrap();
2684 assert!(status.success.is_empty());
2685 assert!(status.pending.is_some());
2686 let vv_before_dependency = dst.oplog_vv();
2687 let frontiers_before_dependency = dst.oplog_frontiers();
2688 let state_before_dependency = dst.get_deep_value();
2689
2690 crate::state::fail_next_import_state_apply_for_test();
2691 let err = dst.import(&first_update).unwrap_err();
2692 assert!(err.to_string().contains("state apply failpoint"));
2693 assert_eq!(dst.oplog_vv(), vv_before_dependency);
2694 assert_eq!(dst.oplog_frontiers(), frontiers_before_dependency);
2695 assert_eq!(dst.get_deep_value(), state_before_dependency);
2696
2697 dst.import(&first_update).unwrap();
2698 assert_eq!(dst.oplog_vv(), src.oplog_vv());
2699 assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
2700 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2701 }
2702
2703 #[test]
2704 fn failed_import_json_updates_does_not_emit_or_leave_events() {
2705 let (src, good_json) = make_json_list_update_with_four_ops(15);
2706 let mut bad_json = good_json.clone();
2707 move_last_list_insert_far_out_of_bounds(&mut bad_json);
2708
2709 let dst = LoroDoc::new();
2710 let event_count = Arc::new(AtomicUsize::new(0));
2711 let event_count_cloned = event_count.clone();
2712 let _sub = dst.subscribe_root(Arc::new(move |_| {
2713 event_count_cloned.fetch_add(1, Ordering::SeqCst);
2714 }));
2715
2716 let bad_json = serde_json::to_string(&bad_json).unwrap();
2717 let err = dst.import_json_updates(&bad_json).unwrap_err();
2718 assert!(
2719 err.to_string().contains("list diff"),
2720 "expected state list bounds validation, got {err:?}"
2721 );
2722 assert_eq!(event_count.load(Ordering::SeqCst), 0);
2723 assert!(dst.drop_pending_events().is_empty());
2724 assert!(dst.oplog().lock().is_empty());
2725
2726 dst.import_json_updates(good_json).unwrap();
2727 assert_eq!(event_count.load(Ordering::SeqCst), 1);
2728 assert_eq!(dst.get_deep_value(), src.get_deep_value());
2729 }
2730
2731 #[test]
2732 fn test_checkout() {
2733 let loro = LoroDoc::new();
2734 loro.set_peer_id(1).unwrap();
2735 let text = loro.get_text("text");
2736 let map = loro.get_map("map");
2737 let list = loro.get_list("list");
2738 let mut txn = loro.txn().unwrap();
2739 for i in 0..10 {
2740 map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
2741 text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
2742 .unwrap();
2743 list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
2744 }
2745 txn.commit().unwrap();
2746 let b = LoroDoc::new();
2747 b.import(&loro.export(ExportMode::Snapshot).unwrap())
2748 .unwrap();
2749 loro.checkout(&Frontiers::default()).unwrap();
2750 {
2751 let json = &loro.get_deep_value();
2752 assert_eq!(
2753 json.to_json_value(),
2754 serde_json::json!({"text":"","list":[],"map":{}})
2755 );
2756 }
2757
2758 b.checkout(&ID::new(1, 2).into()).unwrap();
2759 {
2760 let json = &b.get_deep_value();
2761 assert_eq!(
2762 json.to_json_value(),
2763 serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
2764 );
2765 }
2766
2767 loro.checkout(&ID::new(1, 3).into()).unwrap();
2768 {
2769 let json = &loro.get_deep_value();
2770 assert_eq!(
2771 json.to_json_value(),
2772 serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
2773 );
2774 }
2775
2776 b.checkout(&ID::new(1, 29).into()).unwrap();
2777 {
2778 let json = &b.get_deep_value();
2779 assert_eq!(
2780 json.to_json_value(),
2781 serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
2782 );
2783 }
2784 }
2785
2786 #[test]
2787 fn import_batch_err_181() {
2788 let a = LoroDoc::new_auto_commit();
2789 let update_a = a.export(ExportMode::Snapshot);
2790 let b = LoroDoc::new_auto_commit();
2791 b.import_batch(&[update_a.unwrap()]).unwrap();
2792 b.get_text("text")
2793 .insert(0, "hello", PosType::Unicode)
2794 .unwrap();
2795 b.commit_then_renew();
2796 let oplog = b.oplog().lock();
2797 drop(oplog);
2798 b.export(ExportMode::all_updates()).unwrap();
2799 }
2800
2801 #[test]
2802 fn poisoned_mutex_keeps_follow_up_operations_failed() {
2803 let doc = LoroDoc::new();
2804 let oplog = doc.oplog.clone();
2805 let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
2806 let _guard = oplog.lock();
2807 panic!("poison oplog");
2808 }));
2809
2810 let err = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
2811 .expect_err("poisoned lock should continue to fail fast");
2812 let msg = if let Some(msg) = err.downcast_ref::<&str>() {
2813 (*msg).to_string()
2814 } else if let Some(msg) = err.downcast_ref::<String>() {
2815 msg.clone()
2816 } else {
2817 String::new()
2818 };
2819 assert!(msg.contains("poisoned LoroMutex"), "{msg}");
2820 }
2821}