tsubakuro_rust_core/job/job.rs
1use std::{sync::Arc, time::Duration};
2
3use super::cancel_job::CancelJob;
4use log::{error, warn};
5
6use crate::{
7 client_error,
8 error::TgError,
9 service::endpoint::endpoint_broker::EndpointBroker,
10 session::wire::{response::WireResponse, response_box::SlotEntryHandle, Wire},
11 util::Timeout,
12};
13
14/// Job.
15///
16/// An object that provides asynchronous response data,
17/// and it may be canceled before the underlying task was done.
18///
19/// **thread unsafe**
20///
21/// # Examples
22/// ```
23/// use tsubakuro_rust_core::prelude::*;
24///
25/// async fn example(client: &SqlClient, transaction: &Transaction) -> Result<(), TgError> {
26/// let sql = "insert into tb values(1, 'abc')";
27/// let mut job = client.execute_async(transaction, sql).await?;
28///
29/// let execute_result = job.take_for(std::time::Duration::from_secs(10)).await?;
30///
31/// Ok(())
32/// }
33/// ```
34pub struct Job<T> {
35 name: String,
36 wire: Arc<Wire>,
37 slot_handle: Arc<SlotEntryHandle>,
38 converter: Box<dyn Fn(Arc<SlotEntryHandle>, WireResponse) -> Result<T, TgError> + Send>,
39 default_timeout: Duration,
40 done: bool,
41 taked: bool,
42 canceled: bool,
43 closed: bool,
44 fail_on_drop_error: bool,
45}
46
47impl<T> std::fmt::Debug for Job<T> {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("Job")
50 .field("name", &self.name)
51 .field("wire", &self.wire)
52 .field("slot_handle", &self.slot_handle)
53 .field("default_timeout", &self.default_timeout)
54 .field("done", &self.done)
55 .field("taked", &self.taked)
56 .field("canceled", &self.canceled)
57 .field("closed", &self.closed)
58 .finish()
59 }
60}
61
62impl<T> Job<T> {
63 pub(crate) fn new<F>(
64 name: &str,
65 wire: Arc<Wire>,
66 slot_handle: Arc<SlotEntryHandle>,
67 converter: F,
68 default_timeout: Duration,
69 fail_on_drop_error: bool,
70 ) -> Job<T>
71 where
72 F: Fn(Arc<SlotEntryHandle>, WireResponse) -> Result<T, TgError> + Send + 'static,
73 {
74 Job {
75 name: name.to_string(),
76 wire,
77 slot_handle,
78 converter: Box::new(converter),
79 default_timeout,
80 done: false,
81 taked: false,
82 canceled: false,
83 closed: false,
84 fail_on_drop_error,
85 }
86 }
87
88 /// Get job name.
89 pub fn name(&self) -> &String {
90 &self.name
91 }
92
93 /// Set default timeout.
94 pub fn set_default_timeout(&mut self, timeout: Duration) {
95 self.default_timeout = timeout;
96 }
97
98 /// Wait for response.
99 ///
100 /// # Returns
101 /// - `Ok(true)` - Response received.
102 /// - `Ok(false)` - Timed out.
103 ///
104 /// # Examples
105 /// ```
106 /// use tsubakuro_rust_core::prelude::*;
107 ///
108 /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
109 /// let done = job.wait(std::time::Duration::from_secs(10)).await?;
110 /// if done {
111 /// let execute_result = job.take().await?;
112 /// }
113 ///
114 /// Ok(())
115 /// }
116 /// ```
117 pub async fn wait(&mut self, timeout: Duration) -> Result<bool, TgError> {
118 // self.check_cancel()?;
119 // self.check_close()?;
120 if self.done {
121 return Ok(true);
122 }
123
124 let slot_handle = self.slot_handle.clone();
125 let timeout = Timeout::new(timeout);
126 let result = self.wire.wait_response(slot_handle, &timeout).await;
127 if let Ok(true) = result {
128 self.done = true;
129 }
130 result
131 }
132
133 /// Whether a response has been received.
134 ///
135 /// # Returns
136 /// - `Ok(true)` - Response received.
137 /// - `Ok(false)` - No response received.
138 ///
139 /// # Examples
140 /// ```
141 /// use tsubakuro_rust_core::prelude::*;
142 ///
143 /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
144 /// loop {
145 /// let done = job.is_done().await?;
146 /// if done {
147 /// let execute_result = job.take().await?;
148 /// break;
149 /// }
150 /// }
151 ///
152 /// Ok(())
153 /// }
154 /// ```
155 pub async fn is_done(&mut self) -> Result<bool, TgError> {
156 if self.done {
157 return Ok(true);
158 }
159 if self.canceled {
160 return Ok(false);
161 }
162 if self.closed {
163 return Ok(false);
164 }
165
166 let slot_handle = self.slot_handle.clone();
167 let result = self.wire.check_response(slot_handle).await;
168 if let Ok(true) = result {
169 self.done = true;
170 }
171 result
172 }
173
174 /// Retrieves the result value, or wait until response has been received.
175 ///
176 /// You can only take once to retrieve the value.
177 ///
178 /// # Examples
179 /// ```
180 /// use tsubakuro_rust_core::prelude::*;
181 ///
182 /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
183 /// let execute_result = job.take().await?;
184 ///
185 /// Ok(())
186 /// }
187 /// ```
188 pub async fn take(&mut self) -> Result<T, TgError> {
189 let timeout = self.default_timeout;
190 self.take_for(timeout).await
191 }
192
193 /// Retrieves the result value, or wait until response has been received.
194 ///
195 /// You can only take once to retrieve the value.
196 ///
197 /// # Examples
198 /// ```
199 /// use tsubakuro_rust_core::prelude::*;
200 ///
201 /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
202 /// let execute_result = job.take_for(std::time::Duration::from_secs(10)).await?;
203 ///
204 /// Ok(())
205 /// }
206 /// ```
207 pub async fn take_for(&mut self, timeout: Duration) -> Result<T, TgError> {
208 if self.taked {
209 return Err(client_error!(format!("Job<{}> already taked", self.name)));
210 }
211 // self.check_cancel()?;
212 // self.check_close()?;
213
214 let slot_handle = &self.slot_handle;
215 let timeout = Timeout::new(timeout);
216 let response = self.wire.pull_response(slot_handle, &timeout).await?;
217 self.done = true;
218 self.taked = true;
219 (self.converter)(slot_handle.clone(), response)
220 }
221
222 /// Retrieves the result value if a response has been received.
223 ///
224 /// You can only take once to retrieve the value.
225 ///
226 /// # Returns
227 /// - `Ok(Some(value))` - result value
228 /// - `Ok(None)` - No response received.
229 ///
230 /// # Examples
231 /// ```
232 /// use tsubakuro_rust_core::prelude::*;
233 ///
234 /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
235 /// loop {
236 /// if let Some(execute_result) = job.take_if_ready().await? {
237 /// break;
238 /// }
239 /// }
240 ///
241 /// Ok(())
242 /// }
243 /// ```
244 pub async fn take_if_ready(&mut self) -> Result<Option<T>, TgError> {
245 if self.is_done().await? {
246 let value = self.take_for(Duration::ZERO).await?;
247 Ok(Some(value))
248 } else {
249 Ok(None)
250 }
251 }
252
253 /// Cancel job.
254 ///
255 /// # Returns
256 /// - `Ok(true)` - Response received.
257 /// - `Ok(false)` - Timed out.
258 ///
259 /// The response is not necessarily OPERATION_CANCELED.
260 /// Depending on the timing, it may be a normal processing result.
261 ///
262 /// # Examples
263 /// ```
264 /// use tsubakuro_rust_core::prelude::*;
265 ///
266 /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
267 /// job.cancel().await?;
268 ///
269 /// Ok(())
270 /// }
271 /// ```
272 pub async fn cancel(self) -> Result<bool, TgError> {
273 let timeout = self.default_timeout;
274 self.cancel_for(timeout).await
275 }
276
277 /// Cancel job.
278 ///
279 /// # Returns
280 /// - `Ok(true)` - Response already received, or cancel already started.
281 /// - `Ok(false)` - Timed out.
282 ///
283 /// The response is not necessarily OPERATION_CANCELED.
284 /// Depending on the timing, it may be a normal processing result.
285 pub async fn cancel_for(self, timeout: Duration) -> Result<bool, TgError> {
286 let job = self.cancel_async().await?;
287 match job {
288 Some(mut job) => {
289 let done = job.wait(timeout).await?;
290 Ok(done)
291 }
292 _ => Ok(true),
293 }
294 }
295
296 /// Cancel job.
297 ///
298 /// # Returns
299 /// - `Ok(Some(CancelJob))` - Cancellation started.
300 /// - `Ok(None)` - Not canceled. (response already received, or cancel already started)
301 ///
302 /// # Examples
303 /// ```
304 /// use tsubakuro_rust_core::prelude::*;
305 ///
306 /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
307 /// if let Some(mut cancel_job) = job.cancel_async().await? {
308 /// cancel_job.wait(std::time::Duration::from_secs(10)).await?;
309 /// }
310 ///
311 /// Ok(())
312 /// }
313 /// ```
314 pub async fn cancel_async(mut self) -> Result<Option<CancelJob>, TgError> {
315 // self.check_close()?;
316 if self.done {
317 return Ok(None);
318 }
319 if self.canceled {
320 return Ok(None);
321 }
322 self.canceled = true;
323
324 self.send_cancel().await?;
325 let job = CancelJob::new(self.wire.clone(), self.slot_handle.clone());
326 Ok(Some(job))
327 }
328
329 // fn check_cancel(&self) -> Result<(), TgError> {
330 // if self.canceled {
331 // return Err(client_error!(format!("Job<{}> canceled", self.name)));
332 // }
333 // Ok(())
334 // }
335
336 /// Disposes this resource.
337 ///
338 /// If no response is received and no cancellation is made, then execute cancel.
339 pub async fn close(mut self) -> Result<(), TgError> {
340 if self.closed {
341 return Ok(());
342 }
343 self.closed = true;
344
345 if self.done || self.canceled {
346 return Ok(());
347 }
348
349 self.send_cancel().await // send only (do not check response)
350 }
351
352 // fn check_close(&self) -> Result<(), TgError> {
353 // if self.closed {
354 // return Err(client_error!(format!("Job<{}> already closed", self.name)));
355 // }
356 // Ok(())
357 // }
358
359 async fn send_cancel(&self) -> Result<(), TgError> {
360 let slot = self.slot_handle.slot();
361 EndpointBroker::cancel(&self.wire, slot).await?;
362
363 Ok(())
364 }
365
366 /// for debug
367 #[doc(hidden)]
368 pub fn set_fail_on_drop_error(&mut self, value: bool) {
369 self.fail_on_drop_error = value;
370 }
371
372 pub(crate) fn fail_on_drop_error(&self) -> bool {
373 self.fail_on_drop_error
374 }
375}
376
377impl<T> Drop for Job<T> {
378 fn drop(&mut self) {
379 if self.done || self.canceled || self.closed {
380 return;
381 }
382 if self.slot_handle.exists_wire_response() {
383 return;
384 }
385
386 std::thread::scope(|scope| {
387 scope.spawn(move || {
388 let runtime = {
389 match tokio::runtime::Runtime::new() {
390 Ok(runtime) => runtime,
391 Err(e) => {
392 error!("Job<{}>.drop() runtime::new error. {}", self.name, e);
393 if self.fail_on_drop_error() {
394 panic!("Job<{}>.drop() runtime::new error. {}", self.name, e);
395 }
396 return;
397 }
398 }
399 };
400 runtime.block_on(async {
401 let result = self.send_cancel().await; // send only (do not check response)
402 if let Err(e) = result {
403 warn!("Job<{}>.drop() close error. {}", self.name, e);
404 if self.fail_on_drop_error() {
405 panic!("Job<{}>.drop() close error. {}", self.name, e);
406 }
407 }
408 })
409 });
410 });
411 }
412}