use Error;
use dot::{SiteId, Counter as RCounter};
use std::borrow::Cow;
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Counter {
inner: CounterInner,
site_id: SiteId,
awaiting_site_id: Option<Op>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CounterState<'a>(Cow<'a, CounterInner>);
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub(crate) struct CounterInner(HashMap<SiteId, SiteInc>);
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SiteInc {
inc: i64,
counter: RCounter,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Op {
site_id: SiteId,
counter: RCounter,
inc: i64,
}
impl Counter {
pub fn new(value: i64) -> Self {
let site_id = 1;
let inner = CounterInner::new(value, site_id);
Counter{inner, site_id, awaiting_site_id: None}
}
pub fn get(&self) -> i64 {
self.inner.get()
}
pub fn increment(&mut self, amount: i64) -> Result<Op, Error> {
let op = self.inner.increment(amount, self.site_id);
if self.site_id == 0 {
self.awaiting_site_id = Some(op);
Err(Error::AwaitingSiteId)
} else {
Ok(op)
}
}
pub fn site_id(&self) -> SiteId {
self.site_id
}
pub fn state(&self) -> CounterState {
CounterState(Cow::Borrowed(&self.inner))
}
pub fn clone_state(&self) -> CounterState<'static> {
CounterState(Cow::Owned(self.inner.clone()))
}
pub fn into_state(self) -> CounterState<'static> {
CounterState(Cow::Owned(self.inner))
}
pub fn from_state(state: CounterState, site_id: Option<SiteId>) -> Result<Self, Error> {
let site_id = match site_id {
None => 0,
Some(0) => return Err(Error::InvalidSiteId),
Some(s) => s,
};
Ok(Counter{
inner: state.0.into_owned(),
site_id,
awaiting_site_id: None,
})
}
pub fn execute_op(&mut self, op: &Op) -> Option<i64> {
self.inner.execute_op(op)
}
pub fn validate_and_execute_op(&mut self, op: &Op, site_id: SiteId) -> Result<Option<i64>, Error> {
op.validate(site_id)?;
Ok(self.execute_op(op))
}
pub fn merge(&mut self, other: CounterState) {
self.inner.merge(other.0.into_owned())
}
pub fn add_site_id(&mut self, site_id: SiteId) -> Result<Option<Op>, Error> {
if self.site_id != 0 { return Err(Error::AlreadyHasSiteId) }
self.site_id = site_id;
self.inner.add_site_id(site_id);
if let Some(mut op) = self.awaiting_site_id.take() {
op.add_site_id(site_id);
Ok(Some(op))
} else {
Ok(None)
}
}
}
impl CounterInner {
fn new(inc: i64, site_id: SiteId) -> Self {
let mut map = HashMap::new();
map.insert(site_id, SiteInc{inc, counter: 1});
CounterInner(map)
}
fn get(&self) -> i64 {
self.0.values().fold(0, |sum, site_count| sum + site_count.inc)
}
fn increment(&mut self, amount: i64, site_id: SiteId) -> Op {
let site_inc = self.0
.entry(site_id)
.or_insert_with(|| SiteInc{inc: 0, counter: 0});
site_inc.inc += amount;
site_inc.counter += 1;
Op{site_id, counter: site_inc.counter, inc: site_inc.inc}
}
fn execute_op(&mut self, op: &Op) -> Option<i64> {
let Op{site_id, counter, inc} = *op;
let site_inc = self.0
.entry(site_id)
.or_insert_with(|| SiteInc{inc: 0, counter: 0});
if site_inc.counter >= counter {
None
} else {
let diff = inc - site_inc.inc;
site_inc.counter = counter;
site_inc.inc = inc;
Some(diff)
}
}
fn merge(&mut self, other: CounterInner) {
for (site_id, SiteInc{inc, counter}) in other.0 {
let site_inc = self.0
.entry(site_id)
.or_insert_with(|| SiteInc{inc: 0, counter: 0});
if counter > site_inc.counter {
site_inc.inc = inc;
site_inc.counter = counter;
}
}
}
fn add_site_id(&mut self, site_id: SiteId) {
if let Some(site_inc) = self.0.remove(&0) {
self.0.insert(site_id, site_inc);
}
}
}
impl Op {
pub fn site_id(&self) -> SiteId { self.site_id }
pub fn counter(&self) -> RCounter { self.counter }
pub fn inc(&self) -> i64 { self.inc }
pub fn add_site_id(&mut self, site_id: SiteId) {
self.site_id = site_id;
}
pub fn validate(&self, site_id: SiteId) -> Result<(), Error> {
if self.site_id == site_id { Ok(()) } else { Err(Error::InvalidOp) }
}
}