use crate::clock::CompactTimestamp;
use crate::error::{CRDTError, CRDTResult};
use crate::memory::{MemoryConfig, NodeId};
use crate::traits::{BoundedCRDT, CRDT, RealTimeCRDT};
/// Kind of coordination signal exchanged between robots.
///
/// The enum is `#[repr(u8)]` and every variant carries an explicit, stable
/// discriminant in the range `1..=8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum SignalType {
    Start = 1,
    Stop = 2,
    Help = 3,
    Complete = 4,
    Warning = 5,
    Emergency = 6,
    Formation = 7,
    Rendezvous = 8,
}

impl SignalType {
    /// Returns `true` for the signal kinds classified as critical:
    /// `Emergency`, `Help` and `Warning`.
    pub fn is_critical(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            SignalType::Help | SignalType::Warning | SignalType::Emergency => true,
            SignalType::Start
            | SignalType::Stop
            | SignalType::Complete
            | SignalType::Formation
            | SignalType::Rendezvous => false,
        }
    }

    /// Returns `true` for the signal kinds that demand an immediate
    /// reaction: `Emergency` and `Stop`.
    pub fn requires_immediate_response(&self) -> bool {
        match self {
            SignalType::Stop | SignalType::Emergency => true,
            SignalType::Start
            | SignalType::Help
            | SignalType::Complete
            | SignalType::Warning
            | SignalType::Formation
            | SignalType::Rendezvous => false,
        }
    }
}
/// Priority attached to a signal.
///
/// Variants are ordered from `Low` to `Critical`, so the derived `Ord`
/// can be used for "at least this priority" comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum SignalPriority {
    Low = 1,
    Normal = 2,
    High = 3,
    Critical = 4,
}

impl SignalPriority {
    /// Timeout, in seconds, associated with this priority.
    ///
    /// Higher priorities get shorter timeouts: `Critical` is 5 s, `High`
    /// is 30 s, `Normal` is 5 min and `Low` is 30 min.
    pub fn timeout_seconds(&self) -> u32 {
        const SECONDS_PER_MINUTE: u32 = 60;
        match *self {
            SignalPriority::Low => 30 * SECONDS_PER_MINUTE,
            SignalPriority::Normal => 5 * SECONDS_PER_MINUTE,
            SignalPriority::High => 30,
            SignalPriority::Critical => 5,
        }
    }
}
/// A single coordination signal sent by one robot to another robot or to
/// everyone.
///
/// A `target_id` of `0` marks the signal as a broadcast.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Signal {
    /// What the signal announces.
    pub signal_type: SignalType,
    /// Priority; also determines how long the signal stays valid
    /// (see [`Signal::is_expired`]).
    pub priority: SignalPriority,
    /// Opaque payload; this type does not interpret it.
    pub data: u32,
    /// Time at which the signal was created.
    pub timestamp: CompactTimestamp,
    /// Node that emitted the signal.
    pub sender_id: NodeId,
    /// Addressee of the signal, or `0` for a broadcast.
    pub target_id: NodeId,
}

impl Signal {
    /// Builds a signal addressed to `target_id`.
    ///
    /// `timestamp` is wrapped in a [`CompactTimestamp`]. Passing `0` as
    /// `target_id` produces a broadcast signal.
    pub fn new(
        signal_type: SignalType,
        priority: SignalPriority,
        data: u32,
        timestamp: u64,
        sender_id: NodeId,
        target_id: NodeId,
    ) -> Self {
        Self {
            signal_type,
            priority,
            data,
            timestamp: CompactTimestamp::new(timestamp),
            sender_id,
            target_id,
        }
    }

    /// Builds a broadcast signal, i.e. one whose `target_id` is `0`.
    pub fn broadcast(
        signal_type: SignalType,
        priority: SignalPriority,
        data: u32,
        timestamp: u64,
        sender_id: NodeId,
    ) -> Self {
        Self::new(signal_type, priority, data, timestamp, sender_id, 0)
    }

    /// Returns `true` when the signal is addressed to every robot.
    pub fn is_broadcast(&self) -> bool {
        self.target_id == 0
    }

    /// Returns `true` when `robot_id` should see this signal: either the
    /// signal is a broadcast or it targets `robot_id` directly.
    pub fn is_for_robot(&self, robot_id: NodeId) -> bool {
        self.is_broadcast() || self.target_id == robot_id
    }

