collet 0.1.0

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
//! LSP manager — maintains a pool of active language server clients.
//!
//! Spawns LSP servers on-demand per language, sends notifications for file
//! changes, and queries diagnostics to provide feedback to the agent.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

use super::client::{LspClient, extension_to_language_id, find_server_for_language, path_to_uri};
use super::protocol::Diagnostic;

/// Shared LSP manager that maintains active clients per language.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` produces handles
/// that all share the same client pool, caches, and spawn gates.
#[derive(Clone)]
pub struct LspManager {
    /// Core mutable state (clients, opened files, failed languages) guarded by
    /// an async mutex.
    inner: Arc<Mutex<LspManagerInner>>,
    /// Spawned server command names, readable without async.
    ///
    /// Kept behind a `std::sync::Mutex` so synchronous callers can read it.
    spawned_names: Arc<std::sync::Mutex<Vec<String>>>,
    /// Cached child PIDs for sync access (updated on spawn).
    cached_pids: Arc<std::sync::Mutex<Vec<u32>>>,
    /// Per-language spawn gate, keyed by language ID.
    ///
    /// Each entry is a `Semaphore(1)`, so at most one spawn runs at a time per
    /// language. This closes the TOCTOU gap between the "is anyone spawning?"
    /// check and the actual spawn. Tasks that find the permit taken wait on
    /// the semaphore until the in-flight spawn completes, then re-check
    /// `inner.clients` — no polling loop needed.
    spawn_locks: Arc<dashmap::DashMap<String, Arc<tokio::sync::Semaphore>>>,
}

/// State guarded by the async mutex inside `LspManager`.
struct LspManagerInner {
    /// Active LSP clients by language ID.
    clients: HashMap<String, LspClient>,
    /// Project root directory, handed to each server when it is spawned.
    root_dir: String,
    /// Files that have been opened (to avoid duplicate didOpen), mapped as
    /// path → language ID. Presence of a path selects `didChange` over
    /// `didOpen` on later notifications.
    opened_files: HashMap<String, String>,
    /// Languages whose LSP server failed to spawn (don't retry).
    failed_languages: std::collections::HashSet<String>,
}

impl LspManager {
    /// Build a manager for the project rooted at `root_dir`.
    ///
    /// No language server is started here; the client map, the opened-file
    /// map, the failed-language set, and all caches start out empty.
    pub fn new(root_dir: String) -> Self {
        let state = LspManagerInner {
            root_dir,
            clients: HashMap::new(),
            opened_files: HashMap::new(),
            failed_languages: std::collections::HashSet::new(),
        };

        let inner = Arc::new(Mutex::new(state));
        let spawned_names = Arc::new(std::sync::Mutex::new(Vec::new()));
        let cached_pids = Arc::new(std::sync::Mutex::new(Vec::new()));
        let spawn_locks = Arc::new(dashmap::DashMap::new());

        Self {
            inner,
            spawned_names,
            cached_pids,
            spawn_locks,
        }
    }

