oracle_nosql_rust_sdk/
system_request.rs1use crate::error::NoSQLErrorCode::RequestTimeout;
8use crate::error::{ia_err, NoSQLError};
9use crate::handle::Handle;
10use crate::handle::SendOptions;
11use crate::nson::*;
12use crate::reader::Reader;
13use crate::types::{OpCode, OperationState};
14use crate::writer::Writer;
15use std::result::Result;
16use std::thread::sleep;
17use std::time::{Duration, Instant};
18
/// Request carrying a statement that is sent to the service with
/// `OpCode::SystemRequest`.
///
/// Build one with [`SystemRequest::new`], optionally chain
/// [`SystemRequest::timeout`], then send it with [`SystemRequest::execute`].
#[derive(Default, Debug)]
pub struct SystemRequest {
    /// Statement text; written to the request payload under the `STATEMENT` field.
    pub(crate) statement: String,

    /// Optional per-request timeout; handed to `Handle::get_timeout` when the
    /// request is executed.
    pub(crate) timeout: Option<Duration>,
}
41
/// Crate-internal request that asks for the current status of an operation,
/// identified by its operation id. It is sent with
/// `OpCode::SystemStatusRequest`.
#[derive(Default, Debug)]
pub(crate) struct SystemStatusRequest {
    /// Operation identifier; written to the payload under the `OPERATION_ID` field.
    pub operation_id: String,

    /// Optional per-request timeout; handed to `Handle::get_timeout` when the
    /// request is executed.
    pub timeout: Option<Duration>,
}
48
49#[derive(Default, Debug)]
51pub struct SystemResult {
52 pub(crate) operation_id: String, pub(crate) state: OperationState,
54 pub(crate) statement: String,
55 pub(crate) result_string: String,
56}
57
58impl SystemRequest {
59 pub fn new(statement: &str) -> SystemRequest {
61 SystemRequest {
62 statement: statement.to_string(),
63 ..Default::default()
64 }
65 }
66
67 pub fn timeout(mut self, t: &Duration) -> Self {
74 self.timeout = Some(t.clone());
75 self
76 }
77
78 pub async fn execute(&self, h: &Handle) -> Result<SystemResult, NoSQLError> {
83 let mut w: Writer = Writer::new();
85 w.write_i16(h.inner.serial_version);
86 let timeout = h.get_timeout(&self.timeout);
87 self.nson_serialize(&mut w, &timeout);
88 let mut opts = SendOptions {
89 timeout: timeout,
90 retryable: false,
91 ..Default::default()
92 };
93 let mut r = h.send_and_receive(w, &mut opts).await?;
94 let resp = SystemRequest::nson_deserialize(&mut r)?;
95 Ok(resp)
96 }
97
98 pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
99 let mut ns = NsonSerializer::start_request(w);
100 ns.start_header();
101 ns.write_header(OpCode::SystemRequest, timeout, "");
102 ns.end_header();
103
104 ns.start_payload();
106 ns.write_string_field(STATEMENT, &self.statement);
107 ns.end_payload();
108
109 ns.end_request();
110 }
111
112 pub(crate) fn nson_deserialize(r: &mut Reader) -> Result<SystemResult, NoSQLError> {
113 let mut walker = MapWalker::new(r)?;
114 let mut res: SystemResult = Default::default();
115 while walker.has_next() {
116 walker.next()?;
117 let name = walker.current_name();
118 match name.as_str() {
119 ERROR_CODE => {
120 walker.handle_error_code()?;
121 }
122 OPERATION_ID => {
123 res.operation_id = walker.read_nson_string()?;
124 }
126 STATEMENT => {
127 res.statement = walker.read_nson_string()?;
128 }
130 SYSOP_RESULT => {
131 res.result_string = walker.read_nson_string()?;
132 }
134 SYSOP_STATE => {
135 let s = walker.read_nson_i32()?;
136 res.state = OperationState::from_int(s)?;
137 }
139 _ => {
140 walker.skip_nson_field()?;
142 }
143 }
144 }
145 Ok(res)
146 }
147}
148
149impl NsonRequest for SystemRequest {
150 fn serialize(&self, w: &mut Writer, timeout: &Duration) {
151 self.nson_serialize(w, timeout);
152 }
153}
154
155impl SystemStatusRequest {
156 pub fn new(operation_id: &str) -> SystemStatusRequest {
157 SystemStatusRequest {
158 operation_id: operation_id.to_string(),
159 ..Default::default()
160 }
161 }
162
163 #[allow(dead_code)]
170 pub fn timeout(mut self, t: &Duration) -> Self {
171 self.timeout = Some(t.clone());
172 self
173 }
174
175 pub async fn execute(&self, h: &Handle) -> Result<SystemResult, NoSQLError> {
176 let mut w: Writer = Writer::new();
178 w.write_i16(h.inner.serial_version);
179 let timeout = h.get_timeout(&self.timeout);
180 self.nson_serialize(&mut w, &timeout);
181 let mut opts = SendOptions {
182 timeout: timeout,
183 retryable: true,
184 ..Default::default()
185 };
186 let mut r = h.send_and_receive(w, &mut opts).await?;
187 let resp = SystemRequest::nson_deserialize(&mut r)?;
188 Ok(resp)
189 }
190
191 pub(crate) fn nson_serialize(&self, w: &mut Writer, timeout: &Duration) {
192 let mut ns = NsonSerializer::start_request(w);
193 ns.start_header();
194 ns.write_header(OpCode::SystemStatusRequest, timeout, "");
195 ns.end_header();
196
197 ns.start_payload();
199 ns.write_string_field(OPERATION_ID, &self.operation_id);
200 ns.end_payload();
201
202 ns.end_request();
203 }
204}
205
206impl NsonRequest for SystemStatusRequest {
207 fn serialize(&self, w: &mut Writer, timeout: &Duration) {
208 self.nson_serialize(w, timeout);
209 }
210}
211
212impl SystemResult {
213 pub async fn wait_for_completion(
218 &mut self,
219 h: &Handle,
220 wait: Duration,
221 delay: Duration,
222 ) -> Result<(), NoSQLError> {
223 if self.state == OperationState::Complete {
224 return Ok(());
225 }
226 if wait < delay {
227 return ia_err!("wait duration must be greater than delay duration");
228 }
229
230 let start_time = Instant::now();
231 let mut first_loop = true;
232
233 while self.state != OperationState::Complete {
234 if start_time.elapsed() > wait {
235 return Err(NoSQLError::new(
236 RequestTimeout,
237 "Operation not completed in expected time",
238 ));
239 }
240
241 if !first_loop {
242 sleep(delay);
243 }
244
245 let res = SystemStatusRequest::new(self.operation_id.as_str())
246 .execute(h)
247 .await?;
248
249 self.state = res.state;
251 self.result_string = res.result_string;
252
253 first_loop = false;
254 }
255
256 Ok(())
257 }
258
259 pub async fn wait_for_completion_ms(
267 &mut self,
268 h: &Handle,
269 wait_ms: u64,
270 delay_ms: u64,
271 ) -> Result<(), NoSQLError> {
272 self.wait_for_completion(
273 h,
274 Duration::from_millis(wait_ms),
275 Duration::from_millis(delay_ms),
276 )
277 .await
278 }
279
280 pub fn operation_id(&self) -> String {
281 self.operation_id.clone()
282 }
283
284 pub fn state(&self) -> OperationState {
285 self.state.clone()
286 }
287
288 pub fn statement(&self) -> String {
289 self.statement.clone()
290 }
291
292 pub fn result_string(&self) -> String {
293 self.result_string.clone()
294 }
295}