chat_system/messengers/
slack.rs1use crate::message::MessageType;
4use crate::{Message, Messenger};
5use anyhow::{Context, Result};
6use async_trait::async_trait;
7use reqwest::Client;
8use serde_json::{Value, json};
9use std::collections::HashMap;
10use tokio::sync::Mutex;
11
12pub struct SlackMessenger {
13 name: String,
14 token: String,
15 app_token: Option<String>,
16 default_channel: Option<String>,
17 api_base_url: String,
18 client: Client,
19 last_seen_ts: Mutex<HashMap<String, String>>,
20 connected: bool,
21}
22
23impl SlackMessenger {
24 pub fn new(name: impl Into<String>, token: impl Into<String>) -> Self {
25 Self {
26 name: name.into(),
27 token: token.into(),
28 app_token: None,
29 default_channel: None,
30 api_base_url: "https://slack.com/api".to_string(),
31 client: Client::new(),
32 last_seen_ts: Mutex::new(HashMap::new()),
33 connected: false,
34 }
35 }
36
37 pub fn with_app_token(mut self, token: impl Into<String>) -> Self {
40 self.app_token = Some(token.into());
41 self
42 }
43
44 pub fn with_default_channel(mut self, channel: impl Into<String>) -> Self {
46 self.default_channel = Some(channel.into());
47 self
48 }
49
50 pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
51 self.api_base_url = url.into();
52 self
53 }
54
55 fn api_url(&self, path: impl AsRef<str>) -> String {
56 format!(
57 "{}/{}",
58 self.api_base_url.trim_end_matches('/'),
59 path.as_ref().trim_start_matches('/')
60 )
61 }
62
63 async fn get_json(&self, path: impl AsRef<str>) -> Result<Value> {
64 let response = self
65 .client
66 .get(self.api_url(path))
67 .bearer_auth(&self.token)
68 .send()
69 .await
70 .context("Slack API request failed")?;
71
72 let status = response.status();
73 let body = response
74 .text()
75 .await
76 .context("Failed to read Slack response body")?;
77 if !status.is_success() {
78 anyhow::bail!("Slack API request failed {}: {}", status, body);
79 }
80
81 serde_json::from_str(&body).context("Invalid Slack API response")
82 }
83
84 async fn post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
85 let response = self
86 .client
87 .post(self.api_url(path))
88 .bearer_auth(&self.token)
89 .json(&body)
90 .send()
91 .await
92 .context("Slack API request failed")?;
93
94 let status = response.status();
95 let response_body = response
96 .text()
97 .await
98 .context("Failed to read Slack response body")?;
99 if !status.is_success() {
100 anyhow::bail!("Slack API request failed {}: {}", status, response_body);
101 }
102
103 serde_json::from_str(&response_body).context("Invalid Slack API response")
104 }
105
106 fn parse_ok_response(&self, data: &Value, operation: &str) -> Result<()> {
107 if data["ok"].as_bool().unwrap_or(false) {
108 Ok(())
109 } else {
110 anyhow::bail!("Slack {} failed: {:?}", operation, data);
111 }
112 }
113
114 async fn fetch_conversation_ids(&self) -> Result<Vec<String>> {
115 let data = self
116 .get_json("conversations.list?types=public_channel,private_channel,im,mpim&exclude_archived=true&limit=1000")
117 .await?;
118 self.parse_ok_response(&data, "conversations.list")?;
119
120 Ok(data["channels"]
121 .as_array()
122 .into_iter()
123 .flatten()
124 .filter_map(|channel| channel["id"].as_str().map(ToString::to_string))
125 .collect())
126 }
127
128 async fn fetch_channel_messages(
129 &self,
130 channel_id: &str,
131 last_seen_ts: Option<&str>,
132 ) -> Result<Vec<(String, Message)>> {
133 let mut path = format!("conversations.history?channel={channel_id}&limit=100");
134 if let Some(ts) = last_seen_ts {
135 path.push_str("&oldest=");
136 path.push_str(ts);
137 path.push_str("&inclusive=false");
138 }
139
140 let data = self.get_json(path).await?;
141 self.parse_ok_response(&data, "conversations.history")?;
142
143 let mut messages = Vec::new();
144 if let Some(entries) = data["messages"].as_array() {
145 for entry in entries.iter().rev() {
146 let Some(ts) = entry["ts"].as_str() else {
147 continue;
148 };
149
150 let content = entry["text"].as_str().unwrap_or("").to_string();
151 if content.is_empty() && entry.get("files").is_none() {
152 continue;
153 }
154
155 let sender = entry["user"]
156 .as_str()
157 .or_else(|| entry["bot_id"].as_str())
158 .unwrap_or("unknown")
159 .to_string();
160 let timestamp = ts
161 .split('.')
162 .next()
163 .and_then(|seconds| seconds.parse::<i64>().ok())
164 .unwrap_or_default();
165
166 messages.push((
167 ts.to_string(),
168 Message {
169 id: ts.to_string(),
170 sender,
171 content,
172 timestamp,
173 channel: Some(channel_id.to_string()),
174 reply_to: entry["thread_ts"]
175 .as_str()
176 .filter(|thread_ts| *thread_ts != ts)
177 .map(ToString::to_string),
178 thread_id: None,
179 media: None,
180 is_direct: false,
181 message_type: MessageType::Text,
182 edited_timestamp: None,
183 reactions: None,
184 },
185 ));
186 }
187 }
188
189 Ok(messages)
190 }
191}
192
193#[async_trait]
194impl Messenger for SlackMessenger {
195 fn name(&self) -> &str {
196 &self.name
197 }
198
199 fn messenger_type(&self) -> &str {
200 "slack"
201 }
202
203 async fn initialize(&mut self) -> Result<()> {
204 let data = self.get_json("auth.test").await?;
205 self.parse_ok_response(&data, "auth.test")?;
206 self.connected = true;
207 Ok(())
208 }
209
210 async fn send_message(&self, channel: &str, text: &str) -> Result<String> {
211 let data = self
212 .post_json(
213 "chat.postMessage",
214 json!({
215 "channel": channel,
216 "text": text,
217 }),
218 )
219 .await?;
220 self.parse_ok_response(&data, "chat.postMessage")?;
221
222 Ok(data["ts"].as_str().unwrap_or("").to_string())
223 }
224
225 async fn receive_messages(&self) -> Result<Vec<Message>> {
226 let conversation_ids = self.fetch_conversation_ids().await?;
227 let mut received = Vec::new();
228
229 for channel_id in conversation_ids {
230 let channel_last_seen = {
231 let last_seen = self.last_seen_ts.lock().await;
232 last_seen.get(&channel_id).cloned()
233 };
234 let channel_messages = self
235 .fetch_channel_messages(&channel_id, channel_last_seen.as_deref())
236 .await?;
237
238 if let Some((latest_ts, _)) = channel_messages.last() {
239 let mut last_seen = self.last_seen_ts.lock().await;
240 last_seen.insert(channel_id.clone(), latest_ts.clone());
241 }
242
243 received.extend(channel_messages.into_iter().map(|(_, message)| message));
244 }
245
246 Ok(received)
247 }
248
249 fn is_connected(&self) -> bool {
250 self.connected
251 }
252
253 async fn disconnect(&mut self) -> Result<()> {
254 self.last_seen_ts.lock().await.clear();
255 self.connected = false;
256 Ok(())
257 }
258}