odbc_iter/
query.rs

1use error_context::prelude::*;
2use log::{debug, log_enabled, trace};
3use odbc::{
4    Allocated, ColumnDescriptor, Connection as OdbcConnection, DiagnosticRecord, Executed,
5    NoResult, OdbcType, Prepared, ResultSetState, Statement,
6};
7use lazy_static::lazy_static;
8use std::convert::TryFrom;
9use std::error::Error;
10use std::fmt;
11use std::fmt::Debug;
12use std::sync::Mutex;
13
14use crate::result_set::{DataAccessError, ResultSet, ResultSetError};
15use crate::row::{Settings, Configuration, DefaultConfiguration, ColumnType, UnsupportedSqlDataType, TryFromRow};
16use crate::{Odbc, OdbcError};
17use crate::stats::{self, ConnectionOpenGuard};
18
19/// Errors related to execution of queries.
20///
21/// `OdbcError` and `DataAccessError` can be converted into `QueryError`.
22#[derive(Debug)]
23#[allow(clippy::large_enum_variant)]
24pub enum QueryError {
25    OdbcError(OdbcError),
26    BindError(DiagnosticRecord),
27    UnsupportedSqlDataType(UnsupportedSqlDataType),
28    ResultSetError(ResultSetError),
29    DataAccessError(DataAccessError),
30}
31
32impl fmt::Display for QueryError {
33    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34        match self {
35            QueryError::OdbcError(err) => write!(f, "{}", err),
36            QueryError::BindError(_) => {
37                write!(f, "ODBC call failed while binding parameter to statement")
38            }
39            QueryError::UnsupportedSqlDataType(_) => {
40                write!(f, "query schema has unsupported data type")
41            }
42            QueryError::ResultSetError(_) => write!(f, "failed to create result set for query"),
43            QueryError::DataAccessError(_) => write!(f, "failed to access result data"),
44        }
45    }
46}
47
48impl Error for QueryError {
49    fn source(&self) -> Option<&(dyn Error + 'static)> {
50        match self {
51            QueryError::OdbcError(err) => err.source(),
52            QueryError::BindError(err) => Some(err),
53            QueryError::UnsupportedSqlDataType(err) => Some(err),
54            QueryError::ResultSetError(err) => Some(err),
55            QueryError::DataAccessError(err) => Some(err),
56        }
57    }
58}
59
60impl From<ErrorContext<DiagnosticRecord, &'static str>> for QueryError {
61    fn from(err: ErrorContext<DiagnosticRecord, &'static str>) -> QueryError {
62        QueryError::OdbcError(err.into())
63    }
64}
65
66impl From<BindError> for QueryError {
67    fn from(err: BindError) -> QueryError {
68        QueryError::BindError(err.0)
69    }
70}
71
72impl From<OdbcError> for QueryError {
73    fn from(err: OdbcError) -> QueryError {
74        QueryError::OdbcError(err)
75    }
76}
77
78impl From<UnsupportedSqlDataType> for QueryError {
79    fn from(err: UnsupportedSqlDataType) -> QueryError {
80        QueryError::UnsupportedSqlDataType(err)
81    }
82}
83
84impl From<ResultSetError> for QueryError {
85    fn from(err: ResultSetError) -> QueryError {
86        QueryError::ResultSetError(err)
87    }
88}
89
90impl From<DataAccessError> for QueryError {
91    fn from(err: DataAccessError) -> QueryError {
92        QueryError::DataAccessError(err)
93    }
94}
95
96/// Error that can happen when binding values to parametrized queries.
97#[derive(Debug)]
98pub struct BindError(DiagnosticRecord);
99
100impl fmt::Display for BindError {
101    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102        write!(f, "ODBC call failed while while binding parameter")
103    }
104}
105
106impl Error for BindError {
107    fn source(&self) -> Option<&(dyn Error + 'static)> {
108        Some(&self.0)
109    }
110}
111
112impl From<DiagnosticRecord> for BindError {
113    fn from(err: DiagnosticRecord) -> BindError {
114        BindError(err)
115    }
116}
117
118/// Controls binding of parametrized query values.
119pub struct Binder<'h, 't, S> {
120    statement: Statement<'h, 't, S, NoResult>,
121    index: u16,
122}
123
124impl<S> fmt::Debug for Binder<'_, '_, S> {
125    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
126        fmt.debug_struct("Binder")
127            .field("index", &self.index)
128            .finish()
129    }
130}
131
132impl<'h, 't, S> Binder<'h, 't, S> {
133    pub fn bind<'new_t, T>(self, value: &'new_t T) -> Result<Binder<'h, 'new_t, S>, BindError>
134    where
135        T: OdbcType<'new_t> + Debug,
136        't: 'new_t,
137    {
138        let index = self.index + 1;
139        if log_enabled!(::log::Level::Trace) {
140            trace!("Parameter {}: {:?}", index, value);
141        }
142        let statement = self.statement.bind_parameter(index, value)?;
143
144        Ok(Binder { statement, index })
145    }
146
147    fn into_inner(self) -> Statement<'h, 't, S, NoResult> {
148        self.statement
149    }
150}
151
152impl<'h, 't, S> From<Statement<'h, 'h, S, NoResult>> for Binder<'h, 'h, S> {
153    fn from(statement: Statement<'h, 'h, S, NoResult>) -> Binder<'h, 'h, S> {
154        Binder {
155            statement,
156            index: 0,
157        }
158    }
159}
160
161/// ODBC prepared statement.
162pub struct PreparedStatement<'h>(Statement<'h, 'h, odbc::Prepared, odbc::NoResult>);
163
164impl<'h> fmt::Debug for PreparedStatement<'h> {
165    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
166        let mut d = f.debug_struct("PreparedStatement");
167
168        let schema = (1..=self.0.num_result_cols().map_err(|_| std::fmt::Error)?)
169            .map(|i| self.0.describe_col(i as u16))
170            .collect::<Result<Vec<ColumnDescriptor>, _>>()
171            .map_err(|_| std::fmt::Error)?;
172
173        d.field("odbc_schema", &schema);
174        d.finish()
175    }
176}
177
178impl<'h> PreparedStatement<'h> {
179    pub(crate) fn from_statement(
180        statement: Statement<'h, 'h, odbc::Prepared, odbc::NoResult>,
181    ) -> PreparedStatement<'h> {
182        PreparedStatement(statement)
183    }
184
185    /// Query schema information deduced from prepared statement SQL text.
186    pub fn schema(&self) -> Result<Vec<ColumnType>, QueryError> {
187        (1..=self.columns()?)
188            .map(|i| {
189                self.0
190                    .describe_col(i as u16)
191                    .wrap_error_while("getting column description")
192                    .map_err(QueryError::from)
193                    .and_then(|cd| ColumnType::try_from(cd).map_err(Into::into))
194            })
195            .collect::<Result<_, _>>()
196    }
197
198    /// Query number of columns that would be returned by execution of this prepared statement.
199    pub fn columns(&self) -> Result<i16, OdbcError> {
200        Ok(self
201            .0
202            .num_result_cols()
203            .wrap_error_while("getting number of columns in prepared statement")?)
204    }
205}
206
207/// Database connection.
208pub struct Connection {
209    connection: OdbcConnection<'static>,
210    settings: Settings,
211    _stats_guard: ConnectionOpenGuard,
212}
213
214/// Assuming drivers support sending Connection between threads.
215unsafe impl Send for Connection {}
216
217lazy_static! {
218    static ref CONNECT_MUTEX: Mutex<()> = Mutex::new(());
219}
220
221impl fmt::Debug for Connection {
222    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
223        fmt.debug_struct("Connection")
224            .field("settings", &self.settings)
225            .finish()
226    }
227}
228
229impl Connection {
230    /// Connect to database using connection string with default configuration options.
231    /// This implementation will synchronize driver connect calls.
232    pub fn new(odbc: &'static Odbc, connection_string: &str) -> Result<Connection, OdbcError> {
233        Self::with_settings(odbc, connection_string, Default::default())
234    }
235
236    /// Connect to database using connection string with default configuration options.
237    /// Assume that driver connect call is thread safe.
238    pub unsafe fn new_concurrent(odbc: &'static Odbc, connection_string: &str) -> Result<Connection, OdbcError> {
239        Self::with_settings_concurrent(odbc, connection_string, Default::default())
240    }
241
242    /// Connect to database using connection string with configuration options.
243    /// This implementation will synchronize driver connect calls.
244    pub fn with_settings(
245        odbc: &'static Odbc,
246        connection_string: &str,
247        settings: Settings,
248    ) -> Result<Connection, OdbcError> {
249        unsafe {
250            let guard = CONNECT_MUTEX.lock().expect("Connection Mutex is poisoned!");
251            let res = Self::with_settings_concurrent(odbc, connection_string, settings);
252            drop(guard);
253            res
254        }
255    }
256
257    /// Connect to database using connection string with configuration options.
258    /// Assume that driver connect call is thread safe.
259    pub unsafe fn with_settings_concurrent(
260        odbc: &'static Odbc,
261        connection_string: &str,
262        settings: Settings,
263    ) -> Result<Connection, OdbcError> {
264        odbc.environment
265            .connect_with_connection_string(connection_string)
266            .wrap_error_while("connecting to database")
267            .map_err(Into::into)
268            .map(|connection| {
269                Connection {
270                    connection,
271                    settings,
272                    _stats_guard: ConnectionOpenGuard::new(),
273                }
274            })
275    }
276}
277
278/// Statically ensures that `Connection` can only be used after `ResultSet` was consumed to avoid runtime
279/// errors.
280///
281/// Operations on `Connection` are only allowed after `ResultSet` was dropped.
282/// Allocated `PreparedStatement` objects reference `Connection` directly so `Handle` can be still used to
283/// query or allocate more `PreparedStatement` objects.
284#[derive(Debug)]
285pub struct Handle<'c, C: Configuration = DefaultConfiguration> {
286    connection: &'c Connection,
287    configuration: C,
288}
289
290impl<'c: 'c> Connection {
291    pub fn handle(&'c mut self) -> Handle<'c, DefaultConfiguration> {
292        Handle {
293            connection: self,
294            configuration: DefaultConfiguration,
295        }
296    }
297
298    pub fn handle_with_configuration<C: Configuration>(&'c mut self, configuration: C) -> Handle<'c, C> {
299        Handle {
300            connection: self,
301            configuration,
302        }
303    }
304}
305
306impl<'h, 'c: 'h, C: Configuration> Handle<'c, C> {
307    pub fn with_configuration<CNew: Configuration>(&mut self, configuration: CNew) -> Handle<'c, CNew> {
308        Handle {
309            connection: self.connection,
310            configuration,
311        }
312    }
313
314    fn statement(&'h self) -> Result<Statement<'c, 'c, Allocated, NoResult>, OdbcError> {
315        Statement::with_parent(&self.connection.connection)
316            .wrap_error_while("pairing statement with connection")
317            .map_err(Into::into)
318    }
319
320    /// Query list of tables from given catalog.
321    /// Optionally result can be filtered by table name and table type.
322    pub fn tables<'i, V>(
323        &'h mut self,
324        catalog: &'i str,
325        schema: Option<&'i str>,
326        table: Option<&'i str>,
327        table_type: Option<&'i str>,
328    ) -> Result<ResultSet<'h, 'c, V, Executed, C>, QueryError>
329    where
330        V: TryFromRow<C>,
331    {
332        debug!("Getting ODBC tables");
333        let statement = self.statement()?;
334
335        let (result_set, stats_guard): (ResultSetState<'c, 'c, Allocated>, _) = stats::query_execution(move || {
336            statement
337                .tables_str(
338                    catalog,
339                    schema.unwrap_or(""),
340                    table.unwrap_or(""),
341                    table_type.unwrap_or(""),
342                )
343                .wrap_error_while("executing direct statement")
344                .map(ResultSetState::Data)
345        })?;
346
347        Ok(ResultSet::from_result(
348            self,
349            result_set,
350            stats_guard,
351            &self.connection.settings,
352            self.configuration.clone(),
353        )?)
354    }
355
356    /// Prepare statement for fast execution and parametrization.
357    /// For one-off queries it is more efficient to use `query()` function.
358    pub fn prepare(&'h mut self, query: &str) -> Result<PreparedStatement<'c>, OdbcError> {
359        debug!("Preparing ODBC query: {}", &query);
360
361        let statement = stats::query_preparing(|| -> Result<_, OdbcError> {
362            Ok(self
363                .statement()?
364                .prepare(query)
365                .wrap_error_while("preparing query")?)
366        })?;
367
368        Ok(PreparedStatement(statement))
369    }
370
371    /// Execute one-off query.
372    pub fn query<V>(&'h mut self, query: &str) -> Result<ResultSet<'h, 'c, V, Executed, C>, QueryError>
373    where
374        V: TryFromRow<C>,
375    {
376        self.query_with_parameters(query, Ok)
377    }
378
379    /// Execute one-off query with parameters.
380    /// This creates prepared statement and binds values to it before execution.
381    pub fn query_with_parameters<'t, V, F>(
382        &'h mut self,
383        query: &str,
384        bind: F,
385    ) -> Result<ResultSet<'h, 'c, V, Executed, C>, QueryError>
386    where
387        V: TryFromRow<C>,
388        F: FnOnce(Binder<'c, 'c, Allocated>) -> Result<Binder<'c, 't, Allocated>, BindError>,
389    {
390        debug!("Direct ODBC query: {}", &query);
391
392        let statement = stats::query_preparing(|| -> Result<_, QueryError> {
393            //TODO: this take a long time potentially; can I reuse one for all direct queries?
394            Ok(bind(self.statement()?.into())?.into_inner())
395        })?;
396
397        let (result_set, stats_guard) = stats::query_execution(move || {
398            statement
399                .exec_direct(query)
400                .wrap_error_while("executing direct statement")
401        })?;
402
403        Ok(ResultSet::from_result(
404            self,
405            result_set,
406            stats_guard,
407            &self.connection.settings,
408            self.configuration.clone(),
409        )?)
410    }
411
412    /// Execute prepared statement without parameters.
413    pub fn execute<V>(
414        &'h mut self,
415        statement: PreparedStatement<'c>,
416    ) -> Result<ResultSet<'h, 'c, V, Prepared, C>, QueryError>
417    where
418        V: TryFromRow<C>,
419    {
420        self.execute_with_parameters(statement, Ok)
421    }
422
423    /// Bind parameters and execute prepared statement.
424    pub fn execute_with_parameters<'t, V, F>(
425        &'h mut self,
426        statement: PreparedStatement<'c>,
427        bind: F,
428    ) -> Result<ResultSet<'h, 'c, V, Prepared, C>, QueryError>
429    where
430        V: TryFromRow<C>,
431        F: FnOnce(Binder<'c, 'c, Prepared>) -> Result<Binder<'c, 't, Prepared>, BindError>,
432    {
433        let statement = stats::query_preparing(|| -> Result<_, QueryError> {
434            Ok(bind(statement.0.into())?.into_inner())
435        })?;
436
437        let (result_set, stats_guard) = stats::query_execution(move || {
438            statement
439                .execute()
440                .wrap_error_while("executing statement")
441        })?;
442
443        Ok(ResultSet::from_result(
444            self,
445            result_set,
446            stats_guard,
447            &self.connection.settings,
448            self.configuration.clone(),
449        )?)
450    }
451
452    /// Calls "START TRANSACTION"
453    pub fn start_transaction(&mut self) -> Result<(), QueryError> {
454        self.with_configuration(DefaultConfiguration).query::<()>("START TRANSACTION")?.no_result().unwrap();
455        Ok(())
456    }
457
458    /// Calls "COMMIT"
459    pub fn commit(&mut self) -> Result<(), QueryError> {
460        self.with_configuration(DefaultConfiguration).query::<()>("COMMIT")?.no_result().unwrap();
461        Ok(())
462    }
463
464    /// Calls "ROLLBACK"
465    pub fn rollback(&mut self) -> Result<(), QueryError> {
466        self.with_configuration(DefaultConfiguration).query::<()>("ROLLBACK")?.no_result().unwrap();
467        Ok(())
468    }
469
470    /// Call function in transaction.
471    /// If function returns Err the transaction will be rolled back otherwise committed.
472    pub fn in_transaction<O, E>(
473        &mut self,
474        f: impl FnOnce(&mut Handle<'c, C>) -> Result<O, E>,
475    ) -> Result<Result<O, E>, QueryError> {
476        self.start_transaction()?;
477        Ok(match f(self) {
478            ok @ Ok(_) => {
479                self.commit()?;
480                ok
481            }
482            err @ Err(_) => {
483                self.rollback()?;
484                err
485            }
486        })
487    }
488
489    /// Commit current transaction, run function and start new transaction.
490    /// This is useful when you need to do changes with auto-commit (for example change schema) while in open transaction already.
491    pub fn outside_of_transaction<O>(
492        &mut self,
493        f: impl FnOnce(&mut Handle<'c, C>) -> O,
494    ) -> Result<O, QueryError> {
495        self.commit()?;
496        let ret = f(self);
497        self.start_transaction()?;
498        Ok(ret)
499    }
500}