mqtt5_protocol/validation/
namespace.rs1use crate::error::{MqttError, Result};
2use crate::validation::{validate_topic_filter, validate_topic_name, TopicValidator};
3
4#[derive(Debug, Clone)]
20pub struct NamespaceValidator {
21 pub service_prefix: String,
23 pub device_namespace: String,
25 pub device_id: Option<String>,
28 pub allow_system_topics: bool,
30 pub additional_reserved_prefixes: Vec<String>,
32}
33
34impl NamespaceValidator {
35 #[must_use]
41 pub fn new(service_prefix: impl Into<String>, device_namespace: impl Into<String>) -> Self {
42 Self {
43 service_prefix: service_prefix.into(),
44 device_namespace: device_namespace.into(),
45 device_id: None,
46 allow_system_topics: false,
47 additional_reserved_prefixes: Vec::new(),
48 }
49 }
50
51 #[must_use]
53 pub fn aws_iot() -> Self {
54 Self::new("$aws", "things")
55 }
56
57 #[must_use]
59 pub fn azure_iot() -> Self {
60 Self::new("$azure", "device")
61 }
62
63 #[must_use]
65 pub fn google_cloud_iot() -> Self {
66 Self::new("$gcp", "device")
67 }
68
69 #[must_use]
74 pub fn with_device_id(mut self, device_id: impl Into<String>) -> Self {
75 self.device_id = Some(device_id.into());
76 self
77 }
78
79 #[must_use]
81 pub fn with_system_topics(mut self, allow: bool) -> Self {
82 self.allow_system_topics = allow;
83 self
84 }
85
86 #[must_use]
88 pub fn with_reserved_prefix(mut self, prefix: impl Into<String>) -> Self {
89 self.additional_reserved_prefixes.push(prefix.into());
90 self
91 }
92
93 fn is_service_topic(&self, topic: &str) -> bool {
95 topic.starts_with(&format!("{}/", self.service_prefix))
96 }
97
98 fn is_system_topic(topic: &str) -> bool {
100 topic.starts_with("$SYS/")
101 }
102
103 fn validate_namespace_restrictions(&self, topic: &str) -> Result<()> {
105 if Self::is_system_topic(topic) && !self.allow_system_topics {
107 return Err(MqttError::InvalidTopicName(
108 "System topics ($SYS/*) are not allowed".to_string(),
109 ));
110 }
111
112 if self.is_service_topic(topic) {
114 if self.service_prefix == "$aws" {
116 let aws_reserved_prefixes = ["$aws/certificates/", "$aws/provisioning-templates/"];
118
119 for reserved in &aws_reserved_prefixes {
121 if topic.starts_with(reserved) {
122 return Err(MqttError::InvalidTopicName(format!(
123 "Cannot publish to reserved AWS IoT topic: {topic}"
124 )));
125 }
126 }
127
128 if topic.starts_with("$aws/things/") {
130 if let Some(ref device_id) = self.device_id {
133 let allowed_patterns = [
135 format!("$aws/things/{device_id}/shadow/update"),
136 format!("$aws/things/{device_id}/shadow/delete"),
137 format!("$aws/things/{device_id}/jobs/"),
138 ];
139 if !allowed_patterns
140 .iter()
141 .any(|pattern| topic.starts_with(pattern))
142 {
143 return Err(MqttError::InvalidTopicName(format!(
144 "Cannot publish to reserved AWS IoT topic: {topic}"
145 )));
146 }
147 } else {
148 return Err(MqttError::InvalidTopicName(
149 "Device-specific topics require device ID to be configured".to_string(),
150 ));
151 }
152 }
153 } else {
154 let device_namespace_prefix =
157 format!("{}/{}/", self.service_prefix, self.device_namespace);
158
159 if let Some(ref device_id) = self.device_id {
161 let device_prefix = format!("{device_namespace_prefix}{device_id}/");
162
163 if topic.starts_with(&device_prefix) {
165 return Ok(());
166 }
167
168 if !topic.starts_with(&device_namespace_prefix) {
170 return Ok(());
173 }
174
175 if topic.starts_with(&device_namespace_prefix) {
177 return Err(MqttError::InvalidTopicName(format!(
178 "Topic '{topic}' is for a different device. Only topics under '{device_prefix}' are allowed"
179 )));
180 }
181 } else {
182 if topic.starts_with(&device_namespace_prefix) {
184 return Err(MqttError::InvalidTopicName(format!(
185 "Device-specific topics ({device_namespace_prefix}*) require device ID to be configured"
186 )));
187 }
188 }
190 }
191 }
192
193 for prefix in &self.additional_reserved_prefixes {
195 if topic.starts_with(prefix) {
196 return Err(MqttError::InvalidTopicName(format!(
197 "Topic '{topic}' uses reserved prefix '{prefix}'"
198 )));
199 }
200 }
201
202 Ok(())
203 }
204}
205
206impl TopicValidator for NamespaceValidator {
207 fn validate_topic_name(&self, topic: &str) -> Result<()> {
208 validate_topic_name(topic)?;
210
211 if self.service_prefix == "$aws" && topic.len() > 256 {
213 return Err(MqttError::InvalidTopicName(
214 "AWS IoT topics must not exceed 256 characters".to_string(),
215 ));
216 }
217
218 self.validate_namespace_restrictions(topic)
220 }
221
222 fn validate_topic_filter(&self, filter: &str) -> Result<()> {
223 validate_topic_filter(filter)?;
225
226 if self.service_prefix == "$aws" && filter.len() > 256 {
228 return Err(MqttError::InvalidTopicFilter(
229 "AWS IoT topic filters must not exceed 256 characters".to_string(),
230 ));
231 }
232
233 if !filter.contains('+') && !filter.contains('#') {
236 self.validate_namespace_restrictions(filter)?;
237 }
238
239 Ok(())
240 }
241
242 fn is_reserved_topic(&self, topic: &str) -> bool {
243 if self.is_service_topic(topic) {
245 let device_namespace_prefix =
246 format!("{}/{}/", self.service_prefix, self.device_namespace);
247
248 if let Some(ref device_id) = self.device_id {
250 let device_prefix = format!("{device_namespace_prefix}{device_id}/");
251 return !topic.starts_with(&device_prefix)
252 && topic.starts_with(&device_namespace_prefix);
253 }
254 return topic.starts_with(&device_namespace_prefix);
256 }
257
258 if Self::is_system_topic(topic) && !self.allow_system_topics {
260 return true;
261 }
262
263 self.additional_reserved_prefixes
265 .iter()
266 .any(|prefix| topic.starts_with(prefix))
267 }
268
269 fn description(&self) -> &'static str {
270 "Namespace-based topic validator with hierarchical isolation"
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use super::*;
277
278 #[test]
279 fn test_namespace_validator_basic() {
280 let validator = NamespaceValidator::new("$aws", "thing");
281
282 assert!(validator.validate_topic_name("sensor/temperature").is_ok());
284 assert!(validator.validate_topic_filter("sensor/+").is_ok());
285
286 assert!(validator
288 .validate_topic_name("$SYS/broker/version")
289 .is_err());
290 assert!(!validator.is_reserved_topic("regular/topic"));
291 assert!(validator.is_reserved_topic("$SYS/broker/version"));
292 }
293
294 #[test]
295 fn test_namespace_validator_with_device() {
296 let validator = NamespaceValidator::new("$aws", "things").with_device_id("my-device");
297
298 assert!(validator
300 .validate_topic_name("$aws/things/my-device/shadow/update")
301 .is_ok());
302
303 assert!(validator
305 .validate_topic_name("$aws/things/other-device/shadow/update")
306 .is_err());
307
308 assert!(validator
310 .validate_topic_name("$aws/events/presence/connected/my-device")
311 .is_ok());
312 }
313
314 #[test]
315 fn test_namespace_validator_system_topics() {
316 let validator = NamespaceValidator::new("$aws", "thing").with_system_topics(true);
317
318 assert!(validator.validate_topic_name("$SYS/broker/version").is_ok());
320 assert!(!validator.is_reserved_topic("$SYS/broker/version"));
321 }
322
323 #[test]
324 fn test_namespace_validator_additional_prefixes() {
325 let validator = NamespaceValidator::new("$aws", "thing")
326 .with_reserved_prefix("company/")
327 .with_reserved_prefix("internal/");
328
329 assert!(validator.validate_topic_name("company/secret").is_err());
331 assert!(validator.validate_topic_name("internal/admin").is_err());
332 assert!(validator.is_reserved_topic("company/secret"));
333
334 assert!(validator.validate_topic_name("public/sensor").is_ok());
336 }
337
338 #[test]
339 fn test_different_cloud_providers() {
340 let aws = NamespaceValidator::aws_iot().with_device_id("sensor-123");
342 assert!(aws
343 .validate_topic_name("$aws/things/sensor-123/shadow/update")
344 .is_ok());
345 assert!(aws
346 .validate_topic_name("$aws/things/sensor-456/shadow/update")
347 .is_err());
348
349 let azure = NamespaceValidator::azure_iot().with_device_id("device-abc");
351 assert!(azure
352 .validate_topic_name("$azure/device/device-abc/telemetry")
353 .is_ok());
354 assert!(azure
355 .validate_topic_name("$azure/device/device-xyz/telemetry")
356 .is_err());
357
358 let enterprise = NamespaceValidator::new("$company", "asset").with_device_id("machine-001");
360 assert!(enterprise
361 .validate_topic_name("$company/asset/machine-001/status")
362 .is_ok());
363 assert!(enterprise
364 .validate_topic_name("$company/asset/machine-002/status")
365 .is_err());
366 }
367}