use crate::commons::MessageTransferAcknowledgementMode;
use crate::error::ConversionError;
use crate::requests::parameters::RuntimeParameterDefinition;
use crate::responses::{RuntimeParameter, Shovel};
use serde::Serialize;
use serde_json::{Map, Value, json};
pub const SHOVEL_COMPONENT: &str = "shovel";
#[derive(Serialize)]
pub struct Amqp091ShovelParams<'a> {
pub name: &'a str,
pub vhost: &'a str,
pub acknowledgement_mode: MessageTransferAcknowledgementMode,
pub reconnect_delay: Option<u32>,
pub source: Amqp091ShovelSourceParams<'a>,
pub destination: Amqp091ShovelDestinationParams<'a>,
}
impl<'a> From<Amqp091ShovelParams<'a>> for RuntimeParameterDefinition<'a> {
fn from(params: Amqp091ShovelParams<'a>) -> Self {
let mut value = Map::new();
value.insert("src-protocol".to_owned(), json!("amqp091"));
value.insert("dest-protocol".to_owned(), json!("amqp091"));
value.insert("src-uri".to_owned(), json!(params.source.source_uri));
if let Some(sq) = params.source.source_queue {
value.insert("src-queue".to_owned(), json!(sq));
}
if let Some(sx) = params.source.source_exchange {
value.insert("src-exchange".to_owned(), json!(sx));
}
if let Some(sxrk) = params.source.source_exchange_routing_key {
value.insert("src-exchange-key".to_owned(), json!(sxrk));
}
value.insert(
"dest-uri".to_owned(),
json!(params.destination.destination_uri),
);
value.insert("ack-mode".to_owned(), json!(params.acknowledgement_mode));
if let Some(dq) = params.destination.destination_queue {
value.insert("dest-queue".to_owned(), json!(dq));
}
if let Some(dx) = params.destination.destination_exchange {
value.insert("dest-exchange".to_owned(), json!(dx));
}
if let Some(dxrk) = params.destination.destination_exchange_routing_key {
value.insert("dest-exchange-key".to_owned(), json!(dxrk));
}
if params.source.predeclared {
value.insert("src-predeclared".to_owned(), json!(true));
}
if params.destination.predeclared {
value.insert("dest-predeclared".to_owned(), json!(true));
}
if let Some(val) = params.reconnect_delay {
value.insert("reconnect-delay".to_owned(), json!(val));
}
Self {
name: params.name,
vhost: params.vhost,
component: SHOVEL_COMPONENT,
value,
}
}
}
#[derive(Serialize)]
pub struct Amqp091ShovelSourceParams<'a> {
pub source_uri: &'a str,
pub source_queue: Option<&'a str>,
pub source_exchange: Option<&'a str>,
pub source_exchange_routing_key: Option<&'a str>,
pub predeclared: bool,
}
impl<'a> Amqp091ShovelSourceParams<'a> {
pub fn queue_source(source_uri: &'a str, source_queue: &'a str) -> Self {
Self {
source_uri,
source_queue: Some(source_queue),
source_exchange: None,
source_exchange_routing_key: None,
predeclared: false,
}
}
pub fn exchange_source(
source_uri: &'a str,
source_exchange: &'a str,
source_exchange_routing_key: Option<&'a str>,
) -> Self {
Self {
source_uri,
source_exchange: Some(source_exchange),
source_exchange_routing_key,
source_queue: None,
predeclared: false,
}
}
pub fn predeclared_queue_source(source_uri: &'a str, source_queue: &'a str) -> Self {
Self {
source_uri,
source_queue: Some(source_queue),
source_exchange: None,
source_exchange_routing_key: None,
predeclared: true,
}
}
pub fn predeclared_exchange_source(
source_uri: &'a str,
source_exchange: &'a str,
source_exchange_routing_key: Option<&'a str>,
) -> Self {
Self {
source_uri,
source_exchange: Some(source_exchange),
source_exchange_routing_key,
source_queue: None,
predeclared: true,
}
}
}
#[derive(Serialize)]
pub struct Amqp091ShovelDestinationParams<'a> {
pub destination_uri: &'a str,
pub destination_queue: Option<&'a str>,
pub destination_exchange: Option<&'a str>,
pub destination_exchange_routing_key: Option<&'a str>,
pub predeclared: bool,
}
impl<'a> Amqp091ShovelDestinationParams<'a> {
pub fn queue_destination(destination_uri: &'a str, destination_queue: &'a str) -> Self {
Self {
destination_uri,
destination_queue: Some(destination_queue),
destination_exchange: None,
destination_exchange_routing_key: None,
predeclared: false,
}
}
pub fn exchange_destination(
destination_uri: &'a str,
destination_exchange: &'a str,
destination_exchange_routing_key: Option<&'a str>,
) -> Self {
Self {
destination_uri,
destination_exchange: Some(destination_exchange),
destination_exchange_routing_key,
destination_queue: None,
predeclared: false,
}
}
pub fn predeclared_queue_destination(
destination_uri: &'a str,
destination_queue: &'a str,
) -> Self {
Self {
destination_uri,
destination_queue: Some(destination_queue),
destination_exchange: None,
destination_exchange_routing_key: None,
predeclared: true,
}
}
pub fn predeclared_exchange_destination(
destination_uri: &'a str,
destination_exchange: &'a str,
destination_exchange_routing_key: Option<&'a str>,
) -> Self {
Self {
destination_uri,
destination_exchange: Some(destination_exchange),
destination_exchange_routing_key,
destination_queue: None,
predeclared: true,
}
}
}
#[derive(Serialize)]
pub struct Amqp10ShovelParams<'a> {
pub name: &'a str,
pub vhost: &'a str,
pub acknowledgement_mode: MessageTransferAcknowledgementMode,
pub reconnect_delay: Option<u32>,
pub source: Amqp10ShovelSourceParams<'a>,
pub destination: Amqp10ShovelDestinationParams<'a>,
}
#[derive(Serialize)]
pub struct Amqp10ShovelSourceParams<'a> {
pub source_uri: &'a str,
pub source_address: &'a str,
}
impl<'a> Amqp10ShovelSourceParams<'a> {
pub fn new(uri: &'a str, address: &'a str) -> Self {
Self {
source_uri: uri,
source_address: address,
}
}
}
impl<'a> From<Amqp10ShovelParams<'a>> for RuntimeParameterDefinition<'a> {
fn from(params: Amqp10ShovelParams<'a>) -> Self {
let mut value = Map::new();
value.insert("src-protocol".to_owned(), json!("amqp10"));
value.insert("dest-protocol".to_owned(), json!("amqp10"));
value.insert("src-uri".to_owned(), json!(params.source.source_uri));
value.insert(
"src-address".to_owned(),
json!(params.source.source_address),
);
value.insert(
"dest-uri".to_owned(),
json!(params.destination.destination_uri),
);
value.insert(
"dest-address".to_owned(),
json!(params.destination.destination_address),
);
value.insert("ack-mode".to_owned(), json!(params.acknowledgement_mode));
if let Some(val) = params.reconnect_delay {
value.insert("reconnect-delay".to_owned(), json!(val));
}
Self {
name: params.name,
vhost: params.vhost,
component: SHOVEL_COMPONENT,
value,
}
}
}
#[derive(Serialize)]
pub struct Amqp10ShovelDestinationParams<'a> {
pub destination_uri: &'a str,
pub destination_address: &'a str,
}
impl<'a> Amqp10ShovelDestinationParams<'a> {
pub fn new(uri: &'a str, address: &'a str) -> Self {
Self {
destination_uri: uri,
destination_address: address,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Amqp091ShovelSourceEndpoint<'a> {
Queue {
uri: &'a str,
queue: &'a str,
predeclared: bool,
},
Exchange {
uri: &'a str,
exchange: &'a str,
routing_key: Option<&'a str>,
predeclared: bool,
},
}
impl<'a> Amqp091ShovelSourceEndpoint<'a> {
pub fn queue(uri: &'a str, queue: &'a str) -> Self {
Self::Queue {
uri,
queue,
predeclared: false,
}
}
pub fn predeclared_queue(uri: &'a str, queue: &'a str) -> Self {
Self::Queue {
uri,
queue,
predeclared: true,
}
}
pub fn exchange(uri: &'a str, exchange: &'a str, routing_key: Option<&'a str>) -> Self {
Self::Exchange {
uri,
exchange,
routing_key,
predeclared: false,
}
}
pub fn predeclared_exchange(
uri: &'a str,
exchange: &'a str,
routing_key: Option<&'a str>,
) -> Self {
Self::Exchange {
uri,
exchange,
routing_key,
predeclared: true,
}
}
pub fn uri(&self) -> &'a str {
match self {
Self::Queue { uri, .. } => uri,
Self::Exchange { uri, .. } => uri,
}
}
pub fn is_predeclared(&self) -> bool {
match self {
Self::Queue { predeclared, .. } => *predeclared,
Self::Exchange { predeclared, .. } => *predeclared,
}
}
}
impl<'a> From<Amqp091ShovelSourceEndpoint<'a>> for Amqp091ShovelSourceParams<'a> {
fn from(endpoint: Amqp091ShovelSourceEndpoint<'a>) -> Self {
match endpoint {
Amqp091ShovelSourceEndpoint::Queue {
uri,
queue,
predeclared,
} => Self {
source_uri: uri,
source_queue: Some(queue),
source_exchange: None,
source_exchange_routing_key: None,
predeclared,
},
Amqp091ShovelSourceEndpoint::Exchange {
uri,
exchange,
routing_key,
predeclared,
} => Self {
source_uri: uri,
source_queue: None,
source_exchange: Some(exchange),
source_exchange_routing_key: routing_key,
predeclared,
},
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Amqp091ShovelDestinationEndpoint<'a> {
Queue {
uri: &'a str,
queue: &'a str,
predeclared: bool,
},
Exchange {
uri: &'a str,
exchange: &'a str,
routing_key: Option<&'a str>,
predeclared: bool,
},
}
impl<'a> Amqp091ShovelDestinationEndpoint<'a> {
pub fn queue(uri: &'a str, queue: &'a str) -> Self {
Self::Queue {
uri,
queue,
predeclared: false,
}
}
pub fn predeclared_queue(uri: &'a str, queue: &'a str) -> Self {
Self::Queue {
uri,
queue,
predeclared: true,
}
}
pub fn exchange(uri: &'a str, exchange: &'a str, routing_key: Option<&'a str>) -> Self {
Self::Exchange {
uri,
exchange,
routing_key,
predeclared: false,
}
}
pub fn predeclared_exchange(
uri: &'a str,
exchange: &'a str,
routing_key: Option<&'a str>,
) -> Self {
Self::Exchange {
uri,
exchange,
routing_key,
predeclared: true,
}
}
pub fn uri(&self) -> &'a str {
match self {
Self::Queue { uri, .. } => uri,
Self::Exchange { uri, .. } => uri,
}
}
pub fn is_predeclared(&self) -> bool {
match self {
Self::Queue { predeclared, .. } => *predeclared,
Self::Exchange { predeclared, .. } => *predeclared,
}
}
}
impl<'a> From<Amqp091ShovelDestinationEndpoint<'a>> for Amqp091ShovelDestinationParams<'a> {
fn from(endpoint: Amqp091ShovelDestinationEndpoint<'a>) -> Self {
match endpoint {
Amqp091ShovelDestinationEndpoint::Queue {
uri,
queue,
predeclared,
} => Self {
destination_uri: uri,
destination_queue: Some(queue),
destination_exchange: None,
destination_exchange_routing_key: None,
predeclared,
},
Amqp091ShovelDestinationEndpoint::Exchange {
uri,
exchange,
routing_key,
predeclared,
} => Self {
destination_uri: uri,
destination_queue: None,
destination_exchange: Some(exchange),
destination_exchange_routing_key: routing_key,
predeclared,
},
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnedAmqp091ShovelSourceEndpoint {
Queue {
uri: String,
queue: String,
predeclared: bool,
},
Exchange {
uri: String,
exchange: String,
routing_key: Option<String>,
predeclared: bool,
},
}
impl OwnedAmqp091ShovelSourceEndpoint {
pub fn queue(uri: impl Into<String>, queue: impl Into<String>) -> Self {
Self::Queue {
uri: uri.into(),
queue: queue.into(),
predeclared: false,
}
}
pub fn predeclared_queue(uri: impl Into<String>, queue: impl Into<String>) -> Self {
Self::Queue {
uri: uri.into(),
queue: queue.into(),
predeclared: true,
}
}
pub fn exchange(
uri: impl Into<String>,
exchange: impl Into<String>,
routing_key: Option<String>,
) -> Self {
Self::Exchange {
uri: uri.into(),
exchange: exchange.into(),
routing_key,
predeclared: false,
}
}
pub fn predeclared_exchange(
uri: impl Into<String>,
exchange: impl Into<String>,
routing_key: Option<String>,
) -> Self {
Self::Exchange {
uri: uri.into(),
exchange: exchange.into(),
routing_key,
predeclared: true,
}
}
pub fn uri(&self) -> &str {
match self {
Self::Queue { uri, .. } => uri,
Self::Exchange { uri, .. } => uri,
}
}
pub fn is_predeclared(&self) -> bool {
match self {
Self::Queue { predeclared, .. } => *predeclared,
Self::Exchange { predeclared, .. } => *predeclared,
}
}
pub fn as_ref(&self) -> Amqp091ShovelSourceEndpoint<'_> {
match self {
Self::Queue {
uri,
queue,
predeclared,
} => Amqp091ShovelSourceEndpoint::Queue {
uri,
queue,
predeclared: *predeclared,
},
Self::Exchange {
uri,
exchange,
routing_key,
predeclared,
} => Amqp091ShovelSourceEndpoint::Exchange {
uri,
exchange,
routing_key: routing_key.as_deref(),
predeclared: *predeclared,
},
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnedAmqp091ShovelDestinationEndpoint {
Queue {
uri: String,
queue: String,
predeclared: bool,
},
Exchange {
uri: String,
exchange: String,
routing_key: Option<String>,
predeclared: bool,
},
}
impl OwnedAmqp091ShovelDestinationEndpoint {
pub fn queue(uri: impl Into<String>, queue: impl Into<String>) -> Self {
Self::Queue {
uri: uri.into(),
queue: queue.into(),
predeclared: false,
}
}
pub fn predeclared_queue(uri: impl Into<String>, queue: impl Into<String>) -> Self {
Self::Queue {
uri: uri.into(),
queue: queue.into(),
predeclared: true,
}
}
pub fn exchange(
uri: impl Into<String>,
exchange: impl Into<String>,
routing_key: Option<String>,
) -> Self {
Self::Exchange {
uri: uri.into(),
exchange: exchange.into(),
routing_key,
predeclared: false,
}
}
pub fn predeclared_exchange(
uri: impl Into<String>,
exchange: impl Into<String>,
routing_key: Option<String>,
) -> Self {
Self::Exchange {
uri: uri.into(),
exchange: exchange.into(),
routing_key,
predeclared: true,
}
}
pub fn uri(&self) -> &str {
match self {
Self::Queue { uri, .. } => uri,
Self::Exchange { uri, .. } => uri,
}
}
pub fn is_predeclared(&self) -> bool {
match self {
Self::Queue { predeclared, .. } => *predeclared,
Self::Exchange { predeclared, .. } => *predeclared,
}
}
pub fn as_ref(&self) -> Amqp091ShovelDestinationEndpoint<'_> {
match self {
Self::Queue {
uri,
queue,
predeclared,
} => Amqp091ShovelDestinationEndpoint::Queue {
uri,
queue,
predeclared: *predeclared,
},
Self::Exchange {
uri,
exchange,
routing_key,
predeclared,
} => Amqp091ShovelDestinationEndpoint::Exchange {
uri,
exchange,
routing_key: routing_key.as_deref(),
predeclared: *predeclared,
},
}
}
}
pub type MessageProperties = Map<String, Value>;
#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
pub struct OwnedShovelParams {
pub name: String,
pub vhost: String,
pub source_protocol: String,
pub destination_protocol: String,
pub acknowledgement_mode: MessageTransferAcknowledgementMode,
pub reconnect_delay: Option<u32>,
pub source_uri: String,
pub source_queue: Option<String>,
pub source_exchange: Option<String>,
pub source_exchange_routing_key: Option<String>,
pub source_address: Option<String>,
pub source_predeclared: Option<bool>,
pub destination_uri: String,
pub destination_queue: Option<String>,
pub destination_exchange: Option<String>,
pub destination_exchange_routing_key: Option<String>,
pub destination_address: Option<String>,
pub destination_predeclared: Option<bool>,
}
impl OwnedShovelParams {
pub fn with_source_uri(mut self, uri: impl Into<String>) -> Self {
self.source_uri = uri.into();
self
}
pub fn with_destination_uri(mut self, uri: impl Into<String>) -> Self {
self.destination_uri = uri.into();
self
}
}
impl TryFrom<RuntimeParameter> for OwnedShovelParams {
type Error = ConversionError;
fn try_from(param: RuntimeParameter) -> Result<Self, Self::Error> {
let values = ¶m.value.0;
let source_protocol = values
.get("src-protocol")
.and_then(|v| v.as_str())
.ok_or_else(|| ConversionError::MissingProperty {
argument: "src-protocol".to_string(),
})?
.to_string();
let destination_protocol = values
.get("dest-protocol")
.and_then(|v| v.as_str())
.ok_or_else(|| ConversionError::MissingProperty {
argument: "dest-protocol".to_string(),
})?
.to_string();
let source_uri = values
.get("src-uri")
.and_then(|v| v.as_str())
.ok_or_else(|| ConversionError::MissingProperty {
argument: "src-uri".to_string(),
})?
.to_string();
let destination_uri = values
.get("dest-uri")
.and_then(|v| v.as_str())
.ok_or_else(|| ConversionError::MissingProperty {
argument: "dest-uri".to_string(),
})?
.to_string();
let acknowledgement_mode = values
.get("ack-mode")
.and_then(|v| v.as_str())
.map(MessageTransferAcknowledgementMode::from)
.unwrap_or_default();
let reconnect_delay = values
.get("reconnect-delay")
.and_then(|v| v.as_u64())
.map(|v| v as u32);
let source_queue = values
.get("src-queue")
.and_then(|v| v.as_str())
.map(String::from);
let source_exchange = values
.get("src-exchange")
.and_then(|v| v.as_str())
.map(String::from);
let source_exchange_routing_key = values
.get("src-exchange-key")
.and_then(|v| v.as_str())
.map(String::from);
let source_address = values
.get("src-address")
.and_then(|v| v.as_str())
.map(String::from);
let source_predeclared = values.get("src-predeclared").and_then(|v| v.as_bool());
let destination_queue = values
.get("dest-queue")
.and_then(|v| v.as_str())
.map(String::from);
let destination_exchange = values
.get("dest-exchange")
.and_then(|v| v.as_str())
.map(String::from);
let destination_exchange_routing_key = values
.get("dest-exchange-key")
.and_then(|v| v.as_str())
.map(String::from);
let destination_address = values
.get("dest-address")
.and_then(|v| v.as_str())
.map(String::from);
let destination_predeclared = values.get("dest-predeclared").and_then(|v| v.as_bool());
Ok(OwnedShovelParams {
name: param.name,
vhost: param.vhost.to_string(),
source_protocol,
destination_protocol,
acknowledgement_mode,
reconnect_delay,
source_uri,
source_queue,
source_exchange,
source_exchange_routing_key,
source_address,
source_predeclared,
destination_uri,
destination_queue,
destination_exchange,
destination_exchange_routing_key,
destination_address,
destination_predeclared,
})
}
}
impl<'a> From<&'a OwnedShovelParams> for RuntimeParameterDefinition<'a> {
fn from(params: &'a OwnedShovelParams) -> Self {
let mut value = Map::new();
value.insert("src-protocol".to_owned(), json!(params.source_protocol));
value.insert(
"dest-protocol".to_owned(),
json!(params.destination_protocol),
);
value.insert("src-uri".to_owned(), json!(params.source_uri));
value.insert("dest-uri".to_owned(), json!(params.destination_uri));
value.insert("ack-mode".to_owned(), json!(params.acknowledgement_mode));
if let Some(delay) = params.reconnect_delay {
value.insert("reconnect-delay".to_owned(), json!(delay));
}
if let Some(queue) = ¶ms.source_queue {
value.insert("src-queue".to_owned(), json!(queue));
}
if let Some(exchange) = ¶ms.source_exchange {
value.insert("src-exchange".to_owned(), json!(exchange));
}
if let Some(key) = ¶ms.source_exchange_routing_key {
value.insert("src-exchange-key".to_owned(), json!(key));
}
if let Some(address) = ¶ms.source_address {
value.insert("src-address".to_owned(), json!(address));
}
if let Some(predeclared) = params.source_predeclared {
value.insert("src-predeclared".to_owned(), json!(predeclared));
}
if let Some(queue) = ¶ms.destination_queue {
value.insert("dest-queue".to_owned(), json!(queue));
}
if let Some(exchange) = ¶ms.destination_exchange {
value.insert("dest-exchange".to_owned(), json!(exchange));
}
if let Some(key) = ¶ms.destination_exchange_routing_key {
value.insert("dest-exchange-key".to_owned(), json!(key));
}
if let Some(address) = ¶ms.destination_address {
value.insert("dest-address".to_owned(), json!(address));
}
if let Some(predeclared) = params.destination_predeclared {
value.insert("dest-predeclared".to_owned(), json!(predeclared));
}
Self {
name: ¶ms.name,
vhost: ¶ms.vhost,
component: SHOVEL_COMPONENT,
value,
}
}
}
impl From<Shovel> for OwnedShovelParams {
fn from(shovel: Shovel) -> Self {
Self {
name: shovel.name,
vhost: shovel.vhost.unwrap_or_default(),
source_protocol: shovel
.source_protocol
.map(|p| p.to_string())
.unwrap_or_default(),
destination_protocol: shovel
.destination_protocol
.map(|p| p.to_string())
.unwrap_or_default(),
acknowledgement_mode: MessageTransferAcknowledgementMode::default(),
reconnect_delay: None,
source_uri: shovel.source_uri.unwrap_or_default(),
source_queue: shovel.source,
source_exchange: None,
source_exchange_routing_key: None,
source_address: shovel.source_address,
source_predeclared: None,
destination_uri: shovel.destination_uri.unwrap_or_default(),
destination_queue: shovel.destination,
destination_exchange: None,
destination_exchange_routing_key: None,
destination_address: shovel.destination_address,
destination_predeclared: None,
}
}
}