1use crate::{
2 AckingBehavior, Exchange, ExchangeKind, FinalizationKind, Header, HeadersMatchingBehavior,
3 Queue,
4};
5use humantime::parse_duration;
6use serde::de::{DeserializeSeed, Error, IgnoredAny, MapAccess, Visitor};
7use serde::{Deserialize, Deserializer};
8use std::collections::{HashMap, HashSet};
9use std::fmt::Formatter;
10use std::num::{NonZeroU16, NonZeroUsize};
11use std::sync::Arc;
12use std::time::Duration;
13use strut_deserialize::{OneOrMany, Slug, SlugMap};
14use strut_factory::impl_deserialize_field;
15use thiserror::Error;
16
17pub mod queue;
18
/// A collection of [`Ingress`] definitions stored in a [`SlugMap`] and looked
/// up by name.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct IngressLandscape {
    /// The ingress definitions held by this landscape.
    ingresses: SlugMap<Ingress>,
}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct Ingress {
28 name: Arc<str>,
29 exchange: Exchange,
30 queue: Queue,
31 durable: bool,
32 exclusive: bool,
33 auto_delete: bool,
34 batch_size: NonZeroUsize,
35 batch_timeout: Duration,
36 prefetch_count: Option<NonZeroU16>,
37 acking_behavior: AckingBehavior,
38 gibberish_behavior: FinalizationKind,
39 binding_keys: HashSet<String>, binding_headers: HashMap<String, Header>, headers_behavior: HeadersMatchingBehavior, }
44
45impl IngressLandscape {
46 pub fn contains(&self, name: impl AsRef<str>) -> bool {
49 self.ingresses.contains_key(name.as_ref())
50 }
51
52 pub fn get(&self, name: impl AsRef<str>) -> Option<&Ingress> {
56 self.ingresses.get(name.as_ref())
57 }
58
59 pub fn expect(&self, name: impl AsRef<str>) -> &Ingress {
62 let name = name.as_ref();
63
64 self.get(name)
65 .unwrap_or_else(|| panic!("requested an undefined RabbitMQ ingress '{}'", name))
66 }
67}
68
impl Ingress {
    /// Starts building an [`Ingress`]; equivalent to [`IngressBuilder::new`].
    pub fn builder() -> IngressBuilder {
        IngressBuilder::new()
    }
}
75
76impl Ingress {
77 pub fn name(&self) -> &str {
79 &self.name
80 }
81
82 pub fn exchange(&self) -> &Exchange {
84 &self.exchange
85 }
86
87 pub fn queue(&self) -> &Queue {
89 &self.queue
90 }
91
92 pub fn durable(&self) -> bool {
94 self.durable
95 }
96
97 pub fn exclusive(&self) -> bool {
99 self.exclusive
100 }
101
102 pub fn auto_delete(&self) -> bool {
104 self.auto_delete
105 }
106
107 pub fn no_ack(&self) -> bool {
115 match self.acking_behavior {
116 AckingBehavior::Manual => false,
117 AckingBehavior::Auto => true,
118 }
119 }
120
121 pub fn delivers_pending(&self) -> bool {
125 match self.acking_behavior {
126 AckingBehavior::Manual => true,
127 AckingBehavior::Auto => false,
128 }
129 }
130
131 pub fn batch_size(&self) -> NonZeroUsize {
138 self.batch_size
139 }
140
141 pub fn batch_timeout(&self) -> Duration {
148 self.batch_timeout
149 }
150
151 pub fn prefetch_count(&self) -> Option<NonZeroU16> {
153 self.prefetch_count
154 }
155
156 pub fn acking_behavior(&self) -> AckingBehavior {
158 self.acking_behavior
159 }
160
161 pub fn gibberish_behavior(&self) -> FinalizationKind {
163 self.gibberish_behavior
164 }
165
166 pub fn binding_keys(&self) -> &HashSet<String> {
168 &self.binding_keys
169 }
170
171 pub fn binding_headers(&self) -> &HashMap<String, Header> {
173 &self.binding_headers
174 }
175
176 pub fn headers_behavior(&self) -> HeadersMatchingBehavior {
178 self.headers_behavior
179 }
180}
181
182#[derive(Debug)]
187pub struct IngressBuilder {
188 name: Arc<str>,
189 exchange: Exchange,
190 queue: Queue,
191 durable: bool,
192 exclusive: bool,
193 auto_delete: bool,
194 batch_size: NonZeroUsize,
195 batch_timeout: Duration,
196 prefetch_count: Option<NonZeroU16>,
197 acking_behavior: AckingBehavior,
198 gibberish_behavior: FinalizationKind,
199 binding_keys: HashSet<String>, binding_headers: HashMap<String, Header>, headers_behavior: HeadersMatchingBehavior, }
204
205impl IngressBuilder {
206 pub fn new() -> Self {
208 Self {
209 name: Arc::from(Ingress::default_name()),
210 exchange: Exchange::default(),
211 queue: Queue::default(),
212 durable: Ingress::default_durable(),
213 exclusive: Ingress::default_exclusive(),
214 auto_delete: Ingress::default_auto_delete(),
215 batch_size: Ingress::default_batch_size(),
216 batch_timeout: Ingress::default_batch_timeout(),
217 prefetch_count: Ingress::default_prefetch_count(),
218 acking_behavior: Ingress::default_acking_behavior(),
219 gibberish_behavior: Ingress::default_gibberish_behavior(),
220 binding_keys: Ingress::default_binding_keys(),
221 binding_headers: Ingress::default_binding_headers(),
222 headers_behavior: Ingress::default_headers_behavior(),
223 }
224 }
225
226 pub fn with_name(self, name: impl AsRef<str>) -> Self {
228 Self {
229 name: Arc::from(name.as_ref()),
230 ..self
231 }
232 }
233
234 pub fn with_exchange(self, exchange: Exchange) -> Self {
236 Self { exchange, ..self }
237 }
238
239 pub fn with_queue(self, queue: Queue) -> Self {
241 Self { queue, ..self }
242 }
243
244 pub fn with_queue_named(self, queue: impl AsRef<str>) -> Self {
247 Self {
248 queue: Queue::named(queue),
249 ..self
250 }
251 }
252
253 pub fn with_durable(self, durable: bool) -> Self {
255 Self { durable, ..self }
256 }
257
258 pub fn with_exclusive(self, exclusive: bool) -> Self {
260 Self { exclusive, ..self }
261 }
262
263 pub fn with_auto_delete(self, auto_delete: bool) -> Self {
265 Self {
266 auto_delete,
267 ..self
268 }
269 }
270
271 pub fn with_batch_size(self, batch_size: NonZeroUsize) -> Self {
273 Self { batch_size, ..self }
274 }
275
276 pub fn with_batch_timeout(self, batch_timeout: Duration) -> Self {
278 Self {
279 batch_timeout,
280 ..self
281 }
282 }
283
284 pub fn with_prefetch_count(self, prefetch_count: Option<NonZeroU16>) -> Self {
286 Self {
287 prefetch_count,
288 ..self
289 }
290 }
291
292 pub fn with_acking_behavior(self, acking_behavior: AckingBehavior) -> Self {
294 Self {
295 acking_behavior,
296 ..self
297 }
298 }
299
300 pub fn with_gibberish_behavior(self, gibberish_behavior: FinalizationKind) -> Self {
302 Self {
303 gibberish_behavior,
304 ..self
305 }
306 }
307
308 pub fn with_binding_key(self, binding_key: impl Into<String>) -> Self {
311 let mut binding_keys = self.binding_keys;
312 binding_keys.insert(binding_key.into());
313
314 Self {
315 binding_keys,
316 ..self
317 }
318 }
319
320 pub fn with_replaced_binding_keys(self, binding_keys: HashSet<String>) -> Self {
325 Self {
326 binding_keys,
327 ..self
328 }
329 }
330
331 pub fn with_binding_header<V>(self, key: impl Into<String>, value: V) -> Self
334 where
335 Header: From<V>,
336 {
337 let mut binding_headers = self.binding_headers;
338 binding_headers.insert(key.into(), Header::from(value));
339
340 Self {
341 binding_headers,
342 ..self
343 }
344 }
345
346 pub fn with_replaced_binding_headers(self, binding_headers: HashMap<String, Header>) -> Self {
351 Self {
352 binding_headers,
353 ..self
354 }
355 }
356
357 pub fn with_matching_all_headers(self) -> Self {
360 Self {
361 headers_behavior: HeadersMatchingBehavior::All,
362 ..self
363 }
364 }
365
366 pub fn with_matching_any_headers(self) -> Self {
369 Self {
370 headers_behavior: HeadersMatchingBehavior::Any,
371 ..self
372 }
373 }
374
375 pub fn with_headers_behavior(self, headers_behavior: HeadersMatchingBehavior) -> Self {
377 Self {
378 headers_behavior,
379 ..self
380 }
381 }
382
383 pub fn build(mut self) -> Result<Ingress, IngressError> {
386 if let Some(implicit_binding_key) = self.maybe_implicit_binding_key() {
388 self.binding_keys = implicit_binding_key;
389 }
390
391 self.validate()?;
392
393 Ok(Ingress {
394 name: self.name,
395 exchange: self.exchange,
396 queue: self.queue,
397 durable: self.durable,
398 exclusive: self.exclusive,
399 auto_delete: self.auto_delete,
400 batch_size: self.batch_size,
401 batch_timeout: self.batch_timeout,
402 prefetch_count: self.prefetch_count,
403 acking_behavior: self.acking_behavior,
404 gibberish_behavior: self.gibberish_behavior,
405 binding_keys: self.binding_keys,
406 binding_headers: self.binding_headers,
407 headers_behavior: self.headers_behavior,
408 })
409 }
410
411 fn maybe_implicit_binding_key(&self) -> Option<HashSet<String>> {
412 let exchange_allows_implicit_binding_key = match self.exchange.kind() {
414 ExchangeKind::Direct | ExchangeKind::Topic => !self.exchange.is_default(),
415 _ => false,
416 };
417
418 if exchange_allows_implicit_binding_key
420 && self.binding_keys.is_empty()
421 && !self.queue.is_empty()
422 {
423 return Some(HashSet::from([self.queue.name().to_string()]));
425 }
426
427 None
428 }
429
430 fn validate(&self) -> Result<(), IngressError> {
431 if self.acking_behavior == AckingBehavior::Manual {
432 if let Some(prefetch_count) = self.prefetch_count {
433 if usize::from(self.batch_size) > (u16::from(prefetch_count) as usize) {
434 return Err(IngressError::BatchSizeGreaterThanPrefetchCount {
435 ingress: self.name.to_string(),
436 prefetch_count,
437 batch_size: self.batch_size,
438 });
439 }
440 } else {
441 if self.batch_size > NonZeroUsize::MIN {
442 return Err(IngressError::BatchSizeWithoutPrefetchCount {
443 ingress: self.name.to_string(),
444 batch_size: self.batch_size,
445 });
446 }
447 }
448 }
449
450 if self.exchange.is_default() {
451 self.validate_default_exchange()?;
452 } else {
453 self.validate_non_default_exchange()?;
454 }
455
456 Ok(())
457 }
458
459 fn validate_default_exchange(&self) -> Result<(), IngressError> {
460 if self.queue.is_empty() {
462 return Err(IngressError::DefaultExchangeRequiresQueueName {
463 ingress: self.name.to_string(),
464 });
465 }
466
467 if !self.binding_keys.is_empty() {
469 return Err(IngressError::DefaultExchangeCannotHaveBindingKeys {
470 ingress: self.name.to_string(),
471 });
472 }
473
474 if !self.binding_headers.is_empty() {
476 return Err(IngressError::DefaultExchangeCannotHaveBindingHeaders {
477 ingress: self.name.to_string(),
478 });
479 }
480
481 Ok(())
482 }
483
484 fn validate_non_default_exchange(&self) -> Result<(), IngressError> {
485 match self.exchange.kind() {
486 ExchangeKind::Direct | ExchangeKind::Topic => self.validate_binding_keys()?,
487 ExchangeKind::Headers => self.validate_binding_headers()?,
488 ExchangeKind::Fanout | ExchangeKind::HashKey | ExchangeKind::HashId => {
489 self.validate_no_bindings()?
490 }
491 };
492
493 Ok(())
494 }
495
496 fn validate_binding_keys(&self) -> Result<(), IngressError> {
497 self.validate_no_binding_headers()?;
498
499 if self.binding_keys.is_empty() {
500 return Err(IngressError::ExchangeKindRequiresBindingKeys {
501 ingress: self.name.to_string(),
502 kind: self.exchange.kind(),
503 });
504 }
505
506 for key in &self.binding_keys {
507 if key.is_empty() {
508 return Err(IngressError::ExchangeKindCannotHaveEmptyBindingKey {
509 ingress: self.name.to_string(),
510 kind: self.exchange.kind(),
511 });
512 }
513 }
514
515 Ok(())
516 }
517
518 fn validate_binding_headers(&self) -> Result<(), IngressError> {
519 self.validate_no_binding_keys()?;
520
521 if self.binding_headers.is_empty() {
522 return Err(IngressError::ExchangeKindRequiresBindingHeaders {
523 ingress: self.name.to_string(),
524 kind: self.exchange.kind(),
525 });
526 }
527
528 for (key, value) in &self.binding_headers {
529 if key.is_empty() || value.is_empty() {
530 return Err(IngressError::ExchangeKindCannotHaveEmptyBindingHeader {
531 ingress: self.name.to_string(),
532 kind: self.exchange.kind(),
533 });
534 }
535 }
536
537 Ok(())
538 }
539
540 fn validate_no_binding_keys(&self) -> Result<(), IngressError> {
541 if !self.binding_keys.is_empty() {
542 return Err(IngressError::ExchangeKindCannotHaveBindingKeys {
543 ingress: self.name.to_string(),
544 kind: self.exchange.kind(),
545 });
546 }
547
548 Ok(())
549 }
550
551 fn validate_no_binding_headers(&self) -> Result<(), IngressError> {
552 if !self.binding_headers.is_empty() {
553 return Err(IngressError::ExchangeKindCannotHaveBindingHeaders {
554 ingress: self.name.to_string(),
555 kind: self.exchange.kind(),
556 });
557 }
558
559 Ok(())
560 }
561
562 fn validate_no_bindings(&self) -> Result<(), IngressError> {
563 self.validate_no_binding_keys()?;
564 self.validate_no_binding_headers()?;
565
566 Ok(())
567 }
568}
569
570impl Ingress {
571 fn default_name() -> &'static str {
572 "default"
573 }
574
575 fn default_durable() -> bool {
576 false
577 }
578
579 fn default_exclusive() -> bool {
580 false
581 }
582
583 fn default_auto_delete() -> bool {
584 false
585 }
586
587 fn default_batch_size() -> NonZeroUsize {
588 NonZeroUsize::MIN
589 }
590
591 fn default_batch_timeout() -> Duration {
592 Duration::from_millis(250)
593 }
594
595 fn default_prefetch_count() -> Option<NonZeroU16> {
596 None
597 }
598
599 fn default_acking_behavior() -> AckingBehavior {
600 AckingBehavior::Manual
601 }
602
603 fn default_gibberish_behavior() -> FinalizationKind {
604 FinalizationKind::Complete
605 }
606
607 fn default_binding_keys() -> HashSet<String> {
608 HashSet::default()
609 }
610
611 fn default_binding_headers() -> HashMap<String, Header> {
612 HashMap::default()
613 }
614
615 fn default_headers_behavior() -> HeadersMatchingBehavior {
616 HeadersMatchingBehavior::All
617 }
618}
619
/// Test-only `Default`, so that tests can spell out expected values with
/// struct-update syntax (`..Default::default()`).
///
/// This bypasses the builder, so no validation is run. Note that the name is
/// the empty string here, not `Ingress::default_name()`.
#[cfg(test)]
impl Default for Ingress {
    fn default() -> Self {
        Self {
            name: Arc::from(""),
            exchange: Exchange::default(),
            queue: Queue::default(),
            durable: Self::default_durable(),
            exclusive: Self::default_exclusive(),
            auto_delete: Self::default_auto_delete(),
            batch_size: Self::default_batch_size(),
            batch_timeout: Self::default_batch_timeout(),
            prefetch_count: Self::default_prefetch_count(),
            acking_behavior: Self::default_acking_behavior(),
            gibberish_behavior: Self::default_gibberish_behavior(),
            binding_keys: Self::default_binding_keys(),
            binding_headers: Self::default_binding_headers(),
            headers_behavior: Self::default_headers_behavior(),
        }
    }
}
641
/// Validation failures reported by [`IngressBuilder::build`].
///
/// Every variant carries the name of the offending ingress; variants tied to
/// a non-default exchange also carry the exchange kind.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum IngressError {
    /// With manual acking and a prefetch count, the batch size exceeds the
    /// prefetch count.
    #[error(
        "invalid batch size configuration for ingress '{ingress}' with prefetch count of {prefetch_count}: expected <= {prefetch_count}, found {batch_size}"
    )]
    BatchSizeGreaterThanPrefetchCount {
        /// Name of the offending ingress.
        ingress: String,
        /// The configured batch size.
        batch_size: NonZeroUsize,
        /// The configured prefetch count.
        prefetch_count: NonZeroU16,
    },

    /// With manual acking and no prefetch count, the batch size is greater
    /// than one.
    #[error(
        "invalid batch size configuration for ingress '{ingress}' without prefetch count: expected 1, found {batch_size}"
    )]
    BatchSizeWithoutPrefetchCount {
        /// Name of the offending ingress.
        ingress: String,
        /// The configured batch size.
        batch_size: NonZeroUsize,
    },

    /// The default exchange is used with an empty queue.
    #[error(
        "invalid configuration for ingress '{ingress}' with default exchange: expected queue name, found none/empty"
    )]
    DefaultExchangeRequiresQueueName {
        /// Name of the offending ingress.
        ingress: String,
    },

    /// The default exchange is used together with binding keys.
    #[error(
        "invalid configuration for ingress '{ingress}' with default exchange: expected no binding keys, found at least one"
    )]
    DefaultExchangeCannotHaveBindingKeys {
        /// Name of the offending ingress.
        ingress: String,
    },

    /// The default exchange is used together with binding headers.
    #[error(
        "invalid configuration for ingress '{ingress}' with default exchange: expected no binding headers, found at least one"
    )]
    DefaultExchangeCannotHaveBindingHeaders {
        /// Name of the offending ingress.
        ingress: String,
    },

    /// The exchange kind needs at least one binding key, but none is present.
    #[error(
        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected at least one binding key, found none"
    )]
    ExchangeKindRequiresBindingKeys {
        /// Name of the offending ingress.
        ingress: String,
        /// Kind of the configured exchange.
        kind: ExchangeKind,
    },

    /// The exchange kind does not accept binding keys, but some are present.
    #[error(
        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected no binding keys, found at least one"
    )]
    ExchangeKindCannotHaveBindingKeys {
        /// Name of the offending ingress.
        ingress: String,
        /// Kind of the configured exchange.
        kind: ExchangeKind,
    },

    /// The exchange kind needs at least one binding header, but none is present.
    #[error(
        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected at least one binding header, found none"
    )]
    ExchangeKindRequiresBindingHeaders {
        /// Name of the offending ingress.
        ingress: String,
        /// Kind of the configured exchange.
        kind: ExchangeKind,
    },

    /// The exchange kind does not accept binding headers, but some are present.
    #[error(
        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected no binding headers, found at least one"
    )]
    ExchangeKindCannotHaveBindingHeaders {
        /// Name of the offending ingress.
        ingress: String,
        /// Kind of the configured exchange.
        kind: ExchangeKind,
    },

    /// One of the binding keys is an empty string.
    #[error(
        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected non-empty binding keys, found an empty one"
    )]
    ExchangeKindCannotHaveEmptyBindingKey {
        /// Name of the offending ingress.
        ingress: String,
        /// Kind of the configured exchange.
        kind: ExchangeKind,
    },

    /// One of the binding headers has an empty key or an empty value.
    #[error(
        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected non-empty binding header keys and values, found an empty one"
    )]
    ExchangeKindCannotHaveEmptyBindingHeader {
        /// Name of the offending ingress.
        ingress: String,
        /// Kind of the configured exchange.
        kind: ExchangeKind,
    },
}
762
/// Identity conversion, so that APIs generic over `AsRef<Ingress>` accept an
/// [`Ingress`] value or reference directly.
impl AsRef<Ingress> for Ingress {
    fn as_ref(&self) -> &Ingress {
        self
    }
}
768
/// Identity conversion, so that APIs generic over `AsRef<IngressLandscape>`
/// accept an [`IngressLandscape`] value or reference directly.
impl AsRef<IngressLandscape> for IngressLandscape {
    fn as_ref(&self) -> &IngressLandscape {
        self
    }
}
774
775const _: () = {
776 impl<S> FromIterator<(S, Ingress)> for IngressLandscape
777 where
778 S: Into<Slug>,
779 {
780 fn from_iter<T: IntoIterator<Item = (S, Ingress)>>(iter: T) -> Self {
781 let ingresses = iter.into_iter().collect();
782
783 Self { ingresses }
784 }
785 }
786
787 impl<const N: usize, S> From<[(S, Ingress); N]> for IngressLandscape
788 where
789 S: Into<Slug>,
790 {
791 fn from(value: [(S, Ingress); N]) -> Self {
792 value.into_iter().collect()
793 }
794 }
795};
796
797const _: () = {
    /// Deserializes a landscape from a map, delegating to
    /// `IngressLandscapeVisitor`.
    impl<'de> Deserialize<'de> for IngressLandscape {
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: Deserializer<'de>,
        {
            deserializer.deserialize_map(IngressLandscapeVisitor)
        }
    }
806
807 struct IngressLandscapeVisitor;
808
809 impl<'de> Visitor<'de> for IngressLandscapeVisitor {
810 type Value = IngressLandscape;
811
812 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
813 formatter.write_str("a map of RabbitMQ ingress landscape")
814 }
815
816 fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
817 where
818 A: MapAccess<'de>,
819 {
820 let grouped = Slug::group_map(map)?;
821 let mut ingresses = HashMap::with_capacity(grouped.len());
822
823 for (key, value) in grouped {
824 let seed = IngressSeed {
825 name: key.original(),
826 };
827 let handle = seed.deserialize(value).map_err(Error::custom)?;
828 ingresses.insert(key, handle);
829 }
830
831 Ok(IngressLandscape {
832 ingresses: SlugMap::new(ingresses),
833 })
834 }
835 }
836
    /// Deserialization seed that carries the already-known name of the
    /// ingress being deserialized.
    struct IngressSeed<'a> {
        /// Name to give to the deserialized ingress.
        name: &'a str,
    }

    impl<'de> DeserializeSeed<'de> for IngressSeed<'_> {
        type Value = Ingress;

        /// Uses `deserialize_any`, so the visitor can accept either a string
        /// or a map.
        fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
        where
            D: Deserializer<'de>,
        {
            deserializer.deserialize_any(IngressSeedVisitor { name: self.name })
        }
    }
