Skip to main content

hermes_bot/slack/
mod.rs

1mod api;
2mod formatting;
3pub(crate) mod handlers;
4
5use crate::agent::{Agent, AgentHandle};
6use crate::config::{AgentKind, Config};
7use slack_morphism::prelude::*;
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use tokio::sync::{oneshot, Mutex};
11use tokio::time::Instant;
12use tracing::{info, warn};
13
14// Re-export public API.
15pub use api::{ensure_repo_channels, post_channel_message, post_thread_reply};
16pub use formatting::split_for_slack;
17pub use handlers::handle_slash_command;
18
/// Approximate upper bound, in characters, on the text of a single
/// `chat.postMessage` call.
///
/// This is the built-in default (39 000); per the original note it can be
/// overridden through `tuning.slack_max_message_chars`.
pub(crate) const SLACK_MAX_MESSAGE_CHARS: usize = 39_000;
/// Upper bound, in characters, on the text sent to `chat.update`.
///
/// Slack's API enforces roughly 4 000 characters for updates, so this limit
/// is fixed and deliberately not exposed as a configuration option.
pub(crate) const SLACK_UPDATE_MAX_CHARS: usize = 3_900;
/// Maximum length of the fallback title, which is truncated at a word boundary.
const FALLBACK_TITLE_MAX_LEN: usize = 80;
27
28/// Shared application state passed to all handlers.
29#[derive(Clone)]
30pub struct AppState {
31    pub config: Arc<Config>,
32    pub sessions: crate::session::SessionStore,
33    pub agents: Arc<HashMap<AgentKind, Arc<dyn Agent>>>,
34    pub bot_token: SlackApiToken,
35    pub slack_client: Arc<SlackHyperClient>,
36    /// Repo name → channel ID mapping (populated on startup).
37    pub repo_channels: Arc<tokio::sync::RwLock<HashMap<String, String>>>,
38    /// Bot's own user ID (to filter self-messages).
39    pub bot_user_id: Arc<String>,
40    /// Concurrency guard: set of thread_ts currently being processed.
41    pub in_progress: Arc<Mutex<HashSet<String>>>,
42    /// Repos with in-flight new message processing (prevents sync from duplicating threads).
43    /// Tracks repo name → timestamp when marked pending for cleanup.
44    pub pending_repos: Arc<Mutex<HashMap<String, Instant>>>,
45    /// Session IDs currently being claimed (not yet persisted to SessionStore).
46    /// Prevents race between sync and handle_new_message where both try to claim the same session.
47    pub pending_session_ids: Arc<Mutex<HashSet<String>>>,
48    /// Running agent processes: thread_ts → AgentHandle.
49    pub agent_handles: Arc<Mutex<HashMap<String, AgentHandle>>>,
50    /// Kill senders: thread_ts → oneshot kill signal.
51    /// Stored separately so `!stop` can always reach the kill signal
52    /// even when the handle is borrowed by a running task.
53    pub kill_senders: Arc<Mutex<HashMap<String, oneshot::Sender<()>>>>,
54    /// Last detected plan content per thread: thread_ts → plan text.
55    pub last_plan: Arc<Mutex<HashMap<String, String>>>,
56    /// Pending question answers: thread_ts → oneshot sender for the user's reply.
57    pub pending_answers: Arc<Mutex<HashMap<String, oneshot::Sender<String>>>>,
58    /// Pending tool approvals: thread_ts → oneshot sender for the user's allow/deny.
59    pub pending_tool_approvals: Arc<Mutex<HashMap<String, oneshot::Sender<bool>>>>,
60    /// Per-channel rate limiter: channel_id → last write time.
61    pub rate_limiter: Arc<Mutex<HashMap<String, Instant>>>,
62    /// Per-thread model overrides from `/model` command: thread_ts → model ID.
63    pub thread_models: Arc<Mutex<HashMap<String, String>>>,
64}
65
66impl AppState {
67    pub fn is_allowed_user(&self, user_id: &str) -> bool {
68        self.config
69            .slack
70            .allowed_users
71            .contains(&user_id.to_string())
72    }
73
74    /// Look up which repo a channel belongs to.
75    pub async fn repo_for_channel(&self, channel_id: &str) -> Option<String> {
76        let channels = self.repo_channels.read().await;
77        channels
78            .iter()
79            .find(|(_, cid)| cid.as_str() == channel_id)
80            .map(|(name, _)| name.clone())
81    }
82
83    /// Atomically claim a new session for sync if:
84    /// 1. The session_id is not already owned by Hermes (in SessionStore or pending)
85    /// 2. The repo doesn't have an in-flight new message
86    ///
87    /// Returns true if the claim succeeded, false if already claimed.
88    /// On success, the session_id is added to pending_session_ids (caller must remove it later).
89    pub async fn try_claim_session_for_sync(&self, repo_name: &str, session_id: &str) -> bool {
90        // First check if session already exists (read-only, no lock held).
91        if self.sessions.has_session_id(session_id).await {
92            return false;
93        }
94
95        // Now atomically check and claim using both locks together to prevent TOCTOU.
96        // We need both locks to ensure atomicity.
97        let pending_repos = self.pending_repos.lock().await;
98        let mut pending_sessions = self.pending_session_ids.lock().await;
99
100        // Check if repo is pending or session is pending.
101        if pending_repos.contains_key(repo_name) || pending_sessions.contains(session_id) {
102            return false;
103        }
104
105        // Both checks passed — claim the session.
106        pending_sessions.insert(session_id.to_string());
107        true
108    }
109
110    /// Release a claimed session ID (called after session is persisted to SessionStore).
111    pub async fn release_claimed_session(&self, session_id: &str) {
112        self.pending_session_ids.lock().await.remove(session_id);
113    }
114
115    /// Clean up repos that have been pending for too long (>5 minutes).
116    /// This handles cases where PendingRepoGuard drop fails silently.
117    pub async fn cleanup_stale_pending_repos(&self) {
118        const MAX_PENDING_DURATION: std::time::Duration = std::time::Duration::from_secs(5 * 60);
119
120        let mut pending = self.pending_repos.lock().await;
121        let before = pending.len();
122        pending.retain(|repo, timestamp| {
123            let elapsed = timestamp.elapsed();
124            if elapsed > MAX_PENDING_DURATION {
125                warn!(
126                    "Cleaning up stale pending repo '{}' (pending for {:?})",
127                    repo, elapsed
128                );
129                false
130            } else {
131                true
132            }
133        });
134        let cleaned = before - pending.len();
135        if cleaned > 0 {
136            info!("Cleaned up {} stale pending repo(s)", cleaned);
137        }
138    }
139
140    /// Whether we're in "live" streaming mode (edit messages in real-time).
141    pub fn is_live_mode(&self) -> bool {
142        self.config.defaults.streaming_mode == crate::config::StreamingMode::Live
143    }
144
145    /// Resolve the model for a thread: thread override > repo config > global default.
146    pub async fn resolved_model(&self, repo_name: &str, thread_ts: Option<&str>) -> String {
147        if let Some(ts) = thread_ts {
148            if let Some(m) = self.thread_models.lock().await.get(ts) {
149                return m.clone();
150            }
151        }
152        match self.config.repos.get(repo_name) {
153            Some(repo) => repo.resolved_model(&self.config.defaults),
154            None => crate::config::DEFAULT_MODEL.to_string(),
155        }
156    }
157
158    /// Wait if needed to respect Slack's per-channel rate limit before posting.
159    pub(crate) async fn rate_limit(&self, channel_id: &str) {
160        let min_interval =
161            std::time::Duration::from_millis(self.config.tuning.rate_limit_interval_ms);
162        let mut limiter = self.rate_limiter.lock().await;
163        if let Some(last) = limiter.get(channel_id) {
164            let elapsed = last.elapsed();
165            if elapsed < min_interval {
166                let wait = min_interval - elapsed;
167                drop(limiter); // Release lock while sleeping.
168                tokio::time::sleep(wait).await;
169                limiter = self.rate_limiter.lock().await;
170            }
171        }
172        limiter.insert(channel_id.to_string(), Instant::now());
173    }
174}
175
176/// Handle an incoming Slack message event.
177pub async fn handle_message(state: AppState, event: SlackMessageEvent) {
178    let channel_id = match &event.origin.channel {
179        Some(c) => c.to_string(),
180        None => return,
181    };
182
183    let user_id = match event.sender.user.as_ref() {
184        Some(u) => u.to_string(),
185        None => return,
186    };
187
188    // Ignore bot's own messages.
189    if user_id == state.bot_user_id.as_str() {
190        return;
191    }
192
193    // Only process regular user messages (ignore subtypes like channel_join, message_changed, etc.).
194    if let Some(ref subtype) = event.subtype {
195        info!(
196            "Ignoring message with subtype {:?} in channel {}",
197            subtype, channel_id
198        );
199        return;
200    }
201
202    // Check authorization.
203    if !state.is_allowed_user(&user_id) {
204        tracing::warn!(
205            "Unauthorized message from user {} in channel {}",
206            user_id,
207            channel_id
208        );
209        return;
210    }
211
212    let text = match event.content.as_ref().and_then(|c| c.text.as_ref()) {
213        Some(t) if !t.is_empty() => t.to_string(),
214        _ => {
215            info!(
216                "Ignoring message without text content from user {} in channel {}",
217                user_id, channel_id
218            );
219            return;
220        }
221    };
222
223    let message_ts = event.origin.ts.to_string();
224
225    // Determine if this is a thread reply or a new top-level message.
226    if let Some(thread_ts) = event.origin.thread_ts.as_ref() {
227        let thread_ts = thread_ts.to_string();
228        info!(
229            "Thread reply from {} in channel {} (thread {}): {}",
230            user_id,
231            channel_id,
232            thread_ts,
233            &text[..crate::util::floor_char_boundary(
234                &text,
235                state.config.tuning.log_preview_max_len
236            )]
237        );
238        handlers::handle_thread_reply(state, channel_id, thread_ts, message_ts, text).await;
239    } else {
240        info!(
241            "New message from {} in channel {}: {}",
242            user_id,
243            channel_id,
244            &text[..crate::util::floor_char_boundary(
245                &text,
246                state.config.tuning.log_preview_max_len
247            )]
248        );
249        handlers::handle_new_message(state, channel_id, message_ts, user_id, text).await;
250    }
251}