oracle_nosql_rust_sdk/query_request.rs
1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5// https://oss.oracle.com/licenses/upl/
6//
7use crate::error::ia_err;
8use crate::error::NoSQLError;
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::plan_iter::{deserialize_plan_iter, PlanIterKind, PlanIterState};
13use crate::prepared_statement::PreparedStatement;
14use crate::reader::Reader;
15use crate::receive_iter::ReceiveIterData;
16use crate::types::NoSQLColumnToFieldValue;
17use crate::types::{Capacity, Consistency, FieldType, FieldValue, MapValue, OpCode, TopologyInfo};
18use crate::writer::Writer;
19
20use std::collections::HashMap;
21use std::result::Result;
22use std::time::Duration;
23use tracing::trace;
24
/// Encapsulates a SQL query of a NoSQL Database table.
///
/// A query may be either a string query
/// statement or a prepared query, which may include bind variables.
/// A query request cannot have both a string statement and prepared query, but
/// it must have one or the other.
///
/// See the [SQL for NoSQL Database Guide](https://docs.oracle.com/en/database/other-databases/nosql-database/24.1/sqlfornosql/introduction-sql.html) for details on creating and using queries.
///
/// ## Simple Example
/// Here is a simple example of running a query that will return every row in a table named `users`:
///
/// ```no_run
/// # use oracle_nosql_rust_sdk::{Handle, QueryRequest};
/// # #[tokio::main]
/// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let handle = Handle::builder().build().await?;
/// let results = QueryRequest::new("select * from users")
///     .execute(&handle).await?;
/// for row in results.rows() {
///     println!("Row = {}", row);
/// }
/// # Ok(())
/// # }
/// ```
///
/// For performance reasons, prepared queries are preferred for queries that may
/// be reused. Prepared queries bypass compilation of the query. They also allow
/// for parameterized queries using bind variables.
#[derive(Default, Debug)]
pub struct QueryRequest {
    // When true, the request is serialized with the Prepare opcode instead of
    // the Query opcode, and deserialization marks the request done.
    pub(crate) prepare_only: bool,
    //pub(crate) limit: u32,
    // Per-batch read limit in KB. Only written to the request when > 0.
    pub(crate) max_read_kb: u32,
    // Per-batch write limit in KB. Only written to the request when > 0.
    pub(crate) max_write_kb: u32,
    // Consistency chosen through the `consistency()` builder method.
    // NOTE(review): serialization currently carries a TODO for writing this
    // value; confirm it actually reaches the server.
    pub(crate) consistency: Consistency,
    // Optional per-request timeout; resolved against the handle's default via
    // `Handle::get_timeout()` when a batch is sent.
    pub(crate) timeout: Option<Duration>,
    // Compartment name or OCID; forwarded through `SendOptions` on each send.
    pub(crate) compartment_id: String,

    // max_memory_consumption specifies the maximum amount of memory in bytes that
    // may be consumed by the query at the client for operations such as
    // duplicate elimination (which may be required due to the use of an index
    // on an array or map) and sorting. Such operations may consume a lot of
    // memory as they need to cache the full result set or a large subset of
    // it at the client memory.
    //
    // The default value is 1GB (1,000,000,000).
    // TODO pub max_memory_consumption: i64,

    // Durability is currently only used in On-Prem installations.
    // This setting only applies if the query modifies
    // a row using an INSERT, UPSERT, or DELETE statement. If the query is
    // read-only it is ignored.
    // Added in SDK Version 1.4.0
    // TODO Durability types.Durability

    // private fields: driver and RCB data

    // statement specifies a query statement. Sent as the STATEMENT field only
    // when no prepared statement is present.
    statement: Option<String>,

    // prepared_statement specifies the prepared query statement. It is either
    // supplied through `new_prepared()` or filled in from the server response.
    pub(crate) prepared_statement: PreparedStatement,

    // shortcuts

    // Set once the registers for a non-simple prepared query have been
    // allocated; later batches then go straight to the client-side plan.
    has_driver: bool,

    // True when no further batches are needed for this execution.
    pub(crate) is_done: bool,

    // created/used by internal iterators. Internal requests skip the
    // client-side plan path and always go to the server.
    is_internal: bool,

    // reached_limit indicates if the query execution reached the size-based or
    // number-based limit. If so, query execution must stop and a batch of
    // results (potentially empty) must be returned to the application.
    pub(crate) reached_limit: bool,

    // Capacity accumulated from the CONSUMED field of every response.
    pub(crate) consumed_capacity: Capacity,

    // memory_consumption represents the amount of memory in bytes that were
    // consumed by the query at the client for operations such as duplicate
    // elimination and sorting.
    // TODO pub memory_consumption: i64,