851
852 struct IngressSeedVisitor<'a> {
853 name: &'a str,
854 }
855
856 impl<'de> Visitor<'de> for IngressSeedVisitor<'_> {
857 type Value = Ingress;
858
859 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
860 formatter.write_str("a map of RabbitMQ ingress or a string queue name")
861 }
862
863 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
864 where
865 E: Error,
866 {
867 Ingress::builder()
868 .with_name(self.name)
869 .with_queue_named(value)
870 .build()
871 .map_err(Error::custom)
872 }
873
874 fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
875 where
876 A: MapAccess<'de>,
877 {
878 visit_ingress(map, Some(self.name))
879 }
880 }
881
    /// Deserializes a stand-alone ingress from a map, delegating to
    /// `IngressVisitor`.
    impl<'de> Deserialize<'de> for Ingress {
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: Deserializer<'de>,
        {
            deserializer.deserialize_map(IngressVisitor)
        }
    }
890
    /// Visitor for a stand-alone ingress map, where the name (if any) comes
    /// from the map itself.
    struct IngressVisitor;

    impl<'de> Visitor<'de> for IngressVisitor {
        type Value = Ingress;

        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
            formatter.write_str("a map of RabbitMQ ingress")
        }

        /// Delegates to `visit_ingress` without a pre-determined name.
        fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
        where
            A: MapAccess<'de>,
        {
            visit_ingress(map, None)
        }
    }
