use crate::error::{CRDTError, CRDTResult};
use crate::memory::{MemoryConfig, NodeId};
use crate::traits::{BoundedCRDT, CRDT, RealTimeCRDT};
#[cfg(feature = "hardware-atomic")]
use core::sync::atomic::{AtomicU32, Ordering};
#[cfg(feature = "serde")]
use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// Grow-only counter CRDT with one fixed slot per node.
///
/// Each slot holds a `u32` count and is addressed by a node id used as an
/// array index, so at most `CAPACITY` nodes (16 by default) can be tracked.
/// `C` carries the compile-time memory configuration and is never stored.
///
/// The alignment of the struct is selected by the enabled platform feature
/// and falls back to 4 bytes when no platform feature is enabled.
#[derive(Debug)]
#[cfg_attr(feature = "aurix", repr(align(32)))]
#[cfg_attr(feature = "stm32", repr(align(4)))]
#[cfg_attr(feature = "cortex-m", repr(align(4)))]
#[cfg_attr(feature = "riscv", repr(align(8)))]
#[cfg_attr(
    not(any(
        feature = "aurix",
        feature = "stm32",
        feature = "cortex-m",
        feature = "riscv"
    )),
    repr(align(4))
)]
pub struct GCounter<C: MemoryConfig, const CAPACITY: usize = 16> {
    /// Per-node counts as plain integers (default build).
    #[cfg(not(feature = "hardware-atomic"))]
    counters: [u32; CAPACITY],
    /// Per-node counts as atomics (`hardware-atomic` build).
    #[cfg(feature = "hardware-atomic")]
    counters: [AtomicU32; CAPACITY],
    /// Id of the node that owns this replica; selects the slot `increment` writes to.
    node_id: NodeId,
    /// Ties the counter to its memory configuration without storing a `C`.
    _phantom: core::marker::PhantomData<C>,
}
/// Hand-written `Clone`: the atomic storage variant cannot be cloned
/// automatically because `AtomicU32` does not implement `Clone`.
impl<C: MemoryConfig, const CAPACITY: usize> Clone for GCounter<C, CAPACITY> {
    /// Returns an independent copy holding the same per-node counts and node id.
    ///
    /// With `hardware-atomic` enabled each slot is read with a relaxed load.
    fn clone(&self) -> Self {
        // Plain integers: the whole array is `Copy`.
        #[cfg(not(feature = "hardware-atomic"))]
        let counters = self.counters;

        // Atomics: rebuild the array from a relaxed snapshot of every slot.
        #[cfg(feature = "hardware-atomic")]
        let counters: [AtomicU32; CAPACITY] = core::array::from_fn(|slot| {
            AtomicU32::new(self.counters[slot].load(Ordering::Relaxed))
        });

        Self {
            counters,
            node_id: self.node_id,
            _phantom: core::marker::PhantomData,
        }
    }
}
/// Construction for counters of any capacity.
impl<C: MemoryConfig, const CAPACITY: usize> GCounter<C, CAPACITY> {
    /// Creates a counter owned by `node_id` with every slot set to zero.
    ///
    /// `node_id` is stored as given; it is not checked against `CAPACITY` here.
    pub fn with_capacity(node_id: NodeId) -> Self {
        #[cfg(not(feature = "hardware-atomic"))]
        let counters = [0u32; CAPACITY];

        #[cfg(feature = "hardware-atomic")]
        let counters: [AtomicU32; CAPACITY] = core::array::from_fn(|_| AtomicU32::new(0));

        Self {
            counters,
            node_id,
            _phantom: core::marker::PhantomData,
        }
    }
}
impl<C: MemoryConfig> GCounter<C, 16> {
pub fn new(node_id: NodeId) -> Self {
Self::with_capacity(node_id)
}
}
/// Core operations: incrementing the local slot and reading aggregate state.
impl<C: MemoryConfig, const CAPACITY: usize> GCounter<C, CAPACITY> {
    /// Adds `amount` to the slot owned by this replica's node id.
    ///
    /// # Errors
    ///
    /// * `CRDTError::InvalidOperation` if `amount` is zero.
    /// * `CRDTError::InvalidNodeId` if the node id does not fit in `CAPACITY`.
    /// * `CRDTError::BufferOverflow` if the slot would exceed `u32::MAX`;
    ///   the slot is left unchanged in that case.
    #[cfg(not(feature = "hardware-atomic"))]
    pub fn increment(&mut self, amount: u32) -> CRDTResult<()> {
        if amount == 0 {
            return Err(CRDTError::InvalidOperation);
        }
        let slot = self
            .counters
            .get_mut(self.node_id as usize)
            .ok_or(CRDTError::InvalidNodeId)?;
        *slot = slot
            .checked_add(amount)
            .ok_or(CRDTError::BufferOverflow)?;
        Ok(())
    }

