1use crate::change::ChangeRef;
2pub use crate::encoding::ExportMode;
3pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
4pub(crate) use crate::LoroDocInner;
5use crate::{
6 arena::SharedArena,
7 change::Timestamp,
8 configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
9 container::{
10 idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
11 IntoContainerId,
12 },
13 cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
14 dag::{Dag, DagUtils},
15 diff_calc::DiffCalculator,
16 encoding::{
17 self, decode_snapshot, export_fast_snapshot, export_fast_updates,
18 export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
19 export_state_only_snapshot,
20 json_schema::{encode_change_to_json, json::JsonSchema},
21 parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
22 },
23 event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
24 handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
25 id::PeerID,
26 json::JsonChange,
27 op::InnerContent,
28 oplog::{loro_dag::FrontiersNotIncluded, OpLog},
29 state::DocState,
30 subscription::{LocalUpdateCallback, Observer, Subscriber},
31 undo::DiffBatch,
32 utils::subscription::{SubscriberSetWithQueue, Subscription},
33 version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
34 ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
35 VersionVector,
36};
37use either::Either;
38use fxhash::{FxHashMap, FxHashSet};
39use loro_common::{
40 ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
41 LoroValue, ID,
42};
43use rle::HasLength;
44use std::{
45 borrow::Cow,
46 cmp::Ordering,
47 collections::{hash_map::Entry, BinaryHeap},
48 ops::ControlFlow,
49 sync::{
50 atomic::{
51 AtomicBool,
52 Ordering::{Acquire, Release},
53 },
54 Arc, Mutex,
55 },
56};
57use tracing::{debug_span, info, info_span, instrument, warn};
58
59impl Default for LoroDoc {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl std::fmt::Debug for LoroDocInner {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("LoroDoc")
68 .field("config", &self.config)
69 .field("auto_commit", &self.auto_commit)
70 .field("detached", &self.detached)
71 .finish()
72 }
73}
74
75impl LoroDoc {
76 pub fn new() -> Self {
77 let oplog = OpLog::new();
78 let arena = oplog.arena.clone();
79 let config: Configure = oplog.configure.clone();
80 let global_txn = Arc::new(Mutex::new(None));
81 let inner = Arc::new_cyclic(|w| {
82 let state = DocState::new_arc(w.clone(), arena.clone(), config.clone());
83 LoroDocInner {
84 oplog: Arc::new(Mutex::new(oplog)),
85 state,
86 config,
87 detached: AtomicBool::new(false),
88 auto_commit: AtomicBool::new(false),
89 observer: Arc::new(Observer::new(arena.clone())),
90 diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
91 txn: global_txn,
92 arena,
93 local_update_subs: SubscriberSetWithQueue::new(),
94 peer_id_change_subs: SubscriberSetWithQueue::new(),
95 }
96 });
97 Self { inner }
98 }
99
100 pub fn fork(&self) -> Self {
101 if self.is_detached() {
102 return self.fork_at(&self.state_frontiers());
103 }
104
105 let options = self.commit_then_stop();
106 let snapshot = encoding::fast_snapshot::encode_snapshot_inner(self);
107 let doc = Self::new();
108 encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc).unwrap();
109 doc.set_config(&self.config);
110 if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
111 doc.start_auto_commit();
112 }
113 self.renew_txn_if_auto_commit(options);
114 doc
115 }
116
117 pub fn set_detached_editing(&self, enable: bool) {
133 self.config.set_detached_editing(enable);
134 if enable && self.is_detached() {
135 let options = self.commit_then_stop();
136 self.renew_peer_id();
137 self.renew_txn_if_auto_commit(options);
138 }
139 }
140
141 #[inline]
143 pub fn new_auto_commit() -> Self {
144 let doc = Self::new();
145 doc.start_auto_commit();
146 doc
147 }
148
    /// Changes the peer id used by this document.
    ///
    /// # Errors
    ///
    /// - [`LoroError::InvalidPeerID`] when `peer` equals `PeerID::MAX`.
    /// - [`LoroError::TransactionError`] when auto-commit is off and the state
    ///   reports that a transaction is in progress.
    ///
    /// # Panics
    ///
    /// Panics when any of the oplog / state / txn locks cannot be taken
    /// immediately (`try_lock().unwrap()`), or when committing the pending
    /// transaction or creating the replacement one fails.
    #[inline(always)]
    pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
        if peer == PeerID::MAX {
            return Err(LoroError::InvalidPeerID);
        }
        // Computed up front; it is the payload handed to peer-id-change subscribers.
        let next_id = self.oplog.try_lock().unwrap().next_id(peer);
        if self.auto_commit.load(Acquire) {
            // Auto-commit path: store the new peer, then commit whatever
            // transaction is pending and open a new one.
            let doc_state = self.state.try_lock().unwrap();
            doc_state
                .peer
                .store(peer, std::sync::atomic::Ordering::Relaxed);
            drop(doc_state);

            let txn = self.txn.try_lock().unwrap().take();
            if let Some(txn) = txn {
                txn.commit().unwrap();
            }

            let new_txn = self.txn().unwrap();
            self.txn.try_lock().unwrap().replace(new_txn);
            self.peer_id_change_subs.emit(&(), next_id);
            return Ok(());
        }

        // Manual-commit path: refuse to switch peers in the middle of a transaction.
        let doc_state = self.state.try_lock().unwrap();
        if doc_state.is_in_txn() {
            return Err(LoroError::TransactionError(
                "Cannot change peer id during transaction"
                    .to_string()
                    .into_boxed_str(),
            ));
        }

        doc_state
            .peer
            .store(peer, std::sync::atomic::Ordering::Relaxed);
        // Release the state lock before notifying subscribers.
        drop(doc_state);
        self.peer_id_change_subs.emit(&(), next_id);
        Ok(())
    }
189
190 pub(crate) fn renew_peer_id(&self) {
192 let peer_id = DefaultRandom.next_u64();
193 self.set_peer_id(peer_id).unwrap();
194 }
195
196 #[inline]
203 #[must_use]
204 pub fn commit_then_stop(&self) -> Option<CommitOptions> {
205 self.commit_with(CommitOptions::new().immediate_renew(false))
206 }
207
208 #[inline]
213 pub fn commit_then_renew(&self) -> Option<CommitOptions> {
214 self.commit_with(CommitOptions::new().immediate_renew(true))
215 }
216
    /// Commits the pending auto-commit transaction using `config`.
    ///
    /// Returns `None` without doing anything when auto-commit is off or when
    /// there is no pending transaction. Otherwise returns the value produced
    /// by the transaction's `commit()`.
    ///
    /// # Panics
    ///
    /// Panics when the txn lock cannot be taken immediately, when the commit
    /// fails, or — with `immediate_renew` — when the document is not editable
    /// or a new transaction cannot be created.
    #[instrument(skip_all)]
    pub fn commit_with(&self, config: CommitOptions) -> Option<CommitOptions> {
        if !self.auto_commit.load(Acquire) {
            return None;
        }

        // Take the transaction out and release the lock before committing.
        let mut txn_guard = self.txn.try_lock().unwrap();
        let txn = txn_guard.take();
        drop(txn_guard);
        let mut txn = txn?;
        // The callback is detached here and invoked manually after the commit.
        let on_commit = txn.take_on_commit();
        if let Some(origin) = config.origin {
            txn.set_origin(origin);
        }

        if let Some(timestamp) = config.timestamp {
            txn.set_timestamp(timestamp);
        }

        if let Some(msg) = config.commit_msg.as_ref() {
            txn.set_msg(Some(msg.clone()));
        }

        // Capture the span before `commit()` consumes the transaction.
        let id_span = txn.id_span();
        let options = txn.commit().unwrap();
        if config.immediate_renew {
            let mut txn_guard = self.txn.try_lock().unwrap();
            assert!(self.can_edit());
            let mut t = self.txn().unwrap();
            // Carry the options returned by the commit over to the new transaction.
            if let Some(options) = options.as_ref() {
                t.set_options(options.clone());
            }
            *txn_guard = Some(t);
        }

        if let Some(on_commit) = on_commit {
            on_commit(&self.state, &self.oplog, id_span);
        }

        options
    }
