sqlx_core_oldapi/odbc/connection/
mod.rs1use crate::connection::Connection;
2use crate::error::Error;
3use crate::odbc::{
4 Odbc, OdbcArguments, OdbcBufferSettings, OdbcColumn, OdbcConnectOptions, OdbcQueryResult,
5 OdbcRow, OdbcTypeInfo,
6};
7use crate::transaction::Transaction;
8use either::Either;
9use odbc_api::handles::AsStatementRef;
10use sqlx_rt::spawn_blocking;
11mod odbc_bridge;
12use crate::odbc::{OdbcStatement, OdbcStatementMetadata};
13use futures_core::future::BoxFuture;
14use futures_util::future;
15use odbc_api::ConnectionTransitions;
16use odbc_api::{handles::StatementConnection, Prepared, ResultSetMetadata, SharedConnection};
17use odbc_bridge::{establish_connection, execute_sql};
18use std::borrow::Cow;
19use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21
22mod executor;
23
24type PreparedStatement = Prepared<StatementConnection<SharedConnection<'static>>>;
25type SharedPreparedStatement = Arc<Mutex<PreparedStatement>>;
26
27fn collect_columns(prepared: &mut PreparedStatement) -> Result<Vec<OdbcColumn>, Error> {
28 let count = prepared.num_result_cols()?;
29 let mut columns = Vec::with_capacity(count as usize);
30 for i in 1..=count {
31 columns.push(create_column(prepared, i as u16)?);
32 }
33 Ok(columns)
34}
35
36fn create_column(stmt: &mut PreparedStatement, index: u16) -> Result<OdbcColumn, Error> {
37 let mut cd = odbc_api::ColumnDescription::default();
38 stmt.describe_col(index, &mut cd)?;
39
40 Ok(OdbcColumn {
41 name: decode_column_name(cd.name, index),
42 type_info: OdbcTypeInfo::new(cd.data_type),
43 ordinal: usize::from(index.checked_sub(1).unwrap()),
44 })
45}
46
47pub(super) trait ColumnNameDecode {
48 fn decode_or_default(self, index: u16) -> String;
49}
50
51impl ColumnNameDecode for Vec<u8> {
52 fn decode_or_default(self, index: u16) -> String {
53 String::from_utf8(self).unwrap_or_else(|_| format!("col{}", index - 1))
54 }
55}
56
57impl ColumnNameDecode for Vec<u16> {
58 fn decode_or_default(self, index: u16) -> String {
59 String::from_utf16(&self).unwrap_or_else(|_| format!("col{}", index - 1))
60 }
61}
62
63pub(super) fn decode_column_name<T: ColumnNameDecode>(name: T, index: u16) -> String {
64 name.decode_or_default(index)
65}
66
67pub struct OdbcConnection {
72 pub(crate) conn: SharedConnection<'static>,
73 pub(crate) stmt_cache: HashMap<Arc<str>, SharedPreparedStatement>,
74 pub(crate) buffer_settings: OdbcBufferSettings,
75}
76
77impl std::fmt::Debug for OdbcConnection {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("OdbcConnection")
80 .field("conn", &self.conn)
81 .field("buffer_settings", &self.buffer_settings)
82 .finish()
83 }
84}
85
86impl OdbcConnection {
87 pub(crate) async fn with_conn<R, F, S>(&mut self, operation: S, f: F) -> Result<R, Error>
88 where
89 R: Send + 'static,
90 F: FnOnce(&mut odbc_api::Connection<'static>) -> Result<R, Error> + Send + 'static,
91 S: std::fmt::Display + Send + 'static,
92 {
93 let conn = Arc::clone(&self.conn);
94 spawn_blocking(move || {
95 let mut conn_guard = conn.lock().map_err(|_| {
96 Error::Protocol(format!("ODBC {}: failed to lock connection", operation))
97 })?;
98 f(&mut conn_guard)
99 })
100 .await
101 }
102
103 pub(crate) async fn establish(options: &OdbcConnectOptions) -> Result<Self, Error> {
104 let shared_conn = spawn_blocking({
105 let options = options.clone();
106 move || {
107 let conn = establish_connection(&options)?;
108 let shared_conn = odbc_api::SharedConnection::new(std::sync::Mutex::new(conn));
109 Ok::<_, Error>(shared_conn)
110 }
111 })
112 .await?;
113
114 Ok(Self {
115 conn: shared_conn,
116 stmt_cache: HashMap::new(),
117 buffer_settings: options.buffer_settings,
118 })
119 }
120
121 pub(crate) async fn ping_blocking(&mut self) -> Result<(), Error> {
124 self.with_conn("ping", move |conn| {
125 conn.execute("SELECT 1", (), None)?;
126 Ok(())
127 })
128 .await
129 }
130
131 pub(crate) async fn begin_blocking(&mut self) -> Result<(), Error> {
132 self.with_conn("begin", move |conn| {
133 conn.set_autocommit(false)?;
134 Ok(())
135 })
136 .await
137 }
138
139 pub(crate) async fn commit_blocking(&mut self) -> Result<(), Error> {
140 self.with_conn("commit", move |conn| {
141 conn.commit()?;
142 conn.set_autocommit(true)?;
143 Ok(())
144 })
145 .await
146 }
147
148 pub(crate) async fn rollback_blocking(&mut self) -> Result<(), Error> {
149 self.with_conn("rollback", move |conn| {
150 conn.rollback()?;
151 conn.set_autocommit(true)?;
152 Ok(())
153 })
154 .await
155 }
156
157 pub(crate) fn execute_stream(
159 &mut self,
160 sql: &str,
161 args: Option<OdbcArguments>,
162 ) -> flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>> {
163 let (tx, rx) = flume::bounded(64);
164
165 let maybe_prepared = if let Some(prepared) = self.stmt_cache.get(sql) {
166 MaybePrepared::Prepared(Arc::clone(prepared))
167 } else {
168 MaybePrepared::NotPrepared(sql.to_string())
169 };
170
171 let conn = Arc::clone(&self.conn);
172 let buffer_settings = self.buffer_settings;
173 sqlx_rt::spawn(sqlx_rt::spawn_blocking(move || {
174 let mut conn = conn.lock().expect("failed to lock connection");
175 if let Err(e) = execute_sql(&mut conn, maybe_prepared, args, &tx, buffer_settings) {
176 let _ = tx.send(Err(e));
177 }
178 }));
179
180 rx
181 }
182
183 pub(crate) async fn clear_cached_statements(&mut self) -> Result<(), Error> {
184 self.stmt_cache.clear();
186 Ok(())
187 }
188
189 pub async fn prepare<'a>(&mut self, sql: &'a str) -> Result<OdbcStatement<'a>, Error> {
190 let conn = Arc::clone(&self.conn);
191 let sql_arc = Arc::from(sql.to_string());
192 let sql_clone = Arc::clone(&sql_arc);
193 let (prepared, metadata) = spawn_blocking(move || {
194 let mut prepared = conn.into_prepared(&sql_clone)?;
195 let metadata = OdbcStatementMetadata {
196 columns: collect_columns(&mut prepared)?,
197 parameters: usize::from(prepared.num_params()?),
198 };
199 Ok::<_, Error>((prepared, metadata))
200 })
201 .await?;
202 self.stmt_cache
203 .insert(Arc::clone(&sql_arc), Arc::new(Mutex::new(prepared)));
204 Ok(OdbcStatement {
205 sql: Cow::Borrowed(sql),
206 metadata,
207 })
208 }
209}
210
211pub(crate) enum MaybePrepared {
212 Prepared(SharedPreparedStatement),
213 NotPrepared(String),
214}
215
216impl std::fmt::Debug for MaybePrepared {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 match self {
219 MaybePrepared::Prepared(prepared) => f
220 .debug_tuple("Prepared")
221 .field(&prepared.lock().unwrap().as_stmt_ref())
222 .finish(),
223 MaybePrepared::NotPrepared(sql) => f.debug_tuple("NotPrepared").field(sql).finish(),
224 }
225 }
226}
227
228impl Connection for OdbcConnection {
229 type Database = Odbc;
230
231 type Options = OdbcConnectOptions;
232
233 fn close(self) -> BoxFuture<'static, Result<(), Error>> {
234 Box::pin(async move {
235 drop(self);
237 Ok(())
238 })
239 }
240
241 fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
242 Box::pin(async move { Ok(()) })
243 }
244
245 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
246 Box::pin(self.ping_blocking())
247 }
248
249 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
250 where
251 Self: Sized,
252 {
253 Transaction::begin(self)
254 }
255
256 #[doc(hidden)]
257 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
258 Box::pin(future::ok(()))
259 }
260
261 #[doc(hidden)]
262 fn should_flush(&self) -> bool {
263 false
264 }
265
266 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
267 Box::pin(self.clear_cached_statements())
268 }
269
270 fn dbms_name(&mut self) -> BoxFuture<'_, Result<String, Error>> {
271 Box::pin(async move {
272 self.with_conn("dbms_name", move |conn| {
273 Ok(conn.database_management_system_name()?)
274 })
275 .await
276 })
277 }
278}
279
280