Skip to main content

sparkplug_b/
edge.rs

1//! Edge Node + Device lifecycle (Phase 2).
2//!
3//! [`EdgeNode`] is an async engine, generic over an [`MqttTransport`] and a
4//! [`BdSeqStore`], that implements the Sparkplug Edge Node session: register the
5//! NDEATH as the MQTT will, connect, subscribe to NCMD/DCMD, publish
6//! NBIRTH (`seq = 0`, the `bdSeq` metric, and `Node Control/Rebirth = false`)
7//! then each Device's DBIRTH, publish NDATA/DDATA by exception, handle an
8//! NCMD-driven (debounced) rebirth that reuses the connection's `bdSeq`, and
9//! publish an NDEATH on graceful disconnect.
10//!
11//! The engine spawns no tasks and uses no runtime timers, so it is deterministic
12//! and runtime-agnostic: the caller drives [`EdgeNode::recv_and_handle`] (or
13//! calls [`EdgeNode::handle_incoming`] directly) and the publish methods. The
14//! Phase 4 `rumqttc` transport supplies the real network loop.
15
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bytes::Bytes;
19
20use crate::alias::AliasRegistry;
21use crate::codec::{EncodeOptions, decode, encode};
22use crate::error::{Result, SparkplugError};
23use crate::model::{Metric, Payload};
24use crate::sequence::{BdSeqStore, Seq};
25use crate::state::StatePayload;
26use crate::topic::{DeviceId, EdgeNodeId, GroupId, MessageType, SparkplugTopic};
27use crate::transport::{
28    ConnectOptions, IncomingMessage, MqttTransport, OutboundMessage, Qos, TlsConfig,
29};
30use crate::value::MetricValue;
31use crate::{BDSEQ_METRIC_NAME, NODE_CONTROL_REBIRTH};
32
/// Current wall-clock time in milliseconds since the Unix epoch (UTC), used
/// for payload and metric timestamps.
///
/// Saturates at `u64::MAX` if the millisecond count does not fit in a `u64`,
/// and yields `0` if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
40
41/// Stamp every metric lacking a timestamp with `ts`. The spec requires a
42/// per-metric timestamp on every NBIRTH/DBIRTH/NDATA/DDATA metric
43/// (`tck-id-payloads-name-birth-data-requirement`), and some hosts (e.g. srad)
44/// reject an NBIRTH whose metrics are unstamped.
45fn stamp(metrics: &mut [Metric], ts: u64) {
46    for metric in metrics.iter_mut() {
47        if metric.timestamp.is_none() {
48            metric.timestamp = Some(ts);
49        }
50    }
51}
52
/// Where an [`EdgeNode`] is in its Sparkplug session lifecycle.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum EdgeState {
    /// No MQTT session is established.
    Disconnected,
    /// Connected, but births are held back until the configured primary host
    /// publishes an online STATE.
    WaitingForPrimaryHost,
    /// The birth sequence has been published; data is reported by exception.
    Online,
}
63
64/// Configuration for an [`EdgeNode`].
65#[derive(Clone, Debug)]
66pub struct EdgeNodeConfig {
67    /// Group ID.
68    pub group: GroupId,
69    /// Edge Node ID.
70    pub edge: EdgeNodeId,
71    /// Devices reported by this Edge Node.
72    pub devices: Vec<DeviceId>,
73    /// Optional primary Host Application ID to gate online/offline on.
74    pub primary_host_id: Option<String>,
75    /// Assign and use metric aliases on DATA messages.
76    pub use_aliases: bool,
77    /// Debounce window collapsing a storm of rebirth requests into one.
78    pub rebirth_debounce: Duration,
79    /// MQTT client id.
80    pub client_id: String,
81    /// Broker host.
82    pub host: String,
83    /// Broker port.
84    pub port: u16,
85    /// Keep-alive interval, seconds.
86    pub keep_alive_secs: u16,
87    /// Optional TLS/mTLS configuration (honored with the `tls` feature).
88    pub tls: Option<TlsConfig>,
89}
90
91impl EdgeNodeConfig {
92    /// Build a config from string identifiers, validating the Group/Edge/Device IDs.
93    ///
94    /// # Errors
95    /// Returns [`SparkplugError::InvalidId`] if any identifier is empty or
96    /// contains a reserved character.
97    pub fn new(group: &str, edge: &str, devices: &[&str]) -> Result<Self> {
98        Ok(Self {
99            group: GroupId::new(group)?,
100            edge: EdgeNodeId::new(edge)?,
101            devices: devices
102                .iter()
103                .map(|d| DeviceId::new(*d))
104                .collect::<Result<_>>()?,
105            primary_host_id: None,
106            use_aliases: false,
107            rebirth_debounce: Duration::from_secs(5),
108            client_id: format!("{group}-{edge}"),
109            host: "localhost".to_owned(),
110            port: 1883,
111            keep_alive_secs: 30,
112            tls: None,
113        })
114    }
115}
116
117/// What the application supplies to the engine: the metrics to publish in births.
118pub trait DataSource {
119    /// The Edge Node's metrics for an NBIRTH (every metric it will ever report).
120    fn node_birth_metrics(&self) -> Vec<Metric>;
121
122    /// A Device's metrics for a DBIRTH.
123    fn device_birth_metrics(&self, device: &str) -> Vec<Metric>;
124}
125
126/// An event surfaced by [`EdgeNode::handle_incoming`].
127#[derive(Clone, Debug)]
128pub enum EdgeEvent {
129    /// A rebirth was requested and the birth sequence was re-published.
130    Rebirthed,
131    /// A rebirth was requested but suppressed by the debounce window.
132    RebirthDebounced,
133    /// An Edge Node command (NCMD) other than a rebirth.
134    NodeCommand(Payload),
135    /// A Device command (DCMD).
136    DeviceCommand {
137        /// Target device id.
138        device: String,
139        /// The command payload.
140        payload: Payload,
141    },
142    /// A primary-host STATE update.
143    PrimaryHostState(StatePayload),
144    /// A message that did not require engine action.
145    Ignored,
146}
147
148/// The Edge Node engine.
149pub struct EdgeNode<T, S> {
150    config: EdgeNodeConfig,
151    transport: T,
152    bdseq_store: S,
153    seq: Seq,
154    bd_seq: u8,
155    aliases: AliasRegistry,
156    next_alias: u64,
157    born_devices: Vec<String>,
158    last_rebirth: Option<Instant>,
159    last_state_ts: Option<i64>,
160    state: EdgeState,
161}
162
163impl<T: MqttTransport, S: BdSeqStore> EdgeNode<T, S> {
164    /// Create an engine. Call [`EdgeNode::connect`] to start the session.
165    pub fn new(config: EdgeNodeConfig, transport: T, bdseq_store: S) -> Self {
166        Self {
167            config,
168            transport,
169            bdseq_store,
170            seq: Seq::new(),
171            bd_seq: 0,
172            aliases: AliasRegistry::new(),
173            next_alias: 0,
174            born_devices: Vec::new(),
175            last_rebirth: None,
176            last_state_ts: None,
177            state: EdgeState::Disconnected,
178        }
179    }
180
181    /// The current lifecycle state.
182    #[must_use]
183    pub fn state(&self) -> EdgeState {
184        self.state
185    }
186
187    /// The `bdSeq` value bound to the current connection.
188    #[must_use]
189    pub fn bd_seq(&self) -> u8 {
190        self.bd_seq
191    }
192
193    fn node_topic(&self, ty: MessageType) -> String {
194        SparkplugTopic::Node {
195            group: self.config.group.clone(),
196            edge: self.config.edge.clone(),
197            ty,
198        }
199        .to_string()
200    }
201
202    fn device_topic(&self, device: &DeviceId, ty: MessageType) -> String {
203        SparkplugTopic::Device {
204            group: self.config.group.clone(),
205            edge: self.config.edge.clone(),
206            device: device.clone(),
207            ty,
208        }
209        .to_string()
210    }
211
212    /// The NDEATH payload (a single `bdSeq` INT64 metric).
213    fn ndeath_payload(&self) -> Payload {
214        let ts = now_ms();
215        let mut metrics = vec![Metric::new(
216            BDSEQ_METRIC_NAME,
217            MetricValue::Int64(i64::from(self.bd_seq)),
218        )];
219        stamp(&mut metrics, ts);
220        Payload {
221            timestamp: Some(ts),
222            metrics,
223            seq: None, // NDEATH carries no sequence number
224            uuid: None,
225            body: None,
226        }
227    }
228
229    async fn publish_raw(
230        &mut self,
231        topic: String,
232        payload: Bytes,
233        qos: Qos,
234        retain: bool,
235    ) -> Result<()> {
236        self.transport
237            .publish(&OutboundMessage {
238                topic,
239                qos,
240                retain,
241                payload,
242            })
243            .await
244    }
245
246    /// Connect to the broker (registering the NDEATH will), subscribe to NCMD /
247    /// per-device DCMD (and the primary host's STATE if configured), then publish
248    /// the birth sequence unless gated on a primary host.
249    ///
250    /// # Errors
251    /// Propagates any transport error.
252    pub async fn connect<D: DataSource>(&mut self, source: &D) -> Result<()> {
253        // The bdSeq for THIS connection, then persist the next one.
254        self.bd_seq = self.bdseq_store.load_next_death()?;
255        self.bdseq_store
256            .store_next_death(self.bd_seq.wrapping_add(1))?;
257
258        let will_payload = encode(&self.ndeath_payload(), EncodeOptions::birth());
259        let will = OutboundMessage {
260            topic: self.node_topic(MessageType::NDeath),
261            qos: Qos::AtLeastOnce, // will QoS MUST be 1
262            retain: false,         // will retain MUST be false
263            payload: will_payload,
264        };
265        let opts = ConnectOptions {
266            client_id: self.config.client_id.clone(),
267            host: self.config.host.clone(),
268            port: self.config.port,
269            keep_alive_secs: self.config.keep_alive_secs,
270            clean_start: true,
271            will: Some(will),
272            tls: self.config.tls.clone(),
273        };
274        self.transport.connect(&opts).await?;
275
276        // NCMD/DCMD subscriptions MUST be QoS 1 so rebirth/command requests are
277        // not silently dropped (tck-id-message-flow-edge-node-ncmd-subscribe,
278        // tck-id-message-flow-device-dcmd-subscribe).
279        let ncmd_topic = self.node_topic(MessageType::NCmd);
280        self.transport
281            .subscribe(&ncmd_topic, Qos::AtLeastOnce)
282            .await?;
283        let devices = self.config.devices.clone();
284        for device in &devices {
285            let dcmd_topic = self.device_topic(device, MessageType::DCmd);
286            self.transport
287                .subscribe(&dcmd_topic, Qos::AtLeastOnce)
288                .await?;
289        }
290
291        if let Some(host_id) = self.config.primary_host_id.clone() {
292            let state_topic = format!("spBv1.0/STATE/{host_id}");
293            self.transport
294                .subscribe(&state_topic, Qos::AtLeastOnce)
295                .await?;
296            self.state = EdgeState::WaitingForPrimaryHost;
297        } else {
298            self.publish_birth_sequence(source).await?;
299        }
300        Ok(())
301    }
302
303    /// Publish NBIRTH (`seq = 0`) followed by each Device's DBIRTH.
304    ///
305    /// # Errors
306    /// Propagates any transport error.
307    pub async fn publish_birth_sequence<D: DataSource>(&mut self, source: &D) -> Result<()> {
308        self.seq.reset();
309        if self.config.use_aliases {
310            self.aliases.clear();
311            self.next_alias = 0;
312        }
313        self.born_devices.clear();
314
315        let ts = now_ms();
316        let mut metrics = self.build_node_birth_metrics(source);
317        stamp(&mut metrics, ts);
318        let payload = Payload {
319            timestamp: Some(ts),
320            metrics,
321            seq: Some(self.seq.next_value()), // NBIRTH carries seq = 0
322            uuid: None,
323            body: None,
324        };
325        let bytes = encode(&payload, EncodeOptions::birth());
326        self.publish_raw(
327            self.node_topic(MessageType::NBirth),
328            bytes,
329            Qos::AtMostOnce,
330            false,
331        )
332        .await?;
333
334        let devices = self.config.devices.clone();
335        for device in &devices {
336            self.publish_device_birth(source, device).await?;
337        }
338        self.state = EdgeState::Online;
339        Ok(())
340    }
341
342    fn build_node_birth_metrics<D: DataSource>(&mut self, source: &D) -> Vec<Metric> {
343        let mut metrics = source.node_birth_metrics();
344        if !metrics
345            .iter()
346            .any(|m| m.name.as_deref() == Some(NODE_CONTROL_REBIRTH))
347        {
348            metrics.push(Metric::new(
349                NODE_CONTROL_REBIRTH,
350                MetricValue::Boolean(false),
351            ));
352        }
353        if !metrics
354            .iter()
355            .any(|m| m.name.as_deref() == Some(BDSEQ_METRIC_NAME))
356        {
357            metrics.push(Metric::new(
358                BDSEQ_METRIC_NAME,
359                MetricValue::Int64(i64::from(self.bd_seq)),
360            ));
361        }
362        self.assign_aliases(&mut metrics);
363        metrics
364    }
365
366    async fn publish_device_birth<D: DataSource>(
367        &mut self,
368        source: &D,
369        device: &DeviceId,
370    ) -> Result<()> {
371        let ts = now_ms();
372        let mut metrics = source.device_birth_metrics(device.as_str());
373        self.assign_aliases(&mut metrics);
374        stamp(&mut metrics, ts);
375        let payload = Payload {
376            timestamp: Some(ts),
377            metrics,
378            seq: Some(self.seq.next_value()),
379            uuid: None,
380            body: None,
381        };
382        let bytes = encode(&payload, EncodeOptions::birth());
383        self.publish_raw(
384            self.device_topic(device, MessageType::DBirth),
385            bytes,
386            Qos::AtMostOnce,
387            false,
388        )
389        .await?;
390        self.born_devices.push(device.as_str().to_owned());
391        Ok(())
392    }
393
394    /// Assign aliases to data metrics (never to the control metrics), recording
395    /// the name↔alias↔datatype binding for later DATA resolution.
396    fn assign_aliases(&mut self, metrics: &mut [Metric]) {
397        if !self.config.use_aliases {
398            return;
399        }
400        for metric in metrics.iter_mut() {
401            let Some(name) = metric.name.clone() else {
402                continue;
403            };
404            // The rebirth and bdSeq control metrics MUST NOT be aliased.
405            if name == NODE_CONTROL_REBIRTH || name == BDSEQ_METRIC_NAME {
406                continue;
407            }
408            let alias = self.next_alias;
409            self.next_alias += 1;
410            metric.alias = Some(alias);
411            self.aliases
412                .bind(&name, Some(alias), metric.value.datatype());
413        }
414    }
415
416    /// Map data metrics to their on-wire form: alias-only when aliasing is on
417    /// and a binding exists, otherwise by name.
418    fn to_data_metrics(&self, metrics: Vec<Metric>) -> Vec<Metric> {
419        if !self.config.use_aliases {
420            return metrics;
421        }
422        metrics
423            .into_iter()
424            .map(|mut m| {
425                if let Some(name) = m.name.as_deref()
426                    && let Some(alias) = self.aliases.alias_for_name(name)
427                {
428                    m.alias = Some(alias);
429                    m.name = None;
430                }
431                m
432            })
433            .collect()
434    }
435
436    /// Publish an NDATA (report-by-exception) with the next sequence number.
437    ///
438    /// # Errors
439    /// Returns [`SparkplugError::InvalidTopic`] if not online, or a transport error.
440    pub async fn publish_node_data(&mut self, metrics: Vec<Metric>) -> Result<()> {
441        if self.state != EdgeState::Online {
442            return Err(SparkplugError::InvalidTopic(
443                "cannot publish NDATA before NBIRTH".to_owned(),
444            ));
445        }
446        let ts = now_ms();
447        let mut data = self.to_data_metrics(metrics);
448        stamp(&mut data, ts);
449        let payload = Payload {
450            timestamp: Some(ts),
451            metrics: data,
452            seq: Some(self.seq.next_value()),
453            uuid: None,
454            body: None,
455        };
456        let bytes = encode(&payload, EncodeOptions::data());
457        self.publish_raw(
458            self.node_topic(MessageType::NData),
459            bytes,
460            Qos::AtMostOnce,
461            false,
462        )
463        .await
464    }
465
466    /// Publish a DDATA for a born device.
467    ///
468    /// # Errors
469    /// Returns an error if the device is not born or on transport failure.
470    pub async fn publish_device_data(&mut self, device: &str, metrics: Vec<Metric>) -> Result<()> {
471        if !self.born_devices.iter().any(|d| d == device) {
472            return Err(SparkplugError::InvalidTopic(format!(
473                "device {device:?} is not born; cannot publish DDATA"
474            )));
475        }
476        let device_id = DeviceId::new(device)?;
477        let ts = now_ms();
478        let mut data = self.to_data_metrics(metrics);
479        stamp(&mut data, ts);
480        let payload = Payload {
481            timestamp: Some(ts),
482            metrics: data,
483            seq: Some(self.seq.next_value()),
484            uuid: None,
485            body: None,
486        };
487        let bytes = encode(&payload, EncodeOptions::data());
488        self.publish_raw(
489            self.device_topic(&device_id, MessageType::DData),
490            bytes,
491            Qos::AtMostOnce,
492            false,
493        )
494        .await
495    }
496
497    /// Publish a DDEATH for a born device and mark it offline. DDEATH carries a
498    /// sequence number (`tck-id-payloads-ddeath-seq`) at QoS 0, retain=false.
499    ///
500    /// # Errors
501    /// Returns an error if the device is not born, or on transport failure.
502    pub async fn publish_device_death(&mut self, device: &str) -> Result<()> {
503        let pos = self
504            .born_devices
505            .iter()
506            .position(|d| d == device)
507            .ok_or_else(|| {
508                SparkplugError::InvalidTopic(format!(
509                    "device {device:?} is not born; cannot publish DDEATH"
510                ))
511            })?;
512        let device_id = DeviceId::new(device)?;
513        let payload = Payload {
514            timestamp: Some(now_ms()),
515            metrics: Vec::new(),
516            seq: Some(self.seq.next_value()),
517            uuid: None,
518            body: None,
519        };
520        let bytes = encode(&payload, EncodeOptions::birth());
521        self.publish_raw(
522            self.device_topic(&device_id, MessageType::DDeath),
523            bytes,
524            Qos::AtMostOnce,
525            false,
526        )
527        .await?;
528        self.born_devices.remove(pos);
529        Ok(())
530    }
531
532    fn rebirth_allowed(&self) -> bool {
533        match self.last_rebirth {
534            None => true,
535            Some(at) => at.elapsed() >= self.config.rebirth_debounce,
536        }
537    }
538
539    /// Handle one inbound message, driving rebirth / primary-host transitions.
540    ///
541    /// A `Node Control/Rebirth = true` NCMD re-publishes the birth sequence
542    /// (resetting `seq` to 0 and **reusing** the connection's `bdSeq`), subject to
543    /// the rebirth debounce.
544    ///
545    /// # Errors
546    /// Returns a decode/transport error; a malformed topic/payload is surfaced
547    /// rather than panicking.
548    pub async fn handle_incoming<D: DataSource>(
549        &mut self,
550        message: &IncomingMessage,
551        source: &D,
552    ) -> Result<EdgeEvent> {
553        let topic = SparkplugTopic::parse(&message.topic)?;
554        match topic {
555            SparkplugTopic::Node {
556                ty: MessageType::NCmd,
557                ..
558            } => {
559                let payload = decode(&message.payload, Some(&self.aliases))?;
560                let is_rebirth = payload.metrics.iter().any(|m| {
561                    m.name.as_deref() == Some(NODE_CONTROL_REBIRTH)
562                        && matches!(m.value, MetricValue::Boolean(true))
563                });
564                if is_rebirth {
565                    if self.rebirth_allowed() {
566                        self.publish_birth_sequence(source).await?;
567                        self.last_rebirth = Some(Instant::now());
568                        return Ok(EdgeEvent::Rebirthed);
569                    }
570                    return Ok(EdgeEvent::RebirthDebounced);
571                }
572                Ok(EdgeEvent::NodeCommand(payload))
573            }
574            SparkplugTopic::Device {
575                device,
576                ty: MessageType::DCmd,
577                ..
578            } => {
579                let payload = decode(&message.payload, Some(&self.aliases))?;
580                Ok(EdgeEvent::DeviceCommand {
581                    device: device.as_str().to_owned(),
582                    payload,
583                })
584            }
585            SparkplugTopic::HostState { host_id } => {
586                if self.config.primary_host_id.as_deref() != Some(host_id.as_str()) {
587                    return Ok(EdgeEvent::Ignored);
588                }
589                let state = StatePayload::parse(
590                    std::str::from_utf8(&message.payload)
591                        .map_err(|_| SparkplugError::InvalidUtf8)?,
592                )?;
593                // Ignore a STATE older than the last accepted one (the edge
594                // drops stale/out-of-order primary-host STATE messages).
595                if self
596                    .last_state_ts
597                    .is_some_and(|last| state.timestamp < last)
598                {
599                    return Ok(EdgeEvent::Ignored);
600                }
601                self.last_state_ts = Some(state.timestamp);
602                if state.online {
603                    if self.state == EdgeState::WaitingForPrimaryHost {
604                        self.publish_birth_sequence(source).await?;
605                    }
606                } else if self.state == EdgeState::Online {
607                    // Primary host went offline: terminate this session (a
608                    // graceful disconnect publishes NDEATH first). Re-homing to
609                    // another MQTT server is the Phase 4 multi-server concern.
610                    self.disconnect().await?;
611                }
612                Ok(EdgeEvent::PrimaryHostState(state))
613            }
614            _ => Ok(EdgeEvent::Ignored),
615        }
616    }
617
618    /// Receive the next inbound message and handle it; `None` if the stream closed.
619    ///
620    /// # Errors
621    /// Propagates transport/decode errors.
622    pub async fn recv_and_handle<D: DataSource>(
623        &mut self,
624        source: &D,
625    ) -> Result<Option<EdgeEvent>> {
626        match self.transport.recv().await? {
627            Some(message) => Ok(Some(self.handle_incoming(&message, source).await?)),
628            None => Ok(None),
629        }
630    }
631
632    /// Gracefully disconnect: publish the NDEATH explicitly (so the broker need
633    /// not deliver the will), then disconnect.
634    ///
635    /// # Errors
636    /// Propagates any transport error.
637    pub async fn disconnect(&mut self) -> Result<()> {
638        let bytes = encode(&self.ndeath_payload(), EncodeOptions::birth());
639        self.publish_raw(
640            self.node_topic(MessageType::NDeath),
641            bytes,
642            Qos::AtLeastOnce,
643            false,
644        )
645        .await?;
646        self.transport.disconnect().await?;
647        self.state = EdgeState::Disconnected;
648        Ok(())
649    }
650}