1use super::traits::{Channel, ChannelMessage, SendMessage};
2use anyhow::Context;
3use async_trait::async_trait;
4use base64::Engine as _;
5use chrono::Utc;
6use futures_util::{SinkExt, StreamExt};
7use reqwest::header::HeaderMap;
8use std::collections::{HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Mutex;
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::io::AsyncWriteExt;
13use tokio_tungstenite::tungstenite::Message as WsMessage;
14
/// A display name held in the per-channel user cache, paired with its expiry.
#[derive(Clone)]
struct CachedSlackDisplayName {
    /// Display name stored for the user.
    display_name: String,
    /// Instant after which the entry is treated as stale by the cache lookup.
    expires_at: Instant,
}
20
/// Slack channel adapter state: credentials, scoping, feature toggles and runtime caches.
#[allow(clippy::struct_excessive_bools)]
pub struct SlackChannel {
    /// Token sent as the bearer credential on Slack Web API requests.
    bot_token: String,
    /// Optional app-level token; blank values are treated as absent by `configured_app_token`.
    app_token: Option<String>,
    /// Single configured channel id; empty or `"*"` is treated as "not configured".
    channel_id: Option<String>,
    /// Explicit list of channel ids; takes precedence over `channel_id` in `scoped_channel_ids`.
    channel_ids: Vec<String>,
    /// User ids permitted to interact; an entry of `"*"` matches every user.
    allowed_users: Vec<String>,
    /// When `false`, outbound messages never carry a thread timestamp.
    thread_replies: bool,
    /// Set through `with_group_reply_policy`; its consumers are outside this excerpt.
    mention_only: bool,
    /// Trimmed, sorted, de-duplicated sender ids; `"*"` matches every sender.
    group_reply_allowed_sender_ids: Vec<String>,
    /// User id -> display name cache with per-entry expiry.
    user_display_name_cache: Mutex<HashMap<String, CachedSlackDisplayName>>,
    /// Set through `with_workspace_dir`; its consumers are outside this excerpt.
    workspace_dir: Option<PathBuf>,
    /// Channel id -> thread timestamp, read by `set_assistant_status`.
    active_assistant_thread: Mutex<HashMap<String, String>>,
    /// Set through `with_markdown_blocks`; its consumers are outside this excerpt.
    use_markdown_blocks: bool,
    /// Optional proxy URL handed to the HTTP client builder.
    proxy_url: Option<String>,
    /// Transcription config, stored only when enabled and the manager was created.
    transcription: Option<crate::config::TranscriptionConfig>,
    /// Transcription manager created by `with_transcription`.
    transcription_manager: Option<std::sync::Arc<super::transcription::TranscriptionManager>>,
    /// Set through `with_streaming`; its consumers are outside this excerpt.
    stream_drafts: bool,
    /// Draft update interval in milliseconds (see `with_streaming`).
    draft_update_interval_ms: u64,
    /// Per-key instants of draft edits; producers/consumers are outside this excerpt.
    last_draft_edit: Mutex<HashMap<String, Instant>>,
    /// Lazy draft id -> real Slack message timestamp once the draft has been posted.
    lazy_draft_ts: tokio::sync::Mutex<HashMap<String, String>>,
    /// Set through `with_cancel_reaction`; its consumers are outside this excerpt.
    cancel_reaction: Option<String>,
    /// Registry consulted by `try_intercept_approval` to match and claim pending approvals.
    approval_registry: Option<std::sync::Arc<crate::gateway::approval_registry::ApprovalRegistry>>,
    /// Local gateway port used to build the approval endpoint URL.
    gateway_port: u16,
}
64
// History request retry/backoff tuning (the retry loop itself is outside this excerpt).
const SLACK_HISTORY_MAX_RETRIES: u32 = 3;
const SLACK_HISTORY_DEFAULT_RETRY_AFTER_SECS: u64 = 1;
const SLACK_HISTORY_MAX_BACKOFF_SECS: u64 = 120;
const SLACK_HISTORY_MAX_JITTER_MS: u64 = 500;
// Socket Mode backoff tuning (the connection loop is outside this excerpt).
const SLACK_SOCKET_MODE_INITIAL_BACKOFF_SECS: u64 = 3;
const SLACK_SOCKET_MODE_MAX_BACKOFF_SECS: u64 = 120;
const SLACK_SOCKET_MODE_MAX_JITTER_MS: u64 = 500;
// Display-name cache entries live for six hours.
const SLACK_USER_CACHE_TTL_SECS: u64 = 6 * 60 * 60;
// Attachment size limits: 5 MiB, 512 KiB and 256 KiB respectively.
const SLACK_ATTACHMENT_IMAGE_MAX_BYTES: usize = 5 * 1024 * 1024;
const SLACK_ATTACHMENT_IMAGE_INLINE_FALLBACK_MAX_BYTES: usize = 512 * 1024;
const SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES: usize = 256 * 1024;
const SLACK_ATTACHMENT_TEXT_INLINE_MAX_CHARS: usize = 12_000;
// Upper bound checked before attaching a `markdown` block to a lazy draft post.
const SLACK_MARKDOWN_BLOCK_MAX_CHARS: usize = 12_000;
const SLACK_BLOCK_TEXT_MAX_CHARS: usize = 3_000;
const SLACK_MAX_BLOCKS_PER_MESSAGE: usize = 50;
const SLACK_ATTACHMENT_FILENAME_MAX_CHARS: usize = 128;
// Size at which the display-name cache starts pruning expired entries on insert.
const SLACK_USER_CACHE_MAX_ENTRIES: usize = 1000;
const SLACK_ATTACHMENT_SAVE_SUBDIR: &str = "slack_files";
const SLACK_ATTACHMENT_MAX_FILES_PER_MESSAGE: usize = 8;
// Permalink expansion limits: links per message, thread messages rendered, output size.
const SLACK_PERMALINK_MAX_LINKS_PER_MESSAGE: usize = 3;
const SLACK_PERMALINK_THREAD_MAX_REPLIES: usize = 20;
const SLACK_PERMALINK_TEXT_MAX_CHARS: usize = 8_000;
87
/// A Slack message permalink broken into the parts needed to look the message up.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SlackPermalinkRef {
    /// The URL exactly as it was passed to the parser.
    url: String,
    /// Channel id taken from the second path segment (`/archives/<channel>/...`).
    channel_id: String,
    /// Message timestamp in `<secs>.<micros>` form, derived from the `p<digits>` segment.
    message_ts: String,
    /// Value of the `thread_ts` query parameter, kept only when it is a valid timestamp.
    thread_ts_hint: Option<String>,
}
95
/// Outcome of looking up a permalinked message via `conversations.history`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SlackPermalinkLookup {
    /// The first message returned by the lookup.
    Message(serde_json::Value),
    /// Slack refused access (`not_in_channel` / `missing_scope`); carries a user-facing reason.
    AccessDenied(String),
    /// Request failed, another API error was returned, or no message came back.
    NotFound,
}
102
/// Extracts the Slack timestamp from a message id.
///
/// For ids that start with `slack_` and contain a `.`, returns everything after the
/// last `_` that precedes the first `.` (e.g. `slack_C123_1700000000.000100` yields
/// `1700000000.000100`). When there is no such underscore the whole remainder after
/// `slack_` is returned. Any other input is returned unchanged.
fn extract_slack_ts(message_id: &str) -> &str {
    let Some(rest) = message_id.strip_prefix("slack_") else {
        return message_id;
    };
    let Some(dot_pos) = rest.find('.') else {
        return message_id;
    };
    // Start just past the separating underscore. With no underscore the timestamp
    // begins at offset 0; skipping one byte there would drop its first character
    // (or split a multi-byte character and panic).
    let start = rest[..dot_pos].rfind('_').map_or(0, |idx| idx + 1);
    &rest[start..]
}
119
/// Maps a Unicode emoji to its Slack reaction name.
///
/// Known glyphs are translated through a fixed table; anything else is returned
/// with surrounding `:` characters stripped, so `:rocket:` becomes `rocket`.
fn unicode_emoji_to_slack_name(emoji: &str) -> &str {
    const KNOWN: &[(&str, &str)] = &[
        ("\u{1F440}", "eyes"),
        ("\u{2705}", "white_check_mark"),
        ("\u{26A0}\u{FE0F}", "warning"),
        ("\u{26A0}", "warning"),
        ("\u{274C}", "x"),
        ("\u{1F44D}", "thumbsup"),
        ("\u{1F44E}", "thumbsdown"),
        ("\u{2B50}", "star"),
        ("\u{1F389}", "tada"),
        ("\u{1F914}", "thinking_face"),
        ("\u{1F525}", "fire"),
    ];

    for (glyph, name) in KNOWN {
        if *glyph == emoji {
            return *name;
        }
    }
    emoji.trim_matches(':')
}
// Default value for `draft_update_interval_ms` set by `SlackChannel::new`.
const SLACK_DRAFT_UPDATE_INTERVAL_MS: u64 = 1200;

const SLACK_MESSAGE_MAX_CHARS: usize = 40_000;

// Message ids starting with this prefix are lazy drafts that are resolved through
// the `lazy_draft_ts` map instead of being used as a timestamp directly.
const LAZY_DRAFT_PREFIX: &str = "lazy:";

// Concurrency limit passed to `buffer_unordered` when resolving permalinks.
const SLACK_ATTACHMENT_RENDER_CONCURRENCY: usize = 3;
const SLACK_POLL_ACTIVE_THREAD_MAX: usize = 50;
// 24 hours, in seconds.
const SLACK_POLL_THREAD_EXPIRE_SECS: u64 = 24 * 60 * 60;
const SLACK_MEDIA_REDIRECT_MAX_HOPS: usize = 5;
const SLACK_ALLOWED_MEDIA_HOST_SUFFIXES: &[&str] =
    &["slack.com", "slack-edge.com", "slack-files.com"];
const SLACK_SUPPORTED_IMAGE_MIME_TYPES: &[&str] = &[
    "image/png",
    "image/jpeg",
    "image/webp",
    "image/gif",
    "image/bmp",
];
162
163impl SlackChannel {
    /// Builds a channel from credentials and channel/user scoping, leaving every
    /// optional feature at its default.
    ///
    /// Defaults set here: thread replies on, mention-only off, empty group-reply
    /// sender list, no workspace dir, markdown blocks off, no proxy, no transcription,
    /// draft streaming off with the `SLACK_DRAFT_UPDATE_INTERVAL_MS` interval, no
    /// cancel reaction, no approval registry, and gateway port 42617.
    pub fn new(
        bot_token: String,
        app_token: Option<String>,
        channel_id: Option<String>,
        channel_ids: Vec<String>,
        allowed_users: Vec<String>,
    ) -> Self {
        Self {
            bot_token,
            app_token,
            channel_id,
            channel_ids,
            allowed_users,
            thread_replies: true,
            mention_only: false,
            group_reply_allowed_sender_ids: Vec::new(),
            user_display_name_cache: Mutex::new(HashMap::new()),
            workspace_dir: None,
            active_assistant_thread: Mutex::new(HashMap::new()),
            use_markdown_blocks: false,
            proxy_url: None,
            transcription: None,
            transcription_manager: None,
            stream_drafts: false,
            draft_update_interval_ms: SLACK_DRAFT_UPDATE_INTERVAL_MS,
            last_draft_edit: Mutex::new(HashMap::new()),
            lazy_draft_ts: tokio::sync::Mutex::new(HashMap::new()),
            cancel_reaction: None,
            approval_registry: None,
            gateway_port: 42617,
        }
    }
196
    /// Attaches the approval registry and the local gateway port used when an
    /// approval reply is intercepted.
    pub fn with_approval_registry(
        mut self,
        registry: std::sync::Arc<crate::gateway::approval_registry::ApprovalRegistry>,
        gateway_port: u16,
    ) -> Self {
        self.approval_registry = Some(registry);
        self.gateway_port = gateway_port;
        self
    }
209
210 fn try_intercept_approval(
216 &self,
217 channel_id: &str,
218 thread_ts: Option<&str>,
219 clean_text: &str,
220 author_display: &str,
221 ) -> bool {
222 let Some(ref registry) = self.approval_registry else {
223 return false;
224 };
225 let Some(thread_ts) = thread_ts else {
228 return false;
229 };
230 let Some((run_id, is_approve, feedback)) =
231 registry.match_slack_keyword(channel_id, Some(thread_ts), clean_text)
232 else {
233 return false;
234 };
235 let Some(pending) = registry.try_claim(&run_id) else {
236 return false;
237 };
238
239 let port = self.gateway_port;
240 let token = self.bot_token.clone();
241 let channel = channel_id.to_string();
242 let thread_ts = thread_ts.to_string();
243 let workflow_name = pending.workflow_name.clone();
244 let author = author_display.to_string();
245
246 tokio::spawn(async move {
247 let url = format!("http://127.0.0.1:{port}/api/workflows/runs/{run_id}/approve");
249 let client = reqwest::Client::new();
250 match client
251 .post(&url)
252 .json(&serde_json::json!({
253 "approved": is_approve,
254 "feedback": feedback,
255 }))
256 .send()
257 .await
258 {
259 Ok(resp) if resp.status().is_success() => {
260 tracing::info!(
261 run_id = %run_id,
262 approved = %is_approve,
263 "Slack: workflow approval processed"
264 );
265 }
266 Ok(resp) => {
267 tracing::warn!(
268 run_id = %run_id,
269 status = %resp.status(),
270 "Slack: workflow approval endpoint returned error"
271 );
272 }
273 Err(e) => {
274 tracing::warn!(
275 run_id = %run_id,
276 error = %e,
277 "Slack: failed to call workflow approval endpoint"
278 );
279 }
280 }
281
282 let confirm = if is_approve {
283 format!("✅ Workflow `{workflow_name}` approved by {author}")
284 } else if feedback.is_empty() {
285 format!("❌ Workflow `{workflow_name}` rejected by {author}")
286 } else {
287 format!("❌ Workflow `{workflow_name}` rejected by {author}. Feedback: {feedback}")
288 };
289
290 let body = serde_json::json!({
291 "channel": channel,
292 "text": confirm,
293 "thread_ts": thread_ts,
294 });
295 let _ = client
296 .post("https://slack.com/api/chat.postMessage")
297 .bearer_auth(&token)
298 .json(&body)
299 .send()
300 .await;
301 });
302
303 true
304 }
305
    /// Sets the mention-only flag and the group-reply sender list.
    ///
    /// The sender ids are normalized (trimmed, blanks dropped, sorted, de-duplicated)
    /// before being stored.
    pub fn with_group_reply_policy(
        mut self,
        mention_only: bool,
        allowed_sender_ids: Vec<String>,
    ) -> Self {
        self.mention_only = mention_only;
        self.group_reply_allowed_sender_ids =
            Self::normalize_group_reply_allowed_sender_ids(allowed_sender_ids);
        self
    }
317
    /// Enables or disables threading of outbound messages (see `outbound_thread_ts`).
    pub fn with_thread_replies(mut self, thread_replies: bool) -> Self {
        self.thread_replies = thread_replies;
        self
    }
323
    /// Stores the workspace directory on the channel.
    pub fn with_workspace_dir(mut self, dir: PathBuf) -> Self {
        self.workspace_dir = Some(dir);
        self
    }
329
    /// Sets the `use_markdown_blocks` flag.
    pub fn with_markdown_blocks(mut self, enabled: bool) -> Self {
        self.use_markdown_blocks = enabled;
        self
    }
337
    /// Sets (or clears, with `None`) the proxy URL passed to the HTTP client builder.
    pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
        self.proxy_url = proxy_url;
        self
    }
342
343 pub fn with_transcription(mut self, config: crate::config::TranscriptionConfig) -> Self {
345 if !config.enabled {
346 return self;
347 }
348 match super::transcription::TranscriptionManager::new(&config) {
349 Ok(m) => {
350 self.transcription_manager = Some(std::sync::Arc::new(m));
351 self.transcription = Some(config);
352 }
353 Err(e) => {
354 tracing::warn!(
355 "transcription manager init failed, voice transcription disabled: {e}"
356 );
357 }
358 }
359 self
360 }
361
    /// Sets the draft-streaming flag and, when `interval_ms` is non-zero, the draft
    /// update interval. An `interval_ms` of 0 keeps the current interval.
    pub fn with_streaming(mut self, enabled: bool, interval_ms: u64) -> Self {
        self.stream_drafts = enabled;
        if interval_ms > 0 {
            self.draft_update_interval_ms = interval_ms;
        }
        self
    }
370
    /// Sets (or clears, with `None`) the cancel reaction.
    pub fn with_cancel_reaction(mut self, reaction: Option<String>) -> Self {
        self.cancel_reaction = reaction;
        self
    }
376
377 async fn delete_message(&self, channel_id: &str, ts: &str) -> anyhow::Result<()> {
379 let body = serde_json::json!({
380 "channel": channel_id,
381 "ts": ts,
382 });
383
384 let resp = self
385 .http_client()
386 .post("https://slack.com/api/chat.delete")
387 .bearer_auth(&self.bot_token)
388 .json(&body)
389 .send()
390 .await?;
391
392 let resp_body: serde_json::Value = resp.json().await?;
393 if resp_body.get("ok") != Some(&serde_json::Value::Bool(true)) {
394 let err = resp_body
395 .get("error")
396 .and_then(|e| e.as_str())
397 .unwrap_or("unknown");
398 tracing::debug!("Slack chat.delete failed: {err}");
399 }
400
401 Ok(())
402 }
403
404 async fn resolve_draft_ts(&self, message_id: &str) -> Option<String> {
409 if !message_id.starts_with(LAZY_DRAFT_PREFIX) {
410 return Some(message_id.to_string());
411 }
412 self.lazy_draft_ts.lock().await.get(message_id).cloned()
413 }
414
415 async fn materialize_lazy_draft(
418 &self,
419 lazy_id: &str,
420 text: &str,
421 ) -> anyhow::Result<Option<String>> {
422 let rest = lazy_id.strip_prefix(LAZY_DRAFT_PREFIX).unwrap_or(lazy_id);
424 let (channel_id, thread_ts) = match rest.find(':') {
425 Some(pos) => {
426 let ts = &rest[pos + 1..];
427 (&rest[..pos], if ts.is_empty() { None } else { Some(ts) })
428 }
429 None => (rest, None),
430 };
431
432 let mut body = serde_json::json!({
433 "channel": channel_id,
434 "text": text,
435 });
436 if text.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
437 body["blocks"] = serde_json::json!([{
438 "type": "markdown",
439 "text": text
440 }]);
441 }
442 if let Some(ts) = thread_ts {
443 body["thread_ts"] = serde_json::json!(ts);
444 }
445
446 let resp = self
447 .http_client()
448 .post("https://slack.com/api/chat.postMessage")
449 .bearer_auth(&self.bot_token)
450 .json(&body)
451 .send()
452 .await?;
453
454 let resp_body: serde_json::Value = resp.json().await?;
455 if resp_body.get("ok") != Some(&serde_json::Value::Bool(true)) {
456 let err = resp_body
457 .get("error")
458 .and_then(|e| e.as_str())
459 .unwrap_or("unknown");
460 anyhow::bail!("Slack chat.postMessage (lazy draft) failed: {err}");
461 }
462
463 let ts = resp_body
464 .get("ts")
465 .and_then(|v| v.as_str())
466 .map(ToString::to_string);
467
468 if let Some(ref real_ts) = ts {
469 self.lazy_draft_ts
470 .lock()
471 .await
472 .insert(lazy_id.to_string(), real_ts.clone());
473 }
474
475 Ok(ts)
476 }
477
478 async fn set_assistant_status(&self, channel_id: &str, status: &str) {
480 let thread_ts = {
481 let map = match self.active_assistant_thread.lock() {
482 Ok(m) => m,
483 Err(_) => return,
484 };
485 match map.get(channel_id) {
486 Some(ts) => ts.clone(),
487 None => return,
488 }
489 };
490
491 let body = serde_json::json!({
492 "channel_id": channel_id,
493 "thread_ts": thread_ts,
494 "status": status,
495 });
496
497 let _ = self
498 .http_client()
499 .post("https://slack.com/api/assistant.threads.setStatus")
500 .bearer_auth(&self.bot_token)
501 .json(&body)
502 .send()
503 .await;
504 }
505
    /// Builds the HTTP client used for Slack API calls via the shared channel proxy
    /// client builder, passing this channel's optional proxy URL.
    // NOTE(review): the trailing `30` and `10` are presumably request and connect
    // timeouts in seconds; confirm against `build_channel_proxy_client_with_timeouts`.
    fn http_client(&self) -> reqwest::Client {
        crate::config::build_channel_proxy_client_with_timeouts(
            "channel.slack",
            self.proxy_url.as_deref(),
            30,
            10,
        )
    }
