Skip to main content

bollard/
docker.rs

1#[cfg(feature = "ssl_providerless")]
2use std::fs;
3use std::future::Future;
4#[cfg(feature = "ssl_providerless")]
5use std::io;
6#[cfg(any(feature = "pipe", feature = "ssl_providerless"))]
7use std::path::Path;
8#[cfg(feature = "ssl_providerless")]
9use std::path::PathBuf;
10use std::pin::Pin;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering;
13use std::sync::Arc;
14use std::time::Duration;
15use std::{cmp, env, fmt};
16
17use futures_core::Stream;
18use futures_util::future::FutureExt;
19use futures_util::future::TryFutureExt;
20use futures_util::stream::TryStreamExt;
21use futures_util::StreamExt;
22use http::header::CONTENT_TYPE;
23use http::request::Builder;
24use http_body_util::{BodyExt, Full, StreamBody};
25use hyper::body::{Frame, Incoming};
26use hyper::{self, body::Bytes, Method, Request, Response, StatusCode};
27#[cfg(feature = "ssl_providerless")]
28use hyper_rustls::HttpsConnector;
29#[cfg(any(feature = "http", test))]
30use hyper_util::client::legacy::connect::HttpConnector;
31#[cfg(any(feature = "http", feature = "ssh", feature = "pipe", test))]
32use hyper_util::{client::legacy::Client, rt::TokioExecutor};
33#[cfg(all(feature = "pipe", unix))]
34use hyperlocal::UnixConnector;
35use log::{debug, trace};
36#[cfg(feature = "ssl_providerless")]
37use rustls::{crypto::CryptoProvider, sign::CertifiedKey};
38#[cfg(feature = "ssl_providerless")]
39use rustls_pki_types::{CertificateDer, PrivateKeyDer};
40use serde_derive::{Deserialize, Serialize};
41use tokio::io::{split, AsyncRead, AsyncWrite};
42use tokio_util::codec::FramedRead;
43
44use crate::container::LogOutput;
45use crate::errors::Error;
46use crate::errors::Error::*;
47use crate::read::{
48    AsyncUpgraded, IncomingStream, JsonLineDecoder, NewlineLogOutputDecoder, StreamReader,
49};
50use crate::uri::Uri;
51#[cfg(all(feature = "pipe", windows))]
52use hyper_named_pipe::NamedPipeConnector;
53
54use crate::auth::{base64_url_encode, DockerCredentialsHeader};
55use serde::de::DeserializeOwned;
56use serde::ser::Serialize;
57
58#[cfg(feature = "websocket")]
59use tokio_tungstenite::WebSocketStream;
60
61/// The default `DOCKER_SOCKET` address that we will try to connect to.
62#[cfg(unix)]
63pub const DEFAULT_SOCKET: &str = "unix:///var/run/docker.sock";
64
65/// The default `DOCKER_NAMED_PIPE` address that a windows client will try to connect to.
66#[cfg(windows)]
67pub const DEFAULT_NAMED_PIPE: &str = "npipe:////./pipe/docker_engine";
68
69/// The default `DOCKER_TCP_ADDRESS` address that we will try to connect to.
70#[cfg(feature = "http")]
71pub const DEFAULT_TCP_ADDRESS: &str = "tcp://localhost:2375";
72
73/// The default `DOCKER_SSH_ADDRESS` address that we will try to connect to.
74#[cfg(feature = "ssh")]
75pub const DEFAULT_SSH_ADDRESS: &str = "ssh://localhost";
76
77/// The default rootless Podman socket path template.
78///
79/// The `{UID}` placeholder must be replaced with the actual user ID at runtime.
80/// Podman's rootless socket lives under `$XDG_RUNTIME_DIR/podman/podman.sock`,
81/// which on most Linux systems is `/run/user/{UID}/podman/podman.sock`.
82#[cfg(unix)]
83pub(crate) const DEFAULT_PODMAN_SOCKET_TEMPLATE: &str = "/run/user/{UID}/podman/podman.sock";
84
85/// The default Podman system socket (rootful, requires group membership or root).
86#[cfg(unix)]
87pub(crate) const DEFAULT_PODMAN_SYSTEM_SOCKET: &str = "unix:///run/podman/podman.sock";
88
89/// The default `DOCKER_HOST` address that we will try to connect to.
90#[cfg(unix)]
91pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_SOCKET;
92
93/// The default `DOCKER_HOST` address that we will try to connect to.
94#[cfg(windows)]
95pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_NAMED_PIPE;
96
/// Default timeout for all requests, in seconds (2 minutes).
98#[cfg(any(feature = "http", feature = "ssh", feature = "pipe"))]
99const DEFAULT_TIMEOUT: u64 = 120;
100
/// Default client API version (1.53) used to communicate with the server.
102pub const API_DEFAULT_VERSION: &ClientVersion = &ClientVersion {
103    major_version: 1,
104    minor_version: 53,
105};
106
/// The kind of connection a client was created with.
///
/// Every variant except `Custom` only exists when the matching cargo feature
/// (and, for the socket-style variants, the matching target family) is enabled.
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
    /// Unix domain socket (`pipe` feature, Unix targets).
    #[cfg(all(feature = "pipe", unix))]
    Unix,
    /// Plain HTTP (`http` feature).
    #[cfg(feature = "http")]
    Http,
    /// HTTPS (`ssl_providerless` feature).
    #[cfg(feature = "ssl_providerless")]
    SSL,
    /// Windows named pipe (`pipe` feature, Windows targets).
    #[cfg(all(feature = "pipe", windows))]
    NamedPipe,
    /// SSH (`ssh` feature).
    #[cfg(feature = "ssh")]
    Ssh,
    /// Caller-supplied transport; `scheme` is the scheme part of the address it was given.
    Custom { scheme: String },
}
123
124/// `Request` from bollard used with `CustomTransport`
125pub type BollardRequest = Request<BodyType>;
126
127type TransportReturnTy = Pin<Box<dyn Future<Output = Result<Response<Incoming>, Error>> + Send>>;
128
129/// `CustomTransport` trait
130pub trait CustomTransport: Send + Sync {
131    /// Make a request, this returns a future
132    fn request(&self, request: BollardRequest) -> TransportReturnTy;
133}
134
135// auto impl for Fn(Request) -> Future<Output = Result<_, _>
136impl<Callback, ReturnTy> CustomTransport for Callback
137where
138    Callback: Fn(BollardRequest) -> ReturnTy + Send + Sync,
139    ReturnTy: Future<Output = Result<Response<Incoming>, Error>> + Send + 'static,
140{
141    fn request(&self, request: BollardRequest) -> TransportReturnTy {
142        Box::pin(self(request))
143    }
144}
145
146/// Transport is the type representing the means of communication
147/// with the Docker daemon.
148///
149/// Each transport usually encapsulate a hyper client
150/// with various Connect traits fulfilled.
151pub(crate) enum Transport {
152    #[cfg(feature = "http")]
153    Http {
154        client: Client<HttpConnector, BodyType>,
155    },
156    #[cfg(feature = "ssl_providerless")]
157    Https {
158        client: Client<HttpsConnector<HttpConnector>, BodyType>,
159    },
160    #[cfg(all(feature = "pipe", unix))]
161    Unix {
162        client: Client<UnixConnector, BodyType>,
163    },
164    #[cfg(all(feature = "pipe", windows))]
165    NamedPipe {
166        client: Client<NamedPipeConnector, BodyType>,
167    },
168    #[cfg(feature = "ssh")]
169    Ssh {
170        client: Client<crate::ssh::SshConnector, BodyType>,
171    },
172    #[cfg(test)]
173    Mock {
174        client: Client<yup_hyper_mock::HostToReplyConnector, BodyType>,
175    },
176    Custom {
177        transport: Box<dyn CustomTransport>,
178    },
179}
180
181impl fmt::Debug for Transport {
182    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183        match self {
184            #[cfg(feature = "http")]
185            Transport::Http { .. } => write!(f, "HTTP"),
186            #[cfg(feature = "ssl_providerless")]
187            Transport::Https { .. } => write!(f, "HTTPS(rustls)"),
188            #[cfg(all(feature = "pipe", unix))]
189            Transport::Unix { .. } => write!(f, "Unix"),
190            #[cfg(all(feature = "pipe", windows))]
191            Transport::NamedPipe { .. } => write!(f, "NamedPipe"),
192            #[cfg(feature = "ssh")]
193            Transport::Ssh { .. } => write!(f, "SSH"),
194            #[cfg(test)]
195            Transport::Mock { .. } => write!(f, "Mock"),
196            Transport::Custom { .. } => write!(f, "Custom"),
197        }
198    }
199}
200
/// Advisory version stub to use for communicating with the Server. The docker server will error if
/// a higher client version is used than is compatible with the server. Beware also, that the
/// docker server will return stubs for a higher version than the version set when communicating.
///
/// Versions are totally ordered: first by `major_version`, then by `minor_version`, both
/// compared numerically (so `1.9 < 1.10`).
///
/// See also [negotiate_version](Docker::negotiate_version()), and the `client_version` argument when instantiating the
/// [Docker] client instance.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ClientVersion {
    /// The major version number.
    pub major_version: usize,
    /// The minor version number.
    pub minor_version: usize,
}

