1use core::fmt;
21use std::sync::{Arc, Mutex, RwLock};
22
23use crossbeam::atomic::AtomicCell;
24use log::warn;
25
26use crate::{
27 common::{error::ConnectionError, info::SessionInfo, Result, SessionType, TransactionType},
28 connection::ServerConnection,
29 Database, Options, Transaction,
30};
31
/// Boxed, sendable callback taking no arguments; the element type of the
/// session's `on_close` and `on_reopen` callback lists.
type Callback = Box<dyn FnMut() + Send>;
33
/// A session opened on a single database.
///
/// Keeps the server-side session currently backing this handle, an open flag,
/// and the user callbacks registered through `on_close` / `on_reopen`.
pub struct Session {
    /// Database this session was opened on; also used to reopen the session.
    database: Database,
    /// Connection and session info of the server-side session currently in
    /// use. Replaced as a whole when `transaction_with_options` reopens the
    /// session.
    server_session: RwLock<ServerSession>,
    /// Session type requested at creation; reused when the session is reopened.
    session_type: SessionType,
    /// Shared open flag. Cleared by `force_close` and by the first callback
    /// placed into `on_close` by `new_with_options`.
    is_open: Arc<AtomicCell<bool>>,
    /// Callbacks handed, as one batch, to the server session's
    /// `on_close_register_sink` (presumably invoked when the server-side
    /// session closes -- confirm against the connection layer).
    on_close: Arc<Mutex<Vec<Callback>>>,
    /// Callbacks run by `reopened` after a new server session has been set up.
    on_reopen: Mutex<Vec<Callback>>,
}
43
/// The server-side half of a `Session`: the connection the session was opened
/// through together with the info returned by `open_session`.
#[derive(Clone, Debug)]
struct ServerSession {
    /// Connection on which the session was opened.
    connection: ServerConnection,
    /// Session info returned by the server (session id, network latency,
    /// close-callback registration sink).
    info: SessionInfo,
}
49
50impl fmt::Debug for Session {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 f.debug_struct("Session")
53 .field("database", &self.database)
54 .field("session_type", &self.session_type)
55 .field("server_session", &self.server_session)
56 .field("is_open", &self.is_open)
57 .finish()
58 }
59}
60
61impl Drop for Session {
62 fn drop(&mut self) {
63 if let Err(err) = self.force_close() {
64 warn!("Error encountered while closing session: {}", err);
65 }
66 }
67}
68
69impl Session {
70 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
73 pub async fn new(database: Database, session_type: SessionType) -> Result<Self> {
74 Self::new_with_options(database, session_type, Options::new()).await
75 }
76
77 #[cfg_attr(feature = "sync", doc = "Session::new_with_options(database, session_type, options);")]
89 #[cfg_attr(not(feature = "sync"), doc = "Session::new_with_options(database, session_type, options).await;")]
90 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
92 pub async fn new_with_options(database: Database, session_type: SessionType, options: Options) -> Result<Self> {
93 let server_session = database
94 .run_failsafe(|database| async move {
95 let session_info =
96 database.connection().open_session(database.name().to_owned(), session_type, options).await?;
97 Ok(ServerSession { connection: database.connection().clone(), info: session_info })
98 })
99 .await?;
100
101 let is_open = Arc::new(AtomicCell::new(true));
102 let on_close: Arc<Mutex<Vec<Callback>>> = Arc::new(Mutex::new(vec![Box::new({
103 let is_open = is_open.clone();
104 move || is_open.store(false)
105 })]));
106 register_persistent_on_close(&server_session.info, on_close.clone());
107
108 Ok(Self {
109 database,
110 session_type,
111 server_session: RwLock::new(server_session),
112 is_open,
113 on_close,
114 on_reopen: Mutex::default(),
115 })
116 }
117
    /// Returns the name of the database this session was opened on.
    pub fn database_name(&self) -> &str {
        self.database.name()
    }
128
    /// Returns the session type this session was created with.
    pub fn type_(&self) -> SessionType {
        self.session_type
    }
133
    /// Returns the current value of the session's open flag.
    pub fn is_open(&self) -> bool {
        self.is_open.load()
    }
144
145 pub fn force_close(&self) -> Result {
153 if self.is_open.compare_exchange(true, false).is_ok() {
154 let server_session = self.server_session.write().unwrap();
155 server_session.connection.close_session(server_session.info.session_id.clone())?;
156 }
157 Ok(())
158 }
159
160 pub fn on_close(&self, callback: impl FnMut() + Send + 'static) {
172 self.on_close.lock().unwrap().push(Box::new(callback));
173 }
174
175 fn on_server_session_close(&self, callback: impl FnOnce() + Send + 'static) {
176 let server_session = self.server_session.write().unwrap();
177 server_session.info.on_close_register_sink.send(Box::new(callback)).ok();
178 }
179
180 pub fn on_reopen(&self, callback: impl FnMut() + Send + 'static) {
194 self.on_reopen.lock().unwrap().push(Box::new(callback));
195 }
196
197 fn reopened(&self) {
198 self.on_reopen.lock().unwrap().iter_mut().for_each(|callback| (callback)());
199 let server_session = self.server_session.write().unwrap();
200 register_persistent_on_close(&server_session.info, self.on_close.clone());
201 }
202
203 #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
206 pub async fn transaction(&self, transaction_type: TransactionType) -> Result<Transaction<'_>> {
207 self.transaction_with_options(transaction_type, Options::new()).await
208 }
209
    /// Opens a transaction of the given type on this session with the supplied options.
    ///
    /// If opening the transaction on the current server session fails, the
    /// session is marked closed, the old server session is closed on a
    /// best-effort basis, and a new server session plus transaction are opened
    /// through `run_failsafe` on the database. On success the stored server
    /// session is replaced, the session is marked open again, and `reopened`
    /// runs.
    ///
    /// # Errors
    ///
    /// * `ConnectionError::SessionIsClosed` if the session is not open on entry.
    /// * The error from `run_failsafe` if reopening fails; the session is left
    ///   marked closed in that case.
    ///
    /// # Examples
    ///
    /// ```ignore
    #[cfg_attr(feature = "sync", doc = "session.transaction_with_options(transaction_type, options);")]
    #[cfg_attr(not(feature = "sync"), doc = "session.transaction_with_options(transaction_type, options).await;")]
    /// ```
    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
    pub async fn transaction_with_options(
        &self,
        transaction_type: TransactionType,
        options: Options,
    ) -> Result<Transaction<'_>> {
        if !self.is_open() {
            return Err(ConnectionError::SessionIsClosed.into());
        }

        // Clone the server session out of the lock so that no guard is held
        // across the `.await` points below.
        let ServerSession { connection: server_connection, info: SessionInfo { session_id, network_latency, .. } } =
            self.server_session.read().unwrap().clone();

        let (transaction_stream, transaction_shutdown_sink) = match server_connection
            .open_transaction(session_id.clone(), transaction_type, options, network_latency)
            .await
        {
            Ok((transaction_stream, transaction_shutdown_sink)) => (transaction_stream, transaction_shutdown_sink),
            // NOTE(review): the original error is discarded and *any* failure
            // triggers a reopen -- confirm this is intended for all error kinds.
            Err(_err) => {
                self.is_open.store(false);
                // Best effort: the result of closing the old server session is ignored.
                server_connection.close_session(session_id).ok();

                // Open a fresh server session and the transaction on it in one
                // failsafe task, so both use the same connection.
                let (server_session, (transaction_stream, transaction_shutdown_sink)) = self
                    .database
                    .run_failsafe(|database| {
                        // Copied out before the `async move` block so the block
                        // does not capture `self`.
                        let session_type = self.session_type;
                        async move {
                            let connection = database.connection();
                            let database_name = database.name().to_owned();
                            let session_info = connection.open_session(database_name, session_type, options).await?;
                            Ok((
                                ServerSession { connection: connection.clone(), info: session_info.clone() },
                                connection
                                    .open_transaction(
                                        session_info.session_id,
                                        transaction_type,
                                        options,
                                        session_info.network_latency,
                                    )
                                    .await?,
                            ))
                        }
                    })
                    .await?;
                // Publish the new server session before marking the session
                // open and running the reopen callbacks.
                *self.server_session.write().unwrap() = server_session;
                self.is_open.store(true);
                self.reopened();
                (transaction_stream, transaction_shutdown_sink)
            }
        };

        // Signal the transaction's shutdown sink when the server session
        // closes; a send error is ignored.
        self.on_server_session_close(move || {
            transaction_shutdown_sink.send(()).ok();
        });

        Ok(Transaction::new(transaction_stream))
    }
280}
281
282fn register_persistent_on_close(server_session_info: &SessionInfo, callbacks: Arc<Mutex<Vec<Callback>>>) {
283 server_session_info
284 .on_close_register_sink
285 .send(Box::new(move || callbacks.lock().unwrap().iter_mut().for_each(|callback| (callback)())))
286 .ok();
287}