1use arrow::array::Array;
2use arrow::record_batch::RecordBatch;
3use async_trait::async_trait;
4use tracing::{debug, info, warn};
5
6#[derive(Debug, Clone)]
7pub struct Activation {
8 pub rule_id: String,
9 pub rule_name: String,
10 pub action: String,
11 pub context: Option<RecordBatch>,
12}
13
14#[async_trait]
15pub trait Agent: Send + Sync {
16 fn name(&self) -> &str;
17 async fn execute(&self, activation: &Activation) -> anyhow::Result<()>;
18}
19
20pub struct LoggerAgent;
21
22#[async_trait]
23impl Agent for LoggerAgent {
24 fn name(&self) -> &str {
25 "logger"
26 }
27
28 async fn execute(&self, activation: &Activation) -> anyhow::Result<()> {
29 let rows = activation
30 .context
31 .as_ref()
32 .map(|b| b.num_rows())
33 .unwrap_or(0);
34
35 if let Some(batch) = &activation.context {
37 let field_names: Vec<String> = batch
38 .schema()
39 .fields()
40 .iter()
41 .map(|f| f.name().clone())
42 .collect();
43 debug!(
44 agent = self.name(),
45 rule_id = %activation.rule_id,
46 matched_rows = rows,
47 columns = ?field_names,
48 "Agent firing with matched data"
49 );
50 }
51
52 info!(
53 agent = self.name(),
54 action = %activation.action,
55 rule_name = %activation.rule_name,
56 rule_id = %activation.rule_id,
57 matched_rows = rows,
58 "Agent firing action"
59 );
60 Ok(())
61 }
62}
63
64pub struct WebhookAgent {
65 pub url: String,
66 client: reqwest::Client,
67 template: Option<handlebars::Handlebars<'static>>,
68}
69
70impl WebhookAgent {
71 pub fn new(url: String, template: Option<String>) -> Self {
72 let client = reqwest::Client::builder()
74 .pool_max_idle_per_host(10)
75 .timeout(std::time::Duration::from_secs(30))
76 .build()
77 .expect("Failed to create HTTP client");
78
79 let compiled_template = template.and_then(|t| {
81 let mut handlebars = handlebars::Handlebars::new();
82 handlebars.set_strict_mode(true);
83 if handlebars.register_template_string("webhook", &t).is_ok() {
84 Some(handlebars)
85 } else {
86 None
87 }
88 });
89
90 Self {
91 url,
92 client,
93 template: compiled_template,
94 }
95 }
96}
97
98#[async_trait]
99impl Agent for WebhookAgent {
100 fn name(&self) -> &str {
101 "webhook"
102 }
103
104 async fn execute(&self, activation: &Activation) -> anyhow::Result<()> {
105 let mut context = serde_json::json!({
107 "rule_id": activation.rule_id,
108 "rule_name": activation.rule_name,
109 "action": activation.action,
110 "count": activation.context.as_ref().map(|b| b.num_rows()).unwrap_or(0),
111 "matched_rows": activation.context.as_ref().map(|b| b.num_rows()).unwrap_or(0),
112 "timestamp": chrono::Utc::now().to_rfc3339(),
113 });
114
115 let mut matched_data = Vec::new();
117 if let Some(batch) = &activation.context {
118 let schema = batch.schema();
120 for row_idx in 0..batch.num_rows() {
121 let mut row = serde_json::Map::new();
122 for col_idx in 0..batch.num_columns() {
123 let field = schema.field(col_idx);
124 let array = batch.column(col_idx);
125 let value = match array.data_type() {
127 arrow::datatypes::DataType::Int32 => {
128 if let Some(arr) =
129 array.as_any().downcast_ref::<arrow::array::Int32Array>()
130 {
131 if !arr.is_null(row_idx) {
132 serde_json::Value::Number(arr.value(row_idx).into())
133 } else {
134 serde_json::Value::Null
135 }
136 } else {
137 serde_json::Value::Null
138 }
139 }
140 arrow::datatypes::DataType::Int64 => {
141 if let Some(arr) =
142 array.as_any().downcast_ref::<arrow::array::Int64Array>()
143 {
144 if !arr.is_null(row_idx) {
145 serde_json::Value::Number(arr.value(row_idx).into())
146 } else {
147 serde_json::Value::Null
148 }
149 } else {
150 serde_json::Value::Null
151 }
152 }
153 arrow::datatypes::DataType::Float64 => {
154 if let Some(arr) =
155 array.as_any().downcast_ref::<arrow::array::Float64Array>()
156 {
157 if !arr.is_null(row_idx) {
158 serde_json::Value::Number(
159 serde_json::Number::from_f64(arr.value(row_idx))
160 .unwrap_or_else(|| serde_json::Number::from(0)),
161 )
162 } else {
163 serde_json::Value::Null
164 }
165 } else {
166 serde_json::Value::Null
167 }
168 }
169 arrow::datatypes::DataType::Boolean => {
170 if let Some(arr) =
171 array.as_any().downcast_ref::<arrow::array::BooleanArray>()
172 {
173 if !arr.is_null(row_idx) {
174 serde_json::Value::Bool(arr.value(row_idx))
175 } else {
176 serde_json::Value::Null
177 }
178 } else {
179 serde_json::Value::Null
180 }
181 }
182 arrow::datatypes::DataType::Utf8 => {
183 if let Some(arr) =
184 array.as_any().downcast_ref::<arrow::array::StringArray>()
185 {
186 if !arr.is_null(row_idx) {
187 serde_json::Value::String(arr.value(row_idx).to_string())
188 } else {
189 serde_json::Value::Null
190 }
191 } else {
192 serde_json::Value::Null
193 }
194 }
195 _ => serde_json::Value::Null,
196 };
197 row.insert(field.name().clone(), value);
198 }
199 matched_data.push(serde_json::Value::Object(row));
200 }
201 context.as_object_mut().unwrap().insert(
202 "matched_data".to_string(),
203 serde_json::Value::Array(matched_data.clone()),
204 );
205 context.as_object_mut().unwrap().insert(
206 "matched_rows".to_string(),
207 serde_json::Value::Array(matched_data),
208 );
209 }
210
211 let payload: serde_json::Value = if let Some(ref template) = self.template {
213 match template.render("webhook", &context) {
215 Ok(rendered) => {
216 serde_json::from_str(&rendered)
218 .unwrap_or_else(|_| serde_json::json!({ "text": rendered }))
219 }
220 Err(e) => {
221 warn!(error = %e, "Template rendering failed, using default payload");
223 context
224 }
225 }
226 } else {
227 context
229 };
230
231 debug!(url = %self.url, rule_id = %activation.rule_id, "Sending webhook");
232
233 self.client.post(&self.url).json(&payload).send().await?;
235
236 Ok(())
237 }
238}