rust_logic_graph/node/
mod.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::sync::Arc;
5use tracing::{debug, info};
6
7use crate::core::Context;
8use crate::rule::RuleResult;
9
10#[derive(Debug, Serialize, Deserialize, Clone, Default)]
11pub enum NodeType {
12    #[default]
13    RuleNode,
14    DBNode,
15    AINode,
16    GrpcNode,
17    SubgraphNode,
18    ConditionalNode,
19    LoopNode,
20    TryCatchNode,
21    RetryNode,
22    CircuitBreakerNode,
23}
24
25#[async_trait]
26pub trait Node: Send + Sync {
27    fn id(&self) -> &str;
28    fn node_type(&self) -> NodeType;
29    async fn run(&self, ctx: &mut Context) -> RuleResult;
30}
31
32// ============================================================
33// RuleNode - Evaluates conditions and transforms data
34// ============================================================
35
36#[derive(Debug, Clone)]
37pub struct RuleNode {
38    pub id: String,
39    pub condition: String,
40}
41
42impl RuleNode {
43    pub fn new(id: impl Into<String>, condition: impl Into<String>) -> Self {
44        Self {
45            id: id.into(),
46            condition: condition.into(),
47        }
48    }
49}
50
51#[async_trait]
52impl Node for RuleNode {
53    fn id(&self) -> &str {
54        &self.id
55    }
56
57    fn node_type(&self) -> NodeType {
58        NodeType::RuleNode
59    }
60
61    async fn run(&self, ctx: &mut Context) -> RuleResult {
62        info!(
63            "RuleNode[{}]: Evaluating condition '{}'",
64            self.id, self.condition
65        );
66
67        // Simple condition evaluation (can be extended with proper parser)
68        let result = if self.condition == "true" {
69            Value::Bool(true)
70        } else if self.condition == "false" {
71            Value::Bool(false)
72        } else {
73            // Try to evaluate based on context
74            ctx.data
75                .get(&self.condition)
76                .cloned()
77                .unwrap_or(Value::Bool(true))
78        };
79
80        debug!("RuleNode[{}]: Result = {:?}", self.id, result);
81        ctx.data
82            .insert(format!("{}_result", self.id), result.clone());
83
84        Ok(result)
85    }
86}
87
88// ============================================================
89// DBNode - Database operations with pluggable executor
90// ============================================================
91
92/// Trait for database executors - implement this for MySQL, Postgres, etc.
93#[async_trait]
94pub trait DatabaseExecutor: Send + Sync {
95    /// Execute a query and return JSON result
96    async fn execute(&self, query: &str, params: &[&str]) -> Result<Value, String>;
97}
98
99/// Mock database executor (default for examples/testing)
100#[derive(Debug, Clone)]
101pub struct MockDatabaseExecutor;
102
103#[async_trait]
104impl DatabaseExecutor for MockDatabaseExecutor {
105    async fn execute(&self, query: &str, _params: &[&str]) -> Result<Value, String> {
106        // Simulate async DB operation
107        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
108
109        Ok(serde_json::json!({
110            "query": query,
111            "rows": [
112                {"id": 1, "name": "Alice", "active": true},
113                {"id": 2, "name": "Bob", "active": false}
114            ],
115            "count": 2
116        }))
117    }
118}
119
120#[derive(Clone)]
121pub struct DBNode {
122    pub id: String,
123    pub query: String,
124    executor: Option<Arc<dyn DatabaseExecutor>>,
125    /// Context keys to extract as query parameters
126    param_keys: Option<Vec<String>>,
127}
128
129impl DBNode {
130    pub fn new(id: impl Into<String>, query: impl Into<String>) -> Self {
131        Self {
132            id: id.into(),
133            query: query.into(),
134            executor: None,
135            param_keys: None,
136        }
137    }
138
139    /// Create DBNode with parameter keys to extract from context
140    pub fn with_params(
141        id: impl Into<String>,
142        query: impl Into<String>,
143        param_keys: Vec<String>,
144    ) -> Self {
145        Self {
146            id: id.into(),
147            query: query.into(),
148            executor: None,
149            param_keys: Some(param_keys),
150        }
151    }
152
153    /// Create DBNode with custom executor (MySQL, Postgres, etc.)
154    pub fn with_executor(
155        id: impl Into<String>,
156        query: impl Into<String>,
157        executor: Arc<dyn DatabaseExecutor>,
158    ) -> Self {
159        Self {
160            id: id.into(),
161            query: query.into(),
162            executor: Some(executor),
163            param_keys: None,
164        }
165    }
166
167    /// Create DBNode with custom executor and parameter keys
168    pub fn with_executor_and_params(
169        id: impl Into<String>,
170        query: impl Into<String>,
171        executor: Arc<dyn DatabaseExecutor>,
172        param_keys: Vec<String>,
173    ) -> Self {
174        Self {
175            id: id.into(),
176            query: query.into(),
177            executor: Some(executor),
178            param_keys: Some(param_keys),
179        }
180    }
181}
182
183impl std::fmt::Debug for DBNode {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        f.debug_struct("DBNode")
186            .field("id", &self.id)
187            .field("query", &self.query)
188            .field("has_executor", &self.executor.is_some())
189            .field("param_keys", &self.param_keys)
190            .finish()
191    }
192}
193
194#[async_trait]
195impl Node for DBNode {
196    fn id(&self) -> &str {
197        &self.id
198    }
199
200    fn node_type(&self) -> NodeType {
201        NodeType::DBNode
202    }
203
204    async fn run(&self, ctx: &mut Context) -> RuleResult {
205        info!("DBNode[{}]: Executing query '{}'", self.id, self.query);
206
207        let executor = self
208            .executor
209            .as_ref()
210            .map(|e| e.clone())
211            .unwrap_or_else(|| Arc::new(MockDatabaseExecutor) as Arc<dyn DatabaseExecutor>);
212
213        // Extract params from context based on param_keys
214        let params: Vec<String> = if let Some(keys) = &self.param_keys {
215            keys.iter()
216                .filter_map(|key| {
217                    ctx.get(key).map(|value| {
218                        // Convert JSON value to string for SQL binding
219                        match value {
220                            Value::String(s) => s.clone(),
221                            Value::Number(n) => n.to_string(),
222                            Value::Bool(b) => b.to_string(),
223                            Value::Null => "null".to_string(),
224                            _ => value.to_string(),
225                        }
226                    })
227                })
228                .collect()
229        } else {
230            vec![]
231        };
232
233        if !params.is_empty() {
234            debug!(
235                "DBNode[{}]: Using {} parameter(s) from context: {:?}",
236                self.id,
237                params.len(),
238                params
239            );
240        }
241
242        let params_refs: Vec<&str> = params.iter().map(|s| s.as_str()).collect();
243
244        let result = executor
245            .execute(&self.query, &params_refs)
246            .await
247            .map_err(|e| crate::rule::RuleError::Eval(format!("Database error: {}", e)))?;
248
249        debug!("DBNode[{}]: Query result = {:?}", self.id, result);
250        ctx.data
251            .insert(format!("{}_result", self.id), result.clone());
252
253        Ok(result)
254    }
255}
256
257// ============================================================
258// AINode - Simulates AI/LLM operations
259// ============================================================
260
261#[derive(Debug, Clone)]
262pub struct AINode {
263    pub id: String,
264    pub prompt: String,
265}
266
267impl AINode {
268    pub fn new(id: impl Into<String>, prompt: impl Into<String>) -> Self {
269        Self {
270            id: id.into(),
271            prompt: prompt.into(),
272        }
273    }
274}
275
276#[async_trait]
277impl Node for AINode {
278    fn id(&self) -> &str {
279        &self.id
280    }
281
282    fn node_type(&self) -> NodeType {
283        NodeType::AINode
284    }
285
286    async fn run(&self, ctx: &mut Context) -> RuleResult {
287        info!("AINode[{}]: Processing prompt '{}'", self.id, self.prompt);
288
289        // Simulate async AI API call
290        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
291
292        // Mock AI response based on context
293        let context_summary: Vec<String> = ctx.data.keys().cloned().collect();
294        let mock_response = serde_json::json!({
295            "prompt": self.prompt,
296            "response": format!("AI processed: {} with context keys: {:?}", self.prompt, context_summary),
297            "confidence": 0.95,
298            "model": "mock-gpt-4"
299        });
300
301        debug!("AINode[{}]: AI response = {:?}", self.id, mock_response);
302        ctx.data
303            .insert(format!("{}_result", self.id), mock_response.clone());
304
305        Ok(mock_response)
306    }
307}
308
309// ============================================================
310// GrpcNode - Calls gRPC services
311// ============================================================
312
313#[derive(Debug, Clone)]
314pub struct GrpcNode {
315    pub id: String,
316    pub service_url: String,
317    pub method: String,
318}
319
320impl GrpcNode {
321    pub fn new(
322        id: impl Into<String>,
323        service_url: impl Into<String>,
324        method: impl Into<String>,
325    ) -> Self {
326        Self {
327            id: id.into(),
328            service_url: service_url.into(),
329            method: method.into(),
330        }
331    }
332}
333
334#[async_trait]
335impl Node for GrpcNode {
336    fn id(&self) -> &str {
337        &self.id
338    }
339
340    fn node_type(&self) -> NodeType {
341        NodeType::GrpcNode
342    }
343
344    async fn run(&self, ctx: &mut Context) -> RuleResult {
345        info!(
346            "GrpcNode[{}]: Calling gRPC service '{}' method '{}'",
347            self.id, self.service_url, self.method
348        );
349
350        // Simulate async gRPC call
351        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
352
353        // Mock gRPC response
354        let mock_response = serde_json::json!({
355            "service": self.service_url,
356            "method": self.method,
357            "status": "OK",
358            "response": format!("gRPC call to {} completed", self.method),
359            "latency_ms": 100
360        });
361
362        debug!("GrpcNode[{}]: gRPC response = {:?}", self.id, mock_response);
363        ctx.data
364            .insert(format!("{}_result", self.id), mock_response.clone());
365
366        Ok(mock_response)
367    }
368}
369
370// ============================================================
371// SubgraphNode - Nested graph execution for reusable components
372// ============================================================
373
374use crate::core::executor::Executor;
375use crate::core::{Graph, GraphDef};
376use std::collections::HashMap;
377
378#[derive(Debug, Clone)]
379pub struct SubgraphNode {
380    pub id: String,
381    pub graph_def: GraphDef,
382    pub input_mapping: HashMap<String, String>, // parent_key -> child_key
383    pub output_mapping: HashMap<String, String>, // child_key -> parent_key
384}
385
386impl SubgraphNode {
387    pub fn new(id: impl Into<String>, graph_def: GraphDef) -> Self {
388        Self {
389            id: id.into(),
390            graph_def,
391            input_mapping: HashMap::new(),
392            output_mapping: HashMap::new(),
393        }
394    }
395
396    pub fn with_input_mapping(mut self, mapping: HashMap<String, String>) -> Self {
397        self.input_mapping = mapping;
398        self
399    }
400
401    pub fn with_output_mapping(mut self, mapping: HashMap<String, String>) -> Self {
402        self.output_mapping = mapping;
403        self
404    }
405}
406
407#[async_trait]
408impl Node for SubgraphNode {
409    fn id(&self) -> &str {
410        &self.id
411    }
412
413    fn node_type(&self) -> NodeType {
414        NodeType::SubgraphNode
415    }
416
417    async fn run(&self, ctx: &mut Context) -> RuleResult {
418        info!(
419            "🔷 SubgraphNode[{}]: Executing nested graph with {} nodes",
420            self.id,
421            self.graph_def.nodes.len()
422        );
423
424        // Create child graph with mapped inputs
425        let mut child_graph = Graph::new(self.graph_def.clone());
426
427        // Map inputs from parent context to child context
428        for (parent_key, child_key) in &self.input_mapping {
429            if let Some(value) = ctx.data.get(parent_key) {
430                debug!(
431                    "SubgraphNode[{}]: Mapping {} -> {}",
432                    self.id, parent_key, child_key
433                );
434                child_graph
435                    .context
436                    .data
437                    .insert(child_key.clone(), value.clone());
438            }
439        }
440
441        // Create executor and register nodes from subgraph definition
442        let mut executor = Executor::new();
443
444        // TODO: Need to register nodes from graph_def
445        // This requires access to node constructors, which should be handled
446        // by a NodeFactory or similar pattern
447
448        // Execute child graph
449        executor.execute(&mut child_graph).await.map_err(|e| {
450            crate::rule::RuleError::Eval(format!("Subgraph execution failed: {}", e))
451        })?;
452
453        // Map outputs from child context back to parent context
454        for (child_key, parent_key) in &self.output_mapping {
455            if let Some(value) = child_graph.context.data.get(child_key) {
456                debug!(
457                    "SubgraphNode[{}]: Mapping output {} -> {}",
458                    self.id, child_key, parent_key
459                );
460                ctx.data.insert(parent_key.clone(), value.clone());
461            }
462        }
463
464        // Store subgraph result
465        let result = serde_json::json!({
466            "status": "completed",
467            "nodes_executed": self.graph_def.nodes.len()
468        });
469
470        ctx.data
471            .insert(format!("{}_result", self.id), result.clone());
472
473        info!("✅ SubgraphNode[{}]: Completed successfully", self.id);
474        Ok(result)
475    }
476}
477
478// ============================================================
479// ConditionalNode - If/else branching with dynamic routing
480// ============================================================
481
482#[derive(Debug, Clone)]
483pub struct ConditionalNode {
484    pub id: String,
485    pub condition: String,
486    pub true_branch: Option<String>,  // Node ID to route to if true
487    pub false_branch: Option<String>, // Node ID to route to if false
488}
489
490impl ConditionalNode {
491    pub fn new(id: impl Into<String>, condition: impl Into<String>) -> Self {
492        Self {
493            id: id.into(),
494            condition: condition.into(),
495            true_branch: None,
496            false_branch: None,
497        }
498    }
499
500    pub fn with_branches(
501        mut self,
502        true_branch: impl Into<String>,
503        false_branch: impl Into<String>,
504    ) -> Self {
505        self.true_branch = Some(true_branch.into());
506        self.false_branch = Some(false_branch.into());
507        self
508    }
509}
510
511#[async_trait]
512impl Node for ConditionalNode {
513    fn id(&self) -> &str {
514        &self.id
515    }
516
517    fn node_type(&self) -> NodeType {
518        NodeType::ConditionalNode
519    }
520
521    async fn run(&self, ctx: &mut Context) -> RuleResult {
522        info!(
523            "🔀 ConditionalNode[{}]: Evaluating condition: {}",
524            self.id, self.condition
525        );
526
527        // Use existing rule engine to evaluate condition
528        use crate::rule::Rule;
529        let rule = Rule::new(&self.id, &self.condition);
530        let eval_result = rule.evaluate(&ctx.data)?;
531
532        let condition_met = match eval_result {
533            Value::Bool(b) => b,
534            _ => false,
535        };
536
537        let selected_branch = if condition_met {
538            self.true_branch.as_ref()
539        } else {
540            self.false_branch.as_ref()
541        };
542
543        let result = serde_json::json!({
544            "condition_met": condition_met,
545            "selected_branch": selected_branch,
546            "condition": self.condition
547        });
548
549        ctx.data
550            .insert(format!("{}_result", self.id), result.clone());
551        ctx.data.insert(
552            "_branch_taken".to_string(),
553            Value::String(
554                selected_branch
555                    .cloned()
556                    .unwrap_or_else(|| "none".to_string()),
557            ),
558        );
559
560        info!(
561            "✅ ConditionalNode[{}]: Branch selected: {:?}",
562            self.id, selected_branch
563        );
564
565        Ok(result)
566    }
567}
568
569// ============================================================
570// LoopNode - While loops and collection iteration
571// ============================================================
572
573#[derive(Debug, Clone)]
574pub struct LoopNode {
575    pub id: String,
576    pub condition: String,              // Loop while this is true
577    pub max_iterations: usize,          // Safety limit
578    pub body_node_id: Option<String>,   // Node to execute in loop body
579    pub collection_key: Option<String>, // For iterating over arrays
580}
581
582impl LoopNode {
583    pub fn new_while(
584        id: impl Into<String>,
585        condition: impl Into<String>,
586        max_iterations: usize,
587    ) -> Self {
588        Self {
589            id: id.into(),
590            condition: condition.into(),
591            max_iterations,
592            body_node_id: None,
593            collection_key: None,
594        }
595    }
596
597    pub fn new_foreach(id: impl Into<String>, collection_key: impl Into<String>) -> Self {
598        Self {
599            id: id.into(),
600            condition: "true".to_string(),
601            max_iterations: 10000,
602            body_node_id: None,
603            collection_key: Some(collection_key.into()),
604        }
605    }
606
607    pub fn with_body_node(mut self, body_node_id: impl Into<String>) -> Self {
608        self.body_node_id = Some(body_node_id.into());
609        self
610    }
611}
612
613#[async_trait]
614impl Node for LoopNode {
615    fn id(&self) -> &str {
616        &self.id
617    }
618
619    fn node_type(&self) -> NodeType {
620        NodeType::LoopNode
621    }
622
623    async fn run(&self, ctx: &mut Context) -> RuleResult {
624        info!("🔁 LoopNode[{}]: Starting loop execution", self.id);
625
626        let mut iterations = 0;
627        let mut loop_results = Vec::new();
628
629        // Check if this is a collection iteration
630        if let Some(collection_key) = &self.collection_key {
631            // Clone collection first to avoid borrow checker issues
632            let collection_clone = ctx.data.get(collection_key).cloned();
633
634            if let Some(collection) = collection_clone {
635                if let Some(array) = collection.as_array() {
636                    info!(
637                        "LoopNode[{}]: Iterating over collection with {} items",
638                        self.id,
639                        array.len()
640                    );
641
642                    for (index, item) in array.iter().enumerate() {
643                        if iterations >= self.max_iterations {
644                            info!(
645                                "⚠️  LoopNode[{}]: Max iterations ({}) reached",
646                                self.id, self.max_iterations
647                            );
648                            break;
649                        }
650
651                        ctx.data
652                            .insert("_loop_index".to_string(), Value::from(index));
653                        ctx.data.insert("_loop_item".to_string(), item.clone());
654
655                        // TODO: Execute body node if specified
656                        // This requires access to executor or node registry
657
658                        loop_results.push(serde_json::json!({
659                            "iteration": index,
660                            "item": item
661                        }));
662
663                        iterations += 1;
664                    }
665                }
666            }
667        } else {
668            // While loop
669            use crate::rule::Rule;
670            let rule = Rule::new(&self.id, &self.condition);
671
672            while iterations < self.max_iterations {
673                let eval_result = rule.evaluate(&ctx.data)?;
674                let should_continue = match eval_result {
675                    Value::Bool(b) => b,
676                    _ => false,
677                };
678
679                if !should_continue {
680                    break;
681                }
682
683                ctx.data
684                    .insert("_loop_iteration".to_string(), Value::from(iterations));
685
686                // TODO: Execute body node if specified
687
688                loop_results.push(serde_json::json!({
689                    "iteration": iterations
690                }));
691
692                iterations += 1;
693            }
694        }
695
696        let result = serde_json::json!({
697            "iterations": iterations,
698            "completed": iterations < self.max_iterations,
699            "results": loop_results
700        });
701
702        ctx.data
703            .insert(format!("{}_result", self.id), result.clone());
704
705        info!(
706            "✅ LoopNode[{}]: Completed {} iterations",
707            self.id, iterations
708        );
709        Ok(result)
710    }
711}
712
713// ============================================================
714// TryCatchNode - Error handling with fallback
715// ============================================================
716
717#[derive(Debug, Clone)]
718pub struct TryCatchNode {
719    pub id: String,
720    pub try_node_id: String,
721    pub catch_node_id: Option<String>,
722    pub finally_node_id: Option<String>,
723}
724
725impl TryCatchNode {
726    pub fn new(id: impl Into<String>, try_node_id: impl Into<String>) -> Self {
727        Self {
728            id: id.into(),
729            try_node_id: try_node_id.into(),
730            catch_node_id: None,
731            finally_node_id: None,
732        }
733    }
734
735    pub fn with_catch(mut self, catch_node_id: impl Into<String>) -> Self {
736        self.catch_node_id = Some(catch_node_id.into());
737        self
738    }
739
740    pub fn with_finally(mut self, finally_node_id: impl Into<String>) -> Self {
741        self.finally_node_id = Some(finally_node_id.into());
742        self
743    }
744}
745
746#[async_trait]
747impl Node for TryCatchNode {
748    fn id(&self) -> &str {
749        &self.id
750    }
751
752    fn node_type(&self) -> NodeType {
753        NodeType::TryCatchNode
754    }
755
756    async fn run(&self, ctx: &mut Context) -> RuleResult {
757        info!("🛡️  TryCatchNode[{}]: Executing try block", self.id);
758
759        // TODO: Execute try_node_id
760        // For now, simulate success/failure based on context
761        let error_occurred = ctx
762            .data
763            .get("_simulate_error")
764            .and_then(|v| v.as_bool())
765            .unwrap_or(false);
766
767        let result = if error_occurred {
768            info!(
769                "⚠️  TryCatchNode[{}]: Error occurred, executing catch block",
770                self.id
771            );
772            ctx.data.insert(
773                "_error".to_string(),
774                Value::String("Simulated error".to_string()),
775            );
776
777            serde_json::json!({
778                "status": "error_handled",
779                "try_node": self.try_node_id,
780                "catch_node": self.catch_node_id
781            })
782        } else {
783            serde_json::json!({
784                "status": "success",
785                "try_node": self.try_node_id
786            })
787        };
788
789        // TODO: Execute finally block if specified
790
791        ctx.data
792            .insert(format!("{}_result", self.id), result.clone());
793        Ok(result)
794    }
795}
796
797// ============================================================
798// RetryNode - Automatic retry with exponential backoff
799// ============================================================
800
801use tokio::time::{sleep, Duration};
802
803#[derive(Debug, Clone)]
804pub struct RetryNode {
805    pub id: String,
806    pub target_node_id: String,
807    pub max_retries: usize,
808    pub initial_delay_ms: u64,
809    pub backoff_multiplier: f64,
810}
811
812impl RetryNode {
813    pub fn new(
814        id: impl Into<String>,
815        target_node_id: impl Into<String>,
816        max_retries: usize,
817    ) -> Self {
818        Self {
819            id: id.into(),
820            target_node_id: target_node_id.into(),
821            max_retries,
822            initial_delay_ms: 100,
823            backoff_multiplier: 2.0,
824        }
825    }
826
827    pub fn with_backoff(mut self, initial_delay_ms: u64, multiplier: f64) -> Self {
828        self.initial_delay_ms = initial_delay_ms;
829        self.backoff_multiplier = multiplier;
830        self
831    }
832}
833
834#[async_trait]
835impl Node for RetryNode {
836    fn id(&self) -> &str {
837        &self.id
838    }
839
840    fn node_type(&self) -> NodeType {
841        NodeType::RetryNode
842    }
843
844    async fn run(&self, ctx: &mut Context) -> RuleResult {
845        info!(
846            "🔄 RetryNode[{}]: Starting with max {} retries",
847            self.id, self.max_retries
848        );
849
850        let mut attempt = 0;
851        let mut delay_ms = self.initial_delay_ms;
852
853        while attempt <= self.max_retries {
854            // TODO: Execute target_node_id
855            // For now, simulate retry logic
856
857            let should_retry = ctx
858                .data
859                .get("_simulate_failure")
860                .and_then(|v| v.as_bool())
861                .unwrap_or(false)
862                && attempt < self.max_retries;
863
864            if !should_retry {
865                let result = serde_json::json!({
866                    "status": "success",
867                    "attempts": attempt + 1,
868                    "target_node": self.target_node_id
869                });
870                ctx.data
871                    .insert(format!("{}_result", self.id), result.clone());
872                info!(
873                    "✅ RetryNode[{}]: Succeeded after {} attempts",
874                    self.id,
875                    attempt + 1
876                );
877                return Ok(result);
878            }
879
880            info!(
881                "⚠️  RetryNode[{}]: Attempt {} failed, retrying in {}ms",
882                self.id,
883                attempt + 1,
884                delay_ms
885            );
886
887            sleep(Duration::from_millis(delay_ms)).await;
888            delay_ms = (delay_ms as f64 * self.backoff_multiplier) as u64;
889            attempt += 1;
890        }
891
892        let error_result = serde_json::json!({
893            "status": "failed",
894            "attempts": attempt,
895            "target_node": self.target_node_id
896        });
897
898        ctx.data
899            .insert(format!("{}_result", self.id), error_result.clone());
900        info!(
901            "❌ RetryNode[{}]: Failed after {} attempts",
902            self.id, attempt
903        );
904
905        Ok(error_result)
906    }
907}
908
909// ============================================================
910// CircuitBreakerNode - Prevent cascading failures
911// ============================================================
912
913#[derive(Debug, Clone)]
914pub struct CircuitBreakerNode {
915    pub id: String,
916    pub target_node_id: String,
917    pub failure_threshold: usize,
918    pub timeout_ms: u64,
919    pub half_open_timeout_ms: u64,
920}
921
922impl CircuitBreakerNode {
923    pub fn new(
924        id: impl Into<String>,
925        target_node_id: impl Into<String>,
926        failure_threshold: usize,
927    ) -> Self {
928        Self {
929            id: id.into(),
930            target_node_id: target_node_id.into(),
931            failure_threshold,
932            timeout_ms: 5000,
933            half_open_timeout_ms: 30000,
934        }
935    }
936}
937
938#[async_trait]
939impl Node for CircuitBreakerNode {
940    fn id(&self) -> &str {
941        &self.id
942    }
943
944    fn node_type(&self) -> NodeType {
945        NodeType::CircuitBreakerNode
946    }
947
948    async fn run(&self, ctx: &mut Context) -> RuleResult {
949        info!("⚡ CircuitBreakerNode[{}]: Checking circuit state", self.id);
950
951        // TODO: Implement proper circuit breaker state machine
952        // States: Closed, Open, HalfOpen
953        // For now, simple implementation
954
955        let is_circuit_open = ctx
956            .data
957            .get("_circuit_open")
958            .and_then(|v| v.as_bool())
959            .unwrap_or(false);
960
961        if is_circuit_open {
962            info!(
963                "🚫 CircuitBreakerNode[{}]: Circuit is OPEN, fast-failing",
964                self.id
965            );
966            let result = serde_json::json!({
967                "status": "circuit_open",
968                "message": "Circuit breaker is open, request rejected"
969            });
970            ctx.data
971                .insert(format!("{}_result", self.id), result.clone());
972            return Ok(result);
973        }
974
975        // Circuit is closed, execute target node
976        // TODO: Execute target_node_id and track failures
977
978        let result = serde_json::json!({
979            "status": "success",
980            "circuit_state": "closed",
981            "target_node": self.target_node_id
982        });
983
984        ctx.data
985            .insert(format!("{}_result", self.id), result.clone());
986        info!("✅ CircuitBreakerNode[{}]: Request completed", self.id);
987
988        Ok(result)
989    }
990}