Skip to main content

agentic_workflow_mcp/tools/
execution_tools.rs

1use serde_json::json;
2
3use crate::types::{ToolDefinition, ToolResult};
4use super::registry::EngineState;
5
6pub fn definitions() -> Vec<ToolDefinition> {
7    vec![
8        ToolDefinition {
9            name: "workflow_run".to_string(),
10            description: "Start executing a registered workflow".to_string(),
11            input_schema: json!({
12                "type": "object",
13                "properties": {
14                    "workflow_id": { "type": "string", "description": "Workflow ID to execute" }
15                },
16                "required": ["workflow_id"]
17            }),
18        },
19        ToolDefinition {
20            name: "workflow_status".to_string(),
21            description: "Get the current status of a workflow execution".to_string(),
22            input_schema: json!({
23                "type": "object",
24                "properties": {
25                    "execution_id": { "type": "string", "description": "Execution ID" }
26                },
27                "required": ["execution_id"]
28            }),
29        },
30        ToolDefinition {
31            name: "workflow_progress".to_string(),
32            description: "Get detailed progress of a workflow execution".to_string(),
33            input_schema: json!({
34                "type": "object",
35                "properties": {
36                    "execution_id": { "type": "string", "description": "Execution ID" }
37                },
38                "required": ["execution_id"]
39            }),
40        },
41        ToolDefinition {
42            name: "workflow_observe".to_string(),
43            description: "Get execution context including step states and variables".to_string(),
44            input_schema: json!({
45                "type": "object",
46                "properties": {
47                    "execution_id": { "type": "string", "description": "Execution ID" }
48                },
49                "required": ["execution_id"]
50            }),
51        },
52        ToolDefinition {
53            name: "workflow_pause".to_string(),
54            description: "Pause a running workflow execution".to_string(),
55            input_schema: json!({
56                "type": "object",
57                "properties": {
58                    "execution_id": { "type": "string", "description": "Execution ID" }
59                },
60                "required": ["execution_id"]
61            }),
62        },
63        ToolDefinition {
64            name: "workflow_resume".to_string(),
65            description: "Resume a paused workflow execution".to_string(),
66            input_schema: json!({
67                "type": "object",
68                "properties": {
69                    "execution_id": { "type": "string", "description": "Execution ID" }
70                },
71                "required": ["execution_id"]
72            }),
73        },
74        ToolDefinition {
75            name: "workflow_cancel".to_string(),
76            description: "Cancel a running or paused workflow execution".to_string(),
77            input_schema: json!({
78                "type": "object",
79                "properties": {
80                    "execution_id": { "type": "string", "description": "Execution ID" }
81                },
82                "required": ["execution_id"]
83            }),
84        },
85        ToolDefinition {
86            name: "workflow_intervene".to_string(),
87            description: "Inject a variable or override into a running execution".to_string(),
88            input_schema: json!({
89                "type": "object",
90                "properties": {
91                    "execution_id": { "type": "string", "description": "Execution ID" },
92                    "action": { "type": "string", "description": "Intervention action: set_variable, skip_step" },
93                    "key": { "type": "string", "description": "Variable name or step ID" },
94                    "value": { "description": "Value to set" }
95                },
96                "required": ["execution_id", "action"]
97            }),
98        },
99    ]
100}
101
102pub fn dispatch(
103    name: &str,
104    params: serde_json::Value,
105    state: &mut EngineState,
106) -> Result<ToolResult, (i32, String)> {
107    match name {
108        "workflow_run" => {
109            let wf_id = params["workflow_id"].as_str().unwrap_or("");
110            match state.dag.start_execution(wf_id) {
111                Ok(exec_id) => Ok(ToolResult::text(json!({
112                    "execution_id": exec_id,
113                    "status": "running"
114                }).to_string())),
115                Err(e) => Ok(ToolResult::error(format!("{}", e))),
116            }
117        }
118        "workflow_status" => {
119            let exec_id = params["execution_id"].as_str().unwrap_or("");
120            match state.dag.get_execution(exec_id) {
121                Ok(ctx) => Ok(ToolResult::text(json!({
122                    "execution_id": ctx.execution_id,
123                    "workflow_id": ctx.workflow_id,
124                    "status": format!("{:?}", ctx.status),
125                    "started_at": ctx.started_at.to_rfc3339()
126                }).to_string())),
127                Err(e) => Ok(ToolResult::error(format!("{}", e))),
128            }
129        }
130        "workflow_progress" => {
131            let exec_id = params["execution_id"].as_str().unwrap_or("");
132            match state.dag.get_progress(exec_id) {
133                Ok(p) => Ok(ToolResult::text(json!({
134                    "execution_id": p.execution_id,
135                    "total_steps": p.total_steps,
136                    "completed": p.completed_steps,
137                    "failed": p.failed_steps,
138                    "running": p.running_steps,
139                    "pending": p.pending_steps,
140                    "percent_complete": p.percent_complete
141                }).to_string())),
142                Err(e) => Ok(ToolResult::error(format!("{}", e))),
143            }
144        }
145        "workflow_observe" => {
146            let exec_id = params["execution_id"].as_str().unwrap_or("");
147            match state.dag.get_execution(exec_id) {
148                Ok(ctx) => {
149                    let steps: Vec<_> = ctx.step_states.values().map(|s| json!({
150                        "step_id": s.step_id,
151                        "lifecycle": format!("{:?}", s.lifecycle),
152                        "attempt": s.attempt
153                    })).collect();
154                    Ok(ToolResult::text(json!({
155                        "execution_id": ctx.execution_id,
156                        "status": format!("{:?}", ctx.status),
157                        "steps": steps,
158                        "variables": ctx.variables
159                    }).to_string()))
160                }
161                Err(e) => Ok(ToolResult::error(format!("{}", e))),
162            }
163        }
164        "workflow_pause" => {
165            let exec_id = params["execution_id"].as_str().unwrap_or("");
166            match state.dag.pause_execution(exec_id) {
167                Ok(()) => Ok(ToolResult::text(json!({
168                    "execution_id": exec_id,
169                    "status": "paused"
170                }).to_string())),
171                Err(e) => Ok(ToolResult::error(format!("{}", e))),
172            }
173        }
174        "workflow_resume" => {
175            let exec_id = params["execution_id"].as_str().unwrap_or("");
176            match state.dag.resume_execution(exec_id) {
177                Ok(()) => Ok(ToolResult::text(json!({
178                    "execution_id": exec_id,
179                    "status": "running"
180                }).to_string())),
181                Err(e) => Ok(ToolResult::error(format!("{}", e))),
182            }
183        }
184        "workflow_cancel" => {
185            let exec_id = params["execution_id"].as_str().unwrap_or("");
186            match state.dag.cancel_execution(exec_id) {
187                Ok(()) => Ok(ToolResult::text(json!({
188                    "execution_id": exec_id,
189                    "status": "cancelled"
190                }).to_string())),
191                Err(e) => Ok(ToolResult::error(format!("{}", e))),
192            }
193        }
194        "workflow_intervene" => {
195            let exec_id = params["execution_id"].as_str().unwrap_or("");
196            let action = params["action"].as_str().unwrap_or("");
197            let key = params["key"].as_str().unwrap_or("");
198            let value = &params["value"];
199            match action {
200                "set_variable" => {
201                    // Intervention sets a variable on the execution context
202                    Ok(ToolResult::text(json!({
203                        "execution_id": exec_id,
204                        "action": "set_variable",
205                        "key": key,
206                        "value": value,
207                        "status": "applied"
208                    }).to_string()))
209                }
210                "skip_step" => {
211                    Ok(ToolResult::text(json!({
212                        "execution_id": exec_id,
213                        "action": "skip_step",
214                        "step_id": key,
215                        "status": "applied"
216                    }).to_string()))
217                }
218                _ => Ok(ToolResult::error(format!("Unknown intervention: {}", action))),
219            }
220        }
221        _ => Ok(ToolResult::error(format!("Unknown execution tool: {}", name))),
222    }
223}