syslog_rs/sync/
syslog_sync_queue.rs

1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 * 
4 * Copyright 2025 Aleksandr Morozov
5 * 
6 * Licensed under the EUPL, Version 1.2 or - as soon they will be approved by
7 * the European Commission - subsequent versions of the EUPL (the "Licence").
8 * 
9 * You may not use this work except in compliance with the Licence.
10 * 
11 * You may obtain a copy of the Licence at:
12 * 
13 *    https://joinup.ec.europa.eu/software/page/eupl
14 * 
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the Licence is distributed on an "AS IS" basis, WITHOUT
17 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 * Licence for the specific language governing permissions and limitations
19 * under the Licence.
20 */
21
22
23use std::borrow::Cow;
24use std::marker::PhantomData;
25use std::thread;
26use std::sync::{Arc, Mutex, Weak};
27use std::sync::atomic::{AtomicBool, Ordering};
28
29use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
30use tokio::sync::oneshot;
31
32
33use crate::formatters::SyslogFormatter;
34use crate::socket::TapTypeData;
35use crate::{map_error, map_error_code, throw_error};
36use crate::common::*;
37use crate::error::SyRes;
38
39
40use super::syslog_sync_internal::SyncSyslogInternal;
41use super::syslog_trait::SyslogApi;
42
43#[derive(Debug)]
44pub struct SyslogQueueAdapter
45{
46    /// A control status check.
47    run_control: Weak<AtomicBool>,
48
49    /// An incoming messages.
50    tx: UnboundedSender<SyCmd>
51}
52
53impl SyslogQueueAdapter
54{
55    fn new(tx: UnboundedSender<SyCmd>, run_control:  Weak<AtomicBool>) -> Self
56    {
57        return Self{ run_control, tx };
58    }
59
60    pub(crate) 
61    fn consume(self) -> SyRes<UnboundedSender<SyCmd>>
62    {
63        if let None = self.run_control.upgrade()
64        {
65            throw_error!("Can not attach queue adapter to async, a queue thread is not running!");
66        }
67
68        return Ok(self.tx);
69    }
70}
71
72/// A wrappes for the data in the queue
73pub(crate) enum SyCmd
74{
75    /// A message to syslog server
76    Syslog
77    {
78        pri: Priority,
79        msg: String
80    },
81
82    /// A reuest to change logmask
83    Logmask
84    {
85        logmask: i32, 
86        loopback: oneshot::Sender<i32>,
87    },
88
89    /// A request to change identity
90    ChangeIdentity
91    {
92        identity: String,
93    },
94
95    /// Updates the tap settings
96    UpdateTap
97    {
98        tap_type: TapTypeData,
99        loopback: oneshot::Sender<SyRes<()>>,
100    },
101
102    ConnectLog
103    {
104        loopback: oneshot::Sender<SyRes<()>>
105    },
106
107    DisconnectLog
108    {
109        loopback: oneshot::Sender<SyRes<()>>
110    },
111
112    /// A request to rotate file or reconnect.
113    Reconnect,
114
115    /// A request to stop processing and quit
116    #[allow(unused)]
117    Stop,
118}
119
120impl SyCmd
121{
122    /// Construct a message with data to send.
123    pub(crate) 
124    fn form_syslog(pri: Priority, msg: String) -> Self
125    {
126        return 
127            Self::Syslog
128            {
129                pri, msg
130            };
131    }
132
133    pub(crate) 
134    fn form_connectlog() -> (Self, oneshot::Receiver<SyRes<()>>)
135    {
136        let (tx, rx) = oneshot::channel::<SyRes<()>>();
137
138        return 
139            (Self::ConnectLog{ loopback: tx }, rx);
140    }
141
142    pub(crate) 
143    fn form_disconnectlog() -> (Self, oneshot::Receiver<SyRes<()>>)
144    {
145        let (tx, rx) = oneshot::channel::<SyRes<()>>();
146
147        return 
148            (Self::DisconnectLog{ loopback: tx }, rx);
149    }
150
151    /// Constructs a message to make logmask with or without previous PRI.
152    /// 
153    /// # Arguments
154    /// 
155    /// * `logmask` - a new logmask
156    pub(crate) 
157    fn form_logmask(logmask: i32) -> (Self, oneshot::Receiver<i32>)
158    {
159        let (tx, rx) = oneshot::channel::<i32>();
160
161        return 
162            (Self::Logmask{ logmask, loopback: tx }, rx);
163    }
164
165    /// Constructs a message which should change the identity (appname) of the
166    /// instance.
167    pub(crate) 
168    fn form_change_ident(identity: String) -> Self
169    {
170        return 
171            Self::ChangeIdentity
172            {
173                identity: identity
174            };
175    }
176
177    /// Constructs a message which changes the destination of the log messages i.e
178    /// changing path of the dst file or address. The `new_tap_type` 
179    /// should be the same variant [TapTypeData] as previous.
180    pub(crate)
181    fn form_update_tap(new_tap_type: TapTypeData) -> (Self, oneshot::Receiver<SyRes<()>>)
182    {
183        let (tx, rx) = oneshot::channel::<SyRes<()>>();
184
185        return (
186            Self::UpdateTap  
187            { 
188                tap_type: new_tap_type,
189                loopback: tx
190            },
191            rx
192        );
193    }
194
195    /// Constructs a message to handle SIGHUP. This is usefull only when the instance
196    /// is writing directly into the file. Or just reconnect.
197    pub(crate) 
198    fn form_reconnect() -> Self
199    {
200        return Self::Reconnect;
201    }
202
203    /// Constructs a message to stop thread gracefully. After receiving this
204    /// message a thread will quit and all messages that would be sent after
205    /// this message will be cleared from queue and a new messages will not be
206    /// received.
207    #[allow(unused)]
208    pub(crate) 
209    fn form_stop() -> Self
210    {
211        return Self::Stop;
212    }
213}
214
215struct SyslogInternal<F: SyslogFormatter>
216{
217    /// A explicit stop flag
218    run_flag: Arc<AtomicBool>,
219
220    /// commands channel
221    tasks: UnboundedReceiver<SyCmd>,
222
223    /// An syslog assets
224    inner: SyncSyslogInternal<F>,
225}
226
227
228
229impl<F: SyslogFormatter> SyslogInternal<F>
230{
231    fn new(logstat: LogStat, ssi: SyncSyslogInternal<F>) -> SyRes<(Self, UnboundedSender<SyCmd>, Weak<AtomicBool>)>
232    {
233        // control flag
234        let run_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
235        let run_control = Arc::downgrade(&run_flag);
236
237        // creating queue for messages
238        let (sender, receiver) = unbounded_channel::<SyCmd>();
239
240        // creating internal syslog struct
241        let mut inst = 
242            SyslogInternal
243            {
244                run_flag: run_flag,
245                tasks: receiver,
246                inner: ssi
247            };
248
249        if logstat.contains(LogStat::LOG_NDELAY) == true
250        {
251            inst.inner.connectlog()?;
252        }
253
254        return Ok((inst, sender, run_control));
255    }
256
257    fn thread_worker(mut self)
258    {
259        loop
260        {
261            // self will be dropped as soon as thread will be stopped
262            if self.run_flag.load(Ordering::Relaxed) == false
263            {
264                // force leave
265                break;
266            }	
267
268            match self.tasks.blocking_recv()
269			{
270				Some(task) =>
271				{
272                    match task
273                    {
274                        SyCmd::Syslog{ pri, msg } =>
275                        {
276                            self.inner.vsyslog1(pri, &msg);
277                        },
278                        SyCmd::Logmask{ logmask, loopback } =>
279                        {
280                            let pri = self.inner.set_logmask(logmask);
281
282                            let _ = loopback.send(pri);
283                        },
284                        SyCmd::ChangeIdentity{ identity } =>
285                        {
286                            self.inner.set_logtag(identity, true);
287                        },
288
289                        SyCmd::UpdateTap{ tap_type, loopback } =>
290                        {
291                            if self.inner.get_tap_type().is_file() == true
292                            {
293                                let res = self.inner.update_tap_data(tap_type);
294                                
295                                if let Err(Err(e)) = loopback.send(res)
296                                {
297                                    self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
298                                }
299                            }
300                        },
301
302                        SyCmd::ConnectLog{ loopback} => 
303                        {
304                            if let Err(Err(e)) = loopback.send(self.inner.connectlog())
305                            {
306                                self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
307                            }
308                        },
309
310                        SyCmd::DisconnectLog{ loopback} => 
311                        {
312                            if let Err(Err(e)) = loopback.send(self.inner.disconnectlog())
313                            {
314                                self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
315                            }
316                        },
317
318                        SyCmd::Reconnect =>
319                        {
320                            if let Err(e) = self.inner.disconnectlog()
321                            {
322                                self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
323                            }
324                                
325                            if let Err(e) = self.inner.connectlog()
326                            {
327                                self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
328                            }
329                        },
330                        SyCmd::Stop =>
331                        {
332                            // ignore the rest
333                            break;
334                        }
335                    }
336                },
337                None =>
338                {
339                    break;
340                }
341            } // match
342
343        } // loop
344
345        return;
346    }
347}
348
349/// A common instance which describes the syslog state
350#[derive(Debug, Clone)]
351pub struct SyslogQueue<F: SyslogFormatter>
352{   
353    /// Control flag
354    run_control: Weak<AtomicBool>,
355
356    /// commands channel
357    tasks: UnboundedSender<SyCmd>,
358
359    /// process thread
360    thread: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
361
362    _p: PhantomData<F>,
363}
364
365unsafe impl<F: SyslogFormatter> Send for SyslogQueue<F> {}
366unsafe impl<F: SyslogFormatter> Sync for SyslogQueue<F> {}
367
368impl<F: SyslogFormatter> Drop for SyslogQueue<F>
369{
370    fn drop(&mut self) 
371    {
372        if let Some(ctrl) = self.run_control.upgrade()
373        {   
374            ctrl.store(false, Ordering::SeqCst);
375
376            if let Err(_e) = self.tasks.send(SyCmd::form_stop())
377            {
378
379            }
380
381            let join_handle = self.thread.lock().unwrap().take().unwrap();
382
383            let _ = join_handle.join();
384        }
385    }
386}
387
388impl<F: SyslogFormatter> SyslogQueue<F>
389{
390    pub 
391    fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap: TapTypeData) -> SyRes<SyslogQueue<F>>
392    {
393        // creating internal syslog struct
394
395        let (inst, sender, run_ctrl) = 
396            SyslogInternal
397                ::<F>
398                ::new(
399                    logstat,
400                    SyncSyslogInternal::<F>::new(ident, logstat, facility, net_tap)?
401                )?;
402        
403        
404        let thr_name: String = "syslog_queue/0".into();
405
406        // initiate a thread
407        let thread_hnd = 
408            thread::Builder::new()
409                .name(thr_name.clone())
410                .spawn(move || SyslogInternal::<F>::thread_worker(inst))
411                .map_err(|e| 
412                    map_error!("{} thread spawn failed. {}", thr_name, e)
413                )?;
414
415        // creating a syslog public struct instance
416        let ret = 
417            Self
418            {
419                run_control: run_ctrl,
420                tasks: sender,
421                thread: Arc::new(Mutex::new(Some(thread_hnd))),
422                _p: PhantomData::<F>,
423            };
424
425        return Ok(ret);
426    }
427}
428
429impl<F: SyslogFormatter> SyslogApi for SyslogQueue<F>
430{
431    fn connectlog(&mut self) -> SyRes<()>
432    {
433        let (sy_cmd, loopback) = 
434            SyCmd::form_connectlog();
435
436        self
437            .tasks
438            .send(sy_cmd)
439            .map_err(|e| map_error_code!(SendError, "connectlog() error: '{}'", e))?;
440
441        return 
442            loopback
443                .blocking_recv()
444                .map_err(|e| 
445                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
446                )?;
447    }
448
449    fn setlogmask(&self, logmask: i32) -> SyRes<i32> 
450    {
451        let (sy_cmd, loopback) = 
452            SyCmd::form_logmask(logmask);
453
454        self
455            .tasks
456            .send(sy_cmd)
457            .map_err(|e| 
458                map_error_code!(SendError, "closelog() error: '{}'", e)
459            )?;
460
461        return 
462            loopback
463                .blocking_recv()
464                .map_err(|e| 
465                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
466                );
467    }
468
469    fn closelog(&self) -> SyRes<()> 
470    {
471        let (sy_cmd, loopback) = 
472            SyCmd::form_disconnectlog();
473
474        // send stop
475        self
476            .tasks
477            .send(sy_cmd)
478            .map_err(|e| 
479                map_error_code!(SendError, "closelog() error: '{}'", e)
480            )?;
481
482        return 
483            loopback
484                .blocking_recv()
485                .map_err(|e| 
486                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
487                )?;
488    }
489
490    fn syslog(&self, pri: Priority, fmt: String) 
491    {
492        // even if the thread is in a process of termination, there is
493        // no need to sync access to the run_control field as even if
494        // syslog thread will terminate before someone push something on the
495        // queue, it will be left in the queue until the end of program's time.
496        
497        let sy_cmd = SyCmd::form_syslog(pri, fmt);
498
499        let _ = self.tasks.send(sy_cmd);
500
501        return;
502    }
503
504    fn vsyslog<S: AsRef<str>>(&self, pri: Priority, fmt: S) 
505    {
506        let sy_cmd = 
507            SyCmd::form_syslog(pri, fmt.as_ref().to_string());
508
509        let _ = self.tasks.send(sy_cmd);
510
511        return;
512    }
513
514    fn change_identity(&self, ident: &str) -> SyRes<()>
515    {
516        let sy_cmd = 
517            SyCmd::form_change_ident(ident.to_string());
518
519        return 
520            self
521                .tasks
522                .send(sy_cmd)
523                .map_err(|e| 
524                    map_error_code!(SendError, "change_identity() error: '{}'", e)
525                );
526    }
527
528    fn reconnect(&self) -> SyRes<()>
529    {
530        return 
531            self
532                .tasks
533                .send(SyCmd::form_reconnect())
534                .map_err(|e| 
535                    map_error_code!(SendError, "signal_rotate_log() error: '{}'", e)
536                );
537    }
538
539    fn update_tap_data(&self, tap_data: TapTypeData) -> SyRes<()>
540    {
541        let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
542
543        self
544            .tasks
545            .send(tap_data_cmd)
546            .map_err(|e| 
547                map_error_code!(SendError, "signal_rotate_log() error: '{}'", e)
548            )?;
549
550        return 
551            loopback
552                .blocking_recv()
553                .map_err(|e| 
554                    map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
555                )?;
556    }
557}
558
559impl<F: SyslogFormatter> SyslogQueue<F>
560{
561
562    pub
563    fn make_adapter(&self) -> SyslogQueueAdapter
564    {
565        return SyslogQueueAdapter::new(self.clone_task_channel(), self.clone_run_ctrl());
566    }
567
568    fn clone_run_ctrl(&self) -> Weak<AtomicBool>
569    {
570        return self.run_control.clone();
571    } 
572
573    fn clone_task_channel(&self) -> UnboundedSender<SyCmd>
574    {
575        return self.tasks.clone();
576    }
577}
578
579