1
2use async_trait::async_trait;
3use serde::{Serialize, Deserialize};
4use serde_json::Value;
5use tracing::{info, debug};
6use std::sync::Arc;
7
8use crate::core::Context;
9use crate::rule::RuleResult;
10
11#[derive(Debug, Serialize, Deserialize, Clone, Default)]
12pub enum NodeType {
13 #[default]
14 RuleNode,
15 DBNode,
16 AINode,
17 GrpcNode,
18 SubgraphNode,
19 ConditionalNode,
20 LoopNode,
21 TryCatchNode,
22 RetryNode,
23 CircuitBreakerNode,
24}
25
/// Common interface implemented by every node type in this file.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait Node: Send + Sync {
 /// Returns the identifier of this node.
 fn id(&self) -> &str;
 /// Returns the [`NodeType`] variant describing this node.
 fn node_type(&self) -> NodeType;
 /// Executes the node against `ctx`, which it may read and modify, and
 /// returns the node's result.
 async fn run(&self, ctx: &mut Context) -> RuleResult;
}
32
/// A node that resolves a simple condition.
///
/// When run, the condition is either the literal `"true"` / `"false"` or the
/// name of a context entry whose value becomes the node's result.
#[derive(Debug, Clone)]
pub struct RuleNode {
    /// Identifier of this node.
    pub id: String,
    /// Literal `"true"` / `"false"`, or a context key to look up.
    pub condition: String,
}

impl RuleNode {
    /// Creates a rule node from anything convertible into owned strings.
    pub fn new(id: impl Into<String>, condition: impl Into<String>) -> Self {
        let id = id.into();
        let condition = condition.into();
        Self { id, condition }
    }
}
51
52#[async_trait]
53impl Node for RuleNode {
54 fn id(&self) -> &str {
55 &self.id
56 }
57
58 fn node_type(&self) -> NodeType {
59 NodeType::RuleNode
60 }
61
62 async fn run(&self, ctx: &mut Context) -> RuleResult {
63 info!("RuleNode[{}]: Evaluating condition '{}'", self.id, self.condition);
64
65 let result = if self.condition == "true" {
67 Value::Bool(true)
68 } else if self.condition == "false" {
69 Value::Bool(false)
70 } else {
71 ctx.data.get(&self.condition).cloned().unwrap_or(Value::Bool(true))
73 };
74
75 debug!("RuleNode[{}]: Result = {:?}", self.id, result);
76 ctx.data.insert(format!("{}_result", self.id), result.clone());
77
78 Ok(result)
79 }
80}
81
/// Abstraction over whatever actually runs a query for a `DBNode`.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait DatabaseExecutor: Send + Sync {
 /// Runs `query` with the given string parameters.
 ///
 /// Returns the result as a JSON value, or an error message on failure.
 async fn execute(&self, query: &str, params: &[&str]) -> Result<Value, String>;
}
92
93#[derive(Debug, Clone)]
95pub struct MockDatabaseExecutor;
96
97#[async_trait]
98impl DatabaseExecutor for MockDatabaseExecutor {
99 async fn execute(&self, query: &str, _params: &[&str]) -> Result<Value, String> {
100 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
102
103 Ok(serde_json::json!({
104 "query": query,
105 "rows": [
106 {"id": 1, "name": "Alice", "active": true},
107 {"id": 2, "name": "Bob", "active": false}
108 ],
109 "count": 2
110 }))
111 }
112}
113
114#[derive(Clone)]
115pub struct DBNode {
116 pub id: String,
117 pub query: String,
118 executor: Option<Arc<dyn DatabaseExecutor>>,
119 param_keys: Option<Vec<String>>,
121}
122
123impl DBNode {
124 pub fn new(id: impl Into<String>, query: impl Into<String>) -> Self {
125 Self {
126 id: id.into(),
127 query: query.into(),
128 executor: None,
129 param_keys: None,
130 }
131 }
132
133 pub fn with_params(
135 id: impl Into<String>,
136 query: impl Into<String>,
137 param_keys: Vec<String>,
138 ) -> Self {
139 Self {
140 id: id.into(),
141 query: query.into(),
142 executor: None,
143 param_keys: Some(param_keys),
144 }
145 }
146
147 pub fn with_executor(
149 id: impl Into<String>,
150 query: impl Into<String>,
151 executor: Arc<dyn DatabaseExecutor>,
152 ) -> Self {
153 Self {
154 id: id.into(),
155 query: query.into(),
156 executor: Some(executor),
157 param_keys: None,
158 }
159 }
160
161 pub fn with_executor_and_params(
163 id: impl Into<String>,
164 query: impl Into<String>,
165 executor: Arc<dyn DatabaseExecutor>,
166 param_keys: Vec<String>,
167 ) -> Self {
168 Self {
169 id: id.into(),
170 query: query.into(),
171 executor: Some(executor),
172 param_keys: Some(param_keys),
173 }
174 }
175}
176
177impl std::fmt::Debug for DBNode {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 f.debug_struct("DBNode")
180 .field("id", &self.id)
181 .field("query", &self.query)
182 .field("has_executor", &self.executor.is_some())
183 .field("param_keys", &self.param_keys)
184 .finish()
185 }
186}
187
188#[async_trait]
189impl Node for DBNode {
190 fn id(&self) -> &str {
191 &self.id
192 }
193
194 fn node_type(&self) -> NodeType {
195 NodeType::DBNode
196 }
197
198 async fn run(&self, ctx: &mut Context) -> RuleResult {
199 info!("DBNode[{}]: Executing query '{}'", self.id, self.query);
200
201 let executor = self.executor.as_ref()
202 .map(|e| e.clone())
203 .unwrap_or_else(|| Arc::new(MockDatabaseExecutor) as Arc<dyn DatabaseExecutor>);
204
205 let params: Vec<String> = if let Some(keys) = &self.param_keys {
207 keys.iter()
208 .filter_map(|key| {
209 ctx.get(key).map(|value| {
210 match value {
212 Value::String(s) => s.clone(),
213 Value::Number(n) => n.to_string(),
214 Value::Bool(b) => b.to_string(),
215 Value::Null => "null".to_string(),
216 _ => value.to_string(),
217 }
218 })
219 })
220 .collect()
221 } else {
222 vec![]
223 };
224
225 if !params.is_empty() {
226 debug!("DBNode[{}]: Using {} parameter(s) from context: {:?}",
227 self.id, params.len(), params);
228 }
229
230 let params_refs: Vec<&str> = params.iter().map(|s| s.as_str()).collect();
231
232 let result = executor.execute(&self.query, ¶ms_refs).await
233 .map_err(|e| crate::rule::RuleError::Eval(format!("Database error: {}", e)))?;
234
235 debug!("DBNode[{}]: Query result = {:?}", self.id, result);
236 ctx.data.insert(format!("{}_result", self.id), result.clone());
237
238 Ok(result)
239 }
240}
241
/// A node that processes a prompt and produces an AI-style response.
#[derive(Debug, Clone)]
pub struct AINode {
    /// Identifier of this node.
    pub id: String,
    /// Prompt text to process.
    pub prompt: String,
}

