Skip to main content

sparrow/gateway/
slack.rs

1use futures::{SinkExt, StreamExt};
2use tokio::sync::mpsc;
3use tokio_tungstenite::connect_async;
4use tokio_tungstenite::tungstenite::Message as WsMessage;
5
6use super::{GatewayMessage, GatewayResponse, GatewayTransport};
7
8// ─── Slack Bot transport (Socket Mode) ──────────────────────────────────────────
9
/// Slack bot gateway transport that talks to Slack over Socket Mode.
pub struct SlackTransport {
    /// Bearer credential sent to `apps.connections.open` to obtain the WebSocket URL.
    app_token: String,
    /// Bearer credential sent to `chat.postMessage` when replying.
    bot_token: String,
    /// Slack user ids whose messages are forwarded; an empty list disables the filter.
    allowed_users: Vec<String>,
}
15
16impl SlackTransport {
17    pub fn new(app_token: String, bot_token: String, allowed_users: Vec<String>) -> Self {
18        Self {
19            app_token,
20            bot_token,
21            allowed_users,
22        }
23    }
24
25    /// Get WebSocket URL from Slack's apps.connections.open
26    async fn get_socket_url(&self) -> anyhow::Result<String> {
27        let client = reqwest::Client::new();
28        let resp: serde_json::Value = client
29            .post("https://slack.com/api/apps.connections.open")
30            .header("Authorization", format!("Bearer {}", self.app_token))
31            .send()
32            .await?
33            .json()
34            .await?;
35
36        if !resp["ok"].as_bool().unwrap_or(false) {
37            anyhow::bail!(
38                "Slack Socket Mode connection failed: {}",
39                resp["error"].as_str().unwrap_or("unknown")
40            );
41        }
42
43        Ok(resp["url"].as_str().unwrap_or("").to_string())
44    }
45
46    /// Post message via Slack Web API
47    async fn post_message(
48        &self,
49        channel: &str,
50        text: &str,
51        buttons: &[Vec<String>],
52    ) -> anyhow::Result<()> {
53        let client = reqwest::Client::new();
54
55        let mut blocks: Vec<serde_json::Value> = vec![serde_json::json!({
56            "type": "section",
57            "text": {
58                "type": "mrkdwn",
59                "text": text,
60            }
61        })];
62
63        if !buttons.is_empty() {
64            let elements: Vec<serde_json::Value> = buttons
65                .iter()
66                .flat_map(|row| {
67                    row.iter().map(|label| {
68                        serde_json::json!({
69                            "type": "button",
70                            "text": {
71                                "type": "plain_text",
72                                "text": label,
73                            },
74                            "value": label,
75                            "action_id": label,
76                        })
77                    })
78                })
79                .collect();
80            blocks.push(serde_json::json!({
81                "type": "actions",
82                "elements": elements,
83            }));
84        }
85
86        client
87            .post("https://slack.com/api/chat.postMessage")
88            .header("Authorization", format!("Bearer {}", self.bot_token))
89            .json(&serde_json::json!({
90                "channel": channel,
91                "blocks": blocks,
92            }))
93            .send()
94            .await?;
95
96        Ok(())
97    }
98}
99
100#[async_trait::async_trait]
101impl GatewayTransport for SlackTransport {
102    fn name(&self) -> &str {
103        "slack"
104    }
105
106    async fn start(&self, tx: mpsc::UnboundedSender<GatewayMessage>) -> anyhow::Result<()> {
107        let allowed = self.allowed_users.clone();
108
109        tracing::info!("Slack gateway starting (Socket Mode)");
110
111        let socket_url = self.get_socket_url().await?;
112
113        tokio::spawn(async move {
114            match connect_async(&socket_url).await {
115                Ok((mut ws_stream, _)) => {
116                    while let Some(Ok(msg)) = ws_stream.next().await {
117                        if let WsMessage::Text(text) = msg {
118                            if let Ok(payload) = serde_json::from_str::<serde_json::Value>(&text) {
119                                let event_type = payload["type"].as_str().unwrap_or("");
120
121                                if event_type == "events_api" {
122                                    let event = &payload["payload"]["event"];
123                                    let ev_type = event["type"].as_str().unwrap_or("");
124
125                                    if ev_type == "message" && event["subtype"].is_null() {
126                                        let user = event["user"].as_str().unwrap_or("").to_string();
127                                        let channel =
128                                            event["channel"].as_str().unwrap_or("").to_string();
129                                        let text = event["text"].as_str().unwrap_or("").to_string();
130                                        let ts = event["ts"].as_str().map(|s| s.to_string());
131
132                                        if !allowed.is_empty() && !allowed.contains(&user) {
133                                            continue;
134                                        }
135
136                                        if !text.is_empty() {
137                                            let _ = tx.send(GatewayMessage {
138                                                surface: "slack".into(),
139                                                user_id: user,
140                                                chat_id: channel,
141                                                text,
142                                                message_id: ts,
143                                            });
144                                        }
145                                    }
146                                }
147
148                                // Send envelope ACK if needed
149                                if let Some(envelope_id) = payload["envelope_id"].as_str() {
150                                    let ack = serde_json::json!({
151                                        "envelope_id": envelope_id,
152                                    });
153                                    let _ = ws_stream
154                                        .send(WsMessage::Text(ack.to_string().into()))
155                                        .await;
156                                }
157                            }
158                        }
159                    }
160                }
161                Err(e) => {
162                    tracing::error!("Slack WebSocket connection failed: {}", e);
163                }
164            }
165        });
166
167        Ok(())
168    }
169
170    async fn send(&self, response: GatewayResponse) -> anyhow::Result<()> {
171        self.post_message(&response.chat_id, &response.text, &response.buttons)
172            .await
173    }
174
175    async fn stop(&self) -> anyhow::Result<()> {
176        tracing::info!("Slack gateway stopped");
177        Ok(())
178    }
179}