use std::cmp;
use std::env;
use std::fmt;
#[cfg(any(feature = "ssl", feature = "tls"))]
use std::fs::File;
#[cfg(any(feature = "ssl", feature = "tls"))]
use std::io::prelude::*;
#[cfg(any(feature = "ssl", feature = "tls"))]
use std::path::{Path, PathBuf};
use std::str::from_utf8;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use arrayvec::ArrayVec;
#[cfg(any(feature = "ssl", feature = "tls"))]
use dirs;
use failure::Error;
use futures::future::{self, result};
use futures::Stream;
use http::header::CONTENT_TYPE;
use http::request::Builder;
use hyper::client::HttpConnector;
use hyper::rt::Future;
use hyper::{self, Body, Chunk, Client, Method, Request, Response, StatusCode};
use hyper_mock::HostToReplyConnector;
#[cfg(feature = "openssl")]
use hyper_openssl::HttpsConnector;
#[cfg(feature = "tls")]
use hyper_tls;
#[cfg(unix)]
use hyperlocal::UnixConnector;
#[cfg(feature = "tls")]
use native_tls::{Certificate, Identity, TlsConnector};
#[cfg(feature = "openssl")]
use openssl::ssl::SslConnector;
#[cfg(feature = "openssl")]
use openssl::ssl::{SslFiletype, SslMethod};
use tokio::timer::Timeout;
use tokio_codec::FramedRead;
use container::LogOutput;
use either::EitherResponse;
use errors::{
APIVersionParseError, DockerResponseBadParameterError, DockerResponseConflictError,
DockerResponseNotFoundError, DockerResponseNotModifiedError, DockerResponseServerError,
JsonDataError,
};
#[cfg(windows)]
use named_pipe::NamedPipeConnector;
use read::{JsonLineDecoder, NewlineLogOutputDecoder, StreamReader};
use system::Version;
use uri::Uri;
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use serde_json;
/// Default Docker Unix socket address, including the `unix://` scheme prefix
/// (the prefix is stripped by `Docker::connect_with_unix`).
#[cfg(unix)]
pub const DEFAULT_SOCKET: &'static str = "unix:///var/run/docker.sock";
/// Default Docker named pipe address, including the `npipe://` scheme prefix
/// (the prefix is stripped by `Docker::connect_with_named_pipe`).
#[cfg(windows)]
pub const DEFAULT_NAMED_PIPE: &'static str = "npipe:////./pipe/docker_engine";
/// TCP address the `*_defaults` constructors fall back to when the
/// `DOCKER_HOST` environment variable is not set.
pub const DEFAULT_DOCKER_HOST: &'static str = "tcp://localhost:2375";
/// Value the `*_defaults` constructors pass on as the `num_threads` argument,
/// which ends up in `HttpConnector::new`.
const DEFAULT_NUM_THREADS: usize = 1;
/// Request timeout used by the `*_defaults` constructors. The value is fed to
/// `Duration::from_secs`, so the unit is seconds.
const DEFAULT_TIMEOUT: u64 = 120;
/// API version (1.40) that the `*_defaults` constructors start out with.
pub const API_DEFAULT_VERSION: &'static ClientVersion = &ClientVersion {
major_version: 1,
minor_version: 40,
};
/// The literal string `"true"`.
// NOTE(review): not referenced in this part of the file; presumably used for
// boolean query parameters elsewhere in the crate — confirm.
pub(crate) const TRUE_STR: &'static str = "true";
/// The literal string `"false"`.
pub(crate) const FALSE_STR: &'static str = "false";
/// Resolves the directory that holds the client certificates.
///
/// Lookup order:
/// 1. the `DOCKER_CERT_PATH` environment variable,
/// 2. the `DOCKER_CONFIG` environment variable,
/// 3. `<home directory>/.docker`.
///
/// # Errors
///
/// Returns `NoCertPathError` when neither variable is set and the home
/// directory cannot be determined.
#[cfg(any(feature = "ssl", feature = "tls"))]
pub fn default_cert_path() -> Result<PathBuf, Error> {
    use errors::NoCertPathError;

    let configured = env::var("DOCKER_CERT_PATH").or_else(|_| env::var("DOCKER_CONFIG"));
    match configured {
        Ok(dir) => Ok(PathBuf::from(dir)),
        Err(_) => match dirs::home_dir() {
            Some(home) => Ok(home.join(".docker")),
            None => Err(NoCertPathError {}.into()),
        },
    }
}
/// Kind of endpoint a `Docker` client was constructed for. It is stored in
/// `Docker::client_type` and handed to `Uri::parse` when a request is built.
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
/// Set by `Docker::connect_with_unix` (and by `connect_with_host_to_reply`
/// when compiled for Unix).
#[cfg(unix)]
Unix,
/// Set by `Docker::connect_with_http`.
Http,
/// Set by both `Docker::connect_with_ssl` and `Docker::connect_with_tls`.
#[cfg(any(feature = "ssl", feature = "tls"))]
SSL,
/// Set by `Docker::connect_with_named_pipe` (and by
/// `connect_with_host_to_reply` when compiled for Windows).
#[cfg(windows)]
NamedPipe,
}
/// The concrete hyper client used to send requests, one variant per
/// supported connector type. `Docker::execute_request` dispatches on it.
pub(crate) enum Transport {
/// Plain HTTP over TCP.
Http {
client: Client<HttpConnector>,
},
/// HTTPS through the `hyper_openssl` connector.
#[cfg(feature = "openssl")]
Https {
client: Client<HttpsConnector<HttpConnector>>,
},
/// HTTPS through the `hyper_tls` (native-tls) connector.
#[cfg(feature = "tls")]
Tls {
client: Client<hyper_tls::HttpsConnector<HttpConnector>>,
},
/// Unix domain socket through `hyperlocal`.
#[cfg(unix)]
Unix {
client: Client<UnixConnector>,
},
/// Windows named pipe.
#[cfg(windows)]
NamedPipe {
client: Client<NamedPipeConnector>,
},
/// Client built on `hyper_mock::HostToReplyConnector`; created by
/// `Docker::connect_with_host_to_reply`.
HostToReply {
client: Client<HostToReplyConnector>,
},
}
/// Prints only a short label naming the transport variant; the wrapped
/// client is not printed.
impl fmt::Debug for Transport {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Transport::Http { .. } => "HTTP",
            #[cfg(feature = "openssl")]
            Transport::Https { .. } => "HTTPS(openssl)",
            #[cfg(feature = "tls")]
            Transport::Tls { .. } => "HTTPS(native)",
            #[cfg(unix)]
            Transport::Unix { .. } => "Unix",
            #[cfg(windows)]
            Transport::NamedPipe { .. } => "NamedPipe",
            Transport::HostToReply { .. } => "HostToReply",
        };
        f.write_str(label)
    }
}
/// A Docker API version made of a major and a minor component; it is
/// displayed as `major.minor`.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct ClientVersion {
/// Component before the dot.
pub major_version: usize,
/// Component after the dot.
pub minor_version: usize,
}
/// Result of trying to parse a version string: `Some` with the parsed
/// version, or `None` when the string is not of the form `<number>.<number>`.
pub(crate) enum MaybeClientVersion {
/// The string parsed into exactly two numeric components.
Some(ClientVersion),
/// The string could not be parsed.
None,
}
/// Parses a `"<major>.<minor>"` string.
///
/// Yields `MaybeClientVersion::Some` only when the string splits on `.` into
/// exactly two parts that both parse as `usize`; anything else yields
/// `MaybeClientVersion::None`.
impl From<String> for MaybeClientVersion {
    fn from(s: String) -> MaybeClientVersion {
        let mut components = s.split('.').map(|part| part.parse::<usize>());
        // Pull at most three components: the third must be absent for the
        // string to count as a two-part version.
        match (components.next(), components.next(), components.next()) {
            (Some(Ok(major_version)), Some(Ok(minor_version)), None) => {
                MaybeClientVersion::Some(ClientVersion {
                    major_version,
                    minor_version,
                })
            }
            _ => MaybeClientVersion::None,
        }
    }
}
impl fmt::Display for ClientVersion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}.{}", self.major_version, self.minor_version)
}
}
/// Orders versions by major component first, then by minor component.
impl PartialOrd for ClientVersion {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        // Tuples compare lexicographically, which is exactly major-then-minor.
        let lhs = (self.major_version, self.minor_version);
        let rhs = (other.major_version, other.minor_version);
        lhs.partial_cmp(&rhs)
    }
}
/// Takes a snapshot of a `(major, minor)` pair of atomics.
///
/// The two components are read with separate `Relaxed` loads, major first.
impl From<&(AtomicUsize, AtomicUsize)> for ClientVersion {
    fn from(tpl: &(AtomicUsize, AtomicUsize)) -> ClientVersion {
        let (major, minor) = tpl;
        let major_version = major.load(Ordering::Relaxed);
        let minor_version = minor.load(Ordering::Relaxed);
        ClientVersion {
            major_version,
            minor_version,
        }
    }
}
/// Handle to a Docker endpoint: the transport used to reach it plus the
/// address, timeout and API version applied to every request.
#[derive(Debug)]
pub struct Docker {
/// Shared hyper client; cloned (by reference count) into each request.
pub(crate) transport: Arc<Transport>,
/// Endpoint kind, passed to `Uri::parse` when building a request.
pub(crate) client_type: ClientType,
/// Endpoint address with the scheme prefix already stripped by the
/// `connect_with_*` constructor.
pub(crate) client_addr: String,
/// Per-request timeout in seconds (converted with `Duration::from_secs`).
pub(crate) client_timeout: u64,
/// `(major, minor)` API version. Shared between clones and updated in
/// place by `negotiate_version`.
pub(crate) version: Arc<(AtomicUsize, AtomicUsize)>,
}
impl Clone for Docker {
fn clone(&self) -> Docker {
Docker {
transport: self.transport.clone(),
client_type: self.client_type.clone(),
client_addr: self.client_addr.clone(),
client_timeout: self.client_timeout,
version: self.version.clone(),
}
}
}
#[cfg(feature = "openssl")]
impl Docker {
    /// Connects over HTTPS (openssl) using default settings.
    ///
    /// The host comes from the `DOCKER_HOST` environment variable, falling
    /// back to `DEFAULT_DOCKER_HOST`. `key.pem`, `cert.pem` and `ca.pem` are
    /// looked up in the directory returned by `default_cert_path`.
    ///
    /// # Errors
    ///
    /// Fails when the certificate directory cannot be determined or when
    /// `connect_with_ssl` fails.
    pub fn connect_with_ssl_defaults() -> Result<Docker, Error> {
        let cert_dir = default_cert_path()?;
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
        Docker::connect_with_ssl(
            &host,
            &cert_dir.join("key.pem"),
            &cert_dir.join("cert.pem"),
            &cert_dir.join("ca.pem"),
            DEFAULT_NUM_THREADS,
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connects over HTTPS using openssl.
    ///
    /// * `addr` - endpoint address; the first `tcp://` occurrence is removed.
    /// * `ssl_key` - PEM file with the client private key.
    /// * `ssl_cert` - PEM file with the client certificate.
    /// * `ssl_ca` - CA file used to verify the server.
    /// * `num_threads` - forwarded to `HttpConnector::new`.
    /// * `timeout` - per-request timeout in seconds.
    /// * `client_version` - initial API version of the client.
    ///
    /// # Errors
    ///
    /// Fails when the SSL connector cannot be built or any of the files
    /// cannot be loaded.
    pub fn connect_with_ssl(
        addr: &str,
        ssl_key: &Path,
        ssl_cert: &Path,
        ssl_ca: &Path,
        num_threads: usize,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("tcp://", "", 1);

        let mut ssl_builder = SslConnector::builder(SslMethod::tls())?;
        ssl_builder.set_ca_file(ssl_ca)?;
        ssl_builder.set_certificate_file(ssl_cert, SslFiletype::PEM)?;
        ssl_builder.set_private_key_file(ssl_key, SslFiletype::PEM)?;

        let mut http = HttpConnector::new(num_threads);
        // The inner connector must accept non-`http` URIs so that the HTTPS
        // wrapper can hand it `https` destinations.
        http.enforce_http(false);

        let https: HttpsConnector<HttpConnector> =
            HttpsConnector::with_connector(http, ssl_builder)?;
        let client = Client::builder().build(https);

        Ok(Docker {
            transport: Arc::new(Transport::Https { client }),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
        })
    }
}
impl Docker {
    /// Connects over plain HTTP using default settings.
    ///
    /// The host comes from the `DOCKER_HOST` environment variable, falling
    /// back to `DEFAULT_DOCKER_HOST`.
    pub fn connect_with_http_defaults() -> Result<Docker, Error> {
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
        Docker::connect_with_http(
            &host,
            DEFAULT_NUM_THREADS,
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connects over plain HTTP.
    ///
    /// * `addr` - endpoint address; the first `tcp://` occurrence is removed.
    /// * `num_threads` - forwarded to `HttpConnector::new`.
    /// * `timeout` - per-request timeout in seconds.
    /// * `client_version` - initial API version of the client.
    pub fn connect_with_http(
        addr: &str,
        num_threads: usize,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client = Client::builder().build(HttpConnector::new(num_threads));
        let version = (
            AtomicUsize::new(client_version.major_version),
            AtomicUsize::new(client_version.minor_version),
        );

        Ok(Docker {
            transport: Arc::new(Transport::Http { client }),
            client_type: ClientType::Http,
            client_addr: addr.replacen("tcp://", "", 1),
            client_timeout: timeout,
            version: Arc::new(version),
        })
    }
}
#[cfg(unix)]
impl Docker {
    /// Connects to `DEFAULT_SOCKET` with the default timeout and API version.
    pub fn connect_with_unix_defaults() -> Result<Docker, Error> {
        Docker::connect_with_unix(DEFAULT_SOCKET, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects over a Unix domain socket.
    ///
    /// * `addr` - socket path; the first `unix://` occurrence is removed.
    /// * `timeout` - per-request timeout in seconds.
    /// * `client_version` - initial API version of the client.
    pub fn connect_with_unix(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        // Keep-alive is switched off for this client.
        let client = Client::builder()
            .keep_alive(false)
            .build(UnixConnector::new());
        let version = (
            AtomicUsize::new(client_version.major_version),
            AtomicUsize::new(client_version.minor_version),
        );

        Ok(Docker {
            transport: Arc::new(Transport::Unix { client }),
            client_type: ClientType::Unix,
            client_addr: addr.replacen("unix://", "", 1),
            client_timeout: timeout,
            version: Arc::new(version),
        })
    }
}
#[cfg(windows)]
impl Docker {
    /// Connects to `DEFAULT_NAMED_PIPE` with the default timeout and API
    /// version.
    pub fn connect_with_named_pipe_defaults() -> Result<Docker, Error> {
        Docker::connect_with_named_pipe(DEFAULT_NAMED_PIPE, DEFAULT_TIMEOUT, API_DEFAULT_VERSION)
    }

    /// Connects over a Windows named pipe.
    ///
    /// * `addr` - pipe address; the first `npipe://` occurrence is removed.
    /// * `timeout` - per-request timeout in seconds.
    /// * `client_version` - initial API version of the client.
    pub fn connect_with_named_pipe(
        addr: &str,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("npipe://", "", 1);
        let named_pipe_connector = NamedPipeConnector::new();

        let mut client_builder = Client::builder();
        client_builder.keep_alive(false);
        client_builder.http1_title_case_headers(true);
        let client = client_builder.build(named_pipe_connector);

        let transport = Transport::NamedPipe { client };
        let docker = Docker {
            transport: Arc::new(transport),
            client_type: ClientType::NamedPipe,
            client_addr,
            client_timeout: timeout,
            // Honour the version requested by the caller, exactly like the
            // other `connect_with_*` constructors do.
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
        };
        Ok(docker)
    }
}
#[cfg(any(unix, windows))]
impl Docker {
/// Connects to the platform's local endpoint with default settings:
/// `connect_with_unix_defaults` on Unix, `connect_with_named_pipe_defaults`
/// on Windows.
pub fn connect_with_local_defaults() -> Result<Docker, Error> {
// Exactly one of the two `return` statements survives conditional
// compilation on a given target.
#[cfg(unix)]
return Docker::connect_with_unix_defaults();
#[cfg(windows)]
return Docker::connect_with_named_pipe_defaults();
}
/// Connects to a local endpoint: forwards to `connect_with_unix` on Unix
/// and to `connect_with_named_pipe` on Windows.
///
/// * `addr` - socket path or pipe address, passed through unchanged.
/// * `timeout` - per-request timeout in seconds.
/// * `client_version` - initial API version of the client.
pub fn connect_with_local(
addr: &str,
timeout: u64,
client_version: &ClientVersion,
) -> Result<Docker, Error> {
#[cfg(unix)]
return Docker::connect_with_unix(addr, timeout, client_version);
#[cfg(windows)]
return Docker::connect_with_named_pipe(addr, timeout, client_version);
}
}
#[cfg(feature = "tls")]
impl Docker {
    /// Connects over HTTPS (native-tls) using default settings.
    ///
    /// The host comes from the `DOCKER_HOST` environment variable, falling
    /// back to `DEFAULT_DOCKER_HOST`. `identity.pfx` and `ca.pem` are looked
    /// up in the directory returned by `default_cert_path`, and the PKCS#12
    /// password is the empty string.
    pub fn connect_with_tls_defaults() -> Result<Docker, Error> {
        let cert_dir = default_cert_path()?;
        let host = env::var("DOCKER_HOST").unwrap_or_else(|_| DEFAULT_DOCKER_HOST.to_string());
        Docker::connect_with_tls(
            &host,
            &cert_dir.join("identity.pfx"),
            &cert_dir.join("ca.pem"),
            "",
            DEFAULT_NUM_THREADS,
            DEFAULT_TIMEOUT,
            API_DEFAULT_VERSION,
        )
    }

    /// Connects over HTTPS using native-tls.
    ///
    /// * `addr` - endpoint address; the first `tcp://` occurrence is removed.
    /// * `pkcs12_file` - PKCS#12 archive holding the client identity.
    /// * `ca_file` - PEM file with the root certificate to trust.
    /// * `pkcs12_password` - password protecting `pkcs12_file`.
    /// * `num_thread` - forwarded to `HttpConnector::new`.
    /// * `timeout` - per-request timeout in seconds.
    /// * `client_version` - initial API version of the client.
    ///
    /// # Errors
    ///
    /// Fails when either file cannot be read or parsed, or when the TLS
    /// connector cannot be built.
    pub fn connect_with_tls(
        addr: &str,
        pkcs12_file: &Path,
        ca_file: &Path,
        pkcs12_password: &str,
        num_thread: usize,
        timeout: u64,
        client_version: &ClientVersion,
    ) -> Result<Docker, Error> {
        let client_addr = addr.replacen("tcp://", "", 1);
        let mut tls_builder = TlsConnector::builder();

        // Client identity (certificate + key) from the PKCS#12 archive.
        let mut identity_file = File::open(pkcs12_file)?;
        let mut identity_bytes = Vec::new();
        identity_file.read_to_end(&mut identity_bytes)?;
        let identity = Identity::from_pkcs12(&identity_bytes, pkcs12_password)?;

        // Root certificate used to verify the server.
        let mut ca_handle = File::open(ca_file)?;
        let mut ca_bytes = Vec::new();
        ca_handle.read_to_end(&mut ca_bytes)?;
        let ca = Certificate::from_pem(&ca_bytes)?;

        tls_builder.identity(identity).add_root_certificate(ca);

        let mut http = HttpConnector::new(num_thread);
        // The inner connector must accept non-`http` URIs so that the HTTPS
        // wrapper can hand it `https` destinations.
        http.enforce_http(false);

        let tls_connector = tls_builder.build()?;
        let https: hyper_tls::HttpsConnector<HttpConnector> =
            hyper_tls::HttpsConnector::from((http, tls_connector));
        let client = Client::builder().build(https);

        Ok(Docker {
            transport: Arc::new(Transport::Tls { client }),
            client_type: ClientType::SSL,
            client_addr,
            client_timeout: timeout,
            version: Arc::new((
                AtomicUsize::new(client_version.major_version),
                AtomicUsize::new(client_version.minor_version),
            )),
        })
    }
}
/// Wrapper around a `Docker` handle, produced by `Docker::chain`.
// NOTE(review): the chaining API itself lives outside this part of the file.
#[derive(Debug)]
pub struct DockerChain {
/// The wrapped client.
pub(super) inner: Docker,
}
/// Clones the chain by cloning the wrapped `Docker` handle.
impl Clone for DockerChain {
    fn clone(&self) -> DockerChain {
        let inner = self.inner.clone();
        DockerChain { inner }
    }
}
impl Docker {
/// Consumes the client and wraps it in a `DockerChain`.
pub fn chain(self) -> DockerChain {
DockerChain { inner: self }
}
/// Sends the request and deserializes the whole JSON response body into `T`.
///
/// `req` may already be an error (e.g. from `build_request`); it is then
/// surfaced through the returned future.
pub(crate) fn process_into_value<T>(
    &self,
    req: Result<Request<Body>, Error>,
) -> impl Future<Item = T, Error = Error>
where
    T: DeserializeOwned,
{
    let pending = self.process_request(req);
    pending.and_then(|response| Docker::decode_response::<T>(response))
}
/// Sends the request and decodes the response body as a stream of
/// line-delimited JSON values of type `T`.
pub(crate) fn process_into_stream<T>(
    &self,
    req: Result<Request<Body>, Error>,
) -> impl Stream<Item = T, Error = Error>
where
    T: DeserializeOwned,
{
    let pending = self.process_request(req);
    // One response becomes a one-element stream of streams, then is flattened.
    pending
        .into_stream()
        .map(|response| Docker::decode_into_stream::<T>(response))
        .flatten()
}
/// Sends the request and decodes the response body as a stream of
/// `LogOutput` frames.
pub(crate) fn process_into_stream_string(
    &self,
    req: Result<Request<Body>, Error>,
) -> impl Stream<Item = LogOutput, Error = Error> {
    let pending = self.process_request(req);
    pending
        .into_stream()
        .map(|response| Docker::decode_into_stream_string(response))
        .flatten()
}
/// Sends the request and discards the response, resolving to `()` on success.
pub(crate) fn process_into_unit(
    &self,
    req: Result<Request<Body>, Error>,
) -> impl Future<Item = (), Error = Error> {
    let pending = self.process_request(req);
    pending.map(|_response| ())
}
/// Sends the request and exposes the raw response body as a stream of
/// `Chunk`s.
pub(crate) fn process_into_body(
    &self,
    req: Result<Request<Body>, Error>,
) -> impl Stream<Item = Chunk, Error = Error> {
    let pending = self.process_request(req);
    pending
        .into_stream()
        .map(|reply| {
            let body = reply.into_body();
            body.from_err()
        })
        .flatten()
}
/// Sends the request, waits for the connection upgrade and decodes the
/// upgraded connection as a stream of `LogOutput` frames.
pub(crate) fn process_upgraded_stream_string(
    &self,
    req: Result<Request<Body>, Error>,
) -> impl Stream<Item = LogOutput, Error = Error> {
    let pending = self.process_request(req);
    pending
        .into_stream()
        .map(|response| Docker::decode_into_upgraded_stream_string(response))
        .flatten()
}
/// Turns an `Option<Result<T, Error>>` inside out.
///
/// `None` becomes `Ok(None)`, `Some(Ok(x))` becomes `Ok(Some(x))` and
/// `Some(Err(e))` becomes `Err(e)`.
pub(crate) fn transpose_option<T>(
    option: Option<Result<T, Error>>,
) -> Result<Option<T>, Error> {
    match option {
        None => Ok(None),
        Some(outcome) => outcome.map(Some),
    }
}
/// Serializes an optional payload to a JSON request body.
///
/// `None` yields an empty body. The serialized JSON (or an empty string when
/// there is no payload) is emitted at debug log level.
///
/// # Errors
///
/// Returns the serialization error when `body` cannot be encoded as JSON.
pub(crate) fn serialize_payload<S>(body: Option<S>) -> Result<Body, Error>
where
    S: Serialize,
{
    let payload = match body {
        Some(inst) => Some(serde_json::to_string(&inst)?),
        None => None,
    };
    // Borrow for logging instead of cloning the whole serialized payload.
    debug!("{}", payload.as_ref().map(String::as_str).unwrap_or(""));
    Ok(match payload {
        Some(content) => Body::from(content),
        None => Body::empty(),
    })
}
pub fn client_version(&self) -> ClientVersion {
self.version.as_ref().into()
}
/// Queries `/version` and lowers the client's API version to the server's
/// when the server reports an older one.
///
/// The client version is left untouched when the server's version is equal
/// or newer. The future resolves to the same client on success.
///
/// # Errors
///
/// Fails with `APIVersionParseError` when the reported API version is not of
/// the form `<major>.<minor>`, or with any error raised by the request.
pub fn negotiate_version(self) -> impl Future<Item = Self, Error = Error> {
    let version_request = self.build_request::<_, String, String>(
        "/version",
        Builder::new().method(Method::GET),
        Ok(None::<ArrayVec<[(_, _); 0]>>),
        Ok(Body::empty()),
    );

    self.process_into_value::<Version>(version_request)
        .and_then(move |reply| {
            // Keep a copy of the raw string for the error message.
            let reported = reply.api_version.clone();
            let server_version: ClientVersion = match reply.api_version.into() {
                MaybeClientVersion::Some(parsed) => parsed,
                MaybeClientVersion::None => {
                    return Err(APIVersionParseError {
                        api_version: reported,
                    }
                    .into());
                }
            };

            if server_version < self.client_version() {
                let (major, minor) = &*self.version;
                major.store(server_version.major_version, Ordering::Relaxed);
                minor.store(server_version.minor_version, Ordering::Relaxed);
            }
            Ok(self)
        })
}
/// Sends the request through `execute_request` (with this client's timeout)
/// and maps the HTTP status of the response to a result.
///
/// * Success statuses and `101 Switching Protocols` resolve to the response.
/// * `304`, `409`, `400` and `404` read the body as a string and fail with
///   `DockerResponseNotModifiedError`, `DockerResponseConflictError`,
///   `DockerResponseBadParameterError` and `DockerResponseNotFoundError`
///   respectively, carrying that string as `message`.
/// * Any other status fails with `DockerResponseServerError`, carrying the
///   numeric status code and the body string.
///
/// If `req` is already an `Err`, that error is returned without sending
/// anything.
fn process_request(
&self,
req: Result<Request<Body>, Error>,
) -> impl Future<Item = Response<Body>, Error = Error> {
// Clone what the future needs so that it does not borrow `self`.
let transport = self.transport.clone();
let timeout = self.client_timeout;
result(req)
.and_then(move |request| Docker::execute_request(transport, request, timeout))
.and_then(|response| {
let status = response.status();
// NOTE(review): every arm uses its own `EitherResponse` variant,
// presumably so that the differently-typed futures share one return
// type — confirm against `either::EitherResponse`.
match status {
s if s.is_success() => EitherResponse::A(future::ok(response)),
StatusCode::SWITCHING_PROTOCOLS => EitherResponse::G(future::ok(response)),
StatusCode::NOT_MODIFIED => {
EitherResponse::F(Docker::decode_into_string(response).and_then(
|message| Err(DockerResponseNotModifiedError { message }.into()),
))
}
StatusCode::CONFLICT => {
EitherResponse::E(Docker::decode_into_string(response).and_then(
|message| Err(DockerResponseConflictError { message }.into()),
))
}
StatusCode::BAD_REQUEST => {
EitherResponse::D(Docker::decode_into_string(response).and_then(
|message| Err(DockerResponseBadParameterError { message }.into()),
))
}
StatusCode::NOT_FOUND => {
EitherResponse::C(Docker::decode_into_string(response).and_then(
|message| Err(DockerResponseNotFoundError { message }.into()),
))
}
// Everything else is reported as a server error with its status code.
_ => EitherResponse::B(Docker::decode_into_string(response).and_then(
move |message| {
Err(DockerResponseServerError {
status_code: status.as_u16(),
message,
}
.into())
},
)),
}
})
}
/// Assembles a hyper request for `path` on this client's endpoint.
///
/// * `path` - API path, e.g. `/version`.
/// * `builder` - request builder, already configured with the HTTP method.
/// * `query` - optional query parameters, or an earlier error to propagate.
/// * `payload` - request body, or an earlier error to propagate.
///
/// The URI is produced by `Uri::parse` from the client address, client type,
/// path, query and current client version. The `Content-Type` header is set
/// to `application/json`.
///
/// # Errors
///
/// Returns the error in `query` first, then the one in `payload`, then any
/// error from URI construction or from finishing the request.
pub(crate) fn build_request<O, K, V>(
    &self,
    path: &str,
    builder: &mut Builder,
    query: Result<Option<O>, Error>,
    payload: Result<Body, Error>,
) -> Result<Request<Body>, Error>
where
    O: IntoIterator,
    O::Item: ::std::borrow::Borrow<(K, V)>,
    K: AsRef<str>,
    V: AsRef<str>,
{
    let query_params = query?;
    let body = payload?;

    let uri = Uri::parse(
        &self.client_addr,
        &self.client_type,
        path,
        query_params,
        &self.client_version(),
    )?;
    let request_uri: hyper::Uri = uri.into();

    let request = builder
        .uri(request_uri)
        .header(CONTENT_TYPE, "application/json")
        .body(body)?;
    Ok(request)
}
/// Dispatches the request on whichever client the transport holds and bounds
/// it with a deadline of `timeout` seconds, measured from the moment this
/// function is entered.
fn execute_request(
    transport: Arc<Transport>,
    request: Request<Body>,
    timeout: u64,
) -> impl Future<Item = Response<Body>, Error = Error> {
    let started = Instant::now();
    let in_flight = match &*transport {
        Transport::Http { client } => client.request(request),
        #[cfg(feature = "openssl")]
        Transport::Https { client } => client.request(request),
        #[cfg(feature = "tls")]
        Transport::Tls { client } => client.request(request),
        #[cfg(unix)]
        Transport::Unix { client } => client.request(request),
        #[cfg(windows)]
        Transport::NamedPipe { client } => client.request(request),
        Transport::HostToReply { client } => client.request(request),
    };
    let deadline = started + Duration::from_secs(timeout);
    Timeout::new_at(in_flight, deadline).from_err()
}
/// Decodes a response body as a stream of values of type `T`, using
/// `JsonLineDecoder` over the body bytes.
fn decode_into_stream<T>(res: Response<Body>) -> impl Stream<Item = T, Error = Error>
where
    T: DeserializeOwned,
{
    let body_reader = StreamReader::new(res.into_body().from_err());
    FramedRead::new(body_reader, JsonLineDecoder::new())
}
/// Decodes a response body as a stream of `LogOutput` frames, using
/// `NewlineLogOutputDecoder` over the body bytes.
fn decode_into_stream_string(
    res: Response<Body>,
) -> impl Stream<Item = LogOutput, Error = Error> {
    let body_reader = StreamReader::new(res.into_body().from_err());
    let frames = FramedRead::new(body_reader, NewlineLogOutputDecoder::new());
    frames.from_err()
}
fn decode_into_upgraded_stream_string(
res: Response<Body>,
) -> impl Stream<Item = LogOutput, Error = Error> {
res.into_body()
.on_upgrade()
.into_stream()
.map(|r| FramedRead::new(r, NewlineLogOutputDecoder::new()))
.map_err::<Error, _>(|e: hyper::Error| e.into())
.flatten()
}
fn decode_into_string(response: Response<Body>) -> impl Future<Item = String, Error = Error> {
response
.into_body()
.concat2()
.from_err()
.and_then(|body| from_utf8(&body).map(|x| x.to_owned()).map_err(|e| e.into()))
}
fn decode_response<T>(response: Response<Body>) -> impl Future<Item = T, Error = Error>
where
T: DeserializeOwned,
{
Docker::decode_into_string(response).and_then(|contents| {
debug!("Decoded into string: {}", &contents);
serde_json::from_str::<T>(&contents).map_err(|e| {
if e.is_data() {
JsonDataError {
message: e.to_string(),
column: e.column(),
contents: contents.to_owned(),
}
.into()
} else {
e.into()
}
})
})
}
/// Builds a client on top of a `HostToReplyConnector`.
///
/// * `connector` - the connector the hyper client is built on.
/// * `client_addr` - address stored on the client as-is (no prefix stripping).
/// * `timeout` - per-request timeout in seconds.
/// * `client_version` - initial API version of the client.
///
/// The client type is `Unix` on Unix targets and `NamedPipe` on Windows.
pub fn connect_with_host_to_reply(
    connector: HostToReplyConnector,
    client_addr: String,
    timeout: u64,
    client_version: &ClientVersion,
) -> Result<Docker, Error> {
    #[cfg(unix)]
    let client_type = ClientType::Unix;
    #[cfg(windows)]
    let client_type = ClientType::NamedPipe;

    let client = Client::builder().build(connector);
    let version = (
        AtomicUsize::new(client_version.major_version),
        AtomicUsize::new(client_version.minor_version),
    );

    Ok(Docker {
        transport: Arc::new(Transport::HostToReply { client }),
        client_type,
        client_addr,
        client_timeout: timeout,
        version: Arc::new(version),
    })
}
}