Skip to main content

mqtt5_protocol/
bridge.rs

1use crate::prelude::*;
2use crate::types::QoS;
3use crate::validation::topic_matches_filter;
4use serde::{Deserialize, Serialize};
5
6fn default_qos() -> QoS {
7    QoS::AtMostOnce
8}
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "lowercase")]
12pub enum BridgeDirection {
13    In,
14    Out,
15    Both,
16}
17
18impl BridgeDirection {
19    #[must_use]
20    pub fn allows_incoming(&self) -> bool {
21        matches!(self, Self::In | Self::Both)
22    }
23
24    #[must_use]
25    pub fn allows_outgoing(&self) -> bool {
26        matches!(self, Self::Out | Self::Both)
27    }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct TopicMappingCore {
32    pub pattern: String,
33    pub direction: BridgeDirection,
34    #[serde(default = "default_qos")]
35    pub qos: QoS,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub local_prefix: Option<String>,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub remote_prefix: Option<String>,
40}
41
42impl TopicMappingCore {
43    #[must_use]
44    pub fn new(pattern: impl Into<String>, direction: BridgeDirection) -> Self {
45        Self {
46            pattern: pattern.into(),
47            direction,
48            qos: QoS::AtMostOnce,
49            local_prefix: None,
50            remote_prefix: None,
51        }
52    }
53
54    #[must_use]
55    pub fn with_qos(mut self, qos: QoS) -> Self {
56        self.qos = qos;
57        self
58    }
59
60    #[must_use]
61    pub fn with_local_prefix(mut self, prefix: impl Into<String>) -> Self {
62        self.local_prefix = Some(prefix.into());
63        self
64    }
65
66    #[must_use]
67    pub fn with_remote_prefix(mut self, prefix: impl Into<String>) -> Self {
68        self.remote_prefix = Some(prefix.into());
69        self
70    }
71
72    #[must_use]
73    pub fn apply_local_prefix(&self, topic: &str) -> String {
74        match &self.local_prefix {
75            Some(prefix) => format!("{prefix}{topic}"),
76            None => topic.into(),
77        }
78    }
79
80    #[must_use]
81    pub fn apply_remote_prefix(&self, topic: &str) -> String {
82        match &self.remote_prefix {
83            Some(prefix) => format!("{prefix}{topic}"),
84            None => topic.into(),
85        }
86    }
87
88    #[must_use]
89    pub fn matches(&self, topic: &str, is_outgoing: bool) -> bool {
90        let direction_matches = match self.direction {
91            BridgeDirection::In => !is_outgoing,
92            BridgeDirection::Out => is_outgoing,
93            BridgeDirection::Both => true,
94        };
95        direction_matches && topic_matches_filter(topic, &self.pattern)
96    }
97}
98
/// Running counters and the most recent error for a bridge.
///
/// `Default` yields zeroed counters and no recorded error.
#[derive(Debug, Clone, Default)]
pub struct BridgeStats {
    /// Number of sends recorded via `record_sent`.
    pub messages_sent: u64,
    /// Number of receipts recorded via `record_received`.
    pub messages_received: u64,
    /// Number of connection attempts recorded via `record_connection_attempt`.
    pub connection_attempts: u32,
    /// Most recent error text, or `None` if none was recorded or it was cleared.
    pub last_error: Option<String>,
}
106
107impl BridgeStats {
108    #[must_use]
109    pub fn new() -> Self {
110        Self::default()
111    }
112
113    pub fn record_sent(&mut self) {
114        self.messages_sent += 1;
115    }
116
117    pub fn record_received(&mut self) {
118        self.messages_received += 1;
119    }
120
121    pub fn record_connection_attempt(&mut self) {
122        self.connection_attempts += 1;
123    }
124
125    pub fn record_error(&mut self, error: impl Into<String>) {
126        self.last_error = Some(error.into());
127    }
128
129    pub fn clear_error(&mut self) {
130        self.last_error = None;
131    }
132}
133
134pub struct ForwardingDecision {
135    pub transformed_topic: String,
136    pub qos: QoS,
137}
138
139#[must_use]
140pub fn evaluate_forwarding(
141    topic: &str,
142    mappings: &[TopicMappingCore],
143    is_outgoing: bool,
144) -> Option<ForwardingDecision> {
145    if topic.starts_with("$SYS/") {
146        return None;
147    }
148
149    for mapping in mappings {
150        if mapping.matches(topic, is_outgoing) {
151            let transformed = if is_outgoing {
152                mapping.apply_remote_prefix(topic)
153            } else {
154                mapping.apply_local_prefix(topic)
155            };
156
157            return Some(ForwardingDecision {
158                transformed_topic: transformed,
159                qos: mapping.qos,
160            });
161        }
162    }
163    None
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Each direction answers `allows_incoming` / `allows_outgoing` as expected.
    #[test]
    fn test_bridge_direction_helpers() {
        // (direction, allows_incoming, allows_outgoing)
        let expectations = [
            (BridgeDirection::In, true, false),
            (BridgeDirection::Out, false, true),
            (BridgeDirection::Both, true, true),
        ];

        for (direction, incoming, outgoing) in expectations {
            assert_eq!(direction.allows_incoming(), incoming);
            assert_eq!(direction.allows_outgoing(), outgoing);
        }
    }

    /// Configured prefixes are placed directly in front of the topic.
    #[test]
    fn test_topic_mapping_prefixes() {
        let rule = TopicMappingCore::new("sensors/#", BridgeDirection::Out)
            .with_qos(QoS::AtLeastOnce)
            .with_local_prefix("local/")
            .with_remote_prefix("remote/");

        let local = rule.apply_local_prefix("temp");
        let remote = rule.apply_remote_prefix("temp");

        assert_eq!(local, "local/temp");
        assert_eq!(remote, "remote/temp");
    }

    /// An `Out` rule matches only outgoing traffic on topics covered by its pattern.
    #[test]
    fn test_topic_mapping_matches() {
        let rule = TopicMappingCore::new("sensors/#", BridgeDirection::Out);

        // (topic, is_outgoing, expected)
        let cases = [
            ("sensors/temp", true, true),
            ("sensors/temp", false, false),
            ("actuators/valve", true, false),
        ];

        for (topic, is_outgoing, expected) in cases {
            assert_eq!(rule.matches(topic, is_outgoing), expected);
        }
    }

    /// A matching outgoing topic is prefixed and keeps the rule's QoS; `$SYS/` topics and
    /// traffic in the wrong direction are rejected.
    #[test]
    fn test_forwarding_decision() {
        let rules = vec![TopicMappingCore::new("sensors/#", BridgeDirection::Out)
            .with_qos(QoS::AtLeastOnce)
            .with_remote_prefix("bridge/")];

        let forwarded = evaluate_forwarding("sensors/temp", &rules, true)
            .expect("outgoing sensors topic should be forwarded");
        assert_eq!(forwarded.transformed_topic, "bridge/sensors/temp");
        assert_eq!(forwarded.qos, QoS::AtLeastOnce);

        let sys_topic = evaluate_forwarding("$SYS/broker/load", &rules, true);
        assert!(sys_topic.is_none());

        let wrong_direction = evaluate_forwarding("sensors/temp", &rules, false);
        assert!(wrong_direction.is_none());
    }

    /// Counters accumulate and the last error can be recorded and cleared.
    #[test]
    fn test_bridge_stats() {
        let mut stats = BridgeStats::new();
        for _ in 0..2 {
            stats.record_sent();
        }
        stats.record_received();
        stats.record_connection_attempt();
        stats.record_error("connection refused");

        assert_eq!(stats.messages_sent, 2);
        assert_eq!(stats.messages_received, 1);
        assert_eq!(stats.connection_attempts, 1);
        assert_eq!(stats.last_error.as_deref(), Some("connection refused"));

        stats.clear_error();
        assert!(stats.last_error.is_none());
    }
}
235}