odbc_api/
execute.rs

1use std::mem::transmute;
2
3use crate::{
4    CursorImpl, CursorPolling, Error, ParameterCollectionRef, Sleep,
5    handles::{AsStatementRef, SqlText, Statement, StatementRef},
6    parameter::Blob,
7    sleep::wait_for,
8};
9
10/// Shared implementation for executing a query with parameters between [`crate::Connection`],
11/// [`crate::Preallocated`] and [`crate::Prepared`].
12///
13/// # Parameters
14///
15/// * `lazy_statement`: Factory for statement handle used to execute the query. We pass the pass the
16///   statement lazily in order to avoid unnecessary allocating a statement handle in case the
17///   parameter set is empty.
18/// * `query`: SQL query to be executed. If `None` it is a assumed a prepared query is to be
19///   executed.
20/// * `params`: The parameters bound to the statement before query execution.
21pub fn execute_with_parameters<S>(
22    statement: S,
23    query: Option<&SqlText<'_>>,
24    params: impl ParameterCollectionRef,
25) -> Result<Option<CursorImpl<S>>, Error>
26where
27    S: AsStatementRef,
28{
29    unsafe {
30        if let Some(statement) = bind_parameters(statement, params)? {
31            execute(statement, query)
32        } else {
33            Ok(None)
34        }
35    }
36}
37
38/// Asynchronous sibiling of [`execute_with_parameters`]
39pub async fn execute_with_parameters_polling<S>(
40    statement: S,
41    query: Option<&SqlText<'_>>,
42    params: impl ParameterCollectionRef,
43    sleep: impl Sleep,
44) -> Result<Option<CursorPolling<S>>, Error>
45where
46    S: AsStatementRef,
47{
48    unsafe {
49        if let Some(statement) = bind_parameters(statement, params)? {
50            execute_polling(statement, query, sleep).await
51        } else {
52            Ok(None)
53        }
54    }
55}
56
57unsafe fn bind_parameters<S>(
58    mut statement: S,
59    mut params: impl ParameterCollectionRef,
60) -> Result<Option<S>, Error>
61where
62    S: AsStatementRef,
63{
64    unsafe {
65        let parameter_set_size = params.parameter_set_size();
66
67        let mut stmt = statement.as_stmt_ref();
68        // Reset parameters so we do not dereference stale once by mistake if we call
69        // `exec_direct`.
70        stmt.reset_parameters().into_result(&stmt)?;
71        stmt.set_paramset_size(parameter_set_size)
72            .into_result(&stmt)?;
73        // Bind new parameters passed by caller.
74        params.bind_parameters_to(&mut stmt)?;
75        Ok(Some(statement))
76    }
77}
78
79/// # Safety
80///
81/// * Execute may dereference pointers to bound parameters, so these must guaranteed to be valid
82///   then calling this function.
83/// * Furthermore all bound delayed parameters must be of type `*mut &mut dyn Blob`.
84pub unsafe fn execute<S>(
85    mut statement: S,
86    query: Option<&SqlText<'_>>,
87) -> Result<Option<CursorImpl<S>>, Error>
88where
89    S: AsStatementRef,
90{
91    unsafe {
92        let mut stmt = statement.as_stmt_ref();
93        let result = if let Some(sql) = query {
94            // We execute an unprepared "one shot query"
95            stmt.exec_direct(sql)
96        } else {
97            // We execute a prepared query
98            stmt.execute()
99        };
100
101        // If delayed parameters (e.g. input streams) are bound we might need to put data in order
102        // to execute.
103        let need_data = result
104            .on_success(|| false)
105            .on_no_data(|| false)
106            .on_need_data(|| true)
107            .into_result(&stmt)?;
108
109        if need_data {
110            // Check if any delayed parameters have been bound which stream data to the database at
111            // statement execution time. Loops over each bound stream.
112            while let Some(blob_ref) = next_blob_param(&mut stmt)? {
113                // Loop over all batches within each blob
114                while let Some(batch) = blob_ref.next_batch().map_err(Error::FailedReadingInput)? {
115                    stmt.put_binary_batch(batch).into_result(&stmt)?;
116                }
117            }
118        }
119
120        // Check if a result set has been created.
121        if stmt.num_result_cols().into_result(&stmt)? == 0 {
122            Ok(None)
123        } else {
124            // Safe: `statement` is in cursor state.
125            let cursor = CursorImpl::new(statement);
126            Ok(Some(cursor))
127        }
128    }
129}
130
131/// # Safety
132///
133/// * Execute may dereference pointers to bound parameters, so these must guaranteed to be valid
134///   then calling this function.
135/// * Furthermore all bound delayed parameters must be of type `*mut &mut dyn Blob`.
136pub async unsafe fn execute_polling<S>(
137    mut statement: S,
138    query: Option<&SqlText<'_>>,
139    mut sleep: impl Sleep,
140) -> Result<Option<CursorPolling<S>>, Error>
141where
142    S: AsStatementRef,
143{
144    unsafe {
145        let mut stmt = statement.as_stmt_ref();
146        let result = if let Some(sql) = query {
147            // We execute an unprepared "one shot query"
148            wait_for(|| stmt.exec_direct(sql), &mut sleep).await
149        } else {
150            // We execute a prepared query
151            wait_for(|| stmt.execute(), &mut sleep).await
152        };
153
154        // If delayed parameters (e.g. input streams) are bound we might need to put data in order
155        // to execute.
156        let need_data = result
157            .on_success(|| false)
158            .on_no_data(|| false)
159            .on_need_data(|| true)
160            .into_result(&stmt)?;
161
162        if need_data {
163            // Check if any delayed parameters have been bound which stream data to the database at
164            // statement execution time. Loops over each bound stream.
165            while let Some(blob_ref) = next_blob_param(&mut stmt)? {
166                // Loop over all batches within each blob
167                while let Some(batch) = blob_ref.next_batch().map_err(Error::FailedReadingInput)? {
168                    let result = wait_for(|| stmt.put_binary_batch(batch), &mut sleep).await;
169                    result.into_result(&stmt)?;
170                }
171            }
172        }
173
174        // Check if a result set has been created.
175        let num_result_cols = wait_for(|| stmt.num_result_cols(), &mut sleep)
176            .await
177            .into_result(&stmt)?;
178        if num_result_cols == 0 {
179            Ok(None)
180        } else {
181            // Safe: `statement` is in cursor state.
182            let cursor = CursorPolling::new(statement);
183            Ok(Some(cursor))
184        }
185    }
186}
187
188unsafe fn next_blob_param<'a>(
189    stmt: &mut StatementRef<'a>,
190) -> Result<Option<&'a mut dyn Blob>, Error> {
191    let maybe_ptr = stmt.param_data().into_result(stmt)?;
192    if let Some(blob_ptr) = maybe_ptr {
193        // The safe interfaces currently exclusively bind pointers to `Blob` trait objects
194        let blob_ptr: *mut &mut dyn Blob = unsafe { transmute(blob_ptr) };
195        let blob_ref = unsafe { &mut *blob_ptr };
196        Ok(Some(*blob_ref))
197    } else {
198        Ok(None)
199    }
200}
201
202/// Shared implementation for executing a columns query between [`crate::Connection`] and
203/// [`crate::Preallocated`].
204pub fn execute_columns<S>(
205    mut statement: S,
206    catalog_name: &SqlText,
207    schema_name: &SqlText,
208    table_name: &SqlText,
209    column_name: &SqlText,
210) -> Result<CursorImpl<S>, Error>
211where
212    S: AsStatementRef,
213{
214    let mut stmt = statement.as_stmt_ref();
215
216    stmt.columns(catalog_name, schema_name, table_name, column_name)
217        .into_result(&stmt)?;
218
219    // We assume columns always creates a result set, since it works like a SELECT statement.
220    debug_assert_ne!(stmt.num_result_cols().unwrap(), 0);
221
222    // Safe: `statement` is in cursor state
223    let cursor = unsafe { CursorImpl::new(statement) };
224    Ok(cursor)
225}
226
227/// Shared implementation for executing a tables query between [`crate::Connection`] and
228/// [`crate::Preallocated`].
229pub fn execute_tables<S>(
230    mut statement: S,
231    catalog_name: &SqlText,
232    schema_name: &SqlText,
233    table_name: &SqlText,
234    column_name: &SqlText,
235) -> Result<CursorImpl<S>, Error>
236where
237    S: AsStatementRef,
238{
239    let mut stmt = statement.as_stmt_ref();
240
241    stmt.tables(catalog_name, schema_name, table_name, column_name)
242        .into_result(&stmt)?;
243
244    // We assume tables always creates a result set, since it works like a SELECT statement.
245    debug_assert_ne!(stmt.num_result_cols().unwrap(), 0);
246
247    // Safe: `statement` is in Cursor state.
248    let cursor = unsafe { CursorImpl::new(statement) };
249
250    Ok(cursor)
251}
252
253/// Shared implementation for executing a foreign keys query between [`crate::Connection`] and
254/// [`crate::Preallocated`].
255pub fn execute_foreign_keys<S>(
256    mut statement: S,
257    pk_catalog_name: &SqlText,
258    pk_schema_name: &SqlText,
259    pk_table_name: &SqlText,
260    fk_catalog_name: &SqlText,
261    fk_schema_name: &SqlText,
262    fk_table_name: &SqlText,
263) -> Result<CursorImpl<S>, Error>
264where
265    S: AsStatementRef,
266{
267    let mut stmt = statement.as_stmt_ref();
268
269    stmt.foreign_keys(
270        pk_catalog_name,
271        pk_schema_name,
272        pk_table_name,
273        fk_catalog_name,
274        fk_schema_name,
275        fk_table_name,
276    )
277    .into_result(&stmt)?;
278
279    // We assume foreign keys always creates a result set, since it works like a SELECT statement.
280    debug_assert_ne!(stmt.num_result_cols().unwrap(), 0);
281
282    // Safe: `statement` is in Cursor state.
283    let cursor = unsafe { CursorImpl::new(statement) };
284
285    Ok(cursor)
286}