sqlx_core_oldapi/odbc/connection/
mod.rs

1use crate::connection::Connection;
2use crate::error::Error;
3use crate::odbc::{
4    Odbc, OdbcArguments, OdbcColumn, OdbcConnectOptions, OdbcQueryResult, OdbcRow, OdbcTypeInfo,
5};
6use crate::transaction::Transaction;
7use either::Either;
8use sqlx_rt::spawn_blocking;
9mod odbc_bridge;
10use crate::odbc::{OdbcStatement, OdbcStatementMetadata};
11use futures_core::future::BoxFuture;
12use futures_util::future;
13use odbc_api::ConnectionTransitions;
14use odbc_api::{handles::StatementConnection, Prepared, ResultSetMetadata, SharedConnection};
15use odbc_bridge::{establish_connection, execute_sql};
16use std::borrow::Cow;
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex};
19
20mod executor;
21
22type PreparedStatement = Prepared<StatementConnection<SharedConnection<'static>>>;
23type SharedPreparedStatement = Arc<Mutex<PreparedStatement>>;
24
25fn collect_columns(prepared: &mut PreparedStatement) -> Vec<OdbcColumn> {
26    let count = prepared.num_result_cols().unwrap_or(0);
27    (1..=count)
28        .map(|i| create_column(prepared, i as u16))
29        .collect()
30}
31
32fn create_column(stmt: &mut PreparedStatement, index: u16) -> OdbcColumn {
33    let mut cd = odbc_api::ColumnDescription::default();
34    let _ = stmt.describe_col(index, &mut cd);
35
36    OdbcColumn {
37        name: decode_column_name(cd.name, index),
38        type_info: OdbcTypeInfo::new(cd.data_type),
39        ordinal: usize::from(index.checked_sub(1).unwrap()),
40    }
41}
42
43pub(super) trait ColumnNameDecode {
44    fn decode_or_default(self, index: u16) -> String;
45}
46
47impl ColumnNameDecode for Vec<u8> {
48    fn decode_or_default(self, index: u16) -> String {
49        String::from_utf8(self).unwrap_or_else(|_| format!("col{}", index - 1))
50    }
51}
52
53impl ColumnNameDecode for Vec<u16> {
54    fn decode_or_default(self, index: u16) -> String {
55        String::from_utf16(&self).unwrap_or_else(|_| format!("col{}", index - 1))
56    }
57}
58
59pub(super) fn decode_column_name<T: ColumnNameDecode>(name: T, index: u16) -> String {
60    name.decode_or_default(index)
61}
62
63/// A connection to an ODBC-accessible database.
64///
65/// ODBC uses a blocking C API, so we offload blocking calls to the runtime's blocking
66/// thread-pool via `spawn_blocking` and synchronize access with a mutex.
67pub struct OdbcConnection {
68    pub(crate) conn: SharedConnection<'static>,
69    pub(crate) stmt_cache: HashMap<Arc<str>, SharedPreparedStatement>,
70}
71
72impl std::fmt::Debug for OdbcConnection {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("OdbcConnection")
75            .field("conn", &self.conn)
76            .finish()
77    }
78}
79
80impl OdbcConnection {
81    pub(crate) async fn with_conn<R, F, S>(&mut self, operation: S, f: F) -> Result<R, Error>
82    where
83        R: Send + 'static,
84        F: FnOnce(&mut odbc_api::Connection<'static>) -> Result<R, Error> + Send + 'static,
85        S: std::fmt::Display + Send + 'static,
86    {
87        let conn = Arc::clone(&self.conn);
88        spawn_blocking(move || {
89            let mut conn_guard = conn.lock().map_err(|_| {
90                Error::Protocol(format!("ODBC {}: failed to lock connection", operation))
91            })?;
92            f(&mut conn_guard)
93        })
94        .await
95    }
96
97    pub(crate) async fn establish(options: &OdbcConnectOptions) -> Result<Self, Error> {
98        let shared_conn = spawn_blocking({
99            let options = options.clone();
100            move || {
101                let conn = establish_connection(&options)?;
102                let shared_conn = odbc_api::SharedConnection::new(std::sync::Mutex::new(conn));
103                Ok::<_, Error>(shared_conn)
104            }
105        })
106        .await?;
107
108        Ok(Self {
109            conn: shared_conn,
110            stmt_cache: HashMap::new(),
111        })
112    }
113
114    // (dbms_name moved to the Connection trait implementation)
115
116    pub(crate) async fn ping_blocking(&mut self) -> Result<(), Error> {
117        self.with_conn("ping", move |conn| {
118            conn.execute("SELECT 1", (), None)?;
119            Ok(())
120        })
121        .await
122    }
123
124    pub(crate) async fn begin_blocking(&mut self) -> Result<(), Error> {
125        self.with_conn("begin", move |conn| {
126            conn.set_autocommit(false)?;
127            Ok(())
128        })
129        .await
130    }
131
132    pub(crate) async fn commit_blocking(&mut self) -> Result<(), Error> {
133        self.with_conn("commit", move |conn| {
134            conn.commit()?;
135            conn.set_autocommit(true)?;
136            Ok(())
137        })
138        .await
139    }
140
141    pub(crate) async fn rollback_blocking(&mut self) -> Result<(), Error> {
142        self.with_conn("rollback", move |conn| {
143            conn.rollback()?;
144            conn.set_autocommit(true)?;
145            Ok(())
146        })
147        .await
148    }
149
150    /// Launches a background task to execute the SQL statement and send the results to the returned channel.
151    pub(crate) fn execute_stream(
152        &mut self,
153        sql: &str,
154        args: Option<OdbcArguments>,
155    ) -> flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>> {
156        let (tx, rx) = flume::bounded(64);
157
158        let maybe_prepared = if let Some(prepared) = self.stmt_cache.get(sql) {
159            MaybePrepared::Prepared(Arc::clone(prepared))
160        } else {
161            MaybePrepared::NotPrepared(sql.to_string())
162        };
163
164        let conn = Arc::clone(&self.conn);
165        sqlx_rt::spawn(sqlx_rt::spawn_blocking(move || {
166            let mut conn = conn.lock().expect("failed to lock connection");
167            if let Err(e) = execute_sql(&mut conn, maybe_prepared, args, &tx) {
168                let _ = tx.send(Err(e));
169            }
170        }));
171
172        rx
173    }
174
175    pub(crate) async fn clear_cached_statements(&mut self) -> Result<(), Error> {
176        // Clear the statement metadata cache
177        self.stmt_cache.clear();
178        Ok(())
179    }
180
181    pub async fn prepare<'a>(&mut self, sql: &'a str) -> Result<OdbcStatement<'a>, Error> {
182        let conn = Arc::clone(&self.conn);
183        let sql_arc = Arc::from(sql.to_string());
184        let sql_clone = Arc::clone(&sql_arc);
185        let (prepared, metadata) = spawn_blocking(move || {
186            let mut prepared = conn.into_prepared(&sql_clone)?;
187            let metadata = OdbcStatementMetadata {
188                columns: collect_columns(&mut prepared),
189                parameters: usize::from(prepared.num_params().unwrap_or(0)),
190            };
191            Ok::<_, Error>((prepared, metadata))
192        })
193        .await?;
194        self.stmt_cache
195            .insert(Arc::clone(&sql_arc), Arc::new(Mutex::new(prepared)));
196        Ok(OdbcStatement {
197            sql: Cow::Borrowed(sql),
198            metadata,
199        })
200    }
201}
202
203pub(crate) enum MaybePrepared {
204    Prepared(SharedPreparedStatement),
205    NotPrepared(String),
206}
207
208impl Connection for OdbcConnection {
209    type Database = Odbc;
210
211    type Options = OdbcConnectOptions;
212
213    fn close(self) -> BoxFuture<'static, Result<(), Error>> {
214        Box::pin(async move {
215            // Drop connection by moving Arc and letting it fall out of scope.
216            drop(self);
217            Ok(())
218        })
219    }
220
221    fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
222        Box::pin(async move { Ok(()) })
223    }
224
225    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
226        Box::pin(self.ping_blocking())
227    }
228
229    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
230    where
231        Self: Sized,
232    {
233        Transaction::begin(self)
234    }
235
236    #[doc(hidden)]
237    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
238        Box::pin(future::ok(()))
239    }
240
241    #[doc(hidden)]
242    fn should_flush(&self) -> bool {
243        false
244    }
245
246    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
247        Box::pin(self.clear_cached_statements())
248    }
249
250    fn dbms_name(&mut self) -> BoxFuture<'_, Result<String, Error>> {
251        Box::pin(async move {
252            self.with_conn("dbms_name", move |conn| {
253                Ok(conn.database_management_system_name()?)
254            })
255            .await
256        })
257    }
258}
259
260// moved helpers to connection/inner.rs