1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::sync::Arc;
5use tracing::{debug, info};
6
7use crate::core::Context;
8use crate::rule::RuleResult;
9
10#[derive(Debug, Serialize, Deserialize, Clone, Default)]
11pub enum NodeType {
12 #[default]
13 RuleNode,
14 DBNode,
15 AINode,
16 GrpcNode,
17 SubgraphNode,
18 ConditionalNode,
19 LoopNode,
20 TryCatchNode,
21 RetryNode,
22 CircuitBreakerNode,
23}
24
25#[async_trait]
26pub trait Node: Send + Sync {
27 fn id(&self) -> &str;
28 fn node_type(&self) -> NodeType;
29 async fn run(&self, ctx: &mut Context) -> RuleResult;
30}
31
/// Node that evaluates a simple condition.
#[derive(Debug, Clone)]
pub struct RuleNode {
    /// Identifier of this node.
    pub id: String,
    /// Condition text: the literal `true` / `false`, or a context key.
    pub condition: String,
}

impl RuleNode {
    /// Builds a rule node from anything convertible into owned strings.
    pub fn new(id: impl Into<String>, condition: impl Into<String>) -> Self {
        let id = id.into();
        let condition = condition.into();
        Self { id, condition }
    }
}
50
51#[async_trait]
52impl Node for RuleNode {
53 fn id(&self) -> &str {
54 &self.id
55 }
56
57 fn node_type(&self) -> NodeType {
58 NodeType::RuleNode
59 }
60
61 async fn run(&self, ctx: &mut Context) -> RuleResult {
62 info!(
63 "RuleNode[{}]: Evaluating condition '{}'",
64 self.id, self.condition
65 );
66
67 let result = if self.condition == "true" {
69 Value::Bool(true)
70 } else if self.condition == "false" {
71 Value::Bool(false)
72 } else {
73 ctx.data
75 .get(&self.condition)
76 .cloned()
77 .unwrap_or(Value::Bool(true))
78 };
79
80 debug!("RuleNode[{}]: Result = {:?}", self.id, result);
81 ctx.data
82 .insert(format!("{}_result", self.id), result.clone());
83
84 Ok(result)
85 }
86}
87
88#[async_trait]
94pub trait DatabaseExecutor: Send + Sync {
95 async fn execute(&self, query: &str, params: &[&str]) -> Result<Value, String>;
97}
98
99#[derive(Debug, Clone)]
101pub struct MockDatabaseExecutor;
102
103#[async_trait]
104impl DatabaseExecutor for MockDatabaseExecutor {
105 async fn execute(&self, query: &str, _params: &[&str]) -> Result<Value, String> {
106 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
108
109 Ok(serde_json::json!({
110 "query": query,
111 "rows": [
112 {"id": 1, "name": "Alice", "active": true},
113 {"id": 2, "name": "Bob", "active": false}
114 ],
115 "count": 2
116 }))
117 }
118}
119
120#[derive(Clone)]
121pub struct DBNode {
122 pub id: String,
123 pub query: String,
124 executor: Option<Arc<dyn DatabaseExecutor>>,
125 param_keys: Option<Vec<String>>,
127}
128
129impl DBNode {
130 pub fn new(id: impl Into<String>, query: impl Into<String>) -> Self {
131 Self {
132 id: id.into(),
133 query: query.into(),
134 executor: None,
135 param_keys: None,
136 }
137 }
138
139 pub fn with_params(
141 id: impl Into<String>,
142 query: impl Into<String>,
143 param_keys: Vec<String>,
144 ) -> Self {
145 Self {
146 id: id.into(),
147 query: query.into(),
148 executor: None,
149 param_keys: Some(param_keys),
150 }
151 }
152
153 pub fn with_executor(
155 id: impl Into<String>,
156 query: impl Into<String>,
157 executor: Arc<dyn DatabaseExecutor>,
158 ) -> Self {
159 Self {
160 id: id.into(),
161 query: query.into(),
162 executor: Some(executor),
163 param_keys: None,
164 }
165 }
166
167 pub fn with_executor_and_params(
169 id: impl Into<String>,
170 query: impl Into<String>,
171 executor: Arc<dyn DatabaseExecutor>,
172 param_keys: Vec<String>,
173 ) -> Self {
174 Self {
175 id: id.into(),
176 query: query.into(),
177 executor: Some(executor),
178 param_keys: Some(param_keys),
179 }
180 }
181}
182
183impl std::fmt::Debug for DBNode {
184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 f.debug_struct("DBNode")
186 .field("id", &self.id)
187 .field("query", &self.query)
188 .field("has_executor", &self.executor.is_some())
189 .field("param_keys", &self.param_keys)
190 .finish()
191 }
192}
193
194#[async_trait]
195impl Node for DBNode {
196 fn id(&self) -> &str {
197 &self.id
198 }
199
200 fn node_type(&self) -> NodeType {
201 NodeType::DBNode
202 }
203
204 async fn run(&self, ctx: &mut Context) -> RuleResult {
205 info!("DBNode[{}]: Executing query '{}'", self.id, self.query);
206
207 let executor = self
208 .executor
209 .as_ref()
210 .map(|e| e.clone())
211 .unwrap_or_else(|| Arc::new(MockDatabaseExecutor) as Arc<dyn DatabaseExecutor>);
212
213 let params: Vec<String> = if let Some(keys) = &self.param_keys {
215 keys.iter()
216 .filter_map(|key| {
217 ctx.get(key).map(|value| {
218 match value {
220 Value::String(s) => s.clone(),
221 Value::Number(n) => n.to_string(),
222 Value::Bool(b) => b.to_string(),
223 Value::Null => "null".to_string(),
224 _ => value.to_string(),
225 }
226 })
227 })
228 .collect()
229 } else {
230 vec![]
231 };
232
233 if !params.is_empty() {
234 debug!(
235 "DBNode[{}]: Using {} parameter(s) from context: {:?}",
236 self.id,
237 params.len(),
238 params
239 );
240 }
241
242 let params_refs: Vec<&str> = params.iter().map(|s| s.as_str()).collect();
243
244 let result = executor
245 .execute(&self.query, ¶ms_refs)
246 .await
247 .map_err(|e| crate::rule::RuleError::Eval(format!("Database error: {}", e)))?;
248
249 debug!("DBNode[{}]: Query result = {:?}", self.id, result);
250 ctx.data
251 .insert(format!("{}_result", self.id), result.clone());
252
253 Ok(result)
254 }
255}
256
/// Node that processes a prompt (the `run` implementation in this file
/// produces a mock response).
#[derive(Debug, Clone)]
pub struct AINode {
    /// Identifier of this node.
    pub id: String,
    /// Prompt text echoed into the response.
    pub prompt: String,
}

