// kontext_dev_sdk/orchestrator.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::future::Future;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde_json::{Map, Value};
7use tokio::sync::{Mutex, RwLock};
8
9use crate::KontextDevError;
10use crate::client::{
11    ClientState, ConnectSessionResult, IntegrationInfo, KontextClient, KontextClientConfig,
12    ToolResult, create_kontext_client,
13};
14use crate::mcp::{
15    KontextMcp, KontextMcpConfig, KontextTool, RuntimeIntegrationCategory,
16    RuntimeIntegrationRecord, extract_text_content,
17};
18use crate::prompt_guidance::{KontextPromptGuidance, build_kontext_prompt_guidance};
19
/// Pause between transient retry attempts, in milliseconds.
/// NOTE(review): presumably consumed by `with_transient_retry`, which is outside this view.
const RETRY_DELAY_MS: u64 = 250;

/// Safety margin (seconds) subtracted from a cached internal token's lifetime; a token whose
/// expiry falls within this window is treated as stale and re-exchanged.
const INTERNAL_TOKEN_SKEW_SECS: u64 = 30;
22
23#[derive(Clone, Debug)]
24pub struct ToolRouteRecord {
25    pub tool_id: String,
26    pub backend_id: String,
27    pub source: BackendSource,
28    pub backend_tool_id: String,
29    pub integration_id: Option<String>,
30}
31
/// Backend family a routed tool belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendSource {
    /// Served through the Kontext gateway client.
    Gateway,
    /// Served by an internal MCP integration using an exchanged resource token.
    Internal,
}
37
38#[derive(Clone, Debug)]
39pub struct RouteConflictRecord {
40    pub tool_id: String,
41    pub kept: ToolRouteRecord,
42    pub dropped: ToolRouteRecord,
43}
44
45#[derive(Clone, Debug)]
46struct RouteInventoryCandidate {
47    tool: KontextTool,
48    route: ToolRouteRecord,
49}
50
51#[derive(Clone, Debug)]
52struct RouteInventorySnapshot {
53    version: u64,
54    tools: Vec<KontextTool>,
55    routes: HashMap<String, ToolRouteRecord>,
56    #[allow(dead_code)]
57    conflicts: Vec<RouteConflictRecord>,
58}
59
60impl RouteInventorySnapshot {
61    fn empty() -> Self {
62        Self {
63            version: 0,
64            tools: Vec::new(),
65            routes: HashMap::new(),
66            conflicts: Vec::new(),
67        }
68    }
69}
70
/// A resource token exchanged for one internal integration.
#[derive(Debug, Clone)]
struct InternalResourceToken {
    /// Bearer token sent to the internal MCP server.
    access_token: String,
    /// When the token stops being valid; `None` when no positive lifetime was reported.
    expires_at: Option<Instant>,
}
76
77impl InternalResourceToken {
78    fn is_fresh(&self) -> bool {
79        match self.expires_at {
80            Some(expires_at) => {
81                let skew = Duration::from_secs(INTERNAL_TOKEN_SKEW_SECS);
82                Instant::now()
83                    .checked_add(skew)
84                    .is_some_and(|v| v < expires_at)
85            }
86            None => false,
87        }
88    }
89}
90
91pub type KontextOrchestratorState = ClientState;
92pub type KontextOrchestratorConfig = KontextClientConfig;
93
94#[derive(Clone)]
95pub struct KontextOrchestrator {
96    state: Arc<RwLock<KontextOrchestratorState>>,
97    config: KontextOrchestratorConfig,
98    gateway_client: KontextClient,
99    route_inventory: Arc<RwLock<RouteInventorySnapshot>>,
100    build_lock: Arc<Mutex<()>>,
101    last_build_key: Arc<RwLock<Option<String>>>,
102    version_counter: Arc<Mutex<u64>>,
103    reset_generation: Arc<Mutex<u64>>,
104    build_run_counter: Arc<Mutex<u64>>,
105    latest_committed_build_run: Arc<Mutex<u64>>,
106    runtime_integrations: Arc<RwLock<HashMap<String, RuntimeIntegrationRecord>>>,
107    internal_mcps: Arc<RwLock<HashMap<String, KontextMcp>>>,
108    internal_token_cache: Arc<Mutex<HashMap<String, InternalResourceToken>>>,
109    internal_token_locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
110    managed_internal_ops: Arc<Mutex<HashMap<String, u32>>>,
111    resolved_internal_listings: Arc<RwLock<HashSet<String>>>,
112    integration_refresh_lock: Arc<Mutex<()>>,
113}
114
115impl KontextOrchestrator {
116    pub fn new(config: KontextOrchestratorConfig) -> Self {
117        let gateway_client = create_kontext_client(config.clone());
118
119        Self {
120            state: Arc::new(RwLock::new(ClientState::Idle)),
121            config,
122            gateway_client,
123            route_inventory: Arc::new(RwLock::new(RouteInventorySnapshot::empty())),
124            build_lock: Arc::new(Mutex::new(())),
125            last_build_key: Arc::new(RwLock::new(None)),
126            version_counter: Arc::new(Mutex::new(0)),
127            reset_generation: Arc::new(Mutex::new(0)),
128            build_run_counter: Arc::new(Mutex::new(0)),
129            latest_committed_build_run: Arc::new(Mutex::new(0)),
130            runtime_integrations: Arc::new(RwLock::new(HashMap::new())),
131            internal_mcps: Arc::new(RwLock::new(HashMap::new())),
132            internal_token_cache: Arc::new(Mutex::new(HashMap::new())),
133            internal_token_locks: Arc::new(Mutex::new(HashMap::new())),
134            managed_internal_ops: Arc::new(Mutex::new(HashMap::new())),
135            resolved_internal_listings: Arc::new(RwLock::new(HashSet::new())),
136            integration_refresh_lock: Arc::new(Mutex::new(())),
137        }
138    }
139
140    pub async fn state(&self) -> KontextOrchestratorState {
141        *self.state.read().await
142    }
143
144    pub fn mcp(&self) -> &KontextMcp {
145        self.gateway_client.mcp()
146    }
147
148    pub async fn connect(&self) -> Result<(), KontextDevError> {
149        self.ensure_connected().await
150    }
151
152    pub async fn disconnect(&self) {
153        self.dispose_internal_clients().await;
154        self.runtime_integrations.write().await.clear();
155        self.reset_inventory().await;
156        self.gateway_client.disconnect().await;
157        self.internal_token_cache.lock().await.clear();
158        *self.state.write().await = ClientState::Idle;
159    }
160
161    pub async fn get_connect_page_url(&self) -> Result<ConnectSessionResult, KontextDevError> {
162        self.ensure_connected().await?;
163        self.gateway_client.get_connect_page_url().await
164    }
165
166    pub async fn sign_in(&self) -> Result<(), KontextDevError> {
167        // Mirrors TS semantics: sign-in in mixed mode is equivalent to connect.
168        self.ensure_connected().await
169    }
170
171    pub async fn sign_out(&self) -> Result<(), KontextDevError> {
172        self.dispose_internal_clients().await;
173        self.runtime_integrations.write().await.clear();
174        self.reset_inventory().await;
175        self.internal_token_cache.lock().await.clear();
176        self.gateway_client.sign_out().await?;
177        *self.state.write().await = ClientState::Idle;
178        Ok(())
179    }
180
181    pub async fn integrations_list(&self) -> Result<Vec<IntegrationInfo>, KontextDevError> {
182        self.ensure_connected().await?;
183
184        let discovered = self.refresh_integration_inventory().await?;
185
186        let gateway_statuses = self
187            .gateway_client
188            .integrations_list()
189            .await
190            .unwrap_or_default();
191        let gateway_by_id = gateway_statuses
192            .into_iter()
193            .map(|item| (item.id.clone(), item))
194            .collect::<HashMap<_, _>>();
195
196        let needs_internal_connect_link = discovered.iter().any(|integration| {
197            integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
198                && !integration
199                    .connection
200                    .as_ref()
201                    .map(|c| c.connected)
202                    .unwrap_or(false)
203        });
204
205        let mut shared_connect_url = None;
206        if needs_internal_connect_link
207            && let Ok(connect) = self.gateway_client.get_connect_page_url().await
208        {
209            shared_connect_url = Some(connect.connect_url);
210        }
211
212        Ok(discovered
213            .into_iter()
214            .map(|integration| {
215                let gateway_status = gateway_by_id.get(&integration.id);
216                let connected = gateway_status
217                    .map(|item| item.connected)
218                    .unwrap_or_else(|| {
219                        integration
220                            .connection
221                            .as_ref()
222                            .map(|c| c.connected)
223                            .unwrap_or(false)
224                    });
225                let connect_url = gateway_status
226                    .and_then(|item| item.connect_url.clone())
227                    .or_else(|| {
228                        (!connected
229                            && integration.category
230                                == RuntimeIntegrationCategory::InternalMcpCredentials)
231                            .then(|| shared_connect_url.clone())
232                            .flatten()
233                    });
234                let reason = gateway_status
235                    .and_then(|item| item.reason.clone())
236                    .or_else(|| {
237                        (!connected
238                            && integration.category
239                                == RuntimeIntegrationCategory::InternalMcpCredentials)
240                            .then_some("credentials_required".to_string())
241                    });
242
243                IntegrationInfo {
244                    id: integration.id,
245                    name: integration.name,
246                    connected,
247                    connect_url,
248                    reason,
249                }
250            })
251            .collect())
252    }
253
254    pub async fn tools_list(&self) -> Result<Vec<KontextTool>, KontextDevError> {
255        self.tools_list_with_options(None).await
256    }
257
258    pub async fn prompt_guidance(&self) -> Result<KontextPromptGuidance, KontextDevError> {
259        let tools = self.tools_list().await?;
260        let integrations = self.integrations_list().await?;
261        let tool_names = tools
262            .into_iter()
263            .map(|tool| tool.name)
264            .collect::<Vec<String>>();
265
266        Ok(build_kontext_prompt_guidance(
267            tool_names.as_slice(),
268            integrations.as_slice(),
269        ))
270    }
271
272    pub async fn tools_list_with_options(
273        &self,
274        options: Option<ToolsListOptions>,
275    ) -> Result<Vec<KontextTool>, KontextDevError> {
276        self.ensure_connected().await?;
277
278        let mut inventory = self.build_tool_inventory(options.clone(), None).await?;
279
280        let missing_internal = self.get_internal_integrations_missing_tools().await;
281        if !missing_internal.is_empty() {
282            let refreshed = self.refresh_integration_inventory().await?;
283            inventory = self.build_tool_inventory(options, Some(refreshed)).await?;
284        }
285
286        let unresolved_internal = self.get_internal_integrations_missing_tools().await;
287        if self.state().await == ClientState::NeedsAuth
288            && self.gateway_client.state().await == ClientState::Ready
289            && unresolved_internal.is_empty()
290        {
291            *self.state.write().await = ClientState::Ready;
292        }
293
294        Ok(inventory.tools)
295    }
296
297    pub async fn tools_execute(
298        &self,
299        tool_id: &str,
300        args: Option<Map<String, Value>>,
301    ) -> Result<ToolResult, KontextDevError> {
302        self.ensure_connected().await?;
303
304        let route = self.route_for_execution(tool_id).await?;
305
306        match route.source {
307            BackendSource::Gateway => {
308                self.gateway_client
309                    .tools_execute(route.backend_tool_id.as_str(), args)
310                    .await
311            }
312            BackendSource::Internal => {
313                let Some(integration_id) = route.integration_id else {
314                    return Err(KontextDevError::ConnectSession {
315                        message: format!(
316                            "route for tool `{tool_id}` is missing integration metadata"
317                        ),
318                    });
319                };
320
321                let integration =
322                    self.integration_by_id(&integration_id)
323                        .await?
324                        .ok_or_else(|| KontextDevError::ConnectSession {
325                            message: format!(
326                                "internal integration `{integration_id}` is no longer attached"
327                            ),
328                        })?;
329
330                let internal_mcp = self.internal_mcp_for(&integration).await.ok_or_else(|| {
331                    KontextDevError::ConnectSession {
332                        message: format!(
333                            "internal integration `{integration_id}` has no MCP client"
334                        ),
335                    }
336                })?;
337
338                let token = match self
339                    .ensure_internal_resource_token(&integration, false)
340                    .await
341                {
342                    Ok(token) => token,
343                    Err(err) if is_auth_recovery_required(&err) => {
344                        self.ensure_internal_resource_token(&integration, true)
345                            .await?
346                    }
347                    Err(err) => return Err(err),
348                };
349
350                let integration_id_for_exec = integration_id.clone();
351                let execute_once = || {
352                    let integration_id = integration_id_for_exec.clone();
353                    let internal_mcp = internal_mcp.clone();
354                    let token = token.clone();
355                    let backend_tool_id = route.backend_tool_id.clone();
356                    let args = args.clone();
357                    async move {
358                        self.run_managed_internal_op(integration_id.as_str(), || async {
359                            let raw = internal_mcp
360                                .call_tool_with_access_token(
361                                    token.as_str(),
362                                    backend_tool_id.as_str(),
363                                    args,
364                                )
365                                .await?;
366                            Ok(ToolResult {
367                                content: extract_text_content(&raw),
368                                raw,
369                            })
370                        })
371                        .await
372                    }
373                };
374
375                match with_transient_retry(execute_once, 1).await {
376                    Ok(result) => Ok(result),
377                    Err(err) => {
378                        if is_auth_recovery_required(&err) {
379                            *self.state.write().await = ClientState::NeedsAuth;
380                        }
381                        Err(err)
382                    }
383                }
384            }
385        }
386    }
387
388    async fn ensure_connected(&self) -> Result<(), KontextDevError> {
389        let state = self.state().await;
390        if state == ClientState::Ready {
391            return Ok(());
392        }
393
394        *self.state.write().await = ClientState::Connecting;
395
396        match self.gateway_client.connect().await {
397            Ok(()) => {
398                if let Err(err) = self.refresh_integration_inventory().await {
399                    if is_auth_recovery_required(&err) {
400                        *self.state.write().await = ClientState::NeedsAuth;
401                    } else {
402                        *self.state.write().await = ClientState::Failed;
403                    }
404                    return Err(err);
405                }
406                *self.state.write().await = ClientState::Ready;
407                Ok(())
408            }
409            Err(err) => {
410                if is_auth_recovery_required(&err) {
411                    *self.state.write().await = ClientState::NeedsAuth;
412                } else {
413                    *self.state.write().await = ClientState::Failed;
414                }
415                Err(err)
416            }
417        }
418    }
419
420    async fn refresh_integration_inventory(
421        &self,
422    ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
423        let _guard = self.integration_refresh_lock.lock().await;
424
425        let list_once = || async { self.gateway_client.mcp().list_integrations().await };
426        let items = with_transient_retry(list_once, 1).await?;
427        self.apply_runtime_integrations(&items).await;
428        Ok(items)
429    }
430
431    async fn apply_runtime_integrations(&self, integrations: &[RuntimeIntegrationRecord]) {
432        let mut runtime = self.runtime_integrations.write().await;
433        runtime.clear();
434
435        for integration in integrations {
436            runtime.insert(integration.id.clone(), integration.clone());
437        }
438        drop(runtime);
439
440        let desired = integrations
441            .iter()
442            .filter(|integration| {
443                integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
444            })
445            .map(|integration| (integration.id.clone(), integration.url.clone()))
446            .collect::<BTreeMap<_, _>>();
447
448        let mut internal_mcps = self.internal_mcps.write().await;
449
450        internal_mcps.retain(|integration_id, mcp| {
451            desired
452                .get(integration_id)
453                .is_some_and(|url| mcp.mcp_url().ok().as_deref() == Some(url.as_str()))
454        });
455
456        for integration in integrations {
457            if integration.category != RuntimeIntegrationCategory::InternalMcpCredentials {
458                continue;
459            }
460            if internal_mcps.contains_key(&integration.id) {
461                continue;
462            }
463
464            internal_mcps.insert(
465                integration.id.clone(),
466                KontextMcp::new(KontextMcpConfig {
467                    client_session_id: self.config.client_session_id.clone(),
468                    client_id: self.config.client_id.clone(),
469                    redirect_uri: self.config.redirect_uri.clone(),
470                    url: Some(integration.url.clone()),
471                    server: self.config.server_url.clone(),
472                    client_secret: self.config.client_secret.clone(),
473                    scope: self.config.scope.clone(),
474                    resource: Some(integration.url.clone()),
475                    session_key: Some(format!("internal:{}", integration.id)),
476                    integration_ui_url: self.config.integration_ui_url.clone(),
477                    integration_return_to: self.config.integration_return_to.clone(),
478                    auth_timeout_seconds: self.config.auth_timeout_seconds,
479                    open_connect_page_on_login: Some(false),
480                    token_cache_path: None,
481                }),
482            );
483        }
484    }
485
486    async fn internal_mcp_for(&self, integration: &RuntimeIntegrationRecord) -> Option<KontextMcp> {
487        self.internal_mcps
488            .read()
489            .await
490            .get(&integration.id)
491            .cloned()
492    }
493
494    async fn integration_by_id(
495        &self,
496        integration_id: &str,
497    ) -> Result<Option<RuntimeIntegrationRecord>, KontextDevError> {
498        if let Some(existing) = self
499            .runtime_integrations
500            .read()
501            .await
502            .get(integration_id)
503            .cloned()
504        {
505            return Ok(Some(existing));
506        }
507
508        let refreshed = self.refresh_integration_inventory().await?;
509        Ok(refreshed.into_iter().find(|item| item.id == integration_id))
510    }
511
512    async fn ensure_internal_resource_token(
513        &self,
514        integration: &RuntimeIntegrationRecord,
515        force_exchange: bool,
516    ) -> Result<String, KontextDevError> {
517        if !force_exchange
518            && let Some(existing) = self
519                .internal_token_cache
520                .lock()
521                .await
522                .get(&integration.id)
523                .cloned()
524            && existing.is_fresh()
525        {
526            return Ok(existing.access_token);
527        }
528
529        let integration_lock = {
530            let mut locks = self.internal_token_locks.lock().await;
531            locks
532                .entry(integration.id.clone())
533                .or_insert_with(|| Arc::new(Mutex::new(())))
534                .clone()
535        };
536
537        let _guard = integration_lock.lock().await;
538
539        if !force_exchange
540            && let Some(existing) = self
541                .internal_token_cache
542                .lock()
543                .await
544                .get(&integration.id)
545                .cloned()
546            && existing.is_fresh()
547        {
548            return Ok(existing.access_token);
549        }
550
551        let session = self.gateway_client.mcp().authenticate_mcp().await?;
552        let exchanged = self
553            .gateway_client
554            .mcp()
555            .client()
556            .exchange_for_resource(&session.gateway_token.access_token, &integration.url, None)
557            .await?;
558
559        let expires_at = exchanged.expires_in.and_then(|seconds| {
560            (seconds > 0)
561                .then(|| Instant::now().checked_add(Duration::from_secs(seconds as u64)))
562                .flatten()
563        });
564
565        let token = InternalResourceToken {
566            access_token: exchanged.access_token,
567            expires_at,
568        };
569
570        let value = token.access_token.clone();
571        self.internal_token_cache
572            .lock()
573            .await
574            .insert(integration.id.clone(), token);
575        Ok(value)
576    }
577
578    async fn build_inventory_candidates(
579        &self,
580        options: Option<ToolsListOptions>,
581        runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
582    ) -> Result<Vec<RouteInventoryCandidate>, KontextDevError> {
583        self.resolved_internal_listings.write().await.clear();
584
585        let mut candidates = Vec::new();
586
587        let list_gateway_tools_once = || {
588            let gateway_client = self.gateway_client.clone();
589            async move { gateway_client.tools_list().await }
590        };
591        let gateway_tools = with_transient_retry(list_gateway_tools_once, 1)
592            .await?
593            .into_iter()
594            .take(
595                options
596                    .as_ref()
597                    .and_then(|value| value.limit)
598                    .unwrap_or(u32::MAX) as usize,
599            )
600            .collect::<Vec<_>>();
601        for tool in gateway_tools {
602            let route = ToolRouteRecord {
603                tool_id: tool.id.clone(),
604                backend_id: gateway_backend_id(),
605                source: BackendSource::Gateway,
606                backend_tool_id: tool.id.clone(),
607                integration_id: None,
608            };
609            candidates.push(RouteInventoryCandidate { tool, route });
610        }
611
612        let integrations = if let Some(runtime_integrations) = runtime_integrations_override {
613            self.apply_runtime_integrations(&runtime_integrations).await;
614            runtime_integrations
615        } else {
616            self.refresh_integration_inventory().await?
617        };
618
619        let mut internal_integrations = integrations
620            .into_iter()
621            .filter(|integration| {
622                integration.category == RuntimeIntegrationCategory::InternalMcpCredentials
623            })
624            .collect::<Vec<_>>();
625        internal_integrations.sort_by(|a, b| a.id.cmp(&b.id));
626
627        let mut pending_auth_retries = Vec::new();
628
629        for integration in &internal_integrations {
630            match self
631                .add_internal_tools_for_integration(&mut candidates, integration, false)
632                .await
633            {
634                Ok(()) => {}
635                Err(err) => {
636                    if is_authorization_error(&err) {
637                        pending_auth_retries.push(integration.clone());
638                        continue;
639                    }
640                    if is_token_exchange_error(&err) {
641                        *self.state.write().await = ClientState::NeedsAuth;
642                    }
643                }
644            }
645        }
646
647        let mut unresolved_internal_auth = false;
648        for integration in pending_auth_retries {
649            match self
650                .add_internal_tools_for_integration(&mut candidates, &integration, true)
651                .await
652            {
653                Ok(()) => {}
654                Err(err) => {
655                    if is_auth_recovery_required(&err) {
656                        unresolved_internal_auth = true;
657                        continue;
658                    }
659                }
660            }
661        }
662
663        if unresolved_internal_auth {
664            *self.state.write().await = ClientState::NeedsAuth;
665        } else if self.state().await == ClientState::NeedsAuth
666            && self.gateway_client.state().await == ClientState::Ready
667        {
668            *self.state.write().await = ClientState::Ready;
669        }
670
671        Ok(candidates)
672    }
673
674    async fn add_internal_tools_for_integration(
675        &self,
676        candidates: &mut Vec<RouteInventoryCandidate>,
677        integration: &RuntimeIntegrationRecord,
678        force_exchange: bool,
679    ) -> Result<(), KontextDevError> {
680        let Some(internal_mcp) = self.internal_mcp_for(integration).await else {
681            return Ok(());
682        };
683
684        let token = self
685            .ensure_internal_resource_token(integration, force_exchange)
686            .await?;
687
688        let list_once = || {
689            let integration_id = integration.id.clone();
690            let internal_mcp = internal_mcp.clone();
691            let token = token.clone();
692            async move {
693                self.run_managed_internal_op(&integration_id, || async {
694                    internal_mcp
695                        .list_tools_with_access_token(token.as_str())
696                        .await
697                })
698                .await
699            }
700        };
701
702        let tools = with_transient_retry(list_once, 1).await?;
703
704        self.resolved_internal_listings
705            .write()
706            .await
707            .insert(integration.id.clone());
708
709        for tool in tools {
710            let backend_tool_id = if tool.id.is_empty() {
711                tool.name.clone()
712            } else {
713                tool.id.clone()
714            };
715            let unified_id = format!("{}:{}", integration.id, tool.name);
716
717            let mapped = KontextTool {
718                id: unified_id.clone(),
719                name: tool.name,
720                description: tool.description,
721                input_schema: tool.input_schema,
722                server: Some(crate::mcp::KontextToolServer {
723                    id: integration.id.clone(),
724                    name: Some(integration.name.clone()),
725                }),
726            };
727
728            let route = ToolRouteRecord {
729                tool_id: unified_id,
730                backend_id: internal_backend_id(&integration.id),
731                source: BackendSource::Internal,
732                backend_tool_id,
733                integration_id: Some(integration.id.clone()),
734            };
735
736            candidates.push(RouteInventoryCandidate {
737                tool: mapped,
738                route,
739            });
740        }
741
742        Ok(())
743    }
744
745    async fn run_managed_internal_op<T, F, Fut>(
746        &self,
747        integration_id: &str,
748        operation: F,
749    ) -> Result<T, KontextDevError>
750    where
751        F: FnOnce() -> Fut,
752        Fut: Future<Output = Result<T, KontextDevError>>,
753    {
754        {
755            let mut managed = self.managed_internal_ops.lock().await;
756            *managed.entry(integration_id.to_string()).or_insert(0) += 1;
757        }
758
759        let result = operation().await;
760
761        {
762            let mut managed = self.managed_internal_ops.lock().await;
763            if let Some(count) = managed.get_mut(integration_id) {
764                *count = count.saturating_sub(1);
765                if *count == 0 {
766                    managed.remove(integration_id);
767                }
768            }
769        }
770
771        result
772    }
773
774    async fn build_tool_inventory(
775        &self,
776        options: Option<ToolsListOptions>,
777        runtime_integrations_override: Option<Vec<RuntimeIntegrationRecord>>,
778    ) -> Result<RouteInventorySnapshot, KontextDevError> {
779        let key = inventory_build_key(options.clone(), runtime_integrations_override.as_ref());
780
781        let _guard = self.build_lock.lock().await;
782
783        {
784            let last_key = self.last_build_key.read().await;
785            let snapshot = self.route_inventory.read().await;
786            if snapshot.version > 0 && last_key.as_deref() == Some(key.as_str()) {
787                return Ok(snapshot.clone());
788            }
789        }
790
791        let build_generation = *self.reset_generation.lock().await;
792        let build_run = {
793            let mut counter = self.build_run_counter.lock().await;
794            *counter += 1;
795            *counter
796        };
797
798        let candidates = self
799            .build_inventory_candidates(options, runtime_integrations_override)
800            .await?;
801
802        if *self.reset_generation.lock().await != build_generation {
803            return Ok(self.route_inventory.read().await.clone());
804        }
805
806        let latest_committed = *self.latest_committed_build_run.lock().await;
807        let snapshot = if build_run >= latest_committed {
808            *self.latest_committed_build_run.lock().await = build_run;
809            let mut version = self.version_counter.lock().await;
810            *version += 1;
811            let built = build_route_inventory(*version, candidates);
812            *self.route_inventory.write().await = built.clone();
813            *self.last_build_key.write().await = Some(key);
814            built
815        } else {
816            let current_version = self.route_inventory.read().await.version;
817            build_route_inventory(current_version, candidates)
818        };
819
820        Ok(snapshot)
821    }
822
823    async fn route_for_execution(&self, tool_id: &str) -> Result<ToolRouteRecord, KontextDevError> {
824        if let Some(route) = self
825            .route_inventory
826            .read()
827            .await
828            .routes
829            .get(tool_id)
830            .cloned()
831        {
832            return Ok(route);
833        }
834
835        let _ = self.build_tool_inventory(None, None).await?;
836
837        if let Some(route) = self
838            .route_inventory
839            .read()
840            .await
841            .routes
842            .get(tool_id)
843            .cloned()
844        {
845            return Ok(route);
846        }
847
848        Err(KontextDevError::ConnectSession {
849            message: format!(
850                "unknown tool `{tool_id}`. Call tools.list() to refresh available tools"
851            ),
852        })
853    }
854
855    async fn get_internal_integrations_missing_tools(&self) -> Vec<String> {
856        let resolved = self.resolved_internal_listings.read().await.clone();
857        self.internal_mcps
858            .read()
859            .await
860            .keys()
861            .filter(|integration_id| !resolved.contains(*integration_id))
862            .cloned()
863            .collect()
864    }
865
    /// Drops every internal MCP client together with the per-integration state
    /// kept alongside it.
    ///
    /// Clears, in this order: `internal_mcps`, `internal_token_cache`,
    /// `internal_token_locks`, and `resolved_internal_listings`. Each lock is
    /// taken and released within its own statement, so no two are held at once.
    /// The route inventory is not touched here (see `reset_inventory`).
    async fn dispose_internal_clients(&self) {
        self.internal_mcps.write().await.clear();
        self.internal_token_cache.lock().await.clear();
        self.internal_token_locks.lock().await.clear();
        self.resolved_internal_listings.write().await.clear();
    }
872
    /// Discards the committed route inventory and invalidates in-flight builds.
    ///
    /// - Bumps `reset_generation`. A build that captured the previous
    ///   generation and has not yet reached its generation check then returns
    ///   the committed inventory instead of committing its own result.
    /// - Resets `version_counter` to 0, so the next committed snapshot gets
    ///   version 1 (the counter is incremented before use).
    /// - Raises `latest_committed_build_run` to the current `build_run_counter`,
    ///   so builds started before the reset cannot rank above that value.
    /// - Replaces the inventory with an empty snapshot and clears
    ///   `last_build_key` (presumably so no cached build key can be reused).
    ///
    /// Each lock is taken and released within its own statement.
    ///
    /// NOTE(review): `build_tool_inventory` checks the generation and later
    /// commits under separate lock acquisitions. The most recently started
    /// build (whose run number equals the counter captured here, which passes
    /// its `>=` check) can therefore still commit after this reset if it had
    /// already passed the generation check. Confirm whether that window matters.
    async fn reset_inventory(&self) {
        // Invalidate in-flight builds before clearing any state.
        *self.reset_generation.lock().await += 1;
        *self.version_counter.lock().await = 0;

        let build_run = *self.build_run_counter.lock().await;
        *self.latest_committed_build_run.lock().await = build_run;

        *self.route_inventory.write().await = RouteInventorySnapshot::empty();
        *self.last_build_key.write().await = None;
    }
883}
884
/// Options accepted when listing tools.
#[derive(Debug, Clone)]
pub struct ToolsListOptions {
    /// Optional listing limit (presumably a maximum tool count; confirm).
    /// `None` and `Some(0)` yield distinct inventory build keys.
    pub limit: Option<u32>,
}
889
/// Creates a [`KontextOrchestrator`] from `config`.
///
/// Convenience wrapper that simply forwards to `KontextOrchestrator::new`.
pub fn create_kontext_orchestrator(config: KontextOrchestratorConfig) -> KontextOrchestrator {
    KontextOrchestrator::new(config)
}
893
/// Backend id used for every tool routed through the gateway.
fn gateway_backend_id() -> String {
    String::from("gateway")
}
897
/// Backend id for tools served by the internal MCP client of
/// `integration_id`, formatted as `internal:<integration_id>`.
fn internal_backend_id(integration_id: &str) -> String {
    ["internal:", integration_id].concat()
}
901
902fn inventory_build_key(
903    options: Option<ToolsListOptions>,
904    runtime_integrations: Option<&Vec<RuntimeIntegrationRecord>>,
905) -> String {
906    let limit_key = match options.and_then(|item| item.limit) {
907        Some(limit) => limit.to_string(),
908        None => "none".to_string(),
909    };
910    let mut key = format!("limit:{limit_key}");
911    key.push_str("|runtime:");
912    if let Some(integrations) = runtime_integrations {
913        let mut ids = integrations
914            .iter()
915            .map(|integration| integration.id.clone())
916            .collect::<Vec<_>>();
917        ids.sort();
918        key.push_str(ids.join(",").as_str());
919    }
920    key
921}
922
923fn build_route_inventory(
924    version: u64,
925    candidates: Vec<RouteInventoryCandidate>,
926) -> RouteInventorySnapshot {
927    let mut tools = Vec::<KontextTool>::new();
928    let mut routes = HashMap::<String, ToolRouteRecord>::new();
929    let mut conflicts = Vec::<RouteConflictRecord>::new();
930
931    for candidate in candidates {
932        let tool_id = candidate.route.tool_id.clone();
933        if !routes.contains_key(&tool_id) {
934            routes.insert(tool_id.clone(), candidate.route);
935            tools.push(candidate.tool);
936            continue;
937        }
938
939        let existing = routes
940            .get(&tool_id)
941            .cloned()
942            .unwrap_or_else(|| candidate.route.clone());
943
944        // TS parity default policy is keep_existing.
945        conflicts.push(RouteConflictRecord {
946            tool_id,
947            kept: existing,
948            dropped: candidate.route,
949        });
950    }
951
952    RouteInventorySnapshot {
953        version,
954        tools,
955        routes,
956        conflicts,
957    }
958}
959
960async fn with_transient_retry<T, F, Fut>(
961    mut operation: F,
962    max_retries: usize,
963) -> Result<T, KontextDevError>
964where
965    F: FnMut() -> Fut,
966    Fut: Future<Output = Result<T, KontextDevError>>,
967{
968    for attempt in 0..=max_retries {
969        match operation().await {
970            Ok(value) => return Ok(value),
971            Err(err) => {
972                if attempt >= max_retries || !is_transient_error(&err) {
973                    return Err(err);
974                }
975                tokio::time::sleep(Duration::from_millis(RETRY_DELAY_MS)).await;
976            }
977        }
978    }
979
980    Err(KontextDevError::ConnectSession {
981        message: "transient retry loop exhausted unexpectedly".to_string(),
982    })
983}
984
985fn is_transient_error(err: &KontextDevError) -> bool {
986    match err {
987        KontextDevError::ConnectSession { message }
988        | KontextDevError::TokenExchange { message, .. }
989        | KontextDevError::TokenRequest { message, .. } => {
990            let lower = message.to_ascii_lowercase();
991            lower.contains("429")
992                || lower.contains("500")
993                || lower.contains("502")
994                || lower.contains("503")
995                || lower.contains("504")
996                || lower.contains("timed out")
997                || lower.contains("connection reset")
998                || lower.contains("broken pipe")
999                || lower.contains("temporarily unavailable")
1000                || lower.contains("econnreset")
1001                || lower.contains("network")
1002        }
1003        _ => false,
1004    }
1005}
1006
1007fn is_authorization_error(err: &KontextDevError) -> bool {
1008    match err {
1009        KontextDevError::OAuthCallbackTimeout { .. }
1010        | KontextDevError::OAuthCallbackCancelled
1011        | KontextDevError::MissingAuthorizationCode
1012        | KontextDevError::OAuthCallbackError { .. }
1013        | KontextDevError::InvalidOAuthState
1014        | KontextDevError::TokenRequest { .. }
1015        | KontextDevError::TokenExchange { .. } => true,
1016        KontextDevError::ConnectSession { message }
1017        | KontextDevError::IntegrationOAuthInit { message } => {
1018            let lower = message.to_ascii_lowercase();
1019            lower.contains("401")
1020                || lower.contains("unauthorized")
1021                || lower.contains("invalid token")
1022                || lower.contains("authorization")
1023                || lower.contains("access token expired")
1024        }
1025        _ => false,
1026    }
1027}
1028
1029fn is_token_exchange_error(err: &KontextDevError) -> bool {
1030    matches!(err, KontextDevError::TokenExchange { .. })
1031}
1032
1033fn is_auth_recovery_required(err: &KontextDevError) -> bool {
1034    is_authorization_error(err) || is_token_exchange_error(err)
1035}
1036
#[cfg(test)]
mod tests {
    use super::*;

