1use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
2pub use crate::encoding::ExportMode;
3use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
4pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
5use crate::sync::AtomicBool;
6pub(crate) use crate::LoroDocInner;
7use crate::{
8 arena::SharedArena,
9 change::Timestamp,
10 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
11 container::{
12 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
13 IntoContainerId,
14 },
15 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
16 dag::{Dag, DagUtils},
17 diff_calc::DiffCalculator,
18 encoding::{
19 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
20 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
21 export_state_only_snapshot,
22 json_schema::{encode_change_to_json, json::JsonSchema},
23 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
24 },
25 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
26 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
27 id::PeerID,
28 json::JsonChange,
29 op::InnerContent,
30 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
31 state::DocState,
32 subscription::{LocalUpdateCallback, Observer, Subscriber},
33 undo::DiffBatch,
34 utils::subscription::{SubscriberSetWithQueue, Subscription},
35 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
36 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
37 VersionVector,
38};
39use crate::{change::ChangeRef, lock::LockKind};
40use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
41use crate::{
42 lock::{LoroLockGroup, LoroMutex},
43 txn::Transaction,
44};
45use either::Either;
46use fxhash::{FxHashMap, FxHashSet};
47use loro_common::{
48 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
49 LoroValue, ID,
50};
51use rle::HasLength;
52use std::{
53 borrow::Cow,
54 cmp::Ordering,
55 collections::{hash_map::Entry, BinaryHeap},
56 ops::ControlFlow,
57 sync::{
58 atomic::Ordering::{Acquire, Release},
59 Arc,
60 },
61};
62use tracing::{debug_span, info, info_span, instrument, warn};
63
64impl Default for LoroDoc {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl std::fmt::Debug for LoroDocInner {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("LoroDoc")
73 .field("config", &self.config)
74 .field("auto_commit", &self.auto_commit)
75 .field("detached", &self.detached)
76 .finish()
77 }
78}
79
80impl LoroDoc {
    /// Create an empty document.
    ///
    /// The transaction, oplog, state and diff-calculator locks are all created
    /// from one [`LoroLockGroup`]. The document starts attached
    /// (`detached == false`) with auto commit disabled and no subscribers.
    pub fn new() -> Self {
        let oplog = OpLog::new();
        // The arena and configuration come from the oplog; clones of them are
        // handed to the state, the observer and the document itself.
        let arena = oplog.arena.clone();
        let config: Configure = oplog.configure.clone();
        let lock_group = LoroLockGroup::new();
        // No transaction exists yet, hence the `None` payload.
        let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
        // `new_cyclic` lets the state receive a weak handle (`w`) to the
        // document that is still being constructed.
        let inner = Arc::new_cyclic(|w| {
            let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
            LoroDocInner {
                oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
                state,
                config,
                detached: AtomicBool::new(false),
                auto_commit: AtomicBool::new(false),
                observer: Arc::new(Observer::new(arena.clone())),
                diff_calculator: Arc::new(
                    lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
                ),
                txn: global_txn,
                arena,
                local_update_subs: SubscriberSetWithQueue::new(),
                peer_id_change_subs: SubscriberSetWithQueue::new(),
                pre_commit_subs: SubscriberSetWithQueue::new(),
                first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
            }
        });
        LoroDoc { inner }
    }
109
110 pub fn fork(&self) -> Self {
111 if self.is_detached() {
112 return self.fork_at(&self.state_frontiers());
113 }
114
115 let (options, txn) = self.commit_then_stop();
116 drop(txn);
117 let snapshot = encoding::fast_snapshot::encode_snapshot_inner(self);
118 let doc = Self::new();
119 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc).unwrap();
120 doc.set_config(&self.config);
121 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
122 doc.start_auto_commit();
123 }
124 self.renew_txn_if_auto_commit(options);
125 doc
126 }
127 pub fn set_detached_editing(&self, enable: bool) {
143 self.config.set_detached_editing(enable);
144 if enable && self.is_detached() {
145 let (options, txn) = self.commit_then_stop();
146 drop(txn);
147 self.renew_peer_id();
148 self.renew_txn_if_auto_commit(options);
149 }
150 }
151
152 #[inline]
154 pub fn new_auto_commit() -> Self {
155 let doc = Self::new();
156 doc.start_auto_commit();
157 doc
158 }
159
    /// Change the peer id used by this document.
    ///
    /// Subscribers registered in `peer_id_change_subs` are notified with the
    /// value the oplog returns from `next_id(peer)`.
    ///
    /// # Errors
    ///
    /// - [`LoroError::InvalidPeerID`] when `peer` is `PeerID::MAX`.
    /// - [`LoroError::TransactionError`] when auto commit is off and the state
    ///   reports an ongoing transaction.
    #[inline(always)]
    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
        if peer == PeerID::MAX {
            return Err(LoroError::InvalidPeerID);
        }
        // Computed up front; the oplog lock is released at the end of this statement.
        let next_id = self.oplog.lock().unwrap().next_id(peer);
        if self.auto_commit.load(Acquire) {
            let doc_state = self.state.lock().unwrap();
            doc_state
                .peer
                .store(peer, std::sync::atomic::Ordering::Relaxed);

            if doc_state.is_in_txn() {
                // The state lock must be released before committing.
                drop(doc_state);
                self.commit_then_renew();
            }
            self.peer_id_change_subs.emit(&(), next_id);
            return Ok(());
        }

        // Without auto commit the peer id may not change in the middle of a
        // transaction.
        let doc_state = self.state.lock().unwrap();
        if doc_state.is_in_txn() {
            return Err(LoroError::TransactionError(
                "Cannot change peer id during transaction"
                    .to_string()
                    .into_boxed_str(),
            ));
        }

        doc_state
            .peer
            .store(peer, std::sync::atomic::Ordering::Relaxed);
        // Release the state lock before notifying subscribers.
        drop(doc_state);
        self.peer_id_change_subs.emit(&(), next_id);
        Ok(())
    }
196
197 pub(crate) fn renew_peer_id(&self) {
199 let peer_id = DefaultRandom.next_u64();
200 self.set_peer_id(peer_id).unwrap();
201 }
202
203 #[inline]
210 #[must_use]
211 pub fn commit_then_stop(&self) -> (Option<CommitOptions>, LoroMutexGuard<Option<Transaction>>) {
212 let (a, b) = self.commit_with(CommitOptions::new().immediate_renew(false));
213 (a, b.unwrap())
214 }
215
216 #[inline]
221 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
222 self.commit_with(CommitOptions::new().immediate_renew(true))
223 .0
224 }
225
    /// Hook that runs right before a commit.
    ///
    /// Returns `Some(guard)` when there is no active transaction, so the caller
    /// can return with the lock still held. Returns `None` when a transaction
    /// exists. In that case, if the transaction is flagged as the peer's first
    /// appearance, the flag is cleared and the `first_commit_from_peer_subs`
    /// subscribers are notified with the current peer id.
    fn before_commit(&self) -> Option<LoroMutexGuard<Option<Transaction>>> {
        let mut txn_guard = self.txn.lock().unwrap();
        let Some(txn) = txn_guard.as_mut() else {
            return Some(txn_guard);
        };

        if txn.is_peer_first_appearance {
            txn.is_peer_first_appearance = false;
            // Release the transaction lock before running subscriber callbacks.
            drop(txn_guard);
            self.first_commit_from_peer_subs.emit(
                &(),
                FirstCommitFromPeerPayload {
                    peer: self.peer_id(),
                },
            );
        }

        None
    }