    /// Adds `amount` to the slot owned by this replica's node id (atomic build).
    ///
    /// The overflow check and the addition happen in a single atomic
    /// read-modify-write, so a wrapped value is never stored. An add followed
    /// by a rollback would briefly publish the wrapped value and could let a
    /// concurrent increment succeed against it.
    ///
    /// # Errors
    ///
    /// * `CRDTError::InvalidOperation` if `amount` is zero.
    /// * `CRDTError::InvalidNodeId` if the node id does not fit in `CAPACITY`.
    /// * `CRDTError::BufferOverflow` if the slot would exceed `u32::MAX`;
    ///   the slot is left unchanged in that case.
    #[cfg(feature = "hardware-atomic")]
    pub fn increment(&self, amount: u32) -> CRDTResult<()> {
        if amount == 0 {
            return Err(CRDTError::InvalidOperation);
        }
        let slot = self
            .counters
            .get(self.node_id as usize)
            .ok_or(CRDTError::InvalidNodeId)?;
        // `checked_add` returning `None` aborts the update without writing.
        slot.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current.checked_add(amount)
        })
        .map(|_| ())
        .map_err(|_| CRDTError::BufferOverflow)
    }

    /// Adds one to this node's slot; see [`increment`](Self::increment) for errors.
    #[cfg(not(feature = "hardware-atomic"))]
    pub fn inc(&mut self) -> CRDTResult<()> {
        self.increment(1)
    }

    /// Adds one to this node's slot; see [`increment`](Self::increment) for errors.
    #[cfg(feature = "hardware-atomic")]
    pub fn inc(&self) -> CRDTResult<()> {
        self.increment(1)
    }

    /// Returns the sum of all slots, widened to `u64`.
    pub fn value(&self) -> u64 {
        #[cfg(not(feature = "hardware-atomic"))]
        {
            self.counters.iter().map(|&count| u64::from(count)).sum()
        }
        #[cfg(feature = "hardware-atomic")]
        {
            self.counters
                .iter()
                .map(|slot| u64::from(slot.load(Ordering::Relaxed)))
                .sum()
        }
    }

    /// Returns the count recorded for `node_id`, or 0 when the id is outside `CAPACITY`.
    pub fn node_value(&self, node_id: NodeId) -> u64 {
        match self.counters.get(node_id as usize) {
            #[cfg(not(feature = "hardware-atomic"))]
            Some(&count) => u64::from(count),
            #[cfg(feature = "hardware-atomic")]
            Some(slot) => u64::from(slot.load(Ordering::Relaxed)),
            None => 0,
        }
    }

    /// Returns the id of the node that owns this replica.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Returns the number of slots (`CAPACITY`).
    pub fn capacity(&self) -> usize {
        CAPACITY
    }

    /// Returns `true` when every slot is zero.
    pub fn is_empty(&self) -> bool {
        #[cfg(not(feature = "hardware-atomic"))]
        {
            self.counters.iter().all(|&count| count == 0)
        }
        #[cfg(feature = "hardware-atomic")]
        {
            self.counters
                .iter()
                .all(|slot| slot.load(Ordering::Relaxed) == 0)
        }
    }

    /// Returns how many slots hold a non-zero count.
    pub fn active_nodes(&self) -> usize {
        #[cfg(not(feature = "hardware-atomic"))]
        {
            self.counters.iter().filter(|&&count| count > 0).count()
        }
        #[cfg(feature = "hardware-atomic")]
        {
            self.counters
                .iter()
                .filter(|slot| slot.load(Ordering::Relaxed) > 0)
                .count()
        }
    }
}
/// Serializes the counter as a two-field struct named `GCounter`:
/// `counters` (a sequence of `u32`) followed by `node_id`.
#[cfg(feature = "serde")]
impl<C: MemoryConfig, const CAPACITY: usize> Serialize for GCounter<C, CAPACITY> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        use serde::ser::SerializeStruct;

        let mut out = serializer.serialize_struct("GCounter", 2)?;

        // Plain integers can be borrowed directly.
        #[cfg(not(feature = "hardware-atomic"))]
        let snapshot: &[u32; CAPACITY] = &self.counters;

        // Atomics are first copied into a plain array with relaxed loads.
        #[cfg(feature = "hardware-atomic")]
        let snapshot: [u32; CAPACITY] =
            core::array::from_fn(|slot| self.counters[slot].load(Ordering::Relaxed));

        out.serialize_field("counters", &snapshot[..])?;
        out.serialize_field("node_id", &self.node_id)?;
        out.end()
    }
}
/// Deserializes a counter from the struct layout written by the `Serialize`
/// impl: a `counters` sequence of exactly `CAPACITY` `u32` values and a
/// `node_id`.
///
/// Both map-style input (fields addressed by name) and sequence-style input
/// (fields in declaration order) are accepted.
#[cfg(feature = "serde")]
impl<'de, C: MemoryConfig, const CAPACITY: usize> Deserialize<'de> for GCounter<C, CAPACITY> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use core::fmt;
        use serde::de::{self, DeserializeSeed, MapAccess, SeqAccess, Visitor};

        /// Field names as written by the serializer: `counters` and `node_id`.
        // `snake_case` is required so that `NodeId` matches "node_id".
        #[derive(Deserialize)]
        #[serde(field_identifier, rename_all = "snake_case")]
        enum Field {
            Counters,
            NodeId,
        }

        /// Seed that reads a sequence of exactly `N` `u32` values into an array.
        struct CounterArraySeed<const N: usize>;

        impl<'de, const N: usize> DeserializeSeed<'de> for CounterArraySeed<N> {
            type Value = [u32; N];

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: serde::de::Deserializer<'de>,
            {
                struct ArrayVisitor<const N: usize>;

                impl<'de, const N: usize> Visitor<'de> for ArrayVisitor<N> {
                    type Value = [u32; N];

                    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                        write!(formatter, "an array of {} u32 values", N)
                    }

                    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
                    where
                        A: SeqAccess<'de>,
                    {
                        let mut array = [0u32; N];
                        for (index, slot) in array.iter_mut().enumerate() {
                            *slot = seq
                                .next_element::<u32>()?
                                .ok_or_else(|| de::Error::invalid_length(index, &self))?;
                        }
                        // A longer sequence comes from a counter with a larger
                        // capacity; reject it instead of silently dropping slots.
                        if seq.next_element::<u32>()?.is_some() {
                            return Err(de::Error::invalid_length(N + 1, &self));
                        }
                        Ok(array)
                    }
                }

                deserializer.deserialize_seq(ArrayVisitor::<N>)
            }
        }

        struct GCounterVisitor<C: MemoryConfig, const CAPACITY: usize> {
            _phantom: core::marker::PhantomData<C>,
        }

        impl<C: MemoryConfig, const CAPACITY: usize> GCounterVisitor<C, CAPACITY> {
            /// Builds the counter from decoded values, wrapping the counts in
            /// atomics when the `hardware-atomic` feature is enabled.
            fn assemble(counters: [u32; CAPACITY], node_id: NodeId) -> GCounter<C, CAPACITY> {
                #[cfg(feature = "hardware-atomic")]
                let counters = counters.map(AtomicU32::new);

                GCounter {
                    counters,
                    node_id,
                    _phantom: core::marker::PhantomData,
                }
            }
        }

        impl<'de, C: MemoryConfig, const CAPACITY: usize> Visitor<'de>
            for GCounterVisitor<C, CAPACITY>
        {
            type Value = GCounter<C, CAPACITY>;

            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                formatter.write_str("struct GCounter")
            }

            /// Sequence form: `counters` first, then `node_id`.
            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
            where
                A: SeqAccess<'de>,
            {
                let counters = seq
                    .next_element_seed(CounterArraySeed::<CAPACITY>)?
                    .ok_or_else(|| de::Error::invalid_length(0, &self))?;
                let node_id: NodeId = seq
                    .next_element()?
                    .ok_or_else(|| de::Error::invalid_length(1, &self))?;
                Ok(Self::assemble(counters, node_id))
            }

            /// Map form: fields in any order, each exactly once.
            fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
            where
                V: MapAccess<'de>,
            {
                let mut counters: Option<[u32; CAPACITY]> = None;
                let mut node_id: Option<NodeId> = None;

                while let Some(key) = map.next_key()? {
                    match key {
                        Field::Counters => {
                            if counters.is_some() {
                                return Err(de::Error::duplicate_field("counters"));
                            }
                            counters =
                                Some(map.next_value_seed(CounterArraySeed::<CAPACITY>)?);
                        }
                        Field::NodeId => {
                            if node_id.is_some() {
                                return Err(de::Error::duplicate_field("node_id"));
                            }
                            node_id = Some(map.next_value()?);
                        }
                    }
                }

                let counters = counters.ok_or_else(|| de::Error::missing_field("counters"))?;
                let node_id = node_id.ok_or_else(|| de::Error::missing_field("node_id"))?;
                Ok(Self::assemble(counters, node_id))
            }
        }

        const FIELDS: &[&str] = &["counters", "node_id"];
        deserializer.deserialize_struct(
            "GCounter",
            FIELDS,
            GCounterVisitor {
                _phantom: core::marker::PhantomData,
            },
        )
    }
}
/// CRDT behaviour: merging takes the per-slot maximum of the two replicas.
impl<C: MemoryConfig, const CAPACITY: usize> CRDT<C> for GCounter<C, CAPACITY> {
    type Error = CRDTError;