265
266 pub fn set_next_commit_message(&self, message: &str) {
268 let mut binding = self.txn.try_lock().unwrap();
269 let Some(txn) = binding.as_mut() else {
270 return;
271 };
272
273 if message.is_empty() {
274 txn.set_msg(None)
275 } else {
276 txn.set_msg(Some(message.into()))
277 }
278 }
279
280 pub fn set_next_commit_origin(&self, origin: &str) {
282 let mut txn = self.txn.try_lock().unwrap();
283 if let Some(txn) = txn.as_mut() {
284 txn.set_origin(origin.into());
285 }
286 }
287
288 pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
290 let mut txn = self.txn.try_lock().unwrap();
291 if let Some(txn) = txn.as_mut() {
292 txn.set_timestamp(timestamp);
293 }
294 }
295
296 pub fn set_next_commit_options(&self, options: CommitOptions) {
298 let mut txn = self.txn.try_lock().unwrap();
299 if let Some(txn) = txn.as_mut() {
300 txn.set_options(options);
301 }
302 }
303
304 pub fn clear_next_commit_options(&self) {
306 let mut txn = self.txn.try_lock().unwrap();
307 if let Some(txn) = txn.as_mut() {
308 txn.set_options(CommitOptions::new());
309 }
310 }
311
312 #[inline]
323 pub fn set_record_timestamp(&self, record: bool) {
324 self.config.set_record_timestamp(record);
325 }
326
327 #[inline]
332 pub fn set_change_merge_interval(&self, interval: i64) {
333 self.config.set_merge_interval(interval);
334 }
335
336 pub fn can_edit(&self) -> bool {
337 !self.is_detached() || self.config.detached_editing()
338 }
339
340 pub fn is_detached_editing_enabled(&self) -> bool {
341 self.config.detached_editing()
342 }
343
344 #[inline]
345 pub fn config_text_style(&self, text_style: StyleConfigMap) {
346 self.config.text_style_config.try_write().unwrap().map = text_style.map;
347 }
348
349 #[inline]
350 pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
351 self.config
352 .text_style_config
353 .try_write()
354 .unwrap()
355 .default_style = text_style;
356 }
357 pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
358 let doc = Self::new();
359 let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
360 if mode.is_snapshot() {
361 decode_snapshot(&doc, mode, body)?;
362 Ok(doc)
363 } else {
364 Err(LoroError::DecodeError(
365 "Invalid encode mode".to_string().into(),
366 ))
367 }
368 }
369
370 #[inline(always)]
372 pub fn can_reset_with_snapshot(&self) -> bool {
373 let oplog = self.oplog.try_lock().unwrap();
374 if oplog.batch_importing {
375 return false;
376 }
377
378 if self.is_detached() {
379 return false;
380 }
381
382 oplog.is_empty() && self.state.try_lock().unwrap().can_import_snapshot()
383 }
384
385 #[inline(always)]
391 pub fn is_detached(&self) -> bool {
392 self.detached.load(Acquire)
393 }
394
395 pub(crate) fn set_detached(&self, detached: bool) {
396 self.detached.store(detached, Release);
397 }
398
399 #[inline(always)]
400 pub fn peer_id(&self) -> PeerID {
401 self.state
402 .try_lock()
403 .unwrap()
404 .peer
405 .load(std::sync::atomic::Ordering::Relaxed)
406 }
407
408 #[inline(always)]
409 pub fn detach(&self) {
410 let options = self.commit_then_stop();
411 self.set_detached(true);
412 self.renew_txn_if_auto_commit(options);
413 }
414
415 #[inline(always)]
416 pub fn attach(&self) {
417 self.checkout_to_latest()
418 }
419
420 pub fn state_timestamp(&self) -> Timestamp {
423 let f = &self.state.try_lock().unwrap().frontiers;
424 self.oplog.try_lock().unwrap().get_timestamp_of_version(f)
425 }
426
427 #[inline(always)]
428 pub fn app_state(&self) -> &Arc<Mutex<DocState>> {
429 &self.state
430 }
431
432 #[inline]
433 pub fn get_state_deep_value(&self) -> LoroValue {
434 self.state.try_lock().unwrap().get_deep_value()
435 }
436
437 #[inline(always)]
438 pub fn oplog(&self) -> &Arc<Mutex<OpLog>> {
439 &self.oplog
440 }
441
442 pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
443 let options = self.commit_then_stop();
444 let ans = self.oplog.try_lock().unwrap().export_from(vv);
445 self.renew_txn_if_auto_commit(options);
446 ans
447 }
448
449 #[inline(always)]
450 pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
451 let s = debug_span!("import", peer = self.peer_id());
452 let _e = s.enter();
453 self.import_with(bytes, Default::default())
454 }
455
456 #[inline]
457 pub fn import_with(
458 &self,
459 bytes: &[u8],
460 origin: InternalString,
461 ) -> Result<ImportStatus, LoroError> {
462 let options = self.commit_then_stop();
463 let ans = self._import_with(bytes, origin);
464 self.renew_txn_if_auto_commit(options);
465 ans
466 }
467
    /// Parses `bytes` and imports them according to the encoded mode, then
    /// emits the events accumulated in the state.
    ///
    /// Snapshot modes initialise the document directly when
    /// `can_reset_with_snapshot()` allows it; otherwise — and for update
    /// modes — the payload is decoded into the op log and the resulting diff is
    /// applied to the state if needed.
    ///
    /// # Errors
    ///
    /// Returns header/body parsing errors, [`LoroError::ImportWhenInTxn`] for
    /// `OutdatedRle` payloads while the state is in a transaction, and any
    /// decoding error.
    ///
    /// # Panics
    ///
    /// Panics if the parsed mode is `EncodeMode::Auto`.
    #[tracing::instrument(skip_all)]
    fn _import_with(
        &self,
        bytes: &[u8],
        origin: InternalString,
    ) -> Result<ImportStatus, LoroError> {
        ensure_cov::notify_cov("loro_internal::import");
        let parsed = parse_header_and_body(bytes, true)?;
        info!("Importing with mode={:?}", &parsed.mode);
        let result = match parsed.mode {
            EncodeMode::OutdatedRle => {
                if self.state.try_lock().unwrap().is_in_txn() {
                    return Err(LoroError::ImportWhenInTxn);
                }

                let s = tracing::span!(
                    tracing::Level::INFO,
                    "Import updates ",
                    peer = self.peer_id()
                );
                let _e = s.enter();
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
            EncodeMode::OutdatedSnapshot => {
                if self.can_reset_with_snapshot() {
                    tracing::info!("Init by snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body)
                } else {
                    // The document cannot be reset from the snapshot, so treat it as updates.
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )
                }
            }
            EncodeMode::FastSnapshot => {
                if self.can_reset_with_snapshot() {
                    ensure_cov::notify_cov("loro_internal::import::snapshot");
                    tracing::info!("Init by fast snapshot {}", self.peer_id());
                    decode_snapshot(self, parsed.mode, parsed.body)
                } else {
                    // The document cannot be reset from the snapshot, so treat it as updates.
                    self.update_oplog_and_apply_delta_to_state_if_needed(
                        |oplog| oplog.decode(parsed),
                        origin,
                    )

                }
            }
            EncodeMode::FastUpdates => self.update_oplog_and_apply_delta_to_state_if_needed(
                |oplog| oplog.decode(parsed),
                origin,
            ),
            EncodeMode::Auto => {
                unreachable!()
            }
        };

        // Events are emitted even when the import returned an error.
        self.emit_events();
        result
    }
534
535 #[tracing::instrument(skip_all)]
536 pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
537 &self,
538 f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
539 origin: InternalString,
540 ) -> Result<ImportStatus, LoroError> {
541 let mut oplog = self.oplog.try_lock().unwrap();
542 if !self.is_detached() {
543 let old_vv = oplog.vv().clone();
544 let old_frontiers = oplog.frontiers().clone();
545 let result = f(&mut oplog);
546 if &old_vv != oplog.vv() {
547 let mut diff = DiffCalculator::new(false);
548 let (diff, diff_mode) = diff.calc_diff_internal(
549 &oplog,
550 &old_vv,
551 &old_frontiers,
552 oplog.vv(),
553 oplog.dag.get_frontiers(),
554 None,
555 );
556 let mut state = self.state.try_lock().unwrap();
557 state.apply_diff(
558 InternalDocDiff {
559 origin,
560 diff: (diff).into(),
561 by: EventTriggerKind::Import,
562 new_version: Cow::Owned(oplog.frontiers().clone()),
563 },
564 diff_mode,
565 );
566 }
567 result
568 } else {
569 f(&mut oplog)
570 }
571 }
572
573 fn emit_events(&self) {
574 let events = {
576 let mut state = self.state.try_lock().unwrap();
577 state.take_events()
578 };
579 for event in events {
580 self.observer.emit(event);
581 }
582 }
583
584 pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
585 let mut state = self.state.try_lock().unwrap();
586 state.take_events()
587 }
588
589 #[instrument(skip_all)]
590 pub fn export_snapshot(&self) -> Result<Vec<u8>, LoroEncodeError> {
591 if self.is_shallow() {
592 return Err(LoroEncodeError::ShallowSnapshotIncompatibleWithOldFormat);
593 }
594 let options = self.commit_then_stop();
595 let ans = export_snapshot(self);
596 self.renew_txn_if_auto_commit(options);
597 Ok(ans)
598 }
599
600 #[tracing::instrument(skip_all)]
604 pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
605 let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
606 let options = self.commit_then_stop();
607 let result = self.update_oplog_and_apply_delta_to_state_if_needed(
608 |oplog| crate::encoding::json_schema::import_json(oplog, json),
609 Default::default(),
610 );
611 self.emit_events();
612 self.renew_txn_if_auto_commit(options);
613 result
614 }
615
616 pub fn export_json_updates(
617 &self,
618 start_vv: &VersionVector,
619 end_vv: &VersionVector,
620 with_peer_compression: bool,
621 ) -> JsonSchema {
622 let options = self.commit_then_stop();
623 let oplog = self.oplog.try_lock().unwrap();
624 let mut start_vv = start_vv;
625 let _temp: Option<VersionVector>;
626 if !oplog.dag.shallow_since_vv().is_empty() {
627 let mut include_all = true;
629 for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
630 if start_vv.get(peer).unwrap_or(&0) < counter {
631 include_all = false;
632 break;
633 }
634 }
635 if !include_all {
636 let mut vv = start_vv.clone();
637 for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
638 vv.extend_to_include_end_id(ID::new(peer, counter));
639 }
640 _temp = Some(vv);
641 start_vv = _temp.as_ref().unwrap();
642 }
643 }
644
645 let json = crate::encoding::json_schema::export_json(
646 &oplog,
647 start_vv,
648 end_vv,
649 with_peer_compression,
650 );
651 drop(oplog);
652 self.renew_txn_if_auto_commit(options);
653 json
654 }
655
656 pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
657 let options = self.commit_then_stop();
658 let oplog = self.oplog.try_lock().unwrap();
659 let json = crate::encoding::json_schema::export_json_in_id_span(&oplog, id_span);
660 drop(oplog);
661 self.renew_txn_if_auto_commit(options);
662 json
663 }
664
665 #[inline]
667 pub fn oplog_vv(&self) -> VersionVector {
668 self.oplog.try_lock().unwrap().vv().clone()
669 }
670
671 #[inline]
673 pub fn state_vv(&self) -> VersionVector {
674 let f = &self.state.try_lock().unwrap().frontiers;
675 self.oplog
676 .try_lock()
677 .unwrap()
678 .dag
679 .frontiers_to_vv(f)
680 .unwrap()
681 }
682
683 pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
684 let value: LoroValue = self.state.try_lock().unwrap().get_value_by_path(path)?;
685 if let LoroValue::Container(c) = value {
686 Some(ValueOrHandler::Handler(Handler::new_attached(
687 c.clone(),
688 self.inner.clone(),
689 )))
690 } else {
691 Some(ValueOrHandler::Value(value))
692 }
693 }
694
695 pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
697 let path = str_to_path(path)?;
698 self.get_by_path(&path)
699 }
700
701 pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
702 let arena = &self.arena;
703 let txn = self.txn.try_lock().unwrap();
704 let txn = txn.as_ref()?;
705 let ops_ = txn.local_ops();
706 let new_id = ID {
707 peer: *txn.peer(),
708 counter: ops_.first()?.counter,
709 };
710 let change = ChangeRef {
711 id: &new_id,
712 deps: txn.frontiers(),
713 timestamp: &txn
714 .timestamp()
715 .as_ref()
716 .copied()
717 .unwrap_or_else(|| self.oplog.try_lock().unwrap().get_timestamp_for_next_txn()),
718 commit_msg: txn.msg(),
719 ops: ops_,
720 lamport: txn.lamport(),
721 };
722 let json = encode_change_to_json(change, arena);
723 Some(json)
724 }
725
726 #[inline]
727 pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
728 if self.has_container(&id) {
729 Some(Handler::new_attached(id, self.inner.clone()))
730 } else {
731 None
732 }
733 }
734
735 #[inline]
738 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
739 let id = id.into_container_id(&self.arena, ContainerType::Text);
740 assert!(self.has_container(&id));
741 Handler::new_attached(id, self.inner.clone())
742 .into_text()
743 .unwrap()
744 }
745
746 #[inline]
749 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
750 let id = id.into_container_id(&self.arena, ContainerType::List);
751 assert!(self.has_container(&id));
752 Handler::new_attached(id, self.inner.clone())
753 .into_list()
754 .unwrap()
755 }
756
757 #[inline]
760 pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
761 let id = id.into_container_id(&self.arena, ContainerType::MovableList);
762 assert!(self.has_container(&id));
763 Handler::new_attached(id, self.inner.clone())
764 .into_movable_list()
765 .unwrap()
766 }
767
768 #[inline]
771 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
772 let id = id.into_container_id(&self.arena, ContainerType::Map);
773 assert!(self.has_container(&id));
774 Handler::new_attached(id, self.inner.clone())
775 .into_map()
776 .unwrap()
777 }
778
779 #[inline]
782 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
783 let id = id.into_container_id(&self.arena, ContainerType::Tree);
784 assert!(self.has_container(&id));
785 Handler::new_attached(id, self.inner.clone())
786 .into_tree()
787 .unwrap()
788 }
789
790 #[cfg(feature = "counter")]
791 pub fn get_counter<I: IntoContainerId>(
792 &self,
793 id: I,
794 ) -> crate::handler::counter::CounterHandler {
795 let id = id.into_container_id(&self.arena, ContainerType::Counter);
796 assert!(self.has_container(&id));
797 Handler::new_attached(id, self.inner.clone())
798 .into_counter()
799 .unwrap()
800 }
801
802 #[must_use]
803 pub fn has_container(&self, id: &ContainerID) -> bool {
804 if id.is_root() {
805 return true;
806 }
807
808 let exist = self.state.try_lock().unwrap().does_container_exist(id);
809 exist
810 }
811
    /// Undoes the ops in `id_span` by computing the inverse diff and applying it.
    ///
    /// - `container_remap` is passed through to `_apply_diff` to redirect
    ///   container ids.
    /// - `post_transform_base`, when given, is passed to `crate::undo::undo`
    ///   in place of the latest state frontiers.
    /// - `before_diff` is forwarded to `crate::undo::undo`.
    ///
    /// On success auto-commit is running and the returned [`CommitWhenDrop`]
    /// carries the default origin `"undo"`.
    ///
    /// # Errors
    ///
    /// - [`LoroError::EditWhenDetached`] when the document cannot be edited.
    /// - [`LoroError::UndoInvalidIdSpan`] when the op log does not include the
    ///   last id of `id_span`.
    /// - Any error from checking the state back out to the latest frontiers.
    ///
    /// A failure while applying the computed diff is only logged.
    #[instrument(level = "info", skip_all)]
    pub fn undo_internal(
        &self,
        id_span: IdSpan,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        post_transform_base: Option<&DiffBatch>,
        before_diff: &mut dyn FnMut(&DiffBatch),
    ) -> LoroResult<CommitWhenDrop> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        let options = self.commit_then_stop();
        if !self
            .oplog()
            .try_lock()
            .unwrap()
            .vv()
            .includes_id(id_span.id_last())
        {
            // Restore the transaction that was stopped above before bailing out.
            self.renew_txn_if_auto_commit(options);
            return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
        }

        // Pause event recording while the state is moved around, remembering
        // whether it has to be resumed afterwards.
        let (was_recording, latest_frontiers) = {
            let mut state = self.state.try_lock().unwrap();
            let was_recording = state.is_recording();
            state.stop_and_clear_recording();
            (was_recording, state.frontiers.clone())
        };

        let spans = self
            .oplog
            .try_lock()
            .unwrap()
            .split_span_based_on_deps(id_span);
        let diff = crate::undo::undo(
            spans,
            match post_transform_base {
                Some(d) => Either::Right(d),
                None => Either::Left(&latest_frontiers),
            },
            // Diff between two versions: check out `from`, record events while
            // checking out `to`, and collect them into a batch.
            |from, to| {
                self.checkout_without_emitting(from, false).unwrap();
                self.state.try_lock().unwrap().start_recording();
                self.checkout_without_emitting(to, false).unwrap();
                let mut state = self.state.try_lock().unwrap();
                let e = state.take_events();
                state.stop_and_clear_recording();
                DiffBatch::new(e)
            },
            before_diff,
        );

        // Return the state to where it was and mark the document attached again.
        self.checkout_without_emitting(&latest_frontiers, false)?;
        self.set_detached(false);
        if was_recording {
            self.state.try_lock().unwrap().start_recording();
        }
        self.start_auto_commit();
        // A failure to apply the diff is logged and otherwise ignored.
        if let Err(e) = self._apply_diff(diff, container_remap, true) {
            warn!("Undo Failed {:?}", e);
        }

        if let Some(options) = options {
            self.set_next_commit_options(options);
        }
        Ok(CommitWhenDrop {
            doc: self,
            default_options: CommitOptions::new().origin("undo"),
        })
    }
