use super::{
PyClassMethod, PyDictRef, PyList, PyStaticMethod, PyStr, PyStrInterned, PyStrRef, PyTupleRef,
PyUtf8StrRef, PyWeak, mappingproxy::PyMappingProxy, object, union_,
};
use crate::{
AsObject, Context, Py, PyAtomicRef, PyObject, PyObjectRef, PyPayload, PyRef, PyResult,
TryFromObject, VirtualMachine,
builtins::{
PyBaseExceptionRef,
descriptor::{
MemberGetter, MemberKind, MemberSetter, PyDescriptorOwned, PyMemberDef,
PyMemberDescriptor,
},
function::{PyCellRef, PyFunction},
tuple::{IntoPyTuple, PyTuple},
},
class::{PyClassImpl, StaticType},
common::{
ascii,
borrow::BorrowedValue,
lock::{PyMutex, PyRwLock, PyRwLockReadGuard},
},
function::{FuncArgs, KwArgs, OptionalArg, PyMethodDef, PySetterValue},
object::{Traverse, TraverseFn},
protocol::{PyIterReturn, PyNumberMethods},
types::{
AsNumber, Callable, Constructor, GetAttr, Initializer, PyTypeFlags, PyTypeSlots,
Representable, SLOT_DEFS, SetAttr, TypeDataRef, TypeDataRefMut, TypeDataSlot,
},
};
use core::{
any::Any,
borrow::Borrow,
ops::Deref,
pin::Pin,
ptr::NonNull,
sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, Ordering},
};
use indexmap::{IndexMap, map::Entry};
use itertools::Itertools;
use num_traits::ToPrimitive;
use rustpython_common::wtf8::Wtf8;
use std::collections::HashSet;
// Payload of a type object, exposed under the name "type".
// Traversal is hand-written (`traverse = "manual"`); see the `Traverse` impl for `PyType`.
#[pyclass(module = false, name = "type", traverse = "manual")]
pub struct PyType {
// Primary base type, if any.
pub base: Option<PyTypeRef>,
// Direct base types.
pub bases: PyRwLock<Vec<PyTypeRef>>,
// Method resolution order; the constructors in this file insert the type itself at index 0.
pub mro: PyRwLock<Vec<PyTypeRef>>,
// Weak references to the types that list this type among their bases.
pub subclasses: PyRwLock<Vec<PyRef<PyWeak>>>,
// Attributes defined directly on this type, keyed by interned name.
pub attributes: PyRwLock<PyAttributes>,
// Slot table for this type.
pub slots: PyTypeSlots,
// Extra state carried only by heap types (`None` for types built by `new_static`).
pub heaptype_ext: Option<Pin<Box<HeapTypeExt>>>,
// Version tag used to validate lookup caches; 0 means "no version assigned".
pub tp_version_tag: AtomicU32,
}
/// Next version tag to hand out. Starts at 1 because 0 is used as
/// "no version assigned" throughout this file.
static NEXT_TYPE_VERSION: AtomicU32 = AtomicU32::new(1);
/// log2 of the number of entries in the global type attribute cache.
const TYPE_CACHE_SIZE_EXP: u32 = 12;
/// Number of entries in the global type attribute cache (a power of two).
const TYPE_CACHE_SIZE: usize = 1 << TYPE_CACHE_SIZE_EXP;
/// Mask that reduces a hash to a valid cache index.
const TYPE_CACHE_MASK: usize = TYPE_CACHE_SIZE - 1;
/// One slot of the global type attribute cache.
///
/// All fields are atomics; `sequence` is the counter that readers and
/// writers use to detect concurrent modification of the other fields.
struct TypeCacheEntry {
/// Sequence counter: odd while a writer is active, even otherwise.
sequence: AtomicU32,
/// Type version tag this entry was filled for; 0 marks the entry invalid.
version: AtomicU32,
/// Interned attribute name this entry was filled for (null when never filled).
name: AtomicPtr<PyStrInterned>,
/// Raw pointer to the cached attribute value (null when cleared).
value: AtomicPtr<PyObject>,
}
// SAFETY: NOTE(review) — the struct only contains atomics, and the raw
// pointers they hold are only dereferenced by callers that follow the
// sequence-counter protocol; confirm that this is the intended argument.
unsafe impl Send for TypeCacheEntry {}
unsafe impl Sync for TypeCacheEntry {}
impl TypeCacheEntry {
/// Creates an empty entry: sequence 0, version 0, null name and value.
fn new() -> Self {
Self {
sequence: AtomicU32::new(0),
version: AtomicU32::new(0),
name: AtomicPtr::new(core::ptr::null_mut()),
value: AtomicPtr::new(core::ptr::null_mut()),
}
}
/// Enters a write section by moving `sequence` from an even to an odd value.
///
/// Spins while another writer is active (odd sequence) and retries when the
/// compare-exchange loses a race.
#[inline]
fn begin_write(&self) {
let mut seq = self.sequence.load(Ordering::Acquire);
loop {
// An odd sequence means another writer is inside its write section.
while (seq & 1) != 0 {
core::hint::spin_loop();
seq = self.sequence.load(Ordering::Acquire);
}
match self.sequence.compare_exchange_weak(
seq,
seq.wrapping_add(1),
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Fence placed between claiming the entry and the field stores that follow.
core::sync::atomic::fence(Ordering::Release);
break;
}
Err(observed) => {
core::hint::spin_loop();
seq = observed;
}
}
}
}
/// Leaves a write section by incrementing `sequence` again (back to even).
#[inline]
fn end_write(&self) {
self.sequence.fetch_add(1, Ordering::Release);
}
/// Waits until no writer is active and returns the (even) sequence observed.
///
/// The returned value must be passed to [`Self::end_read`] to validate the read.
#[inline]
fn begin_read(&self) -> u32 {
let mut sequence = self.sequence.load(Ordering::Acquire);
while (sequence & 1) != 0 {
core::hint::spin_loop();
sequence = self.sequence.load(Ordering::Acquire);
}
sequence
}
/// Returns `true` if `sequence` still equals `previous`, i.e. no writer
/// entered the entry since the matching [`Self::begin_read`].
#[inline]
fn end_read(&self, previous: u32) -> bool {
core::sync::atomic::fence(Ordering::Acquire);
self.sequence.load(Ordering::Relaxed) == previous
}
/// Resets the cached value pointer to null. The callers in this file
/// invoke it between `begin_write` and `end_write`.
fn clear_value(&self) {
self.value.store(core::ptr::null_mut(), Ordering::Relaxed);
}
}
/// Global type attribute cache: `TYPE_CACHE_SIZE` empty entries, allocated
/// lazily on first access.
static TYPE_CACHE: std::sync::LazyLock<Box<[TypeCacheEntry]>> = std::sync::LazyLock::new(|| {
    std::iter::repeat_with(TypeCacheEntry::new)
        .take(TYPE_CACHE_SIZE)
        .collect()
});
/// Set for the duration of `type_cache_clear`; `find_name_in_mro` does not
/// populate the cache while it is `true`.
static TYPE_CACHE_CLEARING: AtomicBool = AtomicBool::new(false);
/// Maps a (type version, interned name) pair to an index into `TYPE_CACHE`.
///
/// The name contributes through its address shifted right by three bits; the
/// result is always below `TYPE_CACHE_SIZE`.
#[inline]
fn type_cache_hash(version: u32, name: &'static PyStrInterned) -> usize {
    let address = name as *const PyStrInterned as usize;
    let name_hash = (address >> 3) as u32;
    let mixed = version ^ name_hash;
    (mixed as usize) & TYPE_CACHE_MASK
}
/// Invalidates every cache entry that was filled for type version `version`.
fn type_cache_clear_version(version: u32) {
for entry in TYPE_CACHE.iter() {
// Cheap check without entering a write section first.
if entry.version.load(Ordering::Relaxed) == version {
entry.begin_write();
// Re-check inside the write section: the entry may have been
// rewritten between the first check and `begin_write`.
if entry.version.load(Ordering::Relaxed) == version {
entry.version.store(0, Ordering::Release);
entry.clear_value();
}
entry.end_write();
}
}
}
/// Invalidates every entry of the global type attribute cache.
///
/// `TYPE_CACHE_CLEARING` is raised for the duration of the sweep so that
/// `find_name_in_mro` does not insert new entries meanwhile.
pub fn type_cache_clear() {
TYPE_CACHE_CLEARING.store(true, Ordering::Release);
for entry in TYPE_CACHE.iter() {
entry.begin_write();
entry.version.store(0, Ordering::Release);
entry.clear_value();
entry.end_write();
}
TYPE_CACHE_CLEARING.store(false, Ordering::Release);
}
// Manual traversal for `PyType`: visits and clears every object reference
// the type holds, including the specialization cache of heap types.
unsafe impl crate::object::Traverse for PyType {
    /// Reports every object referenced by this type to `tracer_fn`.
    fn traverse(&self, tracer_fn: &mut crate::object::TraverseFn<'_>) {
        self.base.traverse(tracer_fn);
        self.bases.traverse(tracer_fn);
        self.mro.traverse(tracer_fn);
        self.subclasses.traverse(tracer_fn);
        for (_, value) in self.attributes.read_recursive().iter() {
            value.traverse(tracer_fn);
        }
        if let Some(ext) = &self.heaptype_ext {
            ext.specialization_cache.traverse(tracer_fn);
        }
    }

    /// Moves every owned reference into `out` so the caller can drop them.
    ///
    /// Collections whose lock cannot be acquired with `try_write` are left
    /// untouched.
    fn clear(&mut self, out: &mut Vec<crate::PyObjectRef>) {
        if let Some(parent) = self.base.take() {
            out.push(parent.into());
        }
        if let Some(mut bases) = self.bases.try_write() {
            bases.drain(..).for_each(|base| out.push(base.into()));
        }
        if let Some(mut mro) = self.mro.try_write() {
            mro.drain(..).for_each(|typ| out.push(typ.into()));
        }
        if let Some(mut subclasses) = self.subclasses.try_write() {
            subclasses.drain(..).for_each(|weak| out.push(weak.into()));
        }
        if let Some(mut attrs) = self.attributes.try_write() {
            out.extend(attrs.drain(..).map(|(_, value)| value));
        }
        if let Some(ext) = &self.heaptype_ext {
            ext.specialization_cache.clear_into(out);
        }
    }
}
/// Extra state carried only by heap types (see `PyType::heaptype_ext`).
pub struct HeapTypeExt {
/// Type name; `PyType::name` and `PyType::slot_name` read it for heap types.
pub name: PyRwLock<PyUtf8StrRef>,
/// Qualified name, returned by the `__qualname__` getter.
pub qualname: PyRwLock<PyStrRef>,
/// Tuple of slot names; `new_heap` sets it to `None`.
pub slots: Option<PyRef<PyTuple<PyStrRef>>>,
/// Optional typed payload attached through `PyType::init_type_data`.
pub type_data: PyRwLock<Option<TypeDataSlot>>,
/// Functions cached for specialization, tied to the type's version tag.
pub specialization_cache: TypeSpecializationCache,
}
/// Per-type cache of functions recorded for specialization.
pub struct TypeSpecializationCache {
/// Cached `init` function, if any.
pub init: PyAtomicRef<Option<PyFunction>>,
/// Cached `getitem` function, if any.
pub getitem: PyAtomicRef<Option<PyFunction>>,
/// Function version stored alongside `getitem`; 0 means "none".
pub getitem_version: AtomicU32,
/// Serializes the writers of this cache.
write_lock: PyMutex<()>,
/// Functions swapped out without a VM at hand; kept alive here until `clear_into`.
retired: PyRwLock<Vec<PyObjectRef>>,
}
impl TypeSpecializationCache {
/// Creates an empty cache: no functions, version 0, nothing retired.
fn new() -> Self {
Self {
init: PyAtomicRef::from(None::<PyRef<PyFunction>>),
getitem: PyAtomicRef::from(None::<PyRef<PyFunction>>),
getitem_version: AtomicU32::new(0),
write_lock: PyMutex::new(()),
retired: PyRwLock::new(Vec::new()),
}
}
/// Parks a replaced function in `retired` instead of dropping it here.
#[inline]
fn retire_old_function(&self, old: Option<PyRef<PyFunction>>) {
if let Some(old) = old {
self.retired.write().push(old.into());
}
}
/// Replaces the cached `init` function.
///
/// With a VM the old value is handed to `swap_to_temporary_refs`
/// (presumably the VM keeps it alive temporarily — confirm against
/// `PyAtomicRef`); without one it is moved to `retired`.
#[inline]
fn swap_init(&self, new_init: Option<PyRef<PyFunction>>, vm: Option<&VirtualMachine>) {
if let Some(vm) = vm {
self.init.swap_to_temporary_refs(new_init, vm);
return;
}
// SAFETY: NOTE(review) — the old value is retired rather than dropped, so
// readers that still hold the previous pointer are presumably unaffected;
// confirm against the contract of `PyAtomicRef::swap`.
let old = unsafe { self.init.swap(new_init) };
self.retire_old_function(old);
}
/// Replaces the cached `getitem` function; same strategy as `swap_init`.
#[inline]
fn swap_getitem(&self, new_getitem: Option<PyRef<PyFunction>>, vm: Option<&VirtualMachine>) {
if let Some(vm) = vm {
self.getitem.swap_to_temporary_refs(new_getitem, vm);
return;
}
// SAFETY: NOTE(review) — see `swap_init`; the old value is retired, not dropped.
let old = unsafe { self.getitem.swap(new_getitem) };
self.retire_old_function(old);
}
/// Drops both cached functions (retiring the old values) under `write_lock`.
/// Called from `PyType::modified`.
#[inline]
fn invalidate_for_type_modified(&self) {
let _guard = self.write_lock.lock();
self.swap_init(None, None);
self.swap_getitem(None, None);
}
/// Reports the cached functions and every retired object to `tracer_fn`.
fn traverse(&self, tracer_fn: &mut TraverseFn<'_>) {
if let Some(init) = self.init.deref() {
tracer_fn(init.as_object());
}
if let Some(getitem) = self.getitem.deref() {
tracer_fn(getitem.as_object());
}
// `count()` only drives the lazy `map`; its result is discarded.
self.retired
.read()
.iter()
.map(|obj| obj.traverse(tracer_fn))
.count();
}
/// Empties the cache under `write_lock`, moving every held reference
/// (cached and retired) into `out` and resetting `getitem_version` to 0.
fn clear_into(&self, out: &mut Vec<PyObjectRef>) {
let _guard = self.write_lock.lock();
// SAFETY: NOTE(review) — the removed value is pushed to `out` instead of
// being dropped here; confirm against the contract of `PyAtomicRef::swap`.
let old_init = unsafe { self.init.swap(None) };
if let Some(old_init) = old_init {
out.push(old_init.into());
}
// SAFETY: NOTE(review) — same reasoning as for `init` above.
let old_getitem = unsafe { self.getitem.swap(None) };
if let Some(old_getitem) = old_getitem {
out.push(old_getitem.into());
}
self.getitem_version.store(0, Ordering::Release);
out.extend(self.retired.write().drain(..));
}
}
/// Copyable, non-null pointer wrapper that is only constructed from
/// `&'static T` references.
pub struct PointerSlot<T>(NonNull<T>);

