oracle_nosql_rust_sdk/
table_request.rs

1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5//  https://oss.oracle.com/licenses/upl/
6//
7use crate::error::NoSQLErrorCode::RequestTimeout;
8use crate::error::{ia_err, NoSQLError};
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::reader::Reader;
13use crate::types::{OpCode, TableLimits, TableState};
14use crate::writer::Writer;
15use std::result::Result;
16use std::thread::sleep;
17use std::time::{Duration, Instant};
18
19/// Struct used for creating or modifying a table in the NoSQL Database.
20///
21/// This is the main method for creating, altering, and dropping tables in the
22/// NoSQL Database. It can also be used to alter table limits for Cloud operation.
23///
24/// Example:
25/// ```no_run
26/// use oracle_nosql_rust_sdk::TableRequest;
27/// use oracle_nosql_rust_sdk::types::*;
28/// # use oracle_nosql_rust_sdk::Handle;
29/// # use std::error::Error;
30/// # #[tokio::main]
31/// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
32/// # let handle = Handle::builder().build().await?;
33///     // Create an example table
34///     TableRequest::new("testusers")
35///         .statement(
36///             "create table if not exists testusers (id integer, name string,
37///             created timestamp(3), primary key(id))",
38///         )
39///         // the following line is only needed for Cloud mode
40///         .limits(&TableLimits::provisioned(1000, 1000, 10))
41///         .execute(&handle)
42///         .await?
43///         // wait up to 15 seconds for table to be created
44///         .wait_for_completion_ms(&handle, 15000, 500)
45///         .await?;
46/// # Ok(())
47/// # }
48///```
49
50#[derive(Default, Debug)]
51pub struct TableRequest {
52    pub(crate) table_name: String,
53    pub(crate) compartment_id: String,
54    pub(crate) namespace: String,
55    pub(crate) timeout: Option<Duration>,
56    pub(crate) statement: String,
57    pub(crate) limits: Option<TableLimits>,
58    pub(crate) match_etag: Option<String>,
59    // TODO: tags
60}
61
/// Struct used to get information about a table in the NoSQL Database.
///
/// The request is built with chained setter calls and can be cloned, so a
/// prepared request may be kept and executed repeatedly (for example while
/// polling for the completion of a table operation).
#[derive(Default, Debug, Clone)]
pub struct GetTableRequest {
    /// Name of the table to look up.
    pub(crate) table_name: String,
    /// Cloud Service only: compartment name or OCID used for the operation.
    pub(crate) compartment_id: String,
    /// On-premises only: namespace for the operation.
    pub(crate) namespace: String,
    /// Operation id of an in-progress table request; empty when unset.
    pub(crate) operation_id: String,
    /// Timeout for the request; `None` defers to the handle's timeout.
    pub(crate) timeout: Option<Duration>,
    // TODO: tags
}
72
73/// Struct representing the result of a [`TableRequest`] or a [`GetTableRequest`].
74#[derive(Default, Debug)]
75pub struct TableResult {
76    pub(crate) table_name: String,
77    pub(crate) compartment_id: String, // TODO: Option<>?
78    pub(crate) namespace: String,      // TODO: Option<>?
79    pub(crate) table_ocid: String,
80    pub(crate) ddl: String,
81    pub(crate) operation_id: String, // TODO: Option<>?
82    pub(crate) schema: String,
83    pub(crate) state: TableState,
84    pub(crate) limits: Option<TableLimits>,
85    pub(crate) match_etag: Option<String>,
86    // TODO: MRT fields
87}
88
89impl TableRequest {
90    /// Create a new TableRequest.
91    ///
92    /// `table_name` is required and must be non-empty.
93    pub fn new(table_name: &str) -> TableRequest {
94        TableRequest {
95            table_name: table_name.to_string(),
96            ..Default::default()
97        }
98    }
99
100    /// Specify the timeout value for the request.
101    ///
102    /// This is optional.
103    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
104    /// IllegalArgument error will be returned.
105    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
106    ///
107    /// Note this is just the timeout for the initial request. The actual operation may take significantly longer,
108    /// and its completion should be waited for by calling [`TableResult::wait_for_completion()`].
109    pub fn timeout(mut self, t: &Duration) -> Self {
110        self.timeout = Some(t.clone());
111        self
112    }
113
114    /// Cloud Service only: set the name or id of a compartment to be used for this operation.
115    ///
116    /// If the associated handle authenticated as an Instance Principal, this value must be an OCID.
117    /// In all other cases, the value may be specified as either a name (or path for nested compartments) or as an OCID.
118    ///
119    /// If no compartment is given, the default compartment id for the handle is used. If that value was
120    /// not specified, the root compartment of the tenancy will be used.
121    pub fn compartment_id(mut self, compartment_id: &str) -> Self {
122        self.compartment_id = compartment_id.to_string();
123        self
124    }
125
126    /// On-premises only: set the namespace for the operation.
127    pub fn namespace(mut self, namespace: &str) -> TableRequest {
128        self.namespace = namespace.to_string();
129        self
130    }
131
132    /// Set the DDL statement for the table operation.
133    ///
134    /// This is required, unless the operation is used solely to change the table
135    /// limits with [`TableRequest::limits()`].
136    pub fn statement(mut self, stmt: &str) -> TableRequest {
137        self.statement = stmt.to_string();
138        self
139    }
140
141    /// Cloud only: specify table limits for the table.
142    ///
143    /// This method can be used when creating a table, or later to change the
144    /// limits on an existing table.
145    pub fn limits(mut self, limits: &TableLimits) -> TableRequest {
146        self.limits = Some(limits.clone());
147        self
148    }
149
150    /// Cloud only: set a matching tag for the operation to succeed.
151    ///
152    /// This method sets an ETag in the request that must be matched for the operation
153    /// to proceed. The ETag must be non-empty and have been returned in a
154    /// previous [`TableResult`]. This is a form of optimistic concurrency
155    /// control, allowing an application to ensure that no unexpected modifications
156    /// have been made to the table.
157    pub fn match_etag(mut self, match_etag: &str) -> TableRequest {
158        self.match_etag = Some(match_etag.to_string());
159        self
160    }
161
162    /// Execute the table request.
163    ///
164    /// This starts the asynchronous execution of the request in the system. The returned result should be
165    /// used to wait for completion by calling [`TableResult::wait_for_completion()`].
166    pub async fn execute(&self, h: &Handle) -> Result<TableResult, NoSQLError> {
167        // TODO: validate
168        let mut w: Writer = Writer::new();
169        w.write_i16(h.inner.serial_version);
170        let timeout = h.get_timeout(&self.timeout);
171        self.nson_serialize(&mut w, &timeout);
172        let mut opts = SendOptions {
173            timeout: timeout,
174            retryable: false,
175            compartment_id: self.compartment_id.clone(),
176            ..Default::default()
177        };
178        let mut r = h.send_and_receive(w, &mut opts).await?;
179        let resp = TableRequest::nson_deserialize(&mut r)?;
180        Ok(resp)
181    }
182
183    pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
184        let mut ns = NsonSerializer::start_request(w);
185        ns.start_header();
186        ns.write_header(OpCode::TableRequest, timeout, &self.table_name);
187        ns.end_header();
188
189        // payload
190        ns.start_payload();
191        ns.write_string_field(STATEMENT, &self.statement);
192        ns.write_limits(&self.limits);
193        // TODO: freeform/defined tags
194        if let Some(etag) = &self.match_etag {
195            ns.write_string_field(ETAG, etag);
196        }
197        // TODO: these are currently only in http headers. Add to NSON?
198        //ns.write_string_field(COMPARTMENT_OCID, &self.compartment_id);
199        //ns.write_string_field(NAMESPACE, &self.namespace);
200        ns.end_payload();
201
202        ns.end_request();
203    }
204
205    pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<TableResult, NoSQLError> {
206        let mut walker = MapWalker::new(r)?;
207        let mut res: TableResult = Default::default();
208        while walker.has_next() {
209            walker.next()?;
210            let name = walker.current_name();
211            match name.as_str() {
212                ERROR_CODE => {
213                    walker.handle_error_code()?;
214                }
215                COMPARTMENT_OCID => {
216                    res.compartment_id = walker.read_nson_string()?;
217                    //println!(" comp_id={:?}", res.compartment_id);
218                }
219                NAMESPACE => {
220                    res.namespace = walker.read_nson_string()?;
221                    //println!(" namespace={:?}", res.namespace);
222                }
223                TABLE_OCID => {
224                    res.table_ocid = walker.read_nson_string()?;
225                    //println!(" table_ocid={:?}", res.table_ocid);
226                }
227                TABLE_NAME => {
228                    res.table_name = walker.read_nson_string()?;
229                    //println!(" table_name={:?}", res.table_name);
230                }
231                TABLE_SCHEMA => {
232                    res.schema = walker.read_nson_string()?;
233                    //println!(" schema={:?}", res.schema);
234                }
235                TABLE_DDL => {
236                    res.ddl = walker.read_nson_string()?;
237                    //println!(" ddl={:?}", res.ddl);
238                }
239                OPERATION_ID => {
240                    res.operation_id = walker.read_nson_string()?;
241                    //println!(" operation_id={:?}", res.operation_id);
242                }
243                LIMITS => {
244                    res.limits = Some(walker.read_nson_limits()?);
245                    //println!(" limits={:?}", res.limits);
246                }
247                TABLE_STATE => {
248                    let s = walker.read_nson_i32()?;
249                    res.state = TableState::from_int(s)?;
250                    //println!(" state={:?}", res.state);
251                }
252                ETAG => {
253                    res.match_etag = Some(walker.read_nson_string()?);
254                }
255                _ => {
256                    //println!("   table_result: skipping field '{}'", name);
257                    walker.skip_nson_field()?;
258                }
259            }
260        }
261        Ok(res)
262    }
263}
264
265impl NsonRequest for TableRequest {
266    fn serialize(&self, w: &mut Writer, timeout: &Duration) {
267        self.nson_serialize(w, timeout);
268    }
269}
270
271impl GetTableRequest {
272    pub fn new(table_name: &str) -> GetTableRequest {
273        GetTableRequest {
274            table_name: table_name.to_string(),
275            ..Default::default()
276        }
277    }
278
279    /// Specify the timeout value for the request.
280    ///
281    /// This is optional.
282    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
283    /// IllegalArgument error will be returned.
284    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
285    pub fn timeout(mut self, t: &Duration) -> Self {
286        self.timeout = Some(t.clone());
287        self
288    }
289
290    /// Cloud Service only: set the name or id of a compartment to be used for this operation.
291    ///
292    /// If the associated handle authenticated as an Instance Principal, this value must be an OCID.
293    /// In all other cases, the value may be specified as either a name (or path for nested compartments) or as an OCID.
294    ///
295    /// If no compartment is given, the default compartment id for the handle is used. If that value was
296    /// not specified, the root compartment of the tenancy will be used.
297    pub fn compartment_id(mut self, compartment_id: &str) -> Self {
298        self.compartment_id = compartment_id.to_string();
299        self
300    }
301
302    pub fn operation_id(mut self, op_id: &str) -> GetTableRequest {
303        self.operation_id = op_id.to_string();
304        self
305    }
306
307    /// On-premises only: set the namespace for the operation.
308    pub fn namespace(mut self, namespace: &str) -> GetTableRequest {
309        self.namespace = namespace.to_string();
310        self
311    }
312
313    pub async fn execute(&self, h: &Handle) -> Result<TableResult, NoSQLError> {
314        // TODO: validate
315        let mut w: Writer = Writer::new();
316        w.write_i16(h.inner.serial_version);
317        let timeout = h.get_timeout(&self.timeout);
318        self.nson_serialize(&mut w, &timeout);
319        let mut opts = SendOptions {
320            timeout: timeout,
321            retryable: true,
322            compartment_id: self.compartment_id.clone(),
323            namespace: self.namespace.clone(),
324            ..Default::default()
325        };
326        let mut r = h.send_and_receive(w, &mut opts).await?;
327        let resp = TableRequest::nson_deserialize(&mut r)?;
328        Ok(resp)
329    }
330
331    pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
332        let mut ns = NsonSerializer::start_request(w);
333        ns.start_header();
334        ns.write_header(OpCode::GetTable, timeout, &self.table_name);
335        ns.end_header();
336
337        // payload
338        ns.start_payload();
339        ns.write_string_field(OPERATION_ID, &self.operation_id);
340        // TODO: these are currently only in http headers. Add to NSON?
341        //ns.write_string_field(COMPARTMENT_OCID, &self.compartment_id);
342        //ns.write_string_field(NAMESPACE, &self.namespace);
343        ns.end_payload();
344
345        ns.end_request();
346    }
347}
348
349impl NsonRequest for GetTableRequest {
350    fn serialize(&self, w: &mut Writer, timeout: &Duration) {
351        self.nson_serialize(w, timeout);
352    }
353}
354
355impl TableResult {
356    /// Wait for a TableRequest to complete.
357    ///
358    /// This method will loop, polling the system for the status of the SystemRequest
359    /// until it either succeeds, gets an error, or times out.
360    pub async fn wait_for_completion(
361        &mut self,
362        h: &Handle,
363        wait: Duration,
364        delay: Duration,
365    ) -> Result<(), NoSQLError> {
366        if self.is_terminal() {
367            return Ok(());
368        }
369        if wait < delay {
370            return ia_err!("wait duration must be greater than delay duration");
371        }
372
373        let start_time = Instant::now();
374        let mut first_loop = true;
375
376        while self.is_terminal() == false {
377            //println!("  table-request: elapsed={:?}", start_time.elapsed());
378            if start_time.elapsed() > wait {
379                return Err(NoSQLError::new(
380                    RequestTimeout,
381                    "operation not completed in expected time",
382                ));
383            }
384
385            let get_request = GetTableRequest::new(self.table_name.as_str())
386                .operation_id(self.operation_id.as_str())
387                .compartment_id(self.compartment_id.as_str())
388                .namespace(self.namespace.as_str());
389            // TODO: namespace? Java sdk doesn't add it...?
390
391            if !first_loop {
392                sleep(delay);
393            }
394
395            let res = get_request.execute(h).await?;
396
397            // TODO: copy_most method?
398            self.state = res.state;
399            self.limits = res.limits;
400            self.schema = res.schema;
401            self.ddl = res.ddl;
402            // TODO: tags, MRT data
403
404            first_loop = false;
405        }
406
407        Ok(())
408    }
409
410    /// Wait for a TableRequest to complete.
411    ///
412    /// This method will loop, polling the system for the status of the SystemRequest
413    /// until it either succeeds, gets an error, or times out.
414    ///
415    /// This is a convenience method to allow direct millisecond values instead of creating
416    /// `Duration` structs.
417    pub async fn wait_for_completion_ms(
418        &mut self,
419        h: &Handle,
420        wait_ms: u64,
421        delay_ms: u64,
422    ) -> Result<(), NoSQLError> {
423        self.wait_for_completion(
424            h,
425            Duration::from_millis(wait_ms),
426            Duration::from_millis(delay_ms),
427        )
428        .await
429    }
430
431    fn is_terminal(&self) -> bool {
432        self.state == TableState::Active || self.state == TableState::Dropped
433    }
434
435    /// Get the table name.
436    ///
437    /// This is only valid for [`GetTableRequest`].
438    pub fn table_name(&self) -> String {
439        self.table_name.clone()
440    }
441    /// Cloud only: get the compartment id of the table.
442    ///
443    /// This is only valid for [`GetTableRequest`].
444    pub fn compartment_id(&self) -> String {
445        self.compartment_id.clone()
446    }
447    /// On-premises only: get the namespace of the table.
448    ///
449    /// This is only valid for [`GetTableRequest`].
450    pub fn namespace(&self) -> String {
451        self.namespace.clone()
452    }
453    /// Cloud only: get the OCID of the table.
454    ///
455    /// This is only valid for [`GetTableRequest`].
456    pub fn table_ocid(&self) -> String {
457        self.table_ocid.clone()
458    }
459    /// Get the DDL statement that was used to create the table.
460    ///
461    /// Note this will reflect any `ALTER TABLE` operations as well.
462    pub fn ddl(&self) -> String {
463        self.ddl.clone()
464    }
465    /// Get the internal operation ID for an in-progress table request.
466    ///
467    /// This is typically not needed by applications; it is available for testing purposes only.
468    /// Internally, [`TableResult::wait_for_completion()`] uses this value when polling the system.
469    pub fn operation_id(&self) -> String {
470        self.operation_id.clone()
471    }
472    /// Get the schema of the table.
473    ///
474    /// Note this will reflect any `ALTER TABLE` operations as well.
475    pub fn schema(&self) -> String {
476        self.schema.clone()
477    }
478    /// Get the current state of the table.
479    pub fn state(&self) -> TableState {
480        self.state.clone()
481    }
482    /// Cloud only: get the table limits.
483    pub fn limits(&self) -> Option<TableLimits> {
484        if let Some(l) = &self.limits {
485            return Some(l.clone());
486        }
487        None
488    }
489    /// Cloud only: get the match ETag for the table.
490    ///
491    /// see [`TableRequest::match_etag()`] for more details.
492    pub fn match_etag(&self) -> Option<String> {
493        if let Some(etag) = &self.match_etag {
494            return Some(etag.clone());
495        }
496        None
497    }
498}