pub mod executor_adapter;
pub mod guard;
pub mod menu;
pub mod menu_system;
pub mod oracle;
pub mod stall_detection;
pub mod state_machine;
pub mod telemetry;
pub use executor_adapter::{ExecutorAdapter, ParallelExecution, ResourceManager, ResultAggregator};
pub use guard::{CapabilityMapper, GuardEngine, PolicyValidator, SecurityAnalyzer};
pub use menu::{InterventionResult, MenuSystem, UserInterventionOption};
pub use oracle::{
DeepResearchAssistant, Oracle, PlanningCommittee, PlanningConsensus, ResearchResult,
};
pub use stall_detection::{RecoveryStrategy, StallDetectionConfig, StallDetector, StallType};
pub use state_machine::{StateMachine, StateTransition, WorkflowState, WorkflowType};
pub use telemetry::{ExportFormat, TelemetryCollector, WorkflowMetrics};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
/// Drives submitted [`Goal`]s through the workflow state machine
/// (`Initializing` → `Planning` → `Executing` → `Evaluating` → `Completed`, with
/// `Failed` reachable from the handlers).
///
/// Every subsystem field is an `Arc`, so `Clone` (used by `submit_goal` to hand the
/// controller to a background task) shares the same oracle, guard, executor,
/// telemetry collector, and workflow table instead of duplicating them; only
/// `config` is deep-copied.
#[derive(Clone)]
pub struct PlannerExecutorController {
    // NOTE(review): neither `config` nor `state_machine` is read by any method in this
    // file — confirm whether they are still needed.
    config: PlannerConfig,
    state_machine: Arc<RwLock<StateMachine>>,
    /// Produces plans (`plan_execution`) and judges results (`evaluate_results`).
    oracle: Arc<Oracle>,
    /// Approves or rejects goals (`validate_goal`) and plans (`validate_plan`).
    guard: Arc<GuardEngine>,
    /// Runs approved plans (`execute_plan`).
    executor_adapter: Arc<ExecutorAdapter>,
    /// Polled once per state-loop iteration via `check_stall`.
    stall_detector: Arc<StallDetector>,
    /// Asks the user how to proceed when a stall requests intervention.
    menu_system: Arc<MenuSystem>,
    /// Shared telemetry collector, serialized behind an async mutex.
    telemetry: Arc<Mutex<TelemetryCollector>>,
    /// In-flight workflows keyed by workflow id (not goal id); entries are removed by
    /// `finalize_workflow`.
    active_workflows: Arc<RwLock<HashMap<Uuid, WorkflowContext>>>,
}
/// Complete configuration for a [`PlannerExecutorController`].
///
/// Start from one of the presets — `PlannerConfig::production`,
/// `PlannerConfig::development`, or `PlannerConfig::test` — and adjust fields as needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlannerConfig {
    /// Passed by reference to `Oracle::new`.
    pub ai_config: AiConfig,
    /// Passed by reference to `GuardEngine::new`.
    pub security_config: SecurityConfig,
    /// Passed by reference to `ExecutorAdapter::new`.
    pub execution_config: ExecutionConfig,
    /// Cloned into `StallDetector::new`.
    pub stall_config: StallDetectionConfig,
    /// NOTE(review): not passed to the telemetry collector by the controller in this
    /// file — confirm where it is consumed.
    pub telemetry_config: TelemetryConfig,
    /// NATS connection and stream settings.
    /// NOTE(review): not read by the controller in this file.
    pub nats_config: NatsConfig,
}
/// Connection and sampling settings for the AI provider behind the oracle.
///
/// The controller hands this to `Oracle::new` at construction time. The presets use
/// provider `"claude"` (production/development) or `"mock"` (test).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AiConfig {
    /// Provider identifier.
    pub provider: String,
    /// Model identifier understood by the provider.
    pub model: String,
    /// Token cap (4096 in every preset).
    pub max_tokens: u32,
    /// Sampling temperature (0.1 in production, 0.2 in development/test).
    pub temperature: f32,
    /// Timeout in seconds (120 in production/development, 5 in test).
    pub timeout_seconds: u64,
    /// Presumably the number of retries on a failed request — confirm in the oracle.
    pub retry_attempts: u32,
    /// Requests-per-minute cap (60 in every preset).
    pub rate_limit_per_minute: u32,
}
/// Guard-engine settings; passed by reference to `GuardEngine::new` by the controller.
///
/// `PlannerConfig::development` (and therefore `test`) turns off the first two flags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Presumably enables policy checks in the guard engine — TODO confirm.
    pub enable_policy_validation: bool,
    /// Presumably enables security analysis in the guard engine — TODO confirm.
    pub enable_security_analysis: bool,
    /// Presumably restricts execution to `allowed_capabilities` — TODO confirm.
    pub enable_capability_restrictions: bool,
    /// Execution-time ceiling in seconds (3600 in the presets).
    pub max_execution_time_seconds: u64,
    /// Parallel-operation ceiling (10 in the presets).
    pub max_parallel_operations: u32,
    /// Capability identifiers such as `"fs.read.v1"`.
    pub allowed_capabilities: Vec<String>,
}
/// Execution limits; passed by reference to `ExecutorAdapter::new` by the controller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionConfig {
    /// Concurrent-workflow ceiling (5 in production, 2 in development/test).
    /// NOTE(review): `submit_goal` in this file does not check this limit.
    pub max_concurrent_workflows: u32,
    /// Workflow duration ceiling in hours (24 in production/development, 1 in test).
    pub max_workflow_duration_hours: u32,
    /// Resource ceilings.
    pub resource_limits: ResourceLimits,
    /// Retry/backoff parameters.
    pub retry_policy: RetryPolicy,
}
/// Resource ceilings carried inside [`ExecutionConfig`]; units follow each field's suffix.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
    /// Memory limit in MB.
    pub max_memory_mb: u64,
    /// CPU limit as a percentage.
    pub max_cpu_percent: u32,
    /// Disk limit in MB.
    pub max_disk_mb: u64,
    /// Maximum number of network connections.
    pub max_network_connections: u32,
}
/// Retry/backoff parameters carried inside [`ExecutionConfig`].
///
/// NOTE(review): not read in this file — `handle_executing_state` retries against a
/// literal limit of 3 (matching `max_retries` in the presets). Presumably the delay
/// starts at `initial_backoff_ms`, is scaled by `backoff_multiplier` per retry, and is
/// capped at `max_backoff_ms` — confirm in the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    pub max_retries: u32,
    pub initial_backoff_ms: u64,
    pub max_backoff_ms: u64,
    pub backoff_multiplier: f32,
}
/// Telemetry settings.
///
/// NOTE(review): the controller builds its `TelemetryCollector` from
/// `telemetry::TelemetryConfig::default()`, not from this struct — confirm where these
/// values are consumed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryConfig {
    pub enable_metrics: bool,
    /// Disabled by the `test` preset.
    pub enable_audit_trail: bool,
    /// Export formats (JSON and Prometheus in the presets).
    pub export_formats: Vec<ExportFormat>,
    /// Retention in days (30 in production, 7 in development/test).
    pub retention_days: u32,
    /// Metrics interval in seconds.
    pub metrics_interval_seconds: u64,
}
/// NATS messaging settings.
///
/// NOTE(review): not read by the controller in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsConfig {
    /// Server URLs (`nats://localhost:4222` in the presets).
    pub servers: Vec<String>,
    /// Subject name per message category.
    pub subjects: NatsSubjects,
    /// Stream limits.
    pub stream_config: NatsStreamConfig,
}
/// NATS subject names, one per message category
/// (`smith.planner.<category>` in the presets).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsSubjects {
    pub goals: String,
    pub workflows: String,
    pub results: String,
    pub telemetry: String,
    pub interventions: String,
}
/// NATS stream limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsStreamConfig {
    /// Message-count cap (1,000,000 in the presets).
    pub max_messages: u64,
    /// Byte cap (1073741824 = 1 GiB in the presets).
    pub max_bytes: u64,
    /// Retention window in hours (168 = 7 days in the presets).
    pub retention_hours: u64,
    /// Replica count (1 in the presets).
    pub replicas: u32,
}
/// A unit of work submitted to the controller.
///
/// Build one with [`Goal::new`] and the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Goal {
    /// Goal id (a random v4 UUID when created through `Goal::new`).
    pub id: Uuid,
    /// Free-form description of what should be accomplished.
    pub description: String,
    /// Optional free-form context string.
    pub context: Option<String>,
    pub constraints: Vec<String>,
    pub success_criteria: Vec<String>,
    /// `Priority::Medium` when created through `Goal::new`.
    pub priority: Priority,
    /// Arbitrary string key/value annotations.
    pub metadata: HashMap<String, String>,
    /// Creation time in UTC.
    pub created_at: chrono::DateTime<chrono::Utc>,
}
/// Relative urgency of a [`Goal`].
///
/// Variants are declared from least to most urgent, so the derived ordering gives
/// `Low < Medium < High < Critical`. Callers can therefore compare goals
/// (`goal.priority >= Priority::High`) or sort them by priority. The enum carries no
/// data, so it is also `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Priority {
    Low,
    Medium,
    High,
    Critical,
}
/// Mutable per-workflow record kept in the controller's `active_workflows` map.
///
/// The controller appends guard verdicts, oracle decisions, execution results, stall
/// events, interventions, and state transitions to the matching vectors as the
/// workflow runs. The record is removed from the map when the workflow is finalized.
#[derive(Debug, Clone)]
pub struct WorkflowContext {
    /// The goal this workflow is pursuing.
    pub goal: Goal,
    pub workflow_type: WorkflowType,
    /// State the controller's loop dispatches on next.
    pub current_state: WorkflowState,
    /// Applied state transitions, oldest first.
    pub execution_history: Vec<StateTransition>,
    /// Oracle plans, oldest first; the executing state runs the most recent one.
    pub oracle_decisions: Vec<oracle::OracleDecision>,
    /// Guard verdicts on the goal.
    /// NOTE(review): plan validations are not recorded here.
    pub guard_validations: Vec<guard::GuardResult>,
    pub execution_results: Vec<executor_adapter::ExecutionResult>,
    pub stall_detections: Vec<stall_detection::StallEvent>,
    pub user_interventions: Vec<menu_system::InterventionEvent>,
    /// NOTE(review): initialized once in `submit_goal` and not updated anywhere in this file.
    pub metrics: WorkflowMetrics,
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Refreshed whenever the controller modifies this record.
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
impl PlannerConfig {
    /// Hardened baseline preset with every guard check enabled.
    ///
    /// * AI: `claude` / `claude-3-5-sonnet-20241022`, 4096 max tokens, temperature 0.1,
    ///   120 s timeout, 3 retry attempts, 60 requests per minute.
    /// * Security: policy validation, security analysis, and capability restrictions
    ///   on; capabilities limited to `fs.read.v1`, `http.fetch.v1`, `process.run.v1`.
    /// * Execution: 5 concurrent workflows, 24 h ceiling, retry policy of 3 retries
    ///   with 1000 ms initial / 30000 ms max backoff and a 2.0 multiplier.
    /// * Telemetry: metrics and audit trail on, JSON + Prometheus export, 30-day
    ///   retention, 60 s interval.
    /// * NATS: a single local server with `smith.planner.*` subjects.
    pub fn production() -> Self {
        let ai_config = AiConfig {
            provider: String::from("claude"),
            model: String::from("claude-3-5-sonnet-20241022"),
            max_tokens: 4096,
            temperature: 0.1,
            timeout_seconds: 120,
            retry_attempts: 3,
            rate_limit_per_minute: 60,
        };

        let allowed_capabilities: Vec<String> = ["fs.read.v1", "http.fetch.v1", "process.run.v1"]
            .iter()
            .map(|capability| capability.to_string())
            .collect();
        let security_config = SecurityConfig {
            enable_policy_validation: true,
            enable_security_analysis: true,
            enable_capability_restrictions: true,
            max_execution_time_seconds: 60 * 60,
            max_parallel_operations: 10,
            allowed_capabilities,
        };

        let execution_config = ExecutionConfig {
            max_concurrent_workflows: 5,
            max_workflow_duration_hours: 24,
            resource_limits: ResourceLimits {
                max_memory_mb: 1024,
                max_cpu_percent: 80,
                max_disk_mb: 1024,
                max_network_connections: 50,
            },
            retry_policy: RetryPolicy {
                max_retries: 3,
                initial_backoff_ms: 1_000,
                max_backoff_ms: 30_000,
                backoff_multiplier: 2.0,
            },
        };

        let telemetry_config = TelemetryConfig {
            enable_metrics: true,
            enable_audit_trail: true,
            export_formats: vec![ExportFormat::Json, ExportFormat::Prometheus],
            retention_days: 30,
            metrics_interval_seconds: 60,
        };

        let nats_config = NatsConfig {
            servers: vec![String::from("nats://localhost:4222")],
            subjects: NatsSubjects {
                goals: String::from("smith.planner.goals"),
                workflows: String::from("smith.planner.workflows"),
                results: String::from("smith.planner.results"),
                telemetry: String::from("smith.planner.telemetry"),
                interventions: String::from("smith.planner.interventions"),
            },
            stream_config: NatsStreamConfig {
                max_messages: 1_000_000,
                // 1 GiB
                max_bytes: 1024 * 1024 * 1024,
                // one week
                retention_hours: 7 * 24,
                replicas: 1,
            },
        };

        Self {
            ai_config,
            security_config,
            execution_config,
            stall_config: StallDetectionConfig::default(),
            telemetry_config,
            nats_config,
        }
    }

