sqlx_core_oldapi/odbc/connection/
mod.rs1use crate::connection::Connection;
2use crate::error::Error;
3use crate::odbc::{
4 Odbc, OdbcArguments, OdbcColumn, OdbcConnectOptions, OdbcQueryResult, OdbcRow, OdbcTypeInfo,
5};
6use crate::transaction::Transaction;
7use either::Either;
8use sqlx_rt::spawn_blocking;
9mod odbc_bridge;
10use crate::odbc::{OdbcStatement, OdbcStatementMetadata};
11use futures_core::future::BoxFuture;
12use futures_util::future;
13use odbc_api::ConnectionTransitions;
14use odbc_api::{handles::StatementConnection, Prepared, ResultSetMetadata, SharedConnection};
15use odbc_bridge::{establish_connection, execute_sql};
16use std::borrow::Cow;
17use std::collections::HashMap;
18use std::sync::{Arc, Mutex};
19
20mod executor;
21
22type PreparedStatement = Prepared<StatementConnection<SharedConnection<'static>>>;
23type SharedPreparedStatement = Arc<Mutex<PreparedStatement>>;
24
25fn collect_columns(prepared: &mut PreparedStatement) -> Vec<OdbcColumn> {
26 let count = prepared.num_result_cols().unwrap_or(0);
27 (1..=count)
28 .map(|i| create_column(prepared, i as u16))
29 .collect()
30}
31
32fn create_column(stmt: &mut PreparedStatement, index: u16) -> OdbcColumn {
33 let mut cd = odbc_api::ColumnDescription::default();
34 let _ = stmt.describe_col(index, &mut cd);
35
36 OdbcColumn {
37 name: decode_column_name(cd.name, index),
38 type_info: OdbcTypeInfo::new(cd.data_type),
39 ordinal: usize::from(index.checked_sub(1).unwrap()),
40 }
41}
42
43fn decode_column_name(name_bytes: Vec<u8>, index: u16) -> String {
44 String::from_utf8(name_bytes).unwrap_or_else(|_| format!("col{}", index - 1))
45}
46
47pub struct OdbcConnection {
52 pub(crate) conn: SharedConnection<'static>,
53 pub(crate) stmt_cache: HashMap<Arc<str>, SharedPreparedStatement>,
54}
55
56impl std::fmt::Debug for OdbcConnection {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("OdbcConnection")
59 .field("conn", &self.conn)
60 .finish()
61 }
62}
63
64impl OdbcConnection {
65 pub(crate) async fn with_conn<R, F, S>(&mut self, operation: S, f: F) -> Result<R, Error>
66 where
67 R: Send + 'static,
68 F: FnOnce(&mut odbc_api::Connection<'static>) -> Result<R, Error> + Send + 'static,
69 S: std::fmt::Display + Send + 'static,
70 {
71 let conn = Arc::clone(&self.conn);
72 spawn_blocking(move || {
73 let mut conn_guard = conn.lock().map_err(|_| {
74 Error::Protocol(format!("ODBC {}: failed to lock connection", operation))
75 })?;
76 f(&mut conn_guard)
77 })
78 .await
79 }
80
81 pub(crate) async fn establish(options: &OdbcConnectOptions) -> Result<Self, Error> {
82 let shared_conn = spawn_blocking({
83 let options = options.clone();
84 move || {
85 let conn = establish_connection(&options)?;
86 let shared_conn = odbc_api::SharedConnection::new(std::sync::Mutex::new(conn));
87 Ok::<_, Error>(shared_conn)
88 }
89 })
90 .await?;
91
92 Ok(Self {
93 conn: shared_conn,
94 stmt_cache: HashMap::new(),
95 })
96 }
97
98 pub(crate) async fn ping_blocking(&mut self) -> Result<(), Error> {
101 self.with_conn("ping", move |conn| {
102 conn.execute("SELECT 1", (), None)?;
103 Ok(())
104 })
105 .await
106 }
107
108 pub(crate) async fn begin_blocking(&mut self) -> Result<(), Error> {
109 self.with_conn("begin", move |conn| {
110 conn.set_autocommit(false)?;
111 Ok(())
112 })
113 .await
114 }
115
116 pub(crate) async fn commit_blocking(&mut self) -> Result<(), Error> {
117 self.with_conn("commit", move |conn| {
118 conn.commit()?;
119 conn.set_autocommit(true)?;
120 Ok(())
121 })
122 .await
123 }
124
125 pub(crate) async fn rollback_blocking(&mut self) -> Result<(), Error> {
126 self.with_conn("rollback", move |conn| {
127 conn.rollback()?;
128 conn.set_autocommit(true)?;
129 Ok(())
130 })
131 .await
132 }
133
134 pub(crate) fn execute_stream(
136 &mut self,
137 sql: &str,
138 args: Option<OdbcArguments>,
139 ) -> flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>> {
140 let (tx, rx) = flume::bounded(64);
141
142 let maybe_prepared = if let Some(prepared) = self.stmt_cache.get(sql) {
143 MaybePrepared::Prepared(Arc::clone(prepared))
144 } else {
145 MaybePrepared::NotPrepared(sql.to_string())
146 };
147
148 let conn = Arc::clone(&self.conn);
149 sqlx_rt::spawn(sqlx_rt::spawn_blocking(move || {
150 let mut conn = conn.lock().expect("failed to lock connection");
151 if let Err(e) = execute_sql(&mut conn, maybe_prepared, args, &tx) {
152 let _ = tx.send(Err(e));
153 }
154 }));
155
156 rx
157 }
158
159 pub(crate) async fn clear_cached_statements(&mut self) -> Result<(), Error> {
160 self.stmt_cache.clear();
162 Ok(())
163 }
164
165 pub async fn prepare<'a>(&mut self, sql: &'a str) -> Result<OdbcStatement<'a>, Error> {
166 let conn = Arc::clone(&self.conn);
167 let sql_arc = Arc::from(sql.to_string());
168 let sql_clone = Arc::clone(&sql_arc);
169 let (prepared, metadata) = spawn_blocking(move || {
170 let mut prepared = conn.into_prepared(&sql_clone)?;
171 let metadata = OdbcStatementMetadata {
172 columns: collect_columns(&mut prepared),
173 parameters: usize::from(prepared.num_params().unwrap_or(0)),
174 };
175 Ok::<_, Error>((prepared, metadata))
176 })
177 .await?;
178 self.stmt_cache
179 .insert(Arc::clone(&sql_arc), Arc::new(Mutex::new(prepared)));
180 Ok(OdbcStatement {
181 sql: Cow::Borrowed(sql),
182 metadata,
183 })
184 }
185}
186
187pub(crate) enum MaybePrepared {
188 Prepared(SharedPreparedStatement),
189 NotPrepared(String),
190}
191
192impl Connection for OdbcConnection {
193 type Database = Odbc;
194
195 type Options = OdbcConnectOptions;
196
197 fn close(self) -> BoxFuture<'static, Result<(), Error>> {
198 Box::pin(async move {
199 drop(self);
201 Ok(())
202 })
203 }
204
205 fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
206 Box::pin(async move { Ok(()) })
207 }
208
209 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
210 Box::pin(self.ping_blocking())
211 }
212
213 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
214 where
215 Self: Sized,
216 {
217 Transaction::begin(self)
218 }
219
220 #[doc(hidden)]
221 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
222 Box::pin(future::ok(()))
223 }
224
225 #[doc(hidden)]
226 fn should_flush(&self) -> bool {
227 false
228 }
229
230 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
231 Box::pin(self.clear_cached_statements())
232 }
233
234 fn dbms_name(&mut self) -> BoxFuture<'_, Result<String, Error>> {
235 Box::pin(async move {
236 self.with_conn("dbms_name", move |conn| {
237 Ok(conn.database_management_system_name()?)
238 })
239 .await
240 })
241 }
242}
243
244