1use crate::prelude::*;
2use crate::types::QoS;
3use crate::validation::topic_matches_filter;
4use serde::{Deserialize, Serialize};
5
6fn default_qos() -> QoS {
7 QoS::AtMostOnce
8}
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "lowercase")]
12pub enum BridgeDirection {
13 In,
14 Out,
15 Both,
16}
17
18impl BridgeDirection {
19 #[must_use]
20 pub fn allows_incoming(&self) -> bool {
21 matches!(self, Self::In | Self::Both)
22 }
23
24 #[must_use]
25 pub fn allows_outgoing(&self) -> bool {
26 matches!(self, Self::Out | Self::Both)
27 }
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct TopicMappingCore {
32 pub pattern: String,
33 pub direction: BridgeDirection,
34 #[serde(default = "default_qos")]
35 pub qos: QoS,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub local_prefix: Option<String>,
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub remote_prefix: Option<String>,
40}
41
42impl TopicMappingCore {
43 #[must_use]
44 pub fn new(pattern: impl Into<String>, direction: BridgeDirection) -> Self {
45 Self {
46 pattern: pattern.into(),
47 direction,
48 qos: QoS::AtMostOnce,
49 local_prefix: None,
50 remote_prefix: None,
51 }
52 }
53
54 #[must_use]
55 pub fn with_qos(mut self, qos: QoS) -> Self {
56 self.qos = qos;
57 self
58 }
59
60 #[must_use]
61 pub fn with_local_prefix(mut self, prefix: impl Into<String>) -> Self {
62 self.local_prefix = Some(prefix.into());
63 self
64 }
65
66 #[must_use]
67 pub fn with_remote_prefix(mut self, prefix: impl Into<String>) -> Self {
68 self.remote_prefix = Some(prefix.into());
69 self
70 }
71
72 #[must_use]
73 pub fn apply_local_prefix(&self, topic: &str) -> String {
74 match &self.local_prefix {
75 Some(prefix) => format!("{prefix}{topic}"),
76 None => topic.into(),
77 }
78 }
79
80 #[must_use]
81 pub fn apply_remote_prefix(&self, topic: &str) -> String {
82 match &self.remote_prefix {
83 Some(prefix) => format!("{prefix}{topic}"),
84 None => topic.into(),
85 }
86 }
87
88 #[must_use]
89 pub fn matches(&self, topic: &str, is_outgoing: bool) -> bool {
90 let direction_matches = match self.direction {
91 BridgeDirection::In => !is_outgoing,
92 BridgeDirection::Out => is_outgoing,
93 BridgeDirection::Both => true,
94 };
95 direction_matches && topic_matches_filter(topic, &self.pattern)
96 }
97}
98
99#[derive(Debug, Clone, Default)]
100pub struct BridgeStats {
101 pub messages_sent: u64,
102 pub messages_received: u64,
103 pub connection_attempts: u32,
104 pub last_error: Option<String>,
105}
106
107impl BridgeStats {
108 #[must_use]
109 pub fn new() -> Self {
110 Self::default()
111 }
112
113 pub fn record_sent(&mut self) {
114 self.messages_sent += 1;
115 }
116
117 pub fn record_received(&mut self) {
118 self.messages_received += 1;
119 }
120
121 pub fn record_connection_attempt(&mut self) {
122 self.connection_attempts += 1;
123 }
124
125 pub fn record_error(&mut self, error: impl Into<String>) {
126 self.last_error = Some(error.into());
127 }
128
129 pub fn clear_error(&mut self) {
130 self.last_error = None;
131 }
132}
133
134pub struct ForwardingDecision {
135 pub transformed_topic: String,
136 pub qos: QoS,
137}
138
139#[must_use]
140pub fn evaluate_forwarding(
141 topic: &str,
142 mappings: &[TopicMappingCore],
143 is_outgoing: bool,
144) -> Option<ForwardingDecision> {
145 if topic.starts_with("$SYS/") {
146 return None;
147 }
148
149 for mapping in mappings {
150 if mapping.matches(topic, is_outgoing) {
151 let transformed = if is_outgoing {
152 mapping.apply_remote_prefix(topic)
153 } else {
154 mapping.apply_local_prefix(topic)
155 };
156
157 return Some(ForwardingDecision {
158 transformed_topic: transformed,
159 qos: mapping.qos,
160 });
161 }
162 }
163 None
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169
170 #[test]
171 fn test_bridge_direction_helpers() {
172 assert!(BridgeDirection::In.allows_incoming());
173 assert!(!BridgeDirection::In.allows_outgoing());
174
175 assert!(!BridgeDirection::Out.allows_incoming());
176 assert!(BridgeDirection::Out.allows_outgoing());
177
178 assert!(BridgeDirection::Both.allows_incoming());
179 assert!(BridgeDirection::Both.allows_outgoing());
180 }
181
182 #[test]
183 fn test_topic_mapping_prefixes() {
184 let mapping = TopicMappingCore::new("sensors/#", BridgeDirection::Out)
185 .with_qos(QoS::AtLeastOnce)
186 .with_local_prefix("local/")
187 .with_remote_prefix("remote/");
188
189 assert_eq!(mapping.apply_local_prefix("temp"), "local/temp");
190 assert_eq!(mapping.apply_remote_prefix("temp"), "remote/temp");
191 }
192
193 #[test]
194 fn test_topic_mapping_matches() {
195 let mapping = TopicMappingCore::new("sensors/#", BridgeDirection::Out);
196
197 assert!(mapping.matches("sensors/temp", true));
198 assert!(!mapping.matches("sensors/temp", false));
199 assert!(!mapping.matches("actuators/valve", true));
200 }
201
202 #[test]
203 fn test_forwarding_decision() {
204 let mappings = vec![TopicMappingCore::new("sensors/#", BridgeDirection::Out)
205 .with_qos(QoS::AtLeastOnce)
206 .with_remote_prefix("bridge/")];
207
208 let decision = evaluate_forwarding("sensors/temp", &mappings, true);
209 assert!(decision.is_some());
210 let d = decision.unwrap();
211 assert_eq!(d.transformed_topic, "bridge/sensors/temp");
212 assert_eq!(d.qos, QoS::AtLeastOnce);
213
214 assert!(evaluate_forwarding("$SYS/broker/load", &mappings, true).is_none());
215 assert!(evaluate_forwarding("sensors/temp", &mappings, false).is_none());
216 }
217
218 #[test]
219 fn test_bridge_stats() {
220 let mut stats = BridgeStats::new();
221 stats.record_sent();
222 stats.record_sent();
223 stats.record_received();
224 stats.record_connection_attempt();
225 stats.record_error("connection refused");
226
227 assert_eq!(stats.messages_sent, 2);
228 assert_eq!(stats.messages_received, 1);
229 assert_eq!(stats.connection_attempts, 1);
230 assert_eq!(stats.last_error, Some("connection refused".into()));
231
232 stats.clear_error();
233 assert!(stats.last_error.is_none());
234 }
235}