strut_rabbitmq/routing/
egress.rs

1use crate::{ConfirmationLevel, Exchange, ExchangeKind};
2use serde::de::{DeserializeSeed, Error, MapAccess, Visitor};
3use serde::{Deserialize, Deserializer};
4use std::collections::HashMap;
5use std::fmt::Formatter;
6use std::sync::Arc;
7use strut_deserialize::{Slug, SlugMap};
8use strut_factory::impl_deserialize_field;
9use thiserror::Error;
10
/// Represents a collection of uniquely named [`Egress`] definitions.
///
/// Individual definitions are looked up by name through
/// [`EgressLandscape::contains`], [`EgressLandscape::get`], and
/// [`EgressLandscape::expect`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct EgressLandscape {
    // Egress definitions keyed by their name.
    // NOTE(review): presumably `SlugMap` matches keys in slug-normalized
    // form — confirm against `strut_deserialize::SlugMap`.
    egresses: SlugMap<Egress>,
}
16
/// Defines an outbound path for messages being sent into a RabbitMQ cluster.
///
/// Note that the exchange is just a string name and not a full-fledged
/// [`Exchange`] definition. This is because the exchange is only referenced
/// by name on the outbound side. The responsibility for declaring an exchange is
/// entirely on the inbound side.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Egress {
    /// Name of this egress definition.
    name: Arc<str>,
    /// Name of the exchange that messages are published to.
    exchange: Arc<str>,
    /// Routing key attached to published messages.
    routing_key: Arc<str>,
    /// Level of confirmation expected when sending a message.
    confirmation: ConfirmationLevel,
    /// The `force_durable` flag of this definition.
    // NOTE(review): presumably forces outgoing messages to be published as
    // durable — confirm against the publisher implementation.
    force_durable: bool,
}
31
32impl EgressLandscape {
33    /// Reports whether this landscape contains a [`Egress`] with the
34    /// given unique name.
35    pub fn contains(&self, name: impl AsRef<str>) -> bool {
36        self.egresses.contains_key(name.as_ref())
37    }
38
39    /// Retrieves `Some` reference to a [`Egress`] from this landscape
40    /// under the given name, or `None`, if the name is not present in the
41    /// landscape.
42    pub fn get(&self, name: impl AsRef<str>) -> Option<&Egress> {
43        self.egresses.get(name.as_ref())
44    }
45
46    /// Retrieves a reference to a [`Egress`] from this landscape under
47    /// the given name. Panics if the name is not present in the collection.
48    pub fn expect(&self, name: impl AsRef<str>) -> &Egress {
49        let name = name.as_ref();
50
51        self.get(name)
52            .unwrap_or_else(|| panic!("requested an undefined RabbitMQ egress '{}'", name))
53    }
54}
55
impl Egress {
    /// Creates a new [`EgressBuilder`].
    ///
    /// This is a shorthand for [`EgressBuilder::new`].
    pub fn builder() -> EgressBuilder {
        EgressBuilder::new()
    }
}
62
63impl Egress {
64    /// Reports the egress name for this definition.
65    pub fn name(&self) -> &str {
66        &self.name
67    }
68
69    /// Reports the egress exchange name for this definition.
70    pub fn exchange(&self) -> &str {
71        &self.exchange
72    }
73
74    /// Reports the egress routing key for this definition.
75    pub fn routing_key(&self) -> &str {
76        &self.routing_key
77    }
78
79    /// Reports the egress confirmation level for this definition.
80    pub fn confirmation(&self) -> ConfirmationLevel {
81        self.confirmation
82    }
83
84    /// Reports the egress `force_durable` flag for this definition.
85    pub fn force_durable(&self) -> bool {
86        self.force_durable
87    }
88}
89
90impl Egress {
91    /// Reports whether this definition requires any sending confirmation beyond
92    /// the bare minimum of network transmission. If so, this should prompt the
93    /// publisher to enable publisher confirms on the RabbitMQ channel.
94    pub(crate) fn requires_any_confirmation(&self) -> bool {
95        match self.confirmation {
96            ConfirmationLevel::Transmitted => false,
97            ConfirmationLevel::Accepted => true,
98            ConfirmationLevel::Routed => true,
99        }
100    }
101
102    /// Reports whether this definition warrants a `mandatory` flag on the
103    /// RabbitMQ `basic_publish` call.
104    pub(crate) fn requires_mandatory_publish(&self) -> bool {
105        match self.confirmation {
106            ConfirmationLevel::Transmitted => false,
107            ConfirmationLevel::Accepted => false,
108            ConfirmationLevel::Routed => true,
109        }
110    }
111}
112
/// Builds an [`Egress`] incrementally and validates it on the final stage.
///
/// Every field mirrors the field of the same name on [`Egress`].
#[derive(Debug)]
pub struct EgressBuilder {
    /// Name of the egress definition being built.
    name: Arc<str>,
    /// Name of the exchange that messages are published to.
    exchange: Arc<str>,
    /// Routing key attached to published messages.
    routing_key: Arc<str>,
    /// Level of confirmation expected when sending a message.
    confirmation: ConfirmationLevel,
    /// The `force_durable` flag of the definition being built.
    force_durable: bool,
}
122
123impl EgressBuilder {
124    /// Creates a new [`Egress`] builder.
125    pub fn new() -> Self {
126        Self {
127            name: Arc::from(Egress::default_name()),
128            exchange: Arc::from(Egress::default_exchange()),
129            routing_key: Arc::from(Egress::default_routing_key()),
130            confirmation: Egress::default_confirmation(),
131            force_durable: Egress::default_force_durable(),
132        }
133    }
134
135    /// Recreates this egress definition builder with the given name.
136    pub fn with_name(self, name: impl AsRef<str>) -> Self {
137        Self {
138            name: Arc::from(name.as_ref()),
139            ..self
140        }
141    }
142
143    /// Recreates this egress definition builder with the given exchange name.
144    pub fn with_exchange(self, exchange: impl AsRef<str>) -> Self {
145        Self {
146            exchange: Arc::from(exchange.as_ref()),
147            ..self
148        }
149    }
150
151    /// Recreates this egress definition builder with the given routing key.
152    pub fn with_routing_key(self, routing_key: impl AsRef<str>) -> Self {
153        Self {
154            routing_key: Arc::from(routing_key.as_ref()),
155            ..self
156        }
157    }
158
159    /// Recreates this egress definition builder with the given confirmation
160    /// level.
161    pub fn with_confirmation(self, confirmation: ConfirmationLevel) -> Self {
162        Self {
163            confirmation,
164            ..self
165        }
166    }
167
168    /// Recreates this egress definition builder with the given `force_durable`
169    /// flag.
170    pub fn with_force_durable(self, force_durable: bool) -> Self {
171        Self {
172            force_durable,
173            ..self
174        }
175    }
176
177    /// Finalizes the builder, validates its state, and, assuming valid state,
178    /// returns the [`Ingress`].
179    ///
180    /// In case the exchange name is the name of one of the known built-in RabbitMQ
181    /// exchanges, some validation is applied on the given values. The built-in exchanges
182    /// either require a non-empty routing key, or, on the opposite, ignore the routing
183    /// key. For built-in exchanges, this method will return an error if the given routing
184    /// key does not match the requirement.
185    pub fn build(self) -> Result<Egress, EgressError> {
186        self.validate()?;
187
188        Ok(Egress {
189            name: self.name,
190            exchange: self.exchange,
191            routing_key: self.routing_key,
192            confirmation: self.confirmation,
193            force_durable: self.force_durable,
194        })
195    }
196
197    /// Validates whether the given combination of exchange name and routing key make
198    /// sense.
199    fn validate(&self) -> Result<(), EgressError> {
200        if let Some(builtin_exchange) = Exchange::try_builtin_named(&self.exchange) {
201            match builtin_exchange.kind() {
202                ExchangeKind::Direct | ExchangeKind::Topic | ExchangeKind::HashKey => {
203                    if self.routing_key.is_empty() {
204                        return Err(EgressError::ExchangeRequiresRoutingKey {
205                            egress: self.name.to_string(),
206                            exchange: builtin_exchange,
207                        });
208                    };
209                }
210                ExchangeKind::Fanout | ExchangeKind::Headers | ExchangeKind::HashId => {
211                    if !self.routing_key.is_empty() {
212                        return Err(EgressError::ExchangeCannotHaveRoutingKey {
213                            egress: self.name.to_string(),
214                            exchange: builtin_exchange,
215                            routing_key: self.routing_key.to_string(),
216                        });
217                    }
218                }
219            };
220        }
221
222        Ok(())
223    }
224}
225
// Default values shared by `EgressBuilder::new` and the test-only `Default`
// implementation of `Egress`.
impl Egress {
    /// Name given to an egress when none is specified.
    fn default_name() -> &'static str {
        "default"
    }

    /// Exchange name used when none is specified (the empty string).
    fn default_exchange() -> &'static str {
        ""
    }

    /// Routing key used when none is specified (the empty string).
    fn default_routing_key() -> &'static str {
        ""
    }

    /// Confirmation level used when none is specified.
    fn default_confirmation() -> ConfirmationLevel {
        ConfirmationLevel::Transmitted
    }

    /// Value of the `force_durable` flag used when none is specified.
    fn default_force_durable() -> bool {
        false
    }
}
247
// Test-only convenience that lets test fixtures use struct-update syntax.
// Note that the name starts out empty here rather than as
// `Egress::default_name()`.
#[cfg(test)]
impl Default for Egress {
    fn default() -> Self {
        let name: Arc<str> = "".into();
        let exchange: Arc<str> = Self::default_exchange().into();
        let routing_key: Arc<str> = Self::default_routing_key().into();

        Self {
            name,
            exchange,
            routing_key,
            confirmation: Self::default_confirmation(),
            force_durable: Self::default_force_durable(),
        }
    }
}
260
/// Represents the various error states of a RabbitMQ egress definition.
///
/// Both variants are produced while validating an [`EgressBuilder`] whose
/// exchange name matches a built-in RabbitMQ exchange.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum EgressError {
    /// Indicates the absence of a routing key where it is required.
    #[error(
        "invalid configuration for egress '{egress}' with built-in exchange '{exchange}' ({exchange:?}, type '{}'): expected routing key, found none/empty",
        .exchange.kind(),
    )]
    ExchangeRequiresRoutingKey {
        /// Egress name
        egress: String,
        /// Built-in exchange that requires a routing key
        exchange: Exchange,
    },

    /// Indicates the presence of a routing key where it is ignored.
    #[error(
        "invalid configuration for egress '{egress}' with built-in exchange '{exchange}' ({exchange:?}, type '{}'): expected no/empty routing key, found '{routing_key}'",
        .exchange.kind(),
    )]
    ExchangeCannotHaveRoutingKey {
        /// Egress name
        egress: String,
        /// Built-in exchange that ignores a routing key
        exchange: Exchange,
        /// Given routing key
        routing_key: String,
    },
}
290
// Identity conversion: an `Egress` is trivially a reference to itself.
// NOTE(review): presumably exists so that APIs can accept `impl AsRef<Egress>`
// for both owned and borrowed definitions — confirm against callers.
impl AsRef<Egress> for Egress {
    fn as_ref(&self) -> &Egress {
        self
    }
}
296
// Identity conversion: an `EgressLandscape` is trivially a reference to itself.
// NOTE(review): presumably exists so that APIs can accept
// `impl AsRef<EgressLandscape>` — confirm against callers.
impl AsRef<EgressLandscape> for EgressLandscape {
    fn as_ref(&self) -> &EgressLandscape {
        self
    }
}
302
303const _: () = {
304    impl<S> FromIterator<(S, Egress)> for EgressLandscape
305    where
306        S: Into<Slug>,
307    {
308        fn from_iter<T: IntoIterator<Item = (S, Egress)>>(iter: T) -> Self {
309            let egresses = iter.into_iter().collect();
310
311            Self { egresses }
312        }
313    }
314
315    impl<const N: usize, S> From<[(S, Egress); N]> for EgressLandscape
316    where
317        S: Into<Slug>,
318    {
319        fn from(value: [(S, Egress); N]) -> Self {
320            value.into_iter().collect()
321        }
322    }
323};
324
325const _: () = {
326    impl<'de> Deserialize<'de> for EgressLandscape {
327        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
328        where
329            D: Deserializer<'de>,
330        {
331            deserializer.deserialize_map(EgressLandscapeVisitor)
332        }
333    }
334
335    struct EgressLandscapeVisitor;
336
337    impl<'de> Visitor<'de> for EgressLandscapeVisitor {
338        type Value = EgressLandscape;
339
340        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
341            formatter.write_str("a map of RabbitMQ egress landscape")
342        }
343
344        fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
345        where
346            A: MapAccess<'de>,
347        {
348            let grouped = Slug::group_map(map)?;
349            let mut egresses = HashMap::with_capacity(grouped.len());
350
351            for (key, value) in grouped {
352                let seed = EgressSeed {
353                    name: key.original(),
354                };
355                let handle = seed.deserialize(value).map_err(Error::custom)?;
356                egresses.insert(key, handle);
357            }
358
359            Ok(EgressLandscape {
360                egresses: SlugMap::new(egresses),
361            })
362        }
363    }
364
365    struct EgressSeed<'a> {
366        name: &'a str,
367    }
368
369    impl<'de> DeserializeSeed<'de> for EgressSeed<'_> {
370        type Value = Egress;
371
372        fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
373        where
374            D: Deserializer<'de>,
375        {
376            deserializer.deserialize_any(EgressSeedVisitor { name: self.name })
377        }
378    }
379
380    struct EgressSeedVisitor<'a> {
381        name: &'a str,
382    }
383
384    impl<'de> Visitor<'de> for EgressSeedVisitor<'_> {
385        type Value = Egress;
386
387        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
388            formatter.write_str("a map of RabbitMQ egress or a string routing key")
389        }
390
391        fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
392        where
393            E: Error,
394        {
395            Egress::builder()
396                .with_name(self.name)
397                .with_routing_key(value)
398                .build()
399                .map_err(Error::custom)
400        }
401
402        fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
403        where
404            A: MapAccess<'de>,
405        {
406            visit_egress(map, Some(self.name))
407        }
408    }
409
410    impl<'de> Deserialize<'de> for Egress {
411        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
412        where
413            D: Deserializer<'de>,
414        {
415            deserializer.deserialize_map(EgressVisitor)
416        }
417    }
418
419    struct EgressVisitor;
420
421    impl<'de> Visitor<'de> for EgressVisitor {
422        type Value = Egress;
423
424        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
425            formatter.write_str("a map of RabbitMQ egress")
426        }
427
428        fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
429        where
430            A: MapAccess<'de>,
431        {
432            visit_egress(map, None)
433        }
434    }
435
436    fn visit_egress<'de, A>(mut map: A, known_name: Option<&str>) -> Result<Egress, A::Error>
437    where
438        A: MapAccess<'de>,
439    {
440        let mut name: Option<String> = None;
441        let mut exchange: Option<String> = None;
442        let mut routing_key: Option<String> = None;
443        let mut confirmation = None;
444        let mut force_durable = None;
445
446        while let Some(key) = map.next_key()? {
447            match key {
448                EgressField::name => key.poll(&mut map, &mut name)?,
449                EgressField::exchange => key.poll(&mut map, &mut exchange)?,
450                EgressField::routing_key => key.poll(&mut map, &mut routing_key)?,
451                EgressField::confirmation => key.poll(&mut map, &mut confirmation)?,
452                EgressField::force_durable => key.poll(&mut map, &mut force_durable)?,
453                EgressField::__ignore => map.next_value()?,
454            };
455        }
456
457        let name = match known_name {
458            Some(known_name) => known_name,
459            None => name.as_deref().unwrap_or_else(|| Egress::default_name()),
460        };
461
462        let mut builder = Egress::builder();
463
464        let exchange = exchange
465            .as_deref()
466            .unwrap_or_else(|| Egress::default_exchange());
467        let routing_key = routing_key
468            .as_deref()
469            .unwrap_or_else(|| Egress::default_routing_key());
470
471        builder = builder
472            .with_name(name)
473            .with_exchange(exchange)
474            .with_routing_key(routing_key);
475
476        if let Some(confirmation) = confirmation {
477            builder = builder.with_confirmation(confirmation);
478        }
479
480        if let Some(force_durable) = force_durable {
481            builder = builder.with_force_durable(force_durable);
482        }
483
484        Ok(builder.build().map_err(Error::custom)?)
485    }
486
    // Generates the `EgressField` key type that `visit_egress` matches on.
    // NOTE(review): presumably keys are compared via `Slug::eq_as_slugs`, and
    // `confirmation_level` is accepted as an alias of `confirmation` — confirm
    // against the `impl_deserialize_field!` macro.
    impl_deserialize_field!(
        EgressField,
        strut_deserialize::Slug::eq_as_slugs,
        name,
        exchange,
        routing_key,
        confirmation | confirmation_level,
        force_durable,
    );
