use super::TimerSignal;
use crate::{
utils::{check_err, get_time_range},
PyroscopeError, Result,
};
use std::sync::{
mpsc::{channel, Sender},
Arc, Mutex,
};
use std::{
thread::{self, JoinHandle},
time::Duration,
};
/// Target string passed to the `log` macros issued by the timer code below.
const LOG_TAG: &str = "Pyroscope::Timer";
/// Periodic timer that notifies attached listeners from a background thread.
///
/// Built by `Timer::initialize`, which arms a timerfd, registers it with an
/// epoll instance and spawns the thread that sends a `TimerSignal` to every
/// listener each time the timer fires.
#[derive(Debug)]
pub struct Timer {
/// Senders of all attached listeners; shared with the timer thread, which
/// terminates once it finds this list empty.
txs: Arc<Mutex<Vec<Sender<TimerSignal>>>>,
/// Join handle of the timer thread. `Timer::initialize` always stores `Some`.
pub handle: Option<JoinHandle<Result<()>>>,
}
impl Timer {
pub fn initialize(cycle: Duration) -> Result<Self> {
log::info!(target: LOG_TAG, "Initializing Timer");
let txs = Arc::new(Mutex::new(Vec::new()));
let (tx, _rx) = channel();
txs.lock()?.push(tx);
let timer_fd = Timer::set_timerfd(cycle)?;
let epoll_fd = Timer::create_epollfd(timer_fd)?;
let handle = Some({
let txs = txs.clone();
thread::spawn(move || {
loop {
if txs.lock()?.is_empty() {
unsafe { libc::close(timer_fd) };
unsafe { libc::close(epoll_fd) };
log::info!(target: LOG_TAG, "Timer thread terminated");
return Ok::<_, PyroscopeError>(());
}
let res = Timer::epoll_wait(timer_fd, epoll_fd);
if matches!(&res, Err(PyroscopeError::Io(err)) if err.kind() == std::io::ErrorKind::Interrupted)
{
continue;
}
res?;
let from = TimerSignal::NextSnapshot(get_time_range(0)?.from);
log::trace!(target: LOG_TAG, "Timer fired @ {}", from);
txs.lock()?.iter().for_each(|tx| {
match tx.send(from) {
Ok(_) => {
log::trace!(target: LOG_TAG, "Sent event to listener @ {:?}", &tx)
}
Err(_e) => {} }
});
}
})
});
Ok(Self { handle, txs })
}
fn set_timerfd(cycle: Duration) -> Result<libc::c_int> {
let clockid: libc::clockid_t = libc::CLOCK_REALTIME;
let clock_flags: libc::c_int = libc::TFD_NONBLOCK;
let tfd = timerfd_create(clockid, clock_flags)?;
let first_fire = get_time_range(0)?.until;
let mut new_value = libc::itimerspec {
it_interval: libc::timespec {
tv_sec: cycle.as_secs() as i64,
tv_nsec: cycle.subsec_nanos() as i64,
},
it_value: libc::timespec {
tv_sec: first_fire as i64,
tv_nsec: 0,
},
};
let mut old_value = libc::itimerspec {
it_interval: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
it_value: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
};
let set_flags = libc::TFD_TIMER_ABSTIME;
timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value)?;
Ok(tfd)
}
fn create_epollfd(timer_fd: libc::c_int) -> Result<libc::c_int> {
let epoll_fd = epoll_create1(0)?;
let mut event = libc::epoll_event {
events: libc::EPOLLIN as u32,
u64: 1,
};
let epoll_flags = libc::EPOLL_CTL_ADD;
epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event)?;
Ok(epoll_fd)
}
fn epoll_wait(timer_fd: libc::c_int, epoll_fd: libc::c_int) -> Result<()> {
let mut events = Vec::with_capacity(1);
unsafe {
epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1)?;
}
let mut buffer: u64 = 0;
let bufptr: *mut _ = &mut buffer;
unsafe {
read(timer_fd, bufptr as *mut libc::c_void, 8)?;
}
Ok(())
}
pub fn attach_listener(&mut self, tx: Sender<TimerSignal>) -> Result<()> {
let txs = Arc::clone(&self.txs);
txs.lock()?.push(tx);
Ok(())
}
pub fn drop_listeners(&mut self) -> Result<()> {
let txs = Arc::clone(&self.txs);
txs.lock()?.clear();
Ok(())
}
}
/// Wrapper around `timerfd_create(2)`.
///
/// Passes `clockid` and `clock_flags` to libc unchanged and returns the result
/// of `check_err` applied to the raw return value.
pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result<i32> {
    // SAFETY: the call takes only plain integers; no pointers are involved.
    let raw_fd = unsafe { libc::timerfd_create(clockid, clock_flags) };
    check_err(raw_fd)
}
/// Wrapper around `timerfd_settime(2)`.
///
/// Arms or disarms `timer_fd` according to `new_value`; `old_value` is handed
/// to libc to receive the previous setting. The raw return value is checked
/// with `check_err` and then discarded.
pub fn timerfd_settime(
    timer_fd: i32,
    set_flags: libc::c_int,
    new_value: &mut libc::itimerspec,
    old_value: &mut libc::itimerspec,
) -> Result<()> {
    // SAFETY: both pointers come from live exclusive references.
    let status = unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) };
    check_err(status).map(|_| ())
}
/// Wrapper around `epoll_create1(2)`.
///
/// Passes `epoll_flags` to libc unchanged and returns the result of
/// `check_err` applied to the raw return value.
pub fn epoll_create1(epoll_flags: libc::c_int) -> Result<i32> {
    // SAFETY: the call takes a single integer argument.
    let raw_fd = unsafe { libc::epoll_create1(epoll_flags) };
    check_err(raw_fd)
}
/// Wrapper around `epoll_ctl(2)`.
///
/// Applies the operation `epoll_flags` (e.g. `EPOLL_CTL_ADD`) for `timer_fd`
/// on the epoll instance `epoll_fd`, using `event` as the event description.
/// The raw return value is checked with `check_err` and then discarded.
pub fn epoll_ctl(
    epoll_fd: i32,
    epoll_flags: libc::c_int,
    timer_fd: i32,
    event: &mut libc::epoll_event,
) -> Result<()> {
    // SAFETY: `event` is a live exclusive reference, so the pointer is valid.
    let status = unsafe { libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) };
    check_err(status).map(|_| ())
}
/// Wrapper around `epoll_wait(2)`.
///
/// Waits on `epoll_fd` for up to `timeout` milliseconds (`-1` blocks without
/// a timeout). The number of ready events is checked with `check_err` and
/// then discarded.
///
/// # Safety
///
/// `events` must be valid for writes of `maxevents` consecutive
/// `libc::epoll_event` values.
pub unsafe fn epoll_wait(
    epoll_fd: i32,
    events: *mut libc::epoll_event,
    maxevents: libc::c_int,
    timeout: libc::c_int,
) -> Result<()> {
    let ready = libc::epoll_wait(epoll_fd, events, maxevents, timeout);
    check_err(ready).map(|_| ())
}
/// Wrapper around `read(2)`.
///
/// Reads up to `count` bytes from `timer_fd` into `bufptr`. The byte count
/// returned by libc is checked with `check_err` and then discarded, so a short
/// read is not reported to the caller.
///
/// # Safety
///
/// `bufptr` must be valid for writes of `count` bytes.
pub unsafe fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> {
    let bytes_read = libc::read(timer_fd, bufptr, count);
    check_err(bytes_read).map(|_| ())
}