strut_rabbitmq/routing/ingress/
queue.rs

1use crate::{QueueKind, QueueRenamingBehavior};
2use serde::de::{Error, MapAccess, Visitor};
3use serde::{Deserialize, Deserializer};
4use std::borrow::Cow;
5use std::fmt::Formatter;
6use std::sync::Arc;
7use strut_core::AppReplica;
8use strut_factory::impl_deserialize_field;
9
10/// Defines of a RabbitMQ queue to be declared from the consuming side.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct Queue {
13    name: Arc<str>,
14    kind: QueueKind,
15    rename: QueueRenamingBehavior,
16}
17
18impl Default for Queue {
19    fn default() -> Self {
20        Self::named(Self::default_name())
21    }
22}
23
24impl Queue {
25    /// Creates a queue definition with the given name, falling on defaults for
26    /// all other configuration.
27    pub fn named(name: impl AsRef<str>) -> Self {
28        Self {
29            name: Arc::from(name.as_ref()),
30            kind: Self::default_kind(),
31            rename: Self::default_rename(),
32        }
33    }
34
35    /// Creates a queue definition without a given name (which will cause RabbitMQ
36    /// to generate one), falling on defaults for all other configuration.
37    pub fn empty() -> Self {
38        Self {
39            name: "".into(),
40            kind: Self::default_kind(),
41            rename: Self::default_rename(),
42        }
43    }
44
45    /// Re-creates this queue definition with the given kind.
46    pub fn with_kind(self, kind: QueueKind) -> Self {
47        Self { kind, ..self }
48    }
49
50    /// Re-creates this queue definition with the given renaming behavior.
51    pub fn with_rename(self, rename: QueueRenamingBehavior) -> Self {
52        Self { rename, ..self }
53    }
54}
55
56impl Queue {
57    /// Reports the queue name for this definition, taking into account the
58    /// [renaming behavior](QueueRenamingBehavior).
59    pub fn name(&self) -> Cow<'_, str> {
60        // If the queue name is empty, allow the broker to generate the name
61        if self.name.is_empty() {
62            return Cow::Borrowed(&self.name);
63        }
64
65        match self.rename {
66            QueueRenamingBehavior::Verbatim => Cow::Borrowed(&self.name),
67            QueueRenamingBehavior::ReplicaIndex => {
68                if let Some(index) = AppReplica::index() {
69                    Cow::Owned(format!("{}.{}", &self.name, index))
70                } else {
71                    Cow::Borrowed(&self.name)
72                }
73            }
74            QueueRenamingBehavior::ReplicaLifetimeId => Cow::Owned(format!(
75                "{}.{}",
76                &self.name,
77                AppReplica::lifetime_id().dotted(),
78            )),
79        }
80    }
81
82    /// Reports whether the queue name for this definition is empty.
83    ///
84    /// An empty name is a signal to RabbitMQ to generate a random queue name,
85    /// which may or may not be acceptable. For example, it is not possible to
86    /// define a queue with an empty name for the built-in default exchange.
87    pub fn is_empty(&self) -> bool {
88        self.name.is_empty()
89    }
90
91    /// Reports the queue kind for this definition.
92    pub fn kind(&self) -> QueueKind {
93        self.kind
94    }
95
96    /// Reports the queue renaming behavior for this definition.
97    pub fn rename(&self) -> QueueRenamingBehavior {
98        self.rename
99    }
100}
101
102impl Queue {
103    fn default_name() -> &'static str {
104        ""
105    }
106
107    fn default_kind() -> QueueKind {
108        QueueKind::Classic
109    }
110
111    fn default_rename() -> QueueRenamingBehavior {
112        QueueRenamingBehavior::Verbatim
113    }
114}
115
116const _: () = {
117    impl<'de> Deserialize<'de> for Queue {
118        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
119        where
120            D: Deserializer<'de>,
121        {
122            deserializer.deserialize_any(QueueVisitor)
123        }
124    }
125
126    struct QueueVisitor;
127
128    impl<'de> Visitor<'de> for QueueVisitor {
129        type Value = Queue;
130
131        fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
132            formatter.write_str("a map of RabbitMQ queue or a string queue name")
133        }
134
135        fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
136        where
137            E: Error,
138        {
139            Ok(Queue::named(value))
140        }
141
142        fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
143        where
144            A: MapAccess<'de>,
145        {
146            let mut name: Option<String> = None;
147            let mut kind = None;
148            let mut rename = None;
149
150            while let Some(key) = map.next_key()? {
151                match key {
152                    QueueField::name => key.poll(&mut map, &mut name)?,
153                    QueueField::kind => key.poll(&mut map, &mut kind)?,
154                    QueueField::rename => key.poll(&mut map, &mut rename)?,
155                    QueueField::__ignore => map.next_value()?,
156                };
157            }
158
159            let mut queue = Queue::named(name.as_deref().unwrap_or_else(|| Queue::default_name()));
160
161            if let Some(kind) = kind {
162                queue = queue.with_kind(kind);
163            }
164
165            if let Some(rename) = rename {
166                queue = queue.with_rename(rename);
167            }
168
169            Ok(queue)
170        }
171    }
172
173    impl_deserialize_field!(
174        QueueField,
175        strut_deserialize::Slug::eq_as_slugs,
176        name,
177        kind,
178        rename | renaming | renaming_behavior,
179    );
180};
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185    use pretty_assertions::assert_eq;
186
187    #[test]
188    fn from_empty() {
189        // Given
190        let input = "{}";
191        let expected_output = Queue::default();
192
193        // When
194        let actual_output = serde_yml::from_str::<Queue>(input).unwrap();
195
196        // Then
197        assert!(actual_output.is_empty());
198        assert_eq!(expected_output, actual_output);
199    }
200
201    #[test]
202    fn from_string() {
203        // Given
204        let input = "test_queue";
205        let expected_output = Queue {
206            name: "test_queue".into(),
207            ..Default::default()
208        };
209
210        // When
211        let actual_output = serde_yml::from_str::<Queue>(input).unwrap();
212
213        // Then
214        assert!(!actual_output.is_empty());
215        assert_eq!(expected_output, actual_output);
216    }
217
218    #[test]
219    fn deserialize_from_full() {
220        // Given
221        let input = r#"
222extra_field: ignored
223name: test_queue
224kind: quorum
225"#;
226        let expected_queue = Queue {
227            name: "test_queue".into(),
228            kind: QueueKind::Quorum,
229            ..Default::default()
230        };
231
232        // When
233        let actual_queue = serde_yml::from_str::<Queue>(input).unwrap();
234
235        // Then
236        assert!(!actual_queue.is_empty());
237        assert_eq!(expected_queue, actual_queue);
238    }
239}