oracle_nosql_rust_sdk/query_request.rs
1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5// https://oss.oracle.com/licenses/upl/
6//
7use crate::error::ia_err;
8use crate::error::NoSQLError;
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::plan_iter::{deserialize_plan_iter, PlanIterKind, PlanIterState};
13use crate::prepared_statement::PreparedStatement;
14use crate::reader::Reader;
15use crate::receive_iter::ReceiveIterData;
16use crate::types::NoSQLColumnToFieldValue;
17use crate::types::{Capacity, Consistency, FieldType, FieldValue, MapValue, OpCode, TopologyInfo};
18use crate::writer::Writer;
19
20use std::collections::HashMap;
21use std::result::Result;
22use std::time::Duration;
23use tracing::trace;
24
25/// Encapsulates a SQL query of a NoSQL Database table.
26///
27/// A query may be either a string query
28/// statement or a prepared query, which may include bind variables.
29/// A query request cannot have both a string statement and prepared query, but
30/// it must have one or the other.
31///
32/// See the [SQL for NoSQL Database Guide](https://docs.oracle.com/en/database/other-databases/nosql-database/24.1/sqlfornosql/introduction-sql.html) for details on creating and using queries.
33///
34/// ## Simple Example
35/// Here is a simple example of running a query that will return every row in a table named `users`:
36///
37/// ```no_run
38/// # use oracle_nosql_rust_sdk::{Handle, QueryRequest};
39/// # #[tokio::main]
40/// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
41/// let handle = Handle::builder().build().await?;
42/// let results = QueryRequest::new("select * from users")
43/// .execute(&handle).await?;
44/// for row in results.rows() {
45/// println!("Row = {}", row);
46/// }
47/// # Ok(())
48/// # }
49/// ```
50///
51/// For performance reasons, prepared queries are preferred for queries that may
52/// be reused. Prepared queries bypass compilation of the query. They also allow
53/// for parameterized queries using bind variables.
54#[derive(Default, Debug)]
55pub struct QueryRequest {
56 pub(crate) prepare_only: bool,
57 //pub(crate) limit: u32,
58 pub(crate) max_read_kb: u32,
59 pub(crate) max_write_kb: u32,
60 pub(crate) consistency: Consistency,
61 pub(crate) timeout: Option<Duration>,
62 pub(crate) compartment_id: String,
63
64 // max_memory_consumption specifies the maximum amount of memory in bytes that
65 // may be consumed by the query at the client for operations such as
66 // duplicate elimination (which may be required due to the use of an index
67 // on an array or map) and sorting. Such operations may consume a lot of
68 // memory as they need to cache the full result set or a large subset of
69 // it at the client memory.
70 //
71 // The default value is 1GB (1,000,000,000).
72 // TODO pub max_memory_consumption: i64,
73
74 // Durability is currently only used in On-Prem installations.
75 // This setting only applies if the query modifies
76 // a row using an INSERT, UPSERT, or DELETE statement. If the query is
77 // read-only it is ignored.
78 // Added in SDK Version 1.4.0
79 // TODO Durability types.Durability
80
81 // private fields: driver and RCB data
82
83 // statement specifies a query statement.
84 statement: Option<String>,
85
86 // prepared_statement specifies the prepared query statement.
87 pub(crate) prepared_statement: PreparedStatement,
88
89 // shortcuts
90 has_driver: bool,
91
92 pub(crate) is_done: bool,
93
94 // created/used by internal iterators
95 is_internal: bool,
96
97 // reached_limit indicates if the query execution reached the size-based or
98 // number-based limit. If so, query execution must stop and a batch of
99 // results (potentially empty) must be returned to the application.
100 pub(crate) reached_limit: bool,
101
102 pub(crate) consumed_capacity: Capacity,
103
104 // memory_consumption represents the amount of memory in bytes that were
105 // consumed by the query at the client for operations such as duplicate
106 // elimination and sorting.
107 // TODO pub memory_consumption: i64,
108
109 // sql_hash_tag is a portion of the hash value of SQL text, used as a tag
110 // for query tracing.
111 pub(crate) sql_hash_tag: Vec<u8>,
112
113 // err represents a non-retryable error returned by a query batch.
114 err: Option<NoSQLError>,
115
116 pub(crate) continuation_key: Option<Vec<u8>>,
117
118 pub(crate) shard_id: i32,
119
120 // total number of batches executed
121 pub(crate) batch_counter: i32,
122
123 // for "advanced" queries using plan iterators
124 pub(crate) num_registers: i32,
125 pub(crate) registers: Vec<FieldValue>,
126
127 pub(crate) topology_info: TopologyInfo,
128}
129
130/// Struct representing the result of a query operation.
131#[derive(Default, Debug)]
132pub struct QueryResult {
133 pub(crate) rows: Vec<MapValue>,
134 pub(crate) prepared_statement: PreparedStatement,
135 pub(crate) consumed: Capacity,
136 // TODO: stats, consumed, etc.
137}
138
139impl QueryResult {
140 /// Get the query result rows, if any.
141 ///
142 /// If the query returned no rows, this will return a reference to an empty vector.
143 /// Otherwise, it will return a reference to the rows in the order specified by the query.
144 pub fn rows(&self) -> &Vec<MapValue> {
145 &self.rows
146 }
147 /// Take the query result rows, setting the result back to an empty vector.
148 ///
149 /// If the query returned no rows, this will return an empty vector.
150 /// Otherwise, it will return the rows in the vector, giving the ownership
151 /// of the rows to the caller.
152 pub fn take_rows(&mut self) -> Vec<MapValue> {
153 std::mem::take(&mut self.rows)
154 }
155 /// Get the prepared statement after execution of a query.
156 ///
157 /// The prepared statement can then be used in subsequent query requests, saving the
158 /// extra step of preparing each query again.
159 pub fn prepared_statement(&self) -> PreparedStatement {
160 let mut ps = self.prepared_statement.clone();
161 let _ = ps.reset();
162 ps
163 }
164 /// Return the total capacity that was consumed during the execution of the query.
165 ///
166 /// This is only relevant for NoSQL Cloud operation. It returns a [`Capacity`] struct which
167 /// contains the total read units, read KB, and write units used by the query execution.
168 pub fn consumed(&self) -> Capacity {
169 self.consumed.clone()
170 }
171}
172
173impl QueryRequest {
174 /// Create a new QueryRequest from a SQL query string.
175 ///
176 /// While this struct is named `QueryRequest`, the SQL supplied to it does not
177 /// necessarily have to be a `SELECT` query. It could also be one of `INSERT`, `UPDATE`,
178 /// or `DELETE`.
179 ///
180 /// See the [SQL for NoSQL Database Guide](https://docs.oracle.com/en/database/other-databases/nosql-database/24.1/sqlfornosql/introduction-sql.html) for details on creating and using queries.
181 ///
182 /// Note: this request should not be used for DDL statements (those that create or modify tables or indexes, such as `CREATE TABLE`). For DDL statements, use [`TableRequest`](crate::TableRequest) instead.
183 ///
184 pub fn new(statement: &str) -> Self {
185 QueryRequest {
186 statement: Some(statement.to_string()),
187 shard_id: -1,
188 ..Default::default()
189 }
190 }
191
192 /// Create a new QueryRequest from a previously prepared query statement.
193 ///
194 /// Use of this method is recommended when executing the same type of query multiple
195 /// times with different values for parameters. Doing so will save resources by not
196 /// re-preparing the query on every execution.
197 ///
198 /// To set bind variables for query execution, first create the request with this method,
199 /// then call [`QueryRequest::set_variable()`] for all desired bind variables. Then execute the
200 /// query with [`QueryRequest::execute()`].
201 pub fn new_prepared(prepared_statement: &PreparedStatement) -> Self {
202 let ti: TopologyInfo;
203 if let Some(t) = &prepared_statement.topology_info {
204 ti = t.clone();
205 } else {
206 panic!(
207 "Invalid prepared statement passed to new_prepared! Missing toploogy info. ps={:?}",
208 prepared_statement
209 );
210 }
211 QueryRequest {
212 prepared_statement: prepared_statement.clone(),
213 shard_id: -1,
214 topology_info: ti,
215 ..Default::default()
216 }
217 }
218
219 /// Specify that this query execution should only prepare the query.
220 ///
221 /// Setting this value to true and then calling [`QueryRequest::execute()`]
222 /// will result in only the query being prepared, and no result rows being returned.
223 /// The prepared statement can then be retrieved using [`QueryResult::prepared_statement()`]
224 /// and can be used in subsequent query calls using [`QueryRequest::new_prepared()`].
225 pub fn prepare_only(mut self) -> Self {
226 self.prepare_only = true;
227 self
228 }
229
230 /// Specify the timeout value for the request.
231 ///
232 /// This is optional.
233 /// If set, it must be greater than or equal to 1 millisecond, otherwise an
234 /// IllegalArgument error will be returned.
235 /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
236 pub fn timeout(mut self, t: &Duration) -> Self {
237 self.timeout = Some(t.clone());
238 self
239 }
240
241 /// Cloud Service only: set the name or id of a compartment to be used for this operation.
242 ///
243 /// The compartment may be specified as either a name (or path for nested compartments) or as an id (OCID).
244 /// A name (vs id) can only be used when authenticated using a specific user identity. It is not available if
245 /// the associated handle authenticated as an Instance Principal (which can be done when calling the service from
246 /// a compute instance in the Oracle Cloud Infrastructure: see [`HandleBuilder::cloud_auth_from_instance()`](crate::HandleBuilder::cloud_auth_from_instance()).)
247 ///
248 /// If no compartment is given, the root compartment of the tenancy will be used.
249 pub fn compartment_id(mut self, compartment_id: &str) -> Self {
250 self.compartment_id = compartment_id.to_string();
251 self
252 }
253
254 // Specify a limit on number of items returned by the operation.
255 //
256 // This allows an operation to return less than the default amount of data.
257 //pub fn limit(mut self, l: u32) -> Self {
258 //self.limit = l;
259 //self
260 //}
261
262 /// Specify the desired consistency policy for the request.
263 ///
264 /// If not set, the default consistency of [`Consistency::Eventual`] is used.
265 pub fn consistency(mut self, c: &Consistency) -> Self {
266 self.consistency = c.clone();
267 self
268 }
269
270 /// Specify the limit on the total data read during a single batch operation, in KB.
271 ///
272 /// For cloud service, this value can only reduce the system defined limit.
273 /// An attempt to increase the limit beyond the system defined limit will
274 /// cause an IllegalArgument error. This limit is independent of read units
275 /// consumed by the operation.
276 ///
277 /// It is recommended that for tables with relatively low provisioned read
278 /// throughput that this limit be set to less than or equal to one half
279 /// of the provisioned throughput in order to reduce the possibility of throttling
280 /// errors.
281 pub fn max_read_kb(mut self, max: u32) -> Self {
282 self.max_read_kb = max;
283 self
284 }
285
286 /// Specify the limit on the total data written during a single batch operation, in KB.
287 ///
288 /// For cloud service, this value can only reduce the system defined limit.
289 /// An attempt to increase the limit beyond the system defined limit will
290 /// cause an IllegalArgument error. This limit is independent of write units
291 /// consumed by the operation.
292 ///
293 /// This limit is independent of write units consumed by the operation.
294 pub fn max_write_kb(mut self, max: u32) -> Self {
295 self.max_write_kb = max;
296 self
297 }
298
299 // used by ext_var_ref_iter
300 pub(crate) fn get_external_var(&self, id: i32) -> Option<&FieldValue> {
301 if self.prepared_statement.is_empty() {
302 return None;
303 }
304 self.prepared_statement.get_variable_by_id(id)
305 }
306
307 // note private
308 async fn get_results(
309 &mut self,
310 handle: &Handle,
311 results: &mut Vec<MapValue>,
312 ) -> Result<(), NoSQLError> {
313 // this is where everything happens
314
315 if let Some(e) = &self.err {
316 return Err(e.clone());
317 }
318
319 if self.prepare_only == true || self.prepared_statement.is_simple() {
320 // results already fetched from nson_deserialize()
321 //println!("get_results: prepare or simple: returning");
322 return Ok(());
323 }
324
325 //let driver_plan = &mut self.prepared_statement.driver_query_plan;
326 let mut driver_plan = std::mem::take(&mut self.prepared_statement.driver_query_plan);
327
328 if driver_plan.get_state() == PlanIterState::Uninitialized {
329 //println!("get_results: initializing driver plan");
330 self.reached_limit = false;
331 //self.memory_consumption = 0;
332 self.consumed_capacity = Capacity::default();
333 self.sql_hash_tag = Default::default();
334
335 self.consumed_capacity.read_kb += 1; // prep cost
336 self.consumed_capacity.read_units += 1; // prep cost
337 driver_plan.open(self, handle)?;
338 }
339
340 let mut more;
341 loop {
342 //println!("get_results: calling driver_plan.next()");
343 more = driver_plan.next(self, handle).await?;
344 if more == false {
345 //println!("get_results: no more results: breaking");
346 break;
347 }
348 //println!("get_results: pushing 1 result");
349 results.push(driver_plan.get_result(self).get_map_value()?);
350 //if self.limit > 0 && results.len() >= self.limit as usize {
351 //println!(
352 //"get_results: reached limit: results_size={}, limit={}",
353 //results.len(),
354 //self.limit
355 //);
356 //self.reached_limit = true;
357 //break;
358 //}
359 }
360
361 self.prepared_statement.driver_query_plan = driver_plan;
362
363 if more {
364 // non-advanced queries just need Some/None, value not used
365 self.continuation_key = Some(Vec::new());
366 self.is_done = false;
367 } else {
368 if self.reached_limit {
369 // there is more to do, but we reached a limit
370 self.continuation_key = Some(Vec::new());
371 self.reached_limit = false;
372 self.is_done = false;
373 } else {
374 self.continuation_key = None;
375 self.is_done = true;
376 }
377 }
378
379 Ok(())
380 }
381
382 pub(crate) fn copy_for_internal(&self) -> Self {
383 if self.prepared_statement.is_empty() {
384 panic!("prepared statement is empty in copy_for_internal");
385 }
386 QueryRequest {
387 is_internal: true,
388 prepared_statement: self.prepared_statement.copy_for_internal(),
389 shard_id: self.shard_id,
390 //limit: self.limit,
391 // purposefully not copying registers
392 num_registers: -1,
393 timeout: self.timeout.clone(),
394 ..Default::default()
395 }
396 }
397
398 pub(crate) fn reset(&mut self) -> Result<(), NoSQLError> {
399 self.is_done = false;
400 self.reached_limit = false;
401 self.batch_counter = 0;
402 self.consumed_capacity = Capacity::default();
403 // clear prepared statement iterators
404 self.prepared_statement.reset()
405 }
406
407 /// Set a named bind variable for execution of a prepared query.
408 ///
409 /// See [`PreparedStatement`] for an example of using this method.
410 pub fn set_variable(
411 &mut self,
412 name: &str,
413 value: &impl NoSQLColumnToFieldValue,
414 ) -> Result<(), NoSQLError> {
415 if self.prepared_statement.is_empty() {
416 return ia_err!("cannot set bind variables: no prepared statement in QueryRequest");
417 }
418 let fv = value.to_field_value();
419 self.prepared_statement.set_variable(name, &fv)
420 }
421
422 /// Set a positional bind variable for execution of a prepared query.
423 ///
424 /// This is similar to [`set_variable()`](QueryRequest::set_variable()) but uses integer-based positional parameters:
425 /// ```no_run
426 /// # use oracle_nosql_rust_sdk::{Handle, QueryRequest, NoSQLColumnToFieldValue};
427 /// # #[tokio::main]
428 /// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
429 /// let handle = Handle::builder().build().await?;
430 /// let prep_result = QueryRequest::new("insert into testusers(id, name) values(?, ?)")
431 /// .prepare_only()
432 /// .execute(&handle)
433 /// .await?;
434 /// let data = vec!["jane", "john", "jasper"];
435 /// let mut qreq = QueryRequest::new_prepared(&prep_result.prepared_statement());
436 /// for i in 0..data.len() {
437 /// let id = (i as i32) + 100;
438 /// qreq.set_variable_by_id(1, &id)?;
439 /// qreq.set_variable_by_id(2, &data[i])?;
440 /// let result = qreq.execute(&handle).await?;
441 /// println!("Insert result = {:?}", result);
442 /// }
443 /// # Ok(())
444 /// # }
445 pub fn set_variable_by_id(
446 &mut self,
447 id: i32,
448 value: &impl NoSQLColumnToFieldValue,
449 ) -> Result<(), NoSQLError> {
450 if self.prepared_statement.is_empty() {
451 return ia_err!("cannot set bind variables: no prepared statement in QueryRequest");
452 }
453 let fv = value.to_field_value();
454 self.prepared_statement.set_variable_by_id(id, &fv)
455 }
456
457 /// Execute the query to full completion.
458 ///
459 /// This is the preferred method for execution of a query. Internally, this method will loop
460 /// calling `execute_batch()` until all results are returned and all post-processing (sorting,
461 /// grouping, aggregations, etc) are complete.
462 ///
463 /// If the query has no rows to return, [`QueryResult::rows()`] will return an empty vector.
464 /// Otherwise it will return a vector of
465 /// [`MapValue`](crate::types::MapValue) structs in the order specified by the
466 /// query statement.
467 pub async fn execute(&mut self, h: &Handle) -> Result<QueryResult, NoSQLError> {
468 let mut iter_data = ReceiveIterData::default();
469 let mut results: Vec<MapValue> = Vec::new();
470 self.reset()?;
471 while self.is_done == false {
472 //println!("execute_internal doing next batch");
473 self.execute_batch_internal(h, &mut results, &mut iter_data)
474 .await?;
475 self.batch_counter += 1;
476 if self.batch_counter > 10000 {
477 panic!("Batch_internal infinite loop detected: self={:?}", self);
478 }
479 }
480
481 if self.prepared_statement.is_empty() {
482 panic!("empty prepared statement after execute!");
483 }
484
485 // TODO: retries, stats, etc
486 let mut qres = QueryResult {
487 prepared_statement: self.prepared_statement.clone(),
488 consumed: self.consumed_capacity.clone(),
489 rows: results,
490 };
491 let _ = qres.prepared_statement.reset();
492 Ok(qres)
493 }
494
495 /// Execute one batch of a query.
496 ///
497 /// This will execute at most one round-trip to the server. It should be called in a loop
498 /// until `is_done()` returns `true`. Note that any one batch execution may not set any results,
499 /// since some queries require many server round trips to finish (sorting, for example).
500 ///
501 /// It is recommended to use [`execute()`](QueryRequest::execute()) instead of this method.
502 /// *This method may be deprecated in future releases*.
503 pub async fn execute_batch(
504 &mut self,
505 handle: &Handle,
506 results: &mut Vec<MapValue>,
507 ) -> Result<(), NoSQLError> {
508 let mut _data = ReceiveIterData::default();
509 self.execute_batch_internal(handle, results, &mut _data)
510 .await
511 }
512
513 /// Determine if the query is complete.
514 ///
515 /// If using [`QueryRequest::execute_batch()`] in a loop, this method determines when
516 /// to terminate the loop, specifying that no more results exist for this query execution.
517 /// This is only necessary if executing queries in batch looping mode.
518 pub fn is_done(&self) -> bool {
519 self.is_done
520 }
521
522 pub(crate) async fn execute_batch_internal(
523 &mut self,
524 handle: &Handle,
525 results: &mut Vec<MapValue>,
526 iter_data: &mut ReceiveIterData,
527 ) -> Result<(), NoSQLError> {
528 trace!(
529 "EBI: batch_counter={} num_results={}",
530 self.batch_counter,
531 results.len()
532 );
533
534 self.reached_limit = false;
535
536 // internal queries do not use plan iterators/etc - they just return plain results.
537 if self.is_internal == false {
538 /*
539 * The following "if" may be true for advanced queries only. For
540 * such queries, the "if" will be true (i.e., the QueryRequest will
541 * be bound with a QueryDriver) if and only if this is not the 1st
542 * execute() call for this query. In this case we just return a new,
543 * empty QueryResult. Actual computation of a result batch will take
544 * place when the app calls getResults() on the QueryResult.
545 */
546 if self.has_driver {
547 //trace("QueryRequest has QueryDriver", 2);
548 return self.get_results(handle, results).await;
549 }
550
551 /*
552 * If it is an advanced query and we are here, then this must be
553 * the 1st execute() call for the query. If the query has been
554 * prepared before, we create a QueryDriver and bind it with the
555 * QueryRequest. Then, we create and return an empty QueryResult.
556 * Actual computation of a result batch will take place when the
557 * app calls getResults() on the QueryResult.
558 */
559 if self.prepared_statement.is_empty() == false
560 && self.prepared_statement.is_simple() == false
561 {
562 //trace("QueryRequest has no QueryDriver, but is prepared", 2);
563 self.num_registers = self.prepared_statement.num_registers;
564 self.registers = Vec::new();
565 for _i in 0..self.num_registers {
566 self.registers.push(FieldValue::Uninitialized);
567 }
568 self.has_driver = true;
569 return self.get_results(handle, results).await;
570 }
571
572 /*
573 * If we are here, then this is either (a) a simple query or (b) an
574 * advanced query that has not been prepared already, which also
575 * implies that this is the 1st execute() call on this query. For
576 * a non-prepared advanced query, the effect of this 1st execute()
577 * call is to send the query to the proxy for compilation, get back
578 * the prepared query, but no query results, create a QueryDriver,
579 * and bind it with the QueryRequest (see
580 * QueryRequestSerializer.deserialize()), and return an empty
581 * QueryResult.
582 * a non-prepared simple query will return results.
583 */
584 //trace("QueryRequest has no QueryDriver and is not prepared", 2);
585 //self.batch_counter += 1;
586 }
587
588 let mut w: Writer = Writer::new();
589 w.write_i16(handle.inner.serial_version);
590 let timeout = handle.get_timeout(&self.timeout);
591 self.serialize_internal(&mut w, &timeout)?;
592 let mut opts = SendOptions {
593 timeout: timeout,
594 retryable: true,
595 compartment_id: self.compartment_id.clone(),
596 ..Default::default()
597 };
598 let mut r = handle.send_and_receive(w, &mut opts).await?;
599 self.continuation_key = None;
600 self.nson_deserialize(&mut r, results, iter_data)?;
601 if self.continuation_key.is_none() {
602 trace!("continuation key is None, setting is_done");
603 self.is_done = true;
604 }
605 Ok(())
606 }
607
608 fn serialize_internal(&self, w: &mut Writer, timeout: &Duration) -> Result<(), NoSQLError> {
609 let mut ns = NsonSerializer::start_request(w);
610 ns.start_header();
611 if self.prepare_only {
612 ns.write_header(OpCode::Prepare, timeout, "");
613 } else {
614 ns.write_header(OpCode::Query, timeout, "");
615 }
616 ns.end_header();
617 ns.start_payload();
618
619 //TODO writeConsistency(ns, rq.getConsistency());
620 //if (rq.getDurability() != null) {
621 //writeMapField(ns, DURABILITY,
622 //getDurability(rq.getDurability()));
623 //}
624
625 if self.max_read_kb > 0 {
626 ns.write_i32_field(MAX_READ_KB, self.max_read_kb as i32);
627 }
628 if self.max_write_kb > 0 {
629 ns.write_i32_field(MAX_WRITE_KB, self.max_write_kb as i32);
630 }
631 //if self.limit > 0 {
632 //ns.write_i32_field(NUMBER_LIMIT, self.limit as i32);
633 //}
634
635 //writeMapFieldNZ(ns, TRACE_LEVEL, rq.getTraceLevel());
636 //if (rq.getTraceLevel() > 0) {
637 //writeMapField(ns, TRACE_AT_LOG_FILES, rq.getLogFileTracing());
638 //writeMapField(ns, BATCH_COUNTER, rq.getBatchCounter());
639
640 ns.write_i32_field(QUERY_VERSION, 3); // TODO: QUERY_V4
641 if self.prepared_statement.is_empty() == false {
642 ns.write_bool_field(IS_PREPARED, true);
643 ns.write_bool_field(IS_SIMPLE_QUERY, self.prepared_statement.is_simple());
644 ns.write_binary_field(PREPARED_QUERY, &self.prepared_statement.statement);
645 if self.prepared_statement.data.bind_variables.len() > 0 {
646 ns.start_array(BIND_VARIABLES);
647 for (k, v) in &self.prepared_statement.data.bind_variables {
648 ns.start_map("");
649 trace!(" BIND: name={} value={:?}", k, v);
650 ns.write_string_field(NAME, k);
651 ns.write_field(VALUE, v);
652 ns.end_map("");
653 ns.incr_size(1);
654 }
655 ns.end_array(BIND_VARIABLES);
656 }
657 } else {
658 if let Some(s) = &self.statement {
659 ns.write_string_field(STATEMENT, &s);
660 } else {
661 return ia_err!("no statement or prepared statement");
662 }
663 }
664 if let Some(ck) = &self.continuation_key {
665 if ck.len() > 0 {
666 ns.write_binary_field(CONTINUATION_KEY, ck);
667 //println!("Wrote {} byte continuation key", ck.len());
668 }
669 }
670
671 //writeLongMapFieldNZ(ns, SERVER_MEMORY_CONSUMPTION,
672 //rq.getMaxServerMemoryConsumption());
673 //writeMathContext(ns, rq.getMathContext());
674
675 if self.shard_id > -1 {
676 //println!("Q: SHARD_ID={}", self.shard_id);
677 ns.write_i32_field(SHARD_ID, self.shard_id);
678 }
679 //if (queryVersion >= QueryDriver.QUERY_V4) {
680 //if (rq.getQueryName() != null) {
681 //writeMapField(ns, QUERY_NAME, rq.getQueryName());
682 //}
683 //if (rq.getVirtualScan() != null) {
684 //writeVirtualScan(ns, rq.getVirtualScan());
685 //}
686 //}
687
688 ns.end_payload();
689 ns.end_request();
690 Ok(())
691 }
692
693 // TODO
694 //theRCB.tallyRateLimitDelayedMs(result.getRateLimitDelayedMs());
695 //theRCB.tallyRetryStats(result.getRetryStats());
696 // TODO: support deduping of results
697
698 pub(crate) fn add_results(
699 &self,
700 walker: &mut MapWalker,
701 results: &mut Vec<MapValue>,
702 ) -> Result<(), NoSQLError> {
703 let t = FieldType::try_from_u8(walker.r.read_byte()?)?;
704 if t != FieldType::Array {
705 return ia_err!("bad type in queryResults: {:?}, should be Array", t);
706 }
707 walker.r.read_i32()?; // length of array in bytes
708 let num_elements = walker.r.read_i32()?;
709 trace!("read_results: num_results={}", num_elements);
710 if num_elements <= 0 {
711 return Ok(());
712 }
713 for _i in 0..num_elements {
714 if let FieldValue::Map(m) = walker.r.read_field_value()? {
715 //println!("Result: {:?}", m);
716 results.push(m);
717 } else {
718 return ia_err!("got invalid type of value in query results");
719 }
720 }
721 Ok(())
722 }
723
724 // Deserialize results for a QueryRequest.
725 fn nson_deserialize(
726 &mut self,
727 r: &mut Reader,
728 results: &mut Vec<MapValue>,
729 iter_data: &mut ReceiveIterData,
730 ) -> Result<(), NoSQLError> {
731 // TODO short serialVersion
732 // TODO short queryVersion
733
734 let is_prepared_request = !self.prepared_statement.is_empty();
735
736 let mut ti = TopologyInfo::default();
737 self.continuation_key = None;
738 iter_data.continuation_key = None;
739
740 let mut walker = MapWalker::new(r)?;
741 while walker.has_next() {
742 walker.next()?;
743 let name = walker.current_name();
744 match name.as_str() {
745 ERROR_CODE => {
746 walker.handle_error_code()?;
747 }
748 CONSUMED => {
749 let cap = walker.read_nson_consumed_capacity()?;
750 self.consumed_capacity.add(&cap);
751 }
752 QUERY_RESULTS => {
753 self.add_results(&mut walker, results)?;
754 }
755 CONTINUATION_KEY => {
756 let ck = walker.read_nson_binary()?;
757 if ck.len() > 0 {
758 trace!("Read {} byte continuation key", ck.len());
759 iter_data.continuation_key = Some(ck.clone());
760 self.continuation_key = Some(ck);
761 }
762 }
763 SORT_PHASE1_RESULTS => {
764 self.read_phase_1_results(iter_data, &walker.read_nson_binary()?)?;
765 }
766 PREPARED_QUERY => {
767 if is_prepared_request {
768 return ia_err!("got prepared query in result for already prepared query");
769 }
770 self.prepared_statement.statement = walker.read_nson_binary()?;
771 }
772 DRIVER_QUERY_PLAN => {
773 if is_prepared_request {
774 return ia_err!("got driver plan in result for already prepared query");
775 }
776 let v = walker.read_nson_binary()?;
777 self.get_driver_plan_info(&v)?;
778 }
779 REACHED_LIMIT => {
780 self.reached_limit = walker.read_nson_boolean()?;
781 trace!("REACHED_LIMIT={}", self.reached_limit);
782 }
783 TABLE_NAME => {
784 self.prepared_statement.table_name = Some(walker.read_nson_string()?);
785 }
786 NAMESPACE => {
787 self.prepared_statement.namespace = Some(walker.read_nson_string()?);
788 }
789 QUERY_PLAN_STRING => {
790 self.prepared_statement.query_plan = walker.read_nson_string()?;
791 }
792 QUERY_RESULT_SCHEMA => {
793 self.prepared_statement.query_schema = walker.read_nson_string()?;
794 }
795 QUERY_OPERATION => {
796 // TODO: is this an enum? try_from()?
797 self.prepared_statement.operation = walker.read_nson_i32()? as u8;
798 }
799 TOPOLOGY_INFO => {
800 //println!("deser: TOPOLOGY_INFO");
801 self.prepared_statement.topology_info = Some(walker.read_nson_topology_info()?);
802 }
803 /* QUERY_V3 and earlier return topo differently */
804 PROXY_TOPO_SEQNUM => {
805 //println!("deser: PROXY_TOPO_SEQNUM");
806 ti.seq_num = walker.read_nson_i32()?;
807 }
808 SHARD_IDS => {
809 //println!("deser: SHARD_IDS");
810 ti.shard_ids = walker.read_nson_i32_array()?;
811 }
812 _ => {
813 trace!(" query_response: skipping field '{}'", name);
814 walker.skip_nson_field()?;
815 } /*
816 // added in QUERY_V4
817 else if (name.equals(VIRTUAL_SCANS)) {
818 readType(in, Nson.TYPE_ARRAY);
819 in.readInt(); // length of array in bytes
820 int numScans = in.readInt(); // number of array elements
821 virtualScans = new VirtualScan[numScans];
822 for (int i = 0; i < numScans; ++i) {
823 virtualScans[i] = readVirtualScan(in);
824 }
825
826 /* added in QUERY_V4 */
827 } else if (name.equals(QUERY_BATCH_TRACES)) {
828 readType(in, Nson.TYPE_ARRAY);
829 in.readInt(); // length of array in bytes
830 int numTraces = in.readInt() / 2; // number of array elements
831 queryTraces = new TreeMap<String,String>();
832 for (int i = 0; i < numTraces; ++i) {
833 String batchName = Nson.readNsonString(in);
834 String batchTrace = Nson.readNsonString(in);
835 queryTraces.put(batchName, batchTrace);
836 }
837 }
838 */
839 }
840 }
841
842 if ti.is_valid() {
843 self.prepared_statement.topology_info = Some(ti);
844 }
845
846 if let Some(ti) = &self.prepared_statement.topology_info {
847 //println!("deser: got TI={:?}", ti);
848 self.topology_info = ti.clone();
849 } else {
850 trace!("deser: NO VALID TOPOLOGY RECEIVED");
851 }
852
853 if self.prepare_only == true {
854 if self.prepared_statement.is_empty() {
855 return ia_err!("got no prepared statement when prepare_only was set");
856 }
857 self.is_done = true;
858 } else {
859 if !self.prepared_statement.is_simple() && self.continuation_key.is_none() {
860 // dummy cont key so is_done won't be set
861 trace!("Adding dummy continuation key");
862 self.continuation_key = Some(Vec::new());
863 }
864 }
865
866 Ok(())
867 }
868
869 fn get_driver_plan_info(&mut self, v: &Vec<u8>) -> Result<(), NoSQLError> {
870 if v.len() == 0 {
871 return Ok(());
872 }
873 let mut r = Reader::new().from_bytes(v);
874 self.prepared_statement.driver_query_plan = deserialize_plan_iter(&mut r)?;
875 //println!(
876 //"driver query plan:\n{:?}",
877 //self.prepared_statement.driver_query_plan
878 //);
879 if self.prepared_statement.driver_query_plan.get_kind() == PlanIterKind::Empty {
880 return Ok(());
881 }
882 self.prepared_statement.num_iterators = r.read_i32()?;
883 //println!(
884 //" QUERY_PLAN: iterators={}",
885 //self.prepared_statement.num_iterators
886 //);
887 self.prepared_statement.num_registers = r.read_i32()?;
888 //println!(
889 //" QUERY_PLAN: registers={}",
890 //self.prepared_statement.num_registers
891 //);
892 let len = r.read_i32()?;
893 if len <= 0 {
894 return Ok(());
895 }
896 let mut hm: HashMap<String, i32> = HashMap::with_capacity(len as usize);
897 for _i in 0..len {
898 let name = r.read_string()?;
899 let id = r.read_i32()?;
900 hm.insert(name, id);
901 }
902 self.prepared_statement.variable_to_ids = Some(hm);
903 Ok(())
904 }
905
906 fn read_phase_1_results(
907 &mut self,
908 iter_data: &mut ReceiveIterData,
909 arr: &Vec<u8>,
910 ) -> Result<(), NoSQLError> {
911 let mut r: Reader = Reader::new().from_bytes(arr);
912 iter_data.in_sort_phase_1 = r.read_bool()?;
913 iter_data.pids = r.read_i32_array()?;
914 if iter_data.pids.len() > 0 {
915 iter_data.num_results_per_pid = r.read_i32_array()?;
916 iter_data.part_continuation_keys = Vec::new();
917 for _x in 0..iter_data.num_results_per_pid.len() {
918 iter_data.part_continuation_keys.push(r.read_binary()?);
919 }
920 }
921 Ok(())
922 }
923
924 pub(crate) fn get_result(&mut self, reg: i32) -> FieldValue {
925 if self.num_registers <= reg {
926 panic!("INVALID GET REGISTER ACCESS");
927 }
928 //println!(
929 //" get_result register {}: {:?}",
930 //reg, self.registers[reg as usize]
931 //);
932 std::mem::take(&mut self.registers[reg as usize])
933 }
934
935 pub(crate) fn get_result_ref(&self, reg: i32) -> &FieldValue {
936 //println!(
937 //" get_result_ref register {}: {:?}",
938 //reg, self.registers[reg as usize]
939 //);
940 &self.registers[reg as usize]
941 }
942
943 pub(crate) fn set_result(&mut self, reg: i32, val: FieldValue) {
944 if self.num_registers <= reg {
945 panic!("INVALID SET REGISTER ACCESS");
946 }
947 //println!(" set_result register {}: {:?}", reg, val);
948 self.registers[reg as usize] = val;
949 }
950}