#![allow(non_local_definitions)]
use std::{
collections::HashMap,
future::Future,
ops::Deref,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use bytes::Bytes;
use pyo3::{
exceptions::{PyRuntimeError, PyStopAsyncIteration, PyTypeError},
iter::IterNextOutput,
prelude::*,
};
use tokio::{runtime::Handle, sync::Mutex};
use crate::PyError;
#[pyclass]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Blob(aws_smithy_types::Blob);
impl Blob {
pub fn new<T: Into<Vec<u8>>>(input: T) -> Self {
Self(aws_smithy_types::Blob::new(input))
}
pub fn into_inner(self) -> Vec<u8> {
self.0.into_inner()
}
}
impl AsRef<[u8]> for Blob {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}
#[pymethods]
impl Blob {
#[new]
pub fn pynew(input: Vec<u8>) -> Self {
Self(aws_smithy_types::Blob::new(input))
}
#[getter(data)]
pub fn get_data(&self) -> &[u8] {
self.as_ref()
}
#[setter(data)]
pub fn set_data(&mut self, data: Vec<u8>) {
*self = Self::pynew(data);
}
}
impl From<aws_smithy_types::Blob> for Blob {
fn from(other: aws_smithy_types::Blob) -> Blob {
Blob(other)
}
}
impl From<Blob> for aws_smithy_types::Blob {
fn from(other: Blob) -> aws_smithy_types::Blob {
other.0
}
}
impl<'blob> From<&'blob Blob> for &'blob aws_smithy_types::Blob {
fn from(other: &'blob Blob) -> &'blob aws_smithy_types::Blob {
&other.0
}
}
#[pyclass]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DateTime(aws_smithy_types::date_time::DateTime);
#[pyclass]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Format {
DateTime,
HttpDate,
EpochSeconds,
}
impl From<Format> for aws_smithy_types::date_time::Format {
fn from(variant: Format) -> aws_smithy_types::date_time::Format {
match variant {
Format::DateTime => aws_smithy_types::date_time::Format::DateTime,
Format::HttpDate => aws_smithy_types::date_time::Format::HttpDate,
Format::EpochSeconds => aws_smithy_types::date_time::Format::EpochSeconds,
}
}
}
impl DateTime {
pub fn fmt(
&self,
format: aws_smithy_types::date_time::Format,
) -> Result<String, aws_smithy_types::date_time::DateTimeFormatError> {
self.0.fmt(format)
}
}
#[pymethods]
impl DateTime {
#[staticmethod]
pub fn from_secs(epoch_seconds: i64) -> Self {
Self(aws_smithy_types::date_time::DateTime::from_secs(
epoch_seconds,
))
}
#[staticmethod]
pub fn from_millis(epoch_millis: i64) -> Self {
Self(aws_smithy_types::date_time::DateTime::from_secs(
epoch_millis,
))
}
#[staticmethod]
pub fn from_nanos(epoch_nanos: i128) -> PyResult<Self> {
Ok(Self(
aws_smithy_types::date_time::DateTime::from_nanos(epoch_nanos)
.map_err(PyError::DateTimeConversion)?,
))
}
#[staticmethod]
pub fn read(s: &str, format: Format, delim: char) -> PyResult<(Self, &str)> {
let (self_, next) = aws_smithy_types::date_time::DateTime::read(s, format.into(), delim)
.map_err(PyError::DateTimeParse)?;
Ok((Self(self_), next))
}
#[staticmethod]
pub fn from_fractional_secs(epoch_seconds: i64, fraction: f64) -> Self {
Self(aws_smithy_types::date_time::DateTime::from_fractional_secs(
epoch_seconds,
fraction,
))
}
#[staticmethod]
pub fn from_secs_and_nanos(seconds: i64, subsecond_nanos: u32) -> Self {
Self(aws_smithy_types::date_time::DateTime::from_secs_and_nanos(
seconds,
subsecond_nanos,
))
}
#[staticmethod]
pub fn from_secs_f64(epoch_seconds: f64) -> Self {
Self(aws_smithy_types::date_time::DateTime::from_secs_f64(
epoch_seconds,
))
}
#[staticmethod]
pub fn from_str(s: &str, format: Format) -> PyResult<Self> {
Ok(Self(
aws_smithy_types::date_time::DateTime::from_str(s, format.into())
.map_err(PyError::DateTimeParse)?,
))
}
pub fn as_nanos(&self) -> i128 {
self.0.as_nanos()
}
pub fn as_secs_f64(&self) -> f64 {
self.0.as_secs_f64()
}
pub fn has_subsec_nanos(&self) -> bool {
self.0.has_subsec_nanos()
}
pub fn secs(&self) -> i64 {
self.0.secs()
}
pub fn subsec_nanos(&self) -> u32 {
self.0.subsec_nanos()
}
pub fn to_millis(&self) -> PyResult<i64> {
Ok(self.0.to_millis().map_err(PyError::DateTimeConversion)?)
}
}
impl From<aws_smithy_types::DateTime> for DateTime {
fn from(other: aws_smithy_types::DateTime) -> DateTime {
DateTime(other)
}
}
impl Deref for DateTime {
type Target = aws_smithy_types::DateTime;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[pyclass]
#[derive(Debug, Clone)]
pub struct ByteStream(Arc<Mutex<aws_smithy_types::byte_stream::ByteStream>>);
impl futures::stream::Stream for ByteStream {
type Item = Result<Bytes, aws_smithy_types::byte_stream::error::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let stream = self.0.lock();
tokio::pin!(stream);
match stream.poll(cx) {
Poll::Ready(mut stream) => Pin::new(&mut *stream).poll_next(cx),
Poll::Pending => Poll::Pending,
}
}
}
async fn yield_data_chunk(
body: Arc<Mutex<aws_smithy_types::byte_stream::ByteStream>>,
) -> PyResult<Option<Bytes>> {
let mut stream = body.lock().await;
stream
.next()
.await
.transpose()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
impl ByteStream {
pub fn new(body: aws_smithy_types::body::SdkBody) -> Self {
Self(Arc::new(Mutex::new(
aws_smithy_types::byte_stream::ByteStream::new(body),
)))
}
}
impl Default for ByteStream {
fn default() -> Self {
Self::new(aws_smithy_types::body::SdkBody::from(""))
}
}
#[pymethods]
impl ByteStream {
#[new]
pub fn newpy(input: &[u8]) -> Self {
Self(Arc::new(Mutex::new(
aws_smithy_types::byte_stream::ByteStream::new(aws_smithy_types::body::SdkBody::from(
input,
)),
)))
}
#[staticmethod]
pub fn from_path_blocking(py: Python, path: String) -> PyResult<Py<PyAny>> {
let byte_stream = Handle::current().block_on(async {
aws_smithy_types::byte_stream::ByteStream::from_path(path)
.await
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
})?;
let result = Self(Arc::new(Mutex::new(byte_stream)));
Ok(result.into_py(py))
}
#[staticmethod]
pub fn from_path(py: Python<'_>, path: String) -> PyResult<&PyAny> {
pyo3_asyncio::tokio::future_into_py(py, async move {
let byte_stream = aws_smithy_types::byte_stream::ByteStream::from_path(path)
.await
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
Ok(Self(Arc::new(Mutex::new(byte_stream))))
})
}
pub fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
slf
}
pub fn __next__(slf: PyRefMut<Self>) -> PyResult<IterNextOutput<Py<PyAny>, PyObject>> {
let body = slf.0.clone();
let data_chunk = futures::executor::block_on(yield_data_chunk(body));
match data_chunk {
Ok(Some(data_chunk)) => Ok(IterNextOutput::Yield(data_chunk.into_py(slf.py()))),
Ok(None) => Ok(IterNextOutput::Return(slf.py().None())),
Err(e) => Err(e),
}
}
pub fn __aiter__(slf: PyRef<Self>) -> PyRef<Self> {
slf
}
pub fn __anext__(slf: PyRefMut<Self>) -> PyResult<Option<PyObject>> {
let body = slf.0.clone();
let fut = pyo3_asyncio::tokio::future_into_py(slf.py(), async move {
let data = yield_data_chunk(body).await?;
match data {
Some(data) => Ok(Python::with_gil(|py| data.into_py(py))),
None => Err(PyStopAsyncIteration::new_err("stream exhausted")),
}
})?;
Ok(Some(fut.into()))
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct Document(aws_smithy_types::Document);
impl IntoPy<PyObject> for Document {
fn into_py(self, py: Python<'_>) -> PyObject {
use aws_smithy_types::{Document as D, Number};
match self.0 {
D::Object(obj) => obj
.into_iter()
.map(|(k, v)| (k, Document(v).into_py(py)))
.collect::<HashMap<_, _>>()
.into_py(py),
D::Array(vec) => vec
.into_iter()
.map(|d| Document(d).into_py(py))
.collect::<Vec<_>>()
.into_py(py),
D::Number(Number::Float(f)) => f.into_py(py),
D::Number(Number::PosInt(pi)) => pi.into_py(py),
D::Number(Number::NegInt(ni)) => ni.into_py(py),
D::String(str) => str.into_py(py),
D::Bool(bool) => bool.into_py(py),
D::Null => py.None(),
}
}
}
impl FromPyObject<'_> for Document {
fn extract(obj: &PyAny) -> PyResult<Self> {
use aws_smithy_types::{Document as D, Number};
if let Ok(obj) = obj.extract::<HashMap<String, Document>>() {
Ok(Self(D::Object(
obj.into_iter().map(|(k, v)| (k, v.0)).collect(),
)))
} else if let Ok(vec) = obj.extract::<Vec<Self>>() {
Ok(Self(D::Array(vec.into_iter().map(|d| d.0).collect())))
} else if let Ok(b) = obj.extract::<bool>() {
Ok(Self(D::Bool(b)))
} else if let Ok(pi) = obj.extract::<u64>() {
Ok(Self(D::Number(Number::PosInt(pi))))
} else if let Ok(ni) = obj.extract::<i64>() {
Ok(Self(D::Number(Number::NegInt(ni))))
} else if let Ok(f) = obj.extract::<f64>() {
Ok(Self(D::Number(Number::Float(f))))
} else if let Ok(s) = obj.extract::<String>() {
Ok(Self(D::String(s)))
} else if obj.is_none() {
Ok(Self(D::Null))
} else {
Err(PyTypeError::new_err(format!(
"'{obj}' cannot be converted to 'Document'",
)))
}
}
}
impl Deref for Document {
type Target = aws_smithy_types::Document;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<aws_smithy_types::Document> for Document {
fn from(other: aws_smithy_types::Document) -> Document {
Document(other)
}
}
#[cfg(test)]
mod tests {
use pyo3::py_run;
use super::*;
#[test]
fn blob_can_be_used_in_python_when_initialized_in_rust() {
crate::tests::initialize();
Python::with_gil(|py| {
let blob = Blob::new("some data".as_bytes().to_vec());
let blob = PyCell::new(py, blob).unwrap();
py_run!(
py,
blob,
r#"
assert blob.data == b"some data"
assert len(blob.data) == 9
blob.data = b"some other data"
assert blob.data == b"some other data"
assert len(blob.data) == 15
"#
);
})
}
#[test]
fn blob_can_be_initialized_in_python() {
crate::tests::initialize();
Python::with_gil(|py| {
let types = PyModule::new(py, "types").unwrap();
types.add_class::<Blob>().unwrap();
py_run!(
py,
types,
r#"
blob = types.Blob(b"some data")
assert blob.data == b"some data"
assert len(blob.data) == 9
blob.data = b"some other data"
assert blob.data == b"some other data"
assert len(blob.data) == 15
"#
);
})
}
#[test]
fn datetime_can_be_used_in_python_when_initialized_in_rust() {
crate::tests::initialize();
Python::with_gil(|py| {
let datetime = DateTime::from_secs(100);
let datetime = PyCell::new(py, datetime).unwrap();
py_run!(py, datetime, "assert datetime.secs() == 100");
})
}
#[test]
fn datetime_can_by_initialized_in_python() {
crate::tests::initialize();
Python::with_gil(|py| {
let types = PyModule::new(py, "types").unwrap();
types.add_class::<DateTime>().unwrap();
py_run!(
py,
types,
"assert types.DateTime.from_secs(100).secs() == 100"
);
})
}
#[tokio::test]
async fn bytestream_can_be_used_in_sync_python_when_initialized_in_rust() -> PyResult<()> {
crate::tests::initialize();
Python::with_gil(|py| {
let bytes = "repeat\n".repeat(100000);
let bytestream = ByteStream::newpy(bytes.as_bytes());
let bytestream = PyCell::new(py, bytestream).unwrap();
py_run!(
py,
bytestream,
r#"
for chunk in bytestream:
assert len(chunk) > 10
"#
)
});
Ok(())
}
#[test]
fn document_type() {
use aws_smithy_types::{Document as D, Number};
crate::tests::initialize();
let cases = [
(D::Null, "None"),
(D::Bool(true), "True"),
(D::Bool(false), "False"),
(D::String("foobar".to_string()), "'foobar'"),
(D::Number(Number::Float(42.0)), "42.0"),
(D::Number(Number::PosInt(142)), "142"),
(D::Number(Number::NegInt(-152)), "-152"),
(
D::Array(vec![
D::Bool(false),
D::String("qux".to_string()),
D::Number(Number::Float(1.0)),
D::Array(vec![D::String("inner".to_string()), D::Bool(true)]),
]),
"[False, 'qux', 1.0, ['inner', True]]",
),
(
D::Object(
[
("t".to_string(), D::Bool(true)),
("foo".to_string(), D::String("foo".to_string())),
("f42".to_string(), D::Number(Number::Float(42.0))),
("i42".to_string(), D::Number(Number::PosInt(42))),
("f".to_string(), D::Bool(false)),
(
"vec".to_string(),
D::Array(vec![
D::String("inner".to_string()),
D::Object(
[
(
"nested".to_string(),
D::String("nested_value".to_string()),
),
("nested_num".to_string(), D::Number(Number::NegInt(-42))),
]
.into(),
),
]),
),
]
.into(),
),
"{
't': True,
'foo': 'foo',
'f42': 42.0,
'i42': 42,
'f': False,
'vec': [
'inner',
{'nested': 'nested_value', 'nested_num': -42}
]
}",
),
];
for (rust_ty, python_repr) in cases {
Python::with_gil(|py| {
let value = Document(rust_ty.clone()).into_py(py);
py_run!(py, value, &format!("assert value == {python_repr}"));
});
Python::with_gil(|py| {
let py_value = py.eval(python_repr, None, None).unwrap();
let doc = py_value.extract::<Document>().unwrap();
assert_eq!(doc, Document(rust_ty.clone()));
});
Python::with_gil(|py| {
let doc = Document(rust_ty);
let doc2 = doc.clone().into_py(py).extract(py).unwrap();
assert_eq!(doc, doc2);
});
}
}
}