use std::sync::Once;
use tarantool::{
cbus::{self, oneshot, RecvError},
fiber,
};
use crate::error::{Error, Result};
use super::{Transport, TransportReceiver, TransportSender};
const DEFAULT_CBUS_ENDPONT: &str = "tros_cbus_endpoint";
static DEFAULT_LOOP_FIBER_INIT: Once = Once::new();
#[derive(Debug)]
pub struct CBusTransport<'a> {
cbus_endpoint: &'a str,
}
impl<'a> Default for CBusTransport<'a> {
fn default() -> Self {
DEFAULT_LOOP_FIBER_INIT.call_once(|| {
fiber::Builder::new()
.name("tros_cbus_loop_f")
.func(move || {
let cbus_endpoint = cbus::Endpoint::new(DEFAULT_CBUS_ENDPONT)
.expect("error on start cbus endpoint");
cbus_endpoint.cbus_loop();
})
.start_non_joinable()
.unwrap();
});
Self::new_unchecked(DEFAULT_CBUS_ENDPONT)
}
}
impl<'a> CBusTransport<'a> {
pub fn new_unchecked(cbus_endpoint: &'a str) -> Self {
Self { cbus_endpoint }
}
}
impl<'a> Transport for CBusTransport<'a> {
type Sender<T> = oneshot::Sender<T> where T: Send;
type Receiver<T> = oneshot::EndpointReceiver<T> where T: Send;
fn create_channel<T: Send>(&self) -> (Self::Sender<T>, Self::Receiver<T>) {
oneshot::channel(self.cbus_endpoint)
}
}
impl<T> TransportSender<T> for oneshot::Sender<T> {
fn send(self, data: T) -> Result<()> {
oneshot::Sender::send(self, data);
Ok(())
}
}
impl<T> TransportReceiver<T> for oneshot::EndpointReceiver<T> {
fn receive(self) -> Result<T> {
oneshot::EndpointReceiver::receive(self).map_err(|err| match err {
RecvError::Disconnected => Error::sender_disconnected(),
})
}
}