use crate::clock::CompactTimestamp;
use crate::error::{CRDTError, CRDTResult};
use crate::memory::{MemoryConfig, NodeId};
use crate::traits::{BoundedCRDT, CRDT, RealTimeCRDT};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum EquipmentStatus {
Offline = 0,
Starting = 1,
Idle = 2,
Running = 3,
Stopping = 4,
Warning = 5,
Error = 6,
Emergency = 7,
Maintenance = 8,
}
impl EquipmentStatus {
pub fn is_operational(&self) -> bool {
matches!(self, EquipmentStatus::Idle | EquipmentStatus::Running)
}
pub fn requires_attention(&self) -> bool {
matches!(self, EquipmentStatus::Error | EquipmentStatus::Emergency)
}
pub fn can_start(&self) -> bool {
matches!(self, EquipmentStatus::Offline | EquipmentStatus::Idle)
}
pub fn can_stop(&self) -> bool {
matches!(
self,
EquipmentStatus::Running | EquipmentStatus::Warning | EquipmentStatus::Error
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum MaintenanceState {
None = 0,
PreventiveDue = 1,
PreventiveOverdue = 2,
CorrectiveRequired = 3,
EmergencyRequired = 4,
InProgress = 5,
Completed = 6,
}
impl MaintenanceState {
pub fn requires_maintenance(&self) -> bool {
matches!(
self,
MaintenanceState::PreventiveOverdue
| MaintenanceState::CorrectiveRequired
| MaintenanceState::EmergencyRequired
)
}
pub fn is_urgent(&self) -> bool {
matches!(
self,
MaintenanceState::EmergencyRequired | MaintenanceState::CorrectiveRequired
)
}
pub fn priority_level(&self) -> u8 {
match self {
MaintenanceState::None => 0,
MaintenanceState::Completed => 0,
MaintenanceState::PreventiveDue => 1,
MaintenanceState::InProgress => 2,
MaintenanceState::PreventiveOverdue => 3,
MaintenanceState::CorrectiveRequired => 4,
MaintenanceState::EmergencyRequired => 5,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EquipmentInfo {
pub equipment_id: NodeId,
pub equipment_type: u16,
pub status: EquipmentStatus,
pub maintenance_state: MaintenanceState,
pub operating_hours: u32,
pub cycle_count: u32,
pub last_maintenance: CompactTimestamp,
pub next_maintenance_due: CompactTimestamp,
pub last_update: CompactTimestamp,
pub controller_id: NodeId,
}
impl EquipmentInfo {
pub fn new(
equipment_id: NodeId,
equipment_type: u16,
controller_id: NodeId,
timestamp: u64,
) -> Self {
Self {
equipment_id,
equipment_type,
status: EquipmentStatus::Offline,
maintenance_state: MaintenanceState::None,
operating_hours: 0,
cycle_count: 0,
last_maintenance: CompactTimestamp::new(timestamp),
next_maintenance_due: CompactTimestamp::new(timestamp + 86400000), last_update: CompactTimestamp::new(timestamp),
controller_id,
}
}
pub fn update_status(&mut self, status: EquipmentStatus, timestamp: u64) {
self.status = status;
self.last_update = CompactTimestamp::new(timestamp);
}
pub fn update_maintenance(&mut self, state: MaintenanceState, timestamp: u64) {
self.maintenance_state = state;
self.last_update = CompactTimestamp::new(timestamp);
if state == MaintenanceState::Completed {
self.last_maintenance = CompactTimestamp::new(timestamp);
self.next_maintenance_due = CompactTimestamp::new(timestamp + 30 * 86400000);
}
}
pub fn update_metrics(&mut self, operating_hours: u32, cycle_count: u32, timestamp: u64) {
self.operating_hours = operating_hours;
self.cycle_count = cycle_count;
self.last_update = CompactTimestamp::new(timestamp);
}
pub fn set_maintenance_due(&mut self, due_timestamp: u64, current_timestamp: u64) {
self.next_maintenance_due = CompactTimestamp::new(due_timestamp);
self.last_update = CompactTimestamp::new(current_timestamp);
}
pub fn should_override(&self, other: &EquipmentInfo) -> bool {
self.last_update > other.last_update
}
pub fn is_maintenance_overdue(&self, current_time: u64) -> bool {
current_time > self.next_maintenance_due.as_u64()
}
pub fn time_until_maintenance(&self, current_time: u64) -> i64 {
self.next_maintenance_due.as_u64() as i64 - current_time as i64
}
}
#[derive(Debug, Clone)]
pub struct EquipmentRegistry<C: MemoryConfig> {
equipment: [Option<EquipmentInfo>; 64], equipment_count: usize,
local_controller_id: NodeId,
last_update: CompactTimestamp,
_phantom: core::marker::PhantomData<C>,
}
impl<C: MemoryConfig> EquipmentRegistry<C> {
pub fn new(controller_id: NodeId) -> Self {
Self {
equipment: [const { None }; 64],
equipment_count: 0,
local_controller_id: controller_id,
last_update: CompactTimestamp::new(0),
_phantom: core::marker::PhantomData,
}
}
pub fn register_equipment(
&mut self,
equipment_id: NodeId,
equipment_type: u16,
timestamp: u64,
) -> CRDTResult<()> {
let equipment_info = EquipmentInfo::new(
equipment_id,
equipment_type,
self.local_controller_id,
timestamp,
);
self.add_equipment_info(equipment_info)?;
self.last_update = CompactTimestamp::new(timestamp);
Ok(())
}
pub fn update_equipment_status(
&mut self,
equipment_id: NodeId,
status: EquipmentStatus,
timestamp: u64,
) -> CRDTResult<()> {
if let Some(equipment) = self.find_equipment_mut(equipment_id) {
equipment.update_status(status, timestamp);
self.last_update = CompactTimestamp::new(timestamp);
Ok(())
} else {
Err(CRDTError::InvalidNodeId)
}
}
pub fn update_maintenance_state(
&mut self,
equipment_id: NodeId,
state: MaintenanceState,
timestamp: u64,
) -> CRDTResult<()> {
if let Some(equipment) = self.find_equipment_mut(equipment_id) {
equipment.update_maintenance(state, timestamp);
self.last_update = CompactTimestamp::new(timestamp);
Ok(())
} else {
Err(CRDTError::InvalidNodeId)
}
}
pub fn update_equipment_metrics(
&mut self,
equipment_id: NodeId,
operating_hours: u32,
cycle_count: u32,
timestamp: u64,
) -> CRDTResult<()> {
if let Some(equipment) = self.find_equipment_mut(equipment_id) {
equipment.update_metrics(operating_hours, cycle_count, timestamp);
self.last_update = CompactTimestamp::new(timestamp);
Ok(())
} else {
Err(CRDTError::InvalidNodeId)
}
}
pub fn schedule_maintenance(
&mut self,
equipment_id: NodeId,
due_timestamp: u64,
timestamp: u64,
) -> CRDTResult<()> {
if let Some(equipment) = self.find_equipment_mut(equipment_id) {
equipment.set_maintenance_due(due_timestamp, timestamp);
self.last_update = CompactTimestamp::new(timestamp);
Ok(())
} else {
Err(CRDTError::InvalidNodeId)
}
}
pub fn all_equipment(&self) -> impl Iterator<Item = &EquipmentInfo> {
self.equipment.iter().filter_map(|e| e.as_ref())
}
pub fn equipment_by_status(
&self,
status: EquipmentStatus,
) -> impl Iterator<Item = &EquipmentInfo> {
self.all_equipment().filter(move |e| e.status == status)
}
pub fn running_equipment(&self) -> impl Iterator<Item = &EquipmentInfo> {
self.equipment_by_status(EquipmentStatus::Running)
}
pub fn equipment_requiring_attention(&self) -> impl Iterator<Item = &EquipmentInfo> {
self.all_equipment()
.filter(|e| e.status.requires_attention())
}
pub fn equipment_by_maintenance_state(
&self,
state: MaintenanceState,
) -> impl Iterator<Item = &EquipmentInfo> {
self.all_equipment()
.filter(move |e| e.maintenance_state == state)
}
pub fn equipment_requiring_maintenance(&self) -> impl Iterator<Item = &EquipmentInfo> {
self.all_equipment()
.filter(|e| e.maintenance_state.requires_maintenance())
}
pub fn equipment_with_overdue_maintenance(
&self,
current_time: u64,
) -> impl Iterator<Item = &EquipmentInfo> {
self.all_equipment()
.filter(move |e| e.is_maintenance_overdue(current_time))
}
pub fn equipment_by_type(&self, equipment_type: u16) -> impl Iterator<Item = &EquipmentInfo> {
self.all_equipment()
.filter(move |e| e.equipment_type == equipment_type)
}
pub fn equipment_by_controller(
&self,
controller_id: NodeId,
) -> impl Iterator<Item = &EquipmentInfo> {
self.all_equipment()
.filter(move |e| e.controller_id == controller_id)
}
pub fn get_equipment(&self, equipment_id: NodeId) -> Option<&EquipmentInfo> {
self.all_equipment()
.find(|e| e.equipment_id == equipment_id)
}
pub fn equipment_count(&self) -> usize {
self.equipment_count
}
pub fn emergency_stop_all(&mut self, timestamp: u64) -> usize {
let mut stopped = 0;
for i in 0..64 {
if let Some(ref mut equipment) = self.equipment[i] {
if equipment.status.is_operational() {
equipment.update_status(EquipmentStatus::Emergency, timestamp);
stopped += 1;
}
}
}
if stopped > 0 {
self.last_update = CompactTimestamp::new(timestamp);
}
stopped
}
fn find_equipment_mut(&mut self, equipment_id: NodeId) -> Option<&mut EquipmentInfo> {
for equipment_opt in &mut self.equipment {
if let Some(equipment) = equipment_opt {
if equipment.equipment_id == equipment_id {
return Some(equipment);
}
}
}
None
}
fn add_equipment_info(&mut self, equipment_info: EquipmentInfo) -> CRDTResult<()> {
for i in 0..64 {
if let Some(ref mut existing) = self.equipment[i] {
if existing.equipment_id == equipment_info.equipment_id {
if equipment_info.should_override(existing) {
*existing = equipment_info;
}
return Ok(());
}
} else {
self.equipment[i] = Some(equipment_info);
self.equipment_count += 1;
return Ok(());
}
}
self.make_space_for_equipment(equipment_info)
}
fn make_space_for_equipment(&mut self, new_equipment: EquipmentInfo) -> CRDTResult<()> {
let mut oldest_idx = None;
let mut oldest_time = u64::MAX;
for (i, equipment_opt) in self.equipment.iter().enumerate() {
if let Some(equipment) = equipment_opt {
if equipment.status == EquipmentStatus::Offline
&& equipment.last_update.as_u64() < oldest_time
{
oldest_time = equipment.last_update.as_u64();
oldest_idx = Some(i);
}
}
}
if let Some(idx) = oldest_idx {
self.equipment[idx] = Some(new_equipment);
Ok(())
} else {
Err(CRDTError::BufferOverflow)
}
}
pub fn validate_registry(&self) -> CRDTResult<()> {
for equipment in self.all_equipment() {
if equipment.equipment_id as usize >= C::MAX_NODES {
return Err(CRDTError::InvalidNodeId);
}
if equipment.controller_id as usize >= C::MAX_NODES {
return Err(CRDTError::InvalidNodeId);
}
}
Ok(())
}
}
impl<C: MemoryConfig> CRDT<C> for EquipmentRegistry<C> {
type Error = CRDTError;
fn merge(&mut self, other: &Self) -> CRDTResult<()> {
for equipment in other.all_equipment() {
self.add_equipment_info(*equipment)?;
}
if other.last_update > self.last_update {
self.last_update = other.last_update;
}
Ok(())
}
fn eq(&self, other: &Self) -> bool {
if self.equipment_count != other.equipment_count {
return false;
}
for equipment in self.all_equipment() {
let mut found = false;
for other_equipment in other.all_equipment() {
if equipment.equipment_id == other_equipment.equipment_id
&& equipment == other_equipment
{
found = true;
break;
}
}
if !found {
return false;
}
}
true
}
fn size_bytes(&self) -> usize {
core::mem::size_of::<Self>()
}
fn validate(&self) -> CRDTResult<()> {
self.validate_registry()
}
fn state_hash(&self) -> u32 {
let mut hash = self.local_controller_id as u32;
for equipment in self.all_equipment() {
hash ^= (equipment.equipment_id as u32)
^ (equipment.last_update.as_u64() as u32)
^ (equipment.status as u32);
}
hash ^= self.equipment_count as u32;
hash
}
fn can_merge(&self, _other: &Self) -> bool {
true
}
}
impl<C: MemoryConfig> BoundedCRDT<C> for EquipmentRegistry<C> {
const MAX_SIZE_BYTES: usize = core::mem::size_of::<Self>();
const MAX_ELEMENTS: usize = 64;
fn memory_usage(&self) -> usize {
core::mem::size_of::<Self>()
}
fn element_count(&self) -> usize {
self.equipment_count
}
fn compact(&mut self) -> CRDTResult<usize> {
Ok(0)
}
fn can_add_element(&self) -> bool {
self.equipment_count < Self::MAX_ELEMENTS
}
}
impl<C: MemoryConfig> RealTimeCRDT<C> for EquipmentRegistry<C> {
const MAX_MERGE_CYCLES: u32 = 200; const MAX_VALIDATE_CYCLES: u32 = 100;
const MAX_SERIALIZE_CYCLES: u32 = 150;
fn merge_bounded(&mut self, other: &Self) -> CRDTResult<()> {
self.merge(other)
}
fn validate_bounded(&self) -> CRDTResult<()> {
self.validate()
}
fn remaining_budget(&self) -> Option<u32> {
None
}
fn set_budget(&mut self, _cycles: u32) {
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::DefaultConfig;
#[test]
fn test_equipment_status_properties() {
assert!(EquipmentStatus::Running.is_operational());
assert!(EquipmentStatus::Idle.is_operational());
assert!(!EquipmentStatus::Offline.is_operational());
assert!(EquipmentStatus::Error.requires_attention());
assert!(EquipmentStatus::Emergency.requires_attention());
assert!(!EquipmentStatus::Running.requires_attention());
assert!(EquipmentStatus::Offline.can_start());
assert!(EquipmentStatus::Idle.can_start());
assert!(!EquipmentStatus::Running.can_start());
assert!(EquipmentStatus::Running.can_stop());
assert!(EquipmentStatus::Error.can_stop());
assert!(!EquipmentStatus::Offline.can_stop());
}
#[test]
fn test_maintenance_state_properties() {
assert!(MaintenanceState::CorrectiveRequired.requires_maintenance());
assert!(MaintenanceState::EmergencyRequired.requires_maintenance());
assert!(!MaintenanceState::None.requires_maintenance());
assert!(MaintenanceState::EmergencyRequired.is_urgent());
assert!(MaintenanceState::CorrectiveRequired.is_urgent());
assert!(!MaintenanceState::PreventiveDue.is_urgent());
assert!(
MaintenanceState::EmergencyRequired.priority_level()
> MaintenanceState::CorrectiveRequired.priority_level()
);
assert!(
MaintenanceState::CorrectiveRequired.priority_level()
> MaintenanceState::PreventiveDue.priority_level()
);
}
#[test]
fn test_equipment_info_creation() {
let equipment = EquipmentInfo::new(42, 0x2001, 1, 1000);
assert_eq!(equipment.equipment_id, 42);
assert_eq!(equipment.equipment_type, 0x2001);
assert_eq!(equipment.controller_id, 1);
assert_eq!(equipment.status, EquipmentStatus::Offline);
assert_eq!(equipment.maintenance_state, MaintenanceState::None);
}
#[test]
fn test_equipment_info_updates() {
let mut equipment = EquipmentInfo::new(42, 0x2001, 1, 1000);
equipment.update_status(EquipmentStatus::Running, 1001);
assert_eq!(equipment.status, EquipmentStatus::Running);
equipment.update_maintenance(MaintenanceState::PreventiveDue, 1002);
assert_eq!(equipment.maintenance_state, MaintenanceState::PreventiveDue);
equipment.update_metrics(1000, 500, 1003);
assert_eq!(equipment.operating_hours, 1000);
assert_eq!(equipment.cycle_count, 500);
equipment.update_maintenance(MaintenanceState::Completed, 1004);
assert_eq!(equipment.last_maintenance.as_u64(), 1004);
assert!(equipment.next_maintenance_due.as_u64() > 1004);
}
#[test]
fn test_maintenance_timing() {
let mut equipment = EquipmentInfo::new(42, 0x2001, 1, 1000);
equipment.set_maintenance_due(2000, 1000);
assert!(!equipment.is_maintenance_overdue(1500)); assert!(equipment.is_maintenance_overdue(2500));
assert_eq!(equipment.time_until_maintenance(1500), 500); assert_eq!(equipment.time_until_maintenance(2500), -500); }
#[test]
fn test_equipment_registry_creation() {
let registry = EquipmentRegistry::<DefaultConfig>::new(1);
assert_eq!(registry.equipment_count(), 0);
assert_eq!(registry.local_controller_id, 1);
}
#[test]
fn test_equipment_registration_and_updates() {
let mut registry = EquipmentRegistry::<DefaultConfig>::new(1);
registry.register_equipment(42, 0x2001, 1000).unwrap();
assert_eq!(registry.equipment_count(), 1);
registry
.update_equipment_status(42, EquipmentStatus::Running, 1001)
.unwrap();
registry
.update_maintenance_state(42, MaintenanceState::PreventiveDue, 1002)
.unwrap();
registry
.update_equipment_metrics(42, 1000, 500, 1003)
.unwrap();
registry.schedule_maintenance(42, 2000, 1004).unwrap();
let equipment = registry.get_equipment(42).unwrap();
assert_eq!(equipment.status, EquipmentStatus::Running);
assert_eq!(equipment.maintenance_state, MaintenanceState::PreventiveDue);
assert_eq!(equipment.operating_hours, 1000);
assert_eq!(equipment.cycle_count, 500);
assert_eq!(equipment.next_maintenance_due.as_u64(), 2000);
}
#[test]
fn test_equipment_registry_queries() {
let mut registry = EquipmentRegistry::<DefaultConfig>::new(1);
registry.register_equipment(1, 0x2001, 1000).unwrap(); registry.register_equipment(2, 0x2002, 1001).unwrap(); registry.register_equipment(3, 0x2001, 1002).unwrap();
registry
.update_equipment_status(1, EquipmentStatus::Running, 1003)
.unwrap();
registry
.update_equipment_status(2, EquipmentStatus::Error, 1004)
.unwrap();
registry
.update_maintenance_state(3, MaintenanceState::PreventiveDue, 1005)
.unwrap();
assert_eq!(registry.running_equipment().count(), 1);
assert_eq!(registry.equipment_requiring_attention().count(), 1);
assert_eq!(registry.equipment_by_type(0x2001).count(), 2); assert_eq!(registry.equipment_by_controller(1).count(), 3); assert_eq!(registry.equipment_requiring_maintenance().count(), 0); }
#[test]
fn test_maintenance_scheduling() {
let mut registry = EquipmentRegistry::<DefaultConfig>::new(1);
registry.register_equipment(42, 0x2001, 1000).unwrap();
registry.schedule_maintenance(42, 2000, 1000).unwrap();
let equipment = registry.get_equipment(42).unwrap();
assert_eq!(equipment.next_maintenance_due.as_u64(), 2000);
assert!(!equipment.is_maintenance_overdue(1500)); assert!(equipment.is_maintenance_overdue(2500));
assert_eq!(registry.equipment_with_overdue_maintenance(2500).count(), 1);
}
#[test]
fn test_emergency_stop() {
let mut registry = EquipmentRegistry::<DefaultConfig>::new(1);
registry.register_equipment(1, 0x2001, 1000).unwrap();
registry.register_equipment(2, 0x2002, 1001).unwrap();
registry
.update_equipment_status(1, EquipmentStatus::Running, 1002)
.unwrap();
registry
.update_equipment_status(2, EquipmentStatus::Idle, 1003)
.unwrap();
assert_eq!(registry.running_equipment().count(), 1);
let stopped = registry.emergency_stop_all(1004);
assert_eq!(stopped, 2); assert_eq!(
registry
.equipment_by_status(EquipmentStatus::Emergency)
.count(),
2
);
}
#[test]
fn test_equipment_registry_merge() {
let mut registry1 = EquipmentRegistry::<DefaultConfig>::new(1);
let mut registry2 = EquipmentRegistry::<DefaultConfig>::new(2);
registry1.register_equipment(1, 0x2001, 1000).unwrap();
registry2.register_equipment(2, 0x2002, 1001).unwrap();
registry1.merge(®istry2).unwrap();
assert_eq!(registry1.equipment_count(), 2);
assert!(registry1.get_equipment(1).is_some());
assert!(registry1.get_equipment(2).is_some());
}
#[test]
fn test_bounded_crdt_implementation() {
let mut registry = EquipmentRegistry::<DefaultConfig>::new(1);
assert_eq!(registry.element_count(), 0);
assert!(registry.can_add_element());
registry.register_equipment(42, 0x2001, 1000).unwrap();
assert_eq!(registry.element_count(), 1);
assert!(registry.memory_usage() > 0);
}
#[test]
fn test_real_time_crdt_implementation() {
let mut registry1 = EquipmentRegistry::<DefaultConfig>::new(1);
let registry2 = EquipmentRegistry::<DefaultConfig>::new(2);
assert!(registry1.merge_bounded(®istry2).is_ok());
assert!(registry1.validate_bounded().is_ok());
}
}