use crate::clock::CompactTimestamp;
use crate::error::{CRDTError, CRDTResult};
use crate::memory::{MemoryConfig, NodeId};
use crate::traits::{BoundedCRDT, CRDT, RealTimeCRDT};
/// Lifecycle state of a controlled process.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so every state has a
/// fixed one-byte numeric value (`Stopped = 0` through `Maintenance = 8`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum ProcessState {
    /// Initial state of a freshly created process step.
    Stopped = 0,
    /// Transitional state on the way to `Running`.
    Starting = 1,
    /// The process is running.
    Running = 2,
    /// Transitional state on the way to `Paused`.
    Pausing = 3,
    /// The process is paused.
    Paused = 4,
    /// Transitional state on the way to `Stopped`.
    Stopping = 5,
    /// The process reported an error.
    Error = 6,
    /// The process was emergency-stopped.
    Emergency = 7,
    /// The process is in maintenance.
    Maintenance = 8,
}

impl ProcessState {
    /// Returns `true` for `Running`, `Starting` and `Pausing`.
    pub fn is_operational(&self) -> bool {
        match self {
            ProcessState::Running | ProcessState::Starting | ProcessState::Pausing => true,
            ProcessState::Stopped
            | ProcessState::Paused
            | ProcessState::Stopping
            | ProcessState::Error
            | ProcessState::Emergency
            | ProcessState::Maintenance => false,
        }
    }

    /// Returns `true` for `Error` and `Emergency`.
    pub fn requires_attention(&self) -> bool {
        match self {
            ProcessState::Error | ProcessState::Emergency => true,
            ProcessState::Stopped
            | ProcessState::Starting
            | ProcessState::Running
            | ProcessState::Pausing
            | ProcessState::Paused
            | ProcessState::Stopping
            | ProcessState::Maintenance => false,
        }
    }

    /// Returns `true` when a start request is accepted: `Stopped` or `Paused`.
    pub fn can_start(&self) -> bool {
        match self {
            ProcessState::Stopped | ProcessState::Paused => true,
            ProcessState::Starting
            | ProcessState::Running
            | ProcessState::Pausing
            | ProcessState::Stopping
            | ProcessState::Error
            | ProcessState::Emergency
            | ProcessState::Maintenance => false,
        }
    }

    /// Returns `true` when a stop request is accepted: `Running`, `Paused` or `Error`.
    pub fn can_stop(&self) -> bool {
        match self {
            ProcessState::Running | ProcessState::Paused | ProcessState::Error => true,
            ProcessState::Stopped
            | ProcessState::Starting
            | ProcessState::Pausing
            | ProcessState::Stopping
            | ProcessState::Emergency
            | ProcessState::Maintenance => false,
        }
    }
}
/// Command that can be issued against a controlled process.
///
/// The enum is `#[repr(u8)]` with explicit discriminants starting at 1
/// (`Start = 1` through `ExitMaintenance = 8`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum ControlAction {
    /// Request to start the process.
    Start = 1,
    /// Request to stop the process.
    Stop = 2,
    /// Request to pause the process.
    Pause = 3,
    /// Request to resume the process.
    Resume = 4,
    /// Request an emergency stop.
    EmergencyStop = 5,
    /// Request a reset.
    Reset = 6,
    /// Request entry into maintenance.
    Maintenance = 7,
    /// Request exit from maintenance.
    ExitMaintenance = 8,
}

impl ControlAction {
    /// Returns `true` only for `EmergencyStop`.
    pub fn is_emergency(&self) -> bool {
        match self {
            ControlAction::EmergencyStop => true,
            ControlAction::Start
            | ControlAction::Stop
            | ControlAction::Pause
            | ControlAction::Resume
            | ControlAction::Reset
            | ControlAction::Maintenance
            | ControlAction::ExitMaintenance => false,
        }
    }

