mcai_docker/
container.rs

1use crate::error::Result;
2use async_std::prelude::StreamExt;
3use bollard::container::{
4  CreateContainerOptions, InspectContainerOptions, ListContainersOptions, StopContainerOptions,
5};
6use bollard::models::{ContainerInspectResponse, PortBinding};
7use bollard::{
8  container::{AttachContainerOptions, Config, RemoveContainerOptions},
9  models::HostConfig,
10  Docker,
11};
12use std::collections::hash_map::RandomState;
13use std::collections::HashMap;
14use std::io::Write;
15
16#[derive(Clone, Debug, Default)]
17struct HardwareOptions {
18  binds: Option<Vec<String>>,
19  cpu: Option<f32>,
20  network_mode: Option<String>,
21  port_bindings: Option<HashMap<String, Option<Vec<PortBinding>>, RandomState>>,
22  ram: Option<i64>,
23  swap: Option<i64>,
24}
25
26impl From<&HardwareOptions> for HostConfig {
27  fn from(hardware_options: &HardwareOptions) -> Self {
28    let cpu_quota: Option<i64> = hardware_options.cpu.map(|c| (c * 100_000.0) as i64);
29    HostConfig {
30      binds: hardware_options.binds.clone(),
31      cpu_period: Some(100_000),
32      cpu_quota,
33      memory: hardware_options.ram,
34      memory_swap: hardware_options.swap,
35      network_mode: hardware_options.network_mode.clone(),
36      port_bindings: hardware_options.port_bindings.clone(),
37      ..Default::default()
38    }
39  }
40}
41
42#[derive(Debug, Default)]
43pub struct ContainerBuilder {
44  image: String,
45  name: Option<String>,
46  cmd: Option<Vec<String>>,
47  env: Option<Vec<String>>,
48  hardware: HardwareOptions,
49  tty: Option<bool>,
50}
51
52impl ContainerBuilder {
53  pub(crate) fn new(image: &str) -> Self {
54    ContainerBuilder {
55      image: image.to_string(),
56      ..Default::default()
57    }
58  }
59
60  pub fn with_name(mut self, name: &str) -> Self {
61    self.name = Some(name.to_string());
62    self
63  }
64
65  pub fn with_command(mut self, cmd: &str) -> Self {
66    self.cmd = Some(cmd.split_whitespace().map(|c| c.to_string()).collect());
67    self
68  }
69
70  pub fn with_env(mut self, key: &str, value: &(dyn ToString + Send + Sync)) -> Self {
71    let var = format!("{}={}", key, value.to_string());
72    if let Some(env) = &mut self.env {
73      env.push(var)
74    } else {
75      self.env = Some(vec![var])
76    }
77    self
78  }
79
80  pub fn with_envs(mut self, envs: &[(&str, &(dyn ToString + Send + Sync))]) -> Self {
81    let mut envs = envs
82      .iter()
83      .map(|(key, value)| format!("{}={}", key, value.to_string()))
84      .collect();
85
86    if let Some(env) = &mut self.env {
87      env.append(&mut envs)
88    } else {
89      self.env = Some(envs)
90    }
91    self
92  }
93
94  pub fn with_cpu_limit(mut self, cpu: f32) -> Self {
95    self.hardware.cpu = Some(cpu);
96    self
97  }
98
99  pub fn with_ram_limit(mut self, ram: i64) -> Self {
100    self.hardware.ram = Some(ram);
101    self
102  }
103
104  pub fn with_swap_limit(mut self, swap: i64) -> Self {
105    self.hardware.swap = Some(swap);
106    self
107  }
108
109  pub fn with_enable_tty(mut self) -> Self {
110    self.tty = Some(true);
111    self
112  }
113
114  pub fn with_disable_tty(mut self) -> Self {
115    self.tty = Some(false);
116    self
117  }
118
119  pub fn with_network(mut self, network: &str) -> Self {
120    self.hardware.network_mode = Some(network.to_string());
121    self
122  }
123
124  pub fn with_port_binding(
125    mut self,
126    port: u16,
127    host_port: Option<u16>,
128    host_ip: Option<&str>,
129  ) -> Self {
130    let binding = Some(vec![PortBinding {
131      host_ip: host_ip.map(|ip| ip.to_string()),
132      host_port: host_port.map(|p| p.to_string()),
133    }]);
134
135    if let Some(ports) = &mut self.hardware.port_bindings {
136      ports.insert(port.to_string(), binding);
137    } else {
138      self.hardware.port_bindings = Some(HashMap::from([(port.to_string(), binding)]));
139    };
140    self
141  }
142
143  pub fn with_port_bindings(mut self, bindings: &[(u16, Option<u16>, Option<&str>)]) -> Self {
144    for (port, host_port, host_ip) in bindings {
145      self = self.with_port_binding(*port, *host_port, *host_ip);
146    }
147    self
148  }
149
150  pub fn with_volume_binding(mut self, host_path: &str, container_path: &str) -> Self {
151    let mapping = format!("{}:{}", host_path, container_path);
152    if let Some(binds) = &mut self.hardware.binds {
153      binds.push(mapping)
154    } else {
155      self.hardware.binds = Some(vec![mapping])
156    };
157    self
158  }
159
160  pub fn with_volume_bindings(mut self, bindings: &[(&str, &str)]) -> Self {
161    for (host_path, container_path) in bindings {
162      self = self.with_volume_binding(host_path, container_path);
163    }
164    self
165  }
166
167  pub async fn build(&self, docker: &Docker) -> Result<Container> {
168    let worker_config = Config {
169      image: Some(self.image.clone()),
170      cmd: self.cmd.clone(),
171      env: self.env.clone(),
172      host_config: Some(HostConfig::from(&self.hardware)),
173      tty: self.tty,
174      ..Default::default()
175    };
176
177    let container_options = self
178      .name
179      .clone()
180      .map(|name| CreateContainerOptions { name });
181
182    let id = docker
183      .create_container::<String, String>(container_options, worker_config)
184      .await?
185      .id;
186
187    Ok(Container::new(id, self.name.clone()))
188  }
189}
190
/// Handle to a Docker container, identified by its id.
#[derive(Debug)]
pub struct Container {
  /// Identifier of the container, used for every request made through this handle.
  pub id: String,
  /// Name of the container, when one is known.
  pub name: Option<String>,
}
196
197impl Container {
198  pub fn new(id: String, name: Option<String>) -> Self {
199    Container { id, name }
200  }
201
202  pub async fn get_by_name(docker: &Docker, name: &str) -> Option<Self> {
203    let filters = HashMap::from([("name".to_string(), vec![name.to_string()])]);
204    let list_options = Some(ListContainersOptions {
205      all: true,
206      filters,
207      limit: Some(1),
208      ..Default::default()
209    });
210
211    docker
212      .list_containers(list_options)
213      .await
214      .ok()
215      .unwrap_or_default()
216      .first()
217      .map(|c| Self::new(c.id.clone().unwrap(), Some(name.to_string())))
218  }
219
220  pub async fn run(&self, docker: &Docker, writer: Option<&mut (dyn Write + Send)>) -> Result<()> {
221    let writer_and_stdout = if let Some(writer) = writer {
222      let logs_options = Some(AttachContainerOptions::<String> {
223        stdout: Some(true),
224        stream: Some(true),
225        ..Default::default()
226      });
227
228      Some((
229        writer,
230        docker.attach_container(&self.id, logs_options).await?,
231      ))
232    } else {
233      None
234    };
235
236    docker.start_container::<String>(&self.id, None).await?;
237
238    if let Some((writer, mut stdout_stream)) = writer_and_stdout {
239      while let Some(msg) = stdout_stream.output.next().await {
240        msg.map(|m| writer.write(&m.to_string().into_bytes()))??;
241      }
242    }
243    Ok(())
244  }
245
246  pub async fn inspect(&self, docker: &Docker, size: bool) -> Result<ContainerInspectResponse> {
247    let inspect_options = Some(InspectContainerOptions { size });
248    Ok(docker.inspect_container(&self.id, inspect_options).await?)
249  }
250
251  pub async fn stop(&self, docker: &Docker, time: i64) -> Result<()> {
252    let stop_options = Some(StopContainerOptions { t: time });
253    Ok(docker.stop_container(&self.id, stop_options).await?)
254  }
255
256  pub async fn remove(&self, docker: &Docker, force: bool) -> Result<()> {
257    docker
258      .remove_container(
259        &self.id,
260        Some(RemoveContainerOptions {
261          force,
262          ..Default::default()
263        }),
264      )
265      .await?;
266    Ok(())
267  }
268}