use crate::{err::PyErrArguments, exceptions, types, PyErr, Python};
use crate::{IntoPyObject, Py, PyAny};
use std::io;
/// Converts a Python exception into a [`std::io::Error`].
///
/// The exception's Python type selects the [`io::ErrorKind`]; any type
/// without a dedicated mapping becomes [`io::ErrorKind::Other`]. The `PyErr`
/// itself is stored as the inner error of the returned `io::Error`.
impl From<PyErr> for io::Error {
    fn from(err: PyErr) -> Self {
        // `is_instance_of` needs a `Python` token, hence the `attach` call.
        let kind = Python::attach(|py| {
            // The checks run in a fixed order and the first match wins.
            if err.is_instance_of::<exceptions::PyBrokenPipeError>(py) {
                return io::ErrorKind::BrokenPipe;
            }
            if err.is_instance_of::<exceptions::PyConnectionRefusedError>(py) {
                return io::ErrorKind::ConnectionRefused;
            }
            if err.is_instance_of::<exceptions::PyConnectionAbortedError>(py) {
                return io::ErrorKind::ConnectionAborted;
            }
            if err.is_instance_of::<exceptions::PyConnectionResetError>(py) {
                return io::ErrorKind::ConnectionReset;
            }
            if err.is_instance_of::<exceptions::PyInterruptedError>(py) {
                return io::ErrorKind::Interrupted;
            }
            if err.is_instance_of::<exceptions::PyFileNotFoundError>(py) {
                return io::ErrorKind::NotFound;
            }
            if err.is_instance_of::<exceptions::PyPermissionError>(py) {
                return io::ErrorKind::PermissionDenied;
            }
            if err.is_instance_of::<exceptions::PyFileExistsError>(py) {
                return io::ErrorKind::AlreadyExists;
            }
            if err.is_instance_of::<exceptions::PyBlockingIOError>(py) {
                return io::ErrorKind::WouldBlock;
            }
            if err.is_instance_of::<exceptions::PyTimeoutError>(py) {
                return io::ErrorKind::TimedOut;
            }
            if err.is_instance_of::<exceptions::PyMemoryError>(py) {
                return io::ErrorKind::OutOfMemory;
            }
            if err.is_instance_of::<exceptions::PyIsADirectoryError>(py) {
                return io::ErrorKind::IsADirectory;
            }
            if err.is_instance_of::<exceptions::PyNotADirectoryError>(py) {
                return io::ErrorKind::NotADirectory;
            }
            io::ErrorKind::Other
        });
        io::Error::new(kind, err)
    }
}
/// Converts a [`std::io::Error`] into a Python exception.
///
/// If the `io::Error` carries a `PyErr` as its inner error, that `PyErr` is
/// returned as-is. Otherwise the [`io::ErrorKind`] selects the Python
/// exception type, falling back to `OSError` for kinds without a dedicated
/// mapping; the `io::Error` itself is passed on as the exception arguments.
impl From<io::Error> for PyErr {
    fn from(err: io::Error) -> PyErr {
        // `io::Error::downcast` returns the error untouched when its payload
        // is absent or is not a `PyErr`, so no check-then-unwrap is needed.
        let err = match err.downcast::<PyErr>() {
            Ok(py_err) => return py_err,
            Err(err) => err,
        };
        match err.kind() {
            io::ErrorKind::BrokenPipe => exceptions::PyBrokenPipeError::new_err(err),
            io::ErrorKind::ConnectionRefused => exceptions::PyConnectionRefusedError::new_err(err),
            io::ErrorKind::ConnectionAborted => exceptions::PyConnectionAbortedError::new_err(err),
            io::ErrorKind::ConnectionReset => exceptions::PyConnectionResetError::new_err(err),
            io::ErrorKind::Interrupted => exceptions::PyInterruptedError::new_err(err),
            io::ErrorKind::NotFound => exceptions::PyFileNotFoundError::new_err(err),
            io::ErrorKind::PermissionDenied => exceptions::PyPermissionError::new_err(err),
            io::ErrorKind::AlreadyExists => exceptions::PyFileExistsError::new_err(err),
            io::ErrorKind::WouldBlock => exceptions::PyBlockingIOError::new_err(err),
            io::ErrorKind::TimedOut => exceptions::PyTimeoutError::new_err(err),
            io::ErrorKind::OutOfMemory => exceptions::PyMemoryError::new_err(err),
            io::ErrorKind::IsADirectory => exceptions::PyIsADirectoryError::new_err(err),
            io::ErrorKind::NotADirectory => exceptions::PyNotADirectoryError::new_err(err),
            _ => exceptions::PyOSError::new_err(err),
        }
    }
}
/// Supplies the `Display` text of an [`io::Error`] as the Python exception
/// argument object.
impl PyErrArguments for io::Error {
    fn arguments(self, py: Python<'_>) -> Py<PyAny> {
        let message = self.to_string();
        // Presumably converting a `String` cannot fail, so `unwrap` is never
        // expected to panic here.
        let py_message = message.into_pyobject(py).unwrap();
        py_message.into_any().unbind()
    }
}
impl<W> From<io::IntoInnerError<W>> for PyErr {
fn from(err: io::IntoInnerError<W>) -> PyErr {
err.into_error().into()
}
}
impl<W: Send + Sync> PyErrArguments for io::IntoInnerError<W> {
fn arguments(self, py: Python<'_>) -> Py<PyAny> {
self.into_error().arguments(py)
}
}
/// Allows `Infallible` to be used where an error convertible to `PyErr` is
/// required.
///
/// `Infallible` has no values, so this function can never actually be called.
impl From<core::convert::Infallible> for PyErr {
    fn from(never: core::convert::Infallible) -> PyErr {
        // An empty match on an uninhabited type is checked by the compiler,
        // unlike a runtime `unreachable!()` panic.
        match never {}
    }
}
/// Generates the two impls needed to raise a Rust error type as a Python
/// exception:
///
/// * `From<$rust_err> for PyErr`, which builds a `$py_exc` from the error;
/// * `PyErrArguments for $rust_err`, which uses the error's `Display` text as
///   the exception argument object.
macro_rules! impl_to_pyerr {
    ($rust_err:ty, $py_exc:ty) => {
        impl core::convert::From<$rust_err> for PyErr {
            fn from(err: $rust_err) -> PyErr {
                <$py_exc>::new_err(err)
            }
        }

        impl PyErrArguments for $rust_err {
            fn arguments(self, py: Python<'_>) -> $crate::Py<$crate::PyAny> {
                let message = self.to_string();
                let py_message = message.into_pyobject(py).unwrap();
                py_message.into_any().unbind()
            }
        }
    };
}
/// Pairs a UTF-8 decoding error with the byte sequence that failed to decode,
/// so that both are available when building Python exception arguments.
struct Utf8ErrorWithBytes {
/// The decoding error; its `valid_up_to` and `error_len` locate the bad bytes.
err: core::str::Utf8Error,
/// The complete input that was being decoded.
bytes: Vec<u8>,
}
/// Builds the five-element argument tuple
/// `(encoding, object, start, end, reason)` from a UTF-8 error and its input.
impl PyErrArguments for Utf8ErrorWithBytes {
    fn arguments(self, py: Python<'_>) -> Py<PyAny> {
        let Self { err, bytes: raw } = self;

        // The invalid region starts right after the valid prefix.
        let first_bad = err.valid_up_to();
        // With no reported error length, the region runs to the end of the input.
        let past_bad = match err.error_len() {
            Some(len) => first_bad + len,
            None => raw.len(),
        };

        let fields = [
            types::PyString::new(py, "utf-8").into_any(),
            types::PyBytes::new(py, &raw).into_any(),
            types::PyInt::new(py, first_bad).into_any(),
            types::PyInt::new(py, past_bad).into_any(),
            types::PyString::new(py, "invalid utf-8").into_any(),
        ];
        let tuple = types::PyTuple::new(py, &fields).unwrap();
        tuple.into_any().unbind()
    }
}
/// Produces exception arguments for a `FromUtf8Error` by pairing its
/// `Utf8Error` with the bytes it owns and delegating to `Utf8ErrorWithBytes`.
impl PyErrArguments for alloc::string::FromUtf8Error {
    fn arguments(self, py: Python<'_>) -> Py<PyAny> {
        // Read the error details first; `into_bytes` consumes `self`.
        let err = self.utf8_error();
        let bytes = self.into_bytes();
        let with_bytes = Utf8ErrorWithBytes { err, bytes };
        with_bytes.arguments(py)
    }
}
/// Converts a `FromUtf8Error` into a Python `UnicodeDecodeError`.
impl core::convert::From<alloc::string::FromUtf8Error> for PyErr {
    fn from(utf8_err: alloc::string::FromUtf8Error) -> Self {
        <exceptions::PyUnicodeDecodeError>::new_err(utf8_err)
    }
}
/// Produces exception arguments for an `IntoStringError` by pairing its
/// `Utf8Error` with the bytes of the original `CString` and delegating to
/// `Utf8ErrorWithBytes`.
impl PyErrArguments for alloc::ffi::IntoStringError {
    fn arguments(self, py: Python<'_>) -> Py<PyAny> {
        // Read the error details first; `into_cstring` consumes `self`.
        let err = self.utf8_error();
        let bytes = self.into_cstring().into_bytes();
        let with_bytes = Utf8ErrorWithBytes { err, bytes };
        with_bytes.arguments(py)
    }
}
/// Converts an `IntoStringError` into a Python `UnicodeDecodeError`.
impl core::convert::From<alloc::ffi::IntoStringError> for PyErr {
    fn from(into_string_err: alloc::ffi::IntoStringError) -> Self {
        <exceptions::PyUnicodeDecodeError>::new_err(into_string_err)
    }
}
// Every error type below is raised in Python as a `ValueError` carrying the
// error's `Display` text.

