use crate::encoding::json_schema::{encode_change, export_json_in_id_span};
pub use crate::encoding::ExportMode;
use crate::pre_commit::{FirstCommitFromPeerCallback, FirstCommitFromPeerPayload};
pub use crate::state::analyzer::{ContainerAnalysisInfo, DocAnalysis};
use crate::sync::{AtomicBool, AtomicUsize};
pub(crate) use crate::LoroDocInner;
use crate::{
arena::SharedArena,
change::{Change, Timestamp},
configure::{Configure, DefaultRandom, SecureRandomGenerator, StyleConfig},
container::{
idx::ContainerIdx, list::list_op::InnerListOp, richtext::config::StyleConfigMap,
IntoContainerId,
},
cursor::{AbsolutePosition, CannotFindRelativePosition, Cursor, PosQueryResult},
dag::{Dag, DagUtils},
diff_calc::DiffCalculator,
encoding::{
self, decode_snapshot, export_fast_snapshot, export_fast_updates,
export_fast_updates_in_range, export_shallow_snapshot, export_snapshot_at,
export_state_only_snapshot,
json_schema::{encode_change_to_json, json::JsonSchema},
parse_header_and_body, EncodeMode, ImportBlobMetadata, ImportStatus, ParsedHeaderAndBody,
},
event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
id::PeerID,
json::JsonChange,
op::InnerContent,
oplog::{loro_dag::FrontiersNotIncluded, OpLog},
state::DocState,
subscription::{LocalUpdateCallback, Observer, Subscriber},
undo::DiffBatch,
utils::subscription::{SubscriberSetWithQueue, Subscription},
version::{shrink_frontiers, Frontiers, ImVersionVector, VersionRange, VersionVectorDiff},
ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroDoc, LoroError, MapHandler,
VersionVector,
};
use crate::{change::ChangeRef, lock::LockKind};
use crate::{lock::LoroMutexGuard, pre_commit::PreCommitCallback};
use crate::{
lock::{LoroLockGroup, LoroMutex},
txn::Transaction,
};
use either::Either;
use loro_common::{
ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroEncodeError, LoroResult,
LoroValue, ID,
};
use rle::HasLength;
use rustc_hash::{FxHashMap, FxHashSet};
use std::{
borrow::Cow,
cmp::Ordering,
collections::{hash_map::Entry, BinaryHeap},
ops::ControlFlow,
sync::{
atomic::Ordering::{Acquire, Release},
Arc,
},
};
use tracing::{debug_span, info_span, instrument, warn};
impl Default for LoroDoc {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for LoroDocInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LoroDoc")
.field("config", &self.config)
.field("auto_commit", &self.auto_commit)
.field("detached", &self.detached)
.finish()
}
}
impl LoroDoc {
/// Run `f` while the document is quiescent: the implicit auto-commit
/// transaction (if any) is committed first and the txn mutex guard is held
/// for the duration of `f`, so no new transaction can start meanwhile.
/// Afterwards the implicit transaction is renewed with the saved options.
#[inline]
pub fn with_barrier<F, R>(&self, f: F) -> R
where
    F: FnOnce() -> R,
{
    let (options, guard) = self.implicit_commit_then_stop();
    let result = f();
    // The guard must be released before renewing the transaction.
    drop(guard);
    self.renew_txn_if_auto_commit(options);
    result
}
/// Create a new, empty document with auto-commit disabled and not detached.
///
/// The shared op counter, arena, and configuration all originate from the
/// freshly created [`OpLog`]; all mutexes belong to one `LoroLockGroup`.
pub fn new() -> Self {
    let visible_op_count = Arc::new(AtomicUsize::new(0));
    let oplog = OpLog::new(visible_op_count.clone());
    let arena = oplog.arena.clone();
    let config: Configure = oplog.configure.clone();
    let lock_group = LoroLockGroup::new();
    let global_txn = Arc::new(lock_group.new_lock(None, LockKind::Txn));
    // `new_cyclic` lets the DocState hold a weak back-reference to the inner
    // document while it is being constructed.
    let inner = Arc::new_cyclic(|w| {
        let state = DocState::new_arc(w.clone(), arena.clone(), config.clone(), &lock_group);
        LoroDocInner {
            oplog: Arc::new(lock_group.new_lock(oplog, LockKind::OpLog)),
            state,
            config,
            visible_op_count,
            detached: AtomicBool::new(false),
            auto_commit: AtomicBool::new(false),
            observer: Arc::new(Observer::new(arena.clone())),
            diff_calculator: Arc::new(
                lock_group.new_lock(DiffCalculator::new(true), LockKind::DiffCalculator),
            ),
            txn: global_txn,
            arena,
            local_update_subs: SubscriberSetWithQueue::new(),
            peer_id_change_subs: SubscriberSetWithQueue::new(),
            pre_commit_subs: SubscriberSetWithQueue::new(),
            first_commit_from_peer_subs: SubscriberSetWithQueue::new(),
        }
    });

    LoroDoc { inner }
}
/// Duplicate the document into an independent copy.
///
/// A detached document is forked at its current state frontiers; otherwise
/// a full fast snapshot is encoded and decoded into a fresh document.
/// The new document inherits the configuration and the auto-commit setting.
pub fn fork(&self) -> Self {
    if self.is_detached() {
        return self
            .fork_at(&self.state_frontiers())
            .expect("fork_at on detached doc should not fail");
    }

    // Both encode and decode run under a barrier so no transaction is
    // active on either document during the transfer.
    let snapshot = self.with_barrier(|| encoding::fast_snapshot::encode_snapshot_inner(self));
    let doc = Self::new();
    doc.with_barrier(|| {
        encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc, Default::default())
    })
    .unwrap();
    doc.set_config(&self.config);
    if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
        doc.start_auto_commit();
    }
    doc
}
/// Enable or disable editing while the document is detached.
///
/// When enabling on an already-detached document, a fresh peer id is drawn
/// (under a barrier) so detached edits do not collide with the old peer.
pub fn set_detached_editing(&self, enable: bool) {
    self.config.set_detached_editing(enable);
    if enable && self.is_detached() {
        self.with_barrier(|| {
            self.renew_peer_id();
        });
    }
}

/// Create a new document with auto-commit already enabled.
#[inline]
pub fn new_auto_commit() -> Self {
    let doc = Self::new();
    doc.start_auto_commit();
    doc
}
/// Change this document's peer id.
///
/// # Errors
/// - [`LoroError::InvalidPeerID`] when `peer == PeerID::MAX` (reserved).
/// - [`LoroError::TransactionError`] when auto-commit is off and a
///   transaction is currently active.
///
/// With auto-commit on, an active transaction is committed (via the barrier)
/// before the change takes effect. Subscribers of peer-id changes are
/// notified with the next id of the *new* peer.
#[inline(always)]
pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
    if peer == PeerID::MAX {
        return Err(LoroError::InvalidPeerID);
    }
    // Computed up front so the notification carries the next id for `peer`.
    let next_id = self.oplog.lock().next_id(peer);
    if self.auto_commit.load(Acquire) {
        let doc_state = self.state.lock();
        doc_state
            .peer
            .store(peer, std::sync::atomic::Ordering::Relaxed);
        if doc_state.is_in_txn() {
            // Drop the state lock before committing to avoid deadlock with
            // the barrier's own locking.
            drop(doc_state);
            self.with_barrier(|| {});
        }

        self.peer_id_change_subs.emit(&(), next_id);
        return Ok(());
    }

    let doc_state = self.state.lock();
    if doc_state.is_in_txn() {
        return Err(LoroError::TransactionError(
            "Cannot change peer id during transaction"
                .to_string()
                .into_boxed_str(),
        ));
    }

    doc_state
        .peer
        .store(peer, std::sync::atomic::Ordering::Relaxed);
    drop(doc_state);
    self.peer_id_change_subs.emit(&(), next_id);
    Ok(())
}
/// Assign a freshly generated random peer id to this document.
/// `PeerID::MAX` is reserved, so we redraw until a valid id comes up.
pub(crate) fn renew_peer_id(&self) {
    let peer_id = loop {
        let candidate = DefaultRandom.next_u64();
        if candidate != PeerID::MAX {
            break candidate;
        }
    };
    self.set_peer_id(peer_id).unwrap();
}
/// Commit the implicit transaction (when auto-commit is on) and keep the
/// txn mutex locked so no new transaction starts until the returned guard
/// is dropped.
#[inline]
#[must_use]
pub fn implicit_commit_then_stop(
    &self,
) -> (
    Option<CommitOptions>,
    LoroMutexGuard<'_, Option<Transaction>>,
) {
    let (options, guard) =
        self.commit_internal(CommitOptions::new().immediate_renew(false), true);
    // With `immediate_renew(false)`, `commit_internal` always returns the guard.
    (options, guard.unwrap())
}

/// Commit the implicit transaction and immediately start a fresh one.
#[inline]
pub fn commit_then_renew(&self) -> Option<CommitOptions> {
    let (options, _guard) =
        self.commit_internal(CommitOptions::new().immediate_renew(true), false);
    options
}
/// Pre-commit hook: fire the "first commit from this peer" notification once
/// per peer appearance.
///
/// Returns `Some(guard)` (the held txn lock) when there is no active
/// transaction, signalling the caller that there is nothing to commit;
/// returns `None` when a transaction exists and committing may proceed.
fn before_commit(&self) -> Option<LoroMutexGuard<'_, Option<Transaction>>> {
    let mut txn_guard = self.txn.lock();
    let Some(txn) = txn_guard.as_mut() else {
        return Some(txn_guard);
    };

    if txn.is_peer_first_appearance {
        txn.is_peer_first_appearance = false;
        // Release the lock before emitting so subscribers may touch the doc.
        drop(txn_guard);
        self.first_commit_from_peer_subs.emit(
            &(),
            FirstCommitFromPeerPayload {
                peer: self.peer_id(),
            },
        );
    }

    None
}
/// Core commit routine behind all public commit entry points.
///
/// * `config` — per-commit overrides (origin/timestamp/message) and whether
///   to immediately start a replacement transaction.
/// * `preserve_on_empty` — keep the returned options even when the commit
///   turned out to be empty.
///
/// Returns the resulting options and, when `config.immediate_renew` is
/// false, the held txn mutex guard.
#[instrument(skip_all)]
fn commit_internal(
    &self,
    config: CommitOptions,
    preserve_on_empty: bool,
) -> (
    Option<CommitOptions>,
    Option<LoroMutexGuard<'_, Option<Transaction>>>,
) {
    // Without auto-commit there is no implicit transaction to commit.
    if !self.auto_commit.load(Acquire) {
        let txn_guard = self.txn.lock();
        return (None, Some(txn_guard));
    }

    // The loop exists because an `on_commit` callback may create a new
    // transaction that must also be committed (see the `continue` below).
    loop {
        // Emits the first-commit-from-peer event; returns the guard when no
        // transaction is active.
        if let Some(txn_guard) = self.before_commit() {
            return (None, Some(txn_guard));
        }

        let mut txn_guard = self.txn.lock();
        let txn = txn_guard.take();
        let Some(mut txn) = txn else {
            return (None, Some(txn_guard));
        };

        let on_commit = txn.take_on_commit();
        // Per-commit overrides from `config` win over the txn's stored values.
        if let Some(origin) = config.origin.clone() {
            txn.set_origin(origin);
        }

        if let Some(timestamp) = config.timestamp {
            txn.set_timestamp(timestamp);
        }

        if let Some(msg) = config.commit_msg.as_ref() {
            txn.set_msg(Some(msg.clone()));
        }

        let id_span = txn.id_span();
        let mut options = txn.commit().unwrap();
        if let Some(opts) = options.as_mut() {
            // The caller-supplied origin is one-shot; don't carry it over.
            if config.origin.is_some() {
                opts.set_origin(None);
            }
            if !preserve_on_empty {
                options = None;
            }
        }
        if config.immediate_renew {
            if self.can_edit() {
                let mut t = self.txn().unwrap();
                if let Some(options) = options.as_ref() {
                    t.set_options(options.clone());
                }
                *txn_guard = Some(t);
            }
        }

        if let Some(on_commit) = on_commit {
            // Run the callback without holding the txn lock.
            drop(txn_guard);
            on_commit(&self.state, &self.oplog, id_span);
            txn_guard = self.txn.lock();
            // The callback may have started a new transaction; commit it too.
            if !config.immediate_renew && txn_guard.is_some() {
                continue;
            }
        }

        return (
            options,
            if !config.immediate_renew {
                Some(txn_guard)
            } else {
                None
            },
        );
    }
}
/// Commit with explicit options. Thin wrapper over [`Self::commit_internal`]
/// with `preserve_on_empty = false` (options are discarded on empty commits).
#[instrument(skip_all)]
pub fn commit_with(
    &self,
    config: CommitOptions,
) -> (
    Option<CommitOptions>,
    Option<LoroMutexGuard<'_, Option<Transaction>>>,
) {
    self.commit_internal(config, false)
}
/// Set the commit message for the next commit; an empty string clears it.
/// No-op when there is no active transaction.
pub fn set_next_commit_message(&self, message: &str) {
    let mut guard = self.txn.lock();
    let Some(txn) = guard.as_mut() else {
        return;
    };
    if message.is_empty() {
        txn.set_msg(None)
    } else {
        txn.set_msg(Some(message.into()))
    }
}