    /// Local-development preset derived from [`PlannerConfig::production`].
    ///
    /// Turns off policy validation and security analysis (capability restrictions stay
    /// on), raises the temperature to 0.2, allows 2 concurrent workflows, and keeps
    /// telemetry for 7 days. Everything else is inherited unchanged.
    pub fn development() -> Self {
        let base = Self::production();
        Self {
            ai_config: AiConfig {
                temperature: 0.2,
                ..base.ai_config
            },
            security_config: SecurityConfig {
                enable_policy_validation: false,
                enable_security_analysis: false,
                ..base.security_config
            },
            execution_config: ExecutionConfig {
                max_concurrent_workflows: 2,
                ..base.execution_config
            },
            telemetry_config: TelemetryConfig {
                retention_days: 7,
                ..base.telemetry_config
            },
            ..base
        }
    }

    /// Test preset derived from [`PlannerConfig::development`].
    ///
    /// Switches to the `mock` provider with a 5 s timeout, limits workflows to 1 hour,
    /// and disables the audit trail.
    pub fn test() -> Self {
        let base = Self::development();
        Self {
            ai_config: AiConfig {
                provider: String::from("mock"),
                timeout_seconds: 5,
                ..base.ai_config
            },
            execution_config: ExecutionConfig {
                max_workflow_duration_hours: 1,
                ..base.execution_config
            },
            telemetry_config: TelemetryConfig {
                enable_audit_trail: false,
                ..base.telemetry_config
            },
            ..base
        }
    }
}
impl Goal {
pub fn new(description: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
description: description.into(),
context: None,
constraints: Vec::new(),
success_criteria: Vec::new(),
priority: Priority::Medium,
metadata: HashMap::new(),
created_at: chrono::Utc::now(),
}
}
pub fn with_context(mut self, context: impl Into<String>) -> Self {
self.context = Some(context.into());
self
}
pub fn with_constraints(mut self, constraints: Vec<String>) -> Self {
self.constraints = constraints;
self
}
pub fn with_success_criteria(mut self, criteria: Vec<String>) -> Self {
self.success_criteria = criteria;
self
}
pub fn with_priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
}
impl PlannerExecutorController {
    /// Builds a controller and all of its subsystems from `config`.
    ///
    /// # Errors
    ///
    /// Returns an error if the oracle, guard engine, executor adapter, or telemetry
    /// collector fails to initialize.
    pub async fn new(config: PlannerConfig) -> Result<Self> {
        info!("Initializing planner-executor controller");
        let state_machine = Arc::new(RwLock::new(StateMachine::new()));
        let oracle = Arc::new(Oracle::new(&config.ai_config).await?);
        let guard = Arc::new(GuardEngine::new(&config.security_config).await?);
        let executor_adapter = Arc::new(ExecutorAdapter::new(&config.execution_config).await?);
        let stall_detector = Arc::new(StallDetector::new(config.stall_config.clone()));
        let menu_system = Arc::new(MenuSystem::new());
        // NOTE(review): `config.telemetry_config` is not forwarded here. The collector is
        // built from `telemetry::TelemetryConfig::default()`, and the two config types
        // differ, so forwarding needs an explicit mapping. TODO confirm the intended source.
        let telemetry = Arc::new(Mutex::new(
            TelemetryCollector::new(telemetry::TelemetryConfig::default()).await?,
        ));
        let active_workflows = Arc::new(RwLock::new(HashMap::new()));
        info!("Planner-executor controller initialized successfully");
        Ok(Self {
            config,
            state_machine,
            oracle,
            guard,
            executor_adapter,
            stall_detector,
            menu_system,
            telemetry,
            active_workflows,
        })
    }

