1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use devboy_core::types::ChatType;
8use devboy_core::{
9 Error, GetChatsParams, GetMessagesParams, MessageAttachment, MessageAuthor, MessengerChat,
10 MessengerMessage, MessengerProvider, Pagination, ProviderResult, Result, SearchMessagesParams,
11 SendMessageParams,
12};
13use reqwest::header::HeaderMap;
14use secrecy::{ExposeSecret, SecretString};
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use tokio::sync::{Mutex, RwLock};
18use tokio::time::{Instant, sleep_until};
19use tracing::debug;
20
21use crate::DEFAULT_SLACK_API_URL;
22
/// Identity and OAuth-scope summary built by [`SlackClient::auth_info`] from a
/// successful `auth.test` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlackAuthInfo {
    /// `user_id` from the response; empty when Slack did not return one.
    pub user_id: String,
    /// `user` field of the response, if present.
    pub user_name: Option<String>,
    /// `team_id` from the response; empty when Slack did not return one.
    pub team_id: String,
    /// `team` field of the response; empty when Slack did not return one.
    pub team_name: String,
    /// `bot_id` from the response, if present.
    pub bot_id: Option<String>,
    /// `url` field of the response, if present.
    pub url: Option<String>,
    /// Scopes parsed from the `x-oauth-scopes` response header.
    pub scopes: Vec<String>,
    /// Required scopes of the client that are absent from `scopes`.
    pub missing_scopes: Vec<String>,
}
34
/// Slack Web API client.
///
/// Cloning is cheap with respect to shared state: the user cache and the rate
/// limiter sit behind `Arc`, so clones share them.
#[derive(Clone)]
pub struct SlackClient {
    /// Token sent as a bearer credential with every request.
    token: SecretString,
    /// API base URL; method names are appended as `{base_url}/{method}`.
    base_url: String,
    /// HTTP client used for all requests.
    http: reqwest::Client,
    /// Scopes compared against the token's granted scopes in `auth_info`.
    required_scopes: Vec<String>,
    /// Authors already resolved through `users.info`, keyed by user id.
    user_cache: Arc<RwLock<HashMap<String, MessageAuthor>>>,
    /// Paces outgoing requests per read/write bucket.
    rate_limiter: Arc<SlackRateLimiter>,
}
44
45impl fmt::Debug for SlackClient {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 f.debug_struct("SlackClient")
48 .field("token", &"<redacted>")
49 .field("base_url", &self.base_url)
50 .field("http", &self.http)
51 .field("required_scopes", &self.required_scopes)
52 .field("user_cache", &self.user_cache)
53 .field("rate_limiter", &self.rate_limiter)
54 .finish()
55 }
56}
57
/// Spacing the rate limiter reserves between consecutive requests in the read bucket.
const SLACK_READ_INTERVAL: Duration = Duration::from_millis(1200);
/// Spacing the rate limiter reserves between consecutive requests in the write bucket.
const SLACK_WRITE_INTERVAL: Duration = Duration::from_secs(1);
60
/// Rate-limit class of an API method; each bucket is paced independently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SlackRateLimitBucket {
    /// Every method that is not classified as a write.
    Read,
    /// Methods that post content (currently `chat.postMessage`).
    Write,
}
66
/// Client-side pacing of Slack requests, tracked separately per bucket.
#[derive(Debug)]
struct SlackRateLimiter {
    /// Next-available instants for both buckets, guarded by an async mutex.
    state: Mutex<SlackRateLimitState>,
    /// Spacing reserved after each read-bucket request.
    read_interval: Duration,
    /// Spacing reserved after each write-bucket request.
    write_interval: Duration,
}
73
/// Earliest instants at which the next request of each bucket may start.
#[derive(Debug)]
struct SlackRateLimitState {
    /// Earliest start for the next read-bucket request.
    read_ready_at: Instant,
    /// Earliest start for the next write-bucket request.
    write_ready_at: Instant,
}
79
/// Body of an `auth.test` response.
#[derive(Debug, Deserialize)]
struct SlackAuthTestResponse {
    /// `true` when the call succeeded.
    ok: bool,
    /// Slack error code, set when `ok` is `false`.
    error: Option<String>,
    url: Option<String>,
    team: Option<String>,
    user: Option<String>,
    // NOTE(review): the three renames below map each field to its own name,
    // so they appear to have no effect.
    #[serde(rename = "team_id")]
    team_id: Option<String>,
    #[serde(rename = "user_id")]
    user_id: Option<String>,
    #[serde(rename = "bot_id")]
    bot_id: Option<String>,
}
94
/// `response_metadata` object carried by paginated responses.
#[derive(Debug, Deserialize)]
struct SlackResponseMetadata {
    /// Cursor for the next page; an empty string is treated as "no further page".
    #[serde(default)]
    next_cursor: String,
}
100
/// Body of a `conversations.list` response.
#[derive(Debug, Deserialize)]
struct SlackConversationsListResponse {
    /// `true` when the call succeeded.
    ok: bool,
    /// Slack error code, set when `ok` is `false`.
    error: Option<String>,
    /// Conversations on this page; defaults to empty when the field is absent.
    #[serde(default)]
    channels: Vec<SlackConversation>,
    /// Pagination metadata, if any.
    response_metadata: Option<SlackResponseMetadata>,
}
109
/// One conversation entry as returned by `conversations.list`.
#[derive(Debug, Clone, Deserialize)]
struct SlackConversation {
    id: String,
    /// Conversation name; may be absent or empty, in which case `user` or `id`
    /// is used to build a display name.
    name: Option<String>,
    /// Used to build a `dm-{user}` display name when `name` is missing.
    user: Option<String>,
    /// Together with `is_mpim`, marks the conversation as `ChatType::Group`.
    is_group: Option<bool>,
    /// Marks the conversation as `ChatType::Direct`.
    is_im: Option<bool>,
    is_mpim: Option<bool>,
    /// Archived conversations are reported as inactive.
    is_archived: Option<bool>,
    num_members: Option<u32>,
    /// Preferred source of the chat description.
    purpose: Option<SlackTextValue>,
    /// Fallback source of the chat description when `purpose` is empty.
    topic: Option<SlackTextValue>,
}
123
/// Wrapper object used for a conversation's `purpose` and `topic`.
#[derive(Debug, Clone, Deserialize)]
struct SlackTextValue {
    /// The text itself; may be absent or empty.
    value: Option<String>,
}
128
/// Body shared by `conversations.history` and `conversations.replies` responses.
#[derive(Debug, Deserialize)]
struct SlackMessagesResponse {
    /// `true` when the call succeeded.
    ok: bool,
    /// Slack error code, set when `ok` is `false`.
    error: Option<String>,
    /// Messages on this page; defaults to empty when the field is absent.
    #[serde(default)]
    messages: Vec<SlackMessage>,
    /// Whether more messages exist beyond this page.
    has_more: Option<bool>,
    /// Pagination metadata, if any.
    response_metadata: Option<SlackResponseMetadata>,
}
138
/// Resumable position of a client-side message search; serialized to JSON and
/// handed back to the caller as the opaque `next_cursor`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct SlackSearchCursor {
    /// Format version; not interpreted by the code visible in this file.
    version: u8,
    /// Chat currently being scanned, if any.
    current_chat_id: Option<String>,
    /// Slack cursor of the message page being scanned within the current chat.
    current_message_cursor: Option<String>,
    /// Number of messages of the current page that were already examined.
    #[serde(default)]
    current_message_offset: usize,
    /// Chat ids from the most recently loaded conversations page.
    #[serde(default)]
    pending_chat_ids: Vec<String>,
    /// Index into `pending_chat_ids` of the next chat to scan.
    #[serde(default)]
    pending_chat_index: usize,
    /// Cursor for the next conversations page; also carries a caller-supplied
    /// cursor that could not be parsed as JSON.
    next_chats_cursor: Option<String>,
}
152
/// Body of a `chat.postMessage` response.
#[derive(Debug, Deserialize)]
struct SlackPostMessageResponse {
    /// `true` when the call succeeded.
    ok: bool,
    /// Slack error code, set when `ok` is `false`.
    error: Option<String>,
    /// Channel the message was posted to, if reported.
    channel: Option<String>,
    /// Timestamp of the posted message; used when `message` is absent.
    ts: Option<String>,
    /// The posted message as echoed by Slack, if present.
    message: Option<SlackMessage>,
}
161
/// Body of a `users.info` response.
#[derive(Debug, Deserialize)]
struct SlackUsersInfoResponse {
    /// `true` when the call succeeded.
    ok: bool,
    /// Slack error code, set when `ok` is `false`.
    error: Option<String>,
    /// The requested user; a missing value is reported as invalid data.
    user: Option<SlackUser>,
}
168
/// User object as returned inside a `users.info` response.
#[derive(Debug, Clone, Deserialize)]
struct SlackUser {
    id: String,
    /// Handle; used as the username and as a display-name fallback.
    name: Option<String>,
    /// Profile details, if present.
    profile: Option<SlackUserProfile>,
}
175
/// Profile section of a Slack user.
#[derive(Debug, Clone, Deserialize)]
struct SlackUserProfile {
    /// Second choice for the display name, after `display_name`.
    real_name: Option<String>,
    /// First choice for the display name when non-empty.
    display_name: Option<String>,
    /// Used as the author's avatar URL.
    image_72: Option<String>,
    /// Defaults to `None` when the field is absent.
    #[serde(default)]
    email: Option<String>,
}
186
/// Message object as it appears in history, replies and post-message responses.
#[derive(Debug, Clone, Deserialize)]
struct SlackMessage {
    /// Slack timestamp; doubles as the message id.
    ts: String,
    /// Raw message text, normalized before being exposed.
    text: Option<String>,
    /// Author's user id; when present the author is resolved via `users.info`.
    user: Option<String>,
    username: Option<String>,
    bot_id: Option<String>,
    /// Timestamp of the thread root; equals `ts` for the root message itself.
    thread_ts: Option<String>,
    parent_user_id: Option<String>,
    subtype: Option<String>,
    /// Only its presence is inspected, to flag the message as edited.
    edited: Option<serde_json::Value>,
    files: Option<Vec<SlackFile>>,
    attachments: Option<Vec<SlackRichAttachment>>,
    bot_profile: Option<SlackBotProfile>,
}
202
/// `bot_profile` object attached to a message; used as the author when the
/// message has no `user`.
#[derive(Debug, Clone, Deserialize)]
struct SlackBotProfile {
    id: Option<String>,
    name: Option<String>,
    icons: Option<SlackBotIcons>,
}
209
/// Icon set of a bot profile.
#[derive(Debug, Clone, Deserialize)]
struct SlackBotIcons {
    /// Used as the bot author's avatar URL.
    image_72: Option<String>,
}
214
/// File object attached to a message.
#[derive(Debug, Clone, Deserialize)]
struct SlackFile {
    id: Option<String>,
    name: Option<String>,
    mimetype: Option<String>,
    /// Used as the attachment type; `"file"` is substituted when absent.
    filetype: Option<String>,
    /// Fallback attachment URL when `permalink` is absent.
    url_private: Option<String>,
    /// Preferred attachment URL.
    permalink: Option<String>,
}
224
/// Entry of a message's `attachments` array.
#[derive(Debug, Clone, Deserialize)]
struct SlackRichAttachment {
    id: Option<u64>,
    /// Preferred attachment name.
    title: Option<String>,
    /// Fallback attachment name when `title` is absent.
    fallback: Option<String>,
    /// Used as the attachment type.
    service_name: Option<String>,
    /// Preferred attachment URL.
    title_link: Option<String>,
    /// Fallback attachment URL when `title_link` is absent.
    from_url: Option<String>,
}
234
235impl SlackClient {
236 pub fn new(token: SecretString) -> Self {
237 Self {
238 token,
239 base_url: DEFAULT_SLACK_API_URL.to_string(),
240 http: reqwest::Client::new(),
241 required_scopes: devboy_core::default_slack_required_scopes(),
242 user_cache: Arc::new(RwLock::new(HashMap::new())),
243 rate_limiter: Arc::new(SlackRateLimiter::new(
244 SLACK_READ_INTERVAL,
245 SLACK_WRITE_INTERVAL,
246 )),
247 }
248 }
249
250 pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
251 self.base_url = base_url.into().trim_end_matches('/').to_string();
252 self
253 }
254
255 pub fn with_required_scopes(mut self, required_scopes: Vec<String>) -> Self {
256 self.required_scopes = required_scopes;
257 self
258 }
259
260 pub fn required_scopes(&self) -> &[String] {
261 &self.required_scopes
262 }
263
264 pub async fn auth_info(&self) -> Result<SlackAuthInfo> {
265 let url = format!("{}/auth.test", self.base_url);
266 debug!(url, "slack auth.test request");
267
268 let response = self.send_form_request("auth.test", &[]).await?;
269 let headers = response.headers().clone();
270 let payload: SlackAuthTestResponse = map_http_error(response).await?;
271
272 if !payload.ok {
273 let message = payload
274 .error
275 .unwrap_or_else(|| "unknown_slack_error".to_string());
276 return Err(match message.as_str() {
277 "invalid_auth" | "not_authed" => Error::Unauthorized(message),
278 "missing_scope" => Error::Forbidden(message),
279 _ => Error::Api {
280 status: 200,
281 message,
282 },
283 });
284 }
285
286 let scopes = parse_scopes(&headers);
287 let missing_scopes = self
288 .required_scopes
289 .iter()
290 .filter(|scope| !scopes.iter().any(|actual| actual == *scope))
291 .cloned()
292 .collect();
293
294 Ok(SlackAuthInfo {
295 user_id: payload.user_id.unwrap_or_default(),
296 user_name: payload.user,
297 team_id: payload.team_id.unwrap_or_default(),
298 team_name: payload.team.unwrap_or_default(),
299 bot_id: payload.bot_id,
300 url: payload.url,
301 scopes,
302 missing_scopes,
303 })
304 }
305
306 pub async fn ensure_healthy(&self) -> Result<SlackAuthInfo> {
307 let info = self.auth_info().await?;
308 if info.missing_scopes.is_empty() {
309 Ok(info)
310 } else {
311 Err(Error::Forbidden(format!(
312 "Slack token is missing required scopes: {}",
313 info.missing_scopes.join(", ")
314 )))
315 }
316 }
317
318 async fn post_form<T>(&self, method: &str, params: &[(&str, String)]) -> Result<T>
319 where
320 T: DeserializeOwned,
321 {
322 let response = self.send_form_request(method, params).await?;
323 map_http_error(response).await
324 }
325
326 async fn send_form_request(
327 &self,
328 method: &str,
329 params: &[(&str, String)],
330 ) -> Result<reqwest::Response> {
331 let url = format!("{}/{}", self.base_url, method);
332 let bucket = slack_rate_limit_bucket(method);
333 debug!(url, ?bucket, "slack api request");
334
335 for attempt in 0..2 {
336 self.rate_limiter.acquire(bucket).await;
337
338 let response = self
339 .http
340 .post(&url)
341 .bearer_auth(self.token.expose_secret())
342 .form(params)
343 .send()
344 .await
345 .map_err(|e| Error::Network(e.to_string()))?;
346
347 if response.status().as_u16() != 429 {
348 return Ok(response);
349 }
350
351 let retry_after = response
352 .headers()
353 .get("retry-after")
354 .and_then(|value| value.to_str().ok())
355 .and_then(|value| value.parse::<u64>().ok());
356
357 self.rate_limiter
358 .on_rate_limited(bucket, retry_after.map(Duration::from_secs))
359 .await;
360
361 if attempt == 0 && retry_after.is_some() {
362 continue;
363 }
364
365 return Ok(response);
366 }
367
368 unreachable!("slack request retry loop should always return");
369 }
370
371 async fn get_conversations(
372 &self,
373 params: &GetChatsParams,
374 ) -> Result<ProviderResult<MessengerChat>> {
375 let limit = params.limit.unwrap_or(100).min(1000);
376 let mut form = vec![
377 ("limit", limit.to_string()),
378 (
379 "types",
380 slack_conversation_types(params.chat_type).to_string(),
381 ),
382 (
383 "exclude_archived",
384 (!params.include_inactive.unwrap_or(false)).to_string(),
385 ),
386 ];
387 if let Some(cursor) = params.cursor.as_ref() {
388 form.push(("cursor", cursor.clone()));
389 }
390
391 let payload: SlackConversationsListResponse =
392 self.post_form("conversations.list", &form).await?;
393 ensure_ok(payload.ok, payload.error)?;
394
395 let mut items: Vec<_> = payload
396 .channels
397 .into_iter()
398 .filter(|chat| matches_chat_filter(chat, params))
399 .map(map_chat)
400 .collect();
401
402 if let Some(limit) = params.limit {
403 items.truncate(limit as usize);
404 }
405
406 let has_more = payload
407 .response_metadata
408 .as_ref()
409 .map(|meta| !meta.next_cursor.is_empty())
410 .unwrap_or(false);
411 let next_cursor = slack_next_cursor(payload.response_metadata.as_ref());
412
413 Ok(ProviderResult::new(items).with_pagination(Pagination {
414 offset: 0,
415 limit,
416 total: None,
417 has_more,
418 next_cursor,
419 }))
420 }
421
422 async fn get_messages_page(
423 &self,
424 params: &GetMessagesParams,
425 ) -> Result<ProviderResult<MessengerMessage>> {
426 let limit = params.limit.unwrap_or(100).min(1000);
427 let mut form = vec![
428 ("channel", params.chat_id.clone()),
429 ("limit", limit.to_string()),
430 ("inclusive", "true".to_string()),
431 ];
432 if let Some(cursor) = params.cursor.as_ref() {
433 form.push(("cursor", cursor.clone()));
434 }
435 if let Some(since) = normalize_ts_param("since", params.since.as_deref())? {
436 form.push(("oldest", since));
437 }
438 if let Some(until) = normalize_ts_param("until", params.until.as_deref())? {
439 form.push(("latest", until));
440 }
441
442 let payload: SlackMessagesResponse = if let Some(thread_id) = params.thread_id.as_ref() {
443 form.push(("ts", thread_id.clone()));
444 self.post_form("conversations.replies", &form).await?
445 } else {
446 self.post_form("conversations.history", &form).await?
447 };
448 ensure_ok(payload.ok, payload.error)?;
449
450 let mut items = Vec::with_capacity(payload.messages.len());
451 for message in payload.messages {
452 items.push(self.map_message(¶ms.chat_id, message).await?);
453 }
454
455 let has_more = payload.has_more.unwrap_or(false)
456 || payload
457 .response_metadata
458 .as_ref()
459 .map(|meta| !meta.next_cursor.is_empty())
460 .unwrap_or(false);
461 let next_cursor = slack_next_cursor(payload.response_metadata.as_ref());
462
463 Ok(ProviderResult::new(items).with_pagination(Pagination {
464 offset: 0,
465 limit,
466 total: None,
467 has_more,
468 next_cursor,
469 }))
470 }
471
472 async fn map_message(&self, chat_id: &str, message: SlackMessage) -> Result<MessengerMessage> {
473 let ts = message.ts.clone();
474 let thread_id = message.thread_ts.clone();
475 let reply_to_id = thread_id.as_ref().filter(|thread| *thread != &ts).cloned();
476
477 Ok(MessengerMessage {
478 id: ts.clone(),
479 chat_id: chat_id.to_string(),
480 text: normalize_mrkdwn(message.text.as_deref().unwrap_or_default()),
481 author: self.resolve_author(&message).await?,
482 source: "slack".to_string(),
483 timestamp: ts,
484 thread_id,
485 reply_to_id,
486 attachments: map_attachments(&message),
487 is_edited: message.edited.is_some(),
488 })
489 }
490
491 async fn resolve_author(&self, message: &SlackMessage) -> Result<MessageAuthor> {
492 if let Some(user_id) = message.user.as_deref() {
493 return self.get_user(user_id).await;
494 }
495
496 if let Some(bot_profile) = message.bot_profile.as_ref() {
497 return Ok(MessageAuthor {
498 id: bot_profile
499 .id
500 .clone()
501 .or_else(|| message.bot_id.clone())
502 .unwrap_or_else(|| "slack-bot".to_string()),
503 name: bot_profile
504 .name
505 .clone()
506 .or_else(|| message.username.clone())
507 .unwrap_or_else(|| "Slack Bot".to_string()),
508 username: message.username.clone(),
509 avatar_url: bot_profile
510 .icons
511 .as_ref()
512 .and_then(|icons| icons.image_72.clone()),
513 });
514 }
515
516 Ok(MessageAuthor {
517 id: message
518 .bot_id
519 .clone()
520 .or_else(|| message.parent_user_id.clone())
521 .unwrap_or_else(|| "unknown".to_string()),
522 name: message
523 .username
524 .clone()
525 .or_else(|| message.subtype.clone())
526 .unwrap_or_else(|| "Unknown".to_string()),
527 username: message.username.clone(),
528 avatar_url: None,
529 })
530 }
531
532 async fn get_user(&self, user_id: &str) -> Result<MessageAuthor> {
533 if let Some(cached) = self.user_cache.read().await.get(user_id).cloned() {
534 return Ok(cached);
535 }
536
537 let payload: SlackUsersInfoResponse = self
538 .post_form("users.info", &[("user", user_id.to_string())])
539 .await?;
540 ensure_ok(payload.ok, payload.error)?;
541
542 let user = payload
543 .user
544 .ok_or_else(|| Error::InvalidData("Slack users.info returned no user".to_string()))?;
545 let profile = user.profile.as_ref();
546 let display_name = profile
547 .and_then(|profile| profile.display_name.clone())
548 .filter(|name| !name.is_empty());
549 let real_name = profile
550 .and_then(|profile| profile.real_name.clone())
551 .filter(|name| !name.is_empty());
552 let username = user.name.filter(|name| !name.is_empty());
553 let name = display_name
554 .clone()
555 .or(real_name)
556 .or_else(|| username.clone())
557 .unwrap_or_else(|| user.id.clone());
558
559 let author = MessageAuthor {
560 id: user.id,
561 name,
562 username,
563 avatar_url: profile.and_then(|profile| profile.image_72.clone()),
564 };
565
566 self.user_cache
567 .write()
568 .await
569 .insert(user_id.to_string(), author.clone());
570
571 Ok(author)
572 }
573}
574
575#[async_trait]
576impl MessengerProvider for SlackClient {
577 fn provider_name(&self) -> &'static str {
578 "slack"
579 }
580
581 async fn get_chats(&self, params: GetChatsParams) -> Result<ProviderResult<MessengerChat>> {
582 self.get_conversations(¶ms).await
583 }
584
585 async fn get_messages(
586 &self,
587 params: GetMessagesParams,
588 ) -> Result<ProviderResult<MessengerMessage>> {
589 self.get_messages_page(¶ms).await
590 }
591
592 async fn search_messages(
593 &self,
594 params: SearchMessagesParams,
595 ) -> Result<ProviderResult<MessengerMessage>> {
596 let query = params.query.trim().to_lowercase();
597 if query.is_empty() {
598 return Err(Error::InvalidData(
599 "search query must not be empty".to_string(),
600 ));
601 }
602
603 let limit = params.limit.unwrap_or(20) as usize;
604 let mut found = Vec::new();
605
606 let (has_more, next_cursor) = if let Some(chat_id) = params.chat_id.as_ref() {
607 let mut state = parse_search_cursor(params.cursor.as_deref())?;
608 if state.current_chat_id.is_none() {
609 state.current_chat_id = Some(chat_id.clone());
610 }
611 if state.current_chat_id.as_deref() != Some(chat_id.as_str()) {
612 state = SlackSearchCursor {
613 current_chat_id: Some(chat_id.clone()),
614 ..SlackSearchCursor::default()
615 };
616 }
617 if state.current_message_cursor.is_none()
618 && state.current_message_offset == 0
619 && state.next_chats_cursor.is_some()
620 && !has_pending_chat_ids(&state)
621 {
622 state.current_message_cursor = state.next_chats_cursor.take();
623 }
624
625 loop {
626 let messages = self
627 .get_messages_page(&GetMessagesParams {
628 chat_id: chat_id.clone(),
629 limit: Some(params.limit.unwrap_or(100)),
630 cursor: state.current_message_cursor.clone(),
631 thread_id: None,
632 since: params.since.clone(),
633 until: params.until.clone(),
634 })
635 .await?;
636 let pagination = messages.pagination.clone();
637 let page_len = messages.items.len();
638
639 for message in messages
640 .items
641 .into_iter()
642 .skip(state.current_message_offset)
643 {
644 state.current_message_offset += 1;
645 if message.text.to_lowercase().contains(&query) {
646 found.push(message);
647 if found.len() >= limit {
648 break;
649 }
650 }
651 }
652
653 let page_has_more = pagination.as_ref().map(|p| p.has_more).unwrap_or(false);
654 let next_page_cursor = pagination.as_ref().and_then(|p| p.next_cursor.clone());
655
656 if found.len() >= limit {
657 if state.current_message_offset >= page_len {
658 if page_has_more {
659 state.current_message_cursor = next_page_cursor;
660 state.current_message_offset = 0;
661 } else {
662 state.current_chat_id = None;
663 state.current_message_cursor = None;
664 state.current_message_offset = 0;
665 }
666 }
667 break;
668 }
669
670 if page_has_more {
671 state.current_message_cursor = next_page_cursor;
672 state.current_message_offset = 0;
673 continue;
674 }
675
676 state.current_chat_id = None;
677 state.current_message_cursor = None;
678 state.current_message_offset = 0;
679 break;
680 }
681
682 let has_more = state.current_chat_id.is_some();
683 let next_cursor = serialize_search_cursor(&state)?;
684 (has_more, next_cursor)
685 } else {
686 let mut state = parse_search_cursor(params.cursor.as_deref())?;
687 let mut chats_loaded = state.current_chat_id.is_some()
688 || has_pending_chat_ids(&state)
689 || state.next_chats_cursor.is_some()
690 || params.cursor.is_some();
691
692 loop {
693 if found.len() >= limit {
694 break;
695 }
696
697 if let Some(chat_id) = state.current_chat_id.clone() {
698 let messages = self
699 .get_messages_page(&GetMessagesParams {
700 chat_id,
701 limit: Some(100),
702 cursor: state.current_message_cursor.clone(),
703 thread_id: None,
704 since: params.since.clone(),
705 until: params.until.clone(),
706 })
707 .await?;
708 let pagination = messages.pagination.clone();
709 let page_len = messages.items.len();
710
711 for message in messages
712 .items
713 .into_iter()
714 .skip(state.current_message_offset)
715 {
716 state.current_message_offset += 1;
717 if message.text.to_lowercase().contains(&query) {
718 found.push(message);
719 if found.len() >= limit {
720 break;
721 }
722 }
723 }
724
725 if found.len() >= limit {
726 if state.current_message_offset >= page_len {
727 let next_message_cursor = pagination
728 .as_ref()
729 .and_then(|page| page.next_cursor.clone());
730 if pagination
731 .as_ref()
732 .map(|page| page.has_more)
733 .unwrap_or(false)
734 {
735 state.current_message_cursor = next_message_cursor;
736 state.current_message_offset = 0;
737 } else {
738 state.current_chat_id = None;
739 state.current_message_cursor = None;
740 state.current_message_offset = 0;
741 }
742 }
743 break;
744 }
745
746 let next_message_cursor = pagination
747 .as_ref()
748 .and_then(|page| page.next_cursor.clone());
749 if pagination
750 .as_ref()
751 .map(|page| page.has_more)
752 .unwrap_or(false)
753 {
754 state.current_message_cursor = next_message_cursor;
755 state.current_message_offset = 0;
756 continue;
757 }
758
759 state.current_chat_id = None;
760 state.current_message_cursor = None;
761 state.current_message_offset = 0;
762 continue;
763 }
764
765 if let Some(next_chat_id) = take_next_pending_chat_id(&mut state) {
766 state.current_chat_id = Some(next_chat_id);
767 state.current_message_cursor = None;
768 state.current_message_offset = 0;
769 continue;
770 }
771
772 if chats_loaded && state.next_chats_cursor.is_none() {
773 break;
774 }
775
776 let chats = self
777 .get_conversations(&GetChatsParams {
778 search: None,
779 chat_type: None,
780 limit: Some(100),
781 cursor: state.next_chats_cursor.clone(),
782 include_inactive: Some(false),
783 })
784 .await?;
785 state.pending_chat_ids = chats.items.into_iter().map(|chat| chat.id).collect();
786 state.pending_chat_index = 0;
787 state.next_chats_cursor = chats.pagination.and_then(|page| page.next_cursor);
788 chats_loaded = true;
789 }
790
791 let has_more = state.current_chat_id.is_some()
792 || has_pending_chat_ids(&state)
793 || state.next_chats_cursor.is_some();
794 let next_cursor = serialize_search_cursor(&state)?;
795 (has_more, next_cursor)
796 };
797
798 Ok(ProviderResult::new(found).with_pagination(Pagination {
799 offset: 0,
800 limit: limit as u32,
801 total: None,
802 has_more,
803 next_cursor,
804 }))
805 }
806
807 async fn send_message(&self, params: SendMessageParams) -> Result<MessengerMessage> {
808 if !params.attachments.is_empty() {
809 return Err(Error::ProviderUnsupported {
810 provider: self.provider_name().to_string(),
811 operation: "send_message attachments".to_string(),
812 });
813 }
814
815 let thread_ts = params
816 .thread_id
817 .clone()
818 .or_else(|| params.reply_to_id.clone());
819 let mut form = vec![
820 ("channel", params.chat_id.clone()),
821 ("text", params.text.clone()),
822 ("unfurl_links", "false".to_string()),
823 ("unfurl_media", "false".to_string()),
824 ];
825 if let Some(thread_id) = thread_ts.as_ref() {
826 form.push(("thread_ts", thread_id.clone()));
827 }
828
829 let payload: SlackPostMessageResponse = self.post_form("chat.postMessage", &form).await?;
830 ensure_ok(payload.ok, payload.error)?;
831
832 let mut message = payload.message.unwrap_or(SlackMessage {
833 ts: payload.ts.clone().unwrap_or_default(),
834 text: Some(params.text),
835 user: None,
836 username: None,
837 bot_id: None,
838 thread_ts: thread_ts.clone(),
839 parent_user_id: None,
840 subtype: None,
841 edited: None,
842 files: None,
843 attachments: None,
844 bot_profile: None,
845 });
846
847 if message.thread_ts.is_none() {
848 message.thread_ts = thread_ts;
849 }
850
851 self.map_message(
852 payload.channel.as_deref().unwrap_or(¶ms.chat_id),
853 message,
854 )
855 .await
856 }
857}
858
859fn parse_search_cursor(cursor: Option<&str>) -> Result<SlackSearchCursor> {
860 let Some(cursor) = cursor.map(str::trim).filter(|cursor| !cursor.is_empty()) else {
861 return Ok(SlackSearchCursor::default());
862 };
863
864 match serde_json::from_str::<SlackSearchCursor>(cursor) {
865 Ok(state) => Ok(state),
866 Err(_) => Ok(SlackSearchCursor {
867 next_chats_cursor: Some(cursor.to_string()),
868 ..SlackSearchCursor::default()
869 }),
870 }
871}
872
873fn serialize_search_cursor(state: &SlackSearchCursor) -> Result<Option<String>> {
874 let has_more = state.current_chat_id.is_some()
875 || has_pending_chat_ids(state)
876 || state.next_chats_cursor.is_some();
877 if !has_more {
878 return Ok(None);
879 }
880
881 serde_json::to_string(state)
882 .map(Some)
883 .map_err(|e| Error::InvalidData(format!("failed to serialize Slack search cursor: {e}")))
884}
885
886fn map_chat(chat: SlackConversation) -> MessengerChat {
887 let name = conversation_name(&chat);
888 let description = chat
889 .purpose
890 .as_ref()
891 .and_then(|value| value.value.clone())
892 .filter(|value| !value.is_empty())
893 .or_else(|| {
894 chat.topic
895 .as_ref()
896 .and_then(|value| value.value.clone())
897 .filter(|value| !value.is_empty())
898 });
899
900 MessengerChat {
901 id: chat.id.clone(),
902 key: format!("slack:{}", chat.id),
903 name,
904 chat_type: slack_chat_type(&chat),
905 source: "slack".to_string(),
906 member_count: chat.num_members,
907 description,
908 is_active: !chat.is_archived.unwrap_or(false),
909 }
910}
911
912fn map_attachments(message: &SlackMessage) -> Vec<MessageAttachment> {
913 let mut attachments = Vec::new();
914
915 if let Some(files) = message.files.as_ref() {
916 attachments.extend(files.iter().map(|file| MessageAttachment {
917 id: file.id.clone(),
918 name: file.name.clone(),
919 attachment_type: file.filetype.clone().or_else(|| Some("file".to_string())),
920 url: file.permalink.clone().or_else(|| file.url_private.clone()),
921 mime_type: file.mimetype.clone(),
922 }));
923 }
924
925 if let Some(rich_attachments) = message.attachments.as_ref() {
926 attachments.extend(rich_attachments.iter().map(|attachment| {
927 MessageAttachment {
928 id: attachment.id.map(|id| id.to_string()),
929 name: attachment
930 .title
931 .clone()
932 .or_else(|| attachment.fallback.clone()),
933 attachment_type: attachment.service_name.clone(),
934 url: attachment
935 .title_link
936 .clone()
937 .or_else(|| attachment.from_url.clone()),
938 mime_type: None,
939 }
940 }));
941 }
942
943 attachments
944}
945
946fn matches_chat_filter(chat: &SlackConversation, params: &GetChatsParams) -> bool {
947 if let Some(expected) = params.chat_type
948 && slack_chat_type(chat) != expected
949 {
950 return false;
951 }
952
953 if let Some(search) = params.search.as_deref() {
954 let needle = search.to_lowercase();
955 let haystack = conversation_name(chat).to_lowercase();
956 if !haystack.contains(&needle) {
957 return false;
958 }
959 }
960
961 if !params.include_inactive.unwrap_or(false) && chat.is_archived.unwrap_or(false) {
962 return false;
963 }
964
965 true
966}
967
968fn slack_chat_type(chat: &SlackConversation) -> ChatType {
969 if chat.is_im.unwrap_or(false) {
970 ChatType::Direct
971 } else if chat.is_group.unwrap_or(false) || chat.is_mpim.unwrap_or(false) {
972 ChatType::Group
973 } else {
974 ChatType::Channel
975 }
976}
977
978fn conversation_name(chat: &SlackConversation) -> String {
979 chat.name
980 .clone()
981 .filter(|name| !name.is_empty())
982 .or_else(|| chat.user.clone().map(|user| format!("dm-{}", user)))
983 .unwrap_or_else(|| chat.id.clone())
984}
985
986fn slack_conversation_types(chat_type: Option<ChatType>) -> &'static str {
987 match chat_type {
988 Some(ChatType::Direct) => "im",
989 Some(ChatType::Group) => "mpim,private_channel",
990 Some(ChatType::Channel) => "public_channel,private_channel",
991 None => "public_channel,private_channel,mpim,im",
992 }
993}
994
995fn slack_next_cursor(metadata: Option<&SlackResponseMetadata>) -> Option<String> {
996 metadata
997 .map(|metadata| metadata.next_cursor.trim())
998 .filter(|cursor| !cursor.is_empty())
999 .map(ToOwned::to_owned)
1000}
1001
1002fn slack_rate_limit_bucket(method: &str) -> SlackRateLimitBucket {
1003 match method {
1004 "chat.postMessage" => SlackRateLimitBucket::Write,
1005 _ => SlackRateLimitBucket::Read,
1006 }
1007}
1008
1009fn normalize_ts_param(field_name: &str, value: Option<&str>) -> Result<Option<String>> {
1010 let Some(value) = value.map(str::trim) else {
1011 return Ok(None);
1012 };
1013 if value.is_empty() {
1014 return Ok(None);
1015 }
1016 if value.parse::<f64>().is_ok() {
1017 Ok(Some(value.to_string()))
1018 } else {
1019 Err(Error::InvalidData(format!(
1020 "{field_name} must be a Slack timestamp string"
1021 )))
1022 }
1023}
1024
1025fn has_pending_chat_ids(state: &SlackSearchCursor) -> bool {
1026 state.pending_chat_index < state.pending_chat_ids.len()
1027}
1028
1029fn take_next_pending_chat_id(state: &mut SlackSearchCursor) -> Option<String> {
1030 let next_chat_id = state
1031 .pending_chat_ids
1032 .get(state.pending_chat_index)
1033 .cloned()?;
1034 state.pending_chat_index += 1;
1035 if state.pending_chat_index >= state.pending_chat_ids.len() {
1036 state.pending_chat_ids.clear();
1037 state.pending_chat_index = 0;
1038 }
1039 Some(next_chat_id)
1040}
1041
1042fn parse_scopes(headers: &HeaderMap) -> Vec<String> {
1043 headers
1044 .get("x-oauth-scopes")
1045 .and_then(|value| value.to_str().ok())
1046 .map(|value| {
1047 value
1048 .split(',')
1049 .map(str::trim)
1050 .filter(|scope| !scope.is_empty())
1051 .map(ToString::to_string)
1052 .collect()
1053 })
1054 .unwrap_or_default()
1055}
1056
1057impl SlackRateLimiter {
1058 fn new(read_interval: Duration, write_interval: Duration) -> Self {
1059 let now = Instant::now();
1060 Self {
1061 state: Mutex::new(SlackRateLimitState {
1062 read_ready_at: now,
1063 write_ready_at: now,
1064 }),
1065 read_interval,
1066 write_interval,
1067 }
1068 }
1069
1070 async fn acquire(&self, bucket: SlackRateLimitBucket) {
1071 let (wait_until, interval) = {
1072 let mut state = self.state.lock().await;
1073 let now = Instant::now();
1074 let (ready_at, interval) = match bucket {
1075 SlackRateLimitBucket::Read => (&mut state.read_ready_at, self.read_interval),
1076 SlackRateLimitBucket::Write => (&mut state.write_ready_at, self.write_interval),
1077 };
1078 let wait_until = (*ready_at).max(now);
1079 *ready_at = wait_until + interval;
1080 (wait_until, interval)
1081 };
1082
1083 debug!(
1084 ?bucket,
1085 ?wait_until,
1086 ?interval,
1087 "slack rate limiter acquired slot"
1088 );
1089
1090 if wait_until > Instant::now() {
1091 sleep_until(wait_until).await;
1092 }
1093 }
1094
1095 async fn on_rate_limited(&self, bucket: SlackRateLimitBucket, retry_after: Option<Duration>) {
1096 let delay = retry_after.unwrap_or(match bucket {
1097 SlackRateLimitBucket::Read => self.read_interval,
1098 SlackRateLimitBucket::Write => self.write_interval,
1099 });
1100 let next_ready = Instant::now() + delay;
1101 let mut state = self.state.lock().await;
1102 let ready_at = match bucket {
1103 SlackRateLimitBucket::Read => &mut state.read_ready_at,
1104 SlackRateLimitBucket::Write => &mut state.write_ready_at,
1105 };
1106 if next_ready > *ready_at {
1107 *ready_at = next_ready;
1108 }
1109 }
1110}
1111
1112async fn map_http_error<T>(response: reqwest::Response) -> Result<T>
1113where
1114 T: DeserializeOwned,
1115{
1116 let status = response.status();
1117 let retry_after = response
1118 .headers()
1119 .get("retry-after")
1120 .and_then(|value| value.to_str().ok())
1121 .and_then(|value| value.parse::<u64>().ok());
1122
1123 if status.as_u16() == 429 {
1124 return Err(Error::RateLimited { retry_after });
1125 }
1126
1127 if !status.is_success() {
1128 let text = response.text().await.unwrap_or_default();
1129 return Err(Error::from_status(status.as_u16(), text));
1130 }
1131
1132 response
1133 .json()
1134 .await
1135 .map_err(|e| Error::InvalidData(e.to_string()))
1136}
1137
1138fn ensure_ok(ok: bool, error: Option<String>) -> Result<()> {
1139 if ok {
1140 Ok(())
1141 } else {
1142 Err(map_slack_error(
1143 error.unwrap_or_else(|| "unknown_slack_error".to_string()),
1144 ))
1145 }
1146}
1147
1148fn map_slack_error(message: String) -> Error {
1149 match message.as_str() {
1150 "invalid_auth" | "not_authed" => Error::Unauthorized(message),
1151 "missing_scope" | "not_allowed_token_type" => Error::Forbidden(message),
1152 "channel_not_found" | "user_not_found" => Error::NotFound(message),
1153 "ratelimited" => Error::RateLimited { retry_after: None },
1154 _ => Error::Api {
1155 status: 200,
1156 message,
1157 },
1158 }
1159}
1160
/// Converts Slack message text into plain, readable text.
///
/// Slack wraps mentions, channel references, special commands and links in
/// raw `<...>` sequences and escapes literal `&`, `<` and `>` typed by users
/// as HTML entities. The text is therefore tokenized on the raw angle
/// brackets first and entities are decoded afterwards, so an escaped literal
/// such as `&lt;b&gt;` comes out as `<b>` instead of being mistaken for a
/// Slack token. An unterminated `<` is kept as-is.
fn normalize_mrkdwn(text: &str) -> String {
    let mut output = String::with_capacity(text.len());
    let mut rest = text;

    while let Some(open) = rest.find('<') {
        output.push_str(&decode_slack_entities(&rest[..open]));
        let after_open = &rest[open + 1..];

        match after_open.find('>') {
            Some(close) => {
                let token = decode_slack_entities(&after_open[..close]);
                output.push_str(&normalize_slack_token(&token));
                rest = &after_open[close + 1..];
            }
            None => {
                // No closing bracket: not a token, keep the text verbatim.
                output.push('<');
                output.push_str(&decode_slack_entities(after_open));
                return output;
            }
        }
    }

    output.push_str(&decode_slack_entities(rest));
    output
}

