use std::time::Duration;
use tempfile::TempDir;
use tinytown::message::MessageType;
use tinytown::{
Agent, AgentId, AgentState, AgentType, Message, Priority, Task, TaskId, TaskService, TaskState,
Town,
};
use uuid::Uuid;
/// RAII wrapper for a test [`Town`]: keeps the backing temp directory alive
/// for the lifetime of the test and kills the redis sidecar when dropped.
struct TownGuard {
    // The town under test; exposed transparently via Deref below.
    town: Town,
    // Holds the town's root directory (and its .tt/redis.pid file) until drop.
    temp_dir: TempDir,
}
impl Drop for TownGuard {
    // Kill the redis sidecar before the temp directory is removed, so the
    // process does not outlive the test even when an assertion panics.
    fn drop(&mut self) {
        cleanup_redis(&self.temp_dir);
    }
}
// Deref lets tests call Town methods directly on the guard.
impl std::ops::Deref for TownGuard {
    type Target = Town;
    fn deref(&self) -> &Self::Target {
        &self.town
    }
}
/// Initialize a fresh [`Town`] (with a uniquely suffixed name) inside a temp
/// directory, returning a guard that cleans up the redis sidecar on drop.
async fn create_test_town(name: &str) -> Result<TownGuard, Box<dyn std::error::Error>> {
    let temp_dir = TempDir::new()?;
    let town = Town::init(temp_dir.path(), &unique_town_name(name)).await?;
    Ok(TownGuard { town, temp_dir })
}
/// Best-effort kill of the redis-server process whose pid was recorded in
/// `.tt/redis.pid`; silently does nothing if the file is missing or unparsable.
fn cleanup_redis(temp_dir: &TempDir) {
    let pid = std::fs::read_to_string(temp_dir.path().join(".tt/redis.pid"))
        .ok()
        .and_then(|contents| contents.trim().parse::<i32>().ok());
    if let Some(pid) = pid {
        // SAFETY: kill(2) is safe to call with any pid; worst case is ESRCH.
        unsafe {
            libc::kill(pid, libc::SIGKILL);
        }
    }
}
/// Append a random UUID to `prefix` so concurrently running tests never
/// collide on town names (or on the redis instances keyed by them).
fn unique_town_name(prefix: &str) -> String {
    let mut name = String::from(prefix);
    name.push('-');
    name.push_str(&Uuid::new_v4().to_string());
    name
}
/// Ask the OS for a currently-free TCP port by binding to port 0 and
/// immediately releasing the listener.
///
/// NOTE(review): inherently racy — another process may grab the port between
/// the release and its eventual use; acceptable for test fixtures.
fn reserve_unused_port() -> Result<u16, Box<dyn std::error::Error>> {
    // The temporary listener is dropped at the end of this statement,
    // freeing the port again before we return it.
    let port = std::net::TcpListener::bind("127.0.0.1:0")?.local_addr()?.port();
    Ok(port)
}
#[tokio::test]
async fn test_town_initialization() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let town_path = temp_dir.path();
let town_name = unique_town_name("test-town");
let town = Town::init(town_path, &town_name).await?;
assert!(town_path.join(".tt").exists());
assert!(town_path.join(".tt/agents").exists());
assert!(town_path.join(".tt/logs").exists());
assert!(town_path.join(".tt/tasks").exists());
assert!(town_path.join("tinytown.toml").exists());
let config = town.config();
assert_eq!(config.name, town_name);
assert_eq!(config.root, town_path);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_town_connect() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let town_path = temp_dir.path();
let town_name = unique_town_name("connect-test");
let _town1 = Town::init(town_path, &town_name).await?;
let town2 = Town::connect(town_path).await?;
let config = town2.config();
assert_eq!(config.name, town_name);
drop(town2);
drop(_town1);
cleanup_redis(&temp_dir);
Ok(())
}
/// The reserved names "supervisor" and "conductor" (case-insensitive) must
/// all resolve to the singleton supervisor identity even when no agent was
/// ever spawned, and a message sent to it must be visible via either alias.
#[tokio::test]
async fn test_supervisor_aliases_resolve_without_spawned_agent()
-> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("supervisor-alias-test").await?;
    // Self-addressed informational message into the supervisor inbox.
    let msg = Message::new(
        AgentId::supervisor(),
        AgentId::supervisor(),
        MessageType::Informational {
            summary: "Human decision needed".to_string(),
        },
    );
    town.channel().send(&msg).await?;
    // All alias spellings resolve to the same well-known id.
    assert_eq!(town.agent("supervisor").await?.id(), AgentId::supervisor());
    assert_eq!(town.agent("conductor").await?.id(), AgentId::supervisor());
    assert_eq!(town.agent("Conductor").await?.id(), AgentId::supervisor());
    // Both aliases open the same inbox and see the single message.
    let supervisor_inbox = tinytown::MessageService::get_inbox(&town, "supervisor").await?;
    let conductor_inbox = tinytown::MessageService::get_inbox(&town, "conductor").await?;
    assert_eq!(supervisor_inbox.total_messages, 1);
    assert_eq!(conductor_inbox.total_messages, 1);
    assert_eq!(supervisor_inbox.agent_id, AgentId::supervisor());
    assert_eq!(conductor_inbox.agent_id, AgentId::supervisor());
    assert_eq!(
        supervisor_inbox.messages[0].summary,
        "Human decision needed"
    );
    assert_eq!(conductor_inbox.messages[0].summary, "Human decision needed");
    Ok(())
}
#[tokio::test]
async fn test_reserved_supervisor_names_cannot_be_spawned() -> Result<(), Box<dyn std::error::Error>>
{
let town = create_test_town("supervisor-reserved-name-test").await?;
assert!(matches!(
town.spawn_agent("supervisor", "claude").await,
Err(tinytown::Error::Config(_))
));
assert!(matches!(
town.spawn_agent("conductor", "claude").await,
Err(tinytown::Error::Config(_))
));
assert!(town.spawn_agent("conductor-helper", "claude").await.is_ok());
Ok(())
}
/// A freshly spawned worker gets its own id and persisted initial state.
#[tokio::test]
async fn test_agent_spawn() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("agent-spawn-test").await?;
    let handle = town.spawn_agent("worker-1", "claude").await?;
    // The worker's id must be distinct from the supervisor's singleton id.
    assert_ne!(handle.id(), AgentId::supervisor());
    // Initial persisted state: named, typed, Starting, zero completions.
    let agent = handle.state().await?.expect("spawned agent has state");
    assert_eq!(agent.name, "worker-1");
    assert_eq!(agent.cli, "claude");
    assert_eq!(agent.agent_type, AgentType::Worker);
    assert_eq!(agent.state, AgentState::Starting);
    assert_eq!(agent.tasks_completed, 0);
    Ok(())
}
/// Spawning several agents mints distinct ids and persists state for each.
#[tokio::test]
async fn test_multiple_agents_spawn() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("multi-agent-test").await?;
    let handles = [
        town.spawn_agent("worker-1", "claude").await?,
        town.spawn_agent("worker-2", "gemini").await?,
        town.spawn_agent("worker-3", "claude").await?,
    ];
    // Pairwise-distinct ids.
    assert_ne!(handles[0].id(), handles[1].id());
    assert_ne!(handles[1].id(), handles[2].id());
    assert_ne!(handles[0].id(), handles[2].id());
    // Every agent has persisted state.
    for handle in &handles {
        assert!(handle.state().await?.is_some());
    }
    Ok(())
}
/// Agent state written through the channel round-trips intact.
#[tokio::test]
async fn test_agent_state_update() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("agent-state-test").await?;
    let _handle = town.spawn_agent("worker-1", "claude").await?;
    // NOTE(review): this record carries its own fresh id (it is NOT the
    // spawned agent above); the test exercises the set/get round-trip only.
    let mut record = Agent::new("worker-1", "claude", AgentType::Worker);
    record.state = AgentState::Idle;
    record.tasks_completed = 5;
    town.channel().set_agent_state(&record).await?;
    let fetched = town
        .channel()
        .get_agent_state(record.id)
        .await?
        .expect("state was just written");
    assert_eq!(fetched.state, AgentState::Idle);
    assert_eq!(fetched.tasks_completed, 5);
    Ok(())
}
/// One sent ping lands as exactly one inbox entry.
#[tokio::test]
async fn test_message_send() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("message-send-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let ping = Message::new(AgentId::supervisor(), agent.id(), MessageType::Ping);
    town.channel().send(&ping).await?;
    assert_eq!(agent.inbox_len().await?, 1);
    Ok(())
}
/// A queued message comes back with its identity and addressing intact.
#[tokio::test]
async fn test_message_receive() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("message-receive-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let recipient = agent.id();
    let sent = Message::new(AgentId::supervisor(), recipient, MessageType::Ping);
    town.channel().send(&sent).await?;
    let got = town
        .channel()
        .try_receive(recipient)
        .await?
        .expect("message was queued");
    assert_eq!(got.id, sent.id);
    assert_eq!(got.from, AgentId::supervisor());
    assert_eq!(got.to, recipient);
    Ok(())
}
/// Higher-priority messages are delivered before lower-priority ones,
/// regardless of send order.
#[tokio::test]
async fn test_message_priority() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("message-priority-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let to = agent.id();
    let low =
        Message::new(AgentId::supervisor(), to, MessageType::Ping).with_priority(Priority::Low);
    let high =
        Message::new(AgentId::supervisor(), to, MessageType::Pong).with_priority(Priority::High);
    // Low is sent first, but high must still come out first.
    town.channel().send(&low).await?;
    town.channel().send(&high).await?;
    let delivered = town.channel().try_receive(to).await?.unwrap();
    assert_eq!(delivered.id, high.id);
    Ok(())
}
/// try_receive returns None on an empty inbox and the queued message
/// otherwise — without blocking in either case.
#[tokio::test]
async fn test_message_try_receive() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("message-try-receive-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let to = agent.id();
    // Empty inbox: no message, no blocking.
    assert!(town.channel().try_receive(to).await?.is_none());
    let ping = Message::new(AgentId::supervisor(), to, MessageType::Ping);
    town.channel().send(&ping).await?;
    // Non-empty inbox: the queued message is returned by id.
    let got = town.channel().try_receive(to).await?;
    assert!(got.is_some());
    assert_eq!(got.unwrap().id, ping.id);
    Ok(())
}
/// with_correlation stamps the request's id onto a response so callers can
/// pair replies with their requests. (Checked locally; nothing is sent.)
#[tokio::test]
async fn test_message_correlation() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("message-correlation-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let worker = agent.id();
    let request = Message::new(AgentId::supervisor(), worker, MessageType::StatusRequest);
    let response = Message::new(
        worker,
        AgentId::supervisor(),
        MessageType::StatusResponse {
            state: "idle".to_string(),
            current_task: None,
        },
    )
    .with_correlation(request.id);
    assert_eq!(response.correlation_id, Some(request.id));
    Ok(())
}
/// A brand-new task is pending, unassigned, and carries no result or
/// completion timestamp.
#[tokio::test]
async fn test_task_creation() -> Result<(), Box<dyn std::error::Error>> {
    let fresh = Task::new("Fix the bug in auth.rs");
    assert_eq!(fresh.description, "Fix the bug in auth.rs");
    assert_eq!(fresh.state, TaskState::Pending);
    assert!(fresh.assigned_to.is_none());
    assert!(fresh.result.is_none());
    assert!(fresh.completed_at.is_none());
    Ok(())
}
/// Assigning a task through an agent persists it with the Assigned state and
/// the correct assignee.
#[tokio::test]
async fn test_task_assignment() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("task-assignment-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let mut task = Task::new("Implement feature X");
    task.assign(agent.id());
    let task_id = agent.assign(task).await?;
    let stored = town
        .channel()
        .get_task(task_id)
        .await?
        .expect("assigned task is persisted");
    assert_eq!(stored.description, "Implement feature X");
    assert_eq!(stored.state, TaskState::Assigned);
    assert_eq!(stored.assigned_to, Some(agent.id()));
    Ok(())
}
/// Several tasks assigned to one agent get distinct ids that each resolve
/// back to their own stored record.
#[tokio::test]
async fn test_multiple_task_assignment() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("multi-task-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let descriptions = ["Task 1", "Task 2", "Task 3"];
    // Assign all three and remember each returned id.
    let mut ids = Vec::with_capacity(descriptions.len());
    for description in descriptions {
        ids.push(agent.assign(Task::new(description)).await?);
    }
    // Each id resolves to the task it was minted for.
    for (id, expected) in ids.iter().zip(descriptions) {
        let stored = town.channel().get_task(*id).await?;
        assert!(stored.is_some());
        assert_eq!(stored.unwrap().description, expected);
    }
    Ok(())
}
/// Walk a task through Pending -> Assigned -> Running -> Completed, checking
/// the timestamps and result stamped along the way.
#[tokio::test]
async fn test_task_state_transitions() -> Result<(), Box<dyn std::error::Error>> {
    // The town is only created for its environment side effects.
    let _town = create_test_town("task-state-test").await?;
    let mut task = Task::new("Test task");
    assert_eq!(task.state, TaskState::Pending);
    // Pending -> Assigned records the assignee.
    let worker = AgentId::new();
    task.assign(worker);
    assert_eq!(task.state, TaskState::Assigned);
    assert_eq!(task.assigned_to, Some(worker));
    // Assigned -> Running stamps started_at.
    task.start();
    assert_eq!(task.state, TaskState::Running);
    assert!(task.started_at.is_some());
    // Running -> Completed stamps result and completed_at.
    task.complete("Task completed successfully");
    assert_eq!(task.state, TaskState::Completed);
    assert_eq!(task.result, Some("Task completed successfully".to_string()));
    assert!(task.completed_at.is_some());
    Ok(())
}
/// fail() puts a running task into Failed, recording the error text as the
/// result and stamping the completion time.
#[tokio::test]
async fn test_task_failure() -> Result<(), Box<dyn std::error::Error>> {
    let mut doomed = Task::new("Failing task");
    doomed.assign(AgentId::new());
    doomed.start();
    doomed.fail("Connection timeout");
    assert_eq!(doomed.state, TaskState::Failed);
    assert_eq!(doomed.result, Some("Connection timeout".to_string()));
    assert!(doomed.completed_at.is_some());
    Ok(())
}
/// with_tags stores every given tag as an owned string.
#[tokio::test]
async fn test_task_tags() -> Result<(), Box<dyn std::error::Error>> {
    let tagged = Task::new("Implement API endpoint").with_tags(vec!["backend", "api", "urgent"]);
    assert_eq!(tagged.tags.len(), 3);
    for tag in ["backend", "api", "urgent"] {
        assert!(tagged.tags.contains(&tag.to_string()));
    }
    Ok(())
}
/// with_parent links a subtask to its parent's id.
#[tokio::test]
async fn test_task_hierarchy() -> Result<(), Box<dyn std::error::Error>> {
    let parent = TaskId::new();
    let child = Task::new("Subtask").with_parent(parent);
    assert_eq!(child.parent_id, Some(parent));
    Ok(())
}
/// A task written with set_task reads back with description, state, and
/// assignee preserved.
#[tokio::test]
async fn test_task_persistence() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("task-persistence-test").await?;
    let assignee = AgentId::new();
    let mut task = Task::new("Persistent task");
    task.assign(assignee);
    // Round-trip through the channel store.
    town.channel().set_task(&task).await?;
    let fetched = town
        .channel()
        .get_task(task.id)
        .await?
        .expect("task was just stored");
    assert_eq!(fetched.description, "Persistent task");
    assert_eq!(fetched.state, TaskState::Assigned);
    assert_eq!(fetched.assigned_to, Some(assignee));
    Ok(())
}
/// Even when the task *description* embeds a UUID-looking string, the tracked
/// "current task" must be keyed by the real TaskId — not by text scraped out
/// of the description.
#[tokio::test]
async fn test_current_task_resolves_real_task_id_over_description_uuid()
-> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("current-task-real-id-test").await?;
    let agent = town.spawn_agent("frontend", "claude").await?;
    // Decoy UUID that appears only inside the description text.
    let mission_uuid = "c7d2e4dd-30e5-48e5-8bfc-95d5f14b13bf";
    let mut task = Task::new(format!(
        "Mission {}: implement GitHub issue #5 and keep the UI stable",
        mission_uuid
    ));
    task.assign(agent.id());
    let task_id = agent.assign(task).await?;
    // Sanity: the real id differs from the decoy embedded in the description.
    assert_ne!(task_id.to_string(), mission_uuid);
    TaskService::set_current_for_agent(town.channel(), agent.id(), task_id).await?;
    let current = TaskService::current_for_agent(town.channel(), agent.id())
        .await?
        .expect("tracked current task");
    // Resolution must return the task stored under the real id, with the
    // decoy still present only in the description.
    assert_eq!(current.id, task_id);
    assert!(current.description.contains(mission_uuid));
    assert_eq!(current.assigned_to, Some(agent.id()));
    assert_eq!(current.state, TaskState::Assigned);
    Ok(())
}
/// Completing the agent's tracked current task must: mark the task Completed
/// with its result, clear the agent's current_task pointer, and bump the
/// agent's tasks_completed counter — all visible from the stored state.
#[tokio::test]
async fn test_complete_clears_current_task_and_increments_agent_stats()
-> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("current-task-complete-test").await?;
    let agent = town.spawn_agent("backend", "claude").await?;
    let mut task = Task::new("Implement API endpoint");
    task.assign(agent.id());
    let task_id = agent.assign(task).await?;
    // Track the task as the agent's current work item.
    TaskService::set_current_for_agent(town.channel(), agent.id(), task_id).await?;
    let completion = TaskService::complete(
        town.channel(),
        task_id,
        Some("Implemented API endpoint".to_string()),
    )
    .await?
    .expect("completed task");
    // The completion summary reports the terminal state and bookkeeping.
    assert_eq!(completion.task.id, task_id);
    assert_eq!(completion.task.state, TaskState::Completed);
    assert_eq!(completion.result, "Implemented API endpoint");
    assert!(completion.cleared_current_task);
    assert_eq!(completion.tasks_completed, Some(1));
    // The stored agent state reflects the cleared pointer and the counter.
    let agent_state = town
        .channel()
        .get_agent_state(agent.id())
        .await?
        .expect("agent state");
    assert_eq!(agent_state.current_task, None);
    assert_eq!(agent_state.tasks_completed, 1);
    let current = TaskService::current_for_agent(town.channel(), agent.id()).await?;
    assert!(current.is_none());
    // The task record itself is terminal with the result attached.
    let stored_task = town
        .channel()
        .get_task(task_id)
        .await?
        .expect("stored completed task");
    assert_eq!(stored_task.state, TaskState::Completed);
    assert_eq!(
        stored_task.result,
        Some("Implemented API endpoint".to_string())
    );
    Ok(())
}
/// End-to-end smoke test: assign a task, then send a status request, and
/// verify both show up in the stores.
#[tokio::test]
async fn test_complete_workflow() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("complete-workflow-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let mut task = Task::new("Implement new feature");
    task.assign(agent.id());
    let task_id = agent.assign(task).await?;
    let stored_task = town.channel().get_task(task_id).await?;
    assert!(stored_task.is_some());
    assert_eq!(stored_task.unwrap().state, TaskState::Assigned);
    agent.send(MessageType::StatusRequest).await?;
    // Two messages expected: presumably agent.assign() above also delivers a
    // task-assignment message in addition to the StatusRequest — TODO confirm.
    let inbox_len = agent.inbox_len().await?;
    assert_eq!(inbox_len, 2);
    Ok(())
}
/// Drive one agent id through Starting -> Idle -> Working via direct state
/// writes and observe each step through the handle.
#[tokio::test]
async fn test_agent_state_transitions() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("agent-transitions-test").await?;
    let handle = town.spawn_agent("worker-1", "claude").await?;
    // Spawn leaves the agent in Starting.
    assert_eq!(handle.state().await?.unwrap().state, AgentState::Starting);
    // Reuse the spawned agent's id for the forged state record.
    let mut record = Agent::new("worker-1", "claude", AgentType::Worker);
    record.id = handle.id();
    // Starting -> Idle.
    record.state = AgentState::Idle;
    town.channel().set_agent_state(&record).await?;
    assert_eq!(handle.state().await?.unwrap().state, AgentState::Idle);
    // Idle -> Working.
    record.state = AgentState::Working;
    town.channel().set_agent_state(&record).await?;
    assert_eq!(handle.state().await?.unwrap().state, AgentState::Working);
    Ok(())
}
/// Full task lifecycle, persisting and re-reading the record at each step.
#[tokio::test]
async fn test_task_lifecycle_with_agent() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("task-lifecycle-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let mut task = Task::new("Complete task");
    assert_eq!(task.state, TaskState::Pending);
    // Pending -> Assigned, persisted.
    task.assign(agent.id());
    assert_eq!(task.state, TaskState::Assigned);
    town.channel().set_task(&task).await?;
    // Assigned -> Running, persisted and re-read.
    task.start();
    town.channel().set_task(&task).await?;
    assert_eq!(
        town.channel().get_task(task.id).await?.unwrap().state,
        TaskState::Running
    );
    // Running -> Completed with a result, persisted and re-read.
    task.complete("Successfully completed");
    town.channel().set_task(&task).await?;
    let done = town.channel().get_task(task.id).await?.unwrap();
    assert_eq!(done.state, TaskState::Completed);
    assert_eq!(done.result, Some("Successfully completed".to_string()));
    Ok(())
}
/// Queuing three messages counts to three; draining them all leaves the
/// inbox empty.
#[tokio::test]
async fn test_message_inbox_ordering() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("inbox-ordering-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let to = agent.id();
    // Queue three custom messages with distinct payloads.
    for n in 0..3 {
        let msg = Message::new(
            AgentId::supervisor(),
            to,
            MessageType::Custom {
                kind: "test".to_string(),
                payload: format!("message-{}", n),
            },
        );
        town.channel().send(&msg).await?;
    }
    assert_eq!(agent.inbox_len().await?, 3);
    // Drain all three; each receive must yield a message.
    for _ in 0..3 {
        assert!(town.channel().try_receive(to).await?.is_some());
    }
    assert_eq!(agent.inbox_len().await?, 0);
    Ok(())
}
/// wait() must return promptly (well under a second) once the agent's stored
/// state is Idle, rather than blocking for a long timeout.
#[tokio::test]
async fn test_agent_wait_timeout() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("agent-wait-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    // Force the agent's stored state to Idle so wait() has nothing to wait for.
    let mut agent_state = Agent::new("worker-1", "claude", AgentType::Worker);
    agent_state.id = agent.id();
    agent_state.state = AgentState::Idle;
    town.channel().set_agent_state(&agent_state).await?;
    let start = std::time::Instant::now();
    agent.wait().await?;
    let elapsed = start.elapsed();
    // Generous bound to avoid flakiness on slow CI machines.
    assert!(elapsed < Duration::from_secs(1));
    Ok(())
}
/// Looking up a task id that was never stored yields None, not an error.
#[tokio::test]
async fn test_task_not_found() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("task-not-found-test").await?;
    let missing = town.channel().get_task(TaskId::new()).await?;
    assert!(missing.is_none());
    Ok(())
}
/// Looking up an unknown agent id yields None rather than an error.
#[tokio::test]
async fn test_agent_state_not_found() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("agent-state-not-found-test").await?;
    let missing = town.channel().get_agent_state(AgentId::new()).await?;
    assert!(missing.is_none());
    Ok(())
}
/// With an empty inbox, receive() blocks for the full timeout and then
/// returns None.
#[tokio::test]
async fn test_message_receive_timeout() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("message-timeout-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let timeout = Duration::from_millis(100);
    let started = std::time::Instant::now();
    let outcome = town.channel().receive(agent.id(), timeout).await?;
    assert!(outcome.is_none());
    // The call must not return before the timeout elapses.
    assert!(started.elapsed() >= timeout);
    Ok(())
}
/// Completed and Failed are terminal task states; Pending is not.
#[tokio::test]
async fn test_task_terminal_states() -> Result<(), Box<dyn std::error::Error>> {
    let mut completed = Task::new("Task 1");
    completed.complete("Done");
    assert!(completed.state.is_terminal());
    let mut failed = Task::new("Task 2");
    failed.fail("Error");
    assert!(failed.state.is_terminal());
    // A fresh (Pending) task is still in flight.
    assert!(!Task::new("Task 3").state.is_terminal());
    Ok(())
}
/// Only an Idle agent may take on new work.
#[tokio::test]
async fn test_agent_can_accept_work() -> Result<(), Box<dyn std::error::Error>> {
    assert!(AgentState::Idle.can_accept_work());
    for busy in [AgentState::Working, AgentState::Paused, AgentState::Starting] {
        assert!(!busy.can_accept_work());
    }
    Ok(())
}
#[tokio::test]
async fn test_agent_terminal_states() -> Result<(), Box<dyn std::error::Error>> {
assert!(AgentState::Stopped.is_terminal());
assert!(AgentState::Error.is_terminal());
assert!(!AgentState::Idle.is_terminal());
assert!(!AgentState::Working.is_terminal());
Ok(())
}
/// Spawning ten agents mints ten unique ids.
#[tokio::test]
async fn test_many_agents() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("many-agents-test").await?;
    let mut ids = Vec::with_capacity(10);
    for n in 0..10 {
        let handle = town.spawn_agent(&format!("worker-{}", n), "claude").await?;
        ids.push(handle.id());
    }
    // Dedup via a set; all ten ids must survive.
    let distinct: std::collections::HashSet<_> = ids.iter().collect();
    assert_eq!(distinct.len(), 10);
    Ok(())
}
/// Assigning twenty tasks yields twenty unique task ids.
#[tokio::test]
async fn test_many_tasks() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("many-tasks-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let mut ids = Vec::with_capacity(20);
    for n in 0..20 {
        ids.push(agent.assign(Task::new(format!("Task {}", n))).await?);
    }
    // Dedup via a set; all twenty ids must survive.
    let distinct: std::collections::HashSet<_> = ids.iter().collect();
    assert_eq!(distinct.len(), 20);
    Ok(())
}
/// Fifty queued messages are all counted by inbox_len.
#[tokio::test]
async fn test_many_messages() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("many-messages-test").await?;
    let agent = town.spawn_agent("worker-1", "claude").await?;
    let to = agent.id();
    for n in 0..50 {
        let msg = Message::new(
            AgentId::supervisor(),
            to,
            MessageType::Custom {
                kind: "test".to_string(),
                payload: format!("msg-{}", n),
            },
        );
        town.channel().send(&msg).await?;
    }
    assert_eq!(agent.inbox_len().await?, 50);
    Ok(())
}
/// init_tasks_file writes a tasks.toml seeded with one pending example task.
#[tokio::test]
async fn test_plan_init_tasks_file() -> Result<(), Box<dyn std::error::Error>> {
    let temp_dir = TempDir::new()?;
    tinytown::plan::init_tasks_file(temp_dir.path())?;
    assert!(temp_dir.path().join("tasks.toml").exists());
    // The seeded file loads back with the default meta and example entry.
    let plan = tinytown::plan::load_tasks_file(temp_dir.path())?;
    assert_eq!(plan.meta.description, "Task plan for this project");
    assert_eq!(plan.tasks.len(), 1);
    let example = &plan.tasks[0];
    assert_eq!(example.id, "example-1");
    assert_eq!(example.status, "pending");
    Ok(())
}
/// TasksFile round-trips through save/load with the meta block, per-task
/// agents, tags, and the parent link all preserved.
#[tokio::test]
async fn test_plan_load_save_tasks_file() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::plan::{TaskEntry, TasksFile, TasksMeta};
    let temp_dir = TempDir::new()?;
    // Fixture: two tasks where task-2 is a child of task-1.
    let tasks = TasksFile {
        meta: TasksMeta {
            description: "Test plan".to_string(),
            default_agent: Some("developer".to_string()),
        },
        tasks: vec![
            TaskEntry {
                id: "task-1".to_string(),
                description: "Build the API".to_string(),
                agent: Some("backend".to_string()),
                status: "pending".to_string(),
                tags: vec!["api".to_string(), "backend".to_string()],
                parent: None,
            },
            TaskEntry {
                id: "task-2".to_string(),
                description: "Write tests".to_string(),
                agent: Some("tester".to_string()),
                status: "pending".to_string(),
                tags: vec!["tests".to_string()],
                parent: Some("task-1".to_string()),
            },
        ],
    };
    tinytown::plan::save_tasks_file(temp_dir.path(), &tasks)?;
    // Loading back must reproduce the fields we care about.
    let loaded = tinytown::plan::load_tasks_file(temp_dir.path())?;
    assert_eq!(loaded.meta.description, "Test plan");
    assert_eq!(loaded.meta.default_agent, Some("developer".to_string()));
    assert_eq!(loaded.tasks.len(), 2);
    assert_eq!(loaded.tasks[0].id, "task-1");
    assert_eq!(loaded.tasks[0].agent, Some("backend".to_string()));
    assert_eq!(loaded.tasks[1].parent, Some("task-1".to_string()));
    Ok(())
}
/// Saving a single-task plan file and pushing it must enqueue exactly one
/// task into Redis.
#[tokio::test]
async fn test_plan_push_to_redis() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("plan-push-test").await?;
    let town_path = town.config().root.clone();
    // Seed the default file first, then overwrite it with a one-task plan.
    tinytown::plan::init_tasks_file(&town_path)?;
    let tasks = tinytown::plan::TasksFile {
        meta: tinytown::plan::TasksMeta {
            description: "Push test".to_string(),
            default_agent: None,
        },
        tasks: vec![tinytown::plan::TaskEntry {
            id: "push-task-1".to_string(),
            description: "Task to push".to_string(),
            agent: None,
            status: "pending".to_string(),
            tags: vec!["test".to_string()],
            parent: None,
        }],
    };
    tinytown::plan::save_tasks_file(&town_path, &tasks)?;
    // One pending entry in the file -> one task pushed.
    let count = tinytown::plan::push_tasks_to_redis(&town_path, town.channel()).await?;
    assert_eq!(count, 1);
    Ok(())
}
/// A new town's CLI config must be derived consistently from the global
/// config: a non-empty default CLI, a conductor CLI that falls back to the
/// default, and the built-in CLI registry populated.
#[tokio::test]
async fn test_default_cli_config() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::GlobalConfig;
    let town = create_test_town("default-cli-test").await?;
    let config = town.config();
    // Fall back to defaults if no global config file exists on this machine.
    let global = GlobalConfig::load().unwrap_or_default();
    assert!(!config.default_cli.is_empty());
    // The town's default CLI is the resolved form of the global default.
    assert_eq!(
        config.default_cli,
        config.resolve_cli_name(&global.default_cli)
    );
    // conductor_cli overrides when set; otherwise the default CLI is used.
    assert_eq!(
        config.conductor_cli_name(),
        global
            .conductor_cli
            .as_deref()
            .unwrap_or(global.default_cli.as_str())
    );
    // Built-in CLI registry entries must be present.
    assert!(config.agent_clis.contains_key("claude"));
    assert!(config.agent_clis.contains_key("auggie"));
    assert!(config.agent_clis.contains_key("codex"));
    Ok(())
}
/// A Working agent with a heartbeat older than the 120-second threshold must
/// surface in the listing as a detectable orphan.
#[tokio::test]
async fn test_detect_orphaned_agents_working_state() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("recovery-working-test").await?;
    let handle = town.spawn_agent("orphaned-worker", "claude").await?;
    // Forge a Working agent whose heartbeat is three minutes stale.
    let mut stale = Agent::new("orphaned-worker", "claude", AgentType::Worker);
    stale.id = handle.id();
    stale.state = AgentState::Working;
    stale.last_heartbeat = chrono::Utc::now() - chrono::Duration::minutes(3);
    town.channel().set_agent_state(&stale).await?;
    // Listing must show it still Working with a heartbeat beyond the threshold.
    let listed = town.list_agents().await;
    let found = listed
        .iter()
        .find(|a| a.id == handle.id())
        .expect("stale agent is listed");
    assert_eq!(found.state, AgentState::Working);
    assert!((chrono::Utc::now() - found.last_heartbeat).num_seconds() > 120);
    Ok(())
}
/// An agent stuck in Starting with a stale heartbeat keeps reporting that
/// forged state through its handle.
#[tokio::test]
async fn test_detect_orphaned_agents_starting_state() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("recovery-starting-test").await?;
    let handle = town.spawn_agent("stuck-starting", "auggie").await?;
    // Forge an agent stuck in Starting for five minutes.
    let mut stuck = Agent::new("stuck-starting", "auggie", AgentType::Worker);
    stuck.id = handle.id();
    stuck.state = AgentState::Starting;
    stuck.last_heartbeat = chrono::Utc::now() - chrono::Duration::minutes(5);
    town.channel().set_agent_state(&stuck).await?;
    let state = handle.state().await?;
    assert!(state.is_some());
    assert_eq!(state.unwrap().state, AgentState::Starting);
    Ok(())
}
/// Agents parked in Stopped or Paused are intentionally inactive: even with a
/// ten-minute-old heartbeat they must not count as recoverable orphans.
#[tokio::test]
async fn test_non_active_agents_not_orphaned() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("recovery-non-active-test").await?;
    let stopped_handle = town.spawn_agent("stopped-agent", "claude").await?;
    let paused_handle = town.spawn_agent("paused-agent", "claude").await?;
    // Forge a Stopped agent with a very stale heartbeat.
    let mut stopped_agent = Agent::new("stopped-agent", "claude", AgentType::Worker);
    stopped_agent.id = stopped_handle.id();
    stopped_agent.state = AgentState::Stopped;
    stopped_agent.last_heartbeat = chrono::Utc::now() - chrono::Duration::minutes(10);
    town.channel().set_agent_state(&stopped_agent).await?;
    // And a Paused agent, equally stale.
    let mut paused_agent = Agent::new("paused-agent", "claude", AgentType::Worker);
    paused_agent.id = paused_handle.id();
    paused_agent.state = AgentState::Paused;
    paused_agent.last_heartbeat = chrono::Utc::now() - chrono::Duration::minutes(10);
    town.channel().set_agent_state(&paused_agent).await?;
    // No listed agent may be in a state recovery considers active.
    let agents = town.list_agents().await;
    for agent in &agents {
        let is_active_state = matches!(
            agent.state,
            AgentState::Working | AgentState::Starting | AgentState::Idle | AgentState::Draining
        );
        assert!(
            !is_active_state,
            "Agent {} should not be in a recoverable state",
            agent.name
        );
    }
    Ok(())
}
/// A stale Working agent can be driven to Stopped via a direct state write,
/// and the handle observes the transition.
#[tokio::test]
async fn test_recover_agent_to_stopped() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("recovery-transition-test").await?;
    let handle = town.spawn_agent("recoverable", "claude").await?;
    // Forge a Working agent with a three-minute-stale heartbeat.
    let mut record = Agent::new("recoverable", "claude", AgentType::Worker);
    record.id = handle.id();
    record.state = AgentState::Working;
    record.last_heartbeat = chrono::Utc::now() - chrono::Duration::minutes(3);
    town.channel().set_agent_state(&record).await?;
    assert_eq!(handle.state().await?.unwrap().state, AgentState::Working);
    // "Recover" by writing the agent down to Stopped.
    record.state = AgentState::Stopped;
    town.channel().set_agent_state(&record).await?;
    assert_eq!(handle.state().await?.unwrap().state, AgentState::Stopped);
    Ok(())
}
/// RecoveryService must treat an Idle agent with a stale heartbeat as an
/// orphan: report it as recovered (with its pre-recovery state) and drive the
/// stored state to Stopped.
#[tokio::test]
async fn test_recovery_service_recovers_stale_idle_agent() -> Result<(), Box<dyn std::error::Error>>
{
    let town = create_test_town("recovery-idle-service-test").await?;
    let agent_handle = town.spawn_agent("recoverable-idle", "claude").await?;
    let agent_id = agent_handle.id();
    // Forge an Idle agent whose heartbeat is three minutes stale.
    let mut agent = Agent::new("recoverable-idle", "claude", AgentType::Worker);
    agent.id = agent_id;
    agent.state = AgentState::Idle;
    agent.last_heartbeat = chrono::Utc::now() - chrono::Duration::minutes(3);
    town.channel().set_agent_state(&agent).await?;
    let recover_result = tinytown::RecoveryService::recover(&town, town.root()).await?;
    // Exactly one agent recovered, reported with its pre-recovery Idle state.
    assert_eq!(recover_result.agents_recovered, 1);
    assert_eq!(recover_result.recovered_agents.len(), 1);
    assert_eq!(recover_result.recovered_agents[0].id, agent_id);
    assert_eq!(recover_result.recovered_agents[0].state, AgentState::Idle);
    // The stored state has been driven to Stopped.
    let recovered_state = agent_handle.state().await?;
    assert_eq!(recovered_state.unwrap().state, AgentState::Stopped);
    Ok(())
}
/// RecoveryService must leave an Idle agent alone when its heartbeat is
/// within the freshness window.
#[tokio::test]
async fn test_recovery_service_skips_healthy_idle_agent() -> Result<(), Box<dyn std::error::Error>>
{
    let town = create_test_town("recovery-idle-healthy-test").await?;
    let handle = town.spawn_agent("healthy-idle", "claude").await?;
    // Idle with a 30-second-old heartbeat: comfortably fresh.
    let mut healthy = Agent::new("healthy-idle", "claude", AgentType::Worker);
    healthy.id = handle.id();
    healthy.state = AgentState::Idle;
    healthy.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(30);
    town.channel().set_agent_state(&healthy).await?;
    // Nothing recovered, and the agent stays Idle.
    let outcome = tinytown::RecoveryService::recover(&town, town.root()).await?;
    assert_eq!(outcome.agents_recovered, 0);
    assert_eq!(handle.state().await?.unwrap().state, AgentState::Idle);
    Ok(())
}
/// With recent heartbeats, active agents (Working/Idle) must not trip the
/// 120-second orphan-detection threshold.
#[tokio::test]
async fn test_no_orphans_when_healthy() -> Result<(), Box<dyn std::error::Error>> {
    let town = create_test_town("recovery-healthy-test").await?;
    let handle1 = town.spawn_agent("healthy-1", "claude").await?;
    let handle2 = town.spawn_agent("healthy-2", "auggie").await?;
    // Working agent with a 30-second-old heartbeat — fresh enough.
    let mut agent1 = Agent::new("healthy-1", "claude", AgentType::Worker);
    agent1.id = handle1.id();
    agent1.state = AgentState::Working;
    agent1.last_heartbeat = chrono::Utc::now() - chrono::Duration::seconds(30);
    town.channel().set_agent_state(&agent1).await?;
    // Idle agent heartbeating right now.
    let mut agent2 = Agent::new("healthy-2", "auggie", AgentType::Worker);
    agent2.id = handle2.id();
    agent2.state = AgentState::Idle;
    agent2.last_heartbeat = chrono::Utc::now();
    town.channel().set_agent_state(&agent2).await?;
    // Re-implement the orphan scan: active state + heartbeat older than 120 s.
    let agents = town.list_agents().await;
    let mut orphan_count = 0;
    for agent in &agents {
        let is_active = matches!(
            agent.state,
            AgentState::Working | AgentState::Starting | AgentState::Idle | AgentState::Draining
        );
        if is_active {
            let heartbeat_age = chrono::Utc::now() - agent.last_heartbeat;
            if heartbeat_age.num_seconds() > 120 {
                orphan_count += 1;
            }
        }
    }
    assert_eq!(
        orphan_count, 0,
        "No agents should be orphaned when heartbeats are recent"
    );
    Ok(())
}
/// The towns registry file parses as a TOML array-of-tables: each [[towns]]
/// entry carries a path and a name.
#[tokio::test]
async fn test_towns_toml_format() -> Result<(), Box<dyn std::error::Error>> {
    let toml_content = r#"
[[towns]]
path = "/path/to/town1"
name = "my-town"
[[towns]]
path = "/path/to/town2"
name = "another-town"
"#;
    // Local mirror of the registry schema, deserialized with serde.
    #[derive(Debug, Clone, serde::Deserialize)]
    struct TownEntry {
        path: String,
        name: String,
    }
    #[derive(Debug, Clone, serde::Deserialize)]
    struct TownsFile {
        towns: Vec<TownEntry>,
    }
    let parsed: TownsFile = toml::from_str(toml_content)?;
    // Both entries survive in order with their fields intact.
    assert_eq!(parsed.towns.len(), 2);
    assert_eq!(parsed.towns[0].name, "my-town");
    assert_eq!(parsed.towns[0].path, "/path/to/town1");
    assert_eq!(parsed.towns[1].name, "another-town");
    assert_eq!(parsed.towns[1].path, "/path/to/town2");
    Ok(())
}
#[tokio::test]
async fn test_empty_towns_toml() -> Result<(), Box<dyn std::error::Error>> {
    // A missing/empty registry must parse as "no towns" rather than error,
    // and an explicit empty array must behave the same way.
    #[allow(dead_code)]
    #[derive(Debug, Clone, serde::Deserialize)]
    struct TownEntry {
        path: String,
        name: String,
    }
    #[derive(Debug, Clone, serde::Deserialize, Default)]
    struct TownsFile {
        #[serde(default)]
        towns: Vec<TownEntry>,
    }
    // Completely empty input: fall back to the default (empty) registry.
    let from_empty: TownsFile = toml::from_str("").unwrap_or_default();
    assert!(from_empty.towns.is_empty());
    // An explicit empty array parses without needing the fallback.
    let from_explicit: TownsFile = toml::from_str("towns = []")?;
    assert!(from_explicit.towns.is_empty());
    Ok(())
}
#[tokio::test]
async fn test_global_config_dir_constant() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::global_config::GLOBAL_CONFIG_DIR;
    // The directory name is part of the on-disk contract; pin it so an
    // accidental rename is caught by CI.
    assert_eq!(".tt", GLOBAL_CONFIG_DIR);
    Ok(())
}
#[tokio::test]
async fn test_global_config_defaults() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::global_config::GlobalConfig;
    // A freshly-constructed global config carries no per-agent CLI entries.
    let defaults = GlobalConfig::default();
    assert!(defaults.agent_clis.is_empty());
    // Parsing a minimal file populates both CLI fields.
    let parsed: GlobalConfig = toml::from_str(
        r#"
default_cli = "claude"
conductor_cli = "codex"
"#,
    )?;
    assert_eq!(parsed.default_cli, "claude");
    assert_eq!(parsed.conductor_cli.as_deref(), Some("codex"));
    Ok(())
}
#[tokio::test]
async fn test_town_config_supports_separate_conductor_cli() -> Result<(), Box<dyn std::error::Error>>
{
    use tinytown::Config;
    // A town config may pin a different CLI for the conductor than for
    // regular workers; both must survive a load round-trip.
    let temp_dir = TempDir::new()?;
    let town_path = temp_dir.path();
    std::fs::write(
        town_path.join("tinytown.toml"),
        r#"
name = "split-cli-test"
default_cli = "codex-mini"
conductor_cli = "codex"
"#,
    )?;
    let loaded = Config::load(town_path)?;
    assert_eq!(loaded.default_cli, "codex-mini");
    assert_eq!(loaded.conductor_cli.as_deref(), Some("codex"));
    // conductor_cli_name() resolves to the explicit conductor CLI when set.
    assert_eq!(loaded.conductor_cli_name(), "codex");
    Ok(())
}
#[tokio::test]
async fn test_town_init_creates_structure() -> Result<(), Box<dyn std::error::Error>> {
    // Town::init must lay down the full .tt directory tree plus a config
    // file (TOML or JSON) at the town root.
    let temp_dir = TempDir::new()?;
    let root = temp_dir.path();
    let town = Town::init(root, &unique_town_name("init-structure-test")).await?;
    for sub in [".tt", ".tt/agents", ".tt/logs", ".tt/tasks"] {
        assert!(root.join(sub).exists());
    }
    assert!(root.join("tinytown.toml").exists() || root.join("tinytown.json").exists());
    // Tear the town down before reaping its Redis child process.
    drop(town);
    cleanup_redis(&temp_dir);
    Ok(())
}
#[tokio::test]
async fn test_town_status_info() -> Result<(), Box<dyn std::error::Error>> {
    // Spawned agents show up in the listing, and the config retains the
    // uniquified town name it was created with.
    let town = create_test_town("status-info-test").await?;
    let _worker_a = town.spawn_agent("worker-1", "claude").await?;
    let _worker_b = town.spawn_agent("worker-2", "auggie").await?;
    assert_eq!(town.list_agents().await.len(), 2);
    assert!(town.config().name.starts_with("status-info-test-"));
    Ok(())
}
#[tokio::test]
async fn test_town_agent_activity_report() -> Result<(), Box<dyn std::error::Error>> {
let town = create_test_town("activity-report-test").await?;
let active_handle = town.spawn_agent("active-worker", "claude").await?;
let idle_handle = town.spawn_agent("idle-worker", "claude").await?;
let mut active_agent = Agent::new("active-worker", "claude", AgentType::Worker);
active_agent.id = active_handle.id();
active_agent.state = AgentState::Working;
active_agent.last_heartbeat = chrono::Utc::now();
town.channel().set_agent_state(&active_agent).await?;
let mut idle_agent = Agent::new("idle-worker", "claude", AgentType::Worker);
idle_agent.id = idle_handle.id();
idle_agent.state = AgentState::Idle;
idle_agent.last_heartbeat = chrono::Utc::now();
town.channel().set_agent_state(&idle_agent).await?;
let agents = town.list_agents().await;
let working_count = agents
.iter()
.filter(|a| a.state == AgentState::Working)
.count();
let idle_count = agents
.iter()
.filter(|a| a.state == AgentState::Idle)
.count();
assert_eq!(working_count, 1);
assert_eq!(idle_count, 1);
Ok(())
}
#[tokio::test]
async fn test_backlog_add_list_claim() -> Result<(), Box<dyn std::error::Error>> {
    // The backlog behaves as a FIFO queue with positional listing plus
    // targeted removal by task id.
    let town = create_test_town("backlog-test").await?;
    let bus = town.channel();

    // Starts out empty.
    assert!(bus.backlog_list().await?.is_empty());
    assert_eq!(bus.backlog_len().await?, 0);

    // Push three tasks; insertion order must be preserved in the listing.
    let first = TaskId::new();
    let second = TaskId::new();
    let third = TaskId::new();
    for task in [first, second, third] {
        bus.backlog_push(task).await?;
    }
    assert_eq!(bus.backlog_len().await?, 3);
    let listed = bus.backlog_list().await?;
    assert_eq!(listed, vec![first, second, third]);

    // Popping claims from the front.
    let popped = bus.backlog_pop().await?;
    assert!(popped.is_some());
    assert_eq!(popped.unwrap(), first);
    assert_eq!(bus.backlog_len().await?, 2);

    // Targeted removal works anywhere in the queue.
    assert!(bus.backlog_remove(third).await?);
    assert_eq!(bus.backlog_len().await?, 1);
    let remaining = bus.backlog_list().await?;
    assert_eq!(remaining, vec![second]);

    // Drain the last entry, then confirm popping an empty backlog is None.
    assert_eq!(bus.backlog_pop().await?, Some(second));
    assert_eq!(bus.backlog_len().await?, 0);
    assert!(bus.backlog_pop().await?.is_none());
    Ok(())
}
#[tokio::test]
async fn test_reclaim_from_dead_agent() -> Result<(), Box<dyn std::error::Error>> {
    // Messages queued for an agent that has stopped can be drained from its
    // inbox in their original delivery order.
    let town = create_test_town("reclaim-test").await?;
    let bus = town.channel();
    let handle = town.spawn_agent("dead-worker", "claude").await?;
    let worker = handle.id();

    // Queue a ping plus two task assignments.
    let ping = Message::new(AgentId::supervisor(), worker, MessageType::Ping);
    let task_a = Message::new(
        AgentId::supervisor(),
        worker,
        MessageType::TaskAssign {
            task_id: "task-1".to_string(),
        },
    );
    let task_b = Message::new(
        AgentId::supervisor(),
        worker,
        MessageType::TaskAssign {
            task_id: "task-2".to_string(),
        },
    );
    for msg in [&ping, &task_a, &task_b] {
        bus.send(msg).await?;
    }
    assert_eq!(handle.inbox_len().await?, 3);

    // Mark the agent stopped, then reclaim its pending messages.
    let mut record = Agent::new("dead-worker", "claude", AgentType::Worker);
    record.id = worker;
    record.state = AgentState::Stopped;
    bus.set_agent_state(&record).await?;

    let drained = bus.drain_inbox(worker).await?;
    assert_eq!(drained.len(), 3);
    // Drained order matches send order.
    assert_eq!(drained[0].id, ping.id);
    assert_eq!(drained[1].id, task_a.id);
    assert_eq!(drained[2].id, task_b.id);
    // Draining empties the inbox.
    assert_eq!(handle.inbox_len().await?, 0);
    Ok(())
}
#[tokio::test]
async fn test_reclaim_to_backlog() -> Result<(), Box<dyn std::error::Error>> {
let town = create_test_town("reclaim-backlog-test").await?;
let channel = town.channel();
let agent_handle = town.spawn_agent("failing-worker", "claude").await?;
let agent_id = agent_handle.id();
let task1_id = TaskId::new();
let task2_id = TaskId::new();
let msg1 = Message::new(
AgentId::supervisor(),
agent_id,
MessageType::TaskAssign {
task_id: task1_id.to_string(),
},
);
let msg2 = Message::new(
AgentId::supervisor(),
agent_id,
MessageType::TaskAssign {
task_id: task2_id.to_string(),
},
);
channel.send(&msg1).await?;
channel.send(&msg2).await?;
let mut agent = Agent::new("failing-worker", "claude", AgentType::Worker);
agent.id = agent_id;
agent.state = AgentState::Error;
channel.set_agent_state(&agent).await?;
let drained = channel.drain_inbox(agent_id).await?;
for msg in &drained {
if let MessageType::TaskAssign { task_id: task_str } = &msg.msg_type
&& let Ok(task_id) = task_str.parse::<TaskId>()
{
channel.backlog_push(task_id).await?;
}
}
let backlog = channel.backlog_list().await?;
assert_eq!(backlog.len(), 2);
assert_eq!(backlog[0], task1_id);
assert_eq!(backlog[1], task2_id);
Ok(())
}
#[tokio::test]
async fn test_move_message_to_inbox() -> Result<(), Box<dyn std::error::Error>> {
    // A drained message can be re-homed into another agent's inbox and then
    // received there with its payload intact.
    let town = create_test_town("move-message-test").await?;
    let bus = town.channel();
    let source = town.spawn_agent("worker-1", "claude").await?;
    let target = town.spawn_agent("worker-2", "claude").await?;

    let original_msg = Message::new(
        AgentId::supervisor(),
        source.id(),
        MessageType::TaskAssign {
            task_id: "important-task".to_string(),
        },
    );
    bus.send(&original_msg).await?;
    assert_eq!(source.inbox_len().await?, 1);
    assert_eq!(target.inbox_len().await?, 0);

    // Drain from the first inbox and transplant into the second.
    let drained = bus.drain_inbox(source.id()).await?;
    assert_eq!(drained.len(), 1);
    bus.move_message_to_inbox(&drained[0], target.id()).await?;
    assert_eq!(source.inbox_len().await?, 0);
    assert_eq!(target.inbox_len().await?, 1);

    // The payload survives the move unchanged.
    let received = bus.try_receive(target.id()).await?;
    assert!(received.is_some());
    match received.unwrap().msg_type {
        MessageType::TaskAssign { task_id } => assert_eq!(task_id, "important-task"),
        _ => panic!("Expected TaskAssign message type"),
    }
    Ok(())
}
#[tokio::test]
async fn test_redis_url_unix_socket() -> Result<(), Box<dyn std::error::Error>> {
use tinytown::Config;
use tinytown::config::RedisConfig;
let temp_dir = TempDir::new()?;
let mut config = Config::new("test-town", temp_dir.path());
config.redis = RedisConfig {
url: None,
use_socket: true,
socket_path: "redis.sock".to_string(),
host: "127.0.0.1".to_string(),
port: 6379,
persist: false,
aof_path: "redis.aof".to_string(),
password: None,
tls_enabled: false,
tls_cert: None,
tls_key: None,
tls_ca_cert: None,
bind: "127.0.0.1".to_string(),
};
assert!(config.redis.use_socket);
let url = config.redis_url();
assert!(
url.starts_with("unix://"),
"Expected unix:// URL, got: {}",
url
);
assert!(
url.contains("redis.sock"),
"Expected socket path in URL, got: {}",
url
);
Ok(())
}
/// TCP Redis with no password must render as a bare `redis://host:port` URL.
/// Runs serially because it clears the process-wide password env var.
#[tokio::test]
#[serial_test::serial]
async fn test_redis_url_tcp_no_password() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    use tinytown::config::RedisConfig;
    // Make sure no password from a previous test leaks into redis_url().
    // `remove_var` is unsafe in edition 2024 because env mutation is not
    // thread-safe; the #[serial] attribute above guards against races.
    unsafe {
        std::env::remove_var("TINYTOWN_REDIS_PASSWORD");
    }
    let temp_dir = TempDir::new()?;
    let mut config = Config::new("test-town", temp_dir.path());
    // Non-default port 6380 proves the URL reflects the configured port.
    config.redis = RedisConfig {
        use_socket: false,
        host: "127.0.0.1".to_string(),
        port: 6380,
        password: None,
        tls_enabled: false,
        ..Default::default()
    };
    let url = config.redis_url();
    assert_eq!(url, "redis://127.0.0.1:6380", "Unexpected URL: {}", url);
    Ok(())
}
/// A configured password must be embedded in the URL using the
/// `redis://:password@host:port` form (empty username before the colon).
#[tokio::test]
#[serial_test::serial]
async fn test_redis_url_tcp_with_password() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    use tinytown::config::RedisConfig;
    // Clear the env override so the config-file password is what gets used.
    // Env mutation is process-global; #[serial] prevents races with the
    // other env-touching tests in this file.
    unsafe {
        std::env::remove_var("TINYTOWN_REDIS_PASSWORD");
    }
    let temp_dir = TempDir::new()?;
    let mut config = Config::new("test-town", temp_dir.path());
    config.redis = RedisConfig {
        use_socket: false,
        host: "localhost".to_string(),
        port: 6379,
        password: Some("secret123".to_string()),
        tls_enabled: false,
        ..Default::default()
    };
    let url = config.redis_url();
    assert_eq!(
        url, "redis://:secret123@localhost:6379",
        "Unexpected URL: {}",
        url
    );
    Ok(())
}
/// Enabling TLS must switch the scheme from `redis://` to `rediss://` while
/// keeping the credential and host/port layout intact.
#[tokio::test]
#[serial_test::serial]
async fn test_redis_url_tls_enabled() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    use tinytown::config::RedisConfig;
    // Remove any env password so only the config-file password appears.
    unsafe {
        std::env::remove_var("TINYTOWN_REDIS_PASSWORD");
    }
    let temp_dir = TempDir::new()?;
    let mut config = Config::new("test-town", temp_dir.path());
    config.redis = RedisConfig {
        use_socket: false,
        host: "redis.example.com".to_string(),
        port: 6379,
        password: Some("tls-password".to_string()),
        tls_enabled: true,
        ..Default::default()
    };
    let url = config.redis_url();
    // Scheme check first for a clearer failure message, then the full URL.
    assert!(
        url.starts_with("rediss://"),
        "Expected rediss:// scheme, got: {}",
        url
    );
    assert_eq!(url, "rediss://:tls-password@redis.example.com:6379");
    Ok(())
}
#[tokio::test]
async fn test_is_remote_redis() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    use tinytown::config::RedisConfig;
    let temp_dir = TempDir::new()?;
    let mut config = Config::new("test-town", temp_dir.path());

    // A Unix socket is always local, whatever the host field says.
    config.redis.use_socket = true;
    assert!(
        !config.is_remote_redis(),
        "Unix socket should not be remote"
    );

    // Switch to TCP so the host name decides locality.
    config.redis = RedisConfig {
        use_socket: false,
        host: "localhost".to_string(),
        port: 6379,
        ..Default::default()
    };

    // Loopback-style hosts count as local.
    let local_cases = [
        ("localhost", "localhost should not be remote"),
        ("127.0.0.1", "127.0.0.1 should not be remote"),
        ("127.0.1.1", "127.x.x.x should not be remote"),
    ];
    for (host, why) in local_cases {
        config.redis.host = host.to_string();
        assert!(!config.is_remote_redis(), "{}", why);
    }

    // Anything else counts as remote.
    let remote_cases = [
        ("redis.example.com", "redis.example.com should be remote"),
        ("192.168.1.100", "192.168.1.100 should be remote"),
    ];
    for (host, why) in remote_cases {
        config.redis.host = host.to_string();
        assert!(config.is_remote_redis(), "{}", why);
    }
    Ok(())
}
#[tokio::test]
async fn test_redis_config_defaults() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::config::RedisConfig;
    // Pin the out-of-the-box Redis settings: local Unix socket, loopback
    // TCP fallback, and no credentials or TLS material.
    let defaults = RedisConfig::default();
    assert!(
        defaults.url.is_none(),
        "Explicit URL should be unset by default"
    );
    assert!(defaults.use_socket, "Default should use Unix socket");
    assert_eq!(defaults.socket_path, ".tt/redis.sock");
    assert_eq!(defaults.host, "127.0.0.1");
    assert_eq!(defaults.port, 6379);
    assert!(
        defaults.password.is_none(),
        "Password should be None by default"
    );
    assert!(!defaults.tls_enabled, "TLS should be disabled by default");
    assert!(defaults.tls_cert.is_none());
    assert!(defaults.tls_key.is_none());
    assert!(defaults.tls_ca_cert.is_none());
    assert_eq!(defaults.bind, "127.0.0.1");
    Ok(())
}
/// TINYTOWN_REDIS_PASSWORD must take precedence over the password in the
/// config file, and the config value must come back once the env var is
/// removed again.
#[tokio::test]
#[serial_test::serial]
async fn test_redis_password_env_var_override() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    use tinytown::config::RedisConfig;
    // Start from a clean slate: no env override in effect.
    unsafe {
        std::env::remove_var("TINYTOWN_REDIS_PASSWORD");
    }
    let temp_dir = TempDir::new()?;
    let mut config = Config::new("test-town", temp_dir.path());
    config.redis = RedisConfig {
        use_socket: false,
        host: "localhost".to_string(),
        port: 6379,
        password: Some("config-password".to_string()),
        tls_enabled: false,
        ..Default::default()
    };
    // Install the env override; redis_password()/redis_url() should now
    // prefer it over the config value.
    unsafe {
        std::env::set_var("TINYTOWN_REDIS_PASSWORD", "env-password");
    }
    assert_eq!(
        config.redis_password(),
        Some("env-password".to_string()),
        "Env var should override config password"
    );
    let url = config.redis_url();
    assert!(
        url.contains("env-password"),
        "URL should use env var password, got: {}",
        url
    );
    assert!(
        !url.contains("config-password"),
        "URL should NOT use config password, got: {}",
        url
    );
    // Dropping the env var restores the config-file password.
    // NOTE(review): if an assertion above panics, this cleanup is skipped
    // and the env var leaks; the other serial tests defend against that by
    // clearing it themselves on entry.
    unsafe {
        std::env::remove_var("TINYTOWN_REDIS_PASSWORD");
    }
    assert_eq!(
        config.redis_password(),
        Some("config-password".to_string()),
        "After removing env var, should use config password"
    );
    Ok(())
}
/// redis_url_redacted() must replace the password with a mask while the
/// real redis_url() keeps it, for both plain and TLS schemes.
#[tokio::test]
#[serial_test::serial]
async fn test_redis_url_redacted_masks_password() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    use tinytown::config::RedisConfig;
    // Make sure no env password shadows the config-file one.
    unsafe {
        std::env::remove_var("TINYTOWN_REDIS_PASSWORD");
    }
    let temp_dir = TempDir::new()?;
    let mut config = Config::new("test-town", temp_dir.path());
    config.redis = RedisConfig {
        use_socket: false,
        host: "redis.example.com".to_string(),
        port: 6379,
        password: Some("super-secret-password".to_string()),
        tls_enabled: false,
        ..Default::default()
    };
    // Sanity check: the unredacted URL really does carry the secret.
    let real_url = config.redis_url();
    assert!(
        real_url.contains("super-secret-password"),
        "Real URL should contain password"
    );
    let redacted_url = config.redis_url_redacted();
    assert!(
        !redacted_url.contains("super-secret-password"),
        "Redacted URL should NOT contain password"
    );
    assert!(
        redacted_url.contains("****"),
        "Redacted URL should contain mask: {}",
        redacted_url
    );
    assert_eq!(redacted_url, "redis://:****@redis.example.com:6379");
    // Same guarantees with TLS enabled: only the scheme changes.
    config.redis.tls_enabled = true;
    let redacted_tls = config.redis_url_redacted();
    assert!(redacted_tls.starts_with("rediss://"));
    assert!(redacted_tls.contains("****"));
    assert!(!redacted_tls.contains("super-secret-password"));
    Ok(())
}
/// With no password configured, redaction is a no-op: both URL forms must
/// be byte-identical.
#[tokio::test]
#[serial_test::serial]
async fn test_redis_url_redacted_no_password() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    use tinytown::config::RedisConfig;
    // Clear the env override so no password can sneak into the URL.
    unsafe {
        std::env::remove_var("TINYTOWN_REDIS_PASSWORD");
    }
    let temp_dir = TempDir::new()?;
    let mut config = Config::new("test-town", temp_dir.path());
    config.redis = RedisConfig {
        use_socket: false,
        host: "localhost".to_string(),
        port: 6379,
        password: None,
        tls_enabled: false,
        ..Default::default()
    };
    let real_url = config.redis_url();
    let redacted_url = config.redis_url_redacted();
    assert_eq!(real_url, redacted_url, "URLs should match when no password");
    assert_eq!(real_url, "redis://localhost:6379");
    Ok(())
}
#[tokio::test]
async fn test_redis_url_config_override() -> Result<(), Box<dyn std::error::Error>> {
use tinytown::Config;
let temp_dir = TempDir::new()?;
let mut config = Config::new("test-town", temp_dir.path());
config.redis.use_socket = false;
config.redis.host = "should-not-be-used".to_string();
config.redis.port = 6399;
config.redis.url = Some("redis://override.example.com:6381/2".to_string());
assert_eq!(config.redis_url(), "redis://override.example.com:6381/2");
assert!(config.is_remote_redis());
Ok(())
}
#[tokio::test]
async fn test_redis_url_redacted_masks_explicit_url_password()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::Config;
let temp_dir = TempDir::new()?;
let mut config = Config::new("test-town", temp_dir.path());
config.redis.url = Some("rediss://default:cloud-secret@redis.example.com:6380/0".to_string());
assert_eq!(
config.redis_url_redacted(),
"rediss://default:****@redis.example.com:6380/0"
);
assert!(!config.redis_url_redacted().contains("cloud-secret"));
Ok(())
}
#[tokio::test]
async fn test_redis_url_redacted_preserves_encoded_username()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::Config;
let temp_dir = TempDir::new()?;
let mut config = Config::new("test-town", temp_dir.path());
config.redis.url =
Some("rediss://user%40name:cloud-secret@redis.example.com:6380/0".to_string());
assert_eq!(
config.redis_url_redacted(),
"rediss://user%40name:****@redis.example.com:6380/0"
);
assert!(!config.redis_url_redacted().contains("%2540"));
Ok(())
}
#[tokio::test]
async fn test_redis_url_redacted_masks_malformed_explicit_url_password()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::Config;
let temp_dir = TempDir::new()?;
let mut config = Config::new("test-town", temp_dir.path());
config.redis.url = Some("redis://default:cloud-secret@".to_string());
assert_eq!(config.redis_url_redacted(), "redis://default:****@");
assert!(!config.redis_url_redacted().contains("cloud-secret"));
Ok(())
}
/// The REDIS_URL environment variable must override even an explicitly
/// configured redis.url, and an env-supplied remote URL makes the config
/// count as remote.
#[tokio::test]
#[serial_test::serial]
async fn test_redis_url_env_override() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    // Start clean so a leaked REDIS_URL from another test cannot interfere.
    unsafe {
        std::env::remove_var("REDIS_URL");
    }
    let temp_dir = TempDir::new()?;
    let mut config = Config::new("test-town", temp_dir.path());
    config.redis.url = Some("redis://config.example.com:6379".to_string());
    // The env var should now shadow the config-file URL everywhere.
    unsafe {
        std::env::set_var("REDIS_URL", "redis://env.example.com:6380/5");
    }
    assert_eq!(config.redis_url(), "redis://env.example.com:6380/5");
    assert_eq!(
        config.redis_url_redacted(),
        "redis://env.example.com:6380/5"
    );
    assert!(config.is_remote_redis());
    // Cleanup so later tests see an unset REDIS_URL. NOTE(review): skipped
    // if an assertion above panics — serial env tests re-clear on entry as
    // a defense.
    unsafe {
        std::env::remove_var("REDIS_URL");
    }
    Ok(())
}
#[tokio::test]
async fn test_town_init_with_explicit_redis_url_skips_local_startup()
-> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    // Reuse the Redis instance of a first town as the "remote" server for a
    // second town configured with an explicit URL.
    let source_dir = TempDir::new()?;
    let source_town = Town::init(source_dir.path(), unique_town_name("redis-source")).await?;
    let shared_url = source_town.config().redis_url();

    // The target's TCP settings are deliberately unusable: if init ignored
    // the explicit URL it would fail to connect or spawn its own server.
    let target_dir = TempDir::new()?;
    let mut target_config = Config::new(unique_town_name("redis-target"), target_dir.path());
    target_config.redis.url = Some(shared_url);
    target_config.redis.host = "invalid-host".to_string();
    target_config.redis.port = 1;
    target_config.redis.use_socket = false;

    let target_town = Town::init_with_config(target_config).await?;
    assert!(target_town.config().is_remote_redis());
    // No redis.pid means no local Redis process was started for the target.
    assert!(!target_dir.path().join(".tt/redis.pid").exists());
    Ok(())
}
/// Same as the explicit-URL test, but the remote Redis is supplied through
/// the REDIS_URL environment variable instead of the config file.
#[tokio::test]
#[serial_test::serial]
async fn test_town_init_with_redis_url_env_override_skips_local_startup()
-> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    // Start clean: no leaked override from another test.
    unsafe {
        std::env::remove_var("REDIS_URL");
    }
    // Boot a first town whose local Redis serves as the "remote" target.
    let source_dir = TempDir::new()?;
    let source_town = Town::init(source_dir.path(), unique_town_name("redis-env-source")).await?;
    unsafe {
        std::env::set_var("REDIS_URL", source_town.config().redis_url());
    }
    // Deliberately broken TCP settings: init must use the env URL instead
    // of connecting here or spawning its own local server.
    let target_dir = TempDir::new()?;
    let mut config = Config::new(unique_town_name("redis-env-target"), target_dir.path());
    config.redis.host = "invalid-host".to_string();
    config.redis.port = 1;
    config.redis.use_socket = false;
    let target_town = Town::init_with_config(config).await?;
    assert!(target_town.config().is_remote_redis());
    // No redis.pid means no local Redis process was started for the target.
    assert!(!target_dir.path().join(".tt/redis.pid").exists());
    // Cleanup (skipped on panic — later serial tests re-clear on entry).
    unsafe {
        std::env::remove_var("REDIS_URL");
    }
    Ok(())
}
#[tokio::test]
async fn test_town_init_with_unreachable_explicit_redis_url_returns_clear_error()
-> Result<(), Box<dyn std::error::Error>> {
    use tinytown::{Config, Error};
    // Point the config at a port nothing listens on and verify the failure
    // surfaces as a Config error naming both the problem and the URL.
    let temp_dir = TempDir::new()?;
    let mut config = Config::new(unique_town_name("redis-fail"), temp_dir.path());
    let unused_port = reserve_unused_port()?;
    let bad_url = format!("redis://127.0.0.1:{unused_port}");
    config.redis.url = Some(bad_url.clone());
    match Town::init_with_config(config).await {
        Err(Error::Config(message)) => {
            // The message must explain what failed...
            assert!(
                message.contains("Failed to connect to configured Redis"),
                "unexpected message: {message}"
            );
            // ...and identify which URL was attempted.
            assert!(
                message.contains(&bad_url),
                "unexpected message: {message}"
            );
        }
        Ok(_) => panic!("expected config error, got successful town init"),
        Err(other) => panic!("expected config error, got {other}"),
    }
    Ok(())
}
#[test]
fn test_mission_id_creation_and_parsing() {
    use tinytown::mission::{MissionId, WatchId, WorkItemId};
    use uuid::Uuid;
    // Fresh IDs never collide.
    let a = MissionId::new();
    let b = MissionId::new();
    assert_ne!(a, b, "Each new ID should be unique");
    // from_uuid keeps the wrapped UUID's textual form.
    let raw = Uuid::new_v4();
    let wrapped = MissionId::from_uuid(raw);
    assert_eq!(wrapped.to_string(), raw.to_string());
    // Display -> FromStr round-trips for all three ID types.
    let reparsed: MissionId = a.to_string().parse().expect("Should parse MissionId");
    assert_eq!(a, reparsed);
    assert_ne!(MissionId::default(), a, "Default should create new ID");
    let work = WorkItemId::new();
    let work_back: WorkItemId = work.to_string().parse().expect("Should parse WorkItemId");
    assert_eq!(work, work_back);
    let watch = WatchId::new();
    let watch_back: WatchId = watch.to_string().parse().expect("Should parse WatchId");
    assert_eq!(watch, watch_back);
}
#[test]
fn test_objective_ref_display() {
    use tinytown::mission::ObjectiveRef;
    // Issue refs render in GitHub's "owner/repo#number" shorthand.
    let issue = ObjectiveRef::Issue {
        owner: "redis-field-engineering".into(),
        repo: "tinytown".into(),
        number: 42,
    };
    assert_eq!(issue.to_string(), "redis-field-engineering/tinytown#42");
    // Doc refs render as their bare path.
    let doc = ObjectiveRef::Doc {
        path: "docs/design.md".into(),
    };
    assert_eq!(doc.to_string(), "docs/design.md");
}
#[test]
fn test_mission_run_state_transitions() {
    use tinytown::mission::{MissionRun, MissionState, ObjectiveRef};
    let objectives = vec![ObjectiveRef::Issue {
        owner: "owner".into(),
        repo: "repo".into(),
        number: 1,
    }];
    // New missions begin in Planning with no blocked reason recorded.
    let mut mission = MissionRun::new(objectives.clone());
    assert_eq!(mission.state, MissionState::Planning);
    assert!(mission.blocked_reason.is_none());
    assert_eq!(mission.objective_refs.len(), 1);
    // Planning -> Running.
    mission.start();
    assert_eq!(mission.state, MissionState::Running);
    // Blocking records the reason alongside the state change.
    mission.block("Waiting for CI");
    assert_eq!(mission.state, MissionState::Blocked);
    assert_eq!(mission.blocked_reason.as_deref(), Some("Waiting for CI"));
    // Completing clears any previously recorded blocked reason.
    mission.complete();
    assert_eq!(mission.state, MissionState::Completed);
    assert!(mission.blocked_reason.is_none());
    // fail() jumps straight to Failed and keeps the failure reason.
    let mut doomed = MissionRun::new(objectives);
    doomed.fail("Unrecoverable error");
    assert_eq!(doomed.state, MissionState::Failed);
    assert_eq!(doomed.blocked_reason.as_deref(), Some("Unrecoverable error"));
}
#[test]
fn test_mission_run_with_policy() {
    use tinytown::mission::{MissionPolicy, MissionRun, ObjectiveRef};
    // with_policy must store the supplied policy verbatim on the mission.
    let policy = MissionPolicy {
        max_parallel_items: 5,
        reviewer_required: false,
        auto_merge: true,
        watch_interval_secs: 60,
    };
    let mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "README.md".into(),
    }])
    .with_policy(policy.clone());
    assert_eq!(mission.policy.max_parallel_items, 5);
    assert!(!mission.policy.reviewer_required);
    assert!(mission.policy.auto_merge);
    assert_eq!(mission.policy.watch_interval_secs, 60);
}
#[test]
fn test_work_item_state_transitions() {
    use tinytown::AgentId;
    use tinytown::mission::{MissionId, WorkItem, WorkKind, WorkStatus};
    let mut item = WorkItem::new(MissionId::new(), "Implement feature", WorkKind::Implement);
    // Fresh items are Pending: neither terminal nor ready, unassigned, and
    // without artifacts.
    assert_eq!(item.status, WorkStatus::Pending);
    assert!(!item.status.is_terminal());
    assert!(!item.status.is_ready());
    assert!(item.assigned_to.is_none());
    assert!(item.artifact_refs.is_empty());
    // Pending -> Ready.
    item.mark_ready();
    assert_eq!(item.status, WorkStatus::Ready);
    assert!(item.status.is_ready());
    // Ready -> Assigned records the agent.
    let assignee = AgentId::new();
    item.assign(assignee);
    assert_eq!(item.status, WorkStatus::Assigned);
    assert_eq!(item.assigned_to, Some(assignee));
    // Assigned -> Running -> Blocked.
    item.start();
    assert_eq!(item.status, WorkStatus::Running);
    item.block();
    assert_eq!(item.status, WorkStatus::Blocked);
    // Completing stores artifacts and reaches the terminal Done state.
    item.complete(vec!["https://github.com/owner/repo/pull/1".into()]);
    assert_eq!(item.status, WorkStatus::Done);
    assert!(item.status.is_terminal());
    assert_eq!(item.artifact_refs.len(), 1);
}
#[test]
fn test_work_item_builder_methods() {
    use tinytown::mission::{MissionId, WorkItem, WorkItemId, WorkKind};
    // Each builder-style helper must set its own field without disturbing
    // the others (kind stays whatever new() was given).
    let blocker_a = WorkItemId::new();
    let blocker_b = WorkItemId::new();
    let item = WorkItem::new(MissionId::new(), "Test feature", WorkKind::Test)
        .with_dependencies(vec![blocker_a, blocker_b])
        .with_owner_role("tester")
        .with_source_ref("owner/repo#42");
    assert_eq!(item.depends_on.len(), 2);
    assert!(item.depends_on.contains(&blocker_a));
    assert!(item.depends_on.contains(&blocker_b));
    assert_eq!(item.owner_role.as_deref(), Some("tester"));
    assert_eq!(item.source_ref.as_deref(), Some("owner/repo#42"));
    assert_eq!(item.kind, WorkKind::Test);
}
#[test]
fn test_watch_item_scheduling() {
    use tinytown::mission::{
        MissionId, TriggerAction, WatchItem, WatchKind, WatchStatus, WorkItemId,
    };
    let mission = MissionId::new();
    let work = WorkItemId::new();
    // A new watch starts Active with no check history.
    let plain = WatchItem::new(
        mission,
        work,
        WatchKind::PrChecks,
        "https://github.com/owner/repo/pull/1",
        1,
    );
    assert_eq!(plain.status, WatchStatus::Active);
    assert_eq!(plain.kind, WatchKind::PrChecks);
    assert!(plain.last_check_at.is_none());
    assert_eq!(plain.consecutive_failures, 0);
    // with_trigger overrides the default trigger action.
    let custom = WatchItem::new(mission, work, WatchKind::ReviewComments, "pr/1", 60)
        .with_trigger(TriggerAction::NotifyReviewer);
    assert_eq!(custom.on_trigger, TriggerAction::NotifyReviewer);
}
#[test]
fn test_watch_item_check_recording() {
    use chrono::Utc;
    use tinytown::mission::{MissionId, WatchItem, WatchKind, WorkItemId};
    let mut watch = WatchItem::new(
        MissionId::new(),
        WorkItemId::new(),
        WatchKind::PrChecks,
        "pr/1",
        60,
    );
    // A successful check stamps last_check_at, keeps the failure counter at
    // zero, and schedules the next check after the check time.
    let before_check = Utc::now();
    watch.record_check();
    assert!(watch.last_check_at.is_some());
    assert!(watch.last_check_at.unwrap() >= before_check);
    assert_eq!(watch.consecutive_failures, 0);
    assert!(watch.next_due_at > before_check);
    // Failures accumulate one at a time.
    for expected in 1..=3 {
        watch.record_failure();
        assert_eq!(watch.consecutive_failures, expected);
    }
}
#[test]
fn test_watch_item_snooze_and_complete() {
    use chrono::Utc;
    use tinytown::mission::{MissionId, WatchItem, WatchKind, WatchStatus, WorkItemId};
    let mut watch = WatchItem::new(
        MissionId::new(),
        WorkItemId::new(),
        WatchKind::Mergeability,
        "pr/1",
        60,
    );
    // Snoozing parks the watch and pushes its next due time into the future.
    watch.snooze(300);
    assert_eq!(watch.status, WatchStatus::Snoozed);
    assert!(watch.next_due_at > Utc::now());
    // Completing moves it to the Done status.
    watch.complete();
    assert_eq!(watch.status, WatchStatus::Done);
}
#[test]
fn test_work_kind_and_status_variants() {
    use tinytown::mission::{WorkKind, WorkStatus};
    // Enumerate every kind so adding a variant forces this test's review.
    let kinds = [
        WorkKind::Design,
        WorkKind::Implement,
        WorkKind::Test,
        WorkKind::Review,
        WorkKind::MergeGate,
        WorkKind::Followup,
    ];
    assert_eq!(kinds.len(), 6);
    assert_eq!(WorkKind::default(), WorkKind::Implement);
    let statuses = [
        WorkStatus::Pending,
        WorkStatus::Ready,
        WorkStatus::Assigned,
        WorkStatus::Running,
        WorkStatus::Blocked,
        WorkStatus::Done,
    ];
    assert_eq!(statuses.len(), 6);
    assert_eq!(WorkStatus::default(), WorkStatus::Pending);
    // Done is the only terminal status; Ready is the only "ready" status.
    for status in statuses {
        assert_eq!(status.is_terminal(), status == WorkStatus::Done);
        assert_eq!(status.is_ready(), status == WorkStatus::Ready);
    }
}
#[test]
fn test_enum_defaults() {
    use tinytown::mission::{MissionState, TriggerAction, WatchKind, WatchStatus};
    // Pin the Default variant of each mission-related enum.
    assert_eq!(MissionState::Planning, MissionState::default());
    assert_eq!(WatchKind::PrChecks, WatchKind::default());
    assert_eq!(WatchStatus::Active, WatchStatus::default());
    assert_eq!(TriggerAction::CreateFixTask, TriggerAction::default());
}
#[test]
fn test_mission_policy_defaults() {
    use tinytown::mission::MissionPolicy;
    // Defaults: modest parallelism, mandatory review, no auto-merge, and a
    // three-minute watch cadence.
    let defaults = MissionPolicy::default();
    assert_eq!(defaults.max_parallel_items, 2);
    assert!(defaults.reviewer_required);
    assert!(!defaults.auto_merge);
    assert_eq!(defaults.watch_interval_secs, 180);
}
#[tokio::test]
async fn test_mission_storage_save_and_get_mission() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{MissionRun, MissionState, MissionStorage, ObjectiveRef};
    // save_mission followed by get_mission must round-trip the record.
    let town = create_test_town("mission-storage-basic").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    let mission = MissionRun::new(vec![ObjectiveRef::Issue {
        owner: "owner".into(),
        repo: "repo".into(),
        number: 42,
    }]);
    let mission_id = mission.id;
    storage.save_mission(&mission).await?;
    let loaded = storage.get_mission(mission_id).await?;
    assert!(loaded.is_some());
    let loaded = loaded.unwrap();
    assert_eq!(loaded.id, mission_id);
    assert_eq!(loaded.state, MissionState::Planning);
    assert_eq!(loaded.objective_refs.len(), 1);
    Ok(())
}
#[tokio::test]
async fn test_mission_storage_delete_mission() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{MissionRun, MissionStorage, ObjectiveRef};
    // delete_mission reports whether anything was removed; deleting the
    // same id twice is a no-op the second time.
    let town = create_test_town("mission-storage-delete").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    let mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    let id = mission.id;
    storage.save_mission(&mission).await?;
    assert!(storage.get_mission(id).await?.is_some());
    // First delete removes the record...
    assert!(storage.delete_mission(id).await?);
    assert!(storage.get_mission(id).await?.is_none());
    // ...and a repeat delete finds nothing to remove.
    assert!(!storage.delete_mission(id).await?);
    Ok(())
}
#[tokio::test]
async fn test_mission_storage_active_set() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{MissionRun, MissionStorage, ObjectiveRef};
    // The active set tracks membership independently of the mission records.
    let town = create_test_town("mission-storage-active").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    assert!(storage.list_active().await?.is_empty());

    let doc_mission = |path: &str| MissionRun::new(vec![ObjectiveRef::Doc { path: path.into() }]);
    let mission_a = doc_mission("doc1.md");
    let mission_b = doc_mission("doc2.md");
    let id_a = mission_a.id;
    let id_b = mission_b.id;
    storage.save_mission(&mission_a).await?;
    storage.save_mission(&mission_b).await?;

    // Both ids join the active set.
    storage.add_active(id_a).await?;
    storage.add_active(id_b).await?;
    let active = storage.list_active().await?;
    assert_eq!(active.len(), 2);
    assert!(active.contains(&id_a));
    assert!(active.contains(&id_b));

    // Removing one id leaves the other in place.
    storage.remove_active(id_a).await?;
    let active = storage.list_active().await?;
    assert_eq!(active.len(), 1);
    assert!(!active.contains(&id_a));
    assert!(active.contains(&id_b));
    Ok(())
}
/// Saves, fetches, lists, and deletes work items scoped to one mission.
#[tokio::test]
async fn test_mission_storage_work_items() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{MissionRun, MissionStorage, ObjectiveRef, WorkItem, WorkKind};
    let town = create_test_town("mission-storage-work").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    let mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    let mission_id = mission.id;
    storage.save_mission(&mission).await?;
    // Two items of different kinds under the same mission.
    let design = WorkItem::new(mission_id, "Design feature", WorkKind::Design);
    let implement = WorkItem::new(mission_id, "Implement feature", WorkKind::Implement);
    let (design_id, implement_id) = (design.id, implement.id);
    storage.save_work_item(&design).await?;
    storage.save_work_item(&implement).await?;
    // A single item can be fetched by (mission, item) id.
    let fetched = storage.get_work_item(mission_id, design_id).await?;
    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().title, "Design feature");
    assert_eq!(storage.list_work_items(mission_id).await?.len(), 2);
    // Deleting one item leaves only the other behind.
    assert!(storage.delete_work_item(mission_id, design_id).await?);
    let remaining = storage.list_work_items(mission_id).await?;
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].id, implement_id);
    Ok(())
}
/// A ready work item on an active mission is turned into a persisted task and
/// handed to an idle worker; the created task carries the mission and
/// work-item tags so it can be traced back.
#[tokio::test]
async fn test_mission_scheduler_assigns_persisted_tasks() -> Result<(), Box<dyn std::error::Error>>
{
    use tinytown::mission::{
        MissionRun, MissionScheduler, MissionStorage, ObjectiveRef, WorkItem, WorkKind,
    };
    let town = create_test_town("mission-scheduler-task-assign").await?;
    // Register an idle worker the scheduler can hand work to.
    let agent_handle = town.spawn_agent("backend-worker", "claude").await?;
    let mut agent = Agent::new("backend-worker", "claude", AgentType::Worker);
    agent.id = agent_handle.id();
    agent.state = AgentState::Idle;
    town.channel().set_agent_state(&agent).await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    let mut mission = MissionRun::new(vec![ObjectiveRef::Issue {
        owner: "owner".into(),
        repo: "repo".into(),
        number: 42,
    }]);
    // No reviewer gate: keep the pipeline minimal for this test.
    mission.policy.reviewer_required = false;
    mission.start();
    storage.save_mission(&mission).await?;
    storage.add_active(mission.id).await?;
    let mut work_item = WorkItem::new(mission.id, "Implement feature", WorkKind::Implement);
    work_item.mark_ready();
    let work_item_id = work_item.id;
    storage.save_work_item(&work_item).await?;
    // One scheduler tick should assign exactly the single ready item.
    let scheduler = MissionScheduler::with_defaults(storage.clone(), town.channel().clone());
    let result = scheduler.tick().await?;
    assert_eq!(result.total_assigned, 1);
    // The worker's inbox holds a TaskAssign message carrying the task id.
    let inbox = town.channel().peek_inbox(agent_handle.id(), 10).await?;
    assert_eq!(inbox.len(), 1);
    let task_id = match &inbox[0].msg_type {
        MessageType::TaskAssign { task_id } => task_id.parse::<TaskId>()?,
        other => panic!("expected TaskAssign, got {:?}", other),
    };
    let task = town
        .channel()
        .get_task(task_id)
        .await?
        .expect("stored task");
    assert_eq!(task.assigned_to, Some(agent_handle.id()));
    assert!(
        task.description
            .contains("[Mission Work Item] Implement feature")
    );
    // Tags tie the task back to the mission and the originating work item.
    let has_tag = |needle: &str| task.tags.iter().any(|tag| tag == needle);
    assert!(has_tag("mission-work-item"));
    assert!(has_tag(&format!("mission:{}", mission.id)));
    assert!(has_tag(&format!("work-item:{}", work_item_id)));
    Ok(())
}
/// Saves, fetches, lists, and deletes watch items attached to a work item.
#[tokio::test]
async fn test_mission_storage_watch_items() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{
        MissionRun, MissionStorage, ObjectiveRef, WatchItem, WatchKind, WorkItem, WorkKind,
    };
    let town = create_test_town("mission-storage-watch").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    let mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    let mission_id = mission.id;
    storage.save_mission(&mission).await?;
    let work = WorkItem::new(mission_id, "Implement", WorkKind::Implement);
    let work_id = work.id;
    storage.save_work_item(&work).await?;
    // Attach a PR-checks watch to the work item and round-trip it.
    let watch = WatchItem::new(mission_id, work_id, WatchKind::PrChecks, "pr/123", 60);
    let watch_id = watch.id;
    storage.save_watch_item(&watch).await?;
    let fetched = storage.get_watch_item(mission_id, watch_id).await?;
    assert!(fetched.is_some());
    assert_eq!(fetched.unwrap().target_ref, "pr/123");
    assert_eq!(storage.list_watch_items(mission_id).await?.len(), 1);
    // Deleting the only watch leaves the mission with none.
    assert!(storage.delete_watch_item(mission_id, watch_id).await?);
    assert!(storage.list_watch_items(mission_id).await?.is_empty());
    Ok(())
}
/// Events logged against a mission come back most-recent-first.
#[tokio::test]
async fn test_mission_storage_events() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{MissionRun, MissionStorage, ObjectiveRef};
    let town = create_test_town("mission-storage-events").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    let mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    let mission_id = mission.id;
    storage.save_mission(&mission).await?;
    // Log three events in chronological order.
    for line in ["Mission started", "Work item assigned", "PR created"] {
        storage.log_event(mission_id, line).await?;
    }
    // Retrieval order is newest first.
    let events = storage.get_events(mission_id, 10).await?;
    assert_eq!(events.len(), 3);
    assert!(events[0].contains("PR created"));
    assert!(events[1].contains("Work item assigned"));
    assert!(events[2].contains("Mission started"));
    Ok(())
}
/// list_all_missions returns every saved mission, active or not.
#[tokio::test]
async fn test_mission_storage_list_all() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{MissionRun, MissionStorage, ObjectiveRef};
    let town = create_test_town("mission-storage-list-all").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    // Three independent doc missions.
    let missions = [
        MissionRun::new(vec![ObjectiveRef::Doc {
            path: "doc1.md".into(),
        }]),
        MissionRun::new(vec![ObjectiveRef::Doc {
            path: "doc2.md".into(),
        }]),
        MissionRun::new(vec![ObjectiveRef::Doc {
            path: "doc3.md".into(),
        }]),
    ];
    for mission in &missions {
        storage.save_mission(mission).await?;
    }
    let all = storage.list_all_missions().await?;
    assert_eq!(all.len(), 3);
    // Every saved id must appear in the listing.
    let listed: Vec<_> = all.iter().map(|m| m.id).collect();
    for mission in &missions {
        assert!(listed.contains(&mission.id));
    }
    Ok(())
}
/// A watch saved with a zero interval is immediately reported as due for an
/// active mission.
#[tokio::test]
async fn test_mission_storage_list_due_watches() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{
        MissionRun, MissionStorage, ObjectiveRef, WatchItem, WatchKind, WorkItem, WorkKind,
    };
    let town = create_test_town("mission-storage-due-watches").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    let mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    let mission_id = mission.id;
    storage.save_mission(&mission).await?;
    storage.add_active(mission_id).await?;
    let work = WorkItem::new(mission_id, "Implement", WorkKind::Implement);
    let work_id = work.id;
    storage.save_work_item(&work).await?;
    // Interval 0 => the watch is due as soon as it is saved.
    storage
        .save_watch_item(&WatchItem::new(
            mission_id,
            work_id,
            WatchKind::PrChecks,
            "pr/123",
            0,
        ))
        .await?;
    let due = storage.list_due_watches().await?;
    assert_eq!(due.len(), 1);
    assert_eq!(due[0].target_ref, "pr/123");
    Ok(())
}
/// Re-saving a mutated mission or work item overwrites the stored copy.
#[tokio::test]
async fn test_mission_storage_update_flow() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{
        MissionRun, MissionState, MissionStorage, ObjectiveRef, WorkItem, WorkKind, WorkStatus,
    };
    let town = create_test_town("mission-storage-update").await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name());
    let mut mission = MissionRun::new(vec![ObjectiveRef::Issue {
        owner: "owner".into(),
        repo: "repo".into(),
        number: 1,
    }]);
    let mission_id = mission.id;
    storage.save_mission(&mission).await?;
    // Transition Planning -> Running and persist the change.
    mission.start();
    storage.save_mission(&mission).await?;
    assert_eq!(
        storage.get_mission(mission_id).await?.unwrap().state,
        MissionState::Running
    );
    // Same overwrite semantics for a work item's status.
    let mut work = WorkItem::new(mission_id, "Task", WorkKind::Implement);
    let work_id = work.id;
    storage.save_work_item(&work).await?;
    work.mark_ready();
    storage.save_work_item(&work).await?;
    let reloaded = storage.get_work_item(mission_id, work_id).await?.unwrap();
    assert_eq!(reloaded.status, WorkStatus::Ready);
    Ok(())
}
#[tokio::test]
async fn test_mission_record_submission_creates_review_task_and_watches()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
MissionRun, MissionScheduler, MissionStorage, ObjectiveRef, WatchKind, WorkItem,
WorkItemCompletion, WorkKind,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-record-submission");
let town = Town::init(temp_dir.path(), &town_name).await?;
let worker = town.spawn_agent("backend-worker", "claude").await?;
let reviewer = town.spawn_agent("reviewer", "claude").await?;
let mut worker_state = Agent::new("backend-worker", "claude", AgentType::Worker);
worker_state.id = worker.id();
worker_state.state = AgentState::Idle;
town.channel().set_agent_state(&worker_state).await?;
let mut reviewer_state = Agent::new("reviewer", "claude", AgentType::Worker);
reviewer_state.id = reviewer.id();
reviewer_state.state = AgentState::Idle;
town.channel().set_agent_state(&reviewer_state).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Issue {
owner: "owner".into(),
repo: "repo".into(),
number: 1,
}]);
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.assign(worker.id());
storage.save_work_item(&item).await?;
let scheduler = MissionScheduler::with_defaults(storage.clone(), town.channel().clone());
let completion = scheduler
.record_submission(
mission.id,
item.id,
vec![
"task:123".into(),
"Opened PR https://github.com/test/repo/pull/42".into(),
],
)
.await?;
assert_eq!(completion, WorkItemCompletion::WaitingForReview);
let watches = storage.list_watch_items(mission.id).await?;
assert_eq!(watches.len(), 4);
assert!(
watches
.iter()
.any(|watch| watch.kind == WatchKind::PrChecks)
);
assert!(
watches
.iter()
.any(|watch| watch.kind == WatchKind::BugbotComments)
);
assert!(
watches
.iter()
.any(|watch| watch.kind == WatchKind::ReviewComments)
);
assert!(
watches
.iter()
.any(|watch| watch.kind == WatchKind::Mergeability)
);
let review_tasks: Vec<_> = town
.channel()
.list_tasks()
.await?
.into_iter()
.filter(|task| task.tags.iter().any(|tag| tag == "mission-review-task"))
.collect();
assert_eq!(review_tasks.len(), 1);
assert_eq!(review_tasks[0].assigned_to, Some(reviewer.id()));
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_reviewer_approval_finalizes_item() -> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
MissionRun, MissionScheduler, MissionStorage, ObjectiveRef, WorkItem, WorkItemCompletion,
WorkKind, WorkStatus,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-review-approve");
let town = Town::init(temp_dir.path(), &town_name).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.block();
item.record_artifacts(["task:1"]);
storage.save_work_item(&item).await?;
let scheduler = MissionScheduler::with_defaults(storage.clone(), town.channel().clone());
let completion = scheduler
.approve_submission(mission.id, item.id, vec!["approved: looks good".into()])
.await?;
assert_eq!(completion, WorkItemCompletion::Completed);
let updated = storage.get_work_item(mission.id, item.id).await?.unwrap();
assert_eq!(updated.status, WorkStatus::Done);
assert!(updated.reviewer_approved);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_creates_fix_task_from_failing_watch()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
CheckStatus, DispatcherConfig, MissionDispatcher, MissionRun, MissionStorage,
MockGitHubClient, ObjectiveRef, PrCheckResult, ReviewState, WatchItem, WatchKind, WorkItem,
WorkKind, WorkStatus,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-dispatch-fix");
let town = Town::init(temp_dir.path(), &town_name).await?;
let worker = town.spawn_agent("backend-worker", "claude").await?;
let mut worker_state = Agent::new("backend-worker", "claude", AgentType::Worker);
worker_state.id = worker.id();
worker_state.state = AgentState::Idle;
town.channel().set_agent_state(&worker_state).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.assign(worker.id());
item.block();
item.record_artifacts(["https://github.com/test/repo/pull/99"]);
storage.save_work_item(&item).await?;
let watch = WatchItem::new(mission.id, item.id, WatchKind::PrChecks, "test/repo#99", 0);
storage.save_watch_item(&watch).await?;
let mut github = MockGitHubClient::new();
github.set_pr_checks(
"test",
"repo",
99,
PrCheckResult {
pr_number: 99,
repo: "test/repo".into(),
status: CheckStatus::Failure,
checks: vec![],
mergeable: false,
review_state: ReviewState::Pending,
blocking_comments: vec!["CI failed".into()],
},
);
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
github,
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
let result = dispatcher.tick(None).await?;
assert_eq!(result.watch_result.watches_triggered, 1);
let fix_tasks: Vec<_> = town
.channel()
.list_tasks()
.await?
.into_iter()
.filter(|task| task.tags.iter().any(|tag| tag == "mission-fix-task"))
.collect();
assert_eq!(fix_tasks.len(), 1);
assert_eq!(fix_tasks[0].assigned_to, Some(worker.id()));
let updated_watch = storage.get_watch_item(mission.id, watch.id).await?.unwrap();
assert_ne!(updated_watch.status, tinytown::mission::WatchStatus::Active);
let updated_item = storage.get_work_item(mission.id, item.id).await?.unwrap();
assert_eq!(updated_item.status, WorkStatus::Blocked);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_finalizes_after_successful_watch()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
CheckStatus, DispatcherConfig, MissionDispatcher, MissionRun, MissionStorage,
MockGitHubClient, ObjectiveRef, PrCheckResult, ReviewState, TriggerAction, WatchItem,
WatchKind, WatchStatus, WorkItem, WorkKind, WorkStatus,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-dispatch-success");
let town = Town::init(temp_dir.path(), &town_name).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.policy.reviewer_required = false;
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.block();
item.record_artifacts(["https://github.com/test/repo/pull/7"]);
storage.save_work_item(&item).await?;
let watch = WatchItem::new(mission.id, item.id, WatchKind::PrChecks, "test/repo#7", 0)
.with_trigger(TriggerAction::AdvancePipeline);
storage.save_watch_item(&watch).await?;
let mut github = MockGitHubClient::new();
github.set_pr_checks(
"test",
"repo",
7,
PrCheckResult {
pr_number: 7,
repo: "test/repo".into(),
status: CheckStatus::Success,
checks: vec![],
mergeable: true,
review_state: ReviewState::NotRequired,
blocking_comments: vec![],
},
);
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
github,
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
let result = dispatcher.tick(None).await?;
assert_eq!(result.watch_result.watches_completed, 1);
let updated_watch = storage.get_watch_item(mission.id, watch.id).await?.unwrap();
assert_eq!(updated_watch.status, WatchStatus::Done);
let updated_item = storage.get_work_item(mission.id, item.id).await?.unwrap();
assert_eq!(updated_item.status, WorkStatus::Done);
let updated_mission = storage.get_mission(mission.id).await?.unwrap();
assert_eq!(
updated_mission.state,
tinytown::mission::MissionState::Completed
);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_self_resolves_clean_comment_watches()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
CheckStatus, DispatcherConfig, MissionDispatcher, MissionRun, MissionStorage,
MockGitHubClient, ObjectiveRef, PrCheckResult, ReviewState, TriggerAction, WatchItem,
WatchKind, WatchStatus, WorkItem, WorkKind, WorkStatus,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-dispatch-comment-clean");
let town = Town::init(temp_dir.path(), &town_name).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.policy.reviewer_required = false;
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.block();
item.record_artifacts(["https://github.com/test/repo/pull/13"]);
storage.save_work_item(&item).await?;
let bugbot_watch = WatchItem::new(
mission.id,
item.id,
WatchKind::BugbotComments,
"test/repo#13",
0,
)
.with_trigger(TriggerAction::CreateFixTask);
let review_watch = WatchItem::new(
mission.id,
item.id,
WatchKind::ReviewComments,
"test/repo#13",
0,
)
.with_trigger(TriggerAction::CreateFixTask);
storage.save_watch_item(&bugbot_watch).await?;
storage.save_watch_item(&review_watch).await?;
let mut github = MockGitHubClient::new();
github.set_pr_checks(
"test",
"repo",
13,
PrCheckResult {
pr_number: 13,
repo: "test/repo".into(),
status: CheckStatus::Success,
checks: vec![],
mergeable: true,
review_state: ReviewState::NotRequired,
blocking_comments: vec![],
},
);
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
github,
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
let result = dispatcher.tick(None).await?;
assert_eq!(result.watch_result.watches_completed, 2);
let updated_bugbot = storage
.get_watch_item(mission.id, bugbot_watch.id)
.await?
.unwrap();
assert_eq!(updated_bugbot.status, WatchStatus::Done);
let updated_review = storage
.get_watch_item(mission.id, review_watch.id)
.await?
.unwrap();
assert_eq!(updated_review.status, WatchStatus::Done);
let updated_item = storage.get_work_item(mission.id, item.id).await?.unwrap();
assert_eq!(updated_item.status, WorkStatus::Done);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_self_resolves_triggered_comment_watches()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
BugbotComment, CheckStatus, DispatcherConfig, MissionDispatcher, MissionRun,
MissionStorage, MockGitHubClient, ObjectiveRef, PrCheckResult, ReviewComment, ReviewState,
TriggerAction, WatchItem, WatchKind, WatchStatus, WorkItem, WorkKind, WorkStatus,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-dispatch-comment-resolve");
let town = Town::init(temp_dir.path(), &town_name).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.policy.reviewer_required = false;
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.block();
item.record_artifacts(["https://github.com/test/repo/pull/17"]);
storage.save_work_item(&item).await?;
let bugbot_watch = WatchItem::new(
mission.id,
item.id,
WatchKind::BugbotComments,
"test/repo#17",
60,
)
.with_trigger(TriggerAction::CreateFixTask);
let review_watch = WatchItem::new(
mission.id,
item.id,
WatchKind::ReviewComments,
"test/repo#17",
60,
)
.with_trigger(TriggerAction::CreateFixTask);
storage.save_watch_item(&bugbot_watch).await?;
storage.save_watch_item(&review_watch).await?;
let mut failing_github = MockGitHubClient::new();
failing_github.set_pr_checks(
"test",
"repo",
17,
PrCheckResult {
pr_number: 17,
repo: "test/repo".into(),
status: CheckStatus::Success,
checks: vec![],
mergeable: true,
review_state: ReviewState::ChangesRequested,
blocking_comments: vec!["changes requested".into()],
},
);
failing_github.reviews.insert(
"test/repo#17".into(),
vec![ReviewComment {
author: "reviewer".into(),
body: "Please fix this".into(),
is_actionable: true,
}],
);
failing_github.bugbot_comments.insert(
"test/repo#17".into(),
vec![BugbotComment {
bot_name: "bugbot".into(),
severity: "high".into(),
description: "Security issue".into(),
file_path: Some("src/lib.rs".into()),
}],
);
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
failing_github,
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
let first_result = dispatcher.tick(None).await?;
assert_eq!(first_result.watch_result.watches_triggered, 2);
let mut snoozed_bugbot = storage
.get_watch_item(mission.id, bugbot_watch.id)
.await?
.unwrap();
assert_eq!(snoozed_bugbot.status, WatchStatus::Snoozed);
snoozed_bugbot.next_due_at = chrono::Utc::now() - chrono::Duration::seconds(1);
storage.save_watch_item(&snoozed_bugbot).await?;
let mut snoozed_review = storage
.get_watch_item(mission.id, review_watch.id)
.await?
.unwrap();
assert_eq!(snoozed_review.status, WatchStatus::Snoozed);
snoozed_review.next_due_at = chrono::Utc::now() - chrono::Duration::seconds(1);
storage.save_watch_item(&snoozed_review).await?;
let mut clean_github = MockGitHubClient::new();
clean_github.set_pr_checks(
"test",
"repo",
17,
PrCheckResult {
pr_number: 17,
repo: "test/repo".into(),
status: CheckStatus::Success,
checks: vec![],
mergeable: true,
review_state: ReviewState::Approved,
blocking_comments: vec![],
},
);
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
clean_github,
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
let second_result = dispatcher.tick(None).await?;
assert_eq!(second_result.watch_result.watches_completed, 2);
let updated_bugbot = storage
.get_watch_item(mission.id, bugbot_watch.id)
.await?
.unwrap();
assert_eq!(updated_bugbot.status, WatchStatus::Done);
let updated_review = storage
.get_watch_item(mission.id, review_watch.id)
.await?
.unwrap();
assert_eq!(updated_review.status, WatchStatus::Done);
let updated_item = storage.get_work_item(mission.id, item.id).await?.unwrap();
assert_eq!(updated_item.status, WorkStatus::Done);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_rechecks_snoozed_watch() -> Result<(), Box<dyn std::error::Error>>
{
use tinytown::mission::{
CheckStatus, DispatcherConfig, MissionDispatcher, MissionRun, MissionStorage,
MockGitHubClient, ObjectiveRef, PrCheckResult, ReviewState, TriggerAction, WatchItem,
WatchKind, WatchStatus, WorkItem, WorkKind,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-dispatch-snooze");
let town = Town::init(temp_dir.path(), &town_name).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.block();
item.record_artifacts(["https://github.com/test/repo/pull/99"]);
storage.save_work_item(&item).await?;
let watch = WatchItem::new(mission.id, item.id, WatchKind::PrChecks, "test/repo#99", 60)
.with_trigger(TriggerAction::CreateFixTask);
storage.save_watch_item(&watch).await?;
let mut failing_github = MockGitHubClient::new();
failing_github.set_pr_checks(
"test",
"repo",
99,
PrCheckResult {
pr_number: 99,
repo: "test/repo".into(),
status: CheckStatus::Failure,
checks: vec![],
mergeable: false,
review_state: ReviewState::Pending,
blocking_comments: vec!["CI failed".into()],
},
);
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
failing_github,
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
dispatcher.tick(None).await?;
let mut snoozed_watch = storage.get_watch_item(mission.id, watch.id).await?.unwrap();
assert_eq!(snoozed_watch.status, WatchStatus::Snoozed);
snoozed_watch.next_due_at = chrono::Utc::now() - chrono::Duration::seconds(1);
storage.save_watch_item(&snoozed_watch).await?;
let mut succeeding_github = MockGitHubClient::new();
succeeding_github.set_pr_checks(
"test",
"repo",
99,
PrCheckResult {
pr_number: 99,
repo: "test/repo".into(),
status: CheckStatus::Success,
checks: vec![],
mergeable: true,
review_state: ReviewState::NotRequired,
blocking_comments: vec![],
},
);
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
succeeding_github,
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
dispatcher.tick(None).await?;
let updated_watch = storage.get_watch_item(mission.id, watch.id).await?.unwrap();
assert_eq!(updated_watch.status, WatchStatus::Done);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_mergeability_respects_reviewer_gate()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
CheckStatus, DispatcherConfig, MissionDispatcher, MissionRun, MissionState, MissionStorage,
MockGitHubClient, ObjectiveRef, PrCheckResult, ReviewState, TriggerAction, WatchItem,
WatchKind, WatchStatus, WorkItem, WorkKind, WorkStatus,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-dispatch-mergeability");
let town = Town::init(temp_dir.path(), &town_name).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.policy.reviewer_required = true;
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.block();
item.record_artifacts(["https://github.com/test/repo/pull/7"]);
storage.save_work_item(&item).await?;
let watch = WatchItem::new(
mission.id,
item.id,
WatchKind::Mergeability,
"test/repo#7",
0,
)
.with_trigger(TriggerAction::AdvancePipeline);
storage.save_watch_item(&watch).await?;
let mut github = MockGitHubClient::new();
github.set_pr_checks(
"test",
"repo",
7,
PrCheckResult {
pr_number: 7,
repo: "test/repo".into(),
status: CheckStatus::Success,
checks: vec![],
mergeable: true,
review_state: ReviewState::Pending,
blocking_comments: vec![],
},
);
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
github,
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
dispatcher.tick(None).await?;
let updated_watch = storage.get_watch_item(mission.id, watch.id).await?.unwrap();
assert_eq!(updated_watch.status, WatchStatus::Done);
let updated_item = storage.get_work_item(mission.id, item.id).await?.unwrap();
assert_eq!(updated_item.status, WorkStatus::Blocked);
assert!(!updated_item.reviewer_approved);
let updated_mission = storage.get_mission(mission.id).await?.unwrap();
assert_eq!(updated_mission.state, MissionState::Running);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_scheduler_does_not_finalize_running_items_with_artifacts()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
MissionRun, MissionScheduler, MissionState, MissionStorage, ObjectiveRef, WorkItem,
WorkKind, WorkStatus,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-running-item");
let town = Town::init(temp_dir.path(), &town_name).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.policy.reviewer_required = false;
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.status = WorkStatus::Running;
item.record_artifacts(["https://github.com/test/repo/pull/11"]);
storage.save_work_item(&item).await?;
let scheduler = MissionScheduler::with_defaults(storage.clone(), town.channel().clone());
scheduler.tick_missions(&[mission.id]).await?;
let updated_item = storage.get_work_item(mission.id, item.id).await?.unwrap();
assert_eq!(updated_item.status, WorkStatus::Running);
let updated_mission = storage.get_mission(mission.id).await?.unwrap();
assert_eq!(updated_mission.state, MissionState::Running);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_escalates_to_conductor_when_stuck()
-> Result<(), Box<dyn std::error::Error>> {
use tinytown::mission::{
DispatcherConfig, MissionDispatcher, MissionRun, MissionStorage, MockGitHubClient,
ObjectiveRef, WorkItem, WorkKind,
};
let temp_dir = TempDir::new()?;
let town_name = unique_town_name("mission-dispatch-help");
let town = Town::init(temp_dir.path(), &town_name).await?;
let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
path: "test.md".into(),
}]);
mission.start();
storage.save_mission(&mission).await?;
storage.add_active(mission.id).await?;
let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
item.mark_ready();
storage.save_work_item(&item).await?;
let dispatcher = MissionDispatcher::new(
storage.clone(),
town.channel().clone(),
MockGitHubClient::new(),
DispatcherConfig {
tick_interval_secs: 1,
lock_ttl_secs: 30,
..DispatcherConfig::default()
},
);
dispatcher.tick(None).await?;
let inbox = town.channel().peek_inbox(AgentId::supervisor(), 10).await?;
assert!(inbox.iter().any(|message| {
matches!(
&message.msg_type,
MessageType::Query { question } if question.contains("[Mission Help Needed]")
)
}));
let updated = storage.get_mission(mission.id).await?.unwrap();
assert!(updated.dispatcher_last_help_request_at.is_some());
assert!(updated.dispatcher_last_help_request_reason.is_some());
assert_eq!(updated.dispatcher_help_request_attempts, 1);
drop(town);
cleanup_redis(&temp_dir);
Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_help_request_backoff() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{
        DispatcherConfig, MissionDispatcher, MissionRun, MissionStorage, MockGitHubClient,
        ObjectiveRef, WorkItem, WorkKind,
    };
    // A stuck mission triggers one help request; a second tick inside a
    // day-long repeat window must neither rebroadcast to the supervisor nor
    // bump the attempt counter.
    let temp_dir = TempDir::new()?;
    let name = unique_town_name("mission-dispatch-backoff");
    let town = Town::init(temp_dir.path(), &name).await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), &name);

    let mut run = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    run.start();
    storage.save_mission(&run).await?;
    storage.add_active(run.id).await?;

    let mut work = WorkItem::new(run.id, "Implement auth", WorkKind::Implement);
    work.mark_ready();
    storage.save_work_item(&work).await?;

    let dispatcher = MissionDispatcher::new(
        storage.clone(),
        town.channel().clone(),
        MockGitHubClient::new(),
        DispatcherConfig {
            tick_interval_secs: 1,
            lock_ttl_secs: 30,
            // Window long enough that the second tick is always inside it.
            help_repeat_interval_secs: 86_400,
            help_repeat_backoff_cap: 8,
        },
    );

    // First tick: exactly one help request is recorded.
    dispatcher.tick(None).await?;
    let after_first = storage.get_mission(run.id).await?.unwrap();
    assert_eq!(after_first.dispatcher_help_request_attempts, 1);

    // Second tick inside the backoff window: inbox size must not change.
    let inbox_before = town
        .channel()
        .peek_inbox(AgentId::supervisor(), 50)
        .await?
        .len();
    dispatcher.tick(None).await?;
    let inbox_after = town
        .channel()
        .peek_inbox(AgentId::supervisor(), 50)
        .await?
        .len();
    assert_eq!(
        inbox_after, inbox_before,
        "dispatcher should not rebroadcast within the backoff window"
    );
    let after_second = storage.get_mission(run.id).await?.unwrap();
    assert_eq!(after_second.dispatcher_help_request_attempts, 1);

    drop(town);
    cleanup_redis(&temp_dir);
    Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_processes_conductor_note() -> Result<(), Box<dyn std::error::Error>>
{
    use tinytown::mission::{
        DispatcherConfig, MissionControlMessage, MissionDispatcher, MissionRun, MissionState,
        MissionStorage, MockGitHubClient, ObjectiveRef,
    };
    // A "resume" control note from the conductor moves a blocked mission back
    // to Running, stamps dispatcher progress, and marks the note processed.
    let temp_dir = TempDir::new()?;
    let town_name = unique_town_name("mission-dispatch-note");
    let town = Town::init(temp_dir.path(), &town_name).await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
    let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    mission.block("Waiting on operator");
    storage.save_mission(&mission).await?;
    storage.add_active(mission.id).await?;
    let note = MissionControlMessage::new(mission.id, "conductor", "resume and retry now");
    let note_id = note.id.clone();
    // BUG FIX: the argument had been corrupted to the mojibake `¬e`
    // (an HTML-entity mangling of `&note`), which does not compile.
    storage.save_control_message(&note).await?;
    let dispatcher = MissionDispatcher::new(
        storage.clone(),
        town.channel().clone(),
        MockGitHubClient::new(),
        DispatcherConfig {
            tick_interval_secs: 1,
            lock_ttl_secs: 30,
            ..DispatcherConfig::default()
        },
    );
    dispatcher.tick(Some(mission.id)).await?;
    let updated = storage.get_mission(mission.id).await?.unwrap();
    assert_eq!(updated.state, MissionState::Running);
    assert!(updated.dispatcher_last_progress_at.is_some());
    // The note must be retained in the log and flagged as processed.
    let messages = storage.list_control_messages(mission.id).await?;
    let processed = messages
        .into_iter()
        .find(|message| message.id == note_id)
        .expect("control note should exist");
    assert!(processed.processed_at.is_some());
    drop(town);
    cleanup_redis(&temp_dir);
    Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_ignores_resume_note_for_failed_mission()
-> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{
        DispatcherConfig, MissionControlMessage, MissionDispatcher, MissionRun, MissionState,
        MissionStorage, MockGitHubClient, ObjectiveRef,
    };
    // A resume directive must not revive a terminally Failed mission: the
    // state and blocked_reason stay untouched, the note is still consumed,
    // and an "ignored resume directive" event is logged.
    let temp_dir = TempDir::new()?;
    let town_name = unique_town_name("mission-dispatch-resume-failed");
    let town = Town::init(temp_dir.path(), &town_name).await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
    let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    mission.fail("Unrecoverable error");
    storage.save_mission(&mission).await?;
    let note = MissionControlMessage::new(mission.id, "conductor", "resume and retry now");
    let note_id = note.id.clone();
    // BUG FIX: restore `&note` (the source had the mojibake `¬e`, an
    // HTML-entity mangling that does not compile).
    storage.save_control_message(&note).await?;
    let dispatcher = MissionDispatcher::new(
        storage.clone(),
        town.channel().clone(),
        MockGitHubClient::new(),
        DispatcherConfig {
            tick_interval_secs: 1,
            lock_ttl_secs: 30,
            ..DispatcherConfig::default()
        },
    );
    dispatcher.tick(Some(mission.id)).await?;
    let updated = storage.get_mission(mission.id).await?.unwrap();
    assert_eq!(updated.state, MissionState::Failed);
    assert_eq!(
        updated.blocked_reason.as_deref(),
        Some("Unrecoverable error")
    );
    let messages = storage.list_control_messages(mission.id).await?;
    let processed = messages
        .into_iter()
        .find(|message| message.id == note_id)
        .expect("control note should exist");
    assert!(processed.processed_at.is_some());
    let events = storage.get_events(mission.id, 10).await?;
    assert!(
        events
            .iter()
            .any(|event| event.contains("ignored resume directive"))
    );
    drop(town);
    cleanup_redis(&temp_dir);
    Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_ignores_pause_note_for_blocked_mission()
-> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{
        DispatcherConfig, MissionControlMessage, MissionDispatcher, MissionRun, MissionState,
        MissionStorage, MockGitHubClient, ObjectiveRef,
    };
    // A pause directive on an already-Blocked mission is a no-op for state
    // and blocked_reason; the note is still consumed and an
    // "ignored pause directive" event is logged.
    let temp_dir = TempDir::new()?;
    let town_name = unique_town_name("mission-dispatch-pause-blocked");
    let town = Town::init(temp_dir.path(), &town_name).await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
    let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    mission.block("Waiting on operator");
    storage.save_mission(&mission).await?;
    storage.add_active(mission.id).await?;
    let note = MissionControlMessage::new(mission.id, "conductor", "pause until tomorrow");
    let note_id = note.id.clone();
    // BUG FIX: restore `&note` (the source had the mojibake `¬e`, an
    // HTML-entity mangling that does not compile).
    storage.save_control_message(&note).await?;
    let dispatcher = MissionDispatcher::new(
        storage.clone(),
        town.channel().clone(),
        MockGitHubClient::new(),
        DispatcherConfig {
            tick_interval_secs: 1,
            lock_ttl_secs: 30,
            ..DispatcherConfig::default()
        },
    );
    dispatcher.tick(Some(mission.id)).await?;
    let updated = storage.get_mission(mission.id).await?.unwrap();
    assert_eq!(updated.state, MissionState::Blocked);
    assert_eq!(
        updated.blocked_reason.as_deref(),
        Some("Waiting on operator")
    );
    let messages = storage.list_control_messages(mission.id).await?;
    let processed = messages
        .into_iter()
        .find(|message| message.id == note_id)
        .expect("control note should exist");
    assert!(processed.processed_at.is_some());
    let events = storage.get_events(mission.id, 10).await?;
    assert!(
        events
            .iter()
            .any(|event| event.contains("ignored pause directive"))
    );
    drop(town);
    cleanup_redis(&temp_dir);
    Ok(())
}
#[tokio::test]
async fn test_mission_dispatcher_resume_note_completes_blocking_watches()
-> Result<(), Box<dyn std::error::Error>> {
    use tinytown::mission::{
        DispatcherConfig, MissionControlMessage, MissionDispatcher, MissionRun, MissionState,
        MissionStorage, MockGitHubClient, ObjectiveRef, TriggerAction, WatchItem, WatchKind,
        WatchStatus, WorkItem, WorkKind, WorkStatus,
    };
    // A resume note on a watch-blocked mission forces the outstanding watch
    // to Done, advances the blocked work item to Done, and completes the
    // mission (reviewer gate disabled so the single item suffices).
    let temp_dir = TempDir::new()?;
    let town_name = unique_town_name("mission-dispatch-resume-watches");
    let town = Town::init(temp_dir.path(), &town_name).await?;
    let storage = MissionStorage::new(town.channel().conn().clone(), &town_name);
    let mut mission = MissionRun::new(vec![ObjectiveRef::Doc {
        path: "test.md".into(),
    }]);
    mission.policy.reviewer_required = false;
    mission.block("Waiting on external watch");
    storage.save_mission(&mission).await?;
    storage.add_active(mission.id).await?;
    let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement);
    item.block();
    item.record_artifacts(["test/repo#21"]);
    storage.save_work_item(&item).await?;
    let watch = WatchItem::new(
        mission.id,
        item.id,
        WatchKind::PrChecks,
        "test/repo#21",
        3600,
    )
    .with_trigger(TriggerAction::AdvancePipeline);
    storage.save_watch_item(&watch).await?;
    let note = MissionControlMessage::new(mission.id, "conductor", "resume and finish now");
    // BUG FIX: restore `&note` (the source had the mojibake `¬e`, an
    // HTML-entity mangling that does not compile).
    storage.save_control_message(&note).await?;
    let dispatcher = MissionDispatcher::new(
        storage.clone(),
        town.channel().clone(),
        MockGitHubClient::new(),
        DispatcherConfig {
            tick_interval_secs: 1,
            lock_ttl_secs: 30,
            ..DispatcherConfig::default()
        },
    );
    dispatcher.tick(Some(mission.id)).await?;
    let updated_watch = storage.get_watch_item(mission.id, watch.id).await?.unwrap();
    assert_eq!(updated_watch.status, WatchStatus::Done);
    let updated_item = storage.get_work_item(mission.id, item.id).await?.unwrap();
    assert_eq!(updated_item.status, WorkStatus::Done);
    let updated_mission = storage.get_mission(mission.id).await?.unwrap();
    assert_eq!(updated_mission.state, MissionState::Completed);
    assert!(updated_mission.blocked_reason.is_none());
    drop(town);
    cleanup_redis(&temp_dir);
    Ok(())
}
#[tokio::test]
async fn test_docket_ensure_group() -> Result<(), Box<dyn std::error::Error>> {
    // Creating the docket consumer group must be idempotent: calling it a
    // second time on an existing group is not an error.
    let town = create_test_town("docket-group-test").await?;
    let channel = town.channel();
    for _ in 0..2 {
        channel.docket_ensure_group().await?;
    }
    Ok(())
}
#[tokio::test]
async fn test_docket_push() -> Result<(), Box<dyn std::error::Error>> {
    // Pushing a single assignment yields a stream-style entry id and grows
    // the docket to length 1.
    let town = create_test_town("docket-push-test").await?;
    let channel = town.channel();
    channel.docket_ensure_group().await?;

    let id = TaskId::new();
    let entry_id = channel
        .docket_push(id, "Implement feature X", "normal", "conductor", "worker-1")
        .await?;

    // Entry ids follow the "<part>-<part>" shape.
    assert!(
        entry_id.contains('-'),
        "Entry ID should contain '-': {}",
        entry_id
    );
    assert_eq!(channel.docket_len().await?, 1);
    Ok(())
}
#[tokio::test]
async fn test_docket_push_multiple() -> Result<(), Box<dyn std::error::Error>> {
    // Five pushes should leave the docket at length five.
    let town = create_test_town("docket-push-multi-test").await?;
    let channel = town.channel();
    channel.docket_ensure_group().await?;
    for i in 0..5 {
        channel
            .docket_push(
                TaskId::new(),
                &format!("Task {}", i),
                "normal",
                "conductor",
                "worker-1",
            )
            .await?;
    }
    assert_eq!(channel.docket_len().await?, 5);
    Ok(())
}
#[tokio::test]
async fn test_docket_read() -> Result<(), Box<dyn std::error::Error>> {
    // A consumer reading the docket gets back every field the entry was
    // pushed with, plus a server-side timestamp.
    let town = create_test_town("docket-read-test").await?;
    let channel = town.channel();
    channel.docket_ensure_group().await?;

    let id = TaskId::new();
    channel
        .docket_push(id, "Build the API", "high", "conductor", "agent-1")
        .await?;

    let result = channel.docket_read("agent-1", 100).await?;
    assert!(result.is_some(), "Should have read an entry");
    let (entry_id, fields) = result.unwrap();
    assert!(entry_id.contains('-'));

    assert_eq!(fields.get("task_id").unwrap(), &id.to_string());
    // Fixed fields round-trip verbatim.
    for (key, expected) in [
        ("type", "task_assign"),
        ("message", "Build the API"),
        ("priority", "high"),
        ("from", "conductor"),
        ("to", "agent-1"),
    ] {
        assert_eq!(fields.get(key).unwrap(), expected);
    }
    assert!(fields.contains_key("timestamp"));
    Ok(())
}
#[tokio::test]
async fn test_docket_read_empty() -> Result<(), Box<dyn std::error::Error>> {
    // Reading from a freshly created, empty docket yields no entry.
    let town = create_test_town("docket-read-empty-test").await?;
    let channel = town.channel();
    channel.docket_ensure_group().await?;
    let read = channel.docket_read("agent-1", 50).await?;
    assert!(read.is_none(), "Should return None when stream is empty");
    Ok(())
}
#[tokio::test]
async fn test_docket_ack() -> Result<(), Box<dyn std::error::Error>> {
    // Reading an entry leaves it pending; ACKing it clears the pending set.
    let town = create_test_town("docket-ack-test").await?;
    let channel = town.channel();
    channel.docket_ensure_group().await?;

    channel
        .docket_push(
            TaskId::new(),
            "Fix the bug",
            "normal",
            "conductor",
            "worker-1",
        )
        .await?;
    let (entry_id, _fields) = channel
        .docket_read("worker-1", 100)
        .await?
        .expect("should read entry");

    assert_eq!(
        channel.docket_pending_count().await?,
        1,
        "Should have 1 pending entry before ACK"
    );
    channel.docket_ack(&entry_id).await?;
    assert_eq!(
        channel.docket_pending_count().await?,
        0,
        "Should have 0 pending entries after ACK"
    );
    Ok(())
}
#[tokio::test]
async fn test_docket_pending_visibility() -> Result<(), Box<dyn std::error::Error>> {
    // Every entry that has been read but not ACKed stays visible in the
    // pending count.
    let town = create_test_town("docket-pending-test").await?;
    let channel = town.channel();
    channel.docket_ensure_group().await?;

    for i in 0..3 {
        channel
            .docket_push(
                TaskId::new(),
                &format!("Task {}", i),
                "normal",
                "conductor",
                "worker-1",
            )
            .await?;
    }
    // Drain all three into the consumer without ACKing.
    for _ in 0..3 {
        channel.docket_read("worker-1", 100).await?;
    }
    assert_eq!(
        channel.docket_pending_count().await?,
        3,
        "Should have 3 pending entries"
    );
    Ok(())
}
#[tokio::test]
async fn test_docket_log_event() -> Result<(), Box<dyn std::error::Error>> {
let town = create_test_town("docket-event-test").await?;
let channel = town.channel();
let task_id = TaskId::new();
let id1 = channel
.docket_log_event(task_id, "assigned", "Assigned to worker-1")
.await?;
let id2 = channel
.docket_log_event(task_id, "started", "Worker began processing")
.await?;
let id3 = channel
.docket_log_event(task_id, "completed", "Task finished successfully")
.await?;
assert!(id1.contains('-'));
assert!(id2.contains('-'));
assert!(id3.contains('-'));
assert!(id2 > id1, "Event IDs should be ordered");
assert!(id3 > id2, "Event IDs should be ordered");
Ok(())
}
#[tokio::test]
async fn test_docket_full_lifecycle() -> Result<(), Box<dyn std::error::Error>> {
    // End-to-end: push -> read -> log events -> ack, checking counters at
    // each stage. ACK clears the pending count but never shrinks the docket.
    let town = create_test_town("docket-lifecycle-test").await?;
    let channel = town.channel();
    channel.docket_ensure_group().await?;

    let task_id = TaskId::new();
    channel
        .docket_push(
            task_id,
            "Deploy to production",
            "high",
            "conductor",
            "deployer",
        )
        .await?;
    assert_eq!(channel.docket_len().await?, 1);

    let (entry_id, fields) = channel
        .docket_read("deployer", 100)
        .await?
        .expect("should read the task");
    assert_eq!(fields.get("task_id").unwrap(), &task_id.to_string());
    assert_eq!(fields.get("message").unwrap(), "Deploy to production");
    assert_eq!(channel.docket_pending_count().await?, 1);

    // Progress events may be logged while the entry is still pending.
    channel
        .docket_log_event(task_id, "started", "Beginning deployment")
        .await?;
    channel
        .docket_log_event(task_id, "progress", "50% complete")
        .await?;

    channel.docket_ack(&entry_id).await?;
    channel
        .docket_log_event(task_id, "completed", "Deployment successful")
        .await?;

    assert_eq!(channel.docket_pending_count().await?, 0);
    assert_eq!(channel.docket_len().await?, 1);
    Ok(())
}
#[tokio::test]
async fn test_use_streams_config_default() -> Result<(), Box<dyn std::error::Error>> {
    // A freshly initialized town must not opt into streams by default.
    let town = create_test_town("use-streams-config-test").await?;
    assert!(
        !town.config().use_streams,
        "use_streams should default to false"
    );
    Ok(())
}
#[tokio::test]
async fn test_use_streams_config_parse() -> Result<(), Box<dyn std::error::Error>> {
use tempfile::TempDir;
use tinytown::Config;
let temp_dir = TempDir::new()?;
let config_path = temp_dir.path().join("tinytown.toml");
std::fs::write(
&config_path,
r#"
name = "stream-test"
use_streams = true
"#,
)?;
let config = Config::load(temp_dir.path())?;
assert!(
config.use_streams,
"use_streams should be true when set in config"
);
Ok(())
}
#[tokio::test]
async fn test_agent_idle_timeout_config_parse() -> Result<(), Box<dyn std::error::Error>> {
    use tinytown::Config;
    // The [agent] table's idle_timeout_secs must round-trip through
    // Config::load.
    let dir = TempDir::new()?;
    std::fs::write(
        dir.path().join("tinytown.toml"),
        r#"
name = "agent-timeout-test"
[agent]
idle_timeout_secs = 42
"#,
    )?;
    let parsed = Config::load(dir.path())?;
    assert_eq!(parsed.agent.idle_timeout_secs, 42);
    Ok(())
}
#[tokio::test]
async fn test_agent_loop_exits_cleanly_after_idle_timeout() -> Result<(), Box<dyn std::error::Error>>
{
    // Runs the real `tt agent-loop` binary against a live town and checks
    // that it self-terminates once the idle timeout elapses, leaving the
    // agent registered but in the Stopped state.
    let town = create_test_town("agent-loop-idle-timeout").await?;
    let town_path = town.config().root.clone();
    // Shrink the idle timeout in the on-disk config so the loop exits fast.
    let mut config = tinytown::Config::load(&town_path)?;
    config.agent.idle_timeout_secs = 1;
    config.save()?;
    let handle = town.spawn_agent("idle-worker", "claude").await?;
    let agent_id = handle.id();
    // The loop is a blocking child process; run it off the async runtime.
    // NOTE(review): the trailing "100" arg looks like a poll/loop parameter —
    // confirm against the `agent-loop` CLI definition.
    let status = tokio::task::spawn_blocking(move || {
        std::process::Command::new(env!("CARGO_BIN_EXE_tt"))
            .arg("--town")
            .arg(&town_path)
            .arg("agent-loop")
            .arg("idle-worker")
            .arg(agent_id.to_string())
            .arg("100")
            .status()
    })
    .await??;
    assert!(status.success(), "agent-loop should exit cleanly");
    // The agent record must survive the loop's exit, marked Stopped.
    let agent = town
        .channel()
        .get_agent_state(agent_id)
        .await?
        .expect("idle worker should still be registered");
    assert_eq!(agent.state, AgentState::Stopped);
    Ok(())
}
#[tokio::test]
async fn test_agent_loop_ignores_stale_terminal_current_task()
-> Result<(), Box<dyn std::error::Error>> {
    // Seeds an agent whose current_task points at an already-completed task,
    // then runs the real `tt agent-loop` binary: the loop must not get stuck
    // on the stale reference — it exits cleanly and clears current_task.
    let town = create_test_town("agent-loop-stale-current-task").await?;
    let town_path = town.config().root.clone();
    // Shrink the idle timeout in the on-disk config so the loop exits fast.
    let mut config = tinytown::Config::load(&town_path)?;
    config.agent.idle_timeout_secs = 1;
    config.save()?;
    let handle = town.spawn_agent("idle-worker", "claude").await?;
    let agent_id = handle.id();
    // Create a task that is already in a terminal (completed) state.
    let mut task = Task::new("Already finished task");
    task.assign(agent_id);
    task.complete("done");
    town.channel().set_task(&task).await?;
    // Manually wire the stale task onto the idle agent's record.
    let mut agent = town
        .channel()
        .get_agent_state(agent_id)
        .await?
        .expect("idle worker should exist");
    agent.state = AgentState::Idle;
    agent.current_task = Some(task.id);
    town.channel().set_agent_state(&agent).await?;
    // The loop is a blocking child process; run it off the async runtime.
    let status = tokio::task::spawn_blocking(move || {
        std::process::Command::new(env!("CARGO_BIN_EXE_tt"))
            .arg("--town")
            .arg(&town_path)
            .arg("agent-loop")
            .arg("idle-worker")
            .arg(agent_id.to_string())
            .arg("100")
            .status()
    })
    .await??;
    assert!(status.success(), "agent-loop should exit cleanly");
    // The stale task reference must have been cleared on exit.
    let agent = town
        .channel()
        .get_agent_state(agent_id)
        .await?
        .expect("idle worker should still be registered");
    assert_eq!(agent.state, AgentState::Stopped);
    assert_eq!(agent.current_task, None);
    Ok(())
}
#[tokio::test]
async fn test_docket_multiple_consumers() -> Result<(), Box<dyn std::error::Error>> {
    // Two consumers in the same group each receive a distinct entry; both
    // entries stay pending until each consumer ACKs its own.
    let town = create_test_town("docket-multi-consumer-test").await?;
    let channel = town.channel();
    channel.docket_ensure_group().await?;

    channel
        .docket_push(TaskId::new(), "Task A", "normal", "conductor", "any")
        .await?;
    channel
        .docket_push(TaskId::new(), "Task B", "normal", "conductor", "any")
        .await?;

    let read1 = channel.docket_read("consumer-1", 100).await?;
    let read2 = channel.docket_read("consumer-2", 100).await?;
    assert!(read1.is_some(), "Consumer 1 should get a task");
    assert!(read2.is_some(), "Consumer 2 should get a task");
    let (id1, fields1) = read1.unwrap();
    let (id2, fields2) = read2.unwrap();
    assert_ne!(
        fields1.get("task_id"),
        fields2.get("task_id"),
        "Each consumer should get a different task"
    );

    assert_eq!(channel.docket_pending_count().await?, 2);
    channel.docket_ack(&id1).await?;
    channel.docket_ack(&id2).await?;
    assert_eq!(channel.docket_pending_count().await?, 0);
    Ok(())
}