syslog_rs/sync/
syslog_sync_queue.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *                     
11 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
12 */
13
14
15use std::borrow::Cow;
16use std::marker::PhantomData;
17use std::{fmt, thread};
18use std::sync::{Arc, Mutex, Weak};
19use std::sync::atomic::{AtomicBool, Ordering};
20
21use crate::formatters::{DefaultSyslogFormatter, SyslogFormatter};
22use crate::sync::syslog_sync_internal::SyslogSocketLockless;
23use crate::sync::{LogItems, SyStream};
24use crate::sync::DefaultQueueAdapter;
25
26use crate::{map_error, SyStreamApi, SyncSyslog, SyslogDestination, SyslogLocal};
27use crate::common::*;
28use crate::error::SyRes;
29
30use super::syslog_trait::SyslogApi;
31
32
/// A wrapper for the data commands in the queue.
///
/// Every variant is one command which the worker thread
/// (`SyslogInternal::thread_worker`) takes from the queue and executes.
/// Variants which carry a `loopback` field report their outcome back to the
/// sender through a oneshot channel.
pub enum SyCmd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
{
    /// A message to syslog server
    Syslog
    {
        /// priority of the message
        pri: Priority,
        /// the message payload, wrapped in the formatter type
        msg: F
    },

    /// A request to change logmask
    Logmask
    {
        /// the new logmask
        logmask: i32, 
        /// receives the value returned by `LogItems::set_logmask`
        loopback: S::OneShotChannelSnd<i32>,
    },

    /// A request to change identity
    ChangeIdentity
    {
        /// the new identity; `None` is passed to `LogItems::set_identity` as is
        identity: Option<String>,
    },

    /// Updates the tap settings
    UpdateTap
    {
        /// the new destination description
        tap_type: D,//TapTypeData,
        /// receives the result of the update
        loopback: S::OneShotChannelSnd<SyRes<()>>,
    },

    /// A request to connect the socket. The result of the connect attempt
    /// is sent back over `loopback`.
    ConnectLog
    {
        loopback: S::OneShotChannelSnd<SyRes<()>>
    },

    /// A request to disconnect the socket. The result of the disconnect
    /// attempt is sent back over `loopback`. The worker thread keeps running.
    DisconnectLog
    {
        loopback: S::OneShotChannelSnd<SyRes<()>>
    },

    /// A request to rotate file or reconnect.
    Reconnect,

    /// A request to stop processing and quit
    #[allow(unused)]
    Stop,
}
80
81
82impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyCmd<F, D, S>
83{
84    /// Construct a message with data to send.
85    pub(crate) 
86    fn form_syslog(pri: Priority, msg: F) -> Self
87    {
88        return 
89            Self::Syslog
90            {
91                pri, msg
92            };
93    }
94
95    pub(crate) 
96    fn form_connectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
97    {
98        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
99
100        return 
101            (Self::ConnectLog{ loopback: tx }, rx);
102    }
103
104    pub(crate) 
105    fn form_disconnectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
106    {
107        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
108
109        return 
110            (Self::DisconnectLog{ loopback: tx }, rx);
111    }
112
113    /// Constructs a message to make logmask with or without previous PRI.
114    /// 
115    /// # Arguments
116    /// 
117    /// * `logmask` - a new logmask
118    pub(crate) 
119    fn form_logmask(logmask: i32) -> (Self, S::OneShotChannelRcv<i32>)
120    {
121        let (tx, rx) = S::create_oneshot_channel::<i32>();
122
123        return 
124            (Self::Logmask{ logmask, loopback: tx }, rx);
125    }
126
127    /// Constructs a message which should change the identity (appname) of the
128    /// instance.
129    pub(crate) 
130    fn form_change_ident(identity: Option<String>) -> Self
131    {
132        return 
133            Self::ChangeIdentity
134            {
135                identity: identity
136            };
137    }
138
139    /// Constructs a message which changes the destination of the log messages i.e
140    /// changing path of the dst file or address. The `new_tap_type` 
141    /// should be the same variant [TapTypeData] as previous.
142    pub(crate)
143    fn form_update_tap(new_tap_type: D/*TapTypeData*/) -> (Self, S::OneShotChannelRcv<SyRes<()>>)
144    {
145        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
146
147        return (
148            Self::UpdateTap  
149            { 
150                tap_type: new_tap_type,
151                loopback: tx
152            },
153            rx
154        );
155    }
156
157    /// Constructs a message to handle SIGHUP. This is usefull only when the instance
158    /// is writing directly into the file. Or just reconnect.
159    pub(crate) 
160    fn form_reconnect() -> Self
161    {
162        return Self::Reconnect;
163    }
164
165    /// Constructs a message to stop thread gracefully. After receiving this
166    /// message a thread will quit and all messages that would be sent after
167    /// this message will be cleared from queue and a new messages will not be
168    /// received.
169    #[allow(unused)]
170    pub(crate) 
171    fn form_stop() -> Self
172    {
173        return Self::Stop;
174    }
175}
176
/// Internal struct of the syslog client thread.
///
/// It is moved into the worker thread and consumed by `thread_worker`.
struct SyslogInternal<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
{
    /// An explicit stop flag. The worker leaves its loop when it reads `false`.
    run_flag: Arc<AtomicBool>,

