Skip to main content

picodata_plugin/
background.rs

1//! Background container for long live jobs.
2//! Background container guarantees job liveness across plugin life cycle.
3
4use crate::internal::ffi;
5#[allow(unused_imports)]
6use crate::plugin::interface::PicoContext;
7use crate::plugin::interface::ServiceId;
8use crate::util::tarantool_error_to_box_error;
9use crate::util::DisplayErrorLocation;
10use std::cell::Cell;
11use std::rc::Rc;
12use std::time::Duration;
13use tarantool::error::BoxError;
14use tarantool::error::TarantoolErrorCode;
15use tarantool::fiber;
16use tarantool::fiber::FiberId;
17use tarantool::fiber::{Channel, RecvError};
18use tarantool::util::IntoClones;
19
20/// Same as [`PicoContext::register_job`].
21#[track_caller]
22pub fn register_job<F>(service_id: &ServiceId, job: F) -> Result<(), BoxError>
23where
24    F: FnOnce(CancellationToken) + 'static,
25{
26    let loc = std::panic::Location::caller();
27    let tag = format!("{}:{}", loc.file(), loc.line());
28
29    register_tagged_job(service_id, job, &tag)
30}
31
32/// Same as [`PicoContext::register_tagged_job`].
33#[allow(deprecated)]
34pub fn register_tagged_job<F>(service_id: &ServiceId, job: F, tag: &str) -> Result<(), BoxError>
35where
36    F: FnOnce(CancellationToken) + 'static,
37{
38    let (token, handle) = CancellationToken::new();
39    let finish_event = Rc::clone(&handle.finish_event);
40
41    let fiber_id = fiber::Builder::new()
42        .name(tag)
43        .func(move || {
44            job(token);
45            finish_event.signal();
46        })
47        .start_non_joinable()
48        .map_err(tarantool_error_to_box_error)?;
49
50    let plugin = &service_id.plugin;
51    let service = &service_id.service;
52    let version = &service_id.version;
53
54    let token = FfiBackgroundJobCancellationToken::new(
55        fiber_id,
56        handle.cancel_channel,
57        handle.finish_event,
58    );
59    register_background_job_cancellation_token(plugin, service, version, tag, token)?;
60
61    Ok(())
62}
63
64fn register_background_job_cancellation_token(
65    plugin: &str,
66    service: &str,
67    version: &str,
68    job_tag: &str,
69    token: FfiBackgroundJobCancellationToken,
70) -> Result<(), BoxError> {
71    // SAFETY: safe as long as picodata version is compatible
72    let rc = unsafe {
73        ffi::pico_ffi_background_register_job_cancellation_token(
74            plugin.into(),
75            service.into(),
76            version.into(),
77            job_tag.into(),
78            token,
79        )
80    };
81
82    if rc != 0 {
83        return Err(BoxError::last());
84    }
85
86    Ok(())
87}
88
89////////////////////////////////////////////////////////////////////////////////
90// cancel_jobs_by_tag
91////////////////////////////////////////////////////////////////////////////////
92
/// Summary of an attempt to cancel a group of background jobs.
///
/// The layout is `#[repr(C)]` because the value is filled in through a raw
/// pointer by the FFI cancellation call.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct JobCancellationResult {
    /// Number of background jobs the cancellation was attempted for.
    pub n_total: u32,
    /// Number of those jobs which didn't finish in time.
    pub n_timeouts: u32,
}

