orientdb_client/sync/
client.rs

1use super::network::cluster::SyncConnection;
2use super::network::cluster::{Cluster, Server};
3use crate::common::protocol::messages::request::{
4    Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open,
5};
6use crate::common::protocol::messages::response;
7use crate::common::ConnectionOptions;
8use crate::sync::session::{OSession, SessionPool, SessionPoolManager};
9use crate::{DatabaseType, OrientResult};
10use std::net::SocketAddr;
11use std::net::ToSocketAddrs;
12use std::ops::Deref;
13use std::sync::Arc;
14
15#[derive(Clone)]
16pub struct OrientDB {
17    internal: OrientDBClientInternal,
18}
19
20impl OrientDB {
21    pub fn connect<T: Into<ConnectionOptions>>(options: T) -> OrientResult<OrientDB> {
22        let opts = options.into();
23
24        let addr: SocketAddr = format!("{}:{}", opts.host, opts.port)
25            .to_socket_addrs()?
26            .next()
27            .expect("Cannot parse socket address");
28
29        let cluster = Cluster::builder()
30            .add_server(addr)
31            .pool_max(opts.pool_size)
32            .build();
33
34        let internal = OrientDBClientInternal {
35            cluster: Arc::new(cluster),
36        };
37
38        Ok(OrientDB { internal })
39    }
40}
41
42#[derive(Clone)]
43pub struct OrientDBClientInternal {
44    cluster: Arc<Cluster>,
45}
46
47impl Deref for OrientDB {
48    type Target = OrientDBClientInternal;
49
50    fn deref(&self) -> &OrientDBClientInternal {
51        &self.internal
52    }
53}
54
55struct AdminSession {
56    session_id: i32,
57    token: Option<Vec<u8>>,
58}
59
60impl OrientDBClientInternal {
61    pub fn sessions(
62        &self,
63        db_name: &str,
64        user: &str,
65        password: &str,
66        size: Option<u32>,
67    ) -> OrientResult<SessionPool> {
68        let server = self.cluster.select();
69        SessionPoolManager::new(self.clone(), server, db_name, user, password).managed(size)
70    }
71    pub fn session(&self, db_name: &str, user: &str, password: &str) -> OrientResult<OSession> {
72        self._session(db_name, user, password, false)
73    }
74    pub(crate) fn _session(
75        &self,
76        db_name: &str,
77        user: &str,
78        password: &str,
79        pooled: bool,
80    ) -> OrientResult<OSession> {
81        let server = self.cluster.select();
82        self._server_session(server, db_name, user, password, pooled)
83    }
84    pub(crate) fn _server_session(
85        &self,
86        server: Arc<Server>,
87        db_name: &str,
88        user: &str,
89        password: &str,
90        pooled: bool,
91    ) -> OrientResult<OSession> {
92        let mut conn = server.connection()?;
93
94        let response: response::Open = conn
95            .send(Open::new(db_name, user, password).into())?
96            .payload();
97
98        Ok(OSession::new(
99            -1,
100            response.session_id,
101            response.token,
102            self.cluster.clone(),
103            server,
104            pooled,
105        ))
106    }
107
108    fn run_as_admin<R, W>(&self, user: &str, password: &str, work: W) -> OrientResult<R>
109    where
110        W: FnOnce(AdminSession, &mut SyncConnection) -> OrientResult<R>,
111    {
112        let pooled = self.cluster.connection()?;
113        let mut conn = pooled.0;
114        let response: response::Connect = conn.send(Connect::new(user, password).into())?.payload();
115        let admin = AdminSession {
116            session_id: response.session_id,
117            token: response.token.clone(),
118        };
119        let result = work(admin, &mut conn);
120
121        conn.send_and_forget(Close::new(response.session_id, response.token).into())?;
122
123        result
124    }
125    pub fn create_database(
126        &self,
127        db_name: &str,
128        user: &str,
129        password: &str,
130        db_mode: DatabaseType,
131    ) -> OrientResult<()> {
132        self.run_as_admin(user, password, move |session, conn| {
133            let _open: response::CreateDB = conn
134                .send(
135                    CreateDB::new(
136                        MsgHeader::new(session.session_id, session.token),
137                        db_name,
138                        db_mode,
139                    )
140                    .into(),
141                )?
142                .payload();
143            Ok(())
144        })
145    }
146
147    pub fn exist_database(
148        &self,
149        db_name: &str,
150        user: &str,
151        password: &str,
152        db_type: DatabaseType,
153    ) -> OrientResult<bool> {
154        self.run_as_admin(user, password, move |session, conn| {
155            let exist: response::ExistDB = conn
156                .send(
157                    ExistDB::new(
158                        MsgHeader::new(session.session_id, session.token),
159                        db_name,
160                        db_type,
161                    )
162                    .into(),
163                )?
164                .payload();
165            Ok(exist.exist)
166        })
167    }
168
169    pub fn drop_database(
170        &self,
171        db_name: &str,
172        user: &str,
173        password: &str,
174        db_type: DatabaseType,
175    ) -> OrientResult<()> {
176        self.run_as_admin(user, password, move |session, conn| {
177            let _drop: response::DropDB = conn
178                .send(
179                    DropDB::new(
180                        MsgHeader::new(session.session_id, session.token),
181                        db_name,
182                        db_type,
183                    )
184                    .into(),
185                )?
186                .payload();
187            Ok(())
188        })
189    }
190}