use crate::error::{MqttError, Result};
use crate::prelude::{format, String, ToString, Vec};
use crate::validation::{validate_topic_filter, validate_topic_name, TopicValidator};
/// Topic validator configuration that restricts clients to a per-device
/// slice of a service-owned topic hierarchy.
///
/// Topics are considered service topics when they start with
/// `<service_prefix>/`, and device topics when they start with
/// `<service_prefix>/<device_namespace>/`.
#[derive(Debug, Clone)]
pub struct NamespaceValidator {
    /// First topic level owned by the service (for example `$aws`).
    pub service_prefix: String,
    /// Topic level directly below the service prefix that groups device topics.
    pub device_namespace: String,
    /// Identifier of the device this validator is scoped to, if any.
    pub device_id: Option<String>,
    /// When `false`, topics beginning with `$SYS/` are rejected.
    pub allow_system_topics: bool,
    /// Extra topic prefixes that are always rejected.
    pub additional_reserved_prefixes: Vec<String>,
}
impl NamespaceValidator {
    /// Creates a validator for the given service prefix and device namespace.
    ///
    /// The new validator has no device ID, rejects `$SYS/` topics and has no
    /// additional reserved prefixes; use the `with_*` builders to change that.
    #[must_use]
    pub fn new(service_prefix: impl Into<String>, device_namespace: impl Into<String>) -> Self {
        Self {
            service_prefix: service_prefix.into(),
            device_namespace: device_namespace.into(),
            device_id: None,
            allow_system_topics: false,
            additional_reserved_prefixes: Vec::new(),
        }
    }

    /// Preset with service prefix `$aws` and device namespace `things`.
    #[must_use]
    pub fn aws_iot() -> Self {
        Self::new("$aws", "things")
    }

    /// Preset with service prefix `$azure` and device namespace `device`.
    #[must_use]
    pub fn azure_iot() -> Self {
        Self::new("$azure", "device")
    }

    /// Preset with service prefix `$gcp` and device namespace `device`.
    #[must_use]
    pub fn google_cloud_iot() -> Self {
        Self::new("$gcp", "device")
    }

    /// Scopes the validator to a single device.
    #[must_use]
    pub fn with_device_id(mut self, device_id: impl Into<String>) -> Self {
        self.device_id = Some(device_id.into());
        self
    }

    /// Enables or disables access to `$SYS/` topics.
    #[must_use]
    pub fn with_system_topics(mut self, allow: bool) -> Self {
        self.allow_system_topics = allow;
        self
    }

