tsubakuro_rust_core/job/
job.rs

1use std::{sync::Arc, time::Duration};
2
3use super::cancel_job::CancelJob;
4use log::{error, warn};
5
6use crate::{
7    client_error,
8    error::TgError,
9    service::endpoint::endpoint_broker::EndpointBroker,
10    session::wire::{response::WireResponse, response_box::SlotEntryHandle, Wire},
11    util::Timeout,
12};
13
14/// Job.
15///
16/// An object that provides asynchronous response data,
17/// and it may be canceled before the underlying task was done.
18///
19/// **thread unsafe**
20///
21/// # Examples
22/// ```
23/// use tsubakuro_rust_core::prelude::*;
24///
25/// async fn example(client: &SqlClient, transaction: &Transaction) -> Result<(), TgError> {
26///     let sql = "insert into tb values(1, 'abc')";
27///     let mut job = client.execute_async(transaction, sql).await?;
28///
29///     let execute_result = job.take_for(std::time::Duration::from_secs(10)).await?;
30///
31///     Ok(())
32/// }
33/// ```
34pub struct Job<T> {
35    name: String,
36    wire: Arc<Wire>,
37    slot_handle: Arc<SlotEntryHandle>,
38    converter: Box<dyn Fn(Arc<SlotEntryHandle>, WireResponse) -> Result<T, TgError> + Send>,
39    default_timeout: Duration,
40    done: bool,
41    taked: bool,
42    canceled: bool,
43    closed: bool,
44    fail_on_drop_error: bool,
45}
46
47impl<T> std::fmt::Debug for Job<T> {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("Job")
50            .field("name", &self.name)
51            .field("wire", &self.wire)
52            .field("slot_handle", &self.slot_handle)
53            .field("default_timeout", &self.default_timeout)
54            .field("done", &self.done)
55            .field("taked", &self.taked)
56            .field("canceled", &self.canceled)
57            .field("closed", &self.closed)
58            .finish()
59    }
60}
61
62impl<T> Job<T> {
63    pub(crate) fn new<F>(
64        name: &str,
65        wire: Arc<Wire>,
66        slot_handle: Arc<SlotEntryHandle>,
67        converter: F,
68        default_timeout: Duration,
69        fail_on_drop_error: bool,
70    ) -> Job<T>
71    where
72        F: Fn(Arc<SlotEntryHandle>, WireResponse) -> Result<T, TgError> + Send + 'static,
73    {
74        Job {
75            name: name.to_string(),
76            wire,
77            slot_handle,
78            converter: Box::new(converter),
79            default_timeout,
80            done: false,
81            taked: false,
82            canceled: false,
83            closed: false,
84            fail_on_drop_error,
85        }
86    }
87
88    /// Get job name.
89    pub fn name(&self) -> &String {
90        &self.name
91    }
92
93    /// Set default timeout.
94    pub fn set_default_timeout(&mut self, timeout: Duration) {
95        self.default_timeout = timeout;
96    }
97
98    /// Wait for response.
99    ///
100    /// # Returns
101    /// - `Ok(true)` - Response received.
102    /// - `Ok(false)` - Timed out.
103    ///
104    /// # Examples
105    /// ```
106    /// use tsubakuro_rust_core::prelude::*;
107    ///
108    /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
109    ///     let done = job.wait(std::time::Duration::from_secs(10)).await?;
110    ///     if done {
111    ///         let execute_result = job.take().await?;
112    ///     }
113    ///
114    ///     Ok(())
115    /// }
116    /// ```
117    pub async fn wait(&mut self, timeout: Duration) -> Result<bool, TgError> {
118        // self.check_cancel()?;
119        // self.check_close()?;
120        if self.done {
121            return Ok(true);
122        }
123
124        let slot_handle = self.slot_handle.clone();
125        let timeout = Timeout::new(timeout);
126        let result = self.wire.wait_response(slot_handle, &timeout).await;
127        if let Ok(true) = result {
128            self.done = true;
129        }
130        result
131    }
132
133    /// Whether a response has been received.
134    ///
135    /// # Returns
136    /// - `Ok(true)` - Response received.
137    /// - `Ok(false)` - No response received.
138    ///
139    /// # Examples
140    /// ```
141    /// use tsubakuro_rust_core::prelude::*;
142    ///
143    /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
144    ///     loop {
145    ///         let done = job.is_done().await?;
146    ///         if done {
147    ///             let execute_result = job.take().await?;
148    ///             break;
149    ///         }
150    ///     }
151    ///
152    ///     Ok(())
153    /// }
154    /// ```
155    pub async fn is_done(&mut self) -> Result<bool, TgError> {
156        if self.done {
157            return Ok(true);
158        }
159        if self.canceled {
160            return Ok(false);
161        }
162        if self.closed {
163            return Ok(false);
164        }
165
166        let slot_handle = self.slot_handle.clone();
167        let result = self.wire.check_response(slot_handle).await;
168        if let Ok(true) = result {
169            self.done = true;
170        }
171        result
172    }
173
174    /// Retrieves the result value, or wait until response has been received.
175    ///
176    /// You can only take once to retrieve the value.
177    ///
178    /// # Examples
179    /// ```
180    /// use tsubakuro_rust_core::prelude::*;
181    ///
182    /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
183    ///     let execute_result = job.take().await?;
184    ///
185    ///     Ok(())
186    /// }
187    /// ```
188    pub async fn take(&mut self) -> Result<T, TgError> {
189        let timeout = self.default_timeout;
190        self.take_for(timeout).await
191    }
192
193    /// Retrieves the result value, or wait until response has been received.
194    ///
195    /// You can only take once to retrieve the value.
196    ///
197    /// # Examples
198    /// ```
199    /// use tsubakuro_rust_core::prelude::*;
200    ///
201    /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
202    ///     let execute_result = job.take_for(std::time::Duration::from_secs(10)).await?;
203    ///
204    ///     Ok(())
205    /// }
206    /// ```
207    pub async fn take_for(&mut self, timeout: Duration) -> Result<T, TgError> {
208        if self.taked {
209            return Err(client_error!(format!("Job<{}> already taked", self.name)));
210        }
211        // self.check_cancel()?;
212        // self.check_close()?;
213
214        let slot_handle = &self.slot_handle;
215        let timeout = Timeout::new(timeout);
216        let response = self.wire.pull_response(slot_handle, &timeout).await?;
217        self.done = true;
218        self.taked = true;
219        (self.converter)(slot_handle.clone(), response)
220    }
221
222    /// Retrieves the result value if a response has been received.
223    ///
224    /// You can only take once to retrieve the value.
225    ///
226    /// # Returns
227    /// - `Ok(Some(value))` - result value
228    /// - `Ok(None)` - No response received.
229    ///
230    /// # Examples
231    /// ```
232    /// use tsubakuro_rust_core::prelude::*;
233    ///
234    /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
235    ///     loop {
236    ///         if let Some(execute_result) = job.take_if_ready().await? {
237    ///             break;
238    ///         }
239    ///     }
240    ///
241    ///     Ok(())
242    /// }
243    /// ```
244    pub async fn take_if_ready(&mut self) -> Result<Option<T>, TgError> {
245        if self.is_done().await? {
246            let value = self.take_for(Duration::ZERO).await?;
247            Ok(Some(value))
248        } else {
249            Ok(None)
250        }
251    }
252
253    /// Cancel job.
254    ///
255    /// # Returns
256    /// - `Ok(true)` - Response received.
257    /// - `Ok(false)` - Timed out.
258    ///
259    /// The response is not necessarily OPERATION_CANCELED.
260    /// Depending on the timing, it may be a normal processing result.
261    ///
262    /// # Examples
263    /// ```
264    /// use tsubakuro_rust_core::prelude::*;
265    ///
266    /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
267    ///     job.cancel().await?;
268    ///
269    ///     Ok(())
270    /// }
271    /// ```
272    pub async fn cancel(self) -> Result<bool, TgError> {
273        let timeout = self.default_timeout;
274        self.cancel_for(timeout).await
275    }
276
277    /// Cancel job.
278    ///
279    /// # Returns
280    /// - `Ok(true)` - Response already received, or cancel already started.
281    /// - `Ok(false)` - Timed out.
282    ///
283    /// The response is not necessarily OPERATION_CANCELED.
284    /// Depending on the timing, it may be a normal processing result.
285    pub async fn cancel_for(self, timeout: Duration) -> Result<bool, TgError> {
286        let job = self.cancel_async().await?;
287        match job {
288            Some(mut job) => {
289                let done = job.wait(timeout).await?;
290                Ok(done)
291            }
292            _ => Ok(true),
293        }
294    }
295
296    /// Cancel job.
297    ///
298    /// # Returns
299    /// - `Ok(Some(CancelJob))` - Cancellation started.
300    /// - `Ok(None)` - Not canceled. (response already received, or cancel already started)
301    ///
302    /// # Examples
303    /// ```
304    /// use tsubakuro_rust_core::prelude::*;
305    ///
306    /// async fn example(mut job: Job<SqlExecuteResult>) -> Result<(), TgError> {
307    ///     if let Some(mut cancel_job) = job.cancel_async().await? {
308    ///         cancel_job.wait(std::time::Duration::from_secs(10)).await?;
309    ///     }
310    ///
311    ///     Ok(())
312    /// }
313    /// ```
314    pub async fn cancel_async(mut self) -> Result<Option<CancelJob>, TgError> {
315        // self.check_close()?;
316        if self.done {
317            return Ok(None);
318        }
319        if self.canceled {
320            return Ok(None);
321        }
322        self.canceled = true;
323
324        self.send_cancel().await?;
325        let job = CancelJob::new(self.wire.clone(), self.slot_handle.clone());
326        Ok(Some(job))
327    }
328
329    // fn check_cancel(&self) -> Result<(), TgError> {
330    //     if self.canceled {
331    //         return Err(client_error!(format!("Job<{}> canceled", self.name)));
332    //     }
333    //     Ok(())
334    // }
335
336    /// Disposes this resource.
337    ///
338    /// If no response is received and no cancellation is made, then execute cancel.
339    pub async fn close(mut self) -> Result<(), TgError> {
340        if self.closed {
341            return Ok(());
342        }
343        self.closed = true;
344
345        if self.done || self.canceled {
346            return Ok(());
347        }
348
349        self.send_cancel().await // send only (do not check response)
350    }
351
352    // fn check_close(&self) -> Result<(), TgError> {
353    //     if self.closed {
354    //         return Err(client_error!(format!("Job<{}> already closed", self.name)));
355    //     }
356    //     Ok(())
357    // }
358
359    async fn send_cancel(&self) -> Result<(), TgError> {
360        let slot = self.slot_handle.slot();
361        EndpointBroker::cancel(&self.wire, slot).await?;
362
363        Ok(())
364    }
365
366    /// for debug
367    #[doc(hidden)]
368    pub fn set_fail_on_drop_error(&mut self, value: bool) {
369        self.fail_on_drop_error = value;
370    }
371
372    pub(crate) fn fail_on_drop_error(&self) -> bool {
373        self.fail_on_drop_error
374    }
375}
376
377impl<T> Drop for Job<T> {
378    fn drop(&mut self) {
379        if self.done || self.canceled || self.closed {
380            return;
381        }
382        if self.slot_handle.exists_wire_response() {
383            return;
384        }
385
386        std::thread::scope(|scope| {
387            scope.spawn(move || {
388                let runtime = {
389                    match tokio::runtime::Runtime::new() {
390                        Ok(runtime) => runtime,
391                        Err(e) => {
392                            error!("Job<{}>.drop() runtime::new error. {}", self.name, e);
393                            if self.fail_on_drop_error() {
394                                panic!("Job<{}>.drop() runtime::new error. {}", self.name, e);
395                            }
396                            return;
397                        }
398                    }
399                };
400                runtime.block_on(async {
401                    let result = self.send_cancel().await; // send only (do not check response)
402                    if let Err(e) = result {
403                        warn!("Job<{}>.drop() close error. {}", self.name, e);
404                        if self.fail_on_drop_error() {
405                            panic!("Job<{}>.drop() close error. {}", self.name, e);
406                        }
407                    }
408                })
409            });
410        });
411    }
412}