swiftide_docker_executor/
running_docker_executor.rs

1use anyhow::Context as _;
2use async_trait::async_trait;
3use bollard::{
4    query_parameters::{InspectContainerOptions, KillContainerOptions, RemoveContainerOptions},
5    secret::{ContainerState, ContainerStateStatusEnum},
6};
7use codegen::shell_executor_client::ShellExecutorClient;
8use futures_util::Stream;
9use std::{
10    collections::HashMap,
11    net::IpAddr,
12    path::{Path, PathBuf},
13    sync::Arc,
14    time::Duration,
15};
16pub use swiftide_core::ToolExecutor;
17use swiftide_core::{Command, CommandError, CommandOutput, Loader as _, prelude::StreamExt as _};
18use tokio_stream::wrappers::ReceiverStream;
19use tokio_util::sync::CancellationToken;
20
21use crate::{
22    ContextBuilder, ContextError, DockerExecutor, DockerExecutorError, client::Client,
23    container_configurator::ContainerConfigurator, container_starter::ContainerStarter,
24    dockerfile_manager::DockerfileManager, image_builder::ImageBuilder,
25};
26
/// Generated gRPC bindings for the `shell` protobuf package.
///
/// This is where `ShellExecutorClient`, `ShellRequest` and `ShellResponse`, as used by the
/// executor in this file, come from.
pub mod codegen {
    tonic::include_proto!("shell");
}
30pub use bollard::container::LogOutput;
31
/// Handle to a started docker container in which commands can be executed.
///
/// Created with [`RunningDockerExecutor::start`]. Unless `retain_on_drop` is set, dropping
/// the executor cancels the cancellation token and schedules the container to be stopped
/// and removed.
#[derive(Clone, Debug)]
pub struct RunningDockerExecutor {
    /// Id of the container; used for the inspect, logs, kill and remove calls
    pub container_id: String,
    /// Shared docker client used for all docker api calls
    pub(crate) docker: Arc<Client>,
    /// Port that, together with `container_ip`, forms the gRPC endpoint for shell requests
    pub container_port: String,
    /// Address that, together with `container_port`, forms the gRPC endpoint for shell requests
    pub container_ip: IpAddr,
    /// Set once `Drop` has scheduled a shutdown, so that it is not scheduled a second time
    dropped: bool,
    /// When true, `Drop` leaves the container running
    retain_on_drop: bool,

    // Default environment configuration for the executor
    /// Forwarded as `env_clear` with every shell request
    pub(crate) env_clear: bool,
    /// Forwarded as `env_remove` with every shell request
    pub(crate) remove_env: Vec<String>,
    /// Forwarded as `envs` with every shell request
    pub(crate) env: HashMap<String, String>,
    /// Timeout applied when a command does not carry its own
    pub(crate) default_timeout: Option<Duration>,
    /// Working directory used when a command has none; relative command directories are
    /// joined onto it
    pub(crate) workdir: PathBuf,

