use std::{fmt, time::Duration};
use crate::
{
error::SyRes,
formatters::SyslogFormatter,
map_error_code,
sy_sync_queue::
{
SyCmd,
SyslogQueueChanRcv,
SyslogQueueChanSnd,
SyslogQueueChannel,
SyslogQueueOneChanRcv,
SyslogQueueOneChanSnd
},
SyslogDestination
};
use crossbeam_channel::{bounded, unbounded};
impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanRcv<F, D, S>
for crossbeam_channel::Receiver<SyCmd<F, D, S>>
{
fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>
{
return
self
.recv_timeout(Duration::from_millis(100))
.ok();
}
}
impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanSnd<F, D, S>
for crossbeam_channel::Sender<SyCmd<F, D, S>>
{
fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
{
return
self
.send_timeout(msg, Duration::from_millis(100))
.map_err(|e|
map_error_code!(SendError, "send_blocking() error: '{}'", e)
);
}
}
impl<C: Send> SyslogQueueOneChanSnd<C> for crossbeam_channel::Sender<C>
{
fn send_once_blocking(self, data: C) -> Result<(), C>
{
return
self
.send_timeout(data, Duration::from_millis(100))
.map_err(|e| e.into_inner());
}
}
impl<C> SyslogQueueOneChanRcv<C> for crossbeam_channel::Receiver<C>
{
fn recv_once_blocking(self) -> SyRes<C>
{
return
self
.recv_timeout(Duration::from_millis(500))
.map_err(|e|
map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
);
}
}
#[derive(Debug, Clone)]
pub struct CrossbeamQueueAdapter;
unsafe impl Send for CrossbeamQueueAdapter {}
impl<F: SyslogFormatter, D: SyslogDestination> SyslogQueueChannel<F, D> for CrossbeamQueueAdapter
{
const ADAPTER_NAME: &'static str = "crossbeam";
type ChannelSnd = crossbeam_channel::Sender<SyCmd<F, D, Self>>;
type ChannelRcv = crossbeam_channel::Receiver<SyCmd<F, D, Self>>;
type OneShotChannelSnd<C: Send + fmt::Debug> = crossbeam_channel::Sender<C>;
type OneShotChannelRcv<C> = crossbeam_channel::Receiver<C>;
fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv)
{
return unbounded::<SyCmd<F, D, Self>>();
}
fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>)
{
return bounded::<C>(1);
}
}
pub type DefaultQueueAdapter = CrossbeamQueueAdapter;