1mod api;
2mod formatting;
3pub(crate) mod handlers;
4
5use crate::agent::{Agent, AgentHandle};
6use crate::config::{AgentKind, Config};
7use slack_morphism::prelude::*;
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use tokio::sync::{Mutex, oneshot};
11use tokio::time::Instant;
12use tracing::{info, warn};
13
14pub use api::{ensure_repo_channels, post_channel_message, post_thread_reply};
16pub use formatting::split_for_slack;
17pub use handlers::handle_slash_command;
18
/// Character budget for a single Slack message.
/// NOTE(review): presumably chosen to stay under Slack's per-message text
/// limit; the consumers of this constant are outside this view — confirm.
pub(crate) const SLACK_MAX_MESSAGE_CHARS: usize = 39_000;
/// Character budget for a message update.
/// NOTE(review): presumably a tighter limit applied when editing an existing
/// message in place; the consumers are outside this view — confirm.
pub(crate) const SLACK_UPDATE_MAX_CHARS: usize = 3_900;
/// Maximum length of a fallback title.
/// NOTE(review): unit (bytes vs. chars) and call sites are not visible
/// here — confirm before changing.
const FALLBACK_TITLE_MAX_LEN: usize = 80;
27
/// Shared Slack-side state: credentials, the client handle, and the small
/// in-memory maps used for channel lookup, de-duplication and rate limiting.
pub struct SlackContext {
    /// Slack API token.
    /// NOTE(review): presumably the bot token used for Web API calls; the
    /// call sites are outside this view.
    pub token: SlackApiToken,
    /// Shared Slack client handle.
    pub client: Arc<SlackHyperClient>,
    /// User ID of the bot itself; `handle_message` compares senders against
    /// it so the bot ignores its own messages.
    pub bot_user_id: String,
    /// Repo name -> Slack channel ID (see `repo_for_channel`, which searches
    /// this map by value).
    pub repo_channels: tokio::sync::RwLock<HashMap<String, String>>,
    /// Channel ID -> timestamp maintained by `rate_limit`.
    pub(crate) rate_limiter: Mutex<HashMap<String, Instant>>,
    /// Message timestamp -> time it was first seen, maintained by
    /// `is_duplicate`.
    pub(crate) seen_messages: Mutex<HashMap<String, Instant>>,
}
39
40impl SlackContext {
41 pub async fn repo_for_channel(&self, channel_id: &str) -> Option<String> {
43 let channels = self.repo_channels.read().await;
44 channels
45 .iter()
46 .find(|(_, cid)| cid.as_str() == channel_id)
47 .map(|(name, _)| name.clone())
48 }
49
50 pub async fn is_duplicate(&self, message_ts: &str) -> bool {
52 let mut seen = self.seen_messages.lock().await;
53 if seen.contains_key(message_ts) {
54 return true;
55 }
56 seen.insert(message_ts.to_string(), Instant::now());
57 if seen.len() > 100 {
59 let cutoff = std::time::Duration::from_secs(5 * 60);
60 seen.retain(|_, ts| ts.elapsed() < cutoff);
61 }
62 false
63 }
64
65 pub async fn rate_limit(&self, channel_id: &str, interval_ms: u64) {
67 let min_interval = std::time::Duration::from_millis(interval_ms);
68 let mut limiter = self.rate_limiter.lock().await;
69 if let Some(last) = limiter.get(channel_id) {
70 let elapsed = last.elapsed();
71 if elapsed < min_interval {
72 let wait = min_interval - elapsed;
73 drop(limiter); tokio::time::sleep(wait).await;
75 limiter = self.rate_limiter.lock().await;
76 }
77 }
78 limiter.insert(channel_id.to_string(), Instant::now());
79 }
80}
81
/// A message held back for later processing; stored per thread in
/// `ThreadState::queued_messages`.
pub struct QueuedMessage {
    /// Message text.
    pub text: String,
    /// Timestamp string of the queued message.
    /// NOTE(review): presumably the Slack `ts` of the message — confirm
    /// against the code that enqueues it.
    pub message_ts: String,
}
87
/// Per-thread runtime state. Every map that `cleanup` touches is keyed by the
/// thread timestamp (`thread_ts`).
pub struct ThreadState {
    /// Agent handle per thread; `active_count` reports this map's size.
    pub handles: Mutex<HashMap<String, AgentHandle>>,
    /// One-shot senders per thread; `shutdown` fires each of them to kill the
    /// thread's agent.
    pub kill_senders: Mutex<HashMap<String, oneshot::Sender<()>>>,
    /// A string "plan" stored per thread.
    /// NOTE(review): producer and consumer are outside this view.
    pub plans: Mutex<HashMap<String, String>>,
    /// One-shot senders carrying a `String` answer per thread.
    /// NOTE(review): presumably completes a pending question to the user.
    pub pending_answers: Mutex<HashMap<String, oneshot::Sender<String>>>,
    /// One-shot senders carrying a `bool` decision per thread.
    /// NOTE(review): presumably completes a pending approval request.
    pub pending_approvals: Mutex<HashMap<String, oneshot::Sender<bool>>>,
    /// Per-thread model override; consulted first by
    /// `AppState::resolved_model`.
    pub models: Mutex<HashMap<String, String>>,
    /// Set of string IDs marked as in progress.
    /// NOTE(review): presumably thread timestamps; neither `cleanup` nor
    /// `shutdown` touches this set, so it is managed elsewhere.
    pub in_progress: Mutex<HashSet<String>>,
    /// Messages queued per thread.
    pub queued_messages: Mutex<HashMap<String, Vec<QueuedMessage>>>,
}
99
100impl ThreadState {
101 pub async fn cleanup(&self, thread_ts: &str) {
104 self.handles.lock().await.remove(thread_ts);
105 self.kill_senders.lock().await.remove(thread_ts);
106 self.plans.lock().await.remove(thread_ts);
107 self.models.lock().await.remove(thread_ts);
108 self.queued_messages.lock().await.remove(thread_ts);
109 self.pending_answers.lock().await.remove(thread_ts);
111 self.pending_approvals.lock().await.remove(thread_ts);
112 }
113
114 pub async fn shutdown(&self) {
116 let mut kill_senders = self.kill_senders.lock().await;
117 let count = kill_senders.len();
118 for (thread_ts, kill_tx) in kill_senders.drain() {
119 let _ = kill_tx.send(());
120 info!("Killed agent for thread {}", thread_ts);
121 }
122 if count > 0 {
123 info!("Cleaned up {} agent process(es)", count);
124 }
125 self.handles.lock().await.clear();
126 self.plans.lock().await.clear();
127 self.pending_answers.lock().await.clear();
128 self.queued_messages.lock().await.clear();
129 }
130
131 pub async fn active_count(&self) -> usize {
133 self.handles.lock().await.len()
134 }
135}
136
/// Bookkeeping that prevents the same repo or session from being claimed
/// twice (see `try_claim`).
pub struct SyncGuard {
    /// Repo name -> time it became pending; `cleanup_stale` drops entries
    /// older than five minutes.
    /// NOTE(review): nothing in this view inserts into this map — the writer
    /// lives elsewhere.
    pub pending_repos: Mutex<HashMap<String, Instant>>,
    /// Session IDs currently claimed via `try_claim` and not yet passed to
    /// `release`.
    pub pending_session_ids: Mutex<HashSet<String>>,
}
142
143impl SyncGuard {
144 pub async fn try_claim(&self, repo_name: &str, session_id: &str) -> bool {
147 let pending_repos = self.pending_repos.lock().await;
148 let mut pending_sessions = self.pending_session_ids.lock().await;
149
150 if pending_repos.contains_key(repo_name) || pending_sessions.contains(session_id) {
151 return false;
152 }
153
154 pending_sessions.insert(session_id.to_string());
155 true
156 }
157
158 pub async fn release(&self, session_id: &str) {
160 self.pending_session_ids.lock().await.remove(session_id);
161 }
162
163 pub async fn cleanup_stale(&self) {
165 const MAX_PENDING_DURATION: std::time::Duration = std::time::Duration::from_secs(5 * 60);
166
167 let mut pending = self.pending_repos.lock().await;
168 let before = pending.len();
169 pending.retain(|repo, timestamp| {
170 let elapsed = timestamp.elapsed();
171 if elapsed > MAX_PENDING_DURATION {
172 warn!(
173 "Cleaning up stale pending repo '{}' (pending for {:?})",
174 repo, elapsed
175 );
176 false
177 } else {
178 true
179 }
180 });
181 let cleaned = before - pending.len();
182 if cleaned > 0 {
183 info!("Cleaned up {} stale pending repo(s)", cleaned);
184 }
185 }
186}
187
/// Top-level application state handed to every handler.
///
/// NOTE(review): all fields except `sessions` are `Arc`s, so cloning them is
/// a reference-count bump; the clone cost of `SessionStore` is not visible
/// here.
#[derive(Clone)]
pub struct AppState {
    /// Loaded configuration.
    pub config: Arc<Config>,
    /// Agent implementations keyed by `AgentKind`.
    pub agents: Arc<HashMap<AgentKind, Arc<dyn Agent>>>,
    /// Session store; queried via `has_session_id` in
    /// `try_claim_session_for_sync`.
    pub sessions: crate::session::SessionStore,
    /// Slack credentials, client and de-duplication / rate-limit maps.
    pub slack: Arc<SlackContext>,
    /// Per-thread runtime state.
    pub threads: Arc<ThreadState>,
    /// Claim bookkeeping for sync operations.
    pub sync: Arc<SyncGuard>,
}
200
201impl AppState {
202 pub fn is_allowed_user(&self, user_id: &str) -> bool {
203 self.config
204 .slack
205 .allowed_users
206 .contains(&user_id.to_string())
207 }
208
209 pub async fn try_claim_session_for_sync(&self, repo_name: &str, session_id: &str) -> bool {
216 if self.sessions.has_session_id(session_id).await {
218 return false;
219 }
220
221 self.sync.try_claim(repo_name, session_id).await
223 }
224
225 pub fn is_live_mode(&self) -> bool {
227 self.config.defaults.streaming_mode == crate::config::StreamingMode::Live
228 }
229
230 pub async fn resolved_model(&self, repo_name: &str, thread_ts: Option<&str>) -> String {
232 if let Some(ts) = thread_ts
233 && let Some(m) = self.threads.models.lock().await.get(ts)
234 {
235 return m.clone();
236 }
237 match self.config.repos.get(repo_name) {
238 Some(repo) => repo.resolved_model(&self.config.defaults),
239 None => crate::config::DEFAULT_MODEL.to_string(),
240 }
241 }
242}
243
244pub async fn handle_message(state: AppState, event: SlackMessageEvent) {
246 let channel_id = match &event.origin.channel {
247 Some(c) => c.to_string(),
248 None => return,
249 };
250
251 let user_id = match event.sender.user.as_ref() {
252 Some(u) => u.to_string(),
253 None => return,
254 };
255
256 if user_id == state.slack.bot_user_id {
258 return;
259 }
260
261 if let Some(ref subtype) = event.subtype {
263 info!(
264 "Ignoring message with subtype {:?} in channel {}",
265 subtype, channel_id
266 );
267 return;
268 }
269
270 if !state.is_allowed_user(&user_id) {
272 tracing::warn!(
273 "Unauthorized message from user {} in channel {}",
274 user_id,
275 channel_id
276 );
277 return;
278 }
279
280 let text = match event.content.as_ref().and_then(|c| c.text.as_ref()) {
281 Some(t) if !t.is_empty() => t.to_string(),
282 _ => {
283 info!(
284 "Ignoring message without text content from user {} in channel {}",
285 user_id, channel_id
286 );
287 return;
288 }
289 };
290
291 let message_ts = event.origin.ts.to_string();
292
293 if state.slack.is_duplicate(&message_ts).await {
295 info!("Ignoring duplicate event for message {}", message_ts);
296 return;
297 }
298
299 if let Some(thread_ts) = event.origin.thread_ts.as_ref() {
301 let thread_ts = thread_ts.to_string();
302 info!(
303 "Thread reply from {} in channel {} (thread {}): {}",
304 user_id,
305 channel_id,
306 thread_ts,
307 &text[..crate::util::floor_char_boundary(
308 &text,
309 state.config.tuning.log_preview_max_len
310 )]
311 );
312 handlers::handle_thread_reply(state, channel_id, thread_ts, message_ts, text).await;
313 } else {
314 info!(
315 "New message from {} in channel {}: {}",
316 user_id,
317 channel_id,
318 &text[..crate::util::floor_char_boundary(
319 &text,
320 state.config.tuning.log_preview_max_len
321 )]
322 );
323 handlers::handle_new_message(state, channel_id, message_ts, user_id, text).await;
324 }
325}