oracle_nosql_rust_sdk/
write_multiple_request.rs

1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5//  https://oss.oracle.com/licenses/upl/
6//
7use crate::delete_request::DeleteRequest;
8use crate::error::NoSQLError;
9use crate::error::NoSQLErrorCode::IllegalArgument;
10use crate::handle::Handle;
11use crate::handle::SendOptions;
12use crate::nson::NsonSubRequest;
13use crate::nson::*;
14use crate::put_request::PutRequest;
15use crate::reader::Reader;
16use crate::types::{Capacity, FieldType, FieldValue, MapValue, NoSQLRow, OpCode};
17use crate::writer::Writer;
18use crate::Version;
19use std::result::Result;
20use std::time::Duration;
21
22// For doc only
23#[allow(unused_imports)]
24use crate::{DeleteResult, PutResult};
25
26/// Struct used to perform multiple [`PutRequest`]s and/or [`DeleteRequest`]s in a single operation.
27#[derive(Default, Debug)]
28pub struct WriteMultipleRequest {
29    pub(crate) table_name: String,
30    pub(crate) compartment_id: String,
31    // TODO: pub(crate) namespace: String,
32    pub(crate) timeout: Option<Duration>,
33    pub(crate) sub_requests: Vec<Box<dyn NsonSubRequest>>,
34    // TODO durability: Option<Vec<u8>>
35
36    // TODO: limiters, retry stats, etc
37}
38
39/// Struct representing the result of a single sub-operation of a [`WriteMultipleRequest`].
40#[derive(Default, Debug)]
41pub struct SubOperationResult {
42    pub(crate) success: bool,
43    pub(crate) version: Option<Version>,
44    pub(crate) consumed: Option<Capacity>,
45    pub(crate) generated_value: Option<FieldValue>,
46    pub(crate) existing_modification_time: i64,
47    pub(crate) existing_value: Option<MapValue>,
48    pub(crate) existing_version: Option<Version>,
49    // TODO: stats, etc... (base)
50}
51
52impl SubOperationResult {
53    /// Get the success result of the sub-operation.
54    pub fn success(&self) -> bool {
55        self.success
56    }
57    /// For `Put` operations,
58    /// Get the Version of the now-current record. This value is `Some` if the put operation succeeded. It
59    /// may be used in subsequent [`PutRequest::if_version()`] calls.
60    /// This is valid for `Put` operations only.
61    pub fn version(&self) -> Option<&Version> {
62        if let Some(v) = &self.version {
63            return Some(v);
64        }
65        None
66    }
67    /// Get the consumed capacity (read/write units) of the operation. This is only valid in the NoSQL Cloud Service.
68    pub fn consumed(&self) -> Option<&Capacity> {
69        if let Some(c) = &self.consumed {
70            return Some(c);
71        }
72        None
73    }
74    /// for `Put` operations, get the value generated if the operation created a new value. This can happen if the table contains an
75    /// identity column or string column declared as a generated UUID. If the table has no such column, this value is `None`.
76    /// This is valid for `Put` operations only.
77    pub fn generated_value(&self) -> Option<&FieldValue> {
78        if let Some(r) = &self.generated_value {
79            return Some(r);
80        }
81        None
82    }
83
84    /// For `Put` operations, see [`PutResult::existing_modification_time()`]. For `Delete` operations,
85    /// see [`DeleteResult::existing_modification_time()`].
86    // TODO: make this a Time field
87    pub fn existing_modification_time(&self) -> i64 {
88        self.existing_modification_time
89    }
90    /// For `Put` operations, see [`PutResult::existing_value()`]. For `Delete` operations,
91    /// see [`DeleteResult::existing_value()`].
92    pub fn existing_value(&self) -> Option<&MapValue> {
93        if let Some(v) = &self.existing_value {
94            return Some(v);
95        }
96        None
97    }
98    /// For `Put` operations, see [`PutResult::existing_version()`]. For `Delete` operations,
99    /// see [`DeleteResult::existing_version()`].
100    pub fn existing_version(&self) -> Option<&Version> {
101        if let Some(v) = &self.existing_version {
102            return Some(v);
103        }
104        None
105    }
106}
107
108/// Struct representing the combined results of a [`WriteMultipleRequest`] operation.
109#[derive(Default, Debug)]
110pub struct WriteMultipleResult {
111    pub(crate) results: Vec<SubOperationResult>,
112    pub(crate) failed_operation_index: i32,
113    pub(crate) consumed: Option<Capacity>,
114}
115
116impl WriteMultipleResult {
117    /// Get a vector of sub-operation results. This vector is ordered in the same order as
118    /// put/delete items were added to the `WriteMultipleRequest`.
119    pub fn results(&self) -> &Vec<SubOperationResult> {
120        &self.results
121    }
122    /// Get the offset of the first failed operation.
123    /// If there are no failures, -1 is returned.
124    pub fn failed_operation_index(&self) -> i32 {
125        self.failed_operation_index
126    }
127    /// Get the consumed capacity (read/write units) of the overall operation. This is only valid in the NoSQL Cloud Service.
128    pub fn consumed(&self) -> Option<&Capacity> {
129        if let Some(c) = &self.consumed {
130            return Some(c);
131        }
132        None
133    }
134}
135
136impl WriteMultipleRequest {
137    pub fn new(table_name: &str) -> WriteMultipleRequest {
138        WriteMultipleRequest {
139            table_name: table_name.to_string(),
140            ..Default::default()
141        }
142    }
143
144    /// Specify the timeout value for the request.
145    ///
146    /// This is optional.
147    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
148    /// IllegalArgument error will be returned.
149    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
150    pub fn timeout(mut self, t: &Duration) -> Self {
151        self.timeout = Some(t.clone());
152        self
153    }
154
155    /// Cloud Service only: set the name or id of a compartment to be used for this operation.
156    ///
157    /// The compartment may be specified as either a name (or path for nested compartments) or as an id (OCID).
158    /// A name (vs id) can only be used when authenticated using a specific user identity. It is not available if
159    /// the associated handle authenticated as an Instance Principal (which can be done when calling the service from
160    /// a compute instance in the Oracle Cloud Infrastructure: see [`HandleBuilder::cloud_auth_from_instance()`](crate::HandleBuilder::cloud_auth_from_instance()).)
161    ///
162    /// If no compartment is given, the root compartment of the tenancy will be used.
163    pub fn compartment_id(mut self, compartment_id: &str) -> Self {
164        self.compartment_id = compartment_id.to_string();
165        self
166    }
167
168    pub fn add(mut self, r: Box<dyn NsonSubRequest>) -> WriteMultipleRequest {
169        self.sub_requests.push(r);
170        self
171    }
172
173    // let mut data = Vec<MyStruct>::new();
174    // ... fill in vector with many MyStructs ...
175    //
176    // let res: WriteMultipleResult = WriteMultipleRequest::new(tablename)
177    //   .put(data)
178    //   .execute(&handle);
179
180    // Note this consumes the collection
181    pub fn put<T>(mut self, collection: T) -> Result<WriteMultipleRequest, NoSQLError>
182    where
183        T: IntoIterator,
184        T::Item: NoSQLRow,
185    {
186        for item in collection {
187            // note: this implies collection.into_iter()
188            self.sub_requests
189                .push(Box::new(PutRequest::new("").put(item)?));
190        }
191        Ok(self)
192    }
193
194    // Note this consumes the collection
195    pub fn delete<T>(mut self, collection: T) -> Result<WriteMultipleRequest, NoSQLError>
196    where
197        T: IntoIterator,
198        T::Item: NoSQLRow,
199    {
200        for item in collection {
201            // note: this implies collection.into_iter()
202            match item.to_map_value() {
203                Ok(value) => {
204                    self.sub_requests
205                        .push(Box::new(DeleteRequest::new("", value)));
206                }
207                Err(e) => {
208                    // TODO: save error as source
209                    return Err(NoSQLError::new(
210                        IllegalArgument,
211                        &format!("could not convert struct to MapValue: {}", e.to_string()),
212                    ));
213                }
214            }
215        }
216        Ok(self)
217    }
218
219    pub async fn execute(&self, h: &Handle) -> Result<WriteMultipleResult, NoSQLError> {
220        // TODO: validate: size > 0, etc
221        let mut w: Writer = Writer::new();
222        w.write_i16(h.inner.serial_version);
223        let timeout = h.get_timeout(&self.timeout);
224        self.serialize_internal(&mut w, &timeout);
225        let mut opts = SendOptions {
226            timeout: timeout,
227            retryable: false,
228            compartment_id: self.compartment_id.clone(),
229            ..Default::default()
230        };
231        let mut r = h.send_and_receive(w, &mut opts).await?;
232        let resp = WriteMultipleRequest::nson_deserialize(&mut r)?;
233        Ok(resp)
234    }
235
236    fn serialize_internal(&self, w: &mut Writer, timeout: &Duration) {
237        let mut ns = NsonSerializer::start_request(w);
238        ns.start_header();
239
240        // TableName
241        // If all ops use the same table name, write that
242        // single table name to the output stream.
243        // If any of them are different, write all table
244        // names to the individual ops.
245        // Possible optimization: if most use one table,
246        // write that in the header and only write minority
247        // table names in specific ops.
248        // TODO if self.is_single_table() {
249        ns.write_header(OpCode::WriteMultiple, timeout, &self.table_name);
250        //} else {
251        //ns.write_header(OpCode::WriteMultiple, self.timeout_ms, "");
252        //}
253        ns.end_header();
254
255        // TODO: compartment
256        ns.start_payload();
257        ns.write_i32_field(DURABILITY, 0); // TODO
258        ns.write_i32_field(NUM_OPERATIONS, self.sub_requests.len() as i32);
259
260        // OPERATIONS: array of maps
261        ns.start_array(OPERATIONS);
262        for rq in self.sub_requests.as_slice() {
263            ns.write_subrequest(rq, timeout);
264            ns.end_array_field(0);
265        }
266        ns.end_array(OPERATIONS);
267
268        ns.end_payload();
269        ns.end_request();
270    }
271
272    pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<WriteMultipleResult, NoSQLError> {
273        let mut walker = MapWalker::new(r)?;
274        let mut res: WriteMultipleResult = Default::default();
275        res.failed_operation_index = -1;
276        while walker.has_next() {
277            walker.next()?;
278            let name = walker.current_name();
279            match name.as_str() {
280                ERROR_CODE => {
281                    walker.handle_error_code()?;
282                }
283                CONSUMED => {
284                    res.consumed = Some(walker.read_nson_consumed_capacity()?);
285                    //println!(" consumed={:?}", res.consumed);
286                }
287                WM_SUCCESS => {
288                    // array of operation result maps
289                    MapWalker::expect_type(walker.r, FieldType::Array)?;
290                    let _ = walker.r.read_i32()?; // skip array size in bytes
291                    let num_elements = walker.r.read_i32()?;
292                    res.results = Vec::with_capacity(num_elements as usize);
293                    for _n in 1..=num_elements {
294                        res.results
295                            .push(WriteMultipleRequest::read_result(walker.r)?);
296                    }
297                }
298                WM_FAILURE => {
299                    WriteMultipleRequest::read_failed_results(walker.r, &mut res)?;
300                }
301                _ => {
302                    //println!("   write_multiple_result: skipping field '{}'", name);
303                    walker.skip_nson_field()?;
304                }
305            }
306        }
307        Ok(res)
308    }
309
310    fn read_failed_results(
311        r: &mut Reader,
312        res: &mut WriteMultipleResult,
313    ) -> Result<(), NoSQLError> {
314        let mut walker = MapWalker::new(r)?;
315        while walker.has_next() {
316            walker.next()?;
317            let name = walker.current_name();
318            match name.as_str() {
319                WM_FAIL_INDEX => {
320                    res.failed_operation_index = walker.read_nson_i32()?;
321                }
322                WM_FAIL_RESULT => {
323                    res.results
324                        .push(WriteMultipleRequest::read_result(walker.r)?);
325                }
326                _ => {
327                    //println!("   read_failed_results: skipping field '{}'", name);
328                    walker.skip_nson_field()?;
329                }
330            }
331        }
332        Ok(())
333    }
334
335    // TODO: make this common to all write result operations
336    fn read_result(r: &mut Reader) -> Result<SubOperationResult, NoSQLError> {
337        let mut walker = MapWalker::new(r)?;
338        let mut res: SubOperationResult = Default::default();
339        while walker.has_next() {
340            walker.next()?;
341            let name = walker.current_name();
342            match name.as_str() {
343                SUCCESS => {
344                    //println!("   read_result: SUCCESS");
345                    res.success = walker.read_nson_boolean()?;
346                }
347                ROW_VERSION => {
348                    //println!("   read_result: ROW_VERSION");
349                    res.version = Some(walker.read_nson_binary()?);
350                }
351                GENERATED => {
352                    //println!("   read_result: GENERATED");
353                    res.generated_value = Some(walker.read_nson_field_value()?);
354                    //println!("generated_value={:?}", res.generated_value);
355                }
356                RETURN_INFO => {
357                    //println!("   read_result: RETURN_INFO");
358                    WriteMultipleRequest::read_return_info(walker.r, &mut res)?;
359                }
360                _ => {
361                    //println!("   read_result: skipping field '{}'", name);
362                    walker.skip_nson_field()?;
363                }
364            }
365        }
366        Ok(res)
367    }
368
369    fn read_return_info(r: &mut Reader, res: &mut SubOperationResult) -> Result<(), NoSQLError> {
370        let mut walker = MapWalker::new(r)?;
371        while walker.has_next() {
372            walker.next()?;
373            let name = walker.current_name();
374            match name.as_str() {
375                EXISTING_MOD_TIME => {
376                    //println!("   read_ri: EXISTING_MOD_TIME");
377                    res.existing_modification_time = walker.read_nson_i64()?;
378                }
379                //EXISTING_EXPIRATION => {
380                //println!("   read_ri: EXISTING_EXPIRATION");
381                //res.existing_expiration_time = walker.read_nson_i64()?;
382                //},
383                EXISTING_VERSION => {
384                    //println!("   read_ri: EXISTING_VERSION");
385                    res.existing_version = Some(walker.read_nson_binary()?);
386                }
387                EXISTING_VALUE => {
388                    //println!("   read_ri: EXISTING_VALUE");
389                    res.existing_value = Some(walker.read_nson_map()?);
390                }
391                _ => {
392                    //println!("   read_ri: skipping field '{}'", name);
393                    walker.skip_nson_field()?;
394                }
395            }
396        }
397        Ok(())
398    }
399}
400
401impl NsonRequest for WriteMultipleRequest {
402    fn serialize(&self, w: &mut Writer, timeout: &Duration) {
403        self.serialize_internal(w, timeout);
404    }
405}