nyquest_backend_curl/
async.rs1use std::cell::RefCell;
2use std::io;
3use std::pin::pin;
4use std::task::{ready, Poll};
5use std::{pin::Pin, sync::Arc, task::Context};
6
7use futures_util::future::{select, Either};
8use nyquest_interface::r#async::{futures_io, AsyncResponse};
9use nyquest_interface::Error as NyquestError;
10
11mod handler;
12mod r#loop;
13mod pause;
14mod read_task;
15mod set;
16mod shared;
17
18use crate::curl_ng::easy::{AsRawEasyMut as _, Share};
19use crate::r#async::handler::AsyncHandler;
20use crate::request::{create_easy, AsCallbackMut as _};
21use crate::url::concat_url;
22
23type Easy = crate::request::BoxEasyHandle<handler::AsyncHandler>;
24
25pub struct CurlMultiClientInner {
26 options: nyquest_interface::client::ClientOptions,
27 loop_manager: r#loop::LoopManager,
28 share: Share,
29}
30#[derive(Clone)]
31pub struct CurlMultiClient {
32 inner: Arc<CurlMultiClientInner>,
33}
34
35pub struct CurlAsyncResponse {
36 status: u16,
37 content_length: Option<u64>,
38 headers: Vec<(String, String)>,
39 handle: r#loop::RequestHandle,
40 max_response_buffer_size: Option<u64>,
41}
42
43impl AsyncResponse for CurlAsyncResponse {
44 fn status(&self) -> u16 {
45 self.status
46 }
47
48 fn content_length(&self) -> Option<u64> {
49 self.content_length
50 }
51
52 fn get_header(&self, header: &str) -> nyquest_interface::Result<Vec<String>> {
53 Ok(self
54 .headers
55 .iter()
56 .filter(|(k, _)| k.eq_ignore_ascii_case(header))
57 .map(|(_, v)| v.clone())
58 .collect())
59 }
60
61 async fn text(mut self: Pin<&mut Self>) -> nyquest_interface::Result<String> {
62 let buf = self.as_mut().bytes().await?;
63 #[cfg(feature = "charset")]
64 if let Some((_, mut charset)) = self
65 .get_header("content-type")?
66 .pop()
67 .unwrap_or_default()
68 .split(';')
69 .filter_map(|s| s.split_once('='))
70 .find(|(k, _)| k.trim().eq_ignore_ascii_case("charset"))
71 {
72 charset = charset.trim_matches('"');
73 if let Ok(decoded) = iconv_native::decode_lossy(&buf, charset.trim()) {
74 return Ok(decoded);
75 }
76 }
77 Ok(String::from_utf8_lossy(&buf).into_owned())
78 }
79
80 async fn bytes(self: Pin<&mut Self>) -> nyquest_interface::Result<Vec<u8>> {
81 let this = self.get_mut();
82 let mut buf = vec![];
83 while let Some(()) = this
84 .handle
85 .poll_bytes_async(|data| {
86 if let Some(max_response_buffer_size) = this.max_response_buffer_size {
87 if buf.len() + data.len() > max_response_buffer_size as usize {
88 return Err(NyquestError::ResponseTooLarge);
89 }
90 }
91 buf.extend_from_slice(data);
92 data.clear();
93 Ok(())
94 })
95 .await?
96 {}
97 Ok(buf)
98 }
99}
100
101impl futures_io::AsyncRead for CurlAsyncResponse {
102 fn poll_read(
103 self: Pin<&mut Self>,
104 cx: &mut Context<'_>,
105 buf: &mut [u8],
106 ) -> Poll<io::Result<usize>> {
107 let this = self.get_mut();
108 let poll_res = ready!(this.handle.poll_bytes(cx, |data| {
109 let read_len = data.len().min(buf.len());
110 buf[..read_len].copy_from_slice(&data[..read_len]);
111 data.drain(..read_len);
112 Ok(read_len)
113 }));
114 Poll::Ready(match poll_res {
115 Ok(None) => Ok(0),
116 Ok(Some(read_len)) => Ok(read_len),
117 Err(NyquestError::RequestTimeout) => {
118 return Poll::Ready(Err(io::ErrorKind::TimedOut.into()))
119 }
120 Err(NyquestError::Io(e)) => return Poll::Ready(Err(e)),
121 Err(e) => unreachable!("Unexpected error: {}", e),
122 })
123 }
124}
125
126impl nyquest_interface::r#async::AsyncClient for CurlMultiClient {
127 type Response = CurlAsyncResponse;
128
129 async fn request(
130 &self,
131 req: nyquest_interface::r#async::Request,
132 ) -> nyquest_interface::Result<Self::Response> {
133 let (req, read_task_collection) = {
134 let mut easy = create_easy(AsyncHandler::default(), &self.inner.share)?;
135 let raw = easy.as_mut().as_raw_easy_mut().raw();
136 easy.as_callback_mut().pause = Some(pause::EasyPause::new(raw));
137 let url = concat_url(self.inner.options.base_url.as_deref(), &req.relative_uri);
139 let req_ctx = easy.as_callback_mut().ctx.clone();
140 let read_task_collection = RefCell::new(read_task::ReadTaskCollection::new(req_ctx));
141 crate::request::populate_request(
142 &url,
143 req,
144 &self.inner.options,
145 easy.as_mut(),
146 |easy, stream| {
147 read_task_collection
148 .borrow_mut()
149 .add_in_handler(easy, stream)
150 },
151 |stream| {
152 read_task_collection
153 .borrow_mut()
154 .add_mime_part_reader(stream)
155 },
156 )?;
157 let req = self.inner.loop_manager.start_request(easy).await?;
158 (req, read_task_collection.into_inner())
159 };
160 let res_task = pin!(req.wait_for_response());
161 let read_task_collection = pin!(read_task_collection.execute(&self.inner.loop_manager));
162 let mut res = match select(res_task, read_task_collection).await {
163 Either::Left((res, _)) => res?,
164 Either::Right((Err(e), _)) => return Err(e),
165 Either::Right((Ok(_), _)) => unreachable!(),
166 };
167 res.max_response_buffer_size = self.inner.options.max_response_buffer_size;
168 Ok(res)
169 }
170}
171
172impl nyquest_interface::r#async::AsyncBackend for crate::CurlBackend {
173 type AsyncClient = CurlMultiClient;
174
175 async fn create_async_client(
176 &self,
177 options: nyquest_interface::client::ClientOptions,
178 ) -> Result<Self::AsyncClient, NyquestError> {
179 Ok(CurlMultiClient {
180 inner: Arc::new(CurlMultiClientInner {
181 loop_manager: r#loop::LoopManager::new(),
182 options,
183 share: Share::new(),
184 }),
185 })
186 }
187}