#![cfg(any(feature = "picodata", doc))]
pub mod oneshot;
pub mod sync;
pub mod unbounded;
use crate::ffi;
use crate::ffi::tarantool::{
cbus_endpoint_delete, cbus_endpoint_new, cbus_loop, lcpipe_delete, lcpipe_new, lcpipe_push_now,
};
use crate::fiber::Cond;
use std::ffi::CString;
use std::os::raw::c_void;
use std::ptr;
/// Error reported by the receiving side of a channel.
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
/// The sending half of the channel is disconnected.
#[error("sending half of a channel is disconnected")]
Disconnected,
}
/// Error reported by the sending side of a channel when the receiving half is
/// disconnected.
///
/// The wrapped value is publicly accessible through field `0`.
/// NOTE(review): presumably this is the value that could not be delivered —
/// confirm against the channel implementations that construct this error.
#[derive(Debug, thiserror::Error)]
#[error("receiving half of a channel is disconnected")]
pub struct SendError<T>(pub T);
/// Errors produced while setting up cbus primitives.
#[derive(Debug, thiserror::Error)]
pub enum CbusError {
/// Returned by `Endpoint::new` when `cbus_endpoint_new` reports a non-zero status.
#[error("endpoint with given name already registered")]
EndpointAlreadyExists,
}
/// Link field placed at the very start of every [`Message`].
///
/// NOTE(review): presumably mirrors the C-side singly-linked tail queue entry
/// used by cbus — confirm against the Tarantool headers.
#[repr(C)]
struct STailQEntry {
    next: *const STailQEntry,
}

/// A single hop of a message route: the function to run for the message plus a
/// pipe pointer (always null for hops created by [`Message::new`]).
///
/// The struct is `#[repr(C)]` and is reached from C through the message that is
/// pushed into an `lcpipe`, therefore the function pointer it stores must use
/// the C calling convention.
#[repr(C)]
pub struct MessageHop {
    /// Handler invoked with a pointer to the message itself.
    f: unsafe extern "C" fn(*mut c_void),
    /// NOTE(review): presumably the pipe of the next hop; never set to anything
    /// but null in this file — confirm against the C definition.
    _pipe: *const c_void,
}

/// A message carrying a Rust callback.
///
/// The leading fields (`fifo`, `route`, `hop`) form a `#[repr(C)]` prefix; the
/// Rust-only payload (`callback`) follows them.
#[repr(C)]
pub struct Message<T> {
    fifo: STailQEntry,
    /// Heap-allocated hop created in [`Message::new`]; same pointer as `hop`.
    route: *mut MessageHop,
    /// Heap-allocated hop created in [`Message::new`]; same pointer as `route`.
    hop: *mut MessageHop,
    /// The user callback; taken out (leaving `None`) when the message is executed.
    callback: Option<T>,
}

