1#[cfg(feature = "ssl_providerless")]
2use std::fs;
3use std::future::Future;
4#[cfg(feature = "ssl_providerless")]
5use std::io;
6#[cfg(any(feature = "pipe", feature = "ssl_providerless"))]
7use std::path::Path;
8#[cfg(feature = "ssl_providerless")]
9use std::path::PathBuf;
10use std::pin::Pin;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering;
13use std::sync::Arc;
14use std::time::Duration;
15use std::{cmp, env, fmt};
16
17use futures_core::Stream;
18use futures_util::future::FutureExt;
19use futures_util::future::TryFutureExt;
20use futures_util::stream::TryStreamExt;
21use futures_util::StreamExt;
22use http::header::CONTENT_TYPE;
23use http::request::Builder;
24use http_body_util::{BodyExt, Full, StreamBody};
25use hyper::body::{Frame, Incoming};
26use hyper::{self, body::Bytes, Method, Request, Response, StatusCode};
27#[cfg(feature = "ssl_providerless")]
28use hyper_rustls::HttpsConnector;
29#[cfg(any(feature = "http", test))]
30use hyper_util::client::legacy::connect::HttpConnector;
31#[cfg(any(feature = "http", feature = "ssh", feature = "pipe", test))]
32use hyper_util::{client::legacy::Client, rt::TokioExecutor};
33#[cfg(all(feature = "pipe", unix))]
34use hyperlocal::UnixConnector;
35use log::{debug, trace};
36#[cfg(feature = "ssl_providerless")]
37use rustls::{crypto::CryptoProvider, sign::CertifiedKey};
38#[cfg(feature = "ssl_providerless")]
39use rustls_pki_types::{CertificateDer, PrivateKeyDer};
40use serde_derive::{Deserialize, Serialize};
41use tokio::io::{split, AsyncRead, AsyncWrite};
42use tokio_util::codec::FramedRead;
43
44use crate::container::LogOutput;
45use crate::errors::Error;
46use crate::errors::Error::*;
47use crate::read::{
48 AsyncUpgraded, IncomingStream, JsonLineDecoder, NewlineLogOutputDecoder, StreamReader,
49};
50use crate::uri::Uri;
51#[cfg(all(feature = "pipe", windows))]
52use hyper_named_pipe::NamedPipeConnector;
53
54use crate::auth::{base64_url_encode, DockerCredentialsHeader};
55use serde::de::DeserializeOwned;
56use serde::ser::Serialize;
57
58#[cfg(feature = "websocket")]
59use tokio_tungstenite::WebSocketStream;
60
/// Default Docker daemon address on Unix platforms (a `unix://` socket URI).
#[cfg(unix)]
pub const DEFAULT_SOCKET: &str = "unix:///var/run/docker.sock";

/// Default Docker daemon address on Windows (an `npipe://` named-pipe URI).
#[cfg(windows)]
pub const DEFAULT_NAMED_PIPE: &str = "npipe:////./pipe/docker_engine";

/// Default address used by the TCP/HTTP connectors when `DOCKER_HOST` is not set.
#[cfg(feature = "http")]
pub const DEFAULT_TCP_ADDRESS: &str = "tcp://localhost:2375";

/// Default address used by the SSH connector when `DOCKER_HOST` is not set.
#[cfg(feature = "ssh")]
pub const DEFAULT_SSH_ADDRESS: &str = "ssh://localhost";

/// Template for a per-user Podman socket path; `{UID}` is replaced with the numeric user id.
#[cfg(unix)]
pub(crate) const DEFAULT_PODMAN_SOCKET_TEMPLATE: &str = "/run/user/{UID}/podman/podman.sock";

/// System-wide Podman socket, expressed as a `unix://` URI.
#[cfg(unix)]
pub(crate) const DEFAULT_PODMAN_SYSTEM_SOCKET: &str = "unix:///run/podman/podman.sock";

/// Platform default for `DOCKER_HOST` on Unix: the Unix socket.
#[cfg(unix)]
pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_SOCKET;

/// Platform default for `DOCKER_HOST` on Windows: the named pipe.
#[cfg(windows)]
pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_NAMED_PIPE;

/// Default request timeout, in seconds (see `Docker::timeout`, which wraps it with
/// `Duration::from_secs`).
#[cfg(any(feature = "http", feature = "ssh", feature = "pipe"))]
const DEFAULT_TIMEOUT: u64 = 120;

/// API version the client is created with unless the caller supplies another one.
pub const API_DEFAULT_VERSION: &ClientVersion = &ClientVersion {
    major_version: 1,
    minor_version: 53,
};
106
/// The kind of connection a `Docker` client was created with.
///
/// Most variants only exist when the matching Cargo feature (and, for the pipe variants,
/// the matching platform) is enabled; `Custom` is always available.
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
    /// Unix domain socket connection.
    #[cfg(all(feature = "pipe", unix))]
    Unix,
    /// Plain HTTP connection.
    #[cfg(feature = "http")]
    Http,
    /// HTTPS connection (rustls).
    #[cfg(feature = "ssl_providerless")]
    SSL,
    /// Windows named-pipe connection.
    #[cfg(all(feature = "pipe", windows))]
    NamedPipe,
    /// SSH connection.
    #[cfg(feature = "ssh")]
    Ssh,
    /// Caller-supplied transport.
    Custom {
        /// Scheme split off the address given to `connect_with_custom_transport`
        /// (the part before `://`; empty when the address carried none).
        scheme: String,
    },
}
123
/// An HTTP request whose body is the crate's `BodyType`.
pub type BollardRequest = Request<BodyType>;

