syslog_rs/sync/
syslog_sync_queue.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 *   2. The MIT License (MIT)
12 *                     
13 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17use std::borrow::Cow;
18use std::marker::PhantomData;
19use std::{fmt, thread};
20use std::sync::{Arc, Mutex, Weak};
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use crate::formatters::{DefaultSyslogFormatter, SyslogFormatter};
24use crate::sync::syslog_sync_internal::SyslogSocketLockless;
25use crate::sync::{LogItems, SyStream};
26use crate::sync::DefaultQueueAdapter;
27
28use crate::{map_error, SyStreamApi, SyncSyslog, SyslogDestination};
29
30#[cfg(target_family = "unix")]
31use crate::SyslogLocal;
32
33#[cfg(target_family = "windows")]
34use crate::WindowsEvent;
35
/// The default local log destination on unix-family targets: [SyslogLocal].
#[cfg(target_family = "unix")]
pub type DefaultLocalSyslogDestination = SyslogLocal;
/// The default local log destination on windows-family targets: [WindowsEvent].
#[cfg(target_family = "windows")]
pub type DefaultLocalSyslogDestination = WindowsEvent;
40
41use crate::common::*;
42use crate::error::SyRes;
43
44use super::syslog_trait::SyslogApi;
45
46
/// A command sent through the queue to the syslog worker thread.
/// 
/// Variants which carry a `loopback` field hold the sending half of a oneshot
/// channel through which the worker reports the result back to the caller.
pub enum SyCmd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
{
    /// A message to syslog server
    Syslog
    {
        /// A priority of the message.
        pri: Priority,
        /// A message payload wrapped into the formatter type.
        msg: F
    },

    /// A request to change logmask
    Logmask
    {
        /// A new logmask.
        logmask: i32, 
        /// A channel on which the value returned by `set_logmask` is sent back.
        loopback: S::OneShotChannelSnd<i32>,
    },

    /// A request to change identity
    ChangeIdentity
    {
        /// A new identity, forwarded as is to `LogItems::set_identity`.
        identity: Option<String>,
    },

    /// Updates the tap settings
    UpdateTap
    {
        /// A new destination description.
        tap_type: D,//TapTypeData,
        /// A channel on which the result of the update is sent back.
        loopback: S::OneShotChannelSnd<SyRes<()>>,
    },

    /// A request to connect to the log destination.
    ConnectLog
    {
        /// A channel on which the result of the connection attempt is sent back.
        loopback: S::OneShotChannelSnd<SyRes<()>>
    },

    /// A request to disconnect from the log destination.
    DisconnectLog
    {
        /// A channel on which the result of the disconnect is sent back.
        loopback: S::OneShotChannelSnd<SyRes<()>>
    },

    /// A request to rotate file or reconnect.
    Reconnect,

    /// A request to stop processing and quit
    #[allow(unused)]
    Stop,
}
94
95
96impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyCmd<F, D, S>
97{
98    /// Construct a message with data to send.
99    pub(crate) 
100    fn form_syslog(pri: Priority, msg: F) -> Self
101    {
102        return 
103            Self::Syslog
104            {
105                pri, msg
106            };
107    }
108
109    pub(crate) 
110    fn form_connectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
111    {
112        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
113
114        return 
115            (Self::ConnectLog{ loopback: tx }, rx);
116    }
117
118    pub(crate) 
119    fn form_disconnectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
120    {
121        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
122
123        return 
124            (Self::DisconnectLog{ loopback: tx }, rx);
125    }
126
127    /// Constructs a message to make logmask with or without previous PRI.
128    /// 
129    /// # Arguments
130    /// 
131    /// * `logmask` - a new logmask
132    pub(crate) 
133    fn form_logmask(logmask: i32) -> (Self, S::OneShotChannelRcv<i32>)
134    {
135        let (tx, rx) = S::create_oneshot_channel::<i32>();
136
137        return 
138            (Self::Logmask{ logmask, loopback: tx }, rx);
139    }
140
141    /// Constructs a message which should change the identity (appname) of the
142    /// instance.
143    pub(crate) 
144    fn form_change_ident(identity: Option<String>) -> Self
145    {
146        return 
147            Self::ChangeIdentity
148            {
149                identity: identity
150            };
151    }
152
153    /// Constructs a message which changes the destination of the log messages i.e
154    /// changing path of the dst file or address. The `new_tap_type` 
155    /// should be the same variant [TapTypeData] as previous.
156    pub(crate)
157    fn form_update_tap(new_tap_type: D/*TapTypeData*/) -> (Self, S::OneShotChannelRcv<SyRes<()>>)
158    {
159        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
160
161        return (
162            Self::UpdateTap  
163            { 
164                tap_type: new_tap_type,
165                loopback: tx
166            },
167            rx
168        );
169    }
170
171    /// Constructs a message to handle SIGHUP. This is usefull only when the instance
172    /// is writing directly into the file. Or just reconnect.
173    pub(crate) 
174    fn form_reconnect() -> Self
175    {
176        return Self::Reconnect;
177    }
178
179    /// Constructs a message to stop thread gracefully. After receiving this
180    /// message a thread will quit and all messages that would be sent after
181    /// this message will be cleared from queue and a new messages will not be
182    /// received.
183    #[allow(unused)]
184    pub(crate) 
185    fn form_stop() -> Self
186    {
187        return Self::Stop;
188    }
189}
190
/// Internal state of the syslog client thread. An instance is moved into the
/// worker thread and consumed by `thread_worker`.
struct SyslogInternal<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
{
    /// An explicit stop flag. The worker leaves its loop when it reads `false`.
    run_flag: Arc<AtomicBool>,

