1use super::error::{TransportError, TransportResult};
33use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
34use super::types::SendResult;
35#[cfg(any(
36 feature = "transport-kafka",
37 feature = "transport-grpc",
38 feature = "transport-memory",
39 feature = "transport-pipe",
40 feature = "transport-file",
41 feature = "transport-http",
42 feature = "transport-redis"
43))]
44use super::types::TransportType;
45use super::work_batch::{Record, WorkBatch};
46
/// Sending-side transport handle wrapping whichever concrete transport was
/// compiled in.
///
/// Every variant is gated on its own `transport-*` Cargo feature, so with no
/// transport feature enabled the enum has no variants at all.
pub enum AnySender {
    /// Kafka transport (`transport-kafka` feature).
    #[cfg(feature = "transport-kafka")]
    Kafka(super::kafka::KafkaTransport),

    /// gRPC transport (`transport-grpc` feature).
    #[cfg(feature = "transport-grpc")]
    Grpc(super::grpc::GrpcTransport),

    /// In-memory transport (`transport-memory` feature).
    #[cfg(feature = "transport-memory")]
    Memory(super::memory::MemoryTransport),

    /// Pipe transport (`transport-pipe` feature).
    #[cfg(feature = "transport-pipe")]
    Pipe(super::pipe::PipeTransport),

    /// File transport (`transport-file` feature).
    #[cfg(feature = "transport-file")]
    File(super::file::FileTransport),

    /// HTTP transport (`transport-http` feature).
    #[cfg(feature = "transport-http")]
    Http(super::http::HttpTransport),

