pub mod builder;
pub mod errors;
pub mod rep;
pub mod transport;
pub mod tty;
mod tarball;
pub use crate::{
builder::{
BuildOptions, ContainerConnectionOptions, ContainerFilter, ContainerListOptions,
ContainerOptions, EventsOptions, ExecContainerOptions, ExecResizeOptions, ImageFilter,
ImageListOptions, LogsOptions, NetworkCreateOptions, NetworkListOptions, PullOptions,
RegistryAuth, RmContainerOptions, ServiceFilter, ServiceListOptions, ServiceOptions,
TagOptions, VolumeCreateOptions,
},
errors::Error,
};
use crate::{
rep::{
Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event,
ExecDetails, Exit, History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo,
NetworkDetails as NetworkInfo, SearchResult, ServiceCreateInfo, ServiceDetails,
Services as ServicesRep, Stats, Status, Top, Version, Volume as VolumeRep,
VolumeCreateInfo, Volumes as VolumesRep,
},
transport::{tar, Headers, Payload, Transport},
tty::Multiplexer as TtyMultiPlexer,
};
use futures_util::{
io::{AsyncRead, AsyncWrite},
stream::Stream,
TryFutureExt, TryStreamExt,
};
pub use hyper::Uri;
use hyper::{client::HttpConnector, Body, Client, Method};
#[cfg(feature = "tls")]
use hyper_openssl::HttpsConnector;
#[cfg(feature = "unix-socket")]
use hyperlocal::UnixConnector;
use mime::Mime;
#[cfg(feature = "tls")]
use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
use serde_json::Value;
use std::{env, io, io::Read, iter, path::Path, time::Duration};
use url::form_urlencoded;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone)]
pub struct Docker {
transport: Transport,
}
pub struct Image<'docker> {
docker: &'docker Docker,
name: String,
}
impl<'docker> Image<'docker> {
pub fn new<S>(
docker: &'docker Docker,
name: S,
) -> Self
where
S: Into<String>,
{
Image {
docker,
name: name.into(),
}
}
pub async fn inspect(&self) -> Result<ImageDetails> {
self.docker
.get_json(&format!("/images/{}/json", self.name)[..])
.await
}
pub async fn history(&self) -> Result<Vec<History>> {
self.docker
.get_json(&format!("/images/{}/history", self.name)[..])
.await
}
pub async fn delete(&self) -> Result<Vec<Status>> {
self.docker
.delete_json::<Vec<Status>>(&format!("/images/{}", self.name)[..])
.await
}
pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + Unpin + 'docker {
Box::pin(
self.docker
.stream_get(format!("/images/{}/get", self.name))
.map_ok(|c| c.to_vec()),
)
}
pub async fn tag(
&self,
opts: &TagOptions,
) -> Result<()> {
let mut path = vec![format!("/images/{}/tag", self.name)];
if let Some(query) = opts.serialize() {
path.push(query)
}
let _ = self.docker.post(&path.join("?"), None).await?;
Ok(())
}
}
pub struct Images<'docker> {
docker: &'docker Docker,
}
impl<'docker> Images<'docker> {
pub fn new(docker: &'docker Docker) -> Self {
Images { docker }
}
pub fn build(
&self,
opts: &BuildOptions,
) -> impl Stream<Item = Result<Value>> + Unpin + 'docker {
let mut endpoint = vec!["/build".to_owned()];
if let Some(query) = opts.serialize() {
endpoint.push(query)
}
let mut bytes = Vec::default();
let tar_result = tarball::dir(&mut bytes, opts.path.as_str());
let docker = self.docker;
Box::pin(
async move {
tar_result?;
let value_stream = docker.stream_post_into_values(
endpoint.join("?"),
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
);
Ok(value_stream)
}
.try_flatten_stream(),
)
}
pub async fn list(
&self,
opts: &ImageListOptions,
) -> Result<Vec<ImageRep>> {
let mut path = vec!["/images/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
self.docker.get_json::<Vec<ImageRep>>(&path.join("?")).await
}
pub fn get<S>(
&self,
name: S,
) -> Image<'docker>
where
S: Into<String>,
{
Image::new(self.docker, name)
}
pub async fn search(
&self,
term: &str,
) -> Result<Vec<SearchResult>> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("term", term)
.finish();
self.docker
.get_json::<Vec<SearchResult>>(&format!("/images/search?{}", query)[..])
.await
}
pub fn pull(
&self,
opts: &PullOptions,
) -> impl Stream<Item = Result<Value>> + Unpin + 'docker {
let mut path = vec!["/images/create".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
let headers = opts
.auth_header()
.map(|a| iter::once(("X-Registry-Auth", a)));
Box::pin(
self.docker
.stream_post_into_values(path.join("?"), None, headers),
)
}
pub fn export(
&self,
names: Vec<&str>,
) -> impl Stream<Item = Result<Vec<u8>>> + 'docker {
let params = names.iter().map(|n| ("names", *n));
let query = form_urlencoded::Serializer::new(String::new())
.extend_pairs(params)
.finish();
self.docker
.stream_get(format!("/images/get?{}", query))
.map_ok(|c| c.to_vec())
}
pub fn import<R>(
self,
mut tarball: R,
) -> impl Stream<Item = Result<Value>> + Unpin + 'docker
where
R: Read + Send + 'docker,
{
Box::pin(
async move {
let mut bytes = Vec::default();
tarball.read_to_end(&mut bytes)?;
let value_stream = self.docker.stream_post_into_values(
"/images/load",
Some((Body::from(bytes), tar())),
None::<iter::Empty<_>>,
);
Ok(value_stream)
}
.try_flatten_stream(),
)
}
}
pub struct Container<'docker> {
docker: &'docker Docker,
id: String,
}
impl<'docker> Container<'docker> {
pub fn new<S>(
docker: &'docker Docker,
id: S,
) -> Self
where
S: Into<String>,
{
Container {
docker,
id: id.into(),
}
}
pub fn id(&self) -> &str {
&self.id
}
pub async fn inspect(&self) -> Result<ContainerDetails> {
self.docker
.get_json::<ContainerDetails>(&format!("/containers/{}/json", self.id)[..])
.await
}
pub async fn top(
&self,
psargs: Option<&str>,
) -> Result<Top> {
let mut path = vec![format!("/containers/{}/top", self.id)];
if let Some(ref args) = psargs {
let encoded = form_urlencoded::Serializer::new(String::new())
.append_pair("ps_args", args)
.finish();
path.push(encoded)
}
self.docker.get_json(&path.join("?")).await
}
pub fn logs(
&self,
opts: &LogsOptions,
) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
let mut path = vec![format!("/containers/{}/logs", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
let stream = Box::pin(self.docker.stream_get(path.join("?")));
Box::pin(tty::decode(stream))
}
async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + 'docker> {
self.docker
.stream_post_upgrade(
format!(
"/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
self.id
),
None,
)
.await
}
pub async fn attach(&self) -> Result<TtyMultiPlexer<'docker>> {
let tcp_stream = self.attach_raw().await?;
Ok(TtyMultiPlexer::new(tcp_stream))
}
pub async fn changes(&self) -> Result<Vec<Change>> {
self.docker
.get_json::<Vec<Change>>(&format!("/containers/{}/changes", self.id)[..])
.await
}
pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + 'docker {
self.docker
.stream_get(format!("/containers/{}/export", self.id))
.map_ok(|c| c.to_vec())
}
pub fn stats(&self) -> impl Stream<Item = Result<Stats>> + Unpin + 'docker {
let codec = futures_codec::LinesCodec {};
let reader = Box::pin(
self.docker
.stream_get(format!("/containers/{}/stats", self.id))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
)
.into_async_read();
Box::pin(
futures_codec::FramedRead::new(reader, codec)
.map_err(Error::IO)
.and_then(|s: String| async move {
serde_json::from_str(&s).map_err(Error::SerdeJsonError)
}),
)
}
pub async fn start(&self) -> Result<()> {
self.docker
.post(&format!("/containers/{}/start", self.id)[..], None)
.await?;
Ok(())
}
pub async fn stop(
&self,
wait: Option<Duration>,
) -> Result<()> {
let mut path = vec![format!("/containers/{}/stop", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
.append_pair("t", &w.as_secs().to_string())
.finish();
path.push(encoded)
}
self.docker.post(&path.join("?"), None).await?;
Ok(())
}
pub async fn restart(
&self,
wait: Option<Duration>,
) -> Result<()> {
let mut path = vec![format!("/containers/{}/restart", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
.append_pair("t", &w.as_secs().to_string())
.finish();
path.push(encoded)
}
self.docker.post(&path.join("?"), None).await?;
Ok(())
}
pub async fn kill(
&self,
signal: Option<&str>,
) -> Result<()> {
let mut path = vec![format!("/containers/{}/kill", self.id)];
if let Some(sig) = signal {
let encoded = form_urlencoded::Serializer::new(String::new())
.append_pair("signal", &sig.to_owned())
.finish();
path.push(encoded)
}
self.docker.post(&path.join("?"), None).await?;
Ok(())
}
pub async fn rename(
&self,
name: &str,
) -> Result<()> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("name", name)
.finish();
self.docker
.post(
&format!("/containers/{}/rename?{}", self.id, query)[..],
None,
)
.await?;
Ok(())
}
pub async fn pause(&self) -> Result<()> {
self.docker
.post(&format!("/containers/{}/pause", self.id)[..], None)
.await?;
Ok(())
}
pub async fn unpause(&self) -> Result<()> {
self.docker
.post(&format!("/containers/{}/unpause", self.id)[..], None)
.await?;
Ok(())
}
pub async fn wait(&self) -> Result<Exit> {
self.docker
.post_json(
format!("/containers/{}/wait", self.id),
Option::<(Body, Mime)>::None,
)
.await
}
pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/containers/{}", self.id)[..])
.await?;
Ok(())
}
pub async fn remove(
&self,
opts: RmContainerOptions,
) -> Result<()> {
let mut path = vec![format!("/containers/{}", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
self.docker.delete(&path.join("?")).await?;
Ok(())
}
pub fn exec(
&self,
opts: &ExecContainerOptions,
) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
Exec::create_and_start(self.docker, &self.id, opts)
}
pub fn copy_from(
&self,
path: &Path,
) -> impl Stream<Item = Result<Vec<u8>>> + 'docker {
let path_arg = form_urlencoded::Serializer::new(String::new())
.append_pair("path", &path.to_string_lossy())
.finish();
let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg);
self.docker.stream_get(endpoint).map_ok(|c| c.to_vec())
}
pub async fn copy_file_into<P: AsRef<Path>>(
&self,
path: P,
bytes: &[u8],
) -> Result<()> {
let path = path.as_ref();
let mut ar = tar::Builder::new(Vec::new());
let mut header = tar::Header::new_gnu();
header.set_size(bytes.len() as u64);
header.set_mode(0o0644);
ar.append_data(
&mut header,
path.to_path_buf()
.iter()
.skip(1)
.collect::<std::path::PathBuf>(),
bytes,
)
.unwrap();
let data = ar.into_inner().unwrap();
self.copy_to(Path::new("/"), data.into()).await?;
Ok(())
}
pub async fn copy_to(
&self,
path: &Path,
body: Body,
) -> Result<()> {
let path_arg = form_urlencoded::Serializer::new(String::new())
.append_pair("path", &path.to_string_lossy())
.finish();
let mime = "application/x-tar".parse::<Mime>().unwrap();
self.docker
.put(
&format!("/containers/{}/archive?{}", self.id, path_arg),
Some((body, mime)),
)
.await?;
Ok(())
}
}
pub struct Containers<'docker> {
docker: &'docker Docker,
}
impl<'docker> Containers<'docker> {
pub fn new(docker: &'docker Docker) -> Self {
Containers { docker }
}
pub async fn list(
&self,
opts: &ContainerListOptions,
) -> Result<Vec<ContainerRep>> {
let mut path = vec!["/containers/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query)
}
self.docker
.get_json::<Vec<ContainerRep>>(&path.join("?"))
.await
}
pub fn get<S>(
&self,
name: S,
) -> Container<'docker>
where
S: Into<String>,
{
Container::new(self.docker, name)
}
pub async fn create(
&self,
opts: &ContainerOptions,
) -> Result<ContainerCreateInfo> {
let body: Body = opts.serialize()?.into();
let mut path = vec!["/containers/create".to_owned()];
if let Some(ref name) = opts.name {
path.push(
form_urlencoded::Serializer::new(String::new())
.append_pair("name", name)
.finish(),
);
}
self.docker
.post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
.await
}
}
pub struct Exec<'docker> {
docker: &'docker Docker,
id: String,
}
impl<'docker> Exec<'docker> {
fn new<S>(
docker: &'docker Docker,
id: S,
) -> Self
where
S: Into<String>,
{
Exec {
docker,
id: id.into(),
}
}
pub async fn create(
docker: &'docker Docker,
container_id: &str,
opts: &ExecContainerOptions,
) -> Result<Exec<'docker>> {
#[derive(serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct Response {
id: String,
}
let body: Body = opts.serialize()?.into();
let id = docker
.post_json(
&format!("/containers/{}/exec", container_id),
Some((body, mime::APPLICATION_JSON)),
)
.await
.map(|resp: Response| resp.id)?;
Ok(Exec::new(docker, id))
}
pub(crate) fn create_and_start(
docker: &'docker Docker,
container_id: &str,
opts: &ExecContainerOptions,
) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
#[derive(serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct Response {
id: String,
}
let body_result = opts.serialize();
let container_endpoint = format!("/containers/{}/exec", container_id);
Box::pin(
async move {
let body: Body = body_result?.into();
let exec_id = docker
.post_json(&container_endpoint, Some((body, mime::APPLICATION_JSON)))
.await
.map(|resp: Response| resp.id)?;
let stream = Box::pin(docker.stream_post(
format!("/exec/{}/start", exec_id),
Some(("{}".into(), mime::APPLICATION_JSON)),
None::<iter::Empty<_>>,
));
Ok(tty::decode(stream))
}
.try_flatten_stream(),
)
}
pub async fn get<S>(
docker: &'docker Docker,
id: S,
) -> Exec<'docker>
where
S: Into<String>,
{
Exec::new(docker, id)
}
pub fn start(&self) -> impl Stream<Item = Result<tty::TtyChunk>> + 'docker {
let docker = self.docker;
let endpoint = format!("/exec/{}/start", &self.id);
Box::pin(
async move {
let stream = Box::pin(docker.stream_post(
endpoint,
Some(("{}".into(), mime::APPLICATION_JSON)),
None::<iter::Empty<_>>,
));
Ok(tty::decode(stream))
}
.try_flatten_stream(),
)
}
pub async fn inspect(&self) -> Result<ExecDetails> {
self.docker
.get_json(&format!("/exec/{}/json", &self.id)[..])
.await
}
pub async fn resize(
&self,
opts: &ExecResizeOptions,
) -> Result<()> {
let body: Body = opts.serialize()?.into();
self.docker
.post_json(
&format!("/exec/{}/resize", &self.id)[..],
Some((body, mime::APPLICATION_JSON)),
)
.await
}
}
pub struct Networks<'docker> {
docker: &'docker Docker,
}
impl<'docker> Networks<'docker> {
pub fn new(docker: &'docker Docker) -> Self {
Networks { docker }
}
pub async fn list(
&self,
opts: &NetworkListOptions,
) -> Result<Vec<NetworkInfo>> {
let mut path = vec!["/networks".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
self.docker.get_json(&path.join("?")).await
}
pub fn get<S>(
&self,
id: S,
) -> Network<'docker>
where
S: Into<String>,
{
Network::new(self.docker, id)
}
pub async fn create(
&self,
opts: &NetworkCreateOptions,
) -> Result<NetworkCreateInfo> {
let body: Body = opts.serialize()?.into();
let path = vec!["/networks/create".to_owned()];
self.docker
.post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
.await
}
}
pub struct Network<'docker> {
docker: &'docker Docker,
id: String,
}
impl<'docker> Network<'docker> {
pub fn new<S>(
docker: &'docker Docker,
id: S,
) -> Self
where
S: Into<String>,
{
Network {
docker,
id: id.into(),
}
}
pub fn id(&self) -> &str {
&self.id
}
pub async fn inspect(&self) -> Result<NetworkInfo> {
self.docker
.get_json(&format!("/networks/{}", self.id)[..])
.await
}
pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/networks/{}", self.id)[..])
.await?;
Ok(())
}
pub async fn connect(
&self,
opts: &ContainerConnectionOptions,
) -> Result<()> {
self.do_connection("connect", opts).await
}
pub async fn disconnect(
&self,
opts: &ContainerConnectionOptions,
) -> Result<()> {
self.do_connection("disconnect", opts).await
}
async fn do_connection(
&self,
segment: &str,
opts: &ContainerConnectionOptions,
) -> Result<()> {
let body: Body = opts.serialize()?.into();
self.docker
.post(
&format!("/networks/{}/{}", self.id, segment)[..],
Some((body, mime::APPLICATION_JSON)),
)
.await?;
Ok(())
}
}
pub struct Volumes<'docker> {
docker: &'docker Docker,
}
impl<'docker> Volumes<'docker> {
pub fn new(docker: &'docker Docker) -> Self {
Volumes { docker }
}
pub async fn create(
&self,
opts: &VolumeCreateOptions,
) -> Result<VolumeCreateInfo> {
let body: Body = opts.serialize()?.into();
let path = vec!["/volumes/create".to_owned()];
self.docker
.post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
.await
}
pub async fn list(&self) -> Result<Vec<VolumeRep>> {
let path = vec!["/volumes".to_owned()];
let volumes_rep = self.docker.get_json::<VolumesRep>(&path.join("?")).await?;
Ok(match volumes_rep.volumes {
Some(volumes) => volumes,
None => vec![],
})
}
pub fn get(
&self,
name: &str,
) -> Volume<'docker> {
Volume::new(self.docker, name)
}
}
pub struct Volume<'docker> {
docker: &'docker Docker,
name: String,
}
impl<'docker> Volume<'docker> {
pub fn new<S>(
docker: &'docker Docker,
name: S,
) -> Self
where
S: Into<String>,
{
Volume {
docker,
name: name.into(),
}
}
pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/volumes/{}", self.name)[..])
.await?;
Ok(())
}
}
pub struct Services<'docker> {
docker: &'docker Docker,
}
impl<'docker> Services<'docker> {
pub fn new(docker: &'docker Docker) -> Self {
Services { docker }
}
pub async fn list(
&self,
opts: &ServiceListOptions,
) -> Result<ServicesRep> {
let mut path = vec!["/services".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
self.docker.get_json::<ServicesRep>(&path.join("?")).await
}
pub fn get(
&self,
name: &str,
) -> Service<'docker> {
Service::new(self.docker, name)
}
}
pub struct Service<'docker> {
docker: &'docker Docker,
name: String,
}
impl<'docker> Service<'docker> {
pub fn new<S>(
docker: &'docker Docker,
name: S,
) -> Self
where
S: Into<String>,
{
Service {
docker,
name: name.into(),
}
}
pub async fn create(
&self,
opts: &ServiceOptions,
) -> Result<ServiceCreateInfo> {
let body: Body = opts.serialize()?.into();
let path = vec!["/service/create".to_owned()];
let headers = opts
.auth_header()
.map(|a| iter::once(("X-Registry-Auth", a)));
self.docker
.post_json_headers(
&path.join("?"),
Some((body, mime::APPLICATION_JSON)),
headers,
)
.await
}
pub async fn inspect(&self) -> Result<ServiceDetails> {
self.docker
.get_json(&format!("/services/{}", self.name)[..])
.await
}
pub async fn delete(&self) -> Result<()> {
self.docker
.delete_json(&format!("/services/{}", self.name)[..])
.await
}
pub fn logs(
&self,
opts: &LogsOptions,
) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
let mut path = vec![format!("/services/{}/logs", self.name)];
if let Some(query) = opts.serialize() {
path.push(query)
}
let stream = Box::pin(self.docker.stream_get(path.join("?")));
Box::pin(tty::decode(stream))
}
}
fn get_http_connector() -> HttpConnector {
let mut http = HttpConnector::new();
http.enforce_http(false);
http
}
#[cfg(feature = "tls")]
fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
let http = get_http_connector();
if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") {
let mut connector = SslConnector::builder(SslMethod::tls()).unwrap();
connector.set_cipher_list("DEFAULT").unwrap();
let cert = &format!("{}/cert.pem", certs);
let key = &format!("{}/key.pem", certs);
connector
.set_certificate_file(&Path::new(cert), SslFiletype::PEM)
.unwrap();
connector
.set_private_key_file(&Path::new(key), SslFiletype::PEM)
.unwrap();
if env::var("DOCKER_TLS_VERIFY").is_ok() {
let ca = &format!("{}/ca.pem", certs);
connector.set_ca_file(&Path::new(ca)).unwrap();
}
let tcp_host_str = if tcp_host_str.contains("tcp://") {
tcp_host_str.replace("tcp://", "https://")
} else {
tcp_host_str
};
Docker {
transport: Transport::EncryptedTcp {
client: Client::builder()
.build(HttpsConnector::with_connector(http, connector).unwrap()),
host: tcp_host_str,
},
}
} else {
Docker {
transport: Transport::Tcp {
client: Client::builder().build(http),
host: tcp_host_str,
},
}
}
}
#[cfg(not(feature = "tls"))]
fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
let http = get_http_connector();
Docker {
transport: Transport::Tcp {
client: Client::builder().build(http),
host: tcp_host_str,
},
}
}
impl Docker {
pub fn new() -> Docker {
match env::var("DOCKER_HOST").ok() {
Some(host) => {
#[cfg(feature = "unix-socket")]
if let Some(path) = host.strip_prefix("unix://") {
return Docker::unix(path);
}
let host = host.parse().expect("invalid url");
Docker::host(host)
}
#[cfg(feature = "unix-socket")]
None => Docker::unix("/var/run/docker.sock"),
#[cfg(not(feature = "unix-socket"))]
None => panic!("Unix socket support is disabled"),
}
}
#[cfg(feature = "unix-socket")]
pub fn unix<S>(socket_path: S) -> Docker
where
S: Into<String>,
{
Docker {
transport: Transport::Unix {
client: Client::builder()
.pool_max_idle_per_host(0)
.build(UnixConnector),
path: socket_path.into(),
},
}
}
pub fn host(host: Uri) -> Docker {
let tcp_host_str = format!(
"{}://{}:{}",
host.scheme_str().unwrap(),
host.host().unwrap().to_owned(),
host.port_u16().unwrap_or(80)
);
match host.scheme_str() {
#[cfg(feature = "unix-socket")]
Some("unix") => Docker {
transport: Transport::Unix {
client: Client::builder().build(UnixConnector),
path: host.path().to_owned(),
},
},
#[cfg(not(feature = "unix-socket"))]
Some("unix") => panic!("Unix socket support is disabled"),
_ => get_docker_for_tcp(tcp_host_str),
}
}
pub fn images(&'_ self) -> Images<'_> {
Images::new(self)
}
pub fn containers(&'_ self) -> Containers<'_> {
Containers::new(self)
}
pub fn services(&'_ self) -> Services<'_> {
Services::new(self)
}
pub fn networks(&'_ self) -> Networks<'_> {
Networks::new(self)
}
pub fn volumes(&'_ self) -> Volumes<'_> {
Volumes::new(self)
}
pub async fn version(&self) -> Result<Version> {
self.get_json("/version").await
}
pub async fn info(&self) -> Result<Info> {
self.get_json("/info").await
}
pub async fn ping(&self) -> Result<String> {
self.get("/_ping").await
}
pub fn events<'docker>(
&'docker self,
opts: &EventsOptions,
) -> impl Stream<Item = Result<Event>> + Unpin + 'docker {
let mut path = vec!["/events".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
let reader = Box::pin(
self.stream_get(path.join("?"))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
)
.into_async_read();
let codec = futures_codec::LinesCodec {};
Box::pin(
futures_codec::FramedRead::new(reader, codec)
.map_err(Error::IO)
.and_then(|s: String| async move {
serde_json::from_str(&s).map_err(Error::SerdeJsonError)
}),
)
}
async fn get(
&self,
endpoint: &str,
) -> Result<String> {
self.transport
.request(Method::GET, endpoint, Payload::None, Headers::None)
.await
}
async fn get_json<T: serde::de::DeserializeOwned>(
&self,
endpoint: &str,
) -> Result<T> {
let raw_string = self
.transport
.request(Method::GET, endpoint, Payload::None, Headers::None)
.await?;
Ok(serde_json::from_str::<T>(&raw_string)?)
}
async fn post(
&self,
endpoint: &str,
body: Option<(Body, Mime)>,
) -> Result<String> {
self.transport
.request(Method::POST, endpoint, body, Headers::None)
.await
}
async fn put(
&self,
endpoint: &str,
body: Option<(Body, Mime)>,
) -> Result<String> {
self.transport
.request(Method::PUT, endpoint, body, Headers::None)
.await
}
async fn post_json<T, B>(
&self,
endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
) -> Result<T>
where
T: serde::de::DeserializeOwned,
B: Into<Body>,
{
let string = self
.transport
.request(Method::POST, endpoint, body, Headers::None)
.await?;
Ok(serde_json::from_str::<T>(&string)?)
}
async fn post_json_headers<'a, T, B, H>(
&self,
endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
headers: Option<H>,
) -> Result<T>
where
T: serde::de::DeserializeOwned,
B: Into<Body>,
H: IntoIterator<Item = (&'static str, String)> + 'a,
{
let string = self
.transport
.request(Method::POST, endpoint, body, headers)
.await?;
Ok(serde_json::from_str::<T>(&string)?)
}
async fn delete(
&self,
endpoint: &str,
) -> Result<String> {
self.transport
.request(Method::DELETE, endpoint, Payload::None, Headers::None)
.await
}
async fn delete_json<T: serde::de::DeserializeOwned>(
&self,
endpoint: &str,
) -> Result<T> {
let string = self
.transport
.request(Method::DELETE, endpoint, Payload::None, Headers::None)
.await?;
Ok(serde_json::from_str::<T>(&string)?)
}
fn stream_post<'a, H>(
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Option<(Body, Mime)>,
headers: Option<H>,
) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a
where
H: IntoIterator<Item = (&'static str, String)> + 'a,
{
self.transport
.stream_chunks(Method::POST, endpoint, body, headers)
}
fn stream_post_into_values<'a, H>(
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Option<(Body, Mime)>,
headers: Option<H>,
) -> impl Stream<Item = Result<Value>> + 'a
where
H: IntoIterator<Item = (&'static str, String)> + 'a,
{
self.stream_post(endpoint, body, headers)
.and_then(|chunk| async move {
let stream = futures_util::stream::iter(
serde_json::Deserializer::from_slice(&chunk)
.into_iter()
.collect::<Vec<_>>(),
)
.map_err(Error::from);
Ok(stream)
})
.try_flatten()
}
fn stream_get<'a>(
&'a self,
endpoint: impl AsRef<str> + Unpin + 'a,
) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a {
let headers = Some(Vec::default());
self.transport
.stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers)
}
async fn stream_post_upgrade<'a>(
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Option<(Body, Mime)>,
) -> Result<impl futures_util::io::AsyncRead + futures_util::io::AsyncWrite + 'a> {
self.transport
.stream_upgrade(Method::POST, endpoint, body)
.await
}
}
impl Default for Docker {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
#[cfg(feature = "unix-socket")]
#[test]
fn unix_host_env() {
use super::Docker;
use std::env;
env::set_var("DOCKER_HOST", "unix:///docker.sock");
let d = Docker::new();
match d.transport {
crate::transport::Transport::Unix { path, .. } => {
assert_eq!(path, "/docker.sock");
}
_ => {
panic!("Expected transport to be unix.");
}
}
env::set_var("DOCKER_HOST", "http://localhost:8000");
let d = Docker::new();
match d.transport {
crate::transport::Transport::Tcp { host, .. } => {
assert_eq!(host, "http://localhost:8000");
}
_ => {
panic!("Expected transport to be http.");
}
}
}
}