impl AINode {
    /// Builds an AI node from anything convertible into owned strings.
    pub fn new(id: impl Into<String>, prompt: impl Into<String>) -> Self {
        let id = id.into();
        let prompt = prompt.into();
        Self { id, prompt }
    }
}
275
276#[async_trait]
277impl Node for AINode {
278 fn id(&self) -> &str {
279 &self.id
280 }
281
282 fn node_type(&self) -> NodeType {
283 NodeType::AINode
284 }
285
286 async fn run(&self, ctx: &mut Context) -> RuleResult {
287 info!("AINode[{}]: Processing prompt '{}'", self.id, self.prompt);
288
289 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
291
292 let context_summary: Vec<String> = ctx.data.keys().cloned().collect();
294 let mock_response = serde_json::json!({
295 "prompt": self.prompt,
296 "response": format!("AI processed: {} with context keys: {:?}", self.prompt, context_summary),
297 "confidence": 0.95,
298 "model": "mock-gpt-4"
299 });
300
301 debug!("AINode[{}]: AI response = {:?}", self.id, mock_response);
302 ctx.data
303 .insert(format!("{}_result", self.id), mock_response.clone());
304
305 Ok(mock_response)
306 }
307}
308
/// Node describing a gRPC call (the `run` implementation in this file returns
/// a mock response).
#[derive(Debug, Clone)]
pub struct GrpcNode {
    /// Identifier of this node.
    pub id: String,
    /// Service URL echoed into the response.
    pub service_url: String,
    /// Method name echoed into the response.
    pub method: String,
}