    /// Redis transport (`transport-redis` feature).
    #[cfg(feature = "transport-redis")]
    Redis(super::redis_transport::RedisTransport),
}
73
74impl TransportBase for AnySender {
75 async fn close(&self) -> TransportResult<()> {
76 match self {
77 #[cfg(feature = "transport-kafka")]
78 Self::Kafka(t) => t.close().await,
79 #[cfg(feature = "transport-grpc")]
80 Self::Grpc(t) => t.close().await,
81 #[cfg(feature = "transport-memory")]
82 Self::Memory(t) => t.close().await,
83 #[cfg(feature = "transport-pipe")]
84 Self::Pipe(t) => t.close().await,
85 #[cfg(feature = "transport-file")]
86 Self::File(t) => t.close().await,
87 #[cfg(feature = "transport-http")]
88 Self::Http(t) => t.close().await,
89 #[cfg(feature = "transport-redis")]
90 Self::Redis(t) => t.close().await,
91 #[allow(unreachable_patterns)]
92 _ => Err(TransportError::Config(
93 "no transport variant enabled".into(),
94 )),
95 }
96 }
97
98 fn is_healthy(&self) -> bool {
99 match self {
100 #[cfg(feature = "transport-kafka")]
101 Self::Kafka(t) => t.is_healthy(),
102 #[cfg(feature = "transport-grpc")]
103 Self::Grpc(t) => t.is_healthy(),
104 #[cfg(feature = "transport-memory")]
105 Self::Memory(t) => t.is_healthy(),
106 #[cfg(feature = "transport-pipe")]
107 Self::Pipe(t) => t.is_healthy(),
108 #[cfg(feature = "transport-file")]
109 Self::File(t) => t.is_healthy(),
110 #[cfg(feature = "transport-http")]
111 Self::Http(t) => t.is_healthy(),
112 #[cfg(feature = "transport-redis")]
113 Self::Redis(t) => t.is_healthy(),
114 #[allow(unreachable_patterns)]
115 _ => false,
116 }
117 }
118
119 fn name(&self) -> &'static str {
120 match self {
121 #[cfg(feature = "transport-kafka")]
122 Self::Kafka(t) => t.name(),
123 #[cfg(feature = "transport-grpc")]
124 Self::Grpc(t) => t.name(),
125 #[cfg(feature = "transport-memory")]
126 Self::Memory(t) => t.name(),
127 #[cfg(feature = "transport-pipe")]
128 Self::Pipe(t) => t.name(),
129 #[cfg(feature = "transport-file")]
130 Self::File(t) => t.name(),
131 #[cfg(feature = "transport-http")]
132 Self::Http(t) => t.name(),
133 #[cfg(feature = "transport-redis")]
134 Self::Redis(t) => t.name(),
135 #[allow(unreachable_patterns)]
136 _ => "none",
137 }
138 }
139}
140
141impl TransportSender for AnySender {
142 #[cfg_attr(
143 not(any(
144 feature = "transport-kafka",
145 feature = "transport-grpc",
146 feature = "transport-memory",
147 feature = "transport-pipe",
148 feature = "transport-file",
149 feature = "transport-http",
150 feature = "transport-redis"
151 )),
152 allow(unused_variables)
153 )]
154 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
155 match self {
156 #[cfg(feature = "transport-kafka")]
157 Self::Kafka(t) => t.send(key, payload).await,
158 #[cfg(feature = "transport-grpc")]
159 Self::Grpc(t) => t.send(key, payload).await,
160 #[cfg(feature = "transport-memory")]
161 Self::Memory(t) => t.send(key, payload).await,
162 #[cfg(feature = "transport-pipe")]
163 Self::Pipe(t) => t.send(key, payload).await,
164 #[cfg(feature = "transport-file")]
165 Self::File(t) => t.send(key, payload).await,
166 #[cfg(feature = "transport-http")]
167 Self::Http(t) => t.send(key, payload).await,
168 #[cfg(feature = "transport-redis")]
169 Self::Redis(t) => t.send(key, payload).await,
170 #[allow(unreachable_patterns)]
171 _ => SendResult::Fatal(TransportError::Config(
172 "no transport variant enabled".into(),
173 )),
174 }
175 }
176
177 #[cfg_attr(
182 not(any(
183 feature = "transport-kafka",
184 feature = "transport-grpc",
185 feature = "transport-memory",
186 feature = "transport-pipe",
187 feature = "transport-file",
188 feature = "transport-http",
189 feature = "transport-redis"
190 )),
191 allow(unused_variables)
192 )]
193 async fn send_batch(&self, records: &[Record]) -> SendResult {
194 match self {
195 #[cfg(feature = "transport-kafka")]
196 Self::Kafka(t) => t.send_batch(records).await,
197 #[cfg(feature = "transport-grpc")]
198 Self::Grpc(t) => t.send_batch(records).await,
199 #[cfg(feature = "transport-memory")]
200 Self::Memory(t) => t.send_batch(records).await,
201 #[cfg(feature = "transport-pipe")]
202 Self::Pipe(t) => t.send_batch(records).await,
203 #[cfg(feature = "transport-file")]
204 Self::File(t) => t.send_batch(records).await,
205 #[cfg(feature = "transport-http")]
206 Self::Http(t) => t.send_batch(records).await,
207 #[cfg(feature = "transport-redis")]
208 Self::Redis(t) => t.send_batch(records).await,
209 #[allow(unreachable_patterns)]
210 _ => SendResult::Fatal(TransportError::Config(
211 "no transport variant enabled".into(),
212 )),
213 }
214 }
215}
216
217impl AnySender {
218 pub async fn from_config(key: &str) -> TransportResult<Self> {
237 #[cfg(feature = "config")]
238 let config = read_transport_config(key)?;
239
240 #[cfg(not(feature = "config"))]
241 let config = {
242 let _ = key;
243 super::TransportConfig::default()
244 };
245
246 Self::from_transport_config(&config).await
247 }
248
249 pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
251 match config.transport_type {
252 #[cfg(feature = "transport-kafka")]
253 TransportType::Kafka => {
254 let kafka_config = config
255 .kafka
256 .as_ref()
257 .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
258 let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
259 Ok(Self::Kafka(transport))
260 }
261
262 #[cfg(feature = "transport-grpc")]
263 TransportType::Grpc => {
264 let grpc_config = config
265 .grpc
266 .as_ref()
267 .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
268 let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
269 Ok(Self::Grpc(transport))
270 }
271
272 #[cfg(feature = "transport-memory")]
273 TransportType::Memory => {
274 let memory_config = config.memory.clone().unwrap_or_default();
275 let transport = super::memory::MemoryTransport::new(&memory_config)?;
276 Ok(Self::Memory(transport))
277 }
278
279 #[cfg(feature = "transport-pipe")]
280 TransportType::Pipe => {
281 let pipe_config = config.pipe.clone().unwrap_or_default();
282 let transport = super::pipe::PipeTransport::new(&pipe_config);
283 Ok(Self::Pipe(transport))
284 }
285
286 #[cfg(feature = "transport-file")]
287 TransportType::File => {
288 let file_config = config
289 .file
290 .as_ref()
291 .ok_or_else(|| TransportError::Config("file config missing".into()))?;
292 let transport = super::file::FileTransport::new(file_config).await?;
293 Ok(Self::File(transport))
294 }
295
296 #[cfg(feature = "transport-http")]
297 TransportType::Http => {
298 let http_config = config
299 .http
300 .as_ref()
301 .ok_or_else(|| TransportError::Config("http config missing".into()))?;
302 let transport = super::http::HttpTransport::new(http_config).await?;
303 Ok(Self::Http(transport))
304 }
305
306 #[cfg(feature = "transport-redis")]
307 TransportType::Redis => {
308 let redis_config = config
309 .redis
310 .as_ref()
311 .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
312 let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
313 Ok(Self::Redis(transport))
314 }
315
316 #[allow(unreachable_patterns)]
318 other => Err(TransportError::Config(format!(
319 "transport type '{other}' is not available (feature not enabled or not yet implemented)"
320 ))),
321 }
322 }
323}
324
/// Commit token wrapping the token type of whichever transport produced it.
///
/// The variants mirror the transport enums: each one exists only when the
/// matching `transport-*` feature is enabled. The enum is `#[non_exhaustive]`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AnyToken {
    /// Token issued by the Kafka transport.
    #[cfg(feature = "transport-kafka")]
    Kafka(super::kafka::KafkaToken),

    /// Token issued by the gRPC transport.
    #[cfg(feature = "transport-grpc")]
    Grpc(super::grpc::GrpcToken),

    /// Token issued by the in-memory transport.
    #[cfg(feature = "transport-memory")]
    Memory(super::memory::MemoryToken),

    /// Token issued by the pipe transport.
    #[cfg(feature = "transport-pipe")]
    Pipe(super::pipe::PipeToken),

    /// Token issued by the file transport.
    #[cfg(feature = "transport-file")]
    File(super::file::FileToken),

    /// Token issued by the HTTP transport.
    #[cfg(feature = "transport-http")]
    Http(super::http::HttpToken),

    /// Token issued by the Redis transport.
    #[cfg(feature = "transport-redis")]
    Redis(super::redis_transport::RedisToken),
}
370
371impl std::fmt::Display for AnyToken {
372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373 match self {
374 #[cfg(feature = "transport-kafka")]
375 Self::Kafka(t) => std::fmt::Display::fmt(t, f),
376 #[cfg(feature = "transport-grpc")]
377 Self::Grpc(t) => std::fmt::Display::fmt(t, f),
378 #[cfg(feature = "transport-memory")]
379 Self::Memory(t) => std::fmt::Display::fmt(t, f),
380 #[cfg(feature = "transport-pipe")]
381 Self::Pipe(t) => std::fmt::Display::fmt(t, f),
382 #[cfg(feature = "transport-file")]
383 Self::File(t) => std::fmt::Display::fmt(t, f),
384 #[cfg(feature = "transport-http")]
385 Self::Http(t) => std::fmt::Display::fmt(t, f),
386 #[cfg(feature = "transport-redis")]
387 Self::Redis(t) => std::fmt::Display::fmt(t, f),
388 #[allow(unreachable_patterns)]
389 _ => write!(f, "none"),
390 }
391 }
392}
393
394impl CommitToken for AnyToken {}
395
/// Receiving-side transport handle wrapping whichever concrete transport was
/// compiled in.
///
/// Every variant is gated on its own `transport-*` Cargo feature and wraps
/// the same transport struct as the corresponding `AnySender` variant.
pub enum AnyReceiver {
    /// Kafka transport (`transport-kafka` feature).
    #[cfg(feature = "transport-kafka")]
    Kafka(super::kafka::KafkaTransport),