    /// Returns `true` for `EmergencyStop`, `Reset`, `Maintenance` and
    /// `ExitMaintenance`; the ordinary start/stop/pause/resume actions return
    /// `false`.
    pub fn requires_privileges(&self) -> bool {
        match self {
            ControlAction::Start
            | ControlAction::Stop
            | ControlAction::Pause
            | ControlAction::Resume => false,
            ControlAction::EmergencyStop
            | ControlAction::Reset
            | ControlAction::Maintenance
            | ControlAction::ExitMaintenance => true,
        }
    }
}
/// Record describing one process: its identity, lifecycle state, control
/// values and the timestamp of its latest change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProcessStep {
    /// Identifier of the process this record describes.
    pub process_id: NodeId,
    /// Step number supplied when the record was created.
    pub step_number: u16,
    /// Current lifecycle state.
    pub state: ProcessState,
    /// Progress value supplied by callers; no range is enforced here.
    pub progress: u8,
    /// Target value for the process.
    pub setpoint: i32,
    /// Most recently reported measured value.
    pub current_value: i32,
    /// Last control action recorded through `update_state`.
    pub last_action: ControlAction,
    /// Timestamp of the latest state or value change.
    pub timestamp: CompactTimestamp,
    /// Identifier of the controller that created the record.
    pub controller_id: NodeId,
}

impl ProcessStep {
    /// Creates a record in the `Stopped` state with zeroed progress, setpoint
    /// and current value, and `Stop` as its last action.
    pub fn new(
        process_id: NodeId,
        step_number: u16,
        controller_id: NodeId,
        timestamp: u64,
    ) -> Self {
        Self {
            process_id,
            step_number,
            state: ProcessState::Stopped,
            progress: 0,
            setpoint: 0,
            current_value: 0,
            last_action: ControlAction::Stop,
            timestamp: CompactTimestamp::new(timestamp),
            controller_id,
        }
    }

    /// Records a new state together with the action that caused it and stamps
    /// the record with `timestamp`.
    pub fn update_state(&mut self, state: ProcessState, action: ControlAction, timestamp: u64) {
        self.state = state;
        self.last_action = action;
        self.timestamp = CompactTimestamp::new(timestamp);
    }

    /// Overwrites setpoint, current value and progress, and stamps the record
    /// with `timestamp`.
    pub fn update_values(
        &mut self,
        setpoint: i32,
        current_value: i32,
        progress: u8,
        timestamp: u64,
    ) {
        self.setpoint = setpoint;
        self.current_value = current_value;
        self.progress = progress;
        self.timestamp = CompactTimestamp::new(timestamp);
    }

    /// Returns `true` when this record is strictly newer than `other`.
    ///
    /// Equal timestamps never override.
    // NOTE(review): with equal timestamps and differing content each side keeps
    // its own record; confirm whether a deterministic tie-break is wanted.
    pub fn should_override(&self, other: &ProcessStep) -> bool {
        self.timestamp > other.timestamp
    }

    /// Returns `true` when the current value is within `tolerance` of the
    /// setpoint (inclusive). A negative `tolerance` never matches.
    pub fn is_at_setpoint(&self, tolerance: i32) -> bool {
        // `abs_diff` yields the exact distance as `u32`, so operands far apart
        // (e.g. `i32::MIN` vs `i32::MAX`) cannot overflow the way
        // `(a - b).abs()` does.
        match u32::try_from(tolerance) {
            Ok(limit) => self.current_value.abs_diff(self.setpoint) <= limit,
            Err(_) => false,
        }
    }

    /// Returns `current_value - setpoint`, saturating at the `i32` bounds
    /// instead of overflowing.
    pub fn error_from_setpoint(&self) -> i32 {
        self.current_value.saturating_sub(self.setpoint)
    }
}
/// Fixed-capacity table of [`ProcessStep`] records keyed by process id.
///
/// Up to 64 processes are stored inline. Records are reconciled by timestamp:
/// a record replaces an existing one with the same id only when it is
/// strictly newer.
#[derive(Debug, Clone)]
pub struct ProcessControl<C: MemoryConfig> {
    /// Slot table; `None` marks a free slot.
    processes: [Option<ProcessStep>; 64],
    /// Number of occupied slots.
    process_count: usize,
    /// Controller id stamped on processes registered through this instance.
    local_controller_id: NodeId,
    /// Timestamp recorded by the most recent mutating call.
    last_update: CompactTimestamp,
    _phantom: core::marker::PhantomData<C>,
}

