pub use typevar::*;
#[pymodule(sub)]
pub(crate) mod typevar {
use crate::{
AsObject, Context, Py, PyObject, PyObjectRef, PyPayload, PyRef, PyResult, VirtualMachine,
builtins::{PyTuple, PyTupleRef, PyType, PyTypeRef, make_union},
common::lock::PyMutex,
function::{FuncArgs, PyComparisonValue},
protocol::PyNumberMethods,
stdlib::_typing::{call_typing_func_object, decl::const_evaluator_alloc},
types::{AsNumber, Comparable, Constructor, Iterable, PyComparisonOp, Representable},
};
/// Validates `arg` as a type expression.
///
/// `None` is answered directly with its own class; every other value is
/// forwarded, together with `msg`, to the typing helper named `_type_check`.
fn type_check(arg: PyObjectRef, msg: &str, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
    if !vm.is_none(&arg) {
        let text: PyObjectRef = vm.ctx.new_str(msg).into();
        return call_typing_func_object(vm, "_type_check", (arg, text));
    }
    let none_type = arg.class().to_owned();
    Ok(none_type.into())
}
/// Renders a type-parameter name with its variance marker.
///
/// With `infer_variance` set the bare name is returned. Otherwise the name is
/// prefixed with `+` (covariant), `-` (contravariant) or `~` (neither), with
/// `covariant` taking precedence over `contravariant`.
fn variance_repr(
    name: &str,
    infer_variance: bool,
    covariant: bool,
    contravariant: bool,
) -> String {
    let marker = match (infer_variance, covariant, contravariant) {
        (true, _, _) => return name.to_owned(),
        (false, true, _) => '+',
        (false, false, true) => '-',
        (false, false, false) => '~',
    };
    format!("{marker}{name}")
}
/// Looks up `__name__` in the globals of the current frame.
///
/// Yields `None` when there is no current frame or the lookup fails.
fn caller(vm: &VirtualMachine) -> Option<PyObjectRef> {
    vm.current_frame()
        .and_then(|frame| frame.globals.get_item("__name__", vm).ok())
}
/// Stores the calling module's name on `obj` as `__module__`.
///
/// When `caller` yields nothing, `__module__` is set to `None`. When the
/// caller's name stringifies to `builtins` or to something starting with `<`,
/// the attribute is left untouched.
fn set_module_from_caller(obj: &PyObject, vm: &VirtualMachine) -> PyResult<()> {
    let module_value: PyObjectRef = match caller(vm) {
        None => vm.ctx.none(),
        Some(module_name) => {
            // A failed `str()` conversion is ignored and treated as "not skipped".
            let skip = module_name.str(vm).ok().is_some_and(|text| {
                text.to_str()
                    .is_some_and(|name| name == "builtins" || name.starts_with('<'))
            });
            if skip {
                return Ok(());
            }
            module_name
        }
    };
    obj.set_attr("__module__", module_value, vm)?;
    Ok(())
}
// Payload of the `TypeVar` class exposed to Python.
#[pyattr]
#[pyclass(name = "TypeVar", module = "typing")]
#[derive(Debug, PyPayload)]
#[allow(dead_code)]
pub struct TypeVar {
// `name`: object handed back by `__name__` and `__reduce__`.
// `bound`: cached bound; holds `None` until supplied or computed by `__bound__`.
name: PyObjectRef, bound: PyMutex<PyObjectRef>,
// Callable that `__bound__` invokes (with the argument `1`) to compute the bound, or `None`.
evaluate_bound: PyObjectRef,
// Cached constraints; holds `None` until supplied or computed by `__constraints__`.
constraints: PyMutex<PyObjectRef>,
// Callable that `__constraints__` invokes (with the argument `1`), or `None`.
evaluate_constraints: PyObjectRef,
// Cached default; holds the `typing_no_default` sentinel while no default is known.
default_value: PyMutex<PyObjectRef>,
// Callable that `__default__` invokes (with the argument `1`), or `None`;
// replaced by `set_typeparam_default`.
evaluate_default: PyMutex<PyObjectRef>,
// Variance flags reported by `__covariant__`, `__contravariant__` and `__infer_variance__`.
covariant: bool,
contravariant: bool,
infer_variance: bool,
}
#[pyclass(
    flags(HAS_DICT, HAS_WEAKREF),
    with(AsNumber, Constructor, Representable)
)]
impl TypeVar {
    // Using a TypeVar instance as a base class always raises TypeError.
    #[pymethod]
    fn __mro_entries__(&self, _bases: PyObjectRef, vm: &VirtualMachine) -> PyResult {
        Err(vm.new_type_error("Cannot subclass an instance of TypeVar"))
    }

    // The object supplied as the name at construction time.
    #[pygetset]
    fn __name__(&self) -> PyObjectRef {
        self.name.clone()
    }

    // Returns the cached constraints, computing and caching them through
    // `evaluate_constraints` (called with the single argument `1`) on first
    // access. Without a cached value or an evaluator the empty tuple is returned.
    #[pygetset]
    fn __constraints__(&self, vm: &VirtualMachine) -> PyResult {
        {
            let cached = self.constraints.lock();
            if !vm.is_none(&cached) {
                return Ok(cached.clone());
            }
        }
        if vm.is_none(&self.evaluate_constraints) {
            return Ok(vm.ctx.empty_tuple.clone().into());
        }
        // The lock is released before running Python code: an evaluator that
        // reads this attribute again must not find the mutex still held.
        let computed = self.evaluate_constraints.call((1i32,), vm)?;
        *self.constraints.lock() = computed.clone();
        Ok(computed)
    }

