use crate::internal::ffi;
#[allow(unused_imports)]
use crate::plugin::interface::PicoContext;
use crate::plugin::interface::ServiceId;
use crate::util::tarantool_error_to_box_error;
use crate::util::DisplayErrorLocation;
use std::cell::Cell;
use std::time::Duration;
use tarantool::error::BoxError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::fiber::FiberId;
use tarantool::fiber::{Channel, RecvError};
use tarantool::util::IntoClones;
/// Starts a background job for `service_id`, tagging it with the
/// caller's source location (`file:line`).
///
/// Jobs registered from the same call site therefore share a tag and
/// can be cancelled together via [`cancel_jobs_by_tag`].
#[track_caller]
pub fn register_job<F>(service_id: &ServiceId, job: F) -> Result<(), BoxError>
where
    F: FnOnce(CancellationToken) + 'static,
{
    let location = std::panic::Location::caller();
    let tag = format!("{}:{}", location.file(), location.line());
    register_tagged_job(service_id, job, &tag)
}
#[allow(deprecated)]
pub fn register_tagged_job<F>(service_id: &ServiceId, job: F, tag: &str) -> Result<(), BoxError>
where
F: FnOnce(CancellationToken) + 'static,
{
let (token, handle) = CancellationToken::new();
let finish_channel = handle.finish_channel.clone();
let fiber_id = fiber::Builder::new()
.name(tag)
.func(move || {
job(token);
_ = finish_channel.send(());
})
.start_non_joinable()
.map_err(tarantool_error_to_box_error)?;
let plugin = &service_id.plugin;
let service = &service_id.service;
let version = &service_id.version;
let token = FfiBackgroundJobCancellationToken::new(
fiber_id,
handle.cancel_channel,
handle.finish_channel,
);
register_background_job_cancellation_token(plugin, service, version, tag, token)?;
Ok(())
}
/// Registers the job's cancellation `token` with picodata via FFI,
/// keyed by `(plugin, service, version, job_tag)`.
///
/// On a non-zero return code the error details are taken from the
/// "last error" slot ([`BoxError::last`]).
fn register_background_job_cancellation_token(
    plugin: &str,
    service: &str,
    version: &str,
    job_tag: &str,
    token: FfiBackgroundJobCancellationToken,
) -> Result<(), BoxError> {
    // SAFETY: plain FFI call; string arguments are converted to the
    // FFI-expected representation for the duration of the call.
    let rc = unsafe {
        ffi::pico_ffi_background_register_job_cancellation_token(
            plugin.into(),
            service.into(),
            version.into(),
            job_tag.into(),
            token,
        )
    };
    match rc {
        0 => Ok(()),
        _ => Err(BoxError::last()),
    }
}
/// Outcome of a bulk job-cancellation request.
///
/// `#[repr(C)]` because it is filled in by picodata across the FFI
/// boundary (see `pico_ffi_background_cancel_jobs_by_tag`).
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct JobCancellationResult {
    /// Total number of jobs that were asked to cancel.
    pub n_total: u32,
    /// How many of those did not finish before the timeout.
    pub n_timeouts: u32,
}

