syslog_rs/a_sync/async_tokio/
async_queue_adapter.rs1use std::fmt;
17
18use tokio::sync::oneshot;
19
20use crate::
21{
22 error::SyRes,
23 formatters::SyslogFormatter,
24 map_error_code,
25 sy_sync_queue::
26 {
27 SyCmd,
28 SyslogQueueChanRcv,
29 SyslogQueueChanSnd,
30 SyslogQueueChannel,
31 SyslogQueueOneChanRcv,
32 SyslogQueueOneChanSnd
33 },
34 SyslogDestination
35};
36
37
38impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanRcv<F, D, S>
39for tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, S>>
40{
41 fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>
42 {
43 return
44 self
45 .blocking_recv();
46 }
47}
48
49
50impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanSnd<F, D, S>
51for tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, S>>
52{
53 fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
54 {
55 return
56 self
57 .send(msg)
58 .map_err(|e|
59 map_error_code!(SendError, "send_blocking() error: '{}'", e)
60 );
61 }
62
63 async
64 fn q_send(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
65 {
66 return
67 self
68 .send(msg)
69 .map_err(|e|
70 map_error_code!(SendError, "send_blocking() error: '{}'", e)
71 );
72 }
73}
74
75impl<C: Send + fmt::Debug> SyslogQueueOneChanSnd<C> for tokio::sync::oneshot::Sender<C>
76{
77 fn send_once_blocking(self, data: C) -> Result<(), C>
78 {
79 return
80 self.send(data);
81
82 }
83}
84
85impl<C> SyslogQueueOneChanRcv<C> for tokio::sync::oneshot::Receiver<C>
86{
87 fn recv_once_blocking(self) -> SyRes<C>
88 {
89 return
90 self
91 .blocking_recv()
92 .map_err(|e|
93 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
94 );
95 }
96
97 async
98 fn recv_once(self) -> SyRes<C> where Self: Sized
99 {
100 return
101 self
102 .await
103 .map_err(|e|
104 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
105 );
106
107 }
108}
109
/// Marker type selecting the tokio-backed channel implementation of the
/// syslog queue.
///
/// The type carries no data. Because it has no fields, the compiler derives
/// `Send` and `Sync` for it automatically, so no manual `unsafe impl` is
/// needed to move it across threads.
#[derive(Debug, Clone)]
pub struct TokioQueueAdapter;
114
115impl<F: SyslogFormatter, D: SyslogDestination> SyslogQueueChannel<F, D> for TokioQueueAdapter
116{
117 const ADAPTER_NAME: &'static str = "tokio";
118
119 type ChannelSnd = tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, Self>>;
120
121 type ChannelRcv = tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, Self>>;
122
123 type OneShotChannelSnd<C: Send + fmt::Debug> = tokio::sync::oneshot::Sender<C>;
124
125 type OneShotChannelRcv<C> = tokio::sync::oneshot::Receiver<C>;
126
127 fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv)
128 {
129 return tokio::sync::mpsc::unbounded_channel::<SyCmd<F, D, Self>>();
130 }
131
132 fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>)
133 {
134 return oneshot::channel::<C>();
135 }
136}
137
138
/// Queue adapter chosen by default in this module: an alias for
/// [`TokioQueueAdapter`].
pub type DefaultQueueAdapter = TokioQueueAdapter;
140