1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::{AtomicBool, AtomicUsize};
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::{Change, Timestamp},
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use loro_common::{
47 ContainerID, ContainerType, HasCounterSpan, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError,
48 LoroResult, LoroValue, ID,
49};
50use rle::HasLength;
51use rustc_hash::{FxHashMap, FxHashSet};
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
81 #[inline]
89 pub fn with_barrier<F, R>(&self, f: F) -> R
90 where
91 F: FnOnce() -> R,
92 {
93 let (options, guard) = self.implicit_commit_then_stop();
94 let result = f();
95 drop(guard);
96 self.renew_txn_if_auto_commit(options);
97 result
98 }
99
/// Creates an empty document: auto-commit off, not detached, no open transaction.
///
/// Every lock (oplog, state, diff calculator, transaction slot) is created from the same
/// `LoroLockGroup`. The state receives clones of the oplog's arena and configuration
/// handles (presumably shared, reference-counted handles — TODO confirm).
pub fn new() -> Self {
    // Counter handed to the oplog and also stored on the doc.
    let visible_op_count = Arc::new(AtomicUsize::new(0));
    let oplog = OpLog::new(visible_op_count.clone());
    let arena = oplog.arena.clone();
    let config: Configure = oplog.configure.clone();
    let lock_group = LoroLockGroup::new();
    // The transaction slot starts empty (`None`).
    let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
    // `new_cyclic` lets the state hold a weak pointer back to the doc being built.
    let inner = Arc::new_cyclic(|w| {
        let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
        LoroDocInner {
            oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
            state,
            config,
            visible_op_count,
            detached: AtomicBool::new(false),
            auto_commit: AtomicBool::new(false),
            observer: Arc::new(Observer::new(arena.clone())),
            diff_calculator: Arc::new(
                lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
            ),
            txn: global_txn,
            arena,
            local_update_subs: SubscriberSetWithQueue::new(),
            peer_id_change_subs: SubscriberSetWithQueue::new(),
            pre_commit_subs: SubscriberSetWithQueue::new(),
            first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
        }
    });
    LoroDoc { inner }
}
130
131 pub fn fork(&self) -> Self {
132 if self.is_detached() {
133 return self
134 .fork_at(&self.state_frontiers())
135 .expect("fork_at on detached doc should not fail");
136 }
137
138 let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
139 let doc = Self::new();
140 doc.with_barrier(|| {
141 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
142 })
143 .unwrap();
144 doc.set_config(&self.config);
145 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
146 doc.start_auto_commit();
147 }
148 doc
149 }
150 pub fn set_detached_editing(&self, enable: bool) {
166 self.config.set_detached_editing(enable);
167 if enable && self.is_detached() {
168 self.with_barrier(|| {
169 self.renew_peer_id();
170 });
171 }
172 }
173
174 #[inline]
176 pub fn new_auto_commit() -> Self {
177 let doc = Self::new();
178 doc.start_auto_commit();
179 doc
180 }
181
182 #[inline(always)]
183 pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
184 if peer == PeerID::MAX {
185 return Err(LoroError::InvalidPeerID);
186 }
187 let next_id = self.oplog.lock().next_id(peer);
188 if self.auto_commit.load(Acquire) {
189 let doc_state = self.state.lock();
190 doc_state
191 .peer
192 .store(peer, std::sync::atomic::Ordering::Relaxed);
193
194 if doc_state.is_in_txn() {
195 drop(doc_state);
196 self.with_barrier(|| {});
198 }
199 self.peer_id_change_subs.emit(&(), next_id);
200 return Ok(());
201 }
202
203 let doc_state = self.state.lock();
204 if doc_state.is_in_txn() {
205 return Err(LoroError::TransactionError(
206 "Cannot change peer id during transaction"
207 .to_string()
208 .into_boxed_str(),
209 ));
210 }
211
212 doc_state
213 .peer
214 .store(peer, std::sync::atomic::Ordering::Relaxed);
215 drop(doc_state);
216 self.peer_id_change_subs.emit(&(), next_id);
217 Ok(())
218 }
219
220 pub(crate) fn renew_peer_id(&self) {
222 let mut peer_id = DefaultRandom.next_u64();
223 while peer_id == PeerID::MAX {
224 peer_id = DefaultRandom.next_u64();
225 }
226 self.set_peer_id(peer_id).unwrap();
227 }
228
229 #[inline]
241 #[must_use]
242 pub fn implicit_commit_then_stop(
243 &self,
244 ) -> (
245 Option<CommitOptions>,
246 LoroMutexGuard<'_, Option<Transaction>>,
247 ) {
248 let (a, b) = self.commit_internal(CommitOptions::new().immediate_renew(false), true);
250 (a, b.unwrap())
251 }
252
253 #[inline]
258 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
259 self.commit_internal(CommitOptions::new().immediate_renew(true), false)
261 .0
262 }
263
264 fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
269 let mut txn_guard = self.txn.lock();
270 let Some(txn) = txn_guard.as_mut() else {
271 return Some(txn_guard);
272 };
273
274 if txn.is_peer_first_appearance {
275 txn.is_peer_first_appearance = false;
276 drop(txn_guard);
277 self.first_commit_from_peer_subs.emit(
279 &(),
280 FirstCommitFromPeerPayload {
281 peer: self.peer_id(),
282 },
283 );
284 }
285
286 None
287 }
288
/// Commits the pending transaction according to `config`.
///
/// As implemented below:
/// - If auto-commit is off, nothing is committed and the locked transaction slot is returned.
/// - Otherwise the pending transaction is taken out of the slot. `config`'s origin, timestamp
///   and message are applied to it, and it is committed.
/// - If the commit yields options, their origin is cleared when `config` supplied one. They
///   are then discarded unless `preserve_on_empty` is set (presumably options are only
///   returned for an empty transaction — TODO confirm).
/// - With `config.immediate_renew` and an editable doc, a new transaction carrying any kept
///   options is put into the slot.
/// - The transaction's `on_commit` callback runs with the slot unlocked. If a transaction is
///   in the slot afterwards and we are not renewing, the loop commits that one as well.
///
/// Returns the kept options. It also returns the locked slot unless `immediate_renew` is set;
/// early-return paths always include the slot.
#[instrument(skip_all)]
fn commit_internal(
    &self,
    config: CommitOptions,
    preserve_on_empty: bool,
) -> (
    Option<CommitOptions>,
    Option<LoroMutexGuard<'_, Option<Transaction>>>,
) {
    // Manual-commit mode: nothing to commit implicitly; just hand back the locked slot.
    if !self.auto_commit.load(Acquire) {
        let txn_guard = self.txn.lock();
        return (None, Some(txn_guard));
    }

    loop {
        // `before_commit` returns the locked slot only when it holds no transaction.
        // It may also emit the first-commit-from-peer event with the slot unlocked.
        if let Some(txn_guard) = self.before_commit() {
            return (None, Some(txn_guard));
        }

        let mut txn_guard = self.txn.lock();
        let txn = txn_guard.take();
        // The slot was unlocked between `before_commit` and here, so it can be empty now.
        let Some(mut txn) = txn else {
            return (None, Some(txn_guard));
        };
        let on_commit = txn.take_on_commit();
        // Per-call overrides from `config` replace what was set on the transaction.
        if let Some(origin) = config.origin.clone() {
            txn.set_origin(origin);
        }

        if let Some(timestamp) = config.timestamp {
            txn.set_timestamp(timestamp);
        }

        if let Some(msg) = config.commit_msg.as_ref() {
            txn.set_msg(Some(msg.clone()));
        }

        let id_span = txn.id_span();
        let mut options = txn.commit().unwrap();
        if let Some(opts) = options.as_mut() {
            // The `config` origin applied to this commit only; don't carry it forward.
            if config.origin.is_some() {
                opts.set_origin(None);
            }
            // Carry-over options are kept only when the caller asked for them.
            if !preserve_on_empty {
                options = None;
            }
        }
        if config.immediate_renew && self.can_edit() {
            let mut t = self.txn().unwrap();
            if let Some(options) = options.as_ref() {
                t.set_options(options.clone());
            }
            *txn_guard = Some(t);
        }

        if let Some(on_commit) = on_commit {
            // Run the callback with the slot unlocked, then re-lock it.
            drop(txn_guard);
            on_commit(&self.state, &self.oplog, id_span);
            txn_guard = self.txn.lock();
            // A transaction appeared in the slot while it was unlocked. If we are not renewing,
            // go around again to commit it; this iteration's `options` are dropped.
            if !config.immediate_renew && txn_guard.is_some() {
                continue;
            }
        }

        return (
            options,
            if !config.immediate_renew {
                Some(txn_guard)
            } else {
                None
            },
        );
    }
}
376
377 #[instrument(skip_all)]
382 pub fn commit_with(
383 &self,
384 config: CommitOptions,
385 ) -> (
386 Option<CommitOptions>,
387 Option<LoroMutexGuard<'_, Option<Transaction>>>,
388 ) {
389 self.commit_internal(config, false)
390 }
391
392 pub fn set_next_commit_message(&self, message: &str) {
394 let mut binding = self.txn.lock();
395 let Some(txn) = binding.as_mut() else {
396 return;
397 };
398
399 if message.is_empty() {
400 txn.set_msg(None)
401 } else {
402 txn.set_msg(Some(message.into()))
403 }
404 }
405
406 pub fn set_next_commit_origin(&self, origin: &str) {
408 let mut txn = self.txn.lock();
409 if let Some(txn) = txn.as_mut() {
410 txn.set_origin(origin.into());
411 }
412 }
413
414 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
416 let mut txn = self.txn.lock();
417 if let Some(txn) = txn.as_mut() {
418 txn.set_timestamp(timestamp);
419 }
420 }
421
422 pub fn set_next_commit_options(&self, options: CommitOptions) {
424 let mut txn = self.txn.lock();
425 if let Some(txn) = txn.as_mut() {
426 txn.set_options(options);
427 }
428 }
429
430 pub fn clear_next_commit_options(&self) {
432 let mut txn = self.txn.lock();
433 if let Some(txn) = txn.as_mut() {
434 txn.set_options(CommitOptions::new());
435 }
436 }
437
438 #[inline]
449 pub fn set_record_timestamp(&self, record: bool) {
450 self.config.set_record_timestamp(record);
451 }
452
453 #[inline]
458 pub fn set_change_merge_interval(&self, interval: i64) {
459 self.config.set_merge_interval(interval);
460 }
461
462 pub fn can_edit(&self) -> bool {
463 !self.is_detached() || self.config.detached_editing()
464 }
465
466 pub fn is_detached_editing_enabled(&self) -> bool {
467 self.config.detached_editing()
468 }
469
470 #[inline]
471 pub fn config_text_style(&self, text_style: StyleConfigMap) {
472 self.config.text_style_config.write().map = text_style.map;
473 }
474
475 #[inline]
476 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
477 self.config.text_style_config.write().default_style = text_style;
478 }
479 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
480 let doc = Self::new();
481 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
482 if mode.is_snapshot() {
483 doc.with_barrier(|| -> Result<(), LoroError> {
484 decode_snapshot(&doc, mode, body, Default::default())?;
485 Ok(())
486 })?;
487 Ok(doc)
488 } else {
489 Err(LoroError::DecodeError(
490 "Invalid encode mode".to_string().into(),
491 ))
492 }
493 }
494
495 #[inline(always)]
497 pub fn can_reset_with_snapshot(&self) -> bool {
498 let oplog = self.oplog.lock();
499 if oplog.batch_importing {
500 return false;
501 }
502
503 if self.is_detached() {
504 return false;
505 }
506
507 oplog.is_empty() && self.state.lock().can_import_snapshot()
508 }
509
510 #[inline(always)]
516 pub fn is_detached(&self) -> bool {
517 self.detached.load(Acquire)
518 }
519
520 pub(crate) fn set_detached(&self, detached: bool) {
521 self.detached.store(detached, Release);
522 }
523
524 #[inline(always)]
525 pub fn peer_id(&self) -> PeerID {
526 self.state
527 .lock()
528 .peer
529 .load(std::sync::atomic::Ordering::Relaxed)
530 }
531
532 #[inline(always)]
533 pub fn detach(&self) {
534 self.with_barrier(|| self.set_detached(true));
535 }
536
537 #[inline(always)]
538 pub fn attach(&self) {
539 self.checkout_to_latest()
540 }
541
542 pub fn state_timestamp(&self) -> Timestamp {
545 let f = { self.state.lock().frontiers.clone() };
547 self.oplog.lock().get_timestamp_of_version(&f)
548 }
549
550 #[inline(always)]
551 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
552 &self.state
553 }
554
555 #[inline]
556 pub fn get_state_deep_value(&self) -> LoroValue {
557 self.state.lock().get_deep_value()
558 }
559
560 #[inline(always)]
561 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
562 &self.oplog
563 }
564
565 #[inline(always)]
566 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
567 let s = debug_span!("import", peer = self.peer_id());
568 let _e = s.enter();
569 self.import_with(bytes, Default::default())
570 }
571
572 #[inline]
573 pub fn import_with(
574 &self,
575 bytes: &[u8],
576 origin: InternalString,
577 ) -> Result<ImportStatus, LoroError> {
578 self.with_barrier(|| self._import_with(bytes, origin))
579 }
580
/// Parses `bytes` and imports them according to their encode mode.
///
/// `import_with` calls this inside `with_barrier`. Once the mode-specific import has run,
/// pending events are emitted even if that import returned an error. The early returns
/// (header parse failure, `ImportWhenInTxn`) skip emitting.
///
/// - `OutdatedRle`: fails with `ImportWhenInTxn` if the state reports an open transaction;
///   otherwise the oplog decodes the data and the delta is applied to the state.
/// - `OutdatedSnapshot` / `FastSnapshot`: if `can_reset_with_snapshot()`, the snapshot is
///   decoded directly into this doc; otherwise it is imported like updates.
/// - `FastUpdates`: changes are decoded and imported.
/// - `Auto`: treated as unreachable (presumably never produced by
///   `parse_header_and_body` — TODO confirm).
#[tracing::instrument(skip_all)]
fn _import_with(
    &self,
    bytes: &[u8],
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    ensure_cov::notify_cov("loro_internal::import");
    let parsed = parse_header_and_body(bytes, true)?;
    loro_common::info!("Importing with mode={:?}", &parsed.mode);
    let result = match parsed.mode {
        EncodeMode::OutdatedRle => {
            if self.state.lock().is_in_txn() {
                return Err(LoroError::ImportWhenInTxn);
            }

            let s = tracing::span!(
                tracing::Level::INFO,
                "Import updates ",
                peer = self.peer_id()
            );
            let _e = s.enter();
            self.update_oplog_and_apply_delta_to_state_if_needed(
                |oplog| oplog.decode(parsed),
                origin,
            )
        }
        EncodeMode::OutdatedSnapshot => {
            // Empty doc: load the snapshot wholesale instead of merging it.
            if self.can_reset_with_snapshot() {
                loro_common::info!("Init by snapshot {}", self.peer_id());
                decode_snapshot(self, parsed.mode, parsed.body, origin)
            } else {
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
        }
        EncodeMode::FastSnapshot => {
            // Empty doc: load the snapshot wholesale; otherwise import its changes as updates.
            if self.can_reset_with_snapshot() {
                ensure_cov::notify_cov("loro_internal::import::snapshot");
                loro_common::info!("Init by fast snapshot {}", self.peer_id());
                decode_snapshot(self, parsed.mode, parsed.body, origin)
            } else {
                self.import_changes_and_apply_delta_to_state_if_needed(
                    |oplog| encoding::decode_oplog_changes(oplog, parsed),
                    origin,
                )

            }
        }
        EncodeMode::FastUpdates => self.import_changes_and_apply_delta_to_state_if_needed(
            |oplog| encoding::decode_oplog_changes(oplog, parsed),
            origin,
        ),
        EncodeMode::Auto => {
            unreachable!()
        }
    };

    self.emit_events();

    result
}
648
/// Runs `f` to merge decoded data into the oplog. Unless the doc is detached, it then applies
/// the resulting version change to the state. An import rollback point is opened first.
///
/// Attached doc:
/// - If `f` changed the oplog version vector, the diff from the old version to the new one
///   is applied to the state. If that fails, the oplog import is rolled back and the state
///   error is returned.
/// - If `f` succeeded, the rollback point is committed.
/// - If `f` failed without changing the version vector, the import is rolled back. If it
///   failed after a partial change (already applied to the state above), the partial import
///   is committed and `f`'s error is returned.
///
/// Detached doc: the state is untouched. The rollback point is committed on success and
/// rolled back on error.
#[tracing::instrument(skip_all)]
pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
    &self,
    f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    let mut oplog = self.oplog.lock();
    oplog.begin_import_rollback();
    if !self.is_detached() {
        let old_vv = oplog.vv().clone();
        let old_frontiers = oplog.frontiers().clone();
        let result = f(&mut oplog);
        if &old_vv != oplog.vv() {
            // The oplog advanced (possibly only partially if `f` failed): bring the state along.
            let mut diff = DiffCalculator::new(false);
            let (diff, diff_mode) = diff.calc_diff_internal(
                &oplog,
                &old_vv,
                &old_frontiers,
                oplog.vv(),
                oplog.dag.get_frontiers(),
                None,
            );
            let mut state = self.state.lock();
            if let Err(e) = state.apply_diff(
                InternalDocDiff {
                    origin,
                    diff: (diff).into(),
                    by: EventTriggerKind::Import,
                    new_version: Cow::Owned(oplog.frontiers().clone()),
                },
                diff_mode,
            ) {
                oplog.rollback_import();
                return Err(e);
            }
        }
        match result {
            Ok(result) => {
                oplog.commit_import_rollback();
                Ok(result)
            }
            Err(e) => {
                // Roll back only if nothing was applied; a partial import already reached
                // the state above, so keep it consistent by committing it.
                if &old_vv == oplog.vv() {
                    oplog.rollback_import();
                } else {
                    oplog.commit_import_rollback();
                }
                Err(e)
            }
        }
    } else {
        // Detached: only the oplog changes; the checked-out state stays as is.
        match f(&mut oplog) {
            Ok(result) => {
                oplog.commit_import_rollback();
                Ok(result)
            }
            Err(e) => {
                oplog.rollback_import();
                Err(e)
            }
        }
    }
}
716
/// Decodes changes with `decode_changes`, adds them to the oplog and, when attached,
/// applies the resulting diff to the state.
///
/// As implemented:
/// 1. Decode after taking an arena checkpoint. On decode failure the arena is rolled back.
/// 2. Preflight (`preflight_import_changes`). Changes depending on versions before the
///    shallow root are rejected up front (arena rolled back) when the doc is detached or
///    `preflight.applies_to_dag` is false.
/// 3. Detached: changes are only added to the oplog.
/// 4. `applies_to_dag == false` (presumably changes whose deps are not yet known — TODO
///    confirm): changes are added to the oplog. Root containers reported by
///    `pending_root_containers_to_materialize` are then ensured in the state.
/// 5. Otherwise: changes are added and the diff from the old version to the new one is
///    applied to the state. A rollback guard is opened only if preflight reports
///    `needs_state_apply_rollback`; without it, a state-apply error panics.
///
/// Returns `ImportUpdatesThatDependsOnOutdatedVersion` when applying reports deps before
/// the shallow root. In step 5, whatever was applied is kept.
#[tracing::instrument(skip_all)]
pub(crate) fn import_changes_and_apply_delta_to_state_if_needed(
    &self,
    decode_changes: impl FnOnce(&mut OpLog) -> Result<Vec<Change>, LoroError>,
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    let mut oplog = self.oplog.lock();
    // Decoding may register containers in the arena; this checkpoint lets us undo that.
    let arena_checkpoint = oplog.arena.checkpoint_for_rollback();
    let changes = match decode_changes(&mut oplog) {
        Ok(changes) => changes,
        Err(e) => {
            oplog.arena.rollback(arena_checkpoint);
            return Err(e);
        }
    };

    let preflight = oplog.preflight_import_changes(&changes);
    if preflight.has_deps_before_shallow_root
        && (self.is_detached() || !preflight.applies_to_dag)
    {
        oplog.arena.rollback(arena_checkpoint);
        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
    }

    if self.is_detached() {
        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
        // NOTE(review): unlike the branch below, the arena is not rolled back here; the
        // preflight check above should already have rejected this case.
        if result.has_deps_before_shallow_root {
            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
        }

        return Ok(result.status);
    }

    if !preflight.applies_to_dag {
        // Computed before the changes are moved into the oplog.
        let pending_root_containers = pending_root_containers_to_materialize(&oplog, &changes);
        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
        if result.has_deps_before_shallow_root {
            oplog.arena.rollback(arena_checkpoint);
            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
        }

        if !pending_root_containers.is_empty() {
            let mut state = self.state.lock();
            for id in pending_root_containers {
                state.ensure_container(&id);
            }
        }

        return Ok(result.status);
    }

    let old_vv = oplog.vv().clone();
    let old_frontiers = oplog.frontiers().clone();
    let rollback_enabled = preflight.needs_state_apply_rollback;
    if rollback_enabled {
        oplog.begin_import_rollback_with_arena(arena_checkpoint);
    }

    let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
    if &old_vv != oplog.vv() {
        let mut diff = DiffCalculator::new(false);
        let (diff, diff_mode) = diff.calc_diff_internal(
            &oplog,
            &old_vv,
            &old_frontiers,
            oplog.vv(),
            oplog.dag.get_frontiers(),
            None,
        );
        let mut state = self.state.lock();
        if let Err(e) = state.apply_diff(
            InternalDocDiff {
                origin,
                diff: (diff).into(),
                by: EventTriggerKind::Import,
                new_version: Cow::Owned(oplog.frontiers().clone()),
            },
            diff_mode,
        ) {
            if rollback_enabled {
                oplog.rollback_import();
                return Err(e);
            }

            // Preflight said no rollback was needed, so a failure here is a broken invariant.
            panic!("state apply returned Err for import without rollback guard: {e}");
        }
    }

    if result.has_deps_before_shallow_root {
        // Keep what was applied (the state already reflects it), but report the error.
        if rollback_enabled {
            oplog.commit_import_rollback();
        }
        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
    }

    if rollback_enabled {
        oplog.commit_import_rollback();
    }
    Ok(result.status)
}
817
818 fn emit_events(&self) {
819 let events = {
821 let mut state = self.state.lock();
822 state.take_events()
823 };
824 for event in events {
825 self.observer.emit(event);
826 }
827 }
828
829 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
830 let mut state = self.state.lock();
831 state.take_events()
832 }
833
834 #[tracing::instrument(skip_all)]
838 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
839 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
840 self.with_barrier(|| {
841 let result = self.import_changes_and_apply_delta_to_state_if_needed(
842 |oplog| crate::encoding::json_schema::decode_json_changes(json, &oplog.arena),
843 Default::default(),
844 );
845 self.emit_events();
846 result
847 })
848 }
849
850 pub fn export_json_updates(
851 &self,
852 start_vv: &VersionVector,
853 end_vv: &VersionVector,
854 with_peer_compression: bool,
855 ) -> JsonSchema {
856 self.with_barrier(|| {
857 let oplog = self.oplog.lock();
858 let mut start_vv = start_vv;
859 let _temp: Option<VersionVector>;
860 if !oplog.dag.shallow_since_vv().is_empty() {
861 let mut include_all = true;
863 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
864 if start_vv.get(peer).unwrap_or(&0) < counter {
865 include_all = false;
866 break;
867 }
868 }
869 if !include_all {
870 let mut vv = start_vv.clone();
871 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
872 vv.extend_to_include_end_id(ID::new(peer, counter));
873 }
874 _temp = Some(vv);
875 start_vv = _temp.as_ref().unwrap();
876 }
877 }
878
879 crate::encoding::json_schema::export_json(
880 &oplog,
881 start_vv,
882 end_vv,
883 with_peer_compression,
884 )
885 })
886 }
887
888 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
889 let oplog = self.oplog.lock();
890 let mut changes = export_json_in_id_span(&oplog, id_span);
891 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
892 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
893 changes.push(change_json);
894 }
895 changes
896 }
897
898 #[inline]
900 pub fn oplog_vv(&self) -> VersionVector {
901 self.oplog.lock().vv().clone()
902 }
903
904 #[inline]
906 pub fn state_vv(&self) -> VersionVector {
907 let oplog = self.oplog.lock();
908 let f = &self.state.lock().frontiers;
909 oplog.dag.frontiers_to_vv(f).unwrap()
910 }
911
912 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
913 let value: LoroValue = self.state.lock().get_value_by_path(path)?;
914 if let LoroValue::Container(c) = value {
915 Some(ValueOrHandler::Handler(Handler::new_attached(
916 c.clone(),
917 self.clone(),
918 )))
919 } else {
920 Some(ValueOrHandler::Value(value))
921 }
922 }
923
924 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
926 let path = str_to_path(path)?;
927 self.get_by_path(&path)
928 }
929
930 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
931 let arena = &self.arena;
932 let txn = self.txn.lock();
933 let txn = txn.as_ref()?;
934 let ops_ = txn.local_ops();
935 let new_id = ID {
936 peer: *txn.peer(),
937 counter: ops_.first()?.counter,
938 };
939 let change = ChangeRef {
940 id: &new_id,
941 deps: txn.frontiers(),
942 timestamp: &txn
943 .timestamp()
944 .as_ref()
945 .copied()
946 .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
947 commit_msg: txn.msg(),
948 ops: ops_,
949 lamport: txn.lamport(),
950 };
951 let json = encode_change_to_json(change, arena);
952 Some(json)
953 }
954
955 #[inline]
956 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
957 if self.has_container(&id) {
958 self.ensure_root_container(&id);
959 Some(Handler::new_attached(id, self.clone()))
960 } else {
961 None
962 }
963 }
964
965 #[inline]
966 fn ensure_root_container(&self, id: &ContainerID) {
967 if id.is_root() && !id.is_mergeable() {
971 self.state.lock().ensure_container(id);
972 }
973 }
974
975 #[inline]
978 pub fn try_get_text<I: IntoContainerId>(&self, id: I) -> Option<TextHandler> {
979 let id = id.into_container_id(&self.arena, ContainerType::Text);
980 if !self.has_container(&id) {
981 return None;
982 }
983 self.ensure_root_container(&id);
984 Handler::new_attached(id, self.clone()).into_text().ok()
985 }
986
987 #[inline]
990 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
991 self.try_get_text(id)
992 .expect("The container does not exist in the document. Use `try_get_text` or `get_container` to check for existence.")
993 }
994
995 #[inline]
998 pub fn try_get_list<I: IntoContainerId>(&self, id: I) -> Option<ListHandler> {
999 let id = id.into_container_id(&self.arena, ContainerType::List);
1000 if !self.has_container(&id) {
1001 return None;
1002 }
1003 self.ensure_root_container(&id);
1004 Handler::new_attached(id, self.clone()).into_list().ok()
1005 }
1006
1007 #[inline]
1010 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
1011 self.try_get_list(id)
1012 .expect("The container does not exist in the document. Use `try_get_list` or `get_container` to check for existence.")
1013 }
1014
1015 #[inline]
1018 pub fn try_get_movable_list<I: IntoContainerId>(&self, id: I) -> Option<MovableListHandler> {
1019 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
1020 if !self.has_container(&id) {
1021 return None;
1022 }
1023 self.ensure_root_container(&id);
1024 Handler::new_attached(id, self.clone())
1025 .into_movable_list()
1026 .ok()
1027 }
1028
1029 #[inline]
1032 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
1033 self.try_get_movable_list(id)
1034 .expect("The container does not exist in the document. Use `try_get_movable_list` or `get_container` to check for existence.")
1035 }
1036
1037 #[inline]
1040 pub fn try_get_map<I: IntoContainerId>(&self, id: I) -> Option<MapHandler> {
1041 let id = id.into_container_id(&self.arena, ContainerType::Map);
1042 if !self.has_container(&id) {
1043 return None;
1044 }
1045 self.ensure_root_container(&id);
1046 Handler::new_attached(id, self.clone()).into_map().ok()
1047 }
1048
1049 #[inline]
1052 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
1053 self.try_get_map(id)
1054 .expect("The container does not exist in the document. Use `try_get_map` or `get_container` to check for existence.")
1055 }
1056
1057 #[inline]
1060 pub fn try_get_tree<I: IntoContainerId>(&self, id: I) -> Option<TreeHandler> {
1061 let id = id.into_container_id(&self.arena, ContainerType::Tree);
1062 if !self.has_container(&id) {
1063 return None;
1064 }
1065 self.ensure_root_container(&id);
1066 Handler::new_attached(id, self.clone()).into_tree().ok()
1067 }
1068
1069 #[inline]
1072 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
1073 self.try_get_tree(id)
1074 .expect("The container does not exist in the document. Use `try_get_tree` or `get_container` to check for existence.")
1075 }
1076
/// Returns the counter container for `id` (resolved as a `Counter` container), or `None`
/// if it does not exist or is not a counter.
#[cfg(feature = "counter")]
pub fn try_get_counter<I: IntoContainerId>(
    &self,
    id: I,
) -> Option<crate::handler::counter::CounterHandler> {
    let cid = id.into_container_id(&self.arena, ContainerType::Counter);
    self.get_handler(cid)?.into_counter().ok()
}

