Skip to main content

kontext_dev_sdk/
orchestrator.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::future::Future;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde_json::{Map, Value};
7use tokio::sync::{Mutex, RwLock};
8
9use crate::KontextDevError;
10use crate::client::{
11    ClientState, ConnectSessionResult, IntegrationInfo, KontextClient, KontextClientConfig,
12    ToolResult, create_kontext_client,
13};
14use crate::mcp::{
15    KontextMcp, KontextMcpConfig, KontextTool, RuntimeIntegrationCategory,
16    RuntimeIntegrationRecord, extract_text_content,
17};
18
/// Delay between retry attempts, in milliseconds.
///
/// NOTE(review): no use of this constant is visible in this part of the file; presumably it is
/// read by the transient-retry helper — confirm.
const RETRY_DELAY_MS: u64 = 250;
/// Safety margin, in seconds, applied when checking whether a cached internal resource token is
/// still fresh: a token only counts as fresh if it expires more than this far in the future.
const INTERNAL_TOKEN_SKEW_SECS: u64 = 30;
21
/// Describes which backend serves a tool and under which id that backend knows it.
#[derive(Clone, Debug)]
pub struct ToolRouteRecord {
    /// Id under which the tool is exposed to callers of the orchestrator.
    pub tool_id: String,
    /// Identifier of the backend serving the tool (`"gateway"` or `"internal:<integration id>"`).
    pub backend_id: String,
    /// Whether the tool is served by the gateway or by an internal MCP integration.
    pub source: BackendSource,
    /// Tool id passed to the backend when the tool is executed.
    pub backend_tool_id: String,
    /// Owning integration; `Some` for internal routes, `None` for gateway routes.
    pub integration_id: Option<String>,
}
30
/// Kind of backend a tool route points at.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendSource {
    /// The tool is listed and executed through the gateway client.
    Gateway,
    /// The tool is listed and executed through a per-integration internal MCP client.
    Internal,
}
36
/// Records two routes that claimed the same tool id.
///
/// NOTE(review): the code that produces these records is outside the visible part of the file;
/// the field meanings below follow from the field names — confirm.
#[derive(Clone, Debug)]
pub struct RouteConflictRecord {
    /// Tool id both routes were registered under.
    pub tool_id: String,
    /// Route that stayed in the inventory.
    pub kept: ToolRouteRecord,
    /// Route that was discarded.
    pub dropped: ToolRouteRecord,
}
43
/// A tool paired with the route that would serve it, collected while building an inventory.
#[derive(Clone, Debug)]
struct RouteInventoryCandidate {
    /// Tool description as it will be exposed to callers.
    tool: KontextTool,
    /// Route describing which backend executes the tool.
    route: ToolRouteRecord,
}
49
50#[derive(Clone, Debug)]
51struct RouteInventorySnapshot {
52    version: u64,
53    tools: Vec<KontextTool>,
54    routes: HashMap<String, ToolRouteRecord>,
55    #[allow(dead_code)]
56    conflicts: Vec<RouteConflictRecord>,
57}
58
59impl RouteInventorySnapshot {
60    fn empty() -> Self {
61        Self {
62            version: 0,
63            tools: Vec::new(),
64            routes: HashMap::new(),
65            conflicts: Vec::new(),
66        }
67    }
68}
69
70#[derive(Clone, Debug)]
71struct InternalResourceToken {
72    access_token: String,
73    expires_at: Option<Instant>,
74}
75
76impl InternalResourceToken {
77    fn is_fresh(&self) -> bool {
78        match self.expires_at {
79            Some(expires_at) => {
80                let skew = Duration::from_secs(INTERNAL_TOKEN_SKEW_SECS);
81                Instant::now()
82                    .checked_add(skew)
83                    .is_some_and(|v| v < expires_at)
84            }
85            None => false,
86        }
87    }
88}
89
/// State reported by the orchestrator; an alias of the client's `ClientState`.
pub type KontextOrchestratorState = ClientState;
/// Configuration accepted by the orchestrator; an alias of `KontextClientConfig`.
pub type KontextOrchestratorConfig = KontextClientConfig;
92
/// Combines the gateway client with per-integration internal MCP clients behind one tool
/// inventory and one execution entry point.
///
/// All mutable state sits behind `Arc`, so the `Arc`-wrapped fields are shared between clones.
#[derive(Clone)]
pub struct KontextOrchestrator {
    /// Orchestrator-level connection state.
    state: Arc<RwLock<KontextOrchestratorState>>,
    /// Configuration used for the gateway client and copied into each internal MCP client.
    config: KontextOrchestratorConfig,
    /// Client used for gateway tools, integration discovery and token exchange.
    gateway_client: KontextClient,
    /// Most recently committed inventory snapshot.
    route_inventory: Arc<RwLock<RouteInventorySnapshot>>,
    /// Serializes inventory builds.
    build_lock: Arc<Mutex<()>>,
    /// Build key of the committed snapshot; a matching key lets a build reuse the snapshot.
    last_build_key: Arc<RwLock<Option<String>>>,
    /// Version assigned to committed snapshots; reset to `0` by `reset_inventory`.
    version_counter: Arc<Mutex<u64>>,
    /// Incremented by `reset_inventory`; a build that observes a change discards its result.
    reset_generation: Arc<Mutex<u64>>,
    /// Incremented once per inventory build that is not served from the cached snapshot.
    build_run_counter: Arc<Mutex<u64>>,
    /// Run number of the latest committed build.
    latest_committed_build_run: Arc<Mutex<u64>>,
    /// Integration records from the most recent refresh, keyed by integration id.
    runtime_integrations: Arc<RwLock<HashMap<String, RuntimeIntegrationRecord>>>,
    /// MCP clients for `InternalMcpCredentials` integrations, keyed by integration id.
    internal_mcps: Arc<RwLock<HashMap<String, KontextMcp>>>,
    /// Exchanged resource tokens, keyed by integration id.
    internal_token_cache: Arc<Mutex<HashMap<String, InternalResourceToken>>>,
    /// Per-integration locks held while a token is looked up or exchanged.
    internal_token_locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
    /// Number of in-flight internal operations per integration id.
    managed_internal_ops: Arc<Mutex<HashMap<String, u32>>>,
    /// Ids of internal integrations whose tools were listed successfully in the latest build.
    resolved_internal_listings: Arc<RwLock<HashSet<String>>>,
    /// Serializes integration inventory refreshes.
    integration_refresh_lock: Arc<Mutex<()>>,
}
113
114impl KontextOrchestrator {
115    pub fn new(config: KontextOrchestratorConfig) -> Self {
116        let gateway_client = create_kontext_client(config.clone());
117
118        Self {
119            state: Arc::new(RwLock::new(ClientState::Idle)),
120            config,
121            gateway_client,
122            route_inventory: Arc::new(RwLock::new(RouteInventorySnapshot::empty())),
123            build_lock: Arc::new(Mutex::new(())),
124            last_build_key: Arc::new(RwLock::new(None)),
125            version_counter: Arc::new(Mutex::new(0)),
126            reset_generation: Arc::new(Mutex::new(0)),
127            build_run_counter: Arc::new(Mutex::new(0)),
128            latest_committed_build_run: Arc::new(Mutex::new(0)),
129            runtime_integrations: Arc::new(RwLock::new(HashMap::new())),
130            internal_mcps: Arc::new(RwLock::new(HashMap::new())),
131            internal_token_cache: Arc::new(Mutex::new(HashMap::new())),
132            internal_token_locks: Arc::new(Mutex::new(HashMap::new())),
133            managed_internal_ops: Arc::new(Mutex::new(HashMap::new())),
134            resolved_internal_listings: Arc::new(RwLock::new(HashSet::new())),
135            integration_refresh_lock: Arc::new(Mutex::new(())),
136        }
137    }
138
139    pub async fn state(&self) -> KontextOrchestratorState {
140        *self.state.read().await
141    }
142
    /// Returns the gateway client's MCP handle.
    pub fn mcp(&self) -> &KontextMcp {
        self.gateway_client.mcp()
    }
146
    /// Connects the orchestrator if it is not already `Ready`.
    ///
    /// # Errors
    ///
    /// Returns the error from connecting the gateway or refreshing the integration inventory.
    pub async fn connect(&self) -> Result<(), KontextDevError> {
        self.ensure_connected().await
    }
150
    /// Tears down all session state and returns the orchestrator to `ClientState::Idle`.
    ///
    /// Internal MCP clients, cached integrations, the route inventory and cached tokens are
    /// cleared, and the gateway client is disconnected.
    pub async fn disconnect(&self) {
        // Drops internal clients together with their tokens, token locks and listing markers.
        self.dispose_internal_clients().await;
        self.runtime_integrations.write().await.clear();
        self.reset_inventory().await;
        self.gateway_client.disconnect().await;
        // Already cleared by `dispose_internal_clients`; cleared once more after the awaits above.
        self.internal_token_cache.lock().await.clear();
        *self.state.write().await = ClientState::Idle;
    }
159
    /// Ensures the orchestrator is connected, then asks the gateway client for a connect page URL.
    ///
    /// # Errors
    ///
    /// Returns the connection error, or the error reported by the gateway client.
    pub async fn get_connect_page_url(&self) -> Result<ConnectSessionResult, KontextDevError> {
        self.ensure_connected().await?;
        self.gateway_client.get_connect_page_url().await
    }
164
    /// Signs in; in this orchestrator that is the same operation as [`Self::connect`].
    ///
    /// # Errors
    ///
    /// Returns the error from connecting the gateway or refreshing the integration inventory.
    pub async fn sign_in(&self) -> Result<(), KontextDevError> {
        // Mirrors TS semantics: sign-in in mixed mode is equivalent to connect.
        self.ensure_connected().await
    }
169
    /// Clears all local session state and signs the gateway client out.
    ///
    /// # Errors
    ///
    /// Returns the gateway client's sign-out error. In that case the local state has already
    /// been cleared, but the orchestrator state is not set to `ClientState::Idle`.
    pub async fn sign_out(&self) -> Result<(), KontextDevError> {
        // Drops internal clients together with their tokens, token locks and listing markers.
        self.dispose_internal_clients().await;
        self.runtime_integrations.write().await.clear();
        self.reset_inventory().await;
        self.internal_token_cache.lock().await.clear();
        // Local cleanup happens before this fallible call, so it is done even when sign-out fails.
        self.gateway_client.sign_out().await?;
        *self.state.write().await = ClientState::Idle;
        Ok(())
    }
179
180    pub async fn integrations_list(&self) -> Result<Vec<IntegrationInfo>, KontextDevError> {
181        self.ensure_connected().await?;
182
183        let discovered = self.refresh_integration_inventory().await?;
184
185        let gateway_statuses = self
186            .gateway_client
187            .integrations_list()
188            .await
189            .unwrap_or_default();
190        let gateway_by_id = gateway_statuses
191            .into_iter()
192            .map(|item| (item.id.clone(), item))
193            .collect::<HashMap<_, _>>();
194
195        let needs_internal_connect_link = discovered.iter().any(|integration| {
196            integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
197                && !integration
198                    .connection
199                    .as_ref()
200                    .map(|c| c.connected)
201                    .unwrap_or(false)
202        });
203
204        let mut shared_connect_url = None;
205        if needs_internal_connect_link
206            && let Ok(connect) = self.gateway_client.get_connect_page_url().await
207        {
208            shared_connect_url = Some(connect.connect_url);
209        }
210
211        Ok(discovered
212            .into_iter()
213            .map(|integration| {
214                let gateway_status = gateway_by_id.get(&integration.id);
215                let connected = gateway_status
216                    .map(|item| item.connected)
217                    .unwrap_or_else(|| {
218                        integration
219                            .connection
220                            .as_ref()
221                            .map(|c| c.connected)
222                            .unwrap_or(false)
223                    });
224                let connect_url = gateway_status
225                    .and_then(|item| item.connect_url.clone())
226                    .or_else(|| {
227                        (!connected
228                            && integration.category
229                                == RuntimeIntegrationCategory::InternalMcpCredentials)
230                            .then(|| shared_connect_url.clone())
231                            .flatten()
232                    });
233                let reason = gateway_status
234                    .and_then(|item| item.reason.clone())
235                    .or_else(|| {
236                        (!connected
237                            && integration.category
238                                == RuntimeIntegrationCategory::InternalMcpCredentials)
239                            .then_some("credentials_required".to_string())
240                    });
241
242                IntegrationInfo {
243                    id: integration.id,
244                    name: integration.name,
245                    connected,
246                    connect_url,
247                    reason,
248                }
249            })
250            .collect())
251    }
252
    /// Lists the available tools without any listing options.
    ///
    /// Equivalent to calling [`Self::tools_list_with_options`] with `None`.
    pub async fn tools_list(&self) -> Result<Vec<KontextTool>, KontextDevError> {
        self.tools_list_with_options(None).await
    }
256
257    pub async fn tools_list_with_options(
258        &self,
259        options: Option<ToolsListOptions>,
260    ) -> Result<Vec<KontextTool>, KontextDevError> {
261        self.ensure_connected().await?;
262
263        let mut inventory = self.build_tool_inventory(options.clone(), None).await?;
264
265        let missing_internal = self.get_internal_integrations_missing_tools().await;
266        if !missing_internal.is_empty() {
267            let refreshed = self.refresh_integration_inventory().await?;
268            inventory = self.build_tool_inventory(options, Some(refreshed)).await?;
269        }
270
271        let unresolved_internal = self.get_internal_integrations_missing_tools().await;
272        if self.state().await == ClientState::NeedsAuth
273            && self.gateway_client.state().await == ClientState::Ready
274            && unresolved_internal.is_empty()
275        {
276            *self.state.write().await = ClientState::Ready;
277        }
278
279        Ok(inventory.tools)
280    }
281
282    pub async fn tools_execute(
283        &self,
284        tool_id: &str,
285        args: Option<Map<String, Value>>,
286    ) -> Result<ToolResult, KontextDevError> {
287        self.ensure_connected().await?;
288
289        let route = self.route_for_execution(tool_id).await?;
290
291        match route.source {
292            BackendSource::Gateway => {
293                self.gateway_client
294                    .tools_execute(route.backend_tool_id.as_str(), args)
295                    .await
296            }
297            BackendSource::Internal => {
298                let Some(integration_id) = route.integration_id else {
299                    return Err(KontextDevError::ConnectSession {
300                        message: format!(
301                            "route for tool `{tool_id}` is missing integration metadata"
302                        ),
303                    });
304                };
305
306                let integration =
307                    self.integration_by_id(&integration_id)
308                        .await?
309                        .ok_or_else(|| KontextDevError::ConnectSession {
310                            message: format!(
311                                "internal integration `{integration_id}` is no longer attached"
312                            ),
313                        })?;
314
315                let internal_mcp = self.internal_mcp_for(&integration).await.ok_or_else(|| {
316                    KontextDevError::ConnectSession {
317                        message: format!(
318                            "internal integration `{integration_id}` has no MCP client"
319                        ),
320                    }
321                })?;
322
323                let token = match self
324                    .ensure_internal_resource_token(&integration, false)
325                    .await
326                {
327                    Ok(token) => token,
328                    Err(err) if is_auth_recovery_required(&err) => {
329                        self.ensure_internal_resource_token(&integration, true)
330                            .await?
331                    }
332                    Err(err) => return Err(err),
333                };
334
335                let integration_id_for_exec = integration_id.clone();
336                let execute_once = || {
337                    let integration_id = integration_id_for_exec.clone();
338                    let internal_mcp = internal_mcp.clone();
339                    let token = token.clone();
340                    let backend_tool_id = route.backend_tool_id.clone();
341                    let args = args.clone();
342                    async move {
343                        self.run_managed_internal_op(integration_id.as_str(), || async {
344                            let raw = internal_mcp
345                                .call_tool_with_access_token(
346                                    token.as_str(),
347                                    backend_tool_id.as_str(),
348                                    args,
349                                )
350                                .await?;
351                            Ok(ToolResult {
352                                content: extract_text_content(&raw),
353                                raw,
354                            })
355                        })
356                        .await
357                    }
358                };
359
360                match with_transient_retry(execute_once, 1).await {
361                    Ok(result) => Ok(result),
362                    Err(err) => {
363                        if is_auth_recovery_required(&err) {
364                            *self.state.write().await = ClientState::NeedsAuth;
365                        }
366                        Err(err)
367                    }
368                }
369            }
370        }
371    }
372
373    async fn ensure_connected(&self) -> Result<(), KontextDevError> {
374        let state = self.state().await;
375        if state == ClientState::Ready {
376            return Ok(());
377        }
378
379        *self.state.write().await = ClientState::Connecting;
380
381        match self.gateway_client.connect().await {
382            Ok(()) => {
383                if let Err(err) = self.refresh_integration_inventory().await {
384                    if is_auth_recovery_required(&err) {
385                        *self.state.write().await = ClientState::NeedsAuth;
386                    } else {
387                        *self.state.write().await = ClientState::Failed;
388                    }
389                    return Err(err);
390                }
391                *self.state.write().await = ClientState::Ready;
392                Ok(())
393            }
394            Err(err) => {
395                if is_auth_recovery_required(&err) {
396                    *self.state.write().await = ClientState::NeedsAuth;
397                } else {
398                    *self.state.write().await = ClientState::Failed;
399                }
400                Err(err)
401            }
402        }
403    }
404
405    async fn refresh_integration_inventory(
406        &self,
407    ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
408        let _guard = self.integration_refresh_lock.lock().await;
409
410        let list_once = || async { self.gateway_client.mcp().list_integrations().await };
411        let items = with_transient_retry(list_once, 1).await?;
412        self.apply_runtime_integrations(&items).await;
413        Ok(items)
414    }
415
416    async fn apply_runtime_integrations(&self, integrations: &[RuntimeIntegrationRecord]) {
417        let mut runtime = self.runtime_integrations.write().await;
418        runtime.clear();
419
420        for integration in integrations {
421            runtime.insert(integration.id.clone(), integration.clone());
422        }
423        drop(runtime);
424
425        let desired = integrations
426            .iter()
427            .filter(|integration| {
428                integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
429            })
430            .map(|integration| (integration.id.clone(), integration.url.clone()))
431            .collect::<BTreeMap<_, _>>();
432
433        let mut internal_mcps = self.internal_mcps.write().await;
434
435        internal_mcps.retain(|integration_id, mcp| {
436            desired
437                .get(integration_id)
438                .is_some_and(|url| mcp.mcp_url().ok().as_deref() == Some(url.as_str()))
439        });
440
441        for integration in integrations {
442            if integration.category != RuntimeIntegrationCategory::InternalMcpCredentials {
443                continue;
444            }
445            if internal_mcps.contains_key(&integration.id) {
446                continue;
447            }
448
449            internal_mcps.insert(
450                integration.id.clone(),
451                KontextMcp::new(KontextMcpConfig {
452                    client_session_id: self.config.client_session_id.clone(),
453                    client_id: self.config.client_id.clone(),
454                    redirect_uri: self.config.redirect_uri.clone(),
455                    url: Some(integration.url.clone()),
456                    server: self.config.server_url.clone(),
457                    client_secret: self.config.client_secret.clone(),
458                    scope: self.config.scope.clone(),
459                    resource: Some(integration.url.clone()),
460                    session_key: Some(format!("internal:{}", integration.id)),
461                    integration_ui_url: self.config.integration_ui_url.clone(),
462                    integration_return_to: self.config.integration_return_to.clone(),
463                    auth_timeout_seconds: self.config.auth_timeout_seconds,
464                    open_connect_page_on_login: Some(false),
465                    token_cache_path: None,
466                }),
467            );
468        }
469    }
470
471    async fn internal_mcp_for(&self, integration: &RuntimeIntegrationRecord) -> Option<KontextMcp> {
472        self.internal_mcps
473            .read()
474            .await
475            .get(&integration.id)
476            .cloned()
477    }
478
479    async fn integration_by_id(
480        &self,
481        integration_id: &str,
482    ) -> Result<Option<RuntimeIntegrationRecord>, KontextDevError> {
483        if let Some(existing) = self
484            .runtime_integrations
485            .read()
486            .await
487            .get(integration_id)
488            .cloned()
489        {
490            return Ok(Some(existing));
491        }
492
493        let refreshed = self.refresh_integration_inventory().await?;
494        Ok(refreshed.into_iter().find(|item| item.id == integration_id))
495    }
496
497    async fn ensure_internal_resource_token(
498        &self,
499        integration: &RuntimeIntegrationRecord,
500        force_exchange: bool,
501    ) -> Result<String, KontextDevError> {
502        if !force_exchange
503            && let Some(existing) = self
504                .internal_token_cache
505                .lock()
506                .await
507                .get(&integration.id)
508                .cloned()
509            && existing.is_fresh()
510        {
511            return Ok(existing.access_token);
512        }
513
514        let integration_lock = {
515            let mut locks = self.internal_token_locks.lock().await;
516            locks
517                .entry(integration.id.clone())
518                .or_insert_with(|| Arc::new(Mutex::new(())))
519                .clone()
520        };
521
522        let _guard = integration_lock.lock().await;
523
524        if !force_exchange
525            && let Some(existing) = self
526                .internal_token_cache
527                .lock()
528                .await
529                .get(&integration.id)
530                .cloned()
531            && existing.is_fresh()
532        {
533            return Ok(existing.access_token);
534        }
535
536        let session = self.gateway_client.mcp().authenticate_mcp().await?;
537        let exchanged = self
538            .gateway_client
539            .mcp()
540            .client()
541            .exchange_for_resource(&session.gateway_token.access_token, &integration.url, None)
542            .await?;
543
544        let expires_at = exchanged.expires_in.and_then(|seconds| {
545            (seconds > 0)
546                .then(|| Instant::now().checked_add(Duration::from_secs(seconds as u64)))
547                .flatten()
548        });
549
550        let token = InternalResourceToken {
551            access_token: exchanged.access_token,
552            expires_at,
553        };
554
555        let value = token.access_token.clone();
556        self.internal_token_cache
557            .lock()
558            .await
559            .insert(integration.id.clone(), token);
560        Ok(value)
561    }
562
563    async fn build_inventory_candidates(
564        &self,
565        options: Option<ToolsListOptions>,
566        runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
567    ) -> Result<Vec<RouteInventoryCandidate>, KontextDevError> {
568        self.resolved_internal_listings.write().await.clear();
569
570        let mut candidates = Vec::new();
571
572        let list_gateway_tools_once = || {
573            let gateway_client = self.gateway_client.clone();
574            async move { gateway_client.tools_list().await }
575        };
576        let gateway_tools = with_transient_retry(list_gateway_tools_once, 1)
577            .await?
578            .into_iter()
579            .take(
580                options
581                    .as_ref()
582                    .and_then(|value| value.limit)
583                    .unwrap_or(u32::MAX) as usize,
584            )
585            .collect::<Vec<_>>();
586        for tool in gateway_tools {
587            let route = ToolRouteRecord {
588                tool_id: tool.id.clone(),
589                backend_id: gateway_backend_id(),
590                source: BackendSource::Gateway,
591                backend_tool_id: tool.id.clone(),
592                integration_id: None,
593            };
594            candidates.push(RouteInventoryCandidate { tool, route });
595        }
596
597        let integrations = if let Some(runtime_integrations) = runtime_integrations_override {
598            self.apply_runtime_integrations(&runtime_integrations).await;
599            runtime_integrations
600        } else {
601            self.refresh_integration_inventory().await?
602        };
603
604        let mut internal_integrations = integrations
605            .into_iter()
606            .filter(|integration| {
607                integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
608            })
609            .collect::<Vec<_>>();
610        internal_integrations.sort_by(|a, b| a.id.cmp(&b.id));
611
612        let mut pending_auth_retries = Vec::new();
613
614        for integration in &internal_integrations {
615            match self
616                .add_internal_tools_for_integration(&mut candidates, integration, false)
617                .await
618            {
619                Ok(()) => {}
620                Err(err) => {
621                    if is_authorization_error(&err) {
622                        pending_auth_retries.push(integration.clone());
623                        continue;
624                    }
625                    if is_token_exchange_error(&err) {
626                        *self.state.write().await = ClientState::NeedsAuth;
627                    }
628                }
629            }
630        }
631
632        let mut unresolved_internal_auth = false;
633        for integration in pending_auth_retries {
634            match self
635                .add_internal_tools_for_integration(&mut candidates, &integration, true)
636                .await
637            {
638                Ok(()) => {}
639                Err(err) => {
640                    if is_auth_recovery_required(&err) {
641                        unresolved_internal_auth = true;
642                        continue;
643                    }
644                }
645            }
646        }
647
648        if unresolved_internal_auth {
649            *self.state.write().await = ClientState::NeedsAuth;
650        } else if self.state().await == ClientState::NeedsAuth
651            && self.gateway_client.state().await == ClientState::Ready
652        {
653            *self.state.write().await = ClientState::Ready;
654        }
655
656        Ok(candidates)
657    }
658
659    async fn add_internal_tools_for_integration(
660        &self,
661        candidates: &mut Vec<RouteInventoryCandidate>,
662        integration: &RuntimeIntegrationRecord,
663        force_exchange: bool,
664    ) -> Result<(), KontextDevError> {
665        let Some(internal_mcp) = self.internal_mcp_for(integration).await else {
666            return Ok(());
667        };
668
669        let token = self
670            .ensure_internal_resource_token(integration, force_exchange)
671            .await?;
672
673        let list_once = || {
674            let integration_id = integration.id.clone();
675            let internal_mcp = internal_mcp.clone();
676            let token = token.clone();
677            async move {
678                self.run_managed_internal_op(&integration_id, || async {
679                    internal_mcp
680                        .list_tools_with_access_token(token.as_str())
681                        .await
682                })
683                .await
684            }
685        };
686
687        let tools = with_transient_retry(list_once, 1).await?;
688
689        self.resolved_internal_listings
690            .write()
691            .await
692            .insert(integration.id.clone());
693
694        for tool in tools {
695            let backend_tool_id = if tool.id.is_empty() {
696                tool.name.clone()
697            } else {
698                tool.id.clone()
699            };
700            let unified_id = format!("{}:{}", integration.id, tool.name);
701
702            let mapped = KontextTool {
703                id: unified_id.clone(),
704                name: tool.name,
705                description: tool.description,
706                input_schema: tool.input_schema,
707                server: Some(crate::mcp::KontextToolServer {
708                    id: integration.id.clone(),
709                    name: Some(integration.name.clone()),
710                }),
711            };
712
713            let route = ToolRouteRecord {
714                tool_id: unified_id,
715                backend_id: internal_backend_id(&integration.id),
716                source: BackendSource::Internal,
717                backend_tool_id,
718                integration_id: Some(integration.id.clone()),
719            };
720
721            candidates.push(RouteInventoryCandidate {
722                tool: mapped,
723                route,
724            });
725        }
726
727        Ok(())
728    }
729
730    async fn run_managed_internal_op<T, F, Fut>(
731        &self,
732        integration_id: &str,
733        operation: F,
734    ) -> Result<T, KontextDevError>
735    where
736        F: FnOnce() -> Fut,
737        Fut: Future<Output = Result<T, KontextDevError>>,
738    {
739        {
740            let mut managed = self.managed_internal_ops.lock().await;
741            *managed.entry(integration_id.to_string()).or_insert(0) += 1;
742        }
743
744        let result = operation().await;
745
746        {
747            let mut managed = self.managed_internal_ops.lock().await;
748            if let Some(count) = managed.get_mut(integration_id) {
749                *count = count.saturating_sub(1);
750                if *count == 0 {
751                    managed.remove(integration_id);
752                }
753            }
754        }
755
756        result
757    }
758
    /// Returns a tool inventory for `options`, reusing the committed snapshot when possible.
    ///
    /// Builds are serialized through `build_lock`. The committed snapshot is returned as-is
    /// when it has a non-zero version and was produced with the same build key. Otherwise the
    /// candidates are collected again and, normally, committed as the new snapshot.
    ///
    /// # Errors
    ///
    /// Returns the error from collecting the inventory candidates.
    async fn build_tool_inventory(
        &self,
        options: Option<ToolsListOptions>,
        runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
    ) -> Result<RouteInventorySnapshot, KontextDevError> {
        let key = inventory_build_key(options.clone(), runtime_integrations_override.as_ref());

        let _guard = self.build_lock.lock().await;

        {
            // Cache hit: a snapshot has been committed and it was built with the same key.
            let last_key = self.last_build_key.read().await;
            let snapshot = self.route_inventory.read().await;
            if snapshot.version > 0 && last_key.as_deref() == Some(key.as_str()) {
                return Ok(snapshot.clone());
            }
        }

        // Remember the reset generation so that a reset during the build can be detected.
        let build_generation = *self.reset_generation.lock().await;
        let build_run = {
            let mut counter = self.build_run_counter.lock().await;
            *counter += 1;
            *counter
        };

        let candidates = self
            .build_inventory_candidates(options, runtime_integrations_override)
            .await?;

        // The inventory was reset while building: discard the candidates and return whatever
        // snapshot is current.
        if *self.reset_generation.lock().await != build_generation {
            return Ok(self.route_inventory.read().await.clone());
        }

        let latest_committed = *self.latest_committed_build_run.lock().await;
        let snapshot = if build_run >= latest_committed {
            // Commit: bump the version and publish the snapshot together with its build key.
            *self.latest_committed_build_run.lock().await = build_run;
            let mut version = self.version_counter.lock().await;
            *version += 1;
            let built = build_route_inventory(*version, candidates);
            *self.route_inventory.write().await = built.clone();
            *self.last_build_key.write().await = Some(key);
            built
        } else {
            // A newer run is already committed: return this result without publishing it,
            // labelled with the version of the committed snapshot.
            let current_version = self.route_inventory.read().await.version;
            build_route_inventory(current_version, candidates)
        };

        Ok(snapshot)
    }
807
808    async fn route_for_execution(&self, tool_id: &str) -> Result<ToolRouteRecord, KontextDevError> {
809        if let Some(route) = self
810            .route_inventory
811            .read()
812            .await
813            .routes
814            .get(tool_id)
815            .cloned()
816        {
817            return Ok(route);
818        }
819
820        let _ = self.build_tool_inventory(None, None).await?;
821
822        if let Some(route) = self
823            .route_inventory
824            .read()
825            .await
826            .routes
827            .get(tool_id)
828            .cloned()
829        {
830            return Ok(route);
831        }
832
833        Err(KontextDevError::ConnectSession {
834            message: format!(
835                "unknown tool `{tool_id}`. Call tools.list() to refresh available tools"
836            ),
837        })
838    }
839
840    async fn get_internal_integrations_missing_tools(&self) -> Vec<String> {
841        let resolved = self.resolved_internal_listings.read().await.clone();
842        self.internal_mcps
843            .read()
844            .await
845            .keys()
846            .filter(|integration_id| !resolved.contains(*integration_id))
847            .cloned()
848            .collect()
849    }
850
    /// Drops all internal MCP clients together with their cached tokens, per-integration token
    /// locks and resolved-listing markers.
    async fn dispose_internal_clients(&self) {
        self.internal_mcps.write().await.clear();
        self.internal_token_cache.lock().await.clear();
        self.internal_token_locks.lock().await.clear();
        self.resolved_internal_listings.write().await.clear();
    }
857
858    async fn reset_inventory(&self) {
859        *self.reset_generation.lock().await += 1;
860        *self.version_counter.lock().await = 0;
861
862        let build_run = *self.build_run_counter.lock().await;
863        *self.latest_committed_build_run.lock().await = build_run;
864
865        *self.route_inventory.write().await = RouteInventorySnapshot::empty();
866        *self.last_build_key.write().await = None;
867    }
868}
869
/// Options that parameterize a tool inventory build.
///
/// `inventory_build_key` folds these options into the inventory cache key.
#[derive(Clone, Debug)]
pub struct ToolsListOptions {
    /// Optional limit; a missing limit is keyed as `"none"` by
    /// `inventory_build_key`, which is distinct from `Some(0)`.
    /// NOTE(review): presumably the maximum number of tools to list — confirm
    /// against the code that consumes it.
    pub limit: Option<u32>,
}
874
/// Creates a [`KontextOrchestrator`] from `config`.
///
/// Convenience free function that forwards directly to
/// [`KontextOrchestrator::new`].
pub fn create_kontext_orchestrator(config: KontextOrchestratorConfig) -> KontextOrchestrator {
    KontextOrchestrator::new(config)
}
878
/// Returns the backend identifier of the gateway backend: the literal
/// `"gateway"`.
fn gateway_backend_id() -> String {
    String::from("gateway")
}
882
/// Returns the backend identifier for an internal integration:
/// `"internal:"` followed by `integration_id` verbatim.
fn internal_backend_id(integration_id: &str) -> String {
    ["internal:", integration_id].concat()
}
886
887fn inventory_build_key(
888    options: Option<ToolsListOptions>,
889    runtime_integrations: Option<&Vec<RuntimeIntegrationRecord>>,
890) -> String {
891    let limit_key = match options.and_then(|item| item.limit) {
892        Some(limit) => limit.to_string(),
893        None => "none".to_string(),
894    };
895    let mut key = format!("limit:{limit_key}");
896    key.push_str("|runtime:");
897    if let Some(integrations) = runtime_integrations {
898        let mut ids = integrations
899            .iter()
900            .map(|integration| integration.id.clone())
901            .collect::<Vec<_>>();
902        ids.sort();
903        key.push_str(ids.join(",").as_str());
904    }
905    key
906}
907
908fn build_route_inventory(
909    version: u64,
910    candidates: Vec<RouteInventoryCandidate>,
911) -> RouteInventorySnapshot {
912    let mut tools = Vec::<KontextTool>::new();
913    let mut routes = HashMap::<String, ToolRouteRecord>::new();
914    let mut conflicts = Vec::<RouteConflictRecord>::new();
915
916    for candidate in candidates {
917        let tool_id = candidate.route.tool_id.clone();
918        if !routes.contains_key(&tool_id) {
919            routes.insert(tool_id.clone(), candidate.route);
920            tools.push(candidate.tool);
921            continue;
922        }
923
924        let existing = routes
925            .get(&tool_id)
926            .cloned()
927            .unwrap_or_else(|| candidate.route.clone());
928
929        // TS parity default policy is keep_existing.
930        conflicts.push(RouteConflictRecord {
931            tool_id,
932            kept: existing,
933            dropped: candidate.route,
934        });
935    }
936
937    RouteInventorySnapshot {
938        version,
939        tools,
940        routes,
941        conflicts,
942    }
943}
944
945async fn with_transient_retry<T, F, Fut>(
946    mut operation: F,
947    max_retries: usize,
948) -> Result<T, KontextDevError>
949where
950    F: FnMut() -> Fut,
951    Fut: Future<Output = Result<T, KontextDevError>>,
952{
953    for attempt in 0..=max_retries {
954        match operation().await {
955            Ok(value) => return Ok(value),
956            Err(err) => {
957                if attempt >= max_retries || !is_transient_error(&err) {
958                    return Err(err);
959                }
960                tokio::time::sleep(Duration::from_millis(RETRY_DELAY_MS)).await;
961            }
962        }
963    }
964
965    Err(KontextDevError::ConnectSession {
966        message: "transient retry loop exhausted unexpectedly".to_string(),
967    })
968}
969
970fn is_transient_error(err: &KontextDevError) -> bool {
971    match err {
972        KontextDevError::ConnectSession { message }
973        | KontextDevError::TokenExchange { message, .. }
974        | KontextDevError::TokenRequest { message, .. } => {
975            let lower = message.to_ascii_lowercase();
976            lower.contains("429")
977                || lower.contains("500")
978                || lower.contains("502")
979                || lower.contains("503")
980                || lower.contains("504")
981                || lower.contains("timed out")
982                || lower.contains("connection reset")
983                || lower.contains("broken pipe")
984                || lower.contains("temporarily unavailable")
985                || lower.contains("econnreset")
986                || lower.contains("network")
987        }
988        _ => false,
989    }
990}
991
992fn is_authorization_error(err: &KontextDevError) -> bool {
993    match err {
994        KontextDevError::OAuthCallbackTimeout { .. }
995        | KontextDevError::OAuthCallbackCancelled
996        | KontextDevError::MissingAuthorizationCode
997        | KontextDevError::OAuthCallbackError { .. }
998        | KontextDevError::InvalidOAuthState
999        | KontextDevError::TokenRequest { .. }
1000        | KontextDevError::TokenExchange { .. } => true,
1001        KontextDevError::ConnectSession { message }
1002        | KontextDevError::IntegrationOAuthInit { message } => {
1003            let lower = message.to_ascii_lowercase();
1004            lower.contains("401")
1005                || lower.contains("unauthorized")
1006                || lower.contains("invalid token")
1007                || lower.contains("authorization")
1008                || lower.contains("access token expired")
1009        }
1010        _ => false,
1011    }
1012}
1013
/// Returns `true` exactly when `err` is the `KontextDevError::TokenExchange`
/// variant, regardless of its fields.
fn is_token_exchange_error(err: &KontextDevError) -> bool {
    matches!(err, KontextDevError::TokenExchange { .. })
}
1017
/// Returns `true` when `err` is an authorization error or a token exchange
/// error.
///
/// `is_authorization_error` already reports `true` for every `TokenExchange`
/// error, so the second check currently never changes the result; it keeps the
/// token-exchange case explicit at this call site.
fn is_auth_recovery_required(err: &KontextDevError) -> bool {
    is_authorization_error(err) || is_token_exchange_error(err)
}
1021
#[cfg(test)]
mod tests {
    use super::*;