    /// Returns `true` once `current_time` is strictly past the signal's
    /// deadline.
    ///
    /// The deadline is the creation timestamp plus the priority timeout,
    /// converted from seconds by multiplying by 1000 (timestamps are thus
    /// treated as milliseconds).
    pub fn is_expired(&self, current_time: u64) -> bool {
        let timeout_ms = u64::from(self.priority.timeout_seconds()) * 1000;
        // Saturate instead of overflowing: a timestamp close to `u64::MAX`
        // must yield an unreachable deadline, not a panic (debug builds) or
        // a wrapped-around deadline that looks long expired (release builds).
        let deadline = self.timestamp.as_u64().saturating_add(timeout_ms);
        current_time > deadline
    }
}
/// Number of signal slots held by a [`CoordinationSignals`] store.
const SIGNAL_CAPACITY: usize = 32;

/// Fixed-capacity store of coordination [`Signal`]s owned by one robot.
///
/// Signals live in an array of optional slots; `signal_count` tracks how
/// many slots are occupied.
#[derive(Debug, Clone)]
pub struct CoordinationSignals<C: MemoryConfig> {
    /// Signal slots; `None` marks a free slot.
    signals: [Option<Signal>; SIGNAL_CAPACITY],
    /// Number of occupied entries in `signals`.
    signal_count: usize,
    /// Identity of the robot owning this store; used as the sender of
    /// locally created signals and as the addressee filter for queries.
    local_robot_id: NodeId,
    /// Timestamp recorded by the most recent `send_signal` call.
    last_update: CompactTimestamp,
    _phantom: core::marker::PhantomData<C>,
}

impl<C: MemoryConfig> CoordinationSignals<C> {
    /// Two signals with the same type, sender and target whose timestamps
    /// are closer than this are considered duplicates. Same unit as the
    /// timestamps (milliseconds, per `Signal::is_expired`).
    const DUPLICATE_WINDOW_MS: u64 = 1000;

    /// Creates an empty store owned by `robot_id`.
    pub fn new(robot_id: NodeId) -> Self {
        Self {
            signals: [const { None }; SIGNAL_CAPACITY],
            signal_count: 0,
            local_robot_id: robot_id,
            last_update: CompactTimestamp::new(0),
            _phantom: core::marker::PhantomData,
        }
    }

    /// Records a signal sent by the local robot to `target_id`
    /// (`0` means broadcast).
    ///
    /// A signal that duplicates a stored one (same type, sender and target
    /// within the duplicate window) is silently dropped; the call still
    /// succeeds and `last_update` is still set to `timestamp`.
    ///
    /// # Errors
    ///
    /// Returns [`CRDTError::BufferOverflow`] when the store is full and no
    /// stored signal of `Normal` priority or lower can be evicted.
    pub fn send_signal(
        &mut self,
        signal_type: SignalType,
        priority: SignalPriority,
        data: u32,
        timestamp: u64,
        target_id: NodeId,
    ) -> CRDTResult<()> {
        let signal = Signal::new(
            signal_type,
            priority,
            data,
            timestamp,
            self.local_robot_id,
            target_id,
        );
        self.add_signal(signal)?;
        self.last_update = CompactTimestamp::new(timestamp);
        Ok(())
    }

    /// Records a broadcast signal sent by the local robot.
    ///
    /// Equivalent to [`send_signal`](Self::send_signal) with a target of `0`.
    ///
    /// # Errors
    ///
    /// Same as [`send_signal`](Self::send_signal).
    pub fn broadcast_signal(
        &mut self,
        signal_type: SignalType,
        priority: SignalPriority,
        data: u32,
        timestamp: u64,
    ) -> CRDTResult<()> {
        self.send_signal(signal_type, priority, data, timestamp, 0)
    }

    /// Iterates over every stored signal, regardless of addressee.
    pub fn all_signals(&self) -> impl Iterator<Item = &Signal> {
        self.signals.iter().flatten()
    }

    /// Iterates over the stored signals addressed to the local robot
    /// (directly or by broadcast).
    pub fn signals_for_robot(&self) -> impl Iterator<Item = &Signal> {
        let me = self.local_robot_id;
        self.all_signals().filter(move |s| s.is_for_robot(me))
    }

    /// Signals for the local robot whose type is critical.
    pub fn critical_signals(&self) -> impl Iterator<Item = &Signal> {
        self.signals_for_robot()
            .filter(|s| s.signal_type.is_critical())
    }

    /// Signals for the local robot of exactly `signal_type`.
    pub fn signals_by_type(&self, signal_type: SignalType) -> impl Iterator<Item = &Signal> {
        self.signals_for_robot()
            .filter(move |s| s.signal_type == signal_type)
    }

