use crate::clock::CompactTimestamp;
use crate::error::{CRDTError, CRDTResult};
use crate::memory::{MemoryConfig, NodeId};
use crate::traits::{BoundedCRDT, CRDT, RealTimeCRDT};
/// Kind of sensor that produced a reading.
///
/// `#[repr(u8)]` together with the explicit discriminants gives every variant
/// a fixed one-byte numeric value (1 through 12).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum SensorType {
    Temperature = 1,
    Humidity = 2,
    Pressure = 3,
    Light = 4,
    Motion = 5,
    AirQuality = 6,
    Sound = 7,
    Proximity = 8,
    Accelerometer = 9,
    GPS = 10,
    Analog = 11,
    Digital = 12,
}

impl SensorType {
    /// Returns `true` for the sensor kinds this module classifies as
    /// continuous: temperature, humidity, pressure, light, air quality,
    /// sound and analog.
    ///
    /// `Accelerometer` and `GPS` are neither continuous nor event-based.
    pub fn is_continuous(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Temperature
            | Self::Humidity
            | Self::Pressure
            | Self::Light
            | Self::AirQuality
            | Self::Sound
            | Self::Analog => true,
            Self::Motion
            | Self::Proximity
            | Self::Accelerometer
            | Self::GPS
            | Self::Digital => false,
        }
    }

    /// Returns `true` for the sensor kinds this module classifies as
    /// event-based: motion, proximity and digital.
    pub fn is_event_based(&self) -> bool {
        match self {
            Self::Motion | Self::Proximity | Self::Digital => true,
            Self::Temperature
            | Self::Humidity
            | Self::Pressure
            | Self::Light
            | Self::AirQuality
            | Self::Sound
            | Self::Accelerometer
            | Self::GPS
            | Self::Analog => false,
        }
    }

    /// Returns the typical reporting interval for this sensor kind, in
    /// milliseconds (as the method name states).
    pub fn typical_interval_ms(&self) -> u32 {
        // Grouped by interval, shortest first.
        match self {
            Self::Accelerometer => 100,
            Self::Proximity => 500,
            Self::Motion | Self::Digital => 1_000,
            Self::Sound | Self::Analog => 5_000,
            Self::Light => 10_000,
            Self::Temperature | Self::Humidity | Self::GPS => 30_000,
            Self::Pressure | Self::AirQuality => 60_000,
        }
    }
}
/// Quality grade attached to a sensor reading.
///
/// Variants are declared from worst to best, so the derived ordering gives
/// `Poor < Fair < Good < Excellent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum ReadingQuality {
    Poor = 1,
    Fair = 2,
    Good = 3,
    Excellent = 4,
}

impl ReadingQuality {
    /// Returns the weight applied to a reading of this quality, from `0.25`
    /// for `Poor` up to `1.0` for `Excellent` in steps of `0.25`.
    pub fn confidence_weight(&self) -> f32 {
        match *self {
            Self::Excellent => 1.0,
            Self::Good => 0.75,
            Self::Fair => 0.5,
            Self::Poor => 0.25,
        }
    }

    /// Returns `true` when the quality is `Good` or `Excellent`.
    pub fn is_acceptable(&self) -> bool {
        // Relies on the derived ordering: only `Good` and `Excellent` are
        // greater than or equal to `Good`.
        *self >= Self::Good
    }
}
/// A single measurement reported by one sensor.
///
/// The type is `Copy`, so readings are passed and stored by value.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SensorReading {
/// Identifier of the sensor that produced the reading.
pub sensor_id: NodeId,
/// Kind of sensor that produced the reading.
pub sensor_type: SensorType,
/// Raw integer measurement. Units and scaling are not defined in this
/// file (NOTE(review): the tests use values such as 2350 for temperature,
/// which looks like a fixed-point encoding; confirm with callers).
pub value: i32,
/// Quality grade of the measurement.
pub quality: ReadingQuality,
/// Time of the measurement, built from a `u64` via `CompactTimestamp::new`.
pub timestamp: CompactTimestamp,
/// Identifier of the location the reading belongs to.
pub location_id: u16,
/// Battery level of the sensor. `SensorReading::new` sets this to 255, and
/// `SensorNetwork::low_battery_sensors` never reports readings at 255.
pub battery_level: u8,
/// Signal strength of the sensor. `SensorReading::new` sets this to 255,
/// and `SensorNetwork::weak_signal_sensors` never reports readings at 255.
pub signal_strength: u8,
}
impl SensorReading {
    /// Builds a reading without battery or signal information.
    ///
    /// `battery_level` and `signal_strength` are both set to 255, the value
    /// that `SensorNetwork::low_battery_sensors` and
    /// `SensorNetwork::weak_signal_sensors` never report.
    pub fn new(
        sensor_id: NodeId,
        sensor_type: SensorType,
        value: i32,
        quality: ReadingQuality,
        timestamp: u64,
        location_id: u16,
    ) -> Self {
        Self::with_vitals(
            sensor_id,
            sensor_type,
            value,
            quality,
            timestamp,
            location_id,
            255,
            255,
        )
    }

