syslog_rs/sync/
syslog_sync_queue.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *                     
11 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
12 */
13
14
15use std::borrow::Cow;
16use std::marker::PhantomData;
17use std::{fmt, thread};
18use std::sync::{Arc, Mutex, Weak};
19use std::sync::atomic::{AtomicBool, Ordering};
20
21use crate::formatters::{DefaultSyslogFormatter, SyslogFormatter};
22use crate::sync::syslog_sync_internal::SyslogSocketLockless;
23use crate::sync::{LogItems, SyStream};
24use crate::sync::DefaultQueueAdapter;
25
26use crate::{map_error, SyStreamApi, SyncSyslog, SyslogDestination, SyslogLocal};
27use crate::common::*;
28use crate::error::SyRes;
29
30use super::syslog_trait::SyslogApi;
31
32
/// A wrapper for the data commands in the queue.
///
/// Each variant is one command taken from the queue by the worker loop
/// (`SyslogInternal::thread_worker`). Variants carrying a `loopback` field
/// report their result back to the caller through a oneshot channel.
pub enum SyCmd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
{
    /// A message to syslog server
    Syslog
    {
        /// Priority of the message.
        pri: Priority,
        /// Message payload, formatted by the worker with formatter `F`.
        msg: F
    },

    /// A request to change logmask
    Logmask
    {
        /// The new logmask.
        logmask: i32, 
        /// Receives the value returned by `LogItems::set_logmask`.
        loopback: S::OneShotChannelSnd<i32>,
    },

    /// A request to change identity
    ChangeIdentity
    {
        /// The new identity; `None` is forwarded as-is to `LogItems::set_identity`.
        identity: Option<String>,
    },

    /// Updates the tap settings
    UpdateTap
    {
        /// The new destination description.
        tap_type: D,//TapTypeData,
        /// Receives the result of `update_tap_data` on the socket.
        loopback: S::OneShotChannelSnd<SyRes<()>>,
    },

    /// A request to connect to the log destination.
    ConnectLog
    {
        /// Receives the result of `connectlog` on the socket.
        loopback: S::OneShotChannelSnd<SyRes<()>>
    },

    /// A request to disconnect from the log destination.
    DisconnectLog
    {
        /// Receives the result of `disconnectlog` on the socket.
        loopback: S::OneShotChannelSnd<SyRes<()>>
    },

    /// A request to rotate file or reconnect.
    Reconnect,

    /// A request to stop processing and quit
    #[allow(unused)]
    Stop,
}
80
81
82impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyCmd<F, D, S>
83{
84    /// Construct a message with data to send.
85    pub(crate) 
86    fn form_syslog(pri: Priority, msg: F) -> Self
87    {
88        return 
89            Self::Syslog
90            {
91                pri, msg
92            };
93    }
94
95    pub(crate) 
96    fn form_connectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
97    {
98        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
99
100        return 
101            (Self::ConnectLog{ loopback: tx }, rx);
102    }
103
104    pub(crate) 
105    fn form_disconnectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
106    {
107        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
108
109        return 
110            (Self::DisconnectLog{ loopback: tx }, rx);
111    }
112
113    /// Constructs a message to make logmask with or without previous PRI.
114    /// 
115    /// # Arguments
116    /// 
117    /// * `logmask` - a new logmask
118    pub(crate) 
119    fn form_logmask(logmask: i32) -> (Self, S::OneShotChannelRcv<i32>)
120    {
121        let (tx, rx) = S::create_oneshot_channel::<i32>();
122
123        return 
124            (Self::Logmask{ logmask, loopback: tx }, rx);
125    }
126
127    /// Constructs a message which should change the identity (appname) of the
128    /// instance.
129    pub(crate) 
130    fn form_change_ident(identity: Option<String>) -> Self
131    {
132        return 
133            Self::ChangeIdentity
134            {
135                identity: identity
136            };
137    }
138
139    /// Constructs a message which changes the destination of the log messages i.e
140    /// changing path of the dst file or address. The `new_tap_type` 
141    /// should be the same variant [TapTypeData] as previous.
142    pub(crate)
143    fn form_update_tap(new_tap_type: D/*TapTypeData*/) -> (Self, S::OneShotChannelRcv<SyRes<()>>)
144    {
145        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
146
147        return (
148            Self::UpdateTap  
149            { 
150                tap_type: new_tap_type,
151                loopback: tx
152            },
153            rx
154        );
155    }
156
157    /// Constructs a message to handle SIGHUP. This is usefull only when the instance
158    /// is writing directly into the file. Or just reconnect.
159    pub(crate) 
160    fn form_reconnect() -> Self
161    {
162        return Self::Reconnect;
163    }
164
165    /// Constructs a message to stop thread gracefully. After receiving this
166    /// message a thread will quit and all messages that would be sent after
167    /// this message will be cleared from queue and a new messages will not be
168    /// received.
169    #[allow(unused)]
170    pub(crate) 
171    fn form_stop() -> Self
172    {
173        return Self::Stop;
174    }
175}
176
/// Internal struct of the syslog client thread.
///
/// An instance is created by `SyslogInternal::new` and then moved into the
/// worker thread, where `thread_worker` consumes it.
struct SyslogInternal<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
{
    /// An explicit stop flag. The worker loop leaves as soon as it reads `false`.
    run_flag: Arc<AtomicBool>,

