1use std::borrow::Cow;
16use std::marker::PhantomData;
17use std::{fmt, thread};
18use std::sync::{Arc, Mutex, Weak};
19use std::sync::atomic::{AtomicBool, Ordering};
20
21use crate::formatters::{DefaultSyslogFormatter, SyslogFormatter};
22use crate::{map_error, SyslogDestination, SyslogLocal};
23use crate::common::*;
24use crate::error::SyRes;
25
26use super::{syslog_trait::SyslogApi, StreamableSyslogApi, syslog_sync_internal::SyncSyslogInternal, SyslogStream, StreamableSyslog};
27
28
29pub enum SyCmd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
31{
32 Syslog
34 {
35 pri: Priority,
36 msg: F
37 },
38
39 Logmask
41 {
42 logmask: i32,
43 loopback: S::OneShotChannelSnd<i32>,
44 },
45
46 ChangeIdentity
48 {
49 identity: String,
50 },
51
52 UpdateTap
54 {
55 tap_type: D,loopback: S::OneShotChannelSnd<SyRes<()>>,
57 },
58
59 ConnectLog
60 {
61 loopback: S::OneShotChannelSnd<SyRes<()>>
62 },
63
64 DisconnectLog
65 {
66 loopback: S::OneShotChannelSnd<SyRes<()>>
67 },
68
69 Reconnect,
71
72 #[allow(unused)]
74 Stop,
75}
76
77
78impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyCmd<F, D, S>
79{
80 pub(crate)
82 fn form_syslog(pri: Priority, msg: F) -> Self
83 {
84 return
85 Self::Syslog
86 {
87 pri, msg
88 };
89 }
90
91 pub(crate)
92 fn form_connectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
93 {
94 let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
95
96 return
97 (Self::ConnectLog{ loopback: tx }, rx);
98 }
99
100 pub(crate)
101 fn form_disconnectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
102 {
103 let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
104
105 return
106 (Self::DisconnectLog{ loopback: tx }, rx);
107 }
108
109 pub(crate)
115 fn form_logmask(logmask: i32) -> (Self, S::OneShotChannelRcv<i32>)
116 {
117 let (tx, rx) = S::create_oneshot_channel::<i32>();
118
119 return
120 (Self::Logmask{ logmask, loopback: tx }, rx);
121 }
122
123 pub(crate)
126 fn form_change_ident(identity: String) -> Self
127 {
128 return
129 Self::ChangeIdentity
130 {
131 identity: identity
132 };
133 }
134
135 pub(crate)
139 fn form_update_tap(new_tap_type: D) -> (Self, S::OneShotChannelRcv<SyRes<()>>)
140 {
141 let (tx, rx) = S::create_oneshot_channel::<SyRes<()>>();
142
143 return (
144 Self::UpdateTap
145 {
146 tap_type: new_tap_type,
147 loopback: tx
148 },
149 rx
150 );
151 }
152
153 pub(crate)
156 fn form_reconnect() -> Self
157 {
158 return Self::Reconnect;
159 }
160
161 #[allow(unused)]
166 pub(crate)
167 fn form_stop() -> Self
168 {
169 return Self::Stop;
170 }
171}
172
173struct SyslogInternal<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>
175{
176 run_flag: Arc<AtomicBool>,
178
179 tasks: S::ChannelRcv,
181
182 inner: SyncSyslogInternal<F, D>,
184}
185
186
187
188impl<F, D, S> SyslogInternal<F, D, S>
189where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
190{
191 fn new(logstat: LogStat, ssi: SyncSyslogInternal<F, D>) -> SyRes<(Self, S::ChannelSnd, Weak<AtomicBool>)>
192 {
193 let run_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
195 let run_control = Arc::downgrade(&run_flag);
196
197 let (sender, receiver) = S::create_channel();
199
200 let mut inst =
202 SyslogInternal
203 {
204 run_flag: run_flag,
205 tasks: receiver,
206 inner: ssi,
207 };
208
209 if logstat.contains(LogStat::LOG_NDELAY) == true
210 {
211 inst.inner.connectlog()?;
212 }
213
214 return Ok((inst, sender, run_control));
215 }
216
217 fn thread_worker(mut self)
218 {
219 loop
220 {
221 if self.run_flag.load(Ordering::Relaxed) == false
223 {
224 break;
226 }
227
228 match self.tasks.q_recv_blocking()
229 {
230 Some(task) =>
231 {
232 match task
233 {
234 SyCmd::Syslog{ pri, msg } =>
235 {
236 self.inner.vsyslog1(pri, &msg);
237 },
238 SyCmd::Logmask{ logmask, loopback } =>
239 {
240 let pri = self.inner.set_logmask(logmask);
241
242 let _ = loopback.send_once_blocking(pri);
243 },
244 SyCmd::ChangeIdentity{ identity } =>
245 {
246 self.inner.set_logtag(identity, true);
247 },
248
249 SyCmd::UpdateTap{ tap_type, loopback } =>
250 {
251 if self.inner.get_taptype().is_file() == true
252 {
253 let res = self.inner.update_tap_data(tap_type);
254
255 if let Err(Err(e)) = loopback.send_once_blocking(res)
256 {
257 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
258 }
259 }
260 },
261
262 SyCmd::ConnectLog{ loopback} =>
263 {
264 if let Err(Err(e)) = loopback.send_once_blocking(self.inner.connectlog())
265 {
266 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
267 }
268 },
269
270 SyCmd::DisconnectLog{ loopback} =>
271 {
272 if let Err(Err(e)) = loopback.send_once_blocking(self.inner.disconnectlog())
273 {
274 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
275 }
276 },
277
278 SyCmd::Reconnect =>
279 {
280 if let Err(e) = self.inner.disconnectlog()
281 {
282 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
283 }
284
285 if let Err(e) = self.inner.connectlog()
286 {
287 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
288 }
289 },
290 SyCmd::Stop =>
291 {
292 break;
294 }
295 }
296 },
297 None =>
298 {
299 break;
300 }
301 } } return;
306 }
307}
308
309pub trait SyslogQueueChanRcv<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Send
312{
313 fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>;
315}
316
317#[allow(async_fn_in_trait)]
320pub trait SyslogQueueChanSnd<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>>: fmt::Debug + Clone
321{
322 fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>;
324
325 async fn q_send(&self, _msg: SyCmd<F, D, S>) -> SyRes<()>
327 {
328 crate::throw_error!("async is not availabe here");
329 }
330}
331
332#[allow(async_fn_in_trait)]
335pub trait SyslogQueueOneChanRcv<C>
336{
337 fn recv_once_blocking(self) -> SyRes<C>;
339
340 async fn recv_once(self) -> SyRes<C> where Self: Sized
343 {
344 crate::throw_error!("async is not availabe here");
345 }
346}
347
/// Sending end of a one-shot reply channel carrying a value of type `C`.
pub trait SyslogQueueOneChanSnd<C>: fmt::Debug + Send
where
    C: Send,
{
    /// Consumes the sender and delivers `data`. On failure the undelivered
    /// value is handed back as the `Err` payload.
    fn send_once_blocking(self, data: C) -> Result<(), C>;
}
355
356pub trait SyslogQueueChannel<F: SyslogFormatter, D: SyslogDestination>: fmt::Debug + Send + Clone + 'static
360{
361 type ChannelSnd: SyslogQueueChanSnd<F, D, Self>;
363
364 type ChannelRcv: SyslogQueueChanRcv<F, D, Self>;
366
367 type OneShotChannelSnd<C: Send + fmt::Debug>: SyslogQueueOneChanSnd<C>;
369
370 type OneShotChannelRcv<C>: SyslogQueueOneChanRcv<C>;
372
373 fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv);
375
376 fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>);
378}
379
380
381
382#[derive(Debug, Clone)]
401pub struct SyslogQueue<S, F = DefaultSyslogFormatter, D = SyslogLocal>
402where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
403{
404 run_control: Weak<AtomicBool>,
406
407 pub(crate) tasks: S::ChannelSnd,thread: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
412
413 _p: PhantomData<F>,
415
416 _p2: PhantomData<D>,
418}
419
420unsafe impl<F, D, S> Send for SyslogQueue<S, F, D>
421where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
422{}
423
424unsafe impl<F, D, S> Sync for SyslogQueue<S, F, D>
425where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
426{}
427
428impl<F, D, S> Drop for SyslogQueue<S, F, D>
429where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
430{
431 fn drop(&mut self)
432 {
433 if let Some(ctrl) = self.run_control.upgrade()
434 {
435 ctrl.store(false, Ordering::SeqCst);
436
437 if let Err(_e) = self.tasks.q_send_blocking(SyCmd::form_stop())
438 {
439
440 }
441
442 let join_handle = self.thread.lock().unwrap().take().unwrap();
443
444 let _ = join_handle.join();
445 }
446 }
447}
448
449impl<S: SyslogQueueChannel<DefaultSyslogFormatter, SyslogLocal>> SyslogQueue<S>
450{
451
452 pub
453 fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap: SyslogLocal) -> SyRes<Self>
454 {
455 let (inst, sender, run_ctrl) =
458 SyslogInternal
459 ::<DefaultSyslogFormatter, SyslogLocal, S>
460 ::new(
461 logstat,
462 SyncSyslogInternal::<DefaultSyslogFormatter, _>::new(ident, logstat, facility, net_tap)?
463 )?;
464
465
466 let thr_name: String = "syslog_queue/0".into();
467
468 let thread_hnd =
470 thread::Builder::new()
471 .name(thr_name.clone())
472 .spawn(move || SyslogInternal::<DefaultSyslogFormatter, SyslogLocal, S>::thread_worker(inst))
473 .map_err(|e|
474 map_error!("{} thread spawn failed. {}", thr_name, e)
475 )?;
476
477 let ret =
479 Self
480 {
481 run_control: run_ctrl,
482 tasks: sender,
483 thread: Arc::new(Mutex::new(Some(thread_hnd))),
484 _p: PhantomData::<DefaultSyslogFormatter>,
485 _p2: PhantomData::<SyslogLocal>,
486 };
487
488 return Ok(ret);
489 }
490}
491
492impl<F, D, S> SyslogQueue<S, F, D>
493where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
494{
495 pub
496 fn openlog_with(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap: D) -> SyRes<SyslogQueue<S, F, D>>
497 {
498 let (inst, sender, run_ctrl) =
501 SyslogInternal
502 ::<F, D, S>
503 ::new(
504 logstat,
505 SyncSyslogInternal::<F, D>::new(ident, logstat, facility, net_tap)?
506 )?;
507
508
509 let thr_name: String = "syslog_queue/0".into();
510
511 let thread_hnd =
513 thread::Builder::new()
514 .name(thr_name.clone())
515 .spawn(move || SyslogInternal::<F, D, S>::thread_worker(inst))
516 .map_err(|e|
517 map_error!("{} thread spawn failed. {}", thr_name, e)
518 )?;
519
520 let ret =
522 Self
523 {
524 run_control: run_ctrl,
525 tasks: sender,
526 thread: Arc::new(Mutex::new(Some(thread_hnd))),
527 _p: PhantomData::<F>,
528 _p2: PhantomData::<D>,
529 };
530
531 return Ok(ret);
532 }
533}
534
535impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> StreamableSyslogApi<D, F, SyslogQueue<S, F, D>>
536for SyslogQueue<S, F, D>
537{
538 fn make_stream(&self, pri: Priority) -> Box<dyn SyslogStream>
539 {
540 let inst =
541 StreamableSyslog
542 {
543 inner: self.clone(),
544 pri: pri,
545 _p: PhantomData::<F>,
546 _p1: PhantomData::<D>,
547
548 };
549
550 return Box::new(inst);
551 }
552}
553
554impl<F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>> SyslogApi<F, D>
555for SyslogQueue<S, F, D>
556{
557 fn connectlog(&self) -> SyRes<()>
558 {
559 let (sy_cmd, loopback) =
560 SyCmd::form_connectlog();
561
562 self.tasks.q_send_blocking(sy_cmd)?;
563
564 return
565 loopback
566 .recv_once_blocking()?;
567 }
568
569 fn setlogmask(&self, logmask: i32) -> SyRes<i32>
570 {
571 let (sy_cmd, loopback) =
572 SyCmd::form_logmask(logmask);
573
574 self.tasks.q_send_blocking(sy_cmd)?;
575
576 return
577 loopback
578 .recv_once_blocking();
579 }
580
581 fn closelog(&self) -> SyRes<()>
582 {
583 let (sy_cmd, loopback) =
584 SyCmd::form_disconnectlog();
585
586 self.tasks.q_send_blocking(sy_cmd)?;
588
589 return
590 loopback
591 .recv_once_blocking()?;
592 }
593
594 fn syslog(&self, pri: Priority, fmt: F)
595 {
596 let sy_cmd = SyCmd::form_syslog(pri, fmt);
602
603 let _ = self.tasks.q_send_blocking(sy_cmd);
604 return;
607 }
608
609 fn change_identity(&self, ident: &str) -> SyRes<()>
610 {
611 let sy_cmd =
612 SyCmd::form_change_ident(ident.to_string());
613
614 return
615 self.tasks.q_send_blocking(sy_cmd);
616 }
617
618 fn reconnect(&self) -> SyRes<()>
619 {
620 return
621 self.tasks.q_send_blocking(SyCmd::form_reconnect());
622 }
623
624 fn update_tap_data(&self, tap_data: D) -> SyRes<()>
625 {
626 let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
627
628 self.tasks.q_send_blocking(tap_data_cmd)?;
629
630 return
631 loopback
632 .recv_once_blocking()?;
633 }
634}
635
/// Async counterpart of the queue API; compiled only when both the
/// `build_with_queue` and `async_enabled` features are active.
#[cfg(all(feature = "build_with_queue", feature = "async_enabled"))]
pub mod syslog_async_queue {
    use crate::error::SyRes;
    use crate::sy_sync_queue::{SyCmd, SyslogQueueChanSnd, SyslogQueueChannel, SyslogQueueOneChanRcv};
    use crate::Priority;
    use crate::{formatters::SyslogFormatter, SyslogDestination, SyslogQueue};
    use crate::a_sync::syslog_trait::AsyncSyslogQueueApi;

