#![forbid(unsafe_code)]
#![deny(clippy::all, rustdoc::all)]
#![warn(clippy::pedantic)]
#![doc = include_str!("../README.md")]
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http::{header::HeaderName, HeaderValue, Method, Request, Response, Uri};
use http_body::{combinators::UnsyncBoxBody, Body, Full};
use pyo3::{
exceptions::PyValueError,
pyclass, pymethods,
types::{PyBytes, PySequence, PyTuple},
IntoPy, Py, PyAny, PyErr, PyResult, Python,
};
use tower::{
util::{BoxCloneService, Oneshot},
Service, ServiceExt,
};
mod util;
use self::util::BoxBuf;
#[pyclass(subclass)]
pub struct Resource {
service: BoxCloneService<Request<Full<Bytes>>, Response<UnsyncBoxBody<BoxBuf, PyErr>>, PyErr>,
}
impl Resource {
pub fn from_service<S, B, E>(service: S) -> Self
where
S: Service<Request<Full<Bytes>>, Response = Response<B>, Error = E>
+ Clone
+ Send
+ 'static,
S::Future: Send,
B: Body + Send + 'static,
B::Data: Buf,
B::Error: Into<PyErr>,
E: Into<PyErr> + 'static,
{
let service = service
.map_response(|response: Response<B>| {
response.map(|body| {
let body = body.map_data(BoxBuf::new).map_err(Into::into);
UnsyncBoxBody::new(body)
})
})
.map_err(Into::into)
.boxed_clone();
Self { service }
}
}
#[pymethods]
impl Resource {
#[setter]
fn server(&self, _server: &PyAny) {
let _ = self;
}
#[getter(isLeaf)]
fn is_leaf(&self) -> bool {
let _ = self;
true
}
fn render<'a>(&self, py: Python<'a>, request: &'a PyAny) -> PyResult<&'a PyAny> {
let service = self.service.clone();
let not_done_yet = py.import("twisted.web.server")?.getattr("NOT_DONE_YET")?;
let defer = py.import("twisted.internet.defer")?;
let future = handle_twisted_request_through_service(service, request)?;
let deferred = defer
.getattr("Deferred")?
.call_method1("fromFuture", (future,))?;
defer.getattr("ensureDeferred")?.call1((deferred,))?;
Ok(not_done_yet)
}
}
pub fn read_io_body(body: &PyAny, chunk_size: usize) -> PyResult<Bytes> {
let mut buf = BytesMut::new();
loop {
let bytes: &PyBytes = body.call_method1("read", (chunk_size,))?.downcast()?;
if bytes.as_bytes().is_empty() {
return Ok(buf.into());
}
buf.put(bytes.as_bytes());
}
}
pub fn http_request_from_twisted(request: &PyAny) -> PyResult<Request<Bytes>> {
let content = request.getattr("content")?;
let body = read_io_body(content, 4096)?;
let mut req = Request::new(body);
let uri: &PyBytes = request.getattr("uri")?.downcast()?;
*req.uri_mut() =
Uri::try_from(uri.as_bytes()).map_err(|_| PyValueError::new_err("invalid uri"))?;
let method: &PyBytes = request.getattr("method")?.downcast()?;
*req.method_mut() = Method::from_bytes(method.as_bytes())
.map_err(|_| PyValueError::new_err("invalid method"))?;
{
let headers = req.headers_mut();
let headers_iter = request
.getattr("requestHeaders")?
.call_method0("getAllRawHeaders")?
.iter()?;
for header in headers_iter {
let header = header?;
let header: &PyTuple = header.downcast()?;
let name: &PyBytes = header.get_item(0)?.downcast()?;
let name = HeaderName::from_bytes(name.as_bytes())
.map_err(|_| PyValueError::new_err("invalid header name"))?;
let values: &PySequence = header.get_item(1)?.downcast()?;
for index in 0..values.len()? {
let value: &PyBytes = values.get_item(index)?.downcast()?;
let value = HeaderValue::from_bytes(value.as_bytes())
.map_err(|_| PyValueError::new_err("invalid header value"))?;
headers.append(name.clone(), value);
}
}
}
Ok(req)
}
pub fn http_response_to_twisted<B>(request: &PyAny, response: Response<B>) -> PyResult<()>
where
B: Buf,
{
let (parts, mut body) = response.into_parts();
send_parts(request, &parts)?;
while body.remaining() != 0 {
let chunk = body.chunk();
request.call_method1("write", (chunk,))?;
body.advance(chunk.len());
}
request.call_method0("finish")?;
Ok(())
}
fn send_parts(request: &PyAny, parts: &http::response::Parts) -> PyResult<()> {
request.call_method1("setResponseCode", (parts.status.as_u16(),))?;
let response_headers = request.getattr("responseHeaders")?;
for (name, value) in parts.headers.iter() {
response_headers.call_method1("addRawHeader", (name.as_str(), value.as_bytes()))?;
}
Ok(())
}
async fn send_body<B>(request: Py<PyAny>, body: B) -> PyResult<()>
where
B: Body + Send + 'static,
B::Data: Buf,
PyErr: From<B::Error>,
{
futures_util::pin_mut!(body);
while let Some(res) = body.data().await {
let mut data = res?;
Python::with_gil(|py| {
while data.remaining() != 0 {
let chunk = data.chunk();
request.call_method1(py, "write", (chunk,))?;
data.advance(chunk.len());
}
PyResult::Ok(())
})?;
}
Python::with_gil(|py| request.call_method0(py, "finish"))?;
Ok(())
}
#[allow(clippy::trait_duplication_in_bounds)]
pub fn handle_twisted_request_through_service<S, E, ResBody>(
service: S,
twisted_request: &PyAny,
) -> PyResult<&PyAny>
where
S: Service<Request<Full<Bytes>>, Response = Response<ResBody>, Error = E> + Send + 'static,
S::Future: Send,
ResBody: Body + Send + 'static,
PyErr: From<E> + From<ResBody::Error>,
{
let py = twisted_request.py();
let request = http_request_from_twisted(twisted_request)?;
let request = request.map(Full::new);
let twisted_request = twisted_request.into_py(py);
pyo3_asyncio::tokio::future_into_py(py, async move {
let response = Oneshot::new(service, request).await?;
let (parts, body) = response.into_parts();
Python::with_gil(|py| send_parts(twisted_request.as_ref(py), &parts))?;
send_body(twisted_request, body).await
})
}