syslog_rs/a_sync/
syslog_async_internal.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 *   2. The MIT License (MIT)
12 *                     
13 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17use std::borrow::Cow;
18use std::fmt;
19use std::marker::PhantomData;
20
21use nix::libc;
22
23use crate::a_sync::syslog_trait::AsyncSyslogApi;
24use crate::formatters::SyslogFormatter;
25use crate::map_error_os;
26use crate::portable;
27use crate::common::*;
28use crate::error::SyRes;
29use crate::socket::TapType;
30use crate::AsyncSyslogDestination;
31
32use super::async_socket::*;
33
34/// A trait which generalize some operations which are different from the
35/// various async providers i.e `smol` or `tokio` or external.
36#[allow(async_fn_in_trait)]
37pub trait AsyncSyslogInternalIO: fmt::Debug + Send + 'static
38{
39    /// Sends a message to syscons device. Usually this is a `/dev/console`
40    /// defined by crate::common::PATH_CONSOLE.
41    /// 
42    /// # Arguemnts
43    /// 
44    /// * `logstat` - a instance setup [LogStat].
45    /// 
46    /// * `msg_payload` - a payload of the syslog message (without headers).
47    async fn send_to_syscons(logstat: LogStat, msg_payload: &str);
48
49    /// Sends a message to stderr device.
50    /// 
51    /// # Arguemnts
52    /// 
53    /// * `logstat` - a instance setup [LogStat].
54    /// 
55    /// * `msg` - a payload of the syslog message (without headers).
56    async fn send_to_stderr(logstat: LogStat, msg: &str);
57
58    /// Sleep the current task for `us` microseconds.
59    /// 
60    /// # Arguments
61    /// 
62    /// * `us` - microseconds.
63    async fn sleep_micro(us: u64);
64}
65
66/// A trait which generalize the mutex from the std lib's of multiple async executors.
67/// The trait should be implemented on the mutex direclty.
68#[allow(async_fn_in_trait)]
69pub trait AsyncMutex<F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
70{
71    /// A mutex guard type.
72    type MutxGuard<'mux>: AsyncMutexGuard<'mux, F, D, DS> where Self: 'mux;
73
74    /// Creates new mutex instance for type which implements the [AsyncSyslogApi].
75    fn a_new(v: DS) -> Self;
76
77    /// Locks the mutex emmiting the `mutex guard`.
78    async fn a_lock<'mux>(&'mux self) -> Self::MutxGuard<'mux>;
79}
80
81/// A trait which generalize the mutex guarding emited by the mutex from various async executors.
82pub trait AsyncMutexGuard<'mux, F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
83{
84    /// Returns the reference to the inner type of the mutex guard.
85    fn guard(&self) -> &DS;
86
87    /// Returns the mutable reference to the inner type of the mutex guard.
88    fn guard_mut(&mut self) -> &mut DS;
89}
90
91/// Internal structure of the syslog async client.
92#[derive(Debug)]
93pub struct AsyncSyslogInternal<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO>
94{
95    /// An identification i.e program name, thread name
96    logtag: String, 
97
98    /// A pid of the program.
99    logpid: String,
100
101    /// Defines how syslog operates
102    logstat: LogStat,
103
104    /// Holds the facility 
105    facility: LogFacility,
106
107    /// A logmask
108    logmask: i32,
109
110    /// A stream
111    stream: D::SocketTap,
112
113    /// Phantom for [SyslogFormatter]
114    _p: PhantomData<F>,
115
116    /// Phantom for the [AsyncSyslogInternalIO] which provides writing to console and other IO.
117    _p2: PhantomData<IO>,
118}
119
120
121impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogInternal<F, D, IO>
122{
123    /// Creates new instance of [SyslogInternal] which contains all 
124    /// client syslog logic.
125    ///
126    /// # Arguments
127    ///
128    /// * `ident` - An optional argument which takes ref to str. If none, the
129    ///     ident will be set later. Yje ident will be trucated to 48 UTF8
130    ///     chars.
131    /// 
132    /// * `logstat` - A [LogStat] flags separated by '|'
133    /// 
134    /// * `facility` - A [LogFacility] flag
135    /// 
136    /// * `req_tap` - A type of the syslog instance reuired. See [SyslogDestination].
137    /// 
138    /// # Returns
139    ///
140    /// A [SyRes] is returned with following:
141    /// 
142    /// * [Result::Ok] - with the instance.
143    /// 
144    /// * [Result::Err] - with error description.
145    pub(crate) 
146    fn new(
147        ident: Option<&str>, 
148        logstat: LogStat, 
149        facility: LogFacility, 
150        req_tap: D
151    ) -> SyRes<Self>
152    {
153        // check if log_facility is invalid
154        let log_facility =
155            if facility.is_empty() == false && 
156                (facility & !LogMask::LOG_FACMASK).is_empty() == true
157            {
158                facility
159            }
160            else
161            {
162                // default facility code
163                LogFacility::LOG_USER
164            };
165
166        let logtag = 
167            match ident
168            {
169                Some(r) => 
170                    truncate_n(r, RFC_MAX_APP_NAME).to_string(),
171                None => 
172                    truncate_n(
173                        portable::p_getprogname()
174                            .unwrap_or("".to_string())
175                            .as_str(),
176                        RFC_MAX_APP_NAME
177                    )
178                    .to_string()
179            };
180
181        let sock = D::SocketTap::new(req_tap)?;
182
183        return Ok(
184            Self
185            {
186                logtag: logtag,
187                logpid: portable::get_pid().to_string(),
188                logstat: logstat, 
189                facility: log_facility,
190                logmask: 0xff,
191                stream: sock,
192                _p: PhantomData,
193                _p2: PhantomData,
194            }
195        );
196    }
197
198    #[inline]
199    pub(crate) 
200    fn is_logmasked(&self, pri: i32) -> bool
201    {
202        if ((1 << (pri & LogMask::LOG_PRIMASK)) & self.logmask) == 0
203        {
204            return true;
205        }
206
207        return false;
208    }
209
210        /// Returns the type of the socket.
211    #[inline]
212    pub(crate)
213    fn get_taptype(&self) -> TapType
214    {
215        return self.stream.get_type();
216    }
217
218    /* 
219    /// Returns the maximum msg size in bytes. (Full msg with headers.)
220    #[inline]
221    pub(crate) 
222    fn get_max_msg_size(&self) -> usize
223    {
224        return self.stream.get_max_msg_size();
225    }
226    */
227
228    #[inline]
229    pub(crate) 
230    fn set_logtag<L: AsRef<str>>(&mut self, logtag: L, update_pid: bool)
231    {
232        self.logtag = 
233            truncate_n(logtag.as_ref(), RFC_MAX_APP_NAME).to_string();
234
235        if update_pid == true
236        {
237            self.logpid = portable::get_pid().to_string();
238        }
239
240        return;
241    }
242
243    /// Disconnects the unix stream from syslog.
244    #[inline]
245    pub(crate) async 
246    fn disconnectlog(&mut self)  -> SyRes<()>
247    {
248        return 
249            self
250                .stream
251                .disconnectlog()
252                .await
253                .map_err(|e| map_error_os!(e, "can not disconnect log properly"));
254    }
255}
256
257impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogApi<F, D>
258for AsyncSyslogInternal<F, D, IO>
259{
260    async 
261    fn update_tap_data(&mut self, tap_data: D) -> SyRes<()>
262    {
263        let is_con = self.stream.is_connected();
264
265        if is_con == true
266        {
267            self
268                .stream
269                .disconnectlog()
270                .await
271                .map_err(|e|
272                    map_error_os!(e, "update_tap_data() can not disconnect log properly")
273                )?;
274        }
275
276        self.stream.update_tap_data(tap_data);
277
278        if is_con == true
279        {
280            // replace with new instance
281            self.stream.connectlog().await?;
282        }
283
284        return Ok(());
285    }
286 
287    #[inline]
288    fn change_identity(&mut self, ident: &str)
289    {
290        self.set_logtag(ident, true);
291    }
292
293    async 
294    fn reconnect(&mut self) -> SyRes<()>
295    {
296        self.disconnectlog().await?;
297
298        self.connectlog().await?;
299
300        return Ok(());
301    }
302
303    async 
304    fn closelog(&mut self) -> SyRes<()>
305    {
306        return self.disconnectlog().await;
307    }
308    
309    fn set_logmask(&mut self, logmask: i32) -> i32
310    {
311        let oldmask = self.logmask;
312
313        if logmask != 0
314        {
315            self.logmask = logmask;
316        }
317
318        return oldmask;
319    }
320
321
322    /// Connects unix stream to the syslog and sets up the properties of
323    /// the unix stream.
324    #[inline]
325    async 
326    fn connectlog(&mut self) -> SyRes<()>
327    {
328        return self.stream.connectlog().await;
329    }
330    
331    /// An internal function which is called by the syslog or vsyslog.
332    async 
333    fn vsyslog1(&mut self, mut pri: Priority, fmt: F)
334    {
335        // check for invalid bits
336        if let Err(e) = pri.check_invalid_bits()
337        {
338            IO::send_to_stderr(self.logstat, &e.to_string()).await;
339        }
340
341        /*match check_invalid_bits(&mut pri)
342        {
343            Ok(_) => {},
344            Err(_e) => self.vsyslog1(get_internal_log(), fmt).await
345        }*/
346
347        // check priority against setlogmask
348        if self.is_logmasked(pri.bits()) == true
349        {
350            return;
351        }
352
353        // set default facility if not specified in pri
354        if (pri.bits() & LOG_FACMASK) == 0
355        {
356            pri.set_facility(self.facility);
357        };
358
359        let mut msg_formatted = 
360            fmt.vsyslog1_format(D::SocketTap::get_max_msg_size(), pri, &self.logtag, &self.logpid);
361
362        // output to stderr if required
363        IO::send_to_stderr(self.logstat, msg_formatted.get_stderr_output()).await;
364
365        if self.stream.is_connected() == false
366        {
367            // open connection
368            match self.connectlog().await
369            {
370                Ok(_) => {},
371                Err(e) =>
372                {
373                    IO::send_to_stderr(self.logstat, &e.into_inner() ).await;
374                    return;
375                }
376            }
377        }
378        
379        let fullmsg = msg_formatted.get_full_msg();
380
381
382        // There are two possible scenarios when send may fail:
383        // 1. syslog temporary unavailable
384        // 2. syslog out of buffer space
385        // If we are connected to priv socket then in case of 1 we reopen connection
386        //      and retry once.
387        // If we are connected to unpriv then in case of 2 repeatedly retrying to send
388        //      until syslog socket buffer space will be cleared
389
390        loop
391        {
392            match self.stream.send(fullmsg.as_bytes()).await
393            {
394                Ok(_) => return,
395                Err(err) =>
396                {   
397                    if self.get_taptype().is_network() == false
398                    {
399                        if let Some(libc::ENOBUFS) = err.raw_os_error()
400                        {
401                            // scenario 2
402                            if self.get_taptype().is_priv() == true
403                            {
404                                break;
405                            }
406
407                            IO::sleep_micro(1).await;
408                            //sleep(Duration::from_micros(1)).await;
409                        }
410                        else
411                        {
412                            // scenario 1
413                            let _ = self.disconnectlog().await;
414                            match self.connectlog().await
415                            {
416                                Ok(_) => {},
417                                Err(_e) => break,
418                            }
419
420                            // if resend will fail then probably the scn 2 will take place
421                        } 
422                    }
423                    else
424                    {
425                        let _ = self.disconnectlog().await;
426                        match self.connectlog().await
427                        {
428                            Ok(_) => {},
429                            Err(_e) => break,
430                        }
431                    }  
432                }
433            }
434        } // loop
435
436
437        // If program reached this point then transmission over socket failed.
438        // Try to output message to console
439
440        IO::send_to_syscons(self.logstat, msg_formatted.get_stderr_output()).await;
441    }
442}