Skip to main content

hyperi_rustlib/transport/
routed.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/routed.rs
3// Purpose:   Per-key routing transport for data originators (receiver, fetcher)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Per-key routing transport for data originators.
10//!
11//! Routes `send(key, payload)` to different transport backends based on the
12//! key. Used by dfe-receiver and dfe-fetcher where data-based routing
13//! determines the destination (topic, endpoint, stream).
14//!
15//! All other DFE stages (transforms, loader, archiver) use simple 1:1
16//! transports and do NOT need this.
17//!
18//! # Config
19//!
20//! ```yaml
21//! transport:
22//!   output:
23//!     type: routed
24//!     default: kafka
25//!     routes:
26//!       events.land:
27//!         type: grpc
28//!         grpc:
29//!           endpoint: "http://loader-land:6000"
30//!       events.load:
31//!         type: kafka
32//!       audit.land:
33//!         type: grpc
34//!         grpc:
35//!           endpoint: "http://archiver:6000"
36//!     kafka:
37//!       brokers: ["kafka:9092"]
38//! ```
39//!
40//! # Usage
41//!
42//! ```rust,ignore
43//! let sender = RoutedSender::from_config("transport.output").await?;
44//! // Routes to different backends based on key
45//! sender.send("events.land", payload).await;  // → gRPC to loader-land
46//! sender.send("events.load", payload).await;  // → Kafka topic
47//! sender.send("audit.land", payload).await;   // → gRPC to archiver
48//! sender.send("unknown", payload).await;      // → default (Kafka)
49//! ```
50
51use std::collections::HashMap;
52
53use super::error::{TransportError, TransportResult};
54use super::factory::AnySender;
55use super::traits::{TransportBase, TransportSender};
56use super::types::SendResult;
57
58/// A routing transport that dispatches `send()` to different backends
59/// based on the key parameter.
60///
61/// Used by dfe-receiver and dfe-fetcher (data originators) where
62/// data-based routing determines the destination.
63pub struct RoutedSender {
64    /// Per-key route overrides.
65    routes: HashMap<String, AnySender>,
66    /// Default sender for keys not in the routes map.
67    default: Option<AnySender>,
68    closed: std::sync::atomic::AtomicBool,
69}
70
71impl RoutedSender {
72    /// Create a new routed sender with explicit routes and optional default.
73    pub fn new(routes: HashMap<String, AnySender>, default: Option<AnySender>) -> Self {
74        Self {
75            routes,
76            default,
77            closed: std::sync::atomic::AtomicBool::new(false),
78        }
79    }
80
81    /// Create from a map of key → `TransportConfig` plus a default config.
82    ///
83    /// Each route gets its own `AnySender` created from the corresponding config.
84    pub async fn from_route_configs(
85        routes: HashMap<String, super::TransportConfig>,
86        default_config: Option<super::TransportConfig>,
87    ) -> TransportResult<Self> {
88        let mut senders = HashMap::with_capacity(routes.len());
89        for (key, config) in routes {
90            let sender = AnySender::from_transport_config(&config).await?;
91            senders.insert(key, sender);
92        }
93
94        let default = match default_config {
95            Some(cfg) => Some(AnySender::from_transport_config(&cfg).await?),
96            None => None,
97        };
98
99        Ok(Self::new(senders, default))
100    }
101
102    /// Get the list of configured route keys.
103    #[must_use]
104    pub fn route_keys(&self) -> Vec<&str> {
105        self.routes.keys().map(String::as_str).collect()
106    }
107
108    /// Check if a specific route key is configured.
109    #[must_use]
110    pub fn has_route(&self, key: &str) -> bool {
111        self.routes.contains_key(key)
112    }
113
114    /// Check if a default fallback sender is configured.
115    #[must_use]
116    pub fn has_default(&self) -> bool {
117        self.default.is_some()
118    }
119
120    /// Resolve which route + sender handles a given key. Returns the
121    /// configured route name (or `"default"` for the fallback) so
122    /// metrics can label by route, not by per-message key (F7).
123    fn resolve(&self, key: &str) -> Option<(&str, &AnySender)> {
124        if let Some((name, sender)) = self.routes.get_key_value(key) {
125            Some((name.as_str(), sender))
126        } else {
127            self.default.as_ref().map(|s| ("default", s))
128        }
129    }
130}
131
132impl TransportBase for RoutedSender {
133    async fn close(&self) -> TransportResult<()> {
134        self.closed
135            .store(true, std::sync::atomic::Ordering::Relaxed);
136        // Close all route senders
137        for sender in self.routes.values() {
138            sender.close().await?;
139        }
140        if let Some(ref default) = self.default {
141            default.close().await?;
142        }
143        Ok(())
144    }
145
146    fn is_healthy(&self) -> bool {
147        if self.closed.load(std::sync::atomic::Ordering::Relaxed) {
148            return false;
149        }
150        // Healthy if all configured senders are healthy
151        let routes_healthy = self.routes.values().all(|s| s.is_healthy());
152        let default_healthy = self.default.as_ref().is_none_or(|s| s.is_healthy());
153        routes_healthy && default_healthy
154    }
155
156    fn name(&self) -> &'static str {
157        "routed"
158    }
159}
160
161impl TransportSender for RoutedSender {
162    async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
163        if self.closed.load(std::sync::atomic::Ordering::Relaxed) {
164            return SendResult::Fatal(TransportError::Closed);
165        }
166
167        let Some((route_name, sender)) = self.resolve(key) else {
168            return SendResult::Fatal(TransportError::Config(format!(
169                "no route configured for key '{key}' and no default sender"
170            )));
171        };
172        // F7: route label is the CONFIGURED route name (or
173        // "default"), not the per-message key. Cardinality is
174        // bounded by the routing table size, not by message count.
175        #[cfg(feature = "metrics")]
176        metrics::counter!(
177            "dfe_transport_sent_total",
178            "transport" => "routed",
179            "route" => route_name.to_string()
180        )
181        .increment(1);
182        #[cfg(not(feature = "metrics"))]
183        let _ = route_name;
184
185        sender.send(key, payload).await
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use super::*;
192
193    #[cfg(feature = "transport-memory")]
194    use crate::transport::memory::{MemoryConfig, MemoryTransport};
195
196    #[cfg(feature = "transport-memory")]
197    fn make_memory_sender() -> AnySender {
198        AnySender::Memory(
199            MemoryTransport::new(&MemoryConfig::default())
200                .expect("memory transport with valid config must construct"),
201        )
202    }
203
204    #[tokio::test]
205    #[cfg(feature = "transport-memory")]
206    async fn routes_to_correct_sender() {
207        let mut route_map = HashMap::new();
208        route_map.insert("events.land".into(), make_memory_sender());
209        route_map.insert("events.load".into(), make_memory_sender());
210
211        let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
212
213        let result_land = sender.send("events.land", b"land-payload").await;
214        assert!(result_land.is_ok());
215
216        let result_load = sender.send("events.load", b"load-payload").await;
217        assert!(result_load.is_ok());
218
219        // Unknown key falls through to default
220        let result_default = sender.send("unknown.key", b"default-payload").await;
221        assert!(result_default.is_ok());
222
223        assert!(sender.is_healthy());
224        assert_eq!(sender.name(), "routed");
225    }
226
227    #[tokio::test]
228    async fn no_route_no_default_returns_fatal() {
229        let sender = RoutedSender::new(HashMap::new(), None);
230
231        let result = sender.send("unknown", b"payload").await;
232        assert!(result.is_fatal());
233    }
234
235    #[tokio::test]
236    #[cfg(feature = "transport-memory")]
237    async fn close_propagates_to_all_senders() {
238        let mut route_map = HashMap::new();
239        route_map.insert("a".into(), make_memory_sender());
240        let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
241
242        assert!(sender.is_healthy());
243        sender.close().await.unwrap();
244        assert!(!sender.is_healthy());
245    }
246
247    #[test]
248    fn route_keys_and_has_route() {
249        let sender = RoutedSender::new(HashMap::new(), None);
250        assert!(sender.route_keys().is_empty());
251        assert!(!sender.has_route("anything"));
252        assert!(!sender.has_default());
253    }
254
255    #[tokio::test]
256    async fn send_after_close_returns_fatal() {
257        let sender = RoutedSender::new(HashMap::new(), None);
258        sender.close().await.unwrap();
259
260        let result = sender.send("key", b"payload").await;
261        assert!(result.is_fatal());
262    }
263
264    /// Codex F7 regression: `resolve` returns the configured route
265    /// name (or `"default"`), not the per-message key. Metric labels
266    /// stay bounded by the routing table size, not by message count.
267    #[test]
268    #[cfg(feature = "transport-memory")]
269    fn resolve_returns_route_name_not_message_key() {
270        let mut route_map = HashMap::new();
271        route_map.insert("events.land".into(), make_memory_sender());
272        let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
273
274        // Match: route name equals the configured key.
275        let (name, _) = sender.resolve("events.land").unwrap();
276        assert_eq!(name, "events.land");
277
278        // Miss: falls through to "default" — bounded label, not the
279        // arbitrary inbound key.
280        let (name, _) = sender.resolve("arbitrary-user-key-12345").unwrap();
281        assert_eq!(name, "default");
282    }
283}