903
904 pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
910 let f = self.state_frontiers();
913 let diff = self.diff(&f, target)?;
914 self._apply_diff(diff, &mut Default::default(), false)
915 }
916
    /// Computes the diff that takes version `a` to version `b`.
    ///
    /// The state is checked out to `a`, events are recorded while checking out
    /// to `b`, and the state is then returned to its original frontiers. The
    /// recorded events form the returned [`DiffBatch`].
    ///
    /// # Errors
    ///
    /// Returns [`LoroError::FrontiersNotFound`] when an id of `a` or `b` is
    /// not in the op log's DAG.
    ///
    /// # Panics
    ///
    /// Panics when any of the internal checkouts fails or a lock cannot be
    /// taken immediately.
    pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
        {
            // Validate both versions before touching any state.
            let oplog = self.oplog.try_lock().unwrap();
            for id in a.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }
            for id in b.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }
        }

        let options = self.commit_then_stop();
        let was_detached = self.is_detached();
        let old_frontiers = self.state_frontiers();
        // Pause event recording, remembering whether to resume it afterwards.
        let was_recording = {
            let mut state = self.state.try_lock().unwrap();
            let is_recording = state.is_recording();
            state.stop_and_clear_recording();
            is_recording
        };
        self.checkout_without_emitting(a, true).unwrap();
        // Record only the events of the a -> b transition.
        self.state.try_lock().unwrap().start_recording();
        self.checkout_without_emitting(b, true).unwrap();
        let e = {
            let mut state = self.state.try_lock().unwrap();
            let e = state.take_events();
            state.stop_and_clear_recording();
            e
        };
        self.checkout_without_emitting(&old_frontiers, false)
            .unwrap();
        // The checkouts above leave the detached flag set; restore it (and the
        // transaction) only when the document was attached to begin with.
        if !was_detached {
            self.set_detached(false);
            self.renew_txn_if_auto_commit(options);
        }
        if was_recording {
            self.state.try_lock().unwrap().start_recording();
        }
        Ok(DiffBatch::new(e))
    }
