fuse_rule/
agent.rs

1use arrow::array::Array;
2use arrow::record_batch::RecordBatch;
3use async_trait::async_trait;
4use tracing::{debug, info, warn};
5
6#[derive(Debug, Clone)]
7pub struct Activation {
8    pub rule_id: String,
9    pub rule_name: String,
10    pub action: String,
11    pub context: Option<RecordBatch>,
12}
13
14#[async_trait]
15pub trait Agent: Send + Sync {
16    fn name(&self) -> &str;
17    async fn execute(&self, activation: &Activation) -> anyhow::Result<()>;
18}
19
20pub struct LoggerAgent;
21
22#[async_trait]
23impl Agent for LoggerAgent {
24    fn name(&self) -> &str {
25        "logger"
26    }
27
28    async fn execute(&self, activation: &Activation) -> anyhow::Result<()> {
29        let rows = activation
30            .context
31            .as_ref()
32            .map(|b| b.num_rows())
33            .unwrap_or(0);
34
35        // Log matched rows data for debugging
36        if let Some(batch) = &activation.context {
37            let field_names: Vec<String> = batch
38                .schema()
39                .fields()
40                .iter()
41                .map(|f| f.name().clone())
42                .collect();
43            debug!(
44                agent = self.name(),
45                rule_id = %activation.rule_id,
46                matched_rows = rows,
47                columns = ?field_names,
48                "Agent firing with matched data"
49            );
50        }
51
52        info!(
53            agent = self.name(),
54            action = %activation.action,
55            rule_name = %activation.rule_name,
56            rule_id = %activation.rule_id,
57            matched_rows = rows,
58            "Agent firing action"
59        );
60        Ok(())
61    }
62}
63
64pub struct WebhookAgent {
65    pub url: String,
66    client: reqwest::Client,
67    template: Option<handlebars::Handlebars<'static>>,
68}
69
70impl WebhookAgent {
71    pub fn new(url: String, template: Option<String>) -> Self {
72        // Use connection pooling with proper configuration
73        let client = reqwest::Client::builder()
74            .pool_max_idle_per_host(10)
75            .timeout(std::time::Duration::from_secs(30))
76            .build()
77            .expect("Failed to create HTTP client");
78
79        // Compile Handlebars template if provided
80        let compiled_template = template.and_then(|t| {
81            let mut handlebars = handlebars::Handlebars::new();
82            handlebars.set_strict_mode(true);
83            if handlebars.register_template_string("webhook", &t).is_ok() {
84                Some(handlebars)
85            } else {
86                None
87            }
88        });
89
90        Self {
91            url,
92            client,
93            template: compiled_template,
94        }
95    }
96}
97
98#[async_trait]
99impl Agent for WebhookAgent {
100    fn name(&self) -> &str {
101        "webhook"
102    }
103
104    async fn execute(&self, activation: &Activation) -> anyhow::Result<()> {
105        // Build template context
106        let mut context = serde_json::json!({
107            "rule_id": activation.rule_id,
108            "rule_name": activation.rule_name,
109            "action": activation.action,
110            "count": activation.context.as_ref().map(|b| b.num_rows()).unwrap_or(0),
111            "matched_rows": activation.context.as_ref().map(|b| b.num_rows()).unwrap_or(0),
112            "timestamp": chrono::Utc::now().to_rfc3339(),
113        });
114
115        // Add matched rows data if available (rich context)
116        let mut matched_data = Vec::new();
117        if let Some(batch) = &activation.context {
118            // Convert RecordBatch to JSON for rich context
119            let schema = batch.schema();
120            for row_idx in 0..batch.num_rows() {
121                let mut row = serde_json::Map::new();
122                for col_idx in 0..batch.num_columns() {
123                    let field = schema.field(col_idx);
124                    let array = batch.column(col_idx);
125                    // Convert array value to JSON (simplified - handles common types)
126                    let value = match array.data_type() {
127                        arrow::datatypes::DataType::Int32 => {
128                            if let Some(arr) =
129                                array.as_any().downcast_ref::<arrow::array::Int32Array>()
130                            {
131                                if !arr.is_null(row_idx) {
132                                    serde_json::Value::Number(arr.value(row_idx).into())
133                                } else {
134                                    serde_json::Value::Null
135                                }
136                            } else {
137                                serde_json::Value::Null
138                            }
139                        }
140                        arrow::datatypes::DataType::Int64 => {
141                            if let Some(arr) =
142                                array.as_any().downcast_ref::<arrow::array::Int64Array>()
143                            {
144                                if !arr.is_null(row_idx) {
145                                    serde_json::Value::Number(arr.value(row_idx).into())
146                                } else {
147                                    serde_json::Value::Null
148                                }
149                            } else {
150                                serde_json::Value::Null
151                            }
152                        }
153                        arrow::datatypes::DataType::Float64 => {
154                            if let Some(arr) =
155                                array.as_any().downcast_ref::<arrow::array::Float64Array>()
156                            {
157                                if !arr.is_null(row_idx) {
158                                    serde_json::Value::Number(
159                                        serde_json::Number::from_f64(arr.value(row_idx))
160                                            .unwrap_or_else(|| serde_json::Number::from(0)),
161                                    )
162                                } else {
163                                    serde_json::Value::Null
164                                }
165                            } else {
166                                serde_json::Value::Null
167                            }
168                        }
169                        arrow::datatypes::DataType::Boolean => {
170                            if let Some(arr) =
171                                array.as_any().downcast_ref::<arrow::array::BooleanArray>()
172                            {
173                                if !arr.is_null(row_idx) {
174                                    serde_json::Value::Bool(arr.value(row_idx))
175                                } else {
176                                    serde_json::Value::Null
177                                }
178                            } else {
179                                serde_json::Value::Null
180                            }
181                        }
182                        arrow::datatypes::DataType::Utf8 => {
183                            if let Some(arr) =
184                                array.as_any().downcast_ref::<arrow::array::StringArray>()
185                            {
186                                if !arr.is_null(row_idx) {
187                                    serde_json::Value::String(arr.value(row_idx).to_string())
188                                } else {
189                                    serde_json::Value::Null
190                                }
191                            } else {
192                                serde_json::Value::Null
193                            }
194                        }
195                        _ => serde_json::Value::Null,
196                    };
197                    row.insert(field.name().clone(), value);
198                }
199                matched_data.push(serde_json::Value::Object(row));
200            }
201            context.as_object_mut().unwrap().insert(
202                "matched_data".to_string(),
203                serde_json::Value::Array(matched_data.clone()),
204            );
205            context.as_object_mut().unwrap().insert(
206                "matched_rows".to_string(),
207                serde_json::Value::Array(matched_data),
208            );
209        }
210
211        // Render payload using template or default JSON
212        let payload: serde_json::Value = if let Some(ref template) = self.template {
213            // Use Handlebars template
214            match template.render("webhook", &context) {
215                Ok(rendered) => {
216                    // Try to parse as JSON, fallback to string
217                    serde_json::from_str(&rendered)
218                        .unwrap_or_else(|_| serde_json::json!({ "text": rendered }))
219                }
220                Err(e) => {
221                    // Template rendering failed, fallback to default JSON
222                    warn!(error = %e, "Template rendering failed, using default payload");
223                    context
224                }
225            }
226        } else {
227            // Use default JSON format
228            context
229        };
230
231        debug!(url = %self.url, rule_id = %activation.rule_id, "Sending webhook");
232
233        // In a real production system, we might want to handle retries here
234        self.client.post(&self.url).json(&payload).send().await?;
235
236        Ok(())
237    }
238}