1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
//! Session pool — manages per-channel agent sessions.
use std::collections::HashMap;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
use super::adapter::{ChannelId, StreamingLevel, WorkspaceScope};
use crate::agent::context::ConversationContext;
/// A plan produced by the architect phase, waiting for user approval.
///
/// Held in `AgentSession::pending_plan` while approval is outstanding.
pub struct PendingPlan {
/// The plan itself, as text.
pub plan: String,
/// Conversation context stored alongside the plan.
/// NOTE(review): presumably the architect phase's own context, judging by
/// the field name — confirm against the code that builds this struct.
pub arch_context: ConversationContext,
/// NOTE(review): presumably the user message that led to this plan — confirm
/// against the producer.
pub user_msg: String,
/// NOTE(review): presumably the system prompt to use once the plan is
/// approved — confirm against the consumer.
pub system_prompt: String,
}
/// A message queued for processing after the current agent turn finishes.
/// Inspired by remotecode's session-state.ts message queue pattern.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so queued messages can be
/// logged, copied and compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuedMessage {
    /// The user's text.
    pub text: String,
}
/// A single agent session bound to a channel.
///
/// Built with `AgentSession::new`, which fills every optional field with
/// `None`/empty/`false`, and stored in a `SessionPool` keyed by channel.
pub struct AgentSession {
/// UUID session identifier (a random v4 UUID rendered as a string by `new`).
pub session_id: String,
/// Project directory this session operates on.
pub project_dir: String,
/// Optional model override (None = use config default).
pub model_override: Option<String>,
/// Optional agent name override (for display and system_prompt).
pub agent_override: Option<String>,
/// Streaming detail level.
pub streaming_level: StreamingLevel,
/// Filesystem access scope.
pub workspace_scope: WorkspaceScope,
/// Conversation context (owned — moved to agent loop during execution).
pub context: ConversationContext,
/// Cancellation token for the currently running agent (`None` when no token is set).
pub cancel_token: Option<CancellationToken>,
/// Whether an agent is currently executing.
/// Busy sessions are skipped by `SessionPool::evict_stale` and
/// `SessionPool::pending_sessions`.
pub busy: bool,
/// Last activity timestamp for stale eviction; refreshed by `touch()`.
pub last_activity: Instant,
/// Message ID being edited for streaming updates.
pub streaming_message_id: Option<String>,
/// Buffer for batching streaming tokens.
pub streaming_buffer: String,
/// Architect-produced plan awaiting user approval.
pub pending_plan: Option<PendingPlan>,
/// Last time a pending-plan notification was sent (for cooldown).
/// `None` until `should_notify` first returns `true`.
pub last_notified_at: Option<Instant>,
/// Session-scoped tool approval cache (for "Allow for session" in cautious mode).
pub session_approvals: crate::agent::approval::SessionApprovals,
/// Shared MCP manager — initialized once per session, reused every message.
/// When the session is evicted as stale, `shutdown_all()` is invoked on it
/// from a spawned background task.
pub mcp: Option<std::sync::Arc<crate::mcp::manager::McpManager>>,
/// Shared skill registry — discovered once per session.
pub skills: Option<std::sync::Arc<crate::skills::SkillRegistry>>,
/// Shared BM25 tool index — built from mcp+skills, reused per message.
pub tool_index: Option<std::sync::Arc<crate::tools::tool_index::ToolIndex>>,
/// Message queue — when the agent is busy, incoming messages are queued and
/// processed sequentially after the current task finishes. Inspired by
/// remotecode's per-session message queue (session-state.ts).
/// FIFO: `enqueue` appends to the back, `drain_next` removes from the front.
pub message_queue: Vec<QueuedMessage>,
/// When `true`, output from the current agent turn is silently dropped.
/// Set when the user switches projects or cancels — prevents stale output
/// from appearing in the new session context.
/// NOTE(review): `suppress()` sets this flag; no method on this type resets
/// it — confirm where callers clear it.
pub suppressed: bool,
}
impl AgentSession {
    /// Builds a fresh, idle session for `project_dir`.
    ///
    /// The session id is a newly generated v4 UUID, the activity clock starts
    /// at the moment of construction, and every optional piece of state
    /// (overrides, cancel token, streaming message, pending plan, MCP manager,
    /// skills, tool index) starts out unset. The queue is empty and output is
    /// not suppressed.
    pub fn new(
        project_dir: String,
        context: ConversationContext,
        streaming_level: StreamingLevel,
        workspace_scope: WorkspaceScope,
    ) -> Self {
        let session_id = uuid::Uuid::new_v4().to_string();
        let created_at = Instant::now();

        Self {
            // Identity and configuration.
            session_id,
            project_dir,
            model_override: None,
            agent_override: None,
            streaming_level,
            workspace_scope,
            // Execution state.
            context,
            cancel_token: None,
            busy: false,
            last_activity: created_at,
            // Streaming output state.
            streaming_message_id: None,
            streaming_buffer: String::new(),
            // Plan approval state.
            pending_plan: None,
            last_notified_at: None,
            session_approvals: crate::agent::approval::SessionApprovals::new(),
            // Lazily initialised shared resources.
            mcp: None,
            skills: None,
            tool_index: None,
            // Queueing and output suppression.
            message_queue: Vec::new(),
            suppressed: false,
        }
    }

