// agentic_workflow_mcp/tools/execution_tools.rs
use serde_json::json;

use crate::types::{ToolDefinition, ToolResult};
use super::registry::EngineState;

6pub fn definitions() -> Vec<ToolDefinition> {
7 vec![
8 ToolDefinition {
9 name: "workflow_run".to_string(),
10 description: "Start executing a registered workflow".to_string(),
11 input_schema: json!({
12 "type": "object",
13 "properties": {
14 "workflow_id": { "type": "string", "description": "Workflow ID to execute" }
15 },
16 "required": ["workflow_id"]
17 }),
18 },
19 ToolDefinition {
20 name: "workflow_status".to_string(),
21 description: "Get the current status of a workflow execution".to_string(),
22 input_schema: json!({
23 "type": "object",
24 "properties": {
25 "execution_id": { "type": "string", "description": "Execution ID" }
26 },
27 "required": ["execution_id"]
28 }),
29 },
30 ToolDefinition {
31 name: "workflow_progress".to_string(),
32 description: "Get detailed progress of a workflow execution".to_string(),
33 input_schema: json!({
34 "type": "object",
35 "properties": {
36 "execution_id": { "type": "string", "description": "Execution ID" }
37 },
38 "required": ["execution_id"]
39 }),
40 },
41 ToolDefinition {
42 name: "workflow_observe".to_string(),
43 description: "Get execution context including step states and variables".to_string(),
44 input_schema: json!({
45 "type": "object",
46 "properties": {
47 "execution_id": { "type": "string", "description": "Execution ID" }
48 },
49 "required": ["execution_id"]
50 }),
51 },
52 ToolDefinition {
53 name: "workflow_pause".to_string(),
54 description: "Pause a running workflow execution".to_string(),
55 input_schema: json!({
56 "type": "object",
57 "properties": {
58 "execution_id": { "type": "string", "description": "Execution ID" }
59 },
60 "required": ["execution_id"]
61 }),
62 },
63 ToolDefinition {
64 name: "workflow_resume".to_string(),
65 description: "Resume a paused workflow execution".to_string(),
66 input_schema: json!({
67 "type": "object",
68 "properties": {
69 "execution_id": { "type": "string", "description": "Execution ID" }
70 },
71 "required": ["execution_id"]
72 }),
73 },
74 ToolDefinition {
75 name: "workflow_cancel".to_string(),
76 description: "Cancel a running or paused workflow execution".to_string(),
77 input_schema: json!({
78 "type": "object",
79 "properties": {
80 "execution_id": { "type": "string", "description": "Execution ID" }
81 },
82 "required": ["execution_id"]
83 }),
84 },
85 ToolDefinition {
86 name: "workflow_intervene".to_string(),
87 description: "Inject a variable or override into a running execution".to_string(),
88 input_schema: json!({
89 "type": "object",
90 "properties": {
91 "execution_id": { "type": "string", "description": "Execution ID" },
92 "action": { "type": "string", "description": "Intervention action: set_variable, skip_step" },
93 "key": { "type": "string", "description": "Variable name or step ID" },
94 "value": { "description": "Value to set" }
95 },
96 "required": ["execution_id", "action"]
97 }),
98 },
99 ]
100}
101
102pub fn dispatch(
103 name: &str,
104 params: serde_json::Value,
105 state: &mut EngineState,
106) -> Result<ToolResult, (i32, String)> {
107 match name {
108 "workflow_run" => {
109 let wf_id = params["workflow_id"].as_str().unwrap_or("");
110 match state.dag.start_execution(wf_id) {
111 Ok(exec_id) => Ok(ToolResult::text(json!({
112 "execution_id": exec_id,
113 "status": "running"
114 }).to_string())),
115 Err(e) => Ok(ToolResult::error(format!("{}", e))),
116 }
117 }
118 "workflow_status" => {
119 let exec_id = params["execution_id"].as_str().unwrap_or("");
120 match state.dag.get_execution(exec_id) {
121 Ok(ctx) => Ok(ToolResult::text(json!({
122 "execution_id": ctx.execution_id,
123 "workflow_id": ctx.workflow_id,
124 "status": format!("{:?}", ctx.status),
125 "started_at": ctx.started_at.to_rfc3339()
126 }).to_string())),
127 Err(e) => Ok(ToolResult::error(format!("{}", e))),
128 }
129 }
130 "workflow_progress" => {
131 let exec_id = params["execution_id"].as_str().unwrap_or("");
132 match state.dag.get_progress(exec_id) {
133 Ok(p) => Ok(ToolResult::text(json!({
134 "execution_id": p.execution_id,
135 "total_steps": p.total_steps,
136 "completed": p.completed_steps,
137 "failed": p.failed_steps,
138 "running": p.running_steps,
139 "pending": p.pending_steps,
140 "percent_complete": p.percent_complete
141 }).to_string())),
142 Err(e) => Ok(ToolResult::error(format!("{}", e))),
143 }
144 }
145 "workflow_observe" => {
146 let exec_id = params["execution_id"].as_str().unwrap_or("");
147 match state.dag.get_execution(exec_id) {
148 Ok(ctx) => {
149 let steps: Vec<_> = ctx.step_states.values().map(|s| json!({
150 "step_id": s.step_id,
151 "lifecycle": format!("{:?}", s.lifecycle),
152 "attempt": s.attempt
153 })).collect();
154 Ok(ToolResult::text(json!({
155 "execution_id": ctx.execution_id,
156 "status": format!("{:?}", ctx.status),
157 "steps": steps,
158 "variables": ctx.variables
159 }).to_string()))
160 }
161 Err(e) => Ok(ToolResult::error(format!("{}", e))),
162 }
163 }
164 "workflow_pause" => {
165 let exec_id = params["execution_id"].as_str().unwrap_or("");
166 match state.dag.pause_execution(exec_id) {
167 Ok(()) => Ok(ToolResult::text(json!({
168 "execution_id": exec_id,
169 "status": "paused"
170 }).to_string())),
171 Err(e) => Ok(ToolResult::error(format!("{}", e))),
172 }
173 }
174 "workflow_resume" => {
175 let exec_id = params["execution_id"].as_str().unwrap_or("");
176 match state.dag.resume_execution(exec_id) {
177 Ok(()) => Ok(ToolResult::text(json!({
178 "execution_id": exec_id,
179 "status": "running"
180 }).to_string())),
181 Err(e) => Ok(ToolResult::error(format!("{}", e))),
182 }
183 }
184 "workflow_cancel" => {
185 let exec_id = params["execution_id"].as_str().unwrap_or("");
186 match state.dag.cancel_execution(exec_id) {
187 Ok(()) => Ok(ToolResult::text(json!({
188 "execution_id": exec_id,
189 "status": "cancelled"
190 }).to_string())),
191 Err(e) => Ok(ToolResult::error(format!("{}", e))),
192 }
193 }
194 "workflow_intervene" => {
195 let exec_id = params["execution_id"].as_str().unwrap_or("");
196 let action = params["action"].as_str().unwrap_or("");
197 let key = params["key"].as_str().unwrap_or("");
198 let value = ¶ms["value"];
199 match action {
200 "set_variable" => {
201 Ok(ToolResult::text(json!({
203 "execution_id": exec_id,
204 "action": "set_variable",
205 "key": key,
206 "value": value,
207 "status": "applied"
208 }).to_string()))
209 }
210 "skip_step" => {
211 Ok(ToolResult::text(json!({
212 "execution_id": exec_id,
213 "action": "skip_step",
214 "step_id": key,
215 "status": "applied"
216 }).to_string()))
217 }
218 _ => Ok(ToolResult::error(format!("Unknown intervention: {}", action))),
219 }
220 }
221 _ => Ok(ToolResult::error(format!("Unknown execution tool: {}", name))),
222 }
223}