impl AINode {
    /// Creates an AI node from anything convertible into owned strings.
    pub fn new(id: impl Into<String>, prompt: impl Into<String>) -> Self {
        let id = id.into();
        let prompt = prompt.into();
        Self { id, prompt }
    }
}
260
261#[async_trait]
262impl Node for AINode {
263 fn id(&self) -> &str {
264 &self.id
265 }
266
267 fn node_type(&self) -> NodeType {
268 NodeType::AINode
269 }
270
271 async fn run(&self, ctx: &mut Context) -> RuleResult {
272 info!("AINode[{}]: Processing prompt '{}'", self.id, self.prompt);
273
274 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
276
277 let context_summary: Vec<String> = ctx.data.keys().cloned().collect();
279 let mock_response = serde_json::json!({
280 "prompt": self.prompt,
281 "response": format!("AI processed: {} with context keys: {:?}", self.prompt, context_summary),
282 "confidence": 0.95,
283 "model": "mock-gpt-4"
284 });
285
286 debug!("AINode[{}]: AI response = {:?}", self.id, mock_response);
287 ctx.data.insert(format!("{}_result", self.id), mock_response.clone());
288
289 Ok(mock_response)
290 }
291}
292
/// A node that represents a call to a method on a gRPC service.
#[derive(Debug, Clone)]
pub struct GrpcNode {
    /// Identifier of this node.
    pub id: String,
    /// Address of the service to call.
    pub service_url: String,
    /// Name of the method to invoke.
    pub method: String,
}

