nyquest_backend_curl/
blocking.rs

1use std::io;
2use std::mem::ManuallyDrop;
3use std::sync::{Arc, Mutex};
4
5use nyquest_interface::blocking::Request;
6use nyquest_interface::{Error as NyquestError, Result as NyquestResult};
7
8mod handler;
9mod multi_easy;
10
11use crate::share::Share;
12use crate::url::concat_url;
13use multi_easy::MultiEasy;
14
15#[derive(Clone)]
16pub struct CurlEasyClient {
17    options: Arc<nyquest_interface::client::ClientOptions>,
18    slot: Arc<Mutex<Option<MultiEasy>>>,
19    share: Share,
20}
21
22struct EasyHandleGuard<S: AsRef<Mutex<Option<MultiEasy>>>> {
23    slot: S,
24    handle: ManuallyDrop<Mutex<MultiEasy>>, // TODO: use std::sync::Exclusive when stabilized
25}
26
27type OwnedEasyHandleGuard = EasyHandleGuard<Arc<Mutex<Option<MultiEasy>>>>;
28
29pub struct CurlBlockingResponse {
30    status: u16,
31    content_length: Option<u64>,
32    headers: Vec<(String, String)>,
33    handle: OwnedEasyHandleGuard,
34    max_response_buffer_size: Option<u64>,
35}
36
37impl<S: AsRef<Mutex<Option<MultiEasy>>>> EasyHandleGuard<S> {
38    fn handle_mut(&mut self) -> &mut MultiEasy {
39        self.handle.get_mut().unwrap()
40    }
41}
42
43impl EasyHandleGuard<&'_ Arc<Mutex<Option<MultiEasy>>>> {
44    fn into_owned(self) -> OwnedEasyHandleGuard {
45        let mut this = ManuallyDrop::new(self);
46        // Safety: self inside ManuallyDrop will not be dropped, hence the handle will not be taken out from Drop
47        let handle = unsafe { ManuallyDrop::take(&mut this.handle) };
48        EasyHandleGuard {
49            slot: this.slot.clone(),
50            handle: ManuallyDrop::new(handle),
51        }
52    }
53}
54
55impl<S: AsRef<Mutex<Option<MultiEasy>>>> Drop for EasyHandleGuard<S> {
56    fn drop(&mut self) {
57        // Safety: the handle is only taken out once which is here, except in `into_owned` where a `ManuallyDrop` is
58        // used to suppress our Drop
59        let mut handle = unsafe { ManuallyDrop::take(&mut self.handle) };
60        let mut slot = self.slot.as_ref().lock().unwrap();
61        if slot.is_none() {
62            handle.get_mut().unwrap().reset_state();
63            *slot = Some(handle.into_inner().unwrap());
64        }
65    }
66}
67
68impl CurlEasyClient {
69    pub fn new(options: nyquest_interface::client::ClientOptions) -> Self {
70        Self {
71            options: Arc::new(options),
72            slot: Arc::new(Mutex::new(None)),
73            share: Share::new(),
74        }
75    }
76
77    fn get_or_create_handle(&self) -> EasyHandleGuard<&Arc<Mutex<Option<MultiEasy>>>> {
78        let slot = {
79            let mut slot = self.slot.lock().unwrap();
80            slot.take()
81        };
82        let handle = match slot {
83            Some(handle) => handle,
84            None => MultiEasy::new(self.share.clone()),
85        };
86        EasyHandleGuard {
87            slot: &self.slot,
88            handle: ManuallyDrop::new(Mutex::new(handle)),
89        }
90    }
91}
92
93impl io::Read for CurlBlockingResponse {
94    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
95        let handle = self.handle.handle_mut();
96        let res = handle.poll_bytes(|response_buf| {
97            let len = response_buf.len().min(buf.len());
98            buf[..len].copy_from_slice(&response_buf[..len]);
99            response_buf.drain(..len);
100            len
101        });
102        match res {
103            Ok(len) => Ok(len),
104            Err(NyquestError::RequestTimeout) => Err(io::ErrorKind::TimedOut.into()),
105            Err(NyquestError::Io(e)) => Err(e),
106            Err(e) => unreachable!("Unexpected error: {}", e),
107        }
108    }
109}
110
111impl nyquest_interface::blocking::BlockingResponse for CurlBlockingResponse {
112    fn status(&self) -> u16 {
113        self.status
114    }
115
116    fn content_length(&self) -> Option<u64> {
117        self.content_length
118    }
119
120    fn get_header(&self, header: &str) -> nyquest_interface::Result<Vec<String>> {
121        Ok(self
122            .headers
123            .iter()
124            .filter(|(k, _)| k.eq_ignore_ascii_case(header))
125            .map(|(_, v)| v.clone())
126            .collect())
127    }
128
129    fn text(&mut self) -> nyquest_interface::Result<String> {
130        let buf = self.bytes()?;
131        #[cfg(feature = "charset")]
132        if let Some((_, mut charset)) = self
133            .get_header("content-type")?
134            .pop()
135            .unwrap_or_default()
136            .split(';')
137            .filter_map(|s| s.split_once('='))
138            .find(|(k, _)| k.trim().eq_ignore_ascii_case("charset"))
139        {
140            charset = charset.trim_matches('"');
141            if let Ok(decoded) = iconv_native::decode_lossy(&buf, charset.trim()) {
142                return Ok(decoded);
143            }
144        }
145        Ok(String::from_utf8_lossy(&buf).into_owned())
146    }
147
148    fn bytes(&mut self) -> nyquest_interface::Result<Vec<u8>> {
149        let handle = self.handle.handle_mut();
150        handle.poll_until_whole_response(self.max_response_buffer_size)?;
151        let buf = handle.take_response_buffer();
152        if self
153            .max_response_buffer_size
154            .map(|limit| buf.len() > limit as usize)
155            .unwrap_or_default()
156        {
157            return Err(NyquestError::ResponseTooLarge);
158        }
159        Ok(buf)
160    }
161}
162
163impl nyquest_interface::blocking::BlockingClient for CurlEasyClient {
164    type Response = CurlBlockingResponse;
165
166    fn request(&self, req: Request) -> nyquest_interface::Result<Self::Response> {
167        let mut handle_guard = self.get_or_create_handle();
168        // FIXME: properly concat base_url and url
169        let url = concat_url(self.options.base_url.as_deref(), &req.relative_uri);
170        let handle = handle_guard.handle_mut();
171        handle.populate_request(&url, req, &self.options)?;
172        handle.poll_until_response_headers()?;
173        let mut headers_buf = handle.take_response_headers_buffer();
174        let headers = headers_buf
175            .iter_mut()
176            .filter_map(|line| std::str::from_utf8_mut(&mut *line).ok())
177            .filter_map(|line| line.split_once(':'))
178            .map(|(k, v)| (k.into(), v.trim_start().into()))
179            .collect();
180        Ok(CurlBlockingResponse {
181            status: handle.status()?,
182            content_length: handle.content_length()?,
183            headers,
184            handle: handle_guard.into_owned(),
185            max_response_buffer_size: self.options.max_response_buffer_size,
186        })
187    }
188}
189
190impl nyquest_interface::blocking::BlockingBackend for crate::CurlBackend {
191    type BlockingClient = CurlEasyClient;
192
193    fn create_blocking_client(
194        &self,
195        options: nyquest_interface::client::ClientOptions,
196    ) -> NyquestResult<Self::BlockingClient> {
197        Ok(CurlEasyClient::new(options))
198    }
199}