objectiveai-mcp-proxy 2.0.11

MCP (Model Context Protocol) proxy server for ObjectiveAI
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
//! Per-session state and per-session dispatch.
//!
//! A `Session` owns the upstream MCP connections that belong to one MCP
//! session and is responsible for fanning `tools/list` / `resources/list`
//! out to them and routing `tools/call` / `resources/read` to the right
//! upstream. The registry that tracks session ids and hands out
//! `Arc<Session>`s lives in [`crate::session_manager`].

use dashmap::DashMap;
use futures::future::try_join_all;
use indexmap::IndexMap;
use std::sync::Arc;
use objectiveai_sdk::mcp::{
    Connection, JsonRpcNotification,
    resource::{ListResourcesResult, ReadResourceResult, Resource},
    tool::{CallToolRequestParams, CallToolResult, ContentBlock, ListToolsResult, Tool},
};
use tokio::sync::{Mutex, broadcast};
use tokio_util::sync::CancellationToken;

/// Capacity of the per-session outbound notification channel. Sized so
/// even a noisy upstream can't easily lap a slow SSE consumer.
///
/// Passed to `broadcast::channel` when a `Session` is constructed; each
/// session gets its own channel of this capacity.
const OUTBOUND_CAPACITY: usize = 64;

/// Turn a JSON-RPC request id into a `String` usable as a map key.
///
/// JSON-RPC permits numeric, string, or null ids; rendering the id as
/// compact JSON text gives every variant a distinct, hashable form
/// (e.g. the number `7` becomes `7`, the string `"7"` becomes `"7"`).
fn request_id_key(id: &serde_json::Value) -> String {
    match serde_json::to_string(id) {
        Ok(key) => key,
        // Serializing a `Value` is not expected to fail; should it ever,
        // fall back to the empty string as the key.
        Err(_) => String::new(),
    }
}

/// Per-session state.
///
/// All routing, fan-out, and forwarding methods live here. The registry
/// that hands out `Arc<Session>`s by id is `SessionManager` (in
/// [`crate::session_manager`]).
///
/// Built through `Session::new`; the private fields (`in_flight`,
/// `pending_notifications`, `tool_allowlists_by_server`) cannot be set
/// from outside this module.
#[derive(Debug)]
pub struct Session {
    /// Live upstream MCP connections keyed by their
    /// `initialize_result.server_info.name`. The key is the same string the
    /// proxy uses as the `<server-name>_` prefix on every tool name and
    /// resource URI it ships, so routing inbound `tools/call` /
    /// `resources/read` is just a longest-prefix-match lookup against this
    /// map's keys — no side-channel cache to keep coherent.
    ///
    /// NOTE(review): prefixed names are not guaranteed unique. For example,
    /// server `a` exposing tool `b_c` and server `a_b` exposing tool `c`
    /// both surface as `a_b_c`; the longest-prefix routing sends that name
    /// to `a_b`.
    ///
    /// Insertion order matches the order URLs appeared in `X-MCP-Servers`,
    /// so listings are deterministic.
    ///
    /// `Connection` is itself a cheaply-clonable Arc wrapper; dropping a
    /// `Connection` fires the upstream listener's wakeup signal so it can
    /// self-cancel within scheduler latency once no external handle remains.
    pub connections: IndexMap<String, Connection>,
    /// Fan-out channel for server-initiated notifications. Whenever an
    /// upstream emits `notifications/tools/list_changed` or
    /// `notifications/resources/list_changed`, a JsonRpcNotification with
    /// the matching method is published here. Subscribers (the SSE GET
    /// stream in `mcp::handle_get`) drain it onto the wire to the
    /// downstream client.
    ///
    /// `broadcast` rather than `mpsc` so multiple concurrent GET streams
    /// for the same session — which the MCP spec allows — each see every
    /// notification. Created in `Session::new` with capacity
    /// `OUTBOUND_CAPACITY`.
    pub outbound: broadcast::Sender<JsonRpcNotification>,
    /// In-flight per-request cancellation tokens, keyed by the inbound
    /// JSON-RPC request id (stringified for hashability — JSON-RPC ids
    /// can be number, string, or null). The downstream client cancels a
    /// request by sending `notifications/cancelled` with the matching
    /// `requestId`; the handler that owns that id observes the token
    /// firing via `tokio::select!` and returns a `-32800 request cancelled`
    /// JSON-RPC error. Drops the upstream call's future as a side effect.
    in_flight: DashMap<String, CancellationToken>,
    /// Content blocks accumulated via `POST /notify` between tool calls.
    /// Drained and prepended (wrapped in a `<system-reminder>` text
    /// block pair) on the next `tools/call` response so the model picks
    /// the message up at its next natural inspection point.
    /// Guarded by an async (`tokio::sync`) mutex.
    pending_notifications: Mutex<Vec<ContentBlock>>,
    /// The canonical `URL → header_map` payload that was encoded into
    /// this session's id. Used by `handle_initialize`'s
    /// alive-in-memory branch to re-mint an id from the same byte-
    /// stable shape that was originally encoded — so even if the live
    /// `Connection`s rotated their internal state, the id remains
    /// derivable from the immutable per-upstream header set.
    pub payload: crate::session_manager::SessionPayload,
    /// Per-`server_name` allowlist of un-prefixed tool names. Derived
    /// once at `Session::new` time from
    /// [`crate::session_manager::SessionPayload::tool_allowlists`]
    /// (which is keyed by upstream URL) by walking the live
    /// connections and remapping URL -> server_name. A server_name
    /// absent from this map gets no filtering applied on `list_tools`.
    /// Empty Vec for a server_name => zero tools visible from that
    /// upstream.
    tool_allowlists_by_server: IndexMap<String, Vec<String>>,
}

