use crate::error::Result;
use async_std::prelude::StreamExt;
use bollard::container::{
CreateContainerOptions, InspectContainerOptions, ListContainersOptions, StopContainerOptions,
};
use bollard::models::{ContainerInspectResponse, PortBinding};
use bollard::{
container::{AttachContainerOptions, Config, RemoveContainerOptions},
models::HostConfig,
Docker,
};
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::io::Write;
#[derive(Clone, Debug, Default)]
struct HardwareOptions {
binds: Option<Vec<String>>,
cpu: Option<f32>,
network_mode: Option<String>,
port_bindings: Option<HashMap<String, Option<Vec<PortBinding>>, RandomState>>,
ram: Option<i64>,
swap: Option<i64>,
}
impl From<&HardwareOptions> for HostConfig {
fn from(hardware_options: &HardwareOptions) -> Self {
let cpu_quota: Option<i64> = hardware_options.cpu.map(|c| (c * 100_000.0) as i64);
HostConfig {
binds: hardware_options.binds.clone(),
cpu_period: Some(100_000),
cpu_quota,
memory: hardware_options.ram,
memory_swap: hardware_options.swap,
network_mode: hardware_options.network_mode.clone(),
port_bindings: hardware_options.port_bindings.clone(),
..Default::default()
}
}
}
#[derive(Debug, Default)]
pub struct ContainerBuilder {
image: String,
name: Option<String>,
cmd: Option<Vec<String>>,
env: Option<Vec<String>>,
hardware: HardwareOptions,
tty: Option<bool>,
}
impl ContainerBuilder {
pub(crate) fn new(image: &str) -> Self {
ContainerBuilder {
image: image.to_string(),
..Default::default()
}
}
pub fn with_name(mut self, name: &str) -> Self {
self.name = Some(name.to_string());
self
}
pub fn with_command(mut self, cmd: &str) -> Self {
self.cmd = Some(cmd.split_whitespace().map(|c| c.to_string()).collect());
self
}
pub fn with_env(mut self, key: &str, value: &(dyn ToString + Send + Sync)) -> Self {
let var = format!("{}={}", key, value.to_string());
if let Some(env) = &mut self.env {
env.push(var)
} else {
self.env = Some(vec![var])
}
self
}
pub fn with_envs(mut self, envs: &[(&str, &(dyn ToString + Send + Sync))]) -> Self {
let mut envs = envs
.iter()
.map(|(key, value)| format!("{}={}", key, value.to_string()))
.collect();
if let Some(env) = &mut self.env {
env.append(&mut envs)
} else {
self.env = Some(envs)
}
self
}
pub fn with_cpu_limit(mut self, cpu: f32) -> Self {
self.hardware.cpu = Some(cpu);
self
}
pub fn with_ram_limit(mut self, ram: i64) -> Self {
self.hardware.ram = Some(ram);
self
}
pub fn with_swap_limit(mut self, swap: i64) -> Self {
self.hardware.swap = Some(swap);
self
}
pub fn with_enable_tty(mut self) -> Self {
self.tty = Some(true);
self
}
pub fn with_disable_tty(mut self) -> Self {
self.tty = Some(false);
self
}
pub fn with_network(mut self, network: &str) -> Self {
self.hardware.network_mode = Some(network.to_string());
self
}
pub fn with_port_binding(
mut self,
port: u16,
host_port: Option<u16>,
host_ip: Option<&str>,
) -> Self {
let binding = Some(vec![PortBinding {
host_ip: host_ip.map(|ip| ip.to_string()),
host_port: host_port.map(|p| p.to_string()),
}]);
if let Some(ports) = &mut self.hardware.port_bindings {
ports.insert(port.to_string(), binding);
} else {
self.hardware.port_bindings = Some(HashMap::from([(port.to_string(), binding)]));
};
self
}
pub fn with_port_bindings(mut self, bindings: &[(u16, Option<u16>, Option<&str>)]) -> Self {
for (port, host_port, host_ip) in bindings {
self = self.with_port_binding(*port, *host_port, *host_ip);
}
self
}
pub fn with_volume_binding(mut self, host_path: &str, container_path: &str) -> Self {
let mapping = format!("{}:{}", host_path, container_path);
if let Some(binds) = &mut self.hardware.binds {
binds.push(mapping)
} else {
self.hardware.binds = Some(vec![mapping])
};
self
}
pub fn with_volume_bindings(mut self, bindings: &[(&str, &str)]) -> Self {
for (host_path, container_path) in bindings {
self = self.with_volume_binding(host_path, container_path);
}
self
}
pub async fn build(&self, docker: &Docker) -> Result<Container> {
let worker_config = Config {
image: Some(self.image.clone()),
cmd: self.cmd.clone(),
env: self.env.clone(),
host_config: Some(HostConfig::from(&self.hardware)),
tty: self.tty,
..Default::default()
};
let container_options = self
.name
.clone()
.map(|name| CreateContainerOptions { name });
let id = docker
.create_container::<String, String>(container_options, worker_config)
.await?
.id;
Ok(Container::new(id, self.name.clone()))
}
}
#[derive(Debug)]
pub struct Container {
pub id: String,
pub name: Option<String>,
}
impl Container {
pub fn new(id: String, name: Option<String>) -> Self {
Container { id, name }
}
pub async fn get_by_name(docker: &Docker, name: &str) -> Option<Self> {
let filters = HashMap::from([("name".to_string(), vec![name.to_string()])]);
let list_options = Some(ListContainersOptions {
all: true,
filters,
limit: Some(1),
..Default::default()
});
docker
.list_containers(list_options)
.await
.ok()
.unwrap_or_default()
.first()
.map(|c| Self::new(c.id.clone().unwrap(), Some(name.to_string())))
}
pub async fn run(&self, docker: &Docker, writer: Option<&mut (dyn Write + Send)>) -> Result<()> {
let writer_and_stdout = if let Some(writer) = writer {
let logs_options = Some(AttachContainerOptions::<String> {
stdout: Some(true),
stream: Some(true),
..Default::default()
});
Some((
writer,
docker.attach_container(&self.id, logs_options).await?,
))
} else {
None
};
docker.start_container::<String>(&self.id, None).await?;
if let Some((writer, mut stdout_stream)) = writer_and_stdout {
while let Some(msg) = stdout_stream.output.next().await {
msg.map(|m| writer.write(&m.to_string().into_bytes()))??;
}
}
Ok(())
}
pub async fn inspect(&self, docker: &Docker, size: bool) -> Result<ContainerInspectResponse> {
let inspect_options = Some(InspectContainerOptions { size });
Ok(docker.inspect_container(&self.id, inspect_options).await?)
}
pub async fn stop(&self, docker: &Docker, time: i64) -> Result<()> {
let stop_options = Some(StopContainerOptions { t: time });
Ok(docker.stop_container(&self.id, stop_options).await?)
}
pub async fn remove(&self, docker: &Docker, force: bool) -> Result<()> {
docker
.remove_container(
&self.id,
Some(RemoveContainerOptions {
force,
..Default::default()
}),
)
.await?;
Ok(())
}
}