strut_rabbitmq/routing/
ingress.rs

1use crate::{
2    AckingBehavior, Exchange, ExchangeKind, FinalizationKind, Header, HeadersMatchingBehavior,
3    Queue,
4};
5use humantime::parse_duration;
6use serde::de::{DeserializeSeed, Error, IgnoredAny, MapAccess, Visitor};
7use serde::{Deserialize, Deserializer};
8use std::collections::{HashMap, HashSet};
9use std::fmt::Formatter;
10use std::num::{NonZeroU16, NonZeroUsize};
11use std::sync::Arc;
12use std::time::Duration;
13use strut_deserialize::{OneOrMany, Slug, SlugMap};
14use strut_factory::impl_deserialize_field;
15use thiserror::Error;
16
17pub mod queue;
18
19/// Represents a collection of uniquely named [`Ingress`] definitions.
20#[derive(Debug, Default, Clone, PartialEq, Eq)]
21pub struct IngressLandscape {
22    ingresses: SlugMap<Ingress>,
23}
24
25/// Defines an inbound path for messages being consumed from a RabbitMQ cluster.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct Ingress {
28    name: Arc<str>,
29    exchange: Exchange,
30    queue: Queue,
31    durable: bool,
32    exclusive: bool,
33    auto_delete: bool,
34    batch_size: NonZeroUsize,
35    batch_timeout: Duration,
36    prefetch_count: Option<NonZeroU16>,
37    acking_behavior: AckingBehavior,
38    gibberish_behavior: FinalizationKind,
39    // Exchange kind-specific configuration:
40    binding_keys: HashSet<String>,             // direct, topic
41    binding_headers: HashMap<String, Header>,  // headers
42    headers_behavior: HeadersMatchingBehavior, // headers
43}
44
45impl IngressLandscape {
46    /// Reports whether this landscape contains a [`Ingress`] with the
47    /// given unique name.
48    pub fn contains(&self, name: impl AsRef<str>) -> bool {
49        self.ingresses.contains_key(name.as_ref())
50    }
51
52    /// Retrieves `Some` reference to a [`Ingress`] from this landscape
53    /// under the given name, or `None`, if the name is not present in the
54    /// landscape.
55    pub fn get(&self, name: impl AsRef<str>) -> Option<&Ingress> {
56        self.ingresses.get(name.as_ref())
57    }
58
59    /// Retrieves a reference to a [`Ingress`] from this landscape under
60    /// the given name. Panics if the name is not present in the collection.
61    pub fn expect(&self, name: impl AsRef<str>) -> &Ingress {
62        let name = name.as_ref();
63
64        self.get(name)
65            .unwrap_or_else(|| panic!("requested an undefined RabbitMQ ingress '{}'", name))
66    }
67}
68
69impl Ingress {
70    /// Creates a new [`IngressBuilder`].
71    pub fn builder() -> IngressBuilder {
72        IngressBuilder::new()
73    }
74}
75
76impl Ingress {
77    /// Reports the ingress name for this definition.
78    pub fn name(&self) -> &str {
79        &self.name
80    }
81
82    /// Returns the exchange definition as part of this ingress definition.
83    pub fn exchange(&self) -> &Exchange {
84        &self.exchange
85    }
86
87    /// Returns the queue definition as part of this ingress definition.
88    pub fn queue(&self) -> &Queue {
89        &self.queue
90    }
91
92    /// Reports the ingress `durable` flag for this definition.
93    pub fn durable(&self) -> bool {
94        self.durable
95    }
96
97    /// Reports the ingress `exclusive` flag for this definition.
98    pub fn exclusive(&self) -> bool {
99        self.exclusive
100    }
101
102    /// Reports the ingress `auto_delete` flag for this definition.
103    pub fn auto_delete(&self) -> bool {
104        self.auto_delete
105    }
106
107    /// Reports the desired `no_ack` value for a consumer based on this ingress
108    /// definition.
109    ///
110    /// This boolean value is recognized by RabbitMQ and can be a bit misleading:
111    ///
112    /// - `false` = messages must be acknowledged manually by the client.
113    /// - `true` = messages are acknowledged automatically by the broker,
114    pub fn no_ack(&self) -> bool {
115        match self.acking_behavior {
116            AckingBehavior::Manual => false,
117            AckingBehavior::Auto => true,
118        }
119    }
120
121    /// Reports whether the messages delivered by a consumer based on this
122    /// ingress definition are delivered in pending state (need to be manually
123    /// finalized) or are delivered pre-finalized (pre-acknowledged on delivery).
124    pub fn delivers_pending(&self) -> bool {
125        match self.acking_behavior {
126            AckingBehavior::Manual => true,
127            AckingBehavior::Auto => false,
128        }
129    }
130
131    /// Reports the ingress batch size for this definition.
132    ///
133    /// The [`Subscriber`] supports batch-consuming messages. After the first
134    /// message of the batch is consumed, any message that arrive within the
135    /// [timeout](Ingress::batch_timeout) will be appended to the same batch,
136    /// unless this size limit is reached.
137    pub fn batch_size(&self) -> NonZeroUsize {
138        self.batch_size
139    }
140
141    /// Reports the ingress batch timeout for this definition.
142    ///
143    /// The [`Subscriber`] supports batch-consuming messages. After the first
144    /// message of the batch is consumed, any message that arrive within this
145    /// timeout will be appended to the same batch, unless the
146    /// [size limit](Ingress::batch_size) is reached.
147    pub fn batch_timeout(&self) -> Duration {
148        self.batch_timeout
149    }
150
151    /// Reports the ingress prefetch count for this definition.
152    pub fn prefetch_count(&self) -> Option<NonZeroU16> {
153        self.prefetch_count
154    }
155
156    /// Reports the ingress acking behavior for this definition.
157    pub fn acking_behavior(&self) -> AckingBehavior {
158        self.acking_behavior
159    }
160
161    /// Reports the ingress gibberish behavior for this definition.
162    pub fn gibberish_behavior(&self) -> FinalizationKind {
163        self.gibberish_behavior
164    }
165
166    /// Reports the ingress binding keys for this definition.
167    pub fn binding_keys(&self) -> &HashSet<String> {
168        &self.binding_keys
169    }
170
171    /// Reports the ingress binding headers for this definition.
172    pub fn binding_headers(&self) -> &HashMap<String, Header> {
173        &self.binding_headers
174    }
175
176    /// Reports the ingress headers behavior for this definition.
177    pub fn headers_behavior(&self) -> HeadersMatchingBehavior {
178        self.headers_behavior
179    }
180}
181
182/// Temporary struct for accumulating ingress configuration before finalizing it
183/// into a [`Ingress`]. This builder intends to apply validation only at
184/// meaningful states of the configuration, as opposed to every intermediary
185/// state.
186#[derive(Debug)]
187pub struct IngressBuilder {
188    name: Arc<str>,
189    exchange: Exchange,
190    queue: Queue,
191    durable: bool,
192    exclusive: bool,
193    auto_delete: bool,
194    batch_size: NonZeroUsize,
195    batch_timeout: Duration,
196    prefetch_count: Option<NonZeroU16>,
197    acking_behavior: AckingBehavior,
198    gibberish_behavior: FinalizationKind,
199    // Exchange kind-specific configuration:
200    binding_keys: HashSet<String>,             // direct, topic
201    binding_headers: HashMap<String, Header>,  // headers
202    headers_behavior: HeadersMatchingBehavior, // headers
203}
204
205impl IngressBuilder {
206    /// Creates a new [`Ingress`] builder.
207    pub fn new() -> Self {
208        Self {
209            name: Arc::from(Ingress::default_name()),
210            exchange: Exchange::default(),
211            queue: Queue::default(),
212            durable: Ingress::default_durable(),
213            exclusive: Ingress::default_exclusive(),
214            auto_delete: Ingress::default_auto_delete(),
215            batch_size: Ingress::default_batch_size(),
216            batch_timeout: Ingress::default_batch_timeout(),
217            prefetch_count: Ingress::default_prefetch_count(),
218            acking_behavior: Ingress::default_acking_behavior(),
219            gibberish_behavior: Ingress::default_gibberish_behavior(),
220            binding_keys: Ingress::default_binding_keys(),
221            binding_headers: Ingress::default_binding_headers(),
222            headers_behavior: Ingress::default_headers_behavior(),
223        }
224    }
225
226    /// Recreates this ingress definition builder with the given name.
227    pub fn with_name(self, name: impl AsRef<str>) -> Self {
228        Self {
229            name: Arc::from(name.as_ref()),
230            ..self
231        }
232    }
233
234    /// Recreates this ingress definition builder with the given exchange.
235    pub fn with_exchange(self, exchange: Exchange) -> Self {
236        Self { exchange, ..self }
237    }
238
239    /// Recreates this ingress definition builder with the given queue.
240    pub fn with_queue(self, queue: Queue) -> Self {
241        Self { queue, ..self }
242    }
243
244    /// Recreates this ingress definition builder with a queue with the given
245    /// name.
246    pub fn with_queue_named(self, queue: impl AsRef<str>) -> Self {
247        Self {
248            queue: Queue::named(queue),
249            ..self
250        }
251    }
252
253    /// Recreates this ingress definition builder with the given `durable` flag.
254    pub fn with_durable(self, durable: bool) -> Self {
255        Self { durable, ..self }
256    }
257
258    /// Recreates this ingress definition builder with the given `exclusive` flag.
259    pub fn with_exclusive(self, exclusive: bool) -> Self {
260        Self { exclusive, ..self }
261    }
262
263    /// Recreates this ingress definition builder with the given `auto_delete` flag.
264    pub fn with_auto_delete(self, auto_delete: bool) -> Self {
265        Self {
266            auto_delete,
267            ..self
268        }
269    }
270
271    /// Recreates this ingress definition builder with the given batch size.
272    pub fn with_batch_size(self, batch_size: NonZeroUsize) -> Self {
273        Self { batch_size, ..self }
274    }
275
276    /// Recreates this ingress definition builder with the given batch timeout.
277    pub fn with_batch_timeout(self, batch_timeout: Duration) -> Self {
278        Self {
279            batch_timeout,
280            ..self
281        }
282    }
283
284    /// Recreates this ingress definition builder with the given prefetch count.
285    pub fn with_prefetch_count(self, prefetch_count: Option<NonZeroU16>) -> Self {
286        Self {
287            prefetch_count,
288            ..self
289        }
290    }
291
292    /// Recreates this ingress definition builder with the given acking behavior.
293    pub fn with_acking_behavior(self, acking_behavior: AckingBehavior) -> Self {
294        Self {
295            acking_behavior,
296            ..self
297        }
298    }
299
300    /// Recreates this ingress definition builder with the given gibberish behavior.
301    pub fn with_gibberish_behavior(self, gibberish_behavior: FinalizationKind) -> Self {
302        Self {
303            gibberish_behavior,
304            ..self
305        }
306    }
307
308    /// Recreates this ingress definition builder, adding the given binding key
309    /// to the ones already included.
310    pub fn with_binding_key(self, binding_key: impl Into<String>) -> Self {
311        let mut binding_keys = self.binding_keys;
312        binding_keys.insert(binding_key.into());
313
314        Self {
315            binding_keys,
316            ..self
317        }
318    }
319
320    /// Recreates this ingress definition builder with the given binding keys.
321    ///
322    /// This will replace any previously
323    /// [added](IngressBuilder::with_binding_key) binding keys.
324    pub fn with_replaced_binding_keys(self, binding_keys: HashSet<String>) -> Self {
325        Self {
326            binding_keys,
327            ..self
328        }
329    }
330
331    /// Recreates this ingress definition builder, adding the given binding
332    /// header to the ones already included.
333    pub fn with_binding_header<V>(self, key: impl Into<String>, value: V) -> Self
334    where
335        Header: From<V>,
336    {
337        let mut binding_headers = self.binding_headers;
338        binding_headers.insert(key.into(), Header::from(value));
339
340        Self {
341            binding_headers,
342            ..self
343        }
344    }
345
346    /// Recreates this ingress definition builder with the given binding headers.
347    ///
348    /// This will replace any previously
349    /// [added](IngressBuilder::with_binding_header) binding headers.
350    pub fn with_replaced_binding_headers(self, binding_headers: HashMap<String, Header>) -> Self {
351        Self {
352            binding_headers,
353            ..self
354        }
355    }
356
357    /// Recreates this ingress definition builder with the headers matching
358    /// behavior set to `all` (all headers must match).
359    pub fn with_matching_all_headers(self) -> Self {
360        Self {
361            headers_behavior: HeadersMatchingBehavior::All,
362            ..self
363        }
364    }
365
366    /// Recreates this ingress definition builder with the headers matching
367    /// behavior set to `any` (at least one header must match).
368    pub fn with_matching_any_headers(self) -> Self {
369        Self {
370            headers_behavior: HeadersMatchingBehavior::Any,
371            ..self
372        }
373    }
374
375    /// Recreates this ingress definition builder with the given headers behavior.
376    pub fn with_headers_behavior(self, headers_behavior: HeadersMatchingBehavior) -> Self {
377        Self {
378            headers_behavior,
379            ..self
380        }
381    }
382
383    /// Finalizes the builder, validates its state, and, assuming valid state,
384    /// returns the [`Ingress`].
385    pub fn build(mut self) -> Result<Ingress, IngressError> {
386        // At the last moment, slide in a possible implicit binding key
387        if let Some(implicit_binding_key) = self.maybe_implicit_binding_key() {
388            self.binding_keys = implicit_binding_key;
389        }
390
391        self.validate()?;
392
393        Ok(Ingress {
394            name: self.name,
395            exchange: self.exchange,
396            queue: self.queue,
397            durable: self.durable,
398            exclusive: self.exclusive,
399            auto_delete: self.auto_delete,
400            batch_size: self.batch_size,
401            batch_timeout: self.batch_timeout,
402            prefetch_count: self.prefetch_count,
403            acking_behavior: self.acking_behavior,
404            gibberish_behavior: self.gibberish_behavior,
405            binding_keys: self.binding_keys,
406            binding_headers: self.binding_headers,
407            headers_behavior: self.headers_behavior,
408        })
409    }
410
411    fn maybe_implicit_binding_key(&self) -> Option<HashSet<String>> {
412        // Does the exchange allow explicit binding key?
413        let exchange_allows_implicit_binding_key = match self.exchange.kind() {
414            ExchangeKind::Direct | ExchangeKind::Topic => !self.exchange.is_default(),
415            _ => false,
416        };
417
418        // Can we maybe use the queue name as a binding key?
419        if exchange_allows_implicit_binding_key
420            && self.binding_keys.is_empty()
421            && !self.queue.is_empty()
422        {
423            // Use queue name also as a binding key
424            return Some(HashSet::from([self.queue.name().to_string()]));
425        }
426
427        None
428    }
429
430    fn validate(&self) -> Result<(), IngressError> {
431        if self.acking_behavior == AckingBehavior::Manual {
432            if let Some(prefetch_count) = self.prefetch_count {
433                if usize::from(self.batch_size) > (u16::from(prefetch_count) as usize) {
434                    return Err(IngressError::BatchSizeGreaterThanPrefetchCount {
435                        ingress: self.name.to_string(),
436                        prefetch_count,
437                        batch_size: self.batch_size,
438                    });
439                }
440            } else {
441                if self.batch_size > NonZeroUsize::MIN {
442                    return Err(IngressError::BatchSizeWithoutPrefetchCount {
443                        ingress: self.name.to_string(),
444                        batch_size: self.batch_size,
445                    });
446                }
447            }
448        }
449
450        if self.exchange.is_default() {
451            self.validate_default_exchange()?;
452        } else {
453            self.validate_non_default_exchange()?;
454        }
455
456        Ok(())
457    }
458
459    fn validate_default_exchange(&self) -> Result<(), IngressError> {
460        // Ensure queue name is not empty
461        if self.queue.is_empty() {
462            return Err(IngressError::DefaultExchangeRequiresQueueName {
463                ingress: self.name.to_string(),
464            });
465        }
466
467        // Ensure there are no binding keys
468        if !self.binding_keys.is_empty() {
469            return Err(IngressError::DefaultExchangeCannotHaveBindingKeys {
470                ingress: self.name.to_string(),
471            });
472        }
473
474        // Ensure there are no binding headers
475        if !self.binding_headers.is_empty() {
476            return Err(IngressError::DefaultExchangeCannotHaveBindingHeaders {
477                ingress: self.name.to_string(),
478            });
479        }
480
481        Ok(())
482    }
483
484    fn validate_non_default_exchange(&self) -> Result<(), IngressError> {
485        match self.exchange.kind() {
486            ExchangeKind::Direct | ExchangeKind::Topic => self.validate_binding_keys()?,
487            ExchangeKind::Headers => self.validate_binding_headers()?,
488            ExchangeKind::Fanout | ExchangeKind::HashKey | ExchangeKind::HashId => {
489                self.validate_no_bindings()?
490            }
491        };
492
493        Ok(())
494    }
495
496    fn validate_binding_keys(&self) -> Result<(), IngressError> {
497        self.validate_no_binding_headers()?;
498
499        if self.binding_keys.is_empty() {
500            return Err(IngressError::ExchangeKindRequiresBindingKeys {
501                ingress: self.name.to_string(),
502                kind: self.exchange.kind(),
503            });
504        }
505
506        for key in &self.binding_keys {
507            if key.is_empty() {
508                return Err(IngressError::ExchangeKindCannotHaveEmptyBindingKey {
509                    ingress: self.name.to_string(),
510                    kind: self.exchange.kind(),
511                });
512            }
513        }
514
515        Ok(())
516    }
517
518    fn validate_binding_headers(&self) -> Result<(), IngressError> {
519        self.validate_no_binding_keys()?;
520
521        if self.binding_headers.is_empty() {
522            return Err(IngressError::ExchangeKindRequiresBindingHeaders {
523                ingress: self.name.to_string(),
524                kind: self.exchange.kind(),
525            });
526        }
527
528        for (key, value) in &self.binding_headers {
529            if key.is_empty() || value.is_empty() {
530                return Err(IngressError::ExchangeKindCannotHaveEmptyBindingHeader {
531                    ingress: self.name.to_string(),
532                    kind: self.exchange.kind(),
533                });
534            }
535        }
536
537        Ok(())
538    }
539
540    fn validate_no_binding_keys(&self) -> Result<(), IngressError> {
541        if !self.binding_keys.is_empty() {
542            return Err(IngressError::ExchangeKindCannotHaveBindingKeys {
543                ingress: self.name.to_string(),
544                kind: self.exchange.kind(),
545            });
546        }
547
548        Ok(())
549    }
550
551    fn validate_no_binding_headers(&self) -> Result<(), IngressError> {
552        if !self.binding_headers.is_empty() {
553            return Err(IngressError::ExchangeKindCannotHaveBindingHeaders {
554                ingress: self.name.to_string(),
555                kind: self.exchange.kind(),
556            });
557        }
558
559        Ok(())
560    }
561
562    fn validate_no_bindings(&self) -> Result<(), IngressError> {
563        self.validate_no_binding_keys()?;
564        self.validate_no_binding_headers()?;
565
566        Ok(())
567    }
568}
569
570impl Ingress {
571    fn default_name() -> &'static str {
572        "default"
573    }
574
575    fn default_durable() -> bool {
576        false
577    }
578
579    fn default_exclusive() -> bool {
580        false
581    }
582
583    fn default_auto_delete() -> bool {
584        false
585    }
586
587    fn default_batch_size() -> NonZeroUsize {
588        NonZeroUsize::MIN
589    }
590
591    fn default_batch_timeout() -> Duration {
592        Duration::from_millis(250)
593    }
594
595    fn default_prefetch_count() -> Option<NonZeroU16> {
596        None
597    }
598
599    fn default_acking_behavior() -> AckingBehavior {
600        AckingBehavior::Manual
601    }
602
603    fn default_gibberish_behavior() -> FinalizationKind {
604        FinalizationKind::Complete
605    }
606
607    fn default_binding_keys() -> HashSet<String> {
608        HashSet::default()
609    }
610
611    fn default_binding_headers() -> HashMap<String, Header> {
612        HashMap::default()
613    }
614
615    fn default_headers_behavior() -> HeadersMatchingBehavior {
616        HeadersMatchingBehavior::All
617    }
618}
619
620#[cfg(test)]
621impl Default for Ingress {
622    fn default() -> Self {
623        Self {
624            name: Arc::from(""),
625            exchange: Exchange::default(),
626            queue: Queue::default(),
627            durable: Self::default_durable(),
628            exclusive: Self::default_exclusive(),
629            auto_delete: Self::default_auto_delete(),
630            batch_size: Self::default_batch_size(),
631            batch_timeout: Self::default_batch_timeout(),
632            prefetch_count: Self::default_prefetch_count(),
633            acking_behavior: Self::default_acking_behavior(),
634            gibberish_behavior: Self::default_gibberish_behavior(),
635            binding_keys: Self::default_binding_keys(),
636            binding_headers: Self::default_binding_headers(),
637            headers_behavior: Self::default_headers_behavior(),
638        }
639    }
640}
641
642/// Represents the various error states of a RabbitMQ ingress definition.
643#[derive(Error, Debug, PartialEq, Eq)]
644pub enum IngressError {
645    /// Indicates batch size greater than prefetch count
646    #[error(
647        "invalid batch size configuration for ingress '{ingress}' with prefetch count of {prefetch_count}: expected <= {prefetch_count}, found {batch_size}"
648    )]
649    BatchSizeGreaterThanPrefetchCount {
650        /// Ingress name
651        ingress: String,
652        /// Batch size
653        batch_size: NonZeroUsize,
654        /// Prefetch count,
655        prefetch_count: NonZeroU16,
656    },
657
658    /// Indicates batch size on an ingress without prefetch count
659    #[error(
660        "invalid batch size configuration for ingress '{ingress}' without prefetch count: expected 1, found {batch_size}"
661    )]
662    BatchSizeWithoutPrefetchCount {
663        /// Ingress name
664        ingress: String,
665        /// Batch size
666        batch_size: NonZeroUsize,
667    },
668
669    /// Indicates the absence of a queue name where it is required.
670    #[error(
671        "invalid configuration for ingress '{ingress}' with default exchange: expected queue name, found none/empty"
672    )]
673    DefaultExchangeRequiresQueueName {
674        /// Ingress name
675        ingress: String,
676    },
677
678    /// Indicates the presence of binding keys on a default exchange, which doesn’t allow them.
679    #[error(
680        "invalid configuration for ingress '{ingress}' with default exchange: expected no binding keys, found at least one"
681    )]
682    DefaultExchangeCannotHaveBindingKeys {
683        /// Ingress name
684        ingress: String,
685    },
686
687    /// Indicates the presence of binding headers on a default exchange, which doesn’t allow them.
688    #[error(
689        "invalid configuration for ingress '{ingress}' with default exchange: expected no binding headers, found at least one"
690    )]
691    DefaultExchangeCannotHaveBindingHeaders {
692        /// Ingress name
693        ingress: String,
694    },
695
696    /// Indicates the absence of binding keys on an exchange kind that requires them.
697    #[error(
698        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected at least one binding key, found none"
699    )]
700    ExchangeKindRequiresBindingKeys {
701        /// Ingress name
702        ingress: String,
703        /// Exchange kind
704        kind: ExchangeKind,
705    },
706
707    /// Indicates the presence of binding keys on an exchange kind that ignores them.
708    #[error(
709        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected no binding keys, found at least one"
710    )]
711    ExchangeKindCannotHaveBindingKeys {
712        /// Ingress name
713        ingress: String,
714        /// Exchange kind
715        kind: ExchangeKind,
716    },
717
718    /// Indicates the absence of binding headers on an exchange kind that requires them.
719    #[error(
720        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected at least one binding header, found none"
721    )]
722    ExchangeKindRequiresBindingHeaders {
723        /// Ingress name
724        ingress: String,
725        /// Exchange kind
726        kind: ExchangeKind,
727    },
728
729    /// Indicates the presence of binding headers on an exchange kind that ignores them.
730    #[error(
731        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected no binding headers, found at least one"
732    )]
733    ExchangeKindCannotHaveBindingHeaders {
734        /// Ingress name
735        ingress: String,
736        /// Exchange kind
737        kind: ExchangeKind,
738    },
739
740    /// Indicates the presence of an empty binding key on an exchange kind that requires them.
741    #[error(
742        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected non-empty binding keys, found an empty one"
743    )]
744    ExchangeKindCannotHaveEmptyBindingKey {
745        /// Ingress name
746        ingress: String,
747        /// Exchange kind
748        kind: ExchangeKind,
749    },
750
751    /// Indicates the presence of an empty binding header on an exchange kind that requires them.
752    #[error(
753        "invalid configuration for ingress '{ingress}' with exchange of type '{kind:?}': expected non-empty binding header keys and values, found an empty one"
754    )]
755    ExchangeKindCannotHaveEmptyBindingHeader {
756        /// Ingress name
757        ingress: String,
758        /// Exchange kind
759        kind: ExchangeKind,
760    },
761}
762
763impl AsRef<Ingress> for Ingress {
764    fn as_ref(&self) -> &Ingress {
765        self
766    }
767}
768
769impl AsRef<IngressLandscape> for IngressLandscape {
770    fn as_ref(&self) -> &IngressLandscape {
771        self
772    }
773}
774
775const _: () = {
776    impl<S> FromIterator<(S, Ingress)> for IngressLandscape
777    where
778        S: Into<Slug>,
779    {
780        fn from_iter<T: IntoIterator<Item = (S, Ingress)>>(iter: T) -> Self {
781            let ingresses = iter.into_iter().collect();
782
783            Self { ingresses }
784        }
785    }
786
787    impl<const N: usize, S> From<[(S, Ingress); N]> for IngressLandscape
788    where
789        S: Into<Slug>,
790    {
791        fn from(value: [(S, Ingress); N]) -> Self {
792            value.into_iter().collect()
793        }
794    }
795};
796
797const _: () = {
798    impl<'de> Deserialize<'de> for IngressLandscape {
799        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
800        where
801            D: Deserializer<'de>,
802        {
803            deserializer.deserialize_map(IngressLandscapeVisitor)
804        }
805    }
806
807    struct IngressLandscapeVisitor;
808
809    impl<'de> Visitor<'de> for IngressLandscapeVisitor {
810        type Value = IngressLandscape;
811
812        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
813            formatter.write_str("a map of RabbitMQ ingress landscape")
814        }
815
816        fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
817        where
818            A: MapAccess<'de>,
819        {
820            let grouped = Slug::group_map(map)?;
821            let mut ingresses = HashMap::with_capacity(grouped.len());
822
823            for (key, value) in grouped {
824                let seed = IngressSeed {
825                    name: key.original(),
826                };
827                let handle = seed.deserialize(value).map_err(Error::custom)?;
828                ingresses.insert(key, handle);
829            }
830
831            Ok(IngressLandscape {
832                ingresses: SlugMap::new(ingresses),
833            })
834        }
835    }
836
837    struct IngressSeed<'a> {
838        name: &'a str,
839    }
840
841    impl<'de> DeserializeSeed<'de> for IngressSeed<'_> {
842        type Value = Ingress;
843
844        fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
845        where
846            D: Deserializer<'de>,
847        {
848            deserializer.deserialize_any(IngressSeedVisitor { name: self.name })
849        }
850    }
851
852    struct IngressSeedVisitor<'a> {
853        name: &'a str,
854    }
855
856    impl<'de> Visitor<'de> for IngressSeedVisitor<'_> {
857        type Value = Ingress;
858
859        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
860            formatter.write_str("a map of RabbitMQ ingress or a string queue name")
861        }
862
863        fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
864        where
865            E: Error,
866        {
867            Ingress::builder()
868                .with_name(self.name)
869                .with_queue_named(value)
870                .build()
871                .map_err(Error::custom)
872        }
873
874        fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
875        where
876            A: MapAccess<'de>,
877        {
878            visit_ingress(map, Some(self.name))
879        }
880    }
881
882    impl<'de> Deserialize<'de> for Ingress {
883        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
884        where
885            D: Deserializer<'de>,
886        {
887            deserializer.deserialize_map(IngressVisitor)
888        }
889    }
890
891    struct IngressVisitor;
892
893    impl<'de> Visitor<'de> for IngressVisitor {
894        type Value = Ingress;
895
896        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
897            formatter.write_str("a map of RabbitMQ ingress")
898        }
899
900        fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
901        where
902            A: MapAccess<'de>,
903        {
904            visit_ingress(map, None)
905        }
906    }
907
908    fn visit_ingress<'de, A>(mut map: A, known_name: Option<&str>) -> Result<Ingress, A::Error>
909    where
910        A: MapAccess<'de>,
911    {
912        let mut name: Option<String> = None;
913        let mut exchange = None;
914        let mut queue = None;
915        let mut durable = None;
916        let mut exclusive = None;
917        let mut auto_delete = None;
918        let mut batch_size = None;
919        let mut batch_timeout = None;
920        let mut prefetch_count = None;
921        let mut acking_behavior = None;
922        let mut gibberish_behavior = None;
923        let mut binding_keys: Option<OneOrMany<String>> = None;
924        let mut binding_headers = None;
925        let mut headers_behavior = None;
926
927        while let Some(key) = map.next_key()? {
928            match key {
929                IngressField::name => key.poll(&mut map, &mut name)?,
930                IngressField::exchange => key.poll(&mut map, &mut exchange)?,
931                IngressField::queue => key.poll(&mut map, &mut queue)?,
932                IngressField::durable => key.poll(&mut map, &mut durable)?,
933                IngressField::exclusive => key.poll(&mut map, &mut exclusive)?,
934                IngressField::auto_delete => key.poll(&mut map, &mut auto_delete)?,
935                IngressField::batch_size => key.poll(&mut map, &mut batch_size)?,
936                IngressField::batch_timeout => {
937                    let duration_string = map.next_value::<String>()?;
938                    let duration = parse_duration(&duration_string).map_err(Error::custom)?;
939                    batch_timeout = Some(duration);
940                    IgnoredAny
941                }
942                IngressField::prefetch_count => key.poll(&mut map, &mut prefetch_count)?,
943                IngressField::acking_behavior => key.poll(&mut map, &mut acking_behavior)?,
944                IngressField::gibberish_behavior => key.poll(&mut map, &mut gibberish_behavior)?,
945                IngressField::binding_keys => key.poll(&mut map, &mut binding_keys)?,
946                IngressField::binding_headers => key.poll(&mut map, &mut binding_headers)?,
947                IngressField::headers_behavior => key.poll(&mut map, &mut headers_behavior)?,
948                IngressField::__ignore => map.next_value()?,
949            };
950        }
951
952        let name = match known_name {
953            Some(known_name) => known_name,
954            None => name.as_deref().unwrap_or_else(|| Ingress::default_name()),
955        };
956
957        let mut builder = Ingress::builder()
958            .with_name(name)
959            .with_exchange(exchange.unwrap_or_default())
960            .with_queue(queue.unwrap_or_default());
961
962        if let Some(durable) = durable {
963            builder = builder.with_durable(durable);
964        }
965        if let Some(exclusive) = exclusive {
966            builder = builder.with_exclusive(exclusive);
967        }
968        if let Some(auto_delete) = auto_delete {
969            builder = builder.with_auto_delete(auto_delete);
970        }
971        if let Some(batch_size) = batch_size {
972            builder = builder.with_batch_size(batch_size);
973        }
974        if let Some(batch_timeout) = batch_timeout {
975            builder = builder.with_batch_timeout(batch_timeout);
976        }
977        if let Some(prefetch_count) = prefetch_count {
978            builder = builder.with_prefetch_count(prefetch_count);
979        }
980        if let Some(acking_behavior) = acking_behavior {
981            builder = builder.with_acking_behavior(acking_behavior);
982        }
983        if let Some(gibberish_behavior) = gibberish_behavior {
984            builder = builder.with_gibberish_behavior(gibberish_behavior);
985        }
986        if let Some(binding_keys) = binding_keys {
987            builder = builder.with_replaced_binding_keys(binding_keys.into());
988        }
989        if let Some(binding_headers) = binding_headers {
990            builder = builder.with_replaced_binding_headers(binding_headers);
991        }
992        if let Some(headers_behavior) = headers_behavior {
993            builder = builder.with_headers_behavior(headers_behavior);
994        }
995
996        builder.build().map_err(Error::custom)
997    }
998
999    impl_deserialize_field!(
1000        IngressField,
1001        strut_deserialize::Slug::eq_as_slugs,
1002        name,
1003        exchange,
1004        queue,
1005        durable,
1006        exclusive,
1007        auto_delete,
1008        batch_size,
1009        batch_timeout,
1010        prefetch_count | prefetch,
1011        acking_behavior | acking,
1012        gibberish_behavior | gibberish,
1013        binding_keys | binding_key,
1014        binding_headers | binding_header,
1015        headers_behavior | header_behavior,
1016    );
1017};
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022    use pretty_assertions::assert_eq;
1023
1024    #[test]
1025    fn deserialize_from_empty() {
1026        // Given
1027        let input = "";
1028
1029        // When
1030        let actual_output = serde_yml::from_str::<Ingress>(input);
1031
1032        // Then
1033        assert!(actual_output.is_err());
1034    }
1035
1036    #[test]
1037    fn deserialize_from_string() {
1038        // Given
1039        let input = "\"test_ingress\"";
1040
1041        // When
1042        let actual_output = serde_yml::from_str::<Ingress>(input);
1043
1044        // Then
1045        assert!(actual_output.is_err());
1046    }
1047
1048    #[test]
1049    fn deserialize_from_name() {
1050        // Given
1051        let input = r#"
1052name: test_ingress
1053"#;
1054
1055        // When
1056        let actual_output = serde_yml::from_str::<Ingress>(input);
1057
1058        // Then
1059        assert!(actual_output.is_err());
1060    }
1061
1062    #[test]
1063    fn deserialize_from_name_and_exchange() {
1064        // Given
1065        let input = r#"
1066name: test_ingress
1067exchange: amq.fanout
1068"#;
1069        let expected_output = Ingress {
1070            name: "test_ingress".into(),
1071            exchange: Exchange::AmqFanout,
1072            ..Default::default()
1073        };
1074
1075        // When
1076        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap();
1077
1078        // Then
1079        assert_eq!(expected_output, actual_output);
1080    }
1081
1082    #[test]
1083    fn deserialize_from_name_and_routing_key() {
1084        // Given
1085        let input = r#"
1086name: test_ingress
1087queue: test_queue
1088"#;
1089        let expected_output = Ingress {
1090            name: "test_ingress".into(),
1091            queue: Queue::named("test_queue"),
1092            ..Default::default()
1093        };
1094
1095        // When
1096        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap();
1097
1098        // Then
1099        assert_eq!(expected_output, actual_output);
1100    }
1101
1102    #[test]
1103    fn deserialize_from_full() {
1104        // Given
1105        let input = r#"
1106extra_field: ignored
1107name: test_ingress
1108exchange: amq.topic
1109queue: test_queue
1110durable: true
1111exclusive: true
1112auto_delete: true
1113batch_size: 21
1114batch_timeout: 2s 150ms
1115prefetch_count: 42
1116acking_behavior: manual
1117gibberish_behavior: backwash
1118binding_keys:
1119  - test_binding_key_1
1120  - test_binding_key_2
1121binding_headers: {}
1122headers_behavior: any
1123"#;
1124        let expected_output = Ingress {
1125            name: "test_ingress".into(),
1126            exchange: Exchange::AmqTopic,
1127            queue: Queue::named("test_queue"),
1128            durable: true,
1129            exclusive: true,
1130            auto_delete: true,
1131            batch_size: NonZeroUsize::new(21).unwrap(),
1132            batch_timeout: Duration::from_millis(2150),
1133            prefetch_count: Some(NonZeroU16::new(42).unwrap()),
1134            acking_behavior: AckingBehavior::Manual,
1135            gibberish_behavior: FinalizationKind::Backwash,
1136            binding_keys: HashSet::from([
1137                "test_binding_key_1".to_string(),
1138                "test_binding_key_2".to_string(),
1139            ]),
1140            binding_headers: HashMap::new(),
1141            headers_behavior: HeadersMatchingBehavior::Any,
1142            ..Default::default()
1143        };
1144
1145        // When
1146        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap();
1147
1148        // Then
1149        assert_eq!(expected_output, actual_output);
1150    }
1151
1152    #[test]
1153    fn default_exchange_requires_queue_name() {
1154        // Given
1155        let expected_output = IngressError::DefaultExchangeRequiresQueueName {
1156            ingress: "test_ingress".into(),
1157        };
1158
1159        // When
1160        let actual_output = Ingress::builder()
1161            .with_name("test_ingress")
1162            .with_exchange(Exchange::Default)
1163            .with_queue(Queue::empty())
1164            .build()
1165            .unwrap_err();
1166
1167        // Then
1168        assert_eq!(expected_output, actual_output);
1169    }
1170
1171    #[test]
1172    fn deserialize_default_exchange_requires_queue_name() {
1173        // Given
1174        let input = r#"
1175name: test_ingress
1176exchange: ''
1177queue: ''
1178"#;
1179        let expected_output = IngressError::DefaultExchangeRequiresQueueName {
1180            ingress: "test_ingress".into(),
1181        };
1182
1183        // When
1184        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1185
1186        // Then
1187        assert!(
1188            actual_output
1189                .to_string()
1190                .starts_with(&expected_output.to_string()),
1191        );
1192    }
1193
1194    #[test]
1195    fn default_exchange_cannot_have_binding_keys() {
1196        // Given
1197        let expected_output = IngressError::DefaultExchangeCannotHaveBindingKeys {
1198            ingress: "test_ingress".into(),
1199        };
1200
1201        // When
1202        let actual_output = Ingress::builder()
1203            .with_name("test_ingress")
1204            .with_exchange(Exchange::Default)
1205            .with_queue_named("test_queue")
1206            .with_binding_key("test_binding_key")
1207            .build()
1208            .unwrap_err();
1209
1210        // Then
1211        assert_eq!(expected_output, actual_output);
1212    }
1213
1214    #[test]
1215    fn deserialize_default_exchange_cannot_have_binding_keys() {
1216        // Given
1217        let input = r#"
1218name: test_ingress
1219exchange: ''
1220queue: test_queue
1221binding_keys:
1222  - test_binding_key
1223"#;
1224        let expected_output = IngressError::DefaultExchangeCannotHaveBindingKeys {
1225            ingress: "test_ingress".into(),
1226        };
1227
1228        // When
1229        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1230
1231        // Then
1232        assert!(
1233            actual_output
1234                .to_string()
1235                .starts_with(&expected_output.to_string()),
1236        );
1237    }
1238
1239    #[test]
1240    fn default_exchange_cannot_have_binding_headers() {
1241        // Given
1242        let expected_output = IngressError::DefaultExchangeCannotHaveBindingHeaders {
1243            ingress: "test_ingress".into(),
1244        };
1245
1246        // When
1247        let actual_output = Ingress::builder()
1248            .with_name("test_ingress")
1249            .with_exchange(Exchange::Default)
1250            .with_queue_named("test_queue")
1251            .with_binding_header("test_binding_header", 42)
1252            .build()
1253            .unwrap_err();
1254
1255        // Then
1256        assert_eq!(expected_output, actual_output);
1257    }
1258
1259    #[test]
1260    fn deserialize_default_exchange_cannot_have_binding_headers() {
1261        // Given
1262        let input = r#"
1263name: test_ingress
1264exchange: ''
1265queue: test_queue
1266binding_headers:
1267    test_binding_header: '42'
1268"#;
1269        let expected_output = IngressError::DefaultExchangeCannotHaveBindingHeaders {
1270            ingress: "test_ingress".into(),
1271        };
1272
1273        // When
1274        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1275
1276        // Then
1277        assert!(
1278            actual_output
1279                .to_string()
1280                .starts_with(&expected_output.to_string()),
1281        );
1282    }
1283
1284    #[test]
1285    fn exchange_kind_requires_binding_keys() {
1286        // Given
1287        let expected_output = IngressError::ExchangeKindRequiresBindingKeys {
1288            ingress: "test_ingress".into(),
1289            kind: ExchangeKind::Direct,
1290        };
1291
1292        // When
1293        let actual_output = Ingress::builder()
1294            .with_name("test_ingress")
1295            .with_exchange(Exchange::AmqDirect)
1296            .build()
1297            .unwrap_err();
1298
1299        // Then
1300        assert_eq!(expected_output, actual_output);
1301    }
1302
1303    #[test]
1304    fn deserialize_exchange_kind_requires_binding_keys() {
1305        // Given
1306        let input = r#"
1307name: test_ingress
1308exchange: amq.direct
1309"#;
1310        let expected_output = IngressError::ExchangeKindRequiresBindingKeys {
1311            ingress: "test_ingress".into(),
1312            kind: ExchangeKind::Direct,
1313        };
1314
1315        // When
1316        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1317
1318        // Then
1319        assert!(
1320            actual_output
1321                .to_string()
1322                .starts_with(&expected_output.to_string()),
1323        );
1324    }
1325
1326    #[test]
1327    fn exchange_kind_cannot_have_binding_keys() {
1328        // Given
1329        let expected_output = IngressError::ExchangeKindCannotHaveBindingKeys {
1330            ingress: "test_ingress".into(),
1331            kind: ExchangeKind::Headers,
1332        };
1333
1334        // When
1335        let actual_output = Ingress::builder()
1336            .with_name("test_ingress")
1337            .with_exchange(Exchange::AmqHeaders)
1338            .with_queue_named("test_queue")
1339            .with_binding_key("test_binding_key")
1340            .with_binding_header("test_binding_header", 42)
1341            .build()
1342            .unwrap_err();
1343
1344        // Then
1345        assert_eq!(expected_output, actual_output);
1346    }
1347
1348    #[test]
1349    fn deserialize_exchange_kind_cannot_have_binding_keys() {
1350        // Given
1351        let input = r#"
1352name: test_ingress
1353exchange: amq.headers
1354queue: test_queue
1355binding_key: test_binding_key
1356binding_headers:
1357    test_binding_header: 42
1358"#;
1359        let expected_output = IngressError::ExchangeKindCannotHaveBindingKeys {
1360            ingress: "test_ingress".into(),
1361            kind: ExchangeKind::Headers,
1362        };
1363
1364        // When
1365        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1366
1367        // Then
1368        assert!(
1369            actual_output
1370                .to_string()
1371                .starts_with(&expected_output.to_string()),
1372        );
1373    }
1374
1375    #[test]
1376    fn exchange_kind_requires_binding_headers() {
1377        // Given
1378        let expected_output = IngressError::ExchangeKindRequiresBindingHeaders {
1379            ingress: "test_ingress".into(),
1380            kind: ExchangeKind::Headers,
1381        };
1382
1383        // When
1384        let actual_output = Ingress::builder()
1385            .with_name("test_ingress")
1386            .with_exchange(Exchange::AmqHeaders)
1387            .with_queue_named("test_queue")
1388            .build()
1389            .unwrap_err();
1390
1391        // Then
1392        assert_eq!(expected_output, actual_output);
1393    }
1394
1395    #[test]
1396    fn deserialize_exchange_kind_requires_binding_headers() {
1397        // Given
1398        let input = r#"
1399name: test_ingress
1400exchange: amq.headers
1401queue: test_queue
1402"#;
1403        let expected_output = IngressError::ExchangeKindRequiresBindingHeaders {
1404            ingress: "test_ingress".into(),
1405            kind: ExchangeKind::Headers,
1406        };
1407
1408        // When
1409        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1410
1411        // Then
1412        assert!(
1413            actual_output
1414                .to_string()
1415                .starts_with(&expected_output.to_string()),
1416        );
1417    }
1418
1419    #[test]
1420    fn exchange_kind_cannot_have_binding_headers() {
1421        // Given
1422        let expected_output = IngressError::ExchangeKindCannotHaveBindingHeaders {
1423            ingress: "test_ingress".into(),
1424            kind: ExchangeKind::Topic,
1425        };
1426
1427        // When
1428        let actual_output = Ingress::builder()
1429            .with_name("test_ingress")
1430            .with_exchange(Exchange::AmqTopic)
1431            .with_queue_named("test_queue")
1432            .with_binding_key("test_binding_key")
1433            .with_binding_header("test_binding_header", 42)
1434            .build()
1435            .unwrap_err();
1436
1437        // Then
1438        assert_eq!(expected_output, actual_output);
1439    }
1440
1441    #[test]
1442    fn deserialize_exchange_kind_cannot_have_binding_headers() {
1443        // Given
1444        let input = r#"
1445name: test_ingress
1446exchange: amq.topic
1447queue: test_queue
1448binding_key: test_binding_key
1449binding_headers:
1450    test_binding_header: 42
1451"#;
1452        let expected_output = IngressError::ExchangeKindCannotHaveBindingHeaders {
1453            ingress: "test_ingress".into(),
1454            kind: ExchangeKind::Topic,
1455        };
1456
1457        // When
1458        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1459
1460        // Then
1461        assert!(
1462            actual_output
1463                .to_string()
1464                .starts_with(&expected_output.to_string()),
1465        );
1466    }
1467
1468    #[test]
1469    fn exchange_kind_cannot_have_empty_binding_key() {
1470        // Given
1471        let expected_output = IngressError::ExchangeKindCannotHaveEmptyBindingKey {
1472            ingress: "test_ingress".into(),
1473            kind: ExchangeKind::Topic,
1474        };
1475
1476        // When
1477        let actual_output = Ingress::builder()
1478            .with_name("test_ingress")
1479            .with_exchange(Exchange::AmqTopic)
1480            .with_queue_named("test_queue")
1481            .with_binding_key("")
1482            .build()
1483            .unwrap_err();
1484
1485        // Then
1486        assert_eq!(expected_output, actual_output);
1487    }
1488
1489    #[test]
1490    fn deserialize_exchange_kind_cannot_have_empty_binding_key() {
1491        // Given
1492        let input = r#"
1493name: test_ingress
1494exchange: amq.topic
1495queue: test_queue
1496binding_key: ''
1497"#;
1498        let expected_output = IngressError::ExchangeKindCannotHaveEmptyBindingKey {
1499            ingress: "test_ingress".into(),
1500            kind: ExchangeKind::Topic,
1501        };
1502
1503        // When
1504        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1505
1506        // Then
1507        assert!(
1508            actual_output
1509                .to_string()
1510                .starts_with(&expected_output.to_string()),
1511        );
1512    }
1513
1514    #[test]
1515    fn exchange_kind_cannot_have_empty_binding_header() {
1516        // Given
1517        let expected_output = IngressError::ExchangeKindCannotHaveEmptyBindingHeader {
1518            ingress: "test_ingress".into(),
1519            kind: ExchangeKind::Headers,
1520        };
1521
1522        // When
1523        let actual_output = Ingress::builder()
1524            .with_name("test_ingress")
1525            .with_exchange(Exchange::AmqHeaders)
1526            .with_queue_named("test_queue")
1527            .with_binding_header("test_binding_header", "")
1528            .build()
1529            .unwrap_err();
1530
1531        // Then
1532        assert_eq!(expected_output, actual_output);
1533    }
1534
1535    #[test]
1536    fn deserialize_exchange_kind_cannot_have_empty_binding_header() {
1537        // Given
1538        let input = r#"
1539name: test_ingress
1540exchange: amq.headers
1541queue: test_queue
1542binding_header:
1543    test_binding_header: ''
1544"#;
1545        let expected_output = IngressError::ExchangeKindCannotHaveEmptyBindingHeader {
1546            ingress: "test_ingress".into(),
1547            kind: ExchangeKind::Headers,
1548        };
1549
1550        // When
1551        let actual_output = serde_yml::from_str::<Ingress>(input).unwrap_err();
1552
1553        // Then
1554        assert!(
1555            actual_output
1556                .to_string()
1557                .starts_with(&expected_output.to_string()),
1558        );
1559    }
1560
1561    #[test]
1562    fn deserialize_landscape_from_empty() {
1563        // Given
1564        let input = "";
1565        let expected_output = IngressLandscape::default();
1566
1567        // When
1568        let actual_output = serde_yml::from_str::<IngressLandscape>(input).unwrap();
1569
1570        // Then
1571        assert_eq!(expected_output, actual_output);
1572    }
1573
1574    #[test]
1575    fn deserialize_landscape_from_full() {
1576        // Given
1577        let input = r#"
1578test_ingress_a: test_queue_a
1579test_ingress_b:
1580    exchange: test_exchange_b
1581    queue: test_queue_b
1582    binding_key: test_binding_key_b
1583"#;
1584        let expected_output = IngressLandscape::from([
1585            (
1586                "test_ingress_a",
1587                Ingress::builder()
1588                    .with_name("test_ingress_a")
1589                    .with_exchange(Exchange::Default)
1590                    .with_queue_named("test_queue_a")
1591                    .build()
1592                    .unwrap(),
1593            ),
1594            (
1595                "test_ingress_b",
1596                Ingress::builder()
1597                    .with_name("test_ingress_b")
1598                    .with_exchange(Exchange::named("test_exchange_b").unwrap())
1599                    .with_queue_named("test_queue_b")
1600                    .with_binding_key("test_binding_key_b")
1601                    .build()
1602                    .unwrap(),
1603            ),
1604        ]);
1605
1606        // When
1607        let actual_output = serde_yml::from_str::<IngressLandscape>(input).unwrap();
1608
1609        // Then
1610        assert_eq!(expected_output, actual_output);
1611    }
1612}