use anyhow::Result;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// State handed to every tool while the orchestrator runs.
///
/// Cloning copies the paths and the changed-file list, while `shared_state`,
/// `traffic_analyzer` and `component_manager` stay shared between clones
/// because they live behind `Arc`.
#[derive(Clone)]
pub struct ExecutionContext {
    /// Root directory of the repository the tools operate on.
    pub repo_root: PathBuf,

    /// Forge data directory; the orchestrator in this file passes
    /// `<repo_root>/.dx/forge`.
    pub forge_path: PathBuf,

    /// Current branch name, when known (`new` leaves it as `None`).
    pub current_branch: Option<String>,

    /// Changed files made available to tools (`new` starts with an empty list).
    pub changed_files: Vec<PathBuf>,

    /// JSON key/value store shared between tools; see `set` and `get`.
    pub shared_state: Arc<RwLock<HashMap<String, serde_json::Value>>>,

    /// Strategy used to classify file modifications into a traffic branch.
    pub traffic_analyzer: Arc<dyn TrafficAnalyzer + Send + Sync>,

    /// Component state manager, present only when `new` managed to create one.
    pub component_manager: Option<Arc<RwLock<crate::context::ComponentStateManager>>>,
}
/// Hand-written `Debug` output for [`ExecutionContext`].
///
/// The trait object in `traffic_analyzer` carries no `Debug` bound, so a fixed
/// placeholder string is printed in its place. `shared_state` and
/// `component_manager` are intentionally left out; `finish_non_exhaustive`
/// renders a trailing `..` so readers can tell that fields were omitted.
impl std::fmt::Debug for ExecutionContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExecutionContext")
            .field("repo_root", &self.repo_root)
            .field("forge_path", &self.forge_path)
            .field("current_branch", &self.current_branch)
            .field("changed_files", &self.changed_files)
            .field("traffic_analyzer", &"<dyn TrafficAnalyzer>")
            .finish_non_exhaustive()
    }
}
impl ExecutionContext {
    /// Builds a context for `repo_root` with empty shared state, no branch,
    /// no changed files and a [`DefaultTrafficAnalyzer`].
    ///
    /// Creating the component state manager is best effort: if
    /// `ComponentStateManager::new` fails, the error is discarded and
    /// `component_manager` is `None`.
    pub fn new(repo_root: PathBuf, forge_path: PathBuf) -> Self {
        let component_manager = match crate::context::ComponentStateManager::new(&forge_path) {
            Ok(manager) => Some(Arc::new(RwLock::new(manager))),
            Err(_) => None,
        };
        let shared_state = Arc::new(RwLock::new(HashMap::new()));
        let traffic_analyzer = Arc::new(DefaultTrafficAnalyzer);

        Self {
            repo_root,
            forge_path,
            current_branch: None,
            changed_files: Vec::new(),
            shared_state,
            traffic_analyzer,
            component_manager,
        }
    }

    /// Stores `value` under `key` in the shared state, replacing any previous
    /// entry.
    ///
    /// # Errors
    /// Returns an error when `value` cannot be converted to JSON; nothing is
    /// stored in that case.
    pub fn set<T: Serialize>(&self, key: impl Into<String>, value: T) -> Result<()> {
        // Serialize before taking the write lock so a failing conversion never
        // touches the map.
        let encoded = serde_json::to_value(value)?;
        let mut state = self.shared_state.write();
        state.insert(key.into(), encoded);
        Ok(())
    }

    /// Reads the entry stored under `key` and deserializes it into `T`.
    ///
    /// Returns `Ok(None)` when the key is absent.
    ///
    /// # Errors
    /// Returns an error when the stored JSON does not deserialize into `T`.
    pub fn get<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
        let state = self.shared_state.read();
        match state.get(key) {
            None => Ok(None),
            Some(stored) => Ok(Some(serde_json::from_value(stored.clone())?)),
        }
    }

    /// Placeholder: the pattern is ignored and an empty list is returned.
    pub fn find_patterns(&self, _pattern: &str) -> Result<Vec<PatternMatch>> {
        Ok(Vec::new())
    }
}
/// One hit produced by a pattern search.
#[derive(Clone, Debug)]
pub struct PatternMatch {
    /// File in which the match was found.
    pub file: PathBuf,

