use std::fmt;
use tokio::sync::oneshot;
use crate::
{
error::SyRes,
formatters::SyslogFormatter,
map_error_code,
sy_sync_queue::
{
SyCmd,
SyslogQueueChanRcv,
SyslogQueueChanSnd,
SyslogQueueChannel,
SyslogQueueOneChanRcv,
SyslogQueueOneChanSnd
},
SyslogDestination
};
/// Receiving half of the command queue, backed by tokio's unbounded mpsc
/// receiver.
impl<F, D, S> SyslogQueueChanRcv<F, D, S> for tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, S>>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Blocks the calling thread until the next queued command is available
    /// and hands back whatever the receiver's own `blocking_recv` produced.
    ///
    /// NOTE(review): per the tokio documentation `blocking_recv` yields
    /// `None` once the channel is closed and drained, and panics when called
    /// from inside an async execution context - confirm against callers.
    fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>> {
        self.blocking_recv()
    }
}
impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanSnd<F, D, S>
for tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, S>>
{
fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
{
return
self
.send(msg)
.map_err(|e|
map_error_code!(SendError, "send_blocking() error: '{}'", e)
);
}
async
fn q_send(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
{
return
self
.send(msg)
.map_err(|e|
map_error_code!(SendError, "send_blocking() error: '{}'", e)
);
}
}
/// One-shot reply sender, backed by tokio's oneshot sender.
impl<C> SyslogQueueOneChanSnd<C> for tokio::sync::oneshot::Sender<C>
where
    C: Send + fmt::Debug,
{
    /// Consumes the sender and delivers `data` through it.
    ///
    /// The result of the underlying `send` is passed through untouched, so
    /// on failure the caller gets the undelivered `data` back in `Err`.
    fn send_once_blocking(self, data: C) -> Result<(), C> {
        tokio::sync::oneshot::Sender::send(self, data)
    }
}
impl<C> SyslogQueueOneChanRcv<C> for tokio::sync::oneshot::Receiver<C>
{
fn recv_once_blocking(self) -> SyRes<C>
{
return
self
.blocking_recv()
.map_err(|e|
map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
);
}
async
fn recv_once(self) -> SyRes<C> where Self: Sized
{
return
self
.await
.map_err(|e|
map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
);
}
}
/// Marker type that selects the tokio-backed channel implementations for
/// the syslog command queue.
///
/// The struct has no fields and carries no state; it exists only to be
/// named as a type parameter.
///
/// No manual `Send` implementation is needed: `Send` and `Sync` are auto
/// traits, and a struct without fields receives both from the compiler, so
/// no `unsafe` is required here.
#[derive(Debug, Clone)]
pub struct TokioQueueAdapter;
/// Wires `TokioQueueAdapter` to tokio's channel types: an unbounded mpsc
/// channel for queued commands and a oneshot channel for single replies.
impl<F, D> SyslogQueueChannel<F, D> for TokioQueueAdapter
where
    F: SyslogFormatter,
    D: SyslogDestination,
{
    /// Name identifying this adapter.
    const ADAPTER_NAME: &'static str = "tokio";

    /// Sending half of the command queue.
    type ChannelSnd = tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, Self>>;

    /// Receiving half of the command queue.
    type ChannelRcv = tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, Self>>;

    /// Sending half of a single-value channel.
    type OneShotChannelSnd<C: Send + fmt::Debug> = tokio::sync::oneshot::Sender<C>;

    /// Receiving half of a single-value channel.
    type OneShotChannelRcv<C> = tokio::sync::oneshot::Receiver<C>;

    /// Creates a fresh unbounded command queue and returns its
    /// `(sender, receiver)` pair.
    fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv) {
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SyCmd<F, D, Self>>();

        (cmd_tx, cmd_rx)
    }

    /// Creates a fresh oneshot channel carrying a `C` and returns its
    /// `(sender, receiver)` pair.
    fn create_oneshot_channel<C: Send + fmt::Debug>(
    ) -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>) {
        let (reply_tx, reply_rx) = oneshot::channel::<C>();

        (reply_tx, reply_rx)
    }
}
/// Queue adapter exported under the default name; currently an alias for
/// [`TokioQueueAdapter`].
pub type DefaultQueueAdapter = TokioQueueAdapter;