oracle_nosql_rust_sdk/
write_multiple_request.rs

1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5//  https://oss.oracle.com/licenses/upl/
6//
7use crate::delete_request::DeleteRequest;
8use crate::error::NoSQLError;
9use crate::error::NoSQLErrorCode::IllegalArgument;
10use crate::handle::Handle;
11use crate::handle::SendOptions;
12use crate::nson::NsonSubRequest;
13use crate::nson::*;
14use crate::put_request::PutRequest;
15use crate::reader::Reader;
16use crate::types::{Capacity, FieldType, FieldValue, MapValue, NoSQLRow, OpCode};
17use crate::writer::Writer;
18use crate::Version;
19use std::result::Result;
20use std::time::Duration;
21
22// For doc only
23#[allow(unused_imports)]
24use crate::{DeleteResult, PutResult};
25
/// Struct used to perform multiple [`PutRequest`]s and/or [`DeleteRequest`]s in a single operation.
#[derive(Default, Debug)]
pub struct WriteMultipleRequest {
    /// Table name written into the request header when the request is serialized.
    pub(crate) table_name: String,
    /// Compartment name or OCID copied into the send options on execute; empty unless set.
    pub(crate) compartment_id: String,
    // TODO: pub(crate) namespace: String,
    /// Optional per-request timeout; resolved through `Handle::get_timeout()` on execute.
    pub(crate) timeout: Option<Duration>,
    /// Put/delete sub-operations, in the order they were added.
    pub(crate) sub_requests: Vec<Box<dyn NsonSubRequest>>,
    // TODO durability: Option<Vec<u8>>