    /// Gateway-sourced route whose backend tool id equals `tool_id`.
    fn route(tool_id: &str, backend_id: &str) -> ToolRouteRecord {
        ToolRouteRecord {
            tool_id: tool_id.to_string(),
            backend_id: backend_id.to_string(),
            source: BackendSource::Gateway,
            backend_tool_id: tool_id.to_string(),
            integration_id: None,
        }
    }

    /// Inventory candidate pairing a minimal tool with its route.
    fn candidate(tool_id: &str, backend_id: &str) -> RouteInventoryCandidate {
        RouteInventoryCandidate {
            tool: KontextTool {
                id: tool_id.to_string(),
                name: tool_id.to_string(),
                description: None,
                input_schema: None,
                server: None,
            },
            route: route(tool_id, backend_id),
        }
    }

    /// Minimal credentials-based runtime integration; only `id` matters to
    /// `inventory_build_key`.
    fn runtime_integration(id: &str) -> RuntimeIntegrationRecord {
        RuntimeIntegrationRecord {
            id: id.to_string(),
            name: id.to_ascii_uppercase(),
            url: format!("https://{id}"),
            category: RuntimeIntegrationCategory::InternalMcpCredentials,
            connect_type: crate::mcp::RuntimeIntegrationConnectType::Credentials,
            auth_mode: None,
            credential_schema: None,
            requires_oauth: None,
            connection: None,
        }
    }