    /// commands channel (receiving side)
    tasks: S::ChannelRcv,

    /// Log config
    log_items: LogItems,

    /// socket
    socket: SyslogSocketLockless<D>,
}
192
193
194
195impl<F, D, S> SyslogInternal<F, D, S>
196where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
197{
198    fn new(log_items: LogItems, socket: SyslogSocketLockless<D>) -> SyRes<(Self, S::ChannelSnd, Weak<AtomicBool>)>
199    {
200        // control flag
201        let run_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
202        let run_control = Arc::downgrade(&run_flag);
203
204        // creating queue for messages
205        let (sender, receiver) = S::create_channel();
206
207        // creating internal syslog struct
208        let mut inst = 
209            SyslogInternal
210            {
211                run_flag: run_flag,
212                tasks: receiver,
213                log_items: log_items,
214                socket: socket
215            };
216
217        if inst.log_items.logstat.contains(LogStat::LOG_NDELAY) == true
218        {
219            inst.socket.connectlog()?;
220        }
221
222        return Ok((inst, sender, run_control));
223    }
224
225    fn thread_worker(mut self)
226    {
227        loop
228        {
229            // self will be dropped as soon as thread will be stopped
230            if self.run_flag.load(Ordering::Relaxed) == false
231            {
232                // force leave
233                break;
234            }	
235
236            match self.tasks.q_recv_blocking()
237			{
238				Some(task) =>
239				{
240                    match task
241                    {
242                        SyCmd::Syslog{ pri, msg } =>
243                        {
244                            let Some(formatted) = self.log_items.vsyslog1_msg::<F, D>(pri, &msg)
245                                else { continue };
246
247                            self.socket.vsyslog1(formatted.1, &formatted.0);
248                        },
249                        SyCmd::Logmask{ logmask, loopback } =>
250                        {
251                            let pri = self.log_items.set_logmask(logmask);
252
253                            let _ = loopback.send_once_blocking(pri);
254                        },
255                        SyCmd::ChangeIdentity{ identity } =>
256                        {
257                            self.log_items.set_identity(identity.as_ref().map(|v| v.as_str()));
258                        },
259
260                        SyCmd::UpdateTap{ tap_type, loopback } =>
261                        {
262                            let res = self.socket.update_tap_data(tap_type);
263                            
264                            if let Err(Err(e)) = loopback.send_once_blocking(res)
265                            {
266                                self.log_items.logstat.send_to_stderr(&[Cow::Owned(e.to_string())]);
267                            }
268                        },
269
270                        SyCmd::ConnectLog{ loopback} => 
271                        {
272                            if let Err(Err(e)) = loopback.send_once_blocking(self.socket.connectlog())
273                            {
274                                self.log_items.logstat.send_to_stderr(&[Cow::Owned(e.to_string())]);
275                            }
276                        },
277
278                        SyCmd::DisconnectLog{ loopback} => 
279                        {
280                            if let Err(Err(e)) = loopback.send_once_blocking(self.socket.disconnectlog())
281                            {
282                                self.log_items.logstat.send_to_stderr(&[Cow::Owned(e.to_string())]);
283                            }
284                        },
285
286                        SyCmd::Reconnect =>
287                        {
288                            if let Err(e) = self.socket.disconnectlog()
289                            {
290                                self.log_items.logstat.send_to_stderr(&[Cow::Owned(e.to_string())]);
291                            }
292                                
293                            if let Err(e) = self.socket.connectlog()
294                            {
295                                self.log_items.logstat.send_to_stderr(&[Cow::Owned(e.to_string())]);
296                            }
297                        },
298                        SyCmd::Stop =>
299                        {
300                            // ignore the rest
301                            break;
302                        }
303                    }
304                },
305                None =>
306                {
307                    break;
308                }
309            } // match
310
311        } // loop
312
313        return;
314    }
315}
316
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a blocking receive interface on the receiver side.
pub trait SyslogQueueChanRcv<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Send
{
    /// Receive from channel in blocking mode.
    ///
    /// The worker thread leaves its loop when this returns `None`.
    fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>;
}
324
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides the blocking send interface and, optionally, an async send interface.
#[allow(async_fn_in_trait)]
pub trait SyslogQueueChanSnd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Clone
{
    /// Sends the message over the channel in blocking mode.
    fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>;

