chat_system/messengers/
slack.rs1use crate::{Message, Messenger};
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use reqwest::Client;
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use tokio::sync::Mutex;
10
11pub struct SlackMessenger {
12 name: String,
13 token: String,
14 api_base_url: String,
15 client: Client,
16 last_seen_ts: Mutex<HashMap<String, String>>,
17 connected: bool,
18}
19
20impl SlackMessenger {
21 pub fn new(name: impl Into<String>, token: impl Into<String>) -> Self {
22 Self {
23 name: name.into(),
24 token: token.into(),
25 api_base_url: "https://slack.com/api".to_string(),
26 client: Client::new(),
27 last_seen_ts: Mutex::new(HashMap::new()),
28 connected: false,
29 }
30 }
31
32 pub fn with_api_base_url(mut self, url: impl Into<String>) -> Self {
33 self.api_base_url = url.into();
34 self
35 }
36
37 fn api_url(&self, path: impl AsRef<str>) -> String {
38 format!(
39 "{}/{}",
40 self.api_base_url.trim_end_matches('/'),
41 path.as_ref().trim_start_matches('/')
42 )
43 }
44
45 async fn get_json(&self, path: impl AsRef<str>) -> Result<Value> {
46 let response = self
47 .client
48 .get(self.api_url(path))
49 .bearer_auth(&self.token)
50 .send()
51 .await
52 .context("Slack API request failed")?;
53
54 let status = response.status();
55 let body = response.text().await.context("Failed to read Slack response body")?;
56 if !status.is_success() {
57 anyhow::bail!("Slack API request failed {}: {}", status, body);
58 }
59
60 serde_json::from_str(&body).context("Invalid Slack API response")
61 }
62
63 async fn post_json(&self, path: impl AsRef<str>, body: Value) -> Result<Value> {
64 let response = self
65 .client
66 .post(self.api_url(path))
67 .bearer_auth(&self.token)
68 .json(&body)
69 .send()
70 .await
71 .context("Slack API request failed")?;
72
73 let status = response.status();
74 let response_body = response.text().await.context("Failed to read Slack response body")?;
75 if !status.is_success() {
76 anyhow::bail!("Slack API request failed {}: {}", status, response_body);
77 }
78
79 serde_json::from_str(&response_body).context("Invalid Slack API response")
80 }
81
82 fn parse_ok_response(&self, data: &Value, operation: &str) -> Result<()> {
83 if data["ok"].as_bool().unwrap_or(false) {
84 Ok(())
85 } else {
86 anyhow::bail!("Slack {} failed: {:?}", operation, data);
87 }
88 }
89
90 async fn fetch_conversation_ids(&self) -> Result<Vec<String>> {
91 let data = self
92 .get_json("conversations.list?types=public_channel,private_channel,im,mpim&exclude_archived=true&limit=1000")
93 .await?;
94 self.parse_ok_response(&data, "conversations.list")?;
95
96 Ok(data["channels"]
97 .as_array()
98 .into_iter()
99 .flatten()
100 .filter_map(|channel| channel["id"].as_str().map(ToString::to_string))
101 .collect())
102 }
103
104 async fn fetch_channel_messages(
105 &self,
106 channel_id: &str,
107 last_seen_ts: Option<&str>,
108 ) -> Result<Vec<(String, Message)>> {
109 let mut path = format!("conversations.history?channel={channel_id}&limit=100");
110 if let Some(ts) = last_seen_ts {
111 path.push_str("&oldest=");
112 path.push_str(ts);
113 path.push_str("&inclusive=false");
114 }
115
116 let data = self.get_json(path).await?;
117 self.parse_ok_response(&data, "conversations.history")?;
118
119 let mut messages = Vec::new();
120 if let Some(entries) = data["messages"].as_array() {
121 for entry in entries.iter().rev() {
122 let Some(ts) = entry["ts"].as_str() else {
123 continue;
124 };
125
126 let content = entry["text"].as_str().unwrap_or("").to_string();
127 if content.is_empty() && entry.get("files").is_none() {
128 continue;
129 }
130
131 let sender = entry["user"]
132 .as_str()
133 .or_else(|| entry["bot_id"].as_str())
134 .unwrap_or("unknown")
135 .to_string();
136 let timestamp = ts
137 .split('.')
138 .next()
139 .and_then(|seconds| seconds.parse::<i64>().ok())
140 .unwrap_or_default();
141
142 messages.push((
143 ts.to_string(),
144 Message {
145 id: ts.to_string(),
146 sender,
147 content,
148 timestamp,
149 channel: Some(channel_id.to_string()),
150 reply_to: entry["thread_ts"]
151 .as_str()
152 .filter(|thread_ts| *thread_ts != ts)
153 .map(ToString::to_string),
154 media: None,
155 is_direct: false,
156 reactions: None,
157 },
158 ));
159 }
160 }
161
162 Ok(messages)
163 }
164}
165
166#[async_trait]
167impl Messenger for SlackMessenger {
168 fn name(&self) -> &str {
169 &self.name
170 }
171
172 fn messenger_type(&self) -> &str {
173 "slack"
174 }
175
176 async fn initialize(&mut self) -> Result<()> {
177 let data = self.get_json("auth.test").await?;
178 self.parse_ok_response(&data, "auth.test")?;
179 self.connected = true;
180 Ok(())
181 }
182
183 async fn send_message(&self, channel: &str, text: &str) -> Result<String> {
184 let data = self
185 .post_json(
186 "chat.postMessage",
187 json!({
188 "channel": channel,
189 "text": text,
190 }),
191 )
192 .await?;
193 self.parse_ok_response(&data, "chat.postMessage")?;
194
195 Ok(data["ts"].as_str().unwrap_or("").to_string())
196 }
197
198 async fn receive_messages(&self) -> Result<Vec<Message>> {
199 let conversation_ids = self.fetch_conversation_ids().await?;
200 let mut received = Vec::new();
201
202 for channel_id in conversation_ids {
203 let channel_last_seen = {
204 let last_seen = self.last_seen_ts.lock().await;
205 last_seen.get(&channel_id).cloned()
206 };
207 let channel_messages = self
208 .fetch_channel_messages(&channel_id, channel_last_seen.as_deref())
209 .await?;
210
211 if let Some((latest_ts, _)) = channel_messages.last() {
212 let mut last_seen = self.last_seen_ts.lock().await;
213 last_seen.insert(channel_id.clone(), latest_ts.clone());
214 }
215
216 received.extend(channel_messages.into_iter().map(|(_, message)| message));
217 }
218
219 Ok(received)
220 }
221
222 fn is_connected(&self) -> bool {
223 self.connected
224 }
225
226 async fn disconnect(&mut self) -> Result<()> {
227 self.last_seen_ts.lock().await.clear();
228 self.connected = false;
229 Ok(())
230 }
231}