1use crate::{ConfirmationLevel, Exchange, ExchangeKind};
2use serde::de::{DeserializeSeed, Error, MapAccess, Visitor};
3use serde::{Deserialize, Deserializer};
4use std::collections::HashMap;
5use std::fmt::Formatter;
6use std::sync::Arc;
7use strut_deserialize::{Slug, SlugMap};
8use strut_factory::impl_deserialize_field;
9use thiserror::Error;
10
11#[derive(Debug, Default, Clone, PartialEq, Eq)]
13pub struct EgressLandscape {
14 egresses: SlugMap<Egress>,
15}
16
17#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct Egress {
25 name: Arc<str>,
26 exchange: Arc<str>,
27 routing_key: Arc<str>,
28 confirmation: ConfirmationLevel,
29 force_durable: bool,
30}
31
32impl EgressLandscape {
33 pub fn contains(&self, name: impl AsRef<str>) -> bool {
36 self.egresses.contains_key(name.as_ref())
37 }
38
39 pub fn get(&self, name: impl AsRef<str>) -> Option<&Egress> {
43 self.egresses.get(name.as_ref())
44 }
45
46 pub fn expect(&self, name: impl AsRef<str>) -> &Egress {
49 let name = name.as_ref();
50
51 self.get(name)
52 .unwrap_or_else(|| panic!("requested an undefined RabbitMQ egress '{}'", name))
53 }
54}
55
56impl Egress {
57 pub fn builder() -> EgressBuilder {
59 EgressBuilder::new()
60 }
61}
62
63impl Egress {
64 pub fn name(&self) -> &str {
66 &self.name
67 }
68
69 pub fn exchange(&self) -> &str {
71 &self.exchange
72 }
73
74 pub fn routing_key(&self) -> &str {
76 &self.routing_key
77 }
78
79 pub fn confirmation(&self) -> ConfirmationLevel {
81 self.confirmation
82 }
83
84 pub fn force_durable(&self) -> bool {
86 self.force_durable
87 }
88}
89
90impl Egress {
91 pub(crate) fn requires_any_confirmation(&self) -> bool {
95 match self.confirmation {
96 ConfirmationLevel::Transmitted => false,
97 ConfirmationLevel::Accepted => true,
98 ConfirmationLevel::Routed => true,
99 }
100 }
101
102 pub(crate) fn requires_mandatory_publish(&self) -> bool {
105 match self.confirmation {
106 ConfirmationLevel::Transmitted => false,
107 ConfirmationLevel::Accepted => false,
108 ConfirmationLevel::Routed => true,
109 }
110 }
111}
112
113#[derive(Debug)]
115pub struct EgressBuilder {
116 name: Arc<str>,
117 exchange: Arc<str>,
118 routing_key: Arc<str>,
119 confirmation: ConfirmationLevel,
120 force_durable: bool,
121}
122
123impl EgressBuilder {
124 pub fn new() -> Self {
126 Self {
127 name: Arc::from(Egress::default_name()),
128 exchange: Arc::from(Egress::default_exchange()),
129 routing_key: Arc::from(Egress::default_routing_key()),
130 confirmation: Egress::default_confirmation(),
131 force_durable: Egress::default_force_durable(),
132 }
133 }
134
135 pub fn with_name(self, name: impl AsRef<str>) -> Self {
137 Self {
138 name: Arc::from(name.as_ref()),
139 ..self
140 }
141 }
142
143 pub fn with_exchange(self, exchange: impl AsRef<str>) -> Self {
145 Self {
146 exchange: Arc::from(exchange.as_ref()),
147 ..self
148 }
149 }
150
151 pub fn with_routing_key(self, routing_key: impl AsRef<str>) -> Self {
153 Self {
154 routing_key: Arc::from(routing_key.as_ref()),
155 ..self
156 }
157 }
158
159 pub fn with_confirmation(self, confirmation: ConfirmationLevel) -> Self {
162 Self {
163 confirmation,
164 ..self
165 }
166 }
167
168 pub fn with_force_durable(self, force_durable: bool) -> Self {
171 Self {
172 force_durable,
173 ..self
174 }
175 }
176
177 pub fn build(self) -> Result<Egress, EgressError> {
186 self.validate()?;
187
188 Ok(Egress {
189 name: self.name,
190 exchange: self.exchange,
191 routing_key: self.routing_key,
192 confirmation: self.confirmation,
193 force_durable: self.force_durable,
194 })
195 }
196
197 fn validate(&self) -> Result<(), EgressError> {
200 if let Some(builtin_exchange) = Exchange::try_builtin_named(&self.exchange) {
201 match builtin_exchange.kind() {
202 ExchangeKind::Direct | ExchangeKind::Topic | ExchangeKind::HashKey => {
203 if self.routing_key.is_empty() {
204 return Err(EgressError::ExchangeRequiresRoutingKey {
205 egress: self.name.to_string(),
206 exchange: builtin_exchange,
207 });
208 };
209 }
210 ExchangeKind::Fanout | ExchangeKind::Headers | ExchangeKind::HashId => {
211 if !self.routing_key.is_empty() {
212 return Err(EgressError::ExchangeCannotHaveRoutingKey {
213 egress: self.name.to_string(),
214 exchange: builtin_exchange,
215 routing_key: self.routing_key.to_string(),
216 });
217 }
218 }
219 };
220 }
221
222 Ok(())
223 }
224}
225
226impl Egress {
227 fn default_name() -> &'static str {
228 "default"
229 }
230
231 fn default_exchange() -> &'static str {
232 ""
233 }
234
235 fn default_routing_key() -> &'static str {
236 ""
237 }
238
239 fn default_confirmation() -> ConfirmationLevel {
240 ConfirmationLevel::Transmitted
241 }
242
243 fn default_force_durable() -> bool {
244 false
245 }
246}
247
248#[cfg(test)]
249impl Default for Egress {
250 fn default() -> Self {
251 Self {
252 name: Arc::from(""),
253 exchange: Arc::from(Self::default_exchange()),
254 routing_key: Arc::from(Self::default_routing_key()),
255 confirmation: Self::default_confirmation(),
256 force_durable: Self::default_force_durable(),
257 }
258 }
259}
260
261#[derive(Error, Debug, PartialEq, Eq)]
263pub enum EgressError {
264 #[error(
266 "invalid configuration for egress '{egress}' with built-in exchange '{exchange}' ({exchange:?}, type '{}'): expected routing key, found none/empty",
267 .exchange.kind(),
268 )]
269 ExchangeRequiresRoutingKey {
270 egress: String,
272 exchange: Exchange,
274 },
275
276 #[error(
278 "invalid configuration for egress '{egress}' with built-in exchange '{exchange}' ({exchange:?}, type '{}'): expected no/empty routing key, found '{routing_key}'",
279 .exchange.kind(),
280 )]
281 ExchangeCannotHaveRoutingKey {
282 egress: String,
284 exchange: Exchange,
286 routing_key: String,
288 },
289}
290
291impl AsRef<Egress> for Egress {
292 fn as_ref(&self) -> &Egress {
293 self
294 }
295}
296
297impl AsRef<EgressLandscape> for EgressLandscape {
298 fn as_ref(&self) -> &EgressLandscape {
299 self
300 }
301}
302
303const _: () = {
304 impl<S> FromIterator<(S, Egress)> for EgressLandscape
305 where
306 S: Into<Slug>,
307 {
308 fn from_iter<T: IntoIterator<Item = (S, Egress)>>(iter: T) -> Self {
309 let egresses = iter.into_iter().collect();
310
311 Self { egresses }
312 }
313 }
314
315 impl<const N: usize, S> From<[(S, Egress); N]> for EgressLandscape
316 where
317 S: Into<Slug>,
318 {
319 fn from(value: [(S, Egress); N]) -> Self {
320 value.into_iter().collect()
321 }
322 }
323};
324
325const _: () = {
326 impl<'de> Deserialize<'de> for EgressLandscape {
327 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
328 where
329 D: Deserializer<'de>,
330 {
331 deserializer.deserialize_map(EgressLandscapeVisitor)
332 }
333 }
334
335 struct EgressLandscapeVisitor;
336
337 impl<'de> Visitor<'de> for EgressLandscapeVisitor {
338 type Value = EgressLandscape;
339
340 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
341 formatter.write_str("a map of RabbitMQ egress landscape")
342 }
343
344 fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
345 where
346 A: MapAccess<'de>,
347 {
348 let grouped = Slug::group_map(map)?;
349 let mut egresses = HashMap::with_capacity(grouped.len());
350
351 for (key, value) in grouped {
352 let seed = EgressSeed {
353 name: key.original(),
354 };
355 let handle = seed.deserialize(value).map_err(Error::custom)?;
356 egresses.insert(key, handle);
357 }
358
359 Ok(EgressLandscape {
360 egresses: SlugMap::new(egresses),
361 })
362 }
363 }
364
365 struct EgressSeed<'a> {
366 name: &'a str,
367 }
368
369 impl<'de> DeserializeSeed<'de> for EgressSeed<'_> {
370 type Value = Egress;
371
372 fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
373 where
374 D: Deserializer<'de>,
375 {
376 deserializer.deserialize_any(EgressSeedVisitor { name: self.name })
377 }
378 }
379
380 struct EgressSeedVisitor<'a> {
381 name: &'a str,
382 }
383
384 impl<'de> Visitor<'de> for EgressSeedVisitor<'_> {
385 type Value = Egress;
386
387 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
388 formatter.write_str("a map of RabbitMQ egress or a string routing key")
389 }
390
391 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
392 where
393 E: Error,
394 {
395 Egress::builder()
396 .with_name(self.name)
397 .with_routing_key(value)
398 .build()
399 .map_err(Error::custom)
400 }
401
402 fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
403 where
404 A: MapAccess<'de>,
405 {
406 visit_egress(map, Some(self.name))
407 }
408 }
409
410 impl<'de> Deserialize<'de> for Egress {
411 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
412 where
413 D: Deserializer<'de>,
414 {
415 deserializer.deserialize_map(EgressVisitor)
416 }
417 }
418
419 struct EgressVisitor;
420
421 impl<'de> Visitor<'de> for EgressVisitor {
422 type Value = Egress;
423
424 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
425 formatter.write_str("a map of RabbitMQ egress")
426 }
427
428 fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
429 where
430 A: MapAccess<'de>,
431 {
432 visit_egress(map, None)
433 }
434 }
435
436 fn visit_egress<'de, A>(mut map: A, known_name: Option<&str>) -> Result<Egress, A::Error>
437 where
438 A: MapAccess<'de>,
439 {
440 let mut name: Option<String> = None;
441 let mut exchange: Option<String> = None;
442 let mut routing_key: Option<String> = None;
443 let mut confirmation = None;
444 let mut force_durable = None;
445
446 while let Some(key) = map.next_key()? {
447 match key {
448 EgressField::name => key.poll(&mut map, &mut name)?,
449 EgressField::exchange => key.poll(&mut map, &mut exchange)?,
450 EgressField::routing_key => key.poll(&mut map, &mut routing_key)?,
451 EgressField::confirmation => key.poll(&mut map, &mut confirmation)?,
452 EgressField::force_durable => key.poll(&mut map, &mut force_durable)?,
453 EgressField::__ignore => map.next_value()?,
454 };
455 }
456
457 let name = match known_name {
458 Some(known_name) => known_name,
459 None => name.as_deref().unwrap_or_else(|| Egress::default_name()),
460 };
461
462 let mut builder = Egress::builder();
463
464 let exchange = exchange
465 .as_deref()
466 .unwrap_or_else(|| Egress::default_exchange());
467 let routing_key = routing_key
468 .as_deref()
469 .unwrap_or_else(|| Egress::default_routing_key());
470
471 builder = builder
472 .with_name(name)
473 .with_exchange(exchange)
474 .with_routing_key(routing_key);
475
476 if let Some(confirmation) = confirmation {
477 builder = builder.with_confirmation(confirmation);
478 }
479
480 if let Some(force_durable) = force_durable {
481 builder = builder.with_force_durable(force_durable);
482 }
483
484 Ok(builder.build().map_err(Error::custom)?)
485 }
486
487 impl_deserialize_field!(
488 EgressField,
489 strut_deserialize::Slug::eq_as_slugs,
490 name,
491 exchange,
492 routing_key,
493 confirmation | confirmation_level,
494 force_durable,
495 );
496};
497
498#[cfg(test)]
499mod tests {
500 use super::*;
501 use pretty_assertions::assert_eq;
502
503 #[test]
504 fn deserialize_from_empty() {
505 let input = "";
507
508 let actual_output = serde_yml::from_str::<Egress>(input);
510
511 assert!(actual_output.is_err());
513 }
514
515 #[test]
516 fn deserialize_from_string() {
517 let input = "\"test_egress\"";
519
520 let actual_output = serde_yml::from_str::<Egress>(input);
522
523 assert!(actual_output.is_err());
525 }
526
527 #[test]
528 fn deserialize_from_name() {
529 let input = r#"
531name: test_egress
532"#;
533
534 let actual_output = serde_yml::from_str::<Egress>(input);
536
537 assert!(actual_output.is_err());
539 }
540
541 #[test]
542 fn deserialize_from_name_and_exchange() {
543 let input = r#"
545name: test_egress
546exchange: amq.fanout
547"#;
548 let expected_output = Egress {
549 name: "test_egress".into(),
550 exchange: Exchange::AmqFanout.name().into(),
551 ..Default::default()
552 };
553
554 let actual_output = serde_yml::from_str::<Egress>(input).unwrap();
556
557 assert_eq!(expected_output, actual_output);
559 }
560
561 #[test]
562 fn deserialize_from_name_and_routing_key() {
563 let input = r#"
565name: test_egress
566routing_key: test_routing_key
567"#;
568 let expected_output = Egress {
569 name: "test_egress".into(),
570 routing_key: "test_routing_key".into(),
571 ..Default::default()
572 };
573
574 let actual_output = serde_yml::from_str::<Egress>(input).unwrap();
576
577 assert_eq!(expected_output, actual_output);
579 }
580
581 #[test]
582 fn deserialize_from_full() {
583 let input = r#"
585extra_field: ignored
586name: test_egress
587exchange: amq.topic
588routing_key: test_routing_key
589confirmation: routed
590force_durable: true
591"#;
592 let expected_output = Egress {
593 name: "test_egress".into(),
594 exchange: Exchange::AmqTopic.name().into(),
595 routing_key: "test_routing_key".into(),
596 confirmation: ConfirmationLevel::Routed,
597 force_durable: true,
598 ..Default::default()
599 };
600
601 let actual_output = serde_yml::from_str::<Egress>(input).unwrap();
603
604 assert_eq!(expected_output, actual_output);
606 }
607
608 #[test]
609 fn exchange_requires_routing_key() {
610 let expected_output = EgressError::ExchangeRequiresRoutingKey {
612 egress: "test_egress".into(),
613 exchange: Exchange::AmqTopic,
614 };
615
616 let actual_output = Egress::builder()
618 .with_name("test_egress")
619 .with_exchange(Exchange::AmqTopic.name())
620 .with_routing_key("")
621 .build()
622 .unwrap_err();
623
624 assert_eq!(expected_output, actual_output);
626 }
627
628 #[test]
629 fn deserialize_exchange_requires_routing_key() {
630 let input = r#"
632name: test_egress
633exchange: amq.topic
634"#;
635 let expected_output = EgressError::ExchangeRequiresRoutingKey {
636 egress: "test_egress".into(),
637 exchange: Exchange::AmqTopic,
638 };
639
640 let actual_output = serde_yml::from_str::<Egress>(input).unwrap_err();
642
643 assert!(
645 actual_output
646 .to_string()
647 .starts_with(&expected_output.to_string()),
648 );
649 }
650
651 #[test]
652 fn exchange_cannot_have_routing_key() {
653 let expected_output = EgressError::ExchangeCannotHaveRoutingKey {
655 egress: "test_egress".into(),
656 exchange: Exchange::AmqHeaders,
657 routing_key: "test_routing_key".into(),
658 };
659
660 let actual_output = Egress::builder()
662 .with_name("test_egress")
663 .with_exchange(Exchange::AmqHeaders.name())
664 .with_routing_key("test_routing_key")
665 .build()
666 .unwrap_err();
667
668 assert_eq!(expected_output, actual_output);
670 }
671
672 #[test]
673 fn deserialize_exchange_cannot_have_routing_key() {
674 let input = r#"
676name: test_egress
677exchange: amq.headers
678routing_key: test_routing_key
679"#;
680 let expected_output = EgressError::ExchangeCannotHaveRoutingKey {
681 egress: "test_egress".into(),
682 exchange: Exchange::AmqHeaders,
683 routing_key: "test_routing_key".into(),
684 };
685
686 let actual_output = serde_yml::from_str::<Egress>(input).unwrap_err();
688
689 assert!(
691 actual_output
692 .to_string()
693 .starts_with(&expected_output.to_string()),
694 );
695 }
696
697 #[test]
698 fn deserialize_landscape_from_empty() {
699 let input = "";
701 let expected_output = EgressLandscape::default();
702
703 let actual_output = serde_yml::from_str::<EgressLandscape>(input).unwrap();
705
706 assert_eq!(expected_output, actual_output);
708 }
709
710 #[test]
711 fn deserialize_landscape_from_full() {
712 let input = r#"
714test_egress_a: test_routing_key_a
715test_egress_b:
716 exchange: test_exchange_b
717 routing_key: test_routing_key_b
718"#;
719 let expected_output = EgressLandscape::from([
720 (
721 "test_egress_a",
722 Egress::builder()
723 .with_name("test_egress_a")
724 .with_exchange("")
725 .with_routing_key("test_routing_key_a")
726 .build()
727 .unwrap(),
728 ),
729 (
730 "test_egress_b",
731 Egress::builder()
732 .with_name("test_egress_b")
733 .with_exchange("test_exchange_b")
734 .with_routing_key("test_routing_key_b")
735 .build()
736 .unwrap(),
737 ),
738 ]);
739
740 let actual_output = serde_yml::from_str::<EgressLandscape>(input).unwrap();
742
743 assert_eq!(expected_output, actual_output);
745 }
746}