impl Session {
    pub(crate) fn new(
        connections: IndexMap<String, Connection>,
        payload: crate::session_manager::SessionPayload,
    ) -> Self {
        let (outbound, _) = broadcast::channel(OUTBOUND_CAPACITY);

        // Wire each upstream's list_changed callbacks to publish a
        // matching notification onto the outbound channel. Callbacks fire
        // under the upstream's cache write lock (before the network
        // refresh), so by the time the downstream client re-fetches via
        // tools/list or resources/list it'll either get the new list or
        // wait on the lock until the new list lands — never sees the
        // stale list with a fresh notification.
        for connection in connections.values() {
            let tx = outbound.clone();
            connection.set_on_tools_list_changed(move || {
                let _ = tx.send(JsonRpcNotification {
                    jsonrpc: "2.0".into(),
                    method: "notifications/tools/list_changed".into(),
                    params: None,
                });
            });
            let tx = outbound.clone();
            connection.set_on_resources_list_changed(move || {
                let _ = tx.send(JsonRpcNotification {
                    jsonrpc: "2.0".into(),
                    method: "notifications/resources/list_changed".into(),
                    params: None,
                });
            });
        }

        // Remap the payload's URL-keyed tool_allowlists onto our
        // server_name-keyed connections map. URLs in the allowlist
        // that don't match any live connection are silently dropped
        // (the proxy just won't filter for them).
        let mut tool_allowlists_by_server: IndexMap<String, Vec<String>> = IndexMap::new();
        for (server_name, connection) in &connections {
            if let Some(names) = payload.tool_allowlists.get(&connection.url) {
                tool_allowlists_by_server
                    .insert(server_name.clone(), names.clone());
            }
        }

        Self {
            connections,
            outbound,
            in_flight: DashMap::new(),
            pending_notifications: Mutex::new(Vec::new()),
            payload,
            tool_allowlists_by_server,
        }
    }