514
515 pub async fn post_message(&self, channel: &str, text: &str) -> anyhow::Result<String> {
521 let body = serde_json::json!({
522 "channel": channel,
523 "text": text,
524 });
525
526 let resp = self
527 .http_client()
528 .post("https://slack.com/api/chat.postMessage")
529 .bearer_auth(&self.bot_token)
530 .json(&body)
531 .send()
532 .await?;
533
534 let status = resp.status();
535 let raw = resp
536 .text()
537 .await
538 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
539
540 if !status.is_success() {
541 let sanitized = crate::providers::sanitize_api_error(&raw);
542 anyhow::bail!("Slack chat.postMessage failed ({status}): {sanitized}");
543 }
544
545 let parsed: serde_json::Value = serde_json::from_str(&raw).unwrap_or_default();
546 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
547 let err = parsed
548 .get("error")
549 .and_then(|e| e.as_str())
550 .unwrap_or("unknown");
551 anyhow::bail!("Slack chat.postMessage failed: {err}");
552 }
553
554 parsed
555 .get("ts")
556 .and_then(|v| v.as_str())
557 .map(String::from)
558 .ok_or_else(|| anyhow::anyhow!("Slack chat.postMessage response missing 'ts'"))
559 }
560
561 pub async fn update_message(&self, channel: &str, ts: &str, text: &str) -> anyhow::Result<()> {
566 let body = serde_json::json!({
567 "channel": channel,
568 "ts": ts,
569 "text": text,
570 });
571
572 let resp = self
573 .http_client()
574 .post("https://slack.com/api/chat.update")
575 .bearer_auth(&self.bot_token)
576 .json(&body)
577 .send()
578 .await?;
579
580 let status = resp.status();
581 let raw = resp
582 .text()
583 .await
584 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
585
586 if !status.is_success() {
587 let sanitized = crate::providers::sanitize_api_error(&raw);
588 anyhow::bail!("Slack chat.update failed ({status}): {sanitized}");
589 }
590
591 let parsed: serde_json::Value = serde_json::from_str(&raw).unwrap_or_default();
592 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
593 let err = parsed
594 .get("error")
595 .and_then(|e| e.as_str())
596 .unwrap_or("unknown");
597 anyhow::bail!("Slack chat.update failed: {err}");
598 }
599
600 Ok(())
601 }
602
603 fn is_user_allowed(&self, user_id: &str) -> bool {
607 self.allowed_users.iter().any(|u| u == "*" || u == user_id)
608 }
609
610 fn is_group_sender_trigger_enabled(&self, user_id: &str) -> bool {
611 let user_id = user_id.trim();
612 if user_id.is_empty() {
613 return false;
614 }
615
616 self.group_reply_allowed_sender_ids
617 .iter()
618 .any(|entry| entry == "*" || entry == user_id)
619 }
620
621 fn outbound_thread_ts<'a>(&self, message: &'a SendMessage) -> Option<&'a str> {
622 if self.thread_replies {
623 message.thread_ts.as_deref()
624 } else {
625 None
626 }
627 }
628
629 async fn get_bot_user_id(&self) -> Option<String> {
631 let resp: serde_json::Value = self
632 .http_client()
633 .get("https://slack.com/api/auth.test")
634 .bearer_auth(&self.bot_token)
635 .send()
636 .await
637 .ok()?
638 .json()
639 .await
640 .ok()?;
641
642 resp.get("user_id")
643 .and_then(|u| u.as_str())
644 .map(String::from)
645 }
646
647 fn inbound_thread_ts(msg: &serde_json::Value, ts: &str) -> Option<String> {
650 msg.get("thread_ts")
651 .and_then(|t| t.as_str())
652 .or(if ts.is_empty() { None } else { Some(ts) })
653 .map(str::to_string)
654 }
655
656 fn inbound_thread_ts_genuine_only(msg: &serde_json::Value) -> Option<String> {
662 msg.get("thread_ts")
663 .and_then(|t| t.as_str())
664 .map(str::to_string)
665 }
666
667 fn inbound_interruption_scope_id(msg: &serde_json::Value, ts: &str) -> Option<String> {
678 msg.get("thread_ts")
679 .and_then(|t| t.as_str())
680 .filter(|&t| t != ts)
681 .map(str::to_string)
682 }
683
684 fn normalized_channel_id(input: Option<&str>) -> Option<String> {
685 input
686 .map(str::trim)
687 .filter(|v| !v.is_empty() && *v != "*")
688 .map(ToOwned::to_owned)
689 }
690
    /// The single configured channel id after normalization (trimmed; blank or
    /// `"*"` yields `None`).
    fn configured_channel_id(&self) -> Option<String> {
        Self::normalized_channel_id(self.channel_id.as_deref())
    }
694
695 fn scoped_channel_ids(&self) -> Option<Vec<String>> {
698 let mut seen = HashSet::new();
699 let ids: Vec<String> = self
700 .channel_ids
701 .iter()
702 .filter_map(|entry| Self::normalized_channel_id(Some(entry)))
703 .filter(|id| seen.insert(id.clone()))
704 .collect();
705 if !ids.is_empty() {
706 return Some(ids);
707 }
708 self.configured_channel_id().map(|id| vec![id])
709 }
710
711 fn configured_app_token(&self) -> Option<String> {
712 self.app_token
713 .as_deref()
714 .map(str::trim)
715 .filter(|value| !value.is_empty())
716 .map(ToOwned::to_owned)
717 }
718
719 fn normalize_group_reply_allowed_sender_ids(sender_ids: Vec<String>) -> Vec<String> {
720 let mut normalized = sender_ids
721 .into_iter()
722 .map(|entry| entry.trim().to_string())
723 .filter(|entry| !entry.is_empty())
724 .collect::<Vec<_>>();
725 normalized.sort();
726 normalized.dedup();
727 normalized
728 }
729
    /// Lifetime of a display-name cache entry (`SLACK_USER_CACHE_TTL_SECS` seconds).
    fn user_cache_ttl() -> Duration {
        Duration::from_secs(SLACK_USER_CACHE_TTL_SECS)
    }
733
734 fn sanitize_display_name(name: &str) -> Option<String> {
735 let trimmed = name.trim();
736 if trimmed.is_empty() {
737 None
738 } else {
739 Some(trimmed.to_string())
740 }
741 }
742
743 fn extract_user_display_name(payload: &serde_json::Value) -> Option<String> {
744 let user = payload.get("user")?;
745 let profile = user.get("profile");
746
747 let candidates = [
748 profile
749 .and_then(|p| p.get("display_name"))
750 .and_then(|v| v.as_str()),
751 profile
752 .and_then(|p| p.get("display_name_normalized"))
753 .and_then(|v| v.as_str()),
754 profile
755 .and_then(|p| p.get("real_name_normalized"))
756 .and_then(|v| v.as_str()),
757 profile
758 .and_then(|p| p.get("real_name"))
759 .and_then(|v| v.as_str()),
760 user.get("real_name").and_then(|v| v.as_str()),
761 user.get("name").and_then(|v| v.as_str()),
762 ];
763
764 for candidate in candidates.into_iter().flatten() {
765 if let Some(display_name) = Self::sanitize_display_name(candidate) {
766 return Some(display_name);
767 }
768 }
769
770 None
771 }
772
773 fn cached_sender_display_name(&self, user_id: &str) -> Option<String> {
774 let now = Instant::now();
775 let Ok(mut cache) = self.user_display_name_cache.lock() else {
776 return None;
777 };
778
779 if let Some(entry) = cache.get(user_id) {
780 if now <= entry.expires_at {
781 return Some(entry.display_name.clone());
782 }
783 }
784
785 cache.remove(user_id);
786 None
787 }
788
789 fn cache_sender_display_name(&self, user_id: &str, display_name: &str) {
790 let Ok(mut cache) = self.user_display_name_cache.lock() else {
791 return;
792 };
793 if cache.len() >= SLACK_USER_CACHE_MAX_ENTRIES {
794 let now = Instant::now();
795 cache.retain(|_, v| v.expires_at > now);
796 }
797 cache.insert(
798 user_id.to_string(),
799 CachedSlackDisplayName {
800 display_name: display_name.to_string(),
801 expires_at: Instant::now() + Self::user_cache_ttl(),
802 },
803 );
804 }
805
806 async fn fetch_sender_display_name(&self, user_id: &str) -> Option<String> {
807 let resp = match self
808 .http_client()
809 .get("https://slack.com/api/users.info")
810 .bearer_auth(&self.bot_token)
811 .query(&[("user", user_id)])
812 .send()
813 .await
814 {
815 Ok(response) => response,
816 Err(err) => {
817 tracing::warn!("Slack users.info request failed for {user_id}: {err}");
818 return None;
819 }
820 };
821
822 let status = resp.status();
823 let body = resp
824 .text()
825 .await
826 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
827
828 if !status.is_success() {
829 let sanitized = crate::providers::sanitize_api_error(&body);
830 tracing::warn!("Slack users.info failed for {user_id} ({status}): {sanitized}");
831 return None;
832 }
833
834 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
835 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
836 let err = payload
837 .get("error")
838 .and_then(|e| e.as_str())
839 .unwrap_or("unknown");
840 tracing::warn!("Slack users.info returned error for {user_id}: {err}");
841 return None;
842 }
843
844 Self::extract_user_display_name(&payload)
845 }
846
847 async fn resolve_sender_identity(&self, user_id: &str) -> String {
848 let user_id = user_id.trim();
849 if user_id.is_empty() {
850 return String::new();
851 }
852
853 if let Some(display_name) = self.cached_sender_display_name(user_id) {
854 return display_name;
855 }
856
857 if let Some(display_name) = self.fetch_sender_display_name(user_id).await {
858 self.cache_sender_display_name(user_id, &display_name);
859 return display_name;
860 }
861
862 user_id.to_string()
863 }
864
865 fn is_group_channel_id(channel_id: &str) -> bool {
866 matches!(channel_id.chars().next(), Some('C' | 'G'))
867 }
868
869 fn contains_bot_mention(text: &str, bot_user_id: &str) -> bool {
870 if bot_user_id.is_empty() {
871 return false;
872 }
873 text.contains(&format!("<@{bot_user_id}>"))
874 }
875
876 fn strip_bot_mentions(text: &str, bot_user_id: &str) -> String {
877 if bot_user_id.is_empty() {
878 return text.trim().to_string();
879 }
880 text.replace(&format!("<@{bot_user_id}>"), " ")
881 .trim()
882 .to_string()
883 }
884
885 fn normalize_incoming_text(
886 text: &str,
887 require_mention: bool,
888 bot_user_id: &str,
889 ) -> Option<String> {
890 if require_mention && !Self::contains_bot_mention(text, bot_user_id) {
891 return None;
892 }
893
894 Some(Self::strip_bot_mentions(text, bot_user_id))
897 }
898
899 fn normalize_incoming_content(
900 text: &str,
901 require_mention: bool,
902 bot_user_id: &str,
903 ) -> Option<String> {
904 let normalized = Self::normalize_incoming_text(text, require_mention, bot_user_id)?;
905 if normalized.is_empty() {
906 return None;
907 }
908 Some(normalized)
909 }
910
911 fn is_supported_message_subtype(subtype: Option<&str>) -> bool {
912 matches!(subtype, None | Some("file_share" | "thread_broadcast"))
913 }
914
915 fn compose_incoming_content(text: String, attachment_blocks: Vec<String>) -> Option<String> {
916 let mut sections = Vec::new();
917 if !text.trim().is_empty() {
918 sections.push(text.trim().to_string());
919 }
920 for block in attachment_blocks {
921 if !block.trim().is_empty() {
922 sections.push(block);
923 }
924 }
925
926 if sections.is_empty() {
927 None
928 } else {
929 Some(sections.join("\n\n"))
930 }
931 }
932
933 async fn build_incoming_content(
934 &self,
935 message: &serde_json::Value,
936 require_mention: bool,
937 bot_user_id: &str,
938 ) -> Option<String> {
939 let text = message
940 .get("text")
941 .and_then(|value| value.as_str())
942 .unwrap_or_default();
943 let normalized_text = Self::normalize_incoming_text(text, require_mention, bot_user_id)?;
944 let attachment_blocks = self.render_file_attachments(message).await;
945 let permalink_blocks = self.resolve_permalink_blocks(&normalized_text).await;
946 let mut blocks = attachment_blocks;
947 blocks.extend(permalink_blocks);
948 Self::compose_incoming_content(normalized_text, blocks)
949 }
950
951 async fn resolve_permalink_blocks(&self, text: &str) -> Vec<String> {
952 let permalinks = Self::extract_slack_permalinks(text);
953 if permalinks.is_empty() {
954 return Vec::new();
955 }
956 let tasks = permalinks
957 .into_iter()
958 .map(|permalink| async move { self.resolve_slack_permalink(&permalink).await });
959
960 futures_util::stream::iter(tasks)
961 .buffer_unordered(SLACK_ATTACHMENT_RENDER_CONCURRENCY)
962 .filter_map(|block| async move { block })
963 .collect()
964 .await
965 }
966
967 fn extract_slack_permalinks(text: &str) -> Vec<SlackPermalinkRef> {
968 let mut permalinks = Vec::new();
969 let mut seen = HashSet::new();
970
971 for token in text.split_whitespace() {
972 if permalinks.len() >= SLACK_PERMALINK_MAX_LINKS_PER_MESSAGE {
973 break;
974 }
975
976 let Some(url) = Self::extract_url_token(token) else {
977 continue;
978 };
979 let Some(permalink) = Self::parse_slack_permalink(&url) else {
980 continue;
981 };
982 if seen.insert((permalink.channel_id.clone(), permalink.message_ts.clone())) {
983 permalinks.push(permalink);
984 }
985 }
986
987 permalinks
988 }
989
990 fn extract_url_token(token: &str) -> Option<String> {
991 let trimmed = token.trim();
992 if trimmed.is_empty() {
993 return None;
994 }
995
996 let candidate = if trimmed.starts_with('<') && trimmed.ends_with('>') {
997 trimmed
998 .trim_start_matches('<')
999 .trim_end_matches('>')
1000 .split('|')
1001 .next()
1002 .unwrap_or_default()
1003 .trim()
1004 } else {
1005 trimmed.trim_matches(|ch: char| {
1006 matches!(
1007 ch,
1008 '(' | ')' | '[' | ']' | '{' | '}' | '"' | '\'' | ',' | ';'
1009 )
1010 })
1011 };
1012
1013 if candidate.starts_with("https://") || candidate.starts_with("http://") {
1014 Some(candidate.to_string())
1015 } else {
1016 None
1017 }
1018 }
1019
1020 fn parse_slack_permalink(raw_url: &str) -> Option<SlackPermalinkRef> {
1021 let url = reqwest::Url::parse(raw_url).ok()?;
1022 let host = url.host_str()?.trim_end_matches('.').to_ascii_lowercase();
1023 if host != "slack.com" && !host.ends_with(".slack.com") {
1024 return None;
1025 }
1026
1027 let mut segments = url.path_segments()?;
1028 let first = segments.next()?;
1029 let second = segments.next()?;
1030 let third = segments.next()?;
1031 if first != "archives" || segments.next().is_some() {
1032 return None;
1033 }
1034
1035 let channel_id = second.trim();
1036 if channel_id.is_empty() {
1037 return None;
1038 }
1039
1040 let message_ts = Self::parse_slack_permalink_ts(third)?;
1041 let thread_ts_hint = url
1042 .query_pairs()
1043 .find(|(key, _)| key == "thread_ts")
1044 .map(|(_, value)| value.trim().to_string())
1045 .filter(|value| Self::is_valid_slack_ts(value));
1046
1047 Some(SlackPermalinkRef {
1048 url: raw_url.to_string(),
1049 channel_id: channel_id.to_string(),
1050 message_ts,
1051 thread_ts_hint,
1052 })
1053 }
1054
1055 fn parse_slack_permalink_ts(segment: &str) -> Option<String> {
1056 let digits = segment.strip_prefix('p')?.trim();
1057 if digits.len() <= 6 || !digits.chars().all(|ch| ch.is_ascii_digit()) {
1058 return None;
1059 }
1060
1061 let (secs, micros) = digits.split_at(digits.len() - 6);
1062 Some(format!("{secs}.{micros}"))
1063 }
1064
1065 fn is_valid_slack_ts(ts: &str) -> bool {
1066 let Some((secs, micros)) = ts.split_once('.') else {
1067 return false;
1068 };
1069 !secs.is_empty()
1070 && micros.len() == 6
1071 && secs.chars().all(|ch| ch.is_ascii_digit())
1072 && micros.chars().all(|ch| ch.is_ascii_digit())
1073 }
1074
1075 async fn resolve_slack_permalink(&self, permalink: &SlackPermalinkRef) -> Option<String> {
1076 let message_lookup = self
1077 .fetch_permalink_message(&permalink.channel_id, &permalink.message_ts)
1078 .await;
1079 let message = match message_lookup {
1080 SlackPermalinkLookup::Message(message) => message,
1081 SlackPermalinkLookup::AccessDenied(reason) => {
1082 return Some(Self::format_permalink_access_denied(permalink, &reason));
1083 }
1084 SlackPermalinkLookup::NotFound => {
1085 let thread_ts = permalink.thread_ts_hint.as_deref()?;
1086 let replies = self
1087 .fetch_thread_messages_with_retry(&permalink.channel_id, thread_ts)
1088 .await?;
1089 let target = replies.into_iter().find(|reply| {
1090 reply.get("ts").and_then(|value| value.as_str())
1091 == Some(permalink.message_ts.as_str())
1092 });
1093 let target = target?;
1094 return self
1095 .format_permalink_context(permalink, target, Some(thread_ts))
1096 .await;
1097 }
1098 };
1099
1100 let thread_ts = message
1101 .get("thread_ts")
1102 .and_then(|value| value.as_str())
1103 .filter(|thread_ts| Self::is_valid_slack_ts(thread_ts))
1104 .map(str::to_string);
1105
1106 self.format_permalink_context(permalink, message, thread_ts.as_deref())
1107 .await
1108 }
1109
1110 async fn fetch_permalink_message(
1111 &self,
1112 channel_id: &str,
1113 message_ts: &str,
1114 ) -> SlackPermalinkLookup {
1115 let resp = match self
1116 .http_client()
1117 .get("https://slack.com/api/conversations.history")
1118 .bearer_auth(&self.bot_token)
1119 .query(&[
1120 ("channel", channel_id),
1121 ("oldest", message_ts),
1122 ("latest", message_ts),
1123 ("inclusive", "true"),
1124 ("limit", "1"),
1125 ])
1126 .send()
1127 .await
1128 {
1129 Ok(response) => response,
1130 Err(err) => {
1131 tracing::warn!(
1132 "Slack permalink resolver: conversations.history request failed for channel={} ts={}: {}",
1133 channel_id,
1134 message_ts,
1135 err
1136 );
1137 return SlackPermalinkLookup::NotFound;
1138 }
1139 };
1140
1141 let status = resp.status();
1142 let body = resp
1143 .text()
1144 .await
1145 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
1146 if !status.is_success() {
1147 let sanitized = crate::providers::sanitize_api_error(&body);
1148 tracing::warn!(
1149 "Slack permalink resolver: conversations.history failed for channel={} ts={} ({}): {}",
1150 channel_id,
1151 message_ts,
1152 status,
1153 sanitized
1154 );
1155 return SlackPermalinkLookup::NotFound;
1156 }
1157
1158 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
1159 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
1160 let err = payload
1161 .get("error")
1162 .and_then(|value| value.as_str())
1163 .unwrap_or("unknown");
1164 return match err {
1165 "not_in_channel" => SlackPermalinkLookup::AccessDenied(
1166 "The Slack bot is not in that channel. Invite the app to the channel and try again."
1167 .to_string(),
1168 ),
1169 "missing_scope" => SlackPermalinkLookup::AccessDenied(
1170 "The Slack app is missing the scope needed to read that channel."
1171 .to_string(),
1172 ),
1173 _ => {
1174 tracing::warn!(
1175 "Slack permalink resolver: conversations.history returned error for channel={} ts={}: {}",
1176 channel_id, message_ts, err
1177 );
1178 SlackPermalinkLookup::NotFound
1179 }
1180 };
1181 }
1182
1183 let messages = payload
1184 .get("messages")
1185 .and_then(|messages| messages.as_array())
1186 .cloned()
1187 .unwrap_or_default();
1188 messages
1189 .first()
1190 .cloned()
1191 .map(SlackPermalinkLookup::Message)
1192 .unwrap_or(SlackPermalinkLookup::NotFound)
1193 }
1194
    /// Renders the block shown when a permalink cannot be read: a
    /// `[Slack Link Access]` header, the permalink URL, and the reason as status.
    fn format_permalink_access_denied(permalink: &SlackPermalinkRef, reason: &str) -> String {
        format!(
            "[Slack Link Access]\nURL: {}\nStatus: {}",
            permalink.url, reason
        )
    }
