gitcortex_mcp/mcp/
server.rs1use std::path::PathBuf;
2
3use anyhow::{Context, Result};
4use rmcp::{transport::io::stdio, ServiceExt};
5
6use crate::embeddings::{node_text, Embedder, SemanticIndex};
7use crate::mcp::tools::{GitCortexServer, SemanticState};
8use gitcortex_core::store::GraphStore;
9use gitcortex_store::branch;
10
11pub async fn serve(repo_root: PathBuf, compact: bool) -> Result<()> {
12 let handler = GitCortexServer::new_with_mode(&repo_root, compact)
13 .context("failed to open graph store")?;
14
15 let (sem_arc, store_arc, default_branch) = handler.semantic_context();
20 let repo_id = branch::repo_id(&repo_root);
21
22 tokio::task::spawn(async move {
23 let result = tokio::task::spawn_blocking(move || {
24 run_background_indexer(sem_arc, store_arc, &default_branch, &repo_id)
25 })
26 .await;
27
28 match result {
29 Ok(Ok(())) => tracing::info!("semantic indexer finished"),
30 Ok(Err(e)) => tracing::warn!("semantic indexer failed: {e}"),
31 Err(e) => tracing::warn!("semantic indexer panicked: {e}"),
32 }
33 });
34
35 let transport = stdio();
36 tracing::info!("GitCortex MCP server started (stdio, compact={compact})");
37
38 let service = handler.serve(transport).await.context("MCP server error")?;
39 service.waiting().await.context("MCP server stopped")?;
40
41 Ok(())
42}
43
44fn run_background_indexer(
45 sem_arc: std::sync::Arc<std::sync::Mutex<SemanticState>>,
46 store_arc: std::sync::Arc<std::sync::Mutex<gitcortex_store::kuzu::KuzuGraphStore>>,
47 branch: &str,
48 repo_id: &str,
49) -> anyhow::Result<()> {
50 let embedder = match Embedder::new() {
52 Ok(e) => e,
53 Err(e) => {
54 tracing::warn!("semantic search disabled: {e}");
55 if let Ok(mut s) = sem_arc.lock() {
56 *s = SemanticState::Disabled;
57 }
58 return Ok(());
59 }
60 };
61
62 let index_path =
64 branch::data_dir(repo_id).join(format!("embeddings_{}.bin", branch::sanitize(branch)));
65 let mut index = SemanticIndex::load_or_create(&index_path);
66
67 let nodes = {
69 let store = store_arc
70 .lock()
71 .map_err(|_| anyhow::anyhow!("store mutex poisoned"))?;
72 store.list_all_nodes(branch).unwrap_or_default()
73 };
74
75 let live_ids: std::collections::HashSet<String> = nodes.iter().map(|n| n.id.as_str()).collect();
77 let pruned = index.retain_ids(&live_ids);
78 if pruned > 0 {
79 tracing::info!("semantic index: pruned {pruned} stale vectors");
80 }
81
82 let missing: Vec<_> = nodes
83 .iter()
84 .filter(|n| !index.has(&n.id.as_str()))
85 .collect();
86
87 if !missing.is_empty() {
88 tracing::info!(
89 "semantic indexer: embedding {} new nodes on branch '{branch}'",
90 missing.len()
91 );
92 const BATCH: usize = 32;
93 for chunk in missing.chunks(BATCH) {
94 let texts: Vec<String> = chunk.iter().map(|n| node_text(n)).collect();
95 let ids: Vec<String> = chunk.iter().map(|n| n.id.as_str()).collect();
96 match embedder.embed_batch(texts) {
97 Ok(vecs) => {
98 for (id, vec) in ids.into_iter().zip(vecs) {
99 index.insert(id, vec);
100 }
101 }
102 Err(e) => tracing::warn!("embedding batch failed: {e}"),
103 }
104 }
105 index.save();
106 tracing::info!("semantic index: {} vectors", index.len());
107 } else if pruned > 0 {
108 index.save();
109 } else {
110 tracing::info!("semantic index up-to-date: {} vectors", index.len());
111 }
112
113 if let Ok(mut s) = sem_arc.lock() {
115 *s = SemanticState::Ready {
116 embedder: Box::new(embedder),
117 index: Box::new(index),
118 };
119 }
120
121 Ok(())
122}