1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use super::conn::Connection;
use r2d2::{ManageConnection, Pool, PooledConnection};
use crate::{OrientError, OrientResult};
use std::net::SocketAddr;
use std::sync::Arc;
pub type SyncConnection = PooledConnection<ServerConnectionManager>;
pub struct Cluster {
servers: Vec<Arc<Server>>,
}
impl Cluster {
pub(crate) fn builder() -> ClusterBuilder {
ClusterBuilder::default()
}
pub(crate) fn connection(&self) -> OrientResult<(SyncConnection, Arc<Server>)> {
let conn = self.servers[0].connection()?;
Ok((conn, self.servers[0].clone()))
}
pub(crate) fn select(&self) -> Arc<Server> {
self.servers[0].clone()
}
}
pub struct ClusterBuilder {
pool_max: u32,
servers: Vec<SocketAddr>,
}
impl ClusterBuilder {
pub fn build(self) -> Cluster {
let pool_max = self.pool_max;
let servers = self
.servers
.into_iter()
.map(|s| {
Arc::new(Server::new(s, pool_max).unwrap())
})
.collect();
Cluster { servers }
}
pub fn pool_max(mut self, pool_max: u32) -> Self {
self.pool_max = pool_max;
self
}
pub fn add_server<T: Into<SocketAddr>>(mut self, address: T) -> Self {
self.servers.push(address.into());
self
}
}
impl Default for ClusterBuilder {
fn default() -> ClusterBuilder {
ClusterBuilder {
pool_max: 20,
servers: vec![],
}
}
}
pub struct Server {
pool: Pool<ServerConnectionManager>,
}
impl Server {
fn new(address: SocketAddr, pool_max: u32) -> OrientResult<Server> {
let manager = ServerConnectionManager { address };
let pool = Pool::builder().max_size(pool_max).build(manager)?;
Ok(Server { pool })
}
pub(crate) fn connection(&self) -> OrientResult<PooledConnection<ServerConnectionManager>> {
self.pool.get().map_err(OrientError::from)
}
}
pub struct ServerConnectionManager {
address: SocketAddr,
}
impl ManageConnection for ServerConnectionManager {
type Connection = Connection;
type Error = OrientError;
fn connect(&self) -> OrientResult<Connection> {
Connection::connect(&self.address)
}
fn is_valid(&self, _conn: &mut Connection) -> OrientResult<()> {
Ok(())
}
fn has_broken(&self, _conn: &mut Connection) -> bool {
false
}
}