oracle_nosql_rust_sdk/
query_request.rs

1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5//  https://oss.oracle.com/licenses/upl/
6//
7use crate::error::ia_err;
8use crate::error::NoSQLError;
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::plan_iter::{deserialize_plan_iter, PlanIterKind, PlanIterState};
13use crate::prepared_statement::PreparedStatement;
14use crate::reader::Reader;
15use crate::receive_iter::ReceiveIterData;
16use crate::types::NoSQLColumnToFieldValue;
17use crate::types::{Capacity, Consistency, FieldType, FieldValue, MapValue, OpCode, TopologyInfo};
18use crate::writer::Writer;
19
20use std::collections::HashMap;
21use std::result::Result;
22use std::time::Duration;
23use tracing::trace;
24
25/// Encapsulates a SQL query of a NoSQL Database table.
26///
27/// A query may be either a string query
28/// statement or a prepared query, which may include bind variables.
29/// A query request cannot have both a string statement and prepared query, but
30/// it must have one or the other.
31///
32/// See the [SQL for NoSQL Database Guide](https://docs.oracle.com/en/database/other-databases/nosql-database/24.1/sqlfornosql/introduction-sql.html) for details on creating and using queries.
33///
34/// ## Simple Example
35/// Here is a simple example of running a query that will return every row in a table named `users`:
36///
37/// ```no_run
38/// # use oracle_nosql_rust_sdk::{Handle, QueryRequest};
39/// # #[tokio::main]
40/// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
41/// let handle = Handle::builder().build().await?;
42/// let results = QueryRequest::new("select * from users")
43///               .execute(&handle).await?;
44/// for row in results.rows() {
45///     println!("Row = {}", row);
46/// }
47/// # Ok(())
48/// # }
49/// ```
50///
51/// For performance reasons, prepared queries are preferred for queries that may
52/// be reused. Prepared queries bypass compilation of the query. They also allow
53/// for parameterized queries using bind variables.
54#[derive(Default, Debug)]
55pub struct QueryRequest {
56    pub(crate) prepare_only: bool,
57    //pub(crate) limit: u32,
58    pub(crate) max_read_kb: u32,
59    pub(crate) max_write_kb: u32,
60    pub(crate) consistency: Consistency,
61    pub(crate) timeout: Option<Duration>,
62    pub(crate) compartment_id: String,
63
64    // max_memory_consumption specifies the maximum amount of memory in bytes that
65    // may be consumed by the query at the client for operations such as
66    // duplicate elimination (which may be required due to the use of an index
67    // on an array or map) and sorting. Such operations may consume a lot of
68    // memory as they need to cache the full result set or a large subset of
69    // it at the client memory.
70    //
71    // The default value is 1GB (1,000,000,000).
72    // TODO pub max_memory_consumption: i64,
73
74    // Durability is currently only used in On-Prem installations.
75    // This setting only applies if the query modifies
76    // a row using an INSERT, UPSERT, or DELETE statement. If the query is
77    // read-only it is ignored.
78    // Added in SDK Version 1.4.0
79    // TODO Durability types.Durability
80
81    // private fields: driver and RCB data
82
83    // statement specifies a query statement.
84    statement: Option<String>,
85
86    // prepared_statement specifies the prepared query statement.
87    pub(crate) prepared_statement: PreparedStatement,
88
89    // shortcuts
90    has_driver: bool,
91
92    pub(crate) is_done: bool,
93
94    // created/used by internal iterators
95    is_internal: bool,
96
97    // reached_limit indicates if the query execution reached the size-based or
98    // number-based limit. If so, query execution must stop and a batch of
99    // results (potentially empty) must be returned to the application.
100    pub(crate) reached_limit: bool,
101
102    pub(crate) consumed_capacity: Capacity,
103
104    // memory_consumption represents the amount of memory in bytes that were
105    // consumed by the query at the client for operations such as duplicate
106    // elimination and sorting.
107    // TODO pub memory_consumption: i64,
108
109    // sql_hash_tag is a portion of the hash value of SQL text, used as a tag
110    // for query tracing.
111    pub(crate) sql_hash_tag: Vec<u8>,
112
113    // err represents a non-retryable error returned by a query batch.
114    err: Option<NoSQLError>,
115
116    pub(crate) continuation_key: Option<Vec<u8>>,
117
118    pub(crate) shard_id: i32,
119
120    // total number of batches executed
121    pub(crate) batch_counter: i32,
122
123    // for "advanced" queries using plan iterators
124    pub(crate) num_registers: i32,
125    pub(crate) registers: Vec<FieldValue>,
126
127    pub(crate) topology_info: TopologyInfo,
128}
129
130/// Struct representing the result of a query operation.
131#[derive(Default, Debug)]
132pub struct QueryResult {
133    pub(crate) rows: Vec<MapValue>,
134    pub(crate) prepared_statement: PreparedStatement,
135    pub(crate) consumed: Capacity,
136    // TODO: stats, consumed, etc.
137}
138
139impl QueryResult {
140    /// Get the query result rows, if any.
141    ///
142    /// If the query returned no rows, this will return a reference to an empty vector.
143    /// Otherwise, it will return a reference to the rows in the order specified by the query.
144    pub fn rows(&self) -> &Vec<MapValue> {
145        &self.rows
146    }
147    /// Take the query result rows, setting the result back to an empty vector.
148    ///
149    /// If the query returned no rows, this will return an empty vector.
150    /// Otherwise, it will return the rows in the vector, giving the ownership
151    /// of the rows to the caller.
152    pub fn take_rows(&mut self) -> Vec<MapValue> {
153        std::mem::take(&mut self.rows)
154    }
155    /// Get the prepared statement after execution of a query.
156    ///
157    /// The prepared statement can then be used in subsequent query requests, saving the
158    /// extra step of preparing each query again.
159    pub fn prepared_statement(&self) -> PreparedStatement {
160        let mut ps = self.prepared_statement.clone();
161        let _ = ps.reset();
162        ps
163    }
164    /// Return the total capacity that was consumed during the execution of the query.
165    ///
166    /// This is only relevant for NoSQL Cloud operation. It returns a [`Capacity`] struct which
167    /// contains the total read units, read KB, and write units used by the query execution.
168    pub fn consumed(&self) -> Capacity {
169        self.consumed.clone()
170    }
171}
172
173impl QueryRequest {
174    /// Create a new QueryRequest from a SQL query string.
175    ///
176    /// While this struct is named `QueryRequest`, the SQL supplied to it does not
177    /// necessarily have to be a `SELECT` query. It could also be one of `INSERT`, `UPDATE`,
178    /// or `DELETE`.
179    ///
180    /// See the [SQL for NoSQL Database Guide](https://docs.oracle.com/en/database/other-databases/nosql-database/24.1/sqlfornosql/introduction-sql.html) for details on creating and using queries.
181    ///
182    /// Note: this request should not be used for DDL statements (those that create or modify tables or indexes, such as `CREATE TABLE`). For DDL statements, use [`TableRequest`](crate::TableRequest) instead.
183    ///
184    pub fn new(statement: &str) -> Self {
185        QueryRequest {
186            statement: Some(statement.to_string()),
187            shard_id: -1,
188            ..Default::default()
189        }
190    }
191
192    /// Create a new QueryRequest from a previously prepared query statement.
193    ///
194    /// Use of this method is recommended when executing the same type of query multiple
195    /// times with different values for parameters. Doing so will save resources by not
196    /// re-preparing the query on every execution.
197    ///
198    /// To set bind variables for query execution, first create the request with this method,
199    /// then call [`QueryRequest::set_variable()`] for all desired bind variables. Then execute the
200    /// query with [`QueryRequest::execute()`].
201    pub fn new_prepared(prepared_statement: &PreparedStatement) -> Self {
202        let ti: TopologyInfo;
203        if let Some(t) = &prepared_statement.topology_info {
204            ti = t.clone();
205        } else {
206            panic!(
207                "Invalid prepared statement passed to new_prepared! Missing toploogy info. ps={:?}",
208                prepared_statement
209            );
210        }
211        QueryRequest {
212            prepared_statement: prepared_statement.clone(),
213            shard_id: -1,
214            topology_info: ti,
215            ..Default::default()
216        }
217    }
218
219    /// Specify that this query execution should only prepare the query.
220    ///
221    /// Setting this value to true and then calling [`QueryRequest::execute()`]
222    /// will result in only the query being prepared, and no result rows being returned.
223    /// The prepared statement can then be retrieved using [`QueryResult::prepared_statement()`]
224    /// and can be used in subsequent query calls using [`QueryRequest::new_prepared()`].
225    pub fn prepare_only(mut self) -> Self {
226        self.prepare_only = true;
227        self
228    }
229
230    /// Specify the timeout value for the request.
231    ///
232    /// This is optional.
233    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
234    /// IllegalArgument error will be returned.
235    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
236    pub fn timeout(mut self, t: &Duration) -> Self {
237        self.timeout = Some(t.clone());
238        self
239    }
240
241    /// Cloud Service only: set the name or id of a compartment to be used for this operation.
242    ///
243    /// If the associated handle authenticated as an Instance Principal, this value must be an OCID.
244    /// In all other cases, the value may be specified as either a name (or path for nested compartments) or as an OCID.
245    ///
246    /// If no compartment is given, the default compartment id for the handle is used. If that value was
247    /// not specified, the root compartment of the tenancy will be used.
248    pub fn compartment_id(mut self, compartment_id: &str) -> Self {
249        self.compartment_id = compartment_id.to_string();
250        self
251    }
252
253    // Specify a limit on number of items returned by the operation.
254    //
255    // This allows an operation to return less than the default amount of data.
256    //pub fn limit(mut self, l: u32) -> Self {
257    //self.limit = l;
258    //self
259    //}
260
261    /// Specify the desired consistency policy for the request.
262    ///
263    /// If not set, the default consistency of [`Consistency::Eventual`] is used.
264    pub fn consistency(mut self, c: &Consistency) -> Self {
265        self.consistency = c.clone();
266        self
267    }
268
269    /// Specify the limit on the total data read during a single batch operation, in KB.
270    ///
271    /// For cloud service, this value can only reduce the system defined limit.
272    /// An attempt to increase the limit beyond the system defined limit will
273    /// cause an IllegalArgument error. This limit is independent of read units
274    /// consumed by the operation.
275    ///
276    /// It is recommended that for tables with relatively low provisioned read
277    /// throughput that this limit be set to less than or equal to one half
278    /// of the provisioned throughput in order to reduce the possibility of throttling
279    /// errors.
280    pub fn max_read_kb(mut self, max: u32) -> Self {
281        self.max_read_kb = max;
282        self
283    }
284
285    /// Specify the limit on the total data written during a single batch operation, in KB.
286    ///
287    /// For cloud service, this value can only reduce the system defined limit.
288    /// An attempt to increase the limit beyond the system defined limit will
289    /// cause an IllegalArgument error. This limit is independent of write units
290    /// consumed by the operation.
291    ///
292    /// This limit is independent of write units consumed by the operation.
293    pub fn max_write_kb(mut self, max: u32) -> Self {
294        self.max_write_kb = max;
295        self
296    }
297
298    // used by ext_var_ref_iter
299    pub(crate) fn get_external_var(&self, id: i32) -> Option<&FieldValue> {
300        if self.prepared_statement.is_empty() {
301            return None;
302        }
303        self.prepared_statement.get_variable_by_id(id)
304    }
305
306    // note private
307    async fn get_results(
308        &mut self,
309        handle: &Handle,
310        results: &mut Vec<MapValue>,
311    ) -> Result<(), NoSQLError> {
312        // this is where everything happens
313
314        if let Some(e) = &self.err {
315            return Err(e.clone());
316        }
317
318        if self.prepare_only == true || self.prepared_statement.is_simple() {
319            // results already fetched from nson_deserialize()
320            //println!("get_results: prepare or simple: returning");
321            return Ok(());
322        }
323
324        //let driver_plan = &mut self.prepared_statement.driver_query_plan;
325        let mut driver_plan = std::mem::take(&mut self.prepared_statement.driver_query_plan);
326
327        if driver_plan.get_state() == PlanIterState::Uninitialized {
328            //println!("get_results: initializing driver plan");
329            self.reached_limit = false;
330            //self.memory_consumption = 0;
331            self.consumed_capacity = Capacity::default();
332            self.sql_hash_tag = Default::default();
333
334            self.consumed_capacity.read_kb += 1; // prep cost
335            self.consumed_capacity.read_units += 1; // prep cost
336            driver_plan.open(self, handle)?;
337        }
338
339        let mut more;
340        loop {
341            //println!("get_results: calling driver_plan.next()");
342            more = driver_plan.next(self, handle).await?;
343            if more == false {
344                //println!("get_results: no more results: breaking");
345                break;
346            }
347            //println!("get_results: pushing 1 result");
348            results.push(driver_plan.get_result(self).get_map_value()?);
349            //if self.limit > 0 && results.len() >= self.limit as usize {
350            //println!(
351            //"get_results: reached limit: results_size={}, limit={}",
352            //results.len(),
353            //self.limit
354            //);
355            //self.reached_limit = true;
356            //break;
357            //}
358        }
359
360        self.prepared_statement.driver_query_plan = driver_plan;
361
362        if more {
363            // non-advanced queries just need Some/None, value not used
364            self.continuation_key = Some(Vec::new());
365            self.is_done = false;
366        } else {
367            if self.reached_limit {
368                // there is more to do, but we reached a limit
369                self.continuation_key = Some(Vec::new());
370                self.reached_limit = false;
371                self.is_done = false;
372            } else {
373                self.continuation_key = None;
374                self.is_done = true;
375            }
376        }
377
378        Ok(())
379    }
380
381    pub(crate) fn copy_for_internal(&self) -> Self {
382        if self.prepared_statement.is_empty() {
383            panic!("prepared statement is empty in copy_for_internal");
384        }
385        QueryRequest {
386            is_internal: true,
387            prepared_statement: self.prepared_statement.copy_for_internal(),
388            shard_id: self.shard_id,
389            //limit: self.limit,
390            // purposefully not copying registers
391            num_registers: -1,
392            timeout: self.timeout.clone(),
393            ..Default::default()
394        }
395    }
396
397    pub(crate) fn reset(&mut self) -> Result<(), NoSQLError> {
398        self.is_done = false;
399        self.reached_limit = false;
400        self.batch_counter = 0;
401        self.consumed_capacity = Capacity::default();
402        // clear prepared statement iterators
403        self.prepared_statement.reset()
404    }
405
406    /// Set a named bind variable for execution of a prepared query.
407    ///
408    /// See [`PreparedStatement`] for an example of using this method.
409    pub fn set_variable(
410        &mut self,
411        name: &str,
412        value: &impl NoSQLColumnToFieldValue,
413    ) -> Result<(), NoSQLError> {
414        if self.prepared_statement.is_empty() {
415            return ia_err!("cannot set bind variables: no prepared statement in QueryRequest");
416        }
417        let fv = value.to_field_value();
418        self.prepared_statement.set_variable(name, &fv)
419    }
420
421    /// Set a positional bind variable for execution of a prepared query.
422    ///
423    /// This is similar to [`set_variable()`](QueryRequest::set_variable()) but uses integer-based positional parameters:
424    /// ```no_run
425    /// # use oracle_nosql_rust_sdk::{Handle, QueryRequest, NoSQLColumnToFieldValue};
426    /// # #[tokio::main]
427    /// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
428    /// let handle = Handle::builder().build().await?;
429    /// let prep_result = QueryRequest::new("insert into testusers(id, name) values(?, ?)")
430    ///     .prepare_only()
431    ///     .execute(&handle)
432    ///     .await?;
433    /// let data = vec!["jane", "john", "jasper"];
434    /// let mut qreq = QueryRequest::new_prepared(&prep_result.prepared_statement());
435    /// for i in 0..data.len() {
436    ///     let id = (i as i32) + 100;
437    ///     qreq.set_variable_by_id(1, &id)?;
438    ///     qreq.set_variable_by_id(2, &data[i])?;
439    ///     let result = qreq.execute(&handle).await?;
440    ///     println!("Insert result = {:?}", result);
441    /// }
442    /// # Ok(())
443    /// # }
444    pub fn set_variable_by_id(
445        &mut self,
446        id: i32,
447        value: &impl NoSQLColumnToFieldValue,
448    ) -> Result<(), NoSQLError> {
449        if self.prepared_statement.is_empty() {
450            return ia_err!("cannot set bind variables: no prepared statement in QueryRequest");
451        }
452        let fv = value.to_field_value();
453        self.prepared_statement.set_variable_by_id(id, &fv)
454    }
455
456    /// Execute the query to full completion.
457    ///
458    /// This is the preferred method for execution of a query. Internally, this method will loop
459    /// calling `execute_batch()` until all results are returned and all post-processing (sorting,
460    /// grouping, aggregations, etc) are complete.
461    ///
462    /// If the query has no rows to return, [`QueryResult::rows()`] will return an empty vector.
463    /// Otherwise it will return a vector of
464    /// [`MapValue`](crate::types::MapValue) structs in the order specified by the
465    /// query statement.
466    pub async fn execute(&mut self, h: &Handle) -> Result<QueryResult, NoSQLError> {
467        let mut iter_data = ReceiveIterData::default();
468        let mut results: Vec<MapValue> = Vec::new();
469        self.reset()?;
470        while self.is_done == false {
471            //println!("execute_internal doing next batch");
472            self.execute_batch_internal(h, &mut results, &mut iter_data)
473                .await?;
474            self.batch_counter += 1;
475            if self.batch_counter > 10000 {
476                panic!("Batch_internal infinite loop detected: self={:?}", self);
477            }
478        }
479
480        if self.prepared_statement.is_empty() {
481            panic!("empty prepared statement after execute!");
482        }
483
484        // TODO: retries, stats, etc
485        let mut qres = QueryResult {
486            prepared_statement: self.prepared_statement.clone(),
487            consumed: self.consumed_capacity.clone(),
488            rows: results,
489        };
490        let _ = qres.prepared_statement.reset();
491        Ok(qres)
492    }
493
494    /// Execute one batch of a query.
495    ///
496    /// This will execute at most one round-trip to the server. It should be called in a loop
497    /// until `is_done()` returns `true`. Note that any one batch execution may not set any results,
498    /// since some queries require many server round trips to finish (sorting, for example).
499    ///
500    /// It is recommended to use [`execute()`](QueryRequest::execute()) instead of this method.
501    /// *This method may be deprecated in future releases*.
502    pub async fn execute_batch(
503        &mut self,
504        handle: &Handle,
505        results: &mut Vec<MapValue>,
506    ) -> Result<(), NoSQLError> {
507        let mut _data = ReceiveIterData::default();
508        self.execute_batch_internal(handle, results, &mut _data)
509            .await
510    }
511
512    /// Determine if the query is complete.
513    ///
514    /// If using [`QueryRequest::execute_batch()`] in a loop, this method determines when
515    /// to terminate the loop, specifying that no more results exist for this query execution.
516    /// This is only necessary if executing queries in batch looping mode.
517    pub fn is_done(&self) -> bool {
518        self.is_done
519    }
520
521    pub(crate) async fn execute_batch_internal(
522        &mut self,
523        handle: &Handle,
524        results: &mut Vec<MapValue>,
525        iter_data: &mut ReceiveIterData,
526    ) -> Result<(), NoSQLError> {
527        trace!(
528            "EBI: batch_counter={} num_results={}",
529            self.batch_counter,
530            results.len()
531        );
532
533        self.reached_limit = false;
534
535        // internal queries do not use plan iterators/etc - they just return plain results.
536        if self.is_internal == false {
537            /*
538             * The following "if" may be true for advanced queries only. For
539             * such queries, the "if" will be true (i.e., the QueryRequest will
540             * be bound with a QueryDriver) if and only if this is not the 1st
541             * execute() call for this query. In this case we just return a new,
542             * empty QueryResult. Actual computation of a result batch will take
543             * place when the app calls getResults() on the QueryResult.
544             */
545            if self.has_driver {
546                //trace("QueryRequest has QueryDriver", 2);
547                return self.get_results(handle, results).await;
548            }
549
550            /*
551             * If it is an advanced query and we are here, then this must be
552             * the 1st execute() call for the query. If the query has been
553             * prepared before, we create a QueryDriver and bind it with the
554             * QueryRequest. Then, we create and return an empty QueryResult.
555             * Actual computation of a result batch will take place when the
556             * app calls getResults() on the QueryResult.
557             */
558            if self.prepared_statement.is_empty() == false
559                && self.prepared_statement.is_simple() == false
560            {
561                //trace("QueryRequest has no QueryDriver, but is prepared", 2);
562                self.num_registers = self.prepared_statement.num_registers;
563                self.registers = Vec::new();
564                for _i in 0..self.num_registers {
565                    self.registers.push(FieldValue::Uninitialized);
566                }
567                self.has_driver = true;
568                return self.get_results(handle, results).await;
569            }
570
571            /*
572             * If we are here, then this is either (a) a simple query or (b) an
573             * advanced query that has not been prepared already, which also
574             * implies that this is the 1st execute() call on this query. For
575             * a non-prepared advanced query, the effect of this 1st execute()
576             * call is to send the query to the proxy for compilation, get back
577             * the prepared query, but no query results, create a QueryDriver,
578             * and bind it with the QueryRequest (see
579             * QueryRequestSerializer.deserialize()), and return an empty
580             * QueryResult.
581             * a non-prepared simple query will return results.
582             */
583            //trace("QueryRequest has no QueryDriver and is not prepared", 2);
584            //self.batch_counter += 1;
585        }
586
587        let mut w: Writer = Writer::new();
588        w.write_i16(handle.inner.serial_version);
589        let timeout = handle.get_timeout(&self.timeout);
590        self.serialize_internal(&mut w, &timeout)?;
591        let mut opts = SendOptions {
592            timeout: timeout,
593            retryable: true,
594            compartment_id: self.compartment_id.clone(),
595            ..Default::default()
596        };
597        let mut r = handle.send_and_receive(w, &mut opts).await?;
598        self.continuation_key = None;
599        self.nson_deserialize(&mut r, results, iter_data)?;
600        if self.continuation_key.is_none() {
601            trace!("continuation key is None, setting is_done");
602            self.is_done = true;
603        }
604        Ok(())
605    }
606
607    fn serialize_internal(&self, w: &mut Writer, timeout: &Duration) -> Result<(), NoSQLError> {
608        let mut ns = NsonSerializer::start_request(w);
609        ns.start_header();
610        if self.prepare_only {
611            ns.write_header(OpCode::Prepare, timeout, "");
612        } else {
613            ns.write_header(OpCode::Query, timeout, "");
614        }
615        ns.end_header();
616        ns.start_payload();
617
618        //TODO writeConsistency(ns, rq.getConsistency());
619        //if (rq.getDurability() != null) {
620        //writeMapField(ns, DURABILITY,
621        //getDurability(rq.getDurability()));
622        //}
623
624        if self.max_read_kb > 0 {
625            ns.write_i32_field(MAX_READ_KB, self.max_read_kb as i32);
626        }
627        if self.max_write_kb > 0 {
628            ns.write_i32_field(MAX_WRITE_KB, self.max_write_kb as i32);
629        }
630        //if self.limit > 0 {
631        //ns.write_i32_field(NUMBER_LIMIT, self.limit as i32);
632        //}
633
634        //writeMapFieldNZ(ns, TRACE_LEVEL, rq.getTraceLevel());
635        //if (rq.getTraceLevel() > 0) {
636        //writeMapField(ns, TRACE_AT_LOG_FILES, rq.getLogFileTracing());
637        //writeMapField(ns, BATCH_COUNTER, rq.getBatchCounter());
638
639        ns.write_i32_field(QUERY_VERSION, 3); // TODO: QUERY_V4
640        if self.prepared_statement.is_empty() == false {
641            ns.write_bool_field(IS_PREPARED, true);
642            ns.write_bool_field(IS_SIMPLE_QUERY, self.prepared_statement.is_simple());
643            ns.write_binary_field(PREPARED_QUERY, &self.prepared_statement.statement);
644            if self.prepared_statement.data.bind_variables.len() > 0 {
645                ns.start_array(BIND_VARIABLES);
646                for (k, v) in &self.prepared_statement.data.bind_variables {
647                    ns.start_map("");
648                    trace!(" BIND: name={} value={:?}", k, v);
649                    ns.write_string_field(NAME, k);
650                    ns.write_field(VALUE, v);
651                    ns.end_map("");
652                    ns.incr_size(1);
653                }
654                ns.end_array(BIND_VARIABLES);
655            }
656        } else {
657            if let Some(s) = &self.statement {
658                ns.write_string_field(STATEMENT, &s);
659            } else {
660                return ia_err!("no statement or prepared statement");
661            }
662        }
663        if let Some(ck) = &self.continuation_key {
664            if ck.len() > 0 {
665                ns.write_binary_field(CONTINUATION_KEY, ck);
666                //println!("Wrote {} byte continuation key", ck.len());
667            }
668        }
669
670        //writeLongMapFieldNZ(ns, SERVER_MEMORY_CONSUMPTION,
671        //rq.getMaxServerMemoryConsumption());
672        //writeMathContext(ns, rq.getMathContext());
673
674        if self.shard_id > -1 {
675            //println!("Q: SHARD_ID={}", self.shard_id);
676            ns.write_i32_field(SHARD_ID, self.shard_id);
677        }
678        //if (queryVersion >= QueryDriver.QUERY_V4) {
679        //if (rq.getQueryName() != null) {
680        //writeMapField(ns, QUERY_NAME, rq.getQueryName());
681        //}
682        //if (rq.getVirtualScan() != null) {
683        //writeVirtualScan(ns, rq.getVirtualScan());
684        //}
685        //}
686
687        ns.end_payload();
688        ns.end_request();
689        Ok(())
690    }
691
692    // TODO
693    //theRCB.tallyRateLimitDelayedMs(result.getRateLimitDelayedMs());
694    //theRCB.tallyRetryStats(result.getRetryStats());
695    // TODO: support deduping of results
696
697    pub(crate) fn add_results(
698        &self,
699        walker: &mut MapWalker,
700        results: &mut Vec<MapValue>,
701    ) -> Result<(), NoSQLError> {
702        let t = FieldType::try_from_u8(walker.r.read_byte()?)?;
703        if t != FieldType::Array {
704            return ia_err!("bad type in queryResults: {:?}, should be Array", t);
705        }
706        walker.r.read_i32()?; // length of array in bytes
707        let num_elements = walker.r.read_i32()?;
708        trace!("read_results: num_results={}", num_elements);
709        if num_elements <= 0 {
710            return Ok(());
711        }
712        for _i in 0..num_elements {
713            if let FieldValue::Map(m) = walker.r.read_field_value()? {
714                //println!("Result: {:?}", m);
715                results.push(m);
716            } else {
717                return ia_err!("got invalid type of value in query results");
718            }
719        }
720        Ok(())
721    }
722
723    // Deserialize results for a QueryRequest.
724    fn nson_deserialize(
725        &mut self,
726        r: &mut Reader,
727        results: &mut Vec<MapValue>,
728        iter_data: &mut ReceiveIterData,
729    ) -> Result<(), NoSQLError> {
730        // TODO short serialVersion
731        // TODO short queryVersion
732
733        let is_prepared_request = !self.prepared_statement.is_empty();
734
735        let mut ti = TopologyInfo::default();
736        self.continuation_key = None;
737        iter_data.continuation_key = None;
738
739        let mut walker = MapWalker::new(r)?;
740        while walker.has_next() {
741            walker.next()?;
742            let name = walker.current_name();
743            match name.as_str() {
744                ERROR_CODE => {
745                    walker.handle_error_code()?;
746                }
747                CONSUMED => {
748                    let cap = walker.read_nson_consumed_capacity()?;
749                    self.consumed_capacity.add(&cap);
750                }
751                QUERY_RESULTS => {
752                    self.add_results(&mut walker, results)?;
753                }
754                CONTINUATION_KEY => {
755                    let ck = walker.read_nson_binary()?;
756                    if ck.len() > 0 {
757                        trace!("Read {} byte continuation key", ck.len());
758                        iter_data.continuation_key = Some(ck.clone());
759                        self.continuation_key = Some(ck);
760                    }
761                }
762                SORT_PHASE1_RESULTS => {
763                    self.read_phase_1_results(iter_data, &walker.read_nson_binary()?)?;
764                }
765                PREPARED_QUERY => {
766                    if is_prepared_request {
767                        return ia_err!("got prepared query in result for already prepared query");
768                    }
769                    self.prepared_statement.statement = walker.read_nson_binary()?;
770                }
771                DRIVER_QUERY_PLAN => {
772                    if is_prepared_request {
773                        return ia_err!("got driver plan in result for already prepared query");
774                    }
775                    let v = walker.read_nson_binary()?;
776                    self.get_driver_plan_info(&v)?;
777                }
778                REACHED_LIMIT => {
779                    self.reached_limit = walker.read_nson_boolean()?;
780                    trace!("REACHED_LIMIT={}", self.reached_limit);
781                }
782                TABLE_NAME => {
783                    self.prepared_statement.table_name = Some(walker.read_nson_string()?);
784                }
785                NAMESPACE => {
786                    self.prepared_statement.namespace = Some(walker.read_nson_string()?);
787                }
788                QUERY_PLAN_STRING => {
789                    self.prepared_statement.query_plan = walker.read_nson_string()?;
790                }
791                QUERY_RESULT_SCHEMA => {
792                    self.prepared_statement.query_schema = walker.read_nson_string()?;
793                }
794                QUERY_OPERATION => {
795                    // TODO: is this an enum? try_from()?
796                    self.prepared_statement.operation = walker.read_nson_i32()? as u8;
797                }
798                TOPOLOGY_INFO => {
799                    //println!("deser: TOPOLOGY_INFO");
800                    self.prepared_statement.topology_info = Some(walker.read_nson_topology_info()?);
801                }
802                /* QUERY_V3 and earlier return topo differently */
803                PROXY_TOPO_SEQNUM => {
804                    //println!("deser: PROXY_TOPO_SEQNUM");
805                    ti.seq_num = walker.read_nson_i32()?;
806                }
807                SHARD_IDS => {
808                    //println!("deser: SHARD_IDS");
809                    ti.shard_ids = walker.read_nson_i32_array()?;
810                }
811                _ => {
812                    trace!("   query_response: skipping field '{}'", name);
813                    walker.skip_nson_field()?;
814                } /*
815                  // added in QUERY_V4
816                  else if (name.equals(VIRTUAL_SCANS)) {
817                      readType(in, Nson.TYPE_ARRAY);
818                      in.readInt(); // length of array in bytes
819                      int numScans = in.readInt(); // number of array elements
820                      virtualScans = new VirtualScan[numScans];
821                      for (int i = 0; i < numScans; ++i) {
822                          virtualScans[i] = readVirtualScan(in);
823                      }
824
825                  /* added in QUERY_V4 */
826                  } else if (name.equals(QUERY_BATCH_TRACES)) {
827                      readType(in, Nson.TYPE_ARRAY);
828                      in.readInt(); // length of array in bytes
829                      int numTraces = in.readInt() / 2; // number of array elements
830                      queryTraces = new TreeMap<String,String>();
831                      for (int i = 0; i < numTraces; ++i) {
832                          String batchName = Nson.readNsonString(in);
833                          String batchTrace = Nson.readNsonString(in);
834                          queryTraces.put(batchName, batchTrace);
835                      }
836                  }
837                  */
838            }
839        }
840
841        if ti.is_valid() {
842            self.prepared_statement.topology_info = Some(ti);
843        }
844
845        if let Some(ti) = &self.prepared_statement.topology_info {
846            //println!("deser: got TI={:?}", ti);
847            self.topology_info = ti.clone();
848        } else {
849            trace!("deser: NO VALID TOPOLOGY RECEIVED");
850        }
851
852        if self.prepare_only == true {
853            if self.prepared_statement.is_empty() {
854                return ia_err!("got no prepared statement when prepare_only was set");
855            }
856            self.is_done = true;
857        } else {
858            if !self.prepared_statement.is_simple() && self.continuation_key.is_none() {
859                // dummy cont key so is_done won't be set
860                trace!("Adding dummy continuation key");
861                self.continuation_key = Some(Vec::new());
862            }
863        }
864
865        Ok(())
866    }
867
868    fn get_driver_plan_info(&mut self, v: &Vec<u8>) -> Result<(), NoSQLError> {
869        if v.len() == 0 {
870            return Ok(());
871        }
872        let mut r = Reader::new().from_bytes(v);
873        self.prepared_statement.driver_query_plan = deserialize_plan_iter(&mut r)?;
874        //println!(
875        //"driver query plan:\n{:?}",
876        //self.prepared_statement.driver_query_plan
877        //);
878        if self.prepared_statement.driver_query_plan.get_kind() == PlanIterKind::Empty {
879            return Ok(());
880        }
881        self.prepared_statement.num_iterators = r.read_i32()?;
882        //println!(
883        //"   QUERY_PLAN: iterators={}",
884        //self.prepared_statement.num_iterators
885        //);
886        self.prepared_statement.num_registers = r.read_i32()?;
887        //println!(
888        //"   QUERY_PLAN: registers={}",
889        //self.prepared_statement.num_registers
890        //);
891        let len = r.read_i32()?;
892        if len <= 0 {
893            return Ok(());
894        }
895        let mut hm: HashMap<String, i32> = HashMap::with_capacity(len as usize);
896        for _i in 0..len {
897            let name = r.read_string()?;
898            let id = r.read_i32()?;
899            hm.insert(name, id);
900        }
901        self.prepared_statement.variable_to_ids = Some(hm);
902        Ok(())
903    }
904
905    fn read_phase_1_results(
906        &mut self,
907        iter_data: &mut ReceiveIterData,
908        arr: &Vec<u8>,
909    ) -> Result<(), NoSQLError> {
910        let mut r: Reader = Reader::new().from_bytes(arr);
911        iter_data.in_sort_phase_1 = r.read_bool()?;
912        iter_data.pids = r.read_i32_array()?;
913        if iter_data.pids.len() > 0 {
914            iter_data.num_results_per_pid = r.read_i32_array()?;
915            iter_data.part_continuation_keys = Vec::new();
916            for _x in 0..iter_data.num_results_per_pid.len() {
917                iter_data.part_continuation_keys.push(r.read_binary()?);
918            }
919        }
920        Ok(())
921    }
922
923    pub(crate) fn get_result(&mut self, reg: i32) -> FieldValue {
924        if self.num_registers <= reg {
925            panic!("INVALID GET REGISTER ACCESS");
926        }
927        //println!(
928        //" get_result register {}: {:?}",
929        //reg, self.registers[reg as usize]
930        //);
931        std::mem::take(&mut self.registers[reg as usize])
932    }
933
934    pub(crate) fn get_result_ref(&self, reg: i32) -> &FieldValue {
935        //println!(
936        //" get_result_ref register {}: {:?}",
937        //reg, self.registers[reg as usize]
938        //);
939        &self.registers[reg as usize]
940    }
941
942    pub(crate) fn set_result(&mut self, reg: i32, val: FieldValue) {
943        if self.num_registers <= reg {
944            panic!("INVALID SET REGISTER ACCESS");
945        }
946        //println!(" set_result register {}: {:?}", reg, val);
947        self.registers[reg as usize] = val;
948    }
949}