mqtt5_protocol/validation/
namespace.rs

1use crate::error::{MqttError, Result};
2use crate::validation::{validate_topic_filter, validate_topic_name, TopicValidator};
3
4/// Namespace-based topic validator for hierarchical topic isolation
5///
6/// This validator implements namespace-based topic validation patterns that enforce
7/// hierarchical isolation of topics, commonly used in cloud `IoT` platforms and
8/// enterprise systems.
9///
10/// The validator enforces:
11/// - Service-reserved topic prefixes (e.g., `$aws/`, `$azure/`, `$company/`)
12/// - Device-specific namespaces (e.g., `{prefix}/thing/{device}/`, `{prefix}/device/{device}/`)
13/// - System topics (e.g., `$SYS/`)
14///
15/// Examples:
16/// - AWS `IoT`: `NamespaceValidator::new("$aws", "thing")`
17/// - Azure `IoT`: `NamespaceValidator::new("$azure", "device")`
18/// - Enterprise: `NamespaceValidator::new("$company", "asset")`
19#[derive(Debug, Clone)]
20pub struct NamespaceValidator {
21    /// Service prefix for reserved topics (e.g., "$aws", "$azure", "$company")
22    pub service_prefix: String,
23    /// Device namespace identifier (e.g., "thing", "device", "asset")
24    pub device_namespace: String,
25    /// Optional device identifier for device-specific validation
26    /// If set, allows `{service_prefix}/{device_namespace}/{device_id}/*` topics
27    pub device_id: Option<String>,
28    /// Whether to allow system topics (`$SYS/*`)
29    pub allow_system_topics: bool,
30    /// Additional reserved prefixes beyond the service prefix
31    pub additional_reserved_prefixes: Vec<String>,
32}
33
34impl NamespaceValidator {
35    /// Creates a new namespace validator
36    ///
37    /// # Arguments
38    /// * `service_prefix` - The service prefix (e.g., "$aws", "$azure")
39    /// * `device_namespace` - The device namespace pattern (e.g., "thing", "device")
40    #[must_use]
41    pub fn new(service_prefix: impl Into<String>, device_namespace: impl Into<String>) -> Self {
42        Self {
43            service_prefix: service_prefix.into(),
44            device_namespace: device_namespace.into(),
45            device_id: None,
46            allow_system_topics: false,
47            additional_reserved_prefixes: Vec::new(),
48        }
49    }
50
51    /// Creates a validator configured for AWS `IoT` Core
52    #[must_use]
53    pub fn aws_iot() -> Self {
54        Self::new("$aws", "things")
55    }
56
57    /// Creates a validator configured for Azure `IoT` Hub
58    #[must_use]
59    pub fn azure_iot() -> Self {
60        Self::new("$azure", "device")
61    }
62
63    /// Creates a validator configured for Google Cloud `IoT`
64    #[must_use]
65    pub fn google_cloud_iot() -> Self {
66        Self::new("$gcp", "device")
67    }
68
69    /// Sets the device identifier for device-specific topic validation
70    ///
71    /// When set, allows topics like `{prefix}/{namespace}/{device_id}/*` while
72    /// rejecting `{prefix}/{namespace}/{other_device}/*`
73    #[must_use]
74    pub fn with_device_id(mut self, device_id: impl Into<String>) -> Self {
75        self.device_id = Some(device_id.into());
76        self
77    }
78
79    /// Enables system topics (`$SYS/*`)
80    #[must_use]
81    pub fn with_system_topics(mut self, allow: bool) -> Self {
82        self.allow_system_topics = allow;
83        self
84    }
85
86    /// Adds an additional reserved prefix
87    #[must_use]
88    pub fn with_reserved_prefix(mut self, prefix: impl Into<String>) -> Self {
89        self.additional_reserved_prefixes.push(prefix.into());
90        self
91    }
92
93    /// Checks if a topic is a service topic (e.g., `$aws/*`, `$azure/*`)
94    fn is_service_topic(&self, topic: &str) -> bool {
95        topic.starts_with(&format!("{}/", self.service_prefix))
96    }
97
98    /// Checks if a topic is a system topic (`$SYS/*`)  
99    fn is_system_topic(topic: &str) -> bool {
100        topic.starts_with("$SYS/")
101    }
102
103    /// Validates namespace-based topic restrictions
104    fn validate_namespace_restrictions(&self, topic: &str) -> Result<()> {
105        // Check system topics
106        if Self::is_system_topic(topic) && !self.allow_system_topics {
107            return Err(MqttError::InvalidTopicName(
108                "System topics ($SYS/*) are not allowed".to_string(),
109            ));
110        }
111
112        // Check service topics
113        if self.is_service_topic(topic) {
114            // For AWS IoT, most service topics are read-only and shouldn't be published to
115            if self.service_prefix == "$aws" {
116                // AWS IoT reserved topics that clients should not publish to
117                let aws_reserved_prefixes = ["$aws/certificates/", "$aws/provisioning-templates/"];
118
119                // Check if it's a reserved AWS topic
120                for reserved in &aws_reserved_prefixes {
121                    if topic.starts_with(reserved) {
122                        return Err(MqttError::InvalidTopicName(format!(
123                            "Cannot publish to reserved AWS IoT topic: {topic}"
124                        )));
125                    }
126                }
127
128                // Check device-specific topics for AWS
129                if topic.starts_with("$aws/things/") {
130                    // AWS IoT topics like $aws/things/{thing}/shadow/get/accepted are read-only
131                    // Only allow certain operations for publishing
132                    if let Some(ref device_id) = self.device_id {
133                        // Allow shadow update/delete and job operations for the configured device
134                        let allowed_patterns = [
135                            format!("$aws/things/{device_id}/shadow/update"),
136                            format!("$aws/things/{device_id}/shadow/delete"),
137                            format!("$aws/things/{device_id}/jobs/"),
138                        ];
139                        if !allowed_patterns
140                            .iter()
141                            .any(|pattern| topic.starts_with(pattern))
142                        {
143                            return Err(MqttError::InvalidTopicName(format!(
144                                "Cannot publish to reserved AWS IoT topic: {topic}"
145                            )));
146                        }
147                    } else {
148                        return Err(MqttError::InvalidTopicName(
149                            "Device-specific topics require device ID to be configured".to_string(),
150                        ));
151                    }
152                }
153            } else {
154                // Original logic for non-AWS providers
155                // Build the device namespace prefix (e.g., "$aws/thing/", "$azure/device/")
156                let device_namespace_prefix =
157                    format!("{}/{}/", self.service_prefix, self.device_namespace);
158
159                // If we have a device ID, check device-specific topics
160                if let Some(ref device_id) = self.device_id {
161                    let device_prefix = format!("{device_namespace_prefix}{device_id}/");
162
163                    // Allow device-specific topics
164                    if topic.starts_with(&device_prefix) {
165                        return Ok(());
166                    }
167
168                    // Allow general service topics that don't start with device namespace
169                    if !topic.starts_with(&device_namespace_prefix) {
170                        // These are general service topics like $aws/events, $azure/operations, etc.
171                        // Allow them for now, but this could be further restricted based on requirements
172                        return Ok(());
173                    }
174
175                    // Reject topics for other devices
176                    if topic.starts_with(&device_namespace_prefix) {
177                        return Err(MqttError::InvalidTopicName(format!(
178                            "Topic '{topic}' is for a different device. Only topics under '{device_prefix}' are allowed"
179                        )));
180                    }
181                } else {
182                    // No device ID set - reject all device-specific topics
183                    if topic.starts_with(&device_namespace_prefix) {
184                        return Err(MqttError::InvalidTopicName(format!(
185                            "Device-specific topics ({device_namespace_prefix}*) require device ID to be configured"
186                        )));
187                    }
188                    // Allow other service topics
189                }
190            }
191        }
192
193        // Check additional reserved prefixes
194        for prefix in &self.additional_reserved_prefixes {
195            if topic.starts_with(prefix) {
196                return Err(MqttError::InvalidTopicName(format!(
197                    "Topic '{topic}' uses reserved prefix '{prefix}'"
198                )));
199            }
200        }
201
202        Ok(())
203    }
204}
205
206impl TopicValidator for NamespaceValidator {
207    fn validate_topic_name(&self, topic: &str) -> Result<()> {
208        // First apply standard MQTT validation
209        validate_topic_name(topic)?;
210
211        // AWS IoT has a 256 character limit
212        if self.service_prefix == "$aws" && topic.len() > 256 {
213            return Err(MqttError::InvalidTopicName(
214                "AWS IoT topics must not exceed 256 characters".to_string(),
215            ));
216        }
217
218        // Then apply namespace-specific restrictions
219        self.validate_namespace_restrictions(topic)
220    }
221
222    fn validate_topic_filter(&self, filter: &str) -> Result<()> {
223        // First apply standard MQTT validation
224        validate_topic_filter(filter)?;
225
226        // AWS IoT has a 256 character limit
227        if self.service_prefix == "$aws" && filter.len() > 256 {
228            return Err(MqttError::InvalidTopicFilter(
229                "AWS IoT topic filters must not exceed 256 characters".to_string(),
230            ));
231        }
232
233        // For filters, we're more lenient with wildcards
234        // Only check if it's a literal reserved prefix (no wildcards)
235        if !filter.contains('+') && !filter.contains('#') {
236            self.validate_namespace_restrictions(filter)?;
237        }
238
239        Ok(())
240    }
241
242    fn is_reserved_topic(&self, topic: &str) -> bool {
243        // Check service topics
244        if self.is_service_topic(topic) {
245            let device_namespace_prefix =
246                format!("{}/{}/", self.service_prefix, self.device_namespace);
247
248            // If we have a device ID, only topics outside our device are reserved
249            if let Some(ref device_id) = self.device_id {
250                let device_prefix = format!("{device_namespace_prefix}{device_id}/");
251                return !topic.starts_with(&device_prefix)
252                    && topic.starts_with(&device_namespace_prefix);
253            }
254            // If no device ID, all device namespace topics are reserved
255            return topic.starts_with(&device_namespace_prefix);
256        }
257
258        // Check system topics
259        if Self::is_system_topic(topic) && !self.allow_system_topics {
260            return true;
261        }
262
263        // Check additional reserved prefixes
264        self.additional_reserved_prefixes
265            .iter()
266            .any(|prefix| topic.starts_with(prefix))
267    }
268
269    fn description(&self) -> &'static str {
270        "Namespace-based topic validator with hierarchical isolation"
271    }
272}
273
274#[cfg(test)]
275mod tests {
276    use super::*;
277
278    #[test]
279    fn test_namespace_validator_basic() {
280        let validator = NamespaceValidator::new("$aws", "thing");
281
282        // Regular topics should work
283        assert!(validator.validate_topic_name("sensor/temperature").is_ok());
284        assert!(validator.validate_topic_filter("sensor/+").is_ok());
285
286        // System topics should be rejected by default
287        assert!(validator
288            .validate_topic_name("$SYS/broker/version")
289            .is_err());
290        assert!(!validator.is_reserved_topic("regular/topic"));
291        assert!(validator.is_reserved_topic("$SYS/broker/version"));
292    }
293
294    #[test]
295    fn test_namespace_validator_with_device() {
296        let validator = NamespaceValidator::new("$aws", "things").with_device_id("my-device");
297
298        // Device-specific topics should work
299        assert!(validator
300            .validate_topic_name("$aws/things/my-device/shadow/update")
301            .is_ok());
302
303        // Other device topics should be rejected
304        assert!(validator
305            .validate_topic_name("$aws/things/other-device/shadow/update")
306            .is_err());
307
308        // General AWS topics should work
309        assert!(validator
310            .validate_topic_name("$aws/events/presence/connected/my-device")
311            .is_ok());
312    }
313
314    #[test]
315    fn test_namespace_validator_system_topics() {
316        let validator = NamespaceValidator::new("$aws", "thing").with_system_topics(true);
317
318        // System topics should now work
319        assert!(validator.validate_topic_name("$SYS/broker/version").is_ok());
320        assert!(!validator.is_reserved_topic("$SYS/broker/version"));
321    }
322
323    #[test]
324    fn test_namespace_validator_additional_prefixes() {
325        let validator = NamespaceValidator::new("$aws", "thing")
326            .with_reserved_prefix("company/")
327            .with_reserved_prefix("internal/");
328
329        // Additional reserved prefixes should be rejected
330        assert!(validator.validate_topic_name("company/secret").is_err());
331        assert!(validator.validate_topic_name("internal/admin").is_err());
332        assert!(validator.is_reserved_topic("company/secret"));
333
334        // Regular topics should work
335        assert!(validator.validate_topic_name("public/sensor").is_ok());
336    }
337
338    #[test]
339    fn test_different_cloud_providers() {
340        // Test AWS IoT
341        let aws = NamespaceValidator::aws_iot().with_device_id("sensor-123");
342        assert!(aws
343            .validate_topic_name("$aws/things/sensor-123/shadow/update")
344            .is_ok());
345        assert!(aws
346            .validate_topic_name("$aws/things/sensor-456/shadow/update")
347            .is_err());
348
349        // Test Azure IoT
350        let azure = NamespaceValidator::azure_iot().with_device_id("device-abc");
351        assert!(azure
352            .validate_topic_name("$azure/device/device-abc/telemetry")
353            .is_ok());
354        assert!(azure
355            .validate_topic_name("$azure/device/device-xyz/telemetry")
356            .is_err());
357
358        // Test custom enterprise
359        let enterprise = NamespaceValidator::new("$company", "asset").with_device_id("machine-001");
360        assert!(enterprise
361            .validate_topic_name("$company/asset/machine-001/status")
362            .is_ok());
363        assert!(enterprise
364            .validate_topic_name("$company/asset/machine-002/status")
365            .is_err());
366    }
367}