966
967 #[inline(always)]
969 pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
970 self._apply_diff(diff, &mut Default::default(), true)
971 }
972
    /// Applies every per-container diff in `diff` through the matching handler.
    ///
    /// - `container_remap` redirects container ids; chains of redirects are
    ///   followed to the end. It is also handed to each handler's `apply_diff`.
    /// - `skip_unreachable`: when true, containers that were not remapped and
    ///   that the state reports as unreachable are skipped.
    ///
    /// # Errors
    ///
    /// - [`LoroError::EditWhenDetached`] when the document cannot be edited.
    /// - [`LoroError::ContainersNotFound`] listing every normal container that
    ///   is unknown to the arena (reported after all other diffs were
    ///   processed), or immediately for an id that has no handler.
    /// - Otherwise the last error returned by a handler, if any.
    pub(crate) fn _apply_diff(
        &self,
        diff: DiffBatch,
        container_remap: &mut FxHashMap<ContainerID, ContainerID>,
        skip_unreachable: bool,
    ) -> LoroResult<()> {
        if !self.can_edit() {
            return Err(LoroError::EditWhenDetached);
        }

        let mut ans: LoroResult<()> = Ok(());
        let mut missing_containers: Vec<ContainerID> = Vec::new();
        for (mut id, diff) in diff.into_iter() {
            info!(
                "id: {:?} diff: {:?} remap: {:?}",
                &id, &diff, container_remap
            );
            // Follow the remap chain until an id with no further redirect is reached.
            let mut remapped = false;
            while let Some(rid) = container_remap.get(&id) {
                remapped = true;
                id = rid.clone();
            }

            // Unknown normal containers are collected and reported together at the end.
            if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
                missing_containers.push(id);
                continue;
            }

            if skip_unreachable && !remapped && !self.state.try_lock().unwrap().get_reachable(&id) {
                continue;
            }

            let Some(h) = self.get_handler(id.clone()) else {
                return Err(LoroError::ContainersNotFound {
                    containers: Box::new(vec![id]),
                });
            };
            // Keep going after a handler error; only the last one is returned.
            if let Err(e) = h.apply_diff(diff, container_remap) {
                ans = Err(e);
            }
        }

        if !missing_containers.is_empty() {
            return Err(LoroError::ContainersNotFound {
                containers: Box::new(missing_containers),
            });
        }

        ans
    }
