use std::{collections::BTreeMap, ops::Deref};
use crate::{
change::Change,
version::{ImVersionVector, VersionRange},
OpLog, VersionVector,
};
use loro_common::{Counter, CounterSpan, HasCounterSpan, HasIdSpan, LoroResult, PeerID, ID};
use rustc_hash::FxHashMap;
/// A remote change that is parked until the changes it depends on are in the log.
///
/// Both variants wrap the same [`Change`]; the variant records whether the
/// change's lamport still has to be computed before it can be inserted.
#[derive(Debug)]
pub enum PendingChange {
/// The lamport of the change is not known yet. It is computed with
/// `calc_unknown_lamport_change` at the moment the change is applied.
Unknown(Change),
/// Presumably a change whose lamport is already known (TODO confirm).
/// Nothing in this file constructs this variant, hence the `allow(unused)`.
#[allow(unused)]
Known(Change),
}
impl Deref for PendingChange {
    type Target = Change;

    /// Borrows the wrapped [`Change`], regardless of which variant holds it.
    fn deref(&self) -> &Self::Target {
        // Every variant carries exactly one `Change`, so the pattern is irrefutable.
        let (Self::Unknown(inner) | Self::Known(inner)) = self;
        inner
    }
}
/// Storage for changes that cannot be applied yet.
///
/// Each pending change is filed under the ID of the dependency it is waiting
/// for: the outer map is keyed by that dependency's peer, the inner ordered map
/// by its counter. Several changes may wait on the same ID, hence the `Vec`.
#[derive(Debug, Default)]
pub(crate) struct PendingChanges {
changes: FxHashMap<PeerID, BTreeMap<Counter, Vec<PendingChange>>>,
}
impl OpLog {
    /// Parks `remote_changes` as pending changes with a not-yet-computed lamport.
    ///
    /// Every change is wrapped in [`PendingChange::Unknown`] and filed under the
    /// ID of the dependency it is still missing.
    ///
    /// # Panics
    ///
    /// Panics if any change is already applied or could be applied directly;
    /// callers are expected to pass only changes that are blocked on a
    /// missing dependency.
    pub(super) fn extend_pending_changes_with_unknown_lamport(
        &mut self,
        remote_changes: Vec<Change>,
    ) -> LoroResult<()> {
        for change in remote_changes {
            let pending = PendingChange::Unknown(change);
            let state = remote_change_apply_state(self.vv(), self.shallow_since_vv(), &pending);

            // Only blocked changes belong here; the other states are caller bugs.
            let missing = match state {
                ChangeState::AwaitingMissingDependency(id) => id,
                ChangeState::Applied => unreachable!("already applied"),
                ChangeState::CanApplyDirectly => unreachable!("can apply directly"),
            };

            let waiting_on_peer = self.pending_changes.changes.entry(missing.peer).or_default();
            waiting_on_peer
                .entry(missing.counter)
                .or_default()
                .push(pending);
        }

        Ok(())
    }
}
impl OpLog {
pub(crate) fn try_apply_pending(
&mut self,
mut new_ids: Vec<ID>,
mut would_affect: Option<&mut VersionRange>,
) {
while let Some(id) = new_ids.pop() {
let Some(tree) = self.pending_changes.changes.get_mut(&id.peer) else {
continue;
};
let mut to_remove = Vec::new();
for (cnt, _) in tree.range_mut(0..=id.counter) {
to_remove.push(*cnt);
}
let mut pending_set = Vec::with_capacity(to_remove.len());
for cnt in to_remove {
pending_set.push(tree.remove(&cnt).unwrap());
}
if tree.is_empty() {
self.pending_changes.changes.remove(&id.peer);
}
for pending_changes in pending_set {
for pending_change in pending_changes {
match remote_change_apply_state(
self.dag.vv(),
self.dag.shallow_since_vv(),
&pending_change,
) {
ChangeState::CanApplyDirectly => {
new_ids.push(pending_change.id_last());
self.apply_change_from_remote(
pending_change,
would_affect.as_deref_mut(),
);
}
ChangeState::Applied => {}
ChangeState::AwaitingMissingDependency(miss_dep) => self
.pending_changes
.changes
.entry(miss_dep.peer)
.or_default()
.entry(miss_dep.counter)
.or_default()
.push(pending_change),
}
}
}
}
}
pub(super) fn apply_change_from_remote(
&mut self,
change: PendingChange,
would_affect: Option<&mut VersionRange>,
) {
let change = match change {
PendingChange::Known(mut c) => {
self.dag.calc_unknown_lamport_change(&mut c).unwrap();
c
}
PendingChange::Unknown(mut c) => {
self.dag.calc_unknown_lamport_change(&mut c).unwrap();
c
}
};
let Some(change) = self.trim_the_known_part_of_change(change) else {
return;
};
if let Some(w) = would_affect {
w.extends_to_include_id_span(change.id_span());
}
self.insert_new_change(change, false);
}
}
/// Result of checking a remote change against the current version vector.
enum ChangeState {
/// The version vector already covers the whole counter span of the change.
Applied,
/// The change directly continues what is known for its peer, and all of its
/// deps are covered by the version vector.
CanApplyDirectly,
/// The change cannot be applied yet; the payload is the ID of the first
/// missing dependency that was found.
AwaitingMissingDependency(ID),
}
fn remote_change_apply_state(
vv: &VersionVector,
_shallow_vv: &ImVersionVector,
change: &Change,
) -> ChangeState {
let peer = change.id.peer;
let CounterSpan { start, end } = change.ctr_span();
let vv_latest_ctr = vv.get(&peer).copied().unwrap_or(0);
if vv_latest_ctr >= end {
return ChangeState::Applied;
}
if vv_latest_ctr < start {
return ChangeState::AwaitingMissingDependency(change.id.inc(-1));
}
for dep in change.deps.iter() {
let dep_vv_latest_ctr = vv.get(&dep.peer).copied().unwrap_or(0);
if dep_vv_latest_ctr - 1 < dep.counter {
return ChangeState::AwaitingMissingDependency(dep);
}
}
ChangeState::CanApplyDirectly
}
// Tests for importing updates out of causal order: each one checks that
// updates parked as pending are applied once their dependencies arrive.
#[cfg(test)]
mod test {
use crate::{cursor::PosType, loro::ExportMode, LoroDoc, ToJson, VersionVector};
// A single peer produces five edits exported as several (partly overlapping)
// updates; another doc imports them in a shuffled order and must converge.
#[test]
fn import_pending() {
let a = LoroDoc::new_auto_commit();
a.set_peer_id(1).unwrap();
let b = LoroDoc::new_auto_commit();
b.set_peer_id(2).unwrap();
let text_a = a.get_text("text");
text_a.insert(0, "a", PosType::Unicode).unwrap();
let update1 = a
.export(ExportMode::updates(&VersionVector::default()))
.unwrap();
let version1 = a.oplog_vv();
text_a.insert(0, "b", PosType::Unicode).unwrap();
let update2 = a.export(ExportMode::updates(&version1)).unwrap();
let version2 = a.oplog_vv();
text_a.insert(0, "c", PosType::Unicode).unwrap();
let update3 = a.export(ExportMode::updates(&version2)).unwrap();
let version3 = a.oplog_vv();
text_a.insert(0, "d", PosType::Unicode).unwrap();
let update4 = a.export(ExportMode::updates(&version3)).unwrap();
text_a.insert(0, "e", PosType::Unicode).unwrap();
// Exported from `version2` after the "e" insert, so it overlaps `update3`
// and `update4`.
let update3_5 = a.export(ExportMode::updates(&version2)).unwrap();
// Import in a non-causal order; `update1` (the very first edit) comes last.
b.import(&update3_5).unwrap();
b.import(&update4).unwrap();
b.import(&update2).unwrap();
b.import(&update3).unwrap();
b.import(&update1).unwrap();
assert_eq!(a.get_deep_value(), b.get_deep_value());
}
// An incremental update is imported before the snapshot it builds on.
#[test]
fn pending_import_snapshot() {
let a = LoroDoc::new_auto_commit();
a.set_peer_id(1).unwrap();
let b = LoroDoc::new_auto_commit();
b.set_peer_id(2).unwrap();
let text_a = a.get_text("text");
text_a.insert(0, "a", PosType::Unicode).unwrap();
let update1 = a.export(ExportMode::Snapshot).unwrap();
let version1 = a.oplog_vv();
text_a.insert(0, "b", PosType::Unicode).unwrap();
let update2 = a.export(ExportMode::updates(&version1)).unwrap();
let _version2 = a.oplog_vv();
// `update2` arrives first; the snapshot follows.
b.import(&update2).unwrap();
b.import(&update1).unwrap();
assert_eq!(a.get_deep_value(), b.get_deep_value());
}
// A change made by `a` after it imported a change from `b` is imported
// before the earlier changes from `a` and `b`.
#[test]
fn need_deps_pending_import() {
let a = LoroDoc::new_auto_commit();
a.set_peer_id(1).unwrap();
let b = LoroDoc::new_auto_commit();
b.set_peer_id(2).unwrap();
let c = LoroDoc::new_auto_commit();
c.set_peer_id(3).unwrap();
let d = LoroDoc::new_auto_commit();
d.set_peer_id(4).unwrap();
let text_a = a.get_text("text");
let text_b = b.get_text("text");
text_a.insert(0, "a", PosType::Unicode).unwrap();
let version_a1 = a.oplog_vv();
let update_a1 = a
.export(ExportMode::updates(&VersionVector::default()))
.unwrap();
b.import(&update_a1).unwrap();
text_b.insert(1, "b", PosType::Unicode).unwrap();
let update_b1 = b.export(ExportMode::updates(&version_a1)).unwrap();
a.import(&update_b1).unwrap();
let version_a1b1 = a.oplog_vv();
text_a.insert(2, "c", PosType::Unicode).unwrap();
let update_a2 = a.export(ExportMode::updates(&version_a1b1)).unwrap();
// Doc `c`: a2 first (nothing visible yet), then a1 (only "a" visible),
// then b1 (everything applied).
c.import(&update_a2).unwrap();
assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"\"}");
c.import(&update_a1).unwrap();
assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"a\"}");
c.import(&update_b1).unwrap();
assert_eq!(a.get_deep_value(), c.get_deep_value());
// Doc `d`: a2 and b1 first (text stays empty after both), then a1
// (everything applied).
d.import(&update_a2).unwrap();
assert_eq!(d.get_deep_value().to_json(), "{\"text\":\"\"}");
d.import(&update_b1).unwrap();
assert_eq!(d.get_deep_value().to_json(), "{\"text\":\"\"}");
d.import(&update_a1).unwrap();
assert_eq!(a.get_deep_value(), d.get_deep_value());
}
// A pending change from `b` must be applied when a later snapshot from `a`
// is imported.
#[test]
fn should_activate_pending_change_when() {
let a = LoroDoc::new_auto_commit();
a.set_peer_id(1).unwrap();
let b = LoroDoc::new_auto_commit();
b.set_peer_id(2).unwrap();
let c = LoroDoc::new_auto_commit();
c.set_peer_id(3).unwrap();
let text_a = a.get_text("text");
let text_b = b.get_text("text");
text_a.insert(0, "1", PosType::Unicode).unwrap();
b.import(&a.export(ExportMode::Snapshot).unwrap()).unwrap();
text_b.insert(0, "1", PosType::Unicode).unwrap();
let b_change = b.export(ExportMode::updates(&a.oplog_vv())).unwrap();
// `a` makes a second edit, so the snapshot exported below contains more
// than the state `b_change` was based on.
text_a.insert(0, "1", PosType::Unicode).unwrap();
c.import(&b_change).unwrap();
c.import(&a.export(ExportMode::Snapshot).unwrap()).unwrap();
a.import(&b_change).unwrap();
assert_eq!(c.get_deep_value(), a.get_deep_value());
}
}