1mod capability_generator; pub mod codegen;
3pub mod config;
4pub mod config_watcher;
5mod decision;
6mod embedding;
7mod index;
8pub mod js_orchestrator; pub mod models;
10mod pool;
11pub mod registry; pub use embedding::{EmbeddingBackend, MockEmbeddingBackend};
14pub use index::{MemRoutingIndex, MethodEmbedding, ToolEmbedding};
15pub use pool::McpConnectionPool;
16
17pub use decision::{CandidateToolInfo, DecisionEngine, DecisionInput, DecisionOutcome, LlmClient};
18
19use self::{
20 config::McpConfigManager,
21 index::{ScoredMethod, ScoredTool},
22 models::{
23 ExecuteToolRequest, ExecuteToolResponse, IntelligentRouteRequest, IntelligentRouteResponse,
24 MethodSchemaResponse, RouteExecutionResult, SelectedRoute, ToolVectorRecord,
25 },
26 pool::DiscoveredTool,
27};
28use anyhow::{anyhow, Result};
29use fastembed::{EmbeddingModel, InitOptions, TextEmbedding};
30use memvdb::normalize;
31use parking_lot::Mutex;
32use rmcp::model::Tool;
33use serde_json::{json, Value};
34use std::{collections::HashMap, sync::Arc, time::Instant};
35use tokio::sync::RwLock;
36
/// Prefix for method-level vector-record ids, so they never collide with the
/// tool-level ids that use the bare `server::tool` form (see `build_embeddings`).
const METHOD_VECTOR_PREFIX: &str = "method";
38
/// Routes natural-language requests to tools discovered from connected MCP servers.
///
/// Combines a local fastembed model (vector candidate search over an in-memory
/// index), an LLM-backed `DecisionEngine` (tool + argument selection), and an
/// optional JS workflow orchestrator for dynamically composed tools.
pub struct IntelligentRouter {
    // fastembed model used to embed user requests and tool documents.
    embedder: Arc<Mutex<TextEmbedding>>,
    // In-memory vector index over tool- and method-level embeddings.
    index: Mutex<MemRoutingIndex>,
    // LLM client that picks a tool and its arguments from vector candidates.
    decision_engine: Arc<DecisionEngine>,
    // Pooled connections to the configured MCP servers.
    connection_pool: Arc<McpConnectionPool>,
    // "server::tool" -> tool definition for every discovered tool.
    tool_registry: RwLock<HashMap<String, Tool>>,
    // Registry of dynamically created/proxied tools; None when not set up.
    dynamic_registry: Option<Arc<registry::DynamicToolRegistry>>,
    // LLM-driven workflow orchestrator; None when no external LLM is configured.
    js_orchestrator: Option<Arc<js_orchestrator::WorkflowOrchestrator>>,
}
48
impl IntelligentRouter {
    /// Builds a fully wired router: loads MCP config, initializes the embedding
    /// model, warms up server connections, discovers tools, embeds them into the
    /// vector index, and (when an external LLM is available) enables the JS
    /// orchestrator.
    ///
    /// # Errors
    /// Fails if config loading, fastembed initialization, connection warm-up,
    /// capability generation, schema construction, `DecisionEngine` creation, or
    /// index building fails.
    pub async fn initialize() -> Result<Self> {
        let config_manager = McpConfigManager::load()?;
        let config_arc = Arc::new(config_manager.config().clone());

        // Local embedding model; downloads weights on first use (progress shown).
        let embedder = Arc::new(Mutex::new(
            TextEmbedding::try_new(
                InitOptions::new(EmbeddingModel::AllMiniLML6V2)
                    .with_show_download_progress(true)
            )
            .map_err(|e| anyhow!("Failed to initialize fastembed: {}", e))?
        ));

        // LLM endpoint/model default to a local Ollama setup when unset.
        let decision_endpoint = std::env::var("OPENAI_ENDPOINT")
            .unwrap_or_else(|_| "http://localhost:11434".to_string());
        let decision_model =
            std::env::var("OPENAI_MODEL").unwrap_or_else(|_| "qwen3:1.7b".to_string());

        // warm_up connects to the configured servers and lists their tools.
        let connection_pool = Arc::new(McpConnectionPool::new(config_arc.clone()));
        let discovered = connection_pool.warm_up().await?;

        let capability_generator = capability_generator::CapabilityGenerator::new();

        // Human-readable summary of discovered capabilities; becomes the
        // description of the exposed `intelligent_route` tool below.
        let capability_description = capability_generator
            .generate_capability_description(&discovered)?;

        eprintln!(
            "📝 Generated capability description: {}",
            capability_description
        );

        // The single base tool this router exposes to MCP clients.
        let base_tools = vec![Tool {
            name: "intelligent_route".into(),
            title: Some("Intelligent Tool Router".into()),
            description: Some(capability_description.into()),
            input_schema: Arc::new(serde_json::from_value(json!({
                "type": "object",
                "properties": {
                    "user_request": {
                        "type": "string",
                        "description": "Natural language description of what you want to accomplish"
                    },
                    "max_candidates": {
                        "type": "integer",
                        "description": "Maximum number of candidate tools to consider (default: 3)",
                        "minimum": 1,
                        "maximum": 10
                    }
                },
                "required": ["user_request"]
            }))?),
            output_schema: None,
            icons: None,
            annotations: None,
            execution: None,
            meta: None,
        }];

        // Dynamic tools expire after 24h; cleanup sweep runs hourly.
        let registry_config = registry::RegistryConfig {
            max_dynamic_tools: 5,
            default_ttl_seconds: 86400,
            cleanup_interval_seconds: 3600,
        };
        let dynamic_registry = Arc::new(registry::DynamicToolRegistry::with_config(
            base_tools,
            registry_config,
        ));
        // NOTE(review): the cleanup task handle is dropped here — presumably the
        // task keeps running detached; confirm start_cleanup_task's contract.
        let _cleanup_task = dynamic_registry.start_cleanup_task();

        // "External API" means either a token is set, or the endpoint points
        // somewhere other than the local Ollama default.
        let has_external_api = std::env::var("OPENAI_TOKEN").is_ok()
            || std::env::var("OPENAI_ENDPOINT")
                .ok()
                .map(|v| v != "http://localhost:11434")
                .unwrap_or(false);

        // Orchestration requires a working code generator; otherwise we keep
        // only the decision engine and fall back to vector-mode routing.
        let (decision_engine, js_orchestrator) = if has_external_api {
            match codegen::CodeGeneratorFactory::from_env(
                decision_endpoint.clone(),
                decision_model.clone(),
            ) {
                Ok(generator) => {
                    // 120 — presumably a timeout in seconds; confirm against
                    // DecisionEngine::new.
                    let decision_engine = Arc::new(DecisionEngine::new(
                        &decision_endpoint,
                        &decision_model,
                        120,
                    )?);
                    let orchestrator = Some(Arc::new(
                        js_orchestrator::WorkflowOrchestrator::with_planner(generator),
                    ));
                    (decision_engine, orchestrator)
                }
                Err(e) => {
                    eprintln!("⚠️ Code generator initialization failed: {}", e);
                    eprintln!("🔍 Falling back to vector-only mode");
                    let decision_engine = Arc::new(DecisionEngine::new(
                        &decision_endpoint,
                        &decision_model,
                        120,
                    )?);
                    (decision_engine, None)
                }
            }
        } else {
            eprintln!("🔍 No external LLM API detected (set OPENAI_TOKEN or OPENAI_ENDPOINT to enable orchestration)");
            let decision_engine = Arc::new(DecisionEngine::new(
                &decision_endpoint,
                &decision_model,
                120,
            )?);
            (decision_engine, None)
        };

        // 384 = output width of the AllMiniLML6V2 embedding model above.
        let mut index = MemRoutingIndex::new(384)?;
        let tool_registry = RwLock::new(HashMap::new());
        let embeddings = build_embeddings(&embedder, &discovered, config_arc.as_ref())?;
        index.rebuild(&embeddings.tools, &embeddings.methods)?;

        populate_registry(&tool_registry, discovered).await;

        Ok(Self {
            embedder,
            index: Mutex::new(index),
            decision_engine,
            connection_pool,
            tool_registry,
            dynamic_registry: Some(dynamic_registry),
            js_orchestrator,
        })
    }