    // Returns the cached bound, computing and caching it through
    // `evaluate_bound` (called with the single argument `1`) on first access.
    // Without a cached value or an evaluator the result is `None`.
    #[pygetset]
    fn __bound__(&self, vm: &VirtualMachine) -> PyResult {
        {
            let cached = self.bound.lock();
            if !vm.is_none(&cached) {
                return Ok(cached.clone());
            }
        }
        if vm.is_none(&self.evaluate_bound) {
            return Ok(vm.ctx.none());
        }
        // Same as `__constraints__`: never hold the lock across the call.
        let computed = self.evaluate_bound.call((1i32,), vm)?;
        *self.bound.lock() = computed.clone();
        Ok(computed)
    }

    #[pygetset]
    const fn __covariant__(&self) -> bool {
        self.covariant
    }

    #[pygetset]
    const fn __contravariant__(&self) -> bool {
        self.contravariant
    }

    #[pygetset]
    const fn __infer_variance__(&self) -> bool {
        self.infer_variance
    }

    // Returns the default: the cached value if one is set, otherwise the result
    // of calling `evaluate_default` with `1` (which is then cached), otherwise
    // the `typing_no_default` sentinel.
    #[pygetset]
    fn __default__(&self, vm: &VirtualMachine) -> PyResult {
        {
            let default_value = self.default_value.lock();
            if !default_value.is(&vm.ctx.typing_no_default) {
                return Ok(default_value.clone());
            }
        }
        let evaluator = self.evaluate_default.lock().clone();
        if vm.is_none(&evaluator) {
            return Ok(vm.ctx.typing_no_default.clone().into());
        }
        let result = evaluator.call((1i32,), vm)?;
        *self.default_value.lock() = result.clone();
        Ok(result)
    }

    // Returns the stored bound evaluator; failing that, wraps an already known
    // bound with `const_evaluator_alloc`; failing that, returns `None`.
    #[pygetset]
    fn evaluate_bound(&self, vm: &VirtualMachine) -> PyResult {
        if !vm.is_none(&self.evaluate_bound) {
            return Ok(self.evaluate_bound.clone());
        }
        let bound = self.bound.lock().clone();
        if !vm.is_none(&bound) {
            return Ok(const_evaluator_alloc(bound, vm));
        }
        Ok(vm.ctx.none())
    }

    // Counterpart of `evaluate_bound` for the constraints.
    #[pygetset]
    fn evaluate_constraints(&self, vm: &VirtualMachine) -> PyResult {
        if !vm.is_none(&self.evaluate_constraints) {
            return Ok(self.evaluate_constraints.clone());
        }
        let constraints = self.constraints.lock().clone();
        if !vm.is_none(&constraints) {
            return Ok(const_evaluator_alloc(constraints, vm));
        }
        Ok(vm.ctx.none())
    }

    // Counterpart of `evaluate_bound` for the default; "no value" is the
    // `typing_no_default` sentinel rather than `None`.
    #[pygetset]
    fn evaluate_default(&self, vm: &VirtualMachine) -> PyResult {
        let evaluator = self.evaluate_default.lock().clone();
        if !vm.is_none(&evaluator) {
            return Ok(evaluator);
        }
        let default_value = self.default_value.lock().clone();
        if !default_value.is(&vm.ctx.typing_no_default) {
            return Ok(const_evaluator_alloc(default_value, vm));
        }
        Ok(vm.ctx.none())
    }

    // Delegates substitution to the typing helper named `_typevar_subst`.
    #[pymethod]
    fn __typing_subst__(
        zelf: crate::PyRef<Self>,
        arg: PyObjectRef,
        vm: &VirtualMachine,
    ) -> PyResult {
        let self_obj: PyObjectRef = zelf.into();
        call_typing_func_object(vm, "_typevar_subst", (self_obj, arg))
    }

    // Reduces to the stored name object.
    #[pymethod]
    fn __reduce__(&self) -> PyObjectRef {
        self.name.clone()
    }

    // True when a default evaluator is installed or a default value is cached.
    #[pymethod]
    fn has_default(&self, vm: &VirtualMachine) -> bool {
        if !vm.is_none(&self.evaluate_default.lock()) {
            return true;
        }
        let default_value = self.default_value.lock();
        !default_value.is(&vm.ctx.typing_no_default)
    }

