#[cfg(feature = "ssl_providerless")]
use std::fs;
use std::future::Future;
#[cfg(feature = "ssl_providerless")]
use std::io;
#[cfg(any(feature = "pipe", feature = "ssl_providerless"))]
use std::path::Path;
#[cfg(feature = "ssl_providerless")]
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, env, fmt};
use futures_core::Stream;
use futures_util::future::FutureExt;
use futures_util::future::TryFutureExt;
use futures_util::stream::TryStreamExt;
use futures_util::StreamExt;
use http::header::CONTENT_TYPE;
use http::request::Builder;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::{Frame, Incoming};
use hyper::{self, body::Bytes, Method, Request, Response, StatusCode};
#[cfg(feature = "ssl_providerless")]
use hyper_rustls::HttpsConnector;
#[cfg(any(feature = "http", test))]
use hyper_util::client::legacy::connect::HttpConnector;
#[cfg(any(feature = "http", feature = "ssh", feature = "pipe", test))]
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
#[cfg(all(feature = "pipe", unix))]
use hyperlocal::UnixConnector;
use log::{debug, trace};
#[cfg(feature = "ssl_providerless")]
use rustls::{crypto::CryptoProvider, sign::CertifiedKey};
#[cfg(feature = "ssl_providerless")]
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use serde_derive::{Deserialize, Serialize};
use tokio::io::{split, AsyncRead, AsyncWrite};
use tokio_util::codec::FramedRead;
use crate::container::LogOutput;
use crate::errors::Error;
use crate::errors::Error::*;
use crate::read::{
AsyncUpgraded, IncomingStream, JsonLineDecoder, NewlineLogOutputDecoder, StreamReader,
};
use crate::uri::Uri;
#[cfg(all(feature = "pipe", windows))]
use hyper_named_pipe::NamedPipeConnector;
use crate::auth::{base64_url_encode, DockerCredentialsHeader};
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
#[cfg(feature = "websocket")]
use tokio_tungstenite::WebSocketStream;
#[cfg(unix)]
pub const DEFAULT_SOCKET: &str = "unix:///var/run/docker.sock";
#[cfg(windows)]
pub const DEFAULT_NAMED_PIPE: &str = "npipe:////./pipe/docker_engine";
#[cfg(feature = "http")]
pub const DEFAULT_TCP_ADDRESS: &str = "tcp://localhost:2375";
#[cfg(feature = "ssh")]
pub const DEFAULT_SSH_ADDRESS: &str = "ssh://localhost";
#[cfg(unix)]
pub(crate) const DEFAULT_PODMAN_SOCKET_TEMPLATE: &str = "/run/user/{UID}/podman/podman.sock";
#[cfg(unix)]
pub(crate) const DEFAULT_PODMAN_SYSTEM_SOCKET: &str = "unix:///run/podman/podman.sock";
#[cfg(unix)]
pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_SOCKET;
#[cfg(windows)]
pub const DEFAULT_DOCKER_HOST: &str = DEFAULT_NAMED_PIPE;
#[cfg(any(feature = "http", feature = "ssh", feature = "pipe"))]
const DEFAULT_TIMEOUT: u64 = 120;
pub const API_DEFAULT_VERSION: &ClientVersion = &ClientVersion {
major_version: 1,
minor_version: 53,
};
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
#[cfg(all(feature = "pipe", unix))]
Unix,
#[cfg(feature = "http")]
Http,
#[cfg(feature = "ssl_providerless")]
SSL,
#[cfg(all(feature = "pipe", windows))]
NamedPipe,
#[cfg(feature = "ssh")]
Ssh,
Custom {
scheme: String,
},
}
pub type BollardRequest = Request<BodyType>;
type TransportReturnTy = Pin<Box<dyn Future<Output = Result<Response<Incoming>, Error>> + Send>>;
pub trait CustomTransport: Send + Sync {
fn request(&self, request: BollardRequest) -> TransportReturnTy;
}
impl<Callback, ReturnTy> CustomTransport for Callback
where
Callback: Fn(BollardRequest) -> ReturnTy + Send + Sync,
ReturnTy: Future<Output = Result<Response<Incoming>, Error>> + Send + 'static,
{
fn request(&self, request: BollardRequest) -> TransportReturnTy {
Box::pin(self(request))
}
}
pub(crate) enum Transport {
#[cfg(feature = "http")]
Http {
client: Client<HttpConnector, BodyType>,
},
#[cfg(feature = "ssl_providerless")]
Https {
client: Client<HttpsConnector<HttpConnector>, BodyType>,
},
#[cfg(all(feature = "pipe", unix))]
Unix {
client: Client<UnixConnector, BodyType>,
},
#[cfg(all(feature = "pipe", windows))]
NamedPipe {
client: Client<NamedPipeConnector, BodyType>,
},
#[cfg(feature = "ssh")]
Ssh {
client: Client<crate::ssh::SshConnector, BodyType>,
},
#[cfg(test)]
Mock {
client: Client<yup_hyper_mock::HostToReplyConnector, BodyType>,
},
Custom {
transport: Box<dyn CustomTransport>,
},
}
impl fmt::Debug for Transport {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
#[cfg(feature = "http")]
Transport::Http { .. } => write!(f, "HTTP"),
#[cfg(feature = "ssl_providerless")]
Transport::Https { .. } => write!(f, "HTTPS(rustls)"),
#[cfg(all(feature = "pipe", unix))]
Transport::Unix { .. } => write!(f, "Unix"),
#[cfg(all(feature = "pipe", windows))]
Transport::NamedPipe { .. } => write!(f, "NamedPipe"),
#[cfg(feature = "ssh")]
Transport::Ssh { .. } => write!(f, "SSH"),
#[cfg(test)]
Transport::Mock { .. } => write!(f, "Mock"),
Transport::Custom { .. } => write!(f, "Custom"),
}
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct ClientVersion {
pub major_version: usize,
pub minor_version: usize,
}
pub(crate) enum MaybeClientVersion {
Some(ClientVersion),
None,
}
impl<T: Into<String>> From<T> for MaybeClientVersion {
fn from(s: T) -> MaybeClientVersion {
match s
.into()
.split('.')
.map(|v| v.parse::<usize>())
.collect::<Vec<Result<usize, std::num::ParseIntError>>>()
.as_slice()
{
[Ok(first), Ok(second)] => MaybeClientVersion::Some(ClientVersion {
major_version: first.to_owned(),
minor_version: second.to_owned(),
}),
_ => MaybeClientVersion::None,
}
}
}
impl fmt::Display for ClientVersion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}.{}", self.major_version, self.minor_version)
}
}
impl PartialOrd for ClientVersion {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
match self.major_version.partial_cmp(&other.major_version) {
Some(cmp::Ordering::Equal) => self.minor_version.partial_cmp(&other.minor_version),
res => res,
}
}
}
impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
ClientVersion {
major_version: tpl.0.load(Ordering::Relaxed),
minor_version: tpl.1.load(Ordering::Relaxed),
}
}
}
pub type RequestModifier = Arc<dyn Fn(BollardRequest) -> BollardRequest + Send + Sync>;
pub struct Docker {
pub(crate) transport: Arc<Transport>,
pub(crate) client_type: ClientType,
pub(crate) client_addr: String,
pub(crate) client_timeout: u64,
pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
pub(crate) request_modifier: Option<RequestModifier>,
}
impl fmt::Debug for Docker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Docker")
.field("transport", &self.transport)
.field("client_type", &self.client_type)
.field("client_addr", &self.client_addr)
.field("client_timeout", &self.client_timeout)
.field("version", &self.version)
.field(
"request_modifier",
&self.request_modifier.as_ref().map(|_| "<callback>"),
)
.finish()
}
}
impl Clone for Docker {
fn clone(&self) -> Docker {
Docker {
transport: self.transport.clone(),
client_type: self.client_type.clone(),
client_addr: self.client_addr.clone(),
client_timeout: self.client_timeout,
version: self.version.clone(),
request_modifier: self.request_modifier.clone(),
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
struct DockerServerErrorMessage {
message: String,
}
#[cfg(feature = "ssl_providerless")]
#[derive(Debug)]
struct DockerClientCertResolver {
ssl_key: PathBuf,
ssl_cert: PathBuf,
}
#[cfg(feature = "ssl_providerless")]
impl DockerClientCertResolver {
pub fn default_cert_path() -> Result<PathBuf, Error> {
let from_env = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
if let Ok(ref path) = from_env {
Ok(Path::new(path).to_owned())
} else {
let home = home::home_dir().ok_or_else(|| NoHomePathError)?;
Ok(home.join(".docker"))
}
}
fn open_buffered(path: &Path) -> Result<io::BufReader<fs::File>, Error> {
Ok(io::BufReader::new(fs::File::open(path)?))
}
fn certs(path: &Path) -> Result<Vec<CertificateDer<'static>>, Error> {
use rustls_pki_types::pem::PemObject;
rustls_pki_types::CertificateDer::pem_reader_iter(&mut Self::open_buffered(path)?)
.filter_map(|res| {
if matches!(res, Err(rustls_pki_types::pem::Error::NoItemsFound)) {
None
} else {
Some(res)
}
})
.collect::<Result<Vec<CertificateDer<'static>>, rustls_pki_types::pem::Error>>()
.map_err(|_| CertPathError {
path: path.to_path_buf(),
})
}
fn keys(path: &Path) -> Result<Vec<PrivateKeyDer<'static>>, Error> {
use rustls_pki_types::pem::PemObject;
let mut rdr = Self::open_buffered(path)?;
let mut keys = vec![];
match rustls_pki_types::PrivateKeyDer::from_pem_reader(&mut rdr) {
Ok(key) => keys.push(key),
Err(rustls_pki_types::pem::Error::NoItemsFound) => {}
Err(_e) => {
return Err(CertPathError {
path: path.to_path_buf(),
})
}
}
Ok(keys)
}
fn docker_client_key(&self) -> Result<Arc<CertifiedKey>, Error> {
let all_certs = Self::certs(&self.ssl_cert)?;
let mut all_keys = Self::keys(&self.ssl_key)?;
let key = if all_keys.len() == 1 {
all_keys.remove(0)
} else {
return Err(CertMultipleKeys {
count: all_keys.len(),
path: self.ssl_key.to_owned(),
});
};
let signing_key = CryptoProvider::get_default()
.expect("no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point")
.key_provider
.load_private_key(key)
.map_err(|_| CertParseError {
path: self.ssl_key.to_owned(),
})?;
Ok(Arc::new(CertifiedKey::new(all_certs, signing_key)))
}
}
#[cfg(feature = "ssl_providerless")]
impl rustls::client::ResolvesClientCert for DockerClientCertResolver {
fn resolve(&self, _: &[&[u8]], _: &[rustls::SignatureScheme]) -> Option<Arc<CertifiedKey>> {
self.docker_client_key().ok()
}
fn has_certs(&self) -> bool {
true
}
}
#[cfg(feature = "ssl_providerless")]
impl Docker {
pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_TCP_ADDRESS.to_string());
Self::connect_with_ssl_default_certs(&host)
}
fn connect_with_ssl_default_certs(host: &str) -> Result<Docker, Error> {
let cert_path = DockerClientCertResolver::default_cert_path()?;
Docker::connect_with_ssl(
host,
&cert_path.join("key.pem"),
&cert_path.join("cert.pem"),
&cert_path.join("ca.pem"),
DEFAULT_TIMEOUT,
API_DEFAULT_VERSION,
)
}
pub fn connect_with_ssl(
addr: &str,
ssl_key: &Path,
ssl_cert: &Path,
ssl_ca: &Path,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
use rustls_pki_types::pem::PemObject;
let client_addr = addr.replacen("tcp://", "", 1).replacen("https://", "", 1);
let mut root_store = rustls::RootCertStore::empty();
#[cfg(not(any(feature = "test_ssl", feature = "webpki")))]
let native_certs = rustls_native_certs::load_native_certs();
#[cfg(not(any(feature = "test_ssl", feature = "webpki")))]
if native_certs.errors.is_empty() {
for cert in native_certs.certs {
root_store
.add(cert)
.map_err(|err| NoNativeCertsError { err })?
}
} else {
return Err(LoadNativeCertsErrors {
errors: native_certs.errors,
});
}
#[cfg(any(feature = "test_ssl", feature = "webpki"))]
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let mut ca_pem = io::Cursor::new(fs::read(ssl_ca).map_err(|_| CertPathError {
path: ssl_ca.to_owned(),
})?);
root_store.add_parsable_certificates(
rustls_pki_types::CertificateDer::pem_reader_iter(&mut ca_pem)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| CertParseError {
path: ssl_ca.to_owned(),
})?,
);
let config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_client_cert_resolver(Arc::new(DockerClientCertResolver {
ssl_key: ssl_key.to_owned(),
ssl_cert: ssl_cert.to_owned(),
}));
let mut http_connector = HttpConnector::new();
http_connector.enforce_http(false);
let https_connector: HttpsConnector<HttpConnector> =
HttpsConnector::from((http_connector, config));
let mut client_builder = Client::builder(TokioExecutor::new());
client_builder.pool_max_idle_per_host(0);
let client = client_builder.build(https_connector);
let transport = Transport::Https { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::SSL,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
request_modifier: None,
};
Ok(docker)
}
}
#[cfg(feature = "http")]
impl Docker {
pub fn connect_with_http_defaults() -> Result<Docker, Error> {
let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_TCP_ADDRESS.to_string());
Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
pub fn connect_with_http(
addr: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_addr = addr.replacen("tcp://", "", 1).replacen("http://", "", 1);
let http_connector = HttpConnector::new();
let mut client_builder = Client::builder(TokioExecutor::new());
client_builder.pool_max_idle_per_host(0);
let client = client_builder.build(http_connector);
let transport = Transport::Http { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::Http,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
request_modifier: None,
};
Ok(docker)
}
}
impl Docker {
pub fn connect_with_custom_transport<S: Into<String>>(
transport: impl CustomTransport + 'static,
client_addr: Option<S>,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_addr = client_addr.map(Into::into).unwrap_or_default();
let (scheme, client_addr) = client_addr
.split_once("://")
.unwrap_or(("", client_addr.as_str()));
let client_addr = client_addr.to_owned();
let scheme = scheme.to_owned();
let transport = Transport::Custom {
transport: Box::new(transport),
};
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::Custom { scheme },
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
request_modifier: None,
};
Ok(docker)
}
}
#[cfg(all(feature = "pipe", any(unix, windows)))]
impl Docker {
pub fn connect_with_socket_defaults() -> Result<Docker, Error> {
#[cfg(unix)]
let path = DEFAULT_SOCKET;
#[cfg(windows)]
let path = DEFAULT_NAMED_PIPE;
Docker::connect_with_socket(path, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
pub fn connect_with_socket(
path: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let clean_path = path
.trim_start_matches("unix://")
.trim_start_matches("npipe://");
if !Path::new(clean_path).exists() {
return Err(SocketNotFoundError(clean_path.to_string()));
}
#[cfg(unix)]
let docker = Docker::connect_with_unix(path, timeout, client_version)?;
#[cfg(windows)]
let docker = Docker::connect_with_named_pipe(path, timeout, client_version)?;
Ok(docker)
}
pub fn connect_with_local_defaults() -> Result<Docker, Error> {
#[cfg(unix)]
return Docker::connect_with_unix_defaults();
#[cfg(windows)]
return Docker::connect_with_named_pipe_defaults();
}
pub fn connect_with_local(
addr: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
#[cfg(unix)]
return Docker::connect_with_unix(addr, timeout, client_version);
#[cfg(windows)]
return Docker::connect_with_named_pipe(addr, timeout, client_version);
}
}
impl Docker {
pub fn connect_with_defaults() -> Result<Docker, Error> {
let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
Self::connect_with_host(&host)
}
pub fn connect_with_host(host: &str) -> Result<Docker, Error> {
match host {
#[cfg(all(feature = "pipe", unix))]
h if h.starts_with("unix://") => {
Docker::connect_with_unix(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
#[cfg(all(feature = "pipe", windows))]
h if h.starts_with("npipe://") => {
Docker::connect_with_named_pipe(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
#[cfg(feature = "http")]
h if h.starts_with("tcp://") || h.starts_with("http://") => {
#[cfg(feature = "ssl_providerless")]
if env::var("DOCKER_TLS_VERIFY").is_ok() {
return Docker::connect_with_ssl_default_certs(host);
}
Docker::connect_with_http(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
#[cfg(feature = "ssl_providerless")]
h if h.starts_with("https://") => Docker::connect_with_ssl_default_certs(host),
#[cfg(feature = "ssh")]
h if h.starts_with("ssh://") => {
Docker::connect_with_ssh(h, DEFAULT_TIMEOUT, API_DEFAULT_VERSION, None)
}
_ => Err(UnsupportedURISchemeError {
uri: host.to_string(),
}),
}
}
}
#[cfg(all(feature = "pipe", unix))]
impl Docker {
pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
let socket_path = env::var("DOCKER_HOST").ok().and_then(|p| {
if p.starts_with("unix://") {
Some(p)
} else {
None
}
});
let path = socket_path.as_deref();
let path_ref: &str = path.unwrap_or(DEFAULT_SOCKET);
Docker::connect_with_unix(path_ref, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
#[cfg(unix)]
fn podman_rootless_socket_path() -> Option<String> {
if let Ok(xrd) = env::var("XDG_RUNTIME_DIR") {
let sock = format!("{xrd}/podman/podman.sock");
if Path::new(&sock).exists() {
return Some(sock);
}
}
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt;
if let Ok(meta) = std::fs::metadata("/proc/self") {
let sock = DEFAULT_PODMAN_SOCKET_TEMPLATE.replace("{UID}", &meta.uid().to_string());
if Path::new(&sock).exists() {
return Some(sock);
}
}
}
None
}
#[cfg(unix)]
fn podman_system_socket_path() -> Option<&'static str> {
let path = DEFAULT_PODMAN_SYSTEM_SOCKET
.strip_prefix("unix://")
.unwrap_or(DEFAULT_PODMAN_SYSTEM_SOCKET);
if Path::new(path).exists() {
Some(path)
} else {
None
}
}
#[cfg(unix)]
pub fn connect_with_podman_defaults() -> Result<Docker, Error> {
if let Some(host) = env::var("DOCKER_HOST")
.ok()
.filter(|p| p.starts_with("unix://"))
{
return Docker::connect_with_unix(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION);
}
if let Some(sock) = Self::podman_rootless_socket_path() {
return Docker::connect_with_unix(&sock, DEFAULT_TIMEOUT, API_DEFAULT_VERSION);
}
if let Some(sock) = Self::podman_system_socket_path() {
return Docker::connect_with_unix(sock, DEFAULT_TIMEOUT, API_DEFAULT_VERSION);
}
Docker::connect_with_unix(DEFAULT_SOCKET, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
pub fn connect_with_unix(
path: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_addr = path.replacen("unix://", "", 1);
if !Path::new(&client_addr).exists() {
return Err(SocketNotFoundError(client_addr));
}
let unix_connector = UnixConnector;
let mut client_builder = Client::builder(TokioExecutor::new());
client_builder.pool_max_idle_per_host(0);
let client = client_builder.build(unix_connector);
let transport = Transport::Unix { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::Unix,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
request_modifier: None,
};
Ok(docker)
}
}
#[cfg(all(feature = "pipe", windows))]
impl Docker {
pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
Docker::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
pub fn connect_with_named_pipe(
path: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_addr = path.replacen("npipe://", "", 1);
let named_pipe_connector = NamedPipeConnector;
let mut client_builder = Client::builder(TokioExecutor::new());
client_builder.http1_title_case_headers(true);
client_builder.pool_max_idle_per_host(0);
let client = client_builder.build(named_pipe_connector);
let transport = Transport::NamedPipe { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::NamedPipe,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
request_modifier: None,
};
Ok(docker)
}
}
#[cfg(feature = "ssh")]
impl Docker {
pub fn connect_with_ssh_defaults() -> Result<Docker, Error> {
let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_SSH_ADDRESS.to_string());
Docker::connect_with_ssh(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION, None)
}
pub fn connect_with_ssh(
addr: &str,
timeout: u64,
client_version: &ClientVersion,
keypair_path: Option<String>,
) -> Result<Docker, Error> {
let client_addr = addr.replacen("ssh://", "", 1);
let ssh_connector = match keypair_path {
Some(path) => crate::ssh::SshConnector::with_keypair(path.to_string()),
None => crate::ssh::SshConnector::new(),
};
let client_builder = Client::builder(TokioExecutor::new());
let client = client_builder.build(ssh_connector);
let transport = Transport::Ssh { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::Ssh,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
request_modifier: None,
};
Ok(docker)
}
}
#[cfg(test)]
impl Docker {
pub fn connect_with_mock(
connector: yup_hyper_mock::HostToReplyConnector,
client_addr: String,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_builder = Client::builder(TokioExecutor::new());
let client = client_builder.build(connector);
let (transport, client_type) = (Transport::Mock { client }, ClientType::Http);
let docker = Docker {
transport: Arc::new(transport),
client_type,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
request_modifier: None,
};
Ok(docker)
}
}
impl Docker {
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.set_timeout(timeout);
self
}
pub fn timeout(&self) -> Duration {
Duration::from_secs(self.client_timeout)
}
pub fn set_timeout(&mut self, timeout: Duration) {
self.client_timeout = timeout.as_secs();
}
pub fn with_request_modifier<F>(mut self, modifier: F) -> Self
where
F: Fn(BollardRequest) -> BollardRequest + Send + Sync + 'static,
{
self.request_modifier = Some(Arc::new(modifier));
self
}
}
impl Docker {
pub(crate) fn process_into_value<T>(
&self,
req: Result<Request<BodyType>, Error>,
) -> impl Future<Output = Result<T, Error>>
where
T: DeserializeOwned,
{
let fut = self.process_request(req);
async move { Docker::decode_response(fut.await?).await }
}
pub(crate) fn process_into_stream<T>(
&self,
req: Result<Request<BodyType>, Error>,
) -> impl Stream<Item = Result<T, Error>> + Unpin
where
T: DeserializeOwned,
{
Box::pin(
self.process_request(req)
.map_ok(Docker::decode_into_stream::<T>)
.into_stream()
.try_flatten(),
)
}
pub(crate) fn process_into_stream_string(
&self,
req: Result<Request<BodyType>, Error>,
) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
Box::pin(
self.process_request(req)
.map_ok(Docker::decode_into_stream_string)
.try_flatten_stream(),
)
}
pub(crate) fn process_into_unit(
&self,
req: Result<Request<BodyType>, Error>,
) -> impl Future<Output = Result<(), Error>> {
let fut = self.process_request(req);
async move {
fut.await?;
Ok(())
}
}
pub(crate) fn process_into_body(
&self,
req: Result<Request<BodyType>, Error>,
) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
Box::pin(
self.process_request(req)
.map_ok(|response| IncomingStream::new(response.into_body()))
.into_stream()
.try_flatten(),
)
}
pub(crate) fn process_into_string(
&self,
req: Result<Request<BodyType>, Error>,
) -> impl Future<Output = Result<String, Error>> {
let fut = self.process_request(req);
async move {
let response = fut.await?;
Docker::decode_into_string(response).await
}
}
pub(crate) async fn process_upgraded(
&self,
req: Result<Request<BodyType>, Error>,
) -> Result<(impl AsyncRead, impl AsyncWrite), Error> {
let res = self.process_request(req).await?;
let upgraded = hyper::upgrade::on(res).await?;
let tokio_upgraded = AsyncUpgraded::new(upgraded);
Ok(split(tokio_upgraded))
}
#[cfg(all(feature = "websocket", unix))]
pub(crate) async fn process_websocket<O>(
&self,
path: &str,
query: Option<O>,
) -> Result<WebSocketStream<tokio::net::UnixStream>, Error>
where
O: Serialize,
{
use tokio_tungstenite::client_async;
let query_string = match query {
Some(q) => {
let qs = serde_urlencoded::to_string(&q)?;
if qs.is_empty() {
String::new()
} else {
format!("?{}", qs)
}
}
None => String::new(),
};
let ws_uri = format!("ws://localhost{}{}", path, query_string);
debug!("WebSocket URI: {}", ws_uri);
let stream = tokio::net::UnixStream::connect(&self.client_addr).await?;
let (ws_stream, _response) = client_async(&ws_uri, stream)
.await
.map_err(|e| Error::WebSocketError { err: Box::new(e) })?;
Ok(ws_stream)
}
#[cfg(all(feature = "websocket", not(unix)))]
pub(crate) async fn process_websocket<O>(
&self,
path: &str,
query: Option<O>,
) -> Result<WebSocketStream<tokio::net::TcpStream>, Error>
where
O: Serialize,
{
use tokio_tungstenite::client_async;
let query_string = match query {
Some(q) => {
let qs = serde_urlencoded::to_string(&q)?;
if qs.is_empty() {
String::new()
} else {
format!("?{}", qs)
}
}
None => String::new(),
};
let ws_uri = format!("ws://{}{}{}", self.client_addr, path, query_string);
debug!("WebSocket URI: {}", ws_uri);
let stream = tokio::net::TcpStream::connect(&self.client_addr).await?;
let (ws_stream, _response) = client_async(&ws_uri, stream)
.await
.map_err(|e| Error::WebSocketError { err: Box::new(e) })?;
Ok(ws_stream)
}
pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<BodyType, Error>
where
S: Serialize,
{
match body.map(|inst| serde_json::to_string(&inst)) {
Some(Ok(res)) => Ok(Some(res)),
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
.map(|payload| {
debug!("{}", payload.clone().unwrap_or_default());
payload
.map(|content| BodyType::Left(Full::new(content.into())))
.unwrap_or(BodyType::Left(Full::new(Bytes::new())))
})
}
pub fn client_version(&self) -> ClientVersion {
self.version.as_ref().into()
}
pub async fn negotiate_version(self) -> Result<Self, Error> {
let req = self.build_request(
"/version",
Builder::new().method(Method::GET),
None::<String>,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
let res = self
.process_into_value::<crate::models::SystemVersion>(req)
.await?;
let server_version: ClientVersion = if let Some(api_version) = res.api_version {
match api_version.into() {
MaybeClientVersion::Some(client_version) => client_version,
MaybeClientVersion::None => {
return Err(APIVersionParseError {});
}
}
} else {
return Err(APIVersionParseError {});
};
if server_version < self.client_version() {
self.version
.0
.store(server_version.major_version, Ordering::Relaxed);
self.version
.1
.store(server_version.minor_version, Ordering::Relaxed);
}
Ok(self)
}
pub(crate) fn process_request(
&self,
request: Result<Request<BodyType>, Error>,
) -> impl Future<Output = Result<Response<Incoming>, Error>> {
let transport = self.transport.clone();
let timeout = self.client_timeout;
match request.as_ref().map(|b| b.body()) {
Ok(http_body_util::Either::Left(bytes)) => trace!("request: {bytes:?}"),
Ok(http_body_util::Either::Right(_)) => trace!("request: (stream)"),
Err(e) => trace!("request: Err({e:?}"),
};
async move {
let request = request?;
let response = Docker::execute_request(transport, request, timeout).await?;
let status = response.status();
match status {
s if s.is_success() || s == StatusCode::NOT_MODIFIED => Ok(response),
StatusCode::SWITCHING_PROTOCOLS => Ok(response),
_ => {
let contents = Docker::decode_into_string(response).await?;
let mut message = String::new();
if !contents.is_empty() {
message = serde_json::from_str::<DockerServerErrorMessage>(&contents)
.map(|msg| msg.message)
.or_else(|e| {
if e.is_data() || e.is_syntax() {
Ok(contents)
} else {
Err(e)
}
})?;
}
Err(DockerResponseServerError {
status_code: status.as_u16(),
message,
})
}
}
}
}
pub(crate) fn build_request<O>(
&self,
path: &str,
builder: Builder,
query: Option<O>,
payload: Result<BodyType, Error>,
) -> Result<Request<BodyType>, Error>
where
O: Serialize,
{
let uri = Uri::parse(
&self.client_addr,
&self.client_type,
path,
query,
&self.client_version(),
)?;
let request_uri: hyper::Uri = uri.try_into()?;
debug!("{}", &request_uri);
let request = builder
.uri(request_uri)
.header(CONTENT_TYPE, "application/json")
.body(payload?)?;
let request = if let Some(modifier) = &self.request_modifier {
modifier(request)
} else {
request
};
Ok(request)
}
pub(crate) fn build_request_with_registry_auth<O>(
&self,
path: &str,
mut builder: Builder,
query: Option<O>,
payload: Result<BodyType, Error>,
credentials: DockerCredentialsHeader,
) -> Result<Request<BodyType>, Error>
where
O: Serialize,
{
match credentials {
DockerCredentialsHeader::Config(Some(config)) => {
let value = base64_url_encode(&serde_json::to_string(&config)?);
builder = builder.header("X-Registry-Config", value)
}
DockerCredentialsHeader::Auth(Some(config)) => {
let value = base64_url_encode(&serde_json::to_string(&config)?);
builder = builder.header("X-Registry-Auth", value)
}
_ => {}
}
self.build_request(path, builder, query, payload)
}
async fn execute_request(
transport: Arc<Transport>,
req: Request<BodyType>,
timeout: u64,
) -> Result<Response<Incoming>, Error> {
let request = match *transport {
#[cfg(feature = "http")]
Transport::Http { ref client } => client.request(req).map_err(Error::from).boxed(),
#[cfg(feature = "ssl_providerless")]
Transport::Https { ref client } => client.request(req).map_err(Error::from).boxed(),
#[cfg(all(feature = "pipe", unix))]
Transport::Unix { ref client } => client.request(req).map_err(Error::from).boxed(),
#[cfg(all(feature = "pipe", windows))]
Transport::NamedPipe { ref client } => client.request(req).map_err(Error::from).boxed(),
#[cfg(feature = "ssh")]
Transport::Ssh { ref client } => client.request(req).map_err(Error::from).boxed(),
#[cfg(test)]
Transport::Mock { ref client } => client.request(req).map_err(Error::from).boxed(),
Transport::Custom { ref transport } => transport.request(req).boxed(),
};
match tokio::time::timeout(Duration::from_secs(timeout), request).await {
Ok(v) => Ok(v?),
Err(_) => Err(RequestTimeoutError),
}
}
fn decode_into_stream<T>(res: Response<Incoming>) -> impl Stream<Item = Result<T, Error>>
where
T: DeserializeOwned,
{
FramedRead::new(StreamReader::new(res.into_body()), JsonLineDecoder::new())
}
fn decode_into_stream_string(
res: Response<Incoming>,
) -> impl Stream<Item = Result<LogOutput, Error>> {
FramedRead::new(
StreamReader::new(res.into_body()),
NewlineLogOutputDecoder::new(false),
)
.map_err(Error::from)
}
async fn decode_into_string(response: Response<Incoming>) -> Result<String, Error> {
let body = response.into_body().collect().await?.to_bytes();
Ok(String::from_utf8_lossy(&body).to_string())
}
async fn decode_response<T>(response: Response<Incoming>) -> Result<T, Error>
where
T: DeserializeOwned,
{
let bytes = response.into_body().collect().await?.to_bytes();
debug!("Decoded into string: {}", &String::from_utf8_lossy(&bytes));
serde_json::from_slice::<T>(&bytes).map_err(|e| {
if e.is_data() || e.is_syntax() {
JsonDataError {
message: e.to_string(),
column: e.column(),
#[cfg(feature = "json_data_content")]
contents: String::from_utf8_lossy(&bytes).to_string(),
}
} else {
e.into()
}
})
}
}
pub(crate) type BodyType = http_body_util::Either<
Full<Bytes>,
StreamBody<Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, std::io::Error>> + Send>>>,
>;
pub fn body_stream(body: impl Stream<Item = Bytes> + Send + 'static) -> BodyType {
BodyType::Right(StreamBody::new(Box::pin(body.map(|a| Ok(Frame::data(a))))))
}
pub fn body_try_stream(
body: impl Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
) -> BodyType {
BodyType::Right(StreamBody::new(Box::pin(body.map_ok(Frame::data))))
}
pub fn body_full(body: Bytes) -> BodyType {
BodyType::Left(Full::new(body))
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(unix)]
mod podman {
use super::*;
#[test]
fn rootless_socket_from_xdg_runtime_dir() {
let dir = tempfile::tempdir().unwrap();
let sock_dir = dir.path().join("podman");
std::fs::create_dir_all(&sock_dir).unwrap();
let sock = sock_dir.join("podman.sock");
std::fs::write(&sock, b"").unwrap();
let _guard = TempEnvVar::set("XDG_RUNTIME_DIR", dir.path().to_str().unwrap());
let found = Docker::podman_rootless_socket_path();
assert_eq!(found.as_deref(), Some(sock.to_str().unwrap()));
}
#[test]
fn rootless_socket_returns_none_when_missing() {
let dir = tempfile::tempdir().unwrap();
let _guard = TempEnvVar::set("XDG_RUNTIME_DIR", dir.path().to_str().unwrap());
let found = Docker::podman_rootless_socket_path();
let xdg_sock = dir.path().join("podman/podman.sock");
if let Some(ref path) = found {
assert_ne!(path.as_str(), xdg_sock.to_str().unwrap());
}
}
#[test]
fn system_socket_returns_none_when_missing() {
let _ = Docker::podman_system_socket_path();
}
#[test]
fn connect_with_podman_defaults_respects_docker_host() {
let dir = tempfile::tempdir().unwrap();
let sock = dir.path().join("test.sock");
std::fs::write(&sock, b"").unwrap();
let uri = format!("unix://{}", sock.display());
let _guard = TempEnvVar::set("DOCKER_HOST", &uri);
let docker = Docker::connect_with_podman_defaults().unwrap();
assert_eq!(docker.client_addr, sock.to_str().unwrap());
}
struct TempEnvVar {
key: String,
prev: Option<String>,
}
impl TempEnvVar {
fn set(key: &str, val: &str) -> Self {
let prev = env::var(key).ok();
env::set_var(key, val);
Self {
key: key.to_string(),
prev,
}
}
}
impl Drop for TempEnvVar {
fn drop(&mut self) {
match &self.prev {
Some(v) => env::set_var(&self.key, v),
None => env::remove_var(&self.key),
}
}
}
}
#[cfg(all(unix, feature = "pipe"))]
mod docker_defaults {
use super::*;
#[test]
fn connect_with_unix_defaults_respects_docker_host() {
let dir = tempfile::tempdir().unwrap();
let sock = dir.path().join("test.sock");
std::fs::write(&sock, b"").unwrap();
let uri = format!("unix://{}", sock.display());
let prev = env::var("DOCKER_HOST").ok();
env::set_var("DOCKER_HOST", &uri);
let docker = Docker::connect_with_unix_defaults().unwrap();
assert_eq!(docker.client_addr, sock.to_str().unwrap());
match prev {
Some(v) => env::set_var("DOCKER_HOST", v),
None => env::remove_var("DOCKER_HOST"),
}
}
#[test]
fn connect_with_unix_defaults_ignores_non_unix_docker_host() {
let prev = env::var("DOCKER_HOST").ok();
env::set_var("DOCKER_HOST", "tcp://localhost:2375");
let result = Docker::connect_with_unix_defaults();
if let Err(SocketNotFoundError(addr)) = &result {
assert!(addr.contains("docker.sock"));
}
match prev {
Some(v) => env::set_var("DOCKER_HOST", v),
None => env::remove_var("DOCKER_HOST"),
}
}
}
}