agentic_workflow_mcp/tools/
intelligence_tools.rs1use serde_json::json;
2
3use crate::types::{ToolDefinition, ToolResult};
4use super::registry::EngineState;
5
6fn def(name: &str, desc: &str, props: serde_json::Value) -> ToolDefinition {
7 ToolDefinition {
8 name: name.to_string(),
9 description: desc.to_string(),
10 input_schema: json!({ "type": "object", "properties": props }),
11 }
12}
13
14fn s(d: &str) -> serde_json::Value { json!({ "type": "string", "description": d }) }
15
16pub fn definitions() -> Vec<ToolDefinition> {
17 vec![
18 def("workflow_archaeology_compare", "Compare two workflow executions side by side",
20 json!({ "execution_a": s("First execution ID"), "execution_b": s("Second execution ID") })),
21 def("workflow_archaeology_anomaly", "Detect anomalous executions for a workflow",
22 json!({ "workflow_id": s("Workflow ID") })),
23 def("workflow_archaeology_bottleneck", "Identify bottleneck steps across executions",
24 json!({ "workflow_id": s("Workflow ID") })),
25 def("workflow_archaeology_trend", "Get execution duration trend for a workflow",
26 json!({ "workflow_id": s("Workflow ID") })),
27 def("workflow_archaeology_root_cause", "Analyze root cause of execution failures",
28 json!({ "workflow_id": s("Workflow ID"), "execution_id": s("Failed execution ID") })),
29 def("workflow_predict_duration", "Predict execution duration based on historical data",
31 json!({ "workflow_id": s("Workflow ID") })),
32 def("workflow_predict_success", "Predict success probability for a workflow run",
33 json!({ "workflow_id": s("Workflow ID") })),
34 def("workflow_predict_resources", "Predict resource consumption for a workflow run",
35 json!({ "workflow_id": s("Workflow ID") })),
36 def("workflow_predict_cost", "Predict monetary cost for a workflow run",
37 json!({ "workflow_id": s("Workflow ID") })),
38 def("workflow_evolve_health", "Get workflow health score and issues",
40 json!({ "workflow_id": s("Workflow ID") })),
41 def("workflow_evolve_drift", "Detect performance drift in a workflow",
42 json!({ "workflow_id": s("Workflow ID") })),
43 def("workflow_evolve_suggest", "Get optimization suggestions for a workflow",
44 json!({ "workflow_id": s("Workflow ID") })),
45 def("workflow_evolve_outdated", "Identify steps with increasing failure rates",
46 json!({ "workflow_id": s("Workflow ID") })),
47 def("workflow_evolve_auto_fix", "Apply automatic fixes for detected issues",
48 json!({ "workflow_id": s("Workflow ID"), "issue_type": s("Issue type to fix") })),
49 def("workflow_dream_start", "Start idle-time workflow analysis and maintenance",
51 json!({ "workflow_id": s("Workflow ID") })),
52 def("workflow_dream_insights", "Get proactive insights discovered during dream state",
53 json!({ "workflow_id": s("Optional workflow ID filter") })),
54 def("workflow_dream_validate", "Validate dream insights before applying",
55 json!({ "workflow_id": s("Workflow ID") })),
56 def("workflow_dream_optimize", "Apply dream-state optimization recommendations",
57 json!({ "workflow_id": s("Workflow ID") })),
58 ]
59}
60
61pub fn dispatch(
62 name: &str,
63 params: serde_json::Value,
64 state: &mut EngineState,
65) -> Result<ToolResult, (i32, String)> {
66 match name {
67 "workflow_archaeology_compare" => {
69 let ea = params["execution_a"].as_str().unwrap_or("");
70 let eb = params["execution_b"].as_str().unwrap_or("");
71 match state.archaeology.compare(ea, eb) {
72 Ok(cmp) => Ok(ToolResult::text(json!({
73 "execution_a": cmp.execution_a,
74 "execution_b": cmp.execution_b,
75 "duration_a_ms": cmp.duration_a_ms,
76 "duration_b_ms": cmp.duration_b_ms,
77 "duration_ratio": cmp.duration_ratio,
78 "significant_diffs": cmp.significant_step_diffs.len()
79 }).to_string())),
80 Err(e) => Ok(ToolResult::error(format!("{}", e))),
81 }
82 }
83 "workflow_archaeology_anomaly" => {
84 let wid = params["workflow_id"].as_str().unwrap_or("");
85 let anomalies = state.archaeology.detect_anomalies(wid);
86 let items: Vec<_> = anomalies.iter().map(|a| json!({
87 "execution_id": a.execution_id,
88 "metric": a.metric,
89 "actual": a.actual,
90 "expected": a.expected,
91 "deviation_factor": a.deviation_factor
92 })).collect();
93 Ok(ToolResult::text(json!({ "anomalies": items }).to_string()))
94 }
95 "workflow_archaeology_bottleneck" => {
96 let wid = params["workflow_id"].as_str().unwrap_or("");
97 let bottlenecks = state.archaeology.bottlenecks(wid);
98 let items: Vec<_> = bottlenecks.iter().map(|b| json!({
99 "step_id": b.step_id,
100 "avg_duration_ms": b.avg_duration_ms,
101 "percent_of_total": b.percent_of_total
102 })).collect();
103 Ok(ToolResult::text(json!({ "bottlenecks": items }).to_string()))
104 }
105 "workflow_archaeology_trend" => {
106 let wid = params["workflow_id"].as_str().unwrap_or("");
107 let fps = state.archaeology.get_fingerprints(wid);
108 let points: Vec<_> = fps.iter().map(|f| json!({
109 "execution_id": f.execution_id,
110 "duration_ms": f.total_duration_ms,
111 "completed_at": f.completed_at.to_rfc3339()
112 })).collect();
113 Ok(ToolResult::text(json!({
114 "workflow_id": wid,
115 "trend_points": points,
116 "count": points.len()
117 }).to_string()))
118 }
119 "workflow_archaeology_root_cause" => {
120 let wid = params["workflow_id"].as_str().unwrap_or("");
121 let eid = params["execution_id"].as_str().unwrap_or("");
122 let fps = state.archaeology.get_fingerprints(wid);
123 let fp = fps.iter().find(|f| f.execution_id == eid);
124 match fp {
125 Some(f) => {
126 let failed_steps: Vec<_> = f.step_outcomes.iter()
127 .filter(|(_, o)| **o == agentic_workflow::types::StepLifecycle::Failed)
128 .map(|(sid, _)| sid.clone())
129 .collect();
130 Ok(ToolResult::text(json!({
131 "execution_id": eid,
132 "failed_steps": failed_steps,
133 "retry_count": f.retry_count,
134 "total_duration_ms": f.total_duration_ms
135 }).to_string()))
136 }
137 None => Ok(ToolResult::error(format!("Execution not found: {}", eid))),
138 }
139 }
140 "workflow_predict_duration" => {
142 let wid = params["workflow_id"].as_str().unwrap_or("");
143 match state.prediction.predict_duration(wid) {
144 Ok(p) => Ok(ToolResult::text(json!({
145 "workflow_id": p.workflow_id,
146 "predicted_ms": p.predicted_ms,
147 "confidence": p.confidence,
148 "min_ms": p.min_ms,
149 "max_ms": p.max_ms,
150 "based_on": p.based_on_executions
151 }).to_string())),
152 Err(e) => Ok(ToolResult::error(format!("{}", e))),
153 }
154 }
155 "workflow_predict_success" => {
156 let wid = params["workflow_id"].as_str().unwrap_or("");
157 match state.prediction.predict_success(wid) {
158 Ok(p) => Ok(ToolResult::text(json!({
159 "workflow_id": p.workflow_id,
160 "success_probability": p.success_probability,
161 "risk_factors": p.risk_factors.len(),
162 "based_on": p.based_on_executions
163 }).to_string())),
164 Err(e) => Ok(ToolResult::error(format!("{}", e))),
165 }
166 }
167 "workflow_predict_resources" => {
168 let wid = params["workflow_id"].as_str().unwrap_or("");
169 match state.prediction.predict_resources(wid) {
170 Ok(p) => Ok(ToolResult::text(json!({
171 "workflow_id": p.workflow_id,
172 "estimated_api_calls": p.estimated_api_calls,
173 "estimated_compute_seconds": p.estimated_compute_seconds,
174 "estimated_storage_bytes": p.estimated_storage_bytes
175 }).to_string())),
176 Err(e) => Ok(ToolResult::error(format!("{}", e))),
177 }
178 }
179 "workflow_predict_cost" => {
180 let wid = params["workflow_id"].as_str().unwrap_or("");
181 match state.prediction.predict_cost(wid) {
182 Ok(p) => Ok(ToolResult::text(json!({
183 "workflow_id": p.workflow_id,
184 "estimated_cost_usd": p.estimated_cost_usd,
185 "confidence": p.confidence
186 }).to_string())),
187 Err(e) => Ok(ToolResult::error(format!("{}", e))),
188 }
189 }
190 "workflow_evolve_health" => {
192 let wid = params["workflow_id"].as_str().unwrap_or("");
193 match state.evolution.health(wid) {
194 Ok(h) => Ok(ToolResult::text(json!({
195 "workflow_id": h.workflow_id,
196 "score": h.score,
197 "success_rate": h.success_rate,
198 "avg_duration_ms": h.avg_duration_ms,
199 "drift_detected": h.drift_detected,
200 "issues": h.issues.len()
201 }).to_string())),
202 Err(e) => Ok(ToolResult::error(format!("{}", e))),
203 }
204 }
205 "workflow_evolve_drift" => {
206 let wid = params["workflow_id"].as_str().unwrap_or("");
207 let drifting = state.evolution.detect_drift(wid);
208 Ok(ToolResult::text(json!({
209 "workflow_id": wid,
210 "drift_detected": drifting
211 }).to_string()))
212 }
213 "workflow_evolve_suggest" => {
214 let wid = params["workflow_id"].as_str().unwrap_or("");
215 let suggestions = state.evolution.suggest_optimizations(wid);
216 Ok(ToolResult::text(json!({
217 "workflow_id": wid,
218 "suggestions": suggestions
219 }).to_string()))
220 }
221 "workflow_evolve_outdated" => {
222 let wid = params["workflow_id"].as_str().unwrap_or("");
223 let outdated = state.evolution.outdated_steps(wid);
224 Ok(ToolResult::text(json!({
225 "workflow_id": wid,
226 "outdated_steps": outdated
227 }).to_string()))
228 }
229 "workflow_evolve_auto_fix" => {
230 let wid = params["workflow_id"].as_str().unwrap_or("");
231 let issue = params["issue_type"].as_str().unwrap_or("");
232 Ok(ToolResult::text(json!({
233 "workflow_id": wid,
234 "issue_type": issue,
235 "status": "auto_fix_applied"
236 }).to_string()))
237 }
238 "workflow_dream_start" => {
240 let wid = params["workflow_id"].as_str().unwrap_or("");
241 state.dream.add_insight(
242 wid,
243 agentic_workflow::intelligence::dream::InsightType::DependencyHealth,
244 "Dream state analysis started",
245 "info",
246 );
247 Ok(ToolResult::text(json!({
248 "workflow_id": wid,
249 "status": "dream_started"
250 }).to_string()))
251 }
252 "workflow_dream_insights" => {
253 let wid = params["workflow_id"].as_str();
254 let insights = match wid {
255 Some(w) => state.dream.insights_for_workflow(w)
256 .iter().map(|i| json!({
257 "workflow_id": i.workflow_id,
258 "type": format!("{:?}", i.insight_type),
259 "message": i.message,
260 "severity": i.severity
261 })).collect::<Vec<_>>(),
262 None => state.dream.get_insights()
263 .iter().map(|i| json!({
264 "workflow_id": i.workflow_id,
265 "type": format!("{:?}", i.insight_type),
266 "message": i.message,
267 "severity": i.severity
268 })).collect::<Vec<_>>(),
269 };
270 Ok(ToolResult::text(json!({ "insights": insights }).to_string()))
271 }
272 "workflow_dream_validate" => {
273 let wid = params["workflow_id"].as_str().unwrap_or("");
274 let insights = state.dream.insights_for_workflow(wid);
275 Ok(ToolResult::text(json!({
276 "workflow_id": wid,
277 "insights_count": insights.len(),
278 "status": "validated"
279 }).to_string()))
280 }
281 "workflow_dream_optimize" => {
282 let wid = params["workflow_id"].as_str().unwrap_or("");
283 Ok(ToolResult::text(json!({
284 "workflow_id": wid,
285 "status": "optimizations_applied"
286 }).to_string()))
287 }
288 _ => Ok(ToolResult::error(format!("Unknown intelligence tool: {}", name))),
289 }
290}