Skip to main content

varpulis_connector_slack/
lib.rs

1//! Slack webhook connector for Varpulis.
2//!
3//! Sends security alerts to Slack channels via incoming webhooks,
4//! formatted with Block Kit for severity-colored alert cards.
5
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use async_trait::async_trait;
10use tokio::sync::Mutex;
11use tracing::warn;
12use varpulis_connector_api::{
13    ConfigParamInfo, ConnectorComponentInfo, ConnectorConfig, ConnectorError, ConnectorFactory,
14    ConnectorHealth, Sink, SinkConnector, SinkConnectorAdapter,
15};
16use varpulis_core::event::Event;
17use varpulis_core::Value;
18
19// ---------------------------------------------------------------------------
20// Slack Message Formatting
21// ---------------------------------------------------------------------------
22
23/// Format a Varpulis event as a Slack Block Kit message payload.
24pub fn event_to_slack_payload(event: &Event) -> serde_json::Value {
25    let get_str = |key: &str| -> String {
26        event
27            .get(key)
28            .map(|v| match v {
29                Value::Str(s) => s.to_string(),
30                other => other.to_string(),
31            })
32            .unwrap_or_default()
33    };
34
35    let rule = get_str("rule");
36    let rule = if rule.is_empty() {
37        event.event_type.to_string()
38    } else {
39        rule
40    };
41    let mitre = get_str("mitre");
42    let severity = get_str("severity");
43    let summary = get_str("summary");
44
45    let severity_emoji = match severity.as_str() {
46        "critical" => ":red_circle:",
47        "high" => ":large_orange_circle:",
48        "medium" => ":large_yellow_circle:",
49        "low" => ":large_blue_circle:",
50        _ => ":white_circle:",
51    };
52
53    // Build context fields (skip standard alert fields)
54    let skip = ["rule", "mitre", "severity", "summary", "event_type"];
55    let mut context_parts: Vec<String> = Vec::new();
56    for (key, val) in &event.data {
57        if !skip.contains(&key.as_ref()) {
58            let val_str = match val {
59                Value::Str(s) => s.to_string(),
60                other => other.to_string(),
61            };
62            context_parts.push(format!("*{key}:* {val_str}"));
63        }
64    }
65
66    let mut blocks = vec![
67        serde_json::json!({
68            "type": "header",
69            "text": {
70                "type": "plain_text",
71                "text": format!("{severity_emoji} {rule}"),
72                "emoji": true
73            }
74        }),
75        serde_json::json!({
76            "type": "section",
77            "fields": [
78                {
79                    "type": "mrkdwn",
80                    "text": format!("*Severity:*\n{}", severity)
81                },
82                {
83                    "type": "mrkdwn",
84                    "text": format!("*MITRE ATT&CK:*\n{}", if mitre.is_empty() { "N/A" } else { &mitre })
85                }
86            ]
87        }),
88    ];
89
90    if !summary.is_empty() {
91        blocks.push(serde_json::json!({
92            "type": "section",
93            "text": {
94                "type": "mrkdwn",
95                "text": summary
96            }
97        }));
98    }
99
100    if !context_parts.is_empty() {
101        // Truncate to avoid Slack's 3000 char limit per block
102        let context_text = context_parts.join(" | ");
103        let truncated = if context_text.len() > 2900 {
104            format!("{}...", &context_text[..2900])
105        } else {
106            context_text
107        };
108        blocks.push(serde_json::json!({
109            "type": "context",
110            "elements": [{
111                "type": "mrkdwn",
112                "text": truncated
113            }]
114        }));
115    }
116
117    serde_json::json!({ "blocks": blocks })
118}
119
120// ---------------------------------------------------------------------------
121// Slack Webhook Sink
122// ---------------------------------------------------------------------------
123
124/// Slack webhook sink — sends alerts to Slack via incoming webhooks.
125pub struct SlackWebhookSink {
126    name: String,
127    webhook_url: String,
128    client: reqwest::Client,
129    /// Rate limiter: Slack allows ~1 msg/sec per webhook.
130    last_send: Mutex<Option<Instant>>,
131    min_interval: Duration,
132}
133
134impl std::fmt::Debug for SlackWebhookSink {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        f.debug_struct("SlackWebhookSink")
137            .field("name", &self.name)
138            .field("webhook_url", &"[redacted]")
139            .finish_non_exhaustive()
140    }
141}
142
143impl SlackWebhookSink {
144    /// Create a new Slack webhook sink.
145    pub fn new(name: &str, webhook_url: &str) -> Self {
146        Self {
147            name: name.to_string(),
148            webhook_url: webhook_url.to_string(),
149            client: reqwest::Client::new(),
150            last_send: Mutex::new(None),
151            min_interval: Duration::from_secs(1),
152        }
153    }
154}
155
156#[async_trait]
157impl SinkConnector for SlackWebhookSink {
158    fn name(&self) -> &str {
159        &self.name
160    }
161
162    async fn send(&self, event: &Event) -> Result<(), ConnectorError> {
163        // Rate limiting: wait if too soon since last send
164        {
165            let mut last = self.last_send.lock().await;
166            if let Some(prev) = *last {
167                let elapsed = prev.elapsed();
168                if elapsed < self.min_interval {
169                    tokio::time::sleep(self.min_interval - elapsed).await;
170                }
171            }
172            *last = Some(Instant::now());
173        }
174
175        let payload = event_to_slack_payload(event);
176
177        let response = self
178            .client
179            .post(&self.webhook_url)
180            .json(&payload)
181            .timeout(Duration::from_secs(10))
182            .send()
183            .await
184            .map_err(|e| ConnectorError::SendFailed(format!("Slack webhook POST: {e}")))?;
185
186        if !response.status().is_success() {
187            let status = response.status();
188            let body = response
189                .text()
190                .await
191                .unwrap_or_else(|_| "unknown".to_string());
192            warn!("Slack webhook returned {}: {}", status, body);
193            return Err(ConnectorError::SendFailed(format!(
194                "Slack returned {status}: {body}"
195            )));
196        }
197
198        Ok(())
199    }
200
201    async fn send_to_topic(
202        &self,
203        events: &[Arc<Event>],
204        _topic: &str,
205    ) -> Result<(), ConnectorError> {
206        for event in events {
207            self.send(event).await?;
208        }
209        Ok(())
210    }
211
212    async fn flush(&self) -> Result<(), ConnectorError> {
213        Ok(())
214    }
215
216    async fn close(&self) -> Result<(), ConnectorError> {
217        Ok(())
218    }
219
220    fn health_check(&self) -> ConnectorHealth {
221        ConnectorHealth::healthy(0)
222    }
223}
224
225// ---------------------------------------------------------------------------
226// Factory & Registration
227// ---------------------------------------------------------------------------
228
229static SLACK_CONFIG_PARAMS: &[ConfigParamInfo] = &[ConfigParamInfo {
230    name: "webhook_url",
231    description: "Slack incoming webhook URL",
232    required: true,
233    default_value: None,
234}];
235
236static SLACK_INFO: ConnectorComponentInfo = ConnectorComponentInfo {
237    connector_type: "slack",
238    display_name: "Slack",
239    description: "Slack incoming webhook sink for security alerts",
240    feature_flag: "",
241    supports_source: false,
242    supports_sink: true,
243    supports_managed: false,
244    config_params: SLACK_CONFIG_PARAMS,
245};
246
247struct SlackFactory;
248
249impl ConnectorFactory for SlackFactory {
250    fn info(&self) -> &ConnectorComponentInfo {
251        &SLACK_INFO
252    }
253
254    fn create_managed(
255        &self,
256        _name: &str,
257        _config: &ConnectorConfig,
258    ) -> Result<Box<dyn varpulis_connector_api::ManagedConnector>, ConnectorError> {
259        Err(ConnectorError::NotAvailable(
260            "slack managed connector not supported".to_string(),
261        ))
262    }
263
264    fn create_sink_connector(
265        &self,
266        config: &ConnectorConfig,
267    ) -> Result<Box<dyn SinkConnector>, ConnectorError> {
268        let url = config.url.clone();
269        if url.is_empty() {
270            return Err(ConnectorError::ConfigError(
271                "webhook_url is required for Slack connector".to_string(),
272            ));
273        }
274        Ok(Box::new(SlackWebhookSink::new("slack", &url)))
275    }
276
277    fn create_engine_sink(
278        &self,
279        name: &str,
280        config: &ConnectorConfig,
281        _topic_override: Option<&str>,
282        _context_name: Option<&str>,
283    ) -> Result<Arc<dyn Sink>, ConnectorError> {
284        let connector = self.create_sink_connector(config)?;
285        Ok(Arc::new(SinkConnectorAdapter::new(name, connector)))
286    }
287}
288
289inventory::submit! { &SlackFactory as &dyn ConnectorFactory }
290
291#[cfg(test)]
292mod tests {
293    use chrono::Utc;
294    use varpulis_core::event::Event;
295    use varpulis_core::value::FxIndexMap;
296
297    use super::*;
298
299    #[test]
300    fn test_slack_payload_structure() {
301        let mut data = FxIndexMap::default();
302        data.insert("rule".into(), Value::str("full_killchain"));
303        data.insert("mitre".into(), Value::str("T1059,T1003"));
304        data.insert("severity".into(), Value::str("critical"));
305        data.insert("summary".into(), Value::str("Kill chain detected"));
306        data.insert("host".into(), Value::str("WS01"));
307
308        let event = Event {
309            event_type: "KillChainAlert".into(),
310            timestamp: Utc::now(),
311            data,
312        };
313
314        let payload = event_to_slack_payload(&event);
315        let blocks = payload["blocks"].as_array().unwrap();
316
317        // Header block
318        let header = &blocks[0];
319        assert_eq!(header["type"], "header");
320        let header_text = header["text"]["text"].as_str().unwrap();
321        assert!(header_text.contains("full_killchain"));
322        assert!(header_text.contains(":red_circle:"));
323
324        // Section with severity + MITRE
325        let section = &blocks[1];
326        assert_eq!(section["type"], "section");
327
328        // Summary section
329        let summary_block = &blocks[2];
330        assert_eq!(summary_block["type"], "section");
331        assert!(summary_block["text"]["text"]
332            .as_str()
333            .unwrap()
334            .contains("Kill chain detected"));
335    }
336
337    #[test]
338    fn test_severity_emojis() {
339        let make_event = |sev: &str| -> Event {
340            let mut data = FxIndexMap::default();
341            data.insert("severity".into(), Value::str(sev));
342            Event {
343                event_type: "Alert".into(),
344                timestamp: Utc::now(),
345                data,
346            }
347        };
348
349        let p = event_to_slack_payload(&make_event("critical"));
350        assert!(p["blocks"][0]["text"]["text"]
351            .as_str()
352            .unwrap()
353            .contains(":red_circle:"));
354
355        let p = event_to_slack_payload(&make_event("low"));
356        assert!(p["blocks"][0]["text"]["text"]
357            .as_str()
358            .unwrap()
359            .contains(":large_blue_circle:"));
360    }
361}