1use crate::internal::ffi;
5#[allow(unused_imports)]
6use crate::plugin::interface::PicoContext;
7use crate::plugin::interface::ServiceId;
8use crate::util::tarantool_error_to_box_error;
9use crate::util::DisplayErrorLocation;
10use std::cell::Cell;
11use std::rc::Rc;
12use std::time::Duration;
13use tarantool::error::BoxError;
14use tarantool::error::TarantoolErrorCode;
15use tarantool::fiber;
16use tarantool::fiber::FiberId;
17use tarantool::fiber::{Channel, RecvError};
18use tarantool::util::IntoClones;
19
20#[track_caller]
22pub fn register_job<F>(service_id: &ServiceId, job: F) -> Result<(), BoxError>
23where
24 F: FnOnce(CancellationToken) + 'static,
25{
26 let loc = std::panic::Location::caller();
27 let tag = format!("{}:{}", loc.file(), loc.line());
28
29 register_tagged_job(service_id, job, &tag)
30}
31
32#[allow(deprecated)]
34pub fn register_tagged_job<F>(service_id: &ServiceId, job: F, tag: &str) -> Result<(), BoxError>
35where
36 F: FnOnce(CancellationToken) + 'static,
37{
38 let (token, handle) = CancellationToken::new();
39 let finish_event = Rc::clone(&handle.finish_event);
40
41 let fiber_id = fiber::Builder::new()
42 .name(tag)
43 .func(move || {
44 job(token);
45 finish_event.signal();
46 })
47 .start_non_joinable()
48 .map_err(tarantool_error_to_box_error)?;
49
50 let plugin = &service_id.plugin;
51 let service = &service_id.service;
52 let version = &service_id.version;
53
54 let token = FfiBackgroundJobCancellationToken::new(
55 fiber_id,
56 handle.cancel_channel,
57 handle.finish_event,
58 );
59 register_background_job_cancellation_token(plugin, service, version, tag, token)?;
60
61 Ok(())
62}
63
64fn register_background_job_cancellation_token(
65 plugin: &str,
66 service: &str,
67 version: &str,
68 job_tag: &str,
69 token: FfiBackgroundJobCancellationToken,
70) -> Result<(), BoxError> {
71 let rc = unsafe {
73 ffi::pico_ffi_background_register_job_cancellation_token(
74 plugin.into(),
75 service.into(),
76 version.into(),
77 job_tag.into(),
78 token,
79 )
80 };
81
82 if rc != 0 {
83 return Err(BoxError::last());
84 }
85
86 Ok(())
87}
88
/// Counters describing the outcome of a job cancellation request.
///
/// The struct is `#[repr(C)]` because a pointer to it is passed to
/// `ffi::pico_ffi_background_cancel_jobs_by_tag`, which fills it in.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct JobCancellationResult {
    /// Total number of jobs covered by the request.
    pub n_total: u32,
    /// Number of those jobs counted as timed out.
    pub n_timeouts: u32,
}