    /// Notify LSP of a file being opened or changed.
    ///
    /// Sends `didOpen` the first time `file_path` is seen and `didChange` on
    /// subsequent calls. Files whose extension maps to no language ID, or
    /// whose language previously failed to spawn, are skipped with `Ok(())`.
    ///
    /// # Locking
    ///
    /// * The extension → language lookup runs before any lock is taken.
    /// * If a client for the language already exists, the spawn gate is
    ///   bypassed entirely and the call goes straight to the notification.
    /// * Otherwise the per-language `Semaphore(1)` in `spawn_locks` serialises
    ///   spawn attempts. The server is started **outside the async mutex** so
    ///   a slow spawn for one language does not stall notifications for
    ///   others. Waiters re-check `inner.clients` once they hold the permit,
    ///   so there is no check-then-act race and no polling loop.
    /// * The async mutex is taken again for the notification and stays held
    ///   while the message is sent to the server.
    ///
    /// # Errors
    ///
    /// Propagates the error from `send_did_open` / `send_did_change`. A failed
    /// spawn is *not* an error: it is logged, the language is recorded in
    /// `failed_languages` so it is never retried, and `Ok(())` is returned.
    pub async fn notify_file_change(&self, file_path: &str, content: &str) -> anyhow::Result<()> {
        // Resolve the language ID without holding any lock.
        let ext = std::path::Path::new(file_path)
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("");
        let Some(id) = extension_to_language_id(ext) else {
            tracing::debug!("No language ID for extension {ext}, skipping LSP");
            return Ok(());
        };
        let language_id = id.to_string();

        // ── Phase 1: fast path check (brief lock) ────────────────────────────
        let client_ready = {
            let inner = self.inner.lock().await;
            if inner.failed_languages.contains(&language_id) {
                return Ok(());
            }
            inner.clients.contains_key(&language_id)
        };

        // ── Phase 2: ensure the client exists, spawning if necessary ─────────
        if !client_ready {
            // The entry guard is a temporary and is released at the end of
            // this statement, so no DashMap shard lock is held across awaits.
            let sem = self
                .spawn_locks
                .entry(language_id.clone())
                .or_insert_with(|| Arc::new(tokio::sync::Semaphore::new(1)))
                .clone();

            // `acquire` only errs on a closed semaphore; nothing in this block
            // closes it, and the re-check below keeps the logic safe even if
            // no permit were obtained.
            let _permit = sem.acquire().await;

            // Re-check under the permit: another task may have just spawned
            // (or failed to spawn) this language. One lock acquisition yields
            // both the decision and the root directory needed for the spawn.
            let spawn_root = {
                let inner = self.inner.lock().await;
                if inner.failed_languages.contains(&language_id) {
                    return Ok(());
                }
                (!inner.clients.contains_key(&language_id)).then(|| inner.root_dir.clone())
            };

            if let Some(root_dir) = spawn_root {
                match Self::spawn_client(&language_id, &root_dir).await {
                    Ok(client) => {
                        // Bookkeeping happens under the async mutex so the
                        // client map and the sync caches change together.
                        let mut inner = self.inner.lock().await;
                        if let Some(pid) = client.pid() {
                            // Recover from poisoning exactly like
                            // `spawned_names` does instead of dropping the PID.
                            self.cached_pids
                                .lock()
                                .unwrap_or_else(|e| e.into_inner())
                                .push(pid);
                        }
                        inner.clients.insert(language_id.clone(), client);
                        if let Some(cfg) = find_server_for_language(&language_id) {
                            let mut names = self
                                .spawned_names
                                .lock()
                                .unwrap_or_else(|e| e.into_inner());
                            if !names.contains(&cfg.command) {
                                names.push(cfg.command);
                            }
                        }
                        tracing::info!("Spawned LSP client for {language_id}");
                    }
                    Err(e) => {
                        tracing::warn!(
                            "Failed to spawn LSP for {language_id}: {e} (will not retry)"
                        );
                        self.inner
                            .lock()
                            .await
                            .failed_languages
                            .insert(language_id);
                        return Ok(());
                    }
                }
            }
            // _permit dropped here → other waiters unblocked
        }

        // ── Phase 3: send notification (lock held only for I/O) ──────────────
        let mut inner = self.inner.lock().await;
        let is_first_open = !inner.opened_files.contains_key(file_path);

        // The client can be missing if it was removed between phases (for
        // example by `shutdown_all`); skip silently in that case.
        let Some(client) = inner.clients.get_mut(&language_id) else {
            return Ok(());
        };

        let uri = path_to_uri(file_path);
        if is_first_open {
            client.send_did_open(&uri, &language_id, content).await?;
            // Only mark the file as opened once `didOpen` actually went out.
            inner
                .opened_files
                .insert(file_path.to_string(), language_id);
            tracing::debug!("Sent didOpen for {file_path}");
        } else {
            client.send_did_change(&uri, content).await?;
            tracing::debug!("Sent didChange for {file_path}");
        }

        Ok(())
    }

    /// Notify LSP that a file was saved.
    pub async fn notify_file_save(&self, file_path: &str) -> anyhow::Result<()> {
        let inner = self.inner.lock().await;

        if let Some(language_id) = inner.opened_files.get(file_path)
            && inner.clients.contains_key(language_id)
        {
            drop(inner);
            return self.send_did_save_internal(file_path).await;
        }

        Ok(())
    }

    /// Send `didSave` for `file_path` if the file is tracked in `opened_files`
    /// and its language still has a client; otherwise do nothing.
    ///
    /// The async mutex is held for the whole call, including the send.
    async fn send_did_save_internal(&self, file_path: &str) -> anyhow::Result<()> {
        let mut state = self.inner.lock().await;

        // Clone the language ID so the immutable borrow of `opened_files`
        // ends before `clients` is borrowed mutably.
        let Some(lang) = state.opened_files.get(file_path).cloned() else {
            return Ok(());
        };
        let Some(server) = state.clients.get_mut(&lang) else {
            return Ok(());
        };

        let document_uri = path_to_uri(file_path);
        server.send_did_save(&document_uri).await?;
        tracing::debug!("Sent didSave for {file_path}");
        Ok(())
    }

    /// Return the current diagnostics version counter for `file_path`.
    ///
    /// The counter is incremented by the background reader each time the LSP
    /// server pushes a `publishDiagnostics` notification.  Read this **before**
    /// sending a file notification, then poll until the value advances to know
    /// when the server has analysed the latest change.
    ///
    /// Returns `0` if the file has not been opened yet or if no LSP client is
    /// active for its language.
    pub async fn diag_version(&self, file_path: &str) -> u64 {
        let inner = self.inner.lock().await;
        if let Some(language_id) = inner.opened_files.get(file_path)
            && let Some(client) = inner.clients.get(language_id)
        {
            client.diag_version()
        } else {
            0
        }
    }