/// Boxed, sendable future returned by `CustomTransport::request`; it resolves to the raw
/// hyper response or a crate `Error`.
type TransportReturnTy = Pin<Box<dyn Future<Output = Result<Response<Incoming>, Error>> + Send>>;
128
/// A user-provided way of sending requests to the daemon.
///
/// Implementors must be `Send + Sync`; the transport is stored behind a
/// `Box<dyn CustomTransport>` in `Transport::Custom`.
pub trait CustomTransport: Send + Sync {
    /// Sends `request` and returns a boxed future yielding the response or an error.
    fn request(&self, request: BollardRequest) -> TransportReturnTy;
}
134
135impl<Callback, ReturnTy> CustomTransport for Callback
137where
138 Callback: Fn(BollardRequest) -> ReturnTy + Send + Sync,
139 ReturnTy: Future<Output = Result<Response<Incoming>, Error>> + Send + 'static,
140{
141 fn request(&self, request: BollardRequest) -> TransportReturnTy {
142 Box::pin(self(request))
143 }
144}
145
/// The concrete client used to send requests, one variant per supported connector.
///
/// Every built-in variant wraps a hyper-util legacy `Client` parameterised with the
/// connector for that connection kind; `Custom` wraps a boxed user transport.
pub(crate) enum Transport {
    /// Plain HTTP over TCP.
    #[cfg(feature = "http")]
    Http {
        client: Client<HttpConnector, BodyType>,
    },
    /// HTTPS (rustls) over TCP.
    #[cfg(feature = "ssl_providerless")]
    Https {
        client: Client<HttpsConnector<HttpConnector>, BodyType>,
    },
    /// Unix domain socket.
    #[cfg(all(feature = "pipe", unix))]
    Unix {
        client: Client<UnixConnector, BodyType>,
    },
    /// Windows named pipe.
    #[cfg(all(feature = "pipe", windows))]
    NamedPipe {
        client: Client<NamedPipeConnector, BodyType>,
    },
    /// SSH.
    #[cfg(feature = "ssh")]
    Ssh {
        client: Client<crate::ssh::SshConnector, BodyType>,
    },
    /// Mock connector, only compiled for tests.
    #[cfg(test)]
    Mock {
        client: Client<yup_hyper_mock::HostToReplyConnector, BodyType>,
    },
    /// Caller-supplied transport.
    Custom {
        transport: Box<dyn CustomTransport>,
    },
}
180
181impl fmt::Debug for Transport {
182 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183 match self {
184 #[cfg(feature = "http")]
185 Transport::Http { .. } => write!(f, "HTTP"),
186 #[cfg(feature = "ssl_providerless")]
187 Transport::Https { .. } => write!(f, "HTTPS(rustls)"),
188 #[cfg(all(feature = "pipe", unix))]
189 Transport::Unix { .. } => write!(f, "Unix"),
190 #[cfg(all(feature = "pipe", windows))]
191 Transport::NamedPipe { .. } => write!(f, "NamedPipe"),
192 #[cfg(feature = "ssh")]
193 Transport::Ssh { .. } => write!(f, "SSH"),
194 #[cfg(test)]
195 Transport::Mock { .. } => write!(f, "Mock"),
196 Transport::Custom { .. } => write!(f, "Custom"),
197 }
198 }
199}
200
/// A Docker API version, split into its two numeric components.
///
/// Displayed as `major.minor` and ordered by major version first, then minor version.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct ClientVersion {
    /// The number before the dot.
    pub major_version: usize,
    /// The number after the dot.
    pub minor_version: usize,
}
214
/// Result of trying to parse a version string: either a parsed [`ClientVersion`] or nothing.
///
/// A dedicated enum (rather than `Option`) so that a blanket `From<T: Into<String>>`
/// conversion can be implemented on it.
pub(crate) enum MaybeClientVersion {
    /// The string was of the form `<major>.<minor>`.
    Some(ClientVersion),
    /// The string could not be parsed.
    None,
}
219
220impl<T: Into<String>> From<T> for MaybeClientVersion {
221 fn from(s: T) -> MaybeClientVersion {
222 match s
223 .into()
224 .split('.')
225 .map(|v| v.parse::<usize>())
226 .collect::<Vec<Result<usize, std::num::ParseIntError>>>()
227 .as_slice()
228 {
229 [Ok(first), Ok(second)] => MaybeClientVersion::Some(ClientVersion {
230 major_version: first.to_owned(),
231 minor_version: second.to_owned(),
232 }),
233 _ => MaybeClientVersion::None,
234 }
235 }
236}
237
238impl fmt::Display for ClientVersion {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 write!(f, "{}.{}", self.major_version, self.minor_version)
241 }
242}
243
244impl PartialOrd for ClientVersion {
245 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
246 match self.major_version.partial_cmp(&other.major_version) {
247 Some(cmp::Ordering::Equal) => self.minor_version.partial_cmp(&other.minor_version),
248 res => res,
249 }
250 }
251}
252
253impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
254 fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
255 ClientVersion {
256 major_version: tpl.0.load(Ordering::Relaxed),
257 minor_version: tpl.1.load(Ordering::Relaxed),
258 }
259 }
260}
261
/// Shared callback that receives every request built by `Docker::build_request` and
/// returns the request that is actually used.
pub type RequestModifier = Arc<dyn Fn(BollardRequest) -> BollardRequest + Send + Sync>;
263
/// Handle to a Docker daemon connection.
///
/// Cloning is cheap: the transport and the negotiated version are shared through `Arc`s.
pub struct Docker {
    /// The client used to send requests.
    pub(crate) transport: Arc<Transport>,
    /// Which kind of connection this handle was created with.
    pub(crate) client_type: ClientType,
    /// Daemon address with the URI scheme removed.
    pub(crate) client_addr: String,
    /// Request timeout in whole seconds.
    pub(crate) client_timeout: u64,
    /// API version as `(major, minor)`; atomics so that `negotiate_version` can update it
    /// through a shared reference.
    pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
    /// Optional hook applied to each request produced by `build_request`.
    pub(crate) request_modifier: Option<RequestModifier>,
}
285
286impl fmt::Debug for Docker {
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288 f.debug_struct("Docker")
289 .field("transport", &self.transport)
290 .field("client_type", &self.client_type)
291 .field("client_addr", &self.client_addr)
292 .field("client_timeout", &self.client_timeout)
293 .field("version", &self.version)
294 .field(
295 "request_modifier",
296 &self.request_modifier.as_ref().map(|_| "<callback>"),
297 )
298 .finish()
299 }
300}
301
302impl Clone for Docker {
303 fn clone(&self) -> Docker {
304 Docker {
305 transport: self.transport.clone(),
306 client_type: self.client_type.clone(),
307 client_addr: self.client_addr.clone(),
308 client_timeout: self.client_timeout,
309 version: self.version.clone(),
310 request_modifier: self.request_modifier.clone(),
311 }
312 }
313}
314
/// JSON shape of an error body returned by the daemon: an object with a `message` field.
/// `process_request` deserializes non-success response bodies into this type.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
struct DockerServerErrorMessage {
    /// Human-readable error text.
    message: String,
}
320
/// Resolves the TLS client certificate by reading a key file and a certificate file
/// from disk.
#[cfg(feature = "ssl_providerless")]
#[derive(Debug)]
struct DockerClientCertResolver {
    /// Path to the PEM file holding the private key.
    ssl_key: PathBuf,
    /// Path to the PEM file holding the client certificate(s).
    ssl_cert: PathBuf,
}
327
#[cfg(feature = "ssl_providerless")]
impl DockerClientCertResolver {
    /// Returns the directory expected to hold the TLS material.
    ///
    /// Uses `DOCKER_CERT_PATH` if set, otherwise `DOCKER_CONFIG`, otherwise `.docker`
    /// inside the home directory.
    ///
    /// # Errors
    ///
    /// `NoHomePathError` when neither variable is usable and no home directory is found.
    pub fn default_cert_path() -> Result<PathBuf, Error> {
        let configured = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
        match configured {
            Ok(dir) => Ok(PathBuf::from(dir)),
            Err(_) => home::home_dir()
                .map(|home| home.join(".docker"))
                .ok_or(NoHomePathError),
        }
    }