    /// gRPC transport (`transport-grpc` feature).
    #[cfg(feature = "transport-grpc")]
    Grpc(super::grpc::GrpcTransport),

    /// In-memory transport (`transport-memory` feature).
    #[cfg(feature = "transport-memory")]
    Memory(super::memory::MemoryTransport),

    /// Pipe transport (`transport-pipe` feature).
    #[cfg(feature = "transport-pipe")]
    Pipe(super::pipe::PipeTransport),

    /// File transport (`transport-file` feature).
    #[cfg(feature = "transport-file")]
    File(super::file::FileTransport),

    /// HTTP transport (`transport-http` feature).
    #[cfg(feature = "transport-http")]
    Http(super::http::HttpTransport),

    /// Redis transport (`transport-redis` feature).
    #[cfg(feature = "transport-redis")]
    Redis(super::redis_transport::RedisTransport),
}
430
431impl TransportBase for AnyReceiver {
432 async fn close(&self) -> TransportResult<()> {
433 match self {
434 #[cfg(feature = "transport-kafka")]
435 Self::Kafka(t) => t.close().await,
436 #[cfg(feature = "transport-grpc")]
437 Self::Grpc(t) => t.close().await,
438 #[cfg(feature = "transport-memory")]
439 Self::Memory(t) => t.close().await,
440 #[cfg(feature = "transport-pipe")]
441 Self::Pipe(t) => t.close().await,
442 #[cfg(feature = "transport-file")]
443 Self::File(t) => t.close().await,
444 #[cfg(feature = "transport-http")]
445 Self::Http(t) => t.close().await,
446 #[cfg(feature = "transport-redis")]
447 Self::Redis(t) => t.close().await,
448 #[allow(unreachable_patterns)]
449 _ => Err(TransportError::Config(
450 "no transport variant enabled".into(),
451 )),
452 }
453 }
454
455 fn is_healthy(&self) -> bool {
456 match self {
457 #[cfg(feature = "transport-kafka")]
458 Self::Kafka(t) => t.is_healthy(),
459 #[cfg(feature = "transport-grpc")]
460 Self::Grpc(t) => t.is_healthy(),
461 #[cfg(feature = "transport-memory")]
462 Self::Memory(t) => t.is_healthy(),
463 #[cfg(feature = "transport-pipe")]
464 Self::Pipe(t) => t.is_healthy(),
465 #[cfg(feature = "transport-file")]
466 Self::File(t) => t.is_healthy(),
467 #[cfg(feature = "transport-http")]
468 Self::Http(t) => t.is_healthy(),
469 #[cfg(feature = "transport-redis")]
470 Self::Redis(t) => t.is_healthy(),
471 #[allow(unreachable_patterns)]
472 _ => false,
473 }
474 }
475
476 fn name(&self) -> &'static str {
477 match self {
478 #[cfg(feature = "transport-kafka")]
479 Self::Kafka(t) => t.name(),
480 #[cfg(feature = "transport-grpc")]
481 Self::Grpc(t) => t.name(),
482 #[cfg(feature = "transport-memory")]
483 Self::Memory(t) => t.name(),
484 #[cfg(feature = "transport-pipe")]
485 Self::Pipe(t) => t.name(),
486 #[cfg(feature = "transport-file")]
487 Self::File(t) => t.name(),
488 #[cfg(feature = "transport-http")]
489 Self::Http(t) => t.name(),
490 #[cfg(feature = "transport-redis")]
491 Self::Redis(t) => t.name(),
492 #[allow(unreachable_patterns)]
493 _ => "none",
494 }
495 }
496}
497
/// Converts a transport-specific batch into a `WorkBatch<AnyToken>`.
///
/// Records and DLQ entries are moved over untouched; every commit token is
/// passed through `wrap_token` (typically an `AnyToken` variant constructor).
/// Only compiled when at least one transport feature is enabled.
#[cfg(any(
    feature = "transport-kafka",
    feature = "transport-grpc",
    feature = "transport-memory",
    feature = "transport-pipe",
    feature = "transport-file",
    feature = "transport-http",
    feature = "transport-redis"
))]
fn wrap_batch<B: CommitToken>(
    batch: WorkBatch<B>,
    wrap_token: impl Fn(B) -> AnyToken,
) -> WorkBatch<AnyToken> {
    let records = batch.records;
    let dlq_entries = batch.dlq_entries;
    let wrapped_tokens = batch.commit_tokens.into_iter().map(wrap_token).collect();

    let rebuilt = WorkBatch::new(records, wrapped_tokens);
    rebuilt.with_dlq_entries(dlq_entries)
}
518
519impl TransportReceiver for AnyReceiver {
520 type Token = AnyToken;
521
522 #[cfg_attr(
523 not(any(
524 feature = "transport-kafka",
525 feature = "transport-grpc",
526 feature = "transport-memory",
527 feature = "transport-pipe",
528 feature = "transport-file",
529 feature = "transport-http",
530 feature = "transport-redis"
531 )),
532 allow(unused_variables)
533 )]
534 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<AnyToken>> {
535 match self {
536 #[cfg(feature = "transport-kafka")]
537 Self::Kafka(t) => {
538 let batch = t.recv(max).await?;
539 Ok(wrap_batch(batch, AnyToken::Kafka))
540 }
541 #[cfg(feature = "transport-grpc")]
542 Self::Grpc(t) => {
543 let batch = t.recv(max).await?;
544 Ok(wrap_batch(batch, AnyToken::Grpc))
545 }
546 #[cfg(feature = "transport-memory")]
547 Self::Memory(t) => {
548 let batch = t.recv(max).await?;
549 Ok(wrap_batch(batch, AnyToken::Memory))
550 }
551 #[cfg(feature = "transport-pipe")]
552 Self::Pipe(t) => {
553 let batch = t.recv(max).await?;
554 Ok(wrap_batch(batch, AnyToken::Pipe))
555 }
556 #[cfg(feature = "transport-file")]
557 Self::File(t) => {
558 let batch = t.recv(max).await?;
559 Ok(wrap_batch(batch, AnyToken::File))
560 }
561 #[cfg(feature = "transport-http")]
562 Self::Http(t) => {
563 let batch = t.recv(max).await?;
564 Ok(wrap_batch(batch, AnyToken::Http))
565 }
566 #[cfg(feature = "transport-redis")]
567 Self::Redis(t) => {
568 let batch = t.recv(max).await?;
569 Ok(wrap_batch(batch, AnyToken::Redis))
570 }
571 #[allow(unreachable_patterns)]
572 _ => Err(TransportError::Config(
573 "no transport variant enabled".into(),
574 )),
575 }
576 }
577
578 #[cfg_attr(
584 not(any(
585 feature = "transport-kafka",
586 feature = "transport-grpc",
587 feature = "transport-memory",
588 feature = "transport-pipe",
589 feature = "transport-file",
590 feature = "transport-http",
591 feature = "transport-redis"
592 )),
593 allow(unused_variables)
594 )]
595 async fn recv_limited(
596 &self,
597 limits: super::traits::RecvLimits,
598 ) -> TransportResult<WorkBatch<AnyToken>> {
599 match self {
600 #[cfg(feature = "transport-kafka")]
601 Self::Kafka(t) => {
602 let batch = t.recv_limited(limits).await?;
603 Ok(wrap_batch(batch, AnyToken::Kafka))
604 }
605 #[cfg(feature = "transport-grpc")]
606 Self::Grpc(t) => {
607 let batch = t.recv_limited(limits).await?;
608 Ok(wrap_batch(batch, AnyToken::Grpc))
609 }
610 #[cfg(feature = "transport-memory")]
611 Self::Memory(t) => {
612 let batch = t.recv_limited(limits).await?;
613 Ok(wrap_batch(batch, AnyToken::Memory))
614 }
615 #[cfg(feature = "transport-pipe")]
616 Self::Pipe(t) => {
617 let batch = t.recv_limited(limits).await?;
618 Ok(wrap_batch(batch, AnyToken::Pipe))
619 }
620 #[cfg(feature = "transport-file")]
621 Self::File(t) => {
622 let batch = t.recv_limited(limits).await?;
623 Ok(wrap_batch(batch, AnyToken::File))
624 }
625 #[cfg(feature = "transport-http")]
626 Self::Http(t) => {
627 let batch = t.recv_limited(limits).await?;
628 Ok(wrap_batch(batch, AnyToken::Http))
629 }
630 #[cfg(feature = "transport-redis")]
631 Self::Redis(t) => {
632 let batch = t.recv_limited(limits).await?;
633 Ok(wrap_batch(batch, AnyToken::Redis))
634 }
635 #[allow(unreachable_patterns)]
636 _ => Err(TransportError::Config(
637 "no transport variant enabled".into(),
638 )),
639 }
640 }
641
642 #[cfg_attr(
643 not(any(
644 feature = "transport-kafka",
645 feature = "transport-grpc",
646 feature = "transport-memory",
647 feature = "transport-pipe",
648 feature = "transport-file",
649 feature = "transport-http",
650 feature = "transport-redis"
651 )),
652 allow(unused_variables)
653 )]
654 async fn commit(&self, tokens: &[AnyToken]) -> TransportResult<()> {
655 match self {
662 #[cfg(feature = "transport-kafka")]
663 Self::Kafka(t) => {
664 t.commit(&extract_tokens(tokens, |tok| match tok {
665 AnyToken::Kafka(k) => Some(k.clone()),
666 #[allow(unreachable_patterns)]
667 _ => None,
668 }))
669 .await
670 }
671 #[cfg(feature = "transport-grpc")]
672 Self::Grpc(t) => {
673 t.commit(&extract_tokens(tokens, |tok| match tok {
674 AnyToken::Grpc(g) => Some(g.clone()),
675 #[allow(unreachable_patterns)]
676 _ => None,
677 }))
678 .await
679 }
680 #[cfg(feature = "transport-memory")]
681 Self::Memory(t) => {
682 t.commit(&extract_tokens(tokens, |tok| match tok {
683 AnyToken::Memory(m) => Some(*m),
684 #[allow(unreachable_patterns)]
685 _ => None,
686 }))
687 .await
688 }
689 #[cfg(feature = "transport-pipe")]
690 Self::Pipe(t) => {
691 t.commit(&extract_tokens(tokens, |tok| match tok {
692 AnyToken::Pipe(p) => Some(*p),
693 #[allow(unreachable_patterns)]
694 _ => None,
695 }))
696 .await
697 }
698 #[cfg(feature = "transport-file")]
699 Self::File(t) => {
700 t.commit(&extract_tokens(tokens, |tok| match tok {
701 AnyToken::File(f) => Some(*f),
702 #[allow(unreachable_patterns)]
703 _ => None,
704 }))
705 .await
706 }
707 #[cfg(feature = "transport-http")]
708 Self::Http(t) => {
709 t.commit(&extract_tokens(tokens, |tok| match tok {
710 AnyToken::Http(h) => Some(h.clone()),
711 #[allow(unreachable_patterns)]
712 _ => None,
713 }))
714 .await
715 }
716 #[cfg(feature = "transport-redis")]
717 Self::Redis(t) => {
718 t.commit(&extract_tokens(tokens, |tok| match tok {
719 AnyToken::Redis(r) => Some(r.clone()),
720 #[allow(unreachable_patterns)]
721 _ => None,
722 }))
723 .await
724 }
725 #[allow(unreachable_patterns)]
726 _ => Err(TransportError::Config(
727 "no transport variant enabled".into(),
728 )),
729 }
730 }
731}
732
/// Loads the `TransportConfig` stored under `key` from the crate-wide
/// configuration returned by `crate::config::try_get()`.
///
/// # Errors
/// `TransportError::Config` when `try_get()` returns `None`, or when
/// `unmarshal_key` fails for `key`.
#[cfg(feature = "config")]
fn read_transport_config(key: &str) -> TransportResult<super::TransportConfig> {
    let Some(cfg) = crate::config::try_get() else {
        return Err(TransportError::Config("config not initialised".into()));
    };

    match cfg.unmarshal_key::<super::TransportConfig>(key) {
        Ok(transport_config) => Ok(transport_config),
        Err(e) => Err(TransportError::Config(format!("failed to read {key}: {e}"))),
    }
}
742
/// Collects, in order, every value for which `pick` returns `Some`.
///
/// Tokens for which `pick` returns `None` are skipped. Only compiled when at
/// least one transport feature is enabled.
#[cfg(any(
    feature = "transport-kafka",
    feature = "transport-grpc",
    feature = "transport-memory",
    feature = "transport-pipe",
    feature = "transport-file",
    feature = "transport-http",
    feature = "transport-redis",
))]
fn extract_tokens<T>(tokens: &[AnyToken], pick: impl Fn(&AnyToken) -> Option<T>) -> Vec<T> {
    let mut selected = Vec::new();
    for candidate in tokens {
        if let Some(inner) = pick(candidate) {
            selected.push(inner);
        }
    }
    selected
}
759
760impl AnyReceiver {
761 pub async fn from_config(key: &str) -> TransportResult<Self> {
781 #[cfg(feature = "config")]
782 let config = read_transport_config(key)?;
783
784 #[cfg(not(feature = "config"))]
785 let config = {
786 let _ = key;
787 super::TransportConfig::default()
788 };
789
790 Self::from_transport_config(&config).await
791 }
792
793 pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
795 match config.transport_type {
796 #[cfg(feature = "transport-kafka")]
797 TransportType::Kafka => {
798 let kafka_config = config
799 .kafka
800 .as_ref()
801 .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
802 let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
803 Ok(Self::Kafka(transport))
804 }
805
806 #[cfg(feature = "transport-grpc")]
807 TransportType::Grpc => {
808 let grpc_config = config
809 .grpc
810 .as_ref()
811 .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
812 let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
813 Ok(Self::Grpc(transport))
814 }
815
816 #[cfg(feature = "transport-memory")]
817 TransportType::Memory => {
818 let memory_config = config.memory.clone().unwrap_or_default();
819 let transport = super::memory::MemoryTransport::new(&memory_config)?;
820 Ok(Self::Memory(transport))
821 }
822
823 #[cfg(feature = "transport-pipe")]
824 TransportType::Pipe => {
825 let pipe_config = config.pipe.clone().unwrap_or_default();
826 let transport = super::pipe::PipeTransport::new(&pipe_config);
827 Ok(Self::Pipe(transport))
828 }
829
830 #[cfg(feature = "transport-file")]
831 TransportType::File => {
832 let file_config = config
833 .file
834 .as_ref()
835 .ok_or_else(|| TransportError::Config("file config missing".into()))?;
836 let transport = super::file::FileTransport::new(file_config).await?;
837 Ok(Self::File(transport))
838 }
839
840 #[cfg(feature = "transport-http")]
841 TransportType::Http => {
842 let http_config = config
843 .http
844 .as_ref()
845 .ok_or_else(|| TransportError::Config("http config missing".into()))?;
846 let transport = super::http::HttpTransport::new(http_config).await?;
847 Ok(Self::Http(transport))
848 }
849
850 #[cfg(feature = "transport-redis")]
851 TransportType::Redis => {
852 let redis_config = config
853 .redis
854 .as_ref()
855 .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
856 let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
857 Ok(Self::Redis(transport))
858 }
859
860 #[allow(unreachable_patterns)]
862 other => Err(TransportError::Config(format!(
863 "transport type '{other}' is not available (feature not enabled or not yet implemented)"
864 ))),
865 }
866 }
867
868 #[cfg(feature = "governor")]
885 pub async fn from_config_with_governor(
886 key: &str,
887 governor: &crate::SelfRegulationGovernor,
888 ) -> TransportResult<Self> {
889 #[cfg(feature = "config")]
890 let config = read_transport_config(key)?;
891
892 #[cfg(not(feature = "config"))]
893 let config = {
894 let _ = key;
895 super::TransportConfig::default()
896 };
897
898 Self::from_transport_config_with_governor(&config, governor).await
899 }
900
901 #[cfg(feature = "governor")]
923 pub async fn from_transport_config_with_governor(
924 config: &super::TransportConfig,
925 #[cfg_attr(
926 not(any(
927 feature = "transport-kafka",
928 feature = "transport-grpc",
929 feature = "transport-http"
930 )),
931 allow(unused_variables)
932 )]
933 governor: &crate::SelfRegulationGovernor,
934 ) -> TransportResult<Self> {
935 match config.transport_type {
936 #[cfg(feature = "transport-kafka")]
937 TransportType::Kafka => {
938 let kafka_config = config
939 .kafka
940 .as_ref()
941 .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
942 let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
943 let transport = governor.attach_kafka_gate(transport);
947 Ok(Self::Kafka(transport))
948 }
949
950 #[cfg(feature = "transport-grpc")]
951 TransportType::Grpc => {
952 let grpc_config = config
953 .grpc
954 .as_ref()
955 .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
956 let transport = super::grpc::GrpcTransport::with_pressure(
957 grpc_config,
958 Some(governor.pressure()),
959 )
960 .await?;
961 Ok(Self::Grpc(transport))
962 }
963
964 #[cfg(feature = "transport-http")]
965 TransportType::Http => {
966 let http_config = config
967 .http
968 .as_ref()
969 .ok_or_else(|| TransportError::Config("http config missing".into()))?;
970 let transport = super::http::HttpTransport::with_pressure(
971 http_config,
972 Some(governor.pressure()),
973 )
974 .await?;
975 Ok(Self::Http(transport))
976 }
977
978 #[cfg(any(
982 feature = "transport-memory",
983 feature = "transport-pipe",
984 feature = "transport-file",
985 feature = "transport-redis"
986 ))]
987 _ => Self::from_transport_config(config).await,
988
989 #[allow(unreachable_patterns)]
992 _ => Self::from_transport_config(config).await,
993 }
994 }
995}
996
// Tests for `AnyReceiver` backed by the in-memory transport.
#[cfg(all(test, feature = "transport-memory"))]
mod tests {
    use super::*;
    use crate::transport::memory::{MemoryConfig, MemoryTransport};
    use crate::transport::traits::TransportReceiver;

