Skip to main content

chat_system/messengers/
slack.rs

1//! Slack messenger — Web API implementation.
2
3use crate::message::MessageType;
4use crate::{Message, Messenger};
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use reqwest::Client;
8use serde_json::{Value, json};
9use std::collections::HashMap;
10use tokio::sync::Mutex;
11
12pub struct SlackMessenger {
13    name: String,
14    token: String,
15    api_base_url: String,
16    client: Client,
17    last_seen_ts: Mutex<HashMap<String, String>>,
18    connected: bool,
19}
20
21impl SlackMessenger {
22    pub fn new(name: impl Into<String>, token: impl Into<String>) -> Self {
23        Self {
24            name: name.into(),
25            token: token.into(),
26            api_base_url: "https://slack.com/api".to_string(),
27            client: Client::new(),
28            last_seen_ts: Mutex::new(HashMap::new()),
29            connected: false,
30        }
31    }
32
33    pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
34        self.api_base_url = url.into();
35        self
36    }
37
38    fn api_url(&self, path: impl AsRef<str>) -> String {
39        format!(
40            "{}/{}",
41            self.api_base_url.trim_end_matches('/'),
42            path.as_ref().trim_start_matches('/')
43        )
44    }
45
46    async fn get_json(&self, path: impl AsRef<str>) -> Result<Value> {
47        let response = self
48            .client
49            .get(self.api_url(path))
50            .bearer_auth(&self.token)
51            .send()
52            .await
53            .context("Slack API request failed")?;
54
55        let status = response.status();
56        let body = response
57            .text()
58            .await
59            .context("Failed to read Slack response body")?;
60        if !status.is_success() {
61            anyhow::bail!("Slack API request failed {}: {}", status, body);
62        }
63
64        serde_json::from_str(&body).context("Invalid Slack API response")
65    }
66
67    async fn post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
68        let response = self
69            .client
70            .post(self.api_url(path))
71            .bearer_auth(&self.token)
72            .json(&body)
73            .send()
74            .await
75            .context("Slack API request failed")?;
76
77        let status = response.status();
78        let response_body = response
79            .text()
80            .await
81            .context("Failed to read Slack response body")?;
82        if !status.is_success() {
83            anyhow::bail!("Slack API request failed {}: {}", status, response_body);
84        }
85
86        serde_json::from_str(&response_body).context("Invalid Slack API response")
87    }
88
89    fn parse_ok_response(&self, data: &Value, operation: &str) -> Result<()> {
90        if data["ok"].as_bool().unwrap_or(false) {
91            Ok(())
92        } else {
93            anyhow::bail!("Slack {} failed: {:?}", operation, data);
94        }
95    }
96
97    async fn fetch_conversation_ids(&self) -> Result<Vec<String>> {
98        let data = self
99            .get_json("conversations.list?types=public_channel,private_channel,im,mpim&exclude_archived=true&limit=1000")
100            .await?;
101        self.parse_ok_response(&data, "conversations.list")?;
102
103        Ok(data["channels"]
104            .as_array()
105            .into_iter()
106            .flatten()
107            .filter_map(|channel| channel["id"].as_str().map(ToString::to_string))
108            .collect())
109    }
110
111    async fn fetch_channel_messages(
112        &self,
113        channel_id: &str,
114        last_seen_ts: Option<&str>,
115    ) -> Result<Vec<(String, Message)>> {
116        let mut path = format!("conversations.history?channel={channel_id}&limit=100");
117        if let Some(ts) = last_seen_ts {
118            path.push_str("&oldest=");
119            path.push_str(ts);
120            path.push_str("&inclusive=false");
121        }
122
123        let data = self.get_json(path).await?;
124        self.parse_ok_response(&data, "conversations.history")?;
125
126        let mut messages = Vec::new();
127        if let Some(entries) = data["messages"].as_array() {
128            for entry in entries.iter().rev() {
129                let Some(ts) = entry["ts"].as_str() else {
130                    continue;
131                };
132
133                let content = entry["text"].as_str().unwrap_or("").to_string();
134                if content.is_empty() && entry.get("files").is_none() {
135                    continue;
136                }
137
138                let sender = entry["user"]
139                    .as_str()
140                    .or_else(|| entry["bot_id"].as_str())
141                    .unwrap_or("unknown")
142                    .to_string();
143                let timestamp = ts
144                    .split('.')
145                    .next()
146                    .and_then(|seconds| seconds.parse::<i64>().ok())
147                    .unwrap_or_default();
148
149                messages.push((
150                    ts.to_string(),
151                    Message {
152                        id: ts.to_string(),
153                        sender,
154                        content,
155                        timestamp,
156                        channel: Some(channel_id.to_string()),
157                        reply_to: entry["thread_ts"]
158                            .as_str()
159                            .filter(|thread_ts| *thread_ts != ts)
160                            .map(ToString::to_string),
161                        thread_id: None,
162                        media: None,
163                        is_direct: false,
164                        message_type: MessageType::Text,
165                        edited_timestamp: None,
166                        reactions: None,
167                    },
168                ));
169            }
170        }
171
172        Ok(messages)
173    }
174}
175
176#[async_trait]
177impl Messenger for SlackMessenger {
178    fn name(&self) -> &str {
179        &self.name
180    }
181
182    fn messenger_type(&self) -> &str {
183        "slack"
184    }
185
186    async fn initialize(&mut self) -> Result<()> {
187        let data = self.get_json("auth.test").await?;
188        self.parse_ok_response(&data, "auth.test")?;
189        self.connected = true;
190        Ok(())
191    }
192
193    async fn send_message(&self, channel: &str, text: &str) -> Result<String> {
194        let data = self
195            .post_json(
196                "chat.postMessage",
197                json!({
198                "channel": channel,
199                "text": text,
200                }),
201            )
202            .await?;
203        self.parse_ok_response(&data, "chat.postMessage")?;
204
205        Ok(data["ts"].as_str().unwrap_or("").to_string())
206    }
207
208    async fn receive_messages(&self) -> Result<Vec<Message>> {
209        let conversation_ids = self.fetch_conversation_ids().await?;
210        let mut received = Vec::new();
211
212        for channel_id in conversation_ids {
213            let channel_last_seen = {
214                let last_seen = self.last_seen_ts.lock().await;
215                last_seen.get(&channel_id).cloned()
216            };
217            let channel_messages = self
218                .fetch_channel_messages(&channel_id, channel_last_seen.as_deref())
219                .await?;
220
221            if let Some((latest_ts, _)) = channel_messages.last() {
222                let mut last_seen = self.last_seen_ts.lock().await;
223                last_seen.insert(channel_id.clone(), latest_ts.clone());
224            }
225
226            received.extend(channel_messages.into_iter().map(|(_, message)| message));
227        }
228
229        Ok(received)
230    }
231
232    fn is_connected(&self) -> bool {
233        self.connected
234    }
235
236    async fn disconnect(&mut self) -> Result<()> {
237        self.last_seen_ts.lock().await.clear();
238        self.connected = false;
239        Ok(())
240    }
241}