// The wrapper is marked `Send`/`Sync` unconditionally, without any bound on `T`.
unsafe impl<T> Send for PointerSlot<T> {}
unsafe impl<T> Sync for PointerSlot<T> {}

impl<T> From<&'static T> for PointerSlot<T> {
    /// Wraps a `'static` reference.
    fn from(reference: &'static T) -> Self {
        Self(NonNull::from(reference))
    }
}

impl<T> Copy for PointerSlot<T> {}

impl<T> Clone for PointerSlot<T> {
    fn clone(&self) -> Self {
        Self(self.0)
    }
}

impl<T> PointerSlot<T> {
    /// Returns the pointee with a `'static` lifetime.
    ///
    /// # Safety
    ///
    /// The pointee must actually be valid for `'static`.
    pub const unsafe fn borrow_static(&self) -> &'static T {
        // SAFETY: the caller guarantees the pointee lives for `'static`.
        unsafe { &*self.0.as_ptr() }
    }
}

impl<T> AsRef<T> for PointerSlot<T> {
    fn as_ref(&self) -> &T {
        // SAFETY: the only constructor takes a `&'static T`, so the pointer is
        // valid for at least as long as `self` is borrowed.
        unsafe { &*self.0.as_ptr() }
    }
}
/// Owning reference to a type object.
pub type PyTypeRef = PyRef<PyType>;
// `PyType` is only declared `Send`/`Sync` when the `threading` feature is on.
cfg_if::cfg_if! {
if #[cfg(feature = "threading")] {
// SAFETY: NOTE(review) — the mutable fields visible in this file are
// wrapped in locks or atomics; confirm `PyTypeSlots` upholds the same.
unsafe impl Send for PyType {}
unsafe impl Sync for PyType {}
}
}
/// Insertion-ordered attribute table of a type, keyed by interned name.
pub type PyAttributes = IndexMap<&'static PyStrInterned, PyObjectRef, ahash::RandomState>;