/// Set the origin recorded for the next commit. No-op without an active txn.
pub fn set_next_commit_origin(&self, origin: &str) {
    let mut guard = self.txn.lock();
    let Some(txn) = guard.as_mut() else {
        return;
    };
    txn.set_origin(origin.into());
}

/// Set the timestamp recorded for the next commit. No-op without an active txn.
pub fn set_next_commit_timestamp(&self, timestamp: Timestamp) {
    let mut guard = self.txn.lock();
    let Some(txn) = guard.as_mut() else {
        return;
    };
    txn.set_timestamp(timestamp);
}

/// Replace all pending options for the next commit. No-op without an active txn.
pub fn set_next_commit_options(&self, options: CommitOptions) {
    let mut guard = self.txn.lock();
    let Some(txn) = guard.as_mut() else {
        return;
    };
    txn.set_options(options);
}

/// Reset the pending commit options to defaults. No-op without an active txn.
pub fn clear_next_commit_options(&self) {
    let mut guard = self.txn.lock();
    let Some(txn) = guard.as_mut() else {
        return;
    };
    txn.set_options(CommitOptions::new());
}
/// Toggle whether commits record wall-clock timestamps.
#[inline]
pub fn set_record_timestamp(&self, record: bool) {
    self.config.set_record_timestamp(record);
}

/// Set the interval (in the unit used by `Timestamp`) within which adjacent
/// changes may be merged.
#[inline]
pub fn set_change_merge_interval(&self, interval: i64) {
    self.config.set_merge_interval(interval);
}
/// Whether edits are currently allowed: always true when attached, and when
/// detached only if detached editing is enabled in the config.
pub fn can_edit(&self) -> bool {
    if self.is_detached() {
        self.config.detached_editing()
    } else {
        true
    }
}
/// Whether detached editing is enabled in the configuration.
pub fn is_detached_editing_enabled(&self) -> bool {
    self.config.detached_editing()
}

/// Replace the whole rich-text style configuration map.
#[inline]
pub fn config_text_style(&self, text_style: StyleConfigMap) {
    self.config.text_style_config.write().map = text_style.map;
}

/// Set (or clear, with `None`) the default rich-text style.
#[inline]
pub fn config_default_text_style(&self, text_style: Option<StyleConfig>) {
    self.config.text_style_config.write().default_style = text_style;
}
/// Build a new document from snapshot bytes.
///
/// # Errors
/// Returns [`LoroError::DecodeError`] when the header's encode mode is not a
/// snapshot mode, or any error produced while decoding the snapshot body.
pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
    let doc = Self::new();
    let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
    if mode.is_snapshot() {
        doc.with_barrier(|| -> Result<(), LoroError> {
            decode_snapshot(&doc, mode, body, Default::default())?;
            Ok(())
        })?;
        Ok(doc)
    } else {
        Err(LoroError::DecodeError(
            "Invalid encode mode".to_string().into(),
        ))
    }
}
/// Whether an incoming snapshot may fully initialize this document:
/// requires no batch import in flight, an attached doc, an empty oplog,
/// and a state that accepts snapshot imports.
#[inline(always)]
pub fn can_reset_with_snapshot(&self) -> bool {
    let oplog = self.oplog.lock();
    if oplog.batch_importing || self.is_detached() {
        return false;
    }
    oplog.is_empty() && self.state.lock().can_import_snapshot()
}
/// Whether the document state is checked out at an older version.
#[inline(always)]
pub fn is_detached(&self) -> bool {
    self.detached.load(Acquire)
}

/// Set the detached flag (Release ordering pairs with the Acquire load above).
pub(crate) fn set_detached(&self, detached: bool) {
    self.detached.store(detached, Release);
}

/// The current peer id, read from the doc state.
#[inline(always)]
pub fn peer_id(&self) -> PeerID {
    self.state
        .lock()
        .peer
        .load(std::sync::atomic::Ordering::Relaxed)
}

/// Detach the document state from the latest version (under a barrier).
#[inline(always)]
pub fn detach(&self) {
    self.with_barrier(|| self.set_detached(true));
}

/// Re-attach by checking out to the latest version.
#[inline(always)]
pub fn attach(&self) {
    self.checkout_to_latest()
}

/// Timestamp of the version the state is currently at.
pub fn state_timestamp(&self) -> Timestamp {
    // State lock is released before the oplog lock is taken.
    let f = { self.state.lock().frontiers.clone() };
    self.oplog.lock().get_timestamp_of_version(&f)
}

/// Direct access to the document state mutex.
#[inline(always)]
pub fn app_state(&self) -> &Arc<LoroMutex<DocState>> {
    &self.state
}

/// Deep value of the current state (containers resolved recursively).
#[inline]
pub fn get_state_deep_value(&self) -> LoroValue {
    self.state.lock().get_deep_value()
}

/// Direct access to the oplog mutex.
#[inline(always)]
pub fn oplog(&self) -> &Arc<LoroMutex<OpLog>> {
    &self.oplog
}
/// Import encoded updates/snapshot bytes with a default (empty) origin.
#[inline(always)]
pub fn import(&self, bytes: &[u8]) -> Result<ImportStatus, LoroError> {
    let s = debug_span!("import", peer = self.peer_id());
    let _e = s.enter();
    self.import_with(bytes, Default::default())
}

/// Import encoded bytes, tagging resulting events with `origin`.
/// Runs under a barrier so no transaction is active during the import.
#[inline]
pub fn import_with(
    &self,
    bytes: &[u8],
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    self.with_barrier(|| self._import_with(bytes, origin))
}
/// Dispatch an import based on the parsed encode mode, then emit any
/// resulting events. Must be called with no active transaction (see
/// [`Self::import_with`]).
#[tracing::instrument(skip_all)]
fn _import_with(
    &self,
    bytes: &[u8],
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    ensure_cov::notify_cov("loro_internal::import");
    let parsed = parse_header_and_body(bytes, true)?;
    loro_common::info!("Importing with mode={:?}", &parsed.mode);
    let result = match parsed.mode {
        EncodeMode::OutdatedRle => {
            // Legacy update format: refuse while a txn is active.
            if self.state.lock().is_in_txn() {
                return Err(LoroError::ImportWhenInTxn);
            }
            let s = tracing::span!(
                tracing::Level::INFO,
                "Import updates ",
                peer = self.peer_id()
            );
            let _e = s.enter();
            self.update_oplog_and_apply_delta_to_state_if_needed(
                |oplog| oplog.decode(parsed),
                origin,
            )
        }
        EncodeMode::OutdatedSnapshot => {
            // Use the snapshot fast path only on a pristine, attached doc;
            // otherwise fall back to treating it as updates.
            if self.can_reset_with_snapshot() {
                loro_common::info!("Init by snapshot {}", self.peer_id());
                decode_snapshot(self, parsed.mode, parsed.body, origin)
            } else {
                self.update_oplog_and_apply_delta_to_state_if_needed(
                    |oplog| oplog.decode(parsed),
                    origin,
                )
            }
        }
        EncodeMode::FastSnapshot => {
            if self.can_reset_with_snapshot() {
                ensure_cov::notify_cov("loro_internal::import::snapshot");
                loro_common::info!("Init by fast snapshot {}", self.peer_id());
                decode_snapshot(self, parsed.mode, parsed.body, origin)
            } else {
                self.import_changes_and_apply_delta_to_state_if_needed(
                    |oplog| encoding::decode_oplog_changes(oplog, parsed),
                    origin,
                )
            }
        }
        EncodeMode::FastUpdates => self.import_changes_and_apply_delta_to_state_if_needed(
            |oplog| encoding::decode_oplog_changes(oplog, parsed),
            origin,
        ),
        EncodeMode::Auto => {
            // `parse_header_and_body` resolves Auto to a concrete mode.
            unreachable!()
        }
    };

    self.emit_events();
    result
}
/// Run `f` against the oplog inside an import-rollback scope and, when the
/// doc is attached and the version advanced, apply the computed diff to the
/// state. On failure the oplog is rolled back when safe to do so.
#[tracing::instrument(skip_all)]
pub(crate) fn update_oplog_and_apply_delta_to_state_if_needed(
    &self,
    f: impl FnOnce(&mut OpLog) -> Result<ImportStatus, LoroError>,
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    let mut oplog = self.oplog.lock();
    oplog.begin_import_rollback();
    if !self.is_detached() {
        let old_vv = oplog.vv().clone();
        let old_frontiers = oplog.frontiers().clone();
        let result = f(&mut oplog);
        // Only compute and apply a diff when the import changed the version.
        if &old_vv != oplog.vv() {
            let mut diff = DiffCalculator::new(false);
            let (diff, diff_mode) = diff.calc_diff_internal(
                &oplog,
                &old_vv,
                &old_frontiers,
                oplog.vv(),
                oplog.dag.get_frontiers(),
                None,
            );
            let mut state = self.state.lock();
            if let Err(e) = state.apply_diff(
                InternalDocDiff {
                    origin,
                    diff: (diff).into(),
                    by: EventTriggerKind::Import,
                    new_version: Cow::Owned(oplog.frontiers().clone()),
                },
                diff_mode,
            ) {
                // State rejected the diff: undo the oplog changes too.
                oplog.rollback_import();
                return Err(e);
            }
        }

        match result {
            Ok(result) => {
                oplog.commit_import_rollback();
                Ok(result)
            }
            Err(e) => {
                // Roll back only when nothing was applied to the state;
                // once the vv advanced, keep oplog and state consistent.
                if &old_vv == oplog.vv() {
                    oplog.rollback_import();
                } else {
                    oplog.commit_import_rollback();
                }
                Err(e)
            }
        }
    } else {
        // Detached: the state is not touched, so rollback is all-or-nothing.
        match f(&mut oplog) {
            Ok(result) => {
                oplog.commit_import_rollback();
                Ok(result)
            }
            Err(e) => {
                oplog.rollback_import();
                Err(e)
            }
        }
    }
}
/// Decode changes via `decode_changes`, preflight-check them, apply them to
/// the oplog, and — when the doc is attached and the version advanced —
/// apply the resulting diff to the state.
///
/// The arena checkpoint lets decode/preflight failures discard any interned
/// data. State-apply rollback is armed only when the preflight says it is
/// needed (`needs_state_apply_rollback`).
#[tracing::instrument(skip_all)]
pub(crate) fn import_changes_and_apply_delta_to_state_if_needed(
    &self,
    decode_changes: impl FnOnce(&mut OpLog) -> Result<Vec<Change>, LoroError>,
    origin: InternalString,
) -> Result<ImportStatus, LoroError> {
    let mut oplog = self.oplog.lock();
    let arena_checkpoint = oplog.arena.checkpoint_for_rollback();
    let changes = match decode_changes(&mut oplog) {
        Ok(changes) => changes,
        Err(e) => {
            oplog.arena.rollback(arena_checkpoint);
            return Err(e);
        }
    };

    let preflight = oplog.preflight_import_changes(&changes);
    // Reject early when dependencies precede the shallow root and the
    // changes cannot cleanly extend the DAG.
    if preflight.has_deps_before_shallow_root
        && (self.is_detached() || !preflight.applies_to_dag)
    {
        oplog.arena.rollback(arena_checkpoint);
        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
    }

    if self.is_detached() {
        // Detached: only the oplog is updated, never the state.
        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
        if result.has_deps_before_shallow_root {
            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
        }
        return Ok(result.status);
    }

    if !preflight.applies_to_dag {
        // Changes go to pending; no state diff needs to be computed.
        let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
        if result.has_deps_before_shallow_root {
            oplog.arena.rollback(arena_checkpoint);
            return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
        }
        return Ok(result.status);
    }

    let old_vv = oplog.vv().clone();
    let old_frontiers = oplog.frontiers().clone();
    let rollback_enabled = preflight.needs_state_apply_rollback;
    if rollback_enabled {
        oplog.begin_import_rollback_with_arena(arena_checkpoint);
    }
    let result = encoding::apply_decoded_changes_to_oplog(&mut oplog, changes);
    if &old_vv != oplog.vv() {
        let mut diff = DiffCalculator::new(false);
        let (diff, diff_mode) = diff.calc_diff_internal(
            &oplog,
            &old_vv,
            &old_frontiers,
            oplog.vv(),
            oplog.dag.get_frontiers(),
            None,
        );
        let mut state = self.state.lock();
        if let Err(e) = state.apply_diff(
            InternalDocDiff {
                origin,
                diff: (diff).into(),
                by: EventTriggerKind::Import,
                new_version: Cow::Owned(oplog.frontiers().clone()),
            },
            diff_mode,
        ) {
            if rollback_enabled {
                oplog.rollback_import();
                return Err(e);
            }
            // Without a rollback guard an apply failure would leave oplog and
            // state permanently inconsistent — treat it as a bug.
            panic!("state apply returned Err for import without rollback guard: {e}");
        }
    }

    if result.has_deps_before_shallow_root {
        if rollback_enabled {
            oplog.commit_import_rollback();
        }
        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
    }

    if rollback_enabled {
        oplog.commit_import_rollback();
    }
    Ok(result.status)
}
/// Drain pending state events and deliver them through the observer.
/// Events are taken while holding the state lock, emitted after releasing it.
fn emit_events(&self) {
    let events = {
        let mut state = self.state.lock();
        state.take_events()
    };
    for event in events {
        self.observer.emit(event);
    }
}

