Skip to main content

atomr_agents_coding_cli_isolator/
docker.rs

1//! Docker isolator backend (via bollard).
2//!
3//! Creates a fresh container per spawn, attaches to its stdin/stdout/
4//! stderr (or a TTY for interactive), and bridges those streams to the
5//! same `ProcessHandle` channel surface as the local backend. Bind
6//! mounts the request's `workdir` into the container.
7
8use std::collections::BTreeMap;
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bollard::container::{
14    AttachContainerOptions, AttachContainerResults, Config as ContainerConfig,
15    CreateContainerOptions, KillContainerOptions, LogOutput, RemoveContainerOptions,
16    StartContainerOptions, WaitContainerOptions,
17};
18use bollard::models::{ContainerWaitResponse, HostConfig, Mount, MountTypeEnum};
19use bollard::Docker;
20use futures_util::{StreamExt, TryStreamExt};
21use parking_lot::Mutex;
22use tokio::sync::mpsc;
23
24use atomr_agents_coding_cli_core::CliCommand;
25
26use crate::error::IsolatorError;
27use crate::handle::{ExitStatus, IsolationOpts, ProcessHandle};
28use crate::traits::Isolator;
29
/// Capacity (in chunks) of each bounded `mpsc` channel created by `spawn`
/// for a container's stdout, stderr, and stdin.
const CHANNEL_CAPACITY: usize = 256;
31
/// Configuration for `DockerIsolator`.
///
/// One configuration is shared by every container the isolator spawns;
/// per-command details (program, args, workdir, extra env) come from the
/// `CliCommand` passed to `spawn`.
#[derive(Debug, Clone)]
pub struct DockerIsolatorConfig {
    /// Docker image to use for every spawn.
    pub image: String,
    /// Additional host→container bind mounts on top of the workdir.
    pub extra_mounts: Vec<DockerMount>,
    /// Environment variables to set inside the container. The spawned
    /// command's own environment is appended after these.
    pub env: BTreeMap<String, String>,
    /// Network mode (e.g. `bridge`, `none`). `None` leaves the field unset
    /// in the container's host config, so the daemon's default applies
    /// (presumably `bridge` — confirm against the daemon configuration).
    pub network: Option<String>,
    /// Container path the workdir is mounted at; also used as the
    /// container's working directory. Defaults to `/workspace`.
    pub workdir_in_container: PathBuf,
    /// Auto-remove the container after exit.
    pub auto_remove: bool,
}
49
50impl DockerIsolatorConfig {
51    pub fn new(image: impl Into<String>) -> Self {
52        Self {
53            image: image.into(),
54            extra_mounts: Vec::new(),
55            env: BTreeMap::new(),
56            network: None,
57            workdir_in_container: PathBuf::from("/workspace"),
58            auto_remove: true,
59        }
60    }
61}
62
/// An extra host→container bind mount applied to every spawned container.
#[derive(Debug, Clone)]
pub struct DockerMount {
    /// Path on the host; used as the bind mount's source.
    pub host_path: PathBuf,
    /// Path inside the container; used as the bind mount's target.
    pub container_path: PathBuf,
    /// Whether the mount is read-only inside the container.
    pub read_only: bool,
}
69
/// `Isolator` backend that runs each spawned command in a freshly created
/// Docker container.
pub struct DockerIsolator {
    /// bollard client used for every daemon request.
    docker: Docker,
    /// Settings applied to every container this isolator creates.
    cfg: DockerIsolatorConfig,
}
74
75impl DockerIsolator {
76    /// Connect to the local Docker daemon (Unix socket or `DOCKER_HOST`).
77    pub fn local(cfg: DockerIsolatorConfig) -> Result<Self, IsolatorError> {
78        let docker = Docker::connect_with_local_defaults()
79            .map_err(|e| IsolatorError::Docker(format!("connect: {e}")))?;
80        Ok(Self { docker, cfg })
81    }
82
83    pub fn with_docker(docker: Docker, cfg: DockerIsolatorConfig) -> Self {
84        Self { docker, cfg }
85    }
86}
87
88#[async_trait]
89impl Isolator for DockerIsolator {
90    fn name(&self) -> &str {
91        "docker"
92    }
93
94    async fn spawn(
95        &self,
96        cmd: CliCommand,
97        opts: IsolationOpts,
98    ) -> Result<Box<dyn ProcessHandle>, IsolatorError> {
99        let tty = cmd.allocate_pty;
100        let work_in_container = self.cfg.workdir_in_container.clone();
101
102        // Compose the in-container command.
103        let mut argv: Vec<String> = vec![cmd.program.to_string_lossy().into_owned()];
104        for a in &cmd.args {
105            argv.push(a.to_string_lossy().into_owned());
106        }
107
108        // Mounts: workdir + extras.
109        let mut mounts: Vec<Mount> = Vec::with_capacity(1 + self.cfg.extra_mounts.len());
110        mounts.push(Mount {
111            target: Some(work_in_container.to_string_lossy().into_owned()),
112            source: Some(cmd.workdir.to_string_lossy().into_owned()),
113            typ: Some(MountTypeEnum::BIND),
114            read_only: Some(false),
115            ..Default::default()
116        });
117        for m in &self.cfg.extra_mounts {
118            mounts.push(Mount {
119                target: Some(m.container_path.to_string_lossy().into_owned()),
120                source: Some(m.host_path.to_string_lossy().into_owned()),
121                typ: Some(MountTypeEnum::BIND),
122                read_only: Some(m.read_only),
123                ..Default::default()
124            });
125        }
126
127        let mut env_vec: Vec<String> = Vec::new();
128        for (k, v) in self.cfg.env.iter().chain(cmd.env.iter()) {
129            env_vec.push(format!("{k}={v}"));
130        }
131
132        let host_config = HostConfig {
133            mounts: Some(mounts),
134            auto_remove: Some(self.cfg.auto_remove),
135            network_mode: self.cfg.network.clone(),
136            ..Default::default()
137        };
138
139        let config = ContainerConfig::<String> {
140            image: Some(self.cfg.image.clone()),
141            cmd: Some(argv),
142            env: Some(env_vec),
143            working_dir: Some(work_in_container.to_string_lossy().into_owned()),
144            attach_stdin: Some(true),
145            attach_stdout: Some(opts.capture_stdout || tty),
146            attach_stderr: Some(opts.capture_stderr || tty),
147            open_stdin: Some(true),
148            stdin_once: Some(false),
149            tty: Some(tty),
150            host_config: Some(host_config),
151            ..Default::default()
152        };
153
154        let create = self
155            .docker
156            .create_container(None::<CreateContainerOptions<String>>, config)
157            .await?;
158        let container_id = create.id;
159
160        let attach_opts = AttachContainerOptions::<String> {
161            stdin: Some(true),
162            stdout: Some(opts.capture_stdout || tty),
163            stderr: Some(opts.capture_stderr || tty),
164            stream: Some(true),
165            logs: Some(false),
166            detach_keys: None,
167        };
168        let AttachContainerResults { output, input } = self
169            .docker
170            .attach_container(&container_id, Some(attach_opts))
171            .await?;
172
173        self.docker
174            .start_container(&container_id, None::<StartContainerOptions<String>>)
175            .await?;
176
177        // Set up channels and pumps.
178        let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
179        let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
180        let (stdin_tx, mut stdin_rx_for_pump) = mpsc::channel::<Vec<u8>>(CHANNEL_CAPACITY);
181
182        let docker_for_wait = self.docker.clone();
183        let cid_for_wait = container_id.clone();
184        let cached_status: Arc<Mutex<Option<ExitStatus>>> = Arc::new(Mutex::new(None));
185
186        // Output pump: bollard yields `LogOutput` items (StdOut/StdErr
187        // variants, or Console when TTY). Forward each to the right
188        // channel.
189        tokio::spawn(async move {
190            let mut s = output.boxed();
191            while let Some(item) = s.next().await {
192                match item {
193                    Ok(LogOutput::StdOut { message }) | Ok(LogOutput::Console { message }) => {
194                        if stdout_tx.send(message.to_vec()).await.is_err() {
195                            break;
196                        }
197                    }
198                    Ok(LogOutput::StdErr { message }) => {
199                        if stderr_tx.send(message.to_vec()).await.is_err() {
200                            break;
201                        }
202                    }
203                    Ok(LogOutput::StdIn { .. }) => continue,
204                    Err(_) => break,
205                }
206            }
207        });
208
209        // Input pump: drain mpsc → bollard `AsyncWrite`.
210        tokio::spawn(async move {
211            use tokio::io::AsyncWriteExt;
212            let mut input = input;
213            while let Some(chunk) = stdin_rx_for_pump.recv().await {
214                if input.write_all(&chunk).await.is_err() {
215                    break;
216                }
217                let _ = input.flush().await;
218            }
219        });
220
221        Ok(Box::new(DockerProcessHandle {
222            docker: self.docker.clone(),
223            container_id,
224            tty,
225            stdout_rx: Some(stdout_rx),
226            stderr_rx: Some(stderr_rx),
227            stdin_tx: Some(stdin_tx),
228            cached_status,
229            _wait_seed: (docker_for_wait, cid_for_wait),
230        }) as Box<dyn ProcessHandle>)
231    }
232}
233
/// `ProcessHandle` for a command running inside a Docker container.
struct DockerProcessHandle {
    /// bollard client used for resize, kill, remove, and wait requests.
    docker: Docker,
    /// Id of the container created for this spawn.
    container_id: String,
    /// Whether the container was created with a TTY.
    tty: bool,
    /// Receiving end of the stdout channel; `None` once taken.
    stdout_rx: Option<mpsc::Receiver<Vec<u8>>>,
    /// Receiving end of the stderr channel; `None` once taken.
    stderr_rx: Option<mpsc::Receiver<Vec<u8>>>,
    /// Sending end of the stdin channel; `None` once taken.
    stdin_tx: Option<mpsc::Sender<Vec<u8>>>,
    /// Exit status recorded by the first successful `wait`, returned
    /// directly by later calls.
    cached_status: Arc<Mutex<Option<ExitStatus>>>,
    /// Clones of the client and container id stored at spawn time. Not read
    /// anywhere in this file — NOTE(review): looks reserved for a background
    /// wait task; confirm before removing.
    _wait_seed: (Docker, String),
}
244
245#[async_trait]
246impl ProcessHandle for DockerProcessHandle {
247    fn take_stdout(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
248        self.stdout_rx.take()
249    }
250    fn take_stderr(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
251        self.stderr_rx.take()
252    }
253    fn take_stdin(&mut self) -> Option<mpsc::Sender<Vec<u8>>> {
254        self.stdin_tx.take()
255    }
256    fn is_pty(&self) -> bool {
257        self.tty
258    }
259    async fn resize_pty(&mut self, cols: u16, rows: u16) -> Result<(), IsolatorError> {
260        if !self.tty {
261            return Err(IsolatorError::Unsupported("resize on non-tty container"));
262        }
263        self.docker
264            .resize_container_tty(
265                &self.container_id,
266                bollard::container::ResizeContainerTtyOptions {
267                    height: rows,
268                    width: cols,
269                },
270            )
271            .await?;
272        Ok(())
273    }
274    async fn kill(&mut self) -> Result<(), IsolatorError> {
275        let _ = self
276            .docker
277            .kill_container(
278                &self.container_id,
279                Some(KillContainerOptions { signal: "SIGTERM" }),
280            )
281            .await;
282        // Best-effort remove (auto_remove handles success path).
283        let _ = self
284            .docker
285            .remove_container(
286                &self.container_id,
287                Some(RemoveContainerOptions {
288                    force: true,
289                    ..Default::default()
290                }),
291            )
292            .await;
293        Ok(())
294    }
295    async fn wait(&mut self) -> Result<ExitStatus, IsolatorError> {
296        if let Some(cached) = *self.cached_status.lock() {
297            return Ok(cached);
298        }
299        let mut s = self
300            .docker
301            .wait_container(&self.container_id, None::<WaitContainerOptions<String>>);
302        let mut last: Option<ContainerWaitResponse> = None;
303        while let Some(item) = s.try_next().await.transpose() {
304            match item {
305                Ok(r) => last = Some(r),
306                Err(e) => return Err(IsolatorError::Docker(e.to_string())),
307            }
308        }
309        let code = last.and_then(|r| Some(r.status_code as i32)).unwrap_or(-1);
310        let status = ExitStatus::from_code(code);
311        *self.cached_status.lock() = Some(status);
312        Ok(status)
313    }
314}