tarantool_module/fiber.rs
//! Cooperative multitasking module
//!
//! With the fiber module, you can:
//! - create, run and manage [fibers](struct.Fiber.html),
//! - use a synchronization mechanism for fibers, similar to "condition variables" and to operating-system
//!   functions such as `pthread_cond_wait()` plus `pthread_cond_signal()`.
//!
//! See also:
//! - [Threads, fibers and yields](https://www.tarantool.io/en/doc/latest/book/box/atomic/#threads-fibers-and-yields)
//! - [Lua reference: Module fiber](https://www.tarantool.io/en/doc/latest/reference/reference_lua/fiber/)
//! - [C API reference: Module fiber](https://www.tarantool.io/en/doc/latest/dev_guide/reference_capi/fiber/)
12use std::ffi::CString;
13use std::marker::PhantomData;
14use std::os::raw::c_void;
15
16use va_list::VaList;
17
18use crate::error::{Error, TarantoolError};
19
20/// A fiber is a set of instructions which are executed with cooperative multitasking.
21///
22/// Fibers managed by the fiber module are associated with a user-supplied function called the fiber function.
23///
24/// A fiber has three possible states: **running**, **suspended** or **dead**.
25/// When a fiber is started with [fiber.start()](struct.Fiber.html#method.start), it is **running**.
26/// When a fiber is created with [Fiber::new()](struct.Fiber.html#method.new) (and has not been started yet) or yields control
27/// with [sleep()](fn.sleep.html), it is **suspended**.
28/// When a fiber ends (because the fiber function ends), it is **dead**.
29///
30/// A runaway fiber can be stopped with [fiber.cancel()](struct.Fiber.html#method.cancel).
31/// However, [fiber.cancel()](struct.Fiber.html#method.cancel) is advisory — it works only if the runaway fiber calls
32/// [is_cancelled()](fn.is_cancelled.html) occasionally. In practice, a runaway fiber can only become unresponsive if it
33/// does many computations and does not check whether it has been cancelled.
34///
35/// The other potential problem comes from fibers which never get scheduled, because they are not subscribed to any events,
36/// or because no relevant events occur. Such morphing fibers can be killed with [fiber.cancel()](struct.Fiber.html#method.cancel)
37/// at any time, since [fiber.cancel()](struct.Fiber.html#method.cancel) sends an asynchronous wakeup event to the fiber,
38/// and [is_cancelled()](fn.is_cancelled.html) is checked whenever such a wakeup event occurs.
39///
40/// Example:
41/// ```rust
42/// use tarantool_module::fiber::Fiber;
43/// let mut fiber = Fiber::new("test_fiber", &mut |_| {
44/// println!("I'm a fiber");
45/// 0
46/// });
47/// fiber.start(());
48/// println!("Fiber started")
49/// ```
50///
51/// ```text
52/// I'm a fiber
53/// Fiber started
54/// ```
55pub struct Fiber<'a, T: 'a> {
56 inner: *mut ffi::Fiber,
57 callback: *mut c_void,
58 phantom: PhantomData<&'a T>,
59}
60
61impl<'a, T> Fiber<'a, T> {
62 /// Create a new fiber.
63 ///
64 /// Takes a fiber from fiber cache, if it's not empty. Can fail only if there is not enough memory for
65 /// the fiber structure or fiber stack.
66 ///
67 /// The created fiber automatically returns itself to the fiber cache when its `main` function
68 /// completes. The initial fiber state is **suspended**.
69 ///
70 /// Ordinarily [Fiber::new()](#method.new) is used in conjunction with [fiber.set_joinable()](#method.set_joinable)
71 /// and [fiber.join()](#method.join)
72 ///
73 /// - `name` - string with fiber name
74 /// - `callback` - function for run inside fiber
75 ///
76 /// See also: [fiber.start()](#method.start)
77 pub fn new<F>(name: &str, callback: &mut F) -> Self
78 where
79 F: FnMut(Box<T>) -> i32,
80 {
81 let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
82 Self {
83 inner: unsafe { ffi::fiber_new(CString::new(name).unwrap().as_ptr(), trampoline) },
84 callback: callback_ptr,
85 phantom: PhantomData,
86 }
87 }
88
89 /// Create a new fiber with defined attributes.
90 ///
91 /// Can fail only if there is not enough memory for the fiber structure or fiber stack.
92 ///
93 /// The created fiber automatically returns itself to the fiber cache if has default stack size
94 /// when its `main` function completes. The initial fiber state is **suspended**.
95 ///
96 /// - `name` - string with fiber name
97 /// - `fiber_attr` - fiber attributes
98 /// - `callback` - function for run inside fiber
99 ///
100 /// See also: [fiber.start()](#method.start)
101 pub fn new_with_attr<F>(name: &str, attr: &FiberAttr, callback: &mut F) -> Self
102 where
103 F: FnMut(Box<T>) -> i32,
104 {
105 let (callback_ptr, trampoline) = unsafe { unpack_callback(callback) };
106 Self {
107 inner: unsafe {
108 ffi::fiber_new_ex(CString::new(name).unwrap().as_ptr(), attr.inner, trampoline)
109 },
110 callback: callback_ptr,
111 phantom: PhantomData,
112 }
113 }
114
115 /// Start execution of created fiber.
116 ///
117 /// - `arg` - argument to start the fiber with
118 ///
119 /// See also: [fiber.new()](#method.new)
120 pub fn start(&mut self, arg: T) {
121 unsafe {
122 ffi::fiber_start(self.inner, self.callback, Box::into_raw(Box::<T>::new(arg)));
123 }
124 }
125
126 /// Interrupt a synchronous wait of a fiber.
127 pub fn wakeup(&self) {
128 unsafe { ffi::fiber_wakeup(self.inner) }
129 }
130
131 /// Wait until the fiber is dead and then move its execution status to the caller.
132 ///
133 /// “Join” a joinable fiber. That is, let the fiber’s function run and wait until the fiber’s status is **dead**
134 /// (normally a status becomes **dead** when the function execution finishes). Joining will cause a yield,
135 /// therefore, if the fiber is currently in a **suspended** state, execution of its fiber function will resume.
136 ///
137 /// This kind of waiting is more convenient than going into a loop and periodically checking the status;
138 /// however, it works only if the fiber was created with [fiber.new()](#method.new) and was made joinable with
139 /// [fiber.set_joinable()](#method.set_joinable).
140 ///
141 /// The fiber must not be detached (See also: [fiber.set_joinable()](#method.set_joinable)).
142 ///
143 /// Return: fiber function return code
144 pub fn join(&self) -> i32 {
145 unsafe { ffi::fiber_join(self.inner) }
146 }
147
148 /// Set fiber to be joinable (false by default).
149 ///
150 /// - `is_joinable` - status to set
151 pub fn set_joinable(&mut self, is_joinable: bool) {
152 unsafe { ffi::fiber_set_joinable(self.inner, is_joinable) }
153 }
154
155 /// Cancel a fiber. (set `FIBER_IS_CANCELLED` flag)
156 ///
157 /// Running and suspended fibers can be cancelled. After a fiber has been cancelled, attempts to operate on it will
158 /// cause error: the fiber is dead. But a dead fiber can still report its id and status.
159 /// Possible errors: cancel is not permitted for the specified fiber object.
160 ///
161 /// If target fiber's flag `FIBER_IS_CANCELLABLE` set, then it would be woken up (maybe prematurely).
162 /// Then current fiber yields until the target fiber is dead (or is woken up by
163 /// [fiber.wakeup()](#method.wakeup)).
164 pub fn cancel(&mut self) {
165 unsafe { ffi::fiber_cancel(self.inner) }
166 }
167}
168
169/// Make it possible or not possible to wakeup the current
170/// fiber immediately when it's cancelled.
171///
172/// - `is_cancellable` - status to set
173///
174/// Returns previous state.
175pub fn set_cancellable(is_cancellable: bool) -> bool {
176 unsafe { ffi::fiber_set_cancellable(is_cancellable) }
177}
178
179/// Check current fiber for cancellation (it must be checked manually).
180pub fn is_cancelled() -> bool {
181 unsafe { ffi::fiber_is_cancelled() }
182}
183
184/// Put the current fiber to sleep for at least `time` seconds.
185///
186/// Yield control to the scheduler and sleep for the specified number of seconds.
187/// Only the current fiber can be made to sleep.
188///
189/// - `time` - time to sleep
190///
191/// > **Note:** this is a cancellation point (See also: [is_cancelled()](fn.is_cancelled.html))
192pub fn sleep(time: f64) {
193 unsafe { ffi::fiber_sleep(time) }
194}
195
196/// Report loop begin time as double (cheap).
197pub fn time() -> f64 {
198 unsafe { ffi::fiber_time() }
199}
200
201/// Report loop begin time as 64-bit int.
202pub fn time64() -> u64 {
203 unsafe { ffi::fiber_time64() }
204}
205
206/// Report loop begin time as double (cheap). Uses monotonic clock.
207pub fn clock() -> f64 {
208 unsafe { ffi::fiber_clock() }
209}
210
211/// Report loop begin time as 64-bit int. Uses monotonic clock.
212pub fn clock64() -> u64 {
213 unsafe { ffi::fiber_clock64() }
214}
215
216/// Yield control to the scheduler.
217///
218/// Return control to another fiber and wait until it'll be woken. Equivalent to `fiber.sleep(0)`.
219///
220/// See also: [Fiber::wakeup()](struct.Fiber.html#method.wakeup)
221pub fn fiber_yield() {
222 unsafe { ffi::fiber_yield() }
223}
224
225/// Reschedule fiber to end of event loop cycle.
226pub fn reschedule() {
227 unsafe { ffi::fiber_reschedule() }
228}
229
230/// Fiber attributes container
231pub struct FiberAttr {
232 inner: *mut ffi::FiberAttr,
233}
234
235impl FiberAttr {
236 /// Create a new fiber attribute container and initialize it with default parameters.
237 /// Can be used for many fibers creation, corresponding fibers will not take ownership.
238 ///
239 /// This is safe to drop `FiberAttr` value when fibers created with this attribute still exist.
240 pub fn new() -> Self {
241 FiberAttr {
242 inner: unsafe { ffi::fiber_attr_new() },
243 }
244 }
245
246 /// Get stack size from the fiber attribute.
247 ///
248 /// Returns: stack size
249 pub fn stack_size(&self) -> usize {
250 unsafe { ffi::fiber_attr_getstacksize(self.inner) }
251 }
252
253 ///Set stack size for the fiber attribute.
254 ///
255 /// - `stack_size` - stack size for new fibers
256 pub fn set_stack_size(&mut self, stack_size: usize) -> Result<(), Error> {
257 if unsafe { ffi::fiber_attr_setstacksize(self.inner, stack_size) } < 0 {
258 Err(TarantoolError::last().into())
259 } else {
260 Ok(())
261 }
262 }
263}
264
265impl Drop for FiberAttr {
266 fn drop(&mut self) {
267 unsafe { ffi::fiber_attr_delete(self.inner) }
268 }
269}
270
271/// Conditional variable for cooperative multitasking (fibers).
272///
273/// A cond (short for "condition variable") is a synchronization primitive
274/// that allow fibers to yield until some predicate is satisfied. Fiber
275/// conditions have two basic operations - `wait()` and `signal()`. [cond.wait()](#method.wait)
276/// suspends execution of fiber (i.e. yields) until [cond.signal()](#method.signal) is called.
277///
278/// Example:
279///
280/// ```rust
281/// use tarantool_module::fiber::Cond;
282/// let cond = fiber.cond();
283/// cond.wait();
284/// ```
285///
286/// The job will hang because [cond.wait()](#method.wait) – will go to sleep until the condition variable changes.
287///
288/// ```rust
289/// // Call from another fiber:
290/// cond.signal();
291/// ```
292///
293/// The waiting stopped, and the [cond.wait()](#method.wait) function returned true.
294///
295/// This example depended on the use of a global conditional variable with the arbitrary name cond.
296/// In real life, programmers would make sure to use different conditional variable names for different applications.
297///
298/// Unlike `pthread_cond`, [Cond]() doesn't require mutex/latch wrapping.
299pub struct Cond {
300 inner: *mut ffi::FiberCond,
301}
302
303/// - call [Cond::new()](#method.new) to create a named condition variable, which will be called `cond` for examples in this section.
304/// - call [cond.wait()](#method.wait) to make a fiber wait for a signal via a condition variable.
305/// - call [cond.signal()](#method.signal) to send a signal to wake up a single fiber that has executed [cond.wait()](#method.wait).
306/// - call [cond.broadcast()](#method.broadcast) to send a signal to all fibers that have executed [cond.wait()](#method.wait).
307impl Cond {
308 /// Instantiate a new fiber cond object.
309 pub fn new() -> Self {
310 Cond {
311 inner: unsafe { ffi::fiber_cond_new() },
312 }
313 }
314
315 /// Wake one fiber waiting for the cond.
316 /// Does nothing if no one is waiting. Does not yield.
317 pub fn signal(&self) {
318 unsafe { ffi::fiber_cond_signal(self.inner) }
319 }
320
321 /// Wake up all fibers waiting for the cond.
322 /// Does not yield.
323 pub fn broadcast(&self) {
324 unsafe { ffi::fiber_cond_broadcast(self.inner) }
325 }
326
327 /// Suspend the execution of the current fiber (i.e. yield) until [signal()](#method.signal) is called.
328 ///
329 /// Like pthread_cond, FiberCond can issue spurious wake ups caused by explicit
330 /// [Fiber::wakeup()](struct.Fiber.html#method.wakeup) or [Fiber::cancel()](struct.Fiber.html#method.cancel)
331 /// calls. It is highly recommended to wrap calls to this function into a loop
332 /// and check an actual predicate and `fiber_testcancel()` on every iteration.
333 ///
334 /// - `timeout` - timeout in seconds
335 ///
336 /// Returns:
337 /// - `true` on [signal()](#method.signal) call or a spurious wake up.
338 /// - `false` on timeout, diag is set to `TimedOut`
339 pub fn wait_timeout(&self, timeout: f64) -> bool {
340 !(unsafe { ffi::fiber_cond_wait_timeout(self.inner, timeout) } < 0)
341 }
342
343 /// Shortcut for [wait_timeout()](#method.wait_timeout).
344 pub fn wait(&self) -> bool {
345 !(unsafe { ffi::fiber_cond_wait(self.inner) } < 0)
346 }
347}
348
349impl Drop for Cond {
350 fn drop(&mut self) {
351 unsafe { ffi::fiber_cond_delete(self.inner) }
352 }
353}
354
355/// A lock for cooperative multitasking environment
356pub struct Latch {
357 inner: *mut ffi::Latch,
358}
359
360impl Latch {
361 /// Allocate and initialize the new latch.
362 pub fn new() -> Self {
363 Latch {
364 inner: unsafe { ffi::box_latch_new() },
365 }
366 }
367
368 /// Lock a latch. Waits indefinitely until the current fiber can gain access to the latch.
369 pub fn lock(&self) -> LatchGuard {
370 unsafe { ffi::box_latch_lock(self.inner) };
371 LatchGuard { latch: self }
372 }
373
374 /// Try to lock a latch. Return immediately if the latch is locked.
375 ///
376 /// Returns:
377 /// - `Some` - success
378 /// - `None` - the latch is locked.
379 pub fn try_lock(&self) -> Option<LatchGuard> {
380 if unsafe { ffi::box_latch_trylock(self.inner) } == 0 {
381 Some(LatchGuard { latch: self })
382 } else {
383 None
384 }
385 }
386}
387
388impl Drop for Latch {
389 fn drop(&mut self) {
390 unsafe { ffi::box_latch_delete(self.inner) }
391 }
392}
393
394/// An RAII implementation of a "scoped lock" of a latch. When this structure is dropped (falls out of scope),
395/// the lock will be unlocked.
396pub struct LatchGuard<'a> {
397 latch: &'a Latch,
398}
399
400impl<'a> Drop for LatchGuard<'a> {
401 fn drop(&mut self) {
402 unsafe { ffi::box_latch_unlock(self.latch.inner) }
403 }
404}
405
406pub(crate) unsafe fn unpack_callback<F, T>(callback: &mut F) -> (*mut c_void, ffi::FiberFunc)
407where
408 F: FnMut(Box<T>) -> i32,
409{
410 unsafe extern "C" fn trampoline<F, T>(mut args: VaList) -> i32
411 where
412 F: FnMut(Box<T>) -> i32,
413 {
414 let closure: &mut F = &mut *(args.get::<*const c_void>() as *mut F);
415 let arg = Box::from_raw(args.get::<*const c_void>() as *mut T);
416 (*closure)(arg)
417 }
418 (callback as *mut F as *mut c_void, Some(trampoline::<F, T>))
419}
420
421mod ffi {
422 use std::os::raw::{c_char, c_int};
423
424 use va_list::VaList;
425
426 #[repr(C)]
427 pub struct Fiber {
428 _unused: [u8; 0],
429 }
430
431 pub type FiberFunc = Option<unsafe extern "C" fn(VaList) -> c_int>;
432
433 extern "C" {
434 pub fn fiber_new(name: *const c_char, f: FiberFunc) -> *mut Fiber;
435 pub fn fiber_new_ex(
436 name: *const c_char,
437 fiber_attr: *const FiberAttr,
438 f: FiberFunc,
439 ) -> *mut Fiber;
440 pub fn fiber_yield();
441 pub fn fiber_start(callee: *mut Fiber, ...);
442 pub fn fiber_wakeup(f: *mut Fiber);
443 pub fn fiber_cancel(f: *mut Fiber);
444 pub fn fiber_set_cancellable(yesno: bool) -> bool;
445 pub fn fiber_set_joinable(fiber: *mut Fiber, yesno: bool);
446 pub fn fiber_join(f: *mut Fiber) -> c_int;
447 pub fn fiber_sleep(s: f64);
448 pub fn fiber_is_cancelled() -> bool;
449 pub fn fiber_time() -> f64;
450 pub fn fiber_time64() -> u64;
451 pub fn fiber_clock() -> f64;
452 pub fn fiber_clock64() -> u64;
453 pub fn fiber_reschedule();
454 }
455
456 #[repr(C)]
457 pub struct FiberAttr {
458 _unused: [u8; 0],
459 }
460
461 extern "C" {
462 pub fn fiber_attr_new() -> *mut FiberAttr;
463 pub fn fiber_attr_delete(fiber_attr: *mut FiberAttr);
464 pub fn fiber_attr_setstacksize(fiber_attr: *mut FiberAttr, stack_size: usize) -> c_int;
465 pub fn fiber_attr_getstacksize(fiber_attr: *mut FiberAttr) -> usize;
466 }
467
468 #[repr(C)]
469 pub struct FiberCond {
470 _unused: [u8; 0],
471 }
472
473 extern "C" {
474 pub fn fiber_cond_new() -> *mut FiberCond;
475 pub fn fiber_cond_delete(cond: *mut FiberCond);
476 pub fn fiber_cond_signal(cond: *mut FiberCond);
477 pub fn fiber_cond_broadcast(cond: *mut FiberCond);
478 pub fn fiber_cond_wait_timeout(cond: *mut FiberCond, timeout: f64) -> c_int;
479 pub fn fiber_cond_wait(cond: *mut FiberCond) -> c_int;
480 }
481
482 #[repr(C)]
483 pub struct Latch {
484 _unused: [u8; 0],
485 }
486
487 extern "C" {
488 pub fn box_latch_new() -> *mut Latch;
489 pub fn box_latch_delete(latch: *mut Latch);
490 pub fn box_latch_lock(latch: *mut Latch);
491 pub fn box_latch_trylock(latch: *mut Latch) -> c_int;
492 pub fn box_latch_unlock(latch: *mut Latch);
493 }
494}