    // Prepares `args` for substitution into `alias`.
    //
    // `args` is returned unchanged when it is not a tuple, when this TypeVar is
    // not among `alias.__parameters__`, when an argument already exists at this
    // TypeVar's position, or when there is no default. Otherwise a new tuple is
    // returned; the default is appended only when this TypeVar occupies exactly
    // the first missing position.
    #[pymethod]
    fn __typing_prepare_subst__(
        zelf: crate::PyRef<Self>,
        alias: PyObjectRef,
        args: PyObjectRef,
        vm: &VirtualMachine,
    ) -> PyResult {
        let args_tuple =
            if let Ok(tuple) = args.try_to_ref::<rustpython_vm::builtins::PyTuple>(vm) {
                tuple
            } else {
                return Ok(args);
            };
        let parameters = alias.get_attr(identifier!(vm, __parameters__), vm)?;
        let params_tuple: PyTupleRef = parameters.try_into_value(vm)?;
        let self_obj: PyObjectRef = zelf.to_owned().into();
        let Some(index) = params_tuple.iter().position(|p| p.is(&self_obj)) else {
            return Ok(args);
        };
        if args_tuple.len() > index || !zelf.has_default(vm) {
            return Ok(args);
        }
        let mut new_args: Vec<PyObjectRef> = args_tuple.iter().cloned().collect();
        if new_args.len() == index {
            new_args.push(zelf.__default__(vm)?);
        }
        Ok(rustpython_vm::builtins::PyTuple::new_ref(new_args, &vm.ctx).into())
    }
}
impl Representable for TypeVar {
    /// Formats the name with its variance marker via `variance_repr`.
    #[inline(always)]
    fn repr_str(zelf: &crate::Py<Self>, vm: &VirtualMachine) -> PyResult<String> {
        let shown = zelf.name.str_utf8(vm)?;
        let (infer, co, contra) = (zelf.infer_variance, zelf.covariant, zelf.contravariant);
        Ok(variance_repr(shown.as_str(), infer, co, contra))
    }
}
impl AsNumber for TypeVar {
    /// Number protocol table: only `|` is provided, building a union of both operands.
    fn as_number() -> &'static PyNumberMethods {
        static AS_NUMBER: PyNumberMethods = PyNumberMethods {
            or: Some(|lhs, rhs, vm| {
                let operands = vec![lhs.to_owned(), rhs.to_owned()];
                let pair = PyTuple::new_ref(operands, &vm.ctx);
                make_union(&pair, vm)
            }),
            ..PyNumberMethods::NOT_IMPLEMENTED
        };
        &AS_NUMBER
    }
}
impl Constructor for TypeVar {
    type Args = FuncArgs;

    /// Builds the payload with `py_new`, wraps it in an object of type `cls`
    /// and records the defining module on the new object.
    fn slot_new(cls: PyTypeRef, args: FuncArgs, vm: &VirtualMachine) -> PyResult {
        let payload = <Self as Constructor>::py_new(&cls, args, vm)?;
        let instance: PyObjectRef = payload.into_ref_with_type(vm, cls)?.into();
        set_module_from_caller(&instance, vm)?;
        Ok(instance)
    }

    /// Parses `TypeVar(name, *constraints, bound=, covariant=, contravariant=,
    /// infer_variance=, default=)`.
    ///
    /// Raises TypeError for a missing name, unexpected keywords, a single
    /// constraint, or constraints combined with a bound; raises ValueError for
    /// conflicting variance flags.
    fn py_new(_cls: &Py<PyType>, args: Self::Args, vm: &VirtualMachine) -> PyResult<Self> {
        let mut kwargs = args.kwargs;

        // First positional argument is the name, the rest are constraints;
        // with no positionals the name must come from the `name` keyword.
        let (name, constraints) = match args.args.split_first() {
            Some((first, rest)) => (first.clone(), rest.to_vec()),
            None => match kwargs.swap_remove("name") {
                Some(name) => (name, vec![]),
                None => {
                    return Err(
                        vm.new_type_error("TypeVar() missing required argument: 'name' (pos 1)")
                    );
                }
            },
        };

        let bound = kwargs.swap_remove("bound");
        let covariant = match kwargs.swap_remove("covariant") {
            Some(flag) => flag.try_to_bool(vm)?,
            None => false,
        };
        let contravariant = match kwargs.swap_remove("contravariant") {
            Some(flag) => flag.try_to_bool(vm)?,
            None => false,
        };
        let infer_variance = match kwargs.swap_remove("infer_variance") {
            Some(flag) => flag.try_to_bool(vm)?,
            None => false,
        };
        let default = kwargs.swap_remove("default");

        // Whatever is still in `kwargs` was not recognised.
        if !kwargs.is_empty() {
            let leftover = kwargs
                .keys()
                .map(|key| key.to_string())
                .collect::<Vec<String>>()
                .join(", ");
            return Err(vm.new_type_error(format!(
                "TypeVar() got unexpected keyword argument(s): {}",
                leftover
            )));
        }

        if covariant && contravariant {
            return Err(vm.new_value_error("Bivariant type variables are not supported."));
        }
        if infer_variance && (covariant || contravariant) {
            return Err(vm.new_value_error("Variance cannot be specified with infer_variance"));
        }

        let constraints_obj: PyObjectRef = match constraints.len() {
            0 => vm.ctx.none(),
            1 => return Err(vm.new_type_error("A single constraint is not allowed")),
            _ if bound.is_some() => {
                return Err(vm.new_type_error("Constraints cannot be used with bound"));
            }
            _ => vm.ctx.new_tuple(constraints).into(),
        };

        // An explicit `bound=None` is treated like no bound at all.
        let bound_obj = match bound {
            Some(candidate) if !vm.is_none(&candidate) => {
                type_check(candidate, "Bound must be a type.", vm)?
            }
            _ => vm.ctx.none(),
        };

        let default_value: PyObjectRef = match default {
            Some(value) => value,
            None => vm.ctx.typing_no_default.clone().into(),
        };

        // Values are supplied eagerly here, so no lazy evaluators are installed.
        Ok(Self {
            name,
            bound: PyMutex::new(bound_obj),
            evaluate_bound: vm.ctx.none(),
            constraints: PyMutex::new(constraints_obj),
            evaluate_constraints: vm.ctx.none(),
            default_value: PyMutex::new(default_value),
            evaluate_default: PyMutex::new(vm.ctx.none()),
            covariant,
            contravariant,
            infer_variance,
        })
    }
}
impl TypeVar {
    /// Creates a TypeVar whose bound and constraints are produced lazily by the
    /// given evaluators.
    ///
    /// Cached values start out as `None`, the default as the
    /// `typing_no_default` sentinel, and variance is marked as inferred.
    pub fn new(
        vm: &VirtualMachine,
        name: PyObjectRef,
        evaluate_bound: PyObjectRef,
        evaluate_constraints: PyObjectRef,
    ) -> Self {
        let unset = || PyMutex::new(vm.ctx.none());
        Self {
            name,
            bound: unset(),
            evaluate_bound,
            constraints: unset(),
            evaluate_constraints,
            default_value: PyMutex::new(vm.ctx.typing_no_default.clone().into()),
            evaluate_default: unset(),
            covariant: false,
            contravariant: false,
            infer_variance: true,
        }
    }
}
// Payload of the `ParamSpec` class exposed to Python.
#[pyattr]
#[pyclass(name = "ParamSpec", module = "typing")]
#[derive(Debug, PyPayload)]
#[allow(dead_code)]
pub struct ParamSpec {
// Object handed back by `__name__` and `__reduce__`.
name: PyObjectRef,
// Bound given at construction; `__bound__` reports `None` when absent.
bound: Option<PyObjectRef>,
// Cached default; holds the `typing_no_default` sentinel while no default is known.
default_value: PyMutex<PyObjectRef>,
// Callable that `__default__` invokes (with the argument `1`), or `None`;
// replaced by `set_typeparam_default`.
evaluate_default: PyMutex<PyObjectRef>,
// Variance flags reported by `__covariant__`, `__contravariant__` and `__infer_variance__`.
covariant: bool,
contravariant: bool,
infer_variance: bool,
}
#[pyclass(
    flags(HAS_DICT, HAS_WEAKREF),
    with(AsNumber, Constructor, Representable)
)]
impl ParamSpec {
    // Using a ParamSpec instance as a base class always raises TypeError.
    #[pymethod]
    fn __mro_entries__(&self, _bases: PyObjectRef, vm: &VirtualMachine) -> PyResult {
        let message = "Cannot subclass an instance of ParamSpec";
        Err(vm.new_type_error(message))
    }