impl GrpcNode {
    /// Builds a gRPC node from anything convertible into owned strings.
    pub fn new(
        id: impl Into<String>,
        service_url: impl Into<String>,
        method: impl Into<String>,
    ) -> Self {
        let id = id.into();
        let service_url = service_url.into();
        let method = method.into();
        Self {
            id,
            service_url,
            method,
        }
    }
}
333
334#[async_trait]
335impl Node for GrpcNode {
336 fn id(&self) -> &str {
337 &self.id
338 }
339
340 fn node_type(&self) -> NodeType {
341 NodeType::GrpcNode
342 }
343
344 async fn run(&self, ctx: &mut Context) -> RuleResult {
345 info!(
346 "GrpcNode[{}]: Calling gRPC service '{}' method '{}'",
347 self.id, self.service_url, self.method
348 );
349
350 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
352
353 let mock_response = serde_json::json!({
355 "service": self.service_url,
356 "method": self.method,
357 "status": "OK",
358 "response": format!("gRPC call to {} completed", self.method),
359 "latency_ms": 100
360 });
361
362 debug!("GrpcNode[{}]: gRPC response = {:?}", self.id, mock_response);
363 ctx.data
364 .insert(format!("{}_result", self.id), mock_response.clone());
365
366 Ok(mock_response)
367 }
368}
369
370use crate::core::executor::Executor;
375use crate::core::{Graph, GraphDef};
376use std::collections::HashMap;
377
378#[derive(Debug, Clone)]
379pub struct SubgraphNode {
380 pub id: String,
381 pub graph_def: GraphDef,
382 pub input_mapping: HashMap<String, String>, pub output_mapping: HashMap<String, String>, }
385
386impl SubgraphNode {
387 pub fn new(id: impl Into<String>, graph_def: GraphDef) -> Self {
388 Self {
389 id: id.into(),
390 graph_def,
391 input_mapping: HashMap::new(),
392 output_mapping: HashMap::new(),
393 }
394 }
395
396 pub fn with_input_mapping(mut self, mapping: HashMap<String, String>) -> Self {
397 self.input_mapping = mapping;
398 self
399 }
400
401 pub fn with_output_mapping(mut self, mapping: HashMap<String, String>) -> Self {
402 self.output_mapping = mapping;
403 self
404 }
405}
406
407#[async_trait]
408impl Node for SubgraphNode {
409 fn id(&self) -> &str {
410 &self.id
411 }
412
413 fn node_type(&self) -> NodeType {
414 NodeType::SubgraphNode
415 }
416
417 async fn run(&self, ctx: &mut Context) -> RuleResult {
418 info!(
419 "🔷 SubgraphNode[{}]: Executing nested graph with {} nodes",
420 self.id,
421 self.graph_def.nodes.len()
422 );
423
424 let mut child_graph = Graph::new(self.graph_def.clone());
426
427 for (parent_key, child_key) in &self.input_mapping {
429 if let Some(value) = ctx.data.get(parent_key) {
430 debug!(
431 "SubgraphNode[{}]: Mapping {} -> {}",
432 self.id, parent_key, child_key
433 );
434 child_graph
435 .context
436 .data
437 .insert(child_key.clone(), value.clone());
438 }
439 }
440
441 let mut executor = Executor::new();
443
444 executor.execute(&mut child_graph).await.map_err(|e| {
450 crate::rule::RuleError::Eval(format!("Subgraph execution failed: {}", e))
451 })?;
452
453 for (child_key, parent_key) in &self.output_mapping {
455 if let Some(value) = child_graph.context.data.get(child_key) {
456 debug!(
457 "SubgraphNode[{}]: Mapping output {} -> {}",
458 self.id, child_key, parent_key
459 );
460 ctx.data.insert(parent_key.clone(), value.clone());
461 }
462 }
463
464 let result = serde_json::json!({
466 "status": "completed",
467 "nodes_executed": self.graph_def.nodes.len()
468 });
469
470 ctx.data
471 .insert(format!("{}_result", self.id), result.clone());
472
473 info!("✅ SubgraphNode[{}]: Completed successfully", self.id);
474 Ok(result)
475 }
476}
477
/// Node that evaluates a condition and records which branch was selected.
#[derive(Debug, Clone)]
pub struct ConditionalNode {
    /// Identifier of this node.
    pub id: String,
    /// Condition expression evaluated in `run`.
    pub condition: String,
    /// Branch name selected when the condition holds, if any.
    pub true_branch: Option<String>,
    /// Branch name selected when the condition does not hold, if any.
    pub false_branch: Option<String>,
}