impl GrpcNode {
    /// Creates a gRPC node from anything convertible into owned strings.
    pub fn new(
        id: impl Into<String>,
        service_url: impl Into<String>,
        method: impl Into<String>,
    ) -> Self {
        let id = id.into();
        let service_url = service_url.into();
        let method = method.into();
        Self {
            id,
            service_url,
            method,
        }
    }
}
313
314#[async_trait]
315impl Node for GrpcNode {
316 fn id(&self) -> &str {
317 &self.id
318 }
319
320 fn node_type(&self) -> NodeType {
321 NodeType::GrpcNode
322 }
323
324 async fn run(&self, ctx: &mut Context) -> RuleResult {
325 info!("GrpcNode[{}]: Calling gRPC service '{}' method '{}'",
326 self.id, self.service_url, self.method);
327
328 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
330
331 let mock_response = serde_json::json!({
333 "service": self.service_url,
334 "method": self.method,
335 "status": "OK",
336 "response": format!("gRPC call to {} completed", self.method),
337 "latency_ms": 100
338 });
339
340 debug!("GrpcNode[{}]: gRPC response = {:?}", self.id, mock_response);
341 ctx.data.insert(format!("{}_result", self.id), mock_response.clone());
342
343 Ok(mock_response)
344 }
345}
346
347use crate::core::{Graph, GraphDef};
352use crate::core::executor::Executor;
353use std::collections::HashMap;
354
355#[derive(Debug, Clone)]
356pub struct SubgraphNode {
357 pub id: String,
358 pub graph_def: GraphDef,
359 pub input_mapping: HashMap<String, String>, pub output_mapping: HashMap<String, String>, }
362
363impl SubgraphNode {
364 pub fn new(
365 id: impl Into<String>,
366 graph_def: GraphDef,
367 ) -> Self {
368 Self {
369 id: id.into(),
370 graph_def,
371 input_mapping: HashMap::new(),
372 output_mapping: HashMap::new(),
373 }
374 }
375
376 pub fn with_input_mapping(mut self, mapping: HashMap<String, String>) -> Self {
377 self.input_mapping = mapping;
378 self
379 }
380
381 pub fn with_output_mapping(mut self, mapping: HashMap<String, String>) -> Self {
382 self.output_mapping = mapping;
383 self
384 }
385}
386
387#[async_trait]
388impl Node for SubgraphNode {
389 fn id(&self) -> &str {
390 &self.id
391 }
392
393 fn node_type(&self) -> NodeType {
394 NodeType::SubgraphNode
395 }
396
397 async fn run(&self, ctx: &mut Context) -> RuleResult {
398 info!("🔷 SubgraphNode[{}]: Executing nested graph with {} nodes",
399 self.id, self.graph_def.nodes.len());
400
401 let mut child_graph = Graph::new(self.graph_def.clone());
403
404 for (parent_key, child_key) in &self.input_mapping {
406 if let Some(value) = ctx.data.get(parent_key) {
407 debug!("SubgraphNode[{}]: Mapping {} -> {}", self.id, parent_key, child_key);
408 child_graph.context.data.insert(child_key.clone(), value.clone());
409 }
410 }
411
412 let mut executor = Executor::new();
414
415 executor.execute(&mut child_graph).await
421 .map_err(|e| crate::rule::RuleError::Eval(format!("Subgraph execution failed: {}", e)))?;
422
423 for (child_key, parent_key) in &self.output_mapping {
425 if let Some(value) = child_graph.context.data.get(child_key) {
426 debug!("SubgraphNode[{}]: Mapping output {} -> {}", self.id, child_key, parent_key);
427 ctx.data.insert(parent_key.clone(), value.clone());
428 }
429 }
430
431 let result = serde_json::json!({
433 "status": "completed",
434 "nodes_executed": self.graph_def.nodes.len()
435 });
436
437 ctx.data.insert(format!("{}_result", self.id), result.clone());
438
439 info!("✅ SubgraphNode[{}]: Completed successfully", self.id);
440 Ok(result)
441 }
442}
443
/// A node that picks one of two branches based on a condition.
#[derive(Debug, Clone)]
pub struct ConditionalNode {
    /// Identifier of this node.
    pub id: String,
    /// Condition expression to evaluate.
    pub condition: String,
    /// Branch selected when the condition holds.
    pub true_branch: Option<String>,
    /// Branch selected when the condition does not hold.
    pub false_branch: Option<String>,
}

impl ConditionalNode {
    /// Creates a conditional node with no branches configured.
    pub fn new(id: impl Into<String>, condition: impl Into<String>) -> Self {
        let id = id.into();
        let condition = condition.into();
        Self {
            id,
            condition,
            true_branch: None,
            false_branch: None,
        }
    }

