Skip to main content

hyperi_rustlib/transport/
factory.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/factory.rs
3// Purpose:   Transport factory -- create senders from config
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Transport factory for runtime transport selection.
10//!
11//! Creates transport senders from configuration, enabling apps to swap
12//! between Kafka, gRPC, file, pipe, HTTP, or Redis via config change.
13//!
14//! # Usage
15//!
16//! ```yaml
17//! # settings.yaml
18//! transport:
19//!   output:
20//!     type: kafka
21//!     kafka:
22//!       brokers: ["kafka:9092"]
23//! ```
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::factory::AnySender;
27//!
28//! let sender = AnySender::from_config("transport.output").await?;
29//! sender.send("events.land", payload).await;
30//! ```
31
32use super::error::{TransportError, TransportResult};
33use super::traits::{TransportBase, TransportSender};
34use super::types::SendResult;
35#[cfg(any(
36    feature = "transport-kafka",
37    feature = "transport-grpc",
38    feature = "transport-memory",
39    feature = "transport-pipe",
40    feature = "transport-file",
41    feature = "transport-http",
42    feature = "transport-redis"
43))]
44use super::types::TransportType;
45
46/// Type-erased transport sender.
47///
48/// Wraps any concrete transport sender behind an enum for runtime
49/// dispatch. Created by the transport factory from config.
50///
51/// Uses enum dispatch (not trait objects) because `TransportSender`
52/// has `impl Future` return types which prevent `dyn` dispatch.
53pub enum AnySender {
54    #[cfg(feature = "transport-kafka")]
55    Kafka(super::kafka::KafkaTransport),
56
57    #[cfg(feature = "transport-grpc")]
58    Grpc(super::grpc::GrpcTransport),
59
60    #[cfg(feature = "transport-memory")]
61    Memory(super::memory::MemoryTransport),
62
63    #[cfg(feature = "transport-pipe")]
64    Pipe(super::pipe::PipeTransport),
65
66    #[cfg(feature = "transport-file")]
67    File(super::file::FileTransport),
68
69    #[cfg(feature = "transport-http")]
70    Http(super::http::HttpTransport),
71
72    #[cfg(feature = "transport-redis")]
73    Redis(super::redis_transport::RedisTransport),
74}
75
76impl TransportBase for AnySender {
77    async fn close(&self) -> TransportResult<()> {
78        match self {
79            #[cfg(feature = "transport-kafka")]
80            Self::Kafka(t) => t.close().await,
81            #[cfg(feature = "transport-grpc")]
82            Self::Grpc(t) => t.close().await,
83            #[cfg(feature = "transport-memory")]
84            Self::Memory(t) => t.close().await,
85            #[cfg(feature = "transport-pipe")]
86            Self::Pipe(t) => t.close().await,
87            #[cfg(feature = "transport-file")]
88            Self::File(t) => t.close().await,
89            #[cfg(feature = "transport-http")]
90            Self::Http(t) => t.close().await,
91            #[cfg(feature = "transport-redis")]
92            Self::Redis(t) => t.close().await,
93            #[allow(unreachable_patterns)]
94            _ => Err(TransportError::Config(
95                "no transport variant enabled".into(),
96            )),
97        }
98    }
99
100    fn is_healthy(&self) -> bool {
101        match self {
102            #[cfg(feature = "transport-kafka")]
103            Self::Kafka(t) => t.is_healthy(),
104            #[cfg(feature = "transport-grpc")]
105            Self::Grpc(t) => t.is_healthy(),
106            #[cfg(feature = "transport-memory")]
107            Self::Memory(t) => t.is_healthy(),
108            #[cfg(feature = "transport-pipe")]
109            Self::Pipe(t) => t.is_healthy(),
110            #[cfg(feature = "transport-file")]
111            Self::File(t) => t.is_healthy(),
112            #[cfg(feature = "transport-http")]
113            Self::Http(t) => t.is_healthy(),
114            #[cfg(feature = "transport-redis")]
115            Self::Redis(t) => t.is_healthy(),
116            #[allow(unreachable_patterns)]
117            _ => false,
118        }
119    }
120
121    fn name(&self) -> &'static str {
122        match self {
123            #[cfg(feature = "transport-kafka")]
124            Self::Kafka(t) => t.name(),
125            #[cfg(feature = "transport-grpc")]
126            Self::Grpc(t) => t.name(),
127            #[cfg(feature = "transport-memory")]
128            Self::Memory(t) => t.name(),
129            #[cfg(feature = "transport-pipe")]
130            Self::Pipe(t) => t.name(),
131            #[cfg(feature = "transport-file")]
132            Self::File(t) => t.name(),
133            #[cfg(feature = "transport-http")]
134            Self::Http(t) => t.name(),
135            #[cfg(feature = "transport-redis")]
136            Self::Redis(t) => t.name(),
137            #[allow(unreachable_patterns)]
138            _ => "none",
139        }
140    }
141}
142
143impl TransportSender for AnySender {
144    #[cfg_attr(
145        not(any(
146            feature = "transport-kafka",
147            feature = "transport-grpc",
148            feature = "transport-memory",
149            feature = "transport-pipe",
150            feature = "transport-file",
151            feature = "transport-http",
152            feature = "transport-redis"
153        )),
154        allow(unused_variables)
155    )]
156    async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
157        match self {
158            #[cfg(feature = "transport-kafka")]
159            Self::Kafka(t) => t.send(key, payload).await,
160            #[cfg(feature = "transport-grpc")]
161            Self::Grpc(t) => t.send(key, payload).await,
162            #[cfg(feature = "transport-memory")]
163            Self::Memory(t) => t.send(key, payload).await,
164            #[cfg(feature = "transport-pipe")]
165            Self::Pipe(t) => t.send(key, payload).await,
166            #[cfg(feature = "transport-file")]
167            Self::File(t) => t.send(key, payload).await,
168            #[cfg(feature = "transport-http")]
169            Self::Http(t) => t.send(key, payload).await,
170            #[cfg(feature = "transport-redis")]
171            Self::Redis(t) => t.send(key, payload).await,
172            #[allow(unreachable_patterns)]
173            _ => SendResult::Fatal(TransportError::Config(
174                "no transport variant enabled".into(),
175            )),
176        }
177    }
178}
179
180impl AnySender {
181    /// Create a sender from config cascade.
182    ///
183    /// Reads the transport config from the given key in the config
184    /// cascade and creates the appropriate sender.
185    ///
186    /// # Example config
187    ///
188    /// ```yaml
189    /// transport:
190    ///   output:
191    ///     type: kafka
192    ///     kafka:
193    ///       brokers: ["kafka:9092"]
194    /// ```
195    ///
196    /// ```rust,ignore
197    /// let sender = AnySender::from_config("transport.output").await?;
198    /// ```
199    pub async fn from_config(key: &str) -> TransportResult<Self> {
200        #[cfg(feature = "config")]
201        let config = {
202            let cfg = crate::config::try_get()
203                .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
204            cfg.unmarshal_key::<super::TransportConfig>(key)
205                .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
206        };
207
208        #[cfg(not(feature = "config"))]
209        let config = {
210            let _ = key;
211            super::TransportConfig::default()
212        };
213
214        Self::from_transport_config(&config).await
215    }
216
217    /// Create a sender from an explicit `TransportConfig`.
218    pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
219        match config.transport_type {
220            #[cfg(feature = "transport-kafka")]
221            TransportType::Kafka => {
222                let kafka_config = config
223                    .kafka
224                    .as_ref()
225                    .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
226                let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
227                Ok(Self::Kafka(transport))
228            }
229
230            #[cfg(feature = "transport-grpc")]
231            TransportType::Grpc => {
232                let grpc_config = config
233                    .grpc
234                    .as_ref()
235                    .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
236                let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
237                Ok(Self::Grpc(transport))
238            }
239
240            #[cfg(feature = "transport-memory")]
241            TransportType::Memory => {
242                let memory_config = config.memory.clone().unwrap_or_default();
243                let transport = super::memory::MemoryTransport::new(&memory_config)?;
244                Ok(Self::Memory(transport))
245            }
246
247            #[cfg(feature = "transport-pipe")]
248            TransportType::Pipe => {
249                let pipe_config = config.pipe.clone().unwrap_or_default();
250                let transport = super::pipe::PipeTransport::new(&pipe_config);
251                Ok(Self::Pipe(transport))
252            }
253
254            #[cfg(feature = "transport-file")]
255            TransportType::File => {
256                let file_config = config
257                    .file
258                    .as_ref()
259                    .ok_or_else(|| TransportError::Config("file config missing".into()))?;
260                let transport = super::file::FileTransport::new(file_config).await?;
261                Ok(Self::File(transport))
262            }
263
264            #[cfg(feature = "transport-http")]
265            TransportType::Http => {
266                let http_config = config
267                    .http
268                    .as_ref()
269                    .ok_or_else(|| TransportError::Config("http config missing".into()))?;
270                let transport = super::http::HttpTransport::new(http_config).await?;
271                Ok(Self::Http(transport))
272            }
273
274            #[cfg(feature = "transport-redis")]
275            TransportType::Redis => {
276                let redis_config = config
277                    .redis
278                    .as_ref()
279                    .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
280                let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
281                Ok(Self::Redis(transport))
282            }
283
284            // Transport types for modules not yet implemented
285            #[allow(unreachable_patterns)]
286            other => Err(TransportError::Config(format!(
287                "transport type '{other}' is not available (feature not enabled or not yet implemented)"
288            ))),
289        }
290    }
291}