/// Drain and discard pending state events, returning them to the caller.
pub(crate) fn drop_pending_events(&self) -> Vec<DocDiff> {
    let mut state = self.state.lock();
    state.take_events()
}

/// Import changes from a JSON schema value (or anything convertible to one).
///
/// # Errors
/// [`LoroError::InvalidJsonSchema`] when the conversion fails, plus any
/// error from applying the decoded changes.
#[tracing::instrument(skip_all)]
pub fn import_json_updates<T: TryInto<JsonSchema>>(&self, json: T) -> LoroResult<ImportStatus> {
    let json = json.try_into().map_err(|_| LoroError::InvalidJsonSchema)?;
    self.with_barrier(|| {
        let result = self.import_changes_and_apply_delta_to_state_if_needed(
            |oplog| crate::encoding::json_schema::decode_json_changes(json, &oplog.arena),
            Default::default(),
        );
        self.emit_events();
        result
    })
}
/// Export changes in `start_vv..end_vv` as a JSON schema document.
///
/// If the doc is shallow and `start_vv` reaches below the shallow root,
/// `start_vv` is raised to cover the shallow-since version so the export
/// never references trimmed history.
pub fn export_json_updates(
    &self,
    start_vv: &VersionVector,
    end_vv: &VersionVector,
    with_peer_compression: bool,
) -> JsonSchema {
    self.with_barrier(|| {
        let oplog = self.oplog.lock();
        let mut start_vv = start_vv;
        // Keeps an adjusted copy alive long enough to reborrow from it.
        let _temp: Option<VersionVector>;
        if !oplog.dag.shallow_since_vv().is_empty() {
            let mut include_all = true;
            for (peer, counter) in oplog.dag.shallow_since_vv().iter() {
                if start_vv.get(peer).unwrap_or(&0) < counter {
                    include_all = false;
                    break;
                }
            }

            if !include_all {
                let mut vv = start_vv.clone();
                for (&peer, &counter) in oplog.dag.shallow_since_vv().iter() {
                    vv.extend_to_include_end_id(ID::new(peer, counter));
                }

                _temp = Some(vv);
                start_vv = _temp.as_ref().unwrap();
            }
        }

        crate::encoding::json_schema::export_json(
            &oplog,
            start_vv,
            end_vv,
            with_peer_compression,
        )
    })
}

/// Export the changes covering `id_span` as JSON, including the change
/// still sitting in the uncommitted transaction when it overlaps the span.
/// NOTE(review): unlike `export_json_updates`, this does not run under a
/// barrier — presumably intentional so uncommitted ops can be included;
/// confirm with callers.
pub fn export_json_in_id_span(&self, id_span: IdSpan) -> Vec<JsonChange> {
    let oplog = self.oplog.lock();
    let mut changes = export_json_in_id_span(&oplog, id_span);
    if let Some(uncommit) = oplog.get_uncommitted_change_in_span(id_span) {
        let change_json = encode_change(ChangeRef::from_change(&uncommit), &self.arena, None);
        changes.push(change_json);
    }
    changes
}

/// The oplog's current version vector (a clone).
#[inline]
pub fn oplog_vv(&self) -> VersionVector {
    self.oplog.lock().vv().clone()
}

/// The state's version vector, derived from its frontiers via the DAG.
/// The oplog lock is taken before the state lock.
#[inline]
pub fn state_vv(&self) -> VersionVector {
    let oplog = self.oplog.lock();
    let f = &self.state.lock().frontiers;
    oplog.dag.frontiers_to_vv(f).unwrap()
}
/// Look up the value at `path` in the current state.
///
/// Returns a handler for container values and a plain value otherwise;
/// `None` when the path does not resolve.
pub fn get_by_path(&self, path: &[Index]) -> Option<ValueOrHandler> {
    let value: LoroValue = self.state.lock().get_value_by_path(path)?;
    if let LoroValue::Container(c) = value {
        // `c` is already owned here, so no clone is needed (the original
        // `c.clone()` was redundant).
        Some(ValueOrHandler::Handler(Handler::new_attached(
            c,
            self.clone(),
        )))
    } else {
        Some(ValueOrHandler::Value(value))
    }
}
/// Like [`Self::get_by_path`], but parses the path from its string form first.
pub fn get_by_str_path(&self, path: &str) -> Option<ValueOrHandler> {
    str_to_path(path).and_then(|indexes| self.get_by_path(&indexes))
}
/// Encode the ops of the current (uncommitted) transaction as JSON.
/// Returns `None` when there is no active transaction or it has no ops.
pub fn get_uncommitted_ops_as_json(&self) -> Option<JsonSchema> {
    let arena = &self.arena;
    let txn = self.txn.lock();
    let txn = txn.as_ref()?;
    let ops_ = txn.local_ops();
    // `?` below also returns None when the txn has produced no ops yet.
    let new_id = ID {
        peer: *txn.peer(),
        counter: ops_.first()?.counter,
    };
    let change = ChangeRef {
        id: &new_id,
        deps: txn.frontiers(),
        // Fall back to the oplog's next-txn timestamp when none was set.
        timestamp: &txn
            .timestamp()
            .as_ref()
            .copied()
            .unwrap_or_else(|| self.oplog.lock().get_timestamp_for_next_txn()),
        commit_msg: txn.msg(),
        ops: ops_,
        lamport: txn.lamport(),
    };
    let json = encode_change_to_json(change, arena);
    Some(json)
}
/// Attach a handler for `id`, or `None` when the container does not exist.
#[inline]
pub fn get_handler(&self, id: ContainerID) -> Option<Handler> {
    self.has_container(&id)
        .then(|| Handler::new_attached(id, self.clone()))
}
/// Get a text handler for `id`, or `None` when the container is absent.
#[inline]
pub fn try_get_text<I: IntoContainerId>(&self, id: I) -> Option<TextHandler> {
    let id = id.into_container_id(&self.arena, ContainerType::Text);
    if self.has_container(&id) {
        Handler::new_attached(id, self.clone()).into_text().ok()
    } else {
        None
    }
}

/// Get a text handler for `id`, panicking when the container is absent.
#[inline]
pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
    self.try_get_text(id)
        .expect("The container does not exist in the document. Use `try_get_text` or `get_container` to check for existence.")
}
/// Get a list handler for `id`, or `None` when the container is absent.
#[inline]
pub fn try_get_list<I: IntoContainerId>(&self, id: I) -> Option<ListHandler> {
    let id = id.into_container_id(&self.arena, ContainerType::List);
    if self.has_container(&id) {
        Handler::new_attached(id, self.clone()).into_list().ok()
    } else {
        None
    }
}

/// Get a list handler for `id`, panicking when the container is absent.
#[inline]
pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
    self.try_get_list(id)
        .expect("The container does not exist in the document. Use `try_get_list` or `get_container` to check for existence.")
}
/// Get a movable-list handler for `id`, or `None` when the container is absent.
#[inline]
pub fn try_get_movable_list<I: IntoContainerId>(&self, id: I) -> Option<MovableListHandler> {
    let id = id.into_container_id(&self.arena, ContainerType::MovableList);
    if self.has_container(&id) {
        Handler::new_attached(id, self.clone())
            .into_movable_list()
            .ok()
    } else {
        None
    }
}

/// Get a movable-list handler for `id`, panicking when the container is absent.
#[inline]
pub fn get_movable_list<I: IntoContainerId>(&self, id: I) -> MovableListHandler {
    self.try_get_movable_list(id)
        .expect("The container does not exist in the document. Use `try_get_movable_list` or `get_container` to check for existence.")
}
/// Get a map handler for `id`, or `None` when the container is absent.
#[inline]
pub fn try_get_map<I: IntoContainerId>(&self, id: I) -> Option<MapHandler> {
    let id = id.into_container_id(&self.arena, ContainerType::Map);
    if self.has_container(&id) {
        Handler::new_attached(id, self.clone()).into_map().ok()
    } else {
        None
    }
}

/// Get a map handler for `id`, panicking when the container is absent.
#[inline]
pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
    self.try_get_map(id)
        .expect("The container does not exist in the document. Use `try_get_map` or `get_container` to check for existence.")
}
/// Get a tree handler for `id`, or `None` when the container is absent.
#[inline]
pub fn try_get_tree<I: IntoContainerId>(&self, id: I) -> Option<TreeHandler> {
    let id = id.into_container_id(&self.arena, ContainerType::Tree);
    if self.has_container(&id) {
        Handler::new_attached(id, self.clone()).into_tree().ok()
    } else {
        None
    }
}

/// Get a tree handler for `id`, panicking when the container is absent.
#[inline]
pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
    self.try_get_tree(id)
        .expect("The container does not exist in the document. Use `try_get_tree` or `get_container` to check for existence.")
}
/// Get a counter handler for `id`, or `None` when the container is absent.
#[cfg(feature = "counter")]
pub fn try_get_counter<I: IntoContainerId>(
    &self,
    id: I,
) -> Option<crate::handler::counter::CounterHandler> {
    let id = id.into_container_id(&self.arena, ContainerType::Counter);
    if self.has_container(&id) {
        Handler::new_attached(id, self.clone()).into_counter().ok()
    } else {
        None
    }
}

