1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::future::Future;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde_json::{Map, Value};
7use tokio::sync::{Mutex, RwLock};
8
9use crate::KontextDevError;
10use crate::client::{
11 ClientState, ConnectSessionResult, IntegrationInfo, KontextClient, KontextClientConfig,
12 ToolResult, create_kontext_client,
13};
14use crate::mcp::{
15 KontextMcp, KontextMcpConfig, KontextTool, RuntimeIntegrationCategory,
16 RuntimeIntegrationRecord, extract_text_content,
17};
18
19const RETRY_DELAY_MS: u64 = 250;
20const INTERNAL_TOKEN_SKEW_SECS: u64 = 30;
21
22#[derive(Clone, Debug)]
23pub struct ToolRouteRecord {
24 pub tool_id: String,
25 pub backend_id: String,
26 pub source: BackendSource,
27 pub backend_tool_id: String,
28 pub integration_id: Option<String>,
29}
30
31#[derive(Clone, Copy, Debug, PartialEq, Eq)]
32pub enum BackendSource {
33 Gateway,
34 Internal,
35}
36
37#[derive(Clone, Debug)]
38pub struct RouteConflictRecord {
39 pub tool_id: String,
40 pub kept: ToolRouteRecord,
41 pub dropped: ToolRouteRecord,
42}
43
44#[derive(Clone, Debug)]
45struct RouteInventoryCandidate {
46 tool: KontextTool,
47 route: ToolRouteRecord,
48}
49
50#[derive(Clone, Debug)]
51struct RouteInventorySnapshot {
52 version: u64,
53 tools: Vec<KontextTool>,
54 routes: HashMap<String, ToolRouteRecord>,
55 #[allow(dead_code)]
56 conflicts: Vec<RouteConflictRecord>,
57}
58
59impl RouteInventorySnapshot {
60 fn empty() -> Self {
61 Self {
62 version: 0,
63 tools: Vec::new(),
64 routes: HashMap::new(),
65 conflicts: Vec::new(),
66 }
67 }
68}
69
70#[derive(Clone, Debug)]
71struct InternalResourceToken {
72 access_token: String,
73 expires_at: Option<Instant>,
74}
75
76impl InternalResourceToken {
77 fn is_fresh(&self) -> bool {
78 match self.expires_at {
79 Some(expires_at) => {
80 let skew = Duration::from_secs(INTERNAL_TOKEN_SKEW_SECS);
81 Instant::now()
82 .checked_add(skew)
83 .is_some_and(|v| v < expires_at)
84 }
85 None => false,
86 }
87 }
88}
89
90pub type KontextOrchestratorState = ClientState;
91pub type KontextOrchestratorConfig = KontextClientConfig;
92
93#[derive(Clone)]
94pub struct KontextOrchestrator {
95 state: Arc<RwLock<KontextOrchestratorState>>,
96 config: KontextOrchestratorConfig,
97 gateway_client: KontextClient,
98 route_inventory: Arc<RwLock<RouteInventorySnapshot>>,
99 build_lock: Arc<Mutex<()>>,
100 last_build_key: Arc<RwLock<Option<String>>>,
101 version_counter: Arc<Mutex<u64>>,
102 reset_generation: Arc<Mutex<u64>>,
103 build_run_counter: Arc<Mutex<u64>>,
104 latest_committed_build_run: Arc<Mutex<u64>>,
105 runtime_integrations: Arc<RwLock<HashMap<String, RuntimeIntegrationRecord>>>,
106 internal_mcps: Arc<RwLock<HashMap<String, KontextMcp>>>,
107 internal_token_cache: Arc<Mutex<HashMap<String, InternalResourceToken>>>,
108 internal_token_locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
109 managed_internal_ops: Arc<Mutex<HashMap<String, u32>>>,
110 resolved_internal_listings: Arc<RwLock<HashSet<String>>>,
111 integration_refresh_lock: Arc<Mutex<()>>,
112}
113
114impl KontextOrchestrator {
115 pub fn new(config: KontextOrchestratorConfig) -> Self {
116 let gateway_client = create_kontext_client(config.clone());
117
118 Self {
119 state: Arc::new(RwLock::new(ClientState::Idle)),
120 config,
121 gateway_client,
122 route_inventory: Arc::new(RwLock::new(RouteInventorySnapshot::empty())),
123 build_lock: Arc::new(Mutex::new(())),
124 last_build_key: Arc::new(RwLock::new(None)),
125 version_counter: Arc::new(Mutex::new(0)),
126 reset_generation: Arc::new(Mutex::new(0)),
127 build_run_counter: Arc::new(Mutex::new(0)),
128 latest_committed_build_run: Arc::new(Mutex::new(0)),
129 runtime_integrations: Arc::new(RwLock::new(HashMap::new())),
130 internal_mcps: Arc::new(RwLock::new(HashMap::new())),
131 internal_token_cache: Arc::new(Mutex::new(HashMap::new())),
132 internal_token_locks: Arc::new(Mutex::new(HashMap::new())),
133 managed_internal_ops: Arc::new(Mutex::new(HashMap::new())),
134 resolved_internal_listings: Arc::new(RwLock::new(HashSet::new())),
135 integration_refresh_lock: Arc::new(Mutex::new(())),
136 }
137 }
138
139 pub async fn state(&self) -> KontextOrchestratorState {
140 *self.state.read().await
141 }
142
143 pub fn mcp(&self) -> &KontextMcp {
144 self.gateway_client.mcp()
145 }
146
147 pub async fn connect(&self) -> Result<(), KontextDevError> {
148 self.ensure_connected().await
149 }
150
151 pub async fn disconnect(&self) {
152 self.dispose_internal_clients().await;
153 self.runtime_integrations.write().await.clear();
154 self.reset_inventory().await;
155 self.gateway_client.disconnect().await;
156 self.internal_token_cache.lock().await.clear();
157 *self.state.write().await = ClientState::Idle;
158 }
159
160 pub async fn get_connect_page_url(&self) -> Result<ConnectSessionResult, KontextDevError> {
161 self.ensure_connected().await?;
162 self.gateway_client.get_connect_page_url().await
163 }
164
165 pub async fn sign_in(&self) -> Result<(), KontextDevError> {
166 self.ensure_connected().await
168 }
169
170 pub async fn sign_out(&self) -> Result<(), KontextDevError> {
171 self.dispose_internal_clients().await;
172 self.runtime_integrations.write().await.clear();
173 self.reset_inventory().await;
174 self.internal_token_cache.lock().await.clear();
175 self.gateway_client.sign_out().await?;
176 *self.state.write().await = ClientState::Idle;
177 Ok(())
178 }
179
180 pub async fn integrations_list(&self) -> Result<Vec<IntegrationInfo>, KontextDevError> {
181 self.ensure_connected().await?;
182
183 let discovered = self.refresh_integration_inventory().await?;
184
185 let gateway_statuses = self
186 .gateway_client
187 .integrations_list()
188 .await
189 .unwrap_or_default();
190 let gateway_by_id = gateway_statuses
191 .into_iter()
192 .map(|item| (item.id.clone(), item))
193 .collect::<HashMap<_, _>>();
194
195 let needs_internal_connect_link = discovered.iter().any(|integration| {
196 integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
197 && !integration
198 .connection
199 .as_ref()
200 .map(|c| c.connected)
201 .unwrap_or(false)
202 });
203
204 let mut shared_connect_url = None;
205 if needs_internal_connect_link
206 && let Ok(connect) = self.gateway_client.get_connect_page_url().await
207 {
208 shared_connect_url = Some(connect.connect_url);
209 }
210
211 Ok(discovered
212 .into_iter()
213 .map(|integration| {
214 let gateway_status = gateway_by_id.get(&integration.id);
215 let connected = gateway_status
216 .map(|item| item.connected)
217 .unwrap_or_else(|| {
218 integration
219 .connection
220 .as_ref()
221 .map(|c| c.connected)
222 .unwrap_or(false)
223 });
224 let connect_url = gateway_status
225 .and_then(|item| item.connect_url.clone())
226 .or_else(|| {
227 (!connected
228 && integration.category
229 == RuntimeIntegrationCategory::InternalMcpCredentials)
230 .then(|| shared_connect_url.clone())
231 .flatten()
232 });
233 let reason = gateway_status
234 .and_then(|item| item.reason.clone())
235 .or_else(|| {
236 (!connected
237 && integration.category
238 == RuntimeIntegrationCategory::InternalMcpCredentials)
239 .then_some("credentials_required".to_string())
240 });
241
242 IntegrationInfo {
243 id: integration.id,
244 name: integration.name,
245 connected,
246 connect_url,
247 reason,
248 }
249 })
250 .collect())
251 }
252
253 pub async fn tools_list(&self) -> Result<Vec<KontextTool>, KontextDevError> {
254 self.tools_list_with_options(None).await
255 }
256
257 pub async fn tools_list_with_options(
258 &self,
259 options: Option<ToolsListOptions>,
260 ) -> Result<Vec<KontextTool>, KontextDevError> {
261 self.ensure_connected().await?;
262
263 let mut inventory = self.build_tool_inventory(options.clone(), None).await?;
264
265 let missing_internal = self.get_internal_integrations_missing_tools().await;
266 if !missing_internal.is_empty() {
267 let refreshed = self.refresh_integration_inventory().await?;
268 inventory = self.build_tool_inventory(options, Some(refreshed)).await?;
269 }
270
271 let unresolved_internal = self.get_internal_integrations_missing_tools().await;
272 if self.state().await == ClientState::NeedsAuth
273 && self.gateway_client.state().await == ClientState::Ready
274 && unresolved_internal.is_empty()
275 {
276 *self.state.write().await = ClientState::Ready;
277 }
278
279 Ok(inventory.tools)
280 }
281
282 pub async fn tools_execute(
283 &self,
284 tool_id: &str,
285 args: Option<Map<String, Value>>,
286 ) -> Result<ToolResult, KontextDevError> {
287 self.ensure_connected().await?;
288
289 let route = self.route_for_execution(tool_id).await?;
290
291 match route.source {
292 BackendSource::Gateway => {
293 self.gateway_client
294 .tools_execute(route.backend_tool_id.as_str(), args)
295 .await
296 }
297 BackendSource::Internal => {
298 let Some(integration_id) = route.integration_id else {
299 return Err(KontextDevError::ConnectSession {
300 message: format!(
301 "route for tool `{tool_id}` is missing integration metadata"
302 ),
303 });
304 };
305
306 let integration =
307 self.integration_by_id(&integration_id)
308 .await?
309 .ok_or_else(|| KontextDevError::ConnectSession {
310 message: format!(
311 "internal integration `{integration_id}` is no longer attached"
312 ),
313 })?;
314
315 let internal_mcp = self.internal_mcp_for(&integration).await.ok_or_else(|| {
316 KontextDevError::ConnectSession {
317 message: format!(
318 "internal integration `{integration_id}` has no MCP client"
319 ),
320 }
321 })?;
322
323 let token = match self
324 .ensure_internal_resource_token(&integration, false)
325 .await
326 {
327 Ok(token) => token,
328 Err(err) if is_auth_recovery_required(&err) => {
329 self.ensure_internal_resource_token(&integration, true)
330 .await?
331 }
332 Err(err) => return Err(err),
333 };
334
335 let integration_id_for_exec = integration_id.clone();
336 let execute_once = || {
337 let integration_id = integration_id_for_exec.clone();
338 let internal_mcp = internal_mcp.clone();
339 let token = token.clone();
340 let backend_tool_id = route.backend_tool_id.clone();
341 let args = args.clone();
342 async move {
343 self.run_managed_internal_op(integration_id.as_str(), || async {
344 let raw = internal_mcp
345 .call_tool_with_access_token(
346 token.as_str(),
347 backend_tool_id.as_str(),
348 args,
349 )
350 .await?;
351 Ok(ToolResult {
352 content: extract_text_content(&raw),
353 raw,
354 })
355 })
356 .await
357 }
358 };
359
360 match with_transient_retry(execute_once, 1).await {
361 Ok(result) => Ok(result),
362 Err(err) => {
363 if is_auth_recovery_required(&err) {
364 *self.state.write().await = ClientState::NeedsAuth;
365 }
366 Err(err)
367 }
368 }
369 }
370 }
371 }
372
373 async fn ensure_connected(&self) -> Result<(), KontextDevError> {
374 let state = self.state().await;
375 if state == ClientState::Ready {
376 return Ok(());
377 }
378
379 *self.state.write().await = ClientState::Connecting;
380
381 match self.gateway_client.connect().await {
382 Ok(()) => {
383 if let Err(err) = self.refresh_integration_inventory().await {
384 if is_auth_recovery_required(&err) {
385 *self.state.write().await = ClientState::NeedsAuth;
386 } else {
387 *self.state.write().await = ClientState::Failed;
388 }
389 return Err(err);
390 }
391 *self.state.write().await = ClientState::Ready;
392 Ok(())
393 }
394 Err(err) => {
395 if is_auth_recovery_required(&err) {
396 *self.state.write().await = ClientState::NeedsAuth;
397 } else {
398 *self.state.write().await = ClientState::Failed;
399 }
400 Err(err)
401 }
402 }
403 }
404
405 async fn refresh_integration_inventory(
406 &self,
407 ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
408 let _guard = self.integration_refresh_lock.lock().await;
409
410 let list_once = || async { self.gateway_client.mcp().list_integrations().await };
411 let items = with_transient_retry(list_once, 1).await?;
412 self.apply_runtime_integrations(&items).await;
413 Ok(items)
414 }
415
416 async fn apply_runtime_integrations(&self, integrations: &[RuntimeIntegrationRecord]) {
417 let mut runtime = self.runtime_integrations.write().await;
418 runtime.clear();
419
420 for integration in integrations {
421 runtime.insert(integration.id.clone(), integration.clone());
422 }
423 drop(runtime);
424
425 let desired = integrations
426 .iter()
427 .filter(|integration| {
428 integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
429 })
430 .map(|integration| (integration.id.clone(), integration.url.clone()))
431 .collect::<BTreeMap<_, _>>();
432
433 let mut internal_mcps = self.internal_mcps.write().await;
434
435 internal_mcps.retain(|integration_id, mcp| {
436 desired
437 .get(integration_id)
438 .is_some_and(|url| mcp.mcp_url().ok().as_deref() == Some(url.as_str()))
439 });
440
441 for integration in integrations {
442 if integration.category != RuntimeIntegrationCategory::InternalMcpCredentials {
443 continue;
444 }
445 if internal_mcps.contains_key(&integration.id) {
446 continue;
447 }
448
449 internal_mcps.insert(
450 integration.id.clone(),
451 KontextMcp::new(KontextMcpConfig {
452 client_session_id: self.config.client_session_id.clone(),
453 client_id: self.config.client_id.clone(),
454 redirect_uri: self.config.redirect_uri.clone(),
455 url: Some(integration.url.clone()),
456 server: self.config.server_url.clone(),
457 client_secret: self.config.client_secret.clone(),
458 scope: self.config.scope.clone(),
459 resource: Some(integration.url.clone()),
460 session_key: Some(format!("internal:{}", integration.id)),
461 integration_ui_url: self.config.integration_ui_url.clone(),
462 integration_return_to: self.config.integration_return_to.clone(),
463 auth_timeout_seconds: self.config.auth_timeout_seconds,
464 open_connect_page_on_login: Some(false),
465 token_cache_path: None,
466 }),
467 );
468 }
469 }
470
471 async fn internal_mcp_for(&self, integration: &RuntimeIntegrationRecord) -> Option<KontextMcp> {
472 self.internal_mcps
473 .read()
474 .await
475 .get(&integration.id)
476 .cloned()
477 }
478
479 async fn integration_by_id(
480 &self,
481 integration_id: &str,
482 ) -> Result<Option<RuntimeIntegrationRecord>, KontextDevError> {
483 if let Some(existing) = self
484 .runtime_integrations
485 .read()
486 .await
487 .get(integration_id)
488 .cloned()
489 {
490 return Ok(Some(existing));
491 }
492
493 let refreshed = self.refresh_integration_inventory().await?;
494 Ok(refreshed.into_iter().find(|item| item.id == integration_id))
495 }
496
497 async fn ensure_internal_resource_token(
498 &self,
499 integration: &RuntimeIntegrationRecord,
500 force_exchange: bool,
501 ) -> Result<String, KontextDevError> {
502 if !force_exchange
503 && let Some(existing) = self
504 .internal_token_cache
505 .lock()
506 .await
507 .get(&integration.id)
508 .cloned()
509 && existing.is_fresh()
510 {
511 return Ok(existing.access_token);
512 }
513
514 let integration_lock = {
515 let mut locks = self.internal_token_locks.lock().await;
516 locks
517 .entry(integration.id.clone())
518 .or_insert_with(|| Arc::new(Mutex::new(())))
519 .clone()
520 };
521
522 let _guard = integration_lock.lock().await;
523
524 if !force_exchange
525 && let Some(existing) = self
526 .internal_token_cache
527 .lock()
528 .await
529 .get(&integration.id)
530 .cloned()
531 && existing.is_fresh()
532 {
533 return Ok(existing.access_token);
534 }
535
536 let session = self.gateway_client.mcp().authenticate_mcp().await?;
537 let exchanged = self
538 .gateway_client
539 .mcp()
540 .client()
541 .exchange_for_resource(&session.gateway_token.access_token, &integration.url, None)
542 .await?;
543
544 let expires_at = exchanged.expires_in.and_then(|seconds| {
545 (seconds > 0)
546 .then(|| Instant::now().checked_add(Duration::from_secs(seconds as u64)))
547 .flatten()
548 });
549
550 let token = InternalResourceToken {
551 access_token: exchanged.access_token,
552 expires_at,
553 };
554
555 let value = token.access_token.clone();
556 self.internal_token_cache
557 .lock()
558 .await
559 .insert(integration.id.clone(), token);
560 Ok(value)
561 }
562
563 async fn build_inventory_candidates(
564 &self,
565 options: Option<ToolsListOptions>,
566 runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
567 ) -> Result<Vec<RouteInventoryCandidate>, KontextDevError> {
568 self.resolved_internal_listings.write().await.clear();
569
570 let mut candidates = Vec::new();
571
572 let list_gateway_tools_once = || {
573 let gateway_client = self.gateway_client.clone();
574 async move { gateway_client.tools_list().await }
575 };
576 let gateway_tools = with_transient_retry(list_gateway_tools_once, 1)
577 .await?
578 .into_iter()
579 .take(
580 options
581 .as_ref()
582 .and_then(|value| value.limit)
583 .unwrap_or(u32::MAX) as usize,
584 )
585 .collect::<Vec<_>>();
586 for tool in gateway_tools {
587 let route = ToolRouteRecord {
588 tool_id: tool.id.clone(),
589 backend_id: gateway_backend_id(),
590 source: BackendSource::Gateway,
591 backend_tool_id: tool.id.clone(),
592 integration_id: None,
593 };
594 candidates.push(RouteInventoryCandidate { tool, route });
595 }
596
597 let integrations = if let Some(runtime_integrations) = runtime_integrations_override {
598 self.apply_runtime_integrations(&runtime_integrations).await;
599 runtime_integrations
600 } else {
601 self.refresh_integration_inventory().await?
602 };
603
604 let mut internal_integrations = integrations
605 .into_iter()
606 .filter(|integration| {
607 integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
608 })
609 .collect::<Vec<_>>();
610 internal_integrations.sort_by(|a, b| a.id.cmp(&b.id));
611
612 let mut pending_auth_retries = Vec::new();
613
614 for integration in &internal_integrations {
615 match self
616 .add_internal_tools_for_integration(&mut candidates, integration, false)
617 .await
618 {
619 Ok(()) => {}
620 Err(err) => {
621 if is_authorization_error(&err) {
622 pending_auth_retries.push(integration.clone());
623 continue;
624 }
625 if is_token_exchange_error(&err) {
626 *self.state.write().await = ClientState::NeedsAuth;
627 }
628 }
629 }
630 }
631
632 let mut unresolved_internal_auth = false;
633 for integration in pending_auth_retries {
634 match self
635 .add_internal_tools_for_integration(&mut candidates, &integration, true)
636 .await
637 {
638 Ok(()) => {}
639 Err(err) => {
640 if is_auth_recovery_required(&err) {
641 unresolved_internal_auth = true;
642 continue;
643 }
644 }
645 }
646 }
647
648 if unresolved_internal_auth {
649 *self.state.write().await = ClientState::NeedsAuth;
650 } else if self.state().await == ClientState::NeedsAuth
651 && self.gateway_client.state().await == ClientState::Ready
652 {
653 *self.state.write().await = ClientState::Ready;
654 }
655
656 Ok(candidates)
657 }
658
659 async fn add_internal_tools_for_integration(
660 &self,
661 candidates: &mut Vec<RouteInventoryCandidate>,
662 integration: &RuntimeIntegrationRecord,
663 force_exchange: bool,
664 ) -> Result<(), KontextDevError> {
665 let Some(internal_mcp) = self.internal_mcp_for(integration).await else {
666 return Ok(());
667 };
668
669 let token = self
670 .ensure_internal_resource_token(integration, force_exchange)
671 .await?;
672
673 let list_once = || {
674 let integration_id = integration.id.clone();
675 let internal_mcp = internal_mcp.clone();
676 let token = token.clone();
677 async move {
678 self.run_managed_internal_op(&integration_id, || async {
679 internal_mcp
680 .list_tools_with_access_token(token.as_str())
681 .await
682 })
683 .await
684 }
685 };
686
687 let tools = with_transient_retry(list_once, 1).await?;
688
689 self.resolved_internal_listings
690 .write()
691 .await
692 .insert(integration.id.clone());
693
694 for tool in tools {
695 let backend_tool_id = if tool.id.is_empty() {
696 tool.name.clone()
697 } else {
698 tool.id.clone()
699 };
700 let unified_id = format!("{}:{}", integration.id, tool.name);
701
702 let mapped = KontextTool {
703 id: unified_id.clone(),
704 name: tool.name,
705 description: tool.description,
706 input_schema: tool.input_schema,
707 server: Some(crate::mcp::KontextToolServer {
708 id: integration.id.clone(),
709 name: Some(integration.name.clone()),
710 }),
711 };
712
713 let route = ToolRouteRecord {
714 tool_id: unified_id,
715 backend_id: internal_backend_id(&integration.id),
716 source: BackendSource::Internal,
717 backend_tool_id,
718 integration_id: Some(integration.id.clone()),
719 };
720
721 candidates.push(RouteInventoryCandidate {
722 tool: mapped,
723 route,
724 });
725 }
726
727 Ok(())
728 }
729
730 async fn run_managed_internal_op<T, F, Fut>(
731 &self,
732 integration_id: &str,
733 operation: F,
734 ) -> Result<T, KontextDevError>
735 where
736 F: FnOnce() -> Fut,
737 Fut: Future<Output = Result<T, KontextDevError>>,
738 {
739 {
740 let mut managed = self.managed_internal_ops.lock().await;
741 *managed.entry(integration_id.to_string()).or_insert(0) += 1;
742 }
743
744 let result = operation().await;
745
746 {
747 let mut managed = self.managed_internal_ops.lock().await;
748 if let Some(count) = managed.get_mut(integration_id) {
749 *count = count.saturating_sub(1);
750 if *count == 0 {
751 managed.remove(integration_id);
752 }
753 }
754 }
755
756 result
757 }
758
759 async fn build_tool_inventory(
760 &self,
761 options: Option<ToolsListOptions>,
762 runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
763 ) -> Result<RouteInventorySnapshot, KontextDevError> {
764 let key = inventory_build_key(options.clone(), runtime_integrations_override.as_ref());
765
766 let _guard = self.build_lock.lock().await;
767
768 {
769 let last_key = self.last_build_key.read().await;
770 let snapshot = self.route_inventory.read().await;
771 if snapshot.version > 0 && last_key.as_deref() == Some(key.as_str()) {
772 return Ok(snapshot.clone());
773 }
774 }
775
776 let build_generation = *self.reset_generation.lock().await;
777 let build_run = {
778 let mut counter = self.build_run_counter.lock().await;
779 *counter += 1;
780 *counter
781 };
782
783 let candidates = self
784 .build_inventory_candidates(options, runtime_integrations_override)
785 .await?;
786
787 if *self.reset_generation.lock().await != build_generation {
788 return Ok(self.route_inventory.read().await.clone());
789 }
790
791 let latest_committed = *self.latest_committed_build_run.lock().await;
792 let snapshot = if build_run >= latest_committed {
793 *self.latest_committed_build_run.lock().await = build_run;
794 let mut version = self.version_counter.lock().await;
795 *version += 1;
796 let built = build_route_inventory(*version, candidates);
797 *self.route_inventory.write().await = built.clone();
798 *self.last_build_key.write().await = Some(key);
799 built
800 } else {
801 let current_version = self.route_inventory.read().await.version;
802 build_route_inventory(current_version, candidates)
803 };
804
805 Ok(snapshot)
806 }
807
808 async fn route_for_execution(&self, tool_id: &str) -> Result<ToolRouteRecord, KontextDevError> {
809 if let Some(route) = self
810 .route_inventory
811 .read()
812 .await
813 .routes
814 .get(tool_id)
815 .cloned()
816 {
817 return Ok(route);
818 }
819
820 let _ = self.build_tool_inventory(None, None).await?;
821
822 if let Some(route) = self
823 .route_inventory
824 .read()
825 .await
826 .routes
827 .get(tool_id)
828 .cloned()
829 {
830 return Ok(route);
831 }
832
833 Err(KontextDevError::ConnectSession {
834 message: format!(
835 "unknown tool `{tool_id}`. Call tools.list() to refresh available tools"
836 ),
837 })
838 }
839
840 async fn get_internal_integrations_missing_tools(&self) -> Vec<String> {
841 let resolved = self.resolved_internal_listings.read().await.clone();
842 self.internal_mcps
843 .read()
844 .await
845 .keys()
846 .filter(|integration_id| !resolved.contains(*integration_id))
847 .cloned()
848 .collect()
849 }
850
851 async fn dispose_internal_clients(&self) {
852 self.internal_mcps.write().await.clear();
853 self.internal_token_cache.lock().await.clear();
854 self.internal_token_locks.lock().await.clear();
855 self.resolved_internal_listings.write().await.clear();
856 }
857
858 async fn reset_inventory(&self) {
859 *self.reset_generation.lock().await += 1;
860 *self.version_counter.lock().await = 0;
861
862 let build_run = *self.build_run_counter.lock().await;
863 *self.latest_committed_build_run.lock().await = build_run;
864
865 *self.route_inventory.write().await = RouteInventorySnapshot::empty();
866 *self.last_build_key.write().await = None;
867 }
868}
869
870#[derive(Clone, Debug)]
871pub struct ToolsListOptions {
872 pub limit: Option<u32>,
873}
874
875pub fn create_kontext_orchestrator(config: KontextOrchestratorConfig) -> KontextOrchestrator {
876 KontextOrchestrator::new(config)
877}
878
879fn gateway_backend_id() -> String {
880 "gateway".to_string()
881}
882
883fn internal_backend_id(integration_id: &str) -> String {
884 format!("internal:{integration_id}")
885}
886
887fn inventory_build_key(
888 options: Option<ToolsListOptions>,
889 runtime_integrations: Option<&Vec<RuntimeIntegrationRecord>>,
890) -> String {
891 let limit_key = match options.and_then(|item| item.limit) {
892 Some(limit) => limit.to_string(),
893 None => "none".to_string(),
894 };
895 let mut key = format!("limit:{limit_key}");
896 key.push_str("|runtime:");
897 if let Some(integrations) = runtime_integrations {
898 let mut ids = integrations
899 .iter()
900 .map(|integration| integration.id.clone())
901 .collect::<Vec<_>>();
902 ids.sort();
903 key.push_str(ids.join(",").as_str());
904 }
905 key
906}
907
908fn build_route_inventory(
909 version: u64,
910 candidates: Vec<RouteInventoryCandidate>,
911) -> RouteInventorySnapshot {
912 let mut tools = Vec::<KontextTool>::new();
913 let mut routes = HashMap::<String, ToolRouteRecord>::new();
914 let mut conflicts = Vec::<RouteConflictRecord>::new();
915
916 for candidate in candidates {
917 let tool_id = candidate.route.tool_id.clone();
918 if !routes.contains_key(&tool_id) {
919 routes.insert(tool_id.clone(), candidate.route);
920 tools.push(candidate.tool);
921 continue;
922 }
923
924 let existing = routes
925 .get(&tool_id)
926 .cloned()
927 .unwrap_or_else(|| candidate.route.clone());
928
929 conflicts.push(RouteConflictRecord {
931 tool_id,
932 kept: existing,
933 dropped: candidate.route,
934 });
935 }
936
937 RouteInventorySnapshot {
938 version,
939 tools,
940 routes,
941 conflicts,
942 }
943}
944
945async fn with_transient_retry<T, F, Fut>(
946 mut operation: F,
947 max_retries: usize,
948) -> Result<T, KontextDevError>
949where
950 F: FnMut() -> Fut,
951 Fut: Future<Output = Result<T, KontextDevError>>,
952{
953 for attempt in 0..=max_retries {
954 match operation().await {
955 Ok(value) => return Ok(value),
956 Err(err) => {
957 if attempt >= max_retries || !is_transient_error(&err) {
958 return Err(err);
959 }
960 tokio::time::sleep(Duration::from_millis(RETRY_DELAY_MS)).await;
961 }
962 }
963 }
964
965 Err(KontextDevError::ConnectSession {
966 message: "transient retry loop exhausted unexpectedly".to_string(),
967 })
968}
969
970fn is_transient_error(err: &KontextDevError) -> bool {
971 match err {
972 KontextDevError::ConnectSession { message }
973 | KontextDevError::TokenExchange { message, .. }
974 | KontextDevError::TokenRequest { message, .. } => {
975 let lower = message.to_ascii_lowercase();
976 lower.contains("429")
977 || lower.contains("500")
978 || lower.contains("502")
979 || lower.contains("503")
980 || lower.contains("504")
981 || lower.contains("timed out")
982 || lower.contains("connection reset")
983 || lower.contains("broken pipe")
984 || lower.contains("temporarily unavailable")
985 || lower.contains("econnreset")
986 || lower.contains("network")
987 }
988 _ => false,
989 }
990}
991
992fn is_authorization_error(err: &KontextDevError) -> bool {
993 match err {
994 KontextDevError::OAuthCallbackTimeout { .. }
995 | KontextDevError::OAuthCallbackCancelled
996 | KontextDevError::MissingAuthorizationCode
997 | KontextDevError::OAuthCallbackError { .. }
998 | KontextDevError::InvalidOAuthState
999 | KontextDevError::TokenRequest { .. }
1000 | KontextDevError::TokenExchange { .. } => true,
1001 KontextDevError::ConnectSession { message }
1002 | KontextDevError::IntegrationOAuthInit { message } => {
1003 let lower = message.to_ascii_lowercase();
1004 lower.contains("401")
1005 || lower.contains("unauthorized")
1006 || lower.contains("invalid token")
1007 || lower.contains("authorization")
1008 || lower.contains("access token expired")
1009 }
1010 _ => false,
1011 }
1012}
1013
1014fn is_token_exchange_error(err: &KontextDevError) -> bool {
1015 matches!(err, KontextDevError::TokenExchange { .. })
1016}
1017
1018fn is_auth_recovery_required(err: &KontextDevError) -> bool {
1019 is_authorization_error(err) || is_token_exchange_error(err)
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024 use super::*;
1025
1026 fn route(tool_id: &str, backend_id: &str) -> ToolRouteRecord {
1027 ToolRouteRecord {
1028 tool_id: tool_id.to_string(),
1029 backend_id: backend_id.to_string(),
1030 source: BackendSource::Gateway,
1031 backend_tool_id: tool_id.to_string(),
1032 integration_id: None,
1033 }
1034 }
1035
1036 fn candidate(tool_id: &str, backend_id: &str) -> RouteInventoryCandidate {
1037 RouteInventoryCandidate {
1038 tool: KontextTool {
1039 id: tool_id.to_string(),
1040 name: tool_id.to_string(),
1041 description: None,
1042 input_schema: None,
1043 server: None,
1044 },
1045 route: route(tool_id, backend_id),
1046 }
1047 }
1048
1049 #[test]
1050 fn build_route_inventory_keeps_first_route_on_conflict() {
1051 let snapshot = build_route_inventory(
1052 1,
1053 vec![
1054 candidate("tool-a", "gateway"),
1055 candidate("tool-a", "internal:linear"),
1056 candidate("tool-b", "gateway"),
1057 ],
1058 );
1059
1060 assert_eq!(snapshot.tools.len(), 2);
1061 assert_eq!(snapshot.routes.get("tool-a").unwrap().backend_id, "gateway");
1062 assert_eq!(snapshot.conflicts.len(), 1);
1063 assert_eq!(snapshot.conflicts[0].kept.backend_id, "gateway");
1064 assert_eq!(snapshot.conflicts[0].dropped.backend_id, "internal:linear");
1065 }
1066
1067 #[test]
1068 fn inventory_build_key_is_stable_for_same_runtime_integrations() {
1069 let runtime = vec![
1070 RuntimeIntegrationRecord {
1071 id: "b".to_string(),
1072 name: "B".to_string(),
1073 url: "https://b".to_string(),
1074 category: RuntimeIntegrationCategory::InternalMcpCredentials,
1075 connect_type: crate::mcp::RuntimeIntegrationConnectType::Credentials,
1076 auth_mode: None,
1077 credential_schema: None,
1078 requires_oauth: None,
1079 connection: None,
1080 },
1081 RuntimeIntegrationRecord {
1082 id: "a".to_string(),
1083 name: "A".to_string(),
1084 url: "https://a".to_string(),
1085 category: RuntimeIntegrationCategory::InternalMcpCredentials,
1086 connect_type: crate::mcp::RuntimeIntegrationConnectType::Credentials,
1087 auth_mode: None,
1088 credential_schema: None,
1089 requires_oauth: None,
1090 connection: None,
1091 },
1092 ];
1093
1094 let first = inventory_build_key(Some(ToolsListOptions { limit: Some(10) }), Some(&runtime));
1095 let second =
1096 inventory_build_key(Some(ToolsListOptions { limit: Some(10) }), Some(&runtime));
1097 assert_eq!(first, second);
1098 }
1099
1100 #[test]
1101 fn inventory_build_key_distinguishes_none_from_zero_limit() {
1102 let no_limit = inventory_build_key(None, None);
1103 let zero_limit = inventory_build_key(Some(ToolsListOptions { limit: Some(0) }), None);
1104
1105 assert_ne!(no_limit, zero_limit);
1106 assert_eq!(no_limit, "limit:none|runtime:");
1107 assert_eq!(zero_limit, "limit:0|runtime:");
1108 }
1109
1110 #[test]
1111 fn transient_error_detection_matches_network_and_5xx() {
1112 let network = KontextDevError::ConnectSession {
1113 message: "connection reset by peer".to_string(),
1114 };
1115 let server_500 = KontextDevError::ConnectSession {
1116 message: "500 internal server error".to_string(),
1117 };
1118 let bad_request = KontextDevError::ConnectSession {
1119 message: "400 bad request".to_string(),
1120 };
1121
1122 assert!(is_transient_error(&network));
1123 assert!(is_transient_error(&server_500));
1124 assert!(!is_transient_error(&bad_request));
1125 }
1126}