cyper 0.9.0-rc.1

HTTP client library based on compio and hyper
Documentation
use std::{io::Read, rc::Rc};

#[cfg(feature = "nyquest-blocking-stream")]
use compio::bytes::Bytes;
use compio::runtime::Runtime;
use futures_util::StreamExt;
#[cfg(feature = "nyquest-blocking-stream")]
use nyquest_interface::{Body, Request};
use nyquest_interface::{
    Result,
    blocking::{BlockingBackend, BlockingClient, BlockingResponse, Request as BlockingRequest},
    client::ClientOptions,
};
use send_wrapper::SendWrapper;

use super::{CyperBackend, CyperClient, CyperResponse, to_io_error};

impl BlockingBackend for CyperBackend {
    type BlockingClient = CyperBlockingClient;

    fn create_blocking_client(&self, options: ClientOptions) -> Result<Self::BlockingClient> {
        let client = self.create_client(options)?;
        let runtime = Rc::new(Runtime::new()?);
        Ok(CyperBlockingClient {
            client,
            runtime: SendWrapper::new(runtime),
        })
    }
}

#[derive(Clone)]
pub struct CyperBlockingClient {
    client: CyperClient,
    runtime: SendWrapper<Rc<Runtime>>,
}

impl BlockingClient for CyperBlockingClient {
    type Response = CyperBlockingResponse;

    fn request(&self, req: BlockingRequest) -> Result<Self::Response> {
        let resp = self.runtime.block_on(self.client.request(
            #[cfg(not(feature = "nyquest-blocking-stream"))]
            req,
            #[cfg(feature = "nyquest-blocking-stream")]
            Request {
                method: req.method,
                relative_uri: req.relative_uri,
                additional_headers: req.additional_headers,
                body: req.body.map(|body| {
                    match body {
                        Body::Stream {
                            stream,
                            content_type,
                        } => Body::Stream {
                            stream: futures_util::stream::iter(WrapBoxedStream(stream)),
                            content_type,
                        },
                        Body::Bytes {
                            content,
                            content_type,
                        } => Body::Bytes {
                            content,
                            content_type,
                        },
                        Body::Form { fields } => Body::Form { fields },
                        #[cfg(feature = "nyquest-multipart")]
                        Body::Multipart { parts } => Body::Multipart {
                            parts: parts
                                .into_iter()
                                .map(|part| nyquest_interface::Part {
                                    headers: part.headers,
                                    name: part.name,
                                    filename: part.filename,
                                    content_type: part.content_type,
                                    body: match part.body {
                                        nyquest_interface::PartBody::Bytes { content } => {
                                            nyquest_interface::PartBody::Bytes { content }
                                        }
                                        nyquest_interface::PartBody::Stream(s) => {
                                            nyquest_interface::PartBody::Stream(
                                                futures_util::stream::iter(WrapBoxedStream(s)),
                                            )
                                        }
                                    },
                                })
                                .collect(),
                        },
                    }
                }),
            },
        ))?;
        Ok(CyperBlockingResponse {
            resp,
            runtime: self.runtime.clone(),
        })
    }
}

#[cfg(feature = "nyquest-blocking-stream")]
struct WrapBoxedStream(nyquest_interface::blocking::BoxedStream);

#[cfg(feature = "nyquest-blocking-stream")]
impl Iterator for WrapBoxedStream {
    type Item = crate::Result<Bytes>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buffer = [0u8; 1024];
        let res = self.0.read(&mut buffer);
        match res {
            Ok(0) => None,
            Ok(n) => {
                let bytes = Bytes::copy_from_slice(&buffer[..n]);
                Some(Ok(bytes))
            }
            Err(e) => Some(Err(e.into())),
        }
    }
}

pub struct CyperBlockingResponse {
    resp: CyperResponse,
    runtime: SendWrapper<Rc<Runtime>>,
}

impl Read for CyperBlockingResponse {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let res = self.runtime.block_on(self.resp.resp.next());
        match res {
            Some(Ok(bytes)) => {
                let len = bytes.len().min(buf.len());
                buf[..len].copy_from_slice(&bytes[..len]);
                Ok(len)
            }
            Some(Err(e)) => Err(to_io_error(e)),
            None => Ok(0),
        }
    }
}

impl BlockingResponse for CyperBlockingResponse {
    fn status(&self) -> u16 {
        self.resp.status()
    }

    fn content_length(&self) -> Option<u64> {
        self.resp.content_length()
    }

    fn get_header(&self, header: &str) -> Result<Vec<String>> {
        self.resp.get_header(header)
    }

    fn text(&mut self) -> Result<String> {
        BlockingResponse::bytes(self).map(|b| String::from_utf8_lossy(&b).to_string())
    }

    fn bytes(&mut self) -> Result<Vec<u8>> {
        self.runtime.block_on(self.resp.bytes_impl())
    }
}