    /// Raises every slot of `self` to at least the matching slot of `other`.
    ///
    /// Always returns `Ok(())`; `other` is only read.
    fn merge(&mut self, other: &Self) -> CRDTResult<()> {
        #[cfg(not(feature = "hardware-atomic"))]
        {
            for (mine, theirs) in self.counters.iter_mut().zip(other.counters.iter()) {
                if *theirs > *mine {
                    *mine = *theirs;
                }
            }
        }
        #[cfg(feature = "hardware-atomic")]
        {
            for (mine, theirs) in self.counters.iter().zip(other.counters.iter()) {
                // Atomic maximum: the slot only ever moves upward.
                mine.fetch_max(theirs.load(Ordering::Relaxed), Ordering::Relaxed);
            }
        }
        Ok(())
    }

    /// Returns `true` when both replicas hold the same count in every slot.
    ///
    /// The owning node id is not part of the comparison.
    fn eq(&self, other: &Self) -> bool {
        #[cfg(not(feature = "hardware-atomic"))]
        {
            self.counters
                .iter()
                .zip(other.counters.iter())
                .all(|(mine, theirs)| mine == theirs)
        }
        #[cfg(feature = "hardware-atomic")]
        {
            self.counters
                .iter()
                .zip(other.counters.iter())
                .all(|(mine, theirs)| {
                    mine.load(Ordering::Relaxed) == theirs.load(Ordering::Relaxed)
                })
        }
    }

