extern crate crdts;
use serde::{Deserialize, Serialize};
use std::cmp::{Eq, PartialEq};
use super::{Clock, LogOpMove, OpMove, State, Tree, TreeId, TreeMeta};
use crdts::Actor;
use log::debug;
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TreeReplica<ID: TreeId, TM: TreeMeta, A: Actor> {
state: State<ID, TM, A>, time: Clock<A>,
latest_time_by_replica: HashMap<A, Clock<A>>,
}
impl<ID: TreeId, TM: TreeMeta, A: Actor + std::fmt::Debug> TreeReplica<ID, TM, A> {
pub fn new(id: A) -> Self {
Self {
state: State::new(),
time: Clock::<A>::new(id, None),
latest_time_by_replica: HashMap::<A, Clock<A>>::new(),
}
}
pub fn opmove(&self, parent_id: ID, metadata: TM, child_id: ID) -> OpMove<ID, TM, A> {
OpMove::new(self.time.inc(), parent_id, metadata, child_id)
}
pub fn opmoves(&self, ops: Vec<(ID, TM, ID)>) -> Vec<OpMove<ID, TM, A>> {
let mut time = self.time.clone();
let mut opmoves = vec![];
for op in ops {
opmoves.push(OpMove::new(time.tick(), op.0, op.1, op.2));
}
opmoves
}
#[inline]
pub fn id(&self) -> &A {
self.time.actor_id()
}
#[inline]
pub fn time(&self) -> &Clock<A> {
&self.time
}
#[inline]
pub fn state(&self) -> &State<ID, TM, A> {
&self.state
}
#[inline]
pub fn tree(&self) -> &Tree<ID, TM> {
self.state.tree()
}
#[inline]
pub fn tree_mut(&mut self) -> &mut Tree<ID, TM> {
self.state.tree_mut()
}
pub fn apply_op(&mut self, op: OpMove<ID, TM, A>) {
self.time = self.time.merge(op.timestamp());
let id = op.timestamp().actor_id();
match self.latest_time_by_replica.get(id) {
Some(latest) if (op.timestamp() <= latest) => {
debug!(
"Clock not increased, current timestamp {:?}, provided is {:?}, dropping op!",
latest,
op.timestamp()
);
}
_ => {
self.latest_time_by_replica
.insert(op.timestamp().actor_id().clone(), op.timestamp().clone());
}
};
self.state.apply_op(op);
}
pub fn apply_ops(&mut self, ops: Vec<OpMove<ID, TM, A>>) {
for op in ops {
self.apply_op(op);
}
}
pub fn apply_ops_byref(&mut self, ops: &[OpMove<ID, TM, A>]) {
self.apply_ops(ops.to_vec())
}
pub fn apply_log_op(&mut self, log_op: LogOpMove<ID, TM, A>) {
self.apply_op(log_op.into());
}
pub fn apply_log_ops(&mut self, log_ops: Vec<LogOpMove<ID, TM, A>>) {
for log_op in log_ops {
self.apply_log_op(log_op);
}
}
pub fn causally_stable_threshold(&self) -> Option<&Clock<A>> {
let mut v: Vec<&Clock<A>> = self.latest_time_by_replica.values().collect();
v.sort();
v.reverse(); v.pop()
}
pub fn truncate_log(&mut self) -> bool {
let result = self.causally_stable_threshold();
match result.cloned() {
Some(t) => self.state.truncate_log_before(&t),
None => false,
}
}
}