lapin 4.7.0

AMQP client library
Documentation
use crate::{
    exchange::ExchangeKind,
    options::{ExchangeDeclareOptions, QueueDeclareOptions},
    topology::{ExchangeDefinition, QueueDefinition},
    types::{FieldTable, ShortString},
};
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, MutexGuard},
};

#[derive(Clone, Default)]
pub(crate) struct Registry(Arc<Mutex<Inner>>);

impl Registry {
    pub(crate) fn exchanges_topology(&self) -> Vec<ExchangeDefinition> {
        self.lock_inner().exchanges.values().cloned().collect()
    }

    pub(crate) fn queues_topology(&self) -> Vec<QueueDefinition> {
        self.lock_inner().queues.values().cloned().collect()
    }

    pub(crate) fn register_exchange(
        &self,
        name: ShortString,
        kind: ExchangeKind,
        options: ExchangeDeclareOptions,
        arguments: FieldTable,
    ) {
        let mut inner = self.lock_inner();
        if let Some(exchange) = inner.exchanges.get_mut(&name) {
            exchange.set_declared(kind, options, arguments);
        } else {
            inner.exchanges.insert(
                name.clone(),
                ExchangeDefinition::declared(name, kind, options, arguments),
            );
        }
    }

    pub(crate) fn deregister_exchange(&self, name: &str) {
        self.lock_inner().exchanges.remove(name);
    }

    pub(crate) fn register_exchange_binding(
        &self,
        destination: ShortString,
        source: ShortString,
        routing_key: ShortString,
        arguments: FieldTable,
    ) {
        self.lock_inner()
            .exchanges
            .entry(destination.clone())
            .or_insert_with(|| ExchangeDefinition::undeclared(destination))
            .register_binding(source, routing_key, arguments);
    }

    pub(crate) fn deregister_exchange_binding(
        &self,
        destination: &str,
        source: &str,
        routing_key: &str,
        arguments: &FieldTable,
    ) {
        if let Some(destination) = self.lock_inner().exchanges.get_mut(destination) {
            destination.deregister_binding(source, routing_key, arguments);
        }
    }

    pub(crate) fn register_queue(
        &self,
        name: ShortString,
        options: QueueDeclareOptions,
        arguments: FieldTable,
    ) {
        let mut inner = self.lock_inner();
        if let Some(queue) = inner.queues.get_mut(&name) {
            queue.set_declared(options, arguments);
        } else {
            inner.queues.insert(
                name.clone(),
                QueueDefinition::declared(name, options, arguments),
            );
        }
    }

    pub(crate) fn deregister_queue(&self, name: &str) {
        self.lock_inner().queues.remove(name);
    }

    pub(crate) fn register_queue_binding(
        &self,
        destination: ShortString,
        source: ShortString,
        routing_key: ShortString,
        arguments: FieldTable,
    ) {
        self.lock_inner()
            .queues
            .entry(destination.clone())
            .or_insert_with(|| QueueDefinition::undeclared(destination))
            .register_binding(source, routing_key, arguments);
    }

    pub(crate) fn deregister_queue_binding(
        &self,
        destination: &str,
        source: &str,
        routing_key: &str,
        arguments: &FieldTable,
    ) {
        if let Some(destination) = self.lock_inner().queues.get_mut(destination) {
            destination.deregister_binding(source, routing_key, arguments);
        }
    }

    fn lock_inner(&self) -> MutexGuard<'_, Inner> {
        self.0.lock().unwrap_or_else(|e| e.into_inner())
    }
}

#[derive(Default)]
struct Inner {
    exchanges: HashMap<ShortString, ExchangeDefinition>,
    queues: HashMap<ShortString, QueueDefinition>,
}