syslog_rs/a_sync/
syslog_async.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * Licensed under the EUPL, Version 1.2 or - as soon they will be approved by
7 * the European Commission - subsequent versions of the EUPL (the "Licence").
8 * 
9 * You may not use this work except in compliance with the Licence.
10 * 
11 * You may obtain a copy of the Licence at:
12 * 
13 *    https://joinup.ec.europa.eu/software/page/eupl
14 * 
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the Licence is distributed on an "AS IS" basis, WITHOUT
17 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 * Licence for the specific language governing permissions and limitations
19 * under the Licence.
20 */
21
22use std::marker::PhantomData;
23
24use crate::{error::SyRes, formatters::{DefaultSyslogFormatter, SyslogFormatter}, syslog_provider::*, LogFacility, LogStat, Priority};
25
26#[cfg(feature = "use_sync")]
27use crate::sy_sync::Syslog;
28
29
30#[cfg(feature = "use_sync_queue")]
31use crate::sy_sync_queue::SyslogQueue;
32#[cfg(feature = "use_sync_queue")]
33use super::syslog_async_queue::AsyncSyslogQueue;
34
35use super::{syslog_async_shared::AsyncSyslogShared, syslog_trait::AsyncSyslogApi};
36
37
38/// A main instance of the Syslog client.
39/// 
40/// * `D` - a [SyslogDestination] instance which is either:
41///     [SyslogLocal], [SyslogFile], [SyslogNet], [SyslogTls]. By
42///     default a `SyslogLocal` is set.
43/// 
44/// * `F` - a [SyslogFormatter] a message formatter for the destination
45///     server. See `formatters` i.e `FormatRfc3146`, `FormatRfc5424`, `FormatFile`. Or
46///     `DefaultSyslogFormatter`, `DefaultSyslogFormatterFile`.
47/// 
48/// * `S` - a [AsyncSyslogApi] sybsystem. It is either [AsyncSyslogShared] or [AsyncSyslogQueue] with 
49///     the same 'F' - formatter type. By default a [AsyncSyslogShared] is set.
50#[derive(Debug)]
51pub struct AsyncSyslog<D = SyslogLocal, F = DefaultSyslogFormatter, S = AsyncSyslogShared<F>>
52    (S, PhantomData<F>, PhantomData<D>)
53where D: SyslogDestination, F: SyslogFormatter + Sync, S: AsyncSyslogApi;
54
55impl AsyncSyslog
56{
57    /// Opens a default async connection to the local syslog server with default formatter.
58    /// 
59    /// # Arguments
60    /// 
61    /// * `ident` - A program name which will appear on the logs. If none, will be determined
62    ///     automatically.
63    /// 
64    /// * `logstat` - [LogStat] an instance config.
65    /// 
66    /// * `facility` - [LogFacility] a syslog facility.
67    /// 
68    /// * `dest` - a destination server. Allows to connect to the different local syslog server or
69    ///     will be determined automatically.
70    /// 
71    /// # Returns
72    /// 
73    /// A [SyRes] is returned ([Result]) with: 
74    /// 
75    /// * [Result::Ok] - with instance
76    /// 
77    /// * [Result::Err] - with error description.
78    pub async 
79    fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, dest: SyslogLocal) -> SyRes<Self>
80    {
81        let net_tap = dest.init()?;
82        
83        let syslog = AsyncSyslogShared::<DefaultSyslogFormatter>::openlog(ident, logstat, facility, net_tap).await?;
84
85        return Ok(Self(syslog, PhantomData::<DefaultSyslogFormatter>, PhantomData::<SyslogLocal>));
86    }
87}
88
89impl<D: SyslogDestination, F: SyslogFormatter + Sync> AsyncSyslog<D, F, AsyncSyslogShared<F>>
90{
91    /// Opens a special connection to the destination syslog server with specific formatter.
92    /// 
93    /// All struct generic should be specified before calling this function.
94    /// 
95    /// # Arguments
96    /// 
97    /// * `ident` - A program name which will appear on the logs. If none, will be determined
98    ///     automatically.
99    /// 
100    /// * `logstat` - [LogStat] an instance config.
101    /// 
102    /// * `facility` - [LogFacility] a syslog facility.
103    /// 
104    /// * `dest` - a destination server. Allows to connect to the different local syslog server or
105    ///     will be determined automatically.
106    /// 
107    /// # Returns
108    /// 
109    /// A [SyRes] is returned ([Result]) with: 
110    /// 
111    /// * [Result::Ok] - with instance
112    /// 
113    /// * [Result::Err] - with error description.
114    pub async 
115    fn openlog_with(ident: Option<&str>, logstat: LogStat, facility: LogFacility, dest: D) -> SyRes<AsyncSyslog<D, F, AsyncSyslogShared<F>>>
116    {
117        let net_tap = dest.init()?;
118        
119        let syslog = AsyncSyslogShared::<F>::openlog(ident, logstat, facility, net_tap).await?;
120       
121
122        return Ok(Self(syslog, PhantomData::<F>, PhantomData::<D>));
123    }
124
125    /// Sets the logmask to filter out the syslog calls. This function behaves 
126    /// differently as it behaves in syslog_sync.rs or syslog_async.rs.
127    /// It may return an error if: syslog thread had exit and some thread calls
128    /// this function. Or something happened with channel. 
129    /// This function blocks until the previous mask is received.
130    /// 
131    /// See macroses [LOG_MASK] and [LOG_UPTO] to generate mask
132    ///
133    /// # Example
134    ///
135    /// LOG_MASK!(Priority::LOG_EMERG) | LOG_MASK!(Priority::LOG_ERROR)
136    ///
137    /// or
138    ///
139    /// ~(LOG_MASK!(Priority::LOG_INFO))
140    /// LOG_UPTO!(Priority::LOG_ERROR) 
141    pub async 
142    fn setlogmask(&self, logmask: i32) -> SyRes<i32>
143    {
144        return self.0.setlogmask(logmask).await;
145    }
146
147    /// Changes the identity i.e program name which will appear on the logs.
148    /// 
149    /// Can return error if mutex is poisoned.
150    pub async 
151    fn change_identity(&self, ident: &str) -> SyRes<()>
152    {
153        return self.0.change_identity(ident).await;
154    }
155
156    /// Closes connection to the syslog server
157    pub async 
158    fn closelog(&self) -> SyRes<()>
159    {
160        return self.0.closelog().await;
161    }
162
163    /// Similar to libc, syslog() sends data to syslog server.
164    /// 
165    /// # Arguments
166    ///
167    /// * `pri` - a priority [Priority]
168    ///
169    /// * `fmt` - a program's message to be sent as payload.
170    #[inline]
171    pub async 
172    fn syslog(&self, pri: Priority, fmt: String)
173    {
174        self.0.syslog(pri, fmt).await;
175    }
176
177    /// Sends message to syslog (same as `syslog`).
178    #[inline]
179    pub async
180    fn vsyslog<M: AsRef<str>>(&self, pri: Priority, fmt: M)
181    {
182        self.0.vsyslog(pri, fmt.as_ref()).await;
183    }
184
185    /// Performs the reconnection to the syslog server or file re-open.
186    /// 
187    /// # Returns
188    /// 
189    /// A [Result] is retured as [SyRes].
190    /// 
191    /// * [Result::Ok] - with empty inner type.
192    /// 
193    /// * [Result::Err] - an error code and description
194    pub async 
195    fn reconnect(&self) -> SyRes<()>
196    {
197        return self.0.reconnect().await;
198    }
199
200    /// Updates the inner instance destionation i.e path to file
201    /// or server address. The type of destination can not be changed.
202    /// 
203    /// This function disconnects from syslog server if previously was 
204    /// connected (and reconnects if was connected previously).
205    /// 
206    /// # Arguments 
207    /// 
208    /// * `new_tap` - a consumed instance of type `D` [SyslogDestination]
209    /// 
210    /// # Returns 
211    /// 
212    /// A [SyRes] is returned. An error may be returned if:
213    /// 
214    /// * connection to server was failed
215    /// 
216    /// * incorrect type
217    /// 
218    /// * disconnect frm server failed
219    pub async 
220    fn update_tap(&self, new_tap: D) -> SyRes<()>
221    {
222        return self.0.update_tap_data(new_tap.init()?).await;
223    }
224}
225
#[cfg(feature = "use_sync_queue")]
impl<D: SyslogDestination, F: SyslogFormatter + Sync> AsyncSyslog<D, F, AsyncSyslogQueue<F>> {
    /// Attaches to a [SyslogQueue], so that the async instance of syslog
    /// writes to the same instance as the sync portion.
    ///
    /// # Arguments
    ///
    /// * `dest` - a [Syslog] instance backed by [SyslogQueue]. Its `D`
    ///     [SyslogDestination] and `F` [SyslogFormatter] must be of the same
    ///     types as those of the instance being created.
    ///
    /// # Returns
    ///
    /// A [SyRes] is returned. The [Result::Err] is returned only if the thread
    /// which serves the message queue has panicked or was not started.
    pub async fn attach_queue(
        dest: &Syslog<D, F, SyslogQueue<F>>,
    ) -> SyRes<AsyncSyslog<D, F, AsyncSyslogQueue<F>>> {
        let adapter = dest.make_adapter();

        let backend = AsyncSyslogQueue::<F>::attachlog(adapter).await?;

        // Both markers are inferred from `Self`.
        Ok(Self(backend, PhantomData, PhantomData))
    }

