1use std::default::Default;
4
5pub struct Client {
10 props: lapin::ConnectionProperties,
11}
12
13impl Client {
14 pub fn new() -> Self {
15 Self {
16 ..Default::default()
17 }
18 }
19 pub async fn connect(&self, uri: &str) -> crate::Result<Connection> {
20 let c = lapin::Connection::connect(uri, self.props.clone())
21 .await
22 .map_err(crate::Error::from)?;
23 Ok(Connection(c))
24 }
25}
26
27impl Default for Client {
28 fn default() -> Self {
29 Self {
30 props: lapin::ConnectionProperties::default(),
31 }
32 }
33}
34
35#[derive(Clone)]
41pub struct Connection(lapin::Connection);
42
43#[derive(Clone)]
44pub struct QueueOptions {
45 pub kind: lapin::ExchangeKind,
46 pub ex_opts: lapin::options::ExchangeDeclareOptions,
47 pub ex_field: lapin::types::FieldTable,
48 pub queue_opts: lapin::options::QueueDeclareOptions,
49 pub queue_field: lapin::types::FieldTable,
50 pub bind_opts: lapin::options::QueueBindOptions,
51 pub bind_field: lapin::types::FieldTable,
52}
53
54impl Connection {
55 pub fn producer_builder(&self) -> crate::ProducerBuilder {
60 crate::ProducerBuilder::new(self.clone())
61 }
62 pub fn consumer_builder(&self) -> crate::ConsumerBuilder {
67 crate::ConsumerBuilder::new(self.clone())
68 }
69 pub async fn channel(&self) -> crate::Result<lapin::Channel> {
72 self.0.create_channel().await.map_err(crate::Error::from)
73 }
74 pub async fn queue(
77 &self,
78 ex: &str,
79 queue: &str,
80 opts: QueueOptions,
81 ) -> crate::Result<(lapin::Channel, lapin::Queue)> {
82 let ch = self.0.create_channel().await.map_err(crate::Error::from)?;
83 let q = ch
84 .queue_declare(queue, opts.queue_opts, opts.queue_field)
85 .await
86 .map_err(crate::Error::from)?;
87 if Self::is_default_exchange(ex) {
88 return Ok((ch, q));
91 }
92 ch.exchange_declare(ex, opts.kind, opts.ex_opts, opts.ex_field)
93 .await
94 .map_err(crate::Error::from)?;
95 let routing_key = if Self::is_ephemeral_queue(queue) {
96 q.name().as_str()
97 } else {
98 queue
99 };
100 ch.queue_bind(
101 queue,
102 ex,
103 routing_key,
104 opts.bind_opts.clone(),
105 opts.bind_field.clone(),
106 )
107 .await
108 .map_err(crate::Error::from)?;
109 Ok((ch, q))
110 }
111 fn is_default_exchange(name: &str) -> bool {
112 name == crate::DEFAULT_EXCHANGE
113 }
114 fn is_ephemeral_queue(name: &str) -> bool {
115 name == crate::EPHEMERAL_QUEUE
116 }
117}