1034
1035 #[inline]
1037 pub fn diagnose_size(&self) {
1038 self.oplog().try_lock().unwrap().diagnose_size();
1039 }
1040
1041 #[inline]
1042 pub fn oplog_frontiers(&self) -> Frontiers {
1043 self.oplog().try_lock().unwrap().frontiers().clone()
1044 }
1045
1046 #[inline]
1047 pub fn state_frontiers(&self) -> Frontiers {
1048 self.state.try_lock().unwrap().frontiers.clone()
1049 }
1050
1051 #[inline]
1055 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
1056 self.oplog().try_lock().unwrap().cmp_with_frontiers(other)
1057 }
1058
1059 #[inline]
1063 pub fn cmp_frontiers(
1064 &self,
1065 a: &Frontiers,
1066 b: &Frontiers,
1067 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
1068 self.oplog().try_lock().unwrap().cmp_frontiers(a, b)
1069 }
1070
1071 pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
1072 let mut state = self.state.try_lock().unwrap();
1073 if !state.is_recording() {
1074 state.start_recording();
1075 }
1076
1077 self.observer.subscribe_root(callback)
1078 }
1079
1080 pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
1081 let mut state = self.state.try_lock().unwrap();
1082 if !state.is_recording() {
1083 state.start_recording();
1084 }
1085
1086 self.observer.subscribe(container_id, callback)
1087 }
1088
1089 pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
1090 let (sub, activate) = self.local_update_subs.inner().insert((), callback);
1091 activate();
1092 sub
1093 }
1094
1095 #[tracing::instrument(skip_all)]
1097 pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
1098 if bytes.is_empty() {
1099 return Ok(ImportStatus::default());
1100 }
1101
1102 if bytes.len() == 1 {
1103 return self.import(&bytes[0]);
1104 }
1105
1106 let mut success = VersionRange::default();
1107 let mut pending = VersionRange::default();
1108 let mut meta_arr = bytes
1109 .iter()
1110 .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
1111 .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
1112 meta_arr.sort_by(|a, b| {
1113 a.0.mode
1114 .cmp(&b.0.mode)
1115 .then(b.0.change_num.cmp(&a.0.change_num))
1116 });
1117
1118 let options = self.commit_then_stop();
1119 let is_detached = self.is_detached();
1120 self.detach();
1121 self.oplog.try_lock().unwrap().batch_importing = true;
1122 let mut err = None;
1123 for (_meta, data) in meta_arr {
1124 match self.import(data) {
1125 Ok(s) => {
1126 for (peer, (start, end)) in s.success.iter() {
1127 match success.0.entry(*peer) {
1128 Entry::Occupied(mut e) => {
1129 e.get_mut().1 = *end.max(&e.get().1);
1130 }
1131 Entry::Vacant(e) => {
1132 e.insert((*start, *end));
1133 }
1134 }
1135 }
1136
1137 if let Some(p) = s.pending.as_ref() {
1138 for (&peer, &(start, end)) in p.iter() {
1139 match pending.0.entry(peer) {
1140 Entry::Occupied(mut e) => {
1141 e.get_mut().0 = start.min(e.get().0);
1142 e.get_mut().1 = end.min(e.get().1);
1143 }
1144 Entry::Vacant(e) => {
1145 e.insert((start, end));
1146 }
1147 }
1148 }
1149 }
1150 }
1151 Err(e) => {
1152 err = Some(e);
1153 }
1154 }
1155 }
1156
1157 let mut oplog = self.oplog.try_lock().unwrap();
1158 oplog.batch_importing = false;
1159 drop(oplog);
1160
1161 if !is_detached {
1162 self.checkout_to_latest();
1163 }
1164
1165 self.renew_txn_if_auto_commit(options);
1166 if let Some(err) = err {
1167 return Err(err);
1168 }
1169
1170 Ok(ImportStatus {
1171 success,
1172 pending: if pending.is_empty() {
1173 None
1174 } else {
1175 Some(pending)
1176 },
1177 })
1178 }
1179
1180 #[inline]
1182 pub fn get_value(&self) -> LoroValue {
1183 self.state.try_lock().unwrap().get_value()
1184 }
1185
1186 #[inline]
1188 pub fn get_deep_value(&self) -> LoroValue {
1189 self.state.try_lock().unwrap().get_deep_value()
1190 }
1191
1192 #[inline]
1194 pub fn get_deep_value_with_id(&self) -> LoroValue {
1195 self.state.try_lock().unwrap().get_deep_value_with_id()
1196 }
1197
    /// Brings a detached document back to the op log's latest version and
    /// marks it attached.
    ///
    /// The pending transaction is committed and renewed first; when the
    /// document is not detached nothing else happens.
    ///
    /// # Panics
    ///
    /// Panics when the checkout to the op log's frontiers fails.
    pub fn checkout_to_latest(&self) {
        let options = self.commit_then_renew();
        if !self.is_detached() {
            return;
        }

        tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
            let f = self.oplog_frontiers();
            let this = &self;
            let frontiers = &f;
            // The op log's own frontiers are checked out without shrinking.
            this.checkout_without_emitting(frontiers, false).unwrap();
            this.emit_events();
            // With detached editing enabled, a new peer id is generated on re-attach.
            if this.config.detached_editing() {
                this.renew_peer_id();
            }

            self.set_detached(false);
            self.renew_txn_if_auto_commit(options);
        });
    }
