Skip to main content

codex/mcp/
runtime.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    env,
4    ffi::OsString,
5    io,
6    path::{Path, PathBuf},
7    time::Duration,
8};
9
10use serde::{Deserialize, Serialize};
11use thiserror::Error;
12use tokio::{
13    process::{Child, ChildStderr, ChildStdin, ChildStdout, Command},
14    time,
15};
16
17use super::{
18    McpConfigError, McpConfigManager, McpServerDefinition, McpServerEntry, McpToolConfig,
19    McpTransport, StdioServerConfig, StdioServerDefinition, StreamableHttpDefinition,
20};
21
22/// Resolved runtime configuration for an MCP server, ready for spawning or connecting.
23#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
24pub struct McpRuntimeServer {
25    pub name: String,
26    pub transport: McpRuntimeTransport,
27    #[serde(default, skip_serializing_if = "Option::is_none")]
28    pub description: Option<String>,
29    #[serde(default, skip_serializing_if = "Vec::is_empty")]
30    pub tags: Vec<String>,
31    #[serde(default, skip_serializing_if = "Option::is_none")]
32    pub tools: Option<McpToolConfig>,
33}
34
35/// Transport-specific runtime configuration.
36#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(tag = "transport", rename_all = "snake_case")]
38pub enum McpRuntimeTransport {
39    Stdio(StdioServerDefinition),
40    StreamableHttp(ResolvedStreamableHttpDefinition),
41}
42
43/// HTTP runtime config with bearer tokens resolved from the environment.
44#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
45pub struct ResolvedStreamableHttpDefinition {
46    pub url: String,
47    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
48    pub headers: BTreeMap<String, String>,
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub bearer_env_var: Option<String>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub bearer_token: Option<String>,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub connect_timeout_ms: Option<u64>,
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub request_timeout_ms: Option<u64>,
57}
58
59/// Launcher/connector wrapper around a resolved MCP runtime server.
60#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
61pub struct McpServerLauncher {
62    pub name: String,
63    pub transport: McpServerLauncherTransport,
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub description: Option<String>,
66    #[serde(default, skip_serializing_if = "Vec::is_empty")]
67    pub tags: Vec<String>,
68    #[serde(default, skip_serializing_if = "Option::is_none")]
69    pub tools: Option<McpToolConfig>,
70}
71
72/// Transport-specific launcher/connector.
73#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
74pub enum McpServerLauncherTransport {
75    Stdio(StdioLauncher),
76    StreamableHttp(StreamableHttpConnector),
77}
78
79/// Prepared stdio launcher with merged env and startup timeout.
80#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
81pub struct StdioLauncher {
82    pub command: PathBuf,
83    pub args: Vec<String>,
84    pub env: Vec<(OsString, OsString)>,
85    pub current_dir: Option<PathBuf>,
86    pub timeout: Duration,
87    pub mirror_stdio: bool,
88}
89
90/// Prepared HTTP connector with resolved headers and timeouts.
91#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
92pub struct StreamableHttpConnector {
93    pub url: String,
94    pub headers: BTreeMap<String, String>,
95    pub bearer_env_var: Option<String>,
96    pub bearer_token: Option<String>,
97    pub connect_timeout: Option<Duration>,
98    pub request_timeout: Option<Duration>,
99}
100
101impl From<McpServerEntry> for McpRuntimeServer {
102    fn from(entry: McpServerEntry) -> Self {
103        let McpServerEntry { name, definition } = entry;
104        McpRuntimeServer::from_definition(name, definition)
105    }
106}
107
108impl McpRuntimeServer {
109    /// Builds a runtime config from a stored server definition.
110    pub fn from_definition(name: impl Into<String>, definition: McpServerDefinition) -> Self {
111        let McpServerDefinition {
112            transport,
113            description,
114            tags,
115            tools,
116        } = definition;
117
118        Self {
119            name: name.into(),
120            transport: McpRuntimeTransport::from_transport(transport),
121            description,
122            tags,
123            tools,
124        }
125    }
126
127    /// Converts a runtime server into a launcher/connector, merging stdio defaults.
128    pub fn into_launcher(self, defaults: &StdioServerConfig) -> McpServerLauncher {
129        let McpRuntimeServer {
130            name,
131            transport,
132            description,
133            tags,
134            tools,
135        } = self;
136
137        let transport = match transport {
138            McpRuntimeTransport::Stdio(def) => {
139                McpServerLauncherTransport::Stdio(StdioLauncher::from_runtime(def, defaults))
140            }
141            McpRuntimeTransport::StreamableHttp(def) => {
142                McpServerLauncherTransport::StreamableHttp(def.into())
143            }
144        };
145
146        McpServerLauncher {
147            name,
148            transport,
149            description,
150            tags,
151            tools,
152        }
153    }
154
155    /// Convenience clone-preserving conversion to a launcher/connector.
156    pub fn to_launcher(&self, defaults: &StdioServerConfig) -> McpServerLauncher {
157        self.clone().into_launcher(defaults)
158    }
159}
160
161impl McpRuntimeTransport {
162    fn from_transport(transport: McpTransport) -> Self {
163        match transport {
164            McpTransport::Stdio(definition) => McpRuntimeTransport::Stdio(definition),
165            McpTransport::StreamableHttp(definition) => {
166                McpRuntimeTransport::StreamableHttp(resolve_streamable_http(definition))
167            }
168        }
169    }
170}
171
172fn resolve_streamable_http(
173    definition: StreamableHttpDefinition,
174) -> ResolvedStreamableHttpDefinition {
175    let StreamableHttpDefinition {
176        url,
177        headers,
178        bearer_env_var,
179        connect_timeout_ms,
180        request_timeout_ms,
181    } = definition;
182
183    let mut headers = headers;
184    let mut bearer_token = None;
185    if let Some(env_var) = bearer_env_var.as_deref() {
186        if let Ok(token) = env::var(env_var) {
187            if !token.is_empty() {
188                let has_auth_header = headers
189                    .keys()
190                    .any(|key| key.eq_ignore_ascii_case("authorization"));
191                if !has_auth_header {
192                    headers.insert("Authorization".into(), format!("Bearer {token}"));
193                }
194                bearer_token = Some(token);
195            }
196        }
197    }
198
199    ResolvedStreamableHttpDefinition {
200        url,
201        headers,
202        bearer_env_var,
203        bearer_token,
204        connect_timeout_ms,
205        request_timeout_ms,
206    }
207}
208
209impl StdioLauncher {
210    fn from_runtime(definition: StdioServerDefinition, defaults: &StdioServerConfig) -> Self {
211        let env = merge_stdio_env(
212            defaults.code_home.as_deref(),
213            &defaults.env,
214            &definition.env,
215        );
216
217        Self {
218            command: PathBuf::from(definition.command),
219            args: definition.args,
220            env,
221            current_dir: defaults.current_dir.clone(),
222            timeout: definition
223                .timeout_ms
224                .map(Duration::from_millis)
225                .unwrap_or(defaults.startup_timeout),
226            mirror_stdio: defaults.mirror_stdio,
227        }
228    }
229
230    /// Builds a `tokio::process::Command` with merged env/dirs applied.
231    pub fn command(&self) -> Command {
232        let mut command = Command::new(&self.command);
233        command
234            .args(&self.args)
235            .stdin(std::process::Stdio::piped())
236            .stdout(std::process::Stdio::piped())
237            .stderr(std::process::Stdio::piped())
238            .kill_on_drop(true);
239
240        if let Some(dir) = &self.current_dir {
241            command.current_dir(dir);
242        }
243
244        for (key, value) in &self.env {
245            command.env(key, value);
246        }
247
248        command
249    }
250}
251
252impl From<ResolvedStreamableHttpDefinition> for StreamableHttpConnector {
253    fn from(definition: ResolvedStreamableHttpDefinition) -> Self {
254        let ResolvedStreamableHttpDefinition {
255            url,
256            headers,
257            bearer_env_var,
258            bearer_token,
259            connect_timeout_ms,
260            request_timeout_ms,
261        } = definition;
262
263        Self {
264            url,
265            headers,
266            bearer_env_var,
267            bearer_token,
268            connect_timeout: connect_timeout_ms.map(Duration::from_millis),
269            request_timeout: request_timeout_ms.map(Duration::from_millis),
270        }
271    }
272}
273
/// Merges the environment layers for a stdio server into one list.
///
/// Layers are applied in increasing precedence, with later layers replacing
/// earlier entries that share a key:
///
/// 1. `CODEX_HOME`, set to `code_home` when one is given;
/// 2. every pair in `base_env`, in slice order;
/// 3. every pair in `runtime_env`.
///
/// The returned entries have unique keys and are sorted by key. Sorting makes
/// the result deterministic: collecting straight out of the `HashMap` would
/// yield an arbitrary order, and this list ends up in values that are compared
/// for equality and serialized.
pub(crate) fn merge_stdio_env(
    code_home: Option<&Path>,
    base_env: &[(OsString, OsString)],
    runtime_env: &BTreeMap<String, String>,
) -> Vec<(OsString, OsString)> {
    let mut merged: HashMap<OsString, OsString> =
        HashMap::with_capacity(base_env.len() + runtime_env.len() + 1);

    if let Some(code_home) = code_home {
        merged.insert(
            OsString::from("CODEX_HOME"),
            code_home.as_os_str().to_os_string(),
        );
    }

    merged.extend(base_env.iter().cloned());
    merged.extend(
        runtime_env
            .iter()
            .map(|(key, value)| (OsString::from(key), OsString::from(value))),
    );

    // Keys are unique at this point, so an unstable sort is sufficient.
    let mut entries: Vec<(OsString, OsString)> = merged.into_iter().collect();
    entries.sort_unstable();
    entries
}
298
299/// Summarized runtime metadata for listing available MCP servers.
300#[derive(Clone, Debug, PartialEq, Eq)]
301pub struct McpRuntimeSummary {
302    pub name: String,
303    pub description: Option<String>,
304    pub tags: Vec<String>,
305    pub tools: Option<McpToolConfig>,
306    pub transport: McpRuntimeSummaryTransport,
307}
308
/// The kind of transport a runtime uses, with no transport settings attached.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum McpRuntimeSummaryTransport {
    /// The runtime is a stdio server.
    Stdio,
    /// The runtime is a streamable HTTP server.
    StreamableHttp,
}
315
316impl From<&McpServerLauncher> for McpRuntimeSummary {
317    fn from(launcher: &McpServerLauncher) -> Self {
318        let transport = match launcher.transport {
319            McpServerLauncherTransport::Stdio(_) => McpRuntimeSummaryTransport::Stdio,
320            McpServerLauncherTransport::StreamableHttp(_) => {
321                McpRuntimeSummaryTransport::StreamableHttp
322            }
323        };
324
325        Self {
326            name: launcher.name.clone(),
327            description: launcher.description.clone(),
328            tags: launcher.tags.clone(),
329            tools: launcher.tools.clone(),
330            transport,
331        }
332    }
333}
334
335/// Errors surfaced while starting or stopping MCP runtimes.
336#[derive(Debug, Error)]
337pub enum McpRuntimeError {
338    #[error("runtime `{0}` not found")]
339    NotFound(String),
340    #[error("runtime `{name}` uses `{actual}` transport (expected {expected})")]
341    UnsupportedTransport {
342        name: String,
343        expected: &'static str,
344        actual: &'static str,
345    },
346    #[error("failed to spawn `{command:?}`: {source}")]
347    Spawn {
348        command: PathBuf,
349        #[source]
350        source: io::Error,
351    },
352    #[error("stdio pipes unavailable for `{name}`")]
353    MissingPipes { name: String },
354    #[error("failed to stop `{name}`: {source}")]
355    Stop {
356        name: String,
357        #[source]
358        source: io::Error,
359    },
360    #[error("timed out stopping `{name}` after {timeout:?}")]
361    StopTimeout { name: String, timeout: Duration },
362}
363
364/// Lightweight runtime manager that owns resolved launchers/connectors.
365///
366/// The manager is non-destructive: launchers remain available after `prepare`
367/// is called so callers can reuse connectors or restart stdio servers as
368/// needed.
369#[derive(Clone, Debug)]
370pub struct McpRuntimeManager {
371    launchers: BTreeMap<String, McpServerLauncher>,
372}
373
374impl McpRuntimeManager {
375    /// Construct a runtime manager from resolved launchers.
376    pub fn new(launchers: Vec<McpServerLauncher>) -> Self {
377        let mut map = BTreeMap::new();
378        for launcher in launchers {
379            map.insert(launcher.name.clone(), launcher);
380        }
381        Self { launchers: map }
382    }
383
384    /// Returns the available runtimes with tool hints intact.
385    pub fn available(&self) -> Vec<McpRuntimeSummary> {
386        self.launchers
387            .values()
388            .map(McpRuntimeSummary::from)
389            .collect()
390    }
391
392    /// Returns a cloned launcher/connector by name without mutating storage.
393    pub fn launcher(&self, name: &str) -> Option<McpServerLauncher> {
394        self.launchers.get(name).cloned()
395    }
396
397    /// Start a stdio runtime or hand back HTTP connector metadata.
398    pub fn prepare(&self, name: &str) -> Result<McpRuntimeHandle, McpRuntimeError> {
399        let Some(launcher) = self.launcher(name) else {
400            return Err(McpRuntimeError::NotFound(name.to_string()));
401        };
402
403        let tools = launcher.tools.clone();
404        match launcher.transport {
405            McpServerLauncherTransport::Stdio(launch) => {
406                let mut command = launch.command();
407                let spawn_target = launch.command.clone();
408                let mut child = command.spawn().map_err(|source| McpRuntimeError::Spawn {
409                    command: spawn_target,
410                    source,
411                })?;
412
413                let stdout = child.stdout.take();
414                let stdin = child.stdin.take();
415                if let (Some(stdout), Some(stdin)) = (stdout, stdin) {
416                    let stderr = child.stderr.take();
417                    Ok(McpRuntimeHandle::Stdio(ManagedStdioRuntime {
418                        name: launcher.name,
419                        tools,
420                        child,
421                        stdin,
422                        stdout,
423                        stderr,
424                        timeout: launch.timeout,
425                    }))
426                } else {
427                    let _ = child.start_kill();
428                    Err(McpRuntimeError::MissingPipes {
429                        name: launcher.name,
430                    })
431                }
432            }
433            McpServerLauncherTransport::StreamableHttp(connector) => {
434                Ok(McpRuntimeHandle::StreamableHttp(ManagedHttpRuntime {
435                    name: launcher.name,
436                    connector,
437                    tools,
438                }))
439            }
440        }
441    }
442}
443
444/// Read-only helpers around [`McpRuntimeManager`] backed by stored config.
445#[derive(Clone, Debug)]
446pub struct McpRuntimeApi {
447    manager: McpRuntimeManager,
448}
449
450impl McpRuntimeApi {
451    /// Build a runtime API from already prepared launchers/connectors.
452    pub fn new(launchers: Vec<McpServerLauncher>) -> Self {
453        Self {
454            manager: McpRuntimeManager::new(launchers),
455        }
456    }
457
458    /// Load runtime launchers from disk and merge Workstream A stdio defaults.
459    ///
460    /// This is non-destructive: stored definitions are read, resolved, and left untouched.
461    pub fn from_config(
462        config: &McpConfigManager,
463        defaults: &StdioServerConfig,
464    ) -> Result<Self, McpConfigError> {
465        let launchers = config.runtime_launchers(defaults)?;
466        Ok(Self::new(launchers))
467    }
468
469    /// List available runtimes along with tool hints.
470    pub fn available(&self) -> Vec<McpRuntimeSummary> {
471        self.manager.available()
472    }
473
474    /// Returns a launch-ready config for the given runtime.
475    pub fn launcher(&self, name: &str) -> Result<McpServerLauncher, McpRuntimeError> {
476        self.manager
477            .launcher(name)
478            .ok_or_else(|| McpRuntimeError::NotFound(name.to_string()))
479    }
480
481    /// Returns the stdio launcher for a runtime, erroring if it uses HTTP.
482    pub fn stdio_launcher(&self, name: &str) -> Result<StdioLauncher, McpRuntimeError> {
483        let launcher = self.launcher(name)?;
484        match launcher.transport {
485            McpServerLauncherTransport::Stdio(launch) => Ok(launch),
486            McpServerLauncherTransport::StreamableHttp(_) => {
487                Err(McpRuntimeError::UnsupportedTransport {
488                    name: launcher.name,
489                    expected: "stdio",
490                    actual: "streamable_http",
491                })
492            }
493        }
494    }
495
496    /// Returns the HTTP connector for a runtime, erroring if it uses stdio.
497    pub fn http_connector(&self, name: &str) -> Result<StreamableHttpConnector, McpRuntimeError> {
498        let launcher = self.launcher(name)?;
499        match launcher.transport {
500            McpServerLauncherTransport::StreamableHttp(connector) => Ok(connector),
501            McpServerLauncherTransport::Stdio(_) => Err(McpRuntimeError::UnsupportedTransport {
502                name: launcher.name,
503                expected: "streamable_http",
504                actual: "stdio",
505            }),
506        }
507    }
508
509    /// Prepare a runtime handle for connection or spawn.
510    pub fn prepare(&self, name: &str) -> Result<McpRuntimeHandle, McpRuntimeError> {
511        self.manager.prepare(name)
512    }
513}
514
515/// Handle returned by [`McpRuntimeManager::prepare`] for either transport.
516#[derive(Debug)]
517pub enum McpRuntimeHandle {
518    Stdio(ManagedStdioRuntime),
519    StreamableHttp(ManagedHttpRuntime),
520}
521
522impl McpRuntimeHandle {
523    /// Returns tool hints when present.
524    pub fn tools(&self) -> Option<&McpToolConfig> {
525        match self {
526            McpRuntimeHandle::Stdio(handle) => handle.tools.as_ref(),
527            McpRuntimeHandle::StreamableHttp(handle) => handle.tools.as_ref(),
528        }
529    }
530}
531
532/// Running stdio MCP server along with its pipes.
533#[derive(Debug)]
534pub struct ManagedStdioRuntime {
535    name: String,
536    tools: Option<McpToolConfig>,
537    child: Child,
538    stdin: ChildStdin,
539    stdout: ChildStdout,
540    stderr: Option<ChildStderr>,
541    timeout: Duration,
542}
543
544impl ManagedStdioRuntime {
545    /// Name of the runtime.
546    pub fn name(&self) -> &str {
547        &self.name
548    }
549
550    /// Tool allow/deny hints if provided.
551    pub fn tools(&self) -> Option<&McpToolConfig> {
552        self.tools.as_ref()
553    }
554
555    /// Writable pipe to the server.
556    pub fn stdin_mut(&mut self) -> &mut ChildStdin {
557        &mut self.stdin
558    }
559
560    /// Readable pipe from the server.
561    pub fn stdout_mut(&mut self) -> &mut ChildStdout {
562        &mut self.stdout
563    }
564
565    /// Optional stderr pipe from the server.
566    pub fn stderr_mut(&mut self) -> Option<&mut ChildStderr> {
567        self.stderr.as_mut()
568    }
569
570    /// Terminate the process and wait for exit (best-effort).
571    pub async fn stop(&mut self) -> Result<(), McpRuntimeError> {
572        if let Ok(Some(_)) = self.child.try_wait() {
573            return Ok(());
574        }
575
576        let _ = self.child.start_kill();
577        match time::timeout(self.timeout, self.child.wait()).await {
578            Ok(Ok(_)) => Ok(()),
579            Ok(Err(source)) => Err(McpRuntimeError::Stop {
580                name: self.name.clone(),
581                source,
582            }),
583            Err(_) => Err(McpRuntimeError::StopTimeout {
584                name: self.name.clone(),
585                timeout: self.timeout,
586            }),
587        }
588    }
589}
590
591impl Drop for ManagedStdioRuntime {
592    fn drop(&mut self) {
593        let _ = self.child.start_kill();
594    }
595}
596
597/// HTTP runtime connector with tool hints preserved.
598#[derive(Clone, Debug)]
599pub struct ManagedHttpRuntime {
600    pub name: String,
601    pub connector: StreamableHttpConnector,
602    pub tools: Option<McpToolConfig>,
603}