1use crate::commons::{ChannelUseMode, MessageTransferAcknowledgementMode, QueueType};
16use crate::requests::parameters::RuntimeParameterDefinition;
17use crate::responses::FederationUpstream;
18use serde::{Deserialize, Serialize};
19use serde_json::{Map, json};
20use std::fmt::{Display, Formatter, Result};
21
22#[derive(Default, Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
27#[serde(rename_all = "lowercase")]
28pub enum FederationResourceCleanupMode {
29 #[default]
31 Default,
32 Never,
34}
35
36impl From<&str> for FederationResourceCleanupMode {
37 fn from(value: &str) -> Self {
38 match value {
39 "default" => FederationResourceCleanupMode::Default,
40 "never" => FederationResourceCleanupMode::Never,
41 _ => FederationResourceCleanupMode::default(),
42 }
43 }
44}
45
46impl From<String> for FederationResourceCleanupMode {
47 fn from(value: String) -> Self {
48 Self::from(value.as_str())
49 }
50}
51
52impl Display for FederationResourceCleanupMode {
53 fn fmt(&self, f: &mut Formatter<'_>) -> Result {
54 match self {
55 FederationResourceCleanupMode::Default => write!(f, "default"),
56 FederationResourceCleanupMode::Never => write!(f, "never"),
57 }
58 }
59}
60
/// Runtime parameter component name under which federation upstreams are defined.
///
/// Used as the `component` of the `RuntimeParameterDefinition` built from
/// `FederationUpstreamParams`.
pub const FEDERATION_UPSTREAM_COMPONENT: &str = "federation-upstream";
66
67#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
69pub struct QueueFederationParams<'a> {
70 pub queue: Option<&'a str>,
72 pub consumer_tag: Option<&'a str>,
74}
75
76impl<'a> QueueFederationParams<'a> {
77 pub fn new(queue: &'a str) -> Self {
81 Self {
82 queue: Some(queue),
83 ..Default::default()
84 }
85 }
86
87 pub fn new_with_consumer_tag(queue: &'a str, consumer_tag: &'a str) -> Self {
92 Self {
93 queue: Some(queue),
94 consumer_tag: Some(consumer_tag),
95 }
96 }
97
98 pub fn from_options(queue: Option<&'a str>, consumer_tag: Option<&'a str>) -> Self {
102 Self {
103 queue,
104 consumer_tag,
105 }
106 }
107}
108
109#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
111pub struct ExchangeFederationParams<'a> {
112 pub exchange: Option<&'a str>,
114 pub max_hops: Option<u8>,
116 pub queue_type: QueueType,
118 pub ttl: Option<u32>,
120 pub message_ttl: Option<u32>,
122 pub resource_cleanup_mode: FederationResourceCleanupMode,
124}
125
126impl ExchangeFederationParams<'_> {
127 pub fn new(queue_type: QueueType) -> Self {
132 Self {
133 queue_type,
134 ..Default::default()
135 }
136 }
137}
138
/// Prefetch count applied by the `FederationUpstreamParams` constructors, and used
/// when an upstream returned by the API does not report one.
pub const DEFAULT_FEDERATION_PREFETCH: u32 = 1000;
/// Reconnect delay applied by the `FederationUpstreamParams` constructors, and used
/// when an upstream returned by the API does not report one.
// NOTE(review): unit is presumably seconds; confirm against the RabbitMQ docs.
pub const DEFAULT_FEDERATION_RECONNECT_DELAY: u32 = 5;
143
144#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
152pub struct FederationUpstreamParams<'a> {
153 pub name: &'a str,
154 pub vhost: &'a str,
155 pub uri: &'a str,
156 pub reconnect_delay: u32,
157 pub trust_user_id: bool,
158 pub prefetch_count: u32,
159 pub ack_mode: MessageTransferAcknowledgementMode,
160 pub bind_using_nowait: bool,
161 pub channel_use_mode: ChannelUseMode,
162
163 pub queue_federation: Option<QueueFederationParams<'a>>,
164 pub exchange_federation: Option<ExchangeFederationParams<'a>>,
165}
166
167impl<'a> FederationUpstreamParams<'a> {
168 pub fn new_queue_federation_upstream(
170 vhost: &'a str,
171 name: &'a str,
172 uri: &'a str,
173 params: QueueFederationParams<'a>,
174 ) -> Self {
175 Self {
176 vhost,
177 name,
178 uri,
179 ack_mode: MessageTransferAcknowledgementMode::WhenConfirmed,
180 reconnect_delay: DEFAULT_FEDERATION_RECONNECT_DELAY,
181 trust_user_id: false,
182 prefetch_count: DEFAULT_FEDERATION_PREFETCH,
183 bind_using_nowait: false,
184 channel_use_mode: ChannelUseMode::default(),
185 exchange_federation: None,
186 queue_federation: Some(params),
187 }
188 }
189
190 pub fn new_exchange_federation_upstream(
192 vhost: &'a str,
193 name: &'a str,
194 uri: &'a str,
195 params: ExchangeFederationParams<'a>,
196 ) -> Self {
197 Self {
198 vhost,
199 name,
200 uri,
201 ack_mode: MessageTransferAcknowledgementMode::WhenConfirmed,
202 reconnect_delay: DEFAULT_FEDERATION_RECONNECT_DELAY,
203 trust_user_id: false,
204 prefetch_count: DEFAULT_FEDERATION_PREFETCH,
205 bind_using_nowait: false,
206 channel_use_mode: ChannelUseMode::default(),
207 queue_federation: None,
208 exchange_federation: Some(params),
209 }
210 }
211}
212
213impl<'a> From<FederationUpstreamParams<'a>> for RuntimeParameterDefinition<'a> {
214 fn from(params: FederationUpstreamParams<'a>) -> Self {
215 let mut value = Map::new();
216
217 value.insert("uri".to_owned(), json!(params.uri));
218 value.insert("prefetch-count".to_owned(), json!(params.prefetch_count));
219 value.insert("trust-user-id".to_owned(), json!(params.trust_user_id));
220 value.insert("reconnect-delay".to_owned(), json!(params.reconnect_delay));
221 value.insert("ack-mode".to_owned(), json!(params.ack_mode));
222 value.insert("bind-nowait".to_owned(), json!(params.bind_using_nowait));
223 value.insert(
224 "channel-use-mode".to_owned(),
225 json!(params.channel_use_mode),
226 );
227
228 if let Some(qf) = params.queue_federation {
229 value.insert("queue".to_owned(), json!(qf.queue));
230 if let Some(val) = qf.consumer_tag {
231 value.insert("consumer-tag".to_owned(), json!(val));
232 }
233 }
234
235 if let Some(ef) = params.exchange_federation {
236 value.insert("queue-type".to_owned(), json!(ef.queue_type));
237 value.insert(
238 "resource-cleanup-mode".to_owned(),
239 json!(ef.resource_cleanup_mode),
240 );
241
242 if let Some(val) = ef.exchange {
243 value.insert("exchange".to_owned(), json!(val));
244 };
245 if let Some(val) = ef.max_hops {
246 value.insert("max-hops".to_owned(), json!(val));
247 }
248 if let Some(val) = ef.ttl {
249 value.insert("expires".to_owned(), json!(val));
250 }
251 if let Some(val) = ef.message_ttl {
252 value.insert("message-ttl".to_owned(), json!(val));
253 }
254 }
255
256 Self {
257 name: params.name,
258 vhost: params.vhost,
259 component: FEDERATION_UPSTREAM_COMPONENT,
260 value,
261 }
262 }
263}
264
265#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
269pub struct OwnedFederationUpstreamParams {
270 pub name: String,
271 pub vhost: String,
272 pub uri: String,
273 pub reconnect_delay: u32,
274 pub trust_user_id: bool,
275 pub prefetch_count: u32,
276 pub ack_mode: MessageTransferAcknowledgementMode,
277 pub bind_using_nowait: bool,
278 pub channel_use_mode: ChannelUseMode,
279
280 pub queue_federation: Option<OwnedQueueFederationParams>,
281 pub exchange_federation: Option<OwnedExchangeFederationParams>,
282}
283
284impl OwnedFederationUpstreamParams {
285 pub fn with_uri(mut self, uri: impl Into<String>) -> Self {
287 self.uri = uri.into();
288 self
289 }
290}
291
292#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
294pub struct OwnedQueueFederationParams {
295 pub queue: Option<String>,
296 pub consumer_tag: Option<String>,
297}
298
299#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
301pub struct OwnedExchangeFederationParams {
302 pub exchange: Option<String>,
303 pub max_hops: Option<u8>,
304 pub queue_type: QueueType,
305 pub ttl: Option<u32>,
306 pub message_ttl: Option<u32>,
307 pub resource_cleanup_mode: FederationResourceCleanupMode,
308}
309
310impl From<FederationUpstream> for OwnedFederationUpstreamParams {
311 fn from(upstream: FederationUpstream) -> Self {
312 let queue_federation = if upstream.queue.is_some() || upstream.consumer_tag.is_some() {
314 Some(OwnedQueueFederationParams {
315 queue: upstream.queue,
316 consumer_tag: upstream.consumer_tag,
317 })
318 } else {
319 None
320 };
321
322 let exchange_federation = if upstream.exchange.is_some()
324 || upstream.max_hops.is_some()
325 || upstream.queue_type.is_some()
326 || upstream.expires.is_some()
327 || upstream.message_ttl.is_some()
328 || upstream.resource_cleanup_mode != FederationResourceCleanupMode::default()
329 {
330 Some(OwnedExchangeFederationParams {
331 exchange: upstream.exchange,
332 max_hops: upstream.max_hops,
333 queue_type: upstream.queue_type.unwrap_or_default(),
334 ttl: upstream.expires,
335 message_ttl: upstream.message_ttl,
336 resource_cleanup_mode: upstream.resource_cleanup_mode,
337 })
338 } else {
339 None
340 };
341
342 Self {
343 name: upstream.name,
344 vhost: upstream.vhost,
345 uri: upstream.uri,
346 reconnect_delay: upstream
347 .reconnect_delay
348 .unwrap_or(DEFAULT_FEDERATION_RECONNECT_DELAY),
349 trust_user_id: upstream.trust_user_id.unwrap_or(false),
350 ack_mode: upstream.ack_mode,
351 prefetch_count: upstream
352 .prefetch_count
353 .unwrap_or(DEFAULT_FEDERATION_PREFETCH),
354 bind_using_nowait: upstream.bind_using_nowait,
355 channel_use_mode: upstream.channel_use_mode,
356 queue_federation,
357 exchange_federation,
358 }
359 }
360}
361
362impl<'a> From<&'a OwnedFederationUpstreamParams> for FederationUpstreamParams<'a> {
363 fn from(owned: &'a OwnedFederationUpstreamParams) -> Self {
364 let queue_federation = owned
365 .queue_federation
366 .as_ref()
367 .map(|qf| QueueFederationParams {
368 queue: qf.queue.as_deref(),
369 consumer_tag: qf.consumer_tag.as_deref(),
370 });
371
372 let exchange_federation =
373 owned
374 .exchange_federation
375 .as_ref()
376 .map(|ef| ExchangeFederationParams {
377 exchange: ef.exchange.as_deref(),
378 max_hops: ef.max_hops,
379 queue_type: ef.queue_type.clone(),
380 ttl: ef.ttl,
381 message_ttl: ef.message_ttl,
382 resource_cleanup_mode: ef.resource_cleanup_mode.clone(),
383 });
384
385 Self {
386 name: &owned.name,
387 vhost: &owned.vhost,
388 uri: &owned.uri,
389 reconnect_delay: owned.reconnect_delay,
390 trust_user_id: owned.trust_user_id,
391 prefetch_count: owned.prefetch_count,
392 ack_mode: owned.ack_mode.clone(),
393 bind_using_nowait: owned.bind_using_nowait,
394 channel_use_mode: owned.channel_use_mode.clone(),
395 queue_federation,
396 exchange_federation,
397 }
398 }
399}