    /// Assembles a router from pre-built components (e.g. for tests), skipping
    /// all of the discovery/embedding work done by [`Self::initialize`].
    pub fn new_with_components(
        embedder: Arc<Mutex<TextEmbedding>>,
        index: MemRoutingIndex,
        decision_engine: Arc<DecisionEngine>,
        connection_pool: Arc<McpConnectionPool>,
        tool_registry: RwLock<HashMap<String, Tool>>,
        dynamic_registry: Option<Arc<registry::DynamicToolRegistry>>,
        js_orchestrator: Option<Arc<js_orchestrator::WorkflowOrchestrator>>,
    ) -> Self {
        Self {
            embedder,
            index: Mutex::new(index),
            decision_engine,
            connection_pool,
            tool_registry,
            dynamic_registry,
            js_orchestrator,
        }
    }

    /// Returns a handle to the dynamic tool registry, if one was configured.
    pub fn dynamic_registry(&self) -> Option<Arc<registry::DynamicToolRegistry>> {
        self.dynamic_registry.clone()
    }

    /// Borrow of the discovered-tool registry ("server::tool" -> definition).
    pub fn tool_registry(&self) -> &RwLock<HashMap<String, Tool>> {
        &self.tool_registry
    }

    /// Entry point for routing a natural-language request to a tool.
    ///
    /// Flow: validate input -> embed the request -> (Query mode: vector search
    /// only) -> otherwise try a fast high-confidence vector match, then LLM
    /// orchestration, and finally fall back to vector mode on any LLM failure.
    pub async fn intelligent_route(
        &self,
        request: IntelligentRouteRequest,
    ) -> Result<IntelligentRouteResponse> {
        // Reject empty/whitespace-only requests with a soft failure response
        // rather than an Err, so callers always get a structured payload.
        if request.user_request.trim().is_empty() {
            return Ok(IntelligentRouteResponse {
                success: false,
                message: "user_request cannot be empty".into(),
                confidence: 0.0,
                selected_tool: None,
                result: None,
                alternatives: Vec::new(),
                tool_schema: None,
                dynamically_registered: false,
            });
        }

        // Embed the user request. The parking_lot mutex is released before any
        // await point (the chain completes synchronously).
        let embed = self.embedder
            .lock()
            .embed(vec![request.user_request.clone()], None)
            .map_err(|e| anyhow!("Embedding generation failed: {}", e))?
            .into_iter()
            .next()
            .ok_or_else(|| anyhow!("No embedding generated"))?;
        // Normalize so index similarity scores behave as cosine similarity.
        let embed = normalize(&embed);

        // Query mode never registers tools — suggestion only.
        if matches!(request.execution_mode, models::ExecutionMode::Query) {
            eprintln!("🔍 Query mode: using vector search (no tool registration)");
            return self.vector_mode(&request, &embed).await;
        }

        match self.js_orchestrator.as_ref() {
            None => {
                eprintln!("🔍 LLM not configured, using vector search mode");
                self.vector_mode(&request, &embed).await
            }
            Some(orchestrator) => {
                // Fast path: if the best vector match is already confident
                // enough, skip the (slow) LLM orchestration entirely.
                let fast_threshold = 0.75_f32;
                let top_score = {
                    let index = self.index.lock();
                    index
                        .search_tools(&embed, 1)
                        .ok()
                        .and_then(|scores| scores.into_iter().next())
                        .map(|st| st.score)
                };

                if let Some(score) = top_score {
                    if score >= fast_threshold {
                        eprintln!(
                            "⚡ High-confidence vector match ({:.2}), using fast vector_mode (skipping LLM orchestration)",
                            score
                        );
                        return self.vector_mode(&request, &embed).await;
                    }
                }

                eprintln!("🤖 Trying LLM orchestration mode...");
                match self
                    .try_orchestrate(orchestrator.as_ref(), &request, &embed)
                    .await
                {
                    Ok(response) => {
                        eprintln!("✅ LLM orchestration succeeded");
                        Ok(response)
                    }
                    Err(err) => {
                        // Orchestration errors are non-fatal — degrade to
                        // plain vector routing.
                        eprintln!("⚠️ LLM failed: {}, falling back to vector mode", err);
                        self.vector_mode(&request, &embed).await
                    }
                }
            }
        }
    }