// Only the values are traversed; the keys are `'static` interned strings.
unsafe impl Traverse for PyAttributes {
    fn traverse(&self, tracer_fn: &mut TraverseFn<'_>) {
        for value in self.values() {
            value.traverse(tracer_fn);
        }
    }
}
/// Displays a type as its name (see `PyType::name`).
impl core::fmt::Display for PyType {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let type_name = self.name();
        core::fmt::Display::fmt(&type_name, f)
    }
}
/// Debug output has the form `[PyType <name>]`.
impl core::fmt::Debug for PyType {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("[PyType {}]", &self.name()))
    }
}
impl PyPayload for PyType {
/// The class of a `PyType` payload is the context's `type_type`.
#[inline]
fn class(ctx: &Context) -> &'static Py<PyType> {
ctx.types.type_type
}
}
/// Converts `value` to a string reference for assignment to `__qualname__`.
///
/// Returns a type error naming the offending class when `value` is not a
/// `PyStr`.
fn downcast_qualname(value: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyRef<PyStr>> {
    value.downcast::<PyStr>().map_err(|rejected| {
        vm.new_type_error(format!(
            "can only assign string to __qualname__, not '{}'",
            rejected.class().name()
        ))
    })
}
/// Returns `true` when `a` is `b` itself or `b` appears (by identity) in
/// `a_mro`.
fn is_subtype_with_mro(a_mro: &[PyTypeRef], a: &Py<PyType>, b: &Py<PyType>) -> bool {
    a.is(b) || a_mro.iter().any(|ancestor| ancestor.is(b))
}
impl PyType {
/// Returns this type's version tag, assigning a fresh one if none is set.
///
/// Returns 0 when no tag can be assigned: either one of the bases could not
/// obtain a tag, or the global counter cannot be incremented without overflow.
pub fn assign_version_tag(&self) -> u32 {
let v = self.tp_version_tag.load(Ordering::Acquire);
if v != 0 {
return v;
}
// Every base must hold a valid tag before this type receives one.
for base in self.bases.read().iter() {
if base.assign_version_tag() == 0 {
return 0;
}
}
// Claim the current counter value by advancing the counter past it.
loop {
let current = NEXT_TYPE_VERSION.load(Ordering::Relaxed);
let Some(next) = current.checked_add(1) else {
return 0; };
if NEXT_TYPE_VERSION
.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.tp_version_tag.store(current, Ordering::Release);
return current;
}
}
}
/// Invalidates everything cached for this type and, recursively, for its
/// live subclasses.
///
/// The specialization cache is always cleared. The version tag reset, the
/// global cache sweep and the subclass recursion only happen when the type
/// currently holds a non-zero version tag.
pub fn modified(&self) {
if let Some(ext) = self.heaptype_ext.as_ref() {
ext.specialization_cache.invalidate_for_type_modified();
}
let old_version = self.tp_version_tag.load(Ordering::Acquire);
if old_version == 0 {
return;
}
// Drop the tag first, then purge the cache entries filled for it.
self.tp_version_tag.store(0, Ordering::SeqCst);
type_cache_clear_version(old_version);
let subclasses = self.subclasses.read();
for weak_ref in subclasses.iter() {
// Subclasses that are no longer alive are skipped.
if let Some(sub) = weak_ref.upgrade() {
sub.downcast_ref::<PyType>().unwrap().modified();
}
}
}
pub fn new_simple_heap(
name: &str,
base: &Py<PyType>,
ctx: &Context,
) -> Result<PyRef<Self>, String> {
Self::new_heap(
name,
vec![base.to_owned()],
Default::default(),
Default::default(),
Self::static_type().to_owned(),
ctx,
)
}
/// Creates a heap type named `name` with the given bases, attributes, slots
/// and metaclass.
///
/// The first element of `bases` becomes the primary base. The `HEAPTYPE`
/// flag is always added to `slots`, and the qualified name starts out equal
/// to `name`.
///
/// # Errors
///
/// Returns an error when `bases` is empty, or when `new_heap_inner` rejects
/// the type (for example because no consistent MRO exists).
pub fn new_heap(
    name: &str,
    bases: Vec<PyRef<Self>>,
    attrs: PyAttributes,
    mut slots: PyTypeSlots,
    metaclass: PyRef<Self>,
    ctx: &Context,
) -> Result<PyRef<Self>, String> {
    // Indexing `bases[0]` would panic on an empty vector; report it as an
    // error instead, since this function already returns `Result`.
    let Some(base) = bases.first().cloned() else {
        return Err(format!("type '{name}' must have at least one base class"));
    };
    slots.flags |= PyTypeFlags::HEAPTYPE;
    let name_utf8 = ctx.new_utf8_str(name);
    let name = name_utf8.clone().into_wtf8();
    let heaptype_ext = HeapTypeExt {
        name: PyRwLock::new(name_utf8),
        qualname: PyRwLock::new(name),
        slots: None,
        type_data: PyRwLock::new(None),
        specialization_cache: TypeSpecializationCache::new(),
    };
    Self::new_heap_inner(base, bases, attrs, slots, heaptype_ext, metaclass, ctx)
}
/// Returns `obj` viewed as a type object, or `None` if `downcast_ref`
/// rejects it.
pub(crate) fn check(obj: &PyObject) -> Option<&Py<Self>> {
obj.downcast_ref::<Self>()
}
/// Computes the linearised MRO for a type with the given direct bases.
///
/// # Errors
///
/// Fails when the same base appears twice in `bases`, or when
/// `linearise_mro` reports an error.
fn resolve_mro(bases: &[PyRef<Self>]) -> Result<Vec<PyTypeRef>, String> {
    let mut seen_ids = HashSet::new();
    // `insert` returns `false` for an id that is already present.
    if let Some(duplicate) = bases.iter().find(|base| !seen_ids.insert(base.get_id())) {
        return Err(format!("duplicate base class {}", duplicate.name()));
    }
    let base_mros = bases
        .iter()
        .map(|base| base.mro_map_collect(|cls| cls.to_owned()))
        .collect();
    linearise_mro(base_mros)
}
/// Copies the `SEQUENCE`/`MAPPING` flags from the first base that has any of
/// them, unless `slots` already carries one of these flags.
fn inherit_patma_flags(slots: &mut PyTypeSlots, bases: &[PyRef<Self>]) {
    const COLLECTION_FLAGS: PyTypeFlags = PyTypeFlags::from_bits_truncate(
        PyTypeFlags::SEQUENCE.bits() | PyTypeFlags::MAPPING.bits(),
    );
    if slots.flags.intersects(COLLECTION_FLAGS) {
        return;
    }
    let inherited = bases
        .iter()
        .map(|base| base.slots.flags & COLLECTION_FLAGS)
        .find(|flags| !flags.is_empty());
    if let Some(flags) = inherited {
        slots.flags |= flags;
    }
}
fn check_abc_tpflags(
slots: &mut PyTypeSlots,
attrs: &PyAttributes,
bases: &[PyRef<Self>],
ctx: &Context,
) {
const COLLECTION_FLAGS: PyTypeFlags = PyTypeFlags::from_bits_truncate(
PyTypeFlags::SEQUENCE.bits() | PyTypeFlags::MAPPING.bits(),
);
if slots.flags.intersects(COLLECTION_FLAGS) {
return;
}
let abc_tpflags_name = ctx.intern_str("__abc_tpflags__");
if let Some(abc_tpflags_obj) = attrs.get(abc_tpflags_name)
&& let Some(int_obj) = abc_tpflags_obj.downcast_ref::<crate::builtins::int::PyInt>()
{
let flags_val = int_obj.as_bigint().to_i64().unwrap_or(0);
let abc_flags = PyTypeFlags::from_bits_truncate(flags_val as u64);
slots.flags |= abc_flags & COLLECTION_FLAGS;
return;
}
for base in bases {
if let Some(abc_tpflags_obj) = base.find_name_in_mro(abc_tpflags_name)
&& let Some(int_obj) = abc_tpflags_obj.downcast_ref::<crate::builtins::int::PyInt>()
{
let flags_val = int_obj.as_bigint().to_i64().unwrap_or(0);
let abc_flags = PyTypeFlags::from_bits_truncate(flags_val as u64);
slots.flags |= abc_flags & COLLECTION_FLAGS;
return;
}
}
}
/// Builds a heap type from already prepared parts and registers it with its bases.
///
/// # Errors
///
/// Fails when the MRO cannot be resolved or when `attrs` contains a
/// `__qualname__` that is not a `str`.
#[allow(clippy::too_many_arguments)]
fn new_heap_inner(
base: PyRef<Self>,
bases: Vec<PyRef<Self>>,
attrs: PyAttributes,
mut slots: PyTypeSlots,
heaptype_ext: HeapTypeExt,
metaclass: PyRef<Self>,
ctx: &Context,
) -> Result<PyRef<Self>, String> {
let mro = Self::resolve_mro(&bases)?;
// Dict and weakref support are inherited from any type in the MRO.
if mro
.iter()
.any(|b| b.slots.flags.has_feature(PyTypeFlags::HAS_DICT))
{
slots.flags |= PyTypeFlags::HAS_DICT
}
if mro
.iter()
.any(|b| b.slots.flags.has_feature(PyTypeFlags::HAS_WEAKREF))
{
slots.flags |= PyTypeFlags::HAS_WEAKREF | PyTypeFlags::MANAGED_WEAKREF
}
Self::inherit_patma_flags(&mut slots, &bases);
Self::check_abc_tpflags(&mut slots, &attrs, &bases, ctx);
// A basicsize of 0 means "not specified": take the primary base's.
if slots.basicsize == 0 {
slots.basicsize = base.slots.basicsize;
}
Self::inherit_readonly_slots(&mut slots, &base);
// Also covers HAS_WEAKREF set by the caller rather than inherited above.
if slots.flags.has_feature(PyTypeFlags::HAS_WEAKREF) {
slots.flags |= PyTypeFlags::MANAGED_WEAKREF;
}
if let Some(qualname) = attrs.get(identifier!(ctx, __qualname__))
&& !qualname.fast_isinstance(ctx.types.str_type)
{
return Err(format!(
"type __qualname__ must be a str, not {}",
qualname.class().name()
));
}
let new_type = PyRef::new_ref(
Self {
base: Some(base),
bases: PyRwLock::new(bases),
mro: PyRwLock::new(mro),
subclasses: PyRwLock::default(),
attributes: PyRwLock::new(attrs),
slots,
heaptype_ext: Some(Pin::new(Box::new(heaptype_ext))),
tp_version_tag: AtomicU32::new(0),
},
metaclass,
None,
);
// The type itself is the first entry of its own MRO.
new_type.mro.write().insert(0, new_type.clone());
new_type.init_slots(ctx);
// Register the new type (weakly) as a subclass of each direct base.
let weakref_type = super::PyWeak::static_type();
for base in new_type.bases.read().iter() {
base.subclasses.write().push(
new_type
.as_object()
.downgrade_with_weakref_typ_opt(None, weakref_type.to_owned())
.unwrap(),
);
}
Ok(new_type)
}
/// Builds a type without heap-type extension data, with a single base.
///
/// The MRO is the base's MRO with the new type prepended. Unlike
/// `new_heap_inner`, this does not call `init_slots`; only `new` and `alloc`
/// are inherited through `set_new`/`set_alloc`.
pub fn new_static(
base: PyRef<Self>,
attrs: PyAttributes,
mut slots: PyTypeSlots,
metaclass: PyRef<Self>,
) -> Result<PyRef<Self>, String> {
if base.slots.flags.has_feature(PyTypeFlags::HAS_DICT) {
slots.flags |= PyTypeFlags::HAS_DICT
}
if base.slots.flags.has_feature(PyTypeFlags::HAS_WEAKREF) {
slots.flags |= PyTypeFlags::HAS_WEAKREF | PyTypeFlags::MANAGED_WEAKREF
}
Self::inherit_patma_flags(&mut slots, core::slice::from_ref(&base));
// A basicsize of 0 means "not specified": take the base's.
if slots.basicsize == 0 {
slots.basicsize = base.slots.basicsize;
}
Self::inherit_readonly_slots(&mut slots, &base);
if slots.flags.has_feature(PyTypeFlags::HAS_WEAKREF) {
slots.flags |= PyTypeFlags::MANAGED_WEAKREF;
}
let bases = PyRwLock::new(vec![base.clone()]);
let mro = base.mro_map_collect(|x| x.to_owned());
let new_type = PyRef::new_ref(
Self {
base: Some(base),
bases,
mro: PyRwLock::new(mro),
subclasses: PyRwLock::default(),
attributes: PyRwLock::new(attrs),
slots,
heaptype_ext: None,
tp_version_tag: AtomicU32::new(0),
},
metaclass,
None,
);
// Take the new type out of GC tracking.
// SAFETY: NOTE(review) — the pointer comes from the live `new_type`
// reference; confirm against the contract of `untrack_object`.
unsafe {
crate::gc_state::gc_state()
.untrack_object(core::ptr::NonNull::from(new_type.as_object()));
}
new_type.as_object().clear_gc_tracked();
// The type itself is the first entry of its own MRO.
new_type.mro.write().insert(0, new_type.clone());
Self::set_new(&new_type.slots, &new_type.base);
Self::set_alloc(&new_type.slots, &new_type.base);
// Register the new type (weakly) as a subclass of its base.
let weakref_type = super::PyWeak::static_type();
for base in new_type.bases.read().iter() {
base.subclasses.write().push(
new_type
.as_object()
.downgrade_with_weakref_typ_opt(None, weakref_type.to_owned())
.unwrap(),
);
}
Ok(new_type)
}
pub(crate) fn init_slots(&self, ctx: &Context) {
let mro: Vec<_> = self.mro.read()[1..].to_vec();
for base in mro.iter() {
self.inherit_slots(base);
}
#[allow(clippy::mutable_key_type)]
let mut slot_name_set = std::collections::HashSet::new();
for cls in self.mro.read()[1..].iter() {
for &name in cls.attributes.read().keys() {
if name.as_bytes().starts_with(b"__") && name.as_bytes().ends_with(b"__") {
slot_name_set.insert(name);
}
}
}
for &name in self.attributes.read().keys() {
if name.as_bytes().starts_with(b"__") && name.as_bytes().ends_with(b"__") {
slot_name_set.insert(name);
}
}
let mut slot_names: Vec<_> = slot_name_set.into_iter().collect();
slot_names.sort_by_key(|name| name.as_str());
for attr_name in slot_names {
self.update_slot::<true>(attr_name, ctx);
}
Self::set_new(&self.slots, &self.base);
Self::set_alloc(&self.slots, &self.base);
}
/// Settles the `new` slot: cleared when instantiation is disallowed,
/// otherwise inherited from `base` when the type does not define its own.
fn set_new(slots: &PyTypeSlots, base: &Option<PyTypeRef>) {
    if slots.flags.contains(PyTypeFlags::DISALLOW_INSTANTIATION) {
        slots.new.store(None);
        return;
    }
    if slots.new.load().is_some() {
        return;
    }
    let inherited = base.as_ref().and_then(|parent| parent.slots.new.load());
    slots.new.store(inherited);
}
/// Inherits the `alloc` slot from `base` when the type does not define its own.
fn set_alloc(slots: &PyTypeSlots, base: &Option<PyTypeRef>) {
    if slots.alloc.load().is_some() {
        return;
    }
    let inherited = base.as_ref().and_then(|parent| parent.slots.alloc.load());
    slots.alloc.store(inherited);
}
/// Inherits the `as_buffer` slot from `base` when `slots` has none.
fn inherit_readonly_slots(slots: &mut PyTypeSlots, base: &Self) {
    slots.as_buffer = slots.as_buffer.or(base.slots.as_buffer);
}
/// For every entry of `SLOT_DEFS`, copies the slot from `base` if this type
/// does not have it set.
pub(crate) fn inherit_slots(&self, base: &Self) {
    SLOT_DEFS
        .iter()
        .for_each(|def| def.accessor.copyslot_if_none(self, base));
}
/// Interns `attr_name` in `ctx` and sets the attribute through `set_attr`.
pub fn set_str_attr<V: Into<PyObjectRef>>(
    &self,
    attr_name: &str,
    value: V,
    ctx: impl AsRef<Context>,
) {
    let interned = ctx.as_ref().intern_str(attr_name);
    self.set_attr(interned, value.into())
}
/// Sets an attribute directly on this type.
pub fn set_attr(&self, attr_name: &'static PyStrInterned, value: PyObjectRef) {
// Caches are invalidated BEFORE the attribute table changes.
// NOTE(review): presumably so that raw pointers held by cache entries are
// cleared while the objects they point to are still alive — confirm.
self.modified();
self.attributes.write().insert(attr_name, value);
}
/// Looks `attr_name` up along the MRO (through the type cache when possible).
pub fn get_attr(&self, attr_name: &'static PyStrInterned) -> Option<PyObjectRef> {
self.find_name_in_mro(attr_name)
}
/// Records `init` in the specialization cache, provided the type still has
/// version `tp_version`.
///
/// Returns `false` (and caches nothing) for non-heap types, for a zero
/// `tp_version`, or when the type's version tag no longer matches.
pub(crate) fn cache_init_for_specialization(
&self,
init: PyRef<PyFunction>,
tp_version: u32,
vm: &VirtualMachine,
) -> bool {
let Some(ext) = self.heaptype_ext.as_ref() else {
return false;
};
if tp_version == 0 {
return false;
}
// Cheap rejection before taking the lock.
if self.tp_version_tag.load(Ordering::Acquire) != tp_version {
return false;
}
let _guard = ext.specialization_cache.write_lock.lock();
// Re-check under the lock: the tag may have changed in the meantime.
if self.tp_version_tag.load(Ordering::Acquire) != tp_version {
return false;
}
ext.specialization_cache.swap_init(Some(init), Some(vm));
true
}
/// Returns the cached `init` function if the type still has version
/// `tp_version`.
///
/// Returns `None` for non-heap types, for a zero `tp_version`, on a version
/// mismatch, or when nothing is cached.
pub(crate) fn get_cached_init_for_specialization(
&self,
tp_version: u32,
) -> Option<PyRef<PyFunction>> {
let ext = self.heaptype_ext.as_ref()?;
if tp_version == 0 {
return None;
}
if self.tp_version_tag.load(Ordering::Acquire) != tp_version {
return None;
}
ext.specialization_cache
.init
.to_owned_ordering(Ordering::Acquire)
}
/// Records `getitem` and its current function version in the specialization
/// cache, provided the type still has version `tp_version`.
///
/// Returns `false` (and caches nothing) for non-heap types, for a zero
/// `tp_version`, on a version mismatch, or when the function reports
/// version 0.
pub(crate) fn cache_getitem_for_specialization(
&self,
getitem: PyRef<PyFunction>,
tp_version: u32,
vm: &VirtualMachine,
) -> bool {
let Some(ext) = self.heaptype_ext.as_ref() else {
return false;
};
if tp_version == 0 {
return false;
}
let _guard = ext.specialization_cache.write_lock.lock();
// Checked under the lock so the tag and the cache are updated consistently.
if self.tp_version_tag.load(Ordering::Acquire) != tp_version {
return false;
}
let func_version = getitem.get_version_for_current_state();
if func_version == 0 {
return false;
}
ext.specialization_cache
.swap_getitem(Some(getitem), Some(vm));
ext.specialization_cache
.getitem_version
.store(func_version, Ordering::Relaxed);
true
}
/// Returns the cached `getitem` function together with the function version
/// stored for it.
///
/// Returns `None` for non-heap types, when nothing is cached, or when the
/// stored version is 0.
pub(crate) fn get_cached_getitem_for_specialization(&self) -> Option<(PyRef<PyFunction>, u32)> {
let ext = self.heaptype_ext.as_ref()?;
let getitem = ext
.specialization_cache
.getitem
.to_owned_ordering(Ordering::Acquire)?;
let cached_version = ext
.specialization_cache
.getitem_version
.load(Ordering::Relaxed);
if cached_version == 0 {
return None;
}
Some((getitem, cached_version))
}
/// Returns the attribute defined directly on this type, ignoring the MRO.
pub fn get_direct_attr(&self, attr_name: &'static PyStrInterned) -> Option<PyObjectRef> {
self.attributes.read().get(attr_name).cloned()
}
/// Looks `name` up along the MRO, consulting and then refilling the global
/// type attribute cache.
///
/// The cache is only read when the type holds a non-zero version tag, and
/// only written when the lookup succeeded, a tag is available, no global
/// clear is in progress and the tag did not change during the lookup.
fn find_name_in_mro(&self, name: &'static PyStrInterned) -> Option<PyObjectRef> {
let version = self.tp_version_tag.load(Ordering::Acquire);
if version != 0 {
let idx = type_cache_hash(version, name);
let entry = &TYPE_CACHE[idx];
let name_ptr = name as *const _ as *mut _;
// Read attempt: each iteration is validated with `end_read` and
// retried when a writer touched the entry meanwhile.
loop {
let seq1 = entry.begin_read();
let v1 = entry.version.load(Ordering::Acquire);
let type_version = self.tp_version_tag.load(Ordering::Acquire);
// Miss: the entry belongs to another version or another name.
if v1 != type_version
|| !core::ptr::eq(entry.name.load(Ordering::Relaxed), name_ptr)
{
break;
}
let ptr = entry.value.load(Ordering::Acquire);
if ptr.is_null() {
// A stable null value is a miss; an unstable one is retried.
if entry.end_read(seq1) {
break;
}
continue;
}
// SAFETY: NOTE(review) — `ptr` was read from the cache entry; this
// presumably only yields a reference when the object is still
// alive. Confirm against `PyObject::try_to_owned_from_ptr`.
if let Some(cloned) = unsafe { PyObject::try_to_owned_from_ptr(ptr) } {
// Only trust the new reference if the entry is unchanged.
let same_ptr = core::ptr::eq(entry.value.load(Ordering::Relaxed), ptr);
if same_ptr && entry.end_read(seq1) {
return Some(cloned);
}
drop(cloned);
continue;
}
break;
}
}
// Slow path: make sure a version tag exists, then walk the MRO.
let assigned = if version == 0 {
self.assign_version_tag()
} else {
version
};
let result = self.find_name_in_mro_uncached(name);
if let Some(ref found) = result
&& assigned != 0
&& !TYPE_CACHE_CLEARING.load(Ordering::Acquire)
&& self.tp_version_tag.load(Ordering::Acquire) == assigned
{
let idx = type_cache_hash(assigned, name);
let entry = &TYPE_CACHE[idx];
let name_ptr = name as *const _ as *mut _;
entry.begin_write();
// Invalidate first so the entry never pairs the new version with
// stale name/value fields.
entry.version.store(0, Ordering::Release);
// The cache stores a raw pointer to the found object.
let new_ptr = &**found as *const PyObject as *mut PyObject;
entry.value.store(new_ptr, Ordering::Relaxed);
entry.name.store(name_ptr, Ordering::Relaxed);
entry.version.store(assigned, Ordering::Release);
entry.end_write();
}
result
}
/// Walks the MRO in order and returns the first attribute named `name`,
/// without touching the type cache.
fn find_name_in_mro_uncached(&self, name: &'static PyStrInterned) -> Option<PyObjectRef> {
    self.mro
        .read()
        .iter()
        .find_map(|cls| cls.attributes.read().get(name).cloned())
}
/// Looks up an attribute by a (possibly non-interned) string.
///
/// Returns `None` when `name` has no interned counterpart or when the MRO
/// lookup finds nothing.
pub fn lookup_ref(&self, name: &Py<PyStr>, vm: &VirtualMachine) -> Option<PyObjectRef> {
    vm.ctx
        .interned_str(name)
        .and_then(|interned| self.find_name_in_mro(interned))
}
/// Looks `attr_name` up along the MRO, skipping the type itself (index 0).
pub fn get_super_attr(&self, attr_name: &'static PyStrInterned) -> Option<PyObjectRef> {
    let mro = self.mro.read();
    for class in &mro[1..] {
        if let Some(value) = class.attributes.read().get(attr_name) {
            return Some(value.clone());
        }
    }
    None
}
/// Returns `true` if `attr_name` is found on this type or along its MRO.
pub fn has_attr(&self, attr_name: &'static PyStrInterned) -> bool {
self.has_name_in_mro(attr_name)
}
/// Existence check for `name` along the MRO.
///
/// A validated cache hit answers `true` without taking a reference to the
/// value; every other outcome falls back to `find_name_in_mro`.
fn has_name_in_mro(&self, name: &'static PyStrInterned) -> bool {
let version = self.tp_version_tag.load(Ordering::Acquire);
if version != 0 {
let idx = type_cache_hash(version, name);
let entry = &TYPE_CACHE[idx];
let name_ptr = name as *const _ as *mut _;
loop {
let seq1 = entry.begin_read();
let v1 = entry.version.load(Ordering::Acquire);
let type_version = self.tp_version_tag.load(Ordering::Acquire);
// Miss: the entry belongs to another version or another name.
if v1 != type_version
|| !core::ptr::eq(entry.name.load(Ordering::Relaxed), name_ptr)
{
break;
}
let ptr = entry.value.load(Ordering::Acquire);
// The pointer is only compared with null here, never dereferenced.
if entry.end_read(seq1) {
if !ptr.is_null() {
return true;
}
break;
}
// A writer interfered; retry.
continue;
}
}
self.find_name_in_mro(name).is_some()
}
/// Collects the attributes visible on this type into one table.
///
/// The MRO is walked from the last entry to the first, so entries closer to
/// the front of the MRO overwrite those further back.
pub fn get_attributes(&self) -> PyAttributes {
    let mut merged = PyAttributes::default();
    let mro = self.mro.read();
    for cls in mro.iter().rev() {
        let cls: &Self = cls;
        for (&name, value) in cls.attributes.read().iter() {
            merged.insert(name, value.clone());
        }
    }
    merged
}
/// Implements `zelf.__new__(subtype, ...)`.
///
/// The first bound argument is the subtype to instantiate; a type error is
/// returned when it is not a subclass of `zelf`.
pub(crate) fn __new__(zelf: PyRef<Self>, args: FuncArgs, vm: &VirtualMachine) -> PyResult {
    let (subtype, args): (PyRef<Self>, FuncArgs) = args.bind(vm)?;
    if subtype.fast_issubclass(&zelf) {
        return call_slot_new(zelf, subtype, args, vm);
    }
    Err(vm.new_type_error(format!(
        "{zelf}.__new__({subtype}): {subtype} is not a subtype of {zelf}",
        zelf = zelf.name(),
        subtype = subtype.name(),
    )))
}
/// Dispatches on where the type name is stored: `heap_f` receives the heap
/// extension when there is one, otherwise `static_f` receives the name from
/// the slot table.
fn name_inner<'a, R: 'a>(
    &'a self,
    static_f: impl FnOnce(&'static str) -> R,
    heap_f: impl FnOnce(&'a HeapTypeExt) -> R,
) -> R {
    match &self.heaptype_ext {
        Some(ext) => heap_f(ext),
        None => static_f(self.slots.name),
    }
}
/// Returns the full name of the type: `slots.name` unchanged for types
/// without heap extension data, the heap type's `name` otherwise.
pub fn slot_name(&self) -> BorrowedValue<'_, str> {
self.name_inner(
|name| name.into(),
|ext| {
// Keep the read guard alive for as long as the borrowed `str`.
PyRwLockReadGuard::map(ext.name.read(), |name: &PyUtf8StrRef| -> &str {
name.as_str()
})
.into()
},
)
}
/// Returns the short name of the type.
///
/// For types without heap extension data this is the part of `slots.name`
/// after the last `.` (or all of it when there is no dot); heap types
/// return their stored `name`.
pub fn name(&self) -> BorrowedValue<'_, str> {
    self.name_inner(
        |full_name| {
            let short_name = match full_name.rsplit_once('.') {
                Some((_, tail)) => tail,
                None => full_name,
            };
            short_name.into()
        },
        |ext| {
            let guard = ext.name.read();
            PyRwLockReadGuard::map(guard, |stored: &PyUtf8StrRef| -> &str { stored.as_str() })
                .into()
        },
    )
}
/// Attaches `data` to this heap type.
///
/// # Errors
///
/// Fails for types without heap extension data and when type data has
/// already been initialised.
pub fn init_type_data<T: Any + Send + Sync + 'static>(&self, data: T) -> Result<(), String> {
    let Some(ext) = self.heaptype_ext.as_ref() else {
        return Err("Cannot set type data on non-heap types".to_string());
    };
    let mut slot = ext.type_data.write();
    if slot.is_none() {
        *slot = Some(TypeDataSlot::new(data));
        Ok(())
    } else {
        Err("Type data already initialized".to_string())
    }
}
/// Returns a read handle to the attached type data; `None` for types without
/// heap extension data or when `TypeDataRef::try_new` yields nothing for `T`.
pub fn get_type_data<T: Any + 'static>(&self) -> Option<TypeDataRef<'_, T>> {
    let ext = self.heaptype_ext.as_ref()?;
    TypeDataRef::try_new(ext.type_data.read())
}
/// Returns a write handle to the attached type data; `None` for types
/// without heap extension data or when `TypeDataRefMut::try_new` yields
/// nothing for `T`.
pub fn get_type_data_mut<T: Any + 'static>(&self) -> Option<TypeDataRefMut<'_, T>> {
    let ext = self.heaptype_ext.as_ref()?;
    TypeDataRefMut::try_new(ext.type_data.write())
}
/// Returns `true` when this heap type has type data attached and the slot
/// yields a value for `T`.
pub fn has_type_data<T: Any + 'static>(&self) -> bool {
    let Some(ext) = self.heaptype_ext.as_ref() else {
        return false;
    };
    let guard = ext.type_data.read();
    match guard.as_ref() {
        Some(slot) => slot.get::<T>().is_some(),
        None => false,
    }
}
}
impl Py<PyType> {
    /// Returns `true` when `self` is `other` or `other` appears in `self`'s MRO.
    pub(crate) fn is_subtype(&self, other: &Self) -> bool {
        let mro = self.mro.read();
        is_subtype_with_mro(&mro, self, other)
    }

