use alloc::string::String;
use alloc::vec::Vec;
use crate::binary::{from_binary, to_binary};
use crate::error::{DecodeError, EncodeError};
use crate::reader::{MatchedDataSet, ReaderGroup};
use crate::transport::{PubSubTransport, TransportError};
use crate::uadp::network_message::NetworkMessage;
use crate::writer::{DataSetWriter, PublishedDataSet, WriterGroup};
/// Error type returned by the [`Publisher`] and [`Subscriber`] operations in
/// this module.
///
/// Every wrapping variant has a matching `From` impl, so the `?` operator
/// converts the underlying error automatically.
#[derive(Debug, Clone, PartialEq)]
pub enum DaemonError {
/// Wraps an [`EncodeError`].
Encode(EncodeError),
/// Wraps a [`DecodeError`].
Decode(DecodeError),
/// Wraps a [`TransportError`] reported by the underlying transport.
Transport(TransportError),
/// A writer's configured data set name matched no registered
/// `PublishedDataSet`; the payload is that name.
UnknownDataSet(String),
/// Wraps a security-layer error; only compiled with the `security` feature.
#[cfg(feature = "security")]
Security(crate::security::SecurityError),
}
impl From<EncodeError> for DaemonError {
fn from(e: EncodeError) -> Self {
Self::Encode(e)
}
}
impl From<DecodeError> for DaemonError {
fn from(e: DecodeError) -> Self {
Self::Decode(e)
}
}
impl From<TransportError> for DaemonError {
fn from(e: TransportError) -> Self {
Self::Transport(e)
}
}
/// Lets `?` lift a security-layer error into [`DaemonError::Security`].
///
/// Only compiled when the `security` feature is enabled.
#[cfg(feature = "security")]
impl From<crate::security::SecurityError> for DaemonError {
    fn from(err: crate::security::SecurityError) -> Self {
        DaemonError::Security(err)
    }
}
impl core::fmt::Display for DaemonError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Encode(e) => write!(f, "encode error: {e}"),
Self::Decode(e) => write!(f, "decode error: {e}"),
Self::Transport(e) => write!(f, "transport error: {e}"),
Self::UnknownDataSet(n) => write!(f, "writer references unknown PublishedDataSet {n}"),
#[cfg(feature = "security")]
Self::Security(e) => write!(f, "security error: {e}"),
}
}
}
// Implemented only when the `std` feature is enabled. The impl body is empty,
// so every trait method keeps its default (in particular, no `source()`
// override is provided).
#[cfg(feature = "std")]
impl std::error::Error for DaemonError {}
/// Publishing side of the daemon: owns a transport, a [`WriterGroup`], the
/// registered [`DataSetWriter`]s and the [`PublishedDataSet`]s they read from.
///
/// NOTE(review): the derived `Clone` also copies `nonce_counter`, so a clone
/// continues from the same counter value as the original — confirm that two
/// copies never publish secured messages under the same key.
#[derive(Debug, Clone)]
pub struct Publisher<T: PubSubTransport> {
/// Transport every encoded cycle is sent through.
transport: T,
/// Group that frames the per-writer messages into one `NetworkMessage`.
group: WriterGroup,
/// Writers, run in insertion order on every cycle.
writers: Vec<DataSetWriter>,
/// Data sets, looked up by name when a writer produces its message.
datasets: Vec<PublishedDataSet>,
/// Counter bumped once per secured publish; its big-endian bytes are used
/// as the nonce. Only read with the `security` feature, hence the
/// conditional `allow(dead_code)`.
#[cfg_attr(not(feature = "security"), allow(dead_code))]
nonce_counter: u64,
}
impl<T: PubSubTransport> Publisher<T> {
    /// Creates a publisher with no data sets or writers registered and the
    /// nonce counter at zero.
    #[must_use]
    pub fn new(transport: T, group: WriterGroup) -> Self {
        Self {
            transport,
            group,
            writers: Vec::new(),
            datasets: Vec::new(),
            nonce_counter: 0,
        }
    }

    /// Registers a data set that writers can reference by name.
    ///
    /// No uniqueness check is made; name lookups return the first match.
    /// Returns `&mut Self` so calls can be chained.
    pub fn add_dataset(&mut self, dataset: PublishedDataSet) -> &mut Self {
        self.datasets.push(dataset);
        self
    }

    /// Registers a writer. Writers run in insertion order on every cycle.
    ///
    /// The writer's data set is not resolved here; a missing data set is only
    /// reported when a cycle is published. Returns `&mut Self` for chaining.
    pub fn add_writer(&mut self, writer: DataSetWriter) -> &mut Self {
        self.writers.push(writer);
        self
    }

    /// Returns a mutable handle to the first registered data set called
    /// `name`, or `None` if there is none.
    pub fn dataset_mut(&mut self, name: &str) -> Option<&mut PublishedDataSet> {
        self.datasets.iter_mut().find(|d| d.name() == name)
    }

    /// Borrows the underlying transport.
    pub fn transport(&self) -> &T {
        &self.transport
    }

