1use futures::{SinkExt, StreamExt};
2use tokio::sync::mpsc;
3use tokio_tungstenite::connect_async;
4use tokio_tungstenite::tungstenite::Message as WsMessage;
5
6use super::{GatewayMessage, GatewayResponse, GatewayTransport};
7
8pub struct SlackTransport {
11 app_token: String,
12 bot_token: String,
13 allowed_users: Vec<String>,
14}
15
16impl SlackTransport {
17 pub fn new(app_token: String, bot_token: String, allowed_users: Vec<String>) -> Self {
18 Self {
19 app_token,
20 bot_token,
21 allowed_users,
22 }
23 }
24
25 async fn get_socket_url(&self) -> anyhow::Result<String> {
27 let client = reqwest::Client::new();
28 let resp: serde_json::Value = client
29 .post("https://slack.com/api/apps.connections.open")
30 .header("Authorization", format!("Bearer {}", self.app_token))
31 .send()
32 .await?
33 .json()
34 .await?;
35
36 if !resp["ok"].as_bool().unwrap_or(false) {
37 anyhow::bail!(
38 "Slack Socket Mode connection failed: {}",
39 resp["error"].as_str().unwrap_or("unknown")
40 );
41 }
42
43 Ok(resp["url"].as_str().unwrap_or("").to_string())
44 }
45
46 async fn post_message(
48 &self,
49 channel: &str,
50 text: &str,
51 buttons: &[Vec<String>],
52 ) -> anyhow::Result<()> {
53 let client = reqwest::Client::new();
54
55 let mut blocks: Vec<serde_json::Value> = vec![serde_json::json!({
56 "type": "section",
57 "text": {
58 "type": "mrkdwn",
59 "text": text,
60 }
61 })];
62
63 if !buttons.is_empty() {
64 let elements: Vec<serde_json::Value> = buttons
65 .iter()
66 .flat_map(|row| {
67 row.iter().map(|label| {
68 serde_json::json!({
69 "type": "button",
70 "text": {
71 "type": "plain_text",
72 "text": label,
73 },
74 "value": label,
75 "action_id": label,
76 })
77 })
78 })
79 .collect();
80 blocks.push(serde_json::json!({
81 "type": "actions",
82 "elements": elements,
83 }));
84 }
85
86 client
87 .post("https://slack.com/api/chat.postMessage")
88 .header("Authorization", format!("Bearer {}", self.bot_token))
89 .json(&serde_json::json!({
90 "channel": channel,
91 "blocks": blocks,
92 }))
93 .send()
94 .await?;
95
96 Ok(())
97 }
98}
99
100#[async_trait::async_trait]
101impl GatewayTransport for SlackTransport {
102 fn name(&self) -> &str {
103 "slack"
104 }
105
106 async fn start(&self, tx: mpsc::UnboundedSender<GatewayMessage>) -> anyhow::Result<()> {
107 let allowed = self.allowed_users.clone();
108
109 tracing::info!("Slack gateway starting (Socket Mode)");
110
111 let socket_url = self.get_socket_url().await?;
112
113 tokio::spawn(async move {
114 match connect_async(&socket_url).await {
115 Ok((mut ws_stream, _)) => {
116 while let Some(Ok(msg)) = ws_stream.next().await {
117 if let WsMessage::Text(text) = msg {
118 if let Ok(payload) = serde_json::from_str::<serde_json::Value>(&text) {
119 let event_type = payload["type"].as_str().unwrap_or("");
120
121 if event_type == "events_api" {
122 let event = &payload["payload"]["event"];
123 let ev_type = event["type"].as_str().unwrap_or("");
124
125 if ev_type == "message" && event["subtype"].is_null() {
126 let user = event["user"].as_str().unwrap_or("").to_string();
127 let channel =
128 event["channel"].as_str().unwrap_or("").to_string();
129 let text = event["text"].as_str().unwrap_or("").to_string();
130 let ts = event["ts"].as_str().map(|s| s.to_string());
131
132 if !allowed.is_empty() && !allowed.contains(&user) {
133 continue;
134 }
135
136 if !text.is_empty() {
137 let _ = tx.send(GatewayMessage {
138 surface: "slack".into(),
139 user_id: user,
140 chat_id: channel,
141 text,
142 message_id: ts,
143 });
144 }
145 }
146 }
147
148 if let Some(envelope_id) = payload["envelope_id"].as_str() {
150 let ack = serde_json::json!({
151 "envelope_id": envelope_id,
152 });
153 let _ = ws_stream
154 .send(WsMessage::Text(ack.to_string().into()))
155 .await;
156 }
157 }
158 }
159 }
160 }
161 Err(e) => {
162 tracing::error!("Slack WebSocket connection failed: {}", e);
163 }
164 }
165 });
166
167 Ok(())
168 }
169
170 async fn send(&self, response: GatewayResponse) -> anyhow::Result<()> {
171 self.post_message(&response.chat_id, &response.text, &response.buttons)
172 .await
173 }
174
175 async fn stop(&self) -> anyhow::Result<()> {
176 tracing::info!("Slack gateway stopped");
177 Ok(())
178 }
179}