    // TODO: limiters, retry stats, etc
}
38
/// Struct representing the result of a single sub-operation of a [`WriteMultipleRequest`].
#[derive(Default, Debug)]
pub struct SubOperationResult {
    /// Value of the `SUCCESS` field of the sub-operation result map.
    pub(crate) success: bool,
    /// Value of the `ROW_VERSION` field, when the response contained one.
    pub(crate) version: Option<Version>,
    /// Per-operation consumed capacity.
    // NOTE(review): nothing in this file assigns this field while decoding a
    // response, so it appears to stay `None` — confirm whether that is intended.
    pub(crate) consumed: Option<Capacity>,
    /// Value of the `GENERATED` field, when the response contained one.
    pub(crate) generated_value: Option<FieldValue>,
    /// `EXISTING_MOD_TIME` from the return-info map; 0 when absent.
    pub(crate) existing_modification_time: i64,
    /// `EXISTING_VALUE` from the return-info map, when present.
    pub(crate) existing_value: Option<MapValue>,
    /// `EXISTING_VERSION` from the return-info map, when present.
    pub(crate) existing_version: Option<Version>,
    // TODO: stats, etc... (base)
}
51
52impl SubOperationResult {
53    /// Get the success result of the sub-operation.
54    pub fn success(&self) -> bool {
55        self.success
56    }
57    /// For `Put` operations,
58    /// Get the Version of the now-current record. This value is `Some` if the put operation succeeded. It
59    /// may be used in subsequent [`PutRequest::if_version()`] calls.
60    /// This is valid for `Put` operations only.
61    pub fn version(&self) -> Option<&Version> {
62        if let Some(v) = &self.version {
63            return Some(v);
64        }
65        None
66    }
67    /// Get the consumed capacity (read/write units) of the operation. This is only valid in the NoSQL Cloud Service.
68    pub fn consumed(&self) -> Option<&Capacity> {
69        if let Some(c) = &self.consumed {
70            return Some(c);
71        }
72        None
73    }
74    /// for `Put` operations, get the value generated if the operation created a new value. This can happen if the table contains an
75    /// identity column or string column declared as a generated UUID. If the table has no such column, this value is `None`.
76    /// This is valid for `Put` operations only.
77    pub fn generated_value(&self) -> Option<&FieldValue> {
78        if let Some(r) = &self.generated_value {
79            return Some(r);
80        }
81        None
82    }
83
84    /// For `Put` operations, see [`PutResult::existing_modification_time()`]. For `Delete` operations,
85    /// see [`DeleteResult::existing_modification_time()`].
86    // TODO: make this a Time field
87    pub fn existing_modification_time(&self) -> i64 {
88        self.existing_modification_time
89    }
90    /// For `Put` operations, see [`PutResult::existing_value()`]. For `Delete` operations,
91    /// see [`DeleteResult::existing_value()`].
92    pub fn existing_value(&self) -> Option<&MapValue> {
93        if let Some(v) = &self.existing_value {
94            return Some(v);
95        }
96        None
97    }
98    /// For `Put` operations, see [`PutResult::existing_version()`]. For `Delete` operations,
99    /// see [`DeleteResult::existing_version()`].
100    pub fn existing_version(&self) -> Option<&Version> {
101        if let Some(v) = &self.existing_version {
102            return Some(v);
103        }
104        None
105    }
106}
107
/// Struct representing the combined results of a [`WriteMultipleRequest`] operation.
#[derive(Default, Debug)]
pub struct WriteMultipleResult {
    /// Decoded sub-operation results.
    pub(crate) results: Vec<SubOperationResult>,
    /// Index of the first failed operation; `nson_deserialize()` initializes it to -1.
    // NOTE: the derived `Default` yields 0 here, not -1.
    pub(crate) failed_operation_index: i32,
    /// Consumed capacity for the whole operation, when the response carried it.
    pub(crate) consumed: Option<Capacity>,
}
115
116impl WriteMultipleResult {
117    /// Get a vector of sub-operation results. This vector is ordered in the same order as
118    /// put/delete items were added to the `WriteMultipleRequest`.
119    pub fn results(&self) -> &Vec<SubOperationResult> {
120        &self.results
121    }
122    /// Get the offset of the first failed operation.
123    /// If there are no failures, -1 is returned.
124    pub fn failed_operation_index(&self) -> i32 {
125        self.failed_operation_index
126    }
127    /// Get the consumed capacity (read/write units) of the overall operation. This is only valid in the NoSQL Cloud Service.
128    pub fn consumed(&self) -> Option<&Capacity> {
129        if let Some(c) = &self.consumed {
130            return Some(c);
131        }
132        None
133    }
134}
135
136impl WriteMultipleRequest {
137    pub fn new(table_name: &str) -> WriteMultipleRequest {
138        WriteMultipleRequest {
139            table_name: table_name.to_string(),
140            ..Default::default()
141        }
142    }
143
144    /// Specify the timeout value for the request.
145    ///
146    /// This is optional.
147    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
148    /// IllegalArgument error will be returned.
149    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
150    pub fn timeout(mut self, t: &Duration) -> Self {
151        self.timeout = Some(t.clone());
152        self
153    }
154
155    /// Cloud Service only: set the name or id of a compartment to be used for this operation.
156    ///
157    /// If the associated handle authenticated as an Instance Principal, this value must be an OCID.
158    /// In all other cases, the value may be specified as either a name (or path for nested compartments) or as an OCID.
159    ///
160    /// If no compartment is given, the default compartment id for the handle is used. If that value was
161    /// not specified, the root compartment of the tenancy will be used.
162    pub fn compartment_id(mut self, compartment_id: &str) -> Self {
163        self.compartment_id = compartment_id.to_string();
164        self
165    }
166
167    pub fn add(mut self, r: Box<dyn NsonSubRequest>) -> WriteMultipleRequest {
168        self.sub_requests.push(r);
169        self
170    }
171
    // Example usage:
    //
    // let mut data: Vec<MyStruct> = Vec::new();
    // ... fill in vector with many MyStructs ...
    //
    // let res: WriteMultipleResult = WriteMultipleRequest::new(tablename)
    //   .put(data)?
    //   .execute(&handle)
    //   .await?;
178
179    // Note this consumes the collection
180    pub fn put<T>(mut self, collection: T) -> Result<WriteMultipleRequest, NoSQLError>
181    where
182        T: IntoIterator,
183        T::Item: NoSQLRow,
184    {
185        for item in collection {
186            // note: this implies collection.into_iter()
187            self.sub_requests
188                .push(Box::new(PutRequest::new("").put(item)?));
189        }
190        Ok(self)
191    }
192
193    // Note this consumes the collection
194    pub fn delete<T>(mut self, collection: T) -> Result<WriteMultipleRequest, NoSQLError>
195    where
196        T: IntoIterator,
197        T::Item: NoSQLRow,
198    {
199        for item in collection {
200            // note: this implies collection.into_iter()
201            match item.to_map_value() {
202                Ok(value) => {
203                    self.sub_requests
204                        .push(Box::new(DeleteRequest::new("", value)));
205                }
206                Err(e) => {
207                    // TODO: save error as source
208                    return Err(NoSQLError::new(
209                        IllegalArgument,
210                        &format!("could not convert struct to MapValue: {}", e.to_string()),
211                    ));
212                }
213            }
214        }
215        Ok(self)
216    }
217
218    pub async fn execute(&self, h: &Handle) -> Result<WriteMultipleResult, NoSQLError> {
219        // TODO: validate: size > 0, etc
220        let mut w: Writer = Writer::new();
221        w.write_i16(h.inner.serial_version);
222        let timeout = h.get_timeout(&self.timeout);
223        self.serialize_internal(&mut w, &timeout);
224        let mut opts = SendOptions {
225            timeout: timeout,
226            retryable: false,
227            compartment_id: self.compartment_id.clone(),
228            ..Default::default()
229        };
230        let mut r = h.send_and_receive(w, &mut opts).await?;
231        let resp = WriteMultipleRequest::nson_deserialize(&mut r)?;
232        Ok(resp)
233    }
234
235    fn serialize_internal(&self, w: &mut Writer, timeout: &Duration) {
236        let mut ns = NsonSerializer::start_request(w);
237        ns.start_header();
238
239        // TableName
240        // If all ops use the same table name, write that
241        // single table name to the output stream.
242        // If any of them are different, write all table
243        // names to the individual ops.
244        // Possible optimization: if most use one table,
245        // write that in the header and only write minority
246        // table names in specific ops.
247        // TODO if self.is_single_table() {
248        ns.write_header(OpCode::WriteMultiple, timeout, &self.table_name);
249        //} else {
250        //ns.write_header(OpCode::WriteMultiple, self.timeout_ms, "");
251        //}
252        ns.end_header();
253
254        // TODO: compartment
255        ns.start_payload();
256        ns.write_i32_field(DURABILITY, 0); // TODO
257        ns.write_i32_field(NUM_OPERATIONS, self.sub_requests.len() as i32);
258
259        // OPERATIONS: array of maps
260        ns.start_array(OPERATIONS);
261        for rq in self.sub_requests.as_slice() {
262            ns.write_subrequest(rq, timeout);
263            ns.end_array_field(0);
264        }
265        ns.end_array(OPERATIONS);
266
267        ns.end_payload();
268        ns.end_request();
269    }
270
271    pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<WriteMultipleResult, NoSQLError> {
272        let mut walker = MapWalker::new(r)?;
273        let mut res: WriteMultipleResult = Default::default();
274        res.failed_operation_index = -1;
275        while walker.has_next() {
276            walker.next()?;
277            let name = walker.current_name();
278            match name.as_str() {
279                ERROR_CODE => {
280                    walker.handle_error_code()?;
281                }
282                CONSUMED => {
283                    res.consumed = Some(walker.read_nson_consumed_capacity()?);
284                    //println!(" consumed={:?}", res.consumed);
285                }
286                WM_SUCCESS => {
287                    // array of operation result maps
288                    MapWalker::expect_type(walker.r, FieldType::Array)?;
289                    let _ = walker.r.read_i32()?; // skip array size in bytes
290                    let num_elements = walker.r.read_i32()?;
291                    res.results = Vec::with_capacity(num_elements as usize);
292                    for _n in 1..=num_elements {
293                        res.results
294                            .push(WriteMultipleRequest::read_result(walker.r)?);
295                    }
296                }
297                WM_FAILURE => {
298                    WriteMultipleRequest::read_failed_results(walker.r, &mut res)?;
299                }
300                _ => {
301                    //println!("   write_multiple_result: skipping field '{}'", name);
302                    walker.skip_nson_field()?;
303                }
304            }
305        }
306        Ok(res)
307    }
308
309    fn read_failed_results(
310        r: &mut Reader,
311        res: &mut WriteMultipleResult,
312    ) -> Result<(), NoSQLError> {
313        let mut walker = MapWalker::new(r)?;
314        while walker.has_next() {
315            walker.next()?;
316            let name = walker.current_name();
317            match name.as_str() {
318                WM_FAIL_INDEX => {
319                    res.failed_operation_index = walker.read_nson_i32()?;
320                }
321                WM_FAIL_RESULT => {
322                    res.results
323                        .push(WriteMultipleRequest::read_result(walker.r)?);
324                }
325                _ => {
326                    //println!("   read_failed_results: skipping field '{}'", name);
327                    walker.skip_nson_field()?;
328                }
329            }
330        }
331        Ok(())
332    }
333
334    // TODO: make this common to all write result operations
335    fn read_result(r: &mut Reader) -> Result<SubOperationResult, NoSQLError> {
336        let mut walker = MapWalker::new(r)?;
337        let mut res: SubOperationResult = Default::default();
338        while walker.has_next() {
339            walker.next()?;
340            let name = walker.current_name();
341            match name.as_str() {
342                SUCCESS => {
343                    //println!("   read_result: SUCCESS");
344                    res.success = walker.read_nson_boolean()?;
345                }
346                ROW_VERSION => {
347                    //println!("   read_result: ROW_VERSION");
348                    res.version = Some(walker.read_nson_binary()?);
349                }
350                GENERATED => {
351                    //println!("   read_result: GENERATED");
352                    res.generated_value = Some(walker.read_nson_field_value()?);
353                    //println!("generated_value={:?}", res.generated_value);
354                }
355                RETURN_INFO => {
356                    //println!("   read_result: RETURN_INFO");
357                    WriteMultipleRequest::read_return_info(walker.r, &mut res)?;
358                }
359                _ => {
360                    //println!("   read_result: skipping field '{}'", name);
361                    walker.skip_nson_field()?;
362                }
363            }
364        }
365        Ok(res)
366    }
367
368    fn read_return_info(r: &mut Reader, res: &mut SubOperationResult) -> Result<(), NoSQLError> {
369        let mut walker = MapWalker::new(r)?;
370        while walker.has_next() {
371            walker.next()?;
372            let name = walker.current_name();
373            match name.as_str() {
374                EXISTING_MOD_TIME => {
375                    //println!("   read_ri: EXISTING_MOD_TIME");
376                    res.existing_modification_time = walker.read_nson_i64()?;
377                }
378                //EXISTING_EXPIRATION => {
379                //println!("   read_ri: EXISTING_EXPIRATION");
380                //res.existing_expiration_time = walker.read_nson_i64()?;
381                //},
382                EXISTING_VERSION => {
383                    //println!("   read_ri: EXISTING_VERSION");
384                    res.existing_version = Some(walker.read_nson_binary()?);
385                }
386                EXISTING_VALUE => {
387                    //println!("   read_ri: EXISTING_VALUE");
388                    res.existing_value = Some(walker.read_nson_map()?);
389                }
390                _ => {
391                    //println!("   read_ri: skipping field '{}'", name);
392                    walker.skip_nson_field()?;
393                }
394            }
395        }
396        Ok(())
397    }
398}
399
400impl NsonRequest for WriteMultipleRequest {
401    fn serialize(&self, w: &mut Writer, timeout: &Duration) {
402        self.serialize_internal(w, timeout);
403    }
404}