// `core` error types.
impl_to_pyerr!(core::array::TryFromSliceError, exceptions::PyValueError);
impl_to_pyerr!(core::char::CharTryFromError, exceptions::PyValueError);
impl_to_pyerr!(core::char::ParseCharError, exceptions::PyValueError);
impl_to_pyerr!(core::net::AddrParseError, exceptions::PyValueError);
impl_to_pyerr!(core::num::ParseFloatError, exceptions::PyValueError);
impl_to_pyerr!(core::num::ParseIntError, exceptions::PyValueError);
impl_to_pyerr!(core::num::TryFromIntError, exceptions::PyValueError);
impl_to_pyerr!(core::str::ParseBoolError, exceptions::PyValueError);
impl_to_pyerr!(core::time::TryFromFloatSecsError, exceptions::PyValueError);

// `alloc` error types.
impl_to_pyerr!(alloc::ffi::NulError, exceptions::PyValueError);

// `std` error types.
impl_to_pyerr!(std::env::JoinPathsError, exceptions::PyValueError);
impl_to_pyerr!(std::path::StripPrefixError, exceptions::PyValueError);
impl_to_pyerr!(std::time::SystemTimeError, exceptions::PyValueError);
#[cfg(test)]
mod tests {
    use super::*;
    use crate::exceptions::PyUnicodeDecodeError;
    use crate::types::PyAnyMethods;
    use crate::{IntoPyObjectExt as _, PyErr, Python};
    use std::io;