1219
1220 pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
1225 let options = self.checkout_without_emitting(frontiers, true)?;
1226 self.emit_events();
1227 if self.config.detached_editing() {
1228 self.renew_peer_id();
1229 self.renew_txn_if_auto_commit(options);
1230 }
1231
1232 Ok(())
1233 }
1234
1235 #[instrument(level = "info", skip(self))]
1236 pub(crate) fn checkout_without_emitting(
1237 &self,
1238 frontiers: &Frontiers,
1239 to_shrink_frontiers: bool,
1240 ) -> Result<Option<CommitOptions>, LoroError> {
1241 let mut options = None;
1242 let had_txn = self.txn.try_lock().unwrap().is_some();
1243 if had_txn {
1244 options = self.commit_then_stop();
1245 }
1246 let from_frontiers = self.state_frontiers();
1247 info!(
1248 "checkout from={:?} to={:?} cur_vv={:?}",
1249 from_frontiers,
1250 frontiers,
1251 self.oplog_vv()
1252 );
1253
1254 if &from_frontiers == frontiers {
1255 if had_txn {
1256 self.renew_txn_if_auto_commit(options);
1257 }
1258 return Ok(None);
1259 }
1260
1261 let oplog = self.oplog.try_lock().unwrap();
1262 if oplog.dag.is_before_shallow_root(frontiers) {
1263 drop(oplog);
1264 if had_txn {
1265 self.renew_txn_if_auto_commit(options);
1266 }
1267 return Err(LoroError::SwitchToVersionBeforeShallowRoot);
1268 }
1269
1270 let frontiers = if to_shrink_frontiers {
1271 shrink_frontiers(frontiers, &oplog.dag)
1272 .map_err(|_| LoroError::SwitchToVersionBeforeShallowRoot)?
1273 } else {
1274 frontiers.clone()
1275 };
1276 if from_frontiers == frontiers {
1277 drop(oplog);
1278 if had_txn {
1279 self.renew_txn_if_auto_commit(options);
1280 }
1281 return Ok(None);
1282 }
1283
1284 let mut state = self.state.try_lock().unwrap();
1285 let mut calc = self.diff_calculator.try_lock().unwrap();
1286 for i in frontiers.iter() {
1287 if !oplog.dag.contains(i) {
1288 drop(oplog);
1289 drop(state);
1290 if had_txn {
1291 self.renew_txn_if_auto_commit(options);
1292 }
1293 return Err(LoroError::FrontiersNotFound(i));
1294 }
1295 }
1296
1297 let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
1298 let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
1299 drop(oplog);
1300 drop(state);
1301 if had_txn {
1302 self.renew_txn_if_auto_commit(options);
1303 }
1304 return Err(LoroError::NotFoundError(
1305 format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
1306 ));
1307 };
1308
1309 self.set_detached(true);
1310 let (diff, diff_mode) =
1311 calc.calc_diff_internal(&oplog, before, &state.frontiers, after, &frontiers, None);
1312 state.apply_diff(
1313 InternalDocDiff {
1314 origin: "checkout".into(),
1315 diff: Cow::Owned(diff),
1316 by: EventTriggerKind::Checkout,
1317 new_version: Cow::Owned(frontiers.clone()),
1318 },
1319 diff_mode,
1320 );
1321
1322 drop(state);
1323 drop(oplog);
1324 Ok(options)
1325 }
1326
1327 #[inline]
1328 pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
1329 self.oplog.try_lock().unwrap().dag.vv_to_frontiers(vv)
1330 }
1331
1332 #[inline]
1333 pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
1334 self.oplog
1335 .try_lock()
1336 .unwrap()
1337 .dag
1338 .frontiers_to_vv(frontiers)
1339 }
1340
1341 pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
1345 self.import(&other.export_from(&self.oplog_vv()))
1346 }
1347
    /// Borrow the shared arena held by this document.
    pub(crate) fn arena(&self) -> &SharedArena {
        &self.arena
    }
1351
1352 #[inline]
1353 pub fn len_ops(&self) -> usize {
1354 let oplog = self.oplog.try_lock().unwrap();
1355 let ans = oplog.vv().iter().map(|(_, ops)| *ops).sum::<i32>() as usize;
1356 if oplog.is_shallow() {
1357 let sub = oplog
1358 .shallow_since_vv()
1359 .iter()
1360 .map(|(_, ops)| *ops)
1361 .sum::<i32>() as usize;
1362 ans - sub
1363 } else {
1364 ans
1365 }
1366 }
1367
1368 #[inline]
1369 pub fn len_changes(&self) -> usize {
1370 let oplog = self.oplog.try_lock().unwrap();
1371 oplog.len_changes()
1372 }
1373
    /// Borrow the configuration held by this document.
    pub fn config(&self) -> &Configure {
        &self.config
    }
