oracle_nosql_rust_sdk/
query_request.rs

1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5//  https://oss.oracle.com/licenses/upl/
6//
7use crate::error::ia_err;
8use crate::error::NoSQLError;
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::plan_iter::{deserialize_plan_iter, PlanIterKind, PlanIterState};
13use crate::prepared_statement::PreparedStatement;
14use crate::reader::Reader;
15use crate::receive_iter::ReceiveIterData;
16use crate::types::NoSQLColumnToFieldValue;
17use crate::types::{Capacity, Consistency, FieldType, FieldValue, MapValue, OpCode, TopologyInfo};
18use crate::writer::Writer;
19
20use std::collections::HashMap;
21use std::result::Result;
22use std::time::Duration;
23use tracing::trace;
24
25/// Encapsulates a SQL query of a NoSQL Database table.
26///
27/// A query may be either a string query
28/// statement or a prepared query, which may include bind variables.
29/// A query request cannot have both a string statement and prepared query, but
30/// it must have one or the other.
31///
32/// See the [SQL for NoSQL Database Guide](https://docs.oracle.com/en/database/other-databases/nosql-database/24.1/sqlfornosql/introduction-sql.html) for details on creating and using queries.
33///
34/// ## Simple Example
35/// Here is a simple example of running a query that will return every row in a table named `users`:
36///
37/// ```no_run
38/// # use oracle_nosql_rust_sdk::{Handle, QueryRequest};
39/// # #[tokio::main]
40/// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
41/// let handle = Handle::builder().build().await?;
42/// let results = QueryRequest::new("select * from users")
43///               .execute(&handle).await?;
44/// for row in results.rows() {
45///     println!("Row = {}", row);
46/// }
47/// # Ok(())
48/// # }
49/// ```
50///
51/// For performance reasons, prepared queries are preferred for queries that may
52/// be reused. Prepared queries bypass compilation of the query. They also allow
53/// for parameterized queries using bind variables.
54#[derive(Default, Debug)]
55pub struct QueryRequest {
56    pub(crate) prepare_only: bool,
57    //pub(crate) limit: u32,
58    pub(crate) max_read_kb: u32,
59    pub(crate) max_write_kb: u32,
60    pub(crate) consistency: Consistency,
61    pub(crate) timeout: Option<Duration>,
62    pub(crate) compartment_id: String,
63
64    // max_memory_consumption specifies the maximum amount of memory in bytes that
65    // may be consumed by the query at the client for operations such as
66    // duplicate elimination (which may be required due to the use of an index
67    // on an array or map) and sorting. Such operations may consume a lot of
68    // memory as they need to cache the full result set or a large subset of
69    // it at the client memory.
70    //
71    // The default value is 1GB (1,000,000,000).
72    // TODO pub max_memory_consumption: i64,
73
74    // Durability is currently only used in On-Prem installations.
75    // This setting only applies if the query modifies
76    // a row using an INSERT, UPSERT, or DELETE statement. If the query is
77    // read-only it is ignored.
78    // Added in SDK Version 1.4.0
79    // TODO Durability types.Durability
80
81    // private fields: driver and RCB data
82
83    // statement specifies a query statement.
84    statement: Option<String>,
85
86    // prepared_statement specifies the prepared query statement.
87    pub(crate) prepared_statement: PreparedStatement,
88
89    // shortcuts
90    has_driver: bool,
91
92    pub(crate) is_done: bool,
93
94    // created/used by internal iterators
95    is_internal: bool,
96
97    // reached_limit indicates if the query execution reached the size-based or
98    // number-based limit. If so, query execution must stop and a batch of
99    // results (potentially empty) must be returned to the application.
100    pub(crate) reached_limit: bool,
101
102    pub(crate) consumed_capacity: Capacity,
103
104    // memory_consumption represents the amount of memory in bytes that were
105    // consumed by the query at the client for operations such as duplicate
106    // elimination and sorting.
107    // TODO pub memory_consumption: i64,
108
109    // sql_hash_tag is a portion of the hash value of SQL text, used as a tag
110    // for query tracing.
111    pub(crate) sql_hash_tag: Vec<u8>,
112
113    // err represents a non-retryable error returned by a query batch.
114    err: Option<NoSQLError>,
115
116    pub(crate) continuation_key: Option<Vec<u8>>,
117
118    pub(crate) shard_id: i32,
119
120    // total number of batches executed
121    pub(crate) batch_counter: i32,
122
123    // for "advanced" queries using plan iterators
124    pub(crate) num_registers: i32,
125    pub(crate) registers: Vec<FieldValue>,
126
127    pub(crate) topology_info: TopologyInfo,
128}
129
130/// Struct representing the result of a query operation.
131#[derive(Default, Debug)]
132pub struct QueryResult {
133    pub(crate) rows: Vec<MapValue>,
134    pub(crate) prepared_statement: PreparedStatement,
135    pub(crate) consumed: Capacity,
136    // TODO: stats, consumed, etc.
137}
138
139impl QueryResult {
140    /// Get the query result rows, if any.
141    ///
142    /// If the query returned no rows, this will return a reference to an empty vector.
143    /// Otherwise, it will return a reference to the rows in the order specified by the query.
144    pub fn rows(&self) -> &Vec<MapValue> {
145        &self.rows
146    }
147    /// Take the query result rows, setting the result back to an empty vector.
148    ///
149    /// If the query returned no rows, this will return an empty vector.
150    /// Otherwise, it will return the rows in the vector, giving the ownership
151    /// of the rows to the caller.
152    pub fn take_rows(&mut self) -> Vec<MapValue> {
153        std::mem::take(&mut self.rows)
154    }
155    /// Get the prepared statement after execution of a query.
156    ///
157    /// The prepared statement can then be used in subsequent query requests, saving the
158    /// extra step of preparing each query again.
159    pub fn prepared_statement(&self) -> PreparedStatement {
160        let mut ps = self.prepared_statement.clone();
161        let _ = ps.reset();
162        ps
163    }
164    /// Return the total capacity that was consumed during the execution of the query.
165    ///
166    /// This is only relevant for NoSQL Cloud operation. It returns a [`Capacity`] struct which
167    /// contains the total read units, read KB, and write units used by the query execution.
168    pub fn consumed(&self) -> Capacity {
169        self.consumed.clone()
170    }
171}
172
173impl QueryRequest {
174    /// Create a new QueryRequest from a SQL query string.
175    ///
176    /// While this struct is named `QueryRequest`, the SQL supplied to it does not
177    /// necessarily have to be a `SELECT` query. It could also be one of `INSERT`, `UPDATE`,
178    /// or `DELETE`.
179    ///
180    /// See the [SQL for NoSQL Database Guide](https://docs.oracle.com/en/database/other-databases/nosql-database/24.1/sqlfornosql/introduction-sql.html) for details on creating and using queries.
181    ///
182    /// Note: this request should not be used for DDL statements (those that create or modify tables or indexes, such as `CREATE TABLE`). For DDL statements, use [`TableRequest`](crate::TableRequest) instead.
183    ///
184    pub fn new(statement: &str) -> Self {
185        QueryRequest {
186            statement: Some(statement.to_string()),
187            shard_id: -1,
188            ..Default::default()
189        }
190    }
191
192    /// Create a new QueryRequest from a previously prepared query statement.
193    ///
194    /// Use of this method is recommended when executing the same type of query multiple
195    /// times with different values for parameters. Doing so will save resources by not
196    /// re-preparing the query on every execution.
197    ///
198    /// To set bind variables for query execution, first create the request with this method,
199    /// then call [`QueryRequest::set_variable()`] for all desired bind variables. Then execute the
200    /// query with [`QueryRequest::execute()`].
201    pub fn new_prepared(prepared_statement: &PreparedStatement) -> Self {
202        let ti: TopologyInfo;
203        if let Some(t) = &prepared_statement.topology_info {
204            ti = t.clone();
205        } else {
206            panic!(
207                "Invalid prepared statement passed to new_prepared! Missing toploogy info. ps={:?}",
208                prepared_statement
209            );
210        }
211        QueryRequest {
212            prepared_statement: prepared_statement.clone(),
213            shard_id: -1,
214            topology_info: ti,
215            ..Default::default()
216        }
217    }
218
219    /// Specify that this query execution should only prepare the query.
220    ///
221    /// Setting this value to true and then calling [`QueryRequest::execute()`]
222    /// will result in only the query being prepared, and no result rows being returned.
223    /// The prepared statement can then be retrieved using [`QueryResult::prepared_statement()`]
224    /// and can be used in subsequent query calls using [`QueryRequest::new_prepared()`].
225    pub fn prepare_only(mut self) -> Self {
226        self.prepare_only = true;
227        self
228    }
229
230    /// Specify the timeout value for the request.
231    ///
232    /// This is optional.
233    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
234    /// IllegalArgument error will be returned.
235    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
236    pub fn timeout(mut self, t: &Duration) -> Self {
237        self.timeout = Some(t.clone());
238        self
239    }
240
241    /// Cloud Service only: set the name or id of a compartment to be used for this operation.
242    ///
243    /// The compartment may be specified as either a name (or path for nested compartments) or as an id (OCID).
244    /// A name (vs id) can only be used when authenticated using a specific user identity. It is not available if
245    /// the associated handle authenticated as an Instance Principal (which can be done when calling the service from
246    /// a compute instance in the Oracle Cloud Infrastructure: see [`HandleBuilder::cloud_auth_from_instance()`](crate::HandleBuilder::cloud_auth_from_instance()).)
247    ///
248    /// If no compartment is given, the root compartment of the tenancy will be used.
249    pub fn compartment_id(mut self, compartment_id: &str) -> Self {
250        self.compartment_id = compartment_id.to_string();
251        self
252    }
253
254    // Specify a limit on number of items returned by the operation.
255    //
256    // This allows an operation to return less than the default amount of data.
257    //pub fn limit(mut self, l: u32) -> Self {
258    //self.limit = l;
259    //self
260    //}
261
262    /// Specify the desired consistency policy for the request.
263    ///
264    /// If not set, the default consistency of [`Consistency::Eventual`] is used.
265    pub fn consistency(mut self, c: &Consistency) -> Self {
266        self.consistency = c.clone();
267        self
268    }
269
270    /// Specify the limit on the total data read during a single batch operation, in KB.
271    ///
272    /// For cloud service, this value can only reduce the system defined limit.
273    /// An attempt to increase the limit beyond the system defined limit will
274    /// cause an IllegalArgument error. This limit is independent of read units
275    /// consumed by the operation.
276    ///
277    /// It is recommended that for tables with relatively low provisioned read
278    /// throughput that this limit be set to less than or equal to one half
279    /// of the provisioned throughput in order to reduce the possibility of throttling
280    /// errors.
281    pub fn max_read_kb(mut self, max: u32) -> Self {
282        self.max_read_kb = max;
283        self
284    }
285
286    /// Specify the limit on the total data written during a single batch operation, in KB.
287    ///
288    /// For cloud service, this value can only reduce the system defined limit.
289    /// An attempt to increase the limit beyond the system defined limit will
290    /// cause an IllegalArgument error. This limit is independent of write units
291    /// consumed by the operation.
292    ///
293    /// This limit is independent of write units consumed by the operation.
294    pub fn max_write_kb(mut self, max: u32) -> Self {
295        self.max_write_kb = max;
296        self
297    }
298
299    // used by ext_var_ref_iter
300    pub(crate) fn get_external_var(&self, id: i32) -> Option<&FieldValue> {
301        if self.prepared_statement.is_empty() {
302            return None;
303        }
304        self.prepared_statement.get_variable_by_id(id)
305    }
306
307    // note private
308    async fn get_results(
309        &mut self,
310        handle: &Handle,
311        results: &mut Vec<MapValue>,
312    ) -> Result<(), NoSQLError> {
313        // this is where everything happens
314
315        if let Some(e) = &self.err {
316            return Err(e.clone());
317        }
318
319        if self.prepare_only == true || self.prepared_statement.is_simple() {
320            // results already fetched from nson_deserialize()
321            //println!("get_results: prepare or simple: returning");
322            return Ok(());
323        }
324
325        //let driver_plan = &mut self.prepared_statement.driver_query_plan;
326        let mut driver_plan = std::mem::take(&mut self.prepared_statement.driver_query_plan);
327
328        if driver_plan.get_state() == PlanIterState::Uninitialized {
329            //println!("get_results: initializing driver plan");
330            self.reached_limit = false;
331            //self.memory_consumption = 0;
332            self.consumed_capacity = Capacity::default();
333            self.sql_hash_tag = Default::default();
334
335            self.consumed_capacity.read_kb += 1; // prep cost
336            self.consumed_capacity.read_units += 1; // prep cost
337            driver_plan.open(self, handle)?;
338        }
339
340        let mut more;
341        loop {
342            //println!("get_results: calling driver_plan.next()");
343            more = driver_plan.next(self, handle).await?;
344            if more == false {
345                //println!("get_results: no more results: breaking");
346                break;
347            }
348            //println!("get_results: pushing 1 result");
349            results.push(driver_plan.get_result(self).get_map_value()?);
350            //if self.limit > 0 && results.len() >= self.limit as usize {
351            //println!(
352            //"get_results: reached limit: results_size={}, limit={}",
353            //results.len(),
354            //self.limit
355            //);
356            //self.reached_limit = true;
357            //break;
358            //}
359        }
360
361        self.prepared_statement.driver_query_plan = driver_plan;
362
363        if more {
364            // non-advanced queries just need Some/None, value not used
365            self.continuation_key = Some(Vec::new());
366            self.is_done = false;
367        } else {
368            if self.reached_limit {
369                // there is more to do, but we reached a limit
370                self.continuation_key = Some(Vec::new());
371                self.reached_limit = false;
372                self.is_done = false;
373            } else {
374                self.continuation_key = None;
375                self.is_done = true;
376            }
377        }
378
379        Ok(())
380    }
381
382    pub(crate) fn copy_for_internal(&self) -> Self {
383        if self.prepared_statement.is_empty() {
384            panic!("prepared statement is empty in copy_for_internal");
385        }
386        QueryRequest {
387            is_internal: true,
388            prepared_statement: self.prepared_statement.copy_for_internal(),
389            shard_id: self.shard_id,
390            //limit: self.limit,
391            // purposefully not copying registers
392            num_registers: -1,
393            timeout: self.timeout.clone(),
394            ..Default::default()
395        }
396    }
397
398    pub(crate) fn reset(&mut self) -> Result<(), NoSQLError> {
399        self.is_done = false;
400        self.reached_limit = false;
401        self.batch_counter = 0;
402        self.consumed_capacity = Capacity::default();
403        // clear prepared statement iterators
404        self.prepared_statement.reset()
405    }
406
407    /// Set a named bind variable for execution of a prepared query.
408    ///
409    /// See [`PreparedStatement`] for an example of using this method.
410    pub fn set_variable(
411        &mut self,
412        name: &str,
413        value: &impl NoSQLColumnToFieldValue,
414    ) -> Result<(), NoSQLError> {
415        if self.prepared_statement.is_empty() {
416            return ia_err!("cannot set bind variables: no prepared statement in QueryRequest");
417        }
418        let fv = value.to_field_value();
419        self.prepared_statement.set_variable(name, &fv)
420    }
421
422    /// Set a positional bind variable for execution of a prepared query.
423    ///
424    /// This is similar to [`set_variable()`](QueryRequest::set_variable()) but uses integer-based positional parameters:
425    /// ```no_run
426    /// # use oracle_nosql_rust_sdk::{Handle, QueryRequest, NoSQLColumnToFieldValue};
427    /// # #[tokio::main]
428    /// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
429    /// let handle = Handle::builder().build().await?;
430    /// let prep_result = QueryRequest::new("insert into testusers(id, name) values(?, ?)")
431    ///     .prepare_only()
432    ///     .execute(&handle)
433    ///     .await?;
434    /// let data = vec!["jane", "john", "jasper"];
435    /// let mut qreq = QueryRequest::new_prepared(&prep_result.prepared_statement());
436    /// for i in 0..data.len() {
437    ///     let id = (i as i32) + 100;
438    ///     qreq.set_variable_by_id(1, &id)?;
439    ///     qreq.set_variable_by_id(2, &data[i])?;
440    ///     let result = qreq.execute(&handle).await?;
441    ///     println!("Insert result = {:?}", result);
442    /// }
443    /// # Ok(())
444    /// # }
445    pub fn set_variable_by_id(
446        &mut self,
447        id: i32,
448        value: &impl NoSQLColumnToFieldValue,
449    ) -> Result<(), NoSQLError> {
450        if self.prepared_statement.is_empty() {
451            return ia_err!("cannot set bind variables: no prepared statement in QueryRequest");
452        }
453        let fv = value.to_field_value();
454        self.prepared_statement.set_variable_by_id(id, &fv)
455    }
456
457    /// Execute the query to full completion.
458    ///
459    /// This is the preferred method for execution of a query. Internally, this method will loop
460    /// calling `execute_batch()` until all results are returned and all post-processing (sorting,
461    /// grouping, aggregations, etc) are complete.
462    ///
463    /// If the query has no rows to return, [`QueryResult::rows()`] will return an empty vector.
464    /// Otherwise it will return a vector of
465    /// [`MapValue`](crate::types::MapValue) structs in the order specified by the
466    /// query statement.
467    pub async fn execute(&mut self, h: &Handle) -> Result<QueryResult, NoSQLError> {
468        let mut iter_data = ReceiveIterData::default();
469        let mut results: Vec<MapValue> = Vec::new();
470        self.reset()?;
471        while self.is_done == false {
472            //println!("execute_internal doing next batch");
473            self.execute_batch_internal(h, &mut results, &mut iter_data)
474                .await?;
475            self.batch_counter += 1;
476            if self.batch_counter > 10000 {
477                panic!("Batch_internal infinite loop detected: self={:?}", self);
478            }
479        }
480
481        if self.prepared_statement.is_empty() {
482            panic!("empty prepared statement after execute!");
483        }
484
485        // TODO: retries, stats, etc
486        let mut qres = QueryResult {
487            prepared_statement: self.prepared_statement.clone(),
488            consumed: self.consumed_capacity.clone(),
489            rows: results,
490        };
491        let _ = qres.prepared_statement.reset();
492        Ok(qres)
493    }
494
495    /// Execute one batch of a query.
496    ///
497    /// This will execute at most one round-trip to the server. It should be called in a loop
498    /// until `is_done()` returns `true`. Note that any one batch execution may not set any results,
499    /// since some queries require many server round trips to finish (sorting, for example).
500    ///
501    /// It is recommended to use [`execute()`](QueryRequest::execute()) instead of this method.
502    /// *This method may be deprecated in future releases*.
503    pub async fn execute_batch(
504        &mut self,
505        handle: &Handle,
506        results: &mut Vec<MapValue>,
507    ) -> Result<(), NoSQLError> {
508        let mut _data = ReceiveIterData::default();
509        self.execute_batch_internal(handle, results, &mut _data)
510            .await
511    }
512
513    /// Determine if the query is complete.
514    ///
515    /// If using [`QueryRequest::execute_batch()`] in a loop, this method determines when
516    /// to terminate the loop, specifying that no more results exist for this query execution.
517    /// This is only necessary if executing queries in batch looping mode.
518    pub fn is_done(&self) -> bool {
519        self.is_done
520    }
521
522    pub(crate) async fn execute_batch_internal(
523        &mut self,
524        handle: &Handle,
525        results: &mut Vec<MapValue>,
526        iter_data: &mut ReceiveIterData,
527    ) -> Result<(), NoSQLError> {
528        trace!(
529            "EBI: batch_counter={} num_results={}",
530            self.batch_counter,
531            results.len()
532        );
533
534        self.reached_limit = false;
535
536        // internal queries do not use plan iterators/etc - they just return plain results.
537        if self.is_internal == false {
538            /*
539             * The following "if" may be true for advanced queries only. For
540             * such queries, the "if" will be true (i.e., the QueryRequest will
541             * be bound with a QueryDriver) if and only if this is not the 1st
542             * execute() call for this query. In this case we just return a new,
543             * empty QueryResult. Actual computation of a result batch will take
544             * place when the app calls getResults() on the QueryResult.
545             */
546            if self.has_driver {
547                //trace("QueryRequest has QueryDriver", 2);
548                return self.get_results(handle, results).await;
549            }
550
551            /*
552             * If it is an advanced query and we are here, then this must be
553             * the 1st execute() call for the query. If the query has been
554             * prepared before, we create a QueryDriver and bind it with the
555             * QueryRequest. Then, we create and return an empty QueryResult.
556             * Actual computation of a result batch will take place when the
557             * app calls getResults() on the QueryResult.
558             */
559            if self.prepared_statement.is_empty() == false
560                && self.prepared_statement.is_simple() == false
561            {
562                //trace("QueryRequest has no QueryDriver, but is prepared", 2);
563                self.num_registers = self.prepared_statement.num_registers;
564                self.registers = Vec::new();
565                for _i in 0..self.num_registers {
566                    self.registers.push(FieldValue::Uninitialized);
567                }
568                self.has_driver = true;
569                return self.get_results(handle, results).await;
570            }
571
572            /*
573             * If we are here, then this is either (a) a simple query or (b) an
574             * advanced query that has not been prepared already, which also
575             * implies that this is the 1st execute() call on this query. For
576             * a non-prepared advanced query, the effect of this 1st execute()
577             * call is to send the query to the proxy for compilation, get back
578             * the prepared query, but no query results, create a QueryDriver,
579             * and bind it with the QueryRequest (see
580             * QueryRequestSerializer.deserialize()), and return an empty
581             * QueryResult.
582             * a non-prepared simple query will return results.
583             */
584            //trace("QueryRequest has no QueryDriver and is not prepared", 2);
585            //self.batch_counter += 1;
586        }
587
588        let mut w: Writer = Writer::new();
589        w.write_i16(handle.inner.serial_version);
590        let timeout = handle.get_timeout(&self.timeout);
591        self.serialize_internal(&mut w, &timeout)?;
592        let mut opts = SendOptions {
593            timeout: timeout,
594            retryable: true,
595            compartment_id: self.compartment_id.clone(),
596            ..Default::default()
597        };
598        let mut r = handle.send_and_receive(w, &mut opts).await?;
599        self.continuation_key = None;
600        self.nson_deserialize(&mut r, results, iter_data)?;
601        if self.continuation_key.is_none() {
602            trace!("continuation key is None, setting is_done");
603            self.is_done = true;
604        }
605        Ok(())
606    }
607
608    fn serialize_internal(&self, w: &mut Writer, timeout: &Duration) -> Result<(), NoSQLError> {
609        let mut ns = NsonSerializer::start_request(w);
610        ns.start_header();
611        if self.prepare_only {
612            ns.write_header(OpCode::Prepare, timeout, "");
613        } else {
614            ns.write_header(OpCode::Query, timeout, "");
615        }
616        ns.end_header();
617        ns.start_payload();
618
619        //TODO writeConsistency(ns, rq.getConsistency());
620        //if (rq.getDurability() != null) {
621        //writeMapField(ns, DURABILITY,
622        //getDurability(rq.getDurability()));
623        //}
624
625        if self.max_read_kb > 0 {
626            ns.write_i32_field(MAX_READ_KB, self.max_read_kb as i32);
627        }
628        if self.max_write_kb > 0 {
629            ns.write_i32_field(MAX_WRITE_KB, self.max_write_kb as i32);
630        }
631        //if self.limit > 0 {
632        //ns.write_i32_field(NUMBER_LIMIT, self.limit as i32);
633        //}
634
635        //writeMapFieldNZ(ns, TRACE_LEVEL, rq.getTraceLevel());
636        //if (rq.getTraceLevel() > 0) {
637        //writeMapField(ns, TRACE_AT_LOG_FILES, rq.getLogFileTracing());
638        //writeMapField(ns, BATCH_COUNTER, rq.getBatchCounter());
639
640        ns.write_i32_field(QUERY_VERSION, 3); // TODO: QUERY_V4
641        if self.prepared_statement.is_empty() == false {
642            ns.write_bool_field(IS_PREPARED, true);
643            ns.write_bool_field(IS_SIMPLE_QUERY, self.prepared_statement.is_simple());
644            ns.write_binary_field(PREPARED_QUERY, &self.prepared_statement.statement);
645            if self.prepared_statement.data.bind_variables.len() > 0 {
646                ns.start_array(BIND_VARIABLES);
647                for (k, v) in &self.prepared_statement.data.bind_variables {
648                    ns.start_map("");
649                    trace!(" BIND: name={} value={:?}", k, v);
650                    ns.write_string_field(NAME, k);
651                    ns.write_field(VALUE, v);
652                    ns.end_map("");
653                    ns.incr_size(1);
654                }
655                ns.end_array(BIND_VARIABLES);
656            }
657        } else {
658            if let Some(s) = &self.statement {
659                ns.write_string_field(STATEMENT, &s);
660            } else {
661                return ia_err!("no statement or prepared statement");
662            }
663        }
664        if let Some(ck) = &self.continuation_key {
665            if ck.len() > 0 {
666                ns.write_binary_field(CONTINUATION_KEY, ck);
667                //println!("Wrote {} byte continuation key", ck.len());
668            }
669        }
670
671        //writeLongMapFieldNZ(ns, SERVER_MEMORY_CONSUMPTION,
672        //rq.getMaxServerMemoryConsumption());
673        //writeMathContext(ns, rq.getMathContext());
674
675        if self.shard_id > -1 {
676            //println!("Q: SHARD_ID={}", self.shard_id);
677            ns.write_i32_field(SHARD_ID, self.shard_id);
678        }
679        //if (queryVersion >= QueryDriver.QUERY_V4) {
680        //if (rq.getQueryName() != null) {
681        //writeMapField(ns, QUERY_NAME, rq.getQueryName());
682        //}
683        //if (rq.getVirtualScan() != null) {
684        //writeVirtualScan(ns, rq.getVirtualScan());
685        //}
686        //}
687
688        ns.end_payload();
689        ns.end_request();
690        Ok(())
691    }
692
693    // TODO
694    //theRCB.tallyRateLimitDelayedMs(result.getRateLimitDelayedMs());
695    //theRCB.tallyRetryStats(result.getRetryStats());
696    // TODO: support deduping of results
697
698    pub(crate) fn add_results(
699        &self,
700        walker: &mut MapWalker,
701        results: &mut Vec<MapValue>,
702    ) -> Result<(), NoSQLError> {
703        let t = FieldType::try_from_u8(walker.r.read_byte()?)?;
704        if t != FieldType::Array {
705            return ia_err!("bad type in queryResults: {:?}, should be Array", t);
706        }
707        walker.r.read_i32()?; // length of array in bytes
708        let num_elements = walker.r.read_i32()?;
709        trace!("read_results: num_results={}", num_elements);
710        if num_elements <= 0 {
711            return Ok(());
712        }
713        for _i in 0..num_elements {
714            if let FieldValue::Map(m) = walker.r.read_field_value()? {
715                //println!("Result: {:?}", m);
716                results.push(m);
717            } else {
718                return ia_err!("got invalid type of value in query results");
719            }
720        }
721        Ok(())
722    }
723
724    // Deserialize results for a QueryRequest.
725    fn nson_deserialize(
726        &mut self,
727        r: &mut Reader,
728        results: &mut Vec<MapValue>,
729        iter_data: &mut ReceiveIterData,
730    ) -> Result<(), NoSQLError> {
731        // TODO short serialVersion
732        // TODO short queryVersion
733
734        let is_prepared_request = !self.prepared_statement.is_empty();
735
736        let mut ti = TopologyInfo::default();
737        self.continuation_key = None;
738        iter_data.continuation_key = None;
739
740        let mut walker = MapWalker::new(r)?;
741        while walker.has_next() {
742            walker.next()?;
743            let name = walker.current_name();
744            match name.as_str() {
745                ERROR_CODE => {
746                    walker.handle_error_code()?;
747                }
748                CONSUMED => {
749                    let cap = walker.read_nson_consumed_capacity()?;
750                    self.consumed_capacity.add(&cap);
751                }
752                QUERY_RESULTS => {
753                    self.add_results(&mut walker, results)?;
754                }
755                CONTINUATION_KEY => {
756                    let ck = walker.read_nson_binary()?;
757                    if ck.len() > 0 {
758                        trace!("Read {} byte continuation key", ck.len());
759                        iter_data.continuation_key = Some(ck.clone());
760                        self.continuation_key = Some(ck);
761                    }
762                }
763                SORT_PHASE1_RESULTS => {
764                    self.read_phase_1_results(iter_data, &walker.read_nson_binary()?)?;
765                }
766                PREPARED_QUERY => {
767                    if is_prepared_request {
768                        return ia_err!("got prepared query in result for already prepared query");
769                    }
770                    self.prepared_statement.statement = walker.read_nson_binary()?;
771                }
772                DRIVER_QUERY_PLAN => {
773                    if is_prepared_request {
774                        return ia_err!("got driver plan in result for already prepared query");
775                    }
776                    let v = walker.read_nson_binary()?;
777                    self.get_driver_plan_info(&v)?;
778                }
779                REACHED_LIMIT => {
780                    self.reached_limit = walker.read_nson_boolean()?;
781                    trace!("REACHED_LIMIT={}", self.reached_limit);
782                }
783                TABLE_NAME => {
784                    self.prepared_statement.table_name = Some(walker.read_nson_string()?);
785                }
786                NAMESPACE => {
787                    self.prepared_statement.namespace = Some(walker.read_nson_string()?);
788                }
789                QUERY_PLAN_STRING => {
790                    self.prepared_statement.query_plan = walker.read_nson_string()?;
791                }
792                QUERY_RESULT_SCHEMA => {
793                    self.prepared_statement.query_schema = walker.read_nson_string()?;
794                }
795                QUERY_OPERATION => {
796                    // TODO: is this an enum? try_from()?
797                    self.prepared_statement.operation = walker.read_nson_i32()? as u8;
798                }
799                TOPOLOGY_INFO => {
800                    //println!("deser: TOPOLOGY_INFO");
801                    self.prepared_statement.topology_info = Some(walker.read_nson_topology_info()?);
802                }
803                /* QUERY_V3 and earlier return topo differently */
804                PROXY_TOPO_SEQNUM => {
805                    //println!("deser: PROXY_TOPO_SEQNUM");
806                    ti.seq_num = walker.read_nson_i32()?;
807                }
808                SHARD_IDS => {
809                    //println!("deser: SHARD_IDS");
810                    ti.shard_ids = walker.read_nson_i32_array()?;
811                }
812                _ => {
813                    trace!("   query_response: skipping field '{}'", name);
814                    walker.skip_nson_field()?;
815                } /*
816                  // added in QUERY_V4
817                  else if (name.equals(VIRTUAL_SCANS)) {
818                      readType(in, Nson.TYPE_ARRAY);
819                      in.readInt(); // length of array in bytes
820                      int numScans = in.readInt(); // number of array elements
821                      virtualScans = new VirtualScan[numScans];
822                      for (int i = 0; i < numScans; ++i) {
823                          virtualScans[i] = readVirtualScan(in);
824                      }
825
826                  /* added in QUERY_V4 */
827                  } else if (name.equals(QUERY_BATCH_TRACES)) {
828                      readType(in, Nson.TYPE_ARRAY);
829                      in.readInt(); // length of array in bytes
830                      int numTraces = in.readInt() / 2; // number of array elements
831                      queryTraces = new TreeMap<String,String>();
832                      for (int i = 0; i < numTraces; ++i) {
833                          String batchName = Nson.readNsonString(in);
834                          String batchTrace = Nson.readNsonString(in);
835                          queryTraces.put(batchName, batchTrace);
836                      }
837                  }
838                  */
839            }
840        }
841
842        if ti.is_valid() {
843            self.prepared_statement.topology_info = Some(ti);
844        }
845
846        if let Some(ti) = &self.prepared_statement.topology_info {
847            //println!("deser: got TI={:?}", ti);
848            self.topology_info = ti.clone();
849        } else {
850            trace!("deser: NO VALID TOPOLOGY RECEIVED");
851        }
852
853        if self.prepare_only == true {
854            if self.prepared_statement.is_empty() {
855                return ia_err!("got no prepared statement when prepare_only was set");
856            }
857            self.is_done = true;
858        } else {
859            if !self.prepared_statement.is_simple() && self.continuation_key.is_none() {
860                // dummy cont key so is_done won't be set
861                trace!("Adding dummy continuation key");
862                self.continuation_key = Some(Vec::new());
863            }
864        }
865
866        Ok(())
867    }
868
869    fn get_driver_plan_info(&mut self, v: &Vec<u8>) -> Result<(), NoSQLError> {
870        if v.len() == 0 {
871            return Ok(());
872        }
873        let mut r = Reader::new().from_bytes(v);
874        self.prepared_statement.driver_query_plan = deserialize_plan_iter(&mut r)?;
875        //println!(
876        //"driver query plan:\n{:?}",
877        //self.prepared_statement.driver_query_plan
878        //);
879        if self.prepared_statement.driver_query_plan.get_kind() == PlanIterKind::Empty {
880            return Ok(());
881        }
882        self.prepared_statement.num_iterators = r.read_i32()?;
883        //println!(
884        //"   QUERY_PLAN: iterators={}",
885        //self.prepared_statement.num_iterators
886        //);
887        self.prepared_statement.num_registers = r.read_i32()?;
888        //println!(
889        //"   QUERY_PLAN: registers={}",
890        //self.prepared_statement.num_registers
891        //);
892        let len = r.read_i32()?;
893        if len <= 0 {
894            return Ok(());
895        }
896        let mut hm: HashMap<String, i32> = HashMap::with_capacity(len as usize);
897        for _i in 0..len {
898            let name = r.read_string()?;
899            let id = r.read_i32()?;
900            hm.insert(name, id);
901        }
902        self.prepared_statement.variable_to_ids = Some(hm);
903        Ok(())
904    }
905
906    fn read_phase_1_results(
907        &mut self,
908        iter_data: &mut ReceiveIterData,
909        arr: &Vec<u8>,
910    ) -> Result<(), NoSQLError> {
911        let mut r: Reader = Reader::new().from_bytes(arr);
912        iter_data.in_sort_phase_1 = r.read_bool()?;
913        iter_data.pids = r.read_i32_array()?;
914        if iter_data.pids.len() > 0 {
915            iter_data.num_results_per_pid = r.read_i32_array()?;
916            iter_data.part_continuation_keys = Vec::new();
917            for _x in 0..iter_data.num_results_per_pid.len() {
918                iter_data.part_continuation_keys.push(r.read_binary()?);
919            }
920        }
921        Ok(())
922    }
923
924    pub(crate) fn get_result(&mut self, reg: i32) -> FieldValue {
925        if self.num_registers <= reg {
926            panic!("INVALID GET REGISTER ACCESS");
927        }
928        //println!(
929        //" get_result register {}: {:?}",
930        //reg, self.registers[reg as usize]
931        //);
932        std::mem::take(&mut self.registers[reg as usize])
933    }
934
935    pub(crate) fn get_result_ref(&self, reg: i32) -> &FieldValue {
936        //println!(
937        //" get_result_ref register {}: {:?}",
938        //reg, self.registers[reg as usize]
939        //);
940        &self.registers[reg as usize]
941    }
942
943    pub(crate) fn set_result(&mut self, reg: i32, val: FieldValue) {
944        if self.num_registers <= reg {
945            panic!("INVALID SET REGISTER ACCESS");
946        }
947        //println!(" set_result register {}: {:?}", reg, val);
948        self.registers[reg as usize] = val;
949    }
950}