oracle_nosql_rust_sdk/
system_request.rs

//
// Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
//
// Licensed under the Universal Permissive License v 1.0 as shown at
//  https://oss.oracle.com/licenses/upl/
//
7use crate::error::NoSQLErrorCode::RequestTimeout;
8use crate::error::{ia_err, NoSQLError};
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::reader::Reader;
13use crate::types::{OpCode, OperationState};
14use crate::writer::Writer;
15use std::result::Result;
16use std::thread::sleep;
17use std::time::{Duration, Instant};
18
/// Request type for on-premise-only system operations.
///
/// A `SystemRequest` carries a single table-independent administrative
/// statement, for example creating or dropping a namespace, or a
/// security-relevant operation such as creating or dropping users and roles.
/// Such operations run asynchronously, so their completion has to be checked
/// separately.
///
/// Typical statements are:
///  - CREATE NAMESPACE mynamespace
///  - CREATE USER some_user IDENTIFIED BY password
///  - CREATE ROLE some_role
///  - GRANT ROLE some_role TO USER some_user
///
/// Every operation submitted through this request is implicitly asynchronous
/// and may be long-running. [`SystemRequest::execute()`] hands back a
/// [`SystemResult`] which can be polled until the operation either succeeds
/// or fails.
#[derive(Debug, Default)]
pub struct SystemRequest {
    // The administrative statement to execute.
    pub(crate) statement: String,
    // Optional per-request timeout; `None` means "use the handle's default".
    pub(crate) timeout: Option<Duration>,
}
41
/// Crate-internal request that asks for the current status of a
/// [`SystemRequest`] which is still in progress.
#[derive(Debug, Default)]
pub(crate) struct SystemStatusRequest {
    // Identifier of the operation whose status is being queried.
    pub operation_id: String,
    // Optional per-request timeout; `None` means "use the handle's default".
    pub timeout: Option<Duration>,
}
48
49/// Struct representing the result of a [`SystemRequest`].
50#[derive(Default, Debug)]
51pub struct SystemResult {
52    pub(crate) operation_id: String, // TODO: Option<>?
53    pub(crate) state: OperationState,
54    pub(crate) statement: String,
55    pub(crate) result_string: String,
56}
57
58impl SystemRequest {
59    /// Create a new SystemRequest. `statement` must be non-empty.
60    pub fn new(statement: &str) -> SystemRequest {
61        SystemRequest {
62            statement: statement.to_string(),
63            ..Default::default()
64        }
65    }
66
67    /// Specify the timeout value for the request.
68    ///
69    /// This is optional.
70    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
71    /// IllegalArgument error will be returned.
72    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
73    pub fn timeout(mut self, t: &Duration) -> Self {
74        self.timeout = Some(t.clone());
75        self
76    }
77
78    /// Execute the system request.
79    ///
80    /// This starts the asynchronous execution of the request in the system. The returned result should be
81    /// used to wait for completion by calling [`SystemResult::wait_for_completion()`].
82    pub async fn execute(&self, h: &Handle) -> Result<SystemResult, NoSQLError> {
83        // TODO: validate
84        let mut w: Writer = Writer::new();
85        w.write_i16(h.inner.serial_version);
86        let timeout = h.get_timeout(&self.timeout);
87        self.nson_serialize(&mut w, &timeout);
88        let mut opts = SendOptions {
89            timeout: timeout,
90            retryable: false,
91            ..Default::default()
92        };
93        let mut r = h.send_and_receive(w, &mut opts).await?;
94        let resp = SystemRequest::nson_deserialize(&mut r)?;
95        Ok(resp)
96    }
97
98    pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
99        let mut ns = NsonSerializer::start_request(w);
100        ns.start_header();
101        ns.write_header(OpCode::SystemRequest, timeout, "");
102        ns.end_header();
103
104        // payload
105        ns.start_payload();
106        ns.write_string_field(STATEMENT, &self.statement);
107        ns.end_payload();
108
109        ns.end_request();
110    }
111
112    pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<SystemResult, NoSQLError> {
113        let mut walker = MapWalker::new(r)?;
114        let mut res: SystemResult = Default::default();
115        while walker.has_next() {
116            walker.next()?;
117            let name = walker.current_name();
118            match name.as_str() {
119                ERROR_CODE => {
120                    walker.handle_error_code()?;
121                }
122                OPERATION_ID => {
123                    res.operation_id = walker.read_nson_string()?;
124                    //println!(" operation_id={:?}", res.operation_id);
125                }
126                STATEMENT => {
127                    res.statement = walker.read_nson_string()?;
128                    //println!(" statement={:?}", res.statement);
129                }
130                SYSOP_RESULT => {
131                    res.result_string = walker.read_nson_string()?;
132                    //println!(" result_string={:?}", res.result_string);
133                }
134                SYSOP_STATE => {
135                    let s = walker.read_nson_i32()?;
136                    res.state = OperationState::from_int(s)?;
137                    //println!(" state={:?}", res.state);
138                }
139                _ => {
140                    //println!("   system_result: skipping field '{}'", name);
141                    walker.skip_nson_field()?;
142                }
143            }
144        }
145        Ok(res)
146    }
147}
148
149impl NsonRequest for SystemRequest {
150    fn serialize(&self, w: &mut Writer, timeout: &Duration) {
151        self.nson_serialize(w, timeout);
152    }
153}
154
155impl SystemStatusRequest {
156    pub fn new(operation_id: &str) -> SystemStatusRequest {
157        SystemStatusRequest {
158            operation_id: operation_id.to_string(),
159            ..Default::default()
160        }
161    }
162
163    /// Specify the timeout value for the request.
164    ///
165    /// This is optional.
166    /// If set, it must be greater than or equal to 1 millisecond, otherwise an
167    /// IllegalArgument error will be returned.
168    /// If not set, the default timeout value configured for the [`Handle`](crate::HandleBuilder::timeout()) is used.
169    #[allow(dead_code)]
170    pub fn timeout(mut self, t: &Duration) -> Self {
171        self.timeout = Some(t.clone());
172        self
173    }
174
175    pub async fn execute(&self, h: &Handle) -> Result<SystemResult, NoSQLError> {
176        // TODO: validate
177        let mut w: Writer = Writer::new();
178        w.write_i16(h.inner.serial_version);
179        let timeout = h.get_timeout(&self.timeout);
180        self.nson_serialize(&mut w, &timeout);
181        let mut opts = SendOptions {
182            timeout: timeout,
183            retryable: true,
184            ..Default::default()
185        };
186        let mut r = h.send_and_receive(w, &mut opts).await?;
187        let resp = SystemRequest::nson_deserialize(&mut r)?;
188        Ok(resp)
189    }
190
191    pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
192        let mut ns = NsonSerializer::start_request(w);
193        ns.start_header();
194        ns.write_header(OpCode::SystemStatusRequest, timeout, "");
195        ns.end_header();
196
197        // payload
198        ns.start_payload();
199        ns.write_string_field(OPERATION_ID, &self.operation_id);
200        ns.end_payload();
201
202        ns.end_request();
203    }
204}
205
206impl NsonRequest for SystemStatusRequest {
207    fn serialize(&self, w: &mut Writer, timeout: &Duration) {
208        self.nson_serialize(w, timeout);
209    }
210}
211
212impl SystemResult {
213    /// Wait for a SystemRequest to complete.
214    ///
215    /// This method will loop, polling the system for the status of the SystemRequest
216    /// until it either succeeds, gets an error, or times out.
217    pub async fn wait_for_completion(
218        &mut self,
219        h: &Handle,
220        wait: Duration,
221        delay: Duration,
222    ) -> Result<(), NoSQLError> {
223        if self.state == OperationState::Complete {
224            return Ok(());
225        }
226        if wait < delay {
227            return ia_err!("wait duration must be greater than delay duration");
228        }
229
230        let start_time = Instant::now();
231        let mut first_loop = true;
232
233        while self.state != OperationState::Complete {
234            if start_time.elapsed() > wait {
235                return Err(NoSQLError::new(
236                    RequestTimeout,
237                    "Operation not completed in expected time",
238                ));
239            }
240
241            if !first_loop {
242                sleep(delay);
243            }
244
245            let res = SystemStatusRequest::new(self.operation_id.as_str())
246                .execute(h)
247                .await?;
248
249            // operation_id and statement do not change
250            self.state = res.state;
251            self.result_string = res.result_string;
252
253            first_loop = false;
254        }
255
256        Ok(())
257    }
258
259    /// Wait for a SystemRequest to complete.
260    ///
261    /// This method will loop, polling the system for the status of the SystemRequest
262    /// until it either succeeds, gets an error, or times out.
263    ///
264    /// This is a convenience method to allow direct millisecond values instead of creating
265    /// `Duration` structs.
266    pub async fn wait_for_completion_ms(
267        &mut self,
268        h: &Handle,
269        wait_ms: u64,
270        delay_ms: u64,
271    ) -> Result<(), NoSQLError> {
272        self.wait_for_completion(
273            h,
274            Duration::from_millis(wait_ms),
275            Duration::from_millis(delay_ms),
276        )
277        .await
278    }
279
280    pub fn operation_id(&self) -> String {
281        self.operation_id.clone()
282    }
283
284    pub fn state(&self) -> OperationState {
285        self.state.clone()
286    }
287
288    pub fn statement(&self) -> String {
289        self.statement.clone()
290    }
291
292    pub fn result_string(&self) -> String {
293        self.result_string.clone()
294    }
295}