syslog_rs/a_sync/
syslog_async_internal.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *                     
11 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
12 */
13
14
15use std::borrow::Cow;
16use std::fmt;
17use std::marker::PhantomData;
18
19use nix::libc;
20
21use crate::a_sync::syslog_trait::AsyncSyslogApi;
22use crate::formatters::SyslogFormatter;
23use crate::map_error_os;
24use crate::portable;
25use crate::common::*;
26use crate::error::SyRes;
27use crate::socket::TapType;
28use crate::AsyncSyslogDestination;
29
30use super::async_socket::*;
31
32/// A trait which generalize some operations which are different from the
33/// various async providers i.e `smol` or `tokio` or external.
34#[allow(async_fn_in_trait)]
35pub trait AsyncSyslogInternalIO: fmt::Debug + Send + 'static
36{
37    /// Sends a message to syscons device. Usually this is a `/dev/console`
38    /// defined by crate::common::PATH_CONSOLE.
39    /// 
40    /// # Arguemnts
41    /// 
42    /// * `logstat` - a instance setup [LogStat].
43    /// 
44    /// * `msg_payload` - a payload of the syslog message (without headers).
45    async fn send_to_syscons(logstat: LogStat, msg_payload: &[Cow<'_, str>]);
46
47    /// Sends a message to stderr device.
48    /// 
49    /// # Arguemnts
50    /// 
51    /// * `logstat` - a instance setup [LogStat].
52    /// 
53    /// * `msg` - a payload of the syslog message (without headers).
54    async fn send_to_stderr(logstat: LogStat, msg: &[Cow<'_, str>]);
55
56    /// Sleep the current task for `us` microseconds.
57    /// 
58    /// # Arguments
59    /// 
60    /// * `us` - microseconds.
61    async fn sleep_micro(us: u64);
62}
63
64/// A trait which generalize the mutex from the std lib's of multiple async executors.
65/// The trait should be implemented on the mutex direclty.
66#[allow(async_fn_in_trait)]
67pub trait AsyncMutex<F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
68{
69    /// A mutex guard type.
70    type MutxGuard<'mux>: AsyncMutexGuard<'mux, F, D, DS> where Self: 'mux;
71
72    /// Creates new mutex instance for type which implements the [AsyncSyslogApi].
73    fn a_new(v: DS) -> Self;
74
75    /// Locks the mutex emmiting the `mutex guard`.
76    async fn a_lock<'mux>(&'mux self) -> Self::MutxGuard<'mux>;
77}
78
79/// A trait which generalize the mutex guarding emited by the mutex from various async executors.
80pub trait AsyncMutexGuard<'mux, F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
81{
82    /// Returns the reference to the inner type of the mutex guard.
83    fn guard(&self) -> &DS;
84
85    /// Returns the mutable reference to the inner type of the mutex guard.
86    fn guard_mut(&mut self) -> &mut DS;
87}
88
89/// Internal structure of the syslog async client.
90#[derive(Debug)]
91pub struct AsyncSyslogInternal<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO>
92{
93    /// An identification i.e program name, thread name
94    logtag: String, 
95
96    /// A pid of the program.
97    logpid: String,
98
99    /// Defines how syslog operates
100    logstat: LogStat,
101
102    /// Holds the facility 
103    facility: LogFacility,
104
105    /// A logmask
106    logmask: i32,
107
108    /// A stream
109    stream: D::SocketTap,
110
111    /// Phantom for [SyslogFormatter]
112    _p: PhantomData<F>,
113
114    /// Phantom for the [AsyncSyslogInternalIO] which provides writing to console and other IO.
115    _p2: PhantomData<IO>,
116}
117
118
119impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogInternal<F, D, IO>
120{
121    /// Creates new instance of [SyslogInternal] which contains all 
122    /// client syslog logic.
123    ///
124    /// # Arguments
125    ///
126    /// * `ident` - An optional argument which takes ref to str. If none, the
127    ///     ident will be set later. Yje ident will be trucated to 48 UTF8
128    ///     chars.
129    /// 
130    /// * `logstat` - A [LogStat] flags separated by '|'
131    /// 
132    /// * `facility` - A [LogFacility] flag
133    /// 
134    /// * `req_tap` - A type of the syslog instance reuired. See [SyslogDestination].
135    /// 
136    /// # Returns
137    ///
138    /// A [SyRes] is returned with following:
139    /// 
140    /// * [Result::Ok] - with the instance.
141    /// 
142    /// * [Result::Err] - with error description.
143    pub(crate) 
144    fn new(
145        ident: Option<&str>, 
146        logstat: LogStat, 
147        facility: LogFacility, 
148        req_tap: D
149    ) -> SyRes<Self>
150    {
151        // check if log_facility is invalid
152        let log_facility =
153            if facility.is_empty() == false && 
154                (facility & !LogMask::LOG_FACMASK).is_empty() == true
155            {
156                facility
157            }
158            else
159            {
160                // default facility code
161                LogFacility::LOG_USER
162            };
163
164        let logtag = 
165            match ident
166            {
167                Some(r) => 
168                    truncate_n(r, RFC_MAX_APP_NAME).to_string(),
169                None => 
170                    truncate_n(
171                        portable::p_getprogname()
172                            .unwrap_or("".to_string())
173                            .as_str(),
174                        RFC_MAX_APP_NAME
175                    )
176                    .to_string()
177            };
178
179        let sock = D::SocketTap::new(req_tap)?;
180
181        return Ok(
182            Self
183            {
184                logtag: logtag,
185                logpid: portable::get_pid().to_string(),
186                logstat: logstat, 
187                facility: log_facility,
188                logmask: 0xff,
189                stream: sock,
190                _p: PhantomData,
191                _p2: PhantomData,
192            }
193        );
194    }
195
196    #[inline]
197    pub(crate) 
198    fn is_logmasked(&self, pri: i32) -> bool
199    {
200        if ((1 << (pri & LogMask::LOG_PRIMASK)) & self.logmask) == 0
201        {
202            return true;
203        }
204
205        return false;
206    }
207
208        /// Returns the type of the socket.
209    #[inline]
210    pub(crate)
211    fn get_taptype(&self) -> TapType
212    {
213        return self.stream.get_type();
214    }
215
216    /// Returns the maximum msg size in bytes. (Full msg with headers.)
217    #[inline]
218    pub(crate) 
219    fn get_max_msg_size(&self) -> usize
220    {
221        return self.stream.get_max_msg_size();
222    }
223
224    #[inline]
225    pub(crate) 
226    fn set_logtag<L: AsRef<str>>(&mut self, logtag: L, update_pid: bool)
227    {
228        self.logtag = 
229            truncate_n(logtag.as_ref(), RFC_MAX_APP_NAME).to_string();
230
231        if update_pid == true
232        {
233            self.logpid = portable::get_pid().to_string();
234        }
235
236        return;
237    }
238
239    /// Disconnects the unix stream from syslog.
240    #[inline]
241    pub(crate) async 
242    fn disconnectlog(&mut self)  -> SyRes<()>
243    {
244        return 
245            self
246                .stream
247                .disconnectlog()
248                .await
249                .map_err(|e| map_error_os!(e, "can not disconnect log properly"));
250    }
251}
252
253impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogApi<F, D>
254for AsyncSyslogInternal<F, D, IO>
255{
256    async 
257    fn update_tap_data(&mut self, tap_data: D) -> SyRes<()>
258    {
259        let is_con = self.stream.is_connected();
260
261        if is_con == true
262        {
263            self
264                .stream
265                .disconnectlog()
266                .await
267                .map_err(|e|
268                    map_error_os!(e, "update_tap_data() can not disconnect log properly")
269                )?;
270        }
271
272        self.stream.update_tap_data(tap_data);
273
274        if is_con == true
275        {
276            // replace with new instance
277            self.stream.connectlog().await?;
278        }
279
280        return Ok(());
281    }
282 
283    #[inline]
284    fn change_identity(&mut self, ident: &str)
285    {
286        self.set_logtag(ident, true);
287    }
288
289    async 
290    fn reconnect(&mut self) -> SyRes<()>
291    {
292        self.disconnectlog().await?;
293
294        self.connectlog().await?;
295
296        return Ok(());
297    }
298
299    async 
300    fn closelog(&mut self) -> SyRes<()>
301    {
302        return self.disconnectlog().await;
303    }
304    
305    fn set_logmask(&mut self, logmask: i32) -> i32
306    {
307        let oldmask = self.logmask;
308
309        if logmask != 0
310        {
311            self.logmask = logmask;
312        }
313
314        return oldmask;
315    }
316
317
318    /// Connects unix stream to the syslog and sets up the properties of
319    /// the unix stream.
320    #[inline]
321    async 
322    fn connectlog(&mut self) -> SyRes<()>
323    {
324        return self.stream.connectlog().await;
325    }
326    
327    /// An internal function which is called by the syslog or vsyslog.
328    async 
329    fn vsyslog1(&mut self, mut pri: Priority, fmt: F)
330    {
331        // check for invalid bits
332        if let Err(e) = pri.check_invalid_bits()
333        {
334            IO::send_to_stderr(self.logstat, &[Cow::Owned(e.to_string())]).await;
335        }
336
337        /*match check_invalid_bits(&mut pri)
338        {
339            Ok(_) => {},
340            Err(_e) => self.vsyslog1(get_internal_log(), fmt).await
341        }*/
342
343        // check priority against setlogmask
344        if self.is_logmasked(pri.bits()) == true
345        {
346            return;
347        }
348
349        // set default facility if not specified in pri
350        if (pri.bits() & LOG_FACMASK) == 0
351        {
352            pri.set_facility(self.facility);
353        }
354
355        let progname = self.logtag.clone();
356        let pid = self.logpid.clone();
357
358        let msg_formatted = 
359            F::vsyslog1_format(self.get_taptype(), self.get_max_msg_size(), pri, &progname, &pid, &fmt);
360
361        // output to stderr if required
362        IO::send_to_stderr(self.logstat, msg_formatted.get_stderr_output()).await;
363
364        if self.stream.is_connected() == false
365        {
366            // open connection
367            match self.connectlog().await
368            {
369                Ok(_) => {},
370                Err(e) =>
371                {
372                    IO::send_to_stderr(self.logstat, &[Cow::Owned(e.into_inner())] ).await;
373                    return;
374                }
375            }
376        }
377        
378        let fullmsg = msg_formatted.concat();
379
380
381        // There are two possible scenarios when send may fail:
382        // 1. syslog temporary unavailable
383        // 2. syslog out of buffer space
384        // If we are connected to priv socket then in case of 1 we reopen connection
385        //      and retry once.
386        // If we are connected to unpriv then in case of 2 repeatedly retrying to send
387        //      until syslog socket buffer space will be cleared
388
389        loop
390        {
391            match self.stream.send(fullmsg.as_bytes()).await
392            {
393                Ok(_) => return,
394                Err(err) =>
395                {   
396                    if self.get_taptype().is_network() == false
397                    {
398                        if let Some(libc::ENOBUFS) = err.raw_os_error()
399                        {
400                            // scenario 2
401                            if self.get_taptype().is_priv() == true
402                            {
403                                break;
404                            }
405
406                            IO::sleep_micro(1).await;
407                            //sleep(Duration::from_micros(1)).await;
408                        }
409                        else
410                        {
411                            // scenario 1
412                            let _ = self.disconnectlog().await;
413                            match self.connectlog().await
414                            {
415                                Ok(_) => {},
416                                Err(_e) => break,
417                            }
418
419                            // if resend will fail then probably the scn 2 will take place
420                        } 
421                    }
422                    else
423                    {
424                        let _ = self.disconnectlog().await;
425                        match self.connectlog().await
426                        {
427                            Ok(_) => {},
428                            Err(_e) => break,
429                        }
430                    }  
431                }
432            }
433        } // loop
434
435
436        // If program reached this point then transmission over socket failed.
437        // Try to output message to console
438
439        IO::send_to_syscons(self.logstat, msg_formatted.get_stderr_output()).await;
440    }
441}