use std::borrow::Cow;
use std::marker::PhantomData;
use std::{fmt, thread};
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::formatters::{DefaultSyslogFormatter, SyslogFormatter};
use crate::sync::syslog_sync_internal::SyslogSocketLockless;
use crate::sync::{LogItems, SyStream};
use crate::sync::DefaultQueueAdapter;
use crate::{map_error, SyStreamApi, SyncSyslog, SyslogDestination};
#[cfg(target_family = "unix")]
use crate::SyslogLocal;
#[cfg(target_family = "windows")]
use crate::WindowsEvent;
/// Default local log destination on Unix-family targets: an alias of
/// [`SyslogLocal`].
#[cfg(target_family = "unix")]
pub type DefaultLocalSyslogDestination = SyslogLocal;
/// Default local log destination on Windows-family targets: an alias of
/// [`WindowsEvent`].
#[cfg(target_family = "windows")]
pub type DefaultLocalSyslogDestination = WindowsEvent;
use crate::common::*;
use crate::error::SyRes;
use super::syslog_trait::SyslogApi;
/// A command travelling from a [`QueuedSyslog`] handle to the queue worker
/// thread.
///
/// Variants that carry a `loopback` one-shot sender are answered by the
/// worker through that sender; the other variants are fire-and-forget.
pub enum SyCmd<F, D, S>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Format and emit the message `msg` with priority `pri`.
    Syslog
    {
        pri: Priority,
        msg: F,
    },

    /// Apply `logmask`; the worker replies on `loopback` with whatever
    /// `LogItems::set_logmask` returned (presumably the previous mask —
    /// TODO confirm against `LogItems`).
    Logmask
    {
        logmask: i32,
        loopback: S::OneShotChannelSnd<i32>,
    },

    /// Replace the identity used by the worker's `LogItems`.
    ChangeIdentity
    {
        identity: Option<String>,
    },

    /// Hand a new destination description to the worker's socket; the
    /// result of the update is reported on `loopback`.
    UpdateTap
    {
        tap_type: D,
        loopback: S::OneShotChannelSnd<SyRes<()>>,
    },

    /// Connect the worker's socket; the result is reported on `loopback`.
    ConnectLog
    {
        loopback: S::OneShotChannelSnd<SyRes<()>>,
    },

    /// Disconnect the worker's socket; the result is reported on `loopback`.
    DisconnectLog
    {
        loopback: S::OneShotChannelSnd<SyRes<()>>,
    },

    /// Disconnect and then connect again; no reply is sent.
    Reconnect,

    /// Make the worker leave its processing loop.
    // NOTE(review): this variant is built by `form_stop` inside
    // `Drop for QueuedSyslog`, so the `allow(unused)` may be redundant in
    // some feature configurations only — confirm before removing.
    #[allow(unused)]
    Stop,
}
/// Constructors for every [`SyCmd`] variant.
///
/// Constructors of commands that expect an answer also create the one-shot
/// channel and return its receiving half next to the command.
impl<F, D, S> SyCmd<F, D, S>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Builds a [`SyCmd::Syslog`] command from a priority and a message.
    pub(crate) fn form_syslog(pri: Priority, msg: F) -> Self
    {
        Self::Syslog { pri, msg }
    }

    /// Builds a [`SyCmd::ConnectLog`] command together with the receiver on
    /// which the connection result will arrive.
    pub(crate) fn form_connectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
    {
        let (reply_tx, reply_rx) = S::create_oneshot_channel::<SyRes<()>>();

        (Self::ConnectLog { loopback: reply_tx }, reply_rx)
    }

    /// Builds a [`SyCmd::DisconnectLog`] command together with the receiver
    /// on which the disconnection result will arrive.
    pub(crate) fn form_disconnectlog() -> (Self, S::OneShotChannelRcv<SyRes<()>>)
    {
        let (reply_tx, reply_rx) = S::create_oneshot_channel::<SyRes<()>>();

        (Self::DisconnectLog { loopback: reply_tx }, reply_rx)
    }

    /// Builds a [`SyCmd::Logmask`] command together with the receiver on
    /// which the worker's `i32` answer will arrive.
    pub(crate) fn form_logmask(logmask: i32) -> (Self, S::OneShotChannelRcv<i32>)
    {
        let (reply_tx, reply_rx) = S::create_oneshot_channel::<i32>();

        (Self::Logmask { logmask, loopback: reply_tx }, reply_rx)
    }

    /// Builds a [`SyCmd::ChangeIdentity`] command.
    pub(crate) fn form_change_ident(identity: Option<String>) -> Self
    {
        Self::ChangeIdentity { identity }
    }

    /// Builds a [`SyCmd::UpdateTap`] command together with the receiver on
    /// which the update result will arrive.
    pub(crate) fn form_update_tap(new_tap_type: D) -> (Self, S::OneShotChannelRcv<SyRes<()>>)
    {
        let (reply_tx, reply_rx) = S::create_oneshot_channel::<SyRes<()>>();
        let cmd = Self::UpdateTap { tap_type: new_tap_type, loopback: reply_tx };

        (cmd, reply_rx)
    }

    /// Builds a [`SyCmd::Reconnect`] command.
    pub(crate) fn form_reconnect() -> Self
    {
        Self::Reconnect
    }

    /// Builds a [`SyCmd::Stop`] command.
    #[allow(unused)]
    pub(crate) fn form_stop() -> Self
    {
        Self::Stop
    }
}
/// State owned by the queue worker thread.
struct SyslogInternal<F, D, S>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Checked at the top of every worker iteration; the worker stops once
    /// it reads `false`.
    run_flag: Arc<AtomicBool>,

    /// Receiving half of the command channel.
    tasks: S::ChannelRcv,

    /// Logging settings used to format messages and to report errors.
    log_items: LogItems,

    /// Destination the formatted messages are written to.
    socket: SyslogSocketLockless<D>,
}
impl<F, D, S> SyslogInternal<F, D, S>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Creates the worker state and the command channel.
    ///
    /// Returns the worker state, the sending half of the command channel and
    /// a weak reference to the run flag. When `LOG_NDELAY` is set in
    /// `log_items.logstat` the socket is connected right away and a
    /// connection error is returned to the caller.
    fn new(log_items: LogItems, socket: SyslogSocketLockless<D>) -> SyRes<(Self, S::ChannelSnd, Weak<AtomicBool>)>
    {
        let run_flag = Arc::new(AtomicBool::new(true));
        let run_control = Arc::downgrade(&run_flag);

        let (sender, receiver) = S::create_channel();

        let mut inst = Self { run_flag, tasks: receiver, log_items, socket };

        if inst.log_items.logstat.contains(LogStat::LOG_NDELAY)
        {
            inst.socket.connectlog()?;
        }

        Ok((inst, sender, run_control))
    }

    /// Body of the worker thread: receives commands and executes them until
    /// the run flag is cleared, a `Stop` command arrives, or the receiver
    /// yields `None`.
    fn thread_worker(mut self)
    {
        while self.run_flag.load(Ordering::Relaxed)
        {
            // `None` from the receiver ends the worker.
            let Some(task) = self.tasks.q_recv_blocking() else { break };

            match task
            {
                SyCmd::Syslog{ pri, msg } =>
                {
                    // When no formatted message is produced there is
                    // nothing to write for this command.
                    if let Some(formatted) = self.log_items.vsyslog1_msg::<F, D>(pri, &msg)
                    {
                        self.socket.vsyslog1(formatted.1, formatted.0);
                    }
                },
                SyCmd::Logmask{ logmask, loopback } =>
                {
                    let answer = self.log_items.set_logmask(logmask);

                    // A failed reply is deliberately ignored.
                    let _ = loopback.send_once_blocking(answer);
                },
                SyCmd::ChangeIdentity{ identity } =>
                {
                    self.log_items.set_identity(identity.as_deref());
                },
                SyCmd::UpdateTap{ tap_type, loopback } =>
                {
                    let outcome = self.socket.update_tap_data(tap_type);

                    // If the reply could not be delivered and it carried an
                    // error, print that error to stderr instead.
                    if let Err(Err(e)) = loopback.send_once_blocking(outcome)
                    {
                        self.log_items.logstat.send_to_stderr(&e.to_string());
                    }
                },
                SyCmd::ConnectLog{ loopback } =>
                {
                    let outcome = self.socket.connectlog();

                    if let Err(Err(e)) = loopback.send_once_blocking(outcome)
                    {
                        self.log_items.logstat.send_to_stderr(&e.to_string());
                    }
                },
                SyCmd::DisconnectLog{ loopback } =>
                {
                    let outcome = self.socket.disconnectlog();

                    if let Err(Err(e)) = loopback.send_once_blocking(outcome)
                    {
                        self.log_items.logstat.send_to_stderr(&e.to_string());
                    }
                },
                SyCmd::Reconnect =>
                {
                    // Both steps always run; each failure is printed to
                    // stderr on its own.
                    if let Err(e) = self.socket.disconnectlog()
                    {
                        self.log_items.logstat.send_to_stderr(&e.to_string());
                    }

                    if let Err(e) = self.socket.connectlog()
                    {
                        self.log_items.logstat.send_to_stderr(&e.to_string());
                    }
                },
                SyCmd::Stop => break,
            }
        }
    }
}
/// Receiving half of the command channel, used by the worker thread.
pub trait SyslogQueueChanRcv<F, D, S>: fmt::Debug + Send
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Blocks until a command is available.
    ///
    /// The worker treats `None` as the signal to terminate.
    fn q_recv_blocking(&mut self) -> Option<SyCmd<F, D, S>>;
}
/// Sending half of the command channel, held by every [`QueuedSyslog`]
/// handle.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint; it is
// silenced here on purpose.
#[allow(async_fn_in_trait)]
pub trait SyslogQueueChanSnd<F, D, S>: fmt::Debug + Clone
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Sends `msg` to the worker from synchronous code.
    fn q_send_blocking(&self, msg: SyCmd<F, D, S>) -> SyRes<()>;

    /// Sends a command to the worker from asynchronous code.
    ///
    /// The provided implementation only raises an error through
    /// `throw_error!`; adapters with async support override it.
    async fn q_send(&self, _msg: SyCmd<F, D, S>) -> SyRes<()>
    {
        crate::throw_error!("async is not availabe here");
    }
}
/// Receiving half of a one-shot reply channel carrying a value of type `C`.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint; it is
// silenced here on purpose.
#[allow(async_fn_in_trait)]
pub trait SyslogQueueOneChanRcv<C>
{
    /// Consumes the receiver and waits for the reply from synchronous code.
    fn recv_once_blocking(self) -> SyRes<C>;

