Skip to main content

agentic_workflow_mcp/tools/
processing_tools.rs

1use serde_json::json;
2
3use crate::types::{ToolDefinition, ToolResult};
4use super::registry::EngineState;
5
6fn def(name: &str, desc: &str, props: serde_json::Value) -> ToolDefinition {
7    ToolDefinition {
8        name: name.to_string(),
9        description: desc.to_string(),
10        input_schema: json!({ "type": "object", "properties": props }),
11    }
12}
13
14fn s(d: &str) -> serde_json::Value { json!({ "type": "string", "description": d }) }
15fn i(d: &str) -> serde_json::Value { json!({ "type": "integer", "description": d }) }
16
17pub fn definitions() -> Vec<ToolDefinition> {
18    vec![
19        // Batch (5)
20        def("workflow_batch_create", "Create a batch job to process multiple items through a workflow",
21            json!({ "workflow_id": s("Workflow ID"), "items": json!({ "type": "array", "description": "Items to process" }), "concurrency": i("Max parallel items"), "checkpoint_every": i("Checkpoint interval") })),
22        def("workflow_batch_run", "Start executing a batch job",
23            json!({ "batch_id": s("Batch job ID") })),
24        def("workflow_batch_progress", "Get progress of a batch job",
25            json!({ "batch_id": s("Batch job ID") })),
26        def("workflow_batch_resume", "Resume a batch job from its last checkpoint",
27            json!({ "batch_id": s("Batch job ID") })),
28        def("workflow_batch_report", "Get completion report for a batch job",
29            json!({ "batch_id": s("Batch job ID") })),
30        // Stream (6)
31        def("workflow_stream_create", "Create a stream processor for continuous event processing",
32            json!({ "name": s("Processor name"), "workflow_id": s("Workflow ID"), "source_type": s("file_watch, queue, webhook"), "max_queue_size": i("Max queue size") })),
33        def("workflow_stream_start", "Start consuming from a stream",
34            json!({ "stream_id": s("Stream processor ID") })),
35        def("workflow_stream_status", "Get status of a stream processor",
36            json!({ "stream_id": s("Stream processor ID") })),
37        def("workflow_stream_pause", "Pause stream consumption",
38            json!({ "stream_id": s("Stream processor ID") })),
39        def("workflow_stream_checkpoint", "Force a checkpoint at the current stream position",
40            json!({ "stream_id": s("Stream processor ID"), "offset": i("Current offset"), "items_processed": i("Items processed so far") })),
41        def("workflow_stream_fork", "Add a fork to split stream events by condition",
42            json!({ "stream_id": s("Stream processor ID"), "name": s("Fork name"), "condition": s("Fork condition"), "target_workflow_id": s("Target workflow for matching events") })),
43        // Fan-out (4)
44        def("workflow_fanout_create", "Create a fan-out step for parallel distribution",
45            json!({ "destinations": json!({ "type": "array", "items": { "type": "object" }, "description": "Destination configs" }), "completion_policy": s("wait_all, wait_any, wait_n"), "timeout_ms": i("Timeout") })),
46        def("workflow_fanout_execute", "Start executing a fan-out step",
47            json!({ "fanout_id": s("Fan-out step ID"), "execution_id": s("Execution ID") })),
48        def("workflow_fanout_status", "Get status of a fan-out execution",
49            json!({ "execution_id": s("Execution ID") })),
50        def("workflow_fanout_policy", "Get the fan-out step definition and completion policy",
51            json!({ "fanout_id": s("Fan-out step ID") })),
52    ]
53}
54
55pub fn dispatch(
56    name: &str,
57    params: serde_json::Value,
58    state: &mut EngineState,
59) -> Result<ToolResult, (i32, String)> {
60    match name {
61        // --- Batch ---
62        "workflow_batch_create" => {
63            let wid = params["workflow_id"].as_str().unwrap_or("");
64            let items: Vec<serde_json::Value> = params["items"]
65                .as_array()
66                .cloned()
67                .unwrap_or_default();
68            let concurrency = params["concurrency"].as_u64().unwrap_or(1) as usize;
69            let checkpoint = params["checkpoint_every"].as_u64().unwrap_or(10) as usize;
70            match state.batch.create_batch(wid, items, concurrency, checkpoint) {
71                Ok(bid) => Ok(ToolResult::text(json!({
72                    "batch_id": bid, "status": "created"
73                }).to_string())),
74                Err(e) => Ok(ToolResult::error(format!("{}", e))),
75            }
76        }
77        "workflow_batch_run" => {
78            let bid = params["batch_id"].as_str().unwrap_or("");
79            match state.batch.get_job(bid) {
80                Ok(_) => Ok(ToolResult::text(json!({
81                    "batch_id": bid, "status": "running"
82                }).to_string())),
83                Err(e) => Ok(ToolResult::error(format!("{}", e))),
84            }
85        }
86        "workflow_batch_progress" => {
87            let bid = params["batch_id"].as_str().unwrap_or("");
88            match state.batch.get_progress(bid) {
89                Ok(p) => Ok(ToolResult::text(json!({
90                    "batch_id": p.batch_id, "total": p.total_items,
91                    "completed": p.completed, "failed": p.failed,
92                    "percent_complete": p.percent_complete
93                }).to_string())),
94                Err(e) => Ok(ToolResult::error(format!("{}", e))),
95            }
96        }
97        "workflow_batch_resume" => {
98            let bid = params["batch_id"].as_str().unwrap_or("");
99            match state.batch.get_progress(bid) {
100                Ok(p) => Ok(ToolResult::text(json!({
101                    "batch_id": bid, "status": "resumed",
102                    "resuming_from_checkpoint": p.last_checkpoint_index
103                }).to_string())),
104                Err(e) => Ok(ToolResult::error(format!("{}", e))),
105            }
106        }
107        "workflow_batch_report" => {
108            let bid = params["batch_id"].as_str().unwrap_or("");
109            match state.batch.get_report(bid) {
110                Ok(r) => Ok(ToolResult::text(json!({
111                    "batch_id": r.batch_id, "total": r.total_items,
112                    "success": r.success_count, "failed": r.fail_count,
113                    "avg_duration_ms": r.avg_item_duration_ms
114                }).to_string())),
115                Err(e) => Ok(ToolResult::error(format!("{}", e))),
116            }
117        }
118        // --- Stream ---
119        "workflow_stream_create" => {
120            let sname = params["name"].as_str().unwrap_or("stream");
121            let wid = params["workflow_id"].as_str().unwrap_or("");
122            let source = match params["source_type"].as_str().unwrap_or("file_watch") {
123                "queue" => agentic_workflow::types::StreamSource::Queue {
124                    queue_name: "default".to_string(),
125                    connection: "localhost".to_string(),
126                },
127                "webhook" => agentic_workflow::types::StreamSource::Webhook {
128                    endpoint: "/stream".to_string(),
129                },
130                _ => agentic_workflow::types::StreamSource::FileWatch {
131                    path: "/tmp".to_string(),
132                    pattern: None,
133                },
134            };
135            let max_q = params["max_queue_size"].as_u64().unwrap_or(100) as usize;
136            match state.stream.create_processor(sname, wid, source, None, max_q) {
137                Ok(sid) => Ok(ToolResult::text(json!({
138                    "stream_id": sid, "status": "created"
139                }).to_string())),
140                Err(e) => Ok(ToolResult::error(format!("{}", e))),
141            }
142        }
143        "workflow_stream_start" => {
144            let sid = params["stream_id"].as_str().unwrap_or("");
145            match state.stream.start(sid) {
146                Ok(()) => Ok(ToolResult::text(json!({ "stream_id": sid, "status": "running" }).to_string())),
147                Err(e) => Ok(ToolResult::error(format!("{}", e))),
148            }
149        }
150        "workflow_stream_status" => {
151            let sid = params["stream_id"].as_str().unwrap_or("");
152            match state.stream.get_processor(sid) {
153                Ok(p) => Ok(ToolResult::text(json!({
154                    "stream_id": p.id, "name": p.name,
155                    "status": format!("{:?}", p.status)
156                }).to_string())),
157                Err(e) => Ok(ToolResult::error(format!("{}", e))),
158            }
159        }
160        "workflow_stream_pause" => {
161            let sid = params["stream_id"].as_str().unwrap_or("");
162            match state.stream.pause(sid) {
163                Ok(()) => Ok(ToolResult::text(json!({ "stream_id": sid, "status": "paused" }).to_string())),
164                Err(e) => Ok(ToolResult::error(format!("{}", e))),
165            }
166        }
167        "workflow_stream_checkpoint" => {
168            let sid = params["stream_id"].as_str().unwrap_or("");
169            let offset = params["offset"].as_u64().unwrap_or(0);
170            let items = params["items_processed"].as_u64().unwrap_or(0);
171            match state.stream.checkpoint(sid, offset, items) {
172                Ok(()) => Ok(ToolResult::text(json!({
173                    "stream_id": sid, "offset": offset, "status": "checkpointed"
174                }).to_string())),
175                Err(e) => Ok(ToolResult::error(format!("{}", e))),
176            }
177        }
178        "workflow_stream_fork" => {
179            let sid = params["stream_id"].as_str().unwrap_or("");
180            let fname = params["name"].as_str().unwrap_or("fork");
181            let condition = params["condition"].as_str().unwrap_or("true");
182            let target = params["target_workflow_id"].as_str().unwrap_or("");
183            match state.stream.add_fork(sid, fname, condition, target) {
184                Ok(fid) => Ok(ToolResult::text(json!({
185                    "fork_id": fid, "stream_id": sid, "status": "created"
186                }).to_string())),
187                Err(e) => Ok(ToolResult::error(format!("{}", e))),
188            }
189        }
190        // --- Fan-out ---
191        "workflow_fanout_create" => {
192            let dests: Vec<agentic_workflow::types::fanout::FanOutDestination> = params["destinations"]
193                .as_array()
194                .map(|arr| arr.iter().enumerate().map(|(i, d)| {
195                    agentic_workflow::types::FanOutDestination {
196                        id: d["id"].as_str().unwrap_or(&format!("d{}", i)).to_string(),
197                        name: d["name"].as_str().unwrap_or("dest").to_string(),
198                        step_config: d["config"].clone(),
199                    }
200                }).collect())
201                .unwrap_or_default();
202            let policy = match params["completion_policy"].as_str().unwrap_or("wait_all") {
203                "wait_any" => agentic_workflow::types::CompletionPolicy::WaitAny,
204                "wait_n" => agentic_workflow::types::CompletionPolicy::WaitN(1),
205                _ => agentic_workflow::types::CompletionPolicy::WaitAll,
206            };
207            let agg = agentic_workflow::types::ResultAggregator::Merge;
208            let timeout = params["timeout_ms"].as_u64();
209            match state.fanout.create_fanout(dests, policy, agg, timeout) {
210                Ok(fid) => Ok(ToolResult::text(json!({
211                    "fanout_id": fid, "status": "created"
212                }).to_string())),
213                Err(e) => Ok(ToolResult::error(format!("{}", e))),
214            }
215        }
216        "workflow_fanout_execute" => {
217            let fid = params["fanout_id"].as_str().unwrap_or("");
218            let eid = params["execution_id"].as_str().unwrap_or("");
219            match state.fanout.start_execution(fid, eid) {
220                Ok(()) => Ok(ToolResult::text(json!({
221                    "fanout_id": fid, "execution_id": eid, "status": "executing"
222                }).to_string())),
223                Err(e) => Ok(ToolResult::error(format!("{}", e))),
224            }
225        }
226        "workflow_fanout_status" => {
227            let eid = params["execution_id"].as_str().unwrap_or("");
228            match state.fanout.get_status(eid) {
229                Ok(st) => {
230                    let branches: Vec<_> = st.branches.iter().map(|b| json!({
231                        "destination_id": b.destination_id,
232                        "status": format!("{:?}", b.status)
233                    })).collect();
234                    Ok(ToolResult::text(json!({
235                        "execution_id": eid, "completed": st.completed,
236                        "branches": branches
237                    }).to_string()))
238                }
239                Err(e) => Ok(ToolResult::error(format!("{}", e))),
240            }
241        }
242        "workflow_fanout_policy" => {
243            let fid = params["fanout_id"].as_str().unwrap_or("");
244            match state.fanout.get_step(fid) {
245                Ok(step) => Ok(ToolResult::text(json!({
246                    "fanout_id": step.id,
247                    "destinations": step.destinations.len(),
248                    "timeout_ms": step.timeout_ms
249                }).to_string())),
250                Err(e) => Ok(ToolResult::error(format!("{}", e))),
251            }
252        }
253        _ => Ok(ToolResult::error(format!("Unknown processing tool: {}", name))),
254    }
255}