    /// Builds a reading that also carries the sensor's battery level and
    /// signal strength.
    ///
    /// `timestamp` is converted with `CompactTimestamp::new`.
    // The parameter list mirrors the struct fields one-to-one; changing it
    // would break existing callers.
    #[allow(clippy::too_many_arguments)]
    pub fn with_vitals(
        sensor_id: NodeId,
        sensor_type: SensorType,
        value: i32,
        quality: ReadingQuality,
        timestamp: u64,
        location_id: u16,
        battery_level: u8,
        signal_strength: u8,
    ) -> Self {
        Self {
            sensor_id,
            sensor_type,
            value,
            quality,
            timestamp: CompactTimestamp::new(timestamp),
            location_id,
            battery_level,
            signal_strength,
        }
    }

    /// Decides whether `self` should replace `other`.
    ///
    /// * Same sensor: `self` wins only when its timestamp is strictly newer.
    /// * Different sensors: the higher quality wins; on equal quality the
    ///   strictly newer timestamp wins.
    ///
    /// Ties (same timestamp, and same quality where relevant) return `false`.
    pub fn should_override(&self, other: &SensorReading) -> bool {
        let is_newer = self.timestamp > other.timestamp;
        if self.sensor_id == other.sensor_id {
            return is_newer;
        }
        match self.quality.cmp(&other.quality) {
            core::cmp::Ordering::Greater => true,
            core::cmp::Ordering::Equal => is_newer,
            core::cmp::Ordering::Less => false,
        }
    }

    /// Returns `true` when `current_time` is strictly later than the
    /// reading's timestamp plus `max_age_ms`.
    ///
    /// The deadline is computed with saturating arithmetic, so a very large
    /// `max_age_ms` (for example `u64::MAX`) means "never stale". Unchecked
    /// addition would panic in debug builds and wrap in release builds,
    /// which would report a fresh reading as stale.
    pub fn is_stale(&self, current_time: u64, max_age_ms: u64) -> bool {
        let deadline = self.timestamp.as_u64().saturating_add(max_age_ms);
        current_time > deadline
    }

    /// Returns `value` converted to `f32` and multiplied by the quality's
    /// confidence weight.
    pub fn weighted_value(&self) -> f32 {
        self.quality.confidence_weight() * self.value as f32
    }
}
/// Fixed-capacity collection of sensor readings held by one gateway.
///
/// All storage is inline: up to 128 readings live in a fixed array, so the
/// size of the value does not change as readings are added.
#[derive(Debug, Clone)]
pub struct SensorNetwork<C: MemoryConfig> {
// `readings`: storage slots, `None` marks an empty slot.
// `reading_count`: number of occupied slots, maintained by the insert and
// cleanup methods rather than recomputed from `readings`.
readings: [Option<SensorReading>; 128], reading_count: usize,
// Identifier of the gateway that owns this replica.
local_gateway_id: NodeId,
// Timestamp recorded by `add_reading*` and advanced by `merge`.
last_update: CompactTimestamp,
// Ties the network to its memory configuration without storing a `C`.
_phantom: core::marker::PhantomData<C>,
}
impl<C: MemoryConfig> SensorNetwork<C> {
    /// Creates an empty network owned by the gateway `gateway_id`.
    pub fn new(gateway_id: NodeId) -> Self {
        Self {
            readings: [const { None }; 128],
            reading_count: 0,
            local_gateway_id: gateway_id,
            last_update: CompactTimestamp::new(0),
            _phantom: core::marker::PhantomData,
        }
    }

