Skip to main content

mcpr_core/
event.rs

1//! Proxy event types and the event sink trait.
2//!
3//! [`ProxyEvent`] is the single event enum flowing through the event bus.
4//! [`EventSink`] is the trait sinks implement to consume events.
5//!
6//! Both live in `mcpr-core` so any crate can:
7//! - Emit events (proxy engine)
8//! - Consume events (sinks: stderr, sqlite, cloud, prometheus, etc.)
9
10use serde::Serialize;
11use serde_json::Value;
12
13// ── Event types ────────────────────────────────────────────────────────
14
15/// All events flowing through the event bus.
16///
17/// Each variant represents a distinct lifecycle moment. Sinks match on
18/// the variant to decide what to process.
19#[derive(Clone, Debug, Serialize)]
20#[serde(tag = "type", rename_all = "snake_case")]
21pub enum ProxyEvent {
22    /// An MCP request completed (success or error).
23    Request(Box<RequestEvent>),
24    /// A new MCP session established via `initialize` handshake.
25    SessionStart(SessionStartEvent),
26    /// A session was closed (clean transport disconnect).
27    SessionEnd(SessionEndEvent),
28    /// Periodic health snapshot emitted by the health check loop.
29    Heartbeat(HeartbeatEvent),
30    /// A new `SchemaVersion` was created inside the proxy's `SchemaManager`.
31    /// Emitted after pagination merge + change detection. Consumers
32    /// (SQLite writer, cloud sink) persist or forward the version directly
33    /// from the event — no secondary lookup required.
34    SchemaVersionCreated(SchemaVersionCreatedEvent),
35}
36
37/// An MCP request that flowed through the proxy.
38#[derive(Clone, Debug, Serialize)]
39pub struct RequestEvent {
40    /// Unique event ID (UUIDv4).
41    pub id: String,
42    /// Unix milliseconds (UTC).
43    pub ts: i64,
44    /// Proxy name (from config or derived from upstream URL).
45    pub proxy: String,
46    /// MCP session ID (from `mcp-session-id` header).
47    pub session_id: Option<String>,
48
49    /// HTTP method (POST, GET, DELETE).
50    pub method: String,
51    /// Request path.
52    pub path: String,
53    /// MCP JSON-RPC method (tools/call, resources/read, etc.).
54    pub mcp_method: Option<String>,
55    /// Tool name for `tools/call` requests.
56    pub tool: Option<String>,
57
58    /// HTTP response status code.
59    pub status: u16,
60    /// Wall-clock latency: proxy received request → sent response (μs).
61    pub latency_us: u64,
62    /// Time spent waiting for upstream response (μs).
63    pub upstream_us: Option<u64>,
64    /// Request payload size in bytes.
65    pub request_size: Option<u64>,
66    /// Response payload size in bytes.
67    pub response_size: Option<u64>,
68
69    /// MCP JSON-RPC error code (e.g., "-32600") if the response was an error.
70    pub error_code: Option<String>,
71    /// Error message (truncated to 512 chars).
72    pub error_msg: Option<String>,
73
74    /// Client name from session `clientInfo.name` (e.g., "claude-desktop").
75    pub client_name: Option<String>,
76    /// Client version from session `clientInfo.version` (e.g., "1.2.0").
77    pub client_version: Option<String>,
78
79    /// Classification note: "rewritten", "passthrough", "error", "sse", etc.
80    pub note: String,
81}
82
83/// MCP session established via `initialize` handshake.
84#[derive(Clone, Debug, Serialize)]
85pub struct SessionStartEvent {
86    pub session_id: String,
87    pub proxy: String,
88    pub ts: i64,
89    /// Client name from `clientInfo.name` (e.g., "claude-desktop").
90    pub client_name: Option<String>,
91    /// Client version from `clientInfo.version` (e.g., "1.2.0").
92    pub client_version: Option<String>,
93    /// Normalized platform: "claude", "chatgpt", "vscode", "cursor", "unknown".
94    pub client_platform: Option<String>,
95}
96
97/// Session closed (clean transport disconnect).
98#[derive(Clone, Debug, Serialize)]
99pub struct SessionEndEvent {
100    pub session_id: String,
101    pub ts: i64,
102}
103
104/// Periodic health snapshot.
105#[derive(Clone, Debug, Serialize)]
106pub struct HeartbeatEvent {
107    pub ts: i64,
108    pub proxy: String,
109    pub mcp_status: String,
110    pub tunnel_status: String,
111    pub widgets_status: String,
112    pub uptime_secs: u64,
113    pub request_count: u64,
114}
115
116/// A new `SchemaVersion` was persisted for an upstream.
117///
118/// Carries the full merged payload so consumers (SQLite writer, cloud
119/// sink) can persist or forward without a secondary lookup.
120#[derive(Clone, Debug, Serialize)]
121pub struct SchemaVersionCreatedEvent {
122    /// Unix milliseconds (UTC).
123    pub ts: i64,
124    /// Proxy config name (upstream identity).
125    pub upstream_id: String,
126    /// Upstream MCP server URL (for legacy table rows keyed on url).
127    pub upstream_url: String,
128    /// MCP method that produced this version.
129    pub method: String,
130    /// Monotonic version number per (upstream, method).
131    pub version: u32,
132    /// Opaque `SchemaVersionId` (first 16 hex chars of `content_hash`).
133    pub version_id: String,
134    /// Full SHA-256 hex digest of the merged payload.
135    pub content_hash: String,
136    /// Merged `result` payload (post-pagination).
137    pub payload: Value,
138}
139
140// ── Event sink trait ───────────────────────────────────────────────────
141
142/// A sink that consumes proxy events from the event bus.
143///
144/// Register sinks at startup. The event bus calls `on_event` for every
145/// event, and sinks filter by variant. Example:
146///
147/// ```rust,ignore
148/// impl EventSink for PrometheusSink {
149///     fn on_event(&self, event: &ProxyEvent) {
150///         if let ProxyEvent::Request(e) = event {
151///             self.request_counter.inc();
152///             self.latency_histogram.observe(e.latency_us as f64);
153///         }
154///     }
155///     fn name(&self) -> &'static str { "prometheus" }
156/// }
157/// ```
158///
159/// # Contract
160///
161/// - **`on_event` must not block.** If the sink needs I/O (HTTP, disk),
162///   buffer internally and flush in `flush()` or a background thread.
163/// - **`on_batch`** is called when multiple events are available. Override
164///   for sinks that benefit from batching (SQL INSERT, HTTP POST).
165/// - **`flush`** is called periodically (~5s) and on graceful shutdown.
166pub trait EventSink: Send + Sync {
167    /// Process a single event. Must not block.
168    fn on_event(&self, event: &ProxyEvent);
169
170    /// Process a batch of events. Default calls `on_event` for each.
171    fn on_batch(&self, events: &[ProxyEvent]) {
172        for event in events {
173            self.on_event(event);
174        }
175    }
176
177    /// Flush internal buffers to their destination.
178    fn flush(&self) {}
179
180    /// Human-readable sink name (for logging and debugging).
181    fn name(&self) -> &'static str;
182}
183
/// A no-op sink that discards all events. Used when no sinks are configured.
///
/// This is a zero-sized unit struct, so it derives the common traits:
/// `Debug` lets it appear in diagnostics, `Default` lets it be built
/// generically, and `Clone`/`Copy` let it be duplicated for free.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopSink;
186
187impl EventSink for NoopSink {
188    fn on_event(&self, _event: &ProxyEvent) {}
189    fn name(&self) -> &'static str {
190        "noop"
191    }
192}