use tokio::time::{sleep, Duration};
use tokio::sync::Mutex;
use async_recursion::async_recursion;
use chrono::offset::Local;
#[cfg(any(
target_os = "freebsd",
target_os = "dragonfly",
target_os = "openbsd",
target_os = "netbsd",
target_os = "macos"
))]
use chrono::SecondsFormat;
use super::async_socket::*;
use super::portable;
use super::common::*;
use super::error::{SyRes};
struct SyslogInternal
{
logtag: Option<String>,
logstat: LogStat,
facility: LogFacility,
logmask: i32,
stream: SyslogSocket,
}
unsafe impl Sync for SyslogInternal{}
unsafe impl Send for SyslogInternal{}
impl Drop for SyslogInternal
{
fn drop(&mut self)
{
self.disconnectlog();
}
}
impl SyslogInternal
{
fn new(
logtag: Option<&str>,
logstat: LogStat,
facility: LogFacility,
logmask: i32,
stream: SyslogSocket) -> SyRes<Self>
{
let logtag =
match logtag
{
Some(r) => Some(r.to_string()),
None => None,
};
let mut inst =
Self
{
logtag,
logstat,
facility,
logmask,
stream
};
if inst.logstat.intersects(LogStat::LOG_NDELAY) == true
{
inst.connectlog()?;
}
return Ok(inst);
}
}
impl SyslogInternal
{
pub fn set_logmask(&mut self, logmask: i32) -> i32
{
let oldmask = self.logmask;
if logmask != 0
{
self.logmask = logmask;
}
return oldmask;
}
pub fn send_to_stderr(&self, msg: &mut [u8])
{
if self.logstat.intersects(LogStat::LOG_PERROR) == true
{
let mut newline = String::from("\n");
send_to_stderr(libc::STDERR_FILENO, msg, &mut newline);
}
}
pub fn is_logmasked(&self, pri: i32) -> bool
{
if ((1 << (pri & LogMask::LOG_PRIMASK)) & self.logmask) == 0
{
return true;
}
return false;
}
pub fn set_logtag<L: AsRef<str>>(&mut self, logtag: L)
{
self.logtag =
Some(truncate_n(logtag.as_ref(), 48));
}
fn disconnectlog(&mut self)
{
if self.stream.is_none() == false
{
self.stream.shutdown();
self.stream = SyslogSocket::none();
}
}
fn connectlog(&mut self) -> SyRes<()>
{
let stream = SyslogSocket::connect()?;
self.stream = stream;
return Ok(());
}
#[cfg(target_os = "linux")]
#[async_recursion]
async fn vsyslog1(&mut self, mut pri: i32, fmt: &str)
{
match check_invalid_bits(&mut pri)
{
Ok(_) => {},
Err(_e) => self.vsyslog1(get_internal_log(), fmt).await
}
if self.is_logmasked(pri) == true
{
return;
}
if (pri & LOG_FACMASK) == 0
{
pri |= self.facility.bits();
}
let timedate = Local::now().format("%h %e %T").to_string();
if self.logtag.is_none() == true
{
match portable::p_getprogname()
{
Some(r) => self.set_logtag(r),
None => self.set_logtag("")
}
}
let progname = self.logtag.as_ref().unwrap();
let msg_final =
if fmt.ends_with("\n") == true
{
truncate(fmt)
}
else
{
fmt
};
let msg_pri =
[
b"<", pri.to_string().as_bytes(), b">"
].concat();
let msg_header =
[
timedate.as_bytes(),
].concat();
let mut msg =
[
b" ", progname.as_bytes(),
b"[", portable::get_pid().to_string().as_str().as_bytes(), b"]:",
b" ", msg_final.as_bytes()
].concat();
drop(progname);
self.send_to_stderr(&mut msg);
let fullmsg = [msg_pri.as_slice(), msg_header.as_slice(), msg.as_slice()].concat();
if self.stream.is_none() == true
{
match self.connectlog()
{
Ok(_) => {},
Err(e) =>
{
self.send_to_stderr(unsafe { e.eject_string().as_bytes_mut() } );
return;
}
}
}
loop
{
match self.stream.send(&fullmsg).await
{
Ok(_) => return,
Err(err) =>
{
if let Some(libc::ENOBUFS) = err.raw_os_error()
{
if self.stream.is_priv() == true
{
break;
}
sleep(Duration::from_micros(1)).await;
}
else
{
self.disconnectlog();
match self.connectlog()
{
Ok(_) => {},
Err(_e) => break,
}
}
}
}
}
if self.logstat.intersects(LogStat::LOG_CONS)
{
let fd = unsafe {
libc::open(
PATH_CONSOLE.as_ptr(),
libc::O_WRONLY | libc::O_NONBLOCK | libc::O_CLOEXEC,
0
)
};
if fd >= 0
{
let mut without_pri = [msg_header.as_slice(), msg.as_slice()].concat();
let mut newline = String::from("\r\n");
send_to_stderr(fd, without_pri.as_mut_slice(),&mut newline);
unsafe {libc::close(fd)};
}
}
}
#[cfg(any(
target_os = "freebsd",
target_os = "dragonfly",
target_os = "openbsd",
target_os = "netbsd",
target_os = "macos"
))]
#[async_recursion]
async fn vsyslog1(&mut self, mut pri: i32, fmt: &str)
{
match check_invalid_bits(&mut pri)
{
Ok(_) => {},
Err(_e) => self.vsyslog1(get_internal_log(), fmt.as_ref()).await
}
if self.is_logmasked(pri) == true
{
return;
}
if (pri & LOG_FACMASK) == 0
{
pri |= self.facility.bits();
}
let mut hostname_buf = [0u8; MAXHOSTNAMELEN];
let hostname =
match nix::unistd::gethostname(&mut hostname_buf)
{
Ok(r) =>
{
match r.to_str()
{
Ok(r) => r,
Err(_e) => NILVALUE
}
},
Err(_e) => NILVALUE,
};
let timedate =
Local::now().to_rfc3339_opts(SecondsFormat::Secs, false);
if self.logtag.is_none() == true
{
match portable::p_getprogname()
{
Some(r) => self.set_logtag(r),
None => self.set_logtag("")
}
}
let progname = self.logtag.as_ref().unwrap();
let msg_final =
if fmt.ends_with("\n") == true
{
truncate(fmt)
}
else
{
fmt
};
let msg_pri =
[
b"<", pri.to_string().as_bytes(), b">1"
].concat();
let msg_header =
[
b" ", timedate.as_bytes(),
b" ", hostname.as_bytes(),
].concat();
let mut msg =
[
b" ", progname.as_bytes(),
b" ", portable::get_pid().to_string().as_str().as_bytes(),
b" ", NILVALUE.as_bytes(),
b" ", NILVALUE.as_bytes(),
b" ", msg_final.as_bytes()
].concat();
self.send_to_stderr(&mut msg);
let fullmsg = [msg_pri.as_slice(), msg_header.as_slice(), msg.as_slice()].concat();
if self.stream.is_none() == true
{
match self.connectlog()
{
Ok(_) => {},
Err(e) =>
{
self.send_to_stderr(unsafe { e.eject_string().as_bytes_mut() } );
return;
}
}
}
loop
{
match self.stream.send(&fullmsg).await
{
Ok(_) => return,
Err(err) =>
{
if let Some(libc::ENOBUFS) = err.raw_os_error()
{
if self.stream.is_priv() == true
{
break;
}
sleep(Duration::from_micros(1)).await;
}
else
{
self.disconnectlog();
match self.connectlog()
{
Ok(_) => {},
Err(_e) => break,
}
}
}
}
}
if self.logstat.intersects(LogStat::LOG_CONS)
{
let fd = unsafe {
libc::open(
PATH_CONSOLE.as_ptr(),
libc::O_WRONLY | libc::O_NONBLOCK | libc::O_CLOEXEC,
0
)
};
if fd >= 0
{
let mut without_pri = [msg_header.as_slice(), msg.as_slice()].concat();
let mut newline = String::from("\r\n");
send_to_stderr(fd, without_pri.as_mut_slice(),&mut newline);
unsafe {libc::close(fd)};
}
}
}
}
pub struct Syslog
{
lock: Mutex<SyslogInternal>,
}
unsafe impl Send for Syslog {}
unsafe impl Sync for Syslog {}
impl Syslog
{
pub async fn openlog(
ident: Option<&str>,
logstat: LogStat,
facility: LogFacility) -> SyRes<Self>
{
let inner =
SyslogInternal::new(
ident,
logstat,
facility,
0xff,
SyslogSocket::none()
)?;
let ret = Self
{
lock: Mutex::new(inner),
};
return Ok(ret);
}
pub async fn setlogmask(&self, logmask: i32) -> i32
{
let mut lock = self.lock.lock().await;
let pri = lock.set_logmask(logmask);
drop(lock);
return pri;
}
pub async fn closelog(&self)
{
let mut lock = self.lock.lock().await;
lock.disconnectlog();
drop(lock);
}
pub async fn syslog(&self, pri: Priority, fmt: String)
{
self.lock.lock().await.vsyslog1(pri.bits(), fmt.as_str()).await;
}
pub async fn vsyslog<S: AsRef<str>>(&self, pri: Priority, fmt: S)
{
let mut lock = self.lock.lock().await;
lock.vsyslog1(pri.bits(), fmt.as_ref()).await;
drop(lock);
}
pub async fn change_identity<I: AsRef<str>>(&self, ident: I)
{
let mut lock = self.lock.lock().await;
lock.set_logtag(ident);
drop(lock);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_multithreading()
{
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use std::time::{Instant};
let log =
Syslog::openlog(
Some("test1"),
LogStat::LOG_CONS | LogStat::LOG_NDELAY | LogStat::LOG_PID,
LogFacility::LOG_DAEMON).await;
assert_eq!(log.is_ok(), true, "{}", log.err().unwrap());
let log = Arc::new(log.unwrap());
let c1_log = log.clone();
let c2_log = log.clone();
tokio::spawn( async move
{
for i in 0..5
{
let cc_c1_log = c1_log.clone();
sleep(Duration::from_nanos(200)).await;
tokio::spawn( async move
{
let now = Instant::now();
cc_c1_log.syslog(Priority::LOG_DEBUG, format!("a message from thread 1 #{}[]", i)).await;
let elapsed = now.elapsed();
println!("t1: {:?}", elapsed);
});
}
}
);
tokio::spawn(async move
{
for i in 0..5
{
let cc_c2_log = c2_log.clone();
sleep(Duration::from_nanos(201)).await;
tokio::spawn( async move
{
let now = Instant::now();
cc_c2_log.syslog(Priority::LOG_DEBUG, format!("сообщение от треда 2 №{}ХЪ", i)).await;
let elapsed = now.elapsed();
println!("t2: {:?}", elapsed);
});
}
});
let now = Instant::now();
log.syslog(Priority::LOG_DEBUG, format!("A message from main, сообщение от главнюка")).await;
let elapsed = now.elapsed();
println!("main: {:?}", elapsed);
sleep(Duration::from_secs(2)).await;
log.closelog().await;
sleep(Duration::from_nanos(201)).await;
return;
}