    /// Append `blocks` to the pending-notifications queue. The next
    /// `tools/call` response on this session drains and prepends them.
    /// Caller-supplied `X-OBJECTIVEAI-AGENT-ID` captured at session-
    /// open time. Recovered from the encrypted session-id payload on
    /// every resume, so upstream sdk runners (which only carry the
    /// session id back) never have to re-send the header themselves.
    pub fn agent_id(&self) -> Option<&str> {
        self.payload.agent_id.as_deref()
    }

    pub async fn enqueue_notifications(&self, blocks: Vec<ContentBlock>) {
        if blocks.is_empty() {
            return;
        }
        self.pending_notifications.lock().await.extend(blocks);
    }

    /// Atomically take the queued notifications. Subsequent calls return
    /// `Vec::new()` until the next enqueue.
    pub async fn drain_notifications(&self) -> Vec<ContentBlock> {
        std::mem::take(&mut *self.pending_notifications.lock().await)
    }

    /// Non-draining peek — `true` iff the pending-notifications queue
    /// holds at least one block. Companion to [`Self::drain_notifications`]
    /// for callers that want to know whether a drain *would* return
    /// anything without consuming the queue.
    pub async fn has_pending_notifications(&self) -> bool {
        !self.pending_notifications.lock().await.is_empty()
    }

    /// Mint a [`CancellationToken`] for an inbound request id, store it,
    /// and hand back a clone. The handler `select!`s on the clone; the
    /// stored token is what `cancel_in_flight` fires.
    pub fn register_in_flight(&self, id: &serde_json::Value) -> CancellationToken {
        let token = CancellationToken::new();
        self.in_flight.insert(request_id_key(id), token.clone());
        token
    }

    /// Drop the in-flight token for `id`. Always paired with an earlier
    /// `register_in_flight` via a guard so we don't leak entries on the
    /// happy path.
    pub fn deregister_in_flight(&self, id: &serde_json::Value) {
        self.in_flight.remove(&request_id_key(id));
    }

    /// Fire the cancellation token associated with `id`, if any. Returns
    /// `true` if a token was found and cancelled. Triggered by an inbound
    /// `notifications/cancelled` from the downstream client.
    pub fn cancel_in_flight(&self, id: &serde_json::Value) -> bool {
        match self.in_flight.get(&request_id_key(id)) {
            Some(entry) => {
                entry.value().cancel();
                true
            }
            None => false,
        }
    }

    /// Fan `tools/list` out to every upstream in parallel, prefix each
    /// tool's name with `<server-name>_`, concatenate the per-upstream
    /// lists, and return the union sorted by name. Fails fast: the
    /// first upstream error short-circuits via `try_join_all` and is
    /// returned to the caller — we don't paper over a broken upstream.
    ///
    /// Sorting by name guarantees a stable order across calls regardless
    /// of upstream `HashMap` iteration order or per-upstream return
    /// order; downstream consumers (e.g. seeded mock agents) rely on
    /// this for deterministic output.
    pub async fn list_tools(&self) -> Result<ListToolsResult, Arc<objectiveai_sdk::mcp::Error>> {
        self.list_tools_filtered(None).await
    }