    /// Marks the session as active right now by resetting `last_activity`.
    pub fn touch(&mut self) {
        self.last_activity = Instant::now();
    }

    /// Whole seconds elapsed since `last_activity` was last refreshed.
    pub fn idle_secs(&self) -> u64 {
        let idle = self.last_activity.elapsed();
        idle.as_secs()
    }

    /// Decides whether a pending-plan notification may be sent now.
    ///
    /// Returns `true` when no notification has been sent yet, or when at least
    /// `cooldown_secs` whole seconds have passed since the previous one. On
    /// `true`, `last_notified_at` is reset to the current instant; on `false`
    /// it is left untouched.
    pub fn should_notify(&mut self, cooldown_secs: u64) -> bool {
        let cooled_down = match self.last_notified_at {
            // Never notified before: always allowed.
            None => true,
            Some(previous) => previous.elapsed().as_secs() >= cooldown_secs,
        };
        if cooled_down {
            self.last_notified_at = Some(Instant::now());
        }
        cooled_down
    }

    // ── Message queue helpers (inspired by remotecode session-state.ts) ──

    /// Appends `text` to the back of the message queue so it is handled after
    /// the current turn.
    pub fn enqueue(&mut self, text: String) {
        let queued = QueuedMessage { text };
        self.message_queue.push(queued);
    }

    /// Pops the oldest queued message (FIFO) and returns its text, or `None`
    /// when nothing is queued.
    pub fn drain_next(&mut self) -> Option<String> {
        let has_pending = !self.message_queue.is_empty();
        // Index 0 is the oldest entry because `enqueue` always pushes to the back.
        has_pending.then(|| self.message_queue.remove(0).text)
    }

    /// Discards every queued message (e.g. on cancel).
    pub fn clear_queue(&mut self) {
        self.message_queue.clear();
    }

    /// Turns on output suppression for this session (used during project
    /// switches).
    pub fn suppress(&mut self) {
        self.suppressed = true;
    }
}
/// Pool of channel-keyed agent sessions.
///
/// Holds at most one `AgentSession` per `ChannelId`; inserting for a channel
/// that already has a session replaces it.
pub struct SessionPool {
/// Sessions keyed by the channel they are bound to.
sessions: HashMap<ChannelId, AgentSession>,
/// Default session timeout in seconds (for eviction): a non-busy session
/// idle for at least this long is removed by `evict_stale`.
timeout_secs: u64,
}
impl SessionPool {
    /// Creates an empty pool.
    ///
    /// `timeout_secs` is the idle threshold used by [`SessionPool::evict_stale`];
    /// a value of `0` makes every non-busy session immediately eligible.
    pub fn new(timeout_secs: u64) -> Self {
        Self {
            sessions: HashMap::new(),
            timeout_secs,
        }
    }

    /// Get a session for a channel, or `None` if the channel has no session.
    pub fn get(&self, channel: &ChannelId) -> Option<&AgentSession> {
        self.sessions.get(channel)
    }

    /// Get mutable reference to a session, or `None` if the channel has no session.
    pub fn get_mut(&mut self, channel: &ChannelId) -> Option<&mut AgentSession> {
        self.sessions.get_mut(channel)
    }

    /// Insert a new session, returning any previous session for that channel.
    pub fn insert(&mut self, channel: ChannelId, session: AgentSession) -> Option<AgentSession> {
        self.sessions.insert(channel, session)
    }

    /// Remove a session, returning it if one existed.
    pub fn remove(&mut self, channel: &ChannelId) -> Option<AgentSession> {
        self.sessions.remove(channel)
    }

    /// Check if a session exists for a channel.
    pub fn contains(&self, channel: &ChannelId) -> bool {
        self.sessions.contains_key(channel)
    }