250
    /// Commit the pending transaction using `config`.
    ///
    /// `origin`, `timestamp` and `commit_msg` set on `config` are applied to
    /// the transaction before it is committed. With `immediate_renew` a new
    /// transaction is installed right away and no guard is returned; without it
    /// the transaction slot is left empty and its lock guard is returned.
    ///
    /// Returns the options yielded by `Transaction::commit` (`None` when there
    /// was nothing to commit or auto commit is off) and, unless
    /// `immediate_renew` is set, the guard of the transaction lock.
    ///
    /// # Panics
    ///
    /// Panics if committing fails, or if `immediate_renew` is requested while
    /// the document cannot be edited or a new transaction cannot be created.
    #[instrument(skip_all)]
    pub fn commit_with(
        &self,
        config: CommitOptions,
    ) -> (
        Option<CommitOptions>,
        Option<LoroMutexGuard<Option<Transaction>>>,
    ) {
        if !self.auto_commit.load(Acquire) {
            // Auto commit is off: nothing is committed, only the lock is handed out.
            let txn_guard = self.txn.lock().unwrap();
            return (None, Some(txn_guard));
        }

        loop {
            if let Some(txn_guard) = self.before_commit() {
                // No active transaction.
                return (None, Some(txn_guard));
            }

            let mut txn_guard = self.txn.lock().unwrap();
            let txn = txn_guard.take();
            let Some(mut txn) = txn else {
                return (None, Some(txn_guard));
            };
            let on_commit = txn.take_on_commit();
            if let Some(origin) = config.origin.clone() {
                txn.set_origin(origin);
            }

            if let Some(timestamp) = config.timestamp {
                txn.set_timestamp(timestamp);
            }

            if let Some(msg) = config.commit_msg.as_ref() {
                txn.set_msg(Some(msg.clone()));
            }

            // Captured before `commit` and passed to the `on_commit` hook below.
            let id_span = txn.id_span();
            let options = txn.commit().unwrap();
            if config.immediate_renew {
                assert!(self.can_edit());
                let mut t = self.txn().unwrap();
                if let Some(options) = options.as_ref() {
                    t.set_options(options.clone());
                }
                *txn_guard = Some(t);
            }

            if let Some(on_commit) = on_commit {
                // The hook runs without the transaction lock held.
                drop(txn_guard);
                on_commit(&self.state, &self.oplog, id_span);
                txn_guard = self.txn.lock().unwrap();
                if !config.immediate_renew && txn_guard.is_some() {
                    // A transaction appeared while the lock was released;
                    // loop again so the slot is empty when we return.
                    continue;
                }
            }

            return (
                options,
                if !config.immediate_renew {
                    Some(txn_guard)
                } else {
                    None
                },
            );
        }
    }
325
326 pub fn set_next_commit_message(&self, message: &str) {
328 let mut binding = self.txn.lock().unwrap();
329 let Some(txn) = binding.as_mut() else {
330 return;
331 };
332
333 if message.is_empty() {
334 txn.set_msg(None)
335 } else {
336 txn.set_msg(Some(message.into()))
337 }
338 }
339
340 pub fn set_next_commit_origin(&self, origin: &str) {
342 let mut txn = self.txn.lock().unwrap();
343 if let Some(txn) = txn.as_mut() {
344 txn.set_origin(origin.into());
345 }
346 }
347
348 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
350 let mut txn = self.txn.lock().unwrap();
351 if let Some(txn) = txn.as_mut() {
352 txn.set_timestamp(timestamp);
353 }
354 }
355
356 pub fn set_next_commit_options(&self, options: CommitOptions) {
358 let mut txn = self.txn.lock().unwrap();
359 if let Some(txn) = txn.as_mut() {
360 txn.set_options(options);
361 }
362 }
363
364 pub fn clear_next_commit_options(&self) {
366 let mut txn = self.txn.lock().unwrap();
367 if let Some(txn) = txn.as_mut() {
368 txn.set_options(CommitOptions::new());
369 }
370 }
371
372 #[inline]
383 pub fn set_record_timestamp(&self, record: bool) {
384 self.config.set_record_timestamp(record);
385 }
386
387 #[inline]
392 pub fn set_change_merge_interval(&self, interval: i64) {
393 self.config.set_merge_interval(interval);
394 }
395
396 pub fn can_edit(&self) -> bool {
397 !self.is_detached() || self.config.detached_editing()
398 }
399
400 pub fn is_detached_editing_enabled(&self) -> bool {
401 self.config.detached_editing()
402 }
403
404 #[inline]
405 pub fn config_text_style(&self, text_style: StyleConfigMap) {
406 self.config.text_style_config.try_write().unwrap().map = text_style.map;
407 }
408
409 #[inline]
410 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
411 self.config
412 .text_style_config
413 .try_write()
414 .unwrap()
415 .default_style = text_style;
416 }
417 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
418 let doc = Self::new();
419 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
420 if mode.is_snapshot() {
421 decode_snapshot(&doc, mode, body)?;
422 Ok(doc)
423 } else {
424 Err(LoroError::DecodeError(
425 "Invalid encode mode".to_string().into(),
426 ))
427 }
428 }
429
430 #[inline(always)]
432 pub fn can_reset_with_snapshot(&self) -> bool {
433 let oplog = self.oplog.lock().unwrap();
434 if oplog.batch_importing {
435 return false;
436 }
437
438 if self.is_detached() {
439 return false;
440 }
441
442 oplog.is_empty() && self.state.lock().unwrap().can_import_snapshot()
443 }
444
445 #[inline(always)]
451 pub fn is_detached(&self) -> bool {
452 self.detached.load(Acquire)
453 }
454
455 pub(crate) fn set_detached(&self, detached: bool) {
456 self.detached.store(detached, Release);
457 }
458
459 #[inline(always)]
460 pub fn peer_id(&self) -> PeerID {
461 self.state
462 .lock()
463 .unwrap()
464 .peer
465 .load(std::sync::atomic::Ordering::Relaxed)
466 }
467
468 #[inline(always)]
469 pub fn detach(&self) {
470 let (options, txn) = self.commit_then_stop();
471 drop(txn);
472 self.set_detached(true);
473 self.renew_txn_if_auto_commit(options);
474 }
475
476 #[inline(always)]
477 pub fn attach(&self) {
478 self.checkout_to_latest()
479 }
480
481 pub fn state_timestamp(&self) -> Timestamp {
484 let f = &self.state.lock().unwrap().frontiers;
485 self.oplog.lock().unwrap().get_timestamp_of_version(f)
486 }
487
488 #[inline(always)]
489 pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
490 &self.state
491 }
492
493 #[inline]
494 pub fn get_state_deep_value(&self) -> LoroValue {
495 self.state.lock().unwrap().get_deep_value()
496 }
497
498 #[inline(always)]
499 pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
500 &self.oplog
501 }
502
503 pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
504 let (options, txn) = self.commit_then_stop();
505 let ans = self.oplog.lock().unwrap().export_from(vv);
506 drop(txn);
507 self.renew_txn_if_auto_commit(options);
508 ans
509 }
510
511 #[inline(always)]
512 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
513 let s = debug_span!("import", peer = self.peer_id());
514 let _e = s.enter();
515 self.import_with(bytes, Default::default())
516 }
517
518 #[inline]
519 pub fn import_with(
520 &self,
521 bytes: &[u8],
522 origin: InternalString,
523 ) -> Result<ImportStatus, LoroError> {
524 let (options, txn) = self.commit_then_stop();
525 assert!(txn.is_none());
526 let ans = self._import_with(bytes, origin);
527 drop(txn);
528 self.renew_txn_if_auto_commit(options);
529 ans
530 }
531
    /// Decode `bytes` and apply them according to the encode mode in the header.
    ///
    /// Snapshot modes initialise the document directly when
    /// [`Self::can_reset_with_snapshot`] allows it; in every other case the
    /// blob is decoded into the oplog and the resulting diff is applied to the
    /// state if the document is attached. Pending events are emitted before
    /// returning.
    ///
    /// # Errors
    ///
    /// Returns header parse errors, decode errors, or
    /// [`LoroError::ImportWhenInTxn`] for `OutdatedRle` data while the state is
    /// inside a transaction. Those early returns skip the event emission.
    #[tracing::instrument(skip_all)]
    fn _import_with(
        &self,
        bytes: &[u8],
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        ensure_cov::notify_cov("loro_internal::import");
        let parsed = parse_header_and_body(bytes, true)?;
        info!("Importing with mode={:?}", &parsed.mode);
        let result = match parsed.mode {
            EncodeMode::OutdatedRle => {
                if self.state.lock().unwrap().is_in_txn() {
                    return Err(LoroError::ImportWhenInTxn);
                }

                let s = tracing::span!(
                    tracing::Level::INFO,
                    "Import updates ",
                    peer = self.peer_id()
                );
                let _e = s.enter();
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
            EncodeMode::OutdatedSnapshot => {
                if self.can_reset_with_snapshot() {
                    tracing::info!("Init by snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body)
                } else {
                    // The document already has content (or is detached / batch
                    // importing): treat the snapshot like a set of updates.
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )
                }
            }
            EncodeMode::FastSnapshot => {
                if self.can_reset_with_snapshot() {
                    ensure_cov::notify_cov("loro_internal::import::snapshot");
                    tracing::info!("Init by fast snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body)
                } else {
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )
                }
            }
            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
                |oplog| oplog.decode(parsed),
                origin,
            ),
            EncodeMode::Auto => {
                // NOTE(review): presumably a parsed header never carries
                // `Auto` — confirm against `parse_header_and_body`.
                unreachable!()
            }
        };

        self.emit_events();

        result
    }