/// Result of trying to read a `major.minor` version out of a string.
///
/// `None` means the text did not consist of exactly two `.`-separated `usize` values.
pub(crate) enum MaybeClientVersion {
    Some(ClientVersion),
    None,
}

impl<T: Into<String>> From<T> for MaybeClientVersion {
    /// Parses `"<major>.<minor>"`.
    ///
    /// Anything else (a single component, three or more components, or a component that
    /// does not parse as `usize`) yields [`MaybeClientVersion::None`].
    fn from(s: T) -> MaybeClientVersion {
        let raw: String = s.into();
        // Look at no more than three components: two must parse, a third must not exist.
        let mut components = raw.split('.').map(str::parse::<usize>);
        match (components.next(), components.next(), components.next()) {
            (Some(Ok(major_version)), Some(Ok(minor_version)), None) => {
                MaybeClientVersion::Some(ClientVersion {
                    major_version,
                    minor_version,
                })
            }
            _ => MaybeClientVersion::None,
        }
    }
}

impl fmt::Display for ClientVersion {
    /// Formats the version as `major.minor`, e.g. `1.53`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}", self.major_version, self.minor_version)
    }
}

impl Ord for ClientVersion {
    /// Orders by major version, breaking ties on the minor version.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        self.major_version
            .cmp(&other.major_version)
            .then_with(|| self.minor_version.cmp(&other.minor_version))
    }
}

impl PartialOrd for ClientVersion {
    /// Always `Some`; delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
    /// Snapshots a `(major, minor)` pair of atomics.
    ///
    /// The two values are read with separate `Relaxed` loads, so the pair is not read as
    /// one atomic unit.
    fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
        let (major, minor) = tpl;
        ClientVersion {
            major_version: major.load(Ordering::Relaxed),
            minor_version: minor.load(Ordering::Relaxed),
        }
    }
}
261
262pub type RequestModifier = Arc<dyn Fn(BollardRequest) -> BollardRequest + Send + Sync>;
263
264/// ---
265///
266/// # Docker
267///
268/// The main interface for calling the Docker API. Construct a new Docker instance using one of the
269/// connect methods:
270///  - [`Docker::connect_with_http_defaults`] (requires `http` feature)
271///  - `Docker::connect_with_named_pipe_defaults` (requires `pipe` feature, Windows only)
272///  - `Docker::connect_with_ssl_defaults` (requires `ssl` feature)
273///  - [`Docker::connect_with_unix_defaults`] (requires `pipe` feature, Unix only)
274///  - [`Docker::connect_with_local_defaults`]
275///  - [`Docker::connect_with_podman_defaults`] (Unix only)
276///  - `Docker::connect_with_ssh_defaults` (requires `ssh` feature)
277pub struct Docker {
278    pub(crate) transport: Arc<Transport>,
279    pub(crate) client_type: ClientType,
280    pub(crate) client_addr: String,
281    pub(crate) client_timeout: u64,
282    pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
283    pub(crate) request_modifier: Option<RequestModifier>,
284}
285
286impl fmt::Debug for Docker {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288        f.debug_struct("Docker")
289            .field("transport", &self.transport)
290            .field("client_type", &self.client_type)
291            .field("client_addr", &self.client_addr)
292            .field("client_timeout", &self.client_timeout)
293            .field("version", &self.version)
294            .field(
295                "request_modifier",
296                &self.request_modifier.as_ref().map(|_| "<callback>"),
297            )
298            .finish()
299    }
300}
301
302impl Clone for Docker {
303    fn clone(&self) -> Docker {
304        Docker {
305            transport: self.transport.clone(),
306            client_type: self.client_type.clone(),
307            client_addr: self.client_addr.clone(),
308            client_timeout: self.client_timeout,
309            version: self.version.clone(),
310            request_modifier: self.request_modifier.clone(),
311        }
312    }
313}
314
315/// Internal model: Docker Server JSON payload when an error is emitted
316#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
317struct DockerServerErrorMessage {
318    message: String,
319}
320
/// Supplies the TLS client certificate for HTTPS connections.
///
/// Only the file locations are stored here.
#[cfg(feature = "ssl_providerless")]
#[derive(Debug)]
struct DockerClientCertResolver {
    /// Path of the PEM file holding the client private key.
    ssl_key: PathBuf,
    /// Path of the PEM file holding the client certificate(s).
    ssl_cert: PathBuf,
}
327
#[cfg(feature = "ssl_providerless")]
impl DockerClientCertResolver {
    /// Directory searched for Docker certificate files by default.
    ///
    /// Uses `DOCKER_CERT_PATH` when set, otherwise `DOCKER_CONFIG`, otherwise `.docker`
    /// inside the home directory. Fails with `NoHomePathError` when neither variable is
    /// set and no home directory can be determined.
    pub fn default_cert_path() -> Result<PathBuf, Error> {
        let configured = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
        match configured {
            Ok(dir) => Ok(PathBuf::from(dir)),
            Err(_) => {
                let home = home::home_dir().ok_or(NoHomePathError)?;
                Ok(home.join(".docker"))
            }
        }
    }

    /// Opens `path` for buffered reading.
    fn open_buffered(path: &Path) -> Result<io::BufReader<fs::File>, Error> {
        let file = fs::File::open(path)?;
        Ok(io::BufReader::new(file))
    }

    /// Reads every certificate from the PEM file at `path`.
    ///
    /// `NoItemsFound` results are skipped; any other PEM error turns into `CertPathError`.
    fn certs(path: &Path) -> Result<Vec<CertificateDer<'static>>, Error> {
        use rustls_pki_types::pem::{Error as PemError, PemObject};

        let mut reader = Self::open_buffered(path)?;
        let mut parsed = Vec::new();
        for item in CertificateDer::pem_reader_iter(&mut reader) {
            match item {
                Ok(cert) => parsed.push(cert),
                Err(PemError::NoItemsFound) => continue,
                Err(_) => {
                    return Err(CertPathError {
                        path: path.to_path_buf(),
                    })
                }
            }
        }
        Ok(parsed)
    }

    /// Reads a private key from the PEM file at `path`.
    ///
    /// The result holds one key, or is empty when the file contains no key item. Any
    /// other PEM error turns into `CertPathError`.
    // NOTE(review): a single `from_pem_reader` call is made, so at most one key is ever
    // returned here -- confirm that is the intent given the caller's count check.
    fn keys(path: &Path) -> Result<Vec<PrivateKeyDer<'static>>, Error> {
        use rustls_pki_types::pem::{Error as PemError, PemObject};

        let mut reader = Self::open_buffered(path)?;
        match PrivateKeyDer::from_pem_reader(&mut reader) {
            Ok(key) => Ok(vec![key]),
            Err(PemError::NoItemsFound) => Ok(Vec::new()),
            Err(_) => Err(CertPathError {
                path: path.to_path_buf(),
            }),
        }
    }

    /// Loads the certificate chain and private key from disk and pairs them up.
    ///
    /// Returns `CertMultipleKeys` unless exactly one key was read, and `CertParseError`
    /// when the crypto provider rejects the key.
    ///
    /// # Panics
    ///
    /// Panics when no process-level `CryptoProvider` has been installed.
    fn docker_client_key(&self) -> Result<Arc<CertifiedKey>, Error> {
        let chain = Self::certs(&self.ssl_cert)?;

        let mut found_keys = Self::keys(&self.ssl_key)?;
        if found_keys.len() != 1 {
            return Err(CertMultipleKeys {
                count: found_keys.len(),
                path: self.ssl_key.to_owned(),
            });
        }
        let private_key = found_keys.remove(0);

        let provider = CryptoProvider::get_default()
            .expect("no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point");
        let signing_key = provider
            .key_provider
            .load_private_key(private_key)
            .map_err(|_| CertParseError {
                path: self.ssl_key.to_owned(),
            })?;

        Ok(Arc::new(CertifiedKey::new(chain, signing_key)))
    }
}
401
#[cfg(feature = "ssl_providerless")]
impl rustls::client::ResolvesClientCert for DockerClientCertResolver {
    /// Loads the client key and certificate from the configured paths on every call.
    ///
    /// Both arguments are ignored. When loading fails the trait leaves no way to report
    /// the error, so it is logged at debug level and `None` is returned.
    fn resolve(&self, _: &[&[u8]], _: &[rustls::SignatureScheme]) -> Option<Arc<CertifiedKey>> {
        match self.docker_client_key() {
            Ok(certified_key) => Some(certified_key),
            Err(err) => {
                // Without this the only symptom is a failed handshake with no hint that
                // the local key or certificate file was the cause.
                debug!("unable to load Docker client certificate: {}", err);
                None
            }
        }
    }

