1use crate::internal::ffi;
5#[allow(unused_imports)]
6use crate::plugin::interface::PicoContext;
7use crate::plugin::interface::ServiceId;
8use crate::util::tarantool_error_to_box_error;
9use crate::util::DisplayErrorLocation;
10use std::cell::Cell;
11use std::time::Duration;
12use tarantool::error::BoxError;
13use tarantool::error::TarantoolErrorCode;
14use tarantool::fiber;
15use tarantool::fiber::FiberId;
16use tarantool::fiber::{Channel, RecvError};
17use tarantool::util::IntoClones;
18
19#[track_caller]
21pub fn register_job<F>(service_id: &ServiceId, job: F) -> Result<(), BoxError>
22where
23 F: FnOnce(CancellationToken) + 'static,
24{
25 let loc = std::panic::Location::caller();
26 let tag = format!("{}:{}", loc.file(), loc.line());
27
28 register_tagged_job(service_id, job, &tag)
29}
30
31#[allow(deprecated)]
33pub fn register_tagged_job<F>(service_id: &ServiceId, job: F, tag: &str) -> Result<(), BoxError>
34where
35 F: FnOnce(CancellationToken) + 'static,
36{
37 let (token, handle) = CancellationToken::new();
38 let finish_channel = handle.finish_channel.clone();
39
40 let fiber_id = fiber::Builder::new()
41 .name(tag)
42 .func(move || {
43 job(token);
44 _ = finish_channel.send(());
46 })
47 .start_non_joinable()
48 .map_err(tarantool_error_to_box_error)?;
49
50 let plugin = &service_id.plugin;
51 let service = &service_id.service;
52 let version = &service_id.version;
53
54 let token = FfiBackgroundJobCancellationToken::new(
55 fiber_id,
56 handle.cancel_channel,
57 handle.finish_channel,
58 );
59 register_background_job_cancellation_token(plugin, service, version, tag, token)?;
60
61 Ok(())
62}
63
64fn register_background_job_cancellation_token(
65 plugin: &str,
66 service: &str,
67 version: &str,
68 job_tag: &str,
69 token: FfiBackgroundJobCancellationToken,
70) -> Result<(), BoxError> {
71 let rc = unsafe {
73 ffi::pico_ffi_background_register_job_cancellation_token(
74 plugin.into(),
75 service.into(),
76 version.into(),
77 job_tag.into(),
78 token,
79 )
80 };
81
82 if rc != 0 {
83 return Err(BoxError::last());
84 }
85
86 Ok(())
87}
88
/// Outcome of a request to cancel background jobs.
///
/// The layout is `#[repr(C)]` because the value is filled in through an FFI
/// out-parameter.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct JobCancellationResult {
    /// Total number of jobs covered by the cancellation request.
    pub n_total: u32,
    /// Number of those jobs that were counted as timed out.
    pub n_timeouts: u32,
}

