syslog_rs/a_sync/async_tokio/
async_queue_adapter.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *                     
11 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
12 */
13
14use std::fmt;
15
16use tokio::sync::oneshot;
17
18use crate::
19{
20    error::SyRes, 
21    formatters::SyslogFormatter, 
22    map_error_code, 
23    sy_sync_queue::
24    {
25        SyCmd, 
26        SyslogQueueChanRcv, 
27        SyslogQueueChanSnd, 
28        SyslogQueueChannel, 
29        SyslogQueueOneChanRcv, 
30        SyslogQueueOneChanSnd
31    }, 
32    SyslogDestination
33};
34
35
36impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanRcv<F, D, S> 
37for tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, S>>
38{
39    fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>
40    {
41        return 
42            self
43                .blocking_recv();
44    }
45}
46 
47
48impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanSnd<F, D, S> 
49for tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, S>>
50{
51    fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()> 
52    {
53        return 
54            self
55                .send(msg)
56                .map_err(|e| 
57                    map_error_code!(SendError, "send_blocking() error: '{}'", e)
58                );
59    }
60
61    async 
62    fn q_send(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
63    {
64        return 
65            self
66                .send(msg)
67                .map_err(|e| 
68                    map_error_code!(SendError, "send_blocking() error: '{}'", e)
69                );
70    }
71}
72
73impl<C: Send + fmt::Debug> SyslogQueueOneChanSnd<C> for tokio::sync::oneshot::Sender<C>
74{
75    fn send_once_blocking(self, data: C) -> Result<(), C>
76    {
77        return 
78            self.send(data);
79                
80    }
81}
82
83impl<C> SyslogQueueOneChanRcv<C> for tokio::sync::oneshot::Receiver<C>
84{
85    fn recv_once_blocking(self) -> SyRes<C> 
86    {
87        return 
88            self
89                .blocking_recv()
90                .map_err(|e|
91                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
92                );
93    }
94
95    async 
96    fn recv_once(self) -> SyRes<C> where Self: Sized
97    {
98        return 
99            self
100                .await
101                .map_err(|e|
102                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
103                );
104
105    }
106}
107
/// Marker type that selects tokio channels as the queue transport.
///
/// The struct carries no data; it exists only to be the implementor of
/// `SyslogQueueChannel` in this file.
///
/// A field-less struct is automatically `Send` (and `Sync`), so no manual
/// `unsafe impl Send` is required.
#[derive(Debug, Clone)]
pub struct TokioQueueAdapter;
112
113impl<F: SyslogFormatter, D: SyslogDestination> SyslogQueueChannel<F, D> for TokioQueueAdapter
114{
115    type ChannelSnd = tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, Self>>;
116
117    type ChannelRcv = tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, Self>>;
118
119    type OneShotChannelSnd<C: Send + fmt::Debug> = tokio::sync::oneshot::Sender<C>;
120
121    type OneShotChannelRcv<C> = tokio::sync::oneshot::Receiver<C>;
122
123    fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv) 
124    {
125        return tokio::sync::mpsc::unbounded_channel::<SyCmd<F, D, Self>>();
126    }
127
128    fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>) 
129    {
130        return oneshot::channel::<C>();
131    }
132}
133
134
/// Alias that exposes [`TokioQueueAdapter`] under the name
/// `DefaultQueueAdapter`.
// NOTE(review): presumably lets callers refer to the adapter without naming
// the async backend — confirm against the sibling adapter modules.
pub type DefaultQueueAdapter = TokioQueueAdapter;
136