Skip to main content

styrene_mqtt/
topic.rs

1use crate::error::{MqttError, Result};
2
/// Literal first segment of every topic and filter rendered by `TopicAddress` and `TopicBuilder`.
const PREFIX: &str = "styrene";
/// Literal segment placed between the instance id and the event type.
const EVENTS_SEGMENT: &str = "events";
5
/// The variable parts of a fully-qualified Aether topic.
///
/// A topic has the shape
/// `styrene/{operator_id}/{service}/{instance_id}/events/{event_type}`.
/// The `styrene` and `events` segments are fixed, so only the four
/// placeholders are stored here.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TopicAddress {
    /// Value of the `{operator_id}` segment.
    pub operator_id: String,
    /// Value of the `{service}` segment.
    pub service: String,
    /// Value of the `{instance_id}` segment.
    pub instance_id: String,
    /// Value of the `{event_type}` segment.
    pub event_type: String,
}
16
17impl TopicAddress {
18    /// Render as an MQTT topic string.
19    pub fn to_topic_string(&self) -> String {
20        format!(
21            "{}/{}/{}/{}/{}/{}",
22            PREFIX,
23            self.operator_id,
24            self.service,
25            self.instance_id,
26            EVENTS_SEGMENT,
27            self.event_type,
28        )
29    }
30
31    /// Parse from an MQTT topic string.
32    pub fn parse(topic: &str) -> Result<Self> {
33        let parts: Vec<&str> = topic.splitn(6, '/').collect();
34
35        if parts.len() < 6 {
36            return Err(MqttError::InvalidTopic(format!(
37                "expected at least 6 segments, got {}: `{topic}`",
38                parts.len()
39            )));
40        }
41
42        if parts[0] != PREFIX {
43            return Err(MqttError::InvalidTopic(format!(
44                "expected prefix `{PREFIX}`, got `{}`",
45                parts[0]
46            )));
47        }
48
49        if parts[4] != EVENTS_SEGMENT {
50            return Err(MqttError::InvalidTopic(format!(
51                "expected `{EVENTS_SEGMENT}` at segment 4, got `{}`",
52                parts[4]
53            )));
54        }
55
56        Ok(Self {
57            operator_id: parts[1].to_owned(),
58            service: parts[2].to_owned(),
59            instance_id: parts[3].to_owned(),
60            event_type: parts[5].to_owned(),
61        })
62    }
63}
64
65impl std::fmt::Display for TopicAddress {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "{}", self.to_topic_string())
68    }
69}
70
/// Chainable builder for MQTT publish topics and subscription filters.
///
/// Every field starts out unset. When a subscription filter is built, an
/// unset field is replaced by an MQTT wildcard:
/// - operator, service and instance become `+` (single-level);
/// - event type becomes `#` (multi-level, matches all sub-levels).
#[derive(Clone, Debug, Default)]
pub struct TopicBuilder {
    /// Operator id segment, if one has been set.
    operator_id: Option<String>,
    /// Service segment, if one has been set.
    service: Option<String>,
    /// Instance id segment, if one has been set.
    instance_id: Option<String>,
    /// Event type segment, if one has been set.
    event_type: Option<String>,
}
83
84impl TopicBuilder {
85    pub fn new() -> Self {
86        Self::default()
87    }
88
89    pub fn operator(mut self, id: impl Into<String>) -> Self {
90        self.operator_id = Some(id.into());
91        self
92    }
93
94    pub fn service(mut self, name: impl Into<String>) -> Self {
95        self.service = Some(name.into());
96        self
97    }
98
99    pub fn instance(mut self, id: impl Into<String>) -> Self {
100        self.instance_id = Some(id.into());
101        self
102    }
103
104    pub fn event_type(mut self, ty: impl Into<String>) -> Self {
105        self.event_type = Some(ty.into());
106        self
107    }
108
109    /// Build a concrete publish topic. All fields must be set.
110    pub fn build_publish(&self) -> Result<String> {
111        let operator_id = self
112            .operator_id
113            .as_deref()
114            .ok_or_else(|| MqttError::InvalidTopic("operator_id required for publish".into()))?;
115        let service = self
116            .service
117            .as_deref()
118            .ok_or_else(|| MqttError::InvalidTopic("service required for publish".into()))?;
119        let instance_id = self
120            .instance_id
121            .as_deref()
122            .ok_or_else(|| MqttError::InvalidTopic("instance_id required for publish".into()))?;
123        let event_type = self
124            .event_type
125            .as_deref()
126            .ok_or_else(|| MqttError::InvalidTopic("event_type required for publish".into()))?;
127
128        Ok(format!("{PREFIX}/{operator_id}/{service}/{instance_id}/{EVENTS_SEGMENT}/{event_type}"))
129    }
130
131    /// Build a subscription filter. Unset fields become MQTT wildcards.
132    pub fn build_subscribe(&self) -> String {
133        let op = self.operator_id.as_deref().unwrap_or("+");
134        let svc = self.service.as_deref().unwrap_or("+");
135        let inst = self.instance_id.as_deref().unwrap_or("+");
136        let evt = self.event_type.as_deref().unwrap_or("#");
137
138        format!("{PREFIX}/{op}/{svc}/{inst}/{EVENTS_SEGMENT}/{evt}")
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Rendering an address and parsing the result yields the same address.
    #[test]
    fn address_roundtrip() {
        let original = TopicAddress {
            operator_id: String::from("op1"),
            service: String::from("omegon"),
            instance_id: String::from("abc123"),
            event_type: String::from("turn.started"),
        };

        let rendered = original.to_topic_string();
        assert_eq!(rendered, "styrene/op1/omegon/abc123/events/turn.started");

        let reparsed = TopicAddress::parse(&rendered).expect("parse should succeed");
        assert_eq!(reparsed, original);
    }

    /// A topic with fewer than six segments is rejected.
    #[test]
    fn parse_rejects_short_topic() {
        let outcome = TopicAddress::parse("styrene/op1/omegon");
        assert!(outcome.is_err());
    }

    /// A topic whose first segment is not the expected prefix is rejected.
    #[test]
    fn parse_rejects_wrong_prefix() {
        let outcome = TopicAddress::parse("wrong/op1/omegon/abc/events/turn.started");
        assert!(outcome.is_err());
    }

    /// A topic whose fifth segment is not `events` is rejected.
    #[test]
    fn parse_rejects_missing_events_segment() {
        let outcome = TopicAddress::parse("styrene/op1/omegon/abc/other/turn.started");
        assert!(outcome.is_err());
    }

    /// Building a publish topic fails while some fields are still unset.
    #[test]
    fn builder_publish_requires_all_fields() {
        let incomplete = TopicBuilder::new().operator("op1").service("omegon");
        let outcome = incomplete.build_publish();
        assert!(outcome.is_err());
    }

    /// With every field set, the publish topic contains each value in order.
    #[test]
    fn builder_publish_full() {
        let builder = TopicBuilder::new()
            .operator("op1")
            .service("omegon")
            .instance("abc123")
            .event_type("tool.ended");

        let rendered = builder.build_publish().expect("should succeed");
        assert_eq!(rendered, "styrene/op1/omegon/abc123/events/tool.ended");
    }

    /// An empty builder produces a filter made only of wildcards.
    #[test]
    fn builder_subscribe_all_wildcards() {
        let builder = TopicBuilder::new();
        assert_eq!(builder.build_subscribe(), "styrene/+/+/+/events/#");
    }

    /// Set fields appear verbatim; the remaining ones become wildcards.
    #[test]
    fn builder_subscribe_partial() {
        let builder = TopicBuilder::new().operator("op1").service("omegon");
        assert_eq!(builder.build_subscribe(), "styrene/op1/omegon/+/events/#");
    }

    /// A set event type replaces the trailing `#` wildcard.
    #[test]
    fn builder_subscribe_specific_event() {
        let builder = TopicBuilder::new().operator("op1").event_type("turn.started");
        assert_eq!(builder.build_subscribe(), "styrene/op1/+/+/events/turn.started");
    }
}