1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::future::Future;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde_json::{Map, Value};
7use tokio::sync::{Mutex, RwLock};
8
9use crate::KontextDevError;
10use crate::client::{
11 ClientState, ConnectSessionResult, IntegrationInfo, KontextClient, KontextClientConfig,
12 ToolResult, create_kontext_client,
13};
14use crate::mcp::{
15 KontextMcp, KontextMcpConfig, KontextTool, RuntimeIntegrationCategory,
16 RuntimeIntegrationRecord, extract_text_content,
17};
18use crate::prompt_guidance::{KontextPromptGuidance, build_kontext_prompt_guidance};
19
/// Delay, in milliseconds, slept by `with_transient_retry` before it retries
/// an operation that failed with a transient error.
const RETRY_DELAY_MS: u64 = 250;
/// Safety margin, in seconds, used by `InternalResourceToken::is_fresh`: a
/// cached token counts as fresh only while `now + skew` is still before its
/// expiry.
const INTERNAL_TOKEN_SKEW_SECS: u64 = 30;
22
/// Describes which backend serves a tool exposed by the orchestrator.
#[derive(Clone, Debug)]
pub struct ToolRouteRecord {
    /// Identifier the tool is exposed under; also the key in the route map.
    pub tool_id: String,
    /// Backend identifier: `"gateway"` or `"internal:<integration id>"`.
    pub backend_id: String,
    /// Whether the tool is served by the gateway or by an internal integration.
    pub source: BackendSource,
    /// Tool identifier passed to the backend when the tool is executed.
    pub backend_tool_id: String,
    /// Owning integration; set for internal routes, `None` for gateway routes.
    pub integration_id: Option<String>,
}
31
/// Kind of backend a tool route points at.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendSource {
    /// The tool is executed through the gateway client.
    Gateway,
    /// The tool is executed through a per-integration internal MCP client.
    Internal,
}
37
/// Records that two candidates claimed the same tool id while an inventory was
/// being built.
#[derive(Clone, Debug)]
pub struct RouteConflictRecord {
    /// The contested tool id.
    pub tool_id: String,
    /// Route that stayed in the inventory (the first one seen).
    pub kept: ToolRouteRecord,
    /// Route that was discarded.
    pub dropped: ToolRouteRecord,
}
44
/// A tool paired with the route that would serve it, before conflicts between
/// candidates are resolved by `build_route_inventory`.
#[derive(Clone, Debug)]
struct RouteInventoryCandidate {
    /// Tool description as it will be listed to callers.
    tool: KontextTool,
    /// Route used to execute `tool`.
    route: ToolRouteRecord,
}
50
51#[derive(Clone, Debug)]
52struct RouteInventorySnapshot {
53 version: u64,
54 tools: Vec<KontextTool>,
55 routes: HashMap<String, ToolRouteRecord>,
56 #[allow(dead_code)]
57 conflicts: Vec<RouteConflictRecord>,
58}
59
60impl RouteInventorySnapshot {
61 fn empty() -> Self {
62 Self {
63 version: 0,
64 tools: Vec::new(),
65 routes: HashMap::new(),
66 conflicts: Vec::new(),
67 }
68 }
69}
70
71#[derive(Clone, Debug)]
72struct InternalResourceToken {
73 access_token: String,
74 expires_at: Option<Instant>,
75}
76
77impl InternalResourceToken {
78 fn is_fresh(&self) -> bool {
79 match self.expires_at {
80 Some(expires_at) => {
81 let skew = Duration::from_secs(INTERNAL_TOKEN_SKEW_SECS);
82 Instant::now()
83 .checked_add(skew)
84 .is_some_and(|v| v < expires_at)
85 }
86 None => false,
87 }
88 }
89}
90
/// Lifecycle state reported by the orchestrator; an alias for `ClientState`.
pub type KontextOrchestratorState = ClientState;
/// Orchestrator configuration; an alias for `KontextClientConfig`.
pub type KontextOrchestratorConfig = KontextClientConfig;
93
/// Combines the gateway client with per-integration internal MCP clients and
/// exposes their tools through a single routed inventory.
///
/// All mutable state sits behind `Arc`, so clones of the orchestrator share it.
#[derive(Clone)]
pub struct KontextOrchestrator {
    /// Current lifecycle state, as returned by `state()`.
    state: Arc<RwLock<KontextOrchestratorState>>,
    /// Configuration used for the gateway client and copied into each internal
    /// MCP client's configuration.
    config: KontextOrchestratorConfig,
    /// Client used for gateway tools, integration listing and token exchange.
    gateway_client: KontextClient,
    /// Most recently committed inventory snapshot.
    route_inventory: Arc<RwLock<RouteInventorySnapshot>>,
    /// Serializes `build_tool_inventory`.
    build_lock: Arc<Mutex<()>>,
    /// Build key of the committed snapshot; `None` after a reset.
    last_build_key: Arc<RwLock<Option<String>>>,
    /// Incremented for every committed build; reset to `0` by `reset_inventory`.
    version_counter: Arc<Mutex<u64>>,
    /// Bumped by `reset_inventory`; a build that observes a change discards
    /// its result.
    reset_generation: Arc<Mutex<u64>>,
    /// Incremented at the start of every inventory build.
    build_run_counter: Arc<Mutex<u64>>,
    /// Run number of the latest build whose snapshot was committed.
    latest_committed_build_run: Arc<Mutex<u64>>,
    /// Latest integration records keyed by integration id.
    runtime_integrations: Arc<RwLock<HashMap<String, RuntimeIntegrationRecord>>>,
    /// MCP clients for `InternalMcpCredentials` integrations, keyed by id.
    internal_mcps: Arc<RwLock<HashMap<String, KontextMcp>>>,
    /// Exchanged resource tokens keyed by integration id.
    internal_token_cache: Arc<Mutex<HashMap<String, InternalResourceToken>>>,
    /// Per-integration mutexes held while a token is being exchanged.
    internal_token_locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
    /// Number of in-flight managed operations per integration id.
    managed_internal_ops: Arc<Mutex<HashMap<String, u32>>>,
    /// Ids of internal integrations whose tool listing succeeded during the
    /// latest candidate build.
    resolved_internal_listings: Arc<RwLock<HashSet<String>>>,
    /// Serializes `refresh_integration_inventory`.
    integration_refresh_lock: Arc<Mutex<()>>,
}
114
115impl KontextOrchestrator {
116 pub fn new(config: KontextOrchestratorConfig) -> Self {
117 let gateway_client = create_kontext_client(config.clone());
118
119 Self {
120 state: Arc::new(RwLock::new(ClientState::Idle)),
121 config,
122 gateway_client,
123 route_inventory: Arc::new(RwLock::new(RouteInventorySnapshot::empty())),
124 build_lock: Arc::new(Mutex::new(())),
125 last_build_key: Arc::new(RwLock::new(None)),
126 version_counter: Arc::new(Mutex::new(0)),
127 reset_generation: Arc::new(Mutex::new(0)),
128 build_run_counter: Arc::new(Mutex::new(0)),
129 latest_committed_build_run: Arc::new(Mutex::new(0)),
130 runtime_integrations: Arc::new(RwLock::new(HashMap::new())),
131 internal_mcps: Arc::new(RwLock::new(HashMap::new())),
132 internal_token_cache: Arc::new(Mutex::new(HashMap::new())),
133 internal_token_locks: Arc::new(Mutex::new(HashMap::new())),
134 managed_internal_ops: Arc::new(Mutex::new(HashMap::new())),
135 resolved_internal_listings: Arc::new(RwLock::new(HashSet::new())),
136 integration_refresh_lock: Arc::new(Mutex::new(())),
137 }
138 }
139
140 pub async fn state(&self) -> KontextOrchestratorState {
141 *self.state.read().await
142 }
143
/// Returns the gateway client's MCP handle.
pub fn mcp(&self) -> &KontextMcp {
    self.gateway_client.mcp()
}
147
/// Connects the orchestrator; delegates to `ensure_connected`.
///
/// # Errors
/// Returns whatever error `ensure_connected` reports.
pub async fn connect(&self) -> Result<(), KontextDevError> {
    self.ensure_connected().await
}
151
/// Drops all local session state, disconnects the gateway client and returns
/// the orchestrator to `ClientState::Idle`.
pub async fn disconnect(&self) {
    // Local teardown first: internal clients, known integrations, inventory.
    self.dispose_internal_clients().await;
    self.runtime_integrations.write().await.clear();
    self.reset_inventory().await;
    self.gateway_client.disconnect().await;
    // Cleared again after the gateway disconnect (already cleared once by
    // `dispose_internal_clients` above).
    self.internal_token_cache.lock().await.clear();
    *self.state.write().await = ClientState::Idle;
}
160
/// Ensures the orchestrator is connected, then asks the gateway client for a
/// connect-page session.
///
/// # Errors
/// Propagates errors from `ensure_connected` and from the gateway client.
pub async fn get_connect_page_url(&self) -> Result<ConnectSessionResult, KontextDevError> {
    self.ensure_connected().await?;
    self.gateway_client.get_connect_page_url().await
}
165
/// Signs in by delegating to `ensure_connected`; behaves the same as `connect`.
///
/// # Errors
/// Returns whatever error `ensure_connected` reports.
pub async fn sign_in(&self) -> Result<(), KontextDevError> {
    self.ensure_connected().await
}
170
/// Clears all local session state and signs the gateway client out.
///
/// # Errors
/// Propagates the gateway client's sign-out error. In that case the local
/// caches have already been cleared but the state is NOT reset to `Idle`,
/// because the assignment below is skipped by the early return.
pub async fn sign_out(&self) -> Result<(), KontextDevError> {
    self.dispose_internal_clients().await;
    self.runtime_integrations.write().await.clear();
    self.reset_inventory().await;
    self.internal_token_cache.lock().await.clear();
    self.gateway_client.sign_out().await?;
    *self.state.write().await = ClientState::Idle;
    Ok(())
}
180
181 pub async fn integrations_list(&self) -> Result<Vec<IntegrationInfo>, KontextDevError> {
182 self.ensure_connected().await?;
183
184 let discovered = self.refresh_integration_inventory().await?;
185
186 let gateway_statuses = self
187 .gateway_client
188 .integrations_list()
189 .await
190 .unwrap_or_default();
191 let gateway_by_id = gateway_statuses
192 .into_iter()
193 .map(|item| (item.id.clone(), item))
194 .collect::<HashMap<_, _>>();
195
196 let needs_internal_connect_link = discovered.iter().any(|integration| {
197 integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
198 && !integration
199 .connection
200 .as_ref()
201 .map(|c| c.connected)
202 .unwrap_or(false)
203 });
204
205 let mut shared_connect_url = None;
206 if needs_internal_connect_link
207 && let Ok(connect) = self.gateway_client.get_connect_page_url().await
208 {
209 shared_connect_url = Some(connect.connect_url);
210 }
211
212 Ok(discovered
213 .into_iter()
214 .map(|integration| {
215 let gateway_status = gateway_by_id.get(&integration.id);
216 let connected = gateway_status
217 .map(|item| item.connected)
218 .unwrap_or_else(|| {
219 integration
220 .connection
221 .as_ref()
222 .map(|c| c.connected)
223 .unwrap_or(false)
224 });
225 let connect_url = gateway_status
226 .and_then(|item| item.connect_url.clone())
227 .or_else(|| {
228 (!connected
229 && integration.category
230 == RuntimeIntegrationCategory::InternalMcpCredentials)
231 .then(|| shared_connect_url.clone())
232 .flatten()
233 });
234 let reason = gateway_status
235 .and_then(|item| item.reason.clone())
236 .or_else(|| {
237 (!connected
238 && integration.category
239 == RuntimeIntegrationCategory::InternalMcpCredentials)
240 .then_some("credentials_required".to_string())
241 });
242
243 IntegrationInfo {
244 id: integration.id,
245 name: integration.name,
246 connected,
247 connect_url,
248 reason,
249 }
250 })
251 .collect())
252 }
253
/// Lists all available tools; shorthand for `tools_list_with_options(None)`.
///
/// # Errors
/// Returns whatever error `tools_list_with_options` reports.
pub async fn tools_list(&self) -> Result<Vec<KontextTool>, KontextDevError> {
    self.tools_list_with_options(None).await
}
257
258 pub async fn prompt_guidance(&self) -> Result<KontextPromptGuidance, KontextDevError> {
259 let tools = self.tools_list().await?;
260 let integrations = self.integrations_list().await?;
261 let tool_names = tools
262 .into_iter()
263 .map(|tool| tool.name)
264 .collect::<Vec<String>>();
265
266 Ok(build_kontext_prompt_guidance(
267 tool_names.as_slice(),
268 integrations.as_slice(),
269 ))
270 }
271
272 pub async fn tools_list_with_options(
273 &self,
274 options: Option<ToolsListOptions>,
275 ) -> Result<Vec<KontextTool>, KontextDevError> {
276 self.ensure_connected().await?;
277
278 let mut inventory = self.build_tool_inventory(options.clone(), None).await?;
279
280 let missing_internal = self.get_internal_integrations_missing_tools().await;
281 if !missing_internal.is_empty() {
282 let refreshed = self.refresh_integration_inventory().await?;
283 inventory = self.build_tool_inventory(options, Some(refreshed)).await?;
284 }
285
286 let unresolved_internal = self.get_internal_integrations_missing_tools().await;
287 if self.state().await == ClientState::NeedsAuth
288 && self.gateway_client.state().await == ClientState::Ready
289 && unresolved_internal.is_empty()
290 {
291 *self.state.write().await = ClientState::Ready;
292 }
293
294 Ok(inventory.tools)
295 }
296
/// Executes the tool exposed as `tool_id` with the optional `args`, routing
/// the call to the gateway or to the owning internal integration.
///
/// # Errors
/// Propagates connection, routing, token and execution errors. Returns
/// `KontextDevError::ConnectSession` when an internal route has no integration
/// id, when the integration can no longer be found, or when no MCP client is
/// registered for it. If an internal execution fails with an error that
/// requires auth recovery, the state is set to `NeedsAuth` before the error is
/// returned.
pub async fn tools_execute(
    &self,
    tool_id: &str,
    args: Option<Map<String, Value>>,
) -> Result<ToolResult, KontextDevError> {
    self.ensure_connected().await?;

    let route = self.route_for_execution(tool_id).await?;

    match route.source {
        BackendSource::Gateway => {
            self.gateway_client
                .tools_execute(route.backend_tool_id.as_str(), args)
                .await
        }
        BackendSource::Internal => {
            let Some(integration_id) = route.integration_id else {
                return Err(KontextDevError::ConnectSession {
                    message: format!(
                        "route for tool `{tool_id}` is missing integration metadata"
                    ),
                });
            };

            let integration =
                self.integration_by_id(&integration_id)
                    .await?
                    .ok_or_else(|| KontextDevError::ConnectSession {
                        message: format!(
                            "internal integration `{integration_id}` is no longer attached"
                        ),
                    })?;

            let internal_mcp = self.internal_mcp_for(&integration).await.ok_or_else(|| {
                KontextDevError::ConnectSession {
                    message: format!(
                        "internal integration `{integration_id}` has no MCP client"
                    ),
                }
            })?;

            // Try the cached token path first; on an auth-type failure force
            // one fresh exchange before giving up.
            let token = match self
                .ensure_internal_resource_token(&integration, false)
                .await
            {
                Ok(token) => token,
                Err(err) if is_auth_recovery_required(&err) => {
                    self.ensure_internal_resource_token(&integration, true)
                        .await?
                }
                Err(err) => return Err(err),
            };

            let integration_id_for_exec = integration_id.clone();
            // Each invocation clones its inputs so the retry helper can call
            // the closure more than once.
            let execute_once = || {
                let integration_id = integration_id_for_exec.clone();
                let internal_mcp = internal_mcp.clone();
                let token = token.clone();
                let backend_tool_id = route.backend_tool_id.clone();
                let args = args.clone();
                async move {
                    self.run_managed_internal_op(integration_id.as_str(), || async {
                        let raw = internal_mcp
                            .call_tool_with_access_token(
                                token.as_str(),
                                backend_tool_id.as_str(),
                                args,
                            )
                            .await?;
                        Ok(ToolResult {
                            content: extract_text_content(&raw),
                            raw,
                        })
                    })
                    .await
                }
            };

            // At most one retry, and only for transient errors.
            match with_transient_retry(execute_once, 1).await {
                Ok(result) => Ok(result),
                Err(err) => {
                    if is_auth_recovery_required(&err) {
                        *self.state.write().await = ClientState::NeedsAuth;
                    }
                    Err(err)
                }
            }
        }
    }
}
387
388 async fn ensure_connected(&self) -> Result<(), KontextDevError> {
389 let state = self.state().await;
390 if state == ClientState::Ready {
391 return Ok(());
392 }
393
394 *self.state.write().await = ClientState::Connecting;
395
396 match self.gateway_client.connect().await {
397 Ok(()) => {
398 if let Err(err) = self.refresh_integration_inventory().await {
399 if is_auth_recovery_required(&err) {
400 *self.state.write().await = ClientState::NeedsAuth;
401 } else {
402 *self.state.write().await = ClientState::Failed;
403 }
404 return Err(err);
405 }
406 *self.state.write().await = ClientState::Ready;
407 Ok(())
408 }
409 Err(err) => {
410 if is_auth_recovery_required(&err) {
411 *self.state.write().await = ClientState::NeedsAuth;
412 } else {
413 *self.state.write().await = ClientState::Failed;
414 }
415 Err(err)
416 }
417 }
418 }
419
/// Fetches the integration list from the gateway, applies it to the local
/// runtime state and returns it.
///
/// Calls are serialized by `integration_refresh_lock`; the fetch is retried
/// once on a transient error.
///
/// # Errors
/// Returns the listing error once the retry budget is used up or the error is
/// not transient.
async fn refresh_integration_inventory(
    &self,
) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
    let _guard = self.integration_refresh_lock.lock().await;

    let list_once = || async { self.gateway_client.mcp().list_integrations().await };
    let items = with_transient_retry(list_once, 1).await?;
    self.apply_runtime_integrations(&items).await;
    Ok(items)
}
430
431 async fn apply_runtime_integrations(&self, integrations: &[RuntimeIntegrationRecord]) {
432 let mut runtime = self.runtime_integrations.write().await;
433 runtime.clear();
434
435 for integration in integrations {
436 runtime.insert(integration.id.clone(), integration.clone());
437 }
438 drop(runtime);
439
440 let desired = integrations
441 .iter()
442 .filter(|integration| {
443 integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
444 })
445 .map(|integration| (integration.id.clone(), integration.url.clone()))
446 .collect::<BTreeMap<_, _>>();
447
448 let mut internal_mcps = self.internal_mcps.write().await;
449
450 internal_mcps.retain(|integration_id, mcp| {
451 desired
452 .get(integration_id)
453 .is_some_and(|url| mcp.mcp_url().ok().as_deref() == Some(url.as_str()))
454 });
455
456 for integration in integrations {
457 if integration.category != RuntimeIntegrationCategory::InternalMcpCredentials {
458 continue;
459 }
460 if internal_mcps.contains_key(&integration.id) {
461 continue;
462 }
463
464 internal_mcps.insert(
465 integration.id.clone(),
466 KontextMcp::new(KontextMcpConfig {
467 client_session_id: self.config.client_session_id.clone(),
468 client_id: self.config.client_id.clone(),
469 redirect_uri: self.config.redirect_uri.clone(),
470 url: Some(integration.url.clone()),
471 server: self.config.server_url.clone(),
472 client_secret: self.config.client_secret.clone(),
473 scope: self.config.scope.clone(),
474 resource: Some(integration.url.clone()),
475 session_key: Some(format!("internal:{}", integration.id)),
476 integration_ui_url: self.config.integration_ui_url.clone(),
477 integration_return_to: self.config.integration_return_to.clone(),
478 auth_timeout_seconds: self.config.auth_timeout_seconds,
479 open_connect_page_on_login: Some(false),
480 token_cache_path: None,
481 }),
482 );
483 }
484 }
485
486 async fn internal_mcp_for(&self, integration: &RuntimeIntegrationRecord) -> Option<KontextMcp> {
487 self.internal_mcps
488 .read()
489 .await
490 .get(&integration.id)
491 .cloned()
492 }
493
494 async fn integration_by_id(
495 &self,
496 integration_id: &str,
497 ) -> Result<Option<RuntimeIntegrationRecord>, KontextDevError> {
498 if let Some(existing) = self
499 .runtime_integrations
500 .read()
501 .await
502 .get(integration_id)
503 .cloned()
504 {
505 return Ok(Some(existing));
506 }
507
508 let refreshed = self.refresh_integration_inventory().await?;
509 Ok(refreshed.into_iter().find(|item| item.id == integration_id))
510 }
511
/// Returns an access token for `integration`, reusing a cached token when it
/// is still fresh and `force_exchange` is `false`.
///
/// Otherwise the gateway session is authenticated and its access token is
/// exchanged for a token scoped to `integration.url`; the result is cached
/// under the integration id.
///
/// # Errors
/// Propagates errors from gateway authentication and from the token exchange.
async fn ensure_internal_resource_token(
    &self,
    integration: &RuntimeIntegrationRecord,
    force_exchange: bool,
) -> Result<String, KontextDevError> {
    // Fast path: fresh cached token, no per-integration lock needed.
    if !force_exchange
        && let Some(existing) = self
            .internal_token_cache
            .lock()
            .await
            .get(&integration.id)
            .cloned()
        && existing.is_fresh()
    {
        return Ok(existing.access_token);
    }

    // Fetch (or create) the mutex dedicated to this integration; the map lock
    // is released at the end of this block.
    let integration_lock = {
        let mut locks = self.internal_token_locks.lock().await;
        locks
            .entry(integration.id.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone()
    };

    let _guard = integration_lock.lock().await;

    // Re-check the cache now that the per-integration lock is held: a
    // concurrent caller may have stored a fresh token while this one waited.
    if !force_exchange
        && let Some(existing) = self
            .internal_token_cache
            .lock()
            .await
            .get(&integration.id)
            .cloned()
        && existing.is_fresh()
    {
        return Ok(existing.access_token);
    }

    let session = self.gateway_client.mcp().authenticate_mcp().await?;
    let exchanged = self
        .gateway_client
        .mcp()
        .client()
        .exchange_for_resource(&session.gateway_token.access_token, &integration.url, None)
        .await?;

    // A missing or non-positive `expires_in` yields `None`. `is_fresh` never
    // accepts such a token, so it is re-exchanged on the next call.
    let expires_at = exchanged.expires_in.and_then(|seconds| {
        (seconds > 0)
            .then(|| Instant::now().checked_add(Duration::from_secs(seconds as u64)))
            .flatten()
    });

    let token = InternalResourceToken {
        access_token: exchanged.access_token,
        expires_at,
    };

    let value = token.access_token.clone();
    self.internal_token_cache
        .lock()
        .await
        .insert(integration.id.clone(), token);
    Ok(value)
}
577
/// Collects routing candidates from the gateway and from every internal
/// integration.
///
/// Gateway tools come first (capped by `options.limit`, which is applied to
/// gateway tools only), followed by the internal integrations' tools in
/// ascending integration-id order. Integrations whose first listing fails with
/// an authorization error are retried once with a forced token exchange.
/// Other per-integration failures are dropped so that one broken integration
/// does not fail the whole listing.
///
/// Side effects: clears `resolved_internal_listings` at the start and may move
/// the orchestrator state between `NeedsAuth` and `Ready`.
///
/// # Errors
/// Propagates the gateway tool-listing error and, when no override is given,
/// the integration refresh error.
async fn build_inventory_candidates(
    &self,
    options: Option<ToolsListOptions>,
    runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
) -> Result<Vec<RouteInventoryCandidate>, KontextDevError> {
    self.resolved_internal_listings.write().await.clear();

    let mut candidates = Vec::new();

    let list_gateway_tools_once = || {
        let gateway_client = self.gateway_client.clone();
        async move { gateway_client.tools_list().await }
    };
    let gateway_tools = with_transient_retry(list_gateway_tools_once, 1)
        .await?
        .into_iter()
        .take(
            options
                .as_ref()
                .and_then(|value| value.limit)
                .unwrap_or(u32::MAX) as usize,
        )
        .collect::<Vec<_>>();
    for tool in gateway_tools {
        let route = ToolRouteRecord {
            tool_id: tool.id.clone(),
            backend_id: gateway_backend_id(),
            source: BackendSource::Gateway,
            backend_tool_id: tool.id.clone(),
            integration_id: None,
        };
        candidates.push(RouteInventoryCandidate { tool, route });
    }

    // Use the caller-provided records when given, otherwise fetch a fresh list.
    let integrations = if let Some(runtime_integrations) = runtime_integrations_override {
        self.apply_runtime_integrations(&runtime_integrations).await;
        runtime_integrations
    } else {
        self.refresh_integration_inventory().await?
    };

    let mut internal_integrations = integrations
        .into_iter()
        .filter(|integration| {
            integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
        })
        .collect::<Vec<_>>();
    // Sorted so candidate order does not depend on the order of the input.
    internal_integrations.sort_by(|a, b| a.id.cmp(&b.id));

    let mut pending_auth_retries = Vec::new();

    // First pass: use cached tokens where possible.
    for integration in &internal_integrations {
        match self
            .add_internal_tools_for_integration(&mut candidates, integration, false)
            .await
        {
            Ok(()) => {}
            Err(err) => {
                if is_authorization_error(&err) {
                    pending_auth_retries.push(integration.clone());
                    continue;
                }
                // NOTE(review): `is_authorization_error` already returns true
                // for `TokenExchange`, so this branch appears unreachable;
                // confirm the intended classification.
                if is_token_exchange_error(&err) {
                    *self.state.write().await = ClientState::NeedsAuth;
                }
            }
        }
    }

    // Second pass: retry authorization failures with a forced token exchange.
    let mut unresolved_internal_auth = false;
    for integration in pending_auth_retries {
        match self
            .add_internal_tools_for_integration(&mut candidates, &integration, true)
            .await
        {
            Ok(()) => {}
            Err(err) => {
                if is_auth_recovery_required(&err) {
                    unresolved_internal_auth = true;
                    continue;
                }
            }
        }
    }

    if unresolved_internal_auth {
        *self.state.write().await = ClientState::NeedsAuth;
    } else if self.state().await == ClientState::NeedsAuth
        && self.gateway_client.state().await == ClientState::Ready
    {
        *self.state.write().await = ClientState::Ready;
    }

    Ok(candidates)
}
673
/// Lists the tools of one internal integration and appends them to
/// `candidates`.
///
/// Returns `Ok(())` without adding anything when no MCP client is registered
/// for the integration. On a successful listing the integration id is recorded
/// in `resolved_internal_listings`. `force_exchange` is forwarded to
/// `ensure_internal_resource_token`.
///
/// # Errors
/// Propagates token errors and the listing error (after at most one transient
/// retry).
async fn add_internal_tools_for_integration(
    &self,
    candidates: &mut Vec<RouteInventoryCandidate>,
    integration: &RuntimeIntegrationRecord,
    force_exchange: bool,
) -> Result<(), KontextDevError> {
    let Some(internal_mcp) = self.internal_mcp_for(integration).await else {
        return Ok(());
    };

    let token = self
        .ensure_internal_resource_token(integration, force_exchange)
        .await?;

    // Clones per invocation so the retry helper can call the closure twice.
    let list_once = || {
        let integration_id = integration.id.clone();
        let internal_mcp = internal_mcp.clone();
        let token = token.clone();
        async move {
            self.run_managed_internal_op(&integration_id, || async {
                internal_mcp
                    .list_tools_with_access_token(token.as_str())
                    .await
            })
            .await
        }
    };

    let tools = with_transient_retry(list_once, 1).await?;

    self.resolved_internal_listings
        .write()
        .await
        .insert(integration.id.clone());

    for tool in tools {
        // The backend is addressed by the tool's own id, falling back to its
        // name when the id is empty.
        let backend_tool_id = if tool.id.is_empty() {
            tool.name.clone()
        } else {
            tool.id.clone()
        };
        // The exposed id is namespaced with the integration id.
        let unified_id = format!("{}:{}", integration.id, tool.name);

        let mapped = KontextTool {
            id: unified_id.clone(),
            name: tool.name,
            description: tool.description,
            input_schema: tool.input_schema,
            server: Some(crate::mcp::KontextToolServer {
                id: integration.id.clone(),
                name: Some(integration.name.clone()),
            }),
        };

        let route = ToolRouteRecord {
            tool_id: unified_id,
            backend_id: internal_backend_id(&integration.id),
            source: BackendSource::Internal,
            backend_tool_id,
            integration_id: Some(integration.id.clone()),
        };

        candidates.push(RouteInventoryCandidate {
            tool: mapped,
            route,
        });
    }

    Ok(())
}
744
745 async fn run_managed_internal_op<T, F, Fut>(
746 &self,
747 integration_id: &str,
748 operation: F,
749 ) -> Result<T, KontextDevError>
750 where
751 F: FnOnce() -> Fut,
752 Fut: Future<Output = Result<T, KontextDevError>>,
753 {
754 {
755 let mut managed = self.managed_internal_ops.lock().await;
756 *managed.entry(integration_id.to_string()).or_insert(0) += 1;
757 }
758
759 let result = operation().await;
760
761 {
762 let mut managed = self.managed_internal_ops.lock().await;
763 if let Some(count) = managed.get_mut(integration_id) {
764 *count = count.saturating_sub(1);
765 if *count == 0 {
766 managed.remove(integration_id);
767 }
768 }
769 }
770
771 result
772 }
773
/// Builds (or reuses) the routed tool inventory for the given options.
///
/// Builds are serialized by `build_lock`. When a snapshot has already been
/// committed for the same build key it is returned as is. Otherwise candidates
/// are collected and merged; the new snapshot is committed unless the
/// inventory was reset while the build was running.
///
/// # Errors
/// Propagates errors from `build_inventory_candidates`.
async fn build_tool_inventory(
    &self,
    options: Option<ToolsListOptions>,
    runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
) -> Result<RouteInventorySnapshot, KontextDevError> {
    let key = inventory_build_key(options.clone(), runtime_integrations_override.as_ref());

    let _guard = self.build_lock.lock().await;

    // Cache hit: same key as the committed snapshot (version 0 means "none").
    {
        let last_key = self.last_build_key.read().await;
        let snapshot = self.route_inventory.read().await;
        if snapshot.version > 0 && last_key.as_deref() == Some(key.as_str()) {
            return Ok(snapshot.clone());
        }
    }

    // Remember the reset generation so a reset during the build is detected.
    let build_generation = *self.reset_generation.lock().await;
    let build_run = {
        let mut counter = self.build_run_counter.lock().await;
        *counter += 1;
        *counter
    };

    let candidates = self
        .build_inventory_candidates(options, runtime_integrations_override)
        .await?;

    // The inventory was reset while building: discard these candidates and
    // return whatever is currently stored.
    if *self.reset_generation.lock().await != build_generation {
        return Ok(self.route_inventory.read().await.clone());
    }

    let latest_committed = *self.latest_committed_build_run.lock().await;
    let snapshot = if build_run >= latest_committed {
        *self.latest_committed_build_run.lock().await = build_run;
        let mut version = self.version_counter.lock().await;
        *version += 1;
        let built = build_route_inventory(*version, candidates);
        *self.route_inventory.write().await = built.clone();
        *self.last_build_key.write().await = Some(key);
        built
    } else {
        // A later run has already committed: return this result tagged with
        // the current version, without storing it.
        let current_version = self.route_inventory.read().await.version;
        build_route_inventory(current_version, candidates)
    };

    Ok(snapshot)
}
822
823 async fn route_for_execution(&self, tool_id: &str) -> Result<ToolRouteRecord, KontextDevError> {
824 if let Some(route) = self
825 .route_inventory
826 .read()
827 .await
828 .routes
829 .get(tool_id)
830 .cloned()
831 {
832 return Ok(route);
833 }
834
835 let _ = self.build_tool_inventory(None, None).await?;
836
837 if let Some(route) = self
838 .route_inventory
839 .read()
840 .await
841 .routes
842 .get(tool_id)
843 .cloned()
844 {
845 return Ok(route);
846 }
847
848 Err(KontextDevError::ConnectSession {
849 message: format!(
850 "unknown tool `{tool_id}`. Call tools.list() to refresh available tools"
851 ),
852 })
853 }
854
855 async fn get_internal_integrations_missing_tools(&self) -> Vec<String> {
856 let resolved = self.resolved_internal_listings.read().await.clone();
857 self.internal_mcps
858 .read()
859 .await
860 .keys()
861 .filter(|integration_id| !resolved.contains(*integration_id))
862 .cloned()
863 .collect()
864 }
865
/// Drops every internal MCP client together with the cached tokens, the
/// per-integration token locks and the record of resolved listings.
async fn dispose_internal_clients(&self) {
    self.internal_mcps.write().await.clear();
    self.internal_token_cache.lock().await.clear();
    self.internal_token_locks.lock().await.clear();
    self.resolved_internal_listings.write().await.clear();
}
872
/// Discards the committed inventory and invalidates builds that are in flight.
async fn reset_inventory(&self) {
    // A build that started before this bump sees the changed generation and
    // discards its result.
    *self.reset_generation.lock().await += 1;
    *self.version_counter.lock().await = 0;

    // Mark every run started so far as already committed.
    let build_run = *self.build_run_counter.lock().await;
    *self.latest_committed_build_run.lock().await = build_run;

    *self.route_inventory.write().await = RouteInventorySnapshot::empty();
    *self.last_build_key.write().await = None;
}
883}
884
/// Options accepted by `KontextOrchestrator::tools_list_with_options`.
#[derive(Clone, Debug)]
pub struct ToolsListOptions {
    /// Maximum number of gateway tools to include; `None` means no cap. As
    /// used by `build_inventory_candidates`, the cap is not applied to tools
    /// contributed by internal integrations.
    pub limit: Option<u32>,
}
889
/// Convenience constructor; equivalent to `KontextOrchestrator::new(config)`.
pub fn create_kontext_orchestrator(config: KontextOrchestratorConfig) -> KontextOrchestrator {
    KontextOrchestrator::new(config)
}
893
/// Returns the backend identifier used for routes served by the gateway.
fn gateway_backend_id() -> String {
    String::from("gateway")
}
897
/// Returns the backend identifier for an internal integration:
/// `"internal:"` followed by `integration_id`.
fn internal_backend_id(integration_id: &str) -> String {
    ["internal:", integration_id].concat()
}
901
902fn inventory_build_key(
903 options: Option<ToolsListOptions>,
904 runtime_integrations: Option<&Vec<RuntimeIntegrationRecord>>,
905) -> String {
906 let limit_key = match options.and_then(|item| item.limit) {
907 Some(limit) => limit.to_string(),
908 None => "none".to_string(),
909 };
910 let mut key = format!("limit:{limit_key}");
911 key.push_str("|runtime:");
912 if let Some(integrations) = runtime_integrations {
913 let mut ids = integrations
914 .iter()
915 .map(|integration| integration.id.clone())
916 .collect::<Vec<_>>();
917 ids.sort();
918 key.push_str(ids.join(",").as_str());
919 }
920 key
921}
922
923fn build_route_inventory(
924 version: u64,
925 candidates: Vec<RouteInventoryCandidate>,
926) -> RouteInventorySnapshot {
927 let mut tools = Vec::<KontextTool>::new();
928 let mut routes = HashMap::<String, ToolRouteRecord>::new();
929 let mut conflicts = Vec::<RouteConflictRecord>::new();
930
931 for candidate in candidates {
932 let tool_id = candidate.route.tool_id.clone();
933 if !routes.contains_key(&tool_id) {
934 routes.insert(tool_id.clone(), candidate.route);
935 tools.push(candidate.tool);
936 continue;
937 }
938
939 let existing = routes
940 .get(&tool_id)
941 .cloned()
942 .unwrap_or_else(|| candidate.route.clone());
943
944 conflicts.push(RouteConflictRecord {
946 tool_id,
947 kept: existing,
948 dropped: candidate.route,
949 });
950 }
951
952 RouteInventorySnapshot {
953 version,
954 tools,
955 routes,
956 conflicts,
957 }
958}
959
960async fn with_transient_retry<T, F, Fut>(
961 mut operation: F,
962 max_retries: usize,
963) -> Result<T, KontextDevError>
964where
965 F: FnMut() -> Fut,
966 Fut: Future<Output = Result<T, KontextDevError>>,
967{
968 for attempt in 0..=max_retries {
969 match operation().await {
970 Ok(value) => return Ok(value),
971 Err(err) => {
972 if attempt >= max_retries || !is_transient_error(&err) {
973 return Err(err);
974 }
975 tokio::time::sleep(Duration::from_millis(RETRY_DELAY_MS)).await;
976 }
977 }
978 }
979
980 Err(KontextDevError::ConnectSession {
981 message: "transient retry loop exhausted unexpectedly".to_string(),
982 })
983}
984
985fn is_transient_error(err: &KontextDevError) -> bool {
986 match err {
987 KontextDevError::ConnectSession { message }
988 | KontextDevError::TokenExchange { message, .. }
989 | KontextDevError::TokenRequest { message, .. } => {
990 let lower = message.to_ascii_lowercase();
991 lower.contains("429")
992 || lower.contains("500")
993 || lower.contains("502")
994 || lower.contains("503")
995 || lower.contains("504")
996 || lower.contains("timed out")
997 || lower.contains("connection reset")
998 || lower.contains("broken pipe")
999 || lower.contains("temporarily unavailable")
1000 || lower.contains("econnreset")
1001 || lower.contains("network")
1002 }
1003 _ => false,
1004 }
1005}
1006
1007fn is_authorization_error(err: &KontextDevError) -> bool {
1008 match err {
1009 KontextDevError::OAuthCallbackTimeout { .. }
1010 | KontextDevError::OAuthCallbackCancelled
1011 | KontextDevError::MissingAuthorizationCode
1012 | KontextDevError::OAuthCallbackError { .. }
1013 | KontextDevError::InvalidOAuthState
1014 | KontextDevError::TokenRequest { .. }
1015 | KontextDevError::TokenExchange { .. } => true,
1016 KontextDevError::ConnectSession { message }
1017 | KontextDevError::IntegrationOAuthInit { message } => {
1018 let lower = message.to_ascii_lowercase();
1019 lower.contains("401")
1020 || lower.contains("unauthorized")
1021 || lower.contains("invalid token")
1022 || lower.contains("authorization")
1023 || lower.contains("access token expired")
1024 }
1025 _ => false,
1026 }
1027}
1028
/// Reports whether `err` is the `TokenExchange` variant.
fn is_token_exchange_error(err: &KontextDevError) -> bool {
    matches!(err, KontextDevError::TokenExchange { .. })
}
1032
/// Reports whether `err` is an authorization error or a token exchange error.
// NOTE(review): `is_authorization_error` already matches `TokenExchange`, so
// the second operand looks redundant today; confirm before simplifying.
fn is_auth_recovery_required(err: &KontextDevError) -> bool {
    is_authorization_error(err) || is_token_exchange_error(err)
}
1036
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a gateway-sourced route for `tool_id` on `backend_id`.
    fn route(tool_id: &str, backend_id: &str) -> ToolRouteRecord {
        ToolRouteRecord {
            tool_id: tool_id.to_string(),
            backend_id: backend_id.to_string(),
            source: BackendSource::Gateway,
            backend_tool_id: tool_id.to_string(),
            integration_id: None,
        }
    }

    /// Builds a minimal candidate whose tool and route share `tool_id`.
    fn candidate(tool_id: &str, backend_id: &str) -> RouteInventoryCandidate {
        RouteInventoryCandidate {
            tool: KontextTool {
                id: tool_id.to_string(),
                name: tool_id.to_string(),
                description: None,
                input_schema: None,
                server: None,
            },
            route: route(tool_id, backend_id),
        }
    }

    /// Builds an internal-credentials integration record with the given id.
    fn internal_integration(id: &str) -> RuntimeIntegrationRecord {
        RuntimeIntegrationRecord {
            id: id.to_string(),
            name: id.to_ascii_uppercase(),
            url: format!("https://{id}"),
            category: RuntimeIntegrationCategory::InternalMcpCredentials,
            connect_type: crate::mcp::RuntimeIntegrationConnectType::Credentials,
            auth_mode: None,
            credential_schema: None,
            requires_oauth: None,
            connection: None,
        }
    }

    #[test]
    fn build_route_inventory_keeps_first_route_on_conflict() {
        let snapshot = build_route_inventory(
            1,
            vec![
                candidate("tool-a", "gateway"),
                candidate("tool-a", "internal:linear"),
                candidate("tool-b", "gateway"),
            ],
        );

        assert_eq!(snapshot.tools.len(), 2);
        assert_eq!(snapshot.routes.get("tool-a").unwrap().backend_id, "gateway");
        assert_eq!(snapshot.conflicts.len(), 1);
        assert_eq!(snapshot.conflicts[0].kept.backend_id, "gateway");
        assert_eq!(snapshot.conflicts[0].dropped.backend_id, "internal:linear");
    }

    #[test]
    fn inventory_build_key_is_stable_for_same_runtime_integrations() {
        let runtime = vec![internal_integration("b"), internal_integration("a")];
        let reordered = vec![internal_integration("a"), internal_integration("b")];

        let first = inventory_build_key(Some(ToolsListOptions { limit: Some(10) }), Some(&runtime));
        let second =
            inventory_build_key(Some(ToolsListOptions { limit: Some(10) }), Some(&runtime));
        let from_reordered =
            inventory_build_key(Some(ToolsListOptions { limit: Some(10) }), Some(&reordered));

        assert_eq!(first, second);
        // The key must not depend on the order in which integrations arrive.
        assert_eq!(first, from_reordered);
        assert_eq!(first, "limit:10|runtime:a,b");
    }

    #[test]
    fn inventory_build_key_distinguishes_none_from_zero_limit() {
        let no_limit = inventory_build_key(None, None);
        let zero_limit = inventory_build_key(Some(ToolsListOptions { limit: Some(0) }), None);

        assert_ne!(no_limit, zero_limit);
        assert_eq!(no_limit, "limit:none|runtime:");
        assert_eq!(zero_limit, "limit:0|runtime:");
    }

    #[test]
    fn transient_error_detection_matches_network_and_5xx() {
        let network = KontextDevError::ConnectSession {
            message: "connection reset by peer".to_string(),
        };
        let server_500 = KontextDevError::ConnectSession {
            message: "500 internal server error".to_string(),
        };
        let bad_request = KontextDevError::ConnectSession {
            message: "400 bad request".to_string(),
        };

        assert!(is_transient_error(&network));
        assert!(is_transient_error(&server_500));
        assert!(!is_transient_error(&bad_request));
    }
}