    /// Registers `goal` as a new workflow and starts executing it in the background.
    ///
    /// Returns the new workflow id immediately. Progress can be observed with
    /// [`Self::get_workflow_status`] until the workflow is finalized and removed.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`. Failures during execution are logged by the
    /// background task instead of being reported here.
    pub async fn submit_goal(&self, goal: Goal, workflow_type: WorkflowType) -> Result<Uuid> {
        let workflow_id = Uuid::new_v4();
        info!(
            workflow_id = %workflow_id,
            goal_id = %goal.id,
            "Submitting goal for execution"
        );
        let now = chrono::Utc::now();
        let context = WorkflowContext {
            // `goal` is owned and unused afterwards, so move it instead of cloning.
            goal,
            workflow_type,
            current_state: WorkflowState::Initializing,
            execution_history: Vec::new(),
            oracle_decisions: Vec::new(),
            guard_validations: Vec::new(),
            execution_results: Vec::new(),
            stall_detections: Vec::new(),
            user_interventions: Vec::new(),
            metrics: WorkflowMetrics {
                total_workflows: 1,
                active_workflows: 1,
                successful_workflows: 0,
                failed_workflows: 0,
                average_completion_time: None,
                state_distribution: HashMap::new(),
                recent_events: Vec::new(),
            },
            created_at: now,
            updated_at: now,
        };
        self.active_workflows
            .write()
            .await
            .insert(workflow_id, context);
        // The spawned task needs an owned controller. Cloning bumps the `Arc` counts
        // plus one `PlannerConfig` copy.
        let controller = self.clone();
        tokio::spawn(async move {
            if let Err(e) = controller.execute_workflow(workflow_id).await {
                error!(workflow_id = %workflow_id, error = %e, "Workflow execution failed");
            }
        });
        Ok(workflow_id)
    }