    /// Sets the log mask which filters out syslog calls by priority.
    ///
    /// It may return an error if the syslog thread has exited, or if
    /// something happened with the channel.
    /// This function waits until the previous mask is received.
    ///
    /// See the macros [LOG_MASK] and [LOG_UPTO] to generate a mask.
    ///
    /// # Example
    ///
    /// ```text
    /// LOG_MASK!(Priority::LOG_EMERG) | LOG_MASK!(Priority::LOG_ERROR)
    /// ```
    ///
    /// or
    ///
    /// ```text
    /// !(LOG_MASK!(Priority::LOG_INFO))
    /// LOG_UPTO!(Priority::LOG_ERROR)
    /// ```
    pub async fn setlogmask(&self, logmask: i32) -> SyRes<i32> {
        self.0.setlogmask(logmask).await
    }

    /// Closes the connection to the syslog server.
    ///
    /// Any error reported by the backend is returned unchanged.
    pub async fn closelog(&self) -> SyRes<()> {
        self.0.closelog().await
    }

    /// Similar to libc's `syslog()`: sends a message to the syslog server.
    ///
    /// # Arguments
    ///
    /// * `pri` - [Priority] the priority of the message.
    ///
    /// * `fmt` - the program's message, sent as the payload.
    #[inline]
    pub async fn syslog(&self, pri: Priority, fmt: String) {
        self.0.syslog(pri, fmt).await;
    }

