1use std::convert::Infallible;
2#[cfg(feature = "ssl_providerless")]
3use std::fs;
4use std::future::Future;
5#[cfg(feature = "ssl_providerless")]
6use std::io;
7#[cfg(any(feature = "pipe", feature = "ssl_providerless"))]
8use std::path::Path;
9#[cfg(feature = "ssl_providerless")]
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::sync::atomic::AtomicUsize;
13use std::sync::atomic::Ordering;
14use std::sync::Arc;
15use std::time::Duration;
16use std::{cmp, env, fmt};
17
18use futures_core::Stream;
19use futures_util::future::FutureExt;
20use futures_util::future::TryFutureExt;
21use futures_util::stream::TryStreamExt;
22use futures_util::StreamExt;
23use http::header::CONTENT_TYPE;
24use http::request::Builder;
25use http_body_util::{BodyExt, Full, StreamBody};
26use hyper::body::{Frame, Incoming};
27use hyper::{self, body::Bytes, Method, Request, Response, StatusCode};
28#[cfg(feature = "ssl_providerless")]
29use hyper_rustls::HttpsConnector;
30#[cfg(any(feature = "http", test))]
31use hyper_util::{
32 client::legacy::{connect::HttpConnector, Client},
33 rt::TokioExecutor,
34};
35#[cfg(all(feature = "pipe", unix))]
36use hyperlocal::UnixConnector;
37use log::{debug, trace};
38#[cfg(feature = "ssl_providerless")]
39use rustls::{crypto::CryptoProvider, sign::CertifiedKey};
40#[cfg(feature = "ssl_providerless")]
41use rustls_pki_types::{CertificateDer, PrivateKeyDer};
42use serde_derive::{Deserialize, Serialize};
43use tokio::io::{split, AsyncRead, AsyncWrite};
44use tokio_util::codec::FramedRead;
45
46use crate::container::LogOutput;
47use crate::errors::Error;
48use crate::errors::Error::*;
49use crate::read::{
50 AsyncUpgraded, IncomingStream, JsonLineDecoder, NewlineLogOutputDecoder, StreamReader,
51};
52use crate::uri::Uri;
53#[cfg(all(feature = "pipe", windows))]
54use hyper_named_pipe::NamedPipeConnector;
55
56use crate::auth::{base64_url_encode, DockerCredentialsHeader};
57use serde::de::DeserializeOwned;
58use serde::ser::Serialize;
59
/// Address of the local Docker Unix domain socket, including the `unix://` scheme.
#[cfg(unix)]
pub const DEFAULT_SOCKET: &str = "unix:///var/run/docker.sock";

/// Address of the local Docker named pipe, including the `npipe://` scheme.
#[cfg(windows)]
pub const DEFAULT_NAMED_PIPE: &str = "npipe:////./pipe/docker_engine";

/// TCP address used by the HTTP and SSL connectors when `DOCKER_HOST` is not set.
#[cfg(feature = "http")]
pub const DEFAULT_TCP_ADDRESS: &str = "tcp://localhost:2375";

/// Platform default used when `DOCKER_HOST` is not set (Unix: the local socket).
#[cfg(unix)]
pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_SOCKET;

/// Platform default used when `DOCKER_HOST` is not set (Windows: the named pipe).
#[cfg(windows)]
pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_NAMED_PIPE;

