syslog_rs/a_sync/
syslog_async_internal.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 *   2. The MIT License (MIT)
12 *                     
13 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17use std::fmt;
18use std::marker::PhantomData;
19
20use crate::a_sync::syslog_trait::AsyncSyslogApi;
21use crate::formatters::SyslogFormatter;
22use crate::map_error_os;
23use crate::portable;
24use crate::common::*;
25use crate::error::SyRes;
26use crate::socket::TapType;
27use crate::AsyncSyslogDestination;
28
29use super::async_socket::*;
30
31/// A trait which generalize some operations which are different from the
32/// various async providers i.e `smol` or `tokio` or external.
33#[allow(async_fn_in_trait)]
34pub trait AsyncSyslogInternalIO: fmt::Debug + Send + 'static
35{
36    /// Sends a message to syscons device. Usually this is a `/dev/console`
37    /// defined by crate::common::PATH_CONSOLE.
38    /// 
39    /// # Arguemnts
40    /// 
41    /// * `logstat` - a instance setup [LogStat].
42    /// 
43    /// * `msg_payload` - a payload of the syslog message (without headers).
44    async fn send_to_syscons(logstat: LogStat, msg_payload: &str);
45
46    /// Sends a message to stderr device.
47    /// 
48    /// # Arguemnts
49    /// 
50    /// * `logstat` - a instance setup [LogStat].
51    /// 
52    /// * `msg` - a payload of the syslog message (without headers).
53    async fn send_to_stderr(logstat: LogStat, msg: &str);
54
55    /// Sleep the current task for `us` microseconds.
56    /// 
57    /// # Arguments
58    /// 
59    /// * `us` - microseconds.
60    async fn sleep_micro(us: u64);
61}
62
63/// A trait which generalize the mutex from the std lib's of multiple async executors.
64/// The trait should be implemented on the mutex direclty.
65#[allow(async_fn_in_trait)]
66pub trait AsyncMutex<F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
67{
68    /// A mutex guard type.
69    type MutxGuard<'mux>: AsyncMutexGuard<'mux, F, D, DS> where Self: 'mux;
70
71    /// Creates new mutex instance for type which implements the [AsyncSyslogApi].
72    fn a_new(v: DS) -> Self;
73
74    /// Locks the mutex emmiting the `mutex guard`.
75    async fn a_lock<'mux>(&'mux self) -> Self::MutxGuard<'mux>;
76}
77
78/// A trait which generalize the mutex guarding emited by the mutex from various async executors.
79pub trait AsyncMutexGuard<'mux, F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
80{
81    /// Returns the reference to the inner type of the mutex guard.
82    fn guard(&self) -> &DS;
83
84    /// Returns the mutable reference to the inner type of the mutex guard.
85    fn guard_mut(&mut self) -> &mut DS;
86}
87
88/// Internal structure of the syslog async client.
89#[derive(Debug)]
90pub struct AsyncSyslogInternal<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO>
91{
92    /// An identification i.e program name, thread name
93    logtag: String, 
94
95    /// A pid of the program.
96    logpid: String,
97
98    /// Defines how syslog operates
99    logstat: LogStat,
100
101    /// Holds the facility 
102    facility: LogFacility,
103
104    /// A logmask
105    logmask: i32,
106
107    /// A stream
108    stream: D::SocketTap,
109
110    /// Phantom for [SyslogFormatter]
111    _p: PhantomData<F>,
112
113    /// Phantom for the [AsyncSyslogInternalIO] which provides writing to console and other IO.
114    _p2: PhantomData<IO>,
115}
116
117
118impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogInternal<F, D, IO>
119{
120    /// Creates new instance of [SyslogInternal] which contains all 
121    /// client syslog logic.
122    ///
123    /// # Arguments
124    ///
125    /// * `ident` - An optional argument which takes ref to str. If none, the
126    ///     ident will be set later. Yje ident will be trucated to 48 UTF8
127    ///     chars.
128    /// 
129    /// * `logstat` - A [LogStat] flags separated by '|'
130    /// 
131    /// * `facility` - A [LogFacility] flag
132    /// 
133    /// * `req_tap` - A type of the syslog instance reuired. See [SyslogDestination].
134    /// 
135    /// # Returns
136    ///
137    /// A [SyRes] is returned with following:
138    /// 
139    /// * [Result::Ok] - with the instance.
140    /// 
141    /// * [Result::Err] - with error description.
142    pub(crate) 
143    fn new(
144        ident: Option<&str>, 
145        logstat: LogStat, 
146        facility: LogFacility, 
147        req_tap: D
148    ) -> SyRes<Self>
149    {
150        // check if log_facility is invalid
151        let log_facility =
152            if facility.is_empty() == false && 
153                (facility & !LogMask::LOG_FACMASK).is_empty() == true
154            {
155                facility
156            }
157            else
158            {
159                // default facility code
160                LogFacility::LOG_USER
161            };
162
163        let logtag = 
164            match ident
165            {
166                Some(r) => 
167                    truncate_n(r, RFC_MAX_APP_NAME).to_string(),
168                None => 
169                    truncate_n(
170                        portable::p_getprogname()
171                            .unwrap_or("".to_string())
172                            .as_str(),
173                        RFC_MAX_APP_NAME
174                    )
175                    .to_string()
176            };
177
178        let sock = D::SocketTap::new(req_tap)?;
179
180        return Ok(
181            Self
182            {
183                logtag: logtag,
184                logpid: portable::get_pid().to_string(),
185                logstat: logstat, 
186                facility: log_facility,
187                logmask: 0xff,
188                stream: sock,
189                _p: PhantomData,
190                _p2: PhantomData,
191            }
192        );
193    }
194
195    #[inline]
196    pub(crate) 
197    fn is_logmasked(&self, pri: i32) -> bool
198    {
199        if ((1 << (pri & LogMask::LOG_PRIMASK)) & self.logmask) == 0
200        {
201            return true;
202        }
203
204        return false;
205    }
206
207        /// Returns the type of the socket.
208    #[inline]
209    pub(crate)
210    fn get_taptype(&self) -> TapType
211    {
212        return self.stream.get_type();
213    }
214
215    /* 
216    /// Returns the maximum msg size in bytes. (Full msg with headers.)
217    #[inline]
218    pub(crate) 
219    fn get_max_msg_size(&self) -> usize
220    {
221        return self.stream.get_max_msg_size();
222    }
223    */
224
225    #[inline]
226    pub(crate) 
227    fn set_logtag<L: AsRef<str>>(&mut self, logtag: L, update_pid: bool)
228    {
229        self.logtag = 
230            truncate_n(logtag.as_ref(), RFC_MAX_APP_NAME).to_string();
231
232        if update_pid == true
233        {
234            self.logpid = portable::get_pid().to_string();
235        }
236
237        return;
238    }
239
240    /// Disconnects the unix stream from syslog.
241    #[inline]
242    pub(crate) async 
243    fn disconnectlog(&mut self)  -> SyRes<()>
244    {
245        return 
246            self
247                .stream
248                .disconnectlog()
249                .await
250                .map_err(|e| map_error_os!(e, "can not disconnect log properly"));
251    }
252}
253
254impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogApi<F, D>
255for AsyncSyslogInternal<F, D, IO>
256{
257    async 
258    fn update_tap_data(&mut self, tap_data: D) -> SyRes<()>
259    {
260        let is_con = self.stream.is_connected();
261
262        if is_con == true
263        {
264            self
265                .stream
266                .disconnectlog()
267                .await
268                .map_err(|e|
269                    map_error_os!(e, "update_tap_data() can not disconnect log properly")
270                )?;
271        }
272
273        self.stream.update_tap_data(tap_data);
274
275        if is_con == true
276        {
277            // replace with new instance
278            self.stream.connectlog().await?;
279        }
280
281        return Ok(());
282    }
283 
284    #[inline]
285    fn change_identity(&mut self, ident: &str)
286    {
287        self.set_logtag(ident, true);
288    }
289
290    async 
291    fn reconnect(&mut self) -> SyRes<()>
292    {
293        self.disconnectlog().await?;
294
295        self.connectlog().await?;
296
297        return Ok(());
298    }
299
300    async 
301    fn closelog(&mut self) -> SyRes<()>
302    {
303        return self.disconnectlog().await;
304    }
305    
306    fn set_logmask(&mut self, logmask: i32) -> i32
307    {
308        let oldmask = self.logmask;
309
310        if logmask != 0
311        {
312            self.logmask = logmask;
313        }
314
315        return oldmask;
316    }
317
318
319    /// Connects unix stream to the syslog and sets up the properties of
320    /// the unix stream.
321    #[inline]
322    async 
323    fn connectlog(&mut self) -> SyRes<()>
324    {
325        return self.stream.connectlog().await;
326    }
327    
328    /// An internal function which is called by the syslog or vsyslog.
329    async 
330    fn vsyslog1(&mut self, mut pri: Priority, fmt: F)
331    {
332        // check for invalid bits
333        if let Err(e) = pri.check_invalid_bits()
334        {
335            IO::send_to_stderr(self.logstat, &e.to_string()).await;
336        }
337
338        /*match check_invalid_bits(&mut pri)
339        {
340            Ok(_) => {},
341            Err(_e) => self.vsyslog1(get_internal_log(), fmt).await
342        }*/
343
344        // check priority against setlogmask
345        if self.is_logmasked(pri.bits()) == true
346        {
347            return;
348        }
349
350        // set default facility if not specified in pri
351        if (pri.bits() & LOG_FACMASK) == 0
352        {
353            pri.set_facility(self.facility);
354        };
355
356        // set PID if needed
357        let msg_pid = 
358            if self.logstat.intersects(LogStat::LOG_PID) == true
359            {
360                Some(self.logpid.as_str())
361            }
362            else
363            {
364                None
365            };
366
367        let mut msg_formatted = 
368            fmt.vsyslog1_format(D::SocketTap::get_max_msg_size(), pri, &self.logtag, msg_pid);
369
370        // output to stderr if required
371        IO::send_to_stderr(self.logstat, msg_formatted.get_stderr_output()).await;
372
373        if self.stream.is_connected() == false
374        {
375            // open connection
376            match self.connectlog().await
377            {
378                Ok(_) => {},
379                Err(e) =>
380                {
381                    IO::send_to_stderr(self.logstat, &e.into_inner() ).await;
382                    return;
383                }
384            }
385        }
386        
387        let fullmsg = msg_formatted.get_full_msg();
388
389
390        // There are two possible scenarios when send may fail:
391        // 1. syslog temporary unavailable
392        // 2. syslog out of buffer space
393        // If we are connected to priv socket then in case of 1 we reopen connection
394        //      and retry once.
395        // If we are connected to unpriv then in case of 2 repeatedly retrying to send
396        //      until syslog socket buffer space will be cleared
397
398        loop
399        {
400            match self.stream.send(fullmsg.as_bytes()).await
401            {
402                Ok(_) => 
403                    return,
404                Err(err) =>
405                {   
406                    if self.get_taptype().is_network() == false
407                    {
408                        #[cfg(target_family = "unix")]
409                        {
410                            if let Some(nix::libc::ENOBUFS) = err.raw_os_error()
411                            {
412                                // scenario 2
413                                if self.get_taptype().is_priv() == true
414                                {
415                                    break;
416                                }
417
418                                IO::sleep_micro(1).await;
419                                //sleep(Duration::from_micros(1)).await;
420                            }
421                            else
422                            {
423                                // scenario 1
424                                let _ = self.disconnectlog().await;
425                                match self.connectlog().await
426                                {
427                                    Ok(_) => {},
428                                    Err(_e) => break,
429                                }
430
431                               
432                            } 
433                        }
434
435                         #[cfg(target_family = "windows")]
436                        {
437                            let Ok(werr) = err.downcast::<windows::core::Error>()
438                                else
439                                {
440                                    IO::send_to_stderr(self.logstat, "error downcast failed").await;
441                                    break;
442                                };
443
444                            IO::send_to_stderr(self.logstat, &werr.message()).await;   
445              
446                            break;
447                        }
448
449                        // if resend will fail then probably the scn 2 will take place
450                    }
451                    else
452                    {
453                        let _ = self.disconnectlog().await;
454                        match self.connectlog().await
455                        {
456                            Ok(_) => {},
457                            Err(_e) => break,
458                        }
459                    }  
460                }
461            }
462        } // loop
463
464
465        // If program reached this point then transmission over socket failed.
466        // Try to output message to console
467
468        IO::send_to_syscons(self.logstat, msg_formatted.get_stderr_output()).await;
469    }
470}