use log::{LevelFilter, Log, Metadata, Record, SetLoggerError};
use serde::{Deserialize, Serialize};
use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// UDP port that `init_default_logger` sends log batches to.
pub const DEFAULT_LOG_UDP_PORT: u16 = 39101;
/// Number of buffered entries at which the sender thread flushes a batch.
const MAX_BATCH_SIZE: usize = 50;
/// Milliseconds the sender thread waits for a new entry; also the age since
/// the last flush at which a non-empty batch is sent.
const BATCH_TIMEOUT_MS: u64 = 20;
/// Capacity of the bounded channel between `UdpLogger` and its sender thread.
const CHANNEL_CAPACITY: usize = 1000;
/// A single log record in the form that is serialized to JSON and sent over UDP.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
/// Milliseconds since the Unix epoch; `from_record` stores 0 when the
/// system clock reads earlier than the epoch.
pub timestamp_ms: u64,
/// Log level as text; `from_record` fills it with `record.level().to_string()`.
pub level: String,
/// Caller-supplied name of the component that produced the entry.
pub source: String,
/// Target of the originating `log::Record`.
pub target: String,
/// Fully formatted log message.
pub message: String,
}
impl LogEntry {
pub fn from_record(record: &Record, source: &str) -> Self {
let timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
Self {
timestamp_ms,
level: record.level().to_string(),
source: source.to_string(),
target: record.target().to_string(),
message: format!("{}", record.args()),
}
}
}
/// `log::Log` implementation that hands entries to a background sender thread
/// through a bounded channel.
pub struct UdpLogger {
// Producer side of the bounded channel drained by the sender thread.
tx: SyncSender<LogEntry>,
// Most verbose level this logger accepts (checked in `enabled`).
level: LevelFilter,
// Value copied into `LogEntry::source` for every record.
source: String,
}
impl UdpLogger {
fn new(server_addr: SocketAddr, level: LevelFilter, source: String) -> Self {
let (tx, rx) = mpsc::sync_channel::<LogEntry>(CHANNEL_CAPACITY);
thread::spawn(move || {
log_sender_thread(rx, server_addr);
});
Self { tx, level, source }
}
}
impl Log for UdpLogger {
    /// Returns `true` when a record at `metadata`'s level passes this logger's filter.
    fn enabled(&self, metadata: &Metadata) -> bool {
        let record_level = metadata.level();
        record_level <= self.level
    }

    /// Converts the record to a `LogEntry` and queues it without blocking.
    fn log(&self, record: &Record) {
        if self.enabled(record.metadata()) {
            let entry = LogEntry::from_record(record, &self.source);
            // Best effort: if the queue is full or the sender thread has gone
            // away, the entry is dropped so the caller never blocks.
            let _ = self.tx.try_send(entry);
        }
    }

