// mcpr_core/event.rs

//! Proxy event types and the event sink trait.
//!
//! [`ProxyEvent`] is the single event enum flowing through the event bus.
//! [`EventSink`] is the trait sinks implement to consume events.
//!
//! Both live in `mcpr-core` so any crate can:
//! - Emit events (proxy engine)
//! - Consume events (sinks: stderr, sqlite, cloud, prometheus, etc.)
9
10use crate::protocol::schema::PageStatus;
11use serde::Serialize;
12
// ── Event types ────────────────────────────────────────────────────────
14
15/// All events flowing through the event bus.
16///
17/// Each variant represents a distinct lifecycle moment. Sinks match on
18/// the variant to decide what to process.
19#[derive(Clone, Debug, Serialize)]
20#[serde(tag = "type", rename_all = "snake_case")]
21pub enum ProxyEvent {
22    /// An MCP request completed (success or error).
23    Request(Box<RequestEvent>),
24    /// A new MCP session established via `initialize` handshake.
25    SessionStart(SessionStartEvent),
26    /// A session was closed (clean transport disconnect).
27    SessionEnd(SessionEndEvent),
28    /// Periodic health snapshot emitted by the health check loop.
29    Heartbeat(HeartbeatEvent),
30    /// A schema discovery response was captured (before proxy rewrite).
31    SchemaCapture(SchemaCaptureEvent),
32    /// Server indicated its schema changed (e.g., `notifications/tools/list_changed`).
33    SchemaStale(SchemaStaleEvent),
34}
35
36/// An MCP request that flowed through the proxy.
37#[derive(Clone, Debug, Serialize)]
38pub struct RequestEvent {
39    /// Unique event ID (UUIDv4).
40    pub id: String,
41    /// Unix milliseconds (UTC).
42    pub ts: i64,
43    /// Proxy name (from config or derived from upstream URL).
44    pub proxy: String,
45    /// MCP session ID (from `mcp-session-id` header).
46    pub session_id: Option<String>,
47
48    /// HTTP method (POST, GET, DELETE).
49    pub method: String,
50    /// Request path.
51    pub path: String,
52    /// MCP JSON-RPC method (tools/call, resources/read, etc.).
53    pub mcp_method: Option<String>,
54    /// Tool name for `tools/call` requests.
55    pub tool: Option<String>,
56
57    /// HTTP response status code.
58    pub status: u16,
59    /// Wall-clock latency: proxy received request → sent response (μs).
60    pub latency_us: u64,
61    /// Time spent waiting for upstream response (μs).
62    pub upstream_us: Option<u64>,
63    /// Request payload size in bytes.
64    pub request_size: Option<u64>,
65    /// Response payload size in bytes.
66    pub response_size: Option<u64>,
67
68    /// MCP JSON-RPC error code (e.g., "-32600") if the response was an error.
69    pub error_code: Option<String>,
70    /// Error message (truncated to 512 chars).
71    pub error_msg: Option<String>,
72
73    /// Client name from session `clientInfo.name` (e.g., "claude-desktop").
74    pub client_name: Option<String>,
75    /// Client version from session `clientInfo.version` (e.g., "1.2.0").
76    pub client_version: Option<String>,
77
78    /// Classification note: "rewritten", "passthrough", "error", "sse", etc.
79    pub note: String,
80}
81
82/// MCP session established via `initialize` handshake.
83#[derive(Clone, Debug, Serialize)]
84pub struct SessionStartEvent {
85    pub session_id: String,
86    pub proxy: String,
87    pub ts: i64,
88    /// Client name from `clientInfo.name` (e.g., "claude-desktop").
89    pub client_name: Option<String>,
90    /// Client version from `clientInfo.version` (e.g., "1.2.0").
91    pub client_version: Option<String>,
92    /// Normalized platform: "claude", "chatgpt", "vscode", "cursor", "unknown".
93    pub client_platform: Option<String>,
94}
95
96/// Session closed (clean transport disconnect).
97#[derive(Clone, Debug, Serialize)]
98pub struct SessionEndEvent {
99    pub session_id: String,
100    pub ts: i64,
101}
102
103/// Periodic health snapshot.
104#[derive(Clone, Debug, Serialize)]
105pub struct HeartbeatEvent {
106    pub ts: i64,
107    pub proxy: String,
108    pub mcp_status: String,
109    pub tunnel_status: String,
110    pub widgets_status: String,
111    pub uptime_secs: u64,
112    pub request_count: u64,
113}
114
115/// Captured MCP schema discovery response, emitted BEFORE proxy rewrite.
116#[derive(Clone, Debug, Serialize)]
117pub struct SchemaCaptureEvent {
118    /// Unix milliseconds (UTC).
119    pub ts: i64,
120    /// Proxy name.
121    pub proxy: String,
122    /// Upstream MCP server URL.
123    pub upstream_url: String,
124    /// MCP method that produced this response (e.g., "initialize", "tools/list").
125    pub method: String,
126    /// The raw `result` field from the JSON-RPC response, serialized as JSON.
127    pub payload: String,
128    /// Pagination state — used by the writer to buffer multi-page responses.
129    pub page_status: PageStatus,
130}
131
132/// Server indicated its schema changed (e.g., `notifications/tools/list_changed`).
133#[derive(Clone, Debug, Serialize)]
134pub struct SchemaStaleEvent {
135    /// Unix milliseconds (UTC).
136    pub ts: i64,
137    /// Proxy name.
138    pub proxy: String,
139    /// Upstream MCP server URL.
140    pub upstream_url: String,
141    /// The method whose schema is now stale (e.g., "tools/list").
142    pub method: String,
143}
144
// ── Event sink trait ───────────────────────────────────────────────────
146
147/// A sink that consumes proxy events from the event bus.
148///
149/// Register sinks at startup. The event bus calls `on_event` for every
150/// event, and sinks filter by variant. Example:
151///
152/// ```rust,ignore
153/// impl EventSink for PrometheusSink {
154///     fn on_event(&self, event: &ProxyEvent) {
155///         if let ProxyEvent::Request(e) = event {
156///             self.request_counter.inc();
157///             self.latency_histogram.observe(e.latency_us as f64);
158///         }
159///     }
160///     fn name(&self) -> &'static str { "prometheus" }
161/// }
162/// ```
163///
164/// # Contract
165///
166/// - **`on_event` must not block.** If the sink needs I/O (HTTP, disk),
167///   buffer internally and flush in `flush()` or a background thread.
168/// - **`on_batch`** is called when multiple events are available. Override
169///   for sinks that benefit from batching (SQL INSERT, HTTP POST).
170/// - **`flush`** is called periodically (~5s) and on graceful shutdown.
171pub trait EventSink: Send + Sync {
172    /// Process a single event. Must not block.
173    fn on_event(&self, event: &ProxyEvent);
174
175    /// Process a batch of events. Default calls `on_event` for each.
176    fn on_batch(&self, events: &[ProxyEvent]) {
177        for event in events {
178            self.on_event(event);
179        }
180    }
181
182    /// Flush internal buffers to their destination.
183    fn flush(&self) {}
184
185    /// Human-readable sink name (for logging and debugging).
186    fn name(&self) -> &'static str;
187}
188
/// A no-op sink that discards all events. Used when no sinks are configured.
///
/// It is a zero-sized unit struct, so it derives the common traits
/// (`Debug`, `Clone`, `Copy`, `Default`) to be printable and constructible
/// via `NoopSink::default()` as well as the bare `NoopSink` literal.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopSink;
191
192impl EventSink for NoopSink {
193    fn on_event(&self, _event: &ProxyEvent) {}
194    fn name(&self) -> &'static str {
195        "noop"
196    }
197}