1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use devboy_core::types::ChatType;
8use devboy_core::{
9 Error, GetChatsParams, GetMessagesParams, MessageAttachment, MessageAuthor, MessengerChat,
10 MessengerMessage, MessengerProvider, Pagination, ProviderResult, Result, SearchMessagesParams,
11 SendMessageParams,
12};
13use reqwest::header::HeaderMap;
14use secrecy::{ExposeSecret, SecretString};
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use tokio::sync::{Mutex, RwLock};
18use tokio::time::{Instant, sleep_until};
19use tracing::debug;
20
21use crate::DEFAULT_SLACK_API_URL;
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct SlackAuthInfo {
25 pub user_id: String,
26 pub user_name: Option<String>,
27 pub team_id: String,
28 pub team_name: String,
29 pub bot_id: Option<String>,
30 pub url: Option<String>,
31 pub scopes: Vec<String>,
32 pub missing_scopes: Vec<String>,
33}
34
35#[derive(Clone)]
36pub struct SlackClient {
37 token: SecretString,
38 base_url: String,
39 http: reqwest::Client,
40 required_scopes: Vec<String>,
41 user_cache: Arc<RwLock<HashMap<String, MessageAuthor>>>,
42 rate_limiter: Arc<SlackRateLimiter>,
43}
44
45impl fmt::Debug for SlackClient {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 f.debug_struct("SlackClient")
48 .field("token", &"<redacted>")
49 .field("base_url", &self.base_url)
50 .field("http", &self.http)
51 .field("required_scopes", &self.required_scopes)
52 .field("user_cache", &self.user_cache)
53 .field("rate_limiter", &self.rate_limiter)
54 .finish()
55 }
56}
57
58const SLACK_READ_INTERVAL: Duration = Duration::from_millis(1200);
59const SLACK_WRITE_INTERVAL: Duration = Duration::from_secs(1);
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62enum SlackRateLimitBucket {
63 Read,
64 Write,
65}
66
67#[derive(Debug)]
68struct SlackRateLimiter {
69 state: Mutex<SlackRateLimitState>,
70 read_interval: Duration,
71 write_interval: Duration,
72}
73
74#[derive(Debug)]
75struct SlackRateLimitState {
76 read_ready_at: Instant,
77 write_ready_at: Instant,
78}
79
80#[derive(Debug, Deserialize)]
81struct SlackAuthTestResponse {
82 ok: bool,
83 error: Option<String>,
84 url: Option<String>,
85 team: Option<String>,
86 user: Option<String>,
87 #[serde(rename = "team_id")]
88 team_id: Option<String>,
89 #[serde(rename = "user_id")]
90 user_id: Option<String>,
91 #[serde(rename = "bot_id")]
92 bot_id: Option<String>,
93}
94
95#[derive(Debug, Deserialize)]
96struct SlackResponseMetadata {
97 #[serde(default)]
98 next_cursor: String,
99}
100
101#[derive(Debug, Deserialize)]
102struct SlackConversationsListResponse {
103 ok: bool,
104 error: Option<String>,
105 #[serde(default)]
106 channels: Vec<SlackConversation>,
107 response_metadata: Option<SlackResponseMetadata>,
108}
109
110#[derive(Debug, Clone, Deserialize)]
111struct SlackConversation {
112 id: String,
113 name: Option<String>,
114 user: Option<String>,
115 is_group: Option<bool>,
116 is_im: Option<bool>,
117 is_mpim: Option<bool>,
118 is_archived: Option<bool>,
119 num_members: Option<u32>,
120 purpose: Option<SlackTextValue>,
121 topic: Option<SlackTextValue>,
122}
123
124#[derive(Debug, Clone, Deserialize)]
125struct SlackTextValue {
126 value: Option<String>,
127}
128
129#[derive(Debug, Deserialize)]
130struct SlackMessagesResponse {
131 ok: bool,
132 error: Option<String>,
133 #[serde(default)]
134 messages: Vec<SlackMessage>,
135 has_more: Option<bool>,
136 response_metadata: Option<SlackResponseMetadata>,
137}
138
139#[derive(Debug, Clone, Default, Serialize, Deserialize)]
140struct SlackSearchCursor {
141 version: u8,
142 current_chat_id: Option<String>,
143 current_message_cursor: Option<String>,
144 #[serde(default)]
145 current_message_offset: usize,
146 #[serde(default)]
147 pending_chat_ids: Vec<String>,
148 #[serde(default)]
149 pending_chat_index: usize,
150 next_chats_cursor: Option<String>,
151}
152
153#[derive(Debug, Deserialize)]
154struct SlackPostMessageResponse {
155 ok: bool,
156 error: Option<String>,
157 channel: Option<String>,
158 ts: Option<String>,
159 message: Option<SlackMessage>,
160}
161
162#[derive(Debug, Deserialize)]
163struct SlackUsersInfoResponse {
164 ok: bool,
165 error: Option<String>,
166 user: Option<SlackUser>,
167}
168
169#[derive(Debug, Clone, Deserialize)]
170struct SlackUser {
171 id: String,
172 name: Option<String>,
173 profile: Option<SlackUserProfile>,
174}
175
176#[derive(Debug, Clone, Deserialize)]
177struct SlackUserProfile {
178 real_name: Option<String>,
179 display_name: Option<String>,
180 image_72: Option<String>,
181 #[serde(default)]
184 email: Option<String>,
185}
186
187#[derive(Debug, Clone, Deserialize)]
188struct SlackMessage {
189 ts: String,
190 text: Option<String>,
191 user: Option<String>,
192 username: Option<String>,
193 bot_id: Option<String>,
194 thread_ts: Option<String>,
195 parent_user_id: Option<String>,
196 subtype: Option<String>,
197 edited: Option<serde_json::Value>,
198 files: Option<Vec<SlackFile>>,
199 attachments: Option<Vec<SlackRichAttachment>>,
200 bot_profile: Option<SlackBotProfile>,
201}
202
203#[derive(Debug, Clone, Deserialize)]
204struct SlackBotProfile {
205 id: Option<String>,
206 name: Option<String>,
207 icons: Option<SlackBotIcons>,
208}
209
210#[derive(Debug, Clone, Deserialize)]
211struct SlackBotIcons {
212 image_72: Option<String>,
213}
214
215#[derive(Debug, Clone, Deserialize)]
216struct SlackFile {
217 id: Option<String>,
218 name: Option<String>,
219 mimetype: Option<String>,
220 filetype: Option<String>,
221 size: Option<u64>,
222 url_private: Option<String>,
223 permalink: Option<String>,
224}
225
226#[derive(Debug, Clone, Deserialize)]
227struct SlackRichAttachment {
228 id: Option<u64>,
229 title: Option<String>,
230 fallback: Option<String>,
231 service_name: Option<String>,
232 title_link: Option<String>,
233 from_url: Option<String>,
234}
235
236impl SlackClient {
237 pub fn new(token: SecretString) -> Self {
238 Self {
239 token,
240 base_url: DEFAULT_SLACK_API_URL.to_string(),
241 http: reqwest::Client::new(),
242 required_scopes: devboy_core::default_slack_required_scopes(),
243 user_cache: Arc::new(RwLock::new(HashMap::new())),
244 rate_limiter: Arc::new(SlackRateLimiter::new(
245 SLACK_READ_INTERVAL,
246 SLACK_WRITE_INTERVAL,
247 )),
248 }
249 }
250
251 pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
252 self.base_url = base_url.into().trim_end_matches('/').to_string();
253 self
254 }
255
256 pub fn with_required_scopes(mut self, required_scopes: Vec<String>) -> Self {
257 self.required_scopes = required_scopes;
258 self
259 }
260
261 pub fn required_scopes(&self) -> &[String] {
262 &self.required_scopes
263 }
264
265 pub async fn auth_info(&self) -> Result<SlackAuthInfo> {
266 let url = format!("{}/auth.test", self.base_url);
267 debug!(url, "slack auth.test request");
268
269 let response = self.send_form_request("auth.test", &[]).await?;
270 let headers = response.headers().clone();
271 let payload: SlackAuthTestResponse = map_http_error(response).await?;
272
273 if !payload.ok {
274 let message = payload
275 .error
276 .unwrap_or_else(|| "unknown_slack_error".to_string());
277 return Err(match message.as_str() {
278 "invalid_auth" | "not_authed" => Error::Unauthorized(message),
279 "missing_scope" => Error::Forbidden(message),
280 _ => Error::Api {
281 status: 200,
282 message,
283 },
284 });
285 }
286
287 let scopes = parse_scopes(&headers);
288 let missing_scopes = self
289 .required_scopes
290 .iter()
291 .filter(|scope| !scopes.iter().any(|actual| actual == *scope))
292 .cloned()
293 .collect();
294
295 Ok(SlackAuthInfo {
296 user_id: payload.user_id.unwrap_or_default(),
297 user_name: payload.user,
298 team_id: payload.team_id.unwrap_or_default(),
299 team_name: payload.team.unwrap_or_default(),
300 bot_id: payload.bot_id,
301 url: payload.url,
302 scopes,
303 missing_scopes,
304 })
305 }
306
307 pub async fn ensure_healthy(&self) -> Result<SlackAuthInfo> {
308 let info = self.auth_info().await?;
309 if info.missing_scopes.is_empty() {
310 Ok(info)
311 } else {
312 Err(Error::Forbidden(format!(
313 "Slack token is missing required scopes: {}",
314 info.missing_scopes.join(", ")
315 )))
316 }
317 }
318
319 async fn post_form<T>(&self, method: &str, params: &[(&str, String)]) -> Result<T>
320 where
321 T: DeserializeOwned,
322 {
323 let response = self.send_form_request(method, params).await?;
324 map_http_error(response).await
325 }
326
327 async fn send_form_request(
328 &self,
329 method: &str,
330 params: &[(&str, String)],
331 ) -> Result<reqwest::Response> {
332 let url = format!("{}/{}", self.base_url, method);
333 let bucket = slack_rate_limit_bucket(method);
334 debug!(url, ?bucket, "slack api request");
335
336 for attempt in 0..2 {
337 self.rate_limiter.acquire(bucket).await;
338
339 let response = self
340 .http
341 .post(&url)
342 .bearer_auth(self.token.expose_secret())
343 .form(params)
344 .send()
345 .await
346 .map_err(|e| Error::Network(e.to_string()))?;
347
348 if response.status().as_u16() != 429 {
349 return Ok(response);
350 }
351
352 let retry_after = response
353 .headers()
354 .get("retry-after")
355 .and_then(|value| value.to_str().ok())
356 .and_then(|value| value.parse::<u64>().ok());
357
358 self.rate_limiter
359 .on_rate_limited(bucket, retry_after.map(Duration::from_secs))
360 .await;
361
362 if attempt == 0 && retry_after.is_some() {
363 continue;
364 }
365
366 return Ok(response);
367 }
368
369 unreachable!("slack request retry loop should always return");
370 }
371
372 async fn get_conversations(
373 &self,
374 params: &GetChatsParams,
375 ) -> Result<ProviderResult<MessengerChat>> {
376 let limit = params.limit.unwrap_or(100).min(1000);
377 let mut form = vec![
378 ("limit", limit.to_string()),
379 (
380 "types",
381 slack_conversation_types(params.chat_type).to_string(),
382 ),
383 (
384 "exclude_archived",
385 (!params.include_inactive.unwrap_or(false)).to_string(),
386 ),
387 ];
388 if let Some(cursor) = params.cursor.as_ref() {
389 form.push(("cursor", cursor.clone()));
390 }
391
392 let payload: SlackConversationsListResponse =
393 self.post_form("conversations.list", &form).await?;
394 ensure_ok(payload.ok, payload.error)?;
395
396 let mut items: Vec<_> = payload
397 .channels
398 .into_iter()
399 .filter(|chat| matches_chat_filter(chat, params))
400 .map(map_chat)
401 .collect();
402
403 if let Some(limit) = params.limit {
404 items.truncate(limit as usize);
405 }
406
407 let has_more = payload
408 .response_metadata
409 .as_ref()
410 .map(|meta| !meta.next_cursor.is_empty())
411 .unwrap_or(false);
412 let next_cursor = slack_next_cursor(payload.response_metadata.as_ref());
413
414 Ok(ProviderResult::new(items).with_pagination(Pagination {
415 offset: 0,
416 limit,
417 total: None,
418 has_more,
419 next_cursor,
420 }))
421 }
422
423 async fn get_messages_page(
424 &self,
425 params: &GetMessagesParams,
426 ) -> Result<ProviderResult<MessengerMessage>> {
427 let limit = params.limit.unwrap_or(100).min(1000);
428 let mut form = vec![
429 ("channel", params.chat_id.clone()),
430 ("limit", limit.to_string()),
431 ("inclusive", "true".to_string()),
432 ];
433 if let Some(cursor) = params.cursor.as_ref() {
434 form.push(("cursor", cursor.clone()));
435 }
436 if let Some(since) = normalize_ts_param("since", params.since.as_deref())? {
437 form.push(("oldest", since));
438 }
439 if let Some(until) = normalize_ts_param("until", params.until.as_deref())? {
440 form.push(("latest", until));
441 }
442
443 let payload: SlackMessagesResponse = if let Some(thread_id) = params.thread_id.as_ref() {
444 form.push(("ts", thread_id.clone()));
445 self.post_form("conversations.replies", &form).await?
446 } else {
447 self.post_form("conversations.history", &form).await?
448 };
449 ensure_ok(payload.ok, payload.error)?;
450
451 let mut items = Vec::with_capacity(payload.messages.len());
452 for message in payload.messages {
453 items.push(self.map_message(¶ms.chat_id, message).await?);
454 }
455
456 let has_more = payload.has_more.unwrap_or(false)
457 || payload
458 .response_metadata
459 .as_ref()
460 .map(|meta| !meta.next_cursor.is_empty())
461 .unwrap_or(false);
462 let next_cursor = slack_next_cursor(payload.response_metadata.as_ref());
463
464 Ok(ProviderResult::new(items).with_pagination(Pagination {
465 offset: 0,
466 limit,
467 total: None,
468 has_more,
469 next_cursor,
470 }))
471 }
472
473 async fn map_message(&self, chat_id: &str, message: SlackMessage) -> Result<MessengerMessage> {
474 let ts = message.ts.clone();
475 let thread_id = message.thread_ts.clone();
476 let reply_to_id = thread_id.as_ref().filter(|thread| *thread != &ts).cloned();
477
478 Ok(MessengerMessage {
479 id: ts.clone(),
480 chat_id: chat_id.to_string(),
481 text: normalize_mrkdwn(message.text.as_deref().unwrap_or_default()),
482 author: self.resolve_author(&message).await?,
483 source: "slack".to_string(),
484 timestamp: ts,
485 thread_id,
486 reply_to_id,
487 attachments: map_attachments(&message),
488 is_edited: message.edited.is_some(),
489 edit_date: None,
490 })
491 }
492
493 async fn resolve_author(&self, message: &SlackMessage) -> Result<MessageAuthor> {
494 if let Some(user_id) = message.user.as_deref() {
495 return self.get_user(user_id).await;
496 }
497
498 if let Some(bot_profile) = message.bot_profile.as_ref() {
499 return Ok(MessageAuthor {
500 id: bot_profile
501 .id
502 .clone()
503 .or_else(|| message.bot_id.clone())
504 .unwrap_or_else(|| "slack-bot".to_string()),
505 name: bot_profile
506 .name
507 .clone()
508 .or_else(|| message.username.clone())
509 .unwrap_or_else(|| "Slack Bot".to_string()),
510 username: message.username.clone(),
511 avatar_url: bot_profile
512 .icons
513 .as_ref()
514 .and_then(|icons| icons.image_72.clone()),
515 });
516 }
517
518 Ok(MessageAuthor {
519 id: message
520 .bot_id
521 .clone()
522 .or_else(|| message.parent_user_id.clone())
523 .unwrap_or_else(|| "unknown".to_string()),
524 name: message
525 .username
526 .clone()
527 .or_else(|| message.subtype.clone())
528 .unwrap_or_else(|| "Unknown".to_string()),
529 username: message.username.clone(),
530 avatar_url: None,
531 })
532 }
533
534 async fn get_user(&self, user_id: &str) -> Result<MessageAuthor> {
535 if let Some(cached) = self.user_cache.read().await.get(user_id).cloned() {
536 return Ok(cached);
537 }
538
539 let payload: SlackUsersInfoResponse = self
540 .post_form("users.info", &[("user", user_id.to_string())])
541 .await?;
542 ensure_ok(payload.ok, payload.error)?;
543
544 let user = payload
545 .user
546 .ok_or_else(|| Error::InvalidData("Slack users.info returned no user".to_string()))?;
547 let profile = user.profile.as_ref();
548 let display_name = profile
549 .and_then(|profile| profile.display_name.clone())
550 .filter(|name| !name.is_empty());
551 let real_name = profile
552 .and_then(|profile| profile.real_name.clone())
553 .filter(|name| !name.is_empty());
554 let username = user.name.filter(|name| !name.is_empty());
555 let name = display_name
556 .clone()
557 .or(real_name)
558 .or_else(|| username.clone())
559 .unwrap_or_else(|| user.id.clone());
560
561 let author = MessageAuthor {
562 id: user.id,
563 name,
564 username,
565 avatar_url: profile.and_then(|profile| profile.image_72.clone()),
566 };
567
568 self.user_cache
569 .write()
570 .await
571 .insert(user_id.to_string(), author.clone());
572
573 Ok(author)
574 }
575}
576
577#[async_trait]
578impl MessengerProvider for SlackClient {
579 fn provider_name(&self) -> &'static str {
580 "slack"
581 }
582
583 async fn get_chats(&self, params: GetChatsParams) -> Result<ProviderResult<MessengerChat>> {
584 self.get_conversations(¶ms).await
585 }
586
587 async fn get_messages(
588 &self,
589 params: GetMessagesParams,
590 ) -> Result<ProviderResult<MessengerMessage>> {
591 self.get_messages_page(¶ms).await
592 }
593
594 async fn search_messages(
595 &self,
596 params: SearchMessagesParams,
597 ) -> Result<ProviderResult<MessengerMessage>> {
598 let query = params.query.trim().to_lowercase();
599 if query.is_empty() {
600 return Err(Error::InvalidData(
601 "search query must not be empty".to_string(),
602 ));
603 }
604
605 let limit = params.limit.unwrap_or(20) as usize;
606 let mut found = Vec::new();
607
608 let (has_more, next_cursor) = if let Some(chat_id) = params.chat_id.as_ref() {
609 let mut state = parse_search_cursor(params.cursor.as_deref())?;
610 if state.current_chat_id.is_none() {
611 state.current_chat_id = Some(chat_id.clone());
612 }
613 if state.current_chat_id.as_deref() != Some(chat_id.as_str()) {
614 state = SlackSearchCursor {
615 current_chat_id: Some(chat_id.clone()),
616 ..SlackSearchCursor::default()
617 };
618 }
619 if state.current_message_cursor.is_none()
620 && state.current_message_offset == 0
621 && state.next_chats_cursor.is_some()
622 && !has_pending_chat_ids(&state)
623 {
624 state.current_message_cursor = state.next_chats_cursor.take();
625 }
626
627 loop {
628 let messages = self
629 .get_messages_page(&GetMessagesParams {
630 chat_id: chat_id.clone(),
631 limit: Some(params.limit.unwrap_or(100)),
632 cursor: state.current_message_cursor.clone(),
633 thread_id: None,
634 since: params.since.clone(),
635 until: params.until.clone(),
636 })
637 .await?;
638 let pagination = messages.pagination.clone();
639 let page_len = messages.items.len();
640
641 for message in messages
642 .items
643 .into_iter()
644 .skip(state.current_message_offset)
645 {
646 state.current_message_offset += 1;
647 if message.text.to_lowercase().contains(&query) {
648 found.push(message);
649 if found.len() >= limit {
650 break;
651 }
652 }
653 }
654
655 let page_has_more = pagination.as_ref().map(|p| p.has_more).unwrap_or(false);
656 let next_page_cursor = pagination.as_ref().and_then(|p| p.next_cursor.clone());
657
658 if found.len() >= limit {
659 if state.current_message_offset >= page_len {
660 if page_has_more {
661 state.current_message_cursor = next_page_cursor;
662 state.current_message_offset = 0;
663 } else {
664 state.current_chat_id = None;
665 state.current_message_cursor = None;
666 state.current_message_offset = 0;
667 }
668 }
669 break;
670 }
671
672 if page_has_more {
673 state.current_message_cursor = next_page_cursor;
674 state.current_message_offset = 0;
675 continue;
676 }
677
678 state.current_chat_id = None;
679 state.current_message_cursor = None;
680 state.current_message_offset = 0;
681 break;
682 }
683
684 let has_more = state.current_chat_id.is_some();
685 let next_cursor = serialize_search_cursor(&state)?;
686 (has_more, next_cursor)
687 } else {
688 let mut state = parse_search_cursor(params.cursor.as_deref())?;
689 let mut chats_loaded = state.current_chat_id.is_some()
690 || has_pending_chat_ids(&state)
691 || state.next_chats_cursor.is_some()
692 || params.cursor.is_some();
693
694 loop {
695 if found.len() >= limit {
696 break;
697 }
698
699 if let Some(chat_id) = state.current_chat_id.clone() {
700 let messages = self
701 .get_messages_page(&GetMessagesParams {
702 chat_id,
703 limit: Some(100),
704 cursor: state.current_message_cursor.clone(),
705 thread_id: None,
706 since: params.since.clone(),
707 until: params.until.clone(),
708 })
709 .await?;
710 let pagination = messages.pagination.clone();
711 let page_len = messages.items.len();
712
713 for message in messages
714 .items
715 .into_iter()
716 .skip(state.current_message_offset)
717 {
718 state.current_message_offset += 1;
719 if message.text.to_lowercase().contains(&query) {
720 found.push(message);
721 if found.len() >= limit {
722 break;
723 }
724 }
725 }
726
727 if found.len() >= limit {
728 if state.current_message_offset >= page_len {
729 let next_message_cursor = pagination
730 .as_ref()
731 .and_then(|page| page.next_cursor.clone());
732 if pagination
733 .as_ref()
734 .map(|page| page.has_more)
735 .unwrap_or(false)
736 {
737 state.current_message_cursor = next_message_cursor;
738 state.current_message_offset = 0;
739 } else {
740 state.current_chat_id = None;
741 state.current_message_cursor = None;
742 state.current_message_offset = 0;
743 }
744 }
745 break;
746 }
747
748 let next_message_cursor = pagination
749 .as_ref()
750 .and_then(|page| page.next_cursor.clone());
751 if pagination
752 .as_ref()
753 .map(|page| page.has_more)
754 .unwrap_or(false)
755 {
756 state.current_message_cursor = next_message_cursor;
757 state.current_message_offset = 0;
758 continue;
759 }
760
761 state.current_chat_id = None;
762 state.current_message_cursor = None;
763 state.current_message_offset = 0;
764 continue;
765 }
766
767 if let Some(next_chat_id) = take_next_pending_chat_id(&mut state) {
768 state.current_chat_id = Some(next_chat_id);
769 state.current_message_cursor = None;
770 state.current_message_offset = 0;
771 continue;
772 }
773
774 if chats_loaded && state.next_chats_cursor.is_none() {
775 break;
776 }
777
778 let chats = self
779 .get_conversations(&GetChatsParams {
780 search: None,
781 chat_type: None,
782 limit: Some(100),
783 cursor: state.next_chats_cursor.clone(),
784 include_inactive: Some(false),
785 })
786 .await?;
787 state.pending_chat_ids = chats.items.into_iter().map(|chat| chat.id).collect();
788 state.pending_chat_index = 0;
789 state.next_chats_cursor = chats.pagination.and_then(|page| page.next_cursor);
790 chats_loaded = true;
791 }
792
793 let has_more = state.current_chat_id.is_some()
794 || has_pending_chat_ids(&state)
795 || state.next_chats_cursor.is_some();
796 let next_cursor = serialize_search_cursor(&state)?;
797 (has_more, next_cursor)
798 };
799
800 Ok(ProviderResult::new(found).with_pagination(Pagination {
801 offset: 0,
802 limit: limit as u32,
803 total: None,
804 has_more,
805 next_cursor,
806 }))
807 }
808
809 async fn send_message(&self, params: SendMessageParams) -> Result<MessengerMessage> {
810 if !params.attachments.is_empty() {
811 return Err(Error::ProviderUnsupported {
812 provider: self.provider_name().to_string(),
813 operation: "send_message attachments".to_string(),
814 });
815 }
816
817 let thread_ts = params
818 .thread_id
819 .clone()
820 .or_else(|| params.reply_to_id.clone());
821 let mut form = vec![
822 ("channel", params.chat_id.clone()),
823 ("text", params.text.clone()),
824 ("unfurl_links", "false".to_string()),
825 ("unfurl_media", "false".to_string()),
826 ];
827 if let Some(thread_id) = thread_ts.as_ref() {
828 form.push(("thread_ts", thread_id.clone()));
829 }
830
831 let payload: SlackPostMessageResponse = self.post_form("chat.postMessage", &form).await?;
832 ensure_ok(payload.ok, payload.error)?;
833
834 let mut message = payload.message.unwrap_or(SlackMessage {
835 ts: payload.ts.clone().unwrap_or_default(),
836 text: Some(params.text),
837 user: None,
838 username: None,
839 bot_id: None,
840 thread_ts: thread_ts.clone(),
841 parent_user_id: None,
842 subtype: None,
843 edited: None,
844 files: None,
845 attachments: None,
846 bot_profile: None,
847 });
848
849 if message.thread_ts.is_none() {
850 message.thread_ts = thread_ts;
851 }
852
853 self.map_message(
854 payload.channel.as_deref().unwrap_or(¶ms.chat_id),
855 message,
856 )
857 .await
858 }
859}
860
861fn parse_search_cursor(cursor: Option<&str>) -> Result<SlackSearchCursor> {
862 let Some(cursor) = cursor.map(str::trim).filter(|cursor| !cursor.is_empty()) else {
863 return Ok(SlackSearchCursor::default());
864 };
865
866 match serde_json::from_str::<SlackSearchCursor>(cursor) {
867 Ok(state) => Ok(state),
868 Err(_) => Ok(SlackSearchCursor {
869 next_chats_cursor: Some(cursor.to_string()),
870 ..SlackSearchCursor::default()
871 }),
872 }
873}
874
875fn serialize_search_cursor(state: &SlackSearchCursor) -> Result<Option<String>> {
876 let has_more = state.current_chat_id.is_some()
877 || has_pending_chat_ids(state)
878 || state.next_chats_cursor.is_some();
879 if !has_more {
880 return Ok(None);
881 }
882
883 serde_json::to_string(state)
884 .map(Some)
885 .map_err(|e| Error::InvalidData(format!("failed to serialize Slack search cursor: {e}")))
886}
887
888fn map_chat(chat: SlackConversation) -> MessengerChat {
889 let name = conversation_name(&chat);
890 let description = chat
891 .purpose
892 .as_ref()
893 .and_then(|value| value.value.clone())
894 .filter(|value| !value.is_empty())
895 .or_else(|| {
896 chat.topic
897 .as_ref()
898 .and_then(|value| value.value.clone())
899 .filter(|value| !value.is_empty())
900 });
901
902 MessengerChat {
903 id: chat.id.clone(),
904 key: format!("slack:{}", chat.id),
905 name,
906 chat_type: slack_chat_type(&chat),
907 source: "slack".to_string(),
908 member_count: chat.num_members,
909 description,
910 is_active: !chat.is_archived.unwrap_or(false),
911 }
912}
913
914fn map_attachments(message: &SlackMessage) -> Vec<MessageAttachment> {
915 let mut attachments = Vec::new();
916
917 if let Some(files) = message.files.as_ref() {
918 attachments.extend(files.iter().map(|file| MessageAttachment {
919 id: file.id.clone(),
920 name: file.name.clone(),
921 attachment_type: file.filetype.clone().or_else(|| Some("file".to_string())),
922 url: file.permalink.clone().or_else(|| file.url_private.clone()),
923 mime_type: file.mimetype.clone(),
924 file_size: file.size,
925 }));
926 }
927
928 if let Some(rich_attachments) = message.attachments.as_ref() {
929 attachments.extend(rich_attachments.iter().map(|attachment| {
930 MessageAttachment {
931 id: attachment.id.map(|id| id.to_string()),
932 name: attachment
933 .title
934 .clone()
935 .or_else(|| attachment.fallback.clone()),
936 attachment_type: attachment.service_name.clone(),
937 url: attachment
938 .title_link
939 .clone()
940 .or_else(|| attachment.from_url.clone()),
941 mime_type: None,
942 file_size: None,
943 }
944 }));
945 }
946
947 attachments
948}
949
950fn matches_chat_filter(chat: &SlackConversation, params: &GetChatsParams) -> bool {
951 if let Some(expected) = params.chat_type
952 && slack_chat_type(chat) != expected
953 {
954 return false;
955 }
956
957 if let Some(search) = params.search.as_deref() {
958 let needle = search.to_lowercase();
959 let haystack = conversation_name(chat).to_lowercase();
960 if !haystack.contains(&needle) {
961 return false;
962 }
963 }
964
965 if !params.include_inactive.unwrap_or(false) && chat.is_archived.unwrap_or(false) {
966 return false;
967 }
968
969 true
970}
971
972fn slack_chat_type(chat: &SlackConversation) -> ChatType {
973 if chat.is_im.unwrap_or(false) {
974 ChatType::Direct
975 } else if chat.is_group.unwrap_or(false) || chat.is_mpim.unwrap_or(false) {
976 ChatType::Group
977 } else {
978 ChatType::Channel
979 }
980}
981
982fn conversation_name(chat: &SlackConversation) -> String {
983 chat.name
984 .clone()
985 .filter(|name| !name.is_empty())
986 .or_else(|| chat.user.clone().map(|user| format!("dm-{}", user)))
987 .unwrap_or_else(|| chat.id.clone())
988}
989
990fn slack_conversation_types(chat_type: Option<ChatType>) -> &'static str {
991 match chat_type {
992 Some(ChatType::Direct) => "im",
993 Some(ChatType::Group) => "mpim,private_channel",
994 Some(ChatType::Channel) => "public_channel,private_channel",
995 None => "public_channel,private_channel,mpim,im",
996 }
997}
998
999fn slack_next_cursor(metadata: Option<&SlackResponseMetadata>) -> Option<String> {
1000 metadata
1001 .map(|metadata| metadata.next_cursor.trim())
1002 .filter(|cursor| !cursor.is_empty())
1003 .map(ToOwned::to_owned)
1004}
1005
1006fn slack_rate_limit_bucket(method: &str) -> SlackRateLimitBucket {
1007 match method {
1008 "chat.postMessage" => SlackRateLimitBucket::Write,
1009 _ => SlackRateLimitBucket::Read,
1010 }
1011}
1012
1013fn normalize_ts_param(field_name: &str, value: Option<&str>) -> Result<Option<String>> {
1014 let Some(value) = value.map(str::trim) else {
1015 return Ok(None);
1016 };
1017 if value.is_empty() {
1018 return Ok(None);
1019 }
1020 if value.parse::<f64>().is_ok() {
1021 Ok(Some(value.to_string()))
1022 } else {
1023 Err(Error::InvalidData(format!(
1024 "{field_name} must be a Slack timestamp string"
1025 )))
1026 }
1027}
1028
1029fn has_pending_chat_ids(state: &SlackSearchCursor) -> bool {
1030 state.pending_chat_index < state.pending_chat_ids.len()
1031}
1032
1033fn take_next_pending_chat_id(state: &mut SlackSearchCursor) -> Option<String> {
1034 let next_chat_id = state
1035 .pending_chat_ids
1036 .get(state.pending_chat_index)
1037 .cloned()?;
1038 state.pending_chat_index += 1;
1039 if state.pending_chat_index >= state.pending_chat_ids.len() {
1040 state.pending_chat_ids.clear();
1041 state.pending_chat_index = 0;
1042 }
1043 Some(next_chat_id)
1044}
1045
1046fn parse_scopes(headers: &HeaderMap) -> Vec<String> {
1047 headers
1048 .get("x-oauth-scopes")
1049 .and_then(|value| value.to_str().ok())
1050 .map(|value| {
1051 value
1052 .split(',')
1053 .map(str::trim)
1054 .filter(|scope| !scope.is_empty())
1055 .map(ToString::to_string)
1056 .collect()
1057 })
1058 .unwrap_or_default()
1059}
1060
1061impl SlackRateLimiter {
1062 fn new(read_interval: Duration, write_interval: Duration) -> Self {
1063 let now = Instant::now();
1064 Self {
1065 state: Mutex::new(SlackRateLimitState {
1066 read_ready_at: now,
1067 write_ready_at: now,
1068 }),
1069 read_interval,
1070 write_interval,
1071 }
1072 }
1073
1074 async fn acquire(&self, bucket: SlackRateLimitBucket) {
1075 let (wait_until, interval) = {
1076 let mut state = self.state.lock().await;
1077 let now = Instant::now();
1078 let (ready_at, interval) = match bucket {
1079 SlackRateLimitBucket::Read => (&mut state.read_ready_at, self.read_interval),
1080 SlackRateLimitBucket::Write => (&mut state.write_ready_at, self.write_interval),
1081 };
1082 let wait_until = (*ready_at).max(now);
1083 *ready_at = wait_until + interval;
1084 (wait_until, interval)
1085 };
1086
1087 debug!(
1088 ?bucket,
1089 ?wait_until,
1090 ?interval,
1091 "slack rate limiter acquired slot"
1092 );
1093
1094 if wait_until > Instant::now() {
1095 sleep_until(wait_until).await;
1096 }
1097 }
1098
1099 async fn on_rate_limited(&self, bucket: SlackRateLimitBucket, retry_after: Option<Duration>) {
1100 let delay = retry_after.unwrap_or(match bucket {
1101 SlackRateLimitBucket::Read => self.read_interval,
1102 SlackRateLimitBucket::Write => self.write_interval,
1103 });
1104 let next_ready = Instant::now() + delay;
1105 let mut state = self.state.lock().await;
1106 let ready_at = match bucket {
1107 SlackRateLimitBucket::Read => &mut state.read_ready_at,
1108 SlackRateLimitBucket::Write => &mut state.write_ready_at,
1109 };
1110 if next_ready > *ready_at {
1111 *ready_at = next_ready;
1112 }
1113 }
1114}
1115
1116async fn map_http_error<T>(response: reqwest::Response) -> Result<T>
1117where
1118 T: DeserializeOwned,
1119{
1120 let status = response.status();
1121 let retry_after = response
1122 .headers()
1123 .get("retry-after")
1124 .and_then(|value| value.to_str().ok())
1125 .and_then(|value| value.parse::<u64>().ok());
1126
1127 if status.as_u16() == 429 {
1128 return Err(Error::RateLimited { retry_after });
1129 }
1130
1131 if !status.is_success() {
1132 let text = response.text().await.unwrap_or_default();
1133 return Err(Error::from_status(status.as_u16(), text));
1134 }
1135
1136 response
1137 .json()
1138 .await
1139 .map_err(|e| Error::InvalidData(e.to_string()))
1140}
1141
1142fn ensure_ok(ok: bool, error: Option<String>) -> Result<()> {
1143 if ok {
1144 Ok(())
1145 } else {
1146 Err(map_slack_error(
1147 error.unwrap_or_else(|| "unknown_slack_error".to_string()),
1148 ))
1149 }
1150}
1151
1152fn map_slack_error(message: String) -> Error {
1153 match message.as_str() {
1154 "invalid_auth" | "not_authed" => Error::Unauthorized(message),
1155 "missing_scope" | "not_allowed_token_type" => Error::Forbidden(message),
1156 "channel_not_found" | "user_not_found" => Error::NotFound(message),
1157 "ratelimited" => Error::RateLimited { retry_after: None },
1158 _ => Error::Api {
1159 status: 200,
1160 message,
1161 },
1162 }
1163}
1164
1165fn normalize_mrkdwn(text: &str) -> String {
1166 let decoded = text
1167 .replace("&", "&")
1168 .replace("<", "<")
1169 .replace(">", ">");
1170
1171 let mut output = String::new();
1172 let mut chars = decoded.chars().peekable();
1173
1174 while let Some(ch) = chars.next() {
1175 if ch == '<' {
1176 let mut token = String::new();
1177 let mut closed = false;
1178 for next in chars.by_ref() {
1179 if next == '>' {
1180 closed = true;
1181 break;
1182 }
1183 token.push(next);
1184 }
1185
1186 if closed {
1187 output.push_str(&normalize_slack_token(&token));
1188 } else {
1189 output.push('<');
1190 output.push_str(&token);
1191 }
1192 } else {
1193 output.push(ch);
1194 }
1195 }
1196
1197 output
1198}
1199
1200fn normalize_slack_token(token: &str) -> String {
1201 if let Some(user) = token.strip_prefix('@') {
1202 let mut parts = user.splitn(2, '|');
1203 let user_id = parts.next().unwrap_or(user);
1204 let label = parts
1205 .next()
1206 .filter(|label| !label.is_empty())
1207 .unwrap_or(user_id);
1208 return format!("@{}", label);
1209 }
1210 if let Some(rest) = token.strip_prefix('#') {
1211 let mut parts = rest.splitn(2, '|');
1212 let _ = parts.next();
1213 let label = parts.next().unwrap_or(rest);
1214 return format!("#{}", label);
1215 }
1216 if let Some(rest) = token.strip_prefix('!') {
1217 return rest.replace('|', " ");
1218 }
1219 if let Some((url, label)) = token.split_once('|') {
1220 return format!("[{}]({})", label, url);
1221 }
1222 token.to_string()
1223}
1224
1225fn slack_user_to_core(user: SlackUser) -> devboy_core::User {
1226 let profile = user.profile.clone();
1227 let display = profile
1228 .as_ref()
1229 .and_then(|p| p.display_name.clone())
1230 .filter(|s| !s.is_empty());
1231 let real = profile
1232 .as_ref()
1233 .and_then(|p| p.real_name.clone())
1234 .filter(|s| !s.is_empty());
1235 let username_candidate = user.name.clone().filter(|s| !s.is_empty());
1236 let name = display
1237 .clone()
1238 .or(real.clone())
1239 .or(username_candidate.clone());
1240 devboy_core::User {
1241 id: user.id,
1242 username: username_candidate.unwrap_or_default(),
1243 name,
1244 email: profile.as_ref().and_then(|p| p.email.clone()),
1245 avatar_url: profile.and_then(|p| p.image_72),
1246 }
1247}
1248
1249#[async_trait]
1250impl devboy_core::UserProvider for SlackClient {
1251 fn provider_name(&self) -> &'static str {
1252 "slack"
1253 }
1254
1255 async fn get_user_profile(&self, user_id: &str) -> Result<devboy_core::User> {
1256 let payload: SlackUsersInfoResponse = self
1257 .post_form("users.info", &[("user", user_id.to_string())])
1258 .await?;
1259 ensure_ok(payload.ok, payload.error)?;
1260 let user = payload
1261 .user
1262 .ok_or_else(|| Error::InvalidData("Slack users.info returned no user".to_string()))?;
1263 Ok(slack_user_to_core(user))
1264 }
1265
1266 async fn lookup_user_by_email(&self, email: &str) -> Result<Option<devboy_core::User>> {
1267 let payload: SlackUsersInfoResponse = self
1268 .post_form("users.lookupByEmail", &[("email", email.to_string())])
1269 .await?;
1270 if !payload.ok {
1271 match payload.error.as_deref() {
1275 Some("users_not_found") => return Ok(None),
1276 _ => {
1277 ensure_ok(payload.ok, payload.error)?;
1278 }
1279 }
1280 }
1281 Ok(payload.user.map(slack_user_to_core))
1282 }
1283}
1284
1285#[cfg(test)]
1286mod tests {
1287 use super::*;
1288 use httpmock::Method::POST;
1289 use httpmock::MockServer;
1290
1291 fn token(s: &str) -> SecretString {
1292 SecretString::from(s.to_string())
1293 }
1294
1295 #[tokio::test]
1296 async fn auth_info_reads_identity_and_scopes() {
1297 let server = MockServer::start();
1298 server.mock(|when, then| {
1299 when.method(POST).path("/auth.test");
1300 then.status(200)
1301 .header(
1302 "x-oauth-scopes",
1303 "channels:read, channels:history, groups:read, groups:history, im:read, im:history, mpim:read, mpim:history, chat:write, users:read",
1304 )
1305 .json_body(serde_json::json!({
1306 "ok": true,
1307 "url": "https://example.slack.com/",
1308 "team": "Example",
1309 "user": "devboy",
1310 "team_id": "T123",
1311 "user_id": "U123",
1312 "bot_id": "B123"
1313 }));
1314 });
1315
1316 let info = SlackClient::new(token("xoxb-test"))
1317 .with_base_url(server.base_url())
1318 .auth_info()
1319 .await
1320 .unwrap();
1321
1322 assert_eq!(info.team_name, "Example");
1323 assert_eq!(info.user_id, "U123");
1324 assert!(info.missing_scopes.is_empty());
1325 }
1326
1327 #[tokio::test]
1328 async fn ensure_healthy_fails_when_scopes_missing() {
1329 let server = MockServer::start();
1330 server.mock(|when, then| {
1331 when.method(POST).path("/auth.test");
1332 then.status(200)
1333 .header("x-oauth-scopes", "channels:read")
1334 .json_body(serde_json::json!({
1335 "ok": true,
1336 "team": "Example",
1337 "team_id": "T123",
1338 "user_id": "U123"
1339 }));
1340 });
1341
1342 let error = SlackClient::new(token("xoxb-test"))
1343 .with_base_url(server.base_url())
1344 .ensure_healthy()
1345 .await
1346 .unwrap_err();
1347
1348 assert!(error.to_string().contains("missing required scopes"));
1349 }
1350
1351 #[tokio::test]
1352 async fn get_chats_maps_slack_conversations() {
1353 let server = MockServer::start();
1354 server.mock(|when, then| {
1355 when.method(POST).path("/conversations.list");
1356 then.status(200).json_body(serde_json::json!({
1357 "ok": true,
1358 "channels": [
1359 {
1360 "id": "C123",
1361 "name": "engineering",
1362 "is_channel": true,
1363 "is_archived": false,
1364 "num_members": 4,
1365 "purpose": { "value": "Team chat" }
1366 }
1367 ],
1368 "response_metadata": { "next_cursor": "chat-cursor-1" }
1369 }));
1370 });
1371
1372 let result = SlackClient::new(token("xoxb-test"))
1373 .with_base_url(server.base_url())
1374 .get_chats(GetChatsParams::default())
1375 .await
1376 .unwrap();
1377
1378 assert_eq!(result.items.len(), 1);
1379 assert_eq!(result.items[0].name, "engineering");
1380 assert_eq!(result.items[0].chat_type, ChatType::Channel);
1381 assert_eq!(
1382 result
1383 .pagination
1384 .as_ref()
1385 .and_then(|pagination| pagination.next_cursor.as_deref()),
1386 Some("chat-cursor-1")
1387 );
1388 }
1389
1390 #[tokio::test]
1391 async fn get_messages_fetches_thread_replies() {
1392 let server = MockServer::start();
1393 server.mock(|when, then| {
1394 when.method(POST).path("/conversations.replies");
1395 then.status(200).json_body(serde_json::json!({
1396 "ok": true,
1397 "messages": [
1398 {
1399 "ts": "1710000000.000100",
1400 "text": "Root",
1401 "user": "U123",
1402 "thread_ts": "1710000000.000100"
1403 },
1404 {
1405 "ts": "1710000001.000100",
1406 "text": "Reply",
1407 "user": "U123",
1408 "thread_ts": "1710000000.000100"
1409 }
1410 ],
1411 "response_metadata": { "next_cursor": "reply-cursor-1" }
1412 }));
1413 });
1414 server.mock(|when, then| {
1415 when.method(POST).path("/users.info");
1416 then.status(200).json_body(serde_json::json!({
1417 "ok": true,
1418 "user": {
1419 "id": "U123",
1420 "name": "andrey",
1421 "profile": {
1422 "display_name": "Andrey",
1423 "real_name": "Andrey Maznyak",
1424 "image_72": "https://example.com/avatar.png"
1425 }
1426 }
1427 }));
1428 });
1429
1430 let result = SlackClient::new(token("xoxb-test"))
1431 .with_base_url(server.base_url())
1432 .get_messages(GetMessagesParams {
1433 chat_id: "C123".to_string(),
1434 limit: Some(20),
1435 cursor: None,
1436 thread_id: Some("1710000000.000100".to_string()),
1437 since: None,
1438 until: None,
1439 })
1440 .await
1441 .unwrap();
1442
1443 assert_eq!(result.items.len(), 2);
1444 assert_eq!(
1445 result.items[1].reply_to_id.as_deref(),
1446 Some("1710000000.000100")
1447 );
1448 assert_eq!(result.items[0].author.name, "Andrey");
1449 assert_eq!(
1450 result
1451 .pagination
1452 .as_ref()
1453 .and_then(|pagination| pagination.next_cursor.as_deref()),
1454 Some("reply-cursor-1")
1455 );
1456 }
1457
1458 #[tokio::test]
1459 async fn send_message_maps_response() {
1460 let server = MockServer::start();
1461 server.mock(|when, then| {
1462 when.method(POST).path("/chat.postMessage");
1463 then.status(200).json_body(serde_json::json!({
1464 "ok": true,
1465 "channel": "C123",
1466 "ts": "1710000100.000200",
1467 "message": {
1468 "ts": "1710000100.000200",
1469 "text": "hello world",
1470 "bot_profile": {
1471 "id": "B123",
1472 "name": "Devboy",
1473 "icons": { "image_72": "https://example.com/bot.png" }
1474 }
1475 }
1476 }));
1477 });
1478
1479 let result = SlackClient::new(token("xoxb-test"))
1480 .with_base_url(server.base_url())
1481 .send_message(SendMessageParams {
1482 chat_id: "C123".to_string(),
1483 text: "hello world".to_string(),
1484 thread_id: None,
1485 reply_to_id: None,
1486 attachments: vec![],
1487 })
1488 .await
1489 .unwrap();
1490
1491 assert_eq!(result.chat_id, "C123");
1492 assert_eq!(result.text, "hello world");
1493 assert_eq!(result.author.name, "Devboy");
1494 }
1495
1496 #[tokio::test]
1497 async fn send_message_uses_reply_to_id_as_thread_ts_when_thread_id_missing() {
1498 let server = MockServer::start();
1499 let post_message = server.mock(|when, then| {
1500 when.method(POST).path("/chat.postMessage");
1501 then.status(200).json_body(serde_json::json!({
1502 "ok": true,
1503 "channel": "C123",
1504 "ts": "1710000100.000200",
1505 "message": {
1506 "ts": "1710000100.000200",
1507 "text": "reply message"
1508 }
1509 }));
1510 });
1511
1512 let result = SlackClient::new(token("xoxb-test"))
1513 .with_base_url(server.base_url())
1514 .send_message(SendMessageParams {
1515 chat_id: "C123".to_string(),
1516 text: "reply message".to_string(),
1517 thread_id: None,
1518 reply_to_id: Some("1710000000.000100".to_string()),
1519 attachments: vec![],
1520 })
1521 .await
1522 .unwrap();
1523
1524 post_message.assert_calls(1);
1525 assert_eq!(result.thread_id.as_deref(), Some("1710000000.000100"));
1526 assert_eq!(result.reply_to_id.as_deref(), Some("1710000000.000100"));
1527 }
1528
1529 #[tokio::test]
1530 async fn send_message_rejects_attachments() {
1531 let err = SlackClient::new(token("xoxb-test"))
1532 .send_message(SendMessageParams {
1533 chat_id: "C123".to_string(),
1534 text: "reply message".to_string(),
1535 thread_id: None,
1536 reply_to_id: None,
1537 attachments: vec![MessageAttachment {
1538 id: Some("att-1".to_string()),
1539 name: Some("report.txt".to_string()),
1540 attachment_type: None,
1541 url: None,
1542 mime_type: None,
1543 file_size: None,
1544 }],
1545 })
1546 .await
1547 .unwrap_err();
1548
1549 assert!(matches!(
1550 err,
1551 Error::ProviderUnsupported { provider, operation }
1552 if provider == "slack" && operation == "send_message attachments"
1553 ));
1554 }
1555
1556 #[tokio::test]
1557 async fn get_messages_caches_resolved_users() {
1558 let server = MockServer::start();
1559 server.mock(|when, then| {
1560 when.method(POST).path("/conversations.history");
1561 then.status(200).json_body(serde_json::json!({
1562 "ok": true,
1563 "messages": [
1564 {
1565 "ts": "1710000000.000100",
1566 "text": "First",
1567 "user": "U123"
1568 },
1569 {
1570 "ts": "1710000001.000100",
1571 "text": "Second",
1572 "user": "U123"
1573 }
1574 ]
1575 }));
1576 });
1577 let users_info = server.mock(|when, then| {
1578 when.method(POST).path("/users.info");
1579 then.status(200).json_body(serde_json::json!({
1580 "ok": true,
1581 "user": {
1582 "id": "U123",
1583 "name": "andrey",
1584 "profile": {
1585 "display_name": "Andrey"
1586 }
1587 }
1588 }));
1589 });
1590
1591 let client = SlackClient::new(token("xoxb-test")).with_base_url(server.base_url());
1592 let result = client
1593 .get_messages(GetMessagesParams {
1594 chat_id: "C123".to_string(),
1595 limit: Some(20),
1596 cursor: None,
1597 thread_id: None,
1598 since: None,
1599 until: None,
1600 })
1601 .await
1602 .unwrap();
1603
1604 assert_eq!(result.items.len(), 2);
1605 assert_eq!(result.items[0].author.name, "Andrey");
1606 assert_eq!(result.items[1].author.name, "Andrey");
1607 users_info.assert_calls(1);
1608 }
1609
1610 #[test]
1611 fn client_configuration_helpers_update_settings() {
1612 let client = SlackClient::new(token("xoxb-test"))
1613 .with_base_url("https://slack.example.test/")
1614 .with_required_scopes(vec!["search:read".to_string(), "chat:write".to_string()]);
1615
1616 assert_eq!(client.base_url, "https://slack.example.test");
1617 assert_eq!(
1618 client.required_scopes(),
1619 ["search:read".to_string(), "chat:write".to_string()]
1620 );
1621 }
1622
1623 #[tokio::test]
1624 async fn auth_info_maps_invalid_auth_to_unauthorized() {
1625 let server = MockServer::start();
1626 server.mock(|when, then| {
1627 when.method(POST).path("/auth.test");
1628 then.status(200).json_body(serde_json::json!({
1629 "ok": false,
1630 "error": "invalid_auth"
1631 }));
1632 });
1633
1634 let error = SlackClient::new(token("xoxb-test"))
1635 .with_base_url(server.base_url())
1636 .auth_info()
1637 .await
1638 .unwrap_err();
1639
1640 assert!(matches!(error, Error::Unauthorized(message) if message == "invalid_auth"));
1641 }
1642
1643 #[tokio::test]
1644 async fn auth_info_maps_missing_scope_to_forbidden() {
1645 let server = MockServer::start();
1646 server.mock(|when, then| {
1647 when.method(POST).path("/auth.test");
1648 then.status(200).json_body(serde_json::json!({
1649 "ok": false,
1650 "error": "missing_scope"
1651 }));
1652 });
1653
1654 let error = SlackClient::new(token("xoxb-test"))
1655 .with_base_url(server.base_url())
1656 .auth_info()
1657 .await
1658 .unwrap_err();
1659
1660 assert!(matches!(error, Error::Forbidden(message) if message == "missing_scope"));
1661 }
1662
1663 #[tokio::test]
1664 async fn auth_info_maps_rate_limit_to_rate_limited_error() {
1665 let server = MockServer::start();
1666 server.mock(|when, then| {
1667 when.method(POST).path("/auth.test");
1668 then.status(429).header("retry-after", "7");
1669 });
1670
1671 let error = SlackClient::new(token("xoxb-test"))
1672 .with_base_url(server.base_url())
1673 .auth_info()
1674 .await
1675 .unwrap_err();
1676
1677 assert!(matches!(
1678 error,
1679 Error::RateLimited {
1680 retry_after: Some(7)
1681 }
1682 ));
1683 }
1684
1685 #[tokio::test]
1686 async fn search_messages_rejects_empty_query() {
1687 let error = SlackClient::new(token("xoxb-test"))
1688 .search_messages(SearchMessagesParams {
1689 query: " ".to_string(),
1690 ..SearchMessagesParams::default()
1691 })
1692 .await
1693 .unwrap_err();
1694
1695 assert!(
1696 matches!(error, Error::InvalidData(message) if message.contains("must not be empty"))
1697 );
1698 }
1699
1700 #[tokio::test]
1701 async fn search_messages_global_cursor_resumes_within_chat_before_next_chat_page() {
1702 let server = MockServer::start();
1703 server.mock(|when, then| {
1704 when.method(POST).path("/conversations.list");
1705 then.status(200).json_body(serde_json::json!({
1706 "ok": true,
1707 "channels": [
1708 { "id": "C1", "name": "general", "is_channel": true, "is_archived": false },
1709 { "id": "C2", "name": "random", "is_channel": true, "is_archived": false }
1710 ],
1711 "response_metadata": { "next_cursor": "chat-page-2" }
1712 }));
1713 });
1714 server.mock(|when, then| {
1715 when.method(POST).path("/conversations.history");
1716 then.status(200).json_body(serde_json::json!({
1717 "ok": true,
1718 "messages": [
1719 { "ts": "1710000000.000100", "text": "no match", "username": "bot" },
1720 { "ts": "1710000001.000100", "text": "Needle on first page", "username": "bot" }
1721 ],
1722 "has_more": true,
1723 "response_metadata": { "next_cursor": "msg-page-2" }
1724 }));
1725 });
1726
1727 let client = SlackClient::new(token("xoxb-test")).with_base_url(server.base_url());
1728 let first = client
1729 .search_messages(SearchMessagesParams {
1730 query: "needle".to_string(),
1731 limit: Some(1),
1732 ..SearchMessagesParams::default()
1733 })
1734 .await
1735 .unwrap();
1736
1737 assert_eq!(first.items.len(), 1);
1738 assert_eq!(first.items[0].chat_id, "C1");
1739 assert!(first.pagination.as_ref().unwrap().has_more);
1740
1741 let cursor = first
1742 .pagination
1743 .as_ref()
1744 .and_then(|pagination| pagination.next_cursor.clone())
1745 .unwrap();
1746 let state = parse_search_cursor(Some(&cursor)).unwrap();
1747 assert_eq!(state.current_chat_id.as_deref(), Some("C1"));
1748 assert_eq!(state.current_message_cursor.as_deref(), Some("msg-page-2"));
1749 assert_eq!(state.current_message_offset, 0);
1750 assert_eq!(state.pending_chat_ids, vec!["C1", "C2"]);
1751 assert_eq!(state.pending_chat_index, 1);
1752 assert_eq!(state.next_chats_cursor.as_deref(), Some("chat-page-2"));
1753 }
1754
1755 #[tokio::test]
1756 async fn search_messages_chat_cursor_resumes_within_page_before_next_page() {
1757 let server = MockServer::start();
1758 server.mock(|when, then| {
1759 when.method(POST).path("/conversations.history");
1760 then.status(200).json_body(serde_json::json!({
1761 "ok": true,
1762 "messages": [
1763 { "ts": "1710000000.000100", "text": "needle first", "username": "bot" },
1764 { "ts": "1710000001.000100", "text": "needle second", "username": "bot" },
1765 { "ts": "1710000002.000100", "text": "no match", "username": "bot" }
1766 ],
1767 "has_more": true,
1768 "response_metadata": { "next_cursor": "msg-page-2" }
1769 }));
1770 });
1771
1772 let client = SlackClient::new(token("xoxb-test")).with_base_url(server.base_url());
1773 let first = client
1774 .search_messages(SearchMessagesParams {
1775 chat_id: Some("C1".to_string()),
1776 query: "needle".to_string(),
1777 limit: Some(1),
1778 ..SearchMessagesParams::default()
1779 })
1780 .await
1781 .unwrap();
1782
1783 assert_eq!(first.items.len(), 1);
1784 assert_eq!(first.items[0].text, "needle first");
1785 assert!(first.pagination.as_ref().unwrap().has_more);
1786
1787 let cursor = first
1788 .pagination
1789 .as_ref()
1790 .and_then(|pagination| pagination.next_cursor.clone())
1791 .unwrap();
1792 let state = parse_search_cursor(Some(&cursor)).unwrap();
1793 assert_eq!(state.current_chat_id.as_deref(), Some("C1"));
1794 assert_eq!(state.current_message_cursor, None);
1795 assert_eq!(state.current_message_offset, 1);
1796
1797 let second = client
1798 .search_messages(SearchMessagesParams {
1799 chat_id: Some("C1".to_string()),
1800 query: "needle".to_string(),
1801 limit: Some(1),
1802 cursor: Some(cursor),
1803 ..SearchMessagesParams::default()
1804 })
1805 .await
1806 .unwrap();
1807
1808 assert_eq!(second.items.len(), 1);
1809 assert_eq!(second.items[0].text, "needle second");
1810 assert!(second.pagination.as_ref().unwrap().has_more);
1811
1812 let second_cursor = second
1813 .pagination
1814 .as_ref()
1815 .and_then(|pagination| pagination.next_cursor.clone())
1816 .unwrap();
1817 let second_state = parse_search_cursor(Some(&second_cursor)).unwrap();
1818 assert_eq!(second_state.current_chat_id.as_deref(), Some("C1"));
1819 assert_eq!(second_state.current_message_cursor, None);
1820 assert_eq!(second_state.current_message_offset, 2);
1821 }
1822
1823 #[test]
1824 fn helper_functions_cover_filtering_and_mapping_cases() {
1825 let archived_group = SlackConversation {
1826 id: "C1".to_string(),
1827 name: Some("Project Alpha".to_string()),
1828 user: None,
1829 is_group: Some(true),
1830 is_im: None,
1831 is_mpim: None,
1832 is_archived: Some(true),
1833 num_members: Some(3),
1834 purpose: Some(SlackTextValue {
1835 value: Some("".to_string()),
1836 }),
1837 topic: Some(SlackTextValue {
1838 value: Some("Topic text".to_string()),
1839 }),
1840 };
1841
1842 assert_eq!(slack_chat_type(&archived_group), ChatType::Group);
1843 assert_eq!(conversation_name(&archived_group), "Project Alpha");
1844 assert!(!matches_chat_filter(
1845 &archived_group,
1846 &GetChatsParams {
1847 search: Some("alpha".to_string()),
1848 chat_type: Some(ChatType::Group),
1849 include_inactive: Some(false),
1850 ..GetChatsParams::default()
1851 }
1852 ));
1853 assert!(matches_chat_filter(
1854 &archived_group,
1855 &GetChatsParams {
1856 search: Some("alpha".to_string()),
1857 chat_type: Some(ChatType::Group),
1858 include_inactive: Some(true),
1859 ..GetChatsParams::default()
1860 }
1861 ));
1862
1863 let direct_chat = SlackConversation {
1864 id: "D1".to_string(),
1865 name: None,
1866 user: Some("U123".to_string()),
1867 is_group: None,
1868 is_im: Some(true),
1869 is_mpim: None,
1870 is_archived: Some(false),
1871 num_members: None,
1872 purpose: None,
1873 topic: None,
1874 };
1875 assert_eq!(slack_chat_type(&direct_chat), ChatType::Direct);
1876 assert_eq!(conversation_name(&direct_chat), "dm-U123");
1877
1878 let mapped = map_chat(archived_group);
1879 assert_eq!(mapped.description.as_deref(), Some("Topic text"));
1880 assert!(!mapped.is_active);
1881 }
1882
1883 #[test]
1884 fn attachment_and_cursor_helpers_cover_fallback_paths() {
1885 let attachments = map_attachments(&SlackMessage {
1886 ts: "1710000000.000100".to_string(),
1887 text: None,
1888 user: None,
1889 username: None,
1890 bot_id: None,
1891 thread_ts: None,
1892 parent_user_id: None,
1893 subtype: None,
1894 edited: None,
1895 files: Some(vec![SlackFile {
1896 id: Some("F1".to_string()),
1897 name: Some("report.pdf".to_string()),
1898 mimetype: Some("application/pdf".to_string()),
1899 filetype: None,
1900 size: Some(1234),
1901 url_private: Some("https://private.example/report.pdf".to_string()),
1902 permalink: None,
1903 }]),
1904 attachments: Some(vec![SlackRichAttachment {
1905 id: Some(42),
1906 title: None,
1907 fallback: Some("Fallback title".to_string()),
1908 service_name: Some("docs".to_string()),
1909 title_link: None,
1910 from_url: Some("https://example.com/doc".to_string()),
1911 }]),
1912 bot_profile: None,
1913 });
1914
1915 assert_eq!(attachments.len(), 2);
1916 assert_eq!(attachments[0].attachment_type.as_deref(), Some("file"));
1917 assert_eq!(
1918 attachments[0].url.as_deref(),
1919 Some("https://private.example/report.pdf")
1920 );
1921 assert_eq!(attachments[1].id.as_deref(), Some("42"));
1922 assert_eq!(attachments[1].name.as_deref(), Some("Fallback title"));
1923 assert_eq!(
1924 attachments[1].url.as_deref(),
1925 Some("https://example.com/doc")
1926 );
1927
1928 assert_eq!(slack_conversation_types(Some(ChatType::Direct)), "im");
1929 assert_eq!(
1930 slack_conversation_types(Some(ChatType::Group)),
1931 "mpim,private_channel"
1932 );
1933 assert_eq!(
1934 slack_conversation_types(Some(ChatType::Channel)),
1935 "public_channel,private_channel"
1936 );
1937 assert_eq!(
1938 slack_conversation_types(None),
1939 "public_channel,private_channel,mpim,im"
1940 );
1941 assert_eq!(
1942 slack_next_cursor(Some(&SlackResponseMetadata {
1943 next_cursor: " cursor-1 ".to_string(),
1944 })),
1945 Some("cursor-1".to_string())
1946 );
1947 assert_eq!(slack_next_cursor(None), None);
1948 }
1949
1950 #[test]
1951 fn ts_scope_and_markdown_helpers_cover_edge_cases() {
1952 let mut headers = HeaderMap::new();
1953 headers.insert(
1954 "x-oauth-scopes",
1955 " channels:read, , chat:write ".parse().unwrap(),
1956 );
1957
1958 assert_eq!(
1959 normalize_ts_param("since", Some(" 1710000000.000100 "))
1960 .unwrap()
1961 .as_deref(),
1962 Some("1710000000.000100")
1963 );
1964 assert!(matches!(
1965 normalize_ts_param("until", Some("not-a-ts")).unwrap_err(),
1966 Error::InvalidData(message) if message == "until must be a Slack timestamp string"
1967 ));
1968 assert_eq!(normalize_ts_param("since", Some(" ")).unwrap(), None);
1969 assert_eq!(
1970 parse_scopes(&headers),
1971 vec!["channels:read".to_string(), "chat:write".to_string()]
1972 );
1973 assert!(parse_scopes(&HeaderMap::new()).is_empty());
1974
1975 assert_eq!(normalize_slack_token("@U123"), "@U123");
1976 assert_eq!(normalize_slack_token("@U123|andrey"), "@andrey");
1977 assert_eq!(normalize_slack_token("#C123|general"), "#general");
1978 assert_eq!(normalize_slack_token("!here"), "here");
1979 assert_eq!(
1980 normalize_slack_token("https://example.com|docs"),
1981 "[docs](https://example.com)"
1982 );
1983 assert_eq!(normalize_slack_token("plain-token"), "plain-token");
1984 assert_eq!(
1985 normalize_mrkdwn("unterminated <https://example.com"),
1986 "unterminated <https://example.com"
1987 );
1988 }
1989
1990 #[test]
1991 fn slack_error_helpers_map_expected_variants() {
1992 assert!(ensure_ok(true, None).is_ok());
1993 assert!(matches!(
1994 ensure_ok(false, Some("missing_scope".to_string())).unwrap_err(),
1995 Error::Forbidden(message) if message == "missing_scope"
1996 ));
1997 assert!(matches!(
1998 map_slack_error("invalid_auth".to_string()),
1999 Error::Unauthorized(message) if message == "invalid_auth"
2000 ));
2001 assert!(matches!(
2002 map_slack_error("not_allowed_token_type".to_string()),
2003 Error::Forbidden(message) if message == "not_allowed_token_type"
2004 ));
2005 assert!(matches!(
2006 map_slack_error("channel_not_found".to_string()),
2007 Error::NotFound(message) if message == "channel_not_found"
2008 ));
2009 assert!(matches!(
2010 map_slack_error("ratelimited".to_string()),
2011 Error::RateLimited { retry_after: None }
2012 ));
2013 assert!(matches!(
2014 map_slack_error("other".to_string()),
2015 Error::Api { status: 200, message } if message == "other"
2016 ));
2017 }
2018
2019 #[test]
2020 fn normalize_slack_markup_to_markdownish_text() {
2021 let text = normalize_mrkdwn(
2022 "See <https://example.com|docs> and talk to <@U123> in <#C123|general>",
2023 );
2024 assert!(text.contains("[docs](https://example.com)"));
2025 assert!(text.contains("@U123"));
2026 assert!(text.contains("#general"));
2027 }
2028
2029 #[test]
2030 fn rate_limit_bucket_matches_write_method() {
2031 assert_eq!(
2032 slack_rate_limit_bucket("chat.postMessage"),
2033 SlackRateLimitBucket::Write
2034 );
2035 assert_eq!(
2036 slack_rate_limit_bucket("conversations.history"),
2037 SlackRateLimitBucket::Read
2038 );
2039 }
2040
2041 #[tokio::test]
2042 async fn rate_limiter_spaces_same_bucket_requests() {
2043 let limiter = SlackRateLimiter::new(Duration::from_millis(25), Duration::from_millis(10));
2044
2045 let start = std::time::Instant::now();
2046 limiter.acquire(SlackRateLimitBucket::Read).await;
2047 limiter.acquire(SlackRateLimitBucket::Read).await;
2048
2049 assert!(start.elapsed() >= Duration::from_millis(20));
2050 }
2051
2052 #[tokio::test]
2053 async fn rate_limiter_keeps_read_and_write_buckets_independent() {
2054 let limiter = SlackRateLimiter::new(Duration::from_millis(50), Duration::from_millis(10));
2055
2056 limiter.acquire(SlackRateLimitBucket::Read).await;
2057
2058 let start = std::time::Instant::now();
2059 limiter.acquire(SlackRateLimitBucket::Write).await;
2060
2061 assert!(start.elapsed() < Duration::from_millis(25));
2062 }
2063
2064 #[test]
2065 fn parse_search_cursor_accepts_legacy_chat_cursor() {
2066 let state = parse_search_cursor(Some("chat-cursor-1")).unwrap();
2067
2068 assert_eq!(state.next_chats_cursor.as_deref(), Some("chat-cursor-1"));
2069 assert!(state.current_chat_id.is_none());
2070 assert!(state.pending_chat_ids.is_empty());
2071 }
2072
2073 #[test]
2074 fn search_cursor_round_trips_with_message_progress() {
2075 let cursor = SlackSearchCursor {
2076 version: 1,
2077 current_chat_id: Some("C123".to_string()),
2078 current_message_cursor: Some("msg-cursor-2".to_string()),
2079 current_message_offset: 37,
2080 pending_chat_ids: vec!["C124".to_string(), "C125".to_string()],
2081 pending_chat_index: 1,
2082 next_chats_cursor: Some("chat-cursor-9".to_string()),
2083 };
2084
2085 let encoded = serialize_search_cursor(&cursor).unwrap().unwrap();
2086 let decoded = parse_search_cursor(Some(&encoded)).unwrap();
2087
2088 assert_eq!(decoded.version, 1);
2089 assert_eq!(decoded.current_chat_id.as_deref(), Some("C123"));
2090 assert_eq!(
2091 decoded.current_message_cursor.as_deref(),
2092 Some("msg-cursor-2")
2093 );
2094 assert_eq!(decoded.current_message_offset, 37);
2095 assert_eq!(decoded.pending_chat_ids, vec!["C124", "C125"]);
2096 assert_eq!(decoded.pending_chat_index, 1);
2097 assert_eq!(decoded.next_chats_cursor.as_deref(), Some("chat-cursor-9"));
2098 }
2099
2100 #[test]
2101 fn take_next_pending_chat_id_advances_without_shifting() {
2102 let mut state = SlackSearchCursor {
2103 pending_chat_ids: vec!["C124".to_string(), "C125".to_string()],
2104 ..SlackSearchCursor::default()
2105 };
2106
2107 assert_eq!(
2108 take_next_pending_chat_id(&mut state).as_deref(),
2109 Some("C124")
2110 );
2111 assert_eq!(state.pending_chat_ids, vec!["C124", "C125"]);
2112 assert_eq!(state.pending_chat_index, 1);
2113 assert!(has_pending_chat_ids(&state));
2114
2115 assert_eq!(
2116 take_next_pending_chat_id(&mut state).as_deref(),
2117 Some("C125")
2118 );
2119 assert!(state.pending_chat_ids.is_empty());
2120 assert_eq!(state.pending_chat_index, 0);
2121 assert!(!has_pending_chat_ids(&state));
2122 }
2123
2124 #[test]
2125 fn serialize_search_cursor_omits_finished_state() {
2126 let encoded = serialize_search_cursor(&SlackSearchCursor::default()).unwrap();
2127
2128 assert!(encoded.is_none());
2129 }
2130
2131 #[tokio::test]
2136 async fn user_provider_get_user_profile_maps_fields() {
2137 use devboy_core::UserProvider;
2138 let server = MockServer::start();
2139 server.mock(|when, then| {
2140 when.method(POST).path("/users.info");
2141 then.status(200).json_body(serde_json::json!({
2142 "ok": true,
2143 "user": {
2144 "id": "U1",
2145 "name": "alice",
2146 "profile": {
2147 "real_name": "Alice Liddell",
2148 "display_name": "alice.l",
2149 "image_72": "https://cdn/alice.png",
2150 "email": "alice@example.com"
2151 }
2152 }
2153 }));
2154 });
2155
2156 let user = SlackClient::new(token("xoxb-test"))
2157 .with_base_url(server.base_url())
2158 .get_user_profile("U1")
2159 .await
2160 .unwrap();
2161
2162 assert_eq!(user.id, "U1");
2163 assert_eq!(user.username, "alice");
2164 assert_eq!(user.name.as_deref(), Some("alice.l"));
2165 assert_eq!(user.email.as_deref(), Some("alice@example.com"));
2166 assert_eq!(user.avatar_url.as_deref(), Some("https://cdn/alice.png"));
2167 }
2168
2169 #[tokio::test]
2170 async fn user_provider_lookup_by_email_users_not_found_returns_none() {
2171 use devboy_core::UserProvider;
2172 let server = MockServer::start();
2173 server.mock(|when, then| {
2174 when.method(POST).path("/users.lookupByEmail");
2175 then.status(200).json_body(serde_json::json!({
2176 "ok": false,
2177 "error": "users_not_found"
2178 }));
2179 });
2180
2181 let result = SlackClient::new(token("xoxb-test"))
2182 .with_base_url(server.base_url())
2183 .lookup_user_by_email("missing@example.com")
2184 .await
2185 .unwrap();
2186
2187 assert!(result.is_none());
2188 }
2189
2190 #[tokio::test]
2191 async fn user_provider_lookup_by_email_hit_returns_user() {
2192 use devboy_core::UserProvider;
2193 let server = MockServer::start();
2194 server.mock(|when, then| {
2195 when.method(POST).path("/users.lookupByEmail");
2196 then.status(200).json_body(serde_json::json!({
2197 "ok": true,
2198 "user": {
2199 "id": "U2",
2200 "name": "bob",
2201 "profile": {
2202 "real_name": "Bob",
2203 "display_name": "",
2204 "image_72": null,
2205 "email": "bob@example.com"
2206 }
2207 }
2208 }));
2209 });
2210
2211 let user = SlackClient::new(token("xoxb-test"))
2212 .with_base_url(server.base_url())
2213 .lookup_user_by_email("bob@example.com")
2214 .await
2215 .unwrap()
2216 .expect("user match");
2217
2218 assert_eq!(user.id, "U2");
2219 assert_eq!(user.email.as_deref(), Some("bob@example.com"));
2220 assert_eq!(user.name.as_deref(), Some("Bob"));
2222 }
2223
2224 #[tokio::test]
2225 async fn user_provider_lookup_by_email_propagates_other_errors() {
2226 use devboy_core::UserProvider;
2227 let server = MockServer::start();
2228 server.mock(|when, then| {
2229 when.method(POST).path("/users.lookupByEmail");
2230 then.status(200).json_body(serde_json::json!({
2231 "ok": false,
2232 "error": "missing_scope"
2233 }));
2234 });
2235
2236 let err = SlackClient::new(token("xoxb-test"))
2237 .with_base_url(server.base_url())
2238 .lookup_user_by_email("alice@example.com")
2239 .await
2240 .expect_err("missing_scope must not be silently swallowed");
2241 let msg = err.to_string();
2242 assert!(msg.contains("missing_scope"), "unexpected: {msg}");
2243 }
2244}