    /// Cancellation token to stop anything polling the docker api
    cancel_token: Arc<CancellationToken>,
}
51
52impl From<RunningDockerExecutor> for Arc<dyn ToolExecutor> {
53    fn from(val: RunningDockerExecutor) -> Self {
54        Arc::new(val) as Arc<dyn ToolExecutor>
55    }
56}
57
58#[async_trait]
59impl ToolExecutor for RunningDockerExecutor {
60    #[tracing::instrument(skip(self), err)]
61    async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
62        let workdir = self.resolve_workdir(cmd);
63        let timeout = self.resolve_timeout(cmd);
64
65        match cmd {
66            Command::Shell { command, .. } => self.exec_shell(command, &workdir, timeout).await,
67            Command::ReadFile { path, .. } => self.exec_read_file(&workdir, path, timeout).await,
68            Command::WriteFile { path, content, .. } => {
69                self.exec_write_file(&workdir, path, content, timeout).await
70            }
71            _ => unimplemented!(),
72        }
73    }
74
75    async fn stream_files(
76        &self,
77        path: &Path,
78        extensions: Option<Vec<String>>,
79    ) -> anyhow::Result<swiftide_core::indexing::IndexingStream<String>> {
80        let extensions = extensions.unwrap_or_default();
81        Ok(self.as_file_loader(path, extensions).into_stream())
82    }
83}
84
85impl RunningDockerExecutor {
86    /// Starts a docker container with a given context and image name
87    pub async fn start(
88        builder: &DockerExecutor,
89    ) -> Result<RunningDockerExecutor, DockerExecutorError> {
90        let docker = Client::lazy_client().await?;
91
92        // Any temporary dockrerfile created during the build process
93        let mut tmp_dockerfile_name = None;
94        let mut image_name = builder.image_name.clone();
95        let dockerfile = &builder.dockerfile;
96        let context_path = &builder.context_path;
97        let user = builder.user.as_deref();
98        let container_uuid = builder.container_uuid;
99
100        // Only build if a dockerfile is provided
101        if let Some(dockerfile) = dockerfile {
102            // Prepare dockerfile
103            let dockerfile_manager = DockerfileManager::new(context_path);
104            let tmp_dockerfile = dockerfile_manager.prepare_dockerfile(dockerfile).await?;
105
106            // Build context
107            tracing::warn!(
108                "Creating archive for context from {}",
109                context_path.display()
110            );
111            let context = ContextBuilder::from_path(context_path, tmp_dockerfile.path())?
112                .build_tar()
113                .await?;
114
115            tracing::debug!("Context build with size: {} bytes", context.len());
116
117            let tmp_dockerfile_name_inner = tmp_dockerfile
118                .path()
119                .file_name()
120                .ok_or_else(|| {
121                    ContextError::CustomDockerfile("Could not read custom dockerfile".to_string())
122                })
123                .map(|s| s.to_string_lossy().to_string())?;
124
125            drop(tmp_dockerfile); // Make sure the temporary file is removed right away
126
127            // Build image
128            let tag = container_uuid
129                .to_string()
130                .split_once('-')
131                .map(|(tag, _)| tag)
132                .unwrap_or("latest")
133                .to_string();
134
135            let image_builder = ImageBuilder::new(docker.clone());
136            let image_name_with_tag = image_builder
137                .build_image(
138                    context,
139                    tmp_dockerfile_name_inner.as_ref(),
140                    &image_name,
141                    &tag,
142                )
143                .await?;
144
145            image_name = image_name_with_tag;
146            tmp_dockerfile_name = Some(tmp_dockerfile_name_inner);
147        }
148
149        // Configure container
150        let container_config = ContainerConfigurator::new(docker.socket_path.clone())
151            .create_container_config(&image_name, user, &docker)
152            .await;
153
154        // Start container
155        tracing::info!("Starting container with image: {image_name} and uuid: {container_uuid}");
156        let container_starter = ContainerStarter::new(docker.clone());
157        let (container_id, container_ip, container_port) = container_starter
158            .start_container(&image_name, &container_uuid, container_config)
159            .await?;
160
161        // Remove the temporary dockerfile from the container
162
163        let executor = RunningDockerExecutor {
164            container_id,
165            docker,
166            container_port,
167            container_ip,
168            env_clear: builder.env_clear,
169            remove_env: builder.remove_env.clone(),
170            env: builder.env.clone(),
171            dropped: false,
172            retain_on_drop: builder.retain_on_drop,
173            cancel_token: Arc::new(CancellationToken::new()),
174            default_timeout: builder.default_timeout,
175            workdir: builder.workdir.clone(),
176        };
177
178        if let Some(tmp_dockerfile_name) = tmp_dockerfile_name {
179            let mut removal_targets = vec![tmp_dockerfile_name.clone()];
180
181            if executor.workdir.is_absolute() {
182                removal_targets.push(
183                    executor
184                        .workdir
185                        .join(&tmp_dockerfile_name)
186                        .display()
187                        .to_string(),
188                );
189            }
190
191            let default_workdir = Path::new("/app");
192            if executor.workdir != default_workdir {
193                removal_targets.push(
194                    default_workdir
195                        .join(&tmp_dockerfile_name)
196                        .display()
197                        .to_string(),
198                );
199            }
200
201            removal_targets.sort();
202            removal_targets.dedup();
203
204            let removal_args = removal_targets
205                .iter()
206                .map(|target| format!("{target:?}"))
207                .collect::<Vec<_>>()
208                .join(" ");
209
210            let removal_cmd = format!("rm -f -- {removal_args}");
211
212            executor
213                .exec_shell(&removal_cmd, Path::new("/"), executor.default_timeout)
214                .await
215                .context("failed to remove temporary dockerfile")
216                .map_err(DockerExecutorError::Start)?;
217        }
218
219        Ok(executor)
220    }
221
222    /// Returns the underlying bollard status of the container
223    ///
224    /// Useful for checking if the executor is running or not
225    pub async fn container_state(&self) -> Result<ContainerState, DockerExecutorError> {
226        let container = self
227            .docker
228            .inspect_container(&self.container_id, None::<InspectContainerOptions>)
229            .await?;
230
231        container.state.ok_or_else(|| {
232            DockerExecutorError::ContainerStateMissing(self.container_id.to_string())
233        })
234    }
235
236    /// Check if the executor and its underlying container is running
237    ///
238    /// Will ignore any errors and assume it is not if there are
239    pub async fn is_running(&self) -> bool {
240        self.container_state()
241            .await
242            .map(|state| state.status == Some(ContainerStateStatusEnum::RUNNING))
243            .unwrap_or(false)
244    }
245
246    /// Returns the logs of the container
247    pub async fn logs(&self) -> Result<Vec<String>, DockerExecutorError> {
248        let mut logs = Vec::new();
249        let mut stream = self.docker.logs(
250            &self.container_id,
251            Some(bollard::query_parameters::LogsOptions {
252                follow: false,
253                stdout: true,
254                stderr: true,
255                tail: "all".to_string(),
256                ..Default::default()
257            }),
258        );
259
260        while let Some(log_result) = stream.next().await {
261            match log_result {
262                Ok(log_output) => match log_output {
263                    LogOutput::Console { message }
264                    | LogOutput::StdOut { message }
265                    | LogOutput::StdErr { message } => {
266                        logs.push(String::from_utf8_lossy(&message).trim().to_string());
267                    }
268                    _ => {}
269                },
270                Err(e) => tracing::error!("Error retrieving logs: {e}"), // Error handling
271            }
272        }
273
274        Ok(logs)
275    }
276
    /// Streams the logs of the container as raw `bollard::container::LogOutput` items.
    ///
    /// A background task follows the container's stdout and stderr and forwards every item,
    /// errors included, through a bounded channel with a capacity of 100. The task finishes
    /// when the docker log stream ends, when sending on the channel fails, or when the
    /// executor's cancellation token is cancelled.
    pub async fn logs_stream(
        &self,
    ) -> impl Stream<Item = Result<LogOutput, bollard::errors::Error>> {
        // Everything the spawned task needs is cloned, so it does not borrow from `self`
        let docker = self.docker.clone();
        let container_id = self.container_id.clone();
        let cancel = self.cancel_token.clone();

        let (tx, rx) = tokio::sync::mpsc::channel(100);

        tokio::spawn(async move {
            tokio::select!(
                // Cancellation wins the race and abandons the forwarding future below
                _ = cancel.cancelled() => {
                    tracing::debug!("Logs stream cancelled");
                },
                _ = async move {
                    let mut stream = docker.logs(
                        &container_id,
                        Some(bollard::query_parameters::LogsOptions {
                            follow: true,
                            stdout: true,
                            stderr: true,
                            tail: "all".to_string(),
                            ..Default::default()
                        }),
                    );
                    tracing::debug!("Starting logs stream for container");
                    while let Some(log_result) = stream.next().await {
                        // A failed send ends the forwarding loop
                        if let Err(err) = tx.send(log_result)
                        .await {
                            return tracing::error!("Failed to send log item: {}", err);
                        }
                    }
                } => {
                    tracing::debug!("Logs stream ended gracefully");
                },
                // NOTE(review): both branches above use `_` patterns, so this fallback
                // looks unreachable — confirm against the `tokio::select!` documentation
                else => {
                    tracing::error!("Logs stream ended unexpectedly");
                }
            );

            tracing::debug!("Closing logs stream channel");
        });

        ReceiverStream::new(rx)
    }
323
324    fn resolve_workdir(&self, cmd: &Command) -> PathBuf {
325        match cmd.current_dir_path() {
326            Some(path) if path.is_absolute() => path.to_path_buf(),
327            Some(path) => self.workdir.join(path),
328            None => self.workdir.clone(),
329        }
330    }
331
332    fn resolve_timeout(&self, cmd: &Command) -> Option<Duration> {
333        cmd.timeout_duration().copied().or(self.default_timeout)
334    }
335
336    async fn exec_shell(
337        &self,
338        cmd: &str,
339        workdir: &Path,
340        timeout: Option<Duration>,
341    ) -> Result<CommandOutput, CommandError> {
342        let mut client = ShellExecutorClient::connect(format!(
343            "http://{}:{}",
344            self.container_ip, self.container_port
345        ))
346        .await
347        .map_err(anyhow::Error::from)?;
348
349        let timeout_ms = timeout.map(duration_to_millis);
350        tracing::debug!(?timeout_ms, "sending shell request with timeout");
351
352        let request = tonic::Request::new(codegen::ShellRequest {
353            command: cmd.to_string(),
354            env_clear: self.env_clear,
355            env_remove: self.remove_env.clone(),
356            envs: self.env.clone(),
357            timeout_ms,
358            cwd: Some(workdir.display().to_string()),
359        });
360
361        let response = match client.exec_shell(request).await {
362            Ok(resp) => resp.into_inner(),
363            Err(status) => {
364                if status.code() == tonic::Code::DeadlineExceeded {
365                    if let Some(limit) = timeout {
366                        let message = status.message().to_string();
367                        let output = if message.is_empty() {
368                            CommandOutput::empty()
369                        } else {
370                            CommandOutput::new(message)
371                        };
372
373                        return Err(CommandError::TimedOut {
374                            timeout: limit,
375                            output,
376                        });
377                    }
378
379                    return Err(CommandError::ExecutorError(status.into()));
380                }
381
382                return Err(CommandError::ExecutorError(status.into()));
383            }
384        };
385
386        let codegen::ShellResponse {
387            stdout,
388            stderr,
389            exit_code,
390        } = response;
391
392        let stdout = stdout.trim().to_string();
393        let stderr = stderr.trim().to_string();
394        let merged = merge_stream_output(&stdout, &stderr);
395
396        if exit_code == 0 {
397            Ok(CommandOutput::new(merged))
398        } else {
399            Err(CommandError::NonZeroExit(CommandOutput::new(merged)))
400        }
401    }
402
403    #[tracing::instrument(skip(self))]
404    async fn exec_read_file(
405        &self,
406        workdir: &Path,
407        path: &Path,
408        timeout: Option<Duration>,
409    ) -> Result<CommandOutput, CommandError> {
410        let cmd = format!("cat {}", path.display());
411        self.exec_shell(&cmd, workdir, timeout).await
412    }
413
414    #[tracing::instrument(skip(self, content))]
415    async fn exec_write_file(
416        &self,
417        workdir: &Path,
418        path: &Path,
419        content: &str,
420        timeout: Option<Duration>,
421    ) -> Result<CommandOutput, CommandError> {
422        let cmd = indoc::formatdoc! {
423            r#"
424            cat << 'EOFKWAAK' > {path}
425            {content}
426            EOFKWAAK"#,
427            path = path.display(),
428            content = content.trim_end()
429
430        };
431
432        let write_file_result = self.exec_shell(&cmd, workdir, timeout).await;
433
434        // If the directory or file does not exist, create it
435        if let Err(CommandError::NonZeroExit(write_file)) = &write_file_result
436            && [
437                "no such file or directory",
438                "directory nonexistent",
439                "nonexistent directory",
440            ]
441            .iter()
442            .any(|&s| write_file.output.to_lowercase().contains(s))
443        {
444            let path = path.parent().context("No parent directory")?;
445            let mkdircmd = format!("mkdir -p {}", path.display());
446            let _ = self.exec_shell(&mkdircmd, workdir, timeout).await?;
447
448            return self.exec_shell(&cmd, workdir, timeout).await;
449        }
450
451        write_file_result
452    }
453
454    /// Stops and removes the container associated with this executor.
455    pub async fn shutdown(&self) -> Result<(), DockerExecutorError> {
456        // Stop any jobs that might block the docker socket
457        self.cancel_token.cancel();
458
459        tracing::warn!(
460            "Dropped; stopping and removing container {container_id}",
461            container_id = self.container_id
462        );
463
464        let docker = self.docker.clone();
465        let container_id = self.container_id.clone();
466
467        tracing::debug!(
468            "Stopping container {container_id}",
469            container_id = container_id
470        );
471        docker
472            .kill_container(
473                &container_id,
474                Some(KillContainerOptions {
475                    signal: "SIGTERM".to_string(),
476                }),
477            )
478            .await?;
479
480        tracing::debug!(
481            "Removing container {container_id}",
482            container_id = container_id
483        );
484
485        docker
486            .remove_container(
487                &container_id,
488                Some(RemoveContainerOptions {
489                    force: true,
490                    v: true,
491                    ..Default::default()
492                }),
493            )
494            .await?;
495
496        Ok(())
497    }
498}
499
/// Combines stdout and stderr into one string.
///
/// When both are non-empty they are joined with a newline, stdout first; otherwise the
/// non-empty one (or an empty string) is returned.
fn merge_stream_output(stdout: &str, stderr: &str) -> String {
    if stderr.is_empty() {
        return stdout.to_string();
    }
    if stdout.is_empty() {
        return stderr.to_string();
    }

    [stdout, stderr].join("\n")
}
508
/// Converts a duration to whole milliseconds, saturating at `u64::MAX`.
fn duration_to_millis(duration: Duration) -> u64 {
    // `as_millis` yields a `u128`; anything that does not fit is clamped
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
517
518impl Drop for RunningDockerExecutor {
519    fn drop(&mut self) {
520        if self.dropped {
521            tracing::debug!(
522                "Executor already dropped; skipping stop and remove for container {}",
523                self.container_id
524            );
525            return;
526        }
527        if self.retain_on_drop {
528            tracing::debug!(
529                "Retaining container {} on drop; not stopping or removing",
530                self.container_id
531            );
532            return;
533        }
534        self.dropped = true;
535        self.cancel_token.cancel();
536
537        let this = self.clone();
538        let container_id = self.container_id.clone();
539
540        tokio::task::spawn_blocking(move || {
541            let handle = tokio::runtime::Handle::current();
542            handle.block_on(async move { this.shutdown().await })
543        });
544        tracing::debug!("Container stopped {container_id}");
545    }
546}