use alloc::string::String;
use alloc::vec::Vec;
use zerodds_opcua_gateway::data_value::Variant;
use crate::reader::ReceivedDataSet;
use crate::writer::PublishedDataSet;
#[derive(Debug, Clone, PartialEq)]
pub struct DdsSample {
pub topic: String,
pub fields: Vec<(String, Variant)>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BridgeError {
Dds(String),
}
impl core::fmt::Display for BridgeError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Dds(m) => write!(f, "DDS backend error: {m}"),
}
}
}
#[cfg(feature = "std")]
impl std::error::Error for BridgeError {}
pub trait DdsPublisher {
fn publish(&self, sample: &DdsSample) -> Result<(), BridgeError>;
}
pub trait DdsSubscriber {
fn take(&self, topic: &str) -> Result<Vec<DdsSample>, BridgeError>;
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DataSetTopicMapping {
pub topic: String,
pub field_map: Vec<(String, String)>,
}
impl DataSetTopicMapping {
#[must_use]
pub fn identity(topic: impl Into<String>, field_names: &[&str]) -> Self {
Self {
topic: topic.into(),
field_map: field_names
.iter()
.map(|n| (String::from(*n), String::from(*n)))
.collect(),
}
}
fn dds_member(&self, dataset_field: &str) -> Option<&str> {
self.field_map
.iter()
.find(|(ds, _)| ds == dataset_field)
.map(|(_, dds)| dds.as_str())
}
fn dataset_field(&self, dds_member: &str) -> Option<&str> {
self.field_map
.iter()
.find(|(_, dds)| dds == dds_member)
.map(|(ds, _)| ds.as_str())
}
#[must_use]
pub fn dataset_to_sample(&self, received: &ReceivedDataSet) -> DdsSample {
let mut fields = Vec::with_capacity(received.fields.len());
for f in &received.fields {
if let Some(member) = self.dds_member(&f.name) {
if let Some(value) = &f.value.value {
fields.push((String::from(member), value.clone()));
}
}
}
DdsSample {
topic: self.topic.clone(),
fields,
}
}
pub fn apply_sample(&self, sample: &DdsSample, dataset: &mut PublishedDataSet) -> usize {
let mut updated = 0;
for (member, value) in &sample.fields {
if let Some(field) = self.dataset_field(member) {
if dataset.set_variant(field, value.clone()) {
updated += 1;
}
}
}
updated
}
}
#[derive(Debug, Clone)]
pub struct OpcUaToDdsBridge<P: DdsPublisher> {
mapping: DataSetTopicMapping,
publisher: P,
}
impl<P: DdsPublisher> OpcUaToDdsBridge<P> {
pub fn new(mapping: DataSetTopicMapping, publisher: P) -> Self {
Self { mapping, publisher }
}
pub fn forward(&self, received: &ReceivedDataSet) -> Result<(), BridgeError> {
let sample = self.mapping.dataset_to_sample(received);
self.publisher.publish(&sample)
}
}
#[derive(Debug, Clone)]
pub struct DdsToOpcUaBridge<S: DdsSubscriber> {
mapping: DataSetTopicMapping,
subscriber: S,
}
impl<S: DdsSubscriber> DdsToOpcUaBridge<S> {
pub fn new(mapping: DataSetTopicMapping, subscriber: S) -> Self {
Self {
mapping,
subscriber,
}
}
pub fn pump(&self, dataset: &mut PublishedDataSet) -> Result<usize, BridgeError> {
let samples = self.subscriber.take(&self.mapping.topic)?;
let count = samples.len();
for sample in &samples {
self.mapping.apply_sample(sample, dataset);
}
Ok(count)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::reader::ReceivedField;
use core::cell::RefCell;
use zerodds_opcua_gateway::data_value::{DataValue, VariantValue};
fn variant_field(name: &str, index: u16, v: i32) -> ReceivedField {
ReceivedField {
name: String::from(name),
index,
value: DataValue {
value: Some(Variant::scalar(VariantValue::Int32(v))),
status: None,
source_timestamp: None,
server_timestamp: None,
source_pico_sec: None,
server_pico_sec: None,
},
}
}
#[test]
fn dataset_maps_to_dds_sample_with_renames() {
let mut mapping = DataSetTopicMapping::identity("Telemetry", &["a", "b"]);
mapping.field_map[1].1 = String::from("speed");
let received = ReceivedDataSet {
writer_id: 5,
kind: crate::uadp::dataset_message::DataSetMessageKind::KeyFrame,
fields: alloc::vec![variant_field("a", 0, 1), variant_field("b", 1, 2)],
};
let sample = mapping.dataset_to_sample(&received);
assert_eq!(sample.topic, "Telemetry");
assert_eq!(sample.fields[0].0, "a");
assert_eq!(sample.fields[1].0, "speed");
assert_eq!(sample.fields[1].1, Variant::scalar(VariantValue::Int32(2)));
}
#[test]
fn dds_sample_applies_to_published_dataset() {
let mapping = DataSetTopicMapping::identity("T", &["a", "b"]);
let mut pds = PublishedDataSet::new("ds");
pds.add_field(
"a",
DataValue {
value: Some(Variant::scalar(VariantValue::Int32(0))),
status: None,
source_timestamp: None,
server_timestamp: None,
source_pico_sec: None,
server_pico_sec: None,
},
)
.add_field(
"b",
DataValue {
value: Some(Variant::scalar(VariantValue::Int32(0))),
status: None,
source_timestamp: None,
server_timestamp: None,
source_pico_sec: None,
server_pico_sec: None,
},
);
let sample = DdsSample {
topic: String::from("T"),
fields: alloc::vec![
(String::from("a"), Variant::scalar(VariantValue::Int32(11))),
(String::from("b"), Variant::scalar(VariantValue::Int32(22))),
],
};
assert_eq!(mapping.apply_sample(&sample, &mut pds), 2);
assert_eq!(
pds.values()[0].value,
Some(Variant::scalar(VariantValue::Int32(11)))
);
assert_eq!(
pds.values()[1].value,
Some(Variant::scalar(VariantValue::Int32(22)))
);
}
struct MockDds {
published: RefCell<Vec<DdsSample>>,
}
impl DdsPublisher for MockDds {
fn publish(&self, sample: &DdsSample) -> Result<(), BridgeError> {
self.published.borrow_mut().push(sample.clone());
Ok(())
}
}
impl DdsSubscriber for MockDds {
fn take(&self, _topic: &str) -> Result<Vec<DdsSample>, BridgeError> {
Ok(self.published.borrow_mut().drain(..).collect())
}
}
#[test]
fn opcua_to_dds_and_back() {
let mapping = DataSetTopicMapping::identity("T", &["a"]);
let dds = MockDds {
published: RefCell::new(Vec::new()),
};
let received = ReceivedDataSet {
writer_id: 1,
kind: crate::uadp::dataset_message::DataSetMessageKind::KeyFrame,
fields: alloc::vec![variant_field("a", 0, 99)],
};
OpcUaToDdsBridge::new(mapping.clone(), &dds)
.forward(&received)
.expect("forward");
let mut pds = PublishedDataSet::new("ds");
pds.add_field(
"a",
DataValue {
value: Some(Variant::scalar(VariantValue::Int32(0))),
status: None,
source_timestamp: None,
server_timestamp: None,
source_pico_sec: None,
server_pico_sec: None,
},
);
let consumed = DdsToOpcUaBridge::new(mapping, &dds)
.pump(&mut pds)
.expect("pump");
assert_eq!(consumed, 1);
assert_eq!(
pds.values()[0].value,
Some(Variant::scalar(VariantValue::Int32(99)))
);
}
impl DdsPublisher for &MockDds {
fn publish(&self, sample: &DdsSample) -> Result<(), BridgeError> {
(**self).publish(sample)
}
}
impl DdsSubscriber for &MockDds {
fn take(&self, topic: &str) -> Result<Vec<DdsSample>, BridgeError> {
(**self).take(topic)
}
}
}