atomr_agents_coding_cli_isolator/
docker.rs1use std::collections::BTreeMap;
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bollard::container::{
14 AttachContainerOptions, AttachContainerResults, Config as ContainerConfig,
15 CreateContainerOptions, KillContainerOptions, LogOutput, RemoveContainerOptions,
16 StartContainerOptions, WaitContainerOptions,
17};
18use bollard::models::{ContainerWaitResponse, HostConfig, Mount, MountTypeEnum};
19use bollard::Docker;
20use futures_util::{StreamExt, TryStreamExt};
21use parking_lot::Mutex;
22use tokio::sync::mpsc;
23
24use atomr_agents_coding_cli_core::CliCommand;
25
26use crate::error::IsolatorError;
27use crate::handle::{ExitStatus, IsolationOpts, ProcessHandle};
28use crate::traits::Isolator;
29
30const CHANNEL_CAPACITY: usize = 256;
31
32#[derive(Debug, Clone)]
34pub struct DockerIsolatorConfig {
35 pub image: String,
37 pub extra_mounts: Vec<DockerMount>,
39 pub env: BTreeMap<String, String>,
41 pub network: Option<String>,
43 pub workdir_in_container: PathBuf,
46 pub auto_remove: bool,
48}
49
50impl DockerIsolatorConfig {
51 pub fn new(image: impl Into<String>) -> Self {
52 Self {
53 image: image.into(),
54 extra_mounts: Vec::new(),
55 env: BTreeMap::new(),
56 network: None,
57 workdir_in_container: PathBuf::from("/workspace"),
58 auto_remove: true,
59 }
60 }
61}
62
63#[derive(Debug, Clone)]
64pub struct DockerMount {
65 pub host_path: PathBuf,
66 pub container_path: PathBuf,
67 pub read_only: bool,
68}
69
70pub struct DockerIsolator {
71 docker: Docker,
72 cfg: DockerIsolatorConfig,
73}
74
75impl DockerIsolator {
76 pub fn local(cfg: DockerIsolatorConfig) -> Result<Self, IsolatorError> {
78 let docker = Docker::connect_with_local_defaults()
79 .map_err(|e| IsolatorError::Docker(format!("connect: {e}")))?;
80 Ok(Self { docker, cfg })
81 }
82
83 pub fn with_docker(docker: Docker, cfg: DockerIsolatorConfig) -> Self {
84 Self { docker, cfg }
85 }
86}
87
88#[async_trait]
89impl Isolator for DockerIsolator {
90 fn name(&self) -> &str {
91 "docker"
92 }
93
94 async fn spawn(
95 &self,
96 cmd: CliCommand,
97 opts: IsolationOpts,
98 ) -> Result<Box<dyn ProcessHandle>, IsolatorError> {
99 let tty = cmd.allocate_pty;
100 let work_in_container = self.cfg.workdir_in_container.clone();
101
102 let mut argv: Vec<String> = vec![cmd.program.to_string_lossy().into_owned()];
104 for a in &cmd.args {
105 argv.push(a.to_string_lossy().into_owned());
106 }
107
108 let mut mounts: Vec<Mount> = Vec::with_capacity(1 + self.cfg.extra_mounts.len());
110 mounts.push(Mount {
111 target: Some(work_in_container.to_string_lossy().into_owned()),
112 source: Some(cmd.workdir.to_string_lossy().into_owned()),
113 typ: Some(MountTypeEnum::BIND),
114 read_only: Some(false),
115 ..Default::default()
116 });
117 for m in &self.cfg.extra_mounts {
118 mounts.push(Mount {
119 target: Some(m.container_path.to_string_lossy().into_owned()),
120 source: Some(m.host_path.to_string_lossy().into_owned()),
121 typ: Some(MountTypeEnum::BIND),
122 read_only: Some(m.read_only),
123 ..Default::default()
124 });
125 }
126
127 let mut env_vec: Vec<String> = Vec::new();
128 for (k, v) in self.cfg.env.iter().chain(cmd.env.iter()) {
129 env_vec.push(format!("{k}={v}"));
130 }
131
132 let host_config = HostConfig {
133 mounts: Some(mounts),
134 auto_remove: Some(self.cfg.auto_remove),
135 network_mode: self.cfg.network.clone(),
136 ..Default::default()
137 };
138
139 let config = ContainerConfig::<String> {
140 image: Some(self.cfg.image.clone()),
141 cmd: Some(argv),
142 env: Some(env_vec),
143 working_dir: Some(work_in_container.to_string_lossy().into_owned()),
144 attach_stdin: Some(true),
145 attach_stdout: Some(opts.capture_stdout || tty),
146 attach_stderr: Some(opts.capture_stderr || tty),
147 open_stdin: Some(true),
148 stdin_once: Some(false),
149 tty: Some(tty),
150 host_config: Some(host_config),
151 ..Default::default()
152 };
153
154 let create = self
155 .docker
156 .create_container(None::<CreateContainerOptions<String>>, config)
157 .await?;
158 let container_id = create.id;
159
160 let attach_opts = AttachContainerOptions::<String> {
161 stdin: Some(true),
162 stdout: Some(opts.capture_stdout || tty),
163 stderr: Some(opts.capture_stderr || tty),
164 stream: Some(true),
165 logs: Some(false),
166 detach_keys: None,
167 };
168 let AttachContainerResults { output, input } = self
169 .docker
170 .attach_container(&container_id, Some(attach_opts))
171 .await?;
172
173 self.docker
174 .start_container(&container_id, None::<StartContainerOptions<String>>)
175 .await?;
176
177 let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
179 let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
180 let (stdin_tx, mut stdin_rx_for_pump) = mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
181
182 let docker_for_wait = self.docker.clone();
183 let cid_for_wait = container_id.clone();
184 let cached_status: Arc<Mutex<Option<ExitStatus>>> = Arc::new(Mutex::new(None));
185
186 tokio::spawn(async move {
190 let mut s = output.boxed();
191 while let Some(item) = s.next().await {
192 match item {
193 Ok(LogOutput::StdOut { message }) | Ok(LogOutput::Console { message }) => {
194 if stdout_tx.send(message.to_vec()).await.is_err() {
195 break;
196 }
197 }
198 Ok(LogOutput::StdErr { message }) => {
199 if stderr_tx.send(message.to_vec()).await.is_err() {
200 break;
201 }
202 }
203 Ok(LogOutput::StdIn { .. }) => continue,
204 Err(_) => break,
205 }
206 }
207 });
208
209 tokio::spawn(async move {
211 use tokio::io::AsyncWriteExt;
212 let mut input = input;
213 while let Some(chunk) = stdin_rx_for_pump.recv().await {
214 if input.write_all(&chunk).await.is_err() {
215 break;
216 }
217 let _ = input.flush().await;
218 }
219 });
220
221 Ok(Box::new(DockerProcessHandle {
222 docker: self.docker.clone(),
223 container_id,
224 tty,
225 stdout_rx: Some(stdout_rx),
226 stderr_rx: Some(stderr_rx),
227 stdin_tx: Some(stdin_tx),
228 cached_status,
229 _wait_seed: (docker_for_wait, cid_for_wait),
230 }) as Box<dyn ProcessHandle>)
231 }
232}
233
234struct DockerProcessHandle {
235 docker: Docker,
236 container_id: String,
237 tty: bool,
238 stdout_rx: Option<mpsc::Receiver<Vec<u8>>>,
239 stderr_rx: Option<mpsc::Receiver<Vec<u8>>>,
240 stdin_tx: Option<mpsc::Sender<Vec<u8>>>,
241 cached_status: Arc<Mutex<Option<ExitStatus>>>,
242 _wait_seed: (Docker, String),
243}
244
245#[async_trait]
246impl ProcessHandle for DockerProcessHandle {
247 fn take_stdout(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
248 self.stdout_rx.take()
249 }
250 fn take_stderr(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
251 self.stderr_rx.take()
252 }
253 fn take_stdin(&mut self) -> Option<mpsc::Sender<Vec<u8>>> {
254 self.stdin_tx.take()
255 }
256 fn is_pty(&self) -> bool {
257 self.tty
258 }
259 async fn resize_pty(&mut self, cols: u16, rows: u16) -> Result<(), IsolatorError> {
260 if !self.tty {
261 return Err(IsolatorError::Unsupported("resize on non-tty container"));
262 }
263 self.docker
264 .resize_container_tty(
265 &self.container_id,
266 bollard::container::ResizeContainerTtyOptions {
267 height: rows,
268 width: cols,
269 },
270 )
271 .await?;
272 Ok(())
273 }
274 async fn kill(&mut self) -> Result<(), IsolatorError> {
275 let _ = self
276 .docker
277 .kill_container(
278 &self.container_id,
279 Some(KillContainerOptions { signal: "SIGTERM" }),
280 )
281 .await;
282 let _ = self
284 .docker
285 .remove_container(
286 &self.container_id,
287 Some(RemoveContainerOptions {
288 force: true,
289 ..Default::default()
290 }),
291 )
292 .await;
293 Ok(())
294 }
295 async fn wait(&mut self) -> Result<ExitStatus, IsolatorError> {
296 if let Some(cached) = *self.cached_status.lock() {
297 return Ok(cached);
298 }
299 let mut s = self
300 .docker
301 .wait_container(&self.container_id, None::<WaitContainerOptions<String>>);
302 let mut last: Option<ContainerWaitResponse> = None;
303 while let Some(item) = s.try_next().await.transpose() {
304 match item {
305 Ok(r) => last = Some(r),
306 Err(e) => return Err(IsolatorError::Docker(e.to_string())),
307 }
308 }
309 let code = last.and_then(|r| Some(r.status_code as i32)).unwrap_or(-1);
310 let status = ExitStatus::from_code(code);
311 *self.cached_status.lock() = Some(status);
312 Ok(status)
313 }
314}