Skip to main content

typedb_driver/database/
session.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20use core::fmt;
21use std::sync::{Arc, Mutex, RwLock};
22
23use crossbeam::atomic::AtomicCell;
24use log::warn;
25
26use crate::{
27    common::{error::ConnectionError, info::SessionInfo, Result, SessionType, TransactionType},
28    connection::ServerConnection,
29    Database, Options, Transaction,
30};
31
/// A boxed, repeatedly callable, thread-transferable closure taking no arguments.
///
/// Used for the session's `on_close` and `on_reopen` callback lists.
type Callback = Box<dyn FnMut() + Send + 'static>;
33
34/// A session with a TypeDB database.
35pub struct Session {
36    database: Database,
37    server_session: RwLock<ServerSession>,
38    session_type: SessionType,
39    is_open: Arc<AtomicCell<bool>>,
40    on_close: Arc<Mutex<Vec<Callback>>>,
41    on_reopen: Mutex<Vec<Callback>>,
42}
43
44#[derive(Clone, Debug)]
45struct ServerSession {
46    connection: ServerConnection,
47    info: SessionInfo,
48}
49
50impl fmt::Debug for Session {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        f.debug_struct("Session")
53            .field("database", &self.database)
54            .field("session_type", &self.session_type)
55            .field("server_session", &self.server_session)
56            .field("is_open", &self.is_open)
57            .finish()
58    }
59}
60
61impl Drop for Session {
62    fn drop(&mut self) {
63        if let Err(err) = self.force_close() {
64            warn!("Error encountered while closing session: {}", err);
65        }
66    }
67}
68
69impl Session {
70    /// Opens a communication tunnel (session) to the given database with default options.
71    /// See [`Session::new_with_options`]
72    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
73    pub async fn new(database: Database, session_type: SessionType) -> Result<Self> {
74        Self::new_with_options(database, session_type, Options::new()).await
75    }
76
77    /// Opens a communication tunnel (session) to the given database on the running TypeDB server.
78    ///
79    /// # Arguments
80    ///
81    /// * `database` -- The database with which the session connects
82    /// * `session_type` -- The type of session to be created (DATA or SCHEMA)
83    /// * `options` -- `TypeDBOptions` for the session
84    ///
85    /// # Examples
86    ///
87    /// ```rust
88    #[cfg_attr(feature = "sync", doc = "Session::new_with_options(database, session_type, options);")]
89    #[cfg_attr(not(feature = "sync"), doc = "Session::new_with_options(database, session_type, options).await;")]
90    /// ```
91    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
92    pub async fn new_with_options(database: Database, session_type: SessionType, options: Options) -> Result<Self> {
93        let server_session = database
94            .run_failsafe(|database| async move {
95                let session_info =
96                    database.connection().open_session(database.name().to_owned(), session_type, options).await?;
97                Ok(ServerSession { connection: database.connection().clone(), info: session_info })
98            })
99            .await?;
100
101        let is_open = Arc::new(AtomicCell::new(true));
102        let on_close: Arc<Mutex<Vec<Callback>>> = Arc::new(Mutex::new(vec![Box::new({
103            let is_open = is_open.clone();
104            move || is_open.store(false)
105        })]));
106        register_persistent_on_close(&server_session.info, on_close.clone());
107
108        Ok(Self {
109            database,
110            session_type,
111            server_session: RwLock::new(server_session),
112            is_open,
113            on_close,
114            on_reopen: Mutex::default(),
115        })
116    }
117
118    /// Returns the name of the database of the session.
119    ///
120    /// # Examples
121    ///
122    /// ```rust
123    /// session.database_name();
124    /// ```
125    pub fn database_name(&self) -> &str {
126        self.database.name()
127    }
128
129    /// The current session’s type (SCHEMA or DATA)
130    pub fn type_(&self) -> SessionType {
131        self.session_type
132    }
133
134    /// Checks whether this session is open.
135    ///
136    /// # Examples
137    ///
138    /// ```rust
139    /// session.is_open();
140    /// ```
141    pub fn is_open(&self) -> bool {
142        self.is_open.load()
143    }
144
145    /// Closes the session. Before opening a new session, the session currently open should first be closed.
146    ///
147    /// # Examples
148    ///
149    /// ```rust
150    /// session.force_close();
151    /// ```
152    pub fn force_close(&self) -> Result {
153        if self.is_open.compare_exchange(true, false).is_ok() {
154            let server_session = self.server_session.write().unwrap();
155            server_session.connection.close_session(server_session.info.session_id.clone())?;
156        }
157        Ok(())
158    }
159
160    /// Registers a callback function which will be executed when this session is closed.
161    ///
162    /// # Arguments
163    ///
164    /// * `function` -- The callback function.
165    ///
166    /// # Examples
167    ///
168    /// ```rust
169    /// session.on_close(function);
170    /// ```
171    pub fn on_close(&self, callback: impl FnMut() + Send + 'static) {
172        self.on_close.lock().unwrap().push(Box::new(callback));
173    }
174
175    fn on_server_session_close(&self, callback: impl FnOnce() + Send + 'static) {
176        let server_session = self.server_session.write().unwrap();
177        server_session.info.on_close_register_sink.send(Box::new(callback)).ok();
178    }
179
180    /// Registers a callback function which will be executed when this session is reopened.
181    /// A session may be closed if it times out, or loses the connection to the database.
182    /// In such situations, the session is reopened automatically when opening a new transaction.
183    ///
184    /// # Arguments
185    ///
186    /// * `function` -- The callback function.
187    ///
188    /// # Examples
189    ///
190    /// ```rust
191    /// session.on_reopen(function);
192    /// ```
193    pub fn on_reopen(&self, callback: impl FnMut() + Send + 'static) {
194        self.on_reopen.lock().unwrap().push(Box::new(callback));
195    }
196
197    fn reopened(&self) {
198        self.on_reopen.lock().unwrap().iter_mut().for_each(|callback| (callback)());
199        let server_session = self.server_session.write().unwrap();
200        register_persistent_on_close(&server_session.info, self.on_close.clone());
201    }
202
203    /// Opens a transaction to perform read or write queries on the database connected to the session.
204    /// See [`Session::transaction_with_options`]
205    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
206    pub async fn transaction(&self, transaction_type: TransactionType) -> Result<Transaction<'_>> {
207        self.transaction_with_options(transaction_type, Options::new()).await
208    }
209
210    /// Opens a transaction to perform read or write queries on the database connected to the session.
211    ///
212    /// # Arguments
213    ///
214    /// * `transaction_type` -- The type of transaction to be created (READ or WRITE)
215    /// * `options` -- Options for the session
216    ///
217    /// # Examples
218    ///
219    /// ```rust
220    #[cfg_attr(feature = "sync", doc = "session.transaction_with_options(transaction_type, options);")]
221    #[cfg_attr(not(feature = "sync"), doc = "session.transaction_with_options(transaction_type, options).await;")]
222    /// ```
223    #[cfg_attr(feature = "sync", maybe_async::must_be_sync)]
224    pub async fn transaction_with_options(
225        &self,
226        transaction_type: TransactionType,
227        options: Options,
228    ) -> Result<Transaction<'_>> {
229        if !self.is_open() {
230            return Err(ConnectionError::SessionIsClosed.into());
231        }
232
233        let ServerSession { connection: server_connection, info: SessionInfo { session_id, network_latency, .. } } =
234            self.server_session.read().unwrap().clone();
235
236        let (transaction_stream, transaction_shutdown_sink) = match server_connection
237            .open_transaction(session_id.clone(), transaction_type, options, network_latency)
238            .await
239        {
240            Ok((transaction_stream, transaction_shutdown_sink)) => (transaction_stream, transaction_shutdown_sink),
241            Err(_err) => {
242                self.is_open.store(false);
243                server_connection.close_session(session_id).ok();
244
245                let (server_session, (transaction_stream, transaction_shutdown_sink)) = self
246                    .database
247                    .run_failsafe(|database| {
248                        let session_type = self.session_type;
249                        async move {
250                            let connection = database.connection();
251                            let database_name = database.name().to_owned();
252                            let session_info = connection.open_session(database_name, session_type, options).await?;
253                            Ok((
254                                ServerSession { connection: connection.clone(), info: session_info.clone() },
255                                connection
256                                    .open_transaction(
257                                        session_info.session_id,
258                                        transaction_type,
259                                        options,
260                                        session_info.network_latency,
261                                    )
262                                    .await?,
263                            ))
264                        }
265                    })
266                    .await?;
267                *self.server_session.write().unwrap() = server_session;
268                self.is_open.store(true);
269                self.reopened();
270                (transaction_stream, transaction_shutdown_sink)
271            }
272        };
273
274        self.on_server_session_close(move || {
275            transaction_shutdown_sink.send(()).ok();
276        });
277
278        Ok(Transaction::new(transaction_stream))
279    }
280}
281
282fn register_persistent_on_close(server_session_info: &SessionInfo, callbacks: Arc<Mutex<Vec<Callback>>>) {
283    server_session_info
284        .on_close_register_sink
285        .send(Box::new(move || callbacks.lock().unwrap().iter_mut().for_each(|callback| (callback)())))
286        .ok();
287}