mcai-docker 0.2.2

Library to start and stop MCAI workers images with Docker.
Documentation
use crate::error::Result;
use async_std::prelude::StreamExt;
use bollard::container::{
  CreateContainerOptions, InspectContainerOptions, ListContainersOptions, StopContainerOptions,
};
use bollard::models::{ContainerInspectResponse, PortBinding};
use bollard::{
  container::{AttachContainerOptions, Config, RemoveContainerOptions},
  models::HostConfig,
  Docker,
};
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::io::Write;

#[derive(Clone, Debug, Default)]
struct HardwareOptions {
  binds: Option<Vec<String>>,
  cpu: Option<f32>,
  network_mode: Option<String>,
  port_bindings: Option<HashMap<String, Option<Vec<PortBinding>>, RandomState>>,
  ram: Option<i64>,
  swap: Option<i64>,
}

impl From<&HardwareOptions> for HostConfig {
  fn from(hardware_options: &HardwareOptions) -> Self {
    let cpu_quota: Option<i64> = hardware_options.cpu.map(|c| (c * 100_000.0) as i64);
    HostConfig {
      binds: hardware_options.binds.clone(),
      cpu_period: Some(100_000),
      cpu_quota,
      memory: hardware_options.ram,
      memory_swap: hardware_options.swap,
      network_mode: hardware_options.network_mode.clone(),
      port_bindings: hardware_options.port_bindings.clone(),
      ..Default::default()
    }
  }
}

#[derive(Debug, Default)]
pub struct ContainerBuilder {
  image: String,
  name: Option<String>,
  cmd: Option<Vec<String>>,
  env: Option<Vec<String>>,
  hardware: HardwareOptions,
  tty: Option<bool>,
}

impl ContainerBuilder {
  pub(crate) fn new(image: &str) -> Self {
    ContainerBuilder {
      image: image.to_string(),
      ..Default::default()
    }
  }

  pub fn with_name(mut self, name: &str) -> Self {
    self.name = Some(name.to_string());
    self
  }

  pub fn with_command(mut self, cmd: &str) -> Self {
    self.cmd = Some(cmd.split_whitespace().map(|c| c.to_string()).collect());
    self
  }

  pub fn with_env(mut self, key: &str, value: &(dyn ToString + Send + Sync)) -> Self {
    let var = format!("{}={}", key, value.to_string());
    if let Some(env) = &mut self.env {
      env.push(var)
    } else {
      self.env = Some(vec![var])
    }
    self
  }

  pub fn with_envs(mut self, envs: &[(&str, &(dyn ToString + Send + Sync))]) -> Self {
    let mut envs = envs
      .iter()
      .map(|(key, value)| format!("{}={}", key, value.to_string()))
      .collect();

    if let Some(env) = &mut self.env {
      env.append(&mut envs)
    } else {
      self.env = Some(envs)
    }
    self
  }

  pub fn with_cpu_limit(mut self, cpu: f32) -> Self {
    self.hardware.cpu = Some(cpu);
    self
  }

  pub fn with_ram_limit(mut self, ram: i64) -> Self {
    self.hardware.ram = Some(ram);
    self
  }

  pub fn with_swap_limit(mut self, swap: i64) -> Self {
    self.hardware.swap = Some(swap);
    self
  }

  pub fn with_enable_tty(mut self) -> Self {
    self.tty = Some(true);
    self
  }

  pub fn with_disable_tty(mut self) -> Self {
    self.tty = Some(false);
    self
  }

  pub fn with_network(mut self, network: &str) -> Self {
    self.hardware.network_mode = Some(network.to_string());
    self
  }

  pub fn with_port_binding(
    mut self,
    port: u16,
    host_port: Option<u16>,
    host_ip: Option<&str>,
  ) -> Self {
    let binding = Some(vec![PortBinding {
      host_ip: host_ip.map(|ip| ip.to_string()),
      host_port: host_port.map(|p| p.to_string()),
    }]);

    if let Some(ports) = &mut self.hardware.port_bindings {
      ports.insert(port.to_string(), binding);
    } else {
      self.hardware.port_bindings = Some(HashMap::from([(port.to_string(), binding)]));
    };
    self
  }