impl ConditionalNode {
    /// Creates a conditional node with no branches configured.
    pub fn new(id: impl Into<String>, condition: impl Into<String>) -> Self {
        let id = id.into();
        let condition = condition.into();
        Self {
            id,
            condition,
            true_branch: None,
            false_branch: None,
        }
    }

    /// Sets both branch names and returns the updated node.
    pub fn with_branches(
        self,
        true_branch: impl Into<String>,
        false_branch: impl Into<String>,
    ) -> Self {
        Self {
            true_branch: Some(true_branch.into()),
            false_branch: Some(false_branch.into()),
            ..self
        }
    }
}
510
511#[async_trait]
512impl Node for ConditionalNode {
513 fn id(&self) -> &str {
514 &self.id
515 }
516
517 fn node_type(&self) -> NodeType {
518 NodeType::ConditionalNode
519 }
520
521 async fn run(&self, ctx: &mut Context) -> RuleResult {
522 info!(
523 "🔀 ConditionalNode[{}]: Evaluating condition: {}",
524 self.id, self.condition
525 );
526
527 use crate::rule::Rule;
529 let rule = Rule::new(&self.id, &self.condition);
530 let eval_result = rule.evaluate(&ctx.data)?;
531
532 let condition_met = match eval_result {
533 Value::Bool(b) => b,
534 _ => false,
535 };
536
537 let selected_branch = if condition_met {
538 self.true_branch.as_ref()
539 } else {
540 self.false_branch.as_ref()
541 };
542
543 let result = serde_json::json!({
544 "condition_met": condition_met,
545 "selected_branch": selected_branch,
546 "condition": self.condition
547 });
548
549 ctx.data
550 .insert(format!("{}_result", self.id), result.clone());
551 ctx.data.insert(
552 "_branch_taken".to_string(),
553 Value::String(
554 selected_branch
555 .cloned()
556 .unwrap_or_else(|| "none".to_string()),
557 ),
558 );
559
560 info!(
561 "✅ ConditionalNode[{}]: Branch selected: {:?}",
562 self.id, selected_branch
563 );
564
565 Ok(result)
566 }
567}
568
/// Node that loops either while a condition holds or over a collection
/// stored in the context.
#[derive(Debug, Clone)]
pub struct LoopNode {
    /// Identifier of this node.
    pub id: String,
    /// Condition evaluated each pass when no collection key is set.
    pub condition: String,
    /// Upper bound on the number of iterations.
    pub max_iterations: usize,
    /// Optional body node id; stored here but not read by `run` in this file.
    pub body_node_id: Option<String>,
    /// Context key of the array to iterate; `Some` selects for-each mode.
    pub collection_key: Option<String>,
}