    // Vector-search routing: rank tools/methods by embedding similarity, then
    // let the decision engine pick one; if the LLM is unavailable, fall back
    // to the top vector match with empty arguments and fixed 0.6 confidence.
    async fn vector_mode(
        &self,
        request: &IntelligentRouteRequest,
        embed: &[f32],
    ) -> Result<IntelligentRouteResponse> {
        let max_tools = request
            .max_candidates
            .unwrap_or(config::DEFAULT_MAX_TOOLS_PER_REQUEST);

        // Fetch twice as many method hits as tool hits so build_candidates can
        // usually find a schema for each candidate tool.
        let (tool_scores, method_scores) = {
            let index = self.index.lock();
            let tools = index.search_tools(embed, max_tools)?;
            let methods = index.search_methods(embed, max_tools * 2)?;
            (tools, methods)
        };

        if tool_scores.is_empty() {
            return Ok(IntelligentRouteResponse {
                success: false,
                message: "No MCP tools matched the request".into(),
                confidence: 0.0,
                selected_tool: None,
                result: None,
                alternatives: Vec::new(),
                tool_schema: None,
                dynamically_registered: false,
            });
        }

        let candidate_infos = build_candidates(&tool_scores, &method_scores);

        // Ask the LLM to pick among the candidates; on failure use the top
        // vector match directly (candidate_infos is non-empty here because
        // tool_scores was non-empty).
        let (server, tool, arguments, rationale, confidence) = match self
            .decision_engine
            .decide(DecisionInput {
                user_request: request.user_request.clone(),
                candidates: candidate_infos.clone(),
            })
            .await
        {
            Ok(decision) => {
                eprintln!("✅ Vector mode: LLM decision succeeded");
                (
                    decision.server,
                    decision.tool,
                    decision.arguments,
                    decision.rationale,
                    decision.confidence,
                )
            }
            Err(e) => {
                eprintln!("⚠️ Vector mode: LLM unavailable ({}), using top vector match", e);
                let top = &candidate_infos[0];
                (
                    top.server.clone(),
                    top.tool.clone(),
                    Value::Object(Default::default()),
                    "Best vector match (LLM unavailable)".to_string(),
                    0.6,
                )
            }
        };

        // Message wording reflects whether the caller asked for dynamic
        // registration or just a suggestion.
        let execute_message = match request.execution_mode {
            models::ExecutionMode::Dynamic => {
                format!(
                    "Selected tool: {}::{} (will be dynamically registered)",
                    server, tool
                )
            }
            models::ExecutionMode::Query => {
                format!(
                    "Suggested tool: {}::{} (review and call execute_tool)",
                    server, tool
                )
            }
        };

        Ok(IntelligentRouteResponse {
            success: true,
            confidence,
            message: execute_message,
            selected_tool: Some(SelectedRoute {
                mcp_server: server,
                tool_name: tool,
                arguments,
                rationale,
            }),
            result: None,
            // Report up to two runner-up candidates as alternatives.
            alternatives: candidate_infos
                .into_iter()
                .skip(1)
                .take(2)
                .map(|cand| SelectedRoute {
                    mcp_server: cand.server,
                    tool_name: cand.tool,
                    arguments: Value::Null,
                    rationale: cand.description,
                })
                .collect(),
            tool_schema: None,
            dynamically_registered: false,
        })
    }