    /// Returns `obj` as a type object only when `downcast_ref_if_exact`
    /// accepts it.
    pub fn check_exact<'a>(obj: &'a PyObject, vm: &VirtualMachine) -> Option<&'a Self> {
        obj.downcast_ref_if_exact::<PyType>(vm)
    }

    /// Identity-based subclass test: `cls` is this type itself or one of the
    /// MRO entries after it.
    pub fn fast_issubclass(&self, cls: &impl Borrow<PyObject>) -> bool {
        if self.as_object().is(cls.borrow()) {
            return true;
        }
        self.mro.read()[1..]
            .iter()
            .any(|ancestor| ancestor.is(cls.borrow()))
    }

    /// Applies `f` to every MRO entry, in order, and collects the results.
    pub fn mro_map_collect<F, R>(&self, f: F) -> Vec<R>
    where
        F: Fn(&Self) -> R,
    {
        let mro = self.mro.read();
        mro.iter().map(|entry| f(&**entry)).collect()
    }

    /// Returns an owned copy of the MRO.
    pub fn mro_collect(&self) -> Vec<PyRef<PyType>> {
        self.mro_map_collect(|cls| cls.to_owned())
    }

    /// Iterates over `self`, its primary base, that base's primary base, and
    /// so on until a type without a base is reached.
    pub fn iter_base_chain(&self) -> impl Iterator<Item = &Self> {
        core::iter::successors(Some(self), |current| current.base.as_deref())
    }

    /// Turns every method definition into a method object and stores it as
    /// an attribute of this type under the definition's name.
    pub fn extend_methods(&'static self, method_defs: &'static [PyMethodDef], ctx: &Context) {
        method_defs.iter().for_each(|def| {
            let method = def.to_proper_method(self, ctx);
            self.set_attr(ctx.intern_str(def.name), method);
        });
    }
}
#[pyclass(
with(
Py,
Constructor,
Initializer,
GetAttr,
SetAttr,
Callable,
AsNumber,
Representable
),
flags(BASETYPE, HAS_DICT, HAS_WEAKREF)
)]
impl PyType {
// Getter for `__bases__`: a tuple holding the direct bases.
#[pygetset]
fn __bases__(&self, vm: &VirtualMachine) -> PyTupleRef {
    let elements = self
        .bases
        .read()
        .iter()
        .map(|base| base.as_object().to_owned())
        .collect();
    vm.ctx.new_tuple(elements)
}
// Setter for `__bases__`.
//
// Rejects immutable types and an empty `bases`. Otherwise it replaces the
// bases, recomputes the MRO of this type and of every live subclass,
// invalidates caches, re-initialises slots and registers the type with its
// new bases.
//
// NOTE(review): `bases` is stored before the new MRO is validated, so a
// failing MRO resolution leaves the new bases in place; the type is also
// not removed from the subclass lists of its previous bases.
#[pygetset(setter, name = "__bases__")]
fn set_bases(zelf: &Py<Self>, bases: Vec<PyTypeRef>, vm: &VirtualMachine) -> PyResult<()> {
    if zelf.slots.flags.has_feature(PyTypeFlags::IMMUTABLETYPE) {
        return Err(vm.new_type_error(format!(
            "cannot set '__bases__' attribute of immutable type '{}'",
            zelf.name()
        )));
    }
    if bases.is_empty() {
        // The type name belongs before `.__bases__`; the rejected value is
        // always the empty tuple.
        return Err(vm.new_type_error(format!(
            "can only assign non-empty tuple to {}.__bases__, not ()",
            zelf.name()
        )));
    }
    *zelf.bases.write() = bases;

    // Recomputes the MRO of `cls` from its current bases, then does the same
    // for every subclass that is still alive.
    fn update_mro_recursively(cls: &PyType, vm: &VirtualMachine) -> PyResult<()> {
        let mut mro =
            PyType::resolve_mro(&cls.bases.read()).map_err(|msg| vm.new_type_error(msg))?;
        // The type itself stays at the front of its own MRO.
        mro.insert(0, cls.mro.read()[0].to_owned());
        *cls.mro.write() = mro;
        for subclass in cls.subclasses.write().iter() {
            // A dead weak reference means the subclass is gone; there is
            // nothing to update, so skip it instead of panicking.
            let Some(subclass) = subclass.upgrade() else {
                continue;
            };
            let subclass: &Py<PyType> = subclass.downcast_ref().unwrap();
            update_mro_recursively(subclass, vm)?;
        }
        Ok(())
    }
    update_mro_recursively(zelf, vm)?;

    zelf.modified();
    zelf.init_slots(&vm.ctx);

    // Register this type (weakly) as a subclass of each new base.
    let weakref_type = super::PyWeak::static_type();
    for base in zelf.bases.read().iter() {
        base.subclasses.write().push(
            zelf.as_object()
                .downgrade_with_weakref_typ_opt(None, weakref_type.to_owned())
                .unwrap(),
        );
    }
    Ok(())
}
// Getter for `__base__`: the primary base, if any.
#[pygetset]
fn __base__(&self) -> Option<PyTypeRef> {
self.base.clone()
}
// Getter for `__flags__`: the raw bits of the type flags.
#[pygetset]
const fn __flags__(&self) -> u64 {
self.slots.flags.bits()
}
// Getter for `__basicsize__`: the slot's basicsize plus the object header size.
#[pygetset]
fn __basicsize__(&self) -> usize {
crate::object::SIZEOF_PYOBJECT_HEAD + self.slots.basicsize
}
// Getter for `__itemsize__`: the slot's itemsize, unchanged.
#[pygetset]
fn __itemsize__(&self) -> usize {
self.slots.itemsize
}
// Getter for `__name__`.
//
// Types without heap extension data return the interned short name (the
// part of `slots.name` after the last '.') and panic if that string was
// never interned; heap types return a copy of their stored name.
#[pygetset]
pub fn __name__(&self, vm: &VirtualMachine) -> PyStrRef {
    self.name_inner(
        |full_name| {
            let short_name = match full_name.rsplit_once('.') {
                Some((_, tail)) => tail,
                None => full_name,
            };
            match vm.ctx.interned_str(short_name) {
                Some(interned) => interned.to_owned(),
                None => panic!(
                    "static type name must be already interned but {} is not",
                    self.slot_name()
                ),
            }
        },
        |ext| ext.name.read().clone().into_wtf8(),
    )
}
#[pygetset]
pub fn __qualname__(&self, vm: &VirtualMachine) -> PyObjectRef {
if let Some(ref heap_type) = self.heaptype_ext {
heap_type.qualname.read().clone().into()
} else {
vm.ctx.new_str(self.name().deref()).into()
}
}
// Setter for `__qualname__`.
//
// Deletion is rejected, the value must be a `str`, and types without heap
// extension data cannot be modified.
#[pygetset(setter)]
fn set___qualname__(&self, value: PySetterValue, vm: &VirtualMachine) -> PyResult<()> {
    self.check_set_special_type_attr(identifier!(vm, __qualname__), vm)?;
    let Some(value) = value.ok_or_else(|| ()).ok() else {
        return Err(vm.new_type_error(format!(
            "cannot delete '__qualname__' attribute of immutable type '{}'",
            self.name()
        )));
    };
    let new_qualname = downcast_qualname(value, vm)?;
    let Some(heap_type) = self.heaptype_ext.as_ref() else {
        return Err(vm.new_type_error(format!(
            "cannot set '__qualname__' attribute of immutable type '{}'",
            self.name()
        )));
    };
    // Swap under the lock, but drop the previous value only after the write
    // guard has been released.
    let previous = core::mem::replace(&mut *heap_type.qualname.write(), new_qualname);
    drop(previous);
    Ok(())
}
// Getter for `__annotate__`.
//
// Only available on heap types. Returns the stored `__annotate__` or, failing
// that, `__annotate_func__`; when neither exists, `None` is stored under
// `__annotate_func__` and returned.
#[pygetset]
fn __annotate__(&self, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
    if !self.slots.flags.has_feature(PyTypeFlags::HEAPTYPE) {
        return Err(vm.new_attribute_error(format!(
            "type object '{}' has no attribute '__annotate__'",
            self.name()
        )));
    }
    let mut attrs = self.attributes.write();
    let existing = attrs
        .get(identifier!(vm, __annotate__))
        .or_else(|| attrs.get(identifier!(vm, __annotate_func__)))
        .cloned();
    match existing {
        Some(annotate) => Ok(annotate),
        None => {
            let none = vm.ctx.none();
            attrs.insert(identifier!(vm, __annotate_func__), none.clone());
            Ok(none)
        }
    }
}
// Setter for `__annotate__`.
//
// Deletion is rejected, immutable types are rejected, and the value must be
// callable or `None`. A non-`None` value also discards the cached
// annotations. The value is stored under `__annotate_func__`.
#[pygetset(setter)]
fn set___annotate__(&self, value: PySetterValue, vm: &VirtualMachine) -> PyResult<()> {
    let PySetterValue::Assign(value) = value else {
        return Err(vm.new_type_error("cannot delete __annotate__ attribute"));
    };
    if self.slots.flags.has_feature(PyTypeFlags::IMMUTABLETYPE) {
        return Err(vm.new_type_error(format!(
            "cannot set '__annotate__' attribute of immutable type '{}'",
            self.name()
        )));
    }
    let is_none = vm.is_none(&value);
    if !is_none && !value.is_callable() {
        return Err(vm.new_type_error("__annotate__ must be callable or None"));
    }
    let mut attrs = self.attributes.write();
    if !is_none {
        attrs.swap_remove(identifier!(vm, __annotations_cache__));
    }
    attrs.insert(identifier!(vm, __annotate_func__), value);
    Ok(())
}
// Getter for `__annotations__`.
//
// Order of precedence: a stored `__annotations__` that is not a getset
// descriptor, then `__annotations_cache__`, then (heap types only) the result
// of calling `__annotate__` with 1, which is cached under
// `__annotations_cache__`. A non-callable `__annotate__` yields an empty dict.
#[pygetset]
fn __annotations__(&self, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
let attrs = self.attributes.read();
if let Some(annotations) = attrs.get(identifier!(vm, __annotations__)).cloned() {
// A getset descriptor stored under this name is skipped.
if !annotations.class().is(vm.ctx.types.getset_type) {
// Accepted: None, an exact dict, or anything on a heap type.
if vm.is_none(&annotations)
|| annotations.class().is(vm.ctx.types.dict_type)
|| self.slots.flags.has_feature(PyTypeFlags::HEAPTYPE)
{
return Ok(annotations);
}
return Err(vm.new_attribute_error(format!(
"type object '{}' has no attribute '__annotations__'",
self.name()
)));
}
}
if let Some(annotations) = attrs.get(identifier!(vm, __annotations_cache__)).cloned() {
if vm.is_none(&annotations)
|| annotations.class().is(vm.ctx.types.dict_type)
|| self.slots.flags.has_feature(PyTypeFlags::HEAPTYPE)
{
return Ok(annotations);
}
return Err(vm.new_attribute_error(format!(
"type object '{}' has no attribute '__annotations__'",
self.name()
)));
}
// Release the read lock: `__annotate__` and the insert below take the write lock.
drop(attrs);
if !self.slots.flags.has_feature(PyTypeFlags::HEAPTYPE) {
return Err(vm.new_attribute_error(format!(
"type object '{}' has no attribute '__annotations__'",
self.name()
)));
}
let annotate = self.__annotate__(vm)?;
let annotations = if annotate.is_callable() {
let result = annotate.call((1i32,), vm)?;
if !result.class().is(vm.ctx.types.dict_type) {
return Err(vm.new_type_error(format!(
"__annotate__ returned non-dict of type '{}'",
result.class().name()
)));
}
result
} else {
vm.ctx.new_dict().into()
};
self.attributes
.write()
.insert(identifier!(vm, __annotations_cache__), annotations.clone());
Ok(annotations)
}
// Setter/deleter for `type.__annotations__`.
//
// Immutable types are rejected. The value lives under `__annotations__` when
// that key already exists in the type's attributes, otherwise under
// `__annotations_cache__`. Deleting a value that is not present raises
// AttributeError. On success the `__annotate_func__` and `__annotate__`
// entries are removed as well.
#[pygetset(setter)]
fn set___annotations__(
    &self,
    value: crate::function::PySetterValue<PyObjectRef>,
    vm: &VirtualMachine,
) -> PyResult<()> {
    if self.slots.flags.has_feature(PyTypeFlags::IMMUTABLETYPE) {
        return Err(vm.new_type_error(format!(
            "cannot set '__annotations__' attribute of immutable type '{}'",
            self.name()
        )));
    }
    let cache_key = identifier!(vm, __annotations_cache__);
    let mut attrs = self.attributes.write();
    let has_annotations = attrs.contains_key(identifier!(vm, __annotations__));
    let target_key = if has_annotations {
        identifier!(vm, __annotations__)
    } else {
        cache_key
    };
    match value {
        crate::function::PySetterValue::Assign(new_value) => {
            attrs.insert(target_key, new_value);
        }
        crate::function::PySetterValue::Delete => {
            if attrs.swap_remove(target_key).is_none() {
                return Err(vm.new_attribute_error("__annotations__"));
            }
        }
    }
    // When the explicit entry was the target, a stale cache must not survive.
    if has_annotations {
        attrs.swap_remove(cache_key);
    }
    attrs.swap_remove(identifier!(vm, __annotate_func__));
    attrs.swap_remove(identifier!(vm, __annotate__));
    Ok(())
}
// Getter for `type.__module__`.
//
// Returns the `__module__` entry of the type's attributes unless it is a
// getset descriptor. Otherwise the module is derived from the slot name:
// everything before the last `.`, or "builtins" when there is no dot.
#[pygetset]
pub fn __module__(&self, vm: &VirtualMachine) -> PyObjectRef {
    let stored = self
        .attributes
        .read()
        .get(identifier!(vm, __module__))
        .cloned()
        .filter(|found| !found.fast_isinstance(vm.ctx.types.getset_type));
    if let Some(module) = stored {
        return module;
    }
    let slot_name = self.slot_name();
    match slot_name.rsplit_once('.') {
        Some((module, _)) => vm.ctx.intern_str(module).to_object(),
        None => vm.ctx.new_str(ascii!("builtins")).into(),
    }
}
// Setter for `type.__module__`.
//
// Fails for immutable types. Stores the new value and removes any
// `__firstlineno__` entry from the type's attributes.
#[pygetset(setter)]
fn set___module__(&self, value: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
    let module_key = identifier!(vm, __module__);
    self.check_set_special_type_attr(module_key, vm)?;
    let mut attrs = self.attributes.write();
    attrs.swap_remove(identifier!(vm, __firstlineno__));
    attrs.insert(module_key, value);
    Ok(())
}
// `type.__prepare__`: ignores the class, name, bases and keyword arguments
// and always returns a fresh, empty dict.
#[pyclassmethod]
fn __prepare__(
_cls: PyTypeRef,
_name: OptionalArg<PyObjectRef>,
_bases: OptionalArg<PyObjectRef>,
_kwargs: KwArgs,
vm: &VirtualMachine,
) -> PyDictRef {
vm.ctx.new_dict()
}
#[pymethod]
fn __subclasses__(&self) -> PyList {
let mut subclasses = self.subclasses.write();
subclasses.retain(|x| x.upgrade().is_some());
PyList::from(
subclasses
.iter()
.map(|x| x.upgrade().unwrap())
.collect::<Vec<_>>(),
)
}
// Reflected `|`: delegates to `or_` with the operands swapped, so `other`
// becomes the left-hand side.
pub fn __ror__(zelf: PyObjectRef, other: PyObjectRef, vm: &VirtualMachine) -> PyResult {
or_(other, zelf, vm)
}
// `|` operator: delegates to `or_` with `zelf` as the left-hand side.
pub fn __or__(zelf: PyObjectRef, other: PyObjectRef, vm: &VirtualMachine) -> PyResult {
or_(zelf, other, vm)
}
// Getter for `type.__dict__`: wraps the type itself in a mapping proxy.
#[pygetset]
fn __dict__(zelf: PyRef<Self>) -> PyMappingProxy {
PyMappingProxy::from(zelf)
}
// Setter for `type.__dict__`: not supported; always fails with a
// not-implemented error and ignores the supplied value.
#[pygetset(setter)]
fn set___dict__(&self, _value: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
Err(vm.new_not_implemented_error(
"Setting __dict__ attribute on a type isn't yet implemented",
))
}
// Guard used by the special-attribute setters: returns a TypeError naming
// `name` when this type carries the IMMUTABLETYPE flag, `Ok(())` otherwise.
fn check_set_special_type_attr(
    &self,
    name: &PyStrInterned,
    vm: &VirtualMachine,
) -> PyResult<()> {
    let is_immutable = self.slots.flags.has_feature(PyTypeFlags::IMMUTABLETYPE);
    if !is_immutable {
        return Ok(());
    }
    let message = format!(
        "cannot set '{}' attribute of immutable type '{}'",
        name,
        self.slot_name()
    );
    Err(vm.new_type_error(message))
}
// Setter for `type.__name__`.
//
// Requires a mutable type, a `str` value without NUL bytes that converts to
// UTF-8, and a heap type extension to store the name in. The previous name
// is dropped only after the name lock has been released.
#[pygetset(setter)]
fn set___name__(&self, value: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
    self.check_set_special_type_attr(identifier!(vm, __name__), vm)?;
    let new_name = match value.downcast::<PyStr>() {
        Ok(string) => string,
        Err(rejected) => {
            return Err(vm.new_type_error(format!(
                "can only assign string to {}.__name__, not '{}'",
                self.slot_name(),
                rejected.class().slot_name(),
            )));
        }
    };
    if new_name.as_bytes().contains(&0) {
        return Err(vm.new_value_error("type name must not contain null characters"));
    }
    let new_name = new_name.try_into_utf8(vm)?;
    let Some(heap_type) = self.heaptype_ext.as_ref() else {
        return Err(vm.new_type_error(format!(
            "cannot set '__name__' attribute of immutable type '{}'",
            self.slot_name()
        )));
    };
    // The write guard is a temporary of this statement, so the lock is
    // released before `previous` is dropped below.
    let previous = core::mem::replace(&mut *heap_type.name.write(), new_name);
    drop(previous);
    Ok(())
}
// Getter for `type.__text_signature__`: extracts the signature embedded at
// the start of the slot doc string, or returns `None` when there is no doc
// or no signature.
#[pygetset]
fn __text_signature__(&self) -> Option<String> {
    let internal_doc = self.slots.doc?;
    let signature = get_text_signature_from_internal_doc(&self.name(), internal_doc)?;
    Some(signature.to_string())
}
#[pygetset]
fn __type_params__(&self, vm: &VirtualMachine) -> PyTupleRef {
let attrs = self.attributes.read();
let key = identifier!(vm, __type_params__);
if let Some(params) = attrs.get(&key)
&& let Ok(tuple) = params.clone().downcast::<PyTuple>()
{
return tuple;
}
vm.ctx.empty_tuple.clone()
}
// Setter/deleter for `type.__type_params__`.
//
// Both paths reject immutable types (with "set" / "delete" wording
// respectively) and call `modified()` before touching the attributes.
// Deleting an entry that does not exist is not an error.
#[pygetset(setter)]
fn set___type_params__(
    &self,
    value: PySetterValue<PyTupleRef>,
    vm: &VirtualMachine,
) -> PyResult<()> {
    let key = identifier!(vm, __type_params__);
    match value {
        PySetterValue::Assign(params) => {
            self.check_set_special_type_attr(key, vm)?;
            self.modified();
            self.attributes.write().insert(key, params.into());
        }
        PySetterValue::Delete => {
            if self.slots.flags.has_feature(PyTypeFlags::IMMUTABLETYPE) {
                return Err(vm.new_type_error(format!(
                    "cannot delete '__type_params__' attribute of immutable type '{}'",
                    self.slot_name()
                )));
            }
            self.modified();
            self.attributes.write().shift_remove(key);
        }
    }
    Ok(())
}
}
// Construction of `type` and of classes created through a metaclass.
impl Constructor for PyType {
type Args = FuncArgs;
// `type.__new__`: with exactly one positional argument and no keywords on
// `type` itself, returns the class of that argument. Otherwise requires three
// positional arguments (name, bases, dict) and builds a new heap type.
//
// Errors: TypeError for a wrong argument count, non-type bases, metaclass or
// layout conflicts and invalid `__slots__`; ValueError for a NUL byte in the
// name or a slot name that collides with a class variable; plus anything
// raised by `__set_name__` / `__init_subclass__` hooks.
fn slot_new(metatype: PyTypeRef, args: FuncArgs, vm: &VirtualMachine) -> PyResult {
vm_trace!("type.__new__ {:?}", args);
let is_type_type = metatype.is(vm.ctx.types.type_type);
// One-argument form: `type(x)` returns the class of `x`.
if is_type_type && args.args.len() == 1 && args.kwargs.is_empty() {
return Ok(args.args[0].class().to_owned().into());
}
if args.args.len() != 3 {
return Err(vm.new_type_error(if is_type_type {
"type() takes 1 or 3 arguments".to_owned()
} else {
format!(
"type.__new__() takes exactly 3 arguments ({} given)",
args.args.len()
)
}));
}
// `args` is cloned because the untouched original may still have to be
// forwarded to a different metaclass below.
let (name, bases, dict, kwargs): (PyStrRef, PyTupleRef, PyDictRef, KwArgs) =
args.clone().bind(vm)?;
if name.as_bytes().contains(&0) {
return Err(vm.new_value_error("type name must not contain null characters"));
}
let name = name.try_into_utf8(vm)?;
// Resolve the effective metaclass, the primary base and the list of bases.
// With no bases given, `object` is the only base.
let (metatype, base, bases, base_is_type) = if bases.is_empty() {
let base = vm.ctx.types.object_type.to_owned();
(metatype, base.clone(), vec![base], false)
} else {
// Every base must be a type object; a non-type that defines
// `__mro_entries__` gets a more specific error message.
let bases = bases
.iter()
.map(|obj| {
obj.clone().downcast::<Self>().or_else(|obj| {
if vm
.get_attribute_opt(obj, identifier!(vm, __mro_entries__))?
.is_some()
{
Err(vm.new_type_error(
"type() doesn't support MRO entry resolution; \
use types.new_class()",
))
} else {
Err(vm.new_type_error("bases must be types"))
}
})
})
.collect::<PyResult<Vec<_>>>()?;
// If a more derived metaclass wins and it has its own `new` slot,
// construction is handed over to it with the original arguments.
let winner = calculate_meta_class(metatype.clone(), &bases, vm)?;
let metatype = if !winner.is(&metatype) {
if let Some(ref slot_new) = winner.slots.new.load() {
return slot_new(winner, args, vm);
}
winner
} else {
metatype
};
let base = best_base(&bases, vm)?;
let base_is_type = base.is(vm.ctx.types.type_type);
(metatype, base.to_owned(), bases, base_is_type)
};
// `__qualname__` comes from the class dict when present, otherwise it
// defaults to the class name.
let qualname = dict
.get_item_opt(identifier!(vm, __qualname__), vm)?
.map(|obj| downcast_qualname(obj, vm))
.transpose()?
.unwrap_or_else(|| {
name.clone().into_wtf8()
});
let mut attributes = dict.to_attributes(vm);
// The qualified name is stored on the heap type extension, not as an attribute.
attributes.shift_remove(identifier!(vm, __qualname__));
// A string `__doc__` must be valid UTF-8; otherwise type creation fails here.
if let Some(doc) = attributes.get(identifier!(vm, __doc__))
&& let Some(doc_str) = doc.downcast_ref::<PyStr>()
{
doc_str.ensure_valid_utf8(vm)?;
}
// Plain functions named `__init_subclass__` / `__class_getitem__` are wrapped
// as classmethods, and a plain function `__new__` as a staticmethod.
if let Some(f) = attributes.get_mut(identifier!(vm, __init_subclass__))
&& f.class().is(vm.ctx.types.function_type)
{
*f = PyClassMethod::from(f.clone()).into_pyobject(vm);
}
if let Some(f) = attributes.get_mut(identifier!(vm, __class_getitem__))
&& f.class().is(vm.ctx.types.function_type)
{
*f = PyClassMethod::from(f.clone()).into_pyobject(vm);
}
if let Some(f) = attributes.get_mut(identifier!(vm, __new__))
&& f.class().is(vm.ctx.types.function_type)
{
*f = PyStaticMethod::from(f.clone()).into_pyobject(vm);
}
// Default `__module__` to the `__name__` global of the current frame (or
// None when that global is missing), but only if the dict did not set it.
if let Some(current_frame) = vm.current_frame() {
let entry = attributes.entry(identifier!(vm, __module__));
if matches!(entry, Entry::Vacant(_)) {
let module_name = vm.unwrap_or_none(
current_frame
.globals
.get_item_opt(identifier!(vm, __name__), vm)?,
);
entry.or_insert(module_name);
}
}
// Defining `__eq__` without `__hash__` sets `__hash__` to None.
if attributes.get(identifier!(vm, __eq__)).is_some()
&& attributes.get(identifier!(vm, __hash__)).is_none()
{
attributes.insert(identifier!(vm, __hash__), vm.ctx.none.clone().into());
}
// Parse and validate `__slots__`. Yields the member slot names (with
// `__dict__` / `__weakref__` filtered out) and whether those two special
// slots were requested.
let (heaptype_slots, add_dict, add_weakref): (
Option<PyRef<PyTuple<PyStrRef>>>,
bool,
bool,
) = if let Some(x) = attributes.get(identifier!(vm, __slots__)) {
if x.class().is(vm.ctx.types.bytes_type) {
return Err(vm.new_type_error("__slots__ items must be strings, not 'bytes'"));
}
// A single string is treated as one slot name; anything else is iterated.
let slots = if x.class().is(vm.ctx.types.str_type) {
// The exact-class check on the line above establishes that `x` is a `str`.
let x = unsafe { x.downcast_unchecked_ref::<PyStr>() };
PyTuple::new_ref_typed(vec![x.to_owned()], &vm.ctx)
} else {
let iter = x.get_iter(vm)?;
let elements = {
let mut elements = Vec::new();
while let PyIterReturn::Return(element) = iter.next(vm)? {
if element.class().is(vm.ctx.types.bytes_type) {
return Err(
vm.new_type_error("__slots__ items must be strings, not 'bytes'")
);
}
elements.push(element);
}
elements
};
let tuple = elements.into_pytuple(vm);
tuple.try_into_typed(vm)?
};
// Member slots (anything other than `__dict__` / `__weakref__`) are
// rejected when the base has a non-zero item size.
let has_custom_slots = slots
.iter()
.any(|s| !matches!(s.as_bytes(), b"__dict__" | b"__weakref__"));
if has_custom_slots && base.slots.itemsize > 0 {
return Err(vm.new_type_error(format!(
"nonempty __slots__ not supported for subtype of '{}'",
base.name()
)));
}
// Validate each name: it must be an identifier, the special slots may
// appear at most once, and no name may clash with a class attribute.
let mut seen_dict = false;
let mut seen_weakref = false;
for slot in slots.iter() {
if !slot.isidentifier() {
return Err(vm.new_type_error("__slots__ must be identifiers"));
}
let slot_name = slot.as_bytes();
if slot_name == b"__dict__" {
if seen_dict {
return Err(
vm.new_type_error("__dict__ slot disallowed: we already got one")
);
}
seen_dict = true;
}
if slot_name == b"__weakref__" {
if seen_weakref {
return Err(
vm.new_type_error("__weakref__ slot disallowed: we already got one")
);
}
seen_weakref = true;
}
if attributes.contains_key(vm.ctx.intern_str(slot.as_wtf8())) {
return Err(vm.new_value_error(format!(
"'{}' in __slots__ conflicts with a class variable",
slot.as_wtf8()
)));
}
}
// The special slots cannot be requested again if the base already has them.
if seen_dict && base.slots.flags.has_feature(PyTypeFlags::HAS_DICT) {
return Err(vm.new_type_error("__dict__ slot disallowed: we already got one"));
}
if seen_weakref && base.slots.flags.has_feature(PyTypeFlags::HAS_WEAKREF) {
return Err(vm.new_type_error("__weakref__ slot disallowed: we already got one"));
}
let dict_name = "__dict__";
let weakref_name = "__weakref__";
let has_dict = slots.iter().any(|s| s.as_wtf8() == dict_name);
let add_weakref = seen_weakref;
// `__dict__` and `__weakref__` do not become member descriptors.
let filtered_slots = if has_dict || add_weakref {
let filtered: Vec<PyStrRef> = slots
.iter()
.filter(|s| s.as_wtf8() != dict_name && s.as_wtf8() != weakref_name)
.cloned()
.collect();
PyTuple::new_ref_typed(filtered, &vm.ctx)
} else {
slots
};
(Some(filtered_slots), has_dict, add_weakref)
} else {
(None, false, false)
};
// New member slots are numbered after the largest member count among the
// bases. `bases` always has at least one element here, so `max()` is `Some`.
let base_member_count = bases
.iter()
.map(|base| base.slots.member_count)
.max()
.unwrap();
let heaptype_member_count = heaptype_slots.as_ref().map(|x| x.len()).unwrap_or(0);
let member_count: usize = base_member_count + heaptype_member_count;
let mut flags = PyTypeFlags::heap_type_flags();
// Instances get a dict when `__slots__` is absent and the base has none,
// or when `__dict__` was listed in `__slots__`.
let may_add_dict = !base.slots.flags.has_feature(PyTypeFlags::HAS_DICT);
if (heaptype_slots.is_none() && may_add_dict) || add_dict {
flags |= PyTypeFlags::HAS_DICT | PyTypeFlags::MANAGED_DICT;
}
// Same rule for weak reference support.
let may_add_weakref = !base.slots.flags.has_feature(PyTypeFlags::HAS_WEAKREF);
if (heaptype_slots.is_none() && may_add_weakref) || add_weakref {
flags |= PyTypeFlags::HAS_WEAKREF | PyTypeFlags::MANAGED_WEAKREF;
}
let (slots, heaptype_ext) = {
let slots = PyTypeSlots {
flags,
member_count,
itemsize: base.slots.itemsize,
..PyTypeSlots::heap_default()
};
let heaptype_ext = HeapTypeExt {
name: PyRwLock::new(name),
qualname: PyRwLock::new(qualname),
slots: heaptype_slots.clone(),
type_data: PyRwLock::new(None),
specialization_cache: TypeSpecializationCache::new(),
};
(slots, heaptype_ext)
};
let typ = Self::new_heap_inner(
base,
bases,
attributes,
slots,
heaptype_ext,
metatype,
&vm.ctx,
)
.map_err(|e| vm.new_type_error(e))?;
// Install one member descriptor per slot name, at consecutive offsets
// starting after the base members. Names are mangled with the class name.
if let Some(ref slots) = heaptype_slots {
let mut offset = base_member_count;
let class_name = typ.name().to_string();
for member in slots.as_slice() {
let member_str = member
.to_str()
.ok_or_else(|| vm.new_type_error("__slots__ must be valid UTF-8 strings"))?;
let mangled_name = mangle_name(&class_name, member_str);
let member_def = PyMemberDef {
name: mangled_name.clone(),
kind: MemberKind::ObjectEx,
getter: MemberGetter::Offset(offset),
setter: MemberSetter::Offset(offset),
doc: None,
};
let attr_name = vm.ctx.intern_str(mangled_name.as_str());
let member_descriptor: PyRef<PyMemberDescriptor> =
vm.ctx.new_pyref(PyMemberDescriptor {
common: PyDescriptorOwned {
typ: typ.clone(),
name: attr_name,
qualname: PyRwLock::new(None),
},
member: member_def,
});
typ.set_attr(attr_name, member_descriptor.into());
offset += 1;
}
}
// Fill the `__classcell__` cell with the new type and the
// `__classdictcell__` cell with the class dict, then remove both entries
// from the type's attributes.
{
let mut attrs = typ.attributes.write();
if let Some(cell) = attrs.get(identifier!(vm, __classcell__)) {
let cell = PyCellRef::try_from_object(vm, cell.clone()).map_err(|_| {
vm.new_type_error(format!(
"__classcell__ must be a nonlocal cell, not {}",
cell.class().name()
))
})?;
cell.set(Some(typ.clone().into()));
attrs.shift_remove(identifier!(vm, __classcell__));
}
if let Some(cell) = attrs.get(identifier!(vm, __classdictcell__)) {
let cell = PyCellRef::try_from_object(vm, cell.clone()).map_err(|_| {
vm.new_type_error(format!(
"__classdictcell__ must be a nonlocal cell, not {}",
cell.class().name()
))
})?;
cell.set(Some(dict.clone().into()));
attrs.shift_remove(identifier!(vm, __classdictcell__));
}
}
// Add a `__dict__` getset descriptor when the primary base is not `type`,
// the new type has HAS_DICT, and neither the type nor any class in its MRO
// already defines `__dict__`.
if !base_is_type && typ.slots.flags.has_feature(PyTypeFlags::HAS_DICT) {
let __dict__ = identifier!(vm, __dict__);
let has_inherited_dict = typ
.mro
.read()
.iter()
.any(|base| base.attributes.read().contains_key(&__dict__));
if !typ.attributes.read().contains_key(&__dict__) && !has_inherited_dict {
// NOTE(review): the safety contract of `new_getset` is not visible in
// this chunk -- confirm what the `unsafe` block must uphold.
unsafe {
let descriptor =
vm.ctx
.new_getset("__dict__", &typ, subtype_get_dict, subtype_set_dict);
typ.attributes.write().insert(__dict__, descriptor.into());
}
}
}
// Same for `__weakref__` when the new type has HAS_WEAKREF.
if typ.slots.flags.has_feature(PyTypeFlags::HAS_WEAKREF) {
let __weakref__ = vm.ctx.intern_str("__weakref__");
let has_inherited_weakref = typ
.mro
.read()
.iter()
.any(|base| base.attributes.read().contains_key(&__weakref__));
if !typ.attributes.read().contains_key(&__weakref__) && !has_inherited_weakref {
// NOTE(review): same unverified `new_getset` safety contract as above.
unsafe {
let descriptor = vm.ctx.new_getset(
"__weakref__",
&typ,
subtype_get_weakref,
subtype_set_weakref,
);
typ.attributes
.write()
.insert(__weakref__, descriptor.into());
}
}
}
// Every new class gets a `__doc__` entry; it defaults to None.
{
let __doc__ = identifier!(vm, __doc__);
if !typ.attributes.read().contains_key(&__doc__) {
typ.attributes.write().insert(__doc__, vm.ctx.none());
}
}
// Collect the `__set_name__` hooks first so the attributes read lock is
// released before any of them is called.
let attributes = typ
.attributes
.read()
.iter()
.filter_map(|(name, obj)| {
vm.get_method(obj.clone(), identifier!(vm, __set_name__))
.map(|res| res.map(|meth| (obj.clone(), name.to_owned(), meth)))
})
.collect::<PyResult<Vec<_>>>()?;
for (obj, name, set_name) in attributes {
set_name.call((typ.clone(), name), vm).inspect_err(|e| {
// On failure, attach a note to the original exception; a failure of
// `add_note` itself is deliberately ignored.
let note = format!(
"Error calling __set_name__ on '{}' instance '{}' in '{}'",
obj.class().name(),
name,
typ.name()
);
drop(vm.call_method(e.as_object(), "add_note", (vm.ctx.new_str(note.as_str()),)));
})?;
}
// Finally call the parent's `__init_subclass__`, bound to the new type,
// with the keyword arguments given at class creation.
if let Some(init_subclass) = typ.get_super_attr(identifier!(vm, __init_subclass__)) {
let init_subclass = vm
.call_get_descriptor_specific(&init_subclass, None, Some(typ.clone().into()))
.unwrap_or(Ok(init_subclass))?;
init_subclass.call(kwargs, vm)?;
};
Ok(typ.into())
}
// Never used: construction goes through `slot_new` above.
fn py_new(_cls: &Py<PyType>, _args: Self::Args, _vm: &VirtualMachine) -> PyResult<Self> {
unimplemented!("use slot_new")
}
}
/// Text that terminates a signature embedded at the start of a doc string:
/// the closing parenthesis followed by a `--` line and a blank line.
const SIGNATURE_END_MARKER: &str = ")\n--\n\n";

