use super::{LCPipe, Message};
use crate::cbus::RecvError;
use crate::fiber::Cond;
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Shared state behind one `Sender` / `EndpointReceiver` pair.
///
/// Holds at most one message: the sender stores it, the receiver takes it.
struct Channel<T> {
/// Slot for the single message. Starts as `None`, is set to `Some` by
/// `Sender::send`, and is emptied again by `EndpointReceiver::receive`.
message: UnsafeCell<Option<T>>,
/// Condition the receiver waits on when no message is ready yet; it is
/// signalled by the message that `Sender`'s `Drop` impl pushes to its pipe.
cond: Arc<Cond>,
/// Stored as `true` (`Release`) after the message has been written; the
/// receiver swaps it back to `false` (`Acquire`) to decide whether to wait.
ready: AtomicBool,
}
// SAFETY: NOTE(review) — rationale reconstructed from this file only; confirm.
// A `Channel` is shared through an `Arc` by a `Sender` and an
// `EndpointReceiver`, and the sender is used from another thread in the tests,
// so the type has to be `Send + Sync` despite the `UnsafeCell` field.
// The slot is written in `Sender::send` before the `Release` store to `ready`,
// and read in `EndpointReceiver::receive` only after the `Acquire` swap of
// `ready` or after `cond.wait()` has returned.
// `T: Send` is required because the message value itself crosses threads.
unsafe impl<T> Sync for Channel<T> where T: Send {}
unsafe impl<T> Send for Channel<T> where T: Send {}
impl<T> Channel<T> {
    /// Creates an empty channel: no message stored, the `ready` flag cleared,
    /// and a fresh condition for the receiver to wait on.
    fn new() -> Self {
        let message = UnsafeCell::new(None);
        let ready = AtomicBool::new(false);
        let cond = Arc::new(Cond::new());
        Self {
            message,
            cond,
            ready,
        }
    }
}
/// Sending half of the oneshot channel.
///
/// `send` consumes the sender, so at most one message can be sent. Dropping
/// the sender (whether or not a message was sent) pushes a message through
/// `pipe` whose callback signals the channel's condition.
pub struct Sender<T> {
/// State shared with the matching `EndpointReceiver`.
channel: Arc<Channel<T>>,
/// Pipe used on drop to deliver the wake-up message.
pipe: Arc<LCPipe>,
}
/// Receiving half of the oneshot channel.
///
/// `receive` consumes the receiver and yields either the sent message or
/// `RecvError::Disconnected` when the slot turns out to be empty.
pub struct EndpointReceiver<T> {
/// State shared with the matching `Sender`.
channel: Arc<Channel<T>>,
}
/// Creates a oneshot channel whose sender delivers its wake-up message
/// through the given, possibly shared, `pipe`.
///
/// Returns the `(Sender, EndpointReceiver)` pair; both halves point at the
/// same freshly created channel state.
pub fn channel_on_pipe<T>(pipe: Arc<LCPipe>) -> (Sender<T>, EndpointReceiver<T>) {
    let shared = Arc::new(Channel::new());
    let sender = Sender {
        channel: Arc::clone(&shared),
        pipe,
    };
    let receiver = EndpointReceiver { channel: shared };
    (sender, receiver)
}
/// Creates a oneshot channel backed by a new `LCPipe` built from
/// `cbus_endpoint`.
///
/// Convenience wrapper around [`channel_on_pipe`] for callers that do not
/// need to share one pipe between several channels.
pub fn channel<T>(cbus_endpoint: &str) -> (Sender<T>, EndpointReceiver<T>) {
    let pipe = LCPipe::new(cbus_endpoint);
    channel_on_pipe(Arc::new(pipe))
}
impl<T> Sender<T> {
    /// Stores `message` in the channel and marks the channel as ready.
    ///
    /// Consumes the sender. The receiver is woken by the `Drop` impl, which
    /// runs when `self` goes out of scope at the end of this call.
    pub fn send(self, message: T) {
        let slot = self.channel.message.get();
        // SAFETY: `slot` comes from `UnsafeCell::get` on a live channel, and
        // `send` takes `self` by value so this sender writes at most once.
        // NOTE(review): soundness also relies on the receiver not reading the
        // slot before `ready` is published below or before it is woken.
        unsafe { *slot = Some(message) };
        // Publish the write above to a receiver doing an `Acquire` swap.
        self.channel.ready.store(true, Ordering::Release);
        // `self` is dropped here, which pushes the wake-up message.
    }
}
impl<T> Drop for Sender<T> {
    /// Pushes a message through the pipe whose callback signals the channel's
    /// condition.
    ///
    /// This runs for every sender, after `send` as well as when the sender is
    /// dropped without sending; in the latter case the receiver finds the slot
    /// empty.
    fn drop(&mut self) {
        let wakeup = {
            // The callback owns its own handle to the condition, so it does
            // not borrow from `self`.
            let cond = Arc::clone(&self.channel.cond);
            Message::new(move || {
                cond.signal();
            })
        };
        self.pipe.push_message(wakeup);
    }
}
impl<T> EndpointReceiver<T> {
    /// Takes the message out of the channel, waiting on the channel's
    /// condition first if the `ready` flag is not set yet.
    ///
    /// # Errors
    ///
    /// Returns `RecvError::Disconnected` when the slot is empty after the
    /// wait, which is what happens when the sender was dropped without
    /// sending.
    ///
    /// # Panics
    ///
    /// Panics with "unexpected null pointer" if the slot pointer is null.
    pub fn receive(self) -> Result<T, RecvError> {
        let already_sent = self.channel.ready.swap(false, Ordering::Acquire);
        if !already_sent {
            // NOTE(review): the result of `wait` is ignored; if `Cond::wait`
            // can return without the sender's signal, the slot below is read
            // as empty and reported as `Disconnected` — confirm against the
            // contract of `Cond`.
            self.channel.cond.wait();
        }
        // SAFETY: the pointer comes from `UnsafeCell::get` on a live channel.
        // NOTE(review): exclusive access relies on the sender having finished
        // its write before this point (flag observed or wake-up delivered).
        let slot =
            unsafe { self.channel.message.get().as_mut() }.expect("unexpected null pointer");
        match slot.take() {
            Some(message) => Ok(message),
            None => Err(RecvError::Disconnected),
        }
    }
}
impl<T> Default for Channel<T> {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "internal_test")]
mod tests {
    use super::super::tests::run_cbus_endpoint;
    use crate::cbus;
    use crate::cbus::{oneshot, RecvError};
    use crate::fiber::{check_yield, YieldResult};
    use std::sync::Arc;
    use std::time::Duration;
    use std::{mem, thread};

