use crate::spec_ai_collective::capability::CapabilityTracker;
use crate::spec_ai_collective::types::{CollectiveError, Domain, InstanceId, Result, TaskId};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Urgency level attached to a delegated task.
///
/// Variants are declared in ascending order of urgency, so the derived `Ord`
/// gives `Low < Normal < High < Critical`. Variant names are serialized in
/// `snake_case` (for example `"critical"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum TaskPriority {
/// Lowest urgency.
Low,
/// Standard urgency; this is the value returned by `TaskPriority::default()`.
Normal,
/// Above-normal urgency.
High,
/// Highest urgency.
Critical,
}
#[allow(clippy::derivable_impls)]
impl Default for TaskPriority {
fn default() -> Self {
Self::Normal
}
}
/// Lifecycle state of a delegated task.
///
/// Variant names are serialized in `snake_case` (for example `in_progress`,
/// `timed_out`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
/// Initial state of a newly created task; also restored when a failed
/// delegation is queued for retry.
Pending,
/// Handed off to the instance identified by `to`.
Delegated { to: InstanceId },
/// Accepted and being worked on by the instance identified by `by`.
InProgress { by: InstanceId },
/// Finished (presumably successfully; not assigned directly by the code visible here).
Completed,
/// Failed; `reason` carries a human-readable explanation.
Failed { reason: String },
/// Cancelled (not assigned by the code visible here; semantics are up to callers).
Cancelled,
/// The task's deadline passed while it was delegated.
TimedOut,
}
/// A unit of work that can be handed from one instance to another.
///
/// Fields marked `#[serde(default ...)]` may be omitted from serialized input
/// and then take the default noted on the field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DelegatedTask {
/// Unique identifier; `DelegatedTask::new` generates a UUID v4 string.
pub task_id: TaskId,
/// Identifier of the task this one was derived from, if any.
pub parent_task_id: Option<TaskId>,
/// Caller-defined category of work (for example `"code_review"`).
pub task_type: String,
/// Human-readable description of what should be done.
pub description: String,
/// Capability domains used to select a capable agent when routing the task.
pub required_capabilities: Vec<Domain>,
/// Arbitrary JSON input for the task; not interpreted by this module.
pub payload: serde_json::Value,
/// Urgency of the task; defaults to `TaskPriority::Normal` when absent.
#[serde(default)]
pub priority: TaskPriority,
/// Optional point in time after which the task counts as expired.
pub deadline: Option<DateTime<Utc>>,
/// Instances that have delegated this task onward, oldest first; defaults to empty.
#[serde(default)]
pub delegation_chain: Vec<InstanceId>,
/// Current lifecycle state; defaults to `TaskStatus::Pending` when absent.
#[serde(default = "default_status")]
pub status: TaskStatus,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Upper bound that `retry_count` is compared against; defaults to 3 when absent.
#[serde(default = "default_max_retries")]
pub max_retries: u32,
/// Number of failures recorded so far; defaults to 0 when absent.
#[serde(default)]
pub retry_count: u32,
}
/// Serde default for `DelegatedTask::status`: a task with no serialized status
/// is treated as `TaskStatus::Pending`.
fn default_status() -> TaskStatus {
TaskStatus::Pending
}
/// Serde default for `DelegatedTask::max_retries`: the retry limit applied when
/// the field is missing from serialized input.
fn default_max_retries() -> u32 {
    const FALLBACK_MAX_RETRIES: u32 = 3;
    FALLBACK_MAX_RETRIES
}
impl DelegatedTask {
pub fn new(
task_type: impl Into<String>,
description: impl Into<String>,
payload: serde_json::Value,
) -> Self {
Self {
task_id: uuid::Uuid::new_v4().to_string(),
parent_task_id: None,
task_type: task_type.into(),
description: description.into(),
required_capabilities: Vec::new(),
payload,
priority: TaskPriority::Normal,
deadline: None,
delegation_chain: Vec::new(),
status: TaskStatus::Pending,
created_at: Utc::now(),
max_retries: 3,
retry_count: 0,
}
}
pub fn with_capabilities(mut self, capabilities: Vec<String>) -> Self {
self.required_capabilities = capabilities;
self
}
pub fn with_priority(mut self, priority: TaskPriority) -> Self {
self.priority = priority;
self
}
pub fn with_deadline(mut self, deadline: DateTime<Utc>) -> Self {
self.deadline = Some(deadline);
self
}
pub fn is_expired(&self) -> bool {
self.deadline.map(|d| Utc::now() > d).unwrap_or(false)
}
pub fn can_retry(&self) -> bool {
self.retry_count < self.max_retries
}
pub fn record_delegation(&mut self, from: InstanceId, to: InstanceId) {
self.delegation_chain.push(from);
self.status = TaskStatus::Delegated { to };
}
}
/// Outcome of executing a delegated task, as reported by the executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
/// Identifier of the task this result belongs to.
pub task_id: TaskId,
/// Instance that executed the task.
pub executor_id: InstanceId,
/// Status reported for the task; copied onto the matching received task
/// when the result is recorded.
pub status: TaskStatus,
/// Optional JSON output produced by the task.
pub result: Option<serde_json::Value>,
/// Measurements collected while executing the task.
pub metrics: ExecutionMetrics,
/// Free-form notes attached to the result (presumably insights gathered
/// during execution); defaults to empty when absent from serialized input.
#[serde(default)]
pub learnings: Vec<String>,
}
/// Measurements describing a single task execution.
///
/// `Default` yields zero for every counter and `None` for `tokens_used`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ExecutionMetrics {
/// Execution time in milliseconds.
pub duration_ms: u64,
/// Number of tool calls made.
pub tool_calls: u32,
/// Number of model calls made.
pub model_calls: u32,
/// Confidence reported for the result.
/// NOTE(review): the expected range (e.g. 0.0..=1.0) is not established here — confirm.
pub confidence: f32,
/// Tokens consumed, when known.
pub tokens_used: Option<u64>,
}
/// Result of choosing which instance should execute a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingDecision {
/// Instance selected to execute the task.
pub target_instance: InstanceId,
/// Capability score of the selected instance.
pub confidence: f32,
/// Human-readable explanation of why the target was chosen.
pub reasoning: String,
/// Alternative instances to try if the target is unavailable, in the order
/// the candidates were ranked.
pub fallback_instances: Vec<InstanceId>,
}
/// Bookkeeping for tasks owned, delegated, received and completed by one instance.
///
/// Each task lives in exactly one of the task maps at a time, keyed by its task id.
#[derive(Debug)]
pub struct DelegationManager {
/// Identifier of the local instance this manager acts for.
instance_id: InstanceId,
/// Tasks added locally that have not been delegated yet.
pending_tasks: HashMap<TaskId, DelegatedTask>,
/// Tasks this instance has handed off to another instance.
delegated_tasks: HashMap<TaskId, DelegatedTask>,
/// Tasks this instance has accepted for execution.
received_tasks: HashMap<TaskId, DelegatedTask>,
/// Results recorded for finished tasks.
completed_tasks: HashMap<TaskId, TaskResult>,
/// Minimum capability score an agent needs to be considered when routing.
min_capability_score: f32,
/// Default task timeout. Stored but not read by the code visible here,
/// hence the `dead_code` allowance.
#[allow(dead_code)]
default_timeout: Duration,
}
impl DelegationManager {
    /// Creates an empty manager acting on behalf of `instance_id`.
    ///
    /// The minimum capability score starts at `0.3` and the default timeout at
    /// 30 minutes.
    pub fn new(instance_id: InstanceId) -> Self {
        Self {
            instance_id,
            pending_tasks: HashMap::new(),
            delegated_tasks: HashMap::new(),
            received_tasks: HashMap::new(),
            completed_tasks: HashMap::new(),
            min_capability_score: 0.3,
            default_timeout: Duration::minutes(30),
        }
    }

