hyperi_rustlib/transport/
factory.rs1use super::error::{TransportError, TransportResult};
33use super::traits::{TransportBase, TransportSender};
34use super::types::SendResult;
35#[cfg(any(
36 feature = "transport-kafka",
37 feature = "transport-grpc",
38 feature = "transport-memory",
39 feature = "transport-pipe",
40 feature = "transport-file",
41 feature = "transport-http",
42 feature = "transport-redis"
43))]
44use super::types::TransportType;
45
46pub enum AnySender {
54 #[cfg(feature = "transport-kafka")]
55 Kafka(super::kafka::KafkaTransport),
56
57 #[cfg(feature = "transport-grpc")]
58 Grpc(super::grpc::GrpcTransport),
59
60 #[cfg(feature = "transport-memory")]
61 Memory(super::memory::MemoryTransport),
62
63 #[cfg(feature = "transport-pipe")]
64 Pipe(super::pipe::PipeTransport),
65
66 #[cfg(feature = "transport-file")]
67 File(super::file::FileTransport),
68
69 #[cfg(feature = "transport-http")]
70 Http(super::http::HttpTransport),
71
72 #[cfg(feature = "transport-redis")]
73 Redis(super::redis_transport::RedisTransport),
74}
75
76impl TransportBase for AnySender {
77 async fn close(&self) -> TransportResult<()> {
78 match self {
79 #[cfg(feature = "transport-kafka")]
80 Self::Kafka(t) => t.close().await,
81 #[cfg(feature = "transport-grpc")]
82 Self::Grpc(t) => t.close().await,
83 #[cfg(feature = "transport-memory")]
84 Self::Memory(t) => t.close().await,
85 #[cfg(feature = "transport-pipe")]
86 Self::Pipe(t) => t.close().await,
87 #[cfg(feature = "transport-file")]
88 Self::File(t) => t.close().await,
89 #[cfg(feature = "transport-http")]
90 Self::Http(t) => t.close().await,
91 #[cfg(feature = "transport-redis")]
92 Self::Redis(t) => t.close().await,
93 #[allow(unreachable_patterns)]
94 _ => Err(TransportError::Config(
95 "no transport variant enabled".into(),
96 )),
97 }
98 }
99
100 fn is_healthy(&self) -> bool {
101 match self {
102 #[cfg(feature = "transport-kafka")]
103 Self::Kafka(t) => t.is_healthy(),
104 #[cfg(feature = "transport-grpc")]
105 Self::Grpc(t) => t.is_healthy(),
106 #[cfg(feature = "transport-memory")]
107 Self::Memory(t) => t.is_healthy(),
108 #[cfg(feature = "transport-pipe")]
109 Self::Pipe(t) => t.is_healthy(),
110 #[cfg(feature = "transport-file")]
111 Self::File(t) => t.is_healthy(),
112 #[cfg(feature = "transport-http")]
113 Self::Http(t) => t.is_healthy(),
114 #[cfg(feature = "transport-redis")]
115 Self::Redis(t) => t.is_healthy(),
116 #[allow(unreachable_patterns)]
117 _ => false,
118 }
119 }
120
121 fn name(&self) -> &'static str {
122 match self {
123 #[cfg(feature = "transport-kafka")]
124 Self::Kafka(t) => t.name(),
125 #[cfg(feature = "transport-grpc")]
126 Self::Grpc(t) => t.name(),
127 #[cfg(feature = "transport-memory")]
128 Self::Memory(t) => t.name(),
129 #[cfg(feature = "transport-pipe")]
130 Self::Pipe(t) => t.name(),
131 #[cfg(feature = "transport-file")]
132 Self::File(t) => t.name(),
133 #[cfg(feature = "transport-http")]
134 Self::Http(t) => t.name(),
135 #[cfg(feature = "transport-redis")]
136 Self::Redis(t) => t.name(),
137 #[allow(unreachable_patterns)]
138 _ => "none",
139 }
140 }
141}
142
143impl TransportSender for AnySender {
144 #[cfg_attr(
145 not(any(
146 feature = "transport-kafka",
147 feature = "transport-grpc",
148 feature = "transport-memory",
149 feature = "transport-pipe",
150 feature = "transport-file",
151 feature = "transport-http",
152 feature = "transport-redis"
153 )),
154 allow(unused_variables)
155 )]
156 async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
157 match self {
158 #[cfg(feature = "transport-kafka")]
159 Self::Kafka(t) => t.send(key, payload).await,
160 #[cfg(feature = "transport-grpc")]
161 Self::Grpc(t) => t.send(key, payload).await,
162 #[cfg(feature = "transport-memory")]
163 Self::Memory(t) => t.send(key, payload).await,
164 #[cfg(feature = "transport-pipe")]
165 Self::Pipe(t) => t.send(key, payload).await,
166 #[cfg(feature = "transport-file")]
167 Self::File(t) => t.send(key, payload).await,
168 #[cfg(feature = "transport-http")]
169 Self::Http(t) => t.send(key, payload).await,
170 #[cfg(feature = "transport-redis")]
171 Self::Redis(t) => t.send(key, payload).await,
172 #[allow(unreachable_patterns)]
173 _ => SendResult::Fatal(TransportError::Config(
174 "no transport variant enabled".into(),
175 )),
176 }
177 }
178}
179
180impl AnySender {
181 pub async fn from_config(key: &str) -> TransportResult<Self> {
200 #[cfg(feature = "config")]
201 let config = {
202 let cfg = crate::config::try_get()
203 .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
204 cfg.unmarshal_key::<super::TransportConfig>(key)
205 .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
206 };
207
208 #[cfg(not(feature = "config"))]
209 let config = {
210 let _ = key;
211 super::TransportConfig::default()
212 };
213
214 Self::from_transport_config(&config).await
215 }
216
217 pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
219 match config.transport_type {
220 #[cfg(feature = "transport-kafka")]
221 TransportType::Kafka => {
222 let kafka_config = config
223 .kafka
224 .as_ref()
225 .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
226 let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
227 Ok(Self::Kafka(transport))
228 }
229
230 #[cfg(feature = "transport-grpc")]
231 TransportType::Grpc => {
232 let grpc_config = config
233 .grpc
234 .as_ref()
235 .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
236 let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
237 Ok(Self::Grpc(transport))
238 }
239
240 #[cfg(feature = "transport-memory")]
241 TransportType::Memory => {
242 let memory_config = config.memory.clone().unwrap_or_default();
243 let transport = super::memory::MemoryTransport::new(&memory_config)?;
244 Ok(Self::Memory(transport))
245 }
246
247 #[cfg(feature = "transport-pipe")]
248 TransportType::Pipe => {
249 let pipe_config = config.pipe.clone().unwrap_or_default();
250 let transport = super::pipe::PipeTransport::new(&pipe_config);
251 Ok(Self::Pipe(transport))
252 }
253
254 #[cfg(feature = "transport-file")]
255 TransportType::File => {
256 let file_config = config
257 .file
258 .as_ref()
259 .ok_or_else(|| TransportError::Config("file config missing".into()))?;
260 let transport = super::file::FileTransport::new(file_config).await?;
261 Ok(Self::File(transport))
262 }
263
264 #[cfg(feature = "transport-http")]
265 TransportType::Http => {
266 let http_config = config
267 .http
268 .as_ref()
269 .ok_or_else(|| TransportError::Config("http config missing".into()))?;
270 let transport = super::http::HttpTransport::new(http_config).await?;
271 Ok(Self::Http(transport))
272 }
273
274 #[cfg(feature = "transport-redis")]
275 TransportType::Redis => {
276 let redis_config = config
277 .redis
278 .as_ref()
279 .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
280 let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
281 Ok(Self::Redis(transport))
282 }
283
284 #[allow(unreachable_patterns)]
286 other => Err(TransportError::Config(format!(
287 "transport type '{other}' is not available (feature not enabled or not yet implemented)"
288 ))),
289 }
290 }
291}