syslog_rs/a_sync/
syslog_async_queue.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * Licensed under the EUPL, Version 1.2 or - as soon they will be approved by
7 * the European Commission - subsequent versions of the EUPL (the "Licence").
8 * 
9 * You may not use this work except in compliance with the Licence.
10 * 
11 * You may obtain a copy of the Licence at:
12 * 
13 *    https://joinup.ec.europa.eu/software/page/eupl
14 * 
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the Licence is distributed on an "AS IS" basis, WITHOUT
17 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 * Licence for the specific language governing permissions and limitations
19 * under the Licence.
20 */
21
22use std::marker::PhantomData;
23
24use async_trait::async_trait;
25use tokio::sync::mpsc::UnboundedSender;
26
27use crate::
28{
29    error::SyRes, 
30    formatters::SyslogFormatter, 
31    map_error_code, 
32    socket::TapTypeData, 
33    sy_sync_queue::{SyCmd, SyslogQueueAdapter}, 
34    Priority
35};
36
37use super::syslog_trait::AsyncSyslogApi;
38
39
40
41/// A structure for the `sy_sync_queue::Syslog` instace mirrowing.
42#[derive(Debug)]
43pub struct AsyncSyslogQueue<F: SyslogFormatter>
44{
45    /// commands channel
46    tasks: UnboundedSender<SyCmd>,
47
48    _p: PhantomData<F>
49}
50
51unsafe impl<F: SyslogFormatter> Send for AsyncSyslogQueue<F> {}
52unsafe impl<F: SyslogFormatter> Sync for AsyncSyslogQueue<F> {}
53
54
55impl<F: SyslogFormatter> AsyncSyslogQueue<F>
56{
57    pub(crate) async 
58    fn attachlog(queue_adapter: SyslogQueueAdapter) -> SyRes<Self>
59    {
60        return Ok(
61            Self
62            {
63                tasks: queue_adapter.consume()?,
64                _p: PhantomData::<F>,
65            }
66        );
67    }
68}
69
70
71#[async_trait]
72impl<F: SyslogFormatter> AsyncSyslogApi for AsyncSyslogQueue<F>
73{
74    async 
75    fn connectlog(&mut self) -> SyRes<()>
76    {
77        let (sy_cmd, loopback) = 
78            SyCmd::form_connectlog();
79
80        self
81            .tasks
82            .send(sy_cmd)
83            .map_err(|e| map_error_code!(SendError, "connectlog() error: '{}'", e))?;
84
85        return 
86            loopback
87                .await
88                .map_err(|e| 
89                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
90                )?;
91    }
92
93    async 
94    fn setlogmask(&self, logmask: i32) -> SyRes<i32>
95    {
96        let (sy_cmd, loopback) = 
97            SyCmd::form_logmask(logmask);
98
99        self
100            .tasks
101            .send(sy_cmd)
102            .map_err(|e| map_error_code!(SendError, "closelog() error: '{}'", e))?;
103
104        return 
105            loopback
106                .await
107                .map_err(|e| 
108                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
109                );
110    }
111
112    async 
113    fn closelog(&self) -> SyRes<()>
114    {
115        let (sy_cmd, loopback) = 
116            SyCmd::form_disconnectlog();
117
118        // send stop
119        self
120            .tasks
121            .send(sy_cmd)
122            .map_err(|e| 
123                map_error_code!(SendError, "closelog() error: '{}'", e)
124            )?;
125
126        return 
127            loopback
128                .await
129                .map_err(|e| 
130                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
131                )?;
132    }
133
134    async 
135    fn syslog(&self, pri: Priority, fmt: String)
136    {
137        // even if the thread is in a process of termination, there is
138        // no need to sync access to the run_control field as even if
139        // syslog thread will terminate before someone push something on the
140        // queue, it will be left in the queue until the end of program's time.
141        let sy_cmd = SyCmd::form_syslog(pri, fmt);
142
143        let _ = self.tasks.send(sy_cmd);
144
145        return;
146    }
147
148    async 
149    fn vsyslog(&self, pri: Priority, fmt: &str)
150    {
151        let sy_cmd = SyCmd::form_syslog(pri, fmt.to_string());
152
153        let _ = self.tasks.send(sy_cmd);
154
155        return;
156    }
157
158    async 
159    fn change_identity(&self, ident: &str) -> SyRes<()>
160    {
161        let sy_cmd =  SyCmd::form_change_ident(ident.to_string());
162
163        return 
164            self
165                .tasks
166                .send(sy_cmd)
167                .map_err(|e| 
168                    map_error_code!(SendError, "change_identity() error: '{}'", e)
169                );
170    }
171
172    async 
173    fn reconnect(&self) -> SyRes<()>
174    {
175        return 
176            self
177                .tasks
178                .send(SyCmd::form_reconnect())
179                .map_err(|e| 
180                    map_error_code!(SendError, "signal_rotate_log() error: '{}'", e)
181                );
182    }
183
184    async 
185    fn update_tap_data(&self, tap_data: TapTypeData) -> SyRes<()>
186    {
187        let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
188
189        self
190            .tasks
191            .send(tap_data_cmd)
192            .map_err(|e| 
193                map_error_code!(SendError, "signal_rotate_log() error: '{}'", e)
194            )?;
195
196        return 
197            loopback
198                .await
199                .map_err(|e| 
200                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
201                )?;
202    }
203}
204