    /// No-op: queued entries are sent by the background thread on its own schedule.
    fn flush(&self) {}
}
/// Body of the background thread: drains `rx`, groups entries into batches and
/// sends each batch to `server_addr` over UDP.
///
/// A batch is flushed once it holds `MAX_BATCH_SIZE` entries, or once at least
/// `BATCH_TIMEOUT_MS` milliseconds have passed since the previous flush. When
/// every sender has been dropped, any remaining entries are flushed and the
/// function returns. If the local socket cannot be created the function logs
/// to stderr and returns immediately, dropping `rx`.
fn log_sender_thread(rx: Receiver<LogEntry>, server_addr: SocketAddr) {
    let flush_interval = Duration::from_millis(BATCH_TIMEOUT_MS);

    // The local socket must use the same address family as the destination:
    // an IPv4-bound socket cannot send to an IPv6 address.
    let bind_addr = if server_addr.is_ipv6() {
        "[::]:0"
    } else {
        "0.0.0.0:0"
    };
    // The socket is only ever written to, so no read timeout is configured.
    let socket = match UdpSocket::bind(bind_addr) {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to create UDP socket for logging: {}", e);
            return;
        }
    };

    let mut batch: Vec<LogEntry> = Vec::with_capacity(MAX_BATCH_SIZE);
    let mut last_send = Instant::now();
    loop {
        match rx.recv_timeout(flush_interval) {
            Ok(entry) => batch.push(entry),
            // Nothing arrived; fall through so a stale partial batch is flushed.
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                if !batch.is_empty() {
                    send_batch(&socket, &server_addr, &batch);
                }
                return;
            }
        }

        if batch.is_empty() {
            continue;
        }
        if batch.len() >= MAX_BATCH_SIZE || last_send.elapsed() >= flush_interval {
            send_batch(&socket, &server_addr, &batch);
            batch.clear();
            last_send = Instant::now();
        }
    }
}
/// Serializes `batch` as a JSON array and sends it to `server_addr` in one UDP datagram.
///
/// If the encoded batch is too large for a single datagram it is split in half
/// and each half is sent separately (recursively), so the receiver still sees
/// JSON arrays of entries. Send errors are ignored (logging is best effort);
/// serialization errors are reported on stderr.
fn send_batch(socket: &UdpSocket, server_addr: &SocketAddr, batch: &[LogEntry]) {
    // Largest payload an IPv4 UDP datagram can carry:
    // 65535 minus the 20-byte IP header and the 8-byte UDP header.
    const MAX_DATAGRAM_PAYLOAD: usize = 65_507;

    match serde_json::to_vec(batch) {
        Ok(json) if json.len() > MAX_DATAGRAM_PAYLOAD && batch.len() > 1 => {
            // Sending this as-is would fail and lose every entry in the batch.
            let (front, back) = batch.split_at(batch.len() / 2);
            send_batch(socket, server_addr, front);
            send_batch(socket, server_addr, back);
        }
        Ok(json) => {
            // A single oversized entry cannot be split further; the send is
            // attempted anyway and any failure is ignored.
            let _ = socket.send_to(&json, server_addr);
        }
        Err(e) => {
            eprintln!("Failed to serialize log batch: {}", e);
        }
    }
}
/// Installs a `UdpLogger` as the global `log` logger.
///
/// * `server_host` - IP literal (IPv6 in brackets, e.g. `[::1]`) or host name of the log server.
/// * `port` - UDP port of the log server.
/// * `level` - most verbose level to forward; also set as the global max level.
/// * `source` - name recorded in every entry's `source` field.
///
/// Host names are resolved once, here. If the host cannot be parsed or
/// resolved, a warning is printed to stderr and `127.0.0.1:port` is used.
///
/// # Errors
///
/// Returns `SetLoggerError` if a global logger has already been installed.
pub fn init_udp_logger(
    server_host: &str,
    port: u16,
    level: LevelFilter,
    source: &str,
) -> Result<(), SetLoggerError> {
    let endpoint = format!("{}:{}", server_host, port);
    // `to_socket_addrs` parses socket-address literals directly and performs a
    // lookup only for host names.
    let resolved: Vec<SocketAddr> = std::net::ToSocketAddrs::to_socket_addrs(endpoint.as_str())
        .map(|addrs| addrs.collect())
        .unwrap_or_default();
    // Prefer an IPv4 result when one exists, keeping the historical IPv4
    // default for hosts such as `localhost` that resolve to both families.
    let server_addr = resolved
        .iter()
        .find(|addr| addr.is_ipv4())
        .or_else(|| resolved.first())
        .copied()
        .unwrap_or_else(|| {
            eprintln!(
                "Could not resolve log server address '{}'; falling back to 127.0.0.1:{}",
                endpoint, port
            );
            SocketAddr::from(([127, 0, 0, 1], port))
        });

    let logger = UdpLogger::new(server_addr, level, source.to_string());
    log::set_boxed_logger(Box::new(logger))?;
    log::set_max_level(level);
    Ok(())
}
/// Installs the UDP logger with the defaults: loopback host,
/// `DEFAULT_LOG_UDP_PORT`, `Info` level and the source name `"control"`.
///
/// # Errors
///
/// Propagates the `SetLoggerError` returned by `init_udp_logger`.
pub fn init_default_logger() -> Result<(), SetLoggerError> {
    let host = "127.0.0.1";
    let source = "control";
    init_udp_logger(host, DEFAULT_LOG_UDP_PORT, LevelFilter::Info, source)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `from_record` must copy level, target and formatted message from the
    /// record, take the source from its argument, and stamp a timestamp.
    #[test]
    fn test_log_entry_from_record() {
        // The record is built inline so the `format_args!` temporaries live
        // for the whole call.
        let entry = LogEntry::from_record(
            &Record::builder()
                .args(format_args!("Test message {}", 42))
                .level(log::Level::Warn)
                .target("test::module")
                .build(),
            "test",
        );
        assert_eq!(entry.level, "WARN");
        assert_eq!(entry.source, "test");
        assert_eq!(entry.target, "test::module");
        assert_eq!(entry.message, "Test message 42");
        assert!(entry.timestamp_ms > 0);
    }

    /// A single entry and a batch must serialize to JSON, and a batch must
    /// decode back to the same entries.
    #[test]
    fn test_log_entry_serialization() {
        let entry = LogEntry {
            timestamp_ms: 1234567890,
            level: "DEBUG".to_string(),
            source: "control".to_string(),
            target: "my_program".to_string(),
            message: "Hello world".to_string(),
        };
        let json = serde_json::to_string(&entry).unwrap();
        assert!(json.contains("DEBUG"));
        assert!(json.contains("Hello world"));

        let batch = vec![entry.clone(), entry];
        let batch_json = serde_json::to_string(&batch).unwrap();
        assert!(batch_json.starts_with('['));

        let decoded: Vec<LogEntry> = serde_json::from_str(&batch_json).unwrap();
        assert_eq!(decoded.len(), 2);
        assert_eq!(decoded[0].level, "DEBUG");
        assert_eq!(decoded[0].message, "Hello world");
        assert_eq!(decoded[1].timestamp_ms, 1234567890);
        assert_eq!(decoded[1].target, "my_program");
    }
}