// mqtt5_protocol/validation/namespace.rs
use crate::error::{MqttError, Result};
use crate::prelude::{format, String, ToString, Vec};
use crate::validation::{validate_topic_filter, validate_topic_name, TopicValidator};

/// Topic validator that restricts access to `$`-prefixed service topics based
/// on a service prefix, a device namespace and an optional device ID.
///
/// Topics of the form `{service_prefix}/{device_namespace}/{device_id}/...`
/// are treated as belonging to a single device; the validation logic lives in
/// the inherent and `TopicValidator` implementations for this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamespaceValidator {
    /// Prefix that identifies service topics, without the trailing slash
    /// (for example `$aws`). A topic is a service topic when it starts with
    /// `{service_prefix}/`.
    pub service_prefix: String,
    /// Path segment that follows the service prefix for per-device topics,
    /// giving the device namespace `{service_prefix}/{device_namespace}/`.
    pub device_namespace: String,
    /// Identifier of the device this validator acts for. `None` means no
    /// device-specific topic is permitted.
    pub device_id: Option<String>,
    /// Whether topics starting with `$SYS/` are accepted.
    pub allow_system_topics: bool,
    /// Extra topic prefixes that are rejected in addition to the built-in
    /// rules. Each entry is matched with a plain string-prefix comparison.
    pub additional_reserved_prefixes: Vec<String>,
}