/// Decodes the three entities Slack uses for escaping (`amp`, `lt`, `gt`) in a
/// single left-to-right pass, so already-decoded output is never decoded a
/// second time. Any other `&` is kept literally.
fn decode_slack_entities(text: &str) -> String {
    const ENTITIES: [(&str, char); 3] = [("amp;", '&'), ("lt;", '<'), ("gt;", '>')];

    let mut decoded = String::with_capacity(text.len());
    let mut rest = text;

    while let Some(pos) = rest.find('&') {
        decoded.push_str(&rest[..pos]);
        let tail = &rest[pos + 1..];

        let entity = ENTITIES
            .iter()
            .find_map(|(name, ch)| tail.strip_prefix(name).map(|after| (*ch, after)));
        match entity {
            Some((ch, after)) => {
                decoded.push(ch);
                rest = after;
            }
            None => {
                decoded.push('&');
                rest = tail;
            }
        }
    }

    decoded.push_str(rest);
    decoded
}

/// Renders the content of one `<...>` sequence (without the brackets).
///
/// * `@ID|label` / `#ID|label` become `@label` / `#label`, falling back to the
///   id when the label is missing or empty.
/// * `!command|label` becomes `command label`.
/// * `url|label` becomes `[label](url)`, or just the url when the label is empty.
/// * Anything else is returned unchanged.
fn normalize_slack_token(token: &str) -> String {
    if let Some(user) = token.strip_prefix('@') {
        return format!("@{}", slack_token_label(user));
    }
    if let Some(channel) = token.strip_prefix('#') {
        return format!("#{}", slack_token_label(channel));
    }
    if let Some(command) = token.strip_prefix('!') {
        return command.replace('|', " ");
    }
    match token.split_once('|') {
        Some((url, label)) if !label.is_empty() => format!("[{}]({})", label, url),
        Some((url, _)) => url.to_string(),
        None => token.to_string(),
    }
}

