rabbitmq_http_client/requests/
federation.rs

1// Copyright (C) 2023-2025 RabbitMQ Core Team (teamrabbitmq@gmail.com)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::commons::{ChannelUseMode, MessageTransferAcknowledgementMode, QueueType};
16use crate::requests::parameters::RuntimeParameterDefinition;
17use crate::responses::FederationUpstream;
18use serde::{Deserialize, Serialize};
19use serde_json::{Map, json};
20use std::fmt::{Display, Formatter, Result};
21
22/// Controls when federation resources (temporary queues/exchanges) are cleaned up.
23///
24/// Federation creates temporary resources on the downstream cluster. This enum controls
25/// when these resources are removed to prevent resource leaks.
26#[derive(Default, Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
27#[serde(rename_all = "lowercase")]
28pub enum FederationResourceCleanupMode {
29    /// Default cleanup behavior: resources are cleaned up when the federation link is stopped
30    #[default]
31    Default,
32    /// Never clean up federation resources automatically (manual cleanup required)
33    Never,
34}
35
36impl From<&str> for FederationResourceCleanupMode {
37    fn from(value: &str) -> Self {
38        match value {
39            "default" => FederationResourceCleanupMode::Default,
40            "never" => FederationResourceCleanupMode::Never,
41            _ => FederationResourceCleanupMode::default(),
42        }
43    }
44}
45
46impl From<String> for FederationResourceCleanupMode {
47    fn from(value: String) -> Self {
48        Self::from(value.as_str())
49    }
50}
51
52impl Display for FederationResourceCleanupMode {
53    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
54        match self {
55            FederationResourceCleanupMode::Default => write!(f, "default"),
56            FederationResourceCleanupMode::Never => write!(f, "never"),
57        }
58    }
59}
60
61/// [Runtime parameter](https://www.rabbitmq.com/docs/parameters) component name used by federation upstreams.
62///
63/// This constant is used internally when creating [`RuntimeParameterDefinition`]
64/// instances for federation upstreams.
65pub const FEDERATION_UPSTREAM_COMPONENT: &str = "federation-upstream";
66
67/// Parameters specific to [queue federation](https://www.rabbitmq.com/docs/federated-queues).
68#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
69pub struct QueueFederationParams<'a> {
70    /// Name of the upstream queue to federate from (None uses the same name as downstream)
71    pub queue: Option<&'a str>,
72    /// Consumer tag for the federation link (None uses auto-generated tag)
73    pub consumer_tag: Option<&'a str>,
74}
75
76impl<'a> QueueFederationParams<'a> {
77    /// Returns queue federation parameters with a specific upstream queue name.
78    ///
79    /// Use this when the upstream queue has a different name than the downstream queue.
80    pub fn new(queue: &'a str) -> Self {
81        Self {
82            queue: Some(queue),
83            ..Default::default()
84        }
85    }
86
87    /// Returns queue federation parameters with both queue name and consumer tag.
88    ///
89    /// Use this when you need to specify both the upstream queue name and a custom
90    /// consumer tag for identification and management purposes.
91    pub fn new_with_consumer_tag(queue: &'a str, consumer_tag: &'a str) -> Self {
92        Self {
93            queue: Some(queue),
94            consumer_tag: Some(consumer_tag),
95        }
96    }
97
98    /// Returns queue federation parameters from optional values.
99    ///
100    /// Useful when constructing parameters from CLI arguments or other optional sources.
101    pub fn from_options(queue: Option<&'a str>, consumer_tag: Option<&'a str>) -> Self {
102        Self {
103            queue,
104            consumer_tag,
105        }
106    }
107}
108
109/// Parameters specific to [exchange federation](https://www.rabbitmq.com/docs/federated-exchanges).
110#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
111pub struct ExchangeFederationParams<'a> {
112    /// Name of the upstream exchange to federate from (None uses the same name as downstream)
113    pub exchange: Option<&'a str>,
114    /// Maximum hops for federation chains to prevent infinite loops
115    pub max_hops: Option<u8>,
116    /// Queue type for the temporary federation queue
117    pub queue_type: QueueType,
118    /// Time-to-live for the federation queue in milliseconds
119    pub ttl: Option<u32>,
120    /// Message TTL for federated messages in milliseconds
121    pub message_ttl: Option<u32>,
122    /// When to clean up temporary federation resources
123    pub resource_cleanup_mode: FederationResourceCleanupMode,
124}
125
126impl ExchangeFederationParams<'_> {
127    /// Returns exchange federation parameters with the specified queue type.
128    ///
129    /// The queue type determines the characteristics of the temporary queue used
130    /// for the federation link. Use quorum queues for durability or classic for simplicity.
131    pub fn new(queue_type: QueueType) -> Self {
132        Self {
133            queue_type,
134            ..Default::default()
135        }
136    }
137}
138
139/// Matches the default used by the federation plugin.
140pub const DEFAULT_FEDERATION_PREFETCH: u32 = 1000;
141/// Matches the default used by the federation plugin.
142pub const DEFAULT_FEDERATION_RECONNECT_DELAY: u32 = 5;
143
144/// Represents a set of parameters that define a federation upstream
145/// and a number of the federation type-specific (exchange, queue) parameters
146/// that are associated with an upstream.
147///
148/// A federation upstream is declared as a runtime parameter,
149/// therefore this type implements a conversion that is used
150/// by [`crate::api::Client#declare_federation_upstream`] and [`crate::blocking_api::Client#declare_federation_upstream`]
151#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
152pub struct FederationUpstreamParams<'a> {
153    pub name: &'a str,
154    pub vhost: &'a str,
155    pub uri: &'a str,
156    pub reconnect_delay: u32,
157    pub trust_user_id: bool,
158    pub prefetch_count: u32,
159    pub ack_mode: MessageTransferAcknowledgementMode,
160    pub bind_using_nowait: bool,
161    pub channel_use_mode: ChannelUseMode,
162
163    pub queue_federation: Option<QueueFederationParams<'a>>,
164    pub exchange_federation: Option<ExchangeFederationParams<'a>>,
165}
166
167impl<'a> FederationUpstreamParams<'a> {
168    /// Creates a federation upstream that will be used for [queue federation](https://www.rabbitmq.com/docs/federated-queues).
169    pub fn new_queue_federation_upstream(
170        vhost: &'a str,
171        name: &'a str,
172        uri: &'a str,
173        params: QueueFederationParams<'a>,
174    ) -> Self {
175        Self {
176            vhost,
177            name,
178            uri,
179            ack_mode: MessageTransferAcknowledgementMode::WhenConfirmed,
180            reconnect_delay: DEFAULT_FEDERATION_RECONNECT_DELAY,
181            trust_user_id: false,
182            prefetch_count: DEFAULT_FEDERATION_PREFETCH,
183            bind_using_nowait: false,
184            channel_use_mode: ChannelUseMode::default(),
185            exchange_federation: None,
186            queue_federation: Some(params),
187        }
188    }
189
190    /// Creates a federation upstream that will be used for [exchange federation](https://www.rabbitmq.com/docs/federated-exchanges).
191    pub fn new_exchange_federation_upstream(
192        vhost: &'a str,
193        name: &'a str,
194        uri: &'a str,
195        params: ExchangeFederationParams<'a>,
196    ) -> Self {
197        Self {
198            vhost,
199            name,
200            uri,
201            ack_mode: MessageTransferAcknowledgementMode::WhenConfirmed,
202            reconnect_delay: DEFAULT_FEDERATION_RECONNECT_DELAY,
203            trust_user_id: false,
204            prefetch_count: DEFAULT_FEDERATION_PREFETCH,
205            bind_using_nowait: false,
206            channel_use_mode: ChannelUseMode::default(),
207            queue_federation: None,
208            exchange_federation: Some(params),
209        }
210    }
211}
212
213impl<'a> From<FederationUpstreamParams<'a>> for RuntimeParameterDefinition<'a> {
214    fn from(params: FederationUpstreamParams<'a>) -> Self {
215        let mut value = Map::new();
216
217        value.insert("uri".to_owned(), json!(params.uri));
218        value.insert("prefetch-count".to_owned(), json!(params.prefetch_count));
219        value.insert("trust-user-id".to_owned(), json!(params.trust_user_id));
220        value.insert("reconnect-delay".to_owned(), json!(params.reconnect_delay));
221        value.insert("ack-mode".to_owned(), json!(params.ack_mode));
222        value.insert("bind-nowait".to_owned(), json!(params.bind_using_nowait));
223        value.insert(
224            "channel-use-mode".to_owned(),
225            json!(params.channel_use_mode),
226        );
227
228        if let Some(qf) = params.queue_federation {
229            value.insert("queue".to_owned(), json!(qf.queue));
230            if let Some(val) = qf.consumer_tag {
231                value.insert("consumer-tag".to_owned(), json!(val));
232            }
233        }
234
235        if let Some(ef) = params.exchange_federation {
236            value.insert("queue-type".to_owned(), json!(ef.queue_type));
237            value.insert(
238                "resource-cleanup-mode".to_owned(),
239                json!(ef.resource_cleanup_mode),
240            );
241
242            if let Some(val) = ef.exchange {
243                value.insert("exchange".to_owned(), json!(val));
244            };
245            if let Some(val) = ef.max_hops {
246                value.insert("max-hops".to_owned(), json!(val));
247            }
248            if let Some(val) = ef.ttl {
249                value.insert("expires".to_owned(), json!(val));
250            }
251            if let Some(val) = ef.message_ttl {
252                value.insert("message-ttl".to_owned(), json!(val));
253            }
254        }
255
256        Self {
257            name: params.name,
258            vhost: params.vhost,
259            component: FEDERATION_UPSTREAM_COMPONENT,
260            value,
261        }
262    }
263}
264
265/// This auxiliary type is an owned version of FederationUpstreamParams.
266/// This is one of the simplest approaches to conversion because
267/// of [`FederationUpstreamParams`]'s lifetimes.
268#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
269pub struct OwnedFederationUpstreamParams {
270    pub name: String,
271    pub vhost: String,
272    pub uri: String,
273    pub reconnect_delay: u32,
274    pub trust_user_id: bool,
275    pub prefetch_count: u32,
276    pub ack_mode: MessageTransferAcknowledgementMode,
277    pub bind_using_nowait: bool,
278    pub channel_use_mode: ChannelUseMode,
279
280    pub queue_federation: Option<OwnedQueueFederationParams>,
281    pub exchange_federation: Option<OwnedExchangeFederationParams>,
282}
283
284impl OwnedFederationUpstreamParams {
285    /// Returns a copy with the URI replaced.
286    pub fn with_uri(mut self, uri: impl Into<String>) -> Self {
287        self.uri = uri.into();
288        self
289    }
290}
291
292/// Owned version of QueueFederationParams
293#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
294pub struct OwnedQueueFederationParams {
295    pub queue: Option<String>,
296    pub consumer_tag: Option<String>,
297}
298
299/// Owned version of ExchangeFederationParams
300#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
301pub struct OwnedExchangeFederationParams {
302    pub exchange: Option<String>,
303    pub max_hops: Option<u8>,
304    pub queue_type: QueueType,
305    pub ttl: Option<u32>,
306    pub message_ttl: Option<u32>,
307    pub resource_cleanup_mode: FederationResourceCleanupMode,
308}
309
310impl From<FederationUpstream> for OwnedFederationUpstreamParams {
311    fn from(upstream: FederationUpstream) -> Self {
312        // Create queue federation parameters if queue-related fields are present
313        let queue_federation = if upstream.queue.is_some() || upstream.consumer_tag.is_some() {
314            Some(OwnedQueueFederationParams {
315                queue: upstream.queue,
316                consumer_tag: upstream.consumer_tag,
317            })
318        } else {
319            None
320        };
321
322        // Create exchange federation parameters if exchange-related fields are present
323        let exchange_federation = if upstream.exchange.is_some()
324            || upstream.max_hops.is_some()
325            || upstream.queue_type.is_some()
326            || upstream.expires.is_some()
327            || upstream.message_ttl.is_some()
328            || upstream.resource_cleanup_mode != FederationResourceCleanupMode::default()
329        {
330            Some(OwnedExchangeFederationParams {
331                exchange: upstream.exchange,
332                max_hops: upstream.max_hops,
333                queue_type: upstream.queue_type.unwrap_or_default(),
334                ttl: upstream.expires,
335                message_ttl: upstream.message_ttl,
336                resource_cleanup_mode: upstream.resource_cleanup_mode,
337            })
338        } else {
339            None
340        };
341
342        Self {
343            name: upstream.name,
344            vhost: upstream.vhost,
345            uri: upstream.uri,
346            reconnect_delay: upstream
347                .reconnect_delay
348                .unwrap_or(DEFAULT_FEDERATION_RECONNECT_DELAY),
349            trust_user_id: upstream.trust_user_id.unwrap_or(false),
350            ack_mode: upstream.ack_mode,
351            prefetch_count: upstream
352                .prefetch_count
353                .unwrap_or(DEFAULT_FEDERATION_PREFETCH),
354            bind_using_nowait: upstream.bind_using_nowait,
355            channel_use_mode: upstream.channel_use_mode,
356            queue_federation,
357            exchange_federation,
358        }
359    }
360}
361
362impl<'a> From<&'a OwnedFederationUpstreamParams> for FederationUpstreamParams<'a> {
363    fn from(owned: &'a OwnedFederationUpstreamParams) -> Self {
364        let queue_federation = owned
365            .queue_federation
366            .as_ref()
367            .map(|qf| QueueFederationParams {
368                queue: qf.queue.as_deref(),
369                consumer_tag: qf.consumer_tag.as_deref(),
370            });
371
372        let exchange_federation =
373            owned
374                .exchange_federation
375                .as_ref()
376                .map(|ef| ExchangeFederationParams {
377                    exchange: ef.exchange.as_deref(),
378                    max_hops: ef.max_hops,
379                    queue_type: ef.queue_type.clone(),
380                    ttl: ef.ttl,
381                    message_ttl: ef.message_ttl,
382                    resource_cleanup_mode: ef.resource_cleanup_mode.clone(),
383                });
384
385        Self {
386            name: &owned.name,
387            vhost: &owned.vhost,
388            uri: &owned.uri,
389            reconnect_delay: owned.reconnect_delay,
390            trust_user_id: owned.trust_user_id,
391            prefetch_count: owned.prefetch_count,
392            ack_mode: owned.ack_mode.clone(),
393            bind_using_nowait: owned.bind_using_nowait,
394            channel_use_mode: owned.channel_use_mode.clone(),
395            queue_federation,
396            exchange_federation,
397        }
398    }
399}