impl<C: MemoryConfig> ProcessControl<C> {
    /// Creates an empty table owned by `controller_id`.
    pub fn new(controller_id: NodeId) -> Self {
        Self {
            processes: [const { None }; 64],
            process_count: 0,
            local_controller_id: controller_id,
            last_update: CompactTimestamp::new(0),
            _phantom: core::marker::PhantomData,
        }
    }

    /// Registers a process in its initial (`Stopped`) state.
    ///
    /// If a record with the same id already exists it is replaced only when
    /// `timestamp` is strictly newer. `last_update` is set to `timestamp`
    /// whenever the call succeeds.
    ///
    /// # Errors
    /// Returns `CRDTError::BufferOverflow` when the table is full and no
    /// stopped process can be evicted.
    pub fn register_process(
        &mut self,
        process_id: NodeId,
        step_number: u16,
        timestamp: u64,
    ) -> CRDTResult<()> {
        let process_step =
            ProcessStep::new(process_id, step_number, self.local_controller_id, timestamp);
        self.add_process_step(process_step)?;
        self.last_update = CompactTimestamp::new(timestamp);
        Ok(())
    }

    /// Applies `action` to the process and moves it to the resulting state.
    ///
    /// `Start` and `Stop` are checked against `can_start` / `can_stop`; every
    /// other action is applied unconditionally.
    ///
    /// # Errors
    /// * `CRDTError::InvalidNodeId` when no process has `process_id`.
    /// * `CRDTError::InvalidOperation` when `Start` or `Stop` is not allowed
    ///   in the current state.
    pub fn apply_control_action(
        &mut self,
        process_id: NodeId,
        action: ControlAction,
        timestamp: u64,
    ) -> CRDTResult<()> {
        let Some(process) = self.find_process_mut(process_id) else {
            return Err(CRDTError::InvalidNodeId);
        };
        let new_state = match action {
            ControlAction::Start if process.state.can_start() => ProcessState::Starting,
            ControlAction::Stop if process.state.can_stop() => ProcessState::Stopping,
            // Guards above failed: the transition is not permitted.
            ControlAction::Start | ControlAction::Stop => {
                return Err(CRDTError::InvalidOperation);
            }
            ControlAction::Pause => ProcessState::Pausing,
            ControlAction::Resume => ProcessState::Starting,
            ControlAction::EmergencyStop => ProcessState::Emergency,
            ControlAction::Reset | ControlAction::ExitMaintenance => ProcessState::Stopped,
            ControlAction::Maintenance => ProcessState::Maintenance,
        };
        process.update_state(new_state, action, timestamp);
        self.last_update = CompactTimestamp::new(timestamp);
        Ok(())
    }

    /// Overwrites setpoint, current value and progress of a process.
    ///
    /// # Errors
    /// Returns `CRDTError::InvalidNodeId` when no process has `process_id`.
    pub fn update_process_values(
        &mut self,
        process_id: NodeId,
        setpoint: i32,
        current_value: i32,
        progress: u8,
        timestamp: u64,
    ) -> CRDTResult<()> {
        let Some(process) = self.find_process_mut(process_id) else {
            return Err(CRDTError::InvalidNodeId);
        };
        process.update_values(setpoint, current_value, progress, timestamp);
        self.last_update = CompactTimestamp::new(timestamp);
        Ok(())
    }

