use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use terraphim_rolegraph::RoleGraph;
use crate::{
AnalysisConfig, DecompositionConfig, DecompositionResult, ExecutionPlan, ExecutionPlanner,
KnowledgeGraphConfig, KnowledgeGraphExecutionPlanner, KnowledgeGraphIntegration,
KnowledgeGraphTaskAnalyzer, KnowledgeGraphTaskDecomposer, PlanningConfig, Task, TaskAnalysis,
TaskAnalyzer, TaskDecomposer, TaskDecompositionResult, TerraphimKnowledgeGraph,
};
use crate::Automata;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskDecompositionWorkflow {
pub original_task: Task,
pub analysis: TaskAnalysis,
pub decomposition: DecompositionResult,
pub execution_plan: ExecutionPlan,
pub metadata: WorkflowMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowMetadata {
pub executed_at: chrono::DateTime<chrono::Utc>,
pub total_execution_time_ms: u64,
pub confidence_score: f64,
pub subtask_count: u32,
pub parallelism_factor: f64,
pub version: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskDecompositionSystemConfig {
pub analysis_config: AnalysisConfig,
pub decomposition_config: DecompositionConfig,
pub planning_config: PlanningConfig,
pub knowledge_graph_config: KnowledgeGraphConfig,
pub enrich_context: bool,
pub min_confidence_threshold: f64,
}
impl Default for TaskDecompositionSystemConfig {
fn default() -> Self {
Self {
analysis_config: AnalysisConfig::default(),
decomposition_config: DecompositionConfig::default(),
planning_config: PlanningConfig::default(),
knowledge_graph_config: KnowledgeGraphConfig::default(),
enrich_context: true,
min_confidence_threshold: 0.6,
}
}
}
#[async_trait]
pub trait TaskDecompositionSystem: Send + Sync {
async fn decompose_task_workflow(
&self,
task: &Task,
config: &TaskDecompositionSystemConfig,
) -> TaskDecompositionResult<TaskDecompositionWorkflow>;
async fn analyze_task(
&self,
task: &Task,
config: &AnalysisConfig,
) -> TaskDecompositionResult<TaskAnalysis>;
async fn decompose_task(
&self,
task: &Task,
config: &DecompositionConfig,
) -> TaskDecompositionResult<DecompositionResult>;
async fn create_execution_plan(
&self,
decomposition: &DecompositionResult,
config: &PlanningConfig,
) -> TaskDecompositionResult<ExecutionPlan>;
async fn validate_workflow(
&self,
workflow: &TaskDecompositionWorkflow,
) -> TaskDecompositionResult<bool>;
}
pub struct TerraphimTaskDecompositionSystem {
analyzer: Arc<dyn TaskAnalyzer>,
decomposer: Arc<dyn TaskDecomposer>,
planner: Arc<dyn ExecutionPlanner>,
knowledge_graph: Arc<dyn KnowledgeGraphIntegration>,
config: TaskDecompositionSystemConfig,
}
impl TerraphimTaskDecompositionSystem {
pub fn new(
automata: Arc<Automata>,
role_graph: Arc<RoleGraph>,
config: TaskDecompositionSystemConfig,
) -> Self {
let knowledge_graph = Arc::new(TerraphimKnowledgeGraph::new(
automata.clone(),
role_graph.clone(),
config.knowledge_graph_config.clone(),
));
let analyzer = Arc::new(KnowledgeGraphTaskAnalyzer::new(
automata.clone(),
role_graph.clone(),
));
let decomposer = Arc::new(KnowledgeGraphTaskDecomposer::new(automata, role_graph));
let planner = Arc::new(KnowledgeGraphExecutionPlanner::new());
Self {
analyzer,
decomposer,
planner,
knowledge_graph,
config,
}
}
pub fn with_default_config(automata: Arc<Automata>, role_graph: Arc<RoleGraph>) -> Self {
Self::new(
automata,
role_graph,
TaskDecompositionSystemConfig::default(),
)
}
fn calculate_workflow_confidence(
&self,
analysis: &TaskAnalysis,
decomposition: &DecompositionResult,
execution_plan: &ExecutionPlan,
) -> f64 {
let analysis_weight = 0.3;
let decomposition_weight = 0.4;
let planning_weight = 0.3;
let weighted_score = analysis.confidence_score * analysis_weight
+ decomposition.metadata.confidence_score * decomposition_weight
+ execution_plan.metadata.confidence_score * planning_weight;
weighted_score.clamp(0.0, 1.0)
}
fn validate_workflow_quality(&self, workflow: &TaskDecompositionWorkflow) -> bool {
if workflow.metadata.confidence_score < self.config.min_confidence_threshold {
warn!(
"Workflow confidence {} below threshold {}",
workflow.metadata.confidence_score, self.config.min_confidence_threshold
);
return false;
}
if workflow.decomposition.subtasks.len() <= 1
&& workflow.original_task.complexity.requires_decomposition()
{
warn!("Complex task was not meaningfully decomposed");
return false;
}
if workflow.execution_plan.phases.is_empty() {
warn!("Execution plan has no phases");
return false;
}
true
}
}
#[async_trait]
impl TaskDecompositionSystem for TerraphimTaskDecompositionSystem {
async fn decompose_task_workflow(
&self,
task: &Task,
config: &TaskDecompositionSystemConfig,
) -> TaskDecompositionResult<TaskDecompositionWorkflow> {
let start_time = std::time::Instant::now();
info!(
"Starting task decomposition workflow for task: {}",
task.task_id
);
let mut working_task = task.clone();
if config.enrich_context {
debug!("Enriching task context");
self.knowledge_graph
.enrich_task_context(&mut working_task)
.await?;
}
debug!("Analyzing task complexity and requirements");
let analysis = self
.analyzer
.analyze_task(&working_task, &config.analysis_config)
.await?;
let decomposition = if analysis.complexity.requires_decomposition() {
debug!("Decomposing task into subtasks");
self.decomposer
.decompose_task(&working_task, &config.decomposition_config)
.await?
} else {
debug!("Task does not require decomposition, creating single-task result");
DecompositionResult {
original_task: working_task.task_id.clone(),
subtasks: vec![working_task.clone()],
dependencies: HashMap::new(),
metadata: crate::DecompositionMetadata {
strategy_used: config.decomposition_config.strategy.clone(),
depth: 0,
subtask_count: 1,
concepts_analyzed: analysis.knowledge_domains.clone(),
roles_identified: Vec::new(),
confidence_score: 0.9,
parallelism_factor: 1.0,
},
}
};
debug!("Creating execution plan");
let execution_plan = self
.planner
.create_plan(&decomposition, &config.planning_config)
.await?;
let execution_time = start_time.elapsed();
let confidence_score =
self.calculate_workflow_confidence(&analysis, &decomposition, &execution_plan);
let workflow = TaskDecompositionWorkflow {
original_task: working_task,
analysis,
decomposition: decomposition.clone(),
execution_plan: execution_plan.clone(),
metadata: WorkflowMetadata {
executed_at: chrono::Utc::now(),
total_execution_time_ms: execution_time.as_millis() as u64,
confidence_score,
subtask_count: decomposition.subtasks.len() as u32,
parallelism_factor: execution_plan.metadata.parallelism_factor,
version: 1,
},
};
if !self.validate_workflow_quality(&workflow) {
warn!(
"Workflow quality validation flagged concerns for task {}",
task.task_id
);
}
info!(
"Completed task decomposition workflow for task {} in {}ms, confidence: {:.2}",
task.task_id,
workflow.metadata.total_execution_time_ms,
workflow.metadata.confidence_score
);
Ok(workflow)
}
async fn analyze_task(
&self,
task: &Task,
config: &AnalysisConfig,
) -> TaskDecompositionResult<TaskAnalysis> {
self.analyzer.analyze_task(task, config).await
}
async fn decompose_task(
&self,
task: &Task,
config: &DecompositionConfig,
) -> TaskDecompositionResult<DecompositionResult> {
self.decomposer.decompose_task(task, config).await
}
async fn create_execution_plan(
&self,
decomposition: &DecompositionResult,
config: &PlanningConfig,
) -> TaskDecompositionResult<ExecutionPlan> {
self.planner.create_plan(decomposition, config).await
}
async fn validate_workflow(
&self,
workflow: &TaskDecompositionWorkflow,
) -> TaskDecompositionResult<bool> {
let analysis_valid =
workflow.analysis.confidence_score >= self.config.min_confidence_threshold;
let decomposition_valid = self
.decomposer
.validate_decomposition(&workflow.decomposition)
.await?;
let plan_valid = self.planner.validate_plan(&workflow.execution_plan).await?;
Ok(analysis_valid && decomposition_valid && plan_valid) }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Task, TaskComplexity};
fn create_test_automata() -> Arc<Automata> {
Arc::new(Automata::default())
}
async fn create_test_role_graph() -> Arc<RoleGraph> {
use terraphim_automata::{AutomataPath, load_thesaurus};
use terraphim_types::RoleName;
let role_name = RoleName::new("test_role");
let thesaurus = load_thesaurus(&AutomataPath::local_example())
.await
.unwrap();
let role_graph = RoleGraph::new(role_name, thesaurus).await.unwrap();
Arc::new(role_graph)
}
fn create_test_task() -> Task {
Task::new(
"test_task".to_string(),
"Complex task requiring decomposition and analysis".to_string(),
TaskComplexity::Complex,
1,
)
}
#[tokio::test]
async fn test_system_creation() {
let automata = create_test_automata();
let role_graph = create_test_role_graph().await;
let config = TaskDecompositionSystemConfig::default();
let system = TerraphimTaskDecompositionSystem::new(automata, role_graph, config);
assert!(system.config.enrich_context);
}
#[tokio::test]
async fn test_workflow_execution() {
let automata = create_test_automata();
let role_graph = create_test_role_graph().await;
let system = TerraphimTaskDecompositionSystem::with_default_config(automata, role_graph);
let task = create_test_task();
let config = TaskDecompositionSystemConfig {
min_confidence_threshold: 0.1, ..Default::default()
};
let result = system.decompose_task_workflow(&task, &config).await;
assert!(result.is_ok());
let workflow = result.unwrap();
assert_eq!(workflow.original_task.task_id, "test_task");
assert!(!workflow.decomposition.subtasks.is_empty());
assert!(!workflow.execution_plan.phases.is_empty());
assert!(workflow.metadata.confidence_score > 0.0);
}
#[tokio::test]
async fn test_simple_task_workflow() {
let automata = create_test_automata();
let role_graph = create_test_role_graph().await;
let system = TerraphimTaskDecompositionSystem::with_default_config(automata, role_graph);
let simple_task = Task::new(
"simple_task".to_string(),
"Simple task".to_string(),
TaskComplexity::Simple,
1,
);
let config = TaskDecompositionSystemConfig::default();
let result = system.decompose_task_workflow(&simple_task, &config).await;
assert!(result.is_ok());
let workflow = result.unwrap();
assert_eq!(workflow.decomposition.subtasks.len(), 1);
assert_eq!(workflow.decomposition.metadata.depth, 0);
}
#[tokio::test]
async fn test_workflow_validation() {
let automata = create_test_automata();
let role_graph = create_test_role_graph().await;
let config = TaskDecompositionSystemConfig {
min_confidence_threshold: 0.1, ..Default::default()
};
let system = TerraphimTaskDecompositionSystem::new(automata, role_graph, config.clone());
let task = create_test_task();
let workflow = system
.decompose_task_workflow(&task, &config)
.await
.unwrap();
let is_valid = system.validate_workflow(&workflow).await.unwrap();
assert!(is_valid);
}
#[test]
fn test_system_config_defaults() {
let config = TaskDecompositionSystemConfig::default();
assert!(config.enrich_context);
assert_eq!(config.min_confidence_threshold, 0.6);
assert_eq!(config.analysis_config.min_confidence_threshold, 0.6);
assert_eq!(config.decomposition_config.max_depth, 3);
}
#[tokio::test]
async fn test_confidence_calculation() {
let automata = create_test_automata();
let role_graph = create_test_role_graph().await;
let config = TaskDecompositionSystemConfig {
min_confidence_threshold: 0.1, ..Default::default()
};
let system = TerraphimTaskDecompositionSystem::new(automata, role_graph, config.clone());
let task = create_test_task();
let workflow_result = system.decompose_task_workflow(&task, &config).await;
match workflow_result {
Ok(workflow) => {
assert!(workflow.metadata.confidence_score > 0.0);
assert!(workflow.metadata.confidence_score <= 1.0);
let manual_confidence = system.calculate_workflow_confidence(
&workflow.analysis,
&workflow.decomposition,
&workflow.execution_plan,
);
assert_eq!(workflow.metadata.confidence_score, manual_confidence);
}
Err(e) => {
println!("Workflow decomposition failed: {:?}", e);
panic!("Workflow decomposition should succeed with low confidence threshold");
}
}
}
}