collet 0.1.1

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
//! Session pool — manages per-channel agent sessions.

use std::collections::HashMap;
use std::time::Instant;
use tokio_util::sync::CancellationToken;

use super::adapter::{ChannelId, StreamingLevel, WorkspaceScope};
use crate::agent::context::ConversationContext;

/// A plan produced by the architect phase, waiting for user approval.
pub struct PendingPlan {
    pub plan: String,
    pub arch_context: ConversationContext,
    pub user_msg: String,
    pub system_prompt: String,
}

/// A message queued for processing after the current agent turn finishes.
/// Inspired by remotecode's session-state.ts message queue pattern.
pub struct QueuedMessage {
    /// The user's text.
    pub text: String,
}

/// A single agent session bound to a channel.
pub struct AgentSession {
    /// UUID session identifier.
    pub session_id: String,
    /// Project directory this session operates on.
    pub project_dir: String,
    /// Optional model override (None = use config default).
    pub model_override: Option<String>,
    /// Optional agent name override (for display and system_prompt).
    pub agent_override: Option<String>,
    /// Streaming detail level.
    pub streaming_level: StreamingLevel,
    /// Filesystem access scope.
    pub workspace_scope: WorkspaceScope,
    /// Conversation context (owned — moved to agent loop during execution).
    pub context: ConversationContext,
    /// Cancellation token for the currently running agent.
    pub cancel_token: Option<CancellationToken>,
    /// Whether an agent is currently executing.
    pub busy: bool,
    /// Last activity timestamp for stale eviction.
    pub last_activity: Instant,
    /// Message ID being edited for streaming updates.
    pub streaming_message_id: Option<String>,
    /// Buffer for batching streaming tokens.
    pub streaming_buffer: String,
    /// Architect-produced plan awaiting user approval.
    pub pending_plan: Option<PendingPlan>,
    /// Last time a pending-plan notification was sent (for cooldown).
    pub last_notified_at: Option<Instant>,
    /// Session-scoped tool approval cache (for "Allow for session" in cautious mode).
    pub session_approvals: crate::agent::approval::SessionApprovals,
    /// Shared MCP manager — initialized once per session, reused every message.
    pub mcp: Option<std::sync::Arc<crate::mcp::manager::McpManager>>,
    /// Shared skill registry — discovered once per session.
    pub skills: Option<std::sync::Arc<crate::skills::SkillRegistry>>,
    /// Shared BM25 tool index — built from mcp+skills, reused per message.
    pub tool_index: Option<std::sync::Arc<crate::tools::tool_index::ToolIndex>>,
    /// Message queue — when the agent is busy, incoming messages are queued and
    /// processed sequentially after the current task finishes.  Inspired by
    /// remotecode's per-session message queue (session-state.ts).
    pub message_queue: Vec<QueuedMessage>,
    /// When `true`, output from the current agent turn is silently dropped.
    /// Set when the user switches projects or cancels — prevents stale output
    /// from appearing in the new session context.
    pub suppressed: bool,
}

impl AgentSession {
    pub fn new(
        project_dir: String,
        context: ConversationContext,
        streaming_level: StreamingLevel,
        workspace_scope: WorkspaceScope,
    ) -> Self {
        Self {
            session_id: uuid::Uuid::new_v4().to_string(),
            project_dir,
            model_override: None,
            agent_override: None,
            streaming_level,
            workspace_scope,
            context,
            cancel_token: None,
            busy: false,
            last_activity: Instant::now(),
            streaming_message_id: None,
            streaming_buffer: String::new(),
            pending_plan: None,
            last_notified_at: None,
            session_approvals: crate::agent::approval::SessionApprovals::new(),
            mcp: None,
            skills: None,
            tool_index: None,
            message_queue: Vec::new(),
            suppressed: false,
        }
    }

    /// Touch — update last activity timestamp.
    pub fn touch(&mut self) {
        self.last_activity = Instant::now();
    }

    /// Seconds since last activity.
    pub fn idle_secs(&self) -> u64 {
        self.last_activity.elapsed().as_secs()
    }

    /// Returns true if a pending-plan notification should be sent now,
    /// applying a cooldown to avoid repeated alerts.
    /// Updates `last_notified_at` when returning true.
    pub fn should_notify(&mut self, cooldown_secs: u64) -> bool {
        let elapsed = self
            .last_notified_at
            .map(|t| t.elapsed().as_secs())
            .unwrap_or(u64::MAX);
        if elapsed >= cooldown_secs {
            self.last_notified_at = Some(Instant::now());
            true
        } else {
            false
        }
    }

    // ── Message queue helpers (inspired by remotecode session-state.ts) ──

    /// Enqueue a message for sequential processing after the current turn.
    pub fn enqueue(&mut self, text: String) {
        self.message_queue.push(QueuedMessage { text });
    }

    /// Drain the next queued message (FIFO). Returns `None` if empty.
    pub fn drain_next(&mut self) -> Option<String> {
        if self.message_queue.is_empty() {
            return None;
        }
        Some(self.message_queue.remove(0).text)
    }

    /// Clear all queued messages (e.g. on cancel).
    pub fn clear_queue(&mut self) {
        self.message_queue.clear();
    }

    /// Suppress output for this session (used during project switches).
    pub fn suppress(&mut self) {
        self.suppressed = true;
    }
}

/// Pool of channel-keyed agent sessions.
pub struct SessionPool {
    sessions: HashMap<ChannelId, AgentSession>,
    /// Default session timeout in seconds (for eviction).
    timeout_secs: u64,
}

