use std::thread;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use crossbeam::channel::{unbounded, Sender, Receiver};
use crossbeam::deque::{Injector, Steal};
use crate::{map_error, throw_error_code, map_error_code};
use super::syslog_sync_internal::SyncSyslogInternal;
use super::common::*;
use super::error::{SyRes, SyslogError, SyslogErrCode};
/// Commands pushed onto the shared task queue and consumed by the worker
/// thread (`SyslogInternal::thread_worker`).
enum SyCmd {
    /// Write one message; `pri` carries the raw priority bits.
    Syslog {
        pri: i32,
        msg: String,
    },

    /// Install a new log mask. When `loopback` is present, the worker sends
    /// the value returned by `set_logmask` back through it.
    Logmask {
        logmask: i32,
        loopback: Option<Sender<i32>>,
    },

    /// Replace the log tag with `identity`.
    ChangeIdentity {
        identity: String,
    },

    /// Make the worker leave its processing loop.
    Stop,
}
impl SyCmd {
    /// Builds a `Syslog` command from raw priority bits and a message.
    fn form_syslog(pri: i32, msg: String) -> Self {
        Self::Syslog { pri, msg }
    }

    /// Builds a `Logmask` command.
    ///
    /// When `need_prev_pri` is `true`, a fresh unbounded channel is created:
    /// its sending half travels inside the command and the receiving half is
    /// returned so the caller can wait for the reply. Otherwise no channel is
    /// created and the second tuple element is `None`.
    fn form_logmask(logmask: i32, need_prev_pri: bool) -> (Self, Option<Receiver<i32>>) {
        if !need_prev_pri {
            return (Self::Logmask { logmask, loopback: None }, None);
        }

        let (reply_tx, reply_rx) = unbounded::<i32>();

        (
            Self::Logmask { logmask, loopback: Some(reply_tx) },
            Some(reply_rx),
        )
    }

    /// Builds a `ChangeIdentity` command carrying the new identity string.
    fn form_change_ident(identity: String) -> Self {
        Self::ChangeIdentity { identity }
    }

    /// Builds the `Stop` command.
    fn form_stop() -> Self {
        Self::Stop
    }
}
/// State owned by the worker thread.
///
/// The value is moved into the worker by `Syslog::openlog` and consumed by
/// `thread_worker`, so it is dropped when the worker loop returns.
struct SyslogInternal {
    /// Checked at the top of every loop iteration; the loop ends once it
    /// reads `false`.
    run_flag: Arc<AtomicBool>,

    /// Queue of pending commands, shared with the `Syslog` handle.
    tasks: Arc<Injector<SyCmd>>,

    /// The synchronous syslog implementation that performs the actual work.
    inner: SyncSyslogInternal,
}
impl Drop for SyslogInternal {
    /// Calls `disconnectlog` on the wrapped synchronous instance when the
    /// worker state goes out of scope.
    fn drop(&mut self) {
        self.inner.disconnectlog();
    }
}
impl SyslogInternal {
    /// Creates the worker-side state.
    ///
    /// `ident`, `logstat` and `facility` are forwarded to
    /// `SyncSyslogInternal::new`. When `logstat` contains
    /// `LogStat::LOG_NDELAY`, `connectlog` is called right away and its error,
    /// if any, is returned to the caller.
    fn new(
        run_flag: Arc<AtomicBool>,
        tasks: Arc<Injector<SyCmd>>,
        ident: Option<&str>,
        logstat: LogStat,
        facility: LogFacility,
    ) -> SyRes<Self> {
        // Build the whole instance first so that a failed connect below still
        // goes through `Drop` on the way out.
        let worker = Self {
            run_flag,
            tasks,
            inner: SyncSyslogInternal::new(ident, logstat, facility),
        };

        if logstat.contains(LogStat::LOG_NDELAY) {
            worker.inner.connectlog()?;
        }

        Ok(worker)
    }

    /// Worker loop: takes commands from the queue and executes them until the
    /// run flag reads `false` or a `SyCmd::Stop` command is received.
    ///
    /// Consumes `self`, so the instance is dropped when the loop ends.
    fn thread_worker(self) {
        while self.run_flag.load(Ordering::Relaxed) {
            let cmd = match self.tasks.steal() {
                Steal::Success(cmd) => cmd,
                Steal::Retry => {
                    // The steal has to be retried; back off briefly first.
                    thread::sleep(Duration::from_nanos(100));
                    continue;
                }
                Steal::Empty => {
                    // Nothing queued: park until a producer unparks this
                    // thread or the timeout elapses.
                    thread::park_timeout(Duration::from_millis(500));
                    continue;
                }
            };

            match cmd {
                SyCmd::Syslog { pri, msg } => {
                    self.inner.vsyslog1(pri, msg);
                }
                SyCmd::Logmask { logmask, loopback } => {
                    let reply = self.inner.set_logmask(logmask);

                    if let Some(reply_tx) = loopback {
                        // A failed send is deliberately ignored.
                        let _ = reply_tx.send(reply);
                    }
                }
                SyCmd::ChangeIdentity { identity } => {
                    self.inner.set_logtag(identity);
                }
                SyCmd::Stop => break,
            }
        }
    }
}
/// Handle to a syslog instance whose work is performed on a dedicated worker
/// thread. Commands are queued on `tasks` and the worker is unparked after
/// every push.
pub struct Syslog {
    /// Weak reference to the run flag held by the worker state; the methods
    /// treat a failed `upgrade()` as "worker not available".
    run_control: Weak<AtomicBool>,

