Skip to main content

zerodds_opcua_pubsub/
daemon.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! The PubSub runtime — drives writer and reader groups over a
4//! [`PubSubTransport`] (OPC Foundation Part 14 §6.2.6 PubSubConnection
5//! behaviour).
6//!
7//! A [`Publisher`] owns a [`WriterGroup`], its [`DataSetWriter`]s and the
8//! [`PublishedDataSet`]s they read from; each [`publish_cycle`] produces a
9//! DataSetMessage per writer, frames them into one NetworkMessage and sends it.
10//! A [`Subscriber`] owns a [`ReaderGroup`] and on each [`poll`] receives one
11//! datagram, decodes it and dispatches it to the matching readers.
12//!
13//! With the `security` feature, `Publisher::publish_cycle_secured` and
14//! `Subscriber::poll_secured` add UADP message security (`crate::security`)
15//! around the same flow.
16//!
17//! [`publish_cycle`]: Publisher::publish_cycle
18//! [`poll`]: Subscriber::poll
19
20use alloc::string::String;
21use alloc::vec::Vec;
22
23use crate::binary::{from_binary, to_binary};
24use crate::error::{DecodeError, EncodeError};
25use crate::reader::{MatchedDataSet, ReaderGroup};
26use crate::transport::{PubSubTransport, TransportError};
27use crate::uadp::network_message::NetworkMessage;
28use crate::writer::{DataSetWriter, PublishedDataSet, WriterGroup};
29
30/// An error from the PubSub runtime.
31#[derive(Debug, Clone, PartialEq)]
32pub enum DaemonError {
33    /// A NetworkMessage could not be encoded.
34    Encode(EncodeError),
35    /// A received datagram could not be decoded.
36    Decode(DecodeError),
37    /// The transport carrier failed.
38    Transport(TransportError),
39    /// A writer references a PublishedDataSet the Publisher does not hold.
40    UnknownDataSet(String),
41    /// A security operation failed (feature `security`).
42    #[cfg(feature = "security")]
43    Security(crate::security::SecurityError),
44}
45
46impl From<EncodeError> for DaemonError {
47    fn from(e: EncodeError) -> Self {
48        Self::Encode(e)
49    }
50}
51
52impl From<DecodeError> for DaemonError {
53    fn from(e: DecodeError) -> Self {
54        Self::Decode(e)
55    }
56}
57
58impl From<TransportError> for DaemonError {
59    fn from(e: TransportError) -> Self {
60        Self::Transport(e)
61    }
62}
63
64#[cfg(feature = "security")]
65impl From<crate::security::SecurityError> for DaemonError {
66    fn from(e: crate::security::SecurityError) -> Self {
67        Self::Security(e)
68    }
69}
70
71impl core::fmt::Display for DaemonError {
72    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
73        match self {
74            Self::Encode(e) => write!(f, "encode error: {e}"),
75            Self::Decode(e) => write!(f, "decode error: {e}"),
76            Self::Transport(e) => write!(f, "transport error: {e}"),
77            Self::UnknownDataSet(n) => write!(f, "writer references unknown PublishedDataSet {n}"),
78            #[cfg(feature = "security")]
79            Self::Security(e) => write!(f, "security error: {e}"),
80        }
81    }
82}
83
84#[cfg(feature = "std")]
85impl std::error::Error for DaemonError {}
86
87/// The publishing runtime: a [`WriterGroup`] plus its writers and the
88/// PublishedDataSets they read from, bound to a transport.
89#[derive(Debug, Clone)]
90pub struct Publisher<T: PubSubTransport> {
91    transport: T,
92    group: WriterGroup,
93    writers: Vec<DataSetWriter>,
94    datasets: Vec<PublishedDataSet>,
95    // Monotonic per-message nonce source; only read on the secured path.
96    #[cfg_attr(not(feature = "security"), allow(dead_code))]
97    nonce_counter: u64,
98}
99
100impl<T: PubSubTransport> Publisher<T> {
101    /// Creates a Publisher for `group` over `transport`.
102    #[must_use]
103    pub fn new(transport: T, group: WriterGroup) -> Self {
104        Self {
105            transport,
106            group,
107            writers: Vec::new(),
108            datasets: Vec::new(),
109            nonce_counter: 0,
110        }
111    }
112
113    /// Registers a PublishedDataSet (referenced by writers via its name).
114    pub fn add_dataset(&mut self, dataset: PublishedDataSet) -> &mut Self {
115        self.datasets.push(dataset);
116        self
117    }
118
119    /// Registers a DataSetWriter.
120    pub fn add_writer(&mut self, writer: DataSetWriter) -> &mut Self {
121        self.writers.push(writer);
122        self
123    }
124
125    /// Mutable access to a registered PublishedDataSet by name.
126    pub fn dataset_mut(&mut self, name: &str) -> Option<&mut PublishedDataSet> {
127        self.datasets.iter_mut().find(|d| d.name() == name)
128    }
129
130    /// The transport carrier.
131    pub fn transport(&self) -> &T {
132        &self.transport
133    }
134
135    /// Produces one DataSetMessage per writer and frames them into a single
136    /// NetworkMessage. `timestamp` (DateTime ticks) feeds the message/group
137    /// Timestamp content-mask bits.
138    fn frame_cycle(&mut self, timestamp: Option<i64>) -> Result<NetworkMessage, DaemonError> {
139        let mut messages = Vec::with_capacity(self.writers.len());
140        for w in &mut self.writers {
141            let name = w.config().data_set_name.clone();
142            let ds = self
143                .datasets
144                .iter()
145                .find(|d| d.name() == name)
146                .ok_or(DaemonError::UnknownDataSet(name))?;
147            messages.push(w.produce(ds, timestamp)?);
148        }
149        Ok(self.group.frame(messages, timestamp))
150    }
151
152    /// Runs one publish cycle: produce, frame, serialise and send a plaintext
153    /// NetworkMessage.
154    ///
155    /// # Errors
156    /// [`DaemonError`] if a dataset is missing, encoding fails or the send
157    /// fails.
158    pub fn publish_cycle(&mut self, timestamp: Option<i64>) -> Result<(), DaemonError> {
159        let nm = self.frame_cycle(timestamp)?;
160        let bytes = to_binary(&nm)?;
161        self.transport.send(&bytes)?;
162        Ok(())
163    }
164
165    /// Runs one publish cycle with UADP message security: produce, frame,
166    /// then [`protect`](crate::security::protect) (always signed, encrypted)
167    /// with a per-cycle monotonic nonce, and send.
168    ///
169    /// # Errors
170    /// [`DaemonError`] on a missing dataset, encode/crypto failure or send
171    /// failure.
172    #[cfg(feature = "security")]
173    pub fn publish_cycle_secured(
174        &mut self,
175        timestamp: Option<i64>,
176        policy: crate::security::SecurityPolicy,
177        key: &crate::security::SecurityKey,
178    ) -> Result<(), DaemonError> {
179        let nm = self.frame_cycle(timestamp)?;
180        self.nonce_counter = self.nonce_counter.wrapping_add(1);
181        let nonce = self.nonce_counter.to_be_bytes();
182        let bytes = crate::security::protect(&nm, policy, key, &nonce, true)?;
183        self.transport.send(&bytes)?;
184        Ok(())
185    }
186}
187
188/// The subscribing runtime: a [`ReaderGroup`] bound to a transport.
189#[derive(Debug, Clone)]
190pub struct Subscriber<T: PubSubTransport> {
191    transport: T,
192    reader_group: ReaderGroup,
193}
194
195impl<T: PubSubTransport> Subscriber<T> {
196    /// Creates a Subscriber for `reader_group` over `transport`.
197    #[must_use]
198    pub fn new(transport: T, reader_group: ReaderGroup) -> Self {
199        Self {
200            transport,
201            reader_group,
202        }
203    }
204
205    /// Mutable access to the reader group (to add readers).
206    pub fn reader_group_mut(&mut self) -> &mut ReaderGroup {
207        &mut self.reader_group
208    }
209
210    /// The transport carrier.
211    pub fn transport(&self) -> &T {
212        &self.transport
213    }
214
215    /// Receives and dispatches one plaintext datagram. Returns an empty vector
216    /// when the transport has nothing pending ([`TransportError::Timeout`]).
217    ///
218    /// # Errors
219    /// [`DaemonError`] on a non-timeout transport failure or a decode failure.
220    pub fn poll(&self) -> Result<Vec<MatchedDataSet>, DaemonError> {
221        let bytes = match self.transport.receive() {
222            Ok(b) => b,
223            Err(TransportError::Timeout) => return Ok(Vec::new()),
224            Err(e) => return Err(DaemonError::Transport(e)),
225        };
226        let nm: NetworkMessage = from_binary(&bytes)?;
227        Ok(self.reader_group.accept(&nm)?)
228    }
229
230    /// Receives and dispatches one secured datagram, verifying and decrypting
231    /// it via [`unprotect`](crate::security::unprotect). Returns an empty
232    /// vector on a transport timeout.
233    ///
234    /// # Errors
235    /// [`DaemonError`] on a non-timeout transport failure, a security failure
236    /// or a decode failure.
237    #[cfg(feature = "security")]
238    pub fn poll_secured(
239        &self,
240        policy: crate::security::SecurityPolicy,
241        sks: &crate::security::SecurityKeyService,
242    ) -> Result<Vec<MatchedDataSet>, DaemonError> {
243        let bytes = match self.transport.receive() {
244            Ok(b) => b,
245            Err(TransportError::Timeout) => return Ok(Vec::new()),
246            Err(e) => return Err(DaemonError::Transport(e)),
247        };
248        let nm = crate::security::unprotect(&bytes, policy, sks)?;
249        Ok(self.reader_group.accept(&nm)?)
250    }
251}
252
253#[cfg(all(test, feature = "std"))]
254mod tests {
255    use super::*;
256    use crate::config::{
257        ConfigurationVersion, DataSetMetaData, DataSetReaderConfig, DataSetWriterConfig,
258        FieldMetaData, WriterGroupConfig,
259    };
260    use crate::reader::DataSetReader;
261    use crate::transport::LoopbackTransport;
262    use crate::uadp::dataset_message::DataSetMessageKind;
263    use crate::uadp::network_message::PublisherId;
264    use zerodds_opcua_gateway::data_value::{DataValue, Variant, VariantValue};
265    use zerodds_opcua_gateway::types::BuiltinTypeKind;
266
267    fn dv(v: i32) -> DataValue {
268        DataValue {
269            value: Some(Variant::scalar(VariantValue::Int32(v))),
270            status: None,
271            source_timestamp: None,
272            server_timestamp: None,
273            source_pico_sec: None,
274            server_pico_sec: None,
275        }
276    }
277
278    fn meta() -> DataSetMetaData {
279        DataSetMetaData::new(
280            "ds1",
281            alloc::vec![FieldMetaData::scalar("a", BuiltinTypeKind::Int32)],
282        )
283    }
284
285    fn publisher(tx: LoopbackTransport) -> Publisher<LoopbackTransport> {
286        let mut pds = PublishedDataSet::new("ds1");
287        pds.add_field("a", dv(0));
288        let mut pubr = Publisher::new(
289            tx,
290            WriterGroup::new(WriterGroupConfig::new("g1", 1), PublisherId::UInt16(9)),
291        );
292        pubr.add_dataset(pds).add_writer(DataSetWriter::new(
293            DataSetWriterConfig::new("w1", 5, "ds1"),
294            ConfigurationVersion::default(),
295        ));
296        pubr
297    }
298
299    fn subscriber(tx: LoopbackTransport) -> Subscriber<LoopbackTransport> {
300        let mut rg = ReaderGroup::new();
301        rg.add_reader(DataSetReader::new(DataSetReaderConfig::new("r1", meta())));
302        Subscriber::new(tx, rg)
303    }
304
305    #[test]
306    fn end_to_end_publish_then_poll() {
307        let bus = LoopbackTransport::new();
308        let mut pubr = publisher(bus.clone());
309        let subr = subscriber(bus);
310
311        pubr.dataset_mut("ds1").expect("ds").set("a", dv(123));
312        pubr.publish_cycle(None).expect("publish");
313
314        let matched = subr.poll().expect("poll");
315        assert_eq!(matched.len(), 1);
316        assert_eq!(matched[0].reader_name, "r1");
317        assert_eq!(matched[0].data.writer_id, 5);
318        assert_eq!(matched[0].data.kind, DataSetMessageKind::KeyFrame);
319        assert_eq!(
320            matched[0].data.fields[0].value.value,
321            Some(Variant::scalar(VariantValue::Int32(123)))
322        );
323    }
324
325    #[test]
326    fn poll_returns_empty_when_idle() {
327        let bus = LoopbackTransport::new();
328        let subr = subscriber(bus);
329        assert!(subr.poll().expect("poll").is_empty());
330    }
331
332    #[test]
333    fn unknown_dataset_is_reported() {
334        let bus = LoopbackTransport::new();
335        let mut pubr = Publisher::new(
336            bus,
337            WriterGroup::new(WriterGroupConfig::new("g1", 1), PublisherId::UInt16(9)),
338        );
339        // Writer references "ds1" but no dataset was added.
340        pubr.add_writer(DataSetWriter::new(
341            DataSetWriterConfig::new("w1", 5, "ds1"),
342            ConfigurationVersion::default(),
343        ));
344        assert_eq!(
345            pubr.publish_cycle(None),
346            Err(DaemonError::UnknownDataSet(String::from("ds1")))
347        );
348    }
349
350    #[cfg(feature = "security")]
351    #[test]
352    fn end_to_end_secured() {
353        use crate::security::{SecurityKey, SecurityKeyService, SecurityPolicy};
354
355        let policy = SecurityPolicy::Aes256Ctr;
356        let blob = alloc::vec![0x5Au8; policy.key_material_len()];
357        let key = SecurityKey::from_blob(policy, 11, &blob).expect("key");
358        let sks = SecurityKeyService::new(policy, "g", key.clone());
359
360        let bus = LoopbackTransport::new();
361        let mut pubr = publisher(bus.clone());
362        let subr = subscriber(bus);
363
364        pubr.dataset_mut("ds1").expect("ds").set("a", dv(777));
365        pubr.publish_cycle_secured(None, policy, &key)
366            .expect("publish");
367
368        // The plaintext value must not be observable on the wire.
369        assert_eq!(pubr.transport().pending(), 1);
370
371        let matched = subr.poll_secured(policy, &sks).expect("poll");
372        assert_eq!(matched.len(), 1);
373        assert_eq!(
374            matched[0].data.fields[0].value.value,
375            Some(Variant::scalar(VariantValue::Int32(777)))
376        );
377    }
378}