599
    /// Run `f` on the locked oplog and, if the document is attached, bring the
    /// state up to date with whatever `f` added.
    ///
    /// When attached and the oplog's version vector changed, the diff between
    /// the old and the new oplog version is computed with a fresh
    /// `DiffCalculator` and applied to the state as an `Import` event carrying
    /// `origin`. This happens regardless of whether `f` returned `Ok` or `Err`.
    /// When detached, only the oplog is updated.
    ///
    /// Returns the result of `f` unchanged.
    #[tracing::instrument(skip_all)]
    pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
        &self,
        f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        let mut oplog = self.oplog.lock().unwrap();
        if !self.is_detached() {
            // Remember the version before `f` runs so the delta can be computed.
            let old_vv = oplog.vv().clone();
            let old_frontiers = oplog.frontiers().clone();
            let result = f(&mut oplog);
            if &old_vv != oplog.vv() {
                let mut diff = DiffCalculator::new(false);
                let (diff, diff_mode) = diff.calc_diff_internal(
                    &oplog,
                    &old_vv,
                    &old_frontiers,
                    oplog.vv(),
                    oplog.dag.get_frontiers(),
                    None,
                );
                // The state lock is taken while the oplog lock is still held.
                let mut state = self.state.lock().unwrap();
                state.apply_diff(
                    InternalDocDiff {
                        origin,
                        diff: (diff).into(),
                        by: EventTriggerKind::Import,
                        new_version: Cow::Owned(oplog.frontiers().clone()),
                    },
                    diff_mode,
                );
            }
            result
        } else {
            f(&mut oplog)
        }
    }
637
638 fn emit_events(&self) {
639 let events = {
641 let mut state = self.state.lock().unwrap();
642 state.take_events()
643 };
644 for event in events {
645 self.observer.emit(event);
646 }
647 }
648
649 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
650 let mut state = self.state.lock().unwrap();
651 state.take_events()
652 }
653
654 #[instrument(skip_all)]
655 pub fn export_snapshot(&self) -> Result<Vec<u8>, LoroEncodeError> {
656 if self.is_shallow() {
657 return Err(LoroEncodeError::ShallowSnapshotIncompatibleWithOldFormat);
658 }
659 let (options, txn) = self.commit_then_stop();
660 drop(txn);
661 let ans = export_snapshot(self);
662 self.renew_txn_if_auto_commit(options);
663 Ok(ans)
664 }
665
666 #[tracing::instrument(skip_all)]
670 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
671 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
672 let (options, txn) = self.commit_then_stop();
673 let result = self.update_oplog_and_apply_delta_to_state_if_needed(
674 |oplog| crate::encoding::json_schema::import_json(oplog, json),
675 Default::default(),
676 );
677 self.emit_events();
678 drop(txn);
679 self.renew_txn_if_auto_commit(options);
680 result
681 }
682
    /// Export the changes between `start_vv` and `end_vv` in the JSON schema
    /// format.
    ///
    /// The pending transaction is committed first and renewed afterwards if
    /// auto commit is active. If the oplog has a non-empty shallow-since
    /// version vector and `start_vv` is behind it for some peer, the start
    /// version is raised to include the shallow-since entries.
    ///
    /// `with_peer_compression` is passed through to the encoder unchanged.
    pub fn export_json_updates(
        &self,
        start_vv: &VersionVector,
        end_vv: &VersionVector,
        with_peer_compression: bool,
    ) -> JsonSchema {
        let (options, txn) = self.commit_then_stop();
        drop(txn);
        let oplog = self.oplog.lock().unwrap();
        let mut start_vv = start_vv;
        // Declared here so that an adjusted version vector, if one is needed,
        // outlives the `start_vv` reference that points into it.
        let _temp: Option<VersionVector>;
        if !oplog.dag.shallow_since_vv().is_empty() {
            // Does `start_vv` already cover every shallow-since entry?
            let mut include_all = true;
            for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
                if start_vv.get(peer).unwrap_or(&0) < counter {
                    include_all = false;
                    break;
                }
            }
            if !include_all {
                let mut vv = start_vv.clone();
                for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
                    vv.extend_to_include_end_id(ID::new(peer, counter));
                }
                _temp = Some(vv);
                start_vv = _temp.as_ref().unwrap();
            }
        }

        let json = crate::encoding::json_schema::export_json(
            &oplog,
            start_vv,
            end_vv,
            with_peer_compression,
        );
        // Release the oplog before the transaction is renewed.
        drop(oplog);
        self.renew_txn_if_auto_commit(options);
        json
    }