    /// Returns the identifier of the local instance.
    pub fn instance_id(&self) -> &str {
        &self.instance_id
    }

    /// Sets the minimum capability score an agent must reach to be considered
    /// by [`get_routing_decision`](Self::get_routing_decision).
    pub fn set_min_capability_score(&mut self, score: f32) {
        self.min_capability_score = score;
    }

    /// Queues `task` as pending, replacing any pending task with the same id.
    pub fn add_task(&mut self, task: DelegatedTask) {
        self.pending_tasks.insert(task.task_id.clone(), task);
    }

    /// Chooses the instance that should execute `task`.
    ///
    /// Candidates come from `tracker`, filtered by the task's required
    /// capabilities and the configured minimum score. The first candidate
    /// becomes the target and up to three following candidates become
    /// fallbacks, in the order the tracker returned them.
    ///
    /// # Errors
    ///
    /// Returns `CollectiveError::NoCapableAgent` (carrying the task type) when
    /// the tracker yields no candidate.
    pub fn get_routing_decision(
        &self,
        task: &DelegatedTask,
        tracker: &CapabilityTracker,
    ) -> Result<RoutingDecision> {
        // Upper bound on how many alternative instances are offered.
        const MAX_FALLBACKS: usize = 3;

        let agents =
            tracker.get_capable_agents(&task.required_capabilities, self.min_capability_score);

        // Take the head of the ranking without indexing, so there is no panic path.
        let mut candidates = agents.iter();
        let primary = match candidates.next() {
            Some(agent) => agent,
            None => return Err(CollectiveError::NoCapableAgent(task.task_type.clone())),
        };

        let fallback_instances: Vec<InstanceId> = candidates
            .take(MAX_FALLBACKS)
            .map(|agent| agent.instance_id.clone())
            .collect();

        let reasoning = if primary.is_self {
            format!("Self is best candidate with score {:.2}", primary.score)
        } else {
            format!(
                "Agent {} has best capability score {:.2}",
                primary.instance_id, primary.score
            )
        };

        Ok(RoutingDecision {
            target_instance: primary.instance_id.clone(),
            confidence: primary.score,
            reasoning,
            fallback_instances,
        })
    }

