nyquest_backend_curl/
async.rs

1use std::cell::RefCell;
2use std::io;
3use std::pin::pin;
4use std::task::{ready, Poll};
5use std::{pin::Pin, sync::Arc, task::Context};
6
7use futures_util::future::{select, Either};
8use nyquest_interface::r#async::{futures_io, AsyncResponse};
9use nyquest_interface::Error as NyquestError;
10
11mod handler;
12mod r#loop;
13mod pause;
14mod read_task;
15mod set;
16mod shared;
17
18use crate::curl_ng::easy::{AsRawEasyMut as _, Share};
19use crate::r#async::handler::AsyncHandler;
20use crate::request::{create_easy, AsCallbackMut as _};
21use crate::url::concat_url;
22
23type Easy = crate::request::BoxEasyHandle<handler::AsyncHandler>;
24
25pub struct CurlMultiClientInner {
26    options: nyquest_interface::client::ClientOptions,
27    loop_manager: r#loop::LoopManager,
28    share: Share,
29}
30#[derive(Clone)]
31pub struct CurlMultiClient {
32    inner: Arc<CurlMultiClientInner>,
33}
34
35pub struct CurlAsyncResponse {
36    status: u16,
37    content_length: Option<u64>,
38    headers: Vec<(String, String)>,
39    handle: r#loop::RequestHandle,
40    max_response_buffer_size: Option<u64>,
41}
42
43impl AsyncResponse for CurlAsyncResponse {
44    fn status(&self) -> u16 {
45        self.status
46    }
47
48    fn content_length(&self) -> Option<u64> {
49        self.content_length
50    }
51
52    fn get_header(&self, header: &str) -> nyquest_interface::Result<Vec<String>> {
53        Ok(self
54            .headers
55            .iter()
56            .filter(|(k, _)| k.eq_ignore_ascii_case(header))
57            .map(|(_, v)| v.clone())
58            .collect())
59    }
60
61    async fn text(mut self: Pin<&mut Self>) -> nyquest_interface::Result<String> {
62        let buf = self.as_mut().bytes().await?;
63        #[cfg(feature = "charset")]
64        if let Some((_, mut charset)) = self
65            .get_header("content-type")?
66            .pop()
67            .unwrap_or_default()
68            .split(';')
69            .filter_map(|s| s.split_once('='))
70            .find(|(k, _)| k.trim().eq_ignore_ascii_case("charset"))
71        {
72            charset = charset.trim_matches('"');
73            if let Ok(decoded) = iconv_native::decode_lossy(&buf, charset.trim()) {
74                return Ok(decoded);
75            }
76        }
77        Ok(String::from_utf8_lossy(&buf).into_owned())
78    }
79
80    async fn bytes(self: Pin<&mut Self>) -> nyquest_interface::Result<Vec<u8>> {
81        let this = self.get_mut();
82        let mut buf = vec![];
83        while let Some(()) = this
84            .handle
85            .poll_bytes_async(|data| {
86                if let Some(max_response_buffer_size) = this.max_response_buffer_size {
87                    if buf.len() + data.len() > max_response_buffer_size as usize {
88                        return Err(NyquestError::ResponseTooLarge);
89                    }
90                }
91                buf.extend_from_slice(data);
92                data.clear();
93                Ok(())
94            })
95            .await?
96        {}
97        Ok(buf)
98    }
99}
100
101impl futures_io::AsyncRead for CurlAsyncResponse {
102    fn poll_read(
103        self: Pin<&mut Self>,
104        cx: &mut Context<'_>,
105        buf: &mut [u8],
106    ) -> Poll<io::Result<usize>> {
107        let this = self.get_mut();
108        let poll_res = ready!(this.handle.poll_bytes(cx, |data| {
109            let read_len = data.len().min(buf.len());
110            buf[..read_len].copy_from_slice(&data[..read_len]);
111            data.drain(..read_len);
112            Ok(read_len)
113        }));
114        Poll::Ready(match poll_res {
115            Ok(None) => Ok(0),
116            Ok(Some(read_len)) => Ok(read_len),
117            Err(NyquestError::RequestTimeout) => {
118                return Poll::Ready(Err(io::ErrorKind::TimedOut.into()))
119            }
120            Err(NyquestError::Io(e)) => return Poll::Ready(Err(e)),
121            Err(e) => unreachable!("Unexpected error: {}", e),
122        })
123    }
124}
125
126impl nyquest_interface::r#async::AsyncClient for CurlMultiClient {
127    type Response = CurlAsyncResponse;
128
129    async fn request(
130        &self,
131        req: nyquest_interface::r#async::Request,
132    ) -> nyquest_interface::Result<Self::Response> {
133        let (req, read_task_collection) = {
134            let mut easy = create_easy(AsyncHandler::default(), &self.inner.share)?;
135            let raw = easy.as_mut().as_raw_easy_mut().raw();
136            easy.as_callback_mut().pause = Some(pause::EasyPause::new(raw));
137            // FIXME: properly concat base_url and url
138            let url = concat_url(self.inner.options.base_url.as_deref(), &req.relative_uri);
139            let req_ctx = easy.as_callback_mut().ctx.clone();
140            let read_task_collection = RefCell::new(read_task::ReadTaskCollection::new(req_ctx));
141            crate::request::populate_request(
142                &url,
143                req,
144                &self.inner.options,
145                easy.as_mut(),
146                |easy, stream| {
147                    read_task_collection
148                        .borrow_mut()
149                        .add_in_handler(easy, stream)
150                },
151                |stream| {
152                    read_task_collection
153                        .borrow_mut()
154                        .add_mime_part_reader(stream)
155                },
156            )?;
157            let req = self.inner.loop_manager.start_request(easy).await?;
158            (req, read_task_collection.into_inner())
159        };
160        let res_task = pin!(req.wait_for_response());
161        let read_task_collection = pin!(read_task_collection.execute(&self.inner.loop_manager));
162        let mut res = match select(res_task, read_task_collection).await {
163            Either::Left((res, _)) => res?,
164            Either::Right((Err(e), _)) => return Err(e),
165            Either::Right((Ok(_), _)) => unreachable!(),
166        };
167        res.max_response_buffer_size = self.inner.options.max_response_buffer_size;
168        Ok(res)
169    }
170}
171
172impl nyquest_interface::r#async::AsyncBackend for crate::CurlBackend {
173    type AsyncClient = CurlMultiClient;
174
175    async fn create_async_client(
176        &self,
177        options: nyquest_interface::client::ClientOptions,
178    ) -> Result<Self::AsyncClient, NyquestError> {
179        Ok(CurlMultiClient {
180            inner: Arc::new(CurlMultiClientInner {
181                loop_manager: r#loop::LoopManager::new(),
182                options,
183                share: Share::new(),
184            }),
185        })
186    }
187}