1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::future::Future;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde_json::{Map, Value};
7use tokio::sync::{Mutex, RwLock};
8
9use crate::KontextDevError;
10use crate::client::{
11 ClientState, ConnectSessionResult, IntegrationInfo, KontextClient, KontextClientConfig,
12 ToolResult, create_kontext_client,
13};
14use crate::mcp::{
15 KontextMcp, KontextMcpConfig, KontextTool, RuntimeIntegrationCategory,
16 RuntimeIntegrationRecord, extract_text_content,
17};
18
/// Pause, in milliseconds, that `with_transient_retry` sleeps between attempts.
const RETRY_DELAY_MS: u64 = 250;
/// Safety margin, in seconds, that `InternalResourceToken::is_fresh` requires
/// to remain before a cached token's expiry.
const INTERNAL_TOKEN_SKEW_SECS: u64 = 30;
21
22#[derive(Clone, Debug)]
23pub struct ToolRouteRecord {
24 pub tool_id: String,
25 pub backend_id: String,
26 pub source: BackendSource,
27 pub backend_tool_id: String,
28 pub integration_id: Option<String>,
29}
30
/// Kind of backend a tool route points at.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendSource {
    /// Served through the gateway client.
    Gateway,
    /// Served by a per-integration internal MCP client.
    Internal,
}
36
37#[derive(Clone, Debug)]
38pub struct RouteConflictRecord {
39 pub tool_id: String,
40 pub kept: ToolRouteRecord,
41 pub dropped: ToolRouteRecord,
42}
43
44#[derive(Clone, Debug)]
45struct RouteInventoryCandidate {
46 tool: KontextTool,
47 route: ToolRouteRecord,
48}
49
50#[derive(Clone, Debug)]
51struct RouteInventorySnapshot {
52 version: u64,
53 tools: Vec<KontextTool>,
54 routes: HashMap<String, ToolRouteRecord>,
55 #[allow(dead_code)]
56 conflicts: Vec<RouteConflictRecord>,
57}
58
59impl RouteInventorySnapshot {
60 fn empty() -> Self {
61 Self {
62 version: 0,
63 tools: Vec::new(),
64 routes: HashMap::new(),
65 conflicts: Vec::new(),
66 }
67 }
68}
69
70#[derive(Clone, Debug)]
71struct InternalResourceToken {
72 access_token: String,
73 expires_at: Option<Instant>,
74}
75
76impl InternalResourceToken {
77 fn is_fresh(&self) -> bool {
78 match self.expires_at {
79 Some(expires_at) => {
80 let skew = Duration::from_secs(INTERNAL_TOKEN_SKEW_SECS);
81 Instant::now()
82 .checked_add(skew)
83 .is_some_and(|v| v < expires_at)
84 }
85 None => false,
86 }
87 }
88}
89
90pub type KontextOrchestratorState = ClientState;
91pub type KontextOrchestratorConfig = KontextClientConfig;
92
93#[derive(Clone)]
94pub struct KontextOrchestrator {
95 state: Arc<RwLock<KontextOrchestratorState>>,
96 config: KontextOrchestratorConfig,
97 gateway_client: KontextClient,
98 route_inventory: Arc<RwLock<RouteInventorySnapshot>>,
99 build_lock: Arc<Mutex<()>>,
100 last_build_key: Arc<RwLock<Option<String>>>,
101 version_counter: Arc<Mutex<u64>>,
102 reset_generation: Arc<Mutex<u64>>,
103 build_run_counter: Arc<Mutex<u64>>,
104 latest_committed_build_run: Arc<Mutex<u64>>,
105 runtime_integrations: Arc<RwLock<HashMap<String, RuntimeIntegrationRecord>>>,
106 internal_mcps: Arc<RwLock<HashMap<String, KontextMcp>>>,
107 internal_token_cache: Arc<Mutex<HashMap<String, InternalResourceToken>>>,
108 internal_token_locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
109 managed_internal_ops: Arc<Mutex<HashMap<String, u32>>>,
110 resolved_internal_listings: Arc<RwLock<HashSet<String>>>,
111 integration_refresh_lock: Arc<Mutex<()>>,
112}
113
114impl KontextOrchestrator {
115 pub fn new(config: KontextOrchestratorConfig) -> Self {
116 let gateway_client = create_kontext_client(config.clone());
117
118 Self {
119 state: Arc::new(RwLock::new(ClientState::Idle)),
120 config,
121 gateway_client,
122 route_inventory: Arc::new(RwLock::new(RouteInventorySnapshot::empty())),
123 build_lock: Arc::new(Mutex::new(())),
124 last_build_key: Arc::new(RwLock::new(None)),
125 version_counter: Arc::new(Mutex::new(0)),
126 reset_generation: Arc::new(Mutex::new(0)),
127 build_run_counter: Arc::new(Mutex::new(0)),
128 latest_committed_build_run: Arc::new(Mutex::new(0)),
129 runtime_integrations: Arc::new(RwLock::new(HashMap::new())),
130 internal_mcps: Arc::new(RwLock::new(HashMap::new())),
131 internal_token_cache: Arc::new(Mutex::new(HashMap::new())),
132 internal_token_locks: Arc::new(Mutex::new(HashMap::new())),
133 managed_internal_ops: Arc::new(Mutex::new(HashMap::new())),
134 resolved_internal_listings: Arc::new(RwLock::new(HashSet::new())),
135 integration_refresh_lock: Arc::new(Mutex::new(())),
136 }
137 }
138
139 pub async fn state(&self) -> KontextOrchestratorState {
140 *self.state.read().await
141 }
142
143 pub fn mcp(&self) -> &KontextMcp {
144 self.gateway_client.mcp()
145 }
146
147 pub async fn connect(&self) -> Result<(), KontextDevError> {
148 self.ensure_connected().await
149 }
150
151 pub async fn disconnect(&self) {
152 self.dispose_internal_clients().await;
153 self.runtime_integrations.write().await.clear();
154 self.reset_inventory().await;
155 self.gateway_client.disconnect().await;
156 self.internal_token_cache.lock().await.clear();
157 *self.state.write().await = ClientState::Idle;
158 }
159
160 pub async fn get_connect_page_url(&self) -> Result<ConnectSessionResult, KontextDevError> {
161 self.ensure_connected().await?;
162 self.gateway_client.get_connect_page_url().await
163 }
164
165 pub async fn sign_in(&self) -> Result<(), KontextDevError> {
166 self.ensure_connected().await
168 }
169
170 pub async fn sign_out(&self) -> Result<(), KontextDevError> {
171 self.dispose_internal_clients().await;
172 self.runtime_integrations.write().await.clear();
173 self.reset_inventory().await;
174 self.internal_token_cache.lock().await.clear();
175 self.gateway_client.sign_out().await?;
176 *self.state.write().await = ClientState::Idle;
177 Ok(())
178 }
179
180 pub async fn integrations_list(&self) -> Result<Vec<IntegrationInfo>, KontextDevError> {
181 self.ensure_connected().await?;
182
183 let discovered = self.refresh_integration_inventory().await?;
184
185 let gateway_statuses = self
186 .gateway_client
187 .integrations_list()
188 .await
189 .unwrap_or_default();
190 let gateway_by_id = gateway_statuses
191 .into_iter()
192 .map(|item| (item.id.clone(), item))
193 .collect::<HashMap<_, _>>();
194
195 let needs_internal_connect_link = discovered.iter().any(|integration| {
196 integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
197 && !integration
198 .connection
199 .as_ref()
200 .map(|c| c.connected)
201 .unwrap_or(false)
202 });
203
204 let mut shared_connect_url = None;
205 if needs_internal_connect_link
206 && let Ok(connect) = self.gateway_client.get_connect_page_url().await
207 {
208 shared_connect_url = Some(connect.connect_url);
209 }
210
211 Ok(discovered
212 .into_iter()
213 .map(|integration| {
214 let gateway_status = gateway_by_id.get(&integration.id);
215 let connected = gateway_status
216 .map(|item| item.connected)
217 .unwrap_or_else(|| {
218 integration
219 .connection
220 .as_ref()
221 .map(|c| c.connected)
222 .unwrap_or(false)
223 });
224 let connect_url = gateway_status
225 .and_then(|item| item.connect_url.clone())
226 .or_else(|| {
227 (!connected
228 && integration.category
229 == RuntimeIntegrationCategory::InternalMcpCredentials)
230 .then(|| shared_connect_url.clone())
231 .flatten()
232 });
233 let reason = gateway_status
234 .and_then(|item| item.reason.clone())
235 .or_else(|| {
236 (!connected
237 && integration.category
238 == RuntimeIntegrationCategory::InternalMcpCredentials)
239 .then_some("credentials_required".to_string())
240 });
241
242 IntegrationInfo {
243 id: integration.id,
244 name: integration.name,
245 connected,
246 connect_url,
247 reason,
248 }
249 })
250 .collect())
251 }
252
253 pub async fn tools_list(&self) -> Result<Vec<KontextTool>, KontextDevError> {
254 self.tools_list_with_options(None).await
255 }
256
257 pub async fn tools_list_with_options(
258 &self,
259 options: Option<ToolsListOptions>,
260 ) -> Result<Vec<KontextTool>, KontextDevError> {
261 self.ensure_connected().await?;
262
263 let mut inventory = self.build_tool_inventory(options.clone(), None).await?;
264
265 let missing_internal = self.get_internal_integrations_missing_tools().await;
266 if !missing_internal.is_empty() {
267 let refreshed = self.refresh_integration_inventory().await?;
268 inventory = self.build_tool_inventory(options, Some(refreshed)).await?;
269 }
270
271 let unresolved_internal = self.get_internal_integrations_missing_tools().await;
272 if self.state().await == ClientState::NeedsAuth
273 && self.gateway_client.state().await == ClientState::Ready
274 && unresolved_internal.is_empty()
275 {
276 *self.state.write().await = ClientState::Ready;
277 }
278
279 Ok(inventory.tools)
280 }
281
282 pub async fn tools_execute(
283 &self,
284 tool_id: &str,
285 args: Option<Map<String, Value>>,
286 ) -> Result<ToolResult, KontextDevError> {
287 self.ensure_connected().await?;
288
289 let route = self.route_for_execution(tool_id).await?;
290
291 match route.source {
292 BackendSource::Gateway => {
293 self.gateway_client
294 .tools_execute(route.backend_tool_id.as_str(), args)
295 .await
296 }
297 BackendSource::Internal => {
298 let Some(integration_id) = route.integration_id else {
299 return Err(KontextDevError::ConnectSession {
300 message: format!(
301 "route for tool `{tool_id}` is missing integration metadata"
302 ),
303 });
304 };
305
306 let integration =
307 self.integration_by_id(&integration_id)
308 .await?
309 .ok_or_else(|| KontextDevError::ConnectSession {
310 message: format!(
311 "internal integration `{integration_id}` is no longer attached"
312 ),
313 })?;
314
315 let internal_mcp = self.internal_mcp_for(&integration).await.ok_or_else(|| {
316 KontextDevError::ConnectSession {
317 message: format!(
318 "internal integration `{integration_id}` has no MCP client"
319 ),
320 }
321 })?;
322
323 let token = match self
324 .ensure_internal_resource_token(&integration, false)
325 .await
326 {
327 Ok(token) => token,
328 Err(err) if is_auth_recovery_required(&err) => {
329 self.ensure_internal_resource_token(&integration, true)
330 .await?
331 }
332 Err(err) => return Err(err),
333 };
334
335 let integration_id_for_exec = integration_id.clone();
336 let execute_once = || {
337 let integration_id = integration_id_for_exec.clone();
338 let internal_mcp = internal_mcp.clone();
339 let token = token.clone();
340 let backend_tool_id = route.backend_tool_id.clone();
341 let args = args.clone();
342 async move {
343 self.run_managed_internal_op(integration_id.as_str(), || async {
344 let raw = internal_mcp
345 .call_tool_with_access_token(
346 token.as_str(),
347 backend_tool_id.as_str(),
348 args,
349 )
350 .await?;
351 Ok(ToolResult {
352 content: extract_text_content(&raw),
353 raw,
354 })
355 })
356 .await
357 }
358 };
359
360 match with_transient_retry(execute_once, 1).await {
361 Ok(result) => Ok(result),
362 Err(err) => {
363 if is_auth_recovery_required(&err) {
364 *self.state.write().await = ClientState::NeedsAuth;
365 }
366 Err(err)
367 }
368 }
369 }
370 }
371 }
372
373 async fn ensure_connected(&self) -> Result<(), KontextDevError> {
374 let state = self.state().await;
375 if state == ClientState::Ready {
376 return Ok(());
377 }
378
379 *self.state.write().await = ClientState::Connecting;
380
381 match self.gateway_client.connect().await {
382 Ok(()) => {
383 if let Err(err) = self.refresh_integration_inventory().await {
384 if is_auth_recovery_required(&err) {
385 *self.state.write().await = ClientState::NeedsAuth;
386 } else {
387 *self.state.write().await = ClientState::Failed;
388 }
389 return Err(err);
390 }
391 *self.state.write().await = ClientState::Ready;
392 Ok(())
393 }
394 Err(err) => {
395 if is_auth_recovery_required(&err) {
396 *self.state.write().await = ClientState::NeedsAuth;
397 } else {
398 *self.state.write().await = ClientState::Failed;
399 }
400 Err(err)
401 }
402 }
403 }
404
405 async fn refresh_integration_inventory(
406 &self,
407 ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
408 let _guard = self.integration_refresh_lock.lock().await;
409
410 let list_once = || async { self.gateway_client.mcp().list_integrations().await };
411 let items = with_transient_retry(list_once, 1).await?;
412 self.apply_runtime_integrations(&items).await;
413 Ok(items)
414 }
415
416 async fn apply_runtime_integrations(&self, integrations: &[RuntimeIntegrationRecord]) {
417 let mut runtime = self.runtime_integrations.write().await;
418 runtime.clear();
419
420 for integration in integrations {
421 runtime.insert(integration.id.clone(), integration.clone());
422 }
423 drop(runtime);
424
425 let desired = integrations
426 .iter()
427 .filter(|integration| {
428 integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
429 })
430 .map(|integration| (integration.id.clone(), integration.url.clone()))
431 .collect::<BTreeMap<_, _>>();
432
433 let mut internal_mcps = self.internal_mcps.write().await;
434
435 internal_mcps.retain(|integration_id, mcp| {
436 desired
437 .get(integration_id)
438 .is_some_and(|url| mcp.mcp_url().ok().as_deref() == Some(url.as_str()))
439 });
440
441 for integration in integrations {
442 if integration.category != RuntimeIntegrationCategory::InternalMcpCredentials {
443 continue;
444 }
445 if internal_mcps.contains_key(&integration.id) {
446 continue;
447 }
448
449 internal_mcps.insert(
450 integration.id.clone(),
451 KontextMcp::new(KontextMcpConfig {
452 client_id: self.config.client_id.clone(),
453 redirect_uri: self.config.redirect_uri.clone(),
454 url: Some(integration.url.clone()),
455 server: self.config.server_url.clone(),
456 client_secret: self.config.client_secret.clone(),
457 scope: self.config.scope.clone(),
458 resource: Some(integration.url.clone()),
459 session_key: Some(format!("internal:{}", integration.id)),
460 integration_ui_url: self.config.integration_ui_url.clone(),
461 integration_return_to: self.config.integration_return_to.clone(),
462 auth_timeout_seconds: self.config.auth_timeout_seconds,
463 open_connect_page_on_login: Some(false),
464 token_cache_path: None,
465 }),
466 );
467 }
468 }
469
470 async fn internal_mcp_for(&self, integration: &RuntimeIntegrationRecord) -> Option<KontextMcp> {
471 self.internal_mcps
472 .read()
473 .await
474 .get(&integration.id)
475 .cloned()
476 }
477
478 async fn integration_by_id(
479 &self,
480 integration_id: &str,
481 ) -> Result<Option<RuntimeIntegrationRecord>, KontextDevError> {
482 if let Some(existing) = self
483 .runtime_integrations
484 .read()
485 .await
486 .get(integration_id)
487 .cloned()
488 {
489 return Ok(Some(existing));
490 }
491
492 let refreshed = self.refresh_integration_inventory().await?;
493 Ok(refreshed.into_iter().find(|item| item.id == integration_id))
494 }
495
496 async fn ensure_internal_resource_token(
497 &self,
498 integration: &RuntimeIntegrationRecord,
499 force_exchange: bool,
500 ) -> Result<String, KontextDevError> {
501 if !force_exchange
502 && let Some(existing) = self
503 .internal_token_cache
504 .lock()
505 .await
506 .get(&integration.id)
507 .cloned()
508 && existing.is_fresh()
509 {
510 return Ok(existing.access_token);
511 }
512
513 let integration_lock = {
514 let mut locks = self.internal_token_locks.lock().await;
515 locks
516 .entry(integration.id.clone())
517 .or_insert_with(|| Arc::new(Mutex::new(())))
518 .clone()
519 };
520
521 let _guard = integration_lock.lock().await;
522
523 if !force_exchange
524 && let Some(existing) = self
525 .internal_token_cache
526 .lock()
527 .await
528 .get(&integration.id)
529 .cloned()
530 && existing.is_fresh()
531 {
532 return Ok(existing.access_token);
533 }
534
535 let session = self.gateway_client.mcp().authenticate_mcp().await?;
536 let exchanged = self
537 .gateway_client
538 .mcp()
539 .client()
540 .exchange_for_resource(&session.gateway_token.access_token, &integration.url, None)
541 .await?;
542
543 let expires_at = exchanged.expires_in.and_then(|seconds| {
544 (seconds > 0)
545 .then(|| Instant::now().checked_add(Duration::from_secs(seconds as u64)))
546 .flatten()
547 });
548
549 let token = InternalResourceToken {
550 access_token: exchanged.access_token,
551 expires_at,
552 };
553
554 let value = token.access_token.clone();
555 self.internal_token_cache
556 .lock()
557 .await
558 .insert(integration.id.clone(), token);
559 Ok(value)
560 }
561
562 async fn build_inventory_candidates(
563 &self,
564 options: Option<ToolsListOptions>,
565 runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
566 ) -> Result<Vec<RouteInventoryCandidate>, KontextDevError> {
567 self.resolved_internal_listings.write().await.clear();
568
569 let mut candidates = Vec::new();
570
571 let list_gateway_tools_once = || {
572 let gateway_client = self.gateway_client.clone();
573 async move { gateway_client.tools_list().await }
574 };
575 let gateway_tools = with_transient_retry(list_gateway_tools_once, 1)
576 .await?
577 .into_iter()
578 .take(
579 options
580 .as_ref()
581 .and_then(|value| value.limit)
582 .unwrap_or(u32::MAX) as usize,
583 )
584 .collect::<Vec<_>>();
585 for tool in gateway_tools {
586 let route = ToolRouteRecord {
587 tool_id: tool.id.clone(),
588 backend_id: gateway_backend_id(),
589 source: BackendSource::Gateway,
590 backend_tool_id: tool.id.clone(),
591 integration_id: None,
592 };
593 candidates.push(RouteInventoryCandidate { tool, route });
594 }
595
596 let integrations = if let Some(runtime_integrations) = runtime_integrations_override {
597 self.apply_runtime_integrations(&runtime_integrations).await;
598 runtime_integrations
599 } else {
600 self.refresh_integration_inventory().await?
601 };
602
603 let mut internal_integrations = integrations
604 .into_iter()
605 .filter(|integration| {
606 integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
607 })
608 .collect::<Vec<_>>();
609 internal_integrations.sort_by(|a, b| a.id.cmp(&b.id));
610
611 let mut pending_auth_retries = Vec::new();
612
613 for integration in &internal_integrations {
614 match self
615 .add_internal_tools_for_integration(&mut candidates, integration, false)
616 .await
617 {
618 Ok(()) => {}
619 Err(err) => {
620 if is_authorization_error(&err) {
621 pending_auth_retries.push(integration.clone());
622 continue;
623 }
624 if is_token_exchange_error(&err) {
625 *self.state.write().await = ClientState::NeedsAuth;
626 }
627 }
628 }
629 }
630
631 let mut unresolved_internal_auth = false;
632 for integration in pending_auth_retries {
633 match self
634 .add_internal_tools_for_integration(&mut candidates, &integration, true)
635 .await
636 {
637 Ok(()) => {}
638 Err(err) => {
639 if is_auth_recovery_required(&err) {
640 unresolved_internal_auth = true;
641 continue;
642 }
643 }
644 }
645 }
646
647 if unresolved_internal_auth {
648 *self.state.write().await = ClientState::NeedsAuth;
649 } else if self.state().await == ClientState::NeedsAuth
650 && self.gateway_client.state().await == ClientState::Ready
651 {
652 *self.state.write().await = ClientState::Ready;
653 }
654
655 Ok(candidates)
656 }
657
658 async fn add_internal_tools_for_integration(
659 &self,
660 candidates: &mut Vec<RouteInventoryCandidate>,
661 integration: &RuntimeIntegrationRecord,
662 force_exchange: bool,
663 ) -> Result<(), KontextDevError> {
664 let Some(internal_mcp) = self.internal_mcp_for(integration).await else {
665 return Ok(());
666 };
667
668 let token = self
669 .ensure_internal_resource_token(integration, force_exchange)
670 .await?;
671
672 let list_once = || {
673 let integration_id = integration.id.clone();
674 let internal_mcp = internal_mcp.clone();
675 let token = token.clone();
676 async move {
677 self.run_managed_internal_op(&integration_id, || async {
678 internal_mcp
679 .list_tools_with_access_token(token.as_str())
680 .await
681 })
682 .await
683 }
684 };
685
686 let tools = with_transient_retry(list_once, 1).await?;
687
688 self.resolved_internal_listings
689 .write()
690 .await
691 .insert(integration.id.clone());
692
693 for tool in tools {
694 let backend_tool_id = if tool.id.is_empty() {
695 tool.name.clone()
696 } else {
697 tool.id.clone()
698 };
699 let unified_id = format!("{}:{}", integration.id, tool.name);
700
701 let mapped = KontextTool {
702 id: unified_id.clone(),
703 name: tool.name,
704 description: tool.description,
705 input_schema: tool.input_schema,
706 server: Some(crate::mcp::KontextToolServer {
707 id: integration.id.clone(),
708 name: Some(integration.name.clone()),
709 }),
710 };
711
712 let route = ToolRouteRecord {
713 tool_id: unified_id,
714 backend_id: internal_backend_id(&integration.id),
715 source: BackendSource::Internal,
716 backend_tool_id,
717 integration_id: Some(integration.id.clone()),
718 };
719
720 candidates.push(RouteInventoryCandidate {
721 tool: mapped,
722 route,
723 });
724 }
725
726 Ok(())
727 }
728
729 async fn run_managed_internal_op<T, F, Fut>(
730 &self,
731 integration_id: &str,
732 operation: F,
733 ) -> Result<T, KontextDevError>
734 where
735 F: FnOnce() -> Fut,
736 Fut: Future<Output = Result<T, KontextDevError>>,
737 {
738 {
739 let mut managed = self.managed_internal_ops.lock().await;
740 *managed.entry(integration_id.to_string()).or_insert(0) += 1;
741 }
742
743 let result = operation().await;
744
745 {
746 let mut managed = self.managed_internal_ops.lock().await;
747 if let Some(count) = managed.get_mut(integration_id) {
748 *count = count.saturating_sub(1);
749 if *count == 0 {
750 managed.remove(integration_id);
751 }
752 }
753 }
754
755 result
756 }
757
758 async fn build_tool_inventory(
759 &self,
760 options: Option<ToolsListOptions>,
761 runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
762 ) -> Result<RouteInventorySnapshot, KontextDevError> {
763 let key = inventory_build_key(options.clone(), runtime_integrations_override.as_ref());
764
765 let _guard = self.build_lock.lock().await;
766
767 {
768 let last_key = self.last_build_key.read().await;
769 let snapshot = self.route_inventory.read().await;
770 if snapshot.version > 0 && last_key.as_deref() == Some(key.as_str()) {
771 return Ok(snapshot.clone());
772 }
773 }
774
775 let build_generation = *self.reset_generation.lock().await;
776 let build_run = {
777 let mut counter = self.build_run_counter.lock().await;
778 *counter += 1;
779 *counter
780 };
781
782 let candidates = self
783 .build_inventory_candidates(options, runtime_integrations_override)
784 .await?;
785
786 if *self.reset_generation.lock().await != build_generation {
787 return Ok(self.route_inventory.read().await.clone());
788 }
789
790 let latest_committed = *self.latest_committed_build_run.lock().await;
791 let snapshot = if build_run >= latest_committed {
792 *self.latest_committed_build_run.lock().await = build_run;
793 let mut version = self.version_counter.lock().await;
794 *version += 1;
795 let built = build_route_inventory(*version, candidates);
796 *self.route_inventory.write().await = built.clone();
797 *self.last_build_key.write().await = Some(key);
798 built
799 } else {
800 let current_version = self.route_inventory.read().await.version;
801 build_route_inventory(current_version, candidates)
802 };
803
804 Ok(snapshot)
805 }
806
807 async fn route_for_execution(&self, tool_id: &str) -> Result<ToolRouteRecord, KontextDevError> {
808 if let Some(route) = self
809 .route_inventory
810 .read()
811 .await
812 .routes
813 .get(tool_id)
814 .cloned()
815 {
816 return Ok(route);
817 }
818
819 let _ = self.build_tool_inventory(None, None).await?;
820
821 if let Some(route) = self
822 .route_inventory
823 .read()
824 .await
825 .routes
826 .get(tool_id)
827 .cloned()
828 {
829 return Ok(route);
830 }
831
832 Err(KontextDevError::ConnectSession {
833 message: format!(
834 "unknown tool `{tool_id}`. Call tools.list() to refresh available tools"
835 ),
836 })
837 }
838
839 async fn get_internal_integrations_missing_tools(&self) -> Vec<String> {
840 let resolved = self.resolved_internal_listings.read().await.clone();
841 self.internal_mcps
842 .read()
843 .await
844 .keys()
845 .filter(|integration_id| !resolved.contains(*integration_id))
846 .cloned()
847 .collect()
848 }
849
850 async fn dispose_internal_clients(&self) {
851 self.internal_mcps.write().await.clear();
852 self.internal_token_cache.lock().await.clear();
853 self.internal_token_locks.lock().await.clear();
854 self.resolved_internal_listings.write().await.clear();
855 }
856
857 async fn reset_inventory(&self) {
858 *self.reset_generation.lock().await += 1;
859 *self.version_counter.lock().await = 0;
860
861 let build_run = *self.build_run_counter.lock().await;
862 *self.latest_committed_build_run.lock().await = build_run;
863
864 *self.route_inventory.write().await = RouteInventorySnapshot::empty();
865 *self.last_build_key.write().await = None;
866 }
867}
868
/// Options accepted by `KontextOrchestrator::tools_list_with_options`.
#[derive(Debug, Clone)]
pub struct ToolsListOptions {
    /// Maximum number of gateway tools to include; `None` means no cap.
    pub limit: Option<u32>,
}
873
874pub fn create_kontext_orchestrator(config: KontextOrchestratorConfig) -> KontextOrchestrator {
875 KontextOrchestrator::new(config)
876}
877
/// Backend id used for every route served by the gateway.
fn gateway_backend_id() -> String {
    String::from("gateway")
}
881
/// Backend id for routes served by the internal integration
/// `integration_id`: the id prefixed with `internal:`.
fn internal_backend_id(integration_id: &str) -> String {
    ["internal:", integration_id].concat()
}
885
886fn inventory_build_key(
887 options: Option<ToolsListOptions>,
888 runtime_integrations: Option<&Vec<RuntimeIntegrationRecord>>,
889) -> String {
890 let limit_key = match options.and_then(|item| item.limit) {
891 Some(limit) => limit.to_string(),
892 None => "none".to_string(),
893 };
894 let mut key = format!("limit:{limit_key}");
895 key.push_str("|runtime:");
896 if let Some(integrations) = runtime_integrations {
897 let mut ids = integrations
898 .iter()
899 .map(|integration| integration.id.clone())
900 .collect::<Vec<_>>();
901 ids.sort();
902 key.push_str(ids.join(",").as_str());
903 }
904 key
905}
906
907fn build_route_inventory(
908 version: u64,
909 candidates: Vec<RouteInventoryCandidate>,
910) -> RouteInventorySnapshot {
911 let mut tools = Vec::<KontextTool>::new();
912 let mut routes = HashMap::<String, ToolRouteRecord>::new();
913 let mut conflicts = Vec::<RouteConflictRecord>::new();
914
915 for candidate in candidates {
916 let tool_id = candidate.route.tool_id.clone();
917 if !routes.contains_key(&tool_id) {
918 routes.insert(tool_id.clone(), candidate.route);
919 tools.push(candidate.tool);
920 continue;
921 }
922
923 let existing = routes
924 .get(&tool_id)
925 .cloned()
926 .unwrap_or_else(|| candidate.route.clone());
927
928 conflicts.push(RouteConflictRecord {
930 tool_id,
931 kept: existing,
932 dropped: candidate.route,
933 });
934 }
935
936 RouteInventorySnapshot {
937 version,
938 tools,
939 routes,
940 conflicts,
941 }
942}
943
944async fn with_transient_retry<T, F, Fut>(
945 mut operation: F,
946 max_retries: usize,
947) -> Result<T, KontextDevError>
948where
949 F: FnMut() -> Fut,
950 Fut: Future<Output = Result<T, KontextDevError>>,
951{
952 for attempt in 0..=max_retries {
953 match operation().await {
954 Ok(value) => return Ok(value),
955 Err(err) => {
956 if attempt >= max_retries || !is_transient_error(&err) {
957 return Err(err);
958 }
959 tokio::time::sleep(Duration::from_millis(RETRY_DELAY_MS)).await;
960 }
961 }
962 }
963
964 Err(KontextDevError::ConnectSession {
965 message: "transient retry loop exhausted unexpectedly".to_string(),
966 })
967}
968
969fn is_transient_error(err: &KontextDevError) -> bool {
970 match err {
971 KontextDevError::ConnectSession { message }
972 | KontextDevError::TokenExchange { message, .. }
973 | KontextDevError::TokenRequest { message, .. } => {
974 let lower = message.to_ascii_lowercase();
975 lower.contains("429")
976 || lower.contains("500")
977 || lower.contains("502")
978 || lower.contains("503")
979 || lower.contains("504")
980 || lower.contains("timed out")
981 || lower.contains("connection reset")
982 || lower.contains("broken pipe")
983 || lower.contains("temporarily unavailable")
984 || lower.contains("econnreset")
985 || lower.contains("network")
986 }
987 _ => false,
988 }
989}
990
991fn is_authorization_error(err: &KontextDevError) -> bool {
992 match err {
993 KontextDevError::OAuthCallbackTimeout { .. }
994 | KontextDevError::OAuthCallbackCancelled
995 | KontextDevError::MissingAuthorizationCode
996 | KontextDevError::OAuthCallbackError { .. }
997 | KontextDevError::InvalidOAuthState
998 | KontextDevError::TokenRequest { .. }
999 | KontextDevError::TokenExchange { .. } => true,
1000 KontextDevError::ConnectSession { message }
1001 | KontextDevError::IntegrationOAuthInit { message } => {
1002 let lower = message.to_ascii_lowercase();
1003 lower.contains("401")
1004 || lower.contains("unauthorized")
1005 || lower.contains("invalid token")
1006 || lower.contains("authorization")
1007 || lower.contains("access token expired")
1008 }
1009 _ => false,
1010 }
1011}
1012
1013fn is_token_exchange_error(err: &KontextDevError) -> bool {
1014 matches!(err, KontextDevError::TokenExchange { .. })
1015}
1016
1017fn is_auth_recovery_required(err: &KontextDevError) -> bool {
1018 is_authorization_error(err) || is_token_exchange_error(err)
1019}
1020
#[cfg(test)]
mod tests {
    use super::*;