    /// Moves a pending task to the delegated set and records the hand-off to `to`.
    ///
    /// # Errors
    ///
    /// Returns `CollectiveError::DelegationFailed` when no pending task has
    /// the given id.
    pub fn mark_delegated(&mut self, task_id: &str, to: InstanceId) -> Result<()> {
        match self.pending_tasks.remove(task_id) {
            Some(mut task) => {
                // `to` is consumed here; no clone is needed.
                task.record_delegation(self.instance_id.clone(), to);
                self.delegated_tasks.insert(task_id.to_string(), task);
                Ok(())
            }
            None => Err(CollectiveError::DelegationFailed(format!(
                "Task not found: {}",
                task_id
            ))),
        }
    }

    /// Accepts a task for local execution.
    ///
    /// The task is marked as in progress by this instance and stored in the
    /// received set, replacing any received task with the same id. Currently
    /// this never fails.
    pub fn accept_task(&mut self, mut task: DelegatedTask) -> Result<()> {
        task.status = TaskStatus::InProgress {
            by: self.instance_id.clone(),
        };
        self.received_tasks.insert(task.task_id.clone(), task);
        Ok(())
    }

    /// Records the outcome of a task.
    ///
    /// If the task is in the received set its status is updated to the
    /// reported one (the task stays in that set). The result is stored and
    /// can be read back with [`get_result`](Self::get_result).
    pub fn report_completion(&mut self, result: TaskResult) {
        let task_id = result.task_id.clone();
        if let Some(task) = self.received_tasks.get_mut(&task_id) {
            task.status = result.status.clone();
        }
        self.completed_tasks.insert(task_id, result);
    }

    /// Tasks that have been added but not yet delegated.
    pub fn pending_tasks(&self) -> &HashMap<TaskId, DelegatedTask> {
        &self.pending_tasks
    }

    /// Tasks that have been handed off to another instance.
    pub fn delegated_tasks(&self) -> &HashMap<TaskId, DelegatedTask> {
        &self.delegated_tasks
    }

    /// Tasks that this instance has accepted for execution.
    pub fn received_tasks(&self) -> &HashMap<TaskId, DelegatedTask> {
        &self.received_tasks
    }