    /// Sets the state of a process directly, without any transition check and
    /// without touching `last_action`.
    ///
    /// # Errors
    /// Returns `CRDTError::InvalidNodeId` when no process has `process_id`.
    pub fn update_process_state(
        &mut self,
        process_id: NodeId,
        state: ProcessState,
        timestamp: u64,
    ) -> CRDTResult<()> {
        let Some(process) = self.find_process_mut(process_id) else {
            return Err(CRDTError::InvalidNodeId);
        };
        process.state = state;
        process.timestamp = CompactTimestamp::new(timestamp);
        self.last_update = CompactTimestamp::new(timestamp);
        Ok(())
    }

    /// Iterates over every stored process in slot order.
    pub fn all_processes(&self) -> impl Iterator<Item = &ProcessStep> {
        self.processes.iter().flatten()
    }

    /// Iterates over the processes currently in `state`.
    pub fn processes_by_state(&self, state: ProcessState) -> impl Iterator<Item = &ProcessStep> {
        self.all_processes().filter(move |p| p.state == state)
    }

    /// Iterates over the processes in the `Running` state.
    pub fn running_processes(&self) -> impl Iterator<Item = &ProcessStep> {
        self.processes_by_state(ProcessState::Running)
    }

    /// Iterates over the processes whose state `requires_attention`.
    pub fn processes_requiring_attention(&self) -> impl Iterator<Item = &ProcessStep> {
        self.all_processes()
            .filter(|p| p.state.requires_attention())
    }

    /// Iterates over the processes created by `controller_id`.
    pub fn processes_by_controller(
        &self,
        controller_id: NodeId,
    ) -> impl Iterator<Item = &ProcessStep> {
        self.all_processes()
            .filter(move |p| p.controller_id == controller_id)
    }

    /// Looks up a process by id.
    pub fn get_process(&self, process_id: NodeId) -> Option<&ProcessStep> {
        self.all_processes().find(|p| p.process_id == process_id)
    }

    /// Number of stored processes.
    pub fn process_count(&self) -> usize {
        self.process_count
    }

    /// Moves every operational process (`Running`, `Starting`, `Pausing`) to
    /// `Emergency` and returns how many were affected.
    ///
    /// `last_update` is only advanced when at least one process changed.
    pub fn emergency_stop_all(&mut self, timestamp: u64) -> usize {
        let mut stopped = 0;
        for process in self.processes.iter_mut().flatten() {
            if process.state.is_operational() {
                process.update_state(
                    ProcessState::Emergency,
                    ControlAction::EmergencyStop,
                    timestamp,
                );
                stopped += 1;
            }
        }
        if stopped > 0 {
            self.last_update = CompactTimestamp::new(timestamp);
        }
        stopped
    }

    /// Mutable lookup of a process by id.
    fn find_process_mut(&mut self, process_id: NodeId) -> Option<&mut ProcessStep> {
        self.processes
            .iter_mut()
            .flatten()
            .find(|p| p.process_id == process_id)
    }

    /// Inserts or reconciles a record.
    ///
    /// Order of preference: update the existing record with the same id (only
    /// if the incoming one is strictly newer), otherwise take the first free
    /// slot, otherwise try to evict a stopped process.
    fn add_process_step(&mut self, process_step: ProcessStep) -> CRDTResult<()> {
        // Look for the id across the whole table first, so a free slot that
        // precedes the existing record can never produce a duplicate entry.
        if let Some(existing) = self.find_process_mut(process_step.process_id) {
            if process_step.should_override(existing) {
                *existing = process_step;
            }
            return Ok(());
        }
        if let Some(slot) = self.processes.iter_mut().find(|slot| slot.is_none()) {
            *slot = Some(process_step);
            self.process_count += 1;
            return Ok(());
        }
        self.make_space_for_process(process_step)
    }