    /// Returns the in-memory size of the counter type in bytes.
    fn size_bytes(&self) -> usize {
        core::mem::size_of::<Self>()
    }

    /// Checks that the owning node id fits in both `CAPACITY` and
    /// `C::MAX_NODES`, then applies the limits of the enabled platform feature.
    ///
    /// # Errors
    ///
    /// * `CRDTError::InvalidNodeId` if the node id is out of range.
    /// * `CRDTError::ConfigurationExceeded` or `CRDTError::BufferOverflow`
    ///   when a platform-specific limit is exceeded.
    fn validate(&self) -> CRDTResult<()> {
        let owner_index = self.node_id as usize;
        if owner_index >= CAPACITY || owner_index >= C::MAX_NODES {
            return Err(CRDTError::InvalidNodeId);
        }

        #[cfg(feature = "aurix")]
        {
            if self.node_id >= crate::platform::constants::MAX_CORES {
                return Err(CRDTError::InvalidNodeId);
            }
        }
        #[cfg(feature = "stm32")]
        {
            if self.active_nodes() > crate::platform::validation::MAX_ACTIVE_NODES {
                return Err(CRDTError::ConfigurationExceeded);
            }
        }
        #[cfg(feature = "cortex-m")]
        {
            if self.memory_usage() > crate::platform::validation::MAX_MEMORY_USAGE {
                return Err(CRDTError::BufferOverflow);
            }
        }
        #[cfg(feature = "riscv")]
        {
            if self.active_nodes() > crate::platform::validation::MAX_ACTIVE_NODES {
                return Err(CRDTError::ConfigurationExceeded);
            }
        }

        Ok(())
    }

