use std::{collections::BTreeMap, ops::Deref};
use crate::{
change::Change,
version::{ImVersionVector, VersionRange},
OpLog, VersionVector,
};
use loro_common::{
ContainerType, Counter, CounterSpan, HasCounterSpan, HasIdSpan, LoroResult, PeerID, ID,
};
use rustc_hash::FxHashMap;
#[derive(Debug, Clone)]
pub enum PendingChange {
Unknown(Change),
#[allow(unused)]
Known(Change),
}
impl Deref for PendingChange {
type Target = Change;
fn deref(&self) -> &Self::Target {
match self {
Self::Unknown(a) => a,
Self::Known(a) => a,
}
}
}
#[derive(Debug, Default)]
pub(crate) struct PendingChanges {
changes: FxHashMap<PeerID, BTreeMap<Counter, Vec<PendingChange>>>,
}
impl PendingChanges {
pub(crate) fn has_state_apply_rollback_ops(&self) -> bool {
self.changes.values().any(|tree| {
tree.values().any(|changes| {
changes.iter().any(|change| {
change.ops.iter().any(|op| {
matches!(
op.container.get_type(),
ContainerType::List | ContainerType::Tree
)
})
})
})
})
}
pub(crate) fn version_range(&self) -> VersionRange {
let mut range = VersionRange::default();
for tree in self.changes.values() {
for pending_changes in tree.values() {
for pending_change in pending_changes {
range.extends_to_include_id_span(pending_change.id_span());
}
}
}
range
}
}
#[cfg(test)]
#[allow(dead_code)]
impl PendingChanges {
pub(crate) fn len(&self) -> usize {
self.changes
.values()
.map(|tree| tree.values().map(Vec::len).sum::<usize>())
.sum()
}
}
#[derive(Debug, Default)]
pub(crate) struct PendingChangesRollback {
added: Vec<(PeerID, Counter)>,
removed: Vec<(PeerID, Counter, Vec<PendingChange>)>,
}
impl PendingChangesRollback {
fn record_added(&mut self, id: ID) {
self.added.push((id.peer, id.counter));
}
fn record_removed(&mut self, peer: PeerID, counter: Counter, changes: Vec<PendingChange>) {
self.removed.push((peer, counter, changes));
}
pub(crate) fn rollback(self, pending_changes: &mut PendingChanges) {
for (peer, counter) in self.added.into_iter().rev() {
let Some(tree) = pending_changes.changes.get_mut(&peer) else {
continue;
};
let Some(changes) = tree.get_mut(&counter) else {
continue;
};
changes.pop();
if changes.is_empty() {
tree.remove(&counter);
}
if tree.is_empty() {
pending_changes.changes.remove(&peer);
}
}
for (peer, counter, changes) in self.removed.into_iter().rev() {
pending_changes
.changes
.entry(peer)
.or_default()
.insert(counter, changes);
}
}
}
impl OpLog {
fn push_pending_change(&mut self, missing_dep: ID, change: PendingChange) {
if let Some(rollback) = self.import_rollback.as_mut() {
rollback.pending.record_added(missing_dep);
}
self.pending_changes
.changes
.entry(missing_dep.peer)
.or_default()
.entry(missing_dep.counter)
.or_default()
.push(change);
}
pub(super) fn extend_pending_changes_with_unknown_lamport(
&mut self,
remote_changes: Vec<Change>,
) -> LoroResult<()> {
for change in remote_changes {
let local_change = PendingChange::Unknown(change);
match remote_change_apply_state(self.vv(), self.shallow_since_vv(), &local_change) {
ChangeState::AwaitingMissingDependency(miss_dep) => {
self.push_pending_change(miss_dep, local_change)
}
ChangeState::Applied => unreachable!("already applied"),
ChangeState::CanApplyDirectly => unreachable!("can apply directly"),
}
}
Ok(())
}
}
impl OpLog {
pub(crate) fn try_apply_pending(
&mut self,
mut new_ids: Vec<ID>,
mut would_affect: Option<&mut VersionRange>,
) {
while let Some(id) = new_ids.pop() {
let Some(tree) = self.pending_changes.changes.get_mut(&id.peer) else {
continue;
};
let mut to_remove = Vec::new();
for (cnt, _) in tree.range_mut(0..=id.counter) {
to_remove.push(*cnt);
}
let mut pending_set = Vec::with_capacity(to_remove.len());
for cnt in to_remove {
let pending_changes = tree.remove(&cnt).unwrap();
if let Some(rollback) = self.import_rollback.as_mut() {
rollback
.pending
.record_removed(id.peer, cnt, pending_changes.clone());
}
pending_set.push(pending_changes);
}
if tree.is_empty() {
self.pending_changes.changes.remove(&id.peer);
}
for pending_changes in pending_set {
for pending_change in pending_changes {
match remote_change_apply_state(
self.dag.vv(),
self.dag.shallow_since_vv(),
&pending_change,
) {
ChangeState::CanApplyDirectly => {
new_ids.push(pending_change.id_last());
self.apply_change_from_remote(
pending_change,
would_affect.as_deref_mut(),
);
}
ChangeState::Applied => {}
ChangeState::AwaitingMissingDependency(miss_dep) => {
self.push_pending_change(miss_dep, pending_change)
}
}
}
}
}
}
pub(super) fn apply_change_from_remote(
&mut self,
change: PendingChange,
would_affect: Option<&mut VersionRange>,
) {
let change = match change {
PendingChange::Known(mut c) => {
self.dag.calc_unknown_lamport_change(&mut c).unwrap();
c
}
PendingChange::Unknown(mut c) => {
self.dag.calc_unknown_lamport_change(&mut c).unwrap();
c
}
};
let Some(change) = self.trim_the_known_part_of_change(change) else {
return;
};
if let Some(w) = would_affect {
w.extends_to_include_id_span(change.id_span());
}
self.insert_new_change(change, false);
}
}
enum ChangeState {
Applied,
CanApplyDirectly,
AwaitingMissingDependency(ID),
}
fn remote_change_apply_state(
vv: &VersionVector,
_shallow_vv: &ImVersionVector,
change: &Change,
) -> ChangeState {
let peer = change.id.peer;
let CounterSpan { start, end } = change.ctr_span();
let vv_latest_ctr = vv.get(&peer).copied().unwrap_or(0);
if vv_latest_ctr >= end {
return ChangeState::Applied;
}
if vv_latest_ctr < start {
return ChangeState::AwaitingMissingDependency(change.id.inc(-1));
}
for dep in change.deps.iter() {
let dep_vv_latest_ctr = vv.get(&dep.peer).copied().unwrap_or(0);
if dep_vv_latest_ctr - 1 < dep.counter {
return ChangeState::AwaitingMissingDependency(dep);
}
}
ChangeState::CanApplyDirectly
}
#[cfg(test)]
mod test {
use crate::{cursor::PosType, loro::ExportMode, LoroDoc, ToJson, VersionVector};
#[test]
fn import_pending() {
let a = LoroDoc::new_auto_commit();
a.set_peer_id(1).unwrap();
let b = LoroDoc::new_auto_commit();
b.set_peer_id(2).unwrap();
let text_a = a.get_text("text");
text_a.insert(0, "a", PosType::Unicode).unwrap();
let update1 = a
.export(ExportMode::updates(&VersionVector::default()))
.unwrap();
let version1 = a.oplog_vv();
text_a.insert(0, "b", PosType::Unicode).unwrap();
let update2 = a.export(ExportMode::updates(&version1)).unwrap();
let version2 = a.oplog_vv();
text_a.insert(0, "c", PosType::Unicode).unwrap();
let update3 = a.export(ExportMode::updates(&version2)).unwrap();
let version3 = a.oplog_vv();
text_a.insert(0, "d", PosType::Unicode).unwrap();
let update4 = a.export(ExportMode::updates(&version3)).unwrap();
text_a.insert(0, "e", PosType::Unicode).unwrap();
let update3_5 = a.export(ExportMode::updates(&version2)).unwrap();
b.import(&update3_5).unwrap();
b.import(&update4).unwrap();
b.import(&update2).unwrap();
b.import(&update3).unwrap();
b.import(&update1).unwrap();
assert_eq!(a.get_deep_value(), b.get_deep_value());
}
#[test]
fn pending_import_snapshot() {
let a = LoroDoc::new_auto_commit();
a.set_peer_id(1).unwrap();
let b = LoroDoc::new_auto_commit();
b.set_peer_id(2).unwrap();
let text_a = a.get_text("text");
text_a.insert(0, "a", PosType::Unicode).unwrap();
let update1 = a.export(ExportMode::Snapshot).unwrap();
let version1 = a.oplog_vv();
text_a.insert(0, "b", PosType::Unicode).unwrap();
let update2 = a.export(ExportMode::updates(&version1)).unwrap();
let _version2 = a.oplog_vv();
b.import(&update2).unwrap();
b.import(&update1).unwrap();
assert_eq!(a.get_deep_value(), b.get_deep_value());
}
#[test]
fn need_deps_pending_import() {
let a = LoroDoc::new_auto_commit();
a.set_peer_id(1).unwrap();
let b = LoroDoc::new_auto_commit();
b.set_peer_id(2).unwrap();
let c = LoroDoc::new_auto_commit();
c.set_peer_id(3).unwrap();
let d = LoroDoc::new_auto_commit();
d.set_peer_id(4).unwrap();
let text_a = a.get_text("text");
let text_b = b.get_text("text");
text_a.insert(0, "a", PosType::Unicode).unwrap();
let version_a1 = a.oplog_vv();
let update_a1 = a
.export(ExportMode::updates(&VersionVector::default()))
.unwrap();
b.import(&update_a1).unwrap();
text_b.insert(1, "b", PosType::Unicode).unwrap();
let update_b1 = b.export(ExportMode::updates(&version_a1)).unwrap();
a.import(&update_b1).unwrap();
let version_a1b1 = a.oplog_vv();
text_a.insert(2, "c", PosType::Unicode).unwrap();
let update_a2 = a.export(ExportMode::updates(&version_a1b1)).unwrap();
c.import(&update_a2).unwrap();
assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"\"}");
c.import(&update_a1).unwrap();
assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"a\"}");
c.import(&update_b1).unwrap();
assert_eq!(a.get_deep_value(), c.get_deep_value());
d.import(&update_a2).unwrap();
assert_eq!(d.get_deep_value().to_json(), "{\"text\":\"\"}");
d.import(&update_b1).unwrap();
assert_eq!(d.get_deep_value().to_json(), "{\"text\":\"\"}");
d.import(&update_a1).unwrap();
assert_eq!(a.get_deep_value(), d.get_deep_value());
}
#[test]
fn should_activate_pending_change_when() {
let a = LoroDoc::new_auto_commit();
a.set_peer_id(1).unwrap();
let b = LoroDoc::new_auto_commit();
b.set_peer_id(2).unwrap();
let c = LoroDoc::new_auto_commit();
c.set_peer_id(3).unwrap();
let text_a = a.get_text("text");
let text_b = b.get_text("text");
text_a.insert(0, "1", PosType::Unicode).unwrap();
b.import(&a.export(ExportMode::Snapshot).unwrap()).unwrap();
text_b.insert(0, "1", PosType::Unicode).unwrap();
let b_change = b.export(ExportMode::updates(&a.oplog_vv())).unwrap();
text_a.insert(0, "1", PosType::Unicode).unwrap();
c.import(&b_change).unwrap();
c.import(&a.export(ExportMode::Snapshot).unwrap()).unwrap();
a.import(&b_change).unwrap();
assert_eq!(c.get_deep_value(), a.get_deep_value());
}
}