impl JobCancellationResult {
    /// Builds a result from the total number of jobs and the number of
    /// jobs that timed out.
    #[inline(always)]
    pub fn new(n_total: u32, n_timeouts: u32) -> Self {
        JobCancellationResult {
            n_total,
            n_timeouts,
        }
    }
}
112
113/// Same as [`PicoContext::cancel_tagged_jobs`].
114#[inline(always)]
115pub fn cancel_jobs_by_tag(
116    service_id: &ServiceId,
117    job_tag: &str,
118    timeout: Duration,
119) -> Result<JobCancellationResult, BoxError> {
120    cancel_background_jobs_by_tag_inner(
121        &service_id.plugin,
122        &service_id.service,
123        &service_id.version,
124        job_tag,
125        timeout,
126    )
127}
128
129/// Same as [`PicoContext::cancel_tagged_jobs`].
130pub fn cancel_background_jobs_by_tag_inner(
131    plugin: &str,
132    service: &str,
133    version: &str,
134    job_tag: &str,
135    timeout: Duration,
136) -> Result<JobCancellationResult, BoxError> {
137    let mut result = JobCancellationResult::default();
138    // SAFETY: safe as long as picodata version is compatible
139    let rc = unsafe {
140        ffi::pico_ffi_background_cancel_jobs_by_tag(
141            plugin.into(),
142            service.into(),
143            version.into(),
144            job_tag.into(),
145            timeout.as_secs_f64(),
146            &mut result,
147        )
148    };
149
150    if rc != 0 {
151        return Err(BoxError::last());
152    }
153
154    Ok(result)
155}
156
/// Errors returned by the job cancellation API of this module.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Not all jobs completed. The first field is the number of jobs that
    /// were expected to complete, the second is the number that did.
    #[error("Some of jobs are not fully completed, expected: {0}, completed: {1}")]
    PartialCompleted(usize, usize),
    /// A wait ended because its timeout elapsed.
    #[error("timeout")]
    CancellationTimeout,
}
164
165////////////////////////////////////////////////////////////////////////////////
166// CancellationToken
167////////////////////////////////////////////////////////////////////////////////
168
169/// A token which can be used to signal a cancellation request to the job.
170#[non_exhaustive]
171#[derive(Debug)]
172pub struct CancellationToken {
173    cancel_channel: Channel<()>,
174}
175
176impl CancellationToken {
177    /// Create a cancellation token and cancellation token handle pair.
178    /// User should use cancellation token for graceful shutdown their job.
179    /// Cancellation token handle used by `picodata` for sending cancel signal to a user job.
180    #[allow(deprecated)]
181    pub fn new() -> (CancellationToken, CancellationTokenHandle) {
182        let (cancel_tx, cancel_rx) = Channel::new(1).into_clones();
183        (
184            CancellationToken {
185                cancel_channel: cancel_rx,
186            },
187            CancellationTokenHandle {
188                cancel_channel: cancel_tx,
189                finish_event: Rc::new(OnceEvent::new()),
190            },
191        )
192    }
193
194    /// Wait until token are canceled.
195    ///
196    /// # Arguments
197    ///
198    /// * `timeout`: if cancellation does not occur within the specified timeout - return error
199    ///
200    /// # Errors
201    ///
202    /// Return `Ok` if token is canceled or [`Error::CancellationTimeout`] if timeout is reached.
203    pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
204        match self.cancel_channel.recv_timeout(timeout) {
205            Err(RecvError::Timeout) => Err(Error::CancellationTimeout),
206            _ => Ok(()),
207        }
208    }
209}
210
211#[derive(Debug)]
212#[deprecated = "don't use this"]
213pub struct CancellationTokenHandle {
214    cancel_channel: Channel<()>,
215    finish_event: Rc<OnceEvent>,
216}
217
218#[allow(deprecated)]
219impl CancellationTokenHandle {
220    /// Cancel related job and return a finish event.
221    /// Caller should wait on the finish event to make sure the job
222    /// is completed successfully (graceful shutdown occurred).
223    pub fn cancel(self) -> Rc<OnceEvent> {
224        let Self {
225            cancel_channel,
226            finish_event,
227        } = self;
228        _ = cancel_channel.send(());
229        finish_event
230    }
231}
232
233////////////////////////////////////////////////////////////////////////////////
234// Java-style API
235////////////////////////////////////////////////////////////////////////////////
236
237/// [`ServiceWorkerManager`] allows plugin services
238/// to create long-live jobs and manage their life cycle.
239#[derive(Clone, Debug)]
240#[non_exhaustive]
241pub struct ServiceWorkerManager {
242    service_id: ServiceId,
243}
244
245impl ServiceWorkerManager {
246    #[inline(always)]
247    pub(crate) fn new(service_id: ServiceId) -> Self {
248        Self { service_id }
249    }
250
251    /// Add a new job to the execution.
252    /// Job work life cycle will be tied to the service life cycle;
253    /// this means that job will be canceled just before service is stopped.
254    ///
255    /// # Arguments
256    ///
257    /// * `job`: callback that will be executed in separated fiber.
258    ///   Note that it is your responsibility to organize job graceful shutdown, see a
259    ///   [`CancellationToken`] for details.
260    ///
261    /// # Examples
262    ///
263    /// ```no_run
264    /// use std::time::Duration;
265    /// use picodata_plugin::background::CancellationToken;
266    ///
267    /// # use picodata_plugin::background::ServiceWorkerManager;
268    /// # fn test(worker_manager: ServiceWorkerManager) {
269    ///
270    /// // this job will print "hello" every second,
271    /// // and print "bye" after being canceled
272    /// fn hello_printer(cancel: CancellationToken) {
273    ///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
274    ///         println!("hello!");
275    ///     }
276    ///     println!("job cancelled, bye!")
277    /// }
278    /// worker_manager.register_job(hello_printer).unwrap();
279    ///
280    /// # }
281    /// ```
282    #[track_caller]
283    #[inline(always)]
284    pub fn register_job<F>(&self, job: F) -> tarantool::Result<()>
285    where
286        F: FnOnce(CancellationToken) + 'static,
287    {
288        register_job(&self.service_id, job)?;
289
290        Ok(())
291    }
292
293    /// Same as [`ServiceWorkerManager::register_job`] but caller may provide a special tag.
294    /// This tag may be used for manual job cancellation using [`ServiceWorkerManager::cancel_tagged`].
295    ///
296    /// # Arguments
297    ///
298    /// * `job`: callback that will be executed in separated fiber
299    /// * `tag`: tag, that will be related to a job, single tag may be related to the multiple jobs
300    #[inline(always)]
301    pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> tarantool::Result<()>
302    where
303        F: FnOnce(CancellationToken) + 'static,
304    {
305        register_tagged_job(&self.service_id, job, tag)?;
306
307        Ok(())
308    }
309
310    /// Cancel all jobs related to the given tag.
311    /// This function return after all related jobs will be gracefully shutdown or
312    /// after `timeout` duration.
313    /// May return [`Error::PartialCompleted`] if timeout is reached.
314    ///
315    /// # Arguments
316    ///
317    /// * `tag`: determine what jobs should be cancelled
318    /// * `timeout`: shutdown timeout
319    pub fn cancel_tagged(&self, tag: &str, timeout: Duration) -> Result<(), Error> {
320        let res = cancel_jobs_by_tag(&self.service_id, tag, timeout);
321        let res = match res {
322            Ok(res) => res,
323            Err(e) => {
324                let loc = DisplayErrorLocation(&e);
325                tarantool::say_error!("unexpected error: {loc}{e}");
326                return Ok(());
327            }
328        };
329
330        if res.n_timeouts != 0 {
331            let n_completed = res.n_total - res.n_timeouts;
332            return Err(Error::PartialCompleted(res.n_total as _, n_completed as _));
333        }
334
335        Ok(())
336    }
337
338    /// In case when jobs were canceled by `picodata` use this function for determine
339    /// a shutdown timeout - time duration that `picodata` uses to ensure that all
340    /// jobs gracefully end.
341    ///
342    /// By default, 5-second timeout are used.
343    #[deprecated = "use `PicoContext::set_jobs_shutdown_timeout` instead"]
344    pub fn set_shutdown_timeout(&self, timeout: Duration) {
345        let plugin = &self.service_id.plugin;
346        let service = &self.service_id.service;
347        let version = &self.service_id.version;
348        set_jobs_shutdown_timeout(plugin, service, version, timeout)
349    }
350}
351
352////////////////////////////////////////////////////////////////////////////////
353// set_background_jobs_shutdown_timeout
354////////////////////////////////////////////////////////////////////////////////
355
356/// In case when jobs were canceled by `picodata` use this function to determine
357/// a shutdown timeout - time duration that `picodata` uses to ensure that all
358/// jobs gracefully end.
359///
360/// By default, 5-second timeout are used.
361///
362/// Consider using [`PicoContext::set_jobs_shutdown_timeout`] instead
363pub fn set_jobs_shutdown_timeout(plugin: &str, service: &str, version: &str, timeout: Duration) {
364    // SAFETY: safe as long as picodata version is compatible
365    let rc = unsafe {
366        ffi::pico_ffi_background_set_jobs_shutdown_timeout(
367            plugin.into(),
368            service.into(),
369            version.into(),
370            timeout.as_secs_f64(),
371        )
372    };
373    debug_assert!(
374        rc == 0,
375        "return code is only for future compatibility at the moment"
376    );
377}
378
379////////////////////////////////////////////////////////////////////////////////
380// OnceEvent
381////////////////////////////////////////////////////////////////////////////////
382
383/// A one-time event that can be signaled once and waited on by multiple fibers.
384/// Late waiters (after signal) return immediately.
385#[derive(Debug)]
386pub struct OnceEvent {
387    finished: Cell<bool>,
388    cond: fiber::Cond,
389}
390
391impl OnceEvent {
392    fn new() -> Self {
393        Self {
394            finished: Cell::new(false),
395            cond: fiber::Cond::new(),
396        }
397    }
398
399    /// Signal that the job has finished. Wakes all waiting fibers.
400    pub fn signal(&self) {
401        self.finished.set(true);
402        self.cond.broadcast();
403    }
404
405    #[inline]
406    pub fn is_finished(&self) -> bool {
407        self.finished.get()
408    }
409
410    pub fn wait_timeout(&self, timeout: Duration) {
411        self.cond.wait_timeout(timeout);
412    }
413}
414
415////////////////////////////////////////////////////////////////////////////////
416// CancellationCallbackState
417////////////////////////////////////////////////////////////////////////////////
418
419#[derive(Debug)]
420pub struct CancellationCallbackState {
421    cancel_channel: Channel<()>,
422    finish_event: Rc<OnceEvent>,
423    status: Cell<CancellationCallbackStatus>,
424}
425
426#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
427pub enum CancellationCallbackStatus {
428    #[default]
429    Initial = 0,
430    JobCancelled = 1,
431    JobFinished = 2,
432}
433
434impl CancellationCallbackState {
435    fn new(cancel_channel: Channel<()>, finish_event: Rc<OnceEvent>) -> Self {
436        Self {
437            cancel_channel,
438            finish_event,
439            status: Cell::new(CancellationCallbackStatus::Initial),
440        }
441    }
442
443    fn cancellation_callback(&self, action: u64, timeout: Duration) -> Result<(), BoxError> {
444        use CancellationCallbackStatus::*;
445        let next_status = action;
446
447        if next_status == JobCancelled as u64 {
448            // Cancellation is idempotent.
449            if self.status.get() == JobCancelled || self.status.get() == JobFinished {
450                return Ok(());
451            }
452            debug_assert_eq!(self.status.get(), Initial);
453            _ = self.cancel_channel.send(());
454
455            self.status.set(JobCancelled);
456        } else if next_status == JobFinished as u64 {
457            if self.status.get() != JobCancelled && self.status.get() != JobFinished {
458                return Err(BoxError::new(
459                    TarantoolErrorCode::IllegalParams,
460                    "cannot wait for job that wasn't cancelled",
461                ));
462            }
463
464            let deadline = fiber::clock().saturating_add(timeout);
465
466            loop {
467                if self.finish_event.is_finished() {
468                    self.status.set(JobFinished);
469                    return Ok(());
470                }
471
472                let remaining = deadline.duration_since(fiber::clock());
473                if remaining.is_zero() {
474                    return Err(BoxError::new(TarantoolErrorCode::Timeout, "timeout"));
475                }
476
477                self.finish_event.wait_timeout(remaining);
478            }
479        } else {
480            return Err(BoxError::new(
481                TarantoolErrorCode::IllegalParams,
482                format!("unexpected action: {action}"),
483            ));
484        }
485
486        Ok(())
487    }
488}
489
490////////////////////////////////////////////////////////////////////////////////
491// ffi wrappers
492////////////////////////////////////////////////////////////////////////////////
493
494/// **For internal use**.
495#[repr(C)]
496pub struct FfiBackgroundJobCancellationToken {
497    /// This is just a way to do arbitrarily complex things across ABI boundary
498    /// without introducing a large amount of FFI-safe wrappers. This will
499    /// always be [`CancellationCallbackState::cancellation_callback`].
500    callback: extern "C-unwind" fn(data: *const Self, action: u64, timeout: f64) -> i32,
501    drop: extern "C-unwind" fn(*mut Self),
502
503    /// The pointer to the closure object.
504    closure_pointer: *mut (),
505
506    /// This is the background job fiber which will be cancelled by this token.
507    pub fiber_id: FiberId,
508}
509
510impl Drop for FfiBackgroundJobCancellationToken {
511    #[inline(always)]
512    fn drop(&mut self) {
513        (self.drop)(self)
514    }
515}
516
517impl FfiBackgroundJobCancellationToken {
518    fn new(fiber_id: FiberId, cancel_channel: Channel<()>, finish_event: Rc<OnceEvent>) -> Self {
519        let callback_state = CancellationCallbackState::new(cancel_channel, finish_event);
520        let callback = move |action, timeout| {
521            let res = callback_state.cancellation_callback(action, timeout);
522            if let Err(e) = res {
523                e.set_last();
524                return -1;
525            }
526
527            0
528        };
529
530        Self::new_inner(fiber_id, callback)
531    }
532
533    /// This function is needed, because we need this `F` type parameter so that
534    /// we can specialize the `callback` and `drop` with it inside this function.
535    /// If rust supported something like `type F = type_of(callback);` we
536    /// wouldn't need this additional function and would just write this code in
537    /// the [`Self::new`] above.
538    // FIXME: just define an explicit extern "C" fn for cancellation_callback?
539    fn new_inner<F>(fiber_id: FiberId, f: F) -> Self
540    where
541        F: FnMut(u64, Duration) -> i32,
542    {
543        let closure = Box::new(f);
544        let closure_pointer: *mut F = Box::into_raw(closure);
545
546        Self {
547            callback: Self::trampoline::<F>,
548            drop: Self::drop_handler::<F>,
549            closure_pointer: closure_pointer.cast(),
550
551            fiber_id,
552        }
553    }
554
555    /// An ABI-safe wrapper which calls the rust closure stored in `handler`.
556    ///
557    /// The result of the closure is copied onto the fiber's region allocation
558    /// and the pointer to that allocation is written into `output`.
559    extern "C-unwind" fn trampoline<F>(data: *const Self, action: u64, timeout: f64) -> i32
560    where
561        F: FnMut(u64, Duration) -> i32,
562    {
563        // This is safe. To verify see `register_rpc_handler` above.
564        let closure_pointer: *mut F = unsafe { (*data).closure_pointer.cast::<F>() };
565        let closure = unsafe { &mut *closure_pointer };
566
567        closure(action, Duration::from_secs_f64(timeout))
568    }
569
570    extern "C-unwind" fn drop_handler<F>(handler: *mut Self) {
571        unsafe {
572            let closure_pointer: *mut F = (*handler).closure_pointer.cast::<F>();
573            let closure = Box::from_raw(closure_pointer);
574            drop(closure);
575
576            if cfg!(debug_assertions) {
577                // Overwrite the pointer with garbage so that we fail loudly is case of a bug
578                (*handler).closure_pointer = 0xcccccccccccccccc_u64 as _;
579            }
580        }
581    }
582
583    /// The error is returned via [`BoxError::set_last`].
584    #[inline(always)]
585    pub fn cancel_job(&self) {
586        let rc = (self.callback)(self, CancellationCallbackStatus::JobCancelled as _, 0.0);
587        debug_assert!(rc == 0);
588    }
589
590    /// The error is returned via [`BoxError::set_last`].
591    #[inline(always)]
592    #[allow(clippy::result_unit_err)]
593    pub fn wait_job_finished(&self, timeout: Duration) -> Result<(), ()> {
594        let rc = (self.callback)(
595            self,
596            CancellationCallbackStatus::JobFinished as _,
597            timeout.as_secs_f64(),
598        );
599        if rc == -1 {
600            // Actual error is passed through tarantool. Can't return BoxError
601            // here, because tarantool-module version may be different in picodata.
602            return Err(());
603        }
604
605        Ok(())
606    }
607}