tarantool_module/
fiber.rs

//! Cooperative multitasking module
2//!
3//! With the fiber module, you can:
4//! - create, run and manage [fibers](struct.Fiber.html),
5//! - use a synchronization mechanism for fibers, similar to “condition variables” and similar to operating-system
6//! functions such as `pthread_cond_wait()` plus `pthread_cond_signal()`.
7//!
8//! See also:
9//! - [Threads, fibers and yields](https://www.tarantool.io/en/doc/latest/book/box/atomic/#threads-fibers-and-yields)
10//! - [Lua reference: Module fiber](https://www.tarantool.io/en/doc/latest/reference/reference_lua/fiber/)
11//! - [C API reference: Module fiber](https://www.tarantool.io/en/doc/latest/dev_guide/reference_capi/fiber/)
12use std::ffi::CString;
13use std::marker::PhantomData;
14use std::os::raw::c_void;
15
16use va_list::VaList;
17
18use crate::error::{Error, TarantoolError};
19
/// A fiber is a set of instructions which are executed with cooperative multitasking.
///
/// Fibers managed by the fiber module are associated with a user-supplied function called the fiber function.
///
/// A fiber has three possible states: **running**, **suspended** or **dead**.
/// When a fiber is started with [fiber.start()](struct.Fiber.html#method.start), it is **running**.
/// When a fiber is created with [Fiber::new()](struct.Fiber.html#method.new) (and has not been started yet) or yields control
/// with [sleep()](fn.sleep.html), it is **suspended**.
/// When a fiber ends (because the fiber function ends), it is **dead**.
///
/// A runaway fiber can be stopped with [fiber.cancel()](struct.Fiber.html#method.cancel).
/// However, [fiber.cancel()](struct.Fiber.html#method.cancel) is advisory — it works only if the runaway fiber calls
/// [is_cancelled()](fn.is_cancelled.html) occasionally. In practice, a runaway fiber can only become unresponsive if it
/// does many computations and does not check whether it has been cancelled.
///
/// The other potential problem comes from fibers which never get scheduled, because they are not subscribed to any events,
/// or because no relevant events occur. Such morphing fibers can be killed with [fiber.cancel()](struct.Fiber.html#method.cancel)
/// at any time, since [fiber.cancel()](struct.Fiber.html#method.cancel) sends an asynchronous wakeup event to the fiber,
/// and [is_cancelled()](fn.is_cancelled.html) is checked whenever such a wakeup event occurs.
///
/// The fiber function is kept only as a type-erased raw pointer, so the compiler does not verify
/// that the closure handed to [Fiber::new()](struct.Fiber.html#method.new) outlives the fiber;
/// keeping it alive is the caller's responsibility.
///
/// Example:
/// ```rust
/// use tarantool_module::fiber::Fiber;
/// let mut fiber = Fiber::new("test_fiber", &mut |_| {
///     println!("I'm a fiber");
///     0
/// });
/// fiber.start(());
/// println!("Fiber started")
/// ```
///
/// ```text
/// I'm a fiber
/// Fiber started
/// ```
pub struct Fiber<'a, T: 'a> {
    /// Raw handle to the underlying C fiber object.
    inner: *mut ffi::Fiber,
    /// Type-erased pointer to the user callback; handed to the fiber when it is started.
    callback: *mut c_void,
    /// Marks the struct as logically borrowing a `T` for `'a` without storing one.
    phantom: PhantomData<&'a T>,
}
60
61impl<'a, T> Fiber<'a, T> {
62    /// Create a new fiber.
63    ///
64    /// Takes a fiber from fiber cache, if it's not empty. Can fail only if there is not enough memory for
65    /// the fiber structure or fiber stack.
66    ///
67    /// The created fiber automatically returns itself to the fiber cache when its `main` function
68    /// completes. The initial fiber state is **suspended**.
69    ///
70    /// Ordinarily [Fiber::new()](#method.new) is used in conjunction with [fiber.set_joinable()](#method.set_joinable)
71    /// and [fiber.join()](#method.join)
72    ///
73    /// - `name` - string with fiber name
74    /// - `callback` - function for run inside fiber
75    ///
76    /// See also: [fiber.start()](#method.start)
77    pub fn new<F>(name: &str, callback: &mut F) -> Self
78    where
79        F: FnMut(Box<T>) -> i32,
80    {
81        let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
82        Self {
83            inner: unsafe { ffi::fiber_new(CString::new(name).unwrap().as_ptr(), trampoline) },
84            callback: callback_ptr,
85            phantom: PhantomData,
86        }
87    }
88
89    /// Create a new fiber with defined attributes.
90    ///
91    /// Can fail only if there is not enough memory for the fiber structure or fiber stack.
92    ///
93    /// The created fiber automatically returns itself to the fiber cache if has default stack size
94    /// when its `main` function completes. The initial fiber state is **suspended**.
95    ///
96    /// - `name` - string with fiber name
97    /// - `fiber_attr` - fiber attributes
98    /// - `callback` - function for run inside fiber
99    ///
100    /// See also: [fiber.start()](#method.start)
101    pub fn new_with_attr<F>(name: &str, attr: &FiberAttr, callback: &mut F) -> Self
102    where
103        F: FnMut(Box<T>) -> i32,
104    {
105        let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
106        Self {
107            inner: unsafe {
108                ffi::fiber_new_ex(CString::new(name).unwrap().as_ptr(), attr.inner, trampoline)
109            },
110            callback: callback_ptr,
111            phantom: PhantomData,
112        }
113    }
114
115    /// Start execution of created fiber.
116    ///
117    /// - `arg` - argument to start the fiber with
118    ///
119    /// See also: [fiber.new()](#method.new)
120    pub fn start(&mut self, arg: T) {
121        unsafe {
122            ffi::fiber_start(self.inner, self.callback, Box::into_raw(Box::<T>::new(arg)));
123        }
124    }
125
126    /// Interrupt a synchronous wait of a fiber.
127    pub fn wakeup(&self) {
128        unsafe { ffi::fiber_wakeup(self.inner) }
129    }
130
131    /// Wait until the fiber is dead and then move its execution status to the caller.
132    ///
133    /// “Join” a joinable fiber. That is, let the fiber’s function run and wait until the fiber’s status is **dead**
134    /// (normally a status becomes **dead** when the function execution finishes). Joining will cause a yield,
135    /// therefore, if the fiber is currently in a **suspended** state, execution of its fiber function will resume.
136    ///
137    /// This kind of waiting is more convenient than going into a loop and periodically checking the status;
138    /// however, it works only if the fiber was created with [fiber.new()](#method.new) and was made joinable with
139    /// [fiber.set_joinable()](#method.set_joinable).
140    ///
141    /// The fiber must not be detached (See also: [fiber.set_joinable()](#method.set_joinable)).
142    ///
143    /// Return: fiber function return code
144    pub fn join(&self) -> i32 {
145        unsafe { ffi::fiber_join(self.inner) }
146    }
147
148    /// Set fiber to be joinable (false by default).
149    ///
150    /// - `is_joinable` - status to set
151    pub fn set_joinable(&mut self, is_joinable: bool) {
152        unsafe { ffi::fiber_set_joinable(self.inner, is_joinable) }
153    }
154
155    /// Cancel a fiber. (set `FIBER_IS_CANCELLED` flag)
156    ///
157    /// Running and suspended fibers can be cancelled. After a fiber has been cancelled, attempts to operate on it will
158    /// cause error: the fiber is dead. But a dead fiber can still report its id and status.
159    /// Possible errors: cancel is not permitted for the specified fiber object.
160    ///
161    /// If target fiber's flag `FIBER_IS_CANCELLABLE` set, then it would be woken up (maybe prematurely).
162    /// Then current fiber yields until the target fiber is dead (or is woken up by
163    /// [fiber.wakeup()](#method.wakeup)).
164    pub fn cancel(&mut self) {
165        unsafe { ffi::fiber_cancel(self.inner) }
166    }
167}
168
/// Make it possible or not possible to wakeup the current
/// fiber immediately when it's cancelled.
///
/// - `is_cancellable` - status to set
///
/// Returns previous state.
pub fn set_cancellable(is_cancellable: bool) -> bool {
    // Thin wrapper: the flag is forwarded to the C API and its result is returned unchanged.
    unsafe { ffi::fiber_set_cancellable(is_cancellable) }
}
178
/// Check current fiber for cancellation (it must be checked manually).
///
/// Returns the boolean reported by the C API `fiber_is_cancelled()`.
pub fn is_cancelled() -> bool {
    unsafe { ffi::fiber_is_cancelled() }
}
183
/// Put the current fiber to sleep for at least `time` seconds.
///
/// Yield control to the scheduler and sleep for the specified number of seconds.
/// Only the current fiber can be made to sleep.
///
/// - `time` - time to sleep
///
/// > **Note:** this is a cancellation point (See also: [is_cancelled()](fn.is_cancelled.html))
pub fn sleep(time: f64) {
    // Thin wrapper: the duration is handed to the C API `fiber_sleep()` as-is.
    unsafe { ffi::fiber_sleep(time) }
}
195
/// Report loop begin time as double (cheap).
///
/// Returns the value of the C API `fiber_time()` unchanged.
/// NOTE(review): presumably seconds since the Unix epoch — confirm against the C API reference.
pub fn time() -> f64 {
    unsafe { ffi::fiber_time() }
}
200
/// Report loop begin time as 64-bit int.
///
/// Returns the value of the C API `fiber_time64()` unchanged.
/// NOTE(review): the unit is defined by the C API and is not visible here — confirm before relying on it.
pub fn time64() -> u64 {
    unsafe { ffi::fiber_time64() }
}
205
/// Report loop begin time as double (cheap). Uses monotonic clock.
///
/// Returns the value of the C API `fiber_clock()` unchanged.
pub fn clock() -> f64 {
    unsafe { ffi::fiber_clock() }
}
210
/// Report loop begin time as 64-bit int. Uses monotonic clock.
///
/// Returns the value of the C API `fiber_clock64()` unchanged.
/// NOTE(review): the unit is defined by the C API and is not visible here — confirm before relying on it.
pub fn clock64() -> u64 {
    unsafe { ffi::fiber_clock64() }
}
215
/// Yield control to the scheduler.
///
/// Return control to another fiber and wait until it'll be woken. Equivalent to `fiber.sleep(0)`.
///
/// NOTE(review): "wait until it'll be woken" and "equivalent to `fiber.sleep(0)`" appear to describe
/// different behaviour — confirm against the C API `fiber_yield()` reference.
///
/// See also: [Fiber::wakeup()](struct.Fiber.html#method.wakeup)
pub fn fiber_yield() {
    // Thin wrapper over the C API `fiber_yield()`; takes no arguments and returns nothing.
    unsafe { ffi::fiber_yield() }
}
224
/// Reschedule fiber to end of event loop cycle.
///
/// Thin wrapper over the C API `fiber_reschedule()`; takes no arguments and returns nothing.
pub fn reschedule() {
    unsafe { ffi::fiber_reschedule() }
}
229
/// Fiber attributes container
///
/// Owns a raw pointer to a C-side attribute object and is passed by reference when creating
/// fibers with custom attributes.
pub struct FiberAttr {
    /// Raw handle to the underlying C attribute object.
    inner: *mut ffi::FiberAttr,
}
234
235impl FiberAttr {
236    /// Create a new fiber attribute container and initialize it with default parameters.
237    /// Can be used for many fibers creation, corresponding fibers will not take ownership.
238    ///
239    /// This is safe to drop `FiberAttr` value when fibers created with this attribute still exist.
240    pub fn new() -> Self {
241        FiberAttr {
242            inner: unsafe { ffi::fiber_attr_new() },
243        }
244    }
245
246    /// Get stack size from the fiber attribute.
247    ///
248    /// Returns: stack size
249    pub fn stack_size(&self) -> usize {
250        unsafe { ffi::fiber_attr_getstacksize(self.inner) }
251    }
252
253    ///Set stack size for the fiber attribute.
254    ///
255    /// - `stack_size` - stack size for new fibers
256    pub fn set_stack_size(&mut self, stack_size: usize) -> Result<(), Error> {
257        if unsafe { ffi::fiber_attr_setstacksize(self.inner, stack_size) } < 0 {
258            Err(TarantoolError::last().into())
259        } else {
260            Ok(())
261        }
262    }
263}
264
/// Deletes the underlying C attribute object when the wrapper is dropped.
impl Drop for FiberAttr {
    fn drop(&mut self) {
        // Hand the raw handle to the matching C destructor.
        unsafe { ffi::fiber_attr_delete(self.inner) }
    }
}
270
/// Conditional variable for cooperative multitasking (fibers).
///
/// A cond (short for "condition variable") is a synchronization primitive
/// that allows fibers to yield until some predicate is satisfied. Fiber
/// conditions have two basic operations - `wait()` and `signal()`. [cond.wait()](#method.wait)
/// suspends execution of fiber (i.e. yields) until [cond.signal()](#method.signal) is called.
///
/// Example:
///
/// ```rust
/// use tarantool_module::fiber::Cond;
/// let cond = Cond::new();
/// cond.wait();
/// ```
///
/// The job will hang because [cond.wait()](#method.wait) will go to sleep until the condition variable changes.
///
/// ```rust,ignore
/// // Call from another fiber:
/// cond.signal();
/// ```
///
/// The waiting stopped, and the [cond.wait()](#method.wait) function returned true.
///
/// This example depended on the use of a global conditional variable with the arbitrary name cond.
/// In real life, programmers would make sure to use different conditional variable names for different applications.
///
/// Unlike `pthread_cond`, `Cond` doesn't require mutex/latch wrapping.
pub struct Cond {
    /// Raw handle to the underlying C condition-variable object.
    inner: *mut ffi::FiberCond,
}
302
303/// - call [Cond::new()](#method.new) to create a named condition variable, which will be called `cond` for examples in this section.
304/// - call [cond.wait()](#method.wait) to make a fiber wait for a signal via a condition variable.
305/// - call [cond.signal()](#method.signal) to send a signal to wake up a single fiber that has executed [cond.wait()](#method.wait).
306/// - call [cond.broadcast()](#method.broadcast) to send a signal to all fibers that have executed [cond.wait()](#method.wait).
307impl Cond {
308    /// Instantiate a new fiber cond object.
309    pub fn new() -> Self {
310        Cond {
311            inner: unsafe { ffi::fiber_cond_new() },
312        }
313    }
314
315    /// Wake one fiber waiting for the cond.
316    /// Does nothing if no one is waiting. Does not yield.
317    pub fn signal(&self) {
318        unsafe { ffi::fiber_cond_signal(self.inner) }
319    }
320
321    /// Wake up all fibers waiting for the cond.
322    /// Does not yield.
323    pub fn broadcast(&self) {
324        unsafe { ffi::fiber_cond_broadcast(self.inner) }
325    }
326
327    /// Suspend the execution of the current fiber (i.e. yield) until [signal()](#method.signal) is called.
328    ///
329    /// Like pthread_cond, FiberCond can issue spurious wake ups caused by explicit
330    /// [Fiber::wakeup()](struct.Fiber.html#method.wakeup) or [Fiber::cancel()](struct.Fiber.html#method.cancel)
331    /// calls. It is highly recommended to wrap calls to this function into a loop
332    /// and check an actual predicate and `fiber_testcancel()` on every iteration.
333    ///
334    /// - `timeout` - timeout in seconds
335    ///
336    /// Returns:
337    /// - `true` on [signal()](#method.signal) call or a spurious wake up.
338    /// - `false` on timeout, diag is set to `TimedOut`
339    pub fn wait_timeout(&self, timeout: f64) -> bool {
340        !(unsafe { ffi::fiber_cond_wait_timeout(self.inner, timeout) } < 0)
341    }
342
343    /// Shortcut for [wait_timeout()](#method.wait_timeout).
344    pub fn wait(&self) -> bool {
345        !(unsafe { ffi::fiber_cond_wait(self.inner) } < 0)
346    }
347}
348
/// Deletes the underlying C condition-variable object when the wrapper is dropped.
impl Drop for Cond {
    fn drop(&mut self) {
        // Hand the raw handle to the matching C destructor.
        unsafe { ffi::fiber_cond_delete(self.inner) }
    }
}
354
/// A lock for cooperative multitasking environment
///
/// Locking returns a guard value; the latch is unlocked again when that guard is dropped.
pub struct Latch {
    /// Raw handle to the underlying C latch object.
    inner: *mut ffi::Latch,
}
359
360impl Latch {
361    /// Allocate and initialize the new latch.
362    pub fn new() -> Self {
363        Latch {
364            inner: unsafe { ffi::box_latch_new() },
365        }
366    }
367
368    /// Lock a latch. Waits indefinitely until the current fiber can gain access to the latch.
369    pub fn lock(&self) -> LatchGuard {
370        unsafe { ffi::box_latch_lock(self.inner) };
371        LatchGuard { latch: self }
372    }
373
374    /// Try to lock a latch. Return immediately if the latch is locked.
375    ///
376    /// Returns:
377    /// - `Some` - success
378    /// - `None` - the latch is locked.
379    pub fn try_lock(&self) -> Option<LatchGuard> {
380        if unsafe { ffi::box_latch_trylock(self.inner) } == 0 {
381            Some(LatchGuard { latch: self })
382        } else {
383            None
384        }
385    }
386}
387
/// Deletes the underlying C latch object when the wrapper is dropped.
impl Drop for Latch {
    fn drop(&mut self) {
        // Hand the raw handle to the matching C destructor.
        unsafe { ffi::box_latch_delete(self.inner) }
    }
}
393
/// An RAII implementation of a "scoped lock" of a latch. When this structure is dropped (falls out of scope),
/// the lock will be unlocked.
///
/// The guard borrows the latch for `'a`, so the latch cannot be dropped while a guard exists.
pub struct LatchGuard<'a> {
    /// The latch that is unlocked when this guard is dropped.
    latch: &'a Latch,
}
399
/// Unlocks the guarded latch when the guard is dropped.
impl<'a> Drop for LatchGuard<'a> {
    fn drop(&mut self) {
        // Release the lock on the latch this guard was created for.
        unsafe { ffi::box_latch_unlock(self.latch.inner) }
    }
}
405
406pub(crate) unsafe fn unpack_callback<F, T>(callback: &mut F) -> (*mut c_void, ffi::FiberFunc)
407where
408    F: FnMut(Box<T>) -> i32,
409{
410    unsafe extern "C" fn trampoline<F, T>(mut args: VaList) -> i32
411    where
412        F: FnMut(Box<T>) -> i32,
413    {
414        let closure: &mut F = &mut *(args.get::<*const c_void>() as *mut F);
415        let arg = Box::from_raw(args.get::<*const c_void>() as *mut T);
416        (*closure)(arg)
417    }
418    (callback as *mut F as *mut c_void, Some(trampoline::<F, T>))
419}
420
/// Raw declarations of the C functions and opaque types used by this module.
///
/// Nothing here is safe to call directly; the public wrappers above provide the safe surface.
mod ffi {
    use std::os::raw::{c_char, c_int};