    /// Sets both branches and returns the node for chaining.
    pub fn with_branches(
        self,
        true_branch: impl Into<String>,
        false_branch: impl Into<String>,
    ) -> Self {
        let when_true = true_branch.into();
        let when_false = false_branch.into();
        Self {
            true_branch: Some(when_true),
            false_branch: Some(when_false),
            ..self
        }
    }
}
479
480#[async_trait]
481impl Node for ConditionalNode {
482 fn id(&self) -> &str {
483 &self.id
484 }
485
486 fn node_type(&self) -> NodeType {
487 NodeType::ConditionalNode
488 }
489
490 async fn run(&self, ctx: &mut Context) -> RuleResult {
491 info!("🔀 ConditionalNode[{}]: Evaluating condition: {}", self.id, self.condition);
492
493 use crate::rule::Rule;
495 let rule = Rule::new(&self.id, &self.condition);
496 let eval_result = rule.evaluate(&ctx.data)?;
497
498 let condition_met = match eval_result {
499 Value::Bool(b) => b,
500 _ => false,
501 };
502
503 let selected_branch = if condition_met {
504 self.true_branch.as_ref()
505 } else {
506 self.false_branch.as_ref()
507 };
508
509 let result = serde_json::json!({
510 "condition_met": condition_met,
511 "selected_branch": selected_branch,
512 "condition": self.condition
513 });
514
515 ctx.data.insert(format!("{}_result", self.id), result.clone());
516 ctx.data.insert("_branch_taken".to_string(), Value::String(
517 selected_branch.cloned().unwrap_or_else(|| "none".to_string())
518 ));
519
520 info!("✅ ConditionalNode[{}]: Branch selected: {:?}",
521 self.id, selected_branch);
522
523 Ok(result)
524 }
525}
526
/// A node that loops either while a condition holds or over a collection.
#[derive(Debug, Clone)]
pub struct LoopNode {
    /// Identifier of this node.
    pub id: String,
    /// Condition checked before each iteration in while-mode.
    pub condition: String,
    /// Upper bound on the number of iterations.
    pub max_iterations: usize,
    /// Optional id of a body node.
    pub body_node_id: Option<String>,
    /// Context key of the array to iterate; `Some` selects foreach-mode.
    pub collection_key: Option<String>,
}

impl LoopNode {
    /// Creates a while-style loop bounded by `max_iterations`.
    pub fn new_while(
        id: impl Into<String>,
        condition: impl Into<String>,
        max_iterations: usize,
    ) -> Self {
        let id = id.into();
        let condition = condition.into();
        Self {
            id,
            condition,
            max_iterations,
            body_node_id: None,
            collection_key: None,
        }
    }

    /// Creates a foreach-style loop over the array stored at `collection_key`.
    ///
    /// The condition is set to `"true"` and the iteration cap to 10000.
    pub fn new_foreach(id: impl Into<String>, collection_key: impl Into<String>) -> Self {
        let id = id.into();
        let collection_key = Some(collection_key.into());
        Self {
            id,
            condition: String::from("true"),
            max_iterations: 10000,
            body_node_id: None,
            collection_key,
        }
    }