    /// Checks both receive paths: waiting for a late sender (the fiber
    /// yields) and picking up a message that was sent beforehand (no yield).
    #[crate::test(tarantool = "crate")]
    pub fn oneshot_test() {
        let mut endpoint_fiber = run_cbus_endpoint("oneshot_test");

        // The sender fires one second after the receiver starts waiting.
        let (tx, rx) = oneshot::channel("oneshot_test");
        let producer = thread::spawn(move || {
            thread::sleep(Duration::from_secs(1));
            tx.send(1);
        });
        let late = check_yield(|| rx.receive().unwrap());
        assert_eq!(late, YieldResult::Yielded(1));
        producer.join().unwrap();

        // The sender has completed before the receiver is asked for a value.
        let (tx, rx) = oneshot::channel("oneshot_test");
        let producer = thread::spawn(move || {
            tx.send(2);
        });
        producer.join().unwrap();
        let early = check_yield(|| rx.receive().unwrap());
        assert_eq!(early, YieldResult::DidntYield(2));

        endpoint_fiber.cancel();
    }

    /// Two channels share one pipe; the receivers are polled in the opposite
    /// order to the one in which the senders fire.
    #[crate::test(tarantool = "crate")]
    pub fn oneshot_multiple_channels_test() {
        let mut endpoint_fiber = run_cbus_endpoint("oneshot_multiple_channels_test");

        let pipe = Arc::new(cbus::LCPipe::new("oneshot_multiple_channels_test"));
        let (tx1, rx1) = oneshot::channel_on_pipe(Arc::clone(&pipe));
        let (tx2, rx2) = oneshot::channel_on_pipe(Arc::clone(&pipe));

        let first = thread::spawn(move || {
            thread::sleep(Duration::from_secs(1));
            tx1.send("1");
        });
        let second = thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            tx2.send("2");
        });

        // Receive from the slower channel first, then from the faster one.
        let got2 = rx2.receive();
        let got1 = rx1.receive();
        assert!(matches!(got1, Ok("1")));
        assert!(matches!(got2, Ok("2")));

        first.join().unwrap();
        second.join().unwrap();
        endpoint_fiber.cancel();
    }

    /// Dropping the sender without sending makes `receive` report a
    /// disconnect.
    #[crate::test(tarantool = "crate")]
    pub fn oneshot_sender_drop_test() {
        let mut endpoint_fiber = run_cbus_endpoint("oneshot_sender_drop_test");

        let (tx, rx) = oneshot::channel::<()>("oneshot_sender_drop_test");
        let dropper = thread::spawn(move || {
            thread::sleep(Duration::from_secs(1));
            mem::drop(tx)
        });

        let outcome = rx.receive();
        assert!(matches!(outcome, Err(RecvError::Disconnected)));

        dropper.join().unwrap();
        endpoint_fiber.cancel();
    }
}