    // LLM orchestration: hand ALL registered tools to the orchestrator, which
    // returns either a proxy to an existing tool or a generated JS workflow;
    // the result is registered in the dynamic registry and announced to the
    // caller. Errors here are caught by intelligent_route's fallback.
    async fn try_orchestrate(
        &self,
        orchestrator: &js_orchestrator::WorkflowOrchestrator,
        request: &IntelligentRouteRequest,
        embed: &[f32],
    ) -> Result<IntelligentRouteResponse> {
        eprintln!(" 🔍 [DEBUG] try_orchestrate started");

        // Build CandidateToolInfo entries from every registry entry; the
        // "server::tool" key is split back into its two halves.
        let candidate_infos: Vec<CandidateToolInfo> = {
            let registry = self.tool_registry.read().await;
            registry
                .iter()
                .map(|(key, tool_def)| {
                    let parts: Vec<&str> = key.split("::").collect();
                    let server = parts.get(0).map(|s| s.to_string()).unwrap_or_default();
                    let tool_name = parts.get(1).map(|s| s.to_string()).unwrap_or_default();
                    let description = tool_def
                        .description
                        .as_ref()
                        .map(|d| d.to_string())
                        .unwrap_or_default();
                    let schema = serde_json::to_string(&*tool_def.input_schema).ok();

                    CandidateToolInfo {
                        server,
                        tool: tool_name,
                        description,
                        schema_snippet: schema,
                    }
                })
                .collect()
        };

        eprintln!(
            " 🔍 [DEBUG] Passing {} tools to orchestrator (all available tools)",
            candidate_infos.len()
        );

        if candidate_infos.is_empty() {
            return Err(anyhow!("No candidate tools for orchestration"));
        }

        eprintln!(" 🔍 [DEBUG] Calling orchestrator.orchestrate()...");

        let orchestrated_tool = match orchestrator
            .orchestrate(&request.user_request, &candidate_infos)
            .await
        {
            Ok(tool) => {
                eprintln!(" ✅ [DEBUG] Orchestration succeeded: {}", tool.name);
                tool
            }
            Err(e) => {
                eprintln!(" ❌ [DEBUG] Orchestration failed: {}", e);
                return Err(e);
            }
        };

        let Some(registry) = self.dynamic_registry.as_ref() else {
            return Err(anyhow!("Dynamic registry not initialized"));
        };

        // Two orchestration outcomes: a proxy to an existing tool, or a
        // generated JS workflow. Anything else is invalid.
        let (mcp_server, message) = if let Some(proxy_info) = &orchestrated_tool.proxy_info {
            let tool_key = format!("{}::{}", proxy_info.server, proxy_info.tool_name);
            let tool_def = {
                let tool_registry = self.tool_registry.read().await;
                tool_registry.get(&tool_key).cloned()
            };

            // Prefer the real schema from the registry; fall back to whatever
            // schema the orchestrator produced (or an empty object).
            let tool = match tool_def {
                Some(def) => rmcp::model::Tool {
                    name: orchestrated_tool.name.clone().into(),
                    title: None,
                    description: Some(orchestrated_tool.description.clone().into()),
                    input_schema: def.input_schema.clone(),
                    output_schema: None,
                    icons: None,
                    annotations: None,
                    execution: None,
                    meta: None,
                },
                None => {
                    let schema_map = match &orchestrated_tool.input_schema {
                        serde_json::Value::Object(map) => map.clone(),
                        _ => serde_json::Map::new(),
                    };
                    rmcp::model::Tool {
                        name: orchestrated_tool.name.clone().into(),
                        title: None,
                        description: Some(orchestrated_tool.description.clone().into()),
                        input_schema: std::sync::Arc::new(schema_map),
                        output_schema: None,
                        icons: None,
                        annotations: None,
                        execution: None,
                        meta: None,
                    }
                }
            };

            registry
                .register_proxied_tool(
                    proxy_info.server.clone(),
                    proxy_info.tool_name.clone(),
                    tool,
                )
                .await?;

            (
                proxy_info.server.clone(),
                format!(
                    "Registered '{}' (proxy to {}::{}). Use this tool directly.",
                    orchestrated_tool.name, proxy_info.server, proxy_info.tool_name
                ),
            )
        } else if let Some(js_code) = &orchestrated_tool.js_code {
            registry
                .register_js_tool(
                    orchestrated_tool.name.clone(),
                    orchestrated_tool.description.clone(),
                    orchestrated_tool.input_schema.clone(),
                    js_code.clone(),
                )
                .await?;

            (
                // Synthetic server name for generated workflow tools.
                "orchestrated".to_string(),
                format!(
                    "Created orchestrated workflow '{}'. Use this tool to solve your request.",
                    orchestrated_tool.name
                ),
            )
        } else {
            return Err(anyhow!(
                "Invalid orchestrated tool: neither proxy_info nor js_code present"
            ));
        };

        Ok(IntelligentRouteResponse {
            success: true,
            message,
            // Orchestrated tools are reported with full confidence.
            confidence: 1.0,
            selected_tool: Some(SelectedRoute {
                mcp_server: mcp_server.into(),
                tool_name: orchestrated_tool.name.clone(),
                arguments: Value::Object(Default::default()),
                rationale: orchestrated_tool.description.clone(),
            }),
            result: None,
            alternatives: Vec::new(),
            tool_schema: Some(orchestrated_tool.input_schema),
            dynamically_registered: true,
        })
    }

