pub(crate) mod dag;
mod pending_changes;
use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::mem::take;
use std::rc::Rc;
use std::sync::Mutex;
use fxhash::FxHashMap;
use rle::{HasLength, RleCollection, RlePush, RleVec, Sliceable};
use smallvec::SmallVec;
use crate::change::{Change, Lamport, Timestamp};
use crate::container::list::list_op;
use crate::dag::DagUtils;
use crate::diff_calc::tree::MoveLamportAndID;
use crate::diff_calc::TreeDiffCache;
use crate::encoding::RemoteClientChanges;
use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
use crate::id::{Counter, PeerID, ID};
use crate::op::{ListSlice, RawOpContent, RemoteOp};
use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
/// Stored changes grouped by the peer that authored them.
///
/// `OpLog::insert_new_change` asserts that each pushed change starts at the counter where the
/// previous one of the same peer ended, so every `Vec` is contiguous by counter.
type ClientChanges = FxHashMap<PeerID, Vec<Change>>;
use self::pending_changes::PendingChanges;
use super::arena::SharedArena;
/// The operation log: all stored changes per peer together with the DAG that records how
/// they depend on each other.
pub struct OpLog {
/// Dependency graph over the stored changes, including the current frontiers and version vector.
pub(crate) dag: AppDag,
/// Arena used to resolve container ids and op payloads (see `local_op_to_remote`).
pub(crate) arena: SharedArena,
/// Changes grouped by peer.
changes: ClientChanges,
/// Raised to `change.lamport_end()` by `import_local_change`; never lowered there.
pub(crate) next_lamport: Lamport,
/// Running maximum of the timestamps seen by `import_local_change`.
pub(crate) latest_timestamp: Timestamp,
/// Changes that are held back instead of being applied.
/// NOTE(review): presumably waiting for missing deps; confirm in the `pending_changes` module.
pub(crate) pending_changes: PendingChanges,
/// While `true`, `import_remote_changes` does not refresh the DAG frontiers.
pub(crate) batch_importing: bool,
/// Cache of tree move ops; `import_local_change` adds one node per tree op.
pub(crate) tree_parent_cache: Mutex<TreeDiffCache>,
}
/// Dependency graph of changes, stored as one row of nodes per peer.
#[derive(Debug, Clone, Default)]
pub struct AppDag {
/// DAG nodes of each peer.
pub(crate) map: FxHashMap<PeerID, Vec<AppDagNode>>,
/// Current heads. `refresh_frontiers` rebuilds them from the last node of every peer whose
/// `has_succ` flag is unset.
pub(crate) frontiers: Frontiers,
/// Version vector of the graph; `OpLog::import_local_change` extends it to the last id of
/// each imported change.
pub(crate) vv: VersionVector,
}
/// One node of [`AppDag`]: a run of consecutive ops of a single peer.
///
/// Nodes are created and extended by `OpLog::insert_dag_node_on_new_change`.
#[derive(Debug, Clone)]
pub struct AppDagNode {
/// Peer that authored the ops.
pub(crate) peer: PeerID,
/// Counter of the first op in the node.
pub(crate) cnt: Counter,
/// Lamport value of the first op in the node.
pub(crate) lamport: Lamport,
/// Deps of the change that created the node.
pub(crate) deps: Frontiers,
/// Version vector computed from `deps` when the node was created.
pub(crate) vv: ImVersionVector,
/// Set once another change depends on the last op of this node; cleared again when the
/// node is extended.
pub(crate) has_succ: bool,
/// Number of ops covered by the node.
pub(crate) len: usize,
}
impl Clone for OpLog {
fn clone(&self) -> Self {
Self {
dag: self.dag.clone(),
arena: Default::default(),
changes: self.changes.clone(),
next_lamport: self.next_lamport,
latest_timestamp: self.latest_timestamp,
pending_changes: Default::default(),
batch_importing: false,
tree_parent_cache: Default::default(),
}
}
}
impl AppDag {
    /// Returns the node that contains `id`, or `None` when the peer is unknown or the counter
    /// lies at or beyond the end of that peer's row.
    pub fn get_mut(&mut self, id: ID) -> Option<&mut AppDagNode> {
        let nodes = self.map.get_mut(&id.peer)?;
        if id.counter >= nodes.sum_atom_len() {
            return None;
        }
        let index = nodes.search_atom_index(id.counter);
        Some(&mut nodes[index])
    }

    /// Recomputes `frontiers` from scratch: the last op of every peer whose final node has
    /// no successor. Peers with an empty row are skipped.
    pub(crate) fn refresh_frontiers(&mut self) {
        self.frontiers = self
            .map
            .iter()
            .filter_map(|(peer, nodes)| {
                let tail = nodes.last()?;
                (!tail.has_succ).then(|| ID::new(*peer, tail.ctr_last()))
            })
            .collect();
    }

