use std::cell::UnsafeCell;
use std::ffi::CString;
use std::future::Future;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::ptr::NonNull;
use std::time::Duration;
use crate::time::Instant;
use crate::tlua::{self as tlua, AsLua};
#[cfg(not(all(target_arch = "aarch64", target_os = "macos")))]
use ::va_list::{VaList, VaPrimitive};
use tlua::unwrap_or;
#[cfg(all(target_arch = "aarch64", target_os = "macos"))]
use crate::va_list::{VaList, VaPrimitive};
use crate::error::{TarantoolError, TarantoolErrorCode};
use crate::ffi::{lua, tarantool as ffi};
use crate::Result;
use crate::{c_ptr, set_error};
pub mod r#async;
pub mod channel;
pub use channel::{
Channel, RecvError, RecvTimeout, SendError, SendTimeout, TryRecvError, TrySendError,
};
pub mod mutex;
use crate::ffi::tarantool::fiber_sleep;
pub use mutex::Mutex;
pub use r#async::block_on;
mod csw;
pub use csw::check_yield;
pub use csw::csw;
pub use csw::YieldResult;
/// Implements [`Debug`](::std::fmt::Debug) for a type without printing any of
/// its fields.
///
/// The first token is the type name; every token after it is treated as the
/// generic parameter list and is pasted both after `impl` and after the type
/// name. The formatted output is the type name followed by ` { .. }`.
macro_rules! impl_debug_stub {
    ($name:ident $($generics:tt)*) => {
        impl $($generics)* ::std::fmt::Debug for $name $($generics)* {
            fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
                let mut stub = formatter.debug_struct(::std::stringify!($name));
                stub.finish_non_exhaustive()
            }
        }
    };
}
/// Implements `PartialEq`, `Eq` and `Hash` for a type by delegating to its
/// `inner` field.
///
/// The first token is the type name; the remaining tokens are the generic
/// parameter list, pasted both after `impl` and after the type name. The type
/// must have a field called `inner` which itself supports equality and hashing.
macro_rules! impl_eq_hash {
    ($name:ident $($generics:tt)*) => {
        impl $($generics)* ::std::cmp::PartialEq for $name $($generics)* {
            fn eq(&self, rhs: &Self) -> bool {
                ::std::cmp::PartialEq::eq(&self.inner, &rhs.inner)
            }
        }

        impl $($generics)* ::std::cmp::Eq for $name $($generics)* {}

        impl $($generics)* ::std::hash::Hash for $name $($generics)* {
            fn hash<H: ::std::hash::Hasher>(&self, hasher: &mut H) {
                ::std::hash::Hash::hash(&self.inner, hasher)
            }
        }
    };
}
/// Low-level fiber handle whose body is a borrowed `FnMut(Box<T>) -> i32`
/// callback.
///
/// `inner` is the raw pointer returned by the fiber constructor of the C API
/// and `callback` is the type-erased pointer to the closure handed to
/// [`Fiber::new`] / [`Fiber::new_with_attr`].
///
/// NOTE(review): the closure is only borrowed for the duration of the
/// constructor call while its address is kept here; presumably the caller
/// must keep the closure alive while the fiber may run — confirm.
pub struct Fiber<'a, T: 'a> {
    inner: *mut ffi::Fiber,
    callback: *mut c_void,
    phantom: PhantomData<&'a T>,
}