    /// Signals for the local robot with priority `min_priority` or higher.
    pub fn signals_by_priority(
        &self,
        min_priority: SignalPriority,
    ) -> impl Iterator<Item = &Signal> {
        self.signals_for_robot()
            .filter(move |s| s.priority >= min_priority)
    }

    /// Returns `true` if an `Emergency` signal is addressed to the local robot.
    pub fn has_emergency_signals(&self) -> bool {
        self.signals_by_type(SignalType::Emergency).next().is_some()
    }

    /// Returns `true` if a `Help` signal is addressed to the local robot.
    pub fn has_help_requests(&self) -> bool {
        self.signals_by_type(SignalType::Help).next().is_some()
    }

    /// Number of signals currently stored.
    pub fn signal_count(&self) -> usize {
        self.signal_count
    }

    /// Drops every signal that is expired at `current_time`, then packs the
    /// survivors at the front of the slot array.
    ///
    /// Returns the number of signals removed.
    pub fn cleanup_expired(&mut self, current_time: u64) -> usize {
        let mut removed = 0;
        for slot in self.signals.iter_mut() {
            if slot
                .as_ref()
                .is_some_and(|s| s.is_expired(current_time))
            {
                *slot = None;
                removed += 1;
            }
        }
        self.signal_count -= removed;
        self.compact_signals();
        removed
    }

    /// Stores `signal` unless it duplicates an existing entry.
    ///
    /// Insertion order of preference: first free slot, then eviction via
    /// [`make_space_for_signal`](Self::make_space_for_signal).
    fn add_signal(&mut self, signal: Signal) -> CRDTResult<()> {
        let incoming_ts = signal.timestamp.as_u64();
        // The window is symmetric: using a saturating subtraction here would
        // report a distance of 0 for *any* incoming signal older than a stored
        // one, discarding it no matter how far apart the two are and making
        // the outcome of a merge depend on its direction.
        let is_duplicate = self.all_signals().any(|existing| {
            existing.signal_type == signal.signal_type
                && existing.sender_id == signal.sender_id
                && existing.target_id == signal.target_id
                && incoming_ts.abs_diff(existing.timestamp.as_u64()) < Self::DUPLICATE_WINDOW_MS
        });
        if is_duplicate {
            return Ok(());
        }

        if let Some(free_slot) = self.signals.iter_mut().find(|slot| slot.is_none()) {
            *free_slot = Some(signal);
            self.signal_count += 1;
            return Ok(());
        }

        self.make_space_for_signal(signal)
    }

    /// Overwrites the oldest stored signal of `Normal` priority or lower
    /// with `new_signal`.
    ///
    /// `signal_count` is left untouched because one entry replaces another.
    ///
    /// # Errors
    ///
    /// Returns [`CRDTError::BufferOverflow`] when every stored signal has a
    /// priority above `Normal` (or the store holds no signal at all).
    fn make_space_for_signal(&mut self, new_signal: Signal) -> CRDTResult<()> {
        // `min_by_key` keeps the first of several equally old candidates and,
        // unlike a search seeded with `u64::MAX`, can also select a signal
        // whose timestamp is `u64::MAX`.
        let victim = self
            .signals
            .iter_mut()
            .flatten()
            .filter(|s| s.priority <= SignalPriority::Normal)
            .min_by_key(|s| s.timestamp.as_u64());

        match victim {
            Some(slot) => {
                *slot = new_signal;
                Ok(())
            }
            None => Err(CRDTError::BufferOverflow),
        }
    }

    /// Moves all occupied slots to the front of the array, preserving their
    /// relative order.
    fn compact_signals(&mut self) {
        let mut write_idx = 0;
        for read_idx in 0..self.signals.len() {
            if self.signals[read_idx].is_some() {
                // Every slot in `write_idx..read_idx` is empty at this point,
                // so the swap moves the entry forward and leaves `None` behind.
                if write_idx != read_idx {
                    self.signals.swap(write_idx, read_idx);
                }
                write_idx += 1;
            }
        }
    }

    /// Checks that every stored signal references valid node ids.
    ///
    /// # Errors
    ///
    /// Returns [`CRDTError::InvalidNodeId`] if a sender id is not below
    /// `C::MAX_NODES`, or if a non-broadcast target id is not below
    /// `C::MAX_NODES`.
    pub fn validate_signals(&self) -> CRDTResult<()> {
        for signal in self.all_signals() {
            let sender_ok = (signal.sender_id as usize) < C::MAX_NODES;
            let target_ok =
                signal.is_broadcast() || (signal.target_id as usize) < C::MAX_NODES;
            if !(sender_ok && target_ok) {
                return Err(CRDTError::InvalidNodeId);
            }
        }
        Ok(())
    }
}
impl<C: MemoryConfig> CRDT<C> for CoordinationSignals<C> {
    type Error = CRDTError;

