gix_transport/client/blocking_io/http/curl/
remote.rs

1use std::{
2    io,
3    io::{Read, Write},
4    sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
5    thread,
6    time::Duration,
7};
8
9use curl::easy::{Auth, Easy2};
10use gix_features::io::pipe;
11
12use crate::client::blocking_io::http::{
13    self,
14    curl::curl_is_spurious,
15    curl::Error,
16    options::{FollowRedirects, HttpVersion, ProxyAuthMethod, SslVersion},
17    redirect,
18    traits::PostBodyDataKind,
19};
20
21enum StreamOrBuffer {
22    Stream(pipe::Reader),
23    Buffer(std::io::Cursor<Vec<u8>>),
24}
25
26#[derive(Default)]
27struct Handler {
28    send_header: Option<pipe::Writer>,
29    send_data: Option<pipe::Writer>,
30    receive_body: Option<StreamOrBuffer>,
31    checked_status: bool,
32    last_status: usize,
33    follow: FollowRedirects,
34}
35
36impl Handler {
37    fn reset(&mut self) {
38        self.checked_status = false;
39        self.last_status = 0;
40        self.follow = FollowRedirects::default();
41    }
42    fn parse_status_inner(data: &[u8]) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
43        let code = data
44            .split(|b| *b == b' ')
45            .nth(1)
46            .ok_or("Expected HTTP/<VERSION> STATUS")?;
47        let code = std::str::from_utf8(code)?;
48        code.parse().map_err(Into::into)
49    }
50    fn parse_status(data: &[u8], follow: FollowRedirects) -> Option<(usize, Box<dyn std::error::Error + Send + Sync>)> {
51        let valid_end = match follow {
52            FollowRedirects::Initial | FollowRedirects::All => 308,
53            FollowRedirects::None => 299,
54        };
55        match Self::parse_status_inner(data) {
56            Ok(status) if !(200..=valid_end).contains(&status) => {
57                Some((status, format!("Received HTTP status {status}").into()))
58            }
59            Ok(_) => None,
60            Err(err) => Some((500, err)),
61        }
62    }
63}
64
65impl curl::easy::Handler for Handler {
66    fn write(&mut self, data: &[u8]) -> Result<usize, curl::easy::WriteError> {
67        drop(self.send_header.take()); // signal header readers to stop trying
68        match self.send_data.as_mut() {
69            Some(writer) => writer.write_all(data).map(|_| data.len()).or(Ok(0)),
70            None => Ok(0), // nothing more to receive, reader is done
71        }
72    }
73    fn read(&mut self, data: &mut [u8]) -> Result<usize, curl::easy::ReadError> {
74        match self.receive_body.as_mut() {
75            Some(StreamOrBuffer::Stream(reader)) => reader.read(data).map_err(|_err| curl::easy::ReadError::Abort),
76            Some(StreamOrBuffer::Buffer(cursor)) => cursor.read(data).map_err(|_err| curl::easy::ReadError::Abort),
77            None => Ok(0), // nothing more to read/writer depleted
78        }
79    }
80
81    fn header(&mut self, data: &[u8]) -> bool {
82        if let Some(writer) = self.send_header.as_mut() {
83            if self.checked_status {
84                writer.write_all(data).ok();
85            } else {
86                self.checked_status = true;
87                self.last_status = 200;
88                if let Some((status, err)) = Handler::parse_status(data, self.follow) {
89                    self.last_status = status;
90                    writer
91                        .channel
92                        .send(Err(io::Error::new(
93                            if status == 401 {
94                                io::ErrorKind::PermissionDenied
95                            } else if (500..600).contains(&status) {
96                                io::ErrorKind::ConnectionAborted
97                            } else {
98                                io::ErrorKind::Other
99                            },
100                            err,
101                        )))
102                        .ok();
103                }
104            }
105        }
106        true
107    }
108}
109
110pub struct Request {
111    pub url: String,
112    pub base_url: String,
113    pub headers: curl::easy::List,
114    pub upload_body_kind: Option<PostBodyDataKind>,
115    pub config: http::Options,
116}
117
118pub struct Response {
119    pub headers: pipe::Reader,
120    pub body: pipe::Reader,
121    pub upload_body: pipe::Writer,
122}
123
124pub fn new() -> (
125    thread::JoinHandle<Result<(), Error>>,
126    SyncSender<Request>,
127    Receiver<Response>,
128) {
129    let (req_send, req_recv) = sync_channel(0);
130    let (res_send, res_recv) = sync_channel(0);
131    let handle = std::thread::spawn(move || -> Result<(), Error> {
132        let mut handle = Easy2::new(Handler::default());
133        // We don't wait for the possibility for pipelining to become clear, and curl tries to reuse connections by default anyway.
134        handle.pipewait(false)?;
135        handle.tcp_keepalive(true)?;
136
137        let mut follow = None;
138        let mut redirected_base_url = None::<String>;
139
140        for Request {
141            url,
142            base_url,
143            mut headers,
144            upload_body_kind,
145            config:
146                http::Options {
147                    extra_headers,
148                    follow_redirects,
149                    low_speed_limit_bytes_per_second,
150                    low_speed_time_seconds,
151                    connect_timeout,
152                    proxy,
153                    no_proxy,
154                    proxy_auth_method,
155                    user_agent,
156                    proxy_authenticate,
157                    verbose,
158                    ssl_ca_info,
159                    ssl_version,
160                    ssl_verify,
161                    http_version,
162                    backend,
163                },
164        } in req_recv
165        {
166            let effective_url = redirect::swap_tails(redirected_base_url.as_deref(), &base_url, url.clone());
167            handle.url(&effective_url)?;
168
169            handle.post(upload_body_kind.is_some())?;
170            for header in extra_headers {
171                headers.append(&header)?;
172            }
173            // needed to avoid sending Expect: 100-continue, which adds another response and only CURL wants that
174            headers.append("Expect:")?;
175            handle.verbose(verbose)?;
176
177            if let Some(ca_info) = ssl_ca_info {
178                handle.cainfo(ca_info)?;
179            }
180
181            if let Some(ref mut curl_options) = backend.as_ref().and_then(|backend| backend.lock().ok()) {
182                if let Some(opts) = curl_options.downcast_mut::<super::Options>() {
183                    if let Some(enabled) = opts.schannel_check_revoke {
184                        handle.ssl_options(curl::easy::SslOpt::new().no_revoke(!enabled))?;
185                    }
186                }
187            }
188
189            if let Some(ssl_version) = ssl_version {
190                let (min, max) = ssl_version.min_max();
191                if min == max {
192                    handle.ssl_version(to_curl_ssl_version(min))?;
193                } else {
194                    handle.ssl_min_max_version(to_curl_ssl_version(min), to_curl_ssl_version(max))?;
195                }
196            }
197
198            handle.ssl_verify_peer(ssl_verify)?;
199            handle.ssl_verify_host(ssl_verify)?;
200
201            if let Some(http_version) = http_version {
202                let version = match http_version {
203                    HttpVersion::V1_1 => curl::easy::HttpVersion::V11,
204                    HttpVersion::V2 => curl::easy::HttpVersion::V2,
205                };
206                // Failing to set the version isn't critical, and may indeed fail depending on the version
207                // of libcurl we are built against.
208                // Furthermore, `git` itself doesn't actually check for errors when configuring curl at all,
209                // treating all or most flags as non-critical.
210                handle.http_version(version).ok();
211            }
212
213            let mut proxy_auth_action = None;
214            if let Some(proxy) = proxy {
215                handle.proxy(&proxy)?;
216                let proxy_type = if proxy.starts_with("socks5h") {
217                    curl::easy::ProxyType::Socks5Hostname
218                } else if proxy.starts_with("socks5") {
219                    curl::easy::ProxyType::Socks5
220                } else if proxy.starts_with("socks4a") {
221                    curl::easy::ProxyType::Socks4a
222                } else if proxy.starts_with("socks") {
223                    curl::easy::ProxyType::Socks4
224                } else {
225                    curl::easy::ProxyType::Http
226                };
227                handle.proxy_type(proxy_type)?;
228
229                if let Some((obtain_creds_action, authenticate)) = proxy_authenticate {
230                    let creds = authenticate.lock().expect("no panics in other threads")(obtain_creds_action)?
231                        .expect("action to fetch credentials");
232                    handle.proxy_username(&creds.identity.username)?;
233                    handle.proxy_password(&creds.identity.password)?;
234                    proxy_auth_action = Some((creds.next, authenticate));
235                }
236            }
237            if let Some(no_proxy) = no_proxy {
238                handle.noproxy(&no_proxy)?;
239            }
240            if let Some(user_agent) = user_agent {
241                handle.useragent(&user_agent)?;
242            }
243            handle.transfer_encoding(false)?;
244            if let Some(timeout) = connect_timeout {
245                handle.connect_timeout(timeout)?;
246            }
247            {
248                let mut auth = Auth::new();
249                match proxy_auth_method {
250                    ProxyAuthMethod::AnyAuth => auth
251                        .basic(true)
252                        .digest(true)
253                        .digest_ie(true)
254                        .gssnegotiate(true)
255                        .ntlm(true)
256                        .aws_sigv4(true),
257                    ProxyAuthMethod::Basic => auth.basic(true),
258                    ProxyAuthMethod::Digest => auth.digest(true),
259                    ProxyAuthMethod::Negotiate => auth.digest_ie(true),
260                    ProxyAuthMethod::Ntlm => auth.ntlm(true),
261                };
262                handle.proxy_auth(&auth)?;
263            }
264            handle.tcp_keepalive(true)?;
265
266            if low_speed_time_seconds > 0 && low_speed_limit_bytes_per_second > 0 {
267                handle.low_speed_limit(low_speed_limit_bytes_per_second)?;
268                handle.low_speed_time(Duration::from_secs(low_speed_time_seconds))?;
269            }
270            let (receive_data, receive_headers, send_body, mut receive_body) = {
271                let handler = handle.get_mut();
272                let (send, receive_data) = pipe::unidirectional(1);
273                handler.send_data = Some(send);
274                let (send, receive_headers) = pipe::unidirectional(1);
275                handler.send_header = Some(send);
276                let (send_body, receive_body) = pipe::unidirectional(0);
277                (receive_data, receive_headers, send_body, receive_body)
278            };
279
280            let follow = follow.get_or_insert(follow_redirects);
281            handle.get_mut().follow = *follow;
282            handle.follow_location(matches!(*follow, FollowRedirects::Initial | FollowRedirects::All))?;
283
284            if *follow == FollowRedirects::Initial {
285                *follow = FollowRedirects::None;
286            }
287
288            if res_send
289                .send(Response {
290                    headers: receive_headers,
291                    body: receive_data,
292                    upload_body: send_body,
293                })
294                .is_err()
295            {
296                break;
297            }
298
299            handle.get_mut().receive_body = Some(match upload_body_kind {
300                Some(PostBodyDataKind::Unbounded) | None => StreamOrBuffer::Stream(receive_body),
301                Some(PostBodyDataKind::BoundedAndFitsIntoMemory) => {
302                    let mut buf = Vec::<u8>::with_capacity(512);
303                    receive_body.read_to_end(&mut buf)?;
304                    handle.post_field_size(buf.len() as u64)?;
305                    drop(receive_body);
306                    StreamOrBuffer::Buffer(std::io::Cursor::new(buf))
307                }
308            });
309            handle.http_headers(headers)?;
310
311            if let Err(err) = handle.perform() {
312                let handler = handle.get_mut();
313                handler.reset();
314
315                if let Some((action, authenticate)) = proxy_auth_action {
316                    authenticate.lock().expect("no panics in other threads")(action.erase()).ok();
317                }
318                let err = Err(io::Error::new(
319                    if curl_is_spurious(&err) {
320                        std::io::ErrorKind::ConnectionReset
321                    } else {
322                        std::io::ErrorKind::Other
323                    },
324                    err,
325                ));
326                handler.receive_body.take();
327                match (handler.send_header.take(), handler.send_data.take()) {
328                    (Some(header), mut data) => {
329                        if let Err(TrySendError::Disconnected(err) | TrySendError::Full(err)) =
330                            header.channel.try_send(err)
331                        {
332                            if let Some(body) = data.take() {
333                                body.channel.try_send(err).ok();
334                            }
335                        }
336                    }
337                    (None, Some(body)) => {
338                        body.channel.try_send(err).ok();
339                    }
340                    (None, None) => {}
341                }
342            } else {
343                let handler = handle.get_mut();
344                if let Some((action, authenticate)) = proxy_auth_action {
345                    authenticate.lock().expect("no panics in other threads")(if handler.last_status == 200 {
346                        action.store()
347                    } else {
348                        action.erase()
349                    })?;
350                }
351                handler.reset();
352                handler.receive_body.take();
353                handler.send_header.take();
354                handler.send_data.take();
355                let actual_url = handle
356                    .effective_url()?
357                    .expect("effective url is present and valid UTF-8");
358                if actual_url != effective_url {
359                    redirected_base_url = redirect::base_url(actual_url, &base_url, url)?.into();
360                }
361            }
362        }
363        Ok(())
364    });
365    (handle, req_send, res_recv)
366}
367
368fn to_curl_ssl_version(vers: SslVersion) -> curl::easy::SslVersion {
369    use curl::easy::SslVersion::*;
370    match vers {
371        SslVersion::Default => Default,
372        SslVersion::TlsV1 => Tlsv1,
373        SslVersion::SslV2 => Sslv2,
374        SslVersion::SslV3 => Sslv3,
375        SslVersion::TlsV1_0 => Tlsv10,
376        SslVersion::TlsV1_1 => Tlsv11,
377        SslVersion::TlsV1_2 => Tlsv12,
378        SslVersion::TlsV1_3 => Tlsv13,
379    }
380}
381
382impl From<Error> for http::Error {
383    fn from(err: Error) -> Self {
384        http::Error::Detail {
385            description: err.to_string(),
386        }
387    }
388}
389
390impl From<curl::Error> for http::Error {
391    fn from(err: curl::Error) -> Self {
392        http::Error::Detail {
393            description: err.to_string(),
394        }
395    }
396}