    /// Looks up the recorded result for `task_id`, if any.
    pub fn get_result(&self, task_id: &str) -> Option<&TaskResult> {
        self.completed_tasks.get(task_id)
    }

    /// Records a failed delegation attempt for `task_id`.
    ///
    /// The task's retry count is incremented. If the task may still be
    /// retried it is moved back to the pending set with status `Pending` and
    /// `Ok(None)` is returned (no replacement instance is chosen here, so the
    /// `Ok` value is currently always `None`). Otherwise the task stays in the
    /// delegated set with status `Failed { reason }`.
    ///
    /// # Errors
    ///
    /// Returns `CollectiveError::DelegationFailed` when the task is not in the
    /// delegated set, or when its retries are exhausted.
    pub fn handle_failure(&mut self, task_id: &str, reason: &str) -> Result<Option<InstanceId>> {
        // Take ownership once; the task is re-inserted into whichever map it
        // belongs to afterwards, avoiding a second lookup.
        let mut task = match self.delegated_tasks.remove(task_id) {
            Some(task) => task,
            None => {
                return Err(CollectiveError::DelegationFailed(format!(
                    "Task not found: {}",
                    task_id
                )))
            }
        };

        // Saturate so a counter already at `u32::MAX` cannot overflow or wrap.
        task.retry_count = task.retry_count.saturating_add(1);

        if task.can_retry() {
            task.status = TaskStatus::Pending;
            self.pending_tasks.insert(task_id.to_string(), task);
            return Ok(None);
        }

        task.status = TaskStatus::Failed {
            reason: reason.to_string(),
        };
        let attempts = task.retry_count;
        self.delegated_tasks.insert(task_id.to_string(), task);
        Err(CollectiveError::DelegationFailed(format!(
            "Task {} failed after {} retries: {}",
            task_id, attempts, reason
        )))
    }

    /// Sweeps for tasks whose deadline has passed and returns their ids.
    ///
    /// Expired pending tasks are removed. Expired delegated tasks stay in the
    /// delegated set and are marked `TimedOut`. A delegated task that is
    /// already `TimedOut` is skipped, so each timeout is reported by only one
    /// sweep rather than by every later call.
    pub fn cleanup_expired(&mut self) -> Vec<TaskId> {
        let mut expired = Vec::new();

        self.pending_tasks.retain(|id, task| {
            let keep = !task.is_expired();
            if !keep {
                expired.push(id.clone());
            }
            keep
        });

        for (id, task) in &mut self.delegated_tasks {
            // Already reported by an earlier sweep.
            if task.status == TaskStatus::TimedOut {
                continue;
            }
            if task.is_expired() {
                task.status = TaskStatus::TimedOut;
                expired.push(id.clone());
            }
        }

        expired
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder methods populate the matching fields, and a task without a
    /// deadline is never reported as expired.
    #[test]
    fn test_task_creation() {
        let required = vec!["code_review".to_string(), "rust".to_string()];
        let payload = serde_json::json!({"file": "auth.rs"});

        let review = DelegatedTask::new("code_review", "Review the auth module", payload)
            .with_capabilities(required)
            .with_priority(TaskPriority::High);

        assert_eq!(review.task_type, "code_review");
        assert_eq!(review.required_capabilities.len(), 2);
        assert_eq!(review.priority, TaskPriority::High);
        assert!(!review.is_expired());
    }

    /// A task added to the manager is pending until it is marked delegated,
    /// at which point it moves to the delegated set.
    #[test]
    fn test_delegation_manager() {
        let mut mgr = DelegationManager::new("agent-1".to_string());

        let analysis =
            DelegatedTask::new("data_analysis", "Analyze sales data", serde_json::json!({}));
        let id = analysis.task_id.clone();

        mgr.add_task(analysis);
        assert!(mgr.pending_tasks().contains_key(&id));

        mgr.mark_delegated(&id, "agent-2".to_string()).unwrap();
        assert!(!mgr.pending_tasks().contains_key(&id));
        assert!(mgr.delegated_tasks().contains_key(&id));
    }
}