    // sql_hash_tag is a portion of the hash value of SQL text, used as a tag
    // for query tracing.
    pub(crate) sql_hash_tag: Vec<u8>,

    // err represents a non-retryable error returned by a query batch. It is
    // checked at the start of result retrieval.
    // NOTE(review): no assignment to this field is visible in this file.
    err: Option<NoSQLError>,

    // Continuation key from the last response. `None` means the server has no
    // more data; an empty vector is used as a "keep going" marker for queries
    // driven by the client-side plan.
    pub(crate) continuation_key: Option<Vec<u8>>,

    // Target shard; -1 means "not set" and is not written to the request.
    pub(crate) shard_id: i32,

    // total number of batches executed
    pub(crate) batch_counter: i32,

    // for "advanced" queries using plan iterators: the register file shared by
    // the iterators, and its expected size (-1 for internal copies).
    pub(crate) num_registers: i32,
    pub(crate) registers: Vec<FieldValue>,

    // Copy of the prepared statement's topology info, refreshed after each
    // response is deserialized.
    pub(crate) topology_info: TopologyInfo,
}
129
/// Struct representing the result of a query operation.
///
/// Holds the returned rows, the prepared statement that was used (or created)
/// by the execution, and the capacity consumed while running the query.
#[derive(Default, Debug)]
pub struct QueryResult {
    // Rows collected across all batches of the execution.
    pub(crate) rows: Vec<MapValue>,
    // Prepared statement cloned from the request after execution.
    pub(crate) prepared_statement: PreparedStatement,
    // Capacity accumulated by the request during execution.
    pub(crate) consumed: Capacity,
    // TODO: stats, consumed, etc.
}
138
139impl QueryResult {
140 /// Get the query result rows, if any.
141 ///
142 /// If the query returned no rows, this will return a reference to an empty vector.
143 /// Otherwise, it will return a reference to the rows in the order specified by the query.
144 pub fn rows(&self) -> &Vec<MapValue> {
145 &self.rows
146 }
147 /// Take the query result rows, setting the result back to an empty vector.
148 ///
149 /// If the query returned no rows, this will return an empty vector.
150 /// Otherwise, it will return the rows in the vector, giving the ownership
151 /// of the rows to the caller.
152 pub fn take_rows(&mut self) -> Vec<MapValue> {
153 std::mem::take(&mut self.rows)
154 }
155 /// Get the prepared statement after execution of a query.
156 ///
157 /// The prepared statement can then be used in subsequent query requests, saving the
158 /// extra step of preparing each query again.
159 pub fn prepared_statement(&self) -> PreparedStatement {
160 let mut ps = self.prepared_statement.clone();
161 let _ = ps.reset();
162 ps
163 }
164 /// Return the total capacity that was consumed during the execution of the query.
165 ///
166 /// This is only relevant for NoSQL Cloud operation. It returns a [`Capacity`] struct which
167 /// contains the total read units, read KB, and write units used by the query execution.
168 pub fn consumed(&self) -> Capacity {
169 self.consumed.clone()
170 }
171}
172
173impl QueryRequest {
174 /// Create a new QueryRequest from a SQL query string.
175 ///
176 /// While this struct is named `QueryRequest`, the SQL supplied to it does not
177 /// necessarily have to be a `SELECT` query. It could also be one of `INSERT`, `UPDATE`,
178 /// or `DELETE`.
179 ///
180 /// See the [SQL for NoSQL Database Guide](https://docs.oracle.com/en/database/other-databases/nosql-database/24.1/sqlfornosql/introduction-sql.html) for details on creating and using queries.
181 ///
182 /// Note: this request should not be used for DDL statements (those that create or modify tables or indexes, such as `CREATE TABLE`). For DDL statements, use [`TableRequest`](crate::TableRequest) instead.
183 ///
184 pub fn new(statement: &str) -> Self {
185 QueryRequest {
186 statement: Some(statement.to_string()),
187 shard_id: -1,
188 ..Default::default()
189 }
190 }
191
192 /// Create a new QueryRequest from a previously prepared query statement.
193 ///
194 /// Use of this method is recommended when executing the same type of query multiple
195 /// times with different values for parameters. Doing so will save resources by not
196 /// re-preparing the query on every execution.
197 ///
198 /// To set bind variables for query execution, first create the request with this method,
199 /// then call [`QueryRequest::set_variable()`] for all desired bind variables. Then execute the
200 /// query with [`QueryRequest::execute()`].
201 pub fn new_prepared(prepared_statement: &PreparedStatement) -> Self {
202 let ti: TopologyInfo;
203 if let Some(t) = &prepared_statement.topology_info {
204 ti = t.clone();
205 } else {
206 panic!(
207 "Invalid prepared statement passed to new_prepared! Missing toploogy info. ps={:?}",
208 prepared_statement
209 );
210 }
211 QueryRequest {
212 prepared_statement: prepared_statement.clone(),
213 shard_id: -1,
214 topology_info: ti,
215 ..Default::default()
216 }
217 }
218
219 /// Specify that this query execution should only prepare the query.
220 ///
221 /// Setting this value to true and then calling [`QueryRequest::execute()`]
222 /// will result in only the query being prepared, and no result rows being returned.
223 /// The prepared statement can then be retrieved using [`QueryResult::prepared_statement()`]
224 /// and can be used in subsequent query calls using [`QueryRequest::new_prepared()`].
225 pub fn prepare_only(mut self) -> Self {
226 self.prepare_only = true;
227 self
228 }
229
230 /// Specify the timeout value for the request.
231 ///
232 /// This is optional.
233 /// If set, it must be greater than or equal to 1 millisecond, otherwise an
234 /// IllegalArgument error will be returned.
235 /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
236 pub fn timeout(mut self, t: &Duration) -> Self {
237 self.timeout = Some(t.clone());
238 self
239 }
240
241 /// Cloud Service only: set the name or id of a compartment to be used for this operation.
242 ///
243 /// If the associated handle authenticated as an Instance Principal, this value must be an OCID.
244 /// In all other cases, the value may be specified as either a name (or path for nested compartments) or as an OCID.
245 ///
246 /// If no compartment is given, the default compartment id for the handle is used. If that value was
247 /// not specified, the root compartment of the tenancy will be used.
248 pub fn compartment_id(mut self, compartment_id: &str) -> Self {
249 self.compartment_id = compartment_id.to_string();
250 self
251 }
252
253 // Specify a limit on number of items returned by the operation.
254 //
255 // This allows an operation to return less than the default amount of data.
256 //pub fn limit(mut self, l: u32) -> Self {
257 //self.limit = l;
258 //self
259 //}
260
261 /// Specify the desired consistency policy for the request.
262 ///
263 /// If not set, the default consistency of [`Consistency::Eventual`] is used.
264 pub fn consistency(mut self, c: &Consistency) -> Self {
265 self.consistency = c.clone();
266 self
267 }
268
269 /// Specify the limit on the total data read during a single batch operation, in KB.
270 ///
271 /// For cloud service, this value can only reduce the system defined limit.
272 /// An attempt to increase the limit beyond the system defined limit will
273 /// cause an IllegalArgument error. This limit is independent of read units
274 /// consumed by the operation.
275 ///
276 /// It is recommended that for tables with relatively low provisioned read
277 /// throughput that this limit be set to less than or equal to one half
278 /// of the provisioned throughput in order to reduce the possibility of throttling
279 /// errors.
280 pub fn max_read_kb(mut self, max: u32) -> Self {
281 self.max_read_kb = max;
282 self
283 }
284
285 /// Specify the limit on the total data written during a single batch operation, in KB.
286 ///
287 /// For cloud service, this value can only reduce the system defined limit.
288 /// An attempt to increase the limit beyond the system defined limit will
289 /// cause an IllegalArgument error. This limit is independent of write units
290 /// consumed by the operation.
291 ///
292 /// This limit is independent of write units consumed by the operation.
293 pub fn max_write_kb(mut self, max: u32) -> Self {
294 self.max_write_kb = max;
295 self
296 }
297
298 // used by ext_var_ref_iter
299 pub(crate) fn get_external_var(&self, id: i32) -> Option<&FieldValue> {
300 if self.prepared_statement.is_empty() {
301 return None;
302 }
303 self.prepared_statement.get_variable_by_id(id)
304 }
305
306 // note private
307 async fn get_results(
308 &mut self,
309 handle: &Handle,
310 results: &mut Vec<MapValue>,
311 ) -> Result<(), NoSQLError> {
312 // this is where everything happens
313
314 if let Some(e) = &self.err {
315 return Err(e.clone());
316 }
317
318 if self.prepare_only == true || self.prepared_statement.is_simple() {
319 // results already fetched from nson_deserialize()
320 //println!("get_results: prepare or simple: returning");
321 return Ok(());
322 }
323
324 //let driver_plan = &mut self.prepared_statement.driver_query_plan;
325 let mut driver_plan = std::mem::take(&mut self.prepared_statement.driver_query_plan);
326
327 if driver_plan.get_state() == PlanIterState::Uninitialized {
328 //println!("get_results: initializing driver plan");
329 self.reached_limit = false;
330 //self.memory_consumption = 0;
331 self.consumed_capacity = Capacity::default();
332 self.sql_hash_tag = Default::default();
333
334 self.consumed_capacity.read_kb += 1; // prep cost
335 self.consumed_capacity.read_units += 1; // prep cost
336 driver_plan.open(self, handle)?;
337 }
338
339 let mut more;
340 loop {
341 //println!("get_results: calling driver_plan.next()");
342 more = driver_plan.next(self, handle).await?;
343 if more == false {
344 //println!("get_results: no more results: breaking");
345 break;
346 }
347 //println!("get_results: pushing 1 result");
348 results.push(driver_plan.get_result(self).get_map_value()?);
349 //if self.limit > 0 && results.len() >= self.limit as usize {
350 //println!(
351 //"get_results: reached limit: results_size={}, limit={}",
352 //results.len(),
353 //self.limit
354 //);
355 //self.reached_limit = true;
356 //break;
357 //}
358 }
359
360 self.prepared_statement.driver_query_plan = driver_plan;
361
362 if more {
363 // non-advanced queries just need Some/None, value not used
364 self.continuation_key = Some(Vec::new());
365 self.is_done = false;
366 } else {
367 if self.reached_limit {
368 // there is more to do, but we reached a limit
369 self.continuation_key = Some(Vec::new());
370 self.reached_limit = false;
371 self.is_done = false;
372 } else {
373 self.continuation_key = None;
374 self.is_done = true;
375 }
376 }
377
378 Ok(())
379 }
380
381 pub(crate) fn copy_for_internal(&self) -> Self {
382 if self.prepared_statement.is_empty() {
383 panic!("prepared statement is empty in copy_for_internal");
384 }
385 QueryRequest {
386 is_internal: true,
387 prepared_statement: self.prepared_statement.copy_for_internal(),
388 shard_id: self.shard_id,
389 //limit: self.limit,
390 // purposefully not copying registers
391 num_registers: -1,
392 timeout: self.timeout.clone(),
393 ..Default::default()
394 }
395 }
396
397 pub(crate) fn reset(&mut self) -> Result<(), NoSQLError> {
398 self.is_done = false;
399 self.reached_limit = false;
400 self.batch_counter = 0;
401 self.consumed_capacity = Capacity::default();
402 // clear prepared statement iterators
403 self.prepared_statement.reset()
404 }
405
406 /// Set a named bind variable for execution of a prepared query.
407 ///
408 /// See [`PreparedStatement`] for an example of using this method.
409 pub fn set_variable(
410 &mut self,
411 name: &str,
412 value: &impl NoSQLColumnToFieldValue,
413 ) -> Result<(), NoSQLError> {
414 if self.prepared_statement.is_empty() {
415 return ia_err!("cannot set bind variables: no prepared statement in QueryRequest");
416 }
417 let fv = value.to_field_value();
418 self.prepared_statement.set_variable(name, &fv)
419 }
420
421 /// Set a positional bind variable for execution of a prepared query.
422 ///
423 /// This is similar to [`set_variable()`](QueryRequest::set_variable()) but uses integer-based positional parameters:
424 /// ```no_run
425 /// # use oracle_nosql_rust_sdk::{Handle, QueryRequest, NoSQLColumnToFieldValue};
426 /// # #[tokio::main]
427 /// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
428 /// let handle = Handle::builder().build().await?;
429 /// let prep_result = QueryRequest::new("insert into testusers(id, name) values(?, ?)")
430 /// .prepare_only()
431 /// .execute(&handle)
432 /// .await?;
433 /// let data = vec!["jane", "john", "jasper"];
434 /// let mut qreq = QueryRequest::new_prepared(&prep_result.prepared_statement());
435 /// for i in 0..data.len() {
436 /// let id = (i as i32) + 100;
437 /// qreq.set_variable_by_id(1, &id)?;
438 /// qreq.set_variable_by_id(2, &data[i])?;
439 /// let result = qreq.execute(&handle).await?;
440 /// println!("Insert result = {:?}", result);
441 /// }
442 /// # Ok(())
443 /// # }
444 pub fn set_variable_by_id(
445 &mut self,
446 id: i32,
447 value: &impl NoSQLColumnToFieldValue,
448 ) -> Result<(), NoSQLError> {
449 if self.prepared_statement.is_empty() {
450 return ia_err!("cannot set bind variables: no prepared statement in QueryRequest");
451 }
452 let fv = value.to_field_value();
453 self.prepared_statement.set_variable_by_id(id, &fv)
454 }
455
456 /// Execute the query to full completion.
457 ///
458 /// This is the preferred method for execution of a query. Internally, this method will loop
459 /// calling `execute_batch()` until all results are returned and all post-processing (sorting,
460 /// grouping, aggregations, etc) are complete.
461 ///
462 /// If the query has no rows to return, [`QueryResult::rows()`] will return an empty vector.
463 /// Otherwise it will return a vector of
464 /// [`MapValue`](crate::types::MapValue) structs in the order specified by the
465 /// query statement.
466 pub async fn execute(&mut self, h: &Handle) -> Result<QueryResult, NoSQLError> {
467 let mut iter_data = ReceiveIterData::default();
468 let mut results: Vec<MapValue> = Vec::new();
469 self.reset()?;
470 while self.is_done == false {
471 //println!("execute_internal doing next batch");
472 self.execute_batch_internal(h, &mut results, &mut iter_data)
473 .await?;
474 self.batch_counter += 1;
475 if self.batch_counter > 10000 {
476 panic!("Batch_internal infinite loop detected: self={:?}", self);
477 }
478 }
479
480 if self.prepared_statement.is_empty() {
481 panic!("empty prepared statement after execute!");
482 }
483
484 // TODO: retries, stats, etc
485 let mut qres = QueryResult {
486 prepared_statement: self.prepared_statement.clone(),
487 consumed: self.consumed_capacity.clone(),
488 rows: results,
489 };
490 let _ = qres.prepared_statement.reset();
491 Ok(qres)
492 }
493
494 /// Execute one batch of a query.
495 ///
496 /// This will execute at most one round-trip to the server. It should be called in a loop
497 /// until `is_done()` returns `true`. Note that any one batch execution may not set any results,
498 /// since some queries require many server round trips to finish (sorting, for example).
499 ///
500 /// It is recommended to use [`execute()`](QueryRequest::execute()) instead of this method.
501 /// *This method may be deprecated in future releases*.
502 pub async fn execute_batch(
503 &mut self,
504 handle: &Handle,
505 results: &mut Vec<MapValue>,
506 ) -> Result<(), NoSQLError> {
507 let mut _data = ReceiveIterData::default();
508 self.execute_batch_internal(handle, results, &mut _data)
509 .await
510 }
511
    /// Determine if the query is complete.
    ///
    /// If using [`QueryRequest::execute_batch()`] in a loop, this method determines when
    /// to terminate the loop, specifying that no more results exist for this query execution.
    /// This is only necessary if executing queries in batch looping mode.
    ///
    /// Returns the current value of the request's internal completion flag.
    pub fn is_done(&self) -> bool {
        self.is_done
    }
520
521 pub(crate) async fn execute_batch_internal(
522 &mut self,
523 handle: &Handle,
524 results: &mut Vec<MapValue>,
525 iter_data: &mut ReceiveIterData,
526 ) -> Result<(), NoSQLError> {
527 trace!(
528 "EBI: batch_counter={} num_results={}",
529 self.batch_counter,
530 results.len()
531 );
532
533 self.reached_limit = false;
534
535 // internal queries do not use plan iterators/etc - they just return plain results.
536 if self.is_internal == false {
537 /*
538 * The following "if" may be true for advanced queries only. For
539 * such queries, the "if" will be true (i.e., the QueryRequest will
540 * be bound with a QueryDriver) if and only if this is not the 1st
541 * execute() call for this query. In this case we just return a new,
542 * empty QueryResult. Actual computation of a result batch will take
543 * place when the app calls getResults() on the QueryResult.
544 */
545 if self.has_driver {
546 //trace("QueryRequest has QueryDriver", 2);
547 return self.get_results(handle, results).await;
548 }
549
550 /*
551 * If it is an advanced query and we are here, then this must be
552 * the 1st execute() call for the query. If the query has been
553 * prepared before, we create a QueryDriver and bind it with the
554 * QueryRequest. Then, we create and return an empty QueryResult.
555 * Actual computation of a result batch will take place when the
556 * app calls getResults() on the QueryResult.
557 */
558 if self.prepared_statement.is_empty() == false
559 && self.prepared_statement.is_simple() == false
560 {
561 //trace("QueryRequest has no QueryDriver, but is prepared", 2);
562 self.num_registers = self.prepared_statement.num_registers;
563 self.registers = Vec::new();
564 for _i in 0..self.num_registers {
565 self.registers.push(FieldValue::Uninitialized);
566 }
567 self.has_driver = true;
568 return self.get_results(handle, results).await;
569 }
570
571 /*
572 * If we are here, then this is either (a) a simple query or (b) an
573 * advanced query that has not been prepared already, which also
574 * implies that this is the 1st execute() call on this query. For
575 * a non-prepared advanced query, the effect of this 1st execute()
576 * call is to send the query to the proxy for compilation, get back
577 * the prepared query, but no query results, create a QueryDriver,
578 * and bind it with the QueryRequest (see
579 * QueryRequestSerializer.deserialize()), and return an empty
580 * QueryResult.
581 * a non-prepared simple query will return results.
582 */
583 //trace("QueryRequest has no QueryDriver and is not prepared", 2);
584 //self.batch_counter += 1;
585 }
586
587 let mut w: Writer = Writer::new();
588 w.write_i16(handle.inner.serial_version);
589 let timeout = handle.get_timeout(&self.timeout);
590 self.serialize_internal(&mut w, &timeout)?;
591 let mut opts = SendOptions {
592 timeout: timeout,
593 retryable: true,
594 compartment_id: self.compartment_id.clone(),
595 ..Default::default()
596 };
597 let mut r = handle.send_and_receive(w, &mut opts).await?;
598 self.continuation_key = None;
599 self.nson_deserialize(&mut r, results, iter_data)?;
600 if self.continuation_key.is_none() {
601 trace!("continuation key is None, setting is_done");
602 self.is_done = true;
603 }
604 Ok(())
605 }
606
607 fn serialize_internal(&self, w: &mut Writer, timeout: &Duration) -> Result<(), NoSQLError> {
608 let mut ns = NsonSerializer::start_request(w);
609 ns.start_header();
610 if self.prepare_only {
611 ns.write_header(OpCode::Prepare, timeout, "");
612 } else {
613 ns.write_header(OpCode::Query, timeout, "");
614 }
615 ns.end_header();
616 ns.start_payload();
617
618 //TODO writeConsistency(ns, rq.getConsistency());
619 //if (rq.getDurability() != null) {
620 //writeMapField(ns, DURABILITY,
621 //getDurability(rq.getDurability()));
622 //}
623
624 if self.max_read_kb > 0 {
625 ns.write_i32_field(MAX_READ_KB, self.max_read_kb as i32);
626 }
627 if self.max_write_kb > 0 {
628 ns.write_i32_field(MAX_WRITE_KB, self.max_write_kb as i32);
629 }
630 //if self.limit > 0 {
631 //ns.write_i32_field(NUMBER_LIMIT, self.limit as i32);
632 //}
633
634 //writeMapFieldNZ(ns, TRACE_LEVEL, rq.getTraceLevel());
635 //if (rq.getTraceLevel() > 0) {
636 //writeMapField(ns, TRACE_AT_LOG_FILES, rq.getLogFileTracing());
637 //writeMapField(ns, BATCH_COUNTER, rq.getBatchCounter());
638
639 ns.write_i32_field(QUERY_VERSION, 3); // TODO: QUERY_V4
640 if self.prepared_statement.is_empty() == false {
641 ns.write_bool_field(IS_PREPARED, true);
642 ns.write_bool_field(IS_SIMPLE_QUERY, self.prepared_statement.is_simple());
643 ns.write_binary_field(PREPARED_QUERY, &self.prepared_statement.statement);
644 if self.prepared_statement.data.bind_variables.len() > 0 {
645 ns.start_array(BIND_VARIABLES);
646 for (k, v) in &self.prepared_statement.data.bind_variables {
647 ns.start_map("");
648 trace!(" BIND: name={} value={:?}", k, v);
649 ns.write_string_field(NAME, k);
650 ns.write_field(VALUE, v);
651 ns.end_map("");
652 ns.incr_size(1);
653 }
654 ns.end_array(BIND_VARIABLES);
655 }
656 } else {
657 if let Some(s) = &self.statement {
658 ns.write_string_field(STATEMENT, &s);
659 } else {
660 return ia_err!("no statement or prepared statement");
661 }
662 }
663 if let Some(ck) = &self.continuation_key {
664 if ck.len() > 0 {
665 ns.write_binary_field(CONTINUATION_KEY, ck);
666 //println!("Wrote {} byte continuation key", ck.len());
667 }
668 }
669
670 //writeLongMapFieldNZ(ns, SERVER_MEMORY_CONSUMPTION,
671 //rq.getMaxServerMemoryConsumption());
672 //writeMathContext(ns, rq.getMathContext());
673
674 if self.shard_id > -1 {
675 //println!("Q: SHARD_ID={}", self.shard_id);
676 ns.write_i32_field(SHARD_ID, self.shard_id);
677 }
678 //if (queryVersion >= QueryDriver.QUERY_V4) {
679 //if (rq.getQueryName() != null) {
680 //writeMapField(ns, QUERY_NAME, rq.getQueryName());
681 //}
682 //if (rq.getVirtualScan() != null) {
683 //writeVirtualScan(ns, rq.getVirtualScan());
684 //}
685 //}
686
687 ns.end_payload();
688 ns.end_request();
689 Ok(())
690 }
691
692 // TODO
693 //theRCB.tallyRateLimitDelayedMs(result.getRateLimitDelayedMs());
694 //theRCB.tallyRetryStats(result.getRetryStats());
695 // TODO: support deduping of results
696
697 pub(crate) fn add_results(
698 &self,
699 walker: &mut MapWalker,
700 results: &mut Vec<MapValue>,
701 ) -> Result<(), NoSQLError> {
702 let t = FieldType::try_from_u8(walker.r.read_byte()?)?;
703 if t != FieldType::Array {
704 return ia_err!("bad type in queryResults: {:?}, should be Array", t);
705 }
706 walker.r.read_i32()?; // length of array in bytes
707 let num_elements = walker.r.read_i32()?;
708 trace!("read_results: num_results={}", num_elements);
709 if num_elements <= 0 {
710 return Ok(());
711 }
712 for _i in 0..num_elements {
713 if let FieldValue::Map(m) = walker.r.read_field_value()? {
714 //println!("Result: {:?}", m);
715 results.push(m);
716 } else {
717 return ia_err!("got invalid type of value in query results");
718 }
719 }
720 Ok(())
721 }
722
723 // Deserialize results for a QueryRequest.
724 fn nson_deserialize(
725 &mut self,
726 r: &mut Reader,
727 results: &mut Vec<MapValue>,
728 iter_data: &mut ReceiveIterData,
729 ) -> Result<(), NoSQLError> {
730 // TODO short serialVersion
731 // TODO short queryVersion
732
733 let is_prepared_request = !self.prepared_statement.is_empty();
734
735 let mut ti = TopologyInfo::default();
736 self.continuation_key = None;
737 iter_data.continuation_key = None;
738
739 let mut walker = MapWalker::new(r)?;
740 while walker.has_next() {
741 walker.next()?;
742 let name = walker.current_name();
743 match name.as_str() {
744 ERROR_CODE => {
745 walker.handle_error_code()?;
746 }
747 CONSUMED => {
748 let cap = walker.read_nson_consumed_capacity()?;
749 self.consumed_capacity.add(&cap);
750 }
751 QUERY_RESULTS => {
752 self.add_results(&mut walker, results)?;
753 }
754 CONTINUATION_KEY => {
755 let ck = walker.read_nson_binary()?;
756 if ck.len() > 0 {
757 trace!("Read {} byte continuation key", ck.len());
758 iter_data.continuation_key = Some(ck.clone());
759 self.continuation_key = Some(ck);
760 }
761 }
762 SORT_PHASE1_RESULTS => {
763 self.read_phase_1_results(iter_data, &walker.read_nson_binary()?)?;
764 }
765 PREPARED_QUERY => {
766 if is_prepared_request {
767 return ia_err!("got prepared query in result for already prepared query");
768 }
769 self.prepared_statement.statement = walker.read_nson_binary()?;
770 }
771 DRIVER_QUERY_PLAN => {
772 if is_prepared_request {
773 return ia_err!("got driver plan in result for already prepared query");
774 }
775 let v = walker.read_nson_binary()?;
776 self.get_driver_plan_info(&v)?;
777 }
778 REACHED_LIMIT => {
779 self.reached_limit = walker.read_nson_boolean()?;
780 trace!("REACHED_LIMIT={}", self.reached_limit);
781 }
782 TABLE_NAME => {
783 self.prepared_statement.table_name = Some(walker.read_nson_string()?);
784 }
785 NAMESPACE => {
786 self.prepared_statement.namespace = Some(walker.read_nson_string()?);
787 }
788 QUERY_PLAN_STRING => {
789 self.prepared_statement.query_plan = walker.read_nson_string()?;
790 }
791 QUERY_RESULT_SCHEMA => {
792 self.prepared_statement.query_schema = walker.read_nson_string()?;
793 }
794 QUERY_OPERATION => {
795 // TODO: is this an enum? try_from()?
796 self.prepared_statement.operation = walker.read_nson_i32()? as u8;
797 }
798 TOPOLOGY_INFO => {
799 //println!("deser: TOPOLOGY_INFO");
800 self.prepared_statement.topology_info = Some(walker.read_nson_topology_info()?);
801 }
802 /* QUERY_V3 and earlier return topo differently */
803 PROXY_TOPO_SEQNUM => {
804 //println!("deser: PROXY_TOPO_SEQNUM");
805 ti.seq_num = walker.read_nson_i32()?;
806 }
807 SHARD_IDS => {
808 //println!("deser: SHARD_IDS");
809 ti.shard_ids = walker.read_nson_i32_array()?;
810 }
811 _ => {
812 trace!(" query_response: skipping field '{}'", name);
813 walker.skip_nson_field()?;
814 } /*
815 // added in QUERY_V4
816 else if (name.equals(VIRTUAL_SCANS)) {
817 readType(in, Nson.TYPE_ARRAY);
818 in.readInt(); // length of array in bytes
819 int numScans = in.readInt(); // number of array elements
820 virtualScans = new VirtualScan[numScans];
821 for (int i = 0; i < numScans; ++i) {
822 virtualScans[i] = readVirtualScan(in);
823 }
824
825 /* added in QUERY_V4 */
826 } else if (name.equals(QUERY_BATCH_TRACES)) {
827 readType(in, Nson.TYPE_ARRAY);
828 in.readInt(); // length of array in bytes
829 int numTraces = in.readInt() / 2; // number of array elements
830 queryTraces = new TreeMap<String,String>();
831 for (int i = 0; i < numTraces; ++i) {
832 String batchName = Nson.readNsonString(in);
833 String batchTrace = Nson.readNsonString(in);
834 queryTraces.put(batchName, batchTrace);
835 }
836 }
837 */
838 }
839 }
840
841 if ti.is_valid() {
842 self.prepared_statement.topology_info = Some(ti);
843 }
844
845 if let Some(ti) = &self.prepared_statement.topology_info {
846 //println!("deser: got TI={:?}", ti);
847 self.topology_info = ti.clone();
848 } else {
849 trace!("deser: NO VALID TOPOLOGY RECEIVED");
850 }
851
852 if self.prepare_only == true {
853 if self.prepared_statement.is_empty() {
854 return ia_err!("got no prepared statement when prepare_only was set");
855 }
856 self.is_done = true;
857 } else {
858 if !self.prepared_statement.is_simple() && self.continuation_key.is_none() {
859 // dummy cont key so is_done won't be set
860 trace!("Adding dummy continuation key");
861 self.continuation_key = Some(Vec::new());
862 }
863 }
864
865 Ok(())
866 }
867
868 fn get_driver_plan_info(&mut self, v: &Vec<u8>) -> Result<(), NoSQLError> {
869 if v.len() == 0 {
870 return Ok(());
871 }
872 let mut r = Reader::new().from_bytes(v);
873 self.prepared_statement.driver_query_plan = deserialize_plan_iter(&mut r)?;
874 //println!(
875 //"driver query plan:\n{:?}",
876 //self.prepared_statement.driver_query_plan
877 //);
878 if self.prepared_statement.driver_query_plan.get_kind() == PlanIterKind::Empty {
879 return Ok(());
880 }
881 self.prepared_statement.num_iterators = r.read_i32()?;
882 //println!(
883 //" QUERY_PLAN: iterators={}",
884 //self.prepared_statement.num_iterators
885 //);
886 self.prepared_statement.num_registers = r.read_i32()?;
887 //println!(
888 //" QUERY_PLAN: registers={}",
889 //self.prepared_statement.num_registers
890 //);
891 let len = r.read_i32()?;
892 if len <= 0 {
893 return Ok(());
894 }
895 let mut hm: HashMap<String, i32> = HashMap::with_capacity(len as usize);
896 for _i in 0..len {
897 let name = r.read_string()?;
898 let id = r.read_i32()?;
899 hm.insert(name, id);
900 }
901 self.prepared_statement.variable_to_ids = Some(hm);
902 Ok(())
903 }
904
905 fn read_phase_1_results(
906 &mut self,
907 iter_data: &mut ReceiveIterData,
908 arr: &Vec<u8>,
909 ) -> Result<(), NoSQLError> {
910 let mut r: Reader = Reader::new().from_bytes(arr);
911 iter_data.in_sort_phase_1 = r.read_bool()?;
912 iter_data.pids = r.read_i32_array()?;
913 if iter_data.pids.len() > 0 {
914 iter_data.num_results_per_pid = r.read_i32_array()?;
915 iter_data.part_continuation_keys = Vec::new();
916 for _x in 0..iter_data.num_results_per_pid.len() {
917 iter_data.part_continuation_keys.push(r.read_binary()?);
918 }
919 }
920 Ok(())
921 }
922
923 pub(crate) fn get_result(&mut self, reg: i32) -> FieldValue {
924 if self.num_registers <= reg {
925 panic!("INVALID GET REGISTER ACCESS");
926 }
927 //println!(
928 //" get_result register {}: {:?}",
929 //reg, self.registers[reg as usize]
930 //);
931 std::mem::take(&mut self.registers[reg as usize])
932 }
933
934 pub(crate) fn get_result_ref(&self, reg: i32) -> &FieldValue {
935 //println!(
936 //" get_result_ref register {}: {:?}",
937 //reg, self.registers[reg as usize]
938 //);
939 &self.registers[reg as usize]
940 }
941
942 pub(crate) fn set_result(&mut self, reg: i32, val: FieldValue) {
943 if self.num_registers <= reg {
944 panic!("INVALID SET REGISTER ACCESS");
945 }
946 //println!(" set_result register {}: {:?}", reg, val);
947 self.registers[reg as usize] = val;
948 }
949}