    /// Replaces the oldest `Stopped` process with `new_process`.
    ///
    /// The process count is unchanged because one record is swapped for
    /// another.
    ///
    /// # Errors
    /// Returns `CRDTError::BufferOverflow` when no stopped process exists.
    fn make_space_for_process(&mut self, new_process: ProcessStep) -> CRDTResult<()> {
        // `min_by_key` returns the first minimum, matching the earliest-slot
        // preference of a strict `<` scan, but needs no sentinel value, so a
        // stopped process stamped `u64::MAX` is still a valid candidate.
        let victim = self
            .processes
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| slot.as_ref().map(|process| (idx, process)))
            .filter(|(_, process)| process.state == ProcessState::Stopped)
            .min_by_key(|(_, process)| process.timestamp.as_u64())
            .map(|(idx, _)| idx);
        match victim {
            Some(idx) => {
                self.processes[idx] = Some(new_process);
                Ok(())
            }
            None => Err(CRDTError::BufferOverflow),
        }
    }

    /// Checks that every stored process id and controller id is below
    /// `C::MAX_NODES`.
    ///
    /// # Errors
    /// Returns `CRDTError::InvalidNodeId` on the first id that is out of range.
    pub fn validate_control(&self) -> CRDTResult<()> {
        for process in self.all_processes() {
            let ids = [process.process_id, process.controller_id];
            if ids.iter().any(|&id| id as usize >= C::MAX_NODES) {
                return Err(CRDTError::InvalidNodeId);
            }
        }
        Ok(())
    }
}
impl<C: MemoryConfig> CRDT<C> for ProcessControl<C> {
    type Error = CRDTError;

    /// Folds every process of `other` into `self` and keeps the larger of the
    /// two `last_update` timestamps.
    ///
    /// Stops at the first record that cannot be stored; records merged before
    /// that point stay merged.
    fn merge(&mut self, other: &Self) -> CRDTResult<()> {
        other
            .all_processes()
            .try_for_each(|step| self.add_process_step(*step))?;
        if other.last_update > self.last_update {
            self.last_update = other.last_update;
        }
        Ok(())
    }

    /// Two instances are equal when they hold the same number of processes
    /// and every record in `self` has an identical record in `other`.
    ///
    /// `local_controller_id` and `last_update` are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.process_count == other.process_count
            && self
                .all_processes()
                .all(|mine| other.all_processes().any(|theirs| mine == theirs))
    }

    /// Size of the structure in bytes; storage is inline, so this is constant.
    fn size_bytes(&self) -> usize {
        core::mem::size_of_val(self)
    }

    /// Delegates to `validate_control`.
    fn validate(&self) -> CRDTResult<()> {
        self.validate_control()
    }

    /// XOR-combines the local controller id, each process's id, truncated
    /// timestamp and state, and the process count.
    // NOTE(review): the hash is seeded with `local_controller_id`, which `eq`
    // ignores, so instances that compare equal can hash differently; confirm
    // this is intended.
    fn state_hash(&self) -> u32 {
        let seed = self.local_controller_id as u32;
        let folded = self.all_processes().fold(seed, |acc, step| {
            acc ^ (step.process_id as u32)
                ^ (step.timestamp.as_u64() as u32)
                ^ (step.state as u32)
        });
        folded ^ self.process_count as u32
    }

    /// Always `true`.
    fn can_merge(&self, _other: &Self) -> bool {
        true
    }
}
impl<C: MemoryConfig> BoundedCRDT<C> for ProcessControl<C> {
    /// The structure stores everything inline, so its size is the bound.
    const MAX_SIZE_BYTES: usize = core::mem::size_of::<Self>();
    /// Number of process slots.
    const MAX_ELEMENTS: usize = 64;

    /// Bytes occupied by this instance (constant, independent of contents).
    fn memory_usage(&self) -> usize {
        core::mem::size_of_val(self)
    }

    /// Number of stored processes.
    fn element_count(&self) -> usize {
        self.process_count
    }

    /// Nothing is reclaimed; always reports zero.
    fn compact(&mut self) -> CRDTResult<usize> {
        Ok(0)
    }

