rsfbclient/connection/
mod.rs

1//!
2//! Rust Firebird Client
3//!
4//! Connection functions
5//!
6use rsfbclient_core::{
7    Dialect, FbError, FirebirdClient, FirebirdClientDbEvents, FirebirdClientDbOps, FromRow,
8    IntoParams, TransactionConfiguration,
9};
10use std::{marker, mem};
11
12use crate::{
13    query::Queryable, statement::StatementData, transaction::TransactionData, Execute, Transaction,
14};
15use stmt_cache::{StmtCache, StmtCacheData};
16
/// Connection builders, one set per enabled client implementation.
///
/// What is exported depends on the enabled cargo features: `native_client`
/// re-exports everything from `builder_native`, `pure_rust` everything from
/// `builder_pure_rust`.
pub mod builders {

    // The imports below are not referenced by anything in this module body itself,
    // hence the `allow`.
    // NOTE(review): presumably the feature-gated builder submodules reach them
    // through `super::…` — confirm before trimming the list.
    #![allow(unused_imports)]
    use super::{
        super::{charset, Charset},
        Connection, ConnectionConfiguration, Dialect, FbError, FirebirdClient,
        FirebirdClientFactory,
    };

    // Builders for the native client
    #[cfg(feature = "native_client")]
    mod builder_native;
    #[cfg(feature = "native_client")]
    pub use builder_native::*;

    // Builders for the pure rust client
    #[cfg(feature = "pure_rust")]
    mod builder_pure_rust;
    #[cfg(feature = "pure_rust")]
    pub use builder_pure_rust::*;
}
36
37pub(crate) mod conn_string;
38pub(crate) mod stmt_cache;
39
40pub(crate) mod simple;
41pub use simple::SimpleConnection;
42
/// A generic factory for creating multiple preconfigured instances of a particular
/// client implementation.
///
/// Intended mainly for use by connection pools.
pub trait FirebirdClientFactory {
    /// The client implementation produced by this factory
    type C: FirebirdClient;

    /// Construct a new instance of the client
    ///
    /// # Errors
    ///
    /// Returns an [`FbError`] when the client cannot be constructed.
    fn new_instance(&self) -> Result<Self::C, FbError>;

    /// Borrow the connection configuration held by the factory.
    ///
    /// The attachment part of the configuration is the one defined by the
    /// client type `C`.
    fn get_conn_conf(
        &self,
    ) -> &ConnectionConfiguration<<Self::C as FirebirdClientDbOps>::AttachmentConfig>;
}
56
/// Generic aggregate of configuration data for Firebird database connections.
///
/// The data required for forming connections is partly
/// client-implementation-dependent, which is why the attachment part is left
/// generic (`A`).
#[derive(Clone)]
pub struct ConnectionConfiguration<A> {
    /// Client-specific attachment settings, handed to the client when attaching
    /// to or creating a database
    attachment_conf: A,
    /// SQL dialect, passed to the client and remembered by the resulting connection
    dialect: Dialect,
    /// Forwarded to the client's `attach_database`.
    /// NOTE(review): going by the name, `true` disables database triggers for the
    /// attachment — confirm against the client implementation
    no_db_triggers: bool,
    /// Size given to the prepared statement cache of each connection
    stmt_cache_size: usize,
    /// Default configuration for the transactions started by a connection
    transaction_conf: TransactionConfiguration,
}
67
68impl<A: Default> Default for ConnectionConfiguration<A> {
69    fn default() -> Self {
70        Self {
71            attachment_conf: Default::default(),
72            dialect: Dialect::D3,
73            stmt_cache_size: 20,
74            transaction_conf: TransactionConfiguration::default(),
75            no_db_triggers: false,
76        }
77    }
78}
79
/// A connection to a Firebird database
pub struct Connection<C: FirebirdClient> {
    /// Database handle, obtained from the client when attaching to or creating
    /// the database
    pub(crate) handle: <C as FirebirdClientDbOps>::DbHandle,

    /// Firebird dialect for the statements
    pub(crate) dialect: Dialect,

    /// Cache for the prepared statements
    pub(crate) stmt_cache: StmtCache<StatementData<C>>,

    /// Default transaction to be used when no explicit
    /// transaction is used. Starts out as `None` on a fresh connection
    pub(crate) def_tr: Option<TransactionData<C>>,

    /// If true, methods in `Queryable` and `Execute` should not
    /// automatically commit and rollback
    pub(crate) in_transaction: bool,