    /// Line of the match (indexing base is not established in this file).
    pub line: usize,

    /// Column of the match (indexing base is not established in this file).
    pub col: usize,

    /// The matched text.
    pub text: String,

    /// Captured groups belonging to the match.
    pub captures: Vec<String>,
}
/// Result reported by a tool after it has run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolOutput {
    /// Whether the tool considers its run successful.
    pub success: bool,

    /// Files the tool reports as modified.
    pub files_modified: Vec<PathBuf>,

    /// Files the tool reports as created.
    pub files_created: Vec<PathBuf>,

    /// Files the tool reports as deleted.
    pub files_deleted: Vec<PathBuf>,

    /// Human-readable status or failure message.
    pub message: String,

    /// Run time in milliseconds; the orchestrator overwrites this with the
    /// measured duration after `execute` returns.
    pub duration_ms: u64,
}
impl ToolOutput {
    /// Shared constructor: empty file lists, zero duration, and the given
    /// status flag and message.
    fn with_status(success: bool, message: String) -> Self {
        Self {
            success,
            files_modified: Vec::new(),
            files_created: Vec::new(),
            files_deleted: Vec::new(),
            message,
            duration_ms: 0,
        }
    }

    /// Successful output with the message `"Success"` and no file changes.
    pub fn success() -> Self {
        Self::with_status(true, "Success".to_string())
    }

    /// Failed output carrying `message` and no file changes.
    pub fn failure(message: impl Into<String>) -> Self {
        Self::with_status(false, message.into())
    }
}
/// A unit of work that the orchestrator can register and execute.
pub trait DxTool: Send + Sync {
    /// Unique name of the tool; other tools refer to it in `dependencies`.
    fn name(&self) -> &str;

    /// Version string, used for logging.
    fn version(&self) -> &str;

    /// Ordering key: the orchestrator sorts tools ascending by this value
    /// before running them.
    fn priority(&self) -> u32;

    /// Performs the tool's work and reports the outcome.
    fn execute(&mut self, context: &ExecutionContext) -> Result<ToolOutput>;

    /// Pre-check; returning `false` makes the orchestrator skip the tool.
    /// Defaults to `true`.
    fn should_run(&self, _context: &ExecutionContext) -> bool {
        true
    }

    /// Names of tools this tool depends on. Every name must belong to a
    /// registered tool. Defaults to no dependencies.
    fn dependencies(&self) -> Vec<String> {
        Vec::new()
    }

    /// Hook invoked before `execute`; an error aborts the tool before it runs.
    /// Defaults to a no-op.
    fn before_execute(&mut self, _context: &ExecutionContext) -> Result<()> {
        Ok(())
    }

    /// Hook invoked after `execute` returned `Ok`, with the produced output.
    /// Defaults to a no-op.
    fn after_execute(&mut self, _context: &ExecutionContext, _output: &ToolOutput) -> Result<()> {
        Ok(())
    }

    /// Hook invoked after `execute` returned `Err`, with that error.
    /// Defaults to a no-op.
    fn on_error(&mut self, _context: &ExecutionContext, _error: &anyhow::Error) -> Result<()> {
        Ok(())
    }

    /// Timeout in seconds (default 60). The orchestrator in this file only
    /// logs the value; it does not enforce it.
    fn timeout_seconds(&self) -> u64 {
        60
    }
}
/// Classification of a file modification produced by a [`TrafficAnalyzer`].
#[derive(Clone, Debug, PartialEq)]
pub enum TrafficBranch {
    /// No conflicts attached; the default analyzer uses this for
    /// documentation, data and asset files.
    Green,

    /// Intermediate classification carrying any detected conflicts (the
    /// default analyzer returns it with an empty list).
    Yellow {
        /// Conflicts found for the file.
        conflicts: Vec<Conflict>,
    },

    /// Highest-severity classification; the default analyzer uses it for
    /// schema-like and API/type definition files.
    Red {
        /// Conflicts found for the file.
        conflicts: Vec<Conflict>,
    },
}
/// A single reason why a file modification was flagged.
#[derive(Clone, Debug, PartialEq)]
pub struct Conflict {
    /// File the conflict refers to.
    pub path: PathBuf,

    /// Line the conflict refers to (the default analyzer always reports `0`).
    pub line: usize,