    fn route(tool_id: &str, backend_id: &str) -> ToolRouteRecord {
        ToolRouteRecord {
            tool_id: tool_id.to_string(),
            backend_id: backend_id.to_string(),
            source: BackendSource::Gateway,
            backend_tool_id: tool_id.to_string(),
            integration_id: None,
        }
    }

    fn candidate(tool_id: &str, backend_id: &str) -> RouteInventoryCandidate {
        RouteInventoryCandidate {
            tool: KontextTool {
                id: tool_id.to_string(),
                name: tool_id.to_string(),
                description: None,
                input_schema: None,
                server: None,
            },
            route: route(tool_id, backend_id),
        }
    }

    /// Builds a credentials-backed runtime integration record with the given id.
    fn runtime_integration(id: &str) -> RuntimeIntegrationRecord {
        RuntimeIntegrationRecord {
            id: id.to_string(),
            name: id.to_ascii_uppercase(),
            url: format!("https://{id}"),
            category: RuntimeIntegrationCategory::InternalMcpCredentials,
            connect_type: crate::mcp::RuntimeIntegrationConnectType::Credentials,
            auth_mode: None,
            credential_schema: None,
            requires_oauth: None,
            connection: None,
        }
    }

    #[test]
    fn build_route_inventory_keeps_first_route_on_conflict() {
        let snapshot = build_route_inventory(
            1,
            vec![
                candidate("tool-a", "gateway"),
                candidate("tool-a", "internal:linear"),
                candidate("tool-b", "gateway"),
            ],
        );

        assert_eq!(snapshot.version, 1);
        assert_eq!(snapshot.tools.len(), 2);
        let tool_ids: Vec<&str> = snapshot.tools.iter().map(|tool| tool.id.as_str()).collect();
        assert_eq!(tool_ids, ["tool-a", "tool-b"]);
        assert_eq!(snapshot.routes.get("tool-a").unwrap().backend_id, "gateway");
        assert_eq!(snapshot.conflicts.len(), 1);
        assert_eq!(snapshot.conflicts[0].tool_id, "tool-a");
        assert_eq!(snapshot.conflicts[0].kept.backend_id, "gateway");
        assert_eq!(snapshot.conflicts[0].dropped.backend_id, "internal:linear");
    }

