extern crate libc;
extern crate nix;
extern crate threadinfo;
extern crate unwind_sys;
use self::{
nix::sys::signal::{sigaction, SaFlags, SigAction, SigHandler, SigSet, Signal},
threadinfo::Thread,
unwind_sys::*,
};
use std::{cell::UnsafeCell, fs, io, mem, process};
use types::{Frame, Sample, Unwinder};
struct PosixSemaphore {
sem: UnsafeCell<libc::sem_t>,
}
impl PosixSemaphore {
#[allow(deprecated)]
pub fn new(value: u32) -> io::Result<Self> {
let mut sem: libc::sem_t = unsafe { mem::uninitialized() };
let r = unsafe {
libc::sem_init(&mut sem, 0 , value)
};
if r == -1 {
return Err(io::Error::last_os_error());
}
Ok(PosixSemaphore {
sem: UnsafeCell::new(sem),
})
}
pub fn post(&self) -> io::Result<()> {
if unsafe { libc::sem_post(self.sem.get()) } == 0 {
Ok(())
} else {
Err(io::Error::last_os_error())
}
}
pub fn wait(&self) -> io::Result<()> {
if unsafe { libc::sem_wait(self.sem.get()) } == 0 {
Ok(())
} else {
Err(io::Error::last_os_error())
}
}
pub fn wait_through_intr(&self) -> io::Result<()> {
loop {
match self.wait() {
Err(os_error) => {
let err = os_error.raw_os_error().expect("os error");
if err == libc::EINTR {
continue;
}
return Err(os_error);
}
_ => return Ok(()),
}
}
}
}
unsafe impl Sync for PosixSemaphore {}
impl Drop for PosixSemaphore {
fn drop(&mut self) {
unsafe { libc::sem_destroy(self.sem.get()) };
}
}
struct SharedState {
msg2: Option<PosixSemaphore>,
msg3: Option<PosixSemaphore>,
msg4: Option<PosixSemaphore>,
context: Option<libc::ucontext_t>,
}
static mut SHARED_STATE: SharedState = SharedState {
msg2: None,
msg3: None,
msg4: None,
context: None,
};
fn clear_shared_state() {
unsafe {
SHARED_STATE.msg2 = None;
SHARED_STATE.msg3 = None;
SHARED_STATE.msg4 = None;
SHARED_STATE.context = None;
}
}
fn reset_shared_state() {
unsafe {
SHARED_STATE.msg2 = Some(PosixSemaphore::new(0).expect("valid semaphore"));
SHARED_STATE.msg3 = Some(PosixSemaphore::new(0).expect("valid semaphore"));
SHARED_STATE.msg4 = Some(PosixSemaphore::new(0).expect("valid semaphore"));
SHARED_STATE.context = None;
}
}
pub struct Sampler {
old_handler: SigAction,
}
impl Sampler {
pub fn new() -> Self {
let handler = SigHandler::SigAction(sigprof_handler);
let action = SigAction::new(
handler,
SaFlags::SA_RESTART | SaFlags::SA_SIGINFO,
SigSet::empty(),
);
let old = unsafe { sigaction(Signal::SIGPROF, &action).expect("signal handler set") };
Sampler { old_handler: old }
}
pub fn suspend_and_resume_thread<F, T>(&self, thread: Thread, callback: F) -> T
where
F: FnOnce(&mut libc::ucontext_t) -> T,
{
debug_assert!(!thread.is_current_thread(), "Can't suspend sampler itself!");
reset_shared_state();
thread.send_signal(libc::SIGPROF);
unsafe {
SHARED_STATE
.msg2
.as_ref()
.unwrap()
.wait_through_intr()
.expect("msg2 wait succeeded");
}
let results = unsafe { callback(&mut SHARED_STATE.context.expect("valid context")) };
unsafe {
SHARED_STATE.msg3.as_ref().unwrap().post().expect("posted");
}
unsafe {
SHARED_STATE
.msg4
.as_ref()
.unwrap()
.wait_through_intr()
.expect("msg4 wait succeeded");
}
clear_shared_state();
results
}
}
impl Default for Sampler {
fn default() -> Self {
Self::new()
}
}
impl Drop for Sampler {
fn drop(&mut self) {
unsafe {
sigaction(Signal::SIGPROF, &self.old_handler).expect("previous signal handler restored")
};
}
}
extern "C" fn sigprof_handler(
sig: libc::c_int,
_info: *mut libc::siginfo_t,
ctx: *mut libc::c_void,
) {
assert_eq!(sig, libc::SIGPROF);
unsafe {
let context: libc::ucontext_t = *(ctx as *mut libc::ucontext_t);
SHARED_STATE.context = Some(context);
SHARED_STATE.msg2.as_ref().unwrap().post().expect("posted");
SHARED_STATE
.msg3
.as_ref()
.unwrap()
.wait_through_intr()
.expect("msg3 wait succeeded");
SHARED_STATE.msg4.as_ref().unwrap().post().expect("posted");
}
}
pub struct LibunwindUnwinder {
frames: Sample,
}
impl LibunwindUnwinder {
pub fn new(max_frames: usize) -> Self {
Self {
frames: Vec::with_capacity(max_frames),
}
}
}
impl Unwinder<&mut libc::ucontext_t> for LibunwindUnwinder {
#[allow(deprecated)]
fn unwind(mut self, context: &mut libc::ucontext_t) -> Result<Sample, i32> {
let mut cursor: unw_cursor_t = unsafe { mem::uninitialized() };
let init = unsafe { unw_init_local(&mut cursor, context) };
if init < 0 {
return Err(init);
}
loop {
if self.frames.len() == self.frames.capacity() {
break;
}
let step = unsafe { unw_step(&mut cursor) };
if step == 0 {
break;
} else if step < 0 {
return Err(step);
}
let mut ip = 0;
let rr = unsafe { unw_get_reg(&mut cursor, UNW_REG_IP, &mut ip) };
if rr < 0 {
return Err(rr);
}
let frame = Frame { ip };
self.frames.push(frame);
}
Ok(self.frames)
}
}
#[cfg(test)]
mod tests {
extern crate libc;
extern crate nix;
extern crate rustc_demangle;
extern crate std;
use super::*;
use self::rustc_demangle::demangle;
use std::{
sync::{mpsc::channel, Arc},
thread::spawn,
};
static mut SIGNAL_RECEIVED: bool = false;
extern "C" fn acknowledge_sigprof(
sig: libc::c_int,
_info: *mut libc::siginfo_t,
_ctx: *mut libc::c_void,
) {
assert_eq!(sig, libc::SIGPROF);
unsafe {
SIGNAL_RECEIVED = true;
}
}
#[test]
fn test_sigprof() {
let handler = SigHandler::SigAction(acknowledge_sigprof);
let action = SigAction::new(
handler,
SaFlags::SA_RESTART | SaFlags::SA_SIGINFO,
SigSet::empty(),
);
unsafe {
sigaction(Signal::SIGPROF, &action).expect("signal handler set");
}
let (tx, rx) = channel();
let (tx2, rx2) = channel();
let handle = spawn(move || {
let tid = threadinfo::current_thread().unwrap();
tx.send(tid).unwrap();
rx2.recv().unwrap();
});
let to = rx.recv().unwrap();
to.send_signal(libc::SIGPROF);
tx2.send(()).unwrap();
handle.join().expect("successful join");
unsafe {
assert!(SIGNAL_RECEIVED);
}
}
#[test]
fn test_semaphore() {
let semaphore = Arc::new(PosixSemaphore::new(0).expect("init"));
let semaphoret = semaphore.clone();
let handle = spawn(move || {
semaphoret.post().expect("posted");
});
semaphore.wait().expect("received wait");
handle.join().expect("successful join");
}
#[test]
fn test_suspend_resume() {
let sampler = Sampler::new();
let (tx, rx) = channel();
let (tx2, rx2) = channel();
let handle = spawn(move || {
tx.send(threadinfo::current_thread().unwrap()).unwrap();
rx2.recv().unwrap();
});
let to = rx.recv().unwrap();
sampler.suspend_and_resume_thread(to, |context| {
assert!(context.uc_stack.ss_size > 0);
tx2.send(()).unwrap();
});
handle.join().unwrap();
unsafe {
assert!(SHARED_STATE.context.is_none());
}
}
#[test]
#[should_panic]
fn test_suspend_resume_itself() {
let sampler = Sampler::new();
let to = threadinfo::current_thread().unwrap();
sampler.suspend_and_resume_thread(to, |_| {});
}
#[test]
#[ignore] #[allow(deprecated)]
fn test_suspend_resume_unwind() {
let sampler = Sampler::new();
let (tx, rx) = channel();
let (tx2, rx2) = channel();
let handle = spawn(move || {
let baz = || {
tx.send(threadinfo::current_thread().unwrap()).unwrap();
rx2.recv().unwrap();
};
let bar = || {
baz();
};
let foo = || {
bar();
};
foo();
});
let to = rx.recv().unwrap();
sampler.suspend_and_resume_thread(to, |context| unsafe {
assert!(context.uc_stack.ss_size > 0);
let mut cursor: unw_cursor_t = mem::uninitialized();
let mut offset = 0;
unw_init_local(&mut cursor, context);
while unw_step(&mut cursor) > 0 {
let mut buf = vec![0; 256];
let r = unw_get_proc_name(
&mut cursor,
buf.as_mut_ptr() as *mut i8,
buf.len(),
&mut offset,
);
if r < 0 {
eprintln!("error {}", r);
} else {
let len = buf.iter().position(|b| *b == 0).unwrap();
buf.truncate(len);
let name = String::from_utf8_lossy(&buf).into_owned();
eprintln!("fn {:#}", demangle(&name));
}
}
tx2.send(()).unwrap();
});
handle.join().unwrap();
unsafe {
assert!(SHARED_STATE.context.is_none());
}
}
}