Skip to main content

kontext_dev_sdk/
orchestrator.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::future::Future;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde_json::{Map, Value};
7use tokio::sync::{Mutex, RwLock};
8
9use crate::KontextDevError;
10use crate::client::{
11    ClientState, ConnectSessionResult, IntegrationInfo, KontextClient, KontextClientConfig,
12    ToolResult, create_kontext_client,
13};
14use crate::mcp::{
15    KontextMcp, KontextMcpConfig, KontextTool, RuntimeIntegrationCategory,
16    RuntimeIntegrationRecord, extract_text_content,
17};
18
/// Safety margin (seconds) subtracted from an internal token's expiry before it is
/// considered stale; see `InternalResourceToken::is_fresh`.
const INTERNAL_TOKEN_SKEW_SECS: u64 = 30;
/// Delay in milliseconds used between transient-error retries.
const RETRY_DELAY_MS: u64 = 250;
21
22#[derive(Clone, Debug)]
23pub struct ToolRouteRecord {
24    pub tool_id: String,
25    pub backend_id: String,
26    pub source: BackendSource,
27    pub backend_tool_id: String,
28    pub integration_id: Option<String>,
29}
30
/// Kind of backend that serves a routed tool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendSource {
    /// Served through the gateway client.
    Gateway,
    /// Served by an integration's own MCP client using an exchanged resource token.
    Internal,
}
36
37#[derive(Clone, Debug)]
38pub struct RouteConflictRecord {
39    pub tool_id: String,
40    pub kept: ToolRouteRecord,
41    pub dropped: ToolRouteRecord,
42}
43
44#[derive(Clone, Debug)]
45struct RouteInventoryCandidate {
46    tool: KontextTool,
47    route: ToolRouteRecord,
48}
49
50#[derive(Clone, Debug)]
51struct RouteInventorySnapshot {
52    version: u64,
53    tools: Vec<KontextTool>,
54    routes: HashMap<String, ToolRouteRecord>,
55    #[allow(dead_code)]
56    conflicts: Vec<RouteConflictRecord>,
57}
58
59impl RouteInventorySnapshot {
60    fn empty() -> Self {
61        Self {
62            version: 0,
63            tools: Vec::new(),
64            routes: HashMap::new(),
65            conflicts: Vec::new(),
66        }
67    }
68}
69
/// A cached access token for one internal integration's resource URL.
#[derive(Debug, Clone)]
struct InternalResourceToken {
    /// Bearer token returned by the resource exchange.
    access_token: String,
    /// Local expiry instant; `None` means the exchange reported no usable lifetime,
    /// in which case `is_fresh` never treats the token as reusable.
    expires_at: Option<Instant>,
}
75
76impl InternalResourceToken {
77    fn is_fresh(&self) -> bool {
78        match self.expires_at {
79            Some(expires_at) => {
80                let skew = Duration::from_secs(INTERNAL_TOKEN_SKEW_SECS);
81                Instant::now()
82                    .checked_add(skew)
83                    .is_some_and(|v| v < expires_at)
84            }
85            None => false,
86        }
87    }
88}
89
90pub type KontextOrchestratorState = ClientState;
91pub type KontextOrchestratorConfig = KontextClientConfig;
92
93#[derive(Clone)]
94pub struct KontextOrchestrator {
95    state: Arc<RwLock<KontextOrchestratorState>>,
96    config: KontextOrchestratorConfig,
97    gateway_client: KontextClient,
98    route_inventory: Arc<RwLock<RouteInventorySnapshot>>,
99    build_lock: Arc<Mutex<()>>,
100    last_build_key: Arc<RwLock<Option<String>>>,
101    version_counter: Arc<Mutex<u64>>,
102    reset_generation: Arc<Mutex<u64>>,
103    build_run_counter: Arc<Mutex<u64>>,
104    latest_committed_build_run: Arc<Mutex<u64>>,
105    runtime_integrations: Arc<RwLock<HashMap<String, RuntimeIntegrationRecord>>>,
106    internal_mcps: Arc<RwLock<HashMap<String, KontextMcp>>>,
107    internal_token_cache: Arc<Mutex<HashMap<String, InternalResourceToken>>>,
108    internal_token_locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
109    managed_internal_ops: Arc<Mutex<HashMap<String, u32>>>,
110    resolved_internal_listings: Arc<RwLock<HashSet<String>>>,
111    integration_refresh_lock: Arc<Mutex<()>>,
112}
113
114impl KontextOrchestrator {
115    pub fn new(config: KontextOrchestratorConfig) -> Self {
116        let gateway_client = create_kontext_client(config.clone());
117
118        Self {
119            state: Arc::new(RwLock::new(ClientState::Idle)),
120            config,
121            gateway_client,
122            route_inventory: Arc::new(RwLock::new(RouteInventorySnapshot::empty())),
123            build_lock: Arc::new(Mutex::new(())),
124            last_build_key: Arc::new(RwLock::new(None)),
125            version_counter: Arc::new(Mutex::new(0)),
126            reset_generation: Arc::new(Mutex::new(0)),
127            build_run_counter: Arc::new(Mutex::new(0)),
128            latest_committed_build_run: Arc::new(Mutex::new(0)),
129            runtime_integrations: Arc::new(RwLock::new(HashMap::new())),
130            internal_mcps: Arc::new(RwLock::new(HashMap::new())),
131            internal_token_cache: Arc::new(Mutex::new(HashMap::new())),
132            internal_token_locks: Arc::new(Mutex::new(HashMap::new())),
133            managed_internal_ops: Arc::new(Mutex::new(HashMap::new())),
134            resolved_internal_listings: Arc::new(RwLock::new(HashSet::new())),
135            integration_refresh_lock: Arc::new(Mutex::new(())),
136        }
137    }
138
139    pub async fn state(&self) -> KontextOrchestratorState {
140        *self.state.read().await
141    }
142
143    pub fn mcp(&self) -> &KontextMcp {
144        self.gateway_client.mcp()
145    }
146
147    pub async fn connect(&self) -> Result<(), KontextDevError> {
148        self.ensure_connected().await
149    }
150
151    pub async fn disconnect(&self) {
152        self.dispose_internal_clients().await;
153        self.runtime_integrations.write().await.clear();
154        self.reset_inventory().await;
155        self.gateway_client.disconnect().await;
156        self.internal_token_cache.lock().await.clear();
157        *self.state.write().await = ClientState::Idle;
158    }
159
160    pub async fn get_connect_page_url(&self) -> Result<ConnectSessionResult, KontextDevError> {
161        self.ensure_connected().await?;
162        self.gateway_client.get_connect_page_url().await
163    }
164
165    pub async fn sign_in(&self) -> Result<(), KontextDevError> {
166        // Mirrors TS semantics: sign-in in mixed mode is equivalent to connect.
167        self.ensure_connected().await
168    }
169
170    pub async fn sign_out(&self) -> Result<(), KontextDevError> {
171        self.dispose_internal_clients().await;
172        self.runtime_integrations.write().await.clear();
173        self.reset_inventory().await;
174        self.internal_token_cache.lock().await.clear();
175        self.gateway_client.sign_out().await?;
176        *self.state.write().await = ClientState::Idle;
177        Ok(())
178    }
179
180    pub async fn integrations_list(&self) -> Result<Vec<IntegrationInfo>, KontextDevError> {
181        self.ensure_connected().await?;
182
183        let discovered = self.refresh_integration_inventory().await?;
184
185        let gateway_statuses = self
186            .gateway_client
187            .integrations_list()
188            .await
189            .unwrap_or_default();
190        let gateway_by_id = gateway_statuses
191            .into_iter()
192            .map(|item| (item.id.clone(), item))
193            .collect::<HashMap<_, _>>();
194
195        let needs_internal_connect_link = discovered.iter().any(|integration| {
196            integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
197                && !integration
198                    .connection
199                    .as_ref()
200                    .map(|c| c.connected)
201                    .unwrap_or(false)
202        });
203
204        let mut shared_connect_url = None;
205        if needs_internal_connect_link
206            && let Ok(connect) = self.gateway_client.get_connect_page_url().await
207        {
208            shared_connect_url = Some(connect.connect_url);
209        }
210
211        Ok(discovered
212            .into_iter()
213            .map(|integration| {
214                let gateway_status = gateway_by_id.get(&integration.id);
215                let connected = gateway_status
216                    .map(|item| item.connected)
217                    .unwrap_or_else(|| {
218                        integration
219                            .connection
220                            .as_ref()
221                            .map(|c| c.connected)
222                            .unwrap_or(false)
223                    });
224                let connect_url = gateway_status
225                    .and_then(|item| item.connect_url.clone())
226                    .or_else(|| {
227                        (!connected
228                            && integration.category
229                                == RuntimeIntegrationCategory::InternalMcpCredentials)
230                            .then(|| shared_connect_url.clone())
231                            .flatten()
232                    });
233                let reason = gateway_status
234                    .and_then(|item| item.reason.clone())
235                    .or_else(|| {
236                        (!connected
237                            && integration.category
238                                == RuntimeIntegrationCategory::InternalMcpCredentials)
239                            .then_some("credentials_required".to_string())
240                    });
241
242                IntegrationInfo {
243                    id: integration.id,
244                    name: integration.name,
245                    connected,
246                    connect_url,
247                    reason,
248                }
249            })
250            .collect())
251    }
252
253    pub async fn tools_list(&self) -> Result<Vec<KontextTool>, KontextDevError> {
254        self.tools_list_with_options(None).await
255    }
256
257    pub async fn tools_list_with_options(
258        &self,
259        options: Option<ToolsListOptions>,
260    ) -> Result<Vec<KontextTool>, KontextDevError> {
261        self.ensure_connected().await?;
262
263        let mut inventory = self.build_tool_inventory(options.clone(), None).await?;
264
265        let missing_internal = self.get_internal_integrations_missing_tools().await;
266        if !missing_internal.is_empty() {
267            let refreshed = self.refresh_integration_inventory().await?;
268            inventory = self.build_tool_inventory(options, Some(refreshed)).await?;
269        }
270
271        let unresolved_internal = self.get_internal_integrations_missing_tools().await;
272        if self.state().await == ClientState::NeedsAuth
273            && self.gateway_client.state().await == ClientState::Ready
274            && unresolved_internal.is_empty()
275        {
276            *self.state.write().await = ClientState::Ready;
277        }
278
279        Ok(inventory.tools)
280    }
281
282    pub async fn tools_execute(
283        &self,
284        tool_id: &str,
285        args: Option<Map<String, Value>>,
286    ) -> Result<ToolResult, KontextDevError> {
287        self.ensure_connected().await?;
288
289        let route = self.route_for_execution(tool_id).await?;
290
291        match route.source {
292            BackendSource::Gateway => {
293                self.gateway_client
294                    .tools_execute(route.backend_tool_id.as_str(), args)
295                    .await
296            }
297            BackendSource::Internal => {
298                let Some(integration_id) = route.integration_id else {
299                    return Err(KontextDevError::ConnectSession {
300                        message: format!(
301                            "route for tool `{tool_id}` is missing integration metadata"
302                        ),
303                    });
304                };
305
306                let integration =
307                    self.integration_by_id(&integration_id)
308                        .await?
309                        .ok_or_else(|| KontextDevError::ConnectSession {
310                            message: format!(
311                                "internal integration `{integration_id}` is no longer attached"
312                            ),
313                        })?;
314
315                let internal_mcp = self.internal_mcp_for(&integration).await.ok_or_else(|| {
316                    KontextDevError::ConnectSession {
317                        message: format!(
318                            "internal integration `{integration_id}` has no MCP client"
319                        ),
320                    }
321                })?;
322
323                let token = match self
324                    .ensure_internal_resource_token(&integration, false)
325                    .await
326                {
327                    Ok(token) => token,
328                    Err(err) if is_auth_recovery_required(&err) => {
329                        self.ensure_internal_resource_token(&integration, true)
330                            .await?
331                    }
332                    Err(err) => return Err(err),
333                };
334
335                let integration_id_for_exec = integration_id.clone();
336                let execute_once = || {
337                    let integration_id = integration_id_for_exec.clone();
338                    let internal_mcp = internal_mcp.clone();
339                    let token = token.clone();
340                    let backend_tool_id = route.backend_tool_id.clone();
341                    let args = args.clone();
342                    async move {
343                        self.run_managed_internal_op(integration_id.as_str(), || async {
344                            let raw = internal_mcp
345                                .call_tool_with_access_token(
346                                    token.as_str(),
347                                    backend_tool_id.as_str(),
348                                    args,
349                                )
350                                .await?;
351                            Ok(ToolResult {
352                                content: extract_text_content(&raw),
353                                raw,
354                            })
355                        })
356                        .await
357                    }
358                };
359
360                match with_transient_retry(execute_once, 1).await {
361                    Ok(result) => Ok(result),
362                    Err(err) => {
363                        if is_auth_recovery_required(&err) {
364                            *self.state.write().await = ClientState::NeedsAuth;
365                        }
366                        Err(err)
367                    }
368                }
369            }
370        }
371    }
372
373    async fn ensure_connected(&self) -> Result<(), KontextDevError> {
374        let state = self.state().await;
375        if state == ClientState::Ready {
376            return Ok(());
377        }
378
379        *self.state.write().await = ClientState::Connecting;
380
381        match self.gateway_client.connect().await {
382            Ok(()) => {
383                if let Err(err) = self.refresh_integration_inventory().await {
384                    if is_auth_recovery_required(&err) {
385                        *self.state.write().await = ClientState::NeedsAuth;
386                    } else {
387                        *self.state.write().await = ClientState::Failed;
388                    }
389                    return Err(err);
390                }
391                *self.state.write().await = ClientState::Ready;
392                Ok(())
393            }
394            Err(err) => {
395                if is_auth_recovery_required(&err) {
396                    *self.state.write().await = ClientState::NeedsAuth;
397                } else {
398                    *self.state.write().await = ClientState::Failed;
399                }
400                Err(err)
401            }
402        }
403    }
404
405    async fn refresh_integration_inventory(
406        &self,
407    ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
408        let _guard = self.integration_refresh_lock.lock().await;
409
410        let list_once = || async { self.gateway_client.mcp().list_integrations().await };
411        let items = with_transient_retry(list_once, 1).await?;
412        self.apply_runtime_integrations(&items).await;
413        Ok(items)
414    }
415
416    async fn apply_runtime_integrations(&self, integrations: &[RuntimeIntegrationRecord]) {
417        let mut runtime = self.runtime_integrations.write().await;
418        runtime.clear();
419
420        for integration in integrations {
421            runtime.insert(integration.id.clone(), integration.clone());
422        }
423        drop(runtime);
424
425        let desired = integrations
426            .iter()
427            .filter(|integration| {
428                integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
429            })
430            .map(|integration| (integration.id.clone(), integration.url.clone()))
431            .collect::<BTreeMap<_, _>>();
432
433        let mut internal_mcps = self.internal_mcps.write().await;
434
435        internal_mcps.retain(|integration_id, mcp| {
436            desired
437                .get(integration_id)
438                .is_some_and(|url| mcp.mcp_url().ok().as_deref() == Some(url.as_str()))
439        });
440
441        for integration in integrations {
442            if integration.category != RuntimeIntegrationCategory::InternalMcpCredentials {
443                continue;
444            }
445            if internal_mcps.contains_key(&integration.id) {
446                continue;
447            }
448
449            internal_mcps.insert(
450                integration.id.clone(),
451                KontextMcp::new(KontextMcpConfig {
452                    client_id: self.config.client_id.clone(),
453                    redirect_uri: self.config.redirect_uri.clone(),
454                    url: Some(integration.url.clone()),
455                    server: self.config.server_url.clone(),
456                    client_secret: self.config.client_secret.clone(),
457                    scope: self.config.scope.clone(),
458                    resource: Some(integration.url.clone()),
459                    session_key: Some(format!("internal:{}", integration.id)),
460                    integration_ui_url: self.config.integration_ui_url.clone(),
461                    integration_return_to: self.config.integration_return_to.clone(),
462                    auth_timeout_seconds: self.config.auth_timeout_seconds,
463                    open_connect_page_on_login: Some(false),
464                    token_cache_path: None,
465                }),
466            );
467        }
468    }
469
470    async fn internal_mcp_for(&self, integration: &RuntimeIntegrationRecord) -> Option<KontextMcp> {
471        self.internal_mcps
472            .read()
473            .await
474            .get(&integration.id)
475            .cloned()
476    }
477
478    async fn integration_by_id(
479        &self,
480        integration_id: &str,
481    ) -> Result<Option<RuntimeIntegrationRecord>, KontextDevError> {
482        if let Some(existing) = self
483            .runtime_integrations
484            .read()
485            .await
486            .get(integration_id)
487            .cloned()
488        {
489            return Ok(Some(existing));
490        }
491
492        let refreshed = self.refresh_integration_inventory().await?;
493        Ok(refreshed.into_iter().find(|item| item.id == integration_id))
494    }
495
496    async fn ensure_internal_resource_token(
497        &self,
498        integration: &RuntimeIntegrationRecord,
499        force_exchange: bool,
500    ) -> Result<String, KontextDevError> {
501        if !force_exchange
502            && let Some(existing) = self
503                .internal_token_cache
504                .lock()
505                .await
506                .get(&integration.id)
507                .cloned()
508            && existing.is_fresh()
509        {
510            return Ok(existing.access_token);
511        }
512
513        let integration_lock = {
514            let mut locks = self.internal_token_locks.lock().await;
515            locks
516                .entry(integration.id.clone())
517                .or_insert_with(|| Arc::new(Mutex::new(())))
518                .clone()
519        };
520
521        let _guard = integration_lock.lock().await;
522
523        if !force_exchange
524            && let Some(existing) = self
525                .internal_token_cache
526                .lock()
527                .await
528                .get(&integration.id)
529                .cloned()
530            && existing.is_fresh()
531        {
532            return Ok(existing.access_token);
533        }
534
535        let session = self.gateway_client.mcp().authenticate_mcp().await?;
536        let exchanged = self
537            .gateway_client
538            .mcp()
539            .client()
540            .exchange_for_resource(&session.gateway_token.access_token, &integration.url, None)
541            .await?;
542
543        let expires_at = exchanged.expires_in.and_then(|seconds| {
544            (seconds > 0)
545                .then(|| Instant::now().checked_add(Duration::from_secs(seconds as u64)))
546                .flatten()
547        });
548
549        let token = InternalResourceToken {
550            access_token: exchanged.access_token,
551            expires_at,
552        };
553
554        let value = token.access_token.clone();
555        self.internal_token_cache
556            .lock()
557            .await
558            .insert(integration.id.clone(), token);
559        Ok(value)
560    }
561
562    async fn build_inventory_candidates(
563        &self,
564        options: Option<ToolsListOptions>,
565        runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
566    ) -> Result<Vec<RouteInventoryCandidate>, KontextDevError> {
567        self.resolved_internal_listings.write().await.clear();
568
569        let mut candidates = Vec::new();
570
571        let list_gateway_tools_once = || {
572            let gateway_client = self.gateway_client.clone();
573            async move { gateway_client.tools_list().await }
574        };
575        let gateway_tools = with_transient_retry(list_gateway_tools_once, 1)
576            .await?
577            .into_iter()
578            .take(
579                options
580                    .as_ref()
581                    .and_then(|value| value.limit)
582                    .unwrap_or(u32::MAX) as usize,
583            )
584            .collect::<Vec<_>>();
585        for tool in gateway_tools {
586            let route = ToolRouteRecord {
587                tool_id: tool.id.clone(),
588                backend_id: gateway_backend_id(),
589                source: BackendSource::Gateway,
590                backend_tool_id: tool.id.clone(),
591                integration_id: None,
592            };
593            candidates.push(RouteInventoryCandidate { tool, route });
594        }
595
596        let integrations = if let Some(runtime_integrations) = runtime_integrations_override {
597            self.apply_runtime_integrations(&runtime_integrations).await;
598            runtime_integrations
599        } else {
600            self.refresh_integration_inventory().await?
601        };
602
603        let mut internal_integrations = integrations
604            .into_iter()
605            .filter(|integration| {
606                integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
607            })
608            .collect::<Vec<_>>();
609        internal_integrations.sort_by(|a, b| a.id.cmp(&b.id));
610
611        let mut pending_auth_retries = Vec::new();
612
613        for integration in &internal_integrations {
614            match self
615                .add_internal_tools_for_integration(&mut candidates, integration, false)
616                .await
617            {
618                Ok(()) => {}
619                Err(err) => {
620                    if is_authorization_error(&err) {
621                        pending_auth_retries.push(integration.clone());
622                        continue;
623                    }
624                    if is_token_exchange_error(&err) {
625                        *self.state.write().await = ClientState::NeedsAuth;
626                    }
627                }
628            }
629        }
630
631        let mut unresolved_internal_auth = false;
632        for integration in pending_auth_retries {
633            match self
634                .add_internal_tools_for_integration(&mut candidates, &integration, true)
635                .await
636            {
637                Ok(()) => {}
638                Err(err) => {
639                    if is_auth_recovery_required(&err) {
640                        unresolved_internal_auth = true;
641                        continue;
642                    }
643                }
644            }
645        }
646
647        if unresolved_internal_auth {
648            *self.state.write().await = ClientState::NeedsAuth;
649        } else if self.state().await == ClientState::NeedsAuth
650            && self.gateway_client.state().await == ClientState::Ready
651        {
652            *self.state.write().await = ClientState::Ready;
653        }
654
655        Ok(candidates)
656    }
657
658    async fn add_internal_tools_for_integration(
659        &self,
660        candidates: &mut Vec<RouteInventoryCandidate>,
661        integration: &RuntimeIntegrationRecord,
662        force_exchange: bool,
663    ) -> Result<(), KontextDevError> {
664        let Some(internal_mcp) = self.internal_mcp_for(integration).await else {
665            return Ok(());
666        };
667
668        let token = self
669            .ensure_internal_resource_token(integration, force_exchange)
670            .await?;
671
672        let list_once = || {
673            let integration_id = integration.id.clone();
674            let internal_mcp = internal_mcp.clone();
675            let token = token.clone();
676            async move {
677                self.run_managed_internal_op(&integration_id, || async {
678                    internal_mcp
679                        .list_tools_with_access_token(token.as_str())
680                        .await
681                })
682                .await
683            }
684        };
685
686        let tools = with_transient_retry(list_once, 1).await?;
687
688        self.resolved_internal_listings
689            .write()
690            .await
691            .insert(integration.id.clone());
692
693        for tool in tools {
694            let backend_tool_id = if tool.id.is_empty() {
695                tool.name.clone()
696            } else {
697                tool.id.clone()
698            };
699            let unified_id = format!("{}:{}", integration.id, tool.name);
700
701            let mapped = KontextTool {
702                id: unified_id.clone(),
703                name: tool.name,
704                description: tool.description,
705                input_schema: tool.input_schema,
706                server: Some(crate::mcp::KontextToolServer {
707                    id: integration.id.clone(),
708                    name: Some(integration.name.clone()),
709                }),
710            };
711
712            let route = ToolRouteRecord {
713                tool_id: unified_id,
714                backend_id: internal_backend_id(&integration.id),
715                source: BackendSource::Internal,
716                backend_tool_id,
717                integration_id: Some(integration.id.clone()),
718            };
719
720            candidates.push(RouteInventoryCandidate {
721                tool: mapped,
722                route,
723            });
724        }
725
726        Ok(())
727    }
728
729    async fn run_managed_internal_op<T, F, Fut>(
730        &self,
731        integration_id: &str,
732        operation: F,
733    ) -> Result<T, KontextDevError>
734    where
735        F: FnOnce() -> Fut,
736        Fut: Future<Output = Result<T, KontextDevError>>,
737    {
738        {
739            let mut managed = self.managed_internal_ops.lock().await;
740            *managed.entry(integration_id.to_string()).or_insert(0) += 1;
741        }
742
743        let result = operation().await;
744
745        {
746            let mut managed = self.managed_internal_ops.lock().await;
747            if let Some(count) = managed.get_mut(integration_id) {
748                *count = count.saturating_sub(1);
749                if *count == 0 {
750                    managed.remove(integration_id);
751                }
752            }
753        }
754
755        result
756    }
757
758    async fn build_tool_inventory(
759        &self,
760        options: Option<ToolsListOptions>,
761        runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
762    ) -> Result<RouteInventorySnapshot, KontextDevError> {
763        let key = inventory_build_key(options.clone(), runtime_integrations_override.as_ref());
764
765        let _guard = self.build_lock.lock().await;
766
767        {
768            let last_key = self.last_build_key.read().await;
769            let snapshot = self.route_inventory.read().await;
770            if snapshot.version > 0 && last_key.as_deref() == Some(key.as_str()) {
771                return Ok(snapshot.clone());
772            }
773        }
774
775        let build_generation = *self.reset_generation.lock().await;
776        let build_run = {
777            let mut counter = self.build_run_counter.lock().await;
778            *counter += 1;
779            *counter
780        };
781
782        let candidates = self
783            .build_inventory_candidates(options, runtime_integrations_override)
784            .await?;
785
786        if *self.reset_generation.lock().await != build_generation {
787            return Ok(self.route_inventory.read().await.clone());
788        }
789
790        let latest_committed = *self.latest_committed_build_run.lock().await;
791        let snapshot = if build_run >= latest_committed {
792            *self.latest_committed_build_run.lock().await = build_run;
793            let mut version = self.version_counter.lock().await;
794            *version += 1;
795            let built = build_route_inventory(*version, candidates);
796            *self.route_inventory.write().await = built.clone();
797            *self.last_build_key.write().await = Some(key);
798            built
799        } else {
800            let current_version = self.route_inventory.read().await.version;
801            build_route_inventory(current_version, candidates)
802        };
803
804        Ok(snapshot)
805    }
806
807    async fn route_for_execution(&self, tool_id: &str) -> Result<ToolRouteRecord, KontextDevError> {
808        if let Some(route) = self
809            .route_inventory
810            .read()
811            .await
812            .routes
813            .get(tool_id)
814            .cloned()
815        {
816            return Ok(route);
817        }
818
819        let _ = self.build_tool_inventory(None, None).await?;
820
821        if let Some(route) = self
822            .route_inventory
823            .read()
824            .await
825            .routes
826            .get(tool_id)
827            .cloned()
828        {
829            return Ok(route);
830        }
831
832        Err(KontextDevError::ConnectSession {
833            message: format!(
834                "unknown tool `{tool_id}`. Call tools.list() to refresh available tools"
835            ),
836        })
837    }
838
839    async fn get_internal_integrations_missing_tools(&self) -> Vec<String> {
840        let resolved = self.resolved_internal_listings.read().await.clone();
841        self.internal_mcps
842            .read()
843            .await
844            .keys()
845            .filter(|integration_id| !resolved.contains(*integration_id))
846            .cloned()
847            .collect()
848    }
849
850    async fn dispose_internal_clients(&self) {
851        self.internal_mcps.write().await.clear();
852        self.internal_token_cache.lock().await.clear();
853        self.internal_token_locks.lock().await.clear();
854        self.resolved_internal_listings.write().await.clear();
855    }
856
857    async fn reset_inventory(&self) {
858        *self.reset_generation.lock().await += 1;
859        *self.version_counter.lock().await = 0;
860
861        let build_run = *self.build_run_counter.lock().await;
862        *self.latest_committed_build_run.lock().await = build_run;
863
864        *self.route_inventory.write().await = RouteInventorySnapshot::empty();
865        *self.last_build_key.write().await = None;
866    }
867}
868
/// Options accepted when listing tools.
///
/// `ToolsListOptions::default()` is equivalent to `ToolsListOptions { limit: None }`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ToolsListOptions {
    /// Optional tool-count limit (presumably applied to the listed tools —
    /// confirm at the call site). `None` means no limit was given and is kept
    /// distinct from `Some(0)`.
    pub limit: Option<u32>,
}
873
874pub fn create_kontext_orchestrator(config: KontextOrchestratorConfig) -> KontextOrchestrator {
875    KontextOrchestrator::new(config)
876}
877
/// Backend identifier used for routes served by the gateway.
fn gateway_backend_id() -> String {
    String::from("gateway")
}
881
/// Backend identifier for routes served by the internal integration `integration_id`,
/// formed as `internal:<integration_id>`.
fn internal_backend_id(integration_id: &str) -> String {
    ["internal:", integration_id].concat()
}
885
886fn inventory_build_key(
887    options: Option<ToolsListOptions>,
888    runtime_integrations: Option<&Vec<RuntimeIntegrationRecord>>,
889) -> String {
890    let limit_key = match options.and_then(|item| item.limit) {
891        Some(limit) => limit.to_string(),
892        None => "none".to_string(),
893    };
894    let mut key = format!("limit:{limit_key}");
895    key.push_str("|runtime:");
896    if let Some(integrations) = runtime_integrations {
897        let mut ids = integrations
898            .iter()
899            .map(|integration| integration.id.clone())
900            .collect::<Vec<_>>();
901        ids.sort();
902        key.push_str(ids.join(",").as_str());
903    }
904    key
905}
906
907fn build_route_inventory(
908    version: u64,
909    candidates: Vec<RouteInventoryCandidate>,
910) -> RouteInventorySnapshot {
911    let mut tools = Vec::<KontextTool>::new();
912    let mut routes = HashMap::<String, ToolRouteRecord>::new();
913    let mut conflicts = Vec::<RouteConflictRecord>::new();
914
915    for candidate in candidates {
916        let tool_id = candidate.route.tool_id.clone();
917        if !routes.contains_key(&tool_id) {
918            routes.insert(tool_id.clone(), candidate.route);
919            tools.push(candidate.tool);
920            continue;
921        }
922
923        let existing = routes
924            .get(&tool_id)
925            .cloned()
926            .unwrap_or_else(|| candidate.route.clone());
927
928        // TS parity default policy is keep_existing.
929        conflicts.push(RouteConflictRecord {
930            tool_id,
931            kept: existing,
932            dropped: candidate.route,
933        });
934    }
935
936    RouteInventorySnapshot {
937        version,
938        tools,
939        routes,
940        conflicts,
941    }
942}
943
944async fn with_transient_retry<T, F, Fut>(
945    mut operation: F,
946    max_retries: usize,
947) -> Result<T, KontextDevError>
948where
949    F: FnMut() -> Fut,
950    Fut: Future<Output = Result<T, KontextDevError>>,
951{
952    for attempt in 0..=max_retries {
953        match operation().await {
954            Ok(value) => return Ok(value),
955            Err(err) => {
956                if attempt >= max_retries || !is_transient_error(&err) {
957                    return Err(err);
958                }
959                tokio::time::sleep(Duration::from_millis(RETRY_DELAY_MS)).await;
960            }
961        }
962    }
963
964    Err(KontextDevError::ConnectSession {
965        message: "transient retry loop exhausted unexpectedly".to_string(),
966    })
967}
968
969fn is_transient_error(err: &KontextDevError) -> bool {
970    match err {
971        KontextDevError::ConnectSession { message }
972        | KontextDevError::TokenExchange { message, .. }
973        | KontextDevError::TokenRequest { message, .. } => {
974            let lower = message.to_ascii_lowercase();
975            lower.contains("429")
976                || lower.contains("500")
977                || lower.contains("502")
978                || lower.contains("503")
979                || lower.contains("504")
980                || lower.contains("timed out")
981                || lower.contains("connection reset")
982                || lower.contains("broken pipe")
983                || lower.contains("temporarily unavailable")
984                || lower.contains("econnreset")
985                || lower.contains("network")
986        }
987        _ => false,
988    }
989}
990
991fn is_authorization_error(err: &KontextDevError) -> bool {
992    match err {
993        KontextDevError::OAuthCallbackTimeout { .. }
994        | KontextDevError::OAuthCallbackCancelled
995        | KontextDevError::MissingAuthorizationCode
996        | KontextDevError::OAuthCallbackError { .. }
997        | KontextDevError::InvalidOAuthState
998        | KontextDevError::TokenRequest { .. }
999        | KontextDevError::TokenExchange { .. } => true,
1000        KontextDevError::ConnectSession { message }
1001        | KontextDevError::IntegrationOAuthInit { message } => {
1002            let lower = message.to_ascii_lowercase();
1003            lower.contains("401")
1004                || lower.contains("unauthorized")
1005                || lower.contains("invalid token")
1006                || lower.contains("authorization")
1007                || lower.contains("access token expired")
1008        }
1009        _ => false,
1010    }
1011}
1012
1013fn is_token_exchange_error(err: &KontextDevError) -> bool {
1014    matches!(err, KontextDevError::TokenExchange { .. })
1015}
1016
1017fn is_auth_recovery_required(err: &KontextDevError) -> bool {
1018    is_authorization_error(err) || is_token_exchange_error(err)
1019}
1020
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a gateway-sourced route whose backend tool id equals `tool_id`.
    fn make_route(tool_id: &str, backend_id: &str) -> ToolRouteRecord {
        ToolRouteRecord {
            tool_id: tool_id.to_owned(),
            backend_id: backend_id.to_owned(),
            source: BackendSource::Gateway,
            backend_tool_id: tool_id.to_owned(),
            integration_id: None,
        }
    }

    /// Builds a minimal inventory candidate whose tool id and name are both `tool_id`.
    fn make_candidate(tool_id: &str, backend_id: &str) -> RouteInventoryCandidate {
        let tool = KontextTool {
            id: tool_id.to_owned(),
            name: tool_id.to_owned(),
            description: None,
            input_schema: None,
            server: None,
        };
        RouteInventoryCandidate {
            tool,
            route: make_route(tool_id, backend_id),
        }
    }

    /// Builds a credentials-based internal runtime integration at `https://<id>`.
    fn make_runtime_record(id: &str, name: &str) -> RuntimeIntegrationRecord {
        RuntimeIntegrationRecord {
            id: id.to_owned(),
            name: name.to_owned(),
            url: format!("https://{id}"),
            category: RuntimeIntegrationCategory::InternalMcpCredentials,
            connect_type: crate::mcp::RuntimeIntegrationConnectType::Credentials,
            auth_mode: None,
            credential_schema: None,
            requires_oauth: None,
            connection: None,
        }
    }

    #[test]
    fn build_route_inventory_keeps_first_route_on_conflict() {
        let candidates = vec![
            make_candidate("tool-a", "gateway"),
            make_candidate("tool-a", "internal:linear"),
            make_candidate("tool-b", "gateway"),
        ];
        let snapshot = build_route_inventory(1, candidates);

        assert_eq!(snapshot.tools.len(), 2);
        assert_eq!(snapshot.routes["tool-a"].backend_id, "gateway");
        assert_eq!(snapshot.conflicts.len(), 1);

        let conflict = &snapshot.conflicts[0];
        assert_eq!(conflict.kept.backend_id, "gateway");
        assert_eq!(conflict.dropped.backend_id, "internal:linear");
    }

    #[test]
    fn inventory_build_key_is_stable_for_same_runtime_integrations() {
        let runtime = vec![make_runtime_record("b", "B"), make_runtime_record("a", "A")];
        let options = || Some(ToolsListOptions { limit: Some(10) });

        let first = inventory_build_key(options(), Some(&runtime));
        let second = inventory_build_key(options(), Some(&runtime));
        assert_eq!(first, second);
    }

    #[test]
    fn inventory_build_key_distinguishes_none_from_zero_limit() {
        let no_limit = inventory_build_key(None, None);
        let zero_limit = inventory_build_key(Some(ToolsListOptions { limit: Some(0) }), None);

        assert_ne!(no_limit, zero_limit);
        assert_eq!(no_limit, "limit:none|runtime:");
        assert_eq!(zero_limit, "limit:0|runtime:");
    }

    #[test]
    fn transient_error_detection_matches_network_and_5xx() {
        let connect_err = |message: &str| KontextDevError::ConnectSession {
            message: message.to_owned(),
        };

        assert!(is_transient_error(&connect_err("connection reset by peer")));
        assert!(is_transient_error(&connect_err("500 internal server error")));
        assert!(!is_transient_error(&connect_err("400 bad request")));
    }
}