lapin 1.10.0

AMQP client library
Documentation
use crate::{
    exchange::ExchangeKind,
    options::{ExchangeDeclareOptions, QueueDeclareOptions},
    topology::{BindingDefinition, ExchangeDefinition},
    topology_internal::QueueDefinitionInternal,
    types::{FieldTable, ShortString},
};
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};

#[derive(Clone, Default)]
pub(crate) struct Registry(Arc<Mutex<Inner>>);

impl Registry {
    pub(crate) fn exchanges_topology(&self) -> Vec<ExchangeDefinition> {
        self.0.lock().exchanges.values().cloned().collect()
    }

    pub(crate) fn queues_topology(&self, exclusive: bool) -> Vec<QueueDefinitionInternal> {
        self.0
            .lock()
            .queues
            .values()
            .filter(|q| q.is_exclusive() == exclusive)
            .map(|q| q.clone())
            .collect()
    }

    pub(crate) fn register_exchange(
        &self,
        name: ShortString,
        kind: ExchangeKind,
        options: ExchangeDeclareOptions,
        arguments: FieldTable,
    ) {
        let mut inner = self.0.lock();
        if let Some(exchange) = inner.exchanges.get_mut(&name) {
            exchange.kind = Some(kind);
            exchange.options = Some(options);
            exchange.arguments = Some(arguments);
        } else {
            inner.exchanges.insert(
                name.clone(),
                ExchangeDefinition {
                    name,
                    kind: Some(kind),
                    options: Some(options),
                    arguments: Some(arguments),
                    bindings: Vec::new(),
                },
            );
        }
    }

    pub(crate) fn deregister_exchange(&self, name: &str) {
        self.0.lock().exchanges.remove(name);
    }

    pub(crate) fn register_exchange_binding(
        &self,
        destination: ShortString,
        source: ShortString,
        routing_key: ShortString,
        arguments: FieldTable,
    ) {
        self.0
            .lock()
            .exchanges
            .entry(destination.clone())
            .or_insert_with(|| ExchangeDefinition {
                name: destination,
                kind: None,
                options: None,
                arguments: None,
                bindings: Vec::new(),
            })
            .bindings
            .push(BindingDefinition {
                source,
                routing_key,
                arguments,
            });
    }

    pub(crate) fn deregister_exchange_binding(
        &self,
        destination: &str,
        source: &str,
        routing_key: &str,
        arguments: &FieldTable,
    ) {
        if let Some(destination) = self.0.lock().exchanges.get_mut(destination) {
            destination.bindings.retain(|binding| {
                binding.source.as_str() != source
                    || binding.routing_key.as_str() != routing_key
                    || &binding.arguments != arguments
            });
        }
    }

    pub(crate) fn register_queue(
        &self,
        name: ShortString,
        options: QueueDeclareOptions,
        arguments: FieldTable,
    ) {
        let mut inner = self.0.lock();
        if let Some(queue) = inner.queues.get_mut(&name) {
            queue.set_declared(options, arguments);
        } else {
            inner.queues.insert(
                name.clone(),
                QueueDefinitionInternal::declared(name, options, arguments),
            );
        }
    }

    pub(crate) fn deregister_queue(&self, name: &str) {
        self.0.lock().queues.remove(name);
    }

    pub(crate) fn register_queue_binding(
        &self,
        destination: ShortString,
        source: ShortString,
        routing_key: ShortString,
        arguments: FieldTable,
    ) {
        self.0
            .lock()
            .queues
            .entry(destination.clone())
            .or_insert_with(|| QueueDefinitionInternal::undeclared(destination))
            .register_binding(source, routing_key, arguments);
    }

    pub(crate) fn deregister_queue_binding(
        &self,
        destination: &str,
        source: &str,
        routing_key: &str,
        arguments: &FieldTable,
    ) {
        if let Some(destination) = self.0.lock().queues.get_mut(destination) {
            destination.deregister_binding(source, routing_key, arguments);
        }
    }
}

#[derive(Default)]
struct Inner {
    exchanges: HashMap<ShortString, ExchangeDefinition>,
    queues: HashMap<ShortString, QueueDefinitionInternal>,
}