import dataclasses
import io
import sys
import types
from . import events
from . import futures
from . import tasks
# Names exported by ``from <this module> import *``.
__all__ = (
'capture_call_graph',
'format_call_graph',
'print_call_graph',
'FrameCallGraphEntry',
'FutureCallGraph',
)
@dataclasses.dataclass(frozen=True, slots=True)
class FrameCallGraphEntry:
frame: types.FrameType
@dataclasses.dataclass(frozen=True, slots=True)
class FutureCallGraph:
    """Node of an async call graph: a future, its stack and its awaiters.

    Immutable and slotted.  The structure is recursive: every element of
    ``awaited_by`` is itself a ``FutureCallGraph``.
    """

    # The future (or task) this node describes.
    future: futures.Future
    # Call-stack entries captured for ``future``.
    call_stack: tuple["FrameCallGraphEntry", ...]
    # One graph node per future recorded as awaiting ``future``.
    awaited_by: tuple["FutureCallGraph", ...]
def _build_graph_for_future(
    future: futures.Future,
    *,
    limit: int | None = None,
) -> FutureCallGraph:
    """Build the call graph of *future* from its suspended coroutine chain.

    If *future* has a ``get_coro()`` method, the returned coroutine is
    followed through ``cr_await`` / ``ag_await`` links and one entry is
    recorded per coroutine or async generator in the chain.  The function
    then recurses into every future listed in
    ``future._asyncio_awaited_by``, passing the same *limit*.

    Parameters:
        future: the future to inspect.
        limit: ``None`` keeps every collected entry.  ``0`` skips the
            coroutine walk altogether.  A positive value keeps the first
            ``limit`` entries in walk order (starting from the coroutine
            returned by ``get_coro()``); a negative value keeps the last
            ``abs(limit)`` entries.

    Returns:
        A ``FutureCallGraph`` whose ``call_stack`` is in reverse walk
        order, i.e. the most deeply awaited object comes first.

    Raises:
        TypeError: if *future* is not a ``futures.Future`` instance.
    """
    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    coro = None
    if get_coro := getattr(future, 'get_coro', None):
        # With limit == 0 no stack entry is wanted, so don't even fetch
        # the coroutine.
        coro = get_coro() if limit != 0 else None

    st: list[FrameCallGraphEntry] = []
    while coro is not None:
        if hasattr(coro, 'cr_await'):
            # Coroutine-like object: frame and awaited object are exposed
            # under the ``cr_`` prefix.
            st.append(FrameCallGraphEntry(coro.cr_frame))
            coro = coro.cr_await
        elif hasattr(coro, 'ag_await'):
            # Async-generator-like object: its frame lives in ``ag_frame``.
            # Reading ``cr_frame`` here would raise AttributeError.
            st.append(FrameCallGraphEntry(coro.ag_frame))
            coro = coro.ag_await
        else:
            # Anything else cannot be followed further.
            break

    awaiters = future._asyncio_awaited_by
    awaited_by: list[FutureCallGraph] = (
        [_build_graph_for_future(parent, limit=limit) for parent in awaiters]
        if awaiters
        else []
    )

    if limit is not None:
        if limit > 0:
            st = st[:limit]
        elif limit < 0:
            st = st[limit:]
    # Entries were collected starting from the coroutine returned by
    # get_coro(); flip them so the most deeply awaited one comes first.
    st.reverse()
    return FutureCallGraph(future, tuple(st), tuple(awaited_by))