    // The object supplied as the name at construction time.
    #[pygetset]
    fn __name__(&self) -> PyObjectRef {
        self.name.clone()
    }

    // Produces a fresh `ParamSpecArgs` whose origin is this ParamSpec.
    #[pygetset]
    fn args(zelf: crate::PyRef<Self>, vm: &VirtualMachine) -> PyResult {
        let wrapper = ParamSpecArgs {
            __origin__: zelf.into(),
        };
        Ok(wrapper.into_ref(&vm.ctx).into())
    }

    // Produces a fresh `ParamSpecKwargs` whose origin is this ParamSpec.
    #[pygetset]
    fn kwargs(zelf: crate::PyRef<Self>, vm: &VirtualMachine) -> PyResult {
        let wrapper = ParamSpecKwargs {
            __origin__: zelf.into(),
        };
        Ok(wrapper.into_ref(&vm.ctx).into())
    }

    // The stored bound, or `None` when none was given.
    #[pygetset]
    fn __bound__(&self, vm: &VirtualMachine) -> PyObjectRef {
        self.bound.clone().unwrap_or_else(|| vm.ctx.none())
    }

    #[pygetset]
    const fn __covariant__(&self) -> bool {
        self.covariant
    }

    #[pygetset]
    const fn __contravariant__(&self) -> bool {
        self.contravariant
    }

    #[pygetset]
    const fn __infer_variance__(&self) -> bool {
        self.infer_variance
    }

    // Returns the default: the cached value if one is set, otherwise the result
    // of calling `evaluate_default` with `1` (which is then cached), otherwise
    // the `typing_no_default` sentinel.
    #[pygetset]
    fn __default__(&self, vm: &VirtualMachine) -> PyResult {
        let cached = self.default_value.lock().clone();
        if !cached.is(&vm.ctx.typing_no_default) {
            return Ok(cached);
        }
        let evaluator = self.evaluate_default.lock().clone();
        if vm.is_none(&evaluator) {
            return Ok(vm.ctx.typing_no_default.clone().into());
        }
        // No lock is held while the evaluator runs.
        let computed = evaluator.call((1i32,), vm)?;
        *self.default_value.lock() = computed.clone();
        Ok(computed)
    }

    // Returns the stored default evaluator; failing that, wraps an already
    // known default with `const_evaluator_alloc`; failing that, returns `None`.
    #[pygetset]
    fn evaluate_default(&self, vm: &VirtualMachine) -> PyResult {
        let evaluator = self.evaluate_default.lock().clone();
        if !vm.is_none(&evaluator) {
            return Ok(evaluator);
        }
        let known = self.default_value.lock().clone();
        if known.is(&vm.ctx.typing_no_default) {
            Ok(vm.ctx.none())
        } else {
            Ok(const_evaluator_alloc(known, vm))
        }
    }

    // Reduces to the stored name object.
    #[pymethod]
    fn __reduce__(&self) -> PyResult {
        let name = self.name.clone();
        Ok(name)
    }

    // True when a default evaluator is installed or a default value is cached.
    #[pymethod]
    fn has_default(&self, vm: &VirtualMachine) -> bool {
        let has_evaluator = !vm.is_none(&self.evaluate_default.lock());
        if has_evaluator {
            return true;
        }
        !self.default_value.lock().is(&vm.ctx.typing_no_default)
    }

