use crate::agent::AgentProvider;
use crate::agent::log_line::LogLine;
use crate::agent::task_logger::TaskLogger;
use crate::context::AppContext;
use crate::context::TaskStorage;
use crate::docker::{DockerManager, image_manager::DockerImageManager};
use crate::git::RepoManager;
use crate::task::{Task, TaskStatus};
use crate::tui::events::{ServerEvent, ServerEventSender, emit_or_print};
use std::sync::Arc;
/// Outcome of a task run whose container reported success.
#[derive(Debug)]
pub struct TaskExecutionResult {
/// Branch name of the task (a copy of the task's `branch_name`).
pub branch_name: String,
/// Message reported by the container run.
pub message: String,
}
/// Failure of a task run.
#[derive(Debug)]
pub struct TaskExecutionError {
    /// Human-readable description of what went wrong.
    pub message: String,
    /// `true` when the failure came from the agent warmup step rather than
    /// from any other stage of the run.
    pub is_warmup_failure: bool,
}

/// Lets plain `String` messages be raised with `?`; such errors are never
/// flagged as warmup failures.
impl From<String> for TaskExecutionError {
    fn from(message: String) -> Self {
        TaskExecutionError {
            is_warmup_failure: false,
            message,
        }
    }
}
/// Executes tasks: prepares the Docker image, runs the agent container,
/// commits and fetches the resulting changes, and records the outcome in
/// task storage.
pub struct TaskRunner {
/// Storage used to add tasks and to mark them running, complete or failed.
task_storage: Arc<TaskStorage>,
/// Application context; source of the environment (`tsk_env`) and the
/// notification client.
ctx: AppContext,
/// Commits changes in the task's repository copy and fetches them back to
/// the task's `repo_root`.
repo_manager: RepoManager,
/// Provides the Docker client for image builds and runs the task container.
docker_manager: DockerManager,
/// Optional event sender handed to `emit_or_print` when reporting warnings.
event_sender: Option<ServerEventSender>,
}
impl TaskRunner {
pub fn new(
ctx: &AppContext,
docker_manager: DockerManager,
event_sender: Option<ServerEventSender>,
) -> Self {
Self {
task_storage: ctx.task_storage(),
ctx: ctx.clone(),
repo_manager: RepoManager::new(ctx),
docker_manager,
event_sender,
}
}
pub async fn store_and_run(
&self,
task: &Task,
) -> Result<TaskExecutionResult, TaskExecutionError> {
let mut stored_task = task.clone();
stored_task.status = TaskStatus::Running;
stored_task.started_at = Some(chrono::Utc::now());
if let Err(e) = self.task_storage.add_task(stored_task).await {
emit_or_print(
&self.event_sender,
ServerEvent::WarningMessage(format!("Error storing task: {e}")),
);
}
self.run_with_lifecycle(task).await
}
pub async fn run_queued(&self, task: &Task) -> Result<TaskExecutionResult, TaskExecutionError> {
if let Err(e) = self.task_storage.mark_running(&task.id).await {
emit_or_print(
&self.event_sender,
ServerEvent::WarningMessage(format!("Error updating task status: {e}")),
);
}
self.run_with_lifecycle(task).await
}
async fn run_with_lifecycle(
&self,
task: &Task,
) -> Result<TaskExecutionResult, TaskExecutionError> {
let result = self.run_in_container(task).await;
match &result {
Ok(exec_result) => {
self.ctx.notification_client().notify_task_complete(
&task.name,
true,
Some(&exec_result.message),
);
if let Err(e) = self
.task_storage
.mark_complete(&task.id, &exec_result.branch_name)
.await
{
emit_or_print(
&self.event_sender,
ServerEvent::WarningMessage(format!("Error updating task status: {e}")),
);
}
}
Err(e) => {
self.ctx.notification_client().notify_task_complete(
&task.name,
false,
Some(&e.message),
);
if let Err(storage_err) = self.task_storage.mark_failed(&task.id, &e.message).await
{
emit_or_print(
&self.event_sender,
ServerEvent::WarningMessage(format!(
"Error updating task status: {storage_err}"
)),
);
}
}
}
result
}
async fn run_in_container(
&self,
task: &Task,
) -> Result<TaskExecutionResult, TaskExecutionError> {
let agent = AgentProvider::get_agent(&task.agent, self.ctx.tsk_env())
.map_err(|e| format!("Error getting agent: {e}"))?;
agent
.validate()
.await
.map_err(|e| format!("Agent validation failed: {e}"))?;
let repo_path = task.copied_repo_path.as_ref().ok_or_else(|| {
format!(
"Task '{}' has no copied repository. This may indicate the task is waiting for its parent to complete.",
task.id
)
})?;
let branch_name = task.branch_name.clone();
let task_logger = match self.ctx.tsk_env().open_agent_log(&task.id) {
Ok(file) => TaskLogger::new(file, self.event_sender.is_some()),
Err(e) => {
emit_or_print(
&self.event_sender,
ServerEvent::WarningMessage(format!("Failed to create agent log: {e}")),
);
TaskLogger::no_file()
}
};
let task_image_manager =
DockerImageManager::new(&self.ctx, self.docker_manager.client(), None);
let resolved_config =
crate::docker::resolve_config_from_task(task, &self.ctx, &self.event_sender);
let docker_image_tag = match task_image_manager
.ensure_image(&crate::docker::image_manager::EnsureImageOptions {
stack: &task.stack,
agent: &task.agent,
project: Some(&task.project),
build_root: Some(repo_path.as_path()),
force_rebuild: true,
logger: &task_logger,
resolved_config: Some(&resolved_config),
})
.await
{
Ok(tag) => tag,
Err(e) => {
let msg = format!("Error ensuring Docker image: {e:?}");
task_logger.log(LogLine::tsk_error(&msg));
return Err(msg.into());
}
};
task_logger.log(LogLine::tsk_success("Docker image ready"));
task_logger.log(LogLine::tsk_message(format!(
"Launching {} agent...",
agent.name()
)));
if let Err(e) = agent.warmup().await {
return Err(TaskExecutionError {
message: format!("Agent warmup failed: {e}"),
is_warmup_failure: true,
});
}
let (_output, task_result) = match self
.docker_manager
.run_task_container(&docker_image_tag, task, agent.as_ref())
.await
{
Ok(result) => result,
Err(e) => {
return Err(format!("Error running container: {e}").into());
}
};
let commit_message = format!("tsk automated changes for task: {}", task.name);
match self
.repo_manager
.commit_changes(repo_path, &commit_message)
.await
{
Ok(commit_warnings) => {
for warning in commit_warnings {
task_logger.log(LogLine::tsk_warning(warning));
}
}
Err(e) => {
task_logger.log(LogLine::tsk_warning(format!(
"Error committing changes: {e}"
)));
}
}
task_logger.log(LogLine::tsk_message(format!(
"Saving changes to {} (branch {})...",
task.repo_root.display(),
branch_name,
)));
match self
.repo_manager
.fetch_changes(
repo_path,
&branch_name,
&task.repo_root,
&task.source_commit,
task.source_branch.as_deref(),
resolved_config.git_town,
)
.await
{
Ok(result) => {
for warning in &result.warnings {
task_logger.log(LogLine::tsk_warning(warning.clone()));
}
if result.has_changes {
task_logger.log(LogLine::tsk_success(format!(
"Branch {branch_name} is now available"
)));
} else {
task_logger.log(LogLine::tsk_message("No changes - branch not created"));
}
}
Err(e) => {
task_logger.log(LogLine::tsk_warning(format!("Error fetching changes: {e}")));
}
}
if task_result.success {
Ok(TaskExecutionResult {
branch_name,
message: task_result.message,
})
} else {
Err(TaskExecutionError {
message: task_result.message,
is_warmup_failure: false,
})
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::AppContext;
    use crate::context::docker_client::DockerClient;
    use crate::test_utils::git_test_utils::TestGitRepository;
    use crate::test_utils::{FixedResponseDockerClient, NoOpDockerClient};

    /// Creates a repository with an initial commit followed by a commit that
    /// adds the task instructions and a sample file.
    fn repo_with_task_files() -> TestGitRepository {
        let repo = TestGitRepository::new().unwrap();
        repo.init_with_commit().unwrap();
        repo.create_file(".tsk/tasks/instructions.md", "Test task instructions")
            .unwrap();
        repo.create_file("test.txt", "test content").unwrap();
        repo.stage_all().unwrap();
        repo.commit("Add test files").unwrap();
        repo
    }

    /// Writes an empty `.claude.json` one level above the Claude config
    /// directory of `ctx`, creating the parent directory if needed.
    /// NOTE(review): presumably required for agent validation - confirm.
    fn write_empty_claude_json(ctx: &AppContext) {
        let claude_json_path = ctx
            .tsk_env()
            .claude_config_dir()
            .join("..")
            .join(".claude.json");
        if let Some(parent) = claude_json_path.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(&claude_json_path, "{}").unwrap();
    }

    /// Builds a "feature" task for `repo` whose working copy lives in
    /// `copy_dir`.
    fn feature_task(
        id: &str,
        branch_name: &str,
        repo: &TestGitRepository,
        copy_dir: std::path::PathBuf,
    ) -> Task {
        let instructions_file = copy_dir
            .join(".tsk/tasks/instructions.md")
            .to_string_lossy()
            .to_string();
        Task {
            id: id.to_string(),
            repo_root: repo.path().to_path_buf(),
            task_type: "feature".to_string(),
            instructions_file,
            branch_name: branch_name.to_string(),
            source_commit: repo.get_current_commit().unwrap(),
            copied_repo_path: Some(copy_dir),
            ..Task::test_default()
        }
    }

    /// A successful container run yields `Ok` and a `Complete` stored task.
    #[tokio::test]
    async fn test_store_and_run_success() {
        let repo = repo_with_task_files();
        let docker_client = Arc::new(FixedResponseDockerClient {
            logs_output: "Test output".to_string(),
            ..Default::default()
        });
        let ctx = AppContext::builder().build();
        write_empty_claude_json(&ctx);
        let docker_manager = DockerManager::new(&ctx, docker_client, None);
        let runner = TaskRunner::new(&ctx, docker_manager, None);

        let copy_dir = ctx.tsk_env().task_dir("test-task-123");
        crate::file_system::copy_dir(repo.path(), &copy_dir)
            .await
            .unwrap();
        let task = feature_task(
            "test-task-123",
            "tsk/feature/test-task/test-task-123",
            &repo,
            copy_dir,
        );

        let outcome = runner.store_and_run(&task).await;
        assert!(outcome.is_ok(), "Error: {:?}", outcome.as_ref().err());
        let execution_result = outcome.unwrap();
        assert!(execution_result.branch_name.contains("test-task"));

        let stored = ctx.task_storage().get_task("test-task-123").await.unwrap();
        assert_eq!(stored.unwrap().status, TaskStatus::Complete);
    }

    /// A container that fails to start yields a non-warmup error and a
    /// `Failed` stored task.
    #[tokio::test]
    async fn test_store_and_run_infrastructure_failure() {
        let repo = repo_with_task_files();
        let docker_client = Arc::new(FixedResponseDockerClient {
            should_fail_start: true,
            ..Default::default()
        });
        let ctx = AppContext::builder().build();
        write_empty_claude_json(&ctx);
        let docker_manager = DockerManager::new(&ctx, docker_client, None);
        let runner = TaskRunner::new(&ctx, docker_manager, None);

        let copy_dir = ctx.tsk_env().task_dir("infra-fail-123");
        crate::file_system::copy_dir(repo.path(), &copy_dir)
            .await
            .unwrap();
        let task = feature_task(
            "infra-fail-123",
            "tsk/feature/infra-fail/infra-fail-123",
            &repo,
            copy_dir,
        );

        let outcome = runner.store_and_run(&task).await;
        assert!(outcome.is_err());
        let error = outcome.unwrap_err();
        assert!(
            error.message.contains("Error running container"),
            "Expected infrastructure error, got: {}",
            error.message
        );
        assert!(!error.is_warmup_failure);

        let stored = ctx.task_storage().get_task("infra-fail-123").await.unwrap();
        assert_eq!(stored.unwrap().status, TaskStatus::Failed);
    }

    /// `store_and_run` persists the task (no longer queued, with a start
    /// time) whatever the execution result is.
    #[tokio::test]
    async fn test_store_and_run() {
        let repo = TestGitRepository::new().unwrap();
        repo.init_with_commit().unwrap();
        let ctx = AppContext::builder().build();

        let task_id = "store-exec-1".to_string();
        let task_dir_path = ctx.tsk_env().task_dir(&task_id);
        std::fs::create_dir_all(&task_dir_path).unwrap();
        std::fs::write(task_dir_path.join("instructions.md"), "Test instructions").unwrap();

        let task = Task {
            id: task_id.clone(),
            repo_root: repo.path().to_path_buf(),
            name: "store-exec-task".to_string(),
            branch_name: format!("tsk/{task_id}"),
            copied_repo_path: Some(task_dir_path.join("repo")),
            ..Task::test_default()
        };

        let docker_client: Arc<dyn DockerClient> = Arc::new(NoOpDockerClient);
        let docker_manager = DockerManager::new(&ctx, docker_client, None);
        let runner = TaskRunner::new(&ctx, docker_manager, None);
        // The execution result itself is irrelevant here; only storage is checked.
        let _result = runner.store_and_run(&task).await;

        let storage = ctx.task_storage();
        let stored = storage.get_task(&task_id).await.unwrap();
        assert!(stored.is_some(), "Task should exist in storage");
        let stored_task = stored.unwrap();
        assert_ne!(
            stored_task.status,
            TaskStatus::Queued,
            "Task should not be Queued after store_and_run"
        );
        assert!(
            stored_task.started_at.is_some(),
            "Task should have started_at set"
        );

        let all_tasks = storage.list_tasks().await.unwrap();
        assert!(
            all_tasks.iter().any(|t| t.id == task_id),
            "Task should appear in the list of all tasks"
        );
    }
}