strut_rabbitmq/routing/ingress/
queue.rs1use crate::{QueueKind, QueueRenamingBehavior};
2use serde::de::{Error, MapAccess, Visitor};
3use serde::{Deserialize, Deserializer};
4use std::borrow::Cow;
5use std::fmt::Formatter;
6use std::sync::Arc;
7use strut_core::AppReplica;
8use strut_factory::impl_deserialize_field;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct Queue {
13 name: Arc<str>,
14 kind: QueueKind,
15 rename: QueueRenamingBehavior,
16}
17
18impl Default for Queue {
19 fn default() -> Self {
20 Self::named(Self::default_name())
21 }
22}
23
24impl Queue {
25 pub fn named(name: impl AsRef<str>) -> Self {
28 Self {
29 name: Arc::from(name.as_ref()),
30 kind: Self::default_kind(),
31 rename: Self::default_rename(),
32 }
33 }
34
35 pub fn empty() -> Self {
38 Self {
39 name: "".into(),
40 kind: Self::default_kind(),
41 rename: Self::default_rename(),
42 }
43 }
44
45 pub fn with_kind(self, kind: QueueKind) -> Self {
47 Self { kind, ..self }
48 }
49
50 pub fn with_rename(self, rename: QueueRenamingBehavior) -> Self {
52 Self { rename, ..self }
53 }
54}
55
56impl Queue {
57 pub fn name(&self) -> Cow<'_, str> {
60 if self.name.is_empty() {
62 return Cow::Borrowed(&self.name);
63 }
64
65 match self.rename {
66 QueueRenamingBehavior::Verbatim => Cow::Borrowed(&self.name),
67 QueueRenamingBehavior::ReplicaIndex => {
68 if let Some(index) = AppReplica::index() {
69 Cow::Owned(format!("{}.{}", &self.name, index))
70 } else {
71 Cow::Borrowed(&self.name)
72 }
73 }
74 QueueRenamingBehavior::ReplicaLifetimeId => Cow::Owned(format!(
75 "{}.{}",
76 &self.name,
77 AppReplica::lifetime_id().dotted(),
78 )),
79 }
80 }
81
82 pub fn is_empty(&self) -> bool {
88 self.name.is_empty()
89 }
90
91 pub fn kind(&self) -> QueueKind {
93 self.kind
94 }
95
96 pub fn rename(&self) -> QueueRenamingBehavior {
98 self.rename
99 }
100}
101
102impl Queue {
103 fn default_name() -> &'static str {
104 ""
105 }
106
107 fn default_kind() -> QueueKind {
108 QueueKind::Classic
109 }
110
111 fn default_rename() -> QueueRenamingBehavior {
112 QueueRenamingBehavior::Verbatim
113 }
114}
115
116const _: () = {
117 impl<'de> Deserialize<'de> for Queue {
118 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
119 where
120 D: Deserializer<'de>,
121 {
122 deserializer.deserialize_any(QueueVisitor)
123 }
124 }
125
126 struct QueueVisitor;
127
128 impl<'de> Visitor<'de> for QueueVisitor {
129 type Value = Queue;
130
131 fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
132 formatter.write_str("a map of RabbitMQ queue or a string queue name")
133 }
134
135 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
136 where
137 E: Error,
138 {
139 Ok(Queue::named(value))
140 }
141
142 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
143 where
144 A: MapAccess<'de>,
145 {
146 let mut name: Option<String> = None;
147 let mut kind = None;
148 let mut rename = None;
149
150 while let Some(key) = map.next_key()? {
151 match key {
152 QueueField::name => key.poll(&mut map, &mut name)?,
153 QueueField::kind => key.poll(&mut map, &mut kind)?,
154 QueueField::rename => key.poll(&mut map, &mut rename)?,
155 QueueField::__ignore => map.next_value()?,
156 };
157 }
158
159 let mut queue = Queue::named(name.as_deref().unwrap_or_else(|| Queue::default_name()));
160
161 if let Some(kind) = kind {
162 queue = queue.with_kind(kind);
163 }
164
165 if let Some(rename) = rename {
166 queue = queue.with_rename(rename);
167 }
168
169 Ok(queue)
170 }
171 }
172
173 impl_deserialize_field!(
174 QueueField,
175 strut_deserialize::Slug::eq_as_slugs,
176 name,
177 kind,
178 rename | renaming | renaming_behavior,
179 );
180};
181
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// An empty map yields the default (empty-named) queue.
    #[test]
    fn from_empty() {
        // Given
        let yaml = "{}";
        let want = Queue::default();

        // When
        let got: Queue = serde_yml::from_str(yaml).unwrap();

        // Then
        assert!(got.is_empty());
        assert_eq!(want, got);
    }

    /// A bare string is taken as the queue name, everything else stays at defaults.
    #[test]
    fn from_string() {
        // Given
        let yaml = "test_queue";
        let want = Queue::named("test_queue");

        // When
        let got: Queue = serde_yml::from_str(yaml).unwrap();

        // Then
        assert!(!got.is_empty());
        assert_eq!(want, got);
    }

    /// A map sets the name and the kind; an unknown key in it does not cause an error.
    #[test]
    fn deserialize_from_full() {
        // Given
        let yaml = r#"
extra_field: ignored
name: test_queue
kind: quorum
"#;
        let want = Queue::named("test_queue").with_kind(QueueKind::Quorum);

        // When
        let got: Queue = serde_yml::from_str(yaml).unwrap();

        // Then
        assert!(!got.is_empty());
        assert_eq!(want, got);
    }
}