1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use futures::{Future, Poll};
use lapin::{
  Connect as LapinConnect, Connection,
  confirmation::Confirmation,
};

use crate::{
  Channel, ConfirmationFuture, ConnectionProperties, Error,
  uri::AMQPUri,
};

/// Connect to a server and create channels
///
/// Thin wrapper around a `Connection`; the derived `Clone` clones that wrapped value.
#[derive(Clone)]
pub struct Client {
  // Underlying connection: channels are created from it and error handlers are registered on it.
  conn: Connection,
}

impl Client {
  /// Connect to an AMQP Server
  pub fn connect(uri: &str, options: ConnectionProperties) -> ClientFuture {
    Connect::connect(uri, options)
  }

  /// Connect to an AMQP Server
  pub fn connect_uri(uri: AMQPUri, options: ConnectionProperties) -> ClientFuture {
    Connect::connect(uri, options)
  }

  /// Return a future that resolves to a `Channel` once the method succeeds
  pub fn create_channel(&self) -> impl Future<Item = Channel, Error = Error> + Send + 'static {
    Channel::create(&self.conn)
  }

  /// Register an error handler which will be called when connection reaches an Error state
  pub fn on_error<E: Fn() + Send + 'static>(&self, handler: Box<E>) {
    self.conn.on_error(handler);
  }
}

/// Future returned by `Client::connect` and `Client::connect_uri`; resolves to a `Client`.
///
/// Wraps the `ConfirmationFuture` of the underlying `Connection`.
pub struct ClientFuture(ConfirmationFuture<Connection>);

impl Future for ClientFuture {
  type Item = Client;
  type Error = Error;

  /// Poll the wrapped connection future and wrap the connection in a `Client` once it is available.
  fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    // `?` returns early with the inner future's error, converted into `Self::Error`.
    let state = self.0.poll()?;
    // Only the ready value is transformed; the readiness state itself is preserved by `map`.
    let wrapped = state.map(|conn| Client { conn });
    Ok(wrapped)
  }
}

impl From<Confirmation<Connection>> for ClientFuture {
  fn from(confirmation: Confirmation<Connection>) -> Self {
    Self(confirmation.into())
  }
}

/// Trait providing a method to connect to an AMQP server
///
/// Implemented in this module for `AMQPUri` and `&str`.
pub trait Connect {
  /// Connect to an AMQP server, consuming `self` as the connection target.
  ///
  /// `options` are passed along to the connection attempt; the returned
  /// `ClientFuture` resolves to a `Client`.
  fn connect(self, options: ConnectionProperties) -> ClientFuture;
}

impl Connect for AMQPUri {
  fn connect(self, options: ConnectionProperties) -> ClientFuture {
    LapinConnect::connect(self, options).into()
  }
}

impl Connect for &str {
  fn connect(self, options: ConnectionProperties) -> ClientFuture {
    LapinConnect::connect(self, options).into()
  }
}