    /// Runs a workflow to a terminal state and then finalizes it.
    ///
    /// Finalization (telemetry plus removal from `active_workflows`) happens on every
    /// exit path. If the state loop errors, the workflow is first moved to
    /// `WorkflowState::Failed` so the recorded outcome reflects the abort. Without
    /// this, an errored workflow would stay in `active_workflows` indefinitely, and
    /// `shutdown` would wait out its full timeout for it.
    ///
    /// # Errors
    ///
    /// Returns the state loop's error if it failed, otherwise any finalization error.
    async fn execute_workflow(&self, workflow_id: Uuid) -> Result<()> {
        info!(workflow_id = %workflow_id, "Starting workflow execution");
        let outcome = self.run_state_loop(workflow_id).await;
        if let Err(e) = &outcome {
            self.mark_workflow_failed(workflow_id, format!("Workflow aborted: {}", e))
                .await;
        }
        let finalized = self.finalize_workflow(workflow_id).await;
        // The loop's error, if any, takes precedence over a finalization error.
        outcome.and(finalized)
    }

    /// Steps the workflow through its states until it reaches `Completed` or `Failed`.
    ///
    /// Each iteration does four things in order:
    /// 1. checks for a stall;
    /// 2. dispatches to the handler for the current state;
    /// 3. applies the resulting transition;
    /// 4. sleeps for 100 ms.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the workflow is missing from `active_workflows`;
    /// - a stall cannot be recovered from;
    /// - any subsystem call fails.
    async fn run_state_loop(&self, workflow_id: Uuid) -> Result<()> {
        loop {
            let current_state = {
                let workflows = self.active_workflows.read().await;
                let ctx = Self::lookup(&workflows, workflow_id)?;
                ctx.current_state.clone()
            };
            if let Some(stall_event) = self
                .stall_detector
                .check_stall(workflow_id, &current_state)
                .await?
            {
                warn!(workflow_id = %workflow_id, stall_type = ?stall_event.stall_type, "Stall detected");
                if !self.handle_stall(workflow_id, stall_event).await? {
                    return Err(anyhow::anyhow!("Unable to recover from stall"));
                }
            }
            let transition = match current_state {
                WorkflowState::Initializing => self.handle_initialize_state(workflow_id).await?,
                WorkflowState::Planning => self.handle_planning_state(workflow_id).await?,
                WorkflowState::Executing => self.handle_executing_state(workflow_id).await?,
                WorkflowState::Evaluating => self.handle_evaluating_state(workflow_id).await?,
                WorkflowState::Completed => {
                    info!(workflow_id = %workflow_id, "Workflow completed successfully");
                    return Ok(());
                }
                WorkflowState::Failed => {
                    warn!(workflow_id = %workflow_id, "Workflow failed");
                    return Ok(());
                }
            };
            self.update_workflow_state(workflow_id, transition).await?;
            // Pace the loop so self-transitions (retries) do not spin.
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    }

    /// Validates the goal with the guard engine.
    ///
    /// The guard result is appended to `guard_validations`. An approved goal moves to
    /// `Planning` and a rejected one to `Failed`.
    async fn handle_initialize_state(&self, workflow_id: Uuid) -> Result<StateTransition> {
        debug!(workflow_id = %workflow_id, "Handling initialize state");
        let goal = {
            let workflows = self.active_workflows.read().await;
            let ctx = Self::lookup(&workflows, workflow_id)?;
            ctx.goal.clone()
        };
        let guard_result = self.guard.validate_goal(&goal).await?;
        {
            let mut workflows = self.active_workflows.write().await;
            if let Some(ctx) = workflows.get_mut(&workflow_id) {
                ctx.guard_validations.push(guard_result.clone());
                ctx.updated_at = chrono::Utc::now();
            }
        }
        let transition = if guard_result.approved {
            Self::transition(
                WorkflowState::Initializing,
                WorkflowState::Planning,
                "Initial validation passed",
            )
        } else {
            Self::transition(
                WorkflowState::Initializing,
                WorkflowState::Failed,
                format!("Initial validation failed: {}", guard_result.reason),
            )
        };
        Ok(transition)
    }

    /// Asks the oracle for a plan and has the guard validate it.
    ///
    /// The oracle decision is recorded before validation. The outcome is one of:
    /// - approved plan → `Executing`;
    /// - rejected plan with confidence > 0.7 → stays in `Planning` for another round;
    /// - otherwise → `Failed`.
    async fn handle_planning_state(&self, workflow_id: Uuid) -> Result<StateTransition> {
        debug!(workflow_id = %workflow_id, "Handling planning state");
        let goal = {
            let workflows = self.active_workflows.read().await;
            let ctx = Self::lookup(&workflows, workflow_id)?;
            ctx.goal.clone()
        };
        let oracle_decision = self.oracle.plan_execution(&goal).await?;
        {
            let mut workflows = self.active_workflows.write().await;
            if let Some(ctx) = workflows.get_mut(&workflow_id) {
                ctx.oracle_decisions.push(oracle_decision.clone());
                ctx.updated_at = chrono::Utc::now();
            }
        }
        let plan_validation = self.guard.validate_plan(&oracle_decision.plan).await?;
        let transition = if plan_validation.approved {
            Self::transition(
                WorkflowState::Planning,
                WorkflowState::Executing,
                "Plan validated and approved",
            )
        } else if oracle_decision.confidence > 0.7 {
            // NOTE(review): nothing in this controller bounds these re-planning rounds.
            // Presumably the stall detector is expected to catch a loop. TODO confirm.
            Self::transition(
                WorkflowState::Planning,
                WorkflowState::Planning,
                format!("Plan rejected, retrying: {}", plan_validation.reason),
            )
        } else {
            Self::transition(
                WorkflowState::Planning,
                WorkflowState::Failed,
                format!("Low confidence plan rejected: {}", plan_validation.reason),
            )
        };
        Ok(transition)
    }

    /// Executes the most recent oracle plan and records the result.
    ///
    /// The outcome is one of:
    /// - success → `Evaluating`;
    /// - a retryable failure with `attempt_count < 3` → stays in `Executing`;
    /// - otherwise → `Failed`.
    ///
    /// # Errors
    ///
    /// Fails with "No execution plan found" if the workflow or its plan is missing.
    async fn handle_executing_state(&self, workflow_id: Uuid) -> Result<StateTransition> {
        debug!(workflow_id = %workflow_id, "Handling executing state");
        let plan = {
            let workflows = self.active_workflows.read().await;
            workflows
                .get(&workflow_id)
                .and_then(|ctx| ctx.oracle_decisions.last())
                .map(|decision| decision.plan.clone())
                .ok_or_else(|| {
                    anyhow::anyhow!("No execution plan found for workflow: {}", workflow_id)
                })?
        };
        let execution_result = self.executor_adapter.execute_plan(&plan).await?;
        {
            let mut workflows = self.active_workflows.write().await;
            if let Some(ctx) = workflows.get_mut(&workflow_id) {
                ctx.execution_results.push(execution_result.clone());
                ctx.updated_at = chrono::Utc::now();
            }
        }
        let transition = if execution_result.success {
            Self::transition(
                WorkflowState::Executing,
                WorkflowState::Evaluating,
                "Execution completed successfully",
            )
        } else if execution_result.retryable && execution_result.attempt_count < 3 {
            // NOTE(review): the limit of 3 is a literal. `retry_policy.max_retries` is
            // not consulted here, although it is also 3 in every preset.
            Self::transition(
                WorkflowState::Executing,
                WorkflowState::Executing,
                format!(
                    "Execution failed, retrying: {}",
                    execution_result.error_message
                ),
            )
        } else {
            Self::transition(
                WorkflowState::Executing,
                WorkflowState::Failed,
                format!("Execution failed: {}", execution_result.error_message),
            )
        };
        Ok(transition)
    }

    /// Asks the oracle whether the execution results achieved the goal.
    ///
    /// The outcome is one of:
    /// - success → `Completed`;
    /// - `improvement_possible` with confidence > 0.5 → back to `Planning`;
    /// - otherwise → `Failed`.
    async fn handle_evaluating_state(&self, workflow_id: Uuid) -> Result<StateTransition> {
        debug!(workflow_id = %workflow_id, "Handling evaluating state");
        let (goal, execution_results) = {
            let workflows = self.active_workflows.read().await;
            let ctx = Self::lookup(&workflows, workflow_id)?;
            (ctx.goal.clone(), ctx.execution_results.clone())
        };
        let evaluation = self
            .oracle
            .evaluate_results(&goal, &execution_results)
            .await?;
        let transition = if evaluation.success {
            Self::transition(
                WorkflowState::Evaluating,
                WorkflowState::Completed,
                format!("Goal achieved: {}", evaluation.summary),
            )
        } else if evaluation.improvement_possible && evaluation.confidence > 0.5 {
            Self::transition(
                WorkflowState::Evaluating,
                WorkflowState::Planning,
                format!("Goal not achieved, re-planning: {}", evaluation.summary),
            )
        } else {
            Self::transition(
                WorkflowState::Evaluating,
                WorkflowState::Failed,
                format!("Goal not achievable: {}", evaluation.summary),
            )
        };
        Ok(transition)
    }

    /// Records a stall and applies its recovery strategy.
    ///
    /// Returns `true` if the workflow should keep running: always for `AutoRetry`, and
    /// as decided by the user for `UserIntervention`. `Escalate` and `Fail` return
    /// `false`.
    async fn handle_stall(
        &self,
        workflow_id: Uuid,
        stall_event: stall_detection::StallEvent,
    ) -> Result<bool> {
        warn!(workflow_id = %workflow_id, stall_type = ?stall_event.stall_type, "Handling workflow stall");
        {
            let mut workflows = self.active_workflows.write().await;
            if let Some(ctx) = workflows.get_mut(&workflow_id) {
                ctx.stall_detections.push(stall_event.clone());
                ctx.updated_at = chrono::Utc::now();
            }
        }
        match stall_event.recovery_strategy {
            RecoveryStrategy::AutoRetry => {
                info!(workflow_id = %workflow_id, "Attempting automatic retry for stall recovery");
                Ok(true)
            }
            RecoveryStrategy::UserIntervention => {
                info!(workflow_id = %workflow_id, "Requesting user intervention for stall recovery");
                let intervention_result = self
                    .menu_system
                    .request_intervention(workflow_id, &stall_event)
                    .await?;
                // Read the decision first so the result can be moved (not cloned) into
                // the history.
                let continue_execution = intervention_result.continue_execution;
                {
                    let mut workflows = self.active_workflows.write().await;
                    if let Some(ctx) = workflows.get_mut(&workflow_id) {
                        ctx.user_interventions.push(intervention_result.into());
                        ctx.updated_at = chrono::Utc::now();
                    }
                }
                Ok(continue_execution)
            }
            RecoveryStrategy::Escalate => {
                warn!(workflow_id = %workflow_id, "Escalating stall - manual resolution required");
                Ok(false)
            }
            RecoveryStrategy::Fail => {
                error!(workflow_id = %workflow_id, "Unrecoverable stall detected");
                Ok(false)
            }
        }
    }

    /// Applies `transition`: sets the current state and appends it to the history.
    ///
    /// A workflow that is no longer registered is silently skipped.
    async fn update_workflow_state(
        &self,
        workflow_id: Uuid,
        transition: StateTransition,
    ) -> Result<()> {
        let mut workflows = self.active_workflows.write().await;
        if let Some(ctx) = workflows.get_mut(&workflow_id) {
            ctx.current_state = transition.to.clone();
            ctx.execution_history.push(transition);
            ctx.updated_at = chrono::Utc::now();
            debug!(
                workflow_id = %workflow_id,
                new_state = ?ctx.current_state,
                "Workflow state updated"
            );
        }
        Ok(())
    }

    /// Moves a workflow that has not already finished into `WorkflowState::Failed`.
    ///
    /// Used when the state loop errors out, so that both the transition history and
    /// the finalization telemetry record the abort. Missing workflows, and workflows
    /// already `Completed` or `Failed`, are left untouched.
    async fn mark_workflow_failed(&self, workflow_id: Uuid, reason: String) {
        warn!(workflow_id = %workflow_id, reason = %reason, "Aborting workflow");
        let mut workflows = self.active_workflows.write().await;
        if let Some(ctx) = workflows.get_mut(&workflow_id) {
            if matches!(
                ctx.current_state,
                WorkflowState::Completed | WorkflowState::Failed
            ) {
                return;
            }
            let transition =
                Self::transition(ctx.current_state.clone(), WorkflowState::Failed, reason);
            ctx.current_state = WorkflowState::Failed;
            ctx.execution_history.push(transition);
            ctx.updated_at = chrono::Utc::now();
        }
    }

    /// Emits a `GoalCompleted` telemetry event and removes the workflow from
    /// `active_workflows`.
    ///
    /// The reported duration is `updated_at - created_at`; a negative span (clock
    /// skew) is clamped to zero. Finalizing an unknown workflow is a no-op.
    async fn finalize_workflow(&self, workflow_id: Uuid) -> Result<()> {
        info!(workflow_id = %workflow_id, "Finalizing workflow");
        // Copy out only the fields that are reported, instead of cloning the whole
        // context and all of its histories.
        let summary = {
            let workflows = self.active_workflows.read().await;
            workflows
                .get(&workflow_id)
                .map(|ctx| (ctx.current_state.clone(), ctx.created_at, ctx.updated_at))
        };
        if let Some((final_state, created_at, updated_at)) = summary {
            let elapsed = updated_at - created_at;
            {
                let mut telemetry = self.telemetry.lock().await;
                telemetry
                    .record_workflow_event(telemetry::WorkflowEvent {
                        workflow_id,
                        event_type: telemetry::WorkflowEventType::GoalCompleted {
                            success: final_state == WorkflowState::Completed,
                            duration: elapsed.to_std().unwrap_or_default(),
                        },
                        timestamp: std::time::SystemTime::now(),
                        user_id: None,
                        metadata: HashMap::new(),
                    })
                    .await;
            }
            // Remove only after the event is recorded. `shutdown` waits for this map to
            // empty before its final export, so that export will include this event.
            self.active_workflows.write().await.remove(&workflow_id);
            info!(
                workflow_id = %workflow_id,
                final_state = ?final_state,
                duration_seconds = elapsed.num_seconds(),
                "Workflow finalized"
            );
        }
        Ok(())
    }

    /// Returns a snapshot of the workflow's context.
    ///
    /// Returns `None` if the id is unknown or the workflow has already been finalized.
    pub async fn get_workflow_status(&self, workflow_id: Uuid) -> Result<Option<WorkflowContext>> {
        let workflows = self.active_workflows.read().await;
        Ok(workflows.get(&workflow_id).cloned())
    }

    /// Returns the ids of all workflows that have not yet been finalized, in arbitrary order.
    pub async fn list_active_workflows(&self) -> Result<Vec<Uuid>> {
        let workflows = self.active_workflows.read().await;
        Ok(workflows.keys().copied().collect())
    }

    /// Exports collected telemetry in `format`.
    ///
    /// # Errors
    ///
    /// Wraps any collector error as "Telemetry export failed: ...".
    pub async fn export_telemetry(&self, format: ExportFormat) -> Result<String> {
        let telemetry = self.telemetry.lock().await;
        telemetry
            .export_metrics(format)
            .await
            .map_err(|e| anyhow::anyhow!("Telemetry export failed: {}", e))
    }

    /// Waits up to 300 s for in-flight workflows to drain, then does a final JSON
    /// telemetry export.
    ///
    /// NOTE(review): workflows still running after the timeout are only logged. Their
    /// background tasks are not cancelled, because no task handles are retained.
    pub async fn shutdown(&self) -> Result<()> {
        info!("Shutting down planner-executor controller");
        let timeout = tokio::time::Duration::from_secs(300);
        let start = tokio::time::Instant::now();
        while !self.active_workflows.read().await.is_empty() && start.elapsed() < timeout {
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
        let remaining = self.active_workflows.read().await.len();
        if remaining > 0 {
            warn!(
                remaining_workflows = remaining,
                "Force shutting down remaining workflows"
            );
        }
        let telemetry = self.telemetry.lock().await;
        if let Ok(export) = telemetry.export_metrics(ExportFormat::Json).await {
            info!("Final telemetry export completed: {} bytes", export.len());
        }
        info!("Planner-executor controller shutdown completed");
        Ok(())
    }

    /// Looks up a workflow, producing the standard "Workflow not found" error.
    fn lookup(
        workflows: &HashMap<Uuid, WorkflowContext>,
        workflow_id: Uuid,
    ) -> Result<&WorkflowContext> {
        workflows
            .get(&workflow_id)
            .ok_or_else(|| anyhow::anyhow!("Workflow not found: {}", workflow_id))
    }

    /// Builds a [`StateTransition`] stamped with the current time and empty metadata.
    fn transition(
        from: WorkflowState,
        to: WorkflowState,
        reason: impl Into<String>,
    ) -> StateTransition {
        StateTransition {
            from,
            to,
            timestamp: chrono::Utc::now(),
            reason: reason.into(),
            metadata: HashMap::new(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a controller from the `test` preset, panicking if construction fails.
    async fn build_controller() -> PlannerExecutorController {
        PlannerExecutorController::new(PlannerConfig::test())
            .await
            .unwrap()
    }

    /// The test preset must be enough to bring every subsystem up.
    #[tokio::test]
    async fn test_controller_initialization() {
        let outcome = PlannerExecutorController::new(PlannerConfig::test()).await;
        assert!(outcome.is_ok());
    }

    /// Submitting a goal returns a workflow id without waiting for execution.
    #[tokio::test]
    async fn test_goal_submission() {
        let controller = build_controller().await;
        let goal = Goal::new("Test goal")
            .with_context("Test context")
            .with_priority(Priority::Low);
        let submitted = controller.submit_goal(goal, WorkflowType::Simple).await;
        assert!(submitted.is_ok());
    }

    /// Shortly after submission the workflow is still visible via its status.
    #[tokio::test]
    async fn test_workflow_status() {
        let controller = build_controller().await;
        let workflow_id = controller
            .submit_goal(Goal::new("Test goal"), WorkflowType::Simple)
            .await
            .unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        let status = controller.get_workflow_status(workflow_id).await.unwrap();
        assert!(status.is_some());
    }
}
#[cfg(test)]
mod mod_tests;