def capture_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> FutureCallGraph | None:
    """Capture the async call graph of *future* or of the current task.

    When *future* is given and is not the task currently running in the
    running event loop (or no loop is running at all), the graph is built
    by ``_build_graph_for_future()``.  Otherwise the call stack is read
    from the live Python frames, starting ``depth`` frames above this
    function, and the futures awaiting the task are added through
    ``_build_graph_for_future()``.

    Parameters:
        future: the future to inspect; ``None`` means the current task.
        depth: passed to ``sys._getframe()`` to pick the first frame.
        limit: ``None`` keeps all captured frames and ``0`` captures none.
            A positive value keeps the last ``limit`` captured frames
            (those furthest along the ``f_back`` chain); a negative value
            keeps the first ``abs(limit)`` captured frames.  The same
            value is forwarded when building the graphs of awaiters.

    Returns:
        A ``FutureCallGraph``, or ``None`` when no *future* was given and
        no current task could be determined.

    Raises:
        RuntimeError: no *future* was given and no event loop is running.
        TypeError: the inspected object is not a ``futures.Future``.
    """
    loop = events._get_running_loop()

    if future is None:
        if loop is None:
            raise RuntimeError(
                'capture_call_graph() is called outside of a running '
                'event loop and no *future* to introspect was provided')
        future = tasks.current_task(loop=loop)
        if future is None:
            # No current task to describe.
            return None
    elif loop is None or future is not tasks.current_task(loop=loop):
        # Not the running task: there is no live frame stack to walk.
        return _build_graph_for_future(future, limit=limit)

    if not isinstance(future, futures.Future):
        raise TypeError(
            f"{future!r} object does not appear to be compatible "
            f"with asyncio.Future"
        )

    frames: list[FrameCallGraphEntry] = []
    # NOTE: sys._getframe() must be called from this very function so that
    # *depth* is counted relative to it.
    frame = sys._getframe(depth) if limit != 0 else None
    caller = None
    try:
        while frame is not None:
            frames.append(FrameCallGraphEntry(frame))
            caller = frame.f_back
            # Stop right after a generator-backed frame whose caller is a
            # plain (non-generator) frame.
            if (
                frame.f_generator is not None
                and caller is not None
                and caller.f_generator is None
            ):
                break
            frame = caller
    finally:
        # Drop the local references to frame objects.
        del frame, caller

    parents = future._asyncio_awaited_by
    awaiters = (
        [_build_graph_for_future(parent, limit=limit) for parent in parents]
        if parents
        else []
    )

    if limit is not None:
        if limit < 0:
            frames = frames[:-limit]
        elif limit > 0:
            frames = frames[-limit:]

    return FutureCallGraph(future, tuple(frames), tuple(awaiters))
def format_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    depth: int = 1,
    limit: int | None = None,
) -> str:
    """Return the async call graph of *future* rendered as text.

    The graph is obtained from ``capture_call_graph()``; *future* and
    *limit* are forwarded unchanged and *depth* is increased by one to
    account for this function's own frame.  An empty string is returned
    when no graph could be captured.

    Each node is rendered as a ``* Task(...)`` or ``* Future(...)`` header,
    followed by an optional ``+ Call stack:`` section with one line per
    frame and an optional ``+ Awaited by:`` section containing the
    awaiting nodes rendered one level deeper.
    """

    def describe(entry: FrameCallGraphEntry) -> str:
        # Format one call-stack entry as a "File ..., line ..., in ..." line.
        frame = entry.frame
        gen = frame.f_generator
        if gen is None:
            # Plain function frame.
            return (
                f' | File {frame.f_code.co_filename!r},'
                f' line {frame.f_lineno}, in'
                f' {frame.f_code.co_qualname}()'
            )

        # Frame owned by a coroutine / async generator / generator: probe
        # the attribute prefixes in that order and tag the line accordingly.
        try:
            frame = gen.cr_frame
            code = gen.cr_code
            tag = 'async'
        except AttributeError:
            try:
                frame = gen.ag_frame
                code = gen.ag_code
                tag = 'async generator'
            except AttributeError:
                frame = gen.gi_frame
                code = gen.gi_code
                tag = 'generator'
        return (
            f' | File {frame.f_code.co_filename!r},'
            f' line {frame.f_lineno}, in'
            f' {tag} {code.co_qualname}()'
        )

    def render(node: FutureCallGraph, out: list[str], level: int) -> None:
        # Append the lines for *node* and, recursively, for its awaiters.
        pad = level * ' '
        fut = node.future
        if isinstance(fut, tasks.Task):
            header = f'* Task(name={fut.get_name()!r}, id={id(fut):#x})'
        else:
            header = f'* Future(id={id(fut):#x})'
        out.append(pad + header)

        if node.call_stack:
            out.append(pad + ' + Call stack:')
            for entry in node.call_stack:
                out.append(pad + describe(entry))

        if node.awaited_by:
            out.append(pad + ' + Awaited by:')
            for awaiter in node.awaited_by:
                render(awaiter, out, level + 1)

    # Called directly from this frame so that ``depth + 1`` skips it.
    graph = capture_call_graph(future, depth=depth + 1, limit=limit)
    if graph is None:
        return ""

    lines: list[str] = []
    try:
        render(graph, lines, 0)
    finally:
        # The graph holds frame objects; drop the local reference to it.
        del graph
    return '\n'.join(lines)
def print_call_graph(
    future: futures.Future | None = None,
    /,
    *,
    file: io.Writer[str] | None = None,
    depth: int = 1,
    limit: int | None = None,
) -> None:
    """Print the async call graph of *future* or of the current task.

    The text comes from ``format_call_graph()``, to which *future*,
    *depth* and *limit* are forwarded unchanged; it is written with
    ``print()`` to *file*.
    """
    # NOTE(review): *depth* is forwarded as-is, while format_call_graph()
    # adds one for its own frame; with the default depth the first frame
    # captured therefore appears to be this function's own - confirm that
    # this is intended.
    rendered = format_call_graph(future, depth=depth, limit=limit)
    print(rendered, file=file)