impl JobCancellationResult {
    /// Builds a result from the two counters.
    #[inline(always)]
    pub fn new(n_total: u32, n_timeouts: u32) -> Self {
        Self { n_total, n_timeouts }
    }
}
112
113#[inline(always)]
115pub fn cancel_jobs_by_tag(
116 service_id: &ServiceId,
117 job_tag: &str,
118 timeout: Duration,
119) -> Result<JobCancellationResult, BoxError> {
120 cancel_background_jobs_by_tag_inner(
121 &service_id.plugin,
122 &service_id.service,
123 &service_id.version,
124 job_tag,
125 timeout,
126 )
127}
128
129pub fn cancel_background_jobs_by_tag_inner(
131 plugin: &str,
132 service: &str,
133 version: &str,
134 job_tag: &str,
135 timeout: Duration,
136) -> Result<JobCancellationResult, BoxError> {
137 let mut result = JobCancellationResult::default();
138 let rc = unsafe {
140 ffi::pico_ffi_background_cancel_jobs_by_tag(
141 plugin.into(),
142 service.into(),
143 version.into(),
144 job_tag.into(),
145 timeout.as_secs_f64(),
146 &mut result,
147 )
148 };
149
150 if rc != 0 {
151 return Err(BoxError::last());
152 }
153
154 Ok(result)
155}
156
/// Errors reported by the job cancellation helpers in this module.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Some jobs did not complete: the fields are the total number of jobs
    /// and the number of jobs that completed, in that order.
    ///
    /// Produced by `ServiceWorkerManager::cancel_tagged` when the reported
    /// number of timeouts is non-zero.
    #[error("Some of jobs are not fully completed, expected: {0}, completed: {1}")]
    PartialCompleted(usize, usize),
    /// No cancellation signal was received within the timeout.
    ///
    /// Produced by `CancellationToken::wait_timeout`.
    #[error("timeout")]
    CancellationTimeout,
}
164
165#[non_exhaustive]
171#[derive(Debug)]
172pub struct CancellationToken {
173 cancel_channel: Channel<()>,
174}
175
176impl CancellationToken {
177 #[allow(deprecated)]
181 pub fn new() -> (CancellationToken, CancellationTokenHandle) {
182 let (cancel_tx, cancel_rx) = Channel::new(1).into_clones();
183 (
184 CancellationToken {
185 cancel_channel: cancel_rx,
186 },
187 CancellationTokenHandle {
188 cancel_channel: cancel_tx,
189 finish_event: Rc::new(OnceEvent::new()),
190 },
191 )
192 }
193
194 pub fn wait_timeout(&self, timeout: Duration) -> Result<(), Error> {
204 match self.cancel_channel.recv_timeout(timeout) {
205 Err(RecvError::Timeout) => Err(Error::CancellationTimeout),
206 _ => Ok(()),
207 }
208 }
209}
210
211#[derive(Debug)]
212#[deprecated = "don't use this"]
213pub struct CancellationTokenHandle {
214 cancel_channel: Channel<()>,
215 finish_event: Rc<OnceEvent>,
216}
217
218#[allow(deprecated)]
219impl CancellationTokenHandle {
220 pub fn cancel(self) -> Rc<OnceEvent> {
224 let Self {
225 cancel_channel,
226 finish_event,
227 } = self;
228 _ = cancel_channel.send(());
229 finish_event
230 }
231}
232
233#[derive(Clone, Debug)]
240#[non_exhaustive]
241pub struct ServiceWorkerManager {
242 service_id: ServiceId,
243}
244
245impl ServiceWorkerManager {
246 #[inline(always)]
247 pub(crate) fn new(service_id: ServiceId) -> Self {
248 Self { service_id }
249 }
250
251 #[track_caller]
283 #[inline(always)]
284 pub fn register_job<F>(&self, job: F) -> tarantool::Result<()>
285 where
286 F: FnOnce(CancellationToken) + 'static,
287 {
288 register_job(&self.service_id, job)?;
289
290 Ok(())
291 }
292
293 #[inline(always)]
301 pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> tarantool::Result<()>
302 where
303 F: FnOnce(CancellationToken) + 'static,
304 {
305 register_tagged_job(&self.service_id, job, tag)?;
306
307 Ok(())
308 }
309
310 pub fn cancel_tagged(&self, tag: &str, timeout: Duration) -> Result<(), Error> {
320 let res = cancel_jobs_by_tag(&self.service_id, tag, timeout);
321 let res = match res {
322 Ok(res) => res,
323 Err(e) => {
324 let loc = DisplayErrorLocation(&e);
325 tarantool::say_error!("unexpected error: {loc}{e}");
326 return Ok(());
327 }
328 };
329
330 if res.n_timeouts != 0 {
331 let n_completed = res.n_total - res.n_timeouts;
332 return Err(Error::PartialCompleted(res.n_total as _, n_completed as _));
333 }
334
335 Ok(())
336 }
337
338 #[deprecated = "use `PicoContext::set_jobs_shutdown_timeout` instead"]
344 pub fn set_shutdown_timeout(&self, timeout: Duration) {
345 let plugin = &self.service_id.plugin;
346 let service = &self.service_id.service;
347 let version = &self.service_id.version;
348 set_jobs_shutdown_timeout(plugin, service, version, timeout)
349 }
350}
351
352pub fn set_jobs_shutdown_timeout(plugin: &str, service: &str, version: &str, timeout: Duration) {
364 let rc = unsafe {
366 ffi::pico_ffi_background_set_jobs_shutdown_timeout(
367 plugin.into(),
368 service.into(),
369 version.into(),
370 timeout.as_secs_f64(),
371 )
372 };
373 debug_assert!(
374 rc == 0,
375 "return code is only for future compatibility at the moment"
376 );
377}
378
379#[derive(Debug)]
386pub struct OnceEvent {
387 finished: Cell<bool>,
388 cond: fiber::Cond,
389}
390
391impl OnceEvent {
392 fn new() -> Self {
393 Self {
394 finished: Cell::new(false),
395 cond: fiber::Cond::new(),
396 }
397 }
398
399 pub fn signal(&self) {
401 self.finished.set(true);
402 self.cond.broadcast();
403 }
404
405 #[inline]
406 pub fn is_finished(&self) -> bool {
407 self.finished.get()
408 }
409
410 pub fn wait_timeout(&self, timeout: Duration) {
411 self.cond.wait_timeout(timeout);
412 }
413}
414
415#[derive(Debug)]
420pub struct CancellationCallbackState {
421 cancel_channel: Channel<()>,
422 finish_event: Rc<OnceEvent>,
423 status: Cell<CancellationCallbackStatus>,
424}
425
426#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
427pub enum CancellationCallbackStatus {
428 #[default]
429 Initial = 0,
430 JobCancelled = 1,
431 JobFinished = 2,
432}
433
434impl CancellationCallbackState {
435 fn new(cancel_channel: Channel<()>, finish_event: Rc<OnceEvent>) -> Self {
436 Self {
437 cancel_channel,
438 finish_event,
439 status: Cell::new(CancellationCallbackStatus::Initial),
440 }
441 }
442
443 fn cancellation_callback(&self, action: u64, timeout: Duration) -> Result<(), BoxError> {
444 use CancellationCallbackStatus::*;
445 let next_status = action;
446
447 if next_status == JobCancelled as u64 {
448 if self.status.get() == JobCancelled || self.status.get() == JobFinished {
450 return Ok(());
451 }
452 debug_assert_eq!(self.status.get(), Initial);
453 _ = self.cancel_channel.send(());
454
455 self.status.set(JobCancelled);
456 } else if next_status == JobFinished as u64 {
457 if self.status.get() != JobCancelled && self.status.get() != JobFinished {
458 return Err(BoxError::new(
459 TarantoolErrorCode::IllegalParams,
460 "cannot wait for job that wasn't cancelled",
461 ));
462 }
463
464 let deadline = fiber::clock().saturating_add(timeout);
465
466 loop {
467 if self.finish_event.is_finished() {
468 self.status.set(JobFinished);
469 return Ok(());
470 }
471
472 let remaining = deadline.duration_since(fiber::clock());
473 if remaining.is_zero() {
474 return Err(BoxError::new(TarantoolErrorCode::Timeout, "timeout"));
475 }
476
477 self.finish_event.wait_timeout(remaining);
478 }
479 } else {
480 return Err(BoxError::new(
481 TarantoolErrorCode::IllegalParams,
482 format!("unexpected action: {action}"),
483 ));
484 }
485
486 Ok(())
487 }
488}
489
490#[repr(C)]
496pub struct FfiBackgroundJobCancellationToken {
497 callback: extern "C-unwind" fn(data: *const Self, action: u64, timeout: f64) -> i32,
501 drop: extern "C-unwind" fn(*mut Self),
502
503 closure_pointer: *mut (),
505
506 pub fiber_id: FiberId,
508}
509
510impl Drop for FfiBackgroundJobCancellationToken {
511 #[inline(always)]
512 fn drop(&mut self) {
513 (self.drop)(self)
514 }
515}
516
517impl FfiBackgroundJobCancellationToken {
518 fn new(fiber_id: FiberId, cancel_channel: Channel<()>, finish_event: Rc<OnceEvent>) -> Self {
519 let callback_state = CancellationCallbackState::new(cancel_channel, finish_event);
520 let callback = move |action, timeout| {
521 let res = callback_state.cancellation_callback(action, timeout);
522 if let Err(e) = res {
523 e.set_last();
524 return -1;
525 }
526
527 0
528 };
529
530 Self::new_inner(fiber_id, callback)
531 }
532
533 fn new_inner<F>(fiber_id: FiberId, f: F) -> Self
540 where
541 F: FnMut(u64, Duration) -> i32,
542 {
543 let closure = Box::new(f);
544 let closure_pointer: *mut F = Box::into_raw(closure);
545
546 Self {
547 callback: Self::trampoline::<F>,
548 drop: Self::drop_handler::<F>,
549 closure_pointer: closure_pointer.cast(),
550
551 fiber_id,
552 }
553 }
554
555 extern "C-unwind" fn trampoline<F>(data: *const Self, action: u64, timeout: f64) -> i32
560 where
561 F: FnMut(u64, Duration) -> i32,
562 {
563 let closure_pointer: *mut F = unsafe { (*data).closure_pointer.cast::<F>() };
565 let closure = unsafe { &mut *closure_pointer };
566
567 closure(action, Duration::from_secs_f64(timeout))
568 }
569
570 extern "C-unwind" fn drop_handler<F>(handler: *mut Self) {
571 unsafe {
572 let closure_pointer: *mut F = (*handler).closure_pointer.cast::<F>();
573 let closure = Box::from_raw(closure_pointer);
574 drop(closure);
575
576 if cfg!(debug_assertions) {
577 (*handler).closure_pointer = 0xcccccccccccccccc_u64 as _;
579 }
580 }
581 }
582
583 #[inline(always)]
585 pub fn cancel_job(&self) {
586 let rc = (self.callback)(self, CancellationCallbackStatus::JobCancelled as _, 0.0);
587 debug_assert!(rc == 0);
588 }
589
590 #[inline(always)]
592 #[allow(clippy::result_unit_err)]
593 pub fn wait_job_finished(&self, timeout: Duration) -> Result<(), ()> {
594 let rc = (self.callback)(
595 self,
596 CancellationCallbackStatus::JobFinished as _,
597 timeout.as_secs_f64(),
598 );
599 if rc == -1 {
600 return Err(());
603 }
604
605 Ok(())
606 }
607}