use crate::{
conn::{get_http_connector, Headers, Payload, Transport},
errors::{Error, Result},
Containers, Images, Networks, Volumes,
};
#[cfg(feature = "swarm")]
use crate::{Configs, Nodes, Plugins, Secrets, Services, Swarm, Tasks};
#[cfg(feature = "tls")]
use crate::conn::get_https_connector;
#[cfg(unix)]
use crate::conn::get_unix_connector;
use futures_util::{
io::{AsyncRead, AsyncWrite},
stream::Stream,
TryStreamExt,
};
use hyper::{body::Bytes, Body, Client, Method, Response};
use log::trace;
use serde::de::DeserializeOwned;
use std::path::Path;
#[derive(Debug, Clone)]
pub struct Docker {
transport: Transport,
}
impl Docker {
pub fn new<U>(uri: U) -> Result<Docker>
where
U: AsRef<str>,
{
let uri = uri.as_ref();
let mut it = uri.split("://");
match it.next() {
#[cfg(unix)]
Some("unix") => {
if let Some(path) = it.next() {
Ok(Docker::unix(path))
} else {
Err(Error::MissingAuthority)
}
}
#[cfg(not(unix))]
Some("unix") => Err(Error::UnsupportedScheme("unix".to_string())),
Some("tcp") | Some("http") => {
if let Some(host) = it.next() {
Docker::tcp(host)
} else {
Err(Error::MissingAuthority)
}
}
Some(scheme) => Err(Error::UnsupportedScheme(scheme.to_string())),
None => unreachable!(), }
}
#[cfg(unix)]
#[cfg_attr(docsrs, doc(cfg(unix)))]
pub fn unix<P>(socket_path: P) -> Docker
where
P: AsRef<Path>,
{
Docker {
transport: Transport::Unix {
client: Client::builder()
.pool_max_idle_per_host(0)
.build(get_unix_connector()),
path: socket_path.as_ref().to_path_buf(),
},
}
}
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub fn tls<H, P>(host: H, cert_path: P, verify: bool) -> Result<Docker>
where
H: AsRef<str>,
P: AsRef<Path>,
{
Ok(Docker {
transport: Transport::EncryptedTcp {
client: Client::builder().build(get_https_connector(cert_path.as_ref(), verify)?),
host: url::Url::parse(&format!("https://{}", host.as_ref()))
.map_err(Error::InvalidUrl)?,
},
})
}
pub fn tcp<H>(host: H) -> Result<Docker>
where
H: AsRef<str>,
{
Ok(Docker {
transport: Transport::Tcp {
client: Client::builder().build(get_http_connector()),
host: url::Url::parse(&format!("tcp://{}", host.as_ref()))
.map_err(Error::InvalidUrl)?,
},
})
}
pub fn images(&'_ self) -> Images<'_> {
Images::new(self)
}
pub fn containers(&'_ self) -> Containers<'_> {
Containers::new(self)
}
pub fn networks(&'_ self) -> Networks<'_> {
Networks::new(self)
}
pub fn volumes(&'_ self) -> Volumes<'_> {
Volumes::new(self)
}
pub(crate) async fn get(&self, endpoint: &str) -> Result<Response<Body>> {
self.transport
.request(Method::GET, endpoint, Payload::empty(), Headers::none())
.await
}
pub(crate) async fn get_json<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
let raw_string = self
.transport
.request_string(Method::GET, endpoint, Payload::empty(), Headers::none())
.await?;
trace!("{}", raw_string);
Ok(serde_json::from_str::<T>(&raw_string)?)
}
pub(crate) async fn post<B>(&self, endpoint: &str, body: Payload<B>) -> Result<String>
where
B: Into<Body>,
{
self.transport
.request_string(Method::POST, endpoint, body, Headers::none())
.await
}
pub(crate) async fn put<B>(&self, endpoint: &str, body: Payload<B>) -> Result<String>
where
B: Into<Body>,
{
self.transport
.request_string(Method::PUT, endpoint, body, Headers::none())
.await
}
pub(crate) async fn post_json<B, T>(
&self,
endpoint: impl AsRef<str>,
body: Payload<B>,
) -> Result<T>
where
T: DeserializeOwned,
B: Into<Body>,
{
let raw_string = self
.transport
.request_string(Method::POST, endpoint, body, Headers::none())
.await?;
trace!("{}", raw_string);
Ok(serde_json::from_str::<T>(&raw_string)?)
}
#[allow(dead_code)]
pub(crate) async fn post_json_headers<'a, B, T>(
&self,
endpoint: impl AsRef<str>,
body: Payload<B>,
headers: Option<Headers>,
) -> Result<T>
where
T: DeserializeOwned,
B: Into<Body>,
{
let raw_string = self
.transport
.request_string(Method::POST, endpoint, body, headers)
.await?;
trace!("{}", raw_string);
Ok(serde_json::from_str::<T>(&raw_string)?)
}
pub(crate) async fn delete(&self, endpoint: &str) -> Result<String> {
self.transport
.request_string(Method::DELETE, endpoint, Payload::empty(), Headers::none())
.await
}
pub(crate) async fn delete_json<T: DeserializeOwned>(&self, endpoint: &str) -> Result<T> {
let raw_string = self
.transport
.request_string(Method::DELETE, endpoint, Payload::empty(), Headers::none())
.await?;
trace!("{}", raw_string);
Ok(serde_json::from_str::<T>(&raw_string)?)
}
pub(crate) async fn head_response(&self, endpoint: &str) -> Result<Response<Body>> {
self.transport
.request(Method::HEAD, endpoint, Payload::empty(), Headers::none())
.await
}
pub(crate) fn stream_post<'a, B>(
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Payload<B>,
headers: Option<Headers>,
) -> impl Stream<Item = Result<Bytes>> + 'a
where
B: Into<Body> + 'a,
{
self.transport
.stream_chunks(Method::POST, endpoint, body, headers)
}
fn stream_json_post<'a, B>(
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Payload<B>,
headers: Option<Headers>,
) -> impl Stream<Item = Result<Bytes>> + 'a
where
B: Into<Body> + 'a,
{
self.transport
.stream_json_chunks(Method::POST, endpoint, body, headers)
}
pub(crate) fn stream_post_into<'a, B, T>(
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Payload<B>,
headers: Option<Headers>,
) -> impl Stream<Item = Result<T>> + 'a
where
B: Into<Body> + 'a,
T: DeserializeOwned,
{
self.stream_json_post(endpoint, body, headers)
.and_then(|chunk| async move {
trace!("got chunk {:?}", chunk);
let stream = futures_util::stream::iter(
serde_json::Deserializer::from_slice(&chunk)
.into_iter()
.collect::<Vec<_>>(),
)
.map_err(Error::from);
Ok(stream)
})
.try_flatten()
}
pub(crate) fn stream_get<'a>(
&'a self,
endpoint: impl AsRef<str> + Unpin + 'a,
) -> impl Stream<Item = Result<Bytes>> + 'a {
self.transport
.stream_chunks(Method::GET, endpoint, Payload::empty(), Headers::none())
}
pub(crate) async fn stream_post_upgrade<'a, B>(
&'a self,
endpoint: impl AsRef<str> + 'a,
body: Payload<B>,
) -> Result<impl AsyncRead + AsyncWrite + 'a>
where
B: Into<Body> + 'a,
{
self.transport
.stream_upgrade(Method::POST, endpoint, body)
.await
}
}
#[cfg(feature = "swarm")]
impl Docker {
pub fn services(&'_ self) -> Services<'_> {
Services::new(self)
}
pub fn configs(&'_ self) -> Configs<'_> {
Configs::new(self)
}
pub fn tasks(&'_ self) -> Tasks<'_> {
Tasks::new(self)
}
pub fn secrets(&'_ self) -> Secrets<'_> {
Secrets::new(self)
}
pub fn swarm(&'_ self) -> Swarm<'_> {
Swarm::new(self)
}
pub fn nodes(&'_ self) -> Nodes<'_> {
Nodes::new(self)
}
pub fn plugins(&'_ self) -> Plugins<'_> {
Plugins::new(self)
}
}
#[cfg(test)]
mod tests {
use super::{Docker, Error};
#[test]
fn creates_correct_docker() {
let d = Docker::new("tcp://127.0.0.1:80");
d.unwrap();
let d = Docker::new("http://127.0.0.1:80");
d.unwrap();
#[cfg(unix)]
let d = Docker::new("unix://127.0.0.1:80");
d.unwrap();
#[cfg(not(unix))]
{
let d = Docker::new("unix://127.0.0.1:80");
assert!(d.is_err());
match d.unwrap_err() {
Error::UnsupportedScheme(scheme) if &scheme == "unix" => {}
e => panic!(r#"Expected Error::UnsupportedScheme("unix"), got {}"#, e),
}
}
let d = Docker::new("rand://127.0.0.1:80");
match d.unwrap_err() {
Error::UnsupportedScheme(scheme) if &scheme == "rand" => {}
e => panic!(r#"Expected Error::UnsupportedScheme("rand"), got {}"#, e),
}
let d = Docker::new("invalid_uri");
match d.unwrap_err() {
Error::UnsupportedScheme(scheme) if &scheme == "invalid_uri" => {}
e => panic!(
r#"Expected Error::UnsupportedScheme("invalid_uri"), got {}"#,
e
),
}
let d = Docker::new("");
match d.unwrap_err() {
Error::UnsupportedScheme(scheme) if scheme.is_empty() => {}
e => panic!(r#"Expected Error::UnsupportedScheme(""), got {}"#, e),
}
}
}