use Error;
use dot::{Dot, Summary, SiteId};
use map_tuple_vec;
use traits::*;
use serde::ser::Serialize;
use serde::de::DeserializeOwned;
use std::borrow::{Borrow, Cow};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::hash::Hash;
pub trait Key: Clone + Eq + Hash + Serialize + DeserializeOwned {}
impl<T: Clone + Eq + Hash + Serialize + DeserializeOwned> Key for T {}
pub trait Value: Clone + PartialEq + Serialize + DeserializeOwned {}
impl<T: Clone + PartialEq + Serialize + DeserializeOwned> Value for T {}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(bound(deserialize = ""))]
pub struct Map<K: Key, V: Value> {
inner: Inner<K, V>,
summary: Summary,
site_id: SiteId,
cached_ops: Vec<Op<K, V>>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(bound(deserialize = ""))]
pub struct MapState<'a, K: Key + 'a, V: Value + 'a> {
inner: Cow<'a, Inner<K,V>>,
summary: Cow<'a, Summary>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[doc(hidden)]
pub struct Inner<K: Key, V: Value>(#[serde(with = "map_tuple_vec")] pub HashMap<K, Vec<Element<V>>>);
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Op<K, V> {
key: K,
inserted_element: Option<Element<V>>,
removed_dots: Vec<Dot>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum LocalOp<K, V> {
Insert{key: K, value: V},
Remove{key: K},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[doc(hidden)]
pub struct Element<V> {
pub value: V,
pub dot: Dot,
}
impl<V> PartialEq for Element<V> {
fn eq(&self, other: &Element<V>) -> bool {
self.dot == other.dot
}
}
impl<V> Eq for Element<V> {}
impl<V> PartialOrd for Element<V> {
fn partial_cmp(&self, other: &Element<V>) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<V> Ord for Element<V> {
fn cmp(&self, other: &Element<V>) -> Ordering {
self.dot.cmp(&other.dot)
}
}
impl<K: Key, V: Value> Map<K, V> {
pub fn new() -> Self {
let inner = Inner::new();
let summary = Summary::default();
let site_id = 1;
Map{inner, summary, site_id, cached_ops: vec![]}
}
pub fn contains_key(&self, key: &K) -> bool {
self.inner.0.contains_key(key)
}
pub fn get(&self, key: &K) -> Option<&V> {
let elements = self.inner.0.get(key)?;
Some(&elements[0].value)
}
pub fn insert(&mut self, key: K, value: V) -> Result<Op<K, V>, Error> {
let dot = self.summary.get_dot(self.site_id);
let op = self.inner.insert(key, value, dot);
self.after_op(op)
}
pub fn remove(&mut self, key: &K) -> Option<Result<Op<K,V>, Error>> {
let op = self.inner.remove(key)?;
Some(self.after_op(op))
}
crdt_impl2! {
Map,
MapState<K, V>,
MapState<'static, K, V>,
MapState,
Inner<K, V>,
Op<K, V>,
LocalOp<K, V>,
HashMap<K, V>,
}
}
impl<K: Key, V: Value> From<HashMap<K, V>> for Map<K, V> {
fn from(local_value: HashMap<K, V>) -> Self {
let mut map = Map::new();
for (k, v) in local_value { let _ = map.insert(k, v); }
map
}
}
impl<K: Key, V: Value> Inner<K, V> {
pub fn new() -> Self {
Inner(HashMap::new())
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn iter(&self) -> ::std::collections::hash_map::Iter<K,Vec<Element<V>>> {
self.0.iter()
}
pub fn get_mut<Q: ?Sized>(&mut self, key: &Q) -> Option<&mut Element<V>>
where Q: Hash + Eq,
K: Borrow<Q>,
{
let elements = self.0.get_mut(key)?;
Some(&mut elements[0])
}
pub fn get_mut_element<Q: ?Sized>(&mut self, key: &Q, dot: Dot) -> Option<&mut Element<V>>
where Q: Hash + Eq,
K: Borrow<Q>,
{
let elements = self.0.get_mut(key)?;
let idx = elements.binary_search_by(|e| e.dot.cmp(&dot)).ok()?;
Some(&mut elements[idx])
}
pub fn insert(&mut self, key: K, value: V, dot: Dot) -> Op<K, V> {
let inserted_element = Element{value, dot};
let removed_elements = self.0.insert(key.clone(), vec![inserted_element.clone()]).unwrap_or_else(|| vec![]);
let removed_dots = removed_elements.into_iter().map(|e| e.dot).collect();
Op{key, inserted_element: Some(inserted_element), removed_dots}
}
pub fn remove<Q: ?Sized>(&mut self, key: &Q) -> Option<Op<K, V>>
where Q: Hash + Eq + ToOwned<Owned = K>,
K: Borrow<Q>,
{
let removed_elements = self.0.remove(key)?;
let removed_dots = removed_elements.into_iter().map(|e| e.dot).collect();
Some(Op{key: key.to_owned(), inserted_element: None, removed_dots})
}
pub fn execute_op(&mut self, op: Op<K, V>) -> LocalOp<K, V> {
let mut elements = self.0.remove(&op.key).unwrap_or_else(|| vec![]);
elements.retain(|e| !op.removed_dots.contains(&e.dot));
if let Some(new_element) = op.inserted_element {
if let Err(idx) = elements.binary_search_by(|e| e.cmp(&new_element)) {
elements.insert(idx, new_element);
}
}
if elements.is_empty() {
LocalOp::Remove{key: op.key}
} else {
let value = elements[0].value.clone();
self.0.insert(op.key.clone(), elements);
LocalOp::Insert{key: op.key, value}
}
}
pub fn merge(&mut self, other: Self, summary: &Summary, other_summary: &Summary) {
let mut other_values = other.0;
self.0.retain(|key, elements| {
let mut other_elements = other_values.remove(key).unwrap_or_else(|| vec![]);
elements.retain(|e| other_elements.contains(e) || !other_summary.contains(&e.dot));
other_elements.retain(|e| !elements.contains(e) && !summary.contains(&e.dot));
elements.append(&mut other_elements);
elements.sort();
!elements.is_empty()
});
for (key, mut elements) in other_values {
elements.retain(|e| !summary.contains(&e.dot));
if !elements.is_empty() {
self.0.insert(key, elements);
}
}
}
pub fn add_site_id(&mut self, site_id: SiteId) {
for elements in self.0.values_mut() {
for element in elements {
if element.dot.site_id == 0 { element.dot.site_id = site_id };
}
}
}
pub fn validate_no_unassigned_sites(&self) -> Result<(), Error> {
for elements in self.0.values() {
for element in elements {
if element.dot.site_id == 0 {
return Err(Error::InvalidSiteId);
}
}
}
Ok(())
}
pub fn local_value(&self) -> HashMap<K, V> {
let mut hashmap = HashMap::with_capacity(self.0.len());
for (key, elements) in &self.0 {
hashmap.insert(key.clone(), elements[0].value.clone());
}
hashmap
}
}
impl<K: Key, V: Value + NestedInner> NestedInner for Inner<K, V> {
fn nested_add_site_id(&mut self, site_id: SiteId) {
for elements in self.0.values_mut() {
for element in elements {
element.value.nested_add_site_id(site_id);
if element.dot.site_id == 0 {
element.dot.site_id = site_id;
}
}
}
}
fn nested_validate_no_unassigned_sites(&self) -> Result<(), Error> {
for elements in self.0.values() {
for element in elements {
if element.dot.site_id == 0 {
return Err(Error::InvalidSiteId);
}
element.value.nested_validate_no_unassigned_sites()?;
}
}
Ok(())
}
fn nested_validate_all(&self, site_id: SiteId) -> Result<(), Error> {
for elements in self.0.values() {
for element in elements {
if element.dot.site_id != site_id {
return Err(Error::InvalidSiteId);
}
element.value.nested_validate_all(site_id)?;
}
}
Ok(())
}
fn nested_can_merge(&self, other: &Inner<K,V>) -> bool {
for (key, elements) in &self.0 {
if let Some(other_elements) = other.0.get(key) {
for element in elements {
if let Ok(idx) = other_elements.binary_search_by(|e| e.cmp(element)) {
let other_value = &other_elements[idx].value;
if !element.value.nested_can_merge(other_value) {
return false
}
}
}
}
}
true
}
fn nested_force_merge(&mut self, other: Inner<K, V>, summary: &Summary, other_summary: &Summary) {
let mut other_values = other.0;
self.0.retain(|key, elements| {
let mut other_elements = other_values.remove(key).unwrap_or_else(|| vec![]);
elements.retain(|e| other_elements.contains(e) || !other_summary.contains(&e.dot));
other_elements.retain(|e| elements.contains(e) || !summary.contains(&e.dot));
let (other_merge, mut other_insert) = other_elements.into_iter()
.partition(|e| elements.contains(e));
for element in other_merge {
let idx = elements.binary_search_by(|e| e.cmp(&element)).expect("Element must be present");
elements[idx].value.nested_force_merge(element.value, summary, other_summary);
}
elements.append(&mut other_insert);
elements.sort();
!elements.is_empty()
});
for (key, mut elements) in other_values {
elements.retain(|e| !summary.contains(&e.dot));
if !elements.is_empty() {
self.0.insert(key, elements);
}
}
}
}
impl<K: Key, V: Value> Op<K, V> {
pub fn key(&self) -> &K { &self.key }
pub fn inserted_element(&self) -> Option<&Element<V>> { self.inserted_element.as_ref() }
pub fn removed_dots(&self) -> &[Dot] { &self.removed_dots }
pub fn add_site_id(&mut self, site_id: SiteId) {
if let Some(ref mut e) = self.inserted_element {
if e.dot.site_id == 0 { e.dot.site_id = site_id };
}
for r in &mut self.removed_dots {
if r.site_id == 0 { r.site_id = site_id };
}
}
pub fn validate(&self, site_id: SiteId) -> Result<(), Error> {
if let Some(ref e) = self.inserted_element {
if e.dot.site_id != site_id { return Err(Error::InvalidOp) };
}
Ok(())
}
pub(crate) fn inserted_dots(&self) -> Vec<Dot> {
match self.inserted_element {
Some(ref e) => vec![e.dot],
None => vec![],
}
}
}
impl<K: Key, V: Value + NestedInner> NestedOp for Op<K, V> {
fn nested_add_site_id(&mut self, site_id: SiteId) {
if let Some(ref mut e) = self.inserted_element {
e.value.nested_add_site_id(site_id);
if e.dot.site_id == 0 { e.dot.site_id = site_id };
}
for r in &mut self.removed_dots {
if r.site_id == 0 { r.site_id = site_id };
}
}
fn nested_validate(&self, site_id: SiteId) -> Result<(), Error> {
if let Some(ref e) = self.inserted_element {
if e.dot.site_id != site_id { return Err(Error::InvalidOp) };
e.value.nested_validate_all(site_id)?;
}
Ok(())
}
}