impl LoopNode {
    /// Creates a condition-driven loop with the given iteration cap.
    pub fn new_while(
        id: impl Into<String>,
        condition: impl Into<String>,
        max_iterations: usize,
    ) -> Self {
        let id = id.into();
        let condition = condition.into();
        Self {
            id,
            condition,
            max_iterations,
            body_node_id: None,
            collection_key: None,
        }
    }

    /// Creates a for-each loop over the array stored under `collection_key`.
    ///
    /// The condition is fixed to `"true"` and the iteration cap to 10000.
    pub fn new_foreach(id: impl Into<String>, collection_key: impl Into<String>) -> Self {
        const FOREACH_ITERATION_CAP: usize = 10000;

        let id = id.into();
        let collection_key = Some(collection_key.into());
        Self {
            id,
            condition: String::from("true"),
            max_iterations: FOREACH_ITERATION_CAP,
            body_node_id: None,
            collection_key,
        }
    }

    /// Sets the body node id and returns the updated node.
    pub fn with_body_node(self, body_node_id: impl Into<String>) -> Self {
        Self {
            body_node_id: Some(body_node_id.into()),
            ..self
        }
    }
}
612
613#[async_trait]
614impl Node for LoopNode {
615 fn id(&self) -> &str {
616 &self.id
617 }
618
619 fn node_type(&self) -> NodeType {
620 NodeType::LoopNode
621 }
622
623 async fn run(&self, ctx: &mut Context) -> RuleResult {
624 info!("🔁 LoopNode[{}]: Starting loop execution", self.id);
625
626 let mut iterations = 0;
627 let mut loop_results = Vec::new();
628
629 if let Some(collection_key) = &self.collection_key {
631 let collection_clone = ctx.data.get(collection_key).cloned();
633
634 if let Some(collection) = collection_clone {
635 if let Some(array) = collection.as_array() {
636 info!(
637 "LoopNode[{}]: Iterating over collection with {} items",
638 self.id,
639 array.len()
640 );
641
642 for (index, item) in array.iter().enumerate() {
643 if iterations >= self.max_iterations {
644 info!(
645 "⚠️ LoopNode[{}]: Max iterations ({}) reached",
646 self.id, self.max_iterations
647 );
648 break;
649 }
650
651 ctx.data
652 .insert("_loop_index".to_string(), Value::from(index));
653 ctx.data.insert("_loop_item".to_string(), item.clone());
654
655 loop_results.push(serde_json::json!({
659 "iteration": index,
660 "item": item
661 }));
662
663 iterations += 1;
664 }
665 }
666 }
667 } else {
668 use crate::rule::Rule;
670 let rule = Rule::new(&self.id, &self.condition);
671
672 while iterations < self.max_iterations {
673 let eval_result = rule.evaluate(&ctx.data)?;
674 let should_continue = match eval_result {
675 Value::Bool(b) => b,
676 _ => false,
677 };
678
679 if !should_continue {
680 break;
681 }
682
683 ctx.data
684 .insert("_loop_iteration".to_string(), Value::from(iterations));
685
686 loop_results.push(serde_json::json!({
689 "iteration": iterations
690 }));
691
692 iterations += 1;
693 }
694 }
695
696 let result = serde_json::json!({
697 "iterations": iterations,
698 "completed": iterations < self.max_iterations,
699 "results": loop_results
700 });
701
702 ctx.data
703 .insert(format!("{}_result", self.id), result.clone());
704
705 info!(
706 "✅ LoopNode[{}]: Completed {} iterations",
707 self.id, iterations
708 );
709 Ok(result)
710 }
711}
712
/// Node describing a try / catch / finally arrangement of other nodes.
#[derive(Debug, Clone)]
pub struct TryCatchNode {
    /// Identifier of this node.
    pub id: String,
    /// Id of the node forming the try block.
    pub try_node_id: String,
    /// Id of the catch node, if any.
    pub catch_node_id: Option<String>,
    /// Id of the finally node, if any; not read by `run` in this file.
    pub finally_node_id: Option<String>,
}