impl JobCancellationResult {
    /// Builds a result from the total job count and the timeout count.
    #[inline(always)]
    pub fn new(n_total: u32, n_timeouts: u32) -> Self {
        Self { n_total, n_timeouts }
    }
}
112
113#[inline(always)]
115pub fn cancel_jobs_by_tag(
116 service_id: &ServiceId,
117 job_tag: &str,
118 timeout: Duration,
119) -> Result<JobCancellationResult, BoxError> {
120 cancel_background_jobs_by_tag_inner(
121 &service_id.plugin,
122 &service_id.service,
123 &service_id.version,
124 job_tag,
125 timeout,
126 )
127}
128
129pub fn cancel_background_jobs_by_tag_inner(
131 plugin: &str,
132 service: &str,
133 version: &str,
134 job_tag: &str,
135 timeout: Duration,
136) -> Result<JobCancellationResult, BoxError> {
137 let mut result = JobCancellationResult::default();
138 let rc = unsafe {
140 ffi::pico_ffi_background_cancel_jobs_by_tag(
141 plugin.into(),
142 service.into(),
143 version.into(),
144 job_tag.into(),
145 timeout.as_secs_f64(),
146 &mut result,
147 )
148 };
149
150 if rc != 0 {
151 return Err(BoxError::last());
152 }
153
154 Ok(result)
155}
156
157#[derive(thiserror::Error, Debug)]
158pub enum Error {
159 #[error("Some of jobs are not fully completed, expected: {0}, completed: {1}")]
160 PartialCompleted(usize, usize),
161 #[error("timeout")]
162 CancellationTimeout,
163}
164
165#[non_exhaustive]
171#[derive(Debug)]
172pub struct CancellationToken {
173 cancel_channel: Channel<()>,
174}
175
176impl CancellationToken {
177 #[allow(deprecated)]
181 pub fn new() -> (CancellationToken, CancellationTokenHandle) {
182 let (cancel_tx, cancel_rx) = Channel::new(1).into_clones();
183 (
184 CancellationToken {
185 cancel_channel: cancel_rx,
186 },
187 CancellationTokenHandle {
188 cancel_channel: cancel_tx,
189 finish_channel: Channel::new(1),
190 },
191 )
192 }
193
194 pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
204 match self.cancel_channel.recv_timeout(timeout) {
205 Err(RecvError::Timeout) => Err(Error::CancellationTimeout),
206 _ => Ok(()),
207 }
208 }
209}
210
211#[derive(Debug)]
212#[deprecated = "don't use this"]
213pub struct CancellationTokenHandle {
214 cancel_channel: Channel<()>,
215 finish_channel: Channel<()>,
216}
217
218#[allow(deprecated)]
219impl CancellationTokenHandle {
220 pub fn cancel(self) -> Channel<()> {
224 let Self {
225 cancel_channel,
226 finish_channel,
227 } = self;
228 _ = cancel_channel.send(());
229 finish_channel
230 }
231}
232
233#[derive(Clone, Debug)]
240#[non_exhaustive]
241pub struct ServiceWorkerManager {
242 service_id: ServiceId,
243}
244
245impl ServiceWorkerManager {
246 #[inline(always)]
247 pub(crate) fn new(service_id: ServiceId) -> Self {
248 Self { service_id }
249 }
250
251 #[track_caller]
283 #[inline(always)]
284 pub fn register_job<F>(&self, job: F) -> tarantool::Result<()>
285 where
286 F: FnOnce(CancellationToken) + 'static,
287 {
288 register_job(&self.service_id, job)?;
289
290 Ok(())
291 }
292
293 #[inline(always)]
301 pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> tarantool::Result<()>
302 where
303 F: FnOnce(CancellationToken) + 'static,
304 {
305 register_tagged_job(&self.service_id, job, tag)?;
306
307 Ok(())
308 }
309
310 pub fn cancel_tagged(&self, tag: &str, timeout: Duration) -> Result<(), Error> {
320 let res = cancel_jobs_by_tag(&self.service_id, tag, timeout);
321 let res = match res {
322 Ok(res) => res,
323 Err(e) => {
324 let loc = DisplayErrorLocation(&e);
325 tarantool::say_error!("unexpected error: {loc}{e}");
326 return Ok(());
327 }
328 };
329
330 if res.n_timeouts != 0 {
331 let n_completed = res.n_total - res.n_timeouts;
332 return Err(Error::PartialCompleted(res.n_total as _, n_completed as _));
333 }
334
335 Ok(())
336 }
337
338 #[deprecated = "use `PicoContext::set_jobs_shutdown_timeout` instead"]
344 pub fn set_shutdown_timeout(&self, timeout: Duration) {
345 let plugin = &self.service_id.plugin;
346 let service = &self.service_id.service;
347 let version = &self.service_id.version;
348 set_jobs_shutdown_timeout(plugin, service, version, timeout)
349 }
350}
351
352pub fn set_jobs_shutdown_timeout(plugin: &str, service: &str, version: &str, timeout: Duration) {
364 let rc = unsafe {
366 ffi::pico_ffi_background_set_jobs_shutdown_timeout(
367 plugin.into(),
368 service.into(),
369 version.into(),
370 timeout.as_secs_f64(),
371 )
372 };
373 debug_assert!(
374 rc == 0,
375 "return code is only for future compatibility at the moment"
376 );
377}
378
379#[derive(Debug)]
384pub struct CancellationCallbackState {
385 cancel_channel: Channel<()>,
386 finish_channel: Channel<()>,
387 status: Cell<CancellationCallbackStatus>,
388}
389
/// Progress of a job's cancellation as tracked by
/// [`CancellationCallbackState`].
///
/// The numeric values double as the `action` codes accepted by the
/// cancellation callback.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CancellationCallbackStatus {
    /// Nothing has been requested yet.
    #[default]
    Initial = 0,
    /// The cancellation signal has been sent to the job.
    JobCancelled = 1,
    /// The job's completion message has been received.
    JobFinished = 2,
}
397
398impl CancellationCallbackState {
399 fn new(cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self {
400 Self {
401 cancel_channel,
402 finish_channel,
403 status: Cell::new(CancellationCallbackStatus::Initial),
404 }
405 }
406
407 fn cancellation_callback(&self, action: u64, timeout: Duration) -> Result<(), BoxError> {
408 use CancellationCallbackStatus::*;
409 let next_status = action;
410
411 if next_status == JobCancelled as u64 {
412 debug_assert_eq!(self.status.get(), Initial);
413 _ = self.cancel_channel.send(());
414
415 self.status.set(JobCancelled);
416 } else if next_status == JobFinished as u64 {
417 debug_assert_eq!(self.status.get(), JobCancelled);
418 self.finish_channel.recv_timeout(timeout).map_err(|e| {
419 BoxError::new(TarantoolErrorCode::Timeout, i_wish_this_was_simpler_and_im_sad_that_i_have_created_this_problem_for_my_self_recv_error_to_string(e))
420 })?;
421
422 self.status.set(JobFinished);
423 } else {
424 return Err(BoxError::new(
425 TarantoolErrorCode::IllegalParams,
426 format!("unexpected action: {action}"),
427 ));
428 }
429
430 Ok(())
431 }
432}
433
434#[inline]
435fn i_wish_this_was_simpler_and_im_sad_that_i_have_created_this_problem_for_my_self_recv_error_to_string(
436 e: fiber::channel::RecvError,
437) -> &'static str {
438 match e {
439 fiber::channel::RecvError::Timeout => "timeout",
440 fiber::channel::RecvError::Disconnected => "disconnected",
441 }
442}
443
444#[repr(C)]
450pub struct FfiBackgroundJobCancellationToken {
451 callback: extern "C-unwind" fn(data: *const Self, action: u64, timeout: f64) -> i32,
455 drop: extern "C-unwind" fn(*mut Self),
456
457 closure_pointer: *mut (),
459
460 pub fiber_id: FiberId,
462}
463
464impl Drop for FfiBackgroundJobCancellationToken {
465 #[inline(always)]
466 fn drop(&mut self) {
467 (self.drop)(self)
468 }
469}
470
471impl FfiBackgroundJobCancellationToken {
472 fn new(fiber_id: FiberId, cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self {
473 let callback_state = CancellationCallbackState::new(cancel_channel, finish_channel);
474 let callback = move |action, timeout| {
475 let res = callback_state.cancellation_callback(action, timeout);
476 if let Err(e) = res {
477 e.set_last();
478 return -1;
479 }
480
481 0
482 };
483
484 Self::new_inner(fiber_id, callback)
485 }
486
487 fn new_inner<F>(fiber_id: FiberId, f: F) -> Self
494 where
495 F: FnMut(u64, Duration) -> i32,
496 {
497 let closure = Box::new(f);
498 let closure_pointer: *mut F = Box::into_raw(closure);
499
500 Self {
501 callback: Self::trampoline::<F>,
502 drop: Self::drop_handler::<F>,
503 closure_pointer: closure_pointer.cast(),
504
505 fiber_id,
506 }
507 }
508
509 extern "C-unwind" fn trampoline<F>(data: *const Self, action: u64, timeout: f64) -> i32
514 where
515 F: FnMut(u64, Duration) -> i32,
516 {
517 let closure_pointer: *mut F = unsafe { (*data).closure_pointer.cast::<F>() };
519 let closure = unsafe { &mut *closure_pointer };
520
521 closure(action, Duration::from_secs_f64(timeout))
522 }
523
524 extern "C-unwind" fn drop_handler<F>(handler: *mut Self) {
525 unsafe {
526 let closure_pointer: *mut F = (*handler).closure_pointer.cast::<F>();
527 let closure = Box::from_raw(closure_pointer);
528 drop(closure);
529
530 if cfg!(debug_assertions) {
531 (*handler).closure_pointer = 0xcccccccccccccccc_u64 as _;
533 }
534 }
535 }
536
537 #[inline(always)]
539 pub fn cancel_job(&self) {
540 let rc = (self.callback)(self, CancellationCallbackStatus::JobCancelled as _, 0.0);
541 debug_assert!(rc == 0);
542 }
543
544 #[inline(always)]
546 #[allow(clippy::result_unit_err)]
547 pub fn wait_job_finished(&self, timeout: Duration) -> Result<(), ()> {
548 let rc = (self.callback)(
549 self,
550 CancellationCallbackStatus::JobFinished as _,
551 timeout.as_secs_f64(),
552 );
553 if rc == -1 {
554 return Err(());
557 }
558
559 Ok(())
560 }
561}