1mod api;
2mod formatting;
3pub(crate) mod handlers;
4
5use crate::agent::{Agent, AgentHandle};
6use crate::config::{AgentKind, Config};
7use slack_morphism::prelude::*;
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use tokio::sync::{oneshot, Mutex};
11use tokio::time::Instant;
12use tracing::{info, warn};
13
14pub use api::{ensure_repo_channels, post_channel_message, post_thread_reply};
16pub use formatting::split_for_slack;
17pub use handlers::handle_slash_command;
18
/// Crate-visible character limit (39,000).
///
/// NOTE(review): presumably the per-message budget used when splitting long
/// text into several Slack messages — its use is not visible in this part of
/// the file; confirm against the formatting/API helpers.
19pub(crate) const SLACK_MAX_MESSAGE_CHARS: usize = 39_000;
/// Crate-visible character limit (3,900).
///
/// NOTE(review): presumably the budget applied when updating an existing
/// message in place — its use is not visible in this part of the file; confirm
/// against the callers.
22pub(crate) const SLACK_UPDATE_MAX_CHARS: usize = 3_900;
/// Module-private length limit (80).
///
/// NOTE(review): presumably caps a fallback title derived from message text —
/// the unit (bytes vs. chars) and the call site are not visible here; confirm.
25const FALLBACK_TITLE_MAX_LEN: usize = 80;
27
/// Shared application state passed to the Slack event handlers.
///
/// The struct derives `Clone`. Every `Mutex`/`RwLock`-guarded collection is
/// wrapped in an `Arc`, so clones refer to the same underlying maps and sets.
28#[derive(Clone)]
30pub struct AppState {
    /// Application configuration (Slack settings, defaults, repos, tuning).
31 pub config: Arc<Config>,
    /// Session store; `try_claim_session_for_sync` queries it via
    /// `has_session_id`.
32 pub sessions: crate::session::SessionStore,
    /// Agent implementations keyed by `AgentKind`.
33 pub agents: Arc<HashMap<AgentKind, Arc<dyn Agent>>>,
    /// Slack API token. NOTE(review): presumably the bot's own token — confirm.
34 pub bot_token: SlackApiToken,
    /// Shared Slack client.
35 pub slack_client: Arc<SlackHyperClient>,
    /// Repository name -> Slack channel id (`repo_for_channel` returns the key
    /// whose value matches a channel id).
36 pub repo_channels: Arc<tokio::sync::RwLock<HashMap<String, String>>>,
    /// User id of the bot; `handle_message` drops messages sent by this user.
38 pub bot_user_id: Arc<String>,
    /// Set of string identifiers. NOTE(review): presumably work items that are
    /// currently being processed — not referenced in this part of the file.
40 pub in_progress: Arc<Mutex<HashSet<String>>>,
    /// Repository name -> time the entry was recorded. A present entry makes
    /// `try_claim_session_for_sync` refuse the repo; entries older than five
    /// minutes are removed by `cleanup_stale_pending_repos`.
42 pub pending_repos: Arc<Mutex<HashMap<String, Instant>>>,
    /// Session ids claimed through `try_claim_session_for_sync` and not yet
    /// released with `release_claimed_session`.
45 pub pending_session_ids: Arc<Mutex<HashSet<String>>>,
    /// Agent handles keyed by a string. NOTE(review): key meaning is not
    /// visible here (thread ts? session id?) — confirm against the handlers.
48 pub agent_handles: Arc<Mutex<HashMap<String, AgentHandle>>>,
    /// One-shot `()` senders keyed by a string. NOTE(review): presumably used
    /// to signal a running agent to stop — confirm against the handlers.
50 pub kill_senders: Arc<Mutex<HashMap<String, oneshot::Sender<()>>>>,
    /// String -> string map. NOTE(review): presumably the most recent plan
    /// text per thread — not referenced in this part of the file.
54 pub last_plan: Arc<Mutex<HashMap<String, String>>>,
    /// One-shot `String` senders keyed by a string. NOTE(review): presumably
    /// delivers a user's answer to a waiting task — confirm.
56 pub pending_answers: Arc<Mutex<HashMap<String, oneshot::Sender<String>>>>,
    /// One-shot `bool` senders keyed by a string. NOTE(review): presumably
    /// delivers an approve/deny decision for a tool call — confirm.
58 pub pending_tool_approvals: Arc<Mutex<HashMap<String, oneshot::Sender<bool>>>>,
    /// Channel id -> instant at which `rate_limit` last let a caller through.
60 pub rate_limiter: Arc<Mutex<HashMap<String, Instant>>>,
    /// Thread timestamp -> model name; `resolved_model` consults this before
    /// falling back to the repo/default configuration.
62 pub thread_models: Arc<Mutex<HashMap<String, String>>>,
64}
65
66impl AppState {
67 pub fn is_allowed_user(&self, user_id: &str) -> bool {
68 self.config
69 .slack
70 .allowed_users
71 .contains(&user_id.to_string())
72 }
73
74 pub async fn repo_for_channel(&self, channel_id: &str) -> Option<String> {
76 let channels = self.repo_channels.read().await;
77 channels
78 .iter()
79 .find(|(_, cid)| cid.as_str() == channel_id)
80 .map(|(name, _)| name.clone())
81 }
82
83 pub async fn try_claim_session_for_sync(&self, repo_name: &str, session_id: &str) -> bool {
90 if self.sessions.has_session_id(session_id).await {
92 return false;
93 }
94
95 let pending_repos = self.pending_repos.lock().await;
98 let mut pending_sessions = self.pending_session_ids.lock().await;
99
100 if pending_repos.contains_key(repo_name) || pending_sessions.contains(session_id) {
102 return false;
103 }
104
105 pending_sessions.insert(session_id.to_string());
107 true
108 }
109
110 pub async fn release_claimed_session(&self, session_id: &str) {
112 self.pending_session_ids.lock().await.remove(session_id);
113 }
114
115 pub async fn cleanup_stale_pending_repos(&self) {
118 const MAX_PENDING_DURATION: std::time::Duration = std::time::Duration::from_secs(5 * 60);
119
120 let mut pending = self.pending_repos.lock().await;
121 let before = pending.len();
122 pending.retain(|repo, timestamp| {
123 let elapsed = timestamp.elapsed();
124 if elapsed > MAX_PENDING_DURATION {
125 warn!(
126 "Cleaning up stale pending repo '{}' (pending for {:?})",
127 repo, elapsed
128 );
129 false
130 } else {
131 true
132 }
133 });
134 let cleaned = before - pending.len();
135 if cleaned > 0 {
136 info!("Cleaned up {} stale pending repo(s)", cleaned);
137 }
138 }
139
140 pub fn is_live_mode(&self) -> bool {
142 self.config.defaults.streaming_mode == crate::config::StreamingMode::Live
143 }
144
145 pub async fn resolved_model(&self, repo_name: &str, thread_ts: Option<&str>) -> String {
147 if let Some(ts) = thread_ts {
148 if let Some(m) = self.thread_models.lock().await.get(ts) {
149 return m.clone();
150 }
151 }
152 match self.config.repos.get(repo_name) {
153 Some(repo) => repo.resolved_model(&self.config.defaults),
154 None => crate::config::DEFAULT_MODEL.to_string(),
155 }
156 }
157
158 pub(crate) async fn rate_limit(&self, channel_id: &str) {
160 let min_interval =
161 std::time::Duration::from_millis(self.config.tuning.rate_limit_interval_ms);
162 let mut limiter = self.rate_limiter.lock().await;
163 if let Some(last) = limiter.get(channel_id) {
164 let elapsed = last.elapsed();
165 if elapsed < min_interval {
166 let wait = min_interval - elapsed;
167 drop(limiter); tokio::time::sleep(wait).await;
169 limiter = self.rate_limiter.lock().await;
170 }
171 }
172 limiter.insert(channel_id.to_string(), Instant::now());
173 }
174}
175
176pub async fn handle_message(state: AppState, event: SlackMessageEvent) {
178 let channel_id = match &event.origin.channel {
179 Some(c) => c.to_string(),
180 None => return,
181 };
182
183 let user_id = match event.sender.user.as_ref() {
184 Some(u) => u.to_string(),
185 None => return,
186 };
187
188 if user_id == state.bot_user_id.as_str() {
190 return;
191 }
192
193 if let Some(ref subtype) = event.subtype {
195 info!(
196 "Ignoring message with subtype {:?} in channel {}",
197 subtype, channel_id
198 );
199 return;
200 }
201
202 if !state.is_allowed_user(&user_id) {
204 tracing::warn!(
205 "Unauthorized message from user {} in channel {}",
206 user_id,
207 channel_id
208 );
209 return;
210 }
211
212 let text = match event.content.as_ref().and_then(|c| c.text.as_ref()) {
213 Some(t) if !t.is_empty() => t.to_string(),
214 _ => {
215 info!(
216 "Ignoring message without text content from user {} in channel {}",
217 user_id, channel_id
218 );
219 return;
220 }
221 };
222
223 let message_ts = event.origin.ts.to_string();
224
225 if let Some(thread_ts) = event.origin.thread_ts.as_ref() {
227 let thread_ts = thread_ts.to_string();
228 info!(
229 "Thread reply from {} in channel {} (thread {}): {}",
230 user_id,
231 channel_id,
232 thread_ts,
233 &text[..crate::util::floor_char_boundary(
234 &text,
235 state.config.tuning.log_preview_max_len
236 )]
237 );
238 handlers::handle_thread_reply(state, channel_id, thread_ts, message_ts, text).await;
239 } else {
240 info!(
241 "New message from {} in channel {}: {}",
242 user_id,
243 channel_id,
244 &text[..crate::util::floor_char_boundary(
245 &text,
246 state.config.tuning.log_preview_max_len
247 )]
248 );
249 handlers::handle_new_message(state, channel_id, message_ts, user_id, text).await;
250 }
251}