vellaveto_http_proxy/proxy/websocket/mod.rs
1// Copyright 2026 Paolo Vella
2// SPDX-License-Identifier: BUSL-1.1
3//
4// Use of this software is governed by the Business Source License
5// included in the LICENSE-BSL-1.1 file at the root of this repository.
6//
7// Change Date: Three years from the date of publication of this version.
8// Change License: MPL-2.0
9
10//! WebSocket transport for MCP JSON-RPC messages (SEP-1288).
11//!
12//! Implements a WebSocket reverse proxy that sits between MCP clients and
13//! an upstream MCP server. WebSocket messages (text frames) are parsed as
14//! JSON-RPC, classified via `vellaveto_mcp::extractor`, evaluated against
15//! loaded policies, and forwarded to the upstream server.
16//!
17//! Security invariants:
18//! - **Fail-closed**: Unparseable messages close the connection (code 1008).
19//! - **No binary frames**: Only text frames are accepted (code 1003 for binary).
20//! - **Session binding**: Each WS connection is bound to exactly one session.
21//! - **Canonicalization**: Re-serialized JSON forwarded (TOCTOU defense).
22
23use axum::{
24 extract::{
25 ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade},
26 ConnectInfo, State,
27 },
28 http::HeaderMap,
29 response::Response,
30};
31use futures_util::{SinkExt, StreamExt};
32use serde_json::{json, Value};
33use std::net::SocketAddr;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::sync::Mutex;
38use vellaveto_mcp::extractor::{self, MessageType};
39use vellaveto_mcp::inspection::{
40 inspect_for_injection, scan_notification_for_secrets, scan_parameters_for_secrets,
41 scan_response_for_secrets, scan_text_for_secrets, scan_tool_descriptions,
42 scan_tool_descriptions_with_scanner,
43};
44use vellaveto_mcp::output_validation::ValidationResult;
45use vellaveto_types::{Action, EvaluationContext, Verdict};
46
47use super::auth::{validate_agent_identity, validate_api_key, validate_oauth};
48use super::call_chain::{
49 check_privilege_escalation, sync_session_call_chain_from_headers, take_tracked_tool_call,
50 track_pending_tool_call,
51};
52use super::origin::validate_origin;
53use super::ProxyState;
54use crate::proxy_metrics::record_dlp_finding;
55
56/// Configuration for WebSocket transport.
57#[derive(Debug, Clone)]
58pub struct WebSocketConfig {
59 /// Maximum message size in bytes (default: 1 MB).
60 pub max_message_size: usize,
61 /// Idle timeout in seconds — close connection after no message activity (default: 300s).
62 /// SECURITY (FIND-R182-001): True idle timeout that resets on every message.
63 pub idle_timeout_secs: u64,
64 /// Maximum messages per second per connection for client-to-upstream (default: 100).
65 pub message_rate_limit: u32,
66 /// Maximum messages per second per connection for upstream-to-client (default: 500).
67 /// SECURITY (FIND-R46-WS-003): Rate limits on the upstream→client direction prevent
68 /// a malicious upstream from flooding the client with responses.
69 pub upstream_rate_limit: u32,
70}
71
72impl Default for WebSocketConfig {
73 fn default() -> Self {
74 Self {
75 max_message_size: 1_048_576,
76 idle_timeout_secs: 300,
77 message_rate_limit: 100,
78 upstream_rate_limit: 500,
79 }
80 }
81}
82
83/// WebSocket close codes per RFC 6455.
84const CLOSE_POLICY_VIOLATION: u16 = 1008;
85const CLOSE_UNSUPPORTED_DATA: u16 = 1003;
86/// Close code for oversized messages. Used by axum's `max_message_size`
87/// automatically; kept here for documentation and test assertions.
88#[cfg(test)]
89const CLOSE_MESSAGE_TOO_BIG: u16 = 1009;
90const CLOSE_NORMAL: u16 = 1000;
91
92/// Global WebSocket metrics counters.
93static WS_CONNECTIONS_TOTAL: AtomicU64 = AtomicU64::new(0);
94static WS_MESSAGES_TOTAL: AtomicU64 = AtomicU64::new(0);
95
96/// Record WebSocket connection metric.
97fn record_ws_connection() {
98 // SECURITY (FIND-R182-003): Use saturating arithmetic to prevent overflow.
99 // SECURITY (CA-005): SeqCst for security-adjacent metrics counters.
100 let _ = WS_CONNECTIONS_TOTAL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
101 Some(v.saturating_add(1))
102 });
103 metrics::counter!("vellaveto_ws_connections_total").increment(1);
104}
105
106/// Record WebSocket message metric.
107fn record_ws_message(direction: &str) {
108 // SECURITY (FIND-R182-003): Use saturating arithmetic to prevent overflow.
109 // SECURITY (CA-005): SeqCst for security-adjacent metrics counters.
110 let _ = WS_MESSAGES_TOTAL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
111 Some(v.saturating_add(1))
112 });
113 metrics::counter!(
114 "vellaveto_ws_messages_total",
115 "direction" => direction.to_string()
116 )
117 .increment(1);
118}
119
120/// Get current connection count (for testing).
121#[cfg(test)]
122pub(crate) fn ws_connections_count() -> u64 {
123 WS_CONNECTIONS_TOTAL.load(Ordering::SeqCst)
124}
125
126/// Get current message count (for testing).
127#[cfg(test)]
128pub(crate) fn ws_messages_count() -> u64 {
129 WS_MESSAGES_TOTAL.load(Ordering::SeqCst)
130}
131
132use vellaveto_types::is_unicode_format_char as is_unicode_format_char_ws;
133
134/// Query parameters for the WebSocket upgrade endpoint.
135#[derive(Debug, serde::Deserialize, Default)]
136#[serde(deny_unknown_fields)]
137pub struct WsQueryParams {
138 /// Optional session ID for session resumption.
139 #[serde(default)]
140 pub session_id: Option<String>,
141}
142
143/// Handle WebSocket upgrade request at `/mcp/ws`.
144///
145/// Authenticates the request, validates origin, creates/resumes a session,
146/// and upgrades the HTTP connection to a WebSocket.
147pub async fn handle_ws_upgrade(
148 State(state): State<ProxyState>,
149 ConnectInfo(addr): ConnectInfo<SocketAddr>,
150 headers: HeaderMap,
151 query: axum::extract::Query<WsQueryParams>,
152 ws: WebSocketUpgrade,
153) -> Response {
154 // 1. Validate origin (CSRF / DNS rebinding defense)
155 if let Err(resp) = validate_origin(&headers, &state.bind_addr, &state.allowed_origins) {
156 return resp;
157 }
158
159 // 2. Authenticate before upgrade (API key or OAuth)
160 if let Err(resp) = validate_api_key(&state, &headers) {
161 return resp;
162 }
163
164 // SECURITY (FIND-R53-WS-001): Validate OAuth token at upgrade time.
165 // Parity with HTTP POST (handlers.rs:154) and GET (handlers.rs:2864).
166 // Without this, WS connections bypass token expiry checks.
167 let oauth_claims = match validate_oauth(
168 &state,
169 &headers,
170 "GET",
171 &super::auth::build_effective_request_uri(
172 &headers,
173 state.bind_addr,
174 &axum::http::Uri::from_static("/mcp/ws"),
175 false,
176 ),
177 query.session_id.as_deref(),
178 )
179 .await
180 {
181 Ok(claims) => claims,
182 Err(response) => return response,
183 };
184
185 // SECURITY (FIND-R53-WS-002, FIND-R159-003): Validate agent identity at upgrade time.
186 // Parity with HTTP POST (handlers.rs:160) and GET (handlers.rs:2871).
187 // FIND-R159-003: Identity MUST be stored in session (was previously discarded with `_`).
188 let agent_identity = match validate_agent_identity(&state, &headers).await {
189 Ok(identity) => identity,
190 Err(response) => return response,
191 };
192
193 // SECURITY (FIND-R55-WS-004, FIND-R81-001): Validate session_id length and
194 // control/format characters from query parameter. Parity with HTTP POST/GET
195 // handlers (handlers.rs:154, handlers.rs:2928) which reject control chars.
196 // SECURITY (FIND-R81-WS-001): Also reject Unicode format characters (zero-width,
197 // bidi overrides, BOM) that can bypass string-based security checks.
198 let ws_session_id = query.session_id.as_deref().filter(|id| {
199 !id.is_empty()
200 && id.len() <= 128
201 && !id
202 .chars()
203 .any(|c| c.is_control() || is_unicode_format_char_ws(c))
204 });
205
206 // 3. Get or create session
207 let session_id = state.sessions.get_or_create(ws_session_id);
208
209 // SECURITY (FIND-R53-WS-003): Session ownership binding — prevent session fixation.
210 // Parity with HTTP GET (handlers.rs:2914-2953).
211 if let Some(ref claims) = oauth_claims {
212 if let Some(mut session) = state.sessions.get_mut(&session_id) {
213 match &session.oauth_subject {
214 Some(owner) if owner != &claims.sub => {
215 tracing::warn!(
216 session_id = %session_id,
217 owner = %owner,
218 requester = %claims.sub,
219 "WS upgrade rejected: session owned by different principal"
220 );
221 return axum::response::IntoResponse::into_response((
222 axum::http::StatusCode::FORBIDDEN,
223 axum::Json(json!({
224 "error": "Session belongs to another principal"
225 })),
226 ));
227 }
228 None => {
229 // Bind session to this OAuth subject
230 session.oauth_subject = Some(claims.sub.clone());
231 // SECURITY (FIND-R73-SRV-006): Store token expiry, matching
232 // HTTP POST handler pattern to enforce token lifetime.
233 if claims.exp > 0 {
234 session.token_expires_at = Some(claims.exp);
235 }
236 }
237 _ => {
238 // Already owned by this principal — use earliest expiry
239 // SECURITY (FIND-R73-SRV-006): Parity with HTTP POST handler
240 // (R23-PROXY-6) — prevent long-lived tokens from extending
241 // sessions originally bound to short-lived tokens.
242 if claims.exp > 0 {
243 session.token_expires_at = Some(
244 session
245 .token_expires_at
246 .map_or(claims.exp, |existing| existing.min(claims.exp)),
247 );
248 }
249 }
250 }
251 }
252 }
253
254 // SECURITY (FIND-R159-003): Store agent identity in session — parity with HTTP
255 // POST (handlers.rs:295-298) and GET (handlers.rs:3641-3643). Without this,
256 // ABAC policies referencing agent_identity would evaluate against None for
257 // WebSocket connections, creating a policy bypass.
258 if let Some(ref identity) = agent_identity {
259 if let Some(mut session) = state.sessions.get_mut(&session_id) {
260 session.agent_identity = Some(identity.clone());
261 }
262 }
263
264 // SECURITY (FIND-R46-006): Validate and extract call chain from upgrade headers.
265 // The call chain is synced once during upgrade and reused for all messages
266 // in this WebSocket connection.
267 if let Err(reason) = super::call_chain::validate_call_chain_header(&headers, &state.limits) {
268 tracing::warn!(
269 session_id = %session_id,
270 "WS upgrade rejected: invalid call chain header: {}",
271 reason
272 );
273 return axum::response::IntoResponse::into_response((
274 axum::http::StatusCode::BAD_REQUEST,
275 axum::Json(json!({
276 "error": "Invalid request"
277 })),
278 ));
279 }
280 sync_session_call_chain_from_headers(
281 &state.sessions,
282 &session_id,
283 &headers,
284 state.call_chain_hmac_key.as_ref(),
285 &state.limits,
286 );
287
288 let ws_config = state.ws_config.clone().unwrap_or_default();
289
290 // Phase 28: Extract W3C Trace Context from the HTTP upgrade request headers.
291 // The trace_id is used for correlating all audit entries during this WS session.
292 let trace_ctx = super::trace_propagation::extract_trace_context(&headers);
293 let ws_trace_id = trace_ctx
294 .trace_id
295 .clone()
296 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string().replace('-', ""));
297
298 tracing::info!(
299 session_id = %session_id,
300 trace_id = %ws_trace_id,
301 peer = %addr,
302 "WebSocket upgrade accepted"
303 );
304
305 // 4. Configure and upgrade
306 ws.max_message_size(ws_config.max_message_size)
307 .on_upgrade(move |socket| {
308 handle_ws_connection(socket, state, session_id, ws_config, addr, ws_trace_id)
309 })
310}
311
312/// Handle an established WebSocket connection.
313///
314/// Establishes an upstream WS connection, then relays messages bidirectionally
315/// with policy enforcement on client→upstream messages and DLP/injection
316/// scanning on upstream→client messages.
317async fn handle_ws_connection(
318 client_ws: WebSocket,
319 state: ProxyState,
320 session_id: String,
321 ws_config: WebSocketConfig,
322 peer_addr: SocketAddr,
323 trace_id: String,
324) {
325 record_ws_connection();
326 let start = std::time::Instant::now();
327 tracing::debug!(
328 session_id = %session_id,
329 trace_id = %trace_id,
330 "WebSocket connection established with trace context"
331 );
332
333 // Connect to upstream — use gateway default backend if configured
334 let upstream_url = if let Some(ref gw) = state.gateway {
335 match gw.route("") {
336 Some(d) => convert_to_ws_url(&d.upstream_url),
337 None => {
338 tracing::error!(session_id = %session_id, "No healthy upstream for WebSocket");
339 let (mut client_sink, _) = client_ws.split();
340 let _ = client_sink
341 .send(Message::Close(Some(CloseFrame {
342 code: CLOSE_POLICY_VIOLATION,
343 reason: "No healthy upstream available".into(),
344 })))
345 .await;
346 return;
347 }
348 }
349 } else {
350 convert_to_ws_url(&state.upstream_url)
351 };
352 let upstream_ws = match connect_upstream_ws(&upstream_url).await {
353 Ok(ws) => ws,
354 Err(e) => {
355 tracing::error!(
356 session_id = %session_id,
357 "Failed to connect to upstream WebSocket: {}",
358 e
359 );
360 // Send close frame to client
361 let (mut client_sink, _) = client_ws.split();
362 let _ = client_sink
363 .send(Message::Close(Some(CloseFrame {
364 code: CLOSE_POLICY_VIOLATION,
365 reason: "Upstream connection failed".into(),
366 })))
367 .await;
368 return;
369 }
370 };
371
372 let (client_sink, client_stream) = client_ws.split();
373 let (upstream_sink, upstream_stream) = upstream_ws.split();
374
375 // Wrap sinks in Arc<Mutex> for shared access
376 let client_sink = Arc::new(Mutex::new(client_sink));
377 let upstream_sink = Arc::new(Mutex::new(upstream_sink));
378
379 // Rate limiter state: track messages in the current second window
380 let rate_counter = Arc::new(AtomicU64::new(0));
381 let rate_window_start = Arc::new(std::sync::Mutex::new(std::time::Instant::now()));
382
383 // SECURITY (FIND-R46-WS-003): Separate rate limiter for upstream→client direction
384 let upstream_rate_counter = Arc::new(AtomicU64::new(0));
385 let upstream_rate_window_start = Arc::new(std::sync::Mutex::new(std::time::Instant::now()));
386
387 let idle_timeout = Duration::from_secs(ws_config.idle_timeout_secs);
388
389 // SECURITY (FIND-R182-001): Shared last-activity tracker so idle timeout resets
390 // on every message (true idle detection, not max-lifetime).
391 let last_activity = Arc::new(AtomicU64::new(0));
392 let connection_epoch = std::time::Instant::now();
393
394 // Client → Vellaveto → Upstream relay
395 let client_to_upstream = {
396 let state = state.clone();
397 let session_id = session_id.clone();
398 let client_sink = client_sink.clone();
399 let upstream_sink = upstream_sink.clone();
400 let rate_counter = rate_counter.clone();
401 let rate_window_start = rate_window_start.clone();
402 let ws_config = ws_config.clone();
403 let last_activity = last_activity.clone();
404
405 relay_client_to_upstream(
406 client_stream,
407 client_sink,
408 upstream_sink,
409 state,
410 session_id,
411 ws_config,
412 rate_counter,
413 rate_window_start,
414 last_activity,
415 connection_epoch,
416 )
417 };
418
419 // Upstream → Vellaveto → Client relay
420 let upstream_to_client = {
421 let state = state.clone();
422 let session_id = session_id.clone();
423 let client_sink = client_sink.clone();
424 let ws_config = ws_config.clone();
425 let last_activity = last_activity.clone();
426
427 relay_upstream_to_client(
428 upstream_stream,
429 client_sink,
430 state,
431 session_id,
432 ws_config,
433 upstream_rate_counter,
434 upstream_rate_window_start,
435 last_activity,
436 connection_epoch,
437 )
438 };
439
440 // SECURITY (FIND-R182-001): True idle timeout — check periodically and
441 // close only if no message activity since last check.
442 let idle_check = {
443 let session_id = session_id.clone();
444 let last_activity = last_activity.clone();
445 async move {
446 // Check every 10% of idle timeout (min 1s) for responsive detection.
447 let check_interval = Duration::from_secs((ws_config.idle_timeout_secs / 10).max(1));
448 let mut interval = tokio::time::interval(check_interval);
449 interval.tick().await; // first tick is immediate, skip it
450 loop {
451 interval.tick().await;
452 let last_ms = last_activity.load(Ordering::Relaxed);
453 // SECURITY (FIND-R190-002): Use saturating_sub to prevent underflow
454 // if Relaxed ordering causes a stale last_ms value.
455 let elapsed_since_activity =
456 (connection_epoch.elapsed().as_millis() as u64).saturating_sub(last_ms);
457 if elapsed_since_activity >= idle_timeout.as_millis() as u64 {
458 tracing::info!(
459 session_id = %session_id,
460 idle_secs = elapsed_since_activity / 1000,
461 "WebSocket idle timeout ({}s), closing",
462 ws_config.idle_timeout_secs
463 );
464 break;
465 }
466 }
467 }
468 };
469
470 // Run both relay loops with idle timeout
471 tokio::select! {
472 _ = client_to_upstream => {
473 tracing::debug!(session_id = %session_id, "Client stream ended");
474 }
475 _ = upstream_to_client => {
476 tracing::debug!(session_id = %session_id, "Upstream stream ended");
477 }
478 _ = idle_check => {}
479 }
480
481 // Clean shutdown: close both sides
482 {
483 let mut sink = client_sink.lock().await;
484 let _ = sink
485 .send(Message::Close(Some(CloseFrame {
486 code: CLOSE_NORMAL,
487 reason: "Session ended".into(),
488 })))
489 .await;
490 }
491 {
492 let mut sink = upstream_sink.lock().await;
493 let _ = sink.close().await;
494 }
495
496 let duration = start.elapsed();
497 metrics::histogram!("vellaveto_ws_connection_duration_seconds").record(duration.as_secs_f64());
498
499 tracing::info!(
500 session_id = %session_id,
501 peer = %peer_addr,
502 duration_secs = duration.as_secs(),
503 "WebSocket connection closed"
504 );
505}
506
507/// Relay messages from client to upstream with policy enforcement.
508#[allow(clippy::too_many_arguments)]
509#[allow(deprecated)] // evaluate_action_with_context: migration tracked in FIND-CREATIVE-005
510async fn relay_client_to_upstream(
511 mut client_stream: futures_util::stream::SplitStream<WebSocket>,
512 client_sink: Arc<Mutex<futures_util::stream::SplitSink<WebSocket, Message>>>,
513 upstream_sink: Arc<
514 Mutex<
515 futures_util::stream::SplitSink<
516 tokio_tungstenite::WebSocketStream<
517 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
518 >,
519 tokio_tungstenite::tungstenite::Message,
520 >,
521 >,
522 >,
523 state: ProxyState,
524 session_id: String,
525 ws_config: WebSocketConfig,
526 rate_counter: Arc<AtomicU64>,
527 rate_window_start: Arc<std::sync::Mutex<std::time::Instant>>,
528 last_activity: Arc<AtomicU64>,
529 connection_epoch: std::time::Instant,
530) {
531 while let Some(msg_result) = client_stream.next().await {
532 let msg = match msg_result {
533 Ok(m) => m,
534 Err(e) => {
535 tracing::debug!(session_id = %session_id, "Client WS error: {}", e);
536 break;
537 }
538 };
539
540 // SECURITY (FIND-R182-001): Update last-activity for true idle detection.
541 last_activity.store(
542 connection_epoch.elapsed().as_millis() as u64,
543 Ordering::Relaxed,
544 );
545
546 record_ws_message("client_to_upstream");
547
548 // SECURITY (FIND-R52-WS-003): Per-message OAuth token expiry check.
549 // After WebSocket upgrade, the HTTP auth middleware no longer runs.
550 // A token that expires mid-connection must be rejected to prevent
551 // indefinite access via a long-lived WebSocket.
552 {
553 let token_expired = state
554 .sessions
555 .get_mut(&session_id)
556 .and_then(|s| {
557 s.token_expires_at.map(|exp| {
558 let now = std::time::SystemTime::now()
559 .duration_since(std::time::UNIX_EPOCH)
560 .unwrap_or_default()
561 .as_secs();
562 now >= exp
563 })
564 })
565 .unwrap_or(false);
566 if token_expired {
567 tracing::warn!(
568 session_id = %session_id,
569 "SECURITY: OAuth token expired during WebSocket session, closing"
570 );
571 let error = json!({
572 "jsonrpc": "2.0",
573 "error": {
574 "code": -32001,
575 "message": "Session expired"
576 },
577 "id": null
578 });
579 let error_text = serde_json::to_string(&error)
580 .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","error":{"code":-32001,"message":"Session expired"},"id":null}"#.to_string());
581 let mut sink = client_sink.lock().await;
582 let _ = sink.send(Message::Text(error_text.into())).await;
583 let _ = sink
584 .send(Message::Close(Some(CloseFrame {
585 code: CLOSE_POLICY_VIOLATION,
586 reason: "Token expired".into(),
587 })))
588 .await;
589 break;
590 }
591 }
592
593 match msg {
594 Message::Text(text) => {
595 // Rate limiting
596 if !check_rate_limit(
597 &rate_counter,
598 &rate_window_start,
599 ws_config.message_rate_limit,
600 ) {
601 tracing::warn!(
602 session_id = %session_id,
603 "WebSocket rate limit exceeded, closing"
604 );
605 let mut sink = client_sink.lock().await;
606 let _ = sink
607 .send(Message::Close(Some(CloseFrame {
608 code: CLOSE_POLICY_VIOLATION,
609 reason: "Rate limit exceeded".into(),
610 })))
611 .await;
612 break;
613 }
614
615 // SECURITY (FIND-R46-005): Reject JSON with duplicate keys before parsing.
616 // Prevents parser-disagreement attacks (CVE-2017-12635, CVE-2020-16250)
617 // where the proxy evaluates one key value but upstream sees another.
618 if let Some(dup_key) = vellaveto_mcp::framing::find_duplicate_json_key(&text) {
619 tracing::warn!(
620 session_id = %session_id,
621 "SECURITY: Rejected WS message with duplicate key: \"{}\"",
622 dup_key
623 );
624 let mut sink = client_sink.lock().await;
625 let _ = sink
626 .send(Message::Close(Some(CloseFrame {
627 code: CLOSE_POLICY_VIOLATION,
628 reason: "Duplicate JSON key detected".into(),
629 })))
630 .await;
631 break;
632 }
633
634 // SECURITY (FIND-R53-WS-004): Reject WS messages with control characters.
635 // Parity with HTTP GET event_id validation (handlers.rs:2899).
636 // Control chars in JSON-RPC messages can be used for log injection
637 // or to bypass string-based security checks.
638 if text.chars().any(|c| {
639 // Allow standard JSON whitespace (\t, \n, \r) but reject other
640 // ASCII control chars and Unicode format chars (FIND-R54-011).
641 (c.is_control() && c != '\n' && c != '\r' && c != '\t')
642 || is_unicode_format_char_ws(c)
643 }) {
644 tracing::warn!(
645 session_id = %session_id,
646 "SECURITY: Rejected WS message with control characters"
647 );
648 let error =
649 make_ws_error_response(None, -32600, "Message contains control characters");
650 let mut sink = client_sink.lock().await;
651 let _ = sink.send(Message::Text(error.into())).await;
652 continue;
653 }
654
655 // Parse JSON
656 let parsed: Value = match serde_json::from_str(&text) {
657 Ok(v) => v,
658 Err(_) => {
659 tracing::warn!(
660 session_id = %session_id,
661 "Unparseable JSON in WebSocket text frame, closing (fail-closed)"
662 );
663 let mut sink = client_sink.lock().await;
664 let _ = sink
665 .send(Message::Close(Some(CloseFrame {
666 code: CLOSE_POLICY_VIOLATION,
667 reason: "Invalid JSON".into(),
668 })))
669 .await;
670 break;
671 }
672 };
673
674 // SECURITY (FIND-R46-WS-001): Injection scanning on client→upstream text frames.
675 // The HTTP proxy scans request bodies for injection; the WebSocket proxy must
676 // do the same to maintain security parity. Fail-closed: if injection is detected
677 // and blocking is enabled, deny the message.
678 if !state.injection_disabled {
679 let scannable = extract_scannable_text_from_request(&parsed);
680 if !scannable.is_empty() {
681 let injection_matches: Vec<String> =
682 if let Some(ref scanner) = state.injection_scanner {
683 scanner
684 .inspect(&scannable)
685 .into_iter()
686 .map(|s| s.to_string())
687 .collect()
688 } else {
689 inspect_for_injection(&scannable)
690 .into_iter()
691 .map(|s| s.to_string())
692 .collect()
693 };
694
695 if !injection_matches.is_empty() {
696 tracing::warn!(
697 "SECURITY: Injection in WS client request! Session: {}, Patterns: {:?}",
698 session_id,
699 injection_matches,
700 );
701
702 let verdict = if state.injection_blocking {
703 Verdict::Deny {
704 reason: format!(
705 "WS request injection blocked: {:?}",
706 injection_matches
707 ),
708 }
709 } else {
710 Verdict::Allow
711 };
712
713 let action = Action::new(
714 "vellaveto",
715 "ws_request_injection",
716 json!({
717 "matched_patterns": injection_matches,
718 "session": session_id,
719 "transport": "websocket",
720 "direction": "client_to_upstream",
721 }),
722 );
723 if let Err(e) = state
724 .audit
725 .log_entry(
726 &action,
727 &verdict,
728 json!({
729 "source": "ws_proxy",
730 "event": "ws_request_injection_detected",
731 }),
732 )
733 .await
734 {
735 tracing::warn!("Failed to audit WS request injection: {}", e);
736 }
737
738 if state.injection_blocking {
739 let id = parsed.get("id");
740 let error = make_ws_error_response(
741 id,
742 -32001,
743 "Request blocked: injection detected",
744 );
745 let mut sink = client_sink.lock().await;
746 let _ = sink.send(Message::Text(error.into())).await;
747 continue;
748 }
749 }
750 }
751 }
752
753 // Classify and evaluate
754 let classified = extractor::classify_message(&parsed);
755 match classified {
756 MessageType::ToolCall {
757 ref id,
758 ref tool_name,
759 ref arguments,
760 } => {
761 // SECURITY (FIND-R46-009): Strict tool name validation (MCP 2025-11-25).
762 // When enabled, reject tool names that don't conform to the spec format.
763 if state.streamable_http.strict_tool_name_validation {
764 if let Err(e) = vellaveto_types::validate_mcp_tool_name(tool_name) {
765 tracing::warn!(
766 session_id = %session_id,
767 "SECURITY: Rejecting invalid WS tool name '{}': {}",
768 tool_name,
769 e
770 );
771 let error =
772 make_ws_error_response(Some(id), -32602, "Invalid tool name");
773 let mut sink = client_sink.lock().await;
774 let _ = sink.send(Message::Text(error.into())).await;
775 continue;
776 }
777 }
778
779 let mut action = extractor::extract_action(tool_name, arguments);
780
781 // SECURITY (FIND-R75-002): DNS resolution for IP-based policy evaluation.
782 // Parity with HTTP handler (handlers.rs:717). Without this, policies
783 // using ip_rules are completely bypassed on the WebSocket transport.
784 if state.engine.has_ip_rules() {
785 super::helpers::resolve_domains(&mut action).await;
786 }
787
788 // SECURITY (FIND-R46-006): Call chain validation and privilege escalation check.
789 // Extract X-Upstream-Agents from the initial WS upgrade headers stored in session.
790 // For WebSocket, we sync the call chain once during upgrade and reuse it.
791 let upstream_chain = {
792 let session_ref = state.sessions.get_mut(&session_id);
793 session_ref
794 .map(|s| s.current_call_chain.clone())
795 .unwrap_or_default()
796 };
797 let current_agent_id = {
798 let session_ref = state.sessions.get_mut(&session_id);
799 session_ref.and_then(|s| s.oauth_subject.clone())
800 };
801
802 // SECURITY (FIND-R46-006): Privilege escalation detection.
803 if !upstream_chain.is_empty() {
804 let priv_check = check_privilege_escalation(
805 &state.engine,
806 &state.policies,
807 &action,
808 &upstream_chain,
809 current_agent_id.as_deref(),
810 );
811 if priv_check.escalation_detected {
812 let verdict = Verdict::Deny {
813 reason: format!(
814 "Privilege escalation: agent '{}' would be denied",
815 priv_check
816 .escalating_from_agent
817 .as_deref()
818 .unwrap_or("unknown")
819 ),
820 };
821 if let Err(e) = state
822 .audit
823 .log_entry(
824 &action,
825 &verdict,
826 json!({
827 "source": "ws_proxy",
828 "session": session_id,
829 "transport": "websocket",
830 "event": "privilege_escalation_blocked",
831 "escalating_from_agent": priv_check.escalating_from_agent,
832 "upstream_deny_reason": priv_check.upstream_deny_reason,
833 }),
834 )
835 .await
836 {
837 tracing::warn!(
838 "Failed to audit WS privilege escalation: {}",
839 e
840 );
841 }
842 let error =
843 make_ws_error_response(Some(id), -32001, "Denied by policy");
844 let mut sink = client_sink.lock().await;
845 let _ = sink.send(Message::Text(error.into())).await;
846 continue;
847 }
848 }
849
850 // SECURITY (FIND-R46-007): Rug-pull detection.
851 // Block calls to tools whose annotations changed since initial tools/list.
852 let is_flagged = state
853 .sessions
854 .get_mut(&session_id)
855 .map(|s| s.flagged_tools.contains(tool_name))
856 .unwrap_or(false);
857 if is_flagged {
858 let verdict = Verdict::Deny {
859 reason: format!(
860 "Tool '{}' blocked: annotations changed (rug-pull detected)",
861 tool_name
862 ),
863 };
864 if let Err(e) = state
865 .audit
866 .log_entry(
867 &action,
868 &verdict,
869 json!({
870 "source": "ws_proxy",
871 "session": session_id,
872 "transport": "websocket",
873 "event": "rug_pull_tool_blocked",
874 "tool": tool_name,
875 }),
876 )
877 .await
878 {
879 tracing::warn!("Failed to audit WS rug-pull block: {}", e);
880 }
881 let error =
882 make_ws_error_response(Some(id), -32001, "Denied by policy");
883 let mut sink = client_sink.lock().await;
884 let _ = sink.send(Message::Text(error.into())).await;
885 continue;
886 }
887
888 // SECURITY (FIND-R52-WS-001): DLP scan parameters for secret exfiltration.
889 // Matches HTTP handler's DLP check to maintain security parity.
890 {
891 let dlp_findings = scan_parameters_for_secrets(arguments);
892 // SECURITY (FIND-R55-WS-001): DLP on request params always blocks,
893 // matching HTTP handler. Previously gated on injection_blocking flag.
894 if !dlp_findings.is_empty() {
895 for finding in &dlp_findings {
896 record_dlp_finding(&finding.pattern_name);
897 }
898 let patterns: Vec<String> = dlp_findings
899 .iter()
900 .map(|f| format!("{} at {}", f.pattern_name, f.location))
901 .collect();
902 let audit_reason = format!(
903 "DLP: secrets detected in tool parameters: {:?}",
904 patterns
905 );
906 tracing::warn!(
907 "SECURITY: DLP blocking WS tool '{}' in session {}: {}",
908 tool_name,
909 session_id,
910 audit_reason
911 );
912 let dlp_action = extractor::extract_action(tool_name, arguments);
913 if let Err(e) = state
914 .audit
915 .log_entry(
916 &dlp_action,
917 &Verdict::Deny {
918 reason: audit_reason,
919 },
920 json!({
921 "source": "ws_proxy",
922 "session": session_id,
923 "transport": "websocket",
924 "event": "dlp_secret_blocked",
925 "tool": tool_name,
926 "findings": patterns,
927 }),
928 )
929 .await
930 {
931 tracing::warn!("Failed to audit WS DLP finding: {}", e);
932 }
933 let error = make_ws_error_response(
934 Some(id),
935 -32001,
936 "Request blocked: security policy violation",
937 );
938 let mut sink = client_sink.lock().await;
939 let _ = sink.send(Message::Text(error.into())).await;
940 continue;
941 }
942 }
943
944 // SECURITY (FIND-R52-WS-002): Memory poisoning detection.
945 // Check if tool call parameters contain replayed response data,
946 // matching the HTTP handler's memory poisoning check.
947 {
948 let poisoning_detected = state
949 .sessions
950 .get_mut(&session_id)
951 .and_then(|session| {
952 let matches =
953 session.memory_tracker.check_parameters(arguments);
954 if !matches.is_empty() {
955 for m in &matches {
956 tracing::warn!(
957 "SECURITY: Memory poisoning detected in WS tool '{}' (session {}): \
958 param '{}' contains replayed data (fingerprint: {})",
959 tool_name,
960 session_id,
961 m.param_location,
962 m.fingerprint
963 );
964 }
965 Some(matches.len())
966 } else {
967 None
968 }
969 });
970 if let Some(match_count) = poisoning_detected {
971 let poison_action = extractor::extract_action(tool_name, arguments);
972 let deny_reason = format!(
973 "Memory poisoning detected: {} replayed data fragment(s) in tool '{}'",
974 match_count, tool_name
975 );
976 if let Err(e) = state
977 .audit
978 .log_entry(
979 &poison_action,
980 &Verdict::Deny {
981 reason: deny_reason,
982 },
983 json!({
984 "source": "ws_proxy",
985 "session": session_id,
986 "transport": "websocket",
987 "event": "memory_poisoning_detected",
988 "matches": match_count,
989 "tool": tool_name,
990 }),
991 )
992 .await
993 {
994 tracing::warn!("Failed to audit WS memory poisoning: {}", e);
995 }
996 let error = make_ws_error_response(
997 Some(id),
998 -32001,
999 "Request blocked: security policy violation",
1000 );
1001 let mut sink = client_sink.lock().await;
1002 let _ = sink.send(Message::Text(error.into())).await;
1003 continue;
1004 }
1005 }
1006
1007 // SECURITY (FIND-R46-008): Circuit breaker check.
1008 // If the circuit is open for this tool, reject immediately.
1009 if let Some(ref circuit_breaker) = state.circuit_breaker {
1010 if let Err(reason) = circuit_breaker.can_proceed(tool_name) {
1011 tracing::warn!(
1012 session_id = %session_id,
1013 "SECURITY: WS circuit breaker open for tool '{}': {}",
1014 tool_name,
1015 reason
1016 );
1017 let verdict = Verdict::Deny {
1018 reason: format!("Circuit breaker open: {}", reason),
1019 };
1020 if let Err(e) = state
1021 .audit
1022 .log_entry(
1023 &action,
1024 &verdict,
1025 json!({
1026 "source": "ws_proxy",
1027 "session": session_id,
1028 "transport": "websocket",
1029 "event": "circuit_breaker_rejected",
1030 "tool": tool_name,
1031 }),
1032 )
1033 .await
1034 {
1035 tracing::warn!(
1036 "Failed to audit WS circuit breaker rejection: {}",
1037 e
1038 );
1039 }
1040 let error = make_ws_error_response(
1041 Some(id),
1042 -32001,
1043 "Service temporarily unavailable",
1044 );
1045 let mut sink = client_sink.lock().await;
1046 let _ = sink.send(Message::Text(error.into())).await;
1047 continue;
1048 }
1049 }
1050
1051 // SECURITY (FIND-R46-013): Tool registry trust check.
1052 // If tool registry is configured, check trust level before evaluation.
1053 if let Some(ref registry) = state.tool_registry {
1054 let trust = registry.check_trust_level(tool_name).await;
1055 match trust {
1056 vellaveto_mcp::tool_registry::TrustLevel::Unknown => {
1057 registry.register_unknown(tool_name).await;
1058 let verdict = Verdict::Deny {
1059 reason: "Unknown tool requires approval".to_string(),
1060 };
1061 if let Err(e) = state
1062 .audit
1063 .log_entry(
1064 &action,
1065 &verdict,
1066 json!({
1067 "source": "ws_proxy",
1068 "session": session_id,
1069 "transport": "websocket",
1070 "registry": "unknown_tool",
1071 "tool": tool_name,
1072 }),
1073 )
1074 .await
1075 {
1076 tracing::warn!("Failed to audit WS unknown tool: {}", e);
1077 }
1078 let approval_reason = "Approval required";
1079 let approval_id = create_ws_approval(
1080 &state,
1081 &session_id,
1082 &action,
1083 approval_reason,
1084 )
1085 .await;
1086 let error = make_ws_error_response_with_data(
1087 Some(id),
1088 -32001,
1089 approval_reason,
1090 Some(json!({
1091 "verdict": "require_approval",
1092 "reason": approval_reason,
1093 "approval_id": approval_id,
1094 })),
1095 );
1096 let mut sink = client_sink.lock().await;
1097 let _ = sink.send(Message::Text(error.into())).await;
1098 continue;
1099 }
1100 vellaveto_mcp::tool_registry::TrustLevel::Untrusted {
1101 score: _,
1102 } => {
1103 let verdict = Verdict::Deny {
1104 reason: "Untrusted tool requires approval".to_string(),
1105 };
1106 if let Err(e) = state
1107 .audit
1108 .log_entry(
1109 &action,
1110 &verdict,
1111 json!({
1112 "source": "ws_proxy",
1113 "session": session_id,
1114 "transport": "websocket",
1115 "registry": "untrusted_tool",
1116 "tool": tool_name,
1117 }),
1118 )
1119 .await
1120 {
1121 tracing::warn!("Failed to audit WS untrusted tool: {}", e);
1122 }
1123 let approval_reason = "Approval required";
1124 let approval_id = create_ws_approval(
1125 &state,
1126 &session_id,
1127 &action,
1128 approval_reason,
1129 )
1130 .await;
1131 let error = make_ws_error_response_with_data(
1132 Some(id),
1133 -32001,
1134 approval_reason,
1135 Some(json!({
1136 "verdict": "require_approval",
1137 "reason": approval_reason,
1138 "approval_id": approval_id,
1139 })),
1140 );
1141 let mut sink = client_sink.lock().await;
1142 let _ = sink.send(Message::Text(error.into())).await;
1143 continue;
1144 }
1145 vellaveto_mcp::tool_registry::TrustLevel::Trusted => {
1146 // Trusted — proceed to engine evaluation
1147 }
1148 }
1149 }
1150
1151 // SECURITY (FIND-R130-002): Combine context read, evaluation,
1152 // and session update into a single block holding the DashMap
1153 // shard lock. Without this, concurrent WS connections sharing
1154 // a session can bypass max_calls_in_window by racing: both
1155 // clone the same stale call_counts, both pass evaluation, both
1156 // increment. Matches HTTP handler R19-TOCTOU pattern
1157 // (handlers.rs:725-789).
1158 let (verdict, ctx) = if let Some(mut session) =
1159 state.sessions.get_mut(&session_id)
1160 {
1161 let ctx = EvaluationContext {
1162 timestamp: None,
1163 agent_id: session.oauth_subject.clone(),
1164 agent_identity: session.agent_identity.clone(),
1165 call_counts: session.call_counts.clone(),
1166 previous_actions: session.action_history.iter().cloned().collect(),
1167 call_chain: session.current_call_chain.clone(),
1168 tenant_id: None,
1169 verification_tier: None,
1170 capability_token: None,
1171 session_state: None,
1172 };
1173
1174 let verdict = match state.engine.evaluate_action_with_context(
1175 &action,
1176 &state.policies,
1177 Some(&ctx),
1178 ) {
1179 Ok(v) => v,
1180 Err(e) => {
1181 tracing::error!(
1182 session_id = %session_id,
1183 "Policy evaluation error: {}",
1184 e
1185 );
1186 Verdict::Deny {
1187 reason: format!("Policy evaluation failed: {}", e),
1188 }
1189 }
1190 };
1191
1192 // Atomically update session on Allow while still holding
1193 // the shard lock — prevents TOCTOU bypass of call limits.
1194 if matches!(verdict, Verdict::Allow) {
1195 session.touch();
1196 use crate::proxy::call_chain::{
1197 MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
1198 };
1199 if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
1200 || session.call_counts.contains_key(tool_name)
1201 {
1202 let count = session
1203 .call_counts
1204 .entry(tool_name.to_string())
1205 .or_insert(0);
1206 *count = count.saturating_add(1);
1207 }
1208 if session.action_history.len() >= MAX_ACTION_HISTORY {
1209 session.action_history.pop_front();
1210 }
1211 session.action_history.push_back(tool_name.to_string());
1212 }
1213
1214 (verdict, ctx)
1215 } else {
1216 // No session — evaluate without context (fail-closed)
1217 let verdict = match state.engine.evaluate_action_with_context(
1218 &action,
1219 &state.policies,
1220 None,
1221 ) {
1222 Ok(v) => v,
1223 Err(e) => {
1224 tracing::error!(
1225 session_id = %session_id,
1226 "Policy evaluation error: {}",
1227 e
1228 );
1229 Verdict::Deny {
1230 reason: format!("Policy evaluation failed: {}", e),
1231 }
1232 }
1233 };
1234 (verdict, EvaluationContext::default())
1235 };
1236
1237 match verdict {
1238 Verdict::Allow => {
1239 // Phase 21: ABAC refinement — only runs when ABAC engine is configured
1240 if let Some(ref abac) = state.abac_engine {
1241 let principal_id =
1242 ctx.agent_id.as_deref().unwrap_or("anonymous");
1243 let principal_type = ctx.principal_type();
1244 let session_risk = state
1245 .sessions
1246 .get_mut(&session_id)
1247 .and_then(|s| s.risk_score.clone());
1248 let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
1249 eval_ctx: &ctx,
1250 principal_type,
1251 principal_id,
1252 risk_score: session_risk.as_ref(),
1253 };
1254 match abac.evaluate(&action, &abac_ctx) {
1255 vellaveto_engine::abac::AbacDecision::Deny {
1256 policy_id,
1257 reason,
1258 } => {
1259 let deny_verdict = Verdict::Deny {
1260 reason: reason.clone(),
1261 };
1262 if let Err(e) = state
1263 .audit
1264 .log_entry(
1265 &action,
1266 &deny_verdict,
1267 json!({
1268 "source": "ws_proxy",
1269 "session": session_id,
1270 "transport": "websocket",
1271 "event": "abac_deny",
1272 "abac_policy": policy_id,
1273 }),
1274 )
1275 .await
1276 {
1277 tracing::warn!(
1278 "Failed to audit WS ABAC deny: {}",
1279 e
1280 );
1281 }
1282 // SECURITY (FIND-R46-012): Generic message to client;
1283 // detailed reason (ABAC policy_id, reason) is in
1284 // the audit log only.
1285 let error_resp = make_ws_error_response(
1286 Some(id),
1287 -32001,
1288 "Denied by policy",
1289 );
1290 let mut sink = client_sink.lock().await;
1291 let _ =
1292 sink.send(Message::Text(error_resp.into())).await;
1293 continue;
1294 }
1295 vellaveto_engine::abac::AbacDecision::Allow {
1296 policy_id,
1297 } => {
1298 if let Some(ref la) = state.least_agency {
1299 la.record_usage(
1300 principal_id,
1301 &session_id,
1302 &policy_id,
1303 tool_name,
1304 &action.function,
1305 );
1306 }
1307 }
1308 vellaveto_engine::abac::AbacDecision::NoMatch => {
1309 // Fall through — existing Allow stands
1310 }
1311 #[allow(unreachable_patterns)]
1312 // AbacDecision is #[non_exhaustive]
1313 _ => {
1314 // SECURITY (FIND-R74-002): Future variants — fail-closed (deny).
1315 // Must send deny and continue, not fall through to Allow path.
1316 tracing::warn!(
1317 "Unknown AbacDecision variant — fail-closed"
1318 );
1319 let error_resp = make_ws_error_response(
1320 Some(id),
1321 -32001,
1322 "Denied by policy",
1323 );
1324 let mut sink = client_sink.lock().await;
1325 let _ =
1326 sink.send(Message::Text(error_resp.into())).await;
1327 continue;
1328 }
1329 }
1330 }
1331
1332 // SECURITY (FIND-R46-013): Record tool call in registry on Allow
1333 if let Some(ref registry) = state.tool_registry {
1334 registry.record_call(tool_name).await;
1335 }
1336
1337 // NOTE: Session touch + call_counts/action_history
1338 // update already performed inside the TOCTOU-safe
1339 // block above (FIND-R130-002). No separate update here.
1340
1341 // Audit the allow
1342 if let Err(e) = state
1343 .audit
1344 .log_entry(
1345 &action,
1346 &Verdict::Allow,
1347 json!({
1348 "source": "ws_proxy",
1349 "session": session_id,
1350 "transport": "websocket",
1351 }),
1352 )
1353 .await
1354 {
1355 tracing::error!(
1356 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1357 e
1358 );
1359 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1360 // No unaudited security decisions can occur.
1361 if state.audit_strict_mode {
1362 let error = make_ws_error_response(
1363 Some(id),
1364 -32000,
1365 "Audit logging failed — request denied (strict audit mode)",
1366 );
1367 let mut sink = client_sink.lock().await;
1368 let _ = sink.send(Message::Text(error.into())).await;
1369 continue;
1370 }
1371 }
1372
1373 // Canonicalize and forward
1374 let forward_text = if state.canonicalize {
1375 match serde_json::to_string(&parsed) {
1376 Ok(canonical) => canonical,
1377 Err(e) => {
1378 tracing::error!(
1379 "SECURITY: WS canonicalization failed: {}",
1380 e
1381 );
1382 let error_resp = make_ws_error_response(
1383 Some(id),
1384 -32603,
1385 "Internal error",
1386 );
1387 let mut sink = client_sink.lock().await;
1388 let _ =
1389 sink.send(Message::Text(error_resp.into())).await;
1390 continue;
1391 }
1392 }
1393 } else {
1394 text.to_string()
1395 };
1396
1397 // Track request→response mapping for output-schema
1398 // enforcement when upstream omits result._meta.tool.
1399 track_pending_tool_call(
1400 &state.sessions,
1401 &session_id,
1402 id,
1403 tool_name,
1404 );
1405
1406 let mut sink = upstream_sink.lock().await;
1407 if let Err(e) = sink
1408 .send(tokio_tungstenite::tungstenite::Message::Text(
1409 forward_text.into(),
1410 ))
1411 .await
1412 {
1413 tracing::error!(
1414 session_id = %session_id,
1415 "Failed to forward to upstream: {}",
1416 e
1417 );
1418 break;
1419 }
1420 }
1421 Verdict::Deny { ref reason } => {
1422 // Audit the denial with detailed reason
1423 if let Err(e) = state
1424 .audit
1425 .log_entry(
1426 &action,
1427 &verdict,
1428 json!({
1429 "source": "ws_proxy",
1430 "session": session_id,
1431 "transport": "websocket",
1432 }),
1433 )
1434 .await
1435 {
1436 tracing::error!(
1437 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1438 e
1439 );
1440 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1441 if state.audit_strict_mode {
1442 let error = make_ws_error_response(
1443 Some(id),
1444 -32000,
1445 "Audit logging failed — request denied (strict audit mode)",
1446 );
1447 let mut sink = client_sink.lock().await;
1448 let _ = sink.send(Message::Text(error.into())).await;
1449 continue;
1450 }
1451 }
1452
1453 // SECURITY (FIND-R46-012): Generic message to client.
1454 // Detailed reason is in the audit log only.
1455 let _ = reason; // used in audit above
1456 let error =
1457 make_ws_error_response(Some(id), -32001, "Denied by policy");
1458 let mut sink = client_sink.lock().await;
1459 let _ = sink.send(Message::Text(error.into())).await;
1460 }
1461 Verdict::RequireApproval { ref reason, .. } => {
1462 // Treat as deny for audit, but preserve approval semantics.
1463 let deny_reason = format!("Requires approval: {}", reason);
1464 if let Err(e) = state
1465 .audit
1466 .log_entry(
1467 &action,
1468 &Verdict::Deny {
1469 reason: deny_reason.clone(),
1470 },
1471 json!({
1472 "source": "ws_proxy",
1473 "session": session_id,
1474 "transport": "websocket",
1475 }),
1476 )
1477 .await
1478 {
1479 tracing::error!(
1480 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1481 e
1482 );
1483 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1484 if state.audit_strict_mode {
1485 let error = make_ws_error_response(
1486 Some(id),
1487 -32000,
1488 "Audit logging failed — request denied (strict audit mode)",
1489 );
1490 let mut sink = client_sink.lock().await;
1491 let _ = sink.send(Message::Text(error.into())).await;
1492 continue;
1493 }
1494 }
1495 let approval_reason = "Approval required";
1496 let approval_id =
1497 create_ws_approval(&state, &session_id, &action, reason).await;
1498 let error = make_ws_error_response_with_data(
1499 Some(id),
1500 -32001,
1501 approval_reason,
1502 Some(json!({
1503 "verdict": "require_approval",
1504 "reason": approval_reason,
1505 "approval_id": approval_id,
1506 })),
1507 );
1508 let mut sink = client_sink.lock().await;
1509 let _ = sink.send(Message::Text(error.into())).await;
1510 }
1511 // Fail-closed: unknown Verdict variants produce Deny
1512 _ => {
1513 let error =
1514 make_ws_error_response(Some(id), -32001, "Denied by policy");
1515 let mut sink = client_sink.lock().await;
1516 let _ = sink.send(Message::Text(error.into())).await;
1517 }
1518 }
1519 }
1520 MessageType::ResourceRead { ref id, ref uri } => {
1521 // SECURITY (FIND-R74-007): Check for memory poisoning in resource URI.
1522 // ResourceRead is a likely exfiltration vector: a poisoned tool response
1523 // says "read this file" and the agent issues resources/read for that URI.
1524 // Parity with HTTP handler (handlers.rs:1472).
1525 {
1526 let poisoning_detected = state
1527 .sessions
1528 .get_mut(&session_id)
1529 .and_then(|session| {
1530 let uri_params = json!({"uri": uri});
1531 let matches =
1532 session.memory_tracker.check_parameters(&uri_params);
1533 if !matches.is_empty() {
1534 for m in &matches {
1535 tracing::warn!(
1536 "SECURITY: Memory poisoning detected in WS resources/read (session {}): \
1537 param '{}' contains replayed data (fingerprint: {})",
1538 session_id,
1539 m.param_location,
1540 m.fingerprint
1541 );
1542 }
1543 Some(matches.len())
1544 } else {
1545 None
1546 }
1547 });
1548 if let Some(match_count) = poisoning_detected {
1549 let poison_action = extractor::extract_resource_action(uri);
1550 let deny_reason = format!(
1551 "Memory poisoning detected: {} replayed data fragment(s) in resources/read",
1552 match_count
1553 );
1554 if let Err(e) = state
1555 .audit
1556 .log_entry(
1557 &poison_action,
1558 &Verdict::Deny {
1559 reason: deny_reason.clone(),
1560 },
1561 json!({
1562 "source": "ws_proxy",
1563 "session": session_id,
1564 "transport": "websocket",
1565 "event": "memory_poisoning_detected",
1566 "matches": match_count,
1567 "uri": uri,
1568 }),
1569 )
1570 .await
1571 {
1572 tracing::warn!(
1573 "Failed to audit WS resource memory poisoning: {}",
1574 e
1575 );
1576 }
1577 let error = make_ws_error_response(
1578 Some(id),
1579 -32001,
1580 "Request blocked: security policy violation",
1581 );
1582 let mut sink = client_sink.lock().await;
1583 let _ = sink.send(Message::Text(error.into())).await;
1584 continue;
1585 }
1586 }
1587
1588 // SECURITY (FIND-R115-041): Rug-pull detection for resource URIs.
1589 // If the upstream server was flagged (annotations changed since initial
1590 // tools/list), block resource reads from that server.
1591 // Parity with HTTP handler (handlers.rs:1555).
1592 {
1593 let is_flagged = state
1594 .sessions
1595 .get_mut(&session_id)
1596 .map(|s| s.flagged_tools.contains(uri.as_str()))
1597 .unwrap_or(false);
1598 if is_flagged {
1599 let action = extractor::extract_resource_action(uri);
1600 let verdict = Verdict::Deny {
1601 reason: format!(
1602 "Resource '{}' blocked: server flagged by rug-pull detection",
1603 uri
1604 ),
1605 };
1606 if let Err(e) = state
1607 .audit
1608 .log_entry(
1609 &action,
1610 &verdict,
1611 json!({
1612 "source": "ws_proxy",
1613 "session": session_id,
1614 "transport": "websocket",
1615 "event": "rug_pull_resource_blocked",
1616 "uri": uri,
1617 }),
1618 )
1619 .await
1620 {
1621 tracing::warn!(
1622 "Failed to audit WS resource rug-pull block: {}",
1623 e
1624 );
1625 }
1626 let error =
1627 make_ws_error_response(Some(id), -32001, "Denied by policy");
1628 let mut sink = client_sink.lock().await;
1629 let _ = sink.send(Message::Text(error.into())).await;
1630 continue;
1631 }
1632 }
1633
1634 // Build action for resource read
1635 let mut action = extractor::extract_resource_action(uri);
1636
1637 // SECURITY (FIND-R75-002): DNS resolution for resource reads.
1638 // Parity with HTTP handler (handlers.rs:1543).
1639 if state.engine.has_ip_rules() {
1640 super::helpers::resolve_domains(&mut action).await;
1641 }
1642
1643 // SECURITY (FIND-R116-004): DLP scan on resource URI.
1644 // Parity with HTTP handler (handlers.rs:1598).
1645 {
1646 let uri_params = json!({"uri": uri});
1647 let dlp_findings = scan_parameters_for_secrets(&uri_params);
1648 if !dlp_findings.is_empty() {
1649 for finding in &dlp_findings {
1650 record_dlp_finding(&finding.pattern_name);
1651 }
1652 tracing::warn!(
1653 "SECURITY: Secret detected in WS resource URI! Session: {}, URI: [redacted]",
1654 session_id,
1655 );
1656 let audit_verdict = Verdict::Deny {
1657 reason: "DLP blocked: secret detected in resource URI"
1658 .to_string(),
1659 };
1660 if let Err(e) = state.audit.log_entry(
1661 &action, &audit_verdict,
1662 json!({
1663 "source": "ws_proxy", "session": session_id,
1664 "transport": "websocket", "event": "resource_uri_dlp_alert",
1665 }),
1666 ).await {
1667 tracing::warn!("Failed to audit WS resource URI DLP: {}", e);
1668 }
1669 let error = make_ws_error_response(
1670 Some(id),
1671 -32001,
1672 "Request blocked: security policy violation",
1673 );
1674 let mut sink = client_sink.lock().await;
1675 let _ = sink.send(Message::Text(error.into())).await;
1676 continue;
1677 }
1678 }
1679
1680 // SECURITY (FIND-R115-042): Circuit breaker check for resource reads.
1681 // Parity with HTTP handler (handlers.rs:1668) — prevent resource reads
1682 // from hammering a failing upstream server.
1683 if let Some(ref circuit_breaker) = state.circuit_breaker {
1684 if let Err(reason) = circuit_breaker.can_proceed(uri) {
1685 tracing::warn!(
1686 "SECURITY: WS circuit breaker open for resource '{}' in session {}: {}",
1687 uri,
1688 session_id,
1689 reason
1690 );
1691 let verdict = Verdict::Deny {
1692 reason: format!("Circuit breaker open: {}", reason),
1693 };
1694 if let Err(e) = state
1695 .audit
1696 .log_entry(
1697 &action,
1698 &verdict,
1699 json!({
1700 "source": "ws_proxy",
1701 "session": session_id,
1702 "transport": "websocket",
1703 "event": "circuit_breaker_rejected",
1704 "uri": uri,
1705 }),
1706 )
1707 .await
1708 {
1709 tracing::warn!(
1710 "Failed to audit WS resource circuit breaker rejection: {}",
1711 e
1712 );
1713 }
1714 let error = make_ws_error_response(
1715 Some(id),
1716 -32001,
1717 "Service temporarily unavailable",
1718 );
1719 let mut sink = client_sink.lock().await;
1720 let _ = sink.send(Message::Text(error.into())).await;
1721 continue;
1722 }
1723 }
1724
1725 // SECURITY (FIND-R130-002): TOCTOU-safe context+eval+update
1726 // for resource reads. Matches ToolCall fix above and HTTP
1727 // handler FIND-R112-002 pattern (handlers.rs:1711-1774).
1728 let (verdict, ctx) = if let Some(mut session) =
1729 state.sessions.get_mut(&session_id)
1730 {
1731 let ctx = EvaluationContext {
1732 timestamp: None,
1733 agent_id: session.oauth_subject.clone(),
1734 agent_identity: session.agent_identity.clone(),
1735 call_counts: session.call_counts.clone(),
1736 previous_actions: session.action_history.iter().cloned().collect(),
1737 call_chain: session.current_call_chain.clone(),
1738 tenant_id: None,
1739 verification_tier: None,
1740 capability_token: None,
1741 session_state: None,
1742 };
1743
1744 let verdict = match state.engine.evaluate_action_with_context(
1745 &action,
1746 &state.policies,
1747 Some(&ctx),
1748 ) {
1749 Ok(v) => v,
1750 Err(e) => {
1751 tracing::error!(
1752 session_id = %session_id,
1753 "Resource policy evaluation error: {}",
1754 e
1755 );
1756 Verdict::Deny {
1757 reason: format!("Policy evaluation failed: {}", e),
1758 }
1759 }
1760 };
1761
1762 // Atomically update session on Allow
1763 if matches!(verdict, Verdict::Allow) {
1764 session.touch();
1765 use crate::proxy::call_chain::{
1766 MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
1767 };
1768 let resource_key = format!(
1769 "resources/read:{}",
1770 uri.chars().take(128).collect::<String>()
1771 );
1772 if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
1773 || session.call_counts.contains_key(&resource_key)
1774 {
1775 let count =
1776 session.call_counts.entry(resource_key).or_insert(0);
1777 *count = count.saturating_add(1);
1778 }
1779 if session.action_history.len() >= MAX_ACTION_HISTORY {
1780 session.action_history.pop_front();
1781 }
1782 session
1783 .action_history
1784 .push_back("resources/read".to_string());
1785 }
1786
1787 (verdict, ctx)
1788 } else {
1789 let verdict = match state.engine.evaluate_action_with_context(
1790 &action,
1791 &state.policies,
1792 None,
1793 ) {
1794 Ok(v) => v,
1795 Err(e) => {
1796 tracing::error!(
1797 session_id = %session_id,
1798 "Resource policy evaluation error: {}",
1799 e
1800 );
1801 Verdict::Deny {
1802 reason: format!("Policy evaluation failed: {}", e),
1803 }
1804 }
1805 };
1806 (verdict, EvaluationContext::default())
1807 };
1808
1809 match verdict {
1810 Verdict::Allow => {
1811 // SECURITY (FIND-R116-002): ABAC refinement for resource reads.
1812 // Parity with HTTP handler (handlers.rs:1783) and gRPC (service.rs:972).
1813 if let Some(ref abac) = state.abac_engine {
1814 let principal_id =
1815 ctx.agent_id.as_deref().unwrap_or("anonymous");
1816 let principal_type = ctx.principal_type();
1817 let session_risk = state
1818 .sessions
1819 .get_mut(&session_id)
1820 .and_then(|s| s.risk_score.clone());
1821 let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
1822 eval_ctx: &ctx,
1823 principal_type,
1824 principal_id,
1825 risk_score: session_risk.as_ref(),
1826 };
1827 match abac.evaluate(&action, &abac_ctx) {
1828 vellaveto_engine::abac::AbacDecision::Deny {
1829 policy_id,
1830 reason,
1831 } => {
1832 let deny_verdict = Verdict::Deny {
1833 reason: reason.clone(),
1834 };
1835 if let Err(e) = state
1836 .audit
1837 .log_entry(
1838 &action,
1839 &deny_verdict,
1840 json!({
1841 "source": "ws_proxy",
1842 "session": session_id,
1843 "transport": "websocket",
1844 "event": "abac_deny",
1845 "abac_policy": policy_id,
1846 "uri": uri,
1847 }),
1848 )
1849 .await
1850 {
1851 tracing::warn!(
1852 "Failed to audit WS resource ABAC deny: {}",
1853 e
1854 );
1855 }
1856 let error_resp = make_ws_error_response(
1857 Some(id),
1858 -32001,
1859 "Denied by policy",
1860 );
1861 let mut sink = client_sink.lock().await;
1862 let _ =
1863 sink.send(Message::Text(error_resp.into())).await;
1864 continue;
1865 }
1866 vellaveto_engine::abac::AbacDecision::Allow {
1867 policy_id,
1868 } => {
1869 // SECURITY (FIND-R192-002): record_usage parity.
1870 if let Some(ref la) = state.least_agency {
1871 la.record_usage(
1872 principal_id,
1873 &session_id,
1874 &policy_id,
1875 uri,
1876 &action.function,
1877 );
1878 }
1879 }
1880 vellaveto_engine::abac::AbacDecision::NoMatch => {
1881 // Fall through — existing Allow stands
1882 }
1883 #[allow(unreachable_patterns)]
1884 _ => {
1885 tracing::warn!(
1886 "Unknown AbacDecision variant in WS resource_read — fail-closed"
1887 );
1888 let error_resp = make_ws_error_response(
1889 Some(id),
1890 -32001,
1891 "Denied by policy",
1892 );
1893 let mut sink = client_sink.lock().await;
1894 let _ =
1895 sink.send(Message::Text(error_resp.into())).await;
1896 continue;
1897 }
1898 }
1899 }
1900
1901 // NOTE: Session touch + call_counts/action_history
1902 // update already performed inside the TOCTOU-safe
1903 // block above (FIND-R130-002). No separate update here.
1904
1905 // SECURITY (FIND-R46-WS-004): Audit log allowed resource reads
1906 if let Err(e) = state
1907 .audit
1908 .log_entry(
1909 &action,
1910 &Verdict::Allow,
1911 json!({
1912 "source": "ws_proxy",
1913 "session": session_id,
1914 "transport": "websocket",
1915 "resource_uri": uri,
1916 }),
1917 )
1918 .await
1919 {
1920 tracing::error!(
1921 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1922 e
1923 );
1924 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1925 if state.audit_strict_mode {
1926 let error = make_ws_error_response(
1927 Some(id),
1928 -32000,
1929 "Audit logging failed — request denied (strict audit mode)",
1930 );
1931 let mut sink = client_sink.lock().await;
1932 let _ = sink.send(Message::Text(error.into())).await;
1933 continue;
1934 }
1935 }
1936
1937 // SECURITY (FIND-R46-011): Fail-closed on canonicalization
1938 // failure. Do NOT fall back to original text.
1939 let forward_text = if state.canonicalize {
1940 match serde_json::to_string(&parsed) {
1941 Ok(canonical) => canonical,
1942 Err(e) => {
1943 tracing::error!(
1944 "SECURITY: WS resource canonicalization failed: {}",
1945 e
1946 );
1947 let error_resp = make_ws_error_response(
1948 Some(id),
1949 -32603,
1950 "Internal error",
1951 );
1952 let mut sink = client_sink.lock().await;
1953 let _ =
1954 sink.send(Message::Text(error_resp.into())).await;
1955 continue;
1956 }
1957 }
1958 } else {
1959 text.to_string()
1960 };
1961 let mut sink = upstream_sink.lock().await;
1962 if let Err(e) = sink
1963 .send(tokio_tungstenite::tungstenite::Message::Text(
1964 forward_text.into(),
1965 ))
1966 .await
1967 {
1968 tracing::error!("Failed to forward resource read: {}", e);
1969 break;
1970 }
1971 }
1972 // SECURITY (FIND-R116-009): Separate handling for Deny vs RequireApproval
1973 // with per-verdict audit logging. Parity with gRPC (service.rs:1051-1076).
1974 Verdict::Deny { ref reason } => {
1975 if let Err(e) = state
1976 .audit
1977 .log_entry(
1978 &action,
1979 &Verdict::Deny {
1980 reason: reason.clone(),
1981 },
1982 json!({
1983 "source": "ws_proxy",
1984 "session": session_id,
1985 "transport": "websocket",
1986 "resource_uri": uri,
1987 }),
1988 )
1989 .await
1990 {
1991 tracing::error!(
1992 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1993 e
1994 );
1995 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1996 if state.audit_strict_mode {
1997 let error = make_ws_error_response(
1998 Some(id),
1999 -32000,
2000 "Audit logging failed — request denied (strict audit mode)",
2001 );
2002 let mut sink = client_sink.lock().await;
2003 let _ = sink.send(Message::Text(error.into())).await;
2004 continue;
2005 }
2006 }
2007 let error =
2008 make_ws_error_response(Some(id), -32001, "Denied by policy");
2009 let mut sink = client_sink.lock().await;
2010 let _ = sink.send(Message::Text(error.into())).await;
2011 }
2012 Verdict::RequireApproval { ref reason, .. } => {
2013 let deny_reason = format!("Requires approval: {}", reason);
2014 if let Err(e) = state
2015 .audit
2016 .log_entry(
2017 &action,
2018 &Verdict::Deny {
2019 reason: deny_reason,
2020 },
2021 json!({
2022 "source": "ws_proxy",
2023 "session": session_id,
2024 "transport": "websocket",
2025 "resource_uri": uri,
2026 "event": "require_approval",
2027 }),
2028 )
2029 .await
2030 {
2031 tracing::error!(
2032 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2033 e
2034 );
2035 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2036 if state.audit_strict_mode {
2037 let error = make_ws_error_response(
2038 Some(id),
2039 -32000,
2040 "Audit logging failed — request denied (strict audit mode)",
2041 );
2042 let mut sink = client_sink.lock().await;
2043 let _ = sink.send(Message::Text(error.into())).await;
2044 continue;
2045 }
2046 }
2047 let error =
2048 make_ws_error_response(Some(id), -32001, "Denied by policy");
2049 let mut sink = client_sink.lock().await;
2050 let _ = sink.send(Message::Text(error.into())).await;
2051 }
2052 #[allow(unreachable_patterns)]
2053 _ => {
2054 // SECURITY: Future variants — fail-closed.
2055 tracing::warn!(
2056 "Unknown Verdict variant in WS resource_read — fail-closed"
2057 );
2058 let error =
2059 make_ws_error_response(Some(id), -32001, "Denied by policy");
2060 let mut sink = client_sink.lock().await;
2061 let _ = sink.send(Message::Text(error.into())).await;
2062 }
2063 }
2064 }
2065 MessageType::Batch => {
2066 // Reject batches per MCP spec
2067 let error = json!({
2068 "jsonrpc": "2.0",
2069 "error": {
2070 "code": -32600,
2071 "message": "JSON-RPC batch requests are not supported"
2072 },
2073 "id": null
2074 });
2075 let error_text = serde_json::to_string(&error)
2076 .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","error":{"code":-32600,"message":"Batch not supported"},"id":null}"#.to_string());
2077 let mut sink = client_sink.lock().await;
2078 let _ = sink.send(Message::Text(error_text.into())).await;
2079 }
2080 MessageType::Invalid { ref id, ref reason } => {
2081 tracing::warn!(
2082 "Invalid JSON-RPC request in WebSocket transport: {}",
2083 reason
2084 );
2085 let error =
2086 make_ws_error_response(Some(id), -32600, "Invalid JSON-RPC request");
2087 let mut sink = client_sink.lock().await;
2088 let _ = sink.send(Message::Text(error.into())).await;
2089 }
2090 MessageType::SamplingRequest { ref id } => {
2091 // SECURITY (FIND-R74-006): Call inspect_sampling() for full
2092 // verdict (enabled + model filter + tool output check + rate limit),
2093 // matching HTTP handler parity (handlers.rs:1681).
2094 let params = parsed.get("params").cloned().unwrap_or(json!({}));
2095 // SECURITY (FIND-R125-001): Per-session sampling rate limit
2096 // parity with elicitation. Atomically read + increment.
2097 let sampling_verdict = {
2098 let mut session_ref = state.sessions.get_mut(&session_id);
2099 let current_count =
2100 session_ref.as_ref().map(|s| s.sampling_count).unwrap_or(0);
2101 let verdict = vellaveto_mcp::elicitation::inspect_sampling(
2102 ¶ms,
2103 &state.sampling_config,
2104 current_count,
2105 );
2106 if matches!(verdict, vellaveto_mcp::elicitation::SamplingVerdict::Allow)
2107 {
2108 if let Some(ref mut s) = session_ref {
2109 s.sampling_count = s.sampling_count.saturating_add(1);
2110 }
2111 }
2112 verdict
2113 };
2114 match sampling_verdict {
2115 vellaveto_mcp::elicitation::SamplingVerdict::Allow => {
2116 // Forward allowed sampling request
2117 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
2118 // Falling back to original text would create a TOCTOU gap.
2119 let forward_text = if state.canonicalize {
2120 match serde_json::to_string(&parsed) {
2121 Ok(canonical) => canonical,
2122 Err(e) => {
2123 tracing::error!(
2124 "SECURITY: WS sampling canonicalization failed: {}",
2125 e
2126 );
2127 let error_resp = make_ws_error_response(
2128 Some(id),
2129 -32603,
2130 "Internal error",
2131 );
2132 let mut sink = client_sink.lock().await;
2133 let _ =
2134 sink.send(Message::Text(error_resp.into())).await;
2135 continue;
2136 }
2137 }
2138 } else {
2139 text.to_string()
2140 };
2141 let mut sink = upstream_sink.lock().await;
2142 let _ = sink
2143 .send(tokio_tungstenite::tungstenite::Message::Text(
2144 forward_text.into(),
2145 ))
2146 .await;
2147 }
2148 vellaveto_mcp::elicitation::SamplingVerdict::Deny { reason } => {
2149 tracing::warn!(
2150 session_id = %session_id,
2151 "Blocked WS sampling/createMessage: {}",
2152 reason
2153 );
2154 let action = Action::new(
2155 "vellaveto",
2156 "ws_sampling_interception",
2157 json!({
2158 "method": "sampling/createMessage",
2159 "session": session_id,
2160 "transport": "websocket",
2161 "reason": &reason,
2162 }),
2163 );
2164 let verdict = Verdict::Deny {
2165 reason: reason.clone(),
2166 };
2167 if let Err(e) = state
2168 .audit
2169 .log_entry(
2170 &action,
2171 &verdict,
2172 json!({
2173 "source": "ws_proxy",
2174 "event": "ws_sampling_interception",
2175 }),
2176 )
2177 .await
2178 {
2179 tracing::warn!(
2180 "Failed to audit WS sampling interception: {}",
2181 e
2182 );
2183 }
2184 // SECURITY: Generic message to client — detailed reason
2185 // is in the audit log, not leaked to the client.
2186 let error = make_ws_error_response(
2187 Some(id),
2188 -32001,
2189 "sampling/createMessage blocked by policy",
2190 );
2191 let mut sink = client_sink.lock().await;
2192 let _ = sink.send(Message::Text(error.into())).await;
2193 }
2194 }
2195 }
2196 MessageType::TaskRequest {
2197 ref id,
2198 ref task_method,
2199 ref task_id,
2200 } => {
2201 // SECURITY (FIND-R76-001): Memory poisoning detection on task params.
2202 // Parity with HTTP handler (handlers.rs:2027-2084). Agents could
2203 // exfiltrate poisoned data via task management operations.
2204 {
2205 let task_params = parsed.get("params").cloned().unwrap_or(json!({}));
2206 let poisoning_detected = state
2207 .sessions
2208 .get_mut(&session_id)
2209 .and_then(|session| {
2210 let matches =
2211 session.memory_tracker.check_parameters(&task_params);
2212 if !matches.is_empty() {
2213 for m in &matches {
2214 tracing::warn!(
2215 "SECURITY: Memory poisoning detected in WS task '{}' (session {}): \
2216 param '{}' contains replayed data (fingerprint: {})",
2217 task_method,
2218 session_id,
2219 m.param_location,
2220 m.fingerprint
2221 );
2222 }
2223 Some(matches.len())
2224 } else {
2225 None
2226 }
2227 });
2228 if let Some(match_count) = poisoning_detected {
2229 let poison_action =
2230 extractor::extract_task_action(task_method, task_id.as_deref());
2231 let deny_reason = format!(
2232 "Memory poisoning detected: {} replayed data fragment(s) in task '{}'",
2233 match_count, task_method
2234 );
2235 if let Err(e) = state
2236 .audit
2237 .log_entry(
2238 &poison_action,
2239 &Verdict::Deny {
2240 reason: deny_reason,
2241 },
2242 json!({
2243 "source": "ws_proxy",
2244 "session": session_id,
2245 "transport": "websocket",
2246 "event": "memory_poisoning_detected",
2247 "matches": match_count,
2248 "task_method": task_method,
2249 }),
2250 )
2251 .await
2252 {
2253 tracing::warn!(
2254 "Failed to audit WS task memory poisoning: {}",
2255 e
2256 );
2257 }
2258 let error = make_ws_error_response(
2259 Some(id),
2260 -32001,
2261 "Request blocked: security policy violation",
2262 );
2263 let mut sink = client_sink.lock().await;
2264 let _ = sink.send(Message::Text(error.into())).await;
2265 continue;
2266 }
2267 }
2268
2269 // SECURITY (FIND-R76-001): DLP scan task request parameters.
2270 // Parity with HTTP handler (handlers.rs:2086-2145). Agents could
2271 // embed secrets in task_id or params to exfiltrate them.
2272 {
2273 let task_params = parsed.get("params").cloned().unwrap_or(json!({}));
2274 let dlp_findings = scan_parameters_for_secrets(&task_params);
2275 if !dlp_findings.is_empty() {
2276 for finding in &dlp_findings {
2277 record_dlp_finding(&finding.pattern_name);
2278 }
2279 let patterns: Vec<String> = dlp_findings
2280 .iter()
2281 .map(|f| format!("{} at {}", f.pattern_name, f.location))
2282 .collect();
2283 tracing::warn!(
2284 "SECURITY: DLP blocking WS task '{}' in session {}: {:?}",
2285 task_method,
2286 session_id,
2287 patterns
2288 );
2289 let dlp_action =
2290 extractor::extract_task_action(task_method, task_id.as_deref());
2291 if let Err(e) = state
2292 .audit
2293 .log_entry(
2294 &dlp_action,
2295 &Verdict::Deny {
2296 reason: format!(
2297 "DLP: secrets detected in task request: {:?}",
2298 patterns
2299 ),
2300 },
2301 json!({
2302 "source": "ws_proxy",
2303 "session": session_id,
2304 "transport": "websocket",
2305 "event": "dlp_secret_detected_task",
2306 "task_method": task_method,
2307 "findings": patterns,
2308 }),
2309 )
2310 .await
2311 {
2312 tracing::warn!("Failed to audit WS task DLP: {}", e);
2313 }
2314 let error = make_ws_error_response(
2315 Some(id),
2316 -32001,
2317 "Request blocked: security policy violation",
2318 );
2319 let mut sink = client_sink.lock().await;
2320 let _ = sink.send(Message::Text(error.into())).await;
2321 continue;
2322 }
2323 }
2324
2325 // Policy-evaluate task requests (async operations)
2326 let action =
2327 extractor::extract_task_action(task_method, task_id.as_deref());
2328 // SECURITY (FIND-R130-002): TOCTOU-safe context+eval for task
2329 // requests. Context is built inside the DashMap shard lock to
2330 // prevent stale snapshot evaluation races.
2331 // SECURITY (FIND-R190-006): Update session state on Allow
2332 // (touch + call_counts + action_history) while still holding
2333 // the shard lock, matching ToolCall/ResourceRead parity.
2334 let (verdict, task_eval_ctx) = if let Some(mut session) =
2335 state.sessions.get_mut(&session_id)
2336 {
2337 let ctx = EvaluationContext {
2338 timestamp: None,
2339 agent_id: session.oauth_subject.clone(),
2340 agent_identity: session.agent_identity.clone(),
2341 call_counts: session.call_counts.clone(),
2342 previous_actions: session.action_history.iter().cloned().collect(),
2343 call_chain: session.current_call_chain.clone(),
2344 tenant_id: None,
2345 verification_tier: None,
2346 capability_token: None,
2347 session_state: None,
2348 };
2349 let verdict = match state.engine.evaluate_action_with_context(
2350 &action,
2351 &state.policies,
2352 Some(&ctx),
2353 ) {
2354 Ok(v) => v,
2355 Err(e) => {
2356 tracing::error!(
2357 session_id = %session_id,
2358 "Task policy evaluation error: {}", e
2359 );
2360 Verdict::Deny {
2361 reason: format!("Policy evaluation failed: {}", e),
2362 }
2363 }
2364 };
2365
2366 // Update session atomically on Allow
2367 if matches!(verdict, Verdict::Allow) {
2368 session.touch();
2369 use crate::proxy::call_chain::{
2370 MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
2371 };
2372 if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
2373 || session.call_counts.contains_key(task_method)
2374 {
2375 let count = session
2376 .call_counts
2377 .entry(task_method.to_string())
2378 .or_insert(0);
2379 *count = count.saturating_add(1);
2380 }
2381 if session.action_history.len() >= MAX_ACTION_HISTORY {
2382 session.action_history.pop_front();
2383 }
2384 session.action_history.push_back(task_method.to_string());
2385 }
2386
2387 (verdict, ctx)
2388 } else {
2389 let verdict = match state.engine.evaluate_action_with_context(
2390 &action,
2391 &state.policies,
2392 None,
2393 ) {
2394 Ok(v) => v,
2395 Err(e) => {
2396 tracing::error!(
2397 session_id = %session_id,
2398 "Task policy evaluation error: {}", e
2399 );
2400 Verdict::Deny {
2401 reason: format!("Policy evaluation failed: {}", e),
2402 }
2403 }
2404 };
2405 (verdict, EvaluationContext::default())
2406 };
2407
2408 match verdict {
2409 Verdict::Allow => {
2410 // SECURITY (FIND-R190-001): ABAC refinement for TaskRequest,
2411 // matching ToolCall/ResourceRead parity.
2412 if let Some(ref abac) = state.abac_engine {
2413 let principal_id =
2414 task_eval_ctx.agent_id.as_deref().unwrap_or("anonymous");
2415 let principal_type = task_eval_ctx.principal_type();
2416 let session_risk = state
2417 .sessions
2418 .get_mut(&session_id)
2419 .and_then(|s| s.risk_score.clone());
2420 let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
2421 eval_ctx: &task_eval_ctx,
2422 principal_type,
2423 principal_id,
2424 risk_score: session_risk.as_ref(),
2425 };
2426 match abac.evaluate(&action, &abac_ctx) {
2427 vellaveto_engine::abac::AbacDecision::Deny {
2428 policy_id,
2429 reason,
2430 } => {
2431 let deny_verdict = Verdict::Deny {
2432 reason: reason.clone(),
2433 };
2434 if let Err(e) = state
2435 .audit
2436 .log_entry(
2437 &action,
2438 &deny_verdict,
2439 json!({
2440 "source": "ws_proxy",
2441 "session": session_id,
2442 "transport": "websocket",
2443 "event": "abac_deny",
2444 "abac_policy": policy_id,
2445 "task_method": task_method,
2446 }),
2447 )
2448 .await
2449 {
2450 tracing::warn!(
2451 "Failed to audit WS task ABAC deny: {}",
2452 e
2453 );
2454 }
2455 let error_resp = make_ws_error_response(
2456 Some(id),
2457 -32001,
2458 "Denied by policy",
2459 );
2460 let mut sink = client_sink.lock().await;
2461 let _ =
2462 sink.send(Message::Text(error_resp.into())).await;
2463 continue;
2464 }
2465 vellaveto_engine::abac::AbacDecision::Allow {
2466 policy_id,
2467 } => {
2468 if let Some(ref la) = state.least_agency {
2469 la.record_usage(
2470 principal_id,
2471 &session_id,
2472 &policy_id,
2473 task_method,
2474 &action.function,
2475 );
2476 }
2477 }
2478 vellaveto_engine::abac::AbacDecision::NoMatch => {
2479 // Fall through — existing Allow stands
2480 }
2481 #[allow(unreachable_patterns)]
2482 _ => {
2483 tracing::warn!(
2484 "Unknown AbacDecision variant — fail-closed"
2485 );
2486 let error_resp = make_ws_error_response(
2487 Some(id),
2488 -32001,
2489 "Denied by policy",
2490 );
2491 let mut sink = client_sink.lock().await;
2492 let _ =
2493 sink.send(Message::Text(error_resp.into())).await;
2494 continue;
2495 }
2496 }
2497 }
2498
2499 if let Err(e) = state
2500 .audit
2501 .log_entry(
2502 &action,
2503 &Verdict::Allow,
2504 json!({
2505 "source": "ws_proxy",
2506 "session": session_id,
2507 "transport": "websocket",
2508 "task_method": task_method,
2509 }),
2510 )
2511 .await
2512 {
2513 tracing::warn!("Failed to audit WS task allow: {}", e);
2514 }
2515 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
2516 let forward_text = if state.canonicalize {
2517 match serde_json::to_string(&parsed) {
2518 Ok(canonical) => canonical,
2519 Err(e) => {
2520 tracing::error!(
2521 "SECURITY: WS task canonicalization failed: {}",
2522 e
2523 );
2524 let error_resp = make_ws_error_response(
2525 Some(id),
2526 -32603,
2527 "Internal error",
2528 );
2529 let mut sink = client_sink.lock().await;
2530 let _ =
2531 sink.send(Message::Text(error_resp.into())).await;
2532 continue;
2533 }
2534 }
2535 } else {
2536 text.to_string()
2537 };
2538 let mut sink = upstream_sink.lock().await;
2539 if let Err(e) = sink
2540 .send(tokio_tungstenite::tungstenite::Message::Text(
2541 forward_text.into(),
2542 ))
2543 .await
2544 {
2545 tracing::error!("Failed to forward task request: {}", e);
2546 break;
2547 }
2548 }
2549 Verdict::Deny { ref reason } => {
2550 if let Err(e) = state
2551 .audit
2552 .log_entry(
2553 &action,
2554 &Verdict::Deny {
2555 reason: reason.clone(),
2556 },
2557 json!({
2558 "source": "ws_proxy",
2559 "session": session_id,
2560 "transport": "websocket",
2561 "task_method": task_method,
2562 }),
2563 )
2564 .await
2565 {
2566 tracing::error!(
2567 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2568 e
2569 );
2570 // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
2571 if state.audit_strict_mode {
2572 let error = make_ws_error_response(
2573 Some(id),
2574 -32000,
2575 "Audit logging failed — request denied (strict audit mode)",
2576 );
2577 let mut sink = client_sink.lock().await;
2578 let _ = sink.send(Message::Text(error.into())).await;
2579 continue;
2580 }
2581 }
2582 // SECURITY (FIND-R55-WS-005): Generic denial message to prevent
2583 // leaking policy names/details. Detailed reason is in audit log.
2584 let denial =
2585 make_ws_error_response(Some(id), -32001, "Denied by policy");
2586 let mut sink = client_sink.lock().await;
2587 let _ = sink.send(Message::Text(denial.into())).await;
2588 }
2589 Verdict::RequireApproval { ref reason, .. } => {
2590 let deny_reason = format!("Requires approval: {}", reason);
2591 if let Err(e) = state
2592 .audit
2593 .log_entry(
2594 &action,
2595 &Verdict::Deny {
2596 reason: deny_reason,
2597 },
2598 json!({
2599 "source": "ws_proxy",
2600 "session": session_id,
2601 "transport": "websocket",
2602 "task_method": task_method,
2603 }),
2604 )
2605 .await
2606 {
2607 tracing::error!(
2608 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2609 e
2610 );
2611 // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
2612 if state.audit_strict_mode {
2613 let error = make_ws_error_response(
2614 Some(id),
2615 -32000,
2616 "Audit logging failed — request denied (strict audit mode)",
2617 );
2618 let mut sink = client_sink.lock().await;
2619 let _ = sink.send(Message::Text(error.into())).await;
2620 continue;
2621 }
2622 }
2623 let approval_reason = "Approval required";
2624 let approval_id =
2625 create_ws_approval(&state, &session_id, &action, reason).await;
2626 let denial = make_ws_error_response_with_data(
2627 Some(id),
2628 -32001,
2629 approval_reason,
2630 Some(json!({
2631 "verdict": "require_approval",
2632 "reason": approval_reason,
2633 "approval_id": approval_id,
2634 })),
2635 );
2636 let mut sink = client_sink.lock().await;
2637 let _ = sink.send(Message::Text(denial.into())).await;
2638 }
2639 _ => {
2640 let denial =
2641 make_ws_error_response(Some(id), -32001, "Denied by policy");
2642 let mut sink = client_sink.lock().await;
2643 let _ = sink.send(Message::Text(denial.into())).await;
2644 }
2645 }
2646 }
2647 MessageType::ExtensionMethod {
2648 ref id,
2649 ref extension_id,
2650 ref method,
2651 } => {
2652 // Policy-evaluate extension method calls
2653 let params = parsed.get("params").cloned().unwrap_or(json!({}));
2654
2655 // SECURITY (FIND-R116-001): DLP scan extension method parameters.
2656 // Parity with gRPC handle_extension_method (service.rs:1542).
2657 let dlp_findings = scan_parameters_for_secrets(¶ms);
2658 if !dlp_findings.is_empty() {
2659 for finding in &dlp_findings {
2660 record_dlp_finding(&finding.pattern_name);
2661 }
2662 let patterns: Vec<String> = dlp_findings
2663 .iter()
2664 .map(|f| format!("{}:{}", f.pattern_name, f.location))
2665 .collect();
2666 tracing::warn!(
2667 "SECURITY: Secrets in WS extension method parameters! Session: {}, Extension: {}:{}, Findings: {:?}",
2668 session_id, extension_id, method, patterns,
2669 );
2670 let action =
2671 extractor::extract_extension_action(extension_id, method, ¶ms);
2672 let audit_verdict = Verdict::Deny {
2673 reason: format!(
2674 "DLP blocked: secret detected in extension parameters: {:?}",
2675 patterns
2676 ),
2677 };
2678 if let Err(e) = state.audit.log_entry(
2679 &action, &audit_verdict,
2680 json!({
2681 "source": "ws_proxy", "session": session_id, "transport": "websocket",
2682 "event": "ws_extension_parameter_dlp_alert",
2683 "extension_id": extension_id, "method": method, "findings": patterns,
2684 }),
2685 ).await {
2686 tracing::warn!("Failed to audit WS extension parameter DLP: {}", e);
2687 }
2688 let denial =
2689 make_ws_error_response(Some(id), -32001, "Denied by policy");
2690 let mut sink = client_sink.lock().await;
2691 let _ = sink.send(Message::Text(denial.into())).await;
2692 continue;
2693 }
2694
2695 // SECURITY (FIND-R116-001): Memory poisoning detection for extension params.
2696 // Parity with gRPC handle_extension_method (service.rs:1574).
2697 if let Some(session) = state.sessions.get_mut(&session_id) {
2698 let poisoning_matches =
2699 session.memory_tracker.check_parameters(¶ms);
2700 if !poisoning_matches.is_empty() {
2701 for m in &poisoning_matches {
2702 tracing::warn!(
2703 "SECURITY: Memory poisoning in WS extension '{}:{}' (session {}): \
2704 param '{}' replayed data (fingerprint: {})",
2705 extension_id, method, session_id, m.param_location, m.fingerprint
2706 );
2707 }
2708 let action = extractor::extract_extension_action(
2709 extension_id,
2710 method,
2711 ¶ms,
2712 );
2713 let deny_reason = format!(
2714 "Memory poisoning detected: {} replayed data fragment(s) in extension '{}:{}'",
2715 poisoning_matches.len(), extension_id, method
2716 );
2717 if let Err(e) = state.audit.log_entry(
2718 &action,
2719 &Verdict::Deny { reason: deny_reason.clone() },
2720 json!({
2721 "source": "ws_proxy", "session": session_id, "transport": "websocket",
2722 "event": "memory_poisoning_detected",
2723 "matches": poisoning_matches.len(),
2724 "extension_id": extension_id, "method": method,
2725 }),
2726 ).await {
2727 tracing::warn!("Failed to audit WS extension memory poisoning: {}", e);
2728 }
2729 let denial =
2730 make_ws_error_response(Some(id), -32001, "Denied by policy");
2731 let mut sink = client_sink.lock().await;
2732 let _ = sink.send(Message::Text(denial.into())).await;
2733 continue;
2734 }
2735 }
2736
2737 let mut action =
2738 extractor::extract_extension_action(extension_id, method, ¶ms);
2739
2740 // SECURITY (FIND-R118-004): DNS resolution for extension methods.
2741 // Parity with ToolCall (line 710) and ResourceRead (line 1439).
2742 if state.engine.has_ip_rules() {
2743 super::helpers::resolve_domains(&mut action).await;
2744 }
2745
2746 let ext_key = format!("extension:{}:{}", extension_id, method);
2747
2748 // SECURITY (FIND-R130-002): TOCTOU-safe context+eval+update
2749 // for extension methods. Matches ToolCall/ResourceRead fixes.
2750 let (verdict, ctx) = if let Some(mut session) =
2751 state.sessions.get_mut(&session_id)
2752 {
2753 let ctx = EvaluationContext {
2754 timestamp: None,
2755 agent_id: session.oauth_subject.clone(),
2756 agent_identity: session.agent_identity.clone(),
2757 call_counts: session.call_counts.clone(),
2758 previous_actions: session.action_history.iter().cloned().collect(),
2759 call_chain: session.current_call_chain.clone(),
2760 tenant_id: None,
2761 verification_tier: None,
2762 capability_token: None,
2763 session_state: None,
2764 };
2765
2766 let verdict = match state.engine.evaluate_action_with_context(
2767 &action,
2768 &state.policies,
2769 Some(&ctx),
2770 ) {
2771 Ok(v) => v,
2772 Err(e) => {
2773 tracing::error!(
2774 session_id = %session_id,
2775 "Extension policy evaluation error: {}", e
2776 );
2777 Verdict::Deny {
2778 reason: format!("Policy evaluation failed: {}", e),
2779 }
2780 }
2781 };
2782
2783 // Atomically update session on Allow
2784 if matches!(verdict, Verdict::Allow) {
2785 session.touch();
2786 use crate::proxy::call_chain::{
2787 MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
2788 };
2789 if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
2790 || session.call_counts.contains_key(&ext_key)
2791 {
2792 let count =
2793 session.call_counts.entry(ext_key.clone()).or_insert(0);
2794 *count = count.saturating_add(1);
2795 }
2796 if session.action_history.len() >= MAX_ACTION_HISTORY {
2797 session.action_history.pop_front();
2798 }
2799 session.action_history.push_back(ext_key.clone());
2800 }
2801
2802 (verdict, ctx)
2803 } else {
2804 let verdict = match state.engine.evaluate_action_with_context(
2805 &action,
2806 &state.policies,
2807 None,
2808 ) {
2809 Ok(v) => v,
2810 Err(e) => {
2811 tracing::error!(
2812 session_id = %session_id,
2813 "Extension policy evaluation error: {}", e
2814 );
2815 Verdict::Deny {
2816 reason: format!("Policy evaluation failed: {}", e),
2817 }
2818 }
2819 };
2820 (verdict, EvaluationContext::default())
2821 };
2822
2823 match verdict {
2824 Verdict::Allow => {
2825 // SECURITY (FIND-R118-002): ABAC refinement for extension methods.
2826 // Parity with ToolCall (line 1099) and ResourceRead (line 1498).
2827 if let Some(ref abac) = state.abac_engine {
2828 let principal_id =
2829 ctx.agent_id.as_deref().unwrap_or("anonymous");
2830 let principal_type = ctx.principal_type();
2831 let session_risk = state
2832 .sessions
2833 .get_mut(&session_id)
2834 .and_then(|s| s.risk_score.clone());
2835 let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
2836 eval_ctx: &ctx,
2837 principal_type,
2838 principal_id,
2839 risk_score: session_risk.as_ref(),
2840 };
2841 match abac.evaluate(&action, &abac_ctx) {
2842 vellaveto_engine::abac::AbacDecision::Deny {
2843 policy_id,
2844 reason,
2845 } => {
2846 let deny_verdict = Verdict::Deny {
2847 reason: reason.clone(),
2848 };
2849 if let Err(e) = state
2850 .audit
2851 .log_entry(
2852 &action,
2853 &deny_verdict,
2854 json!({
2855 "source": "ws_proxy",
2856 "session": session_id,
2857 "transport": "websocket",
2858 "event": "abac_deny",
2859 "extension_id": extension_id,
2860 "abac_policy": policy_id,
2861 }),
2862 )
2863 .await
2864 {
2865 tracing::warn!(
2866 "Failed to audit WS extension ABAC deny: {}",
2867 e
2868 );
2869 }
2870 let error_resp = make_ws_error_response(
2871 Some(id),
2872 -32001,
2873 "Denied by policy",
2874 );
2875 let mut sink = client_sink.lock().await;
2876 let _ =
2877 sink.send(Message::Text(error_resp.into())).await;
2878 continue;
2879 }
2880 vellaveto_engine::abac::AbacDecision::Allow {
2881 policy_id,
2882 } => {
2883 if let Some(ref la) = state.least_agency {
2884 la.record_usage(
2885 principal_id,
2886 &session_id,
2887 &policy_id,
2888 &ext_key,
2889 method,
2890 );
2891 }
2892 }
2893 vellaveto_engine::abac::AbacDecision::NoMatch => {
2894 // Fall through — existing Allow stands
2895 }
2896 #[allow(unreachable_patterns)]
2897 // AbacDecision is #[non_exhaustive]
2898 _ => {
2899 // SECURITY: Future variants — fail-closed (deny).
2900 tracing::warn!(
2901 "Unknown AbacDecision variant — fail-closed"
2902 );
2903 let error_resp = make_ws_error_response(
2904 Some(id),
2905 -32001,
2906 "Denied by policy",
2907 );
2908 let mut sink = client_sink.lock().await;
2909 let _ =
2910 sink.send(Message::Text(error_resp.into())).await;
2911 continue;
2912 }
2913 }
2914 }
2915
2916 // NOTE: Session touch + call_counts/action_history
2917 // update already performed inside the TOCTOU-safe
2918 // block above (FIND-R130-002). No separate update here.
2919
2920 if let Err(e) = state
2921 .audit
2922 .log_entry(
2923 &action,
2924 &Verdict::Allow,
2925 json!({
2926 "source": "ws_proxy",
2927 "session": session_id,
2928 "transport": "websocket",
2929 "extension_id": extension_id,
2930 }),
2931 )
2932 .await
2933 {
2934 tracing::error!(
2935 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2936 e
2937 );
2938 // SECURITY (FIND-R215-007): Strict audit mode — fail-closed.
2939 // Parity with Deny and RequireApproval paths.
2940 if state.audit_strict_mode {
2941 let error = make_ws_error_response(
2942 Some(id),
2943 -32000,
2944 "Audit logging failed — request denied (strict audit mode)",
2945 );
2946 let mut sink = client_sink.lock().await;
2947 let _ = sink.send(Message::Text(error.into())).await;
2948 continue;
2949 }
2950 }
2951 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
2952 let forward_text = if state.canonicalize {
2953 match serde_json::to_string(&parsed) {
2954 Ok(canonical) => canonical,
2955 Err(e) => {
2956 tracing::error!("SECURITY: WS extension canonicalization failed: {}", e);
2957 let error_resp = make_ws_error_response(
2958 Some(id),
2959 -32603,
2960 "Internal error",
2961 );
2962 let mut sink = client_sink.lock().await;
2963 let _ =
2964 sink.send(Message::Text(error_resp.into())).await;
2965 continue;
2966 }
2967 }
2968 } else {
2969 text.to_string()
2970 };
2971 let mut sink = upstream_sink.lock().await;
2972 if let Err(e) = sink
2973 .send(tokio_tungstenite::tungstenite::Message::Text(
2974 forward_text.into(),
2975 ))
2976 .await
2977 {
2978 tracing::error!("Failed to forward extension request: {}", e);
2979 break;
2980 }
2981 }
2982 Verdict::Deny { ref reason } => {
2983 if let Err(e) = state
2984 .audit
2985 .log_entry(
2986 &action,
2987 &Verdict::Deny {
2988 reason: reason.clone(),
2989 },
2990 json!({
2991 "source": "ws_proxy",
2992 "session": session_id,
2993 "transport": "websocket",
2994 "extension_id": extension_id,
2995 }),
2996 )
2997 .await
2998 {
2999 tracing::error!(
3000 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
3001 e
3002 );
3003 // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
3004 if state.audit_strict_mode {
3005 let error = make_ws_error_response(
3006 Some(id),
3007 -32000,
3008 "Audit logging failed — request denied (strict audit mode)",
3009 );
3010 let mut sink = client_sink.lock().await;
3011 let _ = sink.send(Message::Text(error.into())).await;
3012 continue;
3013 }
3014 }
3015 // SECURITY (FIND-R213-001): Generic denial message — do not leak
3016 // detailed policy reason to client. Reason is in the audit log.
3017 let _ = reason;
3018 let denial =
3019 make_ws_error_response(Some(id), -32001, "Denied by policy");
3020 let mut sink = client_sink.lock().await;
3021 let _ = sink.send(Message::Text(denial.into())).await;
3022 }
3023 Verdict::RequireApproval { ref reason, .. } => {
3024 let deny_reason = format!("Requires approval: {}", reason);
3025 if let Err(e) = state
3026 .audit
3027 .log_entry(
3028 &action,
3029 &Verdict::Deny {
3030 reason: deny_reason,
3031 },
3032 json!({
3033 "source": "ws_proxy",
3034 "session": session_id,
3035 "transport": "websocket",
3036 "extension_id": extension_id,
3037 }),
3038 )
3039 .await
3040 {
3041 tracing::error!(
3042 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
3043 e
3044 );
3045 // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
3046 if state.audit_strict_mode {
3047 let error = make_ws_error_response(
3048 Some(id),
3049 -32000,
3050 "Audit logging failed — request denied (strict audit mode)",
3051 );
3052 let mut sink = client_sink.lock().await;
3053 let _ = sink.send(Message::Text(error.into())).await;
3054 continue;
3055 }
3056 }
3057 let approval_reason = "Approval required";
3058 let approval_id =
3059 create_ws_approval(&state, &session_id, &action, reason).await;
3060 let denial = make_ws_error_response_with_data(
3061 Some(id),
3062 -32001,
3063 approval_reason,
3064 Some(json!({
3065 "verdict": "require_approval",
3066 "reason": approval_reason,
3067 "approval_id": approval_id,
3068 })),
3069 );
3070 let mut sink = client_sink.lock().await;
3071 let _ = sink.send(Message::Text(denial.into())).await;
3072 }
3073 _ => {
3074 let denial =
3075 make_ws_error_response(Some(id), -32001, "Denied by policy");
3076 let mut sink = client_sink.lock().await;
3077 let _ = sink.send(Message::Text(denial.into())).await;
3078 }
3079 }
3080 }
3081 MessageType::ElicitationRequest { ref id } => {
3082 // SECURITY (FIND-R46-010): Policy checks for elicitation requests.
3083 // Match the HTTP POST handler's elicitation inspection logic.
3084 let params = parsed.get("params").cloned().unwrap_or(json!({}));
3085 let elicitation_verdict = {
3086 let mut session_ref = state.sessions.get_mut(&session_id);
3087 let current_count = session_ref
3088 .as_ref()
3089 .map(|s| s.elicitation_count)
3090 .unwrap_or(0);
3091 let verdict = vellaveto_mcp::elicitation::inspect_elicitation(
3092 ¶ms,
3093 &state.elicitation_config,
3094 current_count,
3095 );
3096 // Pre-increment while holding the lock to close the TOCTOU gap
3097 if matches!(
3098 verdict,
3099 vellaveto_mcp::elicitation::ElicitationVerdict::Allow
3100 ) {
3101 if let Some(ref mut s) = session_ref {
3102 // SECURITY (FIND-R51-008): Use saturating_add for consistency.
3103 s.elicitation_count = s.elicitation_count.saturating_add(1);
3104 }
3105 }
3106 verdict
3107 };
3108 match elicitation_verdict {
3109 vellaveto_mcp::elicitation::ElicitationVerdict::Allow => {
3110 let action = Action::new(
3111 "vellaveto",
3112 "ws_forward_message",
3113 json!({
3114 "message_type": "elicitation_request",
3115 "session": session_id,
3116 "transport": "websocket",
3117 "direction": "client_to_upstream",
3118 }),
3119 );
3120 if let Err(e) = state
3121 .audit
3122 .log_entry(
3123 &action,
3124 &Verdict::Allow,
3125 json!({
3126 "source": "ws_proxy",
3127 "event": "ws_elicitation_forwarded",
3128 }),
3129 )
3130 .await
3131 {
3132 tracing::warn!("Failed to audit WS elicitation: {}", e);
3133 }
3134
3135 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
3136 let forward_text = if state.canonicalize {
3137 match serde_json::to_string(&parsed) {
3138 Ok(canonical) => canonical,
3139 Err(e) => {
3140 tracing::error!("SECURITY: WS elicitation canonicalization failed: {}", e);
3141 let error_resp = make_ws_error_response(
3142 Some(id),
3143 -32603,
3144 "Internal error",
3145 );
3146 let mut sink = client_sink.lock().await;
3147 let _ =
3148 sink.send(Message::Text(error_resp.into())).await;
3149 continue;
3150 }
3151 }
3152 } else {
3153 text.to_string()
3154 };
3155 let mut sink = upstream_sink.lock().await;
3156 if let Err(e) = sink
3157 .send(tokio_tungstenite::tungstenite::Message::Text(
3158 forward_text.into(),
3159 ))
3160 .await
3161 {
3162 // Rollback pre-incremented count on forward failure
3163 if let Some(mut s) = state.sessions.get_mut(&session_id) {
3164 s.elicitation_count = s.elicitation_count.saturating_sub(1);
3165 }
3166 tracing::error!("Failed to forward elicitation: {}", e);
3167 break;
3168 }
3169 }
3170 vellaveto_mcp::elicitation::ElicitationVerdict::Deny { reason } => {
3171 tracing::warn!(
3172 session_id = %session_id,
3173 "Blocked WS elicitation/create: {}",
3174 reason
3175 );
3176 let action = Action::new(
3177 "vellaveto",
3178 "ws_elicitation_interception",
3179 json!({
3180 "method": "elicitation/create",
3181 "session": session_id,
3182 "transport": "websocket",
3183 "reason": &reason,
3184 }),
3185 );
3186 let verdict = Verdict::Deny {
3187 reason: reason.clone(),
3188 };
3189 if let Err(e) = state
3190 .audit
3191 .log_entry(
3192 &action,
3193 &verdict,
3194 json!({
3195 "source": "ws_proxy",
3196 "event": "ws_elicitation_interception",
3197 }),
3198 )
3199 .await
3200 {
3201 tracing::warn!(
3202 "Failed to audit WS elicitation interception: {}",
3203 e
3204 );
3205 }
3206 // SECURITY (FIND-R46-012, FIND-R55-WS-006): Generic message to client.
3207 let error =
3208 make_ws_error_response(Some(id), -32001, "Denied by policy");
3209 let mut sink = client_sink.lock().await;
3210 let _ = sink.send(Message::Text(error.into())).await;
3211 }
3212 }
3213 }
3214 MessageType::PassThrough | MessageType::ProgressNotification { .. } => {
3215 // SECURITY (FIND-R76-003): DLP scan PassThrough params for secrets.
3216 // Parity with HTTP handler (handlers.rs:1795-1859). Agents could
3217 // exfiltrate secrets via prompts/get, completion/complete, or any
3218 // PassThrough method's parameters.
3219 // SECURITY (FIND-R97-001): Remove method gate — JSON-RPC responses
3220 // (sampling/elicitation replies) have no `method` field but carry
3221 // data in `result`. Parity with stdio proxy FIND-R96-001.
3222 if state.response_dlp_enabled {
3223 let mut dlp_findings = scan_notification_for_secrets(&parsed);
3224 // SECURITY (FIND-R97-001): Also scan `result` field for responses.
3225 if let Some(result_val) = parsed.get("result") {
3226 dlp_findings.extend(scan_parameters_for_secrets(result_val));
3227 }
3228 // SECURITY (FIND-R83-006): Cap combined findings from params+result
3229 // scans to maintain per-scan invariant (1000).
3230 dlp_findings.truncate(1000);
3231 if !dlp_findings.is_empty() {
3232 for finding in &dlp_findings {
3233 record_dlp_finding(&finding.pattern_name);
3234 }
3235 let patterns: Vec<String> = dlp_findings
3236 .iter()
3237 .map(|f| format!("{}:{}", f.pattern_name, f.location))
3238 .collect();
3239 tracing::warn!(
3240 "SECURITY: Secrets in WS passthrough params! Session: {}, Findings: {:?}",
3241 session_id,
3242 patterns
3243 );
3244 let n_action = Action::new(
3245 "vellaveto",
3246 "notification_dlp_scan",
3247 json!({
3248 "findings": patterns,
3249 "session": session_id,
3250 "transport": "websocket",
3251 }),
3252 );
3253 let verdict = if state.response_dlp_blocking {
3254 Verdict::Deny {
3255 reason: format!(
3256 "Notification blocked: secrets detected ({:?})",
3257 patterns
3258 ),
3259 }
3260 } else {
3261 Verdict::Allow
3262 };
3263 if let Err(e) = state
3264 .audit
3265 .log_entry(
3266 &n_action,
3267 &verdict,
3268 json!({
3269 "source": "ws_proxy",
3270 "event": "notification_dlp_alert",
3271 "blocked": state.response_dlp_blocking,
3272 }),
3273 )
3274 .await
3275 {
3276 tracing::warn!("Failed to audit WS passthrough DLP: {}", e);
3277 }
3278 if state.response_dlp_blocking {
3279 // Drop the message silently (passthrough has no id to respond to)
3280 continue;
3281 }
3282 }
3283 }
3284
3285 // SECURITY (FIND-R130-001): Injection scanning on PassThrough parameters.
3286 // Parity with HTTP handler (handlers.rs FIND-R112-008) and gRPC handler
3287 // (service.rs FIND-R113-001). An agent could inject prompt injection
3288 // payloads via any PassThrough method's parameters.
3289 if !state.injection_disabled {
3290 let mut inj_parts = Vec::new();
3291 if let Some(params) = parsed.get("params") {
3292 extract_strings_recursive(params, &mut inj_parts, 0);
3293 }
3294 if let Some(result) = parsed.get("result") {
3295 extract_strings_recursive(result, &mut inj_parts, 0);
3296 }
3297 let scannable = inj_parts.join("\n");
3298 if !scannable.is_empty() {
3299 let injection_matches: Vec<String> =
3300 if let Some(ref scanner) = state.injection_scanner {
3301 scanner
3302 .inspect(&scannable)
3303 .into_iter()
3304 .map(|s| s.to_string())
3305 .collect()
3306 } else {
3307 inspect_for_injection(&scannable)
3308 .into_iter()
3309 .map(|s| s.to_string())
3310 .collect()
3311 };
3312
3313 if !injection_matches.is_empty() {
3314 tracing::warn!(
3315 "SECURITY: Injection in WS passthrough params! \
3316 Session: {}, Patterns: {:?}",
3317 session_id,
3318 injection_matches,
3319 );
3320
3321 let verdict = if state.injection_blocking {
3322 Verdict::Deny {
3323 reason: format!(
3324 "WS passthrough injection blocked: {:?}",
3325 injection_matches
3326 ),
3327 }
3328 } else {
3329 Verdict::Allow
3330 };
3331
3332 let inj_action = Action::new(
3333 "vellaveto",
3334 "ws_passthrough_injection_scan",
3335 json!({
3336 "matched_patterns": injection_matches,
3337 "session": session_id,
3338 "transport": "websocket",
3339 "direction": "client_to_upstream",
3340 }),
3341 );
3342 if let Err(e) = state
3343 .audit
3344 .log_entry(
3345 &inj_action,
3346 &verdict,
3347 json!({
3348 "source": "ws_proxy",
3349 "event": "ws_passthrough_injection_detected",
3350 "blocking": state.injection_blocking,
3351 }),
3352 )
3353 .await
3354 {
3355 tracing::warn!(
3356 "Failed to audit WS passthrough injection: {}",
3357 e
3358 );
3359 }
3360
3361 if state.injection_blocking {
3362 // Drop the message (passthrough has no id to respond to)
3363 continue;
3364 }
3365 }
3366 }
3367 }
3368
3369 // SECURITY (IMP-R182-009): Memory poisoning check — parity with
3370 // tool calls, resource reads, tasks, and extension methods.
3371 if let Some(mut session) = state.sessions.get_mut(&session_id) {
3372 let params_to_scan = parsed.get("params").cloned().unwrap_or(json!({}));
3373 // SECURITY (IMP-R184-010): Also scan `result` field — parity
3374 // with DLP scan which scans both params and result.
3375 let mut poisoning_matches =
3376 session.memory_tracker.check_parameters(¶ms_to_scan);
3377 if let Some(result_val) = parsed.get("result") {
3378 poisoning_matches
3379 .extend(session.memory_tracker.check_parameters(result_val));
3380 }
3381 if !poisoning_matches.is_empty() {
3382 let method_name = parsed
3383 .get("method")
3384 .and_then(|m| m.as_str())
3385 .unwrap_or("unknown");
3386 for m in &poisoning_matches {
3387 tracing::warn!(
3388 "SECURITY: Memory poisoning in WS passthrough '{}' (session {}): \
3389 param '{}' replayed data (fingerprint: {})",
3390 method_name,
3391 session_id,
3392 m.param_location,
3393 m.fingerprint
3394 );
3395 }
3396 let poison_action = Action::new(
3397 "vellaveto",
3398 "ws_passthrough_memory_poisoning",
3399 json!({
3400 "method": method_name,
3401 "session": session_id,
3402 "matches": poisoning_matches.len(),
3403 "transport": "websocket",
3404 }),
3405 );
3406 if let Err(e) = state
3407 .audit
3408 .log_entry(
3409 &poison_action,
3410 &Verdict::Deny {
3411 reason: format!(
3412 "WS passthrough blocked: memory poisoning ({} matches)",
3413 poisoning_matches.len()
3414 ),
3415 },
3416 json!({
3417 "source": "ws_proxy",
3418 "event": "ws_passthrough_memory_poisoning",
3419 }),
3420 )
3421 .await
3422 {
3423 tracing::warn!(
3424 "Failed to audit WS passthrough memory poisoning: {}",
3425 e
3426 );
3427 }
3428 continue; // Drop the message
3429 }
3430 // Fingerprint for future poisoning detection.
3431 session.memory_tracker.extract_from_value(¶ms_to_scan);
3432 if let Some(result_val) = parsed.get("result") {
3433 session.memory_tracker.extract_from_value(result_val);
3434 }
3435 } else {
3436 // IMP-R186-005: Log when session is missing so the skip is observable.
3437 tracing::warn!(
3438 "Session {} not found for WS passthrough memory poisoning check",
3439 session_id
3440 );
3441 }
3442
3443 // SECURITY (FIND-R46-WS-004): Audit log forwarded passthrough/notification messages
3444 let msg_type = match &classified {
3445 MessageType::ProgressNotification { .. } => "progress_notification",
3446 _ => "passthrough",
3447 };
3448 let action = Action::new(
3449 "vellaveto",
3450 "ws_forward_message",
3451 json!({
3452 "message_type": msg_type,
3453 "session": session_id,
3454 "transport": "websocket",
3455 "direction": "client_to_upstream",
3456 }),
3457 );
3458 if let Err(e) = state
3459 .audit
3460 .log_entry(
3461 &action,
3462 &Verdict::Allow,
3463 json!({
3464 "source": "ws_proxy",
3465 "event": "ws_message_forwarded",
3466 }),
3467 )
3468 .await
3469 {
3470 tracing::warn!("Failed to audit WS passthrough: {}", e);
3471 }
3472
3473 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
3474 let forward_text = if state.canonicalize {
3475 match serde_json::to_string(&parsed) {
3476 Ok(canonical) => canonical,
3477 Err(e) => {
3478 tracing::error!(
3479 "SECURITY: WS passthrough canonicalization failed: {}",
3480 e
3481 );
3482 continue;
3483 }
3484 }
3485 } else {
3486 text.to_string()
3487 };
3488 let mut sink = upstream_sink.lock().await;
3489 if let Err(e) = sink
3490 .send(tokio_tungstenite::tungstenite::Message::Text(
3491 forward_text.into(),
3492 ))
3493 .await
3494 {
3495 tracing::error!("Failed to forward passthrough: {}", e);
3496 break;
3497 }
3498 }
3499 }
3500 }
3501 Message::Binary(_data) => {
3502 // SECURITY: Binary frames not allowed for JSON-RPC
3503 tracing::warn!(
3504 session_id = %session_id,
3505 "Binary WebSocket frame rejected (JSON-RPC is text-only)"
3506 );
3507
3508 // SECURITY (FIND-R46-WS-004): Audit log binary frame rejection
3509 let action = Action::new(
3510 "vellaveto",
3511 "ws_binary_frame_rejected",
3512 json!({
3513 "session": session_id,
3514 "transport": "websocket",
3515 "direction": "client_to_upstream",
3516 }),
3517 );
3518 if let Err(e) = state
3519 .audit
3520 .log_entry(
3521 &action,
3522 &Verdict::Deny {
3523 reason: "Binary frames not supported for JSON-RPC".to_string(),
3524 },
3525 json!({
3526 "source": "ws_proxy",
3527 "event": "ws_binary_frame_rejected",
3528 }),
3529 )
3530 .await
3531 {
3532 tracing::warn!("Failed to audit WS binary frame rejection: {}", e);
3533 }
3534
3535 let mut sink = client_sink.lock().await;
3536 let _ = sink
3537 .send(Message::Close(Some(CloseFrame {
3538 code: CLOSE_UNSUPPORTED_DATA,
3539 reason: "Binary frames not supported".into(),
3540 })))
3541 .await;
3542 break;
3543 }
3544 Message::Ping(data) => {
3545 let mut sink = client_sink.lock().await;
3546 let _ = sink.send(Message::Pong(data)).await;
3547 }
3548 Message::Pong(_) => {
3549 // Ignored
3550 }
3551 Message::Close(_) => {
3552 tracing::debug!(session_id = %session_id, "Client sent close frame");
3553 break;
3554 }
3555 }
3556 }
3557}
3558
3559/// Relay messages from upstream to client with DLP and injection scanning.
3560#[allow(clippy::too_many_arguments)]
3561async fn relay_upstream_to_client(
3562 mut upstream_stream: futures_util::stream::SplitStream<
3563 tokio_tungstenite::WebSocketStream<
3564 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
3565 >,
3566 >,
3567 client_sink: Arc<Mutex<futures_util::stream::SplitSink<WebSocket, Message>>>,
3568 state: ProxyState,
3569 session_id: String,
3570 ws_config: WebSocketConfig,
3571 upstream_rate_counter: Arc<AtomicU64>,
3572 upstream_rate_window_start: Arc<std::sync::Mutex<std::time::Instant>>,
3573 last_activity: Arc<AtomicU64>,
3574 connection_epoch: std::time::Instant,
3575) {
3576 while let Some(msg_result) = upstream_stream.next().await {
3577 let msg = match msg_result {
3578 Ok(m) => m,
3579 Err(e) => {
3580 tracing::debug!(session_id = %session_id, "Upstream WS error: {}", e);
3581 break;
3582 }
3583 };
3584
3585 // SECURITY (FIND-R182-001): Update last-activity for true idle detection.
3586 last_activity.store(
3587 connection_epoch.elapsed().as_millis() as u64,
3588 Ordering::Relaxed,
3589 );
3590
3591 record_ws_message("upstream_to_client");
3592
3593 // SECURITY (FIND-R46-WS-003): Rate limiting on upstream→client direction.
3594 // A malicious or compromised upstream could flood the client with messages.
3595 if !check_rate_limit(
3596 &upstream_rate_counter,
3597 &upstream_rate_window_start,
3598 ws_config.upstream_rate_limit,
3599 ) {
3600 tracing::warn!(
3601 session_id = %session_id,
3602 "WebSocket upstream rate limit exceeded ({}/s), dropping message",
3603 ws_config.upstream_rate_limit,
3604 );
3605
3606 let action = Action::new(
3607 "vellaveto",
3608 "ws_upstream_rate_limit",
3609 json!({
3610 "session": session_id,
3611 "transport": "websocket",
3612 "direction": "upstream_to_client",
3613 "limit": ws_config.upstream_rate_limit,
3614 }),
3615 );
3616 if let Err(e) = state
3617 .audit
3618 .log_entry(
3619 &action,
3620 &Verdict::Deny {
3621 reason: "Upstream rate limit exceeded".to_string(),
3622 },
3623 json!({
3624 "source": "ws_proxy",
3625 "event": "ws_upstream_rate_limit_exceeded",
3626 }),
3627 )
3628 .await
3629 {
3630 tracing::warn!("Failed to audit WS upstream rate limit: {}", e);
3631 }
3632
3633 metrics::counter!(
3634 "vellaveto_ws_upstream_rate_limited_total",
3635 "session" => session_id.clone()
3636 )
3637 .increment(1);
3638
3639 // Drop the message (don't close the connection — upstream flood should not
3640 // disconnect the client, just throttle the flow)
3641 continue;
3642 }
3643
3644 match msg {
3645 tokio_tungstenite::tungstenite::Message::Text(text) => {
3646 // Try to parse for scanning
3647 let forward = if let Ok(json_val) = serde_json::from_str::<Value>(&text) {
3648 // Resolve tracked tool context for response-side schema checks.
3649 let tracked_tool_name =
3650 take_tracked_tool_call(&state.sessions, &session_id, json_val.get("id"));
3651
3652 // SECURITY (FIND-R75-003): Track whether DLP or injection was detected
3653 // (even in log-only mode) to gate memory_tracker.record_response().
3654 // Recording fingerprints from tainted responses would poison the tracker.
3655 let mut dlp_found = false;
3656 let mut injection_found = false;
3657
3658 // DLP scanning on responses
3659 if state.response_dlp_enabled {
3660 let dlp_findings = scan_response_for_secrets(&json_val);
3661 if !dlp_findings.is_empty() {
3662 dlp_found = true;
3663 for finding in &dlp_findings {
3664 record_dlp_finding(&finding.pattern_name);
3665 }
3666
3667 let patterns: Vec<String> = dlp_findings
3668 .iter()
3669 .map(|f| format!("{}:{}", f.pattern_name, f.location))
3670 .collect();
3671
3672 tracing::warn!(
3673 "SECURITY: Secrets in WS response! Session: {}, Findings: {:?}",
3674 session_id,
3675 patterns,
3676 );
3677
3678 let verdict = if state.response_dlp_blocking {
3679 Verdict::Deny {
3680 reason: format!("WS response DLP blocked: {:?}", patterns),
3681 }
3682 } else {
3683 Verdict::Allow
3684 };
3685
3686 let action = Action::new(
3687 "vellaveto",
3688 "ws_response_dlp_scan",
3689 json!({
3690 "findings": patterns,
3691 "session": session_id,
3692 "transport": "websocket",
3693 }),
3694 );
3695 if let Err(e) = state
3696 .audit
3697 .log_entry(
3698 &action,
3699 &verdict,
3700 json!({
3701 "source": "ws_proxy",
3702 "event": "ws_response_dlp_alert",
3703 }),
3704 )
3705 .await
3706 {
3707 tracing::warn!("Failed to audit WS DLP: {}", e);
3708 }
3709
3710 if state.response_dlp_blocking {
3711 // Send error response instead
3712 let id = json_val.get("id");
3713 let error = make_ws_error_response(
3714 id,
3715 -32001,
3716 "Response blocked by DLP policy",
3717 );
3718 let mut sink = client_sink.lock().await;
3719 let _ = sink.send(Message::Text(error.into())).await;
3720 continue;
3721 }
3722 }
3723 }
3724
3725 // Injection scanning
3726 if !state.injection_disabled {
3727 let text_to_scan = extract_scannable_text(&json_val);
3728 if !text_to_scan.is_empty() {
3729 let injection_matches: Vec<String> =
3730 if let Some(ref scanner) = state.injection_scanner {
3731 scanner
3732 .inspect(&text_to_scan)
3733 .into_iter()
3734 .map(|s| s.to_string())
3735 .collect()
3736 } else {
3737 inspect_for_injection(&text_to_scan)
3738 .into_iter()
3739 .map(|s| s.to_string())
3740 .collect()
3741 };
3742
3743 if !injection_matches.is_empty() {
3744 injection_found = true;
3745 tracing::warn!(
3746 "SECURITY: Injection in WS response! Session: {}, Patterns: {:?}",
3747 session_id,
3748 injection_matches,
3749 );
3750
3751 let verdict = if state.injection_blocking {
3752 Verdict::Deny {
3753 reason: format!(
3754 "WS response injection blocked: {:?}",
3755 injection_matches
3756 ),
3757 }
3758 } else {
3759 Verdict::Allow
3760 };
3761
3762 let action = Action::new(
3763 "vellaveto",
3764 "ws_response_injection",
3765 json!({
3766 "matched_patterns": injection_matches,
3767 "session": session_id,
3768 "transport": "websocket",
3769 }),
3770 );
3771 if let Err(e) = state
3772 .audit
3773 .log_entry(
3774 &action,
3775 &verdict,
3776 json!({
3777 "source": "ws_proxy",
3778 "event": "ws_injection_detected",
3779 }),
3780 )
3781 .await
3782 {
3783 tracing::warn!("Failed to audit WS injection: {}", e);
3784 }
3785
3786 if state.injection_blocking {
3787 let id = json_val.get("id");
3788 let error = make_ws_error_response(
3789 id,
3790 -32001,
3791 "Response blocked: injection detected",
3792 );
3793 let mut sink = client_sink.lock().await;
3794 let _ = sink.send(Message::Text(error.into())).await;
3795 continue;
3796 }
3797 }
3798 }
3799 }
3800
3801 // SECURITY (FIND-R46-007): Rug-pull detection on tools/list responses.
3802 // Check if this is a response to a tools/list request and extract
3803 // annotations for rug-pull detection.
3804 if json_val.get("result").is_some() {
3805 // Check if result contains "tools" array (tools/list response)
3806 if json_val
3807 .get("result")
3808 .and_then(|r| r.get("tools"))
3809 .and_then(|t| t.as_array())
3810 .is_some()
3811 {
3812 super::helpers::extract_annotations_from_response(
3813 &json_val,
3814 &session_id,
3815 &state.sessions,
3816 &state.audit,
3817 &state.known_tools,
3818 )
3819 .await;
3820
3821 // Verify manifest if configured
3822 if let Some(ref manifest_config) = state.manifest_config {
3823 super::helpers::verify_manifest_from_response(
3824 &json_val,
3825 &session_id,
3826 &state.sessions,
3827 manifest_config,
3828 &state.audit,
3829 )
3830 .await;
3831 }
3832
3833 // SECURITY (FIND-R130-003): Scan tool descriptions for embedded
3834 // injection. Parity with HTTP upstream handler (upstream.rs:648-698).
3835 if !state.injection_disabled {
3836 let desc_findings =
3837 if let Some(ref scanner) = state.injection_scanner {
3838 scan_tool_descriptions_with_scanner(&json_val, scanner)
3839 } else {
3840 scan_tool_descriptions(&json_val)
3841 };
3842 for finding in &desc_findings {
3843 injection_found = true;
3844 tracing::warn!(
3845 "SECURITY: Injection in tool '{}' description! \
3846 Session: {}, Patterns: {:?}",
3847 finding.tool_name,
3848 session_id,
3849 finding.matched_patterns
3850 );
3851 let action = Action::new(
3852 "vellaveto",
3853 "tool_description_injection",
3854 json!({
3855 "tool": finding.tool_name,
3856 "matched_patterns": finding.matched_patterns,
3857 "session": session_id,
3858 "transport": "websocket",
3859 "blocking": state.injection_blocking,
3860 }),
3861 );
3862 if let Err(e) = state
3863 .audit
3864 .log_entry(
3865 &action,
3866 &Verdict::Deny {
3867 reason: format!(
3868 "Tool '{}' description contains injection: {:?}",
3869 finding.tool_name, finding.matched_patterns
3870 ),
3871 },
3872 json!({
3873 "source": "ws_proxy",
3874 "event": "tool_description_injection",
3875 }),
3876 )
3877 .await
3878 {
3879 tracing::warn!(
3880 "Failed to audit WS tool description injection: {}",
3881 e
3882 );
3883 }
3884 }
3885 if !desc_findings.is_empty() && state.injection_blocking {
3886 let id = json_val.get("id");
3887 let error = make_ws_error_response(
3888 id,
3889 -32001,
3890 "Response blocked: suspicious content in tool descriptions",
3891 );
3892 let mut sink = client_sink.lock().await;
3893 let _ = sink.send(Message::Text(error.into())).await;
3894 continue;
3895 }
3896 }
3897 }
3898 }
3899
3900 // SECURITY: Enforce output schema on WS structuredContent.
3901 // SECURITY (FIND-R154-004): Track schema violations for the
3902 // record_response guard below, even in non-blocking mode.
3903 let schema_violation_found = validate_ws_structured_content_response(
3904 &json_val,
3905 &state,
3906 &session_id,
3907 tracked_tool_name.as_deref(),
3908 )
3909 .await;
3910 if schema_violation_found {
3911 let id = json_val.get("id");
3912 let error = make_ws_error_response(
3913 id,
3914 -32001,
3915 "Response blocked: output schema validation failed",
3916 );
3917 let mut sink = client_sink.lock().await;
3918 let _ = sink.send(Message::Text(error.into())).await;
3919 continue;
3920 }
3921
3922 // SECURITY (FIND-R75-003, FIND-R154-004): Record response
3923 // fingerprints for memory poisoning detection. Skip recording
3924 // when DLP, injection, or schema violation was detected (even
3925 // in log-only mode) to avoid poisoning the tracker with tainted
3926 // data. Parity with stdio relay (relay.rs:2919).
3927 if !dlp_found && !injection_found && !schema_violation_found {
3928 if let Some(mut session) = state.sessions.get_mut(&session_id) {
3929 session.memory_tracker.record_response(&json_val);
3930 }
3931 }
3932
3933 // SECURITY (FIND-R46-WS-004): Audit log forwarded upstream→client text messages
3934 {
3935 let msg_type = if json_val.get("result").is_some() {
3936 "response"
3937 } else if json_val.get("error").is_some() {
3938 "error_response"
3939 } else if json_val.get("method").is_some() {
3940 "notification"
3941 } else {
3942 "unknown"
3943 };
3944 let action = Action::new(
3945 "vellaveto",
3946 "ws_forward_upstream_message",
3947 json!({
3948 "message_type": msg_type,
3949 "session": session_id,
3950 "transport": "websocket",
3951 "direction": "upstream_to_client",
3952 }),
3953 );
3954 if let Err(e) = state
3955 .audit
3956 .log_entry(
3957 &action,
3958 &Verdict::Allow,
3959 json!({
3960 "source": "ws_proxy",
3961 "event": "ws_upstream_message_forwarded",
3962 }),
3963 )
3964 .await
3965 {
3966 tracing::warn!("Failed to audit WS upstream message forward: {}", e);
3967 }
3968 }
3969
3970 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
3971 if state.canonicalize {
3972 match serde_json::to_string(&json_val) {
3973 Ok(canonical) => canonical,
3974 Err(e) => {
3975 tracing::error!(
3976 "SECURITY: WS response canonicalization failed: {}",
3977 e
3978 );
3979 continue;
3980 }
3981 }
3982 } else {
3983 text.to_string()
3984 }
3985 } else {
3986 // SECURITY (FIND-R166-001): Non-JSON upstream text must still be
3987 // scanned for DLP/injection before forwarding. A malicious upstream
3988 // could exfiltrate secrets or inject payloads via non-JSON frames.
3989 let text_str: &str = &text;
3990 // SECURITY (FIND-R168-001): DLP scan with audit logging parity.
3991 if state.response_dlp_enabled {
3992 let findings = scan_text_for_secrets(text_str, "ws.upstream.non_json_text");
3993 if !findings.is_empty() {
3994 let patterns: Vec<String> = findings
3995 .iter()
3996 .map(|f| format!("{}:{}", f.pattern_name, f.location))
3997 .collect();
3998 tracing::warn!(
3999 session_id = %session_id,
4000 "DLP: non-JSON upstream text contains sensitive data ({} findings)",
4001 findings.len(),
4002 );
4003 let verdict = if state.response_dlp_blocking {
4004 Verdict::Deny {
4005 reason: format!("WS non-JSON DLP: {:?}", patterns),
4006 }
4007 } else {
4008 Verdict::Allow
4009 };
4010 let action = Action::new(
4011 "vellaveto",
4012 "ws_nonjson_dlp_scan",
4013 json!({ "findings": patterns, "session": session_id, "transport": "websocket" }),
4014 );
4015 // SECURITY (SE-004): Log audit failures instead of silently discarding.
4016 if let Err(e) = state.audit.log_entry(
4017 &action, &verdict,
4018 json!({ "source": "ws_proxy", "event": "ws_nonjson_dlp_alert" }),
4019 ).await {
4020 tracing::error!(
4021 session_id = %session_id,
4022 error = %e,
4023 "AUDIT FAILURE: failed to log ws_nonjson_dlp_alert"
4024 );
4025 }
4026 if state.response_dlp_blocking {
4027 continue;
4028 }
4029 }
4030 }
4031 // SECURITY (FIND-R168-002): Injection scan with log-only mode
4032 // parity. Always log detections; only block when injection_blocking.
4033 {
4034 let alerts: Vec<String> = if let Some(ref scanner) = state.injection_scanner
4035 {
4036 scanner
4037 .inspect(text_str)
4038 .into_iter()
4039 .map(|s| s.to_string())
4040 .collect()
4041 } else {
4042 inspect_for_injection(text_str)
4043 .into_iter()
4044 .map(|s| s.to_string())
4045 .collect()
4046 };
4047 if !alerts.is_empty() {
4048 tracing::warn!(
4049 session_id = %session_id,
4050 "Injection: non-JSON upstream text ({} alerts), blocking={}",
4051 alerts.len(), state.injection_blocking,
4052 );
4053 let verdict = if state.injection_blocking {
4054 Verdict::Deny {
4055 reason: format!(
4056 "WS non-JSON injection: {} alerts",
4057 alerts.len()
4058 ),
4059 }
4060 } else {
4061 Verdict::Allow
4062 };
4063 let action = Action::new(
4064 "vellaveto",
4065 "ws_nonjson_injection_scan",
4066 json!({ "alerts": alerts.len(), "session": session_id, "transport": "websocket" }),
4067 );
4068 // SECURITY (SE-004): Log audit failures instead of silently discarding.
4069 if let Err(e) = state.audit.log_entry(
4070 &action, &verdict,
4071 json!({ "source": "ws_proxy", "event": "ws_nonjson_injection_alert" }),
4072 ).await {
4073 tracing::error!(
4074 session_id = %session_id,
4075 error = %e,
4076 "AUDIT FAILURE: failed to log ws_nonjson_injection_alert"
4077 );
4078 }
4079 if state.injection_blocking {
4080 continue;
4081 }
4082 }
4083 }
4084 text.to_string()
4085 };
4086
4087 let mut sink = client_sink.lock().await;
4088 if let Err(e) = sink.send(Message::Text(forward.into())).await {
4089 tracing::debug!("Failed to send to client: {}", e);
4090 break;
4091 }
4092 }
4093 tokio_tungstenite::tungstenite::Message::Binary(data) => {
4094 // SECURITY (FIND-R46-WS-002): DLP scanning on upstream binary frames.
4095 // Binary from upstream is unusual for JSON-RPC but must be scanned
4096 // before being dropped, to detect and audit secret exfiltration attempts
4097 // via binary frames.
4098 tracing::warn!(
4099 session_id = %session_id,
4100 "Unexpected binary frame from upstream ({} bytes), scanning before drop",
4101 data.len(),
4102 );
4103
4104 // DLP scan the binary data as UTF-8 lossy
4105 if state.response_dlp_enabled {
4106 let text_repr = String::from_utf8_lossy(&data);
4107 if !text_repr.is_empty() {
4108 let dlp_findings = scan_text_for_secrets(&text_repr, "ws_binary_frame");
4109 if !dlp_findings.is_empty() {
4110 for finding in &dlp_findings {
4111 record_dlp_finding(&finding.pattern_name);
4112 }
4113 let patterns: Vec<String> = dlp_findings
4114 .iter()
4115 .map(|f| format!("{}:{}", f.pattern_name, f.location))
4116 .collect();
4117
4118 tracing::warn!(
4119 "SECURITY: Secrets in WS upstream binary frame! Session: {}, Findings: {:?}",
4120 session_id,
4121 patterns,
4122 );
4123
4124 let action = Action::new(
4125 "vellaveto",
4126 "ws_binary_dlp_scan",
4127 json!({
4128 "findings": patterns,
4129 "session": session_id,
4130 "transport": "websocket",
4131 "direction": "upstream_to_client",
4132 "binary_size": data.len(),
4133 }),
4134 );
4135 if let Err(e) = state
4136 .audit
4137 .log_entry(
4138 &action,
4139 &Verdict::Deny {
4140 reason: format!("WS binary frame DLP: {:?}", patterns),
4141 },
4142 json!({
4143 "source": "ws_proxy",
4144 "event": "ws_binary_dlp_alert",
4145 }),
4146 )
4147 .await
4148 {
4149 tracing::warn!("Failed to audit WS binary DLP: {}", e);
4150 }
4151 }
4152 }
4153 }
4154
4155 // SECURITY (FIND-R46-WS-004): Audit log binary frame drop
4156 let action = Action::new(
4157 "vellaveto",
4158 "ws_upstream_binary_dropped",
4159 json!({
4160 "session": session_id,
4161 "transport": "websocket",
4162 "direction": "upstream_to_client",
4163 "binary_size": data.len(),
4164 }),
4165 );
4166 if let Err(e) = state
4167 .audit
4168 .log_entry(
4169 &action,
4170 &Verdict::Deny {
4171 reason: "Binary frames not supported for JSON-RPC".to_string(),
4172 },
4173 json!({
4174 "source": "ws_proxy",
4175 "event": "ws_upstream_binary_dropped",
4176 }),
4177 )
4178 .await
4179 {
4180 tracing::warn!("Failed to audit WS upstream binary drop: {}", e);
4181 }
4182 }
4183 tokio_tungstenite::tungstenite::Message::Ping(data) => {
4184 // Forward ping as pong to upstream (handled by tungstenite)
4185 let _ = data; // tungstenite auto-responds to pings
4186 }
4187 tokio_tungstenite::tungstenite::Message::Pong(_) => {}
4188 tokio_tungstenite::tungstenite::Message::Close(_) => {
4189 tracing::debug!(session_id = %session_id, "Upstream sent close frame");
4190 break;
4191 }
4192 tokio_tungstenite::tungstenite::Message::Frame(_) => {
4193 // Raw frame — ignore
4194 }
4195 }
4196 }
4197}
4198
4199/// Convert an HTTP URL to a WebSocket URL.
4200///
4201/// `http://` → `ws://`, `https://` → `wss://`.
4202///
4203/// SECURITY (FIND-R124-001): Only allows http/https/ws/wss schemes.
4204/// Unknown schemes (ftp://, file://, gopher://) are rejected with a
4205/// warning and fall back to the original URL prefixed with `ws://`
4206/// to maintain fail-closed behavior. This gives parity with scheme
4207/// validation in HTTP and gRPC transports (FIND-R42-015).
4208pub fn convert_to_ws_url(http_url: &str) -> String {
4209 if let Some(rest) = http_url.strip_prefix("https://") {
4210 format!("wss://{}", rest)
4211 } else if let Some(rest) = http_url.strip_prefix("http://") {
4212 format!("ws://{}", rest)
4213 } else if http_url.starts_with("wss://") || http_url.starts_with("ws://") {
4214 // Already a WebSocket URL — use as-is
4215 http_url.to_string()
4216 } else {
4217 // SECURITY (FIND-R124-001): Reject unknown schemes. Log warning
4218 // and return a URL that will fail to connect safely rather than
4219 // connecting to an unintended scheme (e.g., ftp://, file://).
4220 // SECURITY (FIND-R166-003): Sanitize logged value to prevent log injection
4221 // from URLs with control characters (possible in gateway mode).
4222 tracing::warn!(
4223 "convert_to_ws_url: rejecting URL with unsupported scheme: {}",
4224 vellaveto_types::sanitize_for_log(
4225 http_url.split("://").next().unwrap_or("unknown"),
4226 128,
4227 )
4228 );
4229 // Return invalid URL that will fail at connect_async()
4230 format!("ws://invalid-scheme-rejected.localhost/{}", http_url.len())
4231 }
4232}
4233
4234/// Connect to an upstream WebSocket server.
4235///
4236/// Returns the split WebSocket stream or an error.
4237async fn connect_upstream_ws(
4238 url: &str,
4239) -> Result<
4240 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
4241 String,
4242> {
4243 let connect_timeout = Duration::from_secs(10);
4244 match tokio::time::timeout(connect_timeout, tokio_tungstenite::connect_async(url)).await {
4245 Ok(Ok((ws_stream, _response))) => Ok(ws_stream),
4246 Ok(Err(e)) => Err(format!("WebSocket connection error: {}", e)),
4247 Err(_) => Err("WebSocket connection timeout (10s)".to_string()),
4248 }
4249}
4250
4251/// Register output schemas and validate WS response `structuredContent`.
4252///
4253/// Returns true when the response should be blocked.
4254async fn validate_ws_structured_content_response(
4255 json_val: &Value,
4256 state: &ProxyState,
4257 session_id: &str,
4258 tracked_tool_name: Option<&str>,
4259) -> bool {
4260 // Keep WS behavior aligned with HTTP/SSE paths.
4261 state
4262 .output_schema_registry
4263 .register_from_tools_list(json_val);
4264
4265 let Some(result) = json_val.get("result") else {
4266 return false;
4267 };
4268 let Some(structured) = result.get("structuredContent") else {
4269 return false;
4270 };
4271
4272 let meta_tool_name = result
4273 .get("_meta")
4274 .and_then(|m| m.get("tool"))
4275 .and_then(|t| t.as_str());
4276 let tool_name = match (meta_tool_name, tracked_tool_name) {
4277 (Some(meta), Some(tracked)) if !meta.eq_ignore_ascii_case(tracked) => {
4278 tracing::warn!(
4279 "SECURITY: WS structuredContent tool mismatch (meta='{}', tracked='{}'); using tracked tool name",
4280 meta,
4281 tracked
4282 );
4283 tracked
4284 }
4285 (Some(meta), _) => meta,
4286 (None, Some(tracked)) => tracked,
4287 (None, None) => "unknown",
4288 };
4289
4290 match state.output_schema_registry.validate(tool_name, structured) {
4291 ValidationResult::Invalid { violations } => {
4292 tracing::warn!(
4293 "SECURITY: WS structuredContent validation failed for tool '{}': {:?}",
4294 tool_name,
4295 violations
4296 );
4297 let action = Action::new(
4298 "vellaveto",
4299 "output_schema_violation",
4300 json!({
4301 "tool": tool_name,
4302 "violations": violations,
4303 "session": session_id,
4304 "transport": "websocket",
4305 }),
4306 );
4307 if let Err(e) = state
4308 .audit
4309 .log_entry(
4310 &action,
4311 &Verdict::Deny {
4312 reason: format!("WS structuredContent validation failed: {:?}", violations),
4313 },
4314 json!({"source": "ws_proxy", "event": "output_schema_violation_ws"}),
4315 )
4316 .await
4317 {
4318 tracing::warn!("Failed to audit WS output schema violation: {}", e);
4319 }
4320 true
4321 }
4322 ValidationResult::Valid => {
4323 tracing::debug!("WS structuredContent validated for tool '{}'", tool_name);
4324 false
4325 }
4326 ValidationResult::NoSchema => {
4327 tracing::debug!(
4328 "No output schema registered for WS tool '{}', skipping validation",
4329 tool_name
4330 );
4331 false
4332 }
4333 }
4334}
4335
4336// NOTE: build_ws_evaluation_context() was removed in FIND-R130-002 fix.
4337// All callers now build EvaluationContext inline inside the DashMap shard
4338// lock to prevent TOCTOU races on call_counts/action_history.
4339
4340/// Check per-connection rate limit. Returns true if within limit.
4341fn check_rate_limit(
4342 counter: &AtomicU64,
4343 window_start: &std::sync::Mutex<std::time::Instant>,
4344 max_per_sec: u32,
4345) -> bool {
4346 // SECURITY (FIND-R182-006): Fail-closed — zero rate limit blocks all messages.
4347 // Previously returned true (fail-open), which disabled rate limiting entirely.
4348 if max_per_sec == 0 {
4349 return false;
4350 }
4351
4352 let now = std::time::Instant::now();
4353 let mut start = match window_start.lock() {
4354 Ok(guard) => guard,
4355 Err(e) => {
4356 tracing::error!("WS rate limiter mutex poisoned — fail-closed: {}", e);
4357 return false;
4358 }
4359 };
4360
4361 if now.duration_since(*start) >= Duration::from_secs(1) {
4362 // Reset window
4363 *start = now;
4364 // SECURITY (FIND-R55-WS-003): Use SeqCst for security-critical rate limit counter.
4365 counter.store(1, Ordering::SeqCst);
4366 true
4367 } else {
4368 // SECURITY (FIND-R182-003): saturating arithmetic prevents overflow wrap-to-zero.
4369 // SECURITY (FIND-R155-WS-001): Conditional atomic increment — only increment if
4370 // within limit, reject otherwise. This eliminates the TOCTOU gap between
4371 // load()+compare and also prevents counter inflation from rejected requests.
4372 // The closure returns None when limit is reached, causing fetch_update to fail
4373 // without modifying the counter.
4374 let limit = max_per_sec as u64;
4375 match counter.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
4376 if v >= limit {
4377 None // Limit reached — do not increment
4378 } else {
4379 Some(v.saturating_add(1))
4380 }
4381 }) {
4382 Ok(_prev) => true, // Within limit, counter was incremented
4383 Err(_) => false, // Limit exceeded, counter unchanged
4384 }
4385 }
4386}
4387
4388/// Extract scannable text from a JSON-RPC request for injection scanning.
4389///
4390/// SECURITY (FIND-R46-WS-001): Scans tool call arguments, resource URIs,
4391/// and sampling request content for injection payloads in the client→upstream
4392/// direction. Matches the HTTP proxy's request-side injection scanning.
4393fn extract_scannable_text_from_request(json_val: &Value) -> String {
4394 let mut text_parts = Vec::new();
4395
4396 // SECURITY (FIND-R224-002): Recursively scan the entire `params` subtree,
4397 // not just specific fields. Previous narrow extraction missed injection
4398 // payloads in TaskRequest (params.message), ExtensionMethod, and other
4399 // message types with non-standard parameter structures. This gives parity
4400 // with the HTTP handler's `extract_passthrough_text_for_injection` which
4401 // scans all of params recursively.
4402 if let Some(params) = json_val.get("params") {
4403 extract_strings_recursive(params, &mut text_parts, 0);
4404 }
4405
4406 // SECURITY (FIND-R224-006): Also scan `result` field for injection payloads.
4407 // JSON-RPC response messages (sampling/elicitation replies) carry data in
4408 // `result` rather than `params`. Without this, upstream injection payloads
4409 // in response results bypass WS scanning while being caught by HTTP/gRPC.
4410 if let Some(result) = json_val.get("result") {
4411 extract_strings_recursive(result, &mut text_parts, 0);
4412 }
4413
4414 text_parts.join("\n")
4415}
4416
4417/// Recursively extract string values from a JSON value, with depth and count bounds.
4418///
4419/// SECURITY (FIND-R48-007): Added MAX_PARTS to prevent memory amplification
4420/// from messages containing many short strings.
4421fn extract_strings_recursive(val: &Value, parts: &mut Vec<String>, depth: usize) {
4422 // SECURITY (FIND-R154-005): Use depth 32 matching shared MAX_SCAN_DEPTH
4423 // in scanner_base.rs. Previous limit of 10 allowed injection payloads
4424 // nested between depth 11-32 to evade WS scanning while being caught
4425 // by the stdio relay and DLP scanner (both use MAX_SCAN_DEPTH=32).
4426 const MAX_DEPTH: usize = 32;
4427 const MAX_PARTS: usize = 1000;
4428 if depth > MAX_DEPTH || parts.len() >= MAX_PARTS {
4429 return;
4430 }
4431 match val {
4432 Value::String(s) => parts.push(s.clone()),
4433 Value::Array(arr) => {
4434 for item in arr {
4435 extract_strings_recursive(item, parts, depth + 1);
4436 }
4437 }
4438 Value::Object(map) => {
4439 for (key, v) in map {
4440 // SECURITY (FIND-R154-003): Also scan object keys for injection
4441 // payloads. Parity with stdio relay's traverse_json_strings_with_keys.
4442 // Without this, attackers can hide injection in JSON key names.
4443 if parts.len() < MAX_PARTS {
4444 parts.push(key.clone());
4445 }
4446 extract_strings_recursive(v, parts, depth + 1);
4447 }
4448 }
4449 _ => {}
4450 }
4451}
4452
4453/// Extract scannable text from a JSON-RPC response for injection scanning.
4454///
4455/// SECURITY (FIND-R130-004): Delegates to the shared `extract_text_from_result()`
4456/// which covers `resource.text`, `resource.blob` (base64-decoded), `annotations`,
4457/// and `_meta` — all missing from the previous WS-only implementation.
4458fn extract_scannable_text(json_val: &Value) -> String {
4459 let mut text_parts = Vec::new();
4460
4461 // Scan result via shared extraction (covers content[].text, resource.text,
4462 // resource.blob, annotations, instructionsForUser, structuredContent, _meta).
4463 if let Some(result) = json_val.get("result") {
4464 let result_text = super::inspection::extract_text_from_result(result);
4465 if !result_text.is_empty() {
4466 text_parts.push(result_text);
4467 }
4468 }
4469
4470 // Scan error messages (not covered by extract_text_from_result)
4471 if let Some(error) = json_val.get("error") {
4472 if let Some(msg) = error.get("message").and_then(|m| m.as_str()) {
4473 text_parts.push(msg.to_string());
4474 }
4475 // SECURITY (FIND-R168-005): Use as_str() first to avoid wrapping
4476 // string values in JSON quotes. Parity with scanner_base.rs.
4477 if let Some(data) = error.get("data") {
4478 if let Some(s) = data.as_str() {
4479 text_parts.push(s.to_string());
4480 } else {
4481 text_parts.push(data.to_string());
4482 }
4483 }
4484 }
4485
4486 text_parts.join("\n")
4487}
4488
4489/// Create a pending approval for WebSocket-denied actions when an approval store
4490/// is configured. Returns the pending approval ID on success.
4491async fn create_ws_approval(
4492 state: &ProxyState,
4493 session_id: &str,
4494 action: &Action,
4495 reason: &str,
4496) -> Option<String> {
4497 let store = state.approval_store.as_ref()?;
4498 let requested_by = state.sessions.get_mut(session_id).and_then(|session| {
4499 session
4500 .agent_identity
4501 .as_ref()
4502 .and_then(|identity| identity.subject.clone())
4503 .or_else(|| session.oauth_subject.clone())
4504 });
4505 match store
4506 .create(action.clone(), reason.to_string(), requested_by)
4507 .await
4508 {
4509 Ok(id) => Some(id),
4510 Err(e) => {
4511 tracing::error!(
4512 session_id = %session_id,
4513 "Failed to create WebSocket approval (fail-closed): {}",
4514 e
4515 );
4516 None
4517 }
4518 }
4519}
4520
4521/// Build a JSON-RPC error response string for WebSocket with optional `error.data`.
4522fn make_ws_error_response_with_data(
4523 id: Option<&Value>,
4524 code: i64,
4525 message: &str,
4526 data: Option<Value>,
4527) -> String {
4528 let mut error = serde_json::Map::new();
4529 error.insert("code".to_string(), Value::from(code));
4530 error.insert("message".to_string(), Value::from(message));
4531 if let Some(data) = data {
4532 error.insert("data".to_string(), data);
4533 }
4534 let response = json!({
4535 "jsonrpc": "2.0",
4536 "id": id.cloned().unwrap_or(Value::Null),
4537 "error": Value::Object(error),
4538 });
4539 serde_json::to_string(&response).unwrap_or_else(|_| {
4540 format!(
4541 r#"{{"jsonrpc":"2.0","error":{{"code":{},"message":"{}"}},"id":null}}"#,
4542 code, message
4543 )
4544 })
4545}
4546
4547/// Build a JSON-RPC error response string for WebSocket.
4548fn make_ws_error_response(id: Option<&Value>, code: i64, message: &str) -> String {
4549 make_ws_error_response_with_data(id, code, message, None)
4550}
4551
4552#[cfg(test)]
4553mod tests;