/// Like [`Self::try_get_counter`], but panics if the container is missing.
#[cfg(feature = "counter")]
pub fn get_counter<I: IntoContainerId>(
    &self,
    id: I,
) -> crate::handler::counter::CounterHandler {
    let found = self.try_get_counter(id);
    found.expect("The container does not exist in the document. Use `try_get_counter` or `get_container` to check for existence.")
}
1098
1099 #[must_use]
1100 pub fn has_container(&self, id: &ContainerID) -> bool {
1101 if id.is_root() && !id.is_mergeable() {
1102 return true;
1103 }
1104
1105 let exist = self.state.lock().does_container_exist(id);
1106 exist
1107 }
1108
1109 #[instrument(level = "info", skip_all)]
1123 pub fn undo_internal(
1124 &self,
1125 id_span: IdSpan,
1126 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1127 post_transform_base: Option<&DiffBatch>,
1128 before_diff: &mut dyn FnMut(&DiffBatch),
1129 ) -> LoroResult<CommitWhenDrop<'_>> {
1130 if !self.can_edit() {
1131 return Err(LoroError::EditWhenDetached);
1132 }
1133
1134 let (options, txn) = self.implicit_commit_then_stop();
1135 if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
1136 self.renew_txn_if_auto_commit(options);
1137 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
1138 }
1139
1140 let (was_recording, latest_frontiers) = {
1141 let mut state = self.state.lock();
1142 let was_recording = state.is_recording();
1143 state.stop_and_clear_recording();
1144 (was_recording, state.frontiers.clone())
1145 };
1146
1147 let spans = self.oplog.lock().split_span_based_on_deps(id_span);
1148 let diff = crate::undo::undo(
1149 spans,
1150 match post_transform_base {
1151 Some(d) => Either::Right(d),
1152 None => Either::Left(&latest_frontiers),
1153 },
1154 |from, to| {
1155 self._checkout_without_emitting(from, false, false).unwrap();
1156 self.state.lock().start_recording();
1157 self._checkout_without_emitting(to, false, false).unwrap();
1158 let mut state = self.state.lock();
1159 let e = state.take_events();
1160 state.stop_and_clear_recording();
1161 DiffBatch::new(e)
1162 },
1163 before_diff,
1164 );
1165
1166 self._checkout_without_emitting(&latest_frontiers, false, false)?;
1170 self.set_detached(false);
1171 if was_recording {
1172 self.state.lock().start_recording();
1173 }
1174 drop(txn);
1175 self.start_auto_commit();
1176 if let Err(e) = self._apply_diff(diff, container_remap, true) {
1180 warn!("Undo Failed {:?}", e);
1181 }
1182
1183 if let Some(options) = options {
1184 self.set_next_commit_options(options);
1185 }
1186 Ok(CommitWhenDrop {
1187 doc: self,
1188 default_options: CommitOptions::new().origin("undo"),
1189 })
1190 }
1191
1192 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
1198 let f = self.state_frontiers();
1201 let diff = self.diff(&f, target)?;
1202 self._apply_diff(diff, &mut Default::default(), false)
1203 }
1204
1205 pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
1210 {
1211 let oplog = self.oplog.lock();
1214 let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
1215 for id in frontiers.iter() {
1216 if !oplog.dag.contains(id) {
1217 return Err(LoroError::FrontiersNotFound(id));
1218 }
1219 }
1220
1221 if oplog.dag.is_before_shallow_root(frontiers) {
1222 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1223 }
1224
1225 Ok(())
1226 };
1227
1228 validate_frontiers(a)?;
1229 validate_frontiers(b)?;
1230 }
1231
1232 let (options, txn) = self.implicit_commit_then_stop();
1233 let was_detached = self.is_detached();
1234 let old_frontiers = self.state_frontiers();
1235 let was_recording = {
1236 let mut state = self.state.lock();
1237 let is_recording = state.is_recording();
1238 state.stop_and_clear_recording();
1239 is_recording
1240 };
1241 let result = (|| {
1242 self._checkout_without_emitting(a, true, false)?;
1243 self.state.lock().start_recording();
1244 self._checkout_without_emitting(b, true, false)?;
1245 let mut state = self.state.lock();
1246 let e = state.take_events();
1247 state.stop_and_clear_recording();
1248 Ok::<_, LoroError>(e)
1249 })();
1250
1251 self._checkout_without_emitting(&old_frontiers, false, false)
1253 .unwrap();
1254 drop(txn);
1255 if !was_detached {
1256 self.set_detached(false);
1257 self.renew_txn_if_auto_commit(options);
1258 }
1259 if was_recording {
1260 self.state.lock().start_recording();
1261 }
1262 result.map(DiffBatch::new)
1263 }
1264
1265 #[inline(always)]
1267 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1268 self._apply_diff(diff, &mut Default::default(), true)
1269 }
1270
1271 pub(crate) fn _apply_diff(
1283 &self,
1284 diff: DiffBatch,
1285 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
1286 skip_unreachable: bool,
1287 ) -> LoroResult<()> {
1288 if !self.can_edit() {
1289 return Err(LoroError::EditWhenDetached);
1290 }
1291
1292 let mut ans: LoroResult<()> = Ok(());
1293 let mut missing_containers: Vec<ContainerID> = Vec::new();
1294 for (mut id, diff) in diff.into_iter() {
1295 let mut remapped = false;
1296 while let Some(rid) = container_remap.get(&id) {
1297 remapped = true;
1298 id = rid.clone();
1299 }
1300
1301 if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
1302 let exists = self.state.lock().does_container_exist(&id);
1304 if !exists {
1305 missing_containers.push(id);
1306 continue;
1307 }
1308 self.state.lock().ensure_container(&id);
1310 }
1311
1312 if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
1313 continue;
1314 }
1315
1316 let Some(h) = self.get_handler(id.clone()) else {
1317 return Err(LoroError::ContainersNotFound {
1318 containers: Box::new(vec![id]),
1319 });
1320 };
1321 if let Err(e) = h.apply_diff(diff, container_remap) {
1322 ans = Err(e);
1323 }
1324 }
1325
1326 if !missing_containers.is_empty() {
1327 return Err(LoroError::ContainersNotFound {
1328 containers: Box::new(missing_containers),
1329 });
1330 }
1331
1332 ans
1333 }
1334
1335 #[inline]
1337 pub fn diagnose_size(&self) {
1338 self.oplog().lock().diagnose_size();
1339 }
1340
1341 #[inline]
1342 pub fn oplog_frontiers(&self) -> Frontiers {
1343 self.oplog().lock().frontiers().clone()
1344 }
1345
1346 #[inline]
1347 pub fn state_frontiers(&self) -> Frontiers {
1348 self.state.lock().frontiers.clone()
1349 }
1350
1351 #[inline]
1355 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1356 self.oplog().lock().cmp_with_frontiers(other)
1357 }
1358
1359 #[inline]
1363 pub fn cmp_frontiers(
1364 &self,
1365 a: &Frontiers,
1366 b: &Frontiers,
1367 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1368 self.oplog().lock().cmp_frontiers(a, b)
1369 }
1370
1371 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1372 let mut state = self.state.lock();
1373 if !state.is_recording() {
1374 state.start_recording();
1375 }
1376
1377 self.observer.subscribe_root(callback)
1378 }
1379
1380 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1381 let mut state = self.state.lock();
1382 if !state.is_recording() {
1383 state.start_recording();
1384 }
1385
1386 self.observer.subscribe(container_id, callback)
1387 }
1388
1389 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1390 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1391 activate();
1392 sub
1393 }
1394
1395 #[tracing::instrument(skip_all)]
1397 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1398 if bytes.is_empty() {
1399 return Ok(ImportStatus::default());
1400 }
1401
1402 if bytes.len() == 1 {
1403 return self.import(&bytes[0]);
1404 }
1405
1406 let mut success = VersionRange::default();
1407 let mut meta_arr = bytes
1408 .iter()
1409 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1410 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1411 meta_arr.sort_by(|a, b| {
1412 a.0.mode
1413 .cmp(&b.0.mode)
1414 .then(b.0.change_num.cmp(&a.0.change_num))
1415 });
1416
1417 let (options, txn) = self.implicit_commit_then_stop();
1418 let is_detached = self.is_detached();
1442 self.set_detached(true);
1443 self.oplog.lock().batch_importing = true;
1444 let mut err = None;
1445 for (_meta, data) in meta_arr {
1446 match self._import_with(data, Default::default()) {
1447 Ok(s) => {
1448 for (peer, (start, end)) in s.success.iter() {
1449 match success.0.entry(*peer) {
1450 Entry::Occupied(mut e) => {
1451 e.get_mut().1 = *end.max(&e.get().1);
1452 }
1453 Entry::Vacant(e) => {
1454 e.insert((*start, *end));
1455 }
1456 }
1457 }
1458 }
1459 Err(e) => {
1460 err = Some(e);
1461 }
1462 }
1463 }
1464
1465 let mut oplog = self.oplog.lock();
1466 oplog.batch_importing = false;
1467 let pending = oplog.pending_changes.version_range();
1468 drop(oplog);
1469 if !is_detached {
1470 self._checkout_to_latest_with_guard(txn);
1471 } else {
1472 drop(txn);
1473 }
1474
1475 self.renew_txn_if_auto_commit(options);
1476 if let Some(err) = err {
1477 return Err(err);
1478 }
1479
1480 Ok(ImportStatus {
1481 success,
1482 pending: if pending.is_empty() {
1483 None
1484 } else {
1485 Some(pending)
1486 },
1487 })
1488 }
1489
1490 #[inline]
1492 pub fn get_value(&self) -> LoroValue {
1493 self.state.lock().get_value()
1494 }
1495
1496 #[inline]
1498 pub fn get_deep_value(&self) -> LoroValue {
1499 self.state.lock().get_deep_value()
1500 }
1501
1502 #[inline]
1504 pub fn get_deep_value_with_id(&self) -> LoroValue {
1505 self.state.lock().get_deep_value_with_id()
1506 }
1507
1508 pub fn checkout_to_latest(&self) {
1509 let (options, _guard) = self.implicit_commit_then_stop();
1510 if !self.is_detached() {
1511 drop(_guard);
1512 self.renew_txn_if_auto_commit(options);
1513 return;
1514 }
1515
1516 self._checkout_to_latest_without_commit(true)
1517 .expect("checkout to oplog frontiers should succeed");
1518 self.emit_events();
1519 drop(_guard);
1520 self.renew_txn_if_auto_commit(options);
1521 }
1522
1523 fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1524 if !self.is_detached() {
1525 self._renew_txn_if_auto_commit_with_guard(None, guard);
1526 return;
1527 }
1528
1529 self._checkout_to_latest_without_commit(true)
1530 .expect("checkout to oplog frontiers should succeed");
1531 self._renew_txn_if_auto_commit_with_guard(None, guard);
1532 }
1533
1534 pub(crate) fn _checkout_to_latest_without_commit(
1536 &self,
1537 to_commit_then_renew: bool,
1538 ) -> LoroResult<()> {
1539 self._checkout_to_latest_without_commit_with_event(
1540 to_commit_then_renew,
1541 "checkout".into(),
1542 EventTriggerKind::Checkout,
1543 )
1544 }
1545
1546 pub(crate) fn _checkout_to_latest_without_commit_as_import(
1547 &self,
1548 to_commit_then_renew: bool,
1549 origin: InternalString,
1550 ) -> LoroResult<()> {
1551 self._checkout_to_latest_without_commit_with_event(
1552 to_commit_then_renew,
1553 origin,
1554 EventTriggerKind::Import,
1555 )
1556 }
1557
1558 fn _checkout_to_latest_without_commit_with_event(
1559 &self,
1560 to_commit_then_renew: bool,
1561 origin: InternalString,
1562 triggered_by: EventTriggerKind,
1563 ) -> LoroResult<()> {
1564 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1565 let f = self.oplog_frontiers();
1566 let this = &self;
1567 let frontiers = &f;
1568 this._checkout_without_emitting_with_event(
1569 frontiers,
1570 false,
1571 to_commit_then_renew,
1572 origin,
1573 triggered_by,
1574 )?;
1575 this.emit_events();
1577 if this.config.detached_editing() {
1578 this.renew_peer_id();
1579 }
1580
1581 self.set_detached(false);
1582 Ok(())
1583 })
1584 }
1585
1586 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1591 let was_detached = self.is_detached();
1592 let (options, guard) = self.implicit_commit_then_stop();
1593 let result = self._checkout_without_emitting(frontiers, true, true);
1594 if result.is_ok() {
1595 self.emit_events();
1596 }
1597 drop(guard);
1598 if self.config.detached_editing() {
1599 if result.is_ok() {
1600 self.renew_peer_id();
1601 }
1602 self.renew_txn_if_auto_commit(options);
1603 } else if result.is_err() {
1604 if !was_detached {
1605 self.renew_txn_if_auto_commit(options);
1606 }
1607 } else if !self.is_detached() {
1608 self.renew_txn_if_auto_commit(options);
1609 }
1610
1611 result
1612 }
1613
1614 #[instrument(level = "info", skip(self))]
1616 pub(crate) fn _checkout_without_emitting(
1617 &self,
1618 frontiers: &Frontiers,
1619 to_shrink_frontiers: bool,
1620 to_commit_then_renew: bool,
1621 ) -> Result<(), LoroError> {
1622 self._checkout_without_emitting_with_event(
1623 frontiers,
1624 to_shrink_frontiers,
1625 to_commit_then_renew,
1626 "checkout".into(),
1627 EventTriggerKind::Checkout,
1628 )
1629 }
1630
    /// Moves the document state to `frontiers` without emitting the resulting events.
    ///
    /// The diff between the current state version and the target is computed with the shared
    /// diff calculator and applied to the state, tagged with `origin` and `triggered_by`.
    /// Callers such as `checkout` call `emit_events` afterwards.
    ///
    /// Locks are taken in the order `oplog` -> `state` -> `diff_calculator`.
    ///
    /// * `to_shrink_frontiers` - shrink `frontiers` against the DAG before using it.
    /// * `_to_commit_then_renew` - unused in this function.
    ///
    /// # Errors
    /// - `TransactionError` if the transaction mutex is not locked.
    /// - `SwitchToVersionBeforeShallowRoot` if `frontiers` lies before the shallow root.
    /// - `FrontiersNotFound` if an id of the (possibly shrunk) frontiers is not in the DAG.
    /// - `NotFoundError` if either version cannot be turned into a version vector.
    /// - Any error from `DocState::apply_diff`.
    fn _checkout_without_emitting_with_event(
        &self,
        frontiers: &Frontiers,
        to_shrink_frontiers: bool,
        _to_commit_then_renew: bool,
        origin: InternalString,
        triggered_by: EventTriggerKind,
    ) -> Result<(), LoroError> {
        // Callers are expected to hold the txn mutex (e.g. via `implicit_commit_then_stop`).
        // NOTE(review): `is_locked` presumably only reports that the mutex is held, not by whom.
        if !self.txn.is_locked() {
            return Err(LoroError::TransactionError(
                "checkout requires the transaction mutex to be held"
                    .to_string()
                    .into_boxed_str(),
            ));
        }
        let from_frontiers = self.state_frontiers();
        loro_common::info!(
            "checkout from={:?} to={:?} cur_vv={:?}",
            from_frontiers,
            frontiers,
            self.oplog_vv()
        );

        // Fast path: the state is already at the requested version.
        if &from_frontiers == frontiers {
            return Ok(());
        }

        let oplog = self.oplog.lock();
        if oplog.dag.is_before_shallow_root(frontiers) {
            return Err(LoroError::SwitchToVersionBeforeShallowRoot);
        }

        let frontiers = if to_shrink_frontiers {
            shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
        } else {
            frontiers.clone()
        };

        // Shrinking may reveal that the target equals the current version after all.
        if from_frontiers == frontiers {
            return Ok(());
        }

        let mut state = self.state.lock();
        let mut calc = self.diff_calculator.lock();
        for i in frontiers.iter() {
            if !oplog.dag.contains(i) {
                return Err(LoroError::FrontiersNotFound(i));
            }
        }

        let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
            LoroError::NotFoundError(
                format!(
                    "Cannot find the current state version {:?}",
                    state.frontiers
                )
                .into_boxed_str(),
            )
        })?;
        let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
            return Err(LoroError::NotFoundError(
                format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
            ));
        };

        // The document is marked detached before the diff is applied. If `apply_diff` fails
        // below, it stays detached.
        self.set_detached(true);
        let (diff, diff_mode) =
            calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
        state.apply_diff(
            InternalDocDiff {
                origin,
                diff: Cow::Owned(diff),
                by: triggered_by,
                new_version: Cow::Owned(frontiers.clone()),
            },
            diff_mode,
        )?;

        Ok(())
    }
