rust_logic_graph/node/
mod.rs

1
2use async_trait::async_trait;
3use serde::{Serialize, Deserialize};
4use serde_json::Value;
5use tracing::{info, debug};
6use std::sync::Arc;
7
8use crate::core::Context;
9use crate::rule::RuleResult;
10
11#[derive(Debug, Serialize, Deserialize, Clone)]
12pub enum NodeType {
13    RuleNode,
14    DBNode,
15    AINode,
16    GrpcNode,
17}
18
19#[async_trait]
20pub trait Node: Send + Sync {
21    fn id(&self) -> &str;
22    fn node_type(&self) -> NodeType;
23    async fn run(&self, ctx: &mut Context) -> RuleResult;
24}
25
26// ============================================================
27// RuleNode - Evaluates conditions and transforms data
28// ============================================================
29
30#[derive(Debug, Clone)]
31pub struct RuleNode {
32    pub id: String,
33    pub condition: String,
34}
35
36impl RuleNode {
37    pub fn new(id: impl Into<String>, condition: impl Into<String>) -> Self {
38        Self {
39            id: id.into(),
40            condition: condition.into(),
41        }
42    }
43}
44
45#[async_trait]
46impl Node for RuleNode {
47    fn id(&self) -> &str {
48        &self.id
49    }
50
51    fn node_type(&self) -> NodeType {
52        NodeType::RuleNode
53    }
54
55    async fn run(&self, ctx: &mut Context) -> RuleResult {
56        info!("RuleNode[{}]: Evaluating condition '{}'", self.id, self.condition);
57
58        // Simple condition evaluation (can be extended with proper parser)
59        let result = if self.condition == "true" {
60            Value::Bool(true)
61        } else if self.condition == "false" {
62            Value::Bool(false)
63        } else {
64            // Try to evaluate based on context
65            ctx.data.get(&self.condition).cloned().unwrap_or(Value::Bool(true))
66        };
67
68        debug!("RuleNode[{}]: Result = {:?}", self.id, result);
69        ctx.data.insert(format!("{}_result", self.id), result.clone());
70
71        Ok(result)
72    }
73}
74
75// ============================================================
76// DBNode - Database operations with pluggable executor
77// ============================================================
78
79/// Trait for database executors - implement this for MySQL, Postgres, etc.
80#[async_trait]
81pub trait DatabaseExecutor: Send + Sync {
82    /// Execute a query and return JSON result
83    async fn execute(&self, query: &str, params: &[&str]) -> Result<Value, String>;
84}
85
86/// Mock database executor (default for examples/testing)
87#[derive(Debug, Clone)]
88pub struct MockDatabaseExecutor;
89
90#[async_trait]
91impl DatabaseExecutor for MockDatabaseExecutor {
92    async fn execute(&self, query: &str, _params: &[&str]) -> Result<Value, String> {
93        // Simulate async DB operation
94        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
95        
96        Ok(serde_json::json!({
97            "query": query,
98            "rows": [
99                {"id": 1, "name": "Alice", "active": true},
100                {"id": 2, "name": "Bob", "active": false}
101            ],
102            "count": 2
103        }))
104    }
105}
106
107#[derive(Clone)]
108pub struct DBNode {
109    pub id: String,
110    pub query: String,
111    executor: Option<Arc<dyn DatabaseExecutor>>,
112}
113
114impl DBNode {
115    pub fn new(id: impl Into<String>, query: impl Into<String>) -> Self {
116        Self {
117            id: id.into(),
118            query: query.into(),
119            executor: None,
120        }
121    }
122    
123    /// Create DBNode with custom executor (MySQL, Postgres, etc.)
124    pub fn with_executor(
125        id: impl Into<String>,
126        query: impl Into<String>,
127        executor: Arc<dyn DatabaseExecutor>,
128    ) -> Self {
129        Self {
130            id: id.into(),
131            query: query.into(),
132            executor: Some(executor),
133        }
134    }
135}
136
137impl std::fmt::Debug for DBNode {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("DBNode")
140            .field("id", &self.id)
141            .field("query", &self.query)
142            .field("has_executor", &self.executor.is_some())
143            .finish()
144    }
145}
146
147#[async_trait]
148impl Node for DBNode {
149    fn id(&self) -> &str {
150        &self.id
151    }
152
153    fn node_type(&self) -> NodeType {
154        NodeType::DBNode
155    }
156
157    async fn run(&self, ctx: &mut Context) -> RuleResult {
158        info!("DBNode[{}]: Executing query '{}'", self.id, self.query);
159
160        let executor = self.executor.as_ref()
161            .map(|e| e.clone())
162            .unwrap_or_else(|| Arc::new(MockDatabaseExecutor) as Arc<dyn DatabaseExecutor>);
163        
164        // Get params from context (if any)
165        let params: Vec<String> = vec![];  // TODO: Extract from context if needed
166        let params_refs: Vec<&str> = params.iter().map(|s| s.as_str()).collect();
167        
168        let result = executor.execute(&self.query, &params_refs).await
169            .map_err(|e| crate::rule::RuleError::Eval(format!("Database error: {}", e)))?;
170
171        debug!("DBNode[{}]: Query result = {:?}", self.id, result);
172        ctx.data.insert(format!("{}_result", self.id), result.clone());
173
174        Ok(result)
175    }
176}
177
178// ============================================================
179// AINode - Simulates AI/LLM operations
180// ============================================================
181
182#[derive(Debug, Clone)]
183pub struct AINode {
184    pub id: String,
185    pub prompt: String,
186}
187
188impl AINode {
189    pub fn new(id: impl Into<String>, prompt: impl Into<String>) -> Self {
190        Self {
191            id: id.into(),
192            prompt: prompt.into(),
193        }
194    }
195}
196
197#[async_trait]
198impl Node for AINode {
199    fn id(&self) -> &str {
200        &self.id
201    }
202
203    fn node_type(&self) -> NodeType {
204        NodeType::AINode
205    }
206
207    async fn run(&self, ctx: &mut Context) -> RuleResult {
208        info!("AINode[{}]: Processing prompt '{}'", self.id, self.prompt);
209
210        // Simulate async AI API call
211        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
212
213        // Mock AI response based on context
214        let context_summary: Vec<String> = ctx.data.keys().cloned().collect();
215        let mock_response = serde_json::json!({
216            "prompt": self.prompt,
217            "response": format!("AI processed: {} with context keys: {:?}", self.prompt, context_summary),
218            "confidence": 0.95,
219            "model": "mock-gpt-4"
220        });
221
222        debug!("AINode[{}]: AI response = {:?}", self.id, mock_response);
223        ctx.data.insert(format!("{}_result", self.id), mock_response.clone());
224
225        Ok(mock_response)
226    }
227}
228
229// ============================================================
230// GrpcNode - Calls gRPC services
231// ============================================================
232
233#[derive(Debug, Clone)]
234pub struct GrpcNode {
235    pub id: String,
236    pub service_url: String,
237    pub method: String,
238}
239
240impl GrpcNode {
241    pub fn new(id: impl Into<String>, service_url: impl Into<String>, method: impl Into<String>) -> Self {
242        Self {
243            id: id.into(),
244            service_url: service_url.into(),
245            method: method.into(),
246        }
247    }
248}
249
250#[async_trait]
251impl Node for GrpcNode {
252    fn id(&self) -> &str {
253        &self.id
254    }
255
256    fn node_type(&self) -> NodeType {
257        NodeType::GrpcNode
258    }
259
260    async fn run(&self, ctx: &mut Context) -> RuleResult {
261        info!("GrpcNode[{}]: Calling gRPC service '{}' method '{}'", 
262              self.id, self.service_url, self.method);
263
264        // Simulate async gRPC call
265        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
266
267        // Mock gRPC response
268        let mock_response = serde_json::json!({
269            "service": self.service_url,
270            "method": self.method,
271            "status": "OK",
272            "response": format!("gRPC call to {} completed", self.method),
273            "latency_ms": 100
274        });
275
276        debug!("GrpcNode[{}]: gRPC response = {:?}", self.id, mock_response);
277        ctx.data.insert(format!("{}_result", self.id), mock_response.clone());
278
279        Ok(mock_response)
280    }
281}
282