907
    /// Reads all ingress fields from `map`, feeds the ones that are present
    /// into an [`IngressBuilder`] and builds (and thereby validates) the
    /// resulting [`Ingress`].
    ///
    /// When `known_name` is `Some`, it takes precedence over a `name` entry in
    /// the map; otherwise the map's `name` is used, falling back to
    /// `Ingress::default_name()`.
    ///
    /// Duration parsing failures and builder validation failures are both
    /// reported as custom deserialization errors.
    fn visit_ingress<'de, A>(mut map: A, known_name: Option<&str>) -> Result<Ingress, A::Error>
    where
        A: MapAccess<'de>,
    {
        // One optional slot per field; `None` means "not present in the input".
        let mut name: Option<String> = None;
        let mut exchange = None;
        let mut queue = None;
        let mut durable = None;
        let mut exclusive = None;
        let mut auto_delete = None;
        let mut batch_size = None;
        let mut batch_timeout = None;
        let mut prefetch_count = None;
        let mut acking_behavior = None;
        let mut gibberish_behavior = None;
        let mut binding_keys: Option<OneOrMany<String>> = None;
        let mut binding_headers = None;
        let mut headers_behavior = None;

        // NOTE(review): `poll` is generated by `impl_deserialize_field!`; it
        // presumably reads the next value into the given slot — confirm.
        while let Some(key) = map.next_key()? {
            match key {
                IngressField::name => key.poll(&mut map, &mut name)?,
                IngressField::exchange => key.poll(&mut map, &mut exchange)?,
                IngressField::queue => key.poll(&mut map, &mut queue)?,
                IngressField::durable => key.poll(&mut map, &mut durable)?,
                IngressField::exclusive => key.poll(&mut map, &mut exclusive)?,
                IngressField::auto_delete => key.poll(&mut map, &mut auto_delete)?,
                IngressField::batch_size => key.poll(&mut map, &mut batch_size)?,
                // The timeout is given as a human-readable string (e.g. "2s 150ms")
                // and is parsed by hand instead of going through `poll`.
                IngressField::batch_timeout => {
                    let duration_string = map.next_value::<String>()?;
                    let duration = parse_duration(&duration_string).map_err(Error::custom)?;
                    batch_timeout = Some(duration);
                    IgnoredAny
                }
                IngressField::prefetch_count => key.poll(&mut map, &mut prefetch_count)?,
                IngressField::acking_behavior => key.poll(&mut map, &mut acking_behavior)?,
                IngressField::gibberish_behavior => key.poll(&mut map, &mut gibberish_behavior)?,
                IngressField::binding_keys => key.poll(&mut map, &mut binding_keys)?,
                IngressField::binding_headers => key.poll(&mut map, &mut binding_headers)?,
                IngressField::headers_behavior => key.poll(&mut map, &mut headers_behavior)?,
                // Unrecognized keys: consume and discard the value.
                IngressField::__ignore => map.next_value()?,
            };
        }

        // A name supplied by the caller wins over the one found in the map.
        let name = match known_name {
            Some(known_name) => known_name,
            None => name.as_deref().unwrap_or_else(|| Ingress::default_name()),
        };

        let mut builder = Ingress::builder()
            .with_name(name)
            .with_exchange(exchange.unwrap_or_default())
            .with_queue(queue.unwrap_or_default());

        // Only override the builder defaults for fields present in the input.
        if let Some(durable) = durable {
            builder = builder.with_durable(durable);
        }
        if let Some(exclusive) = exclusive {
            builder = builder.with_exclusive(exclusive);
        }
        if let Some(auto_delete) = auto_delete {
            builder = builder.with_auto_delete(auto_delete);
        }
        if let Some(batch_size) = batch_size {
            builder = builder.with_batch_size(batch_size);
        }
        if let Some(batch_timeout) = batch_timeout {
            builder = builder.with_batch_timeout(batch_timeout);
        }
        if let Some(prefetch_count) = prefetch_count {
            builder = builder.with_prefetch_count(prefetch_count);
        }
        if let Some(acking_behavior) = acking_behavior {
            builder = builder.with_acking_behavior(acking_behavior);
        }
        if let Some(gibberish_behavior) = gibberish_behavior {
            builder = builder.with_gibberish_behavior(gibberish_behavior);
        }
        if let Some(binding_keys) = binding_keys {
            builder = builder.with_replaced_binding_keys(binding_keys.into());
        }
        if let Some(binding_headers) = binding_headers {
            builder = builder.with_replaced_binding_headers(binding_headers);
        }
        if let Some(headers_behavior) = headers_behavior {
            builder = builder.with_headers_behavior(headers_behavior);
        }

        builder.build().map_err(Error::custom)
    }
