orientdb_client/sync/
session.rs

1use super::network::cluster::{Cluster, Server};
2
3use super::client::OrientDBClientInternal;
4use super::statement::Statement;
5use crate::common::protocol::messages::request::{Close, Query};
6use crate::common::protocol::messages::response;
7use crate::sync::types::resultset::{PagedResultSet, ResultSet};
8use crate::{OrientError, OrientResult};
9use r2d2::{ManageConnection, Pool, PooledConnection};
10use std::sync::Arc;
11
12pub struct OSession {
13    pub client_id: i32,
14    pub session_id: i32,
15    pub token: Option<Vec<u8>>,
16    // Unused for now. After can be used to switch server in case of failure
17    #[allow(unused_variables)]
18    #[allow(dead_code)]
19    cluster: Arc<Cluster>,
20    server: Arc<Server>,
21    pooled: bool,
22}
23
24impl OSession {
25    pub(crate) fn new(
26        client_id: i32,
27        session_id: i32,
28        token: Option<Vec<u8>>,
29        cluster: Arc<Cluster>,
30        server: Arc<Server>,
31        pooled: bool,
32    ) -> OSession {
33        OSession {
34            client_id,
35            session_id,
36            token,
37            cluster,
38            server,
39            pooled,
40        }
41    }
42
43    pub fn query<T: Into<String>>(&self, query: T) -> Statement {
44        Statement::new(self, query.into())
45    }
46
47    pub fn command<T: Into<String>>(&self, command: T) -> Statement {
48        Statement::new(self, command.into()).mode(0)
49    }
50
51    pub fn script_sql<T: Into<String>>(&self, script: T) -> Statement {
52        Statement::new(self, script.into())
53            .mode(2)
54            .language(String::from("SQL"))
55    }
56    pub fn script<T: Into<String>, S: Into<String>>(&self, script: T, language: S) -> Statement {
57        Statement::new(self, script.into())
58            .mode(2)
59            .language(language.into())
60    }
61
62    pub(crate) fn run(&self, query: Query) -> OrientResult<impl ResultSet> {
63        let mut conn = self.server.connection()?;
64        let page_size = query.page_size;
65        let q: response::Query = conn.send(query.into())?.payload();
66        Ok(PagedResultSet::new(
67            self.server.clone(),
68            q,
69            self.session_id,
70            self.token.clone(),
71            page_size,
72        ))
73    }
74    /// Close a session
75    pub fn close(self) -> OrientResult<()> {
76        if !self.pooled {
77            return self.force_close();
78        }
79        Ok(())
80    }
81
82    fn force_close(mut self) -> OrientResult<()> {
83        let mut conn = self.server.connection()?;
84        self.session_id = -1;
85        self.token = None;
86        conn.send_and_forget(Close::new(self.session_id, self.token).into())?;
87        Ok(())
88    }
89}
90
91pub struct SessionPoolManager {
92    db: String,
93    user: String,
94    password: String,
95    server: Arc<Server>,
96    client: OrientDBClientInternal,
97}
98
99impl SessionPoolManager {
100    pub(crate) fn new(
101        client: OrientDBClientInternal,
102        server: Arc<Server>,
103        db_name: &str,
104        user: &str,
105        password: &str,
106    ) -> SessionPoolManager {
107        SessionPoolManager {
108            db: String::from(db_name),
109            user: String::from(user),
110            password: String::from(password),
111            server,
112            client,
113        }
114    }
115
116    pub(crate) fn managed(self, size: Option<u32>) -> OrientResult<SessionPool> {
117        let pool = Pool::builder().max_size(size.unwrap_or(20)).build(self)?;
118
119        Ok(SessionPool(pool))
120    }
121}
122
123impl ManageConnection for SessionPoolManager {
124    type Connection = OSession;
125    type Error = OrientError;
126
127    fn connect(&self) -> OrientResult<OSession> {
128        self.client._server_session(
129            self.server.clone(),
130            &self.db,
131            &self.user,
132            &self.password,
133            true,
134        )
135    }
136
137    fn is_valid(&self, _conn: &mut OSession) -> OrientResult<()> {
138        Ok(())
139    }
140
141    fn has_broken(&self, _conn: &mut OSession) -> bool {
142        false
143    }
144}
145
146#[derive(Clone)]
147pub struct SessionPool(Pool<SessionPoolManager>);
148
149pub type SessionPooled = PooledConnection<SessionPoolManager>;
150
151impl SessionPool {
152    pub fn get(&self) -> OrientResult<SessionPooled> {
153        self.0.get().map_err(OrientError::from)
154    }
155
156    pub fn size(&self) -> u32 {
157        self.0.state().connections
158    }
159    pub fn idle(&self) -> u32 {
160        self.0.state().idle_connections
161    }
162}