Skip to main content

hyperi_rustlib/transport/
routed.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/routed.rs
3// Purpose:   Per-key routing transport for data originators (receiver, fetcher)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Per-key routing transport for data originators.
10//!
11//! Routes `send(key, payload)` to different transport backends based on the
12//! key. Used by dfe-receiver and dfe-fetcher where data-based routing
13//! determines the destination (topic, endpoint, stream).
14//!
15//! All other DFE stages (transforms, loader, archiver) use simple 1:1
16//! transports and do NOT need this.
17//!
18//! # Config
19//!
20//! ```yaml
21//! transport:
22//!   output:
23//!     type: routed
24//!     default: kafka
25//!     routes:
26//!       events.land:
27//!         type: grpc
28//!         grpc:
29//!           endpoint: "http://loader-land:6000"
30//!       events.load:
31//!         type: kafka
32//!       audit.land:
33//!         type: grpc
34//!         grpc:
35//!           endpoint: "http://archiver:6000"
36//!     kafka:
37//!       brokers: ["kafka:9092"]
38//! ```
39//!
40//! # Usage
41//!
42//! ```rust,ignore
43//! let sender = RoutedSender::from_config("transport.output").await?;
44//! // Routes to different backends based on key
45//! sender.send("events.land", payload).await;  // → gRPC to loader-land
46//! sender.send("events.load", payload).await;  // → Kafka topic
47//! sender.send("audit.land", payload).await;   // → gRPC to archiver
48//! sender.send("unknown", payload).await;      // → default (Kafka)
49//! ```
50
51use std::collections::HashMap;
52
53use super::error::{TransportError, TransportResult};
54use super::factory::AnySender;
55use super::traits::{TransportBase, TransportSender};
56use super::types::SendResult;
57
58/// A routing transport that dispatches `send()` to different backends
59/// based on the key parameter.
60///
61/// Used by dfe-receiver and dfe-fetcher (data originators) where
62/// data-based routing determines the destination.
63pub struct RoutedSender {
64    /// Per-key route overrides.
65    routes: HashMap<String, AnySender>,
66    /// Default sender for keys not in the routes map.
67    default: Option<AnySender>,
68    closed: std::sync::atomic::AtomicBool,
69}
70
71impl RoutedSender {
72    /// Create a new routed sender with explicit routes and optional default.
73    #[must_use]
74    pub fn new(routes: HashMap<String, AnySender>, default: Option<AnySender>) -> Self {
75        Self {
76            routes,
77            default,
78            closed: std::sync::atomic::AtomicBool::new(false),
79        }
80    }
81
82    /// Create from a map of key → `TransportConfig` plus a default config.
83    ///
84    /// Each route gets its own `AnySender` created from the corresponding config.
85    pub async fn from_route_configs(
86        routes: HashMap<String, super::TransportConfig>,
87        default_config: Option<super::TransportConfig>,
88    ) -> TransportResult<Self> {
89        let mut senders = HashMap::with_capacity(routes.len());
90        for (key, config) in routes {
91            let sender = AnySender::from_transport_config(&config).await?;
92            senders.insert(key, sender);
93        }
94
95        let default = match default_config {
96            Some(cfg) => Some(AnySender::from_transport_config(&cfg).await?),
97            None => None,
98        };
99
100        Ok(Self::new(senders, default))
101    }
102
103    /// Get the list of configured route keys.
104    #[must_use]
105    pub fn route_keys(&self) -> Vec<&str> {
106        self.routes.keys().map(String::as_str).collect()
107    }
108
109    /// Check if a specific route key is configured.
110    #[must_use]
111    pub fn has_route(&self, key: &str) -> bool {
112        self.routes.contains_key(key)
113    }
114
115    /// Check if a default fallback sender is configured.
116    #[must_use]
117    pub fn has_default(&self) -> bool {
118        self.default.is_some()
119    }
120
121    /// Resolve which route + sender handles a given key. Returns the
122    /// configured route name (or `"default"` for the fallback) so
123    /// metrics can label by route, not by per-message key (F7).
124    fn resolve(&self, key: &str) -> Option<(&str, &AnySender)> {
125        if let Some((name, sender)) = self.routes.get_key_value(key) {
126            Some((name.as_str(), sender))
127        } else {
128            self.default.as_ref().map(|s| ("default", s))
129        }
130    }
131}
132
133impl TransportBase for RoutedSender {
134    async fn close(&self) -> TransportResult<()> {
135        self.closed
136            .store(true, std::sync::atomic::Ordering::Relaxed);
137        // Close all route senders
138        for sender in self.routes.values() {
139            sender.close().await?;
140        }
141        if let Some(ref default) = self.default {
142            default.close().await?;
143        }
144        Ok(())
145    }
146
147    fn is_healthy(&self) -> bool {
148        if self.closed.load(std::sync::atomic::Ordering::Relaxed) {
149            return false;
150        }
151        // Healthy if all configured senders are healthy
152        let routes_healthy = self.routes.values().all(|s| s.is_healthy());
153        let default_healthy = self.default.as_ref().is_none_or(|s| s.is_healthy());
154        routes_healthy && default_healthy
155    }
156
157    fn name(&self) -> &'static str {
158        "routed"
159    }
160}
161
162impl TransportSender for RoutedSender {
163    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
164        if self.closed.load(std::sync::atomic::Ordering::Relaxed) {
165            return SendResult::Fatal(TransportError::Closed);
166        }
167
168        let Some((route_name, sender)) = self.resolve(key) else {
169            return SendResult::Fatal(TransportError::Config(format!(
170                "no route configured for key '{key}' and no default sender"
171            )));
172        };
173        // F7: route label is the CONFIGURED route name (or
174        // "default"), not the per-message key. Cardinality is
175        // bounded by the routing table size, not by message count.
176        #[cfg(feature = "metrics")]
177        metrics::counter!(
178            "dfe_transport_sent_total",
179            "transport" => "routed",
180            "route" => route_name.to_string()
181        )
182        .increment(1);
183        #[cfg(not(feature = "metrics"))]
184        let _ = route_name;
185
186        sender.send(key, payload).await
187    }
188}
189
190#[cfg(test)]
191mod tests {
192    use super::*;
193
194    #[cfg(feature = "transport-memory")]
195    use crate::transport::memory::{MemoryConfig, MemoryTransport};
196
197    #[cfg(feature = "transport-memory")]
198    fn make_memory_sender() -> AnySender {
199        AnySender::Memory(
200            MemoryTransport::new(&MemoryConfig::default())
201                .expect("memory transport with valid config must construct"),
202        )
203    }
204
205    #[tokio::test]
206    #[cfg(feature = "transport-memory")]
207    async fn routes_to_correct_sender() {
208        let mut route_map = HashMap::new();
209        route_map.insert("events.land".into(), make_memory_sender());
210        route_map.insert("events.load".into(), make_memory_sender());
211
212        let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
213
214        let result_land = sender
215            .send("events.land", bytes::Bytes::from_static(b"land-payload"))
216            .await;
217        assert!(result_land.is_ok());
218
219        let result_load = sender
220            .send("events.load", bytes::Bytes::from_static(b"load-payload"))
221            .await;
222        assert!(result_load.is_ok());
223
224        // Unknown key falls through to default
225        let result_default = sender
226            .send("unknown.key", bytes::Bytes::from_static(b"default-payload"))
227            .await;
228        assert!(result_default.is_ok());
229
230        assert!(sender.is_healthy());
231        assert_eq!(sender.name(), "routed");
232    }
233
234    #[tokio::test]
235    async fn no_route_no_default_returns_fatal() {
236        let sender = RoutedSender::new(HashMap::new(), None);
237
238        let result = sender
239            .send("unknown", bytes::Bytes::from_static(b"payload"))
240            .await;
241        assert!(result.is_fatal());
242    }
243
244    #[tokio::test]
245    #[cfg(feature = "transport-memory")]
246    async fn close_propagates_to_all_senders() {
247        let mut route_map = HashMap::new();
248        route_map.insert("a".into(), make_memory_sender());
249        let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
250
251        assert!(sender.is_healthy());
252        sender.close().await.unwrap();
253        assert!(!sender.is_healthy());
254    }
255
256    #[test]
257    fn route_keys_and_has_route() {
258        let sender = RoutedSender::new(HashMap::new(), None);
259        assert!(sender.route_keys().is_empty());
260        assert!(!sender.has_route("anything"));
261        assert!(!sender.has_default());
262    }
263
264    #[tokio::test]
265    async fn send_after_close_returns_fatal() {
266        let sender = RoutedSender::new(HashMap::new(), None);
267        sender.close().await.unwrap();
268
269        let result = sender
270            .send("key", bytes::Bytes::from_static(b"payload"))
271            .await;
272        assert!(result.is_fatal());
273    }
274
275    /// Codex F7 regression: `resolve` returns the configured route
276    /// name (or `"default"`), not the per-message key. Metric labels
277    /// stay bounded by the routing table size, not by message count.
278    #[test]
279    #[cfg(feature = "transport-memory")]
280    fn resolve_returns_route_name_not_message_key() {
281        let mut route_map = HashMap::new();
282        route_map.insert("events.land".into(), make_memory_sender());
283        let sender = RoutedSender::new(route_map, Some(make_memory_sender()));
284
285        // Match: route name equals the configured key.
286        let (name, _) = sender.resolve("events.land").unwrap();
287        assert_eq!(name, "events.land");
288
289        // Miss: falls through to "default" — bounded label, not the
290        // arbitrary inbound key.
291        let (name, _) = sender.resolve("arbitrary-user-key-12345").unwrap();
292        assert_eq!(name, "default");
293    }
294}