    /// Looks up the input schema, description, and annotations for a
    /// registered `server`/`tool` pair. Unknown tools yield a soft-failure
    /// response (success=false) rather than an Err.
    pub async fn get_method_schema(
        &self,
        server: &str,
        tool: &str,
    ) -> Result<MethodSchemaResponse> {
        let registry = self.tool_registry.read().await;
        let key = registry_key(server, tool);
        let Some(definition) = registry.get(&key) else {
            return Ok(MethodSchemaResponse {
                success: false,
                schema: None,
                description: None,
                annotations: None,
                message: Some(format!("Unknown tool {server}::{tool}")),
            });
        };
        let schema = Value::Object((*definition.input_schema).clone());
        // Annotation serialization failures degrade to an empty JSON object.
        let annotations = definition
            .annotations
            .as_ref()
            .map(|ann| serde_json::to_value(ann).unwrap_or(json!({})));
        Ok(MethodSchemaResponse {
            success: true,
            schema: Some(schema),
            description: definition.description.as_ref().map(|d| d.to_string()),
            annotations,
            message: None,
        })
    }

    /// Executes a tool via the connection pool and reports wall-clock duration.
    /// Execution failures are returned as a success=false response, not an Err.
    pub async fn execute_tool(&self, request: ExecuteToolRequest) -> Result<ExecuteToolResponse> {
        let start = Instant::now();
        let execution = self
            .connection_pool
            .call_tool(
                &request.mcp_server,
                &request.tool_name,
                request.arguments.clone(),
            )
            .await;
        let duration = start.elapsed().as_millis();

        match execution {
            Ok(output) => Ok(ExecuteToolResponse {
                success: true,
                message: "Tool executed successfully".to_string(),
                result: Some(RouteExecutionResult {
                    mcp_server: request.mcp_server,
                    tool_name: request.tool_name,
                    duration_ms: duration,
                    output,
                    raw_stdout: None,
                }),
            }),
            Err(err) => Ok(ExecuteToolResponse {
                success: false,
                message: format!("Tool execution failed: {err}"),
                result: None,
            }),
        }
    }