723
724 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
725 let oplog = self.oplog.lock().unwrap();
726 let mut changes = export_json_in_id_span(&oplog, id_span);
727 if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
728 let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
729 changes.push(change_json);
730 }
731 changes
732 }
733
734 #[inline]
736 pub fn oplog_vv(&self) -> VersionVector {
737 self.oplog.lock().unwrap().vv().clone()
738 }
739
740 #[inline]
742 pub fn state_vv(&self) -> VersionVector {
743 let oplog = self.oplog.lock().unwrap();
744 let f = &self.state.lock().unwrap().frontiers;
745 oplog.dag.frontiers_to_vv(f).unwrap()
746 }
747
748 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
749 let value: LoroValue = self.state.lock().unwrap().get_value_by_path(path)?;
750 if let LoroValue::Container(c) = value {
751 Some(ValueOrHandler::Handler(Handler::new_attached(
752 c.clone(),
753 self.clone(),
754 )))
755 } else {
756 Some(ValueOrHandler::Value(value))
757 }
758 }
759
760 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
762 let path = str_to_path(path)?;
763 self.get_by_path(&path)
764 }
765
    /// Encode the local ops of the pending transaction as a JSON schema.
    ///
    /// Returns `None` when there is no active transaction or when it has no
    /// local ops yet.
    pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
        let arena = &self.arena;
        let txn = self.txn.lock().unwrap();
        let txn = txn.as_ref()?;
        let ops_ = txn.local_ops();
        // The change id is the transaction's peer plus the counter of its first op.
        let new_id = ID {
            peer: *txn.peer(),
            counter: ops_.first()?.counter,
        };
        let change = ChangeRef {
            id: &new_id,
            deps: txn.frontiers(),
            // Use the transaction's timestamp if it has one; otherwise ask the
            // oplog for the timestamp of the next transaction.
            timestamp: &txn
                .timestamp()
                .as_ref()
                .copied()
                .unwrap_or_else(|| self.oplog.lock().unwrap().get_timestamp_for_next_txn()),
            commit_msg: txn.msg(),
            ops: ops_,
            lamport: txn.lamport(),
        };
        let json = encode_change_to_json(change, arena);
        Some(json)
    }
790
791 #[inline]
792 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
793 if self.has_container(&id) {
794 Some(Handler::new_attached(id, self.clone()))
795 } else {
796 None
797 }
798 }
799
800 #[inline]
803 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
804 let id = id.into_container_id(&self.arena, ContainerType::Text);
805 assert!(self.has_container(&id));
806 Handler::new_attached(id, self.clone()).into_text().unwrap()
807 }
808
809 #[inline]
812 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
813 let id = id.into_container_id(&self.arena, ContainerType::List);
814 assert!(self.has_container(&id));
815 Handler::new_attached(id, self.clone()).into_list().unwrap()
816 }
817
818 #[inline]
821 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
822 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
823 assert!(self.has_container(&id));
824 Handler::new_attached(id, self.clone())
825 .into_movable_list()
826 .unwrap()
827 }
828
829 #[inline]
832 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
833 let id = id.into_container_id(&self.arena, ContainerType::Map);
834 assert!(self.has_container(&id));
835 Handler::new_attached(id, self.clone()).into_map().unwrap()
836 }
837
838 #[inline]
841 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
842 let id = id.into_container_id(&self.arena, ContainerType::Tree);
843 assert!(self.has_container(&id));
844 Handler::new_attached(id, self.clone()).into_tree().unwrap()
845 }
846
847 #[cfg(feature = "counter")]
848 pub fn get_counter<I: IntoContainerId>(
849 &self,
850 id: I,
851 ) -> crate::handler::counter::CounterHandler {
852 let id = id.into_container_id(&self.arena, ContainerType::Counter);
853 assert!(self.has_container(&id));
854 Handler::new_attached(id, self.clone())
855 .into_counter()
856 .unwrap()
857 }
858
859 #[must_use]
860 pub fn has_container(&self, id: &ContainerID) -> bool {
861 if id.is_root() {
862 return true;
863 }
864
865 let exist = self.state.lock().unwrap().does_container_exist(id);
866 exist
867 }
868
869 #[instrument(level = "info", skip_all)]
883 pub fn undo_internal(
884 &self,
885 id_span: IdSpan,
886 container_remap: &mut FxHashMap<ContainerID, ContainerID>,
887 post_transform_base: Option<&DiffBatch>,
888 before_diff: &mut dyn FnMut(&DiffBatch),
889 ) -> LoroResult<CommitWhenDrop> {
890 if !self.can_edit() {
891 return Err(LoroError::EditWhenDetached);
892 }
893
894 let (options, txn) = self.commit_then_stop();
895 if !self
896 .oplog()
897 .lock()
898 .unwrap()
899 .vv()
900 .includes_id(id_span.id_last())
901 {
902 self.renew_txn_if_auto_commit(options);
903 return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
904 }
905
906 let (was_recording, latest_frontiers) = {
907 let mut state = self.state.lock().unwrap();
908 let was_recording = state.is_recording();
909 state.stop_and_clear_recording();
910 (was_recording, state.frontiers.clone())
911 };
912
913 let spans = self.oplog.lock().unwrap().split_span_based_on_deps(id_span);
914 let diff = crate::undo::undo(
915 spans,
916 match post_transform_base {
917 Some(d) => Either::Right(d),
918 None => Either::Left(&latest_frontiers),
919 },
920 |from, to| {
921 self._checkout_without_emitting(from, false, false).unwrap();
922 self.state.lock().unwrap().start_recording();
923 self._checkout_without_emitting(to, false, false).unwrap();
924 let mut state = self.state.lock().unwrap();
925 let e = state.take_events();
926 state.stop_and_clear_recording();
927 DiffBatch::new(e)
928 },
929 before_diff,
930 );
931
932 self._checkout_without_emitting(&latest_frontiers, false, false)?;
936 self.set_detached(false);
937 if was_recording {
938 self.state.lock().unwrap().start_recording();
939 }
940 drop(txn);
941 self.start_auto_commit();
942 if let Err(e) = self._apply_diff(diff, container_remap, true) {
946 warn!("Undo Failed {:?}", e);
947 }
948
949 if let Some(options) = options {
950 self.set_next_commit_options(options);
951 }
952 Ok(CommitWhenDrop {
953 doc: self,
954 default_options: CommitOptions::new().origin("undo"),
955 })
956 }
957
958 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
964 let f = self.state_frontiers();
967 let diff = self.diff(&f, target)?;
968 self._apply_diff(diff, &mut Default::default(), false)
969 }
970
    /// Compute the diff that transforms version `a` into version `b`.
    ///
    /// The state is checked out to `a`, event recording is started, the state
    /// is checked out to `b`, and the recorded events become the diff. The
    /// state is then restored to its previous frontiers. If the document was
    /// attached before the call it is marked attached again and the
    /// transaction is renewed if auto commit is active.
    ///
    /// # Errors
    ///
    /// Returns [`LoroError::FrontiersNotFound`] when an id of `a` or `b` is not
    /// in the oplog's DAG.
    ///
    /// # Panics
    ///
    /// Panics if any of the three checkouts fails.
    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
        {
            // Validate both versions before touching the transaction.
            let oplog = self.oplog.lock().unwrap();
            for id in a.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }
            for id in b.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }
        }

        let (options, txn) = self.commit_then_stop();
        let was_detached = self.is_detached();
        let old_frontiers = self.state_frontiers();
        // Pause any ongoing recording so only the a -> b events are captured.
        let was_recording = {
            let mut state = self.state.lock().unwrap();
            let is_recording = state.is_recording();
            state.stop_and_clear_recording();
            is_recording
        };
        self._checkout_without_emitting(a, true, false).unwrap();
        self.state.lock().unwrap().start_recording();
        self._checkout_without_emitting(b, true, false).unwrap();
        let e = {
            let mut state = self.state.lock().unwrap();
            let e = state.take_events();
            state.stop_and_clear_recording();
            e
        };
        // Put the state back where it was.
        self._checkout_without_emitting(&old_frontiers, false, false)
            .unwrap();
        drop(txn);
        if !was_detached {
            // The checkouts above may have flagged the document as detached.
            self.set_detached(false);
            self.renew_txn_if_auto_commit(options);
        }
        if was_recording {
            self.state.lock().unwrap().start_recording();
        }
        Ok(DiffBatch::new(e))
    }
