use anyhow::Context as _;
use async_trait::async_trait;
use bollard::{
    query_parameters::{InspectContainerOptions, KillContainerOptions, RemoveContainerOptions},
    secret::{ContainerState, ContainerStateStatusEnum},
};
use codegen::shell_executor_client::ShellExecutorClient;
use futures_util::Stream;
use std::{
    collections::HashMap,
    net::IpAddr,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};
pub use swiftide_core::ToolExecutor;
use swiftide_core::{Command, CommandError, CommandOutput, Loader as _, prelude::StreamExt as _};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;

use crate::{
    ContextBuilder, ContextError, DockerExecutor, DockerExecutorError, client::Client,
    container_configurator::ContainerConfigurator, container_starter::ContainerStarter,
    dockerfile_manager::DockerfileManager, image_builder::ImageBuilder,
};

pub mod codegen {
    tonic::include_proto!("shell");
}
pub use bollard::container::LogOutput;

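/// A running Docker container that commands can be executed in.
///
/// Holds the Docker client, the container's address and the port of the
/// in-container shell executor service, plus the environment, working
/// directory, and default timeout applied to every command. Unless
/// `retain_on_drop` is set, the container is stopped and removed when the
/// executor is dropped.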
#[derive(Clone, Debug)]
pub struct RunningDockerExecutor {
    pub container_id: String,
    pub(crate) docker: Arc<Client>,
    pub container_port: String,
    pub container_ip: IpAddr,
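    // Drop bookkeeping: `dropped` is set once cleanup has been initiated so it
    // is not triggered again, and `retain_on_drop` disables cleanup entirely.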
    dropped: bool,
    retain_on_drop: bool,

    pub(crate) env_clear: bool,
    pub(crate) remove_env: Vec<String>,
    pub(crate) env: HashMap<String, String>,
    pub(crate) default_timeout: Option<Duration>,
    pub(crate) workdir: PathBuf,

    cancel_token: Arc<CancellationToken>,
}

impl From<RunningDockerExecutor> for Arc<dyn ToolExecutor> {
    fn from(val: RunningDockerExecutor) -> Self {
        Arc::new(val) as Arc<dyn ToolExecutor>
    }
}

#[async_trait]
impl ToolExecutor for RunningDockerExecutor {
    #[tracing::instrument(skip(self), err)]
    async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
        let workdir = self.resolve_workdir(cmd);
        let timeout = self.resolve_timeout(cmd);

        match cmd {
            Command::Shell { command, .. } => self.exec_shell(command, &workdir, timeout).await,
            Command::ReadFile { path, .. } => self.exec_read_file(&workdir, path, timeout).await,
            Command::WriteFile { path, content, .. } => {
                self.exec_write_file(&workdir, path, content, timeout).await
            }
            _ => unimplemented!(),
        }
    }

    async fn stream_files(
        &self,
        path: &Path,
        extensions: Option<Vec<String>>,
    ) -> anyhow::Result<swiftide_core::indexing::IndexingStream<String>> {
        let extensions = extensions.unwrap_or_default();
        Ok(self.as_file_loader(path, extensions).into_stream())
    }
}

impl RunningDockerExecutor {
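    /// Starts a new container based on the given [`DockerExecutor`] configuration.
    ///
    /// If a custom dockerfile is configured, the build context is archived and
    /// an image is built and tagged first; otherwise the configured image name
    /// is used as-is. After the container is up, the temporary dockerfile is
    /// removed from the running container again.
    ///
    /// # Errors
    ///
    /// Returns a [`DockerExecutorError`] if the Docker client, the image
    /// build, or the container startup fails.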
    pub async fn start(
        builder: &DockerExecutor,
    ) -> Result<RunningDockerExecutor, DockerExecutorError> {
        let docker = Client::lazy_client().await?;

        let mut tmp_dockerfile_name = None;
        let mut image_name = builder.image_name.clone();
        let dockerfile = &builder.dockerfile;
        let context_path = &builder.context_path;
        let user = builder.user.as_deref();
        let container_uuid = builder.container_uuid;

        if let Some(dockerfile) = dockerfile {
            let dockerfile_manager = DockerfileManager::new(context_path);
            let tmp_dockerfile = dockerfile_manager.prepare_dockerfile(dockerfile).await?;

            tracing::warn!(
                "Creating archive for context from {}",
                context_path.display()
            );
            let context = ContextBuilder::from_path(context_path, tmp_dockerfile.path())?
                .build_tar()
                .await?;

            tracing::debug!("Context built with size: {} bytes", context.len());

            let tmp_dockerfile_name_inner = tmp_dockerfile
                .path()
                .file_name()
                .ok_or_else(|| {
                    ContextError::CustomDockerfile("Could not read custom dockerfile".to_string())
                })
                .map(|s| s.to_string_lossy().to_string())?;

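            // The dockerfile contents are already captured in the context tar,
            // so the temporary file can be dropped (and cleaned up) before the build.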
            drop(tmp_dockerfile);

            let tag = container_uuid
                .to_string()
                .split_once('-')
                .map(|(tag, _)| tag)
                .unwrap_or("latest")
                .to_string();

            let image_builder = ImageBuilder::new(docker.clone());
            let image_name_with_tag = image_builder
                .build_image(
                    context,
                    tmp_dockerfile_name_inner.as_ref(),
                    &image_name,
                    &tag,
                )
                .await?;

            image_name = image_name_with_tag;
            tmp_dockerfile_name = Some(tmp_dockerfile_name_inner);
        }

        let container_config = ContainerConfigurator::new(docker.socket_path.clone())
            .create_container_config(&image_name, user, &docker)
            .await;

        tracing::info!("Starting container with image: {image_name} and uuid: {container_uuid}");
        let container_starter = ContainerStarter::new(docker.clone());
        let (container_id, container_ip, container_port) = container_starter
            .start_container(&image_name, &container_uuid, container_config)
            .await?;

        let executor = RunningDockerExecutor {
            container_id,
            docker,
            container_port,
            container_ip,
            env_clear: builder.env_clear,
            remove_env: builder.remove_env.clone(),
            env: builder.env.clone(),
            dropped: false,
            retain_on_drop: builder.retain_on_drop,
            cancel_token: Arc::new(CancellationToken::new()),
            default_timeout: builder.default_timeout,
            workdir: builder.workdir.clone(),
        };

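        // The temporary dockerfile is part of the build context and may have been
        // copied into the image; remove any copies from the likely locations
        // inside the container.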
        if let Some(tmp_dockerfile_name) = tmp_dockerfile_name {
            let mut removal_targets = vec![tmp_dockerfile_name.clone()];

            if executor.workdir.is_absolute() {
                removal_targets.push(
                    executor
                        .workdir
                        .join(&tmp_dockerfile_name)
                        .display()
                        .to_string(),
                );
            }

            let default_workdir = Path::new("/app");
            if executor.workdir != default_workdir {
                removal_targets.push(
                    default_workdir
                        .join(&tmp_dockerfile_name)
                        .display()
                        .to_string(),
                );
            }

            removal_targets.sort();
            removal_targets.dedup();

            let removal_args = removal_targets
                .iter()
                .map(|target| format!("{target:?}"))
                .collect::<Vec<_>>()
                .join(" ");

            let removal_cmd = format!("rm -f -- {removal_args}");

            executor
                .exec_shell(&removal_cmd, Path::new("/"), executor.default_timeout)
                .await
                .context("failed to remove temporary dockerfile")
                .map_err(DockerExecutorError::Start)?;
        }

        Ok(executor)
    }

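    /// Inspects the container and returns its current state.
    ///
    /// # Errors
    ///
    /// Returns an error if the inspect call fails or Docker reports no state
    /// for the container.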
    pub async fn container_state(&self) -> Result<ContainerState, DockerExecutorError> {
        let container = self
            .docker
            .inspect_container(&self.container_id, None::<InspectContainerOptions>)
            .await?;

        container.state.ok_or_else(|| {
            DockerExecutorError::ContainerStateMissing(self.container_id.to_string())
        })
    }

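    /// Returns `true` if the container is currently in the `running` state.
    ///
    /// Any error while inspecting the container is treated as not running.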
    pub async fn is_running(&self) -> bool {
        self.container_state()
            .await
            .map(|state| state.status == Some(ContainerStateStatusEnum::RUNNING))
            .unwrap_or(false)
    }

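    /// Collects the container's stdout and stderr logs captured so far.
    ///
    /// Lines are trimmed; errors on individual log entries are logged and
    /// skipped.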
    pub async fn logs(&self) -> Result<Vec<String>, DockerExecutorError> {
        let mut logs = Vec::new();
        let mut stream = self.docker.logs(
            &self.container_id,
            Some(bollard::query_parameters::LogsOptions {
                follow: false,
                stdout: true,
                stderr: true,
                tail: "all".to_string(),
                ..Default::default()
            }),
        );

        while let Some(log_result) = stream.next().await {
            match log_result {
                Ok(log_output) => match log_output {
                    LogOutput::Console { message }
                    | LogOutput::StdOut { message }
                    | LogOutput::StdErr { message } => {
                        logs.push(String::from_utf8_lossy(&message).trim().to_string());
                    }
                    _ => {}
                },
                Err(e) => tracing::error!("Error retrieving logs: {e}"),
            }
        }

        Ok(logs)
    }

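    /// Streams the container's stdout and stderr, following new output until
    /// the executor shuts down.
    ///
    /// Internally spawns a task that forwards log entries over a bounded
    /// channel; the task stops when the executor's cancellation token fires or
    /// the underlying log stream ends.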
    pub async fn logs_stream(
        &self,
    ) -> impl Stream<Item = Result<LogOutput, bollard::errors::Error>> {
        let docker = self.docker.clone();
        let container_id = self.container_id.clone();
        let cancel = self.cancel_token.clone();

        let (tx, rx) = tokio::sync::mpsc::channel(100);

        tokio::spawn(async move {
            tokio::select!(
                _ = cancel.cancelled() => {
                    tracing::debug!("Logs stream cancelled");
                },
                _ = async move {
                    let mut stream = docker.logs(
                        &container_id,
                        Some(bollard::query_parameters::LogsOptions {
                            follow: true,
                            stdout: true,
                            stderr: true,
                            tail: "all".to_string(),
                            ..Default::default()
                        }),
                    );
                    tracing::debug!("Starting logs stream for container");
                    while let Some(log_result) = stream.next().await {
                        if let Err(err) = tx.send(log_result).await {
                            return tracing::error!("Failed to send log item: {}", err);
                        }
                    }
                } => {
                    tracing::debug!("Logs stream ended gracefully");
                },
                else => {
                    tracing::error!("Logs stream ended unexpectedly");
                }
            );

            tracing::debug!("Closing logs stream channel");
        });

        ReceiverStream::new(rx)
    }

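    /// Resolves the working directory for a command: absolute paths are used
    /// as-is, relative paths are joined onto the executor's workdir, and the
    /// executor's workdir is the default.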
    fn resolve_workdir(&self, cmd: &Command) -> PathBuf {
        match cmd.current_dir_path() {
            Some(path) if path.is_absolute() => path.to_path_buf(),
            Some(path) => self.workdir.join(path),
            None => self.workdir.clone(),
        }
    }

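    /// Uses the command's own timeout if set, falling back to the executor's
    /// default timeout.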
    fn resolve_timeout(&self, cmd: &Command) -> Option<Duration> {
        cmd.timeout_duration().copied().or(self.default_timeout)
    }

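    /// Executes a shell command inside the container via the gRPC shell
    /// executor service and returns the merged stdout/stderr output.
    ///
    /// A `DeadlineExceeded` status combined with a configured timeout is
    /// mapped to [`CommandError::TimedOut`]; a non-zero exit code is mapped to
    /// [`CommandError::NonZeroExit`].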
    async fn exec_shell(
        &self,
        cmd: &str,
        workdir: &Path,
        timeout: Option<Duration>,
    ) -> Result<CommandOutput, CommandError> {
        let mut client = ShellExecutorClient::connect(format!(
            "http://{}:{}",
            self.container_ip, self.container_port
        ))
        .await
        .map_err(anyhow::Error::from)?;

        let timeout_ms = timeout.map(duration_to_millis);
        tracing::debug!(?timeout_ms, "sending shell request with timeout");

        let request = tonic::Request::new(codegen::ShellRequest {
            command: cmd.to_string(),
            env_clear: self.env_clear,
            env_remove: self.remove_env.clone(),
            envs: self.env.clone(),
            timeout_ms,
            cwd: Some(workdir.display().to_string()),
        });

        let response = match client.exec_shell(request).await {
            Ok(resp) => resp.into_inner(),
            Err(status) => {
                if status.code() == tonic::Code::DeadlineExceeded {
                    if let Some(limit) = timeout {
                        let message = status.message().to_string();
                        let output = if message.is_empty() {
                            CommandOutput::empty()
                        } else {
                            CommandOutput::new(message)
                        };

                        return Err(CommandError::TimedOut {
                            timeout: limit,
                            output,
                        });
                    }

                    return Err(CommandError::ExecutorError(status.into()));
                }

                return Err(CommandError::ExecutorError(status.into()));
            }
        };

        let codegen::ShellResponse {
            stdout,
            stderr,
            exit_code,
        } = response;

        let stdout = stdout.trim().to_string();
        let stderr = stderr.trim().to_string();
        let merged = merge_stream_output(&stdout, &stderr);

        if exit_code == 0 {
            Ok(CommandOutput::new(merged))
        } else {
            Err(CommandError::NonZeroExit(CommandOutput::new(merged)))
        }
    }

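    /// Reads a file inside the container by running `cat` on the given path.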
    #[tracing::instrument(skip(self))]
    async fn exec_read_file(
        &self,
        workdir: &Path,
        path: &Path,
        timeout: Option<Duration>,
    ) -> Result<CommandOutput, CommandError> {
        let cmd = format!("cat {}", path.display());
        self.exec_shell(&cmd, workdir, timeout).await
    }

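    /// Writes `content` to `path` inside the container using a quoted heredoc.
    ///
    /// If the write fails because the parent directory does not exist, the
    /// directory is created with `mkdir -p` and the write is retried once.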
    #[tracing::instrument(skip(self, content))]
    async fn exec_write_file(
        &self,
        workdir: &Path,
        path: &Path,
        content: &str,
        timeout: Option<Duration>,
    ) -> Result<CommandOutput, CommandError> {
        let cmd = indoc::formatdoc! {
            r#"
            cat << 'EOFKWAAK' > {path}
            {content}
            EOFKWAAK"#,
            path = path.display(),
            content = content.trim_end()
        };

        let write_file_result = self.exec_shell(&cmd, workdir, timeout).await;

        if let Err(CommandError::NonZeroExit(write_file)) = &write_file_result
            && [
                "no such file or directory",
                "directory nonexistent",
                "nonexistent directory",
            ]
            .iter()
            .any(|&s| write_file.output.to_lowercase().contains(s))
        {
            let path = path.parent().context("No parent directory")?;
            let mkdircmd = format!("mkdir -p {}", path.display());
            let _ = self.exec_shell(&mkdircmd, workdir, timeout).await?;

            return self.exec_shell(&cmd, workdir, timeout).await;
        }

        write_file_result
    }

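    /// Stops and removes the container, cancelling any active log streams.
    ///
    /// This is also invoked from `Drop` unless `retain_on_drop` is set.
    ///
    /// # Errors
    ///
    /// Returns an error if killing or removing the container fails.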
    pub async fn shutdown(&self) -> Result<(), DockerExecutorError> {
        self.cancel_token.cancel();

        tracing::warn!(
            "Dropped; stopping and removing container {container_id}",
            container_id = self.container_id
        );

        let docker = self.docker.clone();
        let container_id = self.container_id.clone();

        tracing::debug!(
            "Stopping container {container_id}",
            container_id = container_id
        );
        docker
            .kill_container(
                &container_id,
                Some(KillContainerOptions {
                    signal: "SIGTERM".to_string(),
                }),
            )
            .await?;

        tracing::debug!(
            "Removing container {container_id}",
            container_id = container_id
        );

        docker
            .remove_container(
                &container_id,
                Some(RemoveContainerOptions {
                    force: true,
                    v: true,
                    ..Default::default()
                }),
            )
            .await?;

        Ok(())
    }
}

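/// Merges stdout and stderr into a single string, joining them with a newline
/// when both are non-empty.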
fn merge_stream_output(stdout: &str, stderr: &str) -> String {
    match (stdout.is_empty(), stderr.is_empty()) {
        (true, true) => String::new(),
        (false, true) => stdout.to_string(),
        (true, false) => stderr.to_string(),
        (false, false) => format!("{stdout}\n{stderr}"),
    }
}

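/// Converts a `Duration` to whole milliseconds, saturating at `u64::MAX`.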
fn duration_to_millis(duration: Duration) -> u64 {
    let millis = duration.as_millis();
    if millis > u64::MAX as u128 {
        u64::MAX
    } else {
        millis as u64
    }
}

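/// Best-effort cleanup when the executor goes out of scope: unless the
/// container is retained, shutdown is offloaded to a blocking task so it can
/// run from within an async runtime.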
impl Drop for RunningDockerExecutor {
    fn drop(&mut self) {
        if self.dropped {
            tracing::debug!(
                "Executor already dropped; skipping stop and remove for container {}",
                self.container_id
            );
            return;
        }
        if self.retain_on_drop {
            tracing::debug!(
                "Retaining container {} on drop; not stopping or removing",
                self.container_id
            );
            return;
        }
        self.dropped = true;
        self.cancel_token.cancel();

        let this = self.clone();
        let container_id = self.container_id.clone();

        tokio::task::spawn_blocking(move || {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(async move { this.shutdown().await })
        });
        tracing::debug!("Scheduled shutdown of container {container_id}");
    }
}