impl_debug_stub! {Fiber<'a, T>}

impl<'a, T> Fiber<'a, T> {
    /// Creates a fiber called `name` that will run `callback`.
    ///
    /// The returned pointer from `fiber_new` is stored without a null check.
    ///
    /// # Panics
    ///
    /// Panics if `name` contains a nul byte.
    pub fn new<F>(name: &str, callback: &mut F) -> Self
    where
        F: FnMut(Box<T>) -> i32,
    {
        let (callback, entry_point) = unsafe { unpack_callback(callback) };
        let c_name = CString::new(name).expect("fiber name should not contain nul bytes");
        let inner = unsafe { ffi::fiber_new(c_name.as_ptr(), entry_point) };
        Self {
            inner,
            callback,
            phantom: PhantomData,
        }
    }

    /// Same as [`Fiber::new`], but creates the fiber through `fiber_new_ex`
    /// with the attributes in `attr`.
    ///
    /// # Panics
    ///
    /// Panics if `name` contains a nul byte.
    pub fn new_with_attr<F>(name: &str, attr: &FiberAttr, callback: &mut F) -> Self
    where
        F: FnMut(Box<T>) -> i32,
    {
        let (callback, entry_point) = unsafe { unpack_callback(callback) };
        let c_name = CString::new(name).expect("fiber name should not contain nul bytes");
        let inner = unsafe { ffi::fiber_new_ex(c_name.as_ptr(), attr.inner, entry_point) };
        Self {
            inner,
            callback,
            phantom: PhantomData,
        }
    }

    /// Starts the fiber, handing it `arg`.
    ///
    /// `arg` is moved to the heap and passed to `fiber_start` as a raw
    /// pointer together with the callback pointer; the trampoline produced by
    /// `unpack_callback` turns it back into a `Box<T>` for the callback.
    pub fn start(&mut self, arg: T) {
        let raw_arg = Box::into_raw(Box::new(arg));
        unsafe { ffi::fiber_start(self.inner, self.callback, raw_arg) };
    }

    /// Calls `fiber_wakeup` on the underlying fiber.
    pub fn wakeup(&self) {
        let fiber = self.inner;
        unsafe { ffi::fiber_wakeup(fiber) }
    }

    /// Calls `fiber_join` on the underlying fiber and returns its result code.
    pub fn join(&self) -> i32 {
        let fiber = self.inner;
        unsafe { ffi::fiber_join(fiber) }
    }

    /// Forwards `is_joinable` to `fiber_set_joinable` for the underlying fiber.
    pub fn set_joinable(&mut self, is_joinable: bool) {
        let fiber = self.inner;
        unsafe { ffi::fiber_set_joinable(fiber, is_joinable) }
    }

    /// Calls `fiber_cancel` on the underlying fiber.
    pub fn cancel(&mut self) {
        let fiber = self.inner;
        unsafe { ffi::fiber_cancel(fiber) }
    }
}
/// Configuration of a fiber that is about to be spawned.
///
/// `F` is the type of the fiber function. A builder created with
/// `Builder::new` carries the `NoFunc` placeholder until `func` (or one of
/// its variants) replaces it with a real closure.
pub struct Builder<F> {
// Optional fiber name; the spawning methods fall back to `"<rust>"` when unset.
name: Option<String>,
// Optional fiber attributes; filled in by `stack_size`.
attr: Option<FiberAttr>,
// The fiber function, or `NoFunc` while none has been provided.
f: F,
}
// `Debug` implementation that prints only the type name, never the fields.
impl_debug_stub! {Builder<F>}
impl Builder<NoFunc> {
    /// Creates a builder with no name, no attributes and no fiber function.
    #[inline(always)]
    pub fn new() -> Self {
        Self {
            name: None,
            attr: None,
            f: NoFunc,
        }
    }

    /// Sets the closure the fiber will execute, keeping the name and
    /// attributes configured so far.
    #[inline]
    pub fn func<'f, F, T>(self, f: F) -> Builder<F>
    where
        F: FnOnce() -> T,
        F: 'f,
    {
        let Self { name, attr, .. } = self;
        Builder { name, attr, f }
    }

    /// Sets a future as the fiber body; the fiber drives it to completion
    /// with [`block_on`] and yields its output.
    #[inline(always)]
    pub fn func_async<'f, F, T>(self, f: F) -> Builder<impl FnOnce() -> T + 'f>
    where
        F: Future<Output = T> + 'f,
        T: 'f,
    {
        let run_to_completion = move || block_on(f);
        self.func(run_to_completion)
    }

    /// Sets a closure without a return value as the fiber body.
    #[deprecated = "Use `Builder::func` instead"]
    #[inline(always)]
    pub fn proc<'f, F>(self, f: F) -> Builder<F>
    where
        F: FnOnce(),
        F: 'f,
    {
        let Self { name, attr, .. } = self;
        Builder { name, attr, f }
    }

    /// Sets a future without an output value as the fiber body; the fiber
    /// drives it to completion with [`block_on`].
    #[deprecated = "Use `Builder::func_async` instead"]
    #[inline(always)]
    pub fn proc_async<'f, F>(self, f: F) -> Builder<impl FnOnce() + 'f>
    where
        F: Future<Output = ()> + 'f,
    {
        let run_to_completion = move || block_on(f);
        self.func(run_to_completion)
    }
}
impl Default for Builder<NoFunc> {
#[inline(always)]
fn default() -> Self {
Self::new()
}
}
impl<F> Builder<F> {
    /// Sets the name of the fiber to be spawned, replacing any earlier one.
    #[inline(always)]
    pub fn name(mut self, name: impl Into<String>) -> Self {
        let new_name: String = name.into();
        self.name.replace(new_name);
        self
    }

    /// Requests a stack of `stack_size` bytes for the fiber.
    ///
    /// A fresh [`FiberAttr`] is created for this; any attributes stored
    /// earlier are replaced.
    ///
    /// # Errors
    ///
    /// Returns the error reported by [`FiberAttr::set_stack_size`]; the
    /// builder is consumed in that case.
    #[inline(always)]
    pub fn stack_size(mut self, stack_size: usize) -> Result<Self> {
        let mut attr = FiberAttr::new();
        if let Err(e) = attr.set_stack_size(stack_size) {
            return Err(e);
        }
        self.attr = Some(attr);
        Ok(self)
    }
}
impl<'f, F, T> Builder<F>
where
    F: FnOnce() -> T + 'f,
    T: 'f,
{
    /// Spawns the fiber through [`Fyber::spawn_and_yield`].
    ///
    /// The fiber is named `"<rust>"` when no name was configured.
    #[inline(always)]
    pub fn start(self) -> Result<JoinHandle<'f, T>> {
        let fiber_name = self.name.unwrap_or_else(|| String::from("<rust>"));
        Fyber::spawn_and_yield(fiber_name, self.f, self.attr.as_ref())
    }

    /// Spawns the fiber in deferred mode.
    ///
    /// Uses [`Fyber::spawn_deferred`] when `has_fiber_set_ctx` reports that
    /// the fiber context API is available, and falls back to
    /// [`Fyber::spawn_lua`] otherwise. The fiber is named `"<rust>"` when no
    /// name was configured.
    #[inline(always)]
    pub fn defer(self) -> Result<JoinHandle<'f, T>> {
        let fiber_name = self.name.unwrap_or_else(|| String::from("<rust>"));
        let attr = self.attr.as_ref();
        let ctx_api_available = unsafe { crate::ffi::has_fiber_set_ctx() };
        if ctx_api_available {
            Fyber::spawn_deferred(fiber_name, self.f, attr)
        } else {
            Fyber::spawn_lua(fiber_name, self.f, attr)
        }
    }

    /// Spawns the fiber in deferred mode, always through
    /// [`Fyber::spawn_deferred`].
    #[inline(always)]
    pub fn defer_ffi(self) -> Result<JoinHandle<'f, T>> {
        let fiber_name = self.name.unwrap_or_else(|| String::from("<rust>"));
        Fyber::spawn_deferred(fiber_name, self.f, self.attr.as_ref())
    }

    /// Spawns the fiber in deferred mode, always through
    /// [`Fyber::spawn_lua`].
    #[inline(always)]
    pub fn defer_lua(self) -> Result<JoinHandle<'f, T>> {
        let fiber_name = self.name.unwrap_or_else(|| String::from("<rust>"));
        Fyber::spawn_lua(fiber_name, self.f, self.attr.as_ref())
    }
}
/// Namespace type for the fiber spawning routines.
///
/// It stores no data: `F` (the fiber function type) and `T` (its return
/// type) only select which monomorphised trampolines are used.
pub struct Fyber<F, T> {
    _marker: PhantomData<(F, T)>,
}

