oracle_nosql_rust_sdk/table_request.rs
1//
2// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3//
4// Licensed under the Universal Permissive License v 1.0 as shown at
5// https://oss.oracle.com/licenses/upl/
6//
7use crate::error::NoSQLErrorCode::RequestTimeout;
8use crate::error::{ia_err, NoSQLError};
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::reader::Reader;
13use crate::types::{OpCode, TableLimits, TableState};
14use crate::writer::Writer;
15use std::result::Result;
16use std::thread::sleep;
17use std::time::{Duration, Instant};
18
19/// Struct used for creating or modifying a table in the NoSQL Database.
20///
21/// This is the main method for creating, altering, and dropping tables in the
22/// NoSQL Database. It can also be used to alter table limits for Cloud operation.
23///
24/// Example:
25/// ```no_run
26/// use oracle_nosql_rust_sdk::TableRequest;
27/// use oracle_nosql_rust_sdk::types::*;
28/// # use oracle_nosql_rust_sdk::Handle;
29/// # use std::error::Error;
30/// # #[tokio::main]
31/// # pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
32/// # let handle = Handle::builder().build().await?;
33/// // Create an example table
34/// TableRequest::new("testusers")
35/// .statement(
36/// "create table if not exists testusers (id integer, name string,
37/// created timestamp(3), primary key(id))",
38/// )
39/// // the following line is only needed for Cloud mode
40/// .limits(&TableLimits::provisioned(1000, 1000, 10))
41/// .execute(&handle)
42/// .await?
43/// // wait up to 15 seconds for table to be created
44/// .wait_for_completion_ms(&handle, 15000, 500)
45/// .await?;
46/// # Ok(())
47/// # }
48///```
49
50#[derive(Default, Debug)]
51pub struct TableRequest {
52 pub(crate) table_name: String,
53 pub(crate) compartment_id: String,
54 pub(crate) namespace: String,
55 pub(crate) timeout: Option<Duration>,
56 pub(crate) statement: String,
57 pub(crate) limits: Option<TableLimits>,
58 pub(crate) match_etag: Option<String>,
59 // TODO: tags
60}
61
62/// Struct used to get information about a table in the NoSQL Database.
63#[derive(Default, Debug)]
64pub struct GetTableRequest {
65 pub(crate) table_name: String,
66 pub(crate) compartment_id: String,
67 pub(crate) namespace: String,
68 pub(crate) operation_id: String,
69 pub(crate) timeout: Option<Duration>,
70 // TODO: tags
71}
72
73/// Struct representing the result of a [`TableRequest`] or a [`GetTableRequest`].
74#[derive(Default, Debug)]
75pub struct TableResult {
76 pub(crate) table_name: String,
77 pub(crate) compartment_id: String, // TODO: Option<>?
78 pub(crate) namespace: String, // TODO: Option<>?
79 pub(crate) table_ocid: String,
80 pub(crate) ddl: String,
81 pub(crate) operation_id: String, // TODO: Option<>?
82 pub(crate) schema: String,
83 pub(crate) state: TableState,
84 pub(crate) limits: Option<TableLimits>,
85 pub(crate) match_etag: Option<String>,
86 // TODO: MRT fields
87}
88
89impl TableRequest {
90 /// Create a new TableRequest.
91 ///
92 /// `table_name` is required and must be non-empty.
93 pub fn new(table_name: &str) -> TableRequest {
94 TableRequest {
95 table_name: table_name.to_string(),
96 ..Default::default()
97 }
98 }
99
100 /// Specify the timeout value for the request.
101 ///
102 /// This is optional.
103 /// If set, it must be greater than or equal to 1 millisecond, otherwise an
104 /// IllegalArgument error will be returned.
105 /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
106 ///
107 /// Note this is just the timeout for the initial request. The actual operation may take significantly longer,
108 /// and its completion should be waited for by calling [`TableResult::wait_for_completion()`].
109 pub fn timeout(mut self, t: &Duration) -> Self {
110 self.timeout = Some(t.clone());
111 self
112 }
113
114 /// Cloud Service only: set the name or id of a compartment to be used for this operation.
115 ///
116 /// If the associated handle authenticated as an Instance Principal, this value must be an OCID.
117 /// In all other cases, the value may be specified as either a name (or path for nested compartments) or as an OCID.
118 ///
119 /// If no compartment is given, the default compartment id for the handle is used. If that value was
120 /// not specified, the root compartment of the tenancy will be used.
121 pub fn compartment_id(mut self, compartment_id: &str) -> Self {
122 self.compartment_id = compartment_id.to_string();
123 self
124 }
125
126 /// On-premises only: set the namespace for the operation.
127 pub fn namespace(mut self, namespace: &str) -> TableRequest {
128 self.namespace = namespace.to_string();
129 self
130 }
131
132 /// Set the DDL statement for the table operation.
133 ///
134 /// This is required, unless the operation is used solely to change the table
135 /// limits with [`TableRequest::limits()`].
136 pub fn statement(mut self, stmt: &str) -> TableRequest {
137 self.statement = stmt.to_string();
138 self
139 }
140
141 /// Cloud only: specify table limits for the table.
142 ///
143 /// This method can be used when creating a table, or later to change the
144 /// limits on an existing table.
145 pub fn limits(mut self, limits: &TableLimits) -> TableRequest {
146 self.limits = Some(limits.clone());
147 self
148 }
149
150 /// Cloud only: set a matching tag for the operation to succeed.
151 ///
152 /// This method sets an ETag in the request that must be matched for the operation
153 /// to proceed. The ETag must be non-empty and have been returned in a
154 /// previous [`TableResult`]. This is a form of optimistic concurrency
155 /// control, allowing an application to ensure that no unexpected modifications
156 /// have been made to the table.
157 pub fn match_etag(mut self, match_etag: &str) -> TableRequest {
158 self.match_etag = Some(match_etag.to_string());
159 self
160 }
161
162 /// Execute the table request.
163 ///
164 /// This starts the asynchronous execution of the request in the system. The returned result should be
165 /// used to wait for completion by calling [`TableResult::wait_for_completion()`].
166 pub async fn execute(&self, h: &Handle) -> Result<TableResult, NoSQLError> {
167 // TODO: validate
168 let mut w: Writer = Writer::new();
169 w.write_i16(h.inner.serial_version);
170 let timeout = h.get_timeout(&self.timeout);
171 self.nson_serialize(&mut w, &timeout);
172 let mut opts = SendOptions {
173 timeout: timeout,
174 retryable: false,
175 compartment_id: self.compartment_id.clone(),
176 ..Default::default()
177 };
178 let mut r = h.send_and_receive(w, &mut opts).await?;
179 let resp = TableRequest::nson_deserialize(&mut r)?;
180 Ok(resp)
181 }
182
183 pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
184 let mut ns = NsonSerializer::start_request(w);
185 ns.start_header();
186 ns.write_header(OpCode::TableRequest, timeout, &self.table_name);
187 ns.end_header();
188
189 // payload
190 ns.start_payload();
191 ns.write_string_field(STATEMENT, &self.statement);
192 ns.write_limits(&self.limits);
193 // TODO: freeform/defined tags
194 if let Some(etag) = &self.match_etag {
195 ns.write_string_field(ETAG, etag);
196 }
197 // TODO: these are currently only in http headers. Add to NSON?
198 //ns.write_string_field(COMPARTMENT_OCID, &self.compartment_id);
199 //ns.write_string_field(NAMESPACE, &self.namespace);
200 ns.end_payload();
201
202 ns.end_request();
203 }
204
205 pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<TableResult, NoSQLError> {
206 let mut walker = MapWalker::new(r)?;
207 let mut res: TableResult = Default::default();
208 while walker.has_next() {
209 walker.next()?;
210 let name = walker.current_name();
211 match name.as_str() {
212 ERROR_CODE => {
213 walker.handle_error_code()?;
214 }
215 COMPARTMENT_OCID => {
216 res.compartment_id = walker.read_nson_string()?;
217 //println!(" comp_id={:?}", res.compartment_id);
218 }
219 NAMESPACE => {
220 res.namespace = walker.read_nson_string()?;
221 //println!(" namespace={:?}", res.namespace);
222 }
223 TABLE_OCID => {
224 res.table_ocid = walker.read_nson_string()?;
225 //println!(" table_ocid={:?}", res.table_ocid);
226 }
227 TABLE_NAME => {
228 res.table_name = walker.read_nson_string()?;
229 //println!(" table_name={:?}", res.table_name);
230 }
231 TABLE_SCHEMA => {
232 res.schema = walker.read_nson_string()?;
233 //println!(" schema={:?}", res.schema);
234 }
235 TABLE_DDL => {
236 res.ddl = walker.read_nson_string()?;
237 //println!(" ddl={:?}", res.ddl);
238 }
239 OPERATION_ID => {
240 res.operation_id = walker.read_nson_string()?;
241 //println!(" operation_id={:?}", res.operation_id);
242 }
243 LIMITS => {
244 res.limits = Some(walker.read_nson_limits()?);
245 //println!(" limits={:?}", res.limits);
246 }
247 TABLE_STATE => {
248 let s = walker.read_nson_i32()?;
249 res.state = TableState::from_int(s)?;
250 //println!(" state={:?}", res.state);
251 }
252 ETAG => {
253 res.match_etag = Some(walker.read_nson_string()?);
254 }
255 _ => {
256 //println!(" table_result: skipping field '{}'", name);
257 walker.skip_nson_field()?;
258 }
259 }
260 }
261 Ok(res)
262 }
263}
264
265impl NsonRequest for TableRequest {
266 fn serialize(&self, w: &mut Writer, timeout: &Duration) {
267 self.nson_serialize(w, timeout);
268 }
269}
270
271impl GetTableRequest {
272 pub fn new(table_name: &str) -> GetTableRequest {
273 GetTableRequest {
274 table_name: table_name.to_string(),
275 ..Default::default()
276 }
277 }
278
279 /// Specify the timeout value for the request.
280 ///
281 /// This is optional.
282 /// If set, it must be greater than or equal to 1 millisecond, otherwise an
283 /// IllegalArgument error will be returned.
284 /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
285 pub fn timeout(mut self, t: &Duration) -> Self {
286 self.timeout = Some(t.clone());
287 self
288 }
289
290 /// Cloud Service only: set the name or id of a compartment to be used for this operation.
291 ///
292 /// If the associated handle authenticated as an Instance Principal, this value must be an OCID.
293 /// In all other cases, the value may be specified as either a name (or path for nested compartments) or as an OCID.
294 ///
295 /// If no compartment is given, the default compartment id for the handle is used. If that value was
296 /// not specified, the root compartment of the tenancy will be used.
297 pub fn compartment_id(mut self, compartment_id: &str) -> Self {
298 self.compartment_id = compartment_id.to_string();
299 self
300 }
301
302 pub fn operation_id(mut self, op_id: &str) -> GetTableRequest {
303 self.operation_id = op_id.to_string();
304 self
305 }
306
307 /// On-premises only: set the namespace for the operation.
308 pub fn namespace(mut self, namespace: &str) -> GetTableRequest {
309 self.namespace = namespace.to_string();
310 self
311 }
312
313 pub async fn execute(&self, h: &Handle) -> Result<TableResult, NoSQLError> {
314 // TODO: validate
315 let mut w: Writer = Writer::new();
316 w.write_i16(h.inner.serial_version);
317 let timeout = h.get_timeout(&self.timeout);
318 self.nson_serialize(&mut w, &timeout);
319 let mut opts = SendOptions {
320 timeout: timeout,
321 retryable: true,
322 compartment_id: self.compartment_id.clone(),
323 namespace: self.namespace.clone(),
324 ..Default::default()
325 };
326 let mut r = h.send_and_receive(w, &mut opts).await?;
327 let resp = TableRequest::nson_deserialize(&mut r)?;
328 Ok(resp)
329 }
330
331 pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
332 let mut ns = NsonSerializer::start_request(w);
333 ns.start_header();
334 ns.write_header(OpCode::GetTable, timeout, &self.table_name);
335 ns.end_header();
336
337 // payload
338 ns.start_payload();
339 ns.write_string_field(OPERATION_ID, &self.operation_id);
340 // TODO: these are currently only in http headers. Add to NSON?
341 //ns.write_string_field(COMPARTMENT_OCID, &self.compartment_id);
342 //ns.write_string_field(NAMESPACE, &self.namespace);
343 ns.end_payload();
344
345 ns.end_request();
346 }
347}
348
349impl NsonRequest for GetTableRequest {
350 fn serialize(&self, w: &mut Writer, timeout: &Duration) {
351 self.nson_serialize(w, timeout);
352 }
353}
354
355impl TableResult {
356 /// Wait for a TableRequest to complete.
357 ///
358 /// This method will loop, polling the system for the status of the SystemRequest
359 /// until it either succeeds, gets an error, or times out.
360 pub async fn wait_for_completion(
361 &mut self,
362 h: &Handle,
363 wait: Duration,
364 delay: Duration,
365 ) -> Result<(), NoSQLError> {
366 if self.is_terminal() {
367 return Ok(());
368 }
369 if wait < delay {
370 return ia_err!("wait duration must be greater than delay duration");
371 }
372
373 let start_time = Instant::now();
374 let mut first_loop = true;
375
376 while self.is_terminal() == false {
377 //println!(" table-request: elapsed={:?}", start_time.elapsed());
378 if start_time.elapsed() > wait {
379 return Err(NoSQLError::new(
380 RequestTimeout,
381 "operation not completed in expected time",
382 ));
383 }
384
385 let get_request = GetTableRequest::new(self.table_name.as_str())
386 .operation_id(self.operation_id.as_str())
387 .compartment_id(self.compartment_id.as_str())
388 .namespace(self.namespace.as_str());
389 // TODO: namespace? Java sdk doesn't add it...?
390
391 if !first_loop {
392 sleep(delay);
393 }
394
395 let res = get_request.execute(h).await?;
396
397 // TODO: copy_most method?
398 self.state = res.state;
399 self.limits = res.limits;
400 self.schema = res.schema;
401 self.ddl = res.ddl;
402 // TODO: tags, MRT data
403
404 first_loop = false;
405 }
406
407 Ok(())
408 }
409
410 /// Wait for a TableRequest to complete.
411 ///
412 /// This method will loop, polling the system for the status of the SystemRequest
413 /// until it either succeeds, gets an error, or times out.
414 ///
415 /// This is a convenience method to allow direct millisecond values instead of creating
416 /// `Duration` structs.
417 pub async fn wait_for_completion_ms(
418 &mut self,
419 h: &Handle,
420 wait_ms: u64,
421 delay_ms: u64,
422 ) -> Result<(), NoSQLError> {
423 self.wait_for_completion(
424 h,
425 Duration::from_millis(wait_ms),
426 Duration::from_millis(delay_ms),
427 )
428 .await
429 }
430
431 fn is_terminal(&self) -> bool {
432 self.state == TableState::Active || self.state == TableState::Dropped
433 }
434
435 /// Get the table name.
436 ///
437 /// This is only valid for [`GetTableRequest`].
438 pub fn table_name(&self) -> String {
439 self.table_name.clone()
440 }
441 /// Cloud only: get the compartment id of the table.
442 ///
443 /// This is only valid for [`GetTableRequest`].
444 pub fn compartment_id(&self) -> String {
445 self.compartment_id.clone()
446 }
447 /// On-premises only: get the namespace of the table.
448 ///
449 /// This is only valid for [`GetTableRequest`].
450 pub fn namespace(&self) -> String {
451 self.namespace.clone()
452 }
453 /// Cloud only: get the OCID of the table.
454 ///
455 /// This is only valid for [`GetTableRequest`].
456 pub fn table_ocid(&self) -> String {
457 self.table_ocid.clone()
458 }
459 /// Get the DDL statement that was used to create the table.
460 ///
461 /// Note this will reflect any `ALTER TABLE` operations as well.
462 pub fn ddl(&self) -> String {
463 self.ddl.clone()
464 }
465 /// Get the internal operation ID for an in-progress table request.
466 ///
467 /// This is typically not needed by applications; it is available for testing purposes only.
468 /// Internally, [`TableResult::wait_for_completion()`] uses this value when polling the system.
469 pub fn operation_id(&self) -> String {
470 self.operation_id.clone()
471 }
472 /// Get the schema of the table.
473 ///
474 /// Note this will reflect any `ALTER TABLE` operations as well.
475 pub fn schema(&self) -> String {
476 self.schema.clone()
477 }
478 /// Get the current state of the table.
479 pub fn state(&self) -> TableState {
480 self.state.clone()
481 }
482 /// Cloud only: get the table limits.
483 pub fn limits(&self) -> Option<TableLimits> {
484 if let Some(l) = &self.limits {
485 return Some(l.clone());
486 }
487 None
488 }
489 /// Cloud only: get the match ETag for the table.
490 ///
491 /// see [`TableRequest::match_etag()`] for more details.
492 pub fn match_etag(&self) -> Option<String> {
493 if let Some(etag) = &self.match_etag {
494 return Some(etag.clone());
495 }
496 None
497 }
498}