    /// Adds every signal of `other` to `self` and advances `last_update`
    /// to the later of the two stores.
    ///
    /// Stops at the first signal that cannot be stored and returns that
    /// error; signals added before the failure remain in `self`.
    fn merge(&mut self, other: &Self) -> CRDTResult<()> {
        other
            .all_signals()
            .try_for_each(|incoming| self.add_signal(*incoming))?;

        if self.last_update < other.last_update {
            self.last_update = other.last_update;
        }
        Ok(())
    }

    /// Two stores are equal when they hold the same number of signals and
    /// every signal of `self` also appears in `other`. The owning robot id
    /// and `last_update` are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.signal_count == other.signal_count
            && self
                .all_signals()
                .all(|mine| other.all_signals().any(|theirs| mine == theirs))
    }

    /// Size of the store in bytes; constant because storage is inline.
    fn size_bytes(&self) -> usize {
        core::mem::size_of::<Self>()
    }

    /// Delegates to [`CoordinationSignals::validate_signals`].
    fn validate(&self) -> CRDTResult<()> {
        self.validate_signals()
    }

    /// Order-independent XOR digest of the local robot id, each signal's
    /// sender, truncated timestamp and payload, and the signal count.
    ///
    /// NOTE(review): the digest includes `local_robot_id` whereas `eq`
    /// ignores it, so two stores that compare equal can hash differently —
    /// confirm this is intended.
    fn state_hash(&self) -> u32 {
        let seed = self.local_robot_id as u32;
        let digest = self.all_signals().fold(seed, |acc, s| {
            acc ^ (s.sender_id as u32) ^ (s.timestamp.as_u64() as u32) ^ s.data
        });
        digest ^ (self.signal_count as u32)
    }

    /// Any two stores can be merged.
    fn can_merge(&self, _other: &Self) -> bool {
        true
    }
}
impl<C: MemoryConfig> BoundedCRDT<C> for CoordinationSignals<C> {
    const MAX_SIZE_BYTES: usize = core::mem::size_of::<Self>();
    const MAX_ELEMENTS: usize = 32;

    /// Bytes occupied by the store; fixed, since all slots are inline.
    fn memory_usage(&self) -> usize {
        core::mem::size_of_val(self)
    }

    /// Number of signals currently stored.
    fn element_count(&self) -> usize {
        self.signal_count()
    }

    /// Packs stored signals at the front of the slot array.
    ///
    /// Always reports `0`: rearranging slots does not change the store's size.
    fn compact(&mut self) -> CRDTResult<usize> {
        self.compact_signals();
        Ok(0)
    }

