import functools
from typing import Any, Callable, Optional, TypeVar, Union
# Type variable so the decorator is annotated as returning the same kind of
# callable it was given.
F = TypeVar("F", bound=Callable[..., Any])


def replace_me(
    since: Optional[Union[tuple[int, ...], str]] = None,
    remove_in: Optional[Union[tuple[int, ...], str]] = None,
) -> Callable[[F], F]:
    """Build a decorator that marks a function, coroutine or class as deprecated.

    The decorator appends a ``.. deprecated::`` notice to the target's
    docstring and emits a ``DeprecationWarning`` each time the target is
    called (for classes: each time ``__init__`` runs).  When the target is a
    function whose body, ignoring docstrings and other constant expression
    statements, is a single ``return <expr>``, the warning also suggests
    ``<expr>`` as the replacement, with the call's actual arguments
    substituted for the parameter names where possible.

    Args:
        since: Version in which the target was deprecated, as a string or a
            tuple of ints.  Tuples are rendered dotted, e.g. ``(1, 2)`` ->
            ``"1.2"``.  Falsy values mean "not specified".
        remove_in: Version in which the target will be removed; same accepted
            forms as ``since``.  Only used in the docstring notice.

    Returns:
        A decorator.  Functions and coroutine functions are wrapped; classes
        are returned as the same object with ``__init__`` replaced.
    """

    def format_version(version: Union[tuple[int, ...], str]) -> str:
        """Render a version given as a tuple or a string as plain text."""
        if isinstance(version, tuple):
            return ".".join(str(part) for part in version)
        return f"{version}"

    since_text = format_version(since) if since else ""
    remove_in_text = format_version(remove_in) if remove_in else ""

    def function_decorator(callable: F) -> F:
        import ast
        import inspect
        import logging
        import textwrap
        import warnings

        # ``callable`` shadows the builtin; the parameter name is kept for
        # backward compatibility and aliased for use below.
        target = callable

        deprecation_notice = ".. deprecated::"
        if since_text:
            deprecation_notice += f" {since_text}"
        deprecation_notice += "\n This function is deprecated."
        if remove_in_text:
            deprecation_notice += (
                f" It will be removed in version {remove_in_text}."
            )
        if target.__doc__:
            target.__doc__ += "\n\n" + deprecation_notice
        else:
            target.__doc__ = deprecation_notice

        @functools.cache
        def analyze_replacement() -> Optional[tuple[str, tuple[str, ...]]]:
            """Return ``(replacement expression, positional parameter names)``.

            Returns ``None`` when the target has no usable single-return
            body.  The result is cached so the source is read and parsed at
            most once per decorated target instead of on every call.
            """
            try:
                source = inspect.getsource(target)
                tree = ast.parse(textwrap.dedent(source))
            except (OSError, TypeError, SyntaxError):
                # Source unavailable or not parseable on its own (e.g. code
                # created with exec, dynamically built classes).  Emitting a
                # warning must never make the call itself fail.
                return None
            if not tree.body:
                return None
            func_def = tree.body[0]
            if not isinstance(func_def, (ast.FunctionDef, ast.AsyncFunctionDef)):
                return None
            # Drop docstrings and other bare constant expression statements.
            stmts = [
                node
                for node in func_def.body
                if not (
                    isinstance(node, ast.Expr)
                    and isinstance(node.value, ast.Constant)
                )
            ]
            if len(stmts) != 1:
                return None
            stmt = stmts[0]
            if not isinstance(stmt, ast.Return) or stmt.value is None:
                return None
            # Positional-only parameters come first in call order, so they
            # must be included to keep positional arguments aligned.
            positional = tuple(
                param.arg
                for param in (*func_def.args.posonlyargs, *func_def.args.args)
            )
            return ast.unparse(stmt.value), positional

        def describe_replacement(
            args: tuple[Any, ...], kwargs: dict[str, Any]
        ) -> Optional[str]:
            """Return the suggested replacement text for this call, if any."""
            analysis = analyze_replacement()
            if analysis is None:
                return None
            replacement_expr, positional = analysis
            arg_map: dict[str, Any] = dict(zip(positional, args))
            arg_map.update(kwargs)
            try:
                # Imported here because this is the only place it is needed;
                # a failure degrades to the unsubstituted expression.
                from .ast_utils import substitute_parameters

                expr_ast = ast.parse(replacement_expr, mode="eval").body
                ast_param_map = {
                    name: value
                    if isinstance(value, ast.AST)
                    else ast.Constant(value=value)
                    for name, value in arg_map.items()
                }
                result_ast = substitute_parameters(expr_ast, ast_param_map)
                return ast.unparse(result_ast)
            except Exception:
                # Deliberately broad: building the hint is best-effort.
                logging.getLogger(__name__).exception(
                    "Failed to evaluate replacement expression for %s",
                    target.__name__,
                )
                return replacement_expr

        def emit_warning(args: tuple[Any, ...], kwargs: dict[str, Any]) -> None:
            """Emit the DeprecationWarning for one call of the target."""
            evaluated = describe_replacement(args, kwargs)
            since_clause = f" since {since_text}" if since_text else ""
            if evaluated is not None:
                message = (
                    f"{target!r} has been deprecated{since_clause}; "
                    f"use '{evaluated}' instead. "
                    "Run 'dissolve migrate' to update your code automatically."
                )
            else:
                message = (
                    f"{target.__name__} has been deprecated{since_clause}. "
                    "Run 'dissolve migrate' to update your code automatically."
                )
            # stacklevel=3: this frame -> the wrapper below -> the caller.
            # ``warnings.warn`` must stay in this function for that to hold.
            warnings.warn(DeprecationWarning(message), stacklevel=3)

        if inspect.isclass(target):
            original_init = target.__init__

            @functools.wraps(original_init)
            def deprecated_init(self: Any, *args: Any, **kwargs: Any) -> Any:
                emit_warning(args, kwargs)
                return original_init(self, *args, **kwargs)

            target.__init__ = deprecated_init
            return target
        elif inspect.iscoroutinefunction(target):

            @functools.wraps(target)
            async def async_decorated_function(*args: Any, **kwargs: Any) -> Any:
                emit_warning(args, kwargs)
                return await target(*args, **kwargs)

            return async_decorated_function
        else:

            @functools.wraps(target)
            def decorated_function(*args: Any, **kwargs: Any) -> Any:
                emit_warning(args, kwargs)
                return target(*args, **kwargs)

            return decorated_function

    return function_decorator