orientdb_client/sync/network/
cluster.rs

1use super::conn::Connection;
2
3use r2d2::{ManageConnection, Pool, PooledConnection};
4
5use crate::{OrientError, OrientResult};
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9pub type SyncConnection = PooledConnection<ServerConnectionManager>;
10
11pub struct Cluster {
12    servers: Vec<Arc<Server>>,
13}
14
15impl Cluster {
16    pub(crate) fn builder() -> ClusterBuilder {
17        ClusterBuilder::default()
18    }
19
20    pub(crate) fn connection(&self) -> OrientResult<(SyncConnection, Arc<Server>)> {
21        let conn = self.servers[0].connection()?;
22        Ok((conn, self.servers[0].clone()))
23    }
24
25    pub(crate) fn select(&self) -> Arc<Server> {
26        self.servers[0].clone()
27    }
28}
29pub struct ClusterBuilder {
30    pool_max: u32,
31    servers: Vec<SocketAddr>,
32}
33
34impl ClusterBuilder {
35    pub fn build(self) -> Cluster {
36        let pool_max = self.pool_max;
37        let servers = self
38            .servers
39            .into_iter()
40            .map(|s| {
41                // handle unreachable servers
42                Arc::new(Server::new(s, pool_max).unwrap())
43            })
44            .collect();
45        Cluster { servers }
46    }
47
48    pub fn pool_max(mut self, pool_max: u32) -> Self {
49        self.pool_max = pool_max;
50        self
51    }
52
53    pub fn add_server<T: Into<SocketAddr>>(mut self, address: T) -> Self {
54        self.servers.push(address.into());
55        self
56    }
57}
58impl Default for ClusterBuilder {
59    fn default() -> ClusterBuilder {
60        ClusterBuilder {
61            pool_max: 20,
62            servers: vec![],
63        }
64    }
65}
66pub struct Server {
67    pool: Pool<ServerConnectionManager>,
68}
69
70impl Server {
71    fn new(address: SocketAddr, pool_max: u32) -> OrientResult<Server> {
72        let manager = ServerConnectionManager { address };
73        let pool = Pool::builder().max_size(pool_max).build(manager)?;
74
75        Ok(Server { pool })
76    }
77
78    pub(crate) fn connection(&self) -> OrientResult<PooledConnection<ServerConnectionManager>> {
79        self.pool.get().map_err(OrientError::from)
80    }
81}
82pub struct ServerConnectionManager {
83    address: SocketAddr,
84}
85
86impl ManageConnection for ServerConnectionManager {
87    type Connection = Connection;
88    type Error = OrientError;
89
90    fn connect(&self) -> OrientResult<Connection> {
91        Connection::connect(&self.address)
92    }
93
94    fn is_valid(&self, _conn: &mut Connection) -> OrientResult<()> {
95        Ok(())
96    }
97
98    fn has_broken(&self, _conn: &mut Connection) -> bool {
99        false
100    }
101}