use crate::{
AsObject, Py, PyObject, PyObjectRef, PyRef, PyResult, TryFromObject, VirtualMachine,
builtins::{
PyBytes, PyDict, PyDictRef, PyGenericAlias, PyInt, PyList, PyStr, PyTuple, PyTupleRef,
PyType, PyTypeRef, PyUtf8Str, pystr::AsPyStr,
},
common::{hash::PyHash, str::to_ascii},
convert::{ToPyObject, ToPyResult},
dict_inner::DictKey,
function::{Either, FuncArgs, PyArithmeticValue, PySetterValue},
object::PyPayload,
protocol::PyIter,
types::{Constructor, PyComparisonOp},
};
impl PyObjectRef {
/// Compares `self` with `other` using the operator `op_id`.
///
/// The shared comparison routine yields either an object or a plain `bool`;
/// whichever it is gets converted into a Python object before returning.
#[inline(always)]
pub fn rich_compare(self, other: Self, op_id: PyComparisonOp, vm: &VirtualMachine) -> PyResult {
    let outcome = self._cmp(&other, op_id, vm)?;
    Ok(outcome.to_pyobject(vm))
}
/// Builds a bytes object from `self` by calling the `PyBytes` constructor
/// slot with `self` as its single positional argument.
///
/// # Errors
/// When `downcast_exact::<PyInt>` accepts `self`, a downcast type error
/// against the bytes type is returned instead of calling the constructor.
pub fn bytes(self, vm: &VirtualMachine) -> PyResult {
    let bytes_type = vm.ctx.types.bytes_type;
    let source = match self.downcast_exact::<PyInt>(vm) {
        Ok(int) => return Err(vm.new_downcast_type_error(bytes_type, &int)),
        Err(not_int) => not_int,
    };
    let ctor_args = FuncArgs::from(vec![source]);
    <PyBytes as Constructor>::slot_new(bytes_type.to_owned(), ctor_args, vm)
}
/// Evaluates the truthiness of `self` by delegating to `try_to_bool`.
pub fn is_true(self, vm: &VirtualMachine) -> PyResult<bool> {
    let truthy = self.try_to_bool(vm)?;
    Ok(truthy)
}
/// Returns the logical negation of [`is_true`](Self::is_true).
pub fn not(self, vm: &VirtualMachine) -> PyResult<bool> {
    let truthy = self.is_true(vm)?;
    Ok(!truthy)
}
/// Returns the length hint reported by `vm.length_hint_opt`, or
/// `defaultvalue` when no hint is available. Errors are propagated.
pub fn length_hint(self, defaultvalue: usize, vm: &VirtualMachine) -> PyResult<usize> {
    vm.length_hint_opt(self)
        .map(|hint| hint.unwrap_or(defaultvalue))
}
/// Collects the attribute names visible on `self` into a `PyList`.
///
/// A fresh dict is seeded from the class attributes; if the object has its
/// own dict, it is merged in through the dict's `update` method. The keys
/// of the merged dict are returned.
pub fn dir(self, vm: &VirtualMachine) -> PyResult<PyList> {
    let class_attrs = self.class().get_attributes();
    let merged = PyDict::from_attributes(class_attrs, vm)?.into_ref(&vm.ctx);

    if let Some(instance_dict) = self.dict() {
        let update = identifier!(vm, update).as_str();
        vm.call_method(merged.as_object(), update, (instance_dict,))?;
    }

    let names: Vec<_> = merged.into_iter().map(|(name, _)| name).collect();
    Ok(PyList::from(names))
}
}
impl PyObject {
/// Obtains a `PyIter` for this object by converting an owned reference to
/// it with `PyIter::try_from_object`.
pub fn get_iter(&self, vm: &VirtualMachine) -> PyResult<PyIter> {
    let owned = self.to_owned();
    PyIter::try_from_object(vm, owned)
}
/// Returns the async iterator produced by this object's `__aiter__`.
///
/// # Errors
/// A type error is returned when
/// * the object's class has no `__aiter__` attribute,
/// * `__aiter__` returned a `PyCoroutine`, or
/// * the returned object's class has no `__anext__` attribute.
///
/// Errors raised by the `__aiter__` call itself are propagated.
pub fn get_aiter(&self, vm: &VirtualMachine) -> PyResult {
    use crate::builtins::PyCoroutine;

    // The class must define `__aiter__` before we attempt the call.
    if self.class().get_attr(identifier!(vm, __aiter__)).is_none() {
        let msg = format!("'{}' object is not an async iterable", self.class().name());
        return Err(vm.new_type_error(msg));
    }

    let iterator = vm.call_special_method(self, identifier!(vm, __aiter__), ())?;

    // A coroutine result is rejected with a dedicated message.
    if iterator.downcast_ref::<PyCoroutine>().is_some() {
        return Err(vm.new_type_error(
            "'async_iterator' object cannot be interpreted as an async iterable; \
             perhaps you forgot to call aiter()?",
        ));
    }

    // The result must itself expose `__anext__`.
    if iterator.class().has_attr(identifier!(vm, __anext__)) {
        Ok(iterator)
    } else {
        let msg = format!(
            "'{}' object is not an async iterator",
            iterator.class().name()
        );
        Err(vm.new_type_error(msg))
    }
}
/// Looks up `attr_name` and reports whether the value found is not `None`.
///
/// Note that a failed lookup is not turned into `Ok(false)`: any error from
/// [`get_attr`](Self::get_attr) is propagated to the caller.
pub fn has_attr<'a>(&self, attr_name: impl AsPyStr<'a>, vm: &VirtualMachine) -> PyResult<bool> {
    let value = self.get_attr(attr_name, vm)?;
    Ok(!vm.is_none(&value))
}
/// Fetches the attribute `attr_name`, after turning the name into a
/// `&Py<PyStr>` through the VM context.
pub fn get_attr<'a>(&self, attr_name: impl AsPyStr<'a>, vm: &VirtualMachine) -> PyResult {
    self.get_attr_inner(attr_name.as_pystr(&vm.ctx), vm)
}
/// Attribute lookup through the class's `getattro` slot.
///
/// On failure the VM is asked to attach the object and attribute name to
/// the error (`set_attribute_error_context`) before it is returned.
///
/// # Panics
/// Panics if the class has no `getattro` slot.
#[cfg_attr(feature = "flame-it", flame("PyObjectRef"))]
#[inline]
pub(crate) fn get_attr_inner(&self, attr_name: &Py<PyStr>, vm: &VirtualMachine) -> PyResult {
    vm_trace!("object.__getattribute__: {:?} {:?}", self, attr_name);
    let getattro = self.class().slots.getattro.load().unwrap();
    let result = getattro(self, attr_name, vm);
    if let Err(exc) = &result {
        vm.set_attribute_error_context(exc, self.to_owned(), attr_name.to_owned());
    }
    result
}
/// Assigns or deletes `attr_name` through the class's `setattro` slot.
///
/// # Errors
/// If the class has no `setattro` slot, a type error is returned. Its
/// wording says "only read-only" attributes when a `getattro` slot exists
/// and "no" attributes otherwise, and names the attempted action
/// ("assign to" or "del"). Errors from the slot itself are propagated.
pub fn call_set_attr(
    &self,
    vm: &VirtualMachine,
    attr_name: &Py<PyStr>,
    attr_value: PySetterValue,
) -> PyResult<()> {
    let cls = self.class();
    let Some(setattro) = cls.slots.setattro.load() else {
        let qualifier = if cls.slots.getattro.load().is_some() {
            "only read-only"
        } else {
            "no"
        };
        let action = if attr_value.is_assign() {
            "assign to"
        } else {
            "del"
        };
        return Err(vm.new_type_error(format!(
            "'{}' object has {} attributes ({} {})",
            cls.name(),
            qualifier,
            action,
            attr_name
        )));
    };
    setattro(self, attr_name, attr_value, vm)
}
/// Assigns `attr_value` to the attribute `attr_name`.
///
/// The name is converted via the VM context and the value via `Into`, then
/// the assignment is routed through [`call_set_attr`](Self::call_set_attr).
pub fn set_attr<'a>(
    &self,
    attr_name: impl AsPyStr<'a>,
    attr_value: impl Into<PyObjectRef>,
    vm: &VirtualMachine,
) -> PyResult<()> {
    let name = attr_name.as_pystr(&vm.ctx);
    let assignment = PySetterValue::Assign(attr_value.into());
    self.call_set_attr(vm, name, assignment)
}
#[cfg_attr(feature = "flame-it", flame)]
pub fn generic_setattr(
&self,
attr_name: &Py<PyStr>,
value: PySetterValue,
vm: &VirtualMachine,
) -> PyResult<()> {
vm_trace!("object.__setattr__({:?}, {}, {:?})", self, attr_name, value);
if let Some(attr) = vm
.ctx
.interned_str(attr_name)
.and_then(|attr_name| self.get_class_attr(attr_name))
{
let descr_set = attr.class().slots.descr_set.load();
if let Some(descriptor) = descr_set {
return descriptor(&attr, self.to_owned(), value, vm);
}
}
if let Some(dict) = self.dict() {
if let PySetterValue::Assign(value) = value {
dict.set_item(attr_name, value, vm)?;
} else {
dict.del_item(attr_name, vm).map_err(|e| {
if e.fast_isinstance(vm.ctx.exceptions.key_error) {
vm.new_no_attribute_error(self.to_owned(), attr_name.to_owned())
} else {
e
}
})?;
}
Ok(())
} else {
Err(vm.new_no_attribute_error(self.to_owned(), attr_name.to_owned()))
}
}
/// Generic attribute lookup that fails with a no-attribute error when
/// [`generic_getattr_opt`](Self::generic_getattr_opt) finds nothing.
pub fn generic_getattr(&self, name: &Py<PyStr>, vm: &VirtualMachine) -> PyResult {
    match self.generic_getattr_opt(name, None, vm)? {
        Some(value) => Ok(value),
        None => Err(vm.new_no_attribute_error(self.to_owned(), name.to_owned())),
    }
}
/// Generic attribute lookup returning `Ok(None)` when nothing is found.
///
/// Resolution order:
/// 1. A class attribute (looked up only when the name is interned) whose
///    class has both `descr_get` and `descr_set` slots wins immediately.
/// 2. An entry in `dict`, or in the object's own dict when `dict` is `None`.
/// 3. The class attribute found in step 1, passed through its `descr_get`
///    slot if it has one, otherwise returned as-is.
///
/// Errors from the descriptor call or the dict lookup are propagated.
pub fn generic_getattr_opt(
    &self,
    name_str: &Py<PyStr>,
    dict: Option<PyDictRef>,
    vm: &VirtualMachine,
) -> PyResult<Option<PyObjectRef>> {
    let key = name_str.as_wtf8();
    let obj_cls = self.class();

    let class_hit = match vm
        .ctx
        .interned_str(name_str)
        .and_then(|interned| obj_cls.get_attr(interned))
    {
        None => None,
        Some(descr) => {
            let descr_cls = descr.class();
            let getter = descr_cls.slots.descr_get.load();
            // Getter plus setter on the attribute's class: takes priority over the dict.
            if let Some(get) = getter
                && descr_cls.slots.descr_set.load().is_some()
            {
                let owner = obj_cls.to_owned().into();
                return get(descr, Some(self.to_owned()), Some(owner), vm).map(Some);
            }
            Some((descr, getter))
        }
    };

    let instance_attr = match dict.or_else(|| self.dict()) {
        Some(dict) => dict.get_item_opt(key, vm)?,
        None => None,
    };
    if let Some(found) = instance_attr {
        return Ok(Some(found));
    }

    match class_hit {
        Some((attr, Some(get))) => {
            let owner = obj_cls.to_owned().into();
            get(attr, Some(self.to_owned()), Some(owner), vm).map(Some)
        }
        Some((attr, None)) => Ok(Some(attr)),
        None => Ok(None),
    }
}
/// Deletes the attribute `attr_name` by sending `PySetterValue::Delete`
/// through [`call_set_attr`](Self::call_set_attr).
pub fn del_attr<'a>(&self, attr_name: impl AsPyStr<'a>, vm: &VirtualMachine) -> PyResult<()> {
    self.call_set_attr(vm, attr_name.as_pystr(&vm.ctx), PySetterValue::Delete)
}
/// Runs [`_cmp_inner`](Self::_cmp_inner) inside `vm.with_recursion`, using
/// the label "in comparison".
#[inline]
fn _cmp(
    &self,
    other: &Self,
    op: PyComparisonOp,
    vm: &VirtualMachine,
) -> PyResult<Either<PyObjectRef, bool>> {
    let compare = || self._cmp_inner(other, op, vm);
    vm.with_recursion("in comparison", compare)
}
/// Performs one rich comparison of `self` against `other` for operator `op`.
///
/// The `richcompare` slots are consulted in this order:
/// 1. If `other`'s class is a different class that passes
///    `fast_issubclass(self's class)`, `other`'s slot is tried first with
///    the swapped operator.
/// 2. `self`'s slot with `op`.
/// 3. `other`'s slot with the swapped operator, unless step 1 already ran.
///
/// The first call that yields an implemented result is returned. If none
/// does, `Eq` / `Ne` fall back to an identity check, and every other
/// operator produces a type error naming the operator and both class names.
fn _cmp_inner(
&self,
other: &Self,
op: PyComparisonOp,
vm: &VirtualMachine,
) -> PyResult<Either<PyObjectRef, bool>> {
// Operator passed to the slot when the operands are exchanged.
let swapped = op.swapped();
// Calls `obj`'s richcompare slot; a missing slot counts as NotImplemented.
// Both return shapes of the slot are normalized into a PyArithmeticValue.
let call_cmp = |obj: &Self, other: &Self, op| {
let Some(cmp) = obj.class().slots.richcompare.load() else {
return Ok(PyArithmeticValue::NotImplemented);
};
let r = match cmp(obj, other, op, vm)? {
Either::A(obj) => PyArithmeticValue::from_object(vm, obj).map(Either::A),
Either::B(arithmetic) => arithmetic.map(Either::B),
};
Ok(r)
};
// Tracks whether the swapped call has already been made, so it is not repeated.
let mut checked_reverse_op = false;
let is_strict_subclass = {
let self_class = self.class();
let other_class = other.class();
!self_class.is(other_class) && other_class.fast_issubclass(self_class)
};
if is_strict_subclass {
// `other`'s class derives from `self`'s class: give its slot the first chance.
let res = call_cmp(other, self, swapped)?;
checked_reverse_op = true;
if let PyArithmeticValue::Implemented(x) = res {
return Ok(x);
}
}
// Forward comparison.
if let PyArithmeticValue::Implemented(x) = call_cmp(self, other, op)? {
return Ok(x);
}
// Swapped comparison, only if not tried above.
if !checked_reverse_op {
let res = call_cmp(other, self, swapped)?;
if let PyArithmeticValue::Implemented(x) = res {
return Ok(x);
}
}
// No slot produced a result: identity fallback for Eq/Ne, error otherwise.
match op {
PyComparisonOp::Eq => Ok(Either::B(self.is(&other))),
PyComparisonOp::Ne => Ok(Either::B(!self.is(&other))),
_ => Err(vm.new_type_error(format!(
"'{}' not supported between instances of '{}' and '{}'",
op.operator_token(),
self.class().name(),
other.class().name()
))),
}
}
/// Compares `self` with `other` and reduces the outcome to a `bool`.
///
/// A `bool` outcome is returned directly; an object outcome is converted
/// with `try_to_bool`, whose errors are propagated.
#[inline(always)]
pub fn rich_compare_bool(
    &self,
    other: &Self,
    op_id: PyComparisonOp,
    vm: &VirtualMachine,
) -> PyResult<bool> {
    let outcome = self._cmp(other, op_id, vm)?;
    match outcome {
        Either::B(flag) => Ok(flag),
        Either::A(obj) => obj.try_to_bool(vm),
    }
}
/// Computes [`repr`](Self::repr) and converts the result with
/// `try_into_utf8`; errors from either step are propagated.
pub fn repr_utf8(&self, vm: &VirtualMachine) -> PyResult<PyRef<PyUtf8Str>> {
    let repr = self.repr(vm)?;
    repr.try_into_utf8(vm)
}
/// Produces the repr string of this object through the class's `repr` slot,
/// running inside `vm.with_recursion`.
///
/// # Errors
/// A runtime error is returned if the class has no `repr` slot.
pub fn repr(&self, vm: &VirtualMachine) -> PyResult<PyRef<PyStr>> {
    vm.with_recursion("while getting the repr of an object", || {
        let slot = self.class().slots.repr.load();
        match slot {
            Some(repr) => repr(self, vm),
            None => Err(vm.new_runtime_error(format!(
                "BUG: object of type '{}' has no __repr__ method. This is a bug in RustPython.",
                self.class().name()
            ))),
        }
    })
}
/// Returns the repr of this object. A repr that is already ASCII is
/// returned unchanged; otherwise it is passed through `to_ascii` and
/// wrapped in a new string object.
pub fn ascii(&self, vm: &VirtualMachine) -> PyResult<PyRef<PyStr>> {
    let repr = self.repr(vm)?;
    if !repr.as_wtf8().is_ascii() {
        let escaped = to_ascii(repr.as_wtf8());
        return Ok(vm.ctx.new_str(escaped));
    }
    Ok(repr)
}
/// Computes [`str`](Self::str) and converts the result with
/// `try_into_utf8`; errors from either step are propagated.
pub fn str_utf8(&self, vm: &VirtualMachine) -> PyResult<PyRef<PyUtf8Str>> {
    let text = self.str(vm)?;
    text.try_into_utf8(vm)
}
/// Converts this object to a string object.
///
/// Steps, in order:
/// 1. An object accepted by `downcast_exact::<PyStr>` is returned as-is.
/// 2. An object accepted by `downcast_exact::<PyInt>` is rendered with
///    `to_str_radix_10`.
/// 3. Otherwise the `__str__` special method is invoked; when it is absent
///    the result of [`repr`](Self::repr) is returned.
///
/// # Errors
/// A type error is returned if `__str__` yields something that does not
/// downcast to `PyStr`. Errors from the lookup or the call are propagated.
pub fn str(&self, vm: &VirtualMachine) -> PyResult<PyRef<PyStr>> {
    let obj = match self.to_owned().downcast_exact::<PyStr>(vm) {
        Ok(exact_str) => return Ok(exact_str.into_pyref()),
        Err(not_str) => not_str,
    };
    let obj = match obj.downcast_exact::<PyInt>(vm) {
        Ok(exact_int) => return Ok(vm.ctx.new_str(exact_int.to_str_radix_10())),
        Err(not_int) => not_int,
    };

    let Some(str_method) = vm.get_special_method(&obj, identifier!(vm, __str__))? else {
        return obj.repr(vm);
    };

    let produced = str_method.invoke((), vm)?;
    match produced.downcast::<PyStr>() {
        Ok(text) => Ok(text),
        Err(wrong) => Err(vm.new_type_error(format!(
            "__str__ returned non-string (type {})",
            wrong.class().name()
        ))),
    }
}
/// Succeeds when [`abstract_get_bases`](Self::abstract_get_bases) finds a
/// bases tuple on `self`; otherwise fails with a type error whose message
/// is produced lazily by `msg`.
fn check_class<F>(&self, vm: &VirtualMachine, msg: F) -> PyResult<()>
where
    F: Fn() -> String,
{
    if self.abstract_get_bases(vm)?.is_some() {
        Ok(())
    } else {
        Err(vm.new_type_error(msg()))
    }
}
/// Reads the `__bases__` attribute of `self` as a tuple.
///
/// Returns `Ok(None)` when the attribute is absent or when its value cannot
/// be converted to a tuple (the conversion error is discarded). Errors from
/// the attribute lookup itself are propagated.
fn abstract_get_bases(&self, vm: &VirtualMachine) -> PyResult<Option<PyTupleRef>> {
    let Some(bases) = vm.get_attribute_opt(self.to_owned(), identifier!(vm, __bases__))? else {
        return Ok(None);
    };
    Ok(PyTupleRef::try_from_object(vm, bases).ok())
}
/// Walks the `__bases__` graph starting at `self`, looking for `cls` by
/// identity.
///
/// While the current object has exactly one base the walk proceeds in a
/// loop. Once an object with two or more bases is reached, each base is
/// checked recursively under `vm.with_recursion`. A missing or empty bases
/// tuple ends the walk with `false`.
fn abstract_issubclass(&self, cls: &Self, vm: &VirtualMachine) -> PyResult<bool> {
    // Owns the latest single-element bases tuple so `current` can borrow from it.
    let mut chain_owner: PyTupleRef;
    let mut current = self;

    let branching_bases = loop {
        if current.is(cls) {
            return Ok(true);
        }
        let Some(current_bases) = current.abstract_get_bases(vm)? else {
            return Ok(false);
        };
        let count = current_bases.len();
        if count == 0 {
            return Ok(false);
        }
        if count > 1 {
            break current_bases;
        }
        // Exactly one base: step to it without recursing.
        chain_owner = current_bases;
        current = &chain_owner.as_slice()[0];
    };

    debug_assert!(branching_bases.len() >= 2);
    for base in branching_bases.as_slice() {
        let found = vm.with_recursion("in __issubclass__", || base.abstract_issubclass(cls, vm))?;
        if found {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Subclass test without any `__subclasscheck__` dispatch.
///
/// When `PyType::check` accepts both operands the answer comes from
/// `is_subtype`. Otherwise `self` must pass
/// [`check_class`](Self::check_class), and so must `cls` unless its class is
/// the union type; the answer then comes from
/// [`abstract_issubclass`](Self::abstract_issubclass).
fn recursive_issubclass(&self, cls: &Self, vm: &VirtualMachine) -> PyResult<bool> {
    if let Some(base_type) = PyType::check(cls)
        && let Some(derived_type) = PyType::check(self)
    {
        return Ok(derived_type.is_subtype(base_type));
    }

    let arg1_message = || format!("issubclass() arg 1 must be a class, not {}", self.class());
    self.check_class(vm, arg1_message)?;

    let cls_is_union = cls.class().is(vm.ctx.types.union_type);
    if !cls_is_union {
        let arg2_message = || {
            format!(
                "issubclass() arg 2 must be a class, a tuple of classes, or a union, not {}",
                cls.class()
            )
        };
        cls.check_class(vm, arg2_message)?;
    }

    self.abstract_issubclass(cls, vm)
}
/// Public entry point for
/// [`recursive_issubclass`](Self::recursive_issubclass), the subclass test
/// that skips `__subclasscheck__`.
pub fn real_is_subclass(&self, cls: &Self, vm: &VirtualMachine) -> PyResult<bool> {
    let is_sub = self.recursive_issubclass(cls, vm)?;
    Ok(is_sub)
}
/// Subclass test with full dispatch.
///
/// * If `cls`'s class is exactly the `type` type: identity gives `true`,
///   otherwise [`recursive_issubclass`](Self::recursive_issubclass) decides.
/// * If `cls` is a union, its argument tuple is used in place of `cls`.
/// * For a tuple, each item is tested in turn (under `with_recursion`) and
///   the first match wins; no match gives `false`.
/// * If `cls` provides `__subclasscheck__`, its truthiness decides.
/// * Otherwise [`recursive_issubclass`](Self::recursive_issubclass) decides.
pub fn is_subclass(&self, cls: &Self, vm: &VirtualMachine) -> PyResult<bool> {
    if cls.class().is(vm.ctx.types.type_type) {
        return if self.is(cls) {
            Ok(true)
        } else {
            self.recursive_issubclass(cls, vm)
        };
    }

    let target = if cls.class().is(vm.ctx.types.union_type) {
        let union = cls
            .downcast_ref::<crate::builtins::PyUnion>()
            .expect("union is already checked");
        union.args().as_object()
    } else {
        cls
    };

    if let Some(candidates) = target.downcast_ref::<PyTuple>() {
        for candidate in candidates {
            let matched =
                vm.with_recursion("in __subclasscheck__", || self.is_subclass(candidate, vm))?;
            if matched {
                return Ok(true);
            }
        }
        return Ok(false);
    }

    match target.lookup_special(identifier!(vm, __subclasscheck__), vm) {
        Some(checker) => {
            let verdict = vm.with_recursion("in __subclasscheck__", || {
                checker.call((self.to_owned(),), vm)
            })?;
            verdict.try_to_bool(vm)
        }
        None => self.recursive_issubclass(target, vm),
    }
}
/// Crate-visible entry point for
/// [`object_isinstance`](Self::object_isinstance), the instance test that
/// skips `__instancecheck__`.
pub(crate) fn real_is_instance(&self, cls: &Self, vm: &VirtualMachine) -> PyResult<bool> {
    let is_inst = self.object_isinstance(cls, vm)?;
    Ok(is_inst)
}
/// Instance test without any `__instancecheck__` dispatch.
///
/// When `cls` converts to a `PyType` reference:
/// * `true` if the object's class is a subtype of it;
/// * otherwise, if the object has a `__class__` attribute that converts to
///   a type and is not the object's own class, that type's subtype
///   relation decides;
/// * otherwise `false`.
///
/// When `cls` is not a type it must pass
/// [`check_class`](Self::check_class); the object's `__class__` attribute
/// (if any) is then tested with
/// [`abstract_issubclass`](Self::abstract_issubclass).
fn object_isinstance(&self, cls: &Self, vm: &VirtualMachine) -> PyResult<bool> {
    match cls.try_to_ref::<PyType>(vm) {
        Ok(type_cls) => {
            if self.class().is_subtype(type_cls) {
                return Ok(true);
            }
            if let Some(i_cls) =
                vm.get_attribute_opt(self.to_owned(), identifier!(vm, __class__))?
                && let Ok(i_cls_type) = PyTypeRef::try_from_object(vm, i_cls)
                && !i_cls_type.is(self.class())
            {
                return Ok(i_cls_type.is_subtype(type_cls));
            }
            Ok(false)
        }
        Err(_) => {
            cls.check_class(vm, || {
                format!(
                    "isinstance() arg 2 must be a type, a tuple of types, or a union, not {}",
                    cls.class()
                )
            })?;
            match vm.get_attribute_opt(self.to_owned(), identifier!(vm, __class__))? {
                Some(i_cls) => i_cls.abstract_issubclass(cls, vm),
                None => Ok(false),
            }
        }
    }
}
/// Instance test with full dispatch; see
/// [`object_recursive_isinstance`](Self::object_recursive_isinstance).
pub fn is_instance(&self, cls: &Self, vm: &VirtualMachine) -> PyResult<bool> {
    let is_inst = self.object_recursive_isinstance(cls, vm)?;
    Ok(is_inst)
}
/// Instance test with full dispatch.
///
/// * `true` immediately if the object's class is identical to `cls`.
/// * If `cls`'s class is exactly the `type` type,
///   [`object_isinstance`](Self::object_isinstance) decides.
/// * If `cls` is a union, its argument tuple is used in place of `cls`.
/// * For a tuple, each item is tested in turn (under `with_recursion`) and
///   the first match wins; no match gives `false`.
/// * If `cls` provides `__instancecheck__`, its truthiness decides.
/// * Otherwise [`object_isinstance`](Self::object_isinstance) decides.
fn object_recursive_isinstance(&self, cls: &Self, vm: &VirtualMachine) -> PyResult<bool> {
    if self.class().is(cls) {
        return Ok(true);
    }
    if cls.class().is(vm.ctx.types.type_type) {
        return self.object_isinstance(cls, vm);
    }

    let target = if cls.class().is(vm.ctx.types.union_type) {
        let union = cls
            .try_to_ref::<crate::builtins::PyUnion>(vm)
            .expect("checked by is");
        union.args().as_object()
    } else {
        cls
    };

    if let Some(candidates) = target.downcast_ref::<PyTuple>() {
        for candidate in candidates {
            let matched = vm.with_recursion("in __instancecheck__", || {
                self.object_recursive_isinstance(candidate, vm)
            })?;
            if matched {
                return Ok(true);
            }
        }
        return Ok(false);
    }

    match target.lookup_special(identifier!(vm, __instancecheck__), vm) {
        Some(checker) => {
            let verdict = vm.with_recursion("in __instancecheck__", || {
                checker.call((self.to_owned(),), vm)
            })?;
            verdict.try_to_bool(vm)
        }
        None => self.object_isinstance(target, vm),
    }
}
/// Hashes this object through the class's `hash` slot.
///
/// # Errors
/// A type error ("unhashable type") is returned when the slot is empty;
/// errors from the slot are propagated.
pub fn hash(&self, vm: &VirtualMachine) -> PyResult<PyHash> {
    let slot = self.class().slots.hash.load();
    match slot {
        Some(hash) => hash(self, vm),
        None => Err(vm.new_exception_msg(
            vm.ctx.exceptions.type_error.to_owned(),
            format!("unhashable type: '{}'", self.class().name()).into(),
        )),
    }
}
pub fn obj_type(&self) -> PyObjectRef {
self.class().to_owned().into()
}
/// Reports whether `self` passes `fast_isinstance` against `typ`.
///
/// This is a plain delegation; no other checks are performed here.
pub fn type_check(&self, typ: &Py<PyType>) -> bool {
self.fast_isinstance(typ)
}
/// Asks the sequence view for a length first and, only if it has none,
/// asks the mapping view. Returns `None` when neither provides a length.
pub fn length_opt(&self, vm: &VirtualMachine) -> Option<PyResult<usize>> {
    let from_sequence = self.sequence_unchecked().length_opt(vm);
    if from_sequence.is_some() {
        return from_sequence;
    }
    self.mapping_unchecked().length_opt(vm)
}
/// Returns the length of this object.
///
/// # Errors
/// A type error ("has no len()") is returned when
/// [`length_opt`](Self::length_opt) yields `None`; an error produced while
/// computing the length is passed through.
pub fn length(&self, vm: &VirtualMachine) -> PyResult<usize> {
    match self.length_opt(vm) {
        Some(result) => result,
        None => Err(vm.new_type_error(format!(
            "object of type '{}' has no len()",
            self.class().name()
        ))),
    }
}
/// Subscript read: `self[needle]`.
///
/// Dispatch order:
/// 1. Objects accepted by `downcast_ref_if_exact::<PyDict>` use the dict's
///    own `get_item`.
/// 2. Objects usable as a mapping use the mapping `subscript`.
/// 3. Objects usable as a sequence convert the key with `key_as_isize` and
///    use the sequence `get_item`.
/// 4. Objects whose class is a subclass of `type`: the `type` object itself
///    yields a `PyGenericAlias`; otherwise a non-`None` `__class_getitem__`
///    attribute is called with the key; otherwise a type error is returned.
/// 5. Anything else is a "not subscriptable" type error.
pub fn get_item<K: DictKey + ?Sized>(&self, needle: &K, vm: &VirtualMachine) -> PyResult {
    if let Some(dict) = self.downcast_ref_if_exact::<PyDict>(vm) {
        return dict.get_item(needle, vm);
    }

    let key = needle.to_pyobject(vm);

    if let Ok(mapping) = self.try_mapping(vm) {
        return mapping.subscript(&key, vm);
    }
    if let Ok(seq) = self.try_sequence(vm) {
        let index = key.key_as_isize(vm)?;
        return seq.get_item(index, vm);
    }

    if !self.class().fast_issubclass(vm.ctx.types.type_type) {
        return Err(vm.new_type_error(format!("'{}' object is not subscriptable", self.class())));
    }

    // From here on `self` is a type object.
    if self.is(vm.ctx.types.type_type) {
        return PyGenericAlias::from_args(self.class().to_owned(), key, vm).to_pyresult(vm);
    }
    if let Some(class_getitem) =
        vm.get_attribute_opt(self.to_owned(), identifier!(vm, __class_getitem__))?
        && !vm.is_none(&class_getitem)
    {
        return class_getitem.call((key,), vm);
    }
    Err(vm.new_type_error(format!(
        "type '{}' is not subscriptable",
        self.downcast_ref::<PyType>().unwrap().name()
    )))
}
/// Subscript write: `self[needle] = value`.
///
/// Dispatch order: the dict fast path (for objects accepted by
/// `downcast_ref_if_exact::<PyDict>`), then the mapping `ass_subscript`
/// slot, then the sequence `ass_item` slot with the key converted by
/// `key_as_isize`.
///
/// # Errors
/// A type error is returned when none of the above applies.
pub fn set_item<K: DictKey + ?Sized>(
    &self,
    needle: &K,
    value: PyObjectRef,
    vm: &VirtualMachine,
) -> PyResult<()> {
    if let Some(dict) = self.downcast_ref_if_exact::<PyDict>(vm) {
        return dict.set_item(needle, value, vm);
    }

    let mapping = self.mapping_unchecked();
    if let Some(assign_subscript) = mapping.slots().ass_subscript.load() {
        let key = needle.to_pyobject(vm);
        return assign_subscript(mapping, &key, Some(value), vm);
    }

    let seq = self.sequence_unchecked();
    if let Some(assign_item) = seq.slots().ass_item.load() {
        let index = needle.key_as_isize(vm)?;
        return assign_item(seq, index, Some(value), vm);
    }

    let message = format!("'{}' does not support item assignment", self.class());
    Err(vm.new_type_error(message))
}
/// Subscript delete: `del self[needle]`.
///
/// Dispatch order: the dict fast path (for objects accepted by
/// `downcast_ref_if_exact::<PyDict>`), then the mapping `ass_subscript`
/// slot called with no value, then the sequence `ass_item` slot called with
/// no value and the key converted by `key_as_isize`.
///
/// # Errors
/// A type error is returned when none of the above applies.
pub fn del_item<K: DictKey + ?Sized>(&self, needle: &K, vm: &VirtualMachine) -> PyResult<()> {
    if let Some(dict) = self.downcast_ref_if_exact::<PyDict>(vm) {
        return dict.del_item(needle, vm);
    }

    let mapping = self.mapping_unchecked();
    if let Some(assign_subscript) = mapping.slots().ass_subscript.load() {
        let key = needle.to_pyobject(vm);
        return assign_subscript(mapping, &key, None, vm);
    }

    let seq = self.sequence_unchecked();
    if let Some(assign_item) = seq.slots().ass_item.load() {
        let index = needle.key_as_isize(vm)?;
        return assign_item(seq, index, None, vm);
    }

    let message = format!("'{}' does not support item deletion", self.class());
    Err(vm.new_type_error(message))
}
/// Looks up `attr` on the object's class (via `lookup_ref`) and binds it to
/// `self` when the value found has a `descr_get` slot.
///
/// Returns `None` when the class lookup finds nothing. An error raised by
/// the `descr_get` call is discarded and also reported as `None`.
pub fn lookup_special(&self, attr: &Py<PyStr>, vm: &VirtualMachine) -> Option<PyObjectRef> {
    let obj_cls = self.class();
    let found = obj_cls.lookup_ref(attr, vm)?;

    let getter = found.class().slots.descr_get.load();
    match getter {
        Some(descr_get) => {
            let owner = obj_cls.to_owned().into();
            descr_get(found, Some(self.to_owned()), Some(owner), vm).ok()
        }
        None => Some(found),
    }
}
}