impl SessionPool {
    pub fn new(timeout_secs: u64) -> Self {
        Self {
            sessions: HashMap::new(),
            timeout_secs,
        }
    }

    /// Get a session for a channel.
    pub fn get(&self, channel: &ChannelId) -> Option<&AgentSession> {
        self.sessions.get(channel)
    }

    /// Get mutable reference to a session.
    pub fn get_mut(&mut self, channel: &ChannelId) -> Option<&mut AgentSession> {
        self.sessions.get_mut(channel)
    }

    /// Insert a new session, returning any previous session.
    pub fn insert(&mut self, channel: ChannelId, session: AgentSession) -> Option<AgentSession> {
        self.sessions.insert(channel, session)
    }

    /// Remove a session.
    pub fn remove(&mut self, channel: &ChannelId) -> Option<AgentSession> {
        self.sessions.remove(channel)
    }

    /// Check if a session exists for a channel.
    pub fn contains(&self, channel: &ChannelId) -> bool {
        self.sessions.contains_key(channel)
    }

    /// Evict stale sessions (idle > timeout, not busy).
    /// Returns the number of sessions evicted.
    /// Spawns background tasks to shut down MCP child processes for evicted sessions.
    pub fn evict_stale(&mut self) -> usize {
        let timeout = self.timeout_secs;
        let mut evicted_sessions: Vec<AgentSession> = Vec::new();
        self.sessions.retain(|_, session| {
            if !session.busy && session.idle_secs() >= timeout {
                // Clone context data needed for MCP shutdown before removing.
                // We cannot take ownership directly from DashMap retain, so we
                // remove the entry and collect it separately.
                evicted_sessions.push(std::mem::replace(
                    session,
                    AgentSession::new(
                        String::new(),
                        session.context.clone(),
                        session.streaming_level,
                        session.workspace_scope,
                    ),
                ));
                false
            } else {
                true
            }
        });
        let evicted = evicted_sessions.len();
        if evicted > 0 {
            tracing::debug!(
                evicted,
                remaining = self.len(),
                "Session pool: evicted stale sessions"
            );
            for session in evicted_sessions {
                if let Some(mcp) = session.mcp {
                    tokio::spawn(async move { mcp.shutdown_all().await });
                }
            }
        }
        evicted
    }

    /// Channels that have a pending plan waiting for approval (not busy).
    pub fn pending_sessions(&self) -> Vec<ChannelId> {
        self.sessions
            .iter()
            .filter(|(_, s)| s.pending_plan.is_some() && !s.busy)
            .map(|(ch, _)| ch.clone())
            .collect()
    }

    /// Number of active sessions.
    pub fn len(&self) -> usize {
        self.sessions.len()
    }

    pub fn is_empty(&self) -> bool {
        self.sessions.is_empty()
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::context::ConversationContext;

    fn make_session(project: &str) -> AgentSession {
        let ctx = ConversationContext::with_budget("test".to_string(), 1000, 0.8);
        AgentSession::new(
            project.to_string(),
            ctx,
            StreamingLevel::Compact,
            WorkspaceScope::Project,
        )
    }

    #[test]
    fn crud() {
        let mut pool = SessionPool::new(300);
        let ch = ChannelId::new("telegram", "123");

        assert!(pool.get(&ch).is_none());
        pool.insert(ch.clone(), make_session("/home/user/app"));
        assert!(pool.contains(&ch));
        assert_eq!(pool.len(), 1);

        let session = pool.get(&ch).unwrap();
        assert_eq!(session.project_dir, "/home/user/app");

        pool.remove(&ch);
        assert!(pool.is_empty());
    }

    #[test]
    fn evict_stale() {
        let mut pool = SessionPool::new(0); // 0 second timeout = everything stale
        let ch = ChannelId::new("slack", "C01");
        pool.insert(ch, make_session("/tmp"));

        let evicted = pool.evict_stale();
        assert_eq!(evicted, 1);
        assert!(pool.is_empty());
    }

    #[test]
    fn pending_sessions_filters_correctly() {
        let mut pool = SessionPool::new(300);
        let ch1 = ChannelId::new("telegram", "111");
        let ch2 = ChannelId::new("slack", "C02");
        let ch3 = ChannelId::new("discord", "999");

        let mut s1 = make_session("/a");
        s1.pending_plan = Some(PendingPlan {
            plan: "do stuff".to_string(),
            arch_context: crate::agent::context::ConversationContext::with_budget(
                "test".to_string(),
                1000,
                0.8,
            ),
            user_msg: "msg".to_string(),
            system_prompt: "sp".to_string(),
        });

        let mut s2 = make_session("/b");
        s2.pending_plan = Some(PendingPlan {
            plan: "do more".to_string(),
            arch_context: crate::agent::context::ConversationContext::with_budget(
                "test".to_string(),
                1000,
                0.8,
            ),
            user_msg: "msg".to_string(),
            system_prompt: "sp".to_string(),
        });
        s2.busy = true; // busy — should be excluded

        pool.insert(ch1.clone(), s1);
        pool.insert(ch2, s2);
        pool.insert(ch3, make_session("/c")); // no pending plan

        let pending = pool.pending_sessions();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0], ch1);
    }

    #[test]
    fn should_notify_cooldown() {
        let mut s = make_session("/x");
        // First call: no previous notification → should send
        assert!(s.should_notify(300));
        // Immediate second call: within cooldown → should NOT send
        assert!(!s.should_notify(300));
        // Zero-second cooldown → always sends
        assert!(s.should_notify(0));
    }
}