use std::{
io::{self, ErrorKind},
os::unix::net::UnixDatagram,
path::Path,
time::UNIX_EPOCH,
};
use bytes::BufMut;
use parking_lot::RwLockUpgradableReadGuard;
use crate::{thread, Buffer, Event, Record, LOGGER_ENTRY_MAX_LEN};
/// Filesystem path of the datagram socket (`logdw`) that log packets are written to.
const LOGDW: &str = "/dev/socket/logdw";
// Process-wide socket used by `log` and `write_event`. It is created lazily, on
// first access, by connecting to `LOGDW`.
lazy_static::lazy_static! {
static ref SOCKET: LogdSocket = LogdSocket::connect(Path::new(LOGDW));
}
struct LogdSocket {
socket: parking_lot::RwLock<UnixDatagram>,
}
impl LogdSocket {
pub fn connect(path: &Path) -> LogdSocket {
let socket = UnixDatagram::unbound().expect("failed to create socket");
socket.connect(path).ok();
socket
.set_nonblocking(true)
.expect("failed to set the logd socket to non blocking");
let lock = parking_lot::RwLock::new(socket);
LogdSocket { socket: lock }
}
pub fn send(&self, buffer: &[u8]) -> io::Result<()> {
let lock = self.socket.upgradable_read();
match lock.send(buffer) {
Ok(_) => (),
Err(e) if e.kind() == ErrorKind::WouldBlock => (), Err(_) => {
let socket = UnixDatagram::unbound()?;
let mut lock = RwLockUpgradableReadGuard::upgrade(lock);
socket.connect(LOGDW)?;
socket.set_nonblocking(true)?;
socket.send(buffer)?;
*lock = socket;
}
}
Ok(())
}
}
/// Serializes `record` into a single packet and sends it over the shared
/// `SOCKET`.
///
/// Packet layout (integers little endian):
/// buffer id (`u8`), thread id (`u16`), seconds (`u32`), nanoseconds (`u32`),
/// priority (`u8`), then the tag and the message, each followed by a NUL byte.
///
/// A timestamp earlier than the Unix epoch is written as zero instead of
/// panicking. Send failures are reported on stderr and otherwise ignored.
pub(crate) fn log(record: &Record) {
    // Tag and message lengths including their NUL terminators.
    let tag_len = record.tag.len() + 1;
    let message_len = record.message.len() + 1;

    // 12 bytes of fixed header precede the two strings.
    let mut buffer = bytes::BytesMut::with_capacity(12 + tag_len + message_len);
    // Logging must not panic on a clock that reads before the epoch; clamp to
    // a zero duration instead.
    let timestamp = record
        .timestamp
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    buffer.put_u8(record.buffer_id.into());
    buffer.put_u16_le(thread::id() as u16);
    buffer.put_u32_le(timestamp.as_secs() as u32);
    buffer.put_u32_le(timestamp.subsec_nanos());
    buffer.put_u8(record.priority as u8);
    buffer.put(record.tag.as_bytes());
    buffer.put_u8(0);
    buffer.put(record.message.as_bytes());
    buffer.put_u8(0);

    if let Err(e) = SOCKET.send(&buffer) {
        eprintln!("Failed to send log message \"{}: {}\": {}", record.tag, record.message, e);
    }
}
/// Serializes `event` into a single packet for `log_buffer` and sends it over
/// the shared `SOCKET`.
///
/// Packet layout (integers little endian):
/// buffer id (`u8`), thread id (`u16`), seconds (`u32`), nanoseconds (`u32`),
/// event tag (`u32`), then the bytes returned by `event.value.as_bytes()`.
///
/// A timestamp earlier than the Unix epoch is written as zero instead of
/// panicking. Send failures are reported on stderr and otherwise ignored.
pub(crate) fn write_event(log_buffer: Buffer, event: &Event) {
    let mut buffer = bytes::BytesMut::with_capacity(LOGGER_ENTRY_MAX_LEN);
    // Writing an event must not panic on a clock that reads before the epoch;
    // clamp to a zero duration instead.
    let timestamp = event
        .timestamp
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    buffer.put_u8(log_buffer.into());
    buffer.put_u16_le(thread::id() as u16);
    buffer.put_u32_le(timestamp.as_secs() as u32);
    buffer.put_u32_le(timestamp.subsec_nanos());
    buffer.put_u32_le(event.tag);
    buffer.put(event.value.as_bytes());

    if let Err(e) = SOCKET.send(&buffer) {
        eprintln!("Failed to write event {:?}: {}", event, e);
    }
}
/// Smoke test: calls `log` continuously for five seconds while a background
/// thread keeps removing and re-binding a datagram socket in a temporary
/// directory.
///
/// NOTE(review): `log` sends through the global `SOCKET`, which connects to
/// `LOGDW`, not to the temporary socket bound here — confirm this is intended.
#[test]
fn smoke() {
    use crate::Priority;
    use std::time::{Duration, Instant, SystemTime};

    let tempdir = tempfile::tempdir().unwrap();
    let rebind_path = tempdir.path().join("socket");

    // Detached thread that endlessly re-creates the socket file.
    std::thread::spawn(move || loop {
        std::fs::remove_file(&rebind_path).ok();
        let _listener =
            std::os::unix::net::UnixDatagram::bind(&rebind_path).expect("Failed to bind");
        std::thread::sleep(Duration::from_millis(1));
    });

    let deadline = Instant::now() + Duration::from_secs(5);
    while Instant::now() < deadline {
        log(&Record {
            timestamp: SystemTime::now(),
            pid: std::process::id() as u16,
            thread_id: thread::id() as u16,
            buffer_id: Buffer::Main,
            tag: "test",
            priority: Priority::Info,
            message: "test",
        });
    }
}