swiftide_docker_executor/
running_docker_executor.rs1use anyhow::Context as _;
2use async_trait::async_trait;
3use bollard::{
4 query_parameters::{InspectContainerOptions, KillContainerOptions, RemoveContainerOptions},
5 secret::{ContainerState, ContainerStateStatusEnum},
6};
7use codegen::shell_executor_client::ShellExecutorClient;
8use futures_util::Stream;
9use std::{collections::HashMap, path::Path, sync::Arc};
10pub use swiftide_core::ToolExecutor;
11use swiftide_core::{Command, CommandError, CommandOutput, Loader as _, prelude::StreamExt as _};
12use tokio_stream::wrappers::ReceiverStream;
13use tokio_util::sync::CancellationToken;
14
15use crate::{
16 ContextBuilder, ContextError, DockerExecutor, DockerExecutorError, client::Client,
17 container_configurator::ContainerConfigurator, container_starter::ContainerStarter,
18 dockerfile_manager::DockerfileManager, image_builder::ImageBuilder,
19};
20
21pub mod codegen {
22 tonic::include_proto!("shell");
23}
24pub use bollard::container::LogOutput;
25
26#[derive(Clone, Debug)]
27pub struct RunningDockerExecutor {
28 pub container_id: String,
29 pub(crate) docker: Arc<Client>,
30 pub host_port: String,
31 dropped: bool,
32 retain_on_drop: bool,
33
34 pub(crate) env_clear: bool,
36 pub(crate) remove_env: Vec<String>,
37 pub(crate) env: HashMap<String, String>,
38
39 cancel_token: Arc<CancellationToken>,
41}
42
43impl From<RunningDockerExecutor> for Arc<dyn ToolExecutor> {
44 fn from(val: RunningDockerExecutor) -> Self {
45 Arc::new(val) as Arc<dyn ToolExecutor>
46 }
47}
48
49#[async_trait]
50impl ToolExecutor for RunningDockerExecutor {
51 #[tracing::instrument(skip(self), err)]
52 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
53 match cmd {
54 Command::Shell(cmd) => self.exec_shell(cmd).await,
55 Command::ReadFile(path) => self.read_file(path).await,
56 Command::WriteFile(path, content) => self.write_file(path, content).await,
57 _ => unimplemented!(),
58 }
59 }
60
61 async fn stream_files(
62 &self,
63 path: &Path,
64 extensions: Option<Vec<String>>,
65 ) -> anyhow::Result<swiftide_core::indexing::IndexingStream> {
66 let extensions = extensions.unwrap_or_default();
67 Ok(self.as_file_loader(path, extensions).into_stream())
68 }
69}
70
71impl RunningDockerExecutor {
72 pub async fn start(
74 builder: &DockerExecutor,
75 ) -> Result<RunningDockerExecutor, DockerExecutorError> {
76 let docker = Client::lazy_client().await?;
77
78 let mut tmp_dockerfile_name = None;
80 let mut image_name = builder.image_name.clone();
81 let dockerfile = &builder.dockerfile;
82 let context_path = &builder.context_path;
83 let user = builder.user.as_deref();
84 let container_uuid = builder.container_uuid;
85
86 if let Some(dockerfile) = dockerfile {
88 let dockerfile_manager = DockerfileManager::new(context_path);
90 let tmp_dockerfile = dockerfile_manager.prepare_dockerfile(dockerfile).await?;
91
92 tracing::warn!(
94 "Creating archive for context from {}",
95 context_path.display()
96 );
97 let context = ContextBuilder::from_path(context_path, tmp_dockerfile.path())?
98 .build_tar()
99 .await?;
100
101 tracing::debug!("Context build with size: {} bytes", context.len());
102
103 let tmp_dockerfile_name_inner = tmp_dockerfile
104 .path()
105 .file_name()
106 .ok_or_else(|| {
107 ContextError::CustomDockerfile("Could not read custom dockerfile".to_string())
108 })
109 .map(|s| s.to_string_lossy().to_string())?;
110
111 drop(tmp_dockerfile); let tag = container_uuid
115 .to_string()
116 .split_once('-')
117 .map(|(tag, _)| tag)
118 .unwrap_or("latest")
119 .to_string();
120
121 let image_builder = ImageBuilder::new(docker.clone());
122 let image_name_with_tag = image_builder
123 .build_image(
124 context,
125 tmp_dockerfile_name_inner.as_ref(),
126 &image_name,
127 &tag,
128 )
129 .await?;
130
131 image_name = image_name_with_tag;
132 tmp_dockerfile_name = Some(tmp_dockerfile_name_inner);
133 }
134
135 let container_config = ContainerConfigurator::new(docker.socket_path.clone())
137 .create_container_config(&image_name, user);
138
139 tracing::info!("Starting container with image: {image_name} and uuid: {container_uuid}");
141 let container_starter = ContainerStarter::new(docker.clone());
142 let (container_id, host_port) = container_starter
143 .start_container(&image_name, &container_uuid, container_config)
144 .await?;
145
146 let executor = RunningDockerExecutor {
149 container_id,
150 docker,
151 host_port,
152 env_clear: builder.env_clear,
153 remove_env: builder.remove_env.clone(),
154 env: builder.env.clone(),
155 dropped: false,
156 retain_on_drop: builder.retain_on_drop,
157 cancel_token: Arc::new(CancellationToken::new()),
158 };
159
160 if let Some(tmp_dockerfile_name) = tmp_dockerfile_name {
161 executor
162 .exec_shell(&format!("rm {}", tmp_dockerfile_name.as_str()))
163 .await
164 .context("failed to remove temporary dockerfile")
165 .map_err(DockerExecutorError::Start)?;
166 }
167
168 Ok(executor)
169 }
170
171 pub async fn container_state(&self) -> Result<ContainerState, DockerExecutorError> {
175 let container = self
176 .docker
177 .inspect_container(&self.container_id, None::<InspectContainerOptions>)
178 .await?;
179
180 container.state.ok_or_else(|| {
181 DockerExecutorError::ContainerStateMissing(self.container_id.to_string())
182 })
183 }
184
185 pub async fn is_running(&self) -> bool {
189 self.container_state()
190 .await
191 .map(|state| state.status == Some(ContainerStateStatusEnum::RUNNING))
192 .unwrap_or(false)
193 }
194
195 pub async fn logs(&self) -> Result<Vec<String>, DockerExecutorError> {
197 let mut logs = Vec::new();
198 let mut stream = self.docker.logs(
199 &self.container_id,
200 Some(bollard::query_parameters::LogsOptions {
201 follow: false,
202 stdout: true,
203 stderr: true,
204 tail: "all".to_string(),
205 ..Default::default()
206 }),
207 );
208
209 while let Some(log_result) = stream.next().await {
210 match log_result {
211 Ok(log_output) => match log_output {
212 LogOutput::Console { message }
213 | LogOutput::StdOut { message }
214 | LogOutput::StdErr { message } => {
215 logs.push(String::from_utf8_lossy(&message).trim().to_string());
216 }
217 _ => {}
218 },
219 Err(e) => tracing::error!("Error retrieving logs: {e}"), }
221 }
222
223 Ok(logs)
224 }
225
226 pub async fn logs_stream(
228 &self,
229 ) -> impl Stream<Item = Result<LogOutput, bollard::errors::Error>> {
230 let docker = self.docker.clone();
231 let container_id = self.container_id.clone();
232 let cancel = self.cancel_token.clone();
233
234 let (tx, rx) = tokio::sync::mpsc::channel(100);
235
236 tokio::spawn(async move {
237 tokio::select!(
238 _ = cancel.cancelled() => {
239 tracing::debug!("Logs stream cancelled");
240 },
241 _ = async move {
242 let mut stream = docker.logs(
243 &container_id,
244 Some(bollard::query_parameters::LogsOptions {
245 follow: true,
246 stdout: true,
247 stderr: true,
248 tail: "all".to_string(),
249 ..Default::default()
250 }),
251 );
252 tracing::debug!("Starting logs stream for container");
253 while let Some(log_result) = stream.next().await {
254 if let Err(err) = tx.send(log_result)
255 .await {
256 return tracing::error!("Failed to send log item: {}", err);
257 }
258 }
259 } => {
260 tracing::debug!("Logs stream ended gracefully");
261 },
262 else => {
263 tracing::error!("Logs stream ended unexpectedly");
264 }
265 );
266
267 tracing::debug!("Closing logs stream channel");
268 });
269
270 ReceiverStream::new(rx)
271 }
272
273 async fn exec_shell(&self, cmd: &str) -> Result<CommandOutput, CommandError> {
274 let mut client =
275 ShellExecutorClient::connect(format!("http://127.0.0.1:{}", self.host_port))
276 .await
277 .map_err(anyhow::Error::from)?;
278
279 let request = tonic::Request::new(codegen::ShellRequest {
280 command: cmd.to_string(),
281 env_clear: self.env_clear,
282 env_remove: self.remove_env.clone(),
283 envs: self.env.clone(),
284 });
285
286 let response = client
287 .exec_shell(request)
288 .await
289 .map_err(anyhow::Error::from)?;
290
291 let codegen::ShellResponse {
292 stdout,
293 stderr,
294 exit_code,
295 } = response.into_inner();
296
297 let output = stdout.trim().to_string() + stderr.trim();
299 if exit_code == 0 {
301 Ok(output.into())
302 } else {
303 Err(CommandError::NonZeroExit(output.into()))
304 }
305 }
306
307 #[tracing::instrument(skip(self))]
308 async fn read_file(&self, path: &Path) -> Result<CommandOutput, CommandError> {
309 self.exec_shell(&format!("cat {}", path.display())).await
310 }
311
312 #[tracing::instrument(skip(self, content))]
313 async fn write_file(&self, path: &Path, content: &str) -> Result<CommandOutput, CommandError> {
314 let cmd = indoc::formatdoc! {r#"
315 cat << 'EOFKWAAK' > {path}
316 {content}
317 EOFKWAAK"#,
318 path = path.display(),
319 content = content.trim_end()
320
321 };
322
323 let write_file_result = self.exec_shell(&cmd).await;
324
325 if let Err(CommandError::NonZeroExit(write_file)) = &write_file_result
327 && [
328 "no such file or directory",
329 "directory nonexistent",
330 "nonexistent directory",
331 ]
332 .iter()
333 .any(|&s| write_file.output.to_lowercase().contains(s))
334 {
335 let path = path.parent().context("No parent directory")?;
336 let mkdircmd = format!("mkdir -p {}", path.display());
337 let _ = self.exec_shell(&mkdircmd).await?;
338
339 return self.exec_shell(&cmd).await;
340 }
341
342 write_file_result
343 }
344
345 pub async fn shutdown(&self) -> Result<(), DockerExecutorError> {
347 self.cancel_token.cancel();
349
350 tracing::warn!(
351 "Dropped; stopping and removing container {container_id}",
352 container_id = self.container_id
353 );
354
355 let docker = self.docker.clone();
356 let container_id = self.container_id.clone();
357
358 tracing::debug!(
359 "Stopping container {container_id}",
360 container_id = container_id
361 );
362 docker
363 .kill_container(
364 &container_id,
365 Some(KillContainerOptions {
366 signal: "SIGTERM".to_string(),
367 }),
368 )
369 .await?;
370
371 tracing::debug!(
372 "Removing container {container_id}",
373 container_id = container_id
374 );
375
376 docker
377 .remove_container(
378 &container_id,
379 Some(RemoveContainerOptions {
380 force: true,
381 v: true,
382 ..Default::default()
383 }),
384 )
385 .await?;
386
387 Ok(())
388 }
389}
390
391impl Drop for RunningDockerExecutor {
392 fn drop(&mut self) {
393 if self.dropped {
394 tracing::debug!(
395 "Executor already dropped; skipping stop and remove for container {}",
396 self.container_id
397 );
398 return;
399 }
400 if self.retain_on_drop {
401 tracing::debug!(
402 "Retaining container {} on drop; not stopping or removing",
403 self.container_id
404 );
405 return;
406 }
407 self.dropped = true;
408 self.cancel_token.cancel();
409
410 let this = self.clone();
411 let container_id = self.container_id.clone();
412
413 tokio::task::spawn_blocking(move || {
414 let handle = tokio::runtime::Handle::current();
415 handle.block_on(async move { this.shutdown().await })
416 });
417 tracing::debug!("Container stopped {container_id}");
418 }
419}