impl TryCatchNode {
    /// Creates a node with only a try block configured.
    pub fn new(id: impl Into<String>, try_node_id: impl Into<String>) -> Self {
        let id = id.into();
        let try_node_id = try_node_id.into();
        Self {
            id,
            try_node_id,
            catch_node_id: None,
            finally_node_id: None,
        }
    }

    /// Sets the catch node id and returns the updated node.
    pub fn with_catch(self, catch_node_id: impl Into<String>) -> Self {
        Self {
            catch_node_id: Some(catch_node_id.into()),
            ..self
        }
    }

    /// Sets the finally node id and returns the updated node.
    pub fn with_finally(self, finally_node_id: impl Into<String>) -> Self {
        Self {
            finally_node_id: Some(finally_node_id.into()),
            ..self
        }
    }
}
745
746#[async_trait]
747impl Node for TryCatchNode {
748 fn id(&self) -> &str {
749 &self.id
750 }
751
752 fn node_type(&self) -> NodeType {
753 NodeType::TryCatchNode
754 }
755
756 async fn run(&self, ctx: &mut Context) -> RuleResult {
757 info!("🛡️ TryCatchNode[{}]: Executing try block", self.id);
758
759 let error_occurred = ctx
762 .data
763 .get("_simulate_error")
764 .and_then(|v| v.as_bool())
765 .unwrap_or(false);
766
767 let result = if error_occurred {
768 info!(
769 "⚠️ TryCatchNode[{}]: Error occurred, executing catch block",
770 self.id
771 );
772 ctx.data.insert(
773 "_error".to_string(),
774 Value::String("Simulated error".to_string()),
775 );
776
777 serde_json::json!({
778 "status": "error_handled",
779 "try_node": self.try_node_id,
780 "catch_node": self.catch_node_id
781 })
782 } else {
783 serde_json::json!({
784 "status": "success",
785 "try_node": self.try_node_id
786 })
787 };
788
789 ctx.data
792 .insert(format!("{}_result", self.id), result.clone());
793 Ok(result)
794 }
795}
796
797use tokio::time::{sleep, Duration};
802
/// Node describing retry-with-backoff around a target node.
#[derive(Debug, Clone)]
pub struct RetryNode {
    /// Identifier of this node.
    pub id: String,
    /// Id of the node being retried.
    pub target_node_id: String,
    /// Maximum number of retries.
    pub max_retries: usize,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Factor applied to the delay after each retry.
    pub backoff_multiplier: f64,
}

impl RetryNode {
    /// Creates a retry node with a 100 ms initial delay and a 2.0 multiplier.
    pub fn new(
        id: impl Into<String>,
        target_node_id: impl Into<String>,
        max_retries: usize,
    ) -> Self {
        let id = id.into();
        let target_node_id = target_node_id.into();
        Self {
            id,
            target_node_id,
            max_retries,
            initial_delay_ms: 100,
            backoff_multiplier: 2.0,
        }
    }