    /// Per-upstream variant of [`Self::list_tools`]: when `filter_url`
    /// is `Some`, only fans out to the single upstream whose connection
    /// `url` matches verbatim. When `None`, behaves identically to the
    /// no-arg form (fan out to every upstream).
    ///
    /// An unmatched `filter_url` returns an empty `ListToolsResult`
    /// (not an error) — the caller validates whether emptiness is
    /// acceptable. The per-upstream allowlist (`X-MCP-Tools-Allow`)
    /// is still applied to whichever upstream(s) participate.
    pub async fn list_tools_filtered(
        &self,
        filter_url: Option<&str>,
    ) -> Result<ListToolsResult, Arc<objectiveai_sdk::mcp::Error>> {
        let pairs: Vec<(&String, &Connection)> = match filter_url {
            Some(url) => self
                .connections
                .iter()
                .filter(|(_, c)| c.url == url)
                .collect(),
            None => self.connections.iter().collect(),
        };
        let results = try_join_all(
            pairs
                .iter()
                .map(|(_, c)| async move {
                    let r = c.list_tools().await;
                    r
                }),
        )
        .await?;

        let mut tools: Vec<Tool> = Vec::new();
        for ((server_name, _), arc) in pairs.into_iter().zip(results) {
            let allowlist = self.tool_allowlists_by_server.get(server_name);
            for tool in arc.iter() {
                // Filter against the per-upstream allowlist if one
                // was supplied via `X-MCP-Tools-Allow`. Match is on
                // the un-prefixed name (the name the upstream itself
                // returned), so callers don't have to know the
                // `<server_name>_` prefix the proxy stamps.
                if let Some(names) = allowlist {
                    if !names.iter().any(|n| n == &tool.name) {
                        continue;
                    }
                }
                let mut prefixed = tool.clone();
                prefixed.name = prefix_name(server_name, &tool.name);
                tools.push(prefixed);
            }
        }
        tools.sort_by(|a, b| a.name.cmp(&b.name));

        Ok(ListToolsResult {
            tools,
            next_cursor: None,
            _meta: None,
        })
    }

    /// Fan `resources/list` out to every upstream in parallel, prefix
    /// each URI with `<server-name>_`, concatenate the per-upstream
    /// lists, and return the union sorted by URI. Same fail-fast
    /// semantics as [`Session::list_tools`] — the first upstream error
    /// short-circuits and is returned to the caller.
    pub async fn list_resources(&self) -> Result<ListResourcesResult, Arc<objectiveai_sdk::mcp::Error>> {
        self.list_resources_filtered(None).await
    }

    /// Per-upstream variant of [`Self::list_resources`]: when
    /// `filter_url` is `Some`, only fans out to the single upstream
    /// whose connection `url` matches verbatim. When `None`, behaves
    /// identically to the no-arg form (fan out to every upstream).
    ///
    /// An unmatched `filter_url` returns an empty `ListResourcesResult`
    /// (not an error) — the caller validates whether emptiness is
    /// acceptable.
    pub async fn list_resources_filtered(
        &self,
        filter_url: Option<&str>,
    ) -> Result<ListResourcesResult, Arc<objectiveai_sdk::mcp::Error>> {
        let pairs: Vec<(&String, &Connection)> = match filter_url {
            Some(url) => self
                .connections
                .iter()
                .filter(|(_, c)| c.url == url)
                .collect(),
            None => self.connections.iter().collect(),
        };
        let results = try_join_all(
            pairs
                .iter()
                .map(|(_, c)| async move {
                    let r = c.list_resources().await;
                    r
                }),
        )
        .await?;

        let mut resources: Vec<Resource> = Vec::new();
        for ((server_name, _), arc) in pairs.into_iter().zip(results) {
            for resource in arc.iter() {
                let mut prefixed = resource.clone();
                prefixed.uri = prefix_name(server_name, &resource.uri);
                resources.push(prefixed);
            }
        }
        resources.sort_by(|a, b| a.uri.cmp(&b.uri));

        Ok(ListResourcesResult {
            resources,
            next_cursor: None,
            _meta: None,
        })
    }

    /// Forward `tools/call` to whichever upstream owns the named tool.
    /// Routing is longest-prefix-match against the connection map's keys —
    /// see [`Session::route`].
    pub async fn call_tool(
        &self,
        params: &CallToolRequestParams,
    ) -> Result<CallToolResult, CallToolError> {
        let (connection, original_name) = self
            .route(&params.name)
            .ok_or_else(|| CallToolError::ToolNotFound(params.name.clone()))?;

        // Forward to the upstream with the un-prefixed tool name it actually
        // knows; pass everything else (`arguments`, `task`, `_meta`) through
        // unchanged.
        let upstream_params = CallToolRequestParams {
            name: original_name,
            arguments: params.arguments.clone(),
            task: params.task.clone(),
            _meta: params._meta.clone(),
        };
        let r = connection.call_tool(&upstream_params).await;
        Ok(r?)
    }

