chat_system/messengers/
slack.rs1use crate::message::MessageType;
4use crate::{Message, Messenger};
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use reqwest::Client;
8use serde_json::{Value, json};
9use std::collections::HashMap;
10use tokio::sync::Mutex;
11
12pub struct SlackMessenger {
13 name: String,
14 token: String,
15 api_base_url: String,
16 client: Client,
17 last_seen_ts: Mutex<HashMap<String, String>>,
18 connected: bool,
19}
20
21impl SlackMessenger {
22 pub fn new(name: impl Into<String>, token: impl Into<String>) -> Self {
23 Self {
24 name: name.into(),
25 token: token.into(),
26 api_base_url: "https://slack.com/api".to_string(),
27 client: Client::new(),
28 last_seen_ts: Mutex::new(HashMap::new()),
29 connected: false,
30 }
31 }
32
33 pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
34 self.api_base_url = url.into();
35 self
36 }
37
38 fn api_url(&self, path: impl AsRef<str>) -> String {
39 format!(
40 "{}/{}",
41 self.api_base_url.trim_end_matches('/'),
42 path.as_ref().trim_start_matches('/')
43 )
44 }
45
46 async fn get_json(&self, path: impl AsRef<str>) -> Result<Value> {
47 let response = self
48 .client
49 .get(self.api_url(path))
50 .bearer_auth(&self.token)
51 .send()
52 .await
53 .context("Slack API request failed")?;
54
55 let status = response.status();
56 let body = response
57 .text()
58 .await
59 .context("Failed to read Slack response body")?;
60 if !status.is_success() {
61 anyhow::bail!("Slack API request failed {}: {}", status, body);
62 }
63
64 serde_json::from_str(&body).context("Invalid Slack API response")
65 }
66
67 async fn post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
68 let response = self
69 .client
70 .post(self.api_url(path))
71 .bearer_auth(&self.token)
72 .json(&body)
73 .send()
74 .await
75 .context("Slack API request failed")?;
76
77 let status = response.status();
78 let response_body = response
79 .text()
80 .await
81 .context("Failed to read Slack response body")?;
82 if !status.is_success() {
83 anyhow::bail!("Slack API request failed {}: {}", status, response_body);
84 }
85
86 serde_json::from_str(&response_body).context("Invalid Slack API response")
87 }
88
89 fn parse_ok_response(&self, data: &Value, operation: &str) -> Result<()> {
90 if data["ok"].as_bool().unwrap_or(false) {
91 Ok(())
92 } else {
93 anyhow::bail!("Slack {} failed: {:?}", operation, data);
94 }
95 }
96
97 async fn fetch_conversation_ids(&self) -> Result<Vec<String>> {
98 let data = self
99 .get_json("conversations.list?types=public_channel,private_channel,im,mpim&exclude_archived=true&limit=1000")
100 .await?;
101 self.parse_ok_response(&data, "conversations.list")?;
102
103 Ok(data["channels"]
104 .as_array()
105 .into_iter()
106 .flatten()
107 .filter_map(|channel| channel["id"].as_str().map(ToString::to_string))
108 .collect())
109 }
110
111 async fn fetch_channel_messages(
112 &self,
113 channel_id: &str,
114 last_seen_ts: Option<&str>,
115 ) -> Result<Vec<(String, Message)>> {
116 let mut path = format!("conversations.history?channel={channel_id}&limit=100");
117 if let Some(ts) = last_seen_ts {
118 path.push_str("&oldest=");
119 path.push_str(ts);
120 path.push_str("&inclusive=false");
121 }
122
123 let data = self.get_json(path).await?;
124 self.parse_ok_response(&data, "conversations.history")?;
125
126 let mut messages = Vec::new();
127 if let Some(entries) = data["messages"].as_array() {
128 for entry in entries.iter().rev() {
129 let Some(ts) = entry["ts"].as_str() else {
130 continue;
131 };
132
133 let content = entry["text"].as_str().unwrap_or("").to_string();
134 if content.is_empty() && entry.get("files").is_none() {
135 continue;
136 }
137
138 let sender = entry["user"]
139 .as_str()
140 .or_else(|| entry["bot_id"].as_str())
141 .unwrap_or("unknown")
142 .to_string();
143 let timestamp = ts
144 .split('.')
145 .next()
146 .and_then(|seconds| seconds.parse::<i64>().ok())
147 .unwrap_or_default();
148
149 messages.push((
150 ts.to_string(),
151 Message {
152 id: ts.to_string(),
153 sender,
154 content,
155 timestamp,
156 channel: Some(channel_id.to_string()),
157 reply_to: entry["thread_ts"]
158 .as_str()
159 .filter(|thread_ts| *thread_ts != ts)
160 .map(ToString::to_string),
161 thread_id: None,
162 media: None,
163 is_direct: false,
164 message_type: MessageType::Text,
165 edited_timestamp: None,
166 reactions: None,
167 },
168 ));
169 }
170 }
171
172 Ok(messages)
173 }
174}
175
176#[async_trait]
177impl Messenger for SlackMessenger {
178 fn name(&self) -> &str {
179 &self.name
180 }
181
182 fn messenger_type(&self) -> &str {
183 "slack"
184 }
185
186 async fn initialize(&mut self) -> Result<()> {
187 let data = self.get_json("auth.test").await?;
188 self.parse_ok_response(&data, "auth.test")?;
189 self.connected = true;
190 Ok(())
191 }
192
193 async fn send_message(&self, channel: &str, text: &str) -> Result<String> {
194 let data = self
195 .post_json(
196 "chat.postMessage",
197 json!({
198 "channel": channel,
199 "text": text,
200 }),
201 )
202 .await?;
203 self.parse_ok_response(&data, "chat.postMessage")?;
204
205 Ok(data["ts"].as_str().unwrap_or("").to_string())
206 }
207
208 async fn receive_messages(&self) -> Result<Vec<Message>> {
209 let conversation_ids = self.fetch_conversation_ids().await?;
210 let mut received = Vec::new();
211
212 for channel_id in conversation_ids {
213 let channel_last_seen = {
214 let last_seen = self.last_seen_ts.lock().await;
215 last_seen.get(&channel_id).cloned()
216 };
217 let channel_messages = self
218 .fetch_channel_messages(&channel_id, channel_last_seen.as_deref())
219 .await?;
220
221 if let Some((latest_ts, _)) = channel_messages.last() {
222 let mut last_seen = self.last_seen_ts.lock().await;
223 last_seen.insert(channel_id.clone(), latest_ts.clone());
224 }
225
226 received.extend(channel_messages.into_iter().map(|(_, message)| message));
227 }
228
229 Ok(received)
230 }
231
232 fn is_connected(&self) -> bool {
233 self.connected
234 }
235
236 async fn disconnect(&mut self) -> Result<()> {
237 self.last_seen_ts.lock().await.clear();
238 self.connected = false;
239 Ok(())
240 }
241}