sibyl/stmt/
nonblocking.rs

1//! Nonblocking SQL statement methods
2
3use super::{Statement, bind::Params, cols::{DEFAULT_LONG_BUFFER_SIZE, Columns}};
4use crate::{Result, oci::*, Session, Error, Rows, Cursor, ToSql, Row};
5use parking_lot::RwLock;
6use once_cell::sync::OnceCell;
7
8impl<'a> Statement<'a> {
9    /// Creates a new statement
10    pub(crate) async fn new(sql: &str, session: &'a Session<'a>) -> Result<Statement<'a>> {
11        let err = Handle::<OCIError>::new(session)?;
12        let stmt = futures::StmtPrepare::new(session.get_svc(), &err, sql).await?;
13        let params = Params::new(&stmt, &err)?.map(|params| RwLock::new(params));
14        let stmt = Self {session, svc: session.get_svc(), stmt, params, cols: OnceCell::new(), err, max_long: DEFAULT_LONG_BUFFER_SIZE};
15        stmt.set_prefetch_rows(10)?;
16        Ok(stmt)
17    }
18
19    /// Binds provided arguments to SQL parameter placeholders. Returns indexes of parameter placeholders for the OUT args.
20    fn bind_args(&self, args: &mut impl ToSql) -> Result<()> {
21        if let Some(params) = &self.params {
22            params.write().bind_args(&self.stmt, &self.err, args)
23        } else {
24            Ok(())
25        }
26    }
27
28    /// Executes the prepared statement. Returns the OCI result code from OCIStmtExecute.
29    async fn exec(&self, stmt_type: u16, args: &mut impl ToSql) -> Result<i32> {
30        self.bind_args(args)?;
31        futures::StmtExecute::new(self.svc.clone(), &self.err, &self.stmt, stmt_type).await
32    }
33
34    /**
35    Executes the prepared statement. Returns the number of rows affected.
36
37    # Parameters
38
39    * `args` - SQL statement arguments - a single argument or a tuple of arguments
40
41    Where each argument can be represented by:
42    - a value: `val` (IN)
43    - a reference: `&val` (IN)
44    - a mutable reference: `&mut val` (OUT or INOUT)
45    - a 2-item tuple where first item is a parameter name: `(":NAME", val)`
46
47    # Example
48
49    ```
50    # sibyl::block_on(async {
51    # let session = sibyl::test_env::get_session().await?;
52    let stmt = session.prepare("
53        INSERT INTO hr.departments
54               ( department_id, department_name, manager_id, location_id )
55        VALUES ( hr.departments_seq.nextval, :department_name, :manager_id
56               , (SELECT location_id FROM hr.locations WHERE city = :city)
57               )
58        RETURNING department_id INTO :department_id
59    ").await?;
60    let mut department_id = 0u32;
61
62    let num_updated_rows = stmt.execute((
63        ( ":DEPARTMENT_NAME", "Security"         ),
64        ( ":MANAGER_ID",      ""                 ),
65        ( ":CITY",            "Seattle"          ),
66        ( ":DEPARTMENT_ID",   &mut department_id ),
67    )).await?;
68
69    assert_eq!(num_updated_rows, 1);
70    assert!(!stmt.is_null(":DEPARTMENT_ID")?);
71    assert!(department_id > 0);
72    # session.rollback().await?;
73    # Ok::<(),sibyl::Error>(()) }).expect("Ok from async");
74    ```
75    */
76    pub async fn execute(&self, mut args: impl ToSql) -> Result<usize> {
77        let stmt_type: u16 = self.get_attr(OCI_ATTR_STMT_TYPE)?;
78        if stmt_type == OCI_STMT_SELECT {
79            return Err( Error::new("Use `query` to execute SELECT") );
80        }
81        self.exec(stmt_type, &mut args).await?;
82        let num_rows = self.row_count()?;
83        if let Some(params) = &self.params {
84            if num_rows == 0 {
85                params.write().set_out_to_null();
86            }
87            params.read().update_out_args(&mut args)?;
88        }
89        Ok(num_rows)
90    }
91
92    /**
93    Executes the prepared statement. Returns "streaming iterator" over the returned rows.
94
95    # Parameters
96
97    * `args` - SQL statement arguments - a single argument or a tuple of arguments
98
99    Where each argument can be represented by:
100    - a value: `val` (IN)
101    - a reference: `&val` (IN)
102    - a 2-item tuple where first item is a parameter name: `(":NAME", val)`
103
104    # Example
105
106    ```
107    # use std::collections::HashMap;
108    # sibyl::block_on(async {
109    # let session = sibyl::test_env::get_session().await?;
110    let stmt = session.prepare("
111        SELECT employee_id, last_name, first_name
112          FROM hr.employees
113         WHERE manager_id = :id
114      ORDER BY employee_id
115    ").await?;
116    stmt.set_prefetch_rows(5)?;
117
118    let rows = stmt.query(103).await?; // 103 is Alexander Hunold
119
120    let mut subs = HashMap::new();
121    while let Some( row ) = rows.next().await? {
122        // EMPLOYEE_ID is NOT NULL, so we can safely unwrap it
123        let id : u32 = row.get(0)?;
124        // Same for the LAST_NAME.
125        // Note that `last_name` is retrieved as a slice. This is fast as it
126        // borrows directly from the column buffer, but it can only live until
127        // the end of the current scope, i.e. only during the lifetime of the
128        // current row.
129        let last_name : &str = row.get(1)?;
130        // FIRST_NAME is NULL-able...
131        let first_name : Option<&str> = row.get(2)?;
132        let name = first_name.map_or(last_name.to_string(),
133            |first_name| format!("{}, {}", last_name, first_name)
134        );
135        subs.insert(id, name);
136    }
137    assert_eq!(stmt.row_count()?, 4);
138    assert_eq!(subs.len(), 4);
139    assert!(subs.contains_key(&104), "Bruce Ernst");
140    assert!(subs.contains_key(&105), "David Austin");
141    assert!(subs.contains_key(&106), "Valli Pataballa");
142    assert!(subs.contains_key(&107), "Diana Lorentz");
143    # Ok::<(),sibyl::Error>(()) }).expect("Ok from async");
144    ```
145    */
146    pub async fn query(&'a self, mut args: impl ToSql) -> Result<Rows<'a>> {
147        let stmt_type: u16 = self.get_attr(OCI_ATTR_STMT_TYPE)?;
148        if stmt_type != OCI_STMT_SELECT {
149            return Err( Error::new("Use `execute` to execute statements other than SELECT") );
150        }
151        let res = self.exec(stmt_type, &mut args).await?;
152
153        if self.cols.get().is_none() {
154            let cols = Columns::new(Ptr::from(self.as_ref()), Ptr::from(self.as_ref()), Ptr::from(self.as_ref()), self.max_long)?;
155            self.cols.get_or_init(|| RwLock::new(cols));
156        }
157
158        match res {
159            OCI_SUCCESS | OCI_SUCCESS_WITH_INFO | OCI_NO_DATA => {
160                Ok( Rows::from_query(res, self) )
161            }
162            _ => Err( Error::oci(&self.err, res) )
163        }
164    }
165
166    /**
167    Convenience method to execute a query that returns a single rows.
168
169    If the query returns more than one row, `query_single` will return only the first
170    row and ignore the rest.
171
172    # Parameters
173
174    * `args` - SQL statement arguments - a single argument or a tuple of arguments
175
176    Where each argument can be represented by:
177    - a value: `val` (IN)
178    - a reference: `&val` (IN)
179    - a 2-item tuple where first item is a parameter name: `(":NAME", val)`
180
181    # Returns
182
183    - `None` - if query did not return any rows
184    - `Some(row) - a single row (even if query returned more than one row)
185
186    # Example
187
188    ```
189    use std::collections::HashMap;
190
191    # sibyl::block_on(async {
192    # let session = sibyl::test_env::get_session().await?;
193    let stmt = session.prepare("
194        SELECT country_id, state_province, city, postal_code, street_address
195          FROM hr.locations
196         WHERE location_id = :id
197    ").await?;
198
199    let row = stmt.query_single(1800).await?;
200
201    assert!(row.is_some());
202    let row = row.unwrap();
203    let country_id     : &str = row.get(0)?;
204    let state_province : &str = row.get(1)?;
205    let city           : &str = row.get(2)?;
206    let postal_code    : &str = row.get(3)?;
207    let street_address : &str = row.get(4)?;
208    assert_eq!(country_id, "CA");
209    assert_eq!(state_province, "Ontario");
210    assert_eq!(city, "Toronto");
211    assert_eq!(postal_code, "M5V 2L7");
212    assert_eq!(street_address, "147 Spadina Ave");
213    # Ok::<(),sibyl::Error>(()) }).expect("Ok from async");
214    ```
215    */
216    pub async fn query_single(&'a self, mut args: impl ToSql) -> Result<Option<Row<'a>>> {
217        let stmt_type: u16 = self.get_attr(OCI_ATTR_STMT_TYPE)?;
218        if stmt_type != OCI_STMT_SELECT {
219            return Err( Error::new("Use `execute` to execute statements other than SELECT") );
220        }
221        self.set_prefetch_rows(1)?;
222        let res = self.exec(stmt_type, &mut args).await?;
223
224        if self.cols.get().is_none() {
225            let cols = Columns::new(Ptr::from(self.as_ref()), Ptr::from(self.as_ref()), Ptr::from(self.as_ref()), self.max_long)?;
226            self.cols.get_or_init(|| RwLock::new(cols));
227        }
228
229        match res {
230            OCI_NO_DATA => Ok(None),
231            OCI_SUCCESS | OCI_SUCCESS_WITH_INFO => Rows::from_query(res, self).single().await,
232            _ => Err( Error::oci(&self.err, res) )
233        }
234    }
235
236    /**
237    Retrieves a single implicit result (cursor) in the order in which they were returned
238    from the PL/SQL procedure or block. If no more results are available, then `None` is
239    returned.
240
241    PL/SQL provides a subprogram RETURN_RESULT in the DBMS_SQL package to return the result
242    of an executed statement. Only SELECT query result-sets can be implicitly returned by a
243    PL/SQL procedure or block.
244
245    `next_result` can be called iteratively by the application to retrieve each implicit
246    result from an executed PL/SQL statement. Applications retrieve each result-set sequentially
247    but can fetch rows from any result-set independently.
248
249    # Example
250
251    ```
252    use sibyl::Number;
253    use std::cmp::Ordering::Equal;
254
255    # sibyl::block_on(async {
256    # let session = sibyl::test_env::get_session().await?;
257    let stmt = session.prepare("
258        DECLARE
259            c1 SYS_REFCURSOR;
260            c2 SYS_REFCURSOR;
261        BEGIN
262            OPEN c1 FOR
263                SELECT department_name, first_name, last_name, salary
264                  FROM (
265                        SELECT first_name, last_name, salary, department_id
266                             , ROW_NUMBER() OVER (ORDER BY salary) ord
267                          FROM hr.employees
268                       ) e
269                  JOIN hr.departments d
270                    ON d.department_id = e.department_id
271                 WHERE ord = 1
272            ;
273            DBMS_SQL.RETURN_RESULT (c1);
274
275            OPEN c2 FOR
276                SELECT department_name, first_name, last_name, salary
277                  FROM (
278                        SELECT first_name, last_name, salary, department_id
279                             , MEDIAN(salary) OVER () median_salary
280                          FROM hr.employees
281                       ) e
282                  JOIN hr.departments d
283                    ON d.department_id = e.department_id
284                 WHERE salary = median_salary
285              ORDER BY department_name, last_name, first_name
286            ;
287            DBMS_SQL.RETURN_RESULT (c2);
288        END;
289    ").await?;
290    let expected_lowest_salary = Number::from_int(2100, &session)?;
291    let expected_median_salary = Number::from_int(6200, &session)?;
292
293    stmt.execute(()).await?;
294
295    let lowest_payed_employee = stmt.next_result().await?.unwrap();
296
297    let rows = lowest_payed_employee.rows().await?;
298    let row = rows.next().await?.unwrap();
299
300    let department_name : &str = row.get(0)?;
301    let first_name : &str = row.get(1)?;
302    let last_name : &str = row.get(2)?;
303    let salary : Number = row.get(3)?;
304
305    assert_eq!(department_name, "Shipping");
306    assert_eq!(first_name, "TJ");
307    assert_eq!(last_name, "Olson");
308    assert_eq!(salary.compare(&expected_lowest_salary)?, Equal);
309
310    let row = rows.next().await?;
311    assert!(row.is_none());
312
313    let median_salary_employees = stmt.next_result().await?.unwrap();
314
315    let rows = median_salary_employees.rows().await?;
316
317    let row = rows.next().await?.unwrap();
318    let department_name : &str = row.get(0)?;
319    let first_name : &str = row.get(1)?;
320    let last_name : &str = row.get(2)?;
321    let salary : Number = row.get(3)?;
322
323    assert_eq!(department_name, "Sales");
324    assert_eq!(first_name, "Amit");
325    assert_eq!(last_name, "Banda");
326    assert_eq!(salary.compare(&expected_median_salary)?, Equal);
327
328    let row = rows.next().await?.unwrap();
329
330    let department_name : &str = row.get(0)?;
331    let first_name : &str = row.get(1)?;
332    let last_name : &str = row.get(2)?;
333    let salary : Number = row.get(3)?;
334
335    assert_eq!(department_name, "Sales");
336    assert_eq!(first_name, "Charles");
337    assert_eq!(last_name, "Johnson");
338    assert_eq!(salary.compare(&expected_median_salary)?, Equal);
339
340    let row = rows.next().await?;
341    assert!(row.is_none());
342
343    assert!(stmt.next_result().await?.is_none());
344    # Ok::<(),sibyl::Error>(()) }).expect("Ok from async");
345    ```
346    */
347    pub async fn next_result(&'a self) -> Result<Option<Cursor<'a>>> {
348        let res = futures::StmtGetNextResult::new(self.svc.clone(), &self.stmt, &self.err).await?;
349        if let Some(stmt) = res {
350            Ok(Some(Cursor::implicit(stmt, self)))
351        } else {
352            Ok(None)
353        }
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use crate::*;
360
361    #[test]
362    fn async_query() -> Result<()> {
363        block_on(async {
364            let session = crate::test_env::get_session().await?;
365            let stmt = session.prepare("
366                SELECT employee_id
367                  FROM (
368                        SELECT employee_id
369                             , row_number() OVER (ORDER BY hire_date) AS hire_date_rank
370                          FROM hr.employees
371                       )
372                 WHERE hire_date_rank = 1
373            ").await?;
374            let row = stmt.query_single(()).await?.unwrap();
375            let id : usize = row.get(0)?;
376            assert_eq!(id, 102);
377
378            Ok(())
379        })
380    }
381
382    #[test]
383    fn plsql_args() -> std::result::Result<(),Box<dyn std::error::Error>> {
384        block_on(async {
385            let session = crate::test_env::get_session().await?;
386
387            let stmt = session.prepare("
388                BEGIN
389                    SELECT city, street_address
390                      INTO :city, :street_address
391                      FROM hr.locations
392                     WHERE location_id = :location_id;
393                EXCEPTION
394                    WHEN NO_DATA_FOUND THEN
395                        :city := 'Unknown';
396                        :street_address := NULL;
397                END;
398            ").await?;
399            let mut city = String::with_capacity(30);
400            let mut addr = String::with_capacity(40);
401            let num_rows = stmt.execute((
402                ( ":LOCATION_ID",    2500      ),
403                ( ":CITY",           &mut city ),
404                ( ":STREET_ADDRESS", &mut addr )
405            )).await?;
406
407            assert_eq!(num_rows, 1);
408            assert!(!stmt.is_null(":CITY")?);
409            assert_eq!(city, "Oxford");
410            assert!(!stmt.is_null(":STREET_ADDRESS")?);
411            assert_eq!(addr, "Magdalen Centre, The Oxford Science Park");
412
413            let num_rows = stmt.execute((
414                ( ":LOCATION_ID",    2400      ),
415                ( ":CITY",           &mut city ),
416                ( ":STREET_ADDRESS", &mut addr )
417            )).await?;
418
419            assert!(num_rows > 0);
420            assert!(!stmt.is_null(":CITY")?);
421            assert_eq!(city, "London");
422            assert!(!stmt.is_null(":STREET_ADDRESS")?);
423            assert_eq!(addr, "8204 Arthur St");
424
425            let num_rows = stmt.execute((
426                ( ":LOCATION_ID",    2200      ),
427                ( ":CITY",           &mut city ),
428                ( ":STREET_ADDRESS", &mut addr )
429            )).await?;
430
431            assert_eq!(num_rows, 2);
432            assert!(!stmt.is_null(":CITY")?);
433            assert_eq!(city, "Sydney");
434            assert!(!stmt.is_null(":STREET_ADDRESS")?);
435            assert_eq!(addr, "12-98 Victoria Street");
436
437            let num_rows = stmt.execute((
438                ( ":LOCATION_ID",    3300      ),
439                ( ":CITY",           &mut city ),
440                ( ":STREET_ADDRESS", &mut addr )
441            )).await?;
442
443            assert_eq!(num_rows, 2);
444            assert!(!stmt.is_null(":CITY")?);
445            assert_eq!(city, "Unknown");
446            assert!(stmt.is_null(":STREET_ADDRESS")?);
447
448            session.rollback().await?;
449            Ok(())
450        })
451    }
452
453    #[test]
454    fn plsql_one_out_arg() -> std::result::Result<(),Box<dyn std::error::Error>> {
455        block_on(async {
456            let session = crate::test_env::get_session().await?;
457
458            let stmt = session.prepare("
459                BEGIN
460                    SELECT street_address
461                      INTO :addr
462                      FROM hr.locations
463                     WHERE location_id = :id;
464                END;
465            ").await?;
466
467            let mut addr = String::with_capacity(40);
468
469            let num_rows = stmt.execute((
470                ( ":ID",   2500      ),
471                ( ":ADDR", &mut addr )
472            )).await?;
473
474            assert!(num_rows > 0);
475            assert!(!stmt.is_null(":ADDR")?);
476            assert_eq!(addr, "Magdalen Centre, The Oxford Science Park");
477
478            let num_rows = stmt.execute((
479                ( ":ID",   2400      ),
480                ( ":ADDR", &mut addr )
481            )).await?;
482
483            assert!(num_rows > 0);
484            assert!(!stmt.is_null(":ADDR")?);
485            assert_eq!(addr, "8204 Arthur St");
486
487            let num_rows = stmt.execute((
488                ( ":ID",   2200      ),
489                ( ":ADDR", &mut addr )
490            )).await?;
491
492            assert!(num_rows > 0);
493            assert!(!stmt.is_null(":ADDR")?);
494            assert_eq!(addr, "12-98 Victoria Street");
495
496            let num_rows = stmt.execute((
497                ( ":ID",   3300      ),
498                ( ":ADDR", &mut addr )
499            )).await?;
500
501            assert_eq!(num_rows, 0);
502            assert!(stmt.is_null(":ADDR")?);
503
504            let num_rows = stmt.execute((
505                ( ":ID",   2200      ),
506                ( ":ADDR", &mut addr )
507            )).await?;
508
509            assert!(num_rows > 0);
510            assert!(!stmt.is_null(":ADDR")?);
511            assert_eq!(addr, "12-98 Victoria Street");
512
513            let num_rows = stmt.execute((
514                ( ":ID",   2500      ),
515                ( ":ADDR", &mut addr )
516            )).await?;
517
518            assert!(num_rows > 0);
519            assert!(!stmt.is_null(":ADDR")?);
520            assert_eq!(addr, "Magdalen Centre, The Oxford Science Park");
521
522            session.rollback().await?;
523            Ok(())
524        })
525    }
526
527    #[test]
528    fn update_many_rows() -> std::result::Result<(),Box<dyn std::error::Error>> {
529        block_on(async {
530            let session = crate::test_env::get_session().await?;
531
532            let stmt = session.prepare("
533                UPDATE hr.employees
534                   SET salary = Round(salary * :rate, -2)
535                 WHERE manager_id = :manager_id
536            ").await?;
537
538            let num_rows = stmt.execute((
539                (":MANAGER_ID", 103  ),
540                (":RATE",       1.02 ),
541            )).await?;
542            assert_eq!(num_rows, 4);
543
544            let num_rows = stmt.execute((
545                (":MANAGER_ID", 108  ),
546                (":RATE",       1.03 ),
547            )).await?;
548            assert_eq!(num_rows, 5);
549
550            session.rollback().await?;
551            Ok(())
552        })
553    }
554
555    #[test]
556    fn single_row_query() -> Result<()> {
557        block_on(async {
558            let session = crate::test_env::get_session().await?;
559
560            let stmt = session.prepare("
561                SELECT country_id, state_province, city, postal_code, street_address
562                  FROM hr.locations
563                 WHERE location_id = :id
564            ").await?;
565            let row = stmt.query_single(1800).await?;
566            assert!(row.is_some());
567            let row = row.unwrap();
568            let country_id     : &str = row.get(0)?;
569            let state_province : &str = row.get(1)?;
570            let city           : &str = row.get(2)?;
571            let postal_code    : &str = row.get(3)?;
572            let street_address : &str = row.get(4)?;
573            assert_eq!(country_id, "CA");
574            assert_eq!(state_province, "Ontario");
575            assert_eq!(city, "Toronto");
576            assert_eq!(postal_code, "M5V 2L7");
577            assert_eq!(street_address, "147 Spadina Ave");
578
579            let stmt = session.prepare("
580                SELECT location_id, state_province, city, postal_code, street_address
581                  FROM hr.locations
582                 WHERE country_id = :country_id
583              ORDER BY location_id
584            ").await?;
585            let row = stmt.query_single("CA").await?;
586            assert!(row.is_some());
587            let row = row.unwrap();
588            let location_id    : u16  = row.get(0)?;
589            let state_province : &str = row.get(1)?;
590            let city           : &str = row.get(2)?;
591            let postal_code    : &str = row.get(3)?;
592            let street_address : &str = row.get(4)?;
593            assert_eq!(location_id, 1800);
594            assert_eq!(state_province, "Ontario");
595            assert_eq!(city, "Toronto");
596            assert_eq!(postal_code, "M5V 2L7");
597            assert_eq!(street_address, "147 Spadina Ave");
598
599            Ok(())
600        })
601    }
602}