/// Label part of an `ID|label` reference, or the id when there is no
/// non-empty label.
fn slack_token_label(reference: &str) -> &str {
    match reference.split_once('|') {
        Some((_, label)) if !label.is_empty() => label,
        Some((id, _)) => id,
        None => reference,
    }
}
1220
1221fn slack_user_to_core(user: SlackUser) -> devboy_core::User {
1222 let profile = user.profile.clone();
1223 let display = profile
1224 .as_ref()
1225 .and_then(|p| p.display_name.clone())
1226 .filter(|s| !s.is_empty());
1227 let real = profile
1228 .as_ref()
1229 .and_then(|p| p.real_name.clone())
1230 .filter(|s| !s.is_empty());
1231 let username_candidate = user.name.clone().filter(|s| !s.is_empty());
1232 let name = display
1233 .clone()
1234 .or(real.clone())
1235 .or(username_candidate.clone());
1236 devboy_core::User {
1237 id: user.id,
1238 username: username_candidate.unwrap_or_default(),
1239 name,
1240 email: profile.as_ref().and_then(|p| p.email.clone()),
1241 avatar_url: profile.and_then(|p| p.image_72),
1242 }
1243}
1244
1245#[async_trait]
1246impl devboy_core::UserProvider for SlackClient {
1247 fn provider_name(&self) -> &'static str {
1248 "slack"
1249 }
1250
1251 async fn get_user_profile(&self, user_id: &str) -> Result<devboy_core::User> {
1252 let payload: SlackUsersInfoResponse = self
1253 .post_form("users.info", &[("user", user_id.to_string())])
1254 .await?;
1255 ensure_ok(payload.ok, payload.error)?;
1256 let user = payload
1257 .user
1258 .ok_or_else(|| Error::InvalidData("Slack users.info returned no user".to_string()))?;
1259 Ok(slack_user_to_core(user))
1260 }
1261
1262 async fn lookup_user_by_email(&self, email: &str) -> Result<Option<devboy_core::User>> {
1263 let payload: SlackUsersInfoResponse = self
1264 .post_form("users.lookupByEmail", &[("email", email.to_string())])
1265 .await?;
1266 if !payload.ok {
1267 match payload.error.as_deref() {
1271 Some("users_not_found") => return Ok(None),
1272 _ => {
1273 ensure_ok(payload.ok, payload.error)?;
1274 }
1275 }
1276 }
1277 Ok(payload.user.map(slack_user_to_core))
1278 }
1279}
1280
1281#[cfg(test)]
1282mod tests {
1283 use super::*;
1284 use httpmock::Method::POST;
1285 use httpmock::MockServer;
1286
1287 fn token(s: &str) -> SecretString {
1288 SecretString::from(s.to_string())
1289 }
1290
1291 #[tokio::test]
1292 async fn auth_info_reads_identity_and_scopes() {
1293 let server = MockServer::start();
1294 server.mock(|when, then| {
1295 when.method(POST).path("/auth.test");
1296 then.status(200)
1297 .header(
1298 "x-oauth-scopes",
1299 "channels:read, channels:history, groups:read, groups:history, im:read, im:history, mpim:read, mpim:history, chat:write, users:read",
1300 )
1301 .json_body(serde_json::json!({
1302 "ok": true,
1303 "url": "https://example.slack.com/",
1304 "team": "Example",
1305 "user": "devboy",
1306 "team_id": "T123",
1307 "user_id": "U123",
1308 "bot_id": "B123"
1309 }));
1310 });
1311
1312 let info = SlackClient::new(token("xoxb-test"))
1313 .with_base_url(server.base_url())
1314 .auth_info()
1315 .await
1316 .unwrap();
1317
1318 assert_eq!(info.team_name, "Example");
1319 assert_eq!(info.user_id, "U123");
1320 assert!(info.missing_scopes.is_empty());
1321 }
1322
1323 #[tokio::test]
1324 async fn ensure_healthy_fails_when_scopes_missing() {
1325 let server = MockServer::start();
1326 server.mock(|when, then| {
1327 when.method(POST).path("/auth.test");
1328 then.status(200)
1329 .header("x-oauth-scopes", "channels:read")
1330 .json_body(serde_json::json!({
1331 "ok": true,
1332 "team": "Example",
1333 "team_id": "T123",
1334 "user_id": "U123"
1335 }));
1336 });
1337
1338 let error = SlackClient::new(token("xoxb-test"))
1339 .with_base_url(server.base_url())
1340 .ensure_healthy()
1341 .await
1342 .unwrap_err();
1343
1344 assert!(error.to_string().contains("missing required scopes"));
1345 }
1346
1347 #[tokio::test]
1348 async fn get_chats_maps_slack_conversations() {
1349 let server = MockServer::start();
1350 server.mock(|when, then| {
1351 when.method(POST).path("/conversations.list");
1352 then.status(200).json_body(serde_json::json!({
1353 "ok": true,
1354 "channels": [
1355 {
1356 "id": "C123",
1357 "name": "engineering",
1358 "is_channel": true,
1359 "is_archived": false,
1360 "num_members": 4,
1361 "purpose": { "value": "Team chat" }
1362 }
1363 ],
1364 "response_metadata": { "next_cursor": "chat-cursor-1" }
1365 }));
1366 });
1367
1368 let result = SlackClient::new(token("xoxb-test"))
1369 .with_base_url(server.base_url())
1370 .get_chats(GetChatsParams::default())
1371 .await
1372 .unwrap();
1373
1374 assert_eq!(result.items.len(), 1);
1375 assert_eq!(result.items[0].name, "engineering");
1376 assert_eq!(result.items[0].chat_type, ChatType::Channel);
1377 assert_eq!(
1378 result
1379 .pagination
1380 .as_ref()
1381 .and_then(|pagination| pagination.next_cursor.as_deref()),
1382 Some("chat-cursor-1")
1383 );
1384 }
1385
1386 #[tokio::test]
1387 async fn get_messages_fetches_thread_replies() {
1388 let server = MockServer::start();
1389 server.mock(|when, then| {
1390 when.method(POST).path("/conversations.replies");
1391 then.status(200).json_body(serde_json::json!({
1392 "ok": true,
1393 "messages": [
1394 {
1395 "ts": "1710000000.000100",
1396 "text": "Root",
1397 "user": "U123",
1398 "thread_ts": "1710000000.000100"
1399 },
1400 {
1401 "ts": "1710000001.000100",
1402 "text": "Reply",
1403 "user": "U123",
1404 "thread_ts": "1710000000.000100"
1405 }
1406 ],
1407 "response_metadata": { "next_cursor": "reply-cursor-1" }
1408 }));
1409 });
1410 server.mock(|when, then| {
1411 when.method(POST).path("/users.info");
1412 then.status(200).json_body(serde_json::json!({
1413 "ok": true,
1414 "user": {
1415 "id": "U123",
1416 "name": "andrey",
1417 "profile": {
1418 "display_name": "Andrey",
1419 "real_name": "Andrey Maznyak",
1420 "image_72": "https://example.com/avatar.png"
1421 }
1422 }
1423 }));
1424 });
1425
1426 let result = SlackClient::new(token("xoxb-test"))
1427 .with_base_url(server.base_url())
1428 .get_messages(GetMessagesParams {
1429 chat_id: "C123".to_string(),
1430 limit: Some(20),
1431 cursor: None,
1432 thread_id: Some("1710000000.000100".to_string()),
1433 since: None,
1434 until: None,
1435 })
1436 .await
1437 .unwrap();
1438
1439 assert_eq!(result.items.len(), 2);
1440 assert_eq!(
1441 result.items[1].reply_to_id.as_deref(),
1442 Some("1710000000.000100")
1443 );
1444 assert_eq!(result.items[0].author.name, "Andrey");
1445 assert_eq!(
1446 result
1447 .pagination
1448 .as_ref()
1449 .and_then(|pagination| pagination.next_cursor.as_deref()),
1450 Some("reply-cursor-1")
1451 );
1452 }
1453
1454 #[tokio::test]
1455 async fn send_message_maps_response() {
1456 let server = MockServer::start();
1457 server.mock(|when, then| {
1458 when.method(POST).path("/chat.postMessage");
1459 then.status(200).json_body(serde_json::json!({
1460 "ok": true,
1461 "channel": "C123",
1462 "ts": "1710000100.000200",
1463 "message": {
1464 "ts": "1710000100.000200",
1465 "text": "hello world",
1466 "bot_profile": {
1467 "id": "B123",
1468 "name": "Devboy",
1469 "icons": { "image_72": "https://example.com/bot.png" }
1470 }
1471 }
1472 }));
1473 });
1474
1475 let result = SlackClient::new(token("xoxb-test"))
1476 .with_base_url(server.base_url())
1477 .send_message(SendMessageParams {
1478 chat_id: "C123".to_string(),
1479 text: "hello world".to_string(),
1480 thread_id: None,
1481 reply_to_id: None,
1482 attachments: vec![],
1483 })
1484 .await
1485 .unwrap();
1486
1487 assert_eq!(result.chat_id, "C123");
1488 assert_eq!(result.text, "hello world");
1489 assert_eq!(result.author.name, "Devboy");
1490 }
1491
1492 #[tokio::test]
1493 async fn send_message_uses_reply_to_id_as_thread_ts_when_thread_id_missing() {
1494 let server = MockServer::start();
1495 let post_message = server.mock(|when, then| {
1496 when.method(POST).path("/chat.postMessage");
1497 then.status(200).json_body(serde_json::json!({
1498 "ok": true,
1499 "channel": "C123",
1500 "ts": "1710000100.000200",
1501 "message": {
1502 "ts": "1710000100.000200",
1503 "text": "reply message"
1504 }
1505 }));
1506 });
1507
1508 let result = SlackClient::new(token("xoxb-test"))
1509 .with_base_url(server.base_url())
1510 .send_message(SendMessageParams {
1511 chat_id: "C123".to_string(),
1512 text: "reply message".to_string(),
1513 thread_id: None,
1514 reply_to_id: Some("1710000000.000100".to_string()),
1515 attachments: vec![],
1516 })
1517 .await
1518 .unwrap();
1519
1520 post_message.assert_calls(1);
1521 assert_eq!(result.thread_id.as_deref(), Some("1710000000.000100"));
1522 assert_eq!(result.reply_to_id.as_deref(), Some("1710000000.000100"));
1523 }
1524
1525 #[tokio::test]
1526 async fn send_message_rejects_attachments() {
1527 let err = SlackClient::new(token("xoxb-test"))
1528 .send_message(SendMessageParams {
1529 chat_id: "C123".to_string(),
1530 text: "reply message".to_string(),
1531 thread_id: None,
1532 reply_to_id: None,
1533 attachments: vec![MessageAttachment {
1534 id: Some("att-1".to_string()),
1535 name: Some("report.txt".to_string()),
1536 attachment_type: None,
1537 url: None,
1538 mime_type: None,
1539 }],
1540 })
1541 .await
1542 .unwrap_err();
1543
1544 assert!(matches!(
1545 err,
1546 Error::ProviderUnsupported { provider, operation }
1547 if provider == "slack" && operation == "send_message attachments"
1548 ));
1549 }
1550
1551 #[tokio::test]
1552 async fn get_messages_caches_resolved_users() {
1553 let server = MockServer::start();
1554 server.mock(|when, then| {
1555 when.method(POST).path("/conversations.history");
1556 then.status(200).json_body(serde_json::json!({
1557 "ok": true,
1558 "messages": [
1559 {
1560 "ts": "1710000000.000100",
1561 "text": "First",
1562 "user": "U123"
1563 },
1564 {
1565 "ts": "1710000001.000100",
1566 "text": "Second",
1567 "user": "U123"
1568 }
1569 ]
1570 }));
1571 });
1572 let users_info = server.mock(|when, then| {
1573 when.method(POST).path("/users.info");
1574 then.status(200).json_body(serde_json::json!({
1575 "ok": true,
1576 "user": {
1577 "id": "U123",
1578 "name": "andrey",
1579 "profile": {
1580 "display_name": "Andrey"
1581 }
1582 }
1583 }));
1584 });
1585
1586 let client = SlackClient::new(token("xoxb-test")).with_base_url(server.base_url());
1587 let result = client
1588 .get_messages(GetMessagesParams {
1589 chat_id: "C123".to_string(),
1590 limit: Some(20),
1591 cursor: None,
1592 thread_id: None,
1593 since: None,
1594 until: None,
1595 })
1596 .await
1597 .unwrap();
1598
1599 assert_eq!(result.items.len(), 2);
1600 assert_eq!(result.items[0].author.name, "Andrey");
1601 assert_eq!(result.items[1].author.name, "Andrey");
1602 users_info.assert_calls(1);
1603 }
1604
1605 #[test]
1606 fn client_configuration_helpers_update_settings() {
1607 let client = SlackClient::new(token("xoxb-test"))
1608 .with_base_url("https://slack.example.test/")
1609 .with_required_scopes(vec!["search:read".to_string(), "chat:write".to_string()]);
1610
1611 assert_eq!(client.base_url, "https://slack.example.test");
1612 assert_eq!(
1613 client.required_scopes(),
1614 ["search:read".to_string(), "chat:write".to_string()]
1615 );
1616 }
1617
1618 #[tokio::test]
1619 async fn auth_info_maps_invalid_auth_to_unauthorized() {
1620 let server = MockServer::start();
1621 server.mock(|when, then| {
1622 when.method(POST).path("/auth.test");
1623 then.status(200).json_body(serde_json::json!({
1624 "ok": false,
1625 "error": "invalid_auth"
1626 }));
1627 });
1628
1629 let error = SlackClient::new(token("xoxb-test"))
1630 .with_base_url(server.base_url())
1631 .auth_info()
1632 .await
1633 .unwrap_err();
1634
1635 assert!(matches!(error, Error::Unauthorized(message) if message == "invalid_auth"));
1636 }
1637
1638 #[tokio::test]
1639 async fn auth_info_maps_missing_scope_to_forbidden() {
1640 let server = MockServer::start();
1641 server.mock(|when, then| {
1642 when.method(POST).path("/auth.test");
1643 then.status(200).json_body(serde_json::json!({
1644 "ok": false,
1645 "error": "missing_scope"
1646 }));
1647 });
1648
1649 let error = SlackClient::new(token("xoxb-test"))
1650 .with_base_url(server.base_url())
1651 .auth_info()
1652 .await
1653 .unwrap_err();
1654
1655 assert!(matches!(error, Error::Forbidden(message) if message == "missing_scope"));
1656 }
1657
1658 #[tokio::test]
1659 async fn auth_info_maps_rate_limit_to_rate_limited_error() {
1660 let server = MockServer::start();
1661 server.mock(|when, then| {
1662 when.method(POST).path("/auth.test");
1663 then.status(429).header("retry-after", "7");
1664 });
1665
1666 let error = SlackClient::new(token("xoxb-test"))
1667 .with_base_url(server.base_url())
1668 .auth_info()
1669 .await
1670 .unwrap_err();
1671
1672 assert!(matches!(
1673 error,
1674 Error::RateLimited {
1675 retry_after: Some(7)
1676 }
1677 ));
1678 }
1679
1680 #[tokio::test]
1681 async fn search_messages_rejects_empty_query() {
1682 let error = SlackClient::new(token("xoxb-test"))
1683 .search_messages(SearchMessagesParams {
1684 query: " ".to_string(),
1685 ..SearchMessagesParams::default()
1686 })
1687 .await
1688 .unwrap_err();
1689
1690 assert!(
1691 matches!(error, Error::InvalidData(message) if message.contains("must not be empty"))
1692 );
1693 }
1694
1695 #[tokio::test]
1696 async fn search_messages_global_cursor_resumes_within_chat_before_next_chat_page() {
1697 let server = MockServer::start();
1698 server.mock(|when, then| {
1699 when.method(POST).path("/conversations.list");
1700 then.status(200).json_body(serde_json::json!({
1701 "ok": true,
1702 "channels": [
1703 { "id": "C1", "name": "general", "is_channel": true, "is_archived": false },
1704 { "id": "C2", "name": "random", "is_channel": true, "is_archived": false }
1705 ],
1706 "response_metadata": { "next_cursor": "chat-page-2" }
1707 }));
1708 });
1709 server.mock(|when, then| {
1710 when.method(POST).path("/conversations.history");
1711 then.status(200).json_body(serde_json::json!({
1712 "ok": true,
1713 "messages": [
1714 { "ts": "1710000000.000100", "text": "no match", "username": "bot" },
1715 { "ts": "1710000001.000100", "text": "Needle on first page", "username": "bot" }
1716 ],
1717 "has_more": true,
1718 "response_metadata": { "next_cursor": "msg-page-2" }
1719 }));
1720 });
1721
1722 let client = SlackClient::new(token("xoxb-test")).with_base_url(server.base_url());
1723 let first = client
1724 .search_messages(SearchMessagesParams {
1725 query: "needle".to_string(),
1726 limit: Some(1),
1727 ..SearchMessagesParams::default()
1728 })
1729 .await
1730 .unwrap();
1731
1732 assert_eq!(first.items.len(), 1);
1733 assert_eq!(first.items[0].chat_id, "C1");
1734 assert!(first.pagination.as_ref().unwrap().has_more);
1735
1736 let cursor = first
1737 .pagination
1738 .as_ref()
1739 .and_then(|pagination| pagination.next_cursor.clone())
1740 .unwrap();
1741 let state = parse_search_cursor(Some(&cursor)).unwrap();
1742 assert_eq!(state.current_chat_id.as_deref(), Some("C1"));
1743 assert_eq!(state.current_message_cursor.as_deref(), Some("msg-page-2"));
1744 assert_eq!(state.current_message_offset, 0);
1745 assert_eq!(state.pending_chat_ids, vec!["C1", "C2"]);
1746 assert_eq!(state.pending_chat_index, 1);
1747 assert_eq!(state.next_chats_cursor.as_deref(), Some("chat-page-2"));
1748 }
1749
1750 #[tokio::test]
1751 async fn search_messages_chat_cursor_resumes_within_page_before_next_page() {
1752 let server = MockServer::start();
1753 server.mock(|when, then| {
1754 when.method(POST).path("/conversations.history");
1755 then.status(200).json_body(serde_json::json!({
1756 "ok": true,
1757 "messages": [
1758 { "ts": "1710000000.000100", "text": "needle first", "username": "bot" },
1759 { "ts": "1710000001.000100", "text": "needle second", "username": "bot" },
1760 { "ts": "1710000002.000100", "text": "no match", "username": "bot" }
1761 ],
1762 "has_more": true,
1763 "response_metadata": { "next_cursor": "msg-page-2" }
1764 }));
1765 });
1766
1767 let client = SlackClient::new(token("xoxb-test")).with_base_url(server.base_url());
1768 let first = client
1769 .search_messages(SearchMessagesParams {
1770 chat_id: Some("C1".to_string()),
1771 query: "needle".to_string(),
1772 limit: Some(1),
1773 ..SearchMessagesParams::default()
1774 })
1775 .await
1776 .unwrap();
1777
1778 assert_eq!(first.items.len(), 1);
1779 assert_eq!(first.items[0].text, "needle first");
1780 assert!(first.pagination.as_ref().unwrap().has_more);
1781
1782 let cursor = first
1783 .pagination
1784 .as_ref()
1785 .and_then(|pagination| pagination.next_cursor.clone())
1786 .unwrap();
1787 let state = parse_search_cursor(Some(&cursor)).unwrap();
1788 assert_eq!(state.current_chat_id.as_deref(), Some("C1"));
1789 assert_eq!(state.current_message_cursor, None);
1790 assert_eq!(state.current_message_offset, 1);
1791
1792 let second = client
1793 .search_messages(SearchMessagesParams {
1794 chat_id: Some("C1".to_string()),
1795 query: "needle".to_string(),
1796 limit: Some(1),
1797 cursor: Some(cursor),
1798 ..SearchMessagesParams::default()
1799 })
1800 .await
1801 .unwrap();
1802
1803 assert_eq!(second.items.len(), 1);
1804 assert_eq!(second.items[0].text, "needle second");
1805 assert!(second.pagination.as_ref().unwrap().has_more);
1806
1807 let second_cursor = second
1808 .pagination
1809 .as_ref()
1810 .and_then(|pagination| pagination.next_cursor.clone())
1811 .unwrap();
1812 let second_state = parse_search_cursor(Some(&second_cursor)).unwrap();
1813 assert_eq!(second_state.current_chat_id.as_deref(), Some("C1"));
1814 assert_eq!(second_state.current_message_cursor, None);
1815 assert_eq!(second_state.current_message_offset, 2);
1816 }
1817
1818 #[test]
1819 fn helper_functions_cover_filtering_and_mapping_cases() {
1820 let archived_group = SlackConversation {
1821 id: "C1".to_string(),
1822 name: Some("Project Alpha".to_string()),
1823 user: None,
1824 is_group: Some(true),
1825 is_im: None,
1826 is_mpim: None,
1827 is_archived: Some(true),
1828 num_members: Some(3),
1829 purpose: Some(SlackTextValue {
1830 value: Some("".to_string()),
1831 }),
1832 topic: Some(SlackTextValue {
1833 value: Some("Topic text".to_string()),
1834 }),
1835 };
1836
1837 assert_eq!(slack_chat_type(&archived_group), ChatType::Group);
1838 assert_eq!(conversation_name(&archived_group), "Project Alpha");
1839 assert!(!matches_chat_filter(
1840 &archived_group,
1841 &GetChatsParams {
1842 search: Some("alpha".to_string()),
1843 chat_type: Some(ChatType::Group),
1844 include_inactive: Some(false),
1845 ..GetChatsParams::default()
1846 }
1847 ));
1848 assert!(matches_chat_filter(
1849 &archived_group,
1850 &GetChatsParams {
1851 search: Some("alpha".to_string()),
1852 chat_type: Some(ChatType::Group),
1853 include_inactive: Some(true),
1854 ..GetChatsParams::default()
1855 }
1856 ));
1857
1858 let direct_chat = SlackConversation {
1859 id: "D1".to_string(),
1860 name: None,
1861 user: Some("U123".to_string()),
1862 is_group: None,
1863 is_im: Some(true),
1864 is_mpim: None,
1865 is_archived: Some(false),
1866 num_members: None,
1867 purpose: None,
1868 topic: None,
1869 };
1870 assert_eq!(slack_chat_type(&direct_chat), ChatType::Direct);
1871 assert_eq!(conversation_name(&direct_chat), "dm-U123");
1872
1873 let mapped = map_chat(archived_group);
1874 assert_eq!(mapped.description.as_deref(), Some("Topic text"));
1875 assert!(!mapped.is_active);
1876 }
1877
1878 #[test]
1879 fn attachment_and_cursor_helpers_cover_fallback_paths() {
1880 let attachments = map_attachments(&SlackMessage {
1881 ts: "1710000000.000100".to_string(),
1882 text: None,
1883 user: None,
1884 username: None,
1885 bot_id: None,
1886 thread_ts: None,
1887 parent_user_id: None,
1888 subtype: None,
1889 edited: None,
1890 files: Some(vec![SlackFile {
1891 id: Some("F1".to_string()),
1892 name: Some("report.pdf".to_string()),
1893 mimetype: Some("application/pdf".to_string()),
1894 filetype: None,
1895 url_private: Some("https://private.example/report.pdf".to_string()),
1896 permalink: None,
1897 }]),
1898 attachments: Some(vec![SlackRichAttachment {
1899 id: Some(42),
1900 title: None,
1901 fallback: Some("Fallback title".to_string()),
1902 service_name: Some("docs".to_string()),
1903 title_link: None,
1904 from_url: Some("https://example.com/doc".to_string()),
1905 }]),
1906 bot_profile: None,
1907 });
1908
1909 assert_eq!(attachments.len(), 2);
1910 assert_eq!(attachments[0].attachment_type.as_deref(), Some("file"));
1911 assert_eq!(
1912 attachments[0].url.as_deref(),
1913 Some("https://private.example/report.pdf")
1914 );
1915 assert_eq!(attachments[1].id.as_deref(), Some("42"));
1916 assert_eq!(attachments[1].name.as_deref(), Some("Fallback title"));
1917 assert_eq!(
1918 attachments[1].url.as_deref(),
1919 Some("https://example.com/doc")
1920 );
1921
1922 assert_eq!(slack_conversation_types(Some(ChatType::Direct)), "im");
1923 assert_eq!(
1924 slack_conversation_types(Some(ChatType::Group)),
1925 "mpim,private_channel"
1926 );
1927 assert_eq!(
1928 slack_conversation_types(Some(ChatType::Channel)),
1929 "public_channel,private_channel"
1930 );
1931 assert_eq!(
1932 slack_conversation_types(None),
1933 "public_channel,private_channel,mpim,im"
1934 );
1935 assert_eq!(
1936 slack_next_cursor(Some(&SlackResponseMetadata {
1937 next_cursor: " cursor-1 ".to_string(),
1938 })),
1939 Some("cursor-1".to_string())
1940 );
1941 assert_eq!(slack_next_cursor(None), None);
1942 }
1943
1944 #[test]
1945 fn ts_scope_and_markdown_helpers_cover_edge_cases() {
1946 let mut headers = HeaderMap::new();
1947 headers.insert(
1948 "x-oauth-scopes",
1949 " channels:read, , chat:write ".parse().unwrap(),
1950 );
1951
1952 assert_eq!(
1953 normalize_ts_param("since", Some(" 1710000000.000100 "))
1954 .unwrap()
1955 .as_deref(),
1956 Some("1710000000.000100")
1957 );
1958 assert!(matches!(
1959 normalize_ts_param("until", Some("not-a-ts")).unwrap_err(),
1960 Error::InvalidData(message) if message == "until must be a Slack timestamp string"
1961 ));
1962 assert_eq!(normalize_ts_param("since", Some(" ")).unwrap(), None);
1963 assert_eq!(
1964 parse_scopes(&headers),
1965 vec!["channels:read".to_string(), "chat:write".to_string()]
1966 );
1967 assert!(parse_scopes(&HeaderMap::new()).is_empty());
1968
1969 assert_eq!(normalize_slack_token("@U123"), "@U123");
1970 assert_eq!(normalize_slack_token("@U123|andrey"), "@andrey");
1971 assert_eq!(normalize_slack_token("#C123|general"), "#general");
1972 assert_eq!(normalize_slack_token("!here"), "here");
1973 assert_eq!(
1974 normalize_slack_token("https://example.com|docs"),
1975 "[docs](https://example.com)"
1976 );
1977 assert_eq!(normalize_slack_token("plain-token"), "plain-token");
1978 assert_eq!(
1979 normalize_mrkdwn("unterminated <https://example.com"),
1980 "unterminated <https://example.com"
1981 );
1982 }
1983
1984 #[test]
1985 fn slack_error_helpers_map_expected_variants() {
1986 assert!(ensure_ok(true, None).is_ok());
1987 assert!(matches!(
1988 ensure_ok(false, Some("missing_scope".to_string())).unwrap_err(),
1989 Error::Forbidden(message) if message == "missing_scope"
1990 ));
1991 assert!(matches!(
1992 map_slack_error("invalid_auth".to_string()),
1993 Error::Unauthorized(message) if message == "invalid_auth"
1994 ));
1995 assert!(matches!(
1996 map_slack_error("not_allowed_token_type".to_string()),
1997 Error::Forbidden(message) if message == "not_allowed_token_type"
1998 ));
1999 assert!(matches!(
2000 map_slack_error("channel_not_found".to_string()),
2001 Error::NotFound(message) if message == "channel_not_found"
2002 ));
2003 assert!(matches!(
2004 map_slack_error("ratelimited".to_string()),
2005 Error::RateLimited { retry_after: None }
2006 ));
2007 assert!(matches!(
2008 map_slack_error("other".to_string()),
2009 Error::Api { status: 200, message } if message == "other"
2010 ));
2011 }
2012
2013 #[test]
2014 fn normalize_slack_markup_to_markdownish_text() {
2015 let text = normalize_mrkdwn(
2016 "See <https://example.com|docs> and talk to <@U123> in <#C123|general>",
2017 );
2018 assert!(text.contains("[docs](https://example.com)"));
2019 assert!(text.contains("@U123"));
2020 assert!(text.contains("#general"));
2021 }
2022
2023 #[test]
2024 fn rate_limit_bucket_matches_write_method() {
2025 assert_eq!(
2026 slack_rate_limit_bucket("chat.postMessage"),
2027 SlackRateLimitBucket::Write
2028 );
2029 assert_eq!(
2030 slack_rate_limit_bucket("conversations.history"),
2031 SlackRateLimitBucket::Read
2032 );
2033 }
2034
2035 #[tokio::test]
2036 async fn rate_limiter_spaces_same_bucket_requests() {
2037 let limiter = SlackRateLimiter::new(Duration::from_millis(25), Duration::from_millis(10));
2038
2039 let start = std::time::Instant::now();
2040 limiter.acquire(SlackRateLimitBucket::Read).await;
2041 limiter.acquire(SlackRateLimitBucket::Read).await;
2042
2043 assert!(start.elapsed() >= Duration::from_millis(20));
2044 }
2045
2046 #[tokio::test]
2047 async fn rate_limiter_keeps_read_and_write_buckets_independent() {
2048 let limiter = SlackRateLimiter::new(Duration::from_millis(50), Duration::from_millis(10));
2049
2050 limiter.acquire(SlackRateLimitBucket::Read).await;
2051
2052 let start = std::time::Instant::now();
2053 limiter.acquire(SlackRateLimitBucket::Write).await;
2054
2055 assert!(start.elapsed() < Duration::from_millis(25));
2056 }
2057
2058 #[test]
2059 fn parse_search_cursor_accepts_legacy_chat_cursor() {
2060 let state = parse_search_cursor(Some("chat-cursor-1")).unwrap();
2061
2062 assert_eq!(state.next_chats_cursor.as_deref(), Some("chat-cursor-1"));
2063 assert!(state.current_chat_id.is_none());
2064 assert!(state.pending_chat_ids.is_empty());
2065 }
2066
2067 #[test]
2068 fn search_cursor_round_trips_with_message_progress() {
2069 let cursor = SlackSearchCursor {
2070 version: 1,
2071 current_chat_id: Some("C123".to_string()),
2072 current_message_cursor: Some("msg-cursor-2".to_string()),
2073 current_message_offset: 37,
2074 pending_chat_ids: vec!["C124".to_string(), "C125".to_string()],
2075 pending_chat_index: 1,
2076 next_chats_cursor: Some("chat-cursor-9".to_string()),
2077 };
2078
2079 let encoded = serialize_search_cursor(&cursor).unwrap().unwrap();
2080 let decoded = parse_search_cursor(Some(&encoded)).unwrap();
2081
2082 assert_eq!(decoded.version, 1);
2083 assert_eq!(decoded.current_chat_id.as_deref(), Some("C123"));
2084 assert_eq!(
2085 decoded.current_message_cursor.as_deref(),
2086 Some("msg-cursor-2")
2087 );
2088 assert_eq!(decoded.current_message_offset, 37);
2089 assert_eq!(decoded.pending_chat_ids, vec!["C124", "C125"]);
2090 assert_eq!(decoded.pending_chat_index, 1);
2091 assert_eq!(decoded.next_chats_cursor.as_deref(), Some("chat-cursor-9"));
2092 }
2093
2094 #[test]
2095 fn take_next_pending_chat_id_advances_without_shifting() {
2096 let mut state = SlackSearchCursor {
2097 pending_chat_ids: vec!["C124".to_string(), "C125".to_string()],
2098 ..SlackSearchCursor::default()
2099 };
2100
2101 assert_eq!(
2102 take_next_pending_chat_id(&mut state).as_deref(),
2103 Some("C124")
2104 );
2105 assert_eq!(state.pending_chat_ids, vec!["C124", "C125"]);
2106 assert_eq!(state.pending_chat_index, 1);
2107 assert!(has_pending_chat_ids(&state));
2108
2109 assert_eq!(
2110 take_next_pending_chat_id(&mut state).as_deref(),
2111 Some("C125")
2112 );
2113 assert!(state.pending_chat_ids.is_empty());
2114 assert_eq!(state.pending_chat_index, 0);
2115 assert!(!has_pending_chat_ids(&state));
2116 }
2117
2118 #[test]
2119 fn serialize_search_cursor_omits_finished_state() {
2120 let encoded = serialize_search_cursor(&SlackSearchCursor::default()).unwrap();
2121
2122 assert!(encoded.is_none());
2123 }
2124
2125 #[tokio::test]
2130 async fn user_provider_get_user_profile_maps_fields() {
2131 use devboy_core::UserProvider;
2132 let server = MockServer::start();
2133 server.mock(|when, then| {
2134 when.method(POST).path("/users.info");
2135 then.status(200).json_body(serde_json::json!({
2136 "ok": true,
2137 "user": {
2138 "id": "U1",
2139 "name": "alice",
2140 "profile": {
2141 "real_name": "Alice Liddell",
2142 "display_name": "alice.l",
2143 "image_72": "https://cdn/alice.png",
2144 "email": "alice@example.com"
2145 }
2146 }
2147 }));
2148 });
2149
2150 let user = SlackClient::new(token("xoxb-test"))
2151 .with_base_url(server.base_url())
2152 .get_user_profile("U1")
2153 .await
2154 .unwrap();
2155
2156 assert_eq!(user.id, "U1");
2157 assert_eq!(user.username, "alice");
2158 assert_eq!(user.name.as_deref(), Some("alice.l"));
2159 assert_eq!(user.email.as_deref(), Some("alice@example.com"));
2160 assert_eq!(user.avatar_url.as_deref(), Some("https://cdn/alice.png"));
2161 }
2162
2163 #[tokio::test]
2164 async fn user_provider_lookup_by_email_users_not_found_returns_none() {
2165 use devboy_core::UserProvider;
2166 let server = MockServer::start();
2167 server.mock(|when, then| {
2168 when.method(POST).path("/users.lookupByEmail");
2169 then.status(200).json_body(serde_json::json!({
2170 "ok": false,
2171 "error": "users_not_found"
2172 }));
2173 });
2174
2175 let result = SlackClient::new(token("xoxb-test"))
2176 .with_base_url(server.base_url())
2177 .lookup_user_by_email("missing@example.com")
2178 .await
2179 .unwrap();
2180
2181 assert!(result.is_none());
2182 }
2183
2184 #[tokio::test]
2185 async fn user_provider_lookup_by_email_hit_returns_user() {
2186 use devboy_core::UserProvider;
2187 let server = MockServer::start();
2188 server.mock(|when, then| {
2189 when.method(POST).path("/users.lookupByEmail");
2190 then.status(200).json_body(serde_json::json!({
2191 "ok": true,
2192 "user": {
2193 "id": "U2",
2194 "name": "bob",
2195 "profile": {
2196 "real_name": "Bob",
2197 "display_name": "",
2198 "image_72": null,
2199 "email": "bob@example.com"
2200 }
2201 }
2202 }));
2203 });
2204
2205 let user = SlackClient::new(token("xoxb-test"))
2206 .with_base_url(server.base_url())
2207 .lookup_user_by_email("bob@example.com")
2208 .await
2209 .unwrap()
2210 .expect("user match");
2211
2212 assert_eq!(user.id, "U2");
2213 assert_eq!(user.email.as_deref(), Some("bob@example.com"));
2214 assert_eq!(user.name.as_deref(), Some("Bob"));
2216 }
2217
2218 #[tokio::test]
2219 async fn user_provider_lookup_by_email_propagates_other_errors() {
2220 use devboy_core::UserProvider;
2221 let server = MockServer::start();
2222 server.mock(|when, then| {
2223 when.method(POST).path("/users.lookupByEmail");
2224 then.status(200).json_body(serde_json::json!({
2225 "ok": false,
2226 "error": "missing_scope"
2227 }));
2228 });
2229
2230 let err = SlackClient::new(token("xoxb-test"))
2231 .with_base_url(server.base_url())
2232 .lookup_user_by_email("alice@example.com")
2233 .await
2234 .expect_err("missing_scope must not be silently swallowed");
2235 let msg = err.to_string();
2236 assert!(msg.contains("missing_scope"), "unexpected: {msg}");
2237 }
2238}