    // Delegates substitution to the typing helper named `_paramspec_subst`.
    #[pymethod]
    fn __typing_subst__(
        zelf: crate::PyRef<Self>,
        arg: PyObjectRef,
        vm: &VirtualMachine,
    ) -> PyResult {
        let receiver: PyObjectRef = zelf.into();
        call_typing_func_object(vm, "_paramspec_subst", (receiver, arg))
    }

    // Delegates argument preparation to the typing helper named
    // `_paramspec_prepare_subst`.
    #[pymethod]
    fn __typing_prepare_subst__(
        zelf: crate::PyRef<Self>,
        alias: PyObjectRef,
        args: PyObjectRef,
        vm: &VirtualMachine,
    ) -> PyResult {
        let receiver: PyObjectRef = zelf.into();
        call_typing_func_object(vm, "_paramspec_prepare_subst", (receiver, alias, args))
    }
}
impl AsNumber for ParamSpec {
    /// Number protocol table: only `|` is provided, building a union of both operands.
    fn as_number() -> &'static PyNumberMethods {
        static AS_NUMBER: PyNumberMethods = PyNumberMethods {
            or: Some(|lhs, rhs, vm| {
                let operands = vec![lhs.to_owned(), rhs.to_owned()];
                let pair = PyTuple::new_ref(operands, &vm.ctx);
                make_union(&pair, vm)
            }),
            ..PyNumberMethods::NOT_IMPLEMENTED
        };
        &AS_NUMBER
    }
}
impl Constructor for ParamSpec {
    type Args = FuncArgs;

    /// Parses `ParamSpec(name, *, bound=, covariant=, contravariant=,
    /// infer_variance=, default=)`, creates the object with type `cls` and
    /// records the defining module on it.
    ///
    /// Raises TypeError for a missing name, more than one positional argument
    /// or unexpected keywords; raises ValueError for conflicting variance flags.
    fn slot_new(cls: PyTypeRef, args: Self::Args, vm: &VirtualMachine) -> PyResult {
        let mut kwargs = args.kwargs;

        let name = match args.args.as_slice() {
            [] => match kwargs.swap_remove("name") {
                Some(name) => name,
                None => {
                    return Err(
                        vm.new_type_error("ParamSpec() missing required argument: 'name' (pos 1)")
                    );
                }
            },
            [only] => only.clone(),
            _ => {
                return Err(vm.new_type_error("ParamSpec() takes at most 1 positional argument"));
            }
        };

        let bound = match kwargs.swap_remove("bound") {
            Some(candidate) => Some(type_check(candidate, "Bound must be a type.", vm)?),
            None => None,
        };
        let covariant = match kwargs.swap_remove("covariant") {
            Some(flag) => flag.try_to_bool(vm)?,
            None => false,
        };
        let contravariant = match kwargs.swap_remove("contravariant") {
            Some(flag) => flag.try_to_bool(vm)?,
            None => false,
        };
        let infer_variance = match kwargs.swap_remove("infer_variance") {
            Some(flag) => flag.try_to_bool(vm)?,
            None => false,
        };
        let default = kwargs.swap_remove("default");

        // Whatever is still in `kwargs` was not recognised.
        if !kwargs.is_empty() {
            let leftover = kwargs
                .keys()
                .map(|key| key.to_string())
                .collect::<Vec<String>>()
                .join(", ");
            return Err(vm.new_type_error(format!(
                "ParamSpec() got unexpected keyword argument(s): {}",
                leftover
            )));
        }

        if covariant && contravariant {
            return Err(vm.new_value_error("Bivariant type variables are not supported."));
        }
        if infer_variance && (covariant || contravariant) {
            return Err(vm.new_value_error("Variance cannot be specified with infer_variance"));
        }

        let default_value = match default {
            Some(value) => value,
            None => vm.ctx.typing_no_default.clone().into(),
        };
        let payload = Self {
            name,
            bound,
            default_value: PyMutex::new(default_value),
            evaluate_default: PyMutex::new(vm.ctx.none()),
            covariant,
            contravariant,
            infer_variance,
        };

        let instance: PyObjectRef = payload.into_ref_with_type(vm, cls)?.into();
        set_module_from_caller(&instance, vm)?;
        Ok(instance)
    }

    /// Not used: construction goes through `slot_new`. Calling this panics.
    fn py_new(_cls: &Py<PyType>, _args: Self::Args, _vm: &VirtualMachine) -> PyResult<Self> {
        unimplemented!("use slot_new")
    }
}
impl Representable for ParamSpec {
    /// Formats the name with its variance marker via `variance_repr`.
    #[inline(always)]
    fn repr_str(zelf: &crate::Py<Self>, vm: &VirtualMachine) -> PyResult<String> {
        let shown = zelf.__name__().str_utf8(vm)?;
        let (infer, co, contra) = (zelf.infer_variance, zelf.covariant, zelf.contravariant);
        Ok(variance_repr(shown.as_str(), infer, co, contra))
    }
}
impl ParamSpec {
    /// Creates a ParamSpec with no bound, no default (the `typing_no_default`
    /// sentinel), no default evaluator and inferred variance.
    pub fn new(name: PyObjectRef, vm: &VirtualMachine) -> Self {
        let default_value = PyMutex::new(vm.ctx.typing_no_default.clone().into());
        let evaluate_default = PyMutex::new(vm.ctx.none());
        Self {
            name,
            bound: None,
            default_value,
            evaluate_default,
            covariant: false,
            contravariant: false,
            infer_variance: true,
        }
    }
}
// Payload of the `TypeVarTuple` class exposed to Python.
#[pyattr]
#[pyclass(name = "TypeVarTuple", module = "typing")]
#[derive(Debug, PyPayload)]
#[allow(dead_code)]
pub struct TypeVarTuple {
// Object handed back by `__name__` and `__reduce__`.
name: PyObjectRef,
// Cached default; holds the `typing_no_default` sentinel while no default is known.
default_value: PyMutex<PyObjectRef>,
// Callable that `__default__` invokes (with the argument `1`), or `None`;
// replaced by `set_typeparam_default`.
evaluate_default: PyMutex<PyObjectRef>,
}
#[pyclass(
    flags(HAS_DICT, HAS_WEAKREF),
    with(Constructor, Representable, Iterable)
)]
impl TypeVarTuple {
    // The object supplied as the name at construction time.
    #[pygetset]
    fn __name__(&self) -> PyObjectRef {
        self.name.clone()
    }