    /// Returns a 32-bit digest of the non-zero slots.
    ///
    /// Each non-zero slot contributes `count ^ (index << 16)`, and all
    /// contributions are XOR-ed together; an all-zero counter hashes to 0.
    fn state_hash(&self) -> u32 {
        #[cfg(not(feature = "hardware-atomic"))]
        let snapshot = self.counters.iter().copied();
        #[cfg(feature = "hardware-atomic")]
        let snapshot = self.counters.iter().map(|slot| slot.load(Ordering::Relaxed));

        snapshot
            .enumerate()
            .filter(|&(_, count)| count > 0)
            .fold(0u32, |digest, (index, count)| {
                digest ^ count ^ ((index as u32) << 16)
            })
    }

    /// Two counters of the same type can always be merged.
    fn can_merge(&self, _other: &Self) -> bool {
        true
    }
}
/// Memory bounds: the counter is a fixed-size value with `CAPACITY` slots.
impl<C: MemoryConfig, const CAPACITY: usize> BoundedCRDT<C> for GCounter<C, CAPACITY> {
    /// Size of the counter type in bytes.
    const MAX_SIZE_BYTES: usize = core::mem::size_of::<Self>();
    /// One element per slot.
    const MAX_ELEMENTS: usize = CAPACITY;

    /// Returns the size of the counter type in bytes; it does not vary with content.
    fn memory_usage(&self) -> usize {
        Self::MAX_SIZE_BYTES
    }

    /// Returns the number of slots holding a non-zero count.
    fn element_count(&self) -> usize {
        self.active_nodes()
    }

    /// Nothing can be reclaimed from a fixed array; always returns `Ok(0)`.
    fn compact(&mut self) -> CRDTResult<usize> {
        Ok(0)
    }

