use serde::Serialize;
use serde_json::{Map, Value};
pub use crate::commons::OverflowBehavior;
pub mod exchanges;
pub use exchanges::{ExchangeParams, OwnedExchangeParams};
pub mod parameters;
pub use parameters::{
GlobalRuntimeParameterDefinition, RuntimeParameterDefinition, RuntimeParameterValue,
};
pub mod policies;
pub use policies::{OwnedPolicyParams, PolicyDefinition, PolicyDefinitionBuilder, PolicyParams};
pub mod federation;
pub use federation::{
DEFAULT_FEDERATION_PREFETCH, DEFAULT_FEDERATION_RECONNECT_DELAY, ExchangeFederationParams,
FEDERATION_UPSTREAM_COMPONENT, FederationResourceCleanupMode, FederationUpstreamParams,
OwnedExchangeFederationParams, OwnedFederationUpstreamParams, OwnedQueueFederationParams,
QueueFederationParams,
};
pub mod shovels;
pub use shovels::{
Amqp10ShovelDestinationParams, Amqp10ShovelParams, Amqp10ShovelSourceParams,
Amqp091ShovelDestinationEndpoint, Amqp091ShovelDestinationParams, Amqp091ShovelParams,
Amqp091ShovelSourceEndpoint, Amqp091ShovelSourceParams, MessageProperties,
OwnedAmqp091ShovelDestinationEndpoint, OwnedAmqp091ShovelSourceEndpoint, SHOVEL_COMPONENT,
};
pub mod queues_and_streams;
pub use queues_and_streams::{OwnedQueueParams, QueueParams, StreamParams};
pub mod users;
pub use users::{BulkUserDelete, OwnedUserParams, UserParams};
pub mod permissions;
pub use permissions::{Permissions, TopicPermissions};
pub mod vhosts;
pub use vhosts::{VirtualHostParams, VirtualHostParamsBuilder};
pub mod bindings;
pub use bindings::BindingDeletionParams;
pub mod limits;
pub use limits::EnforcedLimitParams;
/// Optional JSON map of argument names to values.
///
/// Both [`XArgumentsBuilder::build`] and [`OptionalQueueArgument::to_x_arguments`]
/// produce `None` instead of `Some` with an empty map.
pub type XArguments = Option<Map<String, Value>>;
/// Fluent builder that collects [`OptionalQueueArgument`]s into an [`XArguments`] map.
///
/// Every setter consumes the builder and returns it, so calls can be chained;
/// `build` finishes the chain. Setting the same key twice keeps the last value,
/// because entries are stored with `Map::insert`.
#[derive(Debug, Clone, Default)]
#[must_use]
pub struct XArgumentsBuilder {
// Arguments collected so far, keyed by argument name (e.g. `x-message-ttl`).
inner: Map<String, Value>,
}
impl XArgumentsBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_argument(mut self, arg: OptionalQueueArgument) -> Self {
let (key, value) = arg.to_key_value();
self.inner.insert(key, value);
self
}
pub fn message_ttl(self, millis: u64) -> Self {
self.with_argument(OptionalQueueArgument::MessageTtl(millis))
}
pub fn queue_ttl(self, millis: u64) -> Self {
self.with_argument(OptionalQueueArgument::QueueTtl(millis))
}
pub fn max_length(self, max: u64) -> Self {
self.with_argument(OptionalQueueArgument::MaxLength(max))
}
pub fn max_length_bytes(self, max_bytes: u64) -> Self {
self.with_argument(OptionalQueueArgument::MaxLengthBytes(max_bytes))
}
pub fn dead_letter_exchange(self, exchange: &str) -> Self {
self.with_argument(OptionalQueueArgument::DeadLetterExchange(
exchange.to_owned(),
))
}
pub fn dead_letter_routing_key(self, routing_key: &str) -> Self {
self.with_argument(OptionalQueueArgument::DeadLetterRoutingKey(
routing_key.to_owned(),
))
}
pub fn overflow_drop_head(self) -> Self {
self.with_argument(OptionalQueueArgument::Overflow(OverflowBehavior::DropHead))
}
pub fn overflow_reject_publish(self) -> Self {
self.with_argument(OptionalQueueArgument::Overflow(
OverflowBehavior::RejectPublish,
))
}
pub fn overflow_reject_publish_dlx(self) -> Self {
self.with_argument(OptionalQueueArgument::Overflow(
OverflowBehavior::RejectPublishDlx,
))
}
pub fn max_priority(self, max: u8) -> Self {
self.with_argument(OptionalQueueArgument::MaxPriority(max))
}
pub fn quorum_initial_group_size(self, size: u32) -> Self {
self.with_argument(OptionalQueueArgument::QuorumInitialGroupSize(size))
}
pub fn quorum_target_group_size(self, size: u32) -> Self {
self.with_argument(OptionalQueueArgument::QuorumTargetGroupSize(size))
}
pub fn delivery_limit(self, limit: u64) -> Self {
self.with_argument(OptionalQueueArgument::DeliveryLimit(limit))
}
pub fn single_active_consumer(self, enabled: bool) -> Self {
self.with_argument(OptionalQueueArgument::SingleActiveConsumer(enabled))
}
pub fn queue_leader_locator(self, locator: QueueLeaderLocator) -> Self {
self.with_argument(OptionalQueueArgument::QueueLeaderLocator(locator))
}
pub fn dead_letter_strategy(self, strategy: DeadLetterStrategy) -> Self {
self.with_argument(OptionalQueueArgument::DeadLetterStrategy(strategy))
}
pub fn initial_cluster_size(self, size: u32) -> Self {
self.with_argument(OptionalQueueArgument::InitialClusterSize(size))
}
pub fn max_age(self, age: &str) -> Self {
self.with_argument(OptionalQueueArgument::MaxAge(age.to_owned()))
}
pub fn stream_max_segment_size_bytes(self, bytes: u64) -> Self {
self.with_argument(OptionalQueueArgument::StreamMaxSegmentSizeBytes(bytes))
}
pub fn stream_filter_size_bytes(self, bytes: u8) -> Self {
self.with_argument(OptionalQueueArgument::StreamFilterSizeBytes(bytes))
}
pub fn custom(self, key: &str, value: Value) -> Self {
self.with_argument(OptionalQueueArgument::Custom(key.to_owned(), value))
}
pub fn build(self) -> XArguments {
if self.inner.is_empty() {
None
} else {
Some(self.inner)
}
}
}
/// A single typed optional argument that can be placed into an [`XArguments`] map.
///
/// Each variant (except `Custom`) is stored under a fixed `x-…` key; the key
/// used for every variant is listed on the variant itself.
#[derive(Debug, Clone, PartialEq)]
pub enum OptionalQueueArgument {
/// Stored under `x-message-ttl`.
MessageTtl(u64),
/// Stored under `x-expires`.
QueueTtl(u64),
/// Stored under `x-max-length`.
MaxLength(u64),
/// Stored under `x-max-length-bytes`.
MaxLengthBytes(u64),
/// Stored under `x-dead-letter-exchange`.
DeadLetterExchange(String),
/// Stored under `x-dead-letter-routing-key`.
DeadLetterRoutingKey(String),
/// Stored under `x-dead-letter-strategy`, as the strategy's string form.
DeadLetterStrategy(DeadLetterStrategy),
/// Stored under `x-overflow`, as the behavior's string form.
Overflow(OverflowBehavior),
/// Stored under `x-max-priority`.
MaxPriority(u8),
/// Stored under `x-quorum-initial-group-size`.
QuorumInitialGroupSize(u32),
/// Stored under `x-quorum-target-group-size`.
QuorumTargetGroupSize(u32),
/// Stored under `x-queue-leader-locator`, as the locator's string form.
QueueLeaderLocator(QueueLeaderLocator),
/// Stored under `x-delivery-limit`.
DeliveryLimit(u64),
/// Stored under `x-single-active-consumer`.
SingleActiveConsumer(bool),
/// Stored under `x-initial-cluster-size`.
InitialClusterSize(u32),
/// Stored under `x-max-age`.
MaxAge(String),
/// Stored under `x-stream-max-segment-size-bytes`.
StreamMaxSegmentSizeBytes(u64),
/// Stored under `x-stream-filter-size-bytes`.
StreamFilterSizeBytes(u8),
/// Caller-supplied key and JSON value, stored exactly as given.
Custom(String, Value),
}
impl OptionalQueueArgument {
fn to_key_value(&self) -> (String, Value) {
match self {
OptionalQueueArgument::MessageTtl(v) => ("x-message-ttl".to_owned(), Value::from(*v)),
OptionalQueueArgument::QueueTtl(v) => ("x-expires".to_owned(), Value::from(*v)),
OptionalQueueArgument::MaxLength(v) => ("x-max-length".to_owned(), Value::from(*v)),
OptionalQueueArgument::MaxLengthBytes(v) => {
("x-max-length-bytes".to_owned(), Value::from(*v))
}
OptionalQueueArgument::DeadLetterExchange(v) => {
("x-dead-letter-exchange".to_owned(), Value::from(v.as_str()))
}
OptionalQueueArgument::DeadLetterRoutingKey(v) => (
"x-dead-letter-routing-key".to_owned(),
Value::from(v.as_str()),
),
OptionalQueueArgument::DeadLetterStrategy(v) => {
let s: &str = (*v).into();
("x-dead-letter-strategy".to_owned(), Value::from(s))
}
OptionalQueueArgument::Overflow(v) => {
let s: &str = (*v).into();
("x-overflow".to_owned(), Value::from(s))
}
OptionalQueueArgument::MaxPriority(v) => ("x-max-priority".to_owned(), Value::from(*v)),
OptionalQueueArgument::QuorumInitialGroupSize(v) => {
("x-quorum-initial-group-size".to_owned(), Value::from(*v))
}
OptionalQueueArgument::QuorumTargetGroupSize(v) => {
("x-quorum-target-group-size".to_owned(), Value::from(*v))
}
OptionalQueueArgument::QueueLeaderLocator(v) => {
let s: &str = (*v).into();
("x-queue-leader-locator".to_owned(), Value::from(s))
}
OptionalQueueArgument::DeliveryLimit(v) => {
("x-delivery-limit".to_owned(), Value::from(*v))
}
OptionalQueueArgument::SingleActiveConsumer(v) => {
("x-single-active-consumer".to_owned(), Value::from(*v))
}
OptionalQueueArgument::InitialClusterSize(v) => {
("x-initial-cluster-size".to_owned(), Value::from(*v))
}
OptionalQueueArgument::MaxAge(v) => ("x-max-age".to_owned(), Value::from(v.as_str())),
OptionalQueueArgument::StreamMaxSegmentSizeBytes(v) => (
"x-stream-max-segment-size-bytes".to_owned(),
Value::from(*v),
),
OptionalQueueArgument::StreamFilterSizeBytes(v) => {
("x-stream-filter-size-bytes".to_owned(), Value::from(*v))
}
OptionalQueueArgument::Custom(k, v) => (k.clone(), v.clone()),
}
}
pub fn to_x_arguments(args: impl IntoIterator<Item = OptionalQueueArgument>) -> XArguments {
let mut map = Map::new();
for arg in args {
let (key, value) = arg.to_key_value();
map.insert(key, value);
}
if map.is_empty() { None } else { Some(map) }
}
}
/// Dead-letter strategy value, convertible to and from its string form
/// (`"at-most-once"` / `"at-least-once"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadLetterStrategy {
    /// String form: `"at-most-once"`. Also the fallback when parsing an
    /// unrecognized string.
    AtMostOnce,
    /// String form: `"at-least-once"`.
    AtLeastOnce,
}