    // Returns the default: the cached value if one is set, otherwise the result
    // of calling `evaluate_default` with `1` (which is then cached), otherwise
    // the `typing_no_default` sentinel.
    #[pygetset]
    fn __default__(&self, vm: &VirtualMachine) -> PyResult {
        let cached = self.default_value.lock().clone();
        if !cached.is(&vm.ctx.typing_no_default) {
            return Ok(cached);
        }
        let evaluator = self.evaluate_default.lock().clone();
        if vm.is_none(&evaluator) {
            return Ok(vm.ctx.typing_no_default.clone().into());
        }
        // No lock is held while the evaluator runs.
        let computed = evaluator.call((1i32,), vm)?;
        *self.default_value.lock() = computed.clone();
        Ok(computed)
    }

    // Returns the stored default evaluator; failing that, wraps an already
    // known default with `const_evaluator_alloc`; failing that, returns `None`.
    #[pygetset]
    fn evaluate_default(&self, vm: &VirtualMachine) -> PyResult {
        let evaluator = self.evaluate_default.lock().clone();
        if !vm.is_none(&evaluator) {
            return Ok(evaluator);
        }
        let known = self.default_value.lock().clone();
        if known.is(&vm.ctx.typing_no_default) {
            Ok(vm.ctx.none())
        } else {
            Ok(const_evaluator_alloc(known, vm))
        }
    }

    // True when a default evaluator is installed or a default value is cached.
    #[pymethod]
    fn has_default(&self, vm: &VirtualMachine) -> bool {
        let has_evaluator = !vm.is_none(&self.evaluate_default.lock());
        if has_evaluator {
            return true;
        }
        !self.default_value.lock().is(&vm.ctx.typing_no_default)
    }

    // Reduces to the stored name object.
    #[pymethod]
    fn __reduce__(&self) -> PyObjectRef {
        self.name.clone()
    }

    // Using a TypeVarTuple instance as a base class always raises TypeError.
    #[pymethod]
    fn __mro_entries__(&self, _bases: PyObjectRef, vm: &VirtualMachine) -> PyResult {
        let message = "Cannot subclass an instance of TypeVarTuple";
        Err(vm.new_type_error(message))
    }

    // Substituting a bare TypeVarTuple always raises TypeError.
    #[pymethod]
    fn __typing_subst__(&self, _arg: PyObjectRef, vm: &VirtualMachine) -> PyResult {
        let message = "Substitution of bare TypeVarTuple is not supported";
        Err(vm.new_type_error(message))
    }

    // Delegates argument preparation to the typing helper named
    // `_typevartuple_prepare_subst`.
    #[pymethod]
    fn __typing_prepare_subst__(
        zelf: crate::PyRef<Self>,
        alias: PyObjectRef,
        args: PyObjectRef,
        vm: &VirtualMachine,
    ) -> PyResult {
        let receiver: PyObjectRef = zelf.into();
        call_typing_func_object(vm, "_typevartuple_prepare_subst", (receiver, alias, args))
    }
}
impl Iterable for TypeVarTuple {
    /// Iterating yields one item: `typing.Unpack[self]`.
    ///
    /// The item is placed in a one-element list and that list's `__iter__`
    /// result is returned.
    fn iter(zelf: PyRef<Self>, vm: &VirtualMachine) -> PyResult {
        let unpack = vm.import("typing", 0)?.get_attr("Unpack", vm)?;
        let receiver: PyObjectRef = zelf.into();
        let item = vm.call_method(&unpack, "__getitem__", (receiver,))?;
        let holder: PyObjectRef = vm.ctx.new_list(vec![item]).into();
        vm.call_method(&holder, "__iter__", ())
    }
}
impl Constructor for TypeVarTuple {
    type Args = FuncArgs;