    /// Overrides the backoff settings and returns the updated node.
    pub fn with_backoff(self, initial_delay_ms: u64, multiplier: f64) -> Self {
        Self {
            initial_delay_ms,
            backoff_multiplier: multiplier,
            ..self
        }
    }
}
833
834#[async_trait]
835impl Node for RetryNode {
836 fn id(&self) -> &str {
837 &self.id
838 }
839
840 fn node_type(&self) -> NodeType {
841 NodeType::RetryNode
842 }
843
844 async fn run(&self, ctx: &mut Context) -> RuleResult {
845 info!(
846 "🔄 RetryNode[{}]: Starting with max {} retries",
847 self.id, self.max_retries
848 );
849
850 let mut attempt = 0;
851 let mut delay_ms = self.initial_delay_ms;
852
853 while attempt <= self.max_retries {
854 let should_retry = ctx
858 .data
859 .get("_simulate_failure")
860 .and_then(|v| v.as_bool())
861 .unwrap_or(false)
862 && attempt < self.max_retries;
863
864 if !should_retry {
865 let result = serde_json::json!({
866 "status": "success",
867 "attempts": attempt + 1,
868 "target_node": self.target_node_id
869 });
870 ctx.data
871 .insert(format!("{}_result", self.id), result.clone());
872 info!(
873 "✅ RetryNode[{}]: Succeeded after {} attempts",
874 self.id,
875 attempt + 1
876 );
877 return Ok(result);
878 }
879
880 info!(
881 "⚠️ RetryNode[{}]: Attempt {} failed, retrying in {}ms",
882 self.id,
883 attempt + 1,
884 delay_ms
885 );
886
887 sleep(Duration::from_millis(delay_ms)).await;
888 delay_ms = (delay_ms as f64 * self.backoff_multiplier) as u64;
889 attempt += 1;
890 }
891
892 let error_result = serde_json::json!({
893 "status": "failed",
894 "attempts": attempt,
895 "target_node": self.target_node_id
896 });
897
898 ctx.data
899 .insert(format!("{}_result", self.id), error_result.clone());
900 info!(
901 "❌ RetryNode[{}]: Failed after {} attempts",
902 self.id, attempt
903 );
904
905 Ok(error_result)
906 }
907}
908
/// Node describing a circuit breaker in front of a target node.
///
/// Of the fields below, `run` in this file reads only `id` and
/// `target_node_id`.
#[derive(Debug, Clone)]
pub struct CircuitBreakerNode {
    /// Identifier of this node.
    pub id: String,
    /// Id of the protected node.
    pub target_node_id: String,
    /// Configured failure threshold.
    pub failure_threshold: usize,
    /// Configured timeout, in milliseconds.
    pub timeout_ms: u64,
    /// Configured half-open timeout, in milliseconds.
    pub half_open_timeout_ms: u64,
}

impl CircuitBreakerNode {
    /// Creates a breaker with a 5000 ms timeout and a 30000 ms half-open
    /// timeout.
    pub fn new(
        id: impl Into<String>,
        target_node_id: impl Into<String>,
        failure_threshold: usize,
    ) -> Self {
        let id = id.into();
        let target_node_id = target_node_id.into();
        Self {
            id,
            target_node_id,
            failure_threshold,
            timeout_ms: 5000,
            half_open_timeout_ms: 30000,
        }
    }
}
937
938#[async_trait]
939impl Node for CircuitBreakerNode {
940 fn id(&self) -> &str {
941 &self.id
942 }
943
944 fn node_type(&self) -> NodeType {
945 NodeType::CircuitBreakerNode
946 }
947
948 async fn run(&self, ctx: &mut Context) -> RuleResult {
949 info!("⚡ CircuitBreakerNode[{}]: Checking circuit state", self.id);
950
951 let is_circuit_open = ctx
956 .data
957 .get("_circuit_open")
958 .and_then(|v| v.as_bool())
959 .unwrap_or(false);
960
961 if is_circuit_open {
962 info!(
963 "🚫 CircuitBreakerNode[{}]: Circuit is OPEN, fast-failing",
964 self.id
965 );
966 let result = serde_json::json!({
967 "status": "circuit_open",
968 "message": "Circuit breaker is open, request rejected"
969 });
970 ctx.data
971 .insert(format!("{}_result", self.id), result.clone());
972 return Ok(result);
973 }
974
975 let result = serde_json::json!({
979 "status": "success",
980 "circuit_state": "closed",
981 "target_node": self.target_node_id
982 });
983
984 ctx.data
985 .insert(format!("{}_result", self.id), result.clone());
986 info!("✅ CircuitBreakerNode[{}]: Request completed", self.id);
987
988 Ok(result)
989 }
990}