    /// Adds a topic prefix that is always rejected.
    ///
    /// The prefix is matched with a plain `starts_with`, so include the
    /// trailing `/` when a whole topic level is meant.
    #[must_use]
    pub fn with_reserved_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.additional_reserved_prefixes.push(prefix.into());
        self
    }

    /// Returns `true` when `topic` starts with `<service_prefix>/`.
    fn is_service_topic(&self, topic: &str) -> bool {
        // Equivalent to `starts_with(format!("{prefix}/"))` without allocating.
        matches!(
            topic.strip_prefix(self.service_prefix.as_str()),
            Some(rest) if rest.starts_with('/')
        )
    }

    /// Returns `true` when `topic` starts with `$SYS/`.
    fn is_system_topic(topic: &str) -> bool {
        topic.starts_with("$SYS/")
    }

    /// Applies every namespace rule to `topic`.
    ///
    /// Checks run in this order: `$SYS/` topics, service-specific device
    /// isolation, then the additional reserved prefixes. The reserved-prefix
    /// check always runs, including for topics that passed the device
    /// isolation rules.
    ///
    /// # Errors
    ///
    /// Returns [`MqttError::InvalidTopicName`] describing the first rule that
    /// `topic` violates.
    fn validate_namespace_restrictions(&self, topic: &str) -> Result<()> {
        if Self::is_system_topic(topic) && !self.allow_system_topics {
            return Err(MqttError::InvalidTopicName(
                "System topics ($SYS/*) are not allowed".to_string(),
            ));
        }

        if self.is_service_topic(topic) {
            if self.service_prefix == "$aws" {
                self.check_aws_restrictions(topic)?;
            } else {
                self.check_device_namespace(topic)?;
            }
        }

        // Deliberately evaluated after the service rules rather than being
        // skipped by them: a reserved prefix may live under the service prefix.
        match self
            .additional_reserved_prefixes
            .iter()
            .find(|prefix| topic.starts_with(prefix.as_str()))
        {
            Some(prefix) => Err(MqttError::InvalidTopicName(format!(
                "Topic '{topic}' uses reserved prefix '{prefix}'"
            ))),
            None => Ok(()),
        }
    }

    /// AWS-specific rules for a topic already known to start with `$aws/`.
    ///
    /// Rejects the certificate and provisioning-template hierarchies outright.
    /// Under `$aws/things/` only the configured device's `shadow/update`,
    /// `shadow/delete` and `jobs/` prefixes are accepted. The `things` level is
    /// hard-coded here and does not follow `device_namespace`.
    fn check_aws_restrictions(&self, topic: &str) -> Result<()> {
        const AWS_RESERVED_PREFIXES: [&str; 2] =
            ["$aws/certificates/", "$aws/provisioning-templates/"];
        // Matched against what follows `$aws/things/<device_id>`.
        const AWS_DEVICE_TOPIC_PREFIXES: [&str; 3] = ["/shadow/update", "/shadow/delete", "/jobs/"];

        let reserved_error = || {
            MqttError::InvalidTopicName(format!(
                "Cannot publish to reserved AWS IoT topic: {topic}"
            ))
        };

        if AWS_RESERVED_PREFIXES
            .iter()
            .any(|reserved| topic.starts_with(*reserved))
        {
            return Err(reserved_error());
        }

        let after_things = match topic.strip_prefix("$aws/things/") {
            Some(rest) => rest,
            None => return Ok(()),
        };

        let device_id = match self.device_id.as_deref() {
            Some(id) => id,
            None => {
                return Err(MqttError::InvalidTopicName(
                    "Device-specific topics require device ID to be configured".to_string(),
                ))
            }
        };

        let permitted = match after_things.strip_prefix(device_id) {
            Some(tail) => AWS_DEVICE_TOPIC_PREFIXES
                .iter()
                .any(|allowed| tail.starts_with(*allowed)),
            None => false,
        };

        if permitted {
            Ok(())
        } else {
            Err(reserved_error())
        }
    }

    /// Device isolation for non-AWS service topics.
    ///
    /// Topics outside `<service_prefix>/<device_namespace>/` are not restricted
    /// here. Topics inside it must belong to the configured device; without a
    /// configured device they are all rejected.
    fn check_device_namespace(&self, topic: &str) -> Result<()> {
        let device_namespace_prefix =
            format!("{}/{}/", self.service_prefix, self.device_namespace);

        if !topic.starts_with(device_namespace_prefix.as_str()) {
            return Ok(());
        }

        match self.device_id.as_deref() {
            Some(device_id) => {
                let device_prefix = format!("{device_namespace_prefix}{device_id}/");
                if topic.starts_with(device_prefix.as_str()) {
                    Ok(())
                } else {
                    Err(MqttError::InvalidTopicName(format!(
                        "Topic '{topic}' is for a different device. Only topics under '{device_prefix}' are allowed"
                    )))
                }
            }
            None => Err(MqttError::InvalidTopicName(format!(
                "Device-specific topics ({device_namespace_prefix}*) require device ID to be configured"
            ))),
        }
    }
}
impl TopicValidator for NamespaceValidator {
    /// Validates a topic name used for publishing.
    ///
    /// Runs the generic `validate_topic_name` check first, then the AWS length
    /// limit (only when the service prefix is `$aws`), then the namespace
    /// restrictions.
    ///
    /// # Errors
    ///
    /// Returns the error of the first check that fails.
    fn validate_topic_name(&self, topic: &str) -> Result<()> {
        validate_topic_name(topic)?;
        // `len()` counts UTF-8 bytes, not characters.
        if self.service_prefix == "$aws" && topic.len() > 256 {
            return Err(MqttError::InvalidTopicName(
                "AWS IoT topics must not exceed 256 characters".to_string(),
            ));
        }
        self.validate_namespace_restrictions(topic)
    }

    /// Validates a topic filter used for subscribing.
    ///
    /// Namespace restrictions are applied only to filters without `+` or `#`;
    /// wildcard filters are accepted once the generic filter check and the AWS
    /// length limit pass.
    ///
    /// # Errors
    ///
    /// Returns the error of the first check that fails.
    fn validate_topic_filter(&self, filter: &str) -> Result<()> {
        validate_topic_filter(filter)?;
        // `len()` counts UTF-8 bytes, not characters.
        if self.service_prefix == "$aws" && filter.len() > 256 {
            return Err(MqttError::InvalidTopicFilter(
                "AWS IoT topic filters must not exceed 256 characters".to_string(),
            ));
        }

        // NOTE(review): a wildcard filter such as `$SYS/#` is not checked
        // against the namespace rules at all - confirm this is intended.
        let has_wildcard = filter.contains(|c: char| matches!(c, '+' | '#'));
        if has_wildcard {
            return Ok(());
        }
        self.validate_namespace_restrictions(filter)
    }

