Skip to main content

chat_system/messengers/
slack.rs

1//! Slack messenger — Web API implementation.
2
3use crate::{Message, Messenger};
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use reqwest::Client;
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use tokio::sync::Mutex;
10
11pub struct SlackMessenger {
12    name: String,
13    token: String,
14    api_base_url: String,
15    client: Client,
16    last_seen_ts: Mutex<HashMap<String, String>>,
17    connected: bool,
18}
19
20impl SlackMessenger {
21    pub fn new(name: impl Into<String>, token: impl Into<String>) -> Self {
22        Self {
23            name: name.into(),
24            token: token.into(),
25            api_base_url: "https://slack.com/api".to_string(),
26            client: Client::new(),
27            last_seen_ts: Mutex::new(HashMap::new()),
28            connected: false,
29        }
30    }
31
32    pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
33        self.api_base_url = url.into();
34        self
35    }
36
37    fn api_url(&self, path: impl AsRef<str>) -> String {
38        format!(
39            "{}/{}",
40            self.api_base_url.trim_end_matches('/'),
41            path.as_ref().trim_start_matches('/')
42        )
43    }
44
45    async fn get_json(&self, path: impl AsRef<str>) -> Result<Value> {
46        let response = self
47            .client
48            .get(self.api_url(path))
49            .bearer_auth(&self.token)
50            .send()
51            .await
52            .context("Slack API request failed")?;
53
54        let status = response.status();
55        let body = response.text().await.context("Failed to read Slack response body")?;
56        if !status.is_success() {
57            anyhow::bail!("Slack API request failed {}: {}", status, body);
58        }
59
60        serde_json::from_str(&body).context("Invalid Slack API response")
61    }
62
63    async fn post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
64        let response = self
65            .client
66            .post(self.api_url(path))
67            .bearer_auth(&self.token)
68            .json(&body)
69            .send()
70            .await
71            .context("Slack API request failed")?;
72
73        let status = response.status();
74        let response_body = response.text().await.context("Failed to read Slack response body")?;
75        if !status.is_success() {
76            anyhow::bail!("Slack API request failed {}: {}", status, response_body);
77        }
78
79        serde_json::from_str(&response_body).context("Invalid Slack API response")
80    }
81
82    fn parse_ok_response(&self, data: &Value, operation: &str) -> Result<()> {
83        if data["ok"].as_bool().unwrap_or(false) {
84            Ok(())
85        } else {
86            anyhow::bail!("Slack {} failed: {:?}", operation, data);
87        }
88    }
89
90    async fn fetch_conversation_ids(&self) -> Result<Vec<String>> {
91        let data = self
92            .get_json("conversations.list?types=public_channel,private_channel,im,mpim&exclude_archived=true&limit=1000")
93            .await?;
94        self.parse_ok_response(&data, "conversations.list")?;
95
96        Ok(data["channels"]
97            .as_array()
98            .into_iter()
99            .flatten()
100            .filter_map(|channel| channel["id"].as_str().map(ToString::to_string))
101            .collect())
102    }
103
104    async fn fetch_channel_messages(
105        &self,
106        channel_id: &str,
107        last_seen_ts: Option<&str>,
108    ) -> Result<Vec<(String, Message)>> {
109        let mut path = format!("conversations.history?channel={channel_id}&limit=100");
110        if let Some(ts) = last_seen_ts {
111            path.push_str("&oldest=");
112            path.push_str(ts);
113            path.push_str("&inclusive=false");
114        }
115
116        let data = self.get_json(path).await?;
117        self.parse_ok_response(&data, "conversations.history")?;
118
119        let mut messages = Vec::new();
120        if let Some(entries) = data["messages"].as_array() {
121            for entry in entries.iter().rev() {
122                let Some(ts) = entry["ts"].as_str() else {
123                    continue;
124                };
125
126                let content = entry["text"].as_str().unwrap_or("").to_string();
127                if content.is_empty() && entry.get("files").is_none() {
128                    continue;
129                }
130
131                let sender = entry["user"]
132                    .as_str()
133                    .or_else(|| entry["bot_id"].as_str())
134                    .unwrap_or("unknown")
135                    .to_string();
136                let timestamp = ts
137                    .split('.')
138                    .next()
139                    .and_then(|seconds| seconds.parse::<i64>().ok())
140                    .unwrap_or_default();
141
142                messages.push((
143                    ts.to_string(),
144                    Message {
145                        id: ts.to_string(),
146                        sender,
147                        content,
148                        timestamp,
149                        channel: Some(channel_id.to_string()),
150                        reply_to: entry["thread_ts"]
151                            .as_str()
152                            .filter(|thread_ts| *thread_ts != ts)
153                            .map(ToString::to_string),
154                        media: None,
155                        is_direct: false,
156                        reactions: None,
157                    },
158                ));
159            }
160        }
161
162        Ok(messages)
163    }
164}
165
166#[async_trait]
167impl Messenger for SlackMessenger {
168    fn name(&self) -> &str {
169        &self.name
170    }
171
172    fn messenger_type(&self) -> &str {
173        "slack"
174    }
175
176    async fn initialize(&mut self) -> Result<()> {
177        let data = self.get_json("auth.test").await?;
178        self.parse_ok_response(&data, "auth.test")?;
179        self.connected = true;
180        Ok(())
181    }
182
183    async fn send_message(&self, channel: &str, text: &str) -> Result<String> {
184        let data = self
185            .post_json(
186                "chat.postMessage",
187                json!({
188                "channel": channel,
189                "text": text,
190                }),
191            )
192            .await?;
193        self.parse_ok_response(&data, "chat.postMessage")?;
194
195        Ok(data["ts"].as_str().unwrap_or("").to_string())
196    }
197
198    async fn receive_messages(&self) -> Result<Vec<Message>> {
199        let conversation_ids = self.fetch_conversation_ids().await?;
200        let mut received = Vec::new();
201
202        for channel_id in conversation_ids {
203            let channel_last_seen = {
204                let last_seen = self.last_seen_ts.lock().await;
205                last_seen.get(&channel_id).cloned()
206            };
207            let channel_messages = self
208                .fetch_channel_messages(&channel_id, channel_last_seen.as_deref())
209                .await?;
210
211            if let Some((latest_ts, _)) = channel_messages.last() {
212                let mut last_seen = self.last_seen_ts.lock().await;
213                last_seen.insert(channel_id.clone(), latest_ts.clone());
214            }
215
216            received.extend(channel_messages.into_iter().map(|(_, message)| message));
217        }
218
219        Ok(received)
220    }
221
222    fn is_connected(&self) -> bool {
223        self.connected
224    }
225
226    async fn disconnect(&mut self) -> Result<()> {
227        self.last_seen_ts.lock().await.clear();
228        self.connected = false;
229        Ok(())
230    }
231}