1201
1202 async fn fetch_thread_messages_with_retry(
1203 &self,
1204 channel_id: &str,
1205 thread_ts: &str,
1206 ) -> Option<Vec<serde_json::Value>> {
1207 let payload = self
1208 .fetch_thread_replies_with_retry(channel_id, thread_ts, "0")
1209 .await?;
1210 let messages = payload
1211 .get("messages")
1212 .and_then(|messages| messages.as_array())
1213 .cloned()
1214 .unwrap_or_default();
1215 Some(messages)
1216 }
1217
1218 async fn format_permalink_context(
1219 &self,
1220 permalink: &SlackPermalinkRef,
1221 message: serde_json::Value,
1222 thread_ts: Option<&str>,
1223 ) -> Option<String> {
1224 let mut lines = vec![
1225 "[Slack Link Context]".to_string(),
1226 format!("URL: {}", permalink.url),
1227 ];
1228
1229 if let Some(thread_ts) = thread_ts {
1230 let replies = self
1231 .fetch_thread_messages_with_retry(&permalink.channel_id, thread_ts)
1232 .await
1233 .unwrap_or_else(|| vec![message.clone()]);
1234 let rendered = self
1235 .render_permalink_thread_messages(&replies, &permalink.message_ts)
1236 .await;
1237 if rendered.is_empty() {
1238 return None;
1239 }
1240 lines.push("Thread:".to_string());
1241 lines.extend(rendered);
1242 } else {
1243 let rendered = self.render_permalink_message_line(&message, true).await?;
1244 lines.push("Message:".to_string());
1245 lines.push(rendered);
1246 }
1247
1248 Self::truncate_text(&lines.join("\n"), SLACK_PERMALINK_TEXT_MAX_CHARS)
1249 }
1250
1251 async fn render_permalink_thread_messages(
1252 &self,
1253 messages: &[serde_json::Value],
1254 target_ts: &str,
1255 ) -> Vec<String> {
1256 let mut rendered = Vec::new();
1257 let total = messages.len();
1258 let start = total.saturating_sub(SLACK_PERMALINK_THREAD_MAX_REPLIES);
1259
1260 if start > 0 {
1261 rendered.push(format!("… {} earlier thread messages omitted …", start));
1262 }
1263
1264 for message in &messages[start..] {
1265 if let Some(line) = self
1266 .render_permalink_message_line(
1267 message,
1268 message.get("ts").and_then(|value| value.as_str()) == Some(target_ts),
1269 )
1270 .await
1271 {
1272 rendered.push(line);
1273 }
1274 }
1275
1276 rendered
1277 }
1278
1279 async fn render_permalink_message_line(
1280 &self,
1281 message: &serde_json::Value,
1282 highlight: bool,
1283 ) -> Option<String> {
1284 let user_id = message
1285 .get("user")
1286 .or_else(|| message.get("bot_id"))
1287 .and_then(|value| value.as_str())
1288 .unwrap_or_default();
1289 let sender = if user_id.is_empty() {
1290 "unknown".to_string()
1291 } else {
1292 self.resolve_sender_identity(user_id).await
1293 };
1294
1295 let text = message
1296 .get("text")
1297 .and_then(|value| value.as_str())
1298 .map(str::trim)
1299 .filter(|value| !value.is_empty())
1300 .unwrap_or("[no text]");
1301 let attachment_blocks = self.render_file_attachments(message).await;
1302 let content = Self::compose_incoming_content(text.to_string(), attachment_blocks)
1303 .unwrap_or_else(|| text.to_string())
1304 .replace('\n', " ");
1305 let prefix = if highlight { ">" } else { "-" };
1306 Some(format!("{prefix} {sender}: {content}"))
1307 }
1308
1309 async fn render_file_attachments(&self, message: &serde_json::Value) -> Vec<String> {
1310 let Some(files) = message.get("files").and_then(|value| value.as_array()) else {
1311 return Vec::new();
1312 };
1313
1314 if files.len() > SLACK_ATTACHMENT_MAX_FILES_PER_MESSAGE {
1315 tracing::warn!(
1316 "Slack message has {} files; processing first {} only",
1317 files.len(),
1318 SLACK_ATTACHMENT_MAX_FILES_PER_MESSAGE
1319 );
1320 }
1321
1322 let limited_files = files
1323 .iter()
1324 .take(SLACK_ATTACHMENT_MAX_FILES_PER_MESSAGE)
1325 .cloned()
1326 .collect::<Vec<_>>();
1327
1328 let tasks =
1329 limited_files
1330 .into_iter()
1331 .enumerate()
1332 .map(|(idx, raw_file)| async move {
1333 (idx, self.render_file_attachment(&raw_file).await)
1334 });
1335
1336 let mut rendered = futures_util::stream::iter(tasks)
1337 .buffer_unordered(SLACK_ATTACHMENT_RENDER_CONCURRENCY)
1338 .collect::<Vec<_>>()
1339 .await;
1340 rendered.sort_by_key(|(idx, _)| *idx);
1341 rendered
1342 .into_iter()
1343 .filter_map(|(_, block)| block)
1344 .collect()
1345 }
1346
1347 async fn render_file_attachment(&self, raw_file: &serde_json::Value) -> Option<String> {
1348 let file = self
1349 .hydrate_file_object(raw_file)
1350 .await
1351 .unwrap_or_else(|| raw_file.clone());
1352
1353 if Self::is_audio_file(&file) {
1356 if let Some(transcribed) = self.try_transcribe_audio_file(&file).await {
1357 return Some(transcribed);
1358 }
1359 }
1360 if Self::is_image_file(&file) {
1361 if let Some(marker) = self.fetch_image_marker(&file).await {
1362 return Some(marker);
1363 }
1364 }
1365
1366 let mut snippet = Self::file_text_preview(&file);
1367 if snippet.is_none() && Self::is_probably_text_file(&file) {
1368 snippet = self.download_text_snippet(&file).await;
1369 }
1370
1371 if let Some(text) = snippet {
1372 if !text.trim().is_empty() {
1373 return Some(Self::format_snippet_attachment(&file, &text));
1374 }
1375 }
1376
1377 Some(Self::format_attachment_summary(&file))
1378 }
1379
1380 async fn hydrate_file_object(&self, file: &serde_json::Value) -> Option<serde_json::Value> {
1381 let file_id = Self::slack_file_id(file)?;
1382 let file_access = file
1383 .get("file_access")
1384 .and_then(|value| value.as_str())
1385 .unwrap_or_default();
1386 let mode = Self::slack_file_mode(file).unwrap_or_default();
1387
1388 let requires_lookup = file_access.eq_ignore_ascii_case("check_file_info")
1389 || Self::slack_file_download_url(file).is_none()
1390 || (Self::is_probably_text_file(file) && Self::file_text_preview(file).is_none())
1391 || (mode == "snippet" && file.get("preview").is_none());
1392 if !requires_lookup {
1393 return Some(file.clone());
1394 }
1395
1396 self.fetch_file_info(file_id)
1397 .await
1398 .or_else(|| Some(file.clone()))
1399 }
1400
1401 async fn fetch_file_info(&self, file_id: &str) -> Option<serde_json::Value> {
1402 let resp = match self
1403 .http_client()
1404 .get("https://slack.com/api/files.info")
1405 .bearer_auth(&self.bot_token)
1406 .query(&[("file", file_id)])
1407 .send()
1408 .await
1409 {
1410 Ok(response) => response,
1411 Err(err) => {
1412 tracing::warn!("Slack files.info request failed for {file_id}: {err}");
1413 return None;
1414 }
1415 };
1416
1417 let status = resp.status();
1418 let body = resp
1419 .text()
1420 .await
1421 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
1422 if !status.is_success() {
1423 let sanitized = crate::providers::sanitize_api_error(&body);
1424 tracing::warn!("Slack files.info failed for {file_id} ({status}): {sanitized}");
1425 return None;
1426 }
1427
1428 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
1429 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
1430 let err = payload
1431 .get("error")
1432 .and_then(|value| value.as_str())
1433 .unwrap_or("unknown");
1434 tracing::warn!("Slack files.info returned error for {file_id}: {err}");
1435 return None;
1436 }
1437
1438 payload.get("file").cloned()
1439 }
1440
1441 fn slack_file_id(file: &serde_json::Value) -> Option<&str> {
1442 file.get("id").and_then(|value| value.as_str())
1443 }
1444
1445 fn slack_file_name(file: &serde_json::Value) -> String {
1446 file.get("title")
1447 .and_then(|value| value.as_str())
1448 .filter(|value| !value.trim().is_empty())
1449 .or_else(|| file.get("name").and_then(|value| value.as_str()))
1450 .unwrap_or("attachment")
1451 .trim()
1452 .to_string()
1453 }
1454
1455 fn slack_file_mode(file: &serde_json::Value) -> Option<String> {
1456 file.get("mode")
1457 .and_then(|value| value.as_str())
1458 .map(|value| value.to_ascii_lowercase())
1459 }
1460
1461 fn slack_file_mime(file: &serde_json::Value) -> Option<String> {
1462 file.get("mimetype")
1463 .and_then(|value| value.as_str())
1464 .map(|value| value.to_ascii_lowercase())
1465 }
1466
1467 fn slack_file_download_url(file: &serde_json::Value) -> Option<&str> {
1468 file.get("url_private_download")
1469 .and_then(|value| value.as_str())
1470 .or_else(|| file.get("url_private").and_then(|value| value.as_str()))
1471 }
1472
1473 fn slack_image_candidate_urls(file: &serde_json::Value) -> Vec<String> {
1474 let mut urls = Vec::new();
1475 let mut seen = HashSet::new();
1476 for key in [
1477 "thumb_1024",
1478 "thumb_960",
1479 "thumb_800",
1480 "thumb_720",
1481 "thumb_480",
1482 "thumb_360",
1483 "thumb_160",
1484 "url_private_download",
1485 "url_private",
1486 ] {
1487 if let Some(url) = file.get(key).and_then(|value| value.as_str()) {
1488 let trimmed = url.trim();
1489 if trimmed.is_empty() {
1490 continue;
1491 }
1492 if seen.insert(trimmed.to_string()) {
1493 urls.push(trimmed.to_string());
1494 }
1495 }
1496 }
1497 urls
1498 }
1499
1500 fn is_allowed_slack_media_hostname(host: &str) -> bool {
1501 let normalized = host.trim().trim_end_matches('.').to_ascii_lowercase();
1502 if normalized.is_empty() {
1503 return false;
1504 }
1505
1506 SLACK_ALLOWED_MEDIA_HOST_SUFFIXES
1507 .iter()
1508 .any(|suffix| normalized == *suffix || normalized.ends_with(&format!(".{suffix}")))
1509 }
1510
1511 fn redact_slack_url(url: &reqwest::Url) -> String {
1512 let host = url.host_str().unwrap_or("unknown-host");
1513 let tail = url
1514 .path_segments()
1515 .and_then(|mut segments| {
1516 segments
1517 .rfind(|segment| !segment.is_empty())
1518 .map(str::to_string)
1519 })
1520 .unwrap_or_else(|| "root".to_string());
1521 format!("{host}/.../{tail}")
1522 }
1523
1524 fn redact_raw_slack_url(raw_url: &str) -> String {
1525 reqwest::Url::parse(raw_url)
1526 .map(|parsed| Self::redact_slack_url(&parsed))
1527 .unwrap_or_else(|_| "<invalid-url>".to_string())
1528 }
1529
1530 fn redact_redirect_location(location: &str) -> String {
1531 match reqwest::Url::parse(location) {
1532 Ok(url) => Self::redact_slack_url(&url),
1533 Err(_) => {
1534 let tail = location
1535 .split(['?', '#'])
1536 .next()
1537 .unwrap_or_default()
1538 .trim_end_matches('/')
1539 .rsplit('/')
1540 .next()
1541 .filter(|segment| !segment.is_empty())
1542 .unwrap_or("relative");
1543 format!("relative/.../{tail}")
1544 }
1545 }
1546 }
1547
1548 fn validate_slack_private_file_url(raw_url: &str) -> Option<reqwest::Url> {
1549 let parsed = match reqwest::Url::parse(raw_url) {
1550 Ok(url) => url,
1551 Err(err) => {
1552 let redacted_raw = Self::redact_raw_slack_url(raw_url);
1553 tracing::warn!("Slack file URL parse failed for {redacted_raw}: {err}");
1554 return None;
1555 }
1556 };
1557 let redacted = Self::redact_slack_url(&parsed);
1558
1559 if parsed.scheme() != "https" {
1560 tracing::warn!(
1561 "Slack file URL rejected due to non-HTTPS scheme for {}: {}",
1562 redacted,
1563 parsed.scheme()
1564 );
1565 return None;
1566 }
1567
1568 let Some(host) = parsed.host_str() else {
1569 tracing::warn!("Slack file URL rejected due to missing host: {redacted}");
1570 return None;
1571 };
1572 if !Self::is_allowed_slack_media_hostname(host) {
1573 tracing::warn!("Slack file URL rejected due to non-Slack host: {redacted}");
1574 return None;
1575 }
1576
1577 Some(parsed)
1578 }
1579
1580 fn resolve_https_redirect_target(base: &reqwest::Url, location: &str) -> Option<reqwest::Url> {
1581 let redacted_base = Self::redact_slack_url(base);
1582 let redacted_location = Self::redact_redirect_location(location);
1583 let target = match base.join(location) {
1584 Ok(url) => url,
1585 Err(err) => {
1586 tracing::warn!(
1587 "Slack file redirect URL parse failed for base {} and location {}: {}",
1588 redacted_base,
1589 redacted_location,
1590 err
1591 );
1592 return None;
1593 }
1594 };
1595 let redacted_target = Self::redact_slack_url(&target);
1596 if target.scheme() != "https" {
1597 tracing::warn!(
1598 "Slack file redirect rejected due to non-HTTPS scheme for {}",
1599 redacted_target
1600 );
1601 return None;
1602 }
1603 let Some(host) = target.host_str() else {
1604 tracing::warn!(
1605 "Slack file redirect rejected due to missing host for {}",
1606 redacted_target
1607 );
1608 return None;
1609 };
1610 if !Self::is_allowed_slack_media_hostname(host) {
1611 tracing::warn!(
1612 "Slack file redirect rejected due to non-Slack host for {}",
1613 redacted_target
1614 );
1615 return None;
1616 }
1617 Some(target)
1618 }
1619
1620 fn slack_media_http_client_no_redirect(&self) -> anyhow::Result<reqwest::Client> {
1621 let builder = crate::config::apply_channel_proxy_to_builder(
1622 reqwest::Client::builder()
1623 .redirect(reqwest::redirect::Policy::none())
1624 .timeout(Duration::from_secs(30))
1625 .connect_timeout(Duration::from_secs(10)),
1626 "channel.slack",
1627 self.proxy_url.as_deref(),
1628 );
1629 builder
1630 .build()
1631 .context("failed to build Slack media no-redirect HTTP client")
1632 }
1633
1634 async fn fetch_slack_private_file(&self, raw_url: &str) -> Option<reqwest::Response> {
1635 let parsed = Self::validate_slack_private_file_url(raw_url)?;
1636 let redacted_parsed = Self::redact_slack_url(&parsed);
1637 let client = match self.slack_media_http_client_no_redirect() {
1638 Ok(client) => client,
1639 Err(err) => {
1640 tracing::warn!("Slack file fetch failed for {}: {}", redacted_parsed, err);
1641 return None;
1642 }
1643 };
1644 let mut current_url = parsed;
1645
1646 for redirect_hop in 0..=SLACK_MEDIA_REDIRECT_MAX_HOPS {
1647 let redacted_current = Self::redact_slack_url(¤t_url);
1648 let mut req = client.get(current_url.clone());
1649 if redirect_hop == 0 {
1650 req = req.bearer_auth(&self.bot_token);
1651 }
1652 let response = match req.send().await {
1653 Ok(response) => response,
1654 Err(err) => {
1655 tracing::warn!("Slack file fetch failed for {}: {}", redacted_current, err);
1656 return None;
1657 }
1658 };
1659
1660 if !response.status().is_redirection() {
1661 return Some(response);
1662 }
1663
1664 if redirect_hop == SLACK_MEDIA_REDIRECT_MAX_HOPS {
1665 tracing::warn!(
1666 "Slack file redirect limit exceeded for {} after {} hops",
1667 redacted_current,
1668 SLACK_MEDIA_REDIRECT_MAX_HOPS
1669 );
1670 return Some(response);
1671 }
1672
1673 let Some(location) = response.headers().get(reqwest::header::LOCATION) else {
1674 return Some(response);
1675 };
1676 let Ok(location) = location.to_str() else {
1677 tracing::warn!(
1678 "Slack file redirect location header is not valid UTF-8 for {}",
1679 redacted_current
1680 );
1681 return Some(response);
1682 };
1683 let Some(next_url) = Self::resolve_https_redirect_target(¤t_url, location) else {
1684 return Some(response);
1685 };
1686 current_url = next_url;
1687 }
1688
1689 None
1690 }
1691
1692 async fn fetch_image_marker(&self, file: &serde_json::Value) -> Option<String> {
1693 let file_name = Self::slack_file_name(file);
1694 let image_urls = Self::slack_image_candidate_urls(file);
1695 if image_urls.is_empty() {
1696 tracing::warn!(
1697 "Slack file attachment is image-like but has no downloadable URL: {}",
1698 file_name
1699 );
1700 return None;
1701 }
1702
1703 for url in image_urls {
1704 if let Some(marker) = self.download_private_image_as_marker(&url, file).await {
1705 return Some(marker);
1706 }
1707 }
1708
1709 tracing::warn!("Slack image attachment download failed for {file_name}");
1710 None
1711 }
1712
1713 async fn download_private_image_as_marker(
1714 &self,
1715 url: &str,
1716 file: &serde_json::Value,
1717 ) -> Option<String> {
1718 let redacted_url = Self::redact_raw_slack_url(url);
1719 let resp = self.fetch_slack_private_file(url).await?;
1720
1721 let status = resp.status();
1722 if !status.is_success() {
1723 let body = resp
1724 .text()
1725 .await
1726 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
1727 let sanitized = crate::providers::sanitize_api_error(&body);
1728 tracing::warn!(
1729 "Slack image fetch failed for {} ({status}): {sanitized}",
1730 redacted_url
1731 );
1732 return None;
1733 }
1734
1735 let content_type = resp
1736 .headers()
1737 .get(reqwest::header::CONTENT_TYPE)
1738 .and_then(|value| value.to_str().ok())
1739 .map(str::to_string);
1740 if let Some(content_length) = resp.content_length() {
1741 let content_length = usize::try_from(content_length).unwrap_or(usize::MAX);
1742 if content_length > SLACK_ATTACHMENT_IMAGE_MAX_BYTES {
1743 tracing::warn!(
1744 "Slack image fetch skipped for {}: content-length {} exceeds {} bytes",
1745 redacted_url,
1746 content_length,
1747 SLACK_ATTACHMENT_IMAGE_MAX_BYTES
1748 );
1749 return None;
1750 }
1751 }
1752
1753 let bytes = match resp.bytes().await {
1754 Ok(bytes) => bytes,
1755 Err(err) => {
1756 tracing::warn!("Slack image body read failed for {}: {err}", redacted_url);
1757 return None;
1758 }
1759 };
1760 if bytes.is_empty() {
1761 tracing::warn!("Slack image body is empty for {}", redacted_url);
1762 return None;
1763 }
1764 if bytes.len() > SLACK_ATTACHMENT_IMAGE_MAX_BYTES {
1765 tracing::warn!(
1766 "Slack image body too large for {}: {} bytes exceeds {} bytes",
1767 redacted_url,
1768 bytes.len(),
1769 SLACK_ATTACHMENT_IMAGE_MAX_BYTES
1770 );
1771 return None;
1772 }
1773
1774 let Some(mime) =
1775 Self::detect_image_mime(content_type.as_deref(), file, bytes.as_ref(), url)
1776 else {
1777 tracing::warn!("Slack image MIME detection failed for {}", redacted_url);
1778 return None;
1779 };
1780 if !Self::is_supported_image_mime(&mime) {
1781 tracing::warn!(
1782 "Slack image MIME not supported for {}: {mime}",
1783 redacted_url
1784 );
1785 return None;
1786 }
1787
1788 let file_name = Self::slack_file_name(file);
1789 if let Some(saved_path) = self
1790 .persist_image_attachment(file, &file_name, &mime, bytes.as_ref())
1791 .await
1792 {
1793 return Some(format!("[IMAGE:{}]", saved_path.display()));
1794 }
1795
1796 if bytes.len() > SLACK_ATTACHMENT_IMAGE_INLINE_FALLBACK_MAX_BYTES {
1797 tracing::warn!(
1798 "Slack image inline fallback skipped for {}: {} bytes exceeds {} bytes",
1799 redacted_url,
1800 bytes.len(),
1801 SLACK_ATTACHMENT_IMAGE_INLINE_FALLBACK_MAX_BYTES
1802 );
1803 return None;
1804 }
1805
1806 let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
1807 Some(format!("[IMAGE:data:{mime};base64,{encoded}]"))
1808 }
1809
1810 fn detect_image_mime(
1811 content_type_header: Option<&str>,
1812 file: &serde_json::Value,
1813 bytes: &[u8],
1814 source_url: &str,
1815 ) -> Option<String> {
1816 let redacted_source = Self::redact_raw_slack_url(source_url);
1817 if let Some(magic_mime) = Self::mime_from_magic(bytes) {
1818 return Some(magic_mime.to_string());
1819 }
1820
1821 if let Some(header_mime) = content_type_header
1822 .and_then(Self::normalized_content_type)
1823 .filter(|mime| mime.starts_with("image/"))
1824 {
1825 tracing::warn!(
1826 "Slack image MIME mismatch for {}: HTTP header claims {}, but bytes do not match a supported image signature",
1827 redacted_source,
1828 header_mime
1829 );
1830 }
1831
1832 if let Some(file_mime) =
1833 Self::slack_file_mime(file).filter(|mime| mime.starts_with("image/"))
1834 {
1835 tracing::warn!(
1836 "Slack image MIME mismatch for {}: file metadata claims {}, but bytes do not match a supported image signature",
1837 redacted_source,
1838 file_mime
1839 );
1840 }
1841
1842 if let Some(ext) = Self::file_extension(source_url)
1843 .or_else(|| Self::file_extension(&Self::slack_file_name(file)))
1844 {
1845 if let Some(mime) = Self::mime_from_extension(&ext) {
1846 tracing::warn!(
1847 "Slack image MIME mismatch for {}: filename extension implies {}, but bytes do not match a supported image signature",
1848 redacted_source,
1849 mime
1850 );
1851 }
1852 }
1853
1854 None
1855 }
1856
/// Extracts the bare MIME type from a `Content-Type` header value.
///
/// Parameters after the first `;` are dropped and the result is trimmed and
/// ASCII-lowercased. Returns `None` when nothing remains.
fn normalized_content_type(content_type: &str) -> Option<String> {
    let essence = match content_type.split_once(';') {
        Some((head, _params)) => head,
        None => content_type,
    };
    let mime = essence.trim().to_ascii_lowercase();
    if mime.is_empty() {
        return None;
    }
    Some(mime)
}
1866
1867 fn is_supported_image_mime(mime: &str) -> bool {
1868 SLACK_SUPPORTED_IMAGE_MIME_TYPES.contains(&mime)
1869 }
1870
/// Maps a file extension (without the dot, any ASCII case) to its image MIME type.
///
/// Returns `None` for extensions that are not a recognised image format.
fn mime_from_extension(ext: &str) -> Option<&'static str> {
    const KNOWN: [(&str, &str); 6] = [
        ("png", "image/png"),
        ("jpg", "image/jpeg"),
        ("jpeg", "image/jpeg"),
        ("gif", "image/gif"),
        ("webp", "image/webp"),
        ("bmp", "image/bmp"),
    ];

    KNOWN
        .iter()
        .find(|(known, _)| ext.eq_ignore_ascii_case(known))
        .map(|(_, mime)| *mime)
}
1881
/// Identifies an image format from the leading signature bytes.
///
/// Recognises PNG, JPEG, GIF (87a/89a), WebP (RIFF container) and BMP.
/// Returns `None` for anything else, including input that is too short.
fn mime_from_magic(bytes: &[u8]) -> Option<&'static str> {
    match bytes {
        [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n', ..] => Some("image/png"),
        [0xff, 0xd8, 0xff, ..] => Some("image/jpeg"),
        [b'G', b'I', b'F', b'8', b'7' | b'9', b'a', ..] => Some("image/gif"),
        // Bytes 4..8 of a RIFF header hold the chunk size and are not checked.
        [b'R', b'I', b'F', b'F', _, _, _, _, b'W', b'E', b'B', b'P', ..] => Some("image/webp"),
        [b'B', b'M', ..] => Some("image/bmp"),
        _ => None,
    }
}
1902
1903 async fn persist_image_attachment(
1904 &self,
1905 file: &serde_json::Value,
1906 file_name: &str,
1907 mime: &str,
1908 bytes: &[u8],
1909 ) -> Option<PathBuf> {
1910 let workspace = self.workspace_dir.as_ref()?;
1911 let safe_name = Self::sanitize_attachment_filename(file_name)
1912 .unwrap_or_else(|| "attachment".to_string());
1913 let ext = Self::image_extension_for_mime(mime).unwrap_or("png");
1914 let safe_name = Self::ensure_file_extension(&safe_name, ext);
1915 let file_id = Self::slack_file_id(file)
1916 .map(Self::sanitize_file_id)
1917 .unwrap_or_else(|| "file".to_string());
1918 let generated_name = format!(
1919 "slack_{}_{}_{}",
1920 Utc::now().timestamp_millis(),
1921 file_id,
1922 safe_name
1923 );
1924
1925 let output_path = match Self::resolve_workspace_attachment_output_path(
1926 workspace,
1927 &generated_name,
1928 )
1929 .await
1930 {
1931 Ok(path) => path,
1932 Err(err) => {
1933 tracing::warn!(
1934 "Slack image attachment path resolution failed for {}: {err}",
1935 file_name
1936 );
1937 return None;
1938 }
1939 };
1940
1941 let Some(parent_dir) = output_path.parent() else {
1942 tracing::warn!(
1943 "Slack image attachment write failed for {}: missing parent directory",
1944 output_path.display()
1945 );
1946 return None;
1947 };
1948
1949 let file_tail = output_path
1950 .file_name()
1951 .and_then(|name| name.to_str())
1952 .unwrap_or("attachment");
1953 let temp_name = format!(
1954 ".{file_tail}.{}.part",
1955 Utc::now().timestamp_nanos_opt().unwrap_or_default()
1956 );
1957 let temp_path = parent_dir.join(temp_name);
1958
1959 let mut temp_file = match tokio::fs::OpenOptions::new()
1960 .create_new(true)
1961 .write(true)
1962 .open(&temp_path)
1963 .await
1964 {
1965 Ok(file) => file,
1966 Err(err) => {
1967 tracing::warn!(
1968 "Slack image attachment temp open failed for {}: {err}",
1969 temp_path.display()
1970 );
1971 return None;
1972 }
1973 };
1974
1975 if let Err(err) = temp_file.write_all(bytes).await {
1976 tracing::warn!(
1977 "Slack image attachment temp write failed for {}: {err}",
1978 temp_path.display()
1979 );
1980 let _ = tokio::fs::remove_file(&temp_path).await;
1981 return None;
1982 }
1983 if let Err(err) = temp_file.sync_all().await {
1984 tracing::warn!(
1985 "Slack image attachment temp sync failed for {}: {err}",
1986 temp_path.display()
1987 );
1988 let _ = tokio::fs::remove_file(&temp_path).await;
1989 return None;
1990 }
1991 drop(temp_file);
1992
1993 match tokio::fs::symlink_metadata(&output_path).await {
1997 Ok(meta) if meta.file_type().is_symlink() => {
1998 tracing::warn!(
1999 "Slack image attachment refused: output path is a symlink: {}",
2000 output_path.display()
2001 );
2002 let _ = tokio::fs::remove_file(&temp_path).await;
2003 return None;
2004 }
2005 _ => {}
2006 }
2007
2008 if let Err(err) = tokio::fs::rename(&temp_path, &output_path).await {
2009 tracing::warn!(
2010 "Slack image attachment finalize failed for {}: {err}",
2011 output_path.display()
2012 );
2013 let _ = tokio::fs::remove_file(&temp_path).await;
2014 return None;
2015 }
2016
2017 Some(output_path)
2018 }
2019
2020 async fn resolve_workspace_attachment_output_path(
2021 workspace: &Path,
2022 file_name: &str,
2023 ) -> anyhow::Result<PathBuf> {
2024 let safe_name = Self::sanitize_attachment_filename(file_name)
2025 .ok_or_else(|| anyhow::anyhow!("invalid attachment filename: {file_name}"))?;
2026
2027 tokio::fs::create_dir_all(workspace).await?;
2028 let workspace_root = tokio::fs::canonicalize(workspace)
2029 .await
2030 .unwrap_or_else(|_| workspace.to_path_buf());
2031
2032 let save_dir = workspace.join(SLACK_ATTACHMENT_SAVE_SUBDIR);
2033 tokio::fs::create_dir_all(&save_dir).await?;
2034 let resolved_save_dir = tokio::fs::canonicalize(&save_dir).await.with_context(|| {
2035 format!(
2036 "failed to resolve Slack attachment save directory: {}",
2037 save_dir.display()
2038 )
2039 })?;
2040
2041 if !resolved_save_dir.starts_with(&workspace_root) {
2042 anyhow::bail!(
2043 "Slack attachment save directory escapes workspace: {}",
2044 resolved_save_dir.display()
2045 );
2046 }
2047
2048 Ok(resolved_save_dir.join(safe_name))
2049 }
2050
2051 fn sanitize_attachment_filename(file_name: &str) -> Option<String> {
2052 let basename = Path::new(file_name).file_name()?.to_str()?.trim();
2053 if basename.is_empty() || basename == "." || basename == ".." {
2054 return None;
2055 }
2056
2057 let sanitized: String = basename
2058 .replace(['/', '\\'], "_")
2059 .chars()
2060 .take(SLACK_ATTACHMENT_FILENAME_MAX_CHARS)
2061 .collect();
2062 if sanitized.is_empty() || sanitized == "." || sanitized == ".." {
2063 None
2064 } else {
2065 Some(sanitized)
2066 }
2067 }
2068
/// Keeps only ASCII alphanumerics, `_` and `-` from `file_id`, up to 64 characters.
///
/// Returns `"file"` when nothing usable remains.
fn sanitize_file_id(file_id: &str) -> String {
    let is_allowed = |ch: &char| ch.is_ascii_alphanumeric() || *ch == '_' || *ch == '-';

    let mut cleaned = String::new();
    cleaned.extend(file_id.chars().filter(is_allowed).take(64));
    if cleaned.is_empty() {
        return "file".to_string();
    }
    cleaned
}
2081
/// Appends `.extension` to `file_name` unless it already has an extension.
fn ensure_file_extension(file_name: &str, extension: &str) -> String {
    match Path::new(file_name).extension() {
        Some(_) => file_name.to_string(),
        None => format!("{file_name}.{extension}"),
    }
}
2089
/// Maps a supported image MIME type to the file extension used when saving it.
///
/// The match is exact (case-sensitive); unknown types yield `None`.
fn image_extension_for_mime(mime: &str) -> Option<&'static str> {
    const PAIRS: [(&str, &str); 5] = [
        ("image/png", "png"),
        ("image/jpeg", "jpg"),
        ("image/webp", "webp"),
        ("image/gif", "gif"),
        ("image/bmp", "bmp"),
    ];

    PAIRS
        .iter()
        .find(|(candidate, _)| *candidate == mime)
        .map(|(_, extension)| *extension)
}
2100
/// Extracts the lowercased extension from a file name or URL.
///
/// Everything from the first `?` on is ignored, only the text after the last
/// `/` is considered, and the extension is whatever follows the final `.` in
/// it. Returns `None` when that last segment contains no dot.
fn file_extension(value: &str) -> Option<String> {
    let before_query = match value.find('?') {
        Some(cut) => &value[..cut],
        None => value,
    };
    let last_segment = match before_query.rfind('/') {
        Some(slash) => &before_query[slash + 1..],
        None => before_query,
    };
    let (_, ext) = last_segment.rsplit_once('.')?;
    Some(ext.to_ascii_lowercase())
}
2110
2111 fn file_text_preview(file: &serde_json::Value) -> Option<String> {
2112 let preview = file
2113 .get("preview")
2114 .and_then(|value| value.as_str())
2115 .or_else(|| {
2116 file.get("preview_highlight")
2117 .and_then(|value| value.as_str())
2118 })
2119 .or_else(|| {
2120 file.get("initial_comment")
2121 .and_then(|comment| comment.get("comment"))
2122 .and_then(|value| value.as_str())
2123 })?;
2124 Self::truncate_text(preview, SLACK_ATTACHMENT_TEXT_INLINE_MAX_CHARS)
2125 }
2126
/// Limits `value` to at most `max_chars` characters and trims the result.
///
/// Returns `None` when the trimmed result is empty. When characters were cut
/// off, `"\n…[truncated]"` is appended after trimming.
fn truncate_text(value: &str, max_chars: usize) -> Option<String> {
    let mut remaining = value.chars();
    let head: String = remaining.by_ref().take(max_chars).collect();
    // Anything still left in the iterator is what did not fit.
    let was_truncated = remaining.next().is_some();

    let trimmed = head.trim();
    if trimmed.is_empty() {
        return None;
    }

    let mut out = trimmed.to_string();
    if was_truncated {
        out.push_str("\n…[truncated]");
    }
    Some(out)
}
2147
2148 fn is_probably_text_file(file: &serde_json::Value) -> bool {
2149 if matches!(
2150 Self::slack_file_mode(file).as_deref(),
2151 Some("snippet" | "post")
2152 ) {
2153 return true;
2154 }
2155
2156 if Self::slack_file_mime(file)
2157 .as_deref()
2158 .is_some_and(|mime| mime.starts_with("text/"))
2159 {
2160 return true;
2161 }
2162
2163 if file
2164 .get("filetype")
2165 .and_then(|value| value.as_str())
2166 .map(|value| value.to_ascii_lowercase())
2167 .as_deref()
2168 .is_some_and(Self::is_text_filetype)
2169 {
2170 return true;
2171 }
2172
2173 Self::file_extension(&Self::slack_file_name(file))
2174 .as_deref()
2175 .is_some_and(Self::is_text_filetype)
2176 }
2177
/// Reports whether `filetype` (expected lowercase) names a known text format.
///
/// The comparison is exact, so callers must lowercase the value first.
fn is_text_filetype(filetype: &str) -> bool {
    const TEXT_FILETYPES: &[&str] = &[
        "txt", "text", "md", "markdown", "csv", "tsv", "json", "yaml", "yml", "toml", "xml",
        "html", "css", "js", "ts", "jsx", "tsx", "py", "rs", "go", "java", "kt", "c", "cc",
        "cpp", "h", "hpp", "cs", "php", "rb", "swift", "sql", "log", "ini", "conf", "cfg",
        "env", "sh", "bash", "zsh",
    ];

    TEXT_FILETYPES.contains(&filetype)
}
2223
2224 fn is_image_file(file: &serde_json::Value) -> bool {
2225 if Self::slack_file_mime(file)
2226 .as_deref()
2227 .is_some_and(|mime| mime.starts_with("image/"))
2228 {
2229 return true;
2230 }
2231
2232 if file
2233 .get("filetype")
2234 .and_then(|value| value.as_str())
2235 .map(|value| value.to_ascii_lowercase())
2236 .as_deref()
2237 .is_some_and(|filetype| Self::mime_from_extension(filetype).is_some())
2238 {
2239 return true;
2240 }
2241
2242 Self::file_extension(&Self::slack_file_name(file))
2243 .as_deref()
2244 .is_some_and(|ext| Self::mime_from_extension(ext).is_some())
2245 }
2246
/// Lowercase file extensions / Slack `filetype` values that `is_audio_file`
/// treats as audio.
const AUDIO_EXTENSIONS: &[&str] = &[
    "flac",
    "mp3",
    "mpeg",
    "mpga",
    "mp4",
    "m4a",
    "ogg",
    "oga",
    "opus",
    "wav",
    "webm",
];
2251
2252 fn is_audio_file(file: &serde_json::Value) -> bool {
2255 if let Some(subtype) = file.get("subtype").and_then(|v| v.as_str()) {
2257 if subtype == "slack_audio" {
2258 return true;
2259 }
2260 }
2261
2262 if Self::slack_file_mime(file)
2263 .as_deref()
2264 .is_some_and(|mime| mime.starts_with("audio/"))
2265 {
2266 return true;
2267 }
2268
2269 if let Some(ft) = file
2270 .get("filetype")
2271 .and_then(|v| v.as_str())
2272 .map(|v| v.to_ascii_lowercase())
2273 {
2274 if Self::AUDIO_EXTENSIONS.contains(&ft.as_str()) {
2275 return true;
2276 }
2277 }
2278
2279 Self::file_extension(&Self::slack_file_name(file))
2280 .as_deref()
2281 .is_some_and(|ext| Self::AUDIO_EXTENSIONS.contains(&ext))
2282 }
2283
2284 async fn try_transcribe_audio_file(&self, file: &serde_json::Value) -> Option<String> {
2288 let manager = self.transcription_manager.as_deref()?;
2289
2290 let url = Self::slack_file_download_url(file)?;
2291 let file_name = Self::slack_file_name(file);
2292 let redacted_url = Self::redact_raw_slack_url(url);
2293
2294 let resp = self.fetch_slack_private_file(url).await?;
2295 let status = resp.status();
2296 if !status.is_success() {
2297 tracing::warn!(
2298 "Slack voice file download failed for {} ({status})",
2299 redacted_url
2300 );
2301 return None;
2302 }
2303
2304 let audio_data = match resp.bytes().await {
2305 Ok(bytes) => bytes.to_vec(),
2306 Err(e) => {
2307 tracing::warn!("Slack voice file read failed for {}: {e}", redacted_url);
2308 return None;
2309 }
2310 };
2311
2312 let transcription_filename = if Self::file_extension(&file_name).is_some() {
2314 file_name.clone()
2315 } else {
2316 let mime_ext = Self::slack_file_mime(file)
2318 .and_then(|mime| mime.rsplit('/').next().map(|s| s.to_string()))
2319 .unwrap_or_else(|| "ogg".to_string());
2320 format!("voice.{mime_ext}")
2321 };
2322
2323 match manager
2324 .transcribe(&audio_data, &transcription_filename)
2325 .await
2326 {
2327 Ok(text) => {
2328 let trimmed = text.trim();
2329 if trimmed.is_empty() {
2330 tracing::info!("Slack voice transcription returned empty text, skipping");
2331 None
2332 } else {
2333 tracing::info!(
2334 "Slack: transcribed voice file {} ({} chars)",
2335 file_name,
2336 trimmed.len()
2337 );
2338 Some(format!("[Voice] {trimmed}"))
2339 }
2340 }
2341 Err(e) => {
2342 tracing::warn!("Slack voice transcription failed for {}: {e}", file_name);
2343 Some(Self::format_attachment_summary(file))
2344 }
2345 }
2346 }
2347
2348 async fn download_text_snippet(&self, file: &serde_json::Value) -> Option<String> {
2349 let url = Self::slack_file_download_url(file)?;
2350 let redacted_url = Self::redact_raw_slack_url(url);
2351 let resp = self.fetch_slack_private_file(url).await?;
2352
2353 let status = resp.status();
2354 if !status.is_success() {
2355 let body = resp
2356 .text()
2357 .await
2358 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
2359 let sanitized = crate::providers::sanitize_api_error(&body);
2360 tracing::warn!(
2361 "Slack snippet fetch failed for {} ({status}): {sanitized}",
2362 redacted_url
2363 );
2364 return None;
2365 }
2366
2367 if let Some(content_length) = resp.content_length() {
2368 let content_length = usize::try_from(content_length).unwrap_or(usize::MAX);
2369 if content_length > SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES {
2370 tracing::warn!(
2371 "Slack snippet download skipped for {}: content-length {} exceeds {} bytes",
2372 redacted_url,
2373 content_length,
2374 SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES
2375 );
2376 return None;
2377 }
2378 }
2379
2380 let bytes = match resp.bytes().await {
2381 Ok(bytes) => bytes,
2382 Err(err) => {
2383 tracing::warn!("Slack snippet body read failed for {}: {err}", redacted_url);
2384 return None;
2385 }
2386 };
2387 if bytes.is_empty() {
2388 return None;
2389 }
2390 if bytes.len() > SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES {
2391 tracing::warn!(
2392 "Slack snippet body too large for {}: {} bytes exceeds {} bytes",
2393 redacted_url,
2394 bytes.len(),
2395 SLACK_ATTACHMENT_TEXT_DOWNLOAD_MAX_BYTES
2396 );
2397 return None;
2398 }
2399 if bytes.contains(&0) {
2400 tracing::warn!("Slack snippet body appears binary for {}", redacted_url);
2401 return None;
2402 }
2403
2404 let text = String::from_utf8_lossy(&bytes);
2405 Self::truncate_text(&text, SLACK_ATTACHMENT_TEXT_INLINE_MAX_CHARS)
2406 }
2407
2408 fn format_snippet_attachment(file: &serde_json::Value, snippet: &str) -> String {
2409 let file_name = Self::slack_file_name(file);
2410 let language = file
2411 .get("filetype")
2412 .and_then(|value| value.as_str())
2413 .map(Self::sanitize_code_fence_language)
2414 .unwrap_or_else(|| "text".to_string());
2415
2416 let fence = if snippet.contains("```") {
2417 "````"
2418 } else {
2419 "```"
2420 };
2421 format!("[SNIPPET:{file_name}]\n{fence}{language}\n{snippet}\n{fence}")
2422 }
2423
/// Reduces a language tag to characters that are safe after a code fence.
///
/// Surrounding whitespace is trimmed and only ASCII letters, ASCII digits,
/// `_`, `-` and `+` are kept. When nothing survives, `"text"` is returned.
fn sanitize_code_fence_language(input: &str) -> String {
    let mut cleaned = String::with_capacity(input.len());
    for ch in input.trim().chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' || ch == '+';
        if allowed {
            cleaned.push(ch);
        }
    }

    if cleaned.is_empty() {
        return "text".to_string();
    }
    cleaned
}
2436
2437 fn format_attachment_summary(file: &serde_json::Value) -> String {
2438 let file_name = Self::slack_file_name(file);
2439 let mime = Self::slack_file_mime(file).unwrap_or_else(|| "unknown".to_string());
2440 let size = file
2441 .get("size")
2442 .and_then(|value| value.as_u64())
2443 .map(|value| format!("{value} bytes"))
2444 .unwrap_or_else(|| "unknown size".to_string());
2445 format!("[ATTACHMENT:{file_name} | mime={mime} | size={size}]")
2446 }
2447
2448 fn extract_channel_ids(list_payload: &serde_json::Value) -> Vec<String> {
2449 let mut ids = list_payload
2450 .get("channels")
2451 .and_then(|c| c.as_array())
2452 .into_iter()
2453 .flatten()
2454 .filter_map(|channel| {
2455 let id = channel.get("id").and_then(|id| id.as_str())?;
2456 let is_archived = channel
2457 .get("is_archived")
2458 .and_then(|v| v.as_bool())
2459 .unwrap_or(false);
2460 let is_member = channel
2461 .get("is_member")
2462 .and_then(|v| v.as_bool())
2463 .unwrap_or(true);
2464 if is_archived || !is_member {
2465 return None;
2466 }
2467 Some(id.to_string())
2468 })
2469 .collect::<Vec<_>>();
2470 ids.sort();
2471 ids.dedup();
2472 ids
2473 }
2474
2475 async fn list_accessible_channels(&self) -> anyhow::Result<Vec<String>> {
2476 let mut channels = Vec::new();
2477 let mut cursor: Option<String> = None;
2478
2479 loop {
2480 let mut query_params = vec![
2481 ("exclude_archived", "true".to_string()),
2482 ("limit", "200".to_string()),
2483 (
2484 "types",
2485 "public_channel,private_channel,mpim,im".to_string(),
2486 ),
2487 ];
2488 if let Some(ref next) = cursor {
2489 query_params.push(("cursor", next.clone()));
2490 }
2491
2492 let resp = self
2493 .http_client()
2494 .get("https://slack.com/api/conversations.list")
2495 .bearer_auth(&self.bot_token)
2496 .query(&query_params)
2497 .send()
2498 .await?;
2499
2500 let status = resp.status();
2501 let body = resp
2502 .text()
2503 .await
2504 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
2505
2506 if !status.is_success() {
2507 let sanitized = crate::providers::sanitize_api_error(&body);
2508 anyhow::bail!("Slack conversations.list failed ({status}): {sanitized}");
2509 }
2510
2511 let data: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
2512 if data.get("ok") == Some(&serde_json::Value::Bool(false)) {
2513 let err = data
2514 .get("error")
2515 .and_then(|e| e.as_str())
2516 .unwrap_or("unknown");
2517 anyhow::bail!("Slack conversations.list failed: {err}");
2518 }
2519
2520 channels.extend(Self::extract_channel_ids(&data));
2521
2522 cursor = data
2523 .get("response_metadata")
2524 .and_then(|rm| rm.get("next_cursor"))
2525 .and_then(|c| c.as_str())
2526 .map(str::trim)
2527 .filter(|c| !c.is_empty())
2528 .map(ToOwned::to_owned);
2529
2530 if cursor.is_none() {
2531 break;
2532 }
2533 }
2534
2535 channels.sort();
2536 channels.dedup();
2537 Ok(channels)
2538 }
2539
/// Formats the current wall-clock time as `<seconds>.<microseconds>` with the
/// microsecond part zero-padded to six digits.
///
/// If the system clock reads earlier than the Unix epoch, a zero duration is
/// used instead.
fn slack_now_ts() -> String {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    let secs = since_epoch.as_secs();
    let micros = since_epoch.subsec_micros();
    format!("{}.{:06}", secs, micros)
}
2546
/// Returns the stored cursor for `channel_id`, seeding it with `now_ts` when
/// the channel has no entry yet.
///
/// An existing entry is never overwritten.
fn ensure_poll_cursor(
    cursors: &mut HashMap<String, String>,
    channel_id: &str,
    now_ts: &str,
) -> String {
    if let Some(existing) = cursors.get(channel_id) {
        return existing.clone();
    }

    let seeded = now_ts.to_string();
    cursors.insert(channel_id.to_string(), seeded.clone());
    seeded
}
2557
2558 fn parse_block_action_as_command(
2563 envelope: &serde_json::Value,
2564 _bot_user_id: &str,
2565 ) -> Option<ChannelMessage> {
2566 let payload = envelope.get("payload")?;
2567
2568 let payload_type = payload.get("type").and_then(|v| v.as_str())?;
2569 if payload_type != "block_actions" {
2570 return None;
2571 }
2572
2573 let actions = payload.get("actions").and_then(|v| v.as_array())?;
2574 let action = actions.first()?;
2575
2576 let action_id = action.get("action_id").and_then(|v| v.as_str())?;
2577 let selected_value = action
2578 .get("selected_option")
2579 .and_then(|o| o.get("value"))
2580 .and_then(|v| v.as_str())?;
2581
2582 let command = match action_id {
2583 "construct_config_provider" => format!("/models {selected_value}"),
2584 "construct_config_model" => format!("/model {selected_value}"),
2585 _ => return None,
2586 };
2587
2588 let user = payload
2589 .get("user")
2590 .and_then(|u| u.get("id"))
2591 .and_then(|v| v.as_str())
2592 .unwrap_or("unknown");
2593
2594 let channel_id = payload
2595 .get("channel")
2596 .and_then(|c| c.get("id"))
2597 .and_then(|v| v.as_str())
2598 .unwrap_or_default();
2599
2600 if channel_id.is_empty() {
2601 tracing::warn!("Slack block_actions: missing channel ID in interactive payload");
2602 return None;
2603 }
2604
2605 let ts = payload
2606 .get("message")
2607 .and_then(|m| m.get("ts"))
2608 .and_then(|v| v.as_str())
2609 .unwrap_or("0");
2610
2611 Some(ChannelMessage {
2612 id: format!("slack_{channel_id}_{ts}_action"),
2613 sender: user.to_string(),
2614 reply_target: channel_id.to_string(),
2615 content: command,
2616 channel: "slack".to_string(),
2617 timestamp: SystemTime::now()
2618 .duration_since(UNIX_EPOCH)
2619 .unwrap_or_default()
2620 .as_secs(),
2621 thread_ts: payload
2622 .get("message")
2623 .and_then(|m| m.get("thread_ts"))
2624 .and_then(|v| v.as_str())
2625 .map(str::to_string),
2626 interruption_scope_id: None,
2627 attachments: vec![],
2628 })
2629 }
2630
2631 async fn open_socket_mode_url(&self) -> anyhow::Result<String> {
2632 let app_token = self
2633 .configured_app_token()
2634 .ok_or_else(|| anyhow::anyhow!("Slack Socket Mode requires app_token"))?;
2635
2636 let resp = self
2637 .http_client()
2638 .post("https://slack.com/api/apps.connections.open")
2639 .bearer_auth(app_token)
2640 .send()
2641 .await?;
2642
2643 let status = resp.status();
2644 let body = resp
2645 .text()
2646 .await
2647 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
2648
2649 if !status.is_success() {
2650 let sanitized = crate::providers::sanitize_api_error(&body);
2651 anyhow::bail!("Slack apps.connections.open failed ({status}): {sanitized}");
2652 }
2653
2654 let parsed: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
2655 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
2656 let err = parsed
2657 .get("error")
2658 .and_then(|e| e.as_str())
2659 .unwrap_or("unknown");
2660 anyhow::bail!("Slack apps.connections.open failed: {err}");
2661 }
2662
2663 parsed
2664 .get("url")
2665 .and_then(|v| v.as_str())
2666 .map(ToOwned::to_owned)
2667 .ok_or_else(|| anyhow::anyhow!("Slack apps.connections.open did not return url"))
2668 }
2669
2670 async fn listen_socket_mode(
2671 &self,
2672 tx: tokio::sync::mpsc::Sender<ChannelMessage>,
2673 bot_user_id: &str,
2674 scoped_channels: Option<Vec<String>>,
2675 ) -> anyhow::Result<()> {
2676 let mut last_ts_by_channel: HashMap<String, String> = HashMap::new();
2677 let mut open_url_attempt: u32 = 0;
2678 let mut socket_reconnect_attempt: u32 = 0;
2679
2680 loop {
2681 let ws_url = match self.open_socket_mode_url().await {
2682 Ok(url) => {
2683 open_url_attempt = 0;
2684 url
2685 }
2686 Err(e) => {
2687 let wait = Self::compute_socket_mode_retry_delay(open_url_attempt);
2688 tracing::warn!(
2689 "Slack Socket Mode: failed to open websocket URL: {e}; retrying in {:.3}s (attempt #{})",
2690 wait.as_secs_f64(),
2691 open_url_attempt.saturating_add(1),
2692 );
2693 open_url_attempt = open_url_attempt.saturating_add(1);
2694 tokio::time::sleep(wait).await;
2695 continue;
2696 }
2697 };
2698
2699 let (ws_stream, _) = match crate::config::ws_connect_with_proxy(
2700 &ws_url,
2701 "channel.slack",
2702 self.proxy_url.as_deref(),
2703 )
2704 .await
2705 {
2706 Ok(connection) => {
2707 socket_reconnect_attempt = 0;
2708 connection
2709 }
2710 Err(e) => {
2711 let wait = Self::compute_socket_mode_retry_delay(socket_reconnect_attempt);
2712 tracing::warn!(
2713 "Slack Socket Mode: websocket connect failed: {e}; retrying in {:.3}s (attempt #{})",
2714 wait.as_secs_f64(),
2715 socket_reconnect_attempt.saturating_add(1),
2716 );
2717 socket_reconnect_attempt = socket_reconnect_attempt.saturating_add(1);
2718 tokio::time::sleep(wait).await;
2719 continue;
2720 }
2721 };
2722 tracing::info!("Slack Socket Mode: websocket connected");
2723
2724 let (mut write, mut read) = ws_stream.split();
2725
2726 while let Some(frame) = read.next().await {
2727 let text = match frame {
2728 Ok(WsMessage::Text(text)) => text,
2729 Ok(WsMessage::Ping(payload)) => {
2730 if let Err(e) = write.send(WsMessage::Pong(payload)).await {
2731 tracing::warn!("Slack Socket Mode: pong send failed: {e}");
2732 break;
2733 }
2734 continue;
2735 }
2736 Ok(WsMessage::Close(_)) => {
2737 tracing::warn!("Slack Socket Mode: websocket closed by server");
2738 break;
2739 }
2740 Ok(_) => continue,
2741 Err(e) => {
2742 tracing::warn!("Slack Socket Mode: websocket read failed: {e}");
2743 break;
2744 }
2745 };
2746
2747 let envelope: serde_json::Value = match serde_json::from_str(text.as_ref()) {
2748 Ok(value) => value,
2749 Err(e) => {
2750 tracing::warn!("Slack Socket Mode: invalid JSON payload: {e}");
2751 continue;
2752 }
2753 };
2754
2755 if let Some(envelope_id) = envelope.get("envelope_id").and_then(|v| v.as_str()) {
2756 let ack = serde_json::json!({ "envelope_id": envelope_id });
2757 if let Err(e) = write.send(WsMessage::Text(ack.to_string().into())).await {
2758 tracing::warn!("Slack Socket Mode: ack send failed: {e}");
2759 break;
2760 }
2761 }
2762
2763 let envelope_type = envelope
2764 .get("type")
2765 .and_then(|v| v.as_str())
2766 .unwrap_or_default();
2767 if envelope_type == "disconnect" {
2768 tracing::warn!("Slack Socket Mode: received disconnect event");
2769 break;
2770 }
2771
2772 if envelope_type == "interactive" {
2774 if let Some(msg) = Self::parse_block_action_as_command(&envelope, bot_user_id) {
2775 if tx.send(msg).await.is_err() {
2776 return Ok(());
2777 }
2778 }
2779 continue;
2780 }
2781
2782 if envelope_type != "events_api" {
2783 continue;
2784 }
2785
2786 let Some(event) = envelope
2787 .get("payload")
2788 .and_then(|payload| payload.get("event"))
2789 else {
2790 continue;
2791 };
2792 let event_type = event
2793 .get("type")
2794 .and_then(|v| v.as_str())
2795 .unwrap_or_default();
2796
2797 if event_type == "assistant_thread_started"
2799 || event_type == "assistant_thread_context_changed"
2800 {
2801 if let Some(thread) = event.get("assistant_thread") {
2802 let ch = thread
2803 .get("channel_id")
2804 .and_then(|v| v.as_str())
2805 .unwrap_or_default();
2806 let tts = thread
2807 .get("thread_ts")
2808 .and_then(|v| v.as_str())
2809 .unwrap_or_default();
2810 if !ch.is_empty() && !tts.is_empty() {
2811 if let Ok(mut map) = self.active_assistant_thread.lock() {
2812 map.insert(ch.to_string(), tts.to_string());
2813 }
2814 }
2815 }
2816 continue;
2817 }
2818
2819 if event_type == "reaction_added" {
2821 if let Some(ref cancel_emoji) = self.cancel_reaction {
2822 let reaction = event
2823 .get("reaction")
2824 .and_then(|v| v.as_str())
2825 .unwrap_or_default();
2826 if reaction == cancel_emoji.as_str() {
2827 let user = event
2828 .get("user")
2829 .and_then(|v| v.as_str())
2830 .unwrap_or_default();
2831 if !user.is_empty() && self.is_user_allowed(user) {
2832 let item = event.get("item");
2833 let item_channel = item
2834 .and_then(|i| i.get("channel"))
2835 .and_then(|v| v.as_str())
2836 .unwrap_or_default();
2837 let item_ts = item
2838 .and_then(|i| i.get("ts"))
2839 .and_then(|v| v.as_str())
2840 .unwrap_or_default();
2841 if !item_channel.is_empty() && !item_ts.is_empty() {
2842 let thread_ts = Some(item_ts.to_string());
2846 let scope_id = Some(item_ts.to_string());
2847 let sender = self.resolve_sender_identity(user).await;
2848 let cancel_msg = ChannelMessage {
2849 id: format!("slack_{item_channel}_{item_ts}_cancel"),
2850 sender,
2851 reply_target: item_channel.to_string(),
2852 content: "/stop".to_string(),
2853 channel: "slack".to_string(),
2854 timestamp: std::time::SystemTime::now()
2855 .duration_since(std::time::UNIX_EPOCH)
2856 .unwrap_or_default()
2857 .as_secs(),
2858 thread_ts,
2859 interruption_scope_id: scope_id,
2860 attachments: vec![],
2861 };
2862 tracing::info!(
2863 "Slack: :{cancel_emoji}: reaction from {user} \
2864 on {item_channel}/{item_ts} — sending /stop"
2865 );
2866 if tx.send(cancel_msg).await.is_err() {
2867 return Ok(());
2868 }
2869 }
2870 }
2871 }
2872 }
2873 continue;
2874 }
2875
2876 if event_type != "message" {
2877 continue;
2878 }
2879 let subtype = event.get("subtype").and_then(|v| v.as_str());
2880 if !Self::is_supported_message_subtype(subtype) {
2881 continue;
2882 }
2883
2884 let channel_id = event
2885 .get("channel")
2886 .and_then(|v| v.as_str())
2887 .map(str::to_string)
2888 .unwrap_or_default();
2889 if channel_id.is_empty() {
2890 continue;
2891 }
2892 if let Some(ref configured_channels) = scoped_channels {
2893 if !configured_channels.iter().any(|id| id == &channel_id) {
2894 continue;
2895 }
2896 }
2897
2898 let user = event
2899 .get("user")
2900 .and_then(|v| v.as_str())
2901 .unwrap_or_default();
2902 if user.is_empty() || user == bot_user_id {
2903 continue;
2904 }
2905 if !self.is_user_allowed(user) {
2906 tracing::warn!("Slack: ignoring message from unauthorized user: {user}");
2907 continue;
2908 }
2909
2910 let ts = event.get("ts").and_then(|v| v.as_str()).unwrap_or_default();
2911 if ts.is_empty() {
2912 continue;
2913 }
2914 let last_ts = last_ts_by_channel
2915 .get(&channel_id)
2916 .map(String::as_str)
2917 .unwrap_or_default();
2918 if ts <= last_ts {
2919 continue;
2920 }
2921
2922 let is_group_message = Self::is_group_channel_id(&channel_id);
2923 let is_thread_reply = event.get("thread_ts").and_then(|v| v.as_str()).is_some();
2924 let allow_sender_without_mention =
2925 is_group_message && self.is_group_sender_trigger_enabled(user);
2926 let require_mention = self.mention_only
2927 && is_group_message
2928 && !allow_sender_without_mention
2929 && !is_thread_reply;
2930
2931 let Some(normalized_text) = self
2932 .build_incoming_content(event, require_mention, bot_user_id)
2933 .await
2934 else {
2935 continue;
2936 };
2937
2938 let inbound_thread_ts = Self::inbound_thread_ts_genuine_only(event);
2942 if self.try_intercept_approval(
2943 &channel_id,
2944 inbound_thread_ts.as_deref(),
2945 &normalized_text,
2946 user,
2947 ) {
2948 last_ts_by_channel.insert(channel_id.clone(), ts.to_string());
2949 continue;
2950 }
2951
2952 last_ts_by_channel.insert(channel_id.clone(), ts.to_string());
2953 let sender = self.resolve_sender_identity(user).await;
2954
2955 let channel_msg = ChannelMessage {
2956 id: format!("slack_{channel_id}_{ts}"),
2957 sender,
2958 reply_target: channel_id.clone(),
2959 content: normalized_text,
2960 channel: "slack".to_string(),
2961 timestamp: std::time::SystemTime::now()
2962 .duration_since(std::time::UNIX_EPOCH)
2963 .unwrap_or_default()
2964 .as_secs(),
2965 thread_ts: if self.thread_replies {
2966 Self::inbound_thread_ts(event, ts)
2967 } else {
2968 Self::inbound_thread_ts_genuine_only(event)
2969 },
2970 interruption_scope_id: Self::inbound_interruption_scope_id(event, ts),
2971 attachments: vec![],
2972 };
2973
2974 if let Some(ref tts) = channel_msg.thread_ts {
2976 if let Ok(mut map) = self.active_assistant_thread.lock() {
2977 map.insert(channel_id.clone(), tts.clone());
2978 }
2979 }
2980
2981 if tx.send(channel_msg).await.is_err() {
2982 return Ok(());
2983 }
2984 }
2985
2986 let wait = Self::compute_socket_mode_retry_delay(socket_reconnect_attempt);
2987 tracing::warn!(
2988 "Slack Socket Mode: reconnecting in {:.3}s (attempt #{})...",
2989 wait.as_secs_f64(),
2990 socket_reconnect_attempt.saturating_add(1),
2991 );
2992 socket_reconnect_attempt = socket_reconnect_attempt.saturating_add(1);
2993 tokio::time::sleep(wait).await;
2994 }
2995 }
2996
2997 fn parse_retry_after_secs(headers: &HeaderMap) -> Option<u64> {
2998 let value = headers
2999 .get(reqwest::header::RETRY_AFTER)?
3000 .to_str()
3001 .ok()?
3002 .trim();
3003 Self::parse_retry_after_value(value)
3004 }
3005
/// Parses a `Retry-After` value given in seconds.
///
/// Whole numbers are returned as-is. A decimal value is truncated to the part
/// before the first `.` (so `"1.5"` yields `1`). Empty or otherwise
/// non-numeric input yields `None`. The input is not trimmed here.
fn parse_retry_after_value(value: &str) -> Option<u64> {
    if value.is_empty() {
        return None;
    }

    value.parse::<u64>().ok().or_else(|| {
        // Fall back to the integer part of a fractional value.
        let whole = match value.split_once('.') {
            Some((whole, _fraction)) => whole,
            None => value,
        };
        whole.parse::<u64>().ok()
    })
}
3021
3022 fn jitter_ms(max_jitter_ms: u64) -> u64 {
3023 if max_jitter_ms == 0 {
3024 return 0;
3025 }
3026 rand::random::<u64>() % (max_jitter_ms + 1)
3027 }
3028
/// Computes `min(base * 2^attempt, max_backoff_secs)` seconds plus
/// `jitter_ms` milliseconds.
///
/// The doubling factor saturates: an `attempt` too large to shift becomes
/// `u64::MAX`, and the multiplication itself saturates, so large attempts
/// simply hit the cap. The jitter is added after capping.
fn compute_exponential_backoff_delay(
    base_retry_after_secs: u64,
    attempt: u32,
    max_backoff_secs: u64,
    jitter_ms: u64,
) -> Duration {
    let factor = match 1_u64.checked_shl(attempt) {
        Some(factor) => factor,
        None => u64::MAX,
    };
    let uncapped_secs = base_retry_after_secs.saturating_mul(factor);
    let capped_secs = std::cmp::min(uncapped_secs, max_backoff_secs);

    let backoff = Duration::from_secs(capped_secs);
    let jitter = Duration::from_millis(jitter_ms);
    backoff + jitter
}
3041
3042 fn compute_retry_delay(base_retry_after_secs: u64, attempt: u32, jitter_ms: u64) -> Duration {
3043 Self::compute_exponential_backoff_delay(
3044 base_retry_after_secs,
3045 attempt,
3046 SLACK_HISTORY_MAX_BACKOFF_SECS,
3047 jitter_ms,
3048 )
3049 }
3050
3051 fn compute_socket_mode_retry_delay(attempt: u32) -> Duration {
3052 let jitter_ms = Self::jitter_ms(SLACK_SOCKET_MODE_MAX_JITTER_MS);
3053 Self::compute_exponential_backoff_delay(
3054 SLACK_SOCKET_MODE_INITIAL_BACKOFF_SECS,
3055 attempt,
3056 SLACK_SOCKET_MODE_MAX_BACKOFF_SECS,
3057 jitter_ms,
3058 )
3059 }
3060
3061 fn next_retry_timestamp(wait: Duration) -> String {
3062 match chrono::Duration::from_std(wait) {
3063 Ok(delta) => (Utc::now() + delta).to_rfc3339(),
3064 Err(_) => Utc::now().to_rfc3339(),
3065 }
3066 }
3067
/// Combines the individual probes into an overall health verdict.
///
/// The bot check must pass. When Socket Mode is enabled its check must pass
/// too; when it is disabled, `socket_mode_ok` is ignored.
fn evaluate_health(bot_ok: bool, socket_mode_enabled: bool, socket_mode_ok: bool) -> bool {
    bot_ok && (!socket_mode_enabled || socket_mode_ok)
}
3077
3078 fn slack_api_call_succeeded(status: reqwest::StatusCode, body: &str) -> bool {
3079 if !status.is_success() {
3080 return false;
3081 }
3082
3083 let parsed: serde_json::Value = serde_json::from_str(body).unwrap_or_default();
3084 parsed
3085 .get("ok")
3086 .and_then(|value| value.as_bool())
3087 .unwrap_or(false)
3088 }
3089
3090 async fn fetch_history_with_retry(
3091 &self,
3092 channel_id: &str,
3093 params: &[(&str, String)],
3094 ) -> Option<serde_json::Value> {
3095 let mut total_wait = Duration::from_secs(0);
3096
3097 for attempt in 0..=SLACK_HISTORY_MAX_RETRIES {
3098 let resp = match self
3099 .http_client()
3100 .get("https://slack.com/api/conversations.history")
3101 .bearer_auth(&self.bot_token)
3102 .query(params)
3103 .send()
3104 .await
3105 {
3106 Ok(r) => r,
3107 Err(e) => {
3108 tracing::warn!("Slack poll error for channel {channel_id}: {e}");
3109 return None;
3110 }
3111 };
3112
3113 let status = resp.status();
3114 let headers = resp.headers().clone();
3115 let body = resp
3116 .text()
3117 .await
3118 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
3119
3120 let is_ratelimited_http = status == reqwest::StatusCode::TOO_MANY_REQUESTS;
3121 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
3122 let is_ratelimited_payload = payload.get("ok") == Some(&serde_json::Value::Bool(false))
3123 && payload
3124 .get("error")
3125 .and_then(|e| e.as_str())
3126 .is_some_and(|err| err == "ratelimited");
3127
3128 if is_ratelimited_http || is_ratelimited_payload {
3129 if attempt >= SLACK_HISTORY_MAX_RETRIES {
3130 tracing::error!(
3131 "Slack rate limit retries exhausted for conversations.history on channel {}. Total wait: {}s across {} attempts. Proceeding without channel history.",
3132 channel_id,
3133 total_wait.as_secs(),
3134 SLACK_HISTORY_MAX_RETRIES
3135 );
3136 return None;
3137 }
3138
3139 let retry_after_secs = Self::parse_retry_after_secs(&headers)
3140 .unwrap_or(SLACK_HISTORY_DEFAULT_RETRY_AFTER_SECS);
3141 let jitter_ms = Self::jitter_ms(SLACK_HISTORY_MAX_JITTER_MS);
3142 let wait = Self::compute_retry_delay(retry_after_secs, attempt, jitter_ms);
3143 total_wait += wait;
3144 let next_retry_at = Self::next_retry_timestamp(wait);
3145 tracing::warn!(
3146 "Slack conversations.history rate limited for channel {}. Retry-After: {}s. Attempt {}/{}. Next retry at {}.",
3147 channel_id,
3148 retry_after_secs,
3149 attempt + 1,
3150 SLACK_HISTORY_MAX_RETRIES,
3151 next_retry_at
3152 );
3153 tokio::time::sleep(wait).await;
3154 continue;
3155 }
3156
3157 if !status.is_success() {
3158 let sanitized = crate::providers::sanitize_api_error(&body);
3159 tracing::warn!(
3160 "Slack history request failed for channel {} ({}): {}",
3161 channel_id,
3162 status,
3163 sanitized
3164 );
3165 return None;
3166 }
3167
3168 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
3169 let err = payload
3170 .get("error")
3171 .and_then(|e| e.as_str())
3172 .unwrap_or("unknown");
3173 tracing::warn!("Slack history error for channel {channel_id}: {err}");
3174 return None;
3175 }
3176
3177 return Some(payload);
3178 }
3179
3180 None
3181 }
3182
3183 async fn fetch_thread_replies_with_retry(
3184 &self,
3185 channel_id: &str,
3186 thread_ts: &str,
3187 oldest: &str,
3188 ) -> Option<serde_json::Value> {
3189 let mut total_wait = Duration::from_secs(0);
3190
3191 for attempt in 0..=SLACK_HISTORY_MAX_RETRIES {
3192 let resp = match self
3193 .http_client()
3194 .get("https://slack.com/api/conversations.replies")
3195 .bearer_auth(&self.bot_token)
3196 .query(&[
3197 ("channel", channel_id),
3198 ("ts", thread_ts),
3199 ("oldest", oldest),
3200 ("limit", "50"),
3201 ])
3202 .send()
3203 .await
3204 {
3205 Ok(r) => r,
3206 Err(e) => {
3207 tracing::warn!(
3208 "Slack conversations.replies error for thread {thread_ts} in {channel_id}: {e}"
3209 );
3210 return None;
3211 }
3212 };
3213
3214 let status = resp.status();
3215 let headers = resp.headers().clone();
3216 let body = resp
3217 .text()
3218 .await
3219 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
3220
3221 let is_ratelimited_http = status == reqwest::StatusCode::TOO_MANY_REQUESTS;
3222 let payload: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
3223 let is_ratelimited_payload = payload.get("ok") == Some(&serde_json::Value::Bool(false))
3224 && payload
3225 .get("error")
3226 .and_then(|e| e.as_str())
3227 .is_some_and(|err| err == "ratelimited");
3228
3229 if is_ratelimited_http || is_ratelimited_payload {
3230 if attempt >= SLACK_HISTORY_MAX_RETRIES {
3231 tracing::error!(
3232 "Slack rate limit retries exhausted for conversations.replies on thread {} in channel {}. Total wait: {}s across {} attempts.",
3233 thread_ts,
3234 channel_id,
3235 total_wait.as_secs(),
3236 SLACK_HISTORY_MAX_RETRIES
3237 );
3238 return None;
3239 }
3240
3241 let retry_after_secs = Self::parse_retry_after_secs(&headers)
3242 .unwrap_or(SLACK_HISTORY_DEFAULT_RETRY_AFTER_SECS);
3243 let jitter_ms = Self::jitter_ms(SLACK_HISTORY_MAX_JITTER_MS);
3244 let wait = Self::compute_retry_delay(retry_after_secs, attempt, jitter_ms);
3245 total_wait += wait;
3246 let next_retry_at = Self::next_retry_timestamp(wait);
3247 tracing::warn!(
3248 "Slack conversations.replies rate limited for thread {} in channel {}. Retry-After: {}s. Attempt {}/{}. Next retry at {}.",
3249 thread_ts,
3250 channel_id,
3251 retry_after_secs,
3252 attempt + 1,
3253 SLACK_HISTORY_MAX_RETRIES,
3254 next_retry_at
3255 );
3256 tokio::time::sleep(wait).await;
3257 continue;
3258 }
3259
3260 if !status.is_success() {
3261 let sanitized = crate::providers::sanitize_api_error(&body);
3262 tracing::warn!(
3263 "Slack conversations.replies failed for thread {} in channel {} ({}): {}",
3264 thread_ts,
3265 channel_id,
3266 status,
3267 sanitized
3268 );
3269 return None;
3270 }
3271
3272 if payload.get("ok") == Some(&serde_json::Value::Bool(false)) {
3273 let err = payload
3274 .get("error")
3275 .and_then(|e| e.as_str())
3276 .unwrap_or("unknown");
3277 tracing::warn!(
3278 "Slack conversations.replies error for thread {} in channel {}: {}",
3279 thread_ts,
3280 channel_id,
3281 err
3282 );
3283 return None;
3284 }
3285
3286 return Some(payload);
3287 }
3288
3289 None
3290 }
3291
3292 fn extract_active_threads(messages: &[serde_json::Value]) -> Vec<(String, String)> {
3295 messages
3296 .iter()
3297 .filter_map(|msg| {
3298 let thread_ts = msg.get("thread_ts").and_then(|v| v.as_str())?;
3299 let ts = msg.get("ts").and_then(|v| v.as_str()).unwrap_or_default();
3300 if ts != thread_ts {
3302 return None;
3303 }
3304 let reply_count = msg.get("reply_count").and_then(|v| v.as_u64()).unwrap_or(0);
3305 if reply_count == 0 {
3306 return None;
3307 }
3308 let latest_reply = msg
3309 .get("latest_reply")
3310 .and_then(|v| v.as_str())
3311 .unwrap_or(thread_ts);
3312 Some((thread_ts.to_string(), latest_reply.to_string()))
3313 })
3314 .collect()
3315 }
3316
3317 fn evict_stale_threads(
3320 active_threads: &mut HashMap<String, (String, String, Instant)>,
3321 now: Instant,
3322 ) {
3323 let max_age = Duration::from_secs(SLACK_POLL_THREAD_EXPIRE_SECS);
3324 active_threads
3325 .retain(|_, (_, _, last_activity)| now.duration_since(*last_activity) < max_age);
3326 if active_threads.len() > SLACK_POLL_ACTIVE_THREAD_MAX {
3327 let overflow = active_threads.len() - SLACK_POLL_ACTIVE_THREAD_MAX;
3328 let mut entries: Vec<_> = active_threads
3329 .iter()
3330 .map(|(k, (_, _, t))| (k.clone(), *t))
3331 .collect();
3332 entries.sort_by_key(|(_, t)| *t);
3333 for (key, _) in entries.into_iter().take(overflow) {
3334 active_threads.remove(&key);
3335 }
3336 }
3337 }
3338}
3339
/// Suffix appended to the final chunk when the text had to be cut short.
const SLACK_TRUNCATION_INDICATOR: &str = "\n\n...[message truncated]";