    /// Parses `TypeVarTuple(name, *, default=)`, creates the object with type
    /// `cls` and records the defining module on it.
    ///
    /// Raises TypeError for a missing name, more than one positional argument
    /// or unexpected keywords.
    fn slot_new(cls: PyTypeRef, args: Self::Args, vm: &VirtualMachine) -> PyResult {
        let mut kwargs = args.kwargs;

        let name = match args.args.as_slice() {
            [] => match kwargs.swap_remove("name") {
                Some(name) => name,
                None => {
                    return Err(vm.new_type_error(
                        "TypeVarTuple() missing required argument: 'name' (pos 1)",
                    ));
                }
            },
            [only] => only.clone(),
            _ => {
                return Err(
                    vm.new_type_error("TypeVarTuple() takes at most 1 positional argument")
                );
            }
        };
        let default = kwargs.swap_remove("default");

        // Whatever is still in `kwargs` was not recognised.
        if !kwargs.is_empty() {
            let leftover = kwargs
                .keys()
                .map(|key| key.to_string())
                .collect::<Vec<String>>()
                .join(", ");
            return Err(vm.new_type_error(format!(
                "TypeVarTuple() got unexpected keyword argument(s): {}",
                leftover
            )));
        }

        let default_value = match default {
            Some(value) => value,
            None => vm.ctx.typing_no_default.clone().into(),
        };
        let payload = Self {
            name,
            default_value: PyMutex::new(default_value),
            evaluate_default: PyMutex::new(vm.ctx.none()),
        };

        let instance: PyObjectRef = payload.into_ref_with_type(vm, cls)?.into();
        set_module_from_caller(&instance, vm)?;
        Ok(instance)
    }

    /// Not used: construction goes through `slot_new`. Calling this panics.
    fn py_new(_cls: &Py<PyType>, _args: Self::Args, _vm: &VirtualMachine) -> PyResult<Self> {
        unimplemented!("use slot_new")
    }
}
impl Representable for TypeVarTuple {
    /// The repr is the string form of the stored name, with no variance marker.
    #[inline(always)]
    fn repr_str(zelf: &crate::Py<Self>, vm: &VirtualMachine) -> PyResult<String> {
        let rendered = zelf.name.str(vm)?.to_string();
        Ok(rendered)
    }
}
impl TypeVarTuple {
    /// Creates a TypeVarTuple with no default (the `typing_no_default`
    /// sentinel) and no default evaluator.
    pub fn new(name: PyObjectRef, vm: &VirtualMachine) -> Self {
        let default_value = PyMutex::new(vm.ctx.typing_no_default.clone().into());
        let evaluate_default = PyMutex::new(vm.ctx.none());
        Self {
            name,
            default_value,
            evaluate_default,
        }
    }
}
// Payload of the `ParamSpecArgs` class exposed to Python; built by the
// `ParamSpec.args` getter or directly from a single argument.
#[pyattr]
#[pyclass(name = "ParamSpecArgs", module = "typing")]
#[derive(Debug, PyPayload)]
#[allow(dead_code)]
pub struct ParamSpecArgs {
// Object this value was created from; compared for equality and used in the repr.
__origin__: PyObjectRef,
}
#[pyclass(with(Constructor, Representable, Comparable), flags(HAS_WEAKREF))]
impl ParamSpecArgs {
    // Using a ParamSpecArgs instance as a base class always raises TypeError.
    #[pymethod]
    fn __mro_entries__(&self, _bases: PyObjectRef, vm: &VirtualMachine) -> PyResult {
        let message = "Cannot subclass an instance of ParamSpecArgs";
        Err(vm.new_type_error(message))
    }

    // The object this value was created from.
    #[pygetset]
    fn __origin__(&self) -> PyObjectRef {
        let origin = &self.__origin__;
        origin.clone()
    }
}
impl Constructor for ParamSpecArgs {
    type Args = (PyObjectRef,);

    /// Stores the single positional argument as the origin, without validation.
    fn py_new(_cls: &Py<PyType>, args: Self::Args, _vm: &VirtualMachine) -> PyResult<Self> {
        let (origin,) = args;
        Ok(Self { __origin__: origin })
    }
}
impl Representable for ParamSpecArgs {
    /// Renders `<origin name>.args`; when the origin has no readable
    /// `__name__`, its debug formatting is used in place of the name.
    #[inline(always)]
    fn repr_str(zelf: &crate::Py<Self>, vm: &VirtualMachine) -> PyResult<String> {
        match zelf.__origin__.get_attr("__name__", vm) {
            Ok(name) => {
                let shown = name.str(vm)?;
                Ok(format!("{shown}.args"))
            }
            Err(_) => Ok(format!("{:?}.args", zelf.__origin__)),
        }
    }
}
impl Comparable for ParamSpecArgs {
    /// Equality-only comparison: two values of the same class compare by their
    /// origins; anything else is `NotImplemented`.
    fn cmp(
        zelf: &crate::Py<Self>,
        other: &PyObject,
        op: PyComparisonOp,
        vm: &VirtualMachine,
    ) -> PyResult<PyComparisonValue> {
        op.eq_only(|| {
            if !other.class().is(zelf.class()) {
                return Ok(PyComparisonValue::NotImplemented);
            }
            let Some(peer) = other.downcast_ref::<ParamSpecArgs>() else {
                return Ok(PyComparisonValue::NotImplemented);
            };
            let same =
                zelf.__origin__
                    .rich_compare_bool(&peer.__origin__, PyComparisonOp::Eq, vm)?;
            Ok(PyComparisonValue::Implemented(same))
        })
    }
}
// Payload of the `ParamSpecKwargs` class exposed to Python; built by the
// `ParamSpec.kwargs` getter or directly from a single argument.
#[pyattr]
#[pyclass(name = "ParamSpecKwargs", module = "typing")]
#[derive(Debug, PyPayload)]
#[allow(dead_code)]
pub struct ParamSpecKwargs {
// Object this value was created from; compared for equality and used in the repr.
__origin__: PyObjectRef,
}
#[pyclass(with(Constructor, Representable, Comparable), flags(HAS_WEAKREF))]
impl ParamSpecKwargs {
    // Using a ParamSpecKwargs instance as a base class always raises TypeError.
    #[pymethod]
    fn __mro_entries__(&self, _bases: PyObjectRef, vm: &VirtualMachine) -> PyResult {
        let message = "Cannot subclass an instance of ParamSpecKwargs";
        Err(vm.new_type_error(message))
    }