1021
1022 #[inline(always)]
1024 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
1025 self._apply_diff(diff, &mut Default::default(), true)
1026 }
1027
    /// Apply every container diff in `diff` through the container's handler.
    ///
    /// * `container_remap` - maps container ids to replacement ids; chains are
    ///   followed until an id has no further mapping. It is also passed to each
    ///   handler.
    /// * `skip_unreachable` - when set, containers that were not remapped and
    ///   that the state reports as unreachable are skipped.
    ///
    /// # Errors
    ///
    /// - [`LoroError::EditWhenDetached`] when the document cannot be edited.
    /// - [`LoroError::ContainersNotFound`] immediately when no handler can be
    ///   obtained for a container, or after all other diffs were applied when
    ///   normal containers were missing from the arena.
    /// - Otherwise the last error returned by a handler, if any; handler
    ///   errors do not stop the remaining diffs from being applied.
    pub(crate) fn _apply_diff(
        &self,
        diff: DiffBatch,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        skip_unreachable: bool,
    ) -> LoroResult<()> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        let mut ans: LoroResult<()> = Ok(());
        let mut missing_containers: Vec<ContainerID> = Vec::new();
        for (mut id, diff) in diff.into_iter() {
            // Resolve the final id by following the remap chain.
            let mut remapped = false;
            while let Some(rid) = container_remap.get(&id) {
                remapped = true;
                id = rid.clone();
            }

            // Collect unknown normal containers and report them all at the end.
            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
                missing_containers.push(id);
                continue;
            }

            if skip_unreachable && !remapped && !self.state.lock().unwrap().get_reachable(&id) {
                continue;
            }

            let Some(h) = self.get_handler(id.clone()) else {
                return Err(LoroError::ContainersNotFound {
                    containers: Box::new(vec![id]),
                });
            };
            if let Err(e) = h.apply_diff(diff, container_remap) {
                // Remember the error but keep applying the remaining diffs.
                ans = Err(e);
            }
        }

        if !missing_containers.is_empty() {
            return Err(LoroError::ContainersNotFound {
                containers: Box::new(missing_containers),
            });
        }

        ans
    }
