1pub use super::action_node_types::*;
18
19use std::collections::HashMap;
20
21pub trait ActionNodeCodeGen {
27 fn generate_code(&self, node_id: &str) -> String;
29
30 fn required_imports(&self) -> Vec<&'static str>;
32
33 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)>;
35}
36
37pub fn generate_error_handling_wrapper(node_id: &str, props: &StandardProperties) -> String {
43 let mut code = String::new();
44
45 match props.error_handling.mode {
46 ErrorMode::Stop => {
47 code.push_str(" // Error handling: stop on error\n");
49 }
50 ErrorMode::Continue => {
51 code.push_str(&format!(
52 " // Error handling: continue on error\n\
53 let {}_result = match {}_execute(state).await {{\n\
54 Ok(v) => v,\n\
55 Err(e) => {{\n\
56 tracing::warn!(node = \"{}\", error = %e, \"Node failed, continuing\");\n\
57 serde_json::Value::Null\n\
58 }}\n\
59 }};\n",
60 node_id, node_id, node_id
61 ));
62 }
63 ErrorMode::Retry => {
64 let retry_count = props.error_handling.retry_count.unwrap_or(3);
65 let retry_delay = props.error_handling.retry_delay.unwrap_or(1000);
66 code.push_str(&format!(
67 " // Error handling: retry up to {} times with {}ms delay\n\
68 let mut {}_attempts = 0u32;\n\
69 let {}_result = loop {{\n\
70 match {}_execute(state).await {{\n\
71 Ok(v) => break v,\n\
72 Err(e) => {{\n\
73 {}_attempts += 1;\n\
74 if {}_attempts >= {} {{\n\
75 return Err(e.into());\n\
76 }}\n\
77 tracing::warn!(node = \"{}\", attempt = {}_attempts, error = %e, \"Retrying\");\n\
78 tokio::time::sleep(std::time::Duration::from_millis({})).await;\n\
79 }}\n\
80 }}\n\
81 }};\n",
82 retry_count, retry_delay,
83 node_id, node_id, node_id, node_id, node_id, retry_count, node_id, node_id, retry_delay
84 ));
85 }
86 ErrorMode::Fallback => {
87 let fallback = props
88 .error_handling
89 .fallback_value
90 .as_ref()
91 .map(|v| v.to_string())
92 .unwrap_or_else(|| "serde_json::Value::Null".to_string());
93 code.push_str(&format!(
94 " // Error handling: fallback on error\n\
95 let {}_result = match {}_execute(state).await {{\n\
96 Ok(v) => v,\n\
97 Err(e) => {{\n\
98 tracing::warn!(node = \"{}\", error = %e, \"Using fallback value\");\n\
99 serde_json::json!({})\n\
100 }}\n\
101 }};\n",
102 node_id, node_id, node_id, fallback
103 ));
104 }
105 }
106
107 code
108}
109
110pub fn generate_skip_condition(node_id: &str, condition: &Option<String>) -> String {
112 match condition {
113 Some(cond) if !cond.is_empty() => {
114 format!(
115 " // Skip condition check\n\
116 if !evaluate_condition(\"{}\", state)? {{\n\
117 tracing::info!(node = \"{}\", \"Skipping node due to condition\");\n\
118 return Ok(serde_json::Value::Null);\n\
119 }}\n\n",
120 cond.replace('"', "\\\""),
121 node_id
122 )
123 }
124 _ => String::new(),
125 }
126}
127
128pub fn generate_callbacks(node_id: &str, callbacks: &Callbacks, phase: &str) -> String {
130 let callback = match phase {
131 "start" => &callbacks.on_start,
132 "complete" => &callbacks.on_complete,
133 "error" => &callbacks.on_error,
134 _ => return String::new(),
135 };
136
137 match callback {
138 Some(cb) if !cb.is_empty() => {
139 format!(
140 " // {} callback\n\
141 if let Err(e) = execute_callback(\"{}\", state).await {{\n\
142 tracing::warn!(node = \"{}\", callback = \"{}\", error = %e, \"Callback failed\");\n\
143 }}\n",
144 phase,
145 cb.replace('"', "\\\""),
146 node_id,
147 phase
148 )
149 }
150 _ => String::new(),
151 }
152}
153
154pub fn generate_timeout_wrapper(node_id: &str, timeout_ms: u64) -> String {
156 format!(
157 " // Timeout: {}ms\n\
158 let {}_future = {}_execute(state);\n\
159 let {}_result = tokio::time::timeout(\n\
160 std::time::Duration::from_millis({}),\n\
161 {}_future\n\
162 ).await.map_err(|_| ActionError::Timeout {{ node: \"{}\".to_string(), timeout_ms: {} }})??;\n",
163 timeout_ms, node_id, node_id, node_id, timeout_ms, node_id, node_id, timeout_ms
164 )
165}
166
167pub fn generate_interpolation_helper() -> &'static str {
169 r#"
170/// Interpolate {{variable}} patterns in a string with state values
171fn interpolate_variables(template: &str, state: &State) -> String {
172 let re = regex::Regex::new(r"\{\{(\w+(?:\.\w+)*)\}\}").unwrap();
173 re.replace_all(template, |caps: ®ex::Captures| {
174 let path = &caps[1];
175 get_nested_value(state, path)
176 .map(|v| match v {
177 serde_json::Value::String(s) => s.clone(),
178 other => other.to_string(),
179 })
180 .unwrap_or_default()
181 }).to_string()
182}
183
184/// Get a nested value from state using dot notation
185fn get_nested_value(state: &State, path: &str) -> Option<&serde_json::Value> {
186 let parts: Vec<&str> = path.split('.').collect();
187 let mut current = state.get(parts[0])?;
188 for part in &parts[1..] {
189 current = current.get(part)?;
190 }
191 Some(current)
192}
193"#
194}
195
196impl ActionNodeCodeGen for TriggerNodeConfig {
201 fn generate_code(&self, node_id: &str) -> String {
202 let mut code = String::new();
203
204 code.push_str(&format!("// Trigger Node: {}\n", self.standard.name));
205 code.push_str(&format!(
206 "async fn {}_trigger(state: &mut State) -> Result<serde_json::Value, ActionError> {{\n",
207 node_id
208 ));
209
210 match self.trigger_type {
211 TriggerType::Manual => {
212 code.push_str(" // Manual trigger - workflow started by user\n");
213 code.push_str(" tracing::info!(\"Manual trigger activated\");\n");
214 code.push_str(" Ok(serde_json::json!({ \"trigger\": \"manual\", \"timestamp\": chrono::Utc::now().to_rfc3339() }))\n");
215 }
216 TriggerType::Webhook => {
217 if let Some(webhook) = &self.webhook {
218 code.push_str(&format!(
219 " // Webhook trigger: {} {}\n",
220 webhook.method, webhook.path
221 ));
222 code.push_str(&format!(" // Auth: {}\n", webhook.auth));
223 code.push_str(" // Note: Webhook handler is set up in the server routes\n");
224 code.push_str(" // This function processes the incoming webhook payload\n");
225 code.push_str(" let payload = state.get(\"webhook_payload\").cloned().unwrap_or(serde_json::Value::Null);\n");
226 code.push_str(" Ok(payload)\n");
227 } else {
228 code.push_str(" Ok(serde_json::Value::Null)\n");
229 }
230 }
231 TriggerType::Schedule => {
232 if let Some(schedule) = &self.schedule {
233 code.push_str(&format!(
234 " // Schedule trigger: {} ({})\n",
235 schedule.cron, schedule.timezone
236 ));
237 code.push_str(" // Note: Cron job is set up externally\n");
238 code.push_str(" Ok(serde_json::json!({\n");
239 code.push_str(" \"trigger\": \"schedule\",\n");
240 code.push_str(&format!(" \"cron\": \"{}\",\n", schedule.cron));
241 code.push_str(&format!(
242 " \"timezone\": \"{}\",\n",
243 schedule.timezone
244 ));
245 code.push_str(" \"timestamp\": chrono::Utc::now().to_rfc3339()\n");
246 code.push_str(" }))\n");
247 } else {
248 code.push_str(" Ok(serde_json::Value::Null)\n");
249 }
250 }
251 TriggerType::Event => {
252 if let Some(event) = &self.event {
253 code.push_str(&format!(
254 " // Event trigger: {} from {}\n",
255 event.event_type, event.source
256 ));
257 code.push_str(" let event_data = state.get(\"event_data\").cloned().unwrap_or(serde_json::Value::Null);\n");
258 code.push_str(" Ok(event_data)\n");
259 } else {
260 code.push_str(" Ok(serde_json::Value::Null)\n");
261 }
262 }
263 }
264
265 code.push_str("}\n\n");
266
267 if self.trigger_type == TriggerType::Webhook {
269 if let Some(webhook) = &self.webhook {
270 code.push_str(&generate_webhook_handler(node_id, webhook));
271 }
272 }
273
274 code
275 }
276
277 fn required_imports(&self) -> Vec<&'static str> {
278 let mut imports = vec!["chrono"];
279 if self.trigger_type == TriggerType::Webhook {
280 imports.push("axum");
281 }
282 if self.trigger_type == TriggerType::Schedule {
283 imports.push("tokio_cron_scheduler");
284 }
285 imports
286 }
287
288 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
289 let mut deps = vec![("chrono", "0.4")];
290 if self.trigger_type == TriggerType::Webhook {
291 deps.push(("axum", "0.7"));
292 }
293 if self.trigger_type == TriggerType::Schedule {
294 deps.push(("tokio-cron-scheduler", "0.10"));
295 }
296 deps
297 }
298}
299
300fn generate_webhook_handler(node_id: &str, webhook: &WebhookConfig) -> String {
301 let mut code = String::new();
302
303 code.push_str(&format!("// Webhook handler for {}\n", node_id));
304 code.push_str(&format!("async fn {}_webhook_handler(\n", node_id));
305
306 match webhook.auth.as_str() {
307 "bearer" => {
308 code.push_str(" headers: axum::http::HeaderMap,\n");
309 }
310 "api_key" => {
311 code.push_str(" headers: axum::http::HeaderMap,\n");
312 }
313 _ => {}
314 }
315
316 if webhook.method == "POST" {
317 code.push_str(" axum::Json(payload): axum::Json<serde_json::Value>,\n");
318 } else {
319 code.push_str(" axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,\n");
320 }
321
322 code.push_str(") -> impl axum::response::IntoResponse {\n");
323
324 match webhook.auth.as_str() {
326 "bearer" => {
327 code.push_str(" // Validate bearer token\n");
328 code.push_str(" let auth_header = headers.get(\"Authorization\").and_then(|v| v.to_str().ok());\n");
329 code.push_str(
330 " if !auth_header.map(|h| h.starts_with(\"Bearer \")).unwrap_or(false) {\n",
331 );
332 code.push_str(" return (axum::http::StatusCode::UNAUTHORIZED, \"Invalid authorization\").into_response();\n");
333 code.push_str(" }\n");
334 }
335 "api_key" => {
336 let header_name = webhook
337 .auth_config
338 .as_ref()
339 .and_then(|c| c.header_name.as_ref())
340 .map(|s| s.as_str())
341 .unwrap_or("X-API-Key");
342 code.push_str(" // Validate API key\n");
343 code.push_str(&format!(
344 " let api_key = headers.get(\"{}\").and_then(|v| v.to_str().ok());\n",
345 header_name
346 ));
347 code.push_str(" if api_key.is_none() {\n");
348 code.push_str(" return (axum::http::StatusCode::UNAUTHORIZED, \"Missing API key\").into_response();\n");
349 code.push_str(" }\n");
350 }
351 _ => {}
352 }
353
354 if webhook.method == "POST" {
355 code.push_str(" axum::Json(payload).into_response()\n");
356 } else {
357 code.push_str(" axum::Json(serde_json::json!(params)).into_response()\n");
358 }
359
360 code.push_str("}\n\n");
361
362 code
363}
364
365impl ActionNodeCodeGen for HttpNodeConfig {
370 fn generate_code(&self, node_id: &str) -> String {
371 let mut code = String::new();
372
373 code.push_str(&format!("// HTTP Node: {}\n", self.standard.name));
374 code.push_str(&format!("async fn {}_http(\n", node_id));
375 code.push_str(" state: &mut State,\n");
376 code.push_str(" client: &reqwest::Client,\n");
377 code.push_str(") -> Result<serde_json::Value, ActionError> {\n");
378
379 code.push_str(&format!(
381 " let url = interpolate_variables(\"{}\", state);\n",
382 self.url.replace('"', "\\\"")
383 ));
384 code.push_str(" tracing::debug!(url = %url, \"Making HTTP request\");\n\n");
385
386 let method = match self.method {
388 HttpMethod::Get => "get",
389 HttpMethod::Post => "post",
390 HttpMethod::Put => "put",
391 HttpMethod::Patch => "patch",
392 HttpMethod::Delete => "delete",
393 };
394 code.push_str(&format!(
395 " let mut request = client.{}(&url);\n\n",
396 method
397 ));
398
399 if !self.headers.is_empty() {
401 code.push_str(" // Headers\n");
402 for (key, value) in &self.headers {
403 code.push_str(&format!(
404 " request = request.header(\"{}\", interpolate_variables(\"{}\", state));\n",
405 key,
406 value.replace('"', "\\\"")
407 ));
408 }
409 code.push('\n');
410 }
411
412 match self.auth.auth_type.as_str() {
414 "bearer" => {
415 if let Some(bearer) = &self.auth.bearer {
416 code.push_str(" // Bearer authentication\n");
417 code.push_str(&format!(
418 " let token = interpolate_variables(\"{}\", state);\n",
419 bearer.token.replace('"', "\\\"")
420 ));
421 code.push_str(" request = request.bearer_auth(&token);\n\n");
422 }
423 }
424 "basic" => {
425 if let Some(basic) = &self.auth.basic {
426 code.push_str(" // Basic authentication\n");
427 code.push_str(&format!(
428 " let username = interpolate_variables(\"{}\", state);\n",
429 basic.username.replace('"', "\\\"")
430 ));
431 code.push_str(&format!(
432 " let password = interpolate_variables(\"{}\", state);\n",
433 basic.password.replace('"', "\\\"")
434 ));
435 code.push_str(
436 " request = request.basic_auth(&username, Some(&password));\n\n",
437 );
438 }
439 }
440 "api_key" => {
441 if let Some(api_key) = &self.auth.api_key {
442 code.push_str(" // API key authentication\n");
443 code.push_str(&format!(
444 " let api_key_value = interpolate_variables(\"{}\", state);\n",
445 api_key.value.replace('"', "\\\"")
446 ));
447 code.push_str(&format!(
448 " request = request.header(\"{}\", &api_key_value);\n\n",
449 api_key.header_name
450 ));
451 }
452 }
453 _ => {}
454 }
455
456 match self.body.body_type.as_str() {
458 "json" => {
459 if let Some(content) = &self.body.content {
460 code.push_str(" // JSON body\n");
461 code.push_str(&format!(
462 " let body_template = r#\"{}\"#;\n",
463 content.to_string().replace("\\", "\\\\")
464 ));
465 code.push_str(
466 " let body_str = interpolate_variables(body_template, state);\n",
467 );
468 code.push_str(
469 " let body: serde_json::Value = serde_json::from_str(&body_str)?;\n",
470 );
471 code.push_str(" request = request.json(&body);\n\n");
472 }
473 }
474 "form" => {
475 if let Some(content) = &self.body.content {
476 code.push_str(" // Form body\n");
477 code.push_str(&format!(
478 " let form_data: std::collections::HashMap<String, String> = serde_json::from_value(serde_json::json!({}))?\n",
479 content
480 ));
481 code.push_str(" .into_iter()\n");
482 code.push_str(" .map(|(k, v)| (k, interpolate_variables(&v, state)))\n");
483 code.push_str(" .collect();\n");
484 code.push_str(" request = request.form(&form_data);\n\n");
485 }
486 }
487 "raw" => {
488 if let Some(content) = &self.body.content {
489 code.push_str(" // Raw body\n");
490 code.push_str(&format!(
491 " let raw_body = interpolate_variables(\"{}\", state);\n",
492 content.to_string().replace('"', "\\\"")
493 ));
494 code.push_str(" request = request.body(raw_body);\n\n");
495 }
496 }
497 _ => {}
498 }
499
500 code.push_str(" // Send request\n");
502 code.push_str(" let response = request.send().await?;\n");
503 code.push_str(" let status = response.status();\n\n");
504
505 if let Some(validation) = &self.response.status_validation {
507 code.push_str(" // Validate status code\n");
508 code.push_str(&format!(
509 " if !validate_status_code(status.as_u16(), \"{}\") {{\n",
510 validation
511 ));
512 code.push_str(" return Err(ActionError::HttpStatus {\n");
513 code.push_str(" status: status.as_u16(),\n");
514 code.push_str(&format!(
515 " expected: \"{}\".to_string(),\n",
516 validation
517 ));
518 code.push_str(" });\n");
519 code.push_str(" }\n\n");
520 }
521
522 match self.response.response_type.as_str() {
524 "json" => {
525 code.push_str(" // Parse JSON response\n");
526 code.push_str(" let result: serde_json::Value = response.json().await?;\n");
527
528 if let Some(json_path) = &self.response.json_path {
530 code.push_str(&format!(
531 " let extracted = jsonpath_lib::select(&result, \"{}\")?\n",
532 json_path
533 ));
534 code.push_str(" .into_iter().next().cloned().unwrap_or(serde_json::Value::Null);\n");
535 code.push_str(&format!(
536 " state.insert(\"{}\".to_string(), extracted.clone());\n",
537 self.standard.mapping.output_key
538 ));
539 code.push_str(" Ok(extracted)\n");
540 } else {
541 code.push_str(&format!(
542 " state.insert(\"{}\".to_string(), result.clone());\n",
543 self.standard.mapping.output_key
544 ));
545 code.push_str(" Ok(result)\n");
546 }
547 }
548 "text" => {
549 code.push_str(" // Parse text response\n");
550 code.push_str(" let text = response.text().await?;\n");
551 code.push_str(" let result = serde_json::json!(text);\n");
552 code.push_str(&format!(
553 " state.insert(\"{}\".to_string(), result.clone());\n",
554 self.standard.mapping.output_key
555 ));
556 code.push_str(" Ok(result)\n");
557 }
558 "binary" => {
559 code.push_str(" // Get binary response\n");
560 code.push_str(" let bytes = response.bytes().await?;\n");
561 code.push_str(" let result = serde_json::json!({\n");
562 code.push_str(" \"size\": bytes.len(),\n");
563 code.push_str(" \"data\": base64::Engine::encode(&base64::engine::general_purpose::STANDARD, &bytes)\n");
564 code.push_str(" });\n");
565 code.push_str(&format!(
566 " state.insert(\"{}\".to_string(), result.clone());\n",
567 self.standard.mapping.output_key
568 ));
569 code.push_str(" Ok(result)\n");
570 }
571 _ => {
572 code.push_str(" let result: serde_json::Value = response.json().await?;\n");
573 code.push_str(" Ok(result)\n");
574 }
575 }
576
577 code.push_str("}\n\n");
578
579 code.push_str(generate_status_validation_helper());
581
582 code
583 }
584
585 fn required_imports(&self) -> Vec<&'static str> {
586 let mut imports = vec!["reqwest", "serde_json"];
587 if self.response.json_path.is_some() {
588 imports.push("jsonpath_lib");
589 }
590 if self.response.response_type == "binary" {
591 imports.push("base64");
592 }
593 imports
594 }
595
596 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
597 let mut deps = vec![
598 ("reqwest", "{ version = \"0.12\", features = [\"json\"] }"),
599 ("serde_json", "1"),
600 ];
601 if self.response.json_path.is_some() {
602 deps.push(("jsonpath-lib", "0.3"));
603 }
604 if self.response.response_type == "binary" {
605 deps.push(("base64", "0.21"));
606 }
607 deps
608 }
609}
610
611fn generate_status_validation_helper() -> &'static str {
612 r#"
613/// Validate HTTP status code against a pattern (e.g., "200-299", "200,201,204")
614fn validate_status_code(status: u16, pattern: &str) -> bool {
615 for part in pattern.split(',') {
616 let part = part.trim();
617 if part.contains('-') {
618 let range: Vec<&str> = part.split('-').collect();
619 if range.len() == 2 {
620 if let (Ok(start), Ok(end)) = (range[0].parse::<u16>(), range[1].parse::<u16>()) {
621 if status >= start && status <= end {
622 return true;
623 }
624 }
625 }
626 } else if let Ok(expected) = part.parse::<u16>() {
627 if status == expected {
628 return true;
629 }
630 }
631 }
632 false
633}
634"#
635}
636
637impl ActionNodeCodeGen for SetNodeConfig {
642 fn generate_code(&self, node_id: &str) -> String {
643 let mut code = String::new();
644
645 code.push_str(&format!("// Set Node: {}\n", self.standard.name));
646 code.push_str(&format!(
647 "async fn {}_set(state: &mut State) -> Result<serde_json::Value, ActionError> {{\n",
648 node_id
649 ));
650
651 if let Some(env_vars) = &self.env_vars {
653 if env_vars.load_from_env {
654 code.push_str(" // Load environment variables\n");
655 if let Some(prefix) = &env_vars.prefix {
656 code.push_str(&format!(
657 " for (key, value) in std::env::vars().filter(|(k, _)| k.starts_with(\"{}\")) {{\n",
658 prefix
659 ));
660 } else {
661 code.push_str(" for (key, value) in std::env::vars() {\n");
662 }
663 code.push_str(" state.insert(key, serde_json::json!(value));\n");
664 code.push_str(" }\n\n");
665 }
666 }
667
668 match self.mode {
669 SetMode::Set => {
670 code.push_str(" // Set variables\n");
671 for var in &self.variables {
672 let value_code = match var.value_type.as_str() {
673 "expression" => {
674 format!(
675 "interpolate_variables(\"{}\", state)",
676 var.value.to_string().replace('"', "\\\"")
677 )
678 }
679 "json" => {
680 format!("serde_json::json!({})", var.value)
681 }
682 _ => {
683 format!("serde_json::json!({})", var.value)
684 }
685 };
686
687 if var.is_secret {
688 code.push_str(&format!(" // Secret: {}\n", var.key));
689 code.push_str(&format!(
690 " state.insert(\"{}\".to_string(), {});\n",
691 var.key, value_code
692 ));
693 code.push_str(&format!(
694 " tracing::debug!(key = \"{}\", \"Set secret variable (value masked)\");\n",
695 var.key
696 ));
697 } else {
698 code.push_str(&format!(
699 " state.insert(\"{}\".to_string(), {});\n",
700 var.key, value_code
701 ));
702 }
703 }
704 }
705 SetMode::Merge => {
706 code.push_str(" // Merge variables (deep merge with existing)\n");
707 for var in &self.variables {
708 code.push_str(&format!(
709 " if let Some(existing) = state.get(\"{}\").cloned() {{\n",
710 var.key
711 ));
712 code.push_str(&format!(
713 " let new_value = serde_json::json!({});\n",
714 var.value
715 ));
716 code.push_str(" let merged = deep_merge(&existing, &new_value);\n");
717 code.push_str(&format!(
718 " state.insert(\"{}\".to_string(), merged);\n",
719 var.key
720 ));
721 code.push_str(" } else {\n");
722 code.push_str(&format!(
723 " state.insert(\"{}\".to_string(), serde_json::json!({}));\n",
724 var.key, var.value
725 ));
726 code.push_str(" }\n");
727 }
728 }
729 SetMode::Delete => {
730 code.push_str(" // Delete variables\n");
731 for var in &self.variables {
732 code.push_str(&format!(" state.remove(\"{}\");\n", var.key));
733 }
734 }
735 }
736
737 code.push_str("\n // Return set variables\n");
739 code.push_str(" let result = serde_json::json!({\n");
740 for (i, var) in self.variables.iter().enumerate() {
741 let comma = if i < self.variables.len() - 1 {
742 ","
743 } else {
744 ""
745 };
746 if var.is_secret {
747 code.push_str(&format!(" \"{}\": \"***\"{}\n", var.key, comma));
748 } else {
749 code.push_str(&format!(
750 " \"{}\": state.get(\"{}\").cloned().unwrap_or(serde_json::Value::Null){}\n",
751 var.key, var.key, comma
752 ));
753 }
754 }
755 code.push_str(" });\n");
756 code.push_str(&format!(
757 " state.insert(\"{}\".to_string(), result.clone());\n",
758 self.standard.mapping.output_key
759 ));
760 code.push_str(" Ok(result)\n");
761 code.push_str("}\n\n");
762
763 code.push_str(generate_deep_merge_helper());
765
766 code
767 }
768
769 fn required_imports(&self) -> Vec<&'static str> {
770 vec!["serde_json"]
771 }
772
773 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
774 vec![("serde_json", "1")]
775 }
776}
777
778fn generate_deep_merge_helper() -> &'static str {
779 r#"
780/// Deep merge two JSON values
781fn deep_merge(base: &serde_json::Value, overlay: &serde_json::Value) -> serde_json::Value {
782 match (base, overlay) {
783 (serde_json::Value::Object(base_map), serde_json::Value::Object(overlay_map)) => {
784 let mut result = base_map.clone();
785 for (key, value) in overlay_map {
786 if let Some(base_value) = result.get(key) {
787 result.insert(key.clone(), deep_merge(base_value, value));
788 } else {
789 result.insert(key.clone(), value.clone());
790 }
791 }
792 serde_json::Value::Object(result)
793 }
794 _ => overlay.clone(),
795 }
796}
797"#
798}
799
800impl ActionNodeCodeGen for TransformNodeConfig {
805 fn generate_code(&self, node_id: &str) -> String {
806 let mut code = String::new();
807
808 code.push_str(&format!("// Transform Node: {}\n", self.standard.name));
809 code.push_str(&format!("async fn {}_transform(state: &mut State) -> Result<serde_json::Value, ActionError> {{\n", node_id));
810
811 code.push_str(" // Get input data\n");
813 if let Some(input_mapping) = &self.standard.mapping.input_mapping {
814 if let Some(input_key) = input_mapping.get("input") {
815 code.push_str(&format!(
816 " let input = state.get(\"{}\").cloned().unwrap_or(serde_json::Value::Null);\n\n",
817 input_key
818 ));
819 } else {
820 code.push_str(" let input = state.clone();\n\n");
821 }
822 } else {
823 code.push_str(" let input = serde_json::json!(state.clone());\n\n");
824 }
825
826 match self.transform_type {
827 TransformType::Jsonpath => {
828 code.push_str(" // JSONPath transformation\n");
829 code.push_str(&format!(
830 " let result = jsonpath_lib::select(&input, \"{}\")\n",
831 self.expression.replace('"', "\\\"")
832 ));
833 code.push_str(" .map_err(|e| ActionError::Transform(e.to_string()))?\n");
834 code.push_str(" .into_iter()\n");
835 code.push_str(" .cloned()\n");
836 code.push_str(" .collect::<Vec<_>>();\n");
837 code.push_str(" let result = if result.len() == 1 { result.into_iter().next().unwrap() } else { serde_json::json!(result) };\n");
838 }
839 TransformType::Jmespath => {
840 code.push_str(" // JMESPath transformation\n");
841 code.push_str(&format!(
842 " let expr = jmespath::compile(\"{}\").map_err(|e| ActionError::Transform(e.to_string()))?;\n",
843 self.expression.replace('"', "\\\"")
844 ));
845 code.push_str(" let result = expr.search(&input).map_err(|e| ActionError::Transform(e.to_string()))?;\n");
846 code.push_str(" let result = serde_json::to_value(&result)?;\n");
847 }
848 TransformType::Template => {
849 code.push_str(" // Template transformation (handlebars-style)\n");
850 code.push_str(&format!(
851 " let template = \"{}\";\n",
852 self.expression.replace('"', "\\\"").replace('\n', "\\n")
853 ));
854 code.push_str(
855 " let result = serde_json::json!(interpolate_variables(template, state));\n",
856 );
857 }
858 TransformType::Javascript => {
859 code.push_str(" // JavaScript transformation (sandboxed)\n");
860 code.push_str(&format!(
861 " let code = r#\"{}\"#;\n",
862 self.expression.replace("\\", "\\\\")
863 ));
864 code.push_str(" let result = execute_js_transform(code, &input)?;\n");
865 }
866 }
867
868 if let Some(operations) = &self.operations {
870 for op in operations {
871 code.push_str(&format!(" // Built-in operation: {}\n", op.op_type));
872 match op.op_type.as_str() {
873 "pick" => {
874 if let Some(fields) = op.config.get("fields") {
875 code.push_str(&format!(
876 " let result = pick_fields(&result, &serde_json::json!({}));\n",
877 fields
878 ));
879 }
880 }
881 "omit" => {
882 if let Some(fields) = op.config.get("fields") {
883 code.push_str(&format!(
884 " let result = omit_fields(&result, &serde_json::json!({}));\n",
885 fields
886 ));
887 }
888 }
889 "flatten" => {
890 code.push_str(" let result = flatten_object(&result);\n");
891 }
892 "sort" => {
893 if let Some(key) = op.config.get("key") {
894 code.push_str(&format!(
895 " let result = sort_array(&result, {});\n",
896 key
897 ));
898 }
899 }
900 "unique" => {
901 code.push_str(" let result = unique_array(&result);\n");
902 }
903 _ => {}
904 }
905 }
906 }
907
908 if let Some(coercion) = &self.type_coercion {
910 code.push_str(&format!(
911 " let result = coerce_type(&result, \"{}\");\n",
912 coercion.target_type
913 ));
914 }
915
916 code.push_str(&format!(
917 "\n state.insert(\"{}\".to_string(), result.clone());\n",
918 self.standard.mapping.output_key
919 ));
920 code.push_str(" Ok(result)\n");
921 code.push_str("}\n\n");
922
923 code
924 }
925
926 fn required_imports(&self) -> Vec<&'static str> {
927 let mut imports = vec!["serde_json"];
928 match self.transform_type {
929 TransformType::Jsonpath => imports.push("jsonpath_lib"),
930 TransformType::Jmespath => imports.push("jmespath"),
931 TransformType::Javascript => imports.push("quickjs_rs"),
932 _ => {}
933 }
934 imports
935 }
936
937 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
938 let mut deps = vec![("serde_json", "1")];
939 match self.transform_type {
940 TransformType::Jsonpath => deps.push(("jsonpath-lib", "0.3")),
941 TransformType::Jmespath => deps.push(("jmespath", "0.3")),
942 TransformType::Javascript => deps.push(("quick-js", "0.4")),
943 _ => {}
944 }
945 deps
946 }
947}
948
949impl ActionNodeCodeGen for SwitchNodeConfig {
954 fn generate_code(&self, node_id: &str) -> String {
955 let mut code = String::new();
956
957 code.push_str(&format!("// Switch Node: {}\n", self.standard.name));
958 code.push_str(&format!(
959 "async fn {}_switch(state: &State) -> Result<&'static str, ActionError> {{\n",
960 node_id
961 ));
962
963 if let Some(expr_mode) = &self.expression_mode {
965 if expr_mode.enabled && !expr_mode.expression.is_empty() {
966 code.push_str(" // Expression-based routing\n");
967 code.push_str(&format!(
968 " let branch = evaluate_switch_expression(\"{}\", state)?;\n",
969 expr_mode.expression.replace('"', "\\\"")
970 ));
971 code.push_str(" Ok(branch)\n");
972 code.push_str("}\n\n");
973 return code;
974 }
975 }
976
977 match self.evaluation_mode {
979 EvaluationMode::FirstMatch => {
980 code.push_str(" // First match evaluation\n");
981 for condition in &self.conditions {
982 code.push_str(&format!(" // Condition: {}\n", condition.name));
983 code.push_str(&format!(
984 " if let Some(value) = get_nested_value(state, \"{}\") {{\n",
985 condition.field
986 ));
987
988 let comparison =
989 generate_condition_comparison(&condition.operator, &condition.value);
990 code.push_str(&format!(" if {} {{\n", comparison));
991 code.push_str(&format!(
992 " tracing::debug!(branch = \"{}\", \"Switch condition matched\");\n",
993 condition.output_port
994 ));
995 code.push_str(&format!(
996 " return Ok(\"{}\");\n",
997 condition.output_port
998 ));
999 code.push_str(" }\n");
1000 code.push_str(" }\n\n");
1001 }
1002 }
1003 EvaluationMode::AllMatch => {
1004 code.push_str(" // All match evaluation (fan-out: all branches execute via direct edges)\n");
1005 code.push_str(" // Store matched branches in state for observability\n");
1006 code.push_str(" let mut matched_branches: Vec<String> = Vec::new();\n\n");
1007 for condition in &self.conditions {
1008 code.push_str(&format!(
1009 " if let Some(value) = get_nested_value(state, \"{}\") {{\n",
1010 condition.field
1011 ));
1012 let comparison =
1013 generate_condition_comparison(&condition.operator, &condition.value);
1014 code.push_str(&format!(" if {} {{\n", comparison));
1015 code.push_str(&format!(
1016 " matched_branches.push(\"{}\".to_string());\n",
1017 condition.output_port
1018 ));
1019 code.push_str(" }\n");
1020 code.push_str(" }\n");
1021 }
1022 code.push_str("\n // All connected branches execute regardless — fan-out via direct edges\n");
1023 code.push_str(" // matched_branches is stored for debugging/observability\n");
1024 code.push_str(
1025 " Ok(serde_json::to_string(&matched_branches).unwrap_or_default())\n",
1026 );
1027 }
1028 }
1029
1030 if let Some(default) = &self.default_branch {
1032 code.push_str(&format!(" // Default branch\n Ok(\"{}\")\n", default));
1033 } else {
1034 code.push_str(" Err(ActionError::NoMatchingBranch { node: \"");
1035 code.push_str(node_id);
1036 code.push_str("\".to_string() })\n");
1037 }
1038
1039 code.push_str("}\n\n");
1040
1041 code
1042 }
1043
1044 fn required_imports(&self) -> Vec<&'static str> {
1045 vec!["serde_json"]
1046 }
1047
1048 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
1049 vec![("serde_json", "1")]
1050 }
1051}
1052
1053fn generate_condition_comparison(operator: &str, value: &Option<serde_json::Value>) -> String {
1054 let value_str = value
1055 .as_ref()
1056 .map(|v| v.to_string())
1057 .unwrap_or_else(|| "null".to_string());
1058
1059 match operator {
1060 "eq" => format!("value == &serde_json::json!({})", value_str),
1061 "neq" => format!("value != &serde_json::json!({})", value_str),
1062 "gt" => format!("value.as_f64().map(|n| n > {}).unwrap_or(false)", value_str),
1063 "lt" => format!("value.as_f64().map(|n| n < {}).unwrap_or(false)", value_str),
1064 "gte" => format!(
1065 "value.as_f64().map(|n| n >= {}).unwrap_or(false)",
1066 value_str
1067 ),
1068 "lte" => format!(
1069 "value.as_f64().map(|n| n <= {}).unwrap_or(false)",
1070 value_str
1071 ),
1072 "contains" => format!(
1073 "value.as_str().map(|s| s.contains({})).unwrap_or(false)",
1074 value_str
1075 ),
1076 "startsWith" => {
1077 format!(
1078 "value.as_str().map(|s| s.starts_with({})).unwrap_or(false)",
1079 value_str
1080 )
1081 }
1082 "endsWith" => {
1083 format!(
1084 "value.as_str().map(|s| s.ends_with({})).unwrap_or(false)",
1085 value_str
1086 )
1087 }
1088 "matches" => format!(
1089 "value.as_str().map(|s| regex::Regex::new({}).map(|r| r.is_match(s)).unwrap_or(false)).unwrap_or(false)",
1090 value_str
1091 ),
1092 "in" => format!(
1093 "serde_json::json!({}).as_array().map(|arr| arr.contains(value)).unwrap_or(false)",
1094 value_str
1095 ),
1096 "empty" => "value.as_str().map(|s| s.is_empty()).unwrap_or(value.is_null())".to_string(),
1097 "exists" => "!value.is_null()".to_string(),
1098 _ => "false".to_string(),
1099 }
1100}
1101
1102impl ActionNodeCodeGen for LoopNodeConfig {
1107 fn generate_code(&self, node_id: &str) -> String {
1108 let mut code = String::new();
1109
1110 code.push_str(&format!("// Loop Node: {}\n", self.standard.name));
1111 code.push_str(&format!("async fn {}_loop(\n", node_id));
1112 code.push_str(" state: &mut State,\n");
1113 code.push_str(" executor: &WorkflowExecutor,\n");
1114 code.push_str(") -> Result<serde_json::Value, ActionError> {\n");
1115
1116 match self.loop_type {
1117 LoopType::ForEach => {
1118 if let Some(for_each) = &self.for_each {
1119 code.push_str(&format!(
1120 " // forEach loop over '{}'\n",
1121 for_each.source_array
1122 ));
1123 code.push_str(&format!(
1124 " let source: Vec<serde_json::Value> = state.get(\"{}\")\n",
1125 for_each.source_array
1126 ));
1127 code.push_str(" .and_then(|v| v.as_array())\n");
1128 code.push_str(" .cloned()\n");
1129 code.push_str(" .unwrap_or_default();\n\n");
1130
1131 if self.results.collect {
1132 code.push_str(" let mut results = Vec::new();\n\n");
1133 }
1134
1135 if self.parallel.enabled {
1136 let batch_size = self.parallel.batch_size.unwrap_or(10);
1137 code.push_str(&format!(
1138 " // Parallel execution with batch size {}\n",
1139 batch_size
1140 ));
1141 code.push_str(&format!(
1142 " for chunk in source.chunks({}) {{\n",
1143 batch_size
1144 ));
1145 code.push_str(" let futures: Vec<_> = chunk.iter().enumerate().map(|(idx, item)| {\n");
1146 code.push_str(" let mut loop_state = state.clone();\n");
1147 code.push_str(&format!(
1148 " loop_state.insert(\"{}\".to_string(), item.clone());\n",
1149 for_each.item_var
1150 ));
1151 code.push_str(&format!(
1152 " loop_state.insert(\"{}\".to_string(), serde_json::json!(idx));\n",
1153 for_each.index_var
1154 ));
1155 code.push_str(" executor.execute_loop_body(loop_state)\n");
1156 code.push_str(" }).collect();\n\n");
1157 code.push_str(" let chunk_results = futures::future::join_all(futures).await;\n");
1158 if self.results.collect {
1159 code.push_str(" results.extend(chunk_results.into_iter().filter_map(|r| r.ok()));\n");
1160 }
1161
1162 if let Some(delay) = self.parallel.delay_between {
1163 code.push_str(&format!(
1164 "\n tokio::time::sleep(std::time::Duration::from_millis({})).await;\n",
1165 delay
1166 ));
1167 }
1168 code.push_str(" }\n");
1169 } else {
1170 code.push_str(" // Sequential execution\n");
1171 code.push_str(" for (idx, item) in source.iter().enumerate() {\n");
1172 code.push_str(&format!(
1173 " state.insert(\"{}\".to_string(), item.clone());\n",
1174 for_each.item_var
1175 ));
1176 code.push_str(&format!(
1177 " state.insert(\"{}\".to_string(), serde_json::json!(idx));\n",
1178 for_each.index_var
1179 ));
1180 code.push_str(" let result = executor.execute_loop_body(state.clone()).await?;\n");
1181 if self.results.collect {
1182 code.push_str(" results.push(result);\n");
1183 }
1184 code.push_str(" }\n");
1185 }
1186 }
1187 }
1188 LoopType::While => {
1189 if let Some(while_config) = &self.while_config {
1190 code.push_str(" // while loop\n");
1191 if self.results.collect {
1192 code.push_str(" let mut results = Vec::new();\n");
1193 }
1194 code.push_str(" let mut iteration = 0;\n");
1195 code.push_str(" const MAX_ITERATIONS: usize = 1000; // Safety limit\n\n");
1196 code.push_str(&format!(
1197 " while evaluate_condition(\"{}\", state)? && iteration < MAX_ITERATIONS {{\n",
1198 while_config.condition.replace('"', "\\\"")
1199 ));
1200 code.push_str(
1201 " let result = executor.execute_loop_body(state.clone()).await?;\n",
1202 );
1203 if self.results.collect {
1204 code.push_str(" results.push(result);\n");
1205 }
1206 code.push_str(" iteration += 1;\n");
1207 code.push_str(" }\n");
1208 }
1209 }
1210 LoopType::Times => {
1211 if let Some(times) = &self.times {
1212 let count = match ×.count {
1213 serde_json::Value::Number(n) => n.to_string(),
1214 serde_json::Value::String(s) => format!(
1215 "evaluate_expression(\"{}\", state)?.as_u64().unwrap_or(0) as usize",
1216 s
1217 ),
1218 _ => "0".to_string(),
1219 };
1220 code.push_str(&format!(" // times loop ({} iterations)\n", count));
1221 if self.results.collect {
1222 code.push_str(" let mut results = Vec::new();\n");
1223 }
1224 code.push_str(&format!(" for i in 0..{} {{\n", count));
1225 code.push_str(
1226 " state.insert(\"index\".to_string(), serde_json::json!(i));\n",
1227 );
1228 code.push_str(
1229 " let result = executor.execute_loop_body(state.clone()).await?;\n",
1230 );
1231 if self.results.collect {
1232 code.push_str(" results.push(result);\n");
1233 }
1234 code.push_str(" }\n");
1235 }
1236 }
1237 }
1238
1239 if self.results.collect {
1241 let agg_key = self
1242 .results
1243 .aggregation_key
1244 .as_deref()
1245 .unwrap_or(&self.standard.mapping.output_key);
1246 code.push_str(&format!(
1247 "\n let result = serde_json::json!(results);\n\
1248 state.insert(\"{}\".to_string(), result.clone());\n",
1249 agg_key
1250 ));
1251 } else {
1252 code.push_str("\n let result = serde_json::Value::Null;\n");
1253 }
1254
1255 code.push_str(" Ok(result)\n");
1256 code.push_str("}\n\n");
1257
1258 code
1259 }
1260
1261 fn required_imports(&self) -> Vec<&'static str> {
1262 let mut imports = vec!["serde_json"];
1263 if self.parallel.enabled {
1264 imports.push("futures");
1265 }
1266 imports
1267 }
1268
1269 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
1270 let mut deps = vec![("serde_json", "1")];
1271 if self.parallel.enabled {
1272 deps.push(("futures", "0.3"));
1273 }
1274 deps
1275 }
1276}
1277
1278impl ActionNodeCodeGen for MergeNodeConfig {
1283 fn generate_code(&self, node_id: &str) -> String {
1284 let mut code = String::new();
1285
1286 code.push_str(&format!("// Merge Node: {}\n", self.standard.name));
1287 code.push_str(&format!("async fn {}_merge(\n", node_id));
1288 code.push_str(" branch_results: Vec<(String, serde_json::Value)>,\n");
1289 code.push_str(" state: &mut State,\n");
1290 code.push_str(") -> Result<serde_json::Value, ActionError> {\n");
1291
1292 if self.timeout.enabled {
1294 code.push_str(&format!(" // Timeout: {}ms\n", self.timeout.ms));
1295 }
1296
1297 match self.mode {
1298 MergeMode::WaitAll => {
1299 code.push_str(" // Wait for all branches\n");
1300 code.push_str(
1301 " // Note: branch_results already contains all completed branches\n",
1302 );
1303 }
1304 MergeMode::WaitAny => {
1305 code.push_str(" // Wait for any branch (first to complete)\n");
1306 code.push_str(" if branch_results.is_empty() {\n");
1307 code.push_str(" return Err(ActionError::NoBranchCompleted);\n");
1308 code.push_str(" }\n");
1309 }
1310 MergeMode::WaitN => {
1311 let n = self.wait_count.unwrap_or(1);
1312 code.push_str(&format!(" // Wait for {} branches\n", n));
1313 code.push_str(&format!(" if branch_results.len() < {} {{\n", n));
1314 code.push_str(&format!(
1315 " return Err(ActionError::InsufficientBranches {{ expected: {}, got: branch_results.len() }});\n",
1316 n
1317 ));
1318 code.push_str(" }\n");
1319 }
1320 }
1321
1322 code.push_str("\n // Combine branch results\n");
1324 match self.combine_strategy {
1325 CombineStrategy::Array => {
1326 code.push_str(" let result: Vec<serde_json::Value> = branch_results.into_iter().map(|(_, v)| v).collect();\n");
1327 code.push_str(" let result = serde_json::json!(result);\n");
1328 }
1329 CombineStrategy::Object => {
1330 code.push_str(" let mut result_map = serde_json::Map::new();\n");
1331 code.push_str(" for (branch_key, value) in branch_results {\n");
1332 code.push_str(" result_map.insert(branch_key, value);\n");
1333 code.push_str(" }\n");
1334 code.push_str(" let result = serde_json::Value::Object(result_map);\n");
1335 }
1336 CombineStrategy::First => {
1337 code.push_str(" let result = branch_results.into_iter().next().map(|(_, v)| v).unwrap_or(serde_json::Value::Null);\n");
1338 }
1339 CombineStrategy::Last => {
1340 code.push_str(" let result = branch_results.into_iter().last().map(|(_, v)| v).unwrap_or(serde_json::Value::Null);\n");
1341 }
1342 }
1343
1344 code.push_str(&format!(
1345 "\n state.insert(\"{}\".to_string(), result.clone());\n",
1346 self.standard.mapping.output_key
1347 ));
1348 code.push_str(" Ok(result)\n");
1349 code.push_str("}\n\n");
1350
1351 code
1352 }
1353
1354 fn required_imports(&self) -> Vec<&'static str> {
1355 vec!["serde_json"]
1356 }
1357
1358 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
1359 vec![("serde_json", "1")]
1360 }
1361}
1362
1363impl ActionNodeCodeGen for WaitNodeConfig {
1368 fn generate_code(&self, node_id: &str) -> String {
1369 let mut code = String::new();
1370
1371 code.push_str(&format!("// Wait Node: {}\n", self.standard.name));
1372 code.push_str(&format!(
1373 "async fn {}_wait(state: &mut State) -> Result<serde_json::Value, ActionError> {{\n",
1374 node_id
1375 ));
1376
1377 match self.wait_type {
1378 WaitType::Fixed => {
1379 if let Some(fixed) = &self.fixed {
1380 let ms = match fixed.unit.as_str() {
1381 "ms" => fixed.duration,
1382 "s" => fixed.duration * 1000,
1383 "m" => fixed.duration * 60 * 1000,
1384 "h" => fixed.duration * 60 * 60 * 1000,
1385 _ => fixed.duration,
1386 };
1387 code.push_str(&format!(
1388 " // Fixed wait: {} {}\n",
1389 fixed.duration, fixed.unit
1390 ));
1391 code.push_str(&format!(
1392 " tracing::debug!(duration_ms = {}, \"Waiting\");\n",
1393 ms
1394 ));
1395 code.push_str(&format!(
1396 " tokio::time::sleep(std::time::Duration::from_millis({})).await;\n",
1397 ms
1398 ));
1399 }
1400 }
1401 WaitType::Until => {
1402 if let Some(until) = &self.until {
1403 code.push_str(" // Wait until timestamp\n");
1404 code.push_str(&format!(
1405 " let target = chrono::DateTime::parse_from_rfc3339(\"{}\")\n",
1406 until.timestamp
1407 ));
1408 code.push_str(
1409 " .map_err(|e| ActionError::InvalidTimestamp(e.to_string()))?;\n",
1410 );
1411 code.push_str(" let now = chrono::Utc::now();\n");
1412 code.push_str(" if target > now {\n");
1413 code.push_str(
1414 " let duration = (target - now).to_std().unwrap_or_default();\n",
1415 );
1416 code.push_str(
1417 " tracing::debug!(until = %target, \"Waiting until timestamp\");\n",
1418 );
1419 code.push_str(" tokio::time::sleep(duration).await;\n");
1420 code.push_str(" }\n");
1421 }
1422 }
1423 WaitType::Webhook => {
1424 if let Some(webhook) = &self.webhook {
1425 code.push_str(&format!(
1426 " // Wait for webhook callback at '{}'\n",
1427 webhook.path
1428 ));
1429 code.push_str(&format!(" // Timeout: {}ms\n", webhook.timeout));
1430 code.push_str(
1431 " // Note: Webhook handler should signal completion via channel\n",
1432 );
1433 code.push_str(" let (tx, rx) = tokio::sync::oneshot::channel();\n");
1434 code.push_str(" // Register webhook handler...\n");
1435 code.push_str(&format!(
1436 " let result = tokio::time::timeout(\n\
1437 std::time::Duration::from_millis({}),\n\
1438 rx\n\
1439 ).await\n\
1440 .map_err(|_| ActionError::WebhookTimeout)?\n\
1441 .map_err(|_| ActionError::WebhookCancelled)?;\n",
1442 webhook.timeout
1443 ));
1444 code.push_str(" return Ok(result);\n");
1445 }
1446 }
1447 WaitType::Condition => {
1448 if let Some(condition) = &self.condition {
1449 code.push_str(" // Poll until condition is true\n");
1450 code.push_str(&format!(
1451 " let poll_interval = std::time::Duration::from_millis({});\n",
1452 condition.poll_interval
1453 ));
1454 code.push_str(&format!(
1455 " let max_wait = std::time::Duration::from_millis({});\n",
1456 condition.max_wait
1457 ));
1458 code.push_str(" let start = std::time::Instant::now();\n\n");
1459 code.push_str(" loop {\n");
1460 code.push_str(&format!(
1461 " if evaluate_condition(\"{}\", state)? {{\n",
1462 condition.expression.replace('"', "\\\"")
1463 ));
1464 code.push_str(" tracing::debug!(\"Condition met\");\n");
1465 code.push_str(" break;\n");
1466 code.push_str(" }\n\n");
1467 code.push_str(" if start.elapsed() >= max_wait {\n");
1468 code.push_str(" return Err(ActionError::ConditionTimeout {\n");
1469 code.push_str(&format!(
1470 " condition: \"{}\".to_string(),\n",
1471 condition.expression.replace('"', "\\\"")
1472 ));
1473 code.push_str(&format!(
1474 " timeout_ms: {},\n",
1475 condition.max_wait
1476 ));
1477 code.push_str(" });\n");
1478 code.push_str(" }\n\n");
1479 code.push_str(" tokio::time::sleep(poll_interval).await;\n");
1480 code.push_str(" }\n");
1481 }
1482 }
1483 }
1484
1485 code.push_str("\n Ok(serde_json::json!({ \"waited\": true }))\n");
1486 code.push_str("}\n\n");
1487
1488 code
1489 }
1490
1491 fn required_imports(&self) -> Vec<&'static str> {
1492 let mut imports = vec!["tokio"];
1493 if self.wait_type == WaitType::Until {
1494 imports.push("chrono");
1495 }
1496 imports
1497 }
1498
1499 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
1500 let mut deps = vec![("tokio", "{ version = \"1\", features = [\"time\"] }")];
1501 if self.wait_type == WaitType::Until {
1502 deps.push(("chrono", "0.4"));
1503 }
1504 deps
1505 }
1506}
1507
1508impl ActionNodeCodeGen for CodeNodeConfig {
1513 fn generate_code(&self, node_id: &str) -> String {
1514 let mut code = String::new();
1515
1516 code.push_str(&format!("// Code Node: {}\n", self.standard.name));
1517
1518 match self.language {
1519 CodeLanguage::Rust => {
1520 code.push_str(&format!(
1523 "/// Authored Rust body for code node `{}`\n",
1524 self.standard.name
1525 ));
1526 code.push_str(&format!(
1527 "fn {}_run(input: serde_json::Value) -> serde_json::Value {{\n",
1528 node_id
1529 ));
1530 code.push_str(&self.code);
1531 code.push_str("\n}\n\n");
1532
1533 code.push_str(&format!(
1534 "async fn {}_code(state: &mut State) -> Result<serde_json::Value, ActionError> {{\n",
1535 node_id
1536 ));
1537 code.push_str(" let input = serde_json::json!(state.clone());\n");
1538 code.push_str(&format!(" let result = {}_run(input);\n", node_id));
1539 code.push_str(&format!(
1540 " state.insert(\"{}\".to_string(), result.clone());\n",
1541 self.standard.mapping.output_key
1542 ));
1543 code.push_str(" Ok(result)\n");
1544 code.push_str("}\n\n");
1545 }
1546 CodeLanguage::Javascript | CodeLanguage::Typescript => {
1547 code.push_str(&format!(
1549 "async fn {}_code(state: &mut State) -> Result<serde_json::Value, ActionError> {{\n",
1550 node_id
1551 ));
1552
1553 code.push_str(" // Sandbox configuration\n");
1554 code.push_str(&format!(
1555 " let sandbox_config = SandboxConfig {{\n\
1556 network_access: {},\n\
1557 file_system_access: {},\n\
1558 memory_limit_mb: {},\n\
1559 time_limit_ms: {},\n\
1560 }};\n\n",
1561 self.sandbox.network_access,
1562 self.sandbox.file_system_access,
1563 self.sandbox.memory_limit,
1564 self.sandbox.time_limit
1565 ));
1566
1567 code.push_str(" let input = serde_json::json!(state.clone());\n\n");
1568 code.push_str(&format!(
1569 " let code = r#\"{}\"#;\n",
1570 self.code.replace('\\', "\\\\").replace('#', "\\#")
1571 ));
1572
1573 if matches!(self.language, CodeLanguage::Typescript) {
1574 code.push_str(" // TypeScript is transpiled to JavaScript\n");
1575 code.push_str(" let js_code = transpile_typescript(code)?;\n");
1576 code.push_str(
1577 " let result = execute_js_sandboxed(&js_code, &input, &sandbox_config)?;\n",
1578 );
1579 } else {
1580 code.push_str(
1581 " let result = execute_js_sandboxed(code, &input, &sandbox_config)?;\n",
1582 );
1583 }
1584
1585 code.push_str(&format!(
1586 "\n state.insert(\"{}\".to_string(), result.clone());\n",
1587 self.standard.mapping.output_key
1588 ));
1589 code.push_str(" Ok(result)\n");
1590 code.push_str("}\n\n");
1591
1592 code.push_str(generate_sandbox_helper());
1594 }
1595 }
1596
1597 code
1598 }
1599
1600 fn required_imports(&self) -> Vec<&'static str> {
1601 match self.language {
1602 CodeLanguage::Rust => vec!["serde_json"],
1603 CodeLanguage::Javascript | CodeLanguage::Typescript => {
1604 vec!["serde_json", "quick_js"]
1605 }
1606 }
1607 }
1608
1609 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
1610 match self.language {
1611 CodeLanguage::Rust => vec![("serde_json", "1")],
1612 CodeLanguage::Javascript | CodeLanguage::Typescript => {
1613 vec![("serde_json", "1"), ("quick-js", "0.4")]
1614 }
1615 }
1616 }
1617}
1618
1619fn generate_sandbox_helper() -> &'static str {
1620 r#"
1621/// Sandbox configuration for code execution
1622struct SandboxConfig {
1623 network_access: bool,
1624 file_system_access: bool,
1625 memory_limit_mb: u32,
1626 time_limit_ms: u64,
1627}
1628
1629/// Execute JavaScript code in a sandboxed environment
1630fn execute_js_sandboxed(
1631 code: &str,
1632 input: &serde_json::Value,
1633 config: &SandboxConfig,
1634) -> Result<serde_json::Value, ActionError> {
1635 use quick_js::{Context, JsValue};
1636
1637 let context = Context::new().map_err(|e| ActionError::SandboxInit(e.to_string()))?;
1638
1639 // Set memory limit
1640 // Note: quick-js doesn't have direct memory limit API, this is a placeholder
1641
1642 // Inject input as global variable
1643 let input_json = serde_json::to_string(input)?;
1644 context.eval(&format!("const input = {};", input_json))
1645 .map_err(|e| ActionError::CodeExecution(e.to_string()))?;
1646
1647 // Disable network/fs if not allowed
1648 if !config.network_access {
1649 context.eval("globalThis.fetch = undefined; globalThis.XMLHttpRequest = undefined;")
1650 .map_err(|e| ActionError::CodeExecution(e.to_string()))?;
1651 }
1652
1653 // Execute with timeout
1654 let result = std::thread::scope(|s| {
1655 let handle = s.spawn(|| {
1656 context.eval(code)
1657 });
1658
1659 // Wait with timeout
1660 std::thread::sleep(std::time::Duration::from_millis(config.time_limit_ms));
1661
1662 // Note: In production, would need proper timeout handling
1663 handle.join().unwrap_or(Err(quick_js::ExecutionError::Internal("Timeout".to_string())))
1664 });
1665
1666 let js_result = result.map_err(|e| ActionError::CodeExecution(e.to_string()))?;
1667
1668 // Convert JsValue to serde_json::Value
1669 js_value_to_json(js_result)
1670}
1671
1672fn js_value_to_json(value: quick_js::JsValue) -> Result<serde_json::Value, ActionError> {
1673 use quick_js::JsValue;
1674
1675 match value {
1676 JsValue::Null => Ok(serde_json::Value::Null),
1677 JsValue::Bool(b) => Ok(serde_json::json!(b)),
1678 JsValue::Int(i) => Ok(serde_json::json!(i)),
1679 JsValue::Float(f) => Ok(serde_json::json!(f)),
1680 JsValue::String(s) => Ok(serde_json::json!(s)),
1681 JsValue::Array(arr) => {
1682 let values: Result<Vec<_>, _> = arr.into_iter().map(js_value_to_json).collect();
1683 Ok(serde_json::json!(values?))
1684 }
1685 JsValue::Object(obj) => {
1686 let mut map = serde_json::Map::new();
1687 for (k, v) in obj {
1688 map.insert(k, js_value_to_json(v)?);
1689 }
1690 Ok(serde_json::Value::Object(map))
1691 }
1692 _ => Ok(serde_json::Value::Null),
1693 }
1694}
1695"#
1696}
1697
1698impl ActionNodeCodeGen for DatabaseNodeConfig {
1703 fn generate_code(&self, node_id: &str) -> String {
1704 let mut code = String::new();
1705
1706 code.push_str(&format!("// Database Node: {}\n", self.standard.name));
1707 code.push_str(&format!("async fn {}_database(state: &mut State) -> Result<serde_json::Value, ActionError> {{\n", node_id));
1708
1709 code.push_str(" // Get connection string\n");
1711 if let Some(cred_ref) = &self.connection.credential_ref {
1712 code.push_str(&format!(
1713 " let connection_string = state.get(\"{}\")\n\
1714 .and_then(|v| v.as_str())\n\
1715 .ok_or_else(|| ActionError::MissingCredential(\"{}\".to_string()))?\n\
1716 .to_string();\n\n",
1717 cred_ref, cred_ref
1718 ));
1719 } else {
1720 code.push_str(&format!(
1721 " let connection_string = interpolate_variables(\"{}\", state);\n\n",
1722 self.connection.connection_string.replace('"', "\\\"")
1723 ));
1724 }
1725
1726 match self.db_type {
1727 DatabaseType::Postgresql | DatabaseType::Mysql | DatabaseType::Sqlite => {
1728 code.push_str(&generate_sql_code(node_id, self));
1729 }
1730 DatabaseType::Mongodb => {
1731 code.push_str(&generate_mongodb_code(node_id, self));
1732 }
1733 DatabaseType::Redis => {
1734 code.push_str(&generate_redis_code(node_id, self));
1735 }
1736 }
1737
1738 code.push_str("}\n\n");
1739
1740 code
1741 }
1742
1743 fn required_imports(&self) -> Vec<&'static str> {
1744 match self.db_type {
1745 DatabaseType::Postgresql | DatabaseType::Mysql | DatabaseType::Sqlite => {
1746 vec!["sqlx", "serde_json"]
1747 }
1748 DatabaseType::Mongodb => {
1749 vec!["mongodb", "serde_json"]
1750 }
1751 DatabaseType::Redis => {
1752 vec!["redis", "serde_json"]
1753 }
1754 }
1755 }
1756
1757 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
1758 match self.db_type {
1759 DatabaseType::Postgresql => {
1760 vec![
1761 (
1762 "sqlx",
1763 "{ version = \"0.7\", features = [\"runtime-tokio\", \"postgres\"] }",
1764 ),
1765 ("serde_json", "1"),
1766 ]
1767 }
1768 DatabaseType::Mysql => {
1769 vec![
1770 (
1771 "sqlx",
1772 "{ version = \"0.7\", features = [\"runtime-tokio\", \"mysql\"] }",
1773 ),
1774 ("serde_json", "1"),
1775 ]
1776 }
1777 DatabaseType::Sqlite => {
1778 vec![
1779 (
1780 "sqlx",
1781 "{ version = \"0.7\", features = [\"runtime-tokio\", \"sqlite\"] }",
1782 ),
1783 ("serde_json", "1"),
1784 ]
1785 }
1786 DatabaseType::Mongodb => {
1787 vec![("mongodb", "2"), ("serde_json", "1")]
1788 }
1789 DatabaseType::Redis => {
1790 vec![
1791 (
1792 "redis",
1793 "{ version = \"0.24\", features = [\"tokio-comp\"] }",
1794 ),
1795 ("serde_json", "1"),
1796 ]
1797 }
1798 }
1799 }
1800}
1801
1802fn generate_sql_code(_node_id: &str, config: &DatabaseNodeConfig) -> String {
1803 let mut code = String::new();
1804
1805 let db_type = match config.db_type {
1806 DatabaseType::Postgresql => "Postgres",
1807 DatabaseType::Mysql => "MySql",
1808 DatabaseType::Sqlite => "Sqlite",
1809 _ => "Postgres",
1810 };
1811
1812 let pool_size = config.connection.pool_size.unwrap_or(5);
1814 code.push_str(&format!(" // Create {} connection pool\n", db_type));
1815 code.push_str(&format!(
1816 " let pool = sqlx::{}Pool::connect_with(\n",
1817 db_type
1818 ));
1819 code.push_str(&format!(
1820 " sqlx::{}::{}ConnectOptions::from_str(&connection_string)?\n",
1821 db_type.to_lowercase(),
1822 db_type
1823 ));
1824 code.push_str(&format!(" .max_connections({})\n", pool_size));
1825 code.push_str(" ).await?;\n\n");
1826
1827 if let Some(sql) = &config.sql {
1828 code.push_str(&format!(" // SQL operation: {}\n", sql.operation));
1829
1830 match sql.operation.as_str() {
1831 "query" => {
1832 code.push_str(&format!(
1833 " let query = \"{}\";\n",
1834 sql.query.replace('"', "\\\"")
1835 ));
1836 code.push_str(" let rows = sqlx::query(query)\n");
1837
1838 if let Some(params) = &sql.params {
1840 for value in params.values() {
1841 code.push_str(&format!(" .bind(serde_json::json!({}))\n", value));
1842 }
1843 }
1844
1845 code.push_str(" .fetch_all(&pool).await?;\n\n");
1846 code.push_str(" // Convert rows to JSON\n");
1847 code.push_str(" let result: Vec<serde_json::Value> = rows.iter().map(|row| {\n");
1848 code.push_str(
1849 " // Note: Actual implementation would use row.get() for each column\n",
1850 );
1851 code.push_str(" serde_json::json!({})\n");
1852 code.push_str(" }).collect();\n");
1853 code.push_str(" let result = serde_json::json!(result);\n");
1854 }
1855 "insert" | "update" | "delete" | "upsert" => {
1856 code.push_str(&format!(
1857 " let query = \"{}\";\n",
1858 sql.query.replace('"', "\\\"")
1859 ));
1860 code.push_str(" let result = sqlx::query(query)\n");
1861
1862 if let Some(params) = &sql.params {
1863 for value in params.values() {
1864 code.push_str(&format!(" .bind(serde_json::json!({}))\n", value));
1865 }
1866 }
1867
1868 code.push_str(" .execute(&pool).await?;\n\n");
1869 code.push_str(" let result = serde_json::json!({\n");
1870 code.push_str(" \"rows_affected\": result.rows_affected()\n");
1871 code.push_str(" });\n");
1872 }
1873 _ => {
1874 code.push_str(" let result = serde_json::Value::Null;\n");
1875 }
1876 }
1877 } else {
1878 code.push_str(" let result = serde_json::Value::Null;\n");
1879 }
1880
1881 code.push_str(&format!(
1882 "\n state.insert(\"{}\".to_string(), result.clone());\n",
1883 config.standard.mapping.output_key
1884 ));
1885 code.push_str(" Ok(result)\n");
1886
1887 code
1888}
1889
1890fn generate_mongodb_code(_node_id: &str, config: &DatabaseNodeConfig) -> String {
1891 let mut code = String::new();
1892
1893 code.push_str(" // Create MongoDB client\n");
1894 code.push_str(" let client = mongodb::Client::with_uri_str(&connection_string).await?;\n");
1895 code.push_str(
1896 " let db = client.default_database().ok_or_else(|| ActionError::NoDatabase)?;\n\n",
1897 );
1898
1899 if let Some(mongo) = &config.mongodb {
1900 code.push_str(&format!(
1901 " let collection = db.collection::<mongodb::bson::Document>(\"{}\");\n\n",
1902 mongo.collection
1903 ));
1904
1905 match mongo.operation.as_str() {
1906 "find" => {
1907 let filter = mongo
1908 .filter
1909 .as_ref()
1910 .map(|f| f.to_string())
1911 .unwrap_or_else(|| "{}".to_string());
1912 code.push_str(&format!(
1913 " let filter = mongodb::bson::doc! {};\n",
1914 filter
1915 ));
1916 code.push_str(" let cursor = collection.find(filter, None).await?;\n");
1917 code.push_str(" let docs: Vec<_> = cursor.try_collect().await?;\n");
1918 code.push_str(" let result = serde_json::to_value(&docs)?;\n");
1919 }
1920 "findOne" => {
1921 let filter = mongo
1922 .filter
1923 .as_ref()
1924 .map(|f| f.to_string())
1925 .unwrap_or_else(|| "{}".to_string());
1926 code.push_str(&format!(
1927 " let filter = mongodb::bson::doc! {};\n",
1928 filter
1929 ));
1930 code.push_str(" let doc = collection.find_one(filter, None).await?;\n");
1931 code.push_str(" let result = serde_json::to_value(&doc)?;\n");
1932 }
1933 "insert" => {
1934 let doc = mongo
1935 .document
1936 .as_ref()
1937 .map(|d| d.to_string())
1938 .unwrap_or_else(|| "{}".to_string());
1939 code.push_str(&format!(" let doc = mongodb::bson::doc! {};\n", doc));
1940 code.push_str(" let result = collection.insert_one(doc, None).await?;\n");
1941 code.push_str(" let result = serde_json::json!({ \"inserted_id\": result.inserted_id.to_string() });\n");
1942 }
1943 "update" => {
1944 let filter = mongo
1945 .filter
1946 .as_ref()
1947 .map(|f| f.to_string())
1948 .unwrap_or_else(|| "{}".to_string());
1949 let doc = mongo
1950 .document
1951 .as_ref()
1952 .map(|d| d.to_string())
1953 .unwrap_or_else(|| "{}".to_string());
1954 code.push_str(&format!(
1955 " let filter = mongodb::bson::doc! {};\n",
1956 filter
1957 ));
1958 code.push_str(&format!(
1959 " let update = mongodb::bson::doc! {{ \"$set\": {} }};\n",
1960 doc
1961 ));
1962 code.push_str(
1963 " let result = collection.update_many(filter, update, None).await?;\n",
1964 );
1965 code.push_str(" let result = serde_json::json!({\n");
1966 code.push_str(" \"matched_count\": result.matched_count,\n");
1967 code.push_str(" \"modified_count\": result.modified_count\n");
1968 code.push_str(" });\n");
1969 }
1970 "delete" => {
1971 let filter = mongo
1972 .filter
1973 .as_ref()
1974 .map(|f| f.to_string())
1975 .unwrap_or_else(|| "{}".to_string());
1976 code.push_str(&format!(
1977 " let filter = mongodb::bson::doc! {};\n",
1978 filter
1979 ));
1980 code.push_str(" let result = collection.delete_many(filter, None).await?;\n");
1981 code.push_str(" let result = serde_json::json!({ \"deleted_count\": result.deleted_count });\n");
1982 }
1983 _ => {
1984 code.push_str(" let result = serde_json::Value::Null;\n");
1985 }
1986 }
1987 } else {
1988 code.push_str(" let result = serde_json::Value::Null;\n");
1989 }
1990
1991 code.push_str(&format!(
1992 "\n state.insert(\"{}\".to_string(), result.clone());\n",
1993 config.standard.mapping.output_key
1994 ));
1995 code.push_str(" Ok(result)\n");
1996
1997 code
1998}
1999
2000fn generate_redis_code(_node_id: &str, config: &DatabaseNodeConfig) -> String {
2001 let mut code = String::new();
2002
2003 code.push_str(" // Create Redis client\n");
2004 code.push_str(" let client = redis::Client::open(connection_string.as_str())?;\n");
2005 code.push_str(" let mut con = client.get_async_connection().await?;\n\n");
2006
2007 if let Some(redis) = &config.redis {
2008 code.push_str(&format!(" // Redis operation: {}\n", redis.operation));
2009
2010 match redis.operation.as_str() {
2011 "get" => {
2012 code.push_str(&format!(
2013 " let value: Option<String> = redis::cmd(\"GET\").arg(\"{}\").query_async(&mut con).await?;\n",
2014 redis.key
2015 ));
2016 code.push_str(" let result = serde_json::json!(value);\n");
2017 }
2018 "set" => {
2019 let value = redis
2020 .value
2021 .as_ref()
2022 .map(|v| v.to_string())
2023 .unwrap_or_else(|| "null".to_string());
2024 code.push_str(&format!(
2025 " let _: () = redis::cmd(\"SET\").arg(\"{}\").arg({}).query_async(&mut con).await?;\n",
2026 redis.key, value
2027 ));
2028 if let Some(ttl) = redis.ttl {
2029 code.push_str(&format!(
2030 " let _: () = redis::cmd(\"EXPIRE\").arg(\"{}\").arg({}).query_async(&mut con).await?;\n",
2031 redis.key, ttl
2032 ));
2033 }
2034 code.push_str(" let result = serde_json::json!({ \"ok\": true });\n");
2035 }
2036 "del" => {
2037 code.push_str(&format!(
2038 " let deleted: i64 = redis::cmd(\"DEL\").arg(\"{}\").query_async(&mut con).await?;\n",
2039 redis.key
2040 ));
2041 code.push_str(" let result = serde_json::json!({ \"deleted\": deleted });\n");
2042 }
2043 "hget" => {
2044 let field = redis
2045 .value
2046 .as_ref()
2047 .and_then(|v| v.as_str())
2048 .unwrap_or("field");
2049 code.push_str(&format!(
2050 " let value: Option<String> = redis::cmd(\"HGET\").arg(\"{}\").arg(\"{}\").query_async(&mut con).await?;\n",
2051 redis.key, field
2052 ));
2053 code.push_str(" let result = serde_json::json!(value);\n");
2054 }
2055 "hset" => {
2056 let value = redis
2057 .value
2058 .as_ref()
2059 .map(|v| v.to_string())
2060 .unwrap_or_else(|| "{}".to_string());
2061 code.push_str(&format!(
2062 " let _: () = redis::cmd(\"HSET\").arg(\"{}\").arg({}).query_async(&mut con).await?;\n",
2063 redis.key, value
2064 ));
2065 code.push_str(" let result = serde_json::json!({ \"ok\": true });\n");
2066 }
2067 "lpush" => {
2068 let value = redis
2069 .value
2070 .as_ref()
2071 .map(|v| v.to_string())
2072 .unwrap_or_else(|| "null".to_string());
2073 code.push_str(&format!(
2074 " let len: i64 = redis::cmd(\"LPUSH\").arg(\"{}\").arg({}).query_async(&mut con).await?;\n",
2075 redis.key, value
2076 ));
2077 code.push_str(" let result = serde_json::json!({ \"length\": len });\n");
2078 }
2079 "rpop" => {
2080 code.push_str(&format!(
2081 " let value: Option<String> = redis::cmd(\"RPOP\").arg(\"{}\").query_async(&mut con).await?;\n",
2082 redis.key
2083 ));
2084 code.push_str(" let result = serde_json::json!(value);\n");
2085 }
2086 _ => {
2087 code.push_str(" let result = serde_json::Value::Null;\n");
2088 }
2089 }
2090 } else {
2091 code.push_str(" let result = serde_json::Value::Null;\n");
2092 }
2093
2094 code.push_str(&format!(
2095 "\n state.insert(\"{}\".to_string(), result.clone());\n",
2096 config.standard.mapping.output_key
2097 ));
2098 code.push_str(" Ok(result)\n");
2099
2100 code
2101}
2102
2103impl ActionNodeCodeGen for EmailNodeConfig {
2108 fn generate_code(&self, node_id: &str) -> String {
2109 let mut code = String::new();
2110
2111 code.push_str(&format!("// Email Node: {}\n", self.standard.name));
2112 code.push_str(&format!(
2113 "async fn {}_email(state: &mut State) -> Result<serde_json::Value, ActionError> {{\n",
2114 node_id
2115 ));
2116
2117 match self.mode {
2118 EmailMode::Monitor => {
2119 code.push_str(&generate_imap_monitor_code(node_id, self));
2120 }
2121 EmailMode::Send => {
2122 code.push_str(&generate_smtp_send_code(node_id, self));
2123 }
2124 }
2125
2126 code.push_str("}\n\n");
2127
2128 code
2129 }
2130
2131 fn required_imports(&self) -> Vec<&'static str> {
2132 match self.mode {
2133 EmailMode::Monitor => vec!["imap", "native_tls", "mailparse", "serde_json"],
2134 EmailMode::Send => vec!["lettre", "serde_json"],
2135 }
2136 }
2137
2138 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
2139 match self.mode {
2140 EmailMode::Monitor => vec![
2141 ("imap", "3"),
2142 ("native-tls", "0.2"),
2143 ("mailparse", "0.14"),
2144 ("serde_json", "1"),
2145 ],
2146 EmailMode::Send => vec![
2147 (
2148 "lettre",
2149 "{ version = \"0.11\", features = [\"tokio1-native-tls\", \"builder\"] }",
2150 ),
2151 ("serde_json", "1"),
2152 ],
2153 }
2154 }
2155}
2156
2157fn generate_imap_monitor_code(_node_id: &str, config: &EmailNodeConfig) -> String {
2158 let mut code = String::new();
2159
2160 if let Some(imap) = &config.imap {
2161 code.push_str(" // IMAP email monitoring\n");
2162 code.push_str(&format!(
2163 " let host = interpolate_variables(\"{}\", state);\n",
2164 imap.host.replace('"', "\\\"")
2165 ));
2166 code.push_str(&format!(
2167 " let username = interpolate_variables(\"{}\", state);\n",
2168 imap.username.replace('"', "\\\"")
2169 ));
2170 code.push_str(&format!(
2171 " let password = interpolate_variables(\"{}\", state);\n",
2172 imap.password.replace('"', "\\\"")
2173 ));
2174
2175 if imap.secure {
2177 code.push_str("\n // Create TLS connection\n");
2178 code.push_str(" let tls = native_tls::TlsConnector::builder().build()?;\n");
2179 code.push_str(&format!(
2180 " let client = imap::connect((\"{}\", {}), &host, &tls)?;\n",
2181 imap.host, imap.port
2182 ));
2183 } else {
2184 code.push_str("\n // Create plain connection\n");
2185 code.push_str(&format!(
2186 " let client = imap::connect_insecure((\"{}\", {}))?;\n",
2187 imap.host, imap.port
2188 ));
2189 }
2190
2191 code.push_str("\n // Login\n");
2193 code.push_str(" let mut session = client.login(&username, &password)\n");
2194 code.push_str(" .map_err(|e| ActionError::EmailAuth(e.0.to_string()))?;\n");
2195
2196 code.push_str(&format!("\n // Select folder: {}\n", imap.folder));
2198 code.push_str(&format!(" session.select(\"{}\")?;\n", imap.folder));
2199
2200 code.push_str("\n // Build search criteria\n");
2202 let mut search_criteria = Vec::new();
2203
2204 if let Some(filters) = &config.filters {
2205 if filters.unread_only {
2206 search_criteria.push("UNSEEN".to_string());
2207 }
2208 if let Some(from) = &filters.from {
2209 search_criteria.push(format!("FROM \"{}\"", from));
2210 }
2211 if let Some(subject) = &filters.subject {
2212 search_criteria.push(format!("SUBJECT \"{}\"", subject));
2213 }
2214 if let Some(date_from) = &filters.date_from {
2215 search_criteria.push(format!("SINCE \"{}\"", date_from));
2216 }
2217 if let Some(date_to) = &filters.date_to {
2218 search_criteria.push(format!("BEFORE \"{}\"", date_to));
2219 }
2220 }
2221
2222 let search_str = if search_criteria.is_empty() {
2223 "ALL".to_string()
2224 } else {
2225 search_criteria.join(" ")
2226 };
2227
2228 code.push_str(&format!(
2229 " let search_result = session.search(\"{}\")?;\n",
2230 search_str
2231 ));
2232
2233 code.push_str("\n // Fetch messages\n");
2235 code.push_str(" let mut emails = Vec::new();\n");
2236 code.push_str(" for uid in search_result.iter() {\n");
2237 code.push_str(
2238 " let messages = session.fetch(uid.to_string(), \"(RFC822 ENVELOPE)\")?;\n",
2239 );
2240 code.push_str(" for message in messages.iter() {\n");
2241 code.push_str(" if let Some(body) = message.body() {\n");
2242 code.push_str(" let parsed = mailparse::parse_mail(body)?;\n");
2243 code.push_str(" let email_data = serde_json::json!({\n");
2244 code.push_str(" \"uid\": uid,\n");
2245 code.push_str(" \"from\": parsed.headers.iter()\n");
2246 code.push_str(" .find(|h| h.get_key() == \"From\")\n");
2247 code.push_str(" .map(|h| h.get_value()),\n");
2248 code.push_str(" \"to\": parsed.headers.iter()\n");
2249 code.push_str(" .find(|h| h.get_key() == \"To\")\n");
2250 code.push_str(" .map(|h| h.get_value()),\n");
2251 code.push_str(" \"subject\": parsed.headers.iter()\n");
2252 code.push_str(" .find(|h| h.get_key() == \"Subject\")\n");
2253 code.push_str(" .map(|h| h.get_value()),\n");
2254 code.push_str(" \"date\": parsed.headers.iter()\n");
2255 code.push_str(" .find(|h| h.get_key() == \"Date\")\n");
2256 code.push_str(" .map(|h| h.get_value()),\n");
2257 code.push_str(" \"body\": parsed.get_body()?,\n");
2258 code.push_str(" \"attachments\": parsed.subparts.iter()\n");
2259 code.push_str(" .filter(|p| p.get_content_disposition().disposition == mailparse::DispositionType::Attachment)\n");
2260 code.push_str(" .map(|p| serde_json::json!({\n");
2261 code.push_str(" \"filename\": p.get_content_disposition().params.get(\"filename\"),\n");
2262 code.push_str(" \"content_type\": p.ctype.mimetype.clone(),\n");
2263 code.push_str(" \"size\": p.get_body_raw()?.len()\n");
2264 code.push_str(" }))\n");
2265 code.push_str(" .collect::<Vec<_>>()\n");
2266 code.push_str(" });\n");
2267 code.push_str(" emails.push(email_data);\n");
2268
2269 if imap.mark_as_read {
2271 code.push_str("\n // Mark as read\n");
2272 code.push_str(
2273 " session.store(uid.to_string(), \"+FLAGS (\\\\Seen)\")?;\n",
2274 );
2275 }
2276
2277 code.push_str(" }\n");
2278 code.push_str(" }\n");
2279 code.push_str(" }\n");
2280
2281 code.push_str("\n // Logout\n");
2283 code.push_str(" session.logout()?;\n");
2284
2285 code.push_str("\n let result = serde_json::json!({\n");
2287 code.push_str(" \"count\": emails.len(),\n");
2288 code.push_str(" \"emails\": emails\n");
2289 code.push_str(" });\n");
2290 code.push_str(&format!(
2291 " state.insert(\"{}\".to_string(), result.clone());\n",
2292 config.standard.mapping.output_key
2293 ));
2294 code.push_str(" Ok(result)\n");
2295 } else {
2296 code.push_str(" // No IMAP configuration provided\n");
2297 code.push_str(" Ok(serde_json::Value::Null)\n");
2298 }
2299
2300 code
2301}
2302
2303fn generate_smtp_send_code(_node_id: &str, config: &EmailNodeConfig) -> String {
2304 let mut code = String::new();
2305
2306 if let Some(smtp) = &config.smtp {
2307 code.push_str(" // SMTP email sending\n");
2308 code.push_str(" use lettre::{Message, SmtpTransport, Transport};\n");
2309 code.push_str(" use lettre::transport::smtp::authentication::Credentials;\n");
2310 code.push_str(" use lettre::message::{header::ContentType, Attachment, MultiPart, SinglePart};\n\n");
2311
2312 code.push_str(&format!(
2314 " let host = interpolate_variables(\"{}\", state);\n",
2315 smtp.host.replace('"', "\\\"")
2316 ));
2317 code.push_str(&format!(
2318 " let username = interpolate_variables(\"{}\", state);\n",
2319 smtp.username.replace('"', "\\\"")
2320 ));
2321 code.push_str(&format!(
2322 " let password = interpolate_variables(\"{}\", state);\n",
2323 smtp.password.replace('"', "\\\"")
2324 ));
2325 code.push_str(&format!(
2326 " let from_email = interpolate_variables(\"{}\", state);\n",
2327 smtp.from_email.replace('"', "\\\"")
2328 ));
2329
2330 if let Some(from_name) = &smtp.from_name {
2331 code.push_str(&format!(
2332 " let from_name = interpolate_variables(\"{}\", state);\n",
2333 from_name.replace('"', "\\\"")
2334 ));
2335 }
2336
2337 if let Some(recipients) = &config.recipients {
2339 code.push_str(&format!(
2340 "\n let to = interpolate_variables(\"{}\", state);\n",
2341 recipients.to.replace('"', "\\\"")
2342 ));
2343
2344 if let Some(cc) = &recipients.cc {
2345 code.push_str(&format!(
2346 " let cc = interpolate_variables(\"{}\", state);\n",
2347 cc.replace('"', "\\\"")
2348 ));
2349 }
2350
2351 if let Some(bcc) = &recipients.bcc {
2352 code.push_str(&format!(
2353 " let bcc = interpolate_variables(\"{}\", state);\n",
2354 bcc.replace('"', "\\\"")
2355 ));
2356 }
2357 }
2358
2359 if let Some(content) = &config.content {
2361 code.push_str(&format!(
2362 "\n let subject = interpolate_variables(\"{}\", state);\n",
2363 content.subject.replace('"', "\\\"")
2364 ));
2365 code.push_str(&format!(
2366 " let body = interpolate_variables(\"{}\", state);\n",
2367 content.body.replace('"', "\\\"").replace('\n', "\\n")
2368 ));
2369 }
2370
2371 code.push_str("\n // Build email message\n");
2373
2374 if smtp.from_name.is_some() {
2375 code.push_str(" let from = format!(\"{} <{}>\", from_name, from_email).parse()?;\n");
2377 } else {
2378 code.push_str(" let from = from_email.parse()?;\n");
2379 }
2380
2381 code.push_str(" let mut message_builder = Message::builder()\n");
2382 code.push_str(" .from(from)\n");
2383
2384 code.push_str(" .to(to.parse()?);\n");
2386
2387 if config
2388 .recipients
2389 .as_ref()
2390 .and_then(|r| r.cc.as_ref())
2391 .is_some()
2392 {
2393 code.push_str(" message_builder = message_builder.cc(cc.parse()?);\n");
2394 }
2395
2396 if config
2397 .recipients
2398 .as_ref()
2399 .and_then(|r| r.bcc.as_ref())
2400 .is_some()
2401 {
2402 code.push_str(" message_builder = message_builder.bcc(bcc.parse()?);\n");
2403 }
2404
2405 code.push_str(" message_builder = message_builder.subject(&subject);\n");
2406
2407 if let Some(content) = &config.content {
2409 match content.body_type {
2410 EmailBodyType::Html => {
2411 code.push_str("\n // HTML body\n");
2412 code.push_str(" let body_part = SinglePart::builder()\n");
2413 code.push_str(" .header(ContentType::TEXT_HTML)\n");
2414 code.push_str(" .body(body);\n");
2415 }
2416 EmailBodyType::Text => {
2417 code.push_str("\n // Plain text body\n");
2418 code.push_str(" let body_part = SinglePart::builder()\n");
2419 code.push_str(" .header(ContentType::TEXT_PLAIN)\n");
2420 code.push_str(" .body(body);\n");
2421 }
2422 }
2423 } else {
2424 code.push_str(" let body_part = SinglePart::builder()\n");
2425 code.push_str(" .header(ContentType::TEXT_PLAIN)\n");
2426 code.push_str(" .body(String::new());\n");
2427 }
2428
2429 if let Some(attachments) = &config.attachments {
2431 if !attachments.is_empty() {
2432 code.push_str("\n // Build multipart message with attachments\n");
2433 code.push_str(
2434 " let mut multipart = MultiPart::mixed().singlepart(body_part);\n\n",
2435 );
2436
2437 for (i, attachment) in attachments.iter().enumerate() {
2438 code.push_str(&format!(
2439 " // Attachment {}: {}\n",
2440 i + 1,
2441 attachment.filename
2442 ));
2443 code.push_str(&format!(
2444 " if let Some(attachment_data) = state.get(\"{}\") {{\n",
2445 attachment.state_key
2446 ));
2447 code.push_str(
2448 " let data = if let Some(s) = attachment_data.as_str() {\n",
2449 );
2450 code.push_str(" base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s)?\n");
2451 code.push_str(" } else {\n");
2452 code.push_str(" serde_json::to_vec(attachment_data)?\n");
2453 code.push_str(" };\n");
2454
2455 let mime_type = attachment
2456 .mime_type
2457 .as_deref()
2458 .unwrap_or("application/octet-stream");
2459 code.push_str(&format!(
2460 " let attachment = Attachment::new(\"{}\".to_string())\n",
2461 attachment.filename
2462 ));
2463 code.push_str(&format!(
2464 " .body(data, \"{}\".parse()?);\n",
2465 mime_type
2466 ));
2467 code.push_str(" multipart = multipart.singlepart(attachment);\n");
2468 code.push_str(" }\n\n");
2469 }
2470
2471 code.push_str(" let email = message_builder.multipart(multipart)?;\n");
2472 } else {
2473 code.push_str("\n let email = message_builder.singlepart(body_part)?;\n");
2474 }
2475 } else {
2476 code.push_str("\n let email = message_builder.singlepart(body_part)?;\n");
2477 }
2478
2479 code.push_str("\n // Create SMTP transport\n");
2481 code.push_str(" let creds = Credentials::new(username, password);\n");
2482
2483 if smtp.secure {
2484 code.push_str(&format!(
2485 " let mailer = SmtpTransport::relay(&host)?\n\
2486 .port({})\n\
2487 .credentials(creds)\n\
2488 .build();\n",
2489 smtp.port
2490 ));
2491 } else {
2492 code.push_str(&format!(
2493 " let mailer = SmtpTransport::builder_dangerous(&host)\n\
2494 .port({})\n\
2495 .credentials(creds)\n\
2496 .build();\n",
2497 smtp.port
2498 ));
2499 }
2500
2501 code.push_str("\n // Send email\n");
2503 code.push_str(" let response = mailer.send(&email)?;\n");
2504 code.push_str(" tracing::info!(\"Email sent successfully\");\n");
2505
2506 code.push_str("\n let result = serde_json::json!({\n");
2508 code.push_str(" \"success\": true,\n");
2509 code.push_str(" \"message_id\": response.message().next().map(|s| s.to_string())\n");
2510 code.push_str(" });\n");
2511 code.push_str(&format!(
2512 " state.insert(\"{}\".to_string(), result.clone());\n",
2513 config.standard.mapping.output_key
2514 ));
2515 code.push_str(" Ok(result)\n");
2516 } else {
2517 code.push_str(" // No SMTP configuration provided\n");
2518 code.push_str(" Ok(serde_json::Value::Null)\n");
2519 }
2520
2521 code
2522}
2523
2524impl ActionNodeCodeGen for NotificationNodeConfig {
2529 fn generate_code(&self, node_id: &str) -> String {
2530 let mut code = String::new();
2531
2532 code.push_str(&format!("// Notification Node: {}\n", self.standard.name));
2533 code.push_str(&format!("async fn {}_notification(\n", node_id));
2534 code.push_str(" state: &mut State,\n");
2535 code.push_str(" client: &reqwest::Client,\n");
2536 code.push_str(") -> Result<serde_json::Value, ActionError> {\n");
2537
2538 code.push_str(&format!(
2540 " let webhook_url = interpolate_variables(\"{}\", state);\n",
2541 self.webhook_url.replace('"', "\\\"")
2542 ));
2543 code.push_str(" tracing::debug!(channel = \"{}\", \"Sending notification\");\n\n");
2544
2545 match self.channel {
2547 NotificationChannel::Slack => {
2548 code.push_str(&generate_slack_payload(self));
2549 }
2550 NotificationChannel::Discord => {
2551 code.push_str(&generate_discord_payload(self));
2552 }
2553 NotificationChannel::Teams => {
2554 code.push_str(&generate_teams_payload(self));
2555 }
2556 NotificationChannel::Webhook => {
2557 code.push_str(&generate_generic_webhook_payload(self));
2558 }
2559 }
2560
2561 code.push_str("\n // Send notification\n");
2563 code.push_str(" let response = client.post(&webhook_url)\n");
2564 code.push_str(" .header(\"Content-Type\", \"application/json\")\n");
2565 code.push_str(" .json(&payload)\n");
2566 code.push_str(" .send()\n");
2567 code.push_str(" .await\n");
2568 code.push_str(" .map_err(|e| ActionError::NotificationSend(e.to_string()))?;\n\n");
2569
2570 code.push_str(" let status = response.status();\n");
2572 code.push_str(" if !status.is_success() {\n");
2573 code.push_str(" let error_body = response.text().await.unwrap_or_default();\n");
2574 code.push_str(" return Err(ActionError::NotificationSend(format!(\n");
2575 code.push_str(" \"Notification failed with status {}: {}\",\n");
2576 code.push_str(" status, error_body\n");
2577 code.push_str(" )));\n");
2578 code.push_str(" }\n\n");
2579
2580 code.push_str(" let result = serde_json::json!({\n");
2582 code.push_str(" \"success\": true,\n");
2583 code.push_str(&format!(" \"channel\": \"{:?}\",\n", self.channel));
2584 code.push_str(" \"status\": status.as_u16()\n");
2585 code.push_str(" });\n");
2586 code.push_str(&format!(
2587 " state.insert(\"{}\".to_string(), result.clone());\n",
2588 self.standard.mapping.output_key
2589 ));
2590 code.push_str(" Ok(result)\n");
2591 code.push_str("}\n\n");
2592
2593 code
2594 }
2595
2596 fn required_imports(&self) -> Vec<&'static str> {
2597 vec!["reqwest", "serde_json"]
2598 }
2599
2600 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
2601 vec![
2602 ("reqwest", "{ version = \"0.12\", features = [\"json\"] }"),
2603 ("serde_json", "1"),
2604 ]
2605 }
2606}
2607
2608fn generate_slack_payload(config: &NotificationNodeConfig) -> String {
2609 let mut code = String::new();
2610
2611 code.push_str(" // Build Slack payload\n");
2612 code.push_str(&format!(
2613 " let text = interpolate_variables(\"{}\", state);\n",
2614 config
2615 .message
2616 .text
2617 .replace('"', "\\\"")
2618 .replace('\n', "\\n")
2619 ));
2620
2621 if let Some(blocks) = &config.message.blocks {
2623 if !blocks.is_empty() {
2624 code.push_str(" let blocks = serde_json::json!(");
2625 code.push_str(&serde_json::to_string(blocks).unwrap_or_else(|_| "[]".to_string()));
2626 code.push_str(");\n");
2627 code.push_str(" let mut payload = serde_json::json!({\n");
2628 code.push_str(" \"text\": text,\n");
2629 code.push_str(" \"blocks\": blocks\n");
2630 code.push_str(" });\n");
2631 } else {
2632 code.push_str(" let mut payload = serde_json::json!({ \"text\": text });\n");
2633 }
2634 } else {
2635 match config.message.format {
2637 MessageFormat::Markdown => {
2638 code.push_str(" // Slack uses mrkdwn format\n");
2639 code.push_str(" let mut payload = serde_json::json!({\n");
2640 code.push_str(" \"text\": text,\n");
2641 code.push_str(" \"mrkdwn\": true\n");
2642 code.push_str(" });\n");
2643 }
2644 _ => {
2645 code.push_str(" let mut payload = serde_json::json!({ \"text\": text });\n");
2646 }
2647 }
2648 }
2649
2650 if let Some(username) = &config.username {
2652 code.push_str(&format!(
2653 " payload[\"username\"] = serde_json::json!(interpolate_variables(\"{}\", state));\n",
2654 username.replace('"', "\\\"")
2655 ));
2656 }
2657
2658 if let Some(icon_url) = &config.icon_url {
2659 code.push_str(&format!(
2660 " payload[\"icon_url\"] = serde_json::json!(interpolate_variables(\"{}\", state));\n",
2661 icon_url.replace('"', "\\\"")
2662 ));
2663 }
2664
2665 if let Some(channel) = &config.target_channel {
2666 code.push_str(&format!(
2667 " payload[\"channel\"] = serde_json::json!(interpolate_variables(\"{}\", state));\n",
2668 channel.replace('"', "\\\"")
2669 ));
2670 }
2671
2672 code
2673}
2674
2675fn generate_discord_payload(config: &NotificationNodeConfig) -> String {
2676 let mut code = String::new();
2677
2678 code.push_str(" // Build Discord payload\n");
2679 code.push_str(&format!(
2680 " let content = interpolate_variables(\"{}\", state);\n",
2681 config
2682 .message
2683 .text
2684 .replace('"', "\\\"")
2685 .replace('\n', "\\n")
2686 ));
2687
2688 if let Some(blocks) = &config.message.blocks {
2690 if !blocks.is_empty() {
2691 code.push_str(" let embeds = serde_json::json!(");
2692 code.push_str(&serde_json::to_string(blocks).unwrap_or_else(|_| "[]".to_string()));
2693 code.push_str(");\n");
2694 code.push_str(" let mut payload = serde_json::json!({\n");
2695 code.push_str(" \"content\": content,\n");
2696 code.push_str(" \"embeds\": embeds\n");
2697 code.push_str(" });\n");
2698 } else {
2699 code.push_str(" let mut payload = serde_json::json!({ \"content\": content });\n");
2700 }
2701 } else {
2702 code.push_str(" let mut payload = serde_json::json!({ \"content\": content });\n");
2703 }
2704
2705 if let Some(username) = &config.username {
2707 code.push_str(&format!(
2708 " payload[\"username\"] = serde_json::json!(interpolate_variables(\"{}\", state));\n",
2709 username.replace('"', "\\\"")
2710 ));
2711 }
2712
2713 if let Some(icon_url) = &config.icon_url {
2714 code.push_str(&format!(
2715 " payload[\"avatar_url\"] = serde_json::json!(interpolate_variables(\"{}\", state));\n",
2716 icon_url.replace('"', "\\\"")
2717 ));
2718 }
2719
2720 code
2721}
2722
2723fn generate_teams_payload(config: &NotificationNodeConfig) -> String {
2724 let mut code = String::new();
2725
2726 code.push_str(" // Build Microsoft Teams payload (Adaptive Card format)\n");
2727 code.push_str(&format!(
2728 " let text = interpolate_variables(\"{}\", state);\n",
2729 config
2730 .message
2731 .text
2732 .replace('"', "\\\"")
2733 .replace('\n', "\\n")
2734 ));
2735
2736 if let Some(blocks) = &config.message.blocks {
2738 if !blocks.is_empty() {
2739 code.push_str(" // Using custom Adaptive Card\n");
2740 code.push_str(" let payload = serde_json::json!(");
2741 code.push_str(&serde_json::to_string(blocks).unwrap_or_else(|_| "{}".to_string()));
2742 code.push_str(");\n");
2743 } else {
2744 code.push_str(&generate_teams_simple_card());
2745 }
2746 } else {
2747 code.push_str(&generate_teams_simple_card());
2748 }
2749
2750 code
2751}
2752
2753fn generate_teams_simple_card() -> String {
2754 let mut code = String::new();
2755
2756 code.push_str(" // Simple message card format\n");
2757 code.push_str(" let payload = serde_json::json!({\n");
2758 code.push_str(" \"@type\": \"MessageCard\",\n");
2759 code.push_str(" \"@context\": \"http://schema.org/extensions\",\n");
2760 code.push_str(" \"summary\": &text,\n");
2761 code.push_str(" \"sections\": [{\n");
2762 code.push_str(" \"activityTitle\": \"Notification\",\n");
2763 code.push_str(" \"text\": &text\n");
2764 code.push_str(" }]\n");
2765 code.push_str(" });\n");
2766
2767 code
2768}
2769
2770fn generate_generic_webhook_payload(config: &NotificationNodeConfig) -> String {
2771 let mut code = String::new();
2772
2773 code.push_str(" // Build generic webhook payload\n");
2774 code.push_str(&format!(
2775 " let message = interpolate_variables(\"{}\", state);\n",
2776 config
2777 .message
2778 .text
2779 .replace('"', "\\\"")
2780 .replace('\n', "\\n")
2781 ));
2782
2783 if let Some(blocks) = &config.message.blocks {
2785 if !blocks.is_empty() {
2786 code.push_str(" // Using custom payload structure\n");
2787 code.push_str(" let payload = serde_json::json!(");
2788 code.push_str(&serde_json::to_string(blocks).unwrap_or_else(|_| "{}".to_string()));
2789 code.push_str(");\n");
2790 } else {
2791 code.push_str(" let payload = serde_json::json!({\n");
2792 code.push_str(" \"message\": message,\n");
2793 code.push_str(" \"timestamp\": chrono::Utc::now().to_rfc3339()\n");
2794 code.push_str(" });\n");
2795 }
2796 } else {
2797 code.push_str(" let payload = serde_json::json!({\n");
2798 code.push_str(" \"message\": message,\n");
2799 code.push_str(" \"timestamp\": chrono::Utc::now().to_rfc3339()\n");
2800 code.push_str(" });\n");
2801 }
2802
2803 code
2804}
2805
2806impl ActionNodeCodeGen for RssNodeConfig {
2811 fn generate_code(&self, node_id: &str) -> String {
2812 let mut code = String::new();
2813
2814 code.push_str(&format!("// RSS/Feed Node: {}\n", self.standard.name));
2815 code.push_str(&format!("async fn {}_rss(\n", node_id));
2816 code.push_str(" state: &mut State,\n");
2817 code.push_str(" client: &reqwest::Client,\n");
2818 code.push_str(") -> Result<serde_json::Value, ActionError> {\n");
2819
2820 code.push_str(&format!(
2822 " let feed_url = interpolate_variables(\"{}\", state);\n",
2823 self.feed_url.replace('"', "\\\"")
2824 ));
2825 code.push_str(" tracing::debug!(url = %feed_url, \"Fetching RSS feed\");\n\n");
2826
2827 code.push_str(" // Fetch feed content\n");
2829 code.push_str(" let response = client.get(&feed_url)\n");
2830 code.push_str(" .header(\"User-Agent\", \"ADK-Studio-RSS/1.0\")\n");
2831 code.push_str(" .send()\n");
2832 code.push_str(" .await\n");
2833 code.push_str(" .map_err(|e| ActionError::RssFetch(e.to_string()))?;\n\n");
2834
2835 code.push_str(" if !response.status().is_success() {\n");
2836 code.push_str(" return Err(ActionError::RssFetch(format!(\n");
2837 code.push_str(" \"Feed returned status {}\", response.status()\n");
2838 code.push_str(" )));\n");
2839 code.push_str(" }\n\n");
2840
2841 code.push_str(" let content = response.bytes().await\n");
2842 code.push_str(" .map_err(|e| ActionError::RssFetch(e.to_string()))?;\n\n");
2843
2844 code.push_str(" // Parse feed using feed-rs\n");
2846 code.push_str(" let feed = feed_rs::parser::parse(&content[..])\n");
2847 code.push_str(" .map_err(|e| ActionError::RssParse(e.to_string()))?;\n\n");
2848
2849 if let Some(tracking) = &self.seen_tracking {
2851 if tracking.enabled {
2852 code.push_str(" // Load seen items for deduplication\n");
2853 code.push_str(&format!(" let seen_key = \"{}\";\n", tracking.state_key));
2854 code.push_str(
2855 " let mut seen_items: std::collections::HashSet<String> = state\n",
2856 );
2857 code.push_str(" .get(seen_key)\n");
2858 code.push_str(" .and_then(|v| v.as_array())\n");
2859 code.push_str(" .map(|arr| arr.iter().filter_map(|v| v.as_str().map(String::from)).collect())\n");
2860 code.push_str(" .unwrap_or_default();\n\n");
2861 }
2862 }
2863
2864 code.push_str(" // Process feed entries\n");
2866 code.push_str(" let mut entries = Vec::new();\n");
2867 code.push_str(" for entry in feed.entries.iter() {\n");
2868
2869 if let Some(tracking) = &self.seen_tracking {
2871 if tracking.enabled {
2872 code.push_str(" // Skip already seen items\n");
2873 code.push_str(" let entry_id = entry.id.clone();\n");
2874 code.push_str(" if seen_items.contains(&entry_id) {\n");
2875 code.push_str(" continue;\n");
2876 code.push_str(" }\n\n");
2877 }
2878 }
2879
2880 if let Some(filters) = &self.filters {
2882 code.push_str(&generate_rss_filters(filters));
2883 }
2884
2885 code.push_str(" // Build entry data\n");
2887 code.push_str(" let entry_data = serde_json::json!({\n");
2888 code.push_str(" \"id\": entry.id,\n");
2889 code.push_str(" \"title\": entry.title.as_ref().map(|t| t.content.clone()),\n");
2890 code.push_str(" \"link\": entry.links.first().map(|l| l.href.clone()),\n");
2891 code.push_str(" \"published\": entry.published.map(|d| d.to_rfc3339()),\n");
2892 code.push_str(" \"updated\": entry.updated.map(|d| d.to_rfc3339()),\n");
2893 code.push_str(
2894 " \"summary\": entry.summary.as_ref().map(|s| s.content.clone()),\n",
2895 );
2896
2897 if self.include_content {
2898 code.push_str(
2899 " \"content\": entry.content.as_ref().map(|c| c.body.clone()),\n",
2900 );
2901 }
2902
2903 code.push_str(
2904 " \"authors\": entry.authors.iter().map(|a| serde_json::json!({\n",
2905 );
2906 code.push_str(" \"name\": a.name.clone(),\n");
2907 code.push_str(" \"email\": a.email.clone(),\n");
2908 code.push_str(" \"uri\": a.uri.clone()\n");
2909 code.push_str(" })).collect::<Vec<_>>(),\n");
2910 code.push_str(" \"categories\": entry.categories.iter().map(|c| c.term.clone()).collect::<Vec<_>>(),\n");
2911
2912 if self.parse_media {
2913 code.push_str(
2914 " \"media\": entry.media.iter().map(|m| serde_json::json!({\n",
2915 );
2916 code.push_str(
2917 " \"title\": m.title.as_ref().map(|t| t.content.clone()),\n",
2918 );
2919 code.push_str(
2920 " \"content\": m.content.iter().map(|c| serde_json::json!({\n",
2921 );
2922 code.push_str(" \"url\": c.url.as_ref().map(|u| u.to_string()),\n");
2923 code.push_str(" \"content_type\": c.content_type.as_ref().map(|t| t.to_string()),\n");
2924 code.push_str(" \"size\": c.size\n");
2925 code.push_str(" })).collect::<Vec<_>>(),\n");
2926 code.push_str(
2927 " \"thumbnails\": m.thumbnails.iter().map(|t| serde_json::json!({\n",
2928 );
2929 code.push_str(" \"url\": t.image.uri.clone(),\n");
2930 code.push_str(" \"width\": t.image.width,\n");
2931 code.push_str(" \"height\": t.image.height\n");
2932 code.push_str(" })).collect::<Vec<_>>()\n");
2933 code.push_str(" })).collect::<Vec<_>>(),\n");
2934 }
2935
2936 code.push_str(" });\n\n");
2937
2938 code.push_str(" entries.push(entry_data);\n");
2939
2940 if let Some(tracking) = &self.seen_tracking {
2942 if tracking.enabled {
2943 code.push_str(" seen_items.insert(entry_id);\n");
2944 }
2945 }
2946
2947 if let Some(max) = self.max_entries {
2949 code.push_str(&format!("\n // Limit to {} entries\n", max));
2950 code.push_str(&format!(" if entries.len() >= {} {{\n", max));
2951 code.push_str(" break;\n");
2952 code.push_str(" }\n");
2953 }
2954
2955 code.push_str(" }\n\n");
2956
2957 if let Some(tracking) = &self.seen_tracking {
2959 if tracking.enabled {
2960 code.push_str(" // Update seen items in state (with max limit)\n");
2961 code.push_str(&format!(" let max_seen = {};\n", tracking.max_items));
2962 code.push_str(" let seen_vec: Vec<String> = seen_items.into_iter()\n");
2963 code.push_str(" .take(max_seen as usize)\n");
2964 code.push_str(" .collect();\n");
2965 code.push_str(
2966 " state.insert(seen_key.to_string(), serde_json::json!(seen_vec));\n\n",
2967 );
2968 }
2969 }
2970
2971 code.push_str(" // Build result\n");
2973 code.push_str(" let result = serde_json::json!({\n");
2974 code.push_str(" \"feed\": {\n");
2975 code.push_str(" \"title\": feed.title.as_ref().map(|t| t.content.clone()),\n");
2976 code.push_str(
2977 " \"description\": feed.description.as_ref().map(|d| d.content.clone()),\n",
2978 );
2979 code.push_str(" \"link\": feed.links.first().map(|l| l.href.clone()),\n");
2980 code.push_str(" \"updated\": feed.updated.map(|d| d.to_rfc3339()),\n");
2981 code.push_str(" \"language\": feed.language.clone()\n");
2982 code.push_str(" },\n");
2983 code.push_str(" \"count\": entries.len(),\n");
2984 code.push_str(" \"entries\": entries\n");
2985 code.push_str(" });\n\n");
2986
2987 code.push_str(&format!(
2988 " state.insert(\"{}\".to_string(), result.clone());\n",
2989 self.standard.mapping.output_key
2990 ));
2991 code.push_str(
2992 " tracing::info!(count = entries.len(), \"Processed RSS feed entries\");\n",
2993 );
2994 code.push_str(" Ok(result)\n");
2995 code.push_str("}\n\n");
2996
2997 code
2998 }
2999
3000 fn required_imports(&self) -> Vec<&'static str> {
3001 vec!["reqwest", "feed_rs", "serde_json", "chrono"]
3002 }
3003
3004 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
3005 vec![
3006 ("reqwest", "{ version = \"0.12\", features = [\"json\"] }"),
3007 ("feed-rs", "2"),
3008 ("serde_json", "1"),
3009 ("chrono", "0.4"),
3010 ]
3011 }
3012}
3013
3014fn generate_rss_filters(filters: &FeedFilter) -> String {
3015 let mut code = String::new();
3016
3017 if let Some(keywords) = &filters.keywords {
3019 if !keywords.is_empty() {
3020 code.push_str(" // Keyword filter\n");
3021 code.push_str(" let title_text = entry.title.as_ref().map(|t| t.content.to_lowercase()).unwrap_or_default();\n");
3022 code.push_str(" let summary_text = entry.summary.as_ref().map(|s| s.content.to_lowercase()).unwrap_or_default();\n");
3023 code.push_str(" let keywords = vec![");
3024 for (i, kw) in keywords.iter().enumerate() {
3025 if i > 0 {
3026 code.push_str(", ");
3027 }
3028 code.push_str(&format!("\"{}\"", kw.to_lowercase().replace('"', "\\\"")));
3029 }
3030 code.push_str("];\n");
3031 code.push_str(" let has_keyword = keywords.iter().any(|kw| {\n");
3032 code.push_str(" title_text.contains(kw) || summary_text.contains(kw)\n");
3033 code.push_str(" });\n");
3034 code.push_str(" if !has_keyword {\n");
3035 code.push_str(" continue;\n");
3036 code.push_str(" }\n\n");
3037 }
3038 }
3039
3040 if let Some(author) = &filters.author {
3042 code.push_str(" // Author filter\n");
3043 code.push_str(&format!(
3044 " let author_filter = \"{}\".to_lowercase();\n",
3045 author.to_lowercase().replace('"', "\\\"")
3046 ));
3047 code.push_str(" let has_author = entry.authors.iter().any(|a| {\n");
3048 code.push_str(" a.name.to_lowercase().contains(&author_filter)\n");
3049 code.push_str(" });\n");
3050 code.push_str(" if !has_author {\n");
3051 code.push_str(" continue;\n");
3052 code.push_str(" }\n\n");
3053 }
3054
3055 if let Some(date_from) = &filters.date_from {
3057 code.push_str(" // Date from filter\n");
3058 code.push_str(&format!(
3059 " let date_from = chrono::DateTime::parse_from_rfc3339(\"{}\")\n",
3060 date_from
3061 ));
3062 code.push_str(" .map(|d| d.with_timezone(&chrono::Utc))\n");
3063 code.push_str(" .ok();\n");
3064 code.push_str(" if let Some(from) = date_from {\n");
3065 code.push_str(" let entry_date = entry.published.or(entry.updated);\n");
3066 code.push_str(" if let Some(ed) = entry_date {\n");
3067 code.push_str(" if ed < from {\n");
3068 code.push_str(" continue;\n");
3069 code.push_str(" }\n");
3070 code.push_str(" }\n");
3071 code.push_str(" }\n\n");
3072 }
3073
3074 if let Some(date_to) = &filters.date_to {
3076 code.push_str(" // Date to filter\n");
3077 code.push_str(&format!(
3078 " let date_to = chrono::DateTime::parse_from_rfc3339(\"{}\")\n",
3079 date_to
3080 ));
3081 code.push_str(" .map(|d| d.with_timezone(&chrono::Utc))\n");
3082 code.push_str(" .ok();\n");
3083 code.push_str(" if let Some(to) = date_to {\n");
3084 code.push_str(" let entry_date = entry.published.or(entry.updated);\n");
3085 code.push_str(" if let Some(ed) = entry_date {\n");
3086 code.push_str(" if ed > to {\n");
3087 code.push_str(" continue;\n");
3088 code.push_str(" }\n");
3089 code.push_str(" }\n");
3090 code.push_str(" }\n\n");
3091 }
3092
3093 if let Some(categories) = &filters.categories {
3095 if !categories.is_empty() {
3096 code.push_str(" // Category filter\n");
3097 code.push_str(" let category_filters: Vec<String> = vec![");
3098 for (i, cat) in categories.iter().enumerate() {
3099 if i > 0 {
3100 code.push_str(", ");
3101 }
3102 code.push_str(&format!("\"{}\"", cat.to_lowercase().replace('"', "\\\"")));
3103 }
3104 code.push_str("].into_iter().map(String::from).collect();\n");
3105 code.push_str(" let entry_categories: Vec<String> = entry.categories.iter()\n");
3106 code.push_str(" .map(|c| c.term.to_lowercase())\n");
3107 code.push_str(" .collect();\n");
3108 code.push_str(" let has_category = category_filters.iter().any(|cf| {\n");
3109 code.push_str(" entry_categories.iter().any(|ec| ec.contains(cf))\n");
3110 code.push_str(" });\n");
3111 code.push_str(" if !has_category {\n");
3112 code.push_str(" continue;\n");
3113 code.push_str(" }\n\n");
3114 }
3115 }
3116
3117 code
3118}
3119
3120impl ActionNodeCodeGen for FileNodeConfig {
3125 fn generate_code(&self, node_id: &str) -> String {
3126 let mut code = String::new();
3127
3128 code.push_str(&format!("// File Node: {}\n", self.standard.name));
3129 code.push_str(&format!("async fn {}_file(\n", node_id));
3130 code.push_str(" state: &mut State,\n");
3131 code.push_str(") -> Result<serde_json::Value, ActionError> {\n");
3132
3133 match self.operation {
3134 FileOperation::Read => {
3135 code.push_str(" // Read file operation\n");
3136 if let Some(local) = &self.local {
3137 code.push_str(&format!(
3138 " let path = interpolate_variables(\"{}\", state);\n",
3139 local.path.replace('"', "\\\"")
3140 ));
3141 code.push_str(" tracing::debug!(path = %path, \"Reading file\");\n\n");
3142 code.push_str(" let content = tokio::fs::read_to_string(&path).await\n");
3143 code.push_str(
3144 " .map_err(|e| ActionError::FileRead(e.to_string()))?;\n\n",
3145 );
3146
3147 if let Some(parse) = &self.parse {
3149 match parse.format {
3150 FileFormat::Json => {
3151 code.push_str(" let parsed: serde_json::Value = serde_json::from_str(&content)\n");
3152 code.push_str(" .map_err(|e| ActionError::FileParse(e.to_string()))?;\n");
3153 }
3154 FileFormat::Csv => {
3155 code.push_str(" // Parse CSV content\n");
3156 code.push_str(" let mut reader = csv::Reader::from_reader(content.as_bytes());\n");
3157 code.push_str(" let records: Vec<serde_json::Value> = reader.deserialize()\n");
3158 code.push_str(" .filter_map(|r: Result<serde_json::Value, _>| r.ok())\n");
3159 code.push_str(" .collect();\n");
3160 code.push_str(" let parsed = serde_json::json!(records);\n");
3161 }
3162 FileFormat::Xml => {
3163 code.push_str(" // XML parsing - convert to JSON\n");
3164 code.push_str(" let parsed = serde_json::json!({ \"content\": content });\n");
3165 }
3166 FileFormat::Text | FileFormat::Binary => {
3167 code.push_str(" let parsed = serde_json::json!({ \"content\": content });\n");
3168 }
3169 }
3170 } else {
3171 code.push_str(
3172 " let parsed = serde_json::json!({ \"content\": content });\n",
3173 );
3174 }
3175
3176 code.push_str("\n let result = serde_json::json!({\n");
3177 code.push_str(" \"path\": path,\n");
3178 code.push_str(" \"data\": parsed\n");
3179 code.push_str(" });\n");
3180 } else {
3181 code.push_str(" let result = serde_json::json!({ \"error\": \"No file path configured\" });\n");
3182 }
3183 }
3184 FileOperation::Write => {
3185 code.push_str(" // Write file operation\n");
3186 if let Some(local) = &self.local {
3187 code.push_str(&format!(
3188 " let path = interpolate_variables(\"{}\", state);\n",
3189 local.path.replace('"', "\\\"")
3190 ));
3191
3192 if let Some(write) = &self.write {
3193 code.push_str(&format!(
3194 " let content = interpolate_variables(\"{}\", state);\n",
3195 write.content.replace('"', "\\\"").replace('\n', "\\n")
3196 ));
3197 } else {
3198 code.push_str(" let content = String::new();\n");
3199 }
3200
3201 code.push_str(" tracing::debug!(path = %path, \"Writing file\");\n\n");
3202 code.push_str(" tokio::fs::write(&path, &content).await\n");
3203 code.push_str(
3204 " .map_err(|e| ActionError::FileWrite(e.to_string()))?;\n\n",
3205 );
3206 code.push_str(" let result = serde_json::json!({\n");
3207 code.push_str(" \"path\": path,\n");
3208 code.push_str(" \"bytes_written\": content.len()\n");
3209 code.push_str(" });\n");
3210 } else {
3211 code.push_str(" let result = serde_json::json!({ \"error\": \"No file path configured\" });\n");
3212 }
3213 }
3214 FileOperation::Delete => {
3215 code.push_str(" // Delete file operation\n");
3216 if let Some(local) = &self.local {
3217 code.push_str(&format!(
3218 " let path = interpolate_variables(\"{}\", state);\n",
3219 local.path.replace('"', "\\\"")
3220 ));
3221 code.push_str(" tracing::debug!(path = %path, \"Deleting file\");\n\n");
3222 code.push_str(" tokio::fs::remove_file(&path).await\n");
3223 code.push_str(
3224 " .map_err(|e| ActionError::FileDelete(e.to_string()))?;\n\n",
3225 );
3226 code.push_str(" let result = serde_json::json!({\n");
3227 code.push_str(" \"path\": path,\n");
3228 code.push_str(" \"deleted\": true\n");
3229 code.push_str(" });\n");
3230 } else {
3231 code.push_str(" let result = serde_json::json!({ \"error\": \"No file path configured\" });\n");
3232 }
3233 }
3234 FileOperation::List => {
3235 code.push_str(" // List files operation\n");
3236 if let Some(local) = &self.local {
3237 code.push_str(&format!(
3238 " let path = interpolate_variables(\"{}\", state);\n",
3239 local.path.replace('"', "\\\"")
3240 ));
3241 code.push_str(" tracing::debug!(path = %path, \"Listing directory\");\n\n");
3242 code.push_str(" let mut entries = Vec::new();\n");
3243 code.push_str(" let mut dir = tokio::fs::read_dir(&path).await\n");
3244 code.push_str(
3245 " .map_err(|e| ActionError::FileRead(e.to_string()))?;\n\n",
3246 );
3247 code.push_str(" while let Some(entry) = dir.next_entry().await\n");
3248 code.push_str(
3249 " .map_err(|e| ActionError::FileRead(e.to_string()))? {\n",
3250 );
3251 code.push_str(" let metadata = entry.metadata().await.ok();\n");
3252 code.push_str(" entries.push(serde_json::json!({\n");
3253 code.push_str(" \"name\": entry.file_name().to_string_lossy(),\n");
3254 code.push_str(" \"path\": entry.path().to_string_lossy(),\n");
3255 code.push_str(
3256 " \"is_file\": metadata.as_ref().map(|m| m.is_file()),\n",
3257 );
3258 code.push_str(
3259 " \"is_dir\": metadata.as_ref().map(|m| m.is_dir()),\n",
3260 );
3261 code.push_str(" \"size\": metadata.as_ref().map(|m| m.len())\n");
3262 code.push_str(" }));\n");
3263 code.push_str(" }\n\n");
3264 code.push_str(" let result = serde_json::json!({\n");
3265 code.push_str(" \"path\": path,\n");
3266 code.push_str(" \"count\": entries.len(),\n");
3267 code.push_str(" \"entries\": entries\n");
3268 code.push_str(" });\n");
3269 } else {
3270 code.push_str(" let result = serde_json::json!({ \"error\": \"No directory path configured\" });\n");
3271 }
3272 }
3273 }
3274
3275 code.push_str(&format!(
3276 "\n state.insert(\"{}\".to_string(), result.clone());\n",
3277 self.standard.mapping.output_key
3278 ));
3279 code.push_str(" Ok(result)\n");
3280 code.push_str("}\n\n");
3281
3282 code
3283 }
3284
3285 fn required_imports(&self) -> Vec<&'static str> {
3286 vec!["tokio", "serde_json"]
3287 }
3288
3289 fn required_dependencies(&self) -> Vec<(&'static str, &'static str)> {
3290 let mut deps = vec![
3291 ("tokio", "{ version = \"1\", features = [\"full\"] }"),
3292 ("serde_json", "1"),
3293 ];
3294
3295 if let Some(parse) = &self.parse {
3297 if parse.format == FileFormat::Csv {
3298 deps.push(("csv", "1"))
3299 }
3300 }
3301
3302 deps
3303 }
3304}
3305
3306pub fn generate_action_nodes_code(action_nodes: &HashMap<String, ActionNodeConfig>) -> String {
3312 let mut code = String::new();
3313
3314 code.push_str("// Action Nodes - Generated Code\n");
3316 code.push_str("// This code was generated by ADK Studio\n\n");
3317
3318 let mut imports: std::collections::HashSet<&str> = std::collections::HashSet::new();
3320 imports.insert("serde_json");
3321 imports.insert("tracing");
3322
3323 for node in action_nodes.values() {
3324 match node {
3325 ActionNodeConfig::Trigger(n) => imports.extend(n.required_imports()),
3326 ActionNodeConfig::Http(n) => imports.extend(n.required_imports()),
3327 ActionNodeConfig::Set(n) => imports.extend(n.required_imports()),
3328 ActionNodeConfig::Transform(n) => imports.extend(n.required_imports()),
3329 ActionNodeConfig::Switch(n) => imports.extend(n.required_imports()),
3330 ActionNodeConfig::Loop(n) => imports.extend(n.required_imports()),
3331 ActionNodeConfig::Merge(n) => imports.extend(n.required_imports()),
3332 ActionNodeConfig::Wait(n) => imports.extend(n.required_imports()),
3333 ActionNodeConfig::Code(n) => imports.extend(n.required_imports()),
3334 ActionNodeConfig::Database(n) => imports.extend(n.required_imports()),
3335 ActionNodeConfig::Email(n) => imports.extend(n.required_imports()),
3336 ActionNodeConfig::Notification(n) => imports.extend(n.required_imports()),
3337 ActionNodeConfig::Rss(n) => imports.extend(n.required_imports()),
3338 ActionNodeConfig::File(n) => imports.extend(n.required_imports()),
3339 }
3340 }
3341
3342 code.push_str("use std::collections::HashMap;\n");
3344 code.push_str("use serde_json::json;\n");
3345 code.push_str("use tracing;\n\n");
3346
3347 code.push_str("type State = HashMap<String, serde_json::Value>;\n\n");
3349
3350 code.push_str(generate_interpolation_helper());
3352 code.push('\n');
3353
3354 for (node_id, node) in action_nodes {
3356 let node_code = match node {
3357 ActionNodeConfig::Trigger(n) => n.generate_code(node_id),
3358 ActionNodeConfig::Http(n) => n.generate_code(node_id),
3359 ActionNodeConfig::Set(n) => n.generate_code(node_id),
3360 ActionNodeConfig::Transform(n) => n.generate_code(node_id),
3361 ActionNodeConfig::Switch(n) => n.generate_code(node_id),
3362 ActionNodeConfig::Loop(n) => n.generate_code(node_id),
3363 ActionNodeConfig::Merge(n) => n.generate_code(node_id),
3364 ActionNodeConfig::Wait(n) => n.generate_code(node_id),
3365 ActionNodeConfig::Code(n) => n.generate_code(node_id),
3366 ActionNodeConfig::Database(n) => n.generate_code(node_id),
3367 ActionNodeConfig::Email(n) => n.generate_code(node_id),
3368 ActionNodeConfig::Notification(n) => n.generate_code(node_id),
3369 ActionNodeConfig::Rss(n) => n.generate_code(node_id),
3370 ActionNodeConfig::File(n) => n.generate_code(node_id),
3371 };
3372 code.push_str(&node_code);
3373 }
3374
3375 code
3376}
3377
3378pub fn collect_action_node_dependencies(
3380 action_nodes: &HashMap<String, ActionNodeConfig>,
3381) -> Vec<(String, String)> {
3382 let mut deps: HashMap<String, String> = HashMap::new();
3383
3384 deps.insert("serde_json".to_string(), "1".to_string());
3386 deps.insert("tracing".to_string(), "0.1".to_string());
3387 deps.insert(
3388 "tokio".to_string(),
3389 "{ version = \"1\", features = [\"full\"] }".to_string(),
3390 );
3391 deps.insert("regex".to_string(), "1".to_string());
3392
3393 for node in action_nodes.values() {
3394 let node_deps: Vec<(&str, &str)> = match node {
3395 ActionNodeConfig::Trigger(n) => n.required_dependencies(),
3396 ActionNodeConfig::Http(n) => n.required_dependencies(),
3397 ActionNodeConfig::Set(n) => n.required_dependencies(),
3398 ActionNodeConfig::Transform(n) => n.required_dependencies(),
3399 ActionNodeConfig::Switch(n) => n.required_dependencies(),
3400 ActionNodeConfig::Loop(n) => n.required_dependencies(),
3401 ActionNodeConfig::Merge(n) => n.required_dependencies(),
3402 ActionNodeConfig::Wait(n) => n.required_dependencies(),
3403 ActionNodeConfig::Code(n) => n.required_dependencies(),
3404 ActionNodeConfig::Database(n) => n.required_dependencies(),
3405 ActionNodeConfig::Email(n) => n.required_dependencies(),
3406 ActionNodeConfig::Notification(n) => n.required_dependencies(),
3407 ActionNodeConfig::Rss(n) => n.required_dependencies(),
3408 ActionNodeConfig::File(n) => n.required_dependencies(),
3409 };
3410
3411 for (name, version) in node_deps {
3412 deps.insert(name.to_string(), version.to_string());
3413 }
3414 }
3415
3416 deps.into_iter().collect()
3417}
3418
3419pub fn validate_generated_code(code: &str) -> Result<(), String> {
3421 let open_braces = code.matches('{').count();
3425 let close_braces = code.matches('}').count();
3426 if open_braces != close_braces {
3427 return Err(format!(
3428 "Unbalanced braces: {} open, {} close",
3429 open_braces, close_braces
3430 ));
3431 }
3432
3433 let open_parens = code.matches('(').count();
3435 let close_parens = code.matches(')').count();
3436 if open_parens != close_parens {
3437 return Err(format!(
3438 "Unbalanced parentheses: {} open, {} close",
3439 open_parens, close_parens
3440 ));
3441 }
3442
3443 if code.contains(";;") {
3445 return Err("Double semicolon found".to_string());
3446 }
3447
3448 Ok(())
3455}
3456
3457#[cfg(test)]
3458mod tests {
3459 use super::*;
3460
3461 #[test]
3462 fn test_trigger_node_codegen() {
3463 let config = TriggerNodeConfig {
3464 standard: StandardProperties {
3465 id: "trigger_1".to_string(),
3466 name: "Start".to_string(),
3467 ..Default::default()
3468 },
3469 trigger_type: TriggerType::Manual,
3470 manual: Some(ManualTriggerConfig::default()),
3471 webhook: None,
3472 schedule: None,
3473 event: None,
3474 };
3475
3476 let code = config.generate_code("trigger_1");
3477 assert!(code.contains("async fn trigger_1_trigger"));
3478 assert!(code.contains("Manual trigger"));
3479 }
3480
3481 #[test]
3482 fn test_http_node_codegen() {
3483 let config = HttpNodeConfig {
3484 standard: StandardProperties {
3485 id: "http_1".to_string(),
3486 name: "API Call".to_string(),
3487 mapping: InputOutputMapping {
3488 output_key: "api_result".to_string(),
3489 ..Default::default()
3490 },
3491 ..Default::default()
3492 },
3493 method: HttpMethod::Get,
3494 url: "https://api.example.com/data".to_string(),
3495 auth: HttpAuth {
3496 auth_type: "none".to_string(),
3497 bearer: None,
3498 basic: None,
3499 api_key: None,
3500 },
3501 headers: HashMap::new(),
3502 body: HttpBody {
3503 body_type: "none".to_string(),
3504 content: None,
3505 },
3506 response: HttpResponse {
3507 response_type: "json".to_string(),
3508 status_validation: Some("200-299".to_string()),
3509 json_path: None,
3510 },
3511 rate_limit: None,
3512 };
3513
3514 let code = config.generate_code("http_1");
3515 assert!(code.contains("async fn http_1_http"));
3516 assert!(code.contains("client.get"));
3517 assert!(code.contains("api.example.com"));
3518 }
3519
3520 #[test]
3521 fn test_switch_node_codegen() {
3522 let config = SwitchNodeConfig {
3523 standard: StandardProperties {
3524 id: "switch_1".to_string(),
3525 name: "Router".to_string(),
3526 ..Default::default()
3527 },
3528 evaluation_mode: EvaluationMode::FirstMatch,
3529 conditions: vec![SwitchCondition {
3530 id: "cond_1".to_string(),
3531 name: "High".to_string(),
3532 field: "score".to_string(),
3533 operator: "gt".to_string(),
3534 value: Some(serde_json::json!(80)),
3535 output_port: "high".to_string(),
3536 }],
3537 default_branch: Some("default".to_string()),
3538 expression_mode: None,
3539 };
3540
3541 let code = config.generate_code("switch_1");
3542 assert!(code.contains("async fn switch_1_switch"));
3543 assert!(code.contains("First match"));
3544 assert!(code.contains("\"high\""));
3545 }
3546
3547 #[test]
3548 fn test_validate_generated_code() {
3549 let valid_code = r#"
3550async fn test() {
3551 let x = 1;
3552 something().await;
3553}
3554"#;
3555 assert!(validate_generated_code(valid_code).is_ok());
3556
3557 let unbalanced = "fn test() { { }";
3558 assert!(validate_generated_code(unbalanced).is_err());
3559 }
3560
3561 #[test]
3562 fn test_condition_comparison_generation() {
3563 assert!(generate_condition_comparison("eq", &Some(serde_json::json!(5))).contains("=="));
3564 assert!(generate_condition_comparison("gt", &Some(serde_json::json!(10))).contains(">"));
3565 assert!(
3566 generate_condition_comparison("contains", &Some(serde_json::json!("test")))
3567 .contains("contains")
3568 );
3569 }
3570}