use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use futures_core::Stream;
use futures_util::{StreamExt, TryStreamExt};
use http::header::{CONNECTION, CONTENT_TYPE, UPGRADE};
use http::request::Builder;
use http_body_util::Full;
use hyper::{body::Bytes, Method};
use serde::Serialize;
use serde_derive::Deserialize;
use std::cmp::Eq;
use std::collections::HashMap;
use std::hash::Hash;
use tokio::io::AsyncWrite;
use tokio_util::codec::FramedRead;
use std::fmt;
use std::pin::Pin;
use super::Docker;
use crate::docker::BodyType;
use crate::errors::Error;
use crate::models::*;
#[cfg(feature = "websocket")]
use crate::read::websocket::{WebSocketReader, WebSocketWriter};
use crate::read::NewlineLogOutputDecoder;
#[derive(Debug, Deserialize)]
pub struct PathStatResponse {
#[serde(rename = "name")]
pub name: String,
#[serde(rename = "size")]
pub size: i64,
#[serde(rename = "mode")]
pub file_mode: u32,
#[serde(rename = "mtime")]
pub modification_time: Option<String>,
#[serde(rename = "linkTarget")]
pub link_target: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
#[allow(missing_docs)]
pub struct NetworkingConfig<T: Into<String> + Hash + Eq> {
pub endpoints_config: HashMap<T, EndpointSettings>,
}
impl<T> From<NetworkingConfig<T>> for crate::models::NetworkingConfig
where
T: Into<String> + Hash + Eq,
{
fn from(config: NetworkingConfig<T>) -> Self {
crate::models::NetworkingConfig {
endpoints_config: Some(
config
.endpoints_config
.into_iter()
.map(|(k, v)| (k.into(), v))
.collect(),
),
}
}
}
pub struct AttachContainerResults {
pub output: Pin<Box<dyn Stream<Item = Result<LogOutput, Error>> + Send>>,
pub input: Pin<Box<dyn AsyncWrite + Send>>,
}
impl fmt::Debug for AttachContainerResults {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "AttachContainerResults")
}
}
#[derive(Debug, Clone, PartialEq)]
#[allow(missing_docs)]
pub enum LogOutput {
StdErr { message: Bytes },
StdOut { message: Bytes },
StdIn { message: Bytes },
Console { message: Bytes },
}
impl fmt::Display for LogOutput {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let message = match &self {
LogOutput::StdErr { message } => message,
LogOutput::StdOut { message } => message,
LogOutput::StdIn { message } => message,
LogOutput::Console { message } => message,
};
write!(f, "{}", String::from_utf8_lossy(message))
}
}
impl AsRef<[u8]> for LogOutput {
fn as_ref(&self) -> &[u8] {
match self {
LogOutput::StdErr { message } => message.as_ref(),
LogOutput::StdOut { message } => message.as_ref(),
LogOutput::StdIn { message } => message.as_ref(),
LogOutput::Console { message } => message.as_ref(),
}
}
}
impl LogOutput {
pub fn into_bytes(self) -> Bytes {
match self {
LogOutput::StdErr { message } => message,
LogOutput::StdOut { message } => message,
LogOutput::StdIn { message } => message,
LogOutput::Console { message } => message,
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Serialize)]
pub struct CreateCheckpointOptions {
#[serde(rename = "CheckpointID")]
pub checkpoint_id: String,
#[serde(rename = "CheckpointDir")]
#[serde(skip_serializing_if = "Option::is_none")]
pub checkpoint_dir: Option<String>,
#[serde(rename = "Exit")]
pub exit: bool,
}
#[derive(Debug, Clone, Default, PartialEq, Serialize)]
pub struct ListCheckpointsOptions {
#[serde(rename = "dir")]
#[serde(skip_serializing_if = "Option::is_none")]
pub checkpoint_dir: Option<String>,
}
#[derive(Debug, Clone, Default, PartialEq, Serialize)]
pub struct DeleteCheckpointOptions {
#[serde(rename = "dir")]
#[serde(skip_serializing_if = "Option::is_none")]
pub checkpoint_dir: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct Checkpoint {
#[serde(rename = "Name")]
pub name: String,
}
impl Docker {
pub async fn list_containers(
&self,
options: Option<crate::query_parameters::ListContainersOptions>,
) -> Result<Vec<ContainerSummary>, Error> {
let url = "/containers/json";
let req = self.build_request(
url,
Builder::new().method(Method::GET),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_value(req).await
}
pub async fn create_container(
&self,
options: Option<crate::query_parameters::CreateContainerOptions>,
config: ContainerCreateBody,
) -> Result<ContainerCreateResponse, Error> {
let url = "/containers/create";
let req = self.build_request(
url,
Builder::new().method(Method::POST),
options,
Docker::serialize_payload(Some(config)),
);
self.process_into_value(req).await
}
pub async fn start_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::StartContainerOptions>,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/start");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub async fn stop_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::StopContainerOptions>,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/stop");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub async fn remove_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::RemoveContainerOptions>,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}");
let req = self.build_request(
&url,
Builder::new().method(Method::DELETE),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub fn wait_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::WaitContainerOptions>,
) -> impl Stream<Item = Result<ContainerWaitResponse, Error>> {
let url = format!("/containers/{container_name}/wait");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_stream(req).map(|res| match res {
Ok(ContainerWaitResponse {
status_code: code,
error:
Some(ContainerWaitExitError {
message: Some(error),
}),
}) if code > 0 => Err(Error::DockerContainerWaitError { error, code }),
Ok(ContainerWaitResponse {
status_code: code,
error: None,
}) if code > 0 => Err(Error::DockerContainerWaitError {
error: String::new(),
code,
}),
v => v,
})
}
pub async fn attach_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::AttachContainerOptions>,
) -> Result<AttachContainerResults, Error> {
let url = format!("/containers/{container_name}/attach");
let req = self.build_request(
&url,
Builder::new()
.method(Method::POST)
.header(CONNECTION, "Upgrade")
.header(UPGRADE, "tcp"),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
let (read, write) = self.process_upgraded(req).await?;
let log = FramedRead::new(read, NewlineLogOutputDecoder::new(true)).map_err(|e| e.into());
Ok(AttachContainerResults {
output: Box::pin(log),
input: Box::pin(write),
})
}
#[cfg(all(feature = "websocket", unix))]
pub async fn attach_container_websocket(
&self,
container_name: &str,
options: Option<crate::query_parameters::AttachContainerOptions>,
) -> Result<AttachContainerResults, Error> {
let path = format!("/containers/{container_name}/attach/ws");
let ws_stream = self.process_websocket(&path, options).await?;
let (write, read) = futures_util::StreamExt::split(ws_stream);
let ws_reader = WebSocketReader::new(read);
let log =
FramedRead::new(ws_reader, NewlineLogOutputDecoder::new(true)).map_err(|e| e.into());
let ws_writer = WebSocketWriter::new(write);
Ok(AttachContainerResults {
output: Box::pin(log),
input: Box::pin(ws_writer),
})
}
#[cfg(all(feature = "websocket", not(unix)))]
pub async fn attach_container_websocket(
&self,
container_name: &str,
options: Option<crate::query_parameters::AttachContainerOptions>,
) -> Result<AttachContainerResults, Error> {
let path = format!("/containers/{container_name}/attach/ws");
let ws_stream = self.process_websocket(&path, options).await?;
let (write, read) = futures_util::StreamExt::split(ws_stream);
let ws_reader = WebSocketReader::new(read);
let log =
FramedRead::new(ws_reader, NewlineLogOutputDecoder::new(true)).map_err(|e| e.into());
let ws_writer = WebSocketWriter::new(write);
Ok(AttachContainerResults {
output: Box::pin(log),
input: Box::pin(ws_writer),
})
}
pub async fn resize_container_tty(
&self,
container_name: &str,
options: crate::query_parameters::ResizeContainerTTYOptions,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/resize");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
Some(options),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub async fn restart_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::RestartContainerOptions>,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/restart");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub async fn inspect_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::InspectContainerOptions>,
) -> Result<ContainerInspectResponse, Error> {
let url = format!("/containers/{container_name}/json");
let req = self.build_request(
&url,
Builder::new().method(Method::GET),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_value(req).await
}
pub async fn get_container_archive_info(
&self,
container_name: &str,
options: Option<crate::query_parameters::ContainerArchiveInfoOptions>,
) -> Result<PathStatResponse, Error> {
let url = format!("/containers/{container_name}/archive");
let req = self.build_request(
&url,
Builder::new().method(Method::HEAD),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
let container_path_stat_header = "X-Docker-Container-Path-Stat";
let response = self.process_request(req).await?;
let container_path_stat = response
.headers()
.get(container_path_stat_header)
.ok_or(Error::HttpHeaderNotFoundError(
container_path_stat_header.to_owned(),
))?
.to_str()?;
let decoded_response = BASE64_STANDARD.decode(container_path_stat)?;
let path_stat: PathStatResponse = serde_json::from_slice(&decoded_response)?;
Ok(path_stat)
}
pub async fn top_processes(
&self,
container_name: &str,
options: Option<crate::query_parameters::TopOptions>,
) -> Result<ContainerTopResponse, Error> {
let url = format!("/containers/{container_name}/top");
let req = self.build_request(
&url,
Builder::new().method(Method::GET),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_value(req).await
}
pub fn logs(
&self,
container_name: &str,
options: Option<crate::query_parameters::LogsOptions>,
) -> impl Stream<Item = Result<LogOutput, Error>> {
let url = format!("/containers/{container_name}/logs");
let req = self.build_request(
&url,
Builder::new().method(Method::GET),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_stream_string(req)
}
pub async fn container_changes(
&self,
container_name: &str,
) -> Result<Option<Vec<FilesystemChange>>, Error> {
let url = format!("/containers/{container_name}/changes");
let req = self.build_request(
&url,
Builder::new().method(Method::GET),
None::<String>,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_value(req).await
}
pub fn stats(
&self,
container_name: &str,
options: Option<crate::query_parameters::StatsOptions>,
) -> impl Stream<Item = Result<ContainerStatsResponse, Error>> {
let url = format!("/containers/{container_name}/stats");
let req = self.build_request(
&url,
Builder::new().method(Method::GET),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_stream(req)
}
pub async fn kill_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::KillContainerOptions>,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/kill");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub async fn update_container(
&self,
container_name: &str,
config: ContainerUpdateBody,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/update");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
None::<String>,
Docker::serialize_payload(Some(config)),
);
self.process_into_unit(req).await
}
pub async fn rename_container(
&self,
container_name: &str,
options: crate::query_parameters::RenameContainerOptions,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/rename");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
Some(options),
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub async fn pause_container(&self, container_name: &str) -> Result<(), Error> {
let url = format!("/containers/{container_name}/pause");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
None::<String>,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub async fn unpause_container(&self, container_name: &str) -> Result<(), Error> {
let url = format!("/containers/{container_name}/unpause");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
None::<String>,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
pub async fn prune_containers(
&self,
options: Option<crate::query_parameters::PruneContainersOptions>,
) -> Result<ContainerPruneResponse, Error> {
let url = "/containers/prune";
let req = self.build_request(
url,
Builder::new().method(Method::POST),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_value(req).await
}
#[inline(always)]
#[deprecated(
since = "0.19.0",
note = "This method is refactored into upload_to_container"
)]
pub async fn upload_to_container_streaming(
&self,
container_name: &str,
options: Option<crate::query_parameters::UploadToContainerOptions>,
tar: impl Stream<Item = Bytes> + Send + 'static,
) -> Result<(), Error> {
self.upload_to_container(container_name, options, crate::body_stream(tar))
.await
}
pub async fn upload_to_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::UploadToContainerOptions>,
tar: BodyType,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/archive");
let req = self.build_request(
&url,
Builder::new()
.method(Method::PUT)
.header(CONTENT_TYPE, "application/x-tar"),
options,
Ok(tar),
);
self.process_into_unit(req).await
}
pub fn download_from_container(
&self,
container_name: &str,
options: Option<crate::query_parameters::DownloadFromContainerOptions>,
) -> impl Stream<Item = Result<Bytes, Error>> {
let url = format!("/containers/{container_name}/archive");
let req = self.build_request(
&url,
Builder::new().method(Method::GET),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_body(req)
}
pub fn export_container(
&self,
container_name: &str,
) -> impl Stream<Item = Result<Bytes, Error>> {
let url = format!("/containers/{container_name}/export");
let req = self.build_request(
&url,
Builder::new()
.method(Method::GET)
.header(CONTENT_TYPE, "application/json"),
None::<String>,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_body(req)
}
pub async fn create_checkpoint(
&self,
container_name: &str,
options: CreateCheckpointOptions,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/checkpoints");
let req = self.build_request(
&url,
Builder::new().method(Method::POST),
None::<String>,
Docker::serialize_payload(Some(options)),
);
self.process_into_unit(req).await
}
pub async fn list_checkpoints(
&self,
container_name: &str,
options: Option<ListCheckpointsOptions>,
) -> Result<Vec<Checkpoint>, Error> {
let url = format!("/containers/{container_name}/checkpoints");
let req = self.build_request(
&url,
Builder::new().method(Method::GET),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
let result: Option<Vec<Checkpoint>> = self.process_into_value(req).await?;
Ok(result.unwrap_or_default())
}
pub async fn delete_checkpoint(
&self,
container_name: &str,
checkpoint_id: &str,
options: Option<DeleteCheckpointOptions>,
) -> Result<(), Error> {
let url = format!("/containers/{container_name}/checkpoints/{checkpoint_id}");
let req = self.build_request(
&url,
Builder::new().method(Method::DELETE),
options,
Ok(BodyType::Left(Full::new(Bytes::new()))),
);
self.process_into_unit(req).await
}
}
#[cfg(not(windows))]
#[cfg(test)]
mod tests {
use futures_util::TryStreamExt;
use yup_hyper_mock::HostToReplyConnector;
use crate::models::ContainerCreateBody;
use crate::query_parameters::{CreateContainerOptions, WaitContainerOptions};
use crate::{Docker, API_DEFAULT_VERSION};
#[tokio::test]
async fn test_container_wait_with_error() {
let mut connector = HostToReplyConnector::default();
connector.m.insert(
String::from("http://127.0.0.1"),
"HTTP/1.1 200 OK\r\nServer:mock1\r\nContent-Type:application/json\r\n\r\n{\"Error\":null,\"StatusCode\":1}".to_string(),
);
let docker =
Docker::connect_with_mock(connector, "127.0.0.1".to_string(), 5, API_DEFAULT_VERSION)
.unwrap();
let result = &docker
.wait_container("wait_container_test", None::<WaitContainerOptions>)
.try_collect::<Vec<_>>()
.await;
assert!(matches!(
result,
Err(crate::errors::Error::DockerContainerWaitError { code: _, error: _ })
));
}
#[tokio::test]
async fn test_output_non_json_error() {
let mut connector = HostToReplyConnector::default();
connector.m.insert(
String::from("http://127.0.0.1"),
"HTTP/1.1 200 OK\r\nServer:mock1\r\nContent-Type:plain/text\r\n\r\nthis is not json"
.to_string(),
);
let docker =
Docker::connect_with_mock(connector, "127.0.0.1".to_string(), 5, API_DEFAULT_VERSION)
.unwrap();
let host_config = bollard_stubs::models::HostConfig {
mounts: Some(vec![bollard_stubs::models::Mount {
target: Some(String::from("/tmp")),
source: Some(String::from("./tmp")),
typ: Some(bollard_stubs::models::MountType::BIND),
consistency: Some(String::from("default")),
..Default::default()
}]),
..Default::default()
};
let result = &docker
.create_container(
Some(CreateContainerOptions {
name: Some("mount_volume_container_failure_test".to_string()),
..Default::default()
}),
ContainerCreateBody {
image: Some("some_image".to_string()),
host_config: Some(host_config),
..Default::default()
},
)
.await;
println!("{result:#?}");
assert!(matches!(
result,
Err(crate::errors::Error::JsonDataError {
message: _,
column: 2,
..
})
));
}
}