rustpython-pylib 0.5.0

A subset of the Python standard library for use with RustPython
Documentation
import argparse
import ast
import asyncio
import asyncio.tools
import concurrent.futures
import contextvars
import inspect
import os
import site
import sys
import threading
import types
import warnings

from _colorize import get_theme
from _pyrepl.console import InteractiveColoredConsole

from . import futures


class AsyncIOInteractiveConsole(InteractiveColoredConsole):
    """Interactive console whose statements are executed on an asyncio loop.

    Statements are compiled with top-level ``await`` enabled.  ``runcode``
    schedules each code object on ``self.loop`` with
    ``call_soon_threadsafe`` and then blocks on a
    ``concurrent.futures.Future`` until a result or an exception arrives.
    """

    def __init__(self, locals, loop):
        """Create a console over the namespace *locals* bound to *loop*."""
        super().__init__(locals, filename="<stdin>")
        # Let the compiler accept ``await`` outside of a coroutine function.
        self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT

        self.loop = loop
        # This single context copy is passed to every callback and task
        # that runcode() schedules.
        self.context = contextvars.copy_context()

    def runcode(self, code):
        """Run *code* on the event loop and wait for its outcome.

        Returns whatever the code evaluates to, or the result of the task
        wrapping it when it evaluates to a coroutine.  On ``SystemExit`` the
        exit code is stored in the module-level ``return_code``, the loop is
        stopped and ``None`` is returned.  For any other exception the error
        is reported and ``self.STATEMENT_FAILED`` is returned.
        """
        global return_code
        outcome = concurrent.futures.Future()

        def callback():
            # Scheduled on self.loop below; reports back through ``outcome``.
            global return_code
            global repl_future
            global keyboard_interrupted

            # Reset the per-statement state shared at module level.
            repl_future = None
            keyboard_interrupted = False

            func = types.FunctionType(code, self.locals)
            try:
                coro = func()
            except SystemExit as se:
                # ``outcome`` is left unresolved on this path; the loop is
                # stopped instead.
                return_code = se.code
                self.loop.stop()
                return
            except BaseException as ex:
                if isinstance(ex, KeyboardInterrupt):
                    keyboard_interrupted = True
                outcome.set_exception(ex)
                return

            if inspect.iscoroutine(coro):
                # Awaitable statement: run it as a task and mirror the
                # task's result or exception onto ``outcome``.
                try:
                    repl_future = self.loop.create_task(
                        coro, context=self.context)
                    futures._chain_future(repl_future, outcome)
                except BaseException as exc:
                    outcome.set_exception(exc)
            else:
                # Plain statement: its value is already final.
                outcome.set_result(coro)

        self.loop.call_soon_threadsafe(callback, context=self.context)

        try:
            return outcome.result()
        except SystemExit as se:
            return_code = se.code
            self.loop.stop()
            return None
        except BaseException:
            if not keyboard_interrupted:
                self.showtraceback()
            elif not CAN_USE_PYREPL:
                self.write("\nKeyboardInterrupt\n")
            return self.STATEMENT_FAILED