    /// `true` while fewer than `MAX_ELEMENTS` processes are stored.
    fn can_add_element(&self) -> bool {
        Self::MAX_ELEMENTS > self.process_count
    }
}
impl<C: MemoryConfig> RealTimeCRDT<C> for ProcessControl<C> {
const MAX_MERGE_CYCLES: u32 = 200; const MAX_VALIDATE_CYCLES: u32 = 100;
const MAX_SERIALIZE_CYCLES: u32 = 150;
fn merge_bounded(&mut self, other: &Self) -> CRDTResult<()> {
self.merge(other)
}
fn validate_bounded(&self) -> CRDTResult<()> {
self.validate()
}
fn remaining_budget(&self) -> Option<u32> {
None
}
fn set_budget(&mut self, _cycles: u32) {
}
}
// Unit tests for the process-control types, using `DefaultConfig` as the
// memory configuration.
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::DefaultConfig;
// Checks the boolean predicates on `ProcessState`.
#[test]
fn test_process_state_properties() {
assert!(ProcessState::Running.is_operational());
assert!(ProcessState::Starting.is_operational());
assert!(!ProcessState::Stopped.is_operational());
assert!(ProcessState::Error.requires_attention());
assert!(ProcessState::Emergency.requires_attention());
assert!(!ProcessState::Running.requires_attention());
assert!(ProcessState::Stopped.can_start());
assert!(ProcessState::Paused.can_start());
assert!(!ProcessState::Running.can_start());
assert!(ProcessState::Running.can_stop());
assert!(ProcessState::Error.can_stop());
assert!(!ProcessState::Stopped.can_stop());
}
// Checks the boolean predicates on `ControlAction`.
#[test]
fn test_control_action_properties() {
assert!(ControlAction::EmergencyStop.is_emergency());
assert!(!ControlAction::Start.is_emergency());
assert!(ControlAction::EmergencyStop.requires_privileges());
assert!(ControlAction::Reset.requires_privileges());
assert!(!ControlAction::Start.requires_privileges());
}
// A new step carries the given ids and starts stopped with zero progress.
#[test]
fn test_process_step_creation() {
let step = ProcessStep::new(42, 1, 1, 1000);
assert_eq!(step.process_id, 42);
assert_eq!(step.step_number, 1);
assert_eq!(step.controller_id, 1);
assert_eq!(step.state, ProcessState::Stopped);
assert_eq!(step.progress, 0);
}
// State and value updates are stored; the setpoint helpers use the new values.
#[test]
fn test_process_step_updates() {
let mut step = ProcessStep::new(42, 1, 1, 1000);
step.update_state(ProcessState::Running, ControlAction::Start, 1001);
assert_eq!(step.state, ProcessState::Running);
assert_eq!(step.last_action, ControlAction::Start);
step.update_values(1000, 950, 75, 1002);
assert_eq!(step.setpoint, 1000);
assert_eq!(step.current_value, 950);
assert_eq!(step.progress, 75);
// The value is 50 below the setpoint: outside a tolerance of 10, inside 100.
assert!(!step.is_at_setpoint(10)); assert!(step.is_at_setpoint(100)); assert_eq!(step.error_from_setpoint(), -50);
}
// A new controller is empty and remembers its controller id.
#[test]
fn test_process_control_creation() {
let control = ProcessControl::<DefaultConfig>::new(1);
assert_eq!(control.process_count(), 0);
assert_eq!(control.local_controller_id, 1);
}
// Register -> Start -> Running -> value update, checking the stored record.
#[test]
fn test_process_registration_and_control() {
let mut control = ProcessControl::<DefaultConfig>::new(1);
control.register_process(42, 1, 1000).unwrap();
assert_eq!(control.process_count(), 1);
control
.apply_control_action(42, ControlAction::Start, 1001)
.unwrap();
let process = control.get_process(42).unwrap();
// `Start` moves the process to the transitional `Starting` state.
assert_eq!(process.state, ProcessState::Starting);
control
.update_process_state(42, ProcessState::Running, 1002)
.unwrap();
control
.update_process_values(42, 1000, 950, 75, 1003)
.unwrap();
let process = control.get_process(42).unwrap();
assert_eq!(process.state, ProcessState::Running);
assert_eq!(process.setpoint, 1000);
assert_eq!(process.current_value, 950);
assert_eq!(process.progress, 75);
}
// Three processes end up Running, Error and Stopped; each query counts them.
#[test]
fn test_process_control_queries() {
let mut control = ProcessControl::<DefaultConfig>::new(1);
control.register_process(1, 1, 1000).unwrap();
control.register_process(2, 1, 1001).unwrap();
control.register_process(3, 1, 1002).unwrap();
control
.apply_control_action(1, ControlAction::Start, 1003)
.unwrap();
control
.update_process_state(1, ProcessState::Running, 1004)
.unwrap();
control
.apply_control_action(2, ControlAction::Start, 1005)
.unwrap();
control
.update_process_state(2, ProcessState::Error, 1006)
.unwrap();
assert_eq!(control.running_processes().count(), 1);
assert_eq!(control.processes_requiring_attention().count(), 1);
assert_eq!(control.processes_by_controller(1).count(), 3);
assert_eq!(control.processes_by_state(ProcessState::Stopped).count(), 1);
}
// Two running processes are both moved to `Emergency` by `emergency_stop_all`.
#[test]
fn test_emergency_stop() {
let mut control = ProcessControl::<DefaultConfig>::new(1);
control.register_process(1, 1, 1000).unwrap();
control.register_process(2, 1, 1001).unwrap();
control
.apply_control_action(1, ControlAction::Start, 1002)
.unwrap();
control
.update_process_state(1, ProcessState::Running, 1003)
.unwrap();
control
.apply_control_action(2, ControlAction::Start, 1004)
.unwrap();
control
.update_process_state(2, ProcessState::Running, 1005)
.unwrap();
assert_eq!(control.running_processes().count(), 2);
let stopped = control.emergency_stop_all(1006);
assert_eq!(stopped, 2);
assert_eq!(
control.processes_by_state(ProcessState::Emergency).count(),
2
);
}
// `Stop` is rejected while stopped, accepted while running; `Start` is
// rejected once the process is in `Stopping`.
#[test]
fn test_invalid_control_actions() {
let mut control = ProcessControl::<DefaultConfig>::new(1);
control.register_process(42, 1, 1000).unwrap();
let result = control.apply_control_action(42, ControlAction::Stop, 1001);
assert!(result.is_err());
control
.apply_control_action(42, ControlAction::Start, 1002)
.unwrap();
control
.update_process_state(42, ProcessState::Running, 1003)
.unwrap();
let result = control.apply_control_action(42, ControlAction::Stop, 1004);
assert!(result.is_ok());
let result = control.apply_control_action(42, ControlAction::Start, 1005);
assert!(result.is_err());
}
// Merging two controllers with disjoint processes yields both processes.
#[test]
fn test_process_control_merge() {
let mut control1 = ProcessControl::<DefaultConfig>::new(1);
let mut control2 = ProcessControl::<DefaultConfig>::new(2);
control1.register_process(1, 1, 1000).unwrap();
control2.register_process(2, 1, 1001).unwrap();
control1.merge(&control2).unwrap();
assert_eq!(control1.process_count(), 2);
assert!(control1.get_process(1).is_some());
assert!(control1.get_process(2).is_some());
}
// `BoundedCRDT` accessors reflect the number of registered processes.
#[test]
fn test_bounded_crdt_implementation() {
let mut control = ProcessControl::<DefaultConfig>::new(1);
assert_eq!(control.element_count(), 0);
assert!(control.can_add_element());
control.register_process(42, 1, 1000).unwrap();
assert_eq!(control.element_count(), 1);
assert!(control.memory_usage() > 0);
}
// The bounded merge and validate entry points succeed on empty controllers.
#[test]
fn test_real_time_crdt_implementation() {
let mut control1 = ProcessControl::<DefaultConfig>::new(1);
let control2 = ProcessControl::<DefaultConfig>::new(2);
assert!(control1.merge_bounded(&control2).is_ok());
assert!(control1.validate_bounded().is_ok());
}
}