    // The object this value was created from.
    #[pygetset]
    fn __origin__(&self) -> PyObjectRef {
        let origin = &self.__origin__;
        origin.clone()
    }
}
impl Constructor for ParamSpecKwargs {
    type Args = (PyObjectRef,);

    /// Stores the single positional argument as the origin, without validation.
    fn py_new(_cls: &Py<PyType>, args: Self::Args, _vm: &VirtualMachine) -> PyResult<Self> {
        let (origin,) = args;
        Ok(Self { __origin__: origin })
    }
}
impl Representable for ParamSpecKwargs {
    /// Renders `<origin name>.kwargs`; when the origin has no readable
    /// `__name__`, its debug formatting is used in place of the name.
    #[inline(always)]
    fn repr_str(zelf: &crate::Py<Self>, vm: &VirtualMachine) -> PyResult<String> {
        match zelf.__origin__.get_attr("__name__", vm) {
            Ok(name) => {
                let shown = name.str(vm)?;
                Ok(format!("{shown}.kwargs"))
            }
            Err(_) => Ok(format!("{:?}.kwargs", zelf.__origin__)),
        }
    }
}
impl Comparable for ParamSpecKwargs {
    /// Equality-only comparison: two values of the same class compare by their
    /// origins; anything else is `NotImplemented`.
    fn cmp(
        zelf: &crate::Py<Self>,
        other: &PyObject,
        op: PyComparisonOp,
        vm: &VirtualMachine,
    ) -> PyResult<PyComparisonValue> {
        op.eq_only(|| {
            if !other.class().is(zelf.class()) {
                return Ok(PyComparisonValue::NotImplemented);
            }
            let Some(peer) = other.downcast_ref::<ParamSpecKwargs>() else {
                return Ok(PyComparisonValue::NotImplemented);
            };
            let same =
                zelf.__origin__
                    .rich_compare_bool(&peer.__origin__, PyComparisonOp::Eq, vm)?;
            Ok(PyComparisonValue::Implemented(same))
        })
    }
}
/// Calls the attribute `name` of the imported `typing` module, passing `cls`
/// as the first positional argument followed by the positional and keyword
/// arguments from `args`.
fn call_typing_args_kwargs(
    name: &'static str,
    cls: PyTypeRef,
    args: FuncArgs,
    vm: &VirtualMachine,
) -> PyResult {
    let target = vm.import("typing", 0)?.get_attr(name, vm)?;
    let FuncArgs {
        args: positional,
        kwargs,
    } = args;
    let mut forwarded = Vec::with_capacity(positional.len() + 1);
    forwarded.push(cls.into());
    forwarded.extend(positional);
    target.call(
        FuncArgs {
            args: forwarded,
            kwargs,
        },
        vm,
    )
}
// Unit payload of the `Generic` class exposed to Python; it carries no state.
#[pyattr]
#[pyclass(name = "Generic", module = "typing")]
#[derive(Debug, PyPayload)]
#[allow(dead_code)]
pub struct Generic;
#[pyclass(flags(BASETYPE, HEAPTYPE))]
impl Generic {
    // `__slots__` is the shared empty tuple.
    #[pyattr]
    fn __slots__(ctx: &Context) -> PyTupleRef {
        let no_slots = &ctx.empty_tuple;
        no_slots.clone()
    }

    // Forwards to the typing helper named `_generic_class_getitem`, with `cls` prepended.
    #[pyclassmethod]
    fn __class_getitem__(cls: PyTypeRef, args: FuncArgs, vm: &VirtualMachine) -> PyResult {
        let helper = "_generic_class_getitem";
        call_typing_args_kwargs(helper, cls, args, vm)
    }

    // Forwards to the typing helper named `_generic_init_subclass`, with `cls` prepended.
    #[pyclassmethod]
    fn __init_subclass__(cls: PyTypeRef, args: FuncArgs, vm: &VirtualMachine) -> PyResult {
        let helper = "_generic_init_subclass";
        call_typing_args_kwargs(helper, cls, args, vm)
    }
}
pub fn set_typeparam_default(
type_param: PyObjectRef,
evaluate_default: PyObjectRef,
vm: &VirtualMachine,
) -> PyResult {
fn try_set_default<T>(
obj: &PyObject,
evaluate_default: &PyObject,
get_field: impl FnOnce(&T) -> &PyMutex<PyObjectRef>,
) -> bool
where
T: PyPayload,
{
if let Some(typed_obj) = obj.downcast_ref::<T>() {
*get_field(typed_obj).lock() = evaluate_default.to_owned();
true
} else {
false
}
}
if try_set_default::<TypeVar>(&type_param, &evaluate_default, |tv| &tv.evaluate_default)
|| try_set_default::<ParamSpec>(&type_param, &evaluate_default, |ps| {
&ps.evaluate_default
})
|| try_set_default::<TypeVarTuple>(&type_param, &evaluate_default, |tvt| {
&tvt.evaluate_default
})
{
Ok(type_param)
} else {
Err(vm.new_type_error(format!(
"Expected a type param, got {}",
type_param.class().name()
)))
}
}
}