    /// Round-trips `io::Error` -> `PyErr` -> `io::Error` -> `PyErr` for each
    /// mapped `io::ErrorKind`, checking the Python type name, the message, the
    /// recovered kind, and that the final `PyErr` is the very same Python
    /// exception object as the first one.
    #[test]
    fn io_errors() {
        let check_err = |kind, expected_ty| {
            Python::attach(|py| {
                let rust_err = io::Error::new(kind, "some error msg");

                let py_err: PyErr = rust_err.into();
                let py_err_msg = format!("{expected_ty}: some error msg");
                assert_eq!(py_err.to_string(), py_err_msg);

                // Keep a second handle so identity can be checked after the
                // original is moved into the `io::Error`.
                let py_error_clone = py_err.clone_ref(py);

                let rust_err_from_py_err: io::Error = py_err.into();
                assert_eq!(rust_err_from_py_err.to_string(), py_err_msg);
                assert_eq!(rust_err_from_py_err.kind(), kind);

                let py_err_recovered_from_rust_err: PyErr = rust_err_from_py_err.into();
                assert!(py_err_recovered_from_rust_err
                    .value(py)
                    .is(py_error_clone.value(py)));
            })
        };

        check_err(io::ErrorKind::BrokenPipe, "BrokenPipeError");
        check_err(io::ErrorKind::ConnectionRefused, "ConnectionRefusedError");
        check_err(io::ErrorKind::ConnectionAborted, "ConnectionAbortedError");
        check_err(io::ErrorKind::ConnectionReset, "ConnectionResetError");
        check_err(io::ErrorKind::Interrupted, "InterruptedError");
        check_err(io::ErrorKind::NotFound, "FileNotFoundError");
        check_err(io::ErrorKind::PermissionDenied, "PermissionError");
        check_err(io::ErrorKind::AlreadyExists, "FileExistsError");
        check_err(io::ErrorKind::WouldBlock, "BlockingIOError");
        check_err(io::ErrorKind::TimedOut, "TimeoutError");
        check_err(io::ErrorKind::OutOfMemory, "MemoryError");
        check_err(io::ErrorKind::IsADirectory, "IsADirectoryError");
        check_err(io::ErrorKind::NotADirectory, "NotADirectoryError");
        // Kinds without a dedicated mapping fall back to `OSError` and come
        // back as `Other`.
        check_err(io::ErrorKind::Other, "OSError");
    }

