oracle_nosql_rust_sdk/
table_request.rs

1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5//  https://oss.oracle.com/licenses/upl/
6//
7use crate::error::NoSQLErrorCode::RequestTimeout;
8use crate::error::{ia_err, NoSQLError};
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::reader::Reader;
13use crate::types::{OpCode, TableLimits, TableState};
14use crate::writer::Writer;
15use std::result::Result;
16use std::thread::sleep;
17use std::time::{Duration, Instant};
18
19/// Struct used for creating or modifying a table in the NoSQL Database.
20///
21/// This is the main method for creating, altering, and dropping tables in the
22/// NoSQL Database. It can also be used to alter table limits for Cloud operation.
23///
24/// Example:
25/// ```no_run
26/// use oracle_nosql_rust_sdk::TableRequest;
27/// use oracle_nosql_rust_sdk::types::*;
28/// # use oracle_nosql_rust_sdk::Handle;
29/// # use std::error::Error;
30/// # #[tokio::main]
31/// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
32/// # let handle = Handle::builder().build().await?;
33///     // Create an example table
34///     TableRequest::new("testusers")
35///         .statement(
36///             "create table if not exists testusers (id integer, name string,
37///             created timestamp(3), primary key(id))",
38///         )
39///         // the following line is only needed for Cloud mode
40///         .limits(&TableLimits::provisioned(1000, 1000, 10))
41///         .execute(&handle)
42///         .await?
43///         // wait up to 15 seconds for table to be created
44///         .wait_for_completion_ms(&handle, 15000, 500)
45///         .await?;
46/// # Ok(())
47/// # }
48///```
49
50#[derive(Default, Debug)]
51pub struct TableRequest {
52    pub(crate) table_name: String,
53    pub(crate) compartment_id: String,
54    pub(crate) namespace: String,
55    pub(crate) timeout: Option<Duration>,
56    pub(crate) statement: String,
57    pub(crate) limits: Option<TableLimits>,
58    pub(crate) match_etag: Option<String>,
59    // TODO: tags
60}
61
/// Request used to retrieve information about a table in the NoSQL Database.
///
/// The response is returned as a [`TableResult`].
#[derive(Debug, Default)]
pub struct GetTableRequest {
    // Name of the table to look up.
    pub(crate) table_name: String,
    // Cloud only: compartment name or OCID.
    pub(crate) compartment_id: String,
    // On-premises only: namespace for the operation.
    pub(crate) namespace: String,
    // Id of an in-progress table operation; left empty when not set.
    pub(crate) operation_id: String,
    // Per-request timeout; `None` means the handle's default is used.
    pub(crate) timeout: Option<Duration>,
    // TODO: tags
}
72
73/// Struct representing the result of a [`TableRequest`] or a [`GetTableRequest`].
74#[derive(Default, Debug)]
75pub struct TableResult {
76    pub(crate) table_name: String,
77    pub(crate) compartment_id: String, // TODO: Option<>?
78    pub(crate) namespace: String,      // TODO: Option<>?
79    pub(crate) table_ocid: String,
80    pub(crate) ddl: String,
81    pub(crate) operation_id: String, // TODO: Option<>?
82    pub(crate) schema: String,
83    pub(crate) state: TableState,
84    pub(crate) limits: Option<TableLimits>,
85    pub(crate) match_etag: Option<String>,
86    // TODO: MRT fields
87}
88
89impl TableRequest {
90    /// Create a new TableRequest.
91    ///
92    /// `table_name` is required and must be non-empty.
93    pub fn new(table_name: &str) -> TableRequest {
94        TableRequest {
95            table_name: table_name.to_string(),
96            ..Default::default()
97        }
98    }
99
100    /// Specify the timeout value for the request.
101    ///
102    /// This is optional.
103    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
104    /// IllegalArgument error will be returned.
105    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
106    ///
107    /// Note this is just the timeout for the initial request. The actual operation may take significantly longer,
108    /// and its completion should be waited for by calling [`TableResult::wait_for_completion()`].
109    pub fn timeout(mut self, t: &Duration) -> Self {
110        self.timeout = Some(t.clone());
111        self
112    }
113
114    /// Cloud Service only: set the name or id of a compartment to be used for this operation.
115    ///
116    /// The compartment may be specified as either a name (or path for nested compartments) or as an id (OCID).
117    /// A name (vs id) can only be used when authenticated using a specific user identity. It is not available if
118    /// the associated handle authenticated as an Instance Principal (which can be done when calling the service from
119    /// a compute instance in the Oracle Cloud Infrastructure: see [`HandleBuilder::cloud_auth_from_instance()`](crate::HandleBuilder::cloud_auth_from_instance()).)
120    ///
121    /// If no compartment is given, the root compartment of the tenancy will be used.
122    pub fn compartment_id(mut self, compartment_id: &str) -> Self {
123        self.compartment_id = compartment_id.to_string();
124        self
125    }
126
127    /// On-premises only: set the namespace for the operation.
128    pub fn namespace(mut self, namespace: &str) -> TableRequest {
129        self.namespace = namespace.to_string();
130        self
131    }
132
133    /// Set the DDL statement for the table operation.
134    ///
135    /// This is required, unless the operation is used solely to change the table
136    /// limits with [`TableRequest::limits()`].
137    pub fn statement(mut self, stmt: &str) -> TableRequest {
138        self.statement = stmt.to_string();
139        self
140    }
141
142    /// Cloud only: specify table limits for the table.
143    ///
144    /// This method can be used when creating a table, or later to change the
145    /// limits on an existing table.
146    pub fn limits(mut self, limits: &TableLimits) -> TableRequest {
147        self.limits = Some(limits.clone());
148        self
149    }
150
151    /// Cloud only: set a matching tag for the operation to succeed.
152    ///
153    /// This method sets an ETag in the request that must be matched for the operation
154    /// to proceed. The ETag must be non-empty and have been returned in a
155    /// previous [`TableResult`]. This is a form of optimistic concurrency
156    /// control, allowing an application to ensure that no unexpected modifications
157    /// have been made to the table.
158    pub fn match_etag(mut self, match_etag: &str) -> TableRequest {
159        self.match_etag = Some(match_etag.to_string());
160        self
161    }
162
163    /// Execute the table request.
164    ///
165    /// This starts the asynchronous execution of the request in the system. The returned result should be
166    /// used to wait for completion by calling [`TableResult::wait_for_completion()`].
167    pub async fn execute(&self, h: &Handle) -> Result<TableResult, NoSQLError> {
168        // TODO: validate
169        let mut w: Writer = Writer::new();
170        w.write_i16(h.inner.serial_version);
171        let timeout = h.get_timeout(&self.timeout);
172        self.nson_serialize(&mut w, &timeout);
173        let mut opts = SendOptions {
174            timeout: timeout,
175            retryable: false,
176            compartment_id: self.compartment_id.clone(),
177            ..Default::default()
178        };
179        let mut r = h.send_and_receive(w, &mut opts).await?;
180        let resp = TableRequest::nson_deserialize(&mut r)?;
181        Ok(resp)
182    }
183
184    pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
185        let mut ns = NsonSerializer::start_request(w);
186        ns.start_header();
187        ns.write_header(OpCode::TableRequest, timeout, &self.table_name);
188        ns.end_header();
189
190        // payload
191        ns.start_payload();
192        ns.write_string_field(STATEMENT, &self.statement);
193        ns.write_limits(&self.limits);
194        // TODO: freeform/defined tags
195        if let Some(etag) = &self.match_etag {
196            ns.write_string_field(ETAG, etag);
197        }
198        // TODO: these are currently only in http headers. Add to NSON?
199        //ns.write_string_field(COMPARTMENT_OCID, &self.compartment_id);
200        //ns.write_string_field(NAMESPACE, &self.namespace);
201        ns.end_payload();
202
203        ns.end_request();
204    }
205
206    pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<TableResult, NoSQLError> {
207        let mut walker = MapWalker::new(r)?;
208        let mut res: TableResult = Default::default();
209        while walker.has_next() {
210            walker.next()?;
211            let name = walker.current_name();
212            match name.as_str() {
213                ERROR_CODE => {
214                    walker.handle_error_code()?;
215                }
216                COMPARTMENT_OCID => {
217                    res.compartment_id = walker.read_nson_string()?;
218                    //println!(" comp_id={:?}", res.compartment_id);
219                }
220                NAMESPACE => {
221                    res.namespace = walker.read_nson_string()?;
222                    //println!(" namespace={:?}", res.namespace);
223                }
224                TABLE_OCID => {
225                    res.table_ocid = walker.read_nson_string()?;
226                    //println!(" table_ocid={:?}", res.table_ocid);
227                }
228                TABLE_NAME => {
229                    res.table_name = walker.read_nson_string()?;
230                    //println!(" table_name={:?}", res.table_name);
231                }
232                TABLE_SCHEMA => {
233                    res.schema = walker.read_nson_string()?;
234                    //println!(" schema={:?}", res.schema);
235                }
236                TABLE_DDL => {
237                    res.ddl = walker.read_nson_string()?;
238                    //println!(" ddl={:?}", res.ddl);
239                }
240                OPERATION_ID => {
241                    res.operation_id = walker.read_nson_string()?;
242                    //println!(" operation_id={:?}", res.operation_id);
243                }
244                LIMITS => {
245                    res.limits = Some(walker.read_nson_limits()?);
246                    //println!(" limits={:?}", res.limits);
247                }
248                TABLE_STATE => {
249                    let s = walker.read_nson_i32()?;
250                    res.state = TableState::from_int(s)?;
251                    //println!(" state={:?}", res.state);
252                }
253                ETAG => {
254                    res.match_etag = Some(walker.read_nson_string()?);
255                }
256                _ => {
257                    //println!("   table_result: skipping field '{}'", name);
258                    walker.skip_nson_field()?;
259                }
260            }
261        }
262        Ok(res)
263    }
264}
265
266impl NsonRequest for TableRequest {
267    fn serialize(&self, w: &mut Writer, timeout: &Duration) {
268        self.nson_serialize(w, timeout);
269    }
270}
271
272impl GetTableRequest {
273    pub fn new(table_name: &str) -> GetTableRequest {
274        GetTableRequest {
275            table_name: table_name.to_string(),
276            ..Default::default()
277        }
278    }
279
280    /// Specify the timeout value for the request.
281    ///
282    /// This is optional.
283    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
284    /// IllegalArgument error will be returned.
285    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
286    pub fn timeout(mut self, t: &Duration) -> Self {
287        self.timeout = Some(t.clone());
288        self
289    }
290
291    /// Cloud Service only: set the name or id of a compartment to be used for this operation.
292    ///
293    /// The compartment may be specified as either a name (or path for nested compartments) or as an id (OCID).
294    /// A name (vs id) can only be used when authenticated using a specific user identity. It is not available if
295    /// the associated handle authenticated as an Instance Principal (which can be done when calling the service from
296    /// a compute instance in the Oracle Cloud Infrastructure: see [`HandleBuilder::cloud_auth_from_instance()`](crate::HandleBuilder::cloud_auth_from_instance()).)
297    ///
298    /// If no compartment is given, the root compartment of the tenancy will be used.
299    pub fn compartment_id(mut self, compartment_id: &str) -> Self {
300        self.compartment_id = compartment_id.to_string();
301        self
302    }
303
304    pub fn operation_id(mut self, op_id: &str) -> GetTableRequest {
305        self.operation_id = op_id.to_string();
306        self
307    }
308
309    /// On-premises only: set the namespace for the operation.
310    pub fn namespace(mut self, namespace: &str) -> GetTableRequest {
311        self.namespace = namespace.to_string();
312        self
313    }
314
315    pub async fn execute(&self, h: &Handle) -> Result<TableResult, NoSQLError> {
316        // TODO: validate
317        let mut w: Writer = Writer::new();
318        w.write_i16(h.inner.serial_version);
319        let timeout = h.get_timeout(&self.timeout);
320        self.nson_serialize(&mut w, &timeout);
321        let mut opts = SendOptions {
322            timeout: timeout,
323            retryable: true,
324            compartment_id: self.compartment_id.clone(),
325            namespace: self.namespace.clone(),
326            ..Default::default()
327        };
328        let mut r = h.send_and_receive(w, &mut opts).await?;
329        let resp = TableRequest::nson_deserialize(&mut r)?;
330        Ok(resp)
331    }
332
333    pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
334        let mut ns = NsonSerializer::start_request(w);
335        ns.start_header();
336        ns.write_header(OpCode::GetTable, timeout, &self.table_name);
337        ns.end_header();
338
339        // payload
340        ns.start_payload();
341        ns.write_string_field(OPERATION_ID, &self.operation_id);
342        // TODO: these are currently only in http headers. Add to NSON?
343        //ns.write_string_field(COMPARTMENT_OCID, &self.compartment_id);
344        //ns.write_string_field(NAMESPACE, &self.namespace);
345        ns.end_payload();
346
347        ns.end_request();
348    }
349}
350
351impl NsonRequest for GetTableRequest {
352    fn serialize(&self, w: &mut Writer, timeout: &Duration) {
353        self.nson_serialize(w, timeout);
354    }
355}
356
357impl TableResult {
358    /// Wait for a TableRequest to complete.
359    ///
360    /// This method will loop, polling the system for the status of the SystemRequest
361    /// until it either succeeds, gets an error, or times out.
362    pub async fn wait_for_completion(
363        &mut self,
364        h: &Handle,
365        wait: Duration,
366        delay: Duration,
367    ) -> Result<(), NoSQLError> {
368        if self.is_terminal() {
369            return Ok(());
370        }
371        if wait < delay {
372            return ia_err!("wait duration must be greater than delay duration");
373        }
374
375        let start_time = Instant::now();
376        let mut first_loop = true;
377
378        while self.is_terminal() == false {
379            //println!("  table-request: elapsed={:?}", start_time.elapsed());
380            if start_time.elapsed() > wait {
381                return Err(NoSQLError::new(
382                    RequestTimeout,
383                    "operation not completed in expected time",
384                ));
385            }
386
387            let get_request = GetTableRequest::new(self.table_name.as_str())
388                .operation_id(self.operation_id.as_str())
389                .compartment_id(self.compartment_id.as_str())
390                .namespace(self.namespace.as_str());
391            // TODO: namespace? Java sdk doesn't add it...?
392
393            if !first_loop {
394                sleep(delay);
395            }
396
397            let res = get_request.execute(h).await?;
398
399            // TODO: copy_most method?
400            self.state = res.state;
401            self.limits = res.limits;
402            self.schema = res.schema;
403            self.ddl = res.ddl;
404            // TODO: tags, MRT data
405
406            first_loop = false;
407        }
408
409        Ok(())
410    }
411
412    /// Wait for a TableRequest to complete.
413    ///
414    /// This method will loop, polling the system for the status of the SystemRequest
415    /// until it either succeeds, gets an error, or times out.
416    ///
417    /// This is a convenience method to allow direct millisecond values instead of creating
418    /// `Duration` structs.
419    pub async fn wait_for_completion_ms(
420        &mut self,
421        h: &Handle,
422        wait_ms: u64,
423        delay_ms: u64,
424    ) -> Result<(), NoSQLError> {
425        self.wait_for_completion(
426            h,
427            Duration::from_millis(wait_ms),
428            Duration::from_millis(delay_ms),
429        )
430        .await
431    }
432
433    fn is_terminal(&self) -> bool {
434        self.state == TableState::Active || self.state == TableState::Dropped
435    }
436
437    /// Get the table name.
438    ///
439    /// This is only valid for [`GetTableRequest`].
440    pub fn table_name(&self) -> String {
441        self.table_name.clone()
442    }
443    /// Cloud only: get the compartment id of the table.
444    ///
445    /// This is only valid for [`GetTableRequest`].
446    pub fn compartment_id(&self) -> String {
447        self.compartment_id.clone()
448    }
449    /// On-premises only: get the namespace of the table.
450    ///
451    /// This is only valid for [`GetTableRequest`].
452    pub fn namespace(&self) -> String {
453        self.namespace.clone()
454    }
455    /// Cloud only: get the OCID of the table.
456    ///
457    /// This is only valid for [`GetTableRequest`].
458    pub fn table_ocid(&self) -> String {
459        self.table_ocid.clone()
460    }
461    /// Get the DDL statement that was used to create the table.
462    ///
463    /// Note this will reflect any `ALTER TABLE` operations as well.
464    pub fn ddl(&self) -> String {
465        self.ddl.clone()
466    }
467    /// Get the internal operation ID for an in-progress table request.
468    ///
469    /// This is typically not needed by applications; it is available for testing purposes only.
470    /// Internally, [`TableResult::wait_for_completion()`] uses this value when polling the system.
471    pub fn operation_id(&self) -> String {
472        self.operation_id.clone()
473    }
474    /// Get the schema of the table.
475    ///
476    /// Note this will reflect any `ALTER TABLE` operations as well.
477    pub fn schema(&self) -> String {
478        self.schema.clone()
479    }
480    /// Get the current state of the table.
481    pub fn state(&self) -> TableState {
482        self.state.clone()
483    }
484    /// Cloud only: get the table limits.
485    pub fn limits(&self) -> Option<TableLimits> {
486        if let Some(l) = &self.limits {
487            return Some(l.clone());
488        }
489        None
490    }
491    /// Cloud only: get the match ETag for the table.
492    ///
493    /// see [`TableRequest::match_etag()`] for more details.
494    pub fn match_etag(&self) -> Option<String> {
495        if let Some(etag) = &self.match_etag {
496            return Some(etag.clone());
497        }
498        None
499    }
500}