    /// Sends a message to syslog (same as `syslog`), but accepts anything
    /// which can be borrowed as `str`.
    ///
    /// # Arguments
    ///
    /// * `pri` - [Priority] the priority of the message.
    ///
    /// * `fmt` - the program's message, sent as the payload.
    #[inline]
    pub async fn vsyslog<M: AsRef<str>>(&self, pri: Priority, fmt: M) {
        let message: &str = fmt.as_ref();

        self.0.vsyslog(pri, message).await;
    }

    /// Reconnects to the syslog server or re-opens the file.
    ///
    /// # Returns
    ///
    /// A [Result] is returned as [SyRes].
    ///
    /// * [Result::Ok] - with an empty inner type.
    ///
    /// * [Result::Err] - an error code and description.
    pub async fn reconnect(&self) -> SyRes<()> {
        self.0.reconnect().await
    }

    /// Updates the destination of the inner instance, i.e. the path to the
    /// file or the server address. The type of the destination cannot be changed.
    ///
    /// This function disconnects from the syslog server if it was connected
    /// previously (and reconnects in that case).
    ///
    /// # Arguments
    ///
    /// * `new_tap` - a consumed instance of type `D` [SyslogDestination].
    ///
    /// # Returns
    ///
    /// A [SyRes] is returned. An error may be returned if:
    ///
    /// * the connection to the server failed
    ///
    /// * the type is incorrect
    ///
    /// * the disconnect from the server failed
    pub async fn update_tap(&self, new_tap: D) -> SyRes<()> {
        // Initialise the new destination first; a failure here leaves the
        // backend untouched.
        let tap_data = new_tap.init()?;

        self.0.update_tap_data(tap_data).await
    }
}
342
343
#[cfg(test)]
mod tests
{
    use tokio::time::Instant;

