Skip to main content

sparkplug_b/
topic.rs

//! Topic namespace, message types, and validated identifiers (spec §4).
//!
//! A Sparkplug B topic has the form
//! `namespace/group_id/message_type/edge_node_id/[device_id]`
//! (`tck-id-topic-structure`), where the namespace is the constant `spBv1.0`.
//! We model a topic as an enum (ADR-3) so that the token-count ↔ message-type
//! invariant is enforced by the type system, and we expose a single canonical
//! parser (Eclipse Tahu ships two divergent ones).
8
9use std::fmt;
10use std::str::FromStr;
11
12use crate::error::{Result, SparkplugError};
13
/// The Sparkplug B namespace token: the first `/`-separated element of every
/// topic this module parses or formats.
pub const NAMESPACE: &str = "spBv1.0";
16
/// The STATE topic token (`spBv1.0/STATE/<host_id>`): the element that follows
/// the namespace in a Host Application STATE topic.
pub const STATE_TOKEN: &str = "STATE";
19
/// The `message_type` token of a Sparkplug topic.
///
/// `N*` variants address an Edge Node, `D*` variants address a Device, and
/// [`MessageType::State`] belongs to a Host Application.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MessageType {
    /// `NBIRTH` — Edge Node birth certificate.
    NBirth,
    /// `NDEATH` — Edge Node death certificate.
    NDeath,
    /// `NDATA` — Edge Node data.
    NData,
    /// `NCMD` — Edge Node command.
    NCmd,
    /// `DBIRTH` — Device birth certificate.
    DBirth,
    /// `DDEATH` — Device death certificate.
    DDeath,
    /// `DDATA` — Device data.
    DData,
    /// `DCMD` — Device command.
    DCmd,
    /// `STATE` — Host Application state.
    State,
}
42
43impl MessageType {
44    /// The wire token (`"NBIRTH"`, `"DDATA"`, …).
45    #[must_use]
46    pub const fn as_str(self) -> &'static str {
47        match self {
48            Self::NBirth => "NBIRTH",
49            Self::NDeath => "NDEATH",
50            Self::NData => "NDATA",
51            Self::NCmd => "NCMD",
52            Self::DBirth => "DBIRTH",
53            Self::DDeath => "DDEATH",
54            Self::DData => "DDATA",
55            Self::DCmd => "DCMD",
56            Self::State => "STATE",
57        }
58    }
59
60    /// Whether this message type addresses a Device (carries a `device_id`).
61    #[must_use]
62    pub const fn has_device(self) -> bool {
63        matches!(self, Self::DBirth | Self::DDeath | Self::DData | Self::DCmd)
64    }
65
66    /// Whether a payload of this type carries a sequence number. NDEATH, NCMD,
67    /// DCMD, and STATE do not (`tck-id-payloads-{ndeath,ncmd,dcmd}-seq`).
68    #[must_use]
69    pub const fn carries_seq(self) -> bool {
70        matches!(
71            self,
72            Self::NBirth | Self::NData | Self::DBirth | Self::DData | Self::DDeath
73        )
74    }
75
76    /// Whether this is a command (NCMD/DCMD).
77    #[must_use]
78    pub const fn is_command(self) -> bool {
79        matches!(self, Self::NCmd | Self::DCmd)
80    }
81}
82
83impl fmt::Display for MessageType {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.write_str(self.as_str())
86    }
87}
88
89impl FromStr for MessageType {
90    type Err = SparkplugError;
91
92    fn from_str(s: &str) -> Result<Self> {
93        let ty = match s {
94            "NBIRTH" => Self::NBirth,
95            "NDEATH" => Self::NDeath,
96            "NDATA" => Self::NData,
97            "NCMD" => Self::NCmd,
98            "DBIRTH" => Self::DBirth,
99            "DDEATH" => Self::DDeath,
100            "DDATA" => Self::DData,
101            "DCMD" => Self::DCmd,
102            "STATE" => Self::State,
103            other => {
104                return Err(SparkplugError::InvalidTopic(format!(
105                    "unknown message type {other:?}"
106                )));
107            }
108        };
109        Ok(ty)
110    }
111}
112
113/// Validate a Sparkplug identifier token: non-empty and free of the reserved
114/// MQTT characters `+`, `/`, `#` (`tck-id-topic-structure-namespace-valid-*`).
115fn validate_id(s: &str) -> Result<()> {
116    if s.is_empty() {
117        return Err(SparkplugError::InvalidId("identifier is empty".to_owned()));
118    }
119    if s.contains(['+', '/', '#']) {
120        return Err(SparkplugError::InvalidId(format!(
121            "{s:?} contains a reserved character (+, /, or #)"
122        )));
123    }
124    Ok(())
125}
126
/// Define a validated identifier newtype.
///
/// The expansion is a public tuple struct wrapping a `String`, a fallible
/// `new` constructor that runs `validate_id`, an `as_str` accessor, and
/// `Display` / `AsRef<str>` impls that expose the wrapped text unchanged.
/// Attributes (including doc comments) written before the type name are
/// forwarded onto the generated struct.
macro_rules! id_newtype {
    ($(#[$attr:meta])* $ty:ident) => {
        $(#[$attr])*
        #[derive(Debug, Clone, PartialEq, Eq, Hash)]
        pub struct $ty(String);

        impl $ty {
            /// Create the identifier, validating reserved characters.
            ///
            /// # Errors
            /// Returns [`SparkplugError::InvalidId`] if empty or containing `+`, `/`, `#`.
            pub fn new(s: impl Into<String>) -> Result<Self> {
                let owned: String = s.into();
                // Only wrap the string once validation has succeeded.
                validate_id(&owned).map(|()| Self(owned))
            }

            /// The identifier as a string slice.
            #[must_use]
            pub fn as_str(&self) -> &str {
                self.0.as_str()
            }
        }

        impl fmt::Display for $ty {
            fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
                formatter.write_str(self.as_str())
            }
        }

        impl AsRef<str> for $ty {
            fn as_ref(&self) -> &str {
                self.as_str()
            }
        }
    };
}
165
id_newtype!(
    /// A Sparkplug Group ID (the `group_id` topic token). `new` rejects empty
    /// strings and the reserved characters `+`, `/`, `#`.
    GroupId
);
id_newtype!(
    /// A Sparkplug Edge Node ID (the `edge_node_id` topic token). `new` rejects
    /// empty strings and the reserved characters `+`, `/`, `#`.
    EdgeNodeId
);
id_newtype!(
    /// A Sparkplug Device ID (the optional `device_id` topic token). `new`
    /// rejects empty strings and the reserved characters `+`, `/`, `#`.
    DeviceId
);
178
179/// A parsed Sparkplug B topic.
180#[derive(Clone, Debug, PartialEq, Eq)]
181pub enum SparkplugTopic {
182    /// An Edge-Node-level topic (4 tokens; `N*` message types).
183    Node {
184        /// Group ID.
185        group: GroupId,
186        /// Edge Node ID.
187        edge: EdgeNodeId,
188        /// Message type (one of NBIRTH/NDEATH/NDATA/NCMD).
189        ty: MessageType,
190    },
191    /// A Device-level topic (5 tokens; `D*` message types).
192    Device {
193        /// Group ID.
194        group: GroupId,
195        /// Edge Node ID.
196        edge: EdgeNodeId,
197        /// Device ID.
198        device: DeviceId,
199        /// Message type (one of DBIRTH/DDEATH/DDATA/DCMD).
200        ty: MessageType,
201    },
202    /// A Host Application STATE topic (`spBv1.0/STATE/<host_id>`).
203    HostState {
204        /// Host Application ID.
205        host_id: String,
206    },
207}
208
209impl SparkplugTopic {
210    /// Build an Edge-Node topic; the message type must be a non-device `N*` type.
211    ///
212    /// # Errors
213    /// Returns [`SparkplugError::InvalidTopic`] if `ty` is a device or STATE type.
214    pub fn node(group: GroupId, edge: EdgeNodeId, ty: MessageType) -> Result<Self> {
215        if ty.has_device() || ty == MessageType::State {
216            return Err(SparkplugError::InvalidTopic(format!(
217                "{ty} is not an Edge-Node message type"
218            )));
219        }
220        Ok(Self::Node { group, edge, ty })
221    }
222
223    /// Build a Device topic; the message type must be a `D*` type.
224    ///
225    /// # Errors
226    /// Returns [`SparkplugError::InvalidTopic`] if `ty` is not a device type.
227    pub fn device(
228        group: GroupId,
229        edge: EdgeNodeId,
230        device: DeviceId,
231        ty: MessageType,
232    ) -> Result<Self> {
233        if !ty.has_device() {
234            return Err(SparkplugError::InvalidTopic(format!(
235                "{ty} is not a Device message type"
236            )));
237        }
238        Ok(Self::Device {
239            group,
240            edge,
241            device,
242            ty,
243        })
244    }
245
246    /// The message type of this topic.
247    #[must_use]
248    pub fn message_type(&self) -> MessageType {
249        match self {
250            Self::Node { ty, .. } | Self::Device { ty, .. } => *ty,
251            Self::HostState { .. } => MessageType::State,
252        }
253    }
254
255    /// Parse a topic string into a validated [`SparkplugTopic`].
256    ///
257    /// # Errors
258    /// Returns [`SparkplugError::InvalidTopic`] / [`SparkplugError::InvalidId`]
259    /// for a bad namespace, wrong token count, reserved characters, or a
260    /// message type that does not match the topic arity.
261    pub fn parse(topic: &str) -> Result<Self> {
262        let parts: Vec<&str> = topic.split('/').collect();
263        if parts.first() != Some(&NAMESPACE) {
264            return Err(SparkplugError::InvalidTopic(format!(
265                "topic must start with {NAMESPACE:?}: {topic:?}"
266            )));
267        }
268        match parts.as_slice() {
269            [_, token, host] if *token == STATE_TOKEN => {
270                validate_id(host)?;
271                Ok(Self::HostState {
272                    host_id: (*host).to_owned(),
273                })
274            }
275            [_, group, ty_token, edge] => {
276                let ty = MessageType::from_str(ty_token)?;
277                if ty.has_device() || ty == MessageType::State {
278                    return Err(SparkplugError::InvalidTopic(format!(
279                        "{ty} requires a different token count"
280                    )));
281                }
282                Self::node(GroupId::new(*group)?, EdgeNodeId::new(*edge)?, ty)
283            }
284            [_, group, ty_token, edge, device] => {
285                let ty = MessageType::from_str(ty_token)?;
286                if !ty.has_device() {
287                    return Err(SparkplugError::InvalidTopic(format!(
288                        "{ty} must not include a device_id"
289                    )));
290                }
291                Self::device(
292                    GroupId::new(*group)?,
293                    EdgeNodeId::new(*edge)?,
294                    DeviceId::new(*device)?,
295                    ty,
296                )
297            }
298            _ => Err(SparkplugError::InvalidTopic(format!(
299                "unexpected token count in topic {topic:?}"
300            ))),
301        }
302    }
303}
304
305impl fmt::Display for SparkplugTopic {
306    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307        match self {
308            Self::Node { group, edge, ty } => {
309                write!(f, "{NAMESPACE}/{group}/{ty}/{edge}")
310            }
311            Self::Device {
312                group,
313                edge,
314                device,
315                ty,
316            } => write!(f, "{NAMESPACE}/{group}/{ty}/{edge}/{device}"),
317            Self::HostState { host_id } => write!(f, "{NAMESPACE}/{STATE_TOKEN}/{host_id}"),
318        }
319    }
320}