    /// Gateway-sourced route whose backend tool id equals `tool_id`.
    fn route(tool_id: &str, backend_id: &str) -> ToolRouteRecord {
        ToolRouteRecord {
            tool_id: tool_id.to_string(),
            backend_id: backend_id.to_string(),
            source: BackendSource::Gateway,
            backend_tool_id: tool_id.to_string(),
            integration_id: None,
        }
    }

    /// Candidate whose tool id and name are both `tool_id`.
    fn candidate(tool_id: &str, backend_id: &str) -> RouteInventoryCandidate {
        let tool = KontextTool {
            id: tool_id.to_string(),
            name: tool_id.to_string(),
            description: None,
            input_schema: None,
            server: None,
        };
        RouteInventoryCandidate {
            tool,
            route: route(tool_id, backend_id),
        }
    }

    /// Internal-credentials integration named after the upper-cased `id`
    /// and served from `https://<id>`.
    fn internal_record(id: &str) -> RuntimeIntegrationRecord {
        RuntimeIntegrationRecord {
            id: id.to_string(),
            name: id.to_ascii_uppercase(),
            url: format!("https://{id}"),
            category: RuntimeIntegrationCategory::InternalMcpCredentials,
            connect_type: crate::mcp::RuntimeIntegrationConnectType::Credentials,
            auth_mode: None,
            credential_schema: None,
            requires_oauth: None,
            connection: None,
        }
    }

