picodata_plugin/
background.rs

//! Background container for long-lived jobs.
//! The background container guarantees job liveness across the plugin life cycle.

use crate::internal::ffi;
#[allow(unused_imports)]
use crate::plugin::interface::PicoContext;
use crate::plugin::interface::ServiceId;
use crate::util::tarantool_error_to_box_error;
use crate::util::DisplayErrorLocation;
use std::cell::Cell;
use std::time::Duration;
use tarantool::error::BoxError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::fiber::FiberId;
use tarantool::fiber::{Channel, RecvError};
use tarantool::util::IntoClones;

/// Same as [`PicoContext::register_job`].
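///
/// A minimal usage sketch (assumes a `service_id` for an already registered
/// service is available, e.g. from the service context):
///
/// ```no_run
/// use std::time::Duration;
/// use picodata_plugin::background::{register_job, CancellationToken};
///
/// # use picodata_plugin::plugin::interface::ServiceId;
/// # fn example(service_id: &ServiceId) {
/// register_job(service_id, |cancel: CancellationToken| {
///     // Loop until picodata requests cancellation.
///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
///         println!("working...");
///     }
///     println!("shutting down gracefully");
/// })
/// .unwrap();
/// # }
/// ```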
#[track_caller]
pub fn register_job<F>(service_id: &ServiceId, job: F) -> Result<(), BoxError>
where
    F: FnOnce(CancellationToken) + 'static,
{
    let loc = std::panic::Location::caller();
    let tag = format!("{}:{}", loc.file(), loc.line());

    register_tagged_job(service_id, job, &tag)
}

/// Same as [`PicoContext::register_tagged_job`].
#[allow(deprecated)]
pub fn register_tagged_job<F>(service_id: &ServiceId, job: F, tag: &str) -> Result<(), BoxError>
where
    F: FnOnce(CancellationToken) + 'static,
{
    let (token, handle) = CancellationToken::new();
    let finish_channel = handle.finish_channel.clone();

    let fiber_id = fiber::Builder::new()
        .name(tag)
        .func(move || {
            job(token);
            // Send the shutdown signal to the waiter side.
            _ = finish_channel.send(());
        })
        .start_non_joinable()
        .map_err(tarantool_error_to_box_error)?;

    let plugin = &service_id.plugin;
    let service = &service_id.service;
    let version = &service_id.version;

    let token = FfiBackgroundJobCancellationToken::new(
        fiber_id,
        handle.cancel_channel,
        handle.finish_channel,
    );
    register_background_job_cancellation_token(plugin, service, version, tag, token)?;

    Ok(())
}

fn register_background_job_cancellation_token(
    plugin: &str,
    service: &str,
    version: &str,
    job_tag: &str,
    token: FfiBackgroundJobCancellationToken,
) -> Result<(), BoxError> {
    // SAFETY: safe as long as the picodata version is compatible.
    let rc = unsafe {
        ffi::pico_ffi_background_register_job_cancellation_token(
            plugin.into(),
            service.into(),
            version.into(),
            job_tag.into(),
            token,
        )
    };

    if rc != 0 {
        return Err(BoxError::last());
    }

    Ok(())
}

////////////////////////////////////////////////////////////////////////////////
// cancel_jobs_by_tag
////////////////////////////////////////////////////////////////////////////////

/// Outcome of the request to cancel a set of background jobs.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct JobCancellationResult {
    /// Attempted to cancel this many background jobs.
    pub n_total: u32,
    /// This many jobs didn't finish in time.
    pub n_timeouts: u32,
}

impl JobCancellationResult {
    #[inline(always)]
    pub fn new(n_total: u32, n_timeouts: u32) -> Self {
        Self {
            n_total,
            n_timeouts,
        }
    }
}

/// Same as [`PicoContext::cancel_tagged_jobs`].
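///
/// A minimal usage sketch (the `"metrics"` tag is a placeholder; assumes jobs
/// were registered with [`register_tagged_job`] under that tag):
///
/// ```no_run
/// use std::time::Duration;
/// use picodata_plugin::background::cancel_jobs_by_tag;
///
/// # use picodata_plugin::plugin::interface::ServiceId;
/// # fn example(service_id: &ServiceId) {
/// let res = cancel_jobs_by_tag(service_id, "metrics", Duration::from_secs(5)).unwrap();
/// if res.n_timeouts != 0 {
///     // Some jobs didn't finish within the timeout.
///     eprintln!("{} of {} jobs timed out", res.n_timeouts, res.n_total);
/// }
/// # }
/// ```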
#[inline(always)]
pub fn cancel_jobs_by_tag(
    service_id: &ServiceId,
    job_tag: &str,
    timeout: Duration,
) -> Result<JobCancellationResult, BoxError> {
    cancel_background_jobs_by_tag_inner(
        &service_id.plugin,
        &service_id.service,
        &service_id.version,
        job_tag,
        timeout,
    )
}

/// Same as [`PicoContext::cancel_tagged_jobs`].
pub fn cancel_background_jobs_by_tag_inner(
    plugin: &str,
    service: &str,
    version: &str,
    job_tag: &str,
    timeout: Duration,
) -> Result<JobCancellationResult, BoxError> {
    let mut result = JobCancellationResult::default();
    // SAFETY: safe as long as the picodata version is compatible.
    let rc = unsafe {
        ffi::pico_ffi_background_cancel_jobs_by_tag(
            plugin.into(),
            service.into(),
            version.into(),
            job_tag.into(),
            timeout.as_secs_f64(),
            &mut result,
        )
    };

    if rc != 0 {
        return Err(BoxError::last());
    }

    Ok(result)
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("Some jobs are not fully completed, expected: {0}, completed: {1}")]
    PartialCompleted(usize, usize),
    #[error("timeout")]
    CancellationTimeout,
}

////////////////////////////////////////////////////////////////////////////////
// CancellationToken
////////////////////////////////////////////////////////////////////////////////

/// A token which can be used to signal a cancellation request to the job.
#[non_exhaustive]
#[derive(Debug)]
pub struct CancellationToken {
    cancel_channel: Channel<()>,
}

impl CancellationToken {
    /// Create a cancellation token and cancellation token handle pair.
    /// The user should use the cancellation token to gracefully shut down their job.
    /// The cancellation token handle is used by `picodata` to send a cancel signal to the user job.
    #[allow(deprecated)]
    pub fn new() -> (CancellationToken, CancellationTokenHandle) {
        let (cancel_tx, cancel_rx) = Channel::new(1).into_clones();
        (
            CancellationToken {
                cancel_channel: cancel_rx,
            },
            CancellationTokenHandle {
                cancel_channel: cancel_tx,
                finish_channel: Channel::new(1),
            },
        )
    }

    /// Wait until the token is canceled.
    ///
    /// # Arguments
    ///
    /// * `timeout`: if cancellation does not occur within the specified timeout, return an error
    ///
    /// # Errors
    ///
    /// Returns `Ok` if the token is canceled, or [`Error::CancellationTimeout`] if the timeout is reached.
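    ///
    /// # Examples
    ///
    /// A typical polling loop (a sketch; the periodic work itself is elided):
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// # use picodata_plugin::background::CancellationToken;
    /// # fn job(cancel: CancellationToken) {
    /// // Do a unit of work every second until picodata cancels the job.
    /// while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
    ///     // ... periodic work ...
    /// }
    /// // The token was canceled: clean up and return to finish gracefully.
    /// # }
    /// ```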
    pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
        match self.cancel_channel.recv_timeout(timeout) {
            Err(RecvError::Timeout) => Err(Error::CancellationTimeout),
            _ => Ok(()),
        }
    }
}

#[derive(Debug)]
#[deprecated = "don't use this"]
pub struct CancellationTokenHandle {
    cancel_channel: Channel<()>,
    finish_channel: Channel<()>,
}

#[allow(deprecated)]
impl CancellationTokenHandle {
    /// Cancel the related job and return a backpressure channel.
    /// The caller should wait for a message in the backpressure channel
    /// to make sure the job completed successfully (graceful shutdown occurred).
    pub fn cancel(self) -> Channel<()> {
        let Self {
            cancel_channel,
            finish_channel,
        } = self;
        _ = cancel_channel.send(());
        finish_channel
    }
}

////////////////////////////////////////////////////////////////////////////////
// Java-style API
////////////////////////////////////////////////////////////////////////////////

/// [`ServiceWorkerManager`] allows plugin services
/// to create long-lived jobs and manage their life cycle.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ServiceWorkerManager {
    service_id: ServiceId,
}

impl ServiceWorkerManager {
    #[inline(always)]
    pub(crate) fn new(service_id: ServiceId) -> Self {
        Self { service_id }
    }

    /// Add a new job to the execution.
    /// The job's life cycle is tied to the service life cycle;
    /// this means the job will be canceled just before the service is stopped.
    ///
    /// # Arguments
    ///
    /// * `job`: callback that will be executed in a separate fiber.
    ///   Note that it is your responsibility to organize graceful job shutdown; see
    ///   [`CancellationToken`] for details.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use picodata_plugin::background::CancellationToken;
    ///
    /// # use picodata_plugin::background::ServiceWorkerManager;
    /// # fn test(worker_manager: ServiceWorkerManager) {
    ///
    /// // this job will print "hello" every second,
    /// // and print "bye" after being canceled
    /// fn hello_printer(cancel: CancellationToken) {
    ///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
    ///         println!("hello!");
    ///     }
    ///     println!("job cancelled, bye!")
    /// }
    /// worker_manager.register_job(hello_printer).unwrap();
    ///
    /// # }
    /// ```
    #[track_caller]
    #[inline(always)]
    pub fn register_job<F>(&self, job: F) -> tarantool::Result<()>
    where
        F: FnOnce(CancellationToken) + 'static,
    {
        register_job(&self.service_id, job)?;

        Ok(())
    }

    /// Same as [`ServiceWorkerManager::register_job`], but the caller may provide a special tag.
    /// This tag may be used for manual job cancellation via [`ServiceWorkerManager::cancel_tagged`].
    ///
    /// # Arguments
    ///
    /// * `job`: callback that will be executed in a separate fiber
    /// * `tag`: a tag to associate with the job; a single tag may be associated
    ///   with multiple jobs (see the example below)
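    ///
    /// # Examples
    ///
    /// A sketch pairing a tagged registration with a later cancellation
    /// (the `"metrics"` tag here is just a placeholder):
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use picodata_plugin::background::{CancellationToken, ServiceWorkerManager};
    ///
    /// # fn test(worker_manager: ServiceWorkerManager) {
    /// worker_manager
    ///     .register_tagged_job(
    ///         |cancel: CancellationToken| {
    ///             while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
    ///                 // ... periodic work ...
    ///             }
    ///         },
    ///         "metrics",
    ///     )
    ///     .unwrap();
    ///
    /// // Later, cancel every job registered under the "metrics" tag.
    /// worker_manager.cancel_tagged("metrics", Duration::from_secs(5)).unwrap();
    /// # }
    /// ```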
    #[inline(always)]
    pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> tarantool::Result<()>
    where
        F: FnOnce(CancellationToken) + 'static,
    {
        register_tagged_job(&self.service_id, job, tag)?;

        Ok(())
    }

    /// Cancel all jobs related to the given tag.
    /// This function returns after all related jobs have been gracefully shut down, or
    /// after the `timeout` duration elapses.
    /// May return [`Error::PartialCompleted`] if the timeout is reached, as shown below.
    ///
    /// # Arguments
    ///
    /// * `tag`: determines which jobs should be cancelled
    /// * `timeout`: shutdown timeout
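    ///
    /// # Examples
    ///
    /// A sketch of handling partial completion (the `"metrics"` tag is a placeholder):
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use picodata_plugin::background::{Error, ServiceWorkerManager};
    ///
    /// # fn test(worker_manager: ServiceWorkerManager) {
    /// match worker_manager.cancel_tagged("metrics", Duration::from_secs(5)) {
    ///     Ok(()) => println!("all jobs shut down gracefully"),
    ///     Err(Error::PartialCompleted(total, completed)) => {
    ///         println!("only {completed} of {total} jobs finished in time");
    ///     }
    ///     Err(e) => println!("unexpected error: {e}"),
    /// }
    /// # }
    /// ```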
    pub fn cancel_tagged(&self, tag: &str, timeout: Duration) -> Result<(), Error> {
        let res = cancel_jobs_by_tag(&self.service_id, tag, timeout);
        let res = match res {
            Ok(res) => res,
            Err(e) => {
                let loc = DisplayErrorLocation(&e);
                tarantool::say_error!("unexpected error: {loc}{e}");
                return Ok(());
            }
        };

        if res.n_timeouts != 0 {
            let n_completed = res.n_total - res.n_timeouts;
            return Err(Error::PartialCompleted(res.n_total as _, n_completed as _));
        }

        Ok(())
    }

    /// In case jobs are canceled by `picodata`, use this function to set
    /// the shutdown timeout - the time duration that `picodata` uses to ensure that all
    /// jobs end gracefully.
    ///
    /// By default, a 5-second timeout is used.
    #[deprecated = "use `PicoContext::set_jobs_shutdown_timeout` instead"]
    pub fn set_shutdown_timeout(&self, timeout: Duration) {
        let plugin = &self.service_id.plugin;
        let service = &self.service_id.service;
        let version = &self.service_id.version;
        set_jobs_shutdown_timeout(plugin, service, version, timeout)
    }
}


////////////////////////////////////////////////////////////////////////////////
// set_background_jobs_shutdown_timeout
////////////////////////////////////////////////////////////////////////////////

/// In case jobs are canceled by `picodata`, use this function to set
/// the shutdown timeout - the time duration that `picodata` uses to ensure that all
/// jobs end gracefully.
///
/// By default, a 5-second timeout is used.
///
/// Consider using [`PicoContext::set_jobs_shutdown_timeout`] instead.
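///
/// A minimal usage sketch (the plugin/service/version ids are placeholders):
///
/// ```no_run
/// use std::time::Duration;
/// use picodata_plugin::background::set_jobs_shutdown_timeout;
///
/// // Give background jobs up to 10 seconds to shut down gracefully.
/// set_jobs_shutdown_timeout("my_plugin", "my_service", "0.1.0", Duration::from_secs(10));
/// ```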
pub fn set_jobs_shutdown_timeout(plugin: &str, service: &str, version: &str, timeout: Duration) {
    // SAFETY: safe as long as the picodata version is compatible.
    let rc = unsafe {
        ffi::pico_ffi_background_set_jobs_shutdown_timeout(
            plugin.into(),
            service.into(),
            version.into(),
            timeout.as_secs_f64(),
        )
    };
    debug_assert!(
        rc == 0,
        "return code is only for future compatibility at the moment"
    );
}

////////////////////////////////////////////////////////////////////////////////
// CancellationCallbackState
////////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub struct CancellationCallbackState {
    cancel_channel: Channel<()>,
    finish_channel: Channel<()>,
    status: Cell<CancellationCallbackStatus>,
}

#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub enum CancellationCallbackStatus {
    #[default]
    Initial = 0,
    JobCancelled = 1,
    JobFinished = 2,
}

impl CancellationCallbackState {
    fn new(cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self {
        Self {
            cancel_channel,
            finish_channel,
            status: Cell::new(CancellationCallbackStatus::Initial),
        }
    }

    fn cancellation_callback(&self, action: u64, timeout: Duration) -> Result<(), BoxError> {
        use CancellationCallbackStatus::*;
        let next_status = action;

        if next_status == JobCancelled as u64 {
            debug_assert_eq!(self.status.get(), Initial);
            _ = self.cancel_channel.send(());

            self.status.set(JobCancelled);
        } else if next_status == JobFinished as u64 {
            debug_assert_eq!(self.status.get(), JobCancelled);
            self.finish_channel.recv_timeout(timeout).map_err(|e| {
                BoxError::new(TarantoolErrorCode::Timeout, i_wish_this_was_simpler_and_im_sad_that_i_have_created_this_problem_for_my_self_recv_error_to_string(e))
            })?;

            self.status.set(JobFinished);
        } else {
            return Err(BoxError::new(
                TarantoolErrorCode::IllegalParams,
                format!("unexpected action: {action}"),
            ));
        }

        Ok(())
    }
}

#[inline]
fn i_wish_this_was_simpler_and_im_sad_that_i_have_created_this_problem_for_my_self_recv_error_to_string(
    e: fiber::channel::RecvError,
) -> &'static str {
    match e {
        fiber::channel::RecvError::Timeout => "timeout",
        fiber::channel::RecvError::Disconnected => "disconnected",
    }
}

////////////////////////////////////////////////////////////////////////////////
// ffi wrappers
////////////////////////////////////////////////////////////////////////////////

/// **For internal use**.
#[repr(C)]
pub struct FfiBackgroundJobCancellationToken {
    /// This is just a way to do arbitrarily complex things across the ABI boundary
    /// without introducing a large number of FFI-safe wrappers. This will
    /// always be [`CancellationCallbackState::cancellation_callback`].
    callback: extern "C-unwind" fn(data: *const Self, action: u64, timeout: f64) -> i32,
    drop: extern "C-unwind" fn(*mut Self),

    /// The pointer to the closure object.
    closure_pointer: *mut (),

    /// This is the background job fiber which will be cancelled by this token.
    pub fiber_id: FiberId,
}

impl Drop for FfiBackgroundJobCancellationToken {
    #[inline(always)]
    fn drop(&mut self) {
        (self.drop)(self)
    }
}

impl FfiBackgroundJobCancellationToken {
    fn new(fiber_id: FiberId, cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self {
        let callback_state = CancellationCallbackState::new(cancel_channel, finish_channel);
        let callback = move |action, timeout| {
            let res = callback_state.cancellation_callback(action, timeout);
            if let Err(e) = res {
                e.set_last();
                return -1;
            }

            0
        };

        Self::new_inner(fiber_id, callback)
    }

    /// This function is needed because we need the `F` type parameter so that
    /// we can specialize `callback` and `drop` with it inside this function.
    /// If rust supported something like `type F = type_of(callback);` we
    /// wouldn't need this additional function and would just write this code in
    /// [`Self::new`] above.
    // FIXME: just define an explicit extern "C" fn for cancellation_callback?
    fn new_inner<F>(fiber_id: FiberId, f: F) -> Self
    where
        F: FnMut(u64, Duration) -> i32,
    {
        let closure = Box::new(f);
        let closure_pointer: *mut F = Box::into_raw(closure);

        Self {
            callback: Self::trampoline::<F>,
            drop: Self::drop_handler::<F>,
            closure_pointer: closure_pointer.cast(),

            fiber_id,
        }
    }

    /// An ABI-safe wrapper which calls the rust closure stored in
    /// `closure_pointer`.
    extern "C-unwind" fn trampoline<F>(data: *const Self, action: u64, timeout: f64) -> i32
    where
        F: FnMut(u64, Duration) -> i32,
    {
        // SAFETY: `closure_pointer` was produced by `Box::into_raw::<F>` in
        // `new_inner` and remains valid until `drop_handler` is called.
        let closure_pointer: *mut F = unsafe { (*data).closure_pointer.cast::<F>() };
        let closure = unsafe { &mut *closure_pointer };

        closure(action, Duration::from_secs_f64(timeout))
    }

    extern "C-unwind" fn drop_handler<F>(handler: *mut Self) {
        unsafe {
            let closure_pointer: *mut F = (*handler).closure_pointer.cast::<F>();
            let closure = Box::from_raw(closure_pointer);
            drop(closure);

            if cfg!(debug_assertions) {
                // Overwrite the pointer with garbage so that we fail loudly in case of a bug.
                (*handler).closure_pointer = 0xcccccccccccccccc_u64 as _;
            }
        }
    }

    /// The error is returned via [`BoxError::set_last`].
    #[inline(always)]
    pub fn cancel_job(&self) {
        let rc = (self.callback)(self, CancellationCallbackStatus::JobCancelled as _, 0.0);
        debug_assert!(rc == 0);
    }

    /// The error is returned via [`BoxError::set_last`].
    #[inline(always)]
    #[allow(clippy::result_unit_err)]
    pub fn wait_job_finished(&self, timeout: Duration) -> Result<(), ()> {
        let rc = (self.callback)(
            self,
            CancellationCallbackStatus::JobFinished as _,
            timeout.as_secs_f64(),
        );
        if rc == -1 {
            // The actual error is passed through tarantool. We can't return a BoxError
            // here, because the tarantool-module version may differ from picodata's.
            return Err(());
        }

        Ok(())
    }
}