use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tracing::debug;
use crate::apdu::encoding::ApduDecoder;
use crate::apdu::types::{ConfirmedService, ErrorClass, ErrorCode};
use crate::service::cov::{CovManager, CovSubscription};
use super::handler::{ConfirmedServiceHandler, ServiceContext, ServiceResult};
/// Decoded parameters of a SubscribeCOV service request, as produced by
/// `decode_subscribe_cov`.
#[derive(Debug, Clone)]
pub struct SubscribeCovRequest {
/// Subscriber process identifier, read from context tag 0.
pub subscriber_process_id: u32,
/// Object the subscription refers to, read from context tag 1.
pub monitored_object: crate::object::types::ObjectId,
/// Value of context tag 2, or `None` when that tag is absent. The handler
/// in this file treats `None` as a request to cancel the subscription.
pub issue_confirmed_notifications: Option<bool>,
/// Value of context tag 3, or `None` when that tag is absent. The handler
/// in this file interprets it as seconds and maps zero to "no lifetime".
pub lifetime: Option<u32>,
}
/// Parses the service parameters of a SubscribeCOV request.
///
/// The payload must begin with context tag 0 (subscriber process id) followed
/// by context tag 1 carrying a 4-byte object identifier. Context tag 2
/// (issue-confirmed-notifications) and context tag 3 (lifetime) are consumed
/// only when they are the next tag in the buffer, in that order; otherwise the
/// corresponding field stays `None`. Bytes remaining after these fields are
/// ignored.
///
/// # Errors
///
/// Returns [`SubscribeCovError::InvalidRequest`] when a mandatory tag is
/// missing or has the wrong shape, or when the decoder fails on any field.
fn decode_subscribe_cov(data: &[u8]) -> Result<SubscribeCovRequest, SubscribeCovError> {
    let mut decoder = ApduDecoder::new(data);

    // [0] subscriber process identifier (mandatory).
    let (tag, is_context, len) = decoder
        .decode_tag_info()
        .map_err(|_| SubscribeCovError::InvalidRequest)?;
    if !(is_context && tag == 0) {
        return Err(SubscribeCovError::InvalidRequest);
    }
    let subscriber_process_id = decoder
        .decode_unsigned(len)
        .map_err(|_| SubscribeCovError::InvalidRequest)?;

    // [1] monitored object identifier (mandatory, must be 4 bytes long).
    let (tag, is_context, len) = decoder
        .decode_tag_info()
        .map_err(|_| SubscribeCovError::InvalidRequest)?;
    if !(is_context && tag == 1 && len == 4) {
        return Err(SubscribeCovError::InvalidRequest);
    }
    let monitored_object = decoder
        .decode_object_identifier()
        .map_err(|_| SubscribeCovError::InvalidRequest)?;

    // [2] issue confirmed notifications (optional). The tag byte is only
    // peeked first so that a different tag is left in place for the next step.
    let mut issue_confirmed_notifications = None;
    if !decoder.is_empty() {
        if let Some(next) = decoder.peek() {
            let tag_number = (next >> 4) & 0x0F;
            let context_specific = (next & 0x08) != 0;
            if context_specific && tag_number == 2 {
                let (_, _, len) = decoder
                    .decode_tag_info()
                    .map_err(|_| SubscribeCovError::InvalidRequest)?;
                // A zero-length value is read as `false`; otherwise any
                // non-zero unsigned value counts as `true`.
                let flag = if len == 0 {
                    false
                } else {
                    decoder
                        .decode_unsigned(len)
                        .map_err(|_| SubscribeCovError::InvalidRequest)?
                        != 0
                };
                issue_confirmed_notifications = Some(flag);
            }
        }
    }

    // [3] lifetime (optional).
    let mut lifetime = None;
    if !decoder.is_empty() {
        if let Some(next) = decoder.peek() {
            let tag_number = (next >> 4) & 0x0F;
            let context_specific = (next & 0x08) != 0;
            if context_specific && tag_number == 3 {
                let (_, _, len) = decoder
                    .decode_tag_info()
                    .map_err(|_| SubscribeCovError::InvalidRequest)?;
                let value = decoder
                    .decode_unsigned(len)
                    .map_err(|_| SubscribeCovError::InvalidRequest)?;
                lifetime = Some(value);
            }
        }
    }

    Ok(SubscribeCovRequest {
        subscriber_process_id,
        monitored_object,
        issue_confirmed_notifications,
        lifetime,
    })
}
/// Confirmed-service handler for SubscribeCOV requests.
pub struct SubscribeCovHandler {
/// Manager the handler calls to create and remove subscriptions.
cov_manager: Arc<CovManager>,
/// Subscriber address used when the service context has no source address.
default_addr: SocketAddr,
}
impl SubscribeCovHandler {
    /// Creates a handler that registers subscriptions with `cov_manager`.
    ///
    /// The fallback subscriber address starts out as `0.0.0.0:47808`
    /// (port 0xBAC0); use [`with_default_addr`](Self::with_default_addr) to
    /// change it.
    pub fn new(cov_manager: Arc<CovManager>) -> Self {
        Self {
            cov_manager,
            // Built from its parts instead of parsed from a string: no runtime
            // parsing and no `unwrap()` panic path for a compile-time constant.
            default_addr: SocketAddr::from(([0, 0, 0, 0], 47808)),
        }
    }