  pub fn with_port_bindings(mut self, bindings: &[(u16, Option<u16>, Option<&str>)]) -> Self {
    for (port, host_port, host_ip) in bindings {
      self = self.with_port_binding(*port, *host_port, *host_ip);
    }
    self
  }

  pub fn with_volume_binding(mut self, host_path: &str, container_path: &str) -> Self {
    let mapping = format!("{}:{}", host_path, container_path);
    if let Some(binds) = &mut self.hardware.binds {
      binds.push(mapping)
    } else {
      self.hardware.binds = Some(vec![mapping])
    };
    self
  }

  pub fn with_volume_bindings(mut self, bindings: &[(&str, &str)]) -> Self {
    for (host_path, container_path) in bindings {
      self = self.with_volume_binding(host_path, container_path);
    }
    self
  }

  pub async fn build(&self, docker: &Docker) -> Result<Container> {
    let worker_config = Config {
      image: Some(self.image.clone()),
      cmd: self.cmd.clone(),
      env: self.env.clone(),
      host_config: Some(HostConfig::from(&self.hardware)),
      tty: self.tty,
      ..Default::default()
    };

    let container_options = self
      .name
      .clone()
      .map(|name| CreateContainerOptions { name });

    let id = docker
      .create_container::<String, String>(container_options, worker_config)
      .await?
      .id;

    Ok(Container::new(id, self.name.clone()))
  }
}

#[derive(Debug)]
pub struct Container {
  pub id: String,
  pub name: Option<String>,
}

impl Container {
  pub fn new(id: String, name: Option<String>) -> Self {
    Container { id, name }
  }

  pub async fn get_by_name(docker: &Docker, name: &str) -> Option<Self> {
    let filters = HashMap::from([("name".to_string(), vec![name.to_string()])]);
    let list_options = Some(ListContainersOptions {
      all: true,
      filters,
      limit: Some(1),
      ..Default::default()
    });

    docker
      .list_containers(list_options)
      .await
      .ok()
      .unwrap_or_default()
      .first()
      .map(|c| Self::new(c.id.clone().unwrap(), Some(name.to_string())))
  }

  pub async fn run(&self, docker: &Docker, writer: Option<&mut (dyn Write + Send)>) -> Result<()> {
    let writer_and_stdout = if let Some(writer) = writer {
      let logs_options = Some(AttachContainerOptions::<String> {
        stdout: Some(true),
        stream: Some(true),
        ..Default::default()
      });

      Some((
        writer,
        docker.attach_container(&self.id, logs_options).await?,
      ))
    } else {
      None
    };

    docker.start_container::<String>(&self.id, None).await?;

    if let Some((writer, mut stdout_stream)) = writer_and_stdout {
      while let Some(msg) = stdout_stream.output.next().await {
        msg.map(|m| writer.write(&m.to_string().into_bytes()))??;
      }
    }
    Ok(())
  }

  pub async fn inspect(&self, docker: &Docker, size: bool) -> Result<ContainerInspectResponse> {
    let inspect_options = Some(InspectContainerOptions { size });
    Ok(docker.inspect_container(&self.id, inspect_options).await?)
  }

  pub async fn stop(&self, docker: &Docker, time: i64) -> Result<()> {
    let stop_options = Some(StopContainerOptions { t: time });
    Ok(docker.stop_container(&self.id, stop_options).await?)
  }

  pub async fn remove(&self, docker: &Docker, force: bool) -> Result<()> {
    docker
      .remove_container(
        &self.id,
        Some(RemoveContainerOptions {
          force,
          ..Default::default()
        }),
      )
      .await?;
    Ok(())
  }
}