Skip to main content

hyperi_rustlib/transport/
factory.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/factory.rs
3// Purpose:   Transport factory -- create senders from config
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Transport factory for runtime transport selection.
10//!
11//! Creates transport senders from configuration, enabling apps to swap
12//! between Kafka, gRPC, file, pipe, HTTP, or Redis via config change.
13//!
14//! # Usage
15//!
16//! ```yaml
17//! # settings.yaml
18//! transport:
19//!   output:
20//!     type: kafka
21//!     kafka:
22//!       brokers: ["kafka:9092"]
23//! ```
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::factory::AnySender;
27//!
28//! let sender = AnySender::from_config("transport.output").await?;
29//! sender.send("events.land", payload).await;
30//! ```
31
32use super::error::{TransportError, TransportResult};
33use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
34use super::types::SendResult;
35#[cfg(any(
36    feature = "transport-kafka",
37    feature = "transport-grpc",
38    feature = "transport-memory",
39    feature = "transport-pipe",
40    feature = "transport-file",
41    feature = "transport-http",
42    feature = "transport-redis"
43))]
44use super::types::TransportType;
45use super::work_batch::{Record, WorkBatch};
46
47/// Type-erased transport sender.
48///
49/// Wraps any concrete transport sender behind an enum for runtime
50/// dispatch. Created by the transport factory from config.
51///
52/// Uses enum dispatch (not trait objects) because `TransportSender`
53/// has `impl Future` return types which prevent `dyn` dispatch.
54pub enum AnySender {
55    #[cfg(feature = "transport-kafka")]
56    Kafka(super::kafka::KafkaTransport),
57
58    #[cfg(feature = "transport-grpc")]
59    Grpc(super::grpc::GrpcTransport),
60
61    #[cfg(feature = "transport-memory")]
62    Memory(super::memory::MemoryTransport),
63
64    #[cfg(feature = "transport-pipe")]
65    Pipe(super::pipe::PipeTransport),
66
67    #[cfg(feature = "transport-file")]
68    File(super::file::FileTransport),
69
70    #[cfg(feature = "transport-http")]
71    Http(super::http::HttpTransport),
72
73    #[cfg(feature = "transport-redis")]
74    Redis(super::redis_transport::RedisTransport),
75}
76
77impl TransportBase for AnySender {
78    async fn close(&self) -> TransportResult<()> {
79        match self {
80            #[cfg(feature = "transport-kafka")]
81            Self::Kafka(t) => t.close().await,
82            #[cfg(feature = "transport-grpc")]
83            Self::Grpc(t) => t.close().await,
84            #[cfg(feature = "transport-memory")]
85            Self::Memory(t) => t.close().await,
86            #[cfg(feature = "transport-pipe")]
87            Self::Pipe(t) => t.close().await,
88            #[cfg(feature = "transport-file")]
89            Self::File(t) => t.close().await,
90            #[cfg(feature = "transport-http")]
91            Self::Http(t) => t.close().await,
92            #[cfg(feature = "transport-redis")]
93            Self::Redis(t) => t.close().await,
94            #[allow(unreachable_patterns)]
95            _ => Err(TransportError::Config(
96                "no transport variant enabled".into(),
97            )),
98        }
99    }
100
101    fn is_healthy(&self) -> bool {
102        match self {
103            #[cfg(feature = "transport-kafka")]
104            Self::Kafka(t) => t.is_healthy(),
105            #[cfg(feature = "transport-grpc")]
106            Self::Grpc(t) => t.is_healthy(),
107            #[cfg(feature = "transport-memory")]
108            Self::Memory(t) => t.is_healthy(),
109            #[cfg(feature = "transport-pipe")]
110            Self::Pipe(t) => t.is_healthy(),
111            #[cfg(feature = "transport-file")]
112            Self::File(t) => t.is_healthy(),
113            #[cfg(feature = "transport-http")]
114            Self::Http(t) => t.is_healthy(),
115            #[cfg(feature = "transport-redis")]
116            Self::Redis(t) => t.is_healthy(),
117            #[allow(unreachable_patterns)]
118            _ => false,
119        }
120    }
121
122    fn name(&self) -> &'static str {
123        match self {
124            #[cfg(feature = "transport-kafka")]
125            Self::Kafka(t) => t.name(),
126            #[cfg(feature = "transport-grpc")]
127            Self::Grpc(t) => t.name(),
128            #[cfg(feature = "transport-memory")]
129            Self::Memory(t) => t.name(),
130            #[cfg(feature = "transport-pipe")]
131            Self::Pipe(t) => t.name(),
132            #[cfg(feature = "transport-file")]
133            Self::File(t) => t.name(),
134            #[cfg(feature = "transport-http")]
135            Self::Http(t) => t.name(),
136            #[cfg(feature = "transport-redis")]
137            Self::Redis(t) => t.name(),
138            #[allow(unreachable_patterns)]
139            _ => "none",
140        }
141    }
142}
143
144impl TransportSender for AnySender {
145    #[cfg_attr(
146        not(any(
147            feature = "transport-kafka",
148            feature = "transport-grpc",
149            feature = "transport-memory",
150            feature = "transport-pipe",
151            feature = "transport-file",
152            feature = "transport-http",
153            feature = "transport-redis"
154        )),
155        allow(unused_variables)
156    )]
157    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
158        match self {
159            #[cfg(feature = "transport-kafka")]
160            Self::Kafka(t) => t.send(key, payload).await,
161            #[cfg(feature = "transport-grpc")]
162            Self::Grpc(t) => t.send(key, payload).await,
163            #[cfg(feature = "transport-memory")]
164            Self::Memory(t) => t.send(key, payload).await,
165            #[cfg(feature = "transport-pipe")]
166            Self::Pipe(t) => t.send(key, payload).await,
167            #[cfg(feature = "transport-file")]
168            Self::File(t) => t.send(key, payload).await,
169            #[cfg(feature = "transport-http")]
170            Self::Http(t) => t.send(key, payload).await,
171            #[cfg(feature = "transport-redis")]
172            Self::Redis(t) => t.send(key, payload).await,
173            #[allow(unreachable_patterns)]
174            _ => SendResult::Fatal(TransportError::Config(
175                "no transport variant enabled".into(),
176            )),
177        }
178    }
179
180    /// Forward [`send_batch`](TransportSender::send_batch) to the active
181    /// backend. gRPC uses its native single-RPC `RouteBatch` override; every
182    /// other backend uses the trait's per-record default (see the at-least-once
183    /// partial-send caveat on the trait method).
184    #[cfg_attr(
185        not(any(
186            feature = "transport-kafka",
187            feature = "transport-grpc",
188            feature = "transport-memory",
189            feature = "transport-pipe",
190            feature = "transport-file",
191            feature = "transport-http",
192            feature = "transport-redis"
193        )),
194        allow(unused_variables)
195    )]
196    async fn send_batch(&self, records: &[Record]) -> SendResult {
197        match self {
198            #[cfg(feature = "transport-kafka")]
199            Self::Kafka(t) => t.send_batch(records).await,
200            #[cfg(feature = "transport-grpc")]
201            Self::Grpc(t) => t.send_batch(records).await,
202            #[cfg(feature = "transport-memory")]
203            Self::Memory(t) => t.send_batch(records).await,
204            #[cfg(feature = "transport-pipe")]
205            Self::Pipe(t) => t.send_batch(records).await,
206            #[cfg(feature = "transport-file")]
207            Self::File(t) => t.send_batch(records).await,
208            #[cfg(feature = "transport-http")]
209            Self::Http(t) => t.send_batch(records).await,
210            #[cfg(feature = "transport-redis")]
211            Self::Redis(t) => t.send_batch(records).await,
212            #[allow(unreachable_patterns)]
213            _ => SendResult::Fatal(TransportError::Config(
214                "no transport variant enabled".into(),
215            )),
216        }
217    }
218}
219
220impl AnySender {
221    /// Create a sender from config cascade.
222    ///
223    /// Reads the transport config from the given key in the config
224    /// cascade and creates the appropriate sender.
225    ///
226    /// # Example config
227    ///
228    /// ```yaml
229    /// transport:
230    ///   output:
231    ///     type: kafka
232    ///     kafka:
233    ///       brokers: ["kafka:9092"]
234    /// ```
235    ///
236    /// ```rust,ignore
237    /// let sender = AnySender::from_config("transport.output").await?;
238    /// ```
239    pub async fn from_config(key: &str) -> TransportResult<Self> {
240        #[cfg(feature = "config")]
241        let config = {
242            let cfg = crate::config::try_get()
243                .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
244            cfg.unmarshal_key::<super::TransportConfig>(key)
245                .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
246        };
247
248        #[cfg(not(feature = "config"))]
249        let config = {
250            let _ = key;
251            super::TransportConfig::default()
252        };
253
254        Self::from_transport_config(&config).await
255    }
256
257    /// Create a sender from an explicit `TransportConfig`.
258    pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
259        match config.transport_type {
260            #[cfg(feature = "transport-kafka")]
261            TransportType::Kafka => {
262                let kafka_config = config
263                    .kafka
264                    .as_ref()
265                    .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
266                let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
267                Ok(Self::Kafka(transport))
268            }
269
270            #[cfg(feature = "transport-grpc")]
271            TransportType::Grpc => {
272                let grpc_config = config
273                    .grpc
274                    .as_ref()
275                    .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
276                let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
277                Ok(Self::Grpc(transport))
278            }
279
280            #[cfg(feature = "transport-memory")]
281            TransportType::Memory => {
282                let memory_config = config.memory.clone().unwrap_or_default();
283                let transport = super::memory::MemoryTransport::new(&memory_config)?;
284                Ok(Self::Memory(transport))
285            }
286
287            #[cfg(feature = "transport-pipe")]
288            TransportType::Pipe => {
289                let pipe_config = config.pipe.clone().unwrap_or_default();
290                let transport = super::pipe::PipeTransport::new(&pipe_config);
291                Ok(Self::Pipe(transport))
292            }
293
294            #[cfg(feature = "transport-file")]
295            TransportType::File => {
296                let file_config = config
297                    .file
298                    .as_ref()
299                    .ok_or_else(|| TransportError::Config("file config missing".into()))?;
300                let transport = super::file::FileTransport::new(file_config).await?;
301                Ok(Self::File(transport))
302            }
303
304            #[cfg(feature = "transport-http")]
305            TransportType::Http => {
306                let http_config = config
307                    .http
308                    .as_ref()
309                    .ok_or_else(|| TransportError::Config("http config missing".into()))?;
310                let transport = super::http::HttpTransport::new(http_config).await?;
311                Ok(Self::Http(transport))
312            }
313
314            #[cfg(feature = "transport-redis")]
315            TransportType::Redis => {
316                let redis_config = config
317                    .redis
318                    .as_ref()
319                    .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
320                let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
321                Ok(Self::Redis(transport))
322            }
323
324            // Transport types for modules not yet implemented
325            #[allow(unreachable_patterns)]
326            other => Err(TransportError::Config(format!(
327                "transport type '{other}' is not available (feature not enabled or not yet implemented)"
328            ))),
329        }
330    }
331}
332
333// ---------------------------------------------------------------------------
334// AnyToken -- type-erased commit token, one variant per enabled backend.
335// ---------------------------------------------------------------------------
336
337/// Type-erased commit token produced by [`AnyReceiver`].
338///
339/// Wraps each backend's concrete token in a matching enum variant so that
340/// `AnyReceiver::commit` can route tokens back to the correct backend without
341/// heap allocation or trait objects.  The variant set mirrors the enabled
342/// transport feature flags exactly.
343///
344/// Tokens are always produced by the same `AnyReceiver` that delivered the
345/// messages, so the active variant and active receiver variant will always
346/// agree.  `commit` skips tokens whose variant does not match the active
347/// backend (defensive; should not occur in practice).
348///
349/// `#[non_exhaustive]`: adding a new backend variant later is not a breaking
350/// change. Downstream crates that match on `AnyToken` must include a wildcard
351/// arm.
352#[derive(Debug, Clone)]
353#[non_exhaustive]
354pub enum AnyToken {
355    #[cfg(feature = "transport-kafka")]
356    /// Kafka consumer offset token.
357    Kafka(super::kafka::KafkaToken),
358
359    #[cfg(feature = "transport-grpc")]
360    /// gRPC no-op sequence token.
361    Grpc(super::grpc::GrpcToken),
362
363    #[cfg(feature = "transport-memory")]
364    /// In-memory sequence token.
365    Memory(super::memory::MemoryToken),
366
367    #[cfg(feature = "transport-pipe")]
368    /// Pipe sequence token.
369    Pipe(super::pipe::PipeToken),
370
371    #[cfg(feature = "transport-file")]
372    /// File byte-offset token.
373    File(super::file::FileToken),
374
375    #[cfg(feature = "transport-http")]
376    /// HTTP sequence token.
377    Http(super::http::HttpToken),
378
379    #[cfg(feature = "transport-redis")]
380    /// Redis XACK entry token.
381    Redis(super::redis_transport::RedisToken),
382}
383
384impl std::fmt::Display for AnyToken {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        match self {
387            #[cfg(feature = "transport-kafka")]
388            Self::Kafka(t) => std::fmt::Display::fmt(t, f),
389            #[cfg(feature = "transport-grpc")]
390            Self::Grpc(t) => std::fmt::Display::fmt(t, f),
391            #[cfg(feature = "transport-memory")]
392            Self::Memory(t) => std::fmt::Display::fmt(t, f),
393            #[cfg(feature = "transport-pipe")]
394            Self::Pipe(t) => std::fmt::Display::fmt(t, f),
395            #[cfg(feature = "transport-file")]
396            Self::File(t) => std::fmt::Display::fmt(t, f),
397            #[cfg(feature = "transport-http")]
398            Self::Http(t) => std::fmt::Display::fmt(t, f),
399            #[cfg(feature = "transport-redis")]
400            Self::Redis(t) => std::fmt::Display::fmt(t, f),
401            #[allow(unreachable_patterns)]
402            _ => write!(f, "none"),
403        }
404    }
405}
406
407impl CommitToken for AnyToken {}
408
409// ---------------------------------------------------------------------------
410// AnyReceiver -- type-erased transport receiver, mirroring AnySender.
411// ---------------------------------------------------------------------------
412
413/// Type-erased transport receiver.
414///
415/// Wraps any concrete transport receiver behind an enum for runtime
416/// dispatch. Created by the transport factory from config, mirroring
417/// [`AnySender`].
418///
419/// Uses enum dispatch (not trait objects) because [`TransportReceiver`]
420/// has `impl Future` return types and an associated `Token` type that
421/// prevent `dyn` dispatch.
422///
423/// The [`AnyReceiver::recv`] method wraps each backend token in the
424/// corresponding [`AnyToken`] variant.  [`AnyReceiver::commit`] extracts
425/// the inner tokens for the active backend and forwards to that backend's
426/// own `commit` -- tokens from a different variant are silently skipped
427/// (they cannot legitimately appear but the code stays defensive).
428pub enum AnyReceiver {
429    #[cfg(feature = "transport-kafka")]
430    Kafka(super::kafka::KafkaTransport),
431
432    #[cfg(feature = "transport-grpc")]
433    Grpc(super::grpc::GrpcTransport),
434
435    #[cfg(feature = "transport-memory")]
436    Memory(super::memory::MemoryTransport),
437
438    #[cfg(feature = "transport-pipe")]
439    Pipe(super::pipe::PipeTransport),
440
441    #[cfg(feature = "transport-file")]
442    File(super::file::FileTransport),
443
444    #[cfg(feature = "transport-http")]
445    Http(super::http::HttpTransport),
446
447    #[cfg(feature = "transport-redis")]
448    Redis(super::redis_transport::RedisTransport),
449}
450
451impl TransportBase for AnyReceiver {
452    async fn close(&self) -> TransportResult<()> {
453        match self {
454            #[cfg(feature = "transport-kafka")]
455            Self::Kafka(t) => t.close().await,
456            #[cfg(feature = "transport-grpc")]
457            Self::Grpc(t) => t.close().await,
458            #[cfg(feature = "transport-memory")]
459            Self::Memory(t) => t.close().await,
460            #[cfg(feature = "transport-pipe")]
461            Self::Pipe(t) => t.close().await,
462            #[cfg(feature = "transport-file")]
463            Self::File(t) => t.close().await,
464            #[cfg(feature = "transport-http")]
465            Self::Http(t) => t.close().await,
466            #[cfg(feature = "transport-redis")]
467            Self::Redis(t) => t.close().await,
468            #[allow(unreachable_patterns)]
469            _ => Err(TransportError::Config(
470                "no transport variant enabled".into(),
471            )),
472        }
473    }
474
475    fn is_healthy(&self) -> bool {
476        match self {
477            #[cfg(feature = "transport-kafka")]
478            Self::Kafka(t) => t.is_healthy(),
479            #[cfg(feature = "transport-grpc")]
480            Self::Grpc(t) => t.is_healthy(),
481            #[cfg(feature = "transport-memory")]
482            Self::Memory(t) => t.is_healthy(),
483            #[cfg(feature = "transport-pipe")]
484            Self::Pipe(t) => t.is_healthy(),
485            #[cfg(feature = "transport-file")]
486            Self::File(t) => t.is_healthy(),
487            #[cfg(feature = "transport-http")]
488            Self::Http(t) => t.is_healthy(),
489            #[cfg(feature = "transport-redis")]
490            Self::Redis(t) => t.is_healthy(),
491            #[allow(unreachable_patterns)]
492            _ => false,
493        }
494    }
495
496    fn name(&self) -> &'static str {
497        match self {
498            #[cfg(feature = "transport-kafka")]
499            Self::Kafka(t) => t.name(),
500            #[cfg(feature = "transport-grpc")]
501            Self::Grpc(t) => t.name(),
502            #[cfg(feature = "transport-memory")]
503            Self::Memory(t) => t.name(),
504            #[cfg(feature = "transport-pipe")]
505            Self::Pipe(t) => t.name(),
506            #[cfg(feature = "transport-file")]
507            Self::File(t) => t.name(),
508            #[cfg(feature = "transport-http")]
509            Self::Http(t) => t.name(),
510            #[cfg(feature = "transport-redis")]
511            Self::Redis(t) => t.name(),
512            #[allow(unreachable_patterns)]
513            _ => "none",
514        }
515    }
516}
517
518/// Map a backend's `WorkBatch<BackendToken>` into `WorkBatch<AnyToken>` using
519/// the provided variant constructor.  Each `commit_tokens` entry is wrapped in
520/// the matching [`AnyToken`] variant; `records` and `dlq_entries` move straight
521/// through (the record payload `Bytes` is a refcount bump, never a copy).
522#[cfg(any(
523    feature = "transport-kafka",
524    feature = "transport-grpc",
525    feature = "transport-memory",
526    feature = "transport-pipe",
527    feature = "transport-file",
528    feature = "transport-http",
529    feature = "transport-redis"
530))]
531fn wrap_batch<B: CommitToken>(
532    batch: WorkBatch<B>,
533    wrap_token: impl Fn(B) -> AnyToken,
534) -> WorkBatch<AnyToken> {
535    let commit_tokens = batch.commit_tokens.into_iter().map(wrap_token).collect();
536    WorkBatch::new(batch.records, commit_tokens).with_dlq_entries(batch.dlq_entries)
537}
538
539impl TransportReceiver for AnyReceiver {
540    type Token = AnyToken;
541
542    #[cfg_attr(
543        not(any(
544            feature = "transport-kafka",
545            feature = "transport-grpc",
546            feature = "transport-memory",
547            feature = "transport-pipe",
548            feature = "transport-file",
549            feature = "transport-http",
550            feature = "transport-redis"
551        )),
552        allow(unused_variables)
553    )]
554    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<AnyToken>> {
555        match self {
556            #[cfg(feature = "transport-kafka")]
557            Self::Kafka(t) => {
558                let batch = t.recv(max).await?;
559                Ok(wrap_batch(batch, AnyToken::Kafka))
560            }
561            #[cfg(feature = "transport-grpc")]
562            Self::Grpc(t) => {
563                let batch = t.recv(max).await?;
564                Ok(wrap_batch(batch, AnyToken::Grpc))
565            }
566            #[cfg(feature = "transport-memory")]
567            Self::Memory(t) => {
568                let batch = t.recv(max).await?;
569                Ok(wrap_batch(batch, AnyToken::Memory))
570            }
571            #[cfg(feature = "transport-pipe")]
572            Self::Pipe(t) => {
573                let batch = t.recv(max).await?;
574                Ok(wrap_batch(batch, AnyToken::Pipe))
575            }
576            #[cfg(feature = "transport-file")]
577            Self::File(t) => {
578                let batch = t.recv(max).await?;
579                Ok(wrap_batch(batch, AnyToken::File))
580            }
581            #[cfg(feature = "transport-http")]
582            Self::Http(t) => {
583                let batch = t.recv(max).await?;
584                Ok(wrap_batch(batch, AnyToken::Http))
585            }
586            #[cfg(feature = "transport-redis")]
587            Self::Redis(t) => {
588                let batch = t.recv(max).await?;
589                Ok(wrap_batch(batch, AnyToken::Redis))
590            }
591            #[allow(unreachable_patterns)]
592            _ => Err(TransportError::Config(
593                "no transport variant enabled".into(),
594            )),
595        }
596    }
597
598    /// Forward the byte-aware recv to each inner transport so the governed
599    /// driver's byte budget reaches the transport that can honour it (Kafka's
600    /// recv-arena). Transports without a byte-aware override fall back to the
601    /// trait default (record-bounded `recv`), which is correct for the
602    /// one-record-at-a-time channel/stream transports.
603    #[cfg_attr(
604        not(any(
605            feature = "transport-kafka",
606            feature = "transport-grpc",
607            feature = "transport-memory",
608            feature = "transport-pipe",
609            feature = "transport-file",
610            feature = "transport-http",
611            feature = "transport-redis"
612        )),
613        allow(unused_variables)
614    )]
615    async fn recv_limited(
616        &self,
617        limits: super::traits::RecvLimits,
618    ) -> TransportResult<WorkBatch<AnyToken>> {
619        match self {
620            #[cfg(feature = "transport-kafka")]
621            Self::Kafka(t) => {
622                let batch = t.recv_limited(limits).await?;
623                Ok(wrap_batch(batch, AnyToken::Kafka))
624            }
625            #[cfg(feature = "transport-grpc")]
626            Self::Grpc(t) => {
627                let batch = t.recv_limited(limits).await?;
628                Ok(wrap_batch(batch, AnyToken::Grpc))
629            }
630            #[cfg(feature = "transport-memory")]
631            Self::Memory(t) => {
632                let batch = t.recv_limited(limits).await?;
633                Ok(wrap_batch(batch, AnyToken::Memory))
634            }
635            #[cfg(feature = "transport-pipe")]
636            Self::Pipe(t) => {
637                let batch = t.recv_limited(limits).await?;
638                Ok(wrap_batch(batch, AnyToken::Pipe))
639            }
640            #[cfg(feature = "transport-file")]
641            Self::File(t) => {
642                let batch = t.recv_limited(limits).await?;
643                Ok(wrap_batch(batch, AnyToken::File))
644            }
645            #[cfg(feature = "transport-http")]
646            Self::Http(t) => {
647                let batch = t.recv_limited(limits).await?;
648                Ok(wrap_batch(batch, AnyToken::Http))
649            }
650            #[cfg(feature = "transport-redis")]
651            Self::Redis(t) => {
652                let batch = t.recv_limited(limits).await?;
653                Ok(wrap_batch(batch, AnyToken::Redis))
654            }
655            #[allow(unreachable_patterns)]
656            _ => Err(TransportError::Config(
657                "no transport variant enabled".into(),
658            )),
659        }
660    }
661
662    #[cfg_attr(
663        not(any(
664            feature = "transport-kafka",
665            feature = "transport-grpc",
666            feature = "transport-memory",
667            feature = "transport-pipe",
668            feature = "transport-file",
669            feature = "transport-http",
670            feature = "transport-redis"
671        )),
672        allow(unused_variables)
673    )]
674    async fn commit(&self, tokens: &[AnyToken]) -> TransportResult<()> {
675        // Each arm uses `match tok { Variant(x) => Some(x), #[allow(unreachable_patterns)] _ => None }`
676        // rather than `if let`.  When only a single transport feature is enabled, the AnyToken enum
677        // has a single variant, making an `if let` irrefutable (an error under -D warnings).
678        // The explicit wildcard arm with `#[allow(unreachable_patterns)]` avoids that -- it is
679        // genuinely unreachable in the single-feature case but legal.  Tokens from a non-matching
680        // variant indicate a programming error; they are silently filtered out rather than panicking
681        // (defensive behaviour, they cannot legitimately arise from this receiver's recv).
682        match self {
683            #[cfg(feature = "transport-kafka")]
684            Self::Kafka(t) => {
685                let inner: Vec<_> = tokens
686                    .iter()
687                    .filter_map(|tok| match tok {
688                        AnyToken::Kafka(k) => Some(k.clone()),
689                        #[allow(unreachable_patterns)]
690                        _ => None,
691                    })
692                    .collect();
693                t.commit(&inner).await
694            }
695            #[cfg(feature = "transport-grpc")]
696            Self::Grpc(t) => {
697                let inner: Vec<_> = tokens
698                    .iter()
699                    .filter_map(|tok| match tok {
700                        AnyToken::Grpc(g) => Some(g.clone()),
701                        #[allow(unreachable_patterns)]
702                        _ => None,
703                    })
704                    .collect();
705                t.commit(&inner).await
706            }
707            #[cfg(feature = "transport-memory")]
708            Self::Memory(t) => {
709                let inner: Vec<_> = tokens
710                    .iter()
711                    .filter_map(|tok| match tok {
712                        AnyToken::Memory(m) => Some(*m),
713                        #[allow(unreachable_patterns)]
714                        _ => None,
715                    })
716                    .collect();
717                t.commit(&inner).await
718            }
719            #[cfg(feature = "transport-pipe")]
720            Self::Pipe(t) => {
721                let inner: Vec<_> = tokens
722                    .iter()
723                    .filter_map(|tok| match tok {
724                        AnyToken::Pipe(p) => Some(*p),
725                        #[allow(unreachable_patterns)]
726                        _ => None,
727                    })
728                    .collect();
729                t.commit(&inner).await
730            }
731            #[cfg(feature = "transport-file")]
732            Self::File(t) => {
733                let inner: Vec<_> = tokens
734                    .iter()
735                    .filter_map(|tok| match tok {
736                        AnyToken::File(f) => Some(*f),
737                        #[allow(unreachable_patterns)]
738                        _ => None,
739                    })
740                    .collect();
741                t.commit(&inner).await
742            }
743            #[cfg(feature = "transport-http")]
744            Self::Http(t) => {
745                let inner: Vec<_> = tokens
746                    .iter()
747                    .filter_map(|tok| match tok {
748                        AnyToken::Http(h) => Some(h.clone()),
749                        #[allow(unreachable_patterns)]
750                        _ => None,
751                    })
752                    .collect();
753                t.commit(&inner).await
754            }
755            #[cfg(feature = "transport-redis")]
756            Self::Redis(t) => {
757                let inner: Vec<_> = tokens
758                    .iter()
759                    .filter_map(|tok| match tok {
760                        AnyToken::Redis(r) => Some(r.clone()),
761                        #[allow(unreachable_patterns)]
762                        _ => None,
763                    })
764                    .collect();
765                t.commit(&inner).await
766            }
767            #[allow(unreachable_patterns)]
768            _ => Err(TransportError::Config(
769                "no transport variant enabled".into(),
770            )),
771        }
772    }
773}
774
775impl AnyReceiver {
776    /// Create a receiver from the config cascade.
777    ///
778    /// Reads the transport config from the given key in the config
779    /// cascade and creates the appropriate receiver.
780    ///
781    /// # Example config
782    ///
783    /// ```yaml
784    /// transport:
785    ///   input:
786    ///     type: kafka
787    ///     kafka:
788    ///       brokers: ["kafka:9092"]
789    ///       group_id: "my-consumer"
790    /// ```
791    ///
792    /// ```rust,ignore
793    /// let receiver = AnyReceiver::from_config("transport.input").await?;
794    /// ```
795    pub async fn from_config(key: &str) -> TransportResult<Self> {
796        #[cfg(feature = "config")]
797        let config = {
798            let cfg = crate::config::try_get()
799                .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
800            cfg.unmarshal_key::<super::TransportConfig>(key)
801                .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
802        };
803
804        #[cfg(not(feature = "config"))]
805        let config = {
806            let _ = key;
807            super::TransportConfig::default()
808        };
809
810        Self::from_transport_config(&config).await
811    }
812
813    /// Create a receiver from an explicit `TransportConfig`.
814    pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
815        match config.transport_type {
816            #[cfg(feature = "transport-kafka")]
817            TransportType::Kafka => {
818                let kafka_config = config
819                    .kafka
820                    .as_ref()
821                    .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
822                let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
823                Ok(Self::Kafka(transport))
824            }
825
826            #[cfg(feature = "transport-grpc")]
827            TransportType::Grpc => {
828                let grpc_config = config
829                    .grpc
830                    .as_ref()
831                    .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
832                let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
833                Ok(Self::Grpc(transport))
834            }
835
836            #[cfg(feature = "transport-memory")]
837            TransportType::Memory => {
838                let memory_config = config.memory.clone().unwrap_or_default();
839                let transport = super::memory::MemoryTransport::new(&memory_config)?;
840                Ok(Self::Memory(transport))
841            }
842
843            #[cfg(feature = "transport-pipe")]
844            TransportType::Pipe => {
845                let pipe_config = config.pipe.clone().unwrap_or_default();
846                let transport = super::pipe::PipeTransport::new(&pipe_config);
847                Ok(Self::Pipe(transport))
848            }
849
850            #[cfg(feature = "transport-file")]
851            TransportType::File => {
852                let file_config = config
853                    .file
854                    .as_ref()
855                    .ok_or_else(|| TransportError::Config("file config missing".into()))?;
856                let transport = super::file::FileTransport::new(file_config).await?;
857                Ok(Self::File(transport))
858            }
859
860            #[cfg(feature = "transport-http")]
861            TransportType::Http => {
862                let http_config = config
863                    .http
864                    .as_ref()
865                    .ok_or_else(|| TransportError::Config("http config missing".into()))?;
866                let transport = super::http::HttpTransport::new(http_config).await?;
867                Ok(Self::Http(transport))
868            }
869
870            #[cfg(feature = "transport-redis")]
871            TransportType::Redis => {
872                let redis_config = config
873                    .redis
874                    .as_ref()
875                    .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
876                let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
877                Ok(Self::Redis(transport))
878            }
879
880            // Transport types for modules not yet implemented
881            #[allow(unreachable_patterns)]
882            other => Err(TransportError::Config(format!(
883                "transport type '{other}' is not available (feature not enabled or not yet implemented)"
884            ))),
885        }
886    }
887
888    /// Create a governed receiver from the config cascade (`governor` feature).
889    ///
890    /// Identical to [`from_config`](Self::from_config) but threads the supplied
891    /// [`SelfRegulationGovernor`](crate::SelfRegulationGovernor)'s pressure into
892    /// the inbound brake of every backend that can honour it -- the Kafka
893    /// pause-partitions gate and the HTTP/gRPC 503/`unavailable` shed -- so a
894    /// factory-built receiver actually engages the default-on governor instead
895    /// of silently dropping the inbound brake.
896    ///
897    /// Construction order: the `governor` (and its pressure) is built by the
898    /// runtime BEFORE this call, so the pressure latch already exists and is
899    /// merely cloned (cheap `Arc` bump) into each transport here.
900    ///
901    /// # Errors
902    ///
903    /// Same as [`from_config`](Self::from_config).
904    #[cfg(feature = "governor")]
905    pub async fn from_config_with_governor(
906        key: &str,
907        governor: &crate::SelfRegulationGovernor,
908    ) -> TransportResult<Self> {
909        #[cfg(feature = "config")]
910        let config = {
911            let cfg = crate::config::try_get()
912                .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
913            cfg.unmarshal_key::<super::TransportConfig>(key)
914                .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
915        };
916
917        #[cfg(not(feature = "config"))]
918        let config = {
919            let _ = key;
920            super::TransportConfig::default()
921        };
922
923        Self::from_transport_config_with_governor(&config, governor).await
924    }
925
926    /// Create a governed receiver from an explicit `TransportConfig`
927    /// (`governor` feature).
928    ///
929    /// The governor-aware sibling of [`from_transport_config`](Self::from_transport_config).
930    /// Backends that own an inbound brake are wired to the governor's shared
931    /// pressure:
932    ///
933    /// - **Kafka**: the consumer's assigned partitions are paused/resumed via
934    ///   [`SelfRegulationGovernor::attach_kafka_gate`](crate::SelfRegulationGovernor::attach_kafka_gate)
935    ///   (the full `gate_actuator -> InboundGate -> with_inbound_gate` dance).
936    /// - **HTTP / gRPC**: the embedded receive server is built with
937    ///   `with_pressure(Some(governor.pressure()))`, so it sheds with 503 /
938    ///   `Status::unavailable` while the pressure latch holds.
939    ///
940    /// Backends with no inbound brake (memory, pipe, file, redis) construct
941    /// exactly as in [`from_transport_config`](Self::from_transport_config) --
942    /// the byte-budget lever already reaches them through the governed driver.
943    ///
944    /// # Errors
945    ///
946    /// Same as [`from_transport_config`](Self::from_transport_config).
947    #[cfg(feature = "governor")]
948    pub async fn from_transport_config_with_governor(
949        config: &super::TransportConfig,
950        #[cfg_attr(
951            not(any(
952                feature = "transport-kafka",
953                feature = "transport-grpc",
954                feature = "transport-http"
955            )),
956            allow(unused_variables)
957        )]
958        governor: &crate::SelfRegulationGovernor,
959    ) -> TransportResult<Self> {
960        match config.transport_type {
961            #[cfg(feature = "transport-kafka")]
962            TransportType::Kafka => {
963                let kafka_config = config
964                    .kafka
965                    .as_ref()
966                    .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
967                let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
968                // Attach the inbound gate over the governor's shared pressure:
969                // pauses assigned partitions while the latch holds (member stays
970                // in the group -- no rebalance).
971                let transport = governor.attach_kafka_gate(transport);
972                Ok(Self::Kafka(transport))
973            }
974
975            #[cfg(feature = "transport-grpc")]
976            TransportType::Grpc => {
977                let grpc_config = config
978                    .grpc
979                    .as_ref()
980                    .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
981                let transport = super::grpc::GrpcTransport::with_pressure(
982                    grpc_config,
983                    Some(governor.pressure()),
984                )
985                .await?;
986                Ok(Self::Grpc(transport))
987            }
988
989            #[cfg(feature = "transport-http")]
990            TransportType::Http => {
991                let http_config = config
992                    .http
993                    .as_ref()
994                    .ok_or_else(|| TransportError::Config("http config missing".into()))?;
995                let transport = super::http::HttpTransport::with_pressure(
996                    http_config,
997                    Some(governor.pressure()),
998                )
999                .await?;
1000                Ok(Self::Http(transport))
1001            }
1002
1003            // Backends with no inbound brake: construct identically to the
1004            // non-governor path. The byte-budget lever reaches these via the
1005            // governed driver, not an inbound gate.
1006            #[cfg(any(
1007                feature = "transport-memory",
1008                feature = "transport-pipe",
1009                feature = "transport-file",
1010                feature = "transport-redis"
1011            ))]
1012            _ => Self::from_transport_config(config).await,
1013
1014            // No brakeable backend enabled at all: defer entirely to the
1015            // non-governor path (handles the "feature not enabled" error too).
1016            #[allow(unreachable_patterns)]
1017            _ => Self::from_transport_config(config).await,
1018        }
1019    }
1020}
1021
1022// ---------------------------------------------------------------------------
1023// Tests
1024// ---------------------------------------------------------------------------
1025
1026#[cfg(all(test, feature = "transport-memory"))]
1027mod tests {
1028    use super::*;
1029    use crate::transport::memory::{MemoryConfig, MemoryTransport};
1030    use crate::transport::traits::TransportReceiver;
1031
1032    /// End-to-end round-trip: inject a message, recv via `AnyReceiver`,
1033    /// assert token wrapping, then commit and verify the memory transport's
1034    /// committed sequence advances.
1035    ///
1036    /// This exercises the full token wrap + commit re-dispatch path.
1037    #[tokio::test]
1038    async fn any_receiver_memory_recv_commit_round_trip() {
1039        // Build the underlying transport and inject a message.
1040        let inner = MemoryTransport::new(&MemoryConfig::default())
1041            .expect("memory transport must construct with default config");
1042        inner
1043            .inject(Some("events.test"), b"hello from AnyReceiver".to_vec())
1044            .await
1045            .expect("inject must succeed");
1046
1047        // Wrap in AnyReceiver.
1048        let receiver = AnyReceiver::Memory(inner);
1049
1050        assert_eq!(receiver.name(), "memory");
1051        assert!(receiver.is_healthy());
1052
1053        // Recv via AnyReceiver -- must yield a WorkBatch<AnyToken>.
1054        let batch = receiver.recv(10).await.expect("recv must succeed");
1055        assert_eq!(batch.records.len(), 1, "expected exactly one record");
1056        assert_eq!(batch.commit_tokens.len(), 1, "expected one commit token");
1057        assert!(batch.dlq_entries.is_empty(), "no DLQ entries expected");
1058
1059        let record = &batch.records[0];
1060        assert_eq!(record.payload.as_ref(), b"hello from AnyReceiver");
1061        assert_eq!(record.key.as_deref(), Some("events.test"));
1062
1063        // Token must be wrapped in the Memory variant.
1064        let token = &batch.commit_tokens[0];
1065        assert!(
1066            matches!(token, AnyToken::Memory(_)),
1067            "token variant must be AnyToken::Memory, got {token}"
1068        );
1069
1070        // Display delegates to the inner MemoryToken (format: "memory:<seq>").
1071        let display = token.to_string();
1072        assert!(
1073            display.starts_with("memory:"),
1074            "Display must delegate to MemoryToken, got {display}"
1075        );
1076
1077        // Commit the AnyToken slice -- routes back to the MemoryTransport.
1078        let tokens: Vec<AnyToken> = batch.commit_tokens;
1079        let seq_before = if let AnyReceiver::Memory(ref t) = receiver {
1080            t.committed_sequence()
1081        } else {
1082            panic!("must be Memory variant");
1083        };
1084
1085        receiver.commit(&tokens).await.expect("commit must succeed");
1086
1087        // The memory transport tracks the max committed seq; it must have advanced.
1088        if let AnyReceiver::Memory(ref t) = receiver {
1089            let seq_after = t.committed_sequence();
1090            assert!(
1091                seq_after > seq_before || seq_after == 0,
1092                "committed_sequence must advance after commit (before={seq_before}, after={seq_after})"
1093            );
1094        }
1095    }
1096
1097    /// Tokens from the wrong variant are silently ignored by commit --
1098    /// commit must succeed without error even if no tokens match.
1099    #[tokio::test]
1100    async fn any_receiver_commit_ignores_mismatched_variants() {
1101        let inner = MemoryTransport::new(&MemoryConfig::default())
1102            .expect("memory transport must construct with default config");
1103        let receiver = AnyReceiver::Memory(inner);
1104
1105        // A Pipe token delivered to a Memory receiver -- must not panic or error.
1106        #[cfg(feature = "transport-pipe")]
1107        {
1108            let alien_token = AnyToken::Pipe(crate::transport::pipe::PipeToken { seq: 99 });
1109            receiver
1110                .commit(&[alien_token])
1111                .await
1112                .expect("commit with mismatched variant must succeed without error");
1113        }
1114
1115        // Zero tokens -- always a no-op.
1116        receiver
1117            .commit(&[])
1118            .await
1119            .expect("commit with empty slice must succeed");
1120    }
1121}
1122
1123// ---------------------------------------------------------------------------
1124// Governor-aware factory tests (Remediation Phase 6).
1125//
1126// Prove `*_with_governor` actually threads the governor's inbound brake into
1127// the backends that own one, and that the non-governor path is unchanged.
1128// ---------------------------------------------------------------------------
1129
1130#[cfg(all(test, feature = "governor"))]
1131mod governor_tests {
1132    #[cfg(any(
1133        feature = "transport-kafka",
1134        feature = "transport-grpc",
1135        feature = "transport-http",
1136        feature = "transport-memory"
1137    ))]
1138    use super::*;
1139
1140    /// Build a [`SelfRegulationGovernor`] whose single HARD memory source is
1141    /// pinned ABOVE / BELOW `pause_above` (default 0.80) by sizing the guard.
1142    #[cfg(any(
1143        feature = "transport-kafka",
1144        feature = "transport-grpc",
1145        feature = "transport-http",
1146        feature = "transport-memory"
1147    ))]
1148    fn governor(pinned_high: bool) -> crate::SelfRegulationGovernor {
1149        use crate::memory::{MemoryGuard, MemoryGuardConfig};
1150        let guard = std::sync::Arc::new(MemoryGuard::new(MemoryGuardConfig {
1151            limit_bytes: 1000,
1152            pressure_threshold: 0.80,
1153            ..Default::default()
1154        }));
1155        if pinned_high {
1156            guard.add_bytes(950); // 95% -> well above pause_above
1157        } else {
1158            guard.add_bytes(10); // 1% -> well below resume_below
1159        }
1160        crate::SelfRegulationConfig::default()
1161            .build(guard)
1162            .expect("governor enabled by default")
1163    }
1164
1165    /// A factory-built Kafka receiver MUST carry an inbound gate when a governor
1166    /// is supplied. Broker-free: `KafkaTransport::new` lazily connects and an
1167    /// empty topic list means no subscribe/poll happens at construction.
1168    #[cfg(feature = "transport-kafka")]
1169    #[tokio::test]
1170    async fn kafka_governed_receiver_has_inbound_gate() {
1171        let kafka = crate::transport::kafka::KafkaConfig::for_testing(
1172            "localhost:9092",
1173            "phase6-test",
1174            Vec::new(), // no topics -> no subscribe -> broker-free build
1175        );
1176        let cfg = crate::transport::TransportConfig {
1177            transport_type: crate::transport::types::TransportType::Kafka,
1178            kafka: Some(kafka),
1179            ..Default::default()
1180        };
1181
1182        let gov = governor(false);
1183        let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1184            .await
1185            .expect("governed kafka receiver must construct broker-free");
1186
1187        match receiver {
1188            AnyReceiver::Kafka(ref t) => assert!(
1189                t.has_inbound_gate(),
1190                "factory-built Kafka receiver must have the governor's inbound gate attached"
1191            ),
1192            _ => panic!("expected Kafka variant"),
1193        }
1194
1195        // The non-governor constructor must NOT attach a gate (byte-identical
1196        // to pre-Phase-6 behaviour).
1197        let plain = AnyReceiver::from_transport_config(&cfg)
1198            .await
1199            .expect("plain kafka receiver must construct broker-free");
1200        match plain {
1201            AnyReceiver::Kafka(ref t) => assert!(
1202                !t.has_inbound_gate(),
1203                "non-governor constructor must leave the inbound gate unattached"
1204            ),
1205            _ => panic!("expected Kafka variant"),
1206        }
1207    }
1208
1209    /// A factory-built gRPC receiver MUST reject under pressure (governor pinned
1210    /// HIGH) with `Status::unavailable`, surfaced to the client as backpressure.
1211    #[cfg(feature = "transport-grpc")]
1212    #[tokio::test]
1213    async fn grpc_governed_receiver_sheds_under_pressure() {
1214        use crate::transport::traits::{TransportBase, TransportSender};
1215        use crate::transport::types::SendResult;
1216
1217        let server_cfg = crate::transport::grpc::GrpcConfig::server("127.0.0.1:16188");
1218        let cfg = crate::transport::TransportConfig {
1219            transport_type: crate::transport::types::TransportType::Grpc,
1220            grpc: Some(server_cfg),
1221            ..Default::default()
1222        };
1223
1224        let gov = governor(true);
1225        assert!(
1226            gov.pressure().should_hold(),
1227            "pinned-high governor must hold"
1228        );
1229
1230        let server = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1231            .await
1232            .expect("governed grpc receiver must construct");
1233        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1234
1235        let client = crate::transport::grpc::GrpcTransport::new(
1236            &crate::transport::grpc::GrpcConfig::client("http://127.0.0.1:16188"),
1237        )
1238        .await
1239        .expect("grpc client");
1240        let result = client
1241            .send("events", bytes::Bytes::from_static(b"{\"x\":1}"))
1242            .await;
1243        assert!(
1244            matches!(result, SendResult::Backpressured),
1245            "push under pressure must surface as backpressure, got {result:?}"
1246        );
1247
1248        client.close().await.unwrap();
1249        server.close().await.unwrap();
1250    }
1251
1252    /// A factory-built HTTP receiver MUST shed with 503 under pressure (governor
1253    /// pinned HIGH); the shed request never reaches the queue.
1254    #[cfg(feature = "transport-http")]
1255    #[tokio::test]
1256    async fn http_governed_receiver_sheds_under_pressure() {
1257        use crate::transport::traits::{TransportBase, TransportReceiver};
1258
1259        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1260        let addr = listener.local_addr().unwrap();
1261        drop(listener);
1262
1263        let http_cfg = crate::transport::http::HttpTransportConfig {
1264            listen: Some(addr.to_string()),
1265            recv_timeout_ms: 200,
1266            ..Default::default()
1267        };
1268        let cfg = crate::transport::TransportConfig {
1269            transport_type: crate::transport::types::TransportType::Http,
1270            http: Some(http_cfg),
1271            ..Default::default()
1272        };
1273
1274        let gov = governor(true);
1275        assert!(
1276            gov.pressure().should_hold(),
1277            "pinned-high governor must hold"
1278        );
1279
1280        let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1281            .await
1282            .expect("governed http receiver must construct");
1283        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1284
1285        let client = reqwest::Client::new();
1286        let resp = client
1287            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1288            .body(b"{\"msg\":\"shed\"}".to_vec())
1289            .send()
1290            .await
1291            .unwrap();
1292        assert_eq!(
1293            resp.status(),
1294            reqwest::StatusCode::SERVICE_UNAVAILABLE,
1295            "factory-built HTTP receiver under pressure must shed with 503"
1296        );
1297
1298        let records = receiver.recv(10).await.unwrap().records;
1299        assert!(records.is_empty(), "shed request must not be queued");
1300        receiver.close().await.unwrap();
1301    }
1302
1303    /// Backends with no inbound brake (memory) construct identically through the
1304    /// governor-aware path -- the receiver still works as a plain receiver.
1305    #[cfg(feature = "transport-memory")]
1306    #[tokio::test]
1307    async fn memory_governed_receiver_is_plain() {
1308        use crate::transport::traits::TransportReceiver;
1309
1310        let cfg = crate::transport::TransportConfig {
1311            transport_type: crate::transport::types::TransportType::Memory,
1312            memory: Some(crate::transport::memory::MemoryConfig::default()),
1313            ..Default::default()
1314        };
1315
1316        let gov = governor(false);
1317        let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1318            .await
1319            .expect("governed memory receiver must construct");
1320
1321        assert_eq!(receiver.name(), "memory");
1322        // No gate concept for memory -- recv just returns an empty batch.
1323        let batch = receiver.recv(1).await.expect("recv must succeed");
1324        assert!(batch.records.is_empty(), "no records injected");
1325    }
1326}