    /// Firebird client
    pub(crate) cli: C,

    /// Default configuration for new transactions
    pub(crate) def_confs_tr: TransactionConfiguration,
}
105
106impl<C: FirebirdClient> Connection<C> {
107    /// Open the client connection.
108    pub fn open(
109        mut cli: C,
110        conf: &ConnectionConfiguration<C::AttachmentConfig>,
111    ) -> Result<Connection<C>, FbError> {
112        let handle =
113            cli.attach_database(&conf.attachment_conf, conf.dialect, conf.no_db_triggers)?;
114        let stmt_cache = StmtCache::new(conf.stmt_cache_size);
115
116        Ok(Connection {
117            handle,
118            dialect: conf.dialect,
119            stmt_cache,
120            def_tr: None,
121            in_transaction: false,
122            cli,
123            def_confs_tr: conf.transaction_conf,
124        })
125    }
126
127    /// Create the database and start the client connection.
128    pub fn create_database(
129        mut cli: C,
130        conf: &ConnectionConfiguration<C::AttachmentConfig>,
131        page_size: Option<u32>,
132    ) -> Result<Connection<C>, FbError> {
133        let handle = cli.create_database(&conf.attachment_conf, page_size, conf.dialect)?;
134        let stmt_cache = StmtCache::new(conf.stmt_cache_size);
135
136        Ok(Connection {
137            handle,
138            dialect: conf.dialect,
139            stmt_cache,
140            def_tr: None,
141            in_transaction: false,
142            cli,
143            def_confs_tr: conf.transaction_conf,
144        })
145    }
146
    /// Drop the current database, consuming the connection.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the client when the database cannot be dropped.
    pub fn drop_database(mut self) -> Result<(), FbError> {
        self.cli.drop_database(&mut self.handle)?;

        // NOTE(review): `self` goes out of scope here (and on the error path above),
        // so the `Drop` implementation still runs `cleanup_and_detach` with the handle
        // that was just used to drop the database; any error from that is ignored
        Ok(())
    }
153
154    /// Close the current connection.
155    pub fn close(mut self) -> Result<(), FbError> {
156        let res = self.cleanup_and_detach();
157        mem::forget(self);
158        res
159    }
160
    /// Cleans up the statement cache and releases the database handle.
    ///
    /// Shared by [`Connection::close`] and the `Drop` implementation.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the client while detaching. A failure to roll
    /// back the default transaction is ignored.
    fn cleanup_and_detach(&mut self) -> Result<(), FbError> {
        // Statements first, while the database is still attached
        StmtCache::close_all(self);

        // Drop the default transaction; `take` leaves `None` behind, so a second
        // call finds nothing to roll back
        if let Some(mut tr) = self.def_tr.take() {
            tr.rollback(self).ok();
        }

        self.cli.detach_database(&mut self.handle)?;

        Ok(())
    }
174
175    /// Run a closure with a transaction, if the closure returns an error
176    /// and the default transaction is not active, the transaction will rollback, else it will be committed
177    pub fn with_transaction<T, F>(&mut self, closure: F) -> Result<T, FbError>
178    where
179        F: FnOnce(&mut Transaction<C>) -> Result<T, FbError>,
180    {
181        self.with_transaction_config(self.def_confs_tr, closure)
182    }
183
184    /// Run a closure with a transaction, if the closure returns an error
185    /// and the default transaction is not active, the transaction will rollback, else it will be committed
186    pub fn with_transaction_config<T, F>(
187        &mut self,
188        confs: TransactionConfiguration,
189        closure: F,
190    ) -> Result<T, FbError>
191    where
192        F: FnOnce(&mut Transaction<C>) -> Result<T, FbError>,
193    {
194        let in_transaction = self.in_transaction;
195
196        let mut tr = if let Some(tr) = self.def_tr.take() {
197            tr.into_transaction(self)
198        } else {
199            Transaction::new(self, confs)?
200        };
201
202        let res = closure(&mut tr);
203
204        if !in_transaction {
205            if res.is_ok() {
206                tr.commit_retaining()?;
207            } else {
208                tr.rollback_retaining()?;
209            }
210        }
211
212        let tr = TransactionData::from_transaction(tr);
213
214        if let Some(mut tr) = self.def_tr.replace(tr) {
215            // Should never happen, but just to be sure
216            tr.rollback(self).ok();
217        }
218
219        res
220    }
221
222    /// Run a closure with the default transaction, no rollback or commit will be automatically performed
223    /// after the closure returns. The next call to this function will use the same transaction
224    /// if it was not closed with `commit_retaining` or `rollback_retaining`
225    fn use_transaction<T, F>(
226        &mut self,
227        confs: TransactionConfiguration,
228        closure: F,
229    ) -> Result<T, FbError>
230    where
231        F: FnOnce(&mut Transaction<C>) -> Result<T, FbError>,
232    {
233        let mut tr = if let Some(tr) = self.def_tr.take() {
234            tr.into_transaction(self)
235        } else {
236            Transaction::new(self, confs)?
237        };
238
239        let res = closure(&mut tr);
240
241        let tr = TransactionData::from_transaction(tr);
242
243        if let Some(mut tr) = self.def_tr.replace(tr) {
244            // Should never happen, but just to be sure
245            tr.rollback(self).ok();
246        }
247
248        res
249    }
250
251    /// Begins a new transaction, and instructs all the `query` and `execute` methods
252    /// performed in the [`Connection`] type to not automatically commit and rollback
253    /// until [`commit`][`Connection::commit`] or [`rollback`][`Connection::rollback`] are called
254    pub fn begin_transaction(&mut self) -> Result<(), FbError> {
255        self.begin_transaction_config(self.def_confs_tr)
256    }
257
258    /// Begins a new transaction with a new transaction configuration, and instructs
259    /// all the `query` and `execute` methods performed in the [`Connection`] type to
260    /// not automatically commit and rollback until [`commit`][`Connection::commit`]
261    /// or [`rollback`][`Connection::rollback`] are called
262    pub fn begin_transaction_config(
263        &mut self,
264        custom_confs: TransactionConfiguration,
265    ) -> Result<(), FbError> {
266        self.use_transaction(custom_confs, |_| Ok(()))?;
267
268        self.in_transaction = true;
269
270        Ok(())
271    }
272
273    /// Commit the default transaction
274    pub fn commit(&mut self) -> Result<(), FbError> {
275        self.in_transaction = false;
276
277        self.use_transaction(self.def_confs_tr, |tr| tr.commit_retaining())
278    }
279
280    /// Rollback the default transaction
281    pub fn rollback(&mut self) -> Result<(), FbError> {
282        self.in_transaction = false;
283
284        self.use_transaction(self.def_confs_tr, |tr| tr.rollback_retaining())
285    }
286}
287
288impl<C: FirebirdClient> Connection<C>
289where
290    C: FirebirdClientDbEvents,
291{
292    /// Wait for an event to be posted on database
293    pub fn wait_for_event(&mut self, name: String) -> Result<(), FbError> {
294        self.cli.wait_for_event(&mut self.handle, name)?;
295
296        Ok(())
297    }
298}
299
300impl<C: FirebirdClient> Drop for Connection<C> {
301    fn drop(&mut self) {
302        // Ignore the possible error value
303        let _ = self.cleanup_and_detach();
304    }
305}
306
/// Variant of `StatementIter` that borrows the `Connection` and uses its
/// statement cache
pub struct StmtIter<'a, R, C: FirebirdClient> {
    /// Statement cache data. Wrapped in option to allow taking the value to send back to the cache
    stmt_cache_data: Option<StmtCacheData<StatementData<C>>>,

    /// Connection the statement runs on, borrowed for as long as the iterator lives
    conn: &'a mut Connection<C>,

    /// Ties the iterator to the row type `R` it yields, without storing one
    _marker: marker::PhantomData<R>,
}
316
317impl<R, C> Drop for StmtIter<'_, R, C>
318where
319    C: FirebirdClient,
320{
321    fn drop(&mut self) {
322        // Close the cursor
323        self.stmt_cache_data
324            .as_mut()
325            .unwrap()
326            .stmt
327            .close_cursor(self.conn)
328            .ok();
329
330        // Send the statement back to the cache
331        StmtCache::insert_and_close(self.conn, self.stmt_cache_data.take().unwrap()).ok();
332
333        if !self.conn.in_transaction {
334            // Commit the transaction
335            self.conn.commit().ok();
336        }
337    }
338}
339
340impl<R, C> Iterator for StmtIter<'_, R, C>
341where
342    R: FromRow,
343    C: FirebirdClient,
344{
345    type Item = Result<R, FbError>;
346
347    fn next(&mut self) -> Option<Self::Item> {
348        let stmt_cache_data = self.stmt_cache_data.as_mut().unwrap();
349
350        self.conn
351            .use_transaction(self.conn.def_confs_tr, move |tr| {
352                Ok(stmt_cache_data
353                    .stmt
354                    .fetch(tr.conn, &mut tr.data)
355                    .and_then(|row| row.map(FromRow::try_from).transpose())
356                    .transpose())
357            })
358            .unwrap_or_default()
359    }
360}
361
362impl<C> Queryable for Connection<C>
363where
364    C: FirebirdClient,
365{
366    fn query_iter<'a, P, R>(
367        &'a mut self,
368        sql: &str,
369        params: P,
370    ) -> Result<Box<dyn Iterator<Item = Result<R, FbError>> + 'a>, FbError>
371    where
372        P: IntoParams,
373        R: FromRow + 'static,
374    {
375        let stmt_cache_data = self.use_transaction(self.def_confs_tr, |tr| {
376            let params = params.to_params();
377
378            // Get a statement from the cache
379            let mut stmt_cache_data = StmtCache::get_or_prepare(tr, sql, params.named())?;
380
381            match stmt_cache_data.stmt.query(tr.conn, &mut tr.data, params) {
382                Ok(_) => Ok(stmt_cache_data),
383                Err(e) => {
384                    // Return the statement to the cache
385                    StmtCache::insert_and_close(tr.conn, stmt_cache_data)?;
386
387                    if !tr.conn.in_transaction {
388                        tr.rollback_retaining().ok();
389                    }
390
391                    Err(e)
392                }
393            }
394        })?;
395
396        let iter = StmtIter {
397            stmt_cache_data: Some(stmt_cache_data),
398            conn: self,
399            _marker: Default::default(),
400        };
401
402        Ok(Box::new(iter))
403    }
404}
405
406impl<C> Execute for Connection<C>
407where
408    C: FirebirdClient,
409{
410    fn execute<P>(&mut self, sql: &str, params: P) -> Result<usize, FbError>
411    where
412        P: IntoParams,
413    {
414        let params = params.to_params();
415
416        self.with_transaction(|tr| {
417            // Get a statement from the cache
418            let mut stmt_cache_data = StmtCache::get_or_prepare(tr, sql, params.named())?;
419
420            // Do not return now in case of error, because we need to return the statement to the cache
421            let res = stmt_cache_data.stmt.execute(tr.conn, &mut tr.data, params);
422
423            // Return the statement to the cache
424            StmtCache::insert_and_close(tr.conn, stmt_cache_data)?;
425
426            res
427        })
428    }
429
430    fn execute_returnable<P, R>(&mut self, sql: &str, params: P) -> Result<R, FbError>
431    where
432        P: IntoParams,
433        R: FromRow + 'static,
434    {
435        let params = params.to_params();
436
437        self.with_transaction(|tr| {
438            // Get a statement from the cache
439            let mut stmt_cache_data = StmtCache::get_or_prepare(tr, sql, params.named())?;
440
441            // Do not return now in case of error, because we need to return the statement to the cache
442            let res = stmt_cache_data.stmt.execute2(tr.conn, &mut tr.data, params);
443
444            // Return the statement to the cache
445            StmtCache::insert_and_close(tr.conn, stmt_cache_data)?;
446
447            let f_res = FromRow::try_from(res?)?;
448
449            Ok(f_res)
450        })
451    }
452}
453
// Integration tests for the connection.
// NOTE(review): `cbuilder()` is not defined in this file; presumably it is supplied by
// `mk_tests_default!` and yields a connection builder for the client under test — confirm
#[cfg(test)]
mk_tests_default! {
    use crate::*;

    // Connecting and then closing explicitly must both succeed
    #[test]
    fn remote_connection() -> Result<(), FbError> {
        let conn = cbuilder().connect()?;

        conn.close().expect("error closing the connection");

        Ok(())
    }

    // A parameterized single-row query must yield exactly one row with the expected value
    #[test]
    fn query_iter() -> Result<(), FbError> {
        let mut conn = cbuilder().connect()?;

        let mut rows = 0;

        for row in conn
            .query_iter("SELECT -3 FROM RDB$DATABASE WHERE 1 = ?", (1,))?
        {
            let (v,): (i32,) = row?;

            assert_eq!(v, -3);

            rows += 1;
        }

        assert_eq!(rows, 1);

        Ok(())
    }
}