    /// commands channel (receiving side)
    tasks: S::ChannelRcv,

    /// Log config
    log_items: LogItems,

    /// socket
    socket: SyslogSocketLockless<D>,
}
192
193
194
195impl<F, D, S> SyslogInternal<F, D, S>
196where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
197{
198    fn new(log_items: LogItems, socket: SyslogSocketLockless<D>) -> SyRes<(Self, S::ChannelSnd, Weak<AtomicBool>)>
199    {
200        // control flag
201        let run_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
202        let run_control = Arc::downgrade(&run_flag);
203
204        // creating queue for messages
205        let (sender, receiver) = S::create_channel();
206
207        // creating internal syslog struct
208        let mut inst = 
209            SyslogInternal
210            {
211                run_flag: run_flag,
212                tasks: receiver,
213                log_items: log_items,
214                socket: socket
215            };
216
217        if inst.log_items.logstat.contains(LogStat::LOG_NDELAY) == true
218        {
219            inst.socket.connectlog()?;
220        }
221
222        return Ok((inst, sender, run_control));
223    }
224
225    fn thread_worker(mut self)
226    {
227        loop
228        {
229            // self will be dropped as soon as thread will be stopped
230            if self.run_flag.load(Ordering::Relaxed) == false
231            {
232                // force leave
233                break;
234            }	
235
236            match self.tasks.q_recv_blocking()
237			{
238				Some(task) =>
239				{
240                    match task
241                    {
242                        SyCmd::Syslog{ pri, msg } =>
243                        {
244                            let Some(formatted) = self.log_items.vsyslog1_msg::<F, D>(pri, &msg)
245                                else { continue };
246
247                            self.socket.vsyslog1(formatted.1, formatted.0);
248                        },
249                        SyCmd::Logmask{ logmask, loopback } =>
250                        {
251                            let pri = self.log_items.set_logmask(logmask);
252
253                            let _ = loopback.send_once_blocking(pri);
254                        },
255                        SyCmd::ChangeIdentity{ identity } =>
256                        {
257                            self.log_items.set_identity(identity.as_ref().map(|v| v.as_str()));
258                        },
259
260                        SyCmd::UpdateTap{ tap_type, loopback } =>
261                        {
262                            let res = self.socket.update_tap_data(tap_type);
263                            
264                            if let Err(Err(e)) = loopback.send_once_blocking(res)
265                            {
266                                self.log_items.logstat.send_to_stderr(&e.to_string());
267                            }
268                        },
269
270                        SyCmd::ConnectLog{ loopback} => 
271                        {
272                            if let Err(Err(e)) = loopback.send_once_blocking(self.socket.connectlog())
273                            {
274                                self.log_items.logstat.send_to_stderr(&e.to_string());
275                            }
276                        },
277
278                        SyCmd::DisconnectLog{ loopback} => 
279                        {
280                            if let Err(Err(e)) = loopback.send_once_blocking(self.socket.disconnectlog())
281                            {
282                                self.log_items.logstat.send_to_stderr(&e.to_string());
283                            }
284                        },
285
286                        SyCmd::Reconnect =>
287                        {
288                            if let Err(e) = self.socket.disconnectlog()
289                            {
290                                self.log_items.logstat.send_to_stderr(&e.to_string());
291                            }
292                                
293                            if let Err(e) = self.socket.connectlog()
294                            {
295                                self.log_items.logstat.send_to_stderr(&e.to_string());
296                            }
297                        },
298                        SyCmd::Stop =>
299                        {
300                            // ignore the rest
301                            break;
302                        }
303                    }
304                },
305                None =>
306                {
307                    break;
308                }
309            } // match
310
311        } // loop
312
313        return;
314    }
315}
316
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a blocking receive interface on the receiver side.
pub trait SyslogQueueChanRcv<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Send
{
    /// Receive from channel in blocking mode.
    ///
    /// The worker loop in this file terminates when `None` is returned.
    fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>;
}
324
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides the blocking send interface and, optionally, an async one.
#[allow(async_fn_in_trait)]
pub trait SyslogQueueChanSnd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Clone
{
    /// Sends the message over the channel in blocking mode.
    fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>;