    /// Injects one record into a memory transport, receives it through
    /// `AnyReceiver`, checks the record and its token, then commits.
    #[tokio::test]
    async fn any_receiver_memory_recv_commit_round_trip() {
        let memory = MemoryTransport::new(&MemoryConfig::default())
            .expect("memory transport must construct with default config");
        memory
            .inject(Some("events.test"), b"hello from AnyReceiver".to_vec())
            .await
            .expect("inject must succeed");

        let receiver = AnyReceiver::Memory(memory);

        assert_eq!(receiver.name(), "memory");
        assert!(receiver.is_healthy());

        let received = receiver.recv(10).await.expect("recv must succeed");
        assert_eq!(received.records.len(), 1, "expected exactly one record");
        assert_eq!(received.commit_tokens.len(), 1, "expected one commit token");
        assert!(received.dlq_entries.is_empty(), "no DLQ entries expected");

        let first_record = &received.records[0];
        assert_eq!(first_record.payload.as_ref(), b"hello from AnyReceiver");
        assert_eq!(first_record.key.as_deref(), Some("events.test"));

        let token = &received.commit_tokens[0];
        assert!(
            matches!(token, AnyToken::Memory(_)),
            "token variant must be AnyToken::Memory, got {token}"
        );

        let display = token.to_string();
        assert!(
            display.starts_with("memory:"),
            "Display must delegate to MemoryToken, got {display}"
        );

        let tokens: Vec<AnyToken> = received.commit_tokens;

        // The wildcard arm is unreachable when memory is the only transport
        // compiled in.
        let seq_before = match &receiver {
            AnyReceiver::Memory(mem) => mem.committed_sequence(),
            #[allow(unreachable_patterns)]
            _ => panic!("must be Memory variant"),
        };

        receiver.commit(&tokens).await.expect("commit must succeed");

        match &receiver {
            AnyReceiver::Memory(mem) => {
                let seq_after = mem.committed_sequence();
                // NOTE(review): the `== 0` alternative also lets a
                // non-advancing sequence pass; confirm that is intended.
                assert!(
                    seq_after > seq_before || seq_after == 0,
                    "committed_sequence must advance after commit (before={seq_before}, after={seq_after})"
                );
            }
            #[allow(unreachable_patterns)]
            _ => {}
        }
    }