impl JobCancellationResult {
    /// Constructs a result from the raw counters.
    #[inline(always)]
    pub fn new(n_total: u32, n_timeouts: u32) -> Self {
        Self { n_total, n_timeouts }
    }
}
/// Cancels all background jobs of `service_id` registered under
/// `job_tag`, waiting up to `timeout` for them to finish.
///
/// Thin wrapper over [`cancel_background_jobs_by_tag_inner`] that
/// unpacks the service identity.
#[inline(always)]
pub fn cancel_jobs_by_tag(
    service_id: &ServiceId,
    job_tag: &str,
    timeout: Duration,
) -> Result<JobCancellationResult, BoxError> {
    let plugin = &service_id.plugin;
    let service = &service_id.service;
    let version = &service_id.version;
    cancel_background_jobs_by_tag_inner(plugin, service, version, job_tag, timeout)
}
/// Asks picodata (via FFI) to cancel all background jobs registered
/// under `job_tag` for the given service, waiting up to `timeout`.
///
/// Returns the counters reported by picodata; on a non-zero return
/// code the error is taken from the "last error" slot.
pub fn cancel_background_jobs_by_tag_inner(
    plugin: &str,
    service: &str,
    version: &str,
    job_tag: &str,
    timeout: Duration,
) -> Result<JobCancellationResult, BoxError> {
    let mut result = JobCancellationResult::default();
    // SAFETY: `result` is a live local the callee writes into; string
    // arguments are converted to the FFI-expected representation.
    let rc = unsafe {
        ffi::pico_ffi_background_cancel_jobs_by_tag(
            plugin.into(),
            service.into(),
            version.into(),
            job_tag.into(),
            timeout.as_secs_f64(),
            &mut result,
        )
    };
    if rc == 0 {
        Ok(result)
    } else {
        Err(BoxError::last())
    }
}
/// Errors returned by the job-cancellation API
/// (see [`ServiceWorkerManager::cancel_tagged`]).
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Not all jobs finished within the timeout: `(expected, completed)`.
    #[error("Some of jobs are not fully completed, expected: {0}, completed: {1}")]
    PartialCompleted(usize, usize),
    /// Waiting for a cancellation signal timed out.
    #[error("timeout")]
    CancellationTimeout,
}
#[non_exhaustive]
#[derive(Debug)]
pub struct CancellationToken {
cancel_channel: Channel<()>,
}
impl CancellationToken {
#[allow(deprecated)]
pub fn new() -> (CancellationToken, CancellationTokenHandle) {
let (cancel_tx, cancel_rx) = Channel::new(1).into_clones();
(
CancellationToken {
cancel_channel: cancel_rx,
},
CancellationTokenHandle {
cancel_channel: cancel_tx,
finish_channel: Channel::new(1),
},
)
}
pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
match self.cancel_channel.recv_timeout(timeout) {
Err(RecvError::Timeout) => Err(Error::CancellationTimeout),
_ => Ok(()),
}
}
}
/// Sender side of a [`CancellationToken`] pair, kept by the job
/// registration machinery.
#[derive(Debug)]
#[deprecated = "don't use this"]
pub struct CancellationTokenHandle {
    // Signalled to request cancellation of the job.
    cancel_channel: Channel<()>,
    // The job fiber sends on this channel when the job body returns.
    finish_channel: Channel<()>,
}
#[allow(deprecated)]
impl CancellationTokenHandle {
    /// Requests cancellation of the associated job and returns the
    /// channel on which the job fiber reports completion.
    pub fn cancel(self) -> Channel<()> {
        // Best-effort: the send can only fail if the token side is
        // already gone, in which case there's nothing to cancel.
        _ = self.cancel_channel.send(());
        self.finish_channel
    }
}
/// Per-service facade for registering and cancelling background jobs.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ServiceWorkerManager {
    // Identifies the plugin service all operations are scoped to.
    service_id: ServiceId,
}
impl ServiceWorkerManager {
#[inline(always)]
pub(crate) fn new(service_id: ServiceId) -> Self {
Self { service_id }
}
#[track_caller]
#[inline(always)]
pub fn register_job<F>(&self, job: F) -> tarantool::Result<()>
where
F: FnOnce(CancellationToken) + 'static,
{
register_job(&self.service_id, job)?;
Ok(())
}
#[inline(always)]
pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> tarantool::Result<()>
where
F: FnOnce(CancellationToken) + 'static,
{
register_tagged_job(&self.service_id, job, tag)?;
Ok(())
}
pub fn cancel_tagged(&self, tag: &str, timeout: Duration) -> Result<(), Error> {
let res = cancel_jobs_by_tag(&self.service_id, tag, timeout);
let res = match res {
Ok(res) => res,
Err(e) => {
let loc = DisplayErrorLocation(&e);
tarantool::say_error!("unexpected error: {loc}{e}");
return Ok(());
}
};
if res.n_timeouts != 0 {
let n_completed = res.n_total - res.n_timeouts;
return Err(Error::PartialCompleted(res.n_total as _, n_completed as _));
}
Ok(())
}
#[deprecated = "use `PicoContext::set_jobs_shutdown_timeout` instead"]
pub fn set_shutdown_timeout(&self, timeout: Duration) {
let plugin = &self.service_id.plugin;
let service = &self.service_id.service;
let version = &self.service_id.version;
set_jobs_shutdown_timeout(plugin, service, version, timeout)
}
}
/// Configures (via FFI) how long picodata waits for the given
/// service's background jobs to finish during shutdown.
pub fn set_jobs_shutdown_timeout(plugin: &str, service: &str, version: &str, timeout: Duration) {
    let timeout_secs = timeout.as_secs_f64();
    // SAFETY: plain FFI call; string arguments are converted to the
    // FFI-expected representation for the duration of the call.
    let rc = unsafe {
        ffi::pico_ffi_background_set_jobs_shutdown_timeout(
            plugin.into(),
            service.into(),
            version.into(),
            timeout_secs,
        )
    };
    debug_assert!(
        rc == 0,
        "return code is only for future compatibility at the moment"
    );
}
/// State driving the two-phase cancellation protocol (signal cancel,
/// then wait for finish) on behalf of the FFI token's callback.
#[derive(Debug)]
pub struct CancellationCallbackState {
    // Signalled to request cancellation of the job.
    cancel_channel: Channel<()>,
    // The job fiber sends on this channel when the job body returns.
    finish_channel: Channel<()>,
    // Current position in the Initial -> JobCancelled -> JobFinished
    // sequence; transitions are only checked with debug_asserts.
    status: Cell<CancellationCallbackStatus>,
}
/// States of the cancellation protocol. The numeric values double as
/// the `action` codes passed through the FFI callback, so they must
/// not be changed.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub enum CancellationCallbackStatus {
    /// Job registered, cancellation not yet requested.
    #[default]
    Initial = 0,
    /// Cancellation has been signalled to the job.
    JobCancelled = 1,
    /// The job fiber has reported completion.
    JobFinished = 2,
}
impl CancellationCallbackState {
    /// Bundles the job's channel pair with a fresh `Initial` status.
    fn new(cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self {
        Self {
            cancel_channel,
            finish_channel,
            status: Cell::new(CancellationCallbackStatus::Initial),
        }
    }

    /// Advances the cancellation state machine.
    ///
    /// `action` encodes the requested transition (the discriminants of
    /// [`CancellationCallbackStatus`]):
    /// - `JobCancelled`: signal the cancel channel (best-effort) and
    ///   record the transition; `timeout` is unused in this branch.
    /// - `JobFinished`: block up to `timeout` waiting for the job
    ///   fiber to report completion on the finish channel.
    ///
    /// Any other `action` value yields an `IllegalParams` error. The
    /// expected transition order is only checked with `debug_assert`s.
    fn cancellation_callback(&self, action: u64, timeout: Duration) -> Result<(), BoxError> {
        use CancellationCallbackStatus::*;
        let next_status = action;
        if next_status == JobCancelled as u64 {
            debug_assert_eq!(self.status.get(), Initial);
            // Best-effort: fails only if the token side is already gone.
            _ = self.cancel_channel.send(());
            self.status.set(JobCancelled);
        } else if next_status == JobFinished as u64 {
            debug_assert_eq!(self.status.get(), JobCancelled);
            // NOTE(review): a `Disconnected` recv error is also reported
            // under the `Timeout` error code (with "disconnected" as the
            // message) — confirm this conflation is intentional.
            self.finish_channel.recv_timeout(timeout).map_err(|e| {
                BoxError::new(TarantoolErrorCode::Timeout, i_wish_this_was_simpler_and_im_sad_that_i_have_created_this_problem_for_my_self_recv_error_to_string(e))
            })?;
            self.status.set(JobFinished);
        } else {
            return Err(BoxError::new(
                TarantoolErrorCode::IllegalParams,
                format!("unexpected action: {action}"),
            ));
        }
        Ok(())
    }
}
/// Maps a channel receive error to a static message suitable for
/// building a `BoxError`.
#[inline]
fn i_wish_this_was_simpler_and_im_sad_that_i_have_created_this_problem_for_my_self_recv_error_to_string(
    e: fiber::channel::RecvError,
) -> &'static str {
    use fiber::channel::RecvError::*;
    match e {
        Timeout => "timeout",
        Disconnected => "disconnected",
    }
}
/// FFI-safe, type-erased cancellation token handed over to picodata.
///
/// `#[repr(C)]` because the layout is read across the FFI boundary:
/// a callback + drop function pair operating on an erased closure.
#[repr(C)]
pub struct FfiBackgroundJobCancellationToken {
    // Invoked with an action code (discriminants of
    // `CancellationCallbackStatus`) and a timeout in seconds;
    // returns 0 on success, -1 on error.
    callback: extern "C-unwind" fn(data: *const Self, action: u64, timeout: f64) -> i32,
    // Releases the boxed closure behind `closure_pointer`.
    drop: extern "C-unwind" fn(*mut Self),
    // Type-erased `Box<F>` where `F: FnMut(u64, Duration) -> i32`.
    closure_pointer: *mut (),
    pub fiber_id: FiberId,
}
impl Drop for FfiBackgroundJobCancellationToken {
    #[inline(always)]
    fn drop(&mut self) {
        // Delegates to the stored drop handler, which frees the
        // type-erased closure allocation.
        (self.drop)(self)
    }
}
impl FfiBackgroundJobCancellationToken {
    /// Wraps a freshly registered job's channel pair into an FFI-safe
    /// token whose callback drives the cancellation protocol
    /// (see [`CancellationCallbackState::cancellation_callback`]).
    fn new(fiber_id: FiberId, cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self {
        let callback_state = CancellationCallbackState::new(cancel_channel, finish_channel);
        // Adapt `Result` to the C-style 0 / -1 convention, recording
        // the error via `set_last` so the caller can retrieve it.
        let callback = move |action, timeout| {
            let res = callback_state.cancellation_callback(action, timeout);
            if let Err(e) = res {
                e.set_last();
                return -1;
            }
            0
        };
        Self::new_inner(fiber_id, callback)
    }

    /// Type-erases `f` behind a raw pointer plus monomorphized
    /// trampoline/drop functions, producing the `#[repr(C)]` token.
    fn new_inner<F>(fiber_id: FiberId, f: F) -> Self
    where
        F: FnMut(u64, Duration) -> i32,
    {
        let closure = Box::new(f);
        let closure_pointer: *mut F = Box::into_raw(closure);
        Self {
            callback: Self::trampoline::<F>,
            drop: Self::drop_handler::<F>,
            closure_pointer: closure_pointer.cast(),
            fiber_id,
        }
    }

    /// C-callable shim: recovers the concrete closure type `F` from
    /// the erased pointer and invokes it.
    extern "C-unwind" fn trampoline<F>(data: *const Self, action: u64, timeout: f64) -> i32
    where
        F: FnMut(u64, Duration) -> i32,
    {
        // SAFETY: `closure_pointer` was produced by `Box::into_raw::<F>`
        // in `new_inner` and is only freed in `drop_handler`, so it is
        // a valid `*mut F` here.
        let closure_pointer: *mut F = unsafe { (*data).closure_pointer.cast::<F>() };
        let closure = unsafe { &mut *closure_pointer };
        closure(action, Duration::from_secs_f64(timeout))
    }

    /// Frees the boxed closure created in `new_inner`.
    extern "C-unwind" fn drop_handler<F>(handler: *mut Self) {
        unsafe {
            // SAFETY: reverses the `Box::into_raw` from `new_inner`.
            // Must run at most once, which the `Drop` impl guarantees.
            let closure_pointer: *mut F = (*handler).closure_pointer.cast::<F>();
            let closure = Box::from_raw(closure_pointer);
            drop(closure);
            if cfg!(debug_assertions) {
                // Poison the pointer so a use-after-free is loud in
                // debug builds.
                (*handler).closure_pointer = 0xcccccccccccccccc_u64 as _;
            }
        }
    }

    /// Requests cancellation of the job (does not wait for it).
    #[inline(always)]
    pub fn cancel_job(&self) {
        let rc = (self.callback)(self, CancellationCallbackStatus::JobCancelled as _, 0.0);
        debug_assert!(rc == 0);
    }

    /// Blocks up to `timeout` waiting for the job fiber to report
    /// completion.
    ///
    /// Returns `Err(())` on failure; the error details are recorded
    /// via the "last error" mechanism (`set_last` in the callback).
    #[inline(always)]
    #[allow(clippy::result_unit_err)]
    pub fn wait_job_finished(&self, timeout: Duration) -> Result<(), ()> {
        let rc = (self.callback)(
            self,
            CancellationCallbackStatus::JobFinished as _,
            timeout.as_secs_f64(),
        );
        if rc == -1 {
            return Err(());
        }
        Ok(())
    }
}