swiftide_docker_executor/
running_docker_executor.rs

1use anyhow::Context as _;
2use async_trait::async_trait;
3use bollard::{
4    query_parameters::{InspectContainerOptions, KillContainerOptions, RemoveContainerOptions},
5    secret::{ContainerState, ContainerStateStatusEnum},
6};
7use codegen::shell_executor_client::ShellExecutorClient;
8use futures_util::Stream;
9use std::{collections::HashMap, net::IpAddr, path::Path, sync::Arc};
10pub use swiftide_core::ToolExecutor;
11use swiftide_core::{Command, CommandError, CommandOutput, Loader as _, prelude::StreamExt as _};
12use tokio_stream::wrappers::ReceiverStream;
13use tokio_util::sync::CancellationToken;
14
15use crate::{
16    ContextBuilder, ContextError, DockerExecutor, DockerExecutorError, client::Client,
17    container_configurator::ContainerConfigurator, container_starter::ContainerStarter,
18    dockerfile_manager::DockerfileManager, image_builder::ImageBuilder,
19};
20
21pub mod codegen {
22    tonic::include_proto!("shell");
23}
24pub use bollard::container::LogOutput;
25
26#[derive(Clone, Debug)]
27pub struct RunningDockerExecutor {
28    pub container_id: String,
29    pub(crate) docker: Arc<Client>,
30    pub container_port: String,
31    pub container_ip: IpAddr,
32    dropped: bool,
33    retain_on_drop: bool,
34
35    // Default environment configuration for the executor
36    pub(crate) env_clear: bool,
37    pub(crate) remove_env: Vec<String>,
38    pub(crate) env: HashMap<String, String>,
39
40    /// Cancellation token to stop anything polling the docker api
41    cancel_token: Arc<CancellationToken>,
42}
43
44impl From<RunningDockerExecutor> for Arc<dyn ToolExecutor> {
45    fn from(val: RunningDockerExecutor) -> Self {
46        Arc::new(val) as Arc<dyn ToolExecutor>
47    }
48}
49
50#[async_trait]
51impl ToolExecutor for RunningDockerExecutor {
52    #[tracing::instrument(skip(self), err)]
53    async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
54        match cmd {
55            Command::Shell(cmd) => self.exec_shell(cmd).await,
56            Command::ReadFile(path) => self.read_file(path).await,
57            Command::WriteFile(path, content) => self.write_file(path, content).await,
58            _ => unimplemented!(),
59        }
60    }
61
62    async fn stream_files(
63        &self,
64        path: &Path,
65        extensions: Option<Vec<String>>,
66    ) -> anyhow::Result<swiftide_core::indexing::IndexingStream<String>> {
67        let extensions = extensions.unwrap_or_default();
68        Ok(self.as_file_loader(path, extensions).into_stream())
69    }
70}
71
72impl RunningDockerExecutor {
73    /// Starts a docker container with a given context and image name
74    pub async fn start(
75        builder: &DockerExecutor,
76    ) -> Result<RunningDockerExecutor, DockerExecutorError> {
77        let docker = Client::lazy_client().await?;
78
79        // Any temporary dockrerfile created during the build process
80        let mut tmp_dockerfile_name = None;
81        let mut image_name = builder.image_name.clone();
82        let dockerfile = &builder.dockerfile;
83        let context_path = &builder.context_path;
84        let user = builder.user.as_deref();
85        let container_uuid = builder.container_uuid;
86
87        // Only build if a dockerfile is provided
88        if let Some(dockerfile) = dockerfile {
89            // Prepare dockerfile
90            let dockerfile_manager = DockerfileManager::new(context_path);
91            let tmp_dockerfile = dockerfile_manager.prepare_dockerfile(dockerfile).await?;
92
93            // Build context
94            tracing::warn!(
95                "Creating archive for context from {}",
96                context_path.display()
97            );
98            let context = ContextBuilder::from_path(context_path, tmp_dockerfile.path())?
99                .build_tar()
100                .await?;
101
102            tracing::debug!("Context build with size: {} bytes", context.len());
103
104            let tmp_dockerfile_name_inner = tmp_dockerfile
105                .path()
106                .file_name()
107                .ok_or_else(|| {
108                    ContextError::CustomDockerfile("Could not read custom dockerfile".to_string())
109                })
110                .map(|s| s.to_string_lossy().to_string())?;
111
112            drop(tmp_dockerfile); // Make sure the temporary file is removed right away
113
114            // Build image
115            let tag = container_uuid
116                .to_string()
117                .split_once('-')
118                .map(|(tag, _)| tag)
119                .unwrap_or("latest")
120                .to_string();
121
122            let image_builder = ImageBuilder::new(docker.clone());
123            let image_name_with_tag = image_builder
124                .build_image(
125                    context,
126                    tmp_dockerfile_name_inner.as_ref(),
127                    &image_name,
128                    &tag,
129                )
130                .await?;
131
132            image_name = image_name_with_tag;
133            tmp_dockerfile_name = Some(tmp_dockerfile_name_inner);
134        }
135
136        // Configure container
137        let container_config = ContainerConfigurator::new(docker.socket_path.clone())
138            .create_container_config(&image_name, user, &docker)
139            .await;
140
141        // Start container
142        tracing::info!("Starting container with image: {image_name} and uuid: {container_uuid}");
143        let container_starter = ContainerStarter::new(docker.clone());
144        let (container_id, container_ip, container_port) = container_starter
145            .start_container(&image_name, &container_uuid, container_config)
146            .await?;
147
148        // Remove the temporary dockerfile from the container
149
150        let executor = RunningDockerExecutor {
151            container_id,
152            docker,
153            container_port,
154            container_ip,
155            env_clear: builder.env_clear,
156            remove_env: builder.remove_env.clone(),
157            env: builder.env.clone(),
158            dropped: false,
159            retain_on_drop: builder.retain_on_drop,
160            cancel_token: Arc::new(CancellationToken::new()),
161        };
162
163        if let Some(tmp_dockerfile_name) = tmp_dockerfile_name {
164            executor
165                .exec_shell(&format!("rm {}", tmp_dockerfile_name.as_str()))
166                .await
167                .context("failed to remove temporary dockerfile")
168                .map_err(DockerExecutorError::Start)?;
169        }
170
171        Ok(executor)
172    }
173
174    /// Returns the underlying bollard status of the container
175    ///
176    /// Useful for checking if the executor is running or not
177    pub async fn container_state(&self) -> Result<ContainerState, DockerExecutorError> {
178        let container = self
179            .docker
180            .inspect_container(&self.container_id, None::<InspectContainerOptions>)
181            .await?;
182
183        container.state.ok_or_else(|| {
184            DockerExecutorError::ContainerStateMissing(self.container_id.to_string())
185        })
186    }
187
188    /// Check if the executor and its underlying container is running
189    ///
190    /// Will ignore any errors and assume it is not if there are
191    pub async fn is_running(&self) -> bool {
192        self.container_state()
193            .await
194            .map(|state| state.status == Some(ContainerStateStatusEnum::RUNNING))
195            .unwrap_or(false)
196    }
197
198    /// Returns the logs of the container
199    pub async fn logs(&self) -> Result<Vec<String>, DockerExecutorError> {
200        let mut logs = Vec::new();
201        let mut stream = self.docker.logs(
202            &self.container_id,
203            Some(bollard::query_parameters::LogsOptions {
204                follow: false,
205                stdout: true,
206                stderr: true,
207                tail: "all".to_string(),
208                ..Default::default()
209            }),
210        );
211
212        while let Some(log_result) = stream.next().await {
213            match log_result {
214                Ok(log_output) => match log_output {
215                    LogOutput::Console { message }
216                    | LogOutput::StdOut { message }
217                    | LogOutput::StdErr { message } => {
218                        logs.push(String::from_utf8_lossy(&message).trim().to_string());
219                    }
220                    _ => {}
221                },
222                Err(e) => tracing::error!("Error retrieving logs: {e}"), // Error handling
223            }
224        }
225
226        Ok(logs)
227    }
228
229    /// Streams the logs of the container as raw `bollard::container::LogOutput` items.
230    pub async fn logs_stream(
231        &self,
232    ) -> impl Stream<Item = Result<LogOutput, bollard::errors::Error>> {
233        let docker = self.docker.clone();
234        let container_id = self.container_id.clone();
235        let cancel = self.cancel_token.clone();
236
237        let (tx, rx) = tokio::sync::mpsc::channel(100);
238
239        tokio::spawn(async move {
240            tokio::select!(
241                _ = cancel.cancelled() => {
242                    tracing::debug!("Logs stream cancelled");
243                },
244                _ = async move {
245                    let mut stream = docker.logs(
246                        &container_id,
247                        Some(bollard::query_parameters::LogsOptions {
248                            follow: true,
249                            stdout: true,
250                            stderr: true,
251                            tail: "all".to_string(),
252                            ..Default::default()
253                        }),
254                    );
255                    tracing::debug!("Starting logs stream for container");
256                    while let Some(log_result) = stream.next().await {
257                        if let Err(err) = tx.send(log_result)
258                        .await {
259                            return tracing::error!("Failed to send log item: {}", err);
260                        }
261                    }
262                } => {
263                    tracing::debug!("Logs stream ended gracefully");
264                },
265                else => {
266                    tracing::error!("Logs stream ended unexpectedly");
267                }
268            );
269
270            tracing::debug!("Closing logs stream channel");
271        });
272
273        ReceiverStream::new(rx)
274    }
275
276    async fn exec_shell(&self, cmd: &str) -> Result<CommandOutput, CommandError> {
277        let mut client = ShellExecutorClient::connect(format!(
278            "http://{}:{}",
279            self.container_ip, self.container_port
280        ))
281        .await
282        .map_err(anyhow::Error::from)?;
283
284        let request = tonic::Request::new(codegen::ShellRequest {
285            command: cmd.to_string(),
286            env_clear: self.env_clear,
287            env_remove: self.remove_env.clone(),
288            envs: self.env.clone(),
289        });
290
291        let response = client
292            .exec_shell(request)
293            .await
294            .map_err(anyhow::Error::from)?;
295
296        let codegen::ShellResponse {
297            stdout,
298            stderr,
299            exit_code,
300        } = response.into_inner();
301
302        // // Trim both stdout and stderr to remove surrounding whitespace and newlines
303        let output = stdout.trim().to_string() + stderr.trim();
304        //
305        if exit_code == 0 {
306            Ok(output.into())
307        } else {
308            Err(CommandError::NonZeroExit(output.into()))
309        }
310    }
311
312    #[tracing::instrument(skip(self))]
313    async fn read_file(&self, path: &Path) -> Result<CommandOutput, CommandError> {
314        self.exec_shell(&format!("cat {}", path.display())).await
315    }
316
317    #[tracing::instrument(skip(self, content))]
318    async fn write_file(&self, path: &Path, content: &str) -> Result<CommandOutput, CommandError> {
319        let cmd = indoc::formatdoc! {r#"
320            cat << 'EOFKWAAK' > {path}
321            {content}
322            EOFKWAAK"#,
323            path = path.display(),
324            content = content.trim_end()
325
326        };
327
328        let write_file_result = self.exec_shell(&cmd).await;
329
330        // If the directory or file does not exist, create it
331        if let Err(CommandError::NonZeroExit(write_file)) = &write_file_result
332            && [
333                "no such file or directory",
334                "directory nonexistent",
335                "nonexistent directory",
336            ]
337            .iter()
338            .any(|&s| write_file.output.to_lowercase().contains(s))
339        {
340            let path = path.parent().context("No parent directory")?;
341            let mkdircmd = format!("mkdir -p {}", path.display());
342            let _ = self.exec_shell(&mkdircmd).await?;
343
344            return self.exec_shell(&cmd).await;
345        }
346
347        write_file_result
348    }
349
350    /// Stops and removes the container associated with this executor.
351    pub async fn shutdown(&self) -> Result<(), DockerExecutorError> {
352        // Stop any jobs that might block the docker socket
353        self.cancel_token.cancel();
354
355        tracing::warn!(
356            "Dropped; stopping and removing container {container_id}",
357            container_id = self.container_id
358        );
359
360        let docker = self.docker.clone();
361        let container_id = self.container_id.clone();
362
363        tracing::debug!(
364            "Stopping container {container_id}",
365            container_id = container_id
366        );
367        docker
368            .kill_container(
369                &container_id,
370                Some(KillContainerOptions {
371                    signal: "SIGTERM".to_string(),
372                }),
373            )
374            .await?;
375
376        tracing::debug!(
377            "Removing container {container_id}",
378            container_id = container_id
379        );
380
381        docker
382            .remove_container(
383                &container_id,
384                Some(RemoveContainerOptions {
385                    force: true,
386                    v: true,
387                    ..Default::default()
388                }),
389            )
390            .await?;
391
392        Ok(())
393    }
394}
395
396impl Drop for RunningDockerExecutor {
397    fn drop(&mut self) {
398        if self.dropped {
399            tracing::debug!(
400                "Executor already dropped; skipping stop and remove for container {}",
401                self.container_id
402            );
403            return;
404        }
405        if self.retain_on_drop {
406            tracing::debug!(
407                "Retaining container {} on drop; not stopping or removing",
408                self.container_id
409            );
410            return;
411        }
412        self.dropped = true;
413        self.cancel_token.cancel();
414
415        let this = self.clone();
416        let container_id = self.container_id.clone();
417
418        tokio::task::spawn_blocking(move || {
419            let handle = tokio::runtime::Handle::current();
420            handle.block_on(async move { this.shutdown().await })
421        });
422        tracing::debug!("Container stopped {container_id}");
423    }
424}