    use va_list::VaList;

    /// Opaque C fiber object; only ever used behind a raw pointer.
    #[repr(C)]
    pub struct Fiber {
        _unused: [u8; 0],
    }

    /// Entry point of a fiber: a C function receiving its arguments as a `va_list`
    /// and returning an integer code.
    pub type FiberFunc = Option<unsafe extern "C" fn(VaList) -> c_int>;

    // Fiber lifecycle, scheduling and clock functions.
    extern "C" {
        pub fn fiber_new(name: *const c_char, f: FiberFunc) -> *mut Fiber;
        pub fn fiber_new_ex(
            name: *const c_char,
            fiber_attr: *const FiberAttr,
            f: FiberFunc,
        ) -> *mut Fiber;
        pub fn fiber_yield();
        // Variadic: the wrapper passes the callback pointer and the boxed argument pointer.
        pub fn fiber_start(callee: *mut Fiber, ...);
        pub fn fiber_wakeup(f: *mut Fiber);
        pub fn fiber_cancel(f: *mut Fiber);
        pub fn fiber_set_cancellable(yesno: bool) -> bool;
        pub fn fiber_set_joinable(fiber: *mut Fiber, yesno: bool);
        pub fn fiber_join(f: *mut Fiber) -> c_int;
        pub fn fiber_sleep(s: f64);
        pub fn fiber_is_cancelled() -> bool;
        pub fn fiber_time() -> f64;
        pub fn fiber_time64() -> u64;
        pub fn fiber_clock() -> f64;
        pub fn fiber_clock64() -> u64;
        pub fn fiber_reschedule();
    }

