use crate::commons::{ChannelUseMode, MessageTransferAcknowledgementMode, QueueType};
use crate::requests::parameters::RuntimeParameterDefinition;
use crate::responses::FederationUpstream;
use serde::{Deserialize, Serialize};
use serde_json::{Map, json};
use std::fmt::{Display, Formatter, Result};
#[derive(Default, Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum FederationResourceCleanupMode {
#[default]
Default,
Never,
}
impl From<&str> for FederationResourceCleanupMode {
fn from(value: &str) -> Self {
match value {
"default" => FederationResourceCleanupMode::Default,
"never" => FederationResourceCleanupMode::Never,
_ => FederationResourceCleanupMode::default(),
}
}
}
impl From<String> for FederationResourceCleanupMode {
fn from(value: String) -> Self {
Self::from(value.as_str())
}
}
impl Display for FederationResourceCleanupMode {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
match self {
FederationResourceCleanupMode::Default => write!(f, "default"),
FederationResourceCleanupMode::Never => write!(f, "never"),
}
}
}
pub const FEDERATION_UPSTREAM_COMPONENT: &str = "federation-upstream";
#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
pub struct QueueFederationParams<'a> {
pub queue: Option<&'a str>,
pub consumer_tag: Option<&'a str>,
}
impl<'a> QueueFederationParams<'a> {
pub fn new(queue: &'a str) -> Self {
Self {
queue: Some(queue),
..Default::default()
}
}
pub fn new_with_consumer_tag(queue: &'a str, consumer_tag: &'a str) -> Self {
Self {
queue: Some(queue),
consumer_tag: Some(consumer_tag),
}
}
pub fn from_options(queue: Option<&'a str>, consumer_tag: Option<&'a str>) -> Self {
Self {
queue,
consumer_tag,
}
}
}
#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ExchangeFederationParams<'a> {
pub exchange: Option<&'a str>,
pub max_hops: Option<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
pub queue_type: Option<QueueType>,
pub ttl: Option<u32>,
pub message_ttl: Option<u32>,
pub resource_cleanup_mode: FederationResourceCleanupMode,
}
impl ExchangeFederationParams<'_> {
pub fn new(queue_type: QueueType) -> Self {
Self {
queue_type: Some(queue_type),
..Default::default()
}
}
}
pub const DEFAULT_FEDERATION_PREFETCH: u32 = 1000;
pub const DEFAULT_FEDERATION_RECONNECT_DELAY: u32 = 5;
#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
pub struct FederationUpstreamParams<'a> {
pub name: &'a str,
pub vhost: &'a str,
pub uri: &'a str,
pub reconnect_delay: u32,
pub trust_user_id: bool,
pub prefetch_count: u32,
pub ack_mode: MessageTransferAcknowledgementMode,
pub bind_using_nowait: bool,
pub channel_use_mode: ChannelUseMode,
pub queue_federation: Option<QueueFederationParams<'a>>,
pub exchange_federation: Option<ExchangeFederationParams<'a>>,
}
impl<'a> FederationUpstreamParams<'a> {
pub fn new_queue_federation_upstream(
vhost: &'a str,
name: &'a str,
uri: &'a str,
params: QueueFederationParams<'a>,
) -> Self {
Self {
vhost,
name,
uri,
ack_mode: MessageTransferAcknowledgementMode::WhenConfirmed,
reconnect_delay: DEFAULT_FEDERATION_RECONNECT_DELAY,
trust_user_id: false,
prefetch_count: DEFAULT_FEDERATION_PREFETCH,
bind_using_nowait: false,
channel_use_mode: ChannelUseMode::default(),
exchange_federation: None,
queue_federation: Some(params),
}
}
pub fn new_exchange_federation_upstream(
vhost: &'a str,
name: &'a str,
uri: &'a str,
params: ExchangeFederationParams<'a>,
) -> Self {
Self {
vhost,
name,
uri,
ack_mode: MessageTransferAcknowledgementMode::WhenConfirmed,
reconnect_delay: DEFAULT_FEDERATION_RECONNECT_DELAY,
trust_user_id: false,
prefetch_count: DEFAULT_FEDERATION_PREFETCH,
bind_using_nowait: false,
channel_use_mode: ChannelUseMode::default(),
queue_federation: None,
exchange_federation: Some(params),
}
}
}
impl<'a> From<FederationUpstreamParams<'a>> for RuntimeParameterDefinition<'a> {
fn from(params: FederationUpstreamParams<'a>) -> Self {
let mut value = Map::new();
value.insert("uri".to_owned(), json!(params.uri));
value.insert("prefetch-count".to_owned(), json!(params.prefetch_count));
value.insert("trust-user-id".to_owned(), json!(params.trust_user_id));
value.insert("reconnect-delay".to_owned(), json!(params.reconnect_delay));
value.insert("ack-mode".to_owned(), json!(params.ack_mode));
value.insert("bind-nowait".to_owned(), json!(params.bind_using_nowait));
value.insert(
"channel-use-mode".to_owned(),
json!(params.channel_use_mode),
);
if let Some(qf) = params.queue_federation {
value.insert("queue".to_owned(), json!(qf.queue));
if let Some(val) = qf.consumer_tag {
value.insert("consumer-tag".to_owned(), json!(val));
}
}
if let Some(ef) = params.exchange_federation {
if let Some(qt) = &ef.queue_type {
value.insert("queue-type".to_owned(), json!(qt));
}
value.insert(
"resource-cleanup-mode".to_owned(),
json!(ef.resource_cleanup_mode),
);
if let Some(val) = ef.exchange {
value.insert("exchange".to_owned(), json!(val));
};
if let Some(val) = ef.max_hops {
value.insert("max-hops".to_owned(), json!(val));
}
if let Some(val) = ef.ttl {
value.insert("expires".to_owned(), json!(val));
}
if let Some(val) = ef.message_ttl {
value.insert("message-ttl".to_owned(), json!(val));
}
}
Self {
name: params.name,
vhost: params.vhost,
component: FEDERATION_UPSTREAM_COMPONENT,
value,
}
}
}
#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
pub struct OwnedFederationUpstreamParams {
pub name: String,
pub vhost: String,
pub uri: String,
pub reconnect_delay: u32,
pub trust_user_id: bool,
pub prefetch_count: u32,
pub ack_mode: MessageTransferAcknowledgementMode,
pub bind_using_nowait: bool,
pub channel_use_mode: ChannelUseMode,
pub queue_federation: Option<OwnedQueueFederationParams>,
pub exchange_federation: Option<OwnedExchangeFederationParams>,
}
impl OwnedFederationUpstreamParams {
pub fn with_uri(mut self, uri: impl Into<String>) -> Self {
self.uri = uri.into();
self
}
}
#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
pub struct OwnedQueueFederationParams {
pub queue: Option<String>,
pub consumer_tag: Option<String>,
}
#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
pub struct OwnedExchangeFederationParams {
pub exchange: Option<String>,
pub max_hops: Option<u8>,
pub queue_type: Option<QueueType>,
pub ttl: Option<u32>,
pub message_ttl: Option<u32>,
pub resource_cleanup_mode: FederationResourceCleanupMode,
}
impl From<FederationUpstream> for OwnedFederationUpstreamParams {
fn from(upstream: FederationUpstream) -> Self {
let queue_federation = if upstream.queue.is_some() || upstream.consumer_tag.is_some() {
Some(OwnedQueueFederationParams {
queue: upstream.queue,
consumer_tag: upstream.consumer_tag,
})
} else {
None
};
let exchange_federation = if upstream.exchange.is_some()
|| upstream.max_hops.is_some()
|| upstream.queue_type.is_some()
|| upstream.expires.is_some()
|| upstream.message_ttl.is_some()
|| upstream.resource_cleanup_mode != FederationResourceCleanupMode::default()
{
Some(OwnedExchangeFederationParams {
exchange: upstream.exchange,
max_hops: upstream.max_hops,
queue_type: upstream.queue_type,
ttl: upstream.expires,
message_ttl: upstream.message_ttl,
resource_cleanup_mode: upstream.resource_cleanup_mode,
})
} else {
None
};
Self {
name: upstream.name,
vhost: upstream.vhost,
uri: upstream.uri,
reconnect_delay: upstream
.reconnect_delay
.unwrap_or(DEFAULT_FEDERATION_RECONNECT_DELAY),
trust_user_id: upstream.trust_user_id.unwrap_or(false),
ack_mode: upstream.ack_mode,
prefetch_count: upstream
.prefetch_count
.unwrap_or(DEFAULT_FEDERATION_PREFETCH),
bind_using_nowait: upstream.bind_using_nowait,
channel_use_mode: upstream.channel_use_mode,
queue_federation,
exchange_federation,
}
}
}
impl<'a> From<&'a OwnedFederationUpstreamParams> for FederationUpstreamParams<'a> {
fn from(owned: &'a OwnedFederationUpstreamParams) -> Self {
let queue_federation = owned
.queue_federation
.as_ref()
.map(|qf| QueueFederationParams {
queue: qf.queue.as_deref(),
consumer_tag: qf.consumer_tag.as_deref(),
});
let exchange_federation =
owned
.exchange_federation
.as_ref()
.map(|ef| ExchangeFederationParams {
exchange: ef.exchange.as_deref(),
max_hops: ef.max_hops,
queue_type: ef.queue_type.clone(),
ttl: ef.ttl,
message_ttl: ef.message_ttl,
resource_cleanup_mode: ef.resource_cleanup_mode.clone(),
});
Self {
name: &owned.name,
vhost: &owned.vhost,
uri: &owned.uri,
reconnect_delay: owned.reconnect_delay,
trust_user_id: owned.trust_user_id,
prefetch_count: owned.prefetch_count,
ack_mode: owned.ack_mode.clone(),
bind_using_nowait: owned.bind_using_nowait,
channel_use_mode: owned.channel_use_mode.clone(),
queue_federation,
exchange_federation,
}
}
}