impl<F, T> ::std::fmt::Debug for Fyber<F, T> {
    /// Prints `Fyber { .. }` regardless of the type parameters.
    fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let mut stub = formatter.debug_struct("Fyber");
        stub.finish_non_exhaustive()
    }
}
// Fiber spawning through the C fiber API (`fiber_new` / `fiber_new_ex`).
impl<'f, F, T> Fyber<F, T>
where
F: FnOnce() -> T + 'f,
T: 'f,
{
/// Creates a joinable fiber named `name` and starts it with `fiber_start`,
/// passing it the boxed closure `f` and a pointer to the result slot.
///
/// When `attr` is provided the fiber is created with `fiber_new_ex`,
/// otherwise with `fiber_new`.
///
/// # Errors
///
/// Returns the last Tarantool error if fiber creation yields a null pointer.
///
/// # Panics
///
/// Panics if `name` contains an interior nul byte.
pub fn spawn_and_yield(
name: String,
f: F,
attr: Option<&FiberAttr>,
) -> Result<JoinHandle<'f, T>> {
let cname = CString::new(name).expect("fiber name may not contain interior null bytes");
let inner_raw = unsafe {
if let Some(attr) = attr {
ffi::fiber_new_ex(
cname.as_ptr(),
attr.inner,
Some(Self::trampoline_for_immediate),
)
} else {
ffi::fiber_new(cname.as_ptr(), Some(Self::trampoline_for_immediate))
}
};
let inner = unwrap_or!(NonNull::new(inner_raw),
return Err(TarantoolError::last().into());
);
unsafe {
ffi::fiber_set_joinable(inner.as_ptr(), true);
// A result slot is only allocated when `T` carries data or needs dropping;
// otherwise a null pointer is handed to the fiber.
let result_cell = needs_returning::<T>().then(FiberResultCell::default);
let result_ptr = result_cell
.as_ref()
.map_or(std::ptr::null_mut(), |cell| cell.get());
// Ownership of the closure moves to the fiber as a raw pointer; the
// trampoline turns it back into a `Box`.
let boxed_f = Box::new(f);
ffi::fiber_start(inner.as_ptr(), Box::into_raw(boxed_f), result_ptr);
let jh = JoinHandle::ffi(inner, result_cell);
Ok(jh)
}
}
/// Fiber entry point used by `spawn_and_yield`.
///
/// Reads the boxed closure and the result pointer from the variadic
/// arguments (in the order they were passed to `fiber_start`), runs the
/// closure and stores its result in the slot when `T` needs returning.
/// Always returns `0`.
unsafe extern "C" fn trampoline_for_immediate(mut args: VaList) -> i32 {
let f = args.get_boxed::<F>();
let result_ptr = args.get_ptr::<Option<T>>();
let t = f();
if needs_returning::<T>() {
assert!(!result_ptr.is_null());
// `ptr::write` overwrites the slot without dropping its previous content.
std::ptr::write(result_ptr, Some(t));
} else if cfg!(debug_assertions) {
assert!(result_ptr.is_null());
}
0
}
/// Creates a joinable fiber named `name`, stores the closure and result
/// pointer in the fiber's context via `fiber_set_ctx`, and wakes the fiber
/// up with `fiber_wakeup` instead of starting it directly.
///
/// When `attr` is provided the fiber is created with `fiber_new_ex`,
/// otherwise with `fiber_new`.
///
/// # Errors
///
/// Returns the last Tarantool error if fiber creation yields a null pointer.
///
/// # Panics
///
/// Panics if `name` contains an interior nul byte.
pub fn spawn_deferred(
name: String,
f: F,
attr: Option<&FiberAttr>,
) -> Result<JoinHandle<'f, T>> {
let cname = CString::new(name).expect("fiber name may not contain interior null bytes");
let inner_raw = unsafe {
if let Some(attr) = attr {
ffi::fiber_new_ex(
cname.as_ptr(),
attr.inner,
Some(Self::trampoline_for_deferred_ffi),
)
} else {
ffi::fiber_new(cname.as_ptr(), Some(Self::trampoline_for_deferred_ffi))
}
};
let inner = unwrap_or!(NonNull::new(inner_raw),
return Err(TarantoolError::last().into());
);
unsafe {
ffi::fiber_set_joinable(inner.as_ptr(), true);
// A result slot is only allocated when `T` carries data or needs dropping.
let result_cell = needs_returning::<T>().then(FiberResultCell::default);
let result_ptr = result_cell
.as_ref()
.map_or(std::ptr::null_mut(), |cell| cell.get());
// The context box is leaked into the fiber here and reclaimed by
// `trampoline_for_deferred_ffi`.
let ctx = Box::new(DeferredFiberContext { f, result_ptr });
ffi::fiber_set_ctx(inner.as_ptr(), Box::into_raw(ctx) as _);
ffi::fiber_wakeup(inner.as_ptr());
let jh = JoinHandle::ffi(inner, result_cell);
Ok(jh)
}
}
/// Fiber entry point used by `spawn_deferred`.
///
/// Takes the `DeferredFiberContext` back out of the current fiber's
/// context, clears the context pointer, runs the closure and stores its
/// result in the slot when `T` needs returning. Always returns `0`.
unsafe extern "C" fn trampoline_for_deferred_ffi(_: VaList) -> i32 {
let fiber_self = ffi::fiber_self();
let ctx = ffi::fiber_get_ctx(fiber_self);
let ctx = Box::from_raw(ctx.cast::<DeferredFiberContext<F, T>>());
// The box has been reclaimed above, so the raw pointer must not stay in the fiber.
ffi::fiber_set_ctx(fiber_self, std::ptr::null_mut());
let t = (ctx.f)();
if needs_returning::<T>() {
assert!(!ctx.result_ptr.is_null());
// `ptr::write` overwrites the slot without dropping its previous content.
std::ptr::write(ctx.result_ptr, Some(t));
} else if cfg!(debug_assertions) {
assert!(ctx.result_ptr.is_null());
}
0
}
}
/// Data handed to a deferred fiber through its context pointer.
struct DeferredFiberContext<F, T> {
// The closure the fiber runs.
f: F,
// Where the closure's result is written; null when `T` does not need returning.
result_ptr: *mut Option<T>,
}
// Fiber spawning through the Lua `fiber` module.
impl<'f, F, T> Fyber<F, T>
where
F: FnOnce() -> T + 'f,
T: 'f,
{
/// Creates a fiber by calling `require('fiber').new` with a C closure
/// wrapping `f`, makes it joinable, sets its name and returns a handle
/// holding a Lua registry reference to the fiber object.
///
/// The `_attr` argument is not used by this implementation.
///
/// # Errors
///
/// Returns an error if `require('fiber')` or `fiber.new` fails.
///
/// # Panics
///
/// Panics if the `set_joinable` or `name` call on the fiber object fails.
pub fn spawn_lua(name: String, f: F, _attr: Option<&FiberAttr>) -> Result<JoinHandle<'f, T>> {
let fiber_ref = unsafe {
let l = ffi::luaT_state();
// require('fiber') -> the module is left on the stack.
lua::lua_getglobal(l, c_ptr!("require"));
lua::lua_pushstring(l, c_ptr!("fiber"));
impl_details::guarded_pcall(l, 1, 1)?;
// fiber.new(closure) -> the fiber object is left above the module.
lua::lua_getfield(l, -1, c_ptr!("new"));
// The closure travels as userdata and becomes the C closure's only upvalue.
impl_details::push_userdata(l, f);
lua::lua_pushcclosure(l, Self::trampoline_for_lua, 1);
impl_details::guarded_pcall(l, 1, 1).map_err(|e| {
// The error itself was already popped; this removes the module.
lua::lua_pop(l, 1);
e
})?;
// fiber_object:set_joinable(true)
lua::lua_getfield(l, -1, c_ptr!("set_joinable"));
lua::lua_pushvalue(l, -2); lua::lua_pushboolean(l, true as _);
impl_details::guarded_pcall(l, 2, 0) .map_err(|e| panic!("{}", e))
.unwrap();
// fiber_object:name(name)
lua::lua_getfield(l, -1, c_ptr!("name"));
lua::lua_pushvalue(l, -2); lua::lua_pushlstring(l, name.as_ptr() as _, name.len());
impl_details::guarded_pcall(l, 2, 0) .map_err(|e| panic!("{}", e))
.unwrap();
// `luaL_ref` pops the fiber object into the registry; the module is popped after it.
let fiber_ref = lua::luaL_ref(l, lua::LUA_REGISTRYINDEX);
lua::lua_pop(l, 1);
fiber_ref
};
Ok(JoinHandle::lua(fiber_ref))
}
/// Lua C function that runs the closure stored in the first upvalue.
///
/// The upvalue is userdata holding an `Option<F>`; the closure is taken out
/// of it so it can run only once. When `T` needs returning the result is
/// pushed as userdata and `1` is returned, otherwise `0`.
///
/// Raises a Lua error if the upvalue cannot be read or the closure was
/// already taken.
unsafe extern "C" fn trampoline_for_lua(l: *mut lua::lua_State) -> i32 {
let ud_ptr = lua::lua_touserdata(l, lua::lua_upvalueindex(1));
let f = (ud_ptr as *mut Option<F>)
.as_mut()
.unwrap_or_else(||
tlua::error!(l, "failed to extract upvalue"))
.take()
.unwrap_or_else(||
tlua::error!(l, "rust FnOnce callback was called more than once"));
let res = f();
if needs_returning::<T>() {
impl_details::push_userdata(l, res);
1
} else {
0
}
}
}
mod impl_details {
use super::*;
use crate::tlua::{AsLua, LuaError, PushGuard, StaticLua};
pub(super) unsafe fn lua_error_from_top(l: *mut lua::lua_State) -> LuaError {
let mut len = std::mem::MaybeUninit::uninit();
let data = lua::lua_tolstring(l, -1, len.as_mut_ptr());
assert!(!data.is_null());
let msg_bytes = std::slice::from_raw_parts(data as *mut u8, len.assume_init());
let msg = String::from_utf8_lossy(msg_bytes);
tlua::LuaError::ExecutionError(msg)
}
pub(super) unsafe fn guarded_pcall(
lptr: *mut lua::lua_State,
nargs: i32,
nresults: i32,
) -> Result<()> {
match lua::lua_pcall(lptr, nargs, nresults, 0) {
lua::LUA_OK => Ok(()),
lua::LUA_ERRRUN => {
let err = lua_error_from_top(lptr).into();
lua::lua_pop(lptr, 1);
Err(err)
}
code => panic!("lua_pcall: Unrecoverable failure code: {}", code),
}
}
pub(super) unsafe fn lua_fiber_join(f_ref: i32) -> Result<PushGuard<StaticLua>> {
let l = crate::global_lua();
let lptr = l.as_lua();
let top_svp = lua::lua_gettop(lptr);
lua::lua_rawgeti(lptr, lua::LUA_REGISTRYINDEX, f_ref);
lua::lua_getfield(lptr, -1, c_ptr!("join"));
lua::lua_pushvalue(lptr, -2);
lua::luaL_unref(lptr, lua::LUA_REGISTRYINDEX, f_ref);
guarded_pcall(lptr, 1, 2).map_err(|e| {
lua::lua_pop(lptr, 1);
e
})?;
let top = lua::lua_gettop(lptr);
assert_eq!(top - top_svp, 3);
let guard = PushGuard::new(l, 3);
assert_ne!(lua::lua_toboolean(lptr, -2), 0);
Ok(guard)
}
pub(super) unsafe fn push_userdata<T>(lua: tlua::LuaState, value: T) {
use tlua::ffi;
type UDBox<T> = Option<T>;
let ud_ptr = ffi::lua_newuserdata(lua, std::mem::size_of::<UDBox<T>>());
std::ptr::write(ud_ptr.cast::<UDBox<T>>(), Some(value));
if std::mem::needs_drop::<T>() {
ffi::lua_newtable(lua);
ffi::lua_pushstring(lua, c_ptr!("__gc"));
ffi::lua_pushcfunction(lua, wrap_gc::<T>);
ffi::lua_settable(lua, -3);
ffi::lua_setmetatable(lua, -2);
}
unsafe extern "C" fn wrap_gc<T>(lua: *mut ffi::lua_State) -> i32 {
let ud_ptr = ffi::lua_touserdata(lua, 1);
let ud = ud_ptr
.cast::<UDBox<T>>()
.as_mut()
.expect("__gc called with userdata pointing to NULL");
drop(ud.take());
0
}
}
}
/// Placeholder function type of a `Builder` for which no fiber function has
/// been set yet.
pub struct NoFunc;
/// Handle to a spawned fiber, used to wait for it and obtain its result of
/// type `T`.
///
/// Dropping a handle that has not been joined panics (see the `Drop`
/// implementation in this file).
pub struct JoinHandle<'f, T> {
// `Some` until `join` takes it; the `Drop` implementation checks this.
inner: Option<JoinHandleImpl<T>>,
// Carries the `'f` lifetime of the fiber function.
marker: PhantomData<&'f ()>,
}
/// Deprecated alias for a [`JoinHandle`] of a fiber that returns `()`.
#[deprecated = "Use `fiber::JoinHandle<'f, ()>` instead"]
pub type UnitJoinHandle<'f> = JoinHandle<'f, ()>;
/// Deprecated alias for [`JoinHandle`].
#[deprecated = "Use `fiber::JoinHandle<'f, T>` instead"]
pub type LuaJoinHandle<'f, T> = JoinHandle<'f, T>;
/// Deprecated alias for a [`JoinHandle`] of a fiber that returns `()`.
#[deprecated = "Use `fiber::JoinHandle<'f, ()>` instead"]
pub type LuaUnitJoinHandle<'f> = JoinHandle<'f, ()>;
/// The two ways a fiber behind a `JoinHandle` can have been created.
#[derive(Debug)]
enum JoinHandleImpl<T> {
/// Fiber created through the C fiber API.
Ffi {
// Non-null pointer to the fiber.
fiber: NonNull<ffi::Fiber>,
// Slot the fiber writes its result into; `None` when `T` does not need returning.
result_cell: Option<FiberResultCell<T>>,
},
/// Fiber created through the Lua `fiber` module.
Lua {
// Lua registry reference to the fiber object.
fiber_ref: i32,
},
}
/// Heap-allocated slot through which a fiber hands its result back; it starts
/// out as `None` and is filled by the fiber's trampoline.
type FiberResultCell<T> = Box<UnsafeCell<Option<T>>>;
// `Debug` for `JoinHandle` prints only the type name.
impl_debug_stub! {JoinHandle<'f, T>}
// Equality and hashing of `JoinHandle` are delegated to its `inner` field.
impl_eq_hash! {JoinHandle<'f, T>}
impl<'f, T> JoinHandle<'f, T> {
/// Wraps a fiber created through the C fiber API together with its
/// optional result slot.
#[inline(always)]
fn ffi(fiber: NonNull<ffi::Fiber>, result_cell: Option<FiberResultCell<T>>) -> Self {
Self {
inner: Some(JoinHandleImpl::Ffi { fiber, result_cell }),
marker: PhantomData,
}
}
/// Wraps a Lua registry reference to a fiber created through the Lua
/// `fiber` module.
#[inline(always)]
fn lua(fiber_ref: i32) -> Self {
Self {
inner: Some(JoinHandleImpl::Lua { fiber_ref }),
marker: PhantomData,
}
}
/// Waits for the fiber to finish and returns the value produced by its
/// function.
///
/// # Panics
///
/// Panics if the result slot was not filled by the fiber, or, for a Lua
/// fiber, if joining fails or the returned userdata is missing or empty.
pub fn join(mut self) -> T {
// Emptying `inner` is what lets the `Drop` implementation run without panicking.
let inner = self
.inner
.take()
.expect("after construction join is called at most once");
match inner {
JoinHandleImpl::Ffi {
fiber,
mut result_cell,
} => {
// The code returned by `fiber_join` is currently ignored.
let _code = unsafe { ffi::fiber_join(fiber.as_ptr()) };
if needs_returning::<T>() {
let mut result_cell = result_cell
.take()
.expect("should not be None for non unit types");
result_cell
.get_mut()
.take()
.expect("should have been set by the fiber function")
} else {
if cfg!(debug_assertions) {
assert!(result_cell.is_none());
}
// In this branch `T` is zero-sized and has no drop glue (see
// `needs_returning`), so the value is conjured rather than read from a slot.
#[allow(clippy::uninit_assumed_init)]
unsafe {
std::mem::MaybeUninit::uninit().assume_init()
}
}
}
JoinHandleImpl::Lua { fiber_ref } => unsafe {
// The guard owns the values `join` left on the Lua stack.
let guard = impl_details::lua_fiber_join(fiber_ref)
.map_err(|e| panic!("Unrecoverable lua failure: {}", e))
.unwrap();
if needs_returning::<T>() {
// The result is userdata holding an `Option<T>`, pushed by the Lua trampoline.
let ud_ptr = lua::lua_touserdata(guard.as_lua(), -1);
let res = (ud_ptr as *mut Option<T>)
.as_mut()
.expect("fiber:join must return correct userdata")
.take()
.expect("data can only be taken once from the UDBox");
res
} else {
if cfg!(debug_assertions) {
assert!(lua::lua_isnil(guard.as_lua(), -1));
}
// Same reasoning as in the FFI branch: `T` is zero-sized without drop glue.
#[allow(clippy::uninit_assumed_init)]
std::mem::MaybeUninit::uninit().assume_init()
}
},
}
}
}
impl<'f, T> Drop for JoinHandle<'f, T> {
    /// Enforces that every handle is joined.
    ///
    /// # Panics
    ///
    /// Panics if the handle still holds its fiber, i.e. `join` was never
    /// called on it.
    fn drop(&mut self) {
        assert!(self.inner.is_none(), "JoinHandle dropped before being joined");
    }
}
/// Two handles are equal when they are of the same kind and refer to the same
/// fiber pointer (FFI) or the same registry reference (Lua). The result slot
/// does not take part in the comparison.
#[rustfmt::skip]
impl<T> ::std::cmp::PartialEq for JoinHandleImpl<T> {
    fn eq(&self, other: &Self) -> bool {
        match self {
            Self::Ffi { fiber, .. } => {
                matches!(other, Self::Ffi { fiber: rhs, .. } if fiber == rhs)
            }
            Self::Lua { fiber_ref, .. } => {
                matches!(other, Self::Lua { fiber_ref: rhs, .. } if fiber_ref == rhs)
            }
        }
    }
}

impl<T> ::std::cmp::Eq for JoinHandleImpl<T> {}
/// Hashes only the identifying part of the handle: the fiber pointer for the
/// FFI variant, the registry reference for the Lua variant.
impl<T> ::std::hash::Hash for JoinHandleImpl<T> {
    fn hash<H>(&self, state: &mut H)
    where
        H: ::std::hash::Hasher,
    {
        match self {
            Self::Ffi { fiber, .. } => ::std::hash::Hash::hash(fiber, state),
            Self::Lua { fiber_ref, .. } => ::std::hash::Hash::hash(fiber_ref, state),
        }
    }
}
/// Helpers for reading typed values out of the variadic argument list a fiber
/// entry point receives.
///
/// Every method consumes arguments from the list, so calls must be made in
/// the same order in which the arguments were passed.
trait TrampolineArgs {
    /// Reads the next argument as the primitive type `T`.
    unsafe fn get<T>(&mut self) -> T
    where
        T: VaPrimitive;

    /// Reads the next argument as a pointer and takes ownership of the `T`
    /// behind it as a `Box<T>`.
    unsafe fn get_boxed<T>(&mut self) -> Box<T> {
        let raw = self.get::<*const c_void>() as *mut T;
        Box::from_raw(raw)
    }

    /// Reads the next argument as a pointer and casts it to `*mut T`.
    unsafe fn get_ptr<T>(&mut self) -> *mut T {
        let erased = self.get::<*const c_void>();
        erased as *mut T
    }

    /// Reads three arguments (buffer pointer, length, capacity) and
    /// reassembles a `String` from them.
    unsafe fn get_str(&mut self) -> String {
        let buf = self.get::<*const u8>();
        let length = self.get::<usize>();
        let capacity = self.get::<usize>();
        String::from_raw_parts(buf as *mut u8, length, capacity)
    }
}
// Lets `VaList` be used through the `TrampolineArgs` helpers.
impl TrampolineArgs for VaList {
/// Reads the next argument of type `T` from the list.
// NOTE(review): `self.get::<T>()` is presumably resolved to `VaList`'s own
// inherent `get` method rather than to this trait method; if `VaList` had
// no such method this would call itself — confirm against the `va_list` API.
unsafe fn get<T>(&mut self) -> T
where
T: VaPrimitive,
{
self.get::<T>()
}
}
/// Spawns a fiber running `f` via `Builder::start` and returns a handle for
/// joining it.
///
/// # Panics
///
/// Panics if the fiber cannot be created.
#[inline(always)]
pub fn start<'f, F, T>(f: F) -> JoinHandle<'f, T>
where
    F: FnOnce() -> T,
    F: 'f,
    T: 'f,
{
    let builder = Builder::new().func(f);
    builder.start().unwrap()
}
/// Spawns a fiber via [`start`] whose body drives the future `f` to
/// completion with [`block_on`].
///
/// # Panics
///
/// Panics if the fiber cannot be created.
#[inline(always)]
pub fn start_async<'f, F, T>(f: F) -> JoinHandle<'f, T>
where
    F: Future<Output = T> + 'f,
    T: 'f,
{
    let run_to_completion = move || block_on(f);
    start(run_to_completion)
}
#[deprecated = "Use `fiber::start` instead"]
#[inline(always)]
pub fn start_proc<'f, F>(f: F) -> JoinHandle<'f, ()>
where
F: FnOnce(),
F: 'f,
{
start(f)
}
/// Spawns a fiber running `f` in deferred mode via `Builder::defer` and
/// returns a handle for joining it.
///
/// # Panics
///
/// Panics if the fiber cannot be created.
#[inline(always)]
pub fn defer<'f, F, T>(f: F) -> JoinHandle<'f, T>
where
    F: FnOnce() -> T,
    F: 'f,
    T: 'f,
{
    let builder = Builder::new().func(f);
    builder.defer().unwrap()
}
/// Spawns a deferred fiber via [`defer`] whose body drives the future `f` to
/// completion with [`block_on`].
///
/// # Panics
///
/// Panics if the fiber cannot be created.
#[inline(always)]
pub fn defer_async<'f, F, T>(f: F) -> JoinHandle<'f, T>
where
    F: Future<Output = T> + 'f,
    T: 'f,
{
    let run_to_completion = move || block_on(f);
    defer(run_to_completion)
}
#[deprecated = "Use `fiber::defer` instead"]
#[inline(always)]
pub fn defer_proc<'f, F>(f: F) -> JoinHandle<'f, ()>
where
F: FnOnce(),
F: 'f,
{
defer(f)
}
/// Forwards `is_cancellable` to `fiber_set_cancellable` and returns the
/// boolean that call reports.
// NOTE(review): presumably the returned value is the previous cancellable
// state of the current fiber — confirm against the Tarantool C API.
pub fn set_cancellable(is_cancellable: bool) -> bool {
unsafe { ffi::fiber_set_cancellable(is_cancellable) }
}
/// Returns the result of `fiber_is_cancelled`.
// NOTE(review): presumably this refers to the currently running fiber — confirm.
pub fn is_cancelled() -> bool {
unsafe { ffi::fiber_is_cancelled() }
}
/// Calls `fiber_sleep` with `time` converted to fractional seconds.
#[inline(always)]
pub fn sleep(time: Duration) {
    let seconds = time.as_secs_f64();
    unsafe { ffi::fiber_sleep(seconds) }
}
/// Returns the value of `fiber_clock` (fractional seconds) wrapped in an
/// [`Instant`].
#[inline(always)]
pub fn clock() -> Instant {
    let since_epoch = Duration::from_secs_f64(unsafe { ffi::fiber_clock() });
    Instant(since_epoch)
}
/// Calls `fiber_yield` from the C fiber API.
// NOTE(review): presumably control only comes back once something wakes the
// fiber up again — confirm against the Tarantool C API.
#[inline(always)]
pub fn fiber_yield() {
unsafe { ffi::fiber_yield() }
}
/// Yields by sleeping for zero seconds, then reports whether the fiber was
/// cancelled in the meantime.
///
/// # Errors
///
/// If `is_cancelled` returns `true` after the sleep, a `ProcLua` error with
/// the message "fiber is cancelled" is set and returned.
pub fn r#yield() -> Result<()> {
    unsafe { fiber_sleep(0f64) };
    if !is_cancelled() {
        return Ok(());
    }
    set_error!(TarantoolErrorCode::ProcLua, "fiber is cancelled");
    Err(TarantoolError::last().into())
}
/// Calls `fiber_reschedule` from the C fiber API.
// NOTE(review): presumably this puts the current fiber at the end of the
// ready queue and yields — confirm against the Tarantool C API.
#[inline(always)]
pub fn reschedule() {
unsafe { ffi::fiber_reschedule() }
}
/// Owned fiber attributes object from the C fiber API.
///
/// Created with `fiber_attr_new` and released with `fiber_attr_delete` when
/// dropped.
#[derive(Debug)]
pub struct FiberAttr {
    inner: *mut ffi::FiberAttr,
}

impl FiberAttr {
    /// Allocates a new attributes object via `fiber_attr_new`.
    #[inline(always)]
    pub fn new() -> Self {
        let inner = unsafe { ffi::fiber_attr_new() };
        Self { inner }
    }

    /// Returns the stack size reported by `fiber_attr_getstacksize`.
    #[inline(always)]
    pub fn stack_size(&self) -> usize {
        let attr = self.inner;
        unsafe { ffi::fiber_attr_getstacksize(attr) }
    }

    /// Sets the stack size via `fiber_attr_setstacksize`.
    ///
    /// # Errors
    ///
    /// Returns the last Tarantool error when the call reports a negative
    /// result code.
    #[inline(always)]
    pub fn set_stack_size(&mut self, stack_size: usize) -> Result<()> {
        let rc = unsafe { ffi::fiber_attr_setstacksize(self.inner, stack_size) };
        if rc >= 0 {
            return Ok(());
        }
        Err(TarantoolError::last().into())
    }
}

impl Default for FiberAttr {
    /// Equivalent to [`FiberAttr::new`].
    #[inline(always)]
    fn default() -> Self {
        FiberAttr::new()
    }
}

impl Drop for FiberAttr {
    /// Releases the attributes object via `fiber_attr_delete`.
    #[inline(always)]
    fn drop(&mut self) {
        let attr = self.inner;
        unsafe { ffi::fiber_attr_delete(attr) }
    }
}
/// Owned fiber condition variable from the C fiber API.
///
/// Created with `fiber_cond_new` and released with `fiber_cond_delete` when
/// dropped.
#[derive(Debug)]
pub struct Cond {
    inner: *mut ffi::FiberCond,
}

impl Cond {
    /// Allocates a new condition variable via `fiber_cond_new`.
    #[inline(always)]
    pub fn new() -> Self {
        let inner = unsafe { ffi::fiber_cond_new() };
        Self { inner }
    }

    /// Calls `fiber_cond_signal` on the condition variable.
    #[inline(always)]
    pub fn signal(&self) {
        let cond = self.inner;
        unsafe { ffi::fiber_cond_signal(cond) }
    }

    /// Calls `fiber_cond_broadcast` on the condition variable.
    #[inline(always)]
    pub fn broadcast(&self) {
        let cond = self.inner;
        unsafe { ffi::fiber_cond_broadcast(cond) }
    }

    /// Waits on the condition variable for at most `timeout`.
    ///
    /// Returns `true` when `fiber_cond_wait_timeout` reports a non-negative
    /// result code and `false` otherwise.
    #[inline(always)]
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        let seconds = timeout.as_secs_f64();
        let rc = unsafe { ffi::fiber_cond_wait_timeout(self.inner, seconds) };
        rc >= 0
    }

    /// Waits on the condition variable without a timeout.
    ///
    /// Returns `true` when `fiber_cond_wait` reports a non-negative result
    /// code and `false` otherwise.
    #[inline(always)]
    pub fn wait(&self) -> bool {
        let rc = unsafe { ffi::fiber_cond_wait(self.inner) };
        rc >= 0
    }
}

