orientdb_client/sync/
session.rs1use super::network::cluster::{Cluster, Server};
2
3use super::client::OrientDBClientInternal;
4use super::statement::Statement;
5use crate::common::protocol::messages::request::{Close, Query};
6use crate::common::protocol::messages::response;
7use crate::sync::types::resultset::{PagedResultSet, ResultSet};
8use crate::{OrientError, OrientResult};
9use r2d2::{ManageConnection, Pool, PooledConnection};
10use std::sync::Arc;
11
12pub struct OSession {
13 pub client_id: i32,
14 pub session_id: i32,
15 pub token: Option<Vec<u8>>,
16 #[allow(unused_variables)]
18 #[allow(dead_code)]
19 cluster: Arc<Cluster>,
20 server: Arc<Server>,
21 pooled: bool,
22}
23
24impl OSession {
25 pub(crate) fn new(
26 client_id: i32,
27 session_id: i32,
28 token: Option<Vec<u8>>,
29 cluster: Arc<Cluster>,
30 server: Arc<Server>,
31 pooled: bool,
32 ) -> OSession {
33 OSession {
34 client_id,
35 session_id,
36 token,
37 cluster,
38 server,
39 pooled,
40 }
41 }
42
43 pub fn query<T: Into<String>>(&self, query: T) -> Statement {
44 Statement::new(self, query.into())
45 }
46
47 pub fn command<T: Into<String>>(&self, command: T) -> Statement {
48 Statement::new(self, command.into()).mode(0)
49 }
50
51 pub fn script_sql<T: Into<String>>(&self, script: T) -> Statement {
52 Statement::new(self, script.into())
53 .mode(2)
54 .language(String::from("SQL"))
55 }
56 pub fn script<T: Into<String>, S: Into<String>>(&self, script: T, language: S) -> Statement {
57 Statement::new(self, script.into())
58 .mode(2)
59 .language(language.into())
60 }
61
62 pub(crate) fn run(&self, query: Query) -> OrientResult<impl ResultSet> {
63 let mut conn = self.server.connection()?;
64 let page_size = query.page_size;
65 let q: response::Query = conn.send(query.into())?.payload();
66 Ok(PagedResultSet::new(
67 self.server.clone(),
68 q,
69 self.session_id,
70 self.token.clone(),
71 page_size,
72 ))
73 }
74 pub fn close(self) -> OrientResult<()> {
76 if !self.pooled {
77 return self.force_close();
78 }
79 Ok(())
80 }
81
82 fn force_close(mut self) -> OrientResult<()> {
83 let mut conn = self.server.connection()?;
84 self.session_id = -1;
85 self.token = None;
86 conn.send_and_forget(Close::new(self.session_id, self.token).into())?;
87 Ok(())
88 }
89}
90
91pub struct SessionPoolManager {
92 db: String,
93 user: String,
94 password: String,
95 server: Arc<Server>,
96 client: OrientDBClientInternal,
97}
98
99impl SessionPoolManager {
100 pub(crate) fn new(
101 client: OrientDBClientInternal,
102 server: Arc<Server>,
103 db_name: &str,
104 user: &str,
105 password: &str,
106 ) -> SessionPoolManager {
107 SessionPoolManager {
108 db: String::from(db_name),
109 user: String::from(user),
110 password: String::from(password),
111 server,
112 client,
113 }
114 }
115
116 pub(crate) fn managed(self, size: Option<u32>) -> OrientResult<SessionPool> {
117 let pool = Pool::builder().max_size(size.unwrap_or(20)).build(self)?;
118
119 Ok(SessionPool(pool))
120 }
121}
122
123impl ManageConnection for SessionPoolManager {
124 type Connection = OSession;
125 type Error = OrientError;
126
127 fn connect(&self) -> OrientResult<OSession> {
128 self.client._server_session(
129 self.server.clone(),
130 &self.db,
131 &self.user,
132 &self.password,
133 true,
134 )
135 }
136
137 fn is_valid(&self, _conn: &mut OSession) -> OrientResult<()> {
138 Ok(())
139 }
140
141 fn has_broken(&self, _conn: &mut OSession) -> bool {
142 false
143 }
144}
145
146#[derive(Clone)]
147pub struct SessionPool(Pool<SessionPoolManager>);
148
149pub type SessionPooled = PooledConnection<SessionPoolManager>;
150
151impl SessionPool {
152 pub fn get(&self) -> OrientResult<SessionPooled> {
153 self.0.get().map_err(OrientError::from)
154 }
155
156 pub fn size(&self) -> u32 {
157 self.0.state().connections
158 }
159 pub fn idle(&self) -> u32 {
160 self.0.state().idle_connections
161 }
162}