swiftide_docker_executor/
docker_tool_executor.rsuse anyhow::Context as _;
use async_trait::async_trait;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use swiftide_core::{prelude::StreamExt as _, Command, CommandError, CommandOutput, ToolExecutor};
use tokio::io::AsyncReadExt as _;
use tracing::{error, info};
use uuid::Uuid;
use bollard::{
container::{
Config, CreateContainerOptions, KillContainerOptions, LogOutput, StartContainerOptions,
},
exec::{CreateExecOptions, StartExecResults},
image::BuildImageOptions,
Docker,
};
use ignore::WalkBuilder;
use tokio_tar::{Builder, Header};
use crate::{ContextError, DockerExecutorError};
#[derive(Clone)]
pub struct RunningDockerExecutor {
container_id: String,
docker: Docker,
}
#[derive(Clone, Debug)]
pub struct DockerExecutor {
context_path: PathBuf,
image_name: String,
#[allow(dead_code)]
working_dir: PathBuf,
dockerfile: PathBuf,
container_uuid: Uuid,
}
impl Default for DockerExecutor {
fn default() -> Self {
Self {
container_uuid: Uuid::new_v4(),
context_path: ".".into(),
image_name: "docker-executor".into(),
working_dir: ".".into(),
dockerfile: "Dockerfile".into(),
}
}
}
impl From<RunningDockerExecutor> for Arc<dyn ToolExecutor> {
fn from(val: RunningDockerExecutor) -> Self {
Arc::new(val) as Arc<dyn ToolExecutor>
}
}
impl DockerExecutor {
pub fn with_context_path(&mut self, path: impl Into<PathBuf>) -> &mut Self {
self.context_path = path.into();
self
}
pub fn with_image_name(&mut self, name: impl Into<String>) -> &mut Self {
self.image_name = name.into();
self
}
pub fn with_container_uuid(&mut self, uuid: impl Into<Uuid>) -> &mut Self {
self.container_uuid = uuid.into();
self
}
pub fn with_dockerfile(&mut self, path: impl Into<PathBuf>) -> &mut Self {
self.dockerfile = path.into();
self
}
#[allow(dead_code)]
pub fn with_working_dir(&mut self, path: impl Into<PathBuf>) -> &mut Self {
self.working_dir = path.into();
self
}
pub async fn start(self) -> Result<RunningDockerExecutor, DockerExecutorError> {
RunningDockerExecutor::start(
self.container_uuid,
&self.context_path,
&self.dockerfile,
&self.image_name,
)
.await
}
}
#[async_trait]
impl ToolExecutor for RunningDockerExecutor {
#[tracing::instrument(skip(self), err)]
async fn exec_cmd(&self, cmd: &Command) -> Result<CommandOutput, CommandError> {
match cmd {
Command::Shell(cmd) => self.exec_shell(cmd).await,
Command::ReadFile(path) => self.read_file(path).await,
Command::WriteFile(path, content) => self.write_file(path, content).await,
_ => unimplemented!(),
}
}
}
impl RunningDockerExecutor {
pub async fn start(
container_uuid: Uuid,
context_path: &Path,
dockerfile: &Path,
image_name: &str,
) -> Result<RunningDockerExecutor, DockerExecutorError> {
let docker = Docker::connect_with_socket_defaults()?;
tracing::warn!(
"Creating archive for context from {}",
context_path.display()
);
let context = build_context_as_tar(context_path).await?;
let image_name = format!("kwaak-{image_name}");
let build_options = BuildImageOptions {
t: image_name.as_str(),
rm: true,
#[allow(clippy::unnecessary_to_owned)]
dockerfile: &dockerfile.to_string_lossy().into_owned(),
..Default::default()
};
tracing::warn!("Building docker image with name {image_name}");
{
let mut build_stream = docker.build_image(build_options, None, Some(context.into()));
while let Some(log) = build_stream.next().await {
match log {
Ok(output) => {
if let Some(stream) = output.stream {
info!("{}", stream);
}
}
Err(e) => error!("Error during build: {:?}", e),
}
}
}
let config = Config {
image: Some(image_name.as_str()),
tty: Some(true),
host_config: Some(bollard::models::HostConfig {
auto_remove: Some(true),
binds: Some(vec![String::from(
"/var/run/docker.sock:/var/run/docker.sock",
)]),
..Default::default()
}),
..Default::default()
};
let container_name = format!("kwaak-{image_name}-{container_uuid}");
let create_options = CreateContainerOptions {
name: container_name.as_str(),
..Default::default()
};
tracing::warn!("Creating container from image {image_name}");
let container_id = docker
.create_container(Some(create_options), config)
.await?
.id;
tracing::warn!("Starting container {container_id}");
docker
.start_container(&container_id, None::<StartContainerOptions<String>>)
.await?;
Ok(RunningDockerExecutor {
container_id,
docker,
})
}
async fn exec_shell(&self, cmd: &str) -> Result<CommandOutput, CommandError> {
let cmd = vec!["sh", "-c", cmd];
tracing::debug!("Executing command {cmd}", cmd = cmd.join(" "));
let exec = self
.docker
.create_exec(
&self.container_id,
CreateExecOptions {
attach_stdout: Some(true),
attach_stderr: Some(true),
cmd: Some(cmd),
..Default::default()
},
)
.await
.context("Failed to create docker exec")?
.id;
let mut stdout = String::new();
let mut stderr = String::new();
if let StartExecResults::Attached { mut output, .. } = self
.docker
.start_exec(&exec, None)
.await
.context("Failed to start docker exec")?
{
while let Some(Ok(msg)) = output.next().await {
match msg {
LogOutput::StdErr { .. } => stderr.push_str(&msg.to_string()),
LogOutput::StdOut { .. } => stdout.push_str(&msg.to_string()),
_ => {
stderr
.push_str("Command appears to wait for input, which is not supported");
break;
}
}
}
} else {
todo!();
}
let exec_inspect = self
.docker
.inspect_exec(&exec)
.await
.context("Failed to inspect docker exec result")?;
let exit_code = exec_inspect.exit_code.unwrap_or(0);
let output = stdout.trim().to_string() + stderr.trim();
if exit_code == 0 {
Ok(output.into())
} else {
Err(CommandError::NonZeroExit(output.into()))
}
}
#[tracing::instrument(skip(self))]
async fn read_file(&self, path: &Path) -> Result<CommandOutput, CommandError> {
self.exec_shell(&format!("cat {}", path.display())).await
}
#[tracing::instrument(skip(self, content))]
async fn write_file(&self, path: &Path, content: &str) -> Result<CommandOutput, CommandError> {
let cmd = indoc::formatdoc! {r#"
cat << 'EOFKWAAK' > {path}
{content}
EOFKWAAK"#,
path = path.display(),
content = content.trim_end()
};
let write_file_result = self.exec_shell(&cmd).await;
if let Err(CommandError::NonZeroExit(write_file)) = &write_file_result {
if ["No such file or directory", "Directory nonexistent"]
.iter()
.any(|&s| write_file.output.contains(s))
{
let path = path.parent().context("No parent directory")?;
let mkdircmd = format!("mkdir -p {}", path.display());
let _ = self.exec_shell(&mkdircmd).await?;
return self.exec_shell(&cmd).await;
}
}
write_file_result
}
}
impl Drop for RunningDockerExecutor {
fn drop(&mut self) {
tracing::warn!(
"Stopping container {container_id}",
container_id = self.container_id
);
let result = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
self.docker
.kill_container(
&self.container_id,
Some(KillContainerOptions { signal: "SIGKILL" }),
)
.await
})
});
if let Err(e) = result {
tracing::warn!(error = %e, "Error stopping container, might not be stopped");
}
}
}
async fn build_context_as_tar(context_path: &Path) -> Result<Vec<u8>, ContextError> {
let buffer = Vec::new();
let mut tar = Builder::new(buffer);
for entry in WalkBuilder::new(context_path)
.hidden(false)
.add_custom_ignore_filename(".dockerignore")
.build()
{
let entry = entry?;
let path = entry.path();
if path.is_file() {
let mut file = tokio::fs::File::open(path).await?;
let mut buffer_content = Vec::new();
file.read_to_end(&mut buffer_content).await?;
let mut header = Header::new_gnu();
header.set_size(buffer_content.len() as u64);
header.set_mode(0o644);
header.set_cksum();
let relative_path = path.strip_prefix(context_path)?;
tar.append_data(&mut header, relative_path, &*buffer_content)
.await?;
}
}
let result = tar.into_inner().await?;
Ok(result.clone())
}
#[cfg(test)]
mod tests {
use bollard::secret::ContainerStateStatusEnum;
use super::*;
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_runs_docker_and_echos() {
let executor = DockerExecutor::default()
.with_context_path(".")
.with_image_name("tests")
.to_owned()
.start()
.await
.unwrap();
let output = executor
.exec_cmd(&Command::Shell("echo hello".to_string()))
.await
.unwrap();
assert_eq!(output.to_string(), "hello");
}
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_context_present() {
let executor = DockerExecutor::default()
.with_context_path(".")
.with_image_name("tests")
.with_working_dir("/app")
.to_owned()
.start()
.await
.unwrap();
let ls = executor
.exec_cmd(&Command::Shell("ls -a".to_string()))
.await
.unwrap();
assert!(ls.to_string().contains("Cargo.toml"));
}
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_write_and_read_file_with_quotes() {
let content = r#"This is a "test" content with 'quotes' and special characters: \n \t"#;
let path = Path::new("test_file.txt");
let executor = DockerExecutor::default()
.with_context_path(".")
.with_image_name("test-files")
.with_working_dir("/app")
.to_owned()
.start()
.await
.unwrap();
let _ = executor
.exec_cmd(&Command::write_file(path, content))
.await
.unwrap();
let read_file = executor.exec_cmd(&Command::read_file(path)).await.unwrap();
assert_eq!(content, read_file.output);
}
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_write_and_read_file_markdown() {
let content = r#"# Example
```rust
fn main() {
let hello = "world";
println!("Hello, {}", hello);
}
```
```shell
$ cargo run
```"#;
let path = Path::new("test_file.txt");
let executor = DockerExecutor::default()
.with_context_path(".")
.with_image_name("test-files-md")
.with_working_dir("/app")
.to_owned()
.start()
.await
.unwrap();
let _ = executor
.exec_cmd(&Command::write_file(path, content))
.await
.unwrap();
let read_file = executor.exec_cmd(&Command::read_file(path)).await.unwrap();
assert_eq!(content, read_file.output);
}
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_assert_container_stopped_on_drop() {
let executor = DockerExecutor::default()
.with_context_path(".")
.with_image_name("test-drop")
.with_working_dir("/app")
.to_owned()
.start()
.await
.unwrap();
let docker = executor.docker.clone();
let container_id = executor.container_id.clone();
let container = docker.inspect_container(&container_id, None).await.unwrap();
assert_eq!(
container.state.as_ref().unwrap().status,
Some(ContainerStateStatusEnum::RUNNING)
);
drop(executor);
let container = match docker.inspect_container(&container_id, None).await {
Err(e) if e.to_string().contains("No such container") => {
return;
}
Ok(container) => container,
Err(e) => panic!("Error inspecting container: {e}"),
};
let status = container.state.as_ref().unwrap().status;
assert!(
status == Some(ContainerStateStatusEnum::REMOVING)
|| status == Some(ContainerStateStatusEnum::EXITED)
|| status == Some(ContainerStateStatusEnum::DEAD),
"Unexpected container state: {status:?}"
);
}
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_create_file_subdirectory_that_does_not_exist() {
let content = r#"# Example
```rust
fn main() {
let hello = "world";
println!("Hello, {}", hello);
}
```
```shell
$ cargo run
```"#;
let path = Path::new("doesnot/exist/test_file.txt");
let executor = DockerExecutor::default()
.with_context_path(".")
.with_image_name("test-files-missing-dir")
.with_working_dir("/app")
.to_owned()
.start()
.await
.unwrap();
let _ = executor
.exec_cmd(&Command::write_file(path, content))
.await
.unwrap();
let read_file = executor.exec_cmd(&Command::read_file(path)).await.unwrap();
assert_eq!(content, read_file.output);
}
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_custom_dockerfile() {
let context_path = tempfile::tempdir().unwrap();
std::process::Command::new("cp")
.arg("Dockerfile")
.arg(context_path.path().join("Dockerfile.custom"))
.output()
.unwrap();
let executor = DockerExecutor::default()
.with_context_path(context_path.path())
.with_image_name("test-custom")
.with_dockerfile("Dockerfile.custom")
.to_owned()
.start()
.await
.unwrap();
let output = executor
.exec_cmd(&Command::shell("echo hello"))
.await
.unwrap();
assert_eq!(output.to_string(), "hello");
}
}