use alloc::string::String;
use alloc::vec::Vec;
use zerodds_opcua_gateway::data_value::{Variant, VariantValue};
use zerodds_opcua_gateway::node_id::NodeId;
use zerodds_opcua_gateway::types::{NodeClass, QualifiedName};
use crate::config::{
DataSetMetaData, DataSetReaderConfig, DataSetWriterConfig, PubSubConnectionConfig,
ReaderGroupConfig, WriterGroupConfig,
};
pub const PUBLISH_SUBSCRIBE: NodeId = NodeId::numeric(0, 14443);
pub const PUBSUB_CONNECTION_TYPE: NodeId = NodeId::numeric(0, 14209);
pub const WRITER_GROUP_TYPE: NodeId = NodeId::numeric(0, 17725);
pub const DATASET_WRITER_TYPE: NodeId = NodeId::numeric(0, 15298);
pub const READER_GROUP_TYPE: NodeId = NodeId::numeric(0, 17999);
pub const DATASET_READER_TYPE: NodeId = NodeId::numeric(0, 15306);
pub const PUBLISHED_DATASET_TYPE: NodeId = NodeId::numeric(0, 14509);
pub const BASE_DATA_VARIABLE_TYPE: NodeId = NodeId::numeric(0, 63);
pub const HAS_COMPONENT: NodeId = NodeId::numeric(0, 47);
pub const HAS_PROPERTY: NodeId = NodeId::numeric(0, 46);
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InfoModelError {
NotFound(NodeId),
}
impl core::fmt::Display for InfoModelError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::NotFound(n) => write!(f, "PubSub node {n:?} not found"),
}
}
}
#[cfg(feature = "std")]
impl std::error::Error for InfoModelError {}
#[derive(Debug, Clone, PartialEq)]
pub struct DataSetWriterModel {
pub node_id: NodeId,
pub config: DataSetWriterConfig,
}
#[derive(Debug, Clone, PartialEq)]
pub struct DataSetReaderModel {
pub node_id: NodeId,
pub config: DataSetReaderConfig,
}
#[derive(Debug, Clone, PartialEq)]
pub struct WriterGroupModel {
pub node_id: NodeId,
pub config: WriterGroupConfig,
pub writers: Vec<DataSetWriterModel>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct ReaderGroupModel {
pub node_id: NodeId,
pub config: ReaderGroupConfig,
pub readers: Vec<DataSetReaderModel>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct ConnectionModel {
pub node_id: NodeId,
pub config: PubSubConnectionConfig,
pub writer_groups: Vec<WriterGroupModel>,
pub reader_groups: Vec<ReaderGroupModel>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct PublishedDataSetModel {
pub node_id: NodeId,
pub meta_data: DataSetMetaData,
}
#[derive(Debug, Clone, PartialEq)]
pub struct PubSubNode {
pub node_id: NodeId,
pub browse_name: QualifiedName,
pub node_class: NodeClass,
pub type_definition: NodeId,
pub parent: Option<NodeId>,
pub reference_from_parent: Option<NodeId>,
pub value: Option<Variant>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct PubSubConfiguration {
namespace_index: u16,
next_id: u32,
connections: Vec<ConnectionModel>,
published_data_sets: Vec<PublishedDataSetModel>,
}
impl PubSubConfiguration {
#[must_use]
pub fn new(namespace_index: u16) -> Self {
Self {
namespace_index,
next_id: 1,
connections: Vec::new(),
published_data_sets: Vec::new(),
}
}
fn alloc_node(&mut self) -> NodeId {
let id = self.next_id;
self.next_id += 1;
NodeId::numeric(self.namespace_index, id)
}
#[must_use]
pub fn connections(&self) -> &[ConnectionModel] {
&self.connections
}
#[must_use]
pub fn published_data_sets(&self) -> &[PublishedDataSetModel] {
&self.published_data_sets
}
pub fn add_connection(&mut self, config: PubSubConnectionConfig) -> NodeId {
let node_id = self.alloc_node();
self.connections.push(ConnectionModel {
node_id: node_id.clone(),
config,
writer_groups: Vec::new(),
reader_groups: Vec::new(),
});
node_id
}
pub fn add_published_data_set(&mut self, meta_data: DataSetMetaData) -> NodeId {
let node_id = self.alloc_node();
self.published_data_sets.push(PublishedDataSetModel {
node_id: node_id.clone(),
meta_data,
});
node_id
}
fn connection_mut(&mut self, node_id: &NodeId) -> Option<&mut ConnectionModel> {
self.connections.iter_mut().find(|c| &c.node_id == node_id)
}
pub fn add_writer_group(
&mut self,
connection: &NodeId,
config: WriterGroupConfig,
) -> Result<NodeId, InfoModelError> {
let node_id = NodeId::numeric(self.namespace_index, self.next_id);
let conn = self
.connection_mut(connection)
.ok_or_else(|| InfoModelError::NotFound(connection.clone()))?;
conn.writer_groups.push(WriterGroupModel {
node_id: node_id.clone(),
config,
writers: Vec::new(),
});
self.next_id += 1;
Ok(node_id)
}
pub fn add_reader_group(
&mut self,
connection: &NodeId,
config: ReaderGroupConfig,
) -> Result<NodeId, InfoModelError> {
let node_id = NodeId::numeric(self.namespace_index, self.next_id);
let conn = self
.connection_mut(connection)
.ok_or_else(|| InfoModelError::NotFound(connection.clone()))?;
conn.reader_groups.push(ReaderGroupModel {
node_id: node_id.clone(),
config,
readers: Vec::new(),
});
self.next_id += 1;
Ok(node_id)
}
pub fn add_dataset_writer(
&mut self,
group: &NodeId,
config: DataSetWriterConfig,
) -> Result<NodeId, InfoModelError> {
let node_id = NodeId::numeric(self.namespace_index, self.next_id);
let g = self
.connections
.iter_mut()
.flat_map(|c| c.writer_groups.iter_mut())
.find(|g| &g.node_id == group)
.ok_or_else(|| InfoModelError::NotFound(group.clone()))?;
g.writers.push(DataSetWriterModel {
node_id: node_id.clone(),
config,
});
self.next_id += 1;
Ok(node_id)
}
pub fn add_dataset_reader(
&mut self,
group: &NodeId,
config: DataSetReaderConfig,
) -> Result<NodeId, InfoModelError> {
let node_id = NodeId::numeric(self.namespace_index, self.next_id);
let g = self
.connections
.iter_mut()
.flat_map(|c| c.reader_groups.iter_mut())
.find(|g| &g.node_id == group)
.ok_or_else(|| InfoModelError::NotFound(group.clone()))?;
g.readers.push(DataSetReaderModel {
node_id: node_id.clone(),
config,
});
self.next_id += 1;
Ok(node_id)
}
pub fn remove(&mut self, node_id: &NodeId) -> bool {
let before = self.count();
self.published_data_sets.retain(|p| &p.node_id != node_id);
self.connections.retain(|c| &c.node_id != node_id);
for conn in &mut self.connections {
conn.writer_groups.retain(|g| &g.node_id != node_id);
conn.reader_groups.retain(|g| &g.node_id != node_id);
for g in &mut conn.writer_groups {
g.writers.retain(|w| &w.node_id != node_id);
}
for g in &mut conn.reader_groups {
g.readers.retain(|r| &r.node_id != node_id);
}
}
self.count() != before
}
fn count(&self) -> usize {
self.published_data_sets.len()
+ self
.connections
.iter()
.map(|c| {
1 + c.writer_groups.len()
+ c.reader_groups.len()
+ c.writer_groups
.iter()
.map(|g| g.writers.len())
.sum::<usize>()
+ c.reader_groups
.iter()
.map(|g| g.readers.len())
.sum::<usize>()
})
.sum::<usize>()
}
#[must_use]
pub fn nodes(&self) -> Vec<PubSubNode> {
let mut out = Vec::new();
let root = PUBLISH_SUBSCRIBE;
for conn in &self.connections {
out.push(object_node(
&conn.node_id,
&conn.config.name,
self.namespace_index,
PUBSUB_CONNECTION_TYPE,
&root,
));
self.push_props(
&mut out,
&conn.node_id,
&[
("PublisherId", publisher_id_variant(&conn.config)),
(
"TransportProfileUri",
str_variant(&conn.config.transport_profile_uri),
),
("Address", str_variant(&conn.config.address_url)),
],
);
for g in &conn.writer_groups {
out.push(object_node(
&g.node_id,
&g.config.name,
self.namespace_index,
WRITER_GROUP_TYPE,
&conn.node_id,
));
self.push_props(
&mut out,
&g.node_id,
&[
("WriterGroupId", u16_variant(g.config.writer_group_id)),
(
"PublishingInterval",
f64_variant(g.config.publishing_interval_ms),
),
],
);
for w in &g.writers {
out.push(object_node(
&w.node_id,
&w.config.name,
self.namespace_index,
DATASET_WRITER_TYPE,
&g.node_id,
));
self.push_props(
&mut out,
&w.node_id,
&[
("DataSetWriterId", u16_variant(w.config.data_set_writer_id)),
(
"KeyFrameCount",
Variant::scalar(VariantValue::UInt32(w.config.key_frame_count)),
),
("DataSetName", str_variant(&w.config.data_set_name)),
],
);
}
}
for g in &conn.reader_groups {
out.push(object_node(
&g.node_id,
&g.config.name,
self.namespace_index,
READER_GROUP_TYPE,
&conn.node_id,
));
for r in &g.readers {
out.push(object_node(
&r.node_id,
&r.config.name,
self.namespace_index,
DATASET_READER_TYPE,
&g.node_id,
));
self.push_props(
&mut out,
&r.node_id,
&[("DataSetWriterId", u16_variant(r.config.data_set_writer_id))],
);
}
}
}
for pds in &self.published_data_sets {
out.push(object_node(
&pds.node_id,
&pds.meta_data.name,
self.namespace_index,
PUBLISHED_DATASET_TYPE,
&root,
));
}
out
}
fn push_props(&self, out: &mut Vec<PubSubNode>, parent: &NodeId, props: &[(&str, Variant)]) {
for (name, value) in props {
out.push(PubSubNode {
node_id: NodeId {
namespace_index: self.namespace_index,
identifier_type: zerodds_opcua_gateway::node_id::NodeIdentifier::String(
property_id(parent, name),
),
},
browse_name: QualifiedName {
namespace_index: self.namespace_index,
name: String::from(*name),
},
node_class: NodeClass::Variable,
type_definition: BASE_DATA_VARIABLE_TYPE,
parent: Some(parent.clone()),
reference_from_parent: Some(HAS_PROPERTY),
value: Some(value.clone()),
});
}
}
}
fn object_node(
node_id: &NodeId,
name: &str,
ns: u16,
type_definition: NodeId,
parent: &NodeId,
) -> PubSubNode {
PubSubNode {
node_id: node_id.clone(),
browse_name: QualifiedName {
namespace_index: ns,
name: String::from(name),
},
node_class: NodeClass::Object,
type_definition,
parent: Some(parent.clone()),
reference_from_parent: Some(HAS_COMPONENT),
value: None,
}
}
fn property_id(parent: &NodeId, name: &str) -> String {
use core::fmt::Write as _;
let mut s = String::new();
let _ = write!(&mut s, "{parent:?}.{name}");
s
}
fn str_variant(s: &str) -> Variant {
Variant::scalar(VariantValue::String(String::from(s)))
}
fn u16_variant(v: u16) -> Variant {
Variant::scalar(VariantValue::UInt16(v))
}
fn f64_variant(v: f64) -> Variant {
Variant::scalar(VariantValue::Double(v))
}
fn publisher_id_variant(c: &PubSubConnectionConfig) -> Variant {
use crate::uadp::network_message::PublisherId;
match &c.publisher_id {
PublisherId::Byte(v) => Variant::scalar(VariantValue::Byte(*v)),
PublisherId::UInt16(v) => Variant::scalar(VariantValue::UInt16(*v)),
PublisherId::UInt32(v) => Variant::scalar(VariantValue::UInt32(*v)),
PublisherId::UInt64(v) => Variant::scalar(VariantValue::UInt64(*v)),
PublisherId::String(s) => Variant::scalar(VariantValue::String(s.clone())),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::uadp::network_message::PublisherId;
fn connection() -> PubSubConnectionConfig {
PubSubConnectionConfig {
name: String::from("conn1"),
publisher_id: PublisherId::UInt16(9),
transport_profile_uri: String::from("uadp"),
address_url: String::from("opc.udp://239.0.0.1:4840"),
}
}
#[test]
fn builds_and_browses_hierarchy() {
let mut cfg = PubSubConfiguration::new(1);
let conn = cfg.add_connection(connection());
let wg = cfg
.add_writer_group(&conn, WriterGroupConfig::new("wg1", 1))
.expect("wg");
let w = cfg
.add_dataset_writer(&wg, DataSetWriterConfig::new("w1", 5, "ds1"))
.expect("w");
let rg = cfg
.add_reader_group(&conn, ReaderGroupConfig::default())
.expect("rg");
cfg.add_dataset_reader(
&rg,
DataSetReaderConfig::new("r1", DataSetMetaData::default()),
)
.expect("r");
cfg.add_published_data_set(DataSetMetaData::new("ds1", Vec::new()));
let nodes = cfg.nodes();
let conn_node = nodes.iter().find(|n| n.node_id == conn).expect("conn node");
assert_eq!(conn_node.parent.as_ref(), Some(&PUBLISH_SUBSCRIBE));
assert_eq!(conn_node.type_definition, PUBSUB_CONNECTION_TYPE);
assert_eq!(conn_node.node_class, NodeClass::Object);
let w_node = nodes.iter().find(|n| n.node_id == w).expect("writer node");
assert_eq!(w_node.parent.as_ref(), Some(&wg));
assert_eq!(w_node.type_definition, DATASET_WRITER_TYPE);
assert!(nodes.iter().any(|n| {
n.parent.as_ref() == Some(&w)
&& n.browse_name.name == "DataSetWriterId"
&& n.value == Some(u16_variant(5))
}));
}
#[test]
fn add_to_unknown_group_is_rejected() {
let mut cfg = PubSubConfiguration::new(1);
let phantom = NodeId::numeric(1, 999);
assert_eq!(
cfg.add_dataset_writer(&phantom, DataSetWriterConfig::new("w", 1, "ds")),
Err(InfoModelError::NotFound(phantom))
);
}
#[test]
fn remove_prunes_subtree_entries() {
let mut cfg = PubSubConfiguration::new(1);
let conn = cfg.add_connection(connection());
let wg = cfg
.add_writer_group(&conn, WriterGroupConfig::new("wg1", 1))
.expect("wg");
let _w = cfg
.add_dataset_writer(&wg, DataSetWriterConfig::new("w1", 5, "ds1"))
.expect("w");
assert!(cfg.remove(&wg));
assert!(cfg.connections()[0].writer_groups.is_empty());
assert!(cfg.remove(&conn));
assert!(cfg.connections().is_empty());
assert!(!cfg.remove(&NodeId::numeric(1, 4242)));
}
}