syslog_rs/a_sync/async_tokio/
async_queue_adapter.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 *   2. The MIT License (MIT)
12 *                     
13 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16use std::fmt;
17
18use tokio::sync::oneshot;
19
20use crate::
21{
22    error::SyRes, 
23    formatters::SyslogFormatter, 
24    map_error_code, 
25    sy_sync_queue::
26    {
27        SyCmd, 
28        SyslogQueueChanRcv, 
29        SyslogQueueChanSnd, 
30        SyslogQueueChannel, 
31        SyslogQueueOneChanRcv, 
32        SyslogQueueOneChanSnd
33    }, 
34    SyslogDestination
35};
36
37
38impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanRcv<F, D, S> 
39for tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, S>>
40{
41    fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>
42    {
43        return 
44            self
45                .blocking_recv();
46    }
47}
48 
49
50impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogQueueChanSnd<F, D, S> 
51for tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, S>>
52{
53    fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()> 
54    {
55        return 
56            self
57                .send(msg)
58                .map_err(|e| 
59                    map_error_code!(SendError, "send_blocking() error: '{}'", e)
60                );
61    }
62
63    async 
64    fn q_send(&self, msg: SyCmd<F, D, S>) -> SyRes<()>
65    {
66        return 
67            self
68                .send(msg)
69                .map_err(|e| 
70                    map_error_code!(SendError, "send_blocking() error: '{}'", e)
71                );
72    }
73}
74
75impl<C: Send + fmt::Debug> SyslogQueueOneChanSnd<C> for tokio::sync::oneshot::Sender<C>
76{
77    fn send_once_blocking(self, data: C) -> Result<(), C>
78    {
79        return 
80            self.send(data);
81                
82    }
83}
84
85impl<C> SyslogQueueOneChanRcv<C> for tokio::sync::oneshot::Receiver<C>
86{
87    fn recv_once_blocking(self) -> SyRes<C> 
88    {
89        return 
90            self
91                .blocking_recv()
92                .map_err(|e|
93                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
94                );
95    }
96
97    async 
98    fn recv_once(self) -> SyRes<C> where Self: Sized
99    {
100        return 
101            self
102                .await
103                .map_err(|e|
104                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
105                );
106
107    }
108}
109
/// Marker type that selects tokio's channel primitives for the syslog
/// command queue.
///
/// The struct has no fields, so the compiler already derives `Send` and
/// `Sync` for it automatically; no manual `unsafe impl` is needed.
#[derive(Debug, Clone)]
pub struct TokioQueueAdapter;
114
115impl<F: SyslogFormatter, D: SyslogDestination> SyslogQueueChannel<F, D> for TokioQueueAdapter
116{
117    const ADAPTER_NAME: &'static str = "tokio";
118    
119    type ChannelSnd = tokio::sync::mpsc::UnboundedSender<SyCmd<F, D, Self>>;
120
121    type ChannelRcv = tokio::sync::mpsc::UnboundedReceiver<SyCmd<F, D, Self>>;
122
123    type OneShotChannelSnd<C: Send + fmt::Debug> = tokio::sync::oneshot::Sender<C>;
124
125    type OneShotChannelRcv<C> = tokio::sync::oneshot::Receiver<C>;
126
127    fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv) 
128    {
129        return tokio::sync::mpsc::unbounded_channel::<SyCmd<F, D, Self>>();
130    }
131
132    fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>) 
133    {
134        return oneshot::channel::<C>();
135    }
136}
137
138
/// Default queue adapter exported by this module; an alias for
/// `TokioQueueAdapter`.
pub type DefaultQueueAdapter = TokioQueueAdapter;
140