    /// Human-readable explanation.
    pub reason: String,
}
/// Strategy for classifying file modifications.
pub trait TrafficAnalyzer {
    /// Classifies a modification of `file` into a [`TrafficBranch`].
    fn analyze(&self, file: &Path) -> Result<TrafficBranch>;

    /// Decides whether the given conflicts allow an automatic merge.
    fn can_auto_merge(&self, conflicts: &[Conflict]) -> bool;
}
/// Stateless, heuristic [`TrafficAnalyzer`] that classifies files by name,
/// extension and path.
///
/// Being a unit struct it is trivially copyable and has an obvious default,
/// so the common traits are derived for convenience in logs and generic code.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultTrafficAnalyzer;
impl TrafficAnalyzer for DefaultTrafficAnalyzer {
    /// Classifies `file` with name-based heuristics, checked in this order:
    ///
    /// 1. **Green** – test/spec files (`*.test.ts`, `*.test.js`, `*.spec.ts`,
    ///    `*.spec.js`) and documentation/data/asset extensions.
    /// 2. **Red** – schema-like extensions (`proto`, `graphql`, `gql`, `sql`).
    /// 3. **Red** – source files whose lower-cased path contains `api`,
    ///    `interface`, `types` or `schema`.
    /// 4. **Yellow** with no conflicts – everything else.
    ///
    /// Extensions are compared exactly and case-sensitively. Test/spec files
    /// are recognised from the file name, because `Path::extension` only
    /// yields the part after the last dot and therefore never contains a
    /// compound suffix such as `test.ts`.
    ///
    /// This implementation never returns `Err`.
    fn analyze(&self, file: &Path) -> Result<TrafficBranch> {
        /// `true` when `file_name` equals `suffix` or ends with `.<suffix>`.
        fn has_name_suffix(file_name: &str, suffix: &str) -> bool {
            file_name
                .strip_suffix(suffix)
                .map_or(false, |stem| stem.is_empty() || stem.ends_with('.'))
        }

        const TEST_FILE_SUFFIXES: [&str; 4] = ["test.ts", "test.js", "spec.ts", "spec.js"];
        const API_PATH_MARKERS: [&str; 4] = ["api", "interface", "types", "schema"];

        let extension = file.extension().and_then(|e| e.to_str()).unwrap_or("");
        let file_name = file.file_name().and_then(|n| n.to_str()).unwrap_or("");

        // Builds the single-conflict `Red` result used by both red rules.
        let red = |reason: String| TrafficBranch::Red {
            conflicts: vec![Conflict {
                path: file.to_path_buf(),
                line: 0,
                reason,
            }],
        };

        let is_test_file = TEST_FILE_SUFFIXES
            .iter()
            .any(|&suffix| has_name_suffix(file_name, suffix));
        let is_green_extension = matches!(
            extension,
            "md" | "txt" | "json" | "css" | "scss" | "less" | "png" | "jpg" | "svg" | "ico"
        );
        if is_test_file || is_green_extension {
            return Ok(TrafficBranch::Green);
        }

        if matches!(extension, "proto" | "graphql" | "gql" | "sql") {
            return Ok(red(format!(
                "Breaking change potential: {} file modification",
                extension
            )));
        }

        let is_source_code = matches!(
            extension,
            "ts" | "tsx" | "js" | "jsx" | "rs" | "go" | "py" | "java" | "cpp" | "c" | "h"
        );
        if is_source_code {
            // Substring match over the whole path, directories included.
            let path_str = file.to_string_lossy().to_lowercase();
            if API_PATH_MARKERS.iter().any(|&marker| path_str.contains(marker)) {
                return Ok(red("API/Type definition file modification".to_string()));
            }
        }

        Ok(TrafficBranch::Yellow {
            conflicts: Vec::new(),
        })
    }

    /// Auto-merge is allowed only when there are no conflicts at all.
    fn can_auto_merge(&self, conflicts: &[Conflict]) -> bool {
        conflicts.is_empty()
    }
}
/// Settings that control how the orchestrator runs its tools.
#[derive(Clone, Debug)]
pub struct OrchestratorConfig {
    /// Selects the wave-based execution path instead of the plain
    /// priority-ordered one.
    pub parallel: bool,

    /// Stop the whole run at the first failing tool.
    pub fail_fast: bool,