/// Get a counter handler for `id`, panicking when the container is absent.
#[cfg(feature = "counter")]
pub fn get_counter<I: IntoContainerId>(
    &self,
    id: I,
) -> crate::handler::counter::CounterHandler {
    self.try_get_counter(id)
        .expect("The container does not exist in the document. Use `try_get_counter` or `get_container` to check for existence.")
}
/// Whether a container with `id` exists in the document.
/// Root containers implicitly always exist.
#[must_use]
pub fn has_container(&self, id: &ContainerID) -> bool {
    if id.is_root() {
        return true;
    }
    // Returned directly; the original's `let exist = …; exist` binding was
    // redundant (clippy::let_and_return).
    self.state.lock().does_container_exist(id)
}
/// Undo the local ops in `id_span` by computing an inverse diff and applying
/// it as a new commit.
///
/// * `container_remap` — tracks containers recreated during undo so later
///   undos can redirect into them.
/// * `post_transform_base` — when present, the diff is transformed against
///   this batch instead of the latest frontiers.
/// * `before_diff` — invoked with the inverse diff before it is applied.
///
/// Returns a guard that commits (with origin "undo") when dropped.
#[instrument(level = "info", skip_all)]
pub fn undo_internal(
    &self,
    id_span: IdSpan,
    container_remap: &mut FxHashMap<ContainerID, ContainerID>,
    post_transform_base: Option<&DiffBatch>,
    before_diff: &mut dyn FnMut(&DiffBatch),
) -> LoroResult<CommitWhenDrop<'_>> {
    if !self.can_edit() {
        return Err(LoroError::EditWhenDetached);
    }

    let (options, txn) = self.implicit_commit_then_stop();
    if !self.oplog().lock().vv().includes_id(id_span.id_last()) {
        self.renew_txn_if_auto_commit(options);
        return Err(LoroError::UndoInvalidIdSpan(id_span.id_last()));
    }

    // Recording is suspended while we replay versions; restored at the end.
    let (was_recording, latest_frontiers) = {
        let mut state = self.state.lock();
        let was_recording = state.is_recording();
        state.stop_and_clear_recording();
        (was_recording, state.frontiers.clone())
    };

    let spans = self.oplog.lock().split_span_based_on_deps(id_span);
    let diff = crate::undo::undo(
        spans,
        match post_transform_base {
            Some(d) => Either::Right(d),
            None => Either::Left(&latest_frontiers),
        },
        // Compute the event diff between two versions by checking out `from`,
        // recording, then checking out `to`.
        |from, to| {
            self._checkout_without_emitting(from, false, false).unwrap();
            self.state.lock().start_recording();
            self._checkout_without_emitting(to, false, false).unwrap();
            let mut state = self.state.lock();
            let e = state.take_events();
            state.stop_and_clear_recording();
            DiffBatch::new(e)
        },
        before_diff,
    );

    // Return to the latest version and re-attach before applying the undo.
    self._checkout_without_emitting(&latest_frontiers, false, false)?;
    self.set_detached(false);
    if was_recording {
        self.state.lock().start_recording();
    }
    drop(txn);
    self.start_auto_commit();
    if let Err(e) = self._apply_diff(diff, container_remap, true) {
        warn!("Undo Failed {:?}", e);
    }

    // Restore the commit options saved before the implicit commit.
    if let Some(options) = options {
        self.set_next_commit_options(options);
    }

    Ok(CommitWhenDrop {
        doc: self,
        default_options: CommitOptions::new().origin("undo"),
    })
}

/// Revert the document to `target` by applying the diff from the current
/// state frontiers to `target` as new local ops.
pub fn revert_to(&self, target: &Frontiers) -> LoroResult<()> {
    let f = self.state_frontiers();
    let diff = self.diff(&f, target)?;
    self._apply_diff(diff, &mut Default::default(), false)
}
/// Compute the event diff from version `a` to version `b`.
///
/// Both frontiers must exist in the DAG and lie at or after the shallow
/// root. Internally checks out `a`, records, checks out `b`, then restores
/// the original version, detachment, and recording status.
pub fn diff(&self, a: &Frontiers, b: &Frontiers) -> LoroResult<DiffBatch> {
    // Validate both inputs before mutating anything.
    {
        let oplog = self.oplog.lock();
        let validate_frontiers = |frontiers: &Frontiers| -> LoroResult<()> {
            for id in frontiers.iter() {
                if !oplog.dag.contains(id) {
                    return Err(LoroError::FrontiersNotFound(id));
                }
            }

            if oplog.dag.is_before_shallow_root(frontiers) {
                return Err(LoroError::SwitchToVersionBeforeShallowRoot);
            }

            Ok(())
        };

        validate_frontiers(a)?;
        validate_frontiers(b)?;
    }

    let (options, txn) = self.implicit_commit_then_stop();
    let was_detached = self.is_detached();
    let old_frontiers = self.state_frontiers();
    let was_recording = {
        let mut state = self.state.lock();
        let is_recording = state.is_recording();
        state.stop_and_clear_recording();
        is_recording
    };

    // Checkout a -> record -> checkout b; the recorded events are the diff.
    let result = (|| {
        self._checkout_without_emitting(a, true, false)?;
        self.state.lock().start_recording();
        self._checkout_without_emitting(b, true, false)?;
        let mut state = self.state.lock();
        let e = state.take_events();
        state.stop_and_clear_recording();
        Ok::<_, LoroError>(e)
    })();

    // Always restore the original version, even when the diff failed.
    self._checkout_without_emitting(&old_frontiers, false, false)
        .unwrap();
    drop(txn);
    if !was_detached {
        self.set_detached(false);
        self.renew_txn_if_auto_commit(options);
    }

    if was_recording {
        self.state.lock().start_recording();
    }

    result.map(DiffBatch::new)
}

/// Apply an externally computed diff, skipping unreachable containers.
#[inline(always)]
pub fn apply_diff(&self, diff: DiffBatch) -> LoroResult<()> {
    self._apply_diff(diff, &mut Default::default(), true)
}
/// Apply `diff` to the document as local edits.
///
/// * `container_remap` — redirects diffs aimed at old container ids to their
///   recreated counterparts; updated by handlers as they recreate containers.
/// * `skip_unreachable` — when true, diffs for containers not reachable from
///   a root (and not remapped) are silently dropped.
///
/// # Errors
/// [`LoroError::EditWhenDetached`] when editing is not allowed;
/// [`LoroError::ContainersNotFound`] for normal containers that do not exist.
pub(crate) fn _apply_diff(
    &self,
    diff: DiffBatch,
    container_remap: &mut FxHashMap<ContainerID, ContainerID>,
    skip_unreachable: bool,
) -> LoroResult<()> {
    if !self.can_edit() {
        return Err(LoroError::EditWhenDetached);
    }

    let mut ans: LoroResult<()> = Ok(());
    let mut missing_containers: Vec<ContainerID> = Vec::new();
    for (mut id, diff) in diff.into_iter() {
        // Follow remap chains to the most recent replacement container.
        let mut remapped = false;
        while let Some(rid) = container_remap.get(&id) {
            remapped = true;
            id = rid.clone();
        }

        if matches!(&id, ContainerID::Normal { .. }) && self.arena.id_to_idx(&id).is_none() {
            let exists = self.state.lock().does_container_exist(&id);
            if !exists {
                missing_containers.push(id);
                continue;
            }
            // Known to the state but not yet registered in the arena.
            self.state.lock().ensure_container(&id);
        }

        if skip_unreachable && !remapped && !self.state.lock().get_reachable(&id) {
            continue;
        }

        let Some(h) = self.get_handler(id.clone()) else {
            return Err(LoroError::ContainersNotFound {
                containers: Box::new(vec![id]),
            });
        };
        // Keep going on per-container failure; report the last error.
        if let Err(e) = h.apply_diff(diff, container_remap) {
            ans = Err(e);
        }
    }

    if !missing_containers.is_empty() {
        return Err(LoroError::ContainersNotFound {
            containers: Box::new(missing_containers),
        });
    }

    ans
}
/// Print oplog size diagnostics (delegates to the oplog).
#[inline]
pub fn diagnose_size(&self) {
    self.oplog().lock().diagnose_size();
}

/// The oplog's current frontiers (a clone).
#[inline]
pub fn oplog_frontiers(&self) -> Frontiers {
    self.oplog().lock().frontiers().clone()
}

/// The state's current frontiers (a clone).
#[inline]
pub fn state_frontiers(&self) -> Frontiers {
    self.state.lock().frontiers.clone()
}

/// Compare the oplog's version against `other`.
#[inline]
pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
    self.oplog().lock().cmp_with_frontiers(other)
}

/// Compare two frontiers within the DAG; `Err` when one of them is not
/// included in the oplog.
#[inline]
pub fn cmp_frontiers(
    &self,
    a: &Frontiers,
    b: &Frontiers,
) -> Result<Option<Ordering>, FrontiersNotIncluded> {
    self.oplog().lock().cmp_frontiers(a, b)
}
/// Subscribe to events from all root containers. Event recording is turned
/// on if it was off, so future changes produce events.
pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
    let mut doc_state = self.state.lock();
    if !doc_state.is_recording() {
        doc_state.start_recording();
    }

    self.observer.subscribe_root(callback)
}

/// Subscribe to events for one container. Event recording is turned on if
/// it was off, so future changes produce events.
pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
    let mut doc_state = self.state.lock();
    if !doc_state.is_recording() {
        doc_state.start_recording();
    }

    self.observer.subscribe(container_id, callback)
}

/// Subscribe to local updates; the subscription is activated only after the
/// handle has been created.
pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
    let (subscription, activate) = self.local_update_subs.inner().insert((), callback);
    activate();
    subscription
}
/// Import several encoded blobs at once.
///
/// Blobs are sorted by encode mode (then by descending change count) so
/// snapshots are preferred first. The doc is temporarily detached and
/// `batch_importing` is set so intermediate states are not materialized;
/// afterwards it checks out to latest (when it was attached). The combined
/// success ranges and remaining pending ranges are reported; the last error
/// (if any) is returned after all blobs were attempted.
#[tracing::instrument(skip_all)]
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<ImportStatus> {
    if bytes.is_empty() {
        return Ok(ImportStatus::default());
    }

    if bytes.len() == 1 {
        return self.import(&bytes[0]);
    }

    let mut success = VersionRange::default();
    let mut meta_arr = bytes
        .iter()
        .map(|b| Ok((LoroDoc::decode_import_blob_meta(b, false)?, b)))
        .collect::<LoroResult<Vec<(ImportBlobMetadata, &Vec<u8>)>>>()?;
    meta_arr.sort_by(|a, b| {
        a.0.mode
            .cmp(&b.0.mode)
            .then(b.0.change_num.cmp(&a.0.change_num))
    });

    let (options, txn) = self.implicit_commit_then_stop();
    let is_detached = self.is_detached();
    self.set_detached(true);
    self.oplog.lock().batch_importing = true;

    let mut err = None;
    for (_meta, data) in meta_arr {
        match self._import_with(data, Default::default()) {
            Ok(s) => {
                // Merge per-peer success ranges, keeping the widest end.
                for (peer, (start, end)) in s.success.iter() {
                    match success.0.entry(*peer) {
                        Entry::Occupied(mut e) => {
                            e.get_mut().1 = *end.max(&e.get().1);
                        }
                        Entry::Vacant(e) => {
                            e.insert((*start, *end));
                        }
                    }
                }
            }
            Err(e) => {
                // Remember the failure but keep importing the rest.
                err = Some(e);
            }
        }
    }

    let mut oplog = self.oplog.lock();
    oplog.batch_importing = false;
    let pending = oplog.pending_changes.version_range();
    drop(oplog);

    if !is_detached {
        self._checkout_to_latest_with_guard(txn);
    } else {
        drop(txn);
    }

    self.renew_txn_if_auto_commit(options);
    if let Some(err) = err {
        return Err(err);
    }

    Ok(ImportStatus {
        success,
        pending: if pending.is_empty() {
            None
        } else {
            Some(pending)
        },
    })
}
/// Shallow value of the current state (containers left as references).
#[inline]
pub fn get_value(&self) -> LoroValue {
    self.state.lock().get_value()
}

/// Deep value of the current state (containers resolved recursively).
#[inline]
pub fn get_deep_value(&self) -> LoroValue {
    self.state.lock().get_deep_value()
}

/// Deep value with container ids included alongside the values.
#[inline]
pub fn get_deep_value_with_id(&self) -> LoroValue {
    self.state.lock().get_deep_value_with_id()
}
/// Check the state out to the latest oplog version and re-attach.
/// No-op (besides the commit barrier) when the doc is already attached.
pub fn checkout_to_latest(&self) {
    let (options, _guard) = self.implicit_commit_then_stop();
    if !self.is_detached() {
        drop(_guard);
        self.renew_txn_if_auto_commit(options);
        return;
    }

    self._checkout_to_latest_without_commit(true)
        .expect("checkout to oplog frontiers should succeed");
    self.emit_events();
    drop(_guard);
    self.renew_txn_if_auto_commit(options);
}

/// Variant of [`Self::checkout_to_latest`] for callers that already hold the
/// txn guard (e.g. `import_batch`); renews the transaction via the guard.
fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
    if !self.is_detached() {
        self._renew_txn_if_auto_commit_with_guard(None, guard);
        return;
    }

    self._checkout_to_latest_without_commit(true)
        .expect("checkout to oplog frontiers should succeed");
    self._renew_txn_if_auto_commit_with_guard(None, guard);
}

