use super::TimerSignal;
use crate::{
utils::{check_err, get_time_range},
Result,
};
use std::sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
};
use std::{
thread::{self, JoinHandle},
time::Duration,
};
/// Target string passed to every `log` macro call made by the timer code in this file.
const LOG_TAG: &str = "Pyroscope::Timer";
/// Timer that periodically notifies a set of channel listeners.
///
/// A background thread (see `Timer::initialize`) sends a `TimerSignal` to
/// every sender stored in `txs` each time the underlying kqueue timer fires.
#[derive(Debug, Default)]
pub struct Timer {
/// Senders that receive a `TimerSignal` on every tick. Shared with the
/// timer thread, hence the `Arc<Mutex<..>>`.
txs: Arc<Mutex<Vec<Sender<TimerSignal>>>>,
/// Join handle of the timer thread spawned by `Timer::initialize`;
/// `None` when no thread has been spawned (e.g. a `Default` instance).
pub handle: Option<JoinHandle<Result<()>>>,
}
impl Timer {
pub fn initialize(cycle: Duration) -> Result<Self> {
log::info!(target: LOG_TAG, "Initializing Timer");
let txs = Arc::new(Mutex::new(Vec::new()));
let (tx, _rx): (Sender<TimerSignal>, Receiver<TimerSignal>) = channel();
txs.lock()?.push(tx);
let kqueue = kqueue()?;
let handle = Some({
let txs = txs.clone();
thread::spawn(move || {
let initial_event = Timer::register_initial_expiration(kqueue)?;
Timer::wait_event(kqueue, [initial_event].as_mut_ptr())?;
let loop_event = Timer::register_loop_expiration(kqueue, cycle)?;
loop {
if txs.lock()?.len() == 0 {
log::info!(target: LOG_TAG, "Timer thread terminated");
return Ok(());
}
let from = TimerSignal::NextSnapshot(get_time_range(0)?.from);
txs.lock()?.iter().for_each(|tx| {
match tx.send(from) {
Ok(_) => {
log::trace!(target: LOG_TAG, "Sent event to listener @ {:?}", &tx)
}
Err(_e) => {} }
});
Timer::wait_event(kqueue, [loop_event].as_mut_ptr())?;
}
})
});
Ok(Self { handle, txs })
}
pub fn attach_listener(&mut self, tx: Sender<TimerSignal>) -> Result<()> {
let txs = Arc::clone(&self.txs);
txs.lock()?.push(tx);
Ok(())
}
pub fn drop_listeners(&mut self) -> Result<()> {
let txs = Arc::clone(&self.txs);
txs.lock()?.clear();
Ok(())
}
fn wait_event(kqueue: i32, events: *mut libc::kevent) -> Result<()> {
kevent(kqueue, [].as_mut_ptr(), 0, events, 1, std::ptr::null())?;
Ok(())
}
fn register_initial_expiration(kqueue: i32) -> Result<libc::kevent> {
let first_fire = get_time_range(0)?.until;
let initial_event = libc::kevent {
ident: 1,
filter: libc::EVFILT_TIMER,
flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT,
fflags: libc::NOTE_ABSOLUTE | libc::NOTE_SECONDS,
data: first_fire as isize,
udata: 0 as *mut libc::c_void,
};
kevent(
kqueue,
[initial_event].as_ptr() as *const libc::kevent,
1,
[].as_mut_ptr(),
0,
std::ptr::null(),
)?;
Ok(initial_event)
}
fn register_loop_expiration(kqueue: i32, duration: Duration) -> Result<libc::kevent> {
let loop_event = libc::kevent {
ident: 1,
filter: libc::EVFILT_TIMER,
flags: libc::EV_ADD | libc::EV_ENABLE,
fflags: 0,
data: duration.as_millis() as isize,
udata: 0 as *mut libc::c_void,
};
let _ke = kevent(
kqueue,
[loop_event].as_ptr() as *const libc::kevent,
1,
[].as_mut_ptr(),
0,
std::ptr::null(),
)?;
Ok(loop_event)
}
}
/// Creates a new kernel event queue and returns its descriptor.
///
/// The raw return value of `libc::kqueue` is validated by `check_err`, whose
/// error is returned unchanged.
fn kqueue() -> Result<i32> {
    // SAFETY: `libc::kqueue` takes no arguments and only returns a value.
    let raw = unsafe { libc::kqueue() };
    let descriptor = check_err(raw)?;
    Ok(descriptor as i32)
}
/// Thin wrapper around `libc::kevent` that turns its return code into a
/// `Result` through `check_err` and discards the success value.
///
/// All arguments are forwarded to the system call unchanged: `change` and
/// `c_count` describe the change list, `events` and `e_count` the output
/// event list, and `timeout` the optional wait limit.
fn kevent(
    kqueue: i32,
    change: *const libc::kevent,
    c_count: libc::c_int,
    events: *mut libc::kevent,
    e_count: libc::c_int,
    timeout: *const libc::timespec,
) -> Result<()> {
    // SAFETY: NOTE(review) the pointers are passed through as-is; callers are
    // expected to supply buffers valid for the given counts.
    let status = unsafe { libc::kevent(kqueue, change, c_count, events, e_count, timeout) };
    check_err(status).map(|_| ())
}