496};
497
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// An empty document is not accepted as a single egress.
    #[test]
    fn deserialize_from_empty() {
        let input = "";

        let result: Result<Egress, _> = serde_yml::from_str(input);

        assert!(result.is_err());
    }

    /// A bare string is not accepted as a stand-alone egress.
    #[test]
    fn deserialize_from_string() {
        let input = "\"test_egress\"";

        let result: Result<Egress, _> = serde_yml::from_str(input);

        assert!(result.is_err());
    }

    /// A name alone is rejected.
    // NOTE(review): presumably because the default (empty) exchange name is a
    // built-in exchange that requires a routing key — confirm.
    #[test]
    fn deserialize_from_name() {
        let input = r#"
name: test_egress
"#;

        let result: Result<Egress, _> = serde_yml::from_str(input);

        assert!(result.is_err());
    }

    /// A name plus a built-in fanout exchange needs no routing key.
    #[test]
    fn deserialize_from_name_and_exchange() {
        let input = r#"
name: test_egress
exchange: amq.fanout
"#;
        let expected = Egress {
            name: "test_egress".into(),
            exchange: Exchange::AmqFanout.name().into(),
            ..Default::default()
        };

        let actual: Egress = serde_yml::from_str(input).unwrap();

        assert_eq!(expected, actual);
    }

    /// A name plus a routing key is accepted with the default exchange.
    #[test]
    fn deserialize_from_name_and_routing_key() {
        let input = r#"
name: test_egress
routing_key: test_routing_key
"#;
        let expected = Egress {
            name: "test_egress".into(),
            routing_key: "test_routing_key".into(),
            ..Default::default()
        };

        let actual: Egress = serde_yml::from_str(input).unwrap();

        assert_eq!(expected, actual);
    }

    /// Every known field is picked up, and unknown fields are ignored.
    #[test]
    fn deserialize_from_full() {
        let input = r#"
extra_field: ignored
name: test_egress
exchange: amq.topic
routing_key: test_routing_key
confirmation: routed
force_durable: true
"#;
        let expected = Egress {
            name: "test_egress".into(),
            exchange: Exchange::AmqTopic.name().into(),
            routing_key: "test_routing_key".into(),
            confirmation: ConfirmationLevel::Routed,
            force_durable: true,
            ..Default::default()
        };

        let actual: Egress = serde_yml::from_str(input).unwrap();

        assert_eq!(expected, actual);
    }

    /// The builder rejects a built-in topic exchange with an empty routing key.
    #[test]
    fn exchange_requires_routing_key() {
        let expected = EgressError::ExchangeRequiresRoutingKey {
            egress: "test_egress".into(),
            exchange: Exchange::AmqTopic,
        };

        let builder = Egress::builder()
            .with_name("test_egress")
            .with_exchange(Exchange::AmqTopic.name())
            .with_routing_key("");
        let actual = builder.build().unwrap_err();

        assert_eq!(expected, actual);
    }

    /// The same rejection surfaces through deserialization, as the leading
    /// part of the reported error message.
    #[test]
    fn deserialize_exchange_requires_routing_key() {
        let input = r#"
name: test_egress
exchange: amq.topic
"#;
        let expected = EgressError::ExchangeRequiresRoutingKey {
            egress: "test_egress".into(),
            exchange: Exchange::AmqTopic,
        };

        let error = serde_yml::from_str::<Egress>(input).unwrap_err();

        let message = error.to_string();
        let prefix = expected.to_string();
        assert!(message.starts_with(&prefix));
    }

    /// The builder rejects a built-in headers exchange with a routing key.
    #[test]
    fn exchange_cannot_have_routing_key() {
        let expected = EgressError::ExchangeCannotHaveRoutingKey {
            egress: "test_egress".into(),
            exchange: Exchange::AmqHeaders,
            routing_key: "test_routing_key".into(),
        };

        let builder = Egress::builder()
            .with_name("test_egress")
            .with_exchange(Exchange::AmqHeaders.name())
            .with_routing_key("test_routing_key");
        let actual = builder.build().unwrap_err();

        assert_eq!(expected, actual);
    }

    /// The same rejection surfaces through deserialization, as the leading
    /// part of the reported error message.
    #[test]
    fn deserialize_exchange_cannot_have_routing_key() {
        let input = r#"
name: test_egress
exchange: amq.headers
routing_key: test_routing_key
"#;
        let expected = EgressError::ExchangeCannotHaveRoutingKey {
            egress: "test_egress".into(),
            exchange: Exchange::AmqHeaders,
            routing_key: "test_routing_key".into(),
        };

        let error = serde_yml::from_str::<Egress>(input).unwrap_err();

        let message = error.to_string();
        let prefix = expected.to_string();
        assert!(message.starts_with(&prefix));
    }

    /// An empty document yields an empty landscape.
    #[test]
    fn deserialize_landscape_from_empty() {
        let input = "";
        let expected = EgressLandscape::default();

        let actual: EgressLandscape = serde_yml::from_str(input).unwrap();

        assert_eq!(expected, actual);
    }

    /// A landscape accepts both the short form (string routing key) and the
    /// full map form, naming each egress after its key.
    #[test]
    fn deserialize_landscape_from_full() {
        let input = r#"
test_egress_a: test_routing_key_a
test_egress_b:
    exchange: test_exchange_b
    routing_key: test_routing_key_b
"#;
        let egress_a = Egress::builder()
            .with_name("test_egress_a")
            .with_exchange("")
            .with_routing_key("test_routing_key_a")
            .build()
            .unwrap();
        let egress_b = Egress::builder()
            .with_name("test_egress_b")
            .with_exchange("test_exchange_b")
            .with_routing_key("test_routing_key_b")
            .build()
            .unwrap();
        let expected = EgressLandscape::from([
            ("test_egress_a", egress_a),
            ("test_egress_b", egress_b),
        ]);

        let actual: EgressLandscape = serde_yml::from_str(input).unwrap();

        assert_eq!(expected, actual);
    }
}
746}