use super::metadata::{Address, Entries, Entry, Index, Perm};
use crate::{Error, Result};
pub use crdts::{lseq::Op, Actor};
use crdts::{
lseq::{ident::Identifier, Entry as LSeqEntry, LSeq},
CmRDT,
};
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering::{Equal, Greater, Less},
collections::BTreeMap,
fmt::{self, Display},
hash::Hash,
};
// Arguments handed to `LSeq::new_with_args` for every LSeq created in this module
// (the policy sequence as well as each per-policy data sequence).
// NOTE(review): presumably these are the LSeq identifier-allocation tuning
// parameters (boundary and tree base) — confirm against the `crdts` docs.
const LSEQ_BOUNDARY: u64 = 1;
const LSEQ_TREE_BASE: u8 = 10;
/// A data operation produced by `SequenceCrdt::append` and consumed by
/// `SequenceCrdt::apply_data_op`.
#[derive(Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Hash)]
pub struct CrdtDataOperation<A: Actor + Display + std::fmt::Debug, T> {
/// Address of the Sequence the operation was generated on.
pub address: Address,
/// The underlying LSeq operation carrying the item.
pub crdt_op: Op<T, A>,
/// Identifier of the policy entry that was the latest one when the
/// operation was generated.
pub ctx: Identifier<A>,
}
/// A policy operation produced by `SequenceCrdt::set_policy` and consumed by
/// `SequenceCrdt::apply_policy_op`.
#[derive(Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Hash)]
pub struct CrdtPolicyOperation<A: Actor + Display + std::fmt::Debug, P> {
/// Address of the Sequence the operation was generated on.
pub address: Address,
/// The underlying LSeq operation; its value pairs the new policy with the
/// identifier of the last data item known when the policy was set (if any).
pub crdt_op: Op<(P, Option<Identifier<A>>), A>,
/// `None` when no policy existed before this one. Otherwise the identifier
/// of the previous policy entry together with the identifier of the last
/// data item at the time the new policy was set (if any).
pub ctx: Option<(Identifier<A>, Option<Identifier<A>>)>,
}
/// Sequence data type: a history of policies, each of which owns its own
/// LSeq of data entries.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd)]
pub struct SequenceCrdt<A, P>
where
A: Actor + Display + std::fmt::Debug,
P: Perm + Hash + Clone,
{
/// Address of this Sequence.
address: Address,
/// Data entries, keyed by the identifier of the policy entry they belong to.
data: BTreeMap<Identifier<A>, LSeq<Entry, A>>,
/// History of policies. Each element pairs a policy with the identifier of
/// the last data item that existed when that policy was set (if any).
policy: LSeq<(P, Option<Identifier<A>>), A>,
}
impl<A, P> Display for SequenceCrdt<A, P>
where
    A: Actor + Display + std::fmt::Debug,
    P: Perm + Hash + Clone,
{
    /// Renders the entries of the current policy's data as
    /// `[<entry>, <entry>, ...]`, decoding each entry as lossy UTF-8.
    /// When there is no current data sequence the output is `[]`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "[")?;
        if let Some(lseq) = self.current_lseq() {
            let mut entries = lseq.iter();
            // The first entry is written on its own so that the separator
            // only ever appears between two entries.
            if let Some(first) = entries.next() {
                write!(f, "<{}>", String::from_utf8_lossy(first))?;
                for entry in entries {
                    write!(f, ", ")?;
                    write!(f, "<{}>", String::from_utf8_lossy(entry))?;
                }
            }
        }
        write!(f, "]")
    }
}
impl<A, P> SequenceCrdt<A, P>
where
A: Actor + Display + std::fmt::Debug,
P: Perm + Hash + Clone,
{
/// Creates an empty Sequence at `address`.
///
/// The policy history is a fresh LSeq owned by `actor`; no data sequence
/// exists until a policy is set.
pub fn new(actor: A, address: Address) -> Self {
    let policy = LSeq::new_with_args(actor, LSEQ_TREE_BASE, LSEQ_BOUNDARY);
    let data = BTreeMap::default();
    Self {
        address,
        data,
        policy,
    }
}
pub fn address(&self) -> &Address {
&self.address
}
/// Returns the number of entries stored under the latest policy, or `0`
/// when there is no current data sequence.
pub fn len(&self) -> u64 {
    self.current_lseq().map_or(0, |lseq| lseq.len() as u64)
}
/// Returns the number of entries in the policy history.
pub fn policy_index(&self) -> u64 {
    let policies = self.policy.len();
    policies as u64
}
pub fn append(&mut self, entry: Entry) -> Result<CrdtDataOperation<A, Entry>> {
let address = *self.address();
match self.policy.last_entry() {
None => Err(Error::InvalidOperation),
Some(cur_policy) => match self.data.get_mut(&cur_policy.id) {
None => Err(Error::Unexpected(
"The data is an unexpected inconsistent state".to_string(),
)),
Some(lseq) => {
let crdt_op = lseq.append(entry);
Ok(CrdtDataOperation {
address,
crdt_op,
ctx: cur_policy.id.clone(),
})
}
},
}
}
pub fn apply_data_op(&mut self, op: CrdtDataOperation<A, Entry>) -> Result<()> {
let policy_id = op.ctx.clone();
if self.policy.find_entry(&policy_id).is_some() {
for LSeqEntry {
id,
val: (_, item_id),
..
} in self.policy.iter_entries()
{
let should_apply_op = match id.cmp(&policy_id) {
Equal => true,
Less => false,
Greater => match item_id {
None => true,
Some(item_id) => item_id >= op.crdt_op.id(),
},
};
if should_apply_op {
let lseq = self.data.get_mut(id).ok_or_else(|| {
Error::Unexpected(
"The data is an unexpected inconsistent state".to_string(),
)
})?;
lseq.apply(op.crdt_op.clone());
}
}
Ok(())
} else {
Err(Error::OpNotCausallyReady)
}
}
/// Sets a new policy and returns the operation to broadcast to other
/// replicas.
///
/// The new policy starts with a copy of the data sequence of the previous
/// policy. If there is no previous policy, or the previous policy has no
/// data sequence, it starts with an empty one. The returned operation's
/// `ctx` is `None` when there was no previous policy; otherwise it holds
/// the previous policy's identifier and the identifier of the last copied
/// item (if any).
pub fn set_policy(&mut self, policy: P) -> Result<CrdtPolicyOperation<A, P>> {
    let prev_policy_id = self
        .policy
        .last_entry()
        .map(|cur_policy| cur_policy.id.clone());

    // Data inherited from the previous policy, when it exists.
    let inherited = prev_policy_id
        .as_ref()
        .and_then(|id| self.data.get(id))
        .cloned();
    let new_lseq = match inherited {
        Some(lseq) => lseq,
        None => LSeq::new_with_args(self.policy.actor(), LSEQ_TREE_BASE, LSEQ_BOUNDARY),
    };

    // Record the last item known at the time the policy is created.
    let cur_last_item = new_lseq.last_entry().map(|entry| entry.id.clone());
    let crdt_op = self.policy.append((policy, cur_last_item.clone()));

    let _ = self.data.insert(crdt_op.id().clone(), new_lseq);

    Ok(CrdtPolicyOperation {
        address: *self.address(),
        crdt_op,
        ctx: prev_policy_id.map(|policy_id| (policy_id, cur_last_item)),
    })
}
pub fn apply_policy_op(&mut self, op: CrdtPolicyOperation<A, P>) -> Result<()> {
let new_lseq = if let Some((policy_id, item_id)) = op.ctx {
if self.policy.find_entry(&policy_id).is_none() {
return Err(Error::OpNotCausallyReady);
} else {
let lseq = self.data.get(&policy_id).ok_or_else(|| {
Error::Unexpected("The data is an unexpected inconsistent state".to_string())
})?;
match item_id {
None => {
lseq.clone()
}
Some(id) => {
let actor = self.policy.actor();
let mut new_lseq =
LSeq::new_with_args(actor, LSEQ_TREE_BASE, LSEQ_BOUNDARY);
lseq.iter_entries().for_each(|entry| {
if entry.id <= id {
let op = Op::Insert {
id: entry.id.clone(),
dot: entry.dot.clone(),
val: entry.val.clone(),
};
new_lseq.apply(op);
}
});
new_lseq
}
}
}
} else {
let actor = self.policy.actor();
LSeq::new_with_args(actor, LSEQ_TREE_BASE, LSEQ_BOUNDARY)
};
let policy_id = op.crdt_op.id();
if !self.data.contains_key(policy_id) {
let _ = self.data.insert(policy_id.clone(), new_lseq);
}
self.policy.apply(op.crdt_op);
Ok(())
}
/// Returns the entry at `index` within the latest policy's data, or `None`
/// when the index cannot be resolved or no entry exists at that position.
pub fn get(&self, index: Index) -> Option<&Entry> {
    let count = self.len() as usize;
    let position = to_absolute_index(index, count)?;
    let lseq = self.current_lseq()?;
    lseq.get(position)
}
/// Returns the last entry of the latest policy's data, if any.
pub fn last_entry(&self) -> Option<&Entry> {
    let lseq = self.current_lseq()?;
    lseq.last()
}
/// Returns the policy at `index` in the policy history, or `None` when the
/// index cannot be resolved or no policy exists at that position.
pub fn policy(&self, index: impl Into<Index>) -> Option<&P> {
    let count = self.policy.len();
    let position = to_absolute_index(index.into(), count)?;
    self.policy.get(position).map(|(perm, _)| perm)
}
/// Returns clones of the entries of the latest policy's data that fall in
/// the half-open range `[start, end)`.
///
/// Both bounds are resolved against the current number of entries.
/// Returns `None` when either bound cannot be resolved, when `end` resolves
/// to a position before `start`, or when there is no current data
/// sequence. An empty range (`start == end`) yields an empty collection.
pub fn in_range(&self, start: Index, end: Index) -> Option<Entries> {
    let count = self.len() as usize;
    let start_index = to_absolute_index(start, count)?;
    let end_index = to_absolute_index(end, count)?;
    // Checked so that an inverted range is reported instead of underflowing.
    let items_to_take = end_index.checked_sub(start_index)?;

    self.current_lseq().map(|lseq| {
        lseq.iter()
            .skip(start_index)
            .take(items_to_take)
            .cloned()
            .collect::<Entries>()
    })
}
/// Returns the data sequence belonging to the latest policy, or `None`
/// when no policy has been set or the latest policy has no data sequence.
fn current_lseq(&self) -> Option<&LSeq<Entry, A>> {
    let cur_policy = self.policy.last_entry()?;
    self.data.get(&cur_policy.id)
}
}
/// Resolves `index` to an absolute position within a collection of `count`
/// items.
///
/// * `Index::FromStart(n)` resolves to `n` when `n <= count` (so the
///   one-past-the-end position is accepted), otherwise `None`.
/// * `Index::FromEnd(n)` resolves to `count - n`, or `None` when `n > count`.
///
/// Offsets that do not fit in `usize` resolve to `None` rather than being
/// truncated.
fn to_absolute_index(index: Index, count: usize) -> Option<usize> {
    match index {
        // Fully-qualified trait path so no additional `use` is required.
        Index::FromStart(offset) => std::convert::TryInto::<usize>::try_into(offset)
            .ok()
            .filter(|&position| position <= count),
        Index::FromEnd(offset) => std::convert::TryInto::<usize>::try_into(offset)
            .ok()
            .and_then(|from_end| count.checked_sub(from_end)),
    }
}