nyquest_backend_curl/
blocking.rs1use std::io;
2use std::mem::ManuallyDrop;
3use std::sync::{Arc, Mutex};
4
5use nyquest_interface::blocking::Request;
6use nyquest_interface::{Error as NyquestError, Result as NyquestResult};
7
8mod handler;
9mod multi_easy;
10mod part_reader;
11mod set;
12
13use crate::curl_ng::easy::Share;
14use crate::url::concat_url;
15use multi_easy::MultiEasy;
16
17#[derive(Clone)]
18pub struct CurlEasyClient {
19 options: Arc<nyquest_interface::client::ClientOptions>,
20 slot: Arc<MultiEasySlot>,
21 share: Share,
22}
23
24struct MultiEasySlot {
25 multi_easy: Mutex<Option<MultiEasy>>,
26}
27
28struct EasyHandleGuard<S: AsRef<MultiEasySlot>> {
29 slot: S,
30 handle: ManuallyDrop<Mutex<MultiEasy>>, }
32
33type OwnedEasyHandleGuard = EasyHandleGuard<Arc<MultiEasySlot>>;
34
35pub struct CurlBlockingResponse {
36 status: u16,
37 content_length: Option<u64>,
38 headers: Vec<(String, String)>,
39 handle: OwnedEasyHandleGuard,
40 max_response_buffer_size: Option<u64>,
41}
42
43impl<S: AsRef<MultiEasySlot>> EasyHandleGuard<S> {
44 fn handle_mut(&mut self) -> &mut MultiEasy {
45 self.handle.get_mut().unwrap()
46 }
47}
48
49impl EasyHandleGuard<&'_ Arc<MultiEasySlot>> {
50 fn into_owned(self) -> OwnedEasyHandleGuard {
51 let mut this = ManuallyDrop::new(self);
52 let handle = unsafe { ManuallyDrop::take(&mut this.handle) };
54 EasyHandleGuard {
55 slot: this.slot.clone(),
56 handle: ManuallyDrop::new(handle),
57 }
58 }
59}
60
61impl<S: AsRef<MultiEasySlot>> Drop for EasyHandleGuard<S> {
62 fn drop(&mut self) {
63 let mut handle = unsafe { ManuallyDrop::take(&mut self.handle) };
66 let mut slot = self.slot.as_ref().multi_easy.lock().unwrap();
67 if slot.is_none() && handle.get_mut().unwrap().reset_state().is_ok() {
68 *slot = Some(handle.into_inner().unwrap());
69 }
70 }
71}
72
73impl CurlEasyClient {
74 pub fn new(options: nyquest_interface::client::ClientOptions) -> Self {
75 Self {
76 options: Arc::new(options),
77 slot: Arc::new(MultiEasySlot {
78 multi_easy: Mutex::new(None),
79 }),
80 share: Share::new(),
81 }
82 }
83
84 fn get_or_create_handle(&self) -> NyquestResult<EasyHandleGuard<&Arc<MultiEasySlot>>> {
85 let slot = {
86 let mut slot = self.slot.multi_easy.lock().unwrap();
87 slot.take()
88 };
89 let handle = match slot {
90 Some(handle) => handle,
91 None => MultiEasy::new(&self.share)?,
92 };
93 Ok(EasyHandleGuard {
94 slot: &self.slot,
95 handle: ManuallyDrop::new(Mutex::new(handle)),
96 })
97 }
98}
99
100impl io::Read for CurlBlockingResponse {
101 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
102 let handle = self.handle.handle_mut();
103 let res = handle.poll_bytes(|response_buf| {
104 let len = response_buf.len().min(buf.len());
105 buf[..len].copy_from_slice(&response_buf[..len]);
106 response_buf.drain(..len);
107 len
108 });
109 match res {
110 Ok(len) => Ok(len),
111 Err(NyquestError::RequestTimeout) => Err(io::ErrorKind::TimedOut.into()),
112 Err(NyquestError::Io(e)) => Err(e),
113 Err(e) => unreachable!("Unexpected error: {}", e),
114 }
115 }
116}
117
118impl nyquest_interface::blocking::BlockingResponse for CurlBlockingResponse {
119 fn status(&self) -> u16 {
120 self.status
121 }
122
123 fn content_length(&self) -> Option<u64> {
124 self.content_length
125 }
126
127 fn get_header(&self, header: &str) -> nyquest_interface::Result<Vec<String>> {
128 Ok(self
129 .headers
130 .iter()
131 .filter(|(k, _)| k.eq_ignore_ascii_case(header))
132 .map(|(_, v)| v.clone())
133 .collect())
134 }
135
136 fn text(&mut self) -> nyquest_interface::Result<String> {
137 let buf = self.bytes()?;
138 #[cfg(feature = "charset")]
139 if let Some((_, mut charset)) = self
140 .get_header("content-type")?
141 .pop()
142 .unwrap_or_default()
143 .split(';')
144 .filter_map(|s| s.split_once('='))
145 .find(|(k, _)| k.trim().eq_ignore_ascii_case("charset"))
146 {
147 charset = charset.trim_matches('"');
148 if let Ok(decoded) = iconv_native::decode_lossy(&buf, charset.trim()) {
149 return Ok(decoded);
150 }
151 }
152 Ok(String::from_utf8_lossy(&buf).into_owned())
153 }
154
155 fn bytes(&mut self) -> nyquest_interface::Result<Vec<u8>> {
156 let handle = self.handle.handle_mut();
157 handle.poll_until_whole_response(self.max_response_buffer_size)?;
158 let buf = handle.take_response_buffer();
159 if self
160 .max_response_buffer_size
161 .map(|limit| buf.len() > limit as usize)
162 .unwrap_or_default()
163 {
164 return Err(NyquestError::ResponseTooLarge);
165 }
166 Ok(buf)
167 }
168}
169
170impl nyquest_interface::blocking::BlockingClient for CurlEasyClient {
171 type Response = CurlBlockingResponse;
172
173 fn request(&self, req: Request) -> nyquest_interface::Result<Self::Response> {
174 let mut handle_guard = self.get_or_create_handle()?;
175 let url = concat_url(self.options.base_url.as_deref(), &req.relative_uri);
177 let handle: &mut MultiEasy = handle_guard.handle_mut();
178 handle.populate_request(&url, req, &self.options)?;
179 handle.poll_until_response_headers()?;
180 let mut headers_buf = handle.take_response_headers_buffer();
181 let headers = headers_buf
182 .iter_mut()
183 .filter_map(|line| std::str::from_utf8_mut(&mut *line).ok())
184 .filter_map(|line| line.split_once(':'))
185 .map(|(k, v)| (k.into(), v.trim_start().into()))
186 .collect();
187 Ok(CurlBlockingResponse {
188 status: handle.status()?,
189 content_length: handle.content_length()?,
190 headers,
191 handle: handle_guard.into_owned(),
192 max_response_buffer_size: self.options.max_response_buffer_size,
193 })
194 }
195}
196
197impl nyquest_interface::blocking::BlockingBackend for crate::CurlBackend {
198 type BlockingClient = CurlEasyClient;
199
200 fn create_blocking_client(
201 &self,
202 options: nyquest_interface::client::ClientOptions,
203 ) -> NyquestResult<Self::BlockingClient> {
204 Ok(CurlEasyClient::new(options))
205 }
206}