/// Check out to the oplog frontiers without committing first; emits events
/// and clears the detached flag. Renews the peer id when detached editing
/// is enabled.
pub(crate) fn _checkout_to_latest_without_commit(
    &self,
    to_commit_then_renew: bool,
) -> LoroResult<()> {
    tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {
        let f = self.oplog_frontiers();
        let this = &self;
        let frontiers = &f;
        this._checkout_without_emitting(frontiers, false, to_commit_then_renew)?;
        this.emit_events();
        if this.config.detached_editing() {
            this.renew_peer_id();
        }

        self.set_detached(false);
        Ok(())
    })
}

/// Check the state out to `frontiers`, leaving the doc detached unless the
/// target equals the latest version handling below says otherwise.
///
/// Transaction renewal rules: with detached editing, the txn is renewed on
/// success (after drawing a new peer id); otherwise it is renewed on failure
/// only if the doc was attached before, and on success only if the checkout
/// left the doc attached.
pub fn checkout(&self, frontiers: &Frontiers) -> LoroResult<()> {
    let was_detached = self.is_detached();
    let (options, guard) = self.implicit_commit_then_stop();
    let result = self._checkout_without_emitting(frontiers, true, true);
    if result.is_ok() {
        self.emit_events();
    }
    drop(guard);
    if self.config.detached_editing() {
        if result.is_ok() {
            self.renew_peer_id();
        }
        self.renew_txn_if_auto_commit(options);
    } else if result.is_err() {
        if !was_detached {
            self.renew_txn_if_auto_commit(options);
        }
    } else if !self.is_detached() {
        self.renew_txn_if_auto_commit(options);
    }

    result
}
/// Core checkout: switch the state to `frontiers` without emitting events.
///
/// Requires the txn mutex to already be held by the caller (checked below);
/// the produced diff is applied to the state and its events stay queued until
/// the caller flushes them via `emit_events`.
///
/// # Errors
/// - the txn mutex is not held
/// - the target version precedes the shallow root
/// - the frontiers (or the current state version) cannot be found in the DAG
#[instrument(level = "info", skip(self))]
pub(crate) fn _checkout_without_emitting(
    &self,
    frontiers: &Frontiers,
    to_shrink_frontiers: bool,
    to_commit_then_renew: bool,
) -> Result<(), LoroError> {
    // Guard against callers that forgot to stop the implicit transaction.
    if !self.txn.is_locked() {
        return Err(LoroError::TransactionError(
            "checkout requires the transaction mutex to be held"
                .to_string()
                .into_boxed_str(),
        ));
    }
    let from_frontiers = self.state_frontiers();
    loro_common::info!(
        "checkout from={:?} to={:?} cur_vv={:?}",
        from_frontiers,
        frontiers,
        self.oplog_vv()
    );
    // Fast path: already at the requested version.
    if &from_frontiers == frontiers {
        return Ok(());
    }
    let oplog = self.oplog.lock();
    if oplog.dag.is_before_shallow_root(frontiers) {
        return Err(LoroError::SwitchToVersionBeforeShallowRoot);
    }
    // Optionally normalize the target to a minimal equivalent frontier set.
    let frontiers = if to_shrink_frontiers {
        shrink_frontiers(frontiers, &oplog.dag).map_err(LoroError::FrontiersNotFound)?
    } else {
        frontiers.clone()
    };
    // Re-check after shrinking: the normalized target may equal the source.
    if from_frontiers == frontiers {
        return Ok(());
    }
    // Lock order: oplog -> state -> diff_calculator.
    let mut state = self.state.lock();
    let mut calc = self.diff_calculator.lock();
    for i in frontiers.iter() {
        if !oplog.dag.contains(i) {
            return Err(LoroError::FrontiersNotFound(i));
        }
    }
    let before = oplog.dag.frontiers_to_vv(&state.frontiers).ok_or_else(|| {
        LoroError::NotFoundError(
            format!(
                "Cannot find the current state version {:?}",
                state.frontiers
            )
            .into_boxed_str(),
        )
    })?;
    let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
        return Err(LoroError::NotFoundError(
            format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
        ));
    };
    // Mark detached before applying the diff. NOTE(review): if `apply_diff`
    // fails below, the doc remains flagged detached — confirm intended.
    self.set_detached(true);
    let (diff, diff_mode) =
        calc.calc_diff_internal(&oplog, &before, &state.frontiers, after, &frontiers, None);
    state.apply_diff(
        InternalDocDiff {
            origin: "checkout".into(),
            diff: Cow::Owned(diff),
            by: EventTriggerKind::Checkout,
            new_version: Cow::Owned(frontiers.clone()),
        },
        diff_mode,
    )?;
    Ok(())
}
/// Convert a version vector to its equivalent frontiers via the oplog DAG.
#[inline]
pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
    let oplog = self.oplog.lock();
    oplog.dag.vv_to_frontiers(vv)
}
/// Convert frontiers to a version vector; `None` if the frontiers are not
/// found in the oplog DAG.
#[inline]
pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
    let oplog = self.oplog.lock();
    oplog.dag.frontiers_to_vv(frontiers)
}
/// Import into `self` everything `other` has that `self` is missing.
pub fn merge(&self, other: &Self) -> LoroResult<ImportStatus> {
    let missing_here = ExportMode::updates(&self.oplog_vv());
    let bytes = other.export(missing_here).unwrap();
    self.import(&bytes)
}
/// Shared arena backing this doc's containers and ops.
pub(crate) fn arena(&self) -> &SharedArena {
    &self.arena
}
/// Number of visible ops. Uses the exact count when the oplog can be locked
/// on this thread; otherwise falls back to the cached atomic counter.
#[inline]
pub fn len_ops(&self) -> usize {
    if self.oplog.can_lock_in_this_thread() {
        self.oplog.lock().visible_op_count_exact()
    } else {
        self.visible_op_count.load(Acquire)
    }
}
/// Number of changes recorded in the oplog.
#[inline]
pub fn len_changes(&self) -> usize {
    self.oplog.lock().len_changes()
}
/// The doc's configuration handle.
pub fn config(&self) -> &Configure {
    &self.config
}
/// Slow debug-only consistency check: re-builds the doc from exported bytes
/// and asserts the rebuilt state matches the live state (panics on mismatch).
pub fn check_state_diff_calc_consistency_slow(&self) {
    {
        static IS_CHECKING: std::sync::atomic::AtomicBool =
            std::sync::atomic::AtomicBool::new(false);
        // Fix: the previous separate `load` + `store` guard was racy — two
        // threads could both observe `false` and run the (non-reentrant)
        // check concurrently. A single compare_exchange makes acquiring the
        // guard atomic; losers return immediately.
        if IS_CHECKING
            .compare_exchange(
                false,
                true,
                std::sync::atomic::Ordering::AcqRel,
                std::sync::atomic::Ordering::Acquire,
            )
            .is_err()
        {
            return;
        }
        let peer_id = self.peer_id();
        let s = info_span!("CheckStateDiffCalcConsistencySlow", ?peer_id);
        let _g = s.enter();
        let options = self.implicit_commit_then_stop().0;
        self.oplog.lock().check_dag_correctness();
        if self.is_shallow() {
            // Shallow doc: rebuild from the state-only snapshot at the
            // shallow root, then apply all updates and compare both ways.
            let initial_snapshot = self
                .export(ExportMode::state_only(Some(
                    &self.shallow_since_frontiers(),
                )))
                .unwrap();
            let doc = LoroDoc::new();
            doc.import(&initial_snapshot).unwrap();
            self.checkout(&self.shallow_since_frontiers()).unwrap();
            assert_eq!(self.get_deep_value(), doc.get_deep_value());
            let updates = self.export(ExportMode::all_updates()).unwrap();
            doc.import(&updates).unwrap();
            self.checkout_to_latest();
            assert_eq!(doc.get_deep_value(), self.get_deep_value());
            let mut calculated_state = doc.app_state().lock();
            let mut current_state = self.app_state().lock();
            current_state.check_is_the_same(&mut calculated_state);
        } else {
            // Full doc: replay updates up to the current state version.
            let f = self.state_frontiers();
            let vv = self.oplog().lock().dag.frontiers_to_vv(&f).unwrap();
            let bytes = self.export(ExportMode::updates_till(&vv)).unwrap();
            let doc = Self::new();
            doc.import(&bytes).unwrap();
            let mut calculated_state = doc.app_state().lock();
            let mut current_state = self.app_state().lock();
            current_state.check_is_the_same(&mut calculated_state);
        }
        self.renew_txn_if_auto_commit(options);
        IS_CHECKING.store(false, std::sync::atomic::Ordering::Release);
    }
}
/// Resolve a cursor to its current absolute position, returning event-index
/// coordinates (see `query_pos_internal` for the slow path).
pub fn query_pos(&self, pos: &Cursor) -> Result<PosQueryResult, CannotFindRelativePosition> {
    self.query_pos_internal(pos, true)
}
/// Resolve a cursor to an absolute position.
///
/// Fast path: ask the current state directly. Slow path (the anchor op was
/// deleted): replay history with a diff calculator restricted to the cursor's
/// container to find where the deleted anchor would sit now.
///
/// `ret_event_index` controls whether the returned position uses event-index
/// coordinates (passed through to the state lookup).
pub(crate) fn query_pos_internal(
    &self,
    pos: &Cursor,
    ret_event_index: bool,
) -> Result<PosQueryResult, CannotFindRelativePosition> {
    if !self.has_container(&pos.container) {
        return Err(CannotFindRelativePosition::IdNotFound);
    }
    let mut state = self.state.lock();
    if let Some(ans) = state.get_relative_position(pos, ret_event_index) {
        // Fast path: the anchor still exists in the current state.
        Ok(PosQueryResult {
            update: None,
            current: AbsolutePosition {
                pos: ans,
                side: pos.side,
            },
        })
    } else {
        // Slow path needs the oplog; release the state lock first.
        drop(state);
        let result = self.with_barrier(|| {
            let oplog = self.oplog().lock();
            if let Some(id) = pos.id {
                // The container may not be registered in the arena yet;
                // materialize it from the state if it exists at all.
                if oplog.arena.id_to_idx(&pos.container).is_none() {
                    let mut s = self.state.lock();
                    if !s.does_container_exist(&pos.container) {
                        return Err(CannotFindRelativePosition::ContainerDeleted);
                    }
                    s.ensure_container(&pos.container);
                    drop(s);
                }
                let idx = oplog.arena.id_to_idx(&pos.container).unwrap();
                // Find the op that deleted the anchor; without it we cannot
                // reconstruct a position.
                let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
                    if oplog.shallow_since_vv().includes_id(id) {
                        return Err(CannotFindRelativePosition::HistoryCleared);
                    }
                    tracing::error!("Cannot find id {}", id);
                    return Err(CannotFindRelativePosition::IdNotFound);
                };
                // Replay from just before the delete to the latest version,
                // filtered to this container only.
                let mut diff_calc = DiffCalculator::new(true);
                let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
                let before = &oplog.dag.frontiers_to_vv(&before_frontiers).unwrap();
                diff_calc.calc_diff_internal(
                    &oplog,
                    before,
                    &before_frontiers,
                    oplog.vv(),
                    oplog.frontiers(),
                    Some(&|target| idx == target),
                );
                let depth = self.arena.get_depth(idx);
                let (_, diff_calc) = &mut diff_calc.get_or_create_calc(idx, depth);
                // Ask the container-specific calculator for the anchor's
                // latest position; only sequence containers are meaningful.
                match diff_calc {
                    crate::diff_calc::ContainerDiffCalculator::Richtext(text) => {
                        let c = text.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_text(&pos.container);
                        // Text positions come back as entity indexes.
                        let current_pos = handler.convert_entity_index_to_event_index(new_pos);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(current_pos, c.side),
                            current: AbsolutePosition {
                                pos: current_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::List(list) => {
                        let c = list.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_list(&pos.container);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(new_pos, c.side),
                            current: AbsolutePosition {
                                pos: new_pos,
                                side: c.side,
                            },
                        })
                    }
                    crate::diff_calc::ContainerDiffCalculator::MovableList(list) => {
                        let c = list.get_id_latest_pos(id).unwrap();
                        let new_pos = c.pos;
                        let handler = self.get_movable_list(&pos.container);
                        // Movable lists distinguish op positions from
                        // user-visible positions.
                        let new_pos = handler.op_pos_to_user_pos(new_pos);
                        Ok(PosQueryResult {
                            update: handler.get_cursor(new_pos, c.side),
                            current: AbsolutePosition {
                                pos: new_pos,
                                side: c.side,
                            },
                        })
                    }
                    // Cursors are only created for sequence containers.
                    crate::diff_calc::ContainerDiffCalculator::Tree(_) => unreachable!(),
                    crate::diff_calc::ContainerDiffCalculator::Map(_) => unreachable!(),
                    #[cfg(feature = "counter")]
                    crate::diff_calc::ContainerDiffCalculator::Counter(_) => unreachable!(),
                    crate::diff_calc::ContainerDiffCalculator::Unknown(_) => unreachable!(),
                }
            } else {
                // Cursor without an anchor id: it addresses the end of the
                // container, so answer with the current length.
                match pos.container.container_type() {
                    ContainerType::Text => {
                        let text = self.get_text(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: text.id(),
                                side: pos.side,
                                origin_pos: text.len_unicode(),
                            }),
                            current: AbsolutePosition {
                                pos: text.len_event(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::List => {
                        let list = self.get_list(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: list.id(),
                                side: pos.side,
                                origin_pos: list.len(),
                            }),
                            current: AbsolutePosition {
                                pos: list.len(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::MovableList => {
                        let list = self.get_movable_list(&pos.container);
                        Ok(PosQueryResult {
                            update: Some(Cursor {
                                id: None,
                                container: list.id(),
                                side: pos.side,
                                origin_pos: list.len(),
                            }),
                            current: AbsolutePosition {
                                pos: list.len(),
                                side: pos.side,
                            },
                        })
                    }
                    ContainerType::Map | ContainerType::Tree | ContainerType::Unknown(_) => {
                        unreachable!()
                    }
                    #[cfg(feature = "counter")]
                    ContainerType::Counter => unreachable!(),
                }
            }
        });
        result
    }
}
/// Release the oplog's history cache to reclaim memory.
pub fn free_history_cache(&self) {
    let mut oplog = self.oplog.lock();
    oplog.free_history_cache();
}
/// Drop the current diff calculator and replace it with a fresh one.
pub fn free_diff_calculator(&self) {
    let fresh = DiffCalculator::new(true);
    *self.diff_calculator.lock() = fresh;
}
/// Whether the oplog currently holds a history cache.
pub fn has_history_cache(&self) -> bool {
    let oplog = self.oplog.lock();
    oplog.has_history_cache()
}
/// Compact the oplog's change store, behind the commit barrier so no
/// transaction is in flight while compaction runs.
#[inline]
pub fn compact_change_store(&self) {
    self.with_barrier(|| {
        self.oplog.lock().compact_change_store();
    });
}
/// Produce a structural analysis of the document (see [`DocAnalysis`]).
#[inline]
pub fn analyze(&self) -> DocAnalysis {
    DocAnalysis::analyze(self)
}
/// Path from the document root to the given container, as
/// `(container_id, index)` steps; `None` if the container doesn't exist.
pub fn get_path_to_container(&self, id: &ContainerID) -> Option<Vec<(ContainerID, Index)>> {
    let mut state = self.state.lock();
    let idx = match state.arena.id_to_idx(id) {
        Some(idx) => idx,
        None => {
            // Not registered in the arena yet: materialize it first if the
            // state knows about it at all.
            if !state.does_container_exist(id) {
                return None;
            }
            state.ensure_container(id);
            state.arena.id_to_idx(id).unwrap()
        }
    };
    state.get_path(idx)
}
/// Export the doc in the requested [`ExportMode`], behind the commit barrier
/// so the snapshot is taken at a quiescent point.
///
/// # Errors
/// Returns [`LoroEncodeError`] from the shallow/state-only/at-version
/// encoders; the fast snapshot and update encoders are infallible here.
#[instrument(skip(self))]
pub fn export(&self, mode: ExportMode) -> Result<Vec<u8>, LoroEncodeError> {
    self.with_barrier(|| {
        let ans = match mode {
            ExportMode::Snapshot => export_fast_snapshot(self),
            ExportMode::Updates { from } => export_fast_updates(self, &from),
            ExportMode::UpdatesInRange { spans } => {
                export_fast_updates_in_range(&self.oplog.lock(), spans.as_ref())
            }
            ExportMode::ShallowSnapshot(f) => export_shallow_snapshot(self, &f)?,
            ExportMode::StateOnly(f) => match f {
                Some(f) => export_state_only_snapshot(self, &f)?,
                // No frontier given: take the state at the latest version.
                None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
            },
            ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
        };
        Ok(ans)
    })
}
/// Version vector of the shallow root (empty for a non-shallow doc).
pub fn shallow_since_vv(&self) -> ImVersionVector {
    let oplog = self.oplog().lock();
    oplog.shallow_since_vv().clone()
}
/// Frontiers of the shallow root.
pub fn shallow_since_frontiers(&self) -> Frontiers {
    let oplog = self.oplog().lock();
    oplog.shallow_since_frontiers().clone()
}
/// Whether this doc was built from a shallow snapshot (i.e. its shallow-root
/// version vector is non-empty).
pub fn is_shallow(&self) -> bool {
    let empty = self.oplog().lock().shallow_since_vv().is_empty();
    !empty
}
/// Number of ops buffered in the pending (uncommitted) transaction; 0 when
/// there is no active transaction.
pub fn get_pending_txn_len(&self) -> usize {
    self.txn.lock().as_ref().map_or(0, |txn| txn.len())
}
/// Version-vector diff describing the id spans between two frontiers.
#[inline]
pub fn find_id_spans_between(&self, from: &Frontiers, to: &Frontiers) -> VersionVectorDiff {
    let oplog = self.oplog().lock();
    oplog.dag.find_path(from, to)
}
/// Subscribe to the first-commit-from-peer hook; dropping the returned
/// [`Subscription`] unsubscribes.
pub fn subscribe_first_commit_from_peer(
    &self,
    callback: FirstCommitFromPeerCallback,
) -> Subscription {
    let (sub, activate) = self
        .first_commit_from_peer_subs
        .inner()
        .insert((), callback);
    activate();
    sub
}
/// Subscribe to the pre-commit hook; dropping the returned [`Subscription`]
/// unsubscribes.
pub fn subscribe_pre_commit(&self, callback: PreCommitCallback) -> Subscription {
    let (sub, activate) = self.pre_commit_subs.inner().insert((), callback);
    activate();
    sub
}
}
/// Errors returned by [`LoroDoc::travel_change_ancestors`].
#[derive(Debug, thiserror::Error)]
pub enum ChangeTravelError {
    /// The starting id is not present in the doc's version vector.
    #[error("Target id not found {0:?}")]
    TargetIdNotFound(ID),
    /// The id falls before the shallow root, so its history is unavailable.
    #[error("The shallow history of the doc doesn't include the target version")]
    TargetVersionNotIncluded,
}
impl LoroDoc {
/// Walk the ancestor changes of `ids` in descending Lamport order, invoking
/// `f` for each visited change until it returns `ControlFlow::Break` or the
/// traversal is exhausted.
///
/// The oplog lock is re-acquired per access (never held across the `f`
/// callback), so the callback may safely use the doc.
pub fn travel_change_ancestors(
    &self,
    ids: &[ID],
    f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
) -> Result<(), ChangeTravelError> {
    let (options, guard) = self.implicit_commit_then_stop();
    drop(guard);
    // Max-heap ordering: highest Lamport (ties broken by peer) pops first.
    struct PendingNode(ChangeMeta);
    impl PartialEq for PendingNode {
        fn eq(&self, other: &Self) -> bool {
            self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
        }
    }
    impl Eq for PendingNode {}
    impl PartialOrd for PendingNode {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Ord for PendingNode {
        fn cmp(&self, other: &Self) -> Ordering {
            self.0
                .lamport_last()
                .cmp(&other.0.lamport_last())
                .then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
        }
    }
    // Validate all starting points before visiting anything.
    for id in ids {
        let op_log = &self.oplog().lock();
        if !op_log.vv().includes_id(*id) {
            return Err(ChangeTravelError::TargetIdNotFound(*id));
        }
        if op_log.dag.shallow_since_vv().includes_id(*id) {
            return Err(ChangeTravelError::TargetVersionNotIncluded);
        }
    }
    let mut visited = FxHashSet::default();
    let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
    for id in ids {
        pending.push(PendingNode(ChangeMeta::from_change(
            &self.oplog().lock().get_change_at(*id).unwrap(),
        )));
    }
    while let Some(PendingNode(node)) = pending.pop() {
        // Clone deps before handing the node to the callback (it's consumed).
        let deps = node.deps.clone();
        if f(node).is_break() {
            break;
        }
        for dep in deps.iter() {
            // Deps before the shallow root may be missing; skip them.
            let Some(dep_node) = self.oplog().lock().get_change_at(dep) else {
                continue;
            };
            if visited.contains(&dep_node.id) {
                continue;
            }
            visited.insert(dep_node.id);
            pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
        }
    }
    let ans = Ok(());
    self.renew_txn_if_auto_commit(options);
    ans
}
/// Ids of every container touched by the ops in the span starting at `id`
/// with length `len`.
pub fn get_changed_containers_in(&self, id: ID, len: usize) -> FxHashSet<ContainerID> {
    self.with_barrier(|| {
        let oplog = self.oplog().lock();
        let mut changed = FxHashSet::default();
        for op in oplog.iter_ops(id.to_span(len)) {
            let cid = oplog.arena.get_container_id(op.container()).unwrap();
            changed.insert(cid);
        }
        drop(oplog);
        changed
    })
}
/// "Delete" a root container: clear its content and record its id in the
/// config's deleted set. Non-root ids and unknown containers are ignored.
pub fn delete_root_container(&self, cid: ContainerID) {
    if !cid.is_root() || !self.has_container(&cid) {
        return;
    }
    let handler = match self.get_handler(cid.clone()) {
        Some(h) => h,
        None => return,
    };
    if let Err(e) = handler.clear() {
        eprintln!("Failed to clear handler: {:?}", e);
        return;
    }
    self.config.deleted_root_containers.lock().insert(cid);
}
/// Toggle whether empty root containers are hidden from exported values.
pub fn set_hide_empty_root_containers(&self, hide: bool) {
    let flag = &self.config.hide_empty_root_containers;
    flag.store(hide, std::sync::atomic::Ordering::Relaxed);
}
}
/// Find the id of the op that deleted `id` inside container `idx`, scanning
/// changes in reverse causal order starting just before `id` was created.
/// Returns `None` if no such delete op is found.
fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
    // If `id` predates the shallow root, fall back to scanning from the
    // shallow-root version.
    let start_vv = oplog
        .dag
        .frontiers_to_vv(&id.into())
        .unwrap_or_else(|| oplog.shallow_since_vv().to_vv());
    oplog
        .iter_changes_causally_rev(&start_vv, oplog.vv())
        .find_map(|change| {
            change.ops.iter().rev().find_map(|op| {
                if op.container != idx {
                    return None;
                }
                match &op.content {
                    InnerContent::List(InnerListOp::Delete(d))
                        if d.id_start.to_span(d.atom_len()).contains(id) =>
                    {
                        Some(ID::new(change.peer(), op.counter))
                    }
                    _ => None,
                }
            })
        })
}
/// RAII helper that commits the doc's pending transaction when dropped,
/// applying `default_options` to the transaction first.
#[derive(Debug)]
pub struct CommitWhenDrop<'a> {
    doc: &'a LoroDoc,
    default_options: CommitOptions,
}
impl Drop for CommitWhenDrop<'_> {
    fn drop(&mut self) {
        {
            // Install the stored default options on the pending txn (if any)
            // before committing; `take` leaves a default in their place since
            // we only have `&mut self` here.
            let mut guard = self.doc.txn.lock();
            if let Some(txn) = guard.as_mut() {
                txn.set_default_options(std::mem::take(&mut self.default_options));
            };
        }
        // The txn guard must be released before committing.
        self.doc.commit_then_renew();
    }
}
/// Options applied when committing a transaction.
#[derive(Debug, Clone)]
pub struct CommitOptions {
    // Origin string attached to the commit's events.
    pub origin: Option<InternalString>,
    // Whether a new transaction is started right after the commit.
    pub immediate_renew: bool,
    // Explicit commit timestamp; `None` lets the doc choose.
    pub timestamp: Option<Timestamp>,
    // Optional human-readable commit message.
    pub commit_msg: Option<Arc<str>>,
}
impl CommitOptions {
    /// Default options: no origin, no message, no explicit timestamp, and
    /// immediate transaction renewal enabled.
    pub fn new() -> Self {
        Self {
            origin: None,
            immediate_renew: true,
            timestamp: None,
            commit_msg: None,
        }
    }
    /// Builder: set the commit origin.
    pub fn origin(self, origin: &str) -> Self {
        Self {
            origin: Some(origin.into()),
            ..self
        }
    }
    /// Builder: set whether a new transaction starts right after the commit.
    pub fn immediate_renew(self, immediate_renew: bool) -> Self {
        Self {
            immediate_renew,
            ..self
        }
    }
    /// Builder: set an explicit commit timestamp.
    pub fn timestamp(self, timestamp: Timestamp) -> Self {
        Self {
            timestamp: Some(timestamp),
            ..self
        }
    }
    /// Builder: set the commit message.
    pub fn commit_msg(self, commit_msg: &str) -> Self {
        Self {
            commit_msg: Some(commit_msg.into()),
            ..self
        }
    }
    /// In-place setter for the origin.
    pub fn set_origin(&mut self, origin: Option<&str>) {
        self.origin = origin.map(Into::into)
    }
    /// In-place setter for the timestamp.
    pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
        self.timestamp = timestamp;
    }
}
// `Default` mirrors `CommitOptions::new()`.
impl Default for CommitOptions {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod test {
use std::{
panic::AssertUnwindSafe,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use crate::{
cursor::PosType,
encoding::json_schema::json::{JsonOpContent, JsonSchema, ListOp},
encoding::{fast_snapshot::EMPTY_MARK, EncodeMode},
loro::ExportMode,
version::{Frontiers, VersionVector},
LoroDoc, ToJson, TreeParentId,
};
use bytes::{BufMut, Bytes};
use loro_common::ID;
use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
// xxh32 seed: the bytes "LORO" read little-endian. Must match the seed the
// import decoder uses when verifying blob checksums.
const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
// Build an import blob by hand: magic "loro", 16 zero bytes (the checksum
// lives in bytes 16..20), the encode mode, then the body. The xxh32 checksum
// is computed over everything from offset 20 and patched into bytes 16..20.
fn encode_import_blob(mode: EncodeMode, body: &[u8]) -> Vec<u8> {
    let mut ans = Vec::new();
    ans.extend_from_slice(b"loro");
    ans.extend_from_slice(&[0; 16]);
    ans.extend_from_slice(&mode.to_bytes());
    ans.extend_from_slice(body);
    let checksum = xxhash_rust::xxh32::xxh32(&ans[20..], XXH_SEED);
    ans[16..20].copy_from_slice(&checksum.to_le_bytes());
    ans
}
// Wrap raw oplog bytes in a FastSnapshot import body: length-prefixed oplog
// bytes, then a length-prefixed EMPTY_MARK state section, then a zero-length
// shallow section.
fn encode_fast_snapshot_import(oplog_bytes: &[u8]) -> Vec<u8> {
    let mut body = Vec::new();
    body.put_u32_le(oplog_bytes.len() as u32);
    body.extend_from_slice(oplog_bytes);
    body.put_u32_le(EMPTY_MARK.len() as u32);
    body.extend_from_slice(EMPTY_MARK);
    body.put_u32_le(0);
    encode_import_blob(EncodeMode::FastSnapshot, &body)
}
// Craft a malicious sstable header that claims 10,000,000 meta blocks —
// importers must reject it instead of trying to allocate for them.
fn sstable_with_huge_meta_block_count() -> Vec<u8> {
    let mut bytes = Vec::new();
    bytes.extend_from_slice(b"LORO");
    bytes.push(0);
    bytes.put_u32_le(10_000_000);
    bytes.put_u32_le(xxhash_rust::xxh32::xxh32(&[], XXH_SEED));
    bytes.put_u32_le(5);
    bytes
}
// Build an oplog kv-store whose vv/frontiers entries are valid but whose
// change block for the advertised id is a single garbage byte.
fn snapshot_oplog_with_malformed_block() -> Vec<u8> {
    let peer = 1;
    let id = ID::new(peer, 0);
    let vv = VersionVector::from_iter([(peer, 1)]);
    let frontiers = Frontiers::from_id(id);
    let mut store = MemKvStore::new(MemKvConfig::default());
    store.set(b"vv", vv.encode().into());
    store.set(b"fr", frontiers.encode().into());
    store.set(&id.to_bytes(), Bytes::from_static(&[0]));
    store.export_all().to_vec()
}
/// Build an auto-commit doc populated across all container kinds (text,
/// list, map, and a 16-deep tree chain) so import-failure tests exercise
/// many block types at once.
fn make_json_import_stress_doc(peer: u64) -> LoroDoc {
    let doc = LoroDoc::new_auto_commit();
    doc.set_peer_id(peer).unwrap();
    let text = doc.get_text("text");
    let mut cursor = 0;
    for i in 0..32 {
        let chunk = format!("segment-{i}-abcdefghijklmnopqrstuvwxyz;");
        text.insert_unicode(cursor, &chunk).unwrap();
        cursor += chunk.chars().count();
    }
    let list = doc.get_list("list");
    for i in 0..32 {
        list.insert(i, format!("item-{i}")).unwrap();
    }
    let map = doc.get_map("map");
    for i in 0..32 {
        map.insert(&format!("key-{i}"), format!("value-{i}")).unwrap();
    }
    let tree = doc.get_tree("tree");
    let mut parent = TreeParentId::Root;
    for i in 0..16 {
        // Each node hangs off the previous one, forming a deep chain.
        let node = tree.create(parent).unwrap();
        let meta = tree.get_meta(node).unwrap();
        meta.insert("name", format!("node-{i}")).unwrap();
        meta.insert("payload", format!("payload-{i}-{}", "x".repeat(16)))
            .unwrap();
        parent = TreeParentId::Node(node);
    }
    doc
}
// Build a doc whose single change holds exactly four ops (map insert, list
// insert, text insert, list insert) and export it as JSON updates. The op
// count is asserted because later tests split the change at op boundaries.
fn make_json_list_update_with_four_ops(peer: u64) -> (LoroDoc, JsonSchema) {
    let doc = LoroDoc::new();
    doc.set_peer_id(peer).unwrap();
    let map = doc.get_map("map");
    let list = doc.get_list("list");
    let text = doc.get_text("text");
    let mut txn = doc.txn().unwrap();
    map.insert_with_txn(&mut txn, "prefix", "map-value".into())
        .unwrap();
    list.insert_with_txn(&mut txn, 0, "seed".into()).unwrap();
    text.insert_with_txn(&mut txn, 0, "text-value", PosType::Unicode)
        .unwrap();
    list.insert_with_txn(&mut txn, 1, "tail".into()).unwrap();
    txn.commit().unwrap();
    let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv(), false);
    assert_eq!(json.changes.len(), 1);
    assert_eq!(json.changes[0].ops.len(), 4);
    (doc, json)
}
/// Corrupt a JSON update in place: point the final list-insert op at
/// position 1000, far past any valid index, so importing it must fail.
fn move_last_list_insert_far_out_of_bounds(json: &mut JsonSchema) {
    let final_op = json
        .changes
        .last_mut()
        .unwrap()
        .ops
        .last_mut()
        .unwrap();
    match &mut final_op.content {
        JsonOpContent::List(ListOp::Insert { pos, .. }) => *pos = 1_000,
        other => panic!("expected list insert op, got {other:?}"),
    }
}
// Compile-time check that LoroDoc is Send + Sync.
#[test]
fn test_sync() {
    fn assert_send_sync<T: Send + Sync>(_: T) {}
    assert_send_sync(super::LoroDoc::new());
}
// A blob whose sstable header claims millions of meta blocks must produce an
// import error, not a panic or an allocation blow-up.
#[test]
fn import_rejects_huge_sstable_meta_block_count_without_panic() {
    let blob = encode_fast_snapshot_import(&sstable_with_huge_meta_block_count());
    let outcome =
        std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&blob)));
    assert!(outcome.is_ok(), "malformed import should not panic");
    assert!(outcome.unwrap().is_err());
}
// A snapshot whose change block is garbage must produce an import error, not
// a panic.
#[test]
fn import_rejects_malformed_change_block_without_panic() {
    let blob = encode_fast_snapshot_import(&snapshot_oplog_with_malformed_block());
    let outcome =
        std::panic::catch_unwind(AssertUnwindSafe(|| LoroDoc::new().import(&blob)));
    assert!(outcome.is_ok(), "malformed import should not panic");
    assert!(outcome.unwrap().is_err());
}
// When the state-apply failpoint fires mid-import into an empty doc, the
// oplog and state must be rolled back completely, and a subsequent clean
// import of the same bytes must succeed.
#[test]
fn failed_import_rolls_back_oplog_and_arena() {
    let src = LoroDoc::new();
    src.set_peer_id(1).unwrap();
    let text = src.get_text("text");
    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 0, "hello", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let update = src.export(ExportMode::all_updates()).unwrap();
    let dst = LoroDoc::new();
    let vv_before_import = dst.oplog_vv();
    let state_before_import = dst.get_deep_value();
    // The magic origin string triggers the import-state-apply failpoint.
    let err = dst
        .import_with(&update, "__loro_fail_import_state_apply".into())
        .unwrap_err();
    assert!(err.to_string().contains("state apply failpoint"));
    assert_eq!(dst.oplog_vv(), vv_before_import);
    assert_eq!(dst.get_deep_value(), state_before_import);
    assert!(dst.oplog().lock().is_empty());
    // Retrying without the failpoint must succeed and converge with src.
    dst.import(&update).unwrap();
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
// A failed incremental import (second update on top of a successfully
// imported first one) must restore the change store to its pre-import
// content, and a clean retry must still apply.
#[test]
fn failed_incremental_import_restores_previous_change_store_block() {
    let src = LoroDoc::new();
    src.set_peer_id(1).unwrap();
    let text = src.get_text("text");
    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let first_update = src.export(ExportMode::all_updates()).unwrap();
    let first_vv = src.oplog_vv();
    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    // Second update contains only the ops past first_vv.
    let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
    let dst = LoroDoc::new();
    dst.import(&first_update).unwrap();
    let vv_before_import = dst.oplog_vv();
    let state_before_import = dst.get_deep_value();
    // Failpoint-triggering origin: the incremental import must roll back.
    dst.import_with(&second_update, "__loro_fail_import_state_apply".into())
        .unwrap_err();
    assert_eq!(dst.oplog_vv(), vv_before_import);
    assert_eq!(dst.get_deep_value(), state_before_import);
    dst.import(&second_update).unwrap();
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
// Repeatedly failing a JSON-updates import of a complex doc into an empty
// doc must leave the destination untouched every time; a final clean import
// must fully converge.
#[test]
fn failed_import_json_updates_rolls_back_complex_empty_doc() {
    let src = make_json_import_stress_doc(11);
    let json = src.export_json_updates(&Default::default(), &src.oplog_vv(), false);
    let dst = LoroDoc::new();
    let vv_before_import = dst.oplog_vv();
    let frontiers_before_import = dst.oplog_frontiers();
    let state_before_import = dst.get_deep_value();
    // Three rounds to make sure failed rollbacks don't accumulate damage.
    for _ in 0..3 {
        crate::state::fail_next_import_state_apply_for_test();
        let err = dst.import_json_updates(json.clone()).unwrap_err();
        assert!(err.to_string().contains("state apply failpoint"));
        assert_eq!(dst.oplog_vv(), vv_before_import);
        assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
        assert_eq!(dst.get_deep_value(), state_before_import);
        assert!(dst.oplog().lock().is_empty());
    }
    dst.import_json_updates(json).unwrap();
    assert_eq!(dst.oplog_vv(), src.oplog_vv());
    assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
// Incremental JSON-updates variant of the rollback test: after a successful
// first JSON import, a failing second import must leave the previously
// stored changes intact, across repeated failures.
#[test]
fn failed_incremental_import_json_updates_restores_previous_change_store_block() {
    let src = LoroDoc::new_auto_commit();
    src.set_peer_id(12).unwrap();
    let text = src.get_text("text");
    text.insert_unicode(0, "a").unwrap();
    let list = src.get_list("list");
    list.push("seed").unwrap();
    let map = src.get_map("map");
    map.insert("seed", "value").unwrap();
    let tree = src.get_tree("tree");
    let root = tree.create(TreeParentId::Root).unwrap();
    tree.get_meta(root).unwrap().insert("name", "root").unwrap();
    let first_vv = src.oplog_vv();
    let first_json = src.export_json_updates(&Default::default(), &first_vv, false);
    // Bulk follow-up edits so the second JSON update spans many blocks.
    let mut text_pos = text.len_unicode();
    for i in 0..64 {
        let chunk = format!("chunk-{i};");
        text.insert_unicode(text_pos, &chunk).unwrap();
        text_pos += chunk.chars().count();
    }
    for i in 0..32 {
        list.push(format!("after-{i}")).unwrap();
        let key = format!("after-{i}");
        map.insert(&key, format!("value-{i}")).unwrap();
    }
    let child = tree.create(TreeParentId::Node(root)).unwrap();
    tree.get_meta(child)
        .unwrap()
        .insert("name", "child")
        .unwrap();
    let second_json = src.export_json_updates(&first_vv, &src.oplog_vv(), false);
    let dst = LoroDoc::new();
    dst.import_json_updates(first_json).unwrap();
    let vv_before_import = dst.oplog_vv();
    let frontiers_before_import = dst.oplog_frontiers();
    let state_before_import = dst.get_deep_value();
    // Fail the second import twice; dst must be unchanged after each.
    for _ in 0..2 {
        crate::state::fail_next_import_state_apply_for_test();
        let err = dst.import_json_updates(second_json.clone()).unwrap_err();
        assert!(err.to_string().contains("state apply failpoint"));
        assert_eq!(dst.oplog_vv(), vv_before_import);
        assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
        assert_eq!(dst.get_deep_value(), state_before_import);
    }
    dst.import_json_updates(second_json).unwrap();
    assert_eq!(dst.oplog_vv(), src.oplog_vv());
    assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
// Corrupt only the LAST op of a JSON update (out-of-bounds list insert) and
// verify the whole import rolls back — both when a valid prefix was already
// in the oplog and when the doc was empty — then verify clean imports still
// converge.
#[test]
fn malformed_later_import_json_update_rolls_back_after_valid_prefix_enters_oplog() {
    let peer = 13;
    let (src, good_json) = make_json_list_update_with_four_ops(peer);
    let mut bad_json = good_json.clone();
    move_last_list_insert_far_out_of_bounds(&mut bad_json);
    // Sanity: the untouched JSON imports cleanly.
    let good_dst = LoroDoc::new();
    good_dst.import_json_updates(good_json.clone()).unwrap();
    assert_eq!(good_dst.get_deep_value(), src.get_deep_value());
    // Split the change at the last op: prefix = first three ops.
    let last_op_counter = good_json.changes[0].ops.last().unwrap().counter;
    let prefix_vv = VersionVector::from_iter([(peer, last_op_counter)]);
    let prefix_json = src.export_json_updates(&Default::default(), &prefix_vv, false);
    assert_eq!(
        prefix_json.changes[0].ops.len(),
        good_json.changes[0].ops.len() - 1
    );
    let good_suffix_json = src.export_json_updates(&prefix_vv, &src.oplog_vv(), false);
    assert_eq!(good_suffix_json.changes[0].ops.len(), 1);
    let mut bad_suffix_json = good_suffix_json.clone();
    move_last_list_insert_far_out_of_bounds(&mut bad_suffix_json);
    // Case 1: valid prefix already imported, then a bad suffix.
    let prefix_dst = LoroDoc::new();
    prefix_dst.import_json_updates(prefix_json.clone()).unwrap();
    let vv_before_bad_suffix = prefix_dst.oplog_vv();
    let frontiers_before_bad_suffix = prefix_dst.oplog_frontiers();
    let state_before_bad_suffix = prefix_dst.get_deep_value();
    let bad_suffix_json = serde_json::to_string(&bad_suffix_json).unwrap();
    let err = prefix_dst
        .import_json_updates(&bad_suffix_json)
        .unwrap_err();
    assert!(
        err.to_string().contains("list diff"),
        "expected state list bounds validation, got {err:?}"
    );
    assert_eq!(prefix_dst.oplog_vv(), vv_before_bad_suffix);
    assert_eq!(prefix_dst.oplog_frontiers(), frontiers_before_bad_suffix);
    assert_eq!(prefix_dst.get_deep_value(), state_before_bad_suffix);
    prefix_dst.import_json_updates(good_suffix_json).unwrap();
    assert_eq!(prefix_dst.get_deep_value(), src.get_deep_value());
    assert_eq!(prefix_dst.oplog_vv(), src.oplog_vv());
    // Case 2: bad update imported into an empty doc.
    let dst = LoroDoc::new();
    let vv_before_import = dst.oplog_vv();
    let frontiers_before_import = dst.oplog_frontiers();
    let state_before_import = dst.get_deep_value();
    let bad_json = serde_json::to_string(&bad_json).unwrap();
    let err = dst.import_json_updates(&bad_json).unwrap_err();
    assert!(
        err.to_string().contains("list diff"),
        "expected state list bounds validation, got {err:?}"
    );
    assert_eq!(dst.oplog_vv(), vv_before_import);
    assert_eq!(dst.oplog_frontiers(), frontiers_before_import);
    assert_eq!(dst.get_deep_value(), state_before_import);
    assert!(dst.oplog().lock().is_empty());
}
// Import the dependent update first (it parks as pending), then fail the
// import of its dependency: both the dependency AND the previously pending
// change must be rolled back, and a clean retry must apply both.
#[test]
fn failed_import_restores_pending_changes_that_were_applied_during_import() {
    let src = LoroDoc::new();
    src.set_peer_id(14).unwrap();
    let text = src.get_text("text");
    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let first_update = src.export(ExportMode::all_updates()).unwrap();
    let first_vv = src.oplog_vv();
    let mut txn = src.txn().unwrap();
    text.insert_with_txn(&mut txn, 1, "b", PosType::Unicode)
        .unwrap();
    txn.commit().unwrap();
    let second_update = src.export(ExportMode::updates(&first_vv)).unwrap();
    let dst = LoroDoc::new();
    // Out-of-order import: nothing applies, the change goes pending.
    let status = dst.import(&second_update).unwrap();
    assert!(status.success.is_empty());
    assert!(status.pending.is_some());
    let vv_before_dependency = dst.oplog_vv();
    let frontiers_before_dependency = dst.oplog_frontiers();
    let state_before_dependency = dst.get_deep_value();
    crate::state::fail_next_import_state_apply_for_test();
    let err = dst.import(&first_update).unwrap_err();
    assert!(err.to_string().contains("state apply failpoint"));
    assert_eq!(dst.oplog_vv(), vv_before_dependency);
    assert_eq!(dst.oplog_frontiers(), frontiers_before_dependency);
    assert_eq!(dst.get_deep_value(), state_before_dependency);
    // Clean retry applies the dependency and the formerly pending change.
    dst.import(&first_update).unwrap();
    assert_eq!(dst.oplog_vv(), src.oplog_vv());
    assert_eq!(dst.oplog_frontiers(), src.oplog_frontiers());
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
// A failed JSON import must neither fire subscriber callbacks nor leave
// events queued; the next successful import fires exactly one root event.
#[test]
fn failed_import_json_updates_does_not_emit_or_leave_events() {
    let (src, good_json) = make_json_list_update_with_four_ops(15);
    let mut bad_json = good_json.clone();
    move_last_list_insert_far_out_of_bounds(&mut bad_json);
    let dst = LoroDoc::new();
    let event_count = Arc::new(AtomicUsize::new(0));
    let event_count_cloned = event_count.clone();
    let _sub = dst.subscribe_root(Arc::new(move |_| {
        event_count_cloned.fetch_add(1, Ordering::SeqCst);
    }));
    let bad_json = serde_json::to_string(&bad_json).unwrap();
    let err = dst.import_json_updates(&bad_json).unwrap_err();
    assert!(
        err.to_string().contains("list diff"),
        "expected state list bounds validation, got {err:?}"
    );
    assert_eq!(event_count.load(Ordering::SeqCst), 0);
    // No half-built events may remain queued after the rollback.
    assert!(dst.drop_pending_events().is_empty());
    assert!(dst.oplog().lock().is_empty());
    dst.import_json_updates(good_json).unwrap();
    assert_eq!(event_count.load(Ordering::SeqCst), 1);
    assert_eq!(dst.get_deep_value(), src.get_deep_value());
}
// Checkout at various versions on two docs (one original, one rebuilt from a
// snapshot) and verify the materialized values at each version.
#[test]
fn test_checkout() {
    let loro = LoroDoc::new();
    loro.set_peer_id(1).unwrap();
    let text = loro.get_text("text");
    let map = loro.get_map("map");
    let list = loro.get_list("list");
    let mut txn = loro.txn().unwrap();
    // Ten rounds of (map set, text prepend, list prepend) — 3 ops per round.
    for i in 0..10 {
        map.insert_with_txn(&mut txn, "key", i.into()).unwrap();
        text.insert_with_txn(&mut txn, 0, &i.to_string(), PosType::Unicode)
            .unwrap();
        list.insert_with_txn(&mut txn, 0, i.into()).unwrap();
    }
    txn.commit().unwrap();
    let b = LoroDoc::new();
    b.import(&loro.export(ExportMode::Snapshot).unwrap())
        .unwrap();
    // Empty frontiers = the document's initial (empty) version.
    loro.checkout(&Frontiers::default()).unwrap();
    {
        let json = &loro.get_deep_value();
        assert_eq!(
            json.to_json_value(),
            serde_json::json!({"text":"","list":[],"map":{}})
        );
    }
    // Counter 2 = end of round 0 (ops 0..=2).
    b.checkout(&ID::new(1, 2).into()).unwrap();
    {
        let json = &b.get_deep_value();
        assert_eq!(
            json.to_json_value(),
            serde_json::json!({"text":"0","list":[0],"map":{"key":0}})
        );
    }
    // Counter 3 = first op of round 1 (map now 1, text/list unchanged).
    loro.checkout(&ID::new(1, 3).into()).unwrap();
    {
        let json = &loro.get_deep_value();
        assert_eq!(
            json.to_json_value(),
            serde_json::json!({"text":"0","list":[0],"map":{"key":1}})
        );
    }
    // Counter 29 = the last op: the full document.
    b.checkout(&ID::new(1, 29).into()).unwrap();
    {
        let json = &b.get_deep_value();
        assert_eq!(
            json.to_json_value(),
            serde_json::json!({"text":"9876543210","list":[9,8,7,6,5,4,3,2,1,0],"map":{"key":9}})
        );
    }
}
// Regression for issue #181: exporting after a batch import of an empty
// snapshot plus local edits must not fail. The lock/drop of the oplog
// reproduces the original regression's access pattern.
#[test]
fn import_batch_err_181() {
    let a = LoroDoc::new_auto_commit();
    let update_a = a.export(ExportMode::Snapshot);
    let b = LoroDoc::new_auto_commit();
    b.import_batch(&[update_a.unwrap()]).unwrap();
    b.get_text("text")
        .insert(0, "hello", PosType::Unicode)
        .unwrap();
    b.commit_then_renew();
    let oplog = b.oplog().lock();
    drop(oplog);
    b.export(ExportMode::all_updates()).unwrap();
}
// Once a panic poisons the oplog mutex, later lock attempts must fail fast
// with a panic message mentioning the poisoned LoroMutex rather than
// deadlocking or silently succeeding.
#[test]
fn poisoned_mutex_keeps_follow_up_operations_failed() {
    let doc = LoroDoc::new();
    let oplog = doc.oplog.clone();
    // Poison the mutex by panicking while holding its guard.
    let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
        let _guard = oplog.lock();
        panic!("poison oplog");
    }));
    let err = std::panic::catch_unwind(AssertUnwindSafe(|| doc.oplog_vv()))
        .expect_err("poisoned lock should continue to fail fast");
    // Panic payloads may be &str or String depending on how they were raised.
    let msg = if let Some(msg) = err.downcast_ref::<&str>() {
        (*msg).to_string()
    } else if let Some(msg) = err.downcast_ref::<String>() {
        msg.clone()
    } else {
        String::new()
    };
    assert!(msg.contains("poisoned LoroMutex"), "{msg}");
}
}