hyperi_rustlib/transport/
routed.rs1use std::collections::HashMap;
52
53use super::error::{TransportError, TransportResult};
54use super::factory::AnySender;
55use super::traits::{TransportBase, TransportSender};
56use super::types::SendResult;
57
58pub struct RoutedSender {
64 routes: HashMap<String, AnySender>,
66 default: Option<AnySender>,
68 closed: std::sync::atomic::AtomicBool,
69}
70
71impl RoutedSender {
72 #[must_use]
74 pub fn new(routes: HashMap<String, AnySender>, default: Option<AnySender>) -> Self {
75 Self {
76 routes,
77 default,
78 closed: std::sync::atomic::AtomicBool::new(false),
79 }
80 }
81
82 pub async fn from_route_configs(
86 routes: HashMap<String, super::TransportConfig>,
87 default_config: Option<super::TransportConfig>,
88 ) -> TransportResult<Self> {
89 let mut senders = HashMap::with_capacity(routes.len());
90 for (key, config) in routes {
91 let sender = AnySender::from_transport_config(&config).await?;
92 senders.insert(key, sender);
93 }
94
95 let default = match default_config {
96 Some(cfg) => Some(AnySender::from_transport_config(&cfg).await?),
97 None => None,
98 };
99
100 Ok(Self::new(senders, default))
101 }
102
103 #[must_use]
105 pub fn route_keys(&self) -> Vec<&str> {
106 self.routes.keys().map(String::as_str).collect()
107 }
108
109 #[must_use]
111 pub fn has_route(&self, key: &str) -> bool {
112 self.routes.contains_key(key)
113 }
114
115 #[must_use]
117 pub fn has_default(&self) -> bool {
118 self.default.is_some()
119 }
120
121 fn resolve(&self, key: &str) -> Option<(&str, &AnySender)> {
125 if let Some((name, sender)) = self.routes.get_key_value(key) {
126 Some((name.as_str(), sender))
127 } else {
128 self.default.as_ref().map(|s| ("default", s))
129 }
130 }
131}
132
133impl TransportBase for RoutedSender {
134 async fn close(&self) -> TransportResult<()> {
135 self.closed
136 .store(true, std::sync::atomic::Ordering::Relaxed);
137 for sender in self.routes.values() {
139 sender.close().await?;
140 }
141 if let Some(ref default) = self.default {
142 default.close().await?;
143 }
144 Ok(())
145 }
146
147 fn is_healthy(&self) -> bool {
148 if self.closed.load(std::sync::atomic::Ordering::Relaxed) {
149 return false;
150 }
151 let routes_healthy = self.routes.values().all(|s| s.is_healthy());
153 let default_healthy = self.default.as_ref().is_none_or(|s| s.is_healthy());
154 routes_healthy && default_healthy
155 }
156
157 fn name(&self) -> &'static str {
158 "routed"
159 }
160}
161
162impl TransportSender for RoutedSender {
163 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
164 if self.closed.load(std::sync::atomic::Ordering::Relaxed) {
165 return SendResult::Fatal(TransportError::Closed);
166 }
167
168 let Some((route_name, sender)) = self.resolve(key) else {
169 return SendResult::Fatal(TransportError::Config(format!(
170 "no route configured for key '{key}' and no default sender"
171 )));
172 };
173 #[cfg(feature = "metrics")]
177 metrics::counter!(
178 "dfe_transport_sent_total",
179 "transport" => "routed",
180 "route" => route_name.to_string()
181 )
182 .increment(1);
183 #[cfg(not(feature = "metrics"))]
184 let _ = route_name;
185
186 sender.send(key, payload).await
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193
194 #[cfg(feature = "transport-memory")]
195 use crate::transport::memory::{MemoryConfig, MemoryTransport};
196
197 #[cfg(feature = "transport-memory")]
198 fn make_memory_sender() -> AnySender {
199 AnySender::Memory(
200 MemoryTransport::new(&MemoryConfig::default())
201 .expect("memory transport with valid config must construct"),
202 )
203 }
204
205 #[tokio::test]
206 #[cfg(feature = "transport-memory")]
207 async fn routes_to_correct_sender() {
208 let mut route_map = HashMap::new();
209 route_map.insert("events.land".into(), make_memory_sender());
210 route_map.insert("events.load".into(), make_memory_sender());
211
212 let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
213
214 let result_land = sender
215 .send("events.land", bytes::Bytes::from_static(b"land-payload"))
216 .await;
217 assert!(result_land.is_ok());
218
219 let result_load = sender
220 .send("events.load", bytes::Bytes::from_static(b"load-payload"))
221 .await;
222 assert!(result_load.is_ok());
223
224 let result_default = sender
226 .send("unknown.key", bytes::Bytes::from_static(b"default-payload"))
227 .await;
228 assert!(result_default.is_ok());
229
230 assert!(sender.is_healthy());
231 assert_eq!(sender.name(), "routed");
232 }
233
234 #[tokio::test]
235 async fn no_route_no_default_returns_fatal() {
236 let sender = RoutedSender::new(HashMap::new(), None);
237
238 let result = sender
239 .send("unknown", bytes::Bytes::from_static(b"payload"))
240 .await;
241 assert!(result.is_fatal());
242 }
243
244 #[tokio::test]
245 #[cfg(feature = "transport-memory")]
246 async fn close_propagates_to_all_senders() {
247 let mut route_map = HashMap::new();
248 route_map.insert("a".into(), make_memory_sender());
249 let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
250
251 assert!(sender.is_healthy());
252 sender.close().await.unwrap();
253 assert!(!sender.is_healthy());
254 }
255
256 #[test]
257 fn route_keys_and_has_route() {
258 let sender = RoutedSender::new(HashMap::new(), None);
259 assert!(sender.route_keys().is_empty());
260 assert!(!sender.has_route("anything"));
261 assert!(!sender.has_default());
262 }
263
264 #[tokio::test]
265 async fn send_after_close_returns_fatal() {
266 let sender = RoutedSender::new(HashMap::new(), None);
267 sender.close().await.unwrap();
268
269 let result = sender
270 .send("key", bytes::Bytes::from_static(b"payload"))
271 .await;
272 assert!(result.is_fatal());
273 }
274
275 #[test]
279 #[cfg(feature = "transport-memory")]
280 fn resolve_returns_route_name_not_message_key() {
281 let mut route_map = HashMap::new();
282 route_map.insert("events.land".into(), make_memory_sender());
283 let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
284
285 let (name, _) = sender.resolve("events.land").unwrap();
287 assert_eq!(name, "events.land");
288
289 let (name, _) = sender.resolve("arbitrary-user-key-12345").unwrap();
292 assert_eq!(name, "default");
293 }
294}