impl<F> Message<F>
where
    F: FnOnce() + Send + 'static,
{
    /// Hop handler: reclaims ownership of the message and runs its callback.
    ///
    /// Declared `extern "C"` because the pointer to this function is stored in a
    /// `#[repr(C)]` [`MessageHop`] that is handed to C code, which will call it
    /// using the C ABI.
    ///
    /// # Safety
    ///
    /// `msg` must be a pointer obtained by leaking a `Box<Message<F>>` (with the
    /// same `F`), and it must not be used again after this call: the box is
    /// rebuilt here and dropped when the function returns.
    unsafe extern "C" fn trampoline(msg: *mut c_void) {
        let msg = msg.cast::<Self>();
        let mut msg = Box::from_raw(msg);
        if let Some(callback) = msg.callback.take() {
            callback();
        }
        // `msg` is dropped here, after the callback has finished.
    }

    /// Creates a message that will run `callback` when its hop is executed.
    ///
    /// A single [`MessageHop`] is allocated on the heap; both `route` and `hop`
    /// point at it.
    pub fn new(callback: F) -> Self {
        let hop = Box::into_raw(Box::new(MessageHop {
            f: Self::trampoline,
            _pipe: ptr::null(),
        }));
        Self {
            fifo: STailQEntry { next: ptr::null() },
            route: hop,
            hop,
            callback: Some(callback),
        }
    }
}
impl<T> Drop for Message<T> {
    /// Releases the heap-allocated hop owned by this message.
    fn drop(&mut self) {
        // SAFETY: `self.hop` comes from `Box::into_raw` in `Message::new`; `route`
        // aliases the same allocation and is intentionally not freed separately.
        let owned_hop: Box<MessageHop> = unsafe { Box::from_raw(self.hop) };
        drop(owned_hop);
    }
}
/// Owning handle to a cbus endpoint.
///
/// Holds the raw pointer written by `cbus_endpoint_new`; the endpoint is deleted
/// with `cbus_endpoint_delete` when the handle is dropped.
pub struct Endpoint {
// Raw endpoint pointer, cast to `*mut c_void` for every FFI call.
endpoint: *const (),
}
impl Endpoint {
    /// Registers a new cbus endpoint under `name`.
    ///
    /// # Errors
    ///
    /// Returns [`CbusError::EndpointAlreadyExists`] when `cbus_endpoint_new`
    /// reports a non-zero status.
    ///
    /// # Panics
    ///
    /// Panics if `name` contains an interior NUL byte.
    pub fn new(name: &str) -> Result<Self, CbusError> {
        let c_name =
            CString::new(name).expect("endpoint name may not contain interior null bytes");
        let mut raw: *mut c_void = ptr::null_mut();
        // SAFETY: `raw` is a valid, writable out-slot and `c_name` is a
        // NUL-terminated string that outlives the call.
        let status = unsafe { cbus_endpoint_new(&mut raw, c_name.as_ptr()) };
        match status {
            0 => Ok(Endpoint {
                endpoint: raw as *const (),
            }),
            _ => Err(CbusError::EndpointAlreadyExists),
        }
    }

    /// Runs `cbus_loop` on this endpoint.
    ///
    /// NOTE(review): presumably this keeps processing incoming messages on the
    /// calling fiber until that fiber is cancelled — confirm against the
    /// Tarantool cbus documentation.
    pub fn cbus_loop(&self) {
        let raw = self.endpoint as *mut c_void;
        unsafe { cbus_loop(raw) }
    }
}
impl Drop for Endpoint {
    /// Deletes the underlying endpoint through `cbus_endpoint_delete`.
    fn drop(&mut self) {
        let raw = self.endpoint as *mut c_void;
        // Whatever the call returns is deliberately discarded: there is no way
        // to report a failure from `drop`.
        unsafe { cbus_endpoint_delete(raw) };
    }
}
/// Owning handle to a pipe created by `lcpipe_new`.
///
/// Messages are pushed into it with `lcpipe_push_now`, and the pipe is released
/// with `lcpipe_delete` when the handle is dropped.
pub struct LCPipe {
// Raw pipe pointer returned by `lcpipe_new`.
pipe: *mut ffi::tarantool::LCPipe,
}
// Allows an `LCPipe` to be moved to another thread despite holding a raw pointer.
// NOTE(review): soundness relies on the C-side lcpipe being usable from the
// thread it is moved to — confirm against the Tarantool cbus contract.
unsafe impl Send for LCPipe {}
impl LCPipe {
    /// Creates a pipe for the endpoint called `endpoint_name` via `lcpipe_new`.
    ///
    /// # Panics
    ///
    /// Panics if `endpoint_name` contains an interior NUL byte.
    pub fn new(endpoint_name: &str) -> Self {
        let c_name = CString::new(endpoint_name)
            .expect("endpoint name may not contain interior null bytes");
        // SAFETY: `c_name` is a NUL-terminated string that outlives the call.
        let pipe = unsafe { lcpipe_new(c_name.as_ptr()) };
        Self { pipe }
    }