    /// Runs every writer once and frames the produced messages into a single
    /// [`NetworkMessage`].
    ///
    /// # Errors
    ///
    /// * [`DaemonError::UnknownDataSet`] if a writer's configured data set
    ///   name matches no registered data set.
    /// * Whatever `DataSetWriter::produce` reports, converted via `From`.
    fn frame_cycle(&mut self, timestamp: Option<i64>) -> Result<NetworkMessage, DaemonError> {
        let mut messages = Vec::with_capacity(self.writers.len());
        for writer in &mut self.writers {
            // Borrow the name for the lookup and copy it only on the error
            // path, so a successful cycle does not allocate a `String` per
            // writer. The shared borrow of `writer` ends before `produce`.
            let wanted: &str = &writer.config().data_set_name;
            let dataset = self
                .datasets
                .iter()
                .find(|d| d.name() == wanted)
                .ok_or_else(|| DaemonError::UnknownDataSet(String::from(wanted)))?;
            messages.push(writer.produce(dataset, timestamp)?);
        }
        Ok(self.group.frame(messages, timestamp))
    }

    /// Frames one cycle, encodes it with `to_binary` and sends the bytes over
    /// the transport.
    ///
    /// # Errors
    ///
    /// Returns any framing, encoding or transport error as a [`DaemonError`].
    /// Nothing is sent if framing or encoding fails.
    pub fn publish_cycle(&mut self, timestamp: Option<i64>) -> Result<(), DaemonError> {
        let nm = self.frame_cycle(timestamp)?;
        let bytes = to_binary(&nm)?;
        self.transport.send(&bytes)?;
        Ok(())
    }