    /// Records a reading without battery or signal information.
    ///
    /// # Errors
    ///
    /// Returns `CRDTError::BufferOverflow` when the store is full and no
    /// low-quality reading can be evicted to make room.
    pub fn add_reading(
        &mut self,
        sensor_id: NodeId,
        sensor_type: SensorType,
        value: i32,
        quality: ReadingQuality,
        timestamp: u64,
        location_id: u16,
    ) -> CRDTResult<()> {
        self.add_sensor_reading(SensorReading::new(
            sensor_id,
            sensor_type,
            value,
            quality,
            timestamp,
            location_id,
        ))?;
        self.advance_last_update(timestamp);
        Ok(())
    }

    /// Records a reading together with the sensor's battery level and
    /// signal strength.
    ///
    /// # Errors
    ///
    /// Returns `CRDTError::BufferOverflow` when the store is full and no
    /// low-quality reading can be evicted to make room.
    // The parameter list mirrors `SensorReading::with_vitals`; changing it
    // would break existing callers.
    #[allow(clippy::too_many_arguments)]
    pub fn add_reading_with_vitals(
        &mut self,
        sensor_id: NodeId,
        sensor_type: SensorType,
        value: i32,
        quality: ReadingQuality,
        timestamp: u64,
        location_id: u16,
        battery_level: u8,
        signal_strength: u8,
    ) -> CRDTResult<()> {
        self.add_sensor_reading(SensorReading::with_vitals(
            sensor_id,
            sensor_type,
            value,
            quality,
            timestamp,
            location_id,
            battery_level,
            signal_strength,
        ))?;
        self.advance_last_update(timestamp);
        Ok(())
    }

    /// Iterates over every stored reading in slot order.
    pub fn all_readings(&self) -> impl Iterator<Item = &SensorReading> {
        self.readings.iter().flatten()
    }

    /// Iterates over the readings produced by sensors of `sensor_type`.
    pub fn readings_by_type(
        &self,
        sensor_type: SensorType,
    ) -> impl Iterator<Item = &SensorReading> {
        self.all_readings()
            .filter(move |r| r.sensor_type == sensor_type)
    }

    /// Iterates over the readings tagged with `location_id`.
    pub fn readings_by_location(&self, location_id: u16) -> impl Iterator<Item = &SensorReading> {
        self.all_readings()
            .filter(move |r| r.location_id == location_id)
    }

    /// Iterates over the readings produced by the sensor `sensor_id`.
    pub fn readings_by_sensor(&self, sensor_id: NodeId) -> impl Iterator<Item = &SensorReading> {
        self.all_readings()
            .filter(move |r| r.sensor_id == sensor_id)
    }

    /// Iterates over the readings whose quality is acceptable
    /// (`Good` or `Excellent`).
    pub fn quality_readings(&self) -> impl Iterator<Item = &SensorReading> {
        self.all_readings().filter(|r| r.quality.is_acceptable())
    }

    /// Returns the reading with the greatest timestamp among those matching
    /// `sensor_type` and `location_id`, or `None` when nothing matches.
    ///
    /// When several readings share the greatest timestamp, the one in the
    /// highest slot is returned.
    pub fn latest_reading(
        &self,
        sensor_type: SensorType,
        location_id: u16,
    ) -> Option<&SensorReading> {
        self.readings_by_type(sensor_type)
            .filter(|r| r.location_id == location_id)
            .max_by_key(|r| r.timestamp.as_u64())
    }

    /// Returns the confidence-weighted average value of the acceptable,
    /// non-stale readings matching `sensor_type` and `location_id`.
    ///
    /// Each reading contributes `value * weight` to the numerator and
    /// `weight` to the denominator. Returns `None` when no reading
    /// qualifies.
    pub fn average_value(
        &self,
        sensor_type: SensorType,
        location_id: u16,
        max_age_ms: u64,
        current_time: u64,
    ) -> Option<f32> {
        let (weighted_sum, weight_sum) = self
            .readings_by_type(sensor_type)
            .filter(|r| r.location_id == location_id)
            .filter(|r| !r.is_stale(current_time, max_age_ms))
            .filter(|r| r.quality.is_acceptable())
            .fold((0.0f32, 0.0f32), |(sum, weights), r| {
                (
                    sum + r.weighted_value(),
                    weights + r.quality.confidence_weight(),
                )
            });

        // Every acceptable reading has a strictly positive weight, so a zero
        // total means that nothing qualified.
        (weight_sum > 0.0).then(|| weighted_sum / weight_sum)
    }

