1use std::borrow::Cow;
24use std::marker::PhantomData;
25use std::thread;
26use std::sync::{Arc, Mutex, Weak};
27use std::sync::atomic::{AtomicBool, Ordering};
28
29use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
30use tokio::sync::oneshot;
31
32
33use crate::formatters::SyslogFormatter;
34use crate::socket::TapTypeData;
35use crate::{map_error, map_error_code, throw_error};
36use crate::common::*;
37use crate::error::SyRes;
38
39
40use super::syslog_sync_internal::SyncSyslogInternal;
41use super::syslog_trait::SyslogApi;
42
43#[derive(Debug)]
44pub struct SyslogQueueAdapter
45{
46 run_control: Weak<AtomicBool>,
48
49 tx: UnboundedSender<SyCmd>
51}
52
53impl SyslogQueueAdapter
54{
55 fn new(tx: UnboundedSender<SyCmd>, run_control: Weak<AtomicBool>) -> Self
56 {
57 return Self{ run_control, tx };
58 }
59
60 pub(crate)
61 fn consume(self) -> SyRes<UnboundedSender<SyCmd>>
62 {
63 if let None = self.run_control.upgrade()
64 {
65 throw_error!("Can not attach queue adapter to async, a queue thread is not running!");
66 }
67
68 return Ok(self.tx);
69 }
70}
71
72pub(crate) enum SyCmd
74{
75 Syslog
77 {
78 pri: Priority,
79 msg: String
80 },
81
82 Logmask
84 {
85 logmask: i32,
86 loopback: oneshot::Sender<i32>,
87 },
88
89 ChangeIdentity
91 {
92 identity: String,
93 },
94
95 UpdateTap
97 {
98 tap_type: TapTypeData,
99 loopback: oneshot::Sender<SyRes<()>>,
100 },
101
102 ConnectLog
103 {
104 loopback: oneshot::Sender<SyRes<()>>
105 },
106
107 DisconnectLog
108 {
109 loopback: oneshot::Sender<SyRes<()>>
110 },
111
112 Reconnect,
114
115 #[allow(unused)]
117 Stop,
118}
119
120impl SyCmd
121{
122 pub(crate)
124 fn form_syslog(pri: Priority, msg: String) -> Self
125 {
126 return
127 Self::Syslog
128 {
129 pri, msg
130 };
131 }
132
133 pub(crate)
134 fn form_connectlog() -> (Self, oneshot::Receiver<SyRes<()>>)
135 {
136 let (tx, rx) = oneshot::channel::<SyRes<()>>();
137
138 return
139 (Self::ConnectLog{ loopback: tx }, rx);
140 }
141
142 pub(crate)
143 fn form_disconnectlog() -> (Self, oneshot::Receiver<SyRes<()>>)
144 {
145 let (tx, rx) = oneshot::channel::<SyRes<()>>();
146
147 return
148 (Self::DisconnectLog{ loopback: tx }, rx);
149 }
150
151 pub(crate)
157 fn form_logmask(logmask: i32) -> (Self, oneshot::Receiver<i32>)
158 {
159 let (tx, rx) = oneshot::channel::<i32>();
160
161 return
162 (Self::Logmask{ logmask, loopback: tx }, rx);
163 }
164
165 pub(crate)
168 fn form_change_ident(identity: String) -> Self
169 {
170 return
171 Self::ChangeIdentity
172 {
173 identity: identity
174 };
175 }
176
177 pub(crate)
181 fn form_update_tap(new_tap_type: TapTypeData) -> (Self, oneshot::Receiver<SyRes<()>>)
182 {
183 let (tx, rx) = oneshot::channel::<SyRes<()>>();
184
185 return (
186 Self::UpdateTap
187 {
188 tap_type: new_tap_type,
189 loopback: tx
190 },
191 rx
192 );
193 }
194
195 pub(crate)
198 fn form_reconnect() -> Self
199 {
200 return Self::Reconnect;
201 }
202
203 #[allow(unused)]
208 pub(crate)
209 fn form_stop() -> Self
210 {
211 return Self::Stop;
212 }
213}
214
215struct SyslogInternal<F: SyslogFormatter>
216{
217 run_flag: Arc<AtomicBool>,
219
220 tasks: UnboundedReceiver<SyCmd>,
222
223 inner: SyncSyslogInternal<F>,
225}
226
227
228
229impl<F: SyslogFormatter> SyslogInternal<F>
230{
231 fn new(logstat: LogStat, ssi: SyncSyslogInternal<F>) -> SyRes<(Self, UnboundedSender<SyCmd>, Weak<AtomicBool>)>
232 {
233 let run_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
235 let run_control = Arc::downgrade(&run_flag);
236
237 let (sender, receiver) = unbounded_channel::<SyCmd>();
239
240 let mut inst =
242 SyslogInternal
243 {
244 run_flag: run_flag,
245 tasks: receiver,
246 inner: ssi
247 };
248
249 if logstat.contains(LogStat::LOG_NDELAY) == true
250 {
251 inst.inner.connectlog()?;
252 }
253
254 return Ok((inst, sender, run_control));
255 }
256
257 fn thread_worker(mut self)
258 {
259 loop
260 {
261 if self.run_flag.load(Ordering::Relaxed) == false
263 {
264 break;
266 }
267
268 match self.tasks.blocking_recv()
269 {
270 Some(task) =>
271 {
272 match task
273 {
274 SyCmd::Syslog{ pri, msg } =>
275 {
276 self.inner.vsyslog1(pri, &msg);
277 },
278 SyCmd::Logmask{ logmask, loopback } =>
279 {
280 let pri = self.inner.set_logmask(logmask);
281
282 let _ = loopback.send(pri);
283 },
284 SyCmd::ChangeIdentity{ identity } =>
285 {
286 self.inner.set_logtag(identity, true);
287 },
288
289 SyCmd::UpdateTap{ tap_type, loopback } =>
290 {
291 if self.inner.get_tap_type().is_file() == true
292 {
293 let res = self.inner.update_tap_data(tap_type);
294
295 if let Err(Err(e)) = loopback.send(res)
296 {
297 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
298 }
299 }
300 },
301
302 SyCmd::ConnectLog{ loopback} =>
303 {
304 if let Err(Err(e)) = loopback.send(self.inner.connectlog())
305 {
306 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
307 }
308 },
309
310 SyCmd::DisconnectLog{ loopback} =>
311 {
312 if let Err(Err(e)) = loopback.send(self.inner.disconnectlog())
313 {
314 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
315 }
316 },
317
318 SyCmd::Reconnect =>
319 {
320 if let Err(e) = self.inner.disconnectlog()
321 {
322 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
323 }
324
325 if let Err(e) = self.inner.connectlog()
326 {
327 self.inner.send_to_stderr(&[Cow::Owned(e.to_string())]);
328 }
329 },
330 SyCmd::Stop =>
331 {
332 break;
334 }
335 }
336 },
337 None =>
338 {
339 break;
340 }
341 } } return;
346 }
347}
348
349#[derive(Debug, Clone)]
351pub struct SyslogQueue<F: SyslogFormatter>
352{
353 run_control: Weak<AtomicBool>,
355
356 tasks: UnboundedSender<SyCmd>,
358
359 thread: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
361
362 _p: PhantomData<F>,
363}
364
365unsafe impl<F: SyslogFormatter> Send for SyslogQueue<F> {}
366unsafe impl<F: SyslogFormatter> Sync for SyslogQueue<F> {}
367
368impl<F: SyslogFormatter> Drop for SyslogQueue<F>
369{
370 fn drop(&mut self)
371 {
372 if let Some(ctrl) = self.run_control.upgrade()
373 {
374 ctrl.store(false, Ordering::SeqCst);
375
376 if let Err(_e) = self.tasks.send(SyCmd::form_stop())
377 {
378
379 }
380
381 let join_handle = self.thread.lock().unwrap().take().unwrap();
382
383 let _ = join_handle.join();
384 }
385 }
386}
387
388impl<F: SyslogFormatter> SyslogQueue<F>
389{
390 pub
391 fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap: TapTypeData) -> SyRes<SyslogQueue<F>>
392 {
393 let (inst, sender, run_ctrl) =
396 SyslogInternal
397 ::<F>
398 ::new(
399 logstat,
400 SyncSyslogInternal::<F>::new(ident, logstat, facility, net_tap)?
401 )?;
402
403
404 let thr_name: String = "syslog_queue/0".into();
405
406 let thread_hnd =
408 thread::Builder::new()
409 .name(thr_name.clone())
410 .spawn(move || SyslogInternal::<F>::thread_worker(inst))
411 .map_err(|e|
412 map_error!("{} thread spawn failed. {}", thr_name, e)
413 )?;
414
415 let ret =
417 Self
418 {
419 run_control: run_ctrl,
420 tasks: sender,
421 thread: Arc::new(Mutex::new(Some(thread_hnd))),
422 _p: PhantomData::<F>,
423 };
424
425 return Ok(ret);
426 }
427}
428
429impl<F: SyslogFormatter> SyslogApi for SyslogQueue<F>
430{
431 fn connectlog(&mut self) -> SyRes<()>
432 {
433 let (sy_cmd, loopback) =
434 SyCmd::form_connectlog();
435
436 self
437 .tasks
438 .send(sy_cmd)
439 .map_err(|e| map_error_code!(SendError, "connectlog() error: '{}'", e))?;
440
441 return
442 loopback
443 .blocking_recv()
444 .map_err(|e|
445 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
446 )?;
447 }
448
449 fn setlogmask(&self, logmask: i32) -> SyRes<i32>
450 {
451 let (sy_cmd, loopback) =
452 SyCmd::form_logmask(logmask);
453
454 self
455 .tasks
456 .send(sy_cmd)
457 .map_err(|e|
458 map_error_code!(SendError, "closelog() error: '{}'", e)
459 )?;
460
461 return
462 loopback
463 .blocking_recv()
464 .map_err(|e|
465 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
466 );
467 }
468
469 fn closelog(&self) -> SyRes<()>
470 {
471 let (sy_cmd, loopback) =
472 SyCmd::form_disconnectlog();
473
474 self
476 .tasks
477 .send(sy_cmd)
478 .map_err(|e|
479 map_error_code!(SendError, "closelog() error: '{}'", e)
480 )?;
481
482 return
483 loopback
484 .blocking_recv()
485 .map_err(|e|
486 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
487 )?;
488 }
489
490 fn syslog(&self, pri: Priority, fmt: String)
491 {
492 let sy_cmd = SyCmd::form_syslog(pri, fmt);
498
499 let _ = self.tasks.send(sy_cmd);
500
501 return;
502 }
503
504 fn vsyslog<S: AsRef<str>>(&self, pri: Priority, fmt: S)
505 {
506 let sy_cmd =
507 SyCmd::form_syslog(pri, fmt.as_ref().to_string());
508
509 let _ = self.tasks.send(sy_cmd);
510
511 return;
512 }
513
514 fn change_identity(&self, ident: &str) -> SyRes<()>
515 {
516 let sy_cmd =
517 SyCmd::form_change_ident(ident.to_string());
518
519 return
520 self
521 .tasks
522 .send(sy_cmd)
523 .map_err(|e|
524 map_error_code!(SendError, "change_identity() error: '{}'", e)
525 );
526 }
527
528 fn reconnect(&self) -> SyRes<()>
529 {
530 return
531 self
532 .tasks
533 .send(SyCmd::form_reconnect())
534 .map_err(|e|
535 map_error_code!(SendError, "signal_rotate_log() error: '{}'", e)
536 );
537 }
538
539 fn update_tap_data(&self, tap_data: TapTypeData) -> SyRes<()>
540 {
541 let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
542
543 self
544 .tasks
545 .send(tap_data_cmd)
546 .map_err(|e|
547 map_error_code!(SendError, "signal_rotate_log() error: '{}'", e)
548 )?;
549
550 return
551 loopback
552 .blocking_recv()
553 .map_err(|e|
554 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
555 )?;
556 }
557}
558
559impl<F: SyslogFormatter> SyslogQueue<F>
560{
561
562 pub
563 fn make_adapter(&self) -> SyslogQueueAdapter
564 {
565 return SyslogQueueAdapter::new(self.clone_task_channel(), self.clone_run_ctrl());
566 }
567
568 fn clone_run_ctrl(&self) -> Weak<AtomicBool>
569 {
570 return self.run_control.clone();
571 }
572
573 fn clone_task_channel(&self) -> UnboundedSender<SyCmd>
574 {
575 return self.tasks.clone();
576 }
577}
578
579