1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
use super::conn::Connection;

use r2d2::{ManageConnection, Pool, PooledConnection};

use crate::{OrientError, OrientResult};
use std::net::SocketAddr;
use std::sync::Arc;

pub type SyncConnection = PooledConnection<ServerConnectionManager>;

pub struct Cluster {
    servers: Vec<Arc<Server>>,
}

impl Cluster {
    pub(crate) fn builder() -> ClusterBuilder {
        ClusterBuilder::default()
    }

    pub(crate) fn connection(&self) -> OrientResult<(SyncConnection, Arc<Server>)> {
        let conn = self.servers[0].connection()?;
        Ok((conn, self.servers[0].clone()))
    }

    pub(crate) fn select(&self) -> Arc<Server> {
        self.servers[0].clone()
    }
}
pub struct ClusterBuilder {
    pool_max: u32,
    servers: Vec<SocketAddr>,
}

impl ClusterBuilder {
    pub fn build(self) -> Cluster {
        let pool_max = self.pool_max;
        let servers = self
            .servers
            .into_iter()
            .map(|s| {
                // handle unreachable servers
                Arc::new(Server::new(s, pool_max).unwrap())
            })
            .collect();
        Cluster { servers }
    }

    pub fn pool_max(mut self, pool_max: u32) -> Self {
        self.pool_max = pool_max;
        self
    }

    pub fn add_server<T: Into<SocketAddr>>(mut self, address: T) -> Self {
        self.servers.push(address.into());
        self
    }
}
impl Default for ClusterBuilder {
    fn default() -> ClusterBuilder {
        ClusterBuilder {
            pool_max: 20,
            servers: vec![],
        }
    }
}
pub struct Server {
    pool: Pool<ServerConnectionManager>,
}

impl Server {
    fn new(address: SocketAddr, pool_max: u32) -> OrientResult<Server> {
        let manager = ServerConnectionManager { address };
        let pool = Pool::builder().max_size(pool_max).build(manager)?;

        Ok(Server { pool })
    }

    pub(crate) fn connection(&self) -> OrientResult<PooledConnection<ServerConnectionManager>> {
        self.pool.get().map_err(OrientError::from)
    }
}
pub struct ServerConnectionManager {
    address: SocketAddr,
}

impl ManageConnection for ServerConnectionManager {
    type Connection = Connection;
    type Error = OrientError;

    fn connect(&self) -> OrientResult<Connection> {
        Connection::connect(&self.address)
    }

    fn is_valid(&self, _conn: &mut Connection) -> OrientResult<()> {
        Ok(())
    }

    fn has_broken(&self, _conn: &mut Connection) -> bool {
        false
    }
}