/// Returns the part of `doc` up to and including the closing parenthesis of
/// the first signature end marker, or `None` when the marker is absent.
fn get_signature(doc: &str) -> Option<&str> {
    let close_paren = doc.find(SIGNATURE_END_MARKER)?;
    // The marker starts with the one-byte `)`, which belongs to the signature.
    Some(&doc[..close_paren + 1])
}
/// Strips the unqualified type name from the front of `doc` and returns the
/// remainder if it begins with `(`.
///
/// Only the part of `name` after its last `.` is compared. Returns `None`
/// when `doc` does not start with that name or the name is not followed by `(`.
fn find_signature<'a>(name: &str, doc: &'a str) -> Option<&'a str> {
    let short_name = match name.rfind('.') {
        Some(dot) => &name[dot + 1..],
        None => name,
    };
    match doc.strip_prefix(short_name) {
        Some(rest) if rest.starts_with('(') => Some(rest),
        _ => None,
    }
}
pub(crate) fn get_text_signature_from_internal_doc<'a>(
name: &str,
internal_doc: &'a str,
) -> Option<&'a str> {
find_signature(name, internal_doc).and_then(get_signature)
}
/// Returns the documentation text of an internal doc string with any leading
/// embedded signature removed.
///
/// When the doc starts with `name(` and contains the signature end marker,
/// everything after the first marker is returned (possibly empty). In every
/// other case the doc is returned unchanged.
fn get_doc_from_internal_doc<'a>(name: &str, internal_doc: &'a str) -> &'a str {
    find_signature(name, internal_doc)
        .and_then(|candidate| candidate.split_once(SIGNATURE_END_MARKER))
        .map_or(internal_doc, |(_signature, body)| body)
}
impl Initializer for PyType {
    type Args = FuncArgs;

