sqlx_core_oldapi/odbc/connection/
mod.rs

1use crate::connection::Connection;
2use crate::error::Error;
3use crate::odbc::{
4    Odbc, OdbcArguments, OdbcBufferSettings, OdbcColumn, OdbcConnectOptions, OdbcQueryResult,
5    OdbcRow, OdbcTypeInfo,
6};
7use crate::transaction::Transaction;
8use either::Either;
9use odbc_api::handles::AsStatementRef;
10use sqlx_rt::spawn_blocking;
11mod odbc_bridge;
12use crate::odbc::{OdbcStatement, OdbcStatementMetadata};
13use futures_core::future::BoxFuture;
14use futures_util::future;
15use odbc_api::ConnectionTransitions;
16use odbc_api::{handles::StatementConnection, Prepared, ResultSetMetadata, SharedConnection};
17use odbc_bridge::{establish_connection, execute_sql};
18use std::borrow::Cow;
19use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21
22mod executor;
23
24type PreparedStatement = Prepared<StatementConnection<SharedConnection<'static>>>;
25type SharedPreparedStatement = Arc<Mutex<PreparedStatement>>;
26
27fn collect_columns(prepared: &mut PreparedStatement) -> Result<Vec<OdbcColumn>, Error> {
28    let count = prepared.num_result_cols()?;
29    let mut columns = Vec::with_capacity(count as usize);
30    for i in 1..=count {
31        columns.push(create_column(prepared, i as u16)?);
32    }
33    Ok(columns)
34}
35
36fn create_column(stmt: &mut PreparedStatement, index: u16) -> Result<OdbcColumn, Error> {
37    let mut cd = odbc_api::ColumnDescription::default();
38    stmt.describe_col(index, &mut cd)?;
39
40    Ok(OdbcColumn {
41        name: decode_column_name(cd.name, index),
42        type_info: OdbcTypeInfo::new(cd.data_type),
43        ordinal: usize::from(index.checked_sub(1).unwrap()),
44    })
45}
46
/// Conversion of a raw column-name buffer into a `String`, with a positional fallback.
pub(super) trait ColumnNameDecode {
    /// Consumes the buffer and returns it as text; `index` is the column position an
    /// implementation can use to derive a substitute name when the buffer cannot be decoded.
    fn decode_or_default(self, index: u16) -> String;
}
50
51impl ColumnNameDecode for Vec<u8> {
52    fn decode_or_default(self, index: u16) -> String {
53        String::from_utf8(self).unwrap_or_else(|_| format!("col{}", index - 1))
54    }
55}
56
57impl ColumnNameDecode for Vec<u16> {
58    fn decode_or_default(self, index: u16) -> String {
59        String::from_utf16(&self).unwrap_or_else(|_| format!("col{}", index - 1))
60    }
61}
62
63pub(super) fn decode_column_name<T: ColumnNameDecode>(name: T, index: u16) -> String {
64    name.decode_or_default(index)
65}
66
/// A connection to an ODBC-accessible database.
///
/// ODBC uses a blocking C API, so we offload blocking calls to the runtime's blocking
/// thread-pool via `spawn_blocking` and synchronize access with a mutex.
pub struct OdbcConnection {
    /// Shared, mutex-guarded handle to the underlying ODBC connection; a clone of this handle
    /// is moved into each blocking task.
    pub(crate) conn: SharedConnection<'static>,
    /// Prepared statements keyed by their SQL text; filled by `prepare` and consulted by
    /// `execute_stream`.
    pub(crate) stmt_cache: HashMap<Arc<str>, SharedPreparedStatement>,
    /// Buffer settings copied from the connect options and passed along on every execution.
    pub(crate) buffer_settings: OdbcBufferSettings,
}
76
77impl std::fmt::Debug for OdbcConnection {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("OdbcConnection")
80            .field("conn", &self.conn)
81            .field("buffer_settings", &self.buffer_settings)
82            .finish()
83    }
84}
85
86impl OdbcConnection {
87    pub(crate) async fn with_conn<R, F, S>(&mut self, operation: S, f: F) -> Result<R, Error>
88    where
89        R: Send + 'static,
90        F: FnOnce(&mut odbc_api::Connection<'static>) -> Result<R, Error> + Send + 'static,
91        S: std::fmt::Display + Send + 'static,
92    {
93        let conn = Arc::clone(&self.conn);
94        spawn_blocking(move || {
95            let mut conn_guard = conn.lock().map_err(|_| {
96                Error::Protocol(format!("ODBC {}: failed to lock connection", operation))
97            })?;
98            f(&mut conn_guard)
99        })
100        .await
101    }
102
103    pub(crate) async fn establish(options: &OdbcConnectOptions) -> Result<Self, Error> {
104        let shared_conn = spawn_blocking({
105            let options = options.clone();
106            move || {
107                let conn = establish_connection(&options)?;
108                let shared_conn = odbc_api::SharedConnection::new(std::sync::Mutex::new(conn));
109                Ok::<_, Error>(shared_conn)
110            }
111        })
112        .await?;
113
114        Ok(Self {
115            conn: shared_conn,
116            stmt_cache: HashMap::new(),
117            buffer_settings: options.buffer_settings,
118        })
119    }
120
121    // (dbms_name moved to the Connection trait implementation)
122
123    pub(crate) async fn ping_blocking(&mut self) -> Result<(), Error> {
124        self.with_conn("ping", move |conn| {
125            conn.execute("SELECT 1", (), None)?;
126            Ok(())
127        })
128        .await
129    }
130
131    pub(crate) async fn begin_blocking(&mut self) -> Result<(), Error> {
132        self.with_conn("begin", move |conn| {
133            conn.set_autocommit(false)?;
134            Ok(())
135        })
136        .await
137    }
138
139    pub(crate) async fn commit_blocking(&mut self) -> Result<(), Error> {
140        self.with_conn("commit", move |conn| {
141            conn.commit()?;
142            conn.set_autocommit(true)?;
143            Ok(())
144        })
145        .await
146    }
147
148    pub(crate) async fn rollback_blocking(&mut self) -> Result<(), Error> {
149        self.with_conn("rollback", move |conn| {
150            conn.rollback()?;
151            conn.set_autocommit(true)?;
152            Ok(())
153        })
154        .await
155    }
156
157    /// Launches a background task to execute the SQL statement and send the results to the returned channel.
158    pub(crate) fn execute_stream(
159        &mut self,
160        sql: &str,
161        args: Option<OdbcArguments>,
162    ) -> flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>> {
163        let (tx, rx) = flume::bounded(64);
164
165        let maybe_prepared = if let Some(prepared) = self.stmt_cache.get(sql) {
166            MaybePrepared::Prepared(Arc::clone(prepared))
167        } else {
168            MaybePrepared::NotPrepared(sql.to_string())
169        };
170
171        let conn = Arc::clone(&self.conn);
172        let buffer_settings = self.buffer_settings;
173        sqlx_rt::spawn(sqlx_rt::spawn_blocking(move || {
174            let mut conn = conn.lock().expect("failed to lock connection");
175            if let Err(e) = execute_sql(&mut conn, maybe_prepared, args, &tx, buffer_settings) {
176                let _ = tx.send(Err(e));
177            }
178        }));
179
180        rx
181    }
182
183    pub(crate) async fn clear_cached_statements(&mut self) -> Result<(), Error> {
184        // Clear the statement metadata cache
185        self.stmt_cache.clear();
186        Ok(())
187    }
188
189    pub async fn prepare<'a>(&mut self, sql: &'a str) -> Result<OdbcStatement<'a>, Error> {
190        let conn = Arc::clone(&self.conn);
191        let sql_arc = Arc::from(sql.to_string());
192        let sql_clone = Arc::clone(&sql_arc);
193        let (prepared, metadata) = spawn_blocking(move || {
194            let mut prepared = conn.into_prepared(&sql_clone)?;
195            let metadata = OdbcStatementMetadata {
196                columns: collect_columns(&mut prepared)?,
197                parameters: usize::from(prepared.num_params()?),
198            };
199            Ok::<_, Error>((prepared, metadata))
200        })
201        .await?;
202        self.stmt_cache
203            .insert(Arc::clone(&sql_arc), Arc::new(Mutex::new(prepared)));
204        Ok(OdbcStatement {
205            sql: Cow::Borrowed(sql),
206            metadata,
207        })
208    }
209}
210
/// The statement handed to the executor: either a cached prepared statement or raw SQL text.
pub(crate) enum MaybePrepared {
    /// A prepared statement taken from the connection's statement cache.
    Prepared(SharedPreparedStatement),
    /// SQL text for which no cached prepared statement was found.
    NotPrepared(String),
}
215
216impl std::fmt::Debug for MaybePrepared {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        match self {
219            MaybePrepared::Prepared(prepared) => f
220                .debug_tuple("Prepared")
221                .field(&prepared.lock().unwrap().as_stmt_ref())
222                .finish(),
223            MaybePrepared::NotPrepared(sql) => f.debug_tuple("NotPrepared").field(sql).finish(),
224        }
225    }
226}
227
228impl Connection for OdbcConnection {
229    type Database = Odbc;
230
231    type Options = OdbcConnectOptions;
232
233    fn close(self) -> BoxFuture<'static, Result<(), Error>> {
234        Box::pin(async move {
235            // Drop connection by moving Arc and letting it fall out of scope.
236            drop(self);
237            Ok(())
238        })
239    }
240
241    fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
242        Box::pin(async move { Ok(()) })
243    }
244
245    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
246        Box::pin(self.ping_blocking())
247    }
248
249    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
250    where
251        Self: Sized,
252    {
253        Transaction::begin(self)
254    }
255
256    #[doc(hidden)]
257    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
258        Box::pin(future::ok(()))
259    }
260
261    #[doc(hidden)]
262    fn should_flush(&self) -> bool {
263        false
264    }
265
266    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
267        Box::pin(self.clear_cached_statements())
268    }
269
270    fn dbms_name(&mut self) -> BoxFuture<'_, Result<String, Error>> {
271        Box::pin(async move {
272            self.with_conn("dbms_name", move |conn| {
273                Ok(conn.database_management_system_name()?)
274            })
275            .await
276        })
277    }
278}
279
280// moved helpers to connection/inner.rs