    impl<F, D, S> AsyncSyslogQueueApi<F, D> for SyslogQueue<S, F, D>
    where
        F: SyslogFormatter,
        D: SyslogDestination,
        S: SyslogQueueChannel<F, D>,
    {
        /// Queues a connect request and awaits the worker's result.
        async fn a_connectlog(&mut self) -> SyRes<()> {
            let (cmd, reply) = SyCmd::form_connectlog();

            self.tasks.q_send(cmd).await?;

            // The outer `SyRes` is the channel result, the inner one the worker's answer.
            reply.recv_once().await?
        }

        /// Queues a log-mask change and awaits the `i32` the worker replies with.
        async fn a_setlogmask(&self, logmask: i32) -> SyRes<i32> {
            let (cmd, reply) = SyCmd::form_logmask(logmask);

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await
        }

        /// Queues a disconnect request and awaits the worker's result.
        async fn a_closelog(&self) -> SyRes<()> {
            let (cmd, reply) = SyCmd::form_disconnectlog();

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }

        /// Queues a message for logging. A failure to enqueue is silently ignored.
        #[inline]
        async fn a_syslog(&self, pri: Priority, fmt: F) {
            let _ = self.tasks.q_send(SyCmd::form_syslog(pri, fmt)).await;
        }

        /// Queues a reconnect. Only the enqueueing is reported; no reply
        /// from the worker is awaited.
        async fn a_reconnect(&self) -> SyRes<()> {
            self.tasks.q_send(SyCmd::form_reconnect()).await
        }

        /// Queues an identity change. Only the enqueueing is reported; no
        /// reply from the worker is awaited.
        async fn a_change_identity(&self, ident: &str) -> SyRes<()> {
            let cmd = SyCmd::form_change_ident(ident.to_string());

            self.tasks.q_send(cmd).await
        }

        /// Queues new tap data and awaits the worker's result.
        async fn a_update_tap_data(&self, new_tap: D) -> SyRes<()> {
            let (cmd, reply) = SyCmd::form_update_tap(new_tap);

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }
    }
}
788
789
790