    /// Sets the body node id and returns the node for chaining.
    pub fn with_body_node(self, body_node_id: impl Into<String>) -> Self {
        Self {
            body_node_id: Some(body_node_id.into()),
            ..self
        }
    }
}
573
574#[async_trait]
575impl Node for LoopNode {
576 fn id(&self) -> &str {
577 &self.id
578 }
579
580 fn node_type(&self) -> NodeType {
581 NodeType::LoopNode
582 }
583
584 async fn run(&self, ctx: &mut Context) -> RuleResult {
585 info!("🔁 LoopNode[{}]: Starting loop execution", self.id);
586
587 let mut iterations = 0;
588 let mut loop_results = Vec::new();
589
590 if let Some(collection_key) = &self.collection_key {
592 let collection_clone = ctx.data.get(collection_key).cloned();
594
595 if let Some(collection) = collection_clone {
596 if let Some(array) = collection.as_array() {
597 info!("LoopNode[{}]: Iterating over collection with {} items",
598 self.id, array.len());
599
600 for (index, item) in array.iter().enumerate() {
601 if iterations >= self.max_iterations {
602 info!("⚠️ LoopNode[{}]: Max iterations ({}) reached",
603 self.id, self.max_iterations);
604 break;
605 }
606
607 ctx.data.insert("_loop_index".to_string(), Value::from(index));
608 ctx.data.insert("_loop_item".to_string(), item.clone());
609
610 loop_results.push(serde_json::json!({
614 "iteration": index,
615 "item": item
616 }));
617
618 iterations += 1;
619 }
620 }
621 }
622 } else {
623 use crate::rule::Rule;
625 let rule = Rule::new(&self.id, &self.condition);
626
627 while iterations < self.max_iterations {
628 let eval_result = rule.evaluate(&ctx.data)?;
629 let should_continue = match eval_result {
630 Value::Bool(b) => b,
631 _ => false,
632 };
633
634 if !should_continue {
635 break;
636 }
637
638 ctx.data.insert("_loop_iteration".to_string(), Value::from(iterations));
639
640 loop_results.push(serde_json::json!({
643 "iteration": iterations
644 }));
645
646 iterations += 1;
647 }
648 }
649
650 let result = serde_json::json!({
651 "iterations": iterations,
652 "completed": iterations < self.max_iterations,
653 "results": loop_results
654 });
655
656 ctx.data.insert(format!("{}_result", self.id), result.clone());
657
658 info!("✅ LoopNode[{}]: Completed {} iterations", self.id, iterations);
659 Ok(result)
660 }
661}
662
/// A node describing a try block with optional catch and finally nodes.
#[derive(Debug, Clone)]
pub struct TryCatchNode {
    /// Identifier of this node.
    pub id: String,
    /// Id of the node forming the try block.
    pub try_node_id: String,
    /// Optional id of the node forming the catch block.
    pub catch_node_id: Option<String>,
    /// Optional id of the node forming the finally block.
    pub finally_node_id: Option<String>,
}

impl TryCatchNode {
    /// Creates a try/catch node with neither catch nor finally configured.
    pub fn new(id: impl Into<String>, try_node_id: impl Into<String>) -> Self {
        let id = id.into();
        let try_node_id = try_node_id.into();
        Self {
            id,
            try_node_id,
            catch_node_id: None,
            finally_node_id: None,
        }
    }

    /// Sets the catch node id and returns the node for chaining.
    pub fn with_catch(self, catch_node_id: impl Into<String>) -> Self {
        Self {
            catch_node_id: Some(catch_node_id.into()),
            ..self
        }
    }

    /// Sets the finally node id and returns the node for chaining.
    pub fn with_finally(self, finally_node_id: impl Into<String>) -> Self {
        Self {
            finally_node_id: Some(finally_node_id.into()),
            ..self
        }
    }
}
698
699#[async_trait]
700impl Node for TryCatchNode {
701 fn id(&self) -> &str {
702 &self.id
703 }
704
705 fn node_type(&self) -> NodeType {
706 NodeType::TryCatchNode
707 }
708
709 async fn run(&self, ctx: &mut Context) -> RuleResult {
710 info!("🛡️ TryCatchNode[{}]: Executing try block", self.id);
711
712 let error_occurred = ctx.data.get("_simulate_error")
715 .and_then(|v| v.as_bool())
716 .unwrap_or(false);
717
718 let result = if error_occurred {
719 info!("⚠️ TryCatchNode[{}]: Error occurred, executing catch block", self.id);
720 ctx.data.insert("_error".to_string(), Value::String("Simulated error".to_string()));
721
722 serde_json::json!({
723 "status": "error_handled",
724 "try_node": self.try_node_id,
725 "catch_node": self.catch_node_id
726 })
727 } else {
728 serde_json::json!({
729 "status": "success",
730 "try_node": self.try_node_id
731 })
732 };
733
734 ctx.data.insert(format!("{}_result", self.id), result.clone());
737 Ok(result)
738 }
739}
740
741use tokio::time::{sleep, Duration};
746
/// A node describing retries of a target node with a growing delay.
#[derive(Debug, Clone)]
pub struct RetryNode {
    /// Identifier of this node.
    pub id: String,
    /// Id of the node being retried.
    pub target_node_id: String,
    /// Maximum number of retries after the first attempt.
    pub max_retries: usize,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Factor applied to the delay after each retry.
    pub backoff_multiplier: f64,
}

impl RetryNode {
    /// Creates a retry node with a 100 ms initial delay and a 2.0 multiplier.
    pub fn new(
        id: impl Into<String>,
        target_node_id: impl Into<String>,
        max_retries: usize,
    ) -> Self {
        let id = id.into();
        let target_node_id = target_node_id.into();
        Self {
            id,
            target_node_id,
            max_retries,
            initial_delay_ms: 100,
            backoff_multiplier: 2.0,
        }
    }