    use super::*;

    /// Logs through one shared [AsyncSyslog] instance from the test task and
    /// from two producer tasks, each of which spawns five logging tasks.
    ///
    /// Every spawned task is joined before the log is closed, so a panic
    /// inside any logging task fails the test instead of being dropped
    /// together with a detached `JoinHandle`, and the test does not depend on
    /// a fixed sleep being long enough for the tasks to finish.
    #[tokio::test]
    async fn test_multithreading()
    {
        use std::sync::Arc;
        use tokio::time::{sleep, Duration};

        let log =
            match AsyncSyslog::openlog(
                Some("test1"),
                LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID,
                LogFacility::LOG_DAEMON,
                SyslogLocal::new()
            ).await
            {
                Ok(log) => Arc::new(log),
                Err(e) => panic!("openlog failed: {}", e),
            };

        let c1_log = Arc::clone(&log);
        let c2_log = Arc::clone(&log);

        // Producer 1: sends owned `String` messages via `syslog`.
        let producer1 =
            tokio::spawn( async move
                {
                    let mut workers = Vec::with_capacity(5);

                    for i in 0..5
                    {
                        let cc_c1_log = Arc::clone(&c1_log);
                        sleep(Duration::from_nanos(200)).await;

                        workers.push(
                            tokio::spawn( async move
                            {
                                let m = format!("ASYNC a message from thread 1 #{}[]", i);
                                let now = Instant::now();
                                cc_c1_log.syslog(Priority::LOG_DEBUG, m).await;
                                let elapsed = now.elapsed();
                                println!("t1: {:?}", elapsed);
                            })
                        );
                    }

                    // Hand the handles back so the test can join them.
                    workers
                }
            );

        // Producer 2: sends non-ASCII messages via `vsyslog`.
        let producer2 =
            tokio::spawn( async move
                {
                    let mut workers = Vec::with_capacity(5);

                    for i in 0..5
                    {
                        let cc_c2_log = Arc::clone(&c2_log);
                        sleep(Duration::from_nanos(201)).await;

                        workers.push(
                            tokio::spawn( async move
                            {
                                let m = format!("ASYNC きるさお命泉ぶねりよ日子金れっ {}", i);
                                let now = Instant::now();
                                cc_c2_log.vsyslog(Priority::LOG_DEBUG, m).await;
                                let elapsed = now.elapsed();
                                println!("t2: {:?}", elapsed);
                            })
                        );
                    }

                    // Hand the handles back so the test can join them.
                    workers
                }
            );

        let m = String::from("ASYNC A message from main, きるさお命泉ぶねりよ日子金れっ");
        let now = Instant::now();
        log.syslog(Priority::LOG_DEBUG, m).await;
        let elapsed = now.elapsed();
        println!("main: {:?}", elapsed);

        // Join the producers first, then every logging task they spawned.
        for producer in vec![producer1, producer2]
        {
            let workers = producer.await.expect("producer task panicked");

            for worker in workers
            {
                worker.await.expect("logging task panicked");
            }
        }

        // All messages have been handed to the backend; now it is safe to close.
        log.closelog().await.unwrap();
    }
}
421}