use crate::commons::QueueType;
use crate::requests::XArguments;
use crate::responses::QueueInfo;
use serde::Serialize;
use serde_json::{Map, Value, json};
#[derive(Serialize, Debug)]
pub struct QueueParams<'a> {
pub name: &'a str,
#[serde(skip_serializing)]
pub queue_type: QueueType,
pub durable: bool,
pub auto_delete: bool,
pub exclusive: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub arguments: XArguments,
}
impl<'a> QueueParams<'a> {
pub fn new_quorum_queue(name: &'a str, optional_args: XArguments) -> Self {
let typ = QueueType::Quorum;
let args = Self::combined_args(optional_args, &typ);
Self {
name,
queue_type: QueueType::Quorum,
durable: true,
auto_delete: false,
exclusive: false,
arguments: args,
}
}
pub fn new_stream(name: &'a str, optional_args: XArguments) -> Self {
let typ = QueueType::Stream;
let args = Self::combined_args(optional_args, &typ);
Self {
name,
queue_type: QueueType::Stream,
durable: true,
auto_delete: false,
exclusive: false,
arguments: args,
}
}
pub fn new_durable_classic_queue(name: &'a str, optional_args: XArguments) -> Self {
let typ = QueueType::Classic;
let args = Self::combined_args(optional_args, &typ);
Self {
name,
queue_type: QueueType::Classic,
durable: true,
auto_delete: false,
exclusive: false,
arguments: args,
}
}
pub fn new_transient_autodelete(name: &'a str, optional_args: XArguments) -> Self {
let typ = QueueType::Classic;
let args = Self::combined_args(optional_args, &typ);
Self {
name,
queue_type: QueueType::Classic,
durable: false,
auto_delete: true,
exclusive: false,
arguments: args,
}
}
pub fn new(
name: &'a str,
queue_type: QueueType,
durable: bool,
auto_delete: bool,
optional_args: XArguments,
) -> Self {
let args = Self::combined_args(optional_args, &queue_type);
Self {
name,
queue_type,
durable,
auto_delete,
exclusive: false,
arguments: args,
}
}
pub fn combined_args(optional_args: XArguments, queue_type: &QueueType) -> XArguments {
let mut result = Map::<String, Value>::new();
result.insert("x-queue-type".to_owned(), json!(queue_type));
if let Some(mut val) = optional_args {
result.append(&mut val)
}
Some(result)
}
pub fn with_message_ttl(mut self, millis: u64) -> Self {
self.add_argument("x-message-ttl".to_owned(), json!(millis));
self
}
pub fn with_queue_ttl(mut self, millis: u64) -> Self {
self.add_argument("x-expires".to_owned(), json!(millis));
self
}
pub fn with_max_length(mut self, max_length: u64) -> Self {
self.add_argument("x-max-length".to_owned(), json!(max_length));
self
}
pub fn with_max_length_bytes(mut self, max_length_in_bytes: u64) -> Self {
self.add_argument("x-max-length-bytes".to_owned(), json!(max_length_in_bytes));
self
}
pub fn with_dead_letter_exchange(mut self, exchange: &str) -> Self {
self.add_argument("x-dead-letter-exchange".to_owned(), json!(exchange));
self
}
pub fn with_dead_letter_routing_key(mut self, routing_key: &str) -> Self {
self.add_argument("x-dead-letter-routing-key".to_owned(), json!(routing_key));
self
}
pub fn with_argument(mut self, key: String, value: Value) -> Self {
self.add_argument(key, value);
self
}
fn add_argument(&mut self, key: String, value: Value) {
self.arguments
.get_or_insert_with(Default::default)
.insert(key, value);
}
}
impl<'a> From<&'a QueueInfo> for QueueParams<'a> {
fn from(queue: &'a QueueInfo) -> Self {
let queue_type = QueueType::from(queue.queue_type.as_str());
Self {
name: &queue.name,
queue_type,
durable: queue.durable,
auto_delete: queue.auto_delete,
exclusive: queue.exclusive,
arguments: if queue.arguments.is_empty() {
None
} else {
Some(queue.arguments.0.clone())
},
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnedQueueParams {
pub name: String,
pub queue_type: QueueType,
pub durable: bool,
pub auto_delete: bool,
pub exclusive: bool,
pub arguments: XArguments,
}
impl OwnedQueueParams {
pub fn new_quorum_queue(name: impl Into<String>, optional_args: XArguments) -> Self {
let name = name.into();
let typ = QueueType::Quorum;
let args = QueueParams::combined_args(optional_args, &typ);
Self {
name,
queue_type: QueueType::Quorum,
durable: true,
auto_delete: false,
exclusive: false,
arguments: args,
}
}
pub fn new_stream(name: impl Into<String>, optional_args: XArguments) -> Self {
let name = name.into();
let typ = QueueType::Stream;
let args = QueueParams::combined_args(optional_args, &typ);
Self {
name,
queue_type: QueueType::Stream,
durable: true,
auto_delete: false,
exclusive: false,
arguments: args,
}
}
pub fn new_durable_classic_queue(name: impl Into<String>, optional_args: XArguments) -> Self {
let name = name.into();
let typ = QueueType::Classic;
let args = QueueParams::combined_args(optional_args, &typ);
Self {
name,
queue_type: QueueType::Classic,
durable: true,
auto_delete: false,
exclusive: false,
arguments: args,
}
}
pub fn new(
name: impl Into<String>,
queue_type: QueueType,
durable: bool,
auto_delete: bool,
optional_args: XArguments,
) -> Self {
let name = name.into();
let args = QueueParams::combined_args(optional_args, &queue_type);
Self {
name,
queue_type,
durable,
auto_delete,
exclusive: false,
arguments: args,
}
}
pub fn with_message_ttl(mut self, millis: u64) -> Self {
self.add_argument("x-message-ttl", json!(millis));
self
}
pub fn with_queue_ttl(mut self, millis: u64) -> Self {
self.add_argument("x-expires", json!(millis));
self
}
pub fn with_max_length(mut self, max_length: u64) -> Self {
self.add_argument("x-max-length", json!(max_length));
self
}
pub fn with_max_length_bytes(mut self, max_length_in_bytes: u64) -> Self {
self.add_argument("x-max-length-bytes", json!(max_length_in_bytes));
self
}
pub fn with_dead_letter_exchange(mut self, exchange: impl Into<String>) -> Self {
self.add_argument("x-dead-letter-exchange", json!(exchange.into()));
self
}
pub fn with_dead_letter_routing_key(mut self, routing_key: impl Into<String>) -> Self {
self.add_argument("x-dead-letter-routing-key", json!(routing_key.into()));
self
}
pub fn with_argument(mut self, key: impl Into<String>, value: Value) -> Self {
self.add_argument(key, value);
self
}
fn add_argument(&mut self, key: impl Into<String>, value: Value) {
self.arguments
.get_or_insert_with(Default::default)
.insert(key.into(), value);
}
pub fn as_ref(&self) -> QueueParams<'_> {
QueueParams {
name: &self.name,
queue_type: self.queue_type.clone(),
durable: self.durable,
auto_delete: self.auto_delete,
exclusive: self.exclusive,
arguments: self.arguments.clone(),
}
}
}
impl From<QueueInfo> for OwnedQueueParams {
fn from(queue: QueueInfo) -> Self {
Self {
name: queue.name,
queue_type: QueueType::from(queue.queue_type.as_str()),
durable: queue.durable,
auto_delete: queue.auto_delete,
exclusive: queue.exclusive,
arguments: if queue.arguments.is_empty() {
None
} else {
Some(queue.arguments.0)
},
}
}
}
impl<'a> From<QueueParams<'a>> for OwnedQueueParams {
fn from(params: QueueParams<'a>) -> Self {
Self {
name: params.name.to_owned(),
queue_type: params.queue_type,
durable: params.durable,
auto_delete: params.auto_delete,
exclusive: params.exclusive,
arguments: params.arguments,
}
}
}
#[derive(Serialize, Debug)]
pub struct StreamParams<'a> {
pub name: &'a str,
pub expiration: &'a str,
pub max_length_bytes: Option<u64>,
pub max_segment_length_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub arguments: XArguments,
}
impl<'a> StreamParams<'a> {
pub fn new(name: &'a str, expiration: &'a str) -> Self {
Self {
name,
expiration,
max_length_bytes: None,
max_segment_length_bytes: None,
arguments: None,
}
}
pub fn with_expiration_and_length_limit(
name: &'a str,
expiration: &'a str,
max_length_bytes: u64,
) -> Self {
Self {
name,
expiration,
max_length_bytes: Some(max_length_bytes),
max_segment_length_bytes: None,
arguments: None,
}
}
pub fn with_max_length_bytes(mut self, bytes: u64) -> Self {
self.max_length_bytes = Some(bytes);
self
}
pub fn with_max_segment_length_bytes(mut self, bytes: u64) -> Self {
self.max_segment_length_bytes = Some(bytes);
self
}
pub fn with_argument(mut self, key: String, value: Value) -> Self {
self.arguments
.get_or_insert_with(Default::default)
.insert(key, value);
self
}
}