sibyl/stmt/nonblocking.rs
1//! Nonblocking SQL statement methods
2
3use super::{Statement, bind::Params, cols::{DEFAULT_LONG_BUFFER_SIZE, Columns}};
4use crate::{Result, oci::*, Session, Error, Rows, Cursor, ToSql, Row};
5use parking_lot::RwLock;
6use once_cell::sync::OnceCell;
7
8impl<'a> Statement<'a> {
9 /// Creates a new statement
10 pub(crate) async fn new(sql: &str, session: &'a Session<'a>) -> Result<Statement<'a>> {
11 let err = Handle::<OCIError>::new(session)?;
12 let stmt = futures::StmtPrepare::new(session.get_svc(), &err, sql).await?;
13 let params = Params::new(&stmt, &err)?.map(|params| RwLock::new(params));
14 let stmt = Self {session, svc: session.get_svc(), stmt, params, cols: OnceCell::new(), err, max_long: DEFAULT_LONG_BUFFER_SIZE};
15 stmt.set_prefetch_rows(10)?;
16 Ok(stmt)
17 }
18
19 /// Binds provided arguments to SQL parameter placeholders. Returns indexes of parameter placeholders for the OUT args.
20 fn bind_args(&self, args: &mut impl ToSql) -> Result<()> {
21 if let Some(params) = &self.params {
22 params.write().bind_args(&self.stmt, &self.err, args)
23 } else {
24 Ok(())
25 }
26 }
27
28 /// Executes the prepared statement. Returns the OCI result code from OCIStmtExecute.
29 async fn exec(&self, stmt_type: u16, args: &mut impl ToSql) -> Result<i32> {
30 self.bind_args(args)?;
31 futures::StmtExecute::new(self.svc.clone(), &self.err, &self.stmt, stmt_type).await
32 }
33
34 /**
35 Executes the prepared statement. Returns the number of rows affected.
36
37 # Parameters
38
39 * `args` - SQL statement arguments - a single argument or a tuple of arguments
40
41 Where each argument can be represented by:
42 - a value: `val` (IN)
43 - a reference: `&val` (IN)
44 - a mutable reference: `&mut val` (OUT or INOUT)
45 - a 2-item tuple where first item is a parameter name: `(":NAME", val)`
46
47 # Example
48
49 ```
50 # sibyl::block_on(async {
51 # let session = sibyl::test_env::get_session().await?;
52 let stmt = session.prepare("
53 INSERT INTO hr.departments
54 ( department_id, department_name, manager_id, location_id )
55 VALUES ( hr.departments_seq.nextval, :department_name, :manager_id
56 , (SELECT location_id FROM hr.locations WHERE city = :city)
57 )
58 RETURNING department_id INTO :department_id
59 ").await?;
60 let mut department_id = 0u32;
61
62 let num_updated_rows = stmt.execute((
63 ( ":DEPARTMENT_NAME", "Security" ),
64 ( ":MANAGER_ID", "" ),
65 ( ":CITY", "Seattle" ),
66 ( ":DEPARTMENT_ID", &mut department_id ),
67 )).await?;
68
69 assert_eq!(num_updated_rows, 1);
70 assert!(!stmt.is_null(":DEPARTMENT_ID")?);
71 assert!(department_id > 0);
72 # session.rollback().await?;
73 # Ok::<(),sibyl::Error>(()) }).expect("Ok from async");
74 ```
75 */
76 pub async fn execute(&self, mut args: impl ToSql) -> Result<usize> {
77 let stmt_type: u16 = self.get_attr(OCI_ATTR_STMT_TYPE)?;
78 if stmt_type == OCI_STMT_SELECT {
79 return Err( Error::new("Use `query` to execute SELECT") );
80 }
81 self.exec(stmt_type, &mut args).await?;
82 let num_rows = self.row_count()?;
83 if let Some(params) = &self.params {
84 if num_rows == 0 {
85 params.write().set_out_to_null();
86 }
87 params.read().update_out_args(&mut args)?;
88 }
89 Ok(num_rows)
90 }
91
92 /**
93 Executes the prepared statement. Returns "streaming iterator" over the returned rows.
94
95 # Parameters
96
97 * `args` - SQL statement arguments - a single argument or a tuple of arguments
98
99 Where each argument can be represented by:
100 - a value: `val` (IN)
101 - a reference: `&val` (IN)
102 - a 2-item tuple where first item is a parameter name: `(":NAME", val)`
103
104 # Example
105
106 ```
107 # use std::collections::HashMap;
108 # sibyl::block_on(async {
109 # let session = sibyl::test_env::get_session().await?;
110 let stmt = session.prepare("
111 SELECT employee_id, last_name, first_name
112 FROM hr.employees
113 WHERE manager_id = :id
114 ORDER BY employee_id
115 ").await?;
116 stmt.set_prefetch_rows(5)?;
117
118 let rows = stmt.query(103).await?; // 103 is Alexander Hunold
119
120 let mut subs = HashMap::new();
121 while let Some( row ) = rows.next().await? {
122 // EMPLOYEE_ID is NOT NULL, so we can safely unwrap it
123 let id : u32 = row.get(0)?;
124 // Same for the LAST_NAME.
125 // Note that `last_name` is retrieved as a slice. This is fast as it
126 // borrows directly from the column buffer, but it can only live until
127 // the end of the current scope, i.e. only during the lifetime of the
128 // current row.
129 let last_name : &str = row.get(1)?;
130 // FIRST_NAME is NULL-able...
131 let first_name : Option<&str> = row.get(2)?;
132 let name = first_name.map_or(last_name.to_string(),
133 |first_name| format!("{}, {}", last_name, first_name)
134 );
135 subs.insert(id, name);
136 }
137 assert_eq!(stmt.row_count()?, 4);
138 assert_eq!(subs.len(), 4);
139 assert!(subs.contains_key(&104), "Bruce Ernst");
140 assert!(subs.contains_key(&105), "David Austin");
141 assert!(subs.contains_key(&106), "Valli Pataballa");
142 assert!(subs.contains_key(&107), "Diana Lorentz");
143 # Ok::<(),sibyl::Error>(()) }).expect("Ok from async");
144 ```
145 */
146 pub async fn query(&'a self, mut args: impl ToSql) -> Result<Rows<'a>> {
147 let stmt_type: u16 = self.get_attr(OCI_ATTR_STMT_TYPE)?;
148 if stmt_type != OCI_STMT_SELECT {
149 return Err( Error::new("Use `execute` to execute statements other than SELECT") );
150 }
151 let res = self.exec(stmt_type, &mut args).await?;
152
153 if self.cols.get().is_none() {
154 let cols = Columns::new(Ptr::from(self.as_ref()), Ptr::from(self.as_ref()), Ptr::from(self.as_ref()), self.max_long)?;
155 self.cols.get_or_init(|| RwLock::new(cols));
156 }
157
158 match res {
159 OCI_SUCCESS | OCI_SUCCESS_WITH_INFO | OCI_NO_DATA => {
160 Ok( Rows::from_query(res, self) )
161 }
162 _ => Err( Error::oci(&self.err, res) )
163 }
164 }
165
166 /**
167 Convenience method to execute a query that returns a single rows.
168
169 If the query returns more than one row, `query_single` will return only the first
170 row and ignore the rest.
171
172 # Parameters
173
174 * `args` - SQL statement arguments - a single argument or a tuple of arguments
175
176 Where each argument can be represented by:
177 - a value: `val` (IN)
178 - a reference: `&val` (IN)
179 - a 2-item tuple where first item is a parameter name: `(":NAME", val)`
180
181 # Returns
182
183 - `None` - if query did not return any rows
184 - `Some(row) - a single row (even if query returned more than one row)
185
186 # Example
187
188 ```
189 use std::collections::HashMap;
190
191 # sibyl::block_on(async {
192 # let session = sibyl::test_env::get_session().await?;
193 let stmt = session.prepare("
194 SELECT country_id, state_province, city, postal_code, street_address
195 FROM hr.locations
196 WHERE location_id = :id
197 ").await?;
198
199 let row = stmt.query_single(1800).await?;
200
201 assert!(row.is_some());
202 let row = row.unwrap();
203 let country_id : &str = row.get(0)?;
204 let state_province : &str = row.get(1)?;
205 let city : &str = row.get(2)?;
206 let postal_code : &str = row.get(3)?;
207 let street_address : &str = row.get(4)?;
208 assert_eq!(country_id, "CA");
209 assert_eq!(state_province, "Ontario");
210 assert_eq!(city, "Toronto");
211 assert_eq!(postal_code, "M5V 2L7");
212 assert_eq!(street_address, "147 Spadina Ave");
213 # Ok::<(),sibyl::Error>(()) }).expect("Ok from async");
214 ```
215 */
216 pub async fn query_single(&'a self, mut args: impl ToSql) -> Result<Option<Row<'a>>> {
217 let stmt_type: u16 = self.get_attr(OCI_ATTR_STMT_TYPE)?;
218 if stmt_type != OCI_STMT_SELECT {
219 return Err( Error::new("Use `execute` to execute statements other than SELECT") );
220 }
221 self.set_prefetch_rows(1)?;
222 let res = self.exec(stmt_type, &mut args).await?;
223
224 if self.cols.get().is_none() {
225 let cols = Columns::new(Ptr::from(self.as_ref()), Ptr::from(self.as_ref()), Ptr::from(self.as_ref()), self.max_long)?;
226 self.cols.get_or_init(|| RwLock::new(cols));
227 }
228
229 match res {
230 OCI_NO_DATA => Ok(None),
231 OCI_SUCCESS | OCI_SUCCESS_WITH_INFO => Rows::from_query(res, self).single().await,
232 _ => Err( Error::oci(&self.err, res) )
233 }
234 }
235
236 /**
237 Retrieves a single implicit result (cursor) in the order in which they were returned
238 from the PL/SQL procedure or block. If no more results are available, then `None` is
239 returned.
240
241 PL/SQL provides a subprogram RETURN_RESULT in the DBMS_SQL package to return the result
242 of an executed statement. Only SELECT query result-sets can be implicitly returned by a
243 PL/SQL procedure or block.
244
245 `next_result` can be called iteratively by the application to retrieve each implicit
246 result from an executed PL/SQL statement. Applications retrieve each result-set sequentially
247 but can fetch rows from any result-set independently.
248
249 # Example
250
251 ```
252 use sibyl::Number;
253 use std::cmp::Ordering::Equal;
254
255 # sibyl::block_on(async {
256 # let session = sibyl::test_env::get_session().await?;
257 let stmt = session.prepare("
258 DECLARE
259 c1 SYS_REFCURSOR;
260 c2 SYS_REFCURSOR;
261 BEGIN
262 OPEN c1 FOR
263 SELECT department_name, first_name, last_name, salary
264 FROM (
265 SELECT first_name, last_name, salary, department_id
266 , ROW_NUMBER() OVER (ORDER BY salary) ord
267 FROM hr.employees
268 ) e
269 JOIN hr.departments d
270 ON d.department_id = e.department_id
271 WHERE ord = 1
272 ;
273 DBMS_SQL.RETURN_RESULT (c1);
274
275 OPEN c2 FOR
276 SELECT department_name, first_name, last_name, salary
277 FROM (
278 SELECT first_name, last_name, salary, department_id
279 , MEDIAN(salary) OVER () median_salary
280 FROM hr.employees
281 ) e
282 JOIN hr.departments d
283 ON d.department_id = e.department_id
284 WHERE salary = median_salary
285 ORDER BY department_name, last_name, first_name
286 ;
287 DBMS_SQL.RETURN_RESULT (c2);
288 END;
289 ").await?;
290 let expected_lowest_salary = Number::from_int(2100, &session)?;
291 let expected_median_salary = Number::from_int(6200, &session)?;
292
293 stmt.execute(()).await?;
294
295 let lowest_payed_employee = stmt.next_result().await?.unwrap();
296
297 let rows = lowest_payed_employee.rows().await?;
298 let row = rows.next().await?.unwrap();
299
300 let department_name : &str = row.get(0)?;
301 let first_name : &str = row.get(1)?;
302 let last_name : &str = row.get(2)?;
303 let salary : Number = row.get(3)?;
304
305 assert_eq!(department_name, "Shipping");
306 assert_eq!(first_name, "TJ");
307 assert_eq!(last_name, "Olson");
308 assert_eq!(salary.compare(&expected_lowest_salary)?, Equal);
309
310 let row = rows.next().await?;
311 assert!(row.is_none());
312
313 let median_salary_employees = stmt.next_result().await?.unwrap();
314
315 let rows = median_salary_employees.rows().await?;
316
317 let row = rows.next().await?.unwrap();
318 let department_name : &str = row.get(0)?;
319 let first_name : &str = row.get(1)?;
320 let last_name : &str = row.get(2)?;
321 let salary : Number = row.get(3)?;
322
323 assert_eq!(department_name, "Sales");
324 assert_eq!(first_name, "Amit");
325 assert_eq!(last_name, "Banda");
326 assert_eq!(salary.compare(&expected_median_salary)?, Equal);
327
328 let row = rows.next().await?.unwrap();
329
330 let department_name : &str = row.get(0)?;
331 let first_name : &str = row.get(1)?;
332 let last_name : &str = row.get(2)?;
333 let salary : Number = row.get(3)?;
334
335 assert_eq!(department_name, "Sales");
336 assert_eq!(first_name, "Charles");
337 assert_eq!(last_name, "Johnson");
338 assert_eq!(salary.compare(&expected_median_salary)?, Equal);
339
340 let row = rows.next().await?;
341 assert!(row.is_none());
342
343 assert!(stmt.next_result().await?.is_none());
344 # Ok::<(),sibyl::Error>(()) }).expect("Ok from async");
345 ```
346 */
347 pub async fn next_result(&'a self) -> Result<Option<Cursor<'a>>> {
348 let res = futures::StmtGetNextResult::new(self.svc.clone(), &self.stmt, &self.err).await?;
349 if let Some(stmt) = res {
350 Ok(Some(Cursor::implicit(stmt, self)))
351 } else {
352 Ok(None)
353 }
354 }
355}
356
357#[cfg(test)]
358mod tests {
359 use crate::*;
360
361 #[test]
362 fn async_query() -> Result<()> {
363 block_on(async {
364 let session = crate::test_env::get_session().await?;
365 let stmt = session.prepare("
366 SELECT employee_id
367 FROM (
368 SELECT employee_id
369 , row_number() OVER (ORDER BY hire_date) AS hire_date_rank
370 FROM hr.employees
371 )
372 WHERE hire_date_rank = 1
373 ").await?;
374 let row = stmt.query_single(()).await?.unwrap();
375 let id : usize = row.get(0)?;
376 assert_eq!(id, 102);
377
378 Ok(())
379 })
380 }
381
382 #[test]
383 fn plsql_args() -> std::result::Result<(),Box<dyn std::error::Error>> {
384 block_on(async {
385 let session = crate::test_env::get_session().await?;
386
387 let stmt = session.prepare("
388 BEGIN
389 SELECT city, street_address
390 INTO :city, :street_address
391 FROM hr.locations
392 WHERE location_id = :location_id;
393 EXCEPTION
394 WHEN NO_DATA_FOUND THEN
395 :city := 'Unknown';
396 :street_address := NULL;
397 END;
398 ").await?;
399 let mut city = String::with_capacity(30);
400 let mut addr = String::with_capacity(40);
401 let num_rows = stmt.execute((
402 ( ":LOCATION_ID", 2500 ),
403 ( ":CITY", &mut city ),
404 ( ":STREET_ADDRESS", &mut addr )
405 )).await?;
406
407 assert_eq!(num_rows, 1);
408 assert!(!stmt.is_null(":CITY")?);
409 assert_eq!(city, "Oxford");
410 assert!(!stmt.is_null(":STREET_ADDRESS")?);
411 assert_eq!(addr, "Magdalen Centre, The Oxford Science Park");
412
413 let num_rows = stmt.execute((
414 ( ":LOCATION_ID", 2400 ),
415 ( ":CITY", &mut city ),
416 ( ":STREET_ADDRESS", &mut addr )
417 )).await?;
418
419 assert!(num_rows > 0);
420 assert!(!stmt.is_null(":CITY")?);
421 assert_eq!(city, "London");
422 assert!(!stmt.is_null(":STREET_ADDRESS")?);
423 assert_eq!(addr, "8204 Arthur St");
424
425 let num_rows = stmt.execute((
426 ( ":LOCATION_ID", 2200 ),
427 ( ":CITY", &mut city ),
428 ( ":STREET_ADDRESS", &mut addr )
429 )).await?;
430
431 assert_eq!(num_rows, 2);
432 assert!(!stmt.is_null(":CITY")?);
433 assert_eq!(city, "Sydney");
434 assert!(!stmt.is_null(":STREET_ADDRESS")?);
435 assert_eq!(addr, "12-98 Victoria Street");
436
437 let num_rows = stmt.execute((
438 ( ":LOCATION_ID", 3300 ),
439 ( ":CITY", &mut city ),
440 ( ":STREET_ADDRESS", &mut addr )
441 )).await?;
442
443 assert_eq!(num_rows, 2);
444 assert!(!stmt.is_null(":CITY")?);
445 assert_eq!(city, "Unknown");
446 assert!(stmt.is_null(":STREET_ADDRESS")?);
447
448 session.rollback().await?;
449 Ok(())
450 })
451 }
452
453 #[test]
454 fn plsql_one_out_arg() -> std::result::Result<(),Box<dyn std::error::Error>> {
455 block_on(async {
456 let session = crate::test_env::get_session().await?;
457
458 let stmt = session.prepare("
459 BEGIN
460 SELECT street_address
461 INTO :addr
462 FROM hr.locations
463 WHERE location_id = :id;
464 END;
465 ").await?;
466
467 let mut addr = String::with_capacity(40);
468
469 let num_rows = stmt.execute((
470 ( ":ID", 2500 ),
471 ( ":ADDR", &mut addr )
472 )).await?;
473
474 assert!(num_rows > 0);
475 assert!(!stmt.is_null(":ADDR")?);
476 assert_eq!(addr, "Magdalen Centre, The Oxford Science Park");
477
478 let num_rows = stmt.execute((
479 ( ":ID", 2400 ),
480 ( ":ADDR", &mut addr )
481 )).await?;
482
483 assert!(num_rows > 0);
484 assert!(!stmt.is_null(":ADDR")?);
485 assert_eq!(addr, "8204 Arthur St");
486
487 let num_rows = stmt.execute((
488 ( ":ID", 2200 ),
489 ( ":ADDR", &mut addr )
490 )).await?;
491
492 assert!(num_rows > 0);
493 assert!(!stmt.is_null(":ADDR")?);
494 assert_eq!(addr, "12-98 Victoria Street");
495
496 let num_rows = stmt.execute((
497 ( ":ID", 3300 ),
498 ( ":ADDR", &mut addr )
499 )).await?;
500
501 assert_eq!(num_rows, 0);
502 assert!(stmt.is_null(":ADDR")?);
503
504 let num_rows = stmt.execute((
505 ( ":ID", 2200 ),
506 ( ":ADDR", &mut addr )
507 )).await?;
508
509 assert!(num_rows > 0);
510 assert!(!stmt.is_null(":ADDR")?);
511 assert_eq!(addr, "12-98 Victoria Street");
512
513 let num_rows = stmt.execute((
514 ( ":ID", 2500 ),
515 ( ":ADDR", &mut addr )
516 )).await?;
517
518 assert!(num_rows > 0);
519 assert!(!stmt.is_null(":ADDR")?);
520 assert_eq!(addr, "Magdalen Centre, The Oxford Science Park");
521
522 session.rollback().await?;
523 Ok(())
524 })
525 }
526
527 #[test]
528 fn update_many_rows() -> std::result::Result<(),Box<dyn std::error::Error>> {
529 block_on(async {
530 let session = crate::test_env::get_session().await?;
531
532 let stmt = session.prepare("
533 UPDATE hr.employees
534 SET salary = Round(salary * :rate, -2)
535 WHERE manager_id = :manager_id
536 ").await?;
537
538 let num_rows = stmt.execute((
539 (":MANAGER_ID", 103 ),
540 (":RATE", 1.02 ),
541 )).await?;
542 assert_eq!(num_rows, 4);
543
544 let num_rows = stmt.execute((
545 (":MANAGER_ID", 108 ),
546 (":RATE", 1.03 ),
547 )).await?;
548 assert_eq!(num_rows, 5);
549
550 session.rollback().await?;
551 Ok(())
552 })
553 }
554
555 #[test]
556 fn single_row_query() -> Result<()> {
557 block_on(async {
558 let session = crate::test_env::get_session().await?;
559
560 let stmt = session.prepare("
561 SELECT country_id, state_province, city, postal_code, street_address
562 FROM hr.locations
563 WHERE location_id = :id
564 ").await?;
565 let row = stmt.query_single(1800).await?;
566 assert!(row.is_some());
567 let row = row.unwrap();
568 let country_id : &str = row.get(0)?;
569 let state_province : &str = row.get(1)?;
570 let city : &str = row.get(2)?;
571 let postal_code : &str = row.get(3)?;
572 let street_address : &str = row.get(4)?;
573 assert_eq!(country_id, "CA");
574 assert_eq!(state_province, "Ontario");
575 assert_eq!(city, "Toronto");
576 assert_eq!(postal_code, "M5V 2L7");
577 assert_eq!(street_address, "147 Spadina Ave");
578
579 let stmt = session.prepare("
580 SELECT location_id, state_province, city, postal_code, street_address
581 FROM hr.locations
582 WHERE country_id = :country_id
583 ORDER BY location_id
584 ").await?;
585 let row = stmt.query_single("CA").await?;
586 assert!(row.is_some());
587 let row = row.unwrap();
588 let location_id : u16 = row.get(0)?;
589 let state_province : &str = row.get(1)?;
590 let city : &str = row.get(2)?;
591 let postal_code : &str = row.get(3)?;
592 let street_address : &str = row.get(4)?;
593 assert_eq!(location_id, 1800);
594 assert_eq!(state_province, "Ontario");
595 assert_eq!(city, "Toronto");
596 assert_eq!(postal_code, "M5V 2L7");
597 assert_eq!(street_address, "147 Spadina Ave");
598
599 Ok(())
600 })
601 }
602}