Skip to main content

chat_system/messengers/
slack.rs

1//! Slack messenger — Web API implementation.
2
3use crate::message::MessageType;
4use crate::{Message, Messenger};
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use reqwest::Client;
8use serde_json::{Value, json};
9use std::collections::HashMap;
10use tokio::sync::Mutex;
11
12pub struct SlackMessenger {
13    name: String,
14    token: String,
15    app_token: Option<String>,
16    default_channel: Option<String>,
17    api_base_url: String,
18    client: Client,
19    last_seen_ts: Mutex<HashMap<String, String>>,
20    connected: bool,
21}
22
23impl SlackMessenger {
24    pub fn new(name: impl Into<String>, token: impl Into<String>) -> Self {
25        Self {
26            name: name.into(),
27            token: token.into(),
28            app_token: None,
29            default_channel: None,
30            api_base_url: "https://slack.com/api".to_string(),
31            client: Client::new(),
32            last_seen_ts: Mutex::new(HashMap::new()),
33            connected: false,
34        }
35    }
36
37    /// Set the app-level token for Socket Mode connections.
38    /// This token starts with `xapp-` and enables real-time event delivery.
39    pub fn with_app_token(mut self, token: impl Into<String>) -> Self {
40        self.app_token = Some(token.into());
41        self
42    }
43
44    /// Set a default channel to send messages to when no recipient is specified.
45    pub fn with_default_channel(mut self, channel: impl Into<String>) -> Self {
46        self.default_channel = Some(channel.into());
47        self
48    }
49
50    pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
51        self.api_base_url = url.into();
52        self
53    }
54
55    fn api_url(&self, path: impl AsRef<str>) -> String {
56        format!(
57            "{}/{}",
58            self.api_base_url.trim_end_matches('/'),
59            path.as_ref().trim_start_matches('/')
60        )
61    }
62
63    async fn get_json(&self, path: impl AsRef<str>) -> Result<Value> {
64        let response = self
65            .client
66            .get(self.api_url(path))
67            .bearer_auth(&self.token)
68            .send()
69            .await
70            .context("Slack API request failed")?;
71
72        let status = response.status();
73        let body = response
74            .text()
75            .await
76            .context("Failed to read Slack response body")?;
77        if !status.is_success() {
78            anyhow::bail!("Slack API request failed {}: {}", status, body);
79        }
80
81        serde_json::from_str(&body).context("Invalid Slack API response")
82    }
83
84    async fn post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
85        let response = self
86            .client
87            .post(self.api_url(path))
88            .bearer_auth(&self.token)
89            .json(&body)
90            .send()
91            .await
92            .context("Slack API request failed")?;
93
94        let status = response.status();
95        let response_body = response
96            .text()
97            .await
98            .context("Failed to read Slack response body")?;
99        if !status.is_success() {
100            anyhow::bail!("Slack API request failed {}: {}", status, response_body);
101        }
102
103        serde_json::from_str(&response_body).context("Invalid Slack API response")
104    }
105
106    fn parse_ok_response(&self, data: &Value, operation: &str) -> Result<()> {
107        if data["ok"].as_bool().unwrap_or(false) {
108            Ok(())
109        } else {
110            anyhow::bail!("Slack {} failed: {:?}", operation, data);
111        }
112    }
113
114    async fn fetch_conversation_ids(&self) -> Result<Vec<String>> {
115        let data = self
116            .get_json("conversations.list?types=public_channel,private_channel,im,mpim&exclude_archived=true&limit=1000")
117            .await?;
118        self.parse_ok_response(&data, "conversations.list")?;
119
120        Ok(data["channels"]
121            .as_array()
122            .into_iter()
123            .flatten()
124            .filter_map(|channel| channel["id"].as_str().map(ToString::to_string))
125            .collect())
126    }
127
128    async fn fetch_channel_messages(
129        &self,
130        channel_id: &str,
131        last_seen_ts: Option<&str>,
132    ) -> Result<Vec<(String, Message)>> {
133        let mut path = format!("conversations.history?channel={channel_id}&limit=100");
134        if let Some(ts) = last_seen_ts {
135            path.push_str("&oldest=");
136            path.push_str(ts);
137            path.push_str("&inclusive=false");
138        }
139
140        let data = self.get_json(path).await?;
141        self.parse_ok_response(&data, "conversations.history")?;
142
143        let mut messages = Vec::new();
144        if let Some(entries) = data["messages"].as_array() {
145            for entry in entries.iter().rev() {
146                let Some(ts) = entry["ts"].as_str() else {
147                    continue;
148                };
149
150                let content = entry["text"].as_str().unwrap_or("").to_string();
151                if content.is_empty() && entry.get("files").is_none() {
152                    continue;
153                }
154
155                let sender = entry["user"]
156                    .as_str()
157                    .or_else(|| entry["bot_id"].as_str())
158                    .unwrap_or("unknown")
159                    .to_string();
160                let timestamp = ts
161                    .split('.')
162                    .next()
163                    .and_then(|seconds| seconds.parse::<i64>().ok())
164                    .unwrap_or_default();
165
166                messages.push((
167                    ts.to_string(),
168                    Message {
169                        id: ts.to_string(),
170                        sender,
171                        content,
172                        timestamp,
173                        channel: Some(channel_id.to_string()),
174                        reply_to: entry["thread_ts"]
175                            .as_str()
176                            .filter(|thread_ts| *thread_ts != ts)
177                            .map(ToString::to_string),
178                        thread_id: None,
179                        media: None,
180                        is_direct: false,
181                        message_type: MessageType::Text,
182                        edited_timestamp: None,
183                        reactions: None,
184                    },
185                ));
186            }
187        }
188
189        Ok(messages)
190    }
191}
192
193#[async_trait]
194impl Messenger for SlackMessenger {
195    fn name(&self) -> &str {
196        &self.name
197    }
198
199    fn messenger_type(&self) -> &str {
200        "slack"
201    }
202
203    async fn initialize(&mut self) -> Result<()> {
204        let data = self.get_json("auth.test").await?;
205        self.parse_ok_response(&data, "auth.test")?;
206        self.connected = true;
207        Ok(())
208    }
209
210    async fn send_message(&self, channel: &str, text: &str) -> Result<String> {
211        let data = self
212            .post_json(
213                "chat.postMessage",
214                json!({
215                "channel": channel,
216                "text": text,
217                }),
218            )
219            .await?;
220        self.parse_ok_response(&data, "chat.postMessage")?;
221
222        Ok(data["ts"].as_str().unwrap_or("").to_string())
223    }
224
225    async fn receive_messages(&self) -> Result<Vec<Message>> {
226        let conversation_ids = self.fetch_conversation_ids().await?;
227        let mut received = Vec::new();
228
229        for channel_id in conversation_ids {
230            let channel_last_seen = {
231                let last_seen = self.last_seen_ts.lock().await;
232                last_seen.get(&channel_id).cloned()
233            };
234            let channel_messages = self
235                .fetch_channel_messages(&channel_id, channel_last_seen.as_deref())
236                .await?;
237
238            if let Some((latest_ts, _)) = channel_messages.last() {
239                let mut last_seen = self.last_seen_ts.lock().await;
240                last_seen.insert(channel_id.clone(), latest_ts.clone());
241            }
242
243            received.extend(channel_messages.into_iter().map(|(_, message)| message));
244        }
245
246        Ok(received)
247    }
248
249    fn is_connected(&self) -> bool {
250        self.connected
251    }
252
253    async fn disconnect(&mut self) -> Result<()> {
254        self.last_seen_ts.lock().await.clear();
255        self.connected = false;
256        Ok(())
257    }
258}