use crate::loom::sync::Arc;
use crate::runtime::context;
use crate::runtime::scheduler::{self, current_thread, Inject};
use crate::task::Id;
use backtrace::BacktraceFrame;
use std::cell::Cell;
use std::collections::VecDeque;
use std::ffi::c_void;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{self, Poll};
mod symbol;
mod trace_impl;
mod tree;
use symbol::Symbol;
use tree::Tree;
use super::{Notified, OwnedTasks, Schedule};
type Backtrace = Vec<BacktraceFrame>;
type SymbolTrace = Vec<Symbol>;
pub(crate) struct Context {
active_frame: Cell<Option<NonNull<Frame>>>,
#[allow(clippy::type_complexity)]
trace_leaf_fn: Cell<Option<NonNull<dyn FnMut(&TraceMeta)>>>,
}
struct Frame {
inner_addr: *const c_void,
parent: Option<NonNull<Frame>>,
}
#[derive(Clone, Debug)]
pub(crate) struct Trace {
backtraces: Vec<Backtrace>,
}
pin_project_lite::pin_project! {
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Root<T> {
#[pin]
future: T,
}
}
const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \
as part of shutting down the current \
thread, so collecting a taskdump is not \
possible.";
impl Context {
pub(crate) const fn new() -> Self {
Context {
active_frame: Cell::new(None),
trace_leaf_fn: Cell::new(None),
}
}
unsafe fn try_with_current<F, R>(f: F) -> Option<R>
where
F: FnOnce(&Self) -> R,
{
unsafe { crate::runtime::context::with_trace(f) }
}
unsafe fn with_current_frame<F, R>(f: F) -> R
where
F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
{
unsafe {
Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL)
}
}
fn current_frame_addr() -> Option<*const c_void> {
unsafe {
Context::try_with_current(|ctx| {
ctx.active_frame
.get()
.map(|frame| frame.as_ref().inner_addr)
})
.flatten()
}
}
fn try_with_current_trace_leaf_fn<F, R>(f: F) -> Option<R>
where
F: for<'a> FnOnce(&'a mut dyn FnMut(&TraceMeta)) -> R,
{
let mut ret = None;
let inner = |context: &Context| {
if let Some(mut trace_leaf_fn) = context.trace_leaf_fn.replace(None) {
let _restore = defer(move || {
context.trace_leaf_fn.set(Some(trace_leaf_fn));
});
ret = Some(f(unsafe { trace_leaf_fn.as_mut() }));
}
};
unsafe { Self::try_with_current(inner) };
ret
}
pub(crate) fn is_tracing() -> bool {
unsafe { Self::try_with_current(|ctx| ctx.trace_leaf_fn.get().is_some()).unwrap_or(false) }
}
}
#[non_exhaustive]
#[derive(Debug)]
pub struct TraceMeta {
pub root_addr: Option<*const c_void>,
pub trace_leaf_addr: *const c_void,
}
pub fn trace_with<FN, FT, R>(f: FN, mut trace_leaf: FT) -> R
where
FN: FnOnce() -> R,
FT: FnMut(&TraceMeta),
{
let trace_leaf_dyn = (&mut trace_leaf) as &mut (dyn FnMut(&TraceMeta) + '_);
let trace_leaf_dyn = unsafe {
std::mem::transmute::<
*mut (dyn FnMut(&TraceMeta) + '_),
*mut (dyn FnMut(&TraceMeta) + 'static),
>(trace_leaf_dyn)
};
let trace_leaf_dyn = unsafe { NonNull::new_unchecked(trace_leaf_dyn) };
let mut old_trace_leaf_fn = None;
unsafe {
Context::try_with_current(|ctx| {
old_trace_leaf_fn = ctx.trace_leaf_fn.replace(Some(trace_leaf_dyn));
})
};
let _restore = defer(move || {
unsafe {
Context::try_with_current(|ctx| {
ctx.trace_leaf_fn.set(old_trace_leaf_fn);
})
};
});
f()
}
impl Trace {
#[inline(never)]
pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
where
F: FnOnce() -> R,
{
trace_impl::capture(f)
}
pub(crate) fn empty() -> Self {
Self { backtraces: vec![] }
}
fn push_backtrace(&mut self, bt: Vec<BacktraceFrame>) {
self.backtraces.push(bt);
}
#[inline(never)]
pub(crate) fn root<F>(future: F) -> Root<F> {
Root { future }
}
pub(crate) fn backtraces(&self) -> &[Backtrace] {
&self.backtraces
}
}
#[inline(never)]
pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
let root_addr = Context::current_frame_addr();
let ret = Context::try_with_current_trace_leaf_fn(|leaf_fn| {
let meta = TraceMeta {
root_addr,
trace_leaf_addr: trace_leaf as *const c_void,
};
leaf_fn(&meta);
context::with_scheduler(|scheduler| {
if let Some(scheduler) = scheduler {
match scheduler {
scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
#[cfg(feature = "rt-multi-thread")]
scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
}
}
});
});
match ret {
Some(()) => Poll::Pending,
None => Poll::Ready(()),
}
}
impl fmt::Display for Trace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Tree::from_trace(self.clone()).fmt(f)
}
}
fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
use std::mem::ManuallyDrop;
struct Defer<F: FnOnce() -> R, R>(ManuallyDrop<F>);
impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
#[inline(always)]
fn drop(&mut self) {
unsafe {
ManuallyDrop::take(&mut self.0)();
}
}
}
Defer(ManuallyDrop::new(f))
}
impl<T: Future> Future for Root<T> {
type Output = T::Output;
#[inline(never)]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
unsafe {
let mut frame = Frame {
inner_addr: Self::poll as *const c_void,
parent: None,
};
Context::with_current_frame(|current| {
frame.parent = current.take();
current.set(Some(NonNull::from(&frame)));
});
let _restore = defer(|| {
Context::with_current_frame(|current| {
current.set(frame.parent);
});
});
let this = self.project();
this.future.poll(cx)
}
}
}
pub(in crate::runtime) fn trace_current_thread(
owned: &OwnedTasks<Arc<current_thread::Handle>>,
local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
injection: &Inject<Arc<current_thread::Handle>>,
) -> Vec<(Id, Trace)> {
let mut dequeued = Vec::new();
while let Some(task) = local.pop_back() {
dequeued.push(task);
}
while let Some(task) = injection.pop() {
dequeued.push(task);
}
trace_owned(owned, dequeued)
}
cfg_rt_multi_thread! {
use crate::loom::sync::Mutex;
use crate::runtime::scheduler::multi_thread;
use crate::runtime::scheduler::multi_thread::Synced;
use crate::runtime::scheduler::inject::Shared;
pub(in crate::runtime) unsafe fn trace_multi_thread(
owned: &OwnedTasks<Arc<multi_thread::Handle>>,
local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>,
synced: &Mutex<Synced>,
injection: &Shared<Arc<multi_thread::Handle>>,
) -> Vec<(Id, Trace)> {
let mut dequeued = Vec::new();
while let Some(notified) = local.pop() {
dequeued.push(notified);
}
let mut synced = synced.lock();
while let Some(notified) = unsafe { injection.pop(&mut synced.inject) } {
dequeued.push(notified);
}
drop(synced);
trace_owned(owned, dequeued)
}
}
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<(Id, Trace)> {
let mut tasks = dequeued;
owned.for_each(|task| {
if let Some(notified) = task.notify_for_tracing() {
tasks.push(notified);
}
});
tasks
.into_iter()
.map(|task| {
let local_notified = owned.assert_owner(task);
let id = local_notified.task.id();
let ((), trace) = Trace::capture(|| local_notified.run());
(id, trace)
})
.collect()
}