1use alloc::string::String;
21use alloc::vec::Vec;
22
23use crate::binary::{from_binary, to_binary};
24use crate::error::{DecodeError, EncodeError};
25use crate::reader::{MatchedDataSet, ReaderGroup};
26use crate::transport::{PubSubTransport, TransportError};
27use crate::uadp::network_message::NetworkMessage;
28use crate::writer::{DataSetWriter, PublishedDataSet, WriterGroup};
29
30#[derive(Debug, Clone, PartialEq)]
32pub enum DaemonError {
33 Encode(EncodeError),
35 Decode(DecodeError),
37 Transport(TransportError),
39 UnknownDataSet(String),
41 #[cfg(feature = "security")]
43 Security(crate::security::SecurityError),
44}
45
46impl From<EncodeError> for DaemonError {
47 fn from(e: EncodeError) -> Self {
48 Self::Encode(e)
49 }
50}
51
52impl From<DecodeError> for DaemonError {
53 fn from(e: DecodeError) -> Self {
54 Self::Decode(e)
55 }
56}
57
58impl From<TransportError> for DaemonError {
59 fn from(e: TransportError) -> Self {
60 Self::Transport(e)
61 }
62}
63
/// Lets `?` lift a security-layer error into [`DaemonError::Security`].
///
/// Only compiled when the `security` feature is enabled.
#[cfg(feature = "security")]
impl From<crate::security::SecurityError> for DaemonError {
    fn from(err: crate::security::SecurityError) -> Self {
        DaemonError::Security(err)
    }
}
70
71impl core::fmt::Display for DaemonError {
72 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
73 match self {
74 Self::Encode(e) => write!(f, "encode error: {e}"),
75 Self::Decode(e) => write!(f, "decode error: {e}"),
76 Self::Transport(e) => write!(f, "transport error: {e}"),
77 Self::UnknownDataSet(n) => write!(f, "writer references unknown PublishedDataSet {n}"),
78 #[cfg(feature = "security")]
79 Self::Security(e) => write!(f, "security error: {e}"),
80 }
81 }
82}
83
/// Marks [`DaemonError`] as a standard error type when `std` is available.
///
/// No methods are overridden, so the trait's defaults apply.
#[cfg(feature = "std")]
impl std::error::Error for DaemonError {}
86
87#[derive(Debug, Clone)]
90pub struct Publisher<T: PubSubTransport> {
91 transport: T,
92 group: WriterGroup,
93 writers: Vec<DataSetWriter>,
94 datasets: Vec<PublishedDataSet>,
95 #[cfg_attr(not(feature = "security"), allow(dead_code))]
97 nonce_counter: u64,
98}
99
100impl<T: PubSubTransport> Publisher<T> {
101 #[must_use]
103 pub fn new(transport: T, group: WriterGroup) -> Self {
104 Self {
105 transport,
106 group,
107 writers: Vec::new(),
108 datasets: Vec::new(),
109 nonce_counter: 0,
110 }
111 }
112
113 pub fn add_dataset(&mut self, dataset: PublishedDataSet) -> &mut Self {
115 self.datasets.push(dataset);
116 self
117 }
118
119 pub fn add_writer(&mut self, writer: DataSetWriter) -> &mut Self {
121 self.writers.push(writer);
122 self
123 }
124
125 pub fn dataset_mut(&mut self, name: &str) -> Option<&mut PublishedDataSet> {
127 self.datasets.iter_mut().find(|d| d.name() == name)
128 }
129
130 pub fn transport(&self) -> &T {
132 &self.transport
133 }
134
135 fn frame_cycle(&mut self, timestamp: Option<i64>) -> Result<NetworkMessage, DaemonError> {
139 let mut messages = Vec::with_capacity(self.writers.len());
140 for w in &mut self.writers {
141 let name = w.config().data_set_name.clone();
142 let ds = self
143 .datasets
144 .iter()
145 .find(|d| d.name() == name)
146 .ok_or(DaemonError::UnknownDataSet(name))?;
147 messages.push(w.produce(ds, timestamp)?);
148 }
149 Ok(self.group.frame(messages, timestamp))
150 }
151
152 pub fn publish_cycle(&mut self, timestamp: Option<i64>) -> Result<(), DaemonError> {
159 let nm = self.frame_cycle(timestamp)?;
160 let bytes = to_binary(&nm)?;
161 self.transport.send(&bytes)?;
162 Ok(())
163 }
164
165 #[cfg(feature = "security")]
173 pub fn publish_cycle_secured(
174 &mut self,
175 timestamp: Option<i64>,
176 policy: crate::security::SecurityPolicy,
177 key: &crate::security::SecurityKey,
178 ) -> Result<(), DaemonError> {
179 let nm = self.frame_cycle(timestamp)?;
180 self.nonce_counter = self.nonce_counter.wrapping_add(1);
181 let nonce = self.nonce_counter.to_be_bytes();
182 let bytes = crate::security::protect(&nm, policy, key, &nonce, true)?;
183 self.transport.send(&bytes)?;
184 Ok(())
185 }
186}
187
188#[derive(Debug, Clone)]
190pub struct Subscriber<T: PubSubTransport> {
191 transport: T,
192 reader_group: ReaderGroup,
193}
194
195impl<T: PubSubTransport> Subscriber<T> {
196 #[must_use]
198 pub fn new(transport: T, reader_group: ReaderGroup) -> Self {
199 Self {
200 transport,
201 reader_group,
202 }
203 }
204
205 pub fn reader_group_mut(&mut self) -> &mut ReaderGroup {
207 &mut self.reader_group
208 }
209
210 pub fn transport(&self) -> &T {
212 &self.transport
213 }
214
215 pub fn poll(&self) -> Result<Vec<MatchedDataSet>, DaemonError> {
221 let bytes = match self.transport.receive() {
222 Ok(b) => b,
223 Err(TransportError::Timeout) => return Ok(Vec::new()),
224 Err(e) => return Err(DaemonError::Transport(e)),
225 };
226 let nm: NetworkMessage = from_binary(&bytes)?;
227 Ok(self.reader_group.accept(&nm)?)
228 }
229
230 #[cfg(feature = "security")]
238 pub fn poll_secured(
239 &self,
240 policy: crate::security::SecurityPolicy,
241 sks: &crate::security::SecurityKeyService,
242 ) -> Result<Vec<MatchedDataSet>, DaemonError> {
243 let bytes = match self.transport.receive() {
244 Ok(b) => b,
245 Err(TransportError::Timeout) => return Ok(Vec::new()),
246 Err(e) => return Err(DaemonError::Transport(e)),
247 };
248 let nm = crate::security::unprotect(&bytes, policy, sks)?;
249 Ok(self.reader_group.accept(&nm)?)
250 }
251}
252
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;
    use crate::config::{
        ConfigurationVersion, DataSetMetaData, DataSetReaderConfig, DataSetWriterConfig,
        FieldMetaData, WriterGroupConfig,
    };
    use crate::reader::DataSetReader;
    use crate::transport::LoopbackTransport;
    use crate::uadp::dataset_message::DataSetMessageKind;
    use crate::uadp::network_message::PublisherId;
    use zerodds_opcua_gateway::data_value::{DataValue, Variant, VariantValue};
    use zerodds_opcua_gateway::types::BuiltinTypeKind;

    /// Wraps `v` as a scalar `Int32` value with no status or timestamps.
    fn dv(v: i32) -> DataValue {
        let value = Some(Variant::scalar(VariantValue::Int32(v)));
        DataValue {
            value,
            status: None,
            source_timestamp: None,
            server_timestamp: None,
            source_pico_sec: None,
            server_pico_sec: None,
        }
    }

    /// Metadata for dataset `ds1` with a single scalar `Int32` field `a`.
    fn meta() -> DataSetMetaData {
        let fields = alloc::vec![FieldMetaData::scalar("a", BuiltinTypeKind::Int32)];
        DataSetMetaData::new("ds1", fields)
    }

    /// Publisher on `tx` with dataset `ds1` (field `a` = 0) and writer `w1`.
    fn publisher(tx: LoopbackTransport) -> Publisher<LoopbackTransport> {
        let mut dataset = PublishedDataSet::new("ds1");
        dataset.add_field("a", dv(0));

        let group = WriterGroup::new(WriterGroupConfig::new("g1", 1), PublisherId::UInt16(9));
        let mut pubr = Publisher::new(tx, group);

        let writer = DataSetWriter::new(
            DataSetWriterConfig::new("w1", 5, "ds1"),
            ConfigurationVersion::default(),
        );
        pubr.add_dataset(dataset);
        pubr.add_writer(writer);
        pubr
    }

    /// Subscriber on `tx` with a single reader `r1` built from `meta()`.
    fn subscriber(tx: LoopbackTransport) -> Subscriber<LoopbackTransport> {
        let reader = DataSetReader::new(DataSetReaderConfig::new("r1", meta()));
        let mut readers = ReaderGroup::new();
        readers.add_reader(reader);
        Subscriber::new(tx, readers)
    }

    #[test]
    fn end_to_end_publish_then_poll() {
        let bus = LoopbackTransport::new();
        let mut pubr = publisher(bus.clone());
        let subr = subscriber(bus);

        let dataset = pubr.dataset_mut("ds1").expect("ds");
        dataset.set("a", dv(123));
        pubr.publish_cycle(None).expect("publish");

        let matched = subr.poll().expect("poll");
        assert_eq!(matched.len(), 1);

        let hit = &matched[0];
        assert_eq!(hit.reader_name, "r1");
        assert_eq!(hit.data.writer_id, 5);
        assert_eq!(hit.data.kind, DataSetMessageKind::KeyFrame);
        assert_eq!(
            hit.data.fields[0].value.value,
            Some(Variant::scalar(VariantValue::Int32(123)))
        );
    }

    #[test]
    fn poll_returns_empty_when_idle() {
        let subr = subscriber(LoopbackTransport::new());
        let matched = subr.poll().expect("poll");
        assert!(matched.is_empty());
    }

    #[test]
    fn unknown_dataset_is_reported() {
        let bus = LoopbackTransport::new();
        let group = WriterGroup::new(WriterGroupConfig::new("g1", 1), PublisherId::UInt16(9));
        let mut pubr = Publisher::new(bus, group);

        // The writer points at `ds1`, which is deliberately never registered.
        let writer = DataSetWriter::new(
            DataSetWriterConfig::new("w1", 5, "ds1"),
            ConfigurationVersion::default(),
        );
        pubr.add_writer(writer);

        let outcome = pubr.publish_cycle(None);
        assert_eq!(
            outcome,
            Err(DaemonError::UnknownDataSet(String::from("ds1")))
        );
    }

    #[cfg(feature = "security")]
    #[test]
    fn end_to_end_secured() {
        use crate::security::{SecurityKey, SecurityKeyService, SecurityPolicy};

        let policy = SecurityPolicy::Aes256Ctr;
        let blob = alloc::vec![0x5Au8; policy.key_material_len()];
        let key = SecurityKey::from_blob(policy, 11, &blob).expect("key");
        let sks = SecurityKeyService::new(policy, "g", key.clone());

        let bus = LoopbackTransport::new();
        let mut pubr = publisher(bus.clone());
        let subr = subscriber(bus);

        let dataset = pubr.dataset_mut("ds1").expect("ds");
        dataset.set("a", dv(777));
        pubr.publish_cycle_secured(None, policy, &key)
            .expect("publish");

        // Exactly one frame should be queued on the loopback bus.
        assert_eq!(pubr.transport().pending(), 1);

        let matched = subr.poll_secured(policy, &sks).expect("poll");
        assert_eq!(matched.len(), 1);
        assert_eq!(
            matched[0].data.fields[0].value.value,
            Some(Variant::scalar(VariantValue::Int32(777)))
        );
    }
}