orientdb_client/sync/network/
cluster.rs1use super::conn::Connection;
2
3use r2d2::{ManageConnection, Pool, PooledConnection};
4
5use crate::{OrientError, OrientResult};
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9pub type SyncConnection = PooledConnection<ServerConnectionManager>;
10
11pub struct Cluster {
12 servers: Vec<Arc<Server>>,
13}
14
15impl Cluster {
16 pub(crate) fn builder() -> ClusterBuilder {
17 ClusterBuilder::default()
18 }
19
20 pub(crate) fn connection(&self) -> OrientResult<(SyncConnection, Arc<Server>)> {
21 let conn = self.servers[0].connection()?;
22 Ok((conn, self.servers[0].clone()))
23 }
24
25 pub(crate) fn select(&self) -> Arc<Server> {
26 self.servers[0].clone()
27 }
28}
/// Builder that collects server addresses and pool settings for a
/// [`Cluster`].
pub struct ClusterBuilder {
    /// Value passed as the maximum pool size of every server's pool.
    pool_max: u32,
    /// Addresses of the servers to include, in insertion order.
    servers: Vec<SocketAddr>,
}
33
34impl ClusterBuilder {
35 pub fn build(self) -> Cluster {
36 let pool_max = self.pool_max;
37 let servers = self
38 .servers
39 .into_iter()
40 .map(|s| {
41 Arc::new(Server::new(s, pool_max).unwrap())
43 })
44 .collect();
45 Cluster { servers }
46 }
47
48 pub fn pool_max(mut self, pool_max: u32) -> Self {
49 self.pool_max = pool_max;
50 self
51 }
52
53 pub fn add_server<T: Into<SocketAddr>>(mut self, address: T) -> Self {
54 self.servers.push(address.into());
55 self
56 }
57}
58impl Default for ClusterBuilder {
59 fn default() -> ClusterBuilder {
60 ClusterBuilder {
61 pool_max: 20,
62 servers: vec![],
63 }
64 }
65}
66pub struct Server {
67 pool: Pool<ServerConnectionManager>,
68}
69
70impl Server {
71 fn new(address: SocketAddr, pool_max: u32) -> OrientResult<Server> {
72 let manager = ServerConnectionManager { address };
73 let pool = Pool::builder().max_size(pool_max).build(manager)?;
74
75 Ok(Server { pool })
76 }
77
78 pub(crate) fn connection(&self) -> OrientResult<PooledConnection<ServerConnectionManager>> {
79 self.pool.get().map_err(OrientError::from)
80 }
81}
/// r2d2 connection manager that opens connections to one fixed address.
pub struct ServerConnectionManager {
    /// Address every new connection is opened against.
    address: SocketAddr,
}
85
86impl ManageConnection for ServerConnectionManager {
87 type Connection = Connection;
88 type Error = OrientError;
89
90 fn connect(&self) -> OrientResult<Connection> {
91 Connection::connect(&self.address)
92 }
93
94 fn is_valid(&self, _conn: &mut Connection) -> OrientResult<()> {
95 Ok(())
96 }
97
98 fn has_broken(&self, _conn: &mut Connection) -> bool {
99 false
100 }
101}