    /// Command queue shared with the worker thread.
    tasks: Arc<Injector<SyCmd>>,

    /// Join handle of the worker thread, used to unpark it.
    thread: Option<thread::JoinHandle<()>>,
}
// Manually marks `Syslog` as transferable to, and shareable between, threads.
// NOTE(review): the fields are `Weak<AtomicBool>`, `Arc<Injector<SyCmd>>` and
// `Option<JoinHandle<()>>`; these look `Send + Sync` on their own, so the
// manual impls may be redundant — confirm before removing them.
unsafe impl Send for Syslog {}
unsafe impl Sync for Syslog {}
impl Syslog
{
pub fn openlog(
ident: Option<&str>,
logstat: LogStat,
facility: LogFacility) -> SyRes<Self>
{
let run_flag: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
let run_control = Arc::downgrade(&run_flag);
let tasks =
Arc::new(Injector::<SyCmd>::new());
let inst =
SyslogInternal::new(run_flag, tasks.clone(), ident, logstat, facility)?;
let thread_hnd =
thread::Builder::new()
.name("syslog/0".to_string())
.spawn(move || SyslogInternal::thread_worker(inst))
.map_err(|e| map_error!("ctor Parser: thread spawn failed. {}", e))?;
let ret =
Self
{
run_control: run_control,
tasks: tasks,
thread: Some(thread_hnd),
};
return Ok(ret);
}
pub fn setlogmask(&self, logmask: i32) -> SyRes<i32>
{
if self.run_control.upgrade().is_some() == true
{
let (sy_cmd, opt_rx) =
SyCmd::form_logmask(logmask, true);
self.tasks.push(sy_cmd);
self.thread.as_ref().unwrap().thread().unpark();
let rx = opt_rx.unwrap();
return rx.recv()
.map_err(|e|
map_error_code!(SyslogErrCode::UnboundedChannelError, "{}", e)
);
}
throw_error_code!(SyslogErrCode::SyslogThreadNotAvailable, "syslog is not available");
}
pub fn closelog(&self)
{
if self.run_control.upgrade().is_some() == true
{
self.tasks.push(SyCmd::form_stop());
self.thread.as_ref().unwrap().thread().unpark();
}
return;
}
pub fn syslog(&self, pri: Priority, fmt: String)
{
if self.run_control.upgrade().is_some() == true
{
let sy_cmd =
SyCmd::form_syslog(pri.bits(), fmt);
self.tasks.push(sy_cmd);
self.thread.as_ref().unwrap().thread().unpark();
}
return;
}
pub fn vsyslog<S: AsRef<str>>(&self, pri: Priority, fmt: S)
{
if self.run_control.upgrade().is_some() == true
{
let sy_cmd =
SyCmd::form_syslog(pri.bits(), fmt.as_ref().to_string());
self.tasks.push(sy_cmd);
self.thread.as_ref().unwrap().thread().unpark();
}
return;
}
pub fn change_identity<I: AsRef<str>>(&self, ident: I)
{
if self.run_control.upgrade().is_some() == true
{
let sy_cmd =
SyCmd::form_change_ident(ident.as_ref().to_string());
self.tasks.push(sy_cmd);
self.thread.as_ref().unwrap().thread().unpark();
}
return;
}
}
/// Logs from two spawned threads and from the main thread through one shared
/// `Syslog`, checks the value returned by `setlogmask`, then closes the log
/// and verifies that a further `setlogmask` reports
/// `SyslogThreadNotAvailable`.
#[test]
fn test_multithreading() {
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    use super::LOG_MASK;

    let opened = Syslog::openlog(
        Some("test1"),
        LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID,
        LogFacility::LOG_DAEMON,
    );
    assert_eq!(opened.is_ok(), true, "{}", opened.err().unwrap());

    let log = Arc::new(opened.unwrap());
    let writer_one = Arc::clone(&log);
    let writer_two = Arc::clone(&log);

    thread::spawn(move || {
        for i in 0..5 {
            thread::sleep(Duration::from_nanos(200));

            let started = Instant::now();
            writer_one.syslog(Priority::LOG_DEBUG, format!("a message from thread 1 #{}[]", i));
            println!("t1: {:?}", started.elapsed());
        }
    });

    thread::spawn(move || {
        for i in 0..5 {
            thread::sleep(Duration::from_nanos(201));

            let started = Instant::now();
            writer_two.syslog(Priority::LOG_DEBUG, format!("сообщение от треда 2 №{}ХЪ", i));
            println!("t2: {:?}", started.elapsed());
        }
    });

    let prev_mask = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
    assert_eq!(prev_mask.is_ok(), true, "{}", prev_mask.err().unwrap());
    assert_eq!(prev_mask.unwrap(), 0xff, "should be 0xff");

    let started = Instant::now();
    log.syslog(
        Priority::LOG_DEBUG,
        String::from("A message from main, сообщение от главнюка"),
    );
    println!("main: {:?}", started.elapsed());

    // Leave time for the queued messages to be handled before stopping.
    thread::sleep(Duration::from_secs(2));

    log.closelog();

    // Leave time for the worker to see the stop command and exit.
    thread::sleep(Duration::from_millis(500));

    let after_close = log.setlogmask(!LOG_MASK!(Priority::LOG_ERR));
    assert_eq!(after_close.is_err(), true, "not an error, why?");

    let error = after_close.err().unwrap();
    assert_eq!(
        error.get_errcode(),
        SyslogErrCode::SyslogThreadNotAvailable,
        "unexpected error code {:?}",
        error
    );
}