use ntime::{Duration, Timestamp, sleep};
use std::sync::Mutex;
use std::sync::mpsc;
use std::thread;
use crate::attributes;
use crate::sink::LogUpdate;
use crate::types::{AsyncSinkSender, SinkRef};
const ASYNC_HANDLER_OP_TIMEOUT: Duration = Duration::from_secs(5);
const ASYNC_HANDLER_SPINLOCK_WAIT: Duration = Duration::from_millis(50);
static GLOBAL_ASYNC_HANDLER: Mutex<Option<AsyncSinkHandler>> = Mutex::new(None);
static GLOBAL_ASYNC_HANDLER_REFCOUNT: Mutex<u32> = Mutex::new(0);
pub enum AsyncSinkOp {
Log { sink: SinkRef, update: LogUpdate, attrs: attributes::Map },
FlushSink { sink: SinkRef },
}
struct AsyncSinkHandler {
tx: Option<AsyncSinkSender>,
rx_handler: Option<thread::JoinHandle<()>>,
}
impl AsyncSinkHandler {
fn new() -> Self {
let (tx, rx) = mpsc::channel::<AsyncSinkOp>();
let rx_handler = thread::spawn(move || {
while let Ok(cmd) = rx.recv() {
match cmd {
AsyncSinkOp::Log { sink, update, attrs } => match sink.lock() {
Ok(mut s) => match s.log(&update, &attrs) {
Ok(_) => (),
Err(e) => panic!("async log update {update:?} on sink {name} failed: {e}", name = s.name()),
},
Err(e) => panic!("failed to acquire lock on sink: {e}"),
},
AsyncSinkOp::FlushSink { sink } => match sink.lock() {
Ok(mut s) => match s.flush() {
Ok(_) => (),
Err(e) => panic!("async flush on sink {name} failed: {e}", name = s.name()),
},
Err(e) => panic!("failed to acquire lock on sink: {e}"),
},
};
}
});
Self {
tx: Some(tx),
rx_handler: Some(rx_handler),
}
}
fn get_sender(&self) -> AsyncSinkSender {
match self.tx {
Some(ref tx) => tx.clone(),
None => panic!("tried to get a sender for a closed async queue handler"),
}
}
fn shutdown(&mut self) {
self.tx = None;
match self.rx_handler.take() {
None => panic!("tried to shut down a closed sync queue handler"),
Some(rx_handler) => {
let start = Timestamp::now();
while !rx_handler.is_finished() {
if Timestamp::now().diff_as_duration(&start) > ASYNC_HANDLER_OP_TIMEOUT {
panic!("failed to shut downh AsyncSinkHanlder after {wait:?}", wait = ASYNC_HANDLER_OP_TIMEOUT,);
};
sleep(ASYNC_HANDLER_SPINLOCK_WAIT);
thread::yield_now();
}
}
};
}
}
impl Default for AsyncSinkHandler {
fn default() -> Self {
Self::new()
}
}
impl Drop for AsyncSinkHandler {
fn drop(&mut self) {
self.shutdown()
}
}
fn drop() {
*(GLOBAL_ASYNC_HANDLER.lock().unwrap()) = None;
}
pub fn refcount() -> u32 {
*(GLOBAL_ASYNC_HANDLER_REFCOUNT.lock().unwrap())
}
pub fn inc_refcount() {
*(GLOBAL_ASYNC_HANDLER_REFCOUNT.lock().unwrap()) += 1;
}
pub fn dec_refcount() {
let mut count = GLOBAL_ASYNC_HANDLER_REFCOUNT.lock().unwrap();
if *count == 0 {
panic!("async loggers count decremented below zero");
}
*count -= 1;
if *count == 0 {
drop();
}
}
pub fn get_sender() -> AsyncSinkSender {
GLOBAL_ASYNC_HANDLER.lock().unwrap().get_or_insert_default().get_sender()
}
pub fn log(tx: &AsyncSinkSender, sink: &SinkRef, update: &LogUpdate, attrs: &attributes::Map) {
match tx.send(AsyncSinkOp::Log {
sink: sink.clone(),
update: update.clone(),
attrs: attrs.clone(),
}) {
Ok(_) => (),
Err(e) => {
let sink_name = sink.lock().unwrap().name().to_string();
panic!("failed to queue log update {update:?} + {attrs} on {sink_name}: {e}");
}
};
}
pub fn flush(tx: &AsyncSinkSender, sink: &SinkRef) {
match tx.send(AsyncSinkOp::FlushSink { sink: sink.clone() }) {
Ok(_) => (),
Err(e) => {
let sink_name = sink.lock().unwrap().name().to_string();
panic!("failed to queue flush on {sink_name}: {e}");
}
};
}