    /// A receiving side of the commands channel
    tasks: S::ChannelRcv,

    /// Log config
    log_items: LogItems,

    /// A socket (connection) to the log destination
    socket: SyslogSocketLockless<D>,
}
206
207
208
209impl<F, D, S> SyslogInternal<F, D, S>
210where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
211{
212    fn new(log_items: LogItems, socket: SyslogSocketLockless<D>) -> SyRes<(Self, S::ChannelSnd, Weak<AtomicBool>)>
213    {
214        // control flag
215        let run_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
216        let run_control = Arc::downgrade(&run_flag);
217
218        // creating queue for messages
219        let (sender, receiver) = S::create_channel();
220
221        // creating internal syslog struct
222        let mut inst = 
223            SyslogInternal
224            {
225                run_flag: run_flag,
226                tasks: receiver,
227                log_items: log_items,
228                socket: socket
229            };
230
231        if inst.log_items.logstat.contains(LogStat::LOG_NDELAY) == true
232        {
233            inst.socket.connectlog()?;
234        }
235
236        return Ok((inst, sender, run_control));
237    }
238
239    fn thread_worker(mut self)
240    {
241        loop
242        {
243            // self will be dropped as soon as thread will be stopped
244            if self.run_flag.load(Ordering::Relaxed) == false
245            {
246                // force leave
247                break;
248            }	
249
250            match self.tasks.q_recv_blocking()
251			{
252				Some(task) =>
253				{
254                    match task
255                    {
256                        SyCmd::Syslog{ pri, msg } =>
257                        {
258                            let Some(formatted) = self.log_items.vsyslog1_msg::<F, D>(pri, &msg)
259                                else { continue };
260
261                            self.socket.vsyslog1(formatted.1, formatted.0);
262                        },
263                        SyCmd::Logmask{ logmask, loopback } =>
264                        {
265                            let pri = self.log_items.set_logmask(logmask);
266
267                            let _ = loopback.send_once_blocking(pri);
268                        },
269                        SyCmd::ChangeIdentity{ identity } =>
270                        {
271                            self.log_items.set_identity(identity.as_ref().map(|v| v.as_str()));
272                        },
273
274                        SyCmd::UpdateTap{ tap_type, loopback } =>
275                        {
276                            let res = self.socket.update_tap_data(tap_type);
277                            
278                            if let Err(Err(e)) = loopback.send_once_blocking(res)
279                            {
280                                self.log_items.logstat.send_to_stderr(&e.to_string());
281                            }
282                        },
283
284                        SyCmd::ConnectLog{ loopback} => 
285                        {
286                            if let Err(Err(e)) = loopback.send_once_blocking(self.socket.connectlog())
287                            {
288                                self.log_items.logstat.send_to_stderr(&e.to_string());
289                            }
290                        },
291
292                        SyCmd::DisconnectLog{ loopback} => 
293                        {
294                            if let Err(Err(e)) = loopback.send_once_blocking(self.socket.disconnectlog())
295                            {
296                                self.log_items.logstat.send_to_stderr(&e.to_string());
297                            }
298                        },
299
300                        SyCmd::Reconnect =>
301                        {
302                            if let Err(e) = self.socket.disconnectlog()
303                            {
304                                self.log_items.logstat.send_to_stderr(&e.to_string());
305                            }
306                                
307                            if let Err(e) = self.socket.connectlog()
308                            {
309                                self.log_items.logstat.send_to_stderr(&e.to_string());
310                            }
311                        },
312                        SyCmd::Stop =>
313                        {
314                            // ignore the rest
315                            break;
316                        }
317                    }
318                },
319                None =>
320                {
321                    break;
322                }
323            } // match
324
325        } // loop
326
327        return;
328    }
329}
330
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a blocking receive interface on the receiver side.
pub trait SyslogQueueChanRcv<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Send
{
    /// Receives a command from the channel in blocking mode. 
    /// 
    /// The worker thread leaves its processing loop when `None` is returned.
    fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>;
}
338
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a blocking send interface and, optionally, an async send 
/// interface on the sender side.
#[allow(async_fn_in_trait)]
pub trait SyslogQueueChanSnd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Clone
{
    /// Sends the message over the channel in blocking mode.
    fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>;

