hyperi_rustlib/transport/
routed.rs1use std::collections::HashMap;
52
53use super::error::{TransportError, TransportResult};
54use super::factory::AnySender;
55use super::traits::{TransportBase, TransportSender};
56use super::types::SendResult;
57
58pub struct RoutedSender {
64 routes: HashMap<String, AnySender>,
66 default: Option<AnySender>,
68 closed: std::sync::atomic::AtomicBool,
69}
70
71impl RoutedSender {
72 pub fn new(routes: HashMap<String, AnySender>, default: Option<AnySender>) -> Self {
74 Self {
75 routes,
76 default,
77 closed: std::sync::atomic::AtomicBool::new(false),
78 }
79 }
80
81 pub async fn from_route_configs(
85 routes: HashMap<String, super::TransportConfig>,
86 default_config: Option<super::TransportConfig>,
87 ) -> TransportResult<Self> {
88 let mut senders = HashMap::with_capacity(routes.len());
89 for (key, config) in routes {
90 let sender = AnySender::from_transport_config(&config).await?;
91 senders.insert(key, sender);
92 }
93
94 let default = match default_config {
95 Some(cfg) => Some(AnySender::from_transport_config(&cfg).await?),
96 None => None,
97 };
98
99 Ok(Self::new(senders, default))
100 }
101
102 #[must_use]
104 pub fn route_keys(&self) -> Vec<&str> {
105 self.routes.keys().map(String::as_str).collect()
106 }
107
108 #[must_use]
110 pub fn has_route(&self, key: &str) -> bool {
111 self.routes.contains_key(key)
112 }
113
114 #[must_use]
116 pub fn has_default(&self) -> bool {
117 self.default.is_some()
118 }
119
120 fn resolve(&self, key: &str) -> Option<(&str, &AnySender)> {
124 if let Some((name, sender)) = self.routes.get_key_value(key) {
125 Some((name.as_str(), sender))
126 } else {
127 self.default.as_ref().map(|s| ("default", s))
128 }
129 }
130}
131
132impl TransportBase for RoutedSender {
133 async fn close(&self) -> TransportResult<()> {
134 self.closed
135 .store(true, std::sync::atomic::Ordering::Relaxed);
136 for sender in self.routes.values() {
138 sender.close().await?;
139 }
140 if let Some(ref default) = self.default {
141 default.close().await?;
142 }
143 Ok(())
144 }
145
146 fn is_healthy(&self) -> bool {
147 if self.closed.load(std::sync::atomic::Ordering::Relaxed) {
148 return false;
149 }
150 let routes_healthy = self.routes.values().all(|s| s.is_healthy());
152 let default_healthy = self.default.as_ref().is_none_or(|s| s.is_healthy());
153 routes_healthy && default_healthy
154 }
155
156 fn name(&self) -> &'static str {
157 "routed"
158 }
159}
160
161impl TransportSender for RoutedSender {
162 async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
163 if self.closed.load(std::sync::atomic::Ordering::Relaxed) {
164 return SendResult::Fatal(TransportError::Closed);
165 }
166
167 let Some((route_name, sender)) = self.resolve(key) else {
168 return SendResult::Fatal(TransportError::Config(format!(
169 "no route configured for key '{key}' and no default sender"
170 )));
171 };
172 #[cfg(feature = "metrics")]
176 metrics::counter!(
177 "dfe_transport_sent_total",
178 "transport" => "routed",
179 "route" => route_name.to_string()
180 )
181 .increment(1);
182 #[cfg(not(feature = "metrics"))]
183 let _ = route_name;
184
185 sender.send(key, payload).await
186 }
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192
193 #[cfg(feature = "transport-memory")]
194 use crate::transport::memory::{MemoryConfig, MemoryTransport};
195
196 #[cfg(feature = "transport-memory")]
197 fn make_memory_sender() -> AnySender {
198 AnySender::Memory(
199 MemoryTransport::new(&MemoryConfig::default())
200 .expect("memory transport with valid config must construct"),
201 )
202 }
203
204 #[tokio::test]
205 #[cfg(feature = "transport-memory")]
206 async fn routes_to_correct_sender() {
207 let mut route_map = HashMap::new();
208 route_map.insert("events.land".into(), make_memory_sender());
209 route_map.insert("events.load".into(), make_memory_sender());
210
211 let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
212
213 let result_land = sender.send("events.land", b"land-payload").await;
214 assert!(result_land.is_ok());
215
216 let result_load = sender.send("events.load", b"load-payload").await;
217 assert!(result_load.is_ok());
218
219 let result_default = sender.send("unknown.key", b"default-payload").await;
221 assert!(result_default.is_ok());
222
223 assert!(sender.is_healthy());
224 assert_eq!(sender.name(), "routed");
225 }
226
227 #[tokio::test]
228 async fn no_route_no_default_returns_fatal() {
229 let sender = RoutedSender::new(HashMap::new(), None);
230
231 let result = sender.send("unknown", b"payload").await;
232 assert!(result.is_fatal());
233 }
234
235 #[tokio::test]
236 #[cfg(feature = "transport-memory")]
237 async fn close_propagates_to_all_senders() {
238 let mut route_map = HashMap::new();
239 route_map.insert("a".into(), make_memory_sender());
240 let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
241
242 assert!(sender.is_healthy());
243 sender.close().await.unwrap();
244 assert!(!sender.is_healthy());
245 }
246
247 #[test]
248 fn route_keys_and_has_route() {
249 let sender = RoutedSender::new(HashMap::new(), None);
250 assert!(sender.route_keys().is_empty());
251 assert!(!sender.has_route("anything"));
252 assert!(!sender.has_default());
253 }
254
255 #[tokio::test]
256 async fn send_after_close_returns_fatal() {
257 let sender = RoutedSender::new(HashMap::new(), None);
258 sender.close().await.unwrap();
259
260 let result = sender.send("key", b"payload").await;
261 assert!(result.is_fatal());
262 }
263
264 #[test]
268 #[cfg(feature = "transport-memory")]
269 fn resolve_returns_route_name_not_message_key() {
270 let mut route_map = HashMap::new();
271 route_map.insert("events.land".into(), make_memory_sender());
272 let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
273
274 let (name, _) = sender.resolve("events.land").unwrap();
276 assert_eq!(name, "events.land");
277
278 let (name, _) = sender.resolve("arbitrary-user-key-12345").unwrap();
281 assert_eq!(name, "default");
282 }
283}