swiftide_docker_executor/
running_docker_executor.rs

1use anyhow::Context as _;
2use async_trait::async_trait;
3use bollard::{
4    query_parameters::{InspectContainerOptions, KillContainerOptions, RemoveContainerOptions},
5    secret::{ContainerState, ContainerStateStatusEnum},
6};
7use codegen::shell_executor_client::ShellExecutorClient;
8use futures_util::Stream;
9use std::{collections::HashMap, path::Path, sync::Arc};
10pub use swiftide_core::ToolExecutor;
11use swiftide_core::{Command, CommandError, CommandOutput, Loader as _, prelude::StreamExt as _};
12use tokio_stream::wrappers::ReceiverStream;
13use tokio_util::sync::CancellationToken;
14
15use crate::{
16    ContextBuilder, ContextError, DockerExecutor, DockerExecutorError, client::Client,
17    container_configurator::ContainerConfigurator, container_starter::ContainerStarter,
18    dockerfile_manager::DockerfileManager, image_builder::ImageBuilder,
19};
20
21pub mod codegen {
22    tonic::include_proto!("shell");
23}
24pub use bollard::container::LogOutput;
25
26#[derive(Clone, Debug)]
27pub struct RunningDockerExecutor {
28    pub container_id: String,
29    pub(crate) docker: Arc<Client>,
30    pub host_port: String,
31    dropped: bool,
32    retain_on_drop: bool,
33
34    // Default environment configuration for the executor
35    pub(crate) env_clear: bool,
36    pub(crate) remove_env: Vec<String>,
37    pub(crate) env: HashMap<String, String>,
38
39    /// Cancellation token to stop anything polling the docker api
40    cancel_token: Arc<CancellationToken>,
41}
42
43impl From<RunningDockerExecutor> for Arc<dyn ToolExecutor> {
44    fn from(val: RunningDockerExecutor) -> Self {
45        Arc::new(val) as Arc<dyn ToolExecutor>
46    }
47}
48
49#[async_trait]
50impl ToolExecutor for RunningDockerExecutor {
51    #[tracing::instrument(skip(self), err)]
52    async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
53        match cmd {
54            Command::Shell(cmd) => self.exec_shell(cmd).await,
55            Command::ReadFile(path) => self.read_file(path).await,
56            Command::WriteFile(path, content) => self.write_file(path, content).await,
57            _ => unimplemented!(),
58        }
59    }
60
61    async fn stream_files(
62        &self,
63        path: &Path,
64        extensions: Option<Vec<String>>,
65    ) -> anyhow::Result<swiftide_core::indexing::IndexingStream> {
66        let extensions = extensions.unwrap_or_default();
67        Ok(self.as_file_loader(path, extensions).into_stream())
68    }
69}
70
71impl RunningDockerExecutor {
72    /// Starts a docker container with a given context and image name
73    pub async fn start(
74        builder: &DockerExecutor,
75    ) -> Result<RunningDockerExecutor, DockerExecutorError> {
76        let docker = Client::lazy_client().await?;
77
78        // Any temporary dockrerfile created during the build process
79        let mut tmp_dockerfile_name = None;
80        let mut image_name = builder.image_name.clone();
81        let dockerfile = &builder.dockerfile;
82        let context_path = &builder.context_path;
83        let user = builder.user.as_deref();
84        let container_uuid = builder.container_uuid;
85
86        // Only build if a dockerfile is provided
87        if let Some(dockerfile) = dockerfile {
88            // Prepare dockerfile
89            let dockerfile_manager = DockerfileManager::new(context_path);
90            let tmp_dockerfile = dockerfile_manager.prepare_dockerfile(dockerfile).await?;
91
92            // Build context
93            tracing::warn!(
94                "Creating archive for context from {}",
95                context_path.display()
96            );
97            let context = ContextBuilder::from_path(context_path, tmp_dockerfile.path())?
98                .build_tar()
99                .await?;
100
101            tracing::debug!("Context build with size: {} bytes", context.len());
102
103            let tmp_dockerfile_name_inner = tmp_dockerfile
104                .path()
105                .file_name()
106                .ok_or_else(|| {
107                    ContextError::CustomDockerfile("Could not read custom dockerfile".to_string())
108                })
109                .map(|s| s.to_string_lossy().to_string())?;
110
111            drop(tmp_dockerfile); // Make sure the temporary file is removed right away
112
113            // Build image
114            let tag = container_uuid
115                .to_string()
116                .split_once('-')
117                .map(|(tag, _)| tag)
118                .unwrap_or("latest")
119                .to_string();
120
121            let image_builder = ImageBuilder::new(docker.clone());
122            let image_name_with_tag = image_builder
123                .build_image(
124                    context,
125                    tmp_dockerfile_name_inner.as_ref(),
126                    &image_name,
127                    &tag,
128                )
129                .await?;
130
131            image_name = image_name_with_tag;
132            tmp_dockerfile_name = Some(tmp_dockerfile_name_inner);
133        }
134
135        // Configure container
136        let container_config = ContainerConfigurator::new(docker.socket_path.clone())
137            .create_container_config(&image_name, user);
138
139        // Start container
140        tracing::info!("Starting container with image: {image_name} and uuid: {container_uuid}");
141        let container_starter = ContainerStarter::new(docker.clone());
142        let (container_id, host_port) = container_starter
143            .start_container(&image_name, &container_uuid, container_config)
144            .await?;
145
146        // Remove the temporary dockerfile from the container
147
148        let executor = RunningDockerExecutor {
149            container_id,
150            docker,
151            host_port,
152            env_clear: builder.env_clear,
153            remove_env: builder.remove_env.clone(),
154            env: builder.env.clone(),
155            dropped: false,
156            retain_on_drop: builder.retain_on_drop,
157            cancel_token: Arc::new(CancellationToken::new()),
158        };
159
160        if let Some(tmp_dockerfile_name) = tmp_dockerfile_name {
161            executor
162                .exec_shell(&format!("rm {}", tmp_dockerfile_name.as_str()))
163                .await
164                .context("failed to remove temporary dockerfile")
165                .map_err(DockerExecutorError::Start)?;
166        }
167
168        Ok(executor)
169    }
170
171    /// Returns the underlying bollard status of the container
172    ///
173    /// Useful for checking if the executor is running or not
174    pub async fn container_state(&self) -> Result<ContainerState, DockerExecutorError> {
175        let container = self
176            .docker
177            .inspect_container(&self.container_id, None::<InspectContainerOptions>)
178            .await?;
179
180        container.state.ok_or_else(|| {
181            DockerExecutorError::ContainerStateMissing(self.container_id.to_string())
182        })
183    }
184
185    /// Check if the executor and its underlying container is running
186    ///
187    /// Will ignore any errors and assume it is not if there are
188    pub async fn is_running(&self) -> bool {
189        self.container_state()
190            .await
191            .map(|state| state.status == Some(ContainerStateStatusEnum::RUNNING))
192            .unwrap_or(false)
193    }
194
195    /// Returns the logs of the container
196    pub async fn logs(&self) -> Result<Vec<String>, DockerExecutorError> {
197        let mut logs = Vec::new();
198        let mut stream = self.docker.logs(
199            &self.container_id,
200            Some(bollard::query_parameters::LogsOptions {
201                follow: false,
202                stdout: true,
203                stderr: true,
204                tail: "all".to_string(),
205                ..Default::default()
206            }),
207        );
208
209        while let Some(log_result) = stream.next().await {
210            match log_result {
211                Ok(log_output) => match log_output {
212                    LogOutput::Console { message }
213                    | LogOutput::StdOut { message }
214                    | LogOutput::StdErr { message } => {
215                        logs.push(String::from_utf8_lossy(&message).trim().to_string());
216                    }
217                    _ => {}
218                },
219                Err(e) => tracing::error!("Error retrieving logs: {e}"), // Error handling
220            }
221        }
222
223        Ok(logs)
224    }
225
226    /// Streams the logs of the container as raw `bollard::container::LogOutput` items.
227    pub async fn logs_stream(
228        &self,
229    ) -> impl Stream<Item = Result<LogOutput, bollard::errors::Error>> {
230        let docker = self.docker.clone();
231        let container_id = self.container_id.clone();
232        let cancel = self.cancel_token.clone();
233
234        let (tx, rx) = tokio::sync::mpsc::channel(100);
235
236        tokio::spawn(async move {
237            tokio::select!(
238                _ = cancel.cancelled() => {
239                    tracing::debug!("Logs stream cancelled");
240                },
241                _ = async move {
242                    let mut stream = docker.logs(
243                        &container_id,
244                        Some(bollard::query_parameters::LogsOptions {
245                            follow: true,
246                            stdout: true,
247                            stderr: true,
248                            tail: "all".to_string(),
249                            ..Default::default()
250                        }),
251                    );
252                    tracing::debug!("Starting logs stream for container");
253                    while let Some(log_result) = stream.next().await {
254                        if let Err(err) = tx.send(log_result)
255                        .await {
256                            return tracing::error!("Failed to send log item: {}", err);
257                        }
258                    }
259                } => {
260                    tracing::debug!("Logs stream ended gracefully");
261                },
262                else => {
263                    tracing::error!("Logs stream ended unexpectedly");
264                }
265            );
266
267            tracing::debug!("Closing logs stream channel");
268        });
269
270        ReceiverStream::new(rx)
271    }
272
273    async fn exec_shell(&self, cmd: &str) -> Result<CommandOutput, CommandError> {
274        let mut client =
275            ShellExecutorClient::connect(format!("http://127.0.0.1:{}", self.host_port))
276                .await
277                .map_err(anyhow::Error::from)?;
278
279        let request = tonic::Request::new(codegen::ShellRequest {
280            command: cmd.to_string(),
281            env_clear: self.env_clear,
282            env_remove: self.remove_env.clone(),
283            envs: self.env.clone(),
284        });
285
286        let response = client
287            .exec_shell(request)
288            .await
289            .map_err(anyhow::Error::from)?;
290
291        let codegen::ShellResponse {
292            stdout,
293            stderr,
294            exit_code,
295        } = response.into_inner();
296
297        // // Trim both stdout and stderr to remove surrounding whitespace and newlines
298        let output = stdout.trim().to_string() + stderr.trim();
299        //
300        if exit_code == 0 {
301            Ok(output.into())
302        } else {
303            Err(CommandError::NonZeroExit(output.into()))
304        }
305    }
306
307    #[tracing::instrument(skip(self))]
308    async fn read_file(&self, path: &Path) -> Result<CommandOutput, CommandError> {
309        self.exec_shell(&format!("cat {}", path.display())).await
310    }
311
312    #[tracing::instrument(skip(self, content))]
313    async fn write_file(&self, path: &Path, content: &str) -> Result<CommandOutput, CommandError> {
314        let cmd = indoc::formatdoc! {r#"
315            cat << 'EOFKWAAK' > {path}
316            {content}
317            EOFKWAAK"#,
318            path = path.display(),
319            content = content.trim_end()
320
321        };
322
323        let write_file_result = self.exec_shell(&cmd).await;
324
325        // If the directory or file does not exist, create it
326        if let Err(CommandError::NonZeroExit(write_file)) = &write_file_result
327            && [
328                "no such file or directory",
329                "directory nonexistent",
330                "nonexistent directory",
331            ]
332            .iter()
333            .any(|&s| write_file.output.to_lowercase().contains(s))
334        {
335            let path = path.parent().context("No parent directory")?;
336            let mkdircmd = format!("mkdir -p {}", path.display());
337            let _ = self.exec_shell(&mkdircmd).await?;
338
339            return self.exec_shell(&cmd).await;
340        }
341
342        write_file_result
343    }
344
345    /// Stops and removes the container associated with this executor.
346    pub async fn shutdown(&self) -> Result<(), DockerExecutorError> {
347        // Stop any jobs that might block the docker socket
348        self.cancel_token.cancel();
349
350        tracing::warn!(
351            "Dropped; stopping and removing container {container_id}",
352            container_id = self.container_id
353        );
354
355        let docker = self.docker.clone();
356        let container_id = self.container_id.clone();
357
358        tracing::debug!(
359            "Stopping container {container_id}",
360            container_id = container_id
361        );
362        docker
363            .kill_container(
364                &container_id,
365                Some(KillContainerOptions {
366                    signal: "SIGTERM".to_string(),
367                }),
368            )
369            .await?;
370
371        tracing::debug!(
372            "Removing container {container_id}",
373            container_id = container_id
374        );
375
376        docker
377            .remove_container(
378                &container_id,
379                Some(RemoveContainerOptions {
380                    force: true,
381                    v: true,
382                    ..Default::default()
383                }),
384            )
385            .await?;
386
387        Ok(())
388    }
389}
390
391impl Drop for RunningDockerExecutor {
392    fn drop(&mut self) {
393        if self.dropped {
394            tracing::debug!(
395                "Executor already dropped; skipping stop and remove for container {}",
396                self.container_id
397            );
398            return;
399        }
400        if self.retain_on_drop {
401            tracing::debug!(
402                "Retaining container {} on drop; not stopping or removing",
403                self.container_id
404            );
405            return;
406        }
407        self.dropped = true;
408        self.cancel_token.cancel();
409
410        let this = self.clone();
411        let container_id = self.container_id.clone();
412
413        tokio::task::spawn_blocking(move || {
414            let handle = tokio::runtime::Handle::current();
415            handle.block_on(async move { this.shutdown().await })
416        });
417        tracing::debug!("Container stopped {container_id}");
418    }
419}