bollard_next/
docker.rs

1use std::convert::Infallible;
2#[cfg(feature = "ssl_providerless")]
3use std::fs;
4use std::future::Future;
5#[cfg(feature = "ssl_providerless")]
6use std::io;
7#[cfg(any(feature = "pipe", feature = "ssl_providerless"))]
8use std::path::Path;
9#[cfg(feature = "ssl_providerless")]
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::sync::atomic::AtomicUsize;
13use std::sync::atomic::Ordering;
14use std::sync::Arc;
15use std::time::Duration;
16use std::{cmp, env, fmt};
17
18use futures_core::Stream;
19use futures_util::future::FutureExt;
20use futures_util::future::TryFutureExt;
21use futures_util::stream::TryStreamExt;
22use futures_util::StreamExt;
23use http::header::CONTENT_TYPE;
24use http::request::Builder;
25use http_body_util::{BodyExt, Full, StreamBody};
26use hyper::body::{Frame, Incoming};
27use hyper::{self, body::Bytes, Method, Request, Response, StatusCode};
28#[cfg(feature = "ssl_providerless")]
29use hyper_rustls::HttpsConnector;
30#[cfg(any(feature = "http", test))]
31use hyper_util::{
32    client::legacy::{connect::HttpConnector, Client},
33    rt::TokioExecutor,
34};
35#[cfg(all(feature = "pipe", unix))]
36use hyperlocal::UnixConnector;
37use log::{debug, trace};
38#[cfg(feature = "ssl_providerless")]
39use rustls::{crypto::CryptoProvider, sign::CertifiedKey};
40#[cfg(feature = "ssl_providerless")]
41use rustls_pki_types::{CertificateDer, PrivateKeyDer};
42use serde_derive::{Deserialize, Serialize};
43use tokio::io::{split, AsyncRead, AsyncWrite};
44use tokio_util::codec::FramedRead;
45
46use crate::container::LogOutput;
47use crate::errors::Error;
48use crate::errors::Error::*;
49use crate::read::{
50    AsyncUpgraded, IncomingStream, JsonLineDecoder, NewlineLogOutputDecoder, StreamReader,
51};
52use crate::uri::Uri;
53#[cfg(all(feature = "pipe", windows))]
54use hyper_named_pipe::NamedPipeConnector;
55
56use crate::auth::{base64_url_encode, DockerCredentialsHeader};
57use serde::de::DeserializeOwned;
58use serde::ser::Serialize;
59
60/// The default `DOCKER_SOCKET` address that we will try to connect to.
61#[cfg(unix)]
62pub const DEFAULT_SOCKET: &str = "unix:///var/run/docker.sock";
63
64/// The default `DOCKER_NAMED_PIPE` address that a windows client will try to connect to.
65#[cfg(windows)]
66pub const DEFAULT_NAMED_PIPE: &str = "npipe:////./pipe/docker_engine";
67
68/// The default `DOCKER_TCP_ADDRESS` address that we will try to connect to.
69#[cfg(feature = "http")]
70pub const DEFAULT_TCP_ADDRESS: &str = "tcp://localhost:2375";
71
72/// The default `DOCKER_HOST` address that we will try to connect to.
73#[cfg(unix)]
74pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_SOCKET;
75
76/// The default `DOCKER_HOST` address that we will try to connect to.
77#[cfg(windows)]
78pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_NAMED_PIPE;
79
80/// Default timeout for all requests is 2 minutes.
81#[cfg(feature = "http")]
82const DEFAULT_TIMEOUT: u64 = 120;
83
84/// Default Client Version to communicate with the server.
85pub const API_DEFAULT_VERSION: &ClientVersion = &ClientVersion {
86    major_version: 1,
87    minor_version: 45,
88};
89
/// Records which kind of transport a [`Docker`] client was constructed with.
///
/// Every variant except `Custom` is compiled in only when the matching cargo
/// feature (and, for the pipe variants, the matching target family) is enabled.
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
    /// Client built on the Unix-socket transport.
    #[cfg(all(feature = "pipe", unix))]
    Unix,
    /// Client built by `connect_with_http` (unsecured HTTP).
    #[cfg(feature = "http")]
    Http,
    /// Client built by `connect_with_ssl` (HTTPS via rustls).
    #[cfg(feature = "ssl_providerless")]
    SSL,
    /// Client built on the Windows named-pipe transport.
    #[cfg(all(feature = "pipe", windows))]
    NamedPipe,
    /// Client built by `connect_with_custom_transport`.
    Custom {
        /// The part of the supplied address before `://`; empty when the
        /// address had no scheme or no address was given.
        scheme: String,
    },
}
104
105/// `Request` from bollard used with `CustomTransport`
106pub type BollardRequest = Request<BodyType>;
107
108type TransportReturnTy =
109    Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>;
110
/// A user-supplied transport that carries requests to the Docker daemon.
///
/// Implementors must be `Send + Sync`. Any
/// `Fn(BollardRequest) -> impl Future<Output = Result<Response<Incoming>, Error>>`
/// closure that is itself `Send + Sync` (with a `Send + 'static` future)
/// implements this trait automatically through the blanket impl in this module.
pub trait CustomTransport: Send + Sync {
    /// Sends `request` and returns a boxed, pinned future that resolves to the
    /// response or to an [`Error`].
    fn request(&self, request: BollardRequest) -> TransportReturnTy;
}
116
117// auto impl for Fn(Request) -> Future<Output = Result<_, _>
118impl<Callback, ReturnTy> CustomTransport for Callback
119where
120    Callback: Fn(BollardRequest) -> ReturnTy + Send + Sync,
121    ReturnTy: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
122{
123    fn request(&self, request: BollardRequest) -> TransportReturnTy {
124        Box::pin(self(request))
125    }
126}
127
/// Transport is the type representing the means of communication
/// with the Docker daemon.
///
/// Each transport usually encapsulate a hyper client
/// with various Connect traits fulfilled.
pub(crate) enum Transport {
    /// Hyper client over a plain `HttpConnector`.
    #[cfg(feature = "http")]
    Http {
        client: Client<HttpConnector, BodyType>,
    },
    /// Hyper client over a rustls `HttpsConnector` wrapping an `HttpConnector`.
    #[cfg(feature = "ssl_providerless")]
    Https {
        client: Client<HttpsConnector<HttpConnector>, BodyType>,
    },
    /// Hyper client over a `hyperlocal` Unix-socket connector.
    #[cfg(all(feature = "pipe", unix))]
    Unix {
        client: Client<UnixConnector, BodyType>,
    },
    /// Hyper client over a Windows named-pipe connector.
    #[cfg(all(feature = "pipe", windows))]
    NamedPipe {
        client: Client<NamedPipeConnector, BodyType>,
    },
    /// Test-only client backed by `yup_hyper_mock::HostToReplyConnector`.
    #[cfg(test)]
    Mock {
        client: Client<yup_hyper_mock::HostToReplyConnector, BodyType>,
    },
    /// Caller-provided transport, stored as a boxed [`CustomTransport`] trait object.
    Custom {
        transport: Box<dyn CustomTransport>,
    },
}
158
159impl fmt::Debug for Transport {
160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161        match self {
162            #[cfg(feature = "http")]
163            Transport::Http { .. } => write!(f, "HTTP"),
164            #[cfg(feature = "ssl_providerless")]
165            Transport::Https { .. } => write!(f, "HTTPS(rustls)"),
166            #[cfg(all(feature = "pipe", unix))]
167            Transport::Unix { .. } => write!(f, "Unix"),
168            #[cfg(all(feature = "pipe", windows))]
169            Transport::NamedPipe { .. } => write!(f, "NamedPipe"),
170            #[cfg(test)]
171            Transport::Mock { .. } => write!(f, "Mock"),
172            Transport::Custom { .. } => write!(f, "Custom"),
173        }
174    }
175}
176
177#[derive(Debug, Copy, Clone, PartialEq)]
178/// Advisory version stub to use for communicating with the Server. The docker server will error if
179/// a higher client version is used than is compatible with the server. Beware also, that the
180/// docker server will return stubs for a higher version than the version set when communicating.
181///
182/// See also [negotiate_version](Docker::negotiate_version()), and the `client_version` argument when instantiating the
183/// [Docker] client instance.
184pub struct ClientVersion {
185    /// The major version number.
186    pub major_version: usize,
187    /// The minor version number.
188    pub minor_version: usize,
189}
190
/// Outcome of trying to read a [`ClientVersion`] out of a string.
///
/// Produced by the `From<T: Into<String>>` conversion in this module.
pub(crate) enum MaybeClientVersion {
    /// The input had the form `<major>.<minor>` with both parts parsing as `usize`.
    Some(ClientVersion),
    /// The input did not have that form.
    None,
}
195
196impl<T: Into<String>> From<T> for MaybeClientVersion {
197    fn from(s: T) -> MaybeClientVersion {
198        match s
199            .into()
200            .split('.')
201            .map(|v| v.parse::<usize>())
202            .collect::<Vec<Result<usize, std::num::ParseIntError>>>()
203            .as_slice()
204        {
205            [Ok(first), Ok(second)] => MaybeClientVersion::Some(ClientVersion {
206                major_version: first.to_owned(),
207                minor_version: second.to_owned(),
208            }),
209            _ => MaybeClientVersion::None,
210        }
211    }
212}
213
214impl fmt::Display for ClientVersion {
215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216        write!(f, "{}.{}", self.major_version, self.minor_version)
217    }
218}
219
220impl PartialOrd for ClientVersion {
221    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
222        match self.major_version.partial_cmp(&other.major_version) {
223            Some(cmp::Ordering::Equal) => self.minor_version.partial_cmp(&other.minor_version),
224            res => res,
225        }
226    }
227}
228
229impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
230    fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
231        ClientVersion {
232            major_version: tpl.0.load(Ordering::Relaxed),
233            minor_version: tpl.1.load(Ordering::Relaxed),
234        }
235    }
236}
237
238pub(crate) fn serialize_as_json<T, S>(t: &T, s: S) -> Result<S::Ok, S::Error>
239where
240    T: Serialize,
241    S: serde::Serializer,
242{
243    s.serialize_str(
244        &serde_json::to_string(t).map_err(|e| serde::ser::Error::custom(format!("{e}")))?,
245    )
246}
247
248pub(crate) fn serialize_join_newlines<S>(t: &[&str], s: S) -> Result<S::Ok, S::Error>
249where
250    S: serde::Serializer,
251{
252    s.serialize_str(&t.join("\n"))
253}
254
/// Deserializes a string and parses it as an RFC 3339 timestamp.
///
/// A parse failure is reported as a custom deserializer error containing the
/// debug rendering of the underlying `time` error.
#[cfg(feature = "time")]
pub fn deserialize_rfc3339<'de, D: serde::Deserializer<'de>>(
    d: D,
) -> Result<time::OffsetDateTime, D::Error> {
    let raw: String = serde::Deserialize::deserialize(d)?;
    match time::OffsetDateTime::parse(&raw, &time::format_description::well_known::Rfc3339) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(serde::de::Error::custom(format!("{e:?}"))),
    }
}
263
/// Serializes a timestamp as its RFC 3339 string representation.
///
/// A formatting failure is reported as a custom serializer error containing
/// the debug rendering of the underlying `time` error.
#[cfg(feature = "time")]
pub fn serialize_rfc3339<S: serde::Serializer>(
    date: &time::OffsetDateTime,
    s: S,
) -> Result<S::Ok, S::Error> {
    let rendered = date
        .format(&time::format_description::well_known::Rfc3339)
        .map_err(|e| <S::Error as serde::ser::Error>::custom(format!("{e:?}")))?;
    s.serialize_str(&rendered)
}
275
/// Serializes an optional date as a `"<seconds>.<nanoseconds>"` Unix timestamp
/// string, or as the empty string when no date is present.
///
/// The fractional part is the nanosecond within the current second, always
/// written with nine digits so that the text reads as a decimal number of
/// seconds (for example `1136073600.000000001`).
#[cfg(feature = "time")]
pub(crate) fn serialize_as_timestamp<S>(
    opt: &Option<crate::models::BollardDate>,
    s: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    let rendered = match opt {
        // `unix_timestamp_nanos()` is the *total* number of nanoseconds since
        // the epoch, so it must not be used as the fractional part; the
        // sub-second component is `nanosecond()`. Zero-padding keeps e.g. 5 ns
        // from being read as 0.5 s.
        Some(t) => format!("{}.{:09}", t.unix_timestamp(), t.nanosecond()),
        None => String::new(),
    };
    s.serialize_str(&rendered)
}
293
/// Serializes an optional date as a `"<seconds>.<nanoseconds>"` Unix timestamp
/// string, or as the empty string when no date is present.
///
/// The fractional part is always written with at least nine digits so that the
/// text reads as a decimal number of seconds (for example
/// `1136073600.000000001`).
#[cfg(all(feature = "chrono", not(feature = "time")))]
pub(crate) fn serialize_as_timestamp<S>(
    opt: &Option<crate::models::BollardDate>,
    s: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    let rendered = match opt {
        // Without zero-padding, 5 ns past the second would be written as
        // ".5", i.e. half a second.
        Some(t) => format!("{}.{:09}", t.timestamp(), t.timestamp_subsec_nanos()),
        None => String::new(),
    };
    s.serialize_str(&rendered)
}
307
#[derive(Debug)]
/// ---
///
/// # Docker
///
/// The main interface for calling the Docker API. Construct a new Docker instance using one of the
/// connect methods:
///  - [`Docker::connect_with_http_defaults`](Docker::connect_with_http_defaults())
///  - [`Docker::connect_with_named_pipe_defaults`](Docker::connect_with_named_pipe_defaults())
///  - [`Docker::connect_with_ssl_defaults`](Docker::connect_with_ssl_defaults())
///  - [`Docker::connect_with_unix_defaults`](Docker::connect_with_unix_defaults())
///  - [`Docker::connect_with_local_defaults`](Docker::connect_with_local_defaults())
pub struct Docker {
    /// Reference-counted handle to the transport; clones of a `Docker` share it.
    pub(crate) transport: Arc<Transport>,
    /// Which kind of transport this client was constructed with.
    pub(crate) client_type: ClientType,
    /// Daemon address as stored by the constructor (the HTTP, SSL and custom
    /// constructors store it with the URI scheme removed).
    pub(crate) client_addr: String,
    /// Request timeout; the constructors document this value as seconds.
    pub(crate) client_timeout: u64,
    /// API version as a `(major, minor)` pair of atomics, shared between clones.
    pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
}
327
328impl Clone for Docker {
329    fn clone(&self) -> Docker {
330        Docker {
331            transport: self.transport.clone(),
332            client_type: self.client_type.clone(),
333            client_addr: self.client_addr.clone(),
334            client_timeout: self.client_timeout,
335            version: self.version.clone(),
336        }
337    }
338}
339
340/// Internal model: Docker Server JSON payload when an error is emitted
341#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
342struct DockerServerErrorMessage {
343    message: String,
344}
345
/// Supplies the client certificate for TLS connections by loading the key and
/// certificate from the two stored file paths.
#[cfg(feature = "ssl_providerless")]
#[derive(Debug)]
struct DockerClientCertResolver {
    /// Path of the PEM file holding the client's private key.
    ssl_key: PathBuf,
    /// Path of the PEM file holding the client's certificate(s).
    ssl_cert: PathBuf,
}
352
#[cfg(feature = "ssl_providerless")]
impl DockerClientCertResolver {
    /// Returns the directory in which the Docker certificate files are
    /// expected.
    ///
    /// `DOCKER_CERT_PATH` is used when it can be read; otherwise
    /// `DOCKER_CONFIG`; otherwise `<home>/.docker`. Fails with
    /// `NoHomePathError` when neither variable is usable and no home directory
    /// can be determined.
    pub fn default_cert_path() -> Result<PathBuf, Error> {
        let configured = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
        match configured {
            Ok(dir) => Ok(PathBuf::from(dir)),
            Err(_) => home::home_dir()
                .map(|home| home.join(".docker"))
                .ok_or(NoHomePathError),
        }
    }

