use crate::clock::CompactTimestamp;
use crate::error::{CRDTError, CRDTResult};
use crate::memory::{MemoryConfig, NodeId};
use crate::traits::{BoundedCRDT, CRDT, RealTimeCRDT};
#[cfg(feature = "hardware-atomic")]
use core::cell::UnsafeCell;
#[cfg(feature = "hardware-atomic")]
use core::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "serde")]
use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// A bounded last-writer-wins (LWW) map CRDT with fixed inline capacity.
///
/// Entries are stored in a fixed-size array of `Option<Entry<K, V>>`; the
/// first `count` slots form the live prefix. Concurrent writes to the same
/// key are resolved by timestamp, with the writer's node id as tie-breaker.
///
/// With the `hardware-atomic` feature the entry array is wrapped in an
/// `UnsafeCell` and the count in an `AtomicUsize`, so mutation happens
/// through `&self` instead of `&mut self`.
#[derive(Debug)]
pub struct LWWMap<K, V, C: MemoryConfig, const CAPACITY: usize = 8> {
    /// Inline entry storage; only the first `count` slots are meaningful.
    #[cfg(not(feature = "hardware-atomic"))]
    entries: [Option<Entry<K, V>>; CAPACITY],
    /// Number of live entries at the front of `entries`.
    #[cfg(not(feature = "hardware-atomic"))]
    count: usize,
    // Atomic variant: same layout, but mutable through `&self`.
    #[cfg(feature = "hardware-atomic")]
    entries: UnsafeCell<[Option<Entry<K, V>>; CAPACITY]>,
    #[cfg(feature = "hardware-atomic")]
    count: AtomicUsize,
    /// Identifier of the local node; used when this map performs writes.
    node_id: NodeId,
    /// Ties the otherwise-unused `MemoryConfig` parameter to the type.
    _phantom: core::marker::PhantomData<C>,
}
// SAFETY: in the atomic configuration all cross-thread coordination goes
// through `count: AtomicUsize`; the `UnsafeCell` entry array is only touched
// by this crate's methods. `K: Send` and `V: Send` ensure the contained data
// may be accessed from another thread.
//
// BUG FIX: the impl previously targeted `LWWMap<K, V, C>`, i.e. only the
// default `CAPACITY = 8`; maps built via `with_capacity` with any other
// capacity were not `Sync`. The impl is now generic over `CAPACITY`.
//
// NOTE(review): the `&self` mutation paths (`insert`/`remove`) write the
// entry array without synchronizing with concurrent readers — confirm the
// intended concurrency contract before relying on `Sync` here.
#[cfg(feature = "hardware-atomic")]
unsafe impl<K, V, C: MemoryConfig, const CAPACITY: usize> Sync for LWWMap<K, V, C, CAPACITY>
where
    K: Send,
    V: Send,
    C: Send + Sync,
{
}
impl<K, V, C: MemoryConfig, const CAPACITY: usize> Clone for LWWMap<K, V, C, CAPACITY>
where
    K: Clone,
    V: Clone,
{
    /// Deep-copies the entry array and the live count; the clone keeps the
    /// same `node_id` as the source map.
    fn clone(&self) -> Self {
        #[cfg(not(feature = "hardware-atomic"))]
        return Self {
            entries: self.entries.clone(),
            count: self.count,
            node_id: self.node_id,
            _phantom: core::marker::PhantomData,
        };
        #[cfg(feature = "hardware-atomic")]
        {
            // SAFETY: shared read of the inline array, mirroring the access
            // pattern of every other `&self` method in the atomic build.
            let snapshot = unsafe { (*self.entries.get()).clone() };
            Self {
                entries: UnsafeCell::new(snapshot),
                count: AtomicUsize::new(self.count.load(Ordering::Relaxed)),
                node_id: self.node_id,
                _phantom: core::marker::PhantomData,
            }
        }
    }
}
/// A single key/value slot plus the metadata needed for LWW arbitration:
/// the write's timestamp and the id of the node that performed it.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
struct Entry<K, V> {
    key: K,
    value: V,
    /// Serialized as a bare `u64` via `compact_timestamp_serde`.
    #[cfg_attr(feature = "serde", serde(with = "compact_timestamp_serde"))]
    timestamp: CompactTimestamp,
    /// Node that produced the current value; tie-breaker on equal timestamps.
    node_id: NodeId,
}
/// Serde adapter that round-trips `CompactTimestamp` as a plain `u64`,
/// keeping the wire format free of any struct wrapper.
#[cfg(feature = "serde")]
mod compact_timestamp_serde {
    use super::*;
    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    /// Writes the timestamp as its raw `u64` representation.
    pub fn serialize<S>(timestamp: &CompactTimestamp, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        timestamp.as_u64().serialize(serializer)
    }

    /// Reads a raw `u64` and rebuilds the timestamp from it.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<CompactTimestamp, D::Error>
    where
        D: Deserializer<'de>,
    {
        u64::deserialize(deserializer).map(CompactTimestamp::new)
    }
}
#[cfg(feature = "serde")]
impl<K, V, C: MemoryConfig, const CAPACITY: usize> Serialize for LWWMap<K, V, C, CAPACITY>
where
    K: Serialize + Clone + PartialEq,
    V: Serialize + Clone + PartialEq,
{
    /// Serializes the map as a 3-field struct: the live `entries` prefix
    /// (only the first `count` slots), the `count`, and the local `node_id`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        use serde::ser::SerializeStruct;
        let mut state = serializer.serialize_struct("LWWMap", 3)?;
        #[cfg(not(feature = "hardware-atomic"))]
        {
            state.serialize_field("entries", &&self.entries[..self.count])?;
            state.serialize_field("count", &self.count)?;
        }
        #[cfg(feature = "hardware-atomic")]
        {
            let current_count = self.count.load(Ordering::Relaxed);
            // SAFETY: shared read of the inline entry array.
            let entries_ref = unsafe { &*self.entries.get() };
            state.serialize_field("entries", &&entries_ref[..current_count])?;
            // BUG FIX: was `¤t_count` — mojibake of `&current_count`
            // (`&curren` swallowed as the HTML entity `¤`), which does not
            // compile under `hardware-atomic` + `serde`.
            state.serialize_field("count", &current_count)?;
        }
        state.serialize_field("node_id", &self.node_id)?;
        state.end()
    }
}
#[cfg(feature = "serde")]
impl<'de, K, V, C: MemoryConfig, const CAPACITY: usize> Deserialize<'de>
    for LWWMap<K, V, C, CAPACITY>