    /// Opaque C fiber-attribute object; only ever used behind a raw pointer.
    #[repr(C)]
    pub struct FiberAttr {
        _unused: [u8; 0],
    }

    // Fiber attribute allocation, release and stack-size accessors.
    extern "C" {
        pub fn fiber_attr_new() -> *mut FiberAttr;
        pub fn fiber_attr_delete(fiber_attr: *mut FiberAttr);
        pub fn fiber_attr_setstacksize(fiber_attr: *mut FiberAttr, stack_size: usize) -> c_int;
        pub fn fiber_attr_getstacksize(fiber_attr: *mut FiberAttr) -> usize;
    }

    /// Opaque C condition-variable object; only ever used behind a raw pointer.
    #[repr(C)]
    pub struct FiberCond {
        _unused: [u8; 0],
    }

    // Condition variable allocation, release, signalling and waiting.
    extern "C" {
        pub fn fiber_cond_new() -> *mut FiberCond;
        pub fn fiber_cond_delete(cond: *mut FiberCond);
        pub fn fiber_cond_signal(cond: *mut FiberCond);
        pub fn fiber_cond_broadcast(cond: *mut FiberCond);
        pub fn fiber_cond_wait_timeout(cond: *mut FiberCond, timeout: f64) -> c_int;
        pub fn fiber_cond_wait(cond: *mut FiberCond) -> c_int;
    }

    /// Opaque C latch object; only ever used behind a raw pointer.
    #[repr(C)]
    pub struct Latch {
        _unused: [u8; 0],
    }

    // Latch allocation, release, locking and unlocking.
    extern "C" {
        pub fn box_latch_new() -> *mut Latch;
        pub fn box_latch_delete(latch: *mut Latch);
        pub fn box_latch_lock(latch: *mut Latch);
        pub fn box_latch_trylock(latch: *mut Latch) -> c_int;
        pub fn box_latch_unlock(latch: *mut Latch);
    }
}