    /// Iterates over the readings whose battery level is below `threshold`.
    ///
    /// Readings at 255 (the value `SensorReading::new` assigns) are never
    /// reported, because no `u8` threshold is greater than 255.
    pub fn low_battery_sensors(&self, threshold: u8) -> impl Iterator<Item = &SensorReading> {
        self.all_readings()
            .filter(move |r| r.battery_level < threshold)
    }

    /// Iterates over the readings whose signal strength is below
    /// `threshold`.
    ///
    /// Readings at 255 (the value `SensorReading::new` assigns) are never
    /// reported, because no `u8` threshold is greater than 255.
    pub fn weak_signal_sensors(&self, threshold: u8) -> impl Iterator<Item = &SensorReading> {
        self.all_readings()
            .filter(move |r| r.signal_strength < threshold)
    }

    /// Returns the number of stored readings.
    pub fn reading_count(&self) -> usize {
        self.reading_count
    }

    /// Removes every reading that is stale at `current_time` for the given
    /// `max_age_ms`, then packs the survivors to the front of the store.
    ///
    /// Returns the number of readings removed.
    pub fn cleanup_stale_readings(&mut self, current_time: u64, max_age_ms: u64) -> usize {
        let mut removed = 0;
        for slot in self.readings.iter_mut() {
            let stale = slot
                .as_ref()
                .is_some_and(|r| r.is_stale(current_time, max_age_ms));
            if stale {
                *slot = None;
                removed += 1;
            }
        }
        self.reading_count -= removed;
        self.compact_readings();
        removed
    }

    /// Moves `last_update` forward to `timestamp` if it is newer.
    ///
    /// `merge` only ever advances `last_update`; doing the same here keeps
    /// the field from moving backwards when readings arrive out of order.
    fn advance_last_update(&mut self, timestamp: u64) {
        let candidate = CompactTimestamp::new(timestamp);
        if candidate > self.last_update {
            self.last_update = candidate;
        }
    }

    /// Stores `reading`, keeping at most one entry per
    /// `(sensor_id, sensor_type)` pair.
    ///
    /// * An existing entry for the pair is replaced only when
    ///   `reading.should_override` says so; otherwise it is kept.
    /// * Otherwise the reading goes into the first free slot.
    /// * With no free slot, `make_space_for_reading` decides.
    fn add_sensor_reading(&mut self, reading: SensorReading) -> CRDTResult<()> {
        // Look for an existing entry across the whole store first, so that
        // correctness does not depend on occupied slots being contiguous.
        let existing = self.readings.iter_mut().flatten().find(|stored| {
            stored.sensor_id == reading.sensor_id && stored.sensor_type == reading.sensor_type
        });
        if let Some(stored) = existing {
            if reading.should_override(stored) {
                *stored = reading;
            }
            return Ok(());
        }

        if let Some(free) = self.readings.iter_mut().find(|slot| slot.is_none()) {
            *free = Some(reading);
            self.reading_count += 1;
            return Ok(());
        }

        self.make_space_for_reading(reading)
    }

    /// Overwrites the oldest reading of quality `Fair` or worse with
    /// `new_reading`.
    ///
    /// On equal timestamps the reading in the lowest slot is chosen. The
    /// reading count is unchanged because one entry replaces another.
    ///
    /// # Errors
    ///
    /// Returns `CRDTError::BufferOverflow` when no stored reading has
    /// quality `Fair` or worse.
    fn make_space_for_reading(&mut self, new_reading: SensorReading) -> CRDTResult<()> {
        // `min_by_key` keeps the first minimum. Unlike a comparison against
        // a `u64::MAX` sentinel, it can also select a reading whose
        // timestamp equals `u64::MAX`.
        let victim = self
            .readings
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| slot.as_ref().map(|r| (idx, r)))
            .filter(|(_, r)| r.quality <= ReadingQuality::Fair)
            .min_by_key(|(_, r)| r.timestamp.as_u64())
            .map(|(idx, _)| idx);

