Skip to main content

kontext_dev_sdk/
mcp.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tokio::sync::Mutex;
6
7use crate::KontextAuthSession;
8use crate::KontextDevClient;
9use crate::KontextDevConfig;
10use crate::KontextDevError;
11
12pub const DEFAULT_SERVER: &str = "https://api.kontext.dev";
13const MCP_SESSION_HEADER: &str = "Mcp-Session-Id";
14const META_SEARCH_TOOLS: &str = "SEARCH_TOOLS";
15const META_EXECUTE_TOOL: &str = "EXECUTE_TOOL";
16const DEFAULT_MCP_PROTOCOL_VERSION: &str = "2025-06-18";
17const STREAMABLE_HTTP_ACCEPT: &str = "application/json, text/event-stream";
18const STREAM_CONTENT_TYPE: &str = "text/event-stream";
19
20pub fn normalize_kontext_server_url(server: &str) -> String {
21    let mut url = server.trim_end_matches('/').to_string();
22    if let Some(stripped) = url.strip_suffix("/api/v1") {
23        url = stripped.to_string();
24    }
25    if let Some(stripped) = url.strip_suffix("/mcp") {
26        url = stripped.to_string();
27    }
28    url.trim_end_matches('/').to_string()
29}
30
31#[derive(Clone, Debug)]
32pub struct KontextMcpConfig {
33    pub client_session_id: String,
34    pub client_id: String,
35    pub redirect_uri: String,
36    pub url: Option<String>,
37    pub server: Option<String>,
38    pub client_secret: Option<String>,
39    pub scope: Option<String>,
40    pub resource: Option<String>,
41    pub session_key: Option<String>,
42    pub integration_ui_url: Option<String>,
43    pub integration_return_to: Option<String>,
44    pub auth_timeout_seconds: Option<i64>,
45    pub open_connect_page_on_login: Option<bool>,
46    pub token_cache_path: Option<String>,
47}
48
49impl Default for KontextMcpConfig {
50    fn default() -> Self {
51        Self {
52            client_session_id: String::new(),
53            client_id: String::new(),
54            redirect_uri: "http://localhost:3333/callback".to_string(),
55            url: None,
56            server: Some(DEFAULT_SERVER.to_string()),
57            client_secret: None,
58            scope: None,
59            resource: None,
60            session_key: None,
61            integration_ui_url: None,
62            integration_return_to: None,
63            auth_timeout_seconds: None,
64            open_connect_page_on_login: None,
65            token_cache_path: None,
66        }
67    }
68}
69
70#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
71#[serde(rename_all = "snake_case")]
72pub enum RuntimeIntegrationCategory {
73    GatewayRemoteMcp,
74    InternalMcpCredentials,
75}
76
77#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
78#[serde(rename_all = "snake_case")]
79pub enum RuntimeIntegrationConnectType {
80    Oauth,
81    UserToken,
82    Credentials,
83    None,
84}
85
86#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
87#[serde(rename_all = "camelCase")]
88pub struct RuntimeIntegrationRecord {
89    pub id: String,
90    pub name: String,
91    pub url: String,
92    pub category: RuntimeIntegrationCategory,
93    pub connect_type: RuntimeIntegrationConnectType,
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub auth_mode: Option<String>,
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub credential_schema: Option<serde_json::Value>,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub requires_oauth: Option<bool>,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub connection: Option<RuntimeIntegrationConnection>,
102}
103
104#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
105#[serde(rename_all = "camelCase")]
106pub struct RuntimeIntegrationConnection {
107    pub connected: bool,
108    pub status: String,
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub expires_at: Option<String>,
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub display_name: Option<String>,
113}
114
115#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
116#[serde(rename_all = "camelCase")]
117pub struct KontextTool {
118    pub id: String,
119    pub name: String,
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub description: Option<String>,
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub input_schema: Option<serde_json::Value>,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub server: Option<KontextToolServer>,
126}
127
128#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
129#[serde(rename_all = "camelCase")]
130pub struct KontextToolServer {
131    pub id: String,
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub name: Option<String>,
134}
135
136#[derive(Clone, Debug, Default)]
137struct McpSessionState {
138    session_id: Option<String>,
139    access_token: Option<String>,
140}
141
142#[derive(Clone, Debug)]
143pub struct KontextMcp {
144    config: KontextMcpConfig,
145    client: KontextDevClient,
146    http: reqwest::Client,
147    session: Arc<Mutex<McpSessionState>>,
148}
149
150impl KontextMcp {
151    pub fn new(config: KontextMcpConfig) -> Self {
152        let server =
153            normalize_kontext_server_url(config.server.as_deref().unwrap_or(DEFAULT_SERVER));
154        let sdk_config = KontextDevConfig {
155            server,
156            client_id: config.client_id.clone(),
157            client_secret: config.client_secret.clone(),
158            scope: config.scope.clone().unwrap_or_default(),
159            server_name: "kontext-dev".to_string(),
160            resource: config
161                .resource
162                .clone()
163                .unwrap_or_else(|| "mcp-gateway".to_string()),
164            integration_ui_url: config.integration_ui_url.clone(),
165            integration_return_to: config.integration_return_to.clone(),
166            open_connect_page_on_login: config.open_connect_page_on_login.unwrap_or(true),
167            auth_timeout_seconds: config.auth_timeout_seconds.unwrap_or(300),
168            token_cache_path: config.token_cache_path.clone(),
169            redirect_uri: config.redirect_uri.clone(),
170        };
171
172        Self {
173            config,
174            client: KontextDevClient::new(sdk_config),
175            http: reqwest::Client::new(),
176            session: Arc::new(Mutex::new(McpSessionState::default())),
177        }
178    }
179
180    pub fn client(&self) -> &KontextDevClient {
181        &self.client
182    }
183
184    pub async fn authenticate_mcp(&self) -> Result<KontextAuthSession, KontextDevError> {
185        self.client.authenticate_mcp().await
186    }
187
188    pub fn mcp_url(&self) -> Result<String, KontextDevError> {
189        if let Some(url) = &self.config.url {
190            return Ok(url.clone());
191        }
192        self.client.mcp_url()
193    }
194
195    pub async fn list_integrations(
196        &self,
197    ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
198        let session = self.authenticate_mcp().await?;
199        let base = self.client.server_base_url()?;
200        let response = self
201            .http
202            .get(format!("{}/mcp/integrations", base.trim_end_matches('/')))
203            .bearer_auth(session.gateway_token.access_token)
204            .send()
205            .await
206            .map_err(|err| KontextDevError::ConnectSession {
207                message: err.to_string(),
208            })?;
209
210        if !response.status().is_success() {
211            let status = response.status();
212            let body = response.text().await.unwrap_or_default();
213            return Err(KontextDevError::ConnectSession {
214                message: format!("{status}: {body}"),
215            });
216        }
217
218        #[derive(Deserialize)]
219        struct IntegrationsResponse {
220            items: Vec<RuntimeIntegrationRecord>,
221        }
222
223        let payload = response
224            .json::<IntegrationsResponse>()
225            .await
226            .map_err(|err| KontextDevError::ConnectSession {
227                message: err.to_string(),
228            })?;
229
230        Ok(payload.items)
231    }
232
233    pub async fn list_tools(&self) -> Result<Vec<KontextTool>, KontextDevError> {
234        let session = self.authenticate_mcp().await?;
235        self.list_tools_with_access_token(&session.gateway_token.access_token)
236            .await
237    }
238
239    pub async fn list_tools_with_access_token(
240        &self,
241        access_token: &str,
242    ) -> Result<Vec<KontextTool>, KontextDevError> {
243        let result = self
244            .json_rpc_with_session(
245                access_token,
246                "tools/list",
247                json!({}),
248                Some("list-tools"),
249                true,
250            )
251            .await?;
252        parse_tools_list_result(&result)
253    }
254
255    pub async fn call_tool(
256        &self,
257        tool_id: &str,
258        args: Option<serde_json::Map<String, serde_json::Value>>,
259    ) -> Result<serde_json::Value, KontextDevError> {
260        let session = self.authenticate_mcp().await?;
261        self.call_tool_with_access_token(&session.gateway_token.access_token, tool_id, args)
262            .await
263    }
264
265    pub async fn call_tool_with_access_token(
266        &self,
267        access_token: &str,
268        tool_id: &str,
269        args: Option<serde_json::Map<String, serde_json::Value>>,
270    ) -> Result<serde_json::Value, KontextDevError> {
271        self.json_rpc_with_session(
272            access_token,
273            "tools/call",
274            json!({ "name": tool_id, "arguments": args.unwrap_or_default() }),
275            Some("call-tool"),
276            true,
277        )
278        .await
279    }
280
281    async fn json_rpc_with_session(
282        &self,
283        access_token: &str,
284        method: &str,
285        params: Value,
286        id: Option<&str>,
287        allow_session_retry: bool,
288    ) -> Result<Value, KontextDevError> {
289        let max_attempts = if allow_session_retry { 2 } else { 1 };
290        for attempt in 0..max_attempts {
291            let session_id = self.ensure_mcp_session(access_token).await?;
292
293            let response = self
294                .http
295                .post(self.mcp_url()?)
296                .bearer_auth(access_token)
297                .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
298                .header(MCP_SESSION_HEADER, &session_id)
299                .json(&json!({
300                    "jsonrpc": "2.0",
301                    "id": id.unwrap_or("1"),
302                    "method": method,
303                    "params": params,
304                }))
305                .send()
306                .await
307                .map_err(|err| KontextDevError::ConnectSession {
308                    message: err.to_string(),
309                })?;
310
311            if !response.status().is_success() {
312                let status = response.status();
313                let body = response.text().await.unwrap_or_default();
314                let retryable = attempt + 1 < max_attempts && is_invalid_session_error(&body);
315                if retryable {
316                    self.invalidate_session().await;
317                    continue;
318                }
319                return Err(KontextDevError::ConnectSession {
320                    message: format!("{status}: {body}"),
321                });
322            }
323
324            let payload = parse_json_or_streamable_response(response).await?;
325
326            if let Some(error) = payload.get("error") {
327                let message = extract_jsonrpc_error_message(error);
328                let retryable =
329                    attempt + 1 < max_attempts && is_invalid_session_error(message.as_str());
330                if retryable {
331                    self.invalidate_session().await;
332                    continue;
333                }
334                return Err(KontextDevError::ConnectSession { message });
335            }
336
337            return Ok(payload.get("result").cloned().unwrap_or(Value::Null));
338        }
339
340        Err(KontextDevError::ConnectSession {
341            message: "MCP request failed after session retry".to_string(),
342        })
343    }
344
345    async fn ensure_mcp_session(&self, access_token: &str) -> Result<String, KontextDevError> {
346        {
347            let guard = self.session.lock().await;
348            if guard.access_token.as_deref() == Some(access_token)
349                && let Some(session_id) = guard.session_id.clone()
350            {
351                return Ok(session_id);
352            }
353        }
354
355        let initialize_response = self
356            .http
357            .post(self.mcp_url()?)
358            .bearer_auth(access_token)
359            .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
360            .json(&json!({
361                "jsonrpc": "2.0",
362                "id": "initialize",
363                "method": "initialize",
364                "params": {
365                    "protocolVersion": DEFAULT_MCP_PROTOCOL_VERSION,
366                    "capabilities": {
367                        "tools": {}
368                    },
369                    "clientInfo": {
370                        "name": "kontext-dev-sdk-rs",
371                        "version": env!("CARGO_PKG_VERSION"),
372                        "sessionId": self.config.client_session_id
373                    }
374                }
375            }))
376            .send()
377            .await
378            .map_err(|err| KontextDevError::ConnectSession {
379                message: err.to_string(),
380            })?;
381
382        if !initialize_response.status().is_success() {
383            let status = initialize_response.status();
384            let body = initialize_response.text().await.unwrap_or_default();
385            return Err(KontextDevError::ConnectSession {
386                message: format!("{status}: {body}"),
387            });
388        }
389
390        let session_header = initialize_response
391            .headers()
392            .get(MCP_SESSION_HEADER)
393            .or_else(|| initialize_response.headers().get("mcp-session-id"))
394            .and_then(|value| value.to_str().ok())
395            .map(|value| value.trim().to_string());
396
397        let initialize_payload = parse_json_or_streamable_response(initialize_response).await?;
398
399        if let Some(error) = initialize_payload.get("error") {
400            return Err(KontextDevError::ConnectSession {
401                message: extract_jsonrpc_error_message(error),
402            });
403        }
404
405        let session_id = session_header
406            .or_else(|| {
407                initialize_payload
408                    .get("result")
409                    .and_then(|result| result.get("sessionId"))
410                    .and_then(|value| value.as_str())
411                    .map(|value| value.to_string())
412            })
413            .or_else(|| {
414                initialize_payload
415                    .get("result")
416                    .and_then(|result| result.get("session_id"))
417                    .and_then(|value| value.as_str())
418                    .map(|value| value.to_string())
419            })
420            .ok_or_else(|| KontextDevError::ConnectSession {
421                message: "MCP initialize did not return a session id".to_string(),
422            })?;
423
424        // Best-effort initialized notification. Many servers accept requests
425        // without it, but send it to follow the Streamable HTTP MCP handshake.
426        let _ = self
427            .http
428            .post(self.mcp_url()?)
429            .bearer_auth(access_token)
430            .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
431            .header(MCP_SESSION_HEADER, &session_id)
432            .json(&json!({
433                "jsonrpc": "2.0",
434                "method": "notifications/initialized",
435                "params": {}
436            }))
437            .send()
438            .await;
439
440        {
441            let mut guard = self.session.lock().await;
442            guard.session_id = Some(session_id.clone());
443            guard.access_token = Some(access_token.to_string());
444        }
445
446        Ok(session_id)
447    }
448
449    async fn invalidate_session(&self) {
450        let mut guard = self.session.lock().await;
451        guard.session_id = None;
452        guard.access_token = None;
453    }
454}
455
456fn extract_jsonrpc_error_message(error: &Value) -> String {
457    error
458        .get("message")
459        .and_then(|value| value.as_str())
460        .map(ToString::to_string)
461        .or_else(|| {
462            error
463                .get("error_description")
464                .and_then(|value| value.as_str())
465                .map(ToString::to_string)
466        })
467        .unwrap_or_else(|| error.to_string())
468}
469
470fn is_invalid_session_error(message: &str) -> bool {
471    let lower = message.to_ascii_lowercase();
472    lower.contains("no valid session id")
473        || lower.contains("no valid session-id")
474        || lower.contains("invalid session")
475}
476
477async fn parse_json_or_streamable_response(
478    response: reqwest::Response,
479) -> Result<Value, KontextDevError> {
480    let content_type = response
481        .headers()
482        .get(reqwest::header::CONTENT_TYPE)
483        .and_then(|value| value.to_str().ok())
484        .map(|value| value.to_ascii_lowercase())
485        .unwrap_or_default();
486    let body = response
487        .text()
488        .await
489        .map_err(|err| KontextDevError::ConnectSession {
490            message: err.to_string(),
491        })?;
492
493    parse_json_or_streamable_body(&body, &content_type)
494        .map_err(|message| KontextDevError::ConnectSession { message })
495}
496
497fn parse_json_or_streamable_body(body: &str, content_type: &str) -> Result<Value, String> {
498    let parse_json = || serde_json::from_str::<Value>(body).map_err(|err| err.to_string());
499    let parse_sse = || parse_sse_last_json_event(body);
500
501    if content_type.contains(STREAM_CONTENT_TYPE) {
502        return parse_sse().ok_or_else(|| {
503            "failed to parse streamable MCP response as SSE JSON events".to_string()
504        });
505    }
506
507    parse_json().or_else(|json_err| {
508        parse_sse().ok_or_else(|| format!("failed to decode response body: {json_err}"))
509    })
510}
511
512fn parse_sse_last_json_event(body: &str) -> Option<Value> {
513    let mut current_data = Vec::<String>::new();
514    let mut last_json = None;
515
516    let flush_data = |current_data: &mut Vec<String>, last_json: &mut Option<Value>| {
517        if current_data.is_empty() {
518            return;
519        }
520        let data = current_data.join("\n");
521        current_data.clear();
522        let trimmed = data.trim();
523        if trimmed.is_empty() || trimmed == "[DONE]" {
524            return;
525        }
526        if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
527            *last_json = Some(value);
528        }
529    };
530
531    for line in body.lines() {
532        let line = line.trim_end_matches('\r');
533        if line.is_empty() {
534            flush_data(&mut current_data, &mut last_json);
535            continue;
536        }
537        if let Some(data) = line.strip_prefix("data:") {
538            current_data.push(data.trim_start().to_string());
539            continue;
540        }
541        if let Ok(value) = serde_json::from_str::<Value>(line) {
542            last_json = Some(value);
543        }
544    }
545    flush_data(&mut current_data, &mut last_json);
546
547    last_json
548}
549
550pub(crate) fn has_meta_gateway_tools(tools: &[KontextTool]) -> bool {
551    let mut has_search = false;
552    let mut has_execute = false;
553    for tool in tools {
554        if tool.name == META_SEARCH_TOOLS {
555            has_search = true;
556        } else if tool.name == META_EXECUTE_TOOL {
557            has_execute = true;
558        }
559    }
560    has_search && has_execute
561}
562
563pub(crate) fn extract_json_resource_text(result: &Value) -> Option<String> {
564    let content = result.get("content")?.as_array()?;
565    for item in content {
566        if item.get("type").and_then(Value::as_str) != Some("resource") {
567            continue;
568        }
569        let Some(resource) = item.get("resource") else {
570            continue;
571        };
572        if resource.get("mimeType").and_then(Value::as_str) != Some("application/json") {
573            continue;
574        }
575        if let Some(text) = resource.get("text").and_then(Value::as_str) {
576            return Some(text.to_string());
577        }
578    }
579    None
580}
581
582pub(crate) fn extract_text_content(result: &Value) -> String {
583    let Some(content) = result.get("content").and_then(Value::as_array) else {
584        return result.to_string();
585    };
586
587    let mut text_items = Vec::new();
588    for item in content {
589        if item.get("type").and_then(Value::as_str) == Some("text")
590            && let Some(text) = item.get("text").and_then(Value::as_str)
591        {
592            text_items.push(text.to_string());
593        }
594    }
595    if !text_items.is_empty() {
596        return text_items.join("\n");
597    }
598
599    let mut resource_items = Vec::new();
600    for item in content {
601        if item.get("type").and_then(Value::as_str) != Some("resource") {
602            continue;
603        }
604        let Some(resource_text) = item
605            .get("resource")
606            .and_then(|resource| resource.get("text"))
607            .and_then(Value::as_str)
608        else {
609            continue;
610        };
611
612        let parsed = serde_json::from_str::<Value>(resource_text)
613            .ok()
614            .map(|value| extract_text_content(&value))
615            .unwrap_or_else(|| resource_text.to_string());
616        resource_items.push(parsed);
617    }
618
619    if !resource_items.is_empty() {
620        return resource_items.join("\n");
621    }
622
623    content
624        .iter()
625        .map(Value::to_string)
626        .collect::<Vec<_>>()
627        .join("\n")
628}
629
630#[derive(Clone, Debug)]
631pub(crate) struct GatewayToolsPayload {
632    pub tools: Vec<KontextTool>,
633    pub errors: Vec<GatewayToolError>,
634    pub elicitations: Vec<GatewayElicitation>,
635}
636
637#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
638#[serde(rename_all = "camelCase")]
639pub struct GatewayToolError {
640    pub server_id: String,
641    #[serde(default)]
642    pub server_name: Option<String>,
643    #[serde(default)]
644    pub reason: Option<String>,
645}
646
647#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
648#[serde(rename_all = "camelCase")]
649pub struct GatewayElicitation {
650    pub url: String,
651    #[serde(default)]
652    pub message: Option<String>,
653    #[serde(default)]
654    pub integration_id: Option<String>,
655    #[serde(default)]
656    pub integration_name: Option<String>,
657}
658
659#[derive(Clone, Debug, Deserialize)]
660#[serde(rename_all = "camelCase")]
661struct GatewayToolSummary {
662    id: String,
663    name: String,
664    #[serde(default)]
665    description: Option<String>,
666    #[serde(default)]
667    input_schema: Option<Value>,
668    #[serde(default)]
669    server: Option<GatewayToolServer>,
670}
671
672#[derive(Clone, Debug, Deserialize)]
673#[serde(rename_all = "camelCase")]
674struct GatewayToolServer {
675    #[serde(default)]
676    id: Option<String>,
677    #[serde(default)]
678    name: Option<String>,
679}
680
681#[derive(Debug, Deserialize)]
682#[serde(rename_all = "camelCase")]
683struct RawTool {
684    name: String,
685    #[serde(default)]
686    description: Option<String>,
687    #[serde(default)]
688    input_schema: Option<serde_json::Value>,
689}
690
691fn parse_tools_list_result(result: &Value) -> Result<Vec<KontextTool>, KontextDevError> {
692    let tools = result
693        .get("tools")
694        .and_then(|value| value.as_array())
695        .cloned()
696        .unwrap_or_default();
697
698    tools
699        .into_iter()
700        .map(|tool| {
701            let raw: RawTool =
702                serde_json::from_value(tool).map_err(|err| KontextDevError::ConnectSession {
703                    message: format!("invalid tool payload: {err}"),
704                })?;
705
706            Ok(KontextTool {
707                id: raw.name.clone(),
708                name: raw.name,
709                description: raw.description,
710                input_schema: raw.input_schema,
711                server: None,
712            })
713        })
714        .collect()
715}
716
717pub(crate) fn parse_gateway_tools_payload(
718    raw: &Value,
719) -> Result<GatewayToolsPayload, KontextDevError> {
720    let json_text =
721        extract_json_resource_text(raw).ok_or_else(|| KontextDevError::ConnectSession {
722            message: "SEARCH_TOOLS did not return JSON resource content".to_string(),
723        })?;
724
725    let parsed = serde_json::from_str::<Value>(&json_text).map_err(|err| {
726        KontextDevError::ConnectSession {
727            message: format!("SEARCH_TOOLS returned invalid JSON: {err}"),
728        }
729    })?;
730
731    if let Some(items) = parsed.as_array() {
732        let tools = items
733            .iter()
734            .cloned()
735            .map(serde_json::from_value::<GatewayToolSummary>)
736            .collect::<Result<Vec<_>, _>>()
737            .map_err(|err| KontextDevError::ConnectSession {
738                message: format!("SEARCH_TOOLS returned invalid tool entry: {err}"),
739            })?
740            .into_iter()
741            .map(to_kontext_gateway_tool)
742            .collect();
743        return Ok(GatewayToolsPayload {
744            tools,
745            errors: Vec::new(),
746            elicitations: Vec::new(),
747        });
748    }
749
750    let Some(obj) = parsed.as_object() else {
751        return Err(KontextDevError::ConnectSession {
752            message: "SEARCH_TOOLS response was not a JSON array or object".to_string(),
753        });
754    };
755
756    let tools = obj
757        .get("items")
758        .and_then(Value::as_array)
759        .cloned()
760        .unwrap_or_default()
761        .into_iter()
762        .map(serde_json::from_value::<GatewayToolSummary>)
763        .collect::<Result<Vec<_>, _>>()
764        .map_err(|err| KontextDevError::ConnectSession {
765            message: format!("SEARCH_TOOLS items contained invalid tool data: {err}"),
766        })?
767        .into_iter()
768        .map(to_kontext_gateway_tool)
769        .collect::<Vec<_>>();
770
771    let errors = obj
772        .get("errors")
773        .and_then(Value::as_array)
774        .cloned()
775        .unwrap_or_default()
776        .into_iter()
777        .filter_map(|value| serde_json::from_value::<GatewayToolError>(value).ok())
778        .collect::<Vec<_>>();
779
780    let elicitations = obj
781        .get("elicitations")
782        .and_then(Value::as_array)
783        .cloned()
784        .unwrap_or_default()
785        .into_iter()
786        .filter_map(|value| serde_json::from_value::<GatewayElicitation>(value).ok())
787        .collect::<Vec<_>>();
788
789    Ok(GatewayToolsPayload {
790        tools,
791        errors,
792        elicitations,
793    })
794}
795
796fn to_kontext_gateway_tool(summary: GatewayToolSummary) -> KontextTool {
797    let server = summary.server.and_then(|server| {
798        server.id.map(|id| KontextToolServer {
799            id,
800            name: server.name,
801        })
802    });
803
804    KontextTool {
805        id: summary.id,
806        name: summary.name,
807        description: summary.description,
808        input_schema: summary.input_schema,
809        server,
810    }
811}
812
813#[cfg(test)]
814mod tests {
815    use super::*;
816
817    #[test]
818    fn parse_json_or_streamable_body_parses_json_payload() {
819        let parsed = parse_json_or_streamable_body(
820            r#"{"jsonrpc":"2.0","result":{"ok":true}}"#,
821            "application/json",
822        )
823        .expect("json should parse");
824        assert_eq!(parsed["result"]["ok"], Value::Bool(true));
825    }
826
827    #[test]
828    fn parse_json_or_streamable_body_parses_sse_payload() {
829        let parsed = parse_json_or_streamable_body(
830            "event: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{\"sessionId\":\"abc\"}}\n\n",
831            "text/event-stream",
832        )
833        .expect("sse should parse");
834        assert_eq!(
835            parsed["result"]["sessionId"],
836            Value::String("abc".to_string())
837        );
838    }
839
840    #[test]
841    fn parse_json_or_streamable_body_falls_back_to_sse_when_content_type_is_json() {
842        let parsed = parse_json_or_streamable_body(
843            "data: {\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[]}}\n\n",
844            "application/json",
845        )
846        .expect("sse fallback should parse");
847        assert_eq!(parsed["result"]["tools"], Value::Array(Vec::new()));
848    }
849
850    #[test]
851    fn raw_tool_parses_input_schema_from_camel_case_key() {
852        let parsed: RawTool = serde_json::from_value(serde_json::json!({
853            "name": "SEARCH_TOOLS",
854            "description": "Search available tools",
855            "inputSchema": { "type": "object", "properties": { "limit": { "type": "number" } } }
856        }))
857        .expect("raw tool should deserialize");
858
859        assert_eq!(parsed.name, "SEARCH_TOOLS");
860        assert_eq!(
861            parsed
862                .input_schema
863                .as_ref()
864                .and_then(|value| value.get("type"))
865                .and_then(Value::as_str),
866            Some("object")
867        );
868    }
869
870    #[test]
871    fn extract_json_resource_text_skips_resource_items_without_resource_payload() {
872        let payload = serde_json::json!({
873            "content": [
874                { "type": "resource" },
875                {
876                    "type": "resource",
877                    "resource": {
878                        "mimeType": "application/json",
879                        "text": "{\"ok\":true}"
880                    }
881                }
882            ]
883        });
884
885        assert_eq!(
886            extract_json_resource_text(&payload),
887            Some("{\"ok\":true}".to_string())
888        );
889    }
890
891    #[test]
892    fn runtime_integration_record_parses_user_token_connect_type() {
893        let parsed: RuntimeIntegrationRecord = serde_json::from_value(serde_json::json!({
894            "id": "convex-int",
895            "name": "Convex",
896            "url": "https://convex.example.com/mcp",
897            "category": "gateway_remote_mcp",
898            "connectType": "user_token"
899        }))
900        .expect("record should deserialize");
901
902        assert_eq!(
903            parsed.connect_type,
904            RuntimeIntegrationConnectType::UserToken
905        );
906    }
907
908    #[test]
909    fn runtime_integration_record_rejects_unknown_connect_type() {
910        let err = serde_json::from_value::<RuntimeIntegrationRecord>(serde_json::json!({
911            "id": "convex-int",
912            "name": "Convex",
913            "url": "https://convex.example.com/mcp",
914            "category": "gateway_remote_mcp",
915            "connectType": "api_key"
916        }))
917        .expect_err("record should reject unknown connect type");
918
919        assert!(err.to_string().contains("unknown variant"));
920    }
921}