    /// Requested concurrency limit (only logged by the code in this file).
    pub max_concurrent: usize,

    /// Traffic-branch switch (not read by the code in this file).
    pub traffic_branch_enabled: bool,
}

/// Defaults: sequential execution, fail-fast on, at most 4 concurrent tools,
/// traffic branch analysis enabled.
impl Default for OrchestratorConfig {
    fn default() -> Self {
        OrchestratorConfig {
            max_concurrent: 4,
            fail_fast: true,
            traffic_branch_enabled: true,
            parallel: false,
        }
    }
}
/// Owns the registered tools and runs them against a shared context.
pub struct Orchestrator {
    /// Registered tools; re-sorted by priority at the start of every run.
    tools: Vec<Box<dyn DxTool>>,

    /// Context passed to every tool and hook.
    context: ExecutionContext,

    /// Execution settings.
    config: OrchestratorConfig,
}
impl Orchestrator {
pub fn new(repo_root: impl Into<PathBuf>) -> Result<Self> {
let repo_root = repo_root.into();
let forge_path = repo_root.join(".dx/forge");
Ok(Self {
tools: Vec::new(),
context: ExecutionContext::new(repo_root, forge_path),
config: OrchestratorConfig::default(),
})
}
/// Creates an orchestrator for `repo_root` with no tools and the supplied
/// configuration. The forge directory is `<repo_root>/.dx/forge`.
pub fn with_config(repo_root: impl Into<PathBuf>, config: OrchestratorConfig) -> Result<Self> {
    let root: PathBuf = repo_root.into();
    let forge_dir = root.join(".dx/forge");
    let context = ExecutionContext::new(root, forge_dir);

    Ok(Self {
        tools: Vec::new(),
        context,
        config,
    })
}
/// Replaces the current configuration; takes effect on the next run.
pub fn set_config(&mut self, config: OrchestratorConfig) {
    self.config = config;
}
/// Adds `tool` to the list of tools to run and logs the registration.
///
/// Currently always returns `Ok(())`; no validation happens at
/// registration time.
pub fn register_tool(&mut self, tool: Box<dyn DxTool>) -> Result<()> {
    tracing::info!(
        "📦 Registered tool: {} v{} (priority: {})",
        tool.name(),
        tool.version(),
        tool.priority()
    );

    self.tools.push(tool);
    Ok(())
}
/// Runs every registered tool and returns the collected outputs.
///
/// Steps: sort tools ascending by priority, verify that every declared
/// dependency is registered, reject circular dependencies, then execute
/// using the wave-based or the sequential path depending on
/// `config.parallel`.
///
/// # Errors
/// Fails when dependency validation fails, a dependency cycle exists, or
/// the chosen execution path returns an error (for example a failing tool
/// with `fail_fast` enabled).
pub fn execute_all(&mut self) -> Result<Vec<ToolOutput>> {
    let started = std::time::Instant::now();
    tracing::info!("🎼 Orchestrator starting execution of {} tools", self.tools.len());

    self.tools.sort_by_key(|tool| tool.priority());

    tracing::debug!("🔍 Validating tool dependencies...");
    self.validate_dependencies()?;

    tracing::debug!("🔄 Checking for circular dependencies...");
    self.check_circular_dependencies()?;

    tracing::debug!(
        "📋 Execution order: {}",
        self.tools
            .iter()
            .map(|t| format!("{}(p:{})", t.name(), t.priority()))
            .collect::<Vec<_>>()
            .join(" → ")
    );

    let outputs = match self.config.parallel {
        true => self.execute_parallel()?,
        false => self.execute_sequential()?,
    };

    let elapsed = started.elapsed();
    let failed_count = outputs.iter().filter(|o| !o.success).count();
    let success_count = outputs.len() - failed_count;
    tracing::info!(
        "🏁 Orchestration complete in {:.2}s: {} succeeded, {} failed",
        elapsed.as_secs_f64(),
        success_count,
        failed_count
    );

    Ok(outputs)
}
/// Runs the tools one after another in their current (priority-sorted)
/// order and collects one output per tool that was not skipped.
///
/// * A tool whose `should_run` returns `false` is skipped and produces no
///   output.
/// * A tool that fails (error or `success == false`) aborts the run when
///   `fail_fast` is set; otherwise a failure output is recorded and the run
///   continues.
///
/// NOTE(review): declared dependencies do not influence the order here —
/// only priority does. Confirm that this is intended.
///
/// # Errors
/// With `fail_fast`, returns the first tool error or an error describing
/// the first unsuccessful output.
fn execute_sequential(&mut self) -> Result<Vec<ToolOutput>> {
    // `tools`, `context` and `config` are distinct fields, so the context can
    // be borrowed while the tools are iterated mutably; no clone is needed.
    let context = &self.context;
    let fail_fast = self.config.fail_fast;
    let total_tools = self.tools.len();

    let mut outputs = Vec::with_capacity(total_tools);
    let mut executed = 0usize;
    let mut skipped = 0usize;
    let mut failed = 0usize;

    for (position, tool) in self.tools.iter_mut().enumerate() {
        if !tool.should_run(context) {
            tracing::info!("⏭️ Skipping {}: pre-check failed", tool.name());
            skipped += 1;
            continue;
        }

        // Progress is the tool's position in the run, so it stays correct
        // after skipped or failed tools.
        tracing::info!(
            "🚀 Executing: {} v{} (priority: {}, {}/{})",
            tool.name(),
            tool.version(),
            tool.priority(),
            position + 1,
            total_tools
        );

        match Self::execute_tool_with_hooks(tool, context) {
            Ok(output) => {
                if output.success {
                    executed += 1;
                    tracing::info!("✅ {} completed in {}ms", tool.name(), output.duration_ms);
                } else {
                    failed += 1;
                    tracing::error!("❌ {} failed: {}", tool.name(), output.message);
                    if fail_fast {
                        tracing::error!("💥 Fail-fast enabled, stopping orchestration");
                        return Err(anyhow::anyhow!(
                            "Tool {} failed: {}",
                            tool.name(),
                            output.message
                        ));
                    }
                }
                outputs.push(output);
            }
            Err(e) => {
                failed += 1;
                tracing::error!("💥 {} error: {}", tool.name(), e);
                if fail_fast {
                    tracing::error!("💥 Fail-fast enabled, stopping orchestration");
                    return Err(e);
                }
                outputs.push(ToolOutput::failure(format!("Error: {}", e)));
            }
        }
    }

    tracing::info!(
        "📊 Sequential execution complete: {} executed, {} skipped, {} failed",
        executed,
        skipped,
        failed
    );
    Ok(outputs)
}
fn execute_parallel(&mut self) -> Result<Vec<ToolOutput>> {
tracing::info!("🚀 Parallel execution mode (max {} concurrent)", self.config.max_concurrent);
let dep_graph = self.build_dependency_graph();
let waves = self.compute_execution_waves(&dep_graph)?;
tracing::debug!("📊 Execution waves: {}", waves.len());
for (i, wave) in waves.iter().enumerate() {
tracing::debug!(" Wave {}: {} tools", i + 1, wave.len());
}
let mut all_outputs = Vec::new();
let context = self.context.clone();
for (wave_idx, wave_tools) in waves.into_iter().enumerate() {
tracing::info!("🌊 Executing wave {} with {} tools", wave_idx + 1, wave_tools.len());
let mut wave_outputs = Vec::new();
for tool_idx in wave_tools {
let tool = &mut self.tools[tool_idx];
if !tool.should_run(&context) {
tracing::info!("⏭️ Skipping {}: pre-check failed", tool.name());
continue;
}
tracing::info!("🚀 Executing: {} v{}", tool.name(), tool.version());
match Self::execute_tool_with_hooks(tool, &context) {
Ok(output) => {
if output.success {
tracing::info!("✅ {} completed in {}ms", tool.name(), output.duration_ms);
} else {
tracing::error!("❌ {} failed: {}", tool.name(), output.message);
if self.config.fail_fast {
return Err(anyhow::anyhow!("Tool {} failed: {}", tool.name(), output.message));
}
}
wave_outputs.push(output);
}
Err(e) => {
tracing::error!("💥 {} error: {}", tool.name(), e);
if self.config.fail_fast {
return Err(e);
}
wave_outputs.push(ToolOutput::failure(format!("Error: {}", e)));
}
}
}
all_outputs.extend(wave_outputs);
}
Ok(all_outputs)
}
/// Maps every registered tool name to the set of names it depends on.
///
/// If two tools share a name, the entry of the later one wins.
fn build_dependency_graph(&self) -> HashMap<String, HashSet<String>> {
    self.tools
        .iter()
        .map(|tool| {
            let requires: HashSet<String> = tool.dependencies().into_iter().collect();
            (tool.name().to_string(), requires)
        })
        .collect()
}
/// Groups tool indices into waves such that every tool appears in a later
/// wave than all of its dependencies.
///
/// Each pass selects the tools whose dependencies were all completed by
/// *earlier* waves. Names are added to the completed set only after the
/// pass, so a tool is never placed in the same wave as something it depends
/// on. Within a wave the indices keep the current order of `self.tools`.
///
/// Tools missing from `dep_graph` are treated as having no dependencies.
///
/// # Errors
/// Fails when a pass makes no progress (a cycle, a self-dependency or a
/// dependency that is not registered), listing the tools that could not be
/// scheduled.
fn compute_execution_waves(&self, dep_graph: &HashMap<String, HashSet<String>>) -> Result<Vec<Vec<usize>>> {
    let mut waves: Vec<Vec<usize>> = Vec::new();
    let mut completed: HashSet<&str> = HashSet::new();
    let mut remaining: Vec<usize> = (0..self.tools.len()).collect();

    while !remaining.is_empty() {
        // Split against what previous waves completed; `completed` is not
        // touched until the whole pass has been evaluated.
        let (ready, blocked): (Vec<usize>, Vec<usize>) =
            remaining.iter().copied().partition(|&idx| {
                dep_graph
                    .get(self.tools[idx].name())
                    .map_or(true, |deps| {
                        deps.iter().all(|dep| completed.contains(dep.as_str()))
                    })
            });

        if ready.is_empty() {
            let unmet: Vec<&str> = blocked
                .iter()
                .map(|&idx| self.tools[idx].name())
                .collect();
            return Err(anyhow::anyhow!(
                "Cannot resolve dependencies for tools: {}",
                unmet.join(", ")
            ));
        }

        completed.extend(ready.iter().map(|&idx| self.tools[idx].name()));
        waves.push(ready);
        remaining = blocked;
    }

    Ok(waves)
}
fn execute_tool_with_hooks(tool: &mut Box<dyn DxTool>, context: &ExecutionContext) -> Result<ToolOutput> {
let start = std::time::Instant::now();
let tool_name = tool.name().to_string();
tracing::debug!("📝 Running before_execute hook for {}", tool_name);
tool.before_execute(context)?;
let result = if tool.timeout_seconds() > 0 {
tracing::debug!(
"⏱️ Executing {} with {}s timeout (note: timeout monitoring not yet implemented for sync tools)",
tool_name,
tool.timeout_seconds()
);
tool.execute(context)
} else {
tracing::debug!("🚀 Executing {} without timeout", tool_name);
tool.execute(context)
};
match result {
Ok(mut output) => {
let duration = start.elapsed();
output.duration_ms = duration.as_millis() as u64;
tracing::info!(
"✅ {} completed successfully in {:.2}s",
tool_name,
duration.as_secs_f64()
);
if !output.files_modified.is_empty() {
tracing::debug!(" 📝 Modified {} files", output.files_modified.len());
}
if !output.files_created.is_empty() {
tracing::debug!(" ✨ Created {} files", output.files_created.len());
}
if !output.files_deleted.is_empty() {
tracing::debug!(" 🗑️ Deleted {} files", output.files_deleted.len());
}
tracing::debug!("📝 Running after_execute hook for {}", tool_name);
tool.after_execute(context, &output)?;
Ok(output)
}
Err(e) => {
let duration = start.elapsed();
tracing::error!(
"❌ {} failed after {:.2}s: {}",
tool_name,
duration.as_secs_f64(),
e
);
tracing::debug!("📝 Running on_error hook for {}", tool_name);
tool.on_error(context, &e)?;
Err(e)
}
}
}
/// Walks the dependency graph depth-first from every tool that has not been
/// visited yet and fails on the first cycle found.
fn check_circular_dependencies(&self) -> Result<()> {
    let mut visited: HashSet<String> = HashSet::new();
    let mut in_progress: HashSet<String> = HashSet::new();

    self.tools
        .iter()
        .map(|tool| tool.name())
        .try_for_each(|name| {
            if visited.contains(name) {
                Ok(())
            } else {
                self.check_circular_deps_recursive(name, &mut visited, &mut in_progress)
            }
        })
}
/// Depth-first step of the cycle check.
///
/// `visited` holds every name that has ever been entered; `stack` holds the
/// names on the current recursion path. Meeting a dependency that is still
/// on the path means there is a cycle. Names without a registered tool are
/// treated as having no dependencies.
fn check_circular_deps_recursive(
    &self,
    tool_name: &str,
    visited: &mut HashSet<String>,
    stack: &mut HashSet<String>,
) -> Result<()> {
    visited.insert(tool_name.to_owned());
    stack.insert(tool_name.to_owned());

    let dependencies = self
        .tools
        .iter()
        .find(|candidate| candidate.name() == tool_name)
        .map(|found| found.dependencies())
        .unwrap_or_default();

    for dep in dependencies {
        // Everything on the path has also been visited, so testing the path
        // first is equivalent to testing it only for visited names.
        if stack.contains(&dep) {
            return Err(anyhow::anyhow!(
                "Circular dependency detected: {} -> {}",
                tool_name,
                dep
            ));
        }
        if !visited.contains(&dep) {
            self.check_circular_deps_recursive(&dep, visited, stack)?;
        }
    }

    stack.remove(tool_name);
    Ok(())
}
/// Ensures that every dependency named by a tool belongs to a registered
/// tool.
///
/// # Errors
/// Fails on the first dependency name without a matching registered tool.
fn validate_dependencies(&self) -> Result<()> {
    let registered: HashSet<&str> = self.tools.iter().map(|t| t.name()).collect();

    for tool in &self.tools {
        let missing = tool
            .dependencies()
            .into_iter()
            .find(|dep| !registered.contains(dep.as_str()));

        if let Some(dep) = missing {
            anyhow::bail!(
                "Tool '{}' requires '{}' but it's not registered",
                tool.name(),
                dep
            );
        }
    }

    Ok(())
}
/// Shared access to the execution context passed to the tools.
pub fn context(&self) -> &ExecutionContext {
    &self.context
}
/// Mutable access to the execution context, e.g. to set the branch or the
/// changed files before a run.
pub fn context_mut(&mut self) -> &mut ExecutionContext {
    &mut self.context
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shared record of the order in which mock tools were executed.
    type ExecutionLog = Arc<Mutex<Vec<String>>>;

    /// Minimal tool that appends its name to a shared log when executed.
    struct MockTool {
        name: String,
        priority: u32,
        log: ExecutionLog,
    }

    impl MockTool {
        /// Boxes a mock tool so it can be handed straight to `register_tool`.
        fn boxed(name: &str, priority: u32, log: &ExecutionLog) -> Box<dyn DxTool> {
            Box::new(MockTool {
                name: name.to_string(),
                priority,
                log: Arc::clone(log),
            })
        }
    }

    impl DxTool for MockTool {
        fn name(&self) -> &str {
            &self.name
        }

        fn version(&self) -> &str {
            "1.0.0"
        }

        fn priority(&self) -> u32 {
            self.priority
        }

        fn execute(&mut self, _ctx: &ExecutionContext) -> Result<ToolOutput> {
            self.log
                .lock()
                .expect("execution log mutex poisoned")
                .push(self.name.clone());
            Ok(ToolOutput::success())
        }
    }

    /// Tools registered out of order must run in ascending priority order.
    #[test]
    fn test_orchestrator_priority_order() {
        let log: ExecutionLog = Arc::new(Mutex::new(Vec::new()));
        let mut orch = Orchestrator::new("/tmp/test").unwrap();

        orch.register_tool(MockTool::boxed("tool-c", 30, &log)).unwrap();
        orch.register_tool(MockTool::boxed("tool-a", 10, &log)).unwrap();
        orch.register_tool(MockTool::boxed("tool-b", 20, &log)).unwrap();

        let outputs = orch.execute_all().unwrap();
        assert_eq!(outputs.len(), 3);
        assert!(outputs.iter().all(|o| o.success));

        let executed = log.lock().expect("execution log mutex poisoned");
        assert_eq!(*executed, vec!["tool-a", "tool-b", "tool-c"]);
    }
}