    /// Replaces the fallback subscriber address and returns the handler.
    pub fn with_default_addr(mut self, addr: SocketAddr) -> Self {
        self.default_addr = addr;
        self
    }

    /// Returns the request's source address, or the configured fallback when
    /// the context does not carry one.
    fn subscriber_addr(&self, ctx: &ServiceContext) -> SocketAddr {
        ctx.source_address.unwrap_or(self.default_addr)
    }
}
impl ConfirmedServiceHandler for SubscribeCovHandler {
    /// This handler serves `ConfirmedService::SubscribeCov`.
    fn service_choice(&self) -> ConfirmedService {
        ConfirmedService::SubscribeCov
    }

    /// Handles one SubscribeCOV request.
    ///
    /// * An undecodable payload yields `Services / InvalidParameterDataType`.
    /// * A request without the issue-confirmed-notifications field is treated
    ///   as a cancellation and is always acknowledged, whether or not a
    ///   subscription was actually removed.
    /// * A subscription for an object missing from `ctx.objects` yields
    ///   `Object / UnknownObject`.
    /// * A failure reported by the COV manager yields
    ///   `Resources / CovSubscriptionFailed`.
    fn handle(&self, data: &[u8], ctx: &ServiceContext) -> ServiceResult {
        let request = match decode_subscribe_cov(data) {
            Ok(decoded) => decoded,
            Err(_) => {
                return ServiceResult::Error {
                    error_class: ErrorClass::Services,
                    error_code: ErrorCode::InvalidParameterDataType,
                }
            }
        };

        debug!(
            process_id = request.subscriber_process_id,
            object = ?request.monitored_object,
            confirmed = ?request.issue_confirmed_notifications,
            lifetime = ?request.lifetime,
            "SubscribeCOV request"
        );

        let subscriber = self.subscriber_addr(ctx);

        // No confirmed/unconfirmed flag means "cancel".
        let confirmed = match request.issue_confirmed_notifications {
            Some(flag) => flag,
            None => {
                let removed = self.cov_manager.unsubscribe(
                    subscriber,
                    request.subscriber_process_id,
                    request.monitored_object,
                );
                if removed {
                    debug!("COV subscription cancelled");
                }
                return ServiceResult::SimpleAck;
            }
        };

        if ctx.objects.get(&request.monitored_object).is_none() {
            return ServiceResult::Error {
                error_class: ErrorClass::Object,
                error_code: ErrorCode::UnknownObject,
            };
        }

        // A missing or zero lifetime is passed on as "no lifetime".
        let lifetime = request
            .lifetime
            .filter(|secs| *secs != 0)
            .map(|secs| Duration::from_secs(u64::from(secs)));

        let subscription = CovSubscription::new(
            subscriber,
            request.subscriber_process_id,
            request.monitored_object,
            confirmed,
            lifetime,
        );

        if self.cov_manager.subscribe(subscription).is_err() {
            return ServiceResult::Error {
                error_class: ErrorClass::Resources,
                error_code: ErrorCode::CovSubscriptionFailed,
            };
        }

        debug!("COV subscription created");
        ServiceResult::SimpleAck
    }

