1use super::error::{TransportError, TransportResult};
33use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
34use super::types::SendResult;
35#[cfg(any(
36 feature = "transport-kafka",
37 feature = "transport-grpc",
38 feature = "transport-memory",
39 feature = "transport-pipe",
40 feature = "transport-file",
41 feature = "transport-http",
42 feature = "transport-redis"
43))]
44use super::types::TransportType;
45use super::work_batch::{Record, WorkBatch};
46
47pub enum AnySender {
55 #[cfg(feature = "transport-kafka")]
56 Kafka(super::kafka::KafkaTransport),
57
58 #[cfg(feature = "transport-grpc")]
59 Grpc(super::grpc::GrpcTransport),
60
61 #[cfg(feature = "transport-memory")]
62 Memory(super::memory::MemoryTransport),
63
64 #[cfg(feature = "transport-pipe")]
65 Pipe(super::pipe::PipeTransport),
66
67 #[cfg(feature = "transport-file")]
68 File(super::file::FileTransport),
69
70 #[cfg(feature = "transport-http")]
71 Http(super::http::HttpTransport),
72
73 #[cfg(feature = "transport-redis")]
74 Redis(super::redis_transport::RedisTransport),
75}
76
77impl TransportBase for AnySender {
78 async fn close(&self) -> TransportResult<()> {
79 match self {
80 #[cfg(feature = "transport-kafka")]
81 Self::Kafka(t) => t.close().await,
82 #[cfg(feature = "transport-grpc")]
83 Self::Grpc(t) => t.close().await,
84 #[cfg(feature = "transport-memory")]
85 Self::Memory(t) => t.close().await,
86 #[cfg(feature = "transport-pipe")]
87 Self::Pipe(t) => t.close().await,
88 #[cfg(feature = "transport-file")]
89 Self::File(t) => t.close().await,
90 #[cfg(feature = "transport-http")]
91 Self::Http(t) => t.close().await,
92 #[cfg(feature = "transport-redis")]
93 Self::Redis(t) => t.close().await,
94 #[allow(unreachable_patterns)]
95 _ => Err(TransportError::Config(
96 "no transport variant enabled".into(),
97 )),
98 }
99 }
100
101 fn is_healthy(&self) -> bool {
102 match self {
103 #[cfg(feature = "transport-kafka")]
104 Self::Kafka(t) => t.is_healthy(),
105 #[cfg(feature = "transport-grpc")]
106 Self::Grpc(t) => t.is_healthy(),
107 #[cfg(feature = "transport-memory")]
108 Self::Memory(t) => t.is_healthy(),
109 #[cfg(feature = "transport-pipe")]
110 Self::Pipe(t) => t.is_healthy(),
111 #[cfg(feature = "transport-file")]
112 Self::File(t) => t.is_healthy(),
113 #[cfg(feature = "transport-http")]
114 Self::Http(t) => t.is_healthy(),
115 #[cfg(feature = "transport-redis")]
116 Self::Redis(t) => t.is_healthy(),
117 #[allow(unreachable_patterns)]
118 _ => false,
119 }
120 }
121
122 fn name(&self) -> &'static str {
123 match self {
124 #[cfg(feature = "transport-kafka")]
125 Self::Kafka(t) => t.name(),
126 #[cfg(feature = "transport-grpc")]
127 Self::Grpc(t) => t.name(),
128 #[cfg(feature = "transport-memory")]
129 Self::Memory(t) => t.name(),
130 #[cfg(feature = "transport-pipe")]
131 Self::Pipe(t) => t.name(),
132 #[cfg(feature = "transport-file")]
133 Self::File(t) => t.name(),
134 #[cfg(feature = "transport-http")]
135 Self::Http(t) => t.name(),
136 #[cfg(feature = "transport-redis")]
137 Self::Redis(t) => t.name(),
138 #[allow(unreachable_patterns)]
139 _ => "none",
140 }
141 }
142}
143
144impl TransportSender for AnySender {
145 #[cfg_attr(
146 not(any(
147 feature = "transport-kafka",
148 feature = "transport-grpc",
149 feature = "transport-memory",
150 feature = "transport-pipe",
151 feature = "transport-file",
152 feature = "transport-http",
153 feature = "transport-redis"
154 )),
155 allow(unused_variables)
156 )]
157 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
158 match self {
159 #[cfg(feature = "transport-kafka")]
160 Self::Kafka(t) => t.send(key, payload).await,
161 #[cfg(feature = "transport-grpc")]
162 Self::Grpc(t) => t.send(key, payload).await,
163 #[cfg(feature = "transport-memory")]
164 Self::Memory(t) => t.send(key, payload).await,
165 #[cfg(feature = "transport-pipe")]
166 Self::Pipe(t) => t.send(key, payload).await,
167 #[cfg(feature = "transport-file")]
168 Self::File(t) => t.send(key, payload).await,
169 #[cfg(feature = "transport-http")]
170 Self::Http(t) => t.send(key, payload).await,
171 #[cfg(feature = "transport-redis")]
172 Self::Redis(t) => t.send(key, payload).await,
173 #[allow(unreachable_patterns)]
174 _ => SendResult::Fatal(TransportError::Config(
175 "no transport variant enabled".into(),
176 )),
177 }
178 }
179
180 #[cfg_attr(
185 not(any(
186 feature = "transport-kafka",
187 feature = "transport-grpc",
188 feature = "transport-memory",
189 feature = "transport-pipe",
190 feature = "transport-file",
191 feature = "transport-http",
192 feature = "transport-redis"
193 )),
194 allow(unused_variables)
195 )]
196 async fn send_batch(&self, records: &[Record]) -> SendResult {
197 match self {
198 #[cfg(feature = "transport-kafka")]
199 Self::Kafka(t) => t.send_batch(records).await,
200 #[cfg(feature = "transport-grpc")]
201 Self::Grpc(t) => t.send_batch(records).await,
202 #[cfg(feature = "transport-memory")]
203 Self::Memory(t) => t.send_batch(records).await,
204 #[cfg(feature = "transport-pipe")]
205 Self::Pipe(t) => t.send_batch(records).await,
206 #[cfg(feature = "transport-file")]
207 Self::File(t) => t.send_batch(records).await,
208 #[cfg(feature = "transport-http")]
209 Self::Http(t) => t.send_batch(records).await,
210 #[cfg(feature = "transport-redis")]
211 Self::Redis(t) => t.send_batch(records).await,
212 #[allow(unreachable_patterns)]
213 _ => SendResult::Fatal(TransportError::Config(
214 "no transport variant enabled".into(),
215 )),
216 }
217 }
218}
219
220impl AnySender {
221 pub async fn from_config(key: &str) -> TransportResult<Self> {
240 #[cfg(feature = "config")]
241 let config = {
242 let cfg = crate::config::try_get()
243 .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
244 cfg.unmarshal_key::<super::TransportConfig>(key)
245 .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
246 };
247
248 #[cfg(not(feature = "config"))]
249 let config = {
250 let _ = key;
251 super::TransportConfig::default()
252 };
253
254 Self::from_transport_config(&config).await
255 }
256
257 pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
259 match config.transport_type {
260 #[cfg(feature = "transport-kafka")]
261 TransportType::Kafka => {
262 let kafka_config = config
263 .kafka
264 .as_ref()
265 .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
266 let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
267 Ok(Self::Kafka(transport))
268 }
269
270 #[cfg(feature = "transport-grpc")]
271 TransportType::Grpc => {
272 let grpc_config = config
273 .grpc
274 .as_ref()
275 .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
276 let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
277 Ok(Self::Grpc(transport))
278 }
279
280 #[cfg(feature = "transport-memory")]
281 TransportType::Memory => {
282 let memory_config = config.memory.clone().unwrap_or_default();
283 let transport = super::memory::MemoryTransport::new(&memory_config)?;
284 Ok(Self::Memory(transport))
285 }
286
287 #[cfg(feature = "transport-pipe")]
288 TransportType::Pipe => {
289 let pipe_config = config.pipe.clone().unwrap_or_default();
290 let transport = super::pipe::PipeTransport::new(&pipe_config);
291 Ok(Self::Pipe(transport))
292 }
293
294 #[cfg(feature = "transport-file")]
295 TransportType::File => {
296 let file_config = config
297 .file
298 .as_ref()
299 .ok_or_else(|| TransportError::Config("file config missing".into()))?;
300 let transport = super::file::FileTransport::new(file_config).await?;
301 Ok(Self::File(transport))
302 }
303
304 #[cfg(feature = "transport-http")]
305 TransportType::Http => {
306 let http_config = config
307 .http
308 .as_ref()
309 .ok_or_else(|| TransportError::Config("http config missing".into()))?;
310 let transport = super::http::HttpTransport::new(http_config).await?;
311 Ok(Self::Http(transport))
312 }
313
314 #[cfg(feature = "transport-redis")]
315 TransportType::Redis => {
316 let redis_config = config
317 .redis
318 .as_ref()
319 .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
320 let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
321 Ok(Self::Redis(transport))
322 }
323
324 #[allow(unreachable_patterns)]
326 other => Err(TransportError::Config(format!(
327 "transport type '{other}' is not available (feature not enabled or not yet implemented)"
328 ))),
329 }
330 }
331}
332
333#[derive(Debug, Clone)]
353#[non_exhaustive]
354pub enum AnyToken {
355 #[cfg(feature = "transport-kafka")]
356 Kafka(super::kafka::KafkaToken),
358
359 #[cfg(feature = "transport-grpc")]
360 Grpc(super::grpc::GrpcToken),
362
363 #[cfg(feature = "transport-memory")]
364 Memory(super::memory::MemoryToken),
366
367 #[cfg(feature = "transport-pipe")]
368 Pipe(super::pipe::PipeToken),
370
371 #[cfg(feature = "transport-file")]
372 File(super::file::FileToken),
374
375 #[cfg(feature = "transport-http")]
376 Http(super::http::HttpToken),
378
379 #[cfg(feature = "transport-redis")]
380 Redis(super::redis_transport::RedisToken),
382}
383
384impl std::fmt::Display for AnyToken {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 match self {
387 #[cfg(feature = "transport-kafka")]
388 Self::Kafka(t) => std::fmt::Display::fmt(t, f),
389 #[cfg(feature = "transport-grpc")]
390 Self::Grpc(t) => std::fmt::Display::fmt(t, f),
391 #[cfg(feature = "transport-memory")]
392 Self::Memory(t) => std::fmt::Display::fmt(t, f),
393 #[cfg(feature = "transport-pipe")]
394 Self::Pipe(t) => std::fmt::Display::fmt(t, f),
395 #[cfg(feature = "transport-file")]
396 Self::File(t) => std::fmt::Display::fmt(t, f),
397 #[cfg(feature = "transport-http")]
398 Self::Http(t) => std::fmt::Display::fmt(t, f),
399 #[cfg(feature = "transport-redis")]
400 Self::Redis(t) => std::fmt::Display::fmt(t, f),
401 #[allow(unreachable_patterns)]
402 _ => write!(f, "none"),
403 }
404 }
405}
406
407impl CommitToken for AnyToken {}
408
409pub enum AnyReceiver {
429 #[cfg(feature = "transport-kafka")]
430 Kafka(super::kafka::KafkaTransport),
431
432 #[cfg(feature = "transport-grpc")]
433 Grpc(super::grpc::GrpcTransport),
434
435 #[cfg(feature = "transport-memory")]
436 Memory(super::memory::MemoryTransport),
437
438 #[cfg(feature = "transport-pipe")]
439 Pipe(super::pipe::PipeTransport),
440
441 #[cfg(feature = "transport-file")]
442 File(super::file::FileTransport),
443
444 #[cfg(feature = "transport-http")]
445 Http(super::http::HttpTransport),
446
447 #[cfg(feature = "transport-redis")]
448 Redis(super::redis_transport::RedisTransport),
449}
450
451impl TransportBase for AnyReceiver {
452 async fn close(&self) -> TransportResult<()> {
453 match self {
454 #[cfg(feature = "transport-kafka")]
455 Self::Kafka(t) => t.close().await,
456 #[cfg(feature = "transport-grpc")]
457 Self::Grpc(t) => t.close().await,
458 #[cfg(feature = "transport-memory")]
459 Self::Memory(t) => t.close().await,
460 #[cfg(feature = "transport-pipe")]
461 Self::Pipe(t) => t.close().await,
462 #[cfg(feature = "transport-file")]
463 Self::File(t) => t.close().await,
464 #[cfg(feature = "transport-http")]
465 Self::Http(t) => t.close().await,
466 #[cfg(feature = "transport-redis")]
467 Self::Redis(t) => t.close().await,
468 #[allow(unreachable_patterns)]
469 _ => Err(TransportError::Config(
470 "no transport variant enabled".into(),
471 )),
472 }
473 }
474
475 fn is_healthy(&self) -> bool {
476 match self {
477 #[cfg(feature = "transport-kafka")]
478 Self::Kafka(t) => t.is_healthy(),
479 #[cfg(feature = "transport-grpc")]
480 Self::Grpc(t) => t.is_healthy(),
481 #[cfg(feature = "transport-memory")]
482 Self::Memory(t) => t.is_healthy(),
483 #[cfg(feature = "transport-pipe")]
484 Self::Pipe(t) => t.is_healthy(),
485 #[cfg(feature = "transport-file")]
486 Self::File(t) => t.is_healthy(),
487 #[cfg(feature = "transport-http")]
488 Self::Http(t) => t.is_healthy(),
489 #[cfg(feature = "transport-redis")]
490 Self::Redis(t) => t.is_healthy(),
491 #[allow(unreachable_patterns)]
492 _ => false,
493 }
494 }
495
496 fn name(&self) -> &'static str {
497 match self {
498 #[cfg(feature = "transport-kafka")]
499 Self::Kafka(t) => t.name(),
500 #[cfg(feature = "transport-grpc")]
501 Self::Grpc(t) => t.name(),
502 #[cfg(feature = "transport-memory")]
503 Self::Memory(t) => t.name(),
504 #[cfg(feature = "transport-pipe")]
505 Self::Pipe(t) => t.name(),
506 #[cfg(feature = "transport-file")]
507 Self::File(t) => t.name(),
508 #[cfg(feature = "transport-http")]
509 Self::Http(t) => t.name(),
510 #[cfg(feature = "transport-redis")]
511 Self::Redis(t) => t.name(),
512 #[allow(unreachable_patterns)]
513 _ => "none",
514 }
515 }
516}
517
518#[cfg(any(
523 feature = "transport-kafka",
524 feature = "transport-grpc",
525 feature = "transport-memory",
526 feature = "transport-pipe",
527 feature = "transport-file",
528 feature = "transport-http",
529 feature = "transport-redis"
530))]
531fn wrap_batch<B: CommitToken>(
532 batch: WorkBatch<B>,
533 wrap_token: impl Fn(B) -> AnyToken,
534) -> WorkBatch<AnyToken> {
535 let commit_tokens = batch.commit_tokens.into_iter().map(wrap_token).collect();
536 WorkBatch::new(batch.records, commit_tokens).with_dlq_entries(batch.dlq_entries)
537}
538
539impl TransportReceiver for AnyReceiver {
540 type Token = AnyToken;
541
542 #[cfg_attr(
543 not(any(
544 feature = "transport-kafka",
545 feature = "transport-grpc",
546 feature = "transport-memory",
547 feature = "transport-pipe",
548 feature = "transport-file",
549 feature = "transport-http",
550 feature = "transport-redis"
551 )),
552 allow(unused_variables)
553 )]
554 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<AnyToken>> {
555 match self {
556 #[cfg(feature = "transport-kafka")]
557 Self::Kafka(t) => {
558 let batch = t.recv(max).await?;
559 Ok(wrap_batch(batch, AnyToken::Kafka))
560 }
561 #[cfg(feature = "transport-grpc")]
562 Self::Grpc(t) => {
563 let batch = t.recv(max).await?;
564 Ok(wrap_batch(batch, AnyToken::Grpc))
565 }
566 #[cfg(feature = "transport-memory")]
567 Self::Memory(t) => {
568 let batch = t.recv(max).await?;
569 Ok(wrap_batch(batch, AnyToken::Memory))
570 }
571 #[cfg(feature = "transport-pipe")]
572 Self::Pipe(t) => {
573 let batch = t.recv(max).await?;
574 Ok(wrap_batch(batch, AnyToken::Pipe))
575 }
576 #[cfg(feature = "transport-file")]
577 Self::File(t) => {
578 let batch = t.recv(max).await?;
579 Ok(wrap_batch(batch, AnyToken::File))
580 }
581 #[cfg(feature = "transport-http")]
582 Self::Http(t) => {
583 let batch = t.recv(max).await?;
584 Ok(wrap_batch(batch, AnyToken::Http))
585 }
586 #[cfg(feature = "transport-redis")]
587 Self::Redis(t) => {
588 let batch = t.recv(max).await?;
589 Ok(wrap_batch(batch, AnyToken::Redis))
590 }
591 #[allow(unreachable_patterns)]
592 _ => Err(TransportError::Config(
593 "no transport variant enabled".into(),
594 )),
595 }
596 }
597
598 #[cfg_attr(
604 not(any(
605 feature = "transport-kafka",
606 feature = "transport-grpc",
607 feature = "transport-memory",
608 feature = "transport-pipe",
609 feature = "transport-file",
610 feature = "transport-http",
611 feature = "transport-redis"
612 )),
613 allow(unused_variables)
614 )]
615 async fn recv_limited(
616 &self,
617 limits: super::traits::RecvLimits,
618 ) -> TransportResult<WorkBatch<AnyToken>> {
619 match self {
620 #[cfg(feature = "transport-kafka")]
621 Self::Kafka(t) => {
622 let batch = t.recv_limited(limits).await?;
623 Ok(wrap_batch(batch, AnyToken::Kafka))
624 }
625 #[cfg(feature = "transport-grpc")]
626 Self::Grpc(t) => {
627 let batch = t.recv_limited(limits).await?;
628 Ok(wrap_batch(batch, AnyToken::Grpc))
629 }
630 #[cfg(feature = "transport-memory")]
631 Self::Memory(t) => {
632 let batch = t.recv_limited(limits).await?;
633 Ok(wrap_batch(batch, AnyToken::Memory))
634 }
635 #[cfg(feature = "transport-pipe")]
636 Self::Pipe(t) => {
637 let batch = t.recv_limited(limits).await?;
638 Ok(wrap_batch(batch, AnyToken::Pipe))
639 }
640 #[cfg(feature = "transport-file")]
641 Self::File(t) => {
642 let batch = t.recv_limited(limits).await?;
643 Ok(wrap_batch(batch, AnyToken::File))
644 }
645 #[cfg(feature = "transport-http")]
646 Self::Http(t) => {
647 let batch = t.recv_limited(limits).await?;
648 Ok(wrap_batch(batch, AnyToken::Http))
649 }
650 #[cfg(feature = "transport-redis")]
651 Self::Redis(t) => {
652 let batch = t.recv_limited(limits).await?;
653 Ok(wrap_batch(batch, AnyToken::Redis))
654 }
655 #[allow(unreachable_patterns)]
656 _ => Err(TransportError::Config(
657 "no transport variant enabled".into(),
658 )),
659 }
660 }
661
662 #[cfg_attr(
663 not(any(
664 feature = "transport-kafka",
665 feature = "transport-grpc",
666 feature = "transport-memory",
667 feature = "transport-pipe",
668 feature = "transport-file",
669 feature = "transport-http",
670 feature = "transport-redis"
671 )),
672 allow(unused_variables)
673 )]
674 async fn commit(&self, tokens: &[AnyToken]) -> TransportResult<()> {
675 match self {
683 #[cfg(feature = "transport-kafka")]
684 Self::Kafka(t) => {
685 let inner: Vec<_> = tokens
686 .iter()
687 .filter_map(|tok| match tok {
688 AnyToken::Kafka(k) => Some(k.clone()),
689 #[allow(unreachable_patterns)]
690 _ => None,
691 })
692 .collect();
693 t.commit(&inner).await
694 }
695 #[cfg(feature = "transport-grpc")]
696 Self::Grpc(t) => {
697 let inner: Vec<_> = tokens
698 .iter()
699 .filter_map(|tok| match tok {
700 AnyToken::Grpc(g) => Some(g.clone()),
701 #[allow(unreachable_patterns)]
702 _ => None,
703 })
704 .collect();
705 t.commit(&inner).await
706 }
707 #[cfg(feature = "transport-memory")]
708 Self::Memory(t) => {
709 let inner: Vec<_> = tokens
710 .iter()
711 .filter_map(|tok| match tok {
712 AnyToken::Memory(m) => Some(*m),
713 #[allow(unreachable_patterns)]
714 _ => None,
715 })
716 .collect();
717 t.commit(&inner).await
718 }
719 #[cfg(feature = "transport-pipe")]
720 Self::Pipe(t) => {
721 let inner: Vec<_> = tokens
722 .iter()
723 .filter_map(|tok| match tok {
724 AnyToken::Pipe(p) => Some(*p),
725 #[allow(unreachable_patterns)]
726 _ => None,
727 })
728 .collect();
729 t.commit(&inner).await
730 }
731 #[cfg(feature = "transport-file")]
732 Self::File(t) => {
733 let inner: Vec<_> = tokens
734 .iter()
735 .filter_map(|tok| match tok {
736 AnyToken::File(f) => Some(*f),
737 #[allow(unreachable_patterns)]
738 _ => None,
739 })
740 .collect();
741 t.commit(&inner).await
742 }
743 #[cfg(feature = "transport-http")]
744 Self::Http(t) => {
745 let inner: Vec<_> = tokens
746 .iter()
747 .filter_map(|tok| match tok {
748 AnyToken::Http(h) => Some(h.clone()),
749 #[allow(unreachable_patterns)]
750 _ => None,
751 })
752 .collect();
753 t.commit(&inner).await
754 }
755 #[cfg(feature = "transport-redis")]
756 Self::Redis(t) => {
757 let inner: Vec<_> = tokens
758 .iter()
759 .filter_map(|tok| match tok {
760 AnyToken::Redis(r) => Some(r.clone()),
761 #[allow(unreachable_patterns)]
762 _ => None,
763 })
764 .collect();
765 t.commit(&inner).await
766 }
767 #[allow(unreachable_patterns)]
768 _ => Err(TransportError::Config(
769 "no transport variant enabled".into(),
770 )),
771 }
772 }
773}
774
775impl AnyReceiver {
776 pub async fn from_config(key: &str) -> TransportResult<Self> {
796 #[cfg(feature = "config")]
797 let config = {
798 let cfg = crate::config::try_get()
799 .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
800 cfg.unmarshal_key::<super::TransportConfig>(key)
801 .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
802 };
803
804 #[cfg(not(feature = "config"))]
805 let config = {
806 let _ = key;
807 super::TransportConfig::default()
808 };
809
810 Self::from_transport_config(&config).await
811 }
812
813 pub async fn from_transport_config(config: &super::TransportConfig) -> TransportResult<Self> {
815 match config.transport_type {
816 #[cfg(feature = "transport-kafka")]
817 TransportType::Kafka => {
818 let kafka_config = config
819 .kafka
820 .as_ref()
821 .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
822 let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
823 Ok(Self::Kafka(transport))
824 }
825
826 #[cfg(feature = "transport-grpc")]
827 TransportType::Grpc => {
828 let grpc_config = config
829 .grpc
830 .as_ref()
831 .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
832 let transport = super::grpc::GrpcTransport::new(grpc_config).await?;
833 Ok(Self::Grpc(transport))
834 }
835
836 #[cfg(feature = "transport-memory")]
837 TransportType::Memory => {
838 let memory_config = config.memory.clone().unwrap_or_default();
839 let transport = super::memory::MemoryTransport::new(&memory_config)?;
840 Ok(Self::Memory(transport))
841 }
842
843 #[cfg(feature = "transport-pipe")]
844 TransportType::Pipe => {
845 let pipe_config = config.pipe.clone().unwrap_or_default();
846 let transport = super::pipe::PipeTransport::new(&pipe_config);
847 Ok(Self::Pipe(transport))
848 }
849
850 #[cfg(feature = "transport-file")]
851 TransportType::File => {
852 let file_config = config
853 .file
854 .as_ref()
855 .ok_or_else(|| TransportError::Config("file config missing".into()))?;
856 let transport = super::file::FileTransport::new(file_config).await?;
857 Ok(Self::File(transport))
858 }
859
860 #[cfg(feature = "transport-http")]
861 TransportType::Http => {
862 let http_config = config
863 .http
864 .as_ref()
865 .ok_or_else(|| TransportError::Config("http config missing".into()))?;
866 let transport = super::http::HttpTransport::new(http_config).await?;
867 Ok(Self::Http(transport))
868 }
869
870 #[cfg(feature = "transport-redis")]
871 TransportType::Redis => {
872 let redis_config = config
873 .redis
874 .as_ref()
875 .ok_or_else(|| TransportError::Config("redis config missing".into()))?;
876 let transport = super::redis_transport::RedisTransport::new(redis_config).await?;
877 Ok(Self::Redis(transport))
878 }
879
880 #[allow(unreachable_patterns)]
882 other => Err(TransportError::Config(format!(
883 "transport type '{other}' is not available (feature not enabled or not yet implemented)"
884 ))),
885 }
886 }
887
888 #[cfg(feature = "governor")]
905 pub async fn from_config_with_governor(
906 key: &str,
907 governor: &crate::SelfRegulationGovernor,
908 ) -> TransportResult<Self> {
909 #[cfg(feature = "config")]
910 let config = {
911 let cfg = crate::config::try_get()
912 .ok_or_else(|| TransportError::Config("config not initialised".into()))?;
913 cfg.unmarshal_key::<super::TransportConfig>(key)
914 .map_err(|e| TransportError::Config(format!("failed to read {key}: {e}")))?
915 };
916
917 #[cfg(not(feature = "config"))]
918 let config = {
919 let _ = key;
920 super::TransportConfig::default()
921 };
922
923 Self::from_transport_config_with_governor(&config, governor).await
924 }
925
926 #[cfg(feature = "governor")]
948 pub async fn from_transport_config_with_governor(
949 config: &super::TransportConfig,
950 #[cfg_attr(
951 not(any(
952 feature = "transport-kafka",
953 feature = "transport-grpc",
954 feature = "transport-http"
955 )),
956 allow(unused_variables)
957 )]
958 governor: &crate::SelfRegulationGovernor,
959 ) -> TransportResult<Self> {
960 match config.transport_type {
961 #[cfg(feature = "transport-kafka")]
962 TransportType::Kafka => {
963 let kafka_config = config
964 .kafka
965 .as_ref()
966 .ok_or_else(|| TransportError::Config("kafka config missing".into()))?;
967 let transport = super::kafka::KafkaTransport::new(kafka_config).await?;
968 let transport = governor.attach_kafka_gate(transport);
972 Ok(Self::Kafka(transport))
973 }
974
975 #[cfg(feature = "transport-grpc")]
976 TransportType::Grpc => {
977 let grpc_config = config
978 .grpc
979 .as_ref()
980 .ok_or_else(|| TransportError::Config("grpc config missing".into()))?;
981 let transport = super::grpc::GrpcTransport::with_pressure(
982 grpc_config,
983 Some(governor.pressure()),
984 )
985 .await?;
986 Ok(Self::Grpc(transport))
987 }
988
989 #[cfg(feature = "transport-http")]
990 TransportType::Http => {
991 let http_config = config
992 .http
993 .as_ref()
994 .ok_or_else(|| TransportError::Config("http config missing".into()))?;
995 let transport = super::http::HttpTransport::with_pressure(
996 http_config,
997 Some(governor.pressure()),
998 )
999 .await?;
1000 Ok(Self::Http(transport))
1001 }
1002
1003 #[cfg(any(
1007 feature = "transport-memory",
1008 feature = "transport-pipe",
1009 feature = "transport-file",
1010 feature = "transport-redis"
1011 ))]
1012 _ => Self::from_transport_config(config).await,
1013
1014 #[allow(unreachable_patterns)]
1017 _ => Self::from_transport_config(config).await,
1018 }
1019 }
1020}
1021
1022#[cfg(all(test, feature = "transport-memory"))]
1027mod tests {
1028 use super::*;
1029 use crate::transport::memory::{MemoryConfig, MemoryTransport};
1030 use crate::transport::traits::TransportReceiver;
1031
1032 #[tokio::test]
1038 async fn any_receiver_memory_recv_commit_round_trip() {
1039 let inner = MemoryTransport::new(&MemoryConfig::default())
1041 .expect("memory transport must construct with default config");
1042 inner
1043 .inject(Some("events.test"), b"hello from AnyReceiver".to_vec())
1044 .await
1045 .expect("inject must succeed");
1046
1047 let receiver = AnyReceiver::Memory(inner);
1049
1050 assert_eq!(receiver.name(), "memory");
1051 assert!(receiver.is_healthy());
1052
1053 let batch = receiver.recv(10).await.expect("recv must succeed");
1055 assert_eq!(batch.records.len(), 1, "expected exactly one record");
1056 assert_eq!(batch.commit_tokens.len(), 1, "expected one commit token");
1057 assert!(batch.dlq_entries.is_empty(), "no DLQ entries expected");
1058
1059 let record = &batch.records[0];
1060 assert_eq!(record.payload.as_ref(), b"hello from AnyReceiver");
1061 assert_eq!(record.key.as_deref(), Some("events.test"));
1062
1063 let token = &batch.commit_tokens[0];
1065 assert!(
1066 matches!(token, AnyToken::Memory(_)),
1067 "token variant must be AnyToken::Memory, got {token}"
1068 );
1069
1070 let display = token.to_string();
1072 assert!(
1073 display.starts_with("memory:"),
1074 "Display must delegate to MemoryToken, got {display}"
1075 );
1076
1077 let tokens: Vec<AnyToken> = batch.commit_tokens;
1079 let seq_before = if let AnyReceiver::Memory(ref t) = receiver {
1080 t.committed_sequence()
1081 } else {
1082 panic!("must be Memory variant");
1083 };
1084
1085 receiver.commit(&tokens).await.expect("commit must succeed");
1086
1087 if let AnyReceiver::Memory(ref t) = receiver {
1089 let seq_after = t.committed_sequence();
1090 assert!(
1091 seq_after > seq_before || seq_after == 0,
1092 "committed_sequence must advance after commit (before={seq_before}, after={seq_after})"
1093 );
1094 }
1095 }
1096
1097 #[tokio::test]
1100 async fn any_receiver_commit_ignores_mismatched_variants() {
1101 let inner = MemoryTransport::new(&MemoryConfig::default())
1102 .expect("memory transport must construct with default config");
1103 let receiver = AnyReceiver::Memory(inner);
1104
1105 #[cfg(feature = "transport-pipe")]
1107 {
1108 let alien_token = AnyToken::Pipe(crate::transport::pipe::PipeToken { seq: 99 });
1109 receiver
1110 .commit(&[alien_token])
1111 .await
1112 .expect("commit with mismatched variant must succeed without error");
1113 }
1114
1115 receiver
1117 .commit(&[])
1118 .await
1119 .expect("commit with empty slice must succeed");
1120 }
1121}
1122
1123#[cfg(all(test, feature = "governor"))]
1131mod governor_tests {
1132 #[cfg(any(
1133 feature = "transport-kafka",
1134 feature = "transport-grpc",
1135 feature = "transport-http",
1136 feature = "transport-memory"
1137 ))]
1138 use super::*;
1139
1140 #[cfg(any(
1143 feature = "transport-kafka",
1144 feature = "transport-grpc",
1145 feature = "transport-http",
1146 feature = "transport-memory"
1147 ))]
1148 fn governor(pinned_high: bool) -> crate::SelfRegulationGovernor {
1149 use crate::memory::{MemoryGuard, MemoryGuardConfig};
1150 let guard = std::sync::Arc::new(MemoryGuard::new(MemoryGuardConfig {
1151 limit_bytes: 1000,
1152 pressure_threshold: 0.80,
1153 ..Default::default()
1154 }));
1155 if pinned_high {
1156 guard.add_bytes(950); } else {
1158 guard.add_bytes(10); }
1160 crate::SelfRegulationConfig::default()
1161 .build(guard)
1162 .expect("governor enabled by default")
1163 }
1164
1165 #[cfg(feature = "transport-kafka")]
1169 #[tokio::test]
1170 async fn kafka_governed_receiver_has_inbound_gate() {
1171 let kafka = crate::transport::kafka::KafkaConfig::for_testing(
1172 "localhost:9092",
1173 "phase6-test",
1174 Vec::new(), );
1176 let cfg = crate::transport::TransportConfig {
1177 transport_type: crate::transport::types::TransportType::Kafka,
1178 kafka: Some(kafka),
1179 ..Default::default()
1180 };
1181
1182 let gov = governor(false);
1183 let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1184 .await
1185 .expect("governed kafka receiver must construct broker-free");
1186
1187 match receiver {
1188 AnyReceiver::Kafka(ref t) => assert!(
1189 t.has_inbound_gate(),
1190 "factory-built Kafka receiver must have the governor's inbound gate attached"
1191 ),
1192 _ => panic!("expected Kafka variant"),
1193 }
1194
1195 let plain = AnyReceiver::from_transport_config(&cfg)
1198 .await
1199 .expect("plain kafka receiver must construct broker-free");
1200 match plain {
1201 AnyReceiver::Kafka(ref t) => assert!(
1202 !t.has_inbound_gate(),
1203 "non-governor constructor must leave the inbound gate unattached"
1204 ),
1205 _ => panic!("expected Kafka variant"),
1206 }
1207 }
1208
1209 #[cfg(feature = "transport-grpc")]
1212 #[tokio::test]
1213 async fn grpc_governed_receiver_sheds_under_pressure() {
1214 use crate::transport::traits::{TransportBase, TransportSender};
1215 use crate::transport::types::SendResult;
1216
1217 let server_cfg = crate::transport::grpc::GrpcConfig::server("127.0.0.1:16188");
1218 let cfg = crate::transport::TransportConfig {
1219 transport_type: crate::transport::types::TransportType::Grpc,
1220 grpc: Some(server_cfg),
1221 ..Default::default()
1222 };
1223
1224 let gov = governor(true);
1225 assert!(
1226 gov.pressure().should_hold(),
1227 "pinned-high governor must hold"
1228 );
1229
1230 let server = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1231 .await
1232 .expect("governed grpc receiver must construct");
1233 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1234
1235 let client = crate::transport::grpc::GrpcTransport::new(
1236 &crate::transport::grpc::GrpcConfig::client("http://127.0.0.1:16188"),
1237 )
1238 .await
1239 .expect("grpc client");
1240 let result = client
1241 .send("events", bytes::Bytes::from_static(b"{\"x\":1}"))
1242 .await;
1243 assert!(
1244 matches!(result, SendResult::Backpressured),
1245 "push under pressure must surface as backpressure, got {result:?}"
1246 );
1247
1248 client.close().await.unwrap();
1249 server.close().await.unwrap();
1250 }
1251
1252 #[cfg(feature = "transport-http")]
1255 #[tokio::test]
1256 async fn http_governed_receiver_sheds_under_pressure() {
1257 use crate::transport::traits::{TransportBase, TransportReceiver};
1258
1259 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1260 let addr = listener.local_addr().unwrap();
1261 drop(listener);
1262
1263 let http_cfg = crate::transport::http::HttpTransportConfig {
1264 listen: Some(addr.to_string()),
1265 recv_timeout_ms: 200,
1266 ..Default::default()
1267 };
1268 let cfg = crate::transport::TransportConfig {
1269 transport_type: crate::transport::types::TransportType::Http,
1270 http: Some(http_cfg),
1271 ..Default::default()
1272 };
1273
1274 let gov = governor(true);
1275 assert!(
1276 gov.pressure().should_hold(),
1277 "pinned-high governor must hold"
1278 );
1279
1280 let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1281 .await
1282 .expect("governed http receiver must construct");
1283 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1284
1285 let client = reqwest::Client::new();
1286 let resp = client
1287 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1288 .body(b"{\"msg\":\"shed\"}".to_vec())
1289 .send()
1290 .await
1291 .unwrap();
1292 assert_eq!(
1293 resp.status(),
1294 reqwest::StatusCode::SERVICE_UNAVAILABLE,
1295 "factory-built HTTP receiver under pressure must shed with 503"
1296 );
1297
1298 let records = receiver.recv(10).await.unwrap().records;
1299 assert!(records.is_empty(), "shed request must not be queued");
1300 receiver.close().await.unwrap();
1301 }
1302
1303 #[cfg(feature = "transport-memory")]
1306 #[tokio::test]
1307 async fn memory_governed_receiver_is_plain() {
1308 use crate::transport::traits::TransportReceiver;
1309
1310 let cfg = crate::transport::TransportConfig {
1311 transport_type: crate::transport::types::TransportType::Memory,
1312 memory: Some(crate::transport::memory::MemoryConfig::default()),
1313 ..Default::default()
1314 };
1315
1316 let gov = governor(false);
1317 let receiver = AnyReceiver::from_transport_config_with_governor(&cfg, &gov)
1318 .await
1319 .expect("governed memory receiver must construct");
1320
1321 assert_eq!(receiver.name(), "memory");
1322 let batch = receiver.recv(1).await.expect("recv must succeed");
1324 assert!(batch.records.is_empty(), "no records injected");
1325 }
1326}