35impl NamespaceValidator {
36 #[must_use]
42 pub fn new(service_prefix: impl Into<String>, device_namespace: impl Into<String>) -> Self {
43 Self {
44 service_prefix: service_prefix.into(),
45 device_namespace: device_namespace.into(),
46 device_id: None,
47 allow_system_topics: false,
48 additional_reserved_prefixes: Vec::new(),
49 }
50 }
51
52 #[must_use]
54 pub fn aws_iot() -> Self {
55 Self::new("$aws", "things")
56 }
57
58 #[must_use]
60 pub fn azure_iot() -> Self {
61 Self::new("$azure", "device")
62 }
63
64 #[must_use]
66 pub fn google_cloud_iot() -> Self {
67 Self::new("$gcp", "device")
68 }
69
70 #[must_use]
75 pub fn with_device_id(mut self, device_id: impl Into<String>) -> Self {
76 self.device_id = Some(device_id.into());
77 self
78 }
79
80 #[must_use]
82 pub fn with_system_topics(mut self, allow: bool) -> Self {
83 self.allow_system_topics = allow;
84 self
85 }
86
87 #[must_use]
89 pub fn with_reserved_prefix(mut self, prefix: impl Into<String>) -> Self {
90 self.additional_reserved_prefixes.push(prefix.into());
91 self
92 }
93
94 fn is_service_topic(&self, topic: &str) -> bool {
96 topic.starts_with(&format!("{}/", self.service_prefix))
97 }
98
99 fn is_system_topic(topic: &str) -> bool {
101 topic.starts_with("$SYS/")
102 }
103
104 fn validate_namespace_restrictions(&self, topic: &str) -> Result<()> {
106 if Self::is_system_topic(topic) && !self.allow_system_topics {
108 return Err(MqttError::InvalidTopicName(
109 "System topics ($SYS/*) are not allowed".to_string(),
110 ));
111 }
112
113 if self.is_service_topic(topic) {
115 if self.service_prefix == "$aws" {
117 let aws_reserved_prefixes = ["$aws/certificates/", "$aws/provisioning-templates/"];
119
120 for reserved in &aws_reserved_prefixes {
122 if topic.starts_with(reserved) {
123 return Err(MqttError::InvalidTopicName(format!(
124 "Cannot publish to reserved AWS IoT topic: {topic}"
125 )));
126 }
127 }
128
129 if topic.starts_with("$aws/things/") {
131 if let Some(ref device_id) = self.device_id {
134 let allowed_patterns = [
136 format!("$aws/things/{device_id}/shadow/update"),
137 format!("$aws/things/{device_id}/shadow/delete"),
138 format!("$aws/things/{device_id}/jobs/"),
139 ];
140 if !allowed_patterns
141 .iter()
142 .any(|pattern| topic.starts_with(pattern))
143 {
144 return Err(MqttError::InvalidTopicName(format!(
145 "Cannot publish to reserved AWS IoT topic: {topic}"
146 )));
147 }
148 } else {
149 return Err(MqttError::InvalidTopicName(
150 "Device-specific topics require device ID to be configured".to_string(),
151 ));
152 }
153 }
154 } else {
155 let device_namespace_prefix =
158 format!("{}/{}/", self.service_prefix, self.device_namespace);
159
160 if let Some(ref device_id) = self.device_id {
162 let device_prefix = format!("{device_namespace_prefix}{device_id}/");
163
164 if topic.starts_with(&device_prefix) {
166 return Ok(());
167 }
168
169 if !topic.starts_with(&device_namespace_prefix) {
171 return Ok(());
174 }
175
176 if topic.starts_with(&device_namespace_prefix) {
178 return Err(MqttError::InvalidTopicName(format!(
179 "Topic '{topic}' is for a different device. Only topics under '{device_prefix}' are allowed"
180 )));
181 }
182 } else {
183 if topic.starts_with(&device_namespace_prefix) {
185 return Err(MqttError::InvalidTopicName(format!(
186 "Device-specific topics ({device_namespace_prefix}*) require device ID to be configured"
187 )));
188 }
189 }
191 }
192 }
193
194 for prefix in &self.additional_reserved_prefixes {
196 if topic.starts_with(prefix) {
197 return Err(MqttError::InvalidTopicName(format!(
198 "Topic '{topic}' uses reserved prefix '{prefix}'"
199 )));
200 }
201 }
202
203 Ok(())
204 }
205}
206
207impl TopicValidator for NamespaceValidator {
208 fn validate_topic_name(&self, topic: &str) -> Result<()> {
209 validate_topic_name(topic)?;
211
212 if self.service_prefix == "$aws" && topic.len() > 256 {
214 return Err(MqttError::InvalidTopicName(
215 "AWS IoT topics must not exceed 256 characters".to_string(),
216 ));
217 }
218
219 self.validate_namespace_restrictions(topic)
221 }
222
223 fn validate_topic_filter(&self, filter: &str) -> Result<()> {
224 validate_topic_filter(filter)?;
226
227 if self.service_prefix == "$aws" && filter.len() > 256 {
229 return Err(MqttError::InvalidTopicFilter(
230 "AWS IoT topic filters must not exceed 256 characters".to_string(),
231 ));
232 }
233
234 if !filter.contains('+') && !filter.contains('#') {
237 self.validate_namespace_restrictions(filter)?;
238 }
239
240 Ok(())
241 }
242
243 fn is_reserved_topic(&self, topic: &str) -> bool {
244 if self.is_service_topic(topic) {
246 let device_namespace_prefix =
247 format!("{}/{}/", self.service_prefix, self.device_namespace);
248
249 if let Some(ref device_id) = self.device_id {
251 let device_prefix = format!("{device_namespace_prefix}{device_id}/");
252 return !topic.starts_with(&device_prefix)
253 && topic.starts_with(&device_namespace_prefix);
254 }
255 return topic.starts_with(&device_namespace_prefix);
257 }
258
259 if Self::is_system_topic(topic) && !self.allow_system_topics {
261 return true;
262 }
263
264 self.additional_reserved_prefixes
266 .iter()
267 .any(|prefix| topic.starts_with(prefix))
268 }
269
270 fn description(&self) -> &'static str {
271 "Namespace-based topic validator with hierarchical isolation"
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain topics pass, `$SYS/` topics are rejected and reported as reserved.
    #[test]
    fn test_namespace_validator_basic() {
        let validator = NamespaceValidator::new("$aws", "thing");

        assert!(validator.validate_topic_name("sensor/temperature").is_ok());
        assert!(validator.validate_topic_filter("sensor/+").is_ok());

        assert!(validator
            .validate_topic_name("$SYS/broker/version")
            .is_err());
        assert!(!validator.is_reserved_topic("regular/topic"));
        assert!(validator.is_reserved_topic("$SYS/broker/version"));
    }

    /// With a device ID, only that device's `$aws/things/...` topics pass.
    #[test]
    fn test_namespace_validator_with_device() {
        let validator = NamespaceValidator::new("$aws", "things").with_device_id("my-device");

        assert!(validator
            .validate_topic_name("$aws/things/my-device/shadow/update")
            .is_ok());

        assert!(validator
            .validate_topic_name("$aws/things/other-device/shadow/update")
            .is_err());

        assert!(validator
            .validate_topic_name("$aws/events/presence/connected/my-device")
            .is_ok());
    }

    /// Enabling system topics lifts the `$SYS/` restriction.
    #[test]
    fn test_namespace_validator_system_topics() {
        let validator = NamespaceValidator::new("$aws", "thing").with_system_topics(true);

        assert!(validator.validate_topic_name("$SYS/broker/version").is_ok());
        assert!(!validator.is_reserved_topic("$SYS/broker/version"));
    }

    /// Additional reserved prefixes are rejected and reported as reserved.
    #[test]
    fn test_namespace_validator_additional_prefixes() {
        let validator = NamespaceValidator::new("$aws", "thing")
            .with_reserved_prefix("company/")
            .with_reserved_prefix("internal/");

        assert!(validator.validate_topic_name("company/secret").is_err());
        assert!(validator.validate_topic_name("internal/admin").is_err());
        assert!(validator.is_reserved_topic("company/secret"));

        assert!(validator.validate_topic_name("public/sensor").is_ok());
    }

    /// Device isolation holds for the AWS and Azure presets and a custom prefix.
    #[test]
    fn test_different_cloud_providers() {
        let aws = NamespaceValidator::aws_iot().with_device_id("sensor-123");
        assert!(aws
            .validate_topic_name("$aws/things/sensor-123/shadow/update")
            .is_ok());
        assert!(aws
            .validate_topic_name("$aws/things/sensor-456/shadow/update")
            .is_err());

        let azure = NamespaceValidator::azure_iot().with_device_id("device-abc");
        assert!(azure
            .validate_topic_name("$azure/device/device-abc/telemetry")
            .is_ok());
        assert!(azure
            .validate_topic_name("$azure/device/device-xyz/telemetry")
            .is_err());

        let enterprise = NamespaceValidator::new("$company", "asset").with_device_id("machine-001");
        assert!(enterprise
            .validate_topic_name("$company/asset/machine-001/status")
            .is_ok());
        assert!(enterprise
            .validate_topic_name("$company/asset/machine-002/status")
            .is_err());
    }

    /// The Google preset isolates devices under `$gcp/device/`.
    #[test]
    fn test_google_cloud_preset() {
        let gcp = NamespaceValidator::google_cloud_iot().with_device_id("dev-1");

        assert!(gcp.validate_topic_name("$gcp/device/dev-1/state").is_ok());
        assert!(gcp.validate_topic_name("$gcp/device/dev-2/state").is_err());
    }

    /// Without a device ID the whole device namespace is off limits.
    #[test]
    fn test_device_namespace_requires_device_id() {
        let azure = NamespaceValidator::azure_iot();

        assert!(azure
            .validate_topic_name("$azure/device/device-abc/telemetry")
            .is_err());
        assert!(azure.is_reserved_topic("$azure/device/device-abc/telemetry"));
        assert!(azure.validate_topic_name("$azure/other/topic").is_ok());
    }
}