    /// Opens `path` for buffered reading; I/O failures are converted into `Error` by `?`.
    fn open_buffered(path: &Path) -> Result<io::BufReader<fs::File>, Error> {
        let file = fs::File::open(path)?;
        Ok(io::BufReader::new(file))
    }

    /// Reads every PEM certificate from `path`.
    ///
    /// "No items found" results from the PEM reader are skipped; any other PEM error is
    /// reported as `CertPathError` for `path`.
    fn certs(path: &Path) -> Result<Vec<CertificateDer<'static>>, Error> {
        use rustls_pki_types::pem::{Error as PemError, PemObject};

        let mut reader = Self::open_buffered(path)?;
        let mut parsed = Vec::new();
        for item in CertificateDer::pem_reader_iter(&mut reader) {
            match item {
                Ok(cert) => parsed.push(cert),
                Err(PemError::NoItemsFound) => {}
                Err(_) => {
                    return Err(CertPathError {
                        path: path.to_path_buf(),
                    })
                }
            }
        }
        Ok(parsed)
    }

    /// Reads a private key from the PEM file at `path`.
    ///
    /// The result holds one key when the reader yields one and is empty when the reader
    /// reports that no item was found; any other PEM error becomes `CertPathError`.
    fn keys(path: &Path) -> Result<Vec<PrivateKeyDer<'static>>, Error> {
        use rustls_pki_types::pem::{Error as PemError, PemObject};

        let mut reader = Self::open_buffered(path)?;
        match PrivateKeyDer::from_pem_reader(&mut reader) {
            Ok(key) => Ok(vec![key]),
            Err(PemError::NoItemsFound) => Ok(Vec::new()),
            Err(_) => Err(CertPathError {
                path: path.to_path_buf(),
            }),
        }
    }

    /// Loads the certificate chain and private key and combines them into a
    /// `CertifiedKey`.
    ///
    /// # Errors
    ///
    /// * `CertMultipleKeys` when the key file did not yield exactly one key.
    /// * `CertParseError` when the crypto provider cannot load the key.
    ///
    /// # Panics
    ///
    /// Panics if no process-level `CryptoProvider` has been installed.
    fn docker_client_key(&self) -> Result<Arc<CertifiedKey>, Error> {
        let all_certs = Self::certs(&self.ssl_cert)?;

        let mut all_keys = Self::keys(&self.ssl_key)?;
        if all_keys.len() != 1 {
            return Err(CertMultipleKeys {
                count: all_keys.len(),
                path: self.ssl_key.to_owned(),
            });
        }
        let key = all_keys.remove(0);

        let provider = CryptoProvider::get_default()
            .expect("no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point");
        let signing_key = provider
            .key_provider
            .load_private_key(key)
            .map_err(|_| CertParseError {
                path: self.ssl_key.to_owned(),
            })?;

        Ok(Arc::new(CertifiedKey::new(all_certs, signing_key)))
    }
}
401
/// Hooks the file-based resolver into rustls client authentication.
#[cfg(feature = "ssl_providerless")]
impl rustls::client::ResolvesClientCert for DockerClientCertResolver {
    /// Loads the key and certificates from disk on every call. Both arguments are ignored.
    /// Any loading error is discarded and reported as `None`.
    fn resolve(&self, _: &[&[u8]], _: &[rustls::SignatureScheme]) -> Option<Arc<CertifiedKey>> {
        self.docker_client_key().ok()
    }