    /// Reports whether `topic` lies in a reserved part of the topic space.
    ///
    /// A topic is reserved when it is inside the device namespace but does not
    /// belong to the configured device, when it is a `$SYS/` topic and those
    /// are disallowed, or when it starts with one of the additional reserved
    /// prefixes. The reserved prefixes are consulted for service topics as well.
    fn is_reserved_topic(&self, topic: &str) -> bool {
        if self.is_service_topic(topic) {
            let device_namespace_prefix =
                format!("{}/{}/", self.service_prefix, self.device_namespace);
            if let Some(rest) = topic.strip_prefix(device_namespace_prefix.as_str()) {
                // The topic is ours only if it continues with `<device_id>/`.
                let owned_by_device = matches!(
                    self.device_id
                        .as_deref()
                        .and_then(|device_id| rest.strip_prefix(device_id)),
                    Some(tail) if tail.starts_with('/')
                );
                if !owned_by_device {
                    return true;
                }
            }
            // Fall through: a service topic can still match a reserved prefix.
        }

        if Self::is_system_topic(topic) && !self.allow_system_topics {
            return true;
        }

        self.additional_reserved_prefixes
            .iter()
            .any(|prefix| topic.starts_with(prefix.as_str()))
    }

    /// Short human-readable description of this validator.
    fn description(&self) -> &'static str {
        "Namespace-based topic validator with hierarchical isolation"
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Whether `validator` accepts `topic` as a topic name.
    fn accepts(validator: &NamespaceValidator, topic: &str) -> bool {
        validator.validate_topic_name(topic).is_ok()
    }

    /// Whether `validator` rejects `topic` as a topic name.
    fn rejects(validator: &NamespaceValidator, topic: &str) -> bool {
        validator.validate_topic_name(topic).is_err()
    }

    /// Plain topics pass, `$SYS/` topics are rejected and reported as reserved.
    #[test]
    fn test_namespace_validator_basic() {
        let subject = NamespaceValidator::new("$aws", "thing");

        assert!(accepts(&subject, "sensor/temperature"));
        assert!(subject.validate_topic_filter("sensor/+").is_ok());
        assert!(rejects(&subject, "$SYS/broker/version"));

        assert!(!subject.is_reserved_topic("regular/topic"));
        assert!(subject.is_reserved_topic("$SYS/broker/version"));
    }

    /// A configured device may use its own shadow topic but not another device's.
    #[test]
    fn test_namespace_validator_with_device() {
        let subject = NamespaceValidator::new("$aws", "things").with_device_id("my-device");

        assert!(accepts(&subject, "$aws/things/my-device/shadow/update"));
        assert!(rejects(&subject, "$aws/things/other-device/shadow/update"));
        assert!(accepts(&subject, "$aws/events/presence/connected/my-device"));
    }

    /// Opting in to system topics lifts the `$SYS/` restriction.
    #[test]
    fn test_namespace_validator_system_topics() {
        let subject = NamespaceValidator::new("$aws", "thing").with_system_topics(true);

        assert!(accepts(&subject, "$SYS/broker/version"));
        assert!(!subject.is_reserved_topic("$SYS/broker/version"));
    }

    /// Additional reserved prefixes are rejected and reported as reserved.
    #[test]
    fn test_namespace_validator_additional_prefixes() {
        let subject = NamespaceValidator::new("$aws", "thing")
            .with_reserved_prefix("company/")
            .with_reserved_prefix("internal/");

        for blocked in ["company/secret", "internal/admin"].iter().copied() {
            assert!(rejects(&subject, blocked));
        }
        assert!(subject.is_reserved_topic("company/secret"));
        assert!(accepts(&subject, "public/sensor"));
    }

    /// Device isolation works the same way for each provider preset and for a
    /// custom prefix.
    #[test]
    fn test_different_cloud_providers() {
        let cases = [
            (
                NamespaceValidator::aws_iot().with_device_id("sensor-123"),
                "$aws/things/sensor-123/shadow/update",
                "$aws/things/sensor-456/shadow/update",
            ),
            (
                NamespaceValidator::azure_iot().with_device_id("device-abc"),
                "$azure/device/device-abc/telemetry",
                "$azure/device/device-xyz/telemetry",
            ),
            (
                NamespaceValidator::new("$company", "asset").with_device_id("machine-001"),
                "$company/asset/machine-001/status",
                "$company/asset/machine-002/status",
            ),
        ];

        for (subject, own_topic, foreign_topic) in cases.iter() {
            assert!(accepts(subject, own_topic));
            assert!(rejects(subject, foreign_topic));
        }
    }
}