    /// Forward `resources/read` to whichever upstream owns the URI. Same
    /// longest-prefix-match routing as [`Session::call_tool`].
    pub async fn read_resource(
        &self,
        uri: &str,
    ) -> Result<ReadResourceResult, ReadResourceError> {
        let (connection, original_uri) = self
            .route(uri)
            .ok_or_else(|| ReadResourceError::ResourceNotFound(uri.to_string()))?;
        let r = connection.read_resource(&original_uri).await;
        Ok(r?)
    }

    /// Resolve a `<server-name>_<original>` prefixed identifier to the
    /// owning connection and the original (un-prefixed) name the upstream
    /// actually knows.
    ///
    /// Server names that contain `_` are supported via longest-prefix
    /// match: if both `fs` and `fs_extra` are connected and the inbound
    /// name is `fs_extra_Read`, the `fs_extra` upstream wins.
    fn route<'a>(&'a self, prefixed: &str) -> Option<(&'a Connection, String)> {
        let mut best: Option<(&'a str, &'a Connection)> = None;
        for (name, conn) in &self.connections {
            // Need at least one char after the `_` to count as a real prefix
            // hit (otherwise an exact match `name == prefixed` would route
            // to an empty original name).
            if prefixed.len() > name.len() + 1
                && prefixed.as_bytes()[name.len()] == b'_'
                && prefixed.starts_with(name.as_str())
            {
                if best.map(|(b, _)| name.len() > b.len()).unwrap_or(true) {
                    best = Some((name.as_str(), conn));
                }
            }
        }
        best.map(|(name, conn)| {
            let original = prefixed[name.len() + 1..].to_string();
            (conn, original)
        })
    }
}

/// Namespace a tool name or resource URI under its upstream server.
///
/// Produces `<server-name>_<original>`, i.e. the server name, one `_`
/// separator, then the original text unchanged.
fn prefix_name(server_name: &str, name: &str) -> String {
    let mut namespaced = String::with_capacity(server_name.len() + 1 + name.len());
    namespaced.push_str(server_name);
    namespaced.push('_');
    namespaced.push_str(name);
    namespaced
}

/// Failure modes for [`Session::call_tool`].
///
/// NOTE(review): because `Upstream` uses `#[from]` (which also marks the
/// field as the error's `source`) and its message embeds `{0}`, reporters
/// that walk the source chain will print the upstream error text twice.
#[derive(Debug, thiserror::Error)]
pub enum CallToolError {
    /// The requested tool name could not be routed to any connected
    /// upstream via its `<server-name>_` prefix. Carries the name exactly
    /// as the downstream client sent it.
    #[error("tool not found on any upstream: {0}")]
    ToolNotFound(String),
    /// The owning upstream was found, but forwarding the call to it
    /// failed; produced by the `?` conversion in `Session::call_tool`.
    #[error("upstream call_tool failed: {0}")]
    Upstream(#[from] objectiveai_sdk::mcp::Error),
}

/// Failure modes for [`Session::read_resource`].
///
/// NOTE(review): as with `CallToolError`, the `Upstream` message embeds
/// its `#[from]` source, so source-chain reporters repeat that text.
#[derive(Debug, thiserror::Error)]
pub enum ReadResourceError {
    /// The requested URI could not be routed to any connected upstream
    /// via its `<server-name>_` prefix. Carries the URI exactly as the
    /// downstream client sent it.
    #[error("resource not found on any upstream: {0}")]
    ResourceNotFound(String),
    /// The owning upstream was found, but reading the resource from it
    /// failed; produced by the `?` conversion in `Session::read_resource`.
    #[error("upstream read_resource failed: {0}")]
    Upstream(#[from] objectiveai_sdk::mcp::Error),
}