1711
1712 #[inline]
1713 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1714 self.oplog.lock().dag.vv_to_frontiers(vv)
1715 }
1716
1717 #[inline]
1718 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1719 self.oplog.lock().dag.frontiers_to_vv(frontiers)
1720 }
1721
1722 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1726 let updates = other.export(ExportMode::updates(&self.oplog_vv())).unwrap();
1727 self.import(&updates)
1728 }
1729
1730 pub(crate) fn arena(&self) -> &SharedArena {
1731 &self.arena
1732 }
1733
1734 #[inline]
1735 pub fn len_ops(&self) -> usize {
1736 if self.oplog.can_lock_in_this_thread() {
1737 return self.oplog.lock().visible_op_count_exact();
1738 }
1739
1740 self.visible_op_count.load(Acquire)
1741 }
1742
1743 #[inline]
1744 pub fn len_changes(&self) -> usize {
1745 let oplog = self.oplog.lock();
1746 oplog.len_changes()
1747 }
1748
1749 pub fn config(&self) -> &Configure {
1750 &self.config
1751 }
1752
1753 pub fn check_state_diff_calc_consistency_slow(&self) {
1758 {
1760 static IS_CHECKING: std::sync::atomic::AtomicBool =
1761 std::sync::atomic::AtomicBool::new(false);
1762 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1763 return;
1764 }
1765
1766 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1767 let peer_id = self.peer_id();
1768 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1769 let _g = s.enter();
1770 let options = self.implicit_commit_then_stop().0;
1771 self.oplog.lock().check_dag_correctness();
1772 if self.is_shallow() {
1773 let initial_snapshot = self
1784 .export(ExportMode::state_only(Some(
1785 &self.shallow_since_frontiers(),
1786 )))
1787 .unwrap();
1788
1789 let doc = LoroDoc::new();
1791 doc.import(&initial_snapshot).unwrap();
1792 self.checkout(&self.shallow_since_frontiers()).unwrap();
1793 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1794
1795 let updates = self.export(ExportMode::all_updates()).unwrap();
1797
1798 doc.import(&updates).unwrap();
1800 self.checkout_to_latest();
1801
1802 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1805 let mut calculated_state = doc.app_state().lock();
1806 let mut current_state = self.app_state().lock();
1807 current_state.check_is_the_same(&mut calculated_state);
1808 } else {
1809 let f = self.state_frontiers();
1810 let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
1811 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1812 let doc = Self::new();
1813 doc.import(&bytes).unwrap();
1814 let mut calculated_state = doc.app_state().lock();
1815 let mut current_state = self.app_state().lock();
1816 current_state.check_is_the_same(&mut calculated_state);
1817 }
1818
1819 self.renew_txn_if_auto_commit(options);
1820 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1821 }
1822 }
1823
1824 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1825 self.query_pos_internal(pos, true)
1826 }
1827
    /// Resolves `pos` to its current absolute position.
    ///
    /// Fast path: ask the current state via `get_relative_position`. On success no updated
    /// cursor is returned (`update: None`).
    ///
    /// Slow path: taken when the state cannot resolve the cursor, presumably because its
    /// anchor element was deleted.
    /// - The oplog is searched for the last op that deleted the anchor id.
    /// - The container's diff is replayed from the deps of that delete up to the oplog's latest
    ///   version, which yields the element's latest position.
    /// - A fresh cursor at that position is returned in `update`.
    /// - A cursor without an `id` resolves to the end of the sequence.
    ///
    /// `ret_event_index` is only forwarded to the fast path.
    ///
    /// # Errors
    /// - `IdNotFound` if the container is unknown, or no delete op for the id is found.
    /// - `ContainerDeleted` if the container is neither registered nor present in the state.
    /// - `HistoryCleared` if the id lies in history trimmed by a shallow snapshot.
    pub(crate) fn query_pos_internal(
        &self,
        pos: &Cursor,
        ret_event_index: bool,
    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
        if !self.has_container(&pos.container) {
            return Err(CannotFindRelativePosition::IdNotFound);
        }

        let mut state = self.state.lock();
        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
            Ok(PosQueryResult {
                update: None,
                current: AbsolutePosition {
                    pos: ans,
                    side: pos.side,
                },
            })
        } else {
            // Release the state lock before taking the oplog lock below; the state is
            // re-locked later only while the oplog lock is held (oplog -> state order).
            drop(state);
            let result = self.with_barrier(|| {
                let oplog = self.oplog().lock();
                if let Some(id) = pos.id {
                    // Make sure the container is registered in the arena before looking up
                    // its index.
                    if oplog.arena.id_to_idx(&pos.container).is_none() {
                        let mut s = self.state.lock();
                        if !s.does_container_exist(&pos.container) {
                            return Err(CannotFindRelativePosition::ContainerDeleted);
                        }
                        s.ensure_container(&pos.container);
                        drop(s);
                    }
                    let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
                    let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
                        // No delete found: either the relevant history was trimmed away by a
                        // shallow snapshot, or the id is genuinely unknown.
                        if oplog.shallow_since_vv().includes_id(id) {
                            return Err(CannotFindRelativePosition::HistoryCleared);
                        }

                        tracing::error!("Cannot find id {}", id);
                        return Err(CannotFindRelativePosition::IdNotFound);
                    };
                    // Replay only this container's history, from just before the delete up to
                    // the oplog's latest version.
                    let mut diff_calc = DiffCalculator::new(true);
                    let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
                    let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
                    diff_calc.calc_diff_internal(
                        &oplog,
                        before,
                        &before_frontiers,
                        oplog.vv(),
                        oplog.frontiers(),
                        Some(&|target| idx == target),
                    );
                    let depth = self.arena.get_depth(idx);
                    let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
                    // The `get_id_latest_pos(id).unwrap()` calls below assume the replay above
                    // covers `id`. NOTE(review): confirm this holds for every container type.
                    match diff_calc {
                        crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
                            let c = text.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_text(&pos.container);
                            // Text positions are converted from entity index to event index.
                            let current_pos = handler.convert_entity_index_to_event_index(new_pos);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(current_pos, c.side),
                                current: AbsolutePosition {
                                    pos: current_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::List(list) => {
                            let c = list.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_list(&pos.container);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(new_pos, c.side),
                                current: AbsolutePosition {
                                    pos: new_pos,
                                    side: c.side,
                                },
                            })
                        }
                        crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
                            let c = list.get_id_latest_pos(id).unwrap();
                            let new_pos = c.pos;
                            let handler = self.get_movable_list(&pos.container);
                            // Movable-list positions are op positions; map them to user positions.
                            let new_pos = handler.op_pos_to_user_pos(new_pos);
                            Ok(PosQueryResult {
                                update: handler.get_cursor(new_pos, c.side),
                                current: AbsolutePosition {
                                    pos: new_pos,
                                    side: c.side,
                                },
                            })
                        }
                        // NOTE(review): assumes cursors only exist for Text, List and
                        // MovableList containers.
                        crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
                        crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
                        #[cfg(feature = "counter")]
                        crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
                        crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
                    }
                } else {
                    // A cursor without an anchor id resolves to the end of the sequence.
                    match pos.container.container_type() {
                        ContainerType::Text => {
                            let text = self.get_text(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: text.id(),
                                    side: pos.side,
                                    origin_pos: text.len_unicode(),
                                }),
                                current: AbsolutePosition {
                                    pos: text.len_event(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::List => {
                            let list = self.get_list(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: list.id(),
                                    side: pos.side,
                                    origin_pos: list.len(),
                                }),
                                current: AbsolutePosition {
                                    pos: list.len(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::MovableList => {
                            let list = self.get_movable_list(&pos.container);
                            Ok(PosQueryResult {
                                update: Some(Cursor {
                                    id: None,
                                    container: list.id(),
                                    side: pos.side,
                                    origin_pos: list.len(),
                                }),
                                current: AbsolutePosition {
                                    pos: list.len(),
                                    side: pos.side,
                                },
                            })
                        }
                        ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
                            unreachable!()
                        }
                        #[cfg(feature = "counter")]
                        ContainerType::Counter => unreachable!(),
                    }
                }
            });
            result
        }
    }