    /// Opens `path` for buffered reading.
    fn open_buffered(path: &Path) -> Result<io::BufReader<fs::File>, Error> {
        let file = fs::File::open(path)?;
        Ok(io::BufReader::new(file))
    }

    /// Reads every certificate found in the PEM file at `path`.
    fn certs(path: &Path) -> Result<Vec<CertificateDer<'static>>, Error> {
        let mut reader = Self::open_buffered(path)?;
        let parsed: Result<Vec<CertificateDer<'static>>, io::Error> =
            rustls_pemfile::certs(&mut reader).collect();
        Ok(parsed?)
    }

    /// Reads the private key from the PEM file at `path`.
    ///
    /// The returned vector holds the key reported by
    /// `rustls_pemfile::private_key`, or is empty when it reports none. A read
    /// failure is mapped to `CertPathError`.
    fn keys(path: &Path) -> Result<Vec<PrivateKeyDer<'static>>, Error> {
        let mut reader = Self::open_buffered(path)?;
        let found = rustls_pemfile::private_key(&mut reader).map_err(|_| CertPathError {
            path: path.to_path_buf(),
        })?;

        Ok(found.into_iter().collect())
    }

    /// Loads the certificate chain and private key from disk and combines
    /// them into a `CertifiedKey`.
    ///
    /// Fails with `CertMultipleKeys` unless exactly one key was read, and with
    /// `CertParseError` when the crypto provider rejects the key.
    ///
    /// # Panics
    ///
    /// Panics when no process-level `CryptoProvider` has been installed.
    fn docker_client_key(&self) -> Result<Arc<CertifiedKey>, Error> {
        let cert_chain = Self::certs(&self.ssl_cert)?;

        let mut candidate_keys = Self::keys(&self.ssl_key)?;
        if candidate_keys.len() != 1 {
            return Err(CertMultipleKeys {
                count: candidate_keys.len(),
                path: self.ssl_key.to_owned(),
            });
        }
        let key = candidate_keys.remove(0);

        let provider = CryptoProvider::get_default()
            .expect("no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point");
        let signing_key = provider
            .key_provider
            .load_private_key(key)
            .map_err(|_| CertParseError {
                path: self.ssl_key.to_owned(),
            })?;

        Ok(Arc::new(CertifiedKey::new(cert_chain, signing_key)))
    }
}
411
/// Hooks the resolver into rustls: every resolution attempt reloads the key
/// and certificate from disk, and any loading failure is reported to rustls as
/// "no certificate".
#[cfg(feature = "ssl_providerless")]
impl rustls::client::ResolvesClientCert for DockerClientCertResolver {
    fn resolve(
        &self,
        _root_hint_subjects: &[&[u8]],
        _sigschemes: &[rustls::SignatureScheme],
    ) -> Option<Arc<CertifiedKey>> {
        match self.docker_client_key() {
            Ok(certified) => Some(certified),
            Err(_) => None,
        }
    }

