use std::{borrow::Cow, collections::HashMap, sync::Arc};
use bollard::{
container::{
Config, CreateContainerOptions, InspectContainerOptions, RemoveContainerOptions,
StartContainerOptions, StopContainerOptions,
},
image::CreateImageOptions,
query_parameters::{ListContainersOptions, ListImagesOptions},
secret::{
ContainerState, ContainerStateStatusEnum, Health, HealthConfig, HealthStatusEnum,
HostConfig, PortBinding,
},
Docker,
};
use futures::StreamExt;
pub struct RunningContainer {
name: Arc<str>,
docker: Docker,
}
impl RunningContainer {
pub async fn remove(&self) -> Result<(), anyhow::Error> {
remove(&self.docker, &self.name).await
}
pub async fn stop(&self) -> Result<(), anyhow::Error> {
stop(&self.docker, &self.name).await
}
}
pub async fn remove(docker: &Docker, name: &str) -> Result<(), anyhow::Error> {
Ok(docker
.remove_container(
name,
Some(RemoveContainerOptions {
force: true,
..Default::default()
}),
)
.await?)
}
pub async fn stop(docker: &Docker, name: &str) -> Result<(), anyhow::Error> {
Ok(docker
.stop_container(name, Option::<StopContainerOptions>::None)
.await?)
}
pub struct ContainerRunnerBuilder<'a> {
name: Cow<'a, str>,
image: Option<String>,
port_bindings: Vec<(u16, u16)>,
env_vars: Vec<(String, String)>,
healthcheck: Option<HealthConfig>,
}
impl<'a> ContainerRunnerBuilder<'a> {
pub fn new(name: impl Into<Cow<'a, str>>) -> Self {
ContainerRunnerBuilder {
name: name.into(),
image: None,
port_bindings: Vec::new(),
env_vars: Vec::new(),
healthcheck: None,
}
}
pub fn image(mut self, image: String) -> Self {
self.image = Some(image);
self
}
pub fn add_port_binding(mut self, host_port: u16, container_port: u16) -> Self {
self.port_bindings.push((host_port, container_port));
self
}
pub fn add_env_var(mut self, key: &str, value: &str) -> Self {
self.env_vars.push((key.to_string(), value.to_string()));
self
}
pub fn healthcheck(mut self, healthcheck: HealthConfig) -> Self {
self.healthcheck = Some(healthcheck);
self
}
pub fn build(self) -> Result<ContainerRunner<'a>, anyhow::Error> {
let image = self
.image
.ok_or_else(|| anyhow::anyhow!("Image must be set"))?;
Ok(ContainerRunner::<'a> {
name: self.name,
docker: Docker::connect_with_local_defaults()?,
image,
port_bindings: self.port_bindings,
env_vars: self.env_vars,
healthcheck: self.healthcheck,
})
}
}
pub struct ContainerRunner<'a> {
name: Cow<'a, str>,
docker: Docker,
image: String,
port_bindings: Vec<(u16, u16)>,
env_vars: Vec<(String, String)>,
healthcheck: Option<HealthConfig>,
}
impl ContainerRunner<'_> {
pub async fn run(self) -> Result<RunningContainer, anyhow::Error> {
if self.is_container_running().await? {
remove(&self.docker, &self.name).await?;
}
self.pull_image().await?;
let options = CreateContainerOptions {
name: self.name.as_ref(),
platform: None,
};
let mut port_bindings_map = HashMap::new();
for (container_port, host_port) in self.port_bindings {
port_bindings_map.insert(
format!("{container_port}/tcp"),
Some(vec![PortBinding {
host_ip: Some("127.0.0.1".to_string()),
host_port: Some(format!("{host_port}/tcp")),
}]),
);
}
tracing::debug!("Port bindings: {:?}", port_bindings_map);
let port_bindings = if port_bindings_map.is_empty() {
None
} else {
Some(port_bindings_map)
};
let host_config = Some(HostConfig {
port_bindings,
..Default::default()
});
let env_vars: Vec<String> = self
.env_vars
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect();
let env_vars_str = env_vars.iter().map(String::as_str).collect::<Vec<&str>>();
let config = Config::<&str> {
image: Some(&self.image),
env: Some(env_vars_str),
host_config,
healthcheck: self.healthcheck,
..Default::default()
};
let _ = self.docker.create_container(Some(options), config).await?;
self.docker
.start_container(&self.name, None::<StartContainerOptions<String>>)
.await?;
let start_time = std::time::Instant::now();
loop {
let inspect_container = self
.docker
.inspect_container(&self.name, Option::<InspectContainerOptions>::None)
.await?;
tracing::trace!("Container status: {:?}", inspect_container.state);
if let Some(ContainerState {
status: Some(ContainerStateStatusEnum::RUNNING),
health:
Some(Health {
status: Some(HealthStatusEnum::HEALTHY),
..
}),
..
}) = inspect_container.state
{
tracing::debug!("Container running & healthy");
break;
}
if start_time.elapsed().as_secs() > 30 {
return Err(anyhow::anyhow!("Container failed to start"));
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Ok(RunningContainer {
name: self.name.into(),
docker: self.docker,
})
}
async fn pull_image(&self) -> Result<(), anyhow::Error> {
let images = self
.docker
.list_images(Option::<ListImagesOptions>::None)
.await?;
for image in images {
if image.repo_tags.iter().any(|t| t == &self.image) {
tracing::debug!("Docker image {} already pulled", self.image);
return Ok(());
}
}
let options = Some(CreateImageOptions::<&str> {
from_image: &self.image,
..Default::default()
});
let mut pulling_stream = self.docker.create_image(options, None, None);
while let Some(event) = pulling_stream.next().await {
tracing::debug!("Pulling image: {:?}", event?);
}
Ok(())
}
async fn is_container_running(&self) -> Result<bool, anyhow::Error> {
let containers = self
.docker
.list_containers(Option::<ListContainersOptions>::None)
.await?;
for container in containers {
let Some(names) = container.names else {
continue;
};
if names.iter().any(|n| {
tracing::debug!("Docker container: {n}");
n == &self.name || n == &format!("/{}", self.name)
}) {
tracing::debug!("Docker container {} already running", self.name);
return Ok(true);
}
}
Ok(false)
}
}