swiftide_docker_executor/
running_docker_executor.rs1use anyhow::Context as _;
2use async_trait::async_trait;
3use bollard::{
4 query_parameters::{InspectContainerOptions, KillContainerOptions, RemoveContainerOptions},
5 secret::{ContainerState, ContainerStateStatusEnum},
6};
7use codegen::shell_executor_client::ShellExecutorClient;
8use futures_util::Stream;
9use std::{collections::HashMap, net::IpAddr, path::Path, sync::Arc};
10pub use swiftide_core::ToolExecutor;
11use swiftide_core::{Command, CommandError, CommandOutput, Loader as _, prelude::StreamExt as _};
12use tokio_stream::wrappers::ReceiverStream;
13use tokio_util::sync::CancellationToken;
14
15use crate::{
16 ContextBuilder, ContextError, DockerExecutor, DockerExecutorError, client::Client,
17 container_configurator::ContainerConfigurator, container_starter::ContainerStarter,
18 dockerfile_manager::DockerfileManager, image_builder::ImageBuilder,
19};
20
21pub mod codegen {
22 tonic::include_proto!("shell");
23}
24pub use bollard::container::LogOutput;
25
26#[derive(Clone, Debug)]
27pub struct RunningDockerExecutor {
28 pub container_id: String,
29 pub(crate) docker: Arc<Client>,
30 pub container_port: String,
31 pub container_ip: IpAddr,
32 dropped: bool,
33 retain_on_drop: bool,
34
35 pub(crate) env_clear: bool,
37 pub(crate) remove_env: Vec<String>,
38 pub(crate) env: HashMap<String, String>,
39
40 cancel_token: Arc<CancellationToken>,
42}
43
44impl From<RunningDockerExecutor> for Arc<dyn ToolExecutor> {
45 fn from(val: RunningDockerExecutor) -> Self {
46 Arc::new(val) as Arc<dyn ToolExecutor>
47 }
48}
49
50#[async_trait]
51impl ToolExecutor for RunningDockerExecutor {
52 #[tracing::instrument(skip(self), err)]
53 async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
54 match cmd {
55 Command::Shell(cmd) => self.exec_shell(cmd).await,
56 Command::ReadFile(path) => self.read_file(path).await,
57 Command::WriteFile(path, content) => self.write_file(path, content).await,
58 _ => unimplemented!(),
59 }
60 }
61
62 async fn stream_files(
63 &self,
64 path: &Path,
65 extensions: Option<Vec<String>>,
66 ) -> anyhow::Result<swiftide_core::indexing::IndexingStream<String>> {
67 let extensions = extensions.unwrap_or_default();
68 Ok(self.as_file_loader(path, extensions).into_stream())
69 }
70}
71
72impl RunningDockerExecutor {
73 pub async fn start(
75 builder: &DockerExecutor,
76 ) -> Result<RunningDockerExecutor, DockerExecutorError> {
77 let docker = Client::lazy_client().await?;
78
79 let mut tmp_dockerfile_name = None;
81 let mut image_name = builder.image_name.clone();
82 let dockerfile = &builder.dockerfile;
83 let context_path = &builder.context_path;
84 let user = builder.user.as_deref();
85 let container_uuid = builder.container_uuid;
86
87 if let Some(dockerfile) = dockerfile {
89 let dockerfile_manager = DockerfileManager::new(context_path);
91 let tmp_dockerfile = dockerfile_manager.prepare_dockerfile(dockerfile).await?;
92
93 tracing::warn!(
95 "Creating archive for context from {}",
96 context_path.display()
97 );
98 let context = ContextBuilder::from_path(context_path, tmp_dockerfile.path())?
99 .build_tar()
100 .await?;
101
102 tracing::debug!("Context build with size: {} bytes", context.len());
103
104 let tmp_dockerfile_name_inner = tmp_dockerfile
105 .path()
106 .file_name()
107 .ok_or_else(|| {
108 ContextError::CustomDockerfile("Could not read custom dockerfile".to_string())
109 })
110 .map(|s| s.to_string_lossy().to_string())?;
111
112 drop(tmp_dockerfile); let tag = container_uuid
116 .to_string()
117 .split_once('-')
118 .map(|(tag, _)| tag)
119 .unwrap_or("latest")
120 .to_string();
121
122 let image_builder = ImageBuilder::new(docker.clone());
123 let image_name_with_tag = image_builder
124 .build_image(
125 context,
126 tmp_dockerfile_name_inner.as_ref(),
127 &image_name,
128 &tag,
129 )
130 .await?;
131
132 image_name = image_name_with_tag;
133 tmp_dockerfile_name = Some(tmp_dockerfile_name_inner);
134 }
135
136 let container_config = ContainerConfigurator::new(docker.socket_path.clone())
138 .create_container_config(&image_name, user, &docker)
139 .await;
140
141 tracing::info!("Starting container with image: {image_name} and uuid: {container_uuid}");
143 let container_starter = ContainerStarter::new(docker.clone());
144 let (container_id, container_ip, container_port) = container_starter
145 .start_container(&image_name, &container_uuid, container_config)
146 .await?;
147
148 let executor = RunningDockerExecutor {
151 container_id,
152 docker,
153 container_port,
154 container_ip,
155 env_clear: builder.env_clear,
156 remove_env: builder.remove_env.clone(),
157 env: builder.env.clone(),
158 dropped: false,
159 retain_on_drop: builder.retain_on_drop,
160 cancel_token: Arc::new(CancellationToken::new()),
161 };
162
163 if let Some(tmp_dockerfile_name) = tmp_dockerfile_name {
164 executor
165 .exec_shell(&format!("rm {}", tmp_dockerfile_name.as_str()))
166 .await
167 .context("failed to remove temporary dockerfile")
168 .map_err(DockerExecutorError::Start)?;
169 }
170
171 Ok(executor)
172 }
173
174 pub async fn container_state(&self) -> Result<ContainerState, DockerExecutorError> {
178 let container = self
179 .docker
180 .inspect_container(&self.container_id, None::<InspectContainerOptions>)
181 .await?;
182
183 container.state.ok_or_else(|| {
184 DockerExecutorError::ContainerStateMissing(self.container_id.to_string())
185 })
186 }
187
188 pub async fn is_running(&self) -> bool {
192 self.container_state()
193 .await
194 .map(|state| state.status == Some(ContainerStateStatusEnum::RUNNING))
195 .unwrap_or(false)
196 }
197
198 pub async fn logs(&self) -> Result<Vec<String>, DockerExecutorError> {
200 let mut logs = Vec::new();
201 let mut stream = self.docker.logs(
202 &self.container_id,
203 Some(bollard::query_parameters::LogsOptions {
204 follow: false,
205 stdout: true,
206 stderr: true,
207 tail: "all".to_string(),
208 ..Default::default()
209 }),
210 );
211
212 while let Some(log_result) = stream.next().await {
213 match log_result {
214 Ok(log_output) => match log_output {
215 LogOutput::Console { message }
216 | LogOutput::StdOut { message }
217 | LogOutput::StdErr { message } => {
218 logs.push(String::from_utf8_lossy(&message).trim().to_string());
219 }
220 _ => {}
221 },
222 Err(e) => tracing::error!("Error retrieving logs: {e}"), }
224 }
225
226 Ok(logs)
227 }
228
229 pub async fn logs_stream(
231 &self,
232 ) -> impl Stream<Item = Result<LogOutput, bollard::errors::Error>> {
233 let docker = self.docker.clone();
234 let container_id = self.container_id.clone();
235 let cancel = self.cancel_token.clone();
236
237 let (tx, rx) = tokio::sync::mpsc::channel(100);
238
239 tokio::spawn(async move {
240 tokio::select!(
241 _ = cancel.cancelled() => {
242 tracing::debug!("Logs stream cancelled");
243 },
244 _ = async move {
245 let mut stream = docker.logs(
246 &container_id,
247 Some(bollard::query_parameters::LogsOptions {
248 follow: true,
249 stdout: true,
250 stderr: true,
251 tail: "all".to_string(),
252 ..Default::default()
253 }),
254 );
255 tracing::debug!("Starting logs stream for container");
256 while let Some(log_result) = stream.next().await {
257 if let Err(err) = tx.send(log_result)
258 .await {
259 return tracing::error!("Failed to send log item: {}", err);
260 }
261 }
262 } => {
263 tracing::debug!("Logs stream ended gracefully");
264 },
265 else => {
266 tracing::error!("Logs stream ended unexpectedly");
267 }
268 );
269
270 tracing::debug!("Closing logs stream channel");
271 });
272
273 ReceiverStream::new(rx)
274 }
275
276 async fn exec_shell(&self, cmd: &str) -> Result<CommandOutput, CommandError> {
277 let mut client = ShellExecutorClient::connect(format!(
278 "http://{}:{}",
279 self.container_ip, self.container_port
280 ))
281 .await
282 .map_err(anyhow::Error::from)?;
283
284 let request = tonic::Request::new(codegen::ShellRequest {
285 command: cmd.to_string(),
286 env_clear: self.env_clear,
287 env_remove: self.remove_env.clone(),
288 envs: self.env.clone(),
289 });
290
291 let response = client
292 .exec_shell(request)
293 .await
294 .map_err(anyhow::Error::from)?;
295
296 let codegen::ShellResponse {
297 stdout,
298 stderr,
299 exit_code,
300 } = response.into_inner();
301
302 let output = stdout.trim().to_string() + stderr.trim();
304 if exit_code == 0 {
306 Ok(output.into())
307 } else {
308 Err(CommandError::NonZeroExit(output.into()))
309 }
310 }
311
312 #[tracing::instrument(skip(self))]
313 async fn read_file(&self, path: &Path) -> Result<CommandOutput, CommandError> {
314 self.exec_shell(&format!("cat {}", path.display())).await
315 }
316
317 #[tracing::instrument(skip(self, content))]
318 async fn write_file(&self, path: &Path, content: &str) -> Result<CommandOutput, CommandError> {
319 let cmd = indoc::formatdoc! {r#"
320 cat << 'EOFKWAAK' > {path}
321 {content}
322 EOFKWAAK"#,
323 path = path.display(),
324 content = content.trim_end()
325
326 };
327
328 let write_file_result = self.exec_shell(&cmd).await;
329
330 if let Err(CommandError::NonZeroExit(write_file)) = &write_file_result
332 && [
333 "no such file or directory",
334 "directory nonexistent",
335 "nonexistent directory",
336 ]
337 .iter()
338 .any(|&s| write_file.output.to_lowercase().contains(s))
339 {
340 let path = path.parent().context("No parent directory")?;
341 let mkdircmd = format!("mkdir -p {}", path.display());
342 let _ = self.exec_shell(&mkdircmd).await?;
343
344 return self.exec_shell(&cmd).await;
345 }
346
347 write_file_result
348 }
349
350 pub async fn shutdown(&self) -> Result<(), DockerExecutorError> {
352 self.cancel_token.cancel();
354
355 tracing::warn!(
356 "Dropped; stopping and removing container {container_id}",
357 container_id = self.container_id
358 );
359
360 let docker = self.docker.clone();
361 let container_id = self.container_id.clone();
362
363 tracing::debug!(
364 "Stopping container {container_id}",
365 container_id = container_id
366 );
367 docker
368 .kill_container(
369 &container_id,
370 Some(KillContainerOptions {
371 signal: "SIGTERM".to_string(),
372 }),
373 )
374 .await?;
375
376 tracing::debug!(
377 "Removing container {container_id}",
378 container_id = container_id
379 );
380
381 docker
382 .remove_container(
383 &container_id,
384 Some(RemoveContainerOptions {
385 force: true,
386 v: true,
387 ..Default::default()
388 }),
389 )
390 .await?;
391
392 Ok(())
393 }
394}
395
396impl Drop for RunningDockerExecutor {
397 fn drop(&mut self) {
398 if self.dropped {
399 tracing::debug!(
400 "Executor already dropped; skipping stop and remove for container {}",
401 self.container_id
402 );
403 return;
404 }
405 if self.retain_on_drop {
406 tracing::debug!(
407 "Retaining container {} on drop; not stopping or removing",
408 self.container_id
409 );
410 return;
411 }
412 self.dropped = true;
413 self.cancel_token.cancel();
414
415 let this = self.clone();
416 let container_id = self.container_id.clone();
417
418 tokio::task::spawn_blocking(move || {
419 let handle = tokio::runtime::Handle::current();
420 handle.block_on(async move { this.shutdown().await })
421 });
422 tracing::debug!("Container stopped {container_id}");
423 }
424}