r2d2_nats/
lib.rs

1pub extern crate nats;
2pub extern crate r2d2;
3
4use nats::{Client, NatsError};
5use std::error;
6use std::error::Error as _StdError;
7use std::fmt;
8
9#[derive(Debug)]
10pub enum Error {
11    Other(NatsError),
12}
13
14impl fmt::Display for Error {
15    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
16        match self.cause() {
17            Some(cause) => write!(fmt, "{}: {}", self.description(), cause),
18            None => write!(fmt, "{}", self.description()),
19        }
20    }
21}
22
23impl error::Error for Error {
24    fn description(&self) -> &str {
25        match *self {
26            Error::Other(ref err) => err.description(),
27        }
28    }
29    fn cause(&self) -> Option<&error::Error> {
30        match *self {
31            Error::Other(ref err) => err.cause(),
32        }
33    }
34}
35
/// An `r2d2` connection manager that produces NATS `Client` connections.
#[derive(Debug)]
pub struct NatsConnectionManager {
    /// Connection string handed to `Client::new` each time a connection is opened.
    params: String,
}
40
41impl NatsConnectionManager {
42    pub fn new(connection_string: String) -> Result<NatsConnectionManager, NatsError> {
43        Ok(NatsConnectionManager {
44            params: connection_string,
45        })
46    }
47}
48
49impl r2d2::ManageConnection for NatsConnectionManager {
50    type Connection = Client;
51    type Error = Error;
52
53    fn connect(&self) -> Result<Client, Error> {
54        match Client::new(self.params.to_owned()) {
55            Ok(client) => Ok(client),
56            Err(err) => Err(Error::Other(err)),
57        }
58    }
59
60    fn is_valid(&self, conn: &mut Client) -> Result<(), Error> {
61        match conn.publish("r2d2_nats", "PING".as_bytes()) {
62            Ok(_) => Ok(()),
63            Err(err) => Err(Error::Other(err)),
64        }
65    }
66
67    fn has_broken(&self, _conn: &mut Client) -> bool {
68        false
69    }
70}