    /// Sends the message over the channel in async mode. By default, it returns an error;
    /// a provider which supports async should override it.
    async fn q_send(&self, _msg: SyCmd<F, D, S>) -> SyRes<()>
    {
        crate::throw_error!("async is not availabe here"); 
    }
}
339
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides the receive interface of a oneshot channel for both sync and async modes.
#[allow(async_fn_in_trait)]
pub trait SyslogQueueOneChanRcv<C>
{
    /// Receives once from the channel in sync mode. Consumes the receiver.
    fn recv_once_blocking(self) -> SyRes<C>;

    /// Receives once from the channel in async mode. By default, it returns an error if not 
    /// implemented.
    async fn recv_once(self) -> SyRes<C> where Self: Sized
    {
        crate::throw_error!("async is not availabe here"); 
    }
}
355
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides the send interface of a oneshot channel for blocking mode.
pub trait SyslogQueueOneChanSnd<C: Send>: fmt::Debug + Send
{
    /// Send to channel in blocking mode. Consumes the sender.
    ///
    /// The `Err` variant carries a value of the same type as `data`.
    /// NOTE(review): presumably this is the undelivered `data` handed back to the
    /// caller - confirm against the implementations.
    fn send_once_blocking(self, data: C) -> Result<(), C>;
}
363
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a common interface which includes everything i.e channel, oneshot 
/// channel and manipulations.
pub trait SyslogQueueChannel<F: SyslogFormatter, D: SyslogDestination>: fmt::Debug + Send + Clone + 'static
{
    /// A name of the adapter.
    /// NOTE(review): its consumers are not visible in this part of the file.
    const ADAPTER_NAME: &'static str;
    
    /// A send side of the channel type.
    type ChannelSnd: SyslogQueueChanSnd<F, D, Self>;
    
    /// A receive side of the channel type.
    type ChannelRcv: SyslogQueueChanRcv<F, D, Self>;

    /// A oneshot send side of the channel type.
    type OneShotChannelSnd<C: Send + fmt::Debug>: SyslogQueueOneChanSnd<C>;

    /// A oneshot receive side of the channel type.
    type OneShotChannelRcv<C>: SyslogQueueOneChanRcv<C>;

