codelens-mcp 1.9.38

Harness-native Rust MCP server for code intelligence — 107 tools, 25 languages, tree-sitter + hybrid semantic search, 6.1x fewer tokens than rg+cat on agent tasks
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

use serde_json::Value;

use crate::AppState;
use crate::error::CodeLensError;

/// Upper bound on the number of requests that may sit in the pending queue.
/// `AnalysisWorkerQueue::enqueue` rejects new requests once the pending length
/// reaches this value.
pub(crate) const MAX_PENDING_ANALYSIS_REQUESTS: usize = 32;
// NOTE(review): the two worker counts below are not referenced in this part of
// the file; presumably they feed `AppState::analysis_worker_limit()` per
// transport mode — confirm against the caller.
/// Worker count associated with the stdio transport (usage not visible here).
pub(crate) const STDIO_ANALYSIS_WORKER_COUNT: usize = 1;
/// Worker count associated with the HTTP transport (usage not visible here).
pub(crate) const HTTP_ANALYSIS_WORKER_COUNT: usize = 2;

/// Returns the scheduling cost, in abstract "cost units", for an analysis job
/// of the given `kind`.
///
/// Known kinds carry a fixed cost; any kind not listed in the table falls back
/// to the default cost of 2.
pub(crate) fn analysis_job_cost_units(kind: &str) -> usize {
    // Cost assigned to kinds that have no explicit entry in the table.
    const FALLBACK_COST: usize = 2;
    // Explicit per-kind costs, looked up by exact string match.
    const KNOWN_COSTS: [(&str, usize); 3] = [
        ("impact_report", 1),
        ("refactor_safety_report", 2),
        ("dead_code_report", 3),
    ];

    KNOWN_COSTS
        .iter()
        .find(|(name, _)| *name == kind)
        .map_or(FALLBACK_COST, |&(_, cost)| cost)
}

/// A single unit of work submitted to the [`AnalysisWorkerQueue`].
pub(crate) struct AnalysisJobRequest {
    /// Identifier of the job; workers use it to look the job up (e.g. to
    /// detect cancellation) and pass it to the job runner.
    pub(crate) job_id: String,
    /// Job kind string, forwarded to the job runner.
    pub(crate) kind: String,
    /// JSON arguments for the job; also used by workers to resolve the
    /// project scope before running.
    pub(crate) arguments: Value,
    /// Optional profile name; workers pass it to
    /// `analysis_parallelism_for_profile` to decide how many jobs may run
    /// concurrently for this request.
    pub(crate) profile_hint: Option<String>,
    /// Cost of the job in cost units; used for queue ordering in `enqueue`
    /// and for budget accounting in the workers.
    pub(crate) cost_units: usize,
}

/// Mutable queue bookkeeping shared between `enqueue` callers and workers.
/// Always accessed under the mutex held in `AnalysisWorkerQueue::inner`.
struct AnalysisQueueState {
    /// Requests waiting to be picked up by a worker.
    pending: VecDeque<AnalysisJobRequest>,
    /// Running total of `cost_units` for pending requests: increased on
    /// enqueue, decreased when a worker dequeues a request.
    pending_cost_units: usize,
    /// Number of requests that have been dequeued and not yet released
    /// (finished or skipped as cancelled).
    active_jobs: usize,
    /// Running total of `cost_units` for the currently active requests.
    active_cost_units: usize,
}

/// Handle to the analysis job queue and its background worker threads.
pub(crate) struct AnalysisWorkerQueue {
    /// Queue state guarded by a mutex, paired with the condition variable
    /// that workers wait on and that `enqueue`/job completion notify.
    inner: Arc<(Mutex<AnalysisQueueState>, Condvar)>,
}