    // `type.__init__`: validates only the argument shape. One positional
    // argument must come without keywords; otherwise exactly one or three
    // positional arguments are accepted. Nothing is initialised here.
    fn slot_init(_zelf: PyObjectRef, args: FuncArgs, vm: &VirtualMachine) -> PyResult<()> {
        let message = match args.args.len() {
            1 if !args.kwargs.is_empty() => "type.__init__() takes no keyword arguments",
            1 | 3 => return Ok(()),
            _ => "type.__init__() takes 1 or 3 arguments",
        };
        Err(vm.new_type_error(message))
    }

    // Never reached: `slot_init` above replaces the default dispatch.
    fn init(_zelf: PyRef<Self>, _args: Self::Args, _vm: &VirtualMachine) -> PyResult<()> {
        unreachable!("slot_init is defined")
    }
}
impl GetAttr for PyType {
fn getattro(zelf: &Py<Self>, name_str: &Py<PyStr>, vm: &VirtualMachine) -> PyResult {
#[cold]
fn attribute_error(
zelf: &Py<PyType>,
name: &Wtf8,
vm: &VirtualMachine,
) -> PyBaseExceptionRef {
vm.new_attribute_error(format!(
"type object '{}' has no attribute '{}'",
zelf.slot_name(),
name,
))
}
let Some(name) = vm.ctx.interned_str(name_str) else {
return Err(attribute_error(zelf, name_str.as_wtf8(), vm));
};
vm_trace!("type.__getattribute__({:?}, {:?})", zelf, name);
let mcl = zelf.class();
let mcl_attr = mcl.get_attr(name);
if let Some(ref attr) = mcl_attr {
let attr_class = attr.class();
let has_descr_set = attr_class.slots.descr_set.load().is_some();
if has_descr_set {
let descr_get = attr_class.slots.descr_get.load();
if let Some(descr_get) = descr_get {
let mcl = mcl.to_owned().into();
return descr_get(attr.clone(), Some(zelf.to_owned().into()), Some(mcl), vm);
}
}
}
let zelf_attr = zelf.get_attr(name);
if let Some(attr) = zelf_attr {
let descr_get = attr.class().slots.descr_get.load();
if let Some(descr_get) = descr_get {
descr_get(attr, None, Some(zelf.to_owned().into()), vm)
} else {
Ok(attr)
}
} else if let Some(attr) = mcl_attr {
vm.call_if_get_descriptor(&attr, zelf.to_owned().into())
} else {
Err(attribute_error(zelf, name_str.as_wtf8(), vm))
}
}
}
#[pyclass]
impl Py<PyType> {
#[pygetset]
fn __mro__(&self) -> PyTuple {
let elements: Vec<PyObjectRef> = self.mro_map_collect(|x| x.as_object().to_owned());
PyTuple::new_unchecked(elements.into_boxed_slice())
}
#[pygetset]
fn __doc__(&self, vm: &VirtualMachine) -> PyResult {
if !self.slots.flags.has_feature(PyTypeFlags::HEAPTYPE)
&& let Some(internal_doc) = self.slots.doc
{
let doc_str = get_doc_from_internal_doc(&self.name(), internal_doc);
return Ok(vm.ctx.new_str(doc_str).into());
}
if let Some(doc_attr) = self.get_direct_attr(vm.ctx.intern_str("__doc__")) {
let descr_get = doc_attr.class().slots.descr_get.load();
if let Some(descr_get) = descr_get {
descr_get(doc_attr, None, Some(self.to_owned().into()), vm)
} else {
Ok(doc_attr)
}
} else {
Ok(vm.ctx.none())
}
}
#[pygetset(setter)]
fn set___doc__(&self, value: PySetterValue, vm: &VirtualMachine) -> PyResult<()> {
let value = value.ok_or_else(|| {
vm.new_type_error(format!(
"cannot delete '__doc__' attribute of type '{}'",
self.name()
))
})?;
self.check_set_special_type_attr(identifier!(vm, __doc__), vm)?;
self.attributes
.write()
.insert(identifier!(vm, __doc__), value);
Ok(())
}
#[pymethod]
fn __dir__(&self) -> PyList {
let attributes: Vec<PyObjectRef> = self
.get_attributes()
.into_iter()
.map(|(k, _)| k.to_object())
.collect();
PyList::from(attributes)
}
#[pymethod]
fn __instancecheck__(&self, obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<bool> {
obj.real_is_instance(self.as_object(), vm)
}
#[pymethod]
fn __subclasscheck__(&self, subclass: PyObjectRef, vm: &VirtualMachine) -> PyResult<bool> {
subclass.real_is_subclass(self.as_object(), vm)
}
#[pyclassmethod]
fn __subclasshook__(_args: FuncArgs, vm: &VirtualMachine) -> PyObjectRef {
vm.ctx.not_implemented()
}
#[pymethod]
fn mro(&self) -> Vec<PyObjectRef> {
self.mro_map_collect(|cls| cls.to_owned().into())
}
}
impl SetAttr for PyType {
fn setattro(
zelf: &Py<Self>,
attr_name: &Py<PyStr>,
value: PySetterValue,
vm: &VirtualMachine,
) -> PyResult<()> {
let attr_name = vm.ctx.intern_str(attr_name.as_wtf8());
if zelf.slots.flags.has_feature(PyTypeFlags::IMMUTABLETYPE) {
return Err(vm.new_type_error(format!(
"cannot set '{}' attribute of immutable type '{}'",
attr_name,
zelf.slot_name()
)));
}
if let Some(attr) = zelf.get_class_attr(attr_name) {
let descr_set = attr.class().slots.descr_set.load();
if let Some(descriptor) = descr_set {
return descriptor(&attr, zelf.to_owned().into(), value, vm);
}
}
let assign = value.is_assign();
zelf.modified();
if let PySetterValue::Assign(value) = value {
zelf.attributes.write().insert(attr_name, value);
} else {
let prev_value = zelf.attributes.write().shift_remove(attr_name); if prev_value.is_none() {
return Err(vm.new_attribute_error(format!(
"type object '{}' has no attribute '{}'",
zelf.name(),
attr_name,
)));
}
}
if attr_name.as_wtf8().starts_with("__") && attr_name.as_wtf8().ends_with("__") {
if assign {
zelf.update_slot::<true>(attr_name, &vm.ctx);
} else {
zelf.update_slot::<false>(attr_name, &vm.ctx);
}
}
Ok(())
}
}
impl Callable for PyType {
    type Args = FuncArgs;