impl From<DeadLetterStrategy> for &str {
    /// Returns the string form of `value`.
    fn from(value: DeadLetterStrategy) -> Self {
        match value {
            DeadLetterStrategy::AtLeastOnce => "at-least-once",
            DeadLetterStrategy::AtMostOnce => "at-most-once",
        }
    }
}

impl From<&str> for DeadLetterStrategy {
    /// Parses the string form of a strategy.
    ///
    /// Only the exact string `"at-least-once"` selects `AtLeastOnce`; every
    /// other input (including unknown or differently-cased strings) yields
    /// `AtMostOnce`.
    fn from(s: &str) -> Self {
        if s == "at-least-once" {
            Self::AtLeastOnce
        } else {
            Self::AtMostOnce
        }
    }
}
/// Queue leader locator value, convertible to and from its string form
/// (`"balanced"` / `"client-local"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueLeaderLocator {
    /// String form: `"balanced"`. Also the fallback when parsing an
    /// unrecognized string.
    Balanced,
    /// String form: `"client-local"`.
    ClientLocal,
}

impl From<QueueLeaderLocator> for &str {
    /// Returns the string form of `value`.
    fn from(value: QueueLeaderLocator) -> Self {
        match value {
            QueueLeaderLocator::ClientLocal => "client-local",
            QueueLeaderLocator::Balanced => "balanced",
        }
    }
}

impl From<&str> for QueueLeaderLocator {
    /// Parses the string form of a locator.
    ///
    /// Only the exact string `"client-local"` selects `ClientLocal`; every
    /// other input yields `Balanced`.
    fn from(s: &str) -> Self {
        if s == "client-local" {
            Self::ClientLocal
        } else {
            Self::Balanced
        }
    }
}
/// A field-less unit struct that derives `Serialize` and `Default`.
///
/// NOTE(review): presumably used as the request body for calls that carry no
/// data — confirm against callers.
#[derive(Serialize, Default)]
pub struct EmptyPayload;
impl EmptyPayload {
pub fn new() -> Self {
Self
}
}