/// Splits `text` into at most `max_chunks` pieces of at most `max_chars`
/// bytes each.
///
/// Sizes are measured in UTF-8 bytes. Each cut prefers the position after the
/// last newline inside the window, then the position after the last space,
/// and otherwise falls at the window's end. Cut positions are always moved
/// back to a character boundary, so multi-byte text never causes a slicing
/// panic.
///
/// When the text does not fit into `max_chunks` pieces, the last piece is
/// shortened and [`SLACK_TRUNCATION_INDICATOR`] is appended. If `max_chars`
/// is smaller than the indicator, that last piece consists of the indicator
/// alone (and therefore exceeds `max_chars`).
///
/// Text that already fits is returned as a single chunk regardless of
/// `max_chunks`; otherwise a `max_chunks` of zero yields no chunks.
fn split_text_into_chunks(text: &str, max_chars: usize, max_chunks: usize) -> Vec<String> {
    /// Largest index `<= index` (clamped to the text length) that lies on a
    /// character boundary.
    fn floor_to_char_boundary(text: &str, index: usize) -> usize {
        let mut idx = index.min(text.len());
        // Index 0 is always a boundary, so this terminates.
        while !text.is_char_boundary(idx) {
            idx -= 1;
        }
        idx
    }

    /// Preferred cut position within the first `limit` bytes of `text`.
    fn preferred_break(text: &str, limit: usize) -> usize {
        let limit = floor_to_char_boundary(text, limit);
        let window = &text[..limit];
        window
            .rfind('\n')
            .or_else(|| window.rfind(' '))
            .map_or(limit, |i| i + 1)
    }

    if text.len() <= max_chars {
        return vec![text.to_string()];
    }

    let mut chunks: Vec<String> = Vec::new();
    let mut remaining = text;

    while !remaining.is_empty() && chunks.len() < max_chunks {
        // Whatever is left fits into this slot: done.
        if remaining.len() <= max_chars {
            chunks.push(remaining.to_string());
            break;
        }

        let is_last_slot = chunks.len() + 1 == max_chunks;
        if is_last_slot {
            // Reserve room for the indicator; saturate so a tiny `max_chars`
            // cannot underflow.
            let avail = max_chars.saturating_sub(SLACK_TRUNCATION_INDICATOR.len());
            let break_at = preferred_break(remaining, avail);
            let mut chunk = remaining[..break_at].to_string();
            chunk.push_str(SLACK_TRUNCATION_INDICATOR);
            chunks.push(chunk);
            break;
        }

        let mut break_at = preferred_break(remaining, max_chars);
        if break_at == 0 {
            // `max_chars` is smaller than the first character; emit that
            // character whole so the loop always makes progress.
            break_at = remaining
                .chars()
                .next()
                .map_or(remaining.len(), char::len_utf8);
        }

        chunks.push(remaining[..break_at].to_string());
        remaining = &remaining[break_at..];
    }

    chunks
}
3394
3395#[async_trait]
3396impl Channel for SlackChannel {
/// Returns the identifier of this channel implementation: `"slack"`.
fn name(&self) -> &str {
    "slack"
}
3400
3401 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
3402 let body = if let Some(blocks_json) = message.content.strip_prefix(super::BLOCK_KIT_PREFIX)
3404 {
3405 let blocks: serde_json::Value = serde_json::from_str(blocks_json)
3406 .context("invalid Block Kit JSON in runtime command response")?;
3407 let mut body = serde_json::json!({
3408 "channel": message.recipient,
3409 "text": "Model configuration",
3410 "blocks": blocks
3411 });
3412 if let Some(ts) = self.outbound_thread_ts(message) {
3413 body["thread_ts"] = serde_json::json!(ts);
3414 }
3415 body
3416 } else {
3417 let mut body = serde_json::json!({
3418 "channel": message.recipient,
3419 "text": message.content
3420 });
3421
3422 let block_limit = if self.use_markdown_blocks {
3427 SLACK_MARKDOWN_BLOCK_MAX_CHARS
3428 } else {
3429 SLACK_BLOCK_TEXT_MAX_CHARS
3430 };
3431 if message.content.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
3432 let chunks = split_text_into_chunks(
3433 &message.content,
3434 block_limit,
3435 SLACK_MAX_BLOCKS_PER_MESSAGE,
3436 );
3437 let blocks: Vec<serde_json::Value> = chunks
3438 .into_iter()
3439 .map(|chunk| {
3440 if self.use_markdown_blocks {
3441 serde_json::json!({
3442 "type": "markdown",
3443 "text": chunk
3444 })
3445 } else {
3446 serde_json::json!({
3447 "type": "section",
3448 "text": {
3449 "type": "mrkdwn",
3450 "text": chunk
3451 }
3452 })
3453 }
3454 })
3455 .collect();
3456 body["blocks"] = serde_json::Value::Array(blocks);
3457 }
3458
3459 if let Some(ts) = self.outbound_thread_ts(message) {
3460 body["thread_ts"] = serde_json::json!(ts);
3461 }
3462 body
3463 };
3464
3465 let resp = self
3466 .http_client()
3467 .post("https://slack.com/api/chat.postMessage")
3468 .bearer_auth(&self.bot_token)
3469 .json(&body)
3470 .send()
3471 .await?;
3472
3473 let status = resp.status();
3474 let body = resp
3475 .text()
3476 .await
3477 .unwrap_or_else(|e| format!("<failed to read response body: {e}>"));
3478
3479 if !status.is_success() {
3480 let sanitized = crate::providers::sanitize_api_error(&body);
3481 anyhow::bail!("Slack chat.postMessage failed ({status}): {sanitized}");
3482 }
3483
3484 let parsed: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
3486 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
3487 let err = parsed
3488 .get("error")
3489 .and_then(|e| e.as_str())
3490 .unwrap_or("unknown");
3491 anyhow::bail!("Slack chat.postMessage failed: {err}");
3492 }
3493
3494 Ok(())
3495 }
3496
/// Reports whether draft streaming is enabled, i.e. the value of the
/// `stream_drafts` setting.
fn supports_draft_updates(&self) -> bool {
    self.stream_drafts
}
3500
3501 async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> {
3502 if !self.stream_drafts {
3503 return Ok(None);
3504 }
3505
3506 let thread_ts = self.outbound_thread_ts(message).unwrap_or_default();
3509 let lazy_id = format!("{LAZY_DRAFT_PREFIX}{}:{}", message.recipient, thread_ts);
3510 Ok(Some(lazy_id))
3511 }
3512
3513 async fn update_draft(
3514 &self,
3515 recipient: &str,
3516 message_id: &str,
3517 text: &str,
3518 ) -> anyhow::Result<()> {
3519 if message_id.starts_with(LAZY_DRAFT_PREFIX)
3521 && self.resolve_draft_ts(message_id).await.is_none()
3522 {
3523 let _ = self.materialize_lazy_draft(message_id, text).await;
3526 self.last_draft_edit
3527 .lock()
3528 .expect("last_draft_edit lock")
3529 .insert(recipient.to_string(), Instant::now());
3530 return Ok(());
3531 }
3532
3533 let real_ts = match self.resolve_draft_ts(message_id).await {
3535 Some(ts) => ts,
3536 None => return Ok(()),
3537 };
3538
3539 {
3541 let last_edits = self.last_draft_edit.lock().expect("last_draft_edit lock");
3542 if let Some(last_time) = last_edits.get(recipient) {
3543 let elapsed_ms = u64::try_from(last_time.elapsed().as_millis()).unwrap_or(u64::MAX);
3544 if elapsed_ms < self.draft_update_interval_ms {
3545 return Ok(());
3546 }
3547 }
3548 }
3549
3550 self.last_draft_edit
3553 .lock()
3554 .expect("last_draft_edit lock")
3555 .insert(recipient.to_string(), Instant::now());
3556
3557 let display_text = if text.len() > SLACK_MESSAGE_MAX_CHARS {
3560 text[..text
3561 .char_indices()
3562 .take_while(|(idx, _)| *idx < SLACK_MESSAGE_MAX_CHARS)
3563 .last()
3564 .map_or(0, |(idx, ch)| idx + ch.len_utf8())]
3565 .to_string()
3566 } else {
3567 text.to_string()
3568 };
3569
3570 let client = self.http_client();
3571 let token = self.bot_token.clone();
3572 let channel = recipient.to_string();
3573 tokio::spawn(async move {
3574 let mut body = serde_json::json!({
3575 "channel": channel,
3576 "ts": real_ts,
3577 "text": &display_text,
3578 });
3579 if display_text.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
3580 body["blocks"] = serde_json::json!([{
3581 "type": "markdown",
3582 "text": &display_text
3583 }]);
3584 }
3585 match client
3586 .post("https://slack.com/api/chat.update")
3587 .bearer_auth(&token)
3588 .json(&body)
3589 .send()
3590 .await
3591 {
3592 Ok(resp) => {
3593 if let Ok(resp_body) = resp.json::<serde_json::Value>().await {
3594 if resp_body.get("ok") != Some(&serde_json::Value::Bool(true)) {
3595 let err = resp_body
3596 .get("error")
3597 .and_then(|e| e.as_str())
3598 .unwrap_or("unknown");
3599 tracing::debug!("Slack chat.update (draft) failed: {err}");
3600 }
3601 }
3602 }
3603 Err(e) => {
3604 tracing::debug!("Slack chat.update (draft) HTTP error: {e}");
3605 }
3606 }
3607 });
3608
3609 Ok(())
3610 }
3611
3612 async fn update_draft_progress(
3613 &self,
3614 recipient: &str,
3615 _message_id: &str,
3616 text: &str,
3617 ) -> anyhow::Result<()> {
3618 let status_line = text.trim().lines().last().unwrap_or("").trim();
3619 if status_line.is_empty() || status_line.starts_with("\u{1f914}") {
3622 return Ok(());
3623 }
3624 self.set_assistant_status(recipient, status_line).await;
3625 Ok(())
3626 }
3627
3628 async fn finalize_draft(
3629 &self,
3630 recipient: &str,
3631 message_id: &str,
3632 text: &str,
3633 ) -> anyhow::Result<()> {
3634 self.last_draft_edit
3636 .lock()
3637 .expect("last_draft_edit lock")
3638 .remove(recipient);
3639
3640 let draft_thread_ts = message_id
3643 .strip_prefix(LAZY_DRAFT_PREFIX)
3644 .and_then(|rest| rest.find(':').map(|pos| &rest[pos + 1..]))
3645 .filter(|ts| !ts.is_empty())
3646 .map(String::from);
3647
3648 let real_ts = self.resolve_draft_ts(message_id).await;
3649 self.lazy_draft_ts.lock().await.remove(message_id);
3651
3652 let Some(real_ts) = real_ts else {
3653 let msg = SendMessage::new(text, recipient).in_thread(draft_thread_ts);
3655 return self.send(&msg).await;
3656 };
3657
3658 if text.len() > SLACK_MESSAGE_MAX_CHARS {
3660 let _ = self.delete_message(recipient, &real_ts).await;
3661 let msg = SendMessage::new(text, recipient).in_thread(draft_thread_ts);
3662 return self.send(&msg).await;
3663 }
3664
3665 let mut body = serde_json::json!({
3667 "channel": recipient,
3668 "ts": real_ts,
3669 "text": text,
3670 });
3671
3672 if text.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
3674 body["blocks"] = serde_json::json!([{
3675 "type": "markdown",
3676 "text": text
3677 }]);
3678 }
3679
3680 let resp = self
3681 .http_client()
3682 .post("https://slack.com/api/chat.update")
3683 .bearer_auth(&self.bot_token)
3684 .json(&body)
3685 .send()
3686 .await?;
3687
3688 let resp_body: serde_json::Value = resp.json().await?;
3689 if resp_body.get("ok") == Some(&serde_json::Value::Bool(true)) {
3690 return Ok(());
3691 }
3692
3693 let err = resp_body
3695 .get("error")
3696 .and_then(|e| e.as_str())
3697 .unwrap_or("unknown");
3698 tracing::debug!("Slack chat.update (finalize) failed: {err}; falling back to delete+send");
3699
3700 let _ = self.delete_message(recipient, &real_ts).await;
3701 let msg = SendMessage::new(text, recipient).in_thread(draft_thread_ts);
3702 self.send(&msg).await
3703 }
3704
3705 async fn cancel_draft(&self, recipient: &str, message_id: &str) -> anyhow::Result<()> {
3706 self.last_draft_edit
3707 .lock()
3708 .expect("last_draft_edit lock")
3709 .remove(recipient);
3710 let real_ts = self.resolve_draft_ts(message_id).await;
3711 self.lazy_draft_ts.lock().await.remove(message_id);
3712 if let Some(ts) = real_ts {
3713 self.delete_message(recipient, &ts).await
3714 } else {
3715 Ok(())
3716 }
3717 }
3718
3719 async fn add_reaction(
3720 &self,
3721 channel_id: &str,
3722 message_id: &str,
3723 emoji: &str,
3724 ) -> anyhow::Result<()> {
3725 let ts = extract_slack_ts(message_id);
3726 let name = unicode_emoji_to_slack_name(emoji);
3727
3728 let body = serde_json::json!({
3729 "channel": channel_id,
3730 "timestamp": ts,
3731 "name": name
3732 });
3733
3734 let resp = self
3735 .http_client()
3736 .post("https://slack.com/api/reactions.add")
3737 .bearer_auth(&self.bot_token)
3738 .json(&body)
3739 .send()
3740 .await?;
3741
3742 let status = resp.status();
3743 let text = resp.text().await.unwrap_or_default();
3744
3745 if !status.is_success() {
3746 let sanitized = crate::providers::sanitize_api_error(&text);
3747 anyhow::bail!("Slack reactions.add failed ({status}): {sanitized}");
3748 }
3749
3750 let parsed: serde_json::Value = serde_json::from_str(&text).unwrap_or_default();
3751 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
3752 let err = parsed
3753 .get("error")
3754 .and_then(|e| e.as_str())
3755 .unwrap_or("unknown");
3756 if err != "already_reacted" {
3757 anyhow::bail!("Slack reactions.add failed: {err}");
3758 }
3759 }
3760
3761 Ok(())
3762 }
3763
3764 async fn remove_reaction(
3765 &self,
3766 channel_id: &str,
3767 message_id: &str,
3768 emoji: &str,
3769 ) -> anyhow::Result<()> {
3770 let ts = extract_slack_ts(message_id);
3771 let name = unicode_emoji_to_slack_name(emoji);
3772
3773 let body = serde_json::json!({
3774 "channel": channel_id,
3775 "timestamp": ts,
3776 "name": name
3777 });
3778
3779 let resp = self
3780 .http_client()
3781 .post("https://slack.com/api/reactions.remove")
3782 .bearer_auth(&self.bot_token)
3783 .json(&body)
3784 .send()
3785 .await?;
3786
3787 let status = resp.status();
3788 let text = resp.text().await.unwrap_or_default();
3789
3790 if !status.is_success() {
3791 let sanitized = crate::providers::sanitize_api_error(&text);
3792 anyhow::bail!("Slack reactions.remove failed ({status}): {sanitized}");
3793 }
3794
3795 let parsed: serde_json::Value = serde_json::from_str(&text).unwrap_or_default();
3796 if parsed.get("ok") == Some(&serde_json::Value::Bool(false)) {
3797 let err = parsed
3798 .get("error")
3799 .and_then(|e| e.as_str())
3800 .unwrap_or("unknown");
3801 if err != "no_reaction" {
3802 anyhow::bail!("Slack reactions.remove failed: {err}");
3803 }
3804 }
3805
3806 Ok(())
3807 }
3808
3809 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
3810 let bot_user_id = self.get_bot_user_id().await.unwrap_or_default();
3811 let scoped_channels = self.scoped_channel_ids();
3812 if self.configured_app_token().is_some() {
3813 tracing::info!("Slack channel listening in Socket Mode");
3814 return self
3815 .listen_socket_mode(tx, &bot_user_id, scoped_channels)
3816 .await;
3817 }
3818
3819 let mut discovered_channels: Vec<String> = Vec::new();
3820 let mut last_discovery = Instant::now();
3821 let mut last_ts_by_channel: HashMap<String, String> = HashMap::new();
3822 let mut active_threads: HashMap<String, (String, String, Instant)> = HashMap::new();
3824
3825 if let Some(ref channel_ids) = scoped_channels {
3826 tracing::info!(
3827 "Slack channel listening on {} configured channel(s): {}",
3828 channel_ids.len(),
3829 channel_ids.join(", ")
3830 );
3831 } else {
3832 tracing::info!(
3833 "Slack channel_id/channel_ids not set (or wildcard only); listening across all accessible channels."
3834 );
3835 }
3836
3837 loop {
3838 tokio::time::sleep(Duration::from_secs(3)).await;
3839
3840 let target_channels = if let Some(ref channel_ids) = scoped_channels {
3841 channel_ids.clone()
3842 } else {
3843 if discovered_channels.is_empty()
3844 || last_discovery.elapsed() >= Duration::from_secs(60)
3845 {
3846 match self.list_accessible_channels().await {
3847 Ok(channels) => {
3848 if channels != discovered_channels {
3849 tracing::info!(
3850 "Slack auto-discovery refreshed: listening on {} channel(s).",
3851 channels.len()
3852 );
3853 }
3854 discovered_channels = channels;
3855 }
3856 Err(e) => {
3857 tracing::warn!("Slack channel discovery failed: {e}");
3858 }
3859 }
3860 last_discovery = Instant::now();
3861 }
3862
3863 discovered_channels.clone()
3864 };
3865
3866 if target_channels.is_empty() {
3867 tracing::debug!("Slack: no accessible channels discovered yet");
3868 continue;
3869 }
3870
3871 for channel_id in target_channels {
3872 let had_cursor = last_ts_by_channel.contains_key(&channel_id);
3873 let bootstrap_ts = Self::slack_now_ts();
3874 let cursor_ts =
3875 Self::ensure_poll_cursor(&mut last_ts_by_channel, &channel_id, &bootstrap_ts);
3876 if !had_cursor {
3877 tracing::debug!(
3878 "Slack: initialized cursor for channel {} at {} to prevent historical replay",
3879 channel_id,
3880 cursor_ts
3881 );
3882 }
3883 let params = vec![
3884 ("channel", channel_id.clone()),
3885 ("limit", "10".to_string()),
3886 ("oldest", cursor_ts),
3887 ];
3888
3889 let Some(data) = self.fetch_history_with_retry(&channel_id, ¶ms).await else {
3890 continue;
3891 };
3892
3893 if let Some(messages) = data.get("messages").and_then(|m| m.as_array()) {
3894 for (thread_ts, latest_reply) in Self::extract_active_threads(messages) {
3896 let entry = active_threads.entry(thread_ts.clone()).or_insert_with(|| {
3897 (channel_id.clone(), thread_ts.clone(), Instant::now())
3898 });
3899 if latest_reply > entry.1 {
3900 entry.1 = latest_reply;
3901 }
3902 entry.2 = Instant::now();
3903 }
3904
3905 for msg in messages.iter().rev() {
3907 let subtype = msg.get("subtype").and_then(|value| value.as_str());
3908 if !Self::is_supported_message_subtype(subtype) {
3909 continue;
3910 }
3911 let ts = msg.get("ts").and_then(|t| t.as_str()).unwrap_or("");
3912 let user = msg
3913 .get("user")
3914 .and_then(|u| u.as_str())
3915 .unwrap_or("unknown");
3916 let last_ts = last_ts_by_channel
3917 .get(&channel_id)
3918 .map(String::as_str)
3919 .unwrap_or("");
3920
3921 if user == bot_user_id {
3923 continue;
3924 }
3925
3926 if !self.is_user_allowed(user) {
3928 tracing::warn!(
3929 "Slack: ignoring message from unauthorized user: {user}"
3930 );
3931 continue;
3932 }
3933
3934 if ts <= last_ts {
3935 continue;
3936 }
3937
3938 let is_group_message = Self::is_group_channel_id(&channel_id);
3939 let is_thread_reply =
3940 msg.get("thread_ts").and_then(|v| v.as_str()).is_some();
3941 let allow_sender_without_mention =
3942 is_group_message && self.is_group_sender_trigger_enabled(user);
3943 let require_mention = self.mention_only
3944 && is_group_message
3945 && !allow_sender_without_mention
3946 && !is_thread_reply;
3947 let Some(normalized_text) = self
3948 .build_incoming_content(msg, require_mention, &bot_user_id)
3949 .await
3950 else {
3951 continue;
3952 };
3953
3954 let inbound_thread_ts = Self::inbound_thread_ts_genuine_only(msg);
3955 if self.try_intercept_approval(
3956 &channel_id,
3957 inbound_thread_ts.as_deref(),
3958 &normalized_text,
3959 user,
3960 ) {
3961 last_ts_by_channel.insert(channel_id.clone(), ts.to_string());
3962 continue;
3963 }
3964
3965 last_ts_by_channel.insert(channel_id.clone(), ts.to_string());
3966 let sender = self.resolve_sender_identity(user).await;
3967
3968 let channel_msg = ChannelMessage {
3969 id: format!("slack_{channel_id}_{ts}"),
3970 sender,
3971 reply_target: channel_id.clone(),
3972 content: normalized_text,
3973 channel: "slack".to_string(),
3974 timestamp: std::time::SystemTime::now()
3975 .duration_since(std::time::UNIX_EPOCH)
3976 .unwrap_or_default()
3977 .as_secs(),
3978 thread_ts: if self.thread_replies {
3979 Self::inbound_thread_ts(msg, ts)
3980 } else {
3981 Self::inbound_thread_ts_genuine_only(msg)
3982 },
3983 interruption_scope_id: Self::inbound_interruption_scope_id(msg, ts),
3984 attachments: vec![],
3985 };
3986
3987 if tx.send(channel_msg).await.is_err() {
3988 return Ok(());
3989 }
3990 }
3991 }
3992 }
3993
3994 Self::evict_stale_threads(&mut active_threads, Instant::now());
3996 let thread_snapshot: Vec<(String, String, String)> = active_threads
3997 .iter()
3998 .map(|(thread_ts, (ch, last_reply, _))| {
3999 (thread_ts.clone(), ch.clone(), last_reply.clone())
4000 })
4001 .collect();
4002
4003 for (thread_ts, thread_channel_id, last_reply_ts) in thread_snapshot {
4004 let Some(data) = self
4005 .fetch_thread_replies_with_retry(&thread_channel_id, &thread_ts, &last_reply_ts)
4006 .await
4007 else {
4008 continue;
4009 };
4010
4011 let Some(replies) = data.get("messages").and_then(|m| m.as_array()) else {
4012 continue;
4013 };
4014
4015 for reply in replies {
4016 let reply_ts = reply.get("ts").and_then(|v| v.as_str()).unwrap_or_default();
4017 if reply_ts.is_empty() || reply_ts <= last_reply_ts.as_str() {
4018 continue;
4019 }
4020 let subtype = reply.get("subtype").and_then(|v| v.as_str());
4021 if !Self::is_supported_message_subtype(subtype) {
4022 continue;
4023 }
4024
4025 let user = reply
4026 .get("user")
4027 .and_then(|u| u.as_str())
4028 .unwrap_or_default();
4029 if user.is_empty() || user == bot_user_id {
4030 continue;
4031 }
4032 if !self.is_user_allowed(user) {
4033 continue;
4034 }
4035
4036 let require_mention = false;
4039 let Some(normalized_text) = self
4040 .build_incoming_content(reply, require_mention, &bot_user_id)
4041 .await
4042 else {
4043 continue;
4044 };
4045
4046 if let Some(entry) = active_threads.get_mut(&thread_ts) {
4048 if reply_ts > entry.1.as_str() {
4049 entry.1 = reply_ts.to_string();
4050 }
4051 entry.2 = Instant::now();
4052 }
4053
4054 if self.try_intercept_approval(
4055 &thread_channel_id,
4056 Some(&thread_ts),
4057 &normalized_text,
4058 user,
4059 ) {
4060 continue;
4061 }
4062
4063 let sender = self.resolve_sender_identity(user).await;
4064
4065 let channel_msg = ChannelMessage {
4066 id: format!("slack_{thread_channel_id}_{reply_ts}"),
4067 sender,
4068 reply_target: thread_channel_id.clone(),
4069 content: normalized_text,
4070 channel: "slack".to_string(),
4071 timestamp: std::time::SystemTime::now()
4072 .duration_since(std::time::UNIX_EPOCH)
4073 .unwrap_or_default()
4074 .as_secs(),
4075 thread_ts: Some(thread_ts.clone()),
4076 interruption_scope_id: Some(thread_ts.clone()),
4077 attachments: vec![],
4078 };
4079
4080 if tx.send(channel_msg).await.is_err() {
4081 return Ok(());
4082 }
4083 }
4084 }
4085 }
4086 }
4087
4088 async fn health_check(&self) -> bool {
4089 let bot_ok = match self
4090 .http_client()
4091 .get("https://slack.com/api/auth.test")
4092 .bearer_auth(&self.bot_token)
4093 .send()
4094 .await
4095 {
4096 Ok(response) => {
4097 let status = response.status();
4098 let body = response.text().await.unwrap_or_default();
4099 Self::slack_api_call_succeeded(status, &body)
4100 }
4101 Err(_) => false,
4102 };
4103 let socket_mode_enabled = self.configured_app_token().is_some();
4104 let socket_mode_ok = if socket_mode_enabled {
4105 self.open_socket_mode_url().await.is_ok()
4106 } else {
4107 true
4108 };
4109 Self::evaluate_health(bot_ok, socket_mode_enabled, socket_mode_ok)
4110 }
4111
4112 async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> {
4113 let thread_ts = {
4114 let map = self
4115 .active_assistant_thread
4116 .lock()
4117 .map_err(|e| anyhow::anyhow!("lock poisoned: {e}"))?;
4118 match map.get(recipient) {
4119 Some(ts) => ts.clone(),
4120 None => return Ok(()),
4121 }
4122 };
4123
4124 let body = serde_json::json!({
4125 "channel_id": recipient,
4126 "thread_ts": thread_ts,
4127 "status": "is thinking...",
4128 });
4129
4130 if let Ok(resp) = self
4132 .http_client()
4133 .post("https://slack.com/api/assistant.threads.setStatus")
4134 .bearer_auth(&self.bot_token)
4135 .json(&body)
4136 .send()
4137 .await
4138 {
4139 if !resp.status().is_success() {
4140 tracing::debug!(
4141 "assistant.threads.setStatus returned {}; ignoring",
4142 resp.status()
4143 );
4144 }
4145 }
4146
4147 Ok(())
4148 }
4149
4150 async fn stop_typing(&self, recipient: &str) -> anyhow::Result<()> {
4151 if self.stream_drafts {
4155 self.set_assistant_status(recipient, "").await;
4156 }
4157 Ok(())
4158 }
4159}
4160
4161#[cfg(test)]
4162mod tests {
4163 use super::*;
4164
/// The channel reports the fixed name "slack".
#[test]
fn slack_channel_name() {
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), Vec::new());
    let reported = channel.name();
    assert_eq!(reported, "slack");
}
4170
/// A channel id passed to the constructor is stored verbatim.
#[test]
fn slack_channel_with_channel_id() {
    let configured = Some("C12345".into());
    let channel = SlackChannel::new("xoxb-fake".into(), None, configured, Vec::new(), Vec::new());
    assert_eq!(channel.channel_id.as_deref(), Some("C12345"));
}
4182
/// Defaults: threaded replies on, mention-only off, no sender overrides.
#[test]
fn slack_group_reply_policy_defaults_to_all_messages() {
    let allow_all = vec!["*".into()];
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), allow_all);
    assert!(channel.thread_replies);
    assert!(!channel.mention_only);
    assert!(channel.group_reply_allowed_sender_ids.is_empty());
}
4190
/// `with_thread_replies(false)` turns the flag off.
#[test]
fn with_thread_replies_sets_flag() {
    let base = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), Vec::new());
    let unthreaded = base.with_thread_replies(false);
    assert!(!unthreaded.thread_replies);
}
4197
/// The outbound thread timestamp is only used when threaded replies are on.
#[test]
fn outbound_thread_ts_respects_thread_replies_setting() {
    const THREAD: &str = "1741234567.100001";
    let outgoing = SendMessage::new("hello", "C123").in_thread(Some(THREAD.into()));

    let threaded = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), Vec::new());
    assert_eq!(threaded.outbound_thread_ts(&outgoing), Some(THREAD));

    let channel_root = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), Vec::new())
        .with_thread_replies(false);
    assert_eq!(channel_root.outbound_thread_ts(&outgoing), None);
}
4209
/// `with_workspace_dir` stores the given path.
#[test]
fn with_workspace_dir_sets_field() {
    let workspace = PathBuf::from("/tmp/slack-workspace");
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), Vec::new())
        .with_workspace_dir(workspace);
    let expected = std::path::Path::new("/tmp/slack-workspace");
    assert_eq!(channel.workspace_dir.as_deref(), Some(expected));
}
4219
/// Sender overrides are trimmed and de-duplicated, and gate the trigger check.
#[test]
fn slack_group_reply_policy_applies_sender_overrides() {
    let raw_senders = vec![" U111 ".into(), "U111".into(), "U222".into()];
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), vec!["*".into()])
        .with_group_reply_policy(true, raw_senders);

    assert!(channel.mention_only);
    assert_eq!(channel.group_reply_allowed_sender_ids, ["U111", "U222"]);
    assert!(channel.is_group_sender_trigger_enabled("U111"));
    assert!(!channel.is_group_sender_trigger_enabled("U999"));
}
4233
/// Missing, blank and wildcard ids normalize to `None`; real ids are trimmed.
#[test]
fn normalized_channel_id_respects_wildcard_and_blank() {
    let unscoped_inputs = [None, Some(""), Some(" "), Some("*"), Some(" * ")];
    for input in unscoped_inputs {
        assert_eq!(SlackChannel::normalized_channel_id(input), None);
    }

    let trimmed = SlackChannel::normalized_channel_id(Some(" C12345 "));
    assert_eq!(trimmed, Some("C12345".to_string()));
}
4246
/// A whitespace-only app token counts as "not configured".
#[test]
fn configured_app_token_ignores_blank_values() {
    let blank_token = Some(" ".into());
    let channel = SlackChannel::new("xoxb-fake".into(), blank_token, None, Vec::new(), Vec::new());
    assert_eq!(channel.configured_app_token(), None);
}
4252
/// Surrounding whitespace is stripped from the configured app token.
#[test]
fn configured_app_token_trims_value() {
    let padded_token = Some(" xapp-123 ".into());
    let channel = SlackChannel::new("xoxb-fake".into(), padded_token, None, Vec::new(), Vec::new());
    let token = channel.configured_app_token();
    assert_eq!(token.as_deref(), Some("xapp-123"));
}
4264
/// An explicit `channel_ids` list wins over the single `channel_id`.
#[test]
fn scoped_channel_ids_prefers_explicit_list() {
    let explicit_list = vec!["C_LIST1".into(), "D_DM1".into()];
    let channel = SlackChannel::new(
        "xoxb-fake".into(),
        None,
        Some("C_SINGLE".into()),
        explicit_list,
        Vec::new(),
    );
    let expected = Some(vec!["C_LIST1".to_string(), "D_DM1".to_string()]);
    assert_eq!(channel.scoped_channel_ids(), expected);
}
4279
/// With no explicit list, the single `channel_id` becomes the scope.
#[test]
fn scoped_channel_ids_falls_back_to_single_channel_id() {
    let single = Some("C_SINGLE".into());
    let channel = SlackChannel::new("xoxb-fake".into(), None, single, Vec::new(), Vec::new());
    let expected = Some(vec!["C_SINGLE".to_string()]);
    assert_eq!(channel.scoped_channel_ids(), expected);
}
4291
/// Without any configured channel there is no scope at all.
#[test]
fn scoped_channel_ids_returns_none_for_wildcard_mode() {
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), Vec::new());
    let scope = channel.scoped_channel_ids();
    assert_eq!(scope, None);
}
4297
/// `C…` and `G…` ids are group channels; `D…` and empty ids are not.
#[test]
fn is_group_channel_id_detects_channel_prefixes() {
    for group_id in ["C123", "G123"] {
        assert!(SlackChannel::is_group_channel_id(group_id));
    }
    for other_id in ["D123", ""] {
        assert!(!SlackChannel::is_group_channel_id(other_id));
    }
}
4305
/// Archived, non-member and duplicate entries are dropped; entries without
/// the flags are kept.
#[test]
fn extract_channel_ids_filters_archived_and_non_member_entries() {
    let listing = serde_json::json!({
        "channels": [
            {"id": "C1", "is_archived": false, "is_member": true},
            {"id": "C2", "is_archived": true, "is_member": true},
            {"id": "C3", "is_archived": false, "is_member": false},
            {"id": "C1", "is_archived": false, "is_member": true},
            {"id": "C4"}
        ]
    });
    let kept = SlackChannel::extract_channel_ids(&listing);
    assert_eq!(kept, ["C1", "C4"]);
}
4320
/// An empty allow-list rejects every user.
#[test]
fn empty_allowlist_denies_everyone() {
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), Vec::new());
    for candidate in ["U12345", "anyone"] {
        assert!(!channel.is_user_allowed(candidate));
    }
}
4327
/// A `*` entry in the allow-list admits any user.
#[test]
fn wildcard_allows_everyone() {
    let allow_all = vec!["*".into()];
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), allow_all);
    assert!(channel.is_user_allowed("U12345"));
}
4333
/// `profile.display_name` wins over `real_name` and the account name.
#[test]
fn extract_user_display_name_prefers_profile_display_name() {
    let user_info = serde_json::json!({
        "ok": true,
        "user": {
            "name": "fallback_name",
            "profile": {
                "display_name": "Display Name",
                "real_name": "Real Name"
            }
        }
    });

    let resolved = SlackChannel::extract_user_display_name(&user_info);
    assert_eq!(resolved.as_deref(), Some("Display Name"));
}
4352
/// Blank profile names fall through to the account `name`.
#[test]
fn extract_user_display_name_falls_back_to_username() {
    let user_info = serde_json::json!({
        "ok": true,
        "user": {
            "name": "fallback_name",
            "profile": {
                "display_name": " ",
                "real_name": ""
            }
        }
    });

    let resolved = SlackChannel::extract_user_display_name(&user_info);
    assert_eq!(resolved.as_deref(), Some("fallback_name"));
}
4371
/// A cache entry whose expiry lies in the past is not returned.
#[test]
fn cached_sender_display_name_returns_none_when_expired() {
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), vec!["*".into()]);

    let already_expired = Instant::now()
        .checked_sub(Duration::from_secs(1))
        .expect("instant should allow subtracting one second in tests");
    let stale_entry = CachedSlackDisplayName {
        display_name: "Expired Name".to_string(),
        expires_at: already_expired,
    };
    // The guard is a temporary and is released at the end of this statement.
    channel
        .user_display_name_cache
        .lock()
        .unwrap()
        .insert("U123".to_string(), stale_entry);

    assert_eq!(channel.cached_sender_display_name("U123"), None);
}
4390
/// A freshly cached display name can be read back.
#[test]
fn cached_sender_display_name_returns_cached_value_when_valid() {
    let channel = SlackChannel::new("xoxb-fake".into(), None, None, Vec::new(), vec!["*".into()]);
    channel.cache_sender_display_name("U123", "Cached Name");

    let cached = channel.cached_sender_display_name("U123");
    assert_eq!(cached.as_deref(), Some("Cached Name"));
}
4401
/// With mention mode on, unmentioned text is dropped and the mention stripped.
#[test]
fn normalize_incoming_content_requires_mention_when_enabled() {
    let without_mention = SlackChannel::normalize_incoming_content("hello", true, "U_BOT");
    assert!(without_mention.is_none());

    let with_mention = SlackChannel::normalize_incoming_content("<@U_BOT> run", true, "U_BOT");
    assert_eq!(with_mention.as_deref(), Some("run"));
}
4410
/// With mention mode off, the text is kept and only trimmed.
#[test]
fn normalize_incoming_content_without_mention_mode_keeps_message() {
    let normalized = SlackChannel::normalize_incoming_content(" hello world ", false, "U_BOT");
    assert_eq!(normalized.as_deref(), Some("hello world"));
}
4418
/// A message with no text but one attachment still yields content.
#[test]
fn compose_incoming_content_allows_attachment_only_messages() {
    let attachments = vec!["[IMAGE:data:image/png;base64,aaaa]".to_string()];
    let composed = SlackChannel::compose_incoming_content(String::new(), attachments);
    assert_eq!(composed.as_deref(), Some("[IMAGE:data:image/png;base64,aaaa]"));
}
4430
/// A plain `/archives/<channel>/p<digits>` link yields channel and timestamp.
#[test]
fn parse_slack_permalink_accepts_standard_archives_link() {
    let link = "https://acme.slack.com/archives/C12345678/p1712345678901234";
    let permalink = SlackChannel::parse_slack_permalink(link).expect("permalink");

    assert_eq!(permalink.channel_id, "C12345678");
    assert_eq!(permalink.message_ts, "1712345678.901234");
    assert_eq!(permalink.thread_ts_hint, None);
}
4442
/// A `thread_ts` query parameter is surfaced as the thread hint.
#[test]
fn parse_slack_permalink_reads_thread_hint_when_present() {
    let link = "https://acme.slack.com/archives/C12345678/p1712345678901234?thread_ts=1712345600.000100&cid=C12345678";
    let permalink = SlackChannel::parse_slack_permalink(link).expect("permalink");

    assert_eq!(permalink.thread_ts_hint.as_deref(), Some("1712345600.000100"));
}
4452
/// Links that are not Slack message permalinks are rejected.
#[test]
fn parse_slack_permalink_rejects_non_message_links() {
    let not_permalinks = [
        "https://example.com/path",
        "https://acme.slack.com/client/T1/C1",
        "https://acme.slack.com/archives/C1/not-a-message",
    ];
    for link in not_permalinks {
        assert!(SlackChannel::parse_slack_permalink(link).is_none());
    }
}
4464
/// Permalinks wrapped as `<url|label>` are found in running text.
#[test]
fn extract_slack_permalinks_handles_slack_angle_bracket_format() {
    let text =
        "Please inspect <https://acme.slack.com/archives/C123/p1712345678901234|message> now";
    let found = SlackChannel::extract_slack_permalinks(text);

    assert_eq!(found.len(), 1);
    let only = &found[0];
    assert_eq!(only.channel_id, "C123");
    assert_eq!(only.message_ts, "1712345678.901234");
}
4475
/// The same message linked twice is reported once.
#[test]
fn extract_slack_permalinks_deduplicates_message_targets() {
    let text = "https://acme.slack.com/archives/C123/p1712345678901234 again <https://acme.slack.com/archives/C123/p1712345678901234|same>";
    let found = SlackChannel::extract_slack_permalinks(text);

    assert_eq!(found.len(), 1);
}
4484
/// Plain messages, file shares and thread broadcasts are supported; edits and
/// join notices are not.
#[test]
fn message_subtype_support_allows_file_share() {
    let supported = [None, Some("file_share"), Some("thread_broadcast")];
    for subtype in supported {
        assert!(SlackChannel::is_supported_message_subtype(subtype));
    }

    let unsupported = [Some("message_changed"), Some("channel_join")];
    for subtype in unsupported {
        assert!(!SlackChannel::is_supported_message_subtype(subtype));
    }
}
4501
/// `preview` is used even when `preview_highlight` is also present.
#[test]
fn file_text_preview_prefers_preview_field() {
    let file_meta = serde_json::json!({
        "preview": "line 1\nline 2",
        "preview_highlight": "ignored"
    });
    let preview = SlackChannel::file_text_preview(&file_meta);
    assert_eq!(preview.as_deref(), Some("line 1\nline 2"));
}
4513
/// Either an image mimetype or an image file extension marks a file as image.
#[test]
fn is_image_file_detects_mimetype_or_extension() {
    let by_mimetype = serde_json::json!({"mimetype":"image/png"});
    let by_extension = serde_json::json!({"name":"photo.jpeg"});
    let plain_text = serde_json::json!({"name":"notes.txt","mimetype":"text/plain"});

    assert!(SlackChannel::is_image_file(&by_mimetype));
    assert!(SlackChannel::is_image_file(&by_extension));
    assert!(!SlackChannel::is_image_file(&plain_text));
}
4523
/// HTML bytes are not accepted as an image even if the metadata says PNG.
#[test]
fn detect_image_mime_rejects_non_image_bytes_despite_image_metadata() {
    let file_meta = serde_json::json!({"mimetype":"image/png","name":"wow.png"});
    let html_bytes = b"<!DOCTYPE html><html><body>login required</body></html>";

    let detected = SlackChannel::detect_image_mime(
        Some("image/png"),
        &file_meta,
        html_bytes,
        "https://files.slack.com/files-pri/T1/F2/wow.png",
    );
    assert_eq!(detected, None);
}
4538
/// A PNG signature in the bytes overrides BMP metadata.
#[test]
fn detect_image_mime_prefers_magic_bytes_over_misleading_metadata() {
    let file_meta = serde_json::json!({"mimetype":"image/bmp","name":"wow.png"});
    let png_signature = [0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n'];

    let detected = SlackChannel::detect_image_mime(
        Some("image/bmp"),
        &file_meta,
        &png_signature,
        "https://files.slack.com/files-pri/T1/F2/wow.png",
    );
    assert_eq!(detected.as_deref(), Some("image/png"));
}
4554
/// Snippets and `text/plain` files count as text; octet-stream blobs do not.
#[test]
fn is_probably_text_file_accepts_snippet_mode() {
    let snippet_file = serde_json::json!({"mode":"snippet"});
    let plain_file = serde_json::json!({"mimetype":"text/plain"});
    let binary_file = serde_json::json!({"mimetype":"application/octet-stream","name":"a.bin"});

    assert!(SlackChannel::is_probably_text_file(&snippet_file));
    assert!(SlackChannel::is_probably_text_file(&plain_file));
    assert!(!SlackChannel::is_probably_text_file(&binary_file));
}
4564
/// Directory components are removed, backslashes are neutralized, and a bare
/// `..` is rejected outright.
#[test]
fn sanitize_attachment_filename_strips_path_traversal() {
    let unix_style = SlackChannel::sanitize_attachment_filename("../../secret.txt");
    assert_eq!(unix_style.as_deref(), Some("secret.txt"));

    let windows_style = SlackChannel::sanitize_attachment_filename(r"..\\..\\secret.txt");
    assert_eq!(windows_style.as_deref(), Some("..__..__secret.txt"));

    assert!(SlackChannel::sanitize_attachment_filename("..").is_none());
}
4577
/// An extension is appended only when the name has none.
#[test]
fn ensure_file_extension_appends_when_missing() {
    let bare = SlackChannel::ensure_file_extension("capture", "png");
    assert_eq!(bare, "capture.png");

    let already_has_one = SlackChannel::ensure_file_extension("capture.jpeg", "png");
    assert_eq!(already_has_one, "capture.jpeg");
}
4589
/// Slack media hosts are accepted; an unrelated host is not.
#[test]
fn is_allowed_slack_media_hostname_matches_suffixes() {
    let slack_hosts = [
        "files.slack.com",
        "downloads.slack-edge.com",
        "foo.slack-files.com",
    ];
    for host in slack_hosts {
        assert!(SlackChannel::is_allowed_slack_media_hostname(host));
    }
    assert!(!SlackChannel::is_allowed_slack_media_hostname("example.com"));
}
4605
/// Only HTTPS URLs on a Slack media host validate.
#[test]
fn validate_slack_private_file_url_rejects_invalid_schemes_and_hosts() {
    let accepted = SlackChannel::validate_slack_private_file_url("https://files.slack.com/f");
    assert!(accepted.is_some());

    let rejected_inputs = [
        "http://files.slack.com/f",
        "https://example.com/f",
        "not a url",
    ];
    for input in rejected_inputs {
        assert!(SlackChannel::validate_slack_private_file_url(input).is_none());
    }
}
4617
4618 #[test]
4619 fn resolve_https_redirect_target_enforces_https() {
4620 let base = reqwest::Url::parse("https://files.slack.com/path/file").unwrap();
4621 let ok = SlackChannel::resolve_https_redirect_target(&base, "/next");
4622 assert_eq!(
4623 ok.as_ref().map(|url| url.as_str()),
4624 Some("https://files.slack.com/next")
4625 );
4626
4627 let rejected =
4628 SlackChannel::resolve_https_redirect_target(&base, "http://files.slack.com/next");
4629 assert!(rejected.is_none());
4630
4631 let rejected_host =
4632 SlackChannel::resolve_https_redirect_target(&base, "https://example.com/next");
4633 assert!(rejected_host.is_none());
4634 }
4635
4636 #[test]
4637 fn redact_slack_url_hides_query_fragments() {
4638 let url = reqwest::Url::parse(
4639 "https://files.slack.com/files-pri/T1/F2/wow.png?token=secret#fragment",
4640 )
4641 .unwrap();
4642 let redacted = SlackChannel::redact_slack_url(&url);
4643 assert_eq!(redacted, "files.slack.com/.../wow.png");
4644 assert!(!redacted.contains('?'));
4645 assert!(!redacted.contains("token="));
4646 assert!(!redacted.contains('#'));
4647 }
4648
4649 #[test]
4650 fn redact_redirect_location_keeps_only_relative_tail() {
4651 let redacted =
4652 SlackChannel::redact_redirect_location("/files-pri/T1/F2/wow.png?token=secret");
4653 assert_eq!(redacted, "relative/.../wow.png");
4654 assert!(!redacted.contains("token="));
4655 }
4656
4657 #[tokio::test]
4658 async fn resolve_workspace_attachment_output_path_stays_in_workspace() {
4659 let workspace = tempfile::tempdir().unwrap();
4660 let output =
4661 SlackChannel::resolve_workspace_attachment_output_path(workspace.path(), "capture.png")
4662 .await
4663 .unwrap();
4664
4665 let root = tokio::fs::canonicalize(workspace.path()).await.unwrap();
4666 assert!(output.starts_with(&root));
4667 assert!(output.to_string_lossy().contains("slack_files"));
4668 }
4669
4670 #[tokio::test]
4671 async fn persist_image_attachment_writes_bytes_without_part_leftovers() {
4672 let workspace = tempfile::tempdir().unwrap();
4673 let channel = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![])
4674 .with_workspace_dir(workspace.path().to_path_buf());
4675 let file = serde_json::json!({"id":"F1","name":"wow.png"});
4676 let png_bytes = vec![
4677 0x89, b'P', b'N', b'G', b'\r', b'\n', 0x1a, b'\n', 0x00, 0x01, 0x02, 0x03,
4678 ];
4679
4680 let output = channel
4681 .persist_image_attachment(&file, "wow.png", "image/png", &png_bytes)
4682 .await
4683 .expect("attachment path");
4684 let stored = tokio::fs::read(&output).await.expect("stored bytes");
4685 assert_eq!(stored, png_bytes);
4686
4687 let save_dir = output.parent().unwrap();
4688 let mut entries = tokio::fs::read_dir(save_dir).await.unwrap();
4689 while let Some(entry) = entries.next_entry().await.unwrap() {
4690 let name = entry.file_name().to_string_lossy().to_string();
4691 assert!(
4692 !name.ends_with(".part"),
4693 "unexpected temp artifact left behind: {name}"
4694 );
4695 }
4696 }
4697
4698 #[test]
4699 fn evaluate_health_enforces_socket_mode_probe_when_enabled() {
4700 assert!(!SlackChannel::evaluate_health(false, false, true));
4701 assert!(!SlackChannel::evaluate_health(false, true, true));
4702 assert!(SlackChannel::evaluate_health(true, false, false));
4703 assert!(SlackChannel::evaluate_health(true, false, true));
4704 assert!(!SlackChannel::evaluate_health(true, true, false));
4705 assert!(SlackChannel::evaluate_health(true, true, true));
4706 }
4707
4708 #[test]
4709 fn slack_api_call_succeeded_requires_ok_true_in_body() {
4710 assert!(!SlackChannel::slack_api_call_succeeded(
4711 reqwest::StatusCode::OK,
4712 r#"{"ok":false,"error":"invalid_auth"}"#
4713 ));
4714 }
4715
4716 #[test]
4717 fn slack_api_call_succeeded_accepts_ok_true() {
4718 assert!(SlackChannel::slack_api_call_succeeded(
4719 reqwest::StatusCode::OK,
4720 r#"{"ok":true}"#
4721 ));
4722 }
4723
4724 #[test]
4725 fn specific_allowlist_filters() {
4726 let ch = SlackChannel::new(
4727 "xoxb-fake".into(),
4728 None,
4729 None,
4730 vec![],
4731 vec!["U111".into(), "U222".into()],
4732 );
4733 assert!(ch.is_user_allowed("U111"));
4734 assert!(ch.is_user_allowed("U222"));
4735 assert!(!ch.is_user_allowed("U333"));
4736 }
4737
4738 #[test]
4739 fn allowlist_exact_match_not_substring() {
4740 let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec!["U111".into()]);
4741 assert!(!ch.is_user_allowed("U1111"));
4742 assert!(!ch.is_user_allowed("U11"));
4743 }
4744
4745 #[test]
4746 fn allowlist_empty_user_id() {
4747 let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec!["U111".into()]);
4748 assert!(!ch.is_user_allowed(""));
4749 }
4750
4751 #[test]
4752 fn allowlist_case_sensitive() {
4753 let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec!["U111".into()]);
4754 assert!(ch.is_user_allowed("U111"));
4755 assert!(!ch.is_user_allowed("u111"));
4756 }
4757
4758 #[test]
4759 fn allowlist_wildcard_and_specific() {
4760 let ch = SlackChannel::new(
4761 "xoxb-fake".into(),
4762 None,
4763 None,
4764 vec![],
4765 vec!["U111".into(), "*".into()],
4766 );
4767 assert!(ch.is_user_allowed("U111"));
4768 assert!(ch.is_user_allowed("anyone"));
4769 }
4770
4771 #[test]
4774 fn slack_message_id_format_includes_channel_and_ts() {
4775 let ts = "1234567890.123456";
4777 let channel_id = "C12345";
4778 let expected_id = format!("slack_{channel_id}_{ts}");
4779 assert_eq!(expected_id, "slack_C12345_1234567890.123456");
4780 }
4781
4782 #[test]
4783 fn slack_message_id_is_deterministic() {
4784 let ts = "1234567890.123456";
4786 let channel_id = "C12345";
4787 let id1 = format!("slack_{channel_id}_{ts}");
4788 let id2 = format!("slack_{channel_id}_{ts}");
4789 assert_eq!(id1, id2);
4790 }
4791
4792 #[test]
4793 fn slack_message_id_different_ts_different_id() {
4794 let channel_id = "C12345";
4796 let id1 = format!("slack_{channel_id}_1234567890.123456");
4797 let id2 = format!("slack_{channel_id}_1234567890.123457");
4798 assert_ne!(id1, id2);
4799 }
4800
4801 #[test]
4802 fn slack_message_id_different_channel_different_id() {
4803 let ts = "1234567890.123456";
4805 let id1 = format!("slack_C12345_{ts}");
4806 let id2 = format!("slack_C67890_{ts}");
4807 assert_ne!(id1, id2);
4808 }
4809
4810 #[test]
4811 fn slack_message_id_no_uuid_randomness() {
4812 let ts = "1234567890.123456";
4814 let channel_id = "C12345";
4815 let id = format!("slack_{channel_id}_{ts}");
4816 assert!(!id.contains('-')); assert!(id.starts_with("slack_"));
4818 }
4819
4820 #[test]
4821 fn inbound_thread_ts_prefers_explicit_thread_ts() {
4822 let msg = serde_json::json!({
4823 "ts": "123.002",
4824 "thread_ts": "123.001"
4825 });
4826
4827 let thread_ts = SlackChannel::inbound_thread_ts(&msg, "123.002");
4828 assert_eq!(thread_ts.as_deref(), Some("123.001"));
4829 }
4830
4831 #[test]
4832 fn inbound_thread_ts_falls_back_to_ts() {
4833 let msg = serde_json::json!({
4834 "ts": "123.001"
4835 });
4836
4837 let thread_ts = SlackChannel::inbound_thread_ts(&msg, "123.001");
4838 assert_eq!(thread_ts.as_deref(), Some("123.001"));
4839 }
4840
4841 #[test]
4842 fn inbound_thread_ts_none_when_ts_missing() {
4843 let msg = serde_json::json!({});
4844
4845 let thread_ts = SlackChannel::inbound_thread_ts(&msg, "");
4846 assert_eq!(thread_ts, None);
4847 }
4848
4849 #[test]
4850 fn ensure_poll_cursor_bootstraps_new_channel() {
4851 let mut cursors = HashMap::new();
4852 let now_ts = "1700000000.123456";
4853
4854 let cursor = SlackChannel::ensure_poll_cursor(&mut cursors, "C123", now_ts);
4855 assert_eq!(cursor, now_ts);
4856 assert_eq!(cursors.get("C123").map(String::as_str), Some(now_ts));
4857 }
4858
4859 #[test]
4860 fn ensure_poll_cursor_keeps_existing_cursor() {
4861 let mut cursors = HashMap::from([("C123".to_string(), "1700000000.000001".to_string())]);
4862 let cursor = SlackChannel::ensure_poll_cursor(&mut cursors, "C123", "9999999999.999999");
4863
4864 assert_eq!(cursor, "1700000000.000001");
4865 assert_eq!(
4866 cursors.get("C123").map(String::as_str),
4867 Some("1700000000.000001")
4868 );
4869 }
4870
4871 #[test]
4872 fn parse_retry_after_value_accepts_integer_seconds() {
4873 assert_eq!(SlackChannel::parse_retry_after_value("30"), Some(30));
4874 }
4875
4876 #[test]
4877 fn parse_retry_after_value_accepts_decimal_seconds() {
4878 assert_eq!(SlackChannel::parse_retry_after_value("2.9"), Some(2));
4879 }
4880
4881 #[test]
4882 fn parse_retry_after_value_rejects_non_numeric_values() {
4883 assert_eq!(SlackChannel::parse_retry_after_value("later"), None);
4884 assert_eq!(SlackChannel::parse_retry_after_value(""), None);
4885 }
4886
4887 #[test]
4888 fn parse_retry_after_secs_reads_header_value() {
4889 let mut headers = HeaderMap::new();
4890 headers.insert(reqwest::header::RETRY_AFTER, "45".parse().unwrap());
4891 assert_eq!(SlackChannel::parse_retry_after_secs(&headers), Some(45));
4892 }
4893
4894 #[test]
4895 fn compute_retry_delay_applies_backoff_and_jitter_with_cap() {
4896 let delay = SlackChannel::compute_retry_delay(30, 3, 250);
4897 assert_eq!(delay, Duration::from_secs(120) + Duration::from_millis(250));
4898 }
4899
4900 #[test]
4903 fn extract_active_threads_finds_thread_parents_with_replies() {
4904 let messages = vec![
4905 serde_json::json!({
4906 "ts": "100.000",
4907 "thread_ts": "100.000",
4908 "reply_count": 3,
4909 "latest_reply": "103.000"
4910 }),
4911 serde_json::json!({
4912 "ts": "200.000",
4913 "text": "no thread"
4914 }),
4915 serde_json::json!({
4916 "ts": "300.000",
4917 "thread_ts": "300.000",
4918 "reply_count": 0
4919 }),
4920 ];
4921
4922 let threads = SlackChannel::extract_active_threads(&messages);
4923 assert_eq!(threads.len(), 1);
4924 assert_eq!(threads[0].0, "100.000");
4925 assert_eq!(threads[0].1, "103.000");
4926 }
4927
4928 #[test]
4929 fn extract_active_threads_ignores_reply_messages() {
4930 let messages = vec![serde_json::json!({
4932 "ts": "101.000",
4933 "thread_ts": "100.000",
4934 "text": "reply in thread"
4935 })];
4936
4937 let threads = SlackChannel::extract_active_threads(&messages);
4938 assert!(threads.is_empty());
4939 }
4940
4941 #[test]
4942 fn extract_active_threads_uses_thread_ts_as_fallback_latest_reply() {
4943 let messages = vec![serde_json::json!({
4944 "ts": "100.000",
4945 "thread_ts": "100.000",
4946 "reply_count": 1
4947 })];
4948
4949 let threads = SlackChannel::extract_active_threads(&messages);
4950 assert_eq!(threads.len(), 1);
4951 assert_eq!(threads[0].1, "100.000");
4952 }
4953
4954 #[test]
4955 fn evict_stale_threads_removes_expired_entries() {
4956 let mut threads: HashMap<String, (String, String, Instant)> = HashMap::new();
4957 let old = Instant::now()
4958 .checked_sub(Duration::from_secs(SLACK_POLL_THREAD_EXPIRE_SECS + 1))
4959 .unwrap();
4960 threads.insert(
4961 "old.thread".to_string(),
4962 ("C1".to_string(), "old.reply".to_string(), old),
4963 );
4964 threads.insert(
4965 "new.thread".to_string(),
4966 ("C1".to_string(), "new.reply".to_string(), Instant::now()),
4967 );
4968
4969 SlackChannel::evict_stale_threads(&mut threads, Instant::now());
4970 assert_eq!(threads.len(), 1);
4971 assert!(threads.contains_key("new.thread"));
4972 }
4973
4974 #[test]
4975 fn evict_stale_threads_trims_excess_by_oldest_key() {
4976 let mut threads: HashMap<String, (String, String, Instant)> = HashMap::new();
4977 let now = Instant::now();
4978 for i in 0..(SLACK_POLL_ACTIVE_THREAD_MAX + 5) {
4979 threads.insert(
4980 format!("{i:06}.000"),
4981 ("C1".to_string(), format!("{i:06}.001"), now),
4982 );
4983 }
4984
4985 SlackChannel::evict_stale_threads(&mut threads, now);
4986 assert_eq!(threads.len(), SLACK_POLL_ACTIVE_THREAD_MAX);
4987 }
4988
4989 #[test]
4990 fn is_supported_message_subtype_rejects_message_replied() {
4991 assert!(!SlackChannel::is_supported_message_subtype(Some(
4993 "message_replied"
4994 )));
4995 }
4996
4997 #[test]
4998 fn extract_slack_ts_from_standard_message_id() {
4999 assert_eq!(
5000 extract_slack_ts("slack_C1234567890_1234567890.123456"),
5001 "1234567890.123456"
5002 );
5003 }
5004
5005 #[test]
5006 fn extract_slack_ts_from_raw_ts_passthrough() {
5007 assert_eq!(extract_slack_ts("1234567890.123456"), "1234567890.123456");
5008 }
5009
5010 #[test]
5011 fn extract_slack_ts_from_unprefixed_id() {
5012 assert_eq!(extract_slack_ts("unknown_format"), "unknown_format");
5013 }
5014
5015 #[test]
5016 fn unicode_emoji_maps_to_slack_eyes() {
5017 assert_eq!(unicode_emoji_to_slack_name("\u{1F440}"), "eyes");
5018 }
5019
5020 #[test]
5021 fn unicode_emoji_maps_to_slack_check_mark() {
5022 assert_eq!(unicode_emoji_to_slack_name("\u{2705}"), "white_check_mark");
5023 }
5024
5025 #[test]
5026 fn unicode_emoji_maps_to_slack_warning() {
5027 assert_eq!(unicode_emoji_to_slack_name("\u{26A0}\u{FE0F}"), "warning");
5028 assert_eq!(unicode_emoji_to_slack_name("\u{26A0}"), "warning");
5029 }
5030
5031 #[test]
5032 fn unicode_emoji_colon_wrapped_passthrough() {
5033 assert_eq!(
5034 unicode_emoji_to_slack_name(":custom_emoji:"),
5035 "custom_emoji"
5036 );
5037 }
5038
5039 #[test]
5040 fn inbound_thread_ts_on_thread_reply_uses_thread_ts() {
5041 let reply = serde_json::json!({
5042 "ts": "200.000",
5043 "thread_ts": "100.000",
5044 "text": "a thread reply"
5045 });
5046 let thread_ts = SlackChannel::inbound_thread_ts(&reply, "200.000");
5047 assert_eq!(thread_ts.as_deref(), Some("100.000"));
5048 }
5049
5050 #[test]
5051 fn inbound_thread_ts_genuine_only_returns_none_for_top_level() {
5052 let msg = serde_json::json!({
5054 "ts": "100.000",
5055 "text": "hello"
5056 });
5057 assert_eq!(SlackChannel::inbound_thread_ts_genuine_only(&msg), None);
5058 }
5059
5060 #[test]
5061 fn inbound_thread_ts_genuine_only_returns_thread_ts_for_replies() {
5062 let reply = serde_json::json!({
5064 "ts": "200.000",
5065 "thread_ts": "100.000",
5066 "text": "a reply"
5067 });
5068 assert_eq!(
5069 SlackChannel::inbound_thread_ts_genuine_only(&reply).as_deref(),
5070 Some("100.000")
5071 );
5072 }
5073
5074 #[test]
5075 fn session_key_stable_without_thread_replies() {
5076 use crate::channels::traits::ChannelMessage;
5079
5080 let make_msg = |ts: &str| ChannelMessage {
5081 id: format!("slack_C123_{ts}"),
5082 sender: "U_alice".into(),
5083 reply_target: "C123".into(),
5084 content: "text".into(),
5085 channel: "slack".into(),
5086 timestamp: 0,
5087 thread_ts: None, interruption_scope_id: None,
5089 attachments: vec![],
5090 };
5091
5092 let msg1 = make_msg("100.000");
5093 let msg2 = make_msg("200.000");
5094
5095 let key1 = super::super::conversation_history_key(&msg1);
5096 let key2 = super::super::conversation_history_key(&msg2);
5097 assert_eq!(key1, key2, "session key should be stable across messages");
5098 }
5099
5100 #[test]
5101 fn session_key_varies_with_thread_replies() {
5102 use crate::channels::traits::ChannelMessage;
5105
5106 let make_msg = |ts: &str| ChannelMessage {
5107 id: format!("slack_C123_{ts}"),
5108 sender: "U_alice".into(),
5109 reply_target: "C123".into(),
5110 content: "text".into(),
5111 channel: "slack".into(),
5112 timestamp: 0,
5113 thread_ts: Some(ts.to_string()), interruption_scope_id: None,
5115 attachments: vec![],
5116 };
5117
5118 let msg1 = make_msg("100.000");
5119 let msg2 = make_msg("200.000");
5120
5121 let key1 = super::super::conversation_history_key(&msg1);
5122 let key2 = super::super::conversation_history_key(&msg2);
5123 assert_ne!(key1, key2, "session key should differ per thread");
5124 }
5125
5126 #[test]
5127 fn slack_send_uses_markdown_blocks() {
5128 let msg = SendMessage::new("**bold** and _italic_", "C123");
5129 let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![]);
5130
5131 let mut body = serde_json::json!({
5133 "channel": msg.recipient,
5134 "text": msg.content
5135 });
5136 if msg.content.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
5137 body["blocks"] = serde_json::json!([{
5138 "type": "markdown",
5139 "text": msg.content
5140 }]);
5141 }
5142
5143 let blocks = body["blocks"]
5145 .as_array()
5146 .expect("blocks should be an array");
5147 assert_eq!(blocks.len(), 1);
5148 assert_eq!(blocks[0]["type"], "markdown");
5149 assert_eq!(blocks[0]["text"], msg.content);
5150 assert_eq!(body["text"], msg.content);
5152 let _ = ch.name();
5154 }
5155
5156 #[test]
5157 fn slack_send_skips_markdown_blocks_for_long_content() {
5158 let long_content = "x".repeat(SLACK_MARKDOWN_BLOCK_MAX_CHARS + 1);
5159 let msg = SendMessage::new(long_content.clone(), "C123");
5160
5161 let mut body = serde_json::json!({
5162 "channel": msg.recipient,
5163 "text": msg.content
5164 });
5165 if msg.content.len() <= SLACK_MARKDOWN_BLOCK_MAX_CHARS {
5166 body["blocks"] = serde_json::json!([{
5167 "type": "markdown",
5168 "text": msg.content
5169 }]);
5170 }
5171
5172 assert!(
5173 body.get("blocks").is_none(),
5174 "blocks should not be set for oversized content"
5175 );
5176 }
5177
5178 #[tokio::test]
5179 async fn start_typing_requires_thread_context() {
5180 let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![]);
5181 let result = ch.start_typing("C999").await;
5183 assert!(
5184 result.is_ok(),
5185 "start_typing should succeed as no-op without thread context"
5186 );
5187 }
5188
5189 #[test]
5190 fn assistant_thread_tracking() {
5191 let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![]);
5192
5193 {
5195 let map = ch.active_assistant_thread.lock().unwrap();
5196 assert!(map.is_empty());
5197 }
5198
5199 {
5201 let mut map = ch.active_assistant_thread.lock().unwrap();
5202 map.insert("C123".to_string(), "1741234567.000100".to_string());
5203 }
5204
5205 {
5207 let map = ch.active_assistant_thread.lock().unwrap();
5208 assert_eq!(map.get("C123"), Some(&"1741234567.000100".to_string()),);
5209 assert_eq!(map.get("C999"), None);
5210 }
5211 }
5212}