2002
2003 pub fn free_history_cache(&self) {
2008 self.oplog.lock().free_history_cache();
2009 }
2010
2011 pub fn free_diff_calculator(&self) {
2013 *self.diff_calculator.lock() = DiffCalculator::new(true);
2014 }
2015
2016 pub fn has_history_cache(&self) -> bool {
2019 self.oplog.lock().has_history_cache()
2020 }
2021
2022 #[inline]
2026 pub fn compact_change_store(&self) {
2027 self.with_barrier(|| {
2028 self.oplog.lock().compact_change_store();
2029 });
2030 }
2031
2032 #[inline]
2036 pub fn analyze(&self) -> DocAnalysis {
2037 DocAnalysis::analyze(self)
2038 }
2039
2040 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
2042 let mut state = self.state.lock();
2043 if state.arena.id_to_idx(id).is_none() {
2044 if id.is_mergeable() {
2045 state.arena.register_container(id);
2049 } else if !state.does_container_exist(id) {
2050 return None;
2051 } else {
2052 state.ensure_container(id);
2053 }
2054 }
2055 let idx = state.arena.id_to_idx(id).unwrap();
2056 state.get_path(idx)
2057 }
2058
2059 #[instrument(skip(self))]
2060 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
2061 self.with_barrier(|| {
2062 let ans = match mode {
2063 ExportMode::Snapshot => export_fast_snapshot(self),
2064 ExportMode::Updates { from } => export_fast_updates(self, &from),
2065 ExportMode::UpdatesInRange { spans } => {
2066 export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
2067 }
2068 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
2069 ExportMode::StateOnly(f) => match f {
2070 Some(f) => export_state_only_snapshot(self, &f)?,
2071 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
2072 },
2073 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
2074 };
2075 Ok(ans)
2076 })
2077 }
2078
2079 pub fn shallow_since_vv(&self) -> ImVersionVector {
2085 self.oplog().lock().shallow_since_vv().clone()
2086 }
2087
2088 pub fn shallow_since_frontiers(&self) -> Frontiers {
2089 self.oplog().lock().shallow_since_frontiers().clone()
2090 }
2091
2092 pub fn is_shallow(&self) -> bool {
2094 !self.oplog().lock().shallow_since_vv().is_empty()
2095 }
2096
2097 pub fn get_pending_txn_len(&self) -> usize {
2102 if let Some(txn) = self.txn.lock().as_ref() {
2103 txn.len()
2104 } else {
2105 0
2106 }
2107 }
2108
2109 #[inline]
2110 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
2111 self.oplog().lock().dag.find_path(from, to)
2112 }
2113
2114 pub fn subscribe_first_commit_from_peer(
2120 &self,
2121 callback: FirstCommitFromPeerCallback,
2122 ) -> Subscription {
2123 let (s, enable) = self
2124 .first_commit_from_peer_subs
2125 .inner()
2126 .insert((), callback);
2127 enable();
2128 s
2129 }
2130
2131 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
2136 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
2137 enable();
2138 s
2139 }
2140}
2141
2142fn pending_root_containers_to_materialize(oplog: &OpLog, changes: &[Change]) -> Vec<ContainerID> {
2143 let mut roots = FxHashSet::default();
2144 for change in changes {
2145 if change.ctr_end() <= oplog.vv().get(&change.id.peer).copied().unwrap_or(0) {
2146 continue;
2147 }
2148
2149 if oplog.dag.is_before_shallow_root(&change.deps)
2150 || oplog
2151 .dag
2152 .get_change_lamport_from_deps(&change.deps)
2153 .is_some()
2154 {
2155 continue;
2156 }
2157
2158 for op in change.ops.iter() {
2159 let id = oplog
2160 .arena
2161 .get_container_id(op.container)
2162 .expect("decoded op container should be registered");
2163 if id.is_root() && !id.is_mergeable() {
2172 roots.insert(id);
2173 }
2174 }
2175 }
2176
2177 roots.into_iter().collect()
2178}
2179
2180#[derive(Debug, thiserror::Error)]
2181pub enum ChangeTravelError {
2182 #[error("Target id not found {0:?}")]
2183 TargetIdNotFound(ID),
2184 #[error("The shallow history of the doc doesn't include the target version")]
2185 TargetVersionNotIncluded,
2186}
2187
2188impl LoroDoc {
2189 pub fn travel_change_ancestors(
2190 &self,
2191 ids: &[ID],
2192 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
2193 ) -> Result<(), ChangeTravelError> {
2194 let (options, guard) = self.implicit_commit_then_stop();
2195 drop(guard);
2196 struct PendingNode(ChangeMeta);
2197 impl PartialEq for PendingNode {
2198 fn eq(&self, other: &Self) -> bool {
2199 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
2200 }
2201 }
2202
2203 impl Eq for PendingNode {}
2204 impl PartialOrd for PendingNode {
2205 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2206 Some(self.cmp(other))
2207 }
2208 }
2209
2210 impl Ord for PendingNode {
2211 fn cmp(&self, other: &Self) -> Ordering {
2212 self.0
2213 .lamport_last()
2214 .cmp(&other.0.lamport_last())
2215 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
2216 }
2217 }
2218
2219 for id in ids {
2220 let op_log = &self.oplog().lock();
2221 if !op_log.vv().includes_id(*id) {
2222 return Err(ChangeTravelError::TargetIdNotFound(*id));
2223 }
2224 if op_log.dag.shallow_since_vv().includes_id(*id) {
2225 return Err(ChangeTravelError::TargetVersionNotIncluded);
2226 }
2227 }
2228
2229 let mut visited = FxHashSet::default();
2230 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
2231 for id in ids {
2232 pending.push(PendingNode(ChangeMeta::from_change(
2233 &self.oplog().lock().get_change_at(*id).unwrap(),
2234 )));
2235 }
2236 while let Some(PendingNode(node)) = pending.pop() {
2237 let deps = node.deps.clone();
2238 if f(node).is_break() {
2239 break;
2240 }
2241
2242 for dep in deps.iter() {
2243 let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
2244 continue;
2245 };
2246 if visited.contains(&dep_node.id) {
2247 continue;
2248 }
2249
2250 visited.insert(dep_node.id);
2251 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
2252 }
2253 }
2254
2255 let ans = Ok(());
2256 self.renew_txn_if_auto_commit(options);
2257 ans
2258 }
2259
2260 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
2261 self.with_barrier(|| {
2262 let mut set = FxHashSet::default();
2263 let len = i64::try_from(len).unwrap_or(i64::MAX);
2264 let start = i64::from(id.counter);
2265 let end = start.saturating_add(len);
2266 if end <= 0 {
2267 return set;
2268 }
2269
2270 let start = start.max(0).min(i64::from(i32::MAX));
2271 let end = end.max(0).min(i64::from(i32::MAX));
2272 if start >= end {
2273 return set;
2274 }
2275
2276 {
2277 let oplog = self.oplog().lock();
2278 let span = IdSpan::new(id.peer, start as i32, end as i32);
2279 for op in oplog.iter_ops(span) {
2280 let id = oplog.arena.get_container_id(op.container()).unwrap();
2281 set.insert(id);
2282 }
2283 }
2284 set
2285 })
2286 }
2287
2288 pub fn delete_root_container(&self, cid: ContainerID) {
2289 if !cid.is_root() {
2290 return;
2291 }
2292
2293 if !self.has_container(&cid) {
2295 return;
2296 }
2297
2298 let Some(h) = self.get_handler(cid.clone()) else {
2299 return;
2300 };
2301
2302 self.config
2303 .deleted_root_containers
2304 .lock()
2305 .insert(cid.clone());
2306 if let Err(e) = h.clear() {
2307 self.config.deleted_root_containers.lock().remove(&cid);
2308 eprintln!("Failed to clear handler: {:?}", e);
2309 }
2310 }
2311
2312 pub fn set_hide_empty_root_containers(&self, hide: bool) {
2313 self.config
2314 .hide_empty_root_containers
2315 .store(hide, std::sync::atomic::Ordering::Relaxed);
2316 }
2317}
2318
2319fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
2320 let start_vv = oplog
2331 .dag
2332 .frontiers_to_vv(&id.into())
2333 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
2334
2335 let mut best: Option<((loro_common::Lamport, loro_common::PeerID), ID)> = None;
2339
2340 for change in oplog.iter_changes_peer_by_peer(&start_vv, oplog.vv()) {
2341 let peer = change.peer();
2342 for op in change.ops.iter() {
2343 if op.container != idx {
2344 continue;
2345 }
2346 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
2347 if d.id_start.to_span(d.atom_len()).contains(id) {
2348 debug_assert!(op.counter >= change.id().counter);
2349 let op_lamport =
2350 change.lamport + (op.counter - change.id().counter) as loro_common::Lamport;
2351 let key = (op_lamport, peer);
2352 if best.is_none_or(|(bk, _)| key > bk) {
2353 best = Some((key, ID::new(peer, op.counter)));
2354 }
2355 }
2356 }
2357 }
2358 }
2359
2360 best.map(|(_, op_id)| op_id)
2361}
2362
2363#[derive(Debug)]
2364pub struct CommitWhenDrop<'a> {
2365 doc: &'a LoroDoc,
2366 default_options: CommitOptions,
2367}
2368
2369impl Drop for CommitWhenDrop<'_> {
2370 fn drop(&mut self) {
2371 {
2372 let mut guard = self.doc.txn.lock();
2373 if let Some(txn) = guard.as_mut() {
2374 txn.set_default_options(std::mem::take(&mut self.default_options));
2375 };
2376 }
2377
2378 self.doc.commit_then_renew();
2379 }
2380}
2381
2382#[derive(Debug, Clone)]
2384pub struct CommitOptions {
2385 pub origin: Option<InternalString>,
2388
2389 pub immediate_renew: bool,
2392
2393 pub timestamp: Option<Timestamp>,
2396
2397 pub commit_msg: Option<Arc<str>>,
2399}
2400
2401impl CommitOptions {
2402 pub fn new() -> Self {
2404 Self {
2405 origin: None,
2406 immediate_renew: true,
2407 timestamp: None,
2408 commit_msg: None,
2409 }
2410 }
2411
2412 pub fn origin(mut self, origin: &str) -> Self {
2414 self.origin = Some(origin.into());
2415 self
2416 }
2417
2418 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
2420 self.immediate_renew = immediate_renew;
2421 self
2422 }
2423
2424 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
2428 self.timestamp = Some(timestamp);
2429 self
2430 }
2431
2432 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
2434 self.commit_msg = Some(commit_msg.into());
2435 self
2436 }
2437
2438 pub fn set_origin(&mut self, origin: Option<&str>) {
2440 self.origin = origin.map(|x| x.into())
2441 }
2442
2443 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
2445 self.timestamp = timestamp;
2446 }
2447}
2448
2449impl Default for CommitOptions {
2450 fn default() -> Self {
2451 Self::new()
2452 }
2453}
2454
2455#[cfg(test)]
2456mod test {
2457 use std::{
2458 panic::AssertUnwindSafe,
2459 sync::{
2460 atomic::{AtomicUsize, Ordering},
2461 Arc,
2462 },
2463 };
2464
2465 use crate::{
2466 cursor::PosType,
2467 encoding::json_schema::json::{JsonOpContent, JsonSchema, ListOp},
2468 encoding::{fast_snapshot::EMPTY_MARK, EncodeMode},
2469 loro::ExportMode,
2470 version::{Frontiers, VersionVector},
2471 LoroDoc, ToJson, TreeParentId,
2472 };
2473 use bytes::{BufMut, Bytes};
2474 use loro_common::ID;
2475 use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
2476
2477 const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
2478
2479 fn encode_import_blob(mode: EncodeMode, body: &[u8]) -> Vec<u8> {
2480 let mut ans = Vec::new();
2481 ans.extend_from_slice(b"loro");
2482 ans.extend_from_slice(&[0; 16]);
2483 ans.extend_from_slice(&mode.to_bytes());
2484 ans.extend_from_slice(body);
2485 let checksum = xxhash_rust::xxh32::xxh32(&ans[20..], XXH_SEED);
2486 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
2487 ans
2488 }
2489
2490 fn encode_fast_snapshot_import(oplog_bytes: &[u8]) -> Vec<u8> {
2491 let mut body = Vec::new();
2492 body.put_u32_le(oplog_bytes.len() as u32);
2493 body.extend_from_slice(oplog_bytes);
2494 body.put_u32_le(EMPTY_MARK.len() as u32);
2495 body.extend_from_slice(EMPTY_MARK);
2496 body.put_u32_le(0);
2497 encode_import_blob(EncodeMode::FastSnapshot, &body)
2498 }
2499
2500 fn sstable_with_huge_meta_block_count() -> Vec<u8> {
2501 let mut bytes = Vec::new();
2502 bytes.extend_from_slice(b"LORO");
2503 bytes.push(0);
2504 bytes.put_u32_le(10_000_000);
2505 bytes.put_u32_le(xxhash_rust::xxh32::xxh32(&[], XXH_SEED));
2506 bytes.put_u32_le(5);
2507 bytes
2508 }
2509
2510 fn snapshot_oplog_with_malformed_block() -> Vec<u8> {
2511 let peer = 1;
2512 let id = ID::new(peer, 0);
2513 let vv = VersionVector::from_iter([(peer, 1)]);
2514 let frontiers = Frontiers::from_id(id);
2515 let mut store = MemKvStore::new(MemKvConfig::default());
2516 store.set(b"vv", vv.encode().into());
2517 store.set(b"fr", frontiers.encode().into());
2518 store.set(&id.to_bytes(), Bytes::from_static(&[0]));
2519 store.export_all().to_vec()
2520 }
2521
2522 fn make_json_import_stress_doc(peer: u64) -> LoroDoc {
2523 let doc = LoroDoc::new_auto_commit();
2524 doc.set_peer_id(peer).unwrap();
2525
2526 let text = doc.get_text("text");
2527 let mut text_pos = 0;
2528 for i in 0..32 {
2529 let chunk = format!("segment-{i}-abcdefghijklmnopqrstuvwxyz;");
2530 text.insert_unicode(text_pos, &chunk).unwrap();
2531 text_pos += chunk.chars().count();
2532 }
2533
2534 let list = doc.get_list("list");
2535 for i in 0..32 {
2536 list.insert(i, format!("item-{i}")).unwrap();
2537 }
2538
2539 let map = doc.get_map("map");
2540 for i in 0..32 {
2541 let key = format!("key-{i}");
2542 map.insert(&key, format!("value-{i}")).unwrap();
2543 }
2544
2545 let tree = doc.get_tree("tree");
2546 let mut parent = TreeParentId::Root;
2547 for i in 0..16 {
2548 let node = tree.create(parent).unwrap();
2549 let meta = tree.get_meta(node).unwrap();
2550 meta.insert("name", format!("node-{i}")).unwrap();
2551 meta.insert("payload", format!("payload-{i}-{}", "x".repeat(16)))
2552 .unwrap();
2553 parent = TreeParentId::Node(node);
2554 }
2555
2556 doc
2557 }
2558
2559 fn make_json_list_update_with_four_ops(peer: u64) -> (LoroDoc, JsonSchema) {
2560 let doc = LoroDoc::new();
2561 doc.set_peer_id(peer).unwrap();
2562 let map = doc.get_map("map");
2563 let list = doc.get_list("list");
2564 let text = doc.get_text("text");
2565
2566 let mut txn = doc.txn().unwrap();
2567 map.insert_with_txn(&mut txn, "prefix", "map-value".into())
2568 .unwrap();
2569 list.insert_with_txn(&mut txn, 0, "seed".into()).unwrap();
2570 text.insert_with_txn(&mut txn, 0, "text-value", PosType::Unicode)
2571 .unwrap();
2572 list.insert_with_txn(&mut txn, 1, "tail".into()).unwrap();
2573 txn.commit().unwrap();
2574
2575 let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv(), false);
2576 assert_eq!(json.changes.len(), 1);
2577 assert_eq!(json.changes[0].ops.len(), 4);
2578 (doc, json)
2579 }
2580
2581 fn move_last_list_insert_far_out_of_bounds(json: &mut JsonSchema) {
2582 let last_change = json.changes.last_mut().unwrap();
2583 let last_op = last_change.ops.last_mut().unwrap();
2584 match &mut last_op.content {
2585 JsonOpContent::List(ListOp::Insert { pos, .. }) => {
2586 *pos = 1_000;
2587 }
2588 other => panic!("expected list insert op, got {other:?}"),
2589 }
2590 }
2591
#[test]
fn test_sync() {
    // Only type-checks when the argument's type is both Send and Sync.
    fn is_send_sync<T: Send + Sync>(_v: T) {}
    let doc = super::LoroDoc::new();
    is_send_sync(doc);
}
2598
#[test]
fn import_rejects_huge_sstable_meta_block_count_without_panic() {
    // A fast-snapshot blob whose sstable advertises a huge meta block count.
    let malformed = encode_fast_snapshot_import(&sstable_with_huge_meta_block_count());

    let outcome =
        std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&malformed)));
    match outcome {
        Ok(import_result) => assert!(import_result.is_err()),
        Err(_) => panic!("malformed import should not panic"),
    }
}
2607
#[test]
fn import_rejects_malformed_change_block_without_panic() {
    // A fast-snapshot blob whose oplog carries a malformed change block.
    let malformed = encode_fast_snapshot_import(&snapshot_oplog_with_malformed_block());

    let outcome =
        std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&malformed)));
    match outcome {
        Ok(import_result) => assert!(import_result.is_err()),
        Err(_) => panic!("malformed import should not panic"),
    }
}
2616
#[test]
fn failed_import_rolls_back_oplog_and_arena() {
    // Source: a single "hello" text insert from peer 1.
    let src = LoroDoc::new();
    src.set_peer_id(1).unwrap();
    let src_text = src.get_text("text");
    let mut txn = src.txn().unwrap();
    src_text
        .insert_with_txn(&mut txn, 0, "hello", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let update = src.export(ExportMode::all_updates()).unwrap();

    // Capture the empty destination before the import that is meant to fail.
    let dst = LoroDoc::new();
    let (vv_snapshot, value_snapshot) = (dst.oplog_vv(), dst.get_deep_value());

    // Importing with this origin is expected to hit the state-apply
    // failpoint, as the error-message assertion right after checks.
    let import_error = dst
        .import_with(&update, "__loro_fail_import_state_apply".into())
        .unwrap_err();
    assert!(import_error.to_string().contains("state apply failpoint"));
    assert_eq!(dst.oplog_vv(), vv_snapshot);
    assert_eq!(dst.get_deep_value(), value_snapshot);
    assert!(dst.oplog().lock().is_empty());

    // After the rollback, a plain import of the same bytes must succeed.
    dst.import(&update).unwrap();
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
2642
#[test]
fn failed_incremental_import_restores_previous_change_store_block() {
    // Build two sequential updates from peer 1: "a", then "b" appended.
    let src = LoroDoc::new();
    src.set_peer_id(1).unwrap();
    let text = src.get_text("text");
    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let first_update = src.export(ExportMode::all_updates()).unwrap();
    let first_vv = src.oplog_vv();

    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();

    // Destination already holds the first update, so the second one extends
    // existing history rather than starting from an empty doc.
    let dst = LoroDoc::new();
    dst.import(&first_update).unwrap();
    let vv_before_import = dst.oplog_vv();
    let frontiers_before_import = dst.oplog_frontiers();
    let state_before_import = dst.get_deep_value();

    // Confirm the failure really came from the injected failpoint. A bare
    // `unwrap_err()` also accepts unrelated errors (e.g. a decode error that
    // fails before touching anything), which would make the rollback
    // assertions below pass trivially.
    let err = dst
        .import_with(&second_update, "__loro_fail_import_state_apply".into())
        .unwrap_err();
    assert!(
        err.to_string().contains("state apply failpoint"),
        "expected the injected state-apply failure, got {err:?}"
    );
    assert_eq!(dst.oplog_vv(), vv_before_import);
    assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
    assert_eq!(dst.get_deep_value(), state_before_import);

    // Retrying without the failpoint must fully catch up with the source.
    dst.import(&second_update).unwrap();
    assert_eq!(dst.oplog_vv(), src.oplog_vv());
    assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
2673
#[test]
fn failed_import_json_updates_rolls_back_complex_empty_doc() {
    let source = make_json_import_stress_doc(11);
    let updates = source.export_json_updates(&Default::default(), &source.oplog_vv(), false);

    let target = LoroDoc::new();
    let (initial_vv, initial_frontiers, initial_state) = (
        target.oplog_vv(),
        target.oplog_frontiers(),
        target.get_deep_value(),
    );

    // Inject the state-apply failure several times in a row; every attempt
    // must leave the empty target exactly as it was.
    for _attempt in 0..3 {
        crate::state::fail_next_import_state_apply_for_test();
        let failure = target.import_json_updates(updates.clone()).unwrap_err();
        assert!(failure.to_string().contains("state apply failpoint"));
        assert_eq!(target.oplog_vv(), initial_vv);
        assert_eq!(target.oplog_frontiers(), initial_frontiers);
        assert_eq!(target.get_deep_value(), initial_state);
        assert!(target.oplog().lock().is_empty());
    }

    // Without the failpoint the same updates import and match the source.
    target.import_json_updates(updates).unwrap();
    assert_eq!(target.oplog_vv(), source.oplog_vv());
    assert_eq!(target.oplog_frontiers(), source.oplog_frontiers());
    assert_eq!(target.get_deep_value(), source.get_deep_value());
}
2698
#[test]
fn failed_incremental_import_json_updates_restores_previous_change_store_block() {
    let src = LoroDoc::new_auto_commit();
    src.set_peer_id(12).unwrap();

    // Phase 1: seed one entry in each of text, list, map and tree.
    let text = src.get_text("text");
    text.insert_unicode(0, "a").unwrap();
    let list = src.get_list("list");
    list.push("seed").unwrap();
    let map = src.get_map("map");
    map.insert("seed", "value").unwrap();
    let tree = src.get_tree("tree");
    let root = tree.create(TreeParentId::Root).unwrap();
    let root_meta = tree.get_meta(root).unwrap();
    root_meta.insert("name", "root").unwrap();

    let first_vv = src.oplog_vv();
    let first_json = src.export_json_updates(&Default::default(), &first_vv, false);

    // Phase 2: a much larger batch of edits on top of phase 1.
    let mut cursor = text.len_unicode();
    for chunk in (0..64).map(|i| format!("chunk-{i};")) {
        text.insert_unicode(cursor, &chunk).unwrap();
        cursor += chunk.chars().count();
    }
    for i in 0..32 {
        let entry = format!("after-{i}");
        list.push(entry.clone()).unwrap();
        map.insert(&entry, format!("value-{i}")).unwrap();
    }
    let child = tree.create(TreeParentId::Node(root)).unwrap();
    let child_meta = tree.get_meta(child).unwrap();
    child_meta.insert("name", "child").unwrap();

    let second_json = src.export_json_updates(&first_vv, &src.oplog_vv(), false);

    // Destination holds phase 1; phase 2 is then imported with the failpoint armed.
    let dst = LoroDoc::new();
    dst.import_json_updates(first_json).unwrap();
    let (vv_before_import, frontiers_before_import, state_before_import) = (
        dst.oplog_vv(),
        dst.oplog_frontiers(),
        dst.get_deep_value(),
    );

    for _attempt in 0..2 {
        crate::state::fail_next_import_state_apply_for_test();
        let failure = dst.import_json_updates(second_json.clone()).unwrap_err();
        assert!(failure.to_string().contains("state apply failpoint"));
        assert_eq!(dst.oplog_vv(), vv_before_import);
        assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
        assert_eq!(dst.get_deep_value(), state_before_import);
    }

    dst.import_json_updates(second_json).unwrap();
    assert_eq!(dst.oplog_vv(), src.oplog_vv());
    assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
2755
#[test]
fn malformed_later_import_json_update_rolls_back_after_valid_prefix_enters_oplog() {
    /// Checks that `err` mentions "list diff", i.e. it is the list-bounds
    /// validation failure this test provokes.
    fn assert_list_bounds_error(err: impl std::fmt::Debug + std::fmt::Display) {
        assert!(
            err.to_string().contains("list diff"),
            "expected state list bounds validation, got {err:?}"
        );
    }

    let peer = 13;
    let (src, good_json) = make_json_list_update_with_four_ops(peer);
    let mut bad_json = good_json.clone();
    move_last_list_insert_far_out_of_bounds(&mut bad_json);

    // Sanity check: the untouched update imports and reproduces the source.
    let good_dst = LoroDoc::new();
    good_dst.import_json_updates(good_json.clone()).unwrap();
    assert_eq!(good_dst.get_deep_value(), src.get_deep_value());

    // Split the four-op change at its last op: a three-op prefix and a
    // one-op suffix.
    let last_op_counter = good_json.changes[0].ops.last().unwrap().counter;
    let prefix_vv = VersionVector::from_iter([(peer, last_op_counter)]);
    let prefix_json = src.export_json_updates(&Default::default(), &prefix_vv, false);
    assert_eq!(
        prefix_json.changes[0].ops.len(),
        good_json.changes[0].ops.len() - 1
    );
    let suffix_json = src.export_json_updates(&prefix_vv, &src.oplog_vv(), false);
    assert_eq!(suffix_json.changes[0].ops.len(), 1);
    let mut corrupted_suffix = suffix_json.clone();
    move_last_list_insert_far_out_of_bounds(&mut corrupted_suffix);

    // Case 1: the prefix is already in the oplog, then the corrupted suffix arrives.
    let prefix_dst = LoroDoc::new();
    prefix_dst.import_json_updates(prefix_json).unwrap();
    let (prefix_vv_before, prefix_frontiers_before, prefix_state_before) = (
        prefix_dst.oplog_vv(),
        prefix_dst.oplog_frontiers(),
        prefix_dst.get_deep_value(),
    );

    let corrupted_suffix_text = serde_json::to_string(&corrupted_suffix).unwrap();
    assert_list_bounds_error(
        prefix_dst
            .import_json_updates(&corrupted_suffix_text)
            .unwrap_err(),
    );
    assert_eq!(prefix_dst.oplog_vv(), prefix_vv_before);
    assert_eq!(prefix_dst.oplog_frontiers(), prefix_frontiers_before);
    assert_eq!(prefix_dst.get_deep_value(), prefix_state_before);

    prefix_dst.import_json_updates(suffix_json).unwrap();
    assert_eq!(prefix_dst.get_deep_value(), src.get_deep_value());
    assert_eq!(prefix_dst.oplog_vv(), src.oplog_vv());

    // Case 2: the whole corrupted change is imported into an empty doc.
    let dst = LoroDoc::new();
    let (empty_vv, empty_frontiers, empty_state) = (
        dst.oplog_vv(),
        dst.oplog_frontiers(),
        dst.get_deep_value(),
    );
    let bad_json_text = serde_json::to_string(&bad_json).unwrap();
    assert_list_bounds_error(dst.import_json_updates(&bad_json_text).unwrap_err());
    assert_eq!(dst.oplog_vv(), empty_vv);
    assert_eq!(dst.oplog_frontiers(), empty_frontiers);
    assert_eq!(dst.get_deep_value(), empty_state);
    assert!(dst.oplog().lock().is_empty());
}
2816
#[test]
fn failed_import_restores_pending_changes_that_were_applied_during_import() {
    // Captures the three observables compared throughout this test.
    let observe = |doc: &LoroDoc| (doc.oplog_vv(), doc.oplog_frontiers(), doc.get_deep_value());

    let src = LoroDoc::new();
    src.set_peer_id(14).unwrap();
    let text = src.get_text("text");

    // Two sequential commits from peer 14: "a", then "b" appended after it.
    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let first_update = src.export(ExportMode::all_updates()).unwrap();
    let first_vv = src.oplog_vv();

    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();

    // Import the dependent update first: nothing applies, it stays pending.
    let dst = LoroDoc::new();
    let status = dst.import(&second_update).unwrap();
    assert!(status.success.is_empty());
    assert!(status.pending.is_some());
    let before_dependency = observe(&dst);

    // Supplying the dependency with the failpoint armed must roll back everything.
    crate::state::fail_next_import_state_apply_for_test();
    let failure = dst.import(&first_update).unwrap_err();
    assert!(failure.to_string().contains("state apply failpoint"));
    assert_eq!(observe(&dst), before_dependency);

    // A clean retry applies both the dependency and the pending change.
    dst.import(&first_update).unwrap();
    assert_eq!(observe(&dst), observe(&src));
}
2856
#[test]
fn failed_import_json_updates_does_not_emit_or_leave_events() {
    let (src, good_json) = make_json_list_update_with_four_ops(15);
    let mut bad_json = good_json.clone();
    move_last_list_insert_far_out_of_bounds(&mut bad_json);

    // Count every root event delivered to `dst`; `_sub` keeps the
    // subscription alive for the rest of the test.
    let dst = LoroDoc::new();
    let events_seen = Arc::new(AtomicUsize::new(0));
    let _sub = {
        let counter = Arc::clone(&events_seen);
        dst.subscribe_root(Arc::new(move |_| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
    };

    let bad_json_text = serde_json::to_string(&bad_json).unwrap();
    let failure = dst.import_json_updates(&bad_json_text).unwrap_err();
    assert!(
        failure.to_string().contains("list diff"),
        "expected state list bounds validation, got {failure:?}"
    );
    // The failed import must neither fire callbacks nor leave queued events.
    assert_eq!(events_seen.load(Ordering::SeqCst), 0);
    assert!(dst.drop_pending_events().is_empty());
    assert!(dst.oplog().lock().is_empty());

    dst.import_json_updates(good_json).unwrap();
    assert_eq!(events_seen.load(Ordering::SeqCst), 1);
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
2884
#[test]
fn test_checkout() {
    /// Compares the JSON form of `doc`'s deep value with `expected`.
    fn assert_deep_json(doc: &LoroDoc, expected: serde_json::Value) {
        assert_eq!(doc.get_deep_value().to_json_value(), expected);
    }

    let loro = LoroDoc::new();
    loro.set_peer_id(1).unwrap();
    let text = loro.get_text("text");
    let map = loro.get_map("map");
    let list = loro.get_list("list");

    // Ten rounds inside one transaction; each round overwrites the map key
    // and prepends the round number to the text and the list.
    let mut txn = loro.txn().unwrap();
    for i in 0..10 {
        map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
        text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
            .unwrap();
        list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
    }
    txn.commit().unwrap();

    // `b` is an independent replica restored from a full snapshot.
    let b = LoroDoc::new();
    let snapshot = loro.export(ExportMode::Snapshot).unwrap();
    b.import(&snapshot).unwrap();

    loro.checkout(&Frontiers::default()).unwrap();
    assert_deep_json(&loro, serde_json::json!({"text":"","list":[],"map":{}}));

    b.checkout(&ID::new(1, 2).into()).unwrap();
    assert_deep_json(&b, serde_json::json!({"text":"0","list":[0],"map":{"key":0}}));

    loro.checkout(&ID::new(1, 3).into()).unwrap();
    assert_deep_json(&loro, serde_json::json!({"text":"0","list":[0],"map":{"key":1}}));

    b.checkout(&ID::new(1, 29).into()).unwrap();
    assert_deep_json(
        &b,
        serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}),
    );
}
2939
#[test]
fn import_batch_err_181() {
    // Presumably a regression test for issue #181 (from the name) — TODO confirm.
    let a = LoroDoc::new_auto_commit();
    let snapshot = a.export(ExportMode::Snapshot).unwrap();

    let b = LoroDoc::new_auto_commit();
    b.import_batch(&[snapshot]).unwrap();
    b.get_text("text")
        .insert(0, "hello", PosType::Unicode)
        .unwrap();
    b.commit_then_renew();
    // Take the oplog lock and release it immediately before exporting.
    drop(b.oplog().lock());
    b.export(ExportMode::all_updates()).unwrap();
}
2954
#[test]
fn poisoned_mutex_keeps_follow_up_operations_failed() {
    let doc = LoroDoc::new();
    let oplog = doc.oplog.clone();

    // Panic while the oplog guard is held so the mutex ends up poisoned.
    let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
        let _guard = oplog.lock();
        panic!("poison oplog");
    }));

    // Any later access must keep panicking; inspect that panic's message.
    let payload = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
        .expect_err("poisoned lock should continue to fail fast");
    let msg = payload
        .downcast_ref::<&str>()
        .map(|s| (*s).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_default();
    assert!(msg.contains("poisoned LoroMutex"), "{msg}");
}
2975}