use std::cmp;
use std::env;
use std::fmt;
#[cfg(feature = "ssl")]
use std::fs;
use std::future::Future;
#[cfg(feature = "ssl")]
use std::io;
#[cfg(feature = "ssl")]
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use futures_core::Stream;
use futures_util::future::FutureExt;
use futures_util::future::TryFutureExt;
use futures_util::stream;
use futures_util::stream::TryStreamExt;
use http::header::CONTENT_TYPE;
use http::request::Builder;
use hyper::client::HttpConnector;
use hyper::{self, body::Bytes, Body, Client, Method, Request, Response, StatusCode};
#[cfg(feature = "ssl")]
use hyper_rustls::HttpsConnector;
#[cfg(unix)]
use hyperlocal::UnixClient as UnixConnector;
#[cfg(feature = "ssl")]
use rustls::internal::pemfile;
#[cfg(feature = "ssl")]
use rustls::sign::{CertifiedKey, RSASigningKey};
use tokio_util::codec::FramedRead;
use crate::container::LogOutput;
use crate::errors::Error;
use crate::errors::Error::*;
#[cfg(windows)]
use crate::named_pipe::NamedPipeConnector;
use crate::read::{JsonLineDecoder, NewlineLogOutputDecoder, StreamReader};
use crate::uri::Uri;
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
#[cfg(unix)]
pub const DEFAULT_SOCKET: &str = "unix:///var/run/docker.sock";
#[cfg(windows)]
pub const DEFAULT_NAMED_PIPE: &str = "npipe:////./pipe/docker_engine";
pub const DEFAULT_DOCKER_HOST: &str = "tcp://localhost:2375";
const DEFAULT_TIMEOUT: u64 = 120;
pub const API_DEFAULT_VERSION: &ClientVersion = &ClientVersion {
major_version: 1,
minor_version: 40,
};
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
#[cfg(unix)]
Unix,
Http,
#[cfg(feature = "ssl")]
SSL,
#[cfg(windows)]
NamedPipe,
}
pub(crate) enum Transport {
Http {
client: Client<HttpConnector>,
},
#[cfg(feature = "ssl")]
Https {
client: Client<HttpsConnector<HttpConnector>>,
},
#[cfg(unix)]
Unix {
client: Client<UnixConnector>,
},
#[cfg(windows)]
NamedPipe {
client: Client<NamedPipeConnector>,
},
}
impl fmt::Debug for Transport {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Transport::Http { .. } => write!(f, "HTTP"),
#[cfg(feature = "ssl")]
Transport::Https { .. } => write!(f, "HTTPS(rustls)"),
#[cfg(unix)]
Transport::Unix { .. } => write!(f, "Unix"),
#[cfg(windows)]
Transport::NamedPipe { .. } => write!(f, "NamedPipe"),
}
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct ClientVersion {
pub major_version: usize,
pub minor_version: usize,
}
pub(crate) enum MaybeClientVersion {
Some(ClientVersion),
None,
}
impl<T: Into<String>> From<T> for MaybeClientVersion {
fn from(s: T) -> MaybeClientVersion {
match s
.into()
.split('.')
.map(|v| v.parse::<usize>())
.collect::<Vec<Result<usize, std::num::ParseIntError>>>()
.as_slice()
{
[Ok(first), Ok(second)] => MaybeClientVersion::Some(ClientVersion {
major_version: first.to_owned(),
minor_version: second.to_owned(),
}),
_ => MaybeClientVersion::None,
}
}
}
impl fmt::Display for ClientVersion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}.{}", self.major_version, self.minor_version)
}
}
impl PartialOrd for ClientVersion {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
match self.major_version.partial_cmp(&other.major_version) {
Some(cmp::Ordering::Equal) => self.minor_version.partial_cmp(&other.minor_version),
res => res,
}
}
}
impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
ClientVersion {
major_version: tpl.0.load(Ordering::Relaxed),
minor_version: tpl.1.load(Ordering::Relaxed),
}
}
}
pub(crate) fn serialize_as_json<T, S>(t: &T, s: S) -> Result<S::Ok, S::Error>
where
T: Serialize,
S: serde::Serializer,
{
s.serialize_str(
&serde_json::to_string(t).map_err(|e| serde::ser::Error::custom(format!("{}", e)))?,
)
}
pub(crate) fn serialize_as_timestamp<S>(
opt: &Option<DateTime<Utc>>,
s: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match opt {
Some(t) => s.serialize_str(&format!("{}.{}", t.timestamp(), t.timestamp_subsec_nanos())),
None => s.serialize_str(""),
}
}
#[derive(Debug)]
pub struct Docker {
pub(crate) transport: Arc<Transport>,
pub(crate) client_type: ClientType,
pub(crate) client_addr: String,
pub(crate) client_timeout: u64,
pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
}
impl Clone for Docker {
fn clone(&self) -> Docker {
Docker {
transport: self.transport.clone(),
client_type: self.client_type.clone(),
client_addr: self.client_addr.clone(),
client_timeout: self.client_timeout,
version: self.version.clone(),
}
}
}
#[cfg(feature = "ssl")]
struct DockerClientCertResolver {
ssl_key: PathBuf,
ssl_cert: PathBuf,
}
#[cfg(feature = "ssl")]
impl DockerClientCertResolver {
pub fn default_cert_path() -> Result<PathBuf, Error> {
let from_env = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
if let Ok(ref path) = from_env {
Ok(Path::new(path).to_owned())
} else {
let home = dirs_next::home_dir().ok_or_else(|| NoCertPathError)?;
Ok(home.join(".docker"))
}
}
fn open_buffered(path: &Path) -> Result<io::BufReader<fs::File>, Error> {
Ok(io::BufReader::new(fs::File::open(path)?))
}
fn certs(path: &Path) -> Result<Vec<rustls::Certificate>, Error> {
Ok(
pemfile::certs(&mut Self::open_buffered(path)?).map_err(|_| CertPathError {
path: path.to_path_buf(),
})?,
)
}
fn keys(path: &Path) -> Result<Vec<rustls::PrivateKey>, Error> {
let mut rdr = Self::open_buffered(path)?;
let keys = pemfile::rsa_private_keys(&mut rdr).map_err(|_| CertPathError {
path: path.to_path_buf(),
})?;
Ok(keys)
}
fn docker_client_key(&self) -> Result<CertifiedKey, Error> {
let all_certs = Self::certs(&self.ssl_cert)?;
let mut all_keys = Self::keys(&self.ssl_key)?;
let key = if all_keys.len() == 1 {
all_keys.remove(0)
} else {
return Err(CertMultipleKeys {
count: all_keys.len(),
path: self.ssl_key.to_owned(),
});
};
let signing_key = RSASigningKey::new(&key).map_err(|_| CertParseError {
path: self.ssl_key.to_owned(),
})?;
Ok(CertifiedKey::new(
all_certs,
Arc::new(Box::new(signing_key)),
))
}
}
#[cfg(feature = "ssl")]
impl rustls::ResolvesClientCert for DockerClientCertResolver {
fn resolve(&self, _: &[&[u8]], _: &[rustls::SignatureScheme]) -> Option<CertifiedKey> {
self.docker_client_key().ok()
}
fn has_certs(&self) -> bool {
true
}
}
#[cfg(feature = "ssl")]
impl Docker {
pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
let cert_path = DockerClientCertResolver::default_cert_path()?;
if let Ok(ref host) = env::var("DOCKER_HOST") {
Docker::connect_with_ssl(
host,
&cert_path.join("key.pem"),
&cert_path.join("cert.pem"),
&cert_path.join("ca.pem"),
DEFAULT_TIMEOUT,
API_DEFAULT_VERSION,
)
} else {
Docker::connect_with_ssl(
DEFAULT_DOCKER_HOST,
&cert_path.join("key.pem"),
&cert_path.join("cert.pem"),
&cert_path.join("ca.pem"),
DEFAULT_TIMEOUT,
API_DEFAULT_VERSION,
)
}
}
pub fn connect_with_ssl(
addr: &str,
ssl_key: &Path,
ssl_cert: &Path,
ssl_ca: &Path,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_addr = addr.replacen("tcp://", "", 1).replacen("https://", "", 1);
let mut config = rustls::ClientConfig::new();
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
config.ct_logs = Some(&ct_logs::LOGS);
config.root_store = match rustls_native_certs::load_native_certs() {
Ok(store) => store,
Err((Some(store), err)) => {
warn!("could not load all certificates: {}", err);
store
}
Err((None, err)) => {
warn!("cannot access native certificate store: {}", err);
config.root_store
}
};
let mut ca_pem = io::Cursor::new(fs::read(ssl_ca)?);
config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
config
.root_store
.add_pem_file(&mut ca_pem)
.map_err(|_| CertParseError {
path: ssl_ca.to_owned(),
})?;
config.client_auth_cert_resolver = Arc::new(DockerClientCertResolver {
ssl_key: ssl_key.to_owned(),
ssl_cert: ssl_cert.to_owned(),
});
let mut http_connector = HttpConnector::new();
http_connector.enforce_http(false);
let https_connector: HttpsConnector<HttpConnector> =
HttpsConnector::from((http_connector, config));
let client_builder = Client::builder();
let client = client_builder.build(https_connector);
let transport = Transport::Https { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::SSL,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
};
Ok(docker)
}
}
impl Docker {
pub fn connect_with_http_defaults() -> Result<Docker, Error> {
let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
Docker::connect_with_http(&host, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
pub fn connect_with_http(
addr: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_addr = addr.replacen("tcp://", "", 1).replacen("http://", "", 1);
let http_connector = HttpConnector::new();
let client_builder = Client::builder();
let client = client_builder.build(http_connector);
let transport = Transport::Http { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::Http,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
};
Ok(docker)
}
}
#[cfg(unix)]
impl Docker {
pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
Docker::connect_with_unix(DEFAULT_SOCKET, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
pub fn connect_with_unix(
addr: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_addr = addr.replacen("unix://", "", 1);
let unix_connector = UnixConnector;
let mut client_builder = Client::builder();
client_builder.pool_max_idle_per_host(0);
let client = client_builder.build(unix_connector);
let transport = Transport::Unix { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::Unix,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
};
Ok(docker)
}
}
#[cfg(windows)]
impl Docker {
pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
Docker::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
}
pub fn connect_with_named_pipe(
addr: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
let client_addr = addr.replacen("npipe://", "", 1);
let named_pipe_connector = NamedPipeConnector;
let mut client_builder = Client::builder();
client_builder.pool_max_idle_per_host(0);
client_builder.http1_title_case_headers(true);
let client = client_builder.build(named_pipe_connector);
let transport = Transport::NamedPipe { client };
let docker = Docker {
transport: Arc::new(transport),
client_type: ClientType::NamedPipe,
client_addr,
client_timeout: timeout,
version: Arc::new((
AtomicUsize::new(client_version.major_version),
AtomicUsize::new(client_version.minor_version),
)),
};
Ok(docker)
}
}
#[cfg(any(unix, windows))]
impl Docker {
pub fn connect_with_local_defaults() -> Result<Docker, Error> {
#[cfg(unix)]
return Docker::connect_with_unix_defaults();
#[cfg(windows)]
return Docker::connect_with_named_pipe_defaults();
}
pub fn connect_with_local(
addr: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
#[cfg(unix)]
return Docker::connect_with_unix(addr, timeout, client_version);
#[cfg(windows)]
return Docker::connect_with_named_pipe(addr, timeout, client_version);
}
}
impl Docker {
pub(crate) fn process_into_value<T>(
&self,
req: Result<Request<Body>, Error>,
) -> impl Future<Output = Result<T, Error>>
where
T: DeserializeOwned,
{
let fut = self.process_request(req);
async move {
let response = fut.await?;
Docker::decode_response(response).await
}
}
pub(crate) fn process_into_stream<T>(
&self,
req: Result<Request<Body>, Error>,
) -> impl Stream<Item = Result<T, Error>> + Unpin
where
T: DeserializeOwned,
{
Box::pin(
self.process_request(req)
.map_ok(Docker::decode_into_stream::<T>)
.into_stream()
.try_flatten(),
)
}
pub(crate) fn process_into_stream_string(
&self,
req: Result<Request<Body>, Error>,
) -> impl Stream<Item = Result<LogOutput, Error>> + Unpin {
Box::pin(
self.process_request(req)
.map_ok(Docker::decode_into_stream_string)
.try_flatten_stream(),
)
}
pub(crate) fn process_into_unit(
&self,
req: Result<Request<Body>, Error>,
) -> impl Future<Output = Result<(), Error>> {
let fut = self.process_request(req);
async move {
fut.await?;
Ok(())
}
}
pub(crate) fn process_into_body(
&self,
req: Result<Request<Body>, Error>,
) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
Box::pin(
self.process_request(req)
.map_ok(|response| response.into_body().map_err(Error::from))
.into_stream()
.try_flatten(),
)
}
pub(crate) fn process_upgraded_stream_string(
&self,
req: Result<Request<Body>, Error>,
) -> impl Stream<Item = Result<LogOutput, Error>> {
let fut = self.process_request(req);
stream::once(async move { fut.await.map(Docker::decode_into_upgraded_stream_string) })
.try_flatten()
}
pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<Body, Error>
where
S: Serialize,
{
match body.map(|inst| serde_json::to_string(&inst)) {
Some(Ok(res)) => Ok(Some(res)),
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
.map(|payload| {
debug!("{}", payload.clone().unwrap_or_else(String::new));
payload
.map(|content| content.into())
.unwrap_or_else(Body::empty)
})
}
pub fn client_version(&self) -> ClientVersion {
self.version.as_ref().into()
}
pub async fn negotiate_version(self) -> Result<Self, Error> {
let req = self.build_request(
"/version",
Builder::new().method(Method::GET),
None::<String>,
Ok(Body::empty()),
);
let res = self
.process_into_value::<crate::system::Version>(req)
.await?;
let err_api_version = res.api_version.as_ref().unwrap().clone();
let server_version: ClientVersion = match res.api_version.as_ref().unwrap().into() {
MaybeClientVersion::Some(client_version) => client_version,
MaybeClientVersion::None => {
return Err(APIVersionParseError {
api_version: err_api_version,
})
}
};
if server_version < self.client_version() {
self.version
.0
.store(server_version.major_version, Ordering::Relaxed);
self.version
.1
.store(server_version.minor_version, Ordering::Relaxed);
}
Ok(self)
}
fn process_request(
&self,
request: Result<Request<Body>, Error>,
) -> impl Future<Output = Result<Response<Body>, Error>> {
let transport = self.transport.clone();
let timeout = self.client_timeout;
debug!("request: {:?}", request.as_ref().unwrap());
async move {
let request = request?;
let response = Docker::execute_request(transport, request, timeout).await?;
let status = response.status();
match status {
s if s.is_success() => Ok(response),
StatusCode::SWITCHING_PROTOCOLS => Ok(response),
StatusCode::NOT_MODIFIED => {
let message = Docker::decode_into_string(response).await?;
Err(DockerResponseNotModifiedError { message })
}
StatusCode::CONFLICT => {
let message = Docker::decode_into_string(response).await?;
Err(DockerResponseConflictError { message })
}
StatusCode::BAD_REQUEST => {
let message = Docker::decode_into_string(response).await?;
Err(DockerResponseBadParameterError { message })
}
StatusCode::NOT_FOUND => {
let message = Docker::decode_into_string(response).await?;
Err(DockerResponseNotFoundError { message })
}
_ => {
let message = Docker::decode_into_string(response).await?;
Err(DockerResponseServerError {
status_code: status.as_u16(),
message,
})
}
}
}
}
pub(crate) fn build_request<O>(
&self,
path: &str,
builder: Builder,
query: Option<O>,
payload: Result<Body, Error>,
) -> Result<Request<Body>, Error>
where
O: Serialize,
{
let uri = Uri::parse(
&self.client_addr,
&self.client_type,
path,
query,
&self.client_version(),
)?;
let request_uri: hyper::Uri = uri.into();
debug!("{}", &request_uri);
Ok(builder
.uri(request_uri)
.header(CONTENT_TYPE, "application/json")
.body(payload?)?)
}
async fn execute_request(
transport: Arc<Transport>,
req: Request<Body>,
timeout: u64,
) -> Result<Response<Body>, Error> {
let request = match *transport {
Transport::Http { ref client } => client.request(req),
#[cfg(feature = "ssl")]
Transport::Https { ref client } => client.request(req),
#[cfg(unix)]
Transport::Unix { ref client } => client.request(req),
#[cfg(windows)]
Transport::NamedPipe { ref client } => client.request(req),
};
match tokio::time::timeout(Duration::from_secs(timeout), request).await {
Ok(v) => Ok(v?),
Err(_) => Err(RequestTimeoutError),
}
}
fn decode_into_stream<T>(res: Response<Body>) -> impl Stream<Item = Result<T, Error>>
where
T: DeserializeOwned,
{
FramedRead::new(
StreamReader::new(res.into_body().map_err(Error::from)),
JsonLineDecoder::new(),
)
}
fn decode_into_stream_string(
res: Response<Body>,
) -> impl Stream<Item = Result<LogOutput, Error>> {
FramedRead::new(
StreamReader::new(res.into_body().map_err(Error::from)),
NewlineLogOutputDecoder::new(),
)
}
fn decode_into_upgraded_stream_string(
res: Response<Body>,
) -> impl Stream<Item = Result<LogOutput, Error>> {
res.into_body()
.on_upgrade()
.into_stream()
.map_ok(|r| FramedRead::new(r, NewlineLogOutputDecoder::new()))
.try_flatten()
}
async fn decode_into_string(response: Response<Body>) -> Result<String, Error> {
let body = hyper::body::to_bytes(response.into_body()).await?;
Ok(String::from_utf8_lossy(&body).to_string())
}
async fn decode_response<T>(response: Response<Body>) -> Result<T, Error>
where
T: DeserializeOwned,
{
let contents = Docker::decode_into_string(response).await?;
debug!("Decoded into string: {}", &contents);
serde_json::from_str::<T>(&contents).map_err(|e| {
if e.is_data() {
JsonDataError {
message: e.to_string(),
column: e.column(),
contents: contents.to_owned(),
}
} else {
e.into()
}
})
}
}