Skip to main content

hermes_bot/slack/
mod.rs

1mod api;
2mod formatting;
3pub(crate) mod handlers;
4
5use crate::agent::{Agent, AgentHandle};
6use crate::config::{AgentKind, Config};
7use slack_morphism::prelude::*;
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use tokio::sync::{Mutex, oneshot};
11use tokio::time::Instant;
12use tracing::{info, warn};
13
14// Re-export public API.
15pub use api::{ensure_repo_channels, post_channel_message, post_thread_reply};
16pub use formatting::split_for_slack;
17pub use handlers::handle_slash_command;
18
19/// Slack's approximate max message length (characters) for `chat.postMessage`.
20/// Configurable via tuning.slack_max_message_chars, defaults to 39000.
21pub(crate) const SLACK_MAX_MESSAGE_CHARS: usize = 39_000;
22/// Slack's max text length for `chat.update` (the API enforces ~4 000 chars).
23/// This is a Slack API limitation and cannot be configured.
24pub(crate) const SLACK_UPDATE_MAX_CHARS: usize = 3_900;
25/// Max length of the fallback title (truncated at word boundary).
26const FALLBACK_TITLE_MAX_LEN: usize = 80;
27
28// ── Sub-structs ───────────────────────────────────────────────────────
29
30/// Slack API connectivity, identity, channel mapping, rate limiting, and event dedup.
31pub struct SlackContext {
32    pub token: SlackApiToken,
33    pub client: Arc<SlackHyperClient>,
34    pub bot_user_id: String,
35    pub repo_channels: tokio::sync::RwLock<HashMap<String, String>>,
36    pub(crate) rate_limiter: Mutex<HashMap<String, Instant>>,
37    pub(crate) seen_messages: Mutex<HashMap<String, Instant>>,
38}
39
40impl SlackContext {
41    /// Look up which repo a channel belongs to.
42    pub async fn repo_for_channel(&self, channel_id: &str) -> Option<String> {
43        let channels = self.repo_channels.read().await;
44        channels
45            .iter()
46            .find(|(_, cid)| cid.as_str() == channel_id)
47            .map(|(name, _)| name.clone())
48    }
49
50    /// Deduplicate a message event. Returns `true` if this is a duplicate.
51    pub async fn is_duplicate(&self, message_ts: &str) -> bool {
52        let mut seen = self.seen_messages.lock().await;
53        if seen.contains_key(message_ts) {
54            return true;
55        }
56        seen.insert(message_ts.to_string(), Instant::now());
57        // Prune entries older than 5 minutes to prevent unbounded growth.
58        if seen.len() > 100 {
59            let cutoff = std::time::Duration::from_secs(5 * 60);
60            seen.retain(|_, ts| ts.elapsed() < cutoff);
61        }
62        false
63    }
64
65    /// Wait if needed to respect Slack's per-channel rate limit before posting.
66    pub async fn rate_limit(&self, channel_id: &str, interval_ms: u64) {
67        let min_interval = std::time::Duration::from_millis(interval_ms);
68        let mut limiter = self.rate_limiter.lock().await;
69        if let Some(last) = limiter.get(channel_id) {
70            let elapsed = last.elapsed();
71            if elapsed < min_interval {
72                let wait = min_interval - elapsed;
73                drop(limiter); // Release lock while sleeping.
74                tokio::time::sleep(wait).await;
75                limiter = self.rate_limiter.lock().await;
76            }
77        }
78        limiter.insert(channel_id.to_string(), Instant::now());
79    }
80}
81
82/// A message queued while the agent is busy processing a turn.
83pub struct QueuedMessage {
84    pub text: String,
85    pub message_ts: String,
86}
87
88/// Per-thread runtime state (all keyed by `thread_ts`).
89pub struct ThreadState {
90    pub handles: Mutex<HashMap<String, AgentHandle>>,
91    pub kill_senders: Mutex<HashMap<String, oneshot::Sender<()>>>,
92    pub plans: Mutex<HashMap<String, String>>,
93    pub pending_answers: Mutex<HashMap<String, oneshot::Sender<String>>>,
94    pub pending_approvals: Mutex<HashMap<String, oneshot::Sender<bool>>>,
95    pub models: Mutex<HashMap<String, String>>,
96    pub in_progress: Mutex<HashSet<String>>,
97    pub queued_messages: Mutex<HashMap<String, Vec<QueuedMessage>>>,
98}
99
100impl ThreadState {
101    /// Remove all state for a thread (handles, kill_senders, plans, models,
102    /// pending_answers, pending_approvals, queued_messages). Does NOT touch `in_progress`.
103    pub async fn cleanup(&self, thread_ts: &str) {
104        self.handles.lock().await.remove(thread_ts);
105        self.kill_senders.lock().await.remove(thread_ts);
106        self.plans.lock().await.remove(thread_ts);
107        self.models.lock().await.remove(thread_ts);
108        self.queued_messages.lock().await.remove(thread_ts);
109        // Dropping the senders unblocks process_events → ChannelClosed.
110        self.pending_answers.lock().await.remove(thread_ts);
111        self.pending_approvals.lock().await.remove(thread_ts);
112    }
113
114    /// Drain kill_senders (sending kill signals), clear handles/plans/pending_answers.
115    pub async fn shutdown(&self) {
116        let mut kill_senders = self.kill_senders.lock().await;
117        let count = kill_senders.len();
118        for (thread_ts, kill_tx) in kill_senders.drain() {
119            let _ = kill_tx.send(());
120            info!("Killed agent for thread {}", thread_ts);
121        }
122        if count > 0 {
123            info!("Cleaned up {} agent process(es)", count);
124        }
125        self.handles.lock().await.clear();
126        self.plans.lock().await.clear();
127        self.pending_answers.lock().await.clear();
128        self.queued_messages.lock().await.clear();
129    }
130
131    /// Returns the number of active agent handles.
132    pub async fn active_count(&self) -> usize {
133        self.handles.lock().await.len()
134    }
135}
136
137/// Sync coordination guards for local CLI session sync.
138pub struct SyncGuard {
139    pub pending_repos: Mutex<HashMap<String, Instant>>,
140    pub pending_session_ids: Mutex<HashSet<String>>,
141}
142
143impl SyncGuard {
144    /// Atomically check both maps; inserts session_id if available.
145    /// Returns `true` if the claim succeeded.
146    pub async fn try_claim(&self, repo_name: &str, session_id: &str) -> bool {
147        let pending_repos = self.pending_repos.lock().await;
148        let mut pending_sessions = self.pending_session_ids.lock().await;
149
150        if pending_repos.contains_key(repo_name) || pending_sessions.contains(session_id) {
151            return false;
152        }
153
154        pending_sessions.insert(session_id.to_string());
155        true
156    }
157
158    /// Remove a session ID from pending_session_ids.
159    pub async fn release(&self, session_id: &str) {
160        self.pending_session_ids.lock().await.remove(session_id);
161    }
162
163    /// Remove repos that have been pending for >5 minutes.
164    pub async fn cleanup_stale(&self) {
165        const MAX_PENDING_DURATION: std::time::Duration = std::time::Duration::from_secs(5 * 60);
166
167        let mut pending = self.pending_repos.lock().await;
168        let before = pending.len();
169        pending.retain(|repo, timestamp| {
170            let elapsed = timestamp.elapsed();
171            if elapsed > MAX_PENDING_DURATION {
172                warn!(
173                    "Cleaning up stale pending repo '{}' (pending for {:?})",
174                    repo, elapsed
175                );
176                false
177            } else {
178                true
179            }
180        });
181        let cleaned = before - pending.len();
182        if cleaned > 0 {
183            info!("Cleaned up {} stale pending repo(s)", cleaned);
184        }
185    }
186}
187
188// ── AppState ──────────────────────────────────────────────────────────
189
190/// Shared application state passed to all handlers.
191#[derive(Clone)]
192pub struct AppState {
193    pub config: Arc<Config>,
194    pub agents: Arc<HashMap<AgentKind, Arc<dyn Agent>>>,
195    pub sessions: crate::session::SessionStore,
196    pub slack: Arc<SlackContext>,
197    pub threads: Arc<ThreadState>,
198    pub sync: Arc<SyncGuard>,
199}
200
201impl AppState {
202    pub fn is_allowed_user(&self, user_id: &str) -> bool {
203        self.config
204            .slack
205            .allowed_users
206            .contains(&user_id.to_string())
207    }
208
209    /// Atomically claim a new session for sync if:
210    /// 1. The session_id is not already owned by Hermes (in SessionStore or pending)
211    /// 2. The repo doesn't have an in-flight new message
212    ///
213    /// Returns true if the claim succeeded, false if already claimed.
214    /// On success, the session_id is added to pending_session_ids (caller must remove it later).
215    pub async fn try_claim_session_for_sync(&self, repo_name: &str, session_id: &str) -> bool {
216        // First check if session already exists (read-only, no lock held).
217        if self.sessions.has_session_id(session_id).await {
218            return false;
219        }
220
221        // Delegate to SyncGuard for the atomic check.
222        self.sync.try_claim(repo_name, session_id).await
223    }
224
225    /// Whether we're in "live" streaming mode (edit messages in real-time).
226    pub fn is_live_mode(&self) -> bool {
227        self.config.defaults.streaming_mode == crate::config::StreamingMode::Live
228    }
229
230    /// Resolve the model for a thread: thread override > repo config > global default.
231    pub async fn resolved_model(&self, repo_name: &str, thread_ts: Option<&str>) -> String {
232        if let Some(ts) = thread_ts
233            && let Some(m) = self.threads.models.lock().await.get(ts)
234        {
235            return m.clone();
236        }
237        match self.config.repos.get(repo_name) {
238            Some(repo) => repo.resolved_model(&self.config.defaults),
239            None => crate::config::DEFAULT_MODEL.to_string(),
240        }
241    }
242}
243
244/// Handle an incoming Slack message event.
245pub async fn handle_message(state: AppState, event: SlackMessageEvent) {
246    let channel_id = match &event.origin.channel {
247        Some(c) => c.to_string(),
248        None => return,
249    };
250
251    let user_id = match event.sender.user.as_ref() {
252        Some(u) => u.to_string(),
253        None => return,
254    };
255
256    // Ignore bot's own messages.
257    if user_id == state.slack.bot_user_id {
258        return;
259    }
260
261    // Only process regular user messages (ignore subtypes like channel_join, message_changed, etc.).
262    if let Some(ref subtype) = event.subtype {
263        info!(
264            "Ignoring message with subtype {:?} in channel {}",
265            subtype, channel_id
266        );
267        return;
268    }
269
270    // Check authorization.
271    if !state.is_allowed_user(&user_id) {
272        tracing::warn!(
273            "Unauthorized message from user {} in channel {}",
274            user_id,
275            channel_id
276        );
277        return;
278    }
279
280    let text = match event.content.as_ref().and_then(|c| c.text.as_ref()) {
281        Some(t) if !t.is_empty() => t.to_string(),
282        _ => {
283            info!(
284                "Ignoring message without text content from user {} in channel {}",
285                user_id, channel_id
286            );
287            return;
288        }
289    };
290
291    let message_ts = event.origin.ts.to_string();
292
293    // Deduplicate: Socket Mode can redeliver the same event if ack is slow.
294    if state.slack.is_duplicate(&message_ts).await {
295        info!("Ignoring duplicate event for message {}", message_ts);
296        return;
297    }
298
299    // Determine if this is a thread reply or a new top-level message.
300    if let Some(thread_ts) = event.origin.thread_ts.as_ref() {
301        let thread_ts = thread_ts.to_string();
302        info!(
303            "Thread reply from {} in channel {} (thread {}): {}",
304            user_id,
305            channel_id,
306            thread_ts,
307            &text[..crate::util::floor_char_boundary(
308                &text,
309                state.config.tuning.log_preview_max_len
310            )]
311        );
312        handlers::handle_thread_reply(state, channel_id, thread_ts, message_ts, text).await;
313    } else {
314        info!(
315            "New message from {} in channel {}: {}",
316            user_id,
317            channel_id,
318            &text[..crate::util::floor_char_boundary(
319                &text,
320                state.config.tuning.log_preview_max_len
321            )]
322        );
323        handlers::handle_new_message(state, channel_id, message_ts, user_id, text).await;
324    }
325}