1use error_context::prelude::*;
2use log::{debug, log_enabled, trace};
3use odbc::{
4 Allocated, ColumnDescriptor, Connection as OdbcConnection, DiagnosticRecord, Executed,
5 NoResult, OdbcType, Prepared, ResultSetState, Statement,
6};
7use lazy_static::lazy_static;
8use std::convert::TryFrom;
9use std::error::Error;
10use std::fmt;
11use std::fmt::Debug;
12use std::sync::Mutex;
13
14use crate::result_set::{DataAccessError, ResultSet, ResultSetError};
15use crate::row::{Settings, Configuration, DefaultConfiguration, ColumnType, UnsupportedSqlDataType, TryFromRow};
16use crate::{Odbc, OdbcError};
17use crate::stats::{self, ConnectionOpenGuard};
18
19#[derive(Debug)]
23#[allow(clippy::large_enum_variant)]
24pub enum QueryError {
25 OdbcError(OdbcError),
26 BindError(DiagnosticRecord),
27 UnsupportedSqlDataType(UnsupportedSqlDataType),
28 ResultSetError(ResultSetError),
29 DataAccessError(DataAccessError),
30}
31
32impl fmt::Display for QueryError {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 match self {
35 QueryError::OdbcError(err) => write!(f, "{}", err),
36 QueryError::BindError(_) => {
37 write!(f, "ODBC call failed while binding parameter to statement")
38 }
39 QueryError::UnsupportedSqlDataType(_) => {
40 write!(f, "query schema has unsupported data type")
41 }
42 QueryError::ResultSetError(_) => write!(f, "failed to create result set for query"),
43 QueryError::DataAccessError(_) => write!(f, "failed to access result data"),
44 }
45 }
46}
47
48impl Error for QueryError {
49 fn source(&self) -> Option<&(dyn Error + 'static)> {
50 match self {
51 QueryError::OdbcError(err) => err.source(),
52 QueryError::BindError(err) => Some(err),
53 QueryError::UnsupportedSqlDataType(err) => Some(err),
54 QueryError::ResultSetError(err) => Some(err),
55 QueryError::DataAccessError(err) => Some(err),
56 }
57 }
58}
59
60impl From<ErrorContext<DiagnosticRecord, &'static str>> for QueryError {
61 fn from(err: ErrorContext<DiagnosticRecord, &'static str>) -> QueryError {
62 QueryError::OdbcError(err.into())
63 }
64}
65
66impl From<BindError> for QueryError {
67 fn from(err: BindError) -> QueryError {
68 QueryError::BindError(err.0)
69 }
70}
71
72impl From<OdbcError> for QueryError {
73 fn from(err: OdbcError) -> QueryError {
74 QueryError::OdbcError(err)
75 }
76}
77
78impl From<UnsupportedSqlDataType> for QueryError {
79 fn from(err: UnsupportedSqlDataType) -> QueryError {
80 QueryError::UnsupportedSqlDataType(err)
81 }
82}
83
84impl From<ResultSetError> for QueryError {
85 fn from(err: ResultSetError) -> QueryError {
86 QueryError::ResultSetError(err)
87 }
88}
89
90impl From<DataAccessError> for QueryError {
91 fn from(err: DataAccessError) -> QueryError {
92 QueryError::DataAccessError(err)
93 }
94}
95
96#[derive(Debug)]
98pub struct BindError(DiagnosticRecord);
99
100impl fmt::Display for BindError {
101 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102 write!(f, "ODBC call failed while while binding parameter")
103 }
104}
105
106impl Error for BindError {
107 fn source(&self) -> Option<&(dyn Error + 'static)> {
108 Some(&self.0)
109 }
110}
111
112impl From<DiagnosticRecord> for BindError {
113 fn from(err: DiagnosticRecord) -> BindError {
114 BindError(err)
115 }
116}
117
118pub struct Binder<'h, 't, S> {
120 statement: Statement<'h, 't, S, NoResult>,
121 index: u16,
122}
123
124impl<S> fmt::Debug for Binder<'_, '_, S> {
125 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
126 fmt.debug_struct("Binder")
127 .field("index", &self.index)
128 .finish()
129 }
130}
131
132impl<'h, 't, S> Binder<'h, 't, S> {
133 pub fn bind<'new_t, T>(self, value: &'new_t T) -> Result<Binder<'h, 'new_t, S>, BindError>
134 where
135 T: OdbcType<'new_t> + Debug,
136 't: 'new_t,
137 {
138 let index = self.index + 1;
139 if log_enabled!(::log::Level::Trace) {
140 trace!("Parameter {}: {:?}", index, value);
141 }
142 let statement = self.statement.bind_parameter(index, value)?;
143
144 Ok(Binder { statement, index })
145 }
146
147 fn into_inner(self) -> Statement<'h, 't, S, NoResult> {
148 self.statement
149 }
150}
151
152impl<'h, 't, S> From<Statement<'h, 'h, S, NoResult>> for Binder<'h, 'h, S> {
153 fn from(statement: Statement<'h, 'h, S, NoResult>) -> Binder<'h, 'h, S> {
154 Binder {
155 statement,
156 index: 0,
157 }
158 }
159}
160
161pub struct PreparedStatement<'h>(Statement<'h, 'h, odbc::Prepared, odbc::NoResult>);
163
164impl<'h> fmt::Debug for PreparedStatement<'h> {
165 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
166 let mut d = f.debug_struct("PreparedStatement");
167
168 let schema = (1..=self.0.num_result_cols().map_err(|_| std::fmt::Error)?)
169 .map(|i| self.0.describe_col(i as u16))
170 .collect::<Result<Vec<ColumnDescriptor>, _>>()
171 .map_err(|_| std::fmt::Error)?;
172
173 d.field("odbc_schema", &schema);
174 d.finish()
175 }
176}
177
178impl<'h> PreparedStatement<'h> {
179 pub(crate) fn from_statement(
180 statement: Statement<'h, 'h, odbc::Prepared, odbc::NoResult>,
181 ) -> PreparedStatement<'h> {
182 PreparedStatement(statement)
183 }
184
185 pub fn schema(&self) -> Result<Vec<ColumnType>, QueryError> {
187 (1..=self.columns()?)
188 .map(|i| {
189 self.0
190 .describe_col(i as u16)
191 .wrap_error_while("getting column description")
192 .map_err(QueryError::from)
193 .and_then(|cd| ColumnType::try_from(cd).map_err(Into::into))
194 })
195 .collect::<Result<_, _>>()
196 }
197
198 pub fn columns(&self) -> Result<i16, OdbcError> {
200 Ok(self
201 .0
202 .num_result_cols()
203 .wrap_error_while("getting number of columns in prepared statement")?)
204 }
205}
206
207pub struct Connection {
209 connection: OdbcConnection<'static>,
210 settings: Settings,
211 _stats_guard: ConnectionOpenGuard,
212}
213
214unsafe impl Send for Connection {}
216
217lazy_static! {
218 static ref CONNECT_MUTEX: Mutex<()> = Mutex::new(());
219}
220
221impl fmt::Debug for Connection {
222 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
223 fmt.debug_struct("Connection")
224 .field("settings", &self.settings)
225 .finish()
226 }
227}
228
229impl Connection {
230 pub fn new(odbc: &'static Odbc, connection_string: &str) -> Result<Connection, OdbcError> {
233 Self::with_settings(odbc, connection_string, Default::default())
234 }
235
236 pub unsafe fn new_concurrent(odbc: &'static Odbc, connection_string: &str) -> Result<Connection, OdbcError> {
239 Self::with_settings_concurrent(odbc, connection_string, Default::default())
240 }
241
242 pub fn with_settings(
245 odbc: &'static Odbc,
246 connection_string: &str,
247 settings: Settings,
248 ) -> Result<Connection, OdbcError> {
249 unsafe {
250 let guard = CONNECT_MUTEX.lock().expect("Connection Mutex is poisoned!");
251 let res = Self::with_settings_concurrent(odbc, connection_string, settings);
252 drop(guard);
253 res
254 }
255 }
256
257 pub unsafe fn with_settings_concurrent(
260 odbc: &'static Odbc,
261 connection_string: &str,
262 settings: Settings,
263 ) -> Result<Connection, OdbcError> {
264 odbc.environment
265 .connect_with_connection_string(connection_string)
266 .wrap_error_while("connecting to database")
267 .map_err(Into::into)
268 .map(|connection| {
269 Connection {
270 connection,
271 settings,
272 _stats_guard: ConnectionOpenGuard::new(),
273 }
274 })
275 }
276}
277
278#[derive(Debug)]
285pub struct Handle<'c, C: Configuration = DefaultConfiguration> {
286 connection: &'c Connection,
287 configuration: C,
288}
289
290impl<'c: 'c> Connection {
291 pub fn handle(&'c mut self) -> Handle<'c, DefaultConfiguration> {
292 Handle {
293 connection: self,
294 configuration: DefaultConfiguration,
295 }
296 }
297
298 pub fn handle_with_configuration<C: Configuration>(&'c mut self, configuration: C) -> Handle<'c, C> {
299 Handle {
300 connection: self,
301 configuration,
302 }
303 }
304}
305
306impl<'h, 'c: 'h, C: Configuration> Handle<'c, C> {
307 pub fn with_configuration<CNew: Configuration>(&mut self, configuration: CNew) -> Handle<'c, CNew> {
308 Handle {
309 connection: self.connection,
310 configuration,
311 }
312 }
313
314 fn statement(&'h self) -> Result<Statement<'c, 'c, Allocated, NoResult>, OdbcError> {
315 Statement::with_parent(&self.connection.connection)
316 .wrap_error_while("pairing statement with connection")
317 .map_err(Into::into)
318 }
319
320 pub fn tables<'i, V>(
323 &'h mut self,
324 catalog: &'i str,
325 schema: Option<&'i str>,
326 table: Option<&'i str>,
327 table_type: Option<&'i str>,
328 ) -> Result<ResultSet<'h, 'c, V, Executed, C>, QueryError>
329 where
330 V: TryFromRow<C>,
331 {
332 debug!("Getting ODBC tables");
333 let statement = self.statement()?;
334
335 let (result_set, stats_guard): (ResultSetState<'c, 'c, Allocated>, _) = stats::query_execution(move || {
336 statement
337 .tables_str(
338 catalog,
339 schema.unwrap_or(""),
340 table.unwrap_or(""),
341 table_type.unwrap_or(""),
342 )
343 .wrap_error_while("executing direct statement")
344 .map(ResultSetState::Data)
345 })?;
346
347 Ok(ResultSet::from_result(
348 self,
349 result_set,
350 stats_guard,
351 &self.connection.settings,
352 self.configuration.clone(),
353 )?)
354 }
355
356 pub fn prepare(&'h mut self, query: &str) -> Result<PreparedStatement<'c>, OdbcError> {
359 debug!("Preparing ODBC query: {}", &query);
360
361 let statement = stats::query_preparing(|| -> Result<_, OdbcError> {
362 Ok(self
363 .statement()?
364 .prepare(query)
365 .wrap_error_while("preparing query")?)
366 })?;
367
368 Ok(PreparedStatement(statement))
369 }
370
371 pub fn query<V>(&'h mut self, query: &str) -> Result<ResultSet<'h, 'c, V, Executed, C>, QueryError>
373 where
374 V: TryFromRow<C>,
375 {
376 self.query_with_parameters(query, Ok)
377 }
378
379 pub fn query_with_parameters<'t, V, F>(
382 &'h mut self,
383 query: &str,
384 bind: F,
385 ) -> Result<ResultSet<'h, 'c, V, Executed, C>, QueryError>
386 where
387 V: TryFromRow<C>,
388 F: FnOnce(Binder<'c, 'c, Allocated>) -> Result<Binder<'c, 't, Allocated>, BindError>,
389 {
390 debug!("Direct ODBC query: {}", &query);
391
392 let statement = stats::query_preparing(|| -> Result<_, QueryError> {
393 Ok(bind(self.statement()?.into())?.into_inner())
395 })?;
396
397 let (result_set, stats_guard) = stats::query_execution(move || {
398 statement
399 .exec_direct(query)
400 .wrap_error_while("executing direct statement")
401 })?;
402
403 Ok(ResultSet::from_result(
404 self,
405 result_set,
406 stats_guard,
407 &self.connection.settings,
408 self.configuration.clone(),
409 )?)
410 }
411
412 pub fn execute<V>(
414 &'h mut self,
415 statement: PreparedStatement<'c>,
416 ) -> Result<ResultSet<'h, 'c, V, Prepared, C>, QueryError>
417 where
418 V: TryFromRow<C>,
419 {
420 self.execute_with_parameters(statement, Ok)
421 }
422
423 pub fn execute_with_parameters<'t, V, F>(
425 &'h mut self,
426 statement: PreparedStatement<'c>,
427 bind: F,
428 ) -> Result<ResultSet<'h, 'c, V, Prepared, C>, QueryError>
429 where
430 V: TryFromRow<C>,
431 F: FnOnce(Binder<'c, 'c, Prepared>) -> Result<Binder<'c, 't, Prepared>, BindError>,
432 {
433 let statement = stats::query_preparing(|| -> Result<_, QueryError> {
434 Ok(bind(statement.0.into())?.into_inner())
435 })?;
436
437 let (result_set, stats_guard) = stats::query_execution(move || {
438 statement
439 .execute()
440 .wrap_error_while("executing statement")
441 })?;
442
443 Ok(ResultSet::from_result(
444 self,
445 result_set,
446 stats_guard,
447 &self.connection.settings,
448 self.configuration.clone(),
449 )?)
450 }
451
452 pub fn start_transaction(&mut self) -> Result<(), QueryError> {
454 self.with_configuration(DefaultConfiguration).query::<()>("START TRANSACTION")?.no_result().unwrap();
455 Ok(())
456 }
457
458 pub fn commit(&mut self) -> Result<(), QueryError> {
460 self.with_configuration(DefaultConfiguration).query::<()>("COMMIT")?.no_result().unwrap();
461 Ok(())
462 }
463
464 pub fn rollback(&mut self) -> Result<(), QueryError> {
466 self.with_configuration(DefaultConfiguration).query::<()>("ROLLBACK")?.no_result().unwrap();
467 Ok(())
468 }
469
470 pub fn in_transaction<O, E>(
473 &mut self,
474 f: impl FnOnce(&mut Handle<'c, C>) -> Result<O, E>,
475 ) -> Result<Result<O, E>, QueryError> {
476 self.start_transaction()?;
477 Ok(match f(self) {
478 ok @ Ok(_) => {
479 self.commit()?;
480 ok
481 }
482 err @ Err(_) => {
483 self.rollback()?;
484 err
485 }
486 })
487 }
488
489 pub fn outside_of_transaction<O>(
492 &mut self,
493 f: impl FnOnce(&mut Handle<'c, C>) -> O,
494 ) -> Result<O, QueryError> {
495 self.commit()?;
496 let ret = f(self);
497 self.start_transaction()?;
498 Ok(ret)
499 }
500}