use std::net::SocketAddr;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use tokio::sync::mpsc;
use crate::apdu::encoding::ApduEncoder;
use crate::object::property::{BACnetValue, PropertyId};
use crate::object::types::ObjectId;
#[derive(Debug, Clone)]
pub struct CovSubscription {
pub subscriber_address: SocketAddr,
pub subscriber_process_id: u32,
pub monitored_object: ObjectId,
pub confirmed_notifications: bool,
pub lifetime: Option<Duration>,
pub cov_increment: Option<f32>,
pub created_at: Instant,
pub last_notification: Option<Instant>,
}
impl CovSubscription {
pub fn new(
subscriber_address: SocketAddr,
subscriber_process_id: u32,
monitored_object: ObjectId,
confirmed: bool,
lifetime: Option<Duration>,
) -> Self {
Self {
subscriber_address,
subscriber_process_id,
monitored_object,
confirmed_notifications: confirmed,
lifetime,
cov_increment: None,
created_at: Instant::now(),
last_notification: None,
}
}
pub fn is_expired(&self) -> bool {
if let Some(lifetime) = self.lifetime {
self.created_at.elapsed() > lifetime
} else {
false
}
}
pub fn remaining_lifetime(&self) -> Option<u32> {
self.lifetime.map(|lifetime| {
let elapsed = self.created_at.elapsed();
if elapsed > lifetime {
0
} else {
(lifetime - elapsed).as_secs() as u32
}
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SubscriptionKey {
subscriber_address: SocketAddr,
subscriber_process_id: u32,
monitored_object: ObjectId,
}
#[derive(Debug, Clone)]
pub struct CovNotification {
pub destination: SocketAddr,
pub subscriber_process_id: u32,
pub initiating_device: ObjectId,
pub monitored_object: ObjectId,
pub time_remaining: u32,
pub list_of_values: Vec<(PropertyId, BACnetValue)>,
pub confirmed: bool,
}
impl CovNotification {
fn encode_service_data(&self) -> Vec<u8> {
let mut encoder = ApduEncoder::new();
encoder.encode_context_unsigned(0, self.subscriber_process_id);
encoder.encode_context_object_identifier(1, self.initiating_device);
encoder.encode_context_object_identifier(2, self.monitored_object);
encoder.encode_context_unsigned(3, self.time_remaining);
encoder.encode_opening_tag(4);
for (property_id, value) in &self.list_of_values {
encoder.encode_context_enumerated(0, *property_id as u32);
encoder.encode_opening_tag(2);
encoder.encode_value(value);
encoder.encode_closing_tag(2);
}
encoder.encode_closing_tag(4);
encoder.into_bytes()
}
pub fn encode_unconfirmed(&self) -> Vec<u8> {
self.encode_service_data()
}
pub fn encode_confirmed(&self, invoke_id: u8) -> Vec<u8> {
let service_data = self.encode_service_data();
let mut apdu = Vec::with_capacity(4 + service_data.len());
apdu.push(0x00); apdu.push(0x05); apdu.push(invoke_id);
apdu.push(crate::apdu::types::ConfirmedService::ConfirmedCovNotification as u8);
apdu.extend_from_slice(&service_data);
apdu
}
}
pub struct CovManager {
subscriptions: DashMap<SubscriptionKey, CovSubscription>,
notification_tx: mpsc::Sender<CovNotification>,
max_subscriptions: usize,
device_instance: u32,
}
impl CovManager {
pub fn new(
device_instance: u32,
max_subscriptions: usize,
) -> (Self, mpsc::Receiver<CovNotification>) {
let (tx, rx) = mpsc::channel(1000);
(
Self {
subscriptions: DashMap::new(),
notification_tx: tx,
max_subscriptions,
device_instance,
},
rx,
)
}
pub fn subscribe(&self, subscription: CovSubscription) -> Result<(), CovError> {
if self.subscriptions.len() >= self.max_subscriptions {
self.cleanup_expired();
if self.subscriptions.len() >= self.max_subscriptions {
return Err(CovError::MaxSubscriptionsReached);
}
}
let key = SubscriptionKey {
subscriber_address: subscription.subscriber_address,
subscriber_process_id: subscription.subscriber_process_id,
monitored_object: subscription.monitored_object,
};
self.subscriptions.insert(key, subscription);
Ok(())
}
pub fn unsubscribe(
&self,
subscriber_address: SocketAddr,
subscriber_process_id: u32,
monitored_object: ObjectId,
) -> bool {
let key = SubscriptionKey {
subscriber_address,
subscriber_process_id,
monitored_object,
};
self.subscriptions.remove(&key).is_some()
}
pub fn subscription_count(&self) -> usize {
self.subscriptions.len()
}
pub fn subscriptions_for_object(&self, object_id: ObjectId) -> Vec<CovSubscription> {
self.subscriptions
.iter()
.filter(|entry| entry.monitored_object == object_id)
.map(|entry| entry.clone())
.collect()
}
pub async fn notify_change(&self, object_id: ObjectId, values: Vec<(PropertyId, BACnetValue)>) {
let device_id = ObjectId::device(self.device_instance);
for entry in self.subscriptions.iter() {
let subscription = entry.value();
if subscription.monitored_object != object_id || subscription.is_expired() {
continue;
}
let notification = CovNotification {
destination: subscription.subscriber_address,
subscriber_process_id: subscription.subscriber_process_id,
initiating_device: device_id,
monitored_object: object_id,
time_remaining: subscription.remaining_lifetime().unwrap_or(0),
list_of_values: values.clone(),
confirmed: subscription.confirmed_notifications,
};
let _ = self.notification_tx.send(notification).await;
}
}
pub fn cleanup_expired(&self) {
self.subscriptions.retain(|_, sub| !sub.is_expired());
}
pub fn list_subscriptions(&self) -> Vec<CovSubscription> {
self.subscriptions
.iter()
.map(|entry| entry.clone())
.collect()
}
}
#[derive(Debug, thiserror::Error)]
pub enum CovError {
#[error("Maximum subscriptions reached")]
MaxSubscriptionsReached,
#[error("Subscription not found")]
SubscriptionNotFound,
#[error("Object does not support COV")]
NotCovProperty,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_subscription_expiry() {
let sub = CovSubscription::new(
"127.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
Some(Duration::from_secs(0)), );
std::thread::sleep(Duration::from_millis(10));
assert!(sub.is_expired());
}
#[test]
fn test_subscription_no_expiry() {
let sub = CovSubscription::new(
"127.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
None, );
assert!(!sub.is_expired());
}
#[tokio::test]
async fn test_cov_manager() {
let (manager, _rx) = CovManager::new(1234, 100);
let sub = CovSubscription::new(
"127.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
Some(Duration::from_secs(300)),
);
manager.subscribe(sub).unwrap();
assert_eq!(manager.subscription_count(), 1);
manager.unsubscribe(
"127.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
);
assert_eq!(manager.subscription_count(), 0);
}
#[test]
fn test_subscription_remaining_lifetime() {
let sub = CovSubscription::new(
"127.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
Some(Duration::from_secs(300)),
);
let remaining = sub.remaining_lifetime().unwrap();
assert!(remaining > 0 && remaining <= 300);
let sub_inf = CovSubscription::new(
"127.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
None,
);
assert!(sub_inf.remaining_lifetime().is_none());
}
#[test]
fn test_subscription_cov_increment() {
let mut sub = CovSubscription::new(
"127.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
None,
);
assert!(sub.cov_increment.is_none());
sub.cov_increment = Some(1.5);
assert_eq!(sub.cov_increment, Some(1.5));
}
#[test]
fn test_encode_unconfirmed_notification() {
let notification = CovNotification {
destination: "10.0.0.1:47808".parse().unwrap(),
subscriber_process_id: 42,
initiating_device: ObjectId::device(1234),
monitored_object: ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
time_remaining: 300,
list_of_values: vec![
(PropertyId::PresentValue, BACnetValue::Real(72.5)),
(
PropertyId::StatusFlags,
BACnetValue::BitString(vec![false, false, false, false]),
),
],
confirmed: false,
};
let data = notification.encode_unconfirmed();
assert!(!data.is_empty());
assert_eq!(data[0] & 0x0F, 0x09); }
#[test]
fn test_encode_confirmed_notification() {
let notification = CovNotification {
destination: "10.0.0.1:47808".parse().unwrap(),
subscriber_process_id: 1,
initiating_device: ObjectId::device(1234),
monitored_object: ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
time_remaining: 0,
list_of_values: vec![(PropertyId::PresentValue, BACnetValue::Real(25.0))],
confirmed: true,
};
let apdu = notification.encode_confirmed(7);
assert_eq!(apdu[0], 0x00); assert_eq!(apdu[1], 0x05); assert_eq!(apdu[2], 7); assert_eq!(
apdu[3],
crate::apdu::types::ConfirmedService::ConfirmedCovNotification as u8
);
assert!(apdu.len() > 4);
}
#[tokio::test]
async fn test_cov_manager_max_subscriptions() {
let (manager, _rx) = CovManager::new(1234, 2);
let sub1 = CovSubscription::new(
"10.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
None,
);
let sub2 = CovSubscription::new(
"10.0.0.2:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
None,
);
let sub3 = CovSubscription::new(
"10.0.0.3:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
None,
);
manager.subscribe(sub1).unwrap();
manager.subscribe(sub2).unwrap();
assert!(manager.subscribe(sub3).is_err());
}
#[tokio::test]
async fn test_cov_manager_cleanup_expired() {
let (manager, _rx) = CovManager::new(1234, 100);
let sub = CovSubscription::new(
"127.0.0.1:47808".parse().unwrap(),
1,
ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1),
false,
Some(Duration::from_secs(0)), );
manager.subscribe(sub).unwrap();
assert_eq!(manager.subscription_count(), 1);
std::thread::sleep(Duration::from_millis(10));
manager.cleanup_expired();
assert_eq!(manager.subscription_count(), 0);
}
#[tokio::test]
async fn test_cov_manager_subscriptions_for_object() {
let (manager, _rx) = CovManager::new(1234, 100);
let obj1 = ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1);
let obj2 = ObjectId::new(crate::object::types::ObjectType::AnalogInput, 2);
manager
.subscribe(CovSubscription::new(
"10.0.0.1:47808".parse().unwrap(),
1,
obj1,
false,
None,
))
.unwrap();
manager
.subscribe(CovSubscription::new(
"10.0.0.2:47808".parse().unwrap(),
1,
obj1,
true,
None,
))
.unwrap();
manager
.subscribe(CovSubscription::new(
"10.0.0.3:47808".parse().unwrap(),
1,
obj2,
false,
None,
))
.unwrap();
let subs = manager.subscriptions_for_object(obj1);
assert_eq!(subs.len(), 2);
let subs2 = manager.subscriptions_for_object(obj2);
assert_eq!(subs2.len(), 1);
}
#[tokio::test]
async fn test_cov_notify_change() {
let (manager, mut rx) = CovManager::new(1234, 100);
let obj = ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1);
manager
.subscribe(CovSubscription::new(
"10.0.0.1:47808".parse().unwrap(),
1,
obj,
false,
None,
))
.unwrap();
manager
.subscribe(CovSubscription::new(
"10.0.0.2:47808".parse().unwrap(),
2,
obj,
true,
None,
))
.unwrap();
let values = vec![(PropertyId::PresentValue, BACnetValue::Real(42.0))];
manager.notify_change(obj, values).await;
let n1 = rx.try_recv().unwrap();
let n2 = rx.try_recv().unwrap();
let (confirmed_count, unconfirmed_count) =
[n1, n2].iter().fold(
(0, 0),
|(c, u), n| {
if n.confirmed {
(c + 1, u)
} else {
(c, u + 1)
}
},
);
assert_eq!(confirmed_count, 1);
assert_eq!(unconfirmed_count, 1);
}
#[tokio::test]
async fn test_cov_manager_list_subscriptions() {
let (manager, _rx) = CovManager::new(1234, 100);
let obj = ObjectId::new(crate::object::types::ObjectType::AnalogInput, 1);
manager
.subscribe(CovSubscription::new(
"10.0.0.1:47808".parse().unwrap(),
1,
obj,
false,
None,
))
.unwrap();
manager
.subscribe(CovSubscription::new(
"10.0.0.2:47808".parse().unwrap(),
2,
obj,
true,
Some(Duration::from_secs(600)),
))
.unwrap();
let list = manager.list_subscriptions();
assert_eq!(list.len(), 2);
}
}