    /// Always reports that certificates are available; the files are only read in `resolve`.
    fn has_certs(&self) -> bool {
        true
    }
}
412
#[cfg(feature = "ssl_providerless")]
impl Docker {
    /// Connects over TLS using the host from `DOCKER_HOST` (falling back to
    /// [`DEFAULT_TCP_ADDRESS`]) and the default certificate location.
    ///
    /// # Errors
    ///
    /// See [`Docker::connect_with_ssl`].
    pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_TCP_ADDRESS.to_string());
        Self::connect_with_ssl_default_certs(&host)
    }

    /// Connects to `host` over TLS with `key.pem`, `cert.pem` and `ca.pem` taken from
    /// `DockerClientCertResolver::default_cert_path`, the default timeout and the default
    /// API version.
    fn connect_with_ssl_default_certs(host: &str) -> Result<Docker, Error> {
        let cert_path = DockerClientCertResolver::default_cert_path()?;
        Docker::connect_with_ssl(
            host,
            &cert_path.join("key.pem"),
            &cert_path.join("cert.pem"),
            &cert_path.join("ca.pem"),
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connects to a daemon over TLS.
    ///
    /// # Arguments
    ///
    /// * `addr` - daemon address; a `tcp://`, `https://` or `http://` scheme is removed
    ///   before the address is stored.
    /// * `ssl_key` - PEM file with the client private key (read when the TLS handshake asks
    ///   for a client certificate).
    /// * `ssl_cert` - PEM file with the client certificate (read at the same point).
    /// * `ssl_ca` - PEM file with the CA certificate(s); read immediately.
    /// * `timeout` - request timeout in seconds.
    /// * `client_version` - API version to start with.
    ///
    /// # Errors
    ///
    /// * `LoadNativeCertsErrors` / `NoNativeCertsError` when the platform root store cannot
    ///   be loaded (only when neither `test_ssl` nor `webpki` is enabled).
    /// * `CertPathError` when `ssl_ca` cannot be read.
    /// * `CertParseError` when `ssl_ca` does not contain valid PEM certificates.
    pub fn connect_with_ssl(
        addr: &str,
        ssl_key: &Path,
        ssl_cert: &Path,
        ssl_ca: &Path,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        use rustls_pki_types::pem::PemObject;
        // `connect_with_host` forwards `http://` hosts here when TLS verification is
        // requested, so that scheme has to be removed as well; otherwise it would stay in
        // the stored address, which every other connector keeps scheme-less.
        let client_addr = addr
            .replacen("tcp://", "", 1)
            .replacen("https://", "", 1)
            .replacen("http://", "", 1);

        let mut root_store = rustls::RootCertStore::empty();

        #[cfg(not(any(feature = "test_ssl", feature = "webpki")))]
        let native_certs = rustls_native_certs::load_native_certs();

        #[cfg(not(any(feature = "test_ssl", feature = "webpki")))]
        if native_certs.errors.is_empty() {
            for cert in native_certs.certs {
                root_store
                    .add(cert)
                    .map_err(|err| NoNativeCertsError { err })?
            }
        } else {
            return Err(LoadNativeCertsErrors {
                errors: native_certs.errors,
            });
        }
        #[cfg(any(feature = "test_ssl", feature = "webpki"))]
        root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

        let mut ca_pem = io::Cursor::new(fs::read(ssl_ca).map_err(|_| CertPathError {
            path: ssl_ca.to_owned(),
        })?);

        // Trust the caller-provided CA in addition to the roots loaded above.
        root_store.add_parsable_certificates(
            rustls_pki_types::CertificateDer::pem_reader_iter(&mut ca_pem)
                .collect::<Result<Vec<_>, _>>()
                .map_err(|_| CertParseError {
                    path: ssl_ca.to_owned(),
                })?,
        );

        let config = rustls::ClientConfig::builder()
            .with_root_certificates(root_store)
            .with_client_cert_resolver(Arc::new(DockerClientCertResolver {
                ssl_key: ssl_key.to_owned(),
                ssl_cert: ssl_cert.to_owned(),
            }));

        let mut http_connector = HttpConnector::new();
        // The inner connector must accept non-`http` URIs so it can be wrapped for HTTPS.
        http_connector.enforce_http(false);

        let https_connector: HttpsConnector<HttpConnector> =
            HttpsConnector::from((http_connector, config));

        let mut client_builder = Client::builder(TokioExecutor::new());
        client_builder.pool_max_idle_per_host(0);

        let client = client_builder.build(https_connector);
        let transport = Transport::Https { client };
        let docker = Docker {
            transport: Arc::new(transport),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
            request_modifier: None,
        };

        Ok(docker)
    }
}
575
#[cfg(feature = "http")]
impl Docker {
    /// Connects over plain HTTP to the host named by `DOCKER_HOST`, or to
    /// [`DEFAULT_TCP_ADDRESS`] when the variable is not set, using the default timeout and
    /// API version.
    pub fn connect_with_http_defaults() -> Result<Docker, Error> {
        let host = match env::var("DOCKER_HOST") {
            Ok(value) => value,
            Err(_) => DEFAULT_TCP_ADDRESS.to_string(),
        };
        Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects over plain HTTP.
    ///
    /// # Arguments
    ///
    /// * `addr` - daemon address; a `tcp://` or `http://` scheme is removed before storing.
    /// * `timeout` - request timeout in seconds.
    /// * `client_version` - API version to start with.
    pub fn connect_with_http(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("tcp://", "", 1).replacen("http://", "", 1);

        let mut builder = Client::builder(TokioExecutor::new());
        builder.pool_max_idle_per_host(0);
        let client = builder.build(HttpConnector::new());

        let version = (
            AtomicUsize::new(client_version.major_version),
            AtomicUsize::new(client_version.minor_version),
        );

        Ok(Docker {
            transport: Arc::new(Transport::Http { client }),
            client_type: ClientType::Http,
            client_addr,
            client_timeout: timeout,
            version: Arc::new(version),
            request_modifier: None,
        })
    }
}
655
656impl Docker {
658 pub fn connect_with_custom_transport<S: Into<String>>(
705 transport: impl CustomTransport + 'static,
706 client_addr: Option<S>,
707 timeout: u64,
708 client_version: &ClientVersion,
709 ) -> Result<Docker, Error> {
710 let client_addr = client_addr.map(Into::into).unwrap_or_default();
711 let (scheme, client_addr) = client_addr
712 .split_once("://")
713 .unwrap_or(("", client_addr.as_str()));
714 let client_addr = client_addr.to_owned();
715 let scheme = scheme.to_owned();
716 let transport = Transport::Custom {
717 transport: Box::new(transport),
718 };
719 let docker = Docker {
720 transport: Arc::new(transport),
721 client_type: ClientType::Custom { scheme },
722 client_addr,
723 client_timeout: timeout,
724 version: Arc::new((
725 AtomicUsize::new(client_version.major_version),
726 AtomicUsize::new(client_version.minor_version),
727 )),
728 request_modifier: None,
729 };
730
731 Ok(docker)
732 }
733}
734
#[cfg(all(feature = "pipe", any(unix, windows)))]
impl Docker {
    /// Connects to the platform's default local endpoint: [`DEFAULT_SOCKET`] on Unix,
    /// [`DEFAULT_NAMED_PIPE`] on Windows. Unlike `connect_with_local_defaults`, this does
    /// not go through the `*_defaults` constructors.
    pub fn connect_with_socket_defaults() -> Result<Docker, Error> {
        #[cfg(unix)]
        let path = DEFAULT_SOCKET;
        #[cfg(windows)]
        let path = DEFAULT_NAMED_PIPE;

        Docker::connect_with_socket(path, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects to a local socket (Unix) or named pipe (Windows) after checking that the
    /// path exists.
    ///
    /// # Errors
    ///
    /// `SocketNotFoundError` when the path, with its scheme removed, does not exist.
    pub fn connect_with_socket(
        path: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        // Note: `trim_start_matches` removes the prefix repeatedly if it occurs more than once.
        let clean_path = path
            .trim_start_matches("unix://")
            .trim_start_matches("npipe://");

        if !Path::new(clean_path).exists() {
            return Err(SocketNotFoundError(clean_path.to_string()));
        }

        // The original `path` (scheme included) is handed on; the platform connector
        // strips the scheme itself.
        #[cfg(unix)]
        let docker = Docker::connect_with_unix(path, timeout, client_version)?;
        #[cfg(windows)]
        let docker = Docker::connect_with_named_pipe(path, timeout, client_version)?;

        Ok(docker)
    }

    /// Connects using the platform's default local connector: `connect_with_unix_defaults`
    /// on Unix, `connect_with_named_pipe_defaults` on Windows.
    pub fn connect_with_local_defaults() -> Result<Docker, Error> {
        #[cfg(unix)]
        return Docker::connect_with_unix_defaults();
        #[cfg(windows)]
        return Docker::connect_with_named_pipe_defaults();
    }

    /// Connects to `addr` with the platform's local connector: `connect_with_unix` on Unix,
    /// `connect_with_named_pipe` on Windows.
    pub fn connect_with_local(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        #[cfg(unix)]
        return Docker::connect_with_unix(addr, timeout, client_version);
        #[cfg(windows)]
        return Docker::connect_with_named_pipe(addr, timeout, client_version);
    }
}
836
837impl Docker {
839 pub fn connect_with_defaults() -> Result<Docker, Error> {
853 let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
854 Self::connect_with_host(&host)
855 }
856
857 pub fn connect_with_host(host: &str) -> Result<Docker, Error> {
871 match host {
872 #[cfg(all(feature = "pipe", unix))]
873 h if h.starts_with("unix://") => {
874 Docker::connect_with_unix(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
875 }
876 #[cfg(all(feature = "pipe", windows))]
877 h if h.starts_with("npipe://") => {
878 Docker::connect_with_named_pipe(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
879 }
880 #[cfg(feature = "http")]
881 h if h.starts_with("tcp://") || h.starts_with("http://") => {
882 #[cfg(feature = "ssl_providerless")]
883 if env::var("DOCKER_TLS_VERIFY").is_ok() {
884 return Docker::connect_with_ssl_default_certs(host);
885 }
886 Docker::connect_with_http(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
887 }
888 #[cfg(feature = "ssl_providerless")]
889 h if h.starts_with("https://") => Docker::connect_with_ssl_default_certs(host),
890 #[cfg(feature = "ssh")]
891 h if h.starts_with("ssh://") => {
892 Docker::connect_with_ssh(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION, None)
893 }
894 _ => Err(UnsupportedURISchemeError {
895 uri: host.to_string(),
896 }),
897 }
898 }
899}
900
#[cfg(all(feature = "pipe", unix))]
impl Docker {
    /// Connects to a Unix socket using defaults.
    ///
    /// The socket is taken from `DOCKER_HOST` when that variable holds a `unix://` URI and
    /// is [`DEFAULT_SOCKET`] otherwise.
    pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
        let from_env = env::var("DOCKER_HOST")
            .ok()
            .filter(|host| host.starts_with("unix://"));
        let socket = from_env.as_deref().unwrap_or(DEFAULT_SOCKET);
        Docker::connect_with_unix(socket, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Looks for an existing per-user Podman socket.
    ///
    /// Tries `$XDG_RUNTIME_DIR/podman/podman.sock` first, then
    /// [`DEFAULT_PODMAN_SOCKET_TEMPLATE`] filled in with the owner uid of `/proc/self`.
    /// Returns the first path that exists, without a `unix://` prefix.
    #[cfg(unix)]
    fn podman_rootless_socket_path() -> Option<String> {
        use std::os::unix::fs::MetadataExt;

        let via_runtime_dir = env::var("XDG_RUNTIME_DIR")
            .ok()
            .map(|xrd| format!("{xrd}/podman/podman.sock"))
            .filter(|sock| Path::new(sock).exists());

        via_runtime_dir.or_else(|| {
            std::fs::metadata("/proc/self")
                .ok()
                .map(|meta| DEFAULT_PODMAN_SOCKET_TEMPLATE.replace("{UID}", &meta.uid().to_string()))
                .filter(|sock| Path::new(sock).exists())
        })
    }

    /// Returns the system-wide Podman socket path (scheme removed) if it exists.
    #[cfg(unix)]
    fn podman_system_socket_path() -> Option<&'static str> {
        let bare = match DEFAULT_PODMAN_SYSTEM_SOCKET.strip_prefix("unix://") {
            Some(stripped) => stripped,
            None => DEFAULT_PODMAN_SYSTEM_SOCKET,
        };
        Some(bare).filter(|candidate| Path::new(candidate).exists())
    }

    /// Connects to a Unix socket, preferring Podman's sockets over Docker's default.
    ///
    /// Candidates are tried in order and the first one available is used:
    ///
    /// 1. `DOCKER_HOST`, when it holds a `unix://` URI;
    /// 2. an existing per-user Podman socket;
    /// 3. the existing system-wide Podman socket;
    /// 4. [`DEFAULT_SOCKET`].
    #[cfg(unix)]
    pub fn connect_with_podman_defaults() -> Result<Docker, Error> {
        let socket = env::var("DOCKER_HOST")
            .ok()
            .filter(|p| p.starts_with("unix://"))
            .or_else(Self::podman_rootless_socket_path)
            .or_else(|| Self::podman_system_socket_path().map(str::to_owned))
            .unwrap_or_else(|| DEFAULT_SOCKET.to_owned());

        Docker::connect_with_unix(&socket, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects to the Unix socket at `path`.
    ///
    /// # Arguments
    ///
    /// * `path` - socket path, optionally prefixed with `unix://`.
    /// * `timeout` - request timeout in seconds.
    /// * `client_version` - API version to start with.
    ///
    /// # Errors
    ///
    /// `SocketNotFoundError` when the path (scheme removed) does not exist.
    pub fn connect_with_unix(
        path: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = path.replacen("unix://", "", 1);
        if !Path::new(&client_addr).exists() {
            return Err(SocketNotFoundError(client_addr));
        }

        let mut builder = Client::builder(TokioExecutor::new());
        builder.pool_max_idle_per_host(0);
        let client = builder.build(UnixConnector);

        let version = (
            AtomicUsize::new(client_version.major_version),
            AtomicUsize::new(client_version.minor_version),
        );

        Ok(Docker {
            transport: Arc::new(Transport::Unix { client }),
            client_type: ClientType::Unix,
            client_addr,
            client_timeout: timeout,
            version: Arc::new(version),
            request_modifier: None,
        })
    }
}
1076
#[cfg(all(feature = "pipe", windows))]
impl Docker {
    /// Connects to [`DEFAULT_NAMED_PIPE`] with the default timeout and API version.
    pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
        Self::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects to the Windows named pipe at `path`.
    ///
    /// # Arguments
    ///
    /// * `path` - pipe path, optionally prefixed with `npipe://`.
    /// * `timeout` - request timeout in seconds.
    /// * `client_version` - API version to start with.
    pub fn connect_with_named_pipe(
        path: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = path.replacen("npipe://", "", 1);

        let mut builder = Client::builder(TokioExecutor::new());
        builder.http1_title_case_headers(true);
        builder.pool_max_idle_per_host(0);
        let client = builder.build(NamedPipeConnector);

        let version = (
            AtomicUsize::new(client_version.major_version),
            AtomicUsize::new(client_version.minor_version),
        );

        Ok(Docker {
            transport: Arc::new(Transport::NamedPipe { client }),
            client_type: ClientType::NamedPipe,
            client_addr,
            client_timeout: timeout,
            version: Arc::new(version),
            request_modifier: None,
        })
    }
}
1154
#[cfg(feature = "ssh")]
impl Docker {
    /// Connects over SSH to the host named by `DOCKER_HOST`, or to [`DEFAULT_SSH_ADDRESS`]
    /// when the variable is not set, with the default timeout and API version and no
    /// explicit key pair.
    pub fn connect_with_ssh_defaults() -> Result<Docker, Error> {
        let host = match env::var("DOCKER_HOST") {
            Ok(value) => value,
            Err(_) => DEFAULT_SSH_ADDRESS.to_string(),
        };
        Docker::connect_with_ssh(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION, None)
    }

    /// Connects over SSH.
    ///
    /// # Arguments
    ///
    /// * `addr` - remote address; an `ssh://` scheme is removed before storing.
    /// * `timeout` - request timeout in seconds.
    /// * `client_version` - API version to start with.
    /// * `keypair_path` - when given, passed to `SshConnector::with_keypair`; otherwise the
    ///   connector is created with `SshConnector::new`.
    pub fn connect_with_ssh(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
        keypair_path: Option<String>,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("ssh://", "", 1);

        let connector = if let Some(path) = keypair_path {
            crate::ssh::SshConnector::with_keypair(path)
        } else {
            crate::ssh::SshConnector::new()
        };
        let client = Client::builder(TokioExecutor::new()).build(connector);

        let version = (
            AtomicUsize::new(client_version.major_version),
            AtomicUsize::new(client_version.minor_version),
        );

        Ok(Docker {
            transport: Arc::new(Transport::Ssh { client }),
            client_type: ClientType::Ssh,
            client_addr,
            client_timeout: timeout,
            version: Arc::new(version),
            request_modifier: None,
        })
    }
}
1236
#[cfg(test)]
impl Docker {
    /// Test-only constructor that sends requests through a mock connector.
    ///
    /// The resulting client reports its type as `ClientType::Http` and stores
    /// `client_addr` unchanged.
    pub fn connect_with_mock(
        connector: yup_hyper_mock::HostToReplyConnector,
        client_addr: String,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client = Client::builder(TokioExecutor::new()).build(connector);

        let version = (
            AtomicUsize::new(client_version.major_version),
            AtomicUsize::new(client_version.minor_version),
        );

        Ok(Docker {
            transport: Arc::new(Transport::Mock { client }),
            client_type: ClientType::Http,
            client_addr,
            client_timeout: timeout,
            version: Arc::new(version),
            request_modifier: None,
        })
    }
}
1293
1294impl Docker {
1295 pub fn with_timeout(mut self, timeout: Duration) -> Self {
1301 self.set_timeout(timeout);
1302 self
1303 }
1304
1305 pub fn timeout(&self) -> Duration {
1309 Duration::from_secs(self.client_timeout)
1310 }
1311
1312 pub fn set_timeout(&mut self, timeout: Duration) {
1318 self.client_timeout = timeout.as_secs();
1319 }
1320
1321 pub fn with_request_modifier<F>(mut self, modifier: F) -> Self
1340 where
1341 F: Fn(BollardRequest) -> BollardRequest + Send + Sync + 'static,
1342 {
1343 self.request_modifier = Some(Arc::new(modifier));
1344 self
1345 }
1346}
1347
1348impl Docker {
1350 pub(crate) fn process_into_value<T>(
1351 &self,
1352 req: Result<Request<BodyType>, Error>,
1353 ) -> impl Future<Output = Result<T, Error>>
1354 where
1355 T: DeserializeOwned,
1356 {
1357 let fut = self.process_request(req);
1358 async move { Docker::decode_response(fut.await?).await }
1359 }
1360
1361 pub(crate) fn process_into_stream<T>(
1362 &self,
1363 req: Result<Request<BodyType>, Error>,
1364 ) -> impl Stream<Item = Result<T, Error>> + Unpin
1365 where
1366 T: DeserializeOwned,
1367 {
1368 Box::pin(
1369 self.process_request(req)
1370 .map_ok(Docker::decode_into_stream::<T>)
1371 .into_stream()
1372 .try_flatten(),
1373 )
1374 }
1375
1376 pub(crate) fn process_into_stream_string(
1377 &self,
1378 req: Result<Request<BodyType>, Error>,
1379 ) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
1380 Box::pin(
1381 self.process_request(req)
1382 .map_ok(Docker::decode_into_stream_string)
1383 .try_flatten_stream(),
1384 )
1385 }
1386
1387 pub(crate) fn process_into_unit(
1388 &self,
1389 req: Result<Request<BodyType>, Error>,
1390 ) -> impl Future<Output = Result<(), Error>> {
1391 let fut = self.process_request(req);
1392 async move {
1393 fut.await?;
1394 Ok(())
1395 }
1396 }
1397
1398 pub(crate) fn process_into_body(
1399 &self,
1400 req: Result<Request<BodyType>, Error>,
1401 ) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
1402 Box::pin(
1403 self.process_request(req)
1404 .map_ok(|response| IncomingStream::new(response.into_body()))
1405 .into_stream()
1406 .try_flatten(),
1407 )
1408 }
1409
1410 pub(crate) fn process_into_string(
1411 &self,
1412 req: Result<Request<BodyType>, Error>,
1413 ) -> impl Future<Output = Result<String, Error>> {
1414 let fut = self.process_request(req);
1415 async move {
1416 let response = fut.await?;
1417 Docker::decode_into_string(response).await
1418 }
1419 }
1420
1421 pub(crate) async fn process_upgraded(
1422 &self,
1423 req: Result<Request<BodyType>, Error>,
1424 ) -> Result<(impl AsyncRead, impl AsyncWrite), Error> {
1425 let res = self.process_request(req).await?;
1426 let upgraded = hyper::upgrade::on(res).await?;
1427 let tokio_upgraded = AsyncUpgraded::new(upgraded);
1428
1429 Ok(split(tokio_upgraded))
1430 }
1431
/// Opens a WebSocket to `path` over the Unix socket stored in `client_addr`.
///
/// `query`, when given, is URL-encoded and appended as a query string; an encoding that
/// comes out empty adds nothing. The WebSocket URI always uses the host `localhost`.
///
/// # Errors
///
/// Fails when the query cannot be encoded, the socket cannot be connected, or the
/// WebSocket handshake fails (`Error::WebSocketError`).
#[cfg(all(feature = "websocket", unix))]
pub(crate) async fn process_websocket<O>(
    &self,
    path: &str,
    query: Option<O>,
) -> Result<WebSocketStream<tokio::net::UnixStream>, Error>
where
    O: Serialize,
{
    use tokio_tungstenite::client_async;

    let encoded = match query {
        Some(q) => serde_urlencoded::to_string(&q)?,
        None => String::new(),
    };
    let query_string = if encoded.is_empty() {
        String::new()
    } else {
        format!("?{}", encoded)
    };

    let ws_uri = format!("ws://localhost{}{}", path, query_string);
    debug!("WebSocket URI: {}", ws_uri);

    let stream = tokio::net::UnixStream::connect(&self.client_addr).await?;

    match client_async(&ws_uri, stream).await {
        Ok((ws_stream, _response)) => Ok(ws_stream),
        Err(e) => Err(Error::WebSocketError { err: Box::new(e) }),
    }
}
1466
/// Opens a WebSocket to `path` over a TCP connection to `client_addr`.
///
/// `query`, when given, is URL-encoded and appended as a query string; an encoding that
/// comes out empty adds nothing. The WebSocket URI uses `client_addr` as its host.
///
/// # Errors
///
/// Fails when the query cannot be encoded, the TCP connection cannot be established, or
/// the WebSocket handshake fails (`Error::WebSocketError`).
#[cfg(all(feature = "websocket", not(unix)))]
pub(crate) async fn process_websocket<O>(
    &self,
    path: &str,
    query: Option<O>,
) -> Result<WebSocketStream<tokio::net::TcpStream>, Error>
where
    O: Serialize,
{
    use tokio_tungstenite::client_async;

    let encoded = match query {
        Some(q) => serde_urlencoded::to_string(&q)?,
        None => String::new(),
    };
    let query_string = if encoded.is_empty() {
        String::new()
    } else {
        format!("?{}", encoded)
    };

    let ws_uri = format!("ws://{}{}{}", self.client_addr, path, query_string);
    debug!("WebSocket URI: {}", ws_uri);

    let stream = tokio::net::TcpStream::connect(&self.client_addr).await?;

    match client_async(&ws_uri, stream).await {
        Ok((ws_stream, _response)) => Ok(ws_stream),
        Err(e) => Err(Error::WebSocketError { err: Box::new(e) }),
    }
}
1501
1502 pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<BodyType, Error>
1503 where
1504 S: Serialize,
1505 {
1506 match body.map(|inst| serde_json::to_string(&inst)) {
1507 Some(Ok(res)) => Ok(Some(res)),
1508 Some(Err(e)) => Err(e.into()),
1509 None => Ok(None),
1510 }
1511 .map(|payload| {
1512 debug!("{}", payload.clone().unwrap_or_default());
1513 payload
1514 .map(|content| BodyType::Left(Full::new(content.into())))
1515 .unwrap_or(BodyType::Left(Full::new(Bytes::new())))
1516 })
1517 }
1518
1519 pub fn client_version(&self) -> ClientVersion {
1521 self.version.as_ref().into()
1522 }
1523
1524 pub async fn negotiate_version(self) -> Result<Self, Error> {
1538 let req = self.build_request(
1539 "/version",
1540 Builder::new().method(Method::GET),
1541 None::<String>,
1542 Ok(BodyType::Left(Full::new(Bytes::new()))),
1543 );
1544
1545 let res = self
1546 .process_into_value::<crate::models::SystemVersion>(req)
1547 .await?;
1548
1549 let server_version: ClientVersion = if let Some(api_version) = res.api_version {
1550 match api_version.into() {
1551 MaybeClientVersion::Some(client_version) => client_version,
1552 MaybeClientVersion::None => {
1553 return Err(APIVersionParseError {});
1554 }
1555 }
1556 } else {
1557 return Err(APIVersionParseError {});
1558 };
1559
1560 if server_version < self.client_version() {
1561 self.version
1562 .0
1563 .store(server_version.major_version, Ordering::Relaxed);
1564 self.version
1565 .1
1566 .store(server_version.minor_version, Ordering::Relaxed);
1567 }
1568
1569 Ok(self)
1570 }
1571
1572 pub(crate) fn process_request(
1573 &self,
1574 request: Result<Request<BodyType>, Error>,
1575 ) -> impl Future<Output = Result<Response<Incoming>, Error>> {
1576 let transport = self.transport.clone();
1577 let timeout = self.client_timeout;
1578
1579 match request.as_ref().map(|b| b.body()) {
1580 Ok(http_body_util::Either::Left(bytes)) => trace!("request: {bytes:?}"),
1581 Ok(http_body_util::Either::Right(_)) => trace!("request: (stream)"),
1582 Err(e) => trace!("request: Err({e:?}"),
1583 };
1584
1585 async move {
1586 let request = request?;
1587 let response = Docker::execute_request(transport, request, timeout).await?;
1588
1589 let status = response.status();
1590 match status {
1591 s if s.is_success() || s == StatusCode::NOT_MODIFIED => Ok(response),
1593
1594 StatusCode::SWITCHING_PROTOCOLS => Ok(response),
1595
1596 _ => {
1598 let contents = Docker::decode_into_string(response).await?;
1599
1600 let mut message = String::new();
1601 if !contents.is_empty() {
1602 message = serde_json::from_str::<DockerServerErrorMessage>(&contents)
1603 .map(|msg| msg.message)
1604 .or_else(|e| {
1605 if e.is_data() || e.is_syntax() {
1606 Ok(contents)
1607 } else {
1608 Err(e)
1609 }
1610 })?;
1611 }
1612 Err(DockerResponseServerError {
1613 status_code: status.as_u16(),
1614 message,
1615 })
1616 }
1617 }
1618 }
1619 }
1620
1621 pub(crate) fn build_request<O>(
1622 &self,
1623 path: &str,
1624 builder: Builder,
1625 query: Option<O>,
1626 payload: Result<BodyType, Error>,
1627 ) -> Result<Request<BodyType>, Error>
1628 where
1629 O: Serialize,
1630 {
1631 let uri = Uri::parse(
1632 &self.client_addr,
1633 &self.client_type,
1634 path,
1635 query,
1636 &self.client_version(),
1637 )?;
1638 let request_uri: hyper::Uri = uri.try_into()?;
1639 debug!("{}", &request_uri);
1640
1641 let request = builder
1642 .uri(request_uri)
1643 .header(CONTENT_TYPE, "application/json")
1644 .body(payload?)?;
1645
1646 let request = if let Some(modifier) = &self.request_modifier {
1647 modifier(request)
1648 } else {
1649 request
1650 };
1651
1652 Ok(request)
1653 }
1654
1655 pub(crate) fn build_request_with_registry_auth<O>(
1656 &self,
1657 path: &str,
1658 mut builder: Builder,
1659 query: Option<O>,
1660 payload: Result<BodyType, Error>,
1661 credentials: DockerCredentialsHeader,
1662 ) -> Result<Request<BodyType>, Error>
1663 where
1664 O: Serialize,
1665 {
1666 match credentials {
1667 DockerCredentialsHeader::Config(Some(config)) => {
1668 let value = base64_url_encode(&serde_json::to_string(&config)?);
1669 builder = builder.header("X-Registry-Config", value)
1670 }
1671 DockerCredentialsHeader::Auth(Some(config)) => {
1672 let value = base64_url_encode(&serde_json::to_string(&config)?);
1673 builder = builder.header("X-Registry-Auth", value)
1674 }
1675 _ => {}
1676 }
1677
1678 self.build_request(path, builder, query, payload)
1679 }
1680
1681 async fn execute_request(
1682 transport: Arc<Transport>,
1683 req: Request<BodyType>,
1684 timeout: u64,
1685 ) -> Result<Response<Incoming>, Error> {
1686 let request = match *transport {
1688 #[cfg(feature = "http")]
1689 Transport::Http { ref client } => client.request(req).map_err(Error::from).boxed(),
1690 #[cfg(feature = "ssl_providerless")]
1691 Transport::Https { ref client } => client.request(req).map_err(Error::from).boxed(),
1692 #[cfg(all(feature = "pipe", unix))]
1693 Transport::Unix { ref client } => client.request(req).map_err(Error::from).boxed(),
1694 #[cfg(all(feature = "pipe", windows))]
1695 Transport::NamedPipe { ref client } => client.request(req).map_err(Error::from).boxed(),
1696 #[cfg(feature = "ssh")]
1697 Transport::Ssh { ref client } => client.request(req).map_err(Error::from).boxed(),
1698 #[cfg(test)]
1699 Transport::Mock { ref client } => client.request(req).map_err(Error::from).boxed(),
1700 Transport::Custom { ref transport } => transport.request(req).boxed(),
1701 };
1702
1703 match tokio::time::timeout(Duration::from_secs(timeout), request).await {
1704 Ok(v) => Ok(v?),
1705 Err(_) => Err(RequestTimeoutError),
1706 }
1707 }
1708
1709 fn decode_into_stream<T>(res: Response<Incoming>) -> impl Stream<Item = Result<T, Error>>
1710 where
1711 T: DeserializeOwned,
1712 {
1713 FramedRead::new(StreamReader::new(res.into_body()), JsonLineDecoder::new())
1714 }
1715
1716 fn decode_into_stream_string(
1717 res: Response<Incoming>,
1718 ) -> impl Stream<Item = Result<LogOutput, Error>> {
1719 FramedRead::new(
1720 StreamReader::new(res.into_body()),
1721 NewlineLogOutputDecoder::new(false),
1722 )
1723 .map_err(Error::from)
1724 }
1725
1726 async fn decode_into_string(response: Response<Incoming>) -> Result<String, Error> {
1727 let body = response.into_body().collect().await?.to_bytes();
1728
1729 Ok(String::from_utf8_lossy(&body).to_string())
1730 }
1731
1732 async fn decode_response<T>(response: Response<Incoming>) -> Result<T, Error>
1733 where
1734 T: DeserializeOwned,
1735 {
1736 let bytes = response.into_body().collect().await?.to_bytes();
1737
1738 debug!("Decoded into string: {}", &String::from_utf8_lossy(&bytes));
1739
1740 serde_json::from_slice::<T>(&bytes).map_err(|e| {
1741 if e.is_data() || e.is_syntax() {
1742 JsonDataError {
1743 message: e.to_string(),
1744 column: e.column(),
1745 #[cfg(feature = "json_data_content")]
1746 contents: String::from_utf8_lossy(&bytes).to_string(),
1747 }
1748 } else {
1749 e.into()
1750 }
1751 })
1752 }
1753}
1754
1755pub(crate) type BodyType = http_body_util::Either<
1757 Full<Bytes>,
1758 StreamBody<Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, std::io::Error>> + Send>>>,
1759>;
1760
1761pub fn body_stream(body: impl Stream<Item = Bytes> + Send + 'static) -> BodyType {
1763 BodyType::Right(StreamBody::new(Box::pin(body.map(|a| Ok(Frame::data(a))))))
1764}
1765
1766pub fn body_try_stream(
1768 body: impl Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
1769) -> BodyType {
1770 BodyType::Right(StreamBody::new(Box::pin(body.map_ok(Frame::data))))
1771}
1772
1773pub fn body_full(body: Bytes) -> BodyType {
1775 BodyType::Left(Full::new(body))
1776}
1777
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes every test that changes process-wide environment variables.
    ///
    /// The test harness runs tests on several threads of one process, so without this lock
    /// two tests touching `XDG_RUNTIME_DIR` or `DOCKER_HOST` could observe each other's values.
    #[cfg(unix)]
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Sets an environment variable for the lifetime of the guard.
    ///
    /// On drop the variable is put back to exactly what it was before (including "unset"),
    /// even when the test panics. The guard also holds [`ENV_LOCK`], which is not reentrant:
    /// a test must not keep two guards alive at the same time.
    #[cfg(unix)]
    struct TempEnvVar {
        key: String,
        // `OsString` so that a previous non-Unicode value is restored rather than dropped.
        prev: Option<std::ffi::OsString>,
        // Declared last: fields drop after `Drop::drop`, so the lock outlives the restore.
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    #[cfg(unix)]
    impl TempEnvVar {
        fn set(key: &str, val: &str) -> Self {
            // A poisoned lock only means an earlier env test panicked; its guard has already
            // restored the variable, so it is safe to carry on.
            let lock = ENV_LOCK
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            let prev = env::var_os(key);
            env::set_var(key, val);
            Self {
                key: key.to_string(),
                prev,
                _lock: lock,
            }
        }
    }

    #[cfg(unix)]
    impl Drop for TempEnvVar {
        fn drop(&mut self) {
            match &self.prev {
                Some(v) => env::set_var(&self.key, v),
                None => env::remove_var(&self.key),
            }
        }
    }

    #[cfg(unix)]
    mod podman {
        use super::*;

        /// A `podman/podman.sock` entry under `XDG_RUNTIME_DIR` is reported as the rootless
        /// socket path.
        #[test]
        fn rootless_socket_from_xdg_runtime_dir() {
            let dir = tempfile::tempdir().unwrap();
            let sock_dir = dir.path().join("podman");
            std::fs::create_dir_all(&sock_dir).unwrap();
            let sock = sock_dir.join("podman.sock");
            std::fs::write(&sock, b"").unwrap();

            let _guard = TempEnvVar::set("XDG_RUNTIME_DIR", dir.path().to_str().unwrap());

            let found = Docker::podman_rootless_socket_path();
            assert_eq!(found.as_deref(), Some(sock.to_str().unwrap()));
        }

        /// With an empty `XDG_RUNTIME_DIR`, the lookup must not report a socket inside it.
        /// Any other result is accepted.
        #[test]
        fn rootless_socket_returns_none_when_missing() {
            let dir = tempfile::tempdir().unwrap();
            let _guard = TempEnvVar::set("XDG_RUNTIME_DIR", dir.path().to_str().unwrap());

            let found = Docker::podman_rootless_socket_path();
            let xdg_sock = dir.path().join("podman/podman.sock");
            if let Some(ref path) = found {
                assert_ne!(path.as_str(), xdg_sock.to_str().unwrap());
            }
        }

        /// Smoke test: the system socket lookup can be called; its result is not checked.
        #[test]
        fn system_socket_returns_none_when_missing() {
            let _ = Docker::podman_system_socket_path();
        }

        /// A `unix://` value in `DOCKER_HOST` becomes the client address.
        #[test]
        fn connect_with_podman_defaults_respects_docker_host() {
            let dir = tempfile::tempdir().unwrap();
            let sock = dir.path().join("test.sock");
            std::fs::write(&sock, b"").unwrap();

            let uri = format!("unix://{}", sock.display());
            let _guard = TempEnvVar::set("DOCKER_HOST", &uri);

            let docker = Docker::connect_with_podman_defaults().unwrap();
            assert_eq!(docker.client_addr, sock.to_str().unwrap());
        }
    }

    #[cfg(all(unix, feature = "pipe"))]
    mod docker_defaults {
        use super::*;

        /// A `unix://` value in `DOCKER_HOST` becomes the client address.
        #[test]
        fn connect_with_unix_defaults_respects_docker_host() {
            let dir = tempfile::tempdir().unwrap();
            let sock = dir.path().join("test.sock");
            std::fs::write(&sock, b"").unwrap();

            let uri = format!("unix://{}", sock.display());
            // The guard restores `DOCKER_HOST` even when an assertion below panics.
            let _guard = TempEnvVar::set("DOCKER_HOST", &uri);

            let docker = Docker::connect_with_unix_defaults().unwrap();
            assert_eq!(docker.client_addr, sock.to_str().unwrap());
        }

        /// With a non-unix `DOCKER_HOST`, a `SocketNotFoundError` must name a `docker.sock`
        /// path. Any other outcome is accepted.
        #[test]
        fn connect_with_unix_defaults_ignores_non_unix_docker_host() {
            let _guard = TempEnvVar::set("DOCKER_HOST", "tcp://localhost:2375");

            let result = Docker::connect_with_unix_defaults();
            if let Err(SocketNotFoundError(addr)) = &result {
                assert!(addr.contains("docker.sock"));
            }
        }
    }
}