        match victim {
            Some(idx) => {
                self.readings[idx] = Some(new_reading);
                Ok(())
            }
            None => Err(CRDTError::BufferOverflow),
        }
    }

    /// Packs all occupied slots to the front of the store, preserving their
    /// relative order.
    fn compact_readings(&mut self) {
        let mut write_idx = 0;
        for read_idx in 0..self.readings.len() {
            if self.readings[read_idx].is_some() {
                // Every slot in `write_idx..read_idx` is empty here, so the
                // swap moves the reading down and leaves `None` behind.
                if write_idx != read_idx {
                    self.readings.swap(write_idx, read_idx);
                }
                write_idx += 1;
            }
        }
    }

    /// Checks that every stored reading has a sensor id below
    /// `C::MAX_NODES`.
    ///
    /// # Errors
    ///
    /// Returns `CRDTError::InvalidNodeId` for the first reading that
    /// violates the bound.
    pub fn validate_network(&self) -> CRDTResult<()> {
        let out_of_range = self
            .all_readings()
            .any(|r| r.sensor_id as usize >= C::MAX_NODES);
        if out_of_range {
            Err(CRDTError::InvalidNodeId)
        } else {
            Ok(())
        }
    }
}
impl<C: MemoryConfig> CRDT<C> for SensorNetwork<C> {
    type Error = CRDTError;

    /// Folds every reading of `other` into `self` and advances
    /// `last_update` to the later of the two values.
    ///
    /// Stops at the first insertion error; readings merged before the error
    /// stay in `self`.
    fn merge(&mut self, other: &Self) -> CRDTResult<()> {
        other
            .all_readings()
            .try_for_each(|incoming| self.add_sensor_reading(*incoming))?;

        if other.last_update > self.last_update {
            self.last_update = other.last_update;
        }
        Ok(())
    }

    /// Returns `true` when both networks hold the same number of readings
    /// and every reading of `self` has an identical reading in `other`.
    ///
    /// Slot order, `local_gateway_id` and `last_update` are not compared.
    fn eq(&self, other: &Self) -> bool {
        if self.reading_count != other.reading_count {
            return false;
        }
        // Full structural equality already covers the sensor id and the
        // timestamp, so one comparison per candidate is enough.
        self.all_readings()
            .all(|mine| other.all_readings().any(|theirs| mine == theirs))
    }

    /// Returns the in-memory size of the network, which is fixed.
    fn size_bytes(&self) -> usize {
        core::mem::size_of::<Self>()
    }

    /// Delegates to `validate_network`.
    fn validate(&self) -> CRDTResult<()> {
        self.validate_network()
    }

    /// XOR-combines the gateway id, the sensor id, truncated timestamp and
    /// value of every reading, and the reading count.
    ///
    /// The result does not depend on slot order.
    // NOTE(review): `local_gateway_id` is part of the hash but `eq` ignores
    // it, so two replicas that compare equal can hash differently. Confirm
    // whether the trait contract expects that.
    fn state_hash(&self) -> u32 {
        let seed = self.local_gateway_id as u32;
        let mixed = self.all_readings().fold(seed, |acc, r| {
            acc ^ (r.sensor_id as u32) ^ (r.timestamp.as_u64() as u32) ^ (r.value as u32)
        });
        mixed ^ self.reading_count as u32
    }

    /// Any two networks can be merged.
    fn can_merge(&self, _other: &Self) -> bool {
        true
    }
}
impl<C: MemoryConfig> BoundedCRDT<C> for SensorNetwork<C> {
    /// The storage is inline, so the maximum size is the size of the type.
    const MAX_SIZE_BYTES: usize = core::mem::size_of::<Self>();
    /// Number of slots in the fixed reading array.
    const MAX_ELEMENTS: usize = 128;

    /// Returns the in-memory size of this value, which is fixed.
    fn memory_usage(&self) -> usize {
        core::mem::size_of_val(self)
    }

    /// Returns the number of stored readings.
    fn element_count(&self) -> usize {
        self.reading_count
    }