    /// Sends the message over the channel in async mode. The default 
    /// implementation does not send anything and returns an error.
    async fn q_send(&self, _msg: SyCmd<F, D, S>) -> SyRes<()>
    {
        crate::throw_error!("async is not availabe here"); 
    }
}
353
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a receive interface of the oneshot channel for both sync and 
/// async modes.
#[allow(async_fn_in_trait)]
pub trait SyslogQueueOneChanRcv<C>
{
    /// Receives once from the channel in sync mode. The receiver is consumed.
    fn recv_once_blocking(self) -> SyRes<C>;

    /// Receives once from the channel in async mode. The default implementation
    /// does not receive anything and returns an error.
    async fn recv_once(self) -> SyRes<C> where Self: Sized
    {
        crate::throw_error!("async is not availabe here"); 
    }
}
369
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a send interface of the oneshot channel for blocking mode.
pub trait SyslogQueueOneChanSnd<C: Send>: fmt::Debug + Send
{
    /// Sends to the channel in blocking mode. The sender is consumed. 
    /// 
    /// On failure the `data` is handed back in [Result::Err].
    /// NOTE(review): presumably the failure means that the receiving side is gone - 
    /// confirm against the adapter implementation.
    fn send_once_blocking(self, data: C) -> Result<(), C>;
}
377
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a common interface which includes everything i.e channel, oneshot 
/// channel and manipulations.
pub trait SyslogQueueChannel<F: SyslogFormatter, D: SyslogDestination>: fmt::Debug + Send + Clone + 'static
{
    /// A name of the adapter.
    const ADAPTER_NAME: &'static str;
    
    /// A send side of the channel type.
    type ChannelSnd: SyslogQueueChanSnd<F, D, Self>;
    
    /// A receive side of the channel type.
    type ChannelRcv: SyslogQueueChanRcv<F, D, Self>;

    /// A oneshot send side of the channel type.
    type OneShotChannelSnd<C: Send + fmt::Debug>: SyslogQueueOneChanSnd<C>;

    /// A oneshot receive side of the channel type.
    type OneShotChannelRcv<C>: SyslogQueueOneChanRcv<C>;

