1use super::network::cluster::SyncConnection;
2use super::network::cluster::{Cluster, Server};
3use crate::common::protocol::messages::request::{
4 Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open,
5};
6use crate::common::protocol::messages::response;
7use crate::common::ConnectionOptions;
8use crate::sync::session::{OSession, SessionPool, SessionPoolManager};
9use crate::{DatabaseType, OrientResult};
10use std::net::SocketAddr;
11use std::net::ToSocketAddrs;
12use std::ops::Deref;
13use std::sync::Arc;
14
15#[derive(Clone)]
16pub struct OrientDB {
17 internal: OrientDBClientInternal,
18}
19
20impl OrientDB {
21 pub fn connect<T: Into<ConnectionOptions>>(options: T) -> OrientResult<OrientDB> {
22 let opts = options.into();
23
24 let addr: SocketAddr = format!("{}:{}", opts.host, opts.port)
25 .to_socket_addrs()?
26 .next()
27 .expect("Cannot parse socket address");
28
29 let cluster = Cluster::builder()
30 .add_server(addr)
31 .pool_max(opts.pool_size)
32 .build();
33
34 let internal = OrientDBClientInternal {
35 cluster: Arc::new(cluster),
36 };
37
38 Ok(OrientDB { internal })
39 }
40}
41
42#[derive(Clone)]
43pub struct OrientDBClientInternal {
44 cluster: Arc<Cluster>,
45}
46
47impl Deref for OrientDB {
48 type Target = OrientDBClientInternal;
49
50 fn deref(&self) -> &OrientDBClientInternal {
51 &self.internal
52 }
53}
54
/// Identity of a server-level session, handed to the work closure of `run_as_admin`.
struct AdminSession {
    /// Session id copied from the server's `Connect` response.
    session_id: i32,
    /// Token copied from the same response; `None` when the response carried none.
    token: Option<Vec<u8>>,
}
59
60impl OrientDBClientInternal {
61 pub fn sessions(
62 &self,
63 db_name: &str,
64 user: &str,
65 password: &str,
66 size: Option<u32>,
67 ) -> OrientResult<SessionPool> {
68 let server = self.cluster.select();
69 SessionPoolManager::new(self.clone(), server, db_name, user, password).managed(size)
70 }
71 pub fn session(&self, db_name: &str, user: &str, password: &str) -> OrientResult<OSession> {
72 self._session(db_name, user, password, false)
73 }
74 pub(crate) fn _session(
75 &self,
76 db_name: &str,
77 user: &str,
78 password: &str,
79 pooled: bool,
80 ) -> OrientResult<OSession> {
81 let server = self.cluster.select();
82 self._server_session(server, db_name, user, password, pooled)
83 }
84 pub(crate) fn _server_session(
85 &self,
86 server: Arc<Server>,
87 db_name: &str,
88 user: &str,
89 password: &str,
90 pooled: bool,
91 ) -> OrientResult<OSession> {
92 let mut conn = server.connection()?;
93
94 let response: response::Open = conn
95 .send(Open::new(db_name, user, password).into())?
96 .payload();
97
98 Ok(OSession::new(
99 -1,
100 response.session_id,
101 response.token,
102 self.cluster.clone(),
103 server,
104 pooled,
105 ))
106 }
107
108 fn run_as_admin<R, W>(&self, user: &str, password: &str, work: W) -> OrientResult<R>
109 where
110 W: FnOnce(AdminSession, &mut SyncConnection) -> OrientResult<R>,
111 {
112 let pooled = self.cluster.connection()?;
113 let mut conn = pooled.0;
114 let response: response::Connect = conn.send(Connect::new(user, password).into())?.payload();
115 let admin = AdminSession {
116 session_id: response.session_id,
117 token: response.token.clone(),
118 };
119 let result = work(admin, &mut conn);
120
121 conn.send_and_forget(Close::new(response.session_id, response.token).into())?;
122
123 result
124 }
125 pub fn create_database(
126 &self,
127 db_name: &str,
128 user: &str,
129 password: &str,
130 db_mode: DatabaseType,
131 ) -> OrientResult<()> {
132 self.run_as_admin(user, password, move |session, conn| {
133 let _open: response::CreateDB = conn
134 .send(
135 CreateDB::new(
136 MsgHeader::new(session.session_id, session.token),
137 db_name,
138 db_mode,
139 )
140 .into(),
141 )?
142 .payload();
143 Ok(())
144 })
145 }
146
147 pub fn exist_database(
148 &self,
149 db_name: &str,
150 user: &str,
151 password: &str,
152 db_type: DatabaseType,
153 ) -> OrientResult<bool> {
154 self.run_as_admin(user, password, move |session, conn| {
155 let exist: response::ExistDB = conn
156 .send(
157 ExistDB::new(
158 MsgHeader::new(session.session_id, session.token),
159 db_name,
160 db_type,
161 )
162 .into(),
163 )?
164 .payload();
165 Ok(exist.exist)
166 })
167 }
168
169 pub fn drop_database(
170 &self,
171 db_name: &str,
172 user: &str,
173 password: &str,
174 db_type: DatabaseType,
175 ) -> OrientResult<()> {
176 self.run_as_admin(user, password, move |session, conn| {
177 let _drop: response::DropDB = conn
178 .send(
179 DropDB::new(
180 MsgHeader::new(session.session_id, session.token),
181 db_name,
182 db_type,
183 )
184 .into(),
185 )?
186 .payload();
187 Ok(())
188 })
189 }
190}