    // Calling a type object.
    //
    // For `type` itself, one positional argument without keywords returns
    // the argument's type, and any other count than three is a TypeError.
    // Otherwise the type's `new` slot creates the object; its `init` slot is
    // then run with the same arguments, but only when the created object's
    // class is a subclass of the called type.
    fn call(zelf: &Py<Self>, args: FuncArgs, vm: &VirtualMachine) -> PyResult {
        vm_trace!("type_call: {:?}", zelf);
        if zelf.is(vm.ctx.types.type_type) {
            match args.args.len() {
                1 if args.kwargs.is_empty() => return Ok(args.args[0].obj_type()),
                3 => {}
                _ => return Err(vm.new_type_error("type() takes 1 or 3 arguments")),
            }
        }

        let Some(slot_new) = zelf.slots.new.load() else {
            return Err(vm.new_type_error(format!("cannot create '{}' instances", zelf.slots.name)));
        };
        let instance = slot_new(zelf.to_owned(), args.clone(), vm)?;

        // An object of an unrelated class is returned without initialisation.
        if !instance.class().fast_issubclass(zelf) {
            return Ok(instance);
        }
        if let Some(slot_init) = instance.class().slots.init.load() {
            slot_init(instance.clone(), args, vm)?;
        }
        Ok(instance)
    }
}
// Number protocol for type objects: only `|` is provided, forwarding to
// `or_`; every other operation stays at the NOT_IMPLEMENTED default.
impl AsNumber for PyType {
fn as_number() -> &'static PyNumberMethods {
static AS_NUMBER: PyNumberMethods = PyNumberMethods {
or: Some(|a, b, vm| or_(a.to_owned(), b.to_owned(), vm)),
..PyNumberMethods::NOT_IMPLEMENTED
};
&AS_NUMBER
}
}
impl Representable for PyType {
#[inline]
fn repr_str(zelf: &Py<Self>, vm: &VirtualMachine) -> PyResult<String> {
let module = zelf.__module__(vm);
let module = module.downcast_ref::<PyStr>().map(|m| m.as_wtf8());
let repr = match module {
Some(module) if module != "builtins" => {
let qualname = zelf.__qualname__(vm);
let qualname = qualname.downcast_ref::<PyStr>().map(|n| n.as_wtf8());
let name = zelf.name();
let qualname = qualname.unwrap_or_else(|| name.as_ref());
format!("<class '{module}.{qualname}'>")
}
_ => format!("<class '{}'>", zelf.slot_name()),
};
Ok(repr)
}
}
/// Walks the `__base__` chain starting at `typ` and returns the first type
/// that is either `type` itself or a non-heap type with the HAS_DICT flag.
/// Returns `None` when the chain ends without finding one.
fn get_builtin_base_with_dict(typ: &Py<PyType>, vm: &VirtualMachine) -> Option<PyTypeRef> {
    let mut cursor = typ.to_owned();
    loop {
        let flags = &cursor.slots.flags;
        let is_static_with_dict =
            flags.contains(PyTypeFlags::HAS_DICT) && !flags.contains(PyTypeFlags::HEAPTYPE);
        if cursor.is(vm.ctx.types.type_type) || is_static_with_dict {
            return Some(cursor);
        }
        let parent = cursor.__base__()?;
        cursor = parent;
    }
}
/// Looks up the `__dict__` attribute on `base` via `lookup_ref`.
fn get_dict_descriptor(base: &Py<PyType>, vm: &VirtualMachine) -> Option<PyObjectRef> {
    base.lookup_ref(identifier!(vm, __dict__), vm)
}
/// Builds (does not raise) the TypeError reported when no usable `__dict__`
/// descriptor exists for the class of `obj`.
fn raise_dict_descriptor_error(obj: &PyObject, vm: &VirtualMachine) -> PyBaseExceptionRef {
    let message = format!(
        "this __dict__ descriptor does not support '{}' objects",
        obj.class().name()
    );
    vm.new_type_error(message)
}
/// Getter behind the `__dict__` descriptor installed on new heap types.
///
/// When the object's class has a builtin base with dict support, that base's
/// `__dict__` descriptor is invoked; a missing or non-gettable descriptor is
/// a TypeError. Without such a base the object's own dict is returned.
fn subtype_get_dict(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult {
    let Some(builtin_base) = get_builtin_base_with_dict(obj.class(), vm) else {
        return object::object_get_dict(obj, vm).map(Into::into);
    };
    let Some(descr) = get_dict_descriptor(&builtin_base, vm) else {
        return Err(raise_dict_descriptor_error(&obj, vm));
    };
    match vm.call_get_descriptor(&descr, obj.clone()) {
        Some(result) => result,
        None => Err(raise_dict_descriptor_error(&obj, vm)),
    }
}
/// Setter behind the `__dict__` descriptor installed on new heap types.
///
/// When the object's class has a builtin base with dict support, assignment
/// is delegated to the `descr_set` slot of that base's `__dict__` descriptor;
/// a missing descriptor or slot is a TypeError. Without such a base the value
/// is converted and installed as the object's own dict.
fn subtype_set_dict(obj: PyObjectRef, value: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
    let Some(builtin_base) = get_builtin_base_with_dict(obj.class(), vm) else {
        object::object_set_dict(obj, value.try_into_value(vm)?, vm)?;
        return Ok(());
    };
    let Some(descr) = get_dict_descriptor(&builtin_base, vm) else {
        return Err(raise_dict_descriptor_error(&obj, vm));
    };
    let setter = descr.class().slots.descr_set.load();
    match setter {
        Some(setter) => setter(&descr, obj, PySetterValue::Assign(value), vm),
        None => Err(raise_dict_descriptor_error(&obj, vm)),
    }
}
/// Getter behind the `__weakref__` descriptor installed on new heap types:
/// returns whatever `get_weakrefs` yields for the object, or None.
fn subtype_get_weakref(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult {
    match obj.get_weakrefs() {
        Some(weakref) => Ok(weakref),
        None => Ok(vm.ctx.none()),
    }
}
/// Setter behind the `__weakref__` descriptor: the attribute is read-only,
/// so this always fails with AttributeError and ignores the value.
fn subtype_set_weakref(obj: PyObjectRef, _value: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
    let message = format!(
        "attribute '__weakref__' of '{}' objects is not writable",
        obj.class().name()
    );
    Err(vm.new_attribute_error(message))
}
fn vectorcall_type(
zelf_obj: &PyObject,
args: Vec<PyObjectRef>,
nargs: usize,
kwnames: Option<&[PyObjectRef]>,
vm: &VirtualMachine,
) -> PyResult {
let zelf: &Py<PyType> = zelf_obj.downcast_ref().unwrap();
if zelf.is(vm.ctx.types.type_type) {
let no_kwargs = kwnames.is_none_or(|kw| kw.is_empty());
if nargs == 1 && no_kwargs {
return Ok(args[0].obj_type());
}
} else if zelf.slots.call.load().is_none() && zelf.slots.new.load().is_some() {
if let Some(type_vc) = zelf.slots.vectorcall.load() {
return type_vc(zelf_obj, args, nargs, kwnames, vm);
}
}
let func_args = FuncArgs::from_vectorcall_owned(args, nargs, kwnames);
PyType::call(zelf, func_args, vm)
}
/// Registers the `type` class in `ctx` and stores `vectorcall_type` in the
/// `vectorcall` slot of `type`.
pub(crate) fn init(ctx: &'static Context) {
PyType::extend_class(ctx, ctx.types.type_type);
ctx.types
.type_type
.slots
.vectorcall
.store(Some(vectorcall_type));
}
/// Calls the `new` slot of `typ` to create an instance of `subtype`.
///
/// Fails with TypeError when `subtype` has the DISALLOW_INSTANTIATION flag,
/// or when the `new` slot of `typ` differs from the `new` slot of the
/// nearest non-heap type in `subtype`'s base chain (the "is not safe" check).
///
/// Panics if `typ` has no `new` slot.
pub(crate) fn call_slot_new(
    typ: PyTypeRef,
    subtype: PyTypeRef,
    args: FuncArgs,
    vm: &VirtualMachine,
) -> PyResult {
    let instantiation_disallowed = subtype
        .slots
        .flags
        .has_feature(PyTypeFlags::DISALLOW_INSTANTIATION);
    if instantiation_disallowed {
        return Err(vm.new_type_error(format!("cannot create '{}' instances", subtype.slot_name())));
    }

    // Climb past heap types to the most derived non-heap base, stopping early
    // if the chain runs out.
    let mut staticbase = subtype.clone();
    while staticbase.slots.flags.has_feature(PyTypeFlags::HEAPTYPE) {
        let Some(parent) = staticbase.base.clone() else {
            break;
        };
        staticbase = parent;
    }

    // Slots are compared by function address.
    let typ_new_addr = typ.slots.new.load().map(|f| f as usize);
    let staticbase_new_addr = staticbase.slots.new.load().map(|f| f as usize);
    if typ_new_addr != staticbase_new_addr {
        return Err(vm.new_type_error(format!(
            "{}.__new__({}) is not safe, use {}.__new__()",
            typ.slot_name(),
            subtype.slot_name(),
            staticbase.slot_name()
        )));
    }

    let slot_new = typ
        .slots
        .new
        .load()
        .expect("Should be able to find a new slot somewhere in the mro");
    slot_new(subtype, args, vm)
}
/// Implements `zelf | other` for type objects by delegating to
/// `union_::or_op`.
pub(crate) fn or_(zelf: PyObjectRef, other: PyObjectRef, vm: &VirtualMachine) -> PyResult {
union_::or_op(zelf, other, vm)
}
/// Picks the next class for the linearisation.
///
/// Scans the lists in order for the first head that does not occur in the
/// tail of any list, removes that head from every list it leads, and returns
/// it. Returns `None` when no head qualifies.
///
/// Panics if any list in `bases` is empty.
fn take_next_base(bases: &mut [Vec<PyTypeRef>]) -> Option<PyTypeRef> {
    let head = bases.iter().map(|mro| mro[0].clone()).find(|candidate| {
        let blocked = bases
            .iter()
            .any(|mro| mro[1..].iter().any(|cls| cls.is(candidate)));
        !blocked
    })?;
    for mro in bases.iter_mut() {
        if mro[0].is(&head) {
            mro.remove(0);
        }
    }
    Some(head)
}
/// Merges the given linearisations into a single method resolution order.
///
/// Each inner list is expected to be non-empty and to start with a direct
/// base. The merge is rejected up front when a direct base also appears in
/// the tail of a list that comes after it, and later whenever no next class
/// can be chosen. Both failures return the same message listing the current
/// head of every remaining list.
fn linearise_mro(mut bases: Vec<Vec<PyTypeRef>>) -> Result<Vec<PyTypeRef>, String> {
    vm_trace!("Linearise MRO: {:?}", bases);
    // Direct bases must keep their relative order: none of them may show up
    // behind the head of a later list.
    let breaks_local_order = bases.iter().enumerate().any(|(i, base_mro)| {
        let direct_base = &base_mro[0];
        bases[i + 1..]
            .iter()
            .any(|later_mro| later_mro[1..].iter().any(|cls| cls.is(direct_base)))
    });
    if breaks_local_order {
        return Err(format!(
            "Cannot create a consistent method resolution order (MRO) for bases {}",
            bases.iter().map(|x| x.first().unwrap()).format(", ")
        ));
    }

    let mut linearised = Vec::new();
    while !bases.is_empty() {
        match take_next_base(&mut bases) {
            Some(head) => linearised.push(head),
            None => {
                return Err(format!(
                    "Cannot create a consistent method resolution order (MRO) for bases {}",
                    bases.iter().map(|x| x.first().unwrap()).format(", ")
                ));
            }
        }
        // Lists that have been fully consumed drop out of the merge.
        bases.retain(|mro| !mro.is_empty());
    }
    Ok(linearised)
}
/// Determines the metaclass to use for a new class.
///
/// Starting from `metatype`, each base's class is compared with the current
/// winner: the more derived of the two is kept. The cheap `fast_issubclass`
/// test is tried in both directions first, then the general `is_subclass`
/// protocol. If neither is a subclass of the other, a TypeError reporting a
/// metaclass conflict is returned.
fn calculate_meta_class(
    metatype: PyTypeRef,
    bases: &[PyTypeRef],
    vm: &VirtualMachine,
) -> PyResult<PyTypeRef> {
    let mut winner = metatype;
    for base in bases {
        let base_meta = base.class();
        if winner.fast_issubclass(base_meta) {
            continue;
        }
        if base_meta.fast_issubclass(&winner) {
            winner = base_meta.to_owned();
            continue;
        }
        if winner.as_object().is_subclass(base_meta.as_object(), vm)? {
            continue;
        }
        if base_meta.as_object().is_subclass(winner.as_object(), vm)? {
            winner = base_meta.to_owned();
            continue;
        }
        return Err(vm.new_type_error(
            "metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass \
             of the metaclasses of all its bases",
        ));
    }
    Ok(winner)
}
/// Reports whether two types differ in basic size or in item size.
fn shape_differs(t1: &Py<PyType>, t2: &Py<PyType>) -> bool {
    let same_basicsize = t1.__basicsize__() == t2.__basicsize__();
    let same_itemsize = t1.slots.itemsize == t2.slots.itemsize;
    !(same_basicsize && same_itemsize)
}
/// Finds the "solid base" of `typ`: the nearest type in its base chain whose
/// shape differs from that of its own solid base. Recurses through
/// `typ.base`, bottoming out at `object` for a type without a base.
fn solid_base<'a>(typ: &'a Py<PyType>, vm: &VirtualMachine) -> &'a Py<PyType> {
    let inherited = match typ.base.as_ref() {
        Some(parent) => solid_base(parent, vm),
        None => vm.ctx.types.object_type,
    };
    if shape_differs(typ, inherited) {
        typ
    } else {
        inherited
    }
}
/// Chooses the primary base among `bases`.
///
/// Every base must carry the BASETYPE flag. The base whose solid base is the
/// most derived wins; an earlier base is kept when a later one has a solid
/// base it already derives from. Solid bases that are unrelated produce an
/// "instance layout conflict" TypeError.
///
/// Panics if `bases` is empty.
fn best_base<'a>(bases: &'a [PyTypeRef], vm: &VirtualMachine) -> PyResult<&'a Py<PyType>> {
    let mut base: Option<&Py<PyType>> = None;
    let mut winner: Option<&Py<PyType>> = None;
    for candidate_base in bases {
        if !candidate_base.slots.flags.has_feature(PyTypeFlags::BASETYPE) {
            return Err(vm.new_type_error(format!(
                "type '{}' is not an acceptable base type",
                candidate_base.slot_name()
            )));
        }
        let solid = solid_base(candidate_base, vm);
        let replaces_winner = match winner {
            None => true,
            Some(current) if current.fast_issubclass(solid) => false,
            Some(current) if solid.fast_issubclass(current) => true,
            Some(_) => {
                return Err(vm.new_type_error("multiple bases have instance layout conflict"));
            }
        };
        if replaces_winner {
            winner = Some(solid);
            base = Some(candidate_base.deref());
        }
    }
    debug_assert!(base.is_some());
    Ok(base.unwrap())
}
/// Applies private-name mangling (`__x` -> `_ClassName__x`).
///
/// `name` is returned unchanged when it does not start with `__`, when it
/// ends with `__`, or when it contains a `.`. Leading underscores of
/// `class_name` are stripped before it is prepended. If nothing is left after
/// stripping (the class name is empty or consists only of underscores), the
/// name is left unmangled, as in CPython's `_Py_Mangle`.
fn mangle_name(class_name: &str, name: &str) -> String {
    if !name.starts_with("__") || name.ends_with("__") || name.contains('.') {
        return name.to_string();
    }
    let class_name = class_name.trim_start_matches('_');
    if class_name.is_empty() {
        // Mangling with an empty prefix would only add a stray underscore.
        return name.to_string();
    }
    format!("_{}{}", class_name, name)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Replaces each type in a linearisation result by its object id so that
    /// results can be compared with `assert_eq!`.
    fn map_ids(obj: Result<Vec<PyTypeRef>, String>) -> Result<Vec<usize>, String> {
        obj.map(|types| types.into_iter().map(|typ| typ.get_id()).collect())
    }

    /// `linearise_mro` collapses duplicate `object` entries and keeps
    /// unrelated classes in the order their lists were given.
    #[test]
    fn test_linearise() {
        let context = Context::genesis();
        let object = context.types.object_type.to_owned();
        let type_type = context.types.type_type.to_owned();
        // Builds a heap class deriving directly from `object`.
        let make_class = |class_name: &'static str| {
            PyType::new_heap(
                class_name,
                vec![object.clone()],
                PyAttributes::default(),
                Default::default(),
                type_type.clone(),
                context,
            )
            .unwrap()
        };
        let a = make_class("A");
        let b = make_class("B");

        let only_object = linearise_mro(vec![vec![object.clone()], vec![object.clone()]]);
        assert_eq!(map_ids(only_object), map_ids(Ok(vec![object.clone()])));

        let two_classes = linearise_mro(vec![
            vec![a.clone(), object.clone()],
            vec![b.clone(), object.clone()],
        ]);
        assert_eq!(map_ids(two_classes), map_ids(Ok(vec![a, b, object])));
    }
}