    #[test]
    fn build_route_inventory_keeps_first_route_on_conflict() {
        let candidates = vec![
            candidate("tool-a", "gateway"),
            candidate("tool-a", "internal:linear"),
            candidate("tool-b", "gateway"),
        ];

        let snapshot = build_route_inventory(1, candidates);

        assert_eq!(snapshot.tools.len(), 2);
        assert_eq!(snapshot.routes.get("tool-a").unwrap().backend_id, "gateway");
        assert_eq!(snapshot.conflicts.len(), 1);

        let conflict = &snapshot.conflicts[0];
        assert_eq!(conflict.kept.backend_id, "gateway");
        assert_eq!(conflict.dropped.backend_id, "internal:linear");
    }

    #[test]
    fn inventory_build_key_is_stable_for_same_runtime_integrations() {
        let runtime = vec![internal_record("b"), internal_record("a")];
        let key_for_limit_ten =
            || inventory_build_key(Some(ToolsListOptions { limit: Some(10) }), Some(&runtime));

        let first = key_for_limit_ten();
        let second = key_for_limit_ten();

        assert_eq!(first, second);
    }

    #[test]
    fn inventory_build_key_distinguishes_none_from_zero_limit() {
        let no_limit = inventory_build_key(None, None);
        let zero_limit = inventory_build_key(Some(ToolsListOptions { limit: Some(0) }), None);

        assert_ne!(no_limit, zero_limit);
        assert_eq!(no_limit, "limit:none|runtime:");
        assert_eq!(zero_limit, "limit:0|runtime:");
    }

    #[test]
    fn transient_error_detection_matches_network_and_5xx() {
        let connect_error = |message: &str| KontextDevError::ConnectSession {
            message: message.to_string(),
        };

        let network = connect_error("connection reset by peer");
        let server_500 = connect_error("500 internal server error");
        let bad_request = connect_error("400 bad request");

        assert!(is_transient_error(&network));
        assert!(is_transient_error(&server_500));
        assert!(!is_transient_error(&bad_request));
    }
}