impl Default for Cond {
    /// Equivalent to [`Cond::new`].
    #[inline(always)]
    fn default() -> Self {
        Cond::new()
    }
}

impl Drop for Cond {
    /// Releases the condition variable via `fiber_cond_delete`.
    #[inline(always)]
    fn drop(&mut self) {
        let cond = self.inner;
        unsafe { ffi::fiber_cond_delete(cond) }
    }
}
/// Owned latch from the Tarantool C API.
///
/// Created with `box_latch_new` and released with `box_latch_delete` when
/// dropped.
#[derive(Debug)]
pub struct Latch {
    inner: *mut ffi::Latch,
}

impl Latch {
    /// Allocates a new latch via `box_latch_new`.
    #[inline(always)]
    pub fn new() -> Self {
        let inner = unsafe { ffi::box_latch_new() };
        Self { inner }
    }

    /// Locks the latch via `box_latch_lock` and returns a guard that unlocks
    /// it when dropped.
    #[inline(always)]
    pub fn lock(&self) -> LatchGuard {
        let latch_inner = self.inner;
        unsafe { ffi::box_latch_lock(latch_inner) };
        LatchGuard { latch_inner }
    }

    /// Tries to lock the latch via `box_latch_trylock`.
    ///
    /// Returns a guard when the call reports `0`, and `None` otherwise.
    #[inline(always)]
    pub fn try_lock(&self) -> Option<LatchGuard> {
        let latch_inner = self.inner;
        let rc = unsafe { ffi::box_latch_trylock(latch_inner) };
        (rc == 0).then(|| LatchGuard { latch_inner })
    }
}