    /// Human-readable service name.
    fn name(&self) -> &'static str {
        "SubscribeCOV"
    }

    /// Minimum payload length, in bytes, reported for this service.
    fn min_data_length(&self) -> usize {
        6
    }
}
/// Decoded parameters of a SubscribeCOVProperty service request, as produced
/// by `decode_subscribe_cov_property`.
#[derive(Debug, Clone)]
pub struct SubscribeCovPropertyRequest {
/// Subscriber process identifier, read from context tag 0.
pub subscriber_process_id: u32,
/// Object the subscription refers to, read from context tag 1.
pub monitored_object: crate::object::types::ObjectId,
/// Value of context tag 2, or `None` when that tag is absent. The handler
/// in this file treats `None` as a request to cancel the subscription.
pub issue_confirmed_notifications: Option<bool>,
/// Value of context tag 3, or `None` when that tag is absent.
pub lifetime: Option<u32>,
/// Property identifier read from context tag 4; `None` when the tag is
/// absent or `PropertyId::from_u32` does not recognise the value.
pub monitored_property: Option<crate::object::property::PropertyId>,
/// Big-endian `f32` read from context tag 5 when its length is 4 bytes.
pub cov_increment: Option<f32>,
}
fn decode_subscribe_cov_property(
data: &[u8],
) -> Result<SubscribeCovPropertyRequest, SubscribeCovError> {
let mut decoder = ApduDecoder::new(data);
let (tag, is_context, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
if !is_context || tag != 0 {
return Err(SubscribeCovError::InvalidRequest);
}
let subscriber_process_id = decoder
.decode_unsigned(len)
.map_err(|_| SubscribeCovError::InvalidRequest)?;
let (tag, is_context, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
if !is_context || tag != 1 || len != 4 {
return Err(SubscribeCovError::InvalidRequest);
}
let monitored_object = decoder
.decode_object_identifier()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
let mut issue_confirmed = None;
let mut lifetime = None;
let mut monitored_property = None;
let mut cov_increment = None;
while !decoder.is_empty() {
if let Some(byte) = decoder.peek() {
let peek_tag = (byte >> 4) & 0x0F;
let peek_context = (byte & 0x08) != 0;
if !peek_context {
break;
}
match peek_tag {
2 => {
let (_, _, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
if len == 0 {
issue_confirmed = Some(false);
} else {
let val = decoder
.decode_unsigned(len)
.map_err(|_| SubscribeCovError::InvalidRequest)?;
issue_confirmed = Some(val != 0);
}
}
3 => {
let (_, _, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
lifetime = Some(
decoder
.decode_unsigned(len)
.map_err(|_| SubscribeCovError::InvalidRequest)?,
);
}
4 => {
if decoder.is_opening_tag(4) {
decoder
.read_u8()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
let (_, _, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
let prop_val = decoder
.decode_unsigned(len)
.map_err(|_| SubscribeCovError::InvalidRequest)?;
monitored_property =
crate::object::property::PropertyId::from_u32(prop_val);
while !decoder.is_empty() && !decoder.is_closing_tag(4) {
let (_, _, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
if len > 0 {
let _ = decoder.read_bytes(len);
}
}
if decoder.is_closing_tag(4) {
decoder
.read_u8()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
}
} else {
let (_, _, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
let prop_val = decoder
.decode_unsigned(len)
.map_err(|_| SubscribeCovError::InvalidRequest)?;
monitored_property =
crate::object::property::PropertyId::from_u32(prop_val);
}
}
5 => {
let (_, _, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
if len == 4 {
let bytes = decoder
.read_bytes(4)
.map_err(|_| SubscribeCovError::InvalidRequest)?;
cov_increment =
Some(f32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]));
} else {
let _ = decoder.read_bytes(len);
}
}
_ => {
let (_, _, len) = decoder
.decode_tag_info()
.map_err(|_| SubscribeCovError::InvalidRequest)?;
if len > 0 {
let _ = decoder.read_bytes(len);
}
}
}
} else {
break;
}
}
Ok(SubscribeCovPropertyRequest {
subscriber_process_id,
monitored_object,
issue_confirmed_notifications: issue_confirmed,
lifetime,
monitored_property,
cov_increment,
})
}
/// Confirmed-service handler for SubscribeCOVProperty requests.
pub struct SubscribeCovPropertyHandler {
/// Manager the handler calls to create and remove subscriptions.
cov_manager: Arc<CovManager>,
/// Subscriber address used when the service context has no source address.
default_addr: SocketAddr,
}
impl SubscribeCovPropertyHandler {
    /// Creates a handler that registers subscriptions with `cov_manager`.
    ///
    /// The fallback subscriber address starts out as `0.0.0.0:47808`
    /// (port 0xBAC0); use [`with_default_addr`](Self::with_default_addr) to
    /// change it.
    pub fn new(cov_manager: Arc<CovManager>) -> Self {
        Self {
            cov_manager,
            // Built from its parts instead of parsed from a string: no runtime
            // parsing and no `unwrap()` panic path for a compile-time constant.
            default_addr: SocketAddr::from(([0, 0, 0, 0], 47808)),
        }
    }

    /// Replaces the fallback subscriber address and returns the handler.
    ///
    /// Mirrors the builder of the same name on `SubscribeCovHandler`.
    pub fn with_default_addr(mut self, addr: SocketAddr) -> Self {
        self.default_addr = addr;
        self
    }

    /// Returns the request's source address, or the configured fallback when
    /// the context does not carry one.
    fn subscriber_addr(&self, ctx: &ServiceContext) -> SocketAddr {
        ctx.source_address.unwrap_or(self.default_addr)
    }
}
impl ConfirmedServiceHandler for SubscribeCovPropertyHandler {
fn service_choice(&self) -> ConfirmedService {
ConfirmedService::SubscribeCovProperty
}
fn handle(&self, data: &[u8], ctx: &ServiceContext) -> ServiceResult {
let request = match decode_subscribe_cov_property(data) {
Ok(r) => r,
Err(_) => {
return ServiceResult::Error {
error_class: ErrorClass::Services,
error_code: ErrorCode::InvalidParameterDataType,
};
}
};
if request.issue_confirmed_notifications.is_none() {
self.cov_manager.unsubscribe(
self.subscriber_addr(ctx),
request.subscriber_process_id,
request.monitored_object,
);
return ServiceResult::SimpleAck;
}
if ctx.objects.get(&request.monitored_object).is_none() {
return ServiceResult::Error {
error_class: ErrorClass::Object,
error_code: ErrorCode::UnknownObject,
};
}
let lifetime = match request.lifetime {
Some(0) | None => None,
Some(secs) => Some(Duration::from_secs(secs as u64)),
};
let mut subscription = CovSubscription::new(
self.subscriber_addr(ctx),
request.subscriber_process_id,
request.monitored_object,
request.issue_confirmed_notifications.unwrap_or(false),
lifetime,
);
subscription.cov_increment = request.cov_increment;
match self.cov_manager.subscribe(subscription) {
Ok(()) => ServiceResult::SimpleAck,
Err(_) => ServiceResult::Error {
error_class: ErrorClass::Resources,
error_code: ErrorCode::CovSubscriptionFailed,
},
}
}
fn name(&self) -> &'static str {
"SubscribeCOVProperty"
}
fn min_data_length(&self) -> usize {
6
}
}
/// Error returned by the SubscribeCOV and SubscribeCOVProperty request decoders
/// in this file.
#[derive(Debug, thiserror::Error)]
pub enum SubscribeCovError {
/// A required tag was missing or malformed, or a field could not be decoded.
#[error("Invalid request format")]
InvalidRequest,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object::registry::ObjectRegistry;
    use crate::object::standard::AnalogInput;
    use crate::object::types::{ObjectId, ObjectType};
    use crate::service::handler::ServiceContext;
    use std::sync::Arc;

    /// Builds a service context over `registry` whose source address is `source`.
    fn make_ctx_with_source(registry: &Arc<ObjectRegistry>, source: SocketAddr) -> ServiceContext {
        ServiceContext {
            objects: registry.clone(),
            device_instance: 1234,
            invoke_id: Some(1),
            max_apdu_length: 1476,
            source_address: Some(source),
        }
    }

    #[test]
    fn test_decode_subscribe_cov_with_confirmed_and_lifetime() {
        let data = [
            0x09, 0x01, // [0] process id = 1
            0x1C, 0x00, 0x00, 0x00, 0x00, // [1] object id (analog input 0)
            0x29, 0x01, // [2] issue confirmed notifications = true
            0x39, 0x01, // [3] lifetime = 1
        ];
        let request = decode_subscribe_cov(&data).unwrap();
        assert_eq!(request.subscriber_process_id, 1);
        assert_eq!(
            request.monitored_object,
            ObjectId::new(ObjectType::AnalogInput, 0)
        );
        assert_eq!(request.issue_confirmed_notifications, Some(true));
        assert_eq!(request.lifetime, Some(1));
    }

    #[test]
    fn test_decode_subscribe_cov_cancellation() {
        let data = [
            0x09, 0x01, // [0] process id = 1
            0x1C, 0x00, 0x00, 0x00, 0x00, // [1] object id (analog input 0)
        ];
        let request = decode_subscribe_cov(&data).unwrap();
        assert_eq!(request.subscriber_process_id, 1);
        assert!(request.issue_confirmed_notifications.is_none());
        assert!(request.lifetime.is_none());
    }

    #[test]
    fn test_subscribe_cov_handler_creates_subscription() {
        let (cov_manager, _rx) = CovManager::new(1234, 100);
        let cov_manager = Arc::new(cov_manager);
        let handler = SubscribeCovHandler::new(cov_manager.clone());
        let registry = Arc::new(ObjectRegistry::new());
        let ai = Arc::new(AnalogInput::new(0, "AI_0"));
        registry.register(ai);
        let source: SocketAddr = "10.0.0.1:47808".parse().unwrap();
        let ctx = make_ctx_with_source(&registry, source);
        let data = [
            0x09, 0x01, // [0] process id = 1
            0x1C, 0x00, 0x00, 0x00, 0x00, // [1] object id (analog input 0)
            0x29, 0x00, // [2] issue confirmed notifications = false
        ];
        let result = handler.handle(&data, &ctx);
        assert!(matches!(result, ServiceResult::SimpleAck));
        assert_eq!(cov_manager.subscription_count(), 1);
        let subs = cov_manager.subscriptions_for_object(ObjectId::new(ObjectType::AnalogInput, 0));
        assert_eq!(subs.len(), 1);
        assert_eq!(subs[0].subscriber_address, source);
    }

    #[test]
    fn test_subscribe_cov_handler_cancellation() {
        let (cov_manager, _rx) = CovManager::new(1234, 100);
        let cov_manager = Arc::new(cov_manager);
        let handler = SubscribeCovHandler::new(cov_manager.clone());
        let registry = Arc::new(ObjectRegistry::new());
        let ai = Arc::new(AnalogInput::new(0, "AI_0"));
        registry.register(ai);
        let source: SocketAddr = "10.0.0.1:47808".parse().unwrap();
        let ctx = make_ctx_with_source(&registry, source);
        let subscribe_data = [0x09, 0x01, 0x1C, 0x00, 0x00, 0x00, 0x00, 0x29, 0x00];
        handler.handle(&subscribe_data, &ctx);
        assert_eq!(cov_manager.subscription_count(), 1);
        // Same request without tag 2: the handler treats it as a cancellation.
        let cancel_data = [0x09, 0x01, 0x1C, 0x00, 0x00, 0x00, 0x00];
        let result = handler.handle(&cancel_data, &ctx);
        assert!(matches!(result, ServiceResult::SimpleAck));
        assert_eq!(cov_manager.subscription_count(), 0);
    }

    #[test]
    fn test_subscribe_cov_handler_unknown_object() {
        let (cov_manager, _rx) = CovManager::new(1234, 100);
        let cov_manager = Arc::new(cov_manager);
        let handler = SubscribeCovHandler::new(cov_manager.clone());
        // Empty registry: the monitored object does not exist.
        let registry = Arc::new(ObjectRegistry::new());
        let source: SocketAddr = "10.0.0.1:47808".parse().unwrap();
        let ctx = make_ctx_with_source(&registry, source);
        let data = [
            0x09, 0x01, // [0] process id = 1
            0x1C, 0x00, 0x00, 0x00, 0x00, // [1] object id (analog input 0)
            0x29, 0x00, // [2] issue confirmed notifications = false
        ];
        let result = handler.handle(&data, &ctx);
        assert!(matches!(
            result,
            ServiceResult::Error {
                error_class: ErrorClass::Object,
                error_code: ErrorCode::UnknownObject,
            }
        ));
    }

    #[test]
    fn test_subscribe_cov_property_handler_creates_subscription() {
        let (cov_manager, _rx) = CovManager::new(1234, 100);
        let cov_manager = Arc::new(cov_manager);
        let handler = SubscribeCovPropertyHandler::new(cov_manager.clone());
        let registry = Arc::new(ObjectRegistry::new());
        let ai = Arc::new(AnalogInput::new(0, "AI_0"));
        registry.register(ai);
        let source: SocketAddr = "10.0.0.5:47808".parse().unwrap();
        let ctx = make_ctx_with_source(&registry, source);
        let data = [
            0x09, 0x01, // [0] process id = 1
            0x1C, 0x00, 0x00, 0x00, 0x00, // [1] object id (analog input 0)
            0x29, 0x01, // [2] issue confirmed notifications = true
        ];
        assert_eq!(
            handler.service_choice(),
            ConfirmedService::SubscribeCovProperty
        );
        assert_eq!(handler.name(), "SubscribeCOVProperty");
        let result = handler.handle(&data, &ctx);
        assert!(matches!(result, ServiceResult::SimpleAck));
        assert_eq!(cov_manager.subscription_count(), 1);
        let subs = cov_manager.subscriptions_for_object(ObjectId::new(ObjectType::AnalogInput, 0));
        assert_eq!(subs[0].subscriber_address, source);
        assert!(subs[0].confirmed_notifications);
    }

    #[test]
    fn test_subscribe_cov_property_handler_cancellation() {
        let (cov_manager, _rx) = CovManager::new(1234, 100);
        let cov_manager = Arc::new(cov_manager);
        let handler = SubscribeCovPropertyHandler::new(cov_manager.clone());
        let registry = Arc::new(ObjectRegistry::new());
        let ai = Arc::new(AnalogInput::new(0, "AI_0"));
        registry.register(ai);
        let source: SocketAddr = "10.0.0.5:47808".parse().unwrap();
        let ctx = make_ctx_with_source(&registry, source);
        let subscribe_data = [0x09, 0x01, 0x1C, 0x00, 0x00, 0x00, 0x00, 0x29, 0x01];
        handler.handle(&subscribe_data, &ctx);
        assert_eq!(cov_manager.subscription_count(), 1);
        // Same request without tag 2: the handler treats it as a cancellation.
        let cancel_data = [0x09, 0x01, 0x1C, 0x00, 0x00, 0x00, 0x00];
        let result = handler.handle(&cancel_data, &ctx);
        assert!(matches!(result, ServiceResult::SimpleAck));
        assert_eq!(cov_manager.subscription_count(), 0);
    }

    #[test]
    fn test_decode_subscribe_cov_property_with_increment() {
        let cov_val = f32::to_be_bytes(2.0);
        let data = vec![
            0x09, 0x05, // [0] process id = 5
            0x1C, 0x00, 0x00, 0x00, 0x03, // [1] object id (analog input 3)
            0x29, 0x00, // [2] issue confirmed notifications = false
            0x3A, 0x02, 0x58, // [3] lifetime = 600
            0x5C, cov_val[0], cov_val[1], cov_val[2], cov_val[3], // [5] increment = 2.0
        ];
        let request = decode_subscribe_cov_property(&data).unwrap();
        assert_eq!(request.subscriber_process_id, 5);
        assert_eq!(
            request.monitored_object,
            ObjectId::new(ObjectType::AnalogInput, 3)
        );
        assert_eq!(request.issue_confirmed_notifications, Some(false));
        assert_eq!(request.lifetime, Some(600));
        assert_eq!(request.cov_increment, Some(2.0));
    }

    #[test]
    fn test_subscribe_cov_handler_service_identity() {
        let (cov_manager, _rx) = CovManager::new(1234, 100);
        let handler = SubscribeCovHandler::new(Arc::new(cov_manager));
        assert_eq!(handler.service_choice(), ConfirmedService::SubscribeCov);
        assert_eq!(handler.name(), "SubscribeCOV");
        assert_eq!(handler.min_data_length(), 6);
    }
}