1085
1086 #[inline]
1088 pub fn diagnose_size(&self) {
1089 self.oplog().lock().unwrap().diagnose_size();
1090 }
1091
1092 #[inline]
1093 pub fn oplog_frontiers(&self) -> Frontiers {
1094 self.oplog().lock().unwrap().frontiers().clone()
1095 }
1096
1097 #[inline]
1098 pub fn state_frontiers(&self) -> Frontiers {
1099 self.state.lock().unwrap().frontiers.clone()
1100 }
1101
1102 #[inline]
1106 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1107 self.oplog().lock().unwrap().cmp_with_frontiers(other)
1108 }
1109
1110 #[inline]
1114 pub fn cmp_frontiers(
1115 &self,
1116 a: &Frontiers,
1117 b: &Frontiers,
1118 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1119 self.oplog().lock().unwrap().cmp_frontiers(a, b)
1120 }
1121
1122 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1123 let mut state = self.state.lock().unwrap();
1124 if !state.is_recording() {
1125 state.start_recording();
1126 }
1127
1128 self.observer.subscribe_root(callback)
1129 }
1130
1131 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1132 let mut state = self.state.lock().unwrap();
1133 if !state.is_recording() {
1134 state.start_recording();
1135 }
1136
1137 self.observer.subscribe(container_id, callback)
1138 }
1139
1140 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1141 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1142 activate();
1143 sub
1144 }
1145
1146 #[tracing::instrument(skip_all)]
1148 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1149 if bytes.is_empty() {
1150 return Ok(ImportStatus::default());
1151 }
1152
1153 if bytes.len() == 1 {
1154 return self.import(&bytes[0]);
1155 }
1156
1157 let mut success = VersionRange::default();
1158 let mut pending = VersionRange::default();
1159 let mut meta_arr = bytes
1160 .iter()
1161 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1162 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1163 meta_arr.sort_by(|a, b| {
1164 a.0.mode
1165 .cmp(&b.0.mode)
1166 .then(b.0.change_num.cmp(&a.0.change_num))
1167 });
1168
1169 let (options, txn) = self.commit_then_stop();
1170 drop(txn);
1171 let is_detached = self.is_detached();
1172 self.detach();
1173 self.oplog.lock().unwrap().batch_importing = true;
1174 let mut err = None;
1175 for (_meta, data) in meta_arr {
1176 match self.import(data) {
1177 Ok(s) => {
1178 for (peer, (start, end)) in s.success.iter() {
1179 match success.0.entry(*peer) {
1180 Entry::Occupied(mut e) => {
1181 e.get_mut().1 = *end.max(&e.get().1);
1182 }
1183 Entry::Vacant(e) => {
1184 e.insert((*start, *end));
1185 }
1186 }
1187 }
1188
1189 if let Some(p) = s.pending.as_ref() {
1190 for (&peer, &(start, end)) in p.iter() {
1191 match pending.0.entry(peer) {
1192 Entry::Occupied(mut e) => {
1193 e.get_mut().0 = start.min(e.get().0);
1194 e.get_mut().1 = end.min(e.get().1);
1195 }
1196 Entry::Vacant(e) => {
1197 e.insert((start, end));
1198 }
1199 }
1200 }
1201 }
1202 }
1203 Err(e) => {
1204 err = Some(e);
1205 }
1206 }
1207 }
1208
1209 let mut oplog = self.oplog.lock().unwrap();
1210 oplog.batch_importing = false;
1211 drop(oplog);
1212
1213 if !is_detached {
1214 self.checkout_to_latest();
1215 }
1216
1217 self.renew_txn_if_auto_commit(options);
1218 if let Some(err) = err {
1219 return Err(err);
1220 }
1221
1222 Ok(ImportStatus {
1223 success,
1224 pending: if pending.is_empty() {
1225 None
1226 } else {
1227 Some(pending)
1228 },
1229 })
1230 }
1231
1232 #[inline]
1234 pub fn get_value(&self) -> LoroValue {
1235 self.state.lock().unwrap().get_value()
1236 }
1237
1238 #[inline]
1240 pub fn get_deep_value(&self) -> LoroValue {
1241 self.state.lock().unwrap().get_deep_value()
1242 }
1243
1244 #[inline]
1246 pub fn get_deep_value_with_id(&self) -> LoroValue {
1247 self.state.lock().unwrap().get_deep_value_with_id()
1248 }
1249
1250 pub fn checkout_to_latest(&self) {
1251 let (options, _guard) = self.commit_then_stop();
1252 if !self.is_detached() {
1253 drop(_guard);
1254 self.renew_txn_if_auto_commit(options);
1255 return;
1256 }
1257
1258 self._checkout_to_latest_without_commit(true);
1259 drop(_guard);
1260 self.renew_txn_if_auto_commit(options);
1261 }
1262
1263 pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
1265 tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
1266 let f = self.oplog_frontiers();
1267 let this = &self;
1268 let frontiers = &f;
1269 this._checkout_without_emitting(frontiers, false, to_commit_then_renew)
1270 .unwrap(); this.emit_events();
1273 if this.config.detached_editing() {
1274 this.renew_peer_id();
1275 }
1276
1277 self.set_detached(false);
1278 });
1279 }
1280
1281 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1286 let (options, guard) = self.commit_then_stop();
1287 self._checkout_without_emitting(frontiers, true, true)?;
1288 self.emit_events();
1289 drop(guard);
1290 if self.config.detached_editing() {
1291 self.renew_peer_id();
1292 self.renew_txn_if_auto_commit(options);
1293 }
1294
1295 Ok(())
1296 }
1297
1298 #[instrument(level = "info", skip(self))]
1300 pub(crate) fn _checkout_without_emitting(
1301 &self,
1302 frontiers: &Frontiers,
1303 to_shrink_frontiers: bool,
1304 to_commit_then_renew: bool,
1305 ) -> Result<(), LoroError> {
1306 assert!(self.txn.is_locked());
1307 let from_frontiers = self.state_frontiers();
1308 info!(
1309 "checkout from={:?} to={:?} cur_vv={:?}",
1310 from_frontiers,
1311 frontiers,
1312 self.oplog_vv()
1313 );
1314
1315 if &from_frontiers == frontiers {
1316 return Ok(());
1317 }
1318
1319 let oplog = self.oplog.lock().unwrap();
1320 if oplog.dag.is_before_shallow_root(frontiers) {
1321 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1322 }
1323
1324 let frontiers = if to_shrink_frontiers {
1325 shrink_frontiers(frontiers, &oplog.dag)
1326 .map_err(|_| LoroError::SwitchToVersionBeforeShallowRoot)?
1327 } else {
1328 frontiers.clone()
1329 };
1330 if from_frontiers == frontiers {
1331 return Ok(());
1332 }
1333
1334 let mut state = self.state.lock().unwrap();
1335 let mut calc = self.diff_calculator.lock().unwrap();
1336 for i in frontiers.iter() {
1337 if !oplog.dag.contains(i) {
1338 return Err(LoroError::FrontiersNotFound(i));
1339 }
1340 }
1341
1342 let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1343 let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1344 return Err(LoroError::NotFoundError(
1345 format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1346 ));
1347 };
1348
1349 self.set_detached(true);
1350 let (diff, diff_mode) =
1351 calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1352 state.apply_diff(
1353 InternalDocDiff {
1354 origin: "checkout".into(),
1355 diff: Cow::Owned(diff),
1356 by: EventTriggerKind::Checkout,
1357 new_version: Cow::Owned(frontiers.clone()),
1358 },
1359 diff_mode,
1360 );
1361
1362 Ok(())
1363 }
1364
1365 #[inline]
1366 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1367 self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
1368 }
1369
1370 #[inline]
1371 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1372 self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
1373 }
1374
1375 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1379 self.import(&other.export_from(&self.oplog_vv()))
1380 }
1381
1382 pub(crate) fn arena(&self) -> &SharedArena {
1383 &self.arena
1384 }
1385
1386 #[inline]
1387 pub fn len_ops(&self) -> usize {
1388 let oplog = self.oplog.lock().unwrap();
1389 let ans = oplog.vv().iter().map(|(_, ops)| *ops).sum::<i32>() as usize;
1390 if oplog.is_shallow() {
1391 let sub = oplog
1392 .shallow_since_vv()
1393 .iter()
1394 .map(|(_, ops)| *ops)
1395 .sum::<i32>() as usize;
1396 ans - sub
1397 } else {
1398 ans
1399 }
1400 }
1401
1402 #[inline]
1403 pub fn len_changes(&self) -> usize {
1404 let oplog = self.oplog.lock().unwrap();
1405 oplog.len_changes()
1406 }
1407
1408 pub fn config(&self) -> &Configure {
1409 &self.config
1410 }
1411
1412 pub fn check_state_diff_calc_consistency_slow(&self) {
1417 {
1419 static IS_CHECKING: std::sync::atomic::AtomicBool =
1420 std::sync::atomic::AtomicBool::new(false);
1421 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1422 return;
1423 }
1424
1425 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1426 let peer_id = self.peer_id();
1427 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1428 let _g = s.enter();
1429 let options = self.commit_then_stop().0;
1430 self.oplog.lock().unwrap().check_dag_correctness();
1431 if self.is_shallow() {
1432 let initial_snapshot = self
1443 .export(ExportMode::state_only(Some(
1444 &self.shallow_since_frontiers(),
1445 )))
1446 .unwrap();
1447
1448 let doc = LoroDoc::new();
1450 doc.import(&initial_snapshot).unwrap();
1451 self.checkout(&self.shallow_since_frontiers()).unwrap();
1452 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1453
1454 let updates = self.export(ExportMode::all_updates()).unwrap();
1456
1457 doc.import(&updates).unwrap();
1459 self.checkout_to_latest();
1460
1461 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1464 let mut calculated_state = doc.app_state().lock().unwrap();
1465 let mut current_state = self.app_state().lock().unwrap();
1466 current_state.check_is_the_same(&mut calculated_state);
1467 } else {
1468 let f = self.state_frontiers();
1469 let vv = self
1470 .oplog()
1471 .lock()
1472 .unwrap()
1473 .dag
1474 .frontiers_to_vv(&f)
1475 .unwrap();
1476 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1477 let doc = Self::new();
1478 doc.import(&bytes).unwrap();
1479 let mut calculated_state = doc.app_state().lock().unwrap();
1480 let mut current_state = self.app_state().lock().unwrap();
1481 current_state.check_is_the_same(&mut calculated_state);
1482 }
1483
1484 self.renew_txn_if_auto_commit(options);
1485 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1486 }
1487 }
1488
1489 #[inline]
1490 pub fn log_estimated_size(&self) {
1491 let state = self.state.lock().unwrap();
1492 state.log_estimated_size();
1493 }
1494
1495 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1496 self.query_pos_internal(pos, true)
1497 }
1498
1499 pub(crate) fn query_pos_internal(
1501 &self,
1502 pos: &Cursor,
1503 ret_event_index: bool,
1504 ) -> Result<PosQueryResult, CannotFindRelativePosition> {
1505 let mut state = self.state.lock().unwrap();
1506 if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
1507 Ok(PosQueryResult {
1508 update: None,
1509 current: AbsolutePosition {
1510 pos: ans,
1511 side: pos.side,
1512 },
1513 })
1514 } else {
1515 drop(state);
1527 self.commit_then_renew();
1528 let oplog = self.oplog().lock().unwrap();
1529 if let Some(id) = pos.id {
1531 let idx = oplog
1532 .arena
1533 .id_to_idx(&pos.container)
1534 .ok_or(CannotFindRelativePosition::ContainerDeleted)?;
1535 let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
1537 if oplog.shallow_since_vv().includes_id(id) {
1538 return Err(CannotFindRelativePosition::HistoryCleared);
1539 }
1540
1541 tracing::error!("Cannot find id {}", id);
1542 return Err(CannotFindRelativePosition::IdNotFound);
1543 };
1544 let mut diff_calc = DiffCalculator::new(true);
1546 let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
1547 let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
1548 diff_calc.calc_diff_internal(
1550 &oplog,
1551 before,
1552 &before_frontiers,
1553 oplog.vv(),
1554 oplog.frontiers(),
1555 Some(&|target| idx == target),
1556 );
1557 let depth = self.arena.get_depth(idx);
1559 let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
1560 match diff_calc {
1561 crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
1562 let c = text.get_id_latest_pos(id).unwrap();
1563 let new_pos = c.pos;
1564 let handler = self.get_text(&pos.container);
1565 let current_pos = handler.convert_entity_index_to_event_index(new_pos);
1566 Ok(PosQueryResult {
1567 update: handler.get_cursor(current_pos, c.side),
1568 current: AbsolutePosition {
1569 pos: current_pos,
1570 side: c.side,
1571 },
1572 })
1573 }
1574 crate::diff_calc::ContainerDiffCalculator::List(list) => {
1575 let c = list.get_id_latest_pos(id).unwrap();
1576 let new_pos = c.pos;
1577 let handler = self.get_list(&pos.container);
1578 Ok(PosQueryResult {
1579 update: handler.get_cursor(new_pos, c.side),
1580 current: AbsolutePosition {
1581 pos: new_pos,
1582 side: c.side,
1583 },
1584 })
1585 }
1586 crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
1587 let c = list.get_id_latest_pos(id).unwrap();
1588 let new_pos = c.pos;
1589 let handler = self.get_movable_list(&pos.container);
1590 let new_pos = handler.op_pos_to_user_pos(new_pos);
1591 Ok(PosQueryResult {
1592 update: handler.get_cursor(new_pos, c.side),
1593 current: AbsolutePosition {
1594 pos: new_pos,
1595 side: c.side,
1596 },
1597 })
1598 }
1599 crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
1600 crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
1601 #[cfg(feature = "counter")]
1602 crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
1603 crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
1604 }
1605 } else {
1606 match pos.container.container_type() {
1607 ContainerType::Text => {
1608 let text = self.get_text(&pos.container);
1609 Ok(PosQueryResult {
1610 update: Some(Cursor {
1611 id: None,
1612 container: text.id(),
1613 side: pos.side,
1614 origin_pos: text.len_unicode(),
1615 }),
1616 current: AbsolutePosition {
1617 pos: text.len_event(),
1618 side: pos.side,
1619 },
1620 })
1621 }
1622 ContainerType::List => {
1623 let list = self.get_list(&pos.container);
1624 Ok(PosQueryResult {
1625 update: Some(Cursor {
1626 id: None,
1627 container: list.id(),
1628 side: pos.side,
1629 origin_pos: list.len(),
1630 }),
1631 current: AbsolutePosition {
1632 pos: list.len(),
1633 side: pos.side,
1634 },
1635 })
1636 }
1637 ContainerType::MovableList => {
1638 let list = self.get_movable_list(&pos.container);
1639 Ok(PosQueryResult {
1640 update: Some(Cursor {
1641 id: None,
1642 container: list.id(),
1643 side: pos.side,
1644 origin_pos: list.len(),
1645 }),
1646 current: AbsolutePosition {
1647 pos: list.len(),
1648 side: pos.side,
1649 },
1650 })
1651 }
1652 ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
1653 unreachable!()
1654 }
1655 #[cfg(feature = "counter")]
1656 ContainerType::Counter => unreachable!(),
1657 }
1658 }
1659 }
1660 }
1661
1662 pub fn free_history_cache(&self) {
1667 self.oplog.lock().unwrap().free_history_cache();
1668 }
1669
1670 pub fn free_diff_calculator(&self) {
1672 *self.diff_calculator.lock().unwrap() = DiffCalculator::new(true);
1673 }
1674
1675 pub fn has_history_cache(&self) -> bool {
1678 self.oplog.lock().unwrap().has_history_cache()
1679 }
1680
1681 #[inline]
1685 pub fn compact_change_store(&self) {
1686 self.commit_then_renew();
1687 self.oplog.lock().unwrap().compact_change_store();
1688 }
1689
1690 #[inline]
1694 pub fn analyze(&self) -> DocAnalysis {
1695 DocAnalysis::analyze(self)
1696 }
1697
1698 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1700 let mut state = self.state.lock().unwrap();
1701 let idx = state.arena.id_to_idx(id)?;
1702 state.get_path(idx)
1703 }
1704
1705 #[instrument(skip(self))]
1706 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1707 let (options, txn) = self.commit_then_stop();
1708 let ans = match mode {
1709 ExportMode::Snapshot => export_fast_snapshot(self),
1710 ExportMode::Updates { from } => export_fast_updates(self, &from),
1711 ExportMode::UpdatesInRange { spans } => {
1712 export_fast_updates_in_range(&self.oplog.lock().unwrap(), spans.as_ref())
1713 }
1714 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1715 ExportMode::StateOnly(f) => match f {
1716 Some(f) => export_state_only_snapshot(self, &f)?,
1717 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1718 },
1719 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1720 };
1721
1722 drop(txn);
1723 self.renew_txn_if_auto_commit(options);
1724 Ok(ans)
1725 }
1726
1727 pub fn shallow_since_vv(&self) -> ImVersionVector {
1733 self.oplog().lock().unwrap().shallow_since_vv().clone()
1734 }
1735
1736 pub fn shallow_since_frontiers(&self) -> Frontiers {
1737 self.oplog()
1738 .lock()
1739 .unwrap()
1740 .shallow_since_frontiers()
1741 .clone()
1742 }
1743
1744 pub fn is_shallow(&self) -> bool {
1746 !self.oplog().lock().unwrap().shallow_since_vv().is_empty()
1747 }
1748
1749 pub fn get_pending_txn_len(&self) -> usize {
1754 if let Some(txn) = self.txn.lock().unwrap().as_ref() {
1755 txn.len()
1756 } else {
1757 0
1758 }
1759 }
1760
1761 #[inline]
1762 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1763 self.oplog().lock().unwrap().dag.find_path(from, to)
1764 }
1765
1766 pub fn subscribe_first_commit_from_peer(
1772 &self,
1773 callback: FirstCommitFromPeerCallback,
1774 ) -> Subscription {
1775 let (s, enable) = self
1776 .first_commit_from_peer_subs
1777 .inner()
1778 .insert((), callback);
1779 enable();
1780 s
1781 }
1782
1783 pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
1788 let (s, enable) = self.pre_commit_subs.inner().insert((), callback);
1789 enable();
1790 s
1791 }
1792}
1793
1794#[derive(Debug, thiserror::Error)]
1795pub enum ChangeTravelError {
1796 #[error("Target id not found {0:?}")]
1797 TargetIdNotFound(ID),
1798 #[error("The shallow history of the doc doesn't include the target version")]
1799 TargetVersionNotIncluded,
1800}
1801
1802impl LoroDoc {
1803 pub fn travel_change_ancestors(
1804 &self,
1805 ids: &[ID],
1806 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1807 ) -> Result<(), ChangeTravelError> {
1808 self.commit_then_renew();
1809 struct PendingNode(ChangeMeta);
1810 impl PartialEq for PendingNode {
1811 fn eq(&self, other: &Self) -> bool {
1812 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1813 }
1814 }
1815
1816 impl Eq for PendingNode {}
1817 impl PartialOrd for PendingNode {
1818 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1819 Some(self.cmp(other))
1820 }
1821 }
1822
1823 impl Ord for PendingNode {
1824 fn cmp(&self, other: &Self) -> Ordering {
1825 self.0
1826 .lamport_last()
1827 .cmp(&other.0.lamport_last())
1828 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1829 }
1830 }
1831
1832 for id in ids {
1833 let op_log = &self.oplog().lock().unwrap();
1834 if !op_log.vv().includes_id(*id) {
1835 return Err(ChangeTravelError::TargetIdNotFound(*id));
1836 }
1837 if op_log.dag.shallow_since_vv().includes_id(*id) {
1838 return Err(ChangeTravelError::TargetVersionNotIncluded);
1839 }
1840 }
1841
1842 let mut visited = FxHashSet::default();
1843 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1844 for id in ids {
1845 pending.push(PendingNode(ChangeMeta::from_change(
1846 &self.oplog().lock().unwrap().get_change_at(*id).unwrap(),
1847 )));
1848 }
1849 while let Some(PendingNode(node)) = pending.pop() {
1850 let deps = node.deps.clone();
1851 if f(node).is_break() {
1852 break;
1853 }
1854
1855 for dep in deps.iter() {
1856 let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
1857 continue;
1858 };
1859 if visited.contains(&dep_node.id) {
1860 continue;
1861 }
1862
1863 visited.insert(dep_node.id);
1864 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1865 }
1866 }
1867
1868 Ok(())
1869 }
1870
1871 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1872 self.commit_then_renew();
1873 let mut set = FxHashSet::default();
1874 let oplog = &self.oplog().lock().unwrap();
1875 for op in oplog.iter_ops(id.to_span(len)) {
1876 let id = oplog.arena.get_container_id(op.container()).unwrap();
1877 set.insert(id);
1878 }
1879
1880 set
1881 }
1882}
1883
1884fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
1886 let start_vv = oplog
1887 .dag
1888 .frontiers_to_vv(&id.into())
1889 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
1890 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
1891 for op in change.ops.iter().rev() {
1892 if op.container != idx {
1893 continue;
1894 }
1895 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
1896 if d.id_start.to_span(d.atom_len()).contains(id) {
1897 return Some(ID::new(change.peer(), op.counter));
1898 }
1899 }
1900 }
1901 }
1902
1903 None
1904}
1905
1906#[derive(Debug)]
1907pub struct CommitWhenDrop<'a> {
1908 doc: &'a LoroDoc,
1909 default_options: CommitOptions,
1910}
1911
1912impl Drop for CommitWhenDrop<'_> {
1913 fn drop(&mut self) {
1914 {
1915 let mut guard = self.doc.txn.lock().unwrap();
1916 if let Some(txn) = guard.as_mut() {
1917 txn.set_default_options(std::mem::take(&mut self.default_options));
1918 };
1919 }
1920
1921 self.doc.commit_then_renew();
1922 }
1923}
1924
1925#[derive(Debug, Clone)]
1927pub struct CommitOptions {
1928 pub origin: Option<InternalString>,
1931
1932 pub immediate_renew: bool,
1935
1936 pub timestamp: Option<Timestamp>,
1939
1940 pub commit_msg: Option<Arc<str>>,
1942}
1943
1944impl CommitOptions {
1945 pub fn new() -> Self {
1947 Self {
1948 origin: None,
1949 immediate_renew: true,
1950 timestamp: None,
1951 commit_msg: None,
1952 }
1953 }
1954
1955 pub fn origin(mut self, origin: &str) -> Self {
1957 self.origin = Some(origin.into());
1958 self
1959 }
1960
1961 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
1963 self.immediate_renew = immediate_renew;
1964 self
1965 }
1966
1967 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
1971 self.timestamp = Some(timestamp);
1972 self
1973 }
1974
1975 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
1977 self.commit_msg = Some(commit_msg.into());
1978 self
1979 }
1980
1981 pub fn set_origin(&mut self, origin: Option<&str>) {
1983 self.origin = origin.map(|x| x.into())
1984 }
1985
1986 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
1988 self.timestamp = timestamp;
1989 }
1990}
1991
1992impl Default for CommitOptions {
1993 fn default() -> Self {
1994 Self::new()
1995 }
1996}
1997
#[cfg(test)]
mod test {
    use loro_common::ID;

