1use anyhow::{Result, bail};
12use async_trait::async_trait;
13use serde_json::{Value, json};
14
15use crate::tools::{Tool, ToolDefinition};
16use crate::workflow::{
17 EdgeDef, InputDef, NodeDef, NodeType, OutputDef,
18 parser::{parse_workflow, parse_workflow_from_file, to_yaml},
19};
20use std::fs;
21use std::path::PathBuf;
22
23fn get_workflow_dir(location: &str) -> Result<PathBuf> {
25 match location {
26 "project" => {
27 let cwd = std::env::current_dir()
28 .map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?;
29 Ok(cwd.join(".matrix").join("workflows"))
30 }
31 "user" => dirs::home_dir()
32 .ok_or_else(|| anyhow::anyhow!("Failed to get home directory"))
33 .map(|p| p.join(".matrix").join("workflows")),
34 _ => bail!("Invalid location: {}. Use 'project' or 'user'", location),
35 }
36}
37
38pub struct WorkflowCreateTool;
40
41#[async_trait]
42impl Tool for WorkflowCreateTool {
43 fn definition(&self) -> ToolDefinition {
44 ToolDefinition {
45 name: "workflow_create".to_string(),
46 description: "创建和编辑 workflow YAML 文件,支持迭代修改。
47
48【核心功能】
49此工具是 workflow 制作的核心工具,支持完整的创建-编辑-验证循环。
50
51【节点类型和必需字段】
52**基础字段**(所有节点):
53- id: 节点唯一标识
54- type: 节点类型 (start/end/task/condition/parallel/approval/wait/subworkflow)
55- name: 节点显示名称
56
57**特殊字段**(按节点类型):
58- **task**: task(任务名), params(参数), timeout_ms, on_failure
59- **condition**: branches(条件分支列表,**必须用 name 字段**)
60 - 每个分支: {\"name\": \"分支名\", \"condition\": \"表达式\", \"target\": \"目标节点ID\"}
61 - ⚠️ 分支用 `name` 而非 `id`
62- **approval**: approvers(审批人列表), timeout_ms
63- **parallel/pipeline**: parallel_branches(并行分支列表)
64 - 每个分支: {\"name\": \"分支名\", \"nodes\": [节点列表]}
65- **wait**: wait_ms(等待时间)
66- **subworkflow**: workflow(子工作流名称)
67
68【功能模式】
69- **create**: 创建新 workflow(首次创建)
70- **edit**: 编辑现有 workflow(精确修改节点、边、参数)
71- **template**: 获取模板(快速开始,推荐先用模板)
72- **validate**: 验证结构(检查错误)
73- **info**: 查看 workflow 详情
74
75【推荐流程】
761. template → 获取 condition/parallel/approval 模板作为起点
772. create → 创建初始版本
783. validate → 检查是否有错误
794. edit → 多次调整直到满意
80
81【常见错误】
821. ❌ branches 分支用 id 而非 name → 正确: {\"name\": \"分支名\", ...}
832. ❌ 缺少 start 或 end 节点 → 必须有这两种节点
843. ❌ 边引用不存在节点 → 检查节点 ID 是否正确
854. ❌ approval 缺少 approvers → 必须指定审批人
86
87【使用示例】
88获取 condition 模板:
89{\"mode\": \"template\", \"template_type\": \"condition\"}
90
91创建带条件分支的节点:
92{\"id\": \"check\", \"type\": \"condition\", \"name\": \"检查\",
93 \"branches\": [{\"name\": \"是\", \"condition\": \"{{value}} > 0\", \"target\": \"yes_node\"}]}
94
95创建审批节点:
96{\"id\": \"approve\", \"type\": \"approval\", \"name\": \"用户确认\",
97 \"approvers\": [\"user\"], \"timeout_ms\": 300000}".to_string(),
98 parameters: json!({
99 "type": "object",
100 "properties": {
101 "mode": {
102 "type": "string",
103 "enum": ["create", "edit", "template", "validate", "info"],
104 "description": "操作模式",
105 "default": "create"
106 },
107 "workflow_id": {
108 "type": "string",
109 "description": "目标 workflow ID (edit/info 模式必需)"
110 },
111 "workflow": {
112 "type": "object",
113 "description": "完整 workflow 定义 (create 模式)"
114 },
115 "yaml_content": {
116 "type": "string",
117 "description": "YAML 内容字符串 (可选)"
118 },
119 "edit_operation": {
120 "type": "string",
121 "enum": [
122 "add_node", "remove_node", "update_node",
123 "add_edge", "remove_edge",
124 "add_input", "remove_input", "update_input",
125 "add_output", "remove_output", "update_output",
126 "update_metadata"
127 ],
128 "description": "编辑操作类型 (edit 模式)"
129 },
130 "edit_target": {
131 "type": "string",
132 "description": "编辑目标:节点ID、边ID、输入参数名等"
133 },
134 "edit_value": {
135 "type": "object",
136 "description": "新值:节点定义、边定义、参数定义等"
137 },
138 "location": {
139 "type": "string",
140 "enum": ["project", "user"],
141 "description": "保存位置",
142 "default": "project"
143 },
144 "template_type": {
145 "type": "string",
146 "enum": ["simple", "parallel", "condition", "research", "batch", "approval"],
147 "description": "模板类型 (template 模式)"
148 },
149 "overwrite": {
150 "type": "boolean",
151 "description": "是否覆盖已存在文件",
152 "default": false
153 }
154 },
155 "required": ["mode"]
156 }),
157 is_priority: false,
158 }
159 }
160
161 async fn execute(&self, params: Value) -> Result<String> {
162 let mode = params["mode"].as_str().unwrap_or("create");
163
164 match mode {
165 "create" => self.handle_create(params).await,
166 "edit" => self.handle_edit(params).await,
167 "template" => self.handle_template(params).await,
168 "validate" => self.handle_validate(params).await,
169 "info" => self.handle_info(params).await,
170 _ => bail!(
171 "Invalid mode: {}. Supported modes: create, edit, template, validate, info",
172 mode
173 ),
174 }
175 }
176}
177
178impl WorkflowCreateTool {
179 async fn handle_create(&self, params: Value) -> Result<String> {
181 let workflow_def = if let Some(yaml_str) = params["yaml_content"].as_str() {
183 parse_workflow(yaml_str).map_err(|e| anyhow::anyhow!("Failed to parse YAML: {}", e))?
185 } else if let Some(workflow_json) = params.get("workflow") {
186 serde_json::from_value(workflow_json.clone())
188 .map_err(|e| anyhow::anyhow!("Failed to parse workflow JSON: {}", e))?
189 } else {
190 bail!(
191 "Missing workflow definition. Provide either 'workflow' (JSON) or 'yaml_content' (YAML string)"
192 );
193 };
194
195 workflow_def
197 .validate()
198 .map_err(|e| anyhow::anyhow!("Workflow validation failed: {}", e))?;
199
200 let location = params["location"].as_str().unwrap_or("project");
202 let overwrite = params["overwrite"].as_bool().unwrap_or(false);
203
204 let save_dir = get_workflow_dir(location)?;
205
206 fs::create_dir_all(&save_dir)
208 .map_err(|e| anyhow::anyhow!("Failed to create directory {:?}: {}", save_dir, e))?;
209
210 let file_path = save_dir.join(format!("{}.yaml", workflow_def.id));
212 if file_path.exists() && !overwrite {
213 bail!(
214 "Workflow file already exists: {:?}\nSet overwrite=true to replace it.",
215 file_path
216 );
217 }
218
219 let yaml_content = to_yaml(&workflow_def)
221 .map_err(|e| anyhow::anyhow!("Failed to convert to YAML: {}", e))?;
222
223 fs::write(&file_path, &yaml_content)
225 .map_err(|e| anyhow::anyhow!("Failed to write file {:?}: {}", file_path, e))?;
226
227 Ok(format!(
228 "✓ Workflow created successfully!\n\n📄 File: {:?}\n📁 ID: {}\n📝 Name: {}\n\n```yaml\n{}\n```\n\nUse 'workflow_run' to execute this workflow.",
229 file_path, workflow_def.id, workflow_def.name, yaml_content
230 ))
231 }
232
233 async fn handle_template(&self, params: Value) -> Result<String> {
235 let template_type = params["template_type"].as_str().unwrap_or("simple");
236
237 let template = match template_type {
238 "simple" => self.get_simple_template(),
239 "parallel" => self.get_parallel_template(),
240 "condition" => self.get_condition_template(),
241 "research" => self.get_research_template(),
242 "batch" => self.get_batch_template(),
243 "approval" => self.get_approval_template(),
244 _ => bail!(
245 "Unknown template type: {}. Available: simple, parallel, condition, research, batch, approval",
246 template_type
247 ),
248 };
249
250 Ok(format!(
251 "📋 Workflow Template: {}\n\n```yaml\n{}\n```\n\n💡 Tips:\n- Copy this template and modify as needed\n- Use 'workflow_create' with mode='create' to save it\n- Each workflow must have a start node and an end node",
252 template_type, template
253 ))
254 }
255
256 async fn handle_validate(&self, params: Value) -> Result<String> {
258 let workflow_def = if let Some(yaml_str) = params["yaml_content"].as_str() {
259 parse_workflow(yaml_str).map_err(|e| anyhow::anyhow!("YAML parsing failed: {}", e))?
260 } else if let Some(workflow_json) = params.get("workflow") {
261 serde_json::from_value(workflow_json.clone())
262 .map_err(|e| anyhow::anyhow!("JSON parsing failed: {}", e))?
263 } else {
264 bail!(
265 "Missing workflow definition. Provide either 'workflow' (JSON) or 'yaml_content' (YAML string)"
266 );
267 };
268
269 match workflow_def.validate() {
271 Ok(()) => Ok(format!(
272 "✓ Workflow validation passed!\n\n📁 ID: {}\n📝 Name: {}\n📊 Nodes: {}\n🔗 Edges: {}\n📥 Inputs: {}\n📤 Outputs: {}",
273 workflow_def.id,
274 workflow_def.name,
275 workflow_def.nodes.len(),
276 workflow_def.edges.len(),
277 workflow_def.inputs.len(),
278 workflow_def.outputs.len()
279 )),
280 Err(e) => bail!("Workflow validation failed: {}", e),
281 }
282 }
283
284 async fn handle_edit(&self, params: Value) -> Result<String> {
286 let workflow_id = params["workflow_id"]
288 .as_str()
289 .ok_or_else(|| anyhow::anyhow!("Missing 'workflow_id' parameter"))?;
290
291 let location = params["location"].as_str().unwrap_or("project");
293 let save_dir = get_workflow_dir(location)?;
294 let file_path = save_dir.join(format!("{}.yaml", workflow_id));
295
296 if !file_path.exists() {
298 bail!("Workflow '{}' not found at {:?}", workflow_id, file_path);
299 }
300
301 let mut workflow = parse_workflow_from_file(&file_path)
302 .map_err(|e| anyhow::anyhow!("Failed to load workflow: {}", e))?;
303
304 let edit_operation = params["edit_operation"]
306 .as_str()
307 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_operation' parameter"))?;
308
309 let edit_target = params["edit_target"].as_str();
310 let edit_value = params.get("edit_value").cloned();
311
312 match edit_operation {
314 "add_node" => {
316 let node_json = edit_value
317 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_value' for add_node"))?;
318 let new_node: NodeDef = serde_json::from_value(node_json)
319 .map_err(|e| anyhow::anyhow!("Invalid node definition: {}", e))?;
320
321 if workflow.nodes.iter().any(|n| n.id == new_node.id) {
323 bail!("Node '{}' already exists", new_node.id);
324 }
325
326 workflow.nodes.push(new_node);
327 }
328
329 "remove_node" => {
330 let node_id = edit_target
331 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_target' for remove_node"))?;
332 let idx = workflow
333 .nodes
334 .iter()
335 .position(|n| n.id == node_id)
336 .ok_or_else(|| anyhow::anyhow!("Node '{}' not found", node_id))?;
337
338 let node = &workflow.nodes[idx];
340 if node.node_type == NodeType::Start || node.node_type == NodeType::End {
341 bail!("Cannot remove start/end nodes");
342 }
343
344 workflow
346 .edges
347 .retain(|e| e.from != node_id && e.to != node_id);
348 workflow.nodes.remove(idx);
349 }
350
351 "update_node" => {
352 let node_id = edit_target
353 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_target' for update_node"))?;
354 let node = workflow
355 .nodes
356 .iter_mut()
357 .find(|n| n.id == node_id)
358 .ok_or_else(|| anyhow::anyhow!("Node '{}' not found", node_id))?;
359
360 let updates = edit_value
361 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_value' for update_node"))?;
362
363 if let Some(name) = updates.get("name").and_then(|v| v.as_str()) {
365 node.name = name.to_string();
366 }
367 if let Some(task) = updates.get("task").and_then(|v| v.as_str()) {
368 node.task = Some(task.to_string());
369 }
370 if let Some(params_val) = updates.get("params") {
371 node.params = serde_json::from_value(params_val.clone())
372 .map_err(|e| anyhow::anyhow!("Invalid params: {}", e))?;
373 }
374 if let Some(timeout) = updates.get("timeout_ms").and_then(|v| v.as_u64()) {
375 node.timeout_ms = Some(timeout);
376 }
377 }
378
379 "add_edge" => {
381 let edge_json = edit_value
382 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_value' for add_edge"))?;
383 let new_edge: EdgeDef = serde_json::from_value(edge_json)
384 .map_err(|e| anyhow::anyhow!("Invalid edge definition: {}", e))?;
385
386 if !workflow.nodes.iter().any(|n| n.id == new_edge.from) {
388 bail!("Source node '{}' not found", new_edge.from);
389 }
390 if !workflow.nodes.iter().any(|n| n.id == new_edge.to) {
391 bail!("Target node '{}' not found", new_edge.to);
392 }
393
394 workflow.edges.push(new_edge);
395 }
396
397 "remove_edge" => {
398 let from = edit_target.ok_or_else(|| {
399 anyhow::anyhow!("Missing 'edit_target' (from node) for remove_edge")
400 })?;
401 let to = params["to"]
402 .as_str()
403 .ok_or_else(|| anyhow::anyhow!("Missing 'to' parameter for remove_edge"))?;
404
405 workflow.edges.retain(|e| e.from != from || e.to != to);
406 }
407
408 "add_input" => {
410 let input_json = edit_value
411 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_value' for add_input"))?;
412 let new_input: InputDef = serde_json::from_value(input_json)
413 .map_err(|e| anyhow::anyhow!("Invalid input definition: {}", e))?;
414
415 workflow.inputs.push(new_input);
416 }
417
418 "remove_input" => {
419 let input_name = edit_target
420 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_target' for remove_input"))?;
421 workflow.inputs.retain(|i| i.name != input_name);
422 }
423
424 "update_input" => {
425 let input_name = edit_target
426 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_target' for update_input"))?;
427 let input = workflow
428 .inputs
429 .iter_mut()
430 .find(|i| i.name == input_name)
431 .ok_or_else(|| anyhow::anyhow!("Input '{}' not found", input_name))?;
432
433 let updates = edit_value
434 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_value' for update_input"))?;
435
436 if let Some(desc) = updates.get("description").and_then(|v| v.as_str()) {
437 input.description = Some(desc.to_string());
438 }
439 if let Some(required) = updates.get("required").and_then(|v| v.as_bool()) {
440 input.required = required;
441 }
442 if let Some(default) = updates.get("default") {
443 input.default = Some(default.clone());
444 }
445 }
446
447 "add_output" => {
449 let output_json = edit_value
450 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_value' for add_output"))?;
451 let new_output: OutputDef = serde_json::from_value(output_json)
452 .map_err(|e| anyhow::anyhow!("Invalid output definition: {}", e))?;
453
454 workflow.outputs.push(new_output);
455 }
456
457 "remove_output" => {
458 let output_name = edit_target
459 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_target' for remove_output"))?;
460 workflow.outputs.retain(|o| o.name != output_name);
461 }
462
463 "update_output" => {
464 let output_name = edit_target
465 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_target' for update_output"))?;
466 let output = workflow
467 .outputs
468 .iter_mut()
469 .find(|o| o.name == output_name)
470 .ok_or_else(|| anyhow::anyhow!("Output '{}' not found", output_name))?;
471
472 let updates = edit_value
473 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_value' for update_output"))?;
474
475 if let Some(value) = updates.get("value").and_then(|v| v.as_str()) {
476 output.value = value.to_string();
477 }
478 if let Some(desc) = updates.get("description").and_then(|v| v.as_str()) {
479 output.description = Some(desc.to_string());
480 }
481 }
482
483 "update_metadata" => {
485 let updates = edit_value
486 .ok_or_else(|| anyhow::anyhow!("Missing 'edit_value' for update_metadata"))?;
487
488 if let Some(name) = updates.get("name").and_then(|v| v.as_str()) {
489 workflow.name = name.to_string();
490 }
491 if let Some(desc) = updates.get("description").and_then(|v| v.as_str()) {
492 workflow.description = Some(desc.to_string());
493 }
494 if let Some(version) = updates.get("version").and_then(|v| v.as_str()) {
495 workflow.version = version.to_string();
496 }
497 }
498
499 _ => bail!("Unknown edit operation: {}", edit_operation),
500 }
501
502 workflow
504 .validate()
505 .map_err(|e| anyhow::anyhow!("Workflow validation failed after edit: {}", e))?;
506
507 let yaml_content =
509 to_yaml(&workflow).map_err(|e| anyhow::anyhow!("Failed to convert to YAML: {}", e))?;
510 fs::write(&file_path, &yaml_content)
511 .map_err(|e| anyhow::anyhow!("Failed to write file {:?}: {}", file_path, e))?;
512
513 Ok(format!(
514 "✓ Workflow '{}' updated successfully!\n\nOperation: {}\n📄 File: {:?}\n\n```yaml\n{}\n```\n\nUse 'info' mode to see detailed structure.",
515 workflow_id, edit_operation, file_path, yaml_content
516 ))
517 }
518
519 async fn handle_info(&self, params: Value) -> Result<String> {
521 let workflow_id = params["workflow_id"]
522 .as_str()
523 .ok_or_else(|| anyhow::anyhow!("Missing 'workflow_id' parameter"))?;
524
525 let location = params["location"].as_str().unwrap_or("project");
527 let save_dir = get_workflow_dir(location)?;
528 let file_path = save_dir.join(format!("{}.yaml", workflow_id));
529
530 if !file_path.exists() {
532 bail!("Workflow '{}' not found at {:?}", workflow_id, file_path);
533 }
534
535 let workflow = parse_workflow_from_file(&file_path)
536 .map_err(|e| anyhow::anyhow!("Failed to load workflow: {}", e))?;
537
538 let mut info = format!(
539 "📋 Workflow: {}\n\n📁 ID: {}\n📝 Name: {}\n📌 Version: {}\n",
540 workflow_id, workflow.id, workflow.name, workflow.version
541 );
542
543 if let Some(ref desc) = workflow.description {
544 info.push_str(&format!("📖 Description: {}\n", desc));
545 }
546
547 info.push_str("\n📊 Nodes:\n");
549 for node in &workflow.nodes {
550 let node_type = match node.node_type {
551 NodeType::Start => "start",
552 NodeType::End => "end",
553 NodeType::Task => "task",
554 NodeType::Condition => "condition",
555 NodeType::Parallel => "parallel",
556 NodeType::Pipeline => "pipeline",
557 NodeType::Wait => "wait",
558 NodeType::Approval => "approval",
559 NodeType::SubWorkflow => "subworkflow",
560 };
561 info.push_str(&format!(" - {} [{}] {}\n", node.id, node_type, node.name));
562
563 if let Some(ref task) = node.task {
564 info.push_str(&format!(" Task: {}\n", task));
565 }
566 }
567
568 info.push_str("\n🔗 Edges:\n");
570 for edge in &workflow.edges {
571 let label = edge
572 .label
573 .as_ref()
574 .map(|l| format!(" ({})", l))
575 .unwrap_or_default();
576 info.push_str(&format!(" {} → {}{}\n", edge.from, edge.to, label));
577 }
578
579 if !workflow.inputs.is_empty() {
581 info.push_str("\n📥 Inputs:\n");
582 for input in &workflow.inputs {
583 let required = if input.required { " (required)" } else { "" };
584 info.push_str(&format!(
585 " - {} [{}]{}\n",
586 input.name, input.input_type, required
587 ));
588 if let Some(ref desc) = input.description {
589 info.push_str(&format!(" {}\n", desc));
590 }
591 }
592 }
593
594 if !workflow.outputs.is_empty() {
596 info.push_str("\n📤 Outputs:\n");
597 for output in &workflow.outputs {
598 info.push_str(&format!(" - {}: {}\n", output.name, output.value));
599 }
600 }
601
602 Ok(info)
603 }
604
605 fn get_simple_template(&self) -> String {
607 r#"id: my-workflow
608name: My Workflow
609version: "1.0.0"
610description: A simple workflow example using bash tool
611
612inputs:
613 - name: param1
614 type: string
615 required: true
616 description: First parameter
617
618outputs:
619 - name: result
620 value: "{{nodes.task1.output}}"
621
622nodes:
623 - id: start
624 type: start
625 name: Start
626
627 - id: task1
628 type: task
629 name: First Task
630 task: bash
631 params:
632 command: "echo '{{inputs.param1}}'"
633 timeout_ms: 5000
634 on_failure:
635 type: abort
636
637 - id: end
638 type: end
639 name: End
640
641edges:
642 - from: start
643 to: task1
644 - from: task1
645 to: end
646"#
647 .to_string()
648 }
649
650 fn get_parallel_template(&self) -> String {
652 r#"id: parallel-workflow
653name: Parallel Workflow
654version: "1.0.0"
655description: A workflow with parallel execution using bash tool
656
657inputs:
658 - name: data
659 type: string
660 required: true
661 description: Input data to process
662
663nodes:
664 - id: start
665 type: start
666 name: Start
667
668 - id: parallel1
669 type: parallel
670 name: Parallel Processing
671 parallel_branches:
672 - name: Branch A
673 nodes:
674 - id: task_a
675 type: task
676 name: Task A
677 task: bash
678 params:
679 command: "echo 'Processing A: {{inputs.data}}'"
680 - name: Branch B
681 nodes:
682 - id: task_b
683 type: task
684 name: Task B
685 task: bash
686 params:
687 command: "echo 'Processing B: {{inputs.data}}'"
688
689 - id: merge
690 type: task
691 name: Merge Results
692 task: bash
693 params:
694 command: "echo 'Merged results from A and B'"
695 timeout_ms: 10000
696
697 - id: end
698 type: end
699 name: End
700
701edges:
702 - from: start
703 to: parallel1
704 - from: parallel1
705 to: merge
706 - from: merge
707 to: end
708"#
709 .to_string()
710 }
711
712 fn get_condition_template(&self) -> String {
714 r#"id: condition-workflow
715name: Conditional Workflow
716version: "1.0.0"
717description: A workflow with conditional branches using bash tool
718
719inputs:
720 - name: value
721 type: number
722 required: true
723 description: Value to check
724
725nodes:
726 - id: start
727 type: start
728 name: Start
729
730 - id: check_value
731 type: condition
732 name: Check Value
733 branches:
734 - name: Positive
735 condition: "{{inputs.value}} > 0"
736 target: positive_task
737 - name: Negative
738 condition: "{{inputs.value}} < 0"
739 target: negative_task
740 - name: Zero
741 condition: "{{inputs.value}} == 0"
742 target: zero_task
743
744 - id: positive_task
745 type: task
746 name: Handle Positive
747 task: bash
748 params:
749 command: "echo 'Value is positive'"
750 timeout_ms: 5000
751
752 - id: negative_task
753 type: task
754 name: Handle Negative
755 task: bash
756 params:
757 command: "echo 'Value is negative'"
758 timeout_ms: 5000
759
760 - id: zero_task
761 type: task
762 name: Handle Zero
763 task: bash
764 params:
765 command: "echo 'Value is zero'"
766 timeout_ms: 5000
767
768 - id: end
769 type: end
770 name: End
771
772edges:
773 - from: start
774 to: check_value
775 - from: positive_task
776 to: end
777 - from: negative_task
778 to: end
779 - from: zero_task
780 to: end
781"#
782 .to_string()
783 }
784
785 fn get_research_template(&self) -> String {
787 r#"id: research-workflow
788name: Research Workflow
789version: "1.0.0"
790description: Automated research and report generation
791
792inputs:
793 - name: topic
794 type: string
795 required: true
796 description: Research topic
797 - name: depth
798 type: string
799 required: false
800 default: "medium"
801 description: Research depth (shallow/medium/deep)
802
803outputs:
804 - name: report
805 value: "{{nodes.generate_report.output}}"
806
807nodes:
808 - id: start
809 type: start
810 name: Start
811
812 - id: search_web
813 type: task
814 name: Search Web
815 task: websearch
816 params:
817 query: "{{inputs.topic}}"
818 max_results: 10
819 on_failure:
820 type: retry
821 max_attempts: 3
822
823 - id: analyze_results
824 type: task
825 name: Analyze Results
826 task: analyze
827 params:
828 data: "{{nodes.search_web.output}}"
829 depth: "{{inputs.depth}}"
830
831 - id: generate_report
832 type: task
833 name: Generate Report
834 task: content_generation
835 params:
836 topic: "{{inputs.topic}}"
837 research_data: "{{nodes.analyze_results.output}}"
838 style: "professional"
839
840 - id: end
841 type: end
842 name: End
843
844edges:
845 - from: start
846 to: search_web
847 - from: search_web
848 to: analyze_results
849 - from: analyze_results
850 to: generate_report
851 - from: generate_report
852 to: end
853"#
854 .to_string()
855 }
856
857 fn get_batch_template(&self) -> String {
859 r#"id: batch-workflow
860name: Batch Processing Workflow
861version: "1.0.0"
862description: Process multiple items in batch
863
864inputs:
865 - name: items
866 type: array
867 required: true
868 description: Array of items to process
869 - name: operation
870 type: string
871 required: true
872 description: Operation to perform on each item
873
874nodes:
875 - id: start
876 type: start
877 name: Start
878
879 - id: prepare_batch
880 type: task
881 name: Prepare Batch
882 task: prepare_batch
883 params:
884 items: "{{inputs.items}}"
885
886 - id: process_items
887 type: parallel
888 name: Process Items
889 parallel_branches:
890 - name: Process Chunk 1
891 nodes:
892 - id: chunk1
893 type: task
894 name: Process Chunk 1
895 task: "{{inputs.operation}}"
896 params:
897 items: "{{nodes.prepare_batch.chunk1}}"
898 - name: Process Chunk 2
899 nodes:
900 - id: chunk2
901 type: task
902 name: Process Chunk 2
903 task: "{{inputs.operation}}"
904 params:
905 items: "{{nodes.prepare_batch.chunk2}}"
906
907 - id: aggregate_results
908 type: task
909 name: Aggregate Results
910 task: aggregate
911 params:
912 results:
913 - "{{nodes.chunk1.output}}"
914 - "{{nodes.chunk2.output}}"
915
916 - id: end
917 type: end
918 name: End
919
920edges:
921 - from: start
922 to: prepare_batch
923 - from: prepare_batch
924 to: process_items
925 - from: process_items
926 to: aggregate_results
927 - from: aggregate_results
928 to: end
929"#
930 .to_string()
931 }
932
933 fn get_approval_template(&self) -> String {
935 r#"id: approval-workflow
936name: Approval Workflow
937version: "1.0.0"
938description: Workflow with user approval checkpoint
939
940inputs:
941 - name: task_data
942 type: string
943 required: true
944 description: Data to process before approval
945
946outputs:
947 - name: final_result
948 value: "{{nodes.final_task.output}}"
949
950nodes:
951 - id: start
952 type: start
953 name: Start
954
955 - id: prepare_task
956 type: task
957 name: Prepare Content
958 task: bash
959 params:
960 command: "echo 'Processing: {{inputs.task_data}}'"
961 timeout_ms: 10000
962
963 - id: user_approval
964 type: approval
965 name: User Approval
966 description: Please review and approve the result
967 approvers:
968 - "user"
969 timeout_ms: 300000
970
971 - id: check_approval
972 type: condition
973 name: Check Approval Result
974 branches:
975 - name: Approved
976 condition: "{{approval_result}} == 'approved'"
977 target: final_task
978 - name: Rejected
979 condition: "{{approval_result}} == 'rejected'"
980 target: revise_task
981
982 - id: revise_task
983 type: task
984 name: Revise Content
985 task: bash
986 params:
987 command: "echo 'Revising content based on feedback'"
988 timeout_ms: 10000
989
990 - id: final_task
991 type: task
992 name: Final Output
993 task: bash
994 params:
995 command: "echo 'Final approved output'"
996 timeout_ms: 10000
997
998 - id: end
999 type: end
1000 name: End
1001
1002edges:
1003 - from: start
1004 to: prepare_task
1005 - from: prepare_task
1006 to: user_approval
1007 - from: user_approval
1008 to: check_approval
1009 - from: check_approval
1010 to: final_task
1011 label: Approved
1012 - from: check_approval
1013 to: revise_task
1014 label: Rejected
1015 - from: revise_task
1016 to: user_approval
1017 - from: final_task
1018 to: end
1019"#
1020 .to_string()
1021 }
1022}
1023
1024#[cfg(test)]
1025mod tests {
1026 use super::*;
1027 use serde_json::json;
1028
1029 #[tokio::test]
1030 async fn test_template_simple() {
1031 let tool = WorkflowCreateTool;
1032 let result = tool
1033 .execute(json!({
1034 "mode": "template",
1035 "template_type": "simple"
1036 }))
1037 .await
1038 .unwrap();
1039
1040 assert!(result.contains("simple"));
1041 assert!(result.contains("id:"));
1042 assert!(result.contains("nodes:"));
1043 assert!(result.contains("edges:"));
1044 }
1045
1046 #[tokio::test]
1047 async fn test_template_research() {
1048 let tool = WorkflowCreateTool;
1049 let result = tool
1050 .execute(json!({
1051 "mode": "template",
1052 "template_type": "research"
1053 }))
1054 .await
1055 .unwrap();
1056
1057 assert!(result.contains("research"));
1058 assert!(result.contains("websearch"));
1059 assert!(result.contains("generate_report"));
1060 }
1061
1062 #[tokio::test]
1063 async fn test_validate_valid_workflow() {
1064 let tool = WorkflowCreateTool;
1065 let result = tool.execute(json!({
1066 "mode": "validate",
1067 "yaml_content": "id: test\nname: Test\nnodes:\n - id: start\n type: start\n name: Start\n - id: end\n type: end\n name: End\nedges:\n - from: start\n to: end"
1068 })).await.unwrap();
1069
1070 assert!(result.contains("validation passed"));
1071 assert!(result.contains("Nodes: 2"));
1072 assert!(result.contains("Edges: 1"));
1073 }
1074
1075 #[tokio::test]
1076 async fn test_validate_invalid_workflow() {
1077 let tool = WorkflowCreateTool;
1078 let result = tool.execute(json!({
1079 "mode": "validate",
1080 "yaml_content": "id: test\nname: Test\nnodes:\n - id: task1\n type: task\n name: Task"
1081 })).await;
1082
1083 assert!(result.is_err());
1085 assert!(
1086 result
1087 .unwrap_err()
1088 .to_string()
1089 .contains("validation failed")
1090 );
1091 }
1092
1093 #[tokio::test]
1094 async fn test_info_mode() {
1095 let tool = WorkflowCreateTool;
1096
1097 let create_result = tool.execute(json!({
1099 "mode": "create",
1100 "workflow": {
1101 "id": "test-info-workflow",
1102 "name": "Test Info Workflow",
1103 "version": "1.0.0",
1104 "description": "A test workflow for info mode",
1105 "nodes": [
1106 {"id": "start", "type": "start", "name": "Start"},
1107 {"id": "task1", "type": "task", "name": "Task 1", "task": "process"},
1108 {"id": "end", "type": "end", "name": "End"}
1109 ],
1110 "edges": [
1111 {"from": "start", "to": "task1"},
1112 {"from": "task1", "to": "end"}
1113 ],
1114 "inputs": [
1115 {"name": "data", "type": "string", "required": true, "description": "Input data"}
1116 ],
1117 "outputs": [
1118 {"name": "result", "value": "{{nodes.task1.output}}"}
1119 ]
1120 },
1121 "location": "project",
1122 "overwrite": true
1123 })).await.unwrap();
1124
1125 assert!(create_result.contains("created successfully"));
1126
1127 let info_result = tool
1129 .execute(json!({
1130 "mode": "info",
1131 "workflow_id": "test-info-workflow"
1132 }))
1133 .await
1134 .unwrap();
1135
1136 assert!(info_result.contains("test-info-workflow"));
1137 assert!(info_result.contains("Test Info Workflow"));
1138 assert!(info_result.contains("Task 1"));
1139 assert!(info_result.contains("start → task1"));
1140 assert!(info_result.contains("data [string] (required)"));
1141 }
1142
1143 #[tokio::test]
1144 async fn test_edit_update_node() {
1145 let tool = WorkflowCreateTool;
1146
1147 tool.execute(json!({
1149 "mode": "create",
1150 "workflow": {
1151 "id": "test-edit-workflow",
1152 "name": "Test Edit Workflow",
1153 "version": "1.0.0",
1154 "nodes": [
1155 {"id": "start", "type": "start", "name": "Start"},
1156 {"id": "task1", "type": "task", "name": "Old Name", "task": "old_task"},
1157 {"id": "end", "type": "end", "name": "End"}
1158 ],
1159 "edges": [
1160 {"from": "start", "to": "task1"},
1161 {"from": "task1", "to": "end"}
1162 ]
1163 },
1164 "location": "project",
1165 "overwrite": true
1166 }))
1167 .await
1168 .unwrap();
1169
1170 let edit_result = tool
1172 .execute(json!({
1173 "mode": "edit",
1174 "workflow_id": "test-edit-workflow",
1175 "edit_operation": "update_node",
1176 "edit_target": "task1",
1177 "edit_value": {
1178 "name": "New Name",
1179 "task": "new_task"
1180 }
1181 }))
1182 .await
1183 .unwrap();
1184
1185 assert!(edit_result.contains("updated successfully"));
1186 assert!(edit_result.contains("update_node"));
1187
1188 let info_result = tool
1190 .execute(json!({
1191 "mode": "info",
1192 "workflow_id": "test-edit-workflow"
1193 }))
1194 .await
1195 .unwrap();
1196
1197 assert!(info_result.contains("New Name"));
1198 assert!(info_result.contains("new_task"));
1199 assert!(!info_result.contains("Old Name"));
1200 }
1201
1202 #[tokio::test]
1203 async fn test_edit_add_node() {
1204 let tool = WorkflowCreateTool;
1205
1206 tool.execute(json!({
1208 "mode": "create",
1209 "workflow": {
1210 "id": "test-add-node-workflow",
1211 "name": "Test Add Node",
1212 "version": "1.0.0",
1213 "nodes": [
1214 {"id": "start", "type": "start", "name": "Start"},
1215 {"id": "end", "type": "end", "name": "End"}
1216 ],
1217 "edges": [
1218 {"from": "start", "to": "end"}
1219 ]
1220 },
1221 "location": "project",
1222 "overwrite": true
1223 }))
1224 .await
1225 .unwrap();
1226
1227 let edit_result = tool
1229 .execute(json!({
1230 "mode": "edit",
1231 "workflow_id": "test-add-node-workflow",
1232 "edit_operation": "add_node",
1233 "edit_value": {
1234 "id": "new_task",
1235 "type": "task",
1236 "name": "New Task",
1237 "task": "process"
1238 }
1239 }))
1240 .await
1241 .unwrap();
1242
1243 assert!(edit_result.contains("updated successfully"));
1244
1245 let edge_result = tool
1247 .execute(json!({
1248 "mode": "edit",
1249 "workflow_id": "test-add-node-workflow",
1250 "edit_operation": "add_edge",
1251 "edit_value": {
1252 "from": "start",
1253 "to": "new_task"
1254 }
1255 }))
1256 .await
1257 .unwrap();
1258
1259 assert!(edge_result.contains("updated successfully"));
1260
1261 tool.execute(json!({
1263 "mode": "edit",
1264 "workflow_id": "test-add-node-workflow",
1265 "edit_operation": "add_edge",
1266 "edit_value": {
1267 "from": "new_task",
1268 "to": "end"
1269 }
1270 }))
1271 .await
1272 .unwrap();
1273
1274 let info_result = tool
1276 .execute(json!({
1277 "mode": "info",
1278 "workflow_id": "test-add-node-workflow"
1279 }))
1280 .await
1281 .unwrap();
1282
1283 assert!(info_result.contains("new_task"));
1284 assert!(info_result.contains("New Task"));
1285 assert!(info_result.contains("start → new_task"));
1286 assert!(info_result.contains("new_task → end"));
1287 }
1288
1289 #[tokio::test]
1290 async fn test_invalid_mode() {
1291 let tool = WorkflowCreateTool;
1292 let result = tool
1293 .execute(json!({
1294 "mode": "invalid"
1295 }))
1296 .await;
1297
1298 assert!(result.is_err());
1299 }
1300
1301 #[test]
1302 fn test_definition() {
1303 let tool = WorkflowCreateTool;
1304 let def = tool.definition();
1305
1306 assert_eq!(def.name, "workflow_create");
1307 assert!(def.description.contains("创建")); assert!(def.parameters["properties"]["mode"].is_object());
1309 }
1310}