# 1.2 Task Management System
**Deliverable**: Task tree architecture, scheduling algorithms, and dynamic task management
**Status**: ✅ Implemented - Core task tree, scheduling, and dependency resolution are fully functional
## Task Tree Data Structure
### Core Task Model
The task management system uses a hierarchical tree structure with rich metadata and state tracking:
```rust
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Task {
pub id: TaskId,
pub title: String,
pub description: String,
pub status: TaskStatus,
pub parent_id: Option<TaskId>,
pub children: Vec<TaskId>,
pub dependencies: Vec<TaskId>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub metadata: TaskMetadata,
pub execution_history: Vec<ExecutionRecord>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum TaskStatus {
/// Task is ready to be executed but hasn't started
Pending,
/// Task is currently being processed
InProgress {
started_at: DateTime<Utc>,
estimated_completion: Option<DateTime<Utc>>,
},
/// Task is temporarily blocked by external factors
Blocked {
reason: String,
blocked_at: DateTime<Utc>,
retry_after: Option<DateTime<Utc>>,
},
/// Task completed successfully
Completed {
completed_at: DateTime<Utc>,
result: TaskResult,
},
/// Task failed and cannot be recovered automatically
Failed {
failed_at: DateTime<Utc>,
error: TaskError,
retry_count: u32,
},
/// Task was deliberately skipped
Skipped {
reason: String,
skipped_at: DateTime<Utc>,
},
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TaskMetadata {
pub priority: TaskPriority,
pub estimated_complexity: Option<ComplexityLevel>,
pub estimated_duration: Option<Duration>,
pub repository_refs: Vec<RepositoryRef>,
pub file_refs: Vec<FileRef>,
pub tags: Vec<String>,
pub context_requirements: ContextRequirements,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum TaskPriority {
Critical = 10,
High = 8,
Normal = 5,
Low = 3,
Background = 1,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ComplexityLevel {
Trivial, // < 5 minutes
Simple, // 5-15 minutes
Moderate, // 15-60 minutes
Complex, // 1-4 hours
Epic, // > 4 hours
}
```
### Task Relationships
#### Hierarchical Structure
Tasks form a tree where:
- **Parent tasks** represent high-level goals or features
- **Child tasks** are concrete implementation steps
- **Root tasks** are top-level objectives specified by the user
#### Dependency Management
```rust
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TaskDependency {
pub task_id: TaskId,
pub dependency_type: DependencyType,
pub required_status: Vec<TaskStatus>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum DependencyType {
/// Must complete before this task can start
Prerequisite,
/// Should complete before this task starts (soft dependency)
Preferred,
/// Must be running concurrently
Concurrent,
/// Must not run at the same time
Exclusive,
}
```
## Task Tree Operations
### Dynamic Task Creation
The system supports creating new tasks during execution based on discovered requirements:
```rust
impl TaskTree {
/// Create subtasks dynamically during parent task execution
pub async fn create_subtasks(
&mut self,
parent_id: TaskId,
subtask_specs: Vec<TaskSpec>,
) -> Result<Vec<TaskId>> {
let parent = self.get_task_mut(parent_id)?;
// Update parent status if it's not already in progress
if matches!(parent.status, TaskStatus::Pending) {
parent.status = TaskStatus::InProgress {
started_at: Utc::now(),
estimated_completion: None,
};
}
let mut created_tasks = Vec::new();
for spec in subtask_specs {
let task_id = self.create_task_from_spec(spec, Some(parent_id))?;
parent.children.push(task_id);
created_tasks.push(task_id);
}
self.recalculate_dependencies().await?;
self.notify_tree_changed().await?;
Ok(created_tasks)
}
/// Merge duplicate or similar tasks to avoid redundant work
pub async fn deduplicate_tasks(&mut self) -> Result<Vec<TaskId>> {
let candidates = self.find_similar_tasks().await?;
let mut merged_tasks = Vec::new();
for cluster in candidates {
if cluster.len() > 1 {
let primary = cluster[0];
let duplicates = &cluster[1..];
// Merge metadata and dependencies
self.merge_task_cluster(primary, duplicates).await?;
// Remove duplicate tasks
for &duplicate_id in duplicates {
self.remove_task(duplicate_id).await?;
merged_tasks.push(duplicate_id);
}
}
}
Ok(merged_tasks)
}
}
```
### Context Inheritance
Child tasks automatically inherit context from their parents:
```rust
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ContextRequirements {
pub required_files: Vec<PathBuf>,
pub required_repositories: Vec<String>,
pub build_dependencies: Vec<String>,
pub environment_vars: HashMap<String, String>,
pub claude_context_keys: Vec<String>,
}
impl Task {
/// Calculate effective context by merging parent context
pub fn effective_context(&self, tree: &TaskTree) -> ContextRequirements {
let mut context = self.metadata.context_requirements.clone();
if let Some(parent_id) = self.parent_id {
if let Ok(parent) = tree.get_task(parent_id) {
let parent_context = parent.effective_context(tree);
context.merge_with(&parent_context);
}
}
context
}
}
```
## Task Scheduling System
### Intelligent Task Selection
The scheduler implements a sophisticated scoring system for task prioritization:
```rust
pub struct TaskScheduler {
scoring_weights: ScoringWeights,
resource_monitor: ResourceMonitor,
context_cache: ContextCache,
}
#[derive(Clone, Debug)]
pub struct ScoringWeights {
pub priority_weight: f64,
pub dependency_weight: f64,
pub context_similarity_weight: f64,
pub resource_availability_weight: f64,
pub failure_penalty_weight: f64,
pub age_bonus_weight: f64,
}
impl TaskScheduler {
pub async fn select_next_task(&self, tree: &TaskTree) -> Option<TaskId> {
let eligible_tasks = self.get_eligible_tasks(tree).await;
if eligible_tasks.is_empty() {
return None;
}
let scored_tasks = eligible_tasks
.iter()
.map(|&task_id| {
let score = self.calculate_task_score(task_id, tree);
(task_id, score)
})
.collect::<Vec<_>>();
// Select highest scoring task with some randomization to avoid stagnation
self.weighted_random_selection(scored_tasks).await
}
fn calculate_task_score(&self, task_id: TaskId, tree: &TaskTree) -> f64 {
let task = tree.get_task(task_id).unwrap();
let priority_score = self.calculate_priority_score(task);
let dependency_score = self.calculate_dependency_score(task, tree);
let context_score = self.calculate_context_score(task);
let resource_score = self.calculate_resource_score(task);
let history_score = self.calculate_history_score(task);
let age_score = self.calculate_age_score(task);
let weights = &self.scoring_weights;
priority_score * weights.priority_weight
+ dependency_score * weights.dependency_weight
+ context_score * weights.context_similarity_weight
+ resource_score * weights.resource_availability_weight
+ history_score * weights.failure_penalty_weight
+ age_score * weights.age_bonus_weight
}
}
```
### Dependency Resolution
The system tracks and resolves complex dependency chains:
```rust
impl TaskScheduler {
async fn get_eligible_tasks(&self, tree: &TaskTree) -> Vec<TaskId> {
let mut eligible = Vec::new();
for task_id in tree.get_all_task_ids() {
if self.is_task_eligible(task_id, tree).await {
eligible.push(task_id);
}
}
eligible
}
async fn is_task_eligible(&self, task_id: TaskId, tree: &TaskTree) -> bool {
let task = match tree.get_task(task_id) {
Ok(task) => task,
Err(_) => return false,
};
// Check basic status eligibility
if !matches!(task.status, TaskStatus::Pending) {
return false;
}
// Check all dependencies are satisfied
for dep in &task.dependencies {
if !self.is_dependency_satisfied(*dep, tree).await {
return false;
}
}
// Check resource availability
if !self.resource_monitor.can_execute_task(task).await {
return false;
}
// Check for exclusive dependencies
if self.has_conflicting_active_tasks(task, tree).await {
return false;
}
true
}
async fn is_dependency_satisfied(&self, dep_id: TaskId, tree: &TaskTree) -> bool {
match tree.get_task(dep_id) {
Ok(dep_task) => {
matches!(dep_task.status, TaskStatus::Completed { .. })
}
Err(_) => false, // Missing dependency is not satisfied
}
}
}
```
## Task Execution Engine
### Execution Context Management
Each task execution maintains rich context and state:
```rust
#[derive(Debug, Clone)]
pub struct TaskExecutionContext {
pub task_id: TaskId,
pub claude_session: ClaudeSessionId,
pub working_directory: PathBuf,
pub environment: HashMap<String, String>,
pub file_watchers: Vec<FileWatcher>,
pub resource_allocation: ResourceAllocation,
pub execution_start: DateTime<Utc>,
}
#[derive(Debug, Clone)]
pub struct ResourceAllocation {
pub max_memory_mb: u64,
pub max_cpu_percent: f64,
pub max_duration: Duration,
pub temp_storage_mb: u64,
}
pub struct TaskExecutor {
claude_interface: Arc<ClaudeCodeInterface>,
resource_manager: Arc<ResourceManager>,
context_manager: Arc<ContextManager>,
}
impl TaskExecutor {
pub async fn execute_task(
&self,
task_id: TaskId,
tree: &TaskTree,
) -> Result<TaskExecutionResult> {
let task = tree.get_task(task_id)?;
// Set up execution context
let context = self.prepare_execution_context(&task).await?;
// Update task status to in-progress
let start_time = Utc::now();
self.update_task_status(task_id, TaskStatus::InProgress {
started_at: start_time,
estimated_completion: self.estimate_completion_time(&task),
}).await?;
// Execute the task through Claude Code
let result = match self.execute_with_claude(&task, &context).await {
Ok(result) => result,
Err(error) => {
self.handle_execution_error(task_id, error).await?;
return Ok(TaskExecutionResult::Failed);
}
};
// Process execution results
self.process_execution_result(task_id, &result).await?;
Ok(result)
}
async fn execute_with_claude(
&self,
task: &Task,
context: &TaskExecutionContext,
) -> Result<TaskExecutionResult> {
// Prepare Claude Code prompt with task context
let prompt = self.build_task_prompt(task, context).await?;
// Execute through headless SDK
let response = self.claude_interface
.execute_task_with_context(prompt, context)
.await?;
// Parse response for results and any new subtasks
let result = self.parse_claude_response(response, task).await?;
Ok(result)
}
}
```
### Dynamic Task Decomposition
The system can break down complex tasks during execution:
```rust
#[derive(Debug, Clone)]
pub enum TaskExecutionResult {
Completed {
result: serde_json::Value,
files_modified: Vec<PathBuf>,
build_artifacts: Vec<PathBuf>,
},
CompletedWithSubtasks {
result: serde_json::Value,
subtasks: Vec<TaskSpec>,
files_modified: Vec<PathBuf>,
},
Blocked {
reason: String,
required_resources: Vec<String>,
retry_after: Option<Duration>,
},
Failed,
}
impl TaskExecutor {
async fn process_execution_result(
&self,
task_id: TaskId,
result: &TaskExecutionResult,
) -> Result<()> {
match result {
TaskExecutionResult::CompletedWithSubtasks { subtasks, .. } => {
// Create new subtasks dynamically
let tree = self.get_task_tree_mut().await?;
tree.create_subtasks(task_id, subtasks.clone()).await?;
// Update parent task status
self.update_task_status(task_id, TaskStatus::InProgress {
started_at: Utc::now(),
estimated_completion: None,
}).await?;
}
TaskExecutionResult::Completed { .. } => {
self.update_task_status(task_id, TaskStatus::Completed {
completed_at: Utc::now(),
result: result.clone(),
}).await?;
}
TaskExecutionResult::Blocked { reason, retry_after, .. } => {
self.update_task_status(task_id, TaskStatus::Blocked {
reason: reason.clone(),
blocked_at: Utc::now(),
retry_after: retry_after.map(|d| Utc::now() + d),
}).await?;
}
TaskExecutionResult::Failed => {
self.handle_task_failure(task_id).await?;
}
}
Ok(())
}
}
```
## Progress Tracking & Metrics
### Real-time Progress Monitoring
```rust
#[derive(Debug, Clone, Serialize)]
pub struct TaskTreeProgress {
pub total_tasks: u32,
pub completed_tasks: u32,
pub in_progress_tasks: u32,
pub blocked_tasks: u32,
pub failed_tasks: u32,
pub estimated_completion: Option<DateTime<Utc>>,
pub current_throughput: f64, // tasks per hour
}
impl TaskTree {
pub fn calculate_progress(&self) -> TaskTreeProgress {
let mut progress = TaskTreeProgress {
total_tasks: self.tasks.len() as u32,
completed_tasks: 0,
in_progress_tasks: 0,
blocked_tasks: 0,
failed_tasks: 0,
estimated_completion: None,
current_throughput: 0.0,
};
for task in self.tasks.values() {
match task.status {
TaskStatus::Completed { .. } => progress.completed_tasks += 1,
TaskStatus::InProgress { .. } => progress.in_progress_tasks += 1,
TaskStatus::Blocked { .. } => progress.blocked_tasks += 1,
TaskStatus::Failed { .. } => progress.failed_tasks += 1,
_ => {}
}
}
progress.estimated_completion = self.estimate_total_completion();
progress.current_throughput = self.calculate_throughput();
progress
}
}
```
### Performance Analytics
```rust
#[derive(Debug, Clone, Serialize)]
pub struct TaskMetrics {
pub average_completion_time: Duration,
pub success_rate: f64,
pub retry_rate: f64,
pub subtask_creation_rate: f64,
pub context_reuse_efficiency: f64,
pub bottleneck_analysis: BottleneckAnalysis,
}
#[derive(Debug, Clone, Serialize)]
pub struct BottleneckAnalysis {
pub most_blocked_task_types: Vec<String>,
pub longest_running_tasks: Vec<TaskId>,
pub resource_constraints: Vec<ResourceConstraint>,
pub dependency_chains: Vec<DependencyChain>,
}
impl TaskTree {
pub fn generate_metrics(&self, time_window: Duration) -> TaskMetrics {
let recent_tasks = self.get_tasks_in_window(time_window);
TaskMetrics {
average_completion_time: self.calculate_average_completion_time(&recent_tasks),
success_rate: self.calculate_success_rate(&recent_tasks),
retry_rate: self.calculate_retry_rate(&recent_tasks),
subtask_creation_rate: self.calculate_subtask_creation_rate(&recent_tasks),
context_reuse_efficiency: self.calculate_context_reuse(&recent_tasks),
bottleneck_analysis: self.analyze_bottlenecks(&recent_tasks),
}
}
}
```
This task management system provides sophisticated orchestration capabilities while maintaining flexibility for dynamic task creation and intelligent resource utilization during long-running development workflows.