use std::io::Write;
use std::thread::JoinHandle;
use std::time::Duration;
use anyhow::Context;
use crossbeam_channel::bounded;
use crossbeam_channel::unbounded;
use crossbeam_channel::SendTimeoutError;
use crossbeam_channel::Sender;
use crate::append::rolling_file::worker::Worker;
use crate::append::rolling_file::Message;
#[derive(Debug)]
pub struct WorkerGuard {
_guard: Option<JoinHandle<()>>,
sender: Sender<Message>,
shutdown: Sender<()>,
shutdown_timeout: Duration,
}
impl WorkerGuard {
fn new(
handle: JoinHandle<()>,
sender: Sender<Message>,
shutdown: Sender<()>,
shutdown_timeout: Option<Duration>,
) -> Self {
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(100);
WorkerGuard {
_guard: Some(handle),
sender,
shutdown,
shutdown_timeout: shutdown_timeout.unwrap_or(DEFAULT_SHUTDOWN_TIMEOUT),
}
}
}
impl Drop for WorkerGuard {
fn drop(&mut self) {
let shutdown_timeout = self.shutdown_timeout;
match self
.sender
.send_timeout(Message::Shutdown, shutdown_timeout)
{
Ok(()) => {
let _ = self.shutdown.send_timeout((), shutdown_timeout);
}
Err(SendTimeoutError::Disconnected(_)) => (),
Err(SendTimeoutError::Timeout(err)) => {
eprintln!("failed to send shutdown signal to logging worker: {err:?}",)
}
}
}
}
#[derive(Clone, Debug)]
pub struct NonBlocking {
sender: Sender<Message>,
}
impl NonBlocking {
fn create<T: Write + Send + 'static>(
writer: T,
thread_name: String,
buffered_lines_limit: Option<usize>,
shutdown_timeout: Option<Duration>,
) -> (NonBlocking, WorkerGuard) {
let (sender, receiver) = match buffered_lines_limit {
Some(cap) => bounded(cap),
None => unbounded(),
};
let (shutdown_sender, shutdown_receiver) = bounded(0);
let worker = Worker::new(writer, receiver, shutdown_receiver);
let worker_guard = WorkerGuard::new(
worker.make_thread(thread_name),
sender.clone(),
shutdown_sender,
shutdown_timeout,
);
(Self { sender }, worker_guard)
}
pub(super) fn send(&self, record: Vec<u8>) -> anyhow::Result<()> {
self.sender
.send(Message::Record(record))
.context("failed to send log message")
}
}
#[derive(Debug)]
pub struct NonBlockingBuilder {
thread_name: String,
buffered_lines_limit: Option<usize>,
shutdown_timeout: Option<Duration>,
}
impl NonBlockingBuilder {
pub fn buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder {
self.buffered_lines_limit = Some(buffered_lines_limit);
self
}
pub fn shutdown_timeout(mut self, shutdown_timeout: Duration) -> NonBlockingBuilder {
self.shutdown_timeout = Some(shutdown_timeout);
self
}
pub fn thread_name(mut self, name: impl Into<String>) -> NonBlockingBuilder {
self.thread_name = name.into();
self
}
pub fn finish<T: Write + Send + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard) {
NonBlocking::create(
writer,
self.thread_name,
self.buffered_lines_limit,
self.shutdown_timeout,
)
}
}
impl Default for NonBlockingBuilder {
fn default() -> Self {
NonBlockingBuilder {
thread_name: "logforth-rolling-file".to_string(),
buffered_lines_limit: None,
shutdown_timeout: None,
}
}
}