1use std::mem::transmute;
2
3use crate::{
4 CursorImpl, CursorPolling, Error, ParameterCollectionRef, Sleep,
5 handles::{AsStatementRef, SqlText, Statement, StatementRef},
6 parameter::Blob,
7 sleep::wait_for,
8};
9
10pub fn execute_with_parameters<S>(
22 statement: S,
23 query: Option<&SqlText<'_>>,
24 params: impl ParameterCollectionRef,
25) -> Result<Option<CursorImpl<S>>, Error>
26where
27 S: AsStatementRef,
28{
29 unsafe {
30 if let Some(statement) = bind_parameters(statement, params)? {
31 execute(statement, query)
32 } else {
33 Ok(None)
34 }
35 }
36}
37
38pub async fn execute_with_parameters_polling<S>(
40 statement: S,
41 query: Option<&SqlText<'_>>,
42 params: impl ParameterCollectionRef,
43 sleep: impl Sleep,
44) -> Result<Option<CursorPolling<S>>, Error>
45where
46 S: AsStatementRef,
47{
48 unsafe {
49 if let Some(statement) = bind_parameters(statement, params)? {
50 execute_polling(statement, query, sleep).await
51 } else {
52 Ok(None)
53 }
54 }
55}
56
57unsafe fn bind_parameters<S>(
58 mut statement: S,
59 mut params: impl ParameterCollectionRef,
60) -> Result<Option<S>, Error>
61where
62 S: AsStatementRef,
63{
64 unsafe {
65 let parameter_set_size = params.parameter_set_size();
66
67 let mut stmt = statement.as_stmt_ref();
68 stmt.reset_parameters().into_result(&stmt)?;
71 stmt.set_paramset_size(parameter_set_size)
72 .into_result(&stmt)?;
73 params.bind_parameters_to(&mut stmt)?;
75 Ok(Some(statement))
76 }
77}
78
79pub unsafe fn execute<S>(
85 mut statement: S,
86 query: Option<&SqlText<'_>>,
87) -> Result<Option<CursorImpl<S>>, Error>
88where
89 S: AsStatementRef,
90{
91 unsafe {
92 let mut stmt = statement.as_stmt_ref();
93 let result = if let Some(sql) = query {
94 stmt.exec_direct(sql)
96 } else {
97 stmt.execute()
99 };
100
101 let need_data = result
104 .on_success(|| false)
105 .on_no_data(|| false)
106 .on_need_data(|| true)
107 .into_result(&stmt)?;
108
109 if need_data {
110 while let Some(blob_ref) = next_blob_param(&mut stmt)? {
113 while let Some(batch) = blob_ref.next_batch().map_err(Error::FailedReadingInput)? {
115 stmt.put_binary_batch(batch).into_result(&stmt)?;
116 }
117 }
118 }
119
120 if stmt.num_result_cols().into_result(&stmt)? == 0 {
122 Ok(None)
123 } else {
124 let cursor = CursorImpl::new(statement);
126 Ok(Some(cursor))
127 }
128 }
129}
130
131pub async unsafe fn execute_polling<S>(
137 mut statement: S,
138 query: Option<&SqlText<'_>>,
139 mut sleep: impl Sleep,
140) -> Result<Option<CursorPolling<S>>, Error>
141where
142 S: AsStatementRef,
143{
144 unsafe {
145 let mut stmt = statement.as_stmt_ref();
146 let result = if let Some(sql) = query {
147 wait_for(|| stmt.exec_direct(sql), &mut sleep).await
149 } else {
150 wait_for(|| stmt.execute(), &mut sleep).await
152 };
153
154 let need_data = result
157 .on_success(|| false)
158 .on_no_data(|| false)
159 .on_need_data(|| true)
160 .into_result(&stmt)?;
161
162 if need_data {
163 while let Some(blob_ref) = next_blob_param(&mut stmt)? {
166 while let Some(batch) = blob_ref.next_batch().map_err(Error::FailedReadingInput)? {
168 let result = wait_for(|| stmt.put_binary_batch(batch), &mut sleep).await;
169 result.into_result(&stmt)?;
170 }
171 }
172 }
173
174 let num_result_cols = wait_for(|| stmt.num_result_cols(), &mut sleep)
176 .await
177 .into_result(&stmt)?;
178 if num_result_cols == 0 {
179 Ok(None)
180 } else {
181 let cursor = CursorPolling::new(statement);
183 Ok(Some(cursor))
184 }
185 }
186}
187
188unsafe fn next_blob_param<'a>(
189 stmt: &mut StatementRef<'a>,
190) -> Result<Option<&'a mut dyn Blob>, Error> {
191 let maybe_ptr = stmt.param_data().into_result(stmt)?;
192 if let Some(blob_ptr) = maybe_ptr {
193 let blob_ptr: *mut &mut dyn Blob = unsafe { transmute(blob_ptr) };
195 let blob_ref = unsafe { &mut *blob_ptr };
196 Ok(Some(*blob_ref))
197 } else {
198 Ok(None)
199 }
200}
201
202pub fn execute_columns<S>(
205 mut statement: S,
206 catalog_name: &SqlText,
207 schema_name: &SqlText,
208 table_name: &SqlText,
209 column_name: &SqlText,
210) -> Result<CursorImpl<S>, Error>
211where
212 S: AsStatementRef,
213{
214 let mut stmt = statement.as_stmt_ref();
215
216 stmt.columns(catalog_name, schema_name, table_name, column_name)
217 .into_result(&stmt)?;
218
219 debug_assert_ne!(stmt.num_result_cols().unwrap(), 0);
221
222 let cursor = unsafe { CursorImpl::new(statement) };
224 Ok(cursor)
225}
226
227pub fn execute_tables<S>(
230 mut statement: S,
231 catalog_name: &SqlText,
232 schema_name: &SqlText,
233 table_name: &SqlText,
234 column_name: &SqlText,
235) -> Result<CursorImpl<S>, Error>
236where
237 S: AsStatementRef,
238{
239 let mut stmt = statement.as_stmt_ref();
240
241 stmt.tables(catalog_name, schema_name, table_name, column_name)
242 .into_result(&stmt)?;
243
244 debug_assert_ne!(stmt.num_result_cols().unwrap(), 0);
246
247 let cursor = unsafe { CursorImpl::new(statement) };
249
250 Ok(cursor)
251}
252
253pub fn execute_foreign_keys<S>(
256 mut statement: S,
257 pk_catalog_name: &SqlText,
258 pk_schema_name: &SqlText,
259 pk_table_name: &SqlText,
260 fk_catalog_name: &SqlText,
261 fk_schema_name: &SqlText,
262 fk_table_name: &SqlText,
263) -> Result<CursorImpl<S>, Error>
264where
265 S: AsStatementRef,
266{
267 let mut stmt = statement.as_stmt_ref();
268
269 stmt.foreign_keys(
270 pk_catalog_name,
271 pk_schema_name,
272 pk_table_name,
273 fk_catalog_name,
274 fk_schema_name,
275 fk_table_name,
276 )
277 .into_result(&stmt)?;
278
279 debug_assert_ne!(stmt.num_result_cols().unwrap(), 0);
281
282 let cursor = unsafe { CursorImpl::new(statement) };
284
285 Ok(cursor)
286}