    /// Always reports that a client certificate is available.
    fn has_certs(&self) -> bool {
        true
    }
}
412
/// A Docker implementation typed to connect to a secure HTTPS connection using the `rustls`
/// library.
#[cfg(feature = "ssl_providerless")]
impl Docker {
    /// Connect using secure HTTPS using defaults that are signalled by environment variables.
    ///
    /// # Defaults
    ///
    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable, falling
    ///    back to `DEFAULT_TCP_ADDRESS`.
    ///  - The certificate directory is sourced from the `DOCKER_CERT_PATH` environment variable,
    ///    then `DOCKER_CONFIG`, then `.docker` in the home directory.
    ///  - Certificates are named `key.pem`, `cert.pem` and `ca.pem` to indicate the client
    ///    private key, the client certificate and the CA certificate(s) to trust respectively.
    ///  - The request timeout defaults to 2 minutes.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_ssl_defaults().unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    ///
    /// # Panics
    ///
    /// This function will panic if neither `ssl` nor `aws-lc-rs` features are activated,
    /// or if you are using the `ssl_providerless` feature without installing the custom cryptographic
    /// provider before with [`rustls::crypto::CryptoProvider::install_default()`]
    pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_TCP_ADDRESS.to_string());
        Self::connect_with_ssl_default_certs(&host)
    }

    /// Connect to a custom host using secure HTTPS, but with the default certificates.
    ///
    /// The certificate directory comes from
    /// `DockerClientCertResolver::default_cert_path`; timeout and API version use the
    /// crate defaults.
    fn connect_with_ssl_default_certs(host: &str) -> Result<Docker, Error> {
        let cert_path = DockerClientCertResolver::default_cert_path()?;
        Docker::connect_with_ssl(
            host,
            &cert_path.join("key.pem"),
            &cert_path.join("cert.pem"),
            &cert_path.join("ca.pem"),
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connect using secure HTTPS.
    ///
    /// # Arguments
    ///
    ///  - `addr`: the connection url; a leading `tcp://` or `https://` scheme is removed.
    ///  - `ssl_key`: the client private key path.
    ///  - `ssl_cert`: the client certificate path.
    ///  - `ssl_ca`: path of the CA certificate(s) added to the trusted roots.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    /// `ssl_key` and `ssl_cert` are only stored here (inside the client certificate
    /// resolver); `ssl_ca` is read immediately.
    ///
    /// # Errors
    ///
    ///  - `LoadNativeCertsErrors` / `NoNativeCertsError` when the platform certificates
    ///    cannot be loaded or added (only without the `test_ssl` / `webpki` features).
    ///  - `CertPathError` when `ssl_ca` cannot be read.
    ///  - `CertParseError` when `ssl_ca` is not valid PEM.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use std::path::Path;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_ssl(
    ///     "tcp://localhost:2375/",
    ///     Path::new("/certs/key.pem"),
    ///     Path::new("/certs/cert.pem"),
    ///     Path::new("/certs/ca.pem"),
    ///     120,
    ///     API_DEFAULT_VERSION).unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    ///
    /// # Panics
    ///
    /// This function will panic if neither `ssl` nor `aws-lc-rs` features are activated,
    /// or if you are using the `ssl_providerless` feature without installing the custom cryptographic
    /// provider before with [`rustls::crypto::CryptoProvider::install_default()`]
    pub fn connect_with_ssl(
        addr: &str,
        ssl_key: &Path,
        ssl_cert: &Path,
        ssl_ca: &Path,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        use rustls_pki_types::pem::PemObject;

        // This ensures that using docker-machine-esque addresses work with Hyper.
        // Only a *leading* scheme is removed; the rest of the address is left untouched.
        let without_tcp = addr.strip_prefix("tcp://").unwrap_or(addr);
        let client_addr = without_tcp
            .strip_prefix("https://")
            .unwrap_or(without_tcp)
            .to_owned();

        let mut root_store = rustls::RootCertStore::empty();

        // Trust roots: the platform store by default, the bundled webpki roots otherwise.
        #[cfg(not(any(feature = "test_ssl", feature = "webpki")))]
        {
            let native_certs = rustls_native_certs::load_native_certs();
            if !native_certs.errors.is_empty() {
                return Err(LoadNativeCertsErrors {
                    errors: native_certs.errors,
                });
            }
            for cert in native_certs.certs {
                root_store
                    .add(cert)
                    .map_err(|err| NoNativeCertsError { err })?;
            }
        }
        #[cfg(any(feature = "test_ssl", feature = "webpki"))]
        root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

        // Additionally trust the caller-provided CA certificate(s).
        let mut ca_pem = io::Cursor::new(fs::read(ssl_ca).map_err(|_| CertPathError {
            path: ssl_ca.to_owned(),
        })?);

        root_store.add_parsable_certificates(
            rustls_pki_types::CertificateDer::pem_reader_iter(&mut ca_pem)
                .collect::<Result<Vec<_>, _>>()
                .map_err(|_| CertParseError {
                    path: ssl_ca.to_owned(),
                })?,
        );

        let config = rustls::ClientConfig::builder()
            .with_root_certificates(root_store)
            .with_client_cert_resolver(Arc::new(DockerClientCertResolver {
                ssl_key: ssl_key.to_owned(),
                ssl_cert: ssl_cert.to_owned(),
            }));

        // The TLS layer sits on top of this connector, so it must accept non-`http` schemes.
        let mut http_connector = HttpConnector::new();
        http_connector.enforce_http(false);

        let https_connector: HttpsConnector<HttpConnector> =
            HttpsConnector::from((http_connector, config));

        let mut client_builder = Client::builder(TokioExecutor::new());
        client_builder.pool_max_idle_per_host(0);

        let client = client_builder.build(https_connector);
        let transport = Transport::Https { client };
        let docker = Docker {
            transport: Arc::new(transport),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
            request_modifier: None,
        };

        Ok(docker)
    }
}
575
#[cfg(feature = "http")]
/// A Docker implementation typed to connect to an unsecure Http connection.
impl Docker {
    /// Connect using unsecured HTTP using defaults that are signalled by environment variables.
    ///
    /// # Defaults
    ///
    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable, and defaults
    ///    to `localhost:2375`.
    ///  - Idle connections are not kept in the client's pool (`pool_max_idle_per_host(0)`).
    ///  - The request timeout defaults to 2 minutes.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_http_defaults().unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_http_defaults() -> Result<Docker, Error> {
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_TCP_ADDRESS.to_string());
        Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connect using unsecured HTTP.
    ///
    /// # Arguments
    ///
    ///  - `addr`: connection url including scheme and port; a leading `tcp://` or `http://`
    ///    scheme is removed before the address is stored.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_http(
    ///                    "http://my-custom-docker-server:2735", 4, API_DEFAULT_VERSION)
    ///                    .unwrap();
    /// connection.ping()
    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_http(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        // This ensures that using docker-machine-esque addresses work with Hyper.
        // Only a *leading* scheme is removed; the rest of the address is left untouched.
        let without_tcp = addr.strip_prefix("tcp://").unwrap_or(addr);
        let client_addr = without_tcp
            .strip_prefix("http://")
            .unwrap_or(without_tcp)
            .to_owned();

        let http_connector = HttpConnector::new();

        let mut client_builder = Client::builder(TokioExecutor::new());
        client_builder.pool_max_idle_per_host(0);

        let client = client_builder.build(http_connector);
        let transport = Transport::Http { client };
        let docker = Docker {
            transport: Arc::new(transport),
            client_type: ClientType::Http,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
            request_modifier: None,
        };

        Ok(docker)
    }
}
655
656/// A Docker implementation typed to custom connector.
657impl Docker {
658    /// Connect using custom transport implementation.
659    /// It has default implementation for `Fn(Request) -> Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + Sync`
660    ///
661    /// # Arguments
662    ///
663    ///  - `transport`: transport.
664    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
665    ///  - `client_version`: the client version to communicate with the server.
666    ///
667    /// # Examples
668    ///
669    /// ```rust,no_run
670    /// use bollard::{API_DEFAULT_VERSION, Docker, BollardRequest};
671    /// use futures_util::future::TryFutureExt;
672    /// use futures_util::FutureExt;
673    ///
674    /// let http_connector = hyper_util::client::legacy::connect::HttpConnector::new();
675    ///
676    /// let mut client_builder = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new());
677    /// client_builder.pool_max_idle_per_host(0);
678    ///
679    /// let client = std::sync::Arc::new(client_builder.build(http_connector));
680    ///
681    /// let connection = Docker::connect_with_custom_transport(
682    ///     move |req: BollardRequest| {
683    ///         let client = std::sync::Arc::clone(&client);
684    ///         Box::pin(async move {
685    ///             let (p, b) = req.into_parts();
686    ///             // let _prev = p.headers.insert("host", host);
687    ///             // let mut uri = p.uri.into_parts();
688    ///             //uri.path_and_query = uri.path_and_query.map(|paq|
689    ///             //   uri::PathAndQuery::try_from("/docker".to_owned() + paq.as_str())
690    ///             // ).transpose().map_err(bollard::errors::Error::from)?;
691    ///             // p.uri = uri.try_into().map_err(bollard::errors::Error::from)?;
692    ///             let req = BollardRequest::from_parts(p, b);
693    ///             client.request(req).await.map_err(bollard::errors::Error::from)
694    ///         })
695    ///     },
696    ///     Some("http://my-custom-docker-server:2735"),
697    ///     4,
698    ///     bollard::API_DEFAULT_VERSION,
699    /// ).unwrap();
700    ///
701    /// connection.ping()
702    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
703    /// ```
704    pub fn connect_with_custom_transport<S: Into<String>>(
705        transport: impl CustomTransport + 'static,
706        client_addr: Option<S>,
707        timeout: u64,
708        client_version: &ClientVersion,
709    ) -> Result<Docker, Error> {
710        let client_addr = client_addr.map(Into::into).unwrap_or_default();
711        let (scheme, client_addr) = client_addr
712            .split_once("://")
713            .unwrap_or(("", client_addr.as_str()));
714        let client_addr = client_addr.to_owned();
715        let scheme = scheme.to_owned();
716        let transport = Transport::Custom {
717            transport: Box::new(transport),
718        };
719        let docker = Docker {
720            transport: Arc::new(transport),
721            client_type: ClientType::Custom { scheme },
722            client_addr,
723            client_timeout: timeout,
724            version: Arc::new((
725                AtomicUsize::new(client_version.major_version),
726                AtomicUsize::new(client_version.minor_version),
727            )),
728            request_modifier: None,
729        };
730
731        Ok(docker)
732    }
733}
734
/// A Docker implementation that wraps away which local implementation we are calling.
#[cfg(all(feature = "pipe", any(unix, windows)))]
impl Docker {
    /// Connect to the platform's local endpoint -- a Unix socket or a Windows named pipe --
    /// using the standard docker locations.
    ///
    /// # Defaults
    ///
    ///  - The unix socket location defaults to `/var/run/docker.sock`. The windows named pipe
    ///    location defaults to `//./pipe/docker_engine`.
    ///  - The request timeout defaults to 2 minutes.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::Docker;
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_socket_defaults().unwrap();
    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_socket_defaults() -> Result<Docker, Error> {
        #[cfg(unix)]
        let default_endpoint = DEFAULT_SOCKET;
        #[cfg(windows)]
        let default_endpoint = DEFAULT_NAMED_PIPE;

        Docker::connect_with_socket(default_endpoint, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connect using a Unix socket or a Windows named pipe.
    ///
    /// Before connecting, the path (with any `unix://` / `npipe://` prefix removed) is
    /// checked for existence; a missing path is reported as `SocketNotFoundError`.
    ///
    /// # Arguments
    ///
    ///  - `path`: connection unix socket path or windows named pipe path.
    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
    ///  - `client_version`: the client version to communicate with the server.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use bollard::{API_DEFAULT_VERSION, Docker};
    ///
    /// use futures_util::future::TryFutureExt;
    ///
    /// let connection = Docker::connect_with_socket("/var/run/docker.sock", 120, API_DEFAULT_VERSION).unwrap();
    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
    /// ```
    pub fn connect_with_socket(
        path: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        // The existence check needs a bare filesystem path, so drop the scheme first.
        let socket_path = path
            .trim_start_matches("unix://")
            .trim_start_matches("npipe://");

        if Path::new(socket_path).exists() {
            // The OS-specific constructor receives the original `path`, not the trimmed one.
            #[cfg(unix)]
            return Docker::connect_with_unix(path, timeout, client_version);
            #[cfg(windows)]
            return Docker::connect_with_named_pipe(path, timeout, client_version);
        }

        Err(SocketNotFoundError(socket_path.to_string()))
    }

    /// Connect using the local machine connection method with default arguments.
    ///
    /// Delegates to [`Docker::connect_with_unix_defaults`] on Unix or
    /// `connect_with_named_pipe_defaults` on Windows.
    ///
    /// To connect to Podman instead, use [`Docker::connect_with_podman_defaults`].
    pub fn connect_with_local_defaults() -> Result<Docker, Error> {
        #[cfg(unix)]
        let connected = Docker::connect_with_unix_defaults();
        #[cfg(windows)]
        let connected = Docker::connect_with_named_pipe_defaults();

        connected
    }

    /// Connect using the local machine connection method with supplied arguments.
    ///
    /// This is a simple wrapper over the OS specific handlers:
    ///  * Unix: [`Docker::connect_with_unix`]
    ///  * Windows: `Docker::connect_with_named_pipe`
    pub fn connect_with_local(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        #[cfg(unix)]
        let connected = Docker::connect_with_unix(addr, timeout, client_version);
        #[cfg(windows)]
        let connected = Docker::connect_with_named_pipe(addr, timeout, client_version);

        connected
    }
}
836
837/// A Docker implementation with defaults.
838impl Docker {
839    /// Connect using a Unix socket, a Windows named pipe, or via HTTP.
840    /// The connection method is determined by the `DOCKER_HOST` environment variable.
841    ///
842    /// # Examples
843    ///
844    /// ```rust,no_run
845    /// use bollard::{API_DEFAULT_VERSION, Docker};
846    ///
847    /// use futures_util::future::TryFutureExt;
848    ///
849    /// let connection = Docker::connect_with_defaults().unwrap();
850    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
851    /// ```
852    pub fn connect_with_defaults() -> Result<Docker, Error> {
853        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
854        Self::connect_with_host(&host)
855    }
856
857    /// Connect using a Unix socket, a Windows named pipe, or via HTTP.
858    /// The connection method is determined by `host` parameter.
859    ///
860    /// # Examples
861    ///
862    /// ```rust,no_run
863    /// use bollard::{API_DEFAULT_VERSION, Docker};
864    ///
865    /// use futures_util::future::TryFutureExt;
866    ///
867    /// let connection = Docker::connect_with_host("unix:///var/run/docker.sock").unwrap();
868    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
869    /// ```
870    pub fn connect_with_host(host: &str) -> Result<Docker, Error> {
871        match host {
872            #[cfg(all(feature = "pipe", unix))]
873            h if h.starts_with("unix://") => {
874                Docker::connect_with_unix(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
875            }
876            #[cfg(all(feature = "pipe", windows))]
877            h if h.starts_with("npipe://") => {
878                Docker::connect_with_named_pipe(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
879            }
880            #[cfg(feature = "http")]
881            h if h.starts_with("tcp://") || h.starts_with("http://") => {
882                #[cfg(feature = "ssl_providerless")]
883                if env::var("DOCKER_TLS_VERIFY").is_ok() {
884                    return Docker::connect_with_ssl_default_certs(host);
885                }
886                Docker::connect_with_http(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
887            }
888            #[cfg(feature = "ssl_providerless")]
889            h if h.starts_with("https://") => Docker::connect_with_ssl_default_certs(host),
890            #[cfg(feature = "ssh")]
891            h if h.starts_with("ssh://") => {
892                Docker::connect_with_ssh(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION, None)
893            }
894            _ => Err(UnsupportedURISchemeError {
895                uri: host.to_string(),
896            }),
897        }
898    }
899}
900
901#[cfg(all(feature = "pipe", unix))]
902/// A Docker implementation typed to connect to a Unix socket.
903impl Docker {
904    /// Connect using a Unix socket using defaults common to the standard docker configuration.
905    ///
906    /// # Defaults
907    ///
908    ///  - The socket location defaults to the value of `DEFAULT_SOCKET` env if its set and the URL
909    ///    has `unix` scheme; otherwise `/var/run/docker.sock`.
910    ///  - The request timeout defaults to 2 minutes.
911    ///
912    /// # Examples
913    ///
914    /// ```rust,no_run
915    /// use bollard::Docker;
916    ///
917    /// use futures_util::future::TryFutureExt;
918    ///
919    /// let connection = Docker::connect_with_unix_defaults().unwrap();
920    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
921    /// ```
922    pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
923        // Using 3 variables to not have to copy/allocate `DEFAULT_SOCKET`.
924        let socket_path = env::var("DOCKER_HOST").ok().and_then(|p| {
925            if p.starts_with("unix://") {
926                Some(p)
927            } else {
928                None
929            }
930        });
931        let path = socket_path.as_deref();
932        let path_ref: &str = path.unwrap_or(DEFAULT_SOCKET);
933        Docker::connect_with_unix(path_ref, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
934    }
935
936    /// Resolve the rootless Podman socket path for the current user.
937    ///
938    /// Checks `$XDG_RUNTIME_DIR/podman/podman.sock` first, then falls back to
939    /// `/run/user/$UID/podman/podman.sock`. Returns `None` if neither exists.
940    #[cfg(unix)]
941    fn podman_rootless_socket_path() -> Option<String> {
942        // Prefer XDG_RUNTIME_DIR (the canonical way)
943        if let Ok(xrd) = env::var("XDG_RUNTIME_DIR") {
944            let sock = format!("{xrd}/podman/podman.sock");
945            if Path::new(&sock).exists() {
946                return Some(sock);
947            }
948        }
949
950        // Fall back to /run/user/$UID/podman/podman.sock
951        #[cfg(unix)]
952        {
953            use std::os::unix::fs::MetadataExt;
954            if let Ok(meta) = std::fs::metadata("/proc/self") {
955                let sock = DEFAULT_PODMAN_SOCKET_TEMPLATE.replace("{UID}", &meta.uid().to_string());
956                if Path::new(&sock).exists() {
957                    return Some(sock);
958                }
959            }
960        }
961
962        None
963    }
964
965    /// Resolve the Podman system socket path.
966    ///
967    /// Checks `/run/podman/podman.sock`. Returns `None` if it doesn't exist.
968    #[cfg(unix)]
969    fn podman_system_socket_path() -> Option<&'static str> {
970        let path = DEFAULT_PODMAN_SYSTEM_SOCKET
971            .strip_prefix("unix://")
972            .unwrap_or(DEFAULT_PODMAN_SYSTEM_SOCKET);
973        if Path::new(path).exists() {
974            Some(path)
975        } else {
976            None
977        }
978    }
979
980    /// Connect to a Podman socket with default arguments.
981    ///
982    /// # Socket discovery order
983    ///
984    /// 1. `$DOCKER_HOST` — if set and starts with `unix://`, used directly.
985    /// 2. Rootless Podman: `$XDG_RUNTIME_DIR/podman/podman.sock`
986    /// 3. Rootless Podman: `/run/user/$UID/podman/podman.sock`
987    /// 4. System Podman: `/run/podman/podman.sock`
988    /// 5. Falls back to the default Docker socket (`/var/run/docker.sock`).
989    ///
990    /// # Examples
991    ///
992    /// ```rust,no_run
993    /// use bollard::Docker;
994    ///
995    /// use futures_util::future::TryFutureExt;
996    ///
997    /// let connection = Docker::connect_with_podman_defaults().unwrap();
998    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
999    /// ```
1000    #[cfg(unix)]
1001    pub fn connect_with_podman_defaults() -> Result<Docker, Error> {
1002        // Honour explicit DOCKER_HOST first
1003        if let Some(host) = env::var("DOCKER_HOST")
1004            .ok()
1005            .filter(|p| p.starts_with("unix://"))
1006        {
1007            return Docker::connect_with_unix(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION);
1008        }
1009
1010        // Probe for Podman rootless socket
1011        if let Some(sock) = Self::podman_rootless_socket_path() {
1012            return Docker::connect_with_unix(&sock, DEFAULT_TIMEOUT, API_DEFAULT_VERSION);
1013        }
1014
1015        // Probe for Podman system socket
1016        if let Some(sock) = Self::podman_system_socket_path() {
1017            return Docker::connect_with_unix(sock, DEFAULT_TIMEOUT, API_DEFAULT_VERSION);
1018        }
1019
1020        // Fall back to default Docker socket
1021        Docker::connect_with_unix(DEFAULT_SOCKET, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
1022    }
1023
1024    /// Connect using a Unix socket.
1025    ///
1026    /// # Arguments
1027    ///
1028    ///  - `addr`: connection socket path.
1029    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
1030    ///  - `client_version`: the client version to communicate with the server.
1031    ///
1032    /// # Examples
1033    ///
1034    /// ```rust,no_run
1035    /// use bollard::{API_DEFAULT_VERSION, Docker};
1036    ///
1037    /// use futures_util::future::TryFutureExt;
1038    ///
1039    /// let connection = Docker::connect_with_unix("/var/run/docker.sock", 120, API_DEFAULT_VERSION).unwrap();
1040    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
1041    /// ```
1042    pub fn connect_with_unix(
1043        path: &str,
1044        timeout: u64,
1045        client_version: &ClientVersion,
1046    ) -> Result<Docker, Error> {
1047        let client_addr = path.replacen("unix://", "", 1);
1048
1049        // check if the socket file exists and is accessible
1050        if !Path::new(&client_addr).exists() {
1051            return Err(SocketNotFoundError(client_addr));
1052        }
1053
1054        let unix_connector = UnixConnector;
1055
1056        let mut client_builder = Client::builder(TokioExecutor::new());
1057        client_builder.pool_max_idle_per_host(0);
1058
1059        let client = client_builder.build(unix_connector);
1060        let transport = Transport::Unix { client };
1061        let docker = Docker {
1062            transport: Arc::new(transport),
1063            client_type: ClientType::Unix,
1064            client_addr,
1065            client_timeout: timeout,
1066            version: Arc::new((
1067                AtomicUsize::new(client_version.major_version),
1068                AtomicUsize::new(client_version.minor_version),
1069            )),
1070            request_modifier: None,
1071        };
1072
1073        Ok(docker)
1074    }
1075}
1076
1077#[cfg(all(feature = "pipe", windows))]
1078/// A Docker implementation typed to connect to a Windows Named Pipe, exclusive to the windows
1079/// target.
1080impl Docker {
1081    /// Connect using a Windows Named Pipe using defaults that are common to the standard docker
1082    /// configuration.
1083    ///
1084    /// # Defaults
1085    ///
1086    ///  - The socket location defaults to `//./pipe/docker_engine`.
1087    ///  - The request timeout defaults to 2 minutes.
1088    ///
1089    /// # Examples
1090    ///
1091    /// ```rust,no_run
1092    /// use bollard::Docker;
1093    ///
1094    /// use futures_util::future::TryFutureExt;
1095    ///
1096    /// let connection = Docker::connect_with_named_pipe_defaults().unwrap();
1097    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
1098    ///
1099    /// ```
1100    pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
1101        Docker::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
1102    }
1103
1104    /// Connect using a Windows Named Pipe.
1105    ///
1106    /// # Arguments
1107    ///
1108    ///  - `addr`: socket location.
1109    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
1110    ///  - `client_version`: the client version to communicate with the server.
1111    ///
1112    /// # Examples
1113    ///
1114    /// ```rust,no_run
1115    /// use bollard::{API_DEFAULT_VERSION, Docker};
1116    ///
1117    /// use futures_util::future::TryFutureExt;
1118    ///
1119    /// let connection = Docker::connect_with_named_pipe(
1120    ///     "//./pipe/docker_engine", 120, API_DEFAULT_VERSION).unwrap();
1121    /// connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!")));
1122    ///
1123    /// ```
1124    pub fn connect_with_named_pipe(
1125        path: &str,
1126        timeout: u64,
1127        client_version: &ClientVersion,
1128    ) -> Result<Docker, Error> {
1129        let client_addr = path.replacen("npipe://", "", 1);
1130
1131        let named_pipe_connector = NamedPipeConnector;
1132
1133        let mut client_builder = Client::builder(TokioExecutor::new());
1134        client_builder.http1_title_case_headers(true);
1135        client_builder.pool_max_idle_per_host(0);
1136
1137        let client = client_builder.build(named_pipe_connector);
1138        let transport = Transport::NamedPipe { client };
1139        let docker = Docker {
1140            transport: Arc::new(transport),
1141            client_type: ClientType::NamedPipe,
1142            client_addr,
1143            client_timeout: timeout,
1144            version: Arc::new((
1145                AtomicUsize::new(client_version.major_version),
1146                AtomicUsize::new(client_version.minor_version),
1147            )),
1148            request_modifier: None,
1149        };
1150
1151        Ok(docker)
1152    }
1153}
1154
1155#[cfg(feature = "ssh")]
1156/// A Docker implementation typed to connect to an SSH connection.
1157impl Docker {
1158    /// Connect using SSH using defaults that are signalled by environment variables.
1159    ///
1160    /// # Defaults
1161    ///
1162    ///  - The connection url is sourced from the `DOCKER_HOST` environment variable, and defaults
1163    ///    to `ssh://localhost`.
1164    ///  - The number of threads used for the HTTP connection pool defaults to 1.
1165    ///  - The request timeout defaults to 2 minutes.
1166    ///
1167    /// # Examples
1168    ///
1169    /// ```rust,no_run
1170    /// use bollard::Docker;
1171    ///
1172    /// use futures_util::future::TryFutureExt;
1173    ///
1174    /// let connection = Docker::connect_with_ssh_defaults().unwrap();
1175    /// connection.ping()
1176    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
1177    /// ```
1178    pub fn connect_with_ssh_defaults() -> Result<Docker, Error> {
1179        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_SSH_ADDRESS.to_string());
1180        Docker::connect_with_ssh(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION, None)
1181    }
1182
1183    /// Connect using SSH.
1184    ///
1185    /// # Arguments
1186    ///
1187    ///  - `addr`: connection url including scheme and port.
1188    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
1189    ///  - `client_version`: the client version to communicate with the server.
1190    ///
1191    /// # Examples
1192    ///
1193    /// ```rust,no_run
1194    /// use bollard::{API_DEFAULT_VERSION, Docker};
1195    ///
1196    /// use futures_util::future::TryFutureExt;
1197    ///
1198    /// let connection = Docker::connect_with_ssh(
1199    ///                    "ssh://user@my-custom-docker-server", 4, API_DEFAULT_VERSION, None)
1200    ///                    .unwrap();
1201    /// connection.ping()
1202    ///   .map_ok(|_| Ok::<_, ()>(println!("Connected!")));
1203    /// ```
1204    pub fn connect_with_ssh(
1205        addr: &str,
1206        timeout: u64,
1207        client_version: &ClientVersion,
1208        keypair_path: Option<String>,
1209    ) -> Result<Docker, Error> {
1210        let client_addr = addr.replacen("ssh://", "", 1);
1211
1212        let ssh_connector = match keypair_path {
1213            Some(path) => crate::ssh::SshConnector::with_keypair(path.to_string()),
1214            None => crate::ssh::SshConnector::new(),
1215        };
1216
1217        let client_builder = Client::builder(TokioExecutor::new());
1218
1219        let client = client_builder.build(ssh_connector);
1220        let transport = Transport::Ssh { client };
1221        let docker = Docker {
1222            transport: Arc::new(transport),
1223            client_type: ClientType::Ssh,
1224            client_addr,
1225            client_timeout: timeout,
1226            version: Arc::new((
1227                AtomicUsize::new(client_version.major_version),
1228                AtomicUsize::new(client_version.minor_version),
1229            )),
1230            request_modifier: None,
1231        };
1232
1233        Ok(docker)
1234    }
1235}
1236
1237#[cfg(test)]
1238impl Docker {
1239    ///
1240    ///  - `connector`: a `HostToReplyConnector` as defined in `yup_hyper_mock`
1241    ///  - `client_addr`: location to connect to.
1242    ///  - `timeout`: the read/write timeout (seconds) to use for every hyper connection
1243    ///  - `client_version`: the client version to communicate with the server.
1244    ///
1245    /// # Examples
1246    ///
1247    /// ```rust,no_run
1248    /// # extern crate bollard;
1249    /// # extern crate futures;
1250    /// # extern crate yup_hyper_mock;
1251    /// # fn main () {
1252    /// use bollard::{API_DEFAULT_VERSION, Docker};
1253    ///
1254    /// use futures::future::Future;
1255    ///
1256    /// # use yup_hyper_mock::HostToReplyConnector;
1257    /// let mut connector = HostToReplyConnector::default();
1258    /// connector.m.insert(
1259    ///   String::from("http://127.0.0.1"),
1260    ///   "HTTP/1.1 200 OK\r\nServer: mock1\r\nContent-Type: application/json\r\nContent-Length: 0\r\n\r\n".to_string()
1261    /// );
1262    /// let connection = Docker::connect_with_mock(connector, "127.0.0.1".to_string(), 5, API_DEFAULT_VERSION).unwrap();
1263    /// connection.ping()
1264    ///   .and_then(|_| Ok(println!("Connected!")));
1265    /// # }
1266    /// ```
1267    pub fn connect_with_mock(
1268        connector: yup_hyper_mock::HostToReplyConnector,
1269        client_addr: String,
1270        timeout: u64,
1271        client_version: &ClientVersion,
1272    ) -> Result<Docker, Error> {
1273        let client_builder = Client::builder(TokioExecutor::new());
1274        let client = client_builder.build(connector);
1275
1276        let (transport, client_type) = (Transport::Mock { client }, ClientType::Http);
1277
1278        let docker = Docker {
1279            transport: Arc::new(transport),
1280            client_type,
1281            client_addr,
1282            client_timeout: timeout,
1283            version: Arc::new((
1284                AtomicUsize::new(client_version.major_version),
1285                AtomicUsize::new(client_version.minor_version),
1286            )),
1287            request_modifier: None,
1288        };
1289
1290        Ok(docker)
1291    }
1292}
1293
1294impl Docker {
1295    /// Set the request timeout.
1296    ///
1297    /// This timeout is shared by all requests to the Docker Engine API.
1298    ///
1299    /// By default, 2 minutes.
1300    pub fn with_timeout(mut self, timeout: Duration) -> Self {
1301        self.set_timeout(timeout);
1302        self
1303    }
1304
1305    /// Get the current timeout.
1306    ///
1307    /// This timeout is shared by all requests to the Docker Engine API.
1308    pub fn timeout(&self) -> Duration {
1309        Duration::from_secs(self.client_timeout)
1310    }
1311
1312    /// Set the request timeout.
1313    ///
1314    /// This timeout is shared by all requests to the Docker Engine API.
1315    ///
1316    /// By default, 2 minutes.
1317    pub fn set_timeout(&mut self, timeout: Duration) {
1318        self.client_timeout = timeout.as_secs();
1319    }
1320
1321    /// Set a request modifier callback that runs before each request.
1322    ///
1323    /// This callback can modify the request before it is sent to the Docker Engine API.
1324    /// Useful for setting `User-Agent` headers or other request modifications.
1325    ///
1326    /// # Example
1327    ///
1328    /// ```rust,no_run
1329    /// use bollard::Docker;
1330    /// use http::header::{HeaderValue, USER_AGENT};
1331    ///
1332    /// let docker = Docker::connect_with_socket_defaults()
1333    ///     .unwrap()
1334    ///     .with_request_modifier(|mut req| {
1335    ///         req.headers_mut().insert(USER_AGENT, HeaderValue::from_static("my-app/1.0"));
1336    ///         req
1337    ///     });
1338    /// ```
1339    pub fn with_request_modifier<F>(mut self, modifier: F) -> Self
1340    where
1341        F: Fn(BollardRequest) -> BollardRequest + Send + Sync + 'static,
1342    {
1343        self.request_modifier = Some(Arc::new(modifier));
1344        self
1345    }
1346}
1347
1348// The implementation block for Docker requests
1349impl Docker {
1350    pub(crate) fn process_into_value<T>(
1351        &self,
1352        req: Result<Request<BodyType>, Error>,
1353    ) -> impl Future<Output = Result<T, Error>>
1354    where
1355        T: DeserializeOwned,
1356    {
1357        let fut = self.process_request(req);
1358        async move { Docker::decode_response(fut.await?).await }
1359    }
1360
1361    pub(crate) fn process_into_stream<T>(
1362        &self,
1363        req: Result<Request<BodyType>, Error>,
1364    ) -> impl Stream<Item = Result<T, Error>> + Unpin
1365    where
1366        T: DeserializeOwned,
1367    {
1368        Box::pin(
1369            self.process_request(req)
1370                .map_ok(Docker::decode_into_stream::<T>)
1371                .into_stream()
1372                .try_flatten(),
1373        )
1374    }
1375
1376    pub(crate) fn process_into_stream_string(
1377        &self,
1378        req: Result<Request<BodyType>, Error>,
1379    ) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
1380        Box::pin(
1381            self.process_request(req)
1382                .map_ok(Docker::decode_into_stream_string)
1383                .try_flatten_stream(),
1384        )
1385    }
1386
1387    pub(crate) fn process_into_unit(
1388        &self,
1389        req: Result<Request<BodyType>, Error>,
1390    ) -> impl Future<Output = Result<(), Error>> {
1391        let fut = self.process_request(req);
1392        async move {
1393            fut.await?;
1394            Ok(())
1395        }
1396    }
1397
1398    pub(crate) fn process_into_body(
1399        &self,
1400        req: Result<Request<BodyType>, Error>,
1401    ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
1402        Box::pin(
1403            self.process_request(req)
1404                .map_ok(|response| IncomingStream::new(response.into_body()))
1405                .into_stream()
1406                .try_flatten(),
1407        )
1408    }
1409
1410    pub(crate) fn process_into_string(
1411        &self,
1412        req: Result<Request<BodyType>, Error>,
1413    ) -> impl Future<Output = Result<String, Error>> {
1414        let fut = self.process_request(req);
1415        async move {
1416            let response = fut.await?;
1417            Docker::decode_into_string(response).await
1418        }
1419    }
1420
1421    pub(crate) async fn process_upgraded(
1422        &self,
1423        req: Result<Request<BodyType>, Error>,
1424    ) -> Result<(impl AsyncRead, impl AsyncWrite), Error> {
1425        let res = self.process_request(req).await?;
1426        let upgraded = hyper::upgrade::on(res).await?;
1427        let tokio_upgraded = AsyncUpgraded::new(upgraded);
1428
1429        Ok(split(tokio_upgraded))
1430    }
1431
1432    #[cfg(all(feature = "websocket", unix))]
1433    pub(crate) async fn process_websocket<O>(
1434        &self,
1435        path: &str,
1436        query: Option<O>,
1437    ) -> Result<WebSocketStream<tokio::net::UnixStream>, Error>
1438    where
1439        O: Serialize,
1440    {
1441        use tokio_tungstenite::client_async;
1442
1443        let query_string = match query {
1444            Some(q) => {
1445                let qs = serde_urlencoded::to_string(&q)?;
1446                if qs.is_empty() {
1447                    String::new()
1448                } else {
1449                    format!("?{}", qs)
1450                }
1451            }
1452            None => String::new(),
1453        };
1454
1455        let ws_uri = format!("ws://localhost{}{}", path, query_string);
1456        debug!("WebSocket URI: {}", ws_uri);
1457
1458        let stream = tokio::net::UnixStream::connect(&self.client_addr).await?;
1459
1460        let (ws_stream, _response) = client_async(&ws_uri, stream)
1461            .await
1462            .map_err(|e| Error::WebSocketError { err: Box::new(e) })?;
1463
1464        Ok(ws_stream)
1465    }
1466
1467    #[cfg(all(feature = "websocket", not(unix)))]
1468    pub(crate) async fn process_websocket<O>(
1469        &self,
1470        path: &str,
1471        query: Option<O>,
1472    ) -> Result<WebSocketStream<tokio::net::TcpStream>, Error>
1473    where
1474        O: Serialize,
1475    {
1476        use tokio_tungstenite::client_async;
1477
1478        let query_string = match query {
1479            Some(q) => {
1480                let qs = serde_urlencoded::to_string(&q)?;
1481                if qs.is_empty() {
1482                    String::new()
1483                } else {
1484                    format!("?{}", qs)
1485                }
1486            }
1487            None => String::new(),
1488        };
1489
1490        let ws_uri = format!("ws://{}{}{}", self.client_addr, path, query_string);
1491        debug!("WebSocket URI: {}", ws_uri);
1492
1493        let stream = tokio::net::TcpStream::connect(&self.client_addr).await?;
1494
1495        let (ws_stream, _response) = client_async(&ws_uri, stream)
1496            .await
1497            .map_err(|e| Error::WebSocketError { err: Box::new(e) })?;
1498
1499        Ok(ws_stream)
1500    }
1501
1502    pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<BodyType, Error>
1503    where
1504        S: Serialize,
1505    {
1506        match body.map(|inst| serde_json::to_string(&inst)) {
1507            Some(Ok(res)) => Ok(Some(res)),
1508            Some(Err(e)) => Err(e.into()),
1509            None => Ok(None),
1510        }
1511        .map(|payload| {
1512            debug!("{}", payload.clone().unwrap_or_default());
1513            payload
1514                .map(|content| BodyType::Left(Full::new(content.into())))
1515                .unwrap_or(BodyType::Left(Full::new(Bytes::new())))
1516        })
1517    }
1518
1519    /// Return the currently set client version.
1520    pub fn client_version(&self) -> ClientVersion {
1521        self.version.as_ref().into()
1522    }
1523
1524    /// Check with the server for a supported version, and downgrade the client version if
1525    /// appropriate.
1526    ///
1527    /// # Examples:
1528    ///
1529    /// ```rust,no_run
1530    ///     use bollard::Docker;
1531    ///
1532    ///     let docker = Docker::connect_with_http_defaults().unwrap();
1533    ///     async move {
1534    ///         &docker.negotiate_version().await.unwrap().version();
1535    ///     };
1536    /// ```
1537    pub async fn negotiate_version(self) -> Result<Self, Error> {
1538        let req = self.build_request(
1539            "/version",
1540            Builder::new().method(Method::GET),
1541            None::<String>,
1542            Ok(BodyType::Left(Full::new(Bytes::new()))),
1543        );
1544
1545        let res = self
1546            .process_into_value::<crate::models::SystemVersion>(req)
1547            .await?;
1548
1549        let server_version: ClientVersion = if let Some(api_version) = res.api_version {
1550            match api_version.into() {
1551                MaybeClientVersion::Some(client_version) => client_version,
1552                MaybeClientVersion::None => {
1553                    return Err(APIVersionParseError {});
1554                }
1555            }
1556        } else {
1557            return Err(APIVersionParseError {});
1558        };
1559
1560        if server_version < self.client_version() {
1561            self.version
1562                .0
1563                .store(server_version.major_version, Ordering::Relaxed);
1564            self.version
1565                .1
1566                .store(server_version.minor_version, Ordering::Relaxed);
1567        }
1568
1569        Ok(self)
1570    }
1571
1572    pub(crate) fn process_request(
1573        &self,
1574        request: Result<Request<BodyType>, Error>,
1575    ) -> impl Future<Output = Result<Response<Incoming>, Error>> {
1576        let transport = self.transport.clone();
1577        let timeout = self.client_timeout;
1578
1579        match request.as_ref().map(|b| b.body()) {
1580            Ok(http_body_util::Either::Left(bytes)) => trace!("request: {bytes:?}"),
1581            Ok(http_body_util::Either::Right(_)) => trace!("request: (stream)"),
1582            Err(e) => trace!("request: Err({e:?}"),
1583        };
1584
1585        async move {
1586            let request = request?;
1587            let response = Docker::execute_request(transport, request, timeout).await?;
1588
1589            let status = response.status();
1590            match status {
1591                // Status code 200 - 299 or 304
1592                s if s.is_success() || s == StatusCode::NOT_MODIFIED => Ok(response),
1593
1594                StatusCode::SWITCHING_PROTOCOLS => Ok(response),
1595
1596                // All other status codes
1597                _ => {
1598                    let contents = Docker::decode_into_string(response).await?;
1599
1600                    let mut message = String::new();
1601                    if !contents.is_empty() {
1602                        message = serde_json::from_str::<DockerServerErrorMessage>(&contents)
1603                            .map(|msg| msg.message)
1604                            .or_else(|e| {
1605                                if e.is_data() || e.is_syntax() {
1606                                    Ok(contents)
1607                                } else {
1608                                    Err(e)
1609                                }
1610                            })?;
1611                    }
1612                    Err(DockerResponseServerError {
1613                        status_code: status.as_u16(),
1614                        message,
1615                    })
1616                }
1617            }
1618        }
1619    }
1620
1621    pub(crate) fn build_request<O>(
1622        &self,
1623        path: &str,
1624        builder: Builder,
1625        query: Option<O>,
1626        payload: Result<BodyType, Error>,
1627    ) -> Result<Request<BodyType>, Error>
1628    where
1629        O: Serialize,
1630    {
1631        let uri = Uri::parse(
1632            &self.client_addr,
1633            &self.client_type,
1634            path,
1635            query,
1636            &self.client_version(),
1637        )?;
1638        let request_uri: hyper::Uri = uri.try_into()?;
1639        debug!("{}", &request_uri);
1640
1641        let request = builder
1642            .uri(request_uri)
1643            .header(CONTENT_TYPE, "application/json")
1644            .body(payload?)?;
1645
1646        let request = if let Some(modifier) = &self.request_modifier {
1647            modifier(request)
1648        } else {
1649            request
1650        };
1651
1652        Ok(request)
1653    }
1654
1655    pub(crate) fn build_request_with_registry_auth<O>(
1656        &self,
1657        path: &str,
1658        mut builder: Builder,
1659        query: Option<O>,
1660        payload: Result<BodyType, Error>,
1661        credentials: DockerCredentialsHeader,
1662    ) -> Result<Request<BodyType>, Error>
1663    where
1664        O: Serialize,
1665    {
1666        match credentials {
1667            DockerCredentialsHeader::Config(Some(config)) => {
1668                let value = base64_url_encode(&serde_json::to_string(&config)?);
1669                builder = builder.header("X-Registry-Config", value)
1670            }
1671            DockerCredentialsHeader::Auth(Some(config)) => {
1672                let value = base64_url_encode(&serde_json::to_string(&config)?);
1673                builder = builder.header("X-Registry-Auth", value)
1674            }
1675            _ => {}
1676        }
1677
1678        self.build_request(path, builder, query, payload)
1679    }
1680
1681    async fn execute_request(
1682        transport: Arc<Transport>,
1683        req: Request<BodyType>,
1684        timeout: u64,
1685    ) -> Result<Response<Incoming>, Error> {
1686        // This is where we determine to which transport we issue the request.
1687        let request = match *transport {
1688            #[cfg(feature = "http")]
1689            Transport::Http { ref client } => client.request(req).map_err(Error::from).boxed(),
1690            #[cfg(feature = "ssl_providerless")]
1691            Transport::Https { ref client } => client.request(req).map_err(Error::from).boxed(),
1692            #[cfg(all(feature = "pipe", unix))]
1693            Transport::Unix { ref client } => client.request(req).map_err(Error::from).boxed(),
1694            #[cfg(all(feature = "pipe", windows))]
1695            Transport::NamedPipe { ref client } => client.request(req).map_err(Error::from).boxed(),
1696            #[cfg(feature = "ssh")]
1697            Transport::Ssh { ref client } => client.request(req).map_err(Error::from).boxed(),
1698            #[cfg(test)]
1699            Transport::Mock { ref client } => client.request(req).map_err(Error::from).boxed(),
1700            Transport::Custom { ref transport } => transport.request(req).boxed(),
1701        };
1702
1703        match tokio::time::timeout(Duration::from_secs(timeout), request).await {
1704            Ok(v) => Ok(v?),
1705            Err(_) => Err(RequestTimeoutError),
1706        }
1707    }
1708
1709    fn decode_into_stream<T>(res: Response<Incoming>) -> impl Stream<Item = Result<T, Error>>
1710    where
1711        T: DeserializeOwned,
1712    {
1713        FramedRead::new(StreamReader::new(res.into_body()), JsonLineDecoder::new())
1714    }
1715
1716    fn decode_into_stream_string(
1717        res: Response<Incoming>,
1718    ) -> impl Stream<Item = Result<LogOutput, Error>> {
1719        FramedRead::new(
1720            StreamReader::new(res.into_body()),
1721            NewlineLogOutputDecoder::new(false),
1722        )
1723        .map_err(Error::from)
1724    }
1725
1726    async fn decode_into_string(response: Response<Incoming>) -> Result<String, Error> {
1727        let body = response.into_body().collect().await?.to_bytes();
1728
1729        Ok(String::from_utf8_lossy(&body).to_string())
1730    }
1731
1732    async fn decode_response<T>(response: Response<Incoming>) -> Result<T, Error>
1733    where
1734        T: DeserializeOwned,
1735    {
1736        let bytes = response.into_body().collect().await?.to_bytes();
1737
1738        debug!("Decoded into string: {}", &String::from_utf8_lossy(&bytes));
1739
1740        serde_json::from_slice::<T>(&bytes).map_err(|e| {
1741            if e.is_data() || e.is_syntax() {
1742                JsonDataError {
1743                    message: e.to_string(),
1744                    column: e.column(),
1745                    #[cfg(feature = "json_data_content")]
1746                    contents: String::from_utf8_lossy(&bytes).to_string(),
1747                }
1748            } else {
1749                e.into()
1750            }
1751        })
1752    }
1753}
1754
1755/// Either a stream or a full response
1756pub(crate) type BodyType = http_body_util::Either<
1757    Full<Bytes>,
1758    StreamBody<Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, std::io::Error>> + Send>>>,
1759>;
1760
1761/// Convenience method to wrap a stream of bytes into frames for a bollard BodyType
1762pub fn body_stream(body: impl Stream<Item = Bytes> + Send + 'static) -> BodyType {
1763    BodyType::Right(StreamBody::new(Box::pin(body.map(|a| Ok(Frame::data(a))))))
1764}
1765
1766/// Convenience method to wrap a stream of failable bytes into frames for a bollard BodyType
1767pub fn body_try_stream(
1768    body: impl Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
1769) -> BodyType {
1770    BodyType::Right(StreamBody::new(Box::pin(body.map_ok(Frame::data))))
1771}
1772
1773/// Convenience method to wrap bytes into a bollard BodyType
1774pub fn body_full(body: Bytes) -> BodyType {
1775    BodyType::Left(Full::new(body))
1776}
1777
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared support for tests that temporarily override environment variables.
    #[cfg(unix)]
    mod env_guard {
        use std::env;
        use std::ffi::OsString;
        use std::sync::{Mutex, MutexGuard, PoisonError};

        /// Environment variables are process-global while the test harness runs
        /// tests on several threads, so every override goes through this lock.
        static ENV_LOCK: Mutex<()> = Mutex::new(());

        /// RAII guard that sets an env var and restores the previous value on drop.
        ///
        /// The guard also holds `ENV_LOCK` for its whole lifetime, so tests
        /// that override the environment cannot interleave. Do not create a
        /// second guard on the same thread while one is alive: the lock is
        /// not re-entrant.
        pub(super) struct TempEnvVar {
            key: String,
            // `OsString` so a previous value that is not valid Unicode is
            // restored verbatim instead of being dropped.
            prev: Option<OsString>,
            _serialized: MutexGuard<'static, ()>,
        }

        impl TempEnvVar {
            /// Set `key` to `val`, remembering whatever was there before.
            pub(super) fn set(key: &str, val: &str) -> Self {
                // A test that panicked while holding the lock poisons it; the
                // protected data is `()`, so it is always safe to continue.
                let serialized = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
                let prev = env::var_os(key);
                env::set_var(key, val);
                Self {
                    key: key.to_string(),
                    prev,
                    _serialized: serialized,
                }
            }
        }

        impl Drop for TempEnvVar {
            fn drop(&mut self) {
                // Runs before the fields are dropped, i.e. while the lock is
                // still held, so the restore cannot race with another test.
                match &self.prev {
                    Some(v) => env::set_var(&self.key, v),
                    None => env::remove_var(&self.key),
                }
            }
        }
    }

    #[cfg(unix)]
    mod podman {
        use super::env_guard::TempEnvVar;
        use super::*;

        #[test]
        fn rootless_socket_from_xdg_runtime_dir() {
            let dir = tempfile::tempdir().unwrap();
            let sock_dir = dir.path().join("podman");
            std::fs::create_dir_all(&sock_dir).unwrap();
            let sock = sock_dir.join("podman.sock");
            std::fs::write(&sock, b"").unwrap();

            // Set XDG_RUNTIME_DIR to our temp dir
            let _guard = TempEnvVar::set("XDG_RUNTIME_DIR", dir.path().to_str().unwrap());

            let found = Docker::podman_rootless_socket_path();
            assert_eq!(found.as_deref(), Some(sock.to_str().unwrap()));
        }

        #[test]
        fn rootless_socket_returns_none_when_missing() {
            // Point XDG_RUNTIME_DIR at empty dir — no podman socket
            let dir = tempfile::tempdir().unwrap();
            let _guard = TempEnvVar::set("XDG_RUNTIME_DIR", dir.path().to_str().unwrap());

            let found = Docker::podman_rootless_socket_path();
            // May still find one via /run/user/$UID fallback on a Podman host,
            // but should not find one in the XDG dir we set.
            let xdg_sock = dir.path().join("podman/podman.sock");
            if let Some(ref path) = found {
                assert_ne!(path.as_str(), xdg_sock.to_str().unwrap());
            }
        }

        #[test]
        fn system_socket_returns_none_when_missing() {
            // System socket at /run/podman/podman.sock may or may not exist
            // depending on the host, but the function should not panic.
            let _ = Docker::podman_system_socket_path();
        }

        #[test]
        fn connect_with_podman_defaults_respects_docker_host() {
            // Create a fake socket
            let dir = tempfile::tempdir().unwrap();
            let sock = dir.path().join("test.sock");
            std::fs::write(&sock, b"").unwrap();

            let uri = format!("unix://{}", sock.display());
            let _guard = TempEnvVar::set("DOCKER_HOST", &uri);

            let docker = Docker::connect_with_podman_defaults().unwrap();
            assert_eq!(docker.client_addr, sock.to_str().unwrap());
        }
    }

    #[cfg(all(unix, feature = "pipe"))]
    mod docker_defaults {
        use super::env_guard::TempEnvVar;
        use super::*;

        #[test]
        fn connect_with_unix_defaults_respects_docker_host() {
            let dir = tempfile::tempdir().unwrap();
            let sock = dir.path().join("test.sock");
            std::fs::write(&sock, b"").unwrap();

            let uri = format!("unix://{}", sock.display());
            // Temporarily set DOCKER_HOST; restored even if an assertion below panics.
            let _guard = TempEnvVar::set("DOCKER_HOST", &uri);

            let docker = Docker::connect_with_unix_defaults().unwrap();
            assert_eq!(docker.client_addr, sock.to_str().unwrap());
        }

        #[test]
        fn connect_with_unix_defaults_ignores_non_unix_docker_host() {
            let _guard = TempEnvVar::set("DOCKER_HOST", "tcp://localhost:2375");

            // Should fall through to DEFAULT_SOCKET, which may or may not exist
            let result = Docker::connect_with_unix_defaults();
            // On a system without Docker, this errors with SocketNotFoundError — that's fine
            if let Err(SocketNotFoundError(addr)) = &result {
                assert!(addr.contains("docker.sock"));
            }
        }
    }
}