    #[test]
    fn build_route_inventory_keeps_first_route_on_conflict() {
        let snapshot = build_route_inventory(
            1,
            vec![
                candidate("tool-a", "gateway"),
                candidate("tool-a", "internal:linear"),
                candidate("tool-b", "gateway"),
            ],
        );

        assert_eq!(snapshot.tools.len(), 2);
        assert_eq!(snapshot.routes.get("tool-a").unwrap().backend_id, "gateway");
        assert_eq!(snapshot.conflicts.len(), 1);
        assert_eq!(snapshot.conflicts[0].kept.backend_id, "gateway");
        assert_eq!(snapshot.conflicts[0].dropped.backend_id, "internal:linear");
    }

    #[test]
    fn inventory_build_key_is_stable_for_same_runtime_integrations() {
        let options = ToolsListOptions { limit: Some(10) };
        let runtime = vec![runtime_integration("b"), runtime_integration("a")];
        let reordered = vec![runtime_integration("a"), runtime_integration("b")];

        let first = inventory_build_key(Some(options.clone()), Some(&runtime));
        let second = inventory_build_key(Some(options.clone()), Some(&runtime));
        let from_reordered = inventory_build_key(Some(options), Some(&reordered));

        // Same input twice, and the same integrations in a different order,
        // must all map to one key with the ids in sorted order.
        assert_eq!(first, second);
        assert_eq!(first, from_reordered);
        assert_eq!(first, "limit:10|runtime:a,b");
    }

    #[test]
    fn inventory_build_key_distinguishes_none_from_zero_limit() {
        let no_limit = inventory_build_key(None, None);
        let zero_limit = inventory_build_key(Some(ToolsListOptions { limit: Some(0) }), None);

        assert_ne!(no_limit, zero_limit);
        assert_eq!(no_limit, "limit:none|runtime:");
        assert_eq!(zero_limit, "limit:0|runtime:");
    }

    #[test]
    fn transient_error_detection_matches_network_and_5xx() {
        let network = KontextDevError::ConnectSession {
            message: "connection reset by peer".to_string(),
        };
        let server_500 = KontextDevError::ConnectSession {
            message: "500 internal server error".to_string(),
        };
        let bad_request = KontextDevError::ConnectSession {
            message: "400 bad request".to_string(),
        };

        assert!(is_transient_error(&network));
        assert!(is_transient_error(&server_500));
        assert!(!is_transient_error(&bad_request));
    }
}