use std::collections::HashMap;
use std::rc::Rc;
use bollard::container::{CreateContainerOptions, LogOutput, LogsOptions, NetworkingConfig};
use bollard::service::{EndpointSettings, HostConfig};
use bollard::Docker;
use futures::StreamExt;
use tokio::{runtime::Handle, task::JoinHandle};
use crate::handle;
use crate::host::Mode;
use crate::network::Network;
use crate::port::Port;
use crate::runner::Test;
use crate::TestError;
use crate::config::{Config, ContainerConfig};
use crate::constants::{DOCKER_PLATFORM, LABEL_KEY, LABEL_VALUE};
/// Lifecycle state of a [`Container`], used to decide which cleanup steps
/// are still required when the container is disposed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ContainerStatus {
    /// The container exists in Docker but has not been started.
    Created,
    /// The container has been started.
    Started,
    /// The container was stopped successfully and still needs removal.
    Stopped,
    /// The container was removed; nothing is left to clean up.
    Removed,
    /// A stop or removal request failed; no further cleanup is attempted.
    Broken,
}
/// A Docker container managed on behalf of a test, together with the
/// bookkeeping needed to stop, remove and collect logs from it.
pub struct Container {
/// Docker client used for every API call concerning this container.
pub(crate) docker: Docker,
/// Shared handle to the owning test (consulted for the log target
/// directory and the force-upload flag).
test: Rc<Test>,
/// Identifier returned by Docker when the container was created.
id: String,
/// Container name produced by `handle::for_container` from the hostname.
name: String,
/// Current lifecycle state; drives what `dispose` still has to do.
status: ContainerStatus,
/// Configuration the container was created from.
config: ContainerConfig,
/// Per-port socket strings as returned by `ContainerConfig::ports_and_sockets`
/// (presumably host-reachable addresses — confirm against that method).
sockets: HashMap<Port, String>,
/// Background task writing the container's log stream to a file, if started.
logging_task: Option<JoinHandle<Result<(), TestError>>>,
/// When `true`, mounts are uploaded into the container instead of being
/// passed to Docker as mounts at creation time.
upload: bool,
}
impl Container {
/// Returns the identifier Docker assigned to this container.
pub fn id(&self) -> &str {
    self.id.as_str()
}
/// Returns the name under which this container was created.
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Returns the configuration this container was created from.
pub fn config(&self) -> &ContainerConfig {
    &self.config
}
/// Looks up the socket string recorded for `port`.
///
/// Returns `None` when no entry exists for that port.
pub fn socket(&self, port: Port) -> Option<&str> {
    self.sockets.get(&port).map(String::as_str)
}
/// Returns the full port-to-socket map recorded for this container.
pub fn sockets(&self) -> &HashMap<Port, String> {
    &self.sockets
}
/// Borrows the test that owns this container.
fn test(&self) -> &Test {
    self.test.as_ref()
}
/// Builds a fully initialized container from `config`.
///
/// The steps run in this order: convert `config` into a container
/// configuration, pull its image, create the container, upload files (when
/// upload mode applies), start it, start log collection and finally run the
/// readiness probe if the configuration defines one.
///
/// # Errors
///
/// Returns the first error raised by any of the steps above.
pub(super) async fn initialized(
    docker: Docker,
    test: Rc<Test>,
    mode: Mode,
    network: &Network,
    config: &dyn Config,
) -> Result<Self, TestError> {
    let container_config = config.to_container_config()?;
    container_config.image().pull(&docker).await?;

    let mut this = Container::create(docker, test, mode, network, container_config).await?;
    this.ensure_upload().await?;
    this.start().await?;
    this.start_logging().await?;

    // The probe is optional; without one the container counts as ready
    // as soon as it has been started.
    match this.config.readiness() {
        Some(probe) => {
            probe.probe(&this).await?;
        }
        None => {}
    }

    Ok(this)
}
/// Creates (but does not start) the Docker container described by `config`.
///
/// The container is named via `handle::for_container`, attached to `network`
/// with its hostname as alias, and labelled with `LABEL_KEY`/`LABEL_VALUE`.
/// Mounts are only passed to Docker when files are not going to be uploaded
/// instead (upload is used when the test forces it or when `mode` is
/// `Mode::Containerized`).
///
/// # Errors
///
/// Fails if the port/socket layout cannot be computed for `mode` or if the
/// Docker create request fails.
pub(super) async fn create(
    docker: Docker,
    test: Rc<Test>,
    mode: Mode,
    network: &Network,
    config: ContainerConfig,
) -> Result<Self, TestError> {
    log::info!("Creating Docker container for host {}.", config.hostname());

    let container_name = handle::for_container(config.hostname());

    // Every configured port is exposed as TCP.
    let mut exposed_ports = HashMap::new();
    for port in config.ports().iter() {
        exposed_ports.insert(format!("{}/tcp", port.port()), HashMap::new());
    }

    let upload = test.force_upload() || mode == Mode::Containerized;
    let (port_bindings, sockets) = config.ports_and_sockets(mode)?;
    let mounts = if upload {
        Default::default()
    } else {
        config.docker_mounts()
    };
    let platform = std::env::var(DOCKER_PLATFORM).ok();
    let env = config.formatted_env();

    let options = CreateContainerOptions {
        name: container_name.as_str(),
        platform: platform.as_deref(),
    };

    // Empty collections are sent as `None` rather than as empty values.
    let host_config = HostConfig {
        port_bindings: if port_bindings.is_empty() {
            None
        } else {
            Some(port_bindings)
        },
        mounts: if mounts.is_empty() { None } else { Some(mounts) },
        ..Default::default()
    };

    let endpoint = EndpointSettings {
        aliases: Some(vec![config.hostname().to_string()]),
        ..Default::default()
    };
    let networking_config = NetworkingConfig {
        endpoints_config: HashMap::from([(network.id().to_string(), endpoint)]),
    };

    let labels = HashMap::from([(LABEL_KEY.to_string(), LABEL_VALUE.to_string())]);

    let spec = bollard::container::Config {
        image: Some(config.image().locator()),
        hostname: Some(config.hostname().to_string()),
        env: if env.is_empty() { None } else { Some(env) },
        network_disabled: Some(false),
        exposed_ports: if exposed_ports.is_empty() {
            None
        } else {
            Some(exposed_ports)
        },
        attach_stdin: Some(true),
        attach_stdout: Some(true),
        attach_stderr: Some(true),
        open_stdin: Some(true),
        host_config: Some(host_config),
        networking_config: Some(networking_config),
        cmd: Some(config.cmd().to_vec()),
        labels: Some(labels),
        ..Default::default()
    };

    let id = docker.create_container(Some(options), spec).await?.id;
    log::info!(
        "Created container with id {id} for host {}.",
        config.hostname()
    );

    Ok(Self {
        docker,
        test,
        id,
        name: container_name,
        status: ContainerStatus::Created,
        config,
        sockets,
        logging_task: None,
        upload,
    })
}
/// Starts the container and records it as started.
///
/// # Errors
///
/// Returns the Docker error if the start request fails; the status is left
/// unchanged in that case.
pub(super) async fn start(&mut self) -> Result<(), TestError> {
    match self.docker.start_container::<String>(&self.id, None).await {
        Ok(_) => {
            self.status = ContainerStatus::Started;
            Ok(())
        }
        Err(e) => Err(e.into()),
    }
}
/// Spawns a background task that streams the container's output into
/// `<target_dir>/<hostname>.log`.
///
/// The stream follows the container (stdout and stderr, with timestamps)
/// and the task finishes when the stream ends or an error occurs. The task
/// handle is stored in `logging_task` so it can be joined later.
///
/// # Errors
///
/// Fails if the log file cannot be created. Errors that occur while
/// streaming are reported through the spawned task's result instead.
async fn start_logging(&mut self) -> Result<(), TestError> {
    // Append ".log" to the full hostname. `Path::with_extension` would
    // *replace* everything after the last dot, so a dotted hostname such as
    // `db.internal` would be truncated to `db.log` and could collide with
    // another host sharing the same prefix.
    let filename = self
        .test()
        .target_dir()
        .join(format!("{}.log", self.config.hostname()));
    let mut file = tokio::fs::File::create(filename).await?;
    let mut logs = self.docker.logs(
        &self.id,
        Some(LogsOptions::<&str> {
            follow: true,
            stdout: true,
            stderr: true,
            timestamps: true,
            ..Default::default()
        }),
    );
    let dump_task = async move {
        while let Some(log) = logs.next().await {
            // All stream kinds are written to the same file.
            let message: Vec<u8> = match log? {
                LogOutput::StdErr { message } => message,
                LogOutput::StdOut { message } => message,
                LogOutput::StdIn { message } => message,
                LogOutput::Console { message } => message,
            }
            .into();
            tokio::io::copy(&mut message.as_ref(), &mut file).await?;
        }
        Ok::<_, TestError>(())
    };
    self.logging_task = Some(tokio::spawn(dump_task));
    Ok(())
}
/// Asks Docker to stop the container, addressing it by name.
///
/// Never fails: a failed request is logged and the container is marked
/// `Broken`; a successful one marks it `Stopped`.
async fn stop(&mut self) {
    let outcome = self.docker.stop_container(self.name(), None).await;
    if let Err(e) = outcome {
        self.status = ContainerStatus::Broken;
        log::error!("Unable to stop container `{}`: {e}.", self.name());
    } else {
        self.status = ContainerStatus::Stopped;
    }
}
async fn dump_logs(&mut self) {
log::info!("Dumping logs for host {}.", self.config.hostname());
if let Some(logging_task) = self.logging_task.take() {
let name = self.name();
let result = match logging_task.await {
Ok(result) => result,
Err(e) => {
log::error!("Unable to join logging task for container `{name}`: {e}");
return;
}
};
match result {
Ok(()) => {}
Err(e) => log::error!("Failed log dumping for container `{name}`: {e}"),
}
}
}
/// Removes the container from Docker and then joins the logging task.
///
/// Never fails: a failed removal is logged and the container is marked
/// `Broken`; a successful one marks it `Removed`. The logging task is
/// joined in both cases.
async fn remove(&mut self) {
    log::info!(
        "Removing Docker container for host {}.",
        self.config.hostname()
    );
    let outcome = self.docker.remove_container(&self.id, None).await;
    if let Err(e) = outcome {
        self.status = ContainerStatus::Broken;
        log::error!("Unable to remove container {}: {e}.", self.name());
    } else {
        self.status = ContainerStatus::Removed;
    }
    // NOTE(review): the logging task follows the log stream; if removal
    // failed because the container is still running, this join may not
    // return promptly — confirm whether that case can occur.
    self.dump_logs().await;
}
/// Uploads every configured mount into the container as a gzipped tar
/// archive.
///
/// Does nothing unless the container was created in upload mode. For each
/// `(src, base, dst)` mount, the directory `src` is archived under the path
/// `dst` and the archive is extracted by Docker at `base`.
///
/// # Errors
///
/// Returns an I/O error if an archive cannot be built, or
/// `TestError::Startup` if Docker rejects an upload.
pub async fn ensure_upload(&self) -> Result<(), TestError> {
    use bollard::container::UploadToContainerOptions;
    use flate2::write::GzEncoder;
    use flate2::Compression;

    if !self.upload {
        return Ok(());
    }
    log::info!("Uploading files to host {}.", self.config().hostname());
    for (src, base, dst) in self.config.mounts() {
        // Build the archive directly in memory: the upload API needs the
        // whole payload as one buffer anyway, so spilling it to a temporary
        // file and reading it back only adds blocking disk I/O and an extra
        // failure mode.
        let encoder = GzEncoder::new(Vec::new(), Compression::default());
        let mut archive = tar::Builder::new(encoder);
        archive.append_dir_all(dst, src)?;
        // `into_inner` finishes the tar stream, `finish` the gzip stream.
        let contents: Vec<u8> = archive.into_inner()?.finish()?;
        self.docker
            .upload_to_container(
                &self.id,
                Some(UploadToContainerOptions {
                    path: base.clone(),
                    ..Default::default()
                }),
                contents.into(),
            )
            .await
            .map_err(|e| {
                log::error!("Failed to load files for host {}.", self.config.hostname());
                TestError::Startup(format!(
                    "Unable to upload file '{src}' -> '{base}/{dst}': {e:?}"
                ))
            })?;
    }
    Ok(())
}
/// Tears the container down according to its current lifecycle state.
///
/// A started container is stopped and then removed; a created or stopped
/// one is only removed; a removed or broken one is left alone.
pub(super) async fn dispose(&mut self) {
    match self.status {
        ContainerStatus::Started => {
            self.stop().await;
            // Removal is attempted even if stopping failed.
            self.remove().await;
        }
        ContainerStatus::Created | ContainerStatus::Stopped => {
            self.remove().await;
        }
        ContainerStatus::Removed | ContainerStatus::Broken => {}
    }
}
}
impl Drop for Container {
fn drop(&mut self) {
tokio::task::block_in_place(|| Handle::current().block_on(self.dispose()));
}
}