nyquest_backend_curl/
blocking.rs

1use std::io;
2use std::mem::ManuallyDrop;
3use std::sync::{Arc, Mutex};
4
5use nyquest_interface::blocking::Request;
6use nyquest_interface::{Error as NyquestError, Result as NyquestResult};
7
8mod handler;
9mod multi_easy;
10mod part_reader;
11mod set;
12
13use crate::curl_ng::easy::Share;
14use crate::url::concat_url;
15use multi_easy::MultiEasy;
16
17#[derive(Clone)]
18pub struct CurlEasyClient {
19    options: Arc<nyquest_interface::client::ClientOptions>,
20    slot: Arc<MultiEasySlot>,
21    share: Share,
22}
23
24struct MultiEasySlot {
25    multi_easy: Mutex<Option<MultiEasy>>,
26}
27
28struct EasyHandleGuard<S: AsRef<MultiEasySlot>> {
29    slot: S,
30    handle: ManuallyDrop<Mutex<MultiEasy>>, // TODO: use std::sync::Exclusive when stabilized
31}
32
33type OwnedEasyHandleGuard = EasyHandleGuard<Arc<MultiEasySlot>>;
34
35pub struct CurlBlockingResponse {
36    status: u16,
37    content_length: Option<u64>,
38    headers: Vec<(String, String)>,
39    handle: OwnedEasyHandleGuard,
40    max_response_buffer_size: Option<u64>,
41}
42
43impl<S: AsRef<MultiEasySlot>> EasyHandleGuard<S> {
44    fn handle_mut(&mut self) -> &mut MultiEasy {
45        self.handle.get_mut().unwrap()
46    }
47}
48
49impl EasyHandleGuard<&'_ Arc<MultiEasySlot>> {
50    fn into_owned(self) -> OwnedEasyHandleGuard {
51        let mut this = ManuallyDrop::new(self);
52        // Safety: self inside ManuallyDrop will not be dropped, hence the handle will not be taken out from Drop
53        let handle = unsafe { ManuallyDrop::take(&mut this.handle) };
54        EasyHandleGuard {
55            slot: this.slot.clone(),
56            handle: ManuallyDrop::new(handle),
57        }
58    }
59}
60
61impl<S: AsRef<MultiEasySlot>> Drop for EasyHandleGuard<S> {
62    fn drop(&mut self) {
63        // Safety: the handle is only taken out once which is here, except in `into_owned` where a `ManuallyDrop` is
64        // used to suppress our Drop
65        let mut handle = unsafe { ManuallyDrop::take(&mut self.handle) };
66        let mut slot = self.slot.as_ref().multi_easy.lock().unwrap();
67        if slot.is_none() && handle.get_mut().unwrap().reset_state().is_ok() {
68            *slot = Some(handle.into_inner().unwrap());
69        }
70    }
71}
72
73impl CurlEasyClient {
74    pub fn new(options: nyquest_interface::client::ClientOptions) -> Self {
75        Self {
76            options: Arc::new(options),
77            slot: Arc::new(MultiEasySlot {
78                multi_easy: Mutex::new(None),
79            }),
80            share: Share::new(),
81        }
82    }
83
84    fn get_or_create_handle(&self) -> NyquestResult<EasyHandleGuard<&Arc<MultiEasySlot>>> {
85        let slot = {
86            let mut slot = self.slot.multi_easy.lock().unwrap();
87            slot.take()
88        };
89        let handle = match slot {
90            Some(handle) => handle,
91            None => MultiEasy::new(&self.share)?,
92        };
93        Ok(EasyHandleGuard {
94            slot: &self.slot,
95            handle: ManuallyDrop::new(Mutex::new(handle)),
96        })
97    }
98}
99
100impl io::Read for CurlBlockingResponse {
101    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
102        let handle = self.handle.handle_mut();
103        let res = handle.poll_bytes(|response_buf| {
104            let len = response_buf.len().min(buf.len());
105            buf[..len].copy_from_slice(&response_buf[..len]);
106            response_buf.drain(..len);
107            len
108        });
109        match res {
110            Ok(len) => Ok(len),
111            Err(NyquestError::RequestTimeout) => Err(io::ErrorKind::TimedOut.into()),
112            Err(NyquestError::Io(e)) => Err(e),
113            Err(e) => unreachable!("Unexpected error: {}", e),
114        }
115    }
116}
117
118impl nyquest_interface::blocking::BlockingResponse for CurlBlockingResponse {
119    fn status(&self) -> u16 {
120        self.status
121    }
122
123    fn content_length(&self) -> Option<u64> {
124        self.content_length
125    }
126
127    fn get_header(&self, header: &str) -> nyquest_interface::Result<Vec<String>> {
128        Ok(self
129            .headers
130            .iter()
131            .filter(|(k, _)| k.eq_ignore_ascii_case(header))
132            .map(|(_, v)| v.clone())
133            .collect())
134    }
135
136    fn text(&mut self) -> nyquest_interface::Result<String> {
137        let buf = self.bytes()?;
138        #[cfg(feature = "charset")]
139        if let Some((_, mut charset)) = self
140            .get_header("content-type")?
141            .pop()
142            .unwrap_or_default()
143            .split(';')
144            .filter_map(|s| s.split_once('='))
145            .find(|(k, _)| k.trim().eq_ignore_ascii_case("charset"))
146        {
147            charset = charset.trim_matches('"');
148            if let Ok(decoded) = iconv_native::decode_lossy(&buf, charset.trim()) {
149                return Ok(decoded);
150            }
151        }
152        Ok(String::from_utf8_lossy(&buf).into_owned())
153    }
154
155    fn bytes(&mut self) -> nyquest_interface::Result<Vec<u8>> {
156        let handle = self.handle.handle_mut();
157        handle.poll_until_whole_response(self.max_response_buffer_size)?;
158        let buf = handle.take_response_buffer();
159        if self
160            .max_response_buffer_size
161            .map(|limit| buf.len() > limit as usize)
162            .unwrap_or_default()
163        {
164            return Err(NyquestError::ResponseTooLarge);
165        }
166        Ok(buf)
167    }
168}
169
170impl nyquest_interface::blocking::BlockingClient for CurlEasyClient {
171    type Response = CurlBlockingResponse;
172
173    fn request(&self, req: Request) -> nyquest_interface::Result<Self::Response> {
174        let mut handle_guard = self.get_or_create_handle()?;
175        // FIXME: properly concat base_url and url
176        let url = concat_url(self.options.base_url.as_deref(), &req.relative_uri);
177        let handle: &mut MultiEasy = handle_guard.handle_mut();
178        handle.populate_request(&url, req, &self.options)?;
179        handle.poll_until_response_headers()?;
180        let mut headers_buf = handle.take_response_headers_buffer();
181        let headers = headers_buf
182            .iter_mut()
183            .filter_map(|line| std::str::from_utf8_mut(&mut *line).ok())
184            .filter_map(|line| line.split_once(':'))
185            .map(|(k, v)| (k.into(), v.trim_start().into()))
186            .collect();
187        Ok(CurlBlockingResponse {
188            status: handle.status()?,
189            content_length: handle.content_length()?,
190            headers,
191            handle: handle_guard.into_owned(),
192            max_response_buffer_size: self.options.max_response_buffer_size,
193        })
194    }
195}
196
197impl nyquest_interface::blocking::BlockingBackend for crate::CurlBackend {
198    type BlockingClient = CurlEasyClient;
199
200    fn create_blocking_client(
201        &self,
202        options: nyquest_interface::client::ClientOptions,
203    ) -> NyquestResult<Self::BlockingClient> {
204        Ok(CurlEasyClient::new(options))
205    }
206}