class REPLThread(threading.Thread):
    """Thread that runs the interactive console against the module globals.

    It reads the module-level ``console``, ``loop`` and ``CAN_USE_PYREPL``
    and may set the module-level ``return_code``.
    """

    def run(self):
        """Print the banner, run the startup file and start the console.

        When this method finishes, normally or through an exception, the
        "coroutine ... was never awaited" warning is silenced and the event
        loop is asked to stop.
        """
        global return_code

        try:
            banner = "".join((
                f'asyncio REPL {sys.version} on {sys.platform}\n',
                'Use "await" directly instead of "asyncio.run()".\n',
                'Type "help", "copyright", "credits" or "license" ',
                'for more information.\n',
            ))

            console.write(banner)

            startup_path = os.getenv("PYTHONSTARTUP")
            if startup_path:
                sys.audit("cpython.run_startup", startup_path)

                # Imported lazily: only needed when a startup file is set.
                import tokenize
                with tokenize.open(startup_path) as startup_file:
                    startup_code = compile(
                        startup_file.read(), startup_path, "exec")
                    exec(startup_code, console.locals)

            # Echo the implicit "import asyncio" as if it had been typed.
            prompt = getattr(sys, "ps1", ">>> ")
            if CAN_USE_PYREPL:
                syntax = get_theme().syntax
                prompt = f"{syntax.prompt}{prompt}{syntax.reset}"
            console.write(f"{prompt}import asyncio\n")

            if not CAN_USE_PYREPL:
                # The banner was already written above, so pass empty ones.
                console.interact(banner="", exitmsg="")
            else:
                from _pyrepl.simple_interact import (
                    run_multiline_interactive_console,
                )
                try:
                    run_multiline_interactive_console(console)
                except SystemExit:
                    # expected via the `exit` and `quit` commands
                    pass
                except BaseException:
                    # unexpected issue
                    console.showtraceback()
                    console.write("Internal error, ")
                    return_code = 1
        finally:
            warnings.filterwarnings(
                'ignore',
                category=RuntimeWarning,
                message=r'^coroutine .* was never awaited$')

            loop.call_soon_threadsafe(loop.stop)

    def interrupt(self) -> None:
        """Add an empty string to the pyrepl reader's threading hook.

        Does nothing when pyrepl is not in use or the reader has no hook.
        """
        if CAN_USE_PYREPL:
            from _pyrepl.simple_interact import _get_reader
            reader = _get_reader()
            if reader.threading_hook is not None:
                reader.threading_hook.add("")  # type: ignore


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        prog="python3 -m asyncio",
        description="Interactive asyncio shell and CLI tools",
        color=True,
    )
    subparsers = parser.add_subparsers(help="sub-commands", dest="command")
    ps = subparsers.add_parser(
        "ps", help="Display a table of all pending tasks in a process"
    )
    ps.add_argument("pid", type=int, help="Process ID to inspect")
    pstree = subparsers.add_parser(
        "pstree", help="Display a tree of all pending tasks in a process"
    )
    pstree.add_argument("pid", type=int, help="Process ID to inspect")
    args = parser.parse_args()
    match args.command:
        case "ps":
            asyncio.tools.display_awaited_by_tasks_table(args.pid)
            sys.exit(0)
        case "pstree":
            asyncio.tools.display_awaited_by_tasks_tree(args.pid)
            sys.exit(0)
        case None:
            pass  # continue to the interactive shell
        case _:
            # shouldn't happen as an invalid command-line wouldn't parse
            # but let's keep it for the next person adding a command
            print(f"error: unhandled command {args.command}", file=sys.stderr)
            parser.print_usage(file=sys.stderr)
            sys.exit(1)

    sys.audit("cpython.run_stdin")

    if os.getenv('PYTHON_BASIC_REPL'):
        CAN_USE_PYREPL = False
    else:
        from _pyrepl.main import CAN_USE_PYREPL

    return_code = 0
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    repl_locals = {'asyncio': asyncio}
    for key in {'__name__', '__package__',
                '__loader__', '__spec__',
                '__builtins__', '__file__'}:
        repl_locals[key] = locals()[key]

    console = AsyncIOInteractiveConsole(repl_locals, loop)

    repl_future = None
    keyboard_interrupted = False

    try:
        import readline  # NoQA
    except ImportError:
        readline = None

    interactive_hook = getattr(sys, "__interactivehook__", None)

    if interactive_hook is not None:
        sys.audit("cpython.run_interactivehook", interactive_hook)
        interactive_hook()

    if interactive_hook is site.register_readline:
        # Fix the completer function to use the interactive console locals
        try:
            import rlcompleter
        except:
            pass
        else:
            if readline is not None:
                completer = rlcompleter.Completer(console.locals)
                readline.set_completer(completer.complete)

    repl_thread = REPLThread(name="Interactive thread")
    repl_thread.daemon = True
    repl_thread.start()

    while True:
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            keyboard_interrupted = True
            if repl_future and not repl_future.done():
                repl_future.cancel()
            repl_thread.interrupt()
            continue
        else:
            break

    console.write('exiting asyncio REPL...\n')
    sys.exit(return_code)