mod context;
mod future;
mod tokio;
use crate::debugger::address::Address;
pub use crate::debugger::r#async::future::AsyncFnFutureState;
pub use crate::debugger::r#async::future::Future;
use nix::sys::signal::Signal;
pub use tokio::TokioVersion;
pub use tokio::extract_tokio_version_naive;
pub use tokio::park::BlockThread;
use tokio::task::task_header_state_value_and_ptr;
pub use tokio::worker::Worker;
use super::address::GlobalAddress;
use super::breakpoint::Breakpoint;
use super::debugee::tracer::WatchpointHitType;
use super::register::debug::BreakCondition;
use super::register::debug::BreakSize;
use crate::debugger::address::RelocatedAddress;
use crate::debugger::r#async::context::TokioAnalyzeContext;
use crate::debugger::r#async::future::ParseFutureStateError;
use crate::debugger::r#async::tokio::worker::OwnedList;
use crate::debugger::error::Error::NoFunctionRanges;
use crate::debugger::error::Error::PlaceNotFound;
use crate::debugger::error::Error::ProcessExit;
use crate::debugger::utils::PopIf;
use crate::debugger::variable::dqe::{Dqe, Selector};
use crate::debugger::{Debugger, Error};
use crate::disable_when_not_stared;
use crate::weak_error;
use nix::unistd::Pid;
use std::rc::Rc;
use tokio::park::try_as_park_thread;
use tokio::worker::try_as_worker;
#[derive(Debug, Clone)]
pub struct TaskBacktrace {
pub raw_ptr: RelocatedAddress,
pub task_id: u64,
pub futures: Vec<Future>,
}
#[derive(Debug)]
pub struct AsyncBacktrace {
pub workers: Vec<Worker>,
pub block_threads: Vec<BlockThread>,
pub tasks: Rc<Vec<TaskBacktrace>>,
}
impl AsyncBacktrace {
pub fn current_task(&self) -> Option<&TaskBacktrace> {
let mb_active_block_thread = self.block_threads.iter().find(|t| t.in_focus);
if let Some(bt) = mb_active_block_thread {
Some(&bt.bt)
} else {
let active_worker = self.workers.iter().find(|t| t.in_focus)?;
let active_task_id = active_worker.active_task;
let active_task = if let Some(active_task_id) = active_task_id {
self.tasks.iter().find(|t| t.task_id == active_task_id)
} else {
active_worker.active_task_standby.as_ref()
}?;
Some(active_task)
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum AsyncError {
#[error("Backtrace for thread {0} not found")]
BacktraceShouldExist(Pid),
#[error("Parse future state: {0}")]
ParseFutureState(ParseFutureStateError),
#[error("Incorrect assumption about async runtime: {0}")]
IncorrectAssumption(&'static str),
#[error("Current task not found")]
NoCurrentTaskFound,
#[error(
"Async step are impossible cause watchpoint limit is reached (maximum 4 watchpoints), try to remove unused"
)]
NotEnoughWatchpointsForStep,
}
enum AsyncStepResult {
Done {
task_id: u64,
completed: bool,
},
SignalInterrupt {
signal: Signal,
quiet: bool,
},
WatchpointInterrupt {
pid: Pid,
addr: RelocatedAddress,
ty: WatchpointHitType,
quiet: bool,
},
#[allow(unused)]
Breakpoint {
pid: Pid,
addr: RelocatedAddress,
},
}
impl AsyncStepResult {
fn signal_interrupt_quiet(signal: Signal) -> Self {
Self::SignalInterrupt {
signal,
quiet: true,
}
}
fn signal_interrupt(signal: Signal) -> Self {
Self::SignalInterrupt {
signal,
quiet: false,
}
}
fn wp_interrupt_quite(pid: Pid, addr: RelocatedAddress, ty: WatchpointHitType) -> Self {
Self::WatchpointInterrupt {
pid,
addr,
ty,
quiet: true,
}
}
fn wp_interrupt(pid: Pid, addr: RelocatedAddress, ty: WatchpointHitType) -> Self {
Self::WatchpointInterrupt {
pid,
addr,
ty,
quiet: false,
}
}
}
impl Debugger {
pub fn async_backtrace(&mut self) -> Result<AsyncBacktrace, Error> {
let tokio_version = self.debugee.tokio_version();
disable_when_not_stared!(self);
let ecx = self.ecx().clone();
let threads = self.debugee.thread_state(&ecx)?;
let mut analyze_context = TokioAnalyzeContext::new(self, tokio_version.unwrap_or_default());
let mut backtrace = AsyncBacktrace {
workers: vec![],
block_threads: vec![],
tasks: Rc::new(vec![]),
};
let mut tasks = Rc::new(vec![]);
for thread in threads {
let worker = weak_error!(try_as_worker(&mut analyze_context, &thread));
if let Some(Some(w)) = worker {
if tasks.is_empty() {
let mut context_initialized_var = analyze_context
.debugger()
.read_variable(Dqe::Variable(Selector::by_name("CONTEXT", false)))?;
let context_initialized =
context_initialized_var
.pop_if_single_el()
.ok_or(Error::Async(AsyncError::IncorrectAssumption(
"CONTEXT not found",
)))?;
tasks = Rc::new(
OwnedList::try_extract(&analyze_context, context_initialized)?
.into_iter()
.filter_map(|t| weak_error!(t.backtrace()))
.collect(),
);
backtrace.tasks = tasks.clone();
}
backtrace.workers.push(w);
} else {
let thread = weak_error!(try_as_park_thread(&mut analyze_context, &thread));
if let Some(Some(pt)) = thread {
backtrace.block_threads.push(pt);
}
}
}
self.ecx_swap(ecx);
Ok(backtrace)
}
pub fn async_step_over(&mut self) -> Result<(), Error> {
disable_when_not_stared!(self);
self.ecx_restore_frame()?;
match self.async_step_over_any()? {
AsyncStepResult::Done { task_id, completed } => {
self.execute_on_async_step_hook(task_id, completed)?
}
AsyncStepResult::SignalInterrupt { signal, quiet } if !quiet => {
self.hooks.on_signal(signal);
}
AsyncStepResult::WatchpointInterrupt {
pid,
addr,
ref ty,
quiet,
} if !quiet => self.execute_on_watchpoint_hook(pid, addr, ty)?,
_ => {}
};
Ok(())
}
fn async_step_over_any(&mut self) -> Result<AsyncStepResult, Error> {
let ecx = self.ecx();
let mut current_location = ecx.location();
let async_bt = self.async_backtrace()?;
let current_task = async_bt
.current_task()
.ok_or(AsyncError::NoCurrentTaskFound)?;
let task_id = current_task.task_id;
let initial_task_context = self
.read_variable(Dqe::Variable(Selector::by_name("_task_context", true)))?
.pop_if_single_el()
.and_then(|qr| qr.into_value().into_raw_ptr())
.and_then(|ptr| ptr.value)
.ok_or(AsyncError::IncorrectAssumption(
"`_task_context` local variable should exist",
))? as usize;
let task_ptr = current_task.raw_ptr;
let future_is_waiter = |f: &Future| {
if let Future::TokioJoinHandleFuture(jh_f) = f {
return jh_f.wait_for_task == task_ptr;
}
false
};
let waiter_found = async_bt
.tasks
.iter()
.flat_map(|t| t.futures.iter())
.chain(
async_bt
.block_threads
.iter()
.flat_map(|thread| thread.bt.futures.iter()),
)
.any(future_is_waiter);
let waiter_wp = if waiter_found {
let (_, state_ptr) = task_header_state_value_and_ptr(self, task_ptr)?;
let state_addr = RelocatedAddress::from(state_ptr);
Some(
self.set_watchpoint_on_memory(
state_addr,
BreakSize::Bytes8,
BreakCondition::DataWrites,
true,
)?
.to_owned(),
)
} else {
None
};
let (func, info) = loop {
let dwarf = &self.debugee.debug_info(current_location.pc)?;
if let Ok(Some((func, info))) = dwarf.find_function_by_pc(current_location.global_pc) {
break (func, info);
}
match self.single_step_instruction()? {
Some(super::StopReason::SignalStop(_, sign)) => {
return Ok(AsyncStepResult::signal_interrupt(sign));
}
Some(super::StopReason::Watchpoint(pid, addr, ty)) => {
return Ok(AsyncStepResult::wp_interrupt(pid, addr, ty));
}
_ => {}
}
current_location = self.ecx().location();
};
let prolog = func.prolog()?;
let dwarf = &self.debugee.debug_info(current_location.pc)?;
let inline_ranges = func.inline_ranges();
let current_place = dwarf
.find_place_from_pc(current_location.global_pc)?
.ok_or(PlaceNotFound(current_location.global_pc))?;
let mut step_over_breakpoints = vec![];
let mut to_delete = vec![];
let mut task_completed = false;
let fn_full_name = info.full_name();
for range in func.ranges() {
let mut place = func
.unit()
.find_place_by_pc(GlobalAddress::from(range.begin))
.ok_or_else(|| NoFunctionRanges(fn_full_name.clone()))?;
while place.address.in_range(&range) {
if place.address.in_range(&prolog) {
match place.next() {
None => break,
Some(n) => place = n,
}
continue;
}
let in_inline_range = place.address.in_ranges(&inline_ranges);
if !in_inline_range
&& place.is_stmt
&& place.address != current_place.address
&& place.line_number != current_place.line_number
{
let load_addr = place
.address
.relocate_to_segment_by_pc(&self.debugee, current_location.pc)?;
if self.breakpoints.get_enabled(load_addr).is_none() {
step_over_breakpoints.push(load_addr);
to_delete.push(load_addr);
}
}
match place.next() {
None => break,
Some(n) => place = n,
}
}
}
step_over_breakpoints
.into_iter()
.try_for_each(|load_addr| {
self.breakpoints
.add_and_enable(Breakpoint::new_temporary_async(
dwarf.pathname(),
load_addr,
current_location.pid,
))
.map(|_| ())
})?;
macro_rules! clear {
() => {
to_delete.into_iter().try_for_each(|addr| {
self.remove_breakpoint(Address::Relocated(addr)).map(|_| ())
})?;
if let Some(wp) = waiter_wp {
self.remove_watchpoint_by_addr(wp.address)?;
}
};
}
loop {
let stop_reason = self.continue_execution()?;
match stop_reason {
super::StopReason::SignalStop(_, sign) => {
clear!();
return Ok(AsyncStepResult::signal_interrupt_quiet(sign));
}
super::StopReason::Watchpoint(pid, current_pc, ty) => {
let is_tmp_wp = if let WatchpointHitType::DebugRegister(ref reg) = ty {
self.watchpoints
.all()
.iter()
.any(|wp| wp.register() == Some(*reg) && wp.is_temporary())
} else {
false
};
if is_tmp_wp {
let (value, _) = task_header_state_value_and_ptr(self, task_ptr)?;
if value & tokio::types::complete_flag() == tokio::types::complete_flag() {
task_completed = true;
break;
} else {
continue;
}
} else {
clear!();
return Ok(AsyncStepResult::wp_interrupt_quite(pid, current_pc, ty));
}
}
super::StopReason::DebugeeExit(code) => {
clear!();
return Err(ProcessExit(code));
}
_ => {}
}
let mb_task_context = self
.read_variable(Dqe::Variable(Selector::by_name("_task_context", true)))?
.pop_if_single_el()
.and_then(|qr| qr.into_value().into_raw_ptr())
.and_then(|ptr| ptr.value)
.map(|val| val as usize);
let context_equals: bool = if let Some(task_context) = mb_task_context {
task_context == initial_task_context
} else {
false
};
if context_equals {
break;
}
let async_bt = self.async_backtrace()?;
if !async_bt.tasks.iter().any(|t| t.task_id == task_id) {
break;
}
let current_task = async_bt.current_task();
if current_task.map(|t| t.task_id) == Some(task_id) {
break;
}
}
clear!();
self.ecx_update_location()?;
Ok(AsyncStepResult::Done {
task_id,
completed: task_completed,
})
}
pub fn async_step_out(&mut self) -> Result<(), Error> {
disable_when_not_stared!(self);
self.ecx_restore_frame()?;
match self.step_out_task()? {
AsyncStepResult::Done { task_id, completed } => {
self.execute_on_async_step_hook(task_id, completed)?
}
AsyncStepResult::SignalInterrupt { signal, quiet } if !quiet => {
self.hooks.on_signal(signal);
}
AsyncStepResult::WatchpointInterrupt {
pid,
addr,
ref ty,
quiet,
} if !quiet => self.execute_on_watchpoint_hook(pid, addr, ty)?,
_ => {}
};
Ok(())
}
fn step_out_task(&mut self) -> Result<AsyncStepResult, Error> {
let async_bt = self.async_backtrace()?;
let current_task = async_bt
.current_task()
.ok_or(AsyncError::NoCurrentTaskFound)?;
let task_id = current_task.task_id;
let task_ptr = current_task.raw_ptr;
let (_, state_ptr) = task_header_state_value_and_ptr(self, task_ptr)?;
let state_addr = RelocatedAddress::from(state_ptr);
let wp = self
.set_watchpoint_on_memory(
state_addr,
BreakSize::Bytes8,
BreakCondition::DataWrites,
true,
)?
.to_owned();
self.breakpoints
.active_breakpoints()
.iter()
.for_each(|brkpt| {
_ = brkpt.disable();
});
macro_rules! clear {
() => {
self.breakpoints
.active_breakpoints()
.iter()
.for_each(|brkpt| {
_ = brkpt.enable();
});
self.remove_watchpoint_by_addr(wp.address)?;
};
}
loop {
let stop_reason = self.continue_execution()?;
match stop_reason {
super::StopReason::SignalStop(_, sign) => {
clear!();
return Ok(AsyncStepResult::signal_interrupt_quiet(sign));
}
super::StopReason::Watchpoint(pid, current_pc, ty) => {
let is_tmp_wp = if let WatchpointHitType::DebugRegister(ref reg) = ty {
self.watchpoints
.all()
.iter()
.any(|wp| wp.register() == Some(*reg) && wp.is_temporary())
} else {
false
};
if is_tmp_wp {
let (value, _) = task_header_state_value_and_ptr(self, task_ptr)?;
if value & tokio::types::complete_flag() == tokio::types::complete_flag() {
break;
} else {
continue;
}
} else {
self.remove_watchpoint_by_addr(wp.address)?;
return Ok(AsyncStepResult::wp_interrupt_quite(pid, current_pc, ty));
}
}
super::StopReason::DebugeeExit(code) => {
clear!();
return Err(ProcessExit(code));
}
super::StopReason::Breakpoint(_, _) => {
continue;
}
super::debugee::tracer::StopReason::NoSuchProcess(_) => {
clear!();
debug_assert!(false, "unreachable error `NoSuchProcess`");
return Err(ProcessExit(0));
}
super::debugee::tracer::StopReason::DebugeeStart => {
unreachable!()
}
}
}
clear!();
self.ecx_update_location()?;
Ok(AsyncStepResult::Done {
task_id,
completed: true,
})
}
}