    /// Committing tokens of another variant, or no tokens at all, must
    /// return `Ok`.
    #[tokio::test]
    async fn any_receiver_commit_ignores_mismatched_variants() {
        let memory = MemoryTransport::new(&MemoryConfig::default())
            .expect("memory transport must construct with default config");
        let receiver = AnyReceiver::Memory(memory);

        // A foreign token can only be built when the pipe transport is
        // compiled in as well.
        #[cfg(feature = "transport-pipe")]
        {
            let foreign = AnyToken::Pipe(crate::transport::pipe::PipeToken { seq: 99 });
            receiver
                .commit(&[foreign])
                .await
                .expect("commit with mismatched variant must succeed without error");
        }

        receiver
            .commit(&[])
            .await
            .expect("commit with empty slice must succeed");
    }
}
1101
// Tests for the governor-aware `AnyReceiver` constructors.
#[cfg(all(test, feature = "governor"))]
mod governor_tests {
    #[cfg(any(
        feature = "transport-kafka",
        feature = "transport-grpc",
        feature = "transport-http",
        feature = "transport-memory"
    ))]
    use super::*;

    /// Builds a governor over a `MemoryGuard` with a 1000-byte limit and a
    /// 0.80 pressure threshold, pre-loaded with 950 bytes when `pinned_high`
    /// is set and 10 bytes otherwise.
    #[cfg(any(
        feature = "transport-kafka",
        feature = "transport-grpc",
        feature = "transport-http",
        feature = "transport-memory"
    ))]
    fn governor(pinned_high: bool) -> crate::SelfRegulationGovernor {
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        let guard_config = MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: 0.80,
            ..Default::default()
        };
        let guard = std::sync::Arc::new(MemoryGuard::new(guard_config));
        guard.add_bytes(if pinned_high { 950 } else { 10 });

        crate::SelfRegulationConfig::default()
            .build(guard)
            .expect("governor enabled by default")
    }

    /// The governor-aware constructor must attach the inbound gate to a
    /// Kafka receiver; the plain constructor must leave it unattached.
    #[cfg(feature = "transport-kafka")]
    #[tokio::test]
    async fn kafka_governed_receiver_has_inbound_gate() {
        let kafka_config = crate::transport::kafka::KafkaConfig::for_testing(
            "localhost:9092",
            "phase6-test",
            Vec::new(),
        );
        let transport_config = crate::transport::TransportConfig {
            transport_type: crate::transport::types::TransportType::Kafka,
            kafka: Some(kafka_config),
            ..Default::default()
        };

        let gov = governor(false);
        let governed = AnyReceiver::from_transport_config_with_governor(&transport_config, &gov)
            .await
            .expect("governed kafka receiver must construct broker-free");

        match governed {
            AnyReceiver::Kafka(ref kafka) => assert!(
                kafka.has_inbound_gate(),
                "factory-built Kafka receiver must have the governor's inbound gate attached"
            ),
            _ => panic!("expected Kafka variant"),
        }

        let ungoverned = AnyReceiver::from_transport_config(&transport_config)
            .await
            .expect("plain kafka receiver must construct broker-free");
        match ungoverned {
            AnyReceiver::Kafka(ref kafka) => assert!(
                !kafka.has_inbound_gate(),
                "non-governor constructor must leave the inbound gate unattached"
            ),
            _ => panic!("expected Kafka variant"),
        }
    }

    /// A gRPC receiver built with a governor that is holding must answer a
    /// client send with `SendResult::Backpressured`.
    #[cfg(feature = "transport-grpc")]
    #[tokio::test]
    async fn grpc_governed_receiver_sheds_under_pressure() {
        use crate::transport::traits::{TransportBase, TransportSender};
        use crate::transport::types::SendResult;

        // NOTE(review): the port is hard-coded, so this test may collide
        // with anything else bound to 16188 on the machine.
        let server_config = crate::transport::grpc::GrpcConfig::server("127.0.0.1:16188");
        let transport_config = crate::transport::TransportConfig {
            transport_type: crate::transport::types::TransportType::Grpc,
            grpc: Some(server_config),
            ..Default::default()
        };

        let gov = governor(true);
        assert!(
            gov.pressure().should_hold(),
            "pinned-high governor must hold"
        );

        let server = AnyReceiver::from_transport_config_with_governor(&transport_config, &gov)
            .await
            .expect("governed grpc receiver must construct");
        // Short pause before connecting (presumably to let the server start
        // listening).
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        let client_config = crate::transport::grpc::GrpcConfig::client("http://127.0.0.1:16188");
        let client = crate::transport::grpc::GrpcTransport::new(&client_config)
            .await
            .expect("grpc client");
        let result = client
            .send("events", bytes::Bytes::from_static(b"{\"x\":1}"))
            .await;
        assert!(
            matches!(result, SendResult::Backpressured),
            "push under pressure must surface as backpressure, got {result:?}"
        );

        client.close().await.unwrap();
        server.close().await.unwrap();
    }

    /// An HTTP receiver built with a governor that is holding must reject an
    /// ingest POST with 503 and must not queue the request.
    #[cfg(feature = "transport-http")]
    #[tokio::test]
    async fn http_governed_receiver_sheds_under_pressure() {
        use crate::transport::traits::{TransportBase, TransportReceiver};

        // Bind to port 0 to obtain an OS-assigned address, then release it
        // so the transport can listen on the same address.
        let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        drop(probe);

        let http_config = crate::transport::http::HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            ..Default::default()
        };
        let transport_config = crate::transport::TransportConfig {
            transport_type: crate::transport::types::TransportType::Http,
            http: Some(http_config),
            ..Default::default()
        };

        let gov = governor(true);
        assert!(
            gov.pressure().should_hold(),
            "pinned-high governor must hold"
        );

        let receiver = AnyReceiver::from_transport_config_with_governor(&transport_config, &gov)
            .await
            .expect("governed http receiver must construct");
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        let ingest_url = format!("http://127.0.0.1:{}/ingest", addr.port());
        let response = reqwest::Client::new()
            .post(ingest_url)
            .body(b"{\"msg\":\"shed\"}".to_vec())
            .send()
            .await
            .unwrap();
        assert_eq!(
            response.status(),
            reqwest::StatusCode::SERVICE_UNAVAILABLE,
            "factory-built HTTP receiver under pressure must shed with 503"
        );

        let queued = receiver.recv(10).await.unwrap().records;
        assert!(queued.is_empty(), "shed request must not be queued");
        receiver.close().await.unwrap();
    }

    /// For the memory transport the governor-aware constructor must still
    /// yield a working receiver named "memory" with nothing queued.
    #[cfg(feature = "transport-memory")]
    #[tokio::test]
    async fn memory_governed_receiver_is_plain() {
        use crate::transport::traits::TransportReceiver;

        let transport_config = crate::transport::TransportConfig {
            transport_type: crate::transport::types::TransportType::Memory,
            memory: Some(crate::transport::memory::MemoryConfig::default()),
            ..Default::default()
        };

        let gov = governor(false);
        let receiver = AnyReceiver::from_transport_config_with_governor(&transport_config, &gov)
            .await
            .expect("governed memory receiver must construct");

        assert_eq!(receiver.name(), "memory");
        let received = receiver.recv(1).await.expect("recv must succeed");
        assert!(received.records.is_empty(), "no records injected");
    }
}