impl AnalysisWorkerQueue {
    /// Creates an empty queue and spawns the worker threads that drain it.
    ///
    /// The number of workers comes from `state.analysis_worker_limit()`; the
    /// pool configuration is reported to metrics before any thread starts.
    /// Each worker owns its own `state.clone_for_worker()` and a clone of the
    /// shared queue state.
    ///
    /// The spawned threads are detached (their join handles are dropped) and
    /// their outer loop has no exit path in this function.
    pub(crate) fn new(state: &AppState) -> Self {
        // Shared state starts with nothing pending and nothing active.
        let inner = Arc::new((
            Mutex::new(AnalysisQueueState {
                pending: VecDeque::new(),
                pending_cost_units: 0,
                active_jobs: 0,
                active_cost_units: 0,
            }),
            Condvar::new(),
        ));
        let worker_limit = state.analysis_worker_limit();
        state.metrics().record_analysis_worker_pool(
            worker_limit,
            state.analysis_cost_budget(),
            state.transport_mode().as_str(),
        );
        for _ in 0..worker_limit {
            let inner_clone = Arc::clone(&inner);
            let worker_state = state.clone_for_worker();
            std::thread::spawn(move || {
                loop {
                    // Phase 1: under the lock, wait until some pending request
                    // is eligible to run and claim it.
                    let request = {
                        let (lock, condvar) = &*inner_clone;
                        // A poisoned mutex is recovered rather than propagated,
                        // so one panicking thread does not wedge the queue lock.
                        let mut guard = lock.lock().unwrap_or_else(|p| p.into_inner());
                        loop {
                            if guard.pending.is_empty() {
                                guard = condvar.wait(guard).unwrap_or_else(|p| p.into_inner());
                                continue;
                            }
                            let cost_budget = worker_state.analysis_cost_budget();
                            // First pending request (in queue order) that fits
                            // both the per-profile parallelism limit and the
                            // remaining cost budget.
                            // NOTE(review): a request whose `cost_units` is
                            // larger than `cost_budget` can never satisfy the
                            // second condition, even when nothing is active;
                            // whether that can happen depends on
                            // `analysis_cost_budget()`, which is not visible
                            // here — confirm.
                            let next_index = guard.pending.iter().position(|request| {
                                let allowed_parallelism = worker_state
                                    .analysis_parallelism_for_profile(
                                        request.profile_hint.as_deref(),
                                    );
                                guard.active_jobs < allowed_parallelism
                                    && guard.active_cost_units + request.cost_units <= cost_budget
                            });
                            if let Some(index) = next_index {
                                let request = guard.pending.remove(index);
                                guard.active_jobs += 1;
                                // Move the request's cost from the pending
                                // total to the active total.
                                if let Some(request) = request.as_ref() {
                                    guard.pending_cost_units =
                                        guard.pending_cost_units.saturating_sub(request.cost_units);
                                    guard.active_cost_units += request.cost_units;
                                }
                                // Snapshot for metrics: queue length after the
                                // removal, and pending + active cost units.
                                let remaining_depth = guard.pending.len();
                                let remaining_cost_units =
                                    guard.pending_cost_units + guard.active_cost_units;
                                break request.map(|request| {
                                    (request, remaining_depth, remaining_cost_units)
                                });
                            }
                            // Something is pending but nothing is eligible yet;
                            // wait for an enqueue or a capacity release.
                            guard = condvar.wait(guard).unwrap_or_else(|p| p.into_inner());
                        }
                    };
                    // Phase 2: lock released; run (or skip) the claimed request.
                    if let Some((request, remaining_depth, remaining_cost_units)) = request {
                        let scope = worker_state.project_scope_for_arguments(&request.arguments);
                        // Jobs already marked Cancelled are not run: release
                        // the capacity claimed above and wake other workers.
                        if worker_state
                            .get_analysis_job_for_scope(&scope, &request.job_id)
                            .as_ref()
                            .map(|job| job.status)
                            == Some(crate::runtime_types::JobLifecycle::Cancelled)
                        {
                            let (lock, condvar) = &*inner_clone;
                            let mut guard = lock.lock().unwrap_or_else(|p| p.into_inner());
                            guard.active_jobs = guard.active_jobs.saturating_sub(1);
                            guard.active_cost_units =
                                guard.active_cost_units.saturating_sub(request.cost_units);
                            condvar.notify_all();
                            continue;
                        }
                        // Saved because `request`'s fields are moved into the
                        // runner call below.
                        let request_cost = request.cost_units;
                        worker_state
                            .metrics()
                            .record_analysis_job_started(remaining_depth, remaining_cost_units);
                        // NOTE(review): if this call panics, the release block
                        // below is not reached and `active_jobs` /
                        // `active_cost_units` stay elevated (and this worker
                        // thread ends) — unless the runner catches panics
                        // internally; confirm.
                        let final_status = crate::tools::report_jobs::run_analysis_job_from_queue(
                            &worker_state,
                            request.job_id,
                            request.kind,
                            request.arguments,
                        );
                        // Release capacity, wake waiters, and capture the
                        // post-completion snapshot for metrics.
                        let (remaining_depth, remaining_cost_units) = {
                            let (lock, condvar) = &*inner_clone;
                            let mut guard = lock.lock().unwrap_or_else(|p| p.into_inner());
                            guard.active_jobs = guard.active_jobs.saturating_sub(1);
                            guard.active_cost_units =
                                guard.active_cost_units.saturating_sub(request_cost);
                            let remaining_depth = guard.pending.len();
                            let remaining_cost_units =
                                guard.pending_cost_units + guard.active_cost_units;
                            condvar.notify_all();
                            (remaining_depth, remaining_cost_units)
                        };
                        worker_state.metrics().record_analysis_job_finished(
                            final_status,
                            remaining_depth,
                            remaining_cost_units,
                        );
                    }
                }
            });
        }
        Self { inner }
    }

    /// Adds `request` to the pending queue and wakes the workers.
    ///
    /// The request is placed in front of the first pending entry whose
    /// `cost_units` is strictly greater than its own; if there is no such
    /// entry it goes to the back.
    ///
    /// # Returns
    ///
    /// `(depth, weighted_depth, priority_promoted)` where `depth` is the
    /// pending count after insertion, `weighted_depth` is pending plus active
    /// cost units, and `priority_promoted` tells whether the request was
    /// placed ahead of at least one already-pending entry.
    ///
    /// # Errors
    ///
    /// Returns `CodeLensError::Validation` when the pending queue already
    /// holds `MAX_PENDING_ANALYSIS_REQUESTS` entries.
    pub(crate) fn enqueue(
        &self,
        request: AnalysisJobRequest,
    ) -> Result<(usize, usize, bool), CodeLensError> {
        let (state_lock, wakeup) = &*self.inner;
        // Recover the state even if another thread panicked while holding it.
        let mut queue = state_lock
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let pending_before = queue.pending.len();
        if pending_before >= MAX_PENDING_ANALYSIS_REQUESTS {
            return Err(CodeLensError::Validation(format!(
                "analysis queue is full (>{MAX_PENDING_ANALYSIS_REQUESTS} pending jobs)"
            )));
        }

        let request_cost = request.cost_units;
        // Index of the first strictly more expensive entry, if any. A hit is
        // always inside the queue, so "found" is the same as "promoted".
        let costlier_slot = queue
            .pending
            .iter()
            .position(|queued| queued.cost_units > request_cost);
        let priority_promoted = costlier_slot.is_some();
        let slot = costlier_slot.unwrap_or(pending_before);

        queue.pending.insert(slot, request);
        queue.pending_cost_units += request_cost;

        let depth = queue.pending.len();
        let weighted_depth = queue.pending_cost_units + queue.active_cost_units;
        wakeup.notify_all();
        Ok((depth, weighted_depth, priority_promoted))
    }
}