    /// Sends the message over the channel in async mode. By default, it returns an error.
    async fn q_send(&self, _msg: SyCmd<F, D, S>) -> SyRes<()>
    {
        crate::throw_error!("async is not availabe here"); 
    }
}
339
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides the receive interface of a oneshot channel for both sync and async modes.
#[allow(async_fn_in_trait)]
pub trait SyslogQueueOneChanRcv<C>
{
    /// Receives once from the channel in sync mode. Consumes the receiver.
    fn recv_once_blocking(self) -> SyRes<C>;

    /// Receives once from the channel in async mode. By default, it returns an error if not 
    /// implemented.
    async fn recv_once(self) -> SyRes<C> where Self: Sized
    {
        crate::throw_error!("async is not availabe here"); 
    }
}
355
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides the send interface of a oneshot channel for blocking mode.
pub trait SyslogQueueOneChanSnd<C: Send>: fmt::Debug + Send
{
    /// Send to channel in blocking mode. Consumes the sender.
    ///
    /// On failure the `Err` variant carries a value of the payload type `C`;
    /// the worker loop in this file reports an undelivered error payload to
    /// stderr.
    fn send_once_blocking(self, data: C) -> Result<(), C>;
}
363
/// A trait which should be implemented by the channel provider which forms a command queue.
/// This trait provides a common interface which includes everything, i.e. the channel, the oneshot 
/// channel and their constructors.
pub trait SyslogQueueChannel<F: SyslogFormatter, D: SyslogDestination>: fmt::Debug + Send + Clone + 'static
{
    /// A name of the adapter. NOTE(review): not used in this file — presumably
    /// for diagnostics; confirm against the adapters.
    const ADAPTER_NAME: &'static str;
    
    /// A send side of the channel type.
    type ChannelSnd: SyslogQueueChanSnd<F, D, Self>;
    
    /// A receive side of the channel type.
    type ChannelRcv: SyslogQueueChanRcv<F, D, Self>;

    /// A oneshot send side of the channel type.
    type OneShotChannelSnd<C: Send + fmt::Debug>: SyslogQueueOneChanSnd<C>;

    /// A oneshot receive side of the channel type.
    type OneShotChannelRcv<C>: SyslogQueueOneChanRcv<C>;