impl Default for Latch {
    /// Equivalent to [`Latch::new`].
    #[inline(always)]
    fn default() -> Self {
        Latch::new()
    }
}

impl Drop for Latch {
    /// Releases the latch via `box_latch_delete`.
    #[inline(always)]
    fn drop(&mut self) {
        let latch = self.inner;
        unsafe { ffi::box_latch_delete(latch) }
    }
}
/// Guard returned by `Latch::lock` / `Latch::try_lock`; unlocks the latch
/// when dropped.
///
/// NOTE(review): the guard stores a raw pointer and is not tied to the
/// latch's lifetime; presumably it must not outlive the latch — confirm.
#[derive(Debug)]
pub struct LatchGuard {
    latch_inner: *mut ffi::Latch,
}

impl Drop for LatchGuard {
    /// Unlocks the latch via `box_latch_unlock`.
    #[inline(always)]
    fn drop(&mut self) {
        let latch = self.latch_inner;
        unsafe { ffi::box_latch_unlock(latch) }
    }
}
/// Splits a closure into a type-erased data pointer and a C-compatible fiber
/// entry point that calls it.
///
/// The entry point expects two variadic arguments, in this order: the closure
/// pointer returned here and a pointer obtained from `Box<T>::into_raw`. It
/// rebuilds the box, calls the closure with it and returns the closure's
/// result.
///
/// # Safety
///
/// The returned pointer refers to `callback` without carrying its lifetime;
/// the caller must make sure the closure is still alive whenever the entry
/// point runs.
pub(crate) unsafe fn unpack_callback<F, T>(callback: &mut F) -> (*mut c_void, ffi::FiberFunc)
where
    F: FnMut(Box<T>) -> i32,
{
    unsafe extern "C" fn trampoline<F, T>(mut args: VaList) -> i32
    where
        F: FnMut(Box<T>) -> i32,
    {
        // Arguments must be read in the order they were passed.
        let closure_ptr = args.get::<*const c_void>() as *mut F;
        let arg_ptr = args.get::<*const c_void>() as *mut T;
        let closure = &mut *closure_ptr;
        closure(Box::from_raw(arg_ptr))
    }

    let erased = callback as *mut F as *mut c_void;
    (erased, Some(trampoline::<F, T>))
}
/// Tells whether a value of type `T` has to be physically passed back from a
/// fiber.
///
/// That is the case for every type that occupies memory, and for zero-sized
/// types that have drop glue.
const fn needs_returning<T>() -> bool {
    if std::mem::size_of::<T>() == 0 {
        std::mem::needs_drop::<T>()
    } else {
        true
    }
}
// Compile-time checks of `needs_returning`: sized types and zero-sized types
// with a `Drop` implementation must be returned, plain zero-sized types must
// not.
const _: () = {
    struct UnitStruct;

    struct DroppableUnitStruct;
    impl Drop for DroppableUnitStruct {
        fn drop(&mut self) {}
    }

    // Types that occupy memory.
    assert!(needs_returning::<i32>());
    assert!(needs_returning::<bool>());

    // Zero-sized types without drop glue.
    assert!(!needs_returning::<()>());
    assert!(!needs_returning::<UnitStruct>());

    // Zero-sized type with drop glue.
    assert!(needs_returning::<DroppableUnitStruct>());
};
#[cfg(feature = "internal_test")]
mod tests {
    use super::*;
    use std::cell::RefCell;
    use std::rc::Rc;

    /// A future set with `func_async` runs in the fiber and its output is
    /// returned by `join`.
    #[crate::test(tarantool = "crate")]
    fn builder_async_func() {
        let handle = Builder::new().func_async(async { 69 }).start().unwrap();
        assert_eq!(handle.join(), 69);
    }

    /// A future set with the deprecated `proc_async` runs in the fiber and
    /// its side effect is visible after `join`.
    #[crate::test(tarantool = "crate")]
    #[allow(deprecated)]
    fn builder_async_proc() {
        let flag = Rc::new(RefCell::new(0u32));
        let flag_in_fiber = Rc::clone(&flag);
        let handle = Builder::new()
            .proc_async(async move {
                *flag_in_fiber.borrow_mut() = 1;
            })
            .start()
            .unwrap();
        handle.join();
        assert_eq!(*flag.borrow(), 1);
    }

    /// After sleeping, the fiber clock has advanced by at least the
    /// requested duration.
    #[crate::test(tarantool = "crate")]
    fn fiber_sleep_and_clock() {
        let started = clock();
        let pause = Duration::from_millis(100);
        sleep(pause);
        assert!(started.elapsed() >= pause);
        assert!(clock() >= started);
        assert!(clock() - started >= pause);
    }
}