    /// Raises `change.lamport` so that it is greater than the lamport of every dep.
    ///
    /// Returns `Err(())` as soon as a dep has no known lamport; `change.lamport` may already
    /// have been raised by earlier deps at that point.
    pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
        for dep in change.deps.iter() {
            let dep_lamport = self.get_lamport(dep).ok_or(())?;
            change.lamport = change.lamport.max(dep_lamport + 1);
        }
        Ok(())
    }
}
/// Manual `Debug` that prints the DAG, the changes, the pending changes and the
/// lamport/timestamp fields; the arena, `batch_importing` and the tree cache are left out.
impl std::fmt::Debug for OpLog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OpLog");
        out.field("dag", &self.dag);
        out.field("changes", &self.changes);
        out.field("pending_changes", &self.pending_changes);
        out.field("next_lamport", &self.next_lamport);
        out.field("latest_timestamp", &self.latest_timestamp);
        out.finish()
    }
}
/// Marker token returned by `OpLog::insert_dag_node_on_new_change` and demanded by
/// `OpLog::insert_new_change`, which nudges callers to insert the DAG node before storing the
/// change. The unit struct carries no data.
pub(crate) struct EnsureChangeDepsAreAtTheEnd;
impl OpLog {
pub fn new() -> Self {
Self {
dag: AppDag::default(),
arena: Default::default(),
changes: ClientChanges::default(),
next_lamport: 0,
latest_timestamp: Timestamp::default(),
pending_changes: Default::default(),
batch_importing: false,
tree_parent_cache: Default::default(),
}
}
pub fn new_with_arena(arena: SharedArena) -> Self {
Self {
dag: AppDag::default(),
arena,
next_lamport: 0,
..Default::default()
}
}
/// Returns the stored `latest_timestamp` field.
pub fn latest_timestamp(&self) -> Timestamp {
self.latest_timestamp
}
/// Borrows the dependency graph of this log.
pub fn dag(&self) -> &AppDag {
&self.dag
}
/// Returns the largest timestamp among the changes that contain the ids in `f`.
///
/// Ids without a stored change are ignored; if none is found the result is
/// `Timestamp::default()`.
pub fn get_timestamp_of_version(&self, f: &Frontiers) -> Timestamp {
    f.iter()
        .filter_map(|id| self.lookup_change(*id))
        .fold(Timestamp::default(), |latest, change| {
            latest.max(change.timestamp)
        })
}
/// `true` when the DAG has no entries and the arena reports `can_import_snapshot()`.
pub fn is_empty(&self) -> bool {
    if !self.dag.map.is_empty() {
        return false;
    }
    self.arena.can_import_snapshot()
}
/// Borrows the stored changes of all peers.
pub fn changes(&self) -> &ClientChanges {
&self.changes
}
/// Stores `change` at the end of its peer's change list.
///
/// When the previous change of the peer has no dependents, the new change only depends on
/// its own peer, and the timestamps differ by less than 1000, the ops are appended to the
/// previous change instead of creating a new entry.
///
/// # Panics
/// Panics if the change does not start where the peer's last change ended (or at counter 0
/// for the first change of a peer).
pub(crate) fn insert_new_change(&mut self, mut change: Change, _: EnsureChangeDepsAreAtTheEnd) {
    let peer_changes = self.changes.entry(change.id.peer).or_default();
    let Some(prev) = peer_changes.last_mut() else {
        // First change of this peer.
        assert!(change.id.counter == 0);
        peer_changes.push(change);
        return;
    };

    assert_eq!(change.id.counter, prev.ctr_end());
    let elapsed = change.timestamp - prev.timestamp;
    let mergeable = !prev.has_dependents && change.deps_on_self() && elapsed < 1000;
    if !mergeable {
        peer_changes.push(change);
        return;
    }
    for op in take(change.ops.vec_mut()) {
        prev.ops.push(op);
    }
}
/// Imports a change produced locally.
///
/// The part of the change that is already stored is trimmed away first; if nothing is left
/// the call is a no-op. Otherwise the version vector, frontiers, DAG, tree cache and change
/// store are updated, in that order.
///
/// `from_txn` selects `add_node_uncheck` instead of `add_node` when tree ops are added to the
/// tree cache.
///
/// # Errors
/// * `LoroError::UsedOpID` when the change id is already covered by the version vector.
/// * `LoroError::DecodeError` when a dep is not included in the version vector.
///
/// # Panics
/// In builds with debug assertions, panics if `change.lamport` differs from the lamport
/// derived from its deps.
pub fn import_local_change(&mut self, change: Change, from_txn: bool) -> Result<(), LoroError> {
    let change = match self.trim_the_known_part_of_change(change) {
        Some(unknown_part) => unknown_part,
        None => return Ok(()),
    };
    self.check_id_is_not_duplicated(change.id)?;
    self.check_deps(&change.deps).map_err(|id| {
        LoroError::DecodeError(format!("Missing dep {:?}", id).into_boxed_str())
    })?;

    if cfg!(debug_assertions) {
        let expected = self.dag.frontiers_to_next_lamport(&change.deps);
        assert_eq!(
            expected, change.lamport,
            "{:#?}\nDAG={:#?}",
            &change, &self.dag
        );
    }

    self.next_lamport = self.next_lamport.max(change.lamport_end());
    self.latest_timestamp = self.latest_timestamp.max(change.timestamp);

    // The new change replaces its deps and any earlier head of the same peer.
    self.dag.vv.extend_to_include_last_id(change.id_last());
    self.dag.frontiers.retain_non_included(&change.deps);
    self.dag.frontiers.filter_peer(change.id.peer);
    self.dag.frontiers.push(change.id_last());

    let mark = self.insert_dag_node_on_new_change(&change);

    {
        // Scope the lock so it is released before `insert_new_change` needs `&mut self`.
        let mut tree_cache = self.tree_parent_cache.lock().unwrap();
        let peer = change.id.peer;
        let first_counter = change.id.counter;
        for op in change.ops().iter() {
            if let crate::op::InnerContent::Tree(tree) = op.content {
                let offset = op.counter - first_counter;
                let node = MoveLamportAndID {
                    lamport: change.lamport + offset as Lamport,
                    id: ID {
                        peer,
                        counter: op.counter,
                    },
                    target: tree.target,
                    parent: tree.parent,
                    effected: true,
                };
                if !from_txn {
                    tree_cache.add_node(node);
                } else {
                    tree_cache.add_node_uncheck(node);
                }
            }
        }
    }

    self.insert_new_change(change, mark);
    Ok(())
}
/// Records `change` in the DAG and returns the token required by `insert_new_change`.
///
/// A change for which `deps_on_self()` holds extends the last node of its peer. Any other
/// change gets a new node, and every dep is marked as having a successor when the dep is the
/// last op of its node.
///
/// # Panics
/// Panics if the peer has no node to extend, if counter/lamport are not contiguous with the
/// peer's last node, or if a dep cannot be found in the DAG or the change store.
pub(crate) fn insert_dag_node_on_new_change(
&mut self,
change: &Change,
) -> EnsureChangeDepsAreAtTheEnd {
let len = change.content_len();
if change.deps_on_self() {
// No new node is needed: grow the peer's last node in place.
let nodes = self.dag.map.get_mut(&change.id.peer).unwrap();
let last = nodes.last_mut().unwrap();
assert_eq!(last.peer, change.id.peer);
assert_eq!(last.cnt + last.len as Counter, change.id.counter);
assert_eq!(last.lamport + last.len as Lamport, change.lamport);
// Given the contiguity assert above this equals `last.len + len`.
last.len = change.id.counter as usize + len - last.cnt as usize;
last.has_succ = false;
} else {
let vv = self.dag.frontiers_to_im_vv(&change.deps);
let dag_row = &mut self.dag.map.entry(change.id.peer).or_default();
if change.id.counter > 0 {
assert_eq!(dag_row.last().unwrap().ctr_end(), change.id.counter);
}
dag_row.push_rle_element(AppDagNode {
vv,
peer: change.id.peer,
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
has_succ: false,
len,
});
for dep in change.deps.iter() {
// Make the stored change that contains `dep` end exactly at `dep` (it is split if needed).
self.ensure_dep_on_change_end(change.id.peer, *dep);
let target = self.dag.get_mut(*dep).unwrap();
// Only a dep on the node's last op counts as a successor of that node.
if target.ctr_last() == dep.counter {
target.has_succ = true;
}
}
}
EnsureChangeDepsAreAtTheEnd
}
/// Makes sure `dep` is the last op of some stored change.
///
/// If a stored change already ends at `dep`, it is flagged with `has_dependents` when the
/// dependency comes from another peer (`src != dep.peer`). Otherwise the change containing
/// `dep` is split in two right after `dep`.
///
/// # Panics
/// Panics if `dep.peer` has no stored changes, if `dep` lies beyond the peer's last change,
/// or if either half of a split would be empty.
fn ensure_dep_on_change_end(&mut self, src: PeerID, dep: ID) {
    let peer_changes = self.changes.get_mut(&dep.peer).unwrap();
    let found = peer_changes.binary_search_by(|c| c.ctr_last().cmp(&dep.counter));
    let split_index = match found {
        Ok(index) => {
            if src != dep.peer {
                peer_changes[index].has_dependents = true;
            }
            return;
        }
        Err(index) => index,
    };

    // `dep` sits in the middle of a change: cut that change right after `dep`.
    let (head, tail) = {
        let containing = &peer_changes[split_index];
        let offset = (dep.counter - containing.id.counter + 1) as usize;
        (
            containing.slice(0, offset),
            containing.slice(offset, containing.atom_len()),
        )
    };
    assert_ne!(head.atom_len(), 0);
    assert_ne!(tail.atom_len(), 0);
    peer_changes[split_index] = head;
    peer_changes.insert(split_index + 1, tail);
}
/// Drops the leading ops of `change` that are already stored for its peer.
///
/// Returns the change untouched when nothing overlaps (including an unknown peer or an empty
/// change list), `None` when the whole change is already stored, and otherwise the slice that
/// starts at the end of the stored changes.
pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> {
    let stored_last = self
        .changes
        .get(&change.id.peer)
        .and_then(|peer_changes| peer_changes.last());
    let known_end = match stored_last {
        Some(last) => last.ctr_end(),
        None => return Some(change),
    };

    if change.id.counter >= known_end {
        Some(change)
    } else if change.ctr_end() <= known_end {
        None
    } else {
        let offset = (known_end - change.id.counter) as usize;
        Some(change.slice(offset, change.atom_len()))
    }
}
/// Fails with `LoroError::UsedOpID` when the version vector entry of `id.peer` (0 if the
/// peer is unknown) is greater than `id.counter`.
fn check_id_is_not_duplicated(&self, id: ID) -> Result<(), LoroError> {
    let known_end = self.dag.vv.get(&id.peer).cloned().unwrap_or(0);
    if known_end <= id.counter {
        Ok(())
    } else {
        Err(LoroError::UsedOpID { id })
    }
}
/// Checks that every dep is included in the version vector; returns the first one that is
/// not as the error.
fn check_deps(&self, deps: &Frontiers) -> Result<(), ID> {
    match deps.iter().find(|dep| !self.dag.vv.includes_id(**dep)) {
        Some(missing) => Err(*missing),
        None => Ok(()),
    }
}
/// Returns the stored `next_lamport` field.
pub fn next_lamport(&self) -> Lamport {
self.next_lamport
}
/// Returns the id made of `peer` and that peer's entry in the version vector (counter 0 for
/// a peer the version vector does not know).
pub fn next_id(&self, peer: PeerID) -> ID {
    ID::new(peer, self.dag.vv.get(&peer).copied().unwrap_or(0))
}
/// Returns the stored changes of `peer`, or `None` if the peer has no entry.
pub fn get_peer_changes(&self, peer: PeerID) -> Option<&Vec<Change>> {
self.changes.get(&peer)
}
/// Borrows the version vector kept by the DAG.
pub(crate) fn vv(&self) -> &VersionVector {
&self.dag.vv
}
/// Borrows the frontiers kept by the DAG.
pub(crate) fn frontiers(&self) -> &Frontiers {
&self.dag.frontiers
}
/// Compares `other` against this log by delegating to `AppDag::cmp_frontiers`.
pub fn cmp_frontiers(&self, other: &Frontiers) -> Ordering {
self.dag.cmp_frontiers(other)
}
/// Collects, per peer, the stored changes that lie beyond `from`, converted to remote form.
///
/// A change that straddles the counter recorded in `from` is sliced so that only its unknown
/// tail is exported. Peers with nothing to export are left out of the result.
pub(crate) fn export_changes_from(&self, from: &VersionVector) -> RemoteClientChanges {
    let mut exported = RemoteClientChanges::default();
    for (&peer, &end_cnt) in self.vv().iter() {
        let start_cnt = from.get(&peer).copied().unwrap_or(0);
        if end_cnt <= start_cnt {
            continue;
        }
        let Some(peer_changes) = self.changes.get(&peer) else {
            continue;
        };
        let Some(first) = peer_changes.get_by_atom_index(start_cnt) else {
            continue;
        };

        let mut remote = Vec::new();
        for change in &peer_changes[first.merged_index..] {
            if change.id.counter >= start_cnt {
                remote.push(self.convert_change_to_remote(change));
                continue;
            }
            // The change starts before `start_cnt`: skip it if it also ends before it.
            if change.id.counter + change.atom_len() as Counter <= start_cnt {
                continue;
            }
            let skipped = (start_cnt - change.id.counter) as usize;
            let tail = change.slice(skipped, change.atom_len());
            remote.push(self.convert_change_to_remote(&tail));
        }

        if !remote.is_empty() {
            exported.insert(peer, remote);
        }
    }
    exported
}
/// Lamport of the stored change that contains `id`, or 0 when there is no such change.
pub(crate) fn get_min_lamport_at(&self, id: ID) -> Lamport {
    match self.get_change_at(id) {
        Some(change) => change.lamport,
        None => 0,
    }
}
/// Lamport at the start of the last op of the change that contains `id`, or `Lamport::MAX`
/// when there is no such change.
///
/// Computed as `change.lamport + last_op.counter - change.id.counter`, with 0 standing in for
/// the last op's counter when the change has no ops.
pub(crate) fn get_max_lamport_at(&self, id: ID) -> Lamport {
    let Some(change) = self.get_change_at(id) else {
        return Lamport::MAX;
    };
    let first_counter = change.id.counter as u32;
    let last_op_counter = change.ops().last().map(|op| op.counter).unwrap_or(0) as u32;
    // Keep the add-then-subtract order of the original expression.
    change.lamport + last_op_counter - first_counter
}
/// Returns the stored change located by `get_by_atom_index(id.counter)` in the peer's change
/// list, or `None` when the peer is unknown or the lookup finds nothing.
pub fn get_change_at(&self, id: ID) -> Option<&Change> {
    let peer_changes = self.changes.get(&id.peer)?;
    let found = peer_changes.get_by_atom_index(id.counter)?;
    Some(&peer_changes[found.merged_index])
}
/// Like [`OpLog::get_change_at`], but returns the change converted to its remote form.
pub fn get_remote_change_at(&self, id: ID) -> Option<Change<RemoteOp>> {
    self.get_change_at(id)
        .map(|change| self.convert_change_to_remote(change))
}
/// Builds the remote form of `change`: every op is converted with `local_op_to_remote`, the
/// id, deps, lamport and timestamp are copied, and `has_dependents` is reset to `false`.
fn convert_change_to_remote(&self, change: &Change) -> Change<RemoteOp> {
    let mut remote_ops = RleVec::new();
    let converted = change
        .ops
        .iter()
        .flat_map(|op| self.local_op_to_remote(op));
    for remote_op in converted {
        remote_ops.push(remote_op);
    }
    Change {
        id: change.id,
        lamport: change.lamport,
        timestamp: change.timestamp,
        deps: change.deps.clone(),
        ops: remote_ops,
        has_dependents: false,
    }
}
/// Converts a stored op into its remote form, replacing arena references with owned data.
///
/// The returned vector always holds exactly one op, carrying the container id resolved from
/// the arena and the counter of `op`.
///
/// # Panics
/// Panics if the arena does not know `op.container`, if a list insert targets a map or tree
/// container (or a text insert targets anything but a text container), or if the bytes of an
/// `InsertText` op are not valid UTF-8.
pub(crate) fn local_op_to_remote(&self, op: &crate::op::Op) -> SmallVec<[RemoteOp<'_>; 1]> {
    let container = self.arena.get_container_id(op.container).unwrap();
    let content = match &op.content {
        crate::op::InnerContent::List(list_op::InnerListOp::Insert { slice, pos }) => {
            let range = slice.0.start as usize..slice.0.end as usize;
            match container.container_type() {
                loro_common::ContainerType::Text => {
                    let text = self.arena.slice_str_by_unicode_range(range);
                    RawOpContent::List(list_op::ListOp::Insert {
                        slice: ListSlice::RawStr {
                            unicode_len: text.chars().count(),
                            str: Cow::Owned(text),
                        },
                        pos: *pos,
                    })
                }
                loro_common::ContainerType::List => {
                    RawOpContent::List(list_op::ListOp::Insert {
                        slice: ListSlice::RawData(Cow::Owned(self.arena.get_values(range))),
                        pos: *pos,
                    })
                }
                loro_common::ContainerType::Map | loro_common::ContainerType::Tree => {
                    unreachable!()
                }
            }
        }
        crate::op::InnerContent::List(list_op::InnerListOp::InsertText {
            slice,
            unicode_len: len,
            unicode_start: _,
            pos,
        }) => match container.container_type() {
            loro_common::ContainerType::Text => RawOpContent::List(list_op::ListOp::Insert {
                slice: ListSlice::RawStr {
                    unicode_len: *len as usize,
                    str: Cow::Owned(std::str::from_utf8(slice).unwrap().to_owned()),
                },
                pos: *pos as usize,
            }),
            loro_common::ContainerType::List
            | loro_common::ContainerType::Map
            | loro_common::ContainerType::Tree => {
                unreachable!()
            }
        },
        crate::op::InnerContent::List(list_op::InnerListOp::Delete(del)) => {
            RawOpContent::List(list_op::ListOp::Delete(*del))
        }
        crate::op::InnerContent::List(list_op::InnerListOp::StyleStart {
            start,
            end,
            key,
            value,
            info,
        }) => RawOpContent::List(list_op::ListOp::StyleStart {
            start: *start,
            end: *end,
            key: key.clone(),
            value: value.clone(),
            info: *info,
        }),
        crate::op::InnerContent::List(list_op::InnerListOp::StyleEnd) => {
            RawOpContent::List(list_op::ListOp::StyleEnd)
        }
        crate::op::InnerContent::Map(map) => {
            // The map op stores an arena index; resolve it to the value itself.
            let value = map.value.and_then(|v| self.arena.get_value(v as usize));
            RawOpContent::Map(crate::container::map::MapSet {
                key: map.key.clone(),
                value,
            })
        }
        crate::op::InnerContent::Tree(tree) => RawOpContent::Tree(*tree),
    };

    let mut ans = SmallVec::with_capacity(1);
    ans.push(RemoteOp {
        container: container.clone(),
        content,
        counter: op.counter,
    });
    ans
}
/// Imports changes received from other peers.
///
/// The changes are validated with `check_changes` first. They are then handed to
/// `apply_appliable_changes_and_cache_pending` inside the arena's op converter, and the ids it
/// returns are passed on to `try_apply_pending`.
/// NOTE(review): those helpers are defined outside this file; presumably they apply what is
/// ready and park the rest in `pending_changes`.
///
/// # Errors
/// Propagates the error of `check_changes`; nothing has been applied at that point.
pub(crate) fn import_remote_changes(
&mut self,
remote_changes: RemoteClientChanges,
) -> Result<(), LoroError> {
self.check_changes(&remote_changes)?;
// Version vector as it was before any of the new changes is applied.
let latest_vv = self.dag.vv.clone();
// The arena handle is cloned so the closure can still borrow `self` mutably.
let ids = self.arena.clone().with_op_converter(|converter| {
self.apply_appliable_changes_and_cache_pending(remote_changes, converter, latest_vv)
});
// Take a fresh copy of the version vector for the pending pass.
let mut latest_vv = self.dag.vv.clone();
self.try_apply_pending(ids, &mut latest_vv);
// During a batch import the frontiers are not refreshed here.
if !self.batch_importing {
self.dag.refresh_frontiers();
}
Ok(())
}
/// Hands `remote_changes` to `extend_pending_changes_with_unknown_lamport`, together with a
/// copy of the current version vector, inside the arena's op converter.
///
/// This function itself always returns `Ok(())`.
pub(crate) fn import_unknown_lamport_remote_changes(
&mut self,
remote_changes: Vec<Change<RemoteOp>>,
) -> Result<(), LoroError> {
let latest_vv = self.dag.vv.clone();
// The arena handle is cloned so the closure can still borrow `self` mutably.
self.arena.clone().with_op_converter(|converter| {
self.extend_pending_changes_with_unknown_lamport(remote_changes, converter, &latest_vv)
});
Ok(())
}
/// Returns the stored change that contains `id`.
///
/// Yields `None` when the peer is unknown, when the peer's change list is empty, when
/// `id.counter` lies beyond the last stored op of the peer, or when the index lookup finds
/// nothing. The empty-list and failed-lookup cases used to panic; other code in this file
/// (`trim_the_known_part_of_change`) already treats an empty list as a valid state.
pub(crate) fn lookup_change(&self, id: ID) -> Option<&Change> {
    let changes = self.changes.get(&id.peer)?;
    let last = changes.last()?;
    // The explicit upper bound is kept from the original code instead of relying on the
    // index lookup alone to reject counters past the end.
    // NOTE(review): confirm how `get_by_atom_index` treats an index equal to the total length.
    if id.counter > last.id_last().counter {
        return None;
    }
    changes
        .get_by_atom_index(id.counter)
        .map(|found| found.element)
}
/// Returns the op that contains `id`: the change is found with `lookup_change`, then the op
/// is looked up inside it by `id.counter`.
#[allow(unused)]
pub(crate) fn lookup_op(&self, id: ID) -> Option<&crate::op::Op> {
    let change = self.lookup_change(id)?;
    let found = change.ops.get_by_atom_index(id.counter)?;
    Some(found.element)
}
/// Encodes this log relative to `vv` by calling `encode_oplog` with `EncodeMode::Auto`.
#[inline(always)]
pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
encode_oplog(self, vv, EncodeMode::Auto)
}
/// Decodes `data` into this log by calling `decode_oplog`, returning its result.
#[inline(always)]
pub fn decode(&mut self, data: &[u8]) -> Result<(), LoroError> {
decode_oplog(self, data)
}
/// Calls `f` on every stored change that lies between the two version vectors.
///
/// For each peer the smaller of the two counters is the start and the larger one the end, so
/// the argument order of `a` and `b` does not matter. Iteration begins at the change that
/// contains the start counter and stops at the first change that starts at or after the end.
/// Changes are passed whole; they are not sliced to the range.
pub(crate) fn for_each_change_within(
    &self,
    a: &VersionVector,
    b: &VersionVector,
    mut f: impl FnMut(&Change),
) {
    for (peer, changes) in self.changes.iter() {
        let cnt_a = a.get(peer).copied().unwrap_or(0);
        let cnt_b = b.get(peer).copied().unwrap_or(0);
        if cnt_a == cnt_b {
            continue;
        }
        let (from_cnt, to_cnt) = (cnt_a.min(cnt_b), cnt_a.max(cnt_b));
        let Some(start) = changes.get_by_atom_index(from_cnt) else {
            continue;
        };
        changes[start.merged_index..]
            .iter()
            .take_while(|change| change.id.counter < to_cnt)
            .for_each(|change| f(change));
    }
}
/// Iterates over stored changes starting from the common ancestors of `from` and `to`, in
/// the order produced by `dag.iter_causal`.
///
/// `from_frontiers` / `to_frontiers` may be passed when the caller already has them;
/// otherwise they are derived from the version vectors.
///
/// Returns the version vector of the common ancestors and an iterator of
/// `(change, cnt, vv)` where `cnt` is the counter at which the change was looked up and `vv`
/// is a shared, reused version vector that is rewritten on every step.
///
/// # Panics
/// The returned iterator panics if the caller still holds a borrow of the shared `vv` when
/// `next()` is called, or if no stored change is found at the computed counter. The function
/// panics if the common ancestors cannot be converted to a version vector.
pub(crate) fn iter_from_lca_causally(
&self,
from: &VersionVector,
from_frontiers: Option<&Frontiers>,
to: &VersionVector,
to_frontiers: Option<&Frontiers>,
) -> (
VersionVector,
impl Iterator<Item = (&Change, Counter, Rc<RefCell<VersionVector>>)>,
) {
debug_log::group!("iter_from_lca_causally");
let mut merged_vv = from.clone();
merged_vv.merge(to);
// Holders that keep locally computed frontiers alive for the borrows below.
let from_frontiers_inner;
let to_frontiers_inner;
let from_frontiers = match from_frontiers {
Some(f) => f,
None => {
from_frontiers_inner = Some(from.to_frontiers(&self.dag));
from_frontiers_inner.as_ref().unwrap()
}
};
let to_frontiers = match to_frontiers {
Some(t) => t,
None => {
to_frontiers_inner = Some(to.to_frontiers(&self.dag));
to_frontiers_inner.as_ref().unwrap()
}
};
let common_ancestors = self.dag.find_common_ancestor(from_frontiers, to_frontiers);
let common_ancestors_vv = self.dag.frontiers_to_vv(&common_ancestors).unwrap();
// NOTE(review): `.right` is presumably the part of `merged_vv` missing from the ancestors — confirm.
let diff = common_ancestors_vv.diff(&merged_vv).right;
let mut iter = self.dag.iter_causal(&common_ancestors, diff);
let mut node = iter.next();
// Resume point inside the current DAG node when it spans several stored changes; 0 means "start of node".
let mut cur_cnt = 0;
// One version vector shared by all yielded items.
let vv = Rc::new(RefCell::new(VersionVector::default()));
(
common_ancestors_vv.clone(),
std::iter::from_fn(move || {
if let Some(inner) = &node {
// Reset the shared vv to the vv recorded on the current DAG node.
let mut inner_vv = vv.borrow_mut();
inner_vv.clear();
inner_vv.extend_to_include_vv(inner.data.vv.iter());
let peer = inner.data.peer;
// Start at the latest of: node start, resume point, common-ancestor counter.
let cnt = inner
.data
.cnt
.max(cur_cnt)
.max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
// Do not go past the merged version for this peer.
let end = (inner.data.cnt + inner.data.len as Counter)
.min(merged_vv.get(&peer).copied().unwrap_or(0));
let change = self
.changes
.get(&peer)
.and_then(|x| x.get_by_atom_index(cnt).map(|x| x.element))
.unwrap();
if change.ctr_end() < end {
// More changes remain inside this node: stay on it and continue after this change.
cur_cnt = change.ctr_end();
} else {
node = iter.next();
cur_cnt = 0;
}
// NOTE(review): presumably makes the vv end right before the yielded change — confirm.
inner_vv.extend_to_include_end_id(change.id);
Some((change, cnt, vv.clone()))
} else {
debug_log::group_end!();
None
}
}),
)
}
/// Iterates over the stored changes between `from` and `to`, in the order produced by
/// `dag.iter_causal` starting at the frontiers of `from`.
///
/// Each item is `(change, vv)` where `vv` is a shared, reused version vector that is
/// rewritten on every step.
///
/// # Panics
/// The returned iterator panics if the caller still holds a borrow of the shared `vv` when
/// `next()` is called, or if no stored change is found at the computed counter.
pub(crate) fn iter_causally(
&self,
from: VersionVector,
to: VersionVector,
) -> impl Iterator<Item = (&Change, Rc<RefCell<VersionVector>>)> {
let from_frontiers = from.to_frontiers(&self.dag);
// NOTE(review): `.right` is presumably the part of `to` missing from `from` — confirm.
let diff = from.diff(&to).right;
let mut iter = self.dag.iter_causal(&from_frontiers, diff);
let mut node = iter.next();
// Resume point inside the current DAG node when it spans several stored changes; 0 means "start of node".
let mut cur_cnt = 0;
// One version vector shared by all yielded items.
let vv = Rc::new(RefCell::new(VersionVector::default()));
std::iter::from_fn(move || {
if let Some(inner) = &node {
// Reset the shared vv to the vv recorded on the current DAG node.
let mut inner_vv = vv.borrow_mut();
inner_vv.clear();
inner_vv.extend_to_include_vv(inner.data.vv.iter());
let peer = inner.data.peer;
// Start at the latest of: node start, resume point, counter already covered by `from`.
let cnt = inner
.data
.cnt
.max(cur_cnt)
.max(from.get(&peer).copied().unwrap_or(0));
// Do not go past `to` for this peer.
let end = (inner.data.cnt + inner.data.len as Counter)
.min(to.get(&peer).copied().unwrap_or(0));
let change = self
.changes
.get(&peer)
.and_then(|x| x.get_by_atom_index(cnt).map(|x| x.element))
.unwrap();
if change.ctr_end() < end {
// More changes remain inside this node: stay on it and continue after this change.
cur_cnt = change.ctr_end();
} else {
node = iter.next();
cur_cnt = 0;
}
// NOTE(review): presumably makes the vv end right before the yielded change — confirm.
inner_vv.extend_to_include_end_id(change.id);
Some((change, vv.clone()))
} else {
None
}
})
}
/// Total number of stored changes across all peers.
pub(crate) fn len_changes(&self) -> usize {
    self.changes.values().map(Vec::len).sum()
}
/// Counts the changes, ops, atom ops and DAG nodes held by this log, prints the four totals
/// to stdout and returns them.
///
/// `total_dag_node` is the number of DAG nodes summed over all peers. It was previously
/// `self.dag.map.len()`, which is the number of peers that have a DAG row.
pub fn diagnose_size(&self) -> SizeInfo {
    let mut total_changes = 0;
    let mut total_ops = 0;
    let mut total_atom_ops = 0;
    // `dag.map` is keyed by peer; the nodes themselves live in the per-peer vectors.
    let total_dag_node: usize = self.dag.map.values().map(|nodes| nodes.len()).sum();
    for changes in self.changes.values() {
        total_changes += changes.len();
        for change in changes.iter() {
            total_ops += change.ops.len();
            total_atom_ops += change.atom_len();
        }
    }
    println!("total changes: {}", total_changes);
    println!("total ops: {}", total_ops);
    println!("total atom ops: {}", total_atom_ops);
    println!("total dag node: {}", total_dag_node);
    SizeInfo {
        total_changes,
        total_ops,
        total_atom_ops,
        total_dag_node,
    }
}
}
/// Totals reported by `OpLog::diagnose_size`.
#[derive(Debug)]
pub struct SizeInfo {
/// Number of stored changes, summed over all peers.
pub total_changes: usize,
/// Sum of `change.ops.len()` over all stored changes.
pub total_ops: usize,
/// Sum of `change.atom_len()` over all stored changes.
pub total_atom_ops: usize,
/// DAG size figure computed by `OpLog::diagnose_size`.
pub total_dag_node: usize,
}
/// The default log is the empty log built by [`OpLog::new`].
impl Default for OpLog {
fn default() -> Self {
Self::new()
}
}