    /// Evict stale sessions (idle >= timeout, not busy).
    ///
    /// Returns the number of sessions evicted. For every evicted session that
    /// holds an MCP manager, a background task is spawned that calls
    /// `shutdown_all()` on it so its child processes are shut down.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside a Tokio runtime, so this must
    /// run inside one whenever an evicted session may hold an MCP manager.
    pub fn evict_stale(&mut self) -> usize {
        let timeout = self.timeout_secs;

        // Pass 1: find the stale channels. Only the keys are cloned; the
        // sessions themselves (and their conversation contexts) are not.
        let stale_channels: Vec<ChannelId> = self
            .sessions
            .iter()
            .filter(|(_, session)| !session.busy && session.idle_secs() >= timeout)
            .map(|(channel, _)| channel.clone())
            .collect();

        // Pass 2: move the stale sessions out of the map by value so their
        // MCP managers can be handed to the shutdown tasks below.
        let evicted_sessions: Vec<AgentSession> = stale_channels
            .iter()
            .filter_map(|channel| self.sessions.remove(channel))
            .collect();

        let evicted = evicted_sessions.len();
        if evicted > 0 {
            tracing::debug!(
                evicted,
                remaining = self.len(),
                "Session pool: evicted stale sessions"
            );
            for session in evicted_sessions {
                if let Some(mcp) = session.mcp {
                    // Fire-and-forget: the JoinHandle is dropped, detaching the task.
                    tokio::spawn(async move { mcp.shutdown_all().await });
                }
            }
        }
        evicted
    }

    /// Channels that have a pending plan waiting for approval (not busy).
    ///
    /// The order of the returned channels follows the map's iteration order
    /// and is therefore unspecified.
    pub fn pending_sessions(&self) -> Vec<ChannelId> {
        self.sessions
            .iter()
            .filter(|(_, s)| s.pending_plan.is_some() && !s.busy)
            .map(|(ch, _)| ch.clone())
            .collect()
    }

    /// Number of sessions currently held in the pool.
    pub fn len(&self) -> usize {
        self.sessions.len()
    }

    /// Returns `true` when the pool holds no sessions.
    pub fn is_empty(&self) -> bool {
        self.sessions.is_empty()
    }
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::context::ConversationContext;

    /// Small conversation context used by every fixture in this module.
    fn test_context() -> ConversationContext {
        ConversationContext::with_budget("test".to_string(), 1000, 0.8)
    }

    /// Idle session rooted at `project` with compact streaming and project scope.
    fn make_session(project: &str) -> AgentSession {
        AgentSession::new(
            project.to_string(),
            test_context(),
            StreamingLevel::Compact,
            WorkspaceScope::Project,
        )
    }

    /// Pending plan fixture whose only varying part is the plan text.
    fn make_plan(plan: &str) -> PendingPlan {
        PendingPlan {
            plan: plan.to_string(),
            arch_context: test_context(),
            user_msg: "msg".to_string(),
            system_prompt: "sp".to_string(),
        }
    }

    #[test]
    fn crud() {
        let channel = ChannelId::new("telegram", "123");
        let mut pool = SessionPool::new(300);
        assert!(pool.get(&channel).is_none());

        pool.insert(channel.clone(), make_session("/home/user/app"));
        assert!(pool.contains(&channel));
        assert_eq!(pool.len(), 1);

        let stored = pool.get(&channel).unwrap();
        assert_eq!(stored.project_dir, "/home/user/app");

        pool.remove(&channel);
        assert!(pool.is_empty());
    }

    #[test]
    fn evict_stale() {
        // A zero-second timeout makes every non-busy session stale.
        let mut pool = SessionPool::new(0);
        pool.insert(ChannelId::new("slack", "C01"), make_session("/tmp"));

        let evicted = pool.evict_stale();

        assert_eq!(evicted, 1);
        assert!(pool.is_empty());
    }

    #[test]
    fn pending_sessions_filters_correctly() {
        let waiting_channel = ChannelId::new("telegram", "111");
        let busy_channel = ChannelId::new("slack", "C02");
        let plain_channel = ChannelId::new("discord", "999");

        let mut waiting = make_session("/a");
        waiting.pending_plan = Some(make_plan("do stuff"));

        // Has a plan but is busy, so it must be excluded.
        let mut busy = make_session("/b");
        busy.pending_plan = Some(make_plan("do more"));
        busy.busy = true;

        let mut pool = SessionPool::new(300);
        pool.insert(waiting_channel.clone(), waiting);
        pool.insert(busy_channel, busy);
        // No pending plan at all.
        pool.insert(plain_channel, make_session("/c"));

        let pending = pool.pending_sessions();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0], waiting_channel);
    }

    #[test]
    fn should_notify_cooldown() {
        let mut session = make_session("/x");

        // No previous notification: the first call goes through.
        assert!(session.should_notify(300));
        // Immediately afterwards we are still inside the cooldown window.
        assert!(!session.should_notify(300));
        // A zero-second cooldown never blocks.
        assert!(session.should_notify(0));
    }
}