/// Request timeout, in seconds, used by the `*_defaults` constructors.
#[cfg(feature = "http")]
const DEFAULT_TIMEOUT: u64 = 120;
83
84pub const API_DEFAULT_VERSION: &ClientVersion = &ClientVersion {
86 major_version: 1,
87 minor_version: 45,
88};
89
/// Identifies which kind of connection a [`Docker`] client was created for.
///
/// Variants are only compiled in when the matching feature/platform is enabled.
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
    /// Unix domain socket connection.
    #[cfg(all(feature = "pipe", unix))]
    Unix,
    /// Plain HTTP connection.
    #[cfg(feature = "http")]
    Http,
    /// HTTPS connection.
    #[cfg(feature = "ssl_providerless")]
    SSL,
    /// Windows named pipe connection.
    #[cfg(all(feature = "pipe", windows))]
    NamedPipe,
    /// User-supplied transport; `scheme` is the part of the address before `://`
    /// (empty when the address had no scheme).
    Custom { scheme: String },
}
104
105pub type BollardRequest = Request<BodyType>;
107
108type TransportReturnTy =
109 Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>;
110
111pub trait CustomTransport: Send + Sync {
113 fn request(&self, request: BollardRequest) -> TransportReturnTy;
115}
116
117impl<Callback, ReturnTy> CustomTransport for Callback
119where
120 Callback: Fn(BollardRequest) -> ReturnTy + Send + Sync,
121 ReturnTy: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
122{
123 fn request(&self, request: BollardRequest) -> TransportReturnTy {
124 Box::pin(self(request))
125 }
126}
127
128pub(crate) enum Transport {
134 #[cfg(feature = "http")]
135 Http {
136 client: Client<HttpConnector, BodyType>,
137 },
138 #[cfg(feature = "ssl_providerless")]
139 Https {
140 client: Client<HttpsConnector<HttpConnector>, BodyType>,
141 },
142 #[cfg(all(feature = "pipe", unix))]
143 Unix {
144 client: Client<UnixConnector, BodyType>,
145 },
146 #[cfg(all(feature = "pipe", windows))]
147 NamedPipe {
148 client: Client<NamedPipeConnector, BodyType>,
149 },
150 #[cfg(test)]
151 Mock {
152 client: Client<yup_hyper_mock::HostToReplyConnector, BodyType>,
153 },
154 Custom {
155 transport: Box<dyn CustomTransport>,
156 },
157}
158
159impl fmt::Debug for Transport {
160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161 match self {
162 #[cfg(feature = "http")]
163 Transport::Http { .. } => write!(f, "HTTP"),
164 #[cfg(feature = "ssl_providerless")]
165 Transport::Https { .. } => write!(f, "HTTPS(rustls)"),
166 #[cfg(all(feature = "pipe", unix))]
167 Transport::Unix { .. } => write!(f, "Unix"),
168 #[cfg(all(feature = "pipe", windows))]
169 Transport::NamedPipe { .. } => write!(f, "NamedPipe"),
170 #[cfg(test)]
171 Transport::Mock { .. } => write!(f, "Mock"),
172 Transport::Custom { .. } => write!(f, "Custom"),
173 }
174 }
175}
176
/// A Docker API version made of a major and a minor component (displayed as `major.minor`).
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct ClientVersion {
    /// Component before the dot.
    pub major_version: usize,
    /// Component after the dot.
    pub minor_version: usize,
}
190
191pub(crate) enum MaybeClientVersion {
192 Some(ClientVersion),
193 None,
194}
195
196impl<T: Into<String>> From<T> for MaybeClientVersion {
197 fn from(s: T) -> MaybeClientVersion {
198 match s
199 .into()
200 .split('.')
201 .map(|v| v.parse::<usize>())
202 .collect::<Vec<Result<usize, std::num::ParseIntError>>>()
203 .as_slice()
204 {
205 [Ok(first), Ok(second)] => MaybeClientVersion::Some(ClientVersion {
206 major_version: first.to_owned(),
207 minor_version: second.to_owned(),
208 }),
209 _ => MaybeClientVersion::None,
210 }
211 }
212}
213
214impl fmt::Display for ClientVersion {
215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216 write!(f, "{}.{}", self.major_version, self.minor_version)
217 }
218}
219
220impl PartialOrd for ClientVersion {
221 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
222 match self.major_version.partial_cmp(&other.major_version) {
223 Some(cmp::Ordering::Equal) => self.minor_version.partial_cmp(&other.minor_version),
224 res => res,
225 }
226 }
227}
228
229impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
230 fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
231 ClientVersion {
232 major_version: tpl.0.load(Ordering::Relaxed),
233 minor_version: tpl.1.load(Ordering::Relaxed),
234 }
235 }
236}
237
238pub(crate) fn serialize_as_json<T, S>(t: &T, s: S) -> Result<S::Ok, S::Error>
239where
240 T: Serialize,
241 S: serde::Serializer,
242{
243 s.serialize_str(
244 &serde_json::to_string(t).map_err(|e| serde::ser::Error::custom(format!("{e}")))?,
245 )
246}
247
248pub(crate) fn serialize_join_newlines<S>(t: &[&str], s: S) -> Result<S::Ok, S::Error>
249where
250 S: serde::Serializer,
251{
252 s.serialize_str(&t.join("\n"))
253}
254
/// Deserializes a string and parses it as an RFC 3339 date-time.
///
/// # Errors
/// A parse failure is reported through the deserializer's error type, carrying the
/// `Debug` rendering of the parse error.
#[cfg(feature = "time")]
pub fn deserialize_rfc3339<'de, D: serde::Deserializer<'de>>(
    d: D,
) -> Result<time::OffsetDateTime, D::Error> {
    use time::format_description::well_known::Rfc3339;

    let raw = <String as serde::Deserialize>::deserialize(d)?;
    match time::OffsetDateTime::parse(&raw, &Rfc3339) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(<D::Error as serde::de::Error>::custom(format!("{:?}", e))),
    }
}
263
/// Serializes `date` as an RFC 3339 formatted string.
///
/// # Errors
/// A formatting failure is reported through the serializer's error type, carrying the
/// `Debug` rendering of the format error.
#[cfg(feature = "time")]
pub fn serialize_rfc3339<S: serde::Serializer>(
    date: &time::OffsetDateTime,
    s: S,
) -> Result<S::Ok, S::Error> {
    use time::format_description::well_known::Rfc3339;

    match date.format(&Rfc3339) {
        Ok(text) => s.serialize_str(&text),
        Err(e) => Err(<S::Error as serde::ser::Error>::custom(format!("{:?}", e))),
    }
}
275
/// Serializes an optional date as a `seconds.nanoseconds` Unix timestamp string.
///
/// `None` is serialized as the empty string. The fractional part is the nanosecond
/// within the current second, zero-padded to nine digits so that it reads as a
/// fraction of a second (e.g. 5 ns becomes `.000000005`, not `.5`).
#[cfg(feature = "time")]
pub(crate) fn serialize_as_timestamp<S>(
    opt: &Option<crate::models::BollardDate>,
    s: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    match opt {
        // `nanosecond()` is the sub-second component; `unix_timestamp_nanos()` would be
        // the total nanoseconds since the epoch and must not be used as a fraction.
        Some(t) => s.serialize_str(&format!(
            "{}.{:09}",
            t.unix_timestamp(),
            t.nanosecond()
        )),
        None => s.serialize_str(""),
    }
}
293
/// Serializes an optional date as a `seconds.nanoseconds` Unix timestamp string.
///
/// `None` is serialized as the empty string. The sub-second nanoseconds are
/// zero-padded to nine digits so that they read as a fraction of a second
/// (e.g. 5 ns becomes `.000000005`, not `.5`).
#[cfg(all(feature = "chrono", not(feature = "time")))]
pub(crate) fn serialize_as_timestamp<S>(
    opt: &Option<crate::models::BollardDate>,
    s: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    match opt {
        Some(t) => s.serialize_str(&format!(
            "{}.{:09}",
            t.timestamp(),
            t.timestamp_subsec_nanos()
        )),
        None => s.serialize_str(""),
    }
}
307
308#[derive(Debug)]
309pub struct Docker {
321 pub(crate) transport: Arc<Transport>,
322 pub(crate) client_type: ClientType,
323 pub(crate) client_addr: String,
324 pub(crate) client_timeout: u64,
325 pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
326}
327
328impl Clone for Docker {
329 fn clone(&self) -> Docker {
330 Docker {
331 transport: self.transport.clone(),
332 client_type: self.client_type.clone(),
333 client_addr: self.client_addr.clone(),
334 client_timeout: self.client_timeout,
335 version: self.version.clone(),
336 }
337 }
338}
339
340#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
342struct DockerServerErrorMessage {
343 message: String,
344}
345
/// Resolves the TLS client certificate from a key file and a certificate file on disk.
#[cfg(feature = "ssl_providerless")]
#[derive(Debug)]
struct DockerClientCertResolver {
    /// Path of the PEM file holding the private key.
    ssl_key: PathBuf,
    /// Path of the PEM file holding the certificate chain.
    ssl_cert: PathBuf,
}
352
#[cfg(feature = "ssl_providerless")]
impl DockerClientCertResolver {
    /// Returns the directory that holds the TLS files by default.
    ///
    /// `DOCKER_CERT_PATH` is preferred, then `DOCKER_CONFIG`; otherwise `.docker` inside
    /// the home directory is used.
    ///
    /// # Errors
    /// `NoHomePathError` when neither variable can be read and no home directory is found.
    pub fn default_cert_path() -> Result<PathBuf, Error> {
        match env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG")) {
            Ok(dir) => Ok(PathBuf::from(dir)),
            Err(_) => home::home_dir()
                .map(|home| home.join(".docker"))
                .ok_or(NoHomePathError),
        }
    }

    /// Opens `path` for buffered reading.
    fn open_buffered(path: &Path) -> Result<io::BufReader<fs::File>, Error> {
        let file = fs::File::open(path)?;
        Ok(io::BufReader::new(file))
    }

    /// Reads every certificate found in the PEM file at `path`.
    fn certs(path: &Path) -> Result<Vec<CertificateDer<'static>>, Error> {
        let mut reader = Self::open_buffered(path)?;
        let parsed: Result<Vec<CertificateDer<'static>>, io::Error> =
            rustls_pemfile::certs(&mut reader).collect();
        Ok(parsed?)
    }

    /// Reads the first private key of the PEM file at `path`.
    ///
    /// The result holds zero or one key. A read/parse failure is reported as
    /// `CertPathError` for `path`.
    fn keys(path: &Path) -> Result<Vec<PrivateKeyDer<'static>>, Error> {
        let mut reader = Self::open_buffered(path)?;
        let first_key = rustls_pemfile::private_key(&mut reader).map_err(|_| CertPathError {
            path: path.to_path_buf(),
        })?;

        Ok(first_key.into_iter().collect())
    }

    /// Loads the certificate chain and private key from disk and pairs them up.
    ///
    /// # Errors
    /// `CertMultipleKeys` unless exactly one key was read, `CertParseError` when the key
    /// cannot be loaded by the crypto provider, plus any error from reading the files.
    ///
    /// # Panics
    /// Panics when no process-level `CryptoProvider` has been installed.
    fn docker_client_key(&self) -> Result<Arc<CertifiedKey>, Error> {
        // Certificates are read before the key, so a broken cert file is reported first.
        let chain = Self::certs(&self.ssl_cert)?;

        let mut candidates = Self::keys(&self.ssl_key)?;
        let private_key = match candidates.len() {
            1 => candidates.remove(0),
            count => {
                return Err(CertMultipleKeys {
                    count,
                    path: self.ssl_key.to_owned(),
                })
            }
        };

        let provider = CryptoProvider::get_default()
            .expect("no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point");
        let signing_key = provider
            .key_provider
            .load_private_key(private_key)
            .map_err(|_| CertParseError {
                path: self.ssl_key.to_owned(),
            })?;

        Ok(Arc::new(CertifiedKey::new(chain, signing_key)))
    }
}
411
#[cfg(feature = "ssl_providerless")]
impl rustls::client::ResolvesClientCert for DockerClientCertResolver {
    /// Loads the client key from disk on every call; any loading error yields `None`.
    ///
    /// The issuer hints and signature schemes passed in are ignored.
    fn resolve(
        &self,
        _root_hint_subjects: &[&[u8]],
        _sigschemes: &[rustls::SignatureScheme],
    ) -> Option<Arc<CertifiedKey>> {
        match self.docker_client_key() {
            Ok(certified) => Some(certified),
            Err(_) => None,
        }
    }