    /// Creates unbounded channel.
    fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv);   

    /// Creates oneshot channel.
    fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>);
}
389
390
391
/// A parallel, shared instance of the syslog client which is running in a 
/// separate thread and receives the messages from the program over the
/// channel supplied by `S`. It is also capable of combining sync and async, i.e. sync code and
/// async code write to the same syslog connection.
/// 
/// For this instance [SyslogApi] and [SyStreamApi] are implemented.
/// 
/// Also, if `async` is enabled, [AsyncSyslogQueueApi] is implemented.
/// 
/// ```ignore
/// let log = 
///     QueuedSyslog::openlog(
///         Some("test1"), 
///         LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///         LogFacility::LOG_DAEMON,
///         SyslogLocal::new()
///     );
/// ```
/// 
/// ```ignore
/// let log = 
///     QueuedSyslog
///         ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
///         ::openlog_with(
///             Some("test1"), 
///             LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///             LogFacility::LOG_DAEMON,
///             SyslogLocal::new()
///         );
/// ```
/// 
/// ```ignore
/// pub static SYSLOG3: LazyLock<QueuedSyslog<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>> = 
///     LazyLock::new(|| 
///         {
///             QueuedSyslog
///                 ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
///                 ::openlog_with(
///                     Some("test1"), 
///                     LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
///                     LogFacility::LOG_DAEMON,
///                     SyslogLocal::new()
///                 )
///                 .unwrap()
///         }
///     );
/// ```
/// 
/// A stream is available via [SyStreamApi].
/// 
/// ```ignore
/// let _ = write!(SYSLOG.stream(Priority::LOG_DEBUG), "test {} 123 stream test ", d);
/// ```
/// 
/// # Generics
/// 
/// * `S` - a [SyslogQueueChannel], an MPSC provider.
/// 
/// * `F` - a [SyslogFormatter] which sets the instance which would 
///     format the message.
/// 
/// * `D` - a [SyslogDestination] instance which is either:
///     [SyslogLocal], [SyslogFile], [SyslogNet], [SyslogTls]. By
///     default a `SyslogLocal` is selected.
#[derive(Debug, Clone)]
pub struct QueuedSyslog<S = DefaultQueueAdapter, F = DefaultSyslogFormatter, D = SyslogLocal>
where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
{   
    /// Control flag. A weak handle; the strong reference is held by the
    /// worker state (`SyslogInternal::run_flag`).
    run_control: Weak<AtomicBool>,

    /// commands channel (sending side)
    pub(crate) tasks: S::ChannelSnd,//q_adapter::SendChannelSyCmd<D>,

    /// process thread. Shared between clones of this instance; the handle
    /// is taken out and joined in `Drop`.
    thread: Arc<Mutex<Option<thread::JoinHandle<()>>>>,

    /// phantom for [SyslogFormatter]
    _p: PhantomData<F>,

    /// phantom for [SyslogDestination]
    _p2: PhantomData<D>,
}
475
476
// SAFETY / NOTE(review): this asserts that the whole instance may be moved to
// another thread. `S::ChannelSnd` is only bounded by `fmt::Debug + Clone` in
// `SyslogQueueChanSnd`, so the soundness of this impl relies on every channel
// adapter providing a sender that is actually safe to send between threads —
// TODO confirm for each adapter, or add a `Send` bound to the trait.
unsafe impl<F, D, S> Send for QueuedSyslog<S, F, D> 
where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
{}
480
481
482impl<F, D, S> Drop for QueuedSyslog<S, F, D>
483where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
484{
485    fn drop(&mut self) 
486    {
487        if let Some(ctrl) = self.run_control.upgrade()
488        {   
489            ctrl.store(false, Ordering::SeqCst);
490
491            if let Err(_e) = self.tasks.q_send_blocking(SyCmd::form_stop())
492            {
493
494            }
495
496            let join_handle = self.thread.lock().unwrap().take().unwrap();
497
498            let _ = join_handle.join();
499        }
500    }
501}
502
503impl QueuedSyslog
504{
505    /// Opens a default connection to the local syslog server with default formatter with
506    /// formatter [SyslogFormatter] and destination [SyslogLocal].
507    /// 
508    /// # Arguments
509    /// 
510    /// * `ident` - A program name which will appear on the logs. If none, will be determined
511    ///     automatically.
512    /// 
513    /// * `logstat` - [LogStat] an instance config.
514    /// 
515    /// * `facility` - [LogFacility] a syslog facility.
516    /// 
517    /// * `net_tap_prov` - a [SyslogLocal] instance with configuration.
518    /// 
519    /// # Returns
520    /// 
521    /// A [SyRes] is returned ([Result]) with: 
522    /// 
523    /// * [Result::Ok] - with instance
524    /// 
525    /// * [Result::Err] - with error description.
526    pub 
527    fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: SyslogLocal) -> SyRes<Self>
528    {
529        // creating internal syslog struct
530
531         let log_items = 
532                LogItems::new(ident, 0xff, logstat, facility);
533
534        let stream = 
535            SyslogSocketLockless::<SyslogLocal>::new(logstat, net_tap_prov)?;
536
537        let (inst, sender, run_ctrl) = 
538            SyslogInternal
539                ::<DefaultSyslogFormatter, SyslogLocal, DefaultQueueAdapter>
540                ::new(log_items, stream)?;
541        
542        
543        let thr_name: String = "syslog_queue/0".into();
544
545        // initiate a thread
546        let thread_hnd = 
547            thread::Builder::new()
548                .name(thr_name.clone())
549                .spawn(move || 
550                    SyslogInternal
551                        ::<DefaultSyslogFormatter, SyslogLocal, DefaultQueueAdapter>
552                        ::thread_worker(inst)
553                )
554                .map_err(|e| 
555                    map_error!("{} thread spawn failed. {}", thr_name, e)
556                )?;
557
558        // creating a syslog public struct instance
559        let ret = 
560            Self
561            {
562                run_control: run_ctrl,
563                tasks: sender,
564                thread: Arc::new(Mutex::new(Some(thread_hnd))),
565                _p: PhantomData,
566                _p2: PhantomData
567            };
568
569        return Ok(ret);
570    }
571}
572
573impl<F, D, S> QueuedSyslog<S, F, D>
574where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
575{
576    /// Opens a default connection to the local syslog server with default formatter with
577    /// provided generics.
578    /// 
579    /// # Arguments
580    /// 
581    /// * `ident` - A program name which will appear on the logs. If none, will be determined
582    ///     automatically.
583    /// 
584    /// * `logstat` - [LogStat] an instance config.
585    /// 
586    /// * `facility` - [LogFacility] a syslog facility.
587    /// 
588    /// * `net_tap_prov` - a [SyslogLocal] instance with configuration.
589    /// 
590    /// # Returns
591    /// 
592    /// A [SyRes] is returned ([Result]) with: 
593    /// 
594    /// * [Result::Ok] - with instance
595    /// 
596    /// * [Result::Err] - with error description.
597    pub 
598    fn openlog_with(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: D) -> SyRes<QueuedSyslog<S, F, D>>
599    {
600        // creating internal syslog struct
601
602         let log_items = 
603                LogItems::new(ident, 0xff, logstat, facility);
604
605        let stream = 
606            SyslogSocketLockless::<D>::new(logstat, net_tap_prov)?;
607
608        let (inst, sender, run_ctrl) = 
609            SyslogInternal::<F, D, S>::new(log_items, stream)?;
610        
611        
612        let thr_name: String = "syslog_queue/0".into();
613
614        // initiate a thread
615        let thread_hnd = 
616            thread::Builder::new()
617                .name(thr_name.clone())
618                .spawn(move || SyslogInternal::<F, D, S>::thread_worker(inst))
619                .map_err(|e| 
620                    map_error!("{} thread spawn failed. {}", thr_name, e)
621                )?;
622
623        // creating a syslog public struct instance
624        let ret = 
625            Self
626            {
627                run_control: run_ctrl,
628                tasks: sender,
629                thread: Arc::new(Mutex::new(Some(thread_hnd))),
630                _p: PhantomData::<F>,
631                _p2: PhantomData::<D>,
632            };
633
634        return Ok(ret);
635    }
636}
637
638
639impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogApi<F, D> 
640for QueuedSyslog<S, F, D>
641{
642    fn connectlog(&self) -> SyRes<()>
643    {
644        let (sy_cmd, loopback) = 
645            SyCmd::form_connectlog();
646
647        self.tasks.q_send_blocking(sy_cmd)?;
648
649        return 
650            loopback
651                .recv_once_blocking()?;
652    }
653
654    fn setlogmask(&self, logmask: i32) -> SyRes<i32> 
655    {
656        let (sy_cmd, loopback) = 
657            SyCmd::form_logmask(logmask);
658
659        self.tasks.q_send_blocking(sy_cmd)?;
660
661        return 
662            loopback
663                .recv_once_blocking();
664    }
665
666    fn closelog(&self) -> SyRes<()> 
667    {
668        let (sy_cmd, loopback) = 
669            SyCmd::form_disconnectlog();
670
671        // send stop
672        self.tasks.q_send_blocking(sy_cmd)?;
673
674        return 
675            loopback
676                .recv_once_blocking()?;
677    }
678
679    fn syslog(&self, pri: Priority, fmt: F) 
680    {
681        // even if the thread is in a process of termination, there is
682        // no need to sync access to the run_control field as even if
683        // syslog thread will terminate before someone push something on the
684        // queue, it will be left in the queue until the end of program's time.
685        
686        let sy_cmd = SyCmd::form_syslog(pri, fmt);
687
688        let _ = self.tasks.q_send_blocking(sy_cmd);
689        //tasks.send(sy_cmd);
690
691        return;
692    }
693
694    fn change_identity(&self, ident: Option<&str>) -> SyRes<()>
695    {
696        let sy_cmd = 
697            SyCmd::form_change_ident(ident.map(|v| v.to_string()));
698
699        return 
700            self.tasks.q_send_blocking(sy_cmd);
701    }
702
703    fn reconnect(&self) -> SyRes<()>
704    {
705        return 
706            self.tasks.q_send_blocking(SyCmd::form_reconnect());
707    }
708
709    fn update_tap_data(&self, tap_data: D) -> SyRes<()>
710    {
711        let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
712
713        self.tasks.q_send_blocking(tap_data_cmd)?;
714
715        return 
716            loopback
717                .recv_once_blocking()?;
718    }
719}
720
721
722impl<'stream, F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyStreamApi<'stream, F, D, QueuedSyslog<S, F, D>> 
723for QueuedSyslog<S, F, D>
724{
725    fn stream(&'stream self, pri: Priority) -> SyStream<'stream, D, F, QueuedSyslog<S, F, D>> 
726    {
727        return 
728            SyStream 
729            { 
730                inner: self, 
731                pri: pri, 
732                _p: PhantomData, 
733                _p1: PhantomData 
734            };
735    }
736}
737
/// A queued implementation for ASYNC.
#[cfg(all(feature = "build_with_queue", feature = "async_enabled"))]
pub mod syslog_async_queue
{
    use crate::error::SyRes;
    use crate::sy_sync_queue::{SyCmd, SyslogQueueChanSnd, SyslogQueueChannel, SyslogQueueOneChanRcv};
    use crate::Priority;
    use crate::{formatters::SyslogFormatter, SyslogDestination, QueuedSyslog};
    use crate::a_sync::syslog_trait::AsyncSyslogQueueApi;

    impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> AsyncSyslogQueueApi<F, D> 
    for QueuedSyslog<S, F, D>
    {
        /// Queues a connect request and awaits the result reported by the worker.
        async fn a_connectlog(&mut self) -> SyRes<()> 
        {
            let (cmd, reply) = SyCmd::form_connectlog();

            self.tasks.q_send(cmd).await?;

            // outer error: the reply channel; inner result: the operation
            reply.recv_once().await?
        }

        /// Sets the logmask to filter out the syslog calls. This function behaves 
        /// differently from the one in syslog_sync.rs or syslog_async.rs:
        /// it may return an error if the command could not be queued or if
        /// the answer could not be received from the channel.
        /// The call completes when the answer to the request is received.
        /// 
        /// See the macros [LOG_MASK] and [LOG_UPTO] to generate a mask.
        ///
        /// # Example
        ///
        /// LOG_MASK!(Priority::LOG_EMERG) | LOG_MASK!(Priority::LOG_ERROR)
        ///
        /// or
        ///
        /// ~(LOG_MASK!(Priority::LOG_INFO))
        /// LOG_UPTO!(Priority::LOG_ERROR) 
        async fn a_setlogmask(&self, logmask: i32) -> SyRes<i32>
        {
            let (cmd, reply) = SyCmd::form_logmask(logmask);

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await
        }

        /// Closes connection to the syslog server
        async fn a_closelog(&self) -> SyRes<()>
        {
            let (cmd, reply) = SyCmd::form_disconnectlog();

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }

        /// Similar to libc, syslog() sends data to the syslog server, but asynchronously.
        /// A failure to queue the message is silently ignored.
        /// 
        /// # Arguments
        ///
        /// * `pri` - a priority [Priority]
        ///
        /// * `fmt` - a program's message to be sent as payload. The message is encoded with the
        ///     [SyslogFormatter] and may be different for different formatters.
        #[inline]
        async fn a_syslog(&self, pri: Priority, fmt: F)
        {
            let _ = self.tasks.q_send(SyCmd::form_syslog(pri, fmt)).await;
        }

        /// Requests the reconnection to the syslog server or a file re-open.
        /// 
        /// # Returns
        /// 
        /// A [Result] is returned as [SyRes]. It reflects only whether the
        /// request was queued.
        /// 
        /// * [Result::Ok] - with empty inner type.
        /// 
        /// * [Result::Err] - an error code and description
        async fn a_reconnect(&self) -> SyRes<()>
        {
            self.tasks.q_send(SyCmd::form_reconnect()).await
        }

        /// Requests a change of the identity to `ident`. The result reflects
        /// only whether the request was queued.
        async fn a_change_identity(&self, ident: &str) -> SyRes<()> 
        {
            let cmd = SyCmd::form_change_ident(Some(ident.to_owned()));

            self.tasks.q_send(cmd).await
        }

        /// Updates the destination of the inner instance, i.e. the path to
        /// the file or the server address. The type of the destination can
        /// not be changed.
        /// 
        /// # Arguments 
        /// 
        /// * `new_tap` - a consumed instance of type `D` [SyslogDestination]
        /// 
        /// # Returns 
        /// 
        /// A [SyRes] is returned: an error if the request could not be
        /// queued, if the answer could not be received, or the error
        /// reported by the worker for the update itself.
        async fn a_update_tap_data(&self, new_tap: D) -> SyRes<()>
        {
            let (cmd, reply) = SyCmd::form_update_tap(new_tap);

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }
    }
}
890
891
892
#[cfg(all(feature = "build_with_queue", not(feature = "async_enabled")))]
#[cfg(test)]
mod test_queue_sync
{
    use crate::{formatters::DefaultSyslogFormatter, sync::{crossbeam_queue_adapter::DefaultQueueAdapter}, LogFacility, LogStat, Priority, SyslogApi, SyslogLocal, QueuedSyslog};

    /// Opens a queued instance, sends two UTF-8 messages while printing how
    /// long each call took, then closes the log.
    #[test]
    fn test_queue_single_message()
    {
        let opened = 
            QueuedSyslog
                ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
                ::openlog_with(
                    Some("test1"), 
                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
                    LogFacility::LOG_DAEMON,
                    SyslogLocal::new()
                );

        assert_eq!(opened.is_ok(), true, "{}", opened.err().unwrap());

        let log = opened.unwrap();

        // first message
        let first = String::from("test UTF-8 проверка BOM UTF-8");
        let started = std::time::Instant::now();

        log.syslog(Priority::LOG_DEBUG, first.into());

        let took = started.elapsed();
        println!("{:?}", took);

        // second message
        let second = String::from("test UTF-8  きるさお命泉ぶねりよ日子金れっ");
        let started = std::time::Instant::now();

        log.syslog(Priority::LOG_DEBUG, second.into());

        let took = started.elapsed();
        println!("{:?}", took);

        let _ = log.closelog();
    }
}
938
939#[cfg(all(feature = "build_with_queue", feature = "async_embedded"))]
940#[cfg(test)]
941mod tests_queue
942{
943    use crate::{formatters::DefaultSyslogFormatter, sync::DefaultQueueAdapter, LogFacility, LogStat, Priority, QueuedSyslog, SyslogApi, SyslogLocal};
944 
945
946    #[test]
947    fn test_multithreading()
948    {
949        use std::sync::Arc;
950        use std::thread;
951        use std::time::{Instant, Duration};
952        use crate::LOG_MASK;
953
954        let log = 
955            QueuedSyslog
956                ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
957                ::openlog_with(
958                    Some("test5"), 
959                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
960                    LogFacility::LOG_DAEMON, SyslogLocal::new()
961                );
962
963        assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
964
965        let log = Arc::new(log.unwrap());
966        let c1_log = log.clone();
967        let c2_log = log.clone();
968
969        thread::spawn(move|| {
970            for i in 0..5
971            {
972                thread::sleep(Duration::from_nanos(200));
973                let now = Instant::now();
974                c1_log.syslog(Priority::LOG_DEBUG, format!("a message from thread 1 #{}[]", i).into());
975                let elapsed = now.elapsed();
976                println!("t1: {:?}", elapsed);
977            }
978        });
979
980        thread::spawn(move|| {
981            for i in 0..5
982            {
983                thread::sleep(Duration::from_nanos(201));
984                let now = Instant::now();
985                c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
986                let elapsed = now.elapsed();
987                println!("t2: {:?}", elapsed);
988            }
989        });
990
991        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
992
993        assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
994        assert_eq!(res.unwrap(), 0xff, "should be 0xff");
995
996        let now = Instant::now();
997        log.syslog(Priority::LOG_DEBUG, format!("A message from main, сообщение от главнюка").into());
998        let elapsed = now.elapsed();
999        println!("main: {:?}", elapsed);
1000
1001        thread::sleep(Duration::from_secs(2));
1002
1003        let _ = log.closelog();
1004
1005        thread::sleep(Duration::from_millis(500));
1006
1007        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
1008
1009        assert_eq!(res.is_ok(), true);
1010
1011        return;
1012    }
1013
1014    #[cfg(feature = "build_ext_file")]
1015    #[test]
1016    fn test_file_multithreading()
1017    {
1018        use std::sync::Arc;
1019        use std::thread;
1020        use std::time::{Instant, Duration};
1021        use crate::formatters::DefaultSyslogFormatterFile;
1022        use crate::{SyslogFile, LOG_MASK};
1023
1024        let log = 
1025            QueuedSyslog
1026                ::<DefaultQueueAdapter, DefaultSyslogFormatterFile, SyslogFile>
1027                ::openlog_with(
1028                    Some("test2"), 
1029                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
1030                    LogFacility::LOG_DAEMON, 
1031                    SyslogFile::new("/tmp/syslog_rs_test2.log")
1032                );
1033                    
1034
1035        assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
1036
1037        let log = Arc::new(log.unwrap());
1038        let c1_log = log.clone();
1039        let c2_log = log.clone();
1040
1041        thread::spawn(move|| {
1042            for i in 0..5
1043            {
1044                thread::sleep(Duration::from_nanos(200));
1045                let now = Instant::now();
1046                c1_log.syslog(Priority::LOG_ALERT, format!("a message from thread 1 #{}[]", i).into());
1047                let elapsed = now.elapsed();
1048                println!("t1: {:?}", elapsed);
1049            }
1050        });
1051
1052        thread::spawn(move|| {
1053            for i in 0..5
1054            {
1055                thread::sleep(Duration::from_nanos(201));
1056                let now = Instant::now();
1057                c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
1058                let elapsed = now.elapsed();
1059                println!("t2: {:?}", elapsed);
1060            }
1061        });
1062
1063        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
1064
1065        assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
1066        assert_eq!(res.unwrap(), 0xff, "should be 0xff");
1067
1068        let now = Instant::now();
1069        log.syslog(Priority::LOG_DEBUG, format!("A message from main, きるさお命泉ぶねりよ日子金れっ").into());
1070        let elapsed = now.elapsed();
1071        println!("main: {:?}", elapsed);
1072
1073        thread::sleep(Duration::from_secs(2));
1074
1075        let _ = log.closelog();
1076
1077        thread::sleep(Duration::from_millis(500));
1078
1079        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
1080
1081        assert_eq!(res.is_ok(), true);
1082
1083        return;
1084    }
1085}