collet 0.1.1

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
//! ACP session management — bridges ACP protocol to collet's agent loop.

use std::collections::HashMap;

use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::agent::context::ConversationContext;
use crate::agent::r#loop::AgentEvent;
use crate::api::provider::OpenAiCompatibleProvider;
use crate::config::Config;
use crate::project_cache;

use super::protocol::{SessionNewParams, UpdatePayload};

/// A running ACP session — owns the cancellation token and event receiver.
pub struct AcpSession {
    pub session_id: String,
    pub working_dir: String,
    pub cancel: CancellationToken,
    pub event_rx: mpsc::UnboundedReceiver<AgentEvent>,
    /// Conversation context returned when the agent finishes (for session resume).
    pub context: Option<ConversationContext>,
}

/// Manages multiple concurrent ACP sessions.
pub struct SessionManager {
    sessions: HashMap<String, AcpSession>,
    config: Config,
    client: OpenAiCompatibleProvider,
    next_id: u64,
}

impl SessionManager {
    pub fn new(config: Config, client: OpenAiCompatibleProvider) -> Self {
        Self {
            sessions: HashMap::new(),
            config,
            client,
            next_id: 1,
        }
    }

    /// Create a new session and spawn the agent loop.
    pub async fn create_session(&mut self, params: SessionNewParams) -> anyhow::Result<String> {
        let session_id = format!("acp-{}", self.next_id);
        self.next_id += 1;

        let working_dir = params.cwd.clone();

        // Build/reuse cached repo map
        let snap = {
            let wd = working_dir.clone();
            tokio::task::spawn_blocking(move || project_cache::snapshot(&wd))
                .await
                .unwrap()
        };

        let mut system_prompt = crate::agent::prompt::build_default_prompt(
            &snap.map_string,
            snap.file_count,
            snap.symbol_count,
            None,
        );

        // Inject working directory context so the LLM knows which project it's in
        let project_name = std::path::Path::new(&working_dir)
            .file_name()
            .map(|n| n.to_string_lossy().to_string())
            .unwrap_or_default();
        system_prompt.push_str(&format!(
            "\n\n## Environment\n\n- Working directory: {working_dir}\n- Project: {project_name}\n"
        ));

        let context = ConversationContext::with_budget(
            system_prompt,
            self.config.context_max_tokens,
            self.config.compaction_threshold,
        );

        let (_event_tx, event_rx) = mpsc::unbounded_channel::<AgentEvent>();
        let cancel = CancellationToken::new();

        let session = AcpSession {
            session_id: session_id.clone(),
            working_dir: working_dir.clone(),
            cancel,
            event_rx,
            context: Some(context),
        };

        self.sessions.insert(session_id.clone(), session);

        Ok(session_id)
    }

    /// Send a prompt to an existing session, spawning the agent loop.
    pub fn send_prompt(
        &mut self,
        session_id: &str,
        text: String,
        _mode_override: Option<String>,
    ) -> anyhow::Result<()> {
        let session = self
            .sessions
            .get_mut(session_id)
            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;

        let context = session.context.take().ok_or_else(|| {
            anyhow::anyhow!("Session {} is already processing a prompt", session_id)
        })?;

        let (event_tx, event_rx) = mpsc::unbounded_channel::<AgentEvent>();
        session.event_rx = event_rx;

        let client = self.client.clone();
        let config = self.config.clone();
        let working_dir = session.working_dir.clone();
        let cancel = session.cancel.clone();
        let lsp_manager = crate::lsp::manager::LspManager::new(working_dir.clone());
        let approval_gate = if config.yolo {
            crate::agent::approval::ApprovalGate::yolo()
        } else {
            crate::agent::approval::ApprovalGate::headless(
                crate::agent::approval::SharedApproveMode::new(
                    crate::agent::approval::ApproveMode::Auto,
                ),
            )
        };

        tokio::spawn(async move {
            crate::agent::r#loop::run_with_mode(crate::agent::r#loop::AgentParams {
                client,
                config,
                context,
                user_msg: text,
                working_dir,
                event_tx,
                cancel,
                lsp_manager,
                trust_level: crate::trust::TrustLevel::Full,
                approval_gate,
                images: Vec::new(),
            })
            .await;
        });

        Ok(())
    }

    /// Cancel a running session.
    pub fn cancel_session(&self, session_id: &str) -> bool {
        if let Some(session) = self.sessions.get(session_id) {
            session.cancel.cancel();
            true
        } else {
            false
        }
    }

    /// Get mutable reference to a session.
    pub fn get_session_mut(&mut self, session_id: &str) -> Option<&mut AcpSession> {
        self.sessions.get_mut(session_id)
    }

    /// Store returned context back into the session (for resume).
    pub fn return_context(&mut self, session_id: &str, context: ConversationContext) {
        if let Some(session) = self.sessions.get_mut(session_id) {
            session.context = Some(context);
        }
    }

    /// Remove a session (called by the `session/close` handler).
    pub fn remove_session(&mut self, session_id: &str) -> Option<AcpSession> {
        self.sessions.remove(session_id)
    }
}

/// Convert an `AgentEvent` into an `UpdatePayload` for ACP streaming.
pub fn agent_event_to_update(event: &AgentEvent) -> Option<UpdatePayload> {
    match event {
        AgentEvent::Token(text) => Some(UpdatePayload::Token { text: text.clone() }),
        AgentEvent::Response(text) => Some(UpdatePayload::Response { text: text.clone() }),
        AgentEvent::ToolCall { name, args, .. } => Some(UpdatePayload::ToolCall {
            name: name.clone(),
            args: args.clone(),
        }),
        AgentEvent::ToolResult {
            name,
            result,
            success,
            ..
        } => Some(UpdatePayload::ToolResult {
            name: name.clone(),
            result: result.clone(),
            success: *success,
        }),
        AgentEvent::FileModified { path } => {
            Some(UpdatePayload::FileModified { path: path.clone() })
        }
        AgentEvent::Status {
            iteration,
            elapsed_secs,
            ..
        } => Some(UpdatePayload::Status {
            iteration: *iteration,
            elapsed_secs: *elapsed_secs,
        }),
        AgentEvent::PhaseChange { label } => Some(UpdatePayload::Phase {
            label: label.clone(),
        }),
        AgentEvent::PlanReady { plan, .. } => Some(UpdatePayload::Plan { text: plan.clone() }),
        AgentEvent::Error(msg) => Some(UpdatePayload::Error {
            message: msg.clone(),
        }),
        AgentEvent::GuardStop(msg) => Some(UpdatePayload::Error {
            message: msg.clone(),
        }),
        AgentEvent::Done { .. } => Some(UpdatePayload::Done),
        // Swarm events, LSP installs, performance, MCP pids — skip for now
        _ => None,
    }
}