    /// Returns `true` while at least one slot is still zero.
    fn can_add_element(&self) -> bool {
        self.active_nodes() < CAPACITY
    }
}
impl<C: MemoryConfig, const CAPACITY: usize> RealTimeCRDT<C> for GCounter<C, CAPACITY> {
const MAX_MERGE_CYCLES: u32 = crate::platform::constants::MAX_MERGE_CYCLES / 10; const MAX_VALIDATE_CYCLES: u32 = crate::platform::constants::MAX_MERGE_CYCLES / 25; const MAX_SERIALIZE_CYCLES: u32 = crate::platform::constants::MAX_MERGE_CYCLES / 5;
fn merge_bounded(&mut self, other: &Self) -> CRDTResult<()> {
self.merge(other)
}
fn validate_bounded(&self) -> CRDTResult<()> {
self.validate()
}
fn remaining_budget(&self) -> Option<u32> {
None
}
fn set_budget(&mut self, _cycles: u32) {
}
}
/// Unit tests for `GCounter`: construction, incrementing, merging and the
/// `CRDT` / `BoundedCRDT` / `RealTimeCRDT` trait implementations.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::DefaultConfig;
    use crate::traits::CRDT;

    /// A new counter is empty and remembers its node id.
    #[test]
    fn test_new_counter() {
        let fresh = GCounter::<DefaultConfig>::new(1);

        assert_eq!(fresh.value(), 0);
        assert_eq!(fresh.node_id(), 1);
        assert!(fresh.is_empty());
        assert_eq!(fresh.active_nodes(), 0);
    }

    /// Incrementing touches only the owning node's slot.
    #[test]
    fn test_increment() {
        let mut gc = GCounter::<DefaultConfig>::new(1);

        assert!(gc.increment(5).is_ok());

        assert_eq!(gc.value(), 5);
        assert_eq!(gc.node_value(1), 5);
        assert_eq!(gc.node_value(2), 0);
        assert!(!gc.is_empty());
        assert_eq!(gc.active_nodes(), 1);
    }

    /// `inc` adds exactly one per call.
    #[test]
    fn test_inc() {
        let mut gc = GCounter::<DefaultConfig>::new(1);

        assert!(gc.inc().is_ok());
        assert_eq!(gc.value(), 1);

        assert!(gc.inc().is_ok());
        assert_eq!(gc.value(), 2);
    }

    /// A zero increment is rejected and leaves the counter untouched.
    #[test]
    fn test_invalid_increment() {
        let mut gc = GCounter::<DefaultConfig>::new(1);

        assert!(gc.increment(0).is_err());
        assert_eq!(gc.value(), 0);
    }

    /// A slot can reach `u32::MAX` but not go past it.
    #[test]
    fn test_overflow_protection() {
        let mut gc = GCounter::<DefaultConfig>::new(1);

        // Seed the owning slot one below the maximum.
        #[cfg(not(feature = "hardware-atomic"))]
        {
            gc.counters[1] = u32::MAX - 1;
        }
        #[cfg(feature = "hardware-atomic")]
        {
            gc.counters[1].store(u32::MAX - 1, Ordering::Relaxed);
        }

        assert!(gc.increment(1).is_ok());
        assert_eq!(gc.node_value(1), u32::MAX as u64);
        assert!(gc.increment(1).is_err());
    }

    /// Merging replicas owned by different nodes combines both contributions.
    #[test]
    fn test_merge() {
        let mut left = GCounter::<DefaultConfig>::new(1);
        let mut right = GCounter::<DefaultConfig>::new(2);
        left.increment(10).unwrap();
        right.increment(5).unwrap();
        assert_eq!(left.value(), 10);
        assert_eq!(right.value(), 5);

        left.merge(&right).unwrap();

        assert_eq!(left.value(), 15);
        assert_eq!(left.node_value(1), 10);
        assert_eq!(left.node_value(2), 5);
        assert_eq!(left.active_nodes(), 2);
    }

    /// Merging replicas that share a node id keeps the larger slot value.
    #[test]
    fn test_merge_with_overlap() {
        let mut left = GCounter::<DefaultConfig>::new(1);
        let mut right = GCounter::<DefaultConfig>::new(1);
        left.increment(10).unwrap();
        right.increment(5).unwrap();

        left.merge(&right).unwrap();

        assert_eq!(left.value(), 10);
        assert_eq!(left.node_value(1), 10);
    }

    /// Merging the same replica twice gives the same result as merging once.
    #[test]
    fn test_merge_idempotent() {
        let mut left = GCounter::<DefaultConfig>::new(1);
        let right = GCounter::<DefaultConfig>::new(2);
        left.increment(10).unwrap();

        left.merge(&right).unwrap();
        let after_first = left.value();
        left.merge(&right).unwrap();
        let after_second = left.value();

        assert_eq!(after_first, after_second);
    }

    /// Merge order does not affect the final state.
    #[test]
    fn test_merge_commutative() {
        let mut forward = GCounter::<DefaultConfig>::new(1);
        let mut backward = GCounter::<DefaultConfig>::new(1);
        let mut second = GCounter::<DefaultConfig>::new(2);
        let mut third = GCounter::<DefaultConfig>::new(3);
        forward.increment(10).unwrap();
        backward.increment(10).unwrap();
        second.increment(5).unwrap();
        third.increment(3).unwrap();

        forward.merge(&second).unwrap();
        forward.merge(&third).unwrap();
        backward.merge(&third).unwrap();
        backward.merge(&second).unwrap();

        assert_eq!(forward.value(), backward.value());
        assert!(forward.eq(&backward));
    }

    /// `BoundedCRDT` accounting follows the number of non-zero slots.
    #[test]
    fn test_bounded_crdt() {
        let mut gc = GCounter::<DefaultConfig>::new(1);
        gc.increment(5).unwrap();

        assert_eq!(gc.element_count(), 1);
        assert!(gc.memory_usage() > 0);
        assert!(gc.can_add_element());

        let mut peer = GCounter::<DefaultConfig>::new(2);
        peer.increment(3).unwrap();
        gc.merge(&peer).unwrap();

        assert_eq!(gc.element_count(), 2);
    }

    /// Validation accepts an in-range node id and rejects node id 255.
    #[test]
    fn test_validation() {
        let valid = GCounter::<DefaultConfig>::new(1);
        assert!(valid.validate().is_ok());

        let out_of_range = GCounter::<DefaultConfig>::new(255);
        assert!(out_of_range.validate().is_err());
    }

    /// The bounded merge and validate entry points succeed on valid counters.
    #[test]
    fn test_real_time_crdt() {
        let mut left = GCounter::<DefaultConfig>::new(1);
        let right = GCounter::<DefaultConfig>::new(2);

        assert!(left.merge_bounded(&right).is_ok());
        assert!(left.validate_bounded().is_ok());
    }

    /// Equal states hash equally; a differing state hashes differently here.
    #[test]
    fn test_state_hash() {
        let mut left = GCounter::<DefaultConfig>::new(1);
        let mut right = GCounter::<DefaultConfig>::new(1);
        assert_eq!(left.state_hash(), right.state_hash());

        left.increment(5).unwrap();
        assert_ne!(left.state_hash(), right.state_hash());

        right.increment(5).unwrap();
        assert_eq!(left.state_hash(), right.state_hash());
    }

    /// `with_capacity` honours the requested slot count.
    #[test]
    fn test_with_capacity() {
        let wide = GCounter::<DefaultConfig, 32>::with_capacity(1);

        assert_eq!(wide.value(), 0);
        assert_eq!(wide.node_id(), 1);
        assert_eq!(wide.capacity(), 32);
        assert!(wide.is_empty());
        assert_eq!(wide.active_nodes(), 0);
    }

    /// Custom capacities work, and a node id beyond the capacity cannot increment.
    #[test]
    fn test_custom_capacity_operations() {
        let mut gc = GCounter::<DefaultConfig, 8>::with_capacity(3);
        assert!(gc.increment(10).is_ok());
        assert_eq!(gc.value(), 10);
        assert_eq!(gc.node_value(3), 10);
        assert_eq!(gc.capacity(), 8);

        let mut out_of_range = GCounter::<DefaultConfig, 4>::with_capacity(5);
        assert!(out_of_range.increment(1).is_err());
    }

    /// Merging works between counters of the same custom capacity.
    #[test]
    fn test_capacity_merge() {
        let mut left = GCounter::<DefaultConfig, 8>::with_capacity(1);
        let mut right = GCounter::<DefaultConfig, 8>::with_capacity(2);
        left.increment(5).unwrap();
        right.increment(3).unwrap();

        left.merge(&right).unwrap();

        assert_eq!(left.value(), 8);
        assert_eq!(left.node_value(1), 5);
        assert_eq!(left.node_value(2), 3);
    }

    /// Tests compiled only when the `serde` feature is enabled.
    // NOTE(review): none of these tests calls a serializer or deserializer;
    // they only exercise counter state. A real round-trip would need a
    // serde data format as a dev-dependency.
    #[cfg(all(test, feature = "serde"))]
    mod serde_tests {
        use super::*;

        /// Builds a merged two-node counter and checks its values.
        #[test]
        fn test_serialize_deserialize() {
            let mut gc = GCounter::<DefaultConfig>::new(1);
            gc.increment(10).unwrap();
            let mut peer = GCounter::<DefaultConfig>::new(2);
            peer.increment(5).unwrap();
            gc.merge(&peer).unwrap();

            assert_eq!(gc.value(), 15);
            assert_eq!(gc.node_value(1), 10);
            assert_eq!(gc.node_value(2), 5);
        }

        /// Checks the values of a single-node counter.
        #[test]
        fn test_atomic_vs_standard_compatibility() {
            let mut gc = GCounter::<DefaultConfig>::new(1);
            gc.increment(42).unwrap();

            assert_eq!(gc.value(), 42);
            assert_eq!(gc.node_value(1), 42);
        }

        /// Checks the values of a counter with a custom capacity.
        #[test]
        fn test_custom_capacity_serialization() {
            let mut gc = GCounter::<DefaultConfig, 8>::with_capacity(3);
            gc.increment(100).unwrap();

            assert_eq!(gc.capacity(), 8);
            assert_eq!(gc.value(), 100);
            assert_eq!(gc.node_value(3), 100);
        }
    }
}