    /// Always reports that certificates are available.
    fn has_certs(&self) -> bool {
        true
    }
}
422
/// Constructors for TLS-secured connections.
#[cfg(feature = "ssl_providerless")]
impl Docker {
    /// Connects over TLS using environment-derived defaults.
    ///
    /// The address comes from `DOCKER_HOST` (falling back to `DEFAULT_TCP_ADDRESS`), and
    /// `key.pem`, `cert.pem` and `ca.pem` are taken from the default certificate directory.
    pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
        let cert_dir = DockerClientCertResolver::default_cert_path()?;
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_TCP_ADDRESS.to_string());

        Docker::connect_with_ssl(
            &host,
            &cert_dir.join("key.pem"),
            &cert_dir.join("cert.pem"),
            &cert_dir.join("ca.pem"),
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connects over TLS with explicit key, certificate and CA files.
    ///
    /// # Arguments
    /// - `addr`: daemon address; the first `tcp://` and `https://` occurrences are removed.
    /// - `ssl_key` / `ssl_cert`: client key and certificate, read when a handshake asks
    ///   for the client certificate.
    /// - `ssl_ca`: CA file, read immediately and added to the root store.
    /// - `timeout`: request timeout in seconds.
    /// - `client_version`: API version to use.
    ///
    /// # Errors
    /// Fails when native root certificates cannot be loaded (unless the `test_ssl` or
    /// `webpki` feature supplies the roots), or when the CA file cannot be read or parsed.
    pub fn connect_with_ssl(
        addr: &str,
        ssl_key: &Path,
        ssl_cert: &Path,
        ssl_ca: &Path,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("tcp://", "", 1).replacen("https://", "", 1);

        let mut root_store = rustls::RootCertStore::empty();

        #[cfg(not(any(feature = "test_ssl", feature = "webpki")))]
        {
            let native_certs = rustls_native_certs::load_native_certs();
            if !native_certs.errors.is_empty() {
                return Err(LoadNativeCertsErrors {
                    errors: native_certs.errors,
                });
            }
            for cert in native_certs.certs {
                root_store
                    .add(cert)
                    .map_err(|err| NoNativeCertsError { err })?;
            }
        }
        #[cfg(any(feature = "test_ssl", feature = "webpki"))]
        root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

        // Trust the user-supplied CA in addition to the roots loaded above.
        let ca_bytes = fs::read(ssl_ca).map_err(|_| CertPathError {
            path: ssl_ca.to_owned(),
        })?;
        let mut ca_pem = io::Cursor::new(ca_bytes);
        let ca_certs = rustls_pemfile::certs(&mut ca_pem).collect::<Result<Vec<_>, _>>()?;
        root_store.add_parsable_certificates(ca_certs);

        let resolver = DockerClientCertResolver {
            ssl_key: ssl_key.to_owned(),
            ssl_cert: ssl_cert.to_owned(),
        };
        let config = rustls::ClientConfig::builder()
            .with_root_certificates(root_store)
            .with_client_cert_resolver(Arc::new(resolver));

        // The inner connector must accept non-`http` URIs for the HTTPS wrapper to work.
        let mut http_connector = HttpConnector::new();
        http_connector.enforce_http(false);

        let https_connector: HttpsConnector<HttpConnector> =
            HttpsConnector::from((http_connector, config));

        let client = Client::builder(TokioExecutor::new())
            .pool_max_idle_per_host(0)
            .build(https_connector);

        let ClientVersion {
            major_version,
            minor_version,
        } = *client_version;

        Ok(Docker {
            transport: Arc::new(Transport::Https { client }),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(major_version),
                AtomicUsize::new(minor_version),
            )),
        })
    }
}
577
/// Constructors for plain HTTP connections.
#[cfg(feature = "http")]
impl Docker {
    /// Connects over HTTP to `DOCKER_HOST`, or to `DEFAULT_TCP_ADDRESS` when it is unset,
    /// using the default timeout and API version.
    pub fn connect_with_http_defaults() -> Result<Docker, Error> {
        let host = match env::var("DOCKER_HOST") {
            Ok(from_env) => from_env,
            Err(_) => DEFAULT_TCP_ADDRESS.to_string(),
        };
        Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects over HTTP.
    ///
    /// # Arguments
    /// - `addr`: daemon address; the first `tcp://` and `http://` occurrences are removed.
    /// - `timeout`: request timeout in seconds.
    /// - `client_version`: API version to use.
    pub fn connect_with_http(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("tcp://", "", 1).replacen("http://", "", 1);

        let client = Client::builder(TokioExecutor::new())
            .pool_max_idle_per_host(0)
            .build(HttpConnector::new());

        let ClientVersion {
            major_version,
            minor_version,
        } = *client_version;

        Ok(Docker {
            transport: Arc::new(Transport::Http { client }),
            client_type: ClientType::Http,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(major_version),
                AtomicUsize::new(minor_version),
            )),
        })
    }
}
656
657impl Docker {
659 pub fn connect_with_custom_transport<S: Into<String>>(
706 transport: impl CustomTransport + 'static,
707 client_addr: Option<S>,
708 timeout: u64,
709 client_version: &ClientVersion,
710 ) -> Result<Docker, Error> {
711 let client_addr = client_addr.map(Into::into).unwrap_or_default();
712 let (scheme, client_addr) = client_addr
713 .split_once("://")
714 .unwrap_or(("", client_addr.as_str()));
715 let client_addr = client_addr.to_owned();
716 let scheme = scheme.to_owned();
717 let transport = Transport::Custom {
718 transport: Box::new(transport),
719 };
720 let docker = Docker {
721 transport: Arc::new(transport),
722 client_type: ClientType::Custom { scheme },
723 client_addr,
724 client_timeout: timeout,
725 version: Arc::new((
726 AtomicUsize::new(client_version.major_version),
727 AtomicUsize::new(client_version.minor_version),
728 )),
729 };
730
731 Ok(docker)
732 }
733}
734
/// Platform-independent entry points for local connections (Unix socket or named pipe).
#[cfg(all(feature = "pipe", any(unix, windows)))]
impl Docker {
    /// Connects to the platform's default local endpoint after checking that it exists.
    pub fn connect_with_socket_defaults() -> Result<Docker, Error> {
        #[cfg(unix)]
        const LOCAL_ENDPOINT: &str = DEFAULT_SOCKET;
        #[cfg(windows)]
        const LOCAL_ENDPOINT: &str = DEFAULT_NAMED_PIPE;

        Docker::connect_with_socket(LOCAL_ENDPOINT, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects to a local socket or named pipe, failing early when the path is missing.
    ///
    /// # Arguments
    /// - `path`: endpoint, optionally prefixed with `unix://` or `npipe://`.
    /// - `timeout`: request timeout in seconds.
    /// - `client_version`: API version to use.
    ///
    /// # Errors
    /// `SocketNotFoundError` (carrying the path without its scheme) when nothing exists
    /// at that path, plus any error from the platform-specific constructor.
    pub fn connect_with_socket(
        path: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let without_scheme = path
            .trim_start_matches("unix://")
            .trim_start_matches("npipe://");

        let endpoint_exists = std::path::Path::new(without_scheme).exists();
        if !endpoint_exists {
            return Err(Error::SocketNotFoundError(without_scheme.to_string()));
        }

        // The platform constructors strip the scheme themselves, so pass `path` as given.
        #[cfg(unix)]
        let connected = Docker::connect_with_unix(path, timeout, client_version);
        #[cfg(windows)]
        let connected = Docker::connect_with_named_pipe(path, timeout, client_version);

        connected
    }

    /// Connects with the platform's local defaults (Unix socket or named pipe).
    pub fn connect_with_local_defaults() -> Result<Docker, Error> {
        #[cfg(unix)]
        let connected = Docker::connect_with_unix_defaults();
        #[cfg(windows)]
        let connected = Docker::connect_with_named_pipe_defaults();

        connected
    }

    /// Connects to a local endpoint using the platform-specific constructor.
    ///
    /// # Arguments
    /// - `addr`: Unix socket address on Unix, named pipe address on Windows.
    /// - `timeout`: request timeout in seconds.
    /// - `client_version`: API version to use.
    pub fn connect_with_local(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        #[cfg(unix)]
        let connected = Docker::connect_with_unix(addr, timeout, client_version);
        #[cfg(windows)]
        let connected = Docker::connect_with_named_pipe(addr, timeout, client_version);

        connected
    }
}
841
842impl Docker {
844 pub fn connect_with_defaults() -> Result<Docker, Error> {
858 let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
859 match host {
860 #[cfg(all(feature = "pipe", unix))]
861 h if h.starts_with("unix://") => {
862 Docker::connect_with_unix(&h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
863 }
864 #[cfg(all(feature = "pipe", windows))]
865 h if h.starts_with("npipe://") => {
866 Docker::connect_with_named_pipe(&h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
867 }
868 #[cfg(feature = "http")]
869 h if h.starts_with("tcp://") || h.starts_with("http://") => {
870 #[cfg(feature = "ssl_providerless")]
871 if env::var("DOCKER_TLS_VERIFY").is_ok() {
872 return Docker::connect_with_ssl_defaults();
873 }
874 Docker::connect_with_http_defaults()
875 }
876 #[cfg(feature = "ssl_providerless")]
877 h if h.starts_with("https://") => Docker::connect_with_ssl_defaults(),
878 _ => Err(UnsupportedURISchemeError {
879 uri: host.to_string(),
880 }),
881 }
882 }
883}
884
/// Constructors for Unix domain socket connections.
#[cfg(all(feature = "pipe", unix))]
impl Docker {
    /// Connects to the socket named by `DOCKER_HOST` when it starts with `unix://`,
    /// otherwise to `DEFAULT_SOCKET`, using the default timeout and API version.
    pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
        let from_env = env::var("DOCKER_HOST")
            .ok()
            .filter(|host| host.starts_with("unix://"));
        let socket = from_env.as_deref().unwrap_or(DEFAULT_SOCKET);

        Docker::connect_with_unix(socket, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects to a Unix domain socket.
    ///
    /// # Arguments
    /// - `path`: socket path; the first `unix://` occurrence is removed.
    /// - `timeout`: request timeout in seconds.
    /// - `client_version`: API version to use.
    ///
    /// # Errors
    /// `SocketNotFoundError` when nothing exists at the socket path.
    pub fn connect_with_unix(
        path: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = path.replacen("unix://", "", 1);

        let socket_exists = Path::new(&client_addr).exists();
        if !socket_exists {
            return Err(Error::SocketNotFoundError(client_addr));
        }

        let client = Client::builder(TokioExecutor::new())
            .pool_max_idle_per_host(0)
            .build(UnixConnector);

        let ClientVersion {
            major_version,
            minor_version,
        } = *client_version;

        Ok(Docker {
            transport: Arc::new(Transport::Unix { client }),
            client_type: ClientType::Unix,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(major_version),
                AtomicUsize::new(minor_version),
            )),
        })
    }
}
971
/// Constructors for Windows named pipe connections.
#[cfg(all(feature = "pipe", windows))]
impl Docker {
    /// Connects to `DEFAULT_NAMED_PIPE` with the default timeout and API version.
    pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
        Docker::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects to a Windows named pipe.
    ///
    /// # Arguments
    /// - `path`: pipe address; the first `npipe://` occurrence is removed.
    /// - `timeout`: request timeout in seconds.
    /// - `client_version`: API version to use.
    pub fn connect_with_named_pipe(
        path: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = path.replacen("npipe://", "", 1);

        let client = Client::builder(TokioExecutor::new())
            .http1_title_case_headers(true)
            .pool_max_idle_per_host(0)
            .build(NamedPipeConnector);

        let ClientVersion {
            major_version,
            minor_version,
        } = *client_version;

        Ok(Docker {
            transport: Arc::new(Transport::NamedPipe { client }),
            client_type: ClientType::NamedPipe,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(major_version),
                AtomicUsize::new(minor_version),
            )),
        })
    }
}
1048
/// Test-only constructor.
#[cfg(test)]
impl Docker {
    /// Builds a client whose requests are answered by a mock connector.
    ///
    /// The resulting handle reports `ClientType::Http` and uses `client_addr` unchanged.
    pub fn connect_with_mock(
        connector: yup_hyper_mock::HostToReplyConnector,
        client_addr: String,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client = Client::builder(TokioExecutor::new()).build(connector);

        let ClientVersion {
            major_version,
            minor_version,
        } = *client_version;

        Ok(Docker {
            transport: Arc::new(Transport::Mock { client }),
            client_type: ClientType::Http,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(major_version),
                AtomicUsize::new(minor_version),
            )),
        })
    }
}
1104
1105impl Docker {
1106 pub fn with_timeout(mut self, timeout: Duration) -> Self {
1112 self.set_timeout(timeout);
1113 self
1114 }
1115
1116 pub fn timeout(&self) -> Duration {
1120 Duration::from_secs(self.client_timeout)
1121 }
1122
1123 pub fn set_timeout(&mut self, timeout: Duration) {
1129 self.client_timeout = timeout.as_secs();
1130 }
1131}
1132
1133impl Docker {
1135 pub(crate) fn process_into_value<T>(
1136 &self,
1137 req: Result<Request<BodyType>, Error>,
1138 ) -> impl Future<Output = Result<T, Error>>
1139 where
1140 T: DeserializeOwned,
1141 {
1142 let fut = self.process_request(req);
1143 async move { Docker::decode_response(fut.await?).await }
1144 }
1145
1146 pub(crate) fn process_into_stream<T>(
1147 &self,
1148 req: Result<Request<BodyType>, Error>,
1149 ) -> impl Stream<Item = Result<T, Error>> + Unpin
1150 where
1151 T: DeserializeOwned,
1152 {
1153 Box::pin(
1154 self.process_request(req)
1155 .map_ok(Docker::decode_into_stream::<T>)
1156 .into_stream()
1157 .try_flatten(),
1158 )
1159 }
1160
1161 pub(crate) fn process_into_stream_string(
1162 &self,
1163 req: Result<Request<BodyType>, Error>,
1164 ) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
1165 Box::pin(
1166 self.process_request(req)
1167 .map_ok(Docker::decode_into_stream_string)
1168 .try_flatten_stream(),
1169 )
1170 }
1171
1172 pub(crate) fn process_into_unit(
1173 &self,
1174 req: Result<Request<BodyType>, Error>,
1175 ) -> impl Future<Output = Result<(), Error>> {
1176 let fut = self.process_request(req);
1177 async move {
1178 fut.await?;
1179 Ok(())
1180 }
1181 }
1182
1183 pub(crate) fn process_into_body(
1184 &self,
1185 req: Result<Request<BodyType>, Error>,
1186 ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
1187 Box::pin(
1188 self.process_request(req)
1189 .map_ok(|response| IncomingStream::new(response.into_body()))
1190 .into_stream()
1191 .try_flatten(),
1192 )
1193 }
1194
1195 pub(crate) fn process_into_string(
1196 &self,
1197 req: Result<Request<BodyType>, Error>,
1198 ) -> impl Future<Output = Result<String, Error>> {
1199 let fut = self.process_request(req);
1200 async move {
1201 let response = fut.await?;
1202 Docker::decode_into_string(response).await
1203 }
1204 }
1205
1206 pub(crate) async fn process_upgraded(
1207 &self,
1208 req: Result<Request<BodyType>, Error>,
1209 ) -> Result<(impl AsyncRead, impl AsyncWrite), Error> {
1210 let res = self.process_request(req).await?;
1211 let upgraded = hyper::upgrade::on(res).await?;
1212 let tokio_upgraded = AsyncUpgraded::new(upgraded);
1213
1214 Ok(split(tokio_upgraded))
1215 }
1216
1217 pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<BodyType, Error>
1218 where
1219 S: Serialize,
1220 {
1221 match body.map(|inst| serde_json::to_string(&inst)) {
1222 Some(Ok(res)) => Ok(Some(res)),
1223 Some(Err(e)) => Err(e.into()),
1224 None => Ok(None),
1225 }
1226 .map(|payload| {
1227 debug!("{}", payload.clone().unwrap_or_default());
1228 payload
1229 .map(|content| BodyType::Left(Full::new(content.into())))
1230 .unwrap_or(BodyType::Left(Full::new(Bytes::new())))
1231 })
1232 }
1233
1234 pub fn client_version(&self) -> ClientVersion {
1236 self.version.as_ref().into()
1237 }
1238
1239 pub async fn negotiate_version(self) -> Result<Self, Error> {
1253 let req = self.build_request(
1254 "/version",
1255 Builder::new().method(Method::GET),
1256 None::<String>,
1257 Ok(BodyType::Left(Full::new(Bytes::new()))),
1258 );
1259
1260 let res = self
1261 .process_into_value::<crate::system::Version>(req)
1262 .await?;
1263
1264 let server_version: ClientVersion = if let Some(api_version) = res.api_version {
1265 match api_version.into() {
1266 MaybeClientVersion::Some(client_version) => client_version,
1267 MaybeClientVersion::None => {
1268 return Err(APIVersionParseError {});
1269 }
1270 }
1271 } else {
1272 return Err(APIVersionParseError {});
1273 };
1274
1275 if server_version < self.client_version() {
1276 self.version
1277 .0
1278 .store(server_version.major_version, Ordering::Relaxed);
1279 self.version
1280 .1
1281 .store(server_version.minor_version, Ordering::Relaxed);
1282 }
1283
1284 Ok(self)
1285 }
1286
1287 pub(crate) fn process_request(
1288 &self,
1289 request: Result<Request<BodyType>, Error>,
1290 ) -> impl Future<Output = Result<Response<Incoming>, Error>> {
1291 let transport = self.transport.clone();
1292 let timeout = self.client_timeout;
1293
1294 match request.as_ref().map(|b| b.body()) {
1295 Ok(http_body_util::Either::Left(bytes)) => trace!("request: {:?}", bytes),
1296 Ok(http_body_util::Either::Right(_)) => trace!("request: (stream)"),
1297 Err(e) => trace!("request: Err({e:?}"),
1298 };
1299
1300 async move {
1301 let request = request?;
1302 let response = Docker::execute_request(transport, request, timeout).await?;
1303
1304 let status = response.status();
1305 match status {
1306 s if s.is_success() || s == StatusCode::NOT_MODIFIED => Ok(response),
1308
1309 StatusCode::SWITCHING_PROTOCOLS => Ok(response),
1310
1311 _ => {
1313 let contents = Docker::decode_into_string(response).await?;
1314
1315 let mut message = String::new();
1316 if !contents.is_empty() {
1317 message = serde_json::from_str::<DockerServerErrorMessage>(&contents)
1318 .map(|msg| msg.message)
1319 .or_else(|e| if e.is_data() { Ok(contents) } else { Err(e) })?;
1320 }
1321 Err(DockerResponseServerError {
1322 status_code: status.as_u16(),
1323 message,
1324 })
1325 }
1326 }
1327 }
1328 }
1329
1330 pub(crate) fn build_request<O>(
1331 &self,
1332 path: &str,
1333 builder: Builder,
1334 query: Option<O>,
1335 payload: Result<BodyType, Error>,
1336 ) -> Result<Request<BodyType>, Error>
1337 where
1338 O: Serialize,
1339 {
1340 let uri = Uri::parse(
1341 &self.client_addr,
1342 &self.client_type,
1343 path,
1344 query,
1345 &self.client_version(),
1346 )?;
1347 let request_uri: hyper::Uri = uri.try_into()?;
1348 debug!("{}", &request_uri);
1349 Ok(builder
1350 .uri(request_uri)
1351 .header(CONTENT_TYPE, "application/json")
1352 .body(payload?)?)
1353 }
1354
1355 pub(crate) fn build_request_with_registry_auth<O>(
1356 &self,
1357 path: &str,
1358 mut builder: Builder,
1359 query: Option<O>,
1360 payload: Result<BodyType, Error>,
1361 credentials: DockerCredentialsHeader,
1362 ) -> Result<Request<BodyType>, Error>
1363 where
1364 O: Serialize,
1365 {
1366 match credentials {
1367 DockerCredentialsHeader::Config(config) => {
1368 let value = match config {
1369 Some(config) => base64_url_encode(&serde_json::to_string(&config)?),
1370 None => "".into(),
1371 };
1372
1373 builder = builder.header("X-Registry-Config", value)
1374 }
1375 DockerCredentialsHeader::Auth(auth) => {
1376 let value = match auth {
1377 Some(config) => base64_url_encode(&serde_json::to_string(&config)?),
1378 None => "".into(),
1379 };
1380
1381 builder = builder.header("X-Registry-Auth", value)
1382 }
1383 }
1384
1385 self.build_request(path, builder, query, payload)
1386 }
1387
1388 async fn execute_request(
1389 transport: Arc<Transport>,
1390 req: Request<BodyType>,
1391 timeout: u64,
1392 ) -> Result<Response<Incoming>, Error> {
1393 let request = match *transport {
1395 #[cfg(feature = "http")]
1396 Transport::Http { ref client } => client.request(req).map_err(Error::from).boxed(),
1397 #[cfg(feature = "ssl_providerless")]
1398 Transport::Https { ref client } => client.request(req).map_err(Error::from).boxed(),
1399 #[cfg(all(feature = "pipe", unix))]
1400 Transport::Unix { ref client } => client.request(req).map_err(Error::from).boxed(),
1401 #[cfg(all(feature = "pipe", windows))]
1402 Transport::NamedPipe { ref client } => client.request(req).map_err(Error::from).boxed(),
1403 #[cfg(test)]
1404 Transport::Mock { ref client } => client.request(req).map_err(Error::from).boxed(),
1405 Transport::Custom { ref transport } => transport.request(req).boxed(),
1406 };
1407
1408 match tokio::time::timeout(Duration::from_secs(timeout), request).await {
1409 Ok(v) => Ok(v?),
1410 Err(_) => Err(RequestTimeoutError),
1411 }
1412 }
1413
1414 fn decode_into_stream<T>(res: Response<Incoming>) -> impl Stream<Item = Result<T, Error>>
1415 where
1416 T: DeserializeOwned,
1417 {
1418 FramedRead::new(StreamReader::new(res.into_body()), JsonLineDecoder::new())
1419 }
1420
1421 fn decode_into_stream_string(
1422 res: Response<Incoming>,
1423 ) -> impl Stream<Item = Result<LogOutput, Error>> {
1424 FramedRead::new(
1425 StreamReader::new(res.into_body()),
1426 NewlineLogOutputDecoder::new(false),
1427 )
1428 .map_err(Error::from)
1429 }
1430
1431 async fn decode_into_string(response: Response<Incoming>) -> Result<String, Error> {
1432 let body = response.into_body().collect().await?.to_bytes();
1433
1434 Ok(String::from_utf8_lossy(&body).to_string())
1435 }
1436
1437 async fn decode_response<T>(response: Response<Incoming>) -> Result<T, Error>
1438 where
1439 T: DeserializeOwned,
1440 {
1441 let bytes = response.into_body().collect().await?.to_bytes();
1442
1443 debug!("Decoded into string: {}", &String::from_utf8_lossy(&bytes));
1444
1445 serde_json::from_slice::<T>(&bytes).map_err(|e| {
1446 if e.is_data() {
1447 JsonDataError {
1448 message: e.to_string(),
1449 column: e.column(),
1450 #[cfg(feature = "json_data_content")]
1451 contents: String::from_utf8_lossy(&bytes).to_string(),
1452 }
1453 } else {
1454 e.into()
1455 }
1456 })
1457 }
1458}
1459
1460pub(crate) type BodyType = http_body_util::Either<
1462 Full<Bytes>,
1463 StreamBody<Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, Infallible>> + Send>>>,
1464>;
1465
1466pub(crate) fn body_stream(body: impl Stream<Item = Bytes> + Send + 'static) -> BodyType {
1467 BodyType::Right(StreamBody::new(Box::pin(body.map(|a| Ok(Frame::data(a))))))
1468}