Skip to main content

hyperi_rustlib/transport/
factory.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/factory.rs
3// Purpose:   Transport factory -- create senders from config
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Transport factory for runtime transport selection.
10//!
11//! Creates transport senders from configuration, enabling apps to swap
12//! between Kafka, gRPC, file, pipe, HTTP, or Redis via config change.
13//!
14//! # Usage
15//!
16//! ```yaml
17//! # settings.yaml
18//! transport:
19//!   output:
20//!     type: kafka
21//!     kafka:
22//!       brokers: ["kafka:9092"]
23//! ```
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::factory::AnySender;
27//!
28//! let sender = AnySender::from_config("transport.output").await?;
29//! sender.send("events.land", payload).await;
30//! ```
31
32use super::error::{TransportError, TransportResult};
33use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
34use super::types::SendResult;
35#[cfg(any(
36    feature = "transport-kafka",
37    feature = "transport-grpc",
38    feature = "transport-memory",
39    feature = "transport-pipe",
40    feature = "transport-file",
41    feature = "transport-http",
42    feature = "transport-redis"
43))]
44use super::types::TransportType;
45use super::work_batch::{Record, WorkBatch};
46
47/// Type-erased transport sender, created by the factory from config.
48///
49/// Enum dispatch, not trait objects: `TransportSender` has `impl Future`
50/// returns which prevent `dyn` dispatch.
51pub enum AnySender {
52    #[cfg(feature = "transport-kafka")]
53    Kafka(super::kafka::KafkaTransport),
54
55    #[cfg(feature = "transport-grpc")]
56    Grpc(super::grpc::GrpcTransport),
57
58    #[cfg(feature = "transport-memory")]
59    Memory(super::memory::MemoryTransport),
60
61    #[cfg(feature = "transport-pipe")]
62    Pipe(super::pipe::PipeTransport),
63
64    #[cfg(feature = "transport-file")]
65    File(super::file::FileTransport),
66
67    #[cfg(feature = "transport-http")]
68    Http(super::http::HttpTransport),
69
70    #[cfg(feature = "transport-redis")]
71    Redis(super::redis_transport::RedisTransport),
72}
73
74impl TransportBase for AnySender {
75    async fn close(&self) -> TransportResult<()> {
76        match self {
77            #[cfg(feature = "transport-kafka")]
78            Self::Kafka(t) => t.close().await,
79            #[cfg(feature = "transport-grpc")]
80            Self::Grpc(t) => t.close().await,
81            #[cfg(feature = "transport-memory")]
82            Self::Memory(t) => t.close().await,
83            #[cfg(feature = "transport-pipe")]
84            Self::Pipe(t) => t.close().await,
85            #[cfg(feature = "transport-file")]
86            Self::File(t) => t.close().await,
87            #[cfg(feature = "transport-http")]
88            Self::Http(t) => t.close().await,
89            #[cfg(feature = "transport-redis")]
90            Self::Redis(t) => t.close().await,
91            #[allow(unreachable_patterns)]
92            _ => Err(TransportError::Config(
93                "no transport variant enabled".into(),
94            )),
95        }
96    }
97
98    fn is_healthy(&self) -> bool {
99        match self {
100            #[cfg(feature = "transport-kafka")]
101            Self::Kafka(t) => t.is_healthy(),
102            #[cfg(feature = "transport-grpc")]
103            Self::Grpc(t) => t.is_healthy(),
104            #[cfg(feature = "transport-memory")]
105            Self::Memory(t) => t.is_healthy(),
106            #[cfg(feature = "transport-pipe")]
107            Self::Pipe(t) => t.is_healthy(),
108            #[cfg(feature = "transport-file")]
109            Self::File(t) => t.is_healthy(),
110            #[cfg(feature = "transport-http")]
111            Self::Http(t) => t.is_healthy(),
112            #[cfg(feature = "transport-redis")]
113            Self::Redis(t) => t.is_healthy(),
114            #[allow(unreachable_patterns)]
115            _ => false,
116        }
117    }
118
119    fn name(&self) -> &'static str {
120        match self {
121            #[cfg(feature = "transport-kafka")]
122            Self::Kafka(t) => t.name(),
123            #[cfg(feature = "transport-grpc")]
124            Self::Grpc(t) => t.name(),
125            #[cfg(feature = "transport-memory")]
126            Self::Memory(t) => t.name(),
127            #[cfg(feature = "transport-pipe")]
128            Self::Pipe(t) => t.name(),
129            #[cfg(feature = "transport-file")]
130            Self::File(t) => t.name(),
131            #[cfg(feature = "transport-http")]
132            Self::Http(t) => t.name(),
133            #[cfg(feature = "transport-redis")]
134            Self::Redis(t) => t.name(),
135            #[allow(unreachable_patterns)]
136            _ => "none",
137        }
138    }
139}
140
141impl TransportSender for AnySender {
142    #[cfg_attr(
143        not(any(
144            feature = "transport-kafka",
145            feature = "transport-grpc",
146            feature = "transport-memory",
147            feature = "transport-pipe",
148            feature = "transport-file",
149            feature = "transport-http",
150            feature = "transport-redis"
151        )),
152        allow(unused_variables)
153    )]
154    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
155        match self {
156            #[cfg(feature = "transport-kafka")]
157            Self::Kafka(t) => t.send(key, payload).await,
158            #[cfg(feature = "transport-grpc")]
159            Self::Grpc(t) => t.send(key, payload).await,
160            #[cfg(feature = "transport-memory")]
161            Self::Memory(t) => t.send(key, payload).await,
162            #[cfg(feature = "transport-pipe")]
163            Self::Pipe(t) => t.send(key, payload).await,
164            #[cfg(feature = "transport-file")]
165            Self::File(t) => t.send(key, payload).await,
166            #[cfg(feature = "transport-http")]
167            Self::Http(t) => t.send(key, payload).await,
168            #[cfg(feature = "transport-redis")]
169            Self::Redis(t) => t.send(key, payload).await,
170            #[allow(unreachable_patterns)]
171            _ => SendResult::Fatal(TransportError::Config(
172                "no transport variant enabled".into(),
173            )),
174        }
175    }
176
177    /// Forward [`send_batch`](TransportSender::send_batch) to the active
178    /// backend. gRPC uses its native single-RPC `RouteBatch` override; every
179    /// other backend uses the trait's per-record default (see the at-least-once
180    /// partial-send caveat on the trait method).
181    #[cfg_attr(
182        not(any(
183            feature = "transport-kafka",
184            feature = "transport-grpc",
185            feature = "transport-memory",
186            feature = "transport-pipe",
187            feature = "transport-file",
188            feature = "transport-http",
189            feature = "transport-redis"
190        )),
191        allow(unused_variables)
192    )]
193    async fn send_batch(&self, records: &[Record]) -> SendResult {
194        match self {
195            #[cfg(feature = "transport-kafka")]
196            Self::Kafka(t) => t.send_batch(records).await,
197            #[cfg(feature = "transport-grpc")]
198            Self::Grpc(t) => t.send_batch(records).await,
199            #[cfg(feature = "transport-memory")]
200            Self::Memory(t) => t.send_batch(records).await,
201            #[cfg(feature = "transport-pipe")]
202            Self::Pipe(t) => t.send_batch(records).await,
203            #[cfg(feature = "transport-file")]
204            Self::File(t) => t.send_batch(records).await,
205            #[cfg(feature = "transport-http")]
206            Self::Http(t) => t.send_batch(records).await,
207            #[cfg(feature = "transport-redis")]
208            Self::Redis(t) => t.send_batch(records).await,
209            #[allow(unreachable_patterns)]
210            _ => SendResult::Fatal(TransportError::Config(
211                "no transport variant enabled".into(),
212            )),
213        }
214    }
215}
216
217impl AnySender {
218    /// Create a sender from config cascade.
219    ///
220    /// Reads the transport config from the given key in the config
221    /// cascade and creates the appropriate sender.
222    ///
223    /// # Example config
224    ///
225    /// ```yaml
226    /// transport:
227    ///   output:
228    ///     type: kafka
229    ///     kafka:
230    ///       brokers: ["kafka:9092"]
231    /// ```
232    ///
233    /// ```rust,ignore
234    /// let sender = AnySender::from_config("transport.output").await?;
235    /// ```
236    pub async fn from_config(key: &str) -> TransportResult<Self> {
237        #[cfg(feature = "config")]
238        let config = read_transport_config(key)?;
239
240        #[cfg(not(feature = "config"))]
241        let config = {
242            let _ = key;
243            super::TransportConfig::default()
244        };
245
246        Self::from_transport_config(&config).await
247    }
248
249    /// Create a sender from an explicit `TransportConfig`.
250    pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
251        match config.transport_type {
252            #[cfg(feature = "transport-kafka")]
253            TransportType::Kafka => {
254                let kafka_config = config
255                    .kafka
256                    .as_ref()
257                    .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
258                let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
259                Ok(Self::Kafka(transport))
260            }
261
262            #[cfg(feature = "transport-grpc")]
263            TransportType::Grpc => {
264                let grpc_config = config
265                    .grpc
266                    .as_ref()
267                    .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
268                let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
269                Ok(Self::Grpc(transport))
270            }
271
272            #[cfg(feature = "transport-memory")]
273            TransportType::Memory => {
274                let memory_config = config.memory.clone().unwrap_or_default();
275                let transport = super::memory::MemoryTransport::new(&memory_config)?;
276                Ok(Self::Memory(transport))
277            }
278
279            #[cfg(feature = "transport-pipe")]
280            TransportType::Pipe => {
281                let pipe_config = config.pipe.clone().unwrap_or_default();
282                let transport = super::pipe::PipeTransport::new(&pipe_config);
283                Ok(Self::Pipe(transport))
284            }
285
286            #[cfg(feature = "transport-file")]
287            TransportType::File => {
288                let file_config = config
289                    .file
290                    .as_ref()
291                    .ok_or_else(|| TransportError::Config("file config missing".into()))?;
292                let transport = super::file::FileTransport::new(file_config).await?;
293                Ok(Self::File(transport))
294            }
295
296            #[cfg(feature = "transport-http")]
297            TransportType::Http => {
298                let http_config = config
299                    .http
300                    .as_ref()
301                    .ok_or_else(|| TransportError::Config("http config missing".into()))?;
302                let transport = super::http::HttpTransport::new(http_config).await?;
303                Ok(Self::Http(transport))
304            }
305
306            #[cfg(feature = "transport-redis")]
307            TransportType::Redis => {
308                let redis_config = config
309                    .redis
310                    .as_ref()
311                    .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
312                let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
313                Ok(Self::Redis(transport))
314            }
315
316            // Transport types for modules not yet implemented
317            #[allow(unreachable_patterns)]
318            other => Err(TransportError::Config(format!(
319                "transport type '{other}' is not available (feature not enabled or not yet implemented)"
320            ))),
321        }
322    }
323}
324
325// ---------------------------------------------------------------------------
326// AnyToken -- type-erased commit token, one variant per enabled backend.
327// ---------------------------------------------------------------------------
328
329/// Type-erased commit token produced by [`AnyReceiver`].
330///
331/// Wraps each backend's concrete token in a matching variant so
332/// `AnyReceiver::commit` routes tokens back to the correct backend with no heap
333/// allocation or trait objects. Variant set mirrors the enabled feature flags.
334/// `commit` skips tokens whose variant does not match the active backend
335/// (defensive; cannot occur in practice -- tokens come from the same receiver).
336///
337/// `#[non_exhaustive]`: adding a backend variant is not a breaking change.
338/// Downstream `match` on `AnyToken` must include a wildcard arm.
339#[derive(Debug, Clone)]
340#[non_exhaustive]
341pub enum AnyToken {
342    #[cfg(feature = "transport-kafka")]
343    /// Kafka consumer offset token.
344    Kafka(super::kafka::KafkaToken),
345
346    #[cfg(feature = "transport-grpc")]
347    /// gRPC no-op sequence token.
348    Grpc(super::grpc::GrpcToken),
349
350    #[cfg(feature = "transport-memory")]
351    /// In-memory sequence token.
352    Memory(super::memory::MemoryToken),
353
354    #[cfg(feature = "transport-pipe")]
355    /// Pipe sequence token.
356    Pipe(super::pipe::PipeToken),
357
358    #[cfg(feature = "transport-file")]
359    /// File byte-offset token.
360    File(super::file::FileToken),
361
362    #[cfg(feature = "transport-http")]
363    /// HTTP sequence token.
364    Http(super::http::HttpToken),
365
366    #[cfg(feature = "transport-redis")]
367    /// Redis XACK entry token.
368    Redis(super::redis_transport::RedisToken),
369}
370
371impl std::fmt::Display for AnyToken {
372    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373        match self {
374            #[cfg(feature = "transport-kafka")]
375            Self::Kafka(t) => std::fmt::Display::fmt(t, f),
376            #[cfg(feature = "transport-grpc")]
377            Self::Grpc(t) => std::fmt::Display::fmt(t, f),
378            #[cfg(feature = "transport-memory")]
379            Self::Memory(t) => std::fmt::Display::fmt(t, f),
380            #[cfg(feature = "transport-pipe")]
381            Self::Pipe(t) => std::fmt::Display::fmt(t, f),
382            #[cfg(feature = "transport-file")]
383            Self::File(t) => std::fmt::Display::fmt(t, f),
384            #[cfg(feature = "transport-http")]
385            Self::Http(t) => std::fmt::Display::fmt(t, f),
386            #[cfg(feature = "transport-redis")]
387            Self::Redis(t) => std::fmt::Display::fmt(t, f),
388            #[allow(unreachable_patterns)]
389            _ => write!(f, "none"),
390        }
391    }
392}
393
394impl CommitToken for AnyToken {}
395
396// ---------------------------------------------------------------------------
397// AnyReceiver -- type-erased transport receiver, mirroring AnySender.
398// ---------------------------------------------------------------------------
399
400/// Type-erased transport receiver, created by the factory, mirroring
401/// [`AnySender`].
402///
403/// Enum dispatch, not trait objects: [`TransportReceiver`] has `impl Future`
404/// returns and an associated `Token` type, both of which prevent `dyn`.
405/// [`AnyReceiver::recv`] wraps each backend token in the matching [`AnyToken`]
406/// variant; [`AnyReceiver::commit`] forwards the inner tokens to that backend
407/// (variant mismatches skipped defensively -- cannot legitimately appear).
408pub enum AnyReceiver {
409    #[cfg(feature = "transport-kafka")]
410    Kafka(super::kafka::KafkaTransport),
411
412    #[cfg(feature = "transport-grpc")]
413    Grpc(super::grpc::GrpcTransport),
414
415    #[cfg(feature = "transport-memory")]
416    Memory(super::memory::MemoryTransport),
417
418    #[cfg(feature = "transport-pipe")]
419    Pipe(super::pipe::PipeTransport),
420
421    #[cfg(feature = "transport-file")]
422    File(super::file::FileTransport),
423
424    #[cfg(feature = "transport-http")]
425    Http(super::http::HttpTransport),
426
427    #[cfg(feature = "transport-redis")]
428    Redis(super::redis_transport::RedisTransport),
429}
430
431impl TransportBase for AnyReceiver {
432    async fn close(&self) -> TransportResult<()> {
433        match self {
434            #[cfg(feature = "transport-kafka")]
435            Self::Kafka(t) => t.close().await,
436            #[cfg(feature = "transport-grpc")]
437            Self::Grpc(t) => t.close().await,
438            #[cfg(feature = "transport-memory")]
439            Self::Memory(t) => t.close().await,
440            #[cfg(feature = "transport-pipe")]
441            Self::Pipe(t) => t.close().await,
442            #[cfg(feature = "transport-file")]
443            Self::File(t) => t.close().await,
444            #[cfg(feature = "transport-http")]
445            Self::Http(t) => t.close().await,
446            #[cfg(feature = "transport-redis")]
447            Self::Redis(t) => t.close().await,
448            #[allow(unreachable_patterns)]
449            _ => Err(TransportError::Config(
450                "no transport variant enabled".into(),
451            )),
452        }
453    }
454
455    fn is_healthy(&self) -> bool {
456        match self {
457            #[cfg(feature = "transport-kafka")]
458            Self::Kafka(t) => t.is_healthy(),
459            #[cfg(feature = "transport-grpc")]
460            Self::Grpc(t) => t.is_healthy(),
461            #[cfg(feature = "transport-memory")]
462            Self::Memory(t) => t.is_healthy(),
463            #[cfg(feature = "transport-pipe")]
464            Self::Pipe(t) => t.is_healthy(),
465            #[cfg(feature = "transport-file")]
466            Self::File(t) => t.is_healthy(),
467            #[cfg(feature = "transport-http")]
468            Self::Http(t) => t.is_healthy(),
469            #[cfg(feature = "transport-redis")]
470            Self::Redis(t) => t.is_healthy(),
471            #[allow(unreachable_patterns)]
472            _ => false,
473        }
474    }
475
476    fn name(&self) -> &'static str {
477        match self {
478            #[cfg(feature = "transport-kafka")]
479            Self::Kafka(t) => t.name(),
480            #[cfg(feature = "transport-grpc")]
481            Self::Grpc(t) => t.name(),
482            #[cfg(feature = "transport-memory")]
483            Self::Memory(t) => t.name(),
484            #[cfg(feature = "transport-pipe")]
485            Self::Pipe(t) => t.name(),
486            #[cfg(feature = "transport-file")]
487            Self::File(t) => t.name(),
488            #[cfg(feature = "transport-http")]
489            Self::Http(t) => t.name(),
490            #[cfg(feature = "transport-redis")]
491            Self::Redis(t) => t.name(),
492            #[allow(unreachable_patterns)]
493            _ => "none",
494        }
495    }
496}
497
498/// Map a backend's `WorkBatch<BackendToken>` into `WorkBatch<AnyToken>` using
499/// the provided variant constructor.  Each `commit_tokens` entry is wrapped in
500/// the matching [`AnyToken`] variant; `records` and `dlq_entries` move straight
501/// through (the record payload `Bytes` is a refcount bump, never a copy).
502#[cfg(any(
503    feature = "transport-kafka",
504    feature = "transport-grpc",
505    feature = "transport-memory",
506    feature = "transport-pipe",
507    feature = "transport-file",
508    feature = "transport-http",
509    feature = "transport-redis"
510))]
511fn wrap_batch<B: CommitToken>(
512    batch: WorkBatch<B>,
513    wrap_token: impl Fn(B) -> AnyToken,
514) -> WorkBatch<AnyToken> {
515    let commit_tokens = batch.commit_tokens.into_iter().map(wrap_token).collect();
516    WorkBatch::new(batch.records, commit_tokens).with_dlq_entries(batch.dlq_entries)
517}
518
519impl TransportReceiver for AnyReceiver {
520    type Token = AnyToken;
521
522    #[cfg_attr(
523        not(any(
524            feature = "transport-kafka",
525            feature = "transport-grpc",
526            feature = "transport-memory",
527            feature = "transport-pipe",
528            feature = "transport-file",
529            feature = "transport-http",
530            feature = "transport-redis"
531        )),
532        allow(unused_variables)
533    )]
534    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<AnyToken>> {
535        match self {
536            #[cfg(feature = "transport-kafka")]
537            Self::Kafka(t) => {
538                let batch = t.recv(max).await?;
539                Ok(wrap_batch(batch, AnyToken::Kafka))
540            }
541            #[cfg(feature = "transport-grpc")]
542            Self::Grpc(t) => {
543                let batch = t.recv(max).await?;
544                Ok(wrap_batch(batch, AnyToken::Grpc))
545            }
546            #[cfg(feature = "transport-memory")]
547            Self::Memory(t) => {
548                let batch = t.recv(max).await?;
549                Ok(wrap_batch(batch, AnyToken::Memory))
550            }
551            #[cfg(feature = "transport-pipe")]
552            Self::Pipe(t) => {
553                let batch = t.recv(max).await?;
554                Ok(wrap_batch(batch, AnyToken::Pipe))
555            }
556            #[cfg(feature = "transport-file")]
557            Self::File(t) => {
558                let batch = t.recv(max).await?;
559                Ok(wrap_batch(batch, AnyToken::File))
560            }
561            #[cfg(feature = "transport-http")]
562            Self::Http(t) => {
563                let batch = t.recv(max).await?;
564                Ok(wrap_batch(batch, AnyToken::Http))
565            }
566            #[cfg(feature = "transport-redis")]
567            Self::Redis(t) => {
568                let batch = t.recv(max).await?;
569                Ok(wrap_batch(batch, AnyToken::Redis))
570            }
571            #[allow(unreachable_patterns)]
572            _ => Err(TransportError::Config(
573                "no transport variant enabled".into(),
574            )),
575        }
576    }
577
578    /// Forward the byte-aware recv to each inner transport so the governed
579    /// driver's byte budget reaches the transport that can honour it (Kafka's
580    /// recv-arena). Transports without a byte-aware override fall back to the
581    /// trait default (record-bounded `recv`), which is correct for the
582    /// one-record-at-a-time channel/stream transports.
583    #[cfg_attr(
584        not(any(
585            feature = "transport-kafka",
586            feature = "transport-grpc",
587            feature = "transport-memory",
588            feature = "transport-pipe",
589            feature = "transport-file",
590            feature = "transport-http",
591            feature = "transport-redis"
592        )),
593        allow(unused_variables)
594    )]
595    async fn recv_limited(
596        &self,
597        limits: super::traits::RecvLimits,
598    ) -> TransportResult<WorkBatch<AnyToken>> {
599        match self {
600            #[cfg(feature = "transport-kafka")]
601            Self::Kafka(t) => {
602                let batch = t.recv_limited(limits).await?;
603                Ok(wrap_batch(batch, AnyToken::Kafka))
604            }
605            #[cfg(feature = "transport-grpc")]
606            Self::Grpc(t) => {
607                let batch = t.recv_limited(limits).await?;
608                Ok(wrap_batch(batch, AnyToken::Grpc))
609            }
610            #[cfg(feature = "transport-memory")]
611            Self::Memory(t) => {
612                let batch = t.recv_limited(limits).await?;
613                Ok(wrap_batch(batch, AnyToken::Memory))
614            }
615            #[cfg(feature = "transport-pipe")]
616            Self::Pipe(t) => {
617                let batch = t.recv_limited(limits).await?;
618                Ok(wrap_batch(batch, AnyToken::Pipe))
619            }
620            #[cfg(feature = "transport-file")]
621            Self::File(t) => {
622                let batch = t.recv_limited(limits).await?;
623                Ok(wrap_batch(batch, AnyToken::File))
624            }
625            #[cfg(feature = "transport-http")]
626            Self::Http(t) => {
627                let batch = t.recv_limited(limits).await?;
628                Ok(wrap_batch(batch, AnyToken::Http))
629            }
630            #[cfg(feature = "transport-redis")]
631            Self::Redis(t) => {
632                let batch = t.recv_limited(limits).await?;
633                Ok(wrap_batch(batch, AnyToken::Redis))
634            }
635            #[allow(unreachable_patterns)]
636            _ => Err(TransportError::Config(
637                "no transport variant enabled".into(),
638            )),
639        }
640    }
641
642    #[cfg_attr(
643        not(any(
644            feature = "transport-kafka",
645            feature = "transport-grpc",
646            feature = "transport-memory",
647            feature = "transport-pipe",
648            feature = "transport-file",
649            feature = "transport-http",
650            feature = "transport-redis"
651        )),
652        allow(unused_variables)
653    )]
654    async fn commit(&self, tokens: &[AnyToken]) -> TransportResult<()> {
655        // `match tok { Variant(x) => Some(x), _ => None }` not `if let`: with a
656        // single feature enabled AnyToken has one variant, making `if let`
657        // irrefutable (error under -D warnings); the wildcard arm is unreachable
658        // but legal there. Tokens from a non-matching variant are a programming
659        // error (cannot arise from this receiver's recv) -- filtered out
660        // defensively, not panicked on.
661        match self {
662            #[cfg(feature = "transport-kafka")]
663            Self::Kafka(t) => {
664                t.commit(&extract_tokens(tokens, |tok| match tok {
665                    AnyToken::Kafka(k) => Some(k.clone()),
666                    #[allow(unreachable_patterns)]
667                    _ => None,
668                }))
669                .await
670            }
671            #[cfg(feature = "transport-grpc")]
672            Self::Grpc(t) => {
673                t.commit(&extract_tokens(tokens, |tok| match tok {
674                    AnyToken::Grpc(g) => Some(g.clone()),
675                    #[allow(unreachable_patterns)]
676                    _ => None,
677                }))
678                .await
679            }
680            #[cfg(feature = "transport-memory")]
681            Self::Memory(t) => {
682                t.commit(&extract_tokens(tokens, |tok| match tok {
683                    AnyToken::Memory(m) => Some(*m),
684                    #[allow(unreachable_patterns)]
685                    _ => None,
686                }))
687                .await
688            }
689            #[cfg(feature = "transport-pipe")]
690            Self::Pipe(t) => {
691                t.commit(&extract_tokens(tokens, |tok| match tok {
692                    AnyToken::Pipe(p) => Some(*p),
693                    #[allow(unreachable_patterns)]
694                    _ => None,
695                }))
696                .await
697            }
698            #[cfg(feature = "transport-file")]
699            Self::File(t) => {
700                t.commit(&extract_tokens(tokens, |tok| match tok {
701                    AnyToken::File(f) => Some(*f),
702                    #[allow(unreachable_patterns)]
703                    _ => None,
704                }))
705                .await
706            }
707            #[cfg(feature = "transport-http")]
708            Self::Http(t) => {
709                t.commit(&extract_tokens(tokens, |tok| match tok {
710                    AnyToken::Http(h) => Some(h.clone()),
711                    #[allow(unreachable_patterns)]
712                    _ => None,
713                }))
714                .await
715            }
716            #[cfg(feature = "transport-redis")]
717            Self::Redis(t) => {
718                t.commit(&extract_tokens(tokens, |tok| match tok {
719                    AnyToken::Redis(r) => Some(r.clone()),
720                    #[allow(unreachable_patterns)]
721                    _ => None,
722                }))
723                .await
724            }
725            #[allow(unreachable_patterns)]
726            _ => Err(TransportError::Config(
727                "no transport variant enabled".into(),
728            )),
729        }
730    }
731}
732
733/// Read a [`TransportConfig`](super::TransportConfig) from the global cascade
734/// under `key`. Shared by the `from_config*` constructors.
735#[cfg(feature = "config")]
736fn read_transport_config(key: &str) -> TransportResult<super::TransportConfig> {
737    let cfg = crate::config::try_get()
738        .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
739    cfg.unmarshal_key::<super::TransportConfig>(key)
740        .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))
741}
742
743/// Collect the tokens of one `AnyToken` variant from a mixed slice. The
744/// per-variant match stays in the caller -- a single-feature build has one
745/// variant, making `if let` irrefutable under -D warnings -- so this only
746/// folds away the repeated `.collect()`.
747#[cfg(any(
748    feature = "transport-kafka",
749    feature = "transport-grpc",
750    feature = "transport-memory",
751    feature = "transport-pipe",
752    feature = "transport-file",
753    feature = "transport-http",
754    feature = "transport-redis",
755))]
756fn extract_tokens<T>(tokens: &[AnyToken], pick: impl Fn(&AnyToken) -> Option<T>) -> Vec<T> {
757    tokens.iter().filter_map(pick).collect()
758}
759
760impl AnyReceiver {
761    /// Create a receiver from the config cascade.
762    ///
763    /// Reads the transport config from the given key in the config
764    /// cascade and creates the appropriate receiver.
765    ///
766    /// # Example config
767    ///
768    /// ```yaml
769    /// transport:
770    ///   input:
771    ///     type: kafka
772    ///     kafka:
773    ///       brokers: ["kafka:9092"]
774    ///       group_id: "my-consumer"
775    /// ```
776    ///
777    /// ```rust,ignore
778    /// let receiver = AnyReceiver::from_config("transport.input").await?;
779    /// ```
780    pub async fn from_config(key: &str) -> TransportResult<Self> {
781        #[cfg(feature = "config")]
782        let config = read_transport_config(key)?;
783
784        #[cfg(not(feature = "config"))]
785        let config = {
786            let _ = key;
787            super::TransportConfig::default()
788        };
789
790        Self::from_transport_config(&config).await
791    }
792
793    /// Create a receiver from an explicit `TransportConfig`.
794    pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
795        match config.transport_type {
796            #[cfg(feature = "transport-kafka")]
797            TransportType::Kafka => {
798                let kafka_config = config
799                    .kafka
800                    .as_ref()
801                    .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
802                let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
803                Ok(Self::Kafka(transport))
804            }
805
806            #[cfg(feature = "transport-grpc")]
807            TransportType::Grpc => {
808                let grpc_config = config
809                    .grpc
810                    .as_ref()
811                    .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
812                let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
813                Ok(Self::Grpc(transport))
814            }
815
816            #[cfg(feature = "transport-memory")]
817            TransportType::Memory => {
818                let memory_config = config.memory.clone().unwrap_or_default();
819                let transport = super::memory::MemoryTransport::new(&memory_config)?;
820                Ok(Self::Memory(transport))
821            }
822
823            #[cfg(feature = "transport-pipe")]
824            TransportType::Pipe => {
825                let pipe_config = config.pipe.clone().unwrap_or_default();
826                let transport = super::pipe::PipeTransport::new(&pipe_config);
827                Ok(Self::Pipe(transport))
828            }
829
830            #[cfg(feature = "transport-file")]
831            TransportType::File => {
832                let file_config = config
833                    .file
834                    .as_ref()
835                    .ok_or_else(|| TransportError::Config("file config missing".into()))?;
836                let transport = super::file::FileTransport::new(file_config).await?;
837                Ok(Self::File(transport))
838            }
839
840            #[cfg(feature = "transport-http")]
841            TransportType::Http => {
842                let http_config = config
843                    .http
844                    .as_ref()
845                    .ok_or_else(|| TransportError::Config("http config missing".into()))?;
846                let transport = super::http::HttpTransport::new(http_config).await?;
847                Ok(Self::Http(transport))
848            }
849
850            #[cfg(feature = "transport-redis")]
851            TransportType::Redis => {
852                let redis_config = config
853                    .redis
854                    .as_ref()
855                    .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
856                let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
857                Ok(Self::Redis(transport))
858            }
859
860            // Transport types for modules not yet implemented
861            #[allow(unreachable_patterns)]
862            other => Err(TransportError::Config(format!(
863                "transport type '{other}' is not available (feature not enabled or not yet implemented)"
864            ))),
865        }
866    }
867
868    /// Create a governed receiver from the config cascade (`governor` feature).
869    ///
870    /// Identical to [`from_config`](Self::from_config) but threads the supplied
871    /// [`SelfRegulationGovernor`](crate::SelfRegulationGovernor)'s pressure into
872    /// the inbound brake of every backend that can honour it -- the Kafka
873    /// pause-partitions gate and the HTTP/gRPC 503/`unavailable` shed -- so a
874    /// factory-built receiver actually engages the default-on governor instead
875    /// of silently dropping the inbound brake.
876    ///
877    /// Construction order: the `governor` (and its pressure) is built by the
878    /// runtime BEFORE this call, so the pressure latch already exists and is
879    /// merely cloned (cheap `Arc` bump) into each transport here.
880    ///
881    /// # Errors
882    ///
883    /// Same as [`from_config`](Self::from_config).
884    #[cfg(feature = "governor")]
885    pub async fn from_config_with_governor(
886        key: &str,
887        governor: &crate::SelfRegulationGovernor,
888    ) -> TransportResult<Self> {
889        #[cfg(feature = "config")]
890        let config = read_transport_config(key)?;
891
892        #[cfg(not(feature = "config"))]
893        let config = {
894            let _ = key;
895            super::TransportConfig::default()
896        };
897
898        Self::from_transport_config_with_governor(&config, governor).await
899    }
900
901    /// Create a governed receiver from an explicit `TransportConfig`
902    /// (`governor` feature).
903    ///
904    /// The governor-aware sibling of [`from_transport_config`](Self::from_transport_config).
905    /// Backends that own an inbound brake are wired to the governor's shared
906    /// pressure:
907    ///
908    /// - **Kafka**: the consumer's assigned partitions are paused/resumed via
909    ///   `SelfRegulationGovernor::attach_kafka_gate` (`transport-kafka` feature)
910    ///   -- the full `gate_actuator -> InboundGate -> with_inbound_gate` dance.
911    /// - **HTTP / gRPC**: the embedded receive server is built with
912    ///   `with_pressure(Some(governor.pressure()))`, so it sheds with 503 /
913    ///   `Status::unavailable` while the pressure latch holds.
914    ///
915    /// Backends with no inbound brake (memory, pipe, file, redis) construct
916    /// exactly as in [`from_transport_config`](Self::from_transport_config) --
917    /// the byte-budget lever already reaches them through the governed driver.
918    ///
919    /// # Errors
920    ///
921    /// Same as [`from_transport_config`](Self::from_transport_config).
922    #[cfg(feature = "governor")]
923    pub async fn from_transport_config_with_governor(
924        config: &super::TransportConfig,
925        #[cfg_attr(
926            not(any(
927                feature = "transport-kafka",
928                feature = "transport-grpc",
929                feature = "transport-http"
930            )),
931            allow(unused_variables)
932        )]
933        governor: &crate::SelfRegulationGovernor,
934    ) -> TransportResult<Self> {
935        match config.transport_type {
936            #[cfg(feature = "transport-kafka")]
937            TransportType::Kafka => {
938                let kafka_config = config
939                    .kafka
940                    .as_ref()
941                    .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
942                let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
943                // Attach the inbound gate over the governor's shared pressure:
944                // pauses assigned partitions while the latch holds (member stays
945                // in the group -- no rebalance).
946                let transport = governor.attach_kafka_gate(transport);
947                Ok(Self::Kafka(transport))
948            }
949
950            #[cfg(feature = "transport-grpc")]
951            TransportType::Grpc => {
952                let grpc_config = config
953                    .grpc
954                    .as_ref()
955                    .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
956                let transport = super::grpc::GrpcTransport::with_pressure(
957                    grpc_config,
958                    Some(governor.pressure()),
959                )
960                .await?;
961                Ok(Self::Grpc(transport))
962            }
963
964            #[cfg(feature = "transport-http")]
965            TransportType::Http => {
966                let http_config = config
967                    .http
968                    .as_ref()
969                    .ok_or_else(|| TransportError::Config("http config missing".into()))?;
970                let transport = super::http::HttpTransport::with_pressure(
971                    http_config,
972                    Some(governor.pressure()),
973                )
974                .await?;
975                Ok(Self::Http(transport))
976            }
977
978            // Backends with no inbound brake: construct identically to the
979            // non-governor path. The byte-budget lever reaches these via the
980            // governed driver, not an inbound gate.
981            #[cfg(any(
982                feature = "transport-memory",
983                feature = "transport-pipe",
984                feature = "transport-file",
985                feature = "transport-redis"
986            ))]
987            _ => Self::from_transport_config(config).await,
988
989            // No brakeable backend enabled at all: defer entirely to the
990            // non-governor path (handles the "feature not enabled" error too).
991            #[allow(unreachable_patterns)]
992            _ => Self::from_transport_config(config).await,
993        }
994    }
995}
996
997// ---------------------------------------------------------------------------
998// Tests
999// ---------------------------------------------------------------------------
1000
1001#[cfg(all(test, feature = "transport-memory"))]
1002mod tests {
1003    use super::*;
1004    use crate::transport::memory::{MemoryConfig, MemoryTransport};
1005    use crate::transport::traits::TransportReceiver;
1006
1007    /// End-to-end round-trip: inject a message, recv via `AnyReceiver`,
1008    /// assert token wrapping, then commit and verify the memory transport's
1009    /// committed sequence advances.
1010    ///
1011    /// This exercises the full token wrap + commit re-dispatch path.
1012    #[tokio::test]
1013    async fn any_receiver_memory_recv_commit_round_trip() {
1014        // Build the underlying transport and inject a message.
1015        let inner = MemoryTransport::new(&MemoryConfig::default())
1016            .expect("memory transport must construct with default config");
1017        inner
1018            .inject(Some("events.test"), b"hello from AnyReceiver".to_vec())
1019            .await
1020            .expect("inject must succeed");
1021
1022        // Wrap in AnyReceiver.
1023        let receiver = AnyReceiver::Memory(inner);
1024
1025        assert_eq!(receiver.name(), "memory");
1026        assert!(receiver.is_healthy());
1027
1028        // Recv via AnyReceiver -- must yield a WorkBatch<AnyToken>.
1029        let batch = receiver.recv(10).await.expect("recv must succeed");
1030        assert_eq!(batch.records.len(), 1, "expected exactly one record");
1031        assert_eq!(batch.commit_tokens.len(), 1, "expected one commit token");
1032        assert!(batch.dlq_entries.is_empty(), "no DLQ entries expected");
1033
1034        let record = &batch.records[0];
1035        assert_eq!(record.payload.as_ref(), b"hello from AnyReceiver");
1036        assert_eq!(record.key.as_deref(), Some("events.test"));
1037
1038        // Token must be wrapped in the Memory variant.
1039        let token = &batch.commit_tokens[0];
1040        assert!(
1041            matches!(token, AnyToken::Memory(_)),
1042            "token variant must be AnyToken::Memory, got {token}"
1043        );
1044
1045        // Display delegates to the inner MemoryToken (format: "memory:<seq>").
1046        let display = token.to_string();
1047        assert!(
1048            display.starts_with("memory:"),
1049            "Display must delegate to MemoryToken, got {display}"
1050        );
1051
1052        // Commit the AnyToken slice -- routes back to the MemoryTransport.
1053        let tokens: Vec<AnyToken> = batch.commit_tokens;
1054        // Irrefutable when transport-memory is the only variant compiled in,
1055        // refutable when other transports are enabled -- allow either.
1056        #[allow(irrefutable_let_patterns)]
1057        let seq_before = if let AnyReceiver::Memory(ref t) = receiver {
1058            t.committed_sequence()
1059        } else {
1060            panic!("must be Memory variant");
1061        };
1062
1063        receiver.commit(&tokens).await.expect("commit must succeed");
1064
1065        // The memory transport tracks the max committed seq; it must have advanced.
1066        #[allow(irrefutable_let_patterns)]
1067        if let AnyReceiver::Memory(ref t) = receiver {
1068            let seq_after = t.committed_sequence();
1069            assert!(
1070                seq_after > seq_before || seq_after == 0,
1071                "committed_sequence must advance after commit (before={seq_before}, after={seq_after})"
1072            );
1073        }
1074    }
1075
1076    /// Tokens from the wrong variant are silently ignored by commit --
1077    /// commit must succeed without error even if no tokens match.
1078    #[tokio::test]
1079    async fn any_receiver_commit_ignores_mismatched_variants() {
1080        let inner = MemoryTransport::new(&MemoryConfig::default())
1081            .expect("memory transport must construct with default config");
1082        let receiver = AnyReceiver::Memory(inner);
1083
1084        // A Pipe token delivered to a Memory receiver -- must not panic or error.
1085        #[cfg(feature = "transport-pipe")]
1086        {
1087            let alien_token = AnyToken::Pipe(crate::transport::pipe::PipeToken { seq: 99 });
1088            receiver
1089                .commit(&[alien_token])
1090                .await
1091                .expect("commit with mismatched variant must succeed without error");
1092        }
1093
1094        // Zero tokens -- always a no-op.
1095        receiver
1096            .commit(&[])
1097            .await
1098            .expect("commit with empty slice must succeed");
1099    }
1100}
1101
1102// ---------------------------------------------------------------------------
1103// Governor-aware factory tests (Remediation Phase 6).
1104//
1105// Prove `*_with_governor` actually threads the governor's inbound brake into
1106// the backends that own one, and that the non-governor path is unchanged.
1107// ---------------------------------------------------------------------------
1108
1109#[cfg(all(test, feature = "governor"))]
1110mod governor_tests {
1111    #[cfg(any(
1112        feature = "transport-kafka",
1113        feature = "transport-grpc",
1114        feature = "transport-http",
1115        feature = "transport-memory"
1116    ))]
1117    use super::*;
1118
1119    /// Build a [`SelfRegulationGovernor`] whose single HARD memory source is
1120    /// pinned ABOVE / BELOW `pause_above` (default 0.80) by sizing the guard.
1121    #[cfg(any(
1122        feature = "transport-kafka",
1123        feature = "transport-grpc",
1124        feature = "transport-http",
1125        feature = "transport-memory"
1126    ))]
1127    fn governor(pinned_high: bool) -> crate::SelfRegulationGovernor {
1128        use crate::memory::{MemoryGuard, MemoryGuardConfig};
1129        let guard = std::sync::Arc::new(MemoryGuard::new(MemoryGuardConfig {
1130            limit_bytes: 1000,
1131            pressure_threshold: 0.80,
1132            ..Default::default()
1133        }));
1134        if pinned_high {
1135            guard.add_bytes(950); // 95% -> well above pause_above
1136        } else {
1137            guard.add_bytes(10); // 1% -> well below resume_below
1138        }
1139        crate::SelfRegulationConfig::default()
1140            .build(guard)
1141            .expect("governor enabled by default")
1142    }
1143
1144    /// A factory-built Kafka receiver MUST carry an inbound gate when a governor
1145    /// is supplied. Broker-free: `KafkaTransport::new` lazily connects and an
1146    /// empty topic list means no subscribe/poll happens at construction.
1147    #[cfg(feature = "transport-kafka")]
1148    #[tokio::test]
1149    async fn kafka_governed_receiver_has_inbound_gate() {
1150        let kafka = crate::transport::kafka::KafkaConfig::for_testing(
1151            "localhost:9092",
1152            "phase6-test",
1153            Vec::new(), // no topics -> no subscribe -> broker-free build
1154        );
1155        let cfg = crate::transport::TransportConfig {
1156            transport_type: crate::transport::types::TransportType::Kafka,
1157            kafka: Some(kafka),
1158            ..Default::default()
1159        };
1160
1161        let gov = governor(false);
1162        let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1163            .await
1164            .expect("governed kafka receiver must construct broker-free");
1165
1166        match receiver {
1167            AnyReceiver::Kafka(ref t) => assert!(
1168                t.has_inbound_gate(),
1169                "factory-built Kafka receiver must have the governor's inbound gate attached"
1170            ),
1171            _ => panic!("expected Kafka variant"),
1172        }
1173
1174        // The non-governor constructor must NOT attach a gate (byte-identical
1175        // to pre-Phase-6 behaviour).
1176        let plain = AnyReceiver::from_transport_config(&cfg)
1177            .await
1178            .expect("plain kafka receiver must construct broker-free");
1179        match plain {
1180            AnyReceiver::Kafka(ref t) => assert!(
1181                !t.has_inbound_gate(),
1182                "non-governor constructor must leave the inbound gate unattached"
1183            ),
1184            _ => panic!("expected Kafka variant"),
1185        }
1186    }
1187
1188    /// A factory-built gRPC receiver MUST reject under pressure (governor pinned
1189    /// HIGH) with `Status::unavailable`, surfaced to the client as backpressure.
1190    #[cfg(feature = "transport-grpc")]
1191    #[tokio::test]
1192    async fn grpc_governed_receiver_sheds_under_pressure() {
1193        use crate::transport::traits::{TransportBase, TransportSender};
1194        use crate::transport::types::SendResult;
1195
1196        let server_cfg = crate::transport::grpc::GrpcConfig::server("127.0.0.1:16188");
1197        let cfg = crate::transport::TransportConfig {
1198            transport_type: crate::transport::types::TransportType::Grpc,
1199            grpc: Some(server_cfg),
1200            ..Default::default()
1201        };
1202
1203        let gov = governor(true);
1204        assert!(
1205            gov.pressure().should_hold(),
1206            "pinned-high governor must hold"
1207        );
1208
1209        let server = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1210            .await
1211            .expect("governed grpc receiver must construct");
1212        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1213
1214        let client = crate::transport::grpc::GrpcTransport::new(
1215            &crate::transport::grpc::GrpcConfig::client("http://127.0.0.1:16188"),
1216        )
1217        .await
1218        .expect("grpc client");
1219        let result = client
1220            .send("events", bytes::Bytes::from_static(b"{\"x\":1}"))
1221            .await;
1222        assert!(
1223            matches!(result, SendResult::Backpressured),
1224            "push under pressure must surface as backpressure, got {result:?}"
1225        );
1226
1227        client.close().await.unwrap();
1228        server.close().await.unwrap();
1229    }
1230
1231    /// A factory-built HTTP receiver MUST shed with 503 under pressure (governor
1232    /// pinned HIGH); the shed request never reaches the queue.
1233    #[cfg(feature = "transport-http")]
1234    #[tokio::test]
1235    async fn http_governed_receiver_sheds_under_pressure() {
1236        use crate::transport::traits::{TransportBase, TransportReceiver};
1237
1238        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1239        let addr = listener.local_addr().unwrap();
1240        drop(listener);
1241
1242        let http_cfg = crate::transport::http::HttpTransportConfig {
1243            listen: Some(addr.to_string()),
1244            recv_timeout_ms: 200,
1245            ..Default::default()
1246        };
1247        let cfg = crate::transport::TransportConfig {
1248            transport_type: crate::transport::types::TransportType::Http,
1249            http: Some(http_cfg),
1250            ..Default::default()
1251        };
1252
1253        let gov = governor(true);
1254        assert!(
1255            gov.pressure().should_hold(),
1256            "pinned-high governor must hold"
1257        );
1258
1259        let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1260            .await
1261            .expect("governed http receiver must construct");
1262        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1263
1264        let client = reqwest::Client::new();
1265        let resp = client
1266            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1267            .body(b"{\"msg\":\"shed\"}".to_vec())
1268            .send()
1269            .await
1270            .unwrap();
1271        assert_eq!(
1272            resp.status(),
1273            reqwest::StatusCode::SERVICE_UNAVAILABLE,
1274            "factory-built HTTP receiver under pressure must shed with 503"
1275        );
1276
1277        let records = receiver.recv(10).await.unwrap().records;
1278        assert!(records.is_empty(), "shed request must not be queued");
1279        receiver.close().await.unwrap();
1280    }
1281
1282    /// Backends with no inbound brake (memory) construct identically through the
1283    /// governor-aware path -- the receiver still works as a plain receiver.
1284    #[cfg(feature = "transport-memory")]
1285    #[tokio::test]
1286    async fn memory_governed_receiver_is_plain() {
1287        use crate::transport::traits::TransportReceiver;
1288
1289        let cfg = crate::transport::TransportConfig {
1290            transport_type: crate::transport::types::TransportType::Memory,
1291            memory: Some(crate::transport::memory::MemoryConfig::default()),
1292            ..Default::default()
1293        };
1294
1295        let gov = governor(false);
1296        let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1297            .await
1298            .expect("governed memory receiver must construct");
1299
1300        assert_eq!(receiver.name(), "memory");
1301        // No gate concept for memory -- recv just returns an empty batch.
1302        let batch = receiver.recv(1).await.expect("recv must succeed");
1303        assert!(batch.records.is_empty(), "no records injected");
1304    }
1305}