    /// Moves `msg` to the heap and pushes it into the pipe with `lcpipe_push_now`.
    ///
    /// Ownership of the allocation is given up here; the message's hop handler
    /// is expected to rebuild the `Box` and free it.
    pub fn push_message<T>(&mut self, msg: Message<T>) {
        let raw: *mut Message<T> = Box::into_raw(Box::new(msg));
        unsafe { lcpipe_push_now(self.pipe, raw.cast::<c_void>()) }
    }
}
impl Drop for LCPipe {
    /// Releases the underlying pipe through `lcpipe_delete`.
    fn drop(&mut self) {
        let pipe = self.pipe;
        // `pipe` is the pointer stored at construction time; it is not used
        // again after this call because the handle is being dropped.
        unsafe { lcpipe_delete(pipe) };
    }
}
/// Newtype around a fiber [`Cond`] for which `Send` and `Sync` are asserted by
/// the `unsafe impl`s in this file.
struct UnsafeCond(Cond);
impl UnsafeCond {
unsafe fn as_ref(&self) -> &Cond {
&self.0
}
}
// Assert that `UnsafeCond` may be moved to and shared between threads.
// NOTE(review): the wrapped `Cond` is reachable only through the `unsafe fn
// as_ref`, so callers presumably take responsibility for using it on the right
// thread — confirm.
unsafe impl Send for UnsafeCond {}
unsafe impl Sync for UnsafeCond {}
// Internal tests; compiled only with the `internal_test` feature.
#[cfg(feature = "internal_test")]
mod tests {
use crate::cbus;
use crate::cbus::Message;
use crate::fiber;
use crate::fiber::Cond;
use crate::static_ref;
use std::thread;
use std::thread::ThreadId;
/// Starts a non-joinable fiber named "cbus_fiber" that registers an endpoint
/// called `endpoint_name` and runs its cbus loop. Returns the id of that fiber.
///
/// Panics if the endpoint cannot be created or the fiber cannot be started.
pub(super) fn run_cbus_endpoint(endpoint_name: &'static str) -> fiber::FiberId {
fiber::Builder::new()
.name("cbus_fiber")
.func(move || {
let cbus_endpoint = cbus::Endpoint::new(endpoint_name).unwrap();
cbus_endpoint.cbus_loop();
})
.start_non_joinable()
.expect("failed to start the cbus_fiber")
}
/// Pushes one message from a spawned OS thread and checks that its callback
/// ran on a different thread than the one that sent it.
#[crate::test(tarantool = "crate")]
pub fn cbus_send_message_test() {
// Id of the thread on which the message callback ran.
static mut TX_THREAD_ID: Option<ThreadId> = None;
// Id of the spawned thread that pushed the message.
static mut SENDER_THREAD_ID: Option<ThreadId> = None;
let cbus_fiber_id = run_cbus_endpoint("cbus_send_message_test");
// Raw-pointer wrapper so the address of `cond` can be moved into the thread.
struct CondPtr(*const Cond);
unsafe impl Send for CondPtr {}
let cond = Cond::new();
let cond_ptr = CondPtr(&cond as *const Cond);
let thread = thread::spawn(move || {
unsafe { SENDER_THREAD_ID = Some(thread::current().id()) };
let mut pipe = cbus::LCPipe::new("cbus_send_message_test");
let msg = Message::new(move || {
// NOTE(review): presumably forces the closure to capture the whole
// `CondPtr` wrapper (which is `Send`) rather than only its raw-pointer
// field under edition-2021 disjoint captures — confirm.
let _ = &cond_ptr;
unsafe { TX_THREAD_ID = Some(thread::current().id()) };
let cond = unsafe { cond_ptr.0.as_ref().unwrap() };
// Wake up the test body waiting on `cond`.
cond.broadcast();
});
pipe.push_message(msg);
});
// Wait until the callback signals that it has run.
cond.wait();
unsafe {
assert!(static_ref!(const SENDER_THREAD_ID).is_some());
assert!(static_ref!(const TX_THREAD_ID).is_some());
// The callback must not have run on the sender thread.
assert_ne!(
static_ref!(const SENDER_THREAD_ID),
static_ref!(const TX_THREAD_ID)
);
}
thread.join().unwrap();
// Stop the endpoint fiber started at the beginning of the test.
assert!(fiber::cancel(cbus_fiber_id));
}
}