syslog_rs/a_sync/async_tokio/
async_queue_adapter.rs1use std::fmt;
15
16use tokio::sync::oneshot;
17
18use crate::
19{
20 error::SyRes,
21 formatters::SyslogFormatter,
22 map_error_code,
23 sy_sync_queue::
24 {
25 SyCmd,
26 SyslogQueueChanRcv,
27 SyslogQueueChanSnd,
28 SyslogQueueChannel,
29 SyslogQueueOneChanRcv,
30 SyslogQueueOneChanSnd
31 },
32 SyslogDestination
33};
34
35
36impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanRcv<F, D, S>
37for tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, S>>
38{
39 fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>
40 {
41 return
42 self
43 .blocking_recv();
44 }
45}
46
47
48impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanSnd<F, D, S>
49for tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, S>>
50{
51 fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
52 {
53 return
54 self
55 .send(msg)
56 .map_err(|e|
57 map_error_code!(SendError, "send_blocking() error: '{}'", e)
58 );
59 }
60
61 async
62 fn q_send(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
63 {
64 return
65 self
66 .send(msg)
67 .map_err(|e|
68 map_error_code!(SendError, "send_blocking() error: '{}'", e)
69 );
70 }
71}
72
73impl<C: Send + fmt::Debug> SyslogQueueOneChanSnd<C> for tokio::sync::oneshot::Sender<C>
74{
75 fn send_once_blocking(self, data: C) -> Result<(), C>
76 {
77 return
78 self.send(data);
79
80 }
81}
82
83impl<C> SyslogQueueOneChanRcv<C> for tokio::sync::oneshot::Receiver<C>
84{
85 fn recv_once_blocking(self) -> SyRes<C>
86 {
87 return
88 self
89 .blocking_recv()
90 .map_err(|e|
91 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
92 );
93 }
94
95 async
96 fn recv_once(self) -> SyRes<C> where Self: Sized
97 {
98 return
99 self
100 .await
101 .map_err(|e|
102 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
103 );
104
105 }
106}
107
108#[derive(Debug, Clone)]
109pub struct TokioQueueAdapter;
110
111unsafe impl Send for TokioQueueAdapter {}
112
113impl<F: SyslogFormatter, D: SyslogDestination> SyslogQueueChannel<F, D> for TokioQueueAdapter
114{
115 type ChannelSnd = tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, Self>>;
116
117 type ChannelRcv = tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, Self>>;
118
119 type OneShotChannelSnd<C: Send + fmt::Debug> = tokio::sync::oneshot::Sender<C>;
120
121 type OneShotChannelRcv<C> = tokio::sync::oneshot::Receiver<C>;
122
123 fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv)
124 {
125 return tokio::sync::mpsc::unbounded_channel::<SyCmd<F, D, Self>>();
126 }
127
128 fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>)
129 {
130 return oneshot::channel::<C>();
131 }
132}
133
134
135pub type DefaultQueueAdapter = TokioQueueAdapter;
136