1377
1378 pub fn check_state_diff_calc_consistency_slow(&self) {
1383 {
1385 static IS_CHECKING: AtomicBool = AtomicBool::new(false);
1386 if IS_CHECKING.load(std::sync::atomic::Ordering::Acquire) {
1387 return;
1388 }
1389
1390 IS_CHECKING.store(true, std::sync::atomic::Ordering::Release);
1391 let peer_id = self.peer_id();
1392 let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
1393 let _g = s.enter();
1394 let options = self.commit_then_stop();
1395 self.oplog.try_lock().unwrap().check_dag_correctness();
1396 if self.is_shallow() {
1397 let initial_snapshot = self
1408 .export(ExportMode::state_only(Some(
1409 &self.shallow_since_frontiers(),
1410 )))
1411 .unwrap();
1412
1413 let doc = LoroDoc::new();
1415 doc.import(&initial_snapshot).unwrap();
1416 self.checkout(&self.shallow_since_frontiers()).unwrap();
1417 assert_eq!(self.get_deep_value(), doc.get_deep_value());
1418
1419 let updates = self.export(ExportMode::all_updates()).unwrap();
1421
1422 doc.import(&updates).unwrap();
1424 self.checkout_to_latest();
1425
1426 assert_eq!(doc.get_deep_value(), self.get_deep_value());
1429 let mut calculated_state = doc.app_state().try_lock().unwrap();
1430 let mut current_state = self.app_state().try_lock().unwrap();
1431 current_state.check_is_the_same(&mut calculated_state);
1432 } else {
1433 let f = self.state_frontiers();
1434 let vv = self
1435 .oplog()
1436 .try_lock()
1437 .unwrap()
1438 .dag
1439 .frontiers_to_vv(&f)
1440 .unwrap();
1441 let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
1442 let doc = Self::new();
1443 doc.import(&bytes).unwrap();
1444 let mut calculated_state = doc.app_state().try_lock().unwrap();
1445 let mut current_state = self.app_state().try_lock().unwrap();
1446 current_state.check_is_the_same(&mut calculated_state);
1447 }
1448
1449 self.renew_txn_if_auto_commit(options);
1450 IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
1451 }
1452 }
1453
1454 #[inline]
1455 pub fn log_estimated_size(&self) {
1456 let state = self.state.try_lock().unwrap();
1457 state.log_estimated_size();
1458 }
1459
1460 pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
1461 self.query_pos_internal(pos, true)
1462 }
1463
    /// Resolve `pos` to an absolute position, optionally producing an updated
    /// cursor.
    ///
    /// The state is asked first via `get_relative_position`; when it answers,
    /// the result carries no cursor update. Otherwise:
    ///
    /// - If the cursor has an `id`, the last list-delete op covering that id is
    ///   looked up in the oplog, a diff restricted to the cursor's container is
    ///   computed from just before that delete to the latest oplog version, and
    ///   the container's diff calculator is asked for the id's latest position.
    /// - If the cursor has no `id`, the result points at the current end of the
    ///   text / list / movable list container.
    ///
    /// `ret_event_index` is forwarded to `get_relative_position`.
    ///
    /// # Errors
    ///
    /// - `ContainerDeleted` if the cursor's container is unknown to the arena.
    /// - `HistoryCleared` if no delete op is found and `shallow_since_vv`
    ///   includes the id.
    /// - `IdNotFound` if no delete op is found otherwise.
    ///
    /// # Panics
    ///
    /// Panics if a lock cannot be acquired immediately, if the diff calculator
    /// cannot locate the id, or if the container is not a text, list or
    /// movable list (`unreachable!`).
    pub(crate) fn query_pos_internal(
        &self,
        pos: &Cursor,
        ret_event_index: bool,
    ) -> Result<PosQueryResult, CannotFindRelativePosition> {
        let mut state = self.state.try_lock().unwrap();
        if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
            // The state resolved the cursor directly; no cursor update is produced.
            Ok(PosQueryResult {
                update: None,
                current: AbsolutePosition {
                    pos: ans,
                    side: pos.side,
                },
            })
        } else {
            // The state guard is released before committing and before the
            // container handlers below are used.
            drop(state);
            self.commit_then_renew();
            let oplog = self.oplog().try_lock().unwrap();
            if let Some(id) = pos.id {
                let idx = oplog
                    .arena
                    .id_to_idx(&pos.container)
                    .ok_or(CannotFindRelativePosition::ContainerDeleted)?;
                // Locate the delete op that removed the element the cursor is anchored to.
                let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
                    if oplog.shallow_since_vv().includes_id(id) {
                        return Err(CannotFindRelativePosition::HistoryCleared);
                    }

                    tracing::error!("Cannot find id {}", id);
                    return Err(CannotFindRelativePosition::IdNotFound);
                };
                // Use a dedicated calculator and diff from the deps of the delete
                // op up to the latest oplog version, filtered to this container.
                let mut diff_calc = DiffCalculator::new(true);
                let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
                let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
                diff_calc.calc_diff_internal(
                    &oplog,
                    before,
                    &before_frontiers,
                    oplog.vv(),
                    oplog.frontiers(),
                    Some(&|target| idx == target),
                );
                let depth = self.arena.get_depth(idx);
                let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
                match diff_calc {
                    crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
                        let c = text.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_text(&pos.container);
                        // The calculator's position is converted by the text handler
                        // from an entity index to an event index.
                        let current_pos = handler.convert_entity_index_to_event_index(new_pos);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(current_pos, c.side),
                            current: AbsolutePosition {
                                pos: current_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::List(list) => {
                        let c = list.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_list(&pos.container);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(new_pos, c.side),
                            current: AbsolutePosition {
                                pos: new_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
                        let c = list.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_movable_list(&pos.container);
                        // The calculator's position is converted by the handler from
                        // an op position to a user position.
                        let new_pos = handler.op_pos_to_user_pos(new_pos);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(new_pos, c.side),
                            current: AbsolutePosition {
                                pos: new_pos,
                                side: c.side,
                            },
                        })
                    }
                    // The remaining container kinds are treated as impossible here.
                    crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
                    crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
                    #[cfg(feature = "counter")]
                    crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
                    crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
                }
            } else {
                // A cursor without an id: answer with the current end of the container.
                match pos.container.container_type() {
                    ContainerType::Text => {
                        let text = self.get_text(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: text.id(),
                                side: pos.side,
                                origin_pos: text.len_unicode(),
                            }),
                            current: AbsolutePosition {
                                pos: text.len_event(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::List => {
                        let list = self.get_list(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: list.id(),
                                side: pos.side,
                                origin_pos: list.len(),
                            }),
                            current: AbsolutePosition {
                                pos: list.len(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::MovableList => {
                        let list = self.get_movable_list(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: list.id(),
                                side: pos.side,
                                origin_pos: list.len(),
                            }),
                            current: AbsolutePosition {
                                pos: list.len(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
                        unreachable!()
                    }
                    #[cfg(feature = "counter")]
                    ContainerType::Counter => unreachable!(),
                }
            }
        }
    }
1626
    /// Ask the oplog to free its history cache.
    ///
    /// # Panics
    ///
    /// Panics if the oplog lock cannot be acquired immediately.
    pub fn free_history_cache(&self) {
        self.oplog.try_lock().unwrap().free_history_cache();
    }
1634
1635 pub fn free_diff_calculator(&self) {
1637 *self.diff_calculator.try_lock().unwrap() = DiffCalculator::new(true);
1638 }
1639
1640 pub fn has_history_cache(&self) -> bool {
1643 self.oplog.try_lock().unwrap().has_history_cache()
1644 }
1645
    /// Commit the pending transaction (`commit_then_renew`) and then ask the
    /// oplog to compact its change store.
    ///
    /// # Panics
    ///
    /// Panics if the oplog lock cannot be acquired immediately.
    #[inline]
    pub fn compact_change_store(&self) {
        self.commit_then_renew();
        self.oplog.try_lock().unwrap().compact_change_store();
    }
1654
    /// Run `DocAnalysis::analyze` on this document and return the result.
    #[inline]
    pub fn analyze(&self) -> DocAnalysis {
        DocAnalysis::analyze(self)
    }
1662
1663 pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
1665 let mut state = self.state.try_lock().unwrap();
1666 let idx = state.arena.id_to_idx(id)?;
1667 state.get_path(idx)
1668 }
1669
1670 #[instrument(skip(self))]
1671 pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
1672 let options = self.commit_then_stop();
1673 let ans = match mode {
1674 ExportMode::Snapshot => export_fast_snapshot(self),
1675 ExportMode::Updates { from } => export_fast_updates(self, &from),
1676 ExportMode::UpdatesInRange { spans } => {
1677 export_fast_updates_in_range(&self.oplog.try_lock().unwrap(), spans.as_ref())
1678 }
1679 ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
1680 ExportMode::StateOnly(f) => match f {
1681 Some(f) => export_state_only_snapshot(self, &f)?,
1682 None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
1683 },
1684 ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
1685 };
1686
1687 self.renew_txn_if_auto_commit(options);
1688 Ok(ans)
1689 }
1690
1691 pub fn shallow_since_vv(&self) -> ImVersionVector {
1697 self.oplog().try_lock().unwrap().shallow_since_vv().clone()
1698 }
1699
1700 pub fn shallow_since_frontiers(&self) -> Frontiers {
1701 self.oplog()
1702 .try_lock()
1703 .unwrap()
1704 .shallow_since_frontiers()
1705 .clone()
1706 }
1707
1708 pub fn is_shallow(&self) -> bool {
1710 !self
1711 .oplog()
1712 .try_lock()
1713 .unwrap()
1714 .shallow_since_vv()
1715 .is_empty()
1716 }
1717
1718 pub fn get_pending_txn_len(&self) -> usize {
1723 if let Some(txn) = self.txn.try_lock().unwrap().as_ref() {
1724 txn.len()
1725 } else {
1726 0
1727 }
1728 }
1729
1730 #[inline]
1731 pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
1732 self.oplog().try_lock().unwrap().dag.find_path(from, to)
1733 }
1734}
1735
/// Errors returned by `LoroDoc::travel_change_ancestors`.
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// A starting id is not included in the oplog's version vector.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// A starting id is included in the DAG's `shallow_since_vv`.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
1743
1744impl LoroDoc {
1745 pub fn travel_change_ancestors(
1746 &self,
1747 ids: &[ID],
1748 f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
1749 ) -> Result<(), ChangeTravelError> {
1750 self.commit_then_renew();
1751 struct PendingNode(ChangeMeta);
1752 impl PartialEq for PendingNode {
1753 fn eq(&self, other: &Self) -> bool {
1754 self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
1755 }
1756 }
1757
1758 impl Eq for PendingNode {}
1759 impl PartialOrd for PendingNode {
1760 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1761 Some(self.cmp(other))
1762 }
1763 }
1764
1765 impl Ord for PendingNode {
1766 fn cmp(&self, other: &Self) -> Ordering {
1767 self.0
1768 .lamport_last()
1769 .cmp(&other.0.lamport_last())
1770 .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
1771 }
1772 }
1773
1774 for id in ids {
1775 let op_log = &self.oplog().try_lock().unwrap();
1776 if !op_log.vv().includes_id(*id) {
1777 return Err(ChangeTravelError::TargetIdNotFound(*id));
1778 }
1779 if op_log.dag.shallow_since_vv().includes_id(*id) {
1780 return Err(ChangeTravelError::TargetVersionNotIncluded);
1781 }
1782 }
1783
1784 let mut visited = FxHashSet::default();
1785 let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
1786 for id in ids {
1787 pending.push(PendingNode(ChangeMeta::from_change(
1788 &self.oplog().try_lock().unwrap().get_change_at(*id).unwrap(),
1789 )));
1790 }
1791 while let Some(PendingNode(node)) = pending.pop() {
1792 let deps = node.deps.clone();
1793 if f(node).is_break() {
1794 break;
1795 }
1796
1797 for dep in deps.iter() {
1798 let Some(dep_node) = self.oplog().try_lock().unwrap().get_change_at(dep) else {
1799 continue;
1800 };
1801 if visited.contains(&dep_node.id) {
1802 continue;
1803 }
1804
1805 visited.insert(dep_node.id);
1806 pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
1807 }
1808 }
1809
1810 Ok(())
1811 }
1812
1813 pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
1814 self.commit_then_renew();
1815 let mut set = FxHashSet::default();
1816 let oplog = &self.oplog().try_lock().unwrap();
1817 for op in oplog.iter_ops(id.to_span(len)) {
1818 let id = oplog.arena.get_container_id(op.container()).unwrap();
1819 set.insert(id);
1820 }
1821
1822 set
1823 }
1824}
1825
1826fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
1828 let start_vv = oplog
1829 .dag
1830 .frontiers_to_vv(&id.into())
1831 .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
1832 for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
1833 for op in change.ops.iter().rev() {
1834 if op.container != idx {
1835 continue;
1836 }
1837 if let InnerContent::List(InnerListOp::Delete(d)) = &op.content {
1838 if d.id_start.to_span(d.atom_len()).contains(id) {
1839 return Some(ID::new(change.peer(), op.counter));
1840 }
1841 }
1842 }
1843 }
1844
1845 None
1846}
1847
/// Guard that commits on drop: its `Drop` impl passes `default_options` to the
/// doc's current transaction (if there is one) and then calls
/// `commit_then_renew` on the doc.
#[derive(Debug)]
pub struct CommitWhenDrop<'a> {
    /// The document to commit when the guard is dropped.
    doc: &'a LoroDoc,
    /// Options installed as the transaction's default options on drop.
    default_options: CommitOptions,
}
1853
1854impl Drop for CommitWhenDrop<'_> {
1855 fn drop(&mut self) {
1856 {
1857 let mut guard = self.doc.txn.try_lock().unwrap();
1858 if let Some(txn) = guard.as_mut() {
1859 txn.set_default_options(std::mem::take(&mut self.default_options));
1860 };
1861 }
1862
1863 self.doc.commit_then_renew();
1864 }
1865}
1866
/// Options attached to a commit.
///
/// `CommitOptions::new` (also used by `Default`) leaves every optional field
/// unset and enables `immediate_renew`.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    /// Optional origin label for the commit.
    pub origin: Option<InternalString>,

    /// Presumably whether a new transaction is started right after the
    /// commit; `true` by default. NOTE(review): confirm against the commit path.
    pub immediate_renew: bool,

    /// Optional explicit timestamp for the commit.
    pub timestamp: Option<Timestamp>,

    /// Optional commit message.
    pub commit_msg: Option<Arc<str>>,
}
1885
1886impl CommitOptions {
1887 pub fn new() -> Self {
1889 Self {
1890 origin: None,
1891 immediate_renew: true,
1892 timestamp: None,
1893 commit_msg: None,
1894 }
1895 }
1896
1897 pub fn origin(mut self, origin: &str) -> Self {
1899 self.origin = Some(origin.into());
1900 self
1901 }
1902
1903 pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
1905 self.immediate_renew = immediate_renew;
1906 self
1907 }
1908
1909 pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
1913 self.timestamp = Some(timestamp);
1914 self
1915 }
1916
1917 pub fn commit_msg(mut self, commit_msg: &str) -> Self {
1919 self.commit_msg = Some(commit_msg.into());
1920 self
1921 }
1922
1923 pub fn set_origin(&mut self, origin: Option<&str>) {
1925 self.origin = origin.map(|x| x.into())
1926 }
1927
1928 pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
1930 self.timestamp = timestamp;
1931 }
1932}
1933
1934impl Default for CommitOptions {
1935 fn default() -> Self {
1936 Self::new()
1937 }
1938}
1939
#[cfg(test)]
mod test {
    use loro_common::ID;