    /// Overrides the backoff settings and returns the node for chaining.
    pub fn with_backoff(self, initial_delay_ms: u64, multiplier: f64) -> Self {
        Self {
            initial_delay_ms,
            backoff_multiplier: multiplier,
            ..self
        }
    }
}
777
778#[async_trait]
779impl Node for RetryNode {
780 fn id(&self) -> &str {
781 &self.id
782 }
783
784 fn node_type(&self) -> NodeType {
785 NodeType::RetryNode
786 }
787
788 async fn run(&self, ctx: &mut Context) -> RuleResult {
789 info!("🔄 RetryNode[{}]: Starting with max {} retries",
790 self.id, self.max_retries);
791
792 let mut attempt = 0;
793 let mut delay_ms = self.initial_delay_ms;
794
795 while attempt <= self.max_retries {
796 let should_retry = ctx.data.get("_simulate_failure")
800 .and_then(|v| v.as_bool())
801 .unwrap_or(false) && attempt < self.max_retries;
802
803 if !should_retry {
804 let result = serde_json::json!({
805 "status": "success",
806 "attempts": attempt + 1,
807 "target_node": self.target_node_id
808 });
809 ctx.data.insert(format!("{}_result", self.id), result.clone());
810 info!("✅ RetryNode[{}]: Succeeded after {} attempts",
811 self.id, attempt + 1);
812 return Ok(result);
813 }
814
815 info!("⚠️ RetryNode[{}]: Attempt {} failed, retrying in {}ms",
816 self.id, attempt + 1, delay_ms);
817
818 sleep(Duration::from_millis(delay_ms)).await;
819 delay_ms = (delay_ms as f64 * self.backoff_multiplier) as u64;
820 attempt += 1;
821 }
822
823 let error_result = serde_json::json!({
824 "status": "failed",
825 "attempts": attempt,
826 "target_node": self.target_node_id
827 });
828
829 ctx.data.insert(format!("{}_result", self.id), error_result.clone());
830 info!("❌ RetryNode[{}]: Failed after {} attempts", self.id, attempt);
831
832 Ok(error_result)
833 }
834}
835
/// A node describing a circuit breaker around a target node.
#[derive(Debug, Clone)]
pub struct CircuitBreakerNode {
    /// Identifier of this node.
    pub id: String,
    /// Id of the node being protected.
    pub target_node_id: String,
    /// Configured failure threshold.
    pub failure_threshold: usize,
    /// Configured timeout, in milliseconds.
    pub timeout_ms: u64,
    /// Configured half-open timeout, in milliseconds.
    pub half_open_timeout_ms: u64,
}

impl CircuitBreakerNode {
    /// Creates a circuit breaker with a 5000 ms timeout and a 30000 ms
    /// half-open timeout.
    pub fn new(
        id: impl Into<String>,
        target_node_id: impl Into<String>,
        failure_threshold: usize,
    ) -> Self {
        let id = id.into();
        let target_node_id = target_node_id.into();
        Self {
            id,
            target_node_id,
            failure_threshold,
            timeout_ms: 5000,
            half_open_timeout_ms: 30000,
        }
    }
}
864
865#[async_trait]
866impl Node for CircuitBreakerNode {
867 fn id(&self) -> &str {
868 &self.id
869 }
870
871 fn node_type(&self) -> NodeType {
872 NodeType::CircuitBreakerNode
873 }
874
875 async fn run(&self, ctx: &mut Context) -> RuleResult {
876 info!("⚡ CircuitBreakerNode[{}]: Checking circuit state", self.id);
877
878 let is_circuit_open = ctx.data.get("_circuit_open")
883 .and_then(|v| v.as_bool())
884 .unwrap_or(false);
885
886 if is_circuit_open {
887 info!("🚫 CircuitBreakerNode[{}]: Circuit is OPEN, fast-failing", self.id);
888 let result = serde_json::json!({
889 "status": "circuit_open",
890 "message": "Circuit breaker is open, request rejected"
891 });
892 ctx.data.insert(format!("{}_result", self.id), result.clone());
893 return Ok(result);
894 }
895
896 let result = serde_json::json!({
900 "status": "success",
901 "circuit_state": "closed",
902 "target_node": self.target_node_id
903 });
904
905 ctx.data.insert(format!("{}_result", self.id), result.clone());
906 info!("✅ CircuitBreakerNode[{}]: Request completed", self.id);
907
908 Ok(result)
909 }
910}