    /// Creates unbounded channel.
    fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv);   

    /// Creates oneshot channel.
    fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>);
}
389
390
391
/// A parallel, shared instance of the syslog client which is running in the 
/// separate thread and uses a crossbeam channel to receive the messages from 
/// the program. It is also capable of combining sync and async i.e sync code and
/// async code are writing to the same syslog connection.
/// 
/// For this instance a [SyslogApi] and [SyStreamApi] are implemented.
/// 
/// Also if `async` is enabled, a [AsyncSyslogQueueApi] is implemented.
/// 
/// ```ignore
/// let log = 
///     QueuedSyslog::openlog(
///         Some("test1"), 
///         LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///         LogFacility::LOG_DAEMON,
///         SyslogLocal::new()
///     );
/// ```
/// 
/// ```ignore
/// let log = 
///     QueuedSyslog
///         ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
///         ::openlog_with(
///             Some("test1"), 
///             LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///             LogFacility::LOG_DAEMON,
///             SyslogLocal::new()
///         );
/// ```
/// 
/// ```ignore
/// pub static SYSLOG3: LazyLock<SyncSyslog<DefaultSyslogFormatter, SyslogLocal,>> = 
///     LazyLock::new(|| 
///         {
///             QueuedSyslog
///                 ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
///                 ::openlog_with(
///                     Some("test1"), 
///                     LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///                     LogFacility::LOG_DAEMON,
///                     SyslogLocal::new()
///                 )
///                 .unwrap()
///         }
///     );
/// ```
/// # Generics
/// 
/// * `S` - a [SyslogQueueChannel], a MPSC provider.
/// 
/// * `F` - a [SyslogFormatter] which sets the instance which would 
///     format the message.
/// 
/// * `D` - a [SyslogDestination] instance which is either:
///     [SyslogLocal], [SyslogFile], [SyslogNet], [SyslogTls]. By
///     default a `SyslogLocal` is selected.
#[derive(Debug, Clone)]
pub struct QueuedSyslog<S = DefaultQueueAdapter, F = DefaultSyslogFormatter, D = SyslogLocal>
where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
{   
    /// Control flag. A weak reference to the run flag which is owned by the
    /// worker thread; it is cleared when the instance is dropped.
    run_control: Weak<AtomicBool>,

    /// commands channel (sending side)
    pub(crate) tasks: S::ChannelSnd,//q_adapter::SendChannelSyCmd<D>,

    /// process thread. The join handle is shared between the clones of the
    /// instance and is taken out when the worker is joined.
    thread: Arc<Mutex<Option<thread::JoinHandle<()>>>>,

    /// phantom for [SyslogFormatter]
    _p: PhantomData<F>,

    /// phantom for [SyslogDestination]
    _p2: PhantomData<D>,
}
468
469
// SAFETY: NOTE(review) - this impl promises that the instance can be moved to
// another thread for every `S`, although `SyslogQueueChanSnd` (the bound of
// `S::ChannelSnd`) does not require `Send`. The promise holds only if every
// channel provider supplies a sender which is safe to move between threads -
// confirm this for each adapter or add a `Send` bound to the trait.
unsafe impl<F, D, S> Send for QueuedSyslog<S, F, D> 
where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
{}
473
474
475impl<F, D, S> Drop for QueuedSyslog<S, F, D>
476where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
477{
478    fn drop(&mut self) 
479    {
480        if let Some(ctrl) = self.run_control.upgrade()
481        {   
482            ctrl.store(false, Ordering::SeqCst);
483
484            if let Err(_e) = self.tasks.q_send_blocking(SyCmd::form_stop())
485            {
486
487            }
488
489            let join_handle = self.thread.lock().unwrap().take().unwrap();
490
491            let _ = join_handle.join();
492        }
493    }
494}
495
496impl QueuedSyslog
497{
498    /// Opens a default connection to the local syslog server with default formatter with
499    /// formatter [SyslogFormatter] and destination [SyslogLocal].
500    /// 
501    /// # Arguments
502    /// 
503    /// * `ident` - A program name which will appear on the logs. If none, will be determined
504    ///     automatically.
505    /// 
506    /// * `logstat` - [LogStat] an instance config.
507    /// 
508    /// * `facility` - [LogFacility] a syslog facility.
509    /// 
510    /// * `net_tap_prov` - a [SyslogLocal] instance with configuration.
511    /// 
512    /// # Returns
513    /// 
514    /// A [SyRes] is returned ([Result]) with: 
515    /// 
516    /// * [Result::Ok] - with instance
517    /// 
518    /// * [Result::Err] - with error description.
519    pub 
520    fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: SyslogLocal) -> SyRes<Self>
521    {
522        // creating internal syslog struct
523
524         let log_items = 
525                LogItems::new(ident, 0xff, logstat, facility);
526
527        let stream = 
528            SyslogSocketLockless::<SyslogLocal>::new(logstat, net_tap_prov)?;
529
530        let (inst, sender, run_ctrl) = 
531            SyslogInternal
532                ::<DefaultSyslogFormatter, SyslogLocal, DefaultQueueAdapter>
533                ::new(log_items, stream)?;
534        
535        
536        let thr_name: String = "syslog_queue/0".into();
537
538        // initiate a thread
539        let thread_hnd = 
540            thread::Builder::new()
541                .name(thr_name.clone())
542                .spawn(move || 
543                    SyslogInternal
544                        ::<DefaultSyslogFormatter, SyslogLocal, DefaultQueueAdapter>
545                        ::thread_worker(inst)
546                )
547                .map_err(|e| 
548                    map_error!("{} thread spawn failed. {}", thr_name, e)
549                )?;
550
551        // creating a syslog public struct instance
552        let ret = 
553            Self
554            {
555                run_control: run_ctrl,
556                tasks: sender,
557                thread: Arc::new(Mutex::new(Some(thread_hnd))),
558                _p: PhantomData,
559                _p2: PhantomData
560            };
561
562        return Ok(ret);
563    }
564}
565
566impl<F, D, S> QueuedSyslog<S, F, D>
567where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
568{
569    /// Opens a default connection to the local syslog server with default formatter with
570    /// provided generics.
571    /// 
572    /// # Arguments
573    /// 
574    /// * `ident` - A program name which will appear on the logs. If none, will be determined
575    ///     automatically.
576    /// 
577    /// * `logstat` - [LogStat] an instance config.
578    /// 
579    /// * `facility` - [LogFacility] a syslog facility.
580    /// 
581    /// * `net_tap_prov` - a [SyslogLocal] instance with configuration.
582    /// 
583    /// # Returns
584    /// 
585    /// A [SyRes] is returned ([Result]) with: 
586    /// 
587    /// * [Result::Ok] - with instance
588    /// 
589    /// * [Result::Err] - with error description.
590    pub 
591    fn openlog_with(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: D) -> SyRes<QueuedSyslog<S, F, D>>
592    {
593        // creating internal syslog struct
594
595         let log_items = 
596                LogItems::new(ident, 0xff, logstat, facility);
597
598        let stream = 
599            SyslogSocketLockless::<D>::new(logstat, net_tap_prov)?;
600
601        let (inst, sender, run_ctrl) = 
602            SyslogInternal::<F, D, S>::new(log_items, stream)?;
603        
604        
605        let thr_name: String = "syslog_queue/0".into();
606
607        // initiate a thread
608        let thread_hnd = 
609            thread::Builder::new()
610                .name(thr_name.clone())
611                .spawn(move || SyslogInternal::<F, D, S>::thread_worker(inst))
612                .map_err(|e| 
613                    map_error!("{} thread spawn failed. {}", thr_name, e)
614                )?;
615
616        // creating a syslog public struct instance
617        let ret = 
618            Self
619            {
620                run_control: run_ctrl,
621                tasks: sender,
622                thread: Arc::new(Mutex::new(Some(thread_hnd))),
623                _p: PhantomData::<F>,
624                _p2: PhantomData::<D>,
625            };
626
627        return Ok(ret);
628    }
629}
630
631
632impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogApi<F, D> 
633for QueuedSyslog<S, F, D>
634{
635    fn connectlog(&self) -> SyRes<()>
636    {
637        let (sy_cmd, loopback) = 
638            SyCmd::form_connectlog();
639
640        self.tasks.q_send_blocking(sy_cmd)?;
641
642        return 
643            loopback
644                .recv_once_blocking()?;
645    }
646
647    fn setlogmask(&self, logmask: i32) -> SyRes<i32> 
648    {
649        let (sy_cmd, loopback) = 
650            SyCmd::form_logmask(logmask);
651
652        self.tasks.q_send_blocking(sy_cmd)?;
653
654        return 
655            loopback
656                .recv_once_blocking();
657    }
658
659    fn closelog(&self) -> SyRes<()> 
660    {
661        let (sy_cmd, loopback) = 
662            SyCmd::form_disconnectlog();
663
664        // send stop
665        self.tasks.q_send_blocking(sy_cmd)?;
666
667        return 
668            loopback
669                .recv_once_blocking()?;
670    }
671
672    fn syslog(&self, pri: Priority, fmt: F) 
673    {
674        // even if the thread is in a process of termination, there is
675        // no need to sync access to the run_control field as even if
676        // syslog thread will terminate before someone push something on the
677        // queue, it will be left in the queue until the end of program's time.
678        
679        let sy_cmd = SyCmd::form_syslog(pri, fmt);
680
681        let _ = self.tasks.q_send_blocking(sy_cmd);
682        //tasks.send(sy_cmd);
683
684        return;
685    }
686
687    fn change_identity(&self, ident: Option<&str>) -> SyRes<()>
688    {
689        let sy_cmd = 
690            SyCmd::form_change_ident(ident.map(|v| v.to_string()));
691
692        return 
693            self.tasks.q_send_blocking(sy_cmd);
694    }
695
696    fn reconnect(&self) -> SyRes<()>
697    {
698        return 
699            self.tasks.q_send_blocking(SyCmd::form_reconnect());
700    }
701
702    fn update_tap_data(&self, tap_data: D) -> SyRes<()>
703    {
704        let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
705
706        self.tasks.q_send_blocking(tap_data_cmd)?;
707
708        return 
709            loopback
710                .recv_once_blocking()?;
711    }
712}
713
714
715impl<'stream, F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyStreamApi<'stream, F, D, QueuedSyslog<S, F, D>> 
716for QueuedSyslog<S, F, D>
717{
718    fn stream(&'stream self, pri: Priority) -> SyStream<'stream, D, F, QueuedSyslog<S, F, D>> 
719    {
720        return 
721            SyStream 
722            { 
723                inner: self, 
724                pri: pri, 
725                _p: PhantomData, 
726                _p1: PhantomData 
727            };
728    }
729}
730
/// A queued implementation for ASYNC.
#[cfg(all(feature = "build_with_queue", feature = "async_enabled"))]
pub mod syslog_async_queue
{
    use crate::a_sync::syslog_trait::AsyncSyslogQueueApi;
    use crate::error::SyRes;
    use crate::sy_sync_queue::{SyCmd, SyslogQueueChanSnd, SyslogQueueChannel, SyslogQueueOneChanRcv};
    use crate::{formatters::SyslogFormatter, Priority, QueuedSyslog, SyslogDestination};

    impl<F, D, S> AsyncSyslogQueueApi<F, D> for QueuedSyslog<S, F, D>
    where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
    {
        /// Asks the worker to connect and awaits the result.
        async 
        fn a_connectlog(&mut self) -> SyRes<()> 
        {
            let (cmd, reply) = SyCmd::form_connectlog();

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }

        /// Sets the logmask which filters out the syslog calls. Unlike the
        /// variants in syslog_sync.rs or syslog_async.rs, this one talks to
        /// the worker thread, so it may return an error if the worker has
        /// exited or if something happened to the channel.
        /// This function waits until the answer of the worker is received.
        /// 
        /// See macros [LOG_MASK] and [LOG_UPTO] to generate a mask.
        ///
        /// # Example
        ///
        /// LOG_MASK!(Priority::LOG_EMERG) | LOG_MASK!(Priority::LOG_ERROR)
        ///
        /// or
        ///
        /// ~(LOG_MASK!(Priority::LOG_INFO))
        /// LOG_UPTO!(Priority::LOG_ERROR) 
        async 
        fn a_setlogmask(&self, logmask: i32) -> SyRes<i32>
        {
            let (cmd, reply) = SyCmd::form_logmask(logmask);

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await
        }

        /// Closes connection to the syslog server. The worker thread keeps
        /// running.
        async 
        fn a_closelog(&self) -> SyRes<()>
        {
            let (cmd, reply) = SyCmd::form_disconnectlog();

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }

        /// Similar to libc, syslog() sends data to syslog server, but asynchronously.
        /// A failure to queue the message is ignored.
        /// 
        /// # Arguments
        ///
        /// * `pri` - a priority [Priority]
        ///
        /// * `fmt` - a program's message to be sent as payload. The message is encoded with the
        ///     [SyslogFormatter] and may be different for different formatters.
        #[inline]
        async 
        fn a_syslog(&self, pri: Priority, fmt: F)
        {
            let _ = self.tasks.q_send(SyCmd::form_syslog(pri, fmt)).await;
        }

        /// Queues a request for reconnection to the syslog server or file re-open.
        /// 
        /// # Returns
        /// 
        /// A [Result] is returned as [SyRes].
        /// 
        /// * [Result::Ok] - with empty inner type.
        /// 
        /// * [Result::Err] - an error code and description
        async 
        fn a_reconnect(&self) -> SyRes<()>
        {
            self.tasks.q_send(SyCmd::form_reconnect()).await
        }

        /// Queues a request to change the identity to `ident`.
        async 
        fn a_change_identity(&self, ident: &str) -> SyRes<()> 
        {
            let cmd = SyCmd::form_change_ident(Some(ident.to_string()));

            self.tasks.q_send(cmd).await
        }

        /// Updates the inner instance destination i.e path to file
        /// or server address. The type of destination can not be changed.
        /// 
        /// This function disconnects from syslog server if previously was 
        /// connected (and reconnects if was connected previously).
        /// 
        /// # Arguments 
        /// 
        /// * `new_tap` - a consumed instance of type `D` [SyslogDestination]
        /// 
        /// # Returns 
        /// 
        /// A [SyRes] is returned. An error may be returned if:
        /// 
        /// * connection to server was failed
        /// 
        /// * incorrect type
        /// 
        /// * disconnect from server failed
        async 
        fn a_update_tap_data(&self, new_tap: D) -> SyRes<()>
        {
            let (cmd, reply) = SyCmd::form_update_tap(new_tap);

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }
    }
}
883
884
885
#[cfg(all(feature = "build_with_queue", not(feature = "async_enabled")))]
#[cfg(test)]
mod test_queue_sync
{
    use crate::{formatters::DefaultSyslogFormatter, sync::{crossbeam_queue_adapter::DefaultQueueAdapter}, LogFacility, LogStat, Priority, SyslogApi, SyslogLocal, QueuedSyslog};

    /// Opens a queued instance, pushes two UTF-8 messages through it while
    /// printing how long each push took, and closes the log.
    #[test]
    fn test_queue_single_message()
    {
        let opened = 
            QueuedSyslog
                ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
                ::openlog_with(
                    Some("test1"), 
                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
                    LogFacility::LOG_DAEMON,
                    SyslogLocal::new()
                );

        assert_eq!(opened.is_ok(), true, "{}", opened.err().unwrap());

        let log = opened.unwrap();

        let payloads = 
            [
                "test UTF-8 проверка BOM UTF-8",
                "test UTF-8  きるさお命泉ぶねりよ日子金れっ",
            ];

        for payload in payloads
        {
            // the message is built before the clock starts
            let msg = String::from(payload);
            let started = std::time::Instant::now();

            log.syslog(Priority::LOG_DEBUG, msg.into());

            println!("{:?}", started.elapsed());
        }

        let _ = log.closelog();
    }
}
931
932#[cfg(all(feature = "build_with_queue", feature = "async_embedded"))]
933#[cfg(test)]
934mod tests_queue
935{
936    use crate::{formatters::DefaultSyslogFormatter, sync::DefaultQueueAdapter, LogFacility, LogStat, Priority, QueuedSyslog, SyslogApi, SyslogLocal};
937 
938
    /// Writes to one shared queued instance from two spawned threads and
    /// from the test thread, changes the logmask in between and checks that
    /// the instance still answers after `closelog`.
    #[test]
    fn test_multithreading()
    {
        use std::sync::Arc;
        use std::thread;
        use std::time::{Instant, Duration};
        use crate::LOG_MASK;

        let log = 
            QueuedSyslog
                ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
                ::openlog_with(
                    Some("test5"), 
                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
                    LogFacility::LOG_DAEMON, SyslogLocal::new()
                );

        assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());

        // the instance is shared through `Arc`, so it is dropped only once
        let log = Arc::new(log.unwrap());
        let c1_log = log.clone();
        let c2_log = log.clone();

        // The spawned threads are not joined; the 2 second sleep below is
        // what gives them the time to finish.
        thread::spawn(move|| {
            for i in 0..5
            {
                thread::sleep(Duration::from_nanos(200));
                let now = Instant::now();
                c1_log.syslog(Priority::LOG_DEBUG, format!("a message from thread 1 #{}[]", i).into());
                let elapsed = now.elapsed();
                println!("t1: {:?}", elapsed);
            }
        });

        thread::spawn(move|| {
            for i in 0..5
            {
                thread::sleep(Duration::from_nanos(201));
                let now = Instant::now();
                c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
                let elapsed = now.elapsed();
                println!("t2: {:?}", elapsed);
            }
        });

        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));

        // the instance is created with the mask 0xff; the assertion expects
        // the first `setlogmask` call to answer with that value
        assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
        assert_eq!(res.unwrap(), 0xff, "should be 0xff");

        let now = Instant::now();
        log.syslog(Priority::LOG_DEBUG, format!("A message from main, сообщение от главнюка").into());
        let elapsed = now.elapsed();
        println!("main: {:?}", elapsed);

        thread::sleep(Duration::from_secs(2));

        let _ = log.closelog();

        thread::sleep(Duration::from_millis(500));

        // `closelog` only asks the worker to disconnect, the worker thread
        // keeps running, so a further request must still be answered
        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));

        assert_eq!(res.is_ok(), true);

        return;
    }
