oracle_nosql_rust_sdk/table_request.rs
1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5// https://oss.oracle.com/licenses/upl/
6//
7use crate::error::NoSQLErrorCode::RequestTimeout;
8use crate::error::{ia_err, NoSQLError};
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::reader::Reader;
13use crate::types::{OpCode, TableLimits, TableState};
14use crate::writer::Writer;
15use std::result::Result;
16use std::thread::sleep;
17use std::time::{Duration, Instant};
18
19/// Struct used for creating or modifying a table in the NoSQL Database.
20///
21/// This is the main method for creating, altering, and dropping tables in the
22/// NoSQL Database. It can also be used to alter table limits for Cloud operation.
23///
24/// Example:
25/// ```no_run
26/// use oracle_nosql_rust_sdk::TableRequest;
27/// use oracle_nosql_rust_sdk::types::*;
28/// # use oracle_nosql_rust_sdk::Handle;
29/// # use std::error::Error;
30/// # #[tokio::main]
31/// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
32/// # let handle = Handle::builder().build().await?;
33/// // Create an example table
34/// TableRequest::new("testusers")
35/// .statement(
36/// "create table if not exists testusers (id integer, name string,
37/// created timestamp(3), primary key(id))",
38/// )
39/// // the following line is only needed for Cloud mode
40/// .limits(&TableLimits::provisioned(1000, 1000, 10))
41/// .execute(&handle)
42/// .await?
43/// // wait up to 15 seconds for table to be created
44/// .wait_for_completion_ms(&handle, 15000, 500)
45/// .await?;
46/// # Ok(())
47/// # }
48///```
49
50#[derive(Default, Debug)]
51pub struct TableRequest {
52 pub(crate) table_name: String,
53 pub(crate) compartment_id: String,
54 pub(crate) namespace: String,
55 pub(crate) timeout: Option<Duration>,
56 pub(crate) statement: String,
57 pub(crate) limits: Option<TableLimits>,
58 pub(crate) match_etag: Option<String>,
59 // TODO: tags
60}
61
62/// Struct used to get information about a table in the NoSQL Database.
63#[derive(Default, Debug)]
64pub struct GetTableRequest {
65 pub(crate) table_name: String,
66 pub(crate) compartment_id: String,
67 pub(crate) namespace: String,
68 pub(crate) operation_id: String,
69 pub(crate) timeout: Option<Duration>,
70 // TODO: tags
71}
72
73/// Struct representing the result of a [`TableRequest`] or a [`GetTableRequest`].
74#[derive(Default, Debug)]
75pub struct TableResult {
76 pub(crate) table_name: String,
77 pub(crate) compartment_id: String, // TODO: Option<>?
78 pub(crate) namespace: String, // TODO: Option<>?
79 pub(crate) table_ocid: String,
80 pub(crate) ddl: String,
81 pub(crate) operation_id: String, // TODO: Option<>?
82 pub(crate) schema: String,
83 pub(crate) state: TableState,
84 pub(crate) limits: Option<TableLimits>,
85 pub(crate) match_etag: Option<String>,
86 // TODO: MRT fields
87}
88
89impl TableRequest {
90 /// Create a new TableRequest.
91 ///
92 /// `table_name` is required and must be non-empty.
93 pub fn new(table_name: &str) -> TableRequest {
94 TableRequest {
95 table_name: table_name.to_string(),
96 ..Default::default()
97 }
98 }
99
100 /// Specify the timeout value for the request.
101 ///
102 /// This is optional.
103 /// If set, it must be greater than or equal to 1 millisecond, otherwise an
104 /// IllegalArgument error will be returned.
105 /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
106 ///
107 /// Note this is just the timeout for the initial request. The actual operation may take significantly longer,
108 /// and its completion should be waited for by calling [`TableResult::wait_for_completion()`].
109 pub fn timeout(mut self, t: &Duration) -> Self {
110 self.timeout = Some(t.clone());
111 self
112 }
113
114 /// Cloud Service only: set the name or id of a compartment to be used for this operation.
115 ///
116 /// The compartment may be specified as either a name (or path for nested compartments) or as an id (OCID).
117 /// A name (vs id) can only be used when authenticated using a specific user identity. It is not available if
118 /// the associated handle authenticated as an Instance Principal (which can be done when calling the service from
119 /// a compute instance in the Oracle Cloud Infrastructure: see [`HandleBuilder::cloud_auth_from_instance()`](crate::HandleBuilder::cloud_auth_from_instance()).)
120 ///
121 /// If no compartment is given, the root compartment of the tenancy will be used.
122 pub fn compartment_id(mut self, compartment_id: &str) -> Self {
123 self.compartment_id = compartment_id.to_string();
124 self
125 }
126
127 /// On-premises only: set the namespace for the operation.
128 pub fn namespace(mut self, namespace: &str) -> TableRequest {
129 self.namespace = namespace.to_string();
130 self
131 }
132
133 /// Set the DDL statement for the table operation.
134 ///
135 /// This is required, unless the operation is used solely to change the table
136 /// limits with [`TableRequest::limits()`].
137 pub fn statement(mut self, stmt: &str) -> TableRequest {
138 self.statement = stmt.to_string();
139 self
140 }
141
142 /// Cloud only: specify table limits for the table.
143 ///
144 /// This method can be used when creating a table, or later to change the
145 /// limits on an existing table.
146 pub fn limits(mut self, limits: &TableLimits) -> TableRequest {
147 self.limits = Some(limits.clone());
148 self
149 }
150
151 /// Cloud only: set a matching tag for the operation to succeed.
152 ///
153 /// This method sets an ETag in the request that must be matched for the operation
154 /// to proceed. The ETag must be non-empty and have been returned in a
155 /// previous [`TableResult`]. This is a form of optimistic concurrency
156 /// control, allowing an application to ensure that no unexpected modifications
157 /// have been made to the table.
158 pub fn match_etag(mut self, match_etag: &str) -> TableRequest {
159 self.match_etag = Some(match_etag.to_string());
160 self
161 }
162
163 /// Execute the table request.
164 ///
165 /// This starts the asynchronous execution of the request in the system. The returned result should be
166 /// used to wait for completion by calling [`TableResult::wait_for_completion()`].
167 pub async fn execute(&self, h: &Handle) -> Result<TableResult, NoSQLError> {
168 // TODO: validate
169 let mut w: Writer = Writer::new();
170 w.write_i16(h.inner.serial_version);
171 let timeout = h.get_timeout(&self.timeout);
172 self.nson_serialize(&mut w, &timeout);
173 let mut opts = SendOptions {
174 timeout: timeout,
175 retryable: false,
176 compartment_id: self.compartment_id.clone(),
177 ..Default::default()
178 };
179 let mut r = h.send_and_receive(w, &mut opts).await?;
180 let resp = TableRequest::nson_deserialize(&mut r)?;
181 Ok(resp)
182 }
183
184 pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
185 let mut ns = NsonSerializer::start_request(w);
186 ns.start_header();
187 ns.write_header(OpCode::TableRequest, timeout, &self.table_name);
188 ns.end_header();
189
190 // payload
191 ns.start_payload();
192 ns.write_string_field(STATEMENT, &self.statement);
193 ns.write_limits(&self.limits);
194 // TODO: freeform/defined tags
195 if let Some(etag) = &self.match_etag {
196 ns.write_string_field(ETAG, etag);
197 }
198 // TODO: these are currently only in http headers. Add to NSON?
199 //ns.write_string_field(COMPARTMENT_OCID, &self.compartment_id);
200 //ns.write_string_field(NAMESPACE, &self.namespace);
201 ns.end_payload();
202
203 ns.end_request();
204 }
205
206 pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<TableResult, NoSQLError> {
207 let mut walker = MapWalker::new(r)?;
208 let mut res: TableResult = Default::default();
209 while walker.has_next() {
210 walker.next()?;
211 let name = walker.current_name();
212 match name.as_str() {
213 ERROR_CODE => {
214 walker.handle_error_code()?;
215 }
216 COMPARTMENT_OCID => {
217 res.compartment_id = walker.read_nson_string()?;
218 //println!(" comp_id={:?}", res.compartment_id);
219 }
220 NAMESPACE => {
221 res.namespace = walker.read_nson_string()?;
222 //println!(" namespace={:?}", res.namespace);
223 }
224 TABLE_OCID => {
225 res.table_ocid = walker.read_nson_string()?;
226 //println!(" table_ocid={:?}", res.table_ocid);
227 }
228 TABLE_NAME => {
229 res.table_name = walker.read_nson_string()?;
230 //println!(" table_name={:?}", res.table_name);
231 }
232 TABLE_SCHEMA => {
233 res.schema = walker.read_nson_string()?;
234 //println!(" schema={:?}", res.schema);
235 }
236 TABLE_DDL => {
237 res.ddl = walker.read_nson_string()?;
238 //println!(" ddl={:?}", res.ddl);
239 }
240 OPERATION_ID => {
241 res.operation_id = walker.read_nson_string()?;
242 //println!(" operation_id={:?}", res.operation_id);
243 }
244 LIMITS => {
245 res.limits = Some(walker.read_nson_limits()?);
246 //println!(" limits={:?}", res.limits);
247 }
248 TABLE_STATE => {
249 let s = walker.read_nson_i32()?;
250 res.state = TableState::from_int(s)?;
251 //println!(" state={:?}", res.state);
252 }
253 ETAG => {
254 res.match_etag = Some(walker.read_nson_string()?);
255 }
256 _ => {
257 //println!(" table_result: skipping field '{}'", name);
258 walker.skip_nson_field()?;
259 }
260 }
261 }
262 Ok(res)
263 }
264}
265
266impl NsonRequest for TableRequest {
267 fn serialize(&self, w: &mut Writer, timeout: &Duration) {
268 self.nson_serialize(w, timeout);
269 }
270}
271
272impl GetTableRequest {
273 pub fn new(table_name: &str) -> GetTableRequest {
274 GetTableRequest {
275 table_name: table_name.to_string(),
276 ..Default::default()
277 }
278 }
279
280 /// Specify the timeout value for the request.
281 ///
282 /// This is optional.
283 /// If set, it must be greater than or equal to 1 millisecond, otherwise an
284 /// IllegalArgument error will be returned.
285 /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
286 pub fn timeout(mut self, t: &Duration) -> Self {
287 self.timeout = Some(t.clone());
288 self
289 }
290
291 /// Cloud Service only: set the name or id of a compartment to be used for this operation.
292 ///
293 /// The compartment may be specified as either a name (or path for nested compartments) or as an id (OCID).
294 /// A name (vs id) can only be used when authenticated using a specific user identity. It is not available if
295 /// the associated handle authenticated as an Instance Principal (which can be done when calling the service from
296 /// a compute instance in the Oracle Cloud Infrastructure: see [`HandleBuilder::cloud_auth_from_instance()`](crate::HandleBuilder::cloud_auth_from_instance()).)
297 ///
298 /// If no compartment is given, the root compartment of the tenancy will be used.
299 pub fn compartment_id(mut self, compartment_id: &str) -> Self {
300 self.compartment_id = compartment_id.to_string();
301 self
302 }
303
304 pub fn operation_id(mut self, op_id: &str) -> GetTableRequest {
305 self.operation_id = op_id.to_string();
306 self
307 }
308
309 /// On-premises only: set the namespace for the operation.
310 pub fn namespace(mut self, namespace: &str) -> GetTableRequest {
311 self.namespace = namespace.to_string();
312 self
313 }
314
315 pub async fn execute(&self, h: &Handle) -> Result<TableResult, NoSQLError> {
316 // TODO: validate
317 let mut w: Writer = Writer::new();
318 w.write_i16(h.inner.serial_version);
319 let timeout = h.get_timeout(&self.timeout);
320 self.nson_serialize(&mut w, &timeout);
321 let mut opts = SendOptions {
322 timeout: timeout,
323 retryable: true,
324 compartment_id: self.compartment_id.clone(),
325 namespace: self.namespace.clone(),
326 ..Default::default()
327 };
328 let mut r = h.send_and_receive(w, &mut opts).await?;
329 let resp = TableRequest::nson_deserialize(&mut r)?;
330 Ok(resp)
331 }
332
333 pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
334 let mut ns = NsonSerializer::start_request(w);
335 ns.start_header();
336 ns.write_header(OpCode::GetTable, timeout, &self.table_name);
337 ns.end_header();
338
339 // payload
340 ns.start_payload();
341 ns.write_string_field(OPERATION_ID, &self.operation_id);
342 // TODO: these are currently only in http headers. Add to NSON?
343 //ns.write_string_field(COMPARTMENT_OCID, &self.compartment_id);
344 //ns.write_string_field(NAMESPACE, &self.namespace);
345 ns.end_payload();
346
347 ns.end_request();
348 }
349}
350
351impl NsonRequest for GetTableRequest {
352 fn serialize(&self, w: &mut Writer, timeout: &Duration) {
353 self.nson_serialize(w, timeout);
354 }
355}
356
357impl TableResult {
358 /// Wait for a TableRequest to complete.
359 ///
360 /// This method will loop, polling the system for the status of the SystemRequest
361 /// until it either succeeds, gets an error, or times out.
362 pub async fn wait_for_completion(
363 &mut self,
364 h: &Handle,
365 wait: Duration,
366 delay: Duration,
367 ) -> Result<(), NoSQLError> {
368 if self.is_terminal() {
369 return Ok(());
370 }
371 if wait < delay {
372 return ia_err!("wait duration must be greater than delay duration");
373 }
374
375 let start_time = Instant::now();
376 let mut first_loop = true;
377
378 while self.is_terminal() == false {
379 //println!(" table-request: elapsed={:?}", start_time.elapsed());
380 if start_time.elapsed() > wait {
381 return Err(NoSQLError::new(
382 RequestTimeout,
383 "operation not completed in expected time",
384 ));
385 }
386
387 let get_request = GetTableRequest::new(self.table_name.as_str())
388 .operation_id(self.operation_id.as_str())
389 .compartment_id(self.compartment_id.as_str())
390 .namespace(self.namespace.as_str());
391 // TODO: namespace? Java sdk doesn't add it...?
392
393 if !first_loop {
394 sleep(delay);
395 }
396
397 let res = get_request.execute(h).await?;
398
399 // TODO: copy_most method?
400 self.state = res.state;
401 self.limits = res.limits;
402 self.schema = res.schema;
403 self.ddl = res.ddl;
404 // TODO: tags, MRT data
405
406 first_loop = false;
407 }
408
409 Ok(())
410 }
411
412 /// Wait for a TableRequest to complete.
413 ///
414 /// This method will loop, polling the system for the status of the SystemRequest
415 /// until it either succeeds, gets an error, or times out.
416 ///
417 /// This is a convenience method to allow direct millisecond values instead of creating
418 /// `Duration` structs.
419 pub async fn wait_for_completion_ms(
420 &mut self,
421 h: &Handle,
422 wait_ms: u64,
423 delay_ms: u64,
424 ) -> Result<(), NoSQLError> {
425 self.wait_for_completion(
426 h,
427 Duration::from_millis(wait_ms),
428 Duration::from_millis(delay_ms),
429 )
430 .await
431 }
432
433 fn is_terminal(&self) -> bool {
434 self.state == TableState::Active || self.state == TableState::Dropped
435 }
436
437 /// Get the table name.
438 ///
439 /// This is only valid for [`GetTableRequest`].
440 pub fn table_name(&self) -> String {
441 self.table_name.clone()
442 }
443 /// Cloud only: get the compartment id of the table.
444 ///
445 /// This is only valid for [`GetTableRequest`].
446 pub fn compartment_id(&self) -> String {
447 self.compartment_id.clone()
448 }
449 /// On-premises only: get the namespace of the table.
450 ///
451 /// This is only valid for [`GetTableRequest`].
452 pub fn namespace(&self) -> String {
453 self.namespace.clone()
454 }
455 /// Cloud only: get the OCID of the table.
456 ///
457 /// This is only valid for [`GetTableRequest`].
458 pub fn table_ocid(&self) -> String {
459 self.table_ocid.clone()
460 }
461 /// Get the DDL statement that was used to create the table.
462 ///
463 /// Note this will reflect any `ALTER TABLE` operations as well.
464 pub fn ddl(&self) -> String {
465 self.ddl.clone()
466 }
467 /// Get the internal operation ID for an in-progress table request.
468 ///
469 /// This is typically not needed by applications; it is available for testing purposes only.
470 /// Internally, [`TableResult::wait_for_completion()`] uses this value when polling the system.
471 pub fn operation_id(&self) -> String {
472 self.operation_id.clone()
473 }
474 /// Get the schema of the table.
475 ///
476 /// Note this will reflect any `ALTER TABLE` operations as well.
477 pub fn schema(&self) -> String {
478 self.schema.clone()
479 }
480 /// Get the current state of the table.
481 pub fn state(&self) -> TableState {
482 self.state.clone()
483 }
484 /// Cloud only: get the table limits.
485 pub fn limits(&self) -> Option<TableLimits> {
486 if let Some(l) = &self.limits {
487 return Some(l.clone());
488 }
489 None
490 }
491 /// Cloud only: get the match ETag for the table.
492 ///
493 /// see [`TableRequest::match_etag()`] for more details.
494 pub fn match_etag(&self) -> Option<String> {
495 if let Some(etag) = &self.match_etag {
496 return Some(etag.clone());
497 }
498 None
499 }
500}