    /// Returns `true` while at least one slot is still free.
    fn can_add_element(&self) -> bool {
        Self::MAX_ELEMENTS > self.signal_count
    }
}
impl<C: MemoryConfig> RealTimeCRDT<C> for CoordinationSignals<C> {
const MAX_MERGE_CYCLES: u32 = 150; const MAX_VALIDATE_CYCLES: u32 = 75;
const MAX_SERIALIZE_CYCLES: u32 = 100;
fn merge_bounded(&mut self, other: &Self) -> CRDTResult<()> {
self.merge(other)
}
fn validate_bounded(&self) -> CRDTResult<()> {
self.validate()
}
fn remaining_budget(&self) -> Option<u32> {
None
}
fn set_budget(&mut self, _cycles: u32) {
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::DefaultConfig;

    /// Store type used throughout the tests.
    type Store = CoordinationSignals<DefaultConfig>;

    /// Critical / immediate-response classification of signal types.
    #[test]
    fn test_signal_type_properties() {
        for kind in [SignalType::Emergency, SignalType::Help] {
            assert!(kind.is_critical());
        }
        assert!(!SignalType::Start.is_critical());

        for kind in [SignalType::Emergency, SignalType::Stop] {
            assert!(kind.requires_immediate_response());
        }
        assert!(!SignalType::Complete.requires_immediate_response());
    }

    /// Higher priorities time out sooner and compare greater.
    #[test]
    fn test_signal_priority() {
        let critical_timeout = SignalPriority::Critical.timeout_seconds();
        let normal_timeout = SignalPriority::Normal.timeout_seconds();
        assert!(critical_timeout < normal_timeout);
        assert!(SignalPriority::High > SignalPriority::Normal);
    }

    /// Direct and broadcast construction, and addressee checks.
    #[test]
    fn test_signal_creation() {
        let direct = Signal::new(
            SignalType::Emergency,
            SignalPriority::Critical,
            42,
            1000,
            1,
            2,
        );
        assert_eq!(direct.signal_type, SignalType::Emergency);
        assert_eq!(direct.sender_id, 1);
        assert_eq!(direct.target_id, 2);
        assert!(!direct.is_broadcast());
        assert!(direct.is_for_robot(2));
        assert!(!direct.is_for_robot(3));

        let to_everyone =
            Signal::broadcast(SignalType::Start, SignalPriority::Normal, 0, 1000, 1);
        assert!(to_everyone.is_broadcast());
        assert!(to_everyone.is_for_robot(2));
        assert!(to_everyone.is_for_robot(3));
    }

    /// A critical signal created at t=1000 outlives t=3000 but not t=10000.
    #[test]
    fn test_signal_expiration() {
        let emergency = Signal::new(
            SignalType::Emergency,
            SignalPriority::Critical,
            0,
            1000,
            1,
            0,
        );
        assert!(!emergency.is_expired(1000));
        assert!(!emergency.is_expired(3000));
        assert!(emergency.is_expired(10000));
    }

    /// A fresh store is empty.
    #[test]
    fn test_coordination_signals_creation() {
        let store = Store::new(1);
        assert_eq!(store.signal_count(), 0);
        assert!(!store.has_emergency_signals());
        assert!(!store.has_help_requests());
    }

    /// Sent signals are counted, and queries only see those addressed to
    /// the local robot.
    #[test]
    fn test_signal_sending_and_querying() {
        let mut store = Store::new(1);

        // Broadcast emergency: visible to the local robot.
        store
            .send_signal(
                SignalType::Emergency,
                SignalPriority::Critical,
                911,
                1000,
                0,
            )
            .unwrap();
        // Help request addressed to robot 2: stored, but not for robot 1.
        store
            .send_signal(SignalType::Help, SignalPriority::High, 123, 1001, 2)
            .unwrap();

        assert_eq!(store.signal_count(), 2);
        assert!(store.has_emergency_signals());

        assert_eq!(store.signals_for_robot().count(), 1);
        assert_eq!(store.critical_signals().count(), 1);
        assert_eq!(store.signals_by_type(SignalType::Emergency).count(), 1);

        let first_emergency = store
            .signals_by_type(SignalType::Emergency)
            .next()
            .unwrap();
        assert_eq!(first_emergency.data, 911);
    }

    /// Expired signals are removed and reported by `cleanup_expired`.
    #[test]
    fn test_signal_cleanup() {
        let mut store = Store::new(1);
        store
            .send_signal(SignalType::Start, SignalPriority::Critical, 0, 1000, 0)
            .unwrap();
        assert_eq!(store.signal_count(), 1);

        let dropped = store.cleanup_expired(10000);
        assert_eq!(dropped, 1);
        assert_eq!(store.signal_count(), 0);
    }

    /// Merging brings the other store's signals into the local one.
    #[test]
    fn test_coordination_signals_merge() {
        let mut local = Store::new(1);
        let mut remote = Store::new(2);

        local
            .send_signal(
                SignalType::Emergency,
                SignalPriority::Critical,
                911,
                1000,
                0,
            )
            .unwrap();
        remote
            .send_signal(SignalType::Help, SignalPriority::High, 123, 1001, 1)
            .unwrap();

        local.merge(&remote).unwrap();

        assert_eq!(local.signal_count(), 2);
        assert!(local.has_emergency_signals());
        assert!(local.has_help_requests());
    }

    /// Element accounting exposed through `BoundedCRDT`.
    #[test]
    fn test_bounded_crdt_implementation() {
        let mut store = Store::new(1);
        assert_eq!(store.element_count(), 0);
        assert!(store.can_add_element());

        store
            .send_signal(SignalType::Start, SignalPriority::Normal, 0, 1000, 0)
            .unwrap();

        assert_eq!(store.element_count(), 1);
        assert!(store.memory_usage() > 0);
    }

    /// Bounded merge/validate succeed on empty stores.
    #[test]
    fn test_real_time_crdt_implementation() {
        let mut local = Store::new(1);
        let remote = Store::new(2);

        assert!(local.merge_bounded(&remote).is_ok());
        assert!(local.validate_bounded().is_ok());
    }
}