pub(crate) use _operator::module_def;
#[pymodule]
mod _operator {
use crate::{
AsObject, Py, PyObjectRef, PyPayload, PyRef, PyResult, VirtualMachine,
builtins::{PyInt, PyIntRef, PyStr, PyStrRef, PyTupleRef, PyType, PyTypeRef, PyUtf8StrRef},
common::wtf8::{Wtf8, Wtf8Buf},
function::{ArgBytesLike, Either, FuncArgs, KwArgs, OptionalArg},
protocol::PyIter,
recursion::ReprGuard,
types::{Callable, Constructor, PyComparisonOp, Representable},
};
use constant_time_eq::constant_time_eq;
#[pyfunction]
fn lt(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
a.rich_compare(b, PyComparisonOp::Lt, vm)
}
#[pyfunction]
fn le(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
a.rich_compare(b, PyComparisonOp::Le, vm)
}
#[pyfunction]
fn gt(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
a.rich_compare(b, PyComparisonOp::Gt, vm)
}
#[pyfunction]
fn ge(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
a.rich_compare(b, PyComparisonOp::Ge, vm)
}
#[pyfunction]
fn eq(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
a.rich_compare(b, PyComparisonOp::Eq, vm)
}
#[pyfunction]
fn ne(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
a.rich_compare(b, PyComparisonOp::Ne, vm)
}
#[pyfunction]
fn not_(a: PyObjectRef, vm: &VirtualMachine) -> PyResult<bool> {
a.try_to_bool(vm).map(|r| !r)
}
#[pyfunction]
fn truth(a: PyObjectRef, vm: &VirtualMachine) -> PyResult<bool> {
a.try_to_bool(vm)
}
#[pyfunction]
fn is_(a: PyObjectRef, b: PyObjectRef) -> PyResult<bool> {
Ok(a.is(&b))
}
#[pyfunction]
fn is_not(a: PyObjectRef, b: PyObjectRef) -> PyResult<bool> {
Ok(!a.is(&b))
}
#[pyfunction]
fn abs(a: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._abs(&a)
}
#[pyfunction]
fn add(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._add(&a, &b)
}
#[pyfunction]
fn and_(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._and(&a, &b)
}
#[pyfunction]
fn floordiv(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._floordiv(&a, &b)
}
#[pyfunction]
fn index(a: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyIntRef> {
a.try_index(vm)
}
#[pyfunction]
fn invert(pos: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._invert(&pos)
}
#[pyfunction]
fn lshift(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._lshift(&a, &b)
}
#[pyfunction(name = "mod")]
fn mod_(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._mod(&a, &b)
}
#[pyfunction]
fn mul(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._mul(&a, &b)
}
#[pyfunction]
fn matmul(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._matmul(&a, &b)
}
#[pyfunction]
fn neg(pos: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._neg(&pos)
}
#[pyfunction]
fn or_(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._or(&a, &b)
}
#[pyfunction]
fn pos(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._pos(&obj)
}
#[pyfunction]
fn pow(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._pow(&a, &b, vm.ctx.none.as_object())
}
#[pyfunction]
fn rshift(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._rshift(&a, &b)
}
#[pyfunction]
fn sub(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._sub(&a, &b)
}
#[pyfunction]
fn truediv(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._truediv(&a, &b)
}
#[pyfunction]
fn xor(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._xor(&a, &b)
}
#[pyfunction]
fn concat(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
if !a.class().has_attr(identifier!(vm, __getitem__))
|| a.fast_isinstance(vm.ctx.types.dict_type)
{
return Err(
vm.new_type_error(format!("{} object can't be concatenated", a.class().name()))
);
}
vm._add(&a, &b)
}
#[pyfunction]
fn contains(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult<bool> {
vm._contains(&a, &b)
}
#[pyfunction(name = "countOf")]
fn count_of(a: PyIter, b: PyObjectRef, vm: &VirtualMachine) -> PyResult<usize> {
let mut count: usize = 0;
for element in a.iter_without_hint::<PyObjectRef>(vm)? {
let element = element?;
if element.is(&b) || vm.bool_eq(&b, &element)? {
count += 1;
}
}
Ok(count)
}
#[pyfunction]
fn delitem(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
a.del_item(&*b, vm)
}
#[pyfunction]
fn getitem(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
a.get_item(&*b, vm)
}
#[pyfunction(name = "indexOf")]
fn index_of(a: PyIter, b: PyObjectRef, vm: &VirtualMachine) -> PyResult<usize> {
for (index, element) in a.iter_without_hint::<PyObjectRef>(vm)?.enumerate() {
let element = element?;
if element.is(&b) || vm.bool_eq(&b, &element)? {
return Ok(index);
}
}
Err(vm.new_value_error("sequence.index(x): x not in sequence"))
}
#[pyfunction]
fn setitem(
a: PyObjectRef,
b: PyObjectRef,
c: PyObjectRef,
vm: &VirtualMachine,
) -> PyResult<()> {
a.set_item(&*b, c, vm)
}
#[pyfunction]
fn length_hint(obj: PyObjectRef, default: OptionalArg, vm: &VirtualMachine) -> PyResult<usize> {
let default: usize = default
.map(|v| {
if !v.fast_isinstance(vm.ctx.types.int_type) {
return Err(vm.new_type_error(format!(
"'{}' object cannot be interpreted as an integer",
v.class().name()
)));
}
v.downcast_ref::<PyInt>().unwrap().try_to_primitive(vm)
})
.unwrap_or(Ok(0))?;
obj.length_hint(default, vm)
}
#[pyfunction]
fn iadd(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._iadd(&a, &b)
}
#[pyfunction]
fn iand(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._iand(&a, &b)
}
#[pyfunction]
fn iconcat(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
if !a.class().has_attr(identifier!(vm, __getitem__))
|| a.fast_isinstance(vm.ctx.types.dict_type)
{
return Err(vm.new_type_error(format!(
"'{}' object can't be concatenated",
a.class().name()
)));
}
vm._iadd(&a, &b)
}
#[pyfunction]
fn ifloordiv(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._ifloordiv(&a, &b)
}
#[pyfunction]
fn ilshift(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._ilshift(&a, &b)
}
#[pyfunction]
fn imod(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._imod(&a, &b)
}
#[pyfunction]
fn imul(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._imul(&a, &b)
}
#[pyfunction]
fn imatmul(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._imatmul(&a, &b)
}
#[pyfunction]
fn ior(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._ior(&a, &b)
}
#[pyfunction]
fn ipow(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._ipow(&a, &b, vm.ctx.none.as_object())
}
#[pyfunction]
fn irshift(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._irshift(&a, &b)
}
#[pyfunction]
fn isub(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._isub(&a, &b)
}
#[pyfunction]
fn itruediv(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._itruediv(&a, &b)
}
#[pyfunction]
fn ixor(a: PyObjectRef, b: PyObjectRef, vm: &VirtualMachine) -> PyResult {
vm._ixor(&a, &b)
}
#[pyfunction]
fn _compare_digest(
a: Either<PyStrRef, ArgBytesLike>,
b: Either<PyStrRef, ArgBytesLike>,
vm: &VirtualMachine,
) -> PyResult<bool> {
let res = match (a, b) {
(Either::A(a), Either::A(b)) => {
if !a.isascii() || !b.isascii() {
return Err(vm.new_type_error(
"comparing strings with non-ASCII characters is not supported",
));
}
constant_time_eq(a.as_bytes(), b.as_bytes())
}
(Either::B(a), Either::B(b)) => a.with_ref(|a| b.with_ref(|b| constant_time_eq(a, b))),
_ => {
return Err(
vm.new_type_error("unsupported operand types(s) or combination of types")
);
}
};
Ok(res)
}
#[pyattr]
#[pyclass(name = "attrgetter")]
#[derive(Debug, PyPayload)]
struct PyAttrGetter {
attrs: Vec<PyStrRef>,
}
#[pyclass(with(Callable, Constructor, Representable))]
impl PyAttrGetter {
#[pygetset]
fn __text_signature__(&self) -> &'static str {
"(obj, /)"
}
#[pymethod]
fn __reduce__(zelf: PyRef<Self>, vm: &VirtualMachine) -> PyResult<(PyTypeRef, PyTupleRef)> {
let attrs = vm
.ctx
.new_tuple(zelf.attrs.iter().map(|v| v.clone().into()).collect());
Ok((zelf.class().to_owned(), attrs))
}
fn get_single_attr(
obj: PyObjectRef,
attr: &Py<PyStr>,
vm: &VirtualMachine,
) -> PyResult<PyObjectRef> {
let attr_str = attr.as_bytes();
let parts = attr_str.split(|&b| b == b'.').collect::<Vec<_>>();
if parts.len() == 1 {
return obj.get_attr(attr, vm);
}
let mut obj = obj;
for part in parts {
let part = Wtf8::from_bytes(part).expect("originally valid WTF-8");
obj = obj.get_attr(&vm.ctx.new_str(part), vm)?;
}
Ok(obj)
}
}
impl Constructor for PyAttrGetter {
type Args = FuncArgs;
fn py_new(_cls: &Py<PyType>, args: Self::Args, vm: &VirtualMachine) -> PyResult<Self> {
let n_attr = args.args.len();
if !args.kwargs.is_empty() {
return Err(vm.new_type_error("attrgetter() takes no keyword arguments"));
}
if n_attr == 0 {
return Err(vm.new_type_error("attrgetter expected 1 argument, got 0."));
}
let mut attrs = Vec::with_capacity(n_attr);
for o in args.args {
if let Ok(r) = o.try_into_value(vm) {
attrs.push(r);
} else {
return Err(vm.new_type_error("attribute name must be a string"));
}
}
Ok(Self { attrs })
}
}
impl Callable for PyAttrGetter {
type Args = PyObjectRef;
fn call(zelf: &Py<Self>, obj: Self::Args, vm: &VirtualMachine) -> PyResult {
if zelf.attrs.len() == 1 {
return Self::get_single_attr(obj, &zelf.attrs[0], vm);
}
let mut results = Vec::with_capacity(zelf.attrs.len());
for o in &zelf.attrs {
results.push(Self::get_single_attr(obj.clone(), o, vm)?);
}
Ok(vm.ctx.new_tuple(results).into())
}
}
impl Representable for PyAttrGetter {
#[inline]
fn repr_wtf8(zelf: &Py<Self>, vm: &VirtualMachine) -> PyResult<Wtf8Buf> {
let mut result = Wtf8Buf::from("operator.attrgetter(");
if let Some(_guard) = ReprGuard::enter(vm, zelf.as_object()) {
let mut first = true;
for part in &zelf.attrs {
if !first {
result.push_str(", ");
}
first = false;
result.push_wtf8(part.as_object().repr(vm)?.as_wtf8());
}
} else {
result.push_str("...");
}
result.push_char(')');
Ok(result)
}
}
#[pyattr]
#[pyclass(name = "itemgetter")]
#[derive(Debug, PyPayload)]
struct PyItemGetter {
items: Vec<PyObjectRef>,
}
#[pyclass(with(Callable, Constructor, Representable))]
impl PyItemGetter {
#[pygetset]
fn __text_signature__(&self) -> &'static str {
"(obj, /)"
}
#[pymethod]
fn __reduce__(zelf: PyRef<Self>, vm: &VirtualMachine) -> PyObjectRef {
let items = vm.ctx.new_tuple(zelf.items.to_vec());
vm.new_pyobj((zelf.class().to_owned(), items))
}
}
impl Constructor for PyItemGetter {
type Args = FuncArgs;
fn py_new(_cls: &Py<PyType>, args: Self::Args, vm: &VirtualMachine) -> PyResult<Self> {
if !args.kwargs.is_empty() {
return Err(vm.new_type_error("itemgetter() takes no keyword arguments"));
}
if args.args.is_empty() {
return Err(vm.new_type_error("itemgetter expected 1 argument, got 0."));
}
Ok(Self { items: args.args })
}
}
impl Callable for PyItemGetter {
type Args = PyObjectRef;
fn call(zelf: &Py<Self>, obj: Self::Args, vm: &VirtualMachine) -> PyResult {
if zelf.items.len() == 1 {
return obj.get_item(&*zelf.items[0], vm);
}
let mut results = Vec::with_capacity(zelf.items.len());
for item in &zelf.items {
results.push(obj.get_item(&**item, vm)?);
}
Ok(vm.ctx.new_tuple(results).into())
}
}
impl Representable for PyItemGetter {
#[inline]
fn repr_wtf8(zelf: &Py<Self>, vm: &VirtualMachine) -> PyResult<Wtf8Buf> {
let mut result = Wtf8Buf::from("operator.itemgetter(");
if let Some(_guard) = ReprGuard::enter(vm, zelf.as_object()) {
let mut first = true;
for item in &zelf.items {
if !first {
result.push_str(", ");
}
first = false;
result.push_wtf8(item.repr(vm)?.as_wtf8());
}
} else {
result.push_str("...");
}
result.push_char(')');
Ok(result)
}
}
#[pyattr]
#[pyclass(name = "methodcaller")]
#[derive(Debug, PyPayload)]
struct PyMethodCaller {
name: PyUtf8StrRef,
args: FuncArgs,
}
#[pyclass(with(Callable, Constructor, Representable))]
impl PyMethodCaller {
#[pygetset]
fn __text_signature__(&self) -> &'static str {
"(obj, /)"
}
#[pymethod]
fn __reduce__(zelf: PyRef<Self>, vm: &VirtualMachine) -> PyResult<PyTupleRef> {
if zelf.args.kwargs.is_empty() {
let mut py_args = vec![zelf.name.as_object().to_owned()];
py_args.append(&mut zelf.args.args.clone());
Ok(vm.new_tuple((zelf.class().to_owned(), vm.ctx.new_tuple(py_args))))
} else {
let partial = vm.import("functools", 0)?.get_attr("partial", vm)?;
let args = FuncArgs::new(
vec![zelf.class().to_owned().into(), zelf.name.clone().into()],
KwArgs::new(zelf.args.kwargs.clone()),
);
let callable = partial.call(args, vm)?;
Ok(vm.new_tuple((callable, vm.ctx.new_tuple(zelf.args.args.clone()))))
}
}
}
impl Constructor for PyMethodCaller {
type Args = (PyObjectRef, FuncArgs);
fn py_new(
_cls: &Py<PyType>,
(name, args): Self::Args,
vm: &VirtualMachine,
) -> PyResult<Self> {
let name = name
.try_into_value(vm)
.map_err(|_| vm.new_type_error("method name must be a string"))?;
Ok(Self { name, args })
}
}
impl Callable for PyMethodCaller {
type Args = PyObjectRef;
#[inline]
fn call(zelf: &Py<Self>, obj: Self::Args, vm: &VirtualMachine) -> PyResult {
vm.call_method(&obj, zelf.name.as_str(), zelf.args.clone())
}
}
impl Representable for PyMethodCaller {
#[inline]
fn repr_wtf8(zelf: &Py<Self>, vm: &VirtualMachine) -> PyResult<Wtf8Buf> {
let mut result = Wtf8Buf::from("operator.methodcaller(");
if let Some(_guard) = ReprGuard::enter(vm, zelf.as_object()) {
let args = &zelf.args.args;
let kwargs = &zelf.args.kwargs;
result.push_wtf8(zelf.name.as_object().repr(vm)?.as_wtf8());
for v in args {
result.push_str(", ");
result.push_wtf8(v.repr(vm)?.as_wtf8());
}
for (key, value) in kwargs {
result.push_str(", ");
result.push_str(key);
result.push_char('=');
result.push_wtf8(value.repr(vm)?.as_wtf8());
}
} else {
result.push_str("...");
}
result.push_char(')');
Ok(result)
}
}
}