1
2use async_trait::async_trait;
3use serde::{Serialize, Deserialize};
4use serde_json::Value;
5use tracing::{info, debug};
6use std::sync::Arc;
7
8use crate::core::Context;
9use crate::rule::RuleResult;
10
11#[derive(Debug, Serialize, Deserialize, Clone)]
12pub enum NodeType {
13 RuleNode,
14 DBNode,
15 AINode,
16 GrpcNode,
17}
18
19#[async_trait]
20pub trait Node: Send + Sync {
21 fn id(&self) -> &str;
22 fn node_type(&self) -> NodeType;
23 async fn run(&self, ctx: &mut Context) -> RuleResult;
24}
25
26#[derive(Debug, Clone)]
31pub struct RuleNode {
32 pub id: String,
33 pub condition: String,
34}
35
36impl RuleNode {
37 pub fn new(id: impl Into<String>, condition: impl Into<String>) -> Self {
38 Self {
39 id: id.into(),
40 condition: condition.into(),
41 }
42 }
43}
44
45#[async_trait]
46impl Node for RuleNode {
47 fn id(&self) -> &str {
48 &self.id
49 }
50
51 fn node_type(&self) -> NodeType {
52 NodeType::RuleNode
53 }
54
55 async fn run(&self, ctx: &mut Context) -> RuleResult {
56 info!("RuleNode[{}]: Evaluating condition '{}'", self.id, self.condition);
57
58 let result = if self.condition == "true" {
60 Value::Bool(true)
61 } else if self.condition == "false" {
62 Value::Bool(false)
63 } else {
64 ctx.data.get(&self.condition).cloned().unwrap_or(Value::Bool(true))
66 };
67
68 debug!("RuleNode[{}]: Result = {:?}", self.id, result);
69 ctx.data.insert(format!("{}_result", self.id), result.clone());
70
71 Ok(result)
72 }
73}
74
75#[async_trait]
81pub trait DatabaseExecutor: Send + Sync {
82 async fn execute(&self, query: &str, params: &[&str]) -> Result<Value, String>;
84}
85
86#[derive(Debug, Clone)]
88pub struct MockDatabaseExecutor;
89
90#[async_trait]
91impl DatabaseExecutor for MockDatabaseExecutor {
92 async fn execute(&self, query: &str, _params: &[&str]) -> Result<Value, String> {
93 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
95
96 Ok(serde_json::json!({
97 "query": query,
98 "rows": [
99 {"id": 1, "name": "Alice", "active": true},
100 {"id": 2, "name": "Bob", "active": false}
101 ],
102 "count": 2
103 }))
104 }
105}
106
107#[derive(Clone)]
108pub struct DBNode {
109 pub id: String,
110 pub query: String,
111 executor: Option<Arc<dyn DatabaseExecutor>>,
112}
113
114impl DBNode {
115 pub fn new(id: impl Into<String>, query: impl Into<String>) -> Self {
116 Self {
117 id: id.into(),
118 query: query.into(),
119 executor: None,
120 }
121 }
122
123 pub fn with_executor(
125 id: impl Into<String>,
126 query: impl Into<String>,
127 executor: Arc<dyn DatabaseExecutor>,
128 ) -> Self {
129 Self {
130 id: id.into(),
131 query: query.into(),
132 executor: Some(executor),
133 }
134 }
135}
136
137impl std::fmt::Debug for DBNode {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("DBNode")
140 .field("id", &self.id)
141 .field("query", &self.query)
142 .field("has_executor", &self.executor.is_some())
143 .finish()
144 }
145}
146
147#[async_trait]
148impl Node for DBNode {
149 fn id(&self) -> &str {
150 &self.id
151 }
152
153 fn node_type(&self) -> NodeType {
154 NodeType::DBNode
155 }
156
157 async fn run(&self, ctx: &mut Context) -> RuleResult {
158 info!("DBNode[{}]: Executing query '{}'", self.id, self.query);
159
160 let executor = self.executor.as_ref()
161 .map(|e| e.clone())
162 .unwrap_or_else(|| Arc::new(MockDatabaseExecutor) as Arc<dyn DatabaseExecutor>);
163
164 let params: Vec<String> = vec![]; let params_refs: Vec<&str> = params.iter().map(|s| s.as_str()).collect();
167
168 let result = executor.execute(&self.query, ¶ms_refs).await
169 .map_err(|e| crate::rule::RuleError::Eval(format!("Database error: {}", e)))?;
170
171 debug!("DBNode[{}]: Query result = {:?}", self.id, result);
172 ctx.data.insert(format!("{}_result", self.id), result.clone());
173
174 Ok(result)
175 }
176}
177
178#[derive(Debug, Clone)]
183pub struct AINode {
184 pub id: String,
185 pub prompt: String,
186}
187
188impl AINode {
189 pub fn new(id: impl Into<String>, prompt: impl Into<String>) -> Self {
190 Self {
191 id: id.into(),
192 prompt: prompt.into(),
193 }
194 }
195}
196
197#[async_trait]
198impl Node for AINode {
199 fn id(&self) -> &str {
200 &self.id
201 }
202
203 fn node_type(&self) -> NodeType {
204 NodeType::AINode
205 }
206
207 async fn run(&self, ctx: &mut Context) -> RuleResult {
208 info!("AINode[{}]: Processing prompt '{}'", self.id, self.prompt);
209
210 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
212
213 let context_summary: Vec<String> = ctx.data.keys().cloned().collect();
215 let mock_response = serde_json::json!({
216 "prompt": self.prompt,
217 "response": format!("AI processed: {} with context keys: {:?}", self.prompt, context_summary),
218 "confidence": 0.95,
219 "model": "mock-gpt-4"
220 });
221
222 debug!("AINode[{}]: AI response = {:?}", self.id, mock_response);
223 ctx.data.insert(format!("{}_result", self.id), mock_response.clone());
224
225 Ok(mock_response)
226 }
227}
228
229#[derive(Debug, Clone)]
234pub struct GrpcNode {
235 pub id: String,
236 pub service_url: String,
237 pub method: String,
238}
239
240impl GrpcNode {
241 pub fn new(id: impl Into<String>, service_url: impl Into<String>, method: impl Into<String>) -> Self {
242 Self {
243 id: id.into(),
244 service_url: service_url.into(),
245 method: method.into(),
246 }
247 }
248}
249
250#[async_trait]
251impl Node for GrpcNode {
252 fn id(&self) -> &str {
253 &self.id
254 }
255
256 fn node_type(&self) -> NodeType {
257 NodeType::GrpcNode
258 }
259
260 async fn run(&self, ctx: &mut Context) -> RuleResult {
261 info!("GrpcNode[{}]: Calling gRPC service '{}' method '{}'",
262 self.id, self.service_url, self.method);
263
264 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
266
267 let mock_response = serde_json::json!({
269 "service": self.service_url,
270 "method": self.method,
271 "status": "OK",
272 "response": format!("gRPC call to {} completed", self.method),
273 "latency_ms": 100
274 });
275
276 debug!("GrpcNode[{}]: gRPC response = {:?}", self.id, mock_response);
277 ctx.data.insert(format!("{}_result", self.id), mock_response.clone());
278
279 Ok(mock_response)
280 }
281}
282