    /// Query diagnostics for a file.
    ///
    /// Reads the client's diagnostics cache and returns a copy of the entries
    /// stored under the file's URI. Nothing is requested from the server here,
    /// so the result reflects whatever is currently in that cache.
    ///
    /// Returns an empty list when the file has not been opened, when its
    /// language has no client, or when the cache holds nothing for the URI.
    /// A poisoned cache mutex is recovered rather than treated as an error.
    pub async fn get_diagnostics(&self, file_path: &str) -> anyhow::Result<Vec<Diagnostic>> {
        let state = self.inner.lock().await;

        let tracked_client = state
            .opened_files
            .get(file_path)
            .and_then(|lang| state.clients.get(lang));
        let Some(server) = tracked_client else {
            return Ok(vec![]);
        };

        let uri = path_to_uri(file_path);
        let cache = server.diagnostics_cache();
        let published = cache.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        Ok(published.get(&uri).cloned().unwrap_or_default())
    }

    /// Return a snapshot of the spawned server command names.
    ///
    /// Synchronous: only the `std::sync::Mutex` around the name list is
    /// locked. A poisoned mutex is recovered and its contents returned as-is.
    pub fn get_spawned_names(&self) -> Vec<String> {
        self.spawned_names
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone()
    }

    /// Collect the PIDs of all active LSP server child processes.
    ///
    /// Reads the live client map under the async mutex; clients whose `pid()`
    /// is `None` are skipped.
    pub async fn child_pids(&self) -> Vec<u32> {
        let state = self.inner.lock().await;
        let mut pids = Vec::new();
        for server in state.clients.values() {
            if let Some(pid) = server.pid() {
                pids.push(pid);
            }
        }
        pids
    }

    /// Return a snapshot of the cached child PIDs without awaiting.
    ///
    /// Only the `std::sync::Mutex` around the cache is locked, so this is
    /// callable from synchronous code. A poisoned mutex is recovered and its
    /// contents returned as-is.
    pub fn cached_child_pids(&self) -> Vec<u32> {
        let guard = self
            .cached_pids
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        guard.to_vec()
    }

    /// Query document symbols for a file path (absolute) and convert them into
    /// the `repo_map::parser::Symbol` format for repo map integration.
    ///
    /// Returns `None` if no LSP client is available for the file's language.
    pub async fn symbols_for_file(
        &self,
        file_path: &str,
    ) -> Option<Vec<crate::repo_map::parser::Symbol>> {
        let uri = super::client::path_to_uri(file_path);
        let round_trip = super::client::uri_to_path(&uri);
        tracing::trace!(uri = %uri, round_trip = %round_trip, "LSP: resolving symbols");

        let language_id = {
            let path = std::path::Path::new(file_path);
            let ext = path.extension()?.to_str()?;
            super::client::extension_to_language_id(ext)?.to_string()
        };

        let mut inner = self.inner.lock().await;
        let client = inner.clients.get_mut(&language_id)?;
        let symbols = client.document_symbols(&uri).await.ok()?;
        Some(super::client::to_repo_symbols(&symbols, file_path))
    }

    /// Shutdown all active LSP clients.
    ///
    /// Drains the client map, asks each client to shut down, and forgets all
    /// opened files. The synchronous caches (`cached_pids`, `spawned_names`)
    /// are emptied as well, so `cached_child_pids` and `get_spawned_names`
    /// stop reporting servers that no longer exist and stale PIDs do not pile
    /// up if servers are spawned again later.
    ///
    /// `failed_languages` is deliberately left intact: languages that failed
    /// to spawn are still not retried.
    ///
    /// # Errors
    ///
    /// Always returns `Ok(())`; a failure to shut down an individual client is
    /// logged as a warning and the remaining clients are still processed.
    pub async fn shutdown_all(&self) -> anyhow::Result<()> {
        let mut inner = self.inner.lock().await;

        // Clear the sync caches up front, while the async mutex is held and
        // before any await, so they are reset even if this future is dropped
        // part-way through the shutdown loop. The std guards are temporaries
        // and never live across an await point.
        self.cached_pids
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .clear();
        self.spawned_names
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .clear();

        for (lang, client) in inner.clients.drain() {
            if let Err(e) = client.shutdown().await {
                tracing::warn!("Error shutting down LSP for {lang}: {e}");
            }
        }
        inner.opened_files.clear();
        Ok(())
    }

    /// Spawn a new LSP client for the given language.
    async fn spawn_client(language_id: &str, root_dir: &str) -> anyhow::Result<LspClient> {
        let config = find_server_for_language(language_id)
            .ok_or_else(|| anyhow::anyhow!("No LSP server config for {language_id}"))?;

        let mut client = LspClient::start(&config, root_dir)?;
        client.initialize(root_dir).await?;
        Ok(client)
    }
}