    /// Creates unbounded channel. Returns a `(sender, receiver)` pair.
    fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv);   

    /// Creates oneshot channel. Returns a `(sender, receiver)` pair.
    fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>);
}
403
404
405
/// A parallel, shared instance of the syslog client which is running in a 
/// separate thread and uses a channel provided by `S` to receive the messages 
/// from the program. It is also capable of combining sync and async i.e sync code 
/// and async code are writing to the same syslog connection.
/// 
/// For this instance a [SyslogApi] and [SyStreamApi] are implemented.
/// 
/// Also if the `build_with_queue` and `async_enabled` features are enabled, 
/// an [AsyncSyslogQueueApi] is implemented.
/// 
/// ```ignore
/// let log = 
///     QueuedSyslog::openlog(
///         Some("test1"), 
///         LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///         LogFacility::LOG_DAEMON,
///         SyslogLocal::new()
///     );
/// ```
/// 
/// ```ignore
/// let log = 
///     QueuedSyslog
///         ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
///         ::openlog_with(
///             Some("test1"), 
///             LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///             LogFacility::LOG_DAEMON,
///             SyslogLocal::new()
///         );
/// ```
/// 
/// ```ignore
/// pub static SYSLOG3: LazyLock<QueuedSyslog<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>> = 
///     LazyLock::new(|| 
///         {
///             QueuedSyslog
///                 ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
///                 ::openlog_with(
///                     Some("test1"), 
///                     LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///                     LogFacility::LOG_DAEMON,
///                     SyslogLocal::new()
///                 )
///                 .unwrap()
///         }
///     );
/// ```
/// 
/// A stream is available via [SyStreamApi].
/// 
/// ```ignore
/// let _ = write!(SYSLOG.stream(Priority::LOG_DEBUG), "test {} 123 stream test ", d);
/// ```
/// 
/// # Generics
/// 
/// * `S` - a [SyslogQueueChannel] a MPSC provider.
/// 
/// * `F` - a [SyslogFormatter] which sets the instance which would 
///     format the message.
/// 
/// * `D` - a [SyslogDestination] instance which is either:
///     [SyslogLocal], [SyslogFile], [SyslogNet], [SyslogTls]. By
///     default a [DefaultLocalSyslogDestination] is selected.
#[derive(Debug, Clone)]
pub struct QueuedSyslog<S = DefaultQueueAdapter, F = DefaultSyslogFormatter, D = DefaultLocalSyslogDestination>
where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
{   
    /// A weak reference to the run flag which is owned by the worker thread.
    run_control: Weak<AtomicBool>,

    /// A sending side of the commands channel
    pub(crate) tasks: S::ChannelSnd,//q_adapter::SendChannelSyCmd<D>,

    /// A join handle of the worker thread, shared between the clones. It is 
    /// taken out when the thread is joined on drop.
    thread: Arc<Mutex<Option<thread::JoinHandle<()>>>>,

    /// phantom for [SyslogFormatter]
    _p: PhantomData<F>,

    /// phantom for [SyslogDestination]
    _p2: PhantomData<D>,
}
489
490
// SAFETY: NOTE(review) - this impl asserts that all fields of [QueuedSyslog] can
// be moved to another thread. The `S::ChannelSnd` type is bounded only by
// `fmt::Debug + Clone` in [SyslogQueueChanSnd], so the soundness presumably relies
// on every channel adapter providing a sender which is safe to send between
// threads - TODO confirm, or add a `Send` bound to the trait instead.
unsafe impl<F, D, S> Send for QueuedSyslog<S, F, D> 
where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
{}
494
495
496impl<F, D, S> Drop for QueuedSyslog<S, F, D>
497where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
498{
499    fn drop(&mut self) 
500    {
501        if let Some(ctrl) = self.run_control.upgrade()
502        {   
503            ctrl.store(false, Ordering::SeqCst);
504
505            if let Err(_e) = self.tasks.q_send_blocking(SyCmd::form_stop())
506            {
507
508            }
509
510            let join_handle = self.thread.lock().unwrap().take().unwrap();
511
512            let _ = join_handle.join();
513        }
514    }
515}
516
517impl QueuedSyslog
518{
519    /// Opens a default connection to the local syslog server with default formatter with
520    /// formatter [SyslogFormatter] and destination [SyslogLocal].
521    /// 
522    /// # Arguments
523    /// 
524    /// * `ident` - A program name which will appear on the logs. If none, will be determined
525    ///     automatically.
526    /// 
527    /// * `logstat` - [LogStat] an instance config.
528    /// 
529    /// * `facility` - [LogFacility] a syslog facility.
530    /// 
531    /// * `net_tap_prov` - a [SyslogLocal] instance with configuration.
532    /// 
533    /// # Returns
534    /// 
535    /// A [SyRes] is returned ([Result]) with: 
536    /// 
537    /// * [Result::Ok] - with instance
538    /// 
539    /// * [Result::Err] - with error description.
540    pub 
541    fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: DefaultLocalSyslogDestination) -> SyRes<Self>
542    {
543        // creating internal syslog struct
544
545         let log_items = 
546                LogItems::new(ident, 0xff, logstat, facility);
547
548        let stream = 
549            SyslogSocketLockless::<DefaultLocalSyslogDestination>::new(logstat, net_tap_prov)?;
550
551        let (inst, sender, run_ctrl) = 
552            SyslogInternal
553                ::<DefaultSyslogFormatter, DefaultLocalSyslogDestination, DefaultQueueAdapter>
554                ::new(log_items, stream)?;
555        
556        
557        let thr_name: String = "syslog_queue/0".into();
558
559        // initiate a thread
560        let thread_hnd = 
561            thread::Builder::new()
562                .name(thr_name.clone())
563                .spawn(move || 
564                    SyslogInternal
565                        ::<DefaultSyslogFormatter, DefaultLocalSyslogDestination, DefaultQueueAdapter>
566                        ::thread_worker(inst)
567                )
568                .map_err(|e| 
569                    map_error!("{} thread spawn failed. {}", thr_name, e)
570                )?;
571
572        // creating a syslog public struct instance
573        let ret = 
574            Self
575            {
576                run_control: run_ctrl,
577                tasks: sender,
578                thread: Arc::new(Mutex::new(Some(thread_hnd))),
579                _p: PhantomData,
580                _p2: PhantomData
581            };
582
583        return Ok(ret);
584    }
585}
586
587impl<F, D, S> QueuedSyslog<S, F, D>
588where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
589{
590    /// Opens a default connection to the local syslog server with default formatter with
591    /// provided generics.
592    /// 
593    /// # Arguments
594    /// 
595    /// * `ident` - A program name which will appear on the logs. If none, will be determined
596    ///     automatically.
597    /// 
598    /// * `logstat` - [LogStat] an instance config.
599    /// 
600    /// * `facility` - [LogFacility] a syslog facility.
601    /// 
602    /// * `net_tap_prov` - a [SyslogLocal] instance with configuration.
603    /// 
604    /// # Returns
605    /// 
606    /// A [SyRes] is returned ([Result]) with: 
607    /// 
608    /// * [Result::Ok] - with instance
609    /// 
610    /// * [Result::Err] - with error description.
611    pub 
612    fn openlog_with(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: D) -> SyRes<QueuedSyslog<S, F, D>>
613    {
614        // creating internal syslog struct
615
616         let log_items = 
617                LogItems::new(ident, 0xff, logstat, facility);
618
619        let stream = 
620            SyslogSocketLockless::<D>::new(logstat, net_tap_prov)?;
621
622        let (inst, sender, run_ctrl) = 
623            SyslogInternal::<F, D, S>::new(log_items, stream)?;
624        
625        
626        let thr_name: String = "syslog_queue/0".into();
627
628        // initiate a thread
629        let thread_hnd = 
630            thread::Builder::new()
631                .name(thr_name.clone())
632                .spawn(move || SyslogInternal::<F, D, S>::thread_worker(inst))
633                .map_err(|e| 
634                    map_error!("{} thread spawn failed. {}", thr_name, e)
635                )?;
636
637        // creating a syslog public struct instance
638        let ret = 
639            Self
640            {
641                run_control: run_ctrl,
642                tasks: sender,
643                thread: Arc::new(Mutex::new(Some(thread_hnd))),
644                _p: PhantomData::<F>,
645                _p2: PhantomData::<D>,
646            };
647
648        return Ok(ret);
649    }
650}
651
652
653impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogApi<F, D> 
654for QueuedSyslog<S, F, D>
655{
656    fn connectlog(&self) -> SyRes<()>
657    {
658        let (sy_cmd, loopback) = 
659            SyCmd::form_connectlog();
660
661        self.tasks.q_send_blocking(sy_cmd)?;
662
663        return 
664            loopback
665                .recv_once_blocking()?;
666    }
667
668    fn setlogmask(&self, logmask: i32) -> SyRes<i32> 
669    {
670        let (sy_cmd, loopback) = 
671            SyCmd::form_logmask(logmask);
672
673        self.tasks.q_send_blocking(sy_cmd)?;
674
675        return 
676            loopback
677                .recv_once_blocking();
678    }
679
680    fn closelog(&self) -> SyRes<()> 
681    {
682        let (sy_cmd, loopback) = 
683            SyCmd::form_disconnectlog();
684
685        // send stop
686        self.tasks.q_send_blocking(sy_cmd)?;
687
688        return 
689            loopback
690                .recv_once_blocking()?;
691    }
692
693    fn syslog(&self, pri: Priority, fmt: F) 
694    {
695        // even if the thread is in a process of termination, there is
696        // no need to sync access to the run_control field as even if
697        // syslog thread will terminate before someone push something on the
698        // queue, it will be left in the queue until the end of program's time.
699        
700        let sy_cmd = SyCmd::form_syslog(pri, fmt);
701
702        let _ = self.tasks.q_send_blocking(sy_cmd);
703        //tasks.send(sy_cmd);
704
705        return;
706    }
707
708    fn change_identity(&self, ident: Option<&str>) -> SyRes<()>
709    {
710        let sy_cmd = 
711            SyCmd::form_change_ident(ident.map(|v| v.to_string()));
712
713        return 
714            self.tasks.q_send_blocking(sy_cmd);
715    }
716
717    fn reconnect(&self) -> SyRes<()>
718    {
719        return 
720            self.tasks.q_send_blocking(SyCmd::form_reconnect());
721    }
722
723    fn update_tap_data(&self, tap_data: D) -> SyRes<()>
724    {
725        let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
726
727        self.tasks.q_send_blocking(tap_data_cmd)?;
728
729        return 
730            loopback
731                .recv_once_blocking()?;
732    }
733}
734
735
736impl<'stream, F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyStreamApi<'stream, F, D, QueuedSyslog<S, F, D>> 
737for QueuedSyslog<S, F, D>
738{
739    fn stream(&'stream self, pri: Priority) -> SyStream<'stream, D, F, QueuedSyslog<S, F, D>> 
740    {
741        return 
742            SyStream 
743            { 
744                inner: self, 
745                pri: pri, 
746                _p: PhantomData, 
747                _p1: PhantomData 
748            };
749    }
750}
751
/// An async interface of the queued syslog client. The commands are pushed to
/// the same queue using the async side of the channel.
#[cfg(all(feature = "build_with_queue", feature = "async_enabled"))]
pub mod syslog_async_queue
{
    use crate::error::SyRes;
    use crate::sy_sync_queue::{SyCmd, SyslogQueueChanSnd, SyslogQueueChannel, SyslogQueueOneChanRcv};
    use crate::Priority;
    use crate::{formatters::SyslogFormatter, SyslogDestination, QueuedSyslog};
    use crate::a_sync::syslog_trait::AsyncSyslogQueueApi;

    impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> AsyncSyslogQueueApi<F, D> 
    for QueuedSyslog<S, F, D>
    {
        /// Asks the worker thread to connect to the destination and awaits
        /// the result of the operation.
        async 
        fn a_connectlog(&mut self) -> SyRes<()> 
        {
            let (connect_cmd, reply) = SyCmd::form_connectlog();

            self.tasks.q_send(connect_cmd).await?;

            // outer result - channel, inner result - the operation itself
            reply.recv_once().await?
        }

        /// Sets the logmask to filter out the syslog calls. This function behaves 
        /// differently from the one in syslog_sync.rs or syslog_async.rs.
        /// It may return an error if the command could not be queued or the 
        /// answer could not be received from the channel.
        /// This function awaits until the answer of the worker thread is received.
        /// 
        /// See macros [LOG_MASK] and [LOG_UPTO] to generate mask
        ///
        /// # Example
        ///
        /// LOG_MASK!(Priority::LOG_EMERG) | LOG_MASK!(Priority::LOG_ERROR)
        ///
        /// or
        ///
        /// ~(LOG_MASK!(Priority::LOG_INFO))
        /// LOG_UPTO!(Priority::LOG_ERROR) 
        async 
        fn a_setlogmask(&self, logmask: i32) -> SyRes<i32>
        {
            let (logmask_cmd, reply) = SyCmd::form_logmask(logmask);

            self.tasks.q_send(logmask_cmd).await?;

            reply.recv_once().await
        }

        /// Closes connection to the syslog server. The worker thread itself
        /// is not stopped.
        async 
        fn a_closelog(&self) -> SyRes<()>
        {
            let (disconnect_cmd, reply) = SyCmd::form_disconnectlog();

            self.tasks.q_send(disconnect_cmd).await?;

            // outer result - channel, inner result - the operation itself
            reply.recv_once().await?
        }

        /// Similar to libc, syslog() sends data to syslog server, but asynchronously.
        /// An error of the queue is ignored.
        /// 
        /// # Arguments
        ///
        /// * `pri` - a priority [Priority]
        ///
        /// * `fmt` - a program's message to be sent as payload. The message is encoded with the
        ///     [SyslogFormatter] and may be different for different formatters.
        #[inline]
        async 
        fn a_syslog(&self, pri: Priority, fmt: F)
        {
            let _ = self.tasks.q_send(SyCmd::form_syslog(pri, fmt)).await;
        }


        /// Queues a request to reconnect to the syslog server or to re-open 
        /// the file. 
        /// 
        /// # Returns
        /// 
        /// A [Result] is returned as [SyRes].
        /// 
        /// * [Result::Ok] - with empty inner type.
        /// 
        /// * [Result::Err] - an error code and description
        async 
        fn a_reconnect(&self) -> SyRes<()>
        {
            self.tasks.q_send(SyCmd::form_reconnect()).await
        }

        /// Queues a request to change the identity to `ident`.
        async 
        fn a_change_identity(&self, ident: &str) -> SyRes<()> 
        {
            let new_identity = Some(ident.to_string());

            self.tasks.q_send(SyCmd::form_change_ident(new_identity)).await
        }

        /// Updates the inner instance destination i.e path to file
        /// or server address. The type of destination can not be changed.
        /// 
        /// # Arguments 
        /// 
        /// * `new_tap` - a consumed instance of type `D` [SyslogDestination]
        /// 
        /// # Returns 
        /// 
        /// A [SyRes] is returned: either an error of the channel or the result
        /// of the update received from the worker thread.
        async 
        fn a_update_tap_data(&self, new_tap: D) -> SyRes<()>
        {
            let (update_cmd, reply) = SyCmd::form_update_tap(new_tap);

            self.tasks.q_send(update_cmd).await?;

            // outer result - channel, inner result - the operation itself
            reply.recv_once().await?
        }
    }
}
904
905
906
#[cfg(all(feature = "build_with_queue", not(feature = "async_enabled")))]
#[cfg(test)]
mod test_queue_sync
{
    use crate::{formatters::DefaultSyslogFormatter, sync::{crossbeam_queue_adapter::DefaultQueueAdapter}, LogFacility, LogStat, Priority, SyslogApi, QueuedSyslog};

    use super::DefaultLocalSyslogDestination;

    #[cfg(target_family = "unix")]
    use crate::SyslogLocal;

    #[cfg(target_family = "windows")]
    use crate::WindowsEvent;

    /// Opens the queued client, sends two UTF-8 messages printing the time 
    /// spent on queueing each of them and closes the log.
    #[test]
    fn test_queue_single_message()
    {
        #[cfg(target_family = "unix")]
        let syslog_provider = SyslogLocal::new();

        #[cfg(target_family = "windows")]   
        let syslog_provider = WindowsEvent::new(); 

        let log = 
            QueuedSyslog
                ::<DefaultQueueAdapter, DefaultSyslogFormatter, DefaultLocalSyslogDestination>
                ::openlog_with(
                    Some("queue_single_message"), 
                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
                    LogFacility::LOG_DAEMON,
                    syslog_provider
                );

        assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());

        let log = log.unwrap();

        let messages = 
            [
                String::from("test UTF-8 проверка BOM UTF-8"),
                String::from("test UTF-8  きるさお命泉ぶねりよ日子金れっ"),
            ];

        for message in messages
        {
            // only the time spent on pushing to the queue is measured
            let started = std::time::Instant::now();

            log.syslog(Priority::LOG_DEBUG, message.into());

            println!("{:?}", started.elapsed());
        }

        let _ = log.closelog();
    }
}
968
/// Multi-threaded tests for the queue-backed syslog client.
///
/// Compiled only for test builds with both the `build_with_queue` and
/// `async_embedded` features enabled.
#[cfg(all(feature = "build_with_queue", feature = "async_embedded"))]
#[cfg(test)]
mod tests_queue
{
    use crate::{formatters::DefaultSyslogFormatter, sync::DefaultQueueAdapter, LogFacility, LogStat, Priority, QueuedSyslog, SyslogApi};

