1use serde_json::json;
2
3use crate::types::{ToolDefinition, ToolResult};
4use super::registry::EngineState;
5
6fn def(name: &str, desc: &str, props: serde_json::Value) -> ToolDefinition {
7 ToolDefinition {
8 name: name.to_string(),
9 description: desc.to_string(),
10 input_schema: json!({ "type": "object", "properties": props }),
11 }
12}
13
14fn s(d: &str) -> serde_json::Value { json!({ "type": "string", "description": d }) }
15fn i(d: &str) -> serde_json::Value { json!({ "type": "integer", "description": d }) }
16
17pub fn definitions() -> Vec<ToolDefinition> {
18 vec![
19 def("workflow_batch_create", "Create a batch job to process multiple items through a workflow",
21 json!({ "workflow_id": s("Workflow ID"), "items": json!({ "type": "array", "description": "Items to process" }), "concurrency": i("Max parallel items"), "checkpoint_every": i("Checkpoint interval") })),
22 def("workflow_batch_run", "Start executing a batch job",
23 json!({ "batch_id": s("Batch job ID") })),
24 def("workflow_batch_progress", "Get progress of a batch job",
25 json!({ "batch_id": s("Batch job ID") })),
26 def("workflow_batch_resume", "Resume a batch job from its last checkpoint",
27 json!({ "batch_id": s("Batch job ID") })),
28 def("workflow_batch_report", "Get completion report for a batch job",
29 json!({ "batch_id": s("Batch job ID") })),
30 def("workflow_stream_create", "Create a stream processor for continuous event processing",
32 json!({ "name": s("Processor name"), "workflow_id": s("Workflow ID"), "source_type": s("file_watch, queue, webhook"), "max_queue_size": i("Max queue size") })),
33 def("workflow_stream_start", "Start consuming from a stream",
34 json!({ "stream_id": s("Stream processor ID") })),
35 def("workflow_stream_status", "Get status of a stream processor",
36 json!({ "stream_id": s("Stream processor ID") })),
37 def("workflow_stream_pause", "Pause stream consumption",
38 json!({ "stream_id": s("Stream processor ID") })),
39 def("workflow_stream_checkpoint", "Force a checkpoint at the current stream position",
40 json!({ "stream_id": s("Stream processor ID"), "offset": i("Current offset"), "items_processed": i("Items processed so far") })),
41 def("workflow_stream_fork", "Add a fork to split stream events by condition",
42 json!({ "stream_id": s("Stream processor ID"), "name": s("Fork name"), "condition": s("Fork condition"), "target_workflow_id": s("Target workflow for matching events") })),
43 def("workflow_fanout_create", "Create a fan-out step for parallel distribution",
45 json!({ "destinations": json!({ "type": "array", "items": { "type": "object" }, "description": "Destination configs" }), "completion_policy": s("wait_all, wait_any, wait_n"), "timeout_ms": i("Timeout") })),
46 def("workflow_fanout_execute", "Start executing a fan-out step",
47 json!({ "fanout_id": s("Fan-out step ID"), "execution_id": s("Execution ID") })),
48 def("workflow_fanout_status", "Get status of a fan-out execution",
49 json!({ "execution_id": s("Execution ID") })),
50 def("workflow_fanout_policy", "Get the fan-out step definition and completion policy",
51 json!({ "fanout_id": s("Fan-out step ID") })),
52 ]
53}
54
55pub fn dispatch(
56 name: &str,
57 params: serde_json::Value,
58 state: &mut EngineState,
59) -> Result<ToolResult, (i32, String)> {
60 match name {
61 "workflow_batch_create" => {
63 let wid = params["workflow_id"].as_str().unwrap_or("");
64 let items: Vec<serde_json::Value> = params["items"]
65 .as_array()
66 .cloned()
67 .unwrap_or_default();
68 let concurrency = params["concurrency"].as_u64().unwrap_or(1) as usize;
69 let checkpoint = params["checkpoint_every"].as_u64().unwrap_or(10) as usize;
70 match state.batch.create_batch(wid, items, concurrency, checkpoint) {
71 Ok(bid) => Ok(ToolResult::text(json!({
72 "batch_id": bid, "status": "created"
73 }).to_string())),
74 Err(e) => Ok(ToolResult::error(format!("{}", e))),
75 }
76 }
77 "workflow_batch_run" => {
78 let bid = params["batch_id"].as_str().unwrap_or("");
79 match state.batch.get_job(bid) {
80 Ok(_) => Ok(ToolResult::text(json!({
81 "batch_id": bid, "status": "running"
82 }).to_string())),
83 Err(e) => Ok(ToolResult::error(format!("{}", e))),
84 }
85 }
86 "workflow_batch_progress" => {
87 let bid = params["batch_id"].as_str().unwrap_or("");
88 match state.batch.get_progress(bid) {
89 Ok(p) => Ok(ToolResult::text(json!({
90 "batch_id": p.batch_id, "total": p.total_items,
91 "completed": p.completed, "failed": p.failed,
92 "percent_complete": p.percent_complete
93 }).to_string())),
94 Err(e) => Ok(ToolResult::error(format!("{}", e))),
95 }
96 }
97 "workflow_batch_resume" => {
98 let bid = params["batch_id"].as_str().unwrap_or("");
99 match state.batch.get_progress(bid) {
100 Ok(p) => Ok(ToolResult::text(json!({
101 "batch_id": bid, "status": "resumed",
102 "resuming_from_checkpoint": p.last_checkpoint_index
103 }).to_string())),
104 Err(e) => Ok(ToolResult::error(format!("{}", e))),
105 }
106 }
107 "workflow_batch_report" => {
108 let bid = params["batch_id"].as_str().unwrap_or("");
109 match state.batch.get_report(bid) {
110 Ok(r) => Ok(ToolResult::text(json!({
111 "batch_id": r.batch_id, "total": r.total_items,
112 "success": r.success_count, "failed": r.fail_count,
113 "avg_duration_ms": r.avg_item_duration_ms
114 }).to_string())),
115 Err(e) => Ok(ToolResult::error(format!("{}", e))),
116 }
117 }
118 "workflow_stream_create" => {
120 let sname = params["name"].as_str().unwrap_or("stream");
121 let wid = params["workflow_id"].as_str().unwrap_or("");
122 let source = match params["source_type"].as_str().unwrap_or("file_watch") {
123 "queue" => agentic_workflow::types::StreamSource::Queue {
124 queue_name: "default".to_string(),
125 connection: "localhost".to_string(),
126 },
127 "webhook" => agentic_workflow::types::StreamSource::Webhook {
128 endpoint: "/stream".to_string(),
129 },
130 _ => agentic_workflow::types::StreamSource::FileWatch {
131 path: "/tmp".to_string(),
132 pattern: None,
133 },
134 };
135 let max_q = params["max_queue_size"].as_u64().unwrap_or(100) as usize;
136 match state.stream.create_processor(sname, wid, source, None, max_q) {
137 Ok(sid) => Ok(ToolResult::text(json!({
138 "stream_id": sid, "status": "created"
139 }).to_string())),
140 Err(e) => Ok(ToolResult::error(format!("{}", e))),
141 }
142 }
143 "workflow_stream_start" => {
144 let sid = params["stream_id"].as_str().unwrap_or("");
145 match state.stream.start(sid) {
146 Ok(()) => Ok(ToolResult::text(json!({ "stream_id": sid, "status": "running" }).to_string())),
147 Err(e) => Ok(ToolResult::error(format!("{}", e))),
148 }
149 }
150 "workflow_stream_status" => {
151 let sid = params["stream_id"].as_str().unwrap_or("");
152 match state.stream.get_processor(sid) {
153 Ok(p) => Ok(ToolResult::text(json!({
154 "stream_id": p.id, "name": p.name,
155 "status": format!("{:?}", p.status)
156 }).to_string())),
157 Err(e) => Ok(ToolResult::error(format!("{}", e))),
158 }
159 }
160 "workflow_stream_pause" => {
161 let sid = params["stream_id"].as_str().unwrap_or("");
162 match state.stream.pause(sid) {
163 Ok(()) => Ok(ToolResult::text(json!({ "stream_id": sid, "status": "paused" }).to_string())),
164 Err(e) => Ok(ToolResult::error(format!("{}", e))),
165 }
166 }
167 "workflow_stream_checkpoint" => {
168 let sid = params["stream_id"].as_str().unwrap_or("");
169 let offset = params["offset"].as_u64().unwrap_or(0);
170 let items = params["items_processed"].as_u64().unwrap_or(0);
171 match state.stream.checkpoint(sid, offset, items) {
172 Ok(()) => Ok(ToolResult::text(json!({
173 "stream_id": sid, "offset": offset, "status": "checkpointed"
174 }).to_string())),
175 Err(e) => Ok(ToolResult::error(format!("{}", e))),
176 }
177 }
178 "workflow_stream_fork" => {
179 let sid = params["stream_id"].as_str().unwrap_or("");
180 let fname = params["name"].as_str().unwrap_or("fork");
181 let condition = params["condition"].as_str().unwrap_or("true");
182 let target = params["target_workflow_id"].as_str().unwrap_or("");
183 match state.stream.add_fork(sid, fname, condition, target) {
184 Ok(fid) => Ok(ToolResult::text(json!({
185 "fork_id": fid, "stream_id": sid, "status": "created"
186 }).to_string())),
187 Err(e) => Ok(ToolResult::error(format!("{}", e))),
188 }
189 }
190 "workflow_fanout_create" => {
192 let dests: Vec<agentic_workflow::types::fanout::FanOutDestination> = params["destinations"]
193 .as_array()
194 .map(|arr| arr.iter().enumerate().map(|(i, d)| {
195 agentic_workflow::types::FanOutDestination {
196 id: d["id"].as_str().unwrap_or(&format!("d{}", i)).to_string(),
197 name: d["name"].as_str().unwrap_or("dest").to_string(),
198 step_config: d["config"].clone(),
199 }
200 }).collect())
201 .unwrap_or_default();
202 let policy = match params["completion_policy"].as_str().unwrap_or("wait_all") {
203 "wait_any" => agentic_workflow::types::CompletionPolicy::WaitAny,
204 "wait_n" => agentic_workflow::types::CompletionPolicy::WaitN(1),
205 _ => agentic_workflow::types::CompletionPolicy::WaitAll,
206 };
207 let agg = agentic_workflow::types::ResultAggregator::Merge;
208 let timeout = params["timeout_ms"].as_u64();
209 match state.fanout.create_fanout(dests, policy, agg, timeout) {
210 Ok(fid) => Ok(ToolResult::text(json!({
211 "fanout_id": fid, "status": "created"
212 }).to_string())),
213 Err(e) => Ok(ToolResult::error(format!("{}", e))),
214 }
215 }
216 "workflow_fanout_execute" => {
217 let fid = params["fanout_id"].as_str().unwrap_or("");
218 let eid = params["execution_id"].as_str().unwrap_or("");
219 match state.fanout.start_execution(fid, eid) {
220 Ok(()) => Ok(ToolResult::text(json!({
221 "fanout_id": fid, "execution_id": eid, "status": "executing"
222 }).to_string())),
223 Err(e) => Ok(ToolResult::error(format!("{}", e))),
224 }
225 }
226 "workflow_fanout_status" => {
227 let eid = params["execution_id"].as_str().unwrap_or("");
228 match state.fanout.get_status(eid) {
229 Ok(st) => {
230 let branches: Vec<_> = st.branches.iter().map(|b| json!({
231 "destination_id": b.destination_id,
232 "status": format!("{:?}", b.status)
233 })).collect();
234 Ok(ToolResult::text(json!({
235 "execution_id": eid, "completed": st.completed,
236 "branches": branches
237 }).to_string()))
238 }
239 Err(e) => Ok(ToolResult::error(format!("{}", e))),
240 }
241 }
242 "workflow_fanout_policy" => {
243 let fid = params["fanout_id"].as_str().unwrap_or("");
244 match state.fanout.get_step(fid) {
245 Ok(step) => Ok(ToolResult::text(json!({
246 "fanout_id": step.id,
247 "destinations": step.destinations.len(),
248 "timeout_ms": step.timeout_ms
249 }).to_string())),
250 Err(e) => Ok(ToolResult::error(format!("{}", e))),
251 }
252 }
253 _ => Ok(ToolResult::error(format!("Unknown processing tool: {}", name))),
254 }
255}