    use crate::{version::Frontiers, LoroDoc, ToJson};

    /// Compile-time check that `LoroDoc` is `Send + Sync`.
    #[test]
    fn test_sync() {
        fn is_send_sync<T: Send + Sync>(_v: T) {}
        let loro = super::LoroDoc::new();
        is_send_sync(loro)
    }

    /// Checks out several versions of a doc (and of a copy imported from its
    /// snapshot) and compares the resulting deep values.
    #[test]
    fn test_checkout() {
        let loro = LoroDoc::new();
        loro.set_peer_id(1).unwrap();
        let text = loro.get_text("text");
        let map = loro.get_map("map");
        let list = loro.get_list("list");
        let mut txn = loro.txn().unwrap();
        // Each iteration writes to the map, the text and the list, in that order.
        for i in 0..10 {
            map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
            text.insert_with_txn(&mut txn, 0, &i.to_string()).unwrap();
            list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
        }
        txn.commit().unwrap();
        let b = LoroDoc::new();
        b.import(&loro.export_snapshot().unwrap()).unwrap();
        // Empty frontiers: every container is expected to be empty.
        loro.checkout(&Frontiers::default()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"","list":[],"map":{}}"#);
        }

        // Counter 2: the state after the first loop iteration.
        b.checkout(&ID::new(1, 2).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":0}}"#);
        }

        // Counter 3: only the map write of the second iteration is included.
        loro.checkout(&ID::new(1, 3).into()).unwrap();
        {
            let json = &loro.get_deep_value();
            assert_eq!(json.to_json(), r#"{"text":"0","list":[0],"map":{"key":1}}"#);
        }

        // Counter 29: the state after all ten iterations.
        b.checkout(&ID::new(1, 29).into()).unwrap();
        {
            let json = &b.get_deep_value();
            assert_eq!(
                json.to_json(),
                r#"{"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}}"#
            );
        }
    }

    /// Regression test (presumably for issue 181, going by the name): importing
    /// a snapshot with `import_batch`, editing and committing must leave the
    /// oplog lock free and allow a subsequent export.
    #[test]
    fn import_batch_err_181() {
        let a = LoroDoc::new_auto_commit();
        let update_a = a.export_snapshot();
        let b = LoroDoc::new_auto_commit();
        b.import_batch(&[update_a.unwrap()]).unwrap();
        b.get_text("text").insert(0, "hello").unwrap();
        b.commit_then_renew();
        // `try_lock().unwrap()` panics if the oplog lock is still held.
        let oplog = b.oplog().try_lock().unwrap();
        drop(oplog);
        b.export_from(&Default::default());
    }
}