    use super::DefaultLocalSyslogDestination;

    #[cfg(target_family = "unix")]
    use crate::SyslogLocal;

    #[cfg(target_family = "windows")]
    use crate::WindowsEvent;

    /// Shares one queued syslog instance (local destination) between the main
    /// thread and two worker threads.
    ///
    /// Each worker submits five messages and prints the time spent in every
    /// `syslog` call. Meanwhile the main thread changes the log mask (the
    /// call is expected to return `0xff`), submits one message of its own,
    /// closes the log and finally verifies that `setlogmask` still succeeds
    /// after `closelog`.
    ///
    /// Both workers are joined so that a panic inside a worker fails the test
    /// instead of being silently dropped with a detached thread.
    #[test]
    fn test_multithreading()
    {
        use std::sync::Arc;
        use std::thread;
        use std::time::{Instant, Duration};
        use crate::LOG_MASK;

        // Exactly one of the two bindings below is compiled, depending on the
        // target family.
        #[cfg(target_family = "unix")]
        let syslog_provider =
            SyslogLocal::new();

        #[cfg(target_family = "windows")]
        let syslog_provider =
            WindowsEvent::new();

        // Unwrap directly and report the error on failure, instead of
        // asserting `is_ok() == true` and unwrapping separately.
        let log =
            QueuedSyslog
                ::<DefaultQueueAdapter, DefaultSyslogFormatter, DefaultLocalSyslogDestination>
                ::openlog_with(
                    Some("test5"),
                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID,
                    LogFacility::LOG_DAEMON, syslog_provider
                )
                .unwrap_or_else(|e| panic!("openlog_with failed: {}", e));

        let log = Arc::new(log);
        let c1_log = log.clone();
        let c2_log = log.clone();

        let worker1 = thread::spawn(move|| {
            for i in 0..5
            {
                thread::sleep(Duration::from_nanos(200));
                let now = Instant::now();
                c1_log.syslog(Priority::LOG_DEBUG, format!("a message from thread 1 #{}[]", i).into());
                let elapsed = now.elapsed();
                println!("t1: {:?}", elapsed);
            }
        });

        let worker2 = thread::spawn(move|| {
            for i in 0..5
            {
                thread::sleep(Duration::from_nanos(201));
                let now = Instant::now();
                c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
                let elapsed = now.elapsed();
                println!("t2: {:?}", elapsed);
            }
        });

        // Issued while the workers may still be running.
        // NOTE(review): the returned value is presumably the previous mask -
        // confirm against the `setlogmask` contract.
        let prev_mask =
            log.setlogmask(!LOG_MASK!(Priority::LOG_ERR))
                .unwrap_or_else(|e| panic!("setlogmask failed: {}", e));

        assert_eq!(prev_mask, 0xff, "should be 0xff");

        let now = Instant::now();
        log.syslog(Priority::LOG_DEBUG, String::from("A message from main, сообщение от главнюка").into());
        let elapsed = now.elapsed();
        println!("main: {:?}", elapsed);

        // Surface any panic raised inside the workers.
        worker1.join().expect("worker thread 1 panicked");
        worker2.join().expect("worker thread 2 panicked");

        // NOTE(review): presumably gives the queue consumer time to drain the
        // submitted messages before the log is closed - confirm.
        thread::sleep(Duration::from_secs(2));

        // The result of closing the log is deliberately not checked here.
        let _ = log.closelog();

        thread::sleep(Duration::from_millis(500));

        // Changing the mask must still succeed after `closelog`.
        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));

        assert!(res.is_ok(), "setlogmask after closelog failed: {:?}", res.as_ref().err());
    }

    /// Same scenario as `test_multithreading`, but the destination is a
    /// `SyslogFile` at `/tmp/syslog_rs_test2.log` formatted with
    /// `DefaultSyslogFormatterFile`; the first worker logs with
    /// `Priority::LOG_ALERT`.
    ///
    /// Only built with the `build_ext_file` feature.
    ///
    /// Both workers are joined so that a panic inside a worker fails the test
    /// instead of being silently dropped with a detached thread.
    #[cfg(feature = "build_ext_file")]
    #[test]
    fn test_file_multithreading()
    {
        use std::sync::Arc;
        use std::thread;
        use std::time::{Instant, Duration};
        use crate::formatters::DefaultSyslogFormatterFile;
        use crate::{SyslogFile, LOG_MASK};

        // NOTE(review): hard-coded Unix-style path; confirm whether this test
        // is expected to run on Windows targets as well.
        let log =
            QueuedSyslog
                ::<DefaultQueueAdapter, DefaultSyslogFormatterFile, SyslogFile>
                ::openlog_with(
                    Some("test2"),
                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID,
                    LogFacility::LOG_DAEMON,
                    SyslogFile::new("/tmp/syslog_rs_test2.log")
                )
                .unwrap_or_else(|e| panic!("openlog_with failed: {}", e));

        let log = Arc::new(log);
        let c1_log = log.clone();
        let c2_log = log.clone();

        let worker1 = thread::spawn(move|| {
            for i in 0..5
            {
                thread::sleep(Duration::from_nanos(200));
                let now = Instant::now();
                c1_log.syslog(Priority::LOG_ALERT, format!("a message from thread 1 #{}[]", i).into());
                let elapsed = now.elapsed();
                println!("t1: {:?}", elapsed);
            }
        });

        let worker2 = thread::spawn(move|| {
            for i in 0..5
            {
                thread::sleep(Duration::from_nanos(201));
                let now = Instant::now();
                c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
                let elapsed = now.elapsed();
                println!("t2: {:?}", elapsed);
            }
        });

        // Issued while the workers may still be running.
        // NOTE(review): the returned value is presumably the previous mask -
        // confirm against the `setlogmask` contract.
        let prev_mask =
            log.setlogmask(!LOG_MASK!(Priority::LOG_ERR))
                .unwrap_or_else(|e| panic!("setlogmask failed: {}", e));

        assert_eq!(prev_mask, 0xff, "should be 0xff");

        let now = Instant::now();
        log.syslog(Priority::LOG_DEBUG, String::from("A message from main, きるさお命泉ぶねりよ日子金れっ").into());
        let elapsed = now.elapsed();
        println!("main: {:?}", elapsed);

        // Surface any panic raised inside the workers.
        worker1.join().expect("worker thread 1 panicked");
        worker2.join().expect("worker thread 2 panicked");

        // NOTE(review): presumably gives the queue consumer time to drain the
        // submitted messages before the log is closed - confirm.
        thread::sleep(Duration::from_secs(2));

        // The result of closing the log is deliberately not checked here.
        let _ = log.closelog();

        thread::sleep(Duration::from_millis(500));

        // Changing the mask must still succeed after `closelog`.
        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));

        assert!(res.is_ok(), "setlogmask after closelog failed: {:?}", res.as_ref().err());
    }
}