    /// Consumes the receiver and waits for the reply from asynchronous code.
    ///
    /// The provided implementation only raises an error through
    /// `throw_error!`; adapters with async support override it.
    async fn recv_once(self) -> SyRes<C>
    where
        Self: Sized,
    {
        crate::throw_error!("async is not availabe here");
    }
}
/// Sending half of a one-shot reply channel carrying a value of type `C`.
pub trait SyslogQueueOneChanSnd<C>: fmt::Debug + Send
where
    C: Send,
{
    /// Consumes the sender and delivers `data`.
    ///
    /// The error variant carries a value of type `C`; the worker inspects it
    /// to report an undelivered error result on stderr.
    fn send_once_blocking(self, data: C) -> Result<(), C>;
}
/// Describes a channel implementation (an "adapter") that connects
/// [`QueuedSyslog`] handles with the worker thread.
pub trait SyslogQueueChannel<F, D>: fmt::Debug + Send + Clone + 'static
where
    F: SyslogFormatter,
    D: SyslogDestination,
{
    /// Name of the adapter.
    const ADAPTER_NAME: &'static str;

    /// Sending half of the command channel.
    type ChannelSnd: SyslogQueueChanSnd<F, D, Self>;

    /// Receiving half of the command channel.
    type ChannelRcv: SyslogQueueChanRcv<F, D, Self>;

    /// Sending half of a one-shot reply channel.
    type OneShotChannelSnd<C: Send + fmt::Debug>: SyslogQueueOneChanSnd<C>;

    /// Receiving half of a one-shot reply channel.
    type OneShotChannelRcv<C>: SyslogQueueOneChanRcv<C>;

    /// Creates the command channel.
    fn create_channel() -> (Self::ChannelSnd, Self::ChannelRcv);

    /// Creates a one-shot reply channel for values of type `C`.
    fn create_oneshot_channel<C: Send + fmt::Debug>() -> (Self::OneShotChannelSnd<C>, Self::OneShotChannelRcv<C>);
}
/// Handle to a syslog instance whose work is performed by a dedicated
/// worker thread fed through a command queue.
///
/// The handle is `Clone`; all clones share the same worker. Dropping any
/// handle while the worker is alive stops and joins that worker (see the
/// `Drop` implementation).
#[derive(Debug, Clone)]
pub struct QueuedSyslog<S = DefaultQueueAdapter, F = DefaultSyslogFormatter, D = DefaultLocalSyslogDestination>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Weak reference to the worker's run flag; upgrading fails once the
    /// worker state has been dropped.
    run_control: Weak<AtomicBool>,

    /// Sending half of the command channel.
    pub(crate) tasks: S::ChannelSnd,

    /// Join handle of the worker thread, shared between clones and taken
    /// by the handle that joins the worker.
    thread: Arc<Mutex<Option<thread::JoinHandle<()>>>>,

    /// Ties the formatter type to the handle.
    _p: PhantomData<F>,

    /// Ties the destination type to the handle.
    _p2: PhantomData<D>,
}
// SAFETY: the handle stores a `Weak<AtomicBool>`, an
// `Arc<Mutex<Option<JoinHandle<()>>>>`, two `PhantomData` markers and the
// sender `S::ChannelSnd`. No value of `F` or `D` is stored in the handle.
// NOTE(review): the traits visible in this file do not require
// `S::ChannelSnd: Send`, so this impl relies on every queue adapter providing
// a sender that may be moved between threads — confirm for each adapter.
unsafe impl<F, D, S> Send for QueuedSyslog<S, F, D>
where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
{}
impl<F, D, S> Drop for QueuedSyslog<S, F, D>
where F: SyslogFormatter, D: SyslogDestination, S: SyslogQueueChannel<F, D>
{
    /// Stops the worker thread and waits for it to finish.
    ///
    /// Nothing is done when the run flag can no longer be upgraded, i.e. the
    /// worker state is already gone. Otherwise the flag is cleared, a `Stop`
    /// command is queued to wake the worker up, and the thread is joined.
    ///
    /// This method never panics: a poisoned mutex is recovered, and a join
    /// handle that was already taken by another clone of the handle is
    /// simply skipped. Panicking here could abort the process when the drop
    /// happens during unwinding.
    // NOTE(review): because the type is `Clone`, dropping any clone stops
    // the worker shared by all clones — confirm this is intended.
    fn drop(&mut self)
    {
        let Some(ctrl) = self.run_control.upgrade() else { return };

        ctrl.store(false, Ordering::SeqCst);

        // Best effort: the worker may already have stopped receiving.
        let _ = self.tasks.q_send_blocking(SyCmd::form_stop());

        // The lock guard is a temporary of this statement, so the mutex is
        // released before joining.
        let join_handle =
            self.thread
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .take();

        if let Some(join_handle) = join_handle
        {
            // A panic of the worker thread is not propagated.
            let _ = join_handle.join();
        }
    }
}
impl QueuedSyslog
{
pub
fn openlog(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: DefaultLocalSyslogDestination) -> SyRes<Self>
{
let log_items =
LogItems::new(ident, 0xff, logstat, facility);
let stream =
SyslogSocketLockless::<DefaultLocalSyslogDestination>::new(logstat, net_tap_prov)?;
let (inst, sender, run_ctrl) =
SyslogInternal
::<DefaultSyslogFormatter, DefaultLocalSyslogDestination, DefaultQueueAdapter>
::new(log_items, stream)?;
let thr_name: String = "syslog_queue/0".into();
let thread_hnd =
thread::Builder::new()
.name(thr_name.clone())
.spawn(move ||
SyslogInternal
::<DefaultSyslogFormatter, DefaultLocalSyslogDestination, DefaultQueueAdapter>
::thread_worker(inst)
)
.map_err(|e|
map_error!("{} thread spawn failed. {}", thr_name, e)
)?;
let ret =
Self
{
run_control: run_ctrl,
tasks: sender,
thread: Arc::new(Mutex::new(Some(thread_hnd))),
_p: PhantomData,
_p2: PhantomData
};
return Ok(ret);
}
}
impl<F, D, S> QueuedSyslog<S, F, D>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Opens a queued syslog instance for an explicit queue adapter `S`,
    /// formatter `F` and destination `D`.
    ///
    /// Builds the log settings and the socket, creates the worker state and
    /// spawns the worker thread named `syslog_queue/0`.
    ///
    /// # Errors
    ///
    /// Returns an error when the socket cannot be created, when the initial
    /// connection (requested by `LOG_NDELAY`) fails, or when the worker
    /// thread cannot be spawned.
    pub fn openlog_with(ident: Option<&str>, logstat: LogStat, facility: LogFacility, net_tap_prov: D) -> SyRes<QueuedSyslog<S, F, D>>
    {
        let settings = LogItems::new(ident, 0xff, logstat, facility);
        let socket = SyslogSocketLockless::<D>::new(logstat, net_tap_prov)?;

        let (worker_state, sender, run_control) =
            SyslogInternal::<F, D, S>::new(settings, socket)?;

        let thr_name = String::from("syslog_queue/0");

        let worker =
            thread::Builder::new()
                .name(thr_name.clone())
                .spawn(move || worker_state.thread_worker())
                .map_err(|e|
                    map_error!("{} thread spawn failed. {}", thr_name, e)
                )?;

        Ok(
            Self
            {
                run_control,
                tasks: sender,
                thread: Arc::new(Mutex::new(Some(worker))),
                _p: PhantomData,
                _p2: PhantomData,
            }
        )
    }
}
/// Synchronous API: every call is turned into a [`SyCmd`] and queued for the
/// worker thread. Calls that need an answer block on a one-shot channel.
impl<F, D, S> SyslogApi<F, D> for QueuedSyslog<S, F, D>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Asks the worker to connect the socket and waits for the result.
    fn connectlog(&self) -> SyRes<()>
    {
        let (cmd, reply) = SyCmd::form_connectlog();

        self.tasks.q_send_blocking(cmd)?;

        // The outer result is the channel's, the inner one the worker's.
        reply.recv_once_blocking()?
    }

    /// Asks the worker to apply `logmask` and returns the worker's answer.
    fn setlogmask(&self, logmask: i32) -> SyRes<i32>
    {
        let (cmd, reply) = SyCmd::form_logmask(logmask);

        self.tasks.q_send_blocking(cmd)?;

        reply.recv_once_blocking()
    }

    /// Asks the worker to disconnect the socket and waits for the result.
    /// The worker itself keeps running.
    fn closelog(&self) -> SyRes<()>
    {
        let (cmd, reply) = SyCmd::form_disconnectlog();

        self.tasks.q_send_blocking(cmd)?;

        reply.recv_once_blocking()?
    }

    /// Queues a message; a failure to queue it is ignored.
    fn syslog(&self, pri: Priority, fmt: F)
    {
        let _ = self.tasks.q_send_blocking(SyCmd::form_syslog(pri, fmt));
    }

    /// Queues an identity change; only the queuing itself can fail.
    fn change_identity(&self, ident: Option<&str>) -> SyRes<()>
    {
        let owned_ident = ident.map(String::from);

        self.tasks.q_send_blocking(SyCmd::form_change_ident(owned_ident))
    }

    /// Queues a reconnect; only the queuing itself can fail.
    fn reconnect(&self) -> SyRes<()>
    {
        self.tasks.q_send_blocking(SyCmd::form_reconnect())
    }

    /// Hands new destination data to the worker and waits for the result.
    fn update_tap_data(&self, tap_data: D) -> SyRes<()>
    {
        let (cmd, reply) = SyCmd::form_update_tap(tap_data);

        self.tasks.q_send_blocking(cmd)?;

        reply.recv_once_blocking()?
    }
}
impl<'stream, F, D, S> SyStreamApi<'stream, F, D, QueuedSyslog<S, F, D>> for QueuedSyslog<S, F, D>
where
    F: SyslogFormatter,
    D: SyslogDestination,
    S: SyslogQueueChannel<F, D>,
{
    /// Creates a [`SyStream`] that borrows this handle and is bound to the
    /// priority `pri`.
    fn stream(&'stream self, pri: Priority) -> SyStream<'stream, D, F, QueuedSyslog<S, F, D>>
    {
        SyStream { inner: self, pri, _p: PhantomData, _p1: PhantomData }
    }
}
/// Asynchronous front-end of [`QueuedSyslog`]: the same commands as the
/// synchronous API, sent and awaited through the adapter's async methods.
#[cfg(all(feature = "build_with_queue", feature = "async_enabled"))]
pub mod syslog_async_queue
{
    use crate::error::SyRes;
    use crate::sy_sync_queue::{SyCmd, SyslogQueueChanSnd, SyslogQueueChannel, SyslogQueueOneChanRcv};
    use crate::Priority;
    use crate::{formatters::SyslogFormatter, SyslogDestination, QueuedSyslog};
    use crate::a_sync::syslog_trait::AsyncSyslogQueueApi;

    impl<F, D, S> AsyncSyslogQueueApi<F, D> for QueuedSyslog<S, F, D>
    where
        F: SyslogFormatter,
        D: SyslogDestination,
        S: SyslogQueueChannel<F, D>,
    {
        /// Asks the worker to connect the socket and awaits the result.
        async fn a_connectlog(&mut self) -> SyRes<()>
        {
            let (cmd, reply) = SyCmd::form_connectlog();

            self.tasks.q_send(cmd).await?;

            // The outer result is the channel's, the inner one the worker's.
            reply.recv_once().await?
        }

        /// Asks the worker to apply `logmask` and awaits the worker's answer.
        async fn a_setlogmask(&self, logmask: i32) -> SyRes<i32>
        {
            let (cmd, reply) = SyCmd::form_logmask(logmask);

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await
        }

        /// Asks the worker to disconnect the socket and awaits the result.
        async fn a_closelog(&self) -> SyRes<()>
        {
            let (cmd, reply) = SyCmd::form_disconnectlog();

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }

        /// Queues a message; a failure to queue it is ignored.
        #[inline]
        async fn a_syslog(&self, pri: Priority, fmt: F)
        {
            let cmd = SyCmd::form_syslog(pri, fmt);

            let _ = self.tasks.q_send(cmd).await;
        }

        /// Queues a reconnect; only the queuing itself can fail.
        async fn a_reconnect(&self) -> SyRes<()>
        {
            self.tasks.q_send(SyCmd::form_reconnect()).await
        }

        /// Queues an identity change; only the queuing itself can fail.
        async fn a_change_identity(&self, ident: &str) -> SyRes<()>
        {
            let cmd = SyCmd::form_change_ident(Some(ident.to_string()));

            self.tasks.q_send(cmd).await
        }

        /// Hands new destination data to the worker and awaits the result.
        async fn a_update_tap_data(&self, new_tap: D) -> SyRes<()>
        {
            let (cmd, reply) = SyCmd::form_update_tap(new_tap);

            self.tasks.q_send(cmd).await?;

            reply.recv_once().await?
        }
    }
}
#[cfg(all(feature = "build_with_queue", not(feature = "async_enabled")))]
#[cfg(test)]
mod test_queue_sync
{
    use crate::{formatters::DefaultSyslogFormatter, sync::{crossbeam_queue_adapter::DefaultQueueAdapter}, LogFacility, LogStat, Priority, SyslogApi, QueuedSyslog};

    use super::DefaultLocalSyslogDestination;

    #[cfg(target_family = "unix")]
    use crate::SyslogLocal;
    #[cfg(target_family = "windows")]
    use crate::WindowsEvent;

    /// Opens a queued logger, sends two UTF-8 messages while printing how
    /// long each `syslog` call took, then closes the log.
    #[test]
    fn test_queue_single_message()
    {
        #[cfg(target_family = "unix")]
        let syslog_provider = SyslogLocal::new();

        #[cfg(target_family = "windows")]
        let syslog_provider = WindowsEvent::new();

        let opened =
            QueuedSyslog
                ::<DefaultQueueAdapter, DefaultSyslogFormatter, DefaultLocalSyslogDestination>
                ::openlog_with(
                    Some("queue_single_message"),
                    LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID,
                    LogFacility::LOG_DAEMON,
                    syslog_provider
                );

        assert_eq!(opened.is_ok(), true, "{}", opened.err().unwrap());

        let log = opened.unwrap();

        let messages =
            [
                "test UTF-8 проверка BOM UTF-8",
                "test UTF-8 きるさお命泉ぶねりよ日子金れっ",
            ];

        for text in messages
        {
            let payload = text.to_string();

            let started = std::time::Instant::now();
            log.syslog(Priority::LOG_DEBUG, payload.into());
            let took = started.elapsed();

            println!("{:?}", took);
        }

        let _ = log.closelog();
    }
}
// Multi-threaded tests of the queued logger. They are compiled only when
// both the `build_with_queue` and `async_embedded` features are enabled.
#[cfg(all(feature = "build_with_queue", feature = "async_embedded"))]
#[cfg(test)]
mod tests_queue
{
use crate::{formatters::DefaultSyslogFormatter, sync::DefaultQueueAdapter, LogFacility, LogStat, Priority, QueuedSyslog, SyslogApi};
use super::DefaultLocalSyslogDestination;
#[cfg(target_family = "unix")]
use crate::SyslogLocal;
#[cfg(target_family = "windows")]
use crate::WindowsEvent;
/// Shares one logger between the main thread and two spawned threads,
/// checks the answer of `setlogmask`, and verifies that `setlogmask` still
/// succeeds after `closelog`.
#[test]
fn test_multithreading()
{
use std::sync::Arc;
use std::thread;
use std::time::{Instant, Duration};
use crate::LOG_MASK;
#[cfg(target_family = "unix")]
let syslog_provider =
SyslogLocal::new();
#[cfg(target_family = "windows")]
let syslog_provider =
WindowsEvent::new();
let log =
QueuedSyslog
::<DefaultQueueAdapter, DefaultSyslogFormatter, DefaultLocalSyslogDestination>
::openlog_with(
Some("test5"),
LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID,
LogFacility::LOG_DAEMON, syslog_provider
);
assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
// The logger is shared through an `Arc`, not through `Clone` of the handle.
let log = Arc::new(log.unwrap());
let c1_log = log.clone();
let c2_log = log.clone();
// First producer: five messages. The spawned thread is not joined; the
// sleeps further below give it time to finish.
thread::spawn(move|| {
for i in 0..5
{
thread::sleep(Duration::from_nanos(200));
let now = Instant::now();
c1_log.syslog(Priority::LOG_DEBUG, format!("a message from thread 1 #{}[]", i).into());
let elapsed = now.elapsed();
println!("t1: {:?}", elapsed);
}
});
// Second producer: five messages with multi-byte UTF-8 content.
thread::spawn(move|| {
for i in 0..5
{
thread::sleep(Duration::from_nanos(201));
let now = Instant::now();
c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
let elapsed = now.elapsed();
println!("t2: {:?}", elapsed);
}
});
// The answer to the first `setlogmask` must be 0xff, the mask value passed
// to `LogItems::new` by `openlog_with`.
let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
assert_eq!(res.unwrap(), 0xff, "should be 0xff");
let now = Instant::now();
log.syslog(Priority::LOG_DEBUG, format!("A message from main, сообщение от главнюка").into());
let elapsed = now.elapsed();
println!("main: {:?}", elapsed);
// Give the producer threads and the worker time to process the messages.
thread::sleep(Duration::from_secs(2));
let _ = log.closelog();
thread::sleep(Duration::from_millis(500));
// The worker must still answer commands after the log was closed.
let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
assert_eq!(res.is_ok(), true);
return;
}
/// Same scenario as `test_multithreading`, but logging into the file
/// `/tmp/syslog_rs_test2.log` through `SyslogFile`.
#[cfg(feature = "build_ext_file")]
#[test]
fn test_file_multithreading()
{
use std::sync::Arc;
use std::thread;
use std::time::{Instant, Duration};
use crate::formatters::DefaultSyslogFormatterFile;
use crate::{SyslogFile, LOG_MASK};
let log =
QueuedSyslog
::<DefaultQueueAdapter, DefaultSyslogFormatterFile, SyslogFile>
::openlog_with(
Some("test2"),
LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID,
LogFacility::LOG_DAEMON,
SyslogFile::new("/tmp/syslog_rs_test2.log")
);
assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
// The logger is shared through an `Arc`, not through `Clone` of the handle.
let log = Arc::new(log.unwrap());
let c1_log = log.clone();
let c2_log = log.clone();
// First producer: five `LOG_ALERT` messages. The thread is not joined.
thread::spawn(move|| {
for i in 0..5
{
thread::sleep(Duration::from_nanos(200));
let now = Instant::now();
c1_log.syslog(Priority::LOG_ALERT, format!("a message from thread 1 #{}[]", i).into());
let elapsed = now.elapsed();
println!("t1: {:?}", elapsed);
}
});
// Second producer: five `LOG_DEBUG` messages with multi-byte UTF-8 content.
thread::spawn(move|| {
for i in 0..5
{
thread::sleep(Duration::from_nanos(201));
let now = Instant::now();
c2_log.syslog(Priority::LOG_DEBUG, format!("きるさお命泉ぶねりよ日子金れっ {}", i).into());
let elapsed = now.elapsed();
println!("t2: {:?}", elapsed);
}
});
// The answer to the first `setlogmask` must be 0xff, the mask value passed
// to `LogItems::new` by `openlog_with`.
let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
assert_eq!(res.is_ok(), true, "{}", res.err().unwrap());
assert_eq!(res.unwrap(), 0xff, "should be 0xff");
let now = Instant::now();
log.syslog(Priority::LOG_DEBUG, format!("A message from main, きるさお命泉ぶねりよ日子金れっ").into());
let elapsed = now.elapsed();
println!("main: {:?}", elapsed);
// Give the producer threads and the worker time to process the messages.
thread::sleep(Duration::from_secs(2));
let _ = log.closelog();
thread::sleep(Duration::from_millis(500));
// The worker must still answer commands after the log was closed.
let res = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
assert_eq!(res.is_ok(), true);
return;
}
}