1006
1007    #[cfg(feature = "build_ext_file")]
1008    #[test]
1009    fn test_file_multithreading()
1010    {
1011        use std::sync::Arc;
1012        use std::thread;
1013        use std::time::{Instant, Duration};
1014        use crate::formatters::DefaultSyslogFormatterFile;
1015        use crate::{SyslogFile, LOG_MASK};
1016
1017        let log = 
1018            QueuedSyslog
1019                ::<DefaultQueueAdapter, DefaultSyslogFormatterFile, SyslogFile>
1020                ::openlog_with(
1021                    Some("test2"), 
1022                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
1023                    LogFacility::LOG_DAEMON, 
1024                    SyslogFile::new("/tmp/syslog_rs_test2.log")
1025                );
1026                    
1027
1028        assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
1029
1030        let log = Arc::new(log.unwrap());
1031        let c1_log = log.clone();
1032        let c2_log = log.clone();
1033
1034        thread::spawn(move|| {
1035            for i in 0..5
1036            {
1037                thread::sleep(Duration::from_nanos(200));
1038                let now = Instant::now();
1039                c1_log.syslog(Priority::LOG_ALERT, format!("a message from thread 1 #{}[]", i).into());
1040                let elapsed = now.elapsed();
1041                println!("t1: {:?}", elapsed);
1042            }
1043        });
1044
1045        thread::spawn(move|| {
1046            for i in 0..5
1047            {
1048                thread::sleep(Duration::from_nanos(201));
1049                let now = Instant::now();
1050                c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
1051                let elapsed = now.elapsed();
1052                println!("t2: {:?}", elapsed);
1053            }
1054        });
1055
1056        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
1057
1058        assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
1059        assert_eq!(res.unwrap(), 0xff, "should be 0xff");
1060
1061        let now = Instant::now();
1062        log.syslog(Priority::LOG_DEBUG, format!("A message from main, きるさお命泉ぶねりよ日子金れっ").into());
1063        let elapsed = now.elapsed();
1064        println!("main: {:?}", elapsed);
1065
1066        thread::sleep(Duration::from_secs(2));
1067
1068        let _ = log.closelog();
1069
1070        thread::sleep(Duration::from_millis(500));
1071
1072        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
1073
1074        assert_eq!(res.is_ok(), true);
1075
1076        return;
1077    }
1078}