998
    // Generates the `IngressField` key type matched on in `visit_ingress`.
    // Entries of the form `a | b` list alternative spellings for one field.
    // NOTE(review): keys are presumably compared with `Slug::eq_as_slugs`
    // (the second argument) — confirm against the macro definition.
    impl_deserialize_field!(
        IngressField,
        strut_deserialize::Slug::eq_as_slugs,
        name,
        exchange,
        queue,
        durable,
        exclusive,
        auto_delete,
        batch_size,
        batch_timeout,
        prefetch_count | prefetch,
        acking_behavior | acking,
        gibberish_behavior | gibberish,
        binding_keys | binding_key,
        binding_headers | binding_header,
        headers_behavior | header_behavior,
    );
1017};
1018
1019#[cfg(test)]
1020mod tests {
1021 use super::*;
1022 use pretty_assertions::assert_eq;
1023
    /// Deserializing an empty document into an `Ingress` must fail.
    #[test]
    fn deserialize_from_empty() {
        // Given
        let input = "";

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input);

        // Then
        assert!(actual_output.is_err());
    }
1035
    /// A stand-alone `Ingress` is deserialized as a map, so a bare string
    /// must be rejected.
    #[test]
    fn deserialize_from_string() {
        // Given
        let input = "\"test_ingress\"";

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input);

        // Then
        assert!(actual_output.is_err());
    }