    use crate::{version::Frontiers, LoroDoc, ToJson};

    /// Compile-time check that `LoroDoc` is `Send + Sync`.
    #[test]
    fn test_sync() {
        fn is_send_sync<T: Send + Sync>(_v: T) {}
        is_send_sync(super::LoroDoc::new())
    }

    /// Checks out several historical versions and compares the deep value
    /// of the document at each of them.
    #[test]
    fn test_checkout() {
        let deep_json = |doc: &LoroDoc| doc.get_deep_value().to_json();

        let loro = LoroDoc::new();
        loro.set_peer_id(1).unwrap();
        let text = loro.get_text("text");
        let map = loro.get_map("map");
        let list = loro.get_list("list");

        // Each round performs three ops: map insert, text insert, list insert.
        let mut txn = loro.txn().unwrap();
        for i in 0..10 {
            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
            text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
        }
        txn.commit().unwrap();

        let b = LoroDoc::new();
        b.import(&loro.export_snapshot().unwrap()).unwrap();

        loro.checkout(&Frontiers::default()).unwrap();
        assert_eq!(deep_json(&loro), r#"{"text":"","list":[],"map":{}}"#);

        b.checkout(&ID::new(1, 2).into()).unwrap();
        assert_eq!(deep_json(&b), r#"{"text":"0","list":[0],"map":{"key":0}}"#);

        loro.checkout(&ID::new(1, 3).into()).unwrap();
        assert_eq!(
            deep_json(&loro),
            r#"{"text":"0","list":[0],"map":{"key":1}}"#
        );

        b.checkout(&ID::new(1, 29).into()).unwrap();
        assert_eq!(
            deep_json(&b),
            r#"{"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}"#
        );
    }

    /// Exercises import-batch, edit, lock and export in sequence on an
    /// auto-commit document.
    #[test]
    fn import_batch_err_181() {
        let a = LoroDoc::new_auto_commit();
        let snapshot_of_a = a.export_snapshot();
        let b = LoroDoc::new_auto_commit();
        b.import_batch(&[snapshot_of_a.unwrap()]).unwrap();
        b.get_text("text").insert(0, "hello").unwrap();
        b.commit_then_renew();
        // Take and immediately release the oplog lock.
        drop(b.oplog().lock().unwrap());
        b.export_from(&Default::default());
    }
}