Skip to main content

vellaveto_http_proxy/proxy/websocket/
mod.rs

1// Copyright 2026 Paolo Vella
2// SPDX-License-Identifier: BUSL-1.1
3//
4// Use of this software is governed by the Business Source License
5// included in the LICENSE-BSL-1.1 file at the root of this repository.
6//
7// Change Date: Three years from the date of publication of this version.
8// Change License: MPL-2.0
9
10//! WebSocket transport for MCP JSON-RPC messages (SEP-1288).
11//!
12//! Implements a WebSocket reverse proxy that sits between MCP clients and
13//! an upstream MCP server. WebSocket messages (text frames) are parsed as
14//! JSON-RPC, classified via `vellaveto_mcp::extractor`, evaluated against
15//! loaded policies, and forwarded to the upstream server.
16//!
17//! Security invariants:
18//! - **Fail-closed**: Unparseable messages close the connection (code 1008).
19//! - **No binary frames**: Only text frames are accepted (code 1003 for binary).
20//! - **Session binding**: Each WS connection is bound to exactly one session.
21//! - **Canonicalization**: Re-serialized JSON forwarded (TOCTOU defense).
22
23use axum::{
24    extract::{
25        ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade},
26        ConnectInfo, State,
27    },
28    http::HeaderMap,
29    response::Response,
30};
31use futures_util::{SinkExt, StreamExt};
32use serde_json::{json, Value};
33use std::net::SocketAddr;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::sync::Mutex;
38use vellaveto_mcp::extractor::{self, MessageType};
39use vellaveto_mcp::inspection::{
40    inspect_for_injection, scan_notification_for_secrets, scan_parameters_for_secrets,
41    scan_response_for_secrets, scan_text_for_secrets, scan_tool_descriptions,
42    scan_tool_descriptions_with_scanner,
43};
44use vellaveto_mcp::output_validation::ValidationResult;
45use vellaveto_types::{Action, EvaluationContext, Verdict};
46
47use super::auth::{validate_agent_identity, validate_api_key, validate_oauth};
48use super::call_chain::{
49    check_privilege_escalation, sync_session_call_chain_from_headers, take_tracked_tool_call,
50    track_pending_tool_call,
51};
52use super::origin::validate_origin;
53use super::ProxyState;
54use crate::proxy_metrics::record_dlp_finding;
55
56/// Configuration for WebSocket transport.
57#[derive(Debug, Clone)]
58pub struct WebSocketConfig {
59    /// Maximum message size in bytes (default: 1 MB).
60    pub max_message_size: usize,
61    /// Idle timeout in seconds — close connection after no message activity (default: 300s).
62    /// SECURITY (FIND-R182-001): True idle timeout that resets on every message.
63    pub idle_timeout_secs: u64,
64    /// Maximum messages per second per connection for client-to-upstream (default: 100).
65    pub message_rate_limit: u32,
66    /// Maximum messages per second per connection for upstream-to-client (default: 500).
67    /// SECURITY (FIND-R46-WS-003): Rate limits on the upstream→client direction prevent
68    /// a malicious upstream from flooding the client with responses.
69    pub upstream_rate_limit: u32,
70}
71
72impl Default for WebSocketConfig {
73    fn default() -> Self {
74        Self {
75            max_message_size: 1_048_576,
76            idle_timeout_secs: 300,
77            message_rate_limit: 100,
78            upstream_rate_limit: 500,
79        }
80    }
81}
82
83/// WebSocket close codes per RFC 6455.
84const CLOSE_POLICY_VIOLATION: u16 = 1008;
85const CLOSE_UNSUPPORTED_DATA: u16 = 1003;
86/// Close code for oversized messages. Used by axum's `max_message_size`
87/// automatically; kept here for documentation and test assertions.
88#[cfg(test)]
89const CLOSE_MESSAGE_TOO_BIG: u16 = 1009;
90const CLOSE_NORMAL: u16 = 1000;
91
92/// Global WebSocket metrics counters.
93static WS_CONNECTIONS_TOTAL: AtomicU64 = AtomicU64::new(0);
94static WS_MESSAGES_TOTAL: AtomicU64 = AtomicU64::new(0);
95
96/// Record WebSocket connection metric.
97fn record_ws_connection() {
98    // SECURITY (FIND-R182-003): Use saturating arithmetic to prevent overflow.
99    // SECURITY (CA-005): SeqCst for security-adjacent metrics counters.
100    let _ = WS_CONNECTIONS_TOTAL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
101        Some(v.saturating_add(1))
102    });
103    metrics::counter!("vellaveto_ws_connections_total").increment(1);
104}
105
106/// Record WebSocket message metric.
107fn record_ws_message(direction: &str) {
108    // SECURITY (FIND-R182-003): Use saturating arithmetic to prevent overflow.
109    // SECURITY (CA-005): SeqCst for security-adjacent metrics counters.
110    let _ = WS_MESSAGES_TOTAL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
111        Some(v.saturating_add(1))
112    });
113    metrics::counter!(
114        "vellaveto_ws_messages_total",
115        "direction" => direction.to_string()
116    )
117    .increment(1);
118}
119
120/// Get current connection count (for testing).
121#[cfg(test)]
122pub(crate) fn ws_connections_count() -> u64 {
123    WS_CONNECTIONS_TOTAL.load(Ordering::SeqCst)
124}
125
126/// Get current message count (for testing).
127#[cfg(test)]
128pub(crate) fn ws_messages_count() -> u64 {
129    WS_MESSAGES_TOTAL.load(Ordering::SeqCst)
130}
131
132use vellaveto_types::is_unicode_format_char as is_unicode_format_char_ws;
133
134/// Query parameters for the WebSocket upgrade endpoint.
135#[derive(Debug, serde::Deserialize, Default)]
136#[serde(deny_unknown_fields)]
137pub struct WsQueryParams {
138    /// Optional session ID for session resumption.
139    #[serde(default)]
140    pub session_id: Option<String>,
141}
142
143/// Handle WebSocket upgrade request at `/mcp/ws`.
144///
145/// Authenticates the request, validates origin, creates/resumes a session,
146/// and upgrades the HTTP connection to a WebSocket.
147pub async fn handle_ws_upgrade(
148    State(state): State<ProxyState>,
149    ConnectInfo(addr): ConnectInfo<SocketAddr>,
150    headers: HeaderMap,
151    query: axum::extract::Query<WsQueryParams>,
152    ws: WebSocketUpgrade,
153) -> Response {
154    // 1. Validate origin (CSRF / DNS rebinding defense)
155    if let Err(resp) = validate_origin(&headers, &state.bind_addr, &state.allowed_origins) {
156        return resp;
157    }
158
159    // 2. Authenticate before upgrade (API key or OAuth)
160    if let Err(resp) = validate_api_key(&state, &headers) {
161        return resp;
162    }
163
164    // SECURITY (FIND-R53-WS-001): Validate OAuth token at upgrade time.
165    // Parity with HTTP POST (handlers.rs:154) and GET (handlers.rs:2864).
166    // Without this, WS connections bypass token expiry checks.
167    let oauth_claims = match validate_oauth(
168        &state,
169        &headers,
170        "GET",
171        &super::auth::build_effective_request_uri(
172            &headers,
173            state.bind_addr,
174            &axum::http::Uri::from_static("/mcp/ws"),
175            false,
176        ),
177        query.session_id.as_deref(),
178    )
179    .await
180    {
181        Ok(claims) => claims,
182        Err(response) => return response,
183    };
184
185    // SECURITY (FIND-R53-WS-002, FIND-R159-003): Validate agent identity at upgrade time.
186    // Parity with HTTP POST (handlers.rs:160) and GET (handlers.rs:2871).
187    // FIND-R159-003: Identity MUST be stored in session (was previously discarded with `_`).
188    let agent_identity = match validate_agent_identity(&state, &headers).await {
189        Ok(identity) => identity,
190        Err(response) => return response,
191    };
192
193    // SECURITY (FIND-R55-WS-004, FIND-R81-001): Validate session_id length and
194    // control/format characters from query parameter. Parity with HTTP POST/GET
195    // handlers (handlers.rs:154, handlers.rs:2928) which reject control chars.
196    // SECURITY (FIND-R81-WS-001): Also reject Unicode format characters (zero-width,
197    // bidi overrides, BOM) that can bypass string-based security checks.
198    let ws_session_id = query.session_id.as_deref().filter(|id| {
199        !id.is_empty()
200            && id.len() <= 128
201            && !id
202                .chars()
203                .any(|c| c.is_control() || is_unicode_format_char_ws(c))
204    });
205
206    // 3. Get or create session
207    let session_id = state.sessions.get_or_create(ws_session_id);
208
209    // SECURITY (FIND-R53-WS-003): Session ownership binding — prevent session fixation.
210    // Parity with HTTP GET (handlers.rs:2914-2953).
211    if let Some(ref claims) = oauth_claims {
212        if let Some(mut session) = state.sessions.get_mut(&session_id) {
213            match &session.oauth_subject {
214                Some(owner) if owner != &claims.sub => {
215                    tracing::warn!(
216                        session_id = %session_id,
217                        owner = %owner,
218                        requester = %claims.sub,
219                        "WS upgrade rejected: session owned by different principal"
220                    );
221                    return axum::response::IntoResponse::into_response((
222                        axum::http::StatusCode::FORBIDDEN,
223                        axum::Json(json!({
224                            "error": "Session belongs to another principal"
225                        })),
226                    ));
227                }
228                None => {
229                    // Bind session to this OAuth subject
230                    session.oauth_subject = Some(claims.sub.clone());
231                    // SECURITY (FIND-R73-SRV-006): Store token expiry, matching
232                    // HTTP POST handler pattern to enforce token lifetime.
233                    if claims.exp > 0 {
234                        session.token_expires_at = Some(claims.exp);
235                    }
236                }
237                _ => {
238                    // Already owned by this principal — use earliest expiry
239                    // SECURITY (FIND-R73-SRV-006): Parity with HTTP POST handler
240                    // (R23-PROXY-6) — prevent long-lived tokens from extending
241                    // sessions originally bound to short-lived tokens.
242                    if claims.exp > 0 {
243                        session.token_expires_at = Some(
244                            session
245                                .token_expires_at
246                                .map_or(claims.exp, |existing| existing.min(claims.exp)),
247                        );
248                    }
249                }
250            }
251        }
252    }
253
254    // SECURITY (FIND-R159-003): Store agent identity in session — parity with HTTP
255    // POST (handlers.rs:295-298) and GET (handlers.rs:3641-3643). Without this,
256    // ABAC policies referencing agent_identity would evaluate against None for
257    // WebSocket connections, creating a policy bypass.
258    if let Some(ref identity) = agent_identity {
259        if let Some(mut session) = state.sessions.get_mut(&session_id) {
260            session.agent_identity = Some(identity.clone());
261        }
262    }
263
264    // SECURITY (FIND-R46-006): Validate and extract call chain from upgrade headers.
265    // The call chain is synced once during upgrade and reused for all messages
266    // in this WebSocket connection.
267    if let Err(reason) = super::call_chain::validate_call_chain_header(&headers, &state.limits) {
268        tracing::warn!(
269            session_id = %session_id,
270            "WS upgrade rejected: invalid call chain header: {}",
271            reason
272        );
273        return axum::response::IntoResponse::into_response((
274            axum::http::StatusCode::BAD_REQUEST,
275            axum::Json(json!({
276                "error": "Invalid request"
277            })),
278        ));
279    }
280    sync_session_call_chain_from_headers(
281        &state.sessions,
282        &session_id,
283        &headers,
284        state.call_chain_hmac_key.as_ref(),
285        &state.limits,
286    );
287
288    let ws_config = state.ws_config.clone().unwrap_or_default();
289
290    // Phase 28: Extract W3C Trace Context from the HTTP upgrade request headers.
291    // The trace_id is used for correlating all audit entries during this WS session.
292    let trace_ctx = super::trace_propagation::extract_trace_context(&headers);
293    let ws_trace_id = trace_ctx
294        .trace_id
295        .clone()
296        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string().replace('-', ""));
297
298    tracing::info!(
299        session_id = %session_id,
300        trace_id = %ws_trace_id,
301        peer = %addr,
302        "WebSocket upgrade accepted"
303    );
304
305    // 4. Configure and upgrade
306    ws.max_message_size(ws_config.max_message_size)
307        .on_upgrade(move |socket| {
308            handle_ws_connection(socket, state, session_id, ws_config, addr, ws_trace_id)
309        })
310}
311
312/// Handle an established WebSocket connection.
313///
314/// Establishes an upstream WS connection, then relays messages bidirectionally
315/// with policy enforcement on client→upstream messages and DLP/injection
316/// scanning on upstream→client messages.
317async fn handle_ws_connection(
318    client_ws: WebSocket,
319    state: ProxyState,
320    session_id: String,
321    ws_config: WebSocketConfig,
322    peer_addr: SocketAddr,
323    trace_id: String,
324) {
325    record_ws_connection();
326    let start = std::time::Instant::now();
327    tracing::debug!(
328        session_id = %session_id,
329        trace_id = %trace_id,
330        "WebSocket connection established with trace context"
331    );
332
333    // Connect to upstream — use gateway default backend if configured
334    let upstream_url = if let Some(ref gw) = state.gateway {
335        match gw.route("") {
336            Some(d) => convert_to_ws_url(&d.upstream_url),
337            None => {
338                tracing::error!(session_id = %session_id, "No healthy upstream for WebSocket");
339                let (mut client_sink, _) = client_ws.split();
340                let _ = client_sink
341                    .send(Message::Close(Some(CloseFrame {
342                        code: CLOSE_POLICY_VIOLATION,
343                        reason: "No healthy upstream available".into(),
344                    })))
345                    .await;
346                return;
347            }
348        }
349    } else {
350        convert_to_ws_url(&state.upstream_url)
351    };
352    let upstream_ws = match connect_upstream_ws(&upstream_url).await {
353        Ok(ws) => ws,
354        Err(e) => {
355            tracing::error!(
356                session_id = %session_id,
357                "Failed to connect to upstream WebSocket: {}",
358                e
359            );
360            // Send close frame to client
361            let (mut client_sink, _) = client_ws.split();
362            let _ = client_sink
363                .send(Message::Close(Some(CloseFrame {
364                    code: CLOSE_POLICY_VIOLATION,
365                    reason: "Upstream connection failed".into(),
366                })))
367                .await;
368            return;
369        }
370    };
371
372    let (client_sink, client_stream) = client_ws.split();
373    let (upstream_sink, upstream_stream) = upstream_ws.split();
374
375    // Wrap sinks in Arc<Mutex> for shared access
376    let client_sink = Arc::new(Mutex::new(client_sink));
377    let upstream_sink = Arc::new(Mutex::new(upstream_sink));
378
379    // Rate limiter state: track messages in the current second window
380    let rate_counter = Arc::new(AtomicU64::new(0));
381    let rate_window_start = Arc::new(std::sync::Mutex::new(std::time::Instant::now()));
382
383    // SECURITY (FIND-R46-WS-003): Separate rate limiter for upstream→client direction
384    let upstream_rate_counter = Arc::new(AtomicU64::new(0));
385    let upstream_rate_window_start = Arc::new(std::sync::Mutex::new(std::time::Instant::now()));
386
387    let idle_timeout = Duration::from_secs(ws_config.idle_timeout_secs);
388
389    // SECURITY (FIND-R182-001): Shared last-activity tracker so idle timeout resets
390    // on every message (true idle detection, not max-lifetime).
391    let last_activity = Arc::new(AtomicU64::new(0));
392    let connection_epoch = std::time::Instant::now();
393
394    // Client → Vellaveto → Upstream relay
395    let client_to_upstream = {
396        let state = state.clone();
397        let session_id = session_id.clone();
398        let client_sink = client_sink.clone();
399        let upstream_sink = upstream_sink.clone();
400        let rate_counter = rate_counter.clone();
401        let rate_window_start = rate_window_start.clone();
402        let ws_config = ws_config.clone();
403        let last_activity = last_activity.clone();
404
405        relay_client_to_upstream(
406            client_stream,
407            client_sink,
408            upstream_sink,
409            state,
410            session_id,
411            ws_config,
412            rate_counter,
413            rate_window_start,
414            last_activity,
415            connection_epoch,
416        )
417    };
418
419    // Upstream → Vellaveto → Client relay
420    let upstream_to_client = {
421        let state = state.clone();
422        let session_id = session_id.clone();
423        let client_sink = client_sink.clone();
424        let ws_config = ws_config.clone();
425        let last_activity = last_activity.clone();
426
427        relay_upstream_to_client(
428            upstream_stream,
429            client_sink,
430            state,
431            session_id,
432            ws_config,
433            upstream_rate_counter,
434            upstream_rate_window_start,
435            last_activity,
436            connection_epoch,
437        )
438    };
439
440    // SECURITY (FIND-R182-001): True idle timeout — check periodically and
441    // close only if no message activity since last check.
442    let idle_check = {
443        let session_id = session_id.clone();
444        let last_activity = last_activity.clone();
445        async move {
446            // Check every 10% of idle timeout (min 1s) for responsive detection.
447            let check_interval = Duration::from_secs((ws_config.idle_timeout_secs / 10).max(1));
448            let mut interval = tokio::time::interval(check_interval);
449            interval.tick().await; // first tick is immediate, skip it
450            loop {
451                interval.tick().await;
452                let last_ms = last_activity.load(Ordering::Relaxed);
453                // SECURITY (FIND-R190-002): Use saturating_sub to prevent underflow
454                // if Relaxed ordering causes a stale last_ms value.
455                let elapsed_since_activity =
456                    (connection_epoch.elapsed().as_millis() as u64).saturating_sub(last_ms);
457                if elapsed_since_activity >= idle_timeout.as_millis() as u64 {
458                    tracing::info!(
459                        session_id = %session_id,
460                        idle_secs = elapsed_since_activity / 1000,
461                        "WebSocket idle timeout ({}s), closing",
462                        ws_config.idle_timeout_secs
463                    );
464                    break;
465                }
466            }
467        }
468    };
469
470    // Run both relay loops with idle timeout
471    tokio::select! {
472        _ = client_to_upstream => {
473            tracing::debug!(session_id = %session_id, "Client stream ended");
474        }
475        _ = upstream_to_client => {
476            tracing::debug!(session_id = %session_id, "Upstream stream ended");
477        }
478        _ = idle_check => {}
479    }
480
481    // Clean shutdown: close both sides
482    {
483        let mut sink = client_sink.lock().await;
484        let _ = sink
485            .send(Message::Close(Some(CloseFrame {
486                code: CLOSE_NORMAL,
487                reason: "Session ended".into(),
488            })))
489            .await;
490    }
491    {
492        let mut sink = upstream_sink.lock().await;
493        let _ = sink.close().await;
494    }
495
496    let duration = start.elapsed();
497    metrics::histogram!("vellaveto_ws_connection_duration_seconds").record(duration.as_secs_f64());
498
499    tracing::info!(
500        session_id = %session_id,
501        peer = %peer_addr,
502        duration_secs = duration.as_secs(),
503        "WebSocket connection closed"
504    );
505}
506
507/// Relay messages from client to upstream with policy enforcement.
508#[allow(clippy::too_many_arguments)]
509#[allow(deprecated)] // evaluate_action_with_context: migration tracked in FIND-CREATIVE-005
510async fn relay_client_to_upstream(
511    mut client_stream: futures_util::stream::SplitStream<WebSocket>,
512    client_sink: Arc<Mutex<futures_util::stream::SplitSink<WebSocket, Message>>>,
513    upstream_sink: Arc<
514        Mutex<
515            futures_util::stream::SplitSink<
516                tokio_tungstenite::WebSocketStream<
517                    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
518                >,
519                tokio_tungstenite::tungstenite::Message,
520            >,
521        >,
522    >,
523    state: ProxyState,
524    session_id: String,
525    ws_config: WebSocketConfig,
526    rate_counter: Arc<AtomicU64>,
527    rate_window_start: Arc<std::sync::Mutex<std::time::Instant>>,
528    last_activity: Arc<AtomicU64>,
529    connection_epoch: std::time::Instant,
530) {
531    while let Some(msg_result) = client_stream.next().await {
532        let msg = match msg_result {
533            Ok(m) => m,
534            Err(e) => {
535                tracing::debug!(session_id = %session_id, "Client WS error: {}", e);
536                break;
537            }
538        };
539
540        // SECURITY (FIND-R182-001): Update last-activity for true idle detection.
541        last_activity.store(
542            connection_epoch.elapsed().as_millis() as u64,
543            Ordering::Relaxed,
544        );
545
546        record_ws_message("client_to_upstream");
547
548        // SECURITY (FIND-R52-WS-003): Per-message OAuth token expiry check.
549        // After WebSocket upgrade, the HTTP auth middleware no longer runs.
550        // A token that expires mid-connection must be rejected to prevent
551        // indefinite access via a long-lived WebSocket.
552        {
553            let token_expired = state
554                .sessions
555                .get_mut(&session_id)
556                .and_then(|s| {
557                    s.token_expires_at.map(|exp| {
558                        let now = std::time::SystemTime::now()
559                            .duration_since(std::time::UNIX_EPOCH)
560                            .unwrap_or_default()
561                            .as_secs();
562                        now >= exp
563                    })
564                })
565                .unwrap_or(false);
566            if token_expired {
567                tracing::warn!(
568                    session_id = %session_id,
569                    "SECURITY: OAuth token expired during WebSocket session, closing"
570                );
571                let error = json!({
572                    "jsonrpc": "2.0",
573                    "error": {
574                        "code": -32001,
575                        "message": "Session expired"
576                    },
577                    "id": null
578                });
579                let error_text = serde_json::to_string(&error)
580                    .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","error":{"code":-32001,"message":"Session expired"},"id":null}"#.to_string());
581                let mut sink = client_sink.lock().await;
582                let _ = sink.send(Message::Text(error_text.into())).await;
583                let _ = sink
584                    .send(Message::Close(Some(CloseFrame {
585                        code: CLOSE_POLICY_VIOLATION,
586                        reason: "Token expired".into(),
587                    })))
588                    .await;
589                break;
590            }
591        }
592
593        match msg {
594            Message::Text(text) => {
595                // Rate limiting
596                if !check_rate_limit(
597                    &rate_counter,
598                    &rate_window_start,
599                    ws_config.message_rate_limit,
600                ) {
601                    tracing::warn!(
602                        session_id = %session_id,
603                        "WebSocket rate limit exceeded, closing"
604                    );
605                    let mut sink = client_sink.lock().await;
606                    let _ = sink
607                        .send(Message::Close(Some(CloseFrame {
608                            code: CLOSE_POLICY_VIOLATION,
609                            reason: "Rate limit exceeded".into(),
610                        })))
611                        .await;
612                    break;
613                }
614
615                // SECURITY (FIND-R46-005): Reject JSON with duplicate keys before parsing.
616                // Prevents parser-disagreement attacks (CVE-2017-12635, CVE-2020-16250)
617                // where the proxy evaluates one key value but upstream sees another.
618                if let Some(dup_key) = vellaveto_mcp::framing::find_duplicate_json_key(&text) {
619                    tracing::warn!(
620                        session_id = %session_id,
621                        "SECURITY: Rejected WS message with duplicate key: \"{}\"",
622                        dup_key
623                    );
624                    let mut sink = client_sink.lock().await;
625                    let _ = sink
626                        .send(Message::Close(Some(CloseFrame {
627                            code: CLOSE_POLICY_VIOLATION,
628                            reason: "Duplicate JSON key detected".into(),
629                        })))
630                        .await;
631                    break;
632                }
633
634                // SECURITY (FIND-R53-WS-004): Reject WS messages with control characters.
635                // Parity with HTTP GET event_id validation (handlers.rs:2899).
636                // Control chars in JSON-RPC messages can be used for log injection
637                // or to bypass string-based security checks.
638                if text.chars().any(|c| {
639                    // Allow standard JSON whitespace (\t, \n, \r) but reject other
640                    // ASCII control chars and Unicode format chars (FIND-R54-011).
641                    (c.is_control() && c != '\n' && c != '\r' && c != '\t')
642                        || is_unicode_format_char_ws(c)
643                }) {
644                    tracing::warn!(
645                        session_id = %session_id,
646                        "SECURITY: Rejected WS message with control characters"
647                    );
648                    let error =
649                        make_ws_error_response(None, -32600, "Message contains control characters");
650                    let mut sink = client_sink.lock().await;
651                    let _ = sink.send(Message::Text(error.into())).await;
652                    continue;
653                }
654
655                // Parse JSON
656                let parsed: Value = match serde_json::from_str(&text) {
657                    Ok(v) => v,
658                    Err(_) => {
659                        tracing::warn!(
660                            session_id = %session_id,
661                            "Unparseable JSON in WebSocket text frame, closing (fail-closed)"
662                        );
663                        let mut sink = client_sink.lock().await;
664                        let _ = sink
665                            .send(Message::Close(Some(CloseFrame {
666                                code: CLOSE_POLICY_VIOLATION,
667                                reason: "Invalid JSON".into(),
668                            })))
669                            .await;
670                        break;
671                    }
672                };
673
674                // SECURITY (FIND-R46-WS-001): Injection scanning on client→upstream text frames.
675                // The HTTP proxy scans request bodies for injection; the WebSocket proxy must
676                // do the same to maintain security parity. Fail-closed: if injection is detected
677                // and blocking is enabled, deny the message.
678                if !state.injection_disabled {
679                    let scannable = extract_scannable_text_from_request(&parsed);
680                    if !scannable.is_empty() {
681                        let injection_matches: Vec<String> =
682                            if let Some(ref scanner) = state.injection_scanner {
683                                scanner
684                                    .inspect(&scannable)
685                                    .into_iter()
686                                    .map(|s| s.to_string())
687                                    .collect()
688                            } else {
689                                inspect_for_injection(&scannable)
690                                    .into_iter()
691                                    .map(|s| s.to_string())
692                                    .collect()
693                            };
694
695                        if !injection_matches.is_empty() {
696                            tracing::warn!(
697                                "SECURITY: Injection in WS client request! Session: {}, Patterns: {:?}",
698                                session_id,
699                                injection_matches,
700                            );
701
702                            let verdict = if state.injection_blocking {
703                                Verdict::Deny {
704                                    reason: format!(
705                                        "WS request injection blocked: {:?}",
706                                        injection_matches
707                                    ),
708                                }
709                            } else {
710                                Verdict::Allow
711                            };
712
713                            let action = Action::new(
714                                "vellaveto",
715                                "ws_request_injection",
716                                json!({
717                                    "matched_patterns": injection_matches,
718                                    "session": session_id,
719                                    "transport": "websocket",
720                                    "direction": "client_to_upstream",
721                                }),
722                            );
723                            if let Err(e) = state
724                                .audit
725                                .log_entry(
726                                    &action,
727                                    &verdict,
728                                    json!({
729                                        "source": "ws_proxy",
730                                        "event": "ws_request_injection_detected",
731                                    }),
732                                )
733                                .await
734                            {
735                                tracing::warn!("Failed to audit WS request injection: {}", e);
736                            }
737
738                            if state.injection_blocking {
739                                let id = parsed.get("id");
740                                let error = make_ws_error_response(
741                                    id,
742                                    -32001,
743                                    "Request blocked: injection detected",
744                                );
745                                let mut sink = client_sink.lock().await;
746                                let _ = sink.send(Message::Text(error.into())).await;
747                                continue;
748                            }
749                        }
750                    }
751                }
752
753                // Classify and evaluate
754                let classified = extractor::classify_message(&parsed);
755                match classified {
756                    MessageType::ToolCall {
757                        ref id,
758                        ref tool_name,
759                        ref arguments,
760                    } => {
761                        // SECURITY (FIND-R46-009): Strict tool name validation (MCP 2025-11-25).
762                        // When enabled, reject tool names that don't conform to the spec format.
763                        if state.streamable_http.strict_tool_name_validation {
764                            if let Err(e) = vellaveto_types::validate_mcp_tool_name(tool_name) {
765                                tracing::warn!(
766                                    session_id = %session_id,
767                                    "SECURITY: Rejecting invalid WS tool name '{}': {}",
768                                    tool_name,
769                                    e
770                                );
771                                let error =
772                                    make_ws_error_response(Some(id), -32602, "Invalid tool name");
773                                let mut sink = client_sink.lock().await;
774                                let _ = sink.send(Message::Text(error.into())).await;
775                                continue;
776                            }
777                        }
778
779                        let mut action = extractor::extract_action(tool_name, arguments);
780
781                        // SECURITY (FIND-R75-002): DNS resolution for IP-based policy evaluation.
782                        // Parity with HTTP handler (handlers.rs:717). Without this, policies
783                        // using ip_rules are completely bypassed on the WebSocket transport.
784                        if state.engine.has_ip_rules() {
785                            super::helpers::resolve_domains(&mut action).await;
786                        }
787
788                        // SECURITY (FIND-R46-006): Call chain validation and privilege escalation check.
789                        // Extract X-Upstream-Agents from the initial WS upgrade headers stored in session.
790                        // For WebSocket, we sync the call chain once during upgrade and reuse it.
791                        let upstream_chain = {
792                            let session_ref = state.sessions.get_mut(&session_id);
793                            session_ref
794                                .map(|s| s.current_call_chain.clone())
795                                .unwrap_or_default()
796                        };
797                        let current_agent_id = {
798                            let session_ref = state.sessions.get_mut(&session_id);
799                            session_ref.and_then(|s| s.oauth_subject.clone())
800                        };
801
802                        // SECURITY (FIND-R46-006): Privilege escalation detection.
803                        if !upstream_chain.is_empty() {
804                            let priv_check = check_privilege_escalation(
805                                &state.engine,
806                                &state.policies,
807                                &action,
808                                &upstream_chain,
809                                current_agent_id.as_deref(),
810                            );
811                            if priv_check.escalation_detected {
812                                let verdict = Verdict::Deny {
813                                    reason: format!(
814                                        "Privilege escalation: agent '{}' would be denied",
815                                        priv_check
816                                            .escalating_from_agent
817                                            .as_deref()
818                                            .unwrap_or("unknown")
819                                    ),
820                                };
821                                if let Err(e) = state
822                                    .audit
823                                    .log_entry(
824                                        &action,
825                                        &verdict,
826                                        json!({
827                                            "source": "ws_proxy",
828                                            "session": session_id,
829                                            "transport": "websocket",
830                                            "event": "privilege_escalation_blocked",
831                                            "escalating_from_agent": priv_check.escalating_from_agent,
832                                            "upstream_deny_reason": priv_check.upstream_deny_reason,
833                                        }),
834                                    )
835                                    .await
836                                {
837                                    tracing::warn!(
838                                        "Failed to audit WS privilege escalation: {}",
839                                        e
840                                    );
841                                }
842                                let error =
843                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
844                                let mut sink = client_sink.lock().await;
845                                let _ = sink.send(Message::Text(error.into())).await;
846                                continue;
847                            }
848                        }
849
850                        // SECURITY (FIND-R46-007): Rug-pull detection.
851                        // Block calls to tools whose annotations changed since initial tools/list.
852                        let is_flagged = state
853                            .sessions
854                            .get_mut(&session_id)
855                            .map(|s| s.flagged_tools.contains(tool_name))
856                            .unwrap_or(false);
857                        if is_flagged {
858                            let verdict = Verdict::Deny {
859                                reason: format!(
860                                    "Tool '{}' blocked: annotations changed (rug-pull detected)",
861                                    tool_name
862                                ),
863                            };
864                            if let Err(e) = state
865                                .audit
866                                .log_entry(
867                                    &action,
868                                    &verdict,
869                                    json!({
870                                        "source": "ws_proxy",
871                                        "session": session_id,
872                                        "transport": "websocket",
873                                        "event": "rug_pull_tool_blocked",
874                                        "tool": tool_name,
875                                    }),
876                                )
877                                .await
878                            {
879                                tracing::warn!("Failed to audit WS rug-pull block: {}", e);
880                            }
881                            let error =
882                                make_ws_error_response(Some(id), -32001, "Denied by policy");
883                            let mut sink = client_sink.lock().await;
884                            let _ = sink.send(Message::Text(error.into())).await;
885                            continue;
886                        }
887
888                        // SECURITY (FIND-R52-WS-001): DLP scan parameters for secret exfiltration.
889                        // Matches HTTP handler's DLP check to maintain security parity.
890                        {
891                            let dlp_findings = scan_parameters_for_secrets(arguments);
892                            // SECURITY (FIND-R55-WS-001): DLP on request params always blocks,
893                            // matching HTTP handler. Previously gated on injection_blocking flag.
894                            if !dlp_findings.is_empty() {
895                                for finding in &dlp_findings {
896                                    record_dlp_finding(&finding.pattern_name);
897                                }
898                                let patterns: Vec<String> = dlp_findings
899                                    .iter()
900                                    .map(|f| format!("{} at {}", f.pattern_name, f.location))
901                                    .collect();
902                                let audit_reason = format!(
903                                    "DLP: secrets detected in tool parameters: {:?}",
904                                    patterns
905                                );
906                                tracing::warn!(
907                                    "SECURITY: DLP blocking WS tool '{}' in session {}: {}",
908                                    tool_name,
909                                    session_id,
910                                    audit_reason
911                                );
912                                let dlp_action = extractor::extract_action(tool_name, arguments);
913                                if let Err(e) = state
914                                    .audit
915                                    .log_entry(
916                                        &dlp_action,
917                                        &Verdict::Deny {
918                                            reason: audit_reason,
919                                        },
920                                        json!({
921                                            "source": "ws_proxy",
922                                            "session": session_id,
923                                            "transport": "websocket",
924                                            "event": "dlp_secret_blocked",
925                                            "tool": tool_name,
926                                            "findings": patterns,
927                                        }),
928                                    )
929                                    .await
930                                {
931                                    tracing::warn!("Failed to audit WS DLP finding: {}", e);
932                                }
933                                let error = make_ws_error_response(
934                                    Some(id),
935                                    -32001,
936                                    "Request blocked: security policy violation",
937                                );
938                                let mut sink = client_sink.lock().await;
939                                let _ = sink.send(Message::Text(error.into())).await;
940                                continue;
941                            }
942                        }
943
944                        // SECURITY (FIND-R52-WS-002): Memory poisoning detection.
945                        // Check if tool call parameters contain replayed response data,
946                        // matching the HTTP handler's memory poisoning check.
947                        {
948                            let poisoning_detected = state
949                                .sessions
950                                .get_mut(&session_id)
951                                .and_then(|session| {
952                                    let matches =
953                                        session.memory_tracker.check_parameters(arguments);
954                                    if !matches.is_empty() {
955                                        for m in &matches {
956                                            tracing::warn!(
957                                                "SECURITY: Memory poisoning detected in WS tool '{}' (session {}): \
958                                                 param '{}' contains replayed data (fingerprint: {})",
959                                                tool_name,
960                                                session_id,
961                                                m.param_location,
962                                                m.fingerprint
963                                            );
964                                        }
965                                        Some(matches.len())
966                                    } else {
967                                        None
968                                    }
969                                });
970                            if let Some(match_count) = poisoning_detected {
971                                let poison_action = extractor::extract_action(tool_name, arguments);
972                                let deny_reason = format!(
973                                    "Memory poisoning detected: {} replayed data fragment(s) in tool '{}'",
974                                    match_count, tool_name
975                                );
976                                if let Err(e) = state
977                                    .audit
978                                    .log_entry(
979                                        &poison_action,
980                                        &Verdict::Deny {
981                                            reason: deny_reason,
982                                        },
983                                        json!({
984                                            "source": "ws_proxy",
985                                            "session": session_id,
986                                            "transport": "websocket",
987                                            "event": "memory_poisoning_detected",
988                                            "matches": match_count,
989                                            "tool": tool_name,
990                                        }),
991                                    )
992                                    .await
993                                {
994                                    tracing::warn!("Failed to audit WS memory poisoning: {}", e);
995                                }
996                                let error = make_ws_error_response(
997                                    Some(id),
998                                    -32001,
999                                    "Request blocked: security policy violation",
1000                                );
1001                                let mut sink = client_sink.lock().await;
1002                                let _ = sink.send(Message::Text(error.into())).await;
1003                                continue;
1004                            }
1005                        }
1006
1007                        // SECURITY (FIND-R46-008): Circuit breaker check.
1008                        // If the circuit is open for this tool, reject immediately.
1009                        if let Some(ref circuit_breaker) = state.circuit_breaker {
1010                            if let Err(reason) = circuit_breaker.can_proceed(tool_name) {
1011                                tracing::warn!(
1012                                    session_id = %session_id,
1013                                    "SECURITY: WS circuit breaker open for tool '{}': {}",
1014                                    tool_name,
1015                                    reason
1016                                );
1017                                let verdict = Verdict::Deny {
1018                                    reason: format!("Circuit breaker open: {}", reason),
1019                                };
1020                                if let Err(e) = state
1021                                    .audit
1022                                    .log_entry(
1023                                        &action,
1024                                        &verdict,
1025                                        json!({
1026                                            "source": "ws_proxy",
1027                                            "session": session_id,
1028                                            "transport": "websocket",
1029                                            "event": "circuit_breaker_rejected",
1030                                            "tool": tool_name,
1031                                        }),
1032                                    )
1033                                    .await
1034                                {
1035                                    tracing::warn!(
1036                                        "Failed to audit WS circuit breaker rejection: {}",
1037                                        e
1038                                    );
1039                                }
1040                                let error = make_ws_error_response(
1041                                    Some(id),
1042                                    -32001,
1043                                    "Service temporarily unavailable",
1044                                );
1045                                let mut sink = client_sink.lock().await;
1046                                let _ = sink.send(Message::Text(error.into())).await;
1047                                continue;
1048                            }
1049                        }
1050
1051                        // SECURITY (FIND-R46-013): Tool registry trust check.
1052                        // If tool registry is configured, check trust level before evaluation.
1053                        if let Some(ref registry) = state.tool_registry {
1054                            let trust = registry.check_trust_level(tool_name).await;
1055                            match trust {
1056                                vellaveto_mcp::tool_registry::TrustLevel::Unknown => {
1057                                    registry.register_unknown(tool_name).await;
1058                                    let verdict = Verdict::Deny {
1059                                        reason: "Unknown tool requires approval".to_string(),
1060                                    };
1061                                    if let Err(e) = state
1062                                        .audit
1063                                        .log_entry(
1064                                            &action,
1065                                            &verdict,
1066                                            json!({
1067                                                "source": "ws_proxy",
1068                                                "session": session_id,
1069                                                "transport": "websocket",
1070                                                "registry": "unknown_tool",
1071                                                "tool": tool_name,
1072                                            }),
1073                                        )
1074                                        .await
1075                                    {
1076                                        tracing::warn!("Failed to audit WS unknown tool: {}", e);
1077                                    }
1078                                    let approval_reason = "Approval required";
1079                                    let approval_id = create_ws_approval(
1080                                        &state,
1081                                        &session_id,
1082                                        &action,
1083                                        approval_reason,
1084                                    )
1085                                    .await;
1086                                    let error = make_ws_error_response_with_data(
1087                                        Some(id),
1088                                        -32001,
1089                                        approval_reason,
1090                                        Some(json!({
1091                                            "verdict": "require_approval",
1092                                            "reason": approval_reason,
1093                                            "approval_id": approval_id,
1094                                        })),
1095                                    );
1096                                    let mut sink = client_sink.lock().await;
1097                                    let _ = sink.send(Message::Text(error.into())).await;
1098                                    continue;
1099                                }
1100                                vellaveto_mcp::tool_registry::TrustLevel::Untrusted {
1101                                    score: _,
1102                                } => {
1103                                    let verdict = Verdict::Deny {
1104                                        reason: "Untrusted tool requires approval".to_string(),
1105                                    };
1106                                    if let Err(e) = state
1107                                        .audit
1108                                        .log_entry(
1109                                            &action,
1110                                            &verdict,
1111                                            json!({
1112                                                "source": "ws_proxy",
1113                                                "session": session_id,
1114                                                "transport": "websocket",
1115                                                "registry": "untrusted_tool",
1116                                                "tool": tool_name,
1117                                            }),
1118                                        )
1119                                        .await
1120                                    {
1121                                        tracing::warn!("Failed to audit WS untrusted tool: {}", e);
1122                                    }
1123                                    let approval_reason = "Approval required";
1124                                    let approval_id = create_ws_approval(
1125                                        &state,
1126                                        &session_id,
1127                                        &action,
1128                                        approval_reason,
1129                                    )
1130                                    .await;
1131                                    let error = make_ws_error_response_with_data(
1132                                        Some(id),
1133                                        -32001,
1134                                        approval_reason,
1135                                        Some(json!({
1136                                            "verdict": "require_approval",
1137                                            "reason": approval_reason,
1138                                            "approval_id": approval_id,
1139                                        })),
1140                                    );
1141                                    let mut sink = client_sink.lock().await;
1142                                    let _ = sink.send(Message::Text(error.into())).await;
1143                                    continue;
1144                                }
1145                                vellaveto_mcp::tool_registry::TrustLevel::Trusted => {
1146                                    // Trusted — proceed to engine evaluation
1147                                }
1148                            }
1149                        }
1150
1151                        // SECURITY (FIND-R130-002): Combine context read, evaluation,
1152                        // and session update into a single block holding the DashMap
1153                        // shard lock. Without this, concurrent WS connections sharing
1154                        // a session can bypass max_calls_in_window by racing: both
1155                        // clone the same stale call_counts, both pass evaluation, both
1156                        // increment. Matches HTTP handler R19-TOCTOU pattern
1157                        // (handlers.rs:725-789).
1158                        let (verdict, ctx) = if let Some(mut session) =
1159                            state.sessions.get_mut(&session_id)
1160                        {
1161                            let ctx = EvaluationContext {
1162                                timestamp: None,
1163                                agent_id: session.oauth_subject.clone(),
1164                                agent_identity: session.agent_identity.clone(),
1165                                call_counts: session.call_counts.clone(),
1166                                previous_actions: session.action_history.iter().cloned().collect(),
1167                                call_chain: session.current_call_chain.clone(),
1168                                tenant_id: None,
1169                                verification_tier: None,
1170                                capability_token: None,
1171                                session_state: None,
1172                            };
1173
1174                            let verdict = match state.engine.evaluate_action_with_context(
1175                                &action,
1176                                &state.policies,
1177                                Some(&ctx),
1178                            ) {
1179                                Ok(v) => v,
1180                                Err(e) => {
1181                                    tracing::error!(
1182                                        session_id = %session_id,
1183                                        "Policy evaluation error: {}",
1184                                        e
1185                                    );
1186                                    Verdict::Deny {
1187                                        reason: format!("Policy evaluation failed: {}", e),
1188                                    }
1189                                }
1190                            };
1191
1192                            // Atomically update session on Allow while still holding
1193                            // the shard lock — prevents TOCTOU bypass of call limits.
1194                            if matches!(verdict, Verdict::Allow) {
1195                                session.touch();
1196                                use crate::proxy::call_chain::{
1197                                    MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
1198                                };
1199                                if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
1200                                    || session.call_counts.contains_key(tool_name)
1201                                {
1202                                    let count = session
1203                                        .call_counts
1204                                        .entry(tool_name.to_string())
1205                                        .or_insert(0);
1206                                    *count = count.saturating_add(1);
1207                                }
1208                                if session.action_history.len() >= MAX_ACTION_HISTORY {
1209                                    session.action_history.pop_front();
1210                                }
1211                                session.action_history.push_back(tool_name.to_string());
1212                            }
1213
1214                            (verdict, ctx)
1215                        } else {
1216                            // No session — evaluate without context (fail-closed)
1217                            let verdict = match state.engine.evaluate_action_with_context(
1218                                &action,
1219                                &state.policies,
1220                                None,
1221                            ) {
1222                                Ok(v) => v,
1223                                Err(e) => {
1224                                    tracing::error!(
1225                                        session_id = %session_id,
1226                                        "Policy evaluation error: {}",
1227                                        e
1228                                    );
1229                                    Verdict::Deny {
1230                                        reason: format!("Policy evaluation failed: {}", e),
1231                                    }
1232                                }
1233                            };
1234                            (verdict, EvaluationContext::default())
1235                        };
1236
1237                        match verdict {
1238                            Verdict::Allow => {
1239                                // Phase 21: ABAC refinement — only runs when ABAC engine is configured
1240                                if let Some(ref abac) = state.abac_engine {
1241                                    let principal_id =
1242                                        ctx.agent_id.as_deref().unwrap_or("anonymous");
1243                                    let principal_type = ctx.principal_type();
1244                                    let session_risk = state
1245                                        .sessions
1246                                        .get_mut(&session_id)
1247                                        .and_then(|s| s.risk_score.clone());
1248                                    let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
1249                                        eval_ctx: &ctx,
1250                                        principal_type,
1251                                        principal_id,
1252                                        risk_score: session_risk.as_ref(),
1253                                    };
1254                                    match abac.evaluate(&action, &abac_ctx) {
1255                                        vellaveto_engine::abac::AbacDecision::Deny {
1256                                            policy_id,
1257                                            reason,
1258                                        } => {
1259                                            let deny_verdict = Verdict::Deny {
1260                                                reason: reason.clone(),
1261                                            };
1262                                            if let Err(e) = state
1263                                                .audit
1264                                                .log_entry(
1265                                                    &action,
1266                                                    &deny_verdict,
1267                                                    json!({
1268                                                        "source": "ws_proxy",
1269                                                        "session": session_id,
1270                                                        "transport": "websocket",
1271                                                        "event": "abac_deny",
1272                                                        "abac_policy": policy_id,
1273                                                    }),
1274                                                )
1275                                                .await
1276                                            {
1277                                                tracing::warn!(
1278                                                    "Failed to audit WS ABAC deny: {}",
1279                                                    e
1280                                                );
1281                                            }
1282                                            // SECURITY (FIND-R46-012): Generic message to client;
1283                                            // detailed reason (ABAC policy_id, reason) is in
1284                                            // the audit log only.
1285                                            let error_resp = make_ws_error_response(
1286                                                Some(id),
1287                                                -32001,
1288                                                "Denied by policy",
1289                                            );
1290                                            let mut sink = client_sink.lock().await;
1291                                            let _ =
1292                                                sink.send(Message::Text(error_resp.into())).await;
1293                                            continue;
1294                                        }
1295                                        vellaveto_engine::abac::AbacDecision::Allow {
1296                                            policy_id,
1297                                        } => {
1298                                            if let Some(ref la) = state.least_agency {
1299                                                la.record_usage(
1300                                                    principal_id,
1301                                                    &session_id,
1302                                                    &policy_id,
1303                                                    tool_name,
1304                                                    &action.function,
1305                                                );
1306                                            }
1307                                        }
1308                                        vellaveto_engine::abac::AbacDecision::NoMatch => {
1309                                            // Fall through — existing Allow stands
1310                                        }
1311                                        #[allow(unreachable_patterns)]
1312                                        // AbacDecision is #[non_exhaustive]
1313                                        _ => {
1314                                            // SECURITY (FIND-R74-002): Future variants — fail-closed (deny).
1315                                            // Must send deny and continue, not fall through to Allow path.
1316                                            tracing::warn!(
1317                                                "Unknown AbacDecision variant — fail-closed"
1318                                            );
1319                                            let error_resp = make_ws_error_response(
1320                                                Some(id),
1321                                                -32001,
1322                                                "Denied by policy",
1323                                            );
1324                                            let mut sink = client_sink.lock().await;
1325                                            let _ =
1326                                                sink.send(Message::Text(error_resp.into())).await;
1327                                            continue;
1328                                        }
1329                                    }
1330                                }
1331
1332                                // SECURITY (FIND-R46-013): Record tool call in registry on Allow
1333                                if let Some(ref registry) = state.tool_registry {
1334                                    registry.record_call(tool_name).await;
1335                                }
1336
1337                                // NOTE: Session touch + call_counts/action_history
1338                                // update already performed inside the TOCTOU-safe
1339                                // block above (FIND-R130-002). No separate update here.
1340
1341                                // Audit the allow
1342                                if let Err(e) = state
1343                                    .audit
1344                                    .log_entry(
1345                                        &action,
1346                                        &Verdict::Allow,
1347                                        json!({
1348                                            "source": "ws_proxy",
1349                                            "session": session_id,
1350                                            "transport": "websocket",
1351                                        }),
1352                                    )
1353                                    .await
1354                                {
1355                                    tracing::error!(
1356                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1357                                        e
1358                                    );
1359                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1360                                    // No unaudited security decisions can occur.
1361                                    if state.audit_strict_mode {
1362                                        let error = make_ws_error_response(
1363                                            Some(id),
1364                                            -32000,
1365                                            "Audit logging failed — request denied (strict audit mode)",
1366                                        );
1367                                        let mut sink = client_sink.lock().await;
1368                                        let _ = sink.send(Message::Text(error.into())).await;
1369                                        continue;
1370                                    }
1371                                }
1372
1373                                // Canonicalize and forward
1374                                let forward_text = if state.canonicalize {
1375                                    match serde_json::to_string(&parsed) {
1376                                        Ok(canonical) => canonical,
1377                                        Err(e) => {
1378                                            tracing::error!(
1379                                                "SECURITY: WS canonicalization failed: {}",
1380                                                e
1381                                            );
1382                                            let error_resp = make_ws_error_response(
1383                                                Some(id),
1384                                                -32603,
1385                                                "Internal error",
1386                                            );
1387                                            let mut sink = client_sink.lock().await;
1388                                            let _ =
1389                                                sink.send(Message::Text(error_resp.into())).await;
1390                                            continue;
1391                                        }
1392                                    }
1393                                } else {
1394                                    text.to_string()
1395                                };
1396
1397                                // Track request→response mapping for output-schema
1398                                // enforcement when upstream omits result._meta.tool.
1399                                track_pending_tool_call(
1400                                    &state.sessions,
1401                                    &session_id,
1402                                    id,
1403                                    tool_name,
1404                                );
1405
1406                                let mut sink = upstream_sink.lock().await;
1407                                if let Err(e) = sink
1408                                    .send(tokio_tungstenite::tungstenite::Message::Text(
1409                                        forward_text.into(),
1410                                    ))
1411                                    .await
1412                                {
1413                                    tracing::error!(
1414                                        session_id = %session_id,
1415                                        "Failed to forward to upstream: {}",
1416                                        e
1417                                    );
1418                                    break;
1419                                }
1420                            }
1421                            Verdict::Deny { ref reason } => {
1422                                // Audit the denial with detailed reason
1423                                if let Err(e) = state
1424                                    .audit
1425                                    .log_entry(
1426                                        &action,
1427                                        &verdict,
1428                                        json!({
1429                                            "source": "ws_proxy",
1430                                            "session": session_id,
1431                                            "transport": "websocket",
1432                                        }),
1433                                    )
1434                                    .await
1435                                {
1436                                    tracing::error!(
1437                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1438                                        e
1439                                    );
1440                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1441                                    if state.audit_strict_mode {
1442                                        let error = make_ws_error_response(
1443                                            Some(id),
1444                                            -32000,
1445                                            "Audit logging failed — request denied (strict audit mode)",
1446                                        );
1447                                        let mut sink = client_sink.lock().await;
1448                                        let _ = sink.send(Message::Text(error.into())).await;
1449                                        continue;
1450                                    }
1451                                }
1452
1453                                // SECURITY (FIND-R46-012): Generic message to client.
1454                                // Detailed reason is in the audit log only.
1455                                let _ = reason; // used in audit above
1456                                let error =
1457                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
1458                                let mut sink = client_sink.lock().await;
1459                                let _ = sink.send(Message::Text(error.into())).await;
1460                            }
1461                            Verdict::RequireApproval { ref reason, .. } => {
1462                                // Treat as deny for audit, but preserve approval semantics.
1463                                let deny_reason = format!("Requires approval: {}", reason);
1464                                if let Err(e) = state
1465                                    .audit
1466                                    .log_entry(
1467                                        &action,
1468                                        &Verdict::Deny {
1469                                            reason: deny_reason.clone(),
1470                                        },
1471                                        json!({
1472                                            "source": "ws_proxy",
1473                                            "session": session_id,
1474                                            "transport": "websocket",
1475                                        }),
1476                                    )
1477                                    .await
1478                                {
1479                                    tracing::error!(
1480                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1481                                        e
1482                                    );
1483                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1484                                    if state.audit_strict_mode {
1485                                        let error = make_ws_error_response(
1486                                            Some(id),
1487                                            -32000,
1488                                            "Audit logging failed — request denied (strict audit mode)",
1489                                        );
1490                                        let mut sink = client_sink.lock().await;
1491                                        let _ = sink.send(Message::Text(error.into())).await;
1492                                        continue;
1493                                    }
1494                                }
1495                                let approval_reason = "Approval required";
1496                                let approval_id =
1497                                    create_ws_approval(&state, &session_id, &action, reason).await;
1498                                let error = make_ws_error_response_with_data(
1499                                    Some(id),
1500                                    -32001,
1501                                    approval_reason,
1502                                    Some(json!({
1503                                        "verdict": "require_approval",
1504                                        "reason": approval_reason,
1505                                        "approval_id": approval_id,
1506                                    })),
1507                                );
1508                                let mut sink = client_sink.lock().await;
1509                                let _ = sink.send(Message::Text(error.into())).await;
1510                            }
1511                            // Fail-closed: unknown Verdict variants produce Deny
1512                            _ => {
1513                                let error =
1514                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
1515                                let mut sink = client_sink.lock().await;
1516                                let _ = sink.send(Message::Text(error.into())).await;
1517                            }
1518                        }
1519                    }
1520                    MessageType::ResourceRead { ref id, ref uri } => {
1521                        // SECURITY (FIND-R74-007): Check for memory poisoning in resource URI.
1522                        // ResourceRead is a likely exfiltration vector: a poisoned tool response
1523                        // says "read this file" and the agent issues resources/read for that URI.
1524                        // Parity with HTTP handler (handlers.rs:1472).
1525                        {
1526                            let poisoning_detected = state
1527                                .sessions
1528                                .get_mut(&session_id)
1529                                .and_then(|session| {
1530                                    let uri_params = json!({"uri": uri});
1531                                    let matches =
1532                                        session.memory_tracker.check_parameters(&uri_params);
1533                                    if !matches.is_empty() {
1534                                        for m in &matches {
1535                                            tracing::warn!(
1536                                                "SECURITY: Memory poisoning detected in WS resources/read (session {}): \
1537                                                 param '{}' contains replayed data (fingerprint: {})",
1538                                                session_id,
1539                                                m.param_location,
1540                                                m.fingerprint
1541                                            );
1542                                        }
1543                                        Some(matches.len())
1544                                    } else {
1545                                        None
1546                                    }
1547                                });
1548                            if let Some(match_count) = poisoning_detected {
1549                                let poison_action = extractor::extract_resource_action(uri);
1550                                let deny_reason = format!(
1551                                    "Memory poisoning detected: {} replayed data fragment(s) in resources/read",
1552                                    match_count
1553                                );
1554                                if let Err(e) = state
1555                                    .audit
1556                                    .log_entry(
1557                                        &poison_action,
1558                                        &Verdict::Deny {
1559                                            reason: deny_reason.clone(),
1560                                        },
1561                                        json!({
1562                                            "source": "ws_proxy",
1563                                            "session": session_id,
1564                                            "transport": "websocket",
1565                                            "event": "memory_poisoning_detected",
1566                                            "matches": match_count,
1567                                            "uri": uri,
1568                                        }),
1569                                    )
1570                                    .await
1571                                {
1572                                    tracing::warn!(
1573                                        "Failed to audit WS resource memory poisoning: {}",
1574                                        e
1575                                    );
1576                                }
1577                                let error = make_ws_error_response(
1578                                    Some(id),
1579                                    -32001,
1580                                    "Request blocked: security policy violation",
1581                                );
1582                                let mut sink = client_sink.lock().await;
1583                                let _ = sink.send(Message::Text(error.into())).await;
1584                                continue;
1585                            }
1586                        }
1587
1588                        // SECURITY (FIND-R115-041): Rug-pull detection for resource URIs.
1589                        // If the upstream server was flagged (annotations changed since initial
1590                        // tools/list), block resource reads from that server.
1591                        // Parity with HTTP handler (handlers.rs:1555).
1592                        {
1593                            let is_flagged = state
1594                                .sessions
1595                                .get_mut(&session_id)
1596                                .map(|s| s.flagged_tools.contains(uri.as_str()))
1597                                .unwrap_or(false);
1598                            if is_flagged {
1599                                let action = extractor::extract_resource_action(uri);
1600                                let verdict = Verdict::Deny {
1601                                    reason: format!(
1602                                        "Resource '{}' blocked: server flagged by rug-pull detection",
1603                                        uri
1604                                    ),
1605                                };
1606                                if let Err(e) = state
1607                                    .audit
1608                                    .log_entry(
1609                                        &action,
1610                                        &verdict,
1611                                        json!({
1612                                            "source": "ws_proxy",
1613                                            "session": session_id,
1614                                            "transport": "websocket",
1615                                            "event": "rug_pull_resource_blocked",
1616                                            "uri": uri,
1617                                        }),
1618                                    )
1619                                    .await
1620                                {
1621                                    tracing::warn!(
1622                                        "Failed to audit WS resource rug-pull block: {}",
1623                                        e
1624                                    );
1625                                }
1626                                let error =
1627                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
1628                                let mut sink = client_sink.lock().await;
1629                                let _ = sink.send(Message::Text(error.into())).await;
1630                                continue;
1631                            }
1632                        }
1633
1634                        // Build action for resource read
1635                        let mut action = extractor::extract_resource_action(uri);
1636
1637                        // SECURITY (FIND-R75-002): DNS resolution for resource reads.
1638                        // Parity with HTTP handler (handlers.rs:1543).
1639                        if state.engine.has_ip_rules() {
1640                            super::helpers::resolve_domains(&mut action).await;
1641                        }
1642
1643                        // SECURITY (FIND-R116-004): DLP scan on resource URI.
1644                        // Parity with HTTP handler (handlers.rs:1598).
1645                        {
1646                            let uri_params = json!({"uri": uri});
1647                            let dlp_findings = scan_parameters_for_secrets(&uri_params);
1648                            if !dlp_findings.is_empty() {
1649                                for finding in &dlp_findings {
1650                                    record_dlp_finding(&finding.pattern_name);
1651                                }
1652                                tracing::warn!(
1653                                    "SECURITY: Secret detected in WS resource URI! Session: {}, URI: [redacted]",
1654                                    session_id,
1655                                );
1656                                let audit_verdict = Verdict::Deny {
1657                                    reason: "DLP blocked: secret detected in resource URI"
1658                                        .to_string(),
1659                                };
1660                                if let Err(e) = state.audit.log_entry(
1661                                    &action, &audit_verdict,
1662                                    json!({
1663                                        "source": "ws_proxy", "session": session_id,
1664                                        "transport": "websocket", "event": "resource_uri_dlp_alert",
1665                                    }),
1666                                ).await {
1667                                    tracing::warn!("Failed to audit WS resource URI DLP: {}", e);
1668                                }
1669                                let error = make_ws_error_response(
1670                                    Some(id),
1671                                    -32001,
1672                                    "Request blocked: security policy violation",
1673                                );
1674                                let mut sink = client_sink.lock().await;
1675                                let _ = sink.send(Message::Text(error.into())).await;
1676                                continue;
1677                            }
1678                        }
1679
1680                        // SECURITY (FIND-R115-042): Circuit breaker check for resource reads.
1681                        // Parity with HTTP handler (handlers.rs:1668) — prevent resource reads
1682                        // from hammering a failing upstream server.
1683                        if let Some(ref circuit_breaker) = state.circuit_breaker {
1684                            if let Err(reason) = circuit_breaker.can_proceed(uri) {
1685                                tracing::warn!(
1686                                    "SECURITY: WS circuit breaker open for resource '{}' in session {}: {}",
1687                                    uri,
1688                                    session_id,
1689                                    reason
1690                                );
1691                                let verdict = Verdict::Deny {
1692                                    reason: format!("Circuit breaker open: {}", reason),
1693                                };
1694                                if let Err(e) = state
1695                                    .audit
1696                                    .log_entry(
1697                                        &action,
1698                                        &verdict,
1699                                        json!({
1700                                            "source": "ws_proxy",
1701                                            "session": session_id,
1702                                            "transport": "websocket",
1703                                            "event": "circuit_breaker_rejected",
1704                                            "uri": uri,
1705                                        }),
1706                                    )
1707                                    .await
1708                                {
1709                                    tracing::warn!(
1710                                        "Failed to audit WS resource circuit breaker rejection: {}",
1711                                        e
1712                                    );
1713                                }
1714                                let error = make_ws_error_response(
1715                                    Some(id),
1716                                    -32001,
1717                                    "Service temporarily unavailable",
1718                                );
1719                                let mut sink = client_sink.lock().await;
1720                                let _ = sink.send(Message::Text(error.into())).await;
1721                                continue;
1722                            }
1723                        }
1724
1725                        // SECURITY (FIND-R130-002): TOCTOU-safe context+eval+update
1726                        // for resource reads. Matches ToolCall fix above and HTTP
1727                        // handler FIND-R112-002 pattern (handlers.rs:1711-1774).
1728                        let (verdict, ctx) = if let Some(mut session) =
1729                            state.sessions.get_mut(&session_id)
1730                        {
1731                            let ctx = EvaluationContext {
1732                                timestamp: None,
1733                                agent_id: session.oauth_subject.clone(),
1734                                agent_identity: session.agent_identity.clone(),
1735                                call_counts: session.call_counts.clone(),
1736                                previous_actions: session.action_history.iter().cloned().collect(),
1737                                call_chain: session.current_call_chain.clone(),
1738                                tenant_id: None,
1739                                verification_tier: None,
1740                                capability_token: None,
1741                                session_state: None,
1742                            };
1743
1744                            let verdict = match state.engine.evaluate_action_with_context(
1745                                &action,
1746                                &state.policies,
1747                                Some(&ctx),
1748                            ) {
1749                                Ok(v) => v,
1750                                Err(e) => {
1751                                    tracing::error!(
1752                                        session_id = %session_id,
1753                                        "Resource policy evaluation error: {}",
1754                                        e
1755                                    );
1756                                    Verdict::Deny {
1757                                        reason: format!("Policy evaluation failed: {}", e),
1758                                    }
1759                                }
1760                            };
1761
1762                            // Atomically update session on Allow
1763                            if matches!(verdict, Verdict::Allow) {
1764                                session.touch();
1765                                use crate::proxy::call_chain::{
1766                                    MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
1767                                };
1768                                let resource_key = format!(
1769                                    "resources/read:{}",
1770                                    uri.chars().take(128).collect::<String>()
1771                                );
1772                                if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
1773                                    || session.call_counts.contains_key(&resource_key)
1774                                {
1775                                    let count =
1776                                        session.call_counts.entry(resource_key).or_insert(0);
1777                                    *count = count.saturating_add(1);
1778                                }
1779                                if session.action_history.len() >= MAX_ACTION_HISTORY {
1780                                    session.action_history.pop_front();
1781                                }
1782                                session
1783                                    .action_history
1784                                    .push_back("resources/read".to_string());
1785                            }
1786
1787                            (verdict, ctx)
1788                        } else {
1789                            let verdict = match state.engine.evaluate_action_with_context(
1790                                &action,
1791                                &state.policies,
1792                                None,
1793                            ) {
1794                                Ok(v) => v,
1795                                Err(e) => {
1796                                    tracing::error!(
1797                                        session_id = %session_id,
1798                                        "Resource policy evaluation error: {}",
1799                                        e
1800                                    );
1801                                    Verdict::Deny {
1802                                        reason: format!("Policy evaluation failed: {}", e),
1803                                    }
1804                                }
1805                            };
1806                            (verdict, EvaluationContext::default())
1807                        };
1808
1809                        match verdict {
1810                            Verdict::Allow => {
1811                                // SECURITY (FIND-R116-002): ABAC refinement for resource reads.
1812                                // Parity with HTTP handler (handlers.rs:1783) and gRPC (service.rs:972).
1813                                if let Some(ref abac) = state.abac_engine {
1814                                    let principal_id =
1815                                        ctx.agent_id.as_deref().unwrap_or("anonymous");
1816                                    let principal_type = ctx.principal_type();
1817                                    let session_risk = state
1818                                        .sessions
1819                                        .get_mut(&session_id)
1820                                        .and_then(|s| s.risk_score.clone());
1821                                    let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
1822                                        eval_ctx: &ctx,
1823                                        principal_type,
1824                                        principal_id,
1825                                        risk_score: session_risk.as_ref(),
1826                                    };
1827                                    match abac.evaluate(&action, &abac_ctx) {
1828                                        vellaveto_engine::abac::AbacDecision::Deny {
1829                                            policy_id,
1830                                            reason,
1831                                        } => {
1832                                            let deny_verdict = Verdict::Deny {
1833                                                reason: reason.clone(),
1834                                            };
1835                                            if let Err(e) = state
1836                                                .audit
1837                                                .log_entry(
1838                                                    &action,
1839                                                    &deny_verdict,
1840                                                    json!({
1841                                                        "source": "ws_proxy",
1842                                                        "session": session_id,
1843                                                        "transport": "websocket",
1844                                                        "event": "abac_deny",
1845                                                        "abac_policy": policy_id,
1846                                                        "uri": uri,
1847                                                    }),
1848                                                )
1849                                                .await
1850                                            {
1851                                                tracing::warn!(
1852                                                    "Failed to audit WS resource ABAC deny: {}",
1853                                                    e
1854                                                );
1855                                            }
1856                                            let error_resp = make_ws_error_response(
1857                                                Some(id),
1858                                                -32001,
1859                                                "Denied by policy",
1860                                            );
1861                                            let mut sink = client_sink.lock().await;
1862                                            let _ =
1863                                                sink.send(Message::Text(error_resp.into())).await;
1864                                            continue;
1865                                        }
1866                                        vellaveto_engine::abac::AbacDecision::Allow {
1867                                            policy_id,
1868                                        } => {
1869                                            // SECURITY (FIND-R192-002): record_usage parity.
1870                                            if let Some(ref la) = state.least_agency {
1871                                                la.record_usage(
1872                                                    principal_id,
1873                                                    &session_id,
1874                                                    &policy_id,
1875                                                    uri,
1876                                                    &action.function,
1877                                                );
1878                                            }
1879                                        }
1880                                        vellaveto_engine::abac::AbacDecision::NoMatch => {
1881                                            // Fall through — existing Allow stands
1882                                        }
1883                                        #[allow(unreachable_patterns)]
1884                                        _ => {
1885                                            tracing::warn!(
1886                                                "Unknown AbacDecision variant in WS resource_read — fail-closed"
1887                                            );
1888                                            let error_resp = make_ws_error_response(
1889                                                Some(id),
1890                                                -32001,
1891                                                "Denied by policy",
1892                                            );
1893                                            let mut sink = client_sink.lock().await;
1894                                            let _ =
1895                                                sink.send(Message::Text(error_resp.into())).await;
1896                                            continue;
1897                                        }
1898                                    }
1899                                }
1900
1901                                // NOTE: Session touch + call_counts/action_history
1902                                // update already performed inside the TOCTOU-safe
1903                                // block above (FIND-R130-002). No separate update here.
1904
1905                                // SECURITY (FIND-R46-WS-004): Audit log allowed resource reads
1906                                if let Err(e) = state
1907                                    .audit
1908                                    .log_entry(
1909                                        &action,
1910                                        &Verdict::Allow,
1911                                        json!({
1912                                            "source": "ws_proxy",
1913                                            "session": session_id,
1914                                            "transport": "websocket",
1915                                            "resource_uri": uri,
1916                                        }),
1917                                    )
1918                                    .await
1919                                {
1920                                    tracing::error!(
1921                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1922                                        e
1923                                    );
1924                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1925                                    if state.audit_strict_mode {
1926                                        let error = make_ws_error_response(
1927                                            Some(id),
1928                                            -32000,
1929                                            "Audit logging failed — request denied (strict audit mode)",
1930                                        );
1931                                        let mut sink = client_sink.lock().await;
1932                                        let _ = sink.send(Message::Text(error.into())).await;
1933                                        continue;
1934                                    }
1935                                }
1936
1937                                // SECURITY (FIND-R46-011): Fail-closed on canonicalization
1938                                // failure. Do NOT fall back to original text.
1939                                let forward_text = if state.canonicalize {
1940                                    match serde_json::to_string(&parsed) {
1941                                        Ok(canonical) => canonical,
1942                                        Err(e) => {
1943                                            tracing::error!(
1944                                                "SECURITY: WS resource canonicalization failed: {}",
1945                                                e
1946                                            );
1947                                            let error_resp = make_ws_error_response(
1948                                                Some(id),
1949                                                -32603,
1950                                                "Internal error",
1951                                            );
1952                                            let mut sink = client_sink.lock().await;
1953                                            let _ =
1954                                                sink.send(Message::Text(error_resp.into())).await;
1955                                            continue;
1956                                        }
1957                                    }
1958                                } else {
1959                                    text.to_string()
1960                                };
1961                                let mut sink = upstream_sink.lock().await;
1962                                if let Err(e) = sink
1963                                    .send(tokio_tungstenite::tungstenite::Message::Text(
1964                                        forward_text.into(),
1965                                    ))
1966                                    .await
1967                                {
1968                                    tracing::error!("Failed to forward resource read: {}", e);
1969                                    break;
1970                                }
1971                            }
1972                            // SECURITY (FIND-R116-009): Separate handling for Deny vs RequireApproval
1973                            // with per-verdict audit logging. Parity with gRPC (service.rs:1051-1076).
1974                            Verdict::Deny { ref reason } => {
1975                                if let Err(e) = state
1976                                    .audit
1977                                    .log_entry(
1978                                        &action,
1979                                        &Verdict::Deny {
1980                                            reason: reason.clone(),
1981                                        },
1982                                        json!({
1983                                            "source": "ws_proxy",
1984                                            "session": session_id,
1985                                            "transport": "websocket",
1986                                            "resource_uri": uri,
1987                                        }),
1988                                    )
1989                                    .await
1990                                {
1991                                    tracing::error!(
1992                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1993                                        e
1994                                    );
1995                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1996                                    if state.audit_strict_mode {
1997                                        let error = make_ws_error_response(
1998                                            Some(id),
1999                                            -32000,
2000                                            "Audit logging failed — request denied (strict audit mode)",
2001                                        );
2002                                        let mut sink = client_sink.lock().await;
2003                                        let _ = sink.send(Message::Text(error.into())).await;
2004                                        continue;
2005                                    }
2006                                }
2007                                let error =
2008                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2009                                let mut sink = client_sink.lock().await;
2010                                let _ = sink.send(Message::Text(error.into())).await;
2011                            }
2012                            Verdict::RequireApproval { ref reason, .. } => {
2013                                let deny_reason = format!("Requires approval: {}", reason);
2014                                if let Err(e) = state
2015                                    .audit
2016                                    .log_entry(
2017                                        &action,
2018                                        &Verdict::Deny {
2019                                            reason: deny_reason,
2020                                        },
2021                                        json!({
2022                                            "source": "ws_proxy",
2023                                            "session": session_id,
2024                                            "transport": "websocket",
2025                                            "resource_uri": uri,
2026                                            "event": "require_approval",
2027                                        }),
2028                                    )
2029                                    .await
2030                                {
2031                                    tracing::error!(
2032                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2033                                        e
2034                                    );
2035                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2036                                    if state.audit_strict_mode {
2037                                        let error = make_ws_error_response(
2038                                            Some(id),
2039                                            -32000,
2040                                            "Audit logging failed — request denied (strict audit mode)",
2041                                        );
2042                                        let mut sink = client_sink.lock().await;
2043                                        let _ = sink.send(Message::Text(error.into())).await;
2044                                        continue;
2045                                    }
2046                                }
2047                                let error =
2048                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2049                                let mut sink = client_sink.lock().await;
2050                                let _ = sink.send(Message::Text(error.into())).await;
2051                            }
2052                            #[allow(unreachable_patterns)]
2053                            _ => {
2054                                // SECURITY: Future variants — fail-closed.
2055                                tracing::warn!(
2056                                    "Unknown Verdict variant in WS resource_read — fail-closed"
2057                                );
2058                                let error =
2059                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2060                                let mut sink = client_sink.lock().await;
2061                                let _ = sink.send(Message::Text(error.into())).await;
2062                            }
2063                        }
2064                    }
2065                    MessageType::Batch => {
2066                        // Reject batches per MCP spec
2067                        let error = json!({
2068                            "jsonrpc": "2.0",
2069                            "error": {
2070                                "code": -32600,
2071                                "message": "JSON-RPC batch requests are not supported"
2072                            },
2073                            "id": null
2074                        });
2075                        let error_text = serde_json::to_string(&error)
2076                            .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","error":{"code":-32600,"message":"Batch not supported"},"id":null}"#.to_string());
2077                        let mut sink = client_sink.lock().await;
2078                        let _ = sink.send(Message::Text(error_text.into())).await;
2079                    }
2080                    MessageType::Invalid { ref id, ref reason } => {
2081                        tracing::warn!(
2082                            "Invalid JSON-RPC request in WebSocket transport: {}",
2083                            reason
2084                        );
2085                        let error =
2086                            make_ws_error_response(Some(id), -32600, "Invalid JSON-RPC request");
2087                        let mut sink = client_sink.lock().await;
2088                        let _ = sink.send(Message::Text(error.into())).await;
2089                    }
2090                    MessageType::SamplingRequest { ref id } => {
2091                        // SECURITY (FIND-R74-006): Call inspect_sampling() for full
2092                        // verdict (enabled + model filter + tool output check + rate limit),
2093                        // matching HTTP handler parity (handlers.rs:1681).
2094                        let params = parsed.get("params").cloned().unwrap_or(json!({}));
2095                        // SECURITY (FIND-R125-001): Per-session sampling rate limit
2096                        // parity with elicitation. Atomically read + increment.
2097                        let sampling_verdict = {
2098                            let mut session_ref = state.sessions.get_mut(&session_id);
2099                            let current_count =
2100                                session_ref.as_ref().map(|s| s.sampling_count).unwrap_or(0);
2101                            let verdict = vellaveto_mcp::elicitation::inspect_sampling(
2102                                &params,
2103                                &state.sampling_config,
2104                                current_count,
2105                            );
2106                            if matches!(verdict, vellaveto_mcp::elicitation::SamplingVerdict::Allow)
2107                            {
2108                                if let Some(ref mut s) = session_ref {
2109                                    s.sampling_count = s.sampling_count.saturating_add(1);
2110                                }
2111                            }
2112                            verdict
2113                        };
2114                        match sampling_verdict {
2115                            vellaveto_mcp::elicitation::SamplingVerdict::Allow => {
2116                                // Forward allowed sampling request
2117                                // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
2118                                // Falling back to original text would create a TOCTOU gap.
2119                                let forward_text = if state.canonicalize {
2120                                    match serde_json::to_string(&parsed) {
2121                                        Ok(canonical) => canonical,
2122                                        Err(e) => {
2123                                            tracing::error!(
2124                                                "SECURITY: WS sampling canonicalization failed: {}",
2125                                                e
2126                                            );
2127                                            let error_resp = make_ws_error_response(
2128                                                Some(id),
2129                                                -32603,
2130                                                "Internal error",
2131                                            );
2132                                            let mut sink = client_sink.lock().await;
2133                                            let _ =
2134                                                sink.send(Message::Text(error_resp.into())).await;
2135                                            continue;
2136                                        }
2137                                    }
2138                                } else {
2139                                    text.to_string()
2140                                };
2141                                let mut sink = upstream_sink.lock().await;
2142                                let _ = sink
2143                                    .send(tokio_tungstenite::tungstenite::Message::Text(
2144                                        forward_text.into(),
2145                                    ))
2146                                    .await;
2147                            }
2148                            vellaveto_mcp::elicitation::SamplingVerdict::Deny { reason } => {
2149                                tracing::warn!(
2150                                    session_id = %session_id,
2151                                    "Blocked WS sampling/createMessage: {}",
2152                                    reason
2153                                );
2154                                let action = Action::new(
2155                                    "vellaveto",
2156                                    "ws_sampling_interception",
2157                                    json!({
2158                                        "method": "sampling/createMessage",
2159                                        "session": session_id,
2160                                        "transport": "websocket",
2161                                        "reason": &reason,
2162                                    }),
2163                                );
2164                                let verdict = Verdict::Deny {
2165                                    reason: reason.clone(),
2166                                };
2167                                if let Err(e) = state
2168                                    .audit
2169                                    .log_entry(
2170                                        &action,
2171                                        &verdict,
2172                                        json!({
2173                                            "source": "ws_proxy",
2174                                            "event": "ws_sampling_interception",
2175                                        }),
2176                                    )
2177                                    .await
2178                                {
2179                                    tracing::warn!(
2180                                        "Failed to audit WS sampling interception: {}",
2181                                        e
2182                                    );
2183                                }
2184                                // SECURITY: Generic message to client — detailed reason
2185                                // is in the audit log, not leaked to the client.
2186                                let error = make_ws_error_response(
2187                                    Some(id),
2188                                    -32001,
2189                                    "sampling/createMessage blocked by policy",
2190                                );
2191                                let mut sink = client_sink.lock().await;
2192                                let _ = sink.send(Message::Text(error.into())).await;
2193                            }
2194                        }
2195                    }
2196                    MessageType::TaskRequest {
2197                        ref id,
2198                        ref task_method,
2199                        ref task_id,
2200                    } => {
2201                        // SECURITY (FIND-R76-001): Memory poisoning detection on task params.
2202                        // Parity with HTTP handler (handlers.rs:2027-2084). Agents could
2203                        // exfiltrate poisoned data via task management operations.
2204                        {
2205                            let task_params = parsed.get("params").cloned().unwrap_or(json!({}));
2206                            let poisoning_detected = state
2207                                .sessions
2208                                .get_mut(&session_id)
2209                                .and_then(|session| {
2210                                    let matches =
2211                                        session.memory_tracker.check_parameters(&task_params);
2212                                    if !matches.is_empty() {
2213                                        for m in &matches {
2214                                            tracing::warn!(
2215                                                "SECURITY: Memory poisoning detected in WS task '{}' (session {}): \
2216                                                 param '{}' contains replayed data (fingerprint: {})",
2217                                                task_method,
2218                                                session_id,
2219                                                m.param_location,
2220                                                m.fingerprint
2221                                            );
2222                                        }
2223                                        Some(matches.len())
2224                                    } else {
2225                                        None
2226                                    }
2227                                });
2228                            if let Some(match_count) = poisoning_detected {
2229                                let poison_action =
2230                                    extractor::extract_task_action(task_method, task_id.as_deref());
2231                                let deny_reason = format!(
2232                                    "Memory poisoning detected: {} replayed data fragment(s) in task '{}'",
2233                                    match_count, task_method
2234                                );
2235                                if let Err(e) = state
2236                                    .audit
2237                                    .log_entry(
2238                                        &poison_action,
2239                                        &Verdict::Deny {
2240                                            reason: deny_reason,
2241                                        },
2242                                        json!({
2243                                            "source": "ws_proxy",
2244                                            "session": session_id,
2245                                            "transport": "websocket",
2246                                            "event": "memory_poisoning_detected",
2247                                            "matches": match_count,
2248                                            "task_method": task_method,
2249                                        }),
2250                                    )
2251                                    .await
2252                                {
2253                                    tracing::warn!(
2254                                        "Failed to audit WS task memory poisoning: {}",
2255                                        e
2256                                    );
2257                                }
2258                                let error = make_ws_error_response(
2259                                    Some(id),
2260                                    -32001,
2261                                    "Request blocked: security policy violation",
2262                                );
2263                                let mut sink = client_sink.lock().await;
2264                                let _ = sink.send(Message::Text(error.into())).await;
2265                                continue;
2266                            }
2267                        }
2268
2269                        // SECURITY (FIND-R76-001): DLP scan task request parameters.
2270                        // Parity with HTTP handler (handlers.rs:2086-2145). Agents could
2271                        // embed secrets in task_id or params to exfiltrate them.
2272                        {
2273                            let task_params = parsed.get("params").cloned().unwrap_or(json!({}));
2274                            let dlp_findings = scan_parameters_for_secrets(&task_params);
2275                            if !dlp_findings.is_empty() {
2276                                for finding in &dlp_findings {
2277                                    record_dlp_finding(&finding.pattern_name);
2278                                }
2279                                let patterns: Vec<String> = dlp_findings
2280                                    .iter()
2281                                    .map(|f| format!("{} at {}", f.pattern_name, f.location))
2282                                    .collect();
2283                                tracing::warn!(
2284                                    "SECURITY: DLP blocking WS task '{}' in session {}: {:?}",
2285                                    task_method,
2286                                    session_id,
2287                                    patterns
2288                                );
2289                                let dlp_action =
2290                                    extractor::extract_task_action(task_method, task_id.as_deref());
2291                                if let Err(e) = state
2292                                    .audit
2293                                    .log_entry(
2294                                        &dlp_action,
2295                                        &Verdict::Deny {
2296                                            reason: format!(
2297                                                "DLP: secrets detected in task request: {:?}",
2298                                                patterns
2299                                            ),
2300                                        },
2301                                        json!({
2302                                            "source": "ws_proxy",
2303                                            "session": session_id,
2304                                            "transport": "websocket",
2305                                            "event": "dlp_secret_detected_task",
2306                                            "task_method": task_method,
2307                                            "findings": patterns,
2308                                        }),
2309                                    )
2310                                    .await
2311                                {
2312                                    tracing::warn!("Failed to audit WS task DLP: {}", e);
2313                                }
2314                                let error = make_ws_error_response(
2315                                    Some(id),
2316                                    -32001,
2317                                    "Request blocked: security policy violation",
2318                                );
2319                                let mut sink = client_sink.lock().await;
2320                                let _ = sink.send(Message::Text(error.into())).await;
2321                                continue;
2322                            }
2323                        }
2324
2325                        // Policy-evaluate task requests (async operations)
2326                        let action =
2327                            extractor::extract_task_action(task_method, task_id.as_deref());
2328                        // SECURITY (FIND-R130-002): TOCTOU-safe context+eval for task
2329                        // requests. Context is built inside the DashMap shard lock to
2330                        // prevent stale snapshot evaluation races.
2331                        // SECURITY (FIND-R190-006): Update session state on Allow
2332                        // (touch + call_counts + action_history) while still holding
2333                        // the shard lock, matching ToolCall/ResourceRead parity.
2334                        let (verdict, task_eval_ctx) = if let Some(mut session) =
2335                            state.sessions.get_mut(&session_id)
2336                        {
2337                            let ctx = EvaluationContext {
2338                                timestamp: None,
2339                                agent_id: session.oauth_subject.clone(),
2340                                agent_identity: session.agent_identity.clone(),
2341                                call_counts: session.call_counts.clone(),
2342                                previous_actions: session.action_history.iter().cloned().collect(),
2343                                call_chain: session.current_call_chain.clone(),
2344                                tenant_id: None,
2345                                verification_tier: None,
2346                                capability_token: None,
2347                                session_state: None,
2348                            };
2349                            let verdict = match state.engine.evaluate_action_with_context(
2350                                &action,
2351                                &state.policies,
2352                                Some(&ctx),
2353                            ) {
2354                                Ok(v) => v,
2355                                Err(e) => {
2356                                    tracing::error!(
2357                                        session_id = %session_id,
2358                                        "Task policy evaluation error: {}", e
2359                                    );
2360                                    Verdict::Deny {
2361                                        reason: format!("Policy evaluation failed: {}", e),
2362                                    }
2363                                }
2364                            };
2365
2366                            // Update session atomically on Allow
2367                            if matches!(verdict, Verdict::Allow) {
2368                                session.touch();
2369                                use crate::proxy::call_chain::{
2370                                    MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
2371                                };
2372                                if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
2373                                    || session.call_counts.contains_key(task_method)
2374                                {
2375                                    let count = session
2376                                        .call_counts
2377                                        .entry(task_method.to_string())
2378                                        .or_insert(0);
2379                                    *count = count.saturating_add(1);
2380                                }
2381                                if session.action_history.len() >= MAX_ACTION_HISTORY {
2382                                    session.action_history.pop_front();
2383                                }
2384                                session.action_history.push_back(task_method.to_string());
2385                            }
2386
2387                            (verdict, ctx)
2388                        } else {
2389                            let verdict = match state.engine.evaluate_action_with_context(
2390                                &action,
2391                                &state.policies,
2392                                None,
2393                            ) {
2394                                Ok(v) => v,
2395                                Err(e) => {
2396                                    tracing::error!(
2397                                        session_id = %session_id,
2398                                        "Task policy evaluation error: {}", e
2399                                    );
2400                                    Verdict::Deny {
2401                                        reason: format!("Policy evaluation failed: {}", e),
2402                                    }
2403                                }
2404                            };
2405                            (verdict, EvaluationContext::default())
2406                        };
2407
2408                        match verdict {
2409                            Verdict::Allow => {
2410                                // SECURITY (FIND-R190-001): ABAC refinement for TaskRequest,
2411                                // matching ToolCall/ResourceRead parity.
2412                                if let Some(ref abac) = state.abac_engine {
2413                                    let principal_id =
2414                                        task_eval_ctx.agent_id.as_deref().unwrap_or("anonymous");
2415                                    let principal_type = task_eval_ctx.principal_type();
2416                                    let session_risk = state
2417                                        .sessions
2418                                        .get_mut(&session_id)
2419                                        .and_then(|s| s.risk_score.clone());
2420                                    let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
2421                                        eval_ctx: &task_eval_ctx,
2422                                        principal_type,
2423                                        principal_id,
2424                                        risk_score: session_risk.as_ref(),
2425                                    };
2426                                    match abac.evaluate(&action, &abac_ctx) {
2427                                        vellaveto_engine::abac::AbacDecision::Deny {
2428                                            policy_id,
2429                                            reason,
2430                                        } => {
2431                                            let deny_verdict = Verdict::Deny {
2432                                                reason: reason.clone(),
2433                                            };
2434                                            if let Err(e) = state
2435                                                .audit
2436                                                .log_entry(
2437                                                    &action,
2438                                                    &deny_verdict,
2439                                                    json!({
2440                                                        "source": "ws_proxy",
2441                                                        "session": session_id,
2442                                                        "transport": "websocket",
2443                                                        "event": "abac_deny",
2444                                                        "abac_policy": policy_id,
2445                                                        "task_method": task_method,
2446                                                    }),
2447                                                )
2448                                                .await
2449                                            {
2450                                                tracing::warn!(
2451                                                    "Failed to audit WS task ABAC deny: {}",
2452                                                    e
2453                                                );
2454                                            }
2455                                            let error_resp = make_ws_error_response(
2456                                                Some(id),
2457                                                -32001,
2458                                                "Denied by policy",
2459                                            );
2460                                            let mut sink = client_sink.lock().await;
2461                                            let _ =
2462                                                sink.send(Message::Text(error_resp.into())).await;
2463                                            continue;
2464                                        }
2465                                        vellaveto_engine::abac::AbacDecision::Allow {
2466                                            policy_id,
2467                                        } => {
2468                                            if let Some(ref la) = state.least_agency {
2469                                                la.record_usage(
2470                                                    principal_id,
2471                                                    &session_id,
2472                                                    &policy_id,
2473                                                    task_method,
2474                                                    &action.function,
2475                                                );
2476                                            }
2477                                        }
2478                                        vellaveto_engine::abac::AbacDecision::NoMatch => {
2479                                            // Fall through — existing Allow stands
2480                                        }
2481                                        #[allow(unreachable_patterns)]
2482                                        _ => {
2483                                            tracing::warn!(
2484                                                "Unknown AbacDecision variant — fail-closed"
2485                                            );
2486                                            let error_resp = make_ws_error_response(
2487                                                Some(id),
2488                                                -32001,
2489                                                "Denied by policy",
2490                                            );
2491                                            let mut sink = client_sink.lock().await;
2492                                            let _ =
2493                                                sink.send(Message::Text(error_resp.into())).await;
2494                                            continue;
2495                                        }
2496                                    }
2497                                }
2498
2499                                if let Err(e) = state
2500                                    .audit
2501                                    .log_entry(
2502                                        &action,
2503                                        &Verdict::Allow,
2504                                        json!({
2505                                            "source": "ws_proxy",
2506                                            "session": session_id,
2507                                            "transport": "websocket",
2508                                            "task_method": task_method,
2509                                        }),
2510                                    )
2511                                    .await
2512                                {
2513                                    tracing::warn!("Failed to audit WS task allow: {}", e);
2514                                }
2515                                // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
2516                                let forward_text = if state.canonicalize {
2517                                    match serde_json::to_string(&parsed) {
2518                                        Ok(canonical) => canonical,
2519                                        Err(e) => {
2520                                            tracing::error!(
2521                                                "SECURITY: WS task canonicalization failed: {}",
2522                                                e
2523                                            );
2524                                            let error_resp = make_ws_error_response(
2525                                                Some(id),
2526                                                -32603,
2527                                                "Internal error",
2528                                            );
2529                                            let mut sink = client_sink.lock().await;
2530                                            let _ =
2531                                                sink.send(Message::Text(error_resp.into())).await;
2532                                            continue;
2533                                        }
2534                                    }
2535                                } else {
2536                                    text.to_string()
2537                                };
2538                                let mut sink = upstream_sink.lock().await;
2539                                if let Err(e) = sink
2540                                    .send(tokio_tungstenite::tungstenite::Message::Text(
2541                                        forward_text.into(),
2542                                    ))
2543                                    .await
2544                                {
2545                                    tracing::error!("Failed to forward task request: {}", e);
2546                                    break;
2547                                }
2548                            }
2549                            Verdict::Deny { ref reason } => {
2550                                if let Err(e) = state
2551                                    .audit
2552                                    .log_entry(
2553                                        &action,
2554                                        &Verdict::Deny {
2555                                            reason: reason.clone(),
2556                                        },
2557                                        json!({
2558                                            "source": "ws_proxy",
2559                                            "session": session_id,
2560                                            "transport": "websocket",
2561                                            "task_method": task_method,
2562                                        }),
2563                                    )
2564                                    .await
2565                                {
2566                                    tracing::error!(
2567                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2568                                        e
2569                                    );
2570                                    // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
2571                                    if state.audit_strict_mode {
2572                                        let error = make_ws_error_response(
2573                                            Some(id),
2574                                            -32000,
2575                                            "Audit logging failed — request denied (strict audit mode)",
2576                                        );
2577                                        let mut sink = client_sink.lock().await;
2578                                        let _ = sink.send(Message::Text(error.into())).await;
2579                                        continue;
2580                                    }
2581                                }
2582                                // SECURITY (FIND-R55-WS-005): Generic denial message to prevent
2583                                // leaking policy names/details. Detailed reason is in audit log.
2584                                let denial =
2585                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2586                                let mut sink = client_sink.lock().await;
2587                                let _ = sink.send(Message::Text(denial.into())).await;
2588                            }
2589                            Verdict::RequireApproval { ref reason, .. } => {
2590                                let deny_reason = format!("Requires approval: {}", reason);
2591                                if let Err(e) = state
2592                                    .audit
2593                                    .log_entry(
2594                                        &action,
2595                                        &Verdict::Deny {
2596                                            reason: deny_reason,
2597                                        },
2598                                        json!({
2599                                            "source": "ws_proxy",
2600                                            "session": session_id,
2601                                            "transport": "websocket",
2602                                            "task_method": task_method,
2603                                        }),
2604                                    )
2605                                    .await
2606                                {
2607                                    tracing::error!(
2608                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2609                                        e
2610                                    );
2611                                    // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
2612                                    if state.audit_strict_mode {
2613                                        let error = make_ws_error_response(
2614                                            Some(id),
2615                                            -32000,
2616                                            "Audit logging failed — request denied (strict audit mode)",
2617                                        );
2618                                        let mut sink = client_sink.lock().await;
2619                                        let _ = sink.send(Message::Text(error.into())).await;
2620                                        continue;
2621                                    }
2622                                }
2623                                let approval_reason = "Approval required";
2624                                let approval_id =
2625                                    create_ws_approval(&state, &session_id, &action, reason).await;
2626                                let denial = make_ws_error_response_with_data(
2627                                    Some(id),
2628                                    -32001,
2629                                    approval_reason,
2630                                    Some(json!({
2631                                        "verdict": "require_approval",
2632                                        "reason": approval_reason,
2633                                        "approval_id": approval_id,
2634                                    })),
2635                                );
2636                                let mut sink = client_sink.lock().await;
2637                                let _ = sink.send(Message::Text(denial.into())).await;
2638                            }
2639                            _ => {
2640                                let denial =
2641                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2642                                let mut sink = client_sink.lock().await;
2643                                let _ = sink.send(Message::Text(denial.into())).await;
2644                            }
2645                        }
2646                    }
2647                    MessageType::ExtensionMethod {
2648                        ref id,
2649                        ref extension_id,
2650                        ref method,
2651                    } => {
2652                        // Policy-evaluate extension method calls
2653                        let params = parsed.get("params").cloned().unwrap_or(json!({}));
2654
2655                        // SECURITY (FIND-R116-001): DLP scan extension method parameters.
2656                        // Parity with gRPC handle_extension_method (service.rs:1542).
2657                        let dlp_findings = scan_parameters_for_secrets(&params);
2658                        if !dlp_findings.is_empty() {
2659                            for finding in &dlp_findings {
2660                                record_dlp_finding(&finding.pattern_name);
2661                            }
2662                            let patterns: Vec<String> = dlp_findings
2663                                .iter()
2664                                .map(|f| format!("{}:{}", f.pattern_name, f.location))
2665                                .collect();
2666                            tracing::warn!(
2667                                "SECURITY: Secrets in WS extension method parameters! Session: {}, Extension: {}:{}, Findings: {:?}",
2668                                session_id, extension_id, method, patterns,
2669                            );
2670                            let action =
2671                                extractor::extract_extension_action(extension_id, method, &params);
2672                            let audit_verdict = Verdict::Deny {
2673                                reason: format!(
2674                                    "DLP blocked: secret detected in extension parameters: {:?}",
2675                                    patterns
2676                                ),
2677                            };
2678                            if let Err(e) = state.audit.log_entry(
2679                                &action, &audit_verdict,
2680                                json!({
2681                                    "source": "ws_proxy", "session": session_id, "transport": "websocket",
2682                                    "event": "ws_extension_parameter_dlp_alert",
2683                                    "extension_id": extension_id, "method": method, "findings": patterns,
2684                                }),
2685                            ).await {
2686                                tracing::warn!("Failed to audit WS extension parameter DLP: {}", e);
2687                            }
2688                            let denial =
2689                                make_ws_error_response(Some(id), -32001, "Denied by policy");
2690                            let mut sink = client_sink.lock().await;
2691                            let _ = sink.send(Message::Text(denial.into())).await;
2692                            continue;
2693                        }
2694
2695                        // SECURITY (FIND-R116-001): Memory poisoning detection for extension params.
2696                        // Parity with gRPC handle_extension_method (service.rs:1574).
2697                        if let Some(session) = state.sessions.get_mut(&session_id) {
2698                            let poisoning_matches =
2699                                session.memory_tracker.check_parameters(&params);
2700                            if !poisoning_matches.is_empty() {
2701                                for m in &poisoning_matches {
2702                                    tracing::warn!(
2703                                        "SECURITY: Memory poisoning in WS extension '{}:{}' (session {}): \
2704                                         param '{}' replayed data (fingerprint: {})",
2705                                        extension_id, method, session_id, m.param_location, m.fingerprint
2706                                    );
2707                                }
2708                                let action = extractor::extract_extension_action(
2709                                    extension_id,
2710                                    method,
2711                                    &params,
2712                                );
2713                                let deny_reason = format!(
2714                                    "Memory poisoning detected: {} replayed data fragment(s) in extension '{}:{}'",
2715                                    poisoning_matches.len(), extension_id, method
2716                                );
2717                                if let Err(e) = state.audit.log_entry(
2718                                    &action,
2719                                    &Verdict::Deny { reason: deny_reason.clone() },
2720                                    json!({
2721                                        "source": "ws_proxy", "session": session_id, "transport": "websocket",
2722                                        "event": "memory_poisoning_detected",
2723                                        "matches": poisoning_matches.len(),
2724                                        "extension_id": extension_id, "method": method,
2725                                    }),
2726                                ).await {
2727                                    tracing::warn!("Failed to audit WS extension memory poisoning: {}", e);
2728                                }
2729                                let denial =
2730                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2731                                let mut sink = client_sink.lock().await;
2732                                let _ = sink.send(Message::Text(denial.into())).await;
2733                                continue;
2734                            }
2735                        }
2736
2737                        let mut action =
2738                            extractor::extract_extension_action(extension_id, method, &params);
2739
2740                        // SECURITY (FIND-R118-004): DNS resolution for extension methods.
2741                        // Parity with ToolCall (line 710) and ResourceRead (line 1439).
2742                        if state.engine.has_ip_rules() {
2743                            super::helpers::resolve_domains(&mut action).await;
2744                        }
2745
2746                        let ext_key = format!("extension:{}:{}", extension_id, method);
2747
2748                        // SECURITY (FIND-R130-002): TOCTOU-safe context+eval+update
2749                        // for extension methods. Matches ToolCall/ResourceRead fixes.
2750                        let (verdict, ctx) = if let Some(mut session) =
2751                            state.sessions.get_mut(&session_id)
2752                        {
2753                            let ctx = EvaluationContext {
2754                                timestamp: None,
2755                                agent_id: session.oauth_subject.clone(),
2756                                agent_identity: session.agent_identity.clone(),
2757                                call_counts: session.call_counts.clone(),
2758                                previous_actions: session.action_history.iter().cloned().collect(),
2759                                call_chain: session.current_call_chain.clone(),
2760                                tenant_id: None,
2761                                verification_tier: None,
2762                                capability_token: None,
2763                                session_state: None,
2764                            };
2765
2766                            let verdict = match state.engine.evaluate_action_with_context(
2767                                &action,
2768                                &state.policies,
2769                                Some(&ctx),
2770                            ) {
2771                                Ok(v) => v,
2772                                Err(e) => {
2773                                    tracing::error!(
2774                                        session_id = %session_id,
2775                                        "Extension policy evaluation error: {}", e
2776                                    );
2777                                    Verdict::Deny {
2778                                        reason: format!("Policy evaluation failed: {}", e),
2779                                    }
2780                                }
2781                            };
2782
2783                            // Atomically update session on Allow
2784                            if matches!(verdict, Verdict::Allow) {
2785                                session.touch();
2786                                use crate::proxy::call_chain::{
2787                                    MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
2788                                };
2789                                if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
2790                                    || session.call_counts.contains_key(&ext_key)
2791                                {
2792                                    let count =
2793                                        session.call_counts.entry(ext_key.clone()).or_insert(0);
2794                                    *count = count.saturating_add(1);
2795                                }
2796                                if session.action_history.len() >= MAX_ACTION_HISTORY {
2797                                    session.action_history.pop_front();
2798                                }
2799                                session.action_history.push_back(ext_key.clone());
2800                            }
2801
2802                            (verdict, ctx)
2803                        } else {
2804                            let verdict = match state.engine.evaluate_action_with_context(
2805                                &action,
2806                                &state.policies,
2807                                None,
2808                            ) {
2809                                Ok(v) => v,
2810                                Err(e) => {
2811                                    tracing::error!(
2812                                        session_id = %session_id,
2813                                        "Extension policy evaluation error: {}", e
2814                                    );
2815                                    Verdict::Deny {
2816                                        reason: format!("Policy evaluation failed: {}", e),
2817                                    }
2818                                }
2819                            };
2820                            (verdict, EvaluationContext::default())
2821                        };
2822
2823                        match verdict {
2824                            Verdict::Allow => {
2825                                // SECURITY (FIND-R118-002): ABAC refinement for extension methods.
2826                                // Parity with ToolCall (line 1099) and ResourceRead (line 1498).
2827                                if let Some(ref abac) = state.abac_engine {
2828                                    let principal_id =
2829                                        ctx.agent_id.as_deref().unwrap_or("anonymous");
2830                                    let principal_type = ctx.principal_type();
2831                                    let session_risk = state
2832                                        .sessions
2833                                        .get_mut(&session_id)
2834                                        .and_then(|s| s.risk_score.clone());
2835                                    let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
2836                                        eval_ctx: &ctx,
2837                                        principal_type,
2838                                        principal_id,
2839                                        risk_score: session_risk.as_ref(),
2840                                    };
2841                                    match abac.evaluate(&action, &abac_ctx) {
2842                                        vellaveto_engine::abac::AbacDecision::Deny {
2843                                            policy_id,
2844                                            reason,
2845                                        } => {
2846                                            let deny_verdict = Verdict::Deny {
2847                                                reason: reason.clone(),
2848                                            };
2849                                            if let Err(e) = state
2850                                                .audit
2851                                                .log_entry(
2852                                                    &action,
2853                                                    &deny_verdict,
2854                                                    json!({
2855                                                        "source": "ws_proxy",
2856                                                        "session": session_id,
2857                                                        "transport": "websocket",
2858                                                        "event": "abac_deny",
2859                                                        "extension_id": extension_id,
2860                                                        "abac_policy": policy_id,
2861                                                    }),
2862                                                )
2863                                                .await
2864                                            {
2865                                                tracing::warn!(
2866                                                    "Failed to audit WS extension ABAC deny: {}",
2867                                                    e
2868                                                );
2869                                            }
2870                                            let error_resp = make_ws_error_response(
2871                                                Some(id),
2872                                                -32001,
2873                                                "Denied by policy",
2874                                            );
2875                                            let mut sink = client_sink.lock().await;
2876                                            let _ =
2877                                                sink.send(Message::Text(error_resp.into())).await;
2878                                            continue;
2879                                        }
2880                                        vellaveto_engine::abac::AbacDecision::Allow {
2881                                            policy_id,
2882                                        } => {
2883                                            if let Some(ref la) = state.least_agency {
2884                                                la.record_usage(
2885                                                    principal_id,
2886                                                    &session_id,
2887                                                    &policy_id,
2888                                                    &ext_key,
2889                                                    method,
2890                                                );
2891                                            }
2892                                        }
2893                                        vellaveto_engine::abac::AbacDecision::NoMatch => {
2894                                            // Fall through — existing Allow stands
2895                                        }
2896                                        #[allow(unreachable_patterns)]
2897                                        // AbacDecision is #[non_exhaustive]
2898                                        _ => {
2899                                            // SECURITY: Future variants — fail-closed (deny).
2900                                            tracing::warn!(
2901                                                "Unknown AbacDecision variant — fail-closed"
2902                                            );
2903                                            let error_resp = make_ws_error_response(
2904                                                Some(id),
2905                                                -32001,
2906                                                "Denied by policy",
2907                                            );
2908                                            let mut sink = client_sink.lock().await;
2909                                            let _ =
2910                                                sink.send(Message::Text(error_resp.into())).await;
2911                                            continue;
2912                                        }
2913                                    }
2914                                }
2915
2916                                // NOTE: Session touch + call_counts/action_history
2917                                // update already performed inside the TOCTOU-safe
2918                                // block above (FIND-R130-002). No separate update here.
2919
2920                                if let Err(e) = state
2921                                    .audit
2922                                    .log_entry(
2923                                        &action,
2924                                        &Verdict::Allow,
2925                                        json!({
2926                                            "source": "ws_proxy",
2927                                            "session": session_id,
2928                                            "transport": "websocket",
2929                                            "extension_id": extension_id,
2930                                        }),
2931                                    )
2932                                    .await
2933                                {
2934                                    tracing::error!(
2935                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2936                                        e
2937                                    );
2938                                    // SECURITY (FIND-R215-007): Strict audit mode — fail-closed.
2939                                    // Parity with Deny and RequireApproval paths.
2940                                    if state.audit_strict_mode {
2941                                        let error = make_ws_error_response(
2942                                            Some(id),
2943                                            -32000,
2944                                            "Audit logging failed — request denied (strict audit mode)",
2945                                        );
2946                                        let mut sink = client_sink.lock().await;
2947                                        let _ = sink.send(Message::Text(error.into())).await;
2948                                        continue;
2949                                    }
2950                                }
2951                                // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
2952                                let forward_text = if state.canonicalize {
2953                                    match serde_json::to_string(&parsed) {
2954                                        Ok(canonical) => canonical,
2955                                        Err(e) => {
2956                                            tracing::error!("SECURITY: WS extension canonicalization failed: {}", e);
2957                                            let error_resp = make_ws_error_response(
2958                                                Some(id),
2959                                                -32603,
2960                                                "Internal error",
2961                                            );
2962                                            let mut sink = client_sink.lock().await;
2963                                            let _ =
2964                                                sink.send(Message::Text(error_resp.into())).await;
2965                                            continue;
2966                                        }
2967                                    }
2968                                } else {
2969                                    text.to_string()
2970                                };
2971                                let mut sink = upstream_sink.lock().await;
2972                                if let Err(e) = sink
2973                                    .send(tokio_tungstenite::tungstenite::Message::Text(
2974                                        forward_text.into(),
2975                                    ))
2976                                    .await
2977                                {
2978                                    tracing::error!("Failed to forward extension request: {}", e);
2979                                    break;
2980                                }
2981                            }
2982                            Verdict::Deny { ref reason } => {
2983                                if let Err(e) = state
2984                                    .audit
2985                                    .log_entry(
2986                                        &action,
2987                                        &Verdict::Deny {
2988                                            reason: reason.clone(),
2989                                        },
2990                                        json!({
2991                                            "source": "ws_proxy",
2992                                            "session": session_id,
2993                                            "transport": "websocket",
2994                                            "extension_id": extension_id,
2995                                        }),
2996                                    )
2997                                    .await
2998                                {
2999                                    tracing::error!(
3000                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
3001                                        e
3002                                    );
3003                                    // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
3004                                    if state.audit_strict_mode {
3005                                        let error = make_ws_error_response(
3006                                            Some(id),
3007                                            -32000,
3008                                            "Audit logging failed — request denied (strict audit mode)",
3009                                        );
3010                                        let mut sink = client_sink.lock().await;
3011                                        let _ = sink.send(Message::Text(error.into())).await;
3012                                        continue;
3013                                    }
3014                                }
3015                                // SECURITY (FIND-R213-001): Generic denial message — do not leak
3016                                // detailed policy reason to client. Reason is in the audit log.
3017                                let _ = reason;
3018                                let denial =
3019                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
3020                                let mut sink = client_sink.lock().await;
3021                                let _ = sink.send(Message::Text(denial.into())).await;
3022                            }
3023                            Verdict::RequireApproval { ref reason, .. } => {
3024                                let deny_reason = format!("Requires approval: {}", reason);
3025                                if let Err(e) = state
3026                                    .audit
3027                                    .log_entry(
3028                                        &action,
3029                                        &Verdict::Deny {
3030                                            reason: deny_reason,
3031                                        },
3032                                        json!({
3033                                            "source": "ws_proxy",
3034                                            "session": session_id,
3035                                            "transport": "websocket",
3036                                            "extension_id": extension_id,
3037                                        }),
3038                                    )
3039                                    .await
3040                                {
3041                                    tracing::error!(
3042                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
3043                                        e
3044                                    );
3045                                    // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
3046                                    if state.audit_strict_mode {
3047                                        let error = make_ws_error_response(
3048                                            Some(id),
3049                                            -32000,
3050                                            "Audit logging failed — request denied (strict audit mode)",
3051                                        );
3052                                        let mut sink = client_sink.lock().await;
3053                                        let _ = sink.send(Message::Text(error.into())).await;
3054                                        continue;
3055                                    }
3056                                }
3057                                let approval_reason = "Approval required";
3058                                let approval_id =
3059                                    create_ws_approval(&state, &session_id, &action, reason).await;
3060                                let denial = make_ws_error_response_with_data(
3061                                    Some(id),
3062                                    -32001,
3063                                    approval_reason,
3064                                    Some(json!({
3065                                        "verdict": "require_approval",
3066                                        "reason": approval_reason,
3067                                        "approval_id": approval_id,
3068                                    })),
3069                                );
3070                                let mut sink = client_sink.lock().await;
3071                                let _ = sink.send(Message::Text(denial.into())).await;
3072                            }
3073                            _ => {
3074                                let denial =
3075                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
3076                                let mut sink = client_sink.lock().await;
3077                                let _ = sink.send(Message::Text(denial.into())).await;
3078                            }
3079                        }
3080                    }
3081                    MessageType::ElicitationRequest { ref id } => {
3082                        // SECURITY (FIND-R46-010): Policy checks for elicitation requests.
3083                        // Match the HTTP POST handler's elicitation inspection logic.
3084                        let params = parsed.get("params").cloned().unwrap_or(json!({}));
3085                        let elicitation_verdict = {
3086                            let mut session_ref = state.sessions.get_mut(&session_id);
3087                            let current_count = session_ref
3088                                .as_ref()
3089                                .map(|s| s.elicitation_count)
3090                                .unwrap_or(0);
3091                            let verdict = vellaveto_mcp::elicitation::inspect_elicitation(
3092                                &params,
3093                                &state.elicitation_config,
3094                                current_count,
3095                            );
3096                            // Pre-increment while holding the lock to close the TOCTOU gap
3097                            if matches!(
3098                                verdict,
3099                                vellaveto_mcp::elicitation::ElicitationVerdict::Allow
3100                            ) {
3101                                if let Some(ref mut s) = session_ref {
3102                                    // SECURITY (FIND-R51-008): Use saturating_add for consistency.
3103                                    s.elicitation_count = s.elicitation_count.saturating_add(1);
3104                                }
3105                            }
3106                            verdict
3107                        };
3108                        match elicitation_verdict {
3109                            vellaveto_mcp::elicitation::ElicitationVerdict::Allow => {
3110                                let action = Action::new(
3111                                    "vellaveto",
3112                                    "ws_forward_message",
3113                                    json!({
3114                                        "message_type": "elicitation_request",
3115                                        "session": session_id,
3116                                        "transport": "websocket",
3117                                        "direction": "client_to_upstream",
3118                                    }),
3119                                );
3120                                if let Err(e) = state
3121                                    .audit
3122                                    .log_entry(
3123                                        &action,
3124                                        &Verdict::Allow,
3125                                        json!({
3126                                            "source": "ws_proxy",
3127                                            "event": "ws_elicitation_forwarded",
3128                                        }),
3129                                    )
3130                                    .await
3131                                {
3132                                    tracing::warn!("Failed to audit WS elicitation: {}", e);
3133                                }
3134
3135                                // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
3136                                let forward_text = if state.canonicalize {
3137                                    match serde_json::to_string(&parsed) {
3138                                        Ok(canonical) => canonical,
3139                                        Err(e) => {
3140                                            tracing::error!("SECURITY: WS elicitation canonicalization failed: {}", e);
3141                                            let error_resp = make_ws_error_response(
3142                                                Some(id),
3143                                                -32603,
3144                                                "Internal error",
3145                                            );
3146                                            let mut sink = client_sink.lock().await;
3147                                            let _ =
3148                                                sink.send(Message::Text(error_resp.into())).await;
3149                                            continue;
3150                                        }
3151                                    }
3152                                } else {
3153                                    text.to_string()
3154                                };
3155                                let mut sink = upstream_sink.lock().await;
3156                                if let Err(e) = sink
3157                                    .send(tokio_tungstenite::tungstenite::Message::Text(
3158                                        forward_text.into(),
3159                                    ))
3160                                    .await
3161                                {
3162                                    // Rollback pre-incremented count on forward failure
3163                                    if let Some(mut s) = state.sessions.get_mut(&session_id) {
3164                                        s.elicitation_count = s.elicitation_count.saturating_sub(1);
3165                                    }
3166                                    tracing::error!("Failed to forward elicitation: {}", e);
3167                                    break;
3168                                }
3169                            }
3170                            vellaveto_mcp::elicitation::ElicitationVerdict::Deny { reason } => {
3171                                tracing::warn!(
3172                                    session_id = %session_id,
3173                                    "Blocked WS elicitation/create: {}",
3174                                    reason
3175                                );
3176                                let action = Action::new(
3177                                    "vellaveto",
3178                                    "ws_elicitation_interception",
3179                                    json!({
3180                                        "method": "elicitation/create",
3181                                        "session": session_id,
3182                                        "transport": "websocket",
3183                                        "reason": &reason,
3184                                    }),
3185                                );
3186                                let verdict = Verdict::Deny {
3187                                    reason: reason.clone(),
3188                                };
3189                                if let Err(e) = state
3190                                    .audit
3191                                    .log_entry(
3192                                        &action,
3193                                        &verdict,
3194                                        json!({
3195                                            "source": "ws_proxy",
3196                                            "event": "ws_elicitation_interception",
3197                                        }),
3198                                    )
3199                                    .await
3200                                {
3201                                    tracing::warn!(
3202                                        "Failed to audit WS elicitation interception: {}",
3203                                        e
3204                                    );
3205                                }
3206                                // SECURITY (FIND-R46-012, FIND-R55-WS-006): Generic message to client.
3207                                let error =
3208                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
3209                                let mut sink = client_sink.lock().await;
3210                                let _ = sink.send(Message::Text(error.into())).await;
3211                            }
3212                        }
3213                    }
3214                    MessageType::PassThrough | MessageType::ProgressNotification { .. } => {
3215                        // SECURITY (FIND-R76-003): DLP scan PassThrough params for secrets.
3216                        // Parity with HTTP handler (handlers.rs:1795-1859). Agents could
3217                        // exfiltrate secrets via prompts/get, completion/complete, or any
3218                        // PassThrough method's parameters.
3219                        // SECURITY (FIND-R97-001): Remove method gate — JSON-RPC responses
3220                        // (sampling/elicitation replies) have no `method` field but carry
3221                        // data in `result`. Parity with stdio proxy FIND-R96-001.
3222                        if state.response_dlp_enabled {
3223                            let mut dlp_findings = scan_notification_for_secrets(&parsed);
3224                            // SECURITY (FIND-R97-001): Also scan `result` field for responses.
3225                            if let Some(result_val) = parsed.get("result") {
3226                                dlp_findings.extend(scan_parameters_for_secrets(result_val));
3227                            }
3228                            // SECURITY (FIND-R83-006): Cap combined findings from params+result
3229                            // scans to maintain per-scan invariant (1000).
3230                            dlp_findings.truncate(1000);
3231                            if !dlp_findings.is_empty() {
3232                                for finding in &dlp_findings {
3233                                    record_dlp_finding(&finding.pattern_name);
3234                                }
3235                                let patterns: Vec<String> = dlp_findings
3236                                    .iter()
3237                                    .map(|f| format!("{}:{}", f.pattern_name, f.location))
3238                                    .collect();
3239                                tracing::warn!(
3240                                    "SECURITY: Secrets in WS passthrough params! Session: {}, Findings: {:?}",
3241                                    session_id,
3242                                    patterns
3243                                );
3244                                let n_action = Action::new(
3245                                    "vellaveto",
3246                                    "notification_dlp_scan",
3247                                    json!({
3248                                        "findings": patterns,
3249                                        "session": session_id,
3250                                        "transport": "websocket",
3251                                    }),
3252                                );
3253                                let verdict = if state.response_dlp_blocking {
3254                                    Verdict::Deny {
3255                                        reason: format!(
3256                                            "Notification blocked: secrets detected ({:?})",
3257                                            patterns
3258                                        ),
3259                                    }
3260                                } else {
3261                                    Verdict::Allow
3262                                };
3263                                if let Err(e) = state
3264                                    .audit
3265                                    .log_entry(
3266                                        &n_action,
3267                                        &verdict,
3268                                        json!({
3269                                            "source": "ws_proxy",
3270                                            "event": "notification_dlp_alert",
3271                                            "blocked": state.response_dlp_blocking,
3272                                        }),
3273                                    )
3274                                    .await
3275                                {
3276                                    tracing::warn!("Failed to audit WS passthrough DLP: {}", e);
3277                                }
3278                                if state.response_dlp_blocking {
3279                                    // Drop the message silently (passthrough has no id to respond to)
3280                                    continue;
3281                                }
3282                            }
3283                        }
3284
3285                        // SECURITY (FIND-R130-001): Injection scanning on PassThrough parameters.
3286                        // Parity with HTTP handler (handlers.rs FIND-R112-008) and gRPC handler
3287                        // (service.rs FIND-R113-001). An agent could inject prompt injection
3288                        // payloads via any PassThrough method's parameters.
3289                        if !state.injection_disabled {
3290                            let mut inj_parts = Vec::new();
3291                            if let Some(params) = parsed.get("params") {
3292                                extract_strings_recursive(params, &mut inj_parts, 0);
3293                            }
3294                            if let Some(result) = parsed.get("result") {
3295                                extract_strings_recursive(result, &mut inj_parts, 0);
3296                            }
3297                            let scannable = inj_parts.join("\n");
3298                            if !scannable.is_empty() {
3299                                let injection_matches: Vec<String> =
3300                                    if let Some(ref scanner) = state.injection_scanner {
3301                                        scanner
3302                                            .inspect(&scannable)
3303                                            .into_iter()
3304                                            .map(|s| s.to_string())
3305                                            .collect()
3306                                    } else {
3307                                        inspect_for_injection(&scannable)
3308                                            .into_iter()
3309                                            .map(|s| s.to_string())
3310                                            .collect()
3311                                    };
3312
3313                                if !injection_matches.is_empty() {
3314                                    tracing::warn!(
3315                                        "SECURITY: Injection in WS passthrough params! \
3316                                         Session: {}, Patterns: {:?}",
3317                                        session_id,
3318                                        injection_matches,
3319                                    );
3320
3321                                    let verdict = if state.injection_blocking {
3322                                        Verdict::Deny {
3323                                            reason: format!(
3324                                                "WS passthrough injection blocked: {:?}",
3325                                                injection_matches
3326                                            ),
3327                                        }
3328                                    } else {
3329                                        Verdict::Allow
3330                                    };
3331
3332                                    let inj_action = Action::new(
3333                                        "vellaveto",
3334                                        "ws_passthrough_injection_scan",
3335                                        json!({
3336                                            "matched_patterns": injection_matches,
3337                                            "session": session_id,
3338                                            "transport": "websocket",
3339                                            "direction": "client_to_upstream",
3340                                        }),
3341                                    );
3342                                    if let Err(e) = state
3343                                        .audit
3344                                        .log_entry(
3345                                            &inj_action,
3346                                            &verdict,
3347                                            json!({
3348                                                "source": "ws_proxy",
3349                                                "event": "ws_passthrough_injection_detected",
3350                                                "blocking": state.injection_blocking,
3351                                            }),
3352                                        )
3353                                        .await
3354                                    {
3355                                        tracing::warn!(
3356                                            "Failed to audit WS passthrough injection: {}",
3357                                            e
3358                                        );
3359                                    }
3360
3361                                    if state.injection_blocking {
3362                                        // Drop the message (passthrough has no id to respond to)
3363                                        continue;
3364                                    }
3365                                }
3366                            }
3367                        }
3368
3369                        // SECURITY (IMP-R182-009): Memory poisoning check — parity with
3370                        // tool calls, resource reads, tasks, and extension methods.
3371                        if let Some(mut session) = state.sessions.get_mut(&session_id) {
3372                            let params_to_scan = parsed.get("params").cloned().unwrap_or(json!({}));
3373                            // SECURITY (IMP-R184-010): Also scan `result` field — parity
3374                            // with DLP scan which scans both params and result.
3375                            let mut poisoning_matches =
3376                                session.memory_tracker.check_parameters(&params_to_scan);
3377                            if let Some(result_val) = parsed.get("result") {
3378                                poisoning_matches
3379                                    .extend(session.memory_tracker.check_parameters(result_val));
3380                            }
3381                            if !poisoning_matches.is_empty() {
3382                                let method_name = parsed
3383                                    .get("method")
3384                                    .and_then(|m| m.as_str())
3385                                    .unwrap_or("unknown");
3386                                for m in &poisoning_matches {
3387                                    tracing::warn!(
3388                                        "SECURITY: Memory poisoning in WS passthrough '{}' (session {}): \
3389                                         param '{}' replayed data (fingerprint: {})",
3390                                        method_name,
3391                                        session_id,
3392                                        m.param_location,
3393                                        m.fingerprint
3394                                    );
3395                                }
3396                                let poison_action = Action::new(
3397                                    "vellaveto",
3398                                    "ws_passthrough_memory_poisoning",
3399                                    json!({
3400                                        "method": method_name,
3401                                        "session": session_id,
3402                                        "matches": poisoning_matches.len(),
3403                                        "transport": "websocket",
3404                                    }),
3405                                );
3406                                if let Err(e) = state
3407                                    .audit
3408                                    .log_entry(
3409                                        &poison_action,
3410                                        &Verdict::Deny {
3411                                            reason: format!(
3412                                                "WS passthrough blocked: memory poisoning ({} matches)",
3413                                                poisoning_matches.len()
3414                                            ),
3415                                        },
3416                                        json!({
3417                                            "source": "ws_proxy",
3418                                            "event": "ws_passthrough_memory_poisoning",
3419                                        }),
3420                                    )
3421                                    .await
3422                                {
3423                                    tracing::warn!(
3424                                        "Failed to audit WS passthrough memory poisoning: {}",
3425                                        e
3426                                    );
3427                                }
3428                                continue; // Drop the message
3429                            }
3430                            // Fingerprint for future poisoning detection.
3431                            session.memory_tracker.extract_from_value(&params_to_scan);
3432                            if let Some(result_val) = parsed.get("result") {
3433                                session.memory_tracker.extract_from_value(result_val);
3434                            }
3435                        } else {
3436                            // IMP-R186-005: Log when session is missing so the skip is observable.
3437                            tracing::warn!(
3438                                "Session {} not found for WS passthrough memory poisoning check",
3439                                session_id
3440                            );
3441                        }
3442
3443                        // SECURITY (FIND-R46-WS-004): Audit log forwarded passthrough/notification messages
3444                        let msg_type = match &classified {
3445                            MessageType::ProgressNotification { .. } => "progress_notification",
3446                            _ => "passthrough",
3447                        };
3448                        let action = Action::new(
3449                            "vellaveto",
3450                            "ws_forward_message",
3451                            json!({
3452                                "message_type": msg_type,
3453                                "session": session_id,
3454                                "transport": "websocket",
3455                                "direction": "client_to_upstream",
3456                            }),
3457                        );
3458                        if let Err(e) = state
3459                            .audit
3460                            .log_entry(
3461                                &action,
3462                                &Verdict::Allow,
3463                                json!({
3464                                    "source": "ws_proxy",
3465                                    "event": "ws_message_forwarded",
3466                                }),
3467                            )
3468                            .await
3469                        {
3470                            tracing::warn!("Failed to audit WS passthrough: {}", e);
3471                        }
3472
3473                        // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
3474                        let forward_text = if state.canonicalize {
3475                            match serde_json::to_string(&parsed) {
3476                                Ok(canonical) => canonical,
3477                                Err(e) => {
3478                                    tracing::error!(
3479                                        "SECURITY: WS passthrough canonicalization failed: {}",
3480                                        e
3481                                    );
3482                                    continue;
3483                                }
3484                            }
3485                        } else {
3486                            text.to_string()
3487                        };
3488                        let mut sink = upstream_sink.lock().await;
3489                        if let Err(e) = sink
3490                            .send(tokio_tungstenite::tungstenite::Message::Text(
3491                                forward_text.into(),
3492                            ))
3493                            .await
3494                        {
3495                            tracing::error!("Failed to forward passthrough: {}", e);
3496                            break;
3497                        }
3498                    }
3499                }
3500            }
3501            Message::Binary(_data) => {
3502                // SECURITY: Binary frames not allowed for JSON-RPC
3503                tracing::warn!(
3504                    session_id = %session_id,
3505                    "Binary WebSocket frame rejected (JSON-RPC is text-only)"
3506                );
3507
3508                // SECURITY (FIND-R46-WS-004): Audit log binary frame rejection
3509                let action = Action::new(
3510                    "vellaveto",
3511                    "ws_binary_frame_rejected",
3512                    json!({
3513                        "session": session_id,
3514                        "transport": "websocket",
3515                        "direction": "client_to_upstream",
3516                    }),
3517                );
3518                if let Err(e) = state
3519                    .audit
3520                    .log_entry(
3521                        &action,
3522                        &Verdict::Deny {
3523                            reason: "Binary frames not supported for JSON-RPC".to_string(),
3524                        },
3525                        json!({
3526                            "source": "ws_proxy",
3527                            "event": "ws_binary_frame_rejected",
3528                        }),
3529                    )
3530                    .await
3531                {
3532                    tracing::warn!("Failed to audit WS binary frame rejection: {}", e);
3533                }
3534
3535                let mut sink = client_sink.lock().await;
3536                let _ = sink
3537                    .send(Message::Close(Some(CloseFrame {
3538                        code: CLOSE_UNSUPPORTED_DATA,
3539                        reason: "Binary frames not supported".into(),
3540                    })))
3541                    .await;
3542                break;
3543            }
3544            Message::Ping(data) => {
3545                let mut sink = client_sink.lock().await;
3546                let _ = sink.send(Message::Pong(data)).await;
3547            }
3548            Message::Pong(_) => {
3549                // Ignored
3550            }
3551            Message::Close(_) => {
3552                tracing::debug!(session_id = %session_id, "Client sent close frame");
3553                break;
3554            }
3555        }
3556    }
3557}
3558
3559/// Relay messages from upstream to client with DLP and injection scanning.
3560#[allow(clippy::too_many_arguments)]
3561async fn relay_upstream_to_client(
3562    mut upstream_stream: futures_util::stream::SplitStream<
3563        tokio_tungstenite::WebSocketStream<
3564            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
3565        >,
3566    >,
3567    client_sink: Arc<Mutex<futures_util::stream::SplitSink<WebSocket, Message>>>,
3568    state: ProxyState,
3569    session_id: String,
3570    ws_config: WebSocketConfig,
3571    upstream_rate_counter: Arc<AtomicU64>,
3572    upstream_rate_window_start: Arc<std::sync::Mutex<std::time::Instant>>,
3573    last_activity: Arc<AtomicU64>,
3574    connection_epoch: std::time::Instant,
3575) {
3576    while let Some(msg_result) = upstream_stream.next().await {
3577        let msg = match msg_result {
3578            Ok(m) => m,
3579            Err(e) => {
3580                tracing::debug!(session_id = %session_id, "Upstream WS error: {}", e);
3581                break;
3582            }
3583        };
3584
3585        // SECURITY (FIND-R182-001): Update last-activity for true idle detection.
3586        last_activity.store(
3587            connection_epoch.elapsed().as_millis() as u64,
3588            Ordering::Relaxed,
3589        );
3590
3591        record_ws_message("upstream_to_client");
3592
3593        // SECURITY (FIND-R46-WS-003): Rate limiting on upstream→client direction.
3594        // A malicious or compromised upstream could flood the client with messages.
3595        if !check_rate_limit(
3596            &upstream_rate_counter,
3597            &upstream_rate_window_start,
3598            ws_config.upstream_rate_limit,
3599        ) {
3600            tracing::warn!(
3601                session_id = %session_id,
3602                "WebSocket upstream rate limit exceeded ({}/s), dropping message",
3603                ws_config.upstream_rate_limit,
3604            );
3605
3606            let action = Action::new(
3607                "vellaveto",
3608                "ws_upstream_rate_limit",
3609                json!({
3610                    "session": session_id,
3611                    "transport": "websocket",
3612                    "direction": "upstream_to_client",
3613                    "limit": ws_config.upstream_rate_limit,
3614                }),
3615            );
3616            if let Err(e) = state
3617                .audit
3618                .log_entry(
3619                    &action,
3620                    &Verdict::Deny {
3621                        reason: "Upstream rate limit exceeded".to_string(),
3622                    },
3623                    json!({
3624                        "source": "ws_proxy",
3625                        "event": "ws_upstream_rate_limit_exceeded",
3626                    }),
3627                )
3628                .await
3629            {
3630                tracing::warn!("Failed to audit WS upstream rate limit: {}", e);
3631            }
3632
3633            metrics::counter!(
3634                "vellaveto_ws_upstream_rate_limited_total",
3635                "session" => session_id.clone()
3636            )
3637            .increment(1);
3638
3639            // Drop the message (don't close the connection — upstream flood should not
3640            // disconnect the client, just throttle the flow)
3641            continue;
3642        }
3643
3644        match msg {
3645            tokio_tungstenite::tungstenite::Message::Text(text) => {
3646                // Try to parse for scanning
3647                let forward = if let Ok(json_val) = serde_json::from_str::<Value>(&text) {
3648                    // Resolve tracked tool context for response-side schema checks.
3649                    let tracked_tool_name =
3650                        take_tracked_tool_call(&state.sessions, &session_id, json_val.get("id"));
3651
3652                    // SECURITY (FIND-R75-003): Track whether DLP or injection was detected
3653                    // (even in log-only mode) to gate memory_tracker.record_response().
3654                    // Recording fingerprints from tainted responses would poison the tracker.
3655                    let mut dlp_found = false;
3656                    let mut injection_found = false;
3657
3658                    // DLP scanning on responses
3659                    if state.response_dlp_enabled {
3660                        let dlp_findings = scan_response_for_secrets(&json_val);
3661                        if !dlp_findings.is_empty() {
3662                            dlp_found = true;
3663                            for finding in &dlp_findings {
3664                                record_dlp_finding(&finding.pattern_name);
3665                            }
3666
3667                            let patterns: Vec<String> = dlp_findings
3668                                .iter()
3669                                .map(|f| format!("{}:{}", f.pattern_name, f.location))
3670                                .collect();
3671
3672                            tracing::warn!(
3673                                "SECURITY: Secrets in WS response! Session: {}, Findings: {:?}",
3674                                session_id,
3675                                patterns,
3676                            );
3677
3678                            let verdict = if state.response_dlp_blocking {
3679                                Verdict::Deny {
3680                                    reason: format!("WS response DLP blocked: {:?}", patterns),
3681                                }
3682                            } else {
3683                                Verdict::Allow
3684                            };
3685
3686                            let action = Action::new(
3687                                "vellaveto",
3688                                "ws_response_dlp_scan",
3689                                json!({
3690                                    "findings": patterns,
3691                                    "session": session_id,
3692                                    "transport": "websocket",
3693                                }),
3694                            );
3695                            if let Err(e) = state
3696                                .audit
3697                                .log_entry(
3698                                    &action,
3699                                    &verdict,
3700                                    json!({
3701                                        "source": "ws_proxy",
3702                                        "event": "ws_response_dlp_alert",
3703                                    }),
3704                                )
3705                                .await
3706                            {
3707                                tracing::warn!("Failed to audit WS DLP: {}", e);
3708                            }
3709
3710                            if state.response_dlp_blocking {
3711                                // Send error response instead
3712                                let id = json_val.get("id");
3713                                let error = make_ws_error_response(
3714                                    id,
3715                                    -32001,
3716                                    "Response blocked by DLP policy",
3717                                );
3718                                let mut sink = client_sink.lock().await;
3719                                let _ = sink.send(Message::Text(error.into())).await;
3720                                continue;
3721                            }
3722                        }
3723                    }
3724
3725                    // Injection scanning
3726                    if !state.injection_disabled {
3727                        let text_to_scan = extract_scannable_text(&json_val);
3728                        if !text_to_scan.is_empty() {
3729                            let injection_matches: Vec<String> =
3730                                if let Some(ref scanner) = state.injection_scanner {
3731                                    scanner
3732                                        .inspect(&text_to_scan)
3733                                        .into_iter()
3734                                        .map(|s| s.to_string())
3735                                        .collect()
3736                                } else {
3737                                    inspect_for_injection(&text_to_scan)
3738                                        .into_iter()
3739                                        .map(|s| s.to_string())
3740                                        .collect()
3741                                };
3742
3743                            if !injection_matches.is_empty() {
3744                                injection_found = true;
3745                                tracing::warn!(
3746                                    "SECURITY: Injection in WS response! Session: {}, Patterns: {:?}",
3747                                    session_id,
3748                                    injection_matches,
3749                                );
3750
3751                                let verdict = if state.injection_blocking {
3752                                    Verdict::Deny {
3753                                        reason: format!(
3754                                            "WS response injection blocked: {:?}",
3755                                            injection_matches
3756                                        ),
3757                                    }
3758                                } else {
3759                                    Verdict::Allow
3760                                };
3761
3762                                let action = Action::new(
3763                                    "vellaveto",
3764                                    "ws_response_injection",
3765                                    json!({
3766                                        "matched_patterns": injection_matches,
3767                                        "session": session_id,
3768                                        "transport": "websocket",
3769                                    }),
3770                                );
3771                                if let Err(e) = state
3772                                    .audit
3773                                    .log_entry(
3774                                        &action,
3775                                        &verdict,
3776                                        json!({
3777                                            "source": "ws_proxy",
3778                                            "event": "ws_injection_detected",
3779                                        }),
3780                                    )
3781                                    .await
3782                                {
3783                                    tracing::warn!("Failed to audit WS injection: {}", e);
3784                                }
3785
3786                                if state.injection_blocking {
3787                                    let id = json_val.get("id");
3788                                    let error = make_ws_error_response(
3789                                        id,
3790                                        -32001,
3791                                        "Response blocked: injection detected",
3792                                    );
3793                                    let mut sink = client_sink.lock().await;
3794                                    let _ = sink.send(Message::Text(error.into())).await;
3795                                    continue;
3796                                }
3797                            }
3798                        }
3799                    }
3800
3801                    // SECURITY (FIND-R46-007): Rug-pull detection on tools/list responses.
3802                    // Check if this is a response to a tools/list request and extract
3803                    // annotations for rug-pull detection.
3804                    if json_val.get("result").is_some() {
3805                        // Check if result contains "tools" array (tools/list response)
3806                        if json_val
3807                            .get("result")
3808                            .and_then(|r| r.get("tools"))
3809                            .and_then(|t| t.as_array())
3810                            .is_some()
3811                        {
3812                            super::helpers::extract_annotations_from_response(
3813                                &json_val,
3814                                &session_id,
3815                                &state.sessions,
3816                                &state.audit,
3817                                &state.known_tools,
3818                            )
3819                            .await;
3820
3821                            // Verify manifest if configured
3822                            if let Some(ref manifest_config) = state.manifest_config {
3823                                super::helpers::verify_manifest_from_response(
3824                                    &json_val,
3825                                    &session_id,
3826                                    &state.sessions,
3827                                    manifest_config,
3828                                    &state.audit,
3829                                )
3830                                .await;
3831                            }
3832
3833                            // SECURITY (FIND-R130-003): Scan tool descriptions for embedded
3834                            // injection. Parity with HTTP upstream handler (upstream.rs:648-698).
3835                            if !state.injection_disabled {
3836                                let desc_findings =
3837                                    if let Some(ref scanner) = state.injection_scanner {
3838                                        scan_tool_descriptions_with_scanner(&json_val, scanner)
3839                                    } else {
3840                                        scan_tool_descriptions(&json_val)
3841                                    };
3842                                for finding in &desc_findings {
3843                                    injection_found = true;
3844                                    tracing::warn!(
3845                                        "SECURITY: Injection in tool '{}' description! \
3846                                         Session: {}, Patterns: {:?}",
3847                                        finding.tool_name,
3848                                        session_id,
3849                                        finding.matched_patterns
3850                                    );
3851                                    let action = Action::new(
3852                                        "vellaveto",
3853                                        "tool_description_injection",
3854                                        json!({
3855                                            "tool": finding.tool_name,
3856                                            "matched_patterns": finding.matched_patterns,
3857                                            "session": session_id,
3858                                            "transport": "websocket",
3859                                            "blocking": state.injection_blocking,
3860                                        }),
3861                                    );
3862                                    if let Err(e) = state
3863                                        .audit
3864                                        .log_entry(
3865                                            &action,
3866                                            &Verdict::Deny {
3867                                                reason: format!(
3868                                                    "Tool '{}' description contains injection: {:?}",
3869                                                    finding.tool_name, finding.matched_patterns
3870                                                ),
3871                                            },
3872                                            json!({
3873                                                "source": "ws_proxy",
3874                                                "event": "tool_description_injection",
3875                                            }),
3876                                        )
3877                                        .await
3878                                    {
3879                                        tracing::warn!(
3880                                            "Failed to audit WS tool description injection: {}",
3881                                            e
3882                                        );
3883                                    }
3884                                }
3885                                if !desc_findings.is_empty() && state.injection_blocking {
3886                                    let id = json_val.get("id");
3887                                    let error = make_ws_error_response(
3888                                        id,
3889                                        -32001,
3890                                        "Response blocked: suspicious content in tool descriptions",
3891                                    );
3892                                    let mut sink = client_sink.lock().await;
3893                                    let _ = sink.send(Message::Text(error.into())).await;
3894                                    continue;
3895                                }
3896                            }
3897                        }
3898                    }
3899
3900                    // SECURITY: Enforce output schema on WS structuredContent.
3901                    // SECURITY (FIND-R154-004): Track schema violations for the
3902                    // record_response guard below, even in non-blocking mode.
3903                    let schema_violation_found = validate_ws_structured_content_response(
3904                        &json_val,
3905                        &state,
3906                        &session_id,
3907                        tracked_tool_name.as_deref(),
3908                    )
3909                    .await;
3910                    if schema_violation_found {
3911                        let id = json_val.get("id");
3912                        let error = make_ws_error_response(
3913                            id,
3914                            -32001,
3915                            "Response blocked: output schema validation failed",
3916                        );
3917                        let mut sink = client_sink.lock().await;
3918                        let _ = sink.send(Message::Text(error.into())).await;
3919                        continue;
3920                    }
3921
3922                    // SECURITY (FIND-R75-003, FIND-R154-004): Record response
3923                    // fingerprints for memory poisoning detection. Skip recording
3924                    // when DLP, injection, or schema violation was detected (even
3925                    // in log-only mode) to avoid poisoning the tracker with tainted
3926                    // data. Parity with stdio relay (relay.rs:2919).
3927                    if !dlp_found && !injection_found && !schema_violation_found {
3928                        if let Some(mut session) = state.sessions.get_mut(&session_id) {
3929                            session.memory_tracker.record_response(&json_val);
3930                        }
3931                    }
3932
3933                    // SECURITY (FIND-R46-WS-004): Audit log forwarded upstream→client text messages
3934                    {
3935                        let msg_type = if json_val.get("result").is_some() {
3936                            "response"
3937                        } else if json_val.get("error").is_some() {
3938                            "error_response"
3939                        } else if json_val.get("method").is_some() {
3940                            "notification"
3941                        } else {
3942                            "unknown"
3943                        };
3944                        let action = Action::new(
3945                            "vellaveto",
3946                            "ws_forward_upstream_message",
3947                            json!({
3948                                "message_type": msg_type,
3949                                "session": session_id,
3950                                "transport": "websocket",
3951                                "direction": "upstream_to_client",
3952                            }),
3953                        );
3954                        if let Err(e) = state
3955                            .audit
3956                            .log_entry(
3957                                &action,
3958                                &Verdict::Allow,
3959                                json!({
3960                                    "source": "ws_proxy",
3961                                    "event": "ws_upstream_message_forwarded",
3962                                }),
3963                            )
3964                            .await
3965                        {
3966                            tracing::warn!("Failed to audit WS upstream message forward: {}", e);
3967                        }
3968                    }
3969
3970                    // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
3971                    if state.canonicalize {
3972                        match serde_json::to_string(&json_val) {
3973                            Ok(canonical) => canonical,
3974                            Err(e) => {
3975                                tracing::error!(
3976                                    "SECURITY: WS response canonicalization failed: {}",
3977                                    e
3978                                );
3979                                continue;
3980                            }
3981                        }
3982                    } else {
3983                        text.to_string()
3984                    }
3985                } else {
3986                    // SECURITY (FIND-R166-001): Non-JSON upstream text must still be
3987                    // scanned for DLP/injection before forwarding. A malicious upstream
3988                    // could exfiltrate secrets or inject payloads via non-JSON frames.
3989                    let text_str: &str = &text;
3990                    // SECURITY (FIND-R168-001): DLP scan with audit logging parity.
3991                    if state.response_dlp_enabled {
3992                        let findings = scan_text_for_secrets(text_str, "ws.upstream.non_json_text");
3993                        if !findings.is_empty() {
3994                            let patterns: Vec<String> = findings
3995                                .iter()
3996                                .map(|f| format!("{}:{}", f.pattern_name, f.location))
3997                                .collect();
3998                            tracing::warn!(
3999                                session_id = %session_id,
4000                                "DLP: non-JSON upstream text contains sensitive data ({} findings)",
4001                                findings.len(),
4002                            );
4003                            let verdict = if state.response_dlp_blocking {
4004                                Verdict::Deny {
4005                                    reason: format!("WS non-JSON DLP: {:?}", patterns),
4006                                }
4007                            } else {
4008                                Verdict::Allow
4009                            };
4010                            let action = Action::new(
4011                                "vellaveto",
4012                                "ws_nonjson_dlp_scan",
4013                                json!({ "findings": patterns, "session": session_id, "transport": "websocket" }),
4014                            );
4015                            // SECURITY (SE-004): Log audit failures instead of silently discarding.
4016                            if let Err(e) = state.audit.log_entry(
4017                                &action, &verdict,
4018                                json!({ "source": "ws_proxy", "event": "ws_nonjson_dlp_alert" }),
4019                            ).await {
4020                                tracing::error!(
4021                                    session_id = %session_id,
4022                                    error = %e,
4023                                    "AUDIT FAILURE: failed to log ws_nonjson_dlp_alert"
4024                                );
4025                            }
4026                            if state.response_dlp_blocking {
4027                                continue;
4028                            }
4029                        }
4030                    }
4031                    // SECURITY (FIND-R168-002): Injection scan with log-only mode
4032                    // parity. Always log detections; only block when injection_blocking.
4033                    {
4034                        let alerts: Vec<String> = if let Some(ref scanner) = state.injection_scanner
4035                        {
4036                            scanner
4037                                .inspect(text_str)
4038                                .into_iter()
4039                                .map(|s| s.to_string())
4040                                .collect()
4041                        } else {
4042                            inspect_for_injection(text_str)
4043                                .into_iter()
4044                                .map(|s| s.to_string())
4045                                .collect()
4046                        };
4047                        if !alerts.is_empty() {
4048                            tracing::warn!(
4049                                session_id = %session_id,
4050                                "Injection: non-JSON upstream text ({} alerts), blocking={}",
4051                                alerts.len(), state.injection_blocking,
4052                            );
4053                            let verdict = if state.injection_blocking {
4054                                Verdict::Deny {
4055                                    reason: format!(
4056                                        "WS non-JSON injection: {} alerts",
4057                                        alerts.len()
4058                                    ),
4059                                }
4060                            } else {
4061                                Verdict::Allow
4062                            };
4063                            let action = Action::new(
4064                                "vellaveto",
4065                                "ws_nonjson_injection_scan",
4066                                json!({ "alerts": alerts.len(), "session": session_id, "transport": "websocket" }),
4067                            );
4068                            // SECURITY (SE-004): Log audit failures instead of silently discarding.
4069                            if let Err(e) = state.audit.log_entry(
4070                                &action, &verdict,
4071                                json!({ "source": "ws_proxy", "event": "ws_nonjson_injection_alert" }),
4072                            ).await {
4073                                tracing::error!(
4074                                    session_id = %session_id,
4075                                    error = %e,
4076                                    "AUDIT FAILURE: failed to log ws_nonjson_injection_alert"
4077                                );
4078                            }
4079                            if state.injection_blocking {
4080                                continue;
4081                            }
4082                        }
4083                    }
4084                    text.to_string()
4085                };
4086
4087                let mut sink = client_sink.lock().await;
4088                if let Err(e) = sink.send(Message::Text(forward.into())).await {
4089                    tracing::debug!("Failed to send to client: {}", e);
4090                    break;
4091                }
4092            }
4093            tokio_tungstenite::tungstenite::Message::Binary(data) => {
4094                // SECURITY (FIND-R46-WS-002): DLP scanning on upstream binary frames.
4095                // Binary from upstream is unusual for JSON-RPC but must be scanned
4096                // before being dropped, to detect and audit secret exfiltration attempts
4097                // via binary frames.
4098                tracing::warn!(
4099                    session_id = %session_id,
4100                    "Unexpected binary frame from upstream ({} bytes), scanning before drop",
4101                    data.len(),
4102                );
4103
4104                // DLP scan the binary data as UTF-8 lossy
4105                if state.response_dlp_enabled {
4106                    let text_repr = String::from_utf8_lossy(&data);
4107                    if !text_repr.is_empty() {
4108                        let dlp_findings = scan_text_for_secrets(&text_repr, "ws_binary_frame");
4109                        if !dlp_findings.is_empty() {
4110                            for finding in &dlp_findings {
4111                                record_dlp_finding(&finding.pattern_name);
4112                            }
4113                            let patterns: Vec<String> = dlp_findings
4114                                .iter()
4115                                .map(|f| format!("{}:{}", f.pattern_name, f.location))
4116                                .collect();
4117
4118                            tracing::warn!(
4119                                "SECURITY: Secrets in WS upstream binary frame! Session: {}, Findings: {:?}",
4120                                session_id,
4121                                patterns,
4122                            );
4123
4124                            let action = Action::new(
4125                                "vellaveto",
4126                                "ws_binary_dlp_scan",
4127                                json!({
4128                                    "findings": patterns,
4129                                    "session": session_id,
4130                                    "transport": "websocket",
4131                                    "direction": "upstream_to_client",
4132                                    "binary_size": data.len(),
4133                                }),
4134                            );
4135                            if let Err(e) = state
4136                                .audit
4137                                .log_entry(
4138                                    &action,
4139                                    &Verdict::Deny {
4140                                        reason: format!("WS binary frame DLP: {:?}", patterns),
4141                                    },
4142                                    json!({
4143                                        "source": "ws_proxy",
4144                                        "event": "ws_binary_dlp_alert",
4145                                    }),
4146                                )
4147                                .await
4148                            {
4149                                tracing::warn!("Failed to audit WS binary DLP: {}", e);
4150                            }
4151                        }
4152                    }
4153                }
4154
4155                // SECURITY (FIND-R46-WS-004): Audit log binary frame drop
4156                let action = Action::new(
4157                    "vellaveto",
4158                    "ws_upstream_binary_dropped",
4159                    json!({
4160                        "session": session_id,
4161                        "transport": "websocket",
4162                        "direction": "upstream_to_client",
4163                        "binary_size": data.len(),
4164                    }),
4165                );
4166                if let Err(e) = state
4167                    .audit
4168                    .log_entry(
4169                        &action,
4170                        &Verdict::Deny {
4171                            reason: "Binary frames not supported for JSON-RPC".to_string(),
4172                        },
4173                        json!({
4174                            "source": "ws_proxy",
4175                            "event": "ws_upstream_binary_dropped",
4176                        }),
4177                    )
4178                    .await
4179                {
4180                    tracing::warn!("Failed to audit WS upstream binary drop: {}", e);
4181                }
4182            }
4183            tokio_tungstenite::tungstenite::Message::Ping(data) => {
4184                // Forward ping as pong to upstream (handled by tungstenite)
4185                let _ = data; // tungstenite auto-responds to pings
4186            }
4187            tokio_tungstenite::tungstenite::Message::Pong(_) => {}
4188            tokio_tungstenite::tungstenite::Message::Close(_) => {
4189                tracing::debug!(session_id = %session_id, "Upstream sent close frame");
4190                break;
4191            }
4192            tokio_tungstenite::tungstenite::Message::Frame(_) => {
4193                // Raw frame — ignore
4194            }
4195        }
4196    }
4197}
4198
4199/// Convert an HTTP URL to a WebSocket URL.
4200///
4201/// `http://` → `ws://`, `https://` → `wss://`.
4202///
4203/// SECURITY (FIND-R124-001): Only allows http/https/ws/wss schemes.
4204/// Unknown schemes (ftp://, file://, gopher://) are rejected with a
4205/// warning and fall back to the original URL prefixed with `ws://`
4206/// to maintain fail-closed behavior. This gives parity with scheme
4207/// validation in HTTP and gRPC transports (FIND-R42-015).
4208pub fn convert_to_ws_url(http_url: &str) -> String {
4209    if let Some(rest) = http_url.strip_prefix("https://") {
4210        format!("wss://{}", rest)
4211    } else if let Some(rest) = http_url.strip_prefix("http://") {
4212        format!("ws://{}", rest)
4213    } else if http_url.starts_with("wss://") || http_url.starts_with("ws://") {
4214        // Already a WebSocket URL — use as-is
4215        http_url.to_string()
4216    } else {
4217        // SECURITY (FIND-R124-001): Reject unknown schemes. Log warning
4218        // and return a URL that will fail to connect safely rather than
4219        // connecting to an unintended scheme (e.g., ftp://, file://).
4220        // SECURITY (FIND-R166-003): Sanitize logged value to prevent log injection
4221        // from URLs with control characters (possible in gateway mode).
4222        tracing::warn!(
4223            "convert_to_ws_url: rejecting URL with unsupported scheme: {}",
4224            vellaveto_types::sanitize_for_log(
4225                http_url.split("://").next().unwrap_or("unknown"),
4226                128,
4227            )
4228        );
4229        // Return invalid URL that will fail at connect_async()
4230        format!("ws://invalid-scheme-rejected.localhost/{}", http_url.len())
4231    }
4232}
4233
4234/// Connect to an upstream WebSocket server.
4235///
4236/// Returns the split WebSocket stream or an error.
4237async fn connect_upstream_ws(
4238    url: &str,
4239) -> Result<
4240    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
4241    String,
4242> {
4243    let connect_timeout = Duration::from_secs(10);
4244    match tokio::time::timeout(connect_timeout, tokio_tungstenite::connect_async(url)).await {
4245        Ok(Ok((ws_stream, _response))) => Ok(ws_stream),
4246        Ok(Err(e)) => Err(format!("WebSocket connection error: {}", e)),
4247        Err(_) => Err("WebSocket connection timeout (10s)".to_string()),
4248    }
4249}
4250
4251/// Register output schemas and validate WS response `structuredContent`.
4252///
4253/// Returns true when the response should be blocked.
4254async fn validate_ws_structured_content_response(
4255    json_val: &Value,
4256    state: &ProxyState,
4257    session_id: &str,
4258    tracked_tool_name: Option<&str>,
4259) -> bool {
4260    // Keep WS behavior aligned with HTTP/SSE paths.
4261    state
4262        .output_schema_registry
4263        .register_from_tools_list(json_val);
4264
4265    let Some(result) = json_val.get("result") else {
4266        return false;
4267    };
4268    let Some(structured) = result.get("structuredContent") else {
4269        return false;
4270    };
4271
4272    let meta_tool_name = result
4273        .get("_meta")
4274        .and_then(|m| m.get("tool"))
4275        .and_then(|t| t.as_str());
4276    let tool_name = match (meta_tool_name, tracked_tool_name) {
4277        (Some(meta), Some(tracked)) if !meta.eq_ignore_ascii_case(tracked) => {
4278            tracing::warn!(
4279                "SECURITY: WS structuredContent tool mismatch (meta='{}', tracked='{}'); using tracked tool name",
4280                meta,
4281                tracked
4282            );
4283            tracked
4284        }
4285        (Some(meta), _) => meta,
4286        (None, Some(tracked)) => tracked,
4287        (None, None) => "unknown",
4288    };
4289
4290    match state.output_schema_registry.validate(tool_name, structured) {
4291        ValidationResult::Invalid { violations } => {
4292            tracing::warn!(
4293                "SECURITY: WS structuredContent validation failed for tool '{}': {:?}",
4294                tool_name,
4295                violations
4296            );
4297            let action = Action::new(
4298                "vellaveto",
4299                "output_schema_violation",
4300                json!({
4301                    "tool": tool_name,
4302                    "violations": violations,
4303                    "session": session_id,
4304                    "transport": "websocket",
4305                }),
4306            );
4307            if let Err(e) = state
4308                .audit
4309                .log_entry(
4310                    &action,
4311                    &Verdict::Deny {
4312                        reason: format!("WS structuredContent validation failed: {:?}", violations),
4313                    },
4314                    json!({"source": "ws_proxy", "event": "output_schema_violation_ws"}),
4315                )
4316                .await
4317            {
4318                tracing::warn!("Failed to audit WS output schema violation: {}", e);
4319            }
4320            true
4321        }
4322        ValidationResult::Valid => {
4323            tracing::debug!("WS structuredContent validated for tool '{}'", tool_name);
4324            false
4325        }
4326        ValidationResult::NoSchema => {
4327            tracing::debug!(
4328                "No output schema registered for WS tool '{}', skipping validation",
4329                tool_name
4330            );
4331            false
4332        }
4333    }
4334}
4335
4336// NOTE: build_ws_evaluation_context() was removed in FIND-R130-002 fix.
4337// All callers now build EvaluationContext inline inside the DashMap shard
4338// lock to prevent TOCTOU races on call_counts/action_history.
4339
4340/// Check per-connection rate limit. Returns true if within limit.
4341fn check_rate_limit(
4342    counter: &AtomicU64,
4343    window_start: &std::sync::Mutex<std::time::Instant>,
4344    max_per_sec: u32,
4345) -> bool {
4346    // SECURITY (FIND-R182-006): Fail-closed — zero rate limit blocks all messages.
4347    // Previously returned true (fail-open), which disabled rate limiting entirely.
4348    if max_per_sec == 0 {
4349        return false;
4350    }
4351
4352    let now = std::time::Instant::now();
4353    let mut start = match window_start.lock() {
4354        Ok(guard) => guard,
4355        Err(e) => {
4356            tracing::error!("WS rate limiter mutex poisoned — fail-closed: {}", e);
4357            return false;
4358        }
4359    };
4360
4361    if now.duration_since(*start) >= Duration::from_secs(1) {
4362        // Reset window
4363        *start = now;
4364        // SECURITY (FIND-R55-WS-003): Use SeqCst for security-critical rate limit counter.
4365        counter.store(1, Ordering::SeqCst);
4366        true
4367    } else {
4368        // SECURITY (FIND-R182-003): saturating arithmetic prevents overflow wrap-to-zero.
4369        // SECURITY (FIND-R155-WS-001): Conditional atomic increment — only increment if
4370        // within limit, reject otherwise. This eliminates the TOCTOU gap between
4371        // load()+compare and also prevents counter inflation from rejected requests.
4372        // The closure returns None when limit is reached, causing fetch_update to fail
4373        // without modifying the counter.
4374        let limit = max_per_sec as u64;
4375        match counter.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
4376            if v >= limit {
4377                None // Limit reached — do not increment
4378            } else {
4379                Some(v.saturating_add(1))
4380            }
4381        }) {
4382            Ok(_prev) => true, // Within limit, counter was incremented
4383            Err(_) => false,   // Limit exceeded, counter unchanged
4384        }
4385    }
4386}
4387
4388/// Extract scannable text from a JSON-RPC request for injection scanning.
4389///
4390/// SECURITY (FIND-R46-WS-001): Scans tool call arguments, resource URIs,
4391/// and sampling request content for injection payloads in the client→upstream
4392/// direction. Matches the HTTP proxy's request-side injection scanning.
4393fn extract_scannable_text_from_request(json_val: &Value) -> String {
4394    let mut text_parts = Vec::new();
4395
4396    // SECURITY (FIND-R224-002): Recursively scan the entire `params` subtree,
4397    // not just specific fields. Previous narrow extraction missed injection
4398    // payloads in TaskRequest (params.message), ExtensionMethod, and other
4399    // message types with non-standard parameter structures. This gives parity
4400    // with the HTTP handler's `extract_passthrough_text_for_injection` which
4401    // scans all of params recursively.
4402    if let Some(params) = json_val.get("params") {
4403        extract_strings_recursive(params, &mut text_parts, 0);
4404    }
4405
4406    // SECURITY (FIND-R224-006): Also scan `result` field for injection payloads.
4407    // JSON-RPC response messages (sampling/elicitation replies) carry data in
4408    // `result` rather than `params`. Without this, upstream injection payloads
4409    // in response results bypass WS scanning while being caught by HTTP/gRPC.
4410    if let Some(result) = json_val.get("result") {
4411        extract_strings_recursive(result, &mut text_parts, 0);
4412    }
4413
4414    text_parts.join("\n")
4415}
4416
4417/// Recursively extract string values from a JSON value, with depth and count bounds.
4418///
4419/// SECURITY (FIND-R48-007): Added MAX_PARTS to prevent memory amplification
4420/// from messages containing many short strings.
4421fn extract_strings_recursive(val: &Value, parts: &mut Vec<String>, depth: usize) {
4422    // SECURITY (FIND-R154-005): Use depth 32 matching shared MAX_SCAN_DEPTH
4423    // in scanner_base.rs. Previous limit of 10 allowed injection payloads
4424    // nested between depth 11-32 to evade WS scanning while being caught
4425    // by the stdio relay and DLP scanner (both use MAX_SCAN_DEPTH=32).
4426    const MAX_DEPTH: usize = 32;
4427    const MAX_PARTS: usize = 1000;
4428    if depth > MAX_DEPTH || parts.len() >= MAX_PARTS {
4429        return;
4430    }
4431    match val {
4432        Value::String(s) => parts.push(s.clone()),
4433        Value::Array(arr) => {
4434            for item in arr {
4435                extract_strings_recursive(item, parts, depth + 1);
4436            }
4437        }
4438        Value::Object(map) => {
4439            for (key, v) in map {
4440                // SECURITY (FIND-R154-003): Also scan object keys for injection
4441                // payloads. Parity with stdio relay's traverse_json_strings_with_keys.
4442                // Without this, attackers can hide injection in JSON key names.
4443                if parts.len() < MAX_PARTS {
4444                    parts.push(key.clone());
4445                }
4446                extract_strings_recursive(v, parts, depth + 1);
4447            }
4448        }
4449        _ => {}
4450    }
4451}
4452
4453/// Extract scannable text from a JSON-RPC response for injection scanning.
4454///
4455/// SECURITY (FIND-R130-004): Delegates to the shared `extract_text_from_result()`
4456/// which covers `resource.text`, `resource.blob` (base64-decoded), `annotations`,
4457/// and `_meta` — all missing from the previous WS-only implementation.
4458fn extract_scannable_text(json_val: &Value) -> String {
4459    let mut text_parts = Vec::new();
4460
4461    // Scan result via shared extraction (covers content[].text, resource.text,
4462    // resource.blob, annotations, instructionsForUser, structuredContent, _meta).
4463    if let Some(result) = json_val.get("result") {
4464        let result_text = super::inspection::extract_text_from_result(result);
4465        if !result_text.is_empty() {
4466            text_parts.push(result_text);
4467        }
4468    }
4469
4470    // Scan error messages (not covered by extract_text_from_result)
4471    if let Some(error) = json_val.get("error") {
4472        if let Some(msg) = error.get("message").and_then(|m| m.as_str()) {
4473            text_parts.push(msg.to_string());
4474        }
4475        // SECURITY (FIND-R168-005): Use as_str() first to avoid wrapping
4476        // string values in JSON quotes. Parity with scanner_base.rs.
4477        if let Some(data) = error.get("data") {
4478            if let Some(s) = data.as_str() {
4479                text_parts.push(s.to_string());
4480            } else {
4481                text_parts.push(data.to_string());
4482            }
4483        }
4484    }
4485
4486    text_parts.join("\n")
4487}
4488
4489/// Create a pending approval for WebSocket-denied actions when an approval store
4490/// is configured. Returns the pending approval ID on success.
4491async fn create_ws_approval(
4492    state: &ProxyState,
4493    session_id: &str,
4494    action: &Action,
4495    reason: &str,
4496) -> Option<String> {
4497    let store = state.approval_store.as_ref()?;
4498    let requested_by = state.sessions.get_mut(session_id).and_then(|session| {
4499        session
4500            .agent_identity
4501            .as_ref()
4502            .and_then(|identity| identity.subject.clone())
4503            .or_else(|| session.oauth_subject.clone())
4504    });
4505    match store
4506        .create(action.clone(), reason.to_string(), requested_by)
4507        .await
4508    {
4509        Ok(id) => Some(id),
4510        Err(e) => {
4511            tracing::error!(
4512                session_id = %session_id,
4513                "Failed to create WebSocket approval (fail-closed): {}",
4514                e
4515            );
4516            None
4517        }
4518    }
4519}
4520
4521/// Build a JSON-RPC error response string for WebSocket with optional `error.data`.
4522fn make_ws_error_response_with_data(
4523    id: Option<&Value>,
4524    code: i64,
4525    message: &str,
4526    data: Option<Value>,
4527) -> String {
4528    let mut error = serde_json::Map::new();
4529    error.insert("code".to_string(), Value::from(code));
4530    error.insert("message".to_string(), Value::from(message));
4531    if let Some(data) = data {
4532        error.insert("data".to_string(), data);
4533    }
4534    let response = json!({
4535        "jsonrpc": "2.0",
4536        "id": id.cloned().unwrap_or(Value::Null),
4537        "error": Value::Object(error),
4538    });
4539    serde_json::to_string(&response).unwrap_or_else(|_| {
4540        format!(
4541            r#"{{"jsonrpc":"2.0","error":{{"code":{},"message":"{}"}},"id":null}}"#,
4542            code, message
4543        )
4544    })
4545}
4546
4547/// Build a JSON-RPC error response string for WebSocket.
4548fn make_ws_error_response(id: Option<&Value>, code: i64, message: &str) -> String {
4549    make_ws_error_response_with_data(id, code, message, None)
4550}
4551
4552#[cfg(test)]
4553mod tests;