use crate::error::{CRDTError, CRDTResult};
use crate::memory::{MemoryConfig, NodeId};
use crate::traits::{BoundedCRDT, CRDT, RealTimeCRDT};
#[cfg(feature = "hardware-atomic")]
use core::sync::atomic::{AtomicU32, Ordering};
#[cfg(feature = "serde")]
use serde::{Deserialize, Deserializer, Serialize, Serializer};
#[derive(Debug)]
#[cfg_attr(feature = "aurix", repr(align(32)))] #[cfg_attr(feature = "stm32", repr(align(4)))] #[cfg_attr(feature = "cortex-m", repr(align(4)))] #[cfg_attr(feature = "riscv", repr(align(8)))] #[cfg_attr(
not(any(
feature = "aurix",
feature = "stm32",
feature = "cortex-m",
feature = "riscv"
)),
repr(align(4))
)] pub struct PNCounter<C: MemoryConfig, const CAPACITY: usize = 16> {
#[cfg(not(feature = "hardware-atomic"))]
positive: [u32; CAPACITY],
#[cfg(not(feature = "hardware-atomic"))]
negative: [u32; CAPACITY],
#[cfg(feature = "hardware-atomic")]
positive: [AtomicU32; CAPACITY],
#[cfg(feature = "hardware-atomic")]
negative: [AtomicU32; CAPACITY],
node_id: NodeId,
_phantom: core::marker::PhantomData<C>,
}
impl<C: MemoryConfig, const CAPACITY: usize> Clone for PNCounter<C, CAPACITY> {
fn clone(&self) -> Self {
#[cfg(not(feature = "hardware-atomic"))]
{
Self {
positive: self.positive,
negative: self.negative,
node_id: self.node_id,
_phantom: core::marker::PhantomData,
}
}
#[cfg(feature = "hardware-atomic")]
{
let new_positive = [const { AtomicU32::new(0) }; CAPACITY];
let new_negative = [const { AtomicU32::new(0) }; CAPACITY];
for i in 0..CAPACITY {
new_positive[i].store(self.positive[i].load(Ordering::Relaxed), Ordering::Relaxed);
new_negative[i].store(self.negative[i].load(Ordering::Relaxed), Ordering::Relaxed);
}
Self {
positive: new_positive,
negative: new_negative,
node_id: self.node_id,
_phantom: core::marker::PhantomData,
}
}
}
}
impl<C: MemoryConfig, const CAPACITY: usize> PNCounter<C, CAPACITY> {
pub fn with_capacity(node_id: NodeId) -> Self {
#[cfg(not(feature = "hardware-atomic"))]
{
Self {
positive: [0; CAPACITY],
negative: [0; CAPACITY],
node_id,
_phantom: core::marker::PhantomData,
}
}
#[cfg(feature = "hardware-atomic")]
{
Self {
positive: [const { AtomicU32::new(0) }; CAPACITY],
negative: [const { AtomicU32::new(0) }; CAPACITY],
node_id,
_phantom: core::marker::PhantomData,
}
}
}
}
impl<C: MemoryConfig> PNCounter<C, 16> {
pub fn new(node_id: NodeId) -> Self {
Self::with_capacity(node_id)
}
}
impl<C: MemoryConfig, const CAPACITY: usize> PNCounter<C, CAPACITY> {
#[cfg(not(feature = "hardware-atomic"))]
pub fn increment(&mut self, amount: u32) -> CRDTResult<()> {
if amount == 0 {
return Err(CRDTError::InvalidOperation);
}
let node_index = self.node_id as usize;
if node_index >= CAPACITY {
return Err(CRDTError::InvalidNodeId);
}
if self.positive[node_index] > u32::MAX - amount {
return Err(CRDTError::BufferOverflow);
}
self.positive[node_index] += amount;
Ok(())
}
#[cfg(feature = "hardware-atomic")]
pub fn increment(&self, amount: u32) -> CRDTResult<()> {
if amount == 0 {
return Err(CRDTError::InvalidOperation);
}
let node_index = self.node_id as usize;
if node_index >= CAPACITY {
return Err(CRDTError::InvalidNodeId);
}
let old_value = self.positive[node_index].fetch_add(amount, Ordering::Relaxed);
if old_value > u32::MAX - amount {
self.positive[node_index].fetch_sub(amount, Ordering::Relaxed);
return Err(CRDTError::BufferOverflow);
}
Ok(())
}
#[cfg(not(feature = "hardware-atomic"))]
pub fn decrement(&mut self, amount: u32) -> CRDTResult<()> {
if amount == 0 {
return Err(CRDTError::InvalidOperation);
}
let node_index = self.node_id as usize;
if node_index >= CAPACITY {
return Err(CRDTError::InvalidNodeId);
}
if self.negative[node_index] > u32::MAX - amount {
return Err(CRDTError::BufferOverflow);
}
self.negative[node_index] += amount;
Ok(())
}
#[cfg(feature = "hardware-atomic")]
pub fn decrement(&self, amount: u32) -> CRDTResult<()> {
if amount == 0 {
return Err(CRDTError::InvalidOperation);
}
let node_index = self.node_id as usize;
if node_index >= CAPACITY {
return Err(CRDTError::InvalidNodeId);
}
let old_value = self.negative[node_index].fetch_add(amount, Ordering::Relaxed);
if old_value > u32::MAX - amount {
self.negative[node_index].fetch_sub(amount, Ordering::Relaxed);
return Err(CRDTError::BufferOverflow);
}
Ok(())
}
#[cfg(not(feature = "hardware-atomic"))]
pub fn inc(&mut self) -> CRDTResult<()> {
self.increment(1)
}
#[cfg(feature = "hardware-atomic")]
pub fn inc(&self) -> CRDTResult<()> {
self.increment(1)
}
#[cfg(not(feature = "hardware-atomic"))]
pub fn dec(&mut self) -> CRDTResult<()> {
self.decrement(1)
}
#[cfg(feature = "hardware-atomic")]
pub fn dec(&self) -> CRDTResult<()> {
self.decrement(1)
}
pub fn value(&self) -> i64 {
#[cfg(not(feature = "hardware-atomic"))]
{
let positive_sum: u64 = self.positive.iter().map(|&x| x as u64).sum();
let negative_sum: u64 = self.negative.iter().map(|&x| x as u64).sum();
if positive_sum >= negative_sum {
(positive_sum - negative_sum) as i64
} else {
-((negative_sum - positive_sum) as i64)
}
}
#[cfg(feature = "hardware-atomic")]
{
let positive_sum: u64 = self
.positive
.iter()
.map(|atomic| atomic.load(Ordering::Relaxed) as u64)
.sum();
let negative_sum: u64 = self
.negative
.iter()
.map(|atomic| atomic.load(Ordering::Relaxed) as u64)
.sum();
if positive_sum >= negative_sum {
(positive_sum - negative_sum) as i64
} else {
-((negative_sum - positive_sum) as i64)
}
}
}
pub fn node_positive(&self, node_id: NodeId) -> u64 {
let node_index = node_id as usize;
if node_index < CAPACITY {
#[cfg(not(feature = "hardware-atomic"))]
{
self.positive[node_index] as u64
}
#[cfg(feature = "hardware-atomic")]
{
self.positive[node_index].load(Ordering::Relaxed) as u64
}
} else {
0
}
}
pub fn node_negative(&self, node_id: NodeId) -> u64 {
let node_index = node_id as usize;
if node_index < CAPACITY {
#[cfg(not(feature = "hardware-atomic"))]
{
self.negative[node_index] as u64
}
#[cfg(feature = "hardware-atomic")]
{
self.negative[node_index].load(Ordering::Relaxed) as u64
}
} else {
0
}
}
pub fn node_value(&self, node_id: NodeId) -> i64 {
let positive = self.node_positive(node_id) as i64;
let negative = self.node_negative(node_id) as i64;
positive - negative
}
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn capacity(&self) -> usize {
CAPACITY
}
pub fn positive_counters(&self) -> [u64; CAPACITY] {
let mut result = [0u64; CAPACITY];
#[cfg(not(feature = "hardware-atomic"))]
{
for i in 0..CAPACITY {
result[i] = self.positive[i] as u64;
}
}
#[cfg(feature = "hardware-atomic")]
{
for i in 0..CAPACITY {
result[i] = self.positive[i].load(Ordering::Relaxed) as u64;
}
}
result
}
pub fn negative_counters(&self) -> [u64; CAPACITY] {
let mut result = [0u64; CAPACITY];
#[cfg(not(feature = "hardware-atomic"))]
{
for i in 0..CAPACITY {
result[i] = self.negative[i] as u64;
}
}
#[cfg(feature = "hardware-atomic")]
{
for i in 0..CAPACITY {
result[i] = self.negative[i].load(Ordering::Relaxed) as u64;
}
}
result
}
pub fn is_empty(&self) -> bool {
#[cfg(not(feature = "hardware-atomic"))]
{
self.positive.iter().all(|&x| x == 0) && self.negative.iter().all(|&x| x == 0)
}
#[cfg(feature = "hardware-atomic")]
{
self.positive
.iter()
.all(|atomic| atomic.load(Ordering::Relaxed) == 0)
&& self
.negative
.iter()
.all(|atomic| atomic.load(Ordering::Relaxed) == 0)
}
}
pub fn active_nodes(&self) -> usize {
#[cfg(not(feature = "hardware-atomic"))]
{
let mut active = 0;
for i in 0..CAPACITY {
if self.positive[i] > 0 || self.negative[i] > 0 {
active += 1;
}
}
active
}
#[cfg(feature = "hardware-atomic")]
{
let mut active = 0;
for i in 0..CAPACITY {
if self.positive[i].load(Ordering::Relaxed) > 0
|| self.negative[i].load(Ordering::Relaxed) > 0
{
active += 1;
}
}
active
}
}
pub fn total_positive(&self) -> u64 {
#[cfg(not(feature = "hardware-atomic"))]
{
self.positive.iter().map(|&x| x as u64).sum()
}
#[cfg(feature = "hardware-atomic")]
{
self.positive
.iter()
.map(|atomic| atomic.load(Ordering::Relaxed) as u64)
.sum()
}
}
pub fn total_negative(&self) -> u64 {
#[cfg(not(feature = "hardware-atomic"))]
{
self.negative.iter().map(|&x| x as u64).sum()
}
#[cfg(feature = "hardware-atomic")]
{
self.negative
.iter()
.map(|atomic| atomic.load(Ordering::Relaxed) as u64)
.sum()
}
}
}
#[cfg(feature = "serde")]
impl<C: MemoryConfig> Serialize for PNCounter<C> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
use serde::ser::SerializeStruct;
let mut state = serializer.serialize_struct("PNCounter", 3)?;
#[cfg(not(feature = "hardware-atomic"))]
{
state.serialize_field("positive", &self.positive[..])?;
state.serialize_field("negative", &self.negative[..])?;
}
#[cfg(feature = "hardware-atomic")]
{
let mut positive = [0u32; 16];
let mut negative = [0u32; 16];
for i in 0..16 {
positive[i] = self.positive[i].load(Ordering::Relaxed);
negative[i] = self.negative[i].load(Ordering::Relaxed);
}
state.serialize_field("positive", &positive[..])?;
state.serialize_field("negative", &negative[..])?;
}
state.serialize_field("node_id", &self.node_id)?;
state.end()
}
}
#[cfg(feature = "serde")]
impl<'de, C: MemoryConfig> Deserialize<'de> for PNCounter<C> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
use core::fmt;
use serde::de::{self, MapAccess, Visitor};
#[derive(Deserialize)]
#[serde(field_identifier, rename_all = "lowercase")]
enum Field {
Positive,
Negative,
NodeId,
}
struct PNCounterVisitor<C: MemoryConfig> {
_phantom: core::marker::PhantomData<C>,
}
impl<'de, C: MemoryConfig> Visitor<'de> for PNCounterVisitor<C> {
type Value = PNCounter<C>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("struct PNCounter")
}
fn visit_map<V>(self, mut map: V) -> Result<PNCounter<C>, V::Error>
where
V: MapAccess<'de>,
{
let mut positive = None;
let mut negative = None;
let mut node_id = None;
while let Some(key) = map.next_key()? {
match key {
Field::Positive => {
if positive.is_some() {
return Err(de::Error::duplicate_field("positive"));
}
use serde::de::SeqAccess;
struct ArrayDeserializer<const N: usize>;
impl<'de, const N: usize> serde::de::DeserializeSeed<'de> for ArrayDeserializer<N> {
type Value = [u32; N];
fn deserialize<D>(
self,
deserializer: D,
) -> Result<Self::Value, D::Error>
where
D: serde::de::Deserializer<'de>,
{
struct ArrayVisitor<const N: usize>;
impl<'de, const N: usize> serde::de::Visitor<'de> for ArrayVisitor<N> {
type Value = [u32; N];
fn expecting(
&self,
formatter: &mut core::fmt::Formatter,
) -> core::fmt::Result
{
write!(formatter, "an array of {} u32 values", N)
}
fn visit_seq<A>(
self,
mut seq: A,
) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut array = [0u32; N];
for i in 0..N {
if let Some(value) = seq.next_element()? {
array[i] = value;
} else {
return Err(serde::de::Error::invalid_length(
i, &self,
));
}
}
Ok(array)
}
}
deserializer.deserialize_seq(ArrayVisitor::<N>)
}
}
positive = Some(map.next_value_seed(ArrayDeserializer::<16>)?);
}
Field::Negative => {
if negative.is_some() {
return Err(de::Error::duplicate_field("negative"));
}
use serde::de::SeqAccess;
struct ArrayDeserializer<const N: usize>;
impl<'de, const N: usize> serde::de::DeserializeSeed<'de> for ArrayDeserializer<N> {
type Value = [u32; N];
fn deserialize<D>(
self,
deserializer: D,
) -> Result<Self::Value, D::Error>
where
D: serde::de::Deserializer<'de>,
{
struct ArrayVisitor<const N: usize>;
impl<'de, const N: usize> serde::de::Visitor<'de> for ArrayVisitor<N> {
type Value = [u32; N];
fn expecting(
&self,
formatter: &mut core::fmt::Formatter,
) -> core::fmt::Result
{
write!(formatter, "an array of {} u32 values", N)
}
fn visit_seq<A>(
self,
mut seq: A,
) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut array = [0u32; N];
for i in 0..N {
if let Some(value) = seq.next_element()? {
array[i] = value;
} else {
return Err(serde::de::Error::invalid_length(
i, &self,
));
}
}
Ok(array)
}
}
deserializer.deserialize_seq(ArrayVisitor::<N>)
}
}
negative = Some(map.next_value_seed(ArrayDeserializer::<16>)?);
}
Field::NodeId => {
if node_id.is_some() {
return Err(de::Error::duplicate_field("node_id"));
}
node_id = Some(map.next_value()?);
}
}
}
let positive = positive.ok_or_else(|| de::Error::missing_field("positive"))?;
let negative = negative.ok_or_else(|| de::Error::missing_field("negative"))?;
let node_id = node_id.ok_or_else(|| de::Error::missing_field("node_id"))?;
#[cfg(not(feature = "hardware-atomic"))]
{
Ok(PNCounter {
positive,
negative,
node_id,
_phantom: core::marker::PhantomData,
})
}
#[cfg(feature = "hardware-atomic")]
{
let atomic_positive = [const { AtomicU32::new(0) }; 16];
let atomic_negative = [const { AtomicU32::new(0) }; 16];
for i in 0..16 {
atomic_positive[i].store(positive[i], Ordering::Relaxed);
atomic_negative[i].store(negative[i], Ordering::Relaxed);
}
Ok(PNCounter {
positive: atomic_positive,
negative: atomic_negative,
node_id,
_phantom: core::marker::PhantomData,
})
}
}
}
const FIELDS: &[&str] = &["positive", "negative", "node_id"];
deserializer.deserialize_struct(
"PNCounter",
FIELDS,
PNCounterVisitor {
_phantom: core::marker::PhantomData,
},
)
}
}
impl<C: MemoryConfig, const CAPACITY: usize> CRDT<C> for PNCounter<C, CAPACITY> {
type Error = CRDTError;
fn merge(&mut self, other: &Self) -> CRDTResult<()> {
#[cfg(not(feature = "hardware-atomic"))]
{
for i in 0..CAPACITY {
self.positive[i] = self.positive[i].max(other.positive[i]);
self.negative[i] = self.negative[i].max(other.negative[i]);
}
}
#[cfg(feature = "hardware-atomic")]
{
for i in 0..CAPACITY {
let other_pos_value = other.positive[i].load(Ordering::Relaxed);
let mut current_pos = self.positive[i].load(Ordering::Relaxed);
while other_pos_value > current_pos {
match self.positive[i].compare_exchange_weak(
current_pos,
other_pos_value,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current_pos = actual,
}
}
let other_neg_value = other.negative[i].load(Ordering::Relaxed);
let mut current_neg = self.negative[i].load(Ordering::Relaxed);
while other_neg_value > current_neg {
match self.negative[i].compare_exchange_weak(
current_neg,
other_neg_value,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current_neg = actual,
}
}
}
}
Ok(())
}
fn eq(&self, other: &Self) -> bool {
#[cfg(not(feature = "hardware-atomic"))]
{
self.positive == other.positive && self.negative == other.negative
}
#[cfg(feature = "hardware-atomic")]
{
for i in 0..CAPACITY {
if self.positive[i].load(Ordering::Relaxed)
!= other.positive[i].load(Ordering::Relaxed)
|| self.negative[i].load(Ordering::Relaxed)
!= other.negative[i].load(Ordering::Relaxed)
{
return false;
}
}
true
}
}
fn size_bytes(&self) -> usize {
core::mem::size_of::<Self>()
}
fn validate(&self) -> CRDTResult<()> {
if self.node_id as usize >= CAPACITY {
return Err(CRDTError::InvalidNodeId);
}
if self.node_id as usize >= C::MAX_NODES {
return Err(CRDTError::InvalidNodeId);
}
Ok(())
}
fn state_hash(&self) -> u32 {
let mut hash = 0u32;
#[cfg(not(feature = "hardware-atomic"))]
{
for (i, (&pos, &neg)) in self.positive.iter().zip(self.negative.iter()).enumerate() {
if pos > 0 || neg > 0 {
hash ^= pos ^ (neg << 8) ^ ((i as u32) << 16);
}
}
}
#[cfg(feature = "hardware-atomic")]
{
for (i, (pos_atomic, neg_atomic)) in
self.positive.iter().zip(self.negative.iter()).enumerate()
{
let pos = pos_atomic.load(Ordering::Relaxed);
let neg = neg_atomic.load(Ordering::Relaxed);
if pos > 0 || neg > 0 {
hash ^= pos ^ (neg << 8) ^ ((i as u32) << 16);
}
}
}
hash
}
fn can_merge(&self, _other: &Self) -> bool {
true
}
}
impl<C: MemoryConfig, const CAPACITY: usize> BoundedCRDT<C> for PNCounter<C, CAPACITY> {
const MAX_SIZE_BYTES: usize = core::mem::size_of::<Self>();
const MAX_ELEMENTS: usize = CAPACITY;
fn memory_usage(&self) -> usize {
core::mem::size_of::<Self>()
}
fn element_count(&self) -> usize {
self.active_nodes()
}
fn compact(&mut self) -> CRDTResult<usize> {
Ok(0)
}
fn can_add_element(&self) -> bool {
self.element_count() < Self::MAX_ELEMENTS
}
}
impl<C: MemoryConfig, const CAPACITY: usize> RealTimeCRDT<C> for PNCounter<C, CAPACITY> {
const MAX_MERGE_CYCLES: u32 = 100; const MAX_VALIDATE_CYCLES: u32 = 30;
const MAX_SERIALIZE_CYCLES: u32 = 150;
fn merge_bounded(&mut self, other: &Self) -> CRDTResult<()> {
self.merge(other)
}
fn validate_bounded(&self) -> CRDTResult<()> {
self.validate()
}
fn remaining_budget(&self) -> Option<u32> {
None
}
fn set_budget(&mut self, _cycles: u32) {
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::DefaultConfig;
#[test]
fn test_new_counter() {
let counter = PNCounter::<DefaultConfig>::new(1);
assert_eq!(counter.value(), 0);
assert_eq!(counter.node_id(), 1);
assert!(counter.is_empty());
assert_eq!(counter.active_nodes(), 0);
assert_eq!(counter.total_positive(), 0);
assert_eq!(counter.total_negative(), 0);
}
#[test]
fn test_increment() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
assert!(counter.increment(5).is_ok());
assert_eq!(counter.value(), 5);
assert_eq!(counter.node_positive(1), 5);
assert_eq!(counter.node_negative(1), 0);
assert_eq!(counter.node_value(1), 5);
assert!(!counter.is_empty());
assert_eq!(counter.active_nodes(), 1);
}
#[test]
fn test_decrement() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
assert!(counter.decrement(3).is_ok());
assert_eq!(counter.value(), -3);
assert_eq!(counter.node_positive(1), 0);
assert_eq!(counter.node_negative(1), 3);
assert_eq!(counter.node_value(1), -3);
assert!(!counter.is_empty());
assert_eq!(counter.active_nodes(), 1);
}
#[test]
fn test_inc_dec() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
assert!(counter.inc().is_ok());
assert_eq!(counter.value(), 1);
assert!(counter.dec().is_ok());
assert_eq!(counter.value(), 0);
assert!(counter.dec().is_ok());
assert_eq!(counter.value(), -1);
}
#[test]
fn test_mixed_operations() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
counter.increment(10).unwrap();
counter.decrement(3).unwrap();
counter.increment(2).unwrap();
counter.decrement(1).unwrap();
assert_eq!(counter.value(), 8); assert_eq!(counter.node_positive(1), 12);
assert_eq!(counter.node_negative(1), 4);
assert_eq!(counter.total_positive(), 12);
assert_eq!(counter.total_negative(), 4);
}
#[test]
fn test_invalid_operations() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
assert!(counter.increment(0).is_err());
assert!(counter.decrement(0).is_err());
assert_eq!(counter.value(), 0);
}
#[test]
fn test_overflow_protection() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
#[cfg(not(feature = "hardware-atomic"))]
{
counter.positive[1] = u32::MAX - 1;
}
#[cfg(feature = "hardware-atomic")]
{
counter.positive[1].store(u32::MAX - 1, Ordering::Relaxed);
}
assert!(counter.increment(1).is_ok());
assert_eq!(counter.node_positive(1), u32::MAX as u64);
assert!(counter.increment(1).is_err());
#[cfg(not(feature = "hardware-atomic"))]
{
counter.negative[1] = u32::MAX - 1;
}
#[cfg(feature = "hardware-atomic")]
{
counter.negative[1].store(u32::MAX - 1, Ordering::Relaxed);
}
assert!(counter.decrement(1).is_ok());
assert_eq!(counter.node_negative(1), u32::MAX as u64);
assert!(counter.decrement(1).is_err());
}
#[test]
fn test_merge() {
let mut counter1 = PNCounter::<DefaultConfig>::new(1);
let mut counter2 = PNCounter::<DefaultConfig>::new(2);
counter1.increment(10).unwrap();
counter1.decrement(2).unwrap();
counter2.increment(5).unwrap();
counter2.decrement(1).unwrap();
assert_eq!(counter1.value(), 8); assert_eq!(counter2.value(), 4);
counter1.merge(&counter2).unwrap();
assert_eq!(counter1.value(), 12); assert_eq!(counter1.node_positive(1), 10);
assert_eq!(counter1.node_negative(1), 2);
assert_eq!(counter1.node_positive(2), 5);
assert_eq!(counter1.node_negative(2), 1);
assert_eq!(counter1.active_nodes(), 2);
}
#[test]
fn test_merge_with_overlap() {
let mut counter1 = PNCounter::<DefaultConfig>::new(1);
let mut counter2 = PNCounter::<DefaultConfig>::new(1);
counter1.increment(10).unwrap();
counter1.decrement(2).unwrap();
counter2.increment(5).unwrap(); counter2.decrement(3).unwrap();
counter1.merge(&counter2).unwrap();
assert_eq!(counter1.value(), 7); assert_eq!(counter1.node_positive(1), 10); assert_eq!(counter1.node_negative(1), 3); }
#[test]
fn test_merge_commutative() {
let mut counter1a = PNCounter::<DefaultConfig>::new(1);
let mut counter1b = PNCounter::<DefaultConfig>::new(1);
let mut counter2 = PNCounter::<DefaultConfig>::new(2);
let mut counter3 = PNCounter::<DefaultConfig>::new(3);
counter1a.increment(10).unwrap();
counter1a.decrement(1).unwrap();
counter1b.increment(10).unwrap();
counter1b.decrement(1).unwrap();
counter2.increment(5).unwrap();
counter2.decrement(2).unwrap();
counter3.increment(3).unwrap();
counter3.decrement(1).unwrap();
counter1a.merge(&counter2).unwrap();
counter1a.merge(&counter3).unwrap();
counter1b.merge(&counter3).unwrap();
counter1b.merge(&counter2).unwrap();
assert_eq!(counter1a.value(), counter1b.value());
assert!(counter1a.eq(&counter1b));
}
#[test]
fn test_bounded_crdt() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
counter.increment(5).unwrap();
counter.decrement(2).unwrap();
assert_eq!(counter.element_count(), 1); assert!(counter.memory_usage() > 0);
assert!(counter.can_add_element());
let mut other = PNCounter::<DefaultConfig>::new(2);
other.increment(3).unwrap();
counter.merge(&other).unwrap();
assert_eq!(counter.element_count(), 2); }
#[test]
fn test_validation() {
let counter = PNCounter::<DefaultConfig>::new(1);
assert!(counter.validate().is_ok());
let invalid_counter = PNCounter::<DefaultConfig>::new(255);
assert!(invalid_counter.validate().is_err());
}
#[test]
fn test_real_time_crdt() {
let mut counter1 = PNCounter::<DefaultConfig>::new(1);
let counter2 = PNCounter::<DefaultConfig>::new(2);
assert!(counter1.merge_bounded(&counter2).is_ok());
assert!(counter1.validate_bounded().is_ok());
}
#[test]
fn test_state_hash() {
let mut counter1 = PNCounter::<DefaultConfig>::new(1);
let mut counter2 = PNCounter::<DefaultConfig>::new(1);
assert_eq!(counter1.state_hash(), counter2.state_hash());
counter1.increment(5).unwrap();
assert_ne!(counter1.state_hash(), counter2.state_hash());
counter2.increment(5).unwrap();
assert_eq!(counter1.state_hash(), counter2.state_hash());
}
#[test]
fn test_negative_values() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
counter.decrement(10).unwrap();
assert_eq!(counter.value(), -10);
counter.increment(3).unwrap();
assert_eq!(counter.value(), -7);
counter.increment(10).unwrap();
assert_eq!(counter.value(), 3);
}
#[test]
fn test_with_capacity() {
let counter = PNCounter::<DefaultConfig, 32>::with_capacity(1);
assert_eq!(counter.value(), 0);
assert_eq!(counter.node_id(), 1);
assert_eq!(counter.capacity(), 32);
assert!(counter.is_empty());
assert_eq!(counter.active_nodes(), 0);
}
#[test]
fn test_custom_capacity_operations() {
let mut counter = PNCounter::<DefaultConfig, 8>::with_capacity(3);
assert!(counter.increment(10).is_ok());
assert!(counter.decrement(3).is_ok());
assert_eq!(counter.value(), 7);
assert_eq!(counter.node_positive(3), 10);
assert_eq!(counter.node_negative(3), 3);
assert_eq!(counter.capacity(), 8);
let mut invalid_counter = PNCounter::<DefaultConfig, 4>::with_capacity(5);
assert!(invalid_counter.increment(1).is_err()); assert!(invalid_counter.decrement(1).is_err()); }
#[test]
fn test_capacity_merge() {
let mut counter1 = PNCounter::<DefaultConfig, 8>::with_capacity(1);
let mut counter2 = PNCounter::<DefaultConfig, 8>::with_capacity(2);
counter1.increment(5).unwrap();
counter1.decrement(1).unwrap();
counter2.increment(3).unwrap();
counter2.decrement(2).unwrap();
counter1.merge(&counter2).unwrap();
assert_eq!(counter1.value(), 5); assert_eq!(counter1.node_positive(1), 5);
assert_eq!(counter1.node_negative(1), 1);
assert_eq!(counter1.node_positive(2), 3);
assert_eq!(counter1.node_negative(2), 2);
}
#[cfg(all(test, feature = "serde"))]
mod serde_tests {
use super::*;
#[test]
fn test_serialize_deserialize() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
counter.increment(10).unwrap();
counter.decrement(3).unwrap();
let mut other = PNCounter::<DefaultConfig>::new(2);
other.increment(5).unwrap();
other.decrement(1).unwrap();
counter.merge(&other).unwrap();
assert_eq!(counter.value(), 11); assert_eq!(counter.node_positive(1), 10);
assert_eq!(counter.node_negative(1), 3);
assert_eq!(counter.node_positive(2), 5);
assert_eq!(counter.node_negative(2), 1);
}
#[test]
fn test_atomic_vs_standard_compatibility() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
counter.increment(42).unwrap();
counter.decrement(7).unwrap();
assert_eq!(counter.value(), 35);
assert_eq!(counter.node_positive(1), 42);
assert_eq!(counter.node_negative(1), 7);
}
#[test]
fn test_negative_value_serialization() {
let mut counter = PNCounter::<DefaultConfig>::new(1);
counter.decrement(100).unwrap();
counter.increment(30).unwrap();
assert_eq!(counter.value(), -70);
assert_eq!(counter.node_positive(1), 30);
assert_eq!(counter.node_negative(1), 100);
}
#[test]
fn test_custom_capacity_serialization() {
let mut counter = PNCounter::<DefaultConfig, 8>::with_capacity(1);
counter.increment(100).unwrap();
counter.decrement(25).unwrap();
assert_eq!(counter.capacity(), 8);
assert_eq!(counter.value(), 75);
assert_eq!(counter.node_positive(1), 100);
assert_eq!(counter.node_negative(1), 25);
}
}
}