    /// Shared handle to the MCP connection pool.
    pub fn connection_pool(&self) -> Arc<McpConnectionPool> {
        Arc::clone(&self.connection_pool)
    }
}
635
/// Output of `build_embeddings`, ready to be loaded into `MemRoutingIndex`.
struct PreparedEmbeddings {
    // One tool-level embedding per discovered tool.
    tools: Vec<ToolEmbedding>,
    // Parallel method-level embeddings (same vectors, METHOD_VECTOR_PREFIX ids).
    methods: Vec<MethodEmbedding>,
}
640
641fn build_embeddings(
642 embedder: &Arc<Mutex<TextEmbedding>>,
643 tools: &[DiscoveredTool],
644 _config: &config::McpConfig,
645) -> Result<PreparedEmbeddings> {
646 let mut docs = Vec::with_capacity(tools.len());
648 let mut metas: Vec<(String, String, String, HashMap<String, String>)> = Vec::with_capacity(tools.len());
649
650 for tool in tools {
651 let category = "uncategorized".to_string();
652 let description = tool
653 .definition
654 .description
655 .as_deref()
656 .unwrap_or("No description provided")
657 .to_string();
658 let schema_value = Value::Object((*tool.definition.input_schema).clone());
659 let schema_string = schema_value.to_string();
660
661 let doc = format!(
662 "{tool}\nDescription: {description}",
663 tool = tool.definition.name,
664 description = description,
665 );
666 docs.push(doc);
667
668 let mut metadata = HashMap::new();
669 metadata.insert("server".into(), tool.server.clone());
670 metadata.insert("tool".into(), tool.definition.name.to_string());
671 metadata.insert("description".into(), description.clone());
672 metadata.insert("category".into(), category);
673 metadata.insert("schema".into(), schema_string);
674 metas.push((tool.server.clone(), tool.definition.name.to_string(), description, metadata));
675 }
676
677 let vectors = embedder
679 .lock()
680 .embed(docs, None)
681 .map_err(|e| anyhow!("Batch embedding failed: {}", e))?;
682
683 let mut tool_embeddings = Vec::with_capacity(vectors.len());
684 let mut method_embeddings = Vec::with_capacity(vectors.len());
685
686 for (vector, (server, tool_name, description, metadata)) in vectors.into_iter().zip(metas) {
687 let vector = normalize(&vector);
688
689 tool_embeddings.push(ToolEmbedding {
690 record: ToolVectorRecord {
691 id: format!("{}::{}", server, tool_name),
692 server: server.clone(),
693 tool_name: tool_name.clone(),
694 description: description.clone(),
695 metadata: metadata.clone(),
696 },
697 vector: vector.clone(),
698 });
699
700 method_embeddings.push(MethodEmbedding {
701 record: crate::mcp_routing::models::MethodVectorRecord {
702 id: format!("{METHOD_VECTOR_PREFIX}::{server}::{tool_name}"),
703 server,
704 tool_name,
705 description,
706 metadata,
707 },
708 vector,
709 });
710 }
711
712 Ok(PreparedEmbeddings {
713 tools: tool_embeddings,
714 methods: method_embeddings,
715 })
716}
717
718async fn populate_registry(registry: &RwLock<HashMap<String, Tool>>, tools: Vec<DiscoveredTool>) {
719 let mut guard = registry.write().await;
720 guard.clear();
721 for tool in tools {
722 guard.insert(
723 registry_key(&tool.server, &tool.definition.name),
724 tool.definition,
725 );
726 }
727}
728
729fn build_candidates(tools: &[ScoredTool], methods: &[ScoredMethod]) -> Vec<CandidateToolInfo> {
730 let method_map: HashMap<String, &ScoredMethod> = methods
731 .iter()
732 .map(|method| (registry_key(&method.server, &method.tool), method))
733 .collect();
734
735 tools
736 .iter()
737 .map(|tool| {
738 let key = registry_key(&tool.server, &tool.tool);
739 let schema = method_map
740 .get(&key)
741 .and_then(|method| method.metadata.get("schema").cloned());
742 CandidateToolInfo {
743 server: tool.server.clone(),
744 tool: tool.tool.clone(),
745 description: tool.description.clone().unwrap_or_default(),
746 schema_snippet: schema,
747 }
748 })
749 .collect()
750}
751
// Canonical "<server>::<tool>" key shared by the tool registry and vector ids.
fn registry_key(server: &str, tool: &str) -> String {
    let mut key = String::with_capacity(server.len() + 2 + tool.len());
    key.push_str(server);
    key.push_str("::");
    key.push_str(tool);
    key
}