    /// Frames one cycle, protects it with `crate::security::protect` and
    /// sends the resulting bytes over the transport.
    ///
    /// The nonce is the 8-byte big-endian encoding of an internal counter
    /// that is incremented (wrapping) before each use, so the first secured
    /// message uses the value 1. The counter is advanced only after framing
    /// succeeds, and stays advanced even if protecting or sending fails.
    ///
    /// NOTE(review): the counter starts at zero for every new `Publisher`;
    /// confirm that `protect` or key rotation prevents nonce reuse across
    /// restarts. The trailing `true` argument is passed straight to `protect`
    /// — its meaning is not visible from this module.
    ///
    /// # Errors
    ///
    /// Returns any framing, security or transport error as a [`DaemonError`].
    #[cfg(feature = "security")]
    pub fn publish_cycle_secured(
        &mut self,
        timestamp: Option<i64>,
        policy: crate::security::SecurityPolicy,
        key: &crate::security::SecurityKey,
    ) -> Result<(), DaemonError> {
        let nm = self.frame_cycle(timestamp)?;
        self.nonce_counter = self.nonce_counter.wrapping_add(1);
        let nonce = self.nonce_counter.to_be_bytes();
        let bytes = crate::security::protect(&nm, policy, key, &nonce, true)?;
        self.transport.send(&bytes)?;
        Ok(())
    }
}
/// Subscribing side of the daemon: owns a transport and the [`ReaderGroup`]
/// that received network messages are handed to.
#[derive(Debug, Clone)]
pub struct Subscriber<T: PubSubTransport> {
/// Transport that incoming frames are received from.
transport: T,
/// Reader group that accepts each decoded `NetworkMessage`.
reader_group: ReaderGroup,
}
impl<T: PubSubTransport> Subscriber<T> {
#[must_use]
pub fn new(transport: T, reader_group: ReaderGroup) -> Self {
Self {
transport,
reader_group,
}
}
pub fn reader_group_mut(&mut self) -> &mut ReaderGroup {
&mut self.reader_group
}
pub fn transport(&self) -> &T {
&self.transport
}
pub fn poll(&self) -> Result<Vec<MatchedDataSet>, DaemonError> {
let bytes = match self.transport.receive() {
Ok(b) => b,
Err(TransportError::Timeout) => return Ok(Vec::new()),
Err(e) => return Err(DaemonError::Transport(e)),
};
let nm: NetworkMessage = from_binary(&bytes)?;
Ok(self.reader_group.accept(&nm)?)
}
#[cfg(feature = "security")]
pub fn poll_secured(
&self,
policy: crate::security::SecurityPolicy,
sks: &crate::security::SecurityKeyService,
) -> Result<Vec<MatchedDataSet>, DaemonError> {
let bytes = match self.transport.receive() {
Ok(b) => b,
Err(TransportError::Timeout) => return Ok(Vec::new()),
Err(e) => return Err(DaemonError::Transport(e)),
};
let nm = crate::security::unprotect(&bytes, policy, sks)?;
Ok(self.reader_group.accept(&nm)?)
}
}
// End-to-end tests for `Publisher` and `Subscriber` over a `LoopbackTransport`.
// Only built for test runs with the `std` feature enabled.
#[cfg(all(test, feature = "std"))]
mod tests {
use super::*;
use crate::config::{
ConfigurationVersion, DataSetMetaData, DataSetReaderConfig, DataSetWriterConfig,
FieldMetaData, WriterGroupConfig,
};
use crate::reader::DataSetReader;
use crate::transport::LoopbackTransport;
use crate::uadp::dataset_message::DataSetMessageKind;
use crate::uadp::network_message::PublisherId;
use zerodds_opcua_gateway::data_value::{DataValue, Variant, VariantValue};
use zerodds_opcua_gateway::types::BuiltinTypeKind;
// Builds a `DataValue` holding a scalar Int32 with every optional field unset.
fn dv(v: i32) -> DataValue {
DataValue {
value: Some(Variant::scalar(VariantValue::Int32(v))),
status: None,
source_timestamp: None,
server_timestamp: None,
source_pico_sec: None,
server_pico_sec: None,
}
}
// Metadata for data set "ds1" with a single scalar Int32 field "a".
fn meta() -> DataSetMetaData {
DataSetMetaData::new(
"ds1",
alloc::vec![FieldMetaData::scalar("a", BuiltinTypeKind::Int32)],
)
}
// Publisher fixture: data set "ds1" (field "a" = 0) plus writer "w1" bound to
// "ds1". The `5` is presumably the writer id (the end-to-end test expects
// `writer_id == 5`); the `1` passed to `WriterGroupConfig::new` is presumably
// the group id — confirm against the config module.
fn publisher(tx: LoopbackTransport) -> Publisher<LoopbackTransport> {
let mut pds = PublishedDataSet::new("ds1");
pds.add_field("a", dv(0));
let mut pubr = Publisher::new(
tx,
WriterGroup::new(WriterGroupConfig::new("g1", 1), PublisherId::UInt16(9)),
);
pubr.add_dataset(pds).add_writer(DataSetWriter::new(
DataSetWriterConfig::new("w1", 5, "ds1"),
ConfigurationVersion::default(),
));
pubr
}
// Subscriber fixture: one reader "r1" configured with the `meta()` metadata.
fn subscriber(tx: LoopbackTransport) -> Subscriber<LoopbackTransport> {
let mut rg = ReaderGroup::new();
rg.add_reader(DataSetReader::new(DataSetReaderConfig::new("r1", meta())));
Subscriber::new(tx, rg)
}
// Publishes one unsecured cycle and checks that the subscriber sees the
// updated field value. Relies on clones of the loopback transport sharing
// one queue.
#[test]
fn end_to_end_publish_then_poll() {
let bus = LoopbackTransport::new();
let mut pubr = publisher(bus.clone());
let subr = subscriber(bus);
pubr.dataset_mut("ds1").expect("ds").set("a", dv(123));
pubr.publish_cycle(None).expect("publish");
let matched = subr.poll().expect("poll");
assert_eq!(matched.len(), 1);
assert_eq!(matched[0].reader_name, "r1");
assert_eq!(matched[0].data.writer_id, 5);
assert_eq!(matched[0].data.kind, DataSetMessageKind::KeyFrame);
assert_eq!(
matched[0].data.fields[0].value.value,
Some(Variant::scalar(VariantValue::Int32(123)))
);
}
// With nothing published, `poll` must return an empty list rather than an error.
#[test]
fn poll_returns_empty_when_idle() {
let bus = LoopbackTransport::new();
let subr = subscriber(bus);
assert!(subr.poll().expect("poll").is_empty());
}
// A writer whose data set was never registered makes `publish_cycle` fail
// with `UnknownDataSet` carrying the missing name.
#[test]
fn unknown_dataset_is_reported() {
let bus = LoopbackTransport::new();
let mut pubr = Publisher::new(
bus,
WriterGroup::new(WriterGroupConfig::new("g1", 1), PublisherId::UInt16(9)),
);
pubr.add_writer(DataSetWriter::new(
DataSetWriterConfig::new("w1", 5, "ds1"),
ConfigurationVersion::default(),
));
assert_eq!(
pubr.publish_cycle(None),
Err(DaemonError::UnknownDataSet(String::from("ds1")))
);
}
// Secured round trip: the publisher protects the message with `key`, and the
// subscriber recovers it through a key service built from the same key.
#[cfg(feature = "security")]
#[test]
fn end_to_end_secured() {
use crate::security::{SecurityKey, SecurityKeyService, SecurityPolicy};
let policy = SecurityPolicy::Aes256Ctr;
let blob = alloc::vec![0x5Au8; policy.key_material_len()];
let key = SecurityKey::from_blob(policy, 11, &blob).expect("key");
let sks = SecurityKeyService::new(policy, "g", key.clone());
let bus = LoopbackTransport::new();
let mut pubr = publisher(bus.clone());
let subr = subscriber(bus);
pubr.dataset_mut("ds1").expect("ds").set("a", dv(777));
pubr.publish_cycle_secured(None, policy, &key)
.expect("publish");
// Exactly one frame should be queued before the subscriber polls.
assert_eq!(pubr.transport().pending(), 1);
let matched = subr.poll_secured(policy, &sks).expect("poll");
assert_eq!(matched.len(), 1);
assert_eq!(
matched[0].data.fields[0].value.value,
Some(Variant::scalar(VariantValue::Int32(777)))
);
}
}