    /// Checks that each UTF-8 error source yields a `UnicodeDecodeError` whose
    /// `encoding`, `object`, `start`, `end` and `reason` attributes describe
    /// the single invalid byte in `b"abc\xffdef"`.
    #[test]
    // NOTE(review): presumably needed because the input is deliberately
    // invalid UTF-8 built from a literal - confirm the lint still fires.
    #[allow(invalid_from_utf8)]
    fn utf8_errors() {
        let bytes = b"abc\xffdef".to_vec();

        let check_err = |py_err: PyErr| {
            Python::attach(|py| {
                let py_err = py_err.into_bound_py_any(py).unwrap();

                assert!(py_err.is_instance_of::<exceptions::PyUnicodeDecodeError>());
                assert_eq!(
                    py_err
                        .getattr("encoding")
                        .unwrap()
                        .extract::<String>()
                        .unwrap(),
                    "utf-8"
                );
                assert_eq!(
                    py_err
                        .getattr("object")
                        .unwrap()
                        .extract::<Vec<u8>>()
                        .unwrap(),
                    &*bytes
                );
                assert_eq!(
                    py_err.getattr("start").unwrap().extract::<usize>().unwrap(),
                    3
                );
                assert_eq!(
                    py_err.getattr("end").unwrap().extract::<usize>().unwrap(),
                    4
                );
                assert_eq!(
                    py_err
                        .getattr("reason")
                        .unwrap()
                        .extract::<String>()
                        .unwrap(),
                    "invalid utf-8"
                );
            });
        };

        // Built directly from the helper struct.
        let utf8_err_with_bytes = PyUnicodeDecodeError::new_err(Utf8ErrorWithBytes {
            err: core::str::from_utf8(&bytes).expect_err("\\xff is invalid utf-8"),
            bytes: bytes.clone(),
        });
        check_err(utf8_err_with_bytes);

        // Via `From<FromUtf8Error>`.
        let from_utf8_err = String::from_utf8(bytes.clone())
            .expect_err("\\xff is invalid utf-8")
            .into();
        check_err(from_utf8_err);

        // Via `From<IntoStringError>`.
        let from_utf8_err = alloc::ffi::CString::new(bytes.clone())
            .unwrap()
            .into_string()
            .expect_err("\\xff is invalid utf-8")
            .into();
        check_err(from_utf8_err);
    }

    /// Checks that several std error types convert to a `ValueError` whose
    /// message equals the Rust error's `Display` output.
    #[test]
    fn std_error_conversions() {
        Python::attach(|py| {
            let check_err = |err: PyErr, expected_msg: &str| {
                let py_err = err.into_bound_py_any(py).unwrap();
                assert!(py_err.is_instance_of::<exceptions::PyValueError>());
                let msg = py_err.str().unwrap().to_string();
                assert_eq!(msg, expected_msg);
            };

            let float_secs_err = core::time::Duration::try_from_secs_f32(-1.0).unwrap_err();
            let expected = float_secs_err.to_string();
            check_err(float_secs_err.into(), &expected);

            let sys_time_err = std::time::SystemTime::now()
                .duration_since(std::time::SystemTime::now() + core::time::Duration::from_secs(1))
                .unwrap_err();
            let expected = sys_time_err.to_string();
            check_err(sys_time_err.into(), &expected);

            let strip_prefix_err = std::path::Path::new("/a/b/c")
                .strip_prefix("/x/y/z")
                .unwrap_err();
            let expected = strip_prefix_err.to_string();
            check_err(strip_prefix_err.into(), &expected);

            let join_paths_err = std::env::join_paths(["a:b", "a;b", "a\"b"].iter()).unwrap_err();
            let expected = join_paths_err.to_string();
            check_err(join_paths_err.into(), &expected);

            let parse_char_err = "abc".parse::<char>().unwrap_err();
            let expected = parse_char_err.to_string();
            check_err(parse_char_err.into(), &expected);

            let char_try_from_err = char::try_from(0xD800_u32).unwrap_err();
            let expected = char_try_from_err.to_string();
            check_err(char_try_from_err.into(), &expected);
        });
    }
}