1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::default::Default;
pub struct Client {
    props: lapin::ConnectionProperties,
}
impl Client {
    pub fn new() -> Self {
        Self {
            ..Default::default()
        }
    }
    pub async fn connect(&self, uri: &str) -> crate::Result<Connection> {
        let c = lapin::Connection::connect(uri, self.props.clone())
            .await
            .map_err(crate::Error::from)?;
        Ok(Connection(c))
    }
}
impl Default for Client {
    fn default() -> Self {
        Self {
            props: lapin::ConnectionProperties::default(),
        }
    }
}
#[derive(Clone)]
pub struct Connection(lapin::Connection);
#[derive(Clone)]
pub struct QueueOptions {
    pub kind: lapin::ExchangeKind,
    pub ex_opts: lapin::options::ExchangeDeclareOptions,
    pub ex_field: lapin::types::FieldTable,
    pub queue_opts: lapin::options::QueueDeclareOptions,
    pub queue_field: lapin::types::FieldTable,
    pub bind_opts: lapin::options::QueueBindOptions,
    pub bind_field: lapin::types::FieldTable,
}
impl Connection {
    
    
    
    
    pub fn producer_builder(&self) -> crate::ProducerBuilder {
        crate::ProducerBuilder::new(self.clone())
    }
    
    
    
    
    pub fn consumer_builder(&self) -> crate::ConsumerBuilder {
        crate::ConsumerBuilder::new(self.clone())
    }
    
    
    pub async fn channel(&self) -> crate::Result<lapin::Channel> {
        self.0.create_channel().await.map_err(crate::Error::from)
    }
    
    
    pub async fn queue(
        &self,
        ex: &str,
        queue: &str,
        opts: QueueOptions,
    ) -> crate::Result<(lapin::Channel, lapin::Queue)> {
        let ch = self.0.create_channel().await.map_err(crate::Error::from)?;
        let q = ch
            .queue_declare(queue, opts.queue_opts, opts.queue_field)
            .await
            .map_err(crate::Error::from)?;
        if Self::is_default_exchange(ex) {
            
            
            return Ok((ch, q));
        }
        ch.exchange_declare(ex, opts.kind, opts.ex_opts, opts.ex_field)
            .await
            .map_err(crate::Error::from)?;
        let routing_key = if Self::is_ephemeral_queue(queue) {
            q.name().as_str()
        } else {
            queue
        };
        ch.queue_bind(
            queue,
            ex,
            routing_key,
            opts.bind_opts.clone(),
            opts.bind_field.clone(),
        )
        .await
        .map_err(crate::Error::from)?;
        Ok((ch, q))
    }
    fn is_default_exchange(name: &str) -> bool {
        name == crate::DEFAULT_EXCHANGE
    }
    fn is_ephemeral_queue(name: &str) -> bool {
        name == crate::EPHEMERAL_QUEUE
    }
}