from __future__ import annotations
import _colorize
from abc import ABC, abstractmethod
import ast
import code
from dataclasses import dataclass, field
import os.path
import sys
TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import IO
from typing import Callable
@dataclass
class Event:
    """A single console event record.

    Instances compare by value and get a generated ``__repr__`` courtesy of
    ``@dataclass``.
    """

    # Event type tag.
    evt: str
    # Text payload carried by the event.
    data: str
    # Raw bytes associated with the event; empty when none are supplied.
    raw: bytes = b""
@dataclass
class Console(ABC):
    """Abstract base class describing the console interface.

    Concrete subclasses must implement every ``@abstractmethod`` below.
    The per-method descriptions are inferred from names and signatures
    only; the authoritative behaviour is whatever the concrete subclass
    implements.

    Note that the hand-written ``__init__`` takes precedence over the one
    ``@dataclass`` would generate, so the dataclass fields (``posxy``,
    ``screen``, ``height``, ``width``) are *not* initialised by the
    constructor.
    """

    posxy: tuple[int, int]
    screen: list[str] = field(default_factory=list)
    height: int = 25
    width: int = 80

    def __init__(
        self,
        f_in: IO[bytes] | int = 0,
        f_out: IO[bytes] | int = 1,
        term: str = "",
        encoding: str = "",
    ):
        """Record the text encoding and the input/output file descriptors.

        ``f_in`` / ``f_out`` may be integer file descriptors or objects
        exposing ``fileno()``.  A falsy ``encoding`` falls back to
        ``sys.getdefaultencoding()``.  ``term`` is accepted but not used
        by this base implementation.
        """
        self.encoding = encoding if encoding else sys.getdefaultencoding()
        self.input_fd = f_in if isinstance(f_in, int) else f_in.fileno()
        self.output_fd = f_out if isinstance(f_out, int) else f_out.fileno()

    @abstractmethod
    def refresh(self, screen: list[str], xy: tuple[int, int]) -> None:
        """Update the display from ``screen`` and the position ``xy``."""

    @abstractmethod
    def prepare(self) -> None:
        """Set the console up for use."""

    @abstractmethod
    def restore(self) -> None:
        """Undo whatever ``prepare`` set up."""

    @abstractmethod
    def move_cursor(self, x: int, y: int) -> None:
        """Move the cursor to ``(x, y)``."""

    @abstractmethod
    def set_cursor_vis(self, visible: bool) -> None:
        """Show or hide the cursor according to ``visible``."""

    @abstractmethod
    def getheightwidth(self) -> tuple[int, int]:
        """Return the console size as a ``(height, width)`` pair."""

    @abstractmethod
    def get_event(self, block: bool = True) -> Event | None:
        """Return the next event, or ``None`` if there is none to return."""

    @abstractmethod
    def push_char(self, char: int | bytes) -> None:
        """Feed a single character (int or bytes) into the console."""

    @abstractmethod
    def beep(self) -> None:
        """Signal the user audibly or visually."""

    @abstractmethod
    def clear(self) -> None:
        """Clear the console display."""

    @abstractmethod
    def finish(self) -> None:
        """Finish the current interaction with the console."""

    @abstractmethod
    def flushoutput(self) -> None:
        """Flush any pending output."""

    @abstractmethod
    def forgetinput(self) -> None:
        """Discard any pending input."""

    @abstractmethod
    def getpending(self) -> Event:
        """Return an ``Event`` describing the pending input."""

    @abstractmethod
    def wait(self, timeout: float | None) -> bool:
        """Wait for input, honouring ``timeout``; report the outcome as a bool."""

    @property
    def input_hook(self) -> Callable[[], int] | None:
        """The current input hook; this base implementation yields ``None``."""
        return None

    @abstractmethod
    def repaint(self) -> None:
        """Redraw the console contents."""
class InteractiveColoredConsole(code.InteractiveConsole):
def __init__(
self,
locals: dict[str, object] | None = None,
filename: str = "<console>",
*,
local_exit: bool = False,
) -> None:
super().__init__(locals=locals, filename=filename, local_exit=local_exit) self.can_colorize = _colorize.can_colorize()
def showsyntaxerror(self, filename=None, **kwargs):
super().showsyntaxerror(filename=filename, **kwargs)
def _excepthook(self, typ, value, tb):
import traceback
lines = traceback.format_exception(
typ, value, tb,
colorize=self.can_colorize,
limit=traceback.BUILTIN_EXCEPTION_LIMIT)
self.write(''.join(lines))
def runsource(self, source, filename="<input>", symbol="single"):
try:
tree = self.compile.compiler(
source,
filename,
"exec",
ast.PyCF_ONLY_AST,
incomplete_input=False,
)
except (SyntaxError, OverflowError, ValueError):
self.showsyntaxerror(filename, source=source)
return False
if tree.body:
*_, last_stmt = tree.body
for stmt in tree.body:
wrapper = ast.Interactive if stmt is last_stmt else ast.Module
the_symbol = symbol if stmt is last_stmt else "exec"
item = wrapper([stmt])
try:
code = self.compile.compiler(item, filename, the_symbol)
except SyntaxError as e:
if e.args[0] == "'await' outside function":
python = os.path.basename(sys.executable)
e.add_note(
f"Try the asyncio REPL ({python} -m asyncio) to use"
f" top-level 'await' and run background asyncio tasks."
)
self.showsyntaxerror(filename, source=source)
return False
except (OverflowError, ValueError):
self.showsyntaxerror(filename, source=source)
return False
if code is None:
return True
self.runcode(code)
return False