    // Always claims to have certificates, without touching the files.
    fn has_certs(&self) -> bool {
        true
    }
}
422
423/// A Docker implementation typed to connect to a secure HTTPS connection using the `rustls`
424/// library.
425#[cfg(feature = "ssl_providerless")]
426impl Docker {
427    /// Connect using secure HTTPS using defaults that are signalled by environment variables.
428    ///
429    /// # Defaults
430    ///
431    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable.
432    ///  - The certificate directory is sourced from the `DOCKER_CERT_PATH` environment variable.
433    ///  - Certificates are named `key.pem`, `cert.pem` and `ca.pem` to indicate the private key,
434    ///    the server certificate and the certificate chain respectively.
435    ///  - The request timeout defaults to 2 minutes.
436    ///
437    /// # Examples
438    ///
439    /// ```rust,no_run
440    /// use bollard_next::Docker;
441    ///
442    /// use futures_util::future::TryFutureExt;
443    ///
444    /// let connection = Docker::connect_with_ssl_defaults().unwrap();
445    /// connection.ping()
446    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
447    /// ```
448    ///
449    /// # Panics
450    ///
451    /// This function will panic if neither `ssl` nor `aws-lc-rs` features are activated,
452    /// or if you are using the `ssl_providerless` feature without installing the custom cryptographic
453    /// provider before with [`rustls::crypto::CryptoProvider::install_default()`]
454    pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
455        let cert_path = DockerClientCertResolver::default_cert_path()?;
456        Docker::connect_with_ssl(
457            if let Ok(ref host) = env::var("DOCKER_HOST") {
458                host
459            } else {
460                DEFAULT_TCP_ADDRESS
461            },
462            &cert_path.join("key.pem"),
463            &cert_path.join("cert.pem"),
464            &cert_path.join("ca.pem"),
465            DEFAULT_TIMEOUT,
466            API_DEFAULT_VERSION,
467        )
468    }
469
470    /// Connect using secure HTTPS.
471    ///
472    /// # Arguments
473    ///
474    ///  - `addr`: the connection url.
475    ///  - `ssl_key`: the private key path.
476    ///  - `ssl_cert`: the server certificate path.
477    ///  - `ssl_ca`: the certificate chain path.
478    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
479    ///  - `client_version`: the client version to communicate with the server.
480    ///
481    /// # Examples
482    ///
483    /// ```rust,no_run
484    /// use bollard_next::{API_DEFAULT_VERSION, Docker};
485    ///
486    /// use std::path::Path;
487    ///
488    /// use futures_util::future::TryFutureExt;
489    ///
490    /// let connection = Docker::connect_with_ssl(
491    ///     "tcp://localhost:2375/",
492    ///     Path::new("/certs/key.pem"),
493    ///     Path::new("/certs/cert.pem"),
494    ///     Path::new("/certs/ca.pem"),
495    ///     120,
496    ///     API_DEFAULT_VERSION).unwrap();
497    /// connection.ping()
498    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
499    /// ```
500    ///
501    /// # Panics
502    ///
503    /// This function will panic if neither `ssl` nor `aws-lc-rs` features are activated,
504    /// or if you are using the `ssl_providerless` feature without installing the custom cryptographic
505    /// provider before with [`rustls::crypto::CryptoProvider::install_default()`]
506    pub fn connect_with_ssl(
507        addr: &str,
508        ssl_key: &Path,
509        ssl_cert: &Path,
510        ssl_ca: &Path,
511        timeout: u64,
512        client_version: &ClientVersion,
513    ) -> Result<Docker, Error> {
514        // This ensures that using docker-machine-esque addresses work with Hyper.
515        let client_addr = addr.replacen("tcp://", "", 1).replacen("https://", "", 1);
516
517        let mut root_store = rustls::RootCertStore::empty();
518
519        #[cfg(not(any(feature = "test_ssl", feature = "webpki")))]
520        let native_certs = rustls_native_certs::load_native_certs();
521
522        #[cfg(not(any(feature = "test_ssl", feature = "webpki")))]
523        if native_certs.errors.is_empty() {
524            for cert in native_certs.certs {
525                root_store
526                    .add(cert)
527                    .map_err(|err| NoNativeCertsError { err })?
528            }
529        } else {
530            return Err(LoadNativeCertsErrors {
531                errors: native_certs.errors,
532            });
533        }
534        #[cfg(any(feature = "test_ssl", feature = "webpki"))]
535        root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
536
537        let mut ca_pem = io::Cursor::new(fs::read(ssl_ca).map_err(|_| CertPathError {
538            path: ssl_ca.to_owned(),
539        })?);
540
541        root_store.add_parsable_certificates(
542            rustls_pemfile::certs(&mut ca_pem).collect::<Result<Vec<_>, _>>()?,
543        );
544
545        let config = rustls::ClientConfig::builder()
546            .with_root_certificates(root_store)
547            .with_client_cert_resolver(Arc::new(DockerClientCertResolver {
548                ssl_key: ssl_key.to_owned(),
549                ssl_cert: ssl_cert.to_owned(),
550            }));
551
552        let mut http_connector = HttpConnector::new();
553        http_connector.enforce_http(false);
554
555        let https_connector: HttpsConnector<HttpConnector> =
556            HttpsConnector::from((http_connector, config));
557
558        let mut client_builder = Client::builder(TokioExecutor::new());
559        client_builder.pool_max_idle_per_host(0);
560
561        let client = client_builder.build(https_connector);
562        let transport = Transport::Https { client };
563        let docker = Docker {
564            transport: Arc::new(transport),
565            client_type: ClientType::SSL,
566            client_addr,
567            client_timeout: timeout,
568            version: Arc::new((
569                AtomicUsize::new(client_version.major_version),
570                AtomicUsize::new(client_version.minor_version),
571            )),
572        };
573
574        Ok(docker)
575    }
576}
577
578#[cfg(feature = "http")]
579/// A Docker implementation typed to connect to an unsecure Http connection.
580impl Docker {
581    /// Connect using unsecured HTTP using defaults that are signalled by environment variables.
582    ///
583    /// # Defaults
584    ///
585    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable, and defaults
586    ///    to `localhost:2375`.
587    ///  - The number of threads used for the HTTP connection pool defaults to 1.
588    ///  - The request timeout defaults to 2 minutes.
589    ///
590    /// # Examples
591    ///
592    /// ```rust,no_run
593    /// use bollard_next::Docker;
594    ///
595    /// use futures_util::future::TryFutureExt;
596    ///
597    /// let connection = Docker::connect_with_http_defaults().unwrap();
598    /// connection.ping()
599    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
600    /// ```
601    pub fn connect_with_http_defaults() -> Result<Docker, Error> {
602        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_TCP_ADDRESS.to_string());
603        Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
604    }
605
606    /// Connect using unsecured HTTP.
607    ///
608    /// # Arguments
609    ///
610    ///  - `addr`: connection url including scheme and port.
611    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
612    ///  - `client_version`: the client version to communicate with the server.
613    ///
614    /// # Examples
615    ///
616    /// ```rust,no_run
617    /// use bollard_next::{API_DEFAULT_VERSION, Docker};
618    ///
619    /// use futures_util::future::TryFutureExt;
620    ///
621    /// let connection = Docker::connect_with_http(
622    ///                    "http://my-custom-docker-server:2735", 4, API_DEFAULT_VERSION)
623    ///                    .unwrap();
624    /// connection.ping()
625    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
626    /// ```
627    pub fn connect_with_http(
628        addr: &str,
629        timeout: u64,
630        client_version: &ClientVersion,
631    ) -> Result<Docker, Error> {
632        // This ensures that using docker-machine-esque addresses work with Hyper.
633        let client_addr = addr.replacen("tcp://", "", 1).replacen("http://", "", 1);
634
635        let http_connector = HttpConnector::new();
636
637        let mut client_builder = Client::builder(TokioExecutor::new());
638        client_builder.pool_max_idle_per_host(0);
639
640        let client = client_builder.build(http_connector);
641        let transport = Transport::Http { client };
642        let docker = Docker {
643            transport: Arc::new(transport),
644            client_type: ClientType::Http,
645            client_addr,
646            client_timeout: timeout,
647            version: Arc::new((
648                AtomicUsize::new(client_version.major_version),
649                AtomicUsize::new(client_version.minor_version),
650            )),
651        };
652
653        Ok(docker)
654    }
655}
656
657/// A Docker implementation typed to custom connector.
658impl Docker {
659    /// Connect using custom transport implementation.
660    /// It has default implementation for `Fn(Request) -> Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + Sync`
661    ///
662    /// # Arguments
663    ///
664    ///  - `transport`: transport.
665    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
666    ///  - `client_version`: the client version to communicate with the server.
667    ///
668    /// # Examples
669    ///
670    /// ```rust,no_run
671    /// use bollard::{API_DEFAULT_VERSION, Docker, BollardRequest};
672    /// use futures_util::future::TryFutureExt;
673    /// use futures_util::FutureExt;
674
675    /// let http_connector = hyper_util::client::legacy::connect::HttpConnector::new();
676    ///
677    /// let mut client_builder = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new());
678    /// client_builder.pool_max_idle_per_host(0);
679    ///
680    /// let client = std::sync::Arc::new(client_builder.build(http_connector));
681    ///
682    /// let connection = Docker::connect_with_custom_transport(
683    ///     move |req: BollardRequest| {
684    ///         let client = std::sync::Arc::clone(&client);
685    ///         Box::pin(async move {
686    ///             let (p, b) = req.into_parts();
687    ///             // let _prev = p.headers.insert("host", host);
688    ///             // let mut uri = p.uri.into_parts();
689    ///             //uri.path_and_query = uri.path_and_query.map(|paq|
690    ///             //   uri::PathAndQuery::try_from("/docker".to_owned() + paq.as_str())
691    ///             // ).transpose().map_err(bollard::errors::Error::from)?;
692    ///             // p.uri = uri.try_into().map_err(bollard::errors::Error::from)?;
693    ///             let req = BollardRequest::from_parts(p, b);
694    ///             client.request(req).await.map_err(bollard::errors::Error::from)
695    ///         })
696    ///     },
697    ///     Some("http://my-custom-docker-server:2735"),
698    ///     4,
699    ///     bollard::API_DEFAULT_VERSION,
700    /// ).unwrap();
701    ///
702    /// connection.ping()
703    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
704    /// ```
705    pub fn connect_with_custom_transport<S: Into<String>>(
706        transport: impl CustomTransport + 'static,
707        client_addr: Option<S>,
708        timeout: u64,
709        client_version: &ClientVersion,
710    ) -> Result<Docker, Error> {
711        let client_addr = client_addr.map(Into::into).unwrap_or_default();
712        let (scheme, client_addr) = client_addr
713            .split_once("://")
714            .unwrap_or(("", client_addr.as_str()));
715        let client_addr = client_addr.to_owned();
716        let scheme = scheme.to_owned();
717        let transport = Transport::Custom {
718            transport: Box::new(transport),
719        };
720        let docker = Docker {
721            transport: Arc::new(transport),
722            client_type: ClientType::Custom { scheme },
723            client_addr,
724            client_timeout: timeout,
725            version: Arc::new((
726                AtomicUsize::new(client_version.major_version),
727                AtomicUsize::new(client_version.minor_version),
728            )),
729        };
730
731        Ok(docker)
732    }
733}
734
735/// A Docker implementation that wraps away which local implementation we are calling.
736#[cfg(all(feature = "pipe", any(unix, windows)))]
737impl Docker {
738    /// Connect using to either a Unix socket or a Windows named pipe using defaults common to the
739    /// standard docker configuration.
740    ///
741    /// # Defaults
742    ///
743    ///  - The unix socket location defaults to `/var/run/docker.sock`. The windows named pipe
744    ///    location defaults to `//./pipe/docker_engine`.
745    ///  - The request timeout defaults to 2 minutes.
746    ///
747    /// # Examples
748    ///
749    /// ```rust,no_run
750    /// use bollard_next::Docker;
751    ///
752    /// use futures_util::future::TryFutureExt;
753    ///
754    /// let connection = Docker::connect_with_socket_defaults().unwrap();
755    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
756    /// ```
757    pub fn connect_with_socket_defaults() -> Result<Docker, Error> {
758        #[cfg(unix)]
759        let path = DEFAULT_SOCKET;
760        #[cfg(windows)]
761        let path = DEFAULT_NAMED_PIPE;
762
763        Docker::connect_with_socket(path, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
764    }
765
766    /// Connect using a Unix socket or a Windows named pipe.
767    ///
768    /// # Arguments
769    ///
770    ///  - `path`: connection unix socket path or windows named pipe path.
771    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
772    ///  - `client_version`: the client version to communicate with the server.
773    ///
774    /// # Examples
775    ///
776    /// ```rust,no_run
777    /// use bollard_next::{API_DEFAULT_VERSION, Docker};
778    ///
779    /// use futures_util::future::TryFutureExt;
780    ///
781    /// let connection = Docker::connect_with_socket("/var/run/docker.sock", 120, API_DEFAULT_VERSION).unwrap();
782    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
783    /// ```
784    pub fn connect_with_socket(
785        path: &str,
786        timeout: u64,
787        client_version: &ClientVersion,
788    ) -> Result<Docker, Error> {
789        // Remove the scheme if present
790        let clean_path = path
791            .trim_start_matches("unix://")
792            .trim_start_matches("npipe://");
793
794        // Check if the socket file exists
795        if !std::path::Path::new(clean_path).exists() {
796            return Err(Error::SocketNotFoundError(clean_path.to_string()));
797        }
798
799        #[cfg(unix)]
800        let docker = Docker::connect_with_unix(path, timeout, client_version)?;
801        #[cfg(windows)]
802        let docker = Docker::connect_with_named_pipe(path, timeout, client_version)?;
803
804        Ok(docker)
805    }
806
807    /// Connect using the local machine connection method with default arguments.
808    ///
809    /// This is a simple wrapper over the OS specific handlers:
810    ///  * Unix: [`Docker::connect_with_unix_defaults`]
811    ///  * Windows: [`Docker::connect_with_named_pipe_defaults`]
812    ///
813    /// [`Docker::connect_with_unix_defaults`]: Docker::connect_with_unix_defaults()
814    /// [`Docker::connect_with_named_pipe_defaults`]: Docker::connect_with_named_pipe_defaults()
815    pub fn connect_with_local_defaults() -> Result<Docker, Error> {
816        #[cfg(unix)]
817        return Docker::connect_with_unix_defaults();
818        #[cfg(windows)]
819        return Docker::connect_with_named_pipe_defaults();
820    }
821
822    /// Connect using the local machine connection method with supplied arguments.
823    ///
824    /// This is a simple wrapper over the OS specific handlers:
825    ///  * Unix: [`Docker::connect_with_unix`]
826    ///  * Windows: [`Docker::connect_with_named_pipe`]
827    ///
828    /// [`Docker::connect_with_unix`]: Docker::connect_with_unix()
829    /// [`Docker::connect_with_named_pipe`]: Docker::connect_with_named_pipe()
830    pub fn connect_with_local(
831        addr: &str,
832        timeout: u64,
833        client_version: &ClientVersion,
834    ) -> Result<Docker, Error> {
835        #[cfg(unix)]
836        return Docker::connect_with_unix(addr, timeout, client_version);
837        #[cfg(windows)]
838        return Docker::connect_with_named_pipe(addr, timeout, client_version);
839    }
840}
841
842/// A Docker implementation with defaults.
843impl Docker {
    /// Connect using a Unix socket, a Windows named pipe, or via HTTP.
    /// The connection method is determined by the `DOCKER_HOST` environment variable.
    ///
    /// When `DOCKER_HOST` cannot be read, `DEFAULT_DOCKER_HOST` is used instead. The scheme of
    /// the resulting URL selects the transport; which schemes are recognised depends on the
    /// enabled cargo features and on the target OS:
    ///
    ///  - `unix://` (feature `pipe`, Unix targets): [`Docker::connect_with_unix`].
    ///  - `npipe://` (feature `pipe`, Windows targets): [`Docker::connect_with_named_pipe`].
    ///  - `tcp://` or `http://` (feature `http`): `Docker::connect_with_http_defaults`, or
    ///    `Docker::connect_with_ssl_defaults` when the `ssl_providerless` feature is enabled
    ///    and the `DOCKER_TLS_VERIFY` environment variable can be read.
    ///  - `https://` (feature `ssl_providerless`): `Docker::connect_with_ssl_defaults`.
    ///
    /// # Errors
    ///
    /// Returns `UnsupportedURISchemeError` carrying the host string when no arm matches;
    /// otherwise the result of the selected constructor is returned as is.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard_next::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_defaults().unwrap();
    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_defaults() -> Result<Docker, Error> {
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
        match host {
            #[cfg(all(feature = "pipe", unix))]
            h if h.starts_with("unix://") => {
                Docker::connect_with_unix(&h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
            }
            #[cfg(all(feature = "pipe", windows))]
            h if h.starts_with("npipe://") => {
                Docker::connect_with_named_pipe(&h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
            }
            #[cfg(feature = "http")]
            h if h.starts_with("tcp://") || h.starts_with("http://") => {
                // Only the presence of the variable is checked, not its value.
                #[cfg(feature = "ssl_providerless")]
                if env::var("DOCKER_TLS_VERIFY").is_ok() {
                    return Docker::connect_with_ssl_defaults();
                }
                Docker::connect_with_http_defaults()
            }
            #[cfg(feature = "ssl_providerless")]
            h if h.starts_with("https://") => Docker::connect_with_ssl_defaults(),
            // Reached for unknown schemes and for schemes whose feature is compiled out.
            _ => Err(UnsupportedURISchemeError {
                uri: host.to_string(),
            }),
        }
    }
883}
884
885#[cfg(all(feature = "pipe", unix))]
886/// A Docker implementation typed to connect to a Unix socket.
887impl Docker {
888    /// Connect using a Unix socket using defaults common to the standard docker configuration.
889    ///
890    /// # Defaults
891    ///
892    ///  - The socket location defaults to the value of `DEFAULT_SOCKET` env if its set and the URL
893    ///    has `unix` scheme; otherwise `/var/run/docker.sock`.
894    ///  - The request timeout defaults to 2 minutes.
895    ///
896    /// # Examples
897    ///
898    /// ```rust,no_run
899    /// use bollard_next::Docker;
900    ///
901    /// use futures_util::future::TryFutureExt;
902    ///
903    /// let connection = Docker::connect_with_unix_defaults().unwrap();
904    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
905    /// ```
906    pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
907        // Using 3 variables to not have to copy/allocate `DEFAULT_SOCKET`.
908        let socket_path = env::var("DOCKER_HOST").ok().and_then(|p| {
909            if p.starts_with("unix://") {
910                Some(p)
911            } else {
912                None
913            }
914        });
915        let path = socket_path.as_deref();
916        let path_ref: &str = path.unwrap_or(DEFAULT_SOCKET);
917        Docker::connect_with_unix(path_ref, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
918    }
919
920    /// Connect using a Unix socket.
921    ///
922    /// # Arguments
923    ///
924    ///  - `addr`: connection socket path.
925    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
926    ///  - `client_version`: the client version to communicate with the server.
927    ///
928    /// # Examples
929    ///
930    /// ```rust,no_run
931    /// use bollard_next::{API_DEFAULT_VERSION, Docker};
932    ///
933    /// use futures_util::future::TryFutureExt;
934    ///
935    /// let connection = Docker::connect_with_unix("/var/run/docker.sock", 120, API_DEFAULT_VERSION).unwrap();
936    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
937    /// ```
938    pub fn connect_with_unix(
939        path: &str,
940        timeout: u64,
941        client_version: &ClientVersion,
942    ) -> Result<Docker, Error> {
943        let client_addr = path.replacen("unix://", "", 1);
944
945        // check if the socket file exists and is accessible
946        if !Path::new(&client_addr).exists() {
947            return Err(Error::SocketNotFoundError(client_addr));
948        }
949
950        let unix_connector = UnixConnector;
951
952        let mut client_builder = Client::builder(TokioExecutor::new());
953        client_builder.pool_max_idle_per_host(0);
954
955        let client = client_builder.build(unix_connector);
956        let transport = Transport::Unix { client };
957        let docker = Docker {
958            transport: Arc::new(transport),
959            client_type: ClientType::Unix,
960            client_addr,
961            client_timeout: timeout,
962            version: Arc::new((
963                AtomicUsize::new(client_version.major_version),
964                AtomicUsize::new(client_version.minor_version),
965            )),
966        };
967
968        Ok(docker)
969    }
970}
971
972#[cfg(all(feature = "pipe", windows))]
973/// A Docker implementation typed to connect to a Windows Named Pipe, exclusive to the windows
974/// target.
975impl Docker {
976    /// Connect using a Windows Named Pipe using defaults that are common to the standard docker
977    /// configuration.
978    ///
979    /// # Defaults
980    ///
981    ///  - The socket location defaults to `//./pipe/docker_engine`.
982    ///  - The request timeout defaults to 2 minutes.
983    ///
984    /// # Examples
985    ///
986    /// ```rust,no_run
987    /// use bollard_next::Docker;
988    ///
989    /// use futures_util::future::TryFutureExt;
990    ///
991    /// let connection = Docker::connect_with_named_pipe_defaults().unwrap();
992    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
993    ///
994    /// ```
995    pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
996        Docker::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
997    }
998
999    /// Connect using a Windows Named Pipe.
1000    ///
1001    /// # Arguments
1002    ///
1003    ///  - `addr`: socket location.
1004    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
1005    ///  - `client_version`: the client version to communicate with the server.
1006    ///
1007    /// # Examples
1008    ///
1009    /// ```rust,no_run
1010    /// use bollard_next::{API_DEFAULT_VERSION, Docker};
1011    ///
1012    /// use futures_util::future::TryFutureExt;
1013    ///
1014    /// let connection = Docker::connect_with_named_pipe(
1015    ///     "//./pipe/docker_engine", 120, API_DEFAULT_VERSION).unwrap();
1016    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
1017    ///
1018    /// ```
1019    pub fn connect_with_named_pipe(
1020        path: &str,
1021        timeout: u64,
1022        client_version: &ClientVersion,
1023    ) -> Result<Docker, Error> {
1024        let client_addr = path.replacen("npipe://", "", 1);
1025
1026        let named_pipe_connector = NamedPipeConnector;
1027
1028        let mut client_builder = Client::builder(TokioExecutor::new());
1029        client_builder.http1_title_case_headers(true);
1030        client_builder.pool_max_idle_per_host(0);
1031
1032        let client = client_builder.build(named_pipe_connector);
1033        let transport = Transport::NamedPipe { client };
1034        let docker = Docker {
1035            transport: Arc::new(transport),
1036            client_type: ClientType::NamedPipe,
1037            client_addr,
1038            client_timeout: timeout,
1039            version: Arc::new((
1040                AtomicUsize::new(client_version.major_version),
1041                AtomicUsize::new(client_version.minor_version),
1042            )),
1043        };
1044
1045        Ok(docker)
1046    }
1047}
1048
1049#[cfg(test)]
1050impl Docker {
1051    ///
1052    ///  - `connector`: a `HostToReplyConnector` as defined in `yup_hyper_mock`
1053    ///  - `client_addr`: location to connect to.
1054    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
1055    ///  - `client_version`: the client version to communicate with the server.
1056    ///
1057    /// # Examples
1058    ///
1059    /// ```rust,no_run
1060    /// # extern crate bollard;
1061    /// # extern crate futures;
1062    /// # extern crate yup_hyper_mock;
1063    /// # fn main () {
1064    /// use bollard_next::{API_DEFAULT_VERSION, Docker};
1065    ///
1066    /// use futures::future::Future;
1067    ///
1068    /// # use yup_hyper_mock::HostToReplyConnector;
1069    /// let mut connector = HostToReplyConnector::default();
1070    /// connector.m.insert(
1071    ///   String::from("http://127.0.0.1"),
1072    ///   "HTTP/1.1 200 OK\r\nServer: mock1\r\nContent-Type: application/json\r\nContent-Length: 0\r\n\r\n".to_string()
1073    /// );
1074    /// let connection = Docker::connect_with_mock(connector, "127.0.0.1".to_string(), 5, API_DEFAULT_VERSION).unwrap();
1075    /// connection.ping()
1076    ///   .and_then(|_| Ok(println!("Connected!")));
1077    /// # }
1078    /// ```
1079    pub fn connect_with_mock(
1080        connector: yup_hyper_mock::HostToReplyConnector,
1081        client_addr: String,
1082        timeout: u64,
1083        client_version: &ClientVersion,
1084    ) -> Result<Docker, Error> {
1085        let client_builder = Client::builder(TokioExecutor::new());
1086        let client = client_builder.build(connector);
1087
1088        let (transport, client_type) = (Transport::Mock { client }, ClientType::Http);
1089
1090        let docker = Docker {
1091            transport: Arc::new(transport),
1092            client_type,
1093            client_addr,
1094            client_timeout: timeout,
1095            version: Arc::new((
1096                AtomicUsize::new(client_version.major_version),
1097                AtomicUsize::new(client_version.minor_version),
1098            )),
1099        };
1100
1101        Ok(docker)
1102    }
1103}
1104
1105impl Docker {
1106    /// Set the request timeout.
1107    ///
1108    /// This timeout is shared by all requests to the Docker Engine API.
1109    ///
1110    /// By default, 2 minutes.
1111    pub fn with_timeout(mut self, timeout: Duration) -> Self {
1112        self.set_timeout(timeout);
1113        self
1114    }
1115
1116    /// Get the current timeout.
1117    ///
1118    /// This timeout is shared by all requests to the Docker Engine API.
1119    pub fn timeout(&self) -> Duration {
1120        Duration::from_secs(self.client_timeout)
1121    }
1122
1123    /// Set the request timeout.
1124    ///
1125    /// This timeout is shared by all requests to the Docker Engine API.
1126    ///
1127    /// By default, 2 minutes.
1128    pub fn set_timeout(&mut self, timeout: Duration) {
1129        self.client_timeout = timeout.as_secs();
1130    }
1131}
1132
1133// The implementation block for Docker requests
1134impl Docker {
1135    pub(crate) fn process_into_value<T>(
1136        &self,
1137        req: Result<Request<BodyType>, Error>,
1138    ) -> impl Future<Output = Result<T, Error>>
1139    where
1140        T: DeserializeOwned,
1141    {
1142        let fut = self.process_request(req);
1143        async move { Docker::decode_response(fut.await?).await }
1144    }
1145
1146    pub(crate) fn process_into_stream<T>(
1147        &self,
1148        req: Result<Request<BodyType>, Error>,
1149    ) -> impl Stream<Item = Result<T, Error>> + Unpin
1150    where
1151        T: DeserializeOwned,
1152    {
1153        Box::pin(
1154            self.process_request(req)
1155                .map_ok(Docker::decode_into_stream::<T>)
1156                .into_stream()
1157                .try_flatten(),
1158        )
1159    }
1160
1161    pub(crate) fn process_into_stream_string(
1162        &self,
1163        req: Result<Request<BodyType>, Error>,
1164    ) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
1165        Box::pin(
1166            self.process_request(req)
1167                .map_ok(Docker::decode_into_stream_string)
1168                .try_flatten_stream(),
1169        )
1170    }
1171
1172    pub(crate) fn process_into_unit(
1173        &self,
1174        req: Result<Request<BodyType>, Error>,
1175    ) -> impl Future<Output = Result<(), Error>> {
1176        let fut = self.process_request(req);
1177        async move {
1178            fut.await?;
1179            Ok(())
1180        }
1181    }
1182
1183    pub(crate) fn process_into_body(
1184        &self,
1185        req: Result<Request<BodyType>, Error>,
1186    ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
1187        Box::pin(
1188            self.process_request(req)
1189                .map_ok(|response| IncomingStream::new(response.into_body()))
1190                .into_stream()
1191                .try_flatten(),
1192        )
1193    }
1194
1195    pub(crate) fn process_into_string(
1196        &self,
1197        req: Result<Request<BodyType>, Error>,
1198    ) -> impl Future<Output = Result<String, Error>> {
1199        let fut = self.process_request(req);
1200        async move {
1201            let response = fut.await?;
1202            Docker::decode_into_string(response).await
1203        }
1204    }
1205
1206    pub(crate) async fn process_upgraded(
1207        &self,
1208        req: Result<Request<BodyType>, Error>,
1209    ) -> Result<(impl AsyncRead, impl AsyncWrite), Error> {
1210        let res = self.process_request(req).await?;
1211        let upgraded = hyper::upgrade::on(res).await?;
1212        let tokio_upgraded = AsyncUpgraded::new(upgraded);
1213
1214        Ok(split(tokio_upgraded))
1215    }
1216
1217    pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<BodyType, Error>
1218    where
1219        S: Serialize,
1220    {
1221        match body.map(|inst| serde_json::to_string(&inst)) {
1222            Some(Ok(res)) => Ok(Some(res)),
1223            Some(Err(e)) => Err(e.into()),
1224            None => Ok(None),
1225        }
1226        .map(|payload| {
1227            debug!("{}", payload.clone().unwrap_or_default());
1228            payload
1229                .map(|content| BodyType::Left(Full::new(content.into())))
1230                .unwrap_or(BodyType::Left(Full::new(Bytes::new())))
1231        })
1232    }
1233
1234    /// Return the currently set client version.
1235    pub fn client_version(&self) -> ClientVersion {
1236        self.version.as_ref().into()
1237    }
1238
1239    /// Check with the server for a supported version, and downgrade the client version if
1240    /// appropriate.
1241    ///
1242    /// # Examples:
1243    ///
1244    /// ```rust,no_run
1245    ///     use bollard_next::Docker;
1246    ///
1247    ///     let docker = Docker::connect_with_http_defaults().unwrap();
1248    ///     async move {
1249    ///         &docker.negotiate_version().await.unwrap().version();
1250    ///     };
1251    /// ```
1252    pub async fn negotiate_version(self) -> Result<Self, Error> {
1253        let req = self.build_request(
1254            "/version",
1255            Builder::new().method(Method::GET),
1256            None::<String>,
1257            Ok(BodyType::Left(Full::new(Bytes::new()))),
1258        );
1259
1260        let res = self
1261            .process_into_value::<crate::system::Version>(req)
1262            .await?;
1263
1264        let server_version: ClientVersion = if let Some(api_version) = res.api_version {
1265            match api_version.into() {
1266                MaybeClientVersion::Some(client_version) => client_version,
1267                MaybeClientVersion::None => {
1268                    return Err(APIVersionParseError {});
1269                }
1270            }
1271        } else {
1272            return Err(APIVersionParseError {});
1273        };
1274
1275        if server_version < self.client_version() {
1276            self.version
1277                .0
1278                .store(server_version.major_version, Ordering::Relaxed);
1279            self.version
1280                .1
1281                .store(server_version.minor_version, Ordering::Relaxed);
1282        }
1283
1284        Ok(self)
1285    }
1286
1287    pub(crate) fn process_request(
1288        &self,
1289        request: Result<Request<BodyType>, Error>,
1290    ) -> impl Future<Output = Result<Response<Incoming>, Error>> {
1291        let transport = self.transport.clone();
1292        let timeout = self.client_timeout;
1293
1294        match request.as_ref().map(|b| b.body()) {
1295            Ok(http_body_util::Either::Left(bytes)) => trace!("request: {:?}", bytes),
1296            Ok(http_body_util::Either::Right(_)) => trace!("request: (stream)"),
1297            Err(e) => trace!("request: Err({e:?}"),
1298        };
1299
1300        async move {
1301            let request = request?;
1302            let response = Docker::execute_request(transport, request, timeout).await?;
1303
1304            let status = response.status();
1305            match status {
1306                // Status code 200 - 299 or 304
1307                s if s.is_success() || s == StatusCode::NOT_MODIFIED => Ok(response),
1308
1309                StatusCode::SWITCHING_PROTOCOLS => Ok(response),
1310
1311                // All other status codes
1312                _ => {
1313                    let contents = Docker::decode_into_string(response).await?;
1314
1315                    let mut message = String::new();
1316                    if !contents.is_empty() {
1317                        message = serde_json::from_str::<DockerServerErrorMessage>(&contents)
1318                            .map(|msg| msg.message)
1319                            .or_else(|e| if e.is_data() { Ok(contents) } else { Err(e) })?;
1320                    }
1321                    Err(DockerResponseServerError {
1322                        status_code: status.as_u16(),
1323                        message,
1324                    })
1325                }
1326            }
1327        }
1328    }
1329
1330    pub(crate) fn build_request<O>(
1331        &self,
1332        path: &str,
1333        builder: Builder,
1334        query: Option<O>,
1335        payload: Result<BodyType, Error>,
1336    ) -> Result<Request<BodyType>, Error>
1337    where
1338        O: Serialize,
1339    {
1340        let uri = Uri::parse(
1341            &self.client_addr,
1342            &self.client_type,
1343            path,
1344            query,
1345            &self.client_version(),
1346        )?;
1347        let request_uri: hyper::Uri = uri.try_into()?;
1348        debug!("{}", &request_uri);
1349        Ok(builder
1350            .uri(request_uri)
1351            .header(CONTENT_TYPE, "application/json")
1352            .body(payload?)?)
1353    }
1354
1355    pub(crate) fn build_request_with_registry_auth<O>(
1356        &self,
1357        path: &str,
1358        mut builder: Builder,
1359        query: Option<O>,
1360        payload: Result<BodyType, Error>,
1361        credentials: DockerCredentialsHeader,
1362    ) -> Result<Request<BodyType>, Error>
1363    where
1364        O: Serialize,
1365    {
1366        match credentials {
1367            DockerCredentialsHeader::Config(config) => {
1368                let value = match config {
1369                    Some(config) => base64_url_encode(&serde_json::to_string(&config)?),
1370                    None => "".into(),
1371                };
1372
1373                builder = builder.header("X-Registry-Config", value)
1374            }
1375            DockerCredentialsHeader::Auth(auth) => {
1376                let value = match auth {
1377                    Some(config) => base64_url_encode(&serde_json::to_string(&config)?),
1378                    None => "".into(),
1379                };
1380
1381                builder = builder.header("X-Registry-Auth", value)
1382            }
1383        }
1384
1385        self.build_request(path, builder, query, payload)
1386    }
1387
1388    async fn execute_request(
1389        transport: Arc<Transport>,
1390        req: Request<BodyType>,
1391        timeout: u64,
1392    ) -> Result<Response<Incoming>, Error> {
1393        // This is where we determine to which transport we issue the request.
1394        let request = match *transport {
1395            #[cfg(feature = "http")]
1396            Transport::Http { ref client } => client.request(req).map_err(Error::from).boxed(),
1397            #[cfg(feature = "ssl_providerless")]
1398            Transport::Https { ref client } => client.request(req).map_err(Error::from).boxed(),
1399            #[cfg(all(feature = "pipe", unix))]
1400            Transport::Unix { ref client } => client.request(req).map_err(Error::from).boxed(),
1401            #[cfg(all(feature = "pipe", windows))]
1402            Transport::NamedPipe { ref client } => client.request(req).map_err(Error::from).boxed(),
1403            #[cfg(test)]
1404            Transport::Mock { ref client } => client.request(req).map_err(Error::from).boxed(),
1405            Transport::Custom { ref transport } => transport.request(req).boxed(),
1406        };
1407
1408        match tokio::time::timeout(Duration::from_secs(timeout), request).await {
1409            Ok(v) => Ok(v?),
1410            Err(_) => Err(RequestTimeoutError),
1411        }
1412    }
1413
1414    fn decode_into_stream<T>(res: Response<Incoming>) -> impl Stream<Item = Result<T, Error>>
1415    where
1416        T: DeserializeOwned,
1417    {
1418        FramedRead::new(StreamReader::new(res.into_body()), JsonLineDecoder::new())
1419    }
1420
1421    fn decode_into_stream_string(
1422        res: Response<Incoming>,
1423    ) -> impl Stream<Item = Result<LogOutput, Error>> {
1424        FramedRead::new(
1425            StreamReader::new(res.into_body()),
1426            NewlineLogOutputDecoder::new(false),
1427        )
1428        .map_err(Error::from)
1429    }
1430
1431    async fn decode_into_string(response: Response<Incoming>) -> Result<String, Error> {
1432        let body = response.into_body().collect().await?.to_bytes();
1433
1434        Ok(String::from_utf8_lossy(&body).to_string())
1435    }
1436
1437    async fn decode_response<T>(response: Response<Incoming>) -> Result<T, Error>
1438    where
1439        T: DeserializeOwned,
1440    {
1441        let bytes = response.into_body().collect().await?.to_bytes();
1442
1443        debug!("Decoded into string: {}", &String::from_utf8_lossy(&bytes));
1444
1445        serde_json::from_slice::<T>(&bytes).map_err(|e| {
1446            if e.is_data() {
1447                JsonDataError {
1448                    message: e.to_string(),
1449                    column: e.column(),
1450                    #[cfg(feature = "json_data_content")]
1451                    contents: String::from_utf8_lossy(&bytes).to_string(),
1452                }
1453            } else {
1454                e.into()
1455            }
1456        })
1457    }
1458}
1459
/// Request body used by the client: either a full in-memory payload or a stream.
///
///  - `Left`: a complete payload held in memory (`Full<Bytes>`).
///  - `Right`: a boxed, pinned, `Send` stream of `Bytes` frames whose item error type is
///    `Infallible`.
pub(crate) type BodyType = http_body_util::Either<
    Full<Bytes>,
    StreamBody<Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, Infallible>> + Send>>>,
>;
1465
1466pub(crate) fn body_stream(body: impl Stream<Item = Bytes> + Send + 'static) -> BodyType {
1467    BodyType::Right(StreamBody::new(Box::pin(body.map(|a| Ok(Frame::data(a))))))
1468}