    /// Packs the stored readings to the front of the array.
    ///
    /// Always returns `Ok(0)`.
    fn compact(&mut self) -> CRDTResult<usize> {
        self.compact_readings();
        Ok(0)
    }

    /// Returns `true` while fewer than `MAX_ELEMENTS` readings are stored.
    fn can_add_element(&self) -> bool {
        Self::MAX_ELEMENTS > self.reading_count
    }
}
impl<C: MemoryConfig> RealTimeCRDT<C> for SensorNetwork<C> {
const MAX_MERGE_CYCLES: u32 = 300; const MAX_VALIDATE_CYCLES: u32 = 150;
const MAX_SERIALIZE_CYCLES: u32 = 200;
fn merge_bounded(&mut self, other: &Self) -> CRDTResult<()> {
self.merge(other)
}
fn validate_bounded(&self) -> CRDTResult<()> {
self.validate()
}
fn remaining_budget(&self) -> Option<u32> {
None
}
fn set_budget(&mut self, _cycles: u32) {
}
}
// Unit tests for the sensor types, single readings and the network store.
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::DefaultConfig;
// Classification and interval helpers on `SensorType`.
#[test]
fn test_sensor_type_properties() {
assert!(SensorType::Temperature.is_continuous());
assert!(!SensorType::Temperature.is_event_based());
assert!(SensorType::Motion.is_event_based());
assert!(!SensorType::Motion.is_continuous());
assert!(SensorType::Temperature.typical_interval_ms() > 1000);
}
// Weights grow with quality; only `Good` and better are acceptable.
#[test]
fn test_reading_quality_properties() {
assert!(
ReadingQuality::Excellent.confidence_weight()
> ReadingQuality::Poor.confidence_weight()
);
assert!(ReadingQuality::Good.is_acceptable());
assert!(!ReadingQuality::Poor.is_acceptable());
}
// `SensorReading::new` stores its arguments in the matching fields.
#[test]
fn test_sensor_reading_creation() {
let reading = SensorReading::new(
42,
SensorType::Temperature,
2350, ReadingQuality::Good,
1000,
1,
);
assert_eq!(reading.sensor_id, 42);
assert_eq!(reading.sensor_type, SensorType::Temperature);
assert_eq!(reading.value, 2350);
assert_eq!(reading.quality, ReadingQuality::Good);
assert_eq!(reading.location_id, 1);
}
// Override rules: newer wins for the same sensor, higher quality wins
// across different sensors.
#[test]
fn test_sensor_reading_override() {
let reading1 = SensorReading::new(
42,
SensorType::Temperature,
2300,
ReadingQuality::Good,
1000,
1,
);
let reading2 = SensorReading::new(
42,
SensorType::Temperature,
2350,
ReadingQuality::Good,
1001,
1,
);
let reading3 = SensorReading::new(
43,
SensorType::Temperature,
2400,
ReadingQuality::Excellent,
999,
1,
);
// reading2: same sensor, newer. reading3: other sensor, older but higher
// quality. reading1: same sensor as reading2, older.
assert!(reading2.should_override(&reading1)); assert!(reading3.should_override(&reading1)); assert!(!reading1.should_override(&reading2)); }
// A new network is empty and remembers its gateway id.
#[test]
fn test_sensor_network_creation() {
let network = SensorNetwork::<DefaultConfig>::new(1);
assert_eq!(network.reading_count(), 0);
assert_eq!(network.local_gateway_id, 1);
}
// Both insertion entry points add one reading each for distinct sensors.
#[test]
fn test_sensor_reading_addition() {
let mut network = SensorNetwork::<DefaultConfig>::new(1);
network
.add_reading(
42,
SensorType::Temperature,
2350,
ReadingQuality::Good,
1000,
1,
)
.unwrap();
assert_eq!(network.reading_count(), 1);
network
.add_reading_with_vitals(
43,
SensorType::Humidity,
6500, ReadingQuality::Excellent,
1001,
1,
80, 200, )
.unwrap();
assert_eq!(network.reading_count(), 2);
}
// Filters by type, location, sensor, quality and battery level.
#[test]
fn test_sensor_network_queries() {
let mut network = SensorNetwork::<DefaultConfig>::new(1);
network
.add_reading(
42,
SensorType::Temperature,
2350,
ReadingQuality::Good,
1000,
1,
)
.unwrap();
network
.add_reading(
43,
SensorType::Temperature,
2400,
ReadingQuality::Excellent,
1001,
2,
)
.unwrap();
network
.add_reading(
44,
SensorType::Humidity,
6500,
ReadingQuality::Good,
1002,
1,
)
.unwrap();
network
.add_reading_with_vitals(
45,
SensorType::Motion,
1,
ReadingQuality::Fair,
1003,
1,
20,
100,
)
.unwrap();
assert_eq!(network.readings_by_type(SensorType::Temperature).count(), 2);
assert_eq!(network.readings_by_location(1).count(), 3);
assert_eq!(network.readings_by_sensor(42).count(), 1);
// The `Fair` motion reading is excluded from the quality readings.
assert_eq!(network.quality_readings().count(), 3);
let latest_temp = network.latest_reading(SensorType::Temperature, 2).unwrap();
assert_eq!(latest_temp.sensor_id, 43);
// Only sensor 45 reports a battery level (20); the others carry 255.
assert_eq!(network.low_battery_sensors(50).count(), 1); }
// Weighted average: (2300*0.75 + 2400*1.0 + 2350*0.75) / 2.5 = 2355.
#[test]
fn test_sensor_network_average() {
let mut network = SensorNetwork::<DefaultConfig>::new(1);
network
.add_reading(
42,
SensorType::Temperature,
2300,
ReadingQuality::Good,
1000,
1,
)
.unwrap();
network
.add_reading(
43,
SensorType::Temperature,
2400,
ReadingQuality::Excellent,
1001,
1,
)
.unwrap();
network
.add_reading(
44,
SensorType::Temperature,
2350,
ReadingQuality::Good,
1002,
1,
)
.unwrap();
let avg = network
.average_value(SensorType::Temperature, 1, 10000, 2000)
.unwrap();
assert!((avg - 2355.0).abs() < 1.0);
}
// Merging two networks with disjoint sensors yields the union.
#[test]
fn test_sensor_network_merge() {
let mut network1 = SensorNetwork::<DefaultConfig>::new(1);
let mut network2 = SensorNetwork::<DefaultConfig>::new(2);
network1
.add_reading(
42,
SensorType::Temperature,
2300,
ReadingQuality::Good,
1000,
1,
)
.unwrap();
network2
.add_reading(
43,
SensorType::Humidity,
6500,
ReadingQuality::Excellent,
1001,
1,
)
.unwrap();
network1.merge(&network2).unwrap();
assert_eq!(network1.reading_count(), 2);
assert_eq!(
network1.readings_by_type(SensorType::Temperature).count(),
1
);
assert_eq!(network1.readings_by_type(SensorType::Humidity).count(), 1);
}
// A reading stamped 1000 is stale at time 10000 with a max age of 5000.
#[test]
fn test_stale_reading_cleanup() {
let mut network = SensorNetwork::<DefaultConfig>::new(1);
network
.add_reading(
42,
SensorType::Temperature,
2300,
ReadingQuality::Good,
1000,
1,
)
.unwrap();
assert_eq!(network.reading_count(), 1);
let removed = network.cleanup_stale_readings(10000, 5000); assert_eq!(removed, 1);
assert_eq!(network.reading_count(), 0);
}
// `BoundedCRDT` accessors reflect the stored readings.
#[test]
fn test_bounded_crdt_implementation() {
let mut network = SensorNetwork::<DefaultConfig>::new(1);
assert_eq!(network.element_count(), 0);
assert!(network.can_add_element());
network
.add_reading(
42,
SensorType::Temperature,
2300,
ReadingQuality::Good,
1000,
1,
)
.unwrap();
assert_eq!(network.element_count(), 1);
assert!(network.memory_usage() > 0);
}
// The bounded merge and validate entry points succeed on empty networks.
#[test]
fn test_real_time_crdt_implementation() {
let mut network1 = SensorNetwork::<DefaultConfig>::new(1);
let network2 = SensorNetwork::<DefaultConfig>::new(2);
assert!(network1.merge_bounded(&network2).is_ok());
assert!(network1.validate_bounded().is_ok());
}
}