varpulis_connector_slack/
lib.rs1use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use tokio::sync::Mutex;
11use tracing::warn;
12use varpulis_connector_api::{
13 ConfigParamInfo, ConnectorComponentInfo, ConnectorConfig, ConnectorError, ConnectorFactory,
14 ConnectorHealth, Sink, SinkConnector, SinkConnectorAdapter,
15};
16use varpulis_core::event::Event;
17use varpulis_core::Value;
18
19pub fn event_to_slack_payload(event: &Event) -> serde_json::Value {
25 let get_str = |key: &str| -> String {
26 event
27 .get(key)
28 .map(|v| match v {
29 Value::Str(s) => s.to_string(),
30 other => other.to_string(),
31 })
32 .unwrap_or_default()
33 };
34
35 let rule = get_str("rule");
36 let rule = if rule.is_empty() {
37 event.event_type.to_string()
38 } else {
39 rule
40 };
41 let mitre = get_str("mitre");
42 let severity = get_str("severity");
43 let summary = get_str("summary");
44
45 let severity_emoji = match severity.as_str() {
46 "critical" => ":red_circle:",
47 "high" => ":large_orange_circle:",
48 "medium" => ":large_yellow_circle:",
49 "low" => ":large_blue_circle:",
50 _ => ":white_circle:",
51 };
52
53 let skip = ["rule", "mitre", "severity", "summary", "event_type"];
55 let mut context_parts: Vec<String> = Vec::new();
56 for (key, val) in &event.data {
57 if !skip.contains(&key.as_ref()) {
58 let val_str = match val {
59 Value::Str(s) => s.to_string(),
60 other => other.to_string(),
61 };
62 context_parts.push(format!("*{key}:* {val_str}"));
63 }
64 }
65
66 let mut blocks = vec![
67 serde_json::json!({
68 "type": "header",
69 "text": {
70 "type": "plain_text",
71 "text": format!("{severity_emoji} {rule}"),
72 "emoji": true
73 }
74 }),
75 serde_json::json!({
76 "type": "section",
77 "fields": [
78 {
79 "type": "mrkdwn",
80 "text": format!("*Severity:*\n{}", severity)
81 },
82 {
83 "type": "mrkdwn",
84 "text": format!("*MITRE ATT&CK:*\n{}", if mitre.is_empty() { "N/A" } else { &mitre })
85 }
86 ]
87 }),
88 ];
89
90 if !summary.is_empty() {
91 blocks.push(serde_json::json!({
92 "type": "section",
93 "text": {
94 "type": "mrkdwn",
95 "text": summary
96 }
97 }));
98 }
99
100 if !context_parts.is_empty() {
101 let context_text = context_parts.join(" | ");
103 let truncated = if context_text.len() > 2900 {
104 format!("{}...", &context_text[..2900])
105 } else {
106 context_text
107 };
108 blocks.push(serde_json::json!({
109 "type": "context",
110 "elements": [{
111 "type": "mrkdwn",
112 "text": truncated
113 }]
114 }));
115 }
116
117 serde_json::json!({ "blocks": blocks })
118}
119
120pub struct SlackWebhookSink {
126 name: String,
127 webhook_url: String,
128 client: reqwest::Client,
129 last_send: Mutex<Option<Instant>>,
131 min_interval: Duration,
132}
133
134impl std::fmt::Debug for SlackWebhookSink {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 f.debug_struct("SlackWebhookSink")
137 .field("name", &self.name)
138 .field("webhook_url", &"[redacted]")
139 .finish_non_exhaustive()
140 }
141}
142
143impl SlackWebhookSink {
144 pub fn new(name: &str, webhook_url: &str) -> Self {
146 Self {
147 name: name.to_string(),
148 webhook_url: webhook_url.to_string(),
149 client: reqwest::Client::new(),
150 last_send: Mutex::new(None),
151 min_interval: Duration::from_secs(1),
152 }
153 }
154}
155
156#[async_trait]
157impl SinkConnector for SlackWebhookSink {
158 fn name(&self) -> &str {
159 &self.name
160 }
161
162 async fn send(&self, event: &Event) -> Result<(), ConnectorError> {
163 {
165 let mut last = self.last_send.lock().await;
166 if let Some(prev) = *last {
167 let elapsed = prev.elapsed();
168 if elapsed < self.min_interval {
169 tokio::time::sleep(self.min_interval - elapsed).await;
170 }
171 }
172 *last = Some(Instant::now());
173 }
174
175 let payload = event_to_slack_payload(event);
176
177 let response = self
178 .client
179 .post(&self.webhook_url)
180 .json(&payload)
181 .timeout(Duration::from_secs(10))
182 .send()
183 .await
184 .map_err(|e| ConnectorError::SendFailed(format!("Slack webhook POST: {e}")))?;
185
186 if !response.status().is_success() {
187 let status = response.status();
188 let body = response
189 .text()
190 .await
191 .unwrap_or_else(|_| "unknown".to_string());
192 warn!("Slack webhook returned {}: {}", status, body);
193 return Err(ConnectorError::SendFailed(format!(
194 "Slack returned {status}: {body}"
195 )));
196 }
197
198 Ok(())
199 }
200
201 async fn send_to_topic(
202 &self,
203 events: &[Arc<Event>],
204 _topic: &str,
205 ) -> Result<(), ConnectorError> {
206 for event in events {
207 self.send(event).await?;
208 }
209 Ok(())
210 }
211
212 async fn flush(&self) -> Result<(), ConnectorError> {
213 Ok(())
214 }
215
216 async fn close(&self) -> Result<(), ConnectorError> {
217 Ok(())
218 }
219
220 fn health_check(&self) -> ConnectorHealth {
221 ConnectorHealth::healthy(0)
222 }
223}
224
225static SLACK_CONFIG_PARAMS: &[ConfigParamInfo] = &[ConfigParamInfo {
230 name: "webhook_url",
231 description: "Slack incoming webhook URL",
232 required: true,
233 default_value: None,
234}];
235
236static SLACK_INFO: ConnectorComponentInfo = ConnectorComponentInfo {
237 connector_type: "slack",
238 display_name: "Slack",
239 description: "Slack incoming webhook sink for security alerts",
240 feature_flag: "",
241 supports_source: false,
242 supports_sink: true,
243 supports_managed: false,
244 config_params: SLACK_CONFIG_PARAMS,
245};
246
247struct SlackFactory;
248
249impl ConnectorFactory for SlackFactory {
250 fn info(&self) -> &ConnectorComponentInfo {
251 &SLACK_INFO
252 }
253
254 fn create_managed(
255 &self,
256 _name: &str,
257 _config: &ConnectorConfig,
258 ) -> Result<Box<dyn varpulis_connector_api::ManagedConnector>, ConnectorError> {
259 Err(ConnectorError::NotAvailable(
260 "slack managed connector not supported".to_string(),
261 ))
262 }
263
264 fn create_sink_connector(
265 &self,
266 config: &ConnectorConfig,
267 ) -> Result<Box<dyn SinkConnector>, ConnectorError> {
268 let url = config.url.clone();
269 if url.is_empty() {
270 return Err(ConnectorError::ConfigError(
271 "webhook_url is required for Slack connector".to_string(),
272 ));
273 }
274 Ok(Box::new(SlackWebhookSink::new("slack", &url)))
275 }
276
277 fn create_engine_sink(
278 &self,
279 name: &str,
280 config: &ConnectorConfig,
281 _topic_override: Option<&str>,
282 _context_name: Option<&str>,
283 ) -> Result<Arc<dyn Sink>, ConnectorError> {
284 let connector = self.create_sink_connector(config)?;
285 Ok(Arc::new(SinkConnectorAdapter::new(name, connector)))
286 }
287}
288
289inventory::submit! { &SlackFactory as &dyn ConnectorFactory }
290
291#[cfg(test)]
292mod tests {
293 use chrono::Utc;
294 use varpulis_core::event::Event;
295 use varpulis_core::value::FxIndexMap;
296
297 use super::*;
298
299 #[test]
300 fn test_slack_payload_structure() {
301 let mut data = FxIndexMap::default();
302 data.insert("rule".into(), Value::str("full_killchain"));
303 data.insert("mitre".into(), Value::str("T1059,T1003"));
304 data.insert("severity".into(), Value::str("critical"));
305 data.insert("summary".into(), Value::str("Kill chain detected"));
306 data.insert("host".into(), Value::str("WS01"));
307
308 let event = Event {
309 event_type: "KillChainAlert".into(),
310 timestamp: Utc::now(),
311 data,
312 };
313
314 let payload = event_to_slack_payload(&event);
315 let blocks = payload["blocks"].as_array().unwrap();
316
317 let header = &blocks[0];
319 assert_eq!(header["type"], "header");
320 let header_text = header["text"]["text"].as_str().unwrap();
321 assert!(header_text.contains("full_killchain"));
322 assert!(header_text.contains(":red_circle:"));
323
324 let section = &blocks[1];
326 assert_eq!(section["type"], "section");
327
328 let summary_block = &blocks[2];
330 assert_eq!(summary_block["type"], "section");
331 assert!(summary_block["text"]["text"]
332 .as_str()
333 .unwrap()
334 .contains("Kill chain detected"));
335 }
336
337 #[test]
338 fn test_severity_emojis() {
339 let make_event = |sev: &str| -> Event {
340 let mut data = FxIndexMap::default();
341 data.insert("severity".into(), Value::str(sev));
342 Event {
343 event_type: "Alert".into(),
344 timestamp: Utc::now(),
345 data,
346 }
347 };
348
349 let p = event_to_slack_payload(&make_event("critical"));
350 assert!(p["blocks"][0]["text"]["text"]
351 .as_str()
352 .unwrap()
353 .contains(":red_circle:"));
354
355 let p = event_to_slack_payload(&make_event("low"));
356 assert!(p["blocks"][0]["text"]["text"]
357 .as_str()
358 .unwrap()
359 .contains(":large_blue_circle:"));
360 }
361}