matrixcode_core/workflow/executors/
validate.rs1use anyhow::{Context, Result};
6use async_trait::async_trait;
7use std::sync::Arc;
8
9use crate::providers::{ChatRequest, ContentBlock, Message, MessageContent, Provider};
10use crate::workflow::context::WorkflowContext;
11use crate::workflow::def::NodeDef;
12use crate::workflow::rule_engine::{Rule, RuleEngine, ValidationResult};
13use crate::workflow::template::TemplateRenderer;
14use super::node_executor::NodeExecutor;
15
16#[derive(Debug, Clone)]
18pub struct ValidateExecutorConfig {
19 pub enable_ai_validation: bool,
21 pub ai_validation_prompt: String,
23 pub abort_on_ai_failure: bool,
25}
26
27impl Default for ValidateExecutorConfig {
28 fn default() -> Self {
29 Self {
30 enable_ai_validation: false,
31 ai_validation_prompt: String::new(),
32 abort_on_ai_failure: true,
33 }
34 }
35}
36
37pub struct ValidateExecutor {
41 provider: Option<Arc<dyn Provider>>,
43 config: ValidateExecutorConfig,
45 template_renderer: TemplateRenderer,
47}
48
49impl ValidateExecutor {
50 pub fn new() -> Self {
52 Self {
53 provider: None,
54 config: ValidateExecutorConfig::default(),
55 template_renderer: TemplateRenderer::new(),
56 }
57 }
58
59 pub fn with_ai(provider: Arc<dyn Provider>, config: ValidateExecutorConfig) -> Self {
61 Self {
62 provider: Some(provider),
63 config,
64 template_renderer: TemplateRenderer::new(),
65 }
66 }
67
68 async fn validate_with_ai(
70 &self,
71 data: &serde_json::Value,
72 context: &WorkflowContext,
73 ) -> Result<ValidationResult> {
74 if let Some(provider) = &self.provider {
75 let prompt = if self.config.ai_validation_prompt.is_empty() {
77 format!(
78 "Please validate the following data and return a JSON object with 'passed' (boolean) and 'errors' (array of strings):\n{}",
79 serde_json::to_string_pretty(data)?
80 )
81 } else {
82 self.template_renderer.render(&self.config.ai_validation_prompt, &context.variables)?
83 };
84
85 let messages = vec![Message {
87 role: crate::providers::Role::User,
88 content: MessageContent::Text(prompt),
89 }];
90
91 let request = ChatRequest {
92 messages,
93 tools: Vec::new(),
94 system: Some("You are a data validator. Return JSON with 'passed' and 'errors' fields.".to_string()),
95 think: false,
96 max_tokens: 1024,
97 server_tools: Vec::new(),
98 enable_caching: false,
99 };
100
101 let response = provider.chat(request).await?;
103
104 for block in &response.content {
106 if let ContentBlock::Text { text } = block
107 && let Ok(json) = serde_json::from_str::<serde_json::Value>(text) {
108 let passed = json.get("passed")
109 .and_then(|v| v.as_bool())
110 .unwrap_or(false);
111 let errors = json.get("errors")
112 .and_then(|v| v.as_array())
113 .map(|arr| arr.iter()
114 .filter_map(|v| v.as_str().map(|s| s.to_string()))
115 .collect())
116 .unwrap_or_default();
117
118 return Ok(ValidationResult {
119 passed,
120 errors,
121 });
122 }
123 }
124
125 Ok(ValidationResult::failure("Failed to parse AI validation response".to_string()))
127 } else {
128 Ok(ValidationResult::success())
130 }
131 }
132}
133
134impl Default for ValidateExecutor {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140#[async_trait]
141impl NodeExecutor for ValidateExecutor {
142 async fn execute(
143 &self,
144 node: &NodeDef,
145 context: &mut WorkflowContext,
146 ) -> Result<serde_json::Value> {
147 let rules_json = node.params.get("rules")
149 .ok_or_else(|| anyhow::anyhow!("Validate executor requires 'rules' parameter"))?;
150
151 let rules: Vec<Rule> = serde_json::from_value(rules_json.clone())
153 .with_context(|| "Failed to parse validation rules")?;
154
155 let mut rule_engine = RuleEngine::new();
157
158 let mut result = ValidationResult::success();
160 for rule in &rules {
161 result = result.merge(rule_engine.validate(rule, &context.variables)?);
162 }
163
164 if result.passed && self.config.enable_ai_validation && self.provider.is_some() {
166 let context_vars: serde_json::Map<String, serde_json::Value> = context
168 .variables
169 .iter()
170 .map(|(k, v)| (k.clone(), v.clone()))
171 .collect();
172
173 let data_to_validate = node.params.get("data")
174 .cloned()
175 .unwrap_or(serde_json::Value::Object(context_vars));
176
177 let ai_result = self.validate_with_ai(&data_to_validate, context).await?;
178 result = result.merge(ai_result);
179 }
180
181 let output = serde_json::json!({
183 "passed": result.passed,
184 "errors": result.errors,
185 "node_id": node.id,
186 });
187
188 if !result.passed && self.config.abort_on_ai_failure {
190 return Err(anyhow::anyhow!("Validation failed: {}", result.errors.join("; ")));
191 }
192
193 Ok(output)
194 }
195
196 fn name(&self) -> &str {
197 "validate_executor"
198 }
199}