gix_transport/client/blocking_io/http/curl/
remote.rs1use std::{
2 io,
3 io::{Read, Write},
4 sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
5 thread,
6 time::Duration,
7};
8
9use curl::easy::{Auth, Easy2};
10use gix_features::io::pipe;
11
12use crate::client::blocking_io::http::{
13 self,
14 curl::curl_is_spurious,
15 curl::Error,
16 options::{FollowRedirects, HttpVersion, ProxyAuthMethod, SslVersion},
17 redirect,
18 traits::PostBodyDataKind,
19};
20
/// Source of the request body that curl pulls from through the handler's `read` callback.
enum StreamOrBuffer {
    /// The body is read incrementally from the receiving end of a pipe.
    Stream(pipe::Reader),
    /// The body is already fully held in memory and is read through a cursor.
    Buffer(std::io::Cursor<Vec<u8>>),
}
25
/// Per-transfer state that the curl callbacks (`write`, `read`, `header`) operate on.
#[derive(Default)]
struct Handler {
    /// Writer that receives response header lines; `None` once it has been taken/dropped.
    send_header: Option<pipe::Writer>,
    /// Writer that receives response body bytes; `None` once it has been taken/dropped.
    send_data: Option<pipe::Writer>,
    /// Where the request (upload) body is read from, if any.
    receive_body: Option<StreamOrBuffer>,
    /// `true` once the first header line (the status line) of this transfer was inspected.
    checked_status: bool,
    /// Status recorded from the first header line; `0` after a reset.
    last_status: usize,
    /// Redirect policy used to decide which status codes count as acceptable.
    follow: FollowRedirects,
}
35
36impl Handler {
37 fn reset(&mut self) {
38 self.checked_status = false;
39 self.last_status = 0;
40 self.follow = FollowRedirects::default();
41 }
42 fn parse_status_inner(data: &[u8]) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
43 let code = data
44 .split(|b| *b == b' ')
45 .nth(1)
46 .ok_or("Expected HTTP/<VERSION> STATUS")?;
47 let code = std::str::from_utf8(code)?;
48 code.parse().map_err(Into::into)
49 }
50 fn parse_status(data: &[u8], follow: FollowRedirects) -> Option<(usize, Box<dyn std::error::Error + Send + Sync>)> {
51 let valid_end = match follow {
52 FollowRedirects::Initial | FollowRedirects::All => 308,
53 FollowRedirects::None => 299,
54 };
55 match Self::parse_status_inner(data) {
56 Ok(status) if !(200..=valid_end).contains(&status) => {
57 Some((status, format!("Received HTTP status {status}").into()))
58 }
59 Ok(_) => None,
60 Err(err) => Some((500, err)),
61 }
62 }
63}
64
65impl curl::easy::Handler for Handler {
66 fn write(&mut self, data: &[u8]) -> Result<usize, curl::easy::WriteError> {
67 drop(self.send_header.take()); match self.send_data.as_mut() {
69 Some(writer) => writer.write_all(data).map(|_| data.len()).or(Ok(0)),
70 None => Ok(0), }
72 }
73 fn read(&mut self, data: &mut [u8]) -> Result<usize, curl::easy::ReadError> {
74 match self.receive_body.as_mut() {
75 Some(StreamOrBuffer::Stream(reader)) => reader.read(data).map_err(|_err| curl::easy::ReadError::Abort),
76 Some(StreamOrBuffer::Buffer(cursor)) => cursor.read(data).map_err(|_err| curl::easy::ReadError::Abort),
77 None => Ok(0), }
79 }
80
81 fn header(&mut self, data: &[u8]) -> bool {
82 if let Some(writer) = self.send_header.as_mut() {
83 if self.checked_status {
84 writer.write_all(data).ok();
85 } else {
86 self.checked_status = true;
87 self.last_status = 200;
88 if let Some((status, err)) = Handler::parse_status(data, self.follow) {
89 self.last_status = status;
90 writer
91 .channel
92 .send(Err(io::Error::new(
93 if status == 401 {
94 io::ErrorKind::PermissionDenied
95 } else if (500..600).contains(&status) {
96 io::ErrorKind::ConnectionAborted
97 } else {
98 io::ErrorKind::Other
99 },
100 err,
101 )))
102 .ok();
103 }
104 }
105 }
106 true
107 }
108}
109
/// A single HTTP request submitted to the worker thread created by `new()`.
pub struct Request {
    /// The URL to request; it is passed through `redirect::swap_tails` together with
    /// `base_url` before being handed to curl.
    pub url: String,
    /// The base URL that `url` belongs to, used when computing or applying a redirected base.
    pub base_url: String,
    /// Headers to send; `config.extra_headers` and an empty `Expect:` header are appended to it.
    pub headers: curl::easy::List,
    /// `Some(_)` makes this a POST request and selects how the body is supplied;
    /// `None` means no POST is performed.
    pub upload_body_kind: Option<PostBodyDataKind>,
    /// Transport options applied to the curl handle for this request.
    pub config: http::Options,
}
117
/// The pipe endpoints handed back to the caller for one request.
pub struct Response {
    /// Read end that receives the response header lines.
    pub headers: pipe::Reader,
    /// Read end that receives the response body.
    pub body: pipe::Reader,
    /// Write end through which the caller provides the request body.
    pub upload_body: pipe::Writer,
}
123
124pub fn new() -> (
125 thread::JoinHandle<Result<(), Error>>,
126 SyncSender<Request>,
127 Receiver<Response>,
128) {
129 let (req_send, req_recv) = sync_channel(0);
130 let (res_send, res_recv) = sync_channel(0);
131 let handle = std::thread::spawn(move || -> Result<(), Error> {
132 let mut handle = Easy2::new(Handler::default());
133 handle.pipewait(false)?;
135 handle.tcp_keepalive(true)?;
136
137 let mut follow = None;
138 let mut redirected_base_url = None::<String>;
139
140 for Request {
141 url,
142 base_url,
143 mut headers,
144 upload_body_kind,
145 config:
146 http::Options {
147 extra_headers,
148 follow_redirects,
149 low_speed_limit_bytes_per_second,
150 low_speed_time_seconds,
151 connect_timeout,
152 proxy,
153 no_proxy,
154 proxy_auth_method,
155 user_agent,
156 proxy_authenticate,
157 verbose,
158 ssl_ca_info,
159 ssl_version,
160 ssl_verify,
161 http_version,
162 backend,
163 },
164 } in req_recv
165 {
166 let effective_url = redirect::swap_tails(redirected_base_url.as_deref(), &base_url, url.clone());
167 handle.url(&effective_url)?;
168
169 handle.post(upload_body_kind.is_some())?;
170 for header in extra_headers {
171 headers.append(&header)?;
172 }
173 headers.append("Expect:")?;
175 handle.verbose(verbose)?;
176
177 if let Some(ca_info) = ssl_ca_info {
178 handle.cainfo(ca_info)?;
179 }
180
181 if let Some(ref mut curl_options) = backend.as_ref().and_then(|backend| backend.lock().ok()) {
182 if let Some(opts) = curl_options.downcast_mut::<super::Options>() {
183 if let Some(enabled) = opts.schannel_check_revoke {
184 handle.ssl_options(curl::easy::SslOpt::new().no_revoke(!enabled))?;
185 }
186 }
187 }
188
189 if let Some(ssl_version) = ssl_version {
190 let (min, max) = ssl_version.min_max();
191 if min == max {
192 handle.ssl_version(to_curl_ssl_version(min))?;
193 } else {
194 handle.ssl_min_max_version(to_curl_ssl_version(min), to_curl_ssl_version(max))?;
195 }
196 }
197
198 handle.ssl_verify_peer(ssl_verify)?;
199 handle.ssl_verify_host(ssl_verify)?;
200
201 if let Some(http_version) = http_version {
202 let version = match http_version {
203 HttpVersion::V1_1 => curl::easy::HttpVersion::V11,
204 HttpVersion::V2 => curl::easy::HttpVersion::V2,
205 };
206 handle.http_version(version).ok();
211 }
212
213 let mut proxy_auth_action = None;
214 if let Some(proxy) = proxy {
215 handle.proxy(&proxy)?;
216 let proxy_type = if proxy.starts_with("socks5h") {
217 curl::easy::ProxyType::Socks5Hostname
218 } else if proxy.starts_with("socks5") {
219 curl::easy::ProxyType::Socks5
220 } else if proxy.starts_with("socks4a") {
221 curl::easy::ProxyType::Socks4a
222 } else if proxy.starts_with("socks") {
223 curl::easy::ProxyType::Socks4
224 } else {
225 curl::easy::ProxyType::Http
226 };
227 handle.proxy_type(proxy_type)?;
228
229 if let Some((obtain_creds_action, authenticate)) = proxy_authenticate {
230 let creds = authenticate.lock().expect("no panics in other threads")(obtain_creds_action)?
231 .expect("action to fetch credentials");
232 handle.proxy_username(&creds.identity.username)?;
233 handle.proxy_password(&creds.identity.password)?;
234 proxy_auth_action = Some((creds.next, authenticate));
235 }
236 }
237 if let Some(no_proxy) = no_proxy {
238 handle.noproxy(&no_proxy)?;
239 }
240 if let Some(user_agent) = user_agent {
241 handle.useragent(&user_agent)?;
242 }
243 handle.transfer_encoding(false)?;
244 if let Some(timeout) = connect_timeout {
245 handle.connect_timeout(timeout)?;
246 }
247 {
248 let mut auth = Auth::new();
249 match proxy_auth_method {
250 ProxyAuthMethod::AnyAuth => auth
251 .basic(true)
252 .digest(true)
253 .digest_ie(true)
254 .gssnegotiate(true)
255 .ntlm(true)
256 .aws_sigv4(true),
257 ProxyAuthMethod::Basic => auth.basic(true),
258 ProxyAuthMethod::Digest => auth.digest(true),
259 ProxyAuthMethod::Negotiate => auth.digest_ie(true),
260 ProxyAuthMethod::Ntlm => auth.ntlm(true),
261 };
262 handle.proxy_auth(&auth)?;
263 }
264 handle.tcp_keepalive(true)?;
265
266 if low_speed_time_seconds > 0 && low_speed_limit_bytes_per_second > 0 {
267 handle.low_speed_limit(low_speed_limit_bytes_per_second)?;
268 handle.low_speed_time(Duration::from_secs(low_speed_time_seconds))?;
269 }
270 let (receive_data, receive_headers, send_body, mut receive_body) = {
271 let handler = handle.get_mut();
272 let (send, receive_data) = pipe::unidirectional(1);
273 handler.send_data = Some(send);
274 let (send, receive_headers) = pipe::unidirectional(1);
275 handler.send_header = Some(send);
276 let (send_body, receive_body) = pipe::unidirectional(0);
277 (receive_data, receive_headers, send_body, receive_body)
278 };
279
280 let follow = follow.get_or_insert(follow_redirects);
281 handle.get_mut().follow = *follow;
282 handle.follow_location(matches!(*follow, FollowRedirects::Initial | FollowRedirects::All))?;
283
284 if *follow == FollowRedirects::Initial {
285 *follow = FollowRedirects::None;
286 }
287
288 if res_send
289 .send(Response {
290 headers: receive_headers,
291 body: receive_data,
292 upload_body: send_body,
293 })
294 .is_err()
295 {
296 break;
297 }
298
299 handle.get_mut().receive_body = Some(match upload_body_kind {
300 Some(PostBodyDataKind::Unbounded) | None => StreamOrBuffer::Stream(receive_body),
301 Some(PostBodyDataKind::BoundedAndFitsIntoMemory) => {
302 let mut buf = Vec::<u8>::with_capacity(512);
303 receive_body.read_to_end(&mut buf)?;
304 handle.post_field_size(buf.len() as u64)?;
305 drop(receive_body);
306 StreamOrBuffer::Buffer(std::io::Cursor::new(buf))
307 }
308 });
309 handle.http_headers(headers)?;
310
311 if let Err(err) = handle.perform() {
312 let handler = handle.get_mut();
313 handler.reset();
314
315 if let Some((action, authenticate)) = proxy_auth_action {
316 authenticate.lock().expect("no panics in other threads")(action.erase()).ok();
317 }
318 let err = Err(io::Error::new(
319 if curl_is_spurious(&err) {
320 std::io::ErrorKind::ConnectionReset
321 } else {
322 std::io::ErrorKind::Other
323 },
324 err,
325 ));
326 handler.receive_body.take();
327 match (handler.send_header.take(), handler.send_data.take()) {
328 (Some(header), mut data) => {
329 if let Err(TrySendError::Disconnected(err) | TrySendError::Full(err)) =
330 header.channel.try_send(err)
331 {
332 if let Some(body) = data.take() {
333 body.channel.try_send(err).ok();
334 }
335 }
336 }
337 (None, Some(body)) => {
338 body.channel.try_send(err).ok();
339 }
340 (None, None) => {}
341 }
342 } else {
343 let handler = handle.get_mut();
344 if let Some((action, authenticate)) = proxy_auth_action {
345 authenticate.lock().expect("no panics in other threads")(if handler.last_status == 200 {
346 action.store()
347 } else {
348 action.erase()
349 })?;
350 }
351 handler.reset();
352 handler.receive_body.take();
353 handler.send_header.take();
354 handler.send_data.take();
355 let actual_url = handle
356 .effective_url()?
357 .expect("effective url is present and valid UTF-8");
358 if actual_url != effective_url {
359 redirected_base_url = redirect::base_url(actual_url, &base_url, url)?.into();
360 }
361 }
362 }
363 Ok(())
364 });
365 (handle, req_send, res_recv)
366}
367
368fn to_curl_ssl_version(vers: SslVersion) -> curl::easy::SslVersion {
369 use curl::easy::SslVersion::*;
370 match vers {
371 SslVersion::Default => Default,
372 SslVersion::TlsV1 => Tlsv1,
373 SslVersion::SslV2 => Sslv2,
374 SslVersion::SslV3 => Sslv3,
375 SslVersion::TlsV1_0 => Tlsv10,
376 SslVersion::TlsV1_1 => Tlsv11,
377 SslVersion::TlsV1_2 => Tlsv12,
378 SslVersion::TlsV1_3 => Tlsv13,
379 }
380}
381
382impl From<Error> for http::Error {
383 fn from(err: Error) -> Self {
384 http::Error::Detail {
385 description: err.to_string(),
386 }
387 }
388}
389
390impl From<curl::Error> for http::Error {
391 fn from(err: curl::Error) -> Self {
392 http::Error::Detail {
393 description: err.to_string(),
394 }
395 }
396}