syslog_rs/sync/
syslog_sync_queue.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 *   2. The MIT License (MIT)
12 *                     
13 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17use std::borrow::Cow;
18use std::marker::PhantomData;
19use std::{fmt, thread};
20use std::sync::{Arc, Mutex, Weak};
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use crate::formatters::{DefaultSyslogFormatter, SyslogFormatter};
24use crate::sync::syslog_sync_internal::SyslogSocketLockless;
25use crate::sync::{LogItems, SyStream};
26use crate::sync::DefaultQueueAdapter;
27
28use crate::{map_error, SyStreamApi, SyncSyslog, SyslogDestination, SyslogLocal};
29use crate::common::*;
30use crate::error::SyRes;
31
32use super::syslog_trait::SyslogApi;
33
34
35/// A wrapper for the data commands in the queue
36pub enum SyCmd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
37{
38    /// A message to syslog server
39    Syslog
40    {
41        pri: Priority,
42        msg: F
43    },
44
45    /// A reuest to change logmask
46    Logmask
47    {
48        logmask: i32, 
49        loopback: S::OneShotChannelSnd<i32>,
50    },
51
52    /// A request to change identity
53    ChangeIdentity
54    {
55        identity: Option<String>,
56    },
57
58    /// Updates the tap settings
59    UpdateTap
60    {
61        tap_type: D,//TapTypeData,
62        loopback: S::OneShotChannelSnd<SyRes<()>>,
63    },
64
65    ConnectLog
66    {
67        loopback: S::OneShotChannelSnd<SyRes<()>>
68    },
69
70    DisconnectLog
71    {
72        loopback: S::OneShotChannelSnd<SyRes<()>>
73    },
74
75    /// A request to rotate file or reconnect.
76    Reconnect,
77
78    /// A request to stop processing and quit
79    #[allow(unused)]
80    Stop,
81}
82
83
84impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyCmd<F, D, S>
85{
86    /// Construct a message with data to send.
87    pub(crate) 
88    fn form_syslog(pri: Priority, msg: F) -> Self
89    {
90        return 
91            Self::Syslog
92            {
93                pri, msg
94            };
95    }
96
97    pub(crate) 
98    fn form_connectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
99    {
100        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
101
102        return 
103            (Self::ConnectLog{ loopback: tx }, rx);
104    }
105
106    pub(crate) 
107    fn form_disconnectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
108    {
109        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
110
111        return 
112            (Self::DisconnectLog{ loopback: tx }, rx);
113    }
114
115    /// Constructs a message to make logmask with or without previous PRI.
116    /// 
117    /// # Arguments
118    /// 
119    /// * `logmask` - a new logmask
120    pub(crate) 
121    fn form_logmask(logmask: i32) -> (Self, S::OneShotChannelRcv<i32>)
122    {
123        let (tx, rx) = S::create_oneshot_channel::<i32>();
124
125        return 
126            (Self::Logmask{ logmask, loopback: tx }, rx);
127    }
128
129    /// Constructs a message which should change the identity (appname) of the
130    /// instance.
131    pub(crate) 
132    fn form_change_ident(identity: Option<String>) -> Self
133    {
134        return 
135            Self::ChangeIdentity
136            {
137                identity: identity
138            };
139    }
140
141    /// Constructs a message which changes the destination of the log messages i.e
142    /// changing path of the dst file or address. The `new_tap_type` 
143    /// should be the same variant [TapTypeData] as previous.
144    pub(crate)
145    fn form_update_tap(new_tap_type: D/*TapTypeData*/) -> (Self, S::OneShotChannelRcv<SyRes<()>>)
146    {
147        let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
148
149        return (
150            Self::UpdateTap  
151            { 
152                tap_type: new_tap_type,
153                loopback: tx
154            },
155            rx
156        );
157    }
158
159    /// Constructs a message to handle SIGHUP. This is usefull only when the instance
160    /// is writing directly into the file. Or just reconnect.
161    pub(crate) 
162    fn form_reconnect() -> Self
163    {
164        return Self::Reconnect;
165    }
166
167    /// Constructs a message to stop thread gracefully. After receiving this
168    /// message a thread will quit and all messages that would be sent after
169    /// this message will be cleared from queue and a new messages will not be
170    /// received.
171    #[allow(unused)]
172    pub(crate) 
173    fn form_stop() -> Self
174    {
175        return Self::Stop;
176    }
177}
178
179/// Internal struct of the syslog client thread.
180struct SyslogInternal<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
181{
182    /// A explicit stop flag
183    run_flag: Arc<AtomicBool>,
184
185    /// commands channel
186    tasks: S::ChannelRcv,
187
188    /// Log config
189    log_items: LogItems,
190
191    /// socket
192    socket: SyslogSocketLockless<D>,
193}
194
195
196
197impl<F, D, S> SyslogInternal<F, D, S>
198where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
199{
200    fn new(log_items: LogItems, socket: SyslogSocketLockless<D>) -> SyRes<(Self, S::ChannelSnd, Weak<AtomicBool>)>
201    {
202        // control flag
203        let run_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
204        let run_control = Arc::downgrade(&run_flag);
205
206        // creating queue for messages
207        let (sender, receiver) = S::create_channel();
208
209        // creating internal syslog struct
210        let mut inst = 
211            SyslogInternal
212            {
213                run_flag: run_flag,
214                tasks: receiver,
215                log_items: log_items,
216                socket: socket
217            };
218
219        if inst.log_items.logstat.contains(LogStat::LOG_NDELAY) == true
220        {
221            inst.socket.connectlog()?;
222        }
223
224        return Ok((inst, sender, run_control));
225    }
226
227    fn thread_worker(mut self)
228    {
229        loop
230        {
231            // self will be dropped as soon as thread will be stopped
232            if self.run_flag.load(Ordering::Relaxed) == false
233            {
234                // force leave
235                break;
236            }	
237
238            match self.tasks.q_recv_blocking()
239			{
240				Some(task) =>
241				{
242                    match task
243                    {
244                        SyCmd::Syslog{ pri, msg } =>
245                        {
246                            let Some(formatted) = self.log_items.vsyslog1_msg::<F, D>(pri, &msg)
247                                else { continue };
248
249                            self.socket.vsyslog1(formatted.1, formatted.0);
250                        },
251                        SyCmd::Logmask{ logmask, loopback } =>
252                        {
253                            let pri = self.log_items.set_logmask(logmask);
254
255                            let _ = loopback.send_once_blocking(pri);
256                        },
257                        SyCmd::ChangeIdentity{ identity } =>
258                        {
259                            self.log_items.set_identity(identity.as_ref().map(|v| v.as_str()));
260                        },
261
262                        SyCmd::UpdateTap{ tap_type, loopback } =>
263                        {
264                            let res = self.socket.update_tap_data(tap_type);
265                            
266                            if let Err(Err(e)) = loopback.send_once_blocking(res)
267                            {
268                                self.log_items.logstat.send_to_stderr(&e.to_string());
269                            }
270                        },
271
272                        SyCmd::ConnectLog{ loopback} => 
273                        {
274                            if let Err(Err(e)) = loopback.send_once_blocking(self.socket.connectlog())
275                            {
276                                self.log_items.logstat.send_to_stderr(&e.to_string());
277                            }
278                        },
279
280                        SyCmd::DisconnectLog{ loopback} => 
281                        {
282                            if let Err(Err(e)) = loopback.send_once_blocking(self.socket.disconnectlog())
283                            {
284                                self.log_items.logstat.send_to_stderr(&e.to_string());
285                            }
286                        },
287
288                        SyCmd::Reconnect =>
289                        {
290                            if let Err(e) = self.socket.disconnectlog()
291                            {
292                                self.log_items.logstat.send_to_stderr(&e.to_string());
293                            }
294                                
295                            if let Err(e) = self.socket.connectlog()
296                            {
297                                self.log_items.logstat.send_to_stderr(&e.to_string());
298                            }
299                        },
300                        SyCmd::Stop =>
301                        {
302                            // ignore the rest
303                            break;
304                        }
305                    }
306                },
307                None =>
308                {
309                    break;
310                }
311            } // match
312
313        } // loop
314
315        return;
316    }
317}
318
319/// A trait which should be implemented by the channel provider which forms a command queue.
320/// This trait provides a blocking receive interface on the receiver side.
321pub trait SyslogQueueChanRcv<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Send
322{
323    /// Receive from channel in blocking mode.
324    fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>;
325}
326
327/// A trait which should be implemented by the channel provider which forms a command queue.
328/// This trait provides both or either the blocking send interface.
329#[allow(async_fn_in_trait)]
330pub trait SyslogQueueChanSnd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Clone
331{
332    /// Sends the message over the channel in blocking mode.
333    fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>;
334
335    /// Sends the message over the channel in async mode. By default, it returns error.
336    async fn q_send(&self, _msg: SyCmd<F, D, S>) -> SyRes<()>
337    {
338        crate::throw_error!("async is not availabe here"); 
339    }
340}
341
342/// A trait which should be implemented by the channel provider which forms a command queue.
343/// This trait provides a Oneshot channel a receive interface for both sync and async modes.
344#[allow(async_fn_in_trait)]
345pub trait SyslogQueueOneChanRcv<C>
346{
347    /// Receives once from the channel in sync mode.
348    fn recv_once_blocking(self) -> SyRes<C>;
349
350    /// Receives once from the channel in async mode. By default, it returns error if not 
351    /// implemented.
352    async fn recv_once(self) -> SyRes<C> where Self: Sized
353    {
354        crate::throw_error!("async is not availabe here"); 
355    }
356}
357
358/// A trait which should be implemented by the channel provider which forms a command queue.
359/// This trait provides a Oneshot channel a send interface for blocking mode.
360pub trait SyslogQueueOneChanSnd<C: Send>: fmt::Debug + Send
361{
362    /// Send to channel in blocing mode.
363    fn send_once_blocking(self, data: C) -> Result<(), C>;
364}
365
366/// A trait which should be implemented by the channel provider which forms a command queue.
367/// This trait provides a common interface which includes everything i.e channel, oneshot 
368/// channel and manipulations.
369pub trait SyslogQueueChannel<F: SyslogFormatter, D: SyslogDestination>: fmt::Debug + Send + Clone + 'static
370{
371    const ADAPTER_NAME: &'static str;
372    
373    /// A send side of the channel type.
374    type ChannelSnd: SyslogQueueChanSnd<F, D, Self>;
375    
376    /// A receive side of the channel type.
377    type ChannelRcv: SyslogQueueChanRcv<F, D, Self>;
378
379    /// A oneshot send side of the channel type.
380    type OneShotChannelSnd<C: Send + fmt::Debug>: SyslogQueueOneChanSnd<C>;
381
382    /// A oneshor receive side of the channel type.
383    type OneShotChannelRcv<C>: SyslogQueueOneChanRcv<C>;
384
385    /// Creates unbounded channel.
386    fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv);   
387
388    /// Creates oneshot channel.
389    fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>);
390}
391
392
393
394/// A parallel, shared instance of the syslog client which is running in the 
395/// separate thread and uses a crossbeam channel to receive the messages from 
396/// the program. It is also capable to combine sync and async i.e sync code and
397/// async code is writing to the same syslog connection.
398/// 
399/// For this isntance a [SyslogApi] and [SyStreamApi] are implemented.
400/// 
401/// Also if `async` is enabled, a [AsyncSyslogQueueApi] is implemented.
402/// 
403/// ```ignore
404/// let log = 
405///     QueuedSyslog::openlog(
406///         Some("test1"), 
407///         LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
408///         LogFacility::LOG_DAEMON,
409///         SyslogLocal::new()
410///     );
411/// ```
412/// 
413/// ```ignore
414/// let log = 
415///     QueuedSyslog
416///         ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
417///         ::openlog_with(
418///             Some("test1"), 
419///             LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
420///             LogFacility::LOG_DAEMON,
421///             SyslogLocal::new()
422///         );
423/// ```
424/// 
425/// ```ignore
426/// pub static SYSLOG3: LazyLock<SyncSyslog<DefaultSyslogFormatter, SyslogLocal,>> = 
427///     LazyLock::new(|| 
428///         {
429///             QueuedSyslog
430///                 ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
431///                 ::openlog_with(
432///                     Some("test1"), 
433///                     LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
434///                     LogFacility::LOG_DAEMON,
435///                     SyslogLocal::new()
436///                 )
437///                 .unwrap()
438///         }
439///     );
440/// ```
441/// 
442/// A stream is availble via [SyStreamApi].
443/// 
444/// ```ignore
445/// let _ = write!(SYSLOG.stream(Priority::LOG_DEBUG), "test {} 123 stream test ", d);
446/// ```
447/// 
448/// # Generics
449/// 
450/// * `S` - a [SyslogQueueChannel] a MPSC provider.
451/// 
452/// * `F` - a [SyslogFormatter] which sets the instance which would 
453///     format the message.
454/// 
455/// * `D` - a [SyslogDestination] instance which is either:
456///     [SyslogLocal], [SyslogFile], [SyslogNet], [SyslogTls]. By
457///     default a `SyslogLocal` is selected.
458#[derive(Debug, Clone)]
459pub struct QueuedSyslog<S = DefaultQueueAdapter, F = DefaultSyslogFormatter, D = SyslogLocal>
460where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
461{   
462    /// Control flag
463    run_control: Weak<AtomicBool>,
464
465    /// commands channel
466    pub(crate) tasks: S::ChannelSnd,//q_adapter::SendChannelSyCmd<D>,
467
468    /// process thread
469    thread: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
470
471    /// phantom for [SyslogFormatter]
472    _p: PhantomData<F>,
473
474    /// phantom for [SyslogDestination]
475    _p2: PhantomData<D>,
476}
477
478
479unsafe impl<F, D, S> Send for QueuedSyslog<S, F, D> 
480where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
481{}
482
483
484impl<F, D, S> Drop for QueuedSyslog<S, F, D>
485where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
486{
487    fn drop(&mut self) 
488    {
489        if let Some(ctrl) = self.run_control.upgrade()
490        {   
491            ctrl.store(false, Ordering::SeqCst);
492
493            if let Err(_e) = self.tasks.q_send_blocking(SyCmd::form_stop())
494            {
495
496            }
497
498            let join_handle = self.thread.lock().unwrap().take().unwrap();
499
500            let _ = join_handle.join();
501        }
502    }
503}
504
505impl QueuedSyslog
506{
507    /// Opens a default connection to the local syslog server with default formatter with
508    /// formatter [SyslogFormatter] and destination [SyslogLocal].
509    /// 
510    /// # Arguments
511    /// 
512    /// * `ident` - A program name which will appear on the logs. If none, will be determined
513    ///     automatically.
514    /// 
515    /// * `logstat` - [LogStat] an instance config.
516    /// 
517    /// * `facility` - [LogFacility] a syslog facility.
518    /// 
519    /// * `net_tap_prov` - a [SyslogLocal] instance with configuration.
520    /// 
521    /// # Returns
522    /// 
523    /// A [SyRes] is returned ([Result]) with: 
524    /// 
525    /// * [Result::Ok] - with instance
526    /// 
527    /// * [Result::Err] - with error description.
528    pub 
529    fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: SyslogLocal) -> SyRes<Self>
530    {
531        // creating internal syslog struct
532
533         let log_items = 
534                LogItems::new(ident, 0xff, logstat, facility);
535
536        let stream = 
537            SyslogSocketLockless::<SyslogLocal>::new(logstat, net_tap_prov)?;
538
539        let (inst, sender, run_ctrl) = 
540            SyslogInternal
541                ::<DefaultSyslogFormatter, SyslogLocal, DefaultQueueAdapter>
542                ::new(log_items, stream)?;
543        
544        
545        let thr_name: String = "syslog_queue/0".into();
546
547        // initiate a thread
548        let thread_hnd = 
549            thread::Builder::new()
550                .name(thr_name.clone())
551                .spawn(move || 
552                    SyslogInternal
553                        ::<DefaultSyslogFormatter, SyslogLocal, DefaultQueueAdapter>
554                        ::thread_worker(inst)
555                )
556                .map_err(|e| 
557                    map_error!("{} thread spawn failed. {}", thr_name, e)
558                )?;
559
560        // creating a syslog public struct instance
561        let ret = 
562            Self
563            {
564                run_control: run_ctrl,
565                tasks: sender,
566                thread: Arc::new(Mutex::new(Some(thread_hnd))),
567                _p: PhantomData,
568                _p2: PhantomData
569            };
570
571        return Ok(ret);
572    }
573}
574
575impl<F, D, S> QueuedSyslog<S, F, D>
576where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
577{
578    /// Opens a default connection to the local syslog server with default formatter with
579    /// provided generics.
580    /// 
581    /// # Arguments
582    /// 
583    /// * `ident` - A program name which will appear on the logs. If none, will be determined
584    ///     automatically.
585    /// 
586    /// * `logstat` - [LogStat] an instance config.
587    /// 
588    /// * `facility` - [LogFacility] a syslog facility.
589    /// 
590    /// * `net_tap_prov` - a [SyslogLocal] instance with configuration.
591    /// 
592    /// # Returns
593    /// 
594    /// A [SyRes] is returned ([Result]) with: 
595    /// 
596    /// * [Result::Ok] - with instance
597    /// 
598    /// * [Result::Err] - with error description.
599    pub 
600    fn openlog_with(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: D) -> SyRes<QueuedSyslog<S, F, D>>
601    {
602        // creating internal syslog struct
603
604         let log_items = 
605                LogItems::new(ident, 0xff, logstat, facility);
606
607        let stream = 
608            SyslogSocketLockless::<D>::new(logstat, net_tap_prov)?;
609
610        let (inst, sender, run_ctrl) = 
611            SyslogInternal::<F, D, S>::new(log_items, stream)?;
612        
613        
614        let thr_name: String = "syslog_queue/0".into();
615
616        // initiate a thread
617        let thread_hnd = 
618            thread::Builder::new()
619                .name(thr_name.clone())
620                .spawn(move || SyslogInternal::<F, D, S>::thread_worker(inst))
621                .map_err(|e| 
622                    map_error!("{} thread spawn failed. {}", thr_name, e)
623                )?;
624
625        // creating a syslog public struct instance
626        let ret = 
627            Self
628            {
629                run_control: run_ctrl,
630                tasks: sender,
631                thread: Arc::new(Mutex::new(Some(thread_hnd))),
632                _p: PhantomData::<F>,
633                _p2: PhantomData::<D>,
634            };
635
636        return Ok(ret);
637    }
638}
639
640
641impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogApi<F, D> 
642for QueuedSyslog<S, F, D>
643{
644    fn connectlog(&self) -> SyRes<()>
645    {
646        let (sy_cmd, loopback) = 
647            SyCmd::form_connectlog();
648
649        self.tasks.q_send_blocking(sy_cmd)?;
650
651        return 
652            loopback
653                .recv_once_blocking()?;
654    }
655
656    fn setlogmask(&self, logmask: i32) -> SyRes<i32> 
657    {
658        let (sy_cmd, loopback) = 
659            SyCmd::form_logmask(logmask);
660
661        self.tasks.q_send_blocking(sy_cmd)?;
662
663        return 
664            loopback
665                .recv_once_blocking();
666    }
667
668    fn closelog(&self) -> SyRes<()> 
669    {
670        let (sy_cmd, loopback) = 
671            SyCmd::form_disconnectlog();
672
673        // send stop
674        self.tasks.q_send_blocking(sy_cmd)?;
675
676        return 
677            loopback
678                .recv_once_blocking()?;
679    }
680
681    fn syslog(&self, pri: Priority, fmt: F) 
682    {
683        // even if the thread is in a process of termination, there is
684        // no need to sync access to the run_control field as even if
685        // syslog thread will terminate before someone push something on the
686        // queue, it will be left in the queue until the end of program's time.
687        
688        let sy_cmd = SyCmd::form_syslog(pri, fmt);
689
690        let _ = self.tasks.q_send_blocking(sy_cmd);
691        //tasks.send(sy_cmd);
692
693        return;
694    }
695
696    fn change_identity(&self, ident: Option<&str>) -> SyRes<()>
697    {
698        let sy_cmd = 
699            SyCmd::form_change_ident(ident.map(|v| v.to_string()));
700
701        return 
702            self.tasks.q_send_blocking(sy_cmd);
703    }
704
705    fn reconnect(&self) -> SyRes<()>
706    {
707        return 
708            self.tasks.q_send_blocking(SyCmd::form_reconnect());
709    }
710
711    fn update_tap_data(&self, tap_data: D) -> SyRes<()>
712    {
713        let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
714
715        self.tasks.q_send_blocking(tap_data_cmd)?;
716
717        return 
718            loopback
719                .recv_once_blocking()?;
720    }
721}
722
723
724impl<'stream, F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyStreamApi<'stream, F, D, QueuedSyslog<S, F, D>> 
725for QueuedSyslog<S, F, D>
726{
727    fn stream(&'stream self, pri: Priority) -> SyStream<'stream, D, F, QueuedSyslog<S, F, D>> 
728    {
729        return 
730            SyStream 
731            { 
732                inner: self, 
733                pri: pri, 
734                _p: PhantomData, 
735                _p1: PhantomData 
736            };
737    }
738}
739
740/// A queued implementation for ASYNC.
741#[cfg(all(feature = "build_with_queue", feature = "async_enabled"))]
742pub mod syslog_async_queue
743{
744    use crate::error::SyRes;
745    use crate::sy_sync_queue::{SyCmd, SyslogQueueChanSnd, SyslogQueueChannel, SyslogQueueOneChanRcv};
746    use crate::Priority;
747    use crate::{formatters::SyslogFormatter, SyslogDestination, QueuedSyslog};
748    use crate::a_sync::syslog_trait::AsyncSyslogQueueApi;
749
750    impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> AsyncSyslogQueueApi<F, D> 
751    for QueuedSyslog<S, F, D>
752    {
753        async 
754        fn a_connectlog(&mut self) -> SyRes<()> 
755        {
756            let (sy_cmd, loopback) = 
757                SyCmd::form_connectlog();
758
759            self.tasks.q_send(sy_cmd).await?;
760
761            return 
762                loopback
763                    .recv_once()
764                    .await?;
765        }
766
767        /// Sets the logmask to filter out the syslog calls. This function behaves 
768        /// differently as it behaves in syslog_sync.rs or syslog_async.rs.
769        /// It may return an error if: syslog thread had exit and some thread calls
770        /// this function. Or something happened with channel. 
771        /// This function blocks until the previous mask is received.
772        /// 
773        /// See macroses [LOG_MASK] and [LOG_UPTO] to generate mask
774        ///
775        /// # Example
776        ///
777        /// LOG_MASK!(Priority::LOG_EMERG) | LOG_MASK!(Priority::LOG_ERROR)
778        ///
779        /// or
780        ///
781        /// ~(LOG_MASK!(Priority::LOG_INFO))
782        /// LOG_UPTO!(Priority::LOG_ERROR) 
783        async 
784        fn a_setlogmask(&self, logmask: i32) -> SyRes<i32>
785        {
786            let (sy_cmd, loopback) = 
787                SyCmd::form_logmask(logmask);
788
789            self.tasks.q_send(sy_cmd).await?;
790
791            return 
792                loopback
793                    .recv_once()
794                    .await;
795        }
796
797        /// Closes connection to the syslog server
798        async 
799        fn a_closelog(&self) -> SyRes<()>
800        {
801            let (sy_cmd, loopback) = 
802                SyCmd::form_disconnectlog();
803
804            // send stop
805            self.tasks.q_send(sy_cmd).await?;
806
807            return 
808                loopback
809                    .recv_once()
810                    .await?;
811        }
812
813        /// Similar to libc, syslog() sends data to syslog server, but asynchroniously.
814        /// 
815        /// # Arguments
816        ///
817        /// * `pri` - a priority [Priority]
818        ///
819        /// * `fmt` - a program's message to be sent as payload. The message is encoded with the
820        ///     [SyslogFormatter] and may be different for different formatters.
821        #[inline]
822        async 
823        fn a_syslog(&self, pri: Priority, fmt: F)
824        {
825            let sy_cmd = SyCmd::form_syslog(pri, fmt);
826
827            let _ = self.tasks.q_send(sy_cmd).await;
828
829            return;
830        }
831
832
833        /// Performs the reconnection to the syslog server or file re-open.
834        /// 
835        /// # Returns
836        /// 
837        /// A [Result] is retured as [SyRes].
838        /// 
839        /// * [Result::Ok] - with empty inner type.
840        /// 
841        /// * [Result::Err] - an error code and description
842        async 
843        fn a_reconnect(&self) -> SyRes<()>
844        {
845            return 
846                self.tasks.q_send(SyCmd::form_reconnect()).await;
847        }
848
849        async 
850        fn a_change_identity(&self, ident: &str) -> SyRes<()> 
851        {
852            let sy_cmd = 
853                SyCmd::form_change_ident(Some(ident.to_string()));
854
855            return 
856                self.tasks.q_send(sy_cmd).await;
857        }
858
859        /// Updates the inner instance destionation i.e path to file
860        /// or server address. The type of destination can not be changed.
861        /// 
862        /// This function disconnects from syslog server if previously was 
863        /// connected (and reconnects if was connected previously).
864        /// 
865        /// # Arguments 
866        /// 
867        /// * `new_tap` - a consumed instance of type `D` [SyslogDestination]
868        /// 
869        /// # Returns 
870        /// 
871        /// A [SyRes] is returned. An error may be returned if:
872        /// 
873        /// * connection to server was failed
874        /// 
875        /// * incorrect type
876        /// 
877        /// * disconnect frm server failed
878        async 
879        fn a_update_tap_data(&self, new_tap: D) -> SyRes<()>
880        {
881            let (tap_data_cmd, loopback) = SyCmd::form_update_tap(new_tap);
882
883            self.tasks.q_send(tap_data_cmd).await?;
884
885            return 
886                loopback
887                    .recv_once()
888                    .await?;
889        }
890    }
891}
892
893
894
895#[cfg(all(feature = "build_with_queue", not(feature = "async_enabled")))]
896#[cfg(test)]
897mod test_queue_sync
898{
899    use crate::{formatters::DefaultSyslogFormatter, sync::{crossbeam_queue_adapter::DefaultQueueAdapter}, LogFacility, LogStat, Priority, SyslogApi, SyslogLocal, QueuedSyslog};
900
901    #[test]
902    fn test_queue_single_message()
903    {
904
905        let log = 
906                QueuedSyslog
907                    ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
908                    ::openlog_with(
909                    Some("test1"), 
910                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
911                    LogFacility::LOG_DAEMON,
912                SyslogLocal::new());
913
914        assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
915
916        let log = log.unwrap();
917
918        let msg1 = format!("test UTF-8 проверка BOM UTF-8");
919        let now = std::time::Instant::now();
920
921        log.syslog(Priority::LOG_DEBUG, msg1.into());
922
923        let dur = now.elapsed();
924        println!("{:?}", dur);
925
926        let msg2 = format!("test UTF-8  きるさお命泉ぶねりよ日子金れっ");
927
928        let now = std::time::Instant::now();
929
930        log.syslog(Priority::LOG_DEBUG, msg2.into());
931
932        let dur = now.elapsed();
933        println!("{:?}", dur);
934
935        let _ = log.closelog();
936
937        return;
938    }
939}
940
941#[cfg(all(feature = "build_with_queue", feature = "async_embedded"))]
942#[cfg(test)]
943mod tests_queue
944{
945    use crate::{formatters::DefaultSyslogFormatter, sync::DefaultQueueAdapter, LogFacility, LogStat, Priority, QueuedSyslog, SyslogApi, SyslogLocal};
946 
947
948    #[test]
949    fn test_multithreading()
950    {
951        use std::sync::Arc;
952        use std::thread;
953        use std::time::{Instant, Duration};
954        use crate::LOG_MASK;
955
956        let log = 
957            QueuedSyslog
958                ::<DefaultQueueAdapter, DefaultSyslogFormatter, SyslogLocal>
959                ::openlog_with(
960                    Some("test5"), 
961                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
962                    LogFacility::LOG_DAEMON, SyslogLocal::new()
963                );
964
965        assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
966
967        let log = Arc::new(log.unwrap());
968        let c1_log = log.clone();
969        let c2_log = log.clone();
970
971        thread::spawn(move|| {
972            for i in 0..5
973            {
974                thread::sleep(Duration::from_nanos(200));
975                let now = Instant::now();
976                c1_log.syslog(Priority::LOG_DEBUG, format!("a message from thread 1 #{}[]", i).into());
977                let elapsed = now.elapsed();
978                println!("t1: {:?}", elapsed);
979            }
980        });
981
982        thread::spawn(move|| {
983            for i in 0..5
984            {
985                thread::sleep(Duration::from_nanos(201));
986                let now = Instant::now();
987                c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
988                let elapsed = now.elapsed();
989                println!("t2: {:?}", elapsed);
990            }
991        });
992
993        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
994
995        assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
996        assert_eq!(res.unwrap(), 0xff, "should be 0xff");
997
998        let now = Instant::now();
999        log.syslog(Priority::LOG_DEBUG, format!("A message from main, сообщение от главнюка").into());
1000        let elapsed = now.elapsed();
1001        println!("main: {:?}", elapsed);
1002
1003        thread::sleep(Duration::from_secs(2));
1004
1005        let _ = log.closelog();
1006
1007        thread::sleep(Duration::from_millis(500));
1008
1009        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
1010
1011        assert_eq!(res.is_ok(), true);
1012
1013        return;
1014    }
1015
1016    #[cfg(feature = "build_ext_file")]
1017    #[test]
1018    fn test_file_multithreading()
1019    {
1020        use std::sync::Arc;
1021        use std::thread;
1022        use std::time::{Instant, Duration};
1023        use crate::formatters::DefaultSyslogFormatterFile;
1024        use crate::{SyslogFile, LOG_MASK};
1025
1026        let log = 
1027            QueuedSyslog
1028                ::<DefaultQueueAdapter, DefaultSyslogFormatterFile, SyslogFile>
1029                ::openlog_with(
1030                    Some("test2"), 
1031                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID, 
1032                    LogFacility::LOG_DAEMON, 
1033                    SyslogFile::new("/tmp/syslog_rs_test2.log")
1034                );
1035                    
1036
1037        assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
1038
1039        let log = Arc::new(log.unwrap());
1040        let c1_log = log.clone();
1041        let c2_log = log.clone();
1042
1043        thread::spawn(move|| {
1044            for i in 0..5
1045            {
1046                thread::sleep(Duration::from_nanos(200));
1047                let now = Instant::now();
1048                c1_log.syslog(Priority::LOG_ALERT, format!("a message from thread 1 #{}[]", i).into());
1049                let elapsed = now.elapsed();
1050                println!("t1: {:?}", elapsed);
1051            }
1052        });
1053
1054        thread::spawn(move|| {
1055            for i in 0..5
1056            {
1057                thread::sleep(Duration::from_nanos(201));
1058                let now = Instant::now();
1059                c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
1060                let elapsed = now.elapsed();
1061                println!("t2: {:?}", elapsed);
1062            }
1063        });
1064
1065        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
1066
1067        assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
1068        assert_eq!(res.unwrap(), 0xff, "should be 0xff");
1069
1070        let now = Instant::now();
1071        log.syslog(Priority::LOG_DEBUG, format!("A message from main, きるさお命泉ぶねりよ日子金れっ").into());
1072        let elapsed = now.elapsed();
1073        println!("main: {:?}", elapsed);
1074
1075        thread::sleep(Duration::from_secs(2));
1076
1077        let _ = log.closelog();
1078
1079        thread::sleep(Duration::from_millis(500));
1080
1081        let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
1082
1083        assert_eq!(res.is_ok(), true);
1084
1085        return;
1086    }
1087}