1047
    /// A name alone leaves the default exchange with an empty queue, which
    /// fails validation.
    #[test]
    fn deserialize_from_name() {
        // Given
        let input = r#"
name: test_ingress
"#;

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input);

        // Then
        assert!(actual_output.is_err());
    }
1061
    /// A name plus a fanout exchange is valid; every other field keeps its
    /// default.
    #[test]
    fn deserialize_from_name_and_exchange() {
        // Given
        let input = r#"
name: test_ingress
exchange: amq.fanout
"#;
        let expected_output = Ingress {
            name: "test_ingress".into(),
            exchange: Exchange::AmqFanout,
            ..Default::default()
        };

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1081
    /// A name plus a queue name (on the default exchange) is valid.
    #[test]
    fn deserialize_from_name_and_routing_key() {
        // Given
        let input = r#"
name: test_ingress
queue: test_queue
"#;
        let expected_output = Ingress {
            name: "test_ingress".into(),
            queue: Queue::named("test_queue"),
            ..Default::default()
        };

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1101
    /// Every supported field is read from the input; an unknown field is
    /// ignored.
    #[test]
    fn deserialize_from_full() {
        // Given
        let input = r#"
extra_field: ignored
name: test_ingress
exchange: amq.topic
queue: test_queue
durable: true
exclusive: true
auto_delete: true
batch_size: 21
batch_timeout: 2s 150ms
prefetch_count: 42
acking_behavior: manual
gibberish_behavior: backwash
binding_keys:
 - test_binding_key_1
 - test_binding_key_2
binding_headers: {}
headers_behavior: any
"#;
        let expected_output = Ingress {
            name: "test_ingress".into(),
            exchange: Exchange::AmqTopic,
            queue: Queue::named("test_queue"),
            durable: true,
            exclusive: true,
            auto_delete: true,
            batch_size: NonZeroUsize::new(21).unwrap(),
            batch_timeout: Duration::from_millis(2150),
            prefetch_count: Some(NonZeroU16::new(42).unwrap()),
            acking_behavior: AckingBehavior::Manual,
            gibberish_behavior: FinalizationKind::Backwash,
            binding_keys: HashSet::from([
                "test_binding_key_1".to_string(),
                "test_binding_key_2".to_string(),
            ]),
            binding_headers: HashMap::new(),
            headers_behavior: HeadersMatchingBehavior::Any,
            ..Default::default()
        };

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1151
    /// Building on the default exchange with an empty queue is rejected.
    #[test]
    fn default_exchange_requires_queue_name() {
        // Given
        let expected_output = IngressError::DefaultExchangeRequiresQueueName {
            ingress: "test_ingress".into(),
        };

        // When
        let actual_output = Ingress::builder()
            .with_name("test_ingress")
            .with_exchange(Exchange::Default)
            .with_queue(Queue::empty())
            .build()
            .unwrap_err();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1170
    /// Same rule as `default_exchange_requires_queue_name`, exercised through
    /// deserialization; the error text must start with the validation message.
    #[test]
    fn deserialize_default_exchange_requires_queue_name() {
        // Given
        let input = r#"
name: test_ingress
exchange: ''
queue: ''
"#;
        let expected_output = IngressError::DefaultExchangeRequiresQueueName {
            ingress: "test_ingress".into(),
        };

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();

        // Then
        assert!(
            actual_output
                .to_string()
                .starts_with(&expected_output.to_string()),
        );
    }
1193
    /// Building on the default exchange with a binding key is rejected.
    #[test]
    fn default_exchange_cannot_have_binding_keys() {
        // Given
        let expected_output = IngressError::DefaultExchangeCannotHaveBindingKeys {
            ingress: "test_ingress".into(),
        };

        // When
        let actual_output = Ingress::builder()
            .with_name("test_ingress")
            .with_exchange(Exchange::Default)
            .with_queue_named("test_queue")
            .with_binding_key("test_binding_key")
            .build()
            .unwrap_err();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1213
    /// Same rule as `default_exchange_cannot_have_binding_keys`, exercised
    /// through deserialization.
    #[test]
    fn deserialize_default_exchange_cannot_have_binding_keys() {
        // Given
        let input = r#"
name: test_ingress
exchange: ''
queue: test_queue
binding_keys:
 - test_binding_key
"#;
        let expected_output = IngressError::DefaultExchangeCannotHaveBindingKeys {
            ingress: "test_ingress".into(),
        };

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();

        // Then
        assert!(
            actual_output
                .to_string()
                .starts_with(&expected_output.to_string()),
        );
    }
1238
    /// Building on the default exchange with a binding header is rejected.
    #[test]
    fn default_exchange_cannot_have_binding_headers() {
        // Given
        let expected_output = IngressError::DefaultExchangeCannotHaveBindingHeaders {
            ingress: "test_ingress".into(),
        };

        // When
        let actual_output = Ingress::builder()
            .with_name("test_ingress")
            .with_exchange(Exchange::Default)
            .with_queue_named("test_queue")
            .with_binding_header("test_binding_header", 42)
            .build()
            .unwrap_err();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1258
    /// Same rule as `default_exchange_cannot_have_binding_headers`, exercised
    /// through deserialization.
    #[test]
    fn deserialize_default_exchange_cannot_have_binding_headers() {
        // Given
        let input = r#"
name: test_ingress
exchange: ''
queue: test_queue
binding_headers:
 test_binding_header: '42'
"#;
        let expected_output = IngressError::DefaultExchangeCannotHaveBindingHeaders {
            ingress: "test_ingress".into(),
        };

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();

        // Then
        assert!(
            actual_output
                .to_string()
                .starts_with(&expected_output.to_string()),
        );
    }
1283
    /// A direct exchange with neither binding keys nor a queue name (so no
    /// implicit binding key applies) is rejected.
    #[test]
    fn exchange_kind_requires_binding_keys() {
        // Given
        let expected_output = IngressError::ExchangeKindRequiresBindingKeys {
            ingress: "test_ingress".into(),
            kind: ExchangeKind::Direct,
        };

        // When
        let actual_output = Ingress::builder()
            .with_name("test_ingress")
            .with_exchange(Exchange::AmqDirect)
            .build()
            .unwrap_err();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1302
    /// Same rule as `exchange_kind_requires_binding_keys`, exercised through
    /// deserialization.
    #[test]
    fn deserialize_exchange_kind_requires_binding_keys() {
        // Given
        let input = r#"
name: test_ingress
exchange: amq.direct
"#;
        let expected_output = IngressError::ExchangeKindRequiresBindingKeys {
            ingress: "test_ingress".into(),
            kind: ExchangeKind::Direct,
        };

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();

        // Then
        assert!(
            actual_output
                .to_string()
                .starts_with(&expected_output.to_string()),
        );
    }
1325
    /// A headers exchange rejects binding keys, even when binding headers are
    /// also given (keys are checked first).
    #[test]
    fn exchange_kind_cannot_have_binding_keys() {
        // Given
        let expected_output = IngressError::ExchangeKindCannotHaveBindingKeys {
            ingress: "test_ingress".into(),
            kind: ExchangeKind::Headers,
        };

        // When
        let actual_output = Ingress::builder()
            .with_name("test_ingress")
            .with_exchange(Exchange::AmqHeaders)
            .with_queue_named("test_queue")
            .with_binding_key("test_binding_key")
            .with_binding_header("test_binding_header", 42)
            .build()
            .unwrap_err();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1347
    /// Same rule as `exchange_kind_cannot_have_binding_keys`, exercised
    /// through deserialization (using the `binding_key` alias).
    #[test]
    fn deserialize_exchange_kind_cannot_have_binding_keys() {
        // Given
        let input = r#"
name: test_ingress
exchange: amq.headers
queue: test_queue
binding_key: test_binding_key
binding_headers:
 test_binding_header: 42
"#;
        let expected_output = IngressError::ExchangeKindCannotHaveBindingKeys {
            ingress: "test_ingress".into(),
            kind: ExchangeKind::Headers,
        };

        // When
        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();

        // Then
        assert!(
            actual_output
                .to_string()
                .starts_with(&expected_output.to_string()),
        );
    }
1374
    /// A headers exchange without any binding header is rejected.
    #[test]
    fn exchange_kind_requires_binding_headers() {
        // Given
        let expected_output = IngressError::ExchangeKindRequiresBindingHeaders {
            ingress: "test_ingress".into(),
            kind: ExchangeKind::Headers,
        };

        // When
        let actual_output = Ingress::builder()
            .with_name("test_ingress")
            .with_exchange(Exchange::AmqHeaders)
            .with_queue_named("test_queue")
            .build()
            .unwrap_err();

        // Then
        assert_eq!(expected_output, actual_output);
    }
1394
1395 #[test]
1396 fn deserialize_exchange_kind_requires_binding_headers() {
1397 let input = r#"
1399name: test_ingress
1400exchange: amq.headers
1401queue: test_queue
1402"#;
1403 let expected_output = IngressError::ExchangeKindRequiresBindingHeaders {
1404 ingress: "test_ingress".into(),
1405 kind: ExchangeKind::Headers,
1406 };
1407
1408 let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1410
1411 assert!(
1413 actual_output
1414 .to_string()
1415 .starts_with(&expected_output.to_string()),
1416 );
1417 }
1418
1419 #[test]
1420 fn exchange_kind_cannot_have_binding_headers() {
1421 let expected_output = IngressError::ExchangeKindCannotHaveBindingHeaders {
1423 ingress: "test_ingress".into(),
1424 kind: ExchangeKind::Topic,
1425 };
1426
1427 let actual_output = Ingress::builder()
1429 .with_name("test_ingress")
1430 .with_exchange(Exchange::AmqTopic)
1431 .with_queue_named("test_queue")
1432 .with_binding_key("test_binding_key")
1433 .with_binding_header("test_binding_header", 42)
1434 .build()
1435 .unwrap_err();
1436
1437 assert_eq!(expected_output, actual_output);
1439 }
1440
1441 #[test]
1442 fn deserialize_exchange_kind_cannot_have_binding_headers() {
1443 let input = r#"
1445name: test_ingress
1446exchange: amq.topic
1447queue: test_queue
1448binding_key: test_binding_key
1449binding_headers:
1450 test_binding_header: 42
1451"#;
1452 let expected_output = IngressError::ExchangeKindCannotHaveBindingHeaders {
1453 ingress: "test_ingress".into(),
1454 kind: ExchangeKind::Topic,
1455 };
1456
1457 let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1459
1460 assert!(
1462 actual_output
1463 .to_string()
1464 .starts_with(&expected_output.to_string()),
1465 );
1466 }
1467
1468 #[test]
1469 fn exchange_kind_cannot_have_empty_binding_key() {
1470 let expected_output = IngressError::ExchangeKindCannotHaveEmptyBindingKey {
1472 ingress: "test_ingress".into(),
1473 kind: ExchangeKind::Topic,
1474 };
1475
1476 let actual_output = Ingress::builder()
1478 .with_name("test_ingress")
1479 .with_exchange(Exchange::AmqTopic)
1480 .with_queue_named("test_queue")
1481 .with_binding_key("")
1482 .build()
1483 .unwrap_err();
1484
1485 assert_eq!(expected_output, actual_output);
1487 }
1488
1489 #[test]
1490 fn deserialize_exchange_kind_cannot_have_empty_binding_key() {
1491 let input = r#"
1493name: test_ingress
1494exchange: amq.topic
1495queue: test_queue
1496binding_key: ''
1497"#;
1498 let expected_output = IngressError::ExchangeKindCannotHaveEmptyBindingKey {
1499 ingress: "test_ingress".into(),
1500 kind: ExchangeKind::Topic,
1501 };
1502
1503 let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1505
1506 assert!(
1508 actual_output
1509 .to_string()
1510 .starts_with(&expected_output.to_string()),
1511 );
1512 }
1513
1514 #[test]
1515 fn exchange_kind_cannot_have_empty_binding_header() {
1516 let expected_output = IngressError::ExchangeKindCannotHaveEmptyBindingHeader {
1518 ingress: "test_ingress".into(),
1519 kind: ExchangeKind::Headers,
1520 };
1521
1522 let actual_output = Ingress::builder()
1524 .with_name("test_ingress")
1525 .with_exchange(Exchange::AmqHeaders)
1526 .with_queue_named("test_queue")
1527 .with_binding_header("test_binding_header", "")
1528 .build()
1529 .unwrap_err();
1530
1531 assert_eq!(expected_output, actual_output);
1533 }
1534
1535 #[test]
1536 fn deserialize_exchange_kind_cannot_have_empty_binding_header() {
1537 let input = r#"
1539name: test_ingress
1540exchange: amq.headers
1541queue: test_queue
1542binding_header:
1543 test_binding_header: ''
1544"#;
1545 let expected_output = IngressError::ExchangeKindCannotHaveEmptyBindingHeader {
1546 ingress: "test_ingress".into(),
1547 kind: ExchangeKind::Headers,
1548 };
1549
1550 let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1552
1553 assert!(
1555 actual_output
1556 .to_string()
1557 .starts_with(&expected_output.to_string()),
1558 );
1559 }
1560
1561 #[test]
1562 fn deserialize_landscape_from_empty() {
1563 let input = "";
1565 let expected_output = IngressLandscape::default();
1566
1567 let actual_output = serde_yml::from_str::<IngressLandscape>(input).unwrap();
1569
1570 assert_eq!(expected_output, actual_output);
1572 }
1573
1574 #[test]
1575 fn deserialize_landscape_from_full() {
1576 let input = r#"
1578test_ingress_a: test_queue_a
1579test_ingress_b:
1580 exchange: test_exchange_b
1581 queue: test_queue_b
1582 binding_key: test_binding_key_b
1583"#;
1584 let expected_output = IngressLandscape::from([
1585 (
1586 "test_ingress_a",
1587 Ingress::builder()
1588 .with_name("test_ingress_a")
1589 .with_exchange(Exchange::Default)
1590 .with_queue_named("test_queue_a")
1591 .build()
1592 .unwrap(),
1593 ),
1594 (
1595 "test_ingress_b",
1596 Ingress::builder()
1597 .with_name("test_ingress_b")
1598 .with_exchange(Exchange::named("test_exchange_b").unwrap())
1599 .with_queue_named("test_queue_b")
1600 .with_binding_key("test_binding_key_b")
1601 .build()
1602 .unwrap(),
1603 ),
1604 ]);
1605
1606 let actual_output = serde_yml::from_str::<IngressLandscape>(input).unwrap();
1608
1609 assert_eq!(expected_output, actual_output);
1611 }
1612}