oracle_nosql_rust_sdk/write_multiple_request.rs
1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5// https://oss.oracle.com/licenses/upl/
6//
7use crate::delete_request::DeleteRequest;
8use crate::error::NoSQLError;
9use crate::error::NoSQLErrorCode::IllegalArgument;
10use crate::handle::Handle;
11use crate::handle::SendOptions;
12use crate::nson::NsonSubRequest;
13use crate::nson::*;
14use crate::put_request::PutRequest;
15use crate::reader::Reader;
16use crate::types::{Capacity, FieldType, FieldValue, MapValue, NoSQLRow, OpCode};
17use crate::writer::Writer;
18use crate::Version;
19use std::result::Result;
20use std::time::Duration;
21
22// For doc only
23#[allow(unused_imports)]
24use crate::{DeleteResult, PutResult};
25
26/// Struct used to perform multiple [`PutRequest`]s and/or [`DeleteRequest`]s in a single operation.
27#[derive(Default, Debug)]
28pub struct WriteMultipleRequest {
29 pub(crate) table_name: String,
30 pub(crate) compartment_id: String,
31 // TODO: pub(crate) namespace: String,
32 pub(crate) timeout: Option<Duration>,
33 pub(crate) sub_requests: Vec<Box<dyn NsonSubRequest>>,
34 // TODO durability: Option<Vec<u8>>
35
36 // TODO: limiters, retry stats, etc
37}
38
39/// Struct representing the result of a single sub-operation of a [`WriteMultipleRequest`].
40#[derive(Default, Debug)]
41pub struct SubOperationResult {
42 pub(crate) success: bool,
43 pub(crate) version: Option<Version>,
44 pub(crate) consumed: Option<Capacity>,
45 pub(crate) generated_value: Option<FieldValue>,
46 pub(crate) existing_modification_time: i64,
47 pub(crate) existing_value: Option<MapValue>,
48 pub(crate) existing_version: Option<Version>,
49 // TODO: stats, etc... (base)
50}
51
52impl SubOperationResult {
53 /// Get the success result of the sub-operation.
54 pub fn success(&self) -> bool {
55 self.success
56 }
57 /// For `Put` operations,
58 /// Get the Version of the now-current record. This value is `Some` if the put operation succeeded. It
59 /// may be used in subsequent [`PutRequest::if_version()`] calls.
60 /// This is valid for `Put` operations only.
61 pub fn version(&self) -> Option<&Version> {
62 if let Some(v) = &self.version {
63 return Some(v);
64 }
65 None
66 }
67 /// Get the consumed capacity (read/write units) of the operation. This is only valid in the NoSQL Cloud Service.
68 pub fn consumed(&self) -> Option<&Capacity> {
69 if let Some(c) = &self.consumed {
70 return Some(c);
71 }
72 None
73 }
74 /// for `Put` operations, get the value generated if the operation created a new value. This can happen if the table contains an
75 /// identity column or string column declared as a generated UUID. If the table has no such column, this value is `None`.
76 /// This is valid for `Put` operations only.
77 pub fn generated_value(&self) -> Option<&FieldValue> {
78 if let Some(r) = &self.generated_value {
79 return Some(r);
80 }
81 None
82 }
83
84 /// For `Put` operations, see [`PutResult::existing_modification_time()`]. For `Delete` operations,
85 /// see [`DeleteResult::existing_modification_time()`].
86 // TODO: make this a Time field
87 pub fn existing_modification_time(&self) -> i64 {
88 self.existing_modification_time
89 }
90 /// For `Put` operations, see [`PutResult::existing_value()`]. For `Delete` operations,
91 /// see [`DeleteResult::existing_value()`].
92 pub fn existing_value(&self) -> Option<&MapValue> {
93 if let Some(v) = &self.existing_value {
94 return Some(v);
95 }
96 None
97 }
98 /// For `Put` operations, see [`PutResult::existing_version()`]. For `Delete` operations,
99 /// see [`DeleteResult::existing_version()`].
100 pub fn existing_version(&self) -> Option<&Version> {
101 if let Some(v) = &self.existing_version {
102 return Some(v);
103 }
104 None
105 }
106}
107
108/// Struct representing the combined results of a [`WriteMultipleRequest`] operation.
109#[derive(Default, Debug)]
110pub struct WriteMultipleResult {
111 pub(crate) results: Vec<SubOperationResult>,
112 pub(crate) failed_operation_index: i32,
113 pub(crate) consumed: Option<Capacity>,
114}
115
116impl WriteMultipleResult {
117 /// Get a vector of sub-operation results. This vector is ordered in the same order as
118 /// put/delete items were added to the `WriteMultipleRequest`.
119 pub fn results(&self) -> &Vec<SubOperationResult> {
120 &self.results
121 }
122 /// Get the offset of the first failed operation.
123 /// If there are no failures, -1 is returned.
124 pub fn failed_operation_index(&self) -> i32 {
125 self.failed_operation_index
126 }
127 /// Get the consumed capacity (read/write units) of the overall operation. This is only valid in the NoSQL Cloud Service.
128 pub fn consumed(&self) -> Option<&Capacity> {
129 if let Some(c) = &self.consumed {
130 return Some(c);
131 }
132 None
133 }
134}
135
136impl WriteMultipleRequest {
137 pub fn new(table_name: &str) -> WriteMultipleRequest {
138 WriteMultipleRequest {
139 table_name: table_name.to_string(),
140 ..Default::default()
141 }
142 }
143
144 /// Specify the timeout value for the request.
145 ///
146 /// This is optional.
147 /// If set, it must be greater than or equal to 1 millisecond, otherwise an
148 /// IllegalArgument error will be returned.
149 /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
150 pub fn timeout(mut self, t: &Duration) -> Self {
151 self.timeout = Some(t.clone());
152 self
153 }
154
155 /// Cloud Service only: set the name or id of a compartment to be used for this operation.
156 ///
157 /// The compartment may be specified as either a name (or path for nested compartments) or as an id (OCID).
158 /// A name (vs id) can only be used when authenticated using a specific user identity. It is not available if
159 /// the associated handle authenticated as an Instance Principal (which can be done when calling the service from
160 /// a compute instance in the Oracle Cloud Infrastructure: see [`HandleBuilder::cloud_auth_from_instance()`](crate::HandleBuilder::cloud_auth_from_instance()).)
161 ///
162 /// If no compartment is given, the root compartment of the tenancy will be used.
163 pub fn compartment_id(mut self, compartment_id: &str) -> Self {
164 self.compartment_id = compartment_id.to_string();
165 self
166 }
167
168 pub fn add(mut self, r: Box<dyn NsonSubRequest>) -> WriteMultipleRequest {
169 self.sub_requests.push(r);
170 self
171 }
172
173 // let mut data = Vec<MyStruct>::new();
174 // ... fill in vector with many MyStructs ...
175 //
176 // let res: WriteMultipleResult = WriteMultipleRequest::new(tablename)
177 // .put(data)
178 // .execute(&handle);
179
180 // Note this consumes the collection
181 pub fn put<T>(mut self, collection: T) -> Result<WriteMultipleRequest, NoSQLError>
182 where
183 T: IntoIterator,
184 T::Item: NoSQLRow,
185 {
186 for item in collection {
187 // note: this implies collection.into_iter()
188 self.sub_requests
189 .push(Box::new(PutRequest::new("").put(item)?));
190 }
191 Ok(self)
192 }
193
194 // Note this consumes the collection
195 pub fn delete<T>(mut self, collection: T) -> Result<WriteMultipleRequest, NoSQLError>
196 where
197 T: IntoIterator,
198 T::Item: NoSQLRow,
199 {
200 for item in collection {
201 // note: this implies collection.into_iter()
202 match item.to_map_value() {
203 Ok(value) => {
204 self.sub_requests
205 .push(Box::new(DeleteRequest::new("", value)));
206 }
207 Err(e) => {
208 // TODO: save error as source
209 return Err(NoSQLError::new(
210 IllegalArgument,
211 &format!("could not convert struct to MapValue: {}", e.to_string()),
212 ));
213 }
214 }
215 }
216 Ok(self)
217 }
218
219 pub async fn execute(&self, h: &Handle) -> Result<WriteMultipleResult, NoSQLError> {
220 // TODO: validate: size > 0, etc
221 let mut w: Writer = Writer::new();
222 w.write_i16(h.inner.serial_version);
223 let timeout = h.get_timeout(&self.timeout);
224 self.serialize_internal(&mut w, &timeout);
225 let mut opts = SendOptions {
226 timeout: timeout,
227 retryable: false,
228 compartment_id: self.compartment_id.clone(),
229 ..Default::default()
230 };
231 let mut r = h.send_and_receive(w, &mut opts).await?;
232 let resp = WriteMultipleRequest::nson_deserialize(&mut r)?;
233 Ok(resp)
234 }
235
236 fn serialize_internal(&self, w: &mut Writer, timeout: &Duration) {
237 let mut ns = NsonSerializer::start_request(w);
238 ns.start_header();
239
240 // TableName
241 // If all ops use the same table name, write that
242 // single table name to the output stream.
243 // If any of them are different, write all table
244 // names to the individual ops.
245 // Possible optimization: if most use one table,
246 // write that in the header and only write minority
247 // table names in specific ops.
248 // TODO if self.is_single_table() {
249 ns.write_header(OpCode::WriteMultiple, timeout, &self.table_name);
250 //} else {
251 //ns.write_header(OpCode::WriteMultiple, self.timeout_ms, "");
252 //}
253 ns.end_header();
254
255 // TODO: compartment
256 ns.start_payload();
257 ns.write_i32_field(DURABILITY, 0); // TODO
258 ns.write_i32_field(NUM_OPERATIONS, self.sub_requests.len() as i32);
259
260 // OPERATIONS: array of maps
261 ns.start_array(OPERATIONS);
262 for rq in self.sub_requests.as_slice() {
263 ns.write_subrequest(rq, timeout);
264 ns.end_array_field(0);
265 }
266 ns.end_array(OPERATIONS);
267
268 ns.end_payload();
269 ns.end_request();
270 }
271
272 pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<WriteMultipleResult, NoSQLError> {
273 let mut walker = MapWalker::new(r)?;
274 let mut res: WriteMultipleResult = Default::default();
275 res.failed_operation_index = -1;
276 while walker.has_next() {
277 walker.next()?;
278 let name = walker.current_name();
279 match name.as_str() {
280 ERROR_CODE => {
281 walker.handle_error_code()?;
282 }
283 CONSUMED => {
284 res.consumed = Some(walker.read_nson_consumed_capacity()?);
285 //println!(" consumed={:?}", res.consumed);
286 }
287 WM_SUCCESS => {
288 // array of operation result maps
289 MapWalker::expect_type(walker.r, FieldType::Array)?;
290 let _ = walker.r.read_i32()?; // skip array size in bytes
291 let num_elements = walker.r.read_i32()?;
292 res.results = Vec::with_capacity(num_elements as usize);
293 for _n in 1..=num_elements {
294 res.results
295 .push(WriteMultipleRequest::read_result(walker.r)?);
296 }
297 }
298 WM_FAILURE => {
299 WriteMultipleRequest::read_failed_results(walker.r, &mut res)?;
300 }
301 _ => {
302 //println!(" write_multiple_result: skipping field '{}'", name);
303 walker.skip_nson_field()?;
304 }
305 }
306 }
307 Ok(res)
308 }
309
310 fn read_failed_results(
311 r: &mut Reader,
312 res: &mut WriteMultipleResult,
313 ) -> Result<(), NoSQLError> {
314 let mut walker = MapWalker::new(r)?;
315 while walker.has_next() {
316 walker.next()?;
317 let name = walker.current_name();
318 match name.as_str() {
319 WM_FAIL_INDEX => {
320 res.failed_operation_index = walker.read_nson_i32()?;
321 }
322 WM_FAIL_RESULT => {
323 res.results
324 .push(WriteMultipleRequest::read_result(walker.r)?);
325 }
326 _ => {
327 //println!(" read_failed_results: skipping field '{}'", name);
328 walker.skip_nson_field()?;
329 }
330 }
331 }
332 Ok(())
333 }
334
335 // TODO: make this common to all write result operations
336 fn read_result(r: &mut Reader) -> Result<SubOperationResult, NoSQLError> {
337 let mut walker = MapWalker::new(r)?;
338 let mut res: SubOperationResult = Default::default();
339 while walker.has_next() {
340 walker.next()?;
341 let name = walker.current_name();
342 match name.as_str() {
343 SUCCESS => {
344 //println!(" read_result: SUCCESS");
345 res.success = walker.read_nson_boolean()?;
346 }
347 ROW_VERSION => {
348 //println!(" read_result: ROW_VERSION");
349 res.version = Some(walker.read_nson_binary()?);
350 }
351 GENERATED => {
352 //println!(" read_result: GENERATED");
353 res.generated_value = Some(walker.read_nson_field_value()?);
354 //println!("generated_value={:?}", res.generated_value);
355 }
356 RETURN_INFO => {
357 //println!(" read_result: RETURN_INFO");
358 WriteMultipleRequest::read_return_info(walker.r, &mut res)?;
359 }
360 _ => {
361 //println!(" read_result: skipping field '{}'", name);
362 walker.skip_nson_field()?;
363 }
364 }
365 }
366 Ok(res)
367 }
368
369 fn read_return_info(r: &mut Reader, res: &mut SubOperationResult) -> Result<(), NoSQLError> {
370 let mut walker = MapWalker::new(r)?;
371 while walker.has_next() {
372 walker.next()?;
373 let name = walker.current_name();
374 match name.as_str() {
375 EXISTING_MOD_TIME => {
376 //println!(" read_ri: EXISTING_MOD_TIME");
377 res.existing_modification_time = walker.read_nson_i64()?;
378 }
379 //EXISTING_EXPIRATION => {
380 //println!(" read_ri: EXISTING_EXPIRATION");
381 //res.existing_expiration_time = walker.read_nson_i64()?;
382 //},
383 EXISTING_VERSION => {
384 //println!(" read_ri: EXISTING_VERSION");
385 res.existing_version = Some(walker.read_nson_binary()?);
386 }
387 EXISTING_VALUE => {
388 //println!(" read_ri: EXISTING_VALUE");
389 res.existing_value = Some(walker.read_nson_map()?);
390 }
391 _ => {
392 //println!(" read_ri: skipping field '{}'", name);
393 walker.skip_nson_field()?;
394 }
395 }
396 }
397 Ok(())
398 }
399}
400
401impl NsonRequest for WriteMultipleRequest {
402 fn serialize(&self, w: &mut Writer, timeout: &Duration) {
403 self.serialize_internal(w, timeout);
404 }
405}