where
    K: Deserialize<'de> + Clone + PartialEq,
    V: Deserialize<'de> + Clone + PartialEq,
{
    /// Deserializes the 3-field struct produced by the `Serialize` impl
    /// (`entries`, `count`, `node_id`), rebuilding the fixed-size entry
    /// array without any heap allocation.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use core::fmt;
        use serde::de::{self, MapAccess, Visitor};

        // Field-name discriminant for the three struct fields.
        #[derive(Deserialize)]
        #[serde(field_identifier, rename_all = "snake_case")]
        enum Field {
            Entries,
            Count,
            NodeId,
        }

        // Visitor that assembles the map from a serde map access.
        struct LWWMapVisitor<K, V, C: MemoryConfig, const CAPACITY: usize> {
            _phantom: core::marker::PhantomData<(K, V, C)>,
        }
        impl<'de, K, V, C: MemoryConfig, const CAPACITY: usize> Visitor<'de>
            for LWWMapVisitor<K, V, C, CAPACITY>
        where
            K: Deserialize<'de> + Clone + PartialEq,
            V: Deserialize<'de> + Clone + PartialEq,
        {
            type Value = LWWMap<K, V, C, CAPACITY>;

            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                formatter.write_str("struct LWWMap")
            }

            fn visit_map<A>(self, mut map: A) -> Result<LWWMap<K, V, C, CAPACITY>, A::Error>
            where
                A: MapAccess<'de>,
            {
                let mut entries = None;
                let mut count = None;
                let mut node_id = None;
                while let Some(key) = map.next_key()? {
                    match key {
                        Field::Entries => {
                            if entries.is_some() {
                                return Err(de::Error::duplicate_field("entries"));
                            }
                            use serde::de::SeqAccess;
                            // DeserializeSeed that fills a `[Option<Entry>; CAPACITY]`
                            // in place; needed because arrays of const-generic
                            // length have no blanket `Deserialize` impl here.
                            struct EntriesDeserializer<K, V, const CAPACITY: usize> {
                                _phantom: core::marker::PhantomData<(K, V)>,
                            }
                            impl<'de, K, V, const CAPACITY: usize> serde::de::DeserializeSeed<'de>
                                for EntriesDeserializer<K, V, CAPACITY>
                            where
                                K: Deserialize<'de>,
                                V: Deserialize<'de>,
                            {
                                type Value = [Option<Entry<K, V>>; CAPACITY];
                                fn deserialize<D>(
                                    self,
                                    deserializer: D,
                                ) -> Result<Self::Value, D::Error>
                                where
                                    D: serde::de::Deserializer<'de>,
                                {
                                    // Sequence visitor: writes each element into the
                                    // next array slot, rejecting overflow.
                                    struct EntriesVisitor<K, V, const CAPACITY: usize> {
                                        _phantom: core::marker::PhantomData<(K, V)>,
                                    }
                                    impl<'de, K, V, const CAPACITY: usize> serde::de::Visitor<'de>
                                        for EntriesVisitor<K, V, CAPACITY>
                                    where
                                        K: Deserialize<'de>,
                                        V: Deserialize<'de>,
                                    {
                                        type Value = [Option<Entry<K, V>>; CAPACITY];
                                        fn expecting(
                                            &self,
                                            formatter: &mut core::fmt::Formatter,
                                        ) -> core::fmt::Result {
                                            write!(
                                                formatter,
                                                "a sequence of at most {} entries",
                                                CAPACITY
                                            )
                                        }
                                        fn visit_seq<A>(
                                            self,
                                            mut seq: A,
                                        ) -> Result<Self::Value, A::Error>
                                        where
                                            A: SeqAccess<'de>,
                                        {
                                            // All slots start empty; unfilled tail stays `None`.
                                            let mut array = [const { None }; CAPACITY];
                                            let mut index = 0;
                                            while let Some(entry) =
                                                seq.next_element::<Option<Entry<K, V>>>()?
                                            {
                                                if index >= CAPACITY {
                                                    return Err(serde::de::Error::custom(
                                                        "too many entries for capacity",
                                                    ));
                                                }
                                                array[index] = entry;
                                                index += 1;
                                            }
                                            Ok(array)
                                        }
                                    }
                                    deserializer.deserialize_seq(EntriesVisitor::<K, V, CAPACITY> {
                                        _phantom: core::marker::PhantomData,
                                    })
                                }
                            }
                            let entries_array =
                                map.next_value_seed(EntriesDeserializer::<K, V, CAPACITY> {
                                    _phantom: core::marker::PhantomData,
                                })?;
                            entries = Some(entries_array);
                        }
                        Field::Count => {
                            if count.is_some() {
                                return Err(de::Error::duplicate_field("count"));
                            }
                            count = Some(map.next_value::<usize>()?);
                        }
                        Field::NodeId => {
                            if node_id.is_some() {
                                return Err(de::Error::duplicate_field("node_id"));
                            }
                            node_id = Some(map.next_value::<NodeId>()?);
                        }
                    }
                }
                let entries_array = entries.ok_or_else(|| de::Error::missing_field("entries"))?;
                let count = count.ok_or_else(|| de::Error::missing_field("count"))?;
                let node_id = node_id.ok_or_else(|| de::Error::missing_field("node_id"))?;
                // Reject a count that points past the inline storage.
                // NOTE(review): the `count` field is otherwise trusted — slots
                // beyond it are not checked to be `None`; confirm inputs are
                // from trusted peers.
                if count > CAPACITY {
                    return Err(de::Error::custom("count exceeds capacity"));
                }
                #[cfg(not(feature = "hardware-atomic"))]
                {
                    Ok(LWWMap {
                        entries: entries_array,
                        count,
                        node_id,
                        _phantom: core::marker::PhantomData,
                    })
                }
                #[cfg(feature = "hardware-atomic")]
                {
                    Ok(LWWMap {
                        entries: UnsafeCell::new(entries_array),
                        count: AtomicUsize::new(count),
                        node_id,
                        _phantom: core::marker::PhantomData,
                    })
                }
            }
        }
        const FIELDS: &[&str] = &["entries", "count", "node_id"];
        deserializer.deserialize_struct(
            "LWWMap",
            FIELDS,
            LWWMapVisitor {
                _phantom: core::marker::PhantomData,
            },
        )
    }
}
impl<K, V, C: MemoryConfig, const CAPACITY: usize> LWWMap<K, V, C, CAPACITY>
where
    K: Clone + PartialEq,
    V: Clone + PartialEq,
{
    /// Creates an empty map for `node_id` with a caller-chosen inline
    /// capacity; all slots start as `None` and the live count at zero.
    pub fn with_capacity(node_id: NodeId) -> Self {
        #[cfg(not(feature = "hardware-atomic"))]
        {
            Self {
                entries: [const { None }; CAPACITY],
                count: 0,
                node_id,
                _phantom: core::marker::PhantomData,
            }
        }
        #[cfg(feature = "hardware-atomic")]
        {
            Self {
                entries: UnsafeCell::new([const { None }; CAPACITY]),
                count: AtomicUsize::new(0),
                node_id,
                _phantom: core::marker::PhantomData,
            }
        }
    }
}
impl<K, V, C: MemoryConfig> LWWMap<K, V, C, 8>
where
    K: Clone + PartialEq,
    V: Clone + PartialEq,
{
    /// Creates an empty map with the default capacity of 8 entries.
    pub fn new(node_id: NodeId) -> Self {
        Self::with_capacity(node_id)
    }
}
impl<K, V, C: MemoryConfig, const CAPACITY: usize> LWWMap<K, V, C, CAPACITY>
where
K: Clone + PartialEq,
V: Clone + PartialEq,
{
#[cfg(not(feature = "hardware-atomic"))]
pub fn insert(&mut self, key: K, value: V, timestamp: u64) -> CRDTResult<bool> {
let new_timestamp = CompactTimestamp::new(timestamp);
for i in 0..self.count {
if let Some(entry) = &mut self.entries[i] {
if entry.key == key {
let should_update = if new_timestamp > entry.timestamp {
true } else if new_timestamp == entry.timestamp {
if self.node_id == entry.node_id {
true } else {
self.node_id > entry.node_id }
} else {
false };
if should_update {
entry.value = value;
entry.timestamp = new_timestamp;
entry.node_id = self.node_id;
}
return Ok(false); }
}
}
if self.count >= CAPACITY {
return Err(CRDTError::BufferOverflow);
}
self.entries[self.count] = Some(Entry {
key,
value,
timestamp: new_timestamp,
node_id: self.node_id,
});
self.count += 1;
Ok(true)
}
#[cfg(feature = "hardware-atomic")]
pub fn insert(&self, key: K, value: V, timestamp: u64) -> CRDTResult<bool> {
let new_timestamp = CompactTimestamp::new(timestamp);
let current_count = self.count.load(Ordering::Relaxed);
let entries_ptr = self.entries.get();
let entries_mut = unsafe { &mut *entries_ptr };
for i in 0..current_count {
if let Some(entry) = &mut entries_mut[i] {
if entry.key == key {
let should_update = if new_timestamp > entry.timestamp {
true } else if new_timestamp == entry.timestamp {
if self.node_id == entry.node_id {
true } else {
self.node_id > entry.node_id }
} else {
false };
if should_update {
entry.value = value;
entry.timestamp = new_timestamp;
entry.node_id = self.node_id;
}
return Ok(false); }
}
}
loop {
let current_count = self.count.load(Ordering::Relaxed);
let entries_ref = unsafe { &*entries_ptr };
for i in 0..current_count {
if let Some(entry) = &entries_ref[i] {
if entry.key == key {
let entries_mut = unsafe { &mut *entries_ptr };
if let Some(entry) = &mut entries_mut[i] {
let should_update = new_timestamp > entry.timestamp
|| (new_timestamp == entry.timestamp
&& self.node_id > entry.node_id);
if should_update {
entry.value = value;
entry.timestamp = new_timestamp;
entry.node_id = self.node_id;
}
}
return Ok(false);
}
}
}
if current_count >= CAPACITY {
return Err(CRDTError::BufferOverflow);
}
match self.count.compare_exchange_weak(
current_count,
current_count + 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => {
let entries_mut = unsafe { &mut *entries_ptr };
entries_mut[current_count] = Some(Entry {
key,
value,
timestamp: new_timestamp,
node_id: self.node_id,
});
return Ok(true);
}
Err(_) => {
continue;
}
}
}
}
pub fn get(&self, key: &K) -> Option<&V> {
#[cfg(not(feature = "hardware-atomic"))]
{
for entry in self.entries.iter().take(self.count) {
if let Some(entry) = entry {
if entry.key == *key {
return Some(&entry.value);
}
}
}
None
}
#[cfg(feature = "hardware-atomic")]
{
let current_count = self.count.load(Ordering::Relaxed);
let entries_ref = unsafe { &*self.entries.get() };
for entry in entries_ref.iter().take(current_count) {
if let Some(entry) = entry {
if entry.key == *key {
return Some(&entry.value);
}
}
}
None
}
}
pub fn get_timestamp(&self, key: &K) -> Option<CompactTimestamp> {
#[cfg(not(feature = "hardware-atomic"))]
{
for entry in self.entries.iter().take(self.count) {
if let Some(entry) = entry {
if entry.key == *key {
return Some(entry.timestamp);
}
}
}
None
}
#[cfg(feature = "hardware-atomic")]
{
let current_count = self.count.load(Ordering::Relaxed);
let entries_ref = unsafe { &*self.entries.get() };
for entry in entries_ref.iter().take(current_count) {
if let Some(entry) = entry {
if entry.key == *key {
return Some(entry.timestamp);
}
}
}
None
}
}
pub fn get_node_id(&self, key: &K) -> Option<NodeId> {
#[cfg(not(feature = "hardware-atomic"))]
{
for entry in self.entries.iter().take(self.count) {
if let Some(entry) = entry {
if entry.key == *key {
return Some(entry.node_id);
}
}
}
None
}
#[cfg(feature = "hardware-atomic")]
{
let current_count = self.count.load(Ordering::Relaxed);
let entries_ref = unsafe { &*self.entries.get() };
for entry in entries_ref.iter().take(current_count) {
if let Some(entry) = entry {
if entry.key == *key {
return Some(entry.node_id);
}
}
}
None
}
}
pub fn contains_key(&self, key: &K) -> bool {
self.get(key).is_some()
}
pub fn len(&self) -> usize {
#[cfg(not(feature = "hardware-atomic"))]
{
self.count
}
#[cfg(feature = "hardware-atomic")]
{
self.count.load(Ordering::Relaxed)
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn is_full(&self) -> bool {
self.len() >= CAPACITY
}
pub fn capacity(&self) -> usize {
CAPACITY
}
pub fn remaining_capacity(&self) -> usize {
CAPACITY - self.len()
}
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
#[cfg(not(feature = "hardware-atomic"))]
{
self.entries
.iter()
.take(self.count)
.filter_map(|opt| opt.as_ref().map(|entry| (&entry.key, &entry.value)))
}
#[cfg(feature = "hardware-atomic")]
{
let current_count = self.count.load(Ordering::Relaxed);
let entries_ref = unsafe { &*self.entries.get() };
entries_ref
.iter()
.take(current_count)
.filter_map(|opt| opt.as_ref().map(|entry| (&entry.key, &entry.value)))
}
}
pub fn keys(&self) -> impl Iterator<Item = &K> {
#[cfg(not(feature = "hardware-atomic"))]
{
self.entries
.iter()
.take(self.count)
.filter_map(|opt| opt.as_ref().map(|entry| &entry.key))
}
#[cfg(feature = "hardware-atomic")]
{
let current_count = self.count.load(Ordering::Relaxed);
let entries_ref = unsafe { &*self.entries.get() };
entries_ref
.iter()
.take(current_count)
.filter_map(|opt| opt.as_ref().map(|entry| &entry.key))
}
}
pub fn values(&self) -> impl Iterator<Item = &V> {
#[cfg(not(feature = "hardware-atomic"))]
{
self.entries
.iter()
.take(self.count)
.filter_map(|opt| opt.as_ref().map(|entry| &entry.value))
}
#[cfg(feature = "hardware-atomic")]
{
let current_count = self.count.load(Ordering::Relaxed);
let entries_ref = unsafe { &*self.entries.get() };
entries_ref
.iter()
.take(current_count)
.filter_map(|opt| opt.as_ref().map(|entry| &entry.value))
}
}
#[cfg(not(feature = "hardware-atomic"))]
pub fn remove(&mut self, key: &K) -> Option<V> {
for i in 0..self.count {
if let Some(entry) = &self.entries[i] {
if entry.key == *key {
let removed_value = entry.value.clone();
for j in i..(self.count - 1) {
self.entries[j] = self.entries[j + 1].take();
}
self.entries[self.count - 1] = None;
self.count -= 1;
return Some(removed_value);
}
}
}
None
}
#[cfg(feature = "hardware-atomic")]
pub fn remove(&self, key: &K) -> Option<V> {
let entries_ptr = self.entries.get();
let entries_mut = unsafe { &mut *entries_ptr };
loop {
let current_count = self.count.load(Ordering::Relaxed);
let mut found_index = None;
let mut removed_value = None;
let mut entry_timestamp = None;
let mut entry_node_id = None;
for i in 0..current_count {
if let Some(entry) = &entries_mut[i] {
if entry.key == *key {
found_index = Some(i);
removed_value = Some(entry.value.clone());
entry_timestamp = Some(entry.timestamp);
entry_node_id = Some(entry.node_id);
break;
}
}
}
if let Some(i) = found_index {
let removed_val = removed_value.unwrap();
for j in i..(current_count - 1) {
entries_mut[j] = entries_mut[j + 1].take();
}
entries_mut[current_count - 1] = None;
match self.count.compare_exchange_weak(
current_count,
current_count - 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => return Some(removed_val),
Err(_) => {
for j in (i + 1..current_count).rev() {
entries_mut[j] = entries_mut[j - 1].take();
}
entries_mut[i] = Some(Entry {
key: key.clone(),
value: removed_val,
timestamp: entry_timestamp.unwrap(),
node_id: entry_node_id.unwrap(),
});
continue; }
}
} else {
return None;
}
}
}
}
impl<K, V, C: MemoryConfig, const CAPACITY: usize> CRDT<C> for LWWMap<K, V, C, CAPACITY>
where
K: Clone + PartialEq + core::fmt::Debug,
V: Clone + PartialEq + core::fmt::Debug,
{
type Error = CRDTError;
fn merge(&mut self, other: &Self) -> CRDTResult<()> {
#[cfg(not(feature = "hardware-atomic"))]
{
for other_entry in other.entries.iter().take(other.count) {
if let Some(other_entry) = other_entry {
let mut found = false;
for i in 0..self.count {
if let Some(our_entry) = &mut self.entries[i] {
if our_entry.key == other_entry.key {
found = true;
let should_update = if other_entry.timestamp > our_entry.timestamp {
true } else if other_entry.timestamp == our_entry.timestamp {
if other_entry.node_id == our_entry.node_id {
true } else {
other_entry.node_id > our_entry.node_id }
} else {
false };
if should_update {
our_entry.value = other_entry.value.clone();
our_entry.timestamp = other_entry.timestamp;
our_entry.node_id = other_entry.node_id;
}
break;
}
}
}
if !found {
if self.count >= CAPACITY {
return Err(CRDTError::BufferOverflow);
}
self.entries[self.count] = Some(Entry {
key: other_entry.key.clone(),
value: other_entry.value.clone(),
timestamp: other_entry.timestamp,
node_id: other_entry.node_id,
});
self.count += 1;
}
}
}
}
#[cfg(feature = "hardware-atomic")]
{
let other_count = other.count.load(Ordering::Relaxed);
let other_entries_ref = unsafe { &*other.entries.get() };
let self_entries_mut = unsafe { &mut *self.entries.get() };
let mut self_count = self.count.load(Ordering::Relaxed);
for other_entry in other_entries_ref.iter().take(other_count) {
if let Some(other_entry) = other_entry {
let mut found = false;
for i in 0..self_count {
if let Some(our_entry) = &mut self_entries_mut[i] {
if our_entry.key == other_entry.key {
found = true;
let should_update = if other_entry.timestamp > our_entry.timestamp {
true } else if other_entry.timestamp == our_entry.timestamp {
if other_entry.node_id == our_entry.node_id {
true } else {
other_entry.node_id > our_entry.node_id }
} else {
false };
if should_update {
our_entry.value = other_entry.value.clone();
our_entry.timestamp = other_entry.timestamp;
our_entry.node_id = other_entry.node_id;
}
break;
}
}
}
if !found {
if self_count >= CAPACITY {
return Err(CRDTError::BufferOverflow);
}
self_entries_mut[self_count] = Some(Entry {
key: other_entry.key.clone(),
value: other_entry.value.clone(),
timestamp: other_entry.timestamp,
node_id: other_entry.node_id,
});
self_count += 1;
}
}
}
self.count.store(self_count, Ordering::Relaxed);
}
Ok(())
}
fn eq(&self, other: &Self) -> bool {
#[cfg(not(feature = "hardware-atomic"))]
{
if self.count != other.count {
return false;
}
for entry in self.entries.iter().take(self.count) {
if let Some(entry) = entry {
if let Some(other_value) = other.get(&entry.key) {
if entry.value != *other_value {
return false;
}
if let Some(other_timestamp) = other.get_timestamp(&entry.key) {
if entry.timestamp != other_timestamp {
return false;
}
}
if let Some(other_node_id) = other.get_node_id(&entry.key) {
if entry.node_id != other_node_id {
return false;
}
}
} else {
return false;
}
}
}
true
}
#[cfg(feature = "hardware-atomic")]
{
let self_count = self.count.load(Ordering::Relaxed);
let other_count = other.count.load(Ordering::Relaxed);
if self_count != other_count {
return false;
}
let self_entries_ref = unsafe { &*self.entries.get() };
for entry in self_entries_ref.iter().take(self_count) {
if let Some(entry) = entry {
if let Some(other_value) = other.get(&entry.key) {
if entry.value != *other_value {
return false;
}
if let Some(other_timestamp) = other.get_timestamp(&entry.key) {
if entry.timestamp != other_timestamp {
return false;
}
}
if let Some(other_node_id) = other.get_node_id(&entry.key) {
if entry.node_id != other_node_id {
return false;
}
}
} else {
return false;
}
}
}
true
}
}
    /// Total in-memory footprint; all storage is inline, so this is just the
    /// size of `Self`.
    fn size_bytes(&self) -> usize {
        core::mem::size_of::<Self>()
    }
fn validate(&self) -> CRDTResult<()> {
if self.node_id as usize >= C::MAX_NODES {
return Err(CRDTError::InvalidNodeId);
}
#[cfg(not(feature = "hardware-atomic"))]
{
if self.count > CAPACITY {
return Err(CRDTError::ConfigurationExceeded);
}
if self.count > C::MAX_MAP_ENTRIES {
return Err(CRDTError::ConfigurationExceeded);
}
for i in 0..self.count {
if let Some(entry_i) = &self.entries[i] {
for j in (i + 1)..self.count {
if let Some(entry_j) = &self.entries[j] {
if entry_i.key == entry_j.key {
return Err(CRDTError::InvalidState);
}
}
}
}
}
}
#[cfg(feature = "hardware-atomic")]
{
let current_count = self.count.load(Ordering::Relaxed);
let entries_ref = unsafe { &*self.entries.get() };
if current_count > CAPACITY {
return Err(CRDTError::ConfigurationExceeded);
}
if current_count > C::MAX_MAP_ENTRIES {
return Err(CRDTError::ConfigurationExceeded);
}
for i in 0..current_count {
if let Some(entry_i) = &entries_ref[i] {
for j in (i + 1)..current_count {
if let Some(entry_j) = &entries_ref[j] {
if entry_i.key == entry_j.key {
return Err(CRDTError::InvalidState);
}
}
}
}
}
}
Ok(())
}
    /// Produces a 32-bit digest of the current state.
    ///
    /// NOTE(review): the digest XORs the *addresses* of each key and value
    /// (`&entry.key as *const K as usize`), not their contents, so two maps
    /// with identical data hash differently and the value is not stable
    /// across runs or moves. A content-based hash would require `Hash`
    /// bounds on `K`/`V` (an interface change) — flagged rather than fixed.
    fn state_hash(&self) -> u32 {
        #[cfg(not(feature = "hardware-atomic"))]
        {
            let mut hash = 0u32;
            for entry in self.entries.iter().take(self.count) {
                if let Some(entry) = entry {
                    let key_ptr = &entry.key as *const K as usize;
                    let value_ptr = &entry.value as *const V as usize;
                    hash ^=
                        (key_ptr as u32) ^ (value_ptr as u32) ^ (entry.timestamp.as_u64() as u32);
                }
            }
            hash ^= self.count as u32;
            hash
        }
        #[cfg(feature = "hardware-atomic")]
        {
            let current_count = self.count.load(Ordering::Relaxed);
            // SAFETY: shared read of the inline entry array.
            let entries_ref = unsafe { &*self.entries.get() };
            let mut hash = 0u32;
            for entry in entries_ref.iter().take(current_count) {
                if let Some(entry) = entry {
                    let key_ptr = &entry.key as *const K as usize;
                    let value_ptr = &entry.value as *const V as usize;
                    hash ^=
                        (key_ptr as u32) ^ (value_ptr as u32) ^ (entry.timestamp.as_u64() as u32);
                }
            }
            hash ^= current_count as u32;
            hash
        }
    }
fn can_merge(&self, other: &Self) -> bool {
#[cfg(not(feature = "hardware-atomic"))]
{
let mut new_keys = 0;
for other_entry in other.entries.iter().take(other.count) {
if let Some(other_entry) = other_entry {
if !self.contains_key(&other_entry.key) {
new_keys += 1;
}
}
}
self.count + new_keys <= CAPACITY
}
#[cfg(feature = "hardware-atomic")]
{
let self_count = self.count.load(Ordering::Relaxed);
let other_count = other.count.load(Ordering::Relaxed);
let other_entries_ref = unsafe { &*other.entries.get() };
let mut new_keys = 0;
for other_entry in other_entries_ref.iter().take(other_count) {
if let Some(other_entry) = other_entry {
if !self.contains_key(&other_entry.key) {
new_keys += 1;
}
}
}
self_count + new_keys <= CAPACITY
}
}
}
impl<K, V, C: MemoryConfig, const CAPACITY: usize> BoundedCRDT<C> for LWWMap<K, V, C, CAPACITY>
where
    K: Clone + PartialEq + core::fmt::Debug,
    V: Clone + PartialEq + core::fmt::Debug,
{
    const MAX_SIZE_BYTES: usize = core::mem::size_of::<Self>();
    const MAX_ELEMENTS: usize = CAPACITY;

    /// Fixed footprint: all storage is inline in `Self`.
    fn memory_usage(&self) -> usize {
        core::mem::size_of::<Self>()
    }

    /// Number of live entries.
    fn element_count(&self) -> usize {
        self.len()
    }

    /// LWW entries carry no removable metadata; nothing can be reclaimed.
    fn compact(&mut self) -> CRDTResult<usize> {
        Ok(0)
    }

    fn can_add_element(&self) -> bool {
        self.element_count() < Self::MAX_ELEMENTS
    }
}
impl<K, V, C: MemoryConfig, const CAPACITY: usize> RealTimeCRDT<C> for LWWMap<K, V, C, CAPACITY>
where
    K: Clone + PartialEq + core::fmt::Debug,
    V: Clone + PartialEq + core::fmt::Debug,
{
    // Worst-case cycle budgets for bounded-time execution.
    const MAX_MERGE_CYCLES: u32 = 300;
    const MAX_VALIDATE_CYCLES: u32 = 150;
    const MAX_SERIALIZE_CYCLES: u32 = 200;

    /// Merge work is already bounded by `CAPACITY`; delegate directly.
    fn merge_bounded(&mut self, other: &Self) -> CRDTResult<()> {
        self.merge(other)
    }

    /// Validation is bounded by `CAPACITY`; delegate directly.
    fn validate_bounded(&self) -> CRDTResult<()> {
        self.validate()
    }

    /// No cycle budget is tracked by this implementation.
    fn remaining_budget(&self) -> Option<u32> {
        None
    }

    fn set_budget(&mut self, _cycles: u32) {
        // No-op: budgets are static for this CRDT.
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::DefaultConfig;
#[test]
fn test_new_map() {
let map = LWWMap::<u8, u32, DefaultConfig>::new(1);
assert!(map.is_empty());
assert_eq!(map.len(), 0);
assert_eq!(map.capacity(), 8);
assert_eq!(map.remaining_capacity(), 8);
assert!(!map.is_full());
assert_eq!(map.node_id(), 1);
}
#[test]
fn test_insert_and_get() {
let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
assert!(map.insert(1, 100, 1000).unwrap());
assert_eq!(map.len(), 1);
assert!(!map.is_empty());
assert_eq!(map.get(&1), Some(&100));
assert!(map.contains_key(&1));
assert!(!map.insert(1, 200, 2000).unwrap());
assert_eq!(map.len(), 1); assert_eq!(map.get(&1), Some(&200));
assert!(!map.insert(1, 300, 500).unwrap());
assert_eq!(map.get(&1), Some(&200));
assert!(map.insert(2, 400, 3000).unwrap());
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&400));
}
#[test]
fn test_timestamps_and_node_ids() {
let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
map.insert(1, 100, 1000).unwrap();
assert_eq!(map.get_timestamp(&1).unwrap().as_u64(), 1000);
assert_eq!(map.get_node_id(&1), Some(1));
assert_eq!(map.get_timestamp(&2), None);
assert_eq!(map.get_node_id(&2), None);
}
#[test]
fn test_capacity_limits() {
let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
for i in 0..8 {
assert!(map.insert(i, i as u32 * 10, 1000 + i as u64).is_ok());
}
assert!(map.is_full());
assert_eq!(map.remaining_capacity(), 0);
assert!(map.insert(8, 80, 2000).is_err());
}
#[test]
fn test_iterators() {
let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
map.insert(1, 10, 1000).unwrap();
map.insert(3, 30, 1001).unwrap();
map.insert(2, 20, 1002).unwrap();
let mut pairs = [(0u8, 0u32); 3];
let mut i = 0;
for (&k, &v) in map.iter() {
pairs[i] = (k, v);
i += 1;
}
pairs.sort_by_key(|&(k, _)| k);
assert_eq!(pairs, [(1, 10), (2, 20), (3, 30)]);
let mut keys = [0u8; 3];
let mut i = 0;
for &k in map.keys() {
keys[i] = k;
i += 1;
}
keys.sort();
assert_eq!(keys, [1, 2, 3]);
let mut values = [0u32; 3];
let mut i = 0;
for &v in map.values() {
values[i] = v;
i += 1;
}
values.sort();
assert_eq!(values, [10, 20, 30]);
}
#[test]
fn test_merge() {
let mut map1 = LWWMap::<u8, u32, DefaultConfig>::new(1);
let mut map2 = LWWMap::<u8, u32, DefaultConfig>::new(2);
map1.insert(1, 10, 1000).unwrap();
map1.insert(2, 20, 1001).unwrap();
map2.insert(2, 25, 2000).unwrap(); map2.insert(3, 30, 2001).unwrap();
assert_eq!(map1.len(), 2);
assert_eq!(map2.len(), 2);
map1.merge(&map2).unwrap();
assert_eq!(map1.len(), 3);
assert_eq!(map1.get(&1), Some(&10)); assert_eq!(map1.get(&2), Some(&25)); assert_eq!(map1.get(&3), Some(&30)); }
#[test]
fn test_merge_tiebreaker() {
let mut map1 = LWWMap::<u8, u32, DefaultConfig>::new(1);
let mut map2 = LWWMap::<u8, u32, DefaultConfig>::new(2);
map1.insert(1, 10, 1000).unwrap();
map2.insert(1, 20, 1000).unwrap();
map1.merge(&map2).unwrap();
assert_eq!(map1.get(&1), Some(&20)); }
#[test]
fn test_merge_overflow() {
let mut map1 = LWWMap::<u8, u32, DefaultConfig>::new(1);
let mut map2 = LWWMap::<u8, u32, DefaultConfig>::new(2);
for i in 0..8 {
map1.insert(i, i as u32 * 10, 1000).unwrap();
}
map2.insert(100, 1000, 2000).unwrap();
assert!(map1.merge(&map2).is_err());
}
#[test]
fn test_merge_idempotent() {
let mut map1 = LWWMap::<u8, u32, DefaultConfig>::new(1);
let map2 = LWWMap::<u8, u32, DefaultConfig>::new(2);
map1.insert(1, 10, 1000).unwrap();
map1.merge(&map2).unwrap();
let len1 = map1.len();
map1.merge(&map2).unwrap();
let len2 = map1.len();
assert_eq!(len1, len2);
}
#[test]
fn test_merge_commutative() {
let mut map1a = LWWMap::<u8, u32, DefaultConfig>::new(1);
let mut map1b = LWWMap::<u8, u32, DefaultConfig>::new(1);
let mut map2 = LWWMap::<u8, u32, DefaultConfig>::new(2);
let mut map3 = LWWMap::<u8, u32, DefaultConfig>::new(3);
map1a.insert(1, 10, 1000).unwrap();
map1b.insert(1, 10, 1000).unwrap();
map2.insert(2, 20, 2000).unwrap();
map3.insert(3, 30, 3000).unwrap();
map1a.merge(&map2).unwrap();
map1a.merge(&map3).unwrap();
map1b.merge(&map3).unwrap();
map1b.merge(&map2).unwrap();
assert_eq!(map1a.len(), map1b.len());
assert!(map1a.eq(&map1b));
}
#[test]
fn test_bounded_crdt() {
let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
map.insert(1, 10, 1000).unwrap();
assert_eq!(map.element_count(), 1);
assert!(map.memory_usage() > 0);
assert!(map.can_add_element());
for i in 2..8 {
map.insert(i, i as u32 * 10, 1000 + i as u64).unwrap();
}
assert_eq!(map.element_count(), 7);
assert!(map.can_add_element());
map.insert(8, 80, 2000).unwrap();
assert_eq!(map.element_count(), 8);
assert!(!map.can_add_element());
}
#[test]
fn test_validation() {
let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
map.insert(1, 10, 1000).unwrap();
assert!(map.validate().is_ok());
}
#[test]
fn test_real_time_crdt() {
let mut map1 = LWWMap::<u8, u32, DefaultConfig>::new(1);
let map2 = LWWMap::<u8, u32, DefaultConfig>::new(2);
assert!(map1.merge_bounded(&map2).is_ok());
assert!(map1.validate_bounded().is_ok());
}
#[test]
fn test_can_merge() {
let mut map1 = LWWMap::<u8, u32, DefaultConfig>::new(1);
let mut map2 = LWWMap::<u8, u32, DefaultConfig>::new(2);
for i in 0..8 {
map1.insert(i, i as u32 * 10, 1000).unwrap();
}
assert!(map1.can_merge(&map2));
map2.insert(5, 50, 2000).unwrap();
assert!(map1.can_merge(&map2));
map2.insert(100, 1000, 3000).unwrap();
assert!(!map1.can_merge(&map2));
}
#[test]
fn test_eq() {
let mut map1 = LWWMap::<u8, u32, DefaultConfig>::new(1);
let mut map2 = LWWMap::<u8, u32, DefaultConfig>::new(1);
assert!(map1.eq(&map2));
map1.insert(1, 10, 1000).unwrap();
map2.insert(1, 10, 1000).unwrap();
assert!(map1.eq(&map2));
map2.insert(2, 20, 1001).unwrap(); assert!(!map1.eq(&map2));
}
#[test]
fn test_with_capacity() {
let map = LWWMap::<u8, u32, DefaultConfig, 16>::with_capacity(1);
assert!(map.is_empty());
assert_eq!(map.len(), 0);
assert_eq!(map.capacity(), 16);
assert_eq!(map.remaining_capacity(), 16);
assert!(!map.is_full());
assert_eq!(map.node_id(), 1);
}
#[test]
fn test_custom_capacity_operations() {
let mut map = LWWMap::<u8, u32, DefaultConfig, 4>::with_capacity(1);
assert!(map.insert(1, 100, 1000).is_ok());
assert_eq!(map.len(), 1);
assert_eq!(map.get(&1), Some(&100));
assert_eq!(map.capacity(), 4);
for i in 2..=4 {
assert!(map.insert(i, i as u32 * 100, 1000 + i as u64).is_ok());
}
assert!(map.is_full());
assert_eq!(map.remaining_capacity(), 0);
assert!(map.insert(5, 500, 2000).is_err());
}
#[test]
fn test_capacity_merge() {
let mut map1 = LWWMap::<u8, u32, DefaultConfig, 4>::with_capacity(1);
let mut map2 = LWWMap::<u8, u32, DefaultConfig, 4>::with_capacity(2);
map1.insert(1, 10, 1000).unwrap();
map1.insert(2, 20, 1001).unwrap();
map2.insert(2, 25, 2000).unwrap(); map2.insert(3, 30, 2001).unwrap();
map1.merge(&map2).unwrap();
assert_eq!(map1.len(), 3);
assert_eq!(map1.get(&1), Some(&10)); assert_eq!(map1.get(&2), Some(&25)); assert_eq!(map1.get(&3), Some(&30)); }
#[cfg(all(test, feature = "serde"))]
mod serde_tests {
    use super::*;

    // NOTE(review): despite the module name, none of these tests performs an
    // actual serialize/deserialize round-trip — no serde format crate is in
    // scope here. They pin the observable map state that serialization would
    // have to preserve. Confirm that real round-trip coverage exists elsewhere.

    /// LWW overwrite within a single node: the later timestamp wins, and
    /// timestamps/node id are the state a serializer would capture.
    #[test]
    fn test_serialize_deserialize() {
        let mut m = LWWMap::<u8, u32, DefaultConfig>::new(1);
        m.insert(1, 100, 1000).unwrap();
        m.insert(2, 200, 1500).unwrap();
        m.insert(1, 150, 2000).unwrap(); // newer write to key 1 replaces 100

        assert_eq!(m.len(), 2);
        assert_eq!(m.get(&1), Some(&150));
        assert_eq!(m.get(&2), Some(&200));
        assert_eq!(m.get_timestamp(&1).unwrap().as_u64(), 2000);
        assert_eq!(m.get_timestamp(&2).unwrap().as_u64(), 1500);
        assert_eq!(m.node_id(), 1);
    }

    /// The same insert sequence yields the same observable state regardless
    /// of which (atomic vs. standard) backing is compiled in.
    #[test]
    fn test_atomic_vs_standard_compatibility() {
        let mut m = LWWMap::<u8, u32, DefaultConfig>::new(1);
        for (key, value, ts) in [(1u8, 100u32, 1000u64), (2, 200, 1500), (3, 300, 2000)] {
            m.insert(key, value, ts).unwrap();
        }
        assert_eq!(m.len(), 3);
        for (key, value) in [(1u8, 100u32), (2, 200), (3, 300)] {
            assert_eq!(m.get(&key), Some(&value));
        }
    }

    /// A freshly constructed map is in the empty state.
    #[test]
    fn test_empty_map_serialization() {
        let m = LWWMap::<u8, u32, DefaultConfig>::new(1);
        assert_eq!(m.len(), 0);
        assert!(m.is_empty());
        assert_eq!(m.node_id(), 1);
    }

    /// Merge follows LWW semantics: the higher timestamp wins, and a
    /// timestamp tie is resolved in favor of node 2's write here.
    #[test]
    fn test_lww_semantics_serialization() {
        let mut a = LWWMap::<u8, u32, DefaultConfig>::new(1);
        let mut b = LWWMap::<u8, u32, DefaultConfig>::new(2);

        a.insert(1, 100, 1000).unwrap();
        a.insert(2, 200, 1500).unwrap();

        b.insert(1, 150, 2000).unwrap(); // strictly newer than a's write
        b.insert(2, 250, 1500).unwrap(); // timestamp tie with a's write
        b.insert(3, 300, 1600).unwrap(); // present only in b

        a.merge(&b).unwrap();

        assert_eq!(a.len(), 3);
        assert_eq!(a.get(&1), Some(&150));
        assert_eq!(a.get(&2), Some(&250));
        assert_eq!(a.get(&3), Some(&300));
        for key in [1u8, 2, 3] {
            assert_eq!(a.get_node_id(&key), Some(2));
        }
    }

    /// A non-default CAPACITY parameter is honored by the constructor.
    #[test]
    fn test_custom_capacity_serialization() {
        let mut m = LWWMap::<u8, u32, DefaultConfig, 4>::with_capacity(1);
        for (key, value) in [(1u8, 100u32), (2, 200), (3, 300)] {
            m.insert(key, value, 900 + u64::from(key) * 100).unwrap();
        }
        assert_eq!(m.len(), 3);
        assert_eq!(m.capacity(), 4);
        for (key, value) in [(1u8, 100u32), (2, 200), (3, 300)] {
            assert_eq!(m.get(&key), Some(&value));
        }
    }
}
#[test]
fn test_remove_basic() {
    // Removing a present key returns its value and clears the slot;
    // removing an absent key is a no-op that returns None.
    let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
    for (i, (key, value)) in [(1u8, 100u32), (2, 200), (3, 300)].into_iter().enumerate() {
        map.insert(key, value, 1000 + i as u64).unwrap();
    }
    assert_eq!(map.len(), 3);

    assert_eq!(map.remove(&2), Some(200));
    assert_eq!(map.len(), 2);
    assert!(!map.contains_key(&2));
    assert_eq!(map.get(&2), None);

    // Untouched keys survive the removal.
    assert_eq!(map.get(&1), Some(&100));
    assert_eq!(map.get(&3), Some(&300));

    // Absent key: nothing changes.
    assert_eq!(map.remove(&99), None);
    assert_eq!(map.len(), 2);
}
#[test]
fn test_remove_and_reinsert() {
    // A removed key can be inserted again and behaves like a fresh entry.
    let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
    map.insert(1, 100, 1000).unwrap();
    assert_eq!(map.len(), 1);

    assert_eq!(map.remove(&1), Some(100));
    assert_eq!(map.len(), 0);
    assert!(!map.contains_key(&1));

    map.insert(1, 200, 2000).unwrap();
    assert_eq!(map.len(), 1);
    assert_eq!(map.get(&1), Some(&200));
}
#[test]
fn test_remove_capacity_freed() {
    // A remove on a full map frees exactly one slot, which a subsequent
    // insert can reuse.
    let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
    let mut ts = 1000u64;
    for key in 0u8..8 {
        map.insert(key, u32::from(key) * 10, ts).unwrap();
        ts += 1;
    }
    assert!(map.is_full());
    assert_eq!(map.remaining_capacity(), 0);

    map.remove(&3);
    assert!(!map.is_full());
    assert_eq!(map.remaining_capacity(), 1);
    assert_eq!(map.len(), 7);

    map.insert(99, 990, 2000).unwrap();
    assert_eq!(map.len(), 8);
    assert!(map.contains_key(&99));
}
#[test]
fn test_remove_all_entries() {
    // Removing every key returns the map to its pristine empty state,
    // including full remaining capacity.
    let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
    for (key, value, ts) in [(1u8, 100u32, 1000u64), (2, 200, 1001), (3, 300, 1002)] {
        map.insert(key, value, ts).unwrap();
    }

    for (key, value) in [(1u8, 100u32), (2, 200), (3, 300)] {
        assert_eq!(map.remove(&key), Some(value));
    }

    assert!(map.is_empty());
    assert_eq!(map.len(), 0);
    assert_eq!(map.remaining_capacity(), 8);
}
#[test]
fn test_remove_order_independence() {
    // Removing the same pair of keys in opposite orders converges to the
    // same observable state.
    let mut left = LWWMap::<u8, u32, DefaultConfig>::new(1);
    let mut right = LWWMap::<u8, u32, DefaultConfig>::new(1);
    for key in 1u8..=5 {
        let value = u32::from(key) * 10;
        let ts = 1000 + u64::from(key);
        left.insert(key, value, ts).unwrap();
        right.insert(key, value, ts).unwrap();
    }

    left.remove(&2);
    left.remove(&4);
    right.remove(&4);
    right.remove(&2);

    assert_eq!(left.len(), right.len());
    for survivor in [1u8, 3, 5] {
        assert_eq!(left.get(&survivor), right.get(&survivor));
    }
    for gone in [2u8, 4] {
        assert!(!left.contains_key(&gone));
        assert!(!right.contains_key(&gone));
    }
}
#[test]
fn test_remove_with_custom_capacity() {
    // remove() behaves the same under a non-default CAPACITY parameter.
    let mut map = LWWMap::<u8, u32, DefaultConfig, 4>::with_capacity(1);
    for key in 1u8..=4 {
        map.insert(key, u32::from(key) * 100, 1000 + u64::from(key)).unwrap();
    }
    assert_eq!(map.len(), 4);
    assert!(map.is_full());

    assert_eq!(map.remove(&2), Some(200));
    assert_eq!(map.len(), 3);
    assert!(!map.is_full());

    for (key, value) in [(1u8, 100u32), (3, 300), (4, 400)] {
        assert_eq!(map.get(&key), Some(&value));
    }
    assert!(!map.contains_key(&2));
}
#[test]
fn test_remove_empty_map() {
    // remove() on an empty map is a harmless no-op.
    let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
    assert_eq!(map.remove(&1), None);
    assert_eq!(map.len(), 0);
    assert!(map.is_empty());
}
#[test]
fn test_remove_single_entry() {
    // Removing the only entry empties the map completely.
    let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
    map.insert(42, 420, 1000).unwrap();
    assert_eq!(map.len(), 1);

    let removed = map.remove(&42);
    assert_eq!(removed, Some(420));
    assert_eq!(map.len(), 0);
    assert!(map.is_empty());
    assert!(!map.contains_key(&42));
}
#[test]
fn test_remove_preserves_order() {
    // After one removal, iteration yields exactly the three surviving
    // key/value pairs. NOTE(review): the results are sorted before being
    // checked, so iteration *order* itself is not actually pinned here —
    // only the surviving set of entries.
    let mut map = LWWMap::<u8, u32, DefaultConfig>::new(1);
    for (i, (key, value)) in [(1u8, 10u32), (2, 20), (3, 30), (4, 40)].into_iter().enumerate() {
        map.insert(key, value, 1000 + i as u64).unwrap();
    }
    map.remove(&2);

    // Collect into a fixed-size buffer (no heap allocation needed).
    let mut seen = [(&0u8, &0u32); 4];
    let mut n = 0;
    for pair in map.iter() {
        seen[n] = pair;
        n += 1;
    }
    seen[..n].sort_by_key(|&(k, _)| k);

    assert_eq!(n, 3);
    assert_eq!(seen[0], (&1, &10));
    assert_eq!(seen[1], (&3, &30));
    assert_eq!(seen[2], (&4, &40));
}
}