Skip to main content

objectiveai_mcp_proxy/
queue_delegate.rs

1//! In-process delegate the embedder (the API) plugs in at
2//! [`crate::setup`] time so the proxy can surface client-notification
3//! content blocks on tool responses without HTTP round-trips.
4//!
5//! The proxy is **fully naive** about ban lists, content-row ids, and
6//! confirmation state. It hands the trait two things on each call
7//! (the per-session agent arguments + the MCP session id) and either
8//! gets back a [`QueueRead`] (token + blocks) to splice into the next
9//! tool response, or `None` for "nothing to surface right now"
10//! (including errors — the proxy never sees a `Result`).
11//!
//! The `token` is opaque to the proxy: it is embedded in the
//! wrapper prefix the agent sees (via
//! [`objectiveai_sdk::mcp::queue_notification::format_prefix`]),
//! round-trips through the agent's tool-message text, and lands
//! back at the API's run-loop, which calls into its own
//! `ApiQueueDelegate::confirm(token)` to finalize delivery. Tokens
//! the run-loop never sees stay in the delegate's "pending" map
//! until the loop unregisters; the rows behind them are then
//! re-delivered on the next loop. That's how we avoid claiming
//! delivery for content the agent didn't actually consume.
22//!
//! The proxy calls the delegate **sequentially, after the tool call
//! succeeds**. Running the read in parallel with the tool call would
//! advance the delegate's pending map even when the call ends up
//! returning a JSON-RPC error (ToolNotFound / Upstream). Those rows
//! would never surface in any tool-message text and so would never
//! be confirmed (which is correct), but the speculative state would
//! bloat the pending map for no benefit. Sequential keeps things tidy.
30
31use std::future::Future;
32use std::pin::Pin;
33
34use indexmap::IndexMap;
35use objectiveai_sdk::mcp::tool::ContentBlock;
36
37/// Successful return shape: a per-row block list plus a
38/// confirmation token the delegate has reserved for this batch.
39/// The proxy embeds `token` in the wrapper prefix it prepends to
40/// the tool response; it does nothing else with the token.
41pub struct QueueRead {
42    /// Opaque confirmation token. The agent's tool-message text
43    /// will carry it inline; `run_agent_loop` regex-extracts it
44    /// and calls back to confirm delivery.
45    pub token: String,
46    /// One inner `Vec<ContentBlock>` per consumed `message_queue`
47    /// row, oldest-first.
48    pub blocks: Vec<Vec<ContentBlock>>,
49}
50
51/// Embedder-provided message-queue read path. `None` ⇒ no delegate
52/// installed at proxy boot (CLI standalone case); the proxy's
53/// tool-call dispatcher short-circuits without invoking anything.
54///
55/// The trait method returns a boxed-pinned `Future + Send` rather
56/// than using `async_trait`: no proc-macro dep, and the trait
57/// stays `dyn`-safe so the proxy can hold it as
58/// `Arc<dyn QueueDelegate>`. Implementations can still write the
59/// body as an `async move { ... }` block wrapped in `Box::pin`.
60pub trait QueueDelegate: Send + Sync {
61    /// Read pending content blocks for one MCP session. Returns
62    /// `Some(QueueRead { token, blocks })` on success, `None` on
63    /// error / empty / "nothing right now."
64    ///
65    /// `agent_arguments` is the proxy's per-session transient
66    /// header map (the agent-routing keys like
67    /// `X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY`); the delegate
68    /// parses out whatever it needs to look up the right per-loop
69    /// state. `mcp_session_id` is the base62-encoded session
70    /// envelope.
71    fn read_pending_blocks<'a>(
72        &'a self,
73        agent_arguments: &'a IndexMap<String, String>,
74        mcp_session_id: &'a str,
75    ) -> Pin<Box<dyn Future<Output = Option<QueueRead>> + Send + 'a>>;
76}