    #[test]
    fn inventory_build_key_is_stable_for_same_runtime_integrations() {
        let runtime = vec![runtime_integration("b"), runtime_integration("a")];

        let first = inventory_build_key(Some(ToolsListOptions { limit: Some(10) }), Some(&runtime));
        let second =
            inventory_build_key(Some(ToolsListOptions { limit: Some(10) }), Some(&runtime));
        assert_eq!(first, second);
    }

    #[test]
    fn inventory_build_key_ignores_runtime_integration_order() {
        let forward = vec![runtime_integration("a"), runtime_integration("b")];
        let reversed = vec![runtime_integration("b"), runtime_integration("a")];
        let different = vec![runtime_integration("a"), runtime_integration("c")];

        assert_eq!(
            inventory_build_key(None, Some(&forward)),
            inventory_build_key(None, Some(&reversed))
        );
        assert_ne!(
            inventory_build_key(None, Some(&forward)),
            inventory_build_key(None, Some(&different))
        );
    }

    #[test]
    fn inventory_build_key_distinguishes_none_from_zero_limit() {
        let no_limit = inventory_build_key(None, None);
        let zero_limit = inventory_build_key(Some(ToolsListOptions { limit: Some(0) }), None);

        assert_ne!(no_limit, zero_limit);
        assert_eq!(no_limit, "limit:none|runtime:");
        assert_eq!(zero_limit, "limit:0|runtime:");
    }

    #[test]
    fn transient_error_detection_matches_network_and_5xx() {
        let network = KontextDevError::ConnectSession {
            message: "connection reset by peer".to_string(),
        };
        let server_500 = KontextDevError::ConnectSession {
            message: "500 internal server error".to_string(),
        };
        let bad_request = KontextDevError::ConnectSession {
            message: "400 bad request".to_string(),
        };
        // Only connect/token variants are classified; others never retry.
        let oauth_init = KontextDevError::IntegrationOAuthInit {
            message: "503 service unavailable".to_string(),
        };

        assert!(is_transient_error(&network));
        assert!(is_transient_error(&server_500));
        assert!(!is_transient_error(&bad_request));
        assert!(!is_transient_error(&oauth_init));
    }

    #[test]
    fn authorization_error_detection_matches_auth_failures() {
        let unauthorized = KontextDevError::ConnectSession {
            message: "401 Unauthorized".to_string(),
        };
        let expired = KontextDevError::IntegrationOAuthInit {
            message: "Access token expired".to_string(),
        };
        let cancelled = KontextDevError::OAuthCallbackCancelled;
        let server_error = KontextDevError::ConnectSession {
            message: "503 service unavailable".to_string(),
        };

        assert!(is_authorization_error(&unauthorized));
        assert!(is_authorization_error(&expired));
        assert!(is_authorization_error(&cancelled));
        assert!(is_auth_recovery_required(&cancelled));
        assert!(!is_token_exchange_error(&unauthorized));
        assert!(!is_authorization_error(&server_error));
        assert!(!is_auth_recovery_required(&server_error));
    }

    #[test]
    fn backend_ids_use_expected_prefixes() {
        assert_eq!(gateway_backend_id(), "gateway");
        assert_eq!(internal_backend_id("linear"), "internal:linear");
    }
}