nyquest_backend_curl/
blocking.rs1use std::io;
2use std::mem::ManuallyDrop;
3use std::sync::{Arc, Mutex};
4
5use nyquest_interface::blocking::Request;
6use nyquest_interface::{Error as NyquestError, Result as NyquestResult};
7
8mod handler;
9mod multi_easy;
10
11use crate::share::Share;
12use crate::url::concat_url;
13use multi_easy::MultiEasy;
14
15#[derive(Clone)]
16pub struct CurlEasyClient {
17 options: Arc<nyquest_interface::client::ClientOptions>,
18 slot: Arc<Mutex<Option<MultiEasy>>>,
19 share: Share,
20}
21
22struct EasyHandleGuard<S: AsRef<Mutex<Option<MultiEasy>>>> {
23 slot: S,
24 handle: ManuallyDrop<Mutex<MultiEasy>>, }
26
27type OwnedEasyHandleGuard = EasyHandleGuard<Arc<Mutex<Option<MultiEasy>>>>;
28
29pub struct CurlBlockingResponse {
30 status: u16,
31 content_length: Option<u64>,
32 headers: Vec<(String, String)>,
33 handle: OwnedEasyHandleGuard,
34 max_response_buffer_size: Option<u64>,
35}
36
37impl<S: AsRef<Mutex<Option<MultiEasy>>>> EasyHandleGuard<S> {
38 fn handle_mut(&mut self) -> &mut MultiEasy {
39 self.handle.get_mut().unwrap()
40 }
41}
42
43impl EasyHandleGuard<&'_ Arc<Mutex<Option<MultiEasy>>>> {
44 fn into_owned(self) -> OwnedEasyHandleGuard {
45 let mut this = ManuallyDrop::new(self);
46 let handle = unsafe { ManuallyDrop::take(&mut this.handle) };
48 EasyHandleGuard {
49 slot: this.slot.clone(),
50 handle: ManuallyDrop::new(handle),
51 }
52 }
53}
54
55impl<S: AsRef<Mutex<Option<MultiEasy>>>> Drop for EasyHandleGuard<S> {
56 fn drop(&mut self) {
57 let mut handle = unsafe { ManuallyDrop::take(&mut self.handle) };
60 let mut slot = self.slot.as_ref().lock().unwrap();
61 if slot.is_none() {
62 handle.get_mut().unwrap().reset_state();
63 *slot = Some(handle.into_inner().unwrap());
64 }
65 }
66}
67
68impl CurlEasyClient {
69 pub fn new(options: nyquest_interface::client::ClientOptions) -> Self {
70 Self {
71 options: Arc::new(options),
72 slot: Arc::new(Mutex::new(None)),
73 share: Share::new(),
74 }
75 }
76
77 fn get_or_create_handle(&self) -> EasyHandleGuard<&Arc<Mutex<Option<MultiEasy>>>> {
78 let slot = {
79 let mut slot = self.slot.lock().unwrap();
80 slot.take()
81 };
82 let handle = match slot {
83 Some(handle) => handle,
84 None => MultiEasy::new(self.share.clone()),
85 };
86 EasyHandleGuard {
87 slot: &self.slot,
88 handle: ManuallyDrop::new(Mutex::new(handle)),
89 }
90 }
91}
92
93impl io::Read for CurlBlockingResponse {
94 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
95 let handle = self.handle.handle_mut();
96 let res = handle.poll_bytes(|response_buf| {
97 let len = response_buf.len().min(buf.len());
98 buf[..len].copy_from_slice(&response_buf[..len]);
99 response_buf.drain(..len);
100 len
101 });
102 match res {
103 Ok(len) => Ok(len),
104 Err(NyquestError::RequestTimeout) => Err(io::ErrorKind::TimedOut.into()),
105 Err(NyquestError::Io(e)) => Err(e),
106 Err(e) => unreachable!("Unexpected error: {}", e),
107 }
108 }
109}
110
111impl nyquest_interface::blocking::BlockingResponse for CurlBlockingResponse {
112 fn status(&self) -> u16 {
113 self.status
114 }
115
116 fn content_length(&self) -> Option<u64> {
117 self.content_length
118 }
119
120 fn get_header(&self, header: &str) -> nyquest_interface::Result<Vec<String>> {
121 Ok(self
122 .headers
123 .iter()
124 .filter(|(k, _)| k.eq_ignore_ascii_case(header))
125 .map(|(_, v)| v.clone())
126 .collect())
127 }
128
129 fn text(&mut self) -> nyquest_interface::Result<String> {
130 let buf = self.bytes()?;
131 #[cfg(feature = "charset")]
132 if let Some((_, mut charset)) = self
133 .get_header("content-type")?
134 .pop()
135 .unwrap_or_default()
136 .split(';')
137 .filter_map(|s| s.split_once('='))
138 .find(|(k, _)| k.trim().eq_ignore_ascii_case("charset"))
139 {
140 charset = charset.trim_matches('"');
141 if let Ok(decoded) = iconv_native::decode_lossy(&buf, charset.trim()) {
142 return Ok(decoded);
143 }
144 }
145 Ok(String::from_utf8_lossy(&buf).into_owned())
146 }
147
148 fn bytes(&mut self) -> nyquest_interface::Result<Vec<u8>> {
149 let handle = self.handle.handle_mut();
150 handle.poll_until_whole_response(self.max_response_buffer_size)?;
151 let buf = handle.take_response_buffer();
152 if self
153 .max_response_buffer_size
154 .map(|limit| buf.len() > limit as usize)
155 .unwrap_or_default()
156 {
157 return Err(NyquestError::ResponseTooLarge);
158 }
159 Ok(buf)
160 }
161}
162
163impl nyquest_interface::blocking::BlockingClient for CurlEasyClient {
164 type Response = CurlBlockingResponse;
165
166 fn request(&self, req: Request) -> nyquest_interface::Result<Self::Response> {
167 let mut handle_guard = self.get_or_create_handle();
168 let url = concat_url(self.options.base_url.as_deref(), &req.relative_uri);
170 let handle = handle_guard.handle_mut();
171 handle.populate_request(&url, req, &self.options)?;
172 handle.poll_until_response_headers()?;
173 let mut headers_buf = handle.take_response_headers_buffer();
174 let headers = headers_buf
175 .iter_mut()
176 .filter_map(|line| std::str::from_utf8_mut(&mut *line).ok())
177 .filter_map(|line| line.split_once(':'))
178 .map(|(k, v)| (k.into(), v.trim_start().into()))
179 .collect();
180 Ok(CurlBlockingResponse {
181 status: handle.status()?,
182 content_length: handle.content_length()?,
183 headers,
184 handle: handle_guard.into_owned(),
185 max_response_buffer_size: self.options.max_response_buffer_size,
186 })
187 }
188}
189
190impl nyquest_interface::blocking::BlockingBackend for crate::CurlBackend {
191 type BlockingClient = CurlEasyClient;
192
193 fn create_blocking_client(
194 &self,
195 options: nyquest_interface::client::ClientOptions,
196 ) -> NyquestResult<Self::BlockingClient> {
197 Ok(CurlEasyClient::new(options))
198 }
199}