Skip to main content

vellaveto_http_proxy/proxy/websocket/
mod.rs

1// Copyright 2026 Paolo Vella
2// SPDX-License-Identifier: BUSL-1.1
3//
4// Use of this software is governed by the Business Source License
5// included in the LICENSE-BSL-1.1 file at the root of this repository.
6//
7// Change Date: Three years from the date of publication of this version.
8// Change License: MPL-2.0
9
10//! WebSocket transport for MCP JSON-RPC messages (SEP-1288).
11//!
12//! Implements a WebSocket reverse proxy that sits between MCP clients and
13//! an upstream MCP server. WebSocket messages (text frames) are parsed as
14//! JSON-RPC, classified via `vellaveto_mcp::extractor`, evaluated against
15//! loaded policies, and forwarded to the upstream server.
16//!
17//! Security invariants:
18//! - **Fail-closed**: Unparseable messages close the connection (code 1008).
19//! - **No binary frames**: Only text frames are accepted (code 1003 for binary).
20//! - **Session binding**: Each WS connection is bound to exactly one session.
21//! - **Canonicalization**: Re-serialized JSON forwarded (TOCTOU defense).
22
23use axum::{
24    extract::{
25        ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade},
26        ConnectInfo, State,
27    },
28    http::HeaderMap,
29    response::Response,
30};
31use futures_util::{SinkExt, StreamExt};
32use serde_json::{json, Value};
33use std::net::SocketAddr;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::sync::Mutex;
38use vellaveto_mcp::extractor::{self, MessageType};
39use vellaveto_mcp::inspection::{
40    inspect_for_injection, scan_notification_for_secrets, scan_parameters_for_secrets,
41    scan_response_for_secrets, scan_text_for_secrets, scan_tool_descriptions,
42    scan_tool_descriptions_with_scanner,
43};
44use vellaveto_mcp::mediation::{
45    build_acis_envelope_with_security_context, build_secondary_acis_envelope_with_security_context,
46    mediate_with_security_context,
47};
48use vellaveto_mcp::output_validation::ValidationResult;
49use vellaveto_types::acis::{AcisDecisionEnvelope, DecisionOrigin};
50use vellaveto_types::{Action, EvaluationContext, RuntimeSecurityContext, Verdict};
51
52use super::auth::{validate_agent_identity, validate_api_key, validate_oauth};
53use super::call_chain::{
54    check_privilege_escalation, sync_session_call_chain_from_headers, take_tracked_tool_call,
55    track_pending_tool_call,
56};
57use super::origin::validate_origin;
58use super::ProxyState;
59use crate::proxy_metrics::record_dlp_finding;
60
/// Reason text for rejecting a presented approval that is not valid for the action at hand.
/// NOTE(review): the use site is outside this excerpt — confirm where this string is surfaced.
const INVALID_PRESENTED_APPROVAL_REASON: &str = "Supplied approval is not valid for this action";
62
/// Tunable limits applied to each proxied WebSocket connection.
///
/// The [`Default`] implementation yields 1 MB messages, a 300 s idle timeout,
/// and 100 / 500 messages per second for the client→upstream and
/// upstream→client directions respectively.
#[derive(Debug, Clone)]
pub struct WebSocketConfig {
    /// Largest accepted message, in bytes (default: 1 MB).
    pub max_message_size: usize,
    /// Seconds without message activity after which the connection is closed (default: 300s).
    /// SECURITY (FIND-R182-001): a true idle timeout — every message resets it.
    pub idle_timeout_secs: u64,
    /// Per-connection ceiling on client-to-upstream messages per second (default: 100).
    pub message_rate_limit: u32,
    /// Per-connection ceiling on upstream-to-client messages per second (default: 500).
    /// SECURITY (FIND-R46-WS-003): limiting the upstream→client direction keeps a
    /// malicious upstream from flooding the client with responses.
    pub upstream_rate_limit: u32,
}

impl Default for WebSocketConfig {
    /// Builds the documented default limits.
    fn default() -> Self {
        const ONE_MIB_BYTES: usize = 1024 * 1024;
        const FIVE_MINUTES_SECS: u64 = 5 * 60;
        const CLIENT_MESSAGES_PER_SEC: u32 = 100;
        const UPSTREAM_MESSAGES_PER_SEC: u32 = 500;

        WebSocketConfig {
            max_message_size: ONE_MIB_BYTES,
            idle_timeout_secs: FIVE_MINUTES_SECS,
            message_rate_limit: CLIENT_MESSAGES_PER_SEC,
            upstream_rate_limit: UPSTREAM_MESSAGES_PER_SEC,
        }
    }
}
89
/// WebSocket close codes per RFC 6455.
///
/// 1008 (policy violation): sent by this module when a connection is rejected,
/// e.g. upstream unavailable, expired token, rate limit exceeded, or invalid JSON.
const CLOSE_POLICY_VIOLATION: u16 = 1008;
/// 1003 (unsupported data): the module docs reserve this code for binary frames.
const CLOSE_UNSUPPORTED_DATA: u16 = 1003;
/// Close code for oversized messages. Used by axum's `max_message_size`
/// automatically; kept here for documentation and test assertions.
#[cfg(test)]
const CLOSE_MESSAGE_TOO_BIG: u16 = 1009;
/// 1000 (normal closure): sent to the client when the session ends cleanly.
const CLOSE_NORMAL: u16 = 1000;
98
/// Global WebSocket metrics counters.
///
/// Process-wide count of WebSocket connections; bumped by `record_ws_connection`.
static WS_CONNECTIONS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Process-wide count of WebSocket messages (all directions); bumped by `record_ws_message`.
static WS_MESSAGES_TOTAL: AtomicU64 = AtomicU64::new(0);
102
103/// Record WebSocket connection metric.
104fn record_ws_connection() {
105    // SECURITY (FIND-R182-003): Use saturating arithmetic to prevent overflow.
106    // SECURITY (CA-005): SeqCst for security-adjacent metrics counters.
107    let _ = WS_CONNECTIONS_TOTAL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
108        Some(v.saturating_add(1))
109    });
110    metrics::counter!("vellaveto_ws_connections_total").increment(1);
111}
112
113/// Record WebSocket message metric.
114fn record_ws_message(direction: &str) {
115    // SECURITY (FIND-R182-003): Use saturating arithmetic to prevent overflow.
116    // SECURITY (CA-005): SeqCst for security-adjacent metrics counters.
117    let _ = WS_MESSAGES_TOTAL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
118        Some(v.saturating_add(1))
119    });
120    metrics::counter!(
121        "vellaveto_ws_messages_total",
122        "direction" => direction.to_string()
123    )
124    .increment(1);
125}
126
/// Get current connection count (for testing).
///
/// Reads the process-wide `WS_CONNECTIONS_TOTAL` counter, so the value
/// includes connections recorded by any test running in the same process.
#[cfg(test)]
pub(crate) fn ws_connections_count() -> u64 {
    WS_CONNECTIONS_TOTAL.load(Ordering::SeqCst)
}
132
/// Get current message count (for testing).
///
/// Reads the process-wide `WS_MESSAGES_TOTAL` counter, which is shared by
/// every direction label passed to `record_ws_message`.
#[cfg(test)]
pub(crate) fn ws_messages_count() -> u64 {
    WS_MESSAGES_TOTAL.load(Ordering::SeqCst)
}
138
/// Security-relevant state captured from the HTTP upgrade request and carried
/// into the WebSocket connection handler.
#[derive(Clone)]
struct WsHandshakeSecurity {
    /// OAuth validation result produced at upgrade time, if any.
    oauth_claims: Option<crate::proxy::auth::OAuthValidationEvidence>,
    /// Headers of the HTTP upgrade request.
    headers: HeaderMap,
}
144
/// WebSocket-side entry point for deriving a [`RuntimeSecurityContext`].
///
/// Forwards all four arguments unchanged to
/// `super::helpers::build_runtime_security_context` and returns its result.
fn build_ws_runtime_security_context(
    msg: &Value,
    action: &Action,
    headers: &HeaderMap,
    inputs: super::helpers::TransportSecurityInputs<'_>,
) -> Option<RuntimeSecurityContext> {
    super::helpers::build_runtime_security_context(msg, action, headers, inputs)
}
153
/// Rebuild an ACIS decision envelope for the WebSocket transport.
///
/// Carries `decision_id`, `findings`, and `evaluation_us` over from `envelope`
/// and hands them, together with the supplied action, verdict, origin, session
/// id, evaluation context, and optional security context, to
/// `build_acis_envelope_with_security_context`.
fn refresh_ws_acis_envelope(
    envelope: &AcisDecisionEnvelope,
    action: &Action,
    verdict: &Verdict,
    origin: DecisionOrigin,
    session_id: &str,
    eval_ctx: &EvaluationContext,
    security_context: Option<&RuntimeSecurityContext>,
) -> AcisDecisionEnvelope {
    build_acis_envelope_with_security_context(
        // Reuse the existing decision id.
        &envelope.decision_id,
        action,
        verdict,
        origin,
        // Presumably the transport label — confirm against the builder's signature.
        "websocket",
        &envelope.findings,
        envelope.evaluation_us,
        Some(session_id),
        // NOTE(review): the meaning of this positional argument is not visible here.
        None,
        Some(eval_ctx),
        security_context,
    )
}
177
178use vellaveto_types::is_unicode_format_char as is_unicode_format_char_ws;
179
/// Query parameters for the WebSocket upgrade endpoint.
///
/// `deny_unknown_fields` makes deserialization fail for any query key other
/// than `session_id`.
#[derive(Debug, serde::Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct WsQueryParams {
    /// Optional session ID for session resumption.
    /// Absent keys deserialize to `None`.
    #[serde(default)]
    pub session_id: Option<String>,
}
188
189/// Handle WebSocket upgrade request at `/mcp/ws`.
190///
191/// Authenticates the request, validates origin, creates/resumes a session,
192/// and upgrades the HTTP connection to a WebSocket.
193pub async fn handle_ws_upgrade(
194    State(state): State<ProxyState>,
195    ConnectInfo(addr): ConnectInfo<SocketAddr>,
196    headers: HeaderMap,
197    query: axum::extract::Query<WsQueryParams>,
198    ws: WebSocketUpgrade,
199) -> Response {
200    // 1. Validate origin (CSRF / DNS rebinding defense)
201    if let Err(resp) = validate_origin(&headers, &state.bind_addr, &state.allowed_origins) {
202        return resp;
203    }
204
205    // 2. Authenticate before upgrade (API key or OAuth)
206    if let Err(resp) = validate_api_key(&state, &headers) {
207        return resp;
208    }
209
210    // SECURITY (FIND-R53-WS-001): Validate OAuth token at upgrade time.
211    // Parity with HTTP POST (handlers.rs:154) and GET (handlers.rs:2864).
212    // Without this, WS connections bypass token expiry checks.
213    let oauth_claims = match validate_oauth(
214        &state,
215        &headers,
216        "GET",
217        &super::auth::build_effective_request_uri(
218            &headers,
219            state.bind_addr,
220            &axum::http::Uri::from_static("/mcp/ws"),
221            false,
222        ),
223        query.session_id.as_deref(),
224    )
225    .await
226    {
227        Ok(claims) => claims,
228        Err(response) => return response,
229    };
230
231    // SECURITY (FIND-R53-WS-002, FIND-R159-003): Validate agent identity at upgrade time.
232    // Parity with HTTP POST (handlers.rs:160) and GET (handlers.rs:2871).
233    // FIND-R159-003: Identity MUST be stored in session (was previously discarded with `_`).
234    let agent_identity = match validate_agent_identity(&state, &headers).await {
235        Ok(identity) => identity,
236        Err(response) => return response,
237    };
238
239    // SECURITY (FIND-R55-WS-004, FIND-R81-001): Validate session_id length and
240    // control/format characters from query parameter. Parity with HTTP POST/GET
241    // handlers (handlers.rs:154, handlers.rs:2928) which reject control chars.
242    // SECURITY (FIND-R81-WS-001): Also reject Unicode format characters (zero-width,
243    // bidi overrides, BOM) that can bypass string-based security checks.
244    let ws_session_id = query.session_id.as_deref().filter(|id| {
245        !id.is_empty()
246            && id.len() <= 128
247            && !id
248                .chars()
249                .any(|c| c.is_control() || is_unicode_format_char_ws(c))
250    });
251
252    // 3. Get or create session
253    let session_id = state.sessions.get_or_create(ws_session_id);
254
255    // SECURITY (FIND-R53-WS-003): Session ownership binding — prevent session fixation.
256    // Parity with HTTP GET (handlers.rs:2914-2953).
257    if let Some(ref claims) = oauth_claims {
258        if let Some(mut session) = state.sessions.get_mut(&session_id) {
259            match &session.oauth_subject {
260                Some(owner) if owner != &claims.sub => {
261                    tracing::warn!(
262                        session_id = %session_id,
263                        owner = %owner,
264                        requester = %claims.sub,
265                        "WS upgrade rejected: session owned by different principal"
266                    );
267                    return axum::response::IntoResponse::into_response((
268                        axum::http::StatusCode::FORBIDDEN,
269                        axum::Json(json!({
270                            "error": "Session belongs to another principal"
271                        })),
272                    ));
273                }
274                None => {
275                    // Bind session to this OAuth subject
276                    session.oauth_subject = Some(claims.sub.clone());
277                    // SECURITY (FIND-R73-SRV-006): Store token expiry, matching
278                    // HTTP POST handler pattern to enforce token lifetime.
279                    if claims.exp > 0 {
280                        session.token_expires_at = Some(claims.exp);
281                    }
282                }
283                _ => {
284                    // Already owned by this principal — use earliest expiry
285                    // SECURITY (FIND-R73-SRV-006): Parity with HTTP POST handler
286                    // (R23-PROXY-6) — prevent long-lived tokens from extending
287                    // sessions originally bound to short-lived tokens.
288                    if claims.exp > 0 {
289                        session.token_expires_at = Some(
290                            session
291                                .token_expires_at
292                                .map_or(claims.exp, |existing| existing.min(claims.exp)),
293                        );
294                    }
295                }
296            }
297        }
298    }
299
300    // SECURITY (FIND-R159-003): Store agent identity in session — parity with HTTP
301    // POST (handlers.rs:295-298) and GET (handlers.rs:3641-3643). Without this,
302    // ABAC policies referencing agent_identity would evaluate against None for
303    // WebSocket connections, creating a policy bypass.
304    if let Some(ref identity) = agent_identity {
305        if let Some(mut session) = state.sessions.get_mut(&session_id) {
306            session.agent_identity = Some(identity.clone());
307        }
308    } else if let Some(ref oauth_evidence) = oauth_claims {
309        match oauth_evidence.projected_agent_identity() {
310            Ok(Some(identity)) => {
311                if let Some(mut session) = state.sessions.get_mut(&session_id) {
312                    session.agent_identity.get_or_insert(identity);
313                }
314            }
315            Ok(None) => {}
316            Err(error) => {
317                tracing::warn!(
318                    "WebSocket transport identity claims validation failed: {}",
319                    error
320                );
321                return axum::response::IntoResponse::into_response((
322                    axum::http::StatusCode::UNAUTHORIZED,
323                    axum::Json(json!({
324                        "error": "Invalid authorization token"
325                    })),
326                ));
327            }
328        }
329    }
330
331    // SECURITY (FIND-R46-006): Validate and extract call chain from upgrade headers.
332    // The call chain is synced once during upgrade and reused for all messages
333    // in this WebSocket connection.
334    if let Err(reason) = super::call_chain::validate_call_chain_header(&headers, &state.limits) {
335        tracing::warn!(
336            session_id = %session_id,
337            "WS upgrade rejected: invalid call chain header: {}",
338            reason
339        );
340        return axum::response::IntoResponse::into_response((
341            axum::http::StatusCode::BAD_REQUEST,
342            axum::Json(json!({
343                "error": "Invalid request"
344            })),
345        ));
346    }
347    sync_session_call_chain_from_headers(
348        &state.sessions,
349        &session_id,
350        &headers,
351        state.call_chain_hmac_key.as_ref(),
352        &state.limits,
353    );
354
355    let ws_config = state.ws_config.clone().unwrap_or_default();
356
357    // Phase 28: Extract W3C Trace Context from the HTTP upgrade request headers.
358    // The trace_id is used for correlating all audit entries during this WS session.
359    let trace_ctx = super::trace_propagation::extract_trace_context(&headers);
360    let ws_trace_id = trace_ctx
361        .trace_id
362        .clone()
363        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string().replace('-', ""));
364
365    tracing::info!(
366        session_id = %session_id,
367        trace_id = %ws_trace_id,
368        peer = %addr,
369        "WebSocket upgrade accepted"
370    );
371
372    // 4. Configure and upgrade
373    ws.max_message_size(ws_config.max_message_size)
374        .on_upgrade(move |socket| {
375            handle_ws_connection(
376                socket,
377                state,
378                session_id,
379                ws_config,
380                addr,
381                ws_trace_id,
382                WsHandshakeSecurity {
383                    oauth_claims,
384                    headers,
385                },
386            )
387        })
388}
389
390/// Handle an established WebSocket connection.
391///
392/// Establishes an upstream WS connection, then relays messages bidirectionally
393/// with policy enforcement on client→upstream messages and DLP/injection
394/// scanning on upstream→client messages.
395async fn handle_ws_connection(
396    client_ws: WebSocket,
397    state: ProxyState,
398    session_id: String,
399    ws_config: WebSocketConfig,
400    peer_addr: SocketAddr,
401    trace_id: String,
402    handshake_security: WsHandshakeSecurity,
403) {
404    record_ws_connection();
405    let start = std::time::Instant::now();
406    tracing::debug!(
407        session_id = %session_id,
408        trace_id = %trace_id,
409        "WebSocket connection established with trace context"
410    );
411
412    // Connect to upstream — use gateway default backend if configured
413    let upstream_url = if let Some(ref gw) = state.gateway {
414        match gw.route("") {
415            Some(d) => convert_to_ws_url(&d.upstream_url),
416            None => {
417                tracing::error!(session_id = %session_id, "No healthy upstream for WebSocket");
418                let (mut client_sink, _) = client_ws.split();
419                let _ = client_sink
420                    .send(Message::Close(Some(CloseFrame {
421                        code: CLOSE_POLICY_VIOLATION,
422                        reason: "No healthy upstream available".into(),
423                    })))
424                    .await;
425                return;
426            }
427        }
428    } else {
429        convert_to_ws_url(&state.upstream_url)
430    };
431    let upstream_ws = match connect_upstream_ws(&upstream_url).await {
432        Ok(ws) => ws,
433        Err(e) => {
434            tracing::error!(
435                session_id = %session_id,
436                "Failed to connect to upstream WebSocket: {}",
437                e
438            );
439            // Send close frame to client
440            let (mut client_sink, _) = client_ws.split();
441            let _ = client_sink
442                .send(Message::Close(Some(CloseFrame {
443                    code: CLOSE_POLICY_VIOLATION,
444                    reason: "Upstream connection failed".into(),
445                })))
446                .await;
447            return;
448        }
449    };
450
451    let (client_sink, client_stream) = client_ws.split();
452    let (upstream_sink, upstream_stream) = upstream_ws.split();
453
454    // Wrap sinks in Arc<Mutex> for shared access
455    let client_sink = Arc::new(Mutex::new(client_sink));
456    let upstream_sink = Arc::new(Mutex::new(upstream_sink));
457
458    // Rate limiter state: track messages in the current second window
459    let rate_counter = Arc::new(AtomicU64::new(0));
460    let rate_window_start = Arc::new(std::sync::Mutex::new(std::time::Instant::now()));
461
462    // SECURITY (FIND-R46-WS-003): Separate rate limiter for upstream→client direction
463    let upstream_rate_counter = Arc::new(AtomicU64::new(0));
464    let upstream_rate_window_start = Arc::new(std::sync::Mutex::new(std::time::Instant::now()));
465
466    let idle_timeout = Duration::from_secs(ws_config.idle_timeout_secs);
467
468    // SECURITY (FIND-R182-001): Shared last-activity tracker so idle timeout resets
469    // on every message (true idle detection, not max-lifetime).
470    let last_activity = Arc::new(AtomicU64::new(0));
471    let connection_epoch = std::time::Instant::now();
472
473    // Client → Vellaveto → Upstream relay
474    let client_to_upstream = {
475        let state = state.clone();
476        let session_id = session_id.clone();
477        let client_sink = client_sink.clone();
478        let upstream_sink = upstream_sink.clone();
479        let rate_counter = rate_counter.clone();
480        let rate_window_start = rate_window_start.clone();
481        let ws_config = ws_config.clone();
482        let last_activity = last_activity.clone();
483        let handshake_security = handshake_security.clone();
484
485        relay_client_to_upstream(
486            client_stream,
487            client_sink,
488            upstream_sink,
489            state,
490            session_id,
491            ws_config,
492            rate_counter,
493            rate_window_start,
494            last_activity,
495            connection_epoch,
496            handshake_security.oauth_claims,
497            handshake_security.headers,
498        )
499    };
500
501    // Upstream → Vellaveto → Client relay
502    let upstream_to_client = {
503        let state = state.clone();
504        let session_id = session_id.clone();
505        let client_sink = client_sink.clone();
506        let ws_config = ws_config.clone();
507        let last_activity = last_activity.clone();
508
509        relay_upstream_to_client(
510            upstream_stream,
511            client_sink,
512            state,
513            session_id,
514            ws_config,
515            upstream_rate_counter,
516            upstream_rate_window_start,
517            last_activity,
518            connection_epoch,
519        )
520    };
521
522    // SECURITY (FIND-R182-001): True idle timeout — check periodically and
523    // close only if no message activity since last check.
524    let idle_check = {
525        let session_id = session_id.clone();
526        let last_activity = last_activity.clone();
527        async move {
528            // Check every 10% of idle timeout (min 1s) for responsive detection.
529            let check_interval = Duration::from_secs((ws_config.idle_timeout_secs / 10).max(1));
530            let mut interval = tokio::time::interval(check_interval);
531            interval.tick().await; // first tick is immediate, skip it
532            loop {
533                interval.tick().await;
534                // SECURITY (R251-WS-1): SeqCst for cross-thread session timeout enforcement.
535                let last_ms = last_activity.load(Ordering::SeqCst);
536                // SECURITY (FIND-R190-002): Use saturating_sub to prevent underflow.
537                let elapsed_since_activity =
538                    (connection_epoch.elapsed().as_millis() as u64).saturating_sub(last_ms);
539                if elapsed_since_activity >= idle_timeout.as_millis() as u64 {
540                    tracing::info!(
541                        session_id = %session_id,
542                        idle_secs = elapsed_since_activity / 1000,
543                        "WebSocket idle timeout ({}s), closing",
544                        ws_config.idle_timeout_secs
545                    );
546                    break;
547                }
548            }
549        }
550    };
551
552    // Run both relay loops with idle timeout
553    tokio::select! {
554        _ = client_to_upstream => {
555            tracing::debug!(session_id = %session_id, "Client stream ended");
556        }
557        _ = upstream_to_client => {
558            tracing::debug!(session_id = %session_id, "Upstream stream ended");
559        }
560        _ = idle_check => {}
561    }
562
563    // Clean shutdown: close both sides
564    {
565        let mut sink = client_sink.lock().await;
566        let _ = sink
567            .send(Message::Close(Some(CloseFrame {
568                code: CLOSE_NORMAL,
569                reason: "Session ended".into(),
570            })))
571            .await;
572    }
573    {
574        let mut sink = upstream_sink.lock().await;
575        let _ = sink.close().await;
576    }
577
578    let duration = start.elapsed();
579    metrics::histogram!("vellaveto_ws_connection_duration_seconds").record(duration.as_secs_f64());
580
581    tracing::info!(
582        session_id = %session_id,
583        peer = %peer_addr,
584        duration_secs = duration.as_secs(),
585        "WebSocket connection closed"
586    );
587}
588
589/// Relay messages from client to upstream with policy enforcement.
590#[allow(clippy::too_many_arguments)]
591async fn relay_client_to_upstream(
592    mut client_stream: futures_util::stream::SplitStream<WebSocket>,
593    client_sink: Arc<Mutex<futures_util::stream::SplitSink<WebSocket, Message>>>,
594    upstream_sink: Arc<
595        Mutex<
596            futures_util::stream::SplitSink<
597                tokio_tungstenite::WebSocketStream<
598                    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
599                >,
600                tokio_tungstenite::tungstenite::Message,
601            >,
602        >,
603    >,
604    state: ProxyState,
605    session_id: String,
606    ws_config: WebSocketConfig,
607    rate_counter: Arc<AtomicU64>,
608    rate_window_start: Arc<std::sync::Mutex<std::time::Instant>>,
609    last_activity: Arc<AtomicU64>,
610    connection_epoch: std::time::Instant,
611    oauth_claims: Option<crate::proxy::auth::OAuthValidationEvidence>,
612    handshake_headers: HeaderMap,
613) {
614    while let Some(msg_result) = client_stream.next().await {
615        let msg = match msg_result {
616            Ok(m) => m,
617            Err(e) => {
618                tracing::debug!(session_id = %session_id, "Client WS error: {}", e);
619                break;
620            }
621        };
622
623        // SECURITY (FIND-R182-001): Update last-activity for true idle detection.
624        // SECURITY (R251-WS-1): SeqCst ensures visibility to timeout checker thread.
625        last_activity.store(
626            connection_epoch.elapsed().as_millis() as u64,
627            Ordering::SeqCst,
628        );
629
630        record_ws_message("client_to_upstream");
631
632        // SECURITY (FIND-R52-WS-003): Per-message OAuth token expiry check.
633        // After WebSocket upgrade, the HTTP auth middleware no longer runs.
634        // A token that expires mid-connection must be rejected to prevent
635        // indefinite access via a long-lived WebSocket.
636        {
637            let token_expired = state
638                .sessions
639                .get_mut(&session_id)
640                .and_then(|s| {
641                    s.token_expires_at.map(|exp| {
642                        let now = std::time::SystemTime::now()
643                            .duration_since(std::time::UNIX_EPOCH)
644                            .unwrap_or_default()
645                            .as_secs();
646                        now >= exp
647                    })
648                })
649                .unwrap_or(false);
650            if token_expired {
651                tracing::warn!(
652                    session_id = %session_id,
653                    "SECURITY: OAuth token expired during WebSocket session, closing"
654                );
655                let error = json!({
656                    "jsonrpc": "2.0",
657                    "error": {
658                        "code": -32001,
659                        "message": "Session expired"
660                    },
661                    "id": null
662                });
663                let error_text = serde_json::to_string(&error)
664                    .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","error":{"code":-32001,"message":"Session expired"},"id":null}"#.to_string());
665                let mut sink = client_sink.lock().await;
666                let _ = sink.send(Message::Text(error_text.into())).await;
667                let _ = sink
668                    .send(Message::Close(Some(CloseFrame {
669                        code: CLOSE_POLICY_VIOLATION,
670                        reason: "Token expired".into(),
671                    })))
672                    .await;
673                break;
674            }
675        }
676
677        match msg {
678            Message::Text(text) => {
679                // Rate limiting
680                if !check_rate_limit(
681                    &rate_counter,
682                    &rate_window_start,
683                    ws_config.message_rate_limit,
684                ) {
685                    tracing::warn!(
686                        session_id = %session_id,
687                        "WebSocket rate limit exceeded, closing"
688                    );
689                    let mut sink = client_sink.lock().await;
690                    let _ = sink
691                        .send(Message::Close(Some(CloseFrame {
692                            code: CLOSE_POLICY_VIOLATION,
693                            reason: "Rate limit exceeded".into(),
694                        })))
695                        .await;
696                    break;
697                }
698
699                // SECURITY (FIND-R46-005): Reject JSON with duplicate keys before parsing.
700                // Prevents parser-disagreement attacks (CVE-2017-12635, CVE-2020-16250)
701                // where the proxy evaluates one key value but upstream sees another.
702                if let Some(dup_key) = vellaveto_mcp::framing::find_duplicate_json_key(&text) {
703                    tracing::warn!(
704                        session_id = %session_id,
705                        "SECURITY: Rejected WS message with duplicate key: \"{}\"",
706                        dup_key
707                    );
708                    let mut sink = client_sink.lock().await;
709                    let _ = sink
710                        .send(Message::Close(Some(CloseFrame {
711                            code: CLOSE_POLICY_VIOLATION,
712                            reason: "Duplicate JSON key detected".into(),
713                        })))
714                        .await;
715                    break;
716                }
717
718                // SECURITY (FIND-R53-WS-004): Reject WS messages with control characters.
719                // Parity with HTTP GET event_id validation (handlers.rs:2899).
720                // Control chars in JSON-RPC messages can be used for log injection
721                // or to bypass string-based security checks.
722                if text.chars().any(|c| {
723                    // Allow standard JSON whitespace (\t, \n, \r) but reject other
724                    // ASCII control chars and Unicode format chars (FIND-R54-011).
725                    (c.is_control() && c != '\n' && c != '\r' && c != '\t')
726                        || is_unicode_format_char_ws(c)
727                }) {
728                    tracing::warn!(
729                        session_id = %session_id,
730                        "SECURITY: Rejected WS message with control characters"
731                    );
732                    let error =
733                        make_ws_error_response(None, -32600, "Message contains control characters");
734                    let mut sink = client_sink.lock().await;
735                    let _ = sink.send(Message::Text(error.into())).await;
736                    continue;
737                }
738
739                // Parse JSON
740                let parsed: Value = match serde_json::from_str(&text) {
741                    Ok(v) => v,
742                    Err(_) => {
743                        tracing::warn!(
744                            session_id = %session_id,
745                            "Unparseable JSON in WebSocket text frame, closing (fail-closed)"
746                        );
747                        let mut sink = client_sink.lock().await;
748                        let _ = sink
749                            .send(Message::Close(Some(CloseFrame {
750                                code: CLOSE_POLICY_VIOLATION,
751                                reason: "Invalid JSON".into(),
752                            })))
753                            .await;
754                        break;
755                    }
756                };
757
758                // SECURITY (FIND-R46-WS-001): Injection scanning on client→upstream text frames.
759                // The HTTP proxy scans request bodies for injection; the WebSocket proxy must
760                // do the same to maintain security parity. Fail-closed: if injection is detected
761                // and blocking is enabled, deny the message.
762                if !state.injection_disabled {
763                    let scannable = extract_scannable_text_from_request(&parsed);
764                    if !scannable.is_empty() {
765                        let injection_matches: Vec<String> =
766                            if let Some(ref scanner) = state.injection_scanner {
767                                scanner
768                                    .inspect(&scannable)
769                                    .into_iter()
770                                    .map(|s| s.to_string())
771                                    .collect()
772                            } else {
773                                inspect_for_injection(&scannable)
774                                    .into_iter()
775                                    .map(|s| s.to_string())
776                                    .collect()
777                            };
778
779                        if !injection_matches.is_empty() {
780                            tracing::warn!(
781                                "SECURITY: Injection in WS client request! Session: {}, Patterns: {:?}",
782                                session_id,
783                                injection_matches,
784                            );
785
786                            let verdict = if state.injection_blocking {
787                                Verdict::Deny {
788                                    reason: format!(
789                                        "WS request injection blocked: {injection_matches:?}"
790                                    ),
791                                }
792                            } else {
793                                Verdict::Allow
794                            };
795
796                            let action = Action::new(
797                                "vellaveto",
798                                "ws_request_injection",
799                                json!({
800                                    "matched_patterns": injection_matches,
801                                    "session": session_id,
802                                    "transport": "websocket",
803                                    "direction": "client_to_upstream",
804                                }),
805                            );
806                            let request_payload = parsed
807                                .get("params")
808                                .or_else(|| parsed.get("result"))
809                                .cloned();
810                            let injection_security_context =
811                                request_payload.as_ref().map(|payload| {
812                                    super::helpers::parameter_injection_security_context(
813                                        payload,
814                                        state.injection_blocking,
815                                        "request_injection",
816                                    )
817                                });
818                            let envelope = build_secondary_acis_envelope_with_security_context(
819                                &action,
820                                &verdict,
821                                DecisionOrigin::InjectionScanner,
822                                "websocket",
823                                Some(&session_id),
824                                injection_security_context.as_ref(),
825                            );
826                            if let Err(e) = state
827                                .audit
828                                .log_entry_with_acis(
829                                    &action,
830                                    &verdict,
831                                    json!({
832                                        "source": "ws_proxy",
833                                        "event": "ws_request_injection_detected",
834                                    }),
835                                    envelope,
836                                )
837                                .await
838                            {
839                                tracing::warn!("Failed to audit WS request injection: {}", e);
840                            }
841
842                            if state.injection_blocking {
843                                let id = parsed.get("id");
844                                let error = make_ws_error_response(
845                                    id,
846                                    -32001,
847                                    "Request blocked: injection detected",
848                                );
849                                let mut sink = client_sink.lock().await;
850                                let _ = sink.send(Message::Text(error.into())).await;
851                                continue;
852                            }
853                        }
854                    }
855                }
856
857                let presented_approval_id = super::helpers::extract_approval_id_from_meta(&parsed);
858
859                // Classify and evaluate
860                let classified = extractor::classify_message(&parsed);
861                match classified {
862                    MessageType::ToolCall {
863                        ref id,
864                        ref tool_name,
865                        ref arguments,
866                    } => {
867                        // SECURITY (FIND-R46-009): Strict tool name validation (MCP 2025-11-25).
868                        // When enabled, reject tool names that don't conform to the spec format.
869                        if state.streamable_http.strict_tool_name_validation {
870                            if let Err(e) = vellaveto_types::validate_mcp_tool_name(tool_name) {
871                                tracing::warn!(
872                                    session_id = %session_id,
873                                    "SECURITY: Rejecting invalid WS tool name '{}': {}",
874                                    tool_name,
875                                    e
876                                );
877                                let error =
878                                    make_ws_error_response(Some(id), -32602, "Invalid tool name");
879                                let mut sink = client_sink.lock().await;
880                                let _ = sink.send(Message::Text(error.into())).await;
881                                continue;
882                            }
883                        }
884
885                        let mut action = extractor::extract_action(tool_name, arguments);
886                        let mut matched_approval_id: Option<String> = None;
887                        let mut matched_approval_registry: Option<&'static str> = None;
888
889                        // SECURITY (FIND-R75-002): DNS resolution for IP-based policy evaluation.
890                        // Parity with HTTP handler (handlers.rs:717). Without this, policies
891                        // using ip_rules are completely bypassed on the WebSocket transport.
892                        if state.engine.has_ip_rules() {
893                            super::helpers::resolve_domains(&mut action).await;
894                        }
895
896                        // SECURITY (FIND-R46-006): Call chain validation and privilege escalation check.
897                        // Extract X-Upstream-Agents from the initial WS upgrade headers stored in session.
898                        // For WebSocket, we sync the call chain once during upgrade and reuse it.
899                        let upstream_chain = {
900                            let session_ref = state.sessions.get_mut(&session_id);
901                            session_ref
902                                .map(|s| s.current_call_chain.clone())
903                                .unwrap_or_default()
904                        };
905                        let current_agent_id = {
906                            let session_ref = state.sessions.get_mut(&session_id);
907                            session_ref.and_then(|s| s.oauth_subject.clone())
908                        };
909
910                        // SECURITY (FIND-R46-006): Privilege escalation detection.
911                        if !upstream_chain.is_empty() {
912                            let priv_check = check_privilege_escalation(
913                                &state.engine,
914                                &state.policies,
915                                &action,
916                                &upstream_chain,
917                                current_agent_id.as_deref(),
918                            );
919                            if priv_check.escalation_detected {
920                                let verdict = Verdict::Deny {
921                                    reason: format!(
922                                        "Privilege escalation: agent '{}' would be denied",
923                                        priv_check
924                                            .escalating_from_agent
925                                            .as_deref()
926                                            .unwrap_or("unknown")
927                                    ),
928                                };
929                                let privilege_escalation_security_context =
930                                    super::helpers::privilege_escalation_security_context(&action);
931                                let envelope = build_secondary_acis_envelope_with_security_context(
932                                    &action,
933                                    &verdict,
934                                    DecisionOrigin::PolicyEngine,
935                                    "websocket",
936                                    Some(&session_id),
937                                    Some(&privilege_escalation_security_context),
938                                );
939                                if let Err(e) = state
940                                    .audit
941                                    .log_entry_with_acis(
942                                        &action,
943                                        &verdict,
944                                        json!({
945                                            "source": "ws_proxy",
946                                            "session": session_id,
947                                            "transport": "websocket",
948                                            "event": "privilege_escalation_blocked",
949                                            "escalating_from_agent": priv_check.escalating_from_agent,
950                                            "upstream_deny_reason": priv_check.upstream_deny_reason,
951                                        }),
952                                        envelope,
953                                    )
954                                    .await
955                                {
956                                    tracing::warn!(
957                                        "Failed to audit WS privilege escalation: {}",
958                                        e
959                                    );
960                                }
961                                let error =
962                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
963                                let mut sink = client_sink.lock().await;
964                                let _ = sink.send(Message::Text(error.into())).await;
965                                continue;
966                            }
967                        }
968
969                        // SECURITY (FIND-R46-007): Rug-pull detection.
970                        // Block calls to tools whose annotations changed since initial tools/list.
971                        // SECURITY (R240-PROXY-1): Fall back to global registry on session miss.
972                        let is_flagged = state
973                            .sessions
974                            .get_mut(&session_id)
975                            .map(|s| s.flagged_tools.contains(tool_name))
976                            .unwrap_or_else(|| state.sessions.is_tool_globally_flagged(tool_name));
977                        if is_flagged {
978                            let verdict = Verdict::Deny {
979                                reason: format!(
980                                    "Tool '{tool_name}' blocked: annotations changed (rug-pull detected)"
981                                ),
982                            };
983                            let rug_pull_security_context =
984                                super::helpers::rug_pull_security_context(&action);
985                            let envelope = build_secondary_acis_envelope_with_security_context(
986                                &action,
987                                &verdict,
988                                DecisionOrigin::CapabilityEnforcement,
989                                "websocket",
990                                Some(&session_id),
991                                Some(&rug_pull_security_context),
992                            );
993                            if let Err(e) = state
994                                .audit
995                                .log_entry_with_acis(
996                                    &action,
997                                    &verdict,
998                                    json!({
999                                        "source": "ws_proxy",
1000                                        "session": session_id,
1001                                        "transport": "websocket",
1002                                        "event": "rug_pull_tool_blocked",
1003                                        "tool": tool_name,
1004                                    }),
1005                                    envelope,
1006                                )
1007                                .await
1008                            {
1009                                tracing::warn!("Failed to audit WS rug-pull block: {}", e);
1010                            }
1011                            let error =
1012                                make_ws_error_response(Some(id), -32001, "Denied by policy");
1013                            let mut sink = client_sink.lock().await;
1014                            let _ = sink.send(Message::Text(error.into())).await;
1015                            continue;
1016                        }
1017
1018                        // SECURITY (FIND-R52-WS-001): DLP scan parameters for secret exfiltration.
1019                        // Matches HTTP handler's DLP check to maintain security parity.
1020                        {
1021                            let dlp_findings = scan_parameters_for_secrets(arguments);
1022                            // SECURITY (FIND-R55-WS-001): DLP on request params always blocks,
1023                            // matching HTTP handler. Previously gated on injection_blocking flag.
1024                            if !dlp_findings.is_empty() {
1025                                for finding in &dlp_findings {
1026                                    record_dlp_finding(&finding.pattern_name);
1027                                }
1028                                let patterns: Vec<String> = dlp_findings
1029                                    .iter()
1030                                    .map(|f| format!("{} at {}", f.pattern_name, f.location))
1031                                    .collect();
1032                                let audit_reason = format!(
1033                                    "DLP: secrets detected in tool parameters: {patterns:?}"
1034                                );
1035                                tracing::warn!(
1036                                    "SECURITY: DLP blocking WS tool '{}' in session {}: {}",
1037                                    tool_name,
1038                                    session_id,
1039                                    audit_reason
1040                                );
1041                                let dlp_action = extractor::extract_action(tool_name, arguments);
1042                                let dlp_verdict = Verdict::Deny {
1043                                    reason: audit_reason,
1044                                };
1045                                let parameter_security_context =
1046                                    super::helpers::parameter_dlp_security_context(
1047                                        arguments,
1048                                        true,
1049                                        "tool_parameter_dlp",
1050                                    );
1051                                let envelope = build_secondary_acis_envelope_with_security_context(
1052                                    &dlp_action,
1053                                    &dlp_verdict,
1054                                    DecisionOrigin::Dlp,
1055                                    "websocket",
1056                                    Some(&session_id),
1057                                    Some(&parameter_security_context),
1058                                );
1059                                if let Err(e) = state
1060                                    .audit
1061                                    .log_entry_with_acis(
1062                                        &dlp_action,
1063                                        &dlp_verdict,
1064                                        json!({
1065                                            "source": "ws_proxy",
1066                                            "session": session_id,
1067                                            "transport": "websocket",
1068                                            "event": "dlp_secret_blocked",
1069                                            "tool": tool_name,
1070                                            "findings": patterns,
1071                                        }),
1072                                        envelope,
1073                                    )
1074                                    .await
1075                                {
1076                                    tracing::warn!("Failed to audit WS DLP finding: {}", e);
1077                                }
1078                                let error = make_ws_error_response(
1079                                    Some(id),
1080                                    -32001,
1081                                    "Request blocked: security policy violation",
1082                                );
1083                                let mut sink = client_sink.lock().await;
1084                                let _ = sink.send(Message::Text(error.into())).await;
1085                                continue;
1086                            }
1087                        }
1088
1089                        // SECURITY (FIND-R52-WS-002): Memory poisoning detection.
1090                        // Check if tool call parameters contain replayed response data,
1091                        // matching the HTTP handler's memory poisoning check.
1092                        {
1093                            let poisoning_detected = state
1094                                .sessions
1095                                .get_mut(&session_id)
1096                                .and_then(|session| {
1097                                    let matches =
1098                                        session.memory_tracker.check_parameters(arguments);
1099                                    if !matches.is_empty() {
1100                                        for m in &matches {
1101                                            tracing::warn!(
1102                                                "SECURITY: Memory poisoning detected in WS tool '{}' (session {}): \
1103                                                 param '{}' contains replayed data (fingerprint: {})",
1104                                                tool_name,
1105                                                session_id,
1106                                                m.param_location,
1107                                                m.fingerprint
1108                                            );
1109                                        }
1110                                        Some(matches.len())
1111                                    } else {
1112                                        None
1113                                    }
1114                                });
1115                            if let Some(match_count) = poisoning_detected {
1116                                let poison_action = extractor::extract_action(tool_name, arguments);
1117                                let deny_reason = format!(
1118                                    "Memory poisoning detected: {match_count} replayed data fragment(s) in tool '{tool_name}'"
1119                                );
1120                                let poison_verdict = Verdict::Deny {
1121                                    reason: deny_reason,
1122                                };
1123                                let poisoning_security_context =
1124                                    super::helpers::memory_poisoning_security_context(
1125                                        arguments,
1126                                        "memory_poisoning",
1127                                    );
1128                                let envelope = build_secondary_acis_envelope_with_security_context(
1129                                    &poison_action,
1130                                    &poison_verdict,
1131                                    DecisionOrigin::MemoryPoisoning,
1132                                    "websocket",
1133                                    Some(&session_id),
1134                                    Some(&poisoning_security_context),
1135                                );
1136                                if let Err(e) = state
1137                                    .audit
1138                                    .log_entry_with_acis(
1139                                        &poison_action,
1140                                        &poison_verdict,
1141                                        json!({
1142                                            "source": "ws_proxy",
1143                                            "session": session_id,
1144                                            "transport": "websocket",
1145                                            "event": "memory_poisoning_detected",
1146                                            "matches": match_count,
1147                                            "tool": tool_name,
1148                                        }),
1149                                        envelope,
1150                                    )
1151                                    .await
1152                                {
1153                                    tracing::warn!("Failed to audit WS memory poisoning: {}", e);
1154                                }
1155                                let error = make_ws_error_response(
1156                                    Some(id),
1157                                    -32001,
1158                                    "Request blocked: security policy violation",
1159                                );
1160                                let mut sink = client_sink.lock().await;
1161                                let _ = sink.send(Message::Text(error.into())).await;
1162                                continue;
1163                            }
1164                        }
1165
1166                        // SECURITY (FIND-R46-008): Circuit breaker check.
1167                        // If the circuit is open for this tool, reject immediately.
1168                        if let Some(ref circuit_breaker) = state.circuit_breaker {
1169                            if let Err(reason) = circuit_breaker.can_proceed(tool_name) {
1170                                tracing::warn!(
1171                                    session_id = %session_id,
1172                                    "SECURITY: WS circuit breaker open for tool '{}': {}",
1173                                    tool_name,
1174                                    reason
1175                                );
1176                                let verdict = Verdict::Deny {
1177                                    reason: format!("Circuit breaker open: {reason}"),
1178                                };
1179                                // SECURITY (R251-ACIS-1): Use CircuitBreaker origin, not RateLimiter.
1180                                let circuit_breaker_security_context =
1181                                    super::helpers::circuit_breaker_security_context(&action);
1182                                let envelope = build_secondary_acis_envelope_with_security_context(
1183                                    &action,
1184                                    &verdict,
1185                                    DecisionOrigin::CircuitBreaker,
1186                                    "websocket",
1187                                    Some(&session_id),
1188                                    Some(&circuit_breaker_security_context),
1189                                );
1190                                if let Err(e) = state
1191                                    .audit
1192                                    .log_entry_with_acis(
1193                                        &action,
1194                                        &verdict,
1195                                        json!({
1196                                            "source": "ws_proxy",
1197                                            "session": session_id,
1198                                            "transport": "websocket",
1199                                            "event": "circuit_breaker_rejected",
1200                                            "tool": tool_name,
1201                                        }),
1202                                        envelope,
1203                                    )
1204                                    .await
1205                                {
1206                                    tracing::warn!(
1207                                        "Failed to audit WS circuit breaker rejection: {}",
1208                                        e
1209                                    );
1210                                }
1211                                let error = make_ws_error_response(
1212                                    Some(id),
1213                                    -32001,
1214                                    "Service temporarily unavailable",
1215                                );
1216                                let mut sink = client_sink.lock().await;
1217                                let _ = sink.send(Message::Text(error.into())).await;
1218                                continue;
1219                            }
1220                        }
1221
1222                        // SECURITY (FIND-R46-013): Tool registry trust check.
1223                        // If tool registry is configured, check trust level before evaluation.
1224                        if let Some(ref registry) = state.tool_registry {
1225                            let registry_eval_ctx = state
1226                                .sessions
1227                                .get(&session_id)
1228                                .map(|session| EvaluationContext {
1229                                    timestamp: None,
1230                                    agent_id: session.oauth_subject.clone(),
1231                                    agent_identity: session.agent_identity.clone(),
1232                                    call_counts: session.call_counts.clone(),
1233                                    previous_actions: session
1234                                        .action_history
1235                                        .iter()
1236                                        .cloned()
1237                                        .collect(),
1238                                    call_chain: session.current_call_chain.clone(),
1239                                    tenant_id: None,
1240                                    verification_tier: None,
1241                                    capability_token: None,
1242                                    session_state: None,
1243                                })
1244                                .unwrap_or_default();
1245                            let registry_runtime_security_context =
1246                                build_ws_runtime_security_context(
1247                                    &parsed,
1248                                    &action,
1249                                    &handshake_headers,
1250                                    super::helpers::TransportSecurityInputs {
1251                                        oauth_evidence: oauth_claims.as_ref(),
1252                                        eval_ctx: Some(&registry_eval_ctx),
1253                                        sessions: &state.sessions,
1254                                        session_id: Some(&session_id),
1255                                        trusted_request_signers: &state.trusted_request_signers,
1256                                        detached_signature_freshness: state
1257                                            .detached_signature_freshness,
1258                                    },
1259                                );
1260                            let trust = registry.check_trust_level(tool_name).await;
1261                            match trust {
1262                                vellaveto_mcp::tool_registry::TrustLevel::Unknown => {
1263                                    match super::helpers::presented_approval_matches_action(
1264                                        &state,
1265                                        &session_id,
1266                                        presented_approval_id.as_deref(),
1267                                        &action,
1268                                    )
1269                                    .await
1270                                    {
1271                                        Ok(Some(approval_id)) => {
1272                                            matched_approval_id = Some(approval_id);
1273                                            matched_approval_registry = Some("unknown_tool");
1274                                        }
1275                                        Ok(None) => {
1276                                            registry.register_unknown(tool_name).await;
1277                                            let verdict = Verdict::Deny {
1278                                                reason: "Unknown tool requires approval"
1279                                                    .to_string(),
1280                                            };
1281                                            let approval_security_context =
1282                                                super::helpers::unknown_tool_approval_gate_security_context(&action);
1283                                            let combined_security_context =
1284                                                super::helpers::merge_transport_security_context(
1285                                                    registry_runtime_security_context.as_ref(),
1286                                                    Some(&approval_security_context),
1287                                                );
1288                                            let effective_security_context =
1289                                                combined_security_context
1290                                                    .as_ref()
1291                                                    .unwrap_or(&approval_security_context);
1292                                            let envelope =
1293                                                build_secondary_acis_envelope_with_security_context(
1294                                                    &action,
1295                                                    &verdict,
1296                                                    DecisionOrigin::PolicyEngine,
1297                                                    "websocket",
1298                                                    Some(&session_id),
1299                                                    Some(effective_security_context),
1300                                                );
1301                                            if let Err(e) = state
1302                                                .audit
1303                                                .log_entry_with_acis(
1304                                                    &action,
1305                                                    &verdict,
1306                                                    json!({
1307                                                        "source": "ws_proxy",
1308                                                        "session": session_id,
1309                                                        "transport": "websocket",
1310                                                        "registry": "unknown_tool",
1311                                                        "tool": tool_name,
1312                                                    }),
1313                                                    envelope,
1314                                                )
1315                                                .await
1316                                            {
1317                                                tracing::warn!(
1318                                                    "Failed to audit WS unknown tool: {}",
1319                                                    e
1320                                                );
1321                                            }
1322                                            let approval_reason = "Approval required";
1323                                            let containment_context =
1324                                                super::helpers::approval_containment_context_from_security_context(
1325                                                    effective_security_context,
1326                                                    approval_reason,
1327                                                );
1328                                            let approval_id =
1329                                                super::helpers::create_pending_approval_with_context(
1330                                                    &state,
1331                                                    &session_id,
1332                                                    &action,
1333                                                    approval_reason,
1334                                                    containment_context,
1335                                                )
1336                                                .await;
1337                                            let error = make_ws_error_response_with_data(
1338                                                Some(id),
1339                                                -32001,
1340                                                approval_reason,
1341                                                Some(json!({
1342                                                    "verdict": "require_approval",
1343                                                    "reason": approval_reason,
1344                                                    "approval_id": approval_id,
1345                                                })),
1346                                            );
1347                                            let mut sink = client_sink.lock().await;
1348                                            let _ = sink.send(Message::Text(error.into())).await;
1349                                            continue;
1350                                        }
1351                                        Err(()) => {
1352                                            let verdict = Verdict::Deny {
1353                                                reason: INVALID_PRESENTED_APPROVAL_REASON
1354                                                    .to_string(),
1355                                            };
1356                                            let invalid_approval_security_context =
1357                                                super::helpers::invalid_presented_approval_security_context(&action);
1358                                            let combined_security_context =
1359                                                super::helpers::merge_transport_security_context(
1360                                                    registry_runtime_security_context.as_ref(),
1361                                                    Some(&invalid_approval_security_context),
1362                                                );
1363                                            let effective_security_context =
1364                                                combined_security_context
1365                                                    .as_ref()
1366                                                    .unwrap_or(&invalid_approval_security_context);
1367                                            let envelope =
1368                                                build_secondary_acis_envelope_with_security_context(
1369                                                    &action,
1370                                                    &verdict,
1371                                                    DecisionOrigin::PolicyEngine,
1372                                                    "websocket",
1373                                                    Some(&session_id),
1374                                                    Some(effective_security_context),
1375                                                );
1376                                            let _ = state
1377                                                .audit
1378                                                .log_entry_with_acis(
1379                                                    &action,
1380                                                    &verdict,
1381                                                    json!({
1382                                                        "source": "ws_proxy",
1383                                                        "session": session_id,
1384                                                        "transport": "websocket",
1385                                                        "registry": "unknown_tool",
1386                                                        "event": "presented_approval_replay_denied",
1387                                                        "approval_id": presented_approval_id,
1388                                                    }),
1389                                                    envelope,
1390                                                )
1391                                                .await;
1392                                            let error = make_ws_error_response(
1393                                                Some(id),
1394                                                -32001,
1395                                                "Denied by policy",
1396                                            );
1397                                            let mut sink = client_sink.lock().await;
1398                                            let _ = sink.send(Message::Text(error.into())).await;
1399                                            continue;
1400                                        }
1401                                    }
1402                                }
1403                                vellaveto_mcp::tool_registry::TrustLevel::Untrusted {
1404                                    score: _,
1405                                } => {
1406                                    match super::helpers::presented_approval_matches_action(
1407                                        &state,
1408                                        &session_id,
1409                                        presented_approval_id.as_deref(),
1410                                        &action,
1411                                    )
1412                                    .await
1413                                    {
1414                                        Ok(Some(approval_id)) => {
1415                                            matched_approval_id = Some(approval_id);
1416                                            matched_approval_registry = Some("untrusted_tool");
1417                                        }
1418                                        Ok(None) => {
1419                                            let verdict = Verdict::Deny {
1420                                                reason: "Untrusted tool requires approval"
1421                                                    .to_string(),
1422                                            };
1423                                            let approval_security_context =
1424                                                super::helpers::untrusted_tool_approval_gate_security_context(&action);
1425                                            let combined_security_context =
1426                                                super::helpers::merge_transport_security_context(
1427                                                    registry_runtime_security_context.as_ref(),
1428                                                    Some(&approval_security_context),
1429                                                );
1430                                            let effective_security_context =
1431                                                combined_security_context
1432                                                    .as_ref()
1433                                                    .unwrap_or(&approval_security_context);
1434                                            let envelope =
1435                                                build_secondary_acis_envelope_with_security_context(
1436                                                    &action,
1437                                                    &verdict,
1438                                                    DecisionOrigin::PolicyEngine,
1439                                                    "websocket",
1440                                                    Some(&session_id),
1441                                                    Some(effective_security_context),
1442                                                );
1443                                            if let Err(e) = state
1444                                                .audit
1445                                                .log_entry_with_acis(
1446                                                    &action,
1447                                                    &verdict,
1448                                                    json!({
1449                                                        "source": "ws_proxy",
1450                                                        "session": session_id,
1451                                                        "transport": "websocket",
1452                                                        "registry": "untrusted_tool",
1453                                                        "tool": tool_name,
1454                                                    }),
1455                                                    envelope,
1456                                                )
1457                                                .await
1458                                            {
1459                                                tracing::warn!(
1460                                                    "Failed to audit WS untrusted tool: {}",
1461                                                    e
1462                                                );
1463                                            }
1464                                            let approval_reason = "Approval required";
1465                                            let containment_context =
1466                                                super::helpers::approval_containment_context_from_security_context(
1467                                                    effective_security_context,
1468                                                    approval_reason,
1469                                                );
1470                                            let approval_id =
1471                                                super::helpers::create_pending_approval_with_context(
1472                                                    &state,
1473                                                    &session_id,
1474                                                    &action,
1475                                                    approval_reason,
1476                                                    containment_context,
1477                                                )
1478                                                .await;
1479                                            let error = make_ws_error_response_with_data(
1480                                                Some(id),
1481                                                -32001,
1482                                                approval_reason,
1483                                                Some(json!({
1484                                                    "verdict": "require_approval",
1485                                                    "reason": approval_reason,
1486                                                    "approval_id": approval_id,
1487                                                })),
1488                                            );
1489                                            let mut sink = client_sink.lock().await;
1490                                            let _ = sink.send(Message::Text(error.into())).await;
1491                                            continue;
1492                                        }
1493                                        Err(()) => {
1494                                            let verdict = Verdict::Deny {
1495                                                reason: INVALID_PRESENTED_APPROVAL_REASON
1496                                                    .to_string(),
1497                                            };
1498                                            let invalid_approval_security_context =
1499                                                super::helpers::invalid_presented_approval_security_context(&action);
1500                                            let combined_security_context =
1501                                                super::helpers::merge_transport_security_context(
1502                                                    registry_runtime_security_context.as_ref(),
1503                                                    Some(&invalid_approval_security_context),
1504                                                );
1505                                            let effective_security_context =
1506                                                combined_security_context
1507                                                    .as_ref()
1508                                                    .unwrap_or(&invalid_approval_security_context);
1509                                            let envelope =
1510                                                build_secondary_acis_envelope_with_security_context(
1511                                                    &action,
1512                                                    &verdict,
1513                                                    DecisionOrigin::PolicyEngine,
1514                                                    "websocket",
1515                                                    Some(&session_id),
1516                                                    Some(effective_security_context),
1517                                                );
1518                                            let _ = state
1519                                                .audit
1520                                                .log_entry_with_acis(
1521                                                    &action,
1522                                                    &verdict,
1523                                                    json!({
1524                                                        "source": "ws_proxy",
1525                                                        "session": session_id,
1526                                                        "transport": "websocket",
1527                                                        "registry": "untrusted_tool",
1528                                                        "event": "presented_approval_replay_denied",
1529                                                        "approval_id": presented_approval_id,
1530                                                    }),
1531                                                    envelope,
1532                                                )
1533                                                .await;
1534                                            let error = make_ws_error_response(
1535                                                Some(id),
1536                                                -32001,
1537                                                "Denied by policy",
1538                                            );
1539                                            let mut sink = client_sink.lock().await;
1540                                            let _ = sink.send(Message::Text(error.into())).await;
1541                                            continue;
1542                                        }
1543                                    }
1544                                }
1545                                vellaveto_mcp::tool_registry::TrustLevel::Trusted => {
1546                                    // Trusted — proceed to engine evaluation
1547                                }
1548                            }
1549                        }
1550
1551                        // SECURITY (FIND-R130-002): Combine context read, evaluation,
1552                        // and session update into a single block holding the DashMap
1553                        // shard lock. Without this, concurrent WS connections sharing
1554                        // a session can bypass max_calls_in_window by racing: both
1555                        // clone the same stale call_counts, both pass evaluation, both
1556                        // increment. Matches HTTP handler R19-TOCTOU pattern
1557                        // (handlers.rs:725-789).
1558
1559                        // Pre-compute security context BEFORE acquiring DashMap write lock
1560                        // to avoid deadlock — build_ws_runtime_security_context internally
1561                        // calls sessions.get()/get_mut() which would re-enter the lock.
1562                        let pre_eval_ctx = state
1563                            .sessions
1564                            .get(&session_id)
1565                            .map(|session| EvaluationContext {
1566                                timestamp: None,
1567                                agent_id: session.oauth_subject.clone(),
1568                                agent_identity: session.agent_identity.clone(),
1569                                call_chain: session.current_call_chain.clone(),
1570                                ..EvaluationContext::default()
1571                            })
1572                            .unwrap_or_else(|| EvaluationContext {
1573                                agent_id: oauth_claims.as_ref().map(|claims| claims.sub.clone()),
1574                                ..EvaluationContext::default()
1575                            });
1576                        let security_context = build_ws_runtime_security_context(
1577                            &parsed,
1578                            &action,
1579                            &handshake_headers,
1580                            super::helpers::TransportSecurityInputs {
1581                                oauth_evidence: oauth_claims.as_ref(),
1582                                eval_ctx: Some(&pre_eval_ctx),
1583                                sessions: &state.sessions,
1584                                session_id: Some(&session_id),
1585                                trusted_request_signers: &state.trusted_request_signers,
1586                                detached_signature_freshness: state.detached_signature_freshness,
1587                            },
1588                        );
1589
1590                        // TOCTOU-safe evaluation + session update (write lock)
1591                        let (mediation_result, ctx) = if let Some(mut session) =
1592                            state.sessions.get_mut(&session_id)
1593                        {
1594                            let ctx = EvaluationContext {
1595                                timestamp: None,
1596                                agent_id: session.oauth_subject.clone(),
1597                                agent_identity: session.agent_identity.clone(),
1598                                call_counts: session.call_counts.clone(),
1599                                previous_actions: session.action_history.iter().cloned().collect(),
1600                                call_chain: session.current_call_chain.clone(),
1601                                tenant_id: None,
1602                                verification_tier: None,
1603                                capability_token: None,
1604                                session_state: None,
1605                            };
1606                            let result = mediate_with_security_context(
1607                                &uuid::Uuid::new_v4().to_string().replace('-', ""),
1608                                &action,
1609                                &state.engine,
1610                                Some(&ctx),
1611                                security_context.as_ref(),
1612                                "websocket",
1613                                &state.mediation_config,
1614                                Some(&session_id),
1615                                None,
1616                            );
1617
1618                            // Atomically update session on Allow while still holding
1619                            // the shard lock — prevents TOCTOU bypass of call limits.
1620                            if matches!(result.verdict, Verdict::Allow) {
1621                                session.touch();
1622                                use crate::proxy::call_chain::{
1623                                    MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
1624                                };
1625                                if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
1626                                    || session.call_counts.contains_key(tool_name)
1627                                {
1628                                    let count = session
1629                                        .call_counts
1630                                        .entry(tool_name.to_string())
1631                                        .or_insert(0);
1632                                    *count = count.saturating_add(1);
1633                                }
1634                                if session.action_history.len() >= MAX_ACTION_HISTORY {
1635                                    session.action_history.pop_front();
1636                                }
1637                                session.action_history.push_back(tool_name.to_string());
1638                            }
1639
1640                            (result, ctx)
1641                        } else {
1642                            // No session — evaluate without context (fail-closed)
1643                            let ctx = EvaluationContext::default();
1644                            let result = mediate_with_security_context(
1645                                &uuid::Uuid::new_v4().to_string().replace('-', ""),
1646                                &action,
1647                                &state.engine,
1648                                None,
1649                                security_context.as_ref(),
1650                                "websocket",
1651                                &state.mediation_config,
1652                                Some(&session_id),
1653                                None,
1654                            );
1655                            (result, ctx)
1656                        };
1657
1658                        let mut final_origin = mediation_result.origin;
1659                        let mut acis_envelope = mediation_result.envelope.clone();
1660                        let mut refresh_envelope = false;
1661                        let verdict = match mediation_result.verdict {
1662                            Verdict::RequireApproval { reason } => {
1663                                if matched_approval_id.is_some() {
1664                                    final_origin = DecisionOrigin::PolicyEngine;
1665                                    refresh_envelope = true;
1666                                    Verdict::Allow
1667                                } else {
1668                                    match super::helpers::presented_approval_matches_action(
1669                                        &state,
1670                                        &session_id,
1671                                        presented_approval_id.as_deref(),
1672                                        &action,
1673                                    )
1674                                    .await
1675                                    {
1676                                        Ok(Some(approval_id)) => {
1677                                            matched_approval_id = Some(approval_id);
1678                                            final_origin = DecisionOrigin::PolicyEngine;
1679                                            refresh_envelope = true;
1680                                            Verdict::Allow
1681                                        }
1682                                        Ok(None) => Verdict::RequireApproval { reason },
1683                                        Err(()) => Verdict::Deny {
1684                                            reason: {
1685                                                final_origin = DecisionOrigin::ApprovalGate;
1686                                                refresh_envelope = true;
1687                                                INVALID_PRESENTED_APPROVAL_REASON.to_string()
1688                                            },
1689                                        },
1690                                    }
1691                                }
1692                            }
1693                            other => other,
1694                        };
1695                        if refresh_envelope {
1696                            acis_envelope = refresh_ws_acis_envelope(
1697                                &acis_envelope,
1698                                &action,
1699                                &verdict,
1700                                final_origin,
1701                                &session_id,
1702                                &ctx,
1703                                security_context.as_ref(),
1704                            );
1705                        }
1706
1707                        match verdict {
1708                            Verdict::Allow => {
1709                                // Phase 21: ABAC refinement — only runs when ABAC engine is configured
1710                                if let Some(ref abac) = state.abac_engine {
1711                                    let principal_id =
1712                                        ctx.agent_id.as_deref().unwrap_or("anonymous");
1713                                    let principal_type = ctx.principal_type();
1714                                    let session_risk = state
1715                                        .sessions
1716                                        .get_mut(&session_id)
1717                                        .and_then(|s| s.risk_score.clone());
1718                                    let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
1719                                        eval_ctx: &ctx,
1720                                        principal_type,
1721                                        principal_id,
1722                                        risk_score: session_risk.as_ref(),
1723                                    };
1724                                    match abac.evaluate(&action, &abac_ctx) {
1725                                        vellaveto_engine::abac::AbacDecision::Deny {
1726                                            policy_id,
1727                                            reason,
1728                                        } => {
1729                                            let deny_verdict = Verdict::Deny {
1730                                                reason: reason.clone(),
1731                                            };
1732                                            let abac_security_context =
1733                                                super::helpers::abac_deny_security_context(&action);
1734                                            let envelope =
1735                                                build_secondary_acis_envelope_with_security_context(
1736                                                    &action,
1737                                                    &deny_verdict,
1738                                                    DecisionOrigin::PolicyEngine,
1739                                                    "websocket",
1740                                                    Some(&session_id),
1741                                                    Some(&abac_security_context),
1742                                                );
1743                                            if let Err(e) = state
1744                                                .audit
1745                                                .log_entry_with_acis(
1746                                                    &action,
1747                                                    &deny_verdict,
1748                                                    json!({
1749                                                        "source": "ws_proxy",
1750                                                        "session": session_id,
1751                                                        "transport": "websocket",
1752                                                        "event": "abac_deny",
1753                                                        "abac_policy": policy_id,
1754                                                    }),
1755                                                    envelope,
1756                                                )
1757                                                .await
1758                                            {
1759                                                tracing::warn!(
1760                                                    "Failed to audit WS ABAC deny: {}",
1761                                                    e
1762                                                );
1763                                            }
1764                                            // SECURITY (FIND-R46-012): Generic message to client;
1765                                            // detailed reason (ABAC policy_id, reason) is in
1766                                            // the audit log only.
1767                                            let error_resp = make_ws_error_response(
1768                                                Some(id),
1769                                                -32001,
1770                                                "Denied by policy",
1771                                            );
1772                                            let mut sink = client_sink.lock().await;
1773                                            let _ =
1774                                                sink.send(Message::Text(error_resp.into())).await;
1775                                            continue;
1776                                        }
1777                                        vellaveto_engine::abac::AbacDecision::Allow {
1778                                            policy_id,
1779                                        } => {
1780                                            if let Some(ref la) = state.least_agency {
1781                                                la.record_usage(
1782                                                    principal_id,
1783                                                    &session_id,
1784                                                    &policy_id,
1785                                                    tool_name,
1786                                                    &action.function,
1787                                                );
1788                                            }
1789                                        }
1790                                        vellaveto_engine::abac::AbacDecision::NoMatch => {
1791                                            // Fall through — existing Allow stands
1792                                        }
1793                                        #[allow(unreachable_patterns)]
1794                                        // AbacDecision is #[non_exhaustive]
1795                                        _ => {
1796                                            // SECURITY (FIND-R74-002): Future variants — fail-closed (deny).
1797                                            // Must send deny and continue, not fall through to Allow path.
1798                                            tracing::warn!(
1799                                                "Unknown AbacDecision variant — fail-closed"
1800                                            );
1801                                            let error_resp = make_ws_error_response(
1802                                                Some(id),
1803                                                -32001,
1804                                                "Denied by policy",
1805                                            );
1806                                            let mut sink = client_sink.lock().await;
1807                                            let _ =
1808                                                sink.send(Message::Text(error_resp.into())).await;
1809                                            continue;
1810                                        }
1811                                    }
1812                                }
1813
1814                                // SECURITY (FIND-R46-013): Record tool call in registry on Allow
1815                                if let Some(ref registry) = state.tool_registry {
1816                                    registry.record_call(tool_name).await;
1817                                }
1818
1819                                // NOTE: Session touch + call_counts/action_history
1820                                // update already performed inside the TOCTOU-safe
1821                                // block above (FIND-R130-002). No separate update here.
1822
1823                                if super::helpers::consume_presented_approval(
1824                                    &state,
1825                                    &session_id,
1826                                    matched_approval_id.as_deref(),
1827                                    &action,
1828                                    security_context.as_ref(),
1829                                )
1830                                .await
1831                                .is_err()
1832                                {
1833                                    let deny_verdict = Verdict::Deny {
1834                                        reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
1835                                    };
1836                                    let invalid_approval_security_context =
1837                                        super::helpers::invalid_presented_approval_security_context(
1838                                            &action,
1839                                        );
1840                                    let combined_security_context =
1841                                        super::helpers::merge_transport_security_context(
1842                                            security_context.as_ref(),
1843                                            Some(&invalid_approval_security_context),
1844                                        );
1845                                    let effective_security_context = combined_security_context
1846                                        .as_ref()
1847                                        .unwrap_or(&invalid_approval_security_context);
1848                                    let envelope =
1849                                        build_secondary_acis_envelope_with_security_context(
1850                                            &action,
1851                                            &deny_verdict,
1852                                            DecisionOrigin::ApprovalGate,
1853                                            "websocket",
1854                                            Some(&session_id),
1855                                            Some(effective_security_context),
1856                                        );
1857                                    let mut audit_metadata = json!({
1858                                        "source": "ws_proxy",
1859                                        "session": session_id,
1860                                        "transport": "websocket",
1861                                        "event": "presented_approval_replay_denied",
1862                                        "approval_id": matched_approval_id,
1863                                    });
1864                                    if let Some(registry) = matched_approval_registry {
1865                                        audit_metadata["registry"] = json!(registry);
1866                                    }
1867                                    let _ = state
1868                                        .audit
1869                                        .log_entry_with_acis(
1870                                            &action,
1871                                            &deny_verdict,
1872                                            audit_metadata,
1873                                            envelope,
1874                                        )
1875                                        .await;
1876                                    let error_resp = make_ws_error_response(
1877                                        Some(id),
1878                                        -32001,
1879                                        "Denied by policy",
1880                                    );
1881                                    let mut sink = client_sink.lock().await;
1882                                    let _ = sink.send(Message::Text(error_resp.into())).await;
1883                                    continue;
1884                                }
1885
1886                                if let Err(e) = state
1887                                    .audit
1888                                    .log_entry_with_acis(
1889                                        &action,
1890                                        &Verdict::Allow,
1891                                        json!({
1892                                            "source": "ws_proxy",
1893                                            "session": session_id,
1894                                            "transport": "websocket",
1895                                        }),
1896                                        acis_envelope,
1897                                    )
1898                                    .await
1899                                {
1900                                    tracing::error!(
1901                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1902                                        e
1903                                    );
1904                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1905                                    // No unaudited security decisions can occur.
1906                                    if state.audit_strict_mode {
1907                                        let error = make_ws_error_response(
1908                                            Some(id),
1909                                            -32000,
1910                                            "Audit logging failed — request denied (strict audit mode)",
1911                                        );
1912                                        let mut sink = client_sink.lock().await;
1913                                        let _ = sink.send(Message::Text(error.into())).await;
1914                                        continue;
1915                                    }
1916                                }
1917
1918                                // Canonicalize and forward
1919                                let forward_text = if state.canonicalize {
1920                                    match serde_json::to_string(&parsed) {
1921                                        Ok(canonical) => canonical,
1922                                        Err(e) => {
1923                                            tracing::error!(
1924                                                "SECURITY: WS canonicalization failed: {}",
1925                                                e
1926                                            );
1927                                            let error_resp = make_ws_error_response(
1928                                                Some(id),
1929                                                -32603,
1930                                                "Internal error",
1931                                            );
1932                                            let mut sink = client_sink.lock().await;
1933                                            let _ =
1934                                                sink.send(Message::Text(error_resp.into())).await;
1935                                            continue;
1936                                        }
1937                                    }
1938                                } else {
1939                                    text.to_string()
1940                                };
1941
1942                                // Track request→response mapping for output-schema
1943                                // enforcement when upstream omits result._meta.tool.
1944                                track_pending_tool_call(
1945                                    &state.sessions,
1946                                    &session_id,
1947                                    id,
1948                                    tool_name,
1949                                );
1950
1951                                let mut sink = upstream_sink.lock().await;
1952                                if let Err(e) = sink
1953                                    .send(tokio_tungstenite::tungstenite::Message::Text(
1954                                        forward_text.into(),
1955                                    ))
1956                                    .await
1957                                {
1958                                    tracing::error!(
1959                                        session_id = %session_id,
1960                                        "Failed to forward to upstream: {}",
1961                                        e
1962                                    );
1963                                    break;
1964                                }
1965                            }
1966                            Verdict::Deny { ref reason } => {
1967                                // Audit the denial with detailed reason
1968                                let mut extra = json!({
1969                                    "source": "ws_proxy",
1970                                    "session": session_id,
1971                                    "transport": "websocket",
1972                                    "tool": tool_name,
1973                                });
1974                                if reason == INVALID_PRESENTED_APPROVAL_REASON
1975                                    && presented_approval_id.is_some()
1976                                {
1977                                    extra["event"] = json!("presented_approval_replay_denied");
1978                                    extra["approval_id"] = json!(presented_approval_id);
1979                                }
1980                                if let Err(e) = state
1981                                    .audit
1982                                    .log_entry_with_acis(&action, &verdict, extra, acis_envelope)
1983                                    .await
1984                                {
1985                                    tracing::error!(
1986                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1987                                        e
1988                                    );
1989                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1990                                    if state.audit_strict_mode {
1991                                        let error = make_ws_error_response(
1992                                            Some(id),
1993                                            -32000,
1994                                            "Audit logging failed — request denied (strict audit mode)",
1995                                        );
1996                                        let mut sink = client_sink.lock().await;
1997                                        let _ = sink.send(Message::Text(error.into())).await;
1998                                        continue;
1999                                    }
2000                                }
2001
2002                                // SECURITY (FIND-R46-012): Generic message to client.
2003                                // Detailed reason is in the audit log only.
2004                                let _ = reason; // used in audit above
2005                                let error =
2006                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2007                                let mut sink = client_sink.lock().await;
2008                                let _ = sink.send(Message::Text(error.into())).await;
2009                            }
2010                            Verdict::RequireApproval { ref reason, .. } => {
2011                                let approval_verdict = Verdict::RequireApproval {
2012                                    reason: reason.clone(),
2013                                };
2014                                let containment_context =
2015                                    super::helpers::approval_containment_context_from_envelope(
2016                                        &acis_envelope,
2017                                        reason,
2018                                    );
2019                                if let Err(e) = state
2020                                    .audit
2021                                    .log_entry_with_acis(
2022                                        &action,
2023                                        &approval_verdict,
2024                                        json!({
2025                                            "source": "ws_proxy",
2026                                            "session": session_id,
2027                                            "transport": "websocket",
2028                                        }),
2029                                        acis_envelope,
2030                                    )
2031                                    .await
2032                                {
2033                                    tracing::error!(
2034                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2035                                        e
2036                                    );
2037                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2038                                    if state.audit_strict_mode {
2039                                        let error = make_ws_error_response(
2040                                            Some(id),
2041                                            -32000,
2042                                            "Audit logging failed — request denied (strict audit mode)",
2043                                        );
2044                                        let mut sink = client_sink.lock().await;
2045                                        let _ = sink.send(Message::Text(error.into())).await;
2046                                        continue;
2047                                    }
2048                                }
2049                                let approval_reason = "Approval required";
2050                                let approval_id =
2051                                    super::helpers::create_pending_approval_with_context(
2052                                        &state,
2053                                        &session_id,
2054                                        &action,
2055                                        reason,
2056                                        containment_context,
2057                                    )
2058                                    .await;
2059                                let error = make_ws_error_response_with_data(
2060                                    Some(id),
2061                                    -32001,
2062                                    approval_reason,
2063                                    Some(json!({
2064                                        "verdict": "require_approval",
2065                                        "reason": approval_reason,
2066                                        "approval_id": approval_id,
2067                                    })),
2068                                );
2069                                let mut sink = client_sink.lock().await;
2070                                let _ = sink.send(Message::Text(error.into())).await;
2071                            }
2072                            // Fail-closed: unknown Verdict variants produce Deny
2073                            _ => {
2074                                let error =
2075                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2076                                let mut sink = client_sink.lock().await;
2077                                let _ = sink.send(Message::Text(error.into())).await;
2078                            }
2079                        }
2080                    }
2081                    MessageType::ResourceRead { ref id, ref uri } => {
2082                        // SECURITY (FIND-R74-007): Check for memory poisoning in resource URI.
2083                        // ResourceRead is a likely exfiltration vector: a poisoned tool response
2084                        // says "read this file" and the agent issues resources/read for that URI.
2085                        // Parity with HTTP handler (handlers.rs:1472).
2086                        {
2087                            let poisoning_detected = state
2088                                .sessions
2089                                .get_mut(&session_id)
2090                                .and_then(|session| {
2091                                    let uri_params = json!({"uri": uri});
2092                                    let matches =
2093                                        session.memory_tracker.check_parameters(&uri_params);
2094                                    if !matches.is_empty() {
2095                                        for m in &matches {
2096                                            tracing::warn!(
2097                                                "SECURITY: Memory poisoning detected in WS resources/read (session {}): \
2098                                                 param '{}' contains replayed data (fingerprint: {})",
2099                                                session_id,
2100                                                m.param_location,
2101                                                m.fingerprint
2102                                            );
2103                                        }
2104                                        Some(matches.len())
2105                                    } else {
2106                                        None
2107                                    }
2108                                });
2109                            if let Some(match_count) = poisoning_detected {
2110                                let poison_action = extractor::extract_resource_action(uri);
2111                                let deny_reason = format!(
2112                                    "Memory poisoning detected: {match_count} replayed data fragment(s) in resources/read"
2113                                );
2114                                let resource_poison_verdict = Verdict::Deny {
2115                                    reason: deny_reason.clone(),
2116                                };
2117                                let poisoning_security_context =
2118                                    super::helpers::memory_poisoning_security_context(
2119                                        &json!({ "uri": uri }),
2120                                        "memory_poisoning",
2121                                    );
2122                                let envelope = build_secondary_acis_envelope_with_security_context(
2123                                    &poison_action,
2124                                    &resource_poison_verdict,
2125                                    DecisionOrigin::MemoryPoisoning,
2126                                    "websocket",
2127                                    Some(&session_id),
2128                                    Some(&poisoning_security_context),
2129                                );
2130                                if let Err(e) = state
2131                                    .audit
2132                                    .log_entry_with_acis(
2133                                        &poison_action,
2134                                        &resource_poison_verdict,
2135                                        json!({
2136                                            "source": "ws_proxy",
2137                                            "session": session_id,
2138                                            "transport": "websocket",
2139                                            "event": "memory_poisoning_detected",
2140                                            "matches": match_count,
2141                                            "uri": uri,
2142                                        }),
2143                                        envelope,
2144                                    )
2145                                    .await
2146                                {
2147                                    tracing::warn!(
2148                                        "Failed to audit WS resource memory poisoning: {}",
2149                                        e
2150                                    );
2151                                }
2152                                let error = make_ws_error_response(
2153                                    Some(id),
2154                                    -32001,
2155                                    "Request blocked: security policy violation",
2156                                );
2157                                let mut sink = client_sink.lock().await;
2158                                let _ = sink.send(Message::Text(error.into())).await;
2159                                continue;
2160                            }
2161                        }
2162
2163                        // SECURITY (FIND-R115-041): Rug-pull detection for resource URIs.
2164                        // If the upstream server was flagged (annotations changed since initial
2165                        // tools/list), block resource reads from that server.
2166                        // Parity with HTTP handler (handlers.rs:1555).
2167                        // SECURITY (R240-PROXY-1): Fall back to global registry on session miss.
2168                        {
2169                            let is_flagged = state
2170                                .sessions
2171                                .get_mut(&session_id)
2172                                .map(|s| s.flagged_tools.contains(uri.as_str()))
2173                                .unwrap_or_else(|| {
2174                                    state.sessions.is_tool_globally_flagged(uri.as_str())
2175                                });
2176                            if is_flagged {
2177                                let action = extractor::extract_resource_action(uri);
2178                                let verdict = Verdict::Deny {
2179                                    reason: format!(
2180                                        "Resource '{uri}' blocked: server flagged by rug-pull detection"
2181                                    ),
2182                                };
2183                                let rug_pull_security_context =
2184                                    super::helpers::rug_pull_security_context(&action);
2185                                let envelope = build_secondary_acis_envelope_with_security_context(
2186                                    &action,
2187                                    &verdict,
2188                                    DecisionOrigin::CapabilityEnforcement,
2189                                    "websocket",
2190                                    Some(&session_id),
2191                                    Some(&rug_pull_security_context),
2192                                );
2193                                if let Err(e) = state
2194                                    .audit
2195                                    .log_entry_with_acis(
2196                                        &action,
2197                                        &verdict,
2198                                        json!({
2199                                            "source": "ws_proxy",
2200                                            "session": session_id,
2201                                            "transport": "websocket",
2202                                            "event": "rug_pull_resource_blocked",
2203                                            "uri": uri,
2204                                        }),
2205                                        envelope,
2206                                    )
2207                                    .await
2208                                {
2209                                    tracing::warn!(
2210                                        "Failed to audit WS resource rug-pull block: {}",
2211                                        e
2212                                    );
2213                                }
2214                                let error =
2215                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2216                                let mut sink = client_sink.lock().await;
2217                                let _ = sink.send(Message::Text(error.into())).await;
2218                                continue;
2219                            }
2220                        }
2221
2222                        // Build action for resource read
2223                        let mut action = extractor::extract_resource_action(uri);
2224                        let mut matched_approval_id: Option<String> = None;
2225
2226                        // SECURITY (FIND-R75-002): DNS resolution for resource reads.
2227                        // Parity with HTTP handler (handlers.rs:1543).
2228                        if state.engine.has_ip_rules() {
2229                            super::helpers::resolve_domains(&mut action).await;
2230                        }
2231
2232                        // SECURITY (FIND-R116-004): DLP scan on resource URI.
2233                        // Parity with HTTP handler (handlers.rs:1598).
2234                        {
2235                            let uri_params = json!({"uri": uri});
2236                            let dlp_findings = scan_parameters_for_secrets(&uri_params);
2237                            if !dlp_findings.is_empty() {
2238                                for finding in &dlp_findings {
2239                                    record_dlp_finding(&finding.pattern_name);
2240                                }
2241                                tracing::warn!(
2242                                    "SECURITY: Secret detected in WS resource URI! Session: {}, URI: [redacted]",
2243                                    session_id,
2244                                );
2245                                let audit_verdict = Verdict::Deny {
2246                                    reason: "DLP blocked: secret detected in resource URI"
2247                                        .to_string(),
2248                                };
2249                                let parameter_security_context =
2250                                    super::helpers::parameter_dlp_security_context(
2251                                        &uri_params,
2252                                        true,
2253                                        "resource_uri_dlp",
2254                                    );
2255                                let envelope = build_secondary_acis_envelope_with_security_context(
2256                                    &action,
2257                                    &audit_verdict,
2258                                    DecisionOrigin::Dlp,
2259                                    "websocket",
2260                                    Some(&session_id),
2261                                    Some(&parameter_security_context),
2262                                );
2263                                if let Err(e) = state.audit.log_entry_with_acis(
2264                                    &action, &audit_verdict,
2265                                    json!({
2266                                        "source": "ws_proxy", "session": session_id,
2267                                        "transport": "websocket", "event": "resource_uri_dlp_alert",
2268                                    }),
2269                                    envelope,
2270                                ).await {
2271                                    tracing::warn!("Failed to audit WS resource URI DLP: {}", e);
2272                                }
2273                                let error = make_ws_error_response(
2274                                    Some(id),
2275                                    -32001,
2276                                    "Request blocked: security policy violation",
2277                                );
2278                                let mut sink = client_sink.lock().await;
2279                                let _ = sink.send(Message::Text(error.into())).await;
2280                                continue;
2281                            }
2282                        }
2283
2284                        // SECURITY (FIND-R115-042): Circuit breaker check for resource reads.
2285                        // Parity with HTTP handler (handlers.rs:1668) — prevent resource reads
2286                        // from hammering a failing upstream server.
2287                        if let Some(ref circuit_breaker) = state.circuit_breaker {
2288                            if let Err(reason) = circuit_breaker.can_proceed(uri) {
2289                                tracing::warn!(
2290                                    "SECURITY: WS circuit breaker open for resource '{}' in session {}: {}",
2291                                    uri,
2292                                    session_id,
2293                                    reason
2294                                );
2295                                let verdict = Verdict::Deny {
2296                                    reason: format!("Circuit breaker open: {reason}"),
2297                                };
2298                                // SECURITY (R251-ACIS-1): Use CircuitBreaker origin, not RateLimiter.
2299                                let circuit_breaker_security_context =
2300                                    super::helpers::circuit_breaker_security_context(&action);
2301                                let envelope = build_secondary_acis_envelope_with_security_context(
2302                                    &action,
2303                                    &verdict,
2304                                    DecisionOrigin::CircuitBreaker,
2305                                    "websocket",
2306                                    Some(&session_id),
2307                                    Some(&circuit_breaker_security_context),
2308                                );
2309                                if let Err(e) = state
2310                                    .audit
2311                                    .log_entry_with_acis(
2312                                        &action,
2313                                        &verdict,
2314                                        json!({
2315                                            "source": "ws_proxy",
2316                                            "session": session_id,
2317                                            "transport": "websocket",
2318                                            "event": "circuit_breaker_rejected",
2319                                            "uri": uri,
2320                                        }),
2321                                        envelope,
2322                                    )
2323                                    .await
2324                                {
2325                                    tracing::warn!(
2326                                        "Failed to audit WS resource circuit breaker rejection: {}",
2327                                        e
2328                                    );
2329                                }
2330                                let error = make_ws_error_response(
2331                                    Some(id),
2332                                    -32001,
2333                                    "Service temporarily unavailable",
2334                                );
2335                                let mut sink = client_sink.lock().await;
2336                                let _ = sink.send(Message::Text(error.into())).await;
2337                                continue;
2338                            }
2339                        }
2340
2341                        // SECURITY (FIND-R130-002): TOCTOU-safe context+eval+update
2342                        // for resource reads. Matches ToolCall fix above and HTTP
2343                        // handler FIND-R112-002 pattern (handlers.rs:1711-1774).
2344
2345                        // Pre-compute security context BEFORE acquiring DashMap write lock
2346                        // to avoid deadlock — build_ws_runtime_security_context internally
2347                        // calls sessions.get()/get_mut() which would re-enter the lock.
2348                        let pre_eval_ctx = state
2349                            .sessions
2350                            .get(&session_id)
2351                            .map(|session| EvaluationContext {
2352                                timestamp: None,
2353                                agent_id: session.oauth_subject.clone(),
2354                                agent_identity: session.agent_identity.clone(),
2355                                call_chain: session.current_call_chain.clone(),
2356                                ..EvaluationContext::default()
2357                            })
2358                            .unwrap_or_else(|| EvaluationContext {
2359                                agent_id: oauth_claims.as_ref().map(|claims| claims.sub.clone()),
2360                                ..EvaluationContext::default()
2361                            });
2362                        let security_context = build_ws_runtime_security_context(
2363                            &parsed,
2364                            &action,
2365                            &handshake_headers,
2366                            super::helpers::TransportSecurityInputs {
2367                                oauth_evidence: oauth_claims.as_ref(),
2368                                eval_ctx: Some(&pre_eval_ctx),
2369                                sessions: &state.sessions,
2370                                session_id: Some(&session_id),
2371                                trusted_request_signers: &state.trusted_request_signers,
2372                                detached_signature_freshness: state.detached_signature_freshness,
2373                            },
2374                        );
2375
2376                        // TOCTOU-safe evaluation + session update (write lock)
2377                        let (mediation_result, ctx) = if let Some(mut session) =
2378                            state.sessions.get_mut(&session_id)
2379                        {
2380                            let ctx = EvaluationContext {
2381                                timestamp: None,
2382                                agent_id: session.oauth_subject.clone(),
2383                                agent_identity: session.agent_identity.clone(),
2384                                call_counts: session.call_counts.clone(),
2385                                previous_actions: session.action_history.iter().cloned().collect(),
2386                                call_chain: session.current_call_chain.clone(),
2387                                tenant_id: None,
2388                                verification_tier: None,
2389                                capability_token: None,
2390                                session_state: None,
2391                            };
2392                            let result = mediate_with_security_context(
2393                                &uuid::Uuid::new_v4().to_string().replace('-', ""),
2394                                &action,
2395                                &state.engine,
2396                                Some(&ctx),
2397                                security_context.as_ref(),
2398                                "websocket",
2399                                &state.mediation_config,
2400                                Some(&session_id),
2401                                None,
2402                            );
2403
2404                            // Atomically update session on Allow
2405                            if matches!(result.verdict, Verdict::Allow) {
2406                                session.touch();
2407                                use crate::proxy::call_chain::{
2408                                    MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
2409                                };
2410                                let resource_key = format!(
2411                                    "resources/read:{}",
2412                                    uri.chars().take(128).collect::<String>()
2413                                );
2414                                if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
2415                                    || session.call_counts.contains_key(&resource_key)
2416                                {
2417                                    let count =
2418                                        session.call_counts.entry(resource_key).or_insert(0);
2419                                    *count = count.saturating_add(1);
2420                                }
2421                                if session.action_history.len() >= MAX_ACTION_HISTORY {
2422                                    session.action_history.pop_front();
2423                                }
2424                                session
2425                                    .action_history
2426                                    .push_back("resources/read".to_string());
2427                            }
2428
2429                            (result, ctx)
2430                        } else {
2431                            let ctx = EvaluationContext::default();
2432                            let result = mediate_with_security_context(
2433                                &uuid::Uuid::new_v4().to_string().replace('-', ""),
2434                                &action,
2435                                &state.engine,
2436                                None,
2437                                security_context.as_ref(),
2438                                "websocket",
2439                                &state.mediation_config,
2440                                Some(&session_id),
2441                                None,
2442                            );
2443                            (result, ctx)
2444                        };
2445
2446                        let mut final_origin = mediation_result.origin;
2447                        let mut acis_envelope = mediation_result.envelope.clone();
2448                        let mut refresh_envelope = false;
2449                        let verdict = match mediation_result.verdict {
2450                            Verdict::RequireApproval { reason } => {
2451                                match super::helpers::presented_approval_matches_action(
2452                                    &state,
2453                                    &session_id,
2454                                    presented_approval_id.as_deref(),
2455                                    &action,
2456                                )
2457                                .await
2458                                {
2459                                    Ok(Some(approval_id)) => {
2460                                        matched_approval_id = Some(approval_id);
2461                                        final_origin = DecisionOrigin::PolicyEngine;
2462                                        refresh_envelope = true;
2463                                        Verdict::Allow
2464                                    }
2465                                    Ok(None) => Verdict::RequireApproval { reason },
2466                                    Err(()) => {
2467                                        final_origin = DecisionOrigin::ApprovalGate;
2468                                        refresh_envelope = true;
2469                                        Verdict::Deny {
2470                                            reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
2471                                        }
2472                                    }
2473                                }
2474                            }
2475                            other => other,
2476                        };
2477                        if refresh_envelope {
2478                            acis_envelope = refresh_ws_acis_envelope(
2479                                &acis_envelope,
2480                                &action,
2481                                &verdict,
2482                                final_origin,
2483                                &session_id,
2484                                &ctx,
2485                                security_context.as_ref(),
2486                            );
2487                        }
2488
2489                        match verdict {
2490                            Verdict::Allow => {
2491                                // SECURITY (FIND-R116-002): ABAC refinement for resource reads.
2492                                // Parity with HTTP handler (handlers.rs:1783) and gRPC (service.rs:972).
2493                                if let Some(ref abac) = state.abac_engine {
2494                                    let principal_id =
2495                                        ctx.agent_id.as_deref().unwrap_or("anonymous");
2496                                    let principal_type = ctx.principal_type();
2497                                    let session_risk = state
2498                                        .sessions
2499                                        .get_mut(&session_id)
2500                                        .and_then(|s| s.risk_score.clone());
2501                                    let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
2502                                        eval_ctx: &ctx,
2503                                        principal_type,
2504                                        principal_id,
2505                                        risk_score: session_risk.as_ref(),
2506                                    };
2507                                    match abac.evaluate(&action, &abac_ctx) {
2508                                        vellaveto_engine::abac::AbacDecision::Deny {
2509                                            policy_id,
2510                                            reason,
2511                                        } => {
2512                                            let deny_verdict = Verdict::Deny {
2513                                                reason: reason.clone(),
2514                                            };
2515                                            let abac_security_context =
2516                                                super::helpers::abac_deny_security_context(&action);
2517                                            let envelope =
2518                                                build_secondary_acis_envelope_with_security_context(
2519                                                    &action,
2520                                                    &deny_verdict,
2521                                                    DecisionOrigin::PolicyEngine,
2522                                                    "websocket",
2523                                                    Some(&session_id),
2524                                                    Some(&abac_security_context),
2525                                                );
2526                                            if let Err(e) = state
2527                                                .audit
2528                                                .log_entry_with_acis(
2529                                                    &action,
2530                                                    &deny_verdict,
2531                                                    json!({
2532                                                        "source": "ws_proxy",
2533                                                        "session": session_id,
2534                                                        "transport": "websocket",
2535                                                        "event": "abac_deny",
2536                                                        "abac_policy": policy_id,
2537                                                        "uri": uri,
2538                                                    }),
2539                                                    envelope,
2540                                                )
2541                                                .await
2542                                            {
2543                                                tracing::warn!(
2544                                                    "Failed to audit WS resource ABAC deny: {}",
2545                                                    e
2546                                                );
2547                                            }
2548                                            let error_resp = make_ws_error_response(
2549                                                Some(id),
2550                                                -32001,
2551                                                "Denied by policy",
2552                                            );
2553                                            let mut sink = client_sink.lock().await;
2554                                            let _ =
2555                                                sink.send(Message::Text(error_resp.into())).await;
2556                                            continue;
2557                                        }
2558                                        vellaveto_engine::abac::AbacDecision::Allow {
2559                                            policy_id,
2560                                        } => {
2561                                            // SECURITY (FIND-R192-002): record_usage parity.
2562                                            if let Some(ref la) = state.least_agency {
2563                                                la.record_usage(
2564                                                    principal_id,
2565                                                    &session_id,
2566                                                    &policy_id,
2567                                                    uri,
2568                                                    &action.function,
2569                                                );
2570                                            }
2571                                        }
2572                                        vellaveto_engine::abac::AbacDecision::NoMatch => {
2573                                            // Fall through — existing Allow stands
2574                                        }
2575                                        #[allow(unreachable_patterns)]
2576                                        _ => {
2577                                            tracing::warn!(
2578                                                "Unknown AbacDecision variant in WS resource_read — fail-closed"
2579                                            );
2580                                            let error_resp = make_ws_error_response(
2581                                                Some(id),
2582                                                -32001,
2583                                                "Denied by policy",
2584                                            );
2585                                            let mut sink = client_sink.lock().await;
2586                                            let _ =
2587                                                sink.send(Message::Text(error_resp.into())).await;
2588                                            continue;
2589                                        }
2590                                    }
2591                                }
2592
2593                                // NOTE: Session touch + call_counts/action_history
2594                                // update already performed inside the TOCTOU-safe
2595                                // block above (FIND-R130-002). No separate update here.
2596
2597                                if super::helpers::consume_presented_approval(
2598                                    &state,
2599                                    &session_id,
2600                                    matched_approval_id.as_deref(),
2601                                    &action,
2602                                    security_context.as_ref(),
2603                                )
2604                                .await
2605                                .is_err()
2606                                {
2607                                    let deny_verdict = Verdict::Deny {
2608                                        reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
2609                                    };
2610                                    let invalid_approval_security_context =
2611                                        super::helpers::invalid_presented_approval_security_context(
2612                                            &action,
2613                                        );
2614                                    let combined_security_context =
2615                                        super::helpers::merge_transport_security_context(
2616                                            security_context.as_ref(),
2617                                            Some(&invalid_approval_security_context),
2618                                        );
2619                                    let effective_security_context = combined_security_context
2620                                        .as_ref()
2621                                        .unwrap_or(&invalid_approval_security_context);
2622                                    let envelope =
2623                                        build_secondary_acis_envelope_with_security_context(
2624                                            &action,
2625                                            &deny_verdict,
2626                                            DecisionOrigin::ApprovalGate,
2627                                            "websocket",
2628                                            Some(&session_id),
2629                                            Some(effective_security_context),
2630                                        );
2631                                    let _ = state
2632                                        .audit
2633                                        .log_entry_with_acis(
2634                                            &action,
2635                                            &deny_verdict,
2636                                            json!({
2637                                                "source": "ws_proxy",
2638                                                "session": session_id,
2639                                                "transport": "websocket",
2640                                                "event": "presented_approval_replay_denied",
2641                                                "approval_id": matched_approval_id,
2642                                                "uri": uri,
2643                                            }),
2644                                            envelope,
2645                                        )
2646                                        .await;
2647                                    let error_resp = make_ws_error_response(
2648                                        Some(id),
2649                                        -32001,
2650                                        "Denied by policy",
2651                                    );
2652                                    let mut sink = client_sink.lock().await;
2653                                    let _ = sink.send(Message::Text(error_resp.into())).await;
2654                                    continue;
2655                                }
2656
2657                                if let Err(e) = state
2658                                    .audit
2659                                    .log_entry_with_acis(
2660                                        &action,
2661                                        &Verdict::Allow,
2662                                        json!({
2663                                            "source": "ws_proxy",
2664                                            "session": session_id,
2665                                            "transport": "websocket",
2666                                            "resource_uri": uri,
2667                                        }),
2668                                        acis_envelope,
2669                                    )
2670                                    .await
2671                                {
2672                                    tracing::error!(
2673                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2674                                        e
2675                                    );
2676                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2677                                    if state.audit_strict_mode {
2678                                        let error = make_ws_error_response(
2679                                            Some(id),
2680                                            -32000,
2681                                            "Audit logging failed — request denied (strict audit mode)",
2682                                        );
2683                                        let mut sink = client_sink.lock().await;
2684                                        let _ = sink.send(Message::Text(error.into())).await;
2685                                        continue;
2686                                    }
2687                                }
2688
2689                                // SECURITY (FIND-R46-011): Fail-closed on canonicalization
2690                                // failure. Do NOT fall back to original text.
2691                                let forward_text = if state.canonicalize {
2692                                    match serde_json::to_string(&parsed) {
2693                                        Ok(canonical) => canonical,
2694                                        Err(e) => {
2695                                            tracing::error!(
2696                                                "SECURITY: WS resource canonicalization failed: {}",
2697                                                e
2698                                            );
2699                                            let error_resp = make_ws_error_response(
2700                                                Some(id),
2701                                                -32603,
2702                                                "Internal error",
2703                                            );
2704                                            let mut sink = client_sink.lock().await;
2705                                            let _ =
2706                                                sink.send(Message::Text(error_resp.into())).await;
2707                                            continue;
2708                                        }
2709                                    }
2710                                } else {
2711                                    text.to_string()
2712                                };
2713                                let mut sink = upstream_sink.lock().await;
2714                                if let Err(e) = sink
2715                                    .send(tokio_tungstenite::tungstenite::Message::Text(
2716                                        forward_text.into(),
2717                                    ))
2718                                    .await
2719                                {
2720                                    tracing::error!("Failed to forward resource read: {}", e);
2721                                    break;
2722                                }
2723                            }
2724                            // SECURITY (FIND-R116-009): Separate handling for Deny vs RequireApproval
2725                            // with per-verdict audit logging. Parity with gRPC (service.rs:1051-1076).
2726                            Verdict::Deny { ref reason } => {
2727                                let mut audit_metadata = json!({
2728                                    "source": "ws_proxy",
2729                                    "session": session_id,
2730                                    "transport": "websocket",
2731                                    "resource_uri": uri,
2732                                });
2733                                if reason == INVALID_PRESENTED_APPROVAL_REASON
2734                                    && presented_approval_id.is_some()
2735                                {
2736                                    audit_metadata["event"] =
2737                                        json!("presented_approval_replay_denied");
2738                                    audit_metadata["approval_id"] = json!(presented_approval_id);
2739                                }
2740                                if let Err(e) = state
2741                                    .audit
2742                                    .log_entry_with_acis(
2743                                        &action,
2744                                        &Verdict::Deny {
2745                                            reason: reason.clone(),
2746                                        },
2747                                        audit_metadata,
2748                                        acis_envelope,
2749                                    )
2750                                    .await
2751                                {
2752                                    tracing::error!(
2753                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2754                                        e
2755                                    );
2756                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2757                                    if state.audit_strict_mode {
2758                                        let error = make_ws_error_response(
2759                                            Some(id),
2760                                            -32000,
2761                                            "Audit logging failed — request denied (strict audit mode)",
2762                                        );
2763                                        let mut sink = client_sink.lock().await;
2764                                        let _ = sink.send(Message::Text(error.into())).await;
2765                                        continue;
2766                                    }
2767                                }
2768                                let error =
2769                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2770                                let mut sink = client_sink.lock().await;
2771                                let _ = sink.send(Message::Text(error.into())).await;
2772                            }
2773                            Verdict::RequireApproval { ref reason, .. } => {
2774                                let approval_verdict = Verdict::RequireApproval {
2775                                    reason: reason.clone(),
2776                                };
2777                                let containment_context =
2778                                    super::helpers::approval_containment_context_from_envelope(
2779                                        &acis_envelope,
2780                                        reason,
2781                                    );
2782                                if let Err(e) = state
2783                                    .audit
2784                                    .log_entry_with_acis(
2785                                        &action,
2786                                        &approval_verdict,
2787                                        json!({
2788                                            "source": "ws_proxy",
2789                                            "session": session_id,
2790                                            "transport": "websocket",
2791                                            "resource_uri": uri,
2792                                            "event": "require_approval",
2793                                        }),
2794                                        acis_envelope,
2795                                    )
2796                                    .await
2797                                {
2798                                    tracing::error!(
2799                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2800                                        e
2801                                    );
2802                                    // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2803                                    if state.audit_strict_mode {
2804                                        let error = make_ws_error_response(
2805                                            Some(id),
2806                                            -32000,
2807                                            "Audit logging failed — request denied (strict audit mode)",
2808                                        );
2809                                        let mut sink = client_sink.lock().await;
2810                                        let _ = sink.send(Message::Text(error.into())).await;
2811                                        continue;
2812                                    }
2813                                }
2814                                let approval_id =
2815                                    super::helpers::create_pending_approval_with_context(
2816                                        &state,
2817                                        &session_id,
2818                                        &action,
2819                                        reason,
2820                                        containment_context,
2821                                    )
2822                                    .await;
2823                                let error = make_ws_error_response_with_data(
2824                                    Some(id),
2825                                    -32001,
2826                                    "Approval required",
2827                                    Some(json!({
2828                                        "verdict": "require_approval",
2829                                        "reason": reason,
2830                                        "approval_id": approval_id,
2831                                    })),
2832                                );
2833                                let mut sink = client_sink.lock().await;
2834                                let _ = sink.send(Message::Text(error.into())).await;
2835                            }
2836                            #[allow(unreachable_patterns)]
2837                            _ => {
2838                                // SECURITY: Future variants — fail-closed.
2839                                tracing::warn!(
2840                                    "Unknown Verdict variant in WS resource_read — fail-closed"
2841                                );
2842                                let error =
2843                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
2844                                let mut sink = client_sink.lock().await;
2845                                let _ = sink.send(Message::Text(error.into())).await;
2846                            }
2847                        }
2848                    }
2849                    MessageType::Batch => {
2850                        // Reject batches per MCP spec
2851                        let action = Action::new(
2852                            "vellaveto",
2853                            "batch_rejected",
2854                            json!({
2855                                "session": session_id,
2856                                "transport": "websocket",
2857                            }),
2858                        );
2859                        let batch_verdict = Verdict::Deny {
2860                            reason: "JSON-RPC batching not supported".to_string(),
2861                        };
2862                        let batch_security_context =
2863                            super::helpers::batch_rejection_security_context(&action);
2864                        let envelope = build_secondary_acis_envelope_with_security_context(
2865                            &action,
2866                            &batch_verdict,
2867                            DecisionOrigin::PolicyEngine,
2868                            "websocket",
2869                            Some(&session_id),
2870                            Some(&batch_security_context),
2871                        );
2872                        if let Err(e) = state
2873                            .audit
2874                            .log_entry_with_acis(
2875                                &action,
2876                                &batch_verdict,
2877                                json!({
2878                                    "source": "ws_proxy",
2879                                    "event": "batch_rejected",
2880                                }),
2881                                envelope,
2882                            )
2883                            .await
2884                        {
2885                            tracing::warn!("Failed to audit WS batch rejection: {}", e);
2886                        }
2887                        let error = json!({
2888                            "jsonrpc": "2.0",
2889                            "error": {
2890                                "code": -32600,
2891                                "message": "JSON-RPC batch requests are not supported"
2892                            },
2893                            "id": null
2894                        });
2895                        let error_text = serde_json::to_string(&error)
2896                            .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","error":{"code":-32600,"message":"Batch not supported"},"id":null}"#.to_string());
2897                        let mut sink = client_sink.lock().await;
2898                        let _ = sink.send(Message::Text(error_text.into())).await;
2899                    }
2900                    MessageType::Invalid { ref id, ref reason } => {
2901                        tracing::warn!(
2902                            "Invalid JSON-RPC request in WebSocket transport: {}",
2903                            reason
2904                        );
2905                        let error =
2906                            make_ws_error_response(Some(id), -32600, "Invalid JSON-RPC request");
2907                        let mut sink = client_sink.lock().await;
2908                        let _ = sink.send(Message::Text(error.into())).await;
2909                    }
2910                    MessageType::SamplingRequest { ref id } => {
2911                        // SECURITY (FIND-R74-006): Call inspect_sampling() for full
2912                        // verdict (enabled + model filter + tool output check + rate limit),
2913                        // matching HTTP handler parity (handlers.rs:1681).
2914                        let params = parsed.get("params").cloned().unwrap_or(json!({}));
2915                        // SECURITY (FIND-R125-001): Per-session sampling rate limit
2916                        // parity with elicitation. Atomically read + increment.
2917                        let sampling_verdict = {
2918                            let mut session_ref = state.sessions.get_mut(&session_id);
2919                            let current_count =
2920                                session_ref.as_ref().map(|s| s.sampling_count).unwrap_or(0);
2921                            let verdict = vellaveto_mcp::elicitation::inspect_sampling(
2922                                &params,
2923                                &state.sampling_config,
2924                                current_count,
2925                            );
2926                            if matches!(verdict, vellaveto_mcp::elicitation::SamplingVerdict::Allow)
2927                            {
2928                                if let Some(ref mut s) = session_ref {
2929                                    s.sampling_count = s.sampling_count.saturating_add(1);
2930                                }
2931                            }
2932                            verdict
2933                        };
2934                        match sampling_verdict {
2935                            vellaveto_mcp::elicitation::SamplingVerdict::Allow => {
2936                                // Forward allowed sampling request
2937                                // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
2938                                // Falling back to original text would create a TOCTOU gap.
2939                                let forward_text = if state.canonicalize {
2940                                    match serde_json::to_string(&parsed) {
2941                                        Ok(canonical) => canonical,
2942                                        Err(e) => {
2943                                            tracing::error!(
2944                                                "SECURITY: WS sampling canonicalization failed: {}",
2945                                                e
2946                                            );
2947                                            let error_resp = make_ws_error_response(
2948                                                Some(id),
2949                                                -32603,
2950                                                "Internal error",
2951                                            );
2952                                            let mut sink = client_sink.lock().await;
2953                                            let _ =
2954                                                sink.send(Message::Text(error_resp.into())).await;
2955                                            continue;
2956                                        }
2957                                    }
2958                                } else {
2959                                    text.to_string()
2960                                };
2961                                let mut sink = upstream_sink.lock().await;
2962                                let _ = sink
2963                                    .send(tokio_tungstenite::tungstenite::Message::Text(
2964                                        forward_text.into(),
2965                                    ))
2966                                    .await;
2967                            }
2968                            vellaveto_mcp::elicitation::SamplingVerdict::Deny { reason } => {
2969                                tracing::warn!(
2970                                    session_id = %session_id,
2971                                    "Blocked WS sampling/createMessage: {}",
2972                                    reason
2973                                );
2974                                let action = Action::new(
2975                                    "vellaveto",
2976                                    "ws_sampling_interception",
2977                                    json!({
2978                                        "method": "sampling/createMessage",
2979                                        "session": session_id,
2980                                        "transport": "websocket",
2981                                        "reason": &reason,
2982                                    }),
2983                                );
2984                                let verdict = Verdict::Deny {
2985                                    reason: reason.clone(),
2986                                };
2987                                let sampling_security_context =
2988                                    super::helpers::sampling_interception_security_context(&action);
2989                                let envelope = build_secondary_acis_envelope_with_security_context(
2990                                    &action,
2991                                    &verdict,
2992                                    DecisionOrigin::PolicyEngine,
2993                                    "websocket",
2994                                    Some(&session_id),
2995                                    Some(&sampling_security_context),
2996                                );
2997                                if let Err(e) = state
2998                                    .audit
2999                                    .log_entry_with_acis(
3000                                        &action,
3001                                        &verdict,
3002                                        json!({
3003                                            "source": "ws_proxy",
3004                                            "event": "ws_sampling_interception",
3005                                        }),
3006                                        envelope,
3007                                    )
3008                                    .await
3009                                {
3010                                    tracing::warn!(
3011                                        "Failed to audit WS sampling interception: {}",
3012                                        e
3013                                    );
3014                                }
3015                                // SECURITY: Generic message to client — detailed reason
3016                                // is in the audit log, not leaked to the client.
3017                                let error = make_ws_error_response(
3018                                    Some(id),
3019                                    -32001,
3020                                    "sampling/createMessage blocked by policy",
3021                                );
3022                                let mut sink = client_sink.lock().await;
3023                                let _ = sink.send(Message::Text(error.into())).await;
3024                            }
3025                        }
3026                    }
3027                    MessageType::TaskRequest {
3028                        ref id,
3029                        ref task_method,
3030                        ref task_id,
3031                    } => {
3032                        // SECURITY (FIND-R76-001): Memory poisoning detection on task params.
3033                        // Parity with HTTP handler (handlers.rs:2027-2084). Agents could
3034                        // exfiltrate poisoned data via task management operations.
3035                        {
3036                            let task_params = parsed.get("params").cloned().unwrap_or(json!({}));
3037                            let poisoning_detected = state
3038                                .sessions
3039                                .get_mut(&session_id)
3040                                .and_then(|session| {
3041                                    let matches =
3042                                        session.memory_tracker.check_parameters(&task_params);
3043                                    if !matches.is_empty() {
3044                                        for m in &matches {
3045                                            tracing::warn!(
3046                                                "SECURITY: Memory poisoning detected in WS task '{}' (session {}): \
3047                                                 param '{}' contains replayed data (fingerprint: {})",
3048                                                task_method,
3049                                                session_id,
3050                                                m.param_location,
3051                                                m.fingerprint
3052                                            );
3053                                        }
3054                                        Some(matches.len())
3055                                    } else {
3056                                        None
3057                                    }
3058                                });
3059                            if let Some(match_count) = poisoning_detected {
3060                                let poison_action =
3061                                    extractor::extract_task_action(task_method, task_id.as_deref());
3062                                let deny_reason = format!(
3063                                    "Memory poisoning detected: {match_count} replayed data fragment(s) in task '{task_method}'"
3064                                );
3065                                let task_poison_verdict = Verdict::Deny {
3066                                    reason: deny_reason,
3067                                };
3068                                let poisoning_security_context =
3069                                    super::helpers::memory_poisoning_security_context(
3070                                        &task_params,
3071                                        "memory_poisoning",
3072                                    );
3073                                let envelope = build_secondary_acis_envelope_with_security_context(
3074                                    &poison_action,
3075                                    &task_poison_verdict,
3076                                    DecisionOrigin::MemoryPoisoning,
3077                                    "websocket",
3078                                    Some(&session_id),
3079                                    Some(&poisoning_security_context),
3080                                );
3081                                if let Err(e) = state
3082                                    .audit
3083                                    .log_entry_with_acis(
3084                                        &poison_action,
3085                                        &task_poison_verdict,
3086                                        json!({
3087                                            "source": "ws_proxy",
3088                                            "session": session_id,
3089                                            "transport": "websocket",
3090                                            "event": "memory_poisoning_detected",
3091                                            "matches": match_count,
3092                                            "task_method": task_method,
3093                                        }),
3094                                        envelope,
3095                                    )
3096                                    .await
3097                                {
3098                                    tracing::warn!(
3099                                        "Failed to audit WS task memory poisoning: {}",
3100                                        e
3101                                    );
3102                                }
3103                                let error = make_ws_error_response(
3104                                    Some(id),
3105                                    -32001,
3106                                    "Request blocked: security policy violation",
3107                                );
3108                                let mut sink = client_sink.lock().await;
3109                                let _ = sink.send(Message::Text(error.into())).await;
3110                                continue;
3111                            }
3112                        }
3113
3114                        // SECURITY (FIND-R76-001): DLP scan task request parameters.
3115                        // Parity with HTTP handler (handlers.rs:2086-2145). Agents could
3116                        // embed secrets in task_id or params to exfiltrate them.
3117                        {
3118                            let task_params = parsed.get("params").cloned().unwrap_or(json!({}));
3119                            let dlp_findings = scan_parameters_for_secrets(&task_params);
3120                            if !dlp_findings.is_empty() {
3121                                for finding in &dlp_findings {
3122                                    record_dlp_finding(&finding.pattern_name);
3123                                }
3124                                let patterns: Vec<String> = dlp_findings
3125                                    .iter()
3126                                    .map(|f| format!("{} at {}", f.pattern_name, f.location))
3127                                    .collect();
3128                                tracing::warn!(
3129                                    "SECURITY: DLP blocking WS task '{}' in session {}: {:?}",
3130                                    task_method,
3131                                    session_id,
3132                                    patterns
3133                                );
3134                                let dlp_action =
3135                                    extractor::extract_task_action(task_method, task_id.as_deref());
3136                                let task_dlp_verdict = Verdict::Deny {
3137                                    reason: format!(
3138                                        "DLP: secrets detected in task request: {patterns:?}"
3139                                    ),
3140                                };
3141                                let parameter_security_context =
3142                                    super::helpers::parameter_dlp_security_context(
3143                                        &task_params,
3144                                        true,
3145                                        "task_parameter_dlp",
3146                                    );
3147                                let envelope = build_secondary_acis_envelope_with_security_context(
3148                                    &dlp_action,
3149                                    &task_dlp_verdict,
3150                                    DecisionOrigin::Dlp,
3151                                    "websocket",
3152                                    Some(&session_id),
3153                                    Some(&parameter_security_context),
3154                                );
3155                                if let Err(e) = state
3156                                    .audit
3157                                    .log_entry_with_acis(
3158                                        &dlp_action,
3159                                        &task_dlp_verdict,
3160                                        json!({
3161                                            "source": "ws_proxy",
3162                                            "session": session_id,
3163                                            "transport": "websocket",
3164                                            "event": "dlp_secret_detected_task",
3165                                            "task_method": task_method,
3166                                            "findings": patterns,
3167                                        }),
3168                                        envelope,
3169                                    )
3170                                    .await
3171                                {
3172                                    tracing::warn!("Failed to audit WS task DLP: {}", e);
3173                                }
3174                                let error = make_ws_error_response(
3175                                    Some(id),
3176                                    -32001,
3177                                    "Request blocked: security policy violation",
3178                                );
3179                                let mut sink = client_sink.lock().await;
3180                                let _ = sink.send(Message::Text(error.into())).await;
3181                                continue;
3182                            }
3183                        }
3184
3185                        // Policy-evaluate task requests (async operations)
3186                        let action =
3187                            extractor::extract_task_action(task_method, task_id.as_deref());
3188                        let mut matched_approval_id: Option<String> = None;
3189                        // SECURITY (FIND-R130-002): TOCTOU-safe context+eval for task
3190                        // requests. Context is built inside the DashMap shard lock to
3191                        // prevent stale snapshot evaluation races.
3192                        // SECURITY (FIND-R190-006): Update session state on Allow
3193                        // (touch + call_counts + action_history) while still holding
3194                        // the shard lock, matching ToolCall/ResourceRead parity.
3195
3196                        // Pre-compute security context BEFORE acquiring DashMap write lock
3197                        // to avoid deadlock — build_ws_runtime_security_context internally
3198                        // calls sessions.get()/get_mut() which would re-enter the lock.
3199                        let pre_eval_ctx = state
3200                            .sessions
3201                            .get(&session_id)
3202                            .map(|session| EvaluationContext {
3203                                timestamp: None,
3204                                agent_id: session.oauth_subject.clone(),
3205                                agent_identity: session.agent_identity.clone(),
3206                                call_chain: session.current_call_chain.clone(),
3207                                ..EvaluationContext::default()
3208                            })
3209                            .unwrap_or_else(|| EvaluationContext {
3210                                agent_id: oauth_claims.as_ref().map(|claims| claims.sub.clone()),
3211                                ..EvaluationContext::default()
3212                            });
3213                        let security_context = build_ws_runtime_security_context(
3214                            &parsed,
3215                            &action,
3216                            &handshake_headers,
3217                            super::helpers::TransportSecurityInputs {
3218                                oauth_evidence: oauth_claims.as_ref(),
3219                                eval_ctx: Some(&pre_eval_ctx),
3220                                sessions: &state.sessions,
3221                                session_id: Some(&session_id),
3222                                trusted_request_signers: &state.trusted_request_signers,
3223                                detached_signature_freshness: state.detached_signature_freshness,
3224                            },
3225                        );
3226
3227                        // TOCTOU-safe evaluation + session update (write lock)
3228                        let (mediation_result, task_eval_ctx) = if let Some(mut session) =
3229                            state.sessions.get_mut(&session_id)
3230                        {
3231                            let ctx = EvaluationContext {
3232                                timestamp: None,
3233                                agent_id: session.oauth_subject.clone(),
3234                                agent_identity: session.agent_identity.clone(),
3235                                call_counts: session.call_counts.clone(),
3236                                previous_actions: session.action_history.iter().cloned().collect(),
3237                                call_chain: session.current_call_chain.clone(),
3238                                tenant_id: None,
3239                                verification_tier: None,
3240                                capability_token: None,
3241                                session_state: None,
3242                            };
3243                            let result = mediate_with_security_context(
3244                                &uuid::Uuid::new_v4().to_string().replace('-', ""),
3245                                &action,
3246                                &state.engine,
3247                                Some(&ctx),
3248                                security_context.as_ref(),
3249                                "websocket",
3250                                &state.mediation_config,
3251                                Some(&session_id),
3252                                None,
3253                            );
3254
3255                            // Update session atomically on Allow
3256                            if matches!(result.verdict, Verdict::Allow) {
3257                                session.touch();
3258                                use crate::proxy::call_chain::{
3259                                    MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
3260                                };
3261                                if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
3262                                    || session.call_counts.contains_key(task_method)
3263                                {
3264                                    let count = session
3265                                        .call_counts
3266                                        .entry(task_method.to_string())
3267                                        .or_insert(0);
3268                                    *count = count.saturating_add(1);
3269                                }
3270                                if session.action_history.len() >= MAX_ACTION_HISTORY {
3271                                    session.action_history.pop_front();
3272                                }
3273                                session.action_history.push_back(task_method.to_string());
3274                            }
3275
3276                            (result, ctx)
3277                        } else {
3278                            let ctx = EvaluationContext::default();
3279                            let result = mediate_with_security_context(
3280                                &uuid::Uuid::new_v4().to_string().replace('-', ""),
3281                                &action,
3282                                &state.engine,
3283                                None,
3284                                security_context.as_ref(),
3285                                "websocket",
3286                                &state.mediation_config,
3287                                Some(&session_id),
3288                                None,
3289                            );
3290                            (result, ctx)
3291                        };
3292
3293                        let mut final_origin = mediation_result.origin;
3294                        let mut acis_envelope = mediation_result.envelope.clone();
3295                        let mut refresh_envelope = false;
3296                        let verdict = match mediation_result.verdict {
3297                            Verdict::RequireApproval { reason } => {
3298                                match super::helpers::presented_approval_matches_action(
3299                                    &state,
3300                                    &session_id,
3301                                    presented_approval_id.as_deref(),
3302                                    &action,
3303                                )
3304                                .await
3305                                {
3306                                    Ok(Some(approval_id)) => {
3307                                        matched_approval_id = Some(approval_id);
3308                                        final_origin = DecisionOrigin::PolicyEngine;
3309                                        refresh_envelope = true;
3310                                        Verdict::Allow
3311                                    }
3312                                    Ok(None) => Verdict::RequireApproval { reason },
3313                                    Err(()) => {
3314                                        final_origin = DecisionOrigin::ApprovalGate;
3315                                        refresh_envelope = true;
3316                                        Verdict::Deny {
3317                                            reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
3318                                        }
3319                                    }
3320                                }
3321                            }
3322                            other => other,
3323                        };
3324                        if refresh_envelope {
3325                            acis_envelope = refresh_ws_acis_envelope(
3326                                &acis_envelope,
3327                                &action,
3328                                &verdict,
3329                                final_origin,
3330                                &session_id,
3331                                &task_eval_ctx,
3332                                security_context.as_ref(),
3333                            );
3334                        }
3335
3336                        match verdict {
3337                            Verdict::Allow => {
3338                                // SECURITY (FIND-R190-001): ABAC refinement for TaskRequest,
3339                                // matching ToolCall/ResourceRead parity.
3340                                if let Some(ref abac) = state.abac_engine {
3341                                    let principal_id =
3342                                        task_eval_ctx.agent_id.as_deref().unwrap_or("anonymous");
3343                                    let principal_type = task_eval_ctx.principal_type();
3344                                    let session_risk = state
3345                                        .sessions
3346                                        .get_mut(&session_id)
3347                                        .and_then(|s| s.risk_score.clone());
3348                                    let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
3349                                        eval_ctx: &task_eval_ctx,
3350                                        principal_type,
3351                                        principal_id,
3352                                        risk_score: session_risk.as_ref(),
3353                                    };
3354                                    match abac.evaluate(&action, &abac_ctx) {
3355                                        vellaveto_engine::abac::AbacDecision::Deny {
3356                                            policy_id,
3357                                            reason,
3358                                        } => {
3359                                            let deny_verdict = Verdict::Deny {
3360                                                reason: reason.clone(),
3361                                            };
3362                                            let abac_security_context =
3363                                                super::helpers::abac_deny_security_context(&action);
3364                                            let envelope =
3365                                                build_secondary_acis_envelope_with_security_context(
3366                                                    &action,
3367                                                    &deny_verdict,
3368                                                    DecisionOrigin::PolicyEngine,
3369                                                    "websocket",
3370                                                    Some(&session_id),
3371                                                    Some(&abac_security_context),
3372                                                );
3373                                            if let Err(e) = state
3374                                                .audit
3375                                                .log_entry_with_acis(
3376                                                    &action,
3377                                                    &deny_verdict,
3378                                                    json!({
3379                                                        "source": "ws_proxy",
3380                                                        "session": session_id,
3381                                                        "transport": "websocket",
3382                                                        "event": "abac_deny",
3383                                                        "abac_policy": policy_id,
3384                                                        "task_method": task_method,
3385                                                    }),
3386                                                    envelope,
3387                                                )
3388                                                .await
3389                                            {
3390                                                tracing::warn!(
3391                                                    "Failed to audit WS task ABAC deny: {}",
3392                                                    e
3393                                                );
3394                                            }
3395                                            let error_resp = make_ws_error_response(
3396                                                Some(id),
3397                                                -32001,
3398                                                "Denied by policy",
3399                                            );
3400                                            let mut sink = client_sink.lock().await;
3401                                            let _ =
3402                                                sink.send(Message::Text(error_resp.into())).await;
3403                                            continue;
3404                                        }
3405                                        vellaveto_engine::abac::AbacDecision::Allow {
3406                                            policy_id,
3407                                        } => {
3408                                            if let Some(ref la) = state.least_agency {
3409                                                la.record_usage(
3410                                                    principal_id,
3411                                                    &session_id,
3412                                                    &policy_id,
3413                                                    task_method,
3414                                                    &action.function,
3415                                                );
3416                                            }
3417                                        }
3418                                        vellaveto_engine::abac::AbacDecision::NoMatch => {
3419                                            // Fall through — existing Allow stands
3420                                        }
3421                                        #[allow(unreachable_patterns)]
3422                                        _ => {
3423                                            tracing::warn!(
3424                                                "Unknown AbacDecision variant — fail-closed"
3425                                            );
3426                                            let error_resp = make_ws_error_response(
3427                                                Some(id),
3428                                                -32001,
3429                                                "Denied by policy",
3430                                            );
3431                                            let mut sink = client_sink.lock().await;
3432                                            let _ =
3433                                                sink.send(Message::Text(error_resp.into())).await;
3434                                            continue;
3435                                        }
3436                                    }
3437                                }
3438
3439                                if super::helpers::consume_presented_approval(
3440                                    &state,
3441                                    &session_id,
3442                                    matched_approval_id.as_deref(),
3443                                    &action,
3444                                    security_context.as_ref(),
3445                                )
3446                                .await
3447                                .is_err()
3448                                {
3449                                    let deny_verdict = Verdict::Deny {
3450                                        reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
3451                                    };
3452                                    let invalid_approval_security_context =
3453                                        super::helpers::invalid_presented_approval_security_context(
3454                                            &action,
3455                                        );
3456                                    let combined_security_context =
3457                                        super::helpers::merge_transport_security_context(
3458                                            security_context.as_ref(),
3459                                            Some(&invalid_approval_security_context),
3460                                        );
3461                                    let effective_security_context = combined_security_context
3462                                        .as_ref()
3463                                        .unwrap_or(&invalid_approval_security_context);
3464                                    let envelope =
3465                                        build_secondary_acis_envelope_with_security_context(
3466                                            &action,
3467                                            &deny_verdict,
3468                                            DecisionOrigin::ApprovalGate,
3469                                            "websocket",
3470                                            Some(&session_id),
3471                                            Some(effective_security_context),
3472                                        );
3473                                    let _ = state
3474                                        .audit
3475                                        .log_entry_with_acis(
3476                                            &action,
3477                                            &deny_verdict,
3478                                            json!({
3479                                                "source": "ws_proxy",
3480                                                "session": session_id,
3481                                                "transport": "websocket",
3482                                                "event": "presented_approval_replay_denied",
3483                                                "approval_id": matched_approval_id,
3484                                                "task_method": task_method,
3485                                                "task_id": task_id,
3486                                            }),
3487                                            envelope,
3488                                        )
3489                                        .await;
3490                                    let error_resp = make_ws_error_response(
3491                                        Some(id),
3492                                        -32001,
3493                                        "Denied by policy",
3494                                    );
3495                                    let mut sink = client_sink.lock().await;
3496                                    let _ = sink.send(Message::Text(error_resp.into())).await;
3497                                    continue;
3498                                }
3499
3500                                if let Err(e) = state
3501                                    .audit
3502                                    .log_entry_with_acis(
3503                                        &action,
3504                                        &Verdict::Allow,
3505                                        json!({
3506                                            "source": "ws_proxy",
3507                                            "session": session_id,
3508                                            "transport": "websocket",
3509                                            "task_method": task_method,
3510                                        }),
3511                                        acis_envelope,
3512                                    )
3513                                    .await
3514                                {
3515                                    tracing::warn!("Failed to audit WS task allow: {}", e);
3516                                }
3517                                // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
3518                                let forward_text = if state.canonicalize {
3519                                    match serde_json::to_string(&parsed) {
3520                                        Ok(canonical) => canonical,
3521                                        Err(e) => {
3522                                            tracing::error!(
3523                                                "SECURITY: WS task canonicalization failed: {}",
3524                                                e
3525                                            );
3526                                            let error_resp = make_ws_error_response(
3527                                                Some(id),
3528                                                -32603,
3529                                                "Internal error",
3530                                            );
3531                                            let mut sink = client_sink.lock().await;
3532                                            let _ =
3533                                                sink.send(Message::Text(error_resp.into())).await;
3534                                            continue;
3535                                        }
3536                                    }
3537                                } else {
3538                                    text.to_string()
3539                                };
3540                                let mut sink = upstream_sink.lock().await;
3541                                if let Err(e) = sink
3542                                    .send(tokio_tungstenite::tungstenite::Message::Text(
3543                                        forward_text.into(),
3544                                    ))
3545                                    .await
3546                                {
3547                                    tracing::error!("Failed to forward task request: {}", e);
3548                                    break;
3549                                }
3550                            }
3551                            Verdict::Deny { ref reason } => {
3552                                let mut audit_metadata = json!({
3553                                    "source": "ws_proxy",
3554                                    "session": session_id,
3555                                    "transport": "websocket",
3556                                    "task_method": task_method,
3557                                });
3558                                if reason == INVALID_PRESENTED_APPROVAL_REASON
3559                                    && presented_approval_id.is_some()
3560                                {
3561                                    audit_metadata["event"] =
3562                                        json!("presented_approval_replay_denied");
3563                                    audit_metadata["approval_id"] = json!(presented_approval_id);
3564                                    audit_metadata["task_id"] = json!(task_id);
3565                                }
3566                                if let Err(e) = state
3567                                    .audit
3568                                    .log_entry_with_acis(
3569                                        &action,
3570                                        &Verdict::Deny {
3571                                            reason: reason.clone(),
3572                                        },
3573                                        audit_metadata,
3574                                        acis_envelope,
3575                                    )
3576                                    .await
3577                                {
3578                                    tracing::error!(
3579                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
3580                                        e
3581                                    );
3582                                    // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
3583                                    if state.audit_strict_mode {
3584                                        let error = make_ws_error_response(
3585                                            Some(id),
3586                                            -32000,
3587                                            "Audit logging failed — request denied (strict audit mode)",
3588                                        );
3589                                        let mut sink = client_sink.lock().await;
3590                                        let _ = sink.send(Message::Text(error.into())).await;
3591                                        continue;
3592                                    }
3593                                }
3594                                // SECURITY (FIND-R55-WS-005): Generic denial message to prevent
3595                                // leaking policy names/details. Detailed reason is in audit log.
3596                                let denial =
3597                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
3598                                let mut sink = client_sink.lock().await;
3599                                let _ = sink.send(Message::Text(denial.into())).await;
3600                            }
3601                            Verdict::RequireApproval { ref reason, .. } => {
3602                                let approval_verdict = Verdict::RequireApproval {
3603                                    reason: reason.clone(),
3604                                };
3605                                let containment_context =
3606                                    super::helpers::approval_containment_context_from_envelope(
3607                                        &acis_envelope,
3608                                        reason,
3609                                    );
3610                                if let Err(e) = state
3611                                    .audit
3612                                    .log_entry_with_acis(
3613                                        &action,
3614                                        &approval_verdict,
3615                                        json!({
3616                                            "source": "ws_proxy",
3617                                            "session": session_id,
3618                                            "transport": "websocket",
3619                                            "task_method": task_method,
3620                                        }),
3621                                        acis_envelope,
3622                                    )
3623                                    .await
3624                                {
3625                                    tracing::error!(
3626                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
3627                                        e
3628                                    );
3629                                    // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
3630                                    if state.audit_strict_mode {
3631                                        let error = make_ws_error_response(
3632                                            Some(id),
3633                                            -32000,
3634                                            "Audit logging failed — request denied (strict audit mode)",
3635                                        );
3636                                        let mut sink = client_sink.lock().await;
3637                                        let _ = sink.send(Message::Text(error.into())).await;
3638                                        continue;
3639                                    }
3640                                }
3641                                let approval_id =
3642                                    super::helpers::create_pending_approval_with_context(
3643                                        &state,
3644                                        &session_id,
3645                                        &action,
3646                                        reason,
3647                                        containment_context,
3648                                    )
3649                                    .await;
3650                                let denial = make_ws_error_response_with_data(
3651                                    Some(id),
3652                                    -32001,
3653                                    "Approval required",
3654                                    Some(json!({
3655                                        "verdict": "require_approval",
3656                                        "reason": reason,
3657                                        "approval_id": approval_id,
3658                                    })),
3659                                );
3660                                let mut sink = client_sink.lock().await;
3661                                let _ = sink.send(Message::Text(denial.into())).await;
3662                            }
3663                            _ => {
3664                                let denial =
3665                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
3666                                let mut sink = client_sink.lock().await;
3667                                let _ = sink.send(Message::Text(denial.into())).await;
3668                            }
3669                        }
3670                    }
3671                    MessageType::ExtensionMethod {
3672                        ref id,
3673                        ref extension_id,
3674                        ref method,
3675                    } => {
3676                        // Policy-evaluate extension method calls
3677                        let params = parsed.get("params").cloned().unwrap_or(json!({}));
3678
3679                        // SECURITY (FIND-R116-001): DLP scan extension method parameters.
3680                        // Parity with gRPC handle_extension_method (service.rs:1542).
3681                        let dlp_findings = scan_parameters_for_secrets(&params);
3682                        if !dlp_findings.is_empty() {
3683                            for finding in &dlp_findings {
3684                                record_dlp_finding(&finding.pattern_name);
3685                            }
3686                            let patterns: Vec<String> = dlp_findings
3687                                .iter()
3688                                .map(|f| format!("{}:{}", f.pattern_name, f.location))
3689                                .collect();
3690                            tracing::warn!(
3691                                "SECURITY: Secrets in WS extension method parameters! Session: {}, Extension: {}:{}, Findings: {:?}",
3692                                session_id, extension_id, method, patterns,
3693                            );
3694                            let action =
3695                                extractor::extract_extension_action(extension_id, method, &params);
3696                            let audit_verdict = Verdict::Deny {
3697                                reason: format!(
3698                                    "DLP blocked: secret detected in extension parameters: {patterns:?}"
3699                                ),
3700                            };
3701                            let parameter_security_context =
3702                                super::helpers::parameter_dlp_security_context(
3703                                    &params,
3704                                    true,
3705                                    "extension_parameter_dlp",
3706                                );
3707                            let envelope = build_secondary_acis_envelope_with_security_context(
3708                                &action,
3709                                &audit_verdict,
3710                                DecisionOrigin::Dlp,
3711                                "websocket",
3712                                Some(&session_id),
3713                                Some(&parameter_security_context),
3714                            );
3715                            if let Err(e) = state.audit.log_entry_with_acis(
3716                                &action, &audit_verdict,
3717                                json!({
3718                                    "source": "ws_proxy", "session": session_id, "transport": "websocket",
3719                                    "event": "ws_extension_parameter_dlp_alert",
3720                                    "extension_id": extension_id, "method": method, "findings": patterns,
3721                                }),
3722                                envelope,
3723                            ).await {
3724                                tracing::warn!("Failed to audit WS extension parameter DLP: {}", e);
3725                            }
3726                            let denial =
3727                                make_ws_error_response(Some(id), -32001, "Denied by policy");
3728                            let mut sink = client_sink.lock().await;
3729                            let _ = sink.send(Message::Text(denial.into())).await;
3730                            continue;
3731                        }
3732
3733                        // SECURITY (FIND-R116-001): Memory poisoning detection for extension params.
3734                        // Parity with gRPC handle_extension_method (service.rs:1574).
3735                        if let Some(session) = state.sessions.get_mut(&session_id) {
3736                            let poisoning_matches =
3737                                session.memory_tracker.check_parameters(&params);
3738                            if !poisoning_matches.is_empty() {
3739                                for m in &poisoning_matches {
3740                                    tracing::warn!(
3741                                        "SECURITY: Memory poisoning in WS extension '{}:{}' (session {}): \
3742                                         param '{}' replayed data (fingerprint: {})",
3743                                        extension_id, method, session_id, m.param_location, m.fingerprint
3744                                    );
3745                                }
3746                                let action = extractor::extract_extension_action(
3747                                    extension_id,
3748                                    method,
3749                                    &params,
3750                                );
3751                                let deny_reason = format!(
3752                                    "Memory poisoning detected: {} replayed data fragment(s) in extension '{}:{}'",
3753                                    poisoning_matches.len(), extension_id, method
3754                                );
3755                                let ext_poison_verdict = Verdict::Deny {
3756                                    reason: deny_reason.clone(),
3757                                };
3758                                let poisoning_security_context =
3759                                    super::helpers::memory_poisoning_security_context(
3760                                        &params,
3761                                        "memory_poisoning",
3762                                    );
3763                                let envelope = build_secondary_acis_envelope_with_security_context(
3764                                    &action,
3765                                    &ext_poison_verdict,
3766                                    DecisionOrigin::MemoryPoisoning,
3767                                    "websocket",
3768                                    Some(&session_id),
3769                                    Some(&poisoning_security_context),
3770                                );
3771                                if let Err(e) = state.audit.log_entry_with_acis(
3772                                    &action,
3773                                    &ext_poison_verdict,
3774                                    json!({
3775                                        "source": "ws_proxy", "session": session_id, "transport": "websocket",
3776                                        "event": "memory_poisoning_detected",
3777                                        "matches": poisoning_matches.len(),
3778                                        "extension_id": extension_id, "method": method,
3779                                    }),
3780                                    envelope,
3781                                ).await {
3782                                    tracing::warn!("Failed to audit WS extension memory poisoning: {}", e);
3783                                }
3784                                let denial =
3785                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
3786                                let mut sink = client_sink.lock().await;
3787                                let _ = sink.send(Message::Text(denial.into())).await;
3788                                continue;
3789                            }
3790                        }
3791
3792                        let mut action =
3793                            extractor::extract_extension_action(extension_id, method, &params);
3794                        let mut matched_approval_id: Option<String> = None;
3795
3796                        // SECURITY (FIND-R118-004): DNS resolution for extension methods.
3797                        // Parity with ToolCall (line 710) and ResourceRead (line 1439).
3798                        if state.engine.has_ip_rules() {
3799                            super::helpers::resolve_domains(&mut action).await;
3800                        }
3801
3802                        let ext_key = format!("extension:{extension_id}:{method}");
3803
3804                        // SECURITY (FIND-R130-002): TOCTOU-safe context+eval+update
3805                        // for extension methods. Matches ToolCall/ResourceRead fixes.
3806
3807                        // Pre-compute security context BEFORE acquiring DashMap write lock
3808                        // to avoid deadlock — build_ws_runtime_security_context internally
3809                        // calls sessions.get()/get_mut() which would re-enter the lock.
3810                        let pre_eval_ctx = state
3811                            .sessions
3812                            .get(&session_id)
3813                            .map(|session| EvaluationContext {
3814                                timestamp: None,
3815                                agent_id: session.oauth_subject.clone(),
3816                                agent_identity: session.agent_identity.clone(),
3817                                call_chain: session.current_call_chain.clone(),
3818                                ..EvaluationContext::default()
3819                            })
3820                            .unwrap_or_else(|| EvaluationContext {
3821                                agent_id: oauth_claims.as_ref().map(|claims| claims.sub.clone()),
3822                                ..EvaluationContext::default()
3823                            });
3824                        let security_context = build_ws_runtime_security_context(
3825                            &parsed,
3826                            &action,
3827                            &handshake_headers,
3828                            super::helpers::TransportSecurityInputs {
3829                                oauth_evidence: oauth_claims.as_ref(),
3830                                eval_ctx: Some(&pre_eval_ctx),
3831                                sessions: &state.sessions,
3832                                session_id: Some(&session_id),
3833                                trusted_request_signers: &state.trusted_request_signers,
3834                                detached_signature_freshness: state.detached_signature_freshness,
3835                            },
3836                        );
3837
3838                        // TOCTOU-safe evaluation + session update (write lock)
3839                        let (mediation_result, ctx) = if let Some(mut session) =
3840                            state.sessions.get_mut(&session_id)
3841                        {
3842                            let ctx = EvaluationContext {
3843                                timestamp: None,
3844                                agent_id: session.oauth_subject.clone(),
3845                                agent_identity: session.agent_identity.clone(),
3846                                call_counts: session.call_counts.clone(),
3847                                previous_actions: session.action_history.iter().cloned().collect(),
3848                                call_chain: session.current_call_chain.clone(),
3849                                tenant_id: None,
3850                                verification_tier: None,
3851                                capability_token: None,
3852                                session_state: None,
3853                            };
3854                            let result = mediate_with_security_context(
3855                                &uuid::Uuid::new_v4().to_string().replace('-', ""),
3856                                &action,
3857                                &state.engine,
3858                                Some(&ctx),
3859                                security_context.as_ref(),
3860                                "websocket",
3861                                &state.mediation_config,
3862                                Some(&session_id),
3863                                None,
3864                            );
3865
3866                            // Atomically update session on Allow
3867                            if matches!(result.verdict, Verdict::Allow) {
3868                                session.touch();
3869                                use crate::proxy::call_chain::{
3870                                    MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
3871                                };
3872                                if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
3873                                    || session.call_counts.contains_key(&ext_key)
3874                                {
3875                                    let count =
3876                                        session.call_counts.entry(ext_key.clone()).or_insert(0);
3877                                    *count = count.saturating_add(1);
3878                                }
3879                                if session.action_history.len() >= MAX_ACTION_HISTORY {
3880                                    session.action_history.pop_front();
3881                                }
3882                                session.action_history.push_back(ext_key.clone());
3883                            }
3884
3885                            (result, ctx)
3886                        } else {
3887                            let ctx = EvaluationContext::default();
3888                            let result = mediate_with_security_context(
3889                                &uuid::Uuid::new_v4().to_string().replace('-', ""),
3890                                &action,
3891                                &state.engine,
3892                                None,
3893                                security_context.as_ref(),
3894                                "websocket",
3895                                &state.mediation_config,
3896                                Some(&session_id),
3897                                None,
3898                            );
3899                            (result, ctx)
3900                        };
3901
3902                        let mut final_origin = mediation_result.origin;
3903                        let mut acis_envelope = mediation_result.envelope.clone();
3904                        let mut refresh_envelope = false;
3905                        let verdict = match mediation_result.verdict {
3906                            Verdict::RequireApproval { reason } => {
3907                                match super::helpers::presented_approval_matches_action(
3908                                    &state,
3909                                    &session_id,
3910                                    presented_approval_id.as_deref(),
3911                                    &action,
3912                                )
3913                                .await
3914                                {
3915                                    Ok(Some(approval_id)) => {
3916                                        matched_approval_id = Some(approval_id);
3917                                        final_origin = DecisionOrigin::PolicyEngine;
3918                                        refresh_envelope = true;
3919                                        Verdict::Allow
3920                                    }
3921                                    Ok(None) => Verdict::RequireApproval { reason },
3922                                    Err(()) => {
3923                                        final_origin = DecisionOrigin::ApprovalGate;
3924                                        refresh_envelope = true;
3925                                        Verdict::Deny {
3926                                            reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
3927                                        }
3928                                    }
3929                                }
3930                            }
3931                            other => other,
3932                        };
3933                        if refresh_envelope {
3934                            acis_envelope = refresh_ws_acis_envelope(
3935                                &acis_envelope,
3936                                &action,
3937                                &verdict,
3938                                final_origin,
3939                                &session_id,
3940                                &ctx,
3941                                security_context.as_ref(),
3942                            );
3943                        }
3944
3945                        match verdict {
3946                            Verdict::Allow => {
3947                                // SECURITY (FIND-R118-002): ABAC refinement for extension methods.
3948                                // Parity with ToolCall (line 1099) and ResourceRead (line 1498).
3949                                if let Some(ref abac) = state.abac_engine {
3950                                    let principal_id =
3951                                        ctx.agent_id.as_deref().unwrap_or("anonymous");
3952                                    let principal_type = ctx.principal_type();
3953                                    let session_risk = state
3954                                        .sessions
3955                                        .get_mut(&session_id)
3956                                        .and_then(|s| s.risk_score.clone());
3957                                    let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
3958                                        eval_ctx: &ctx,
3959                                        principal_type,
3960                                        principal_id,
3961                                        risk_score: session_risk.as_ref(),
3962                                    };
3963                                    match abac.evaluate(&action, &abac_ctx) {
3964                                        vellaveto_engine::abac::AbacDecision::Deny {
3965                                            policy_id,
3966                                            reason,
3967                                        } => {
3968                                            let deny_verdict = Verdict::Deny {
3969                                                reason: reason.clone(),
3970                                            };
3971                                            let abac_security_context =
3972                                                super::helpers::abac_deny_security_context(&action);
3973                                            let envelope =
3974                                                build_secondary_acis_envelope_with_security_context(
3975                                                    &action,
3976                                                    &deny_verdict,
3977                                                    DecisionOrigin::PolicyEngine,
3978                                                    "websocket",
3979                                                    Some(&session_id),
3980                                                    Some(&abac_security_context),
3981                                                );
3982                                            if let Err(e) = state
3983                                                .audit
3984                                                .log_entry_with_acis(
3985                                                    &action,
3986                                                    &deny_verdict,
3987                                                    json!({
3988                                                        "source": "ws_proxy",
3989                                                        "session": session_id,
3990                                                        "transport": "websocket",
3991                                                        "event": "abac_deny",
3992                                                        "extension_id": extension_id,
3993                                                        "abac_policy": policy_id,
3994                                                    }),
3995                                                    envelope,
3996                                                )
3997                                                .await
3998                                            {
3999                                                tracing::warn!(
4000                                                    "Failed to audit WS extension ABAC deny: {}",
4001                                                    e
4002                                                );
4003                                            }
4004                                            let error_resp = make_ws_error_response(
4005                                                Some(id),
4006                                                -32001,
4007                                                "Denied by policy",
4008                                            );
4009                                            let mut sink = client_sink.lock().await;
4010                                            let _ =
4011                                                sink.send(Message::Text(error_resp.into())).await;
4012                                            continue;
4013                                        }
4014                                        vellaveto_engine::abac::AbacDecision::Allow {
4015                                            policy_id,
4016                                        } => {
4017                                            if let Some(ref la) = state.least_agency {
4018                                                la.record_usage(
4019                                                    principal_id,
4020                                                    &session_id,
4021                                                    &policy_id,
4022                                                    &ext_key,
4023                                                    method,
4024                                                );
4025                                            }
4026                                        }
4027                                        vellaveto_engine::abac::AbacDecision::NoMatch => {
4028                                            // Fall through — existing Allow stands
4029                                        }
4030                                        #[allow(unreachable_patterns)]
4031                                        // AbacDecision is #[non_exhaustive]
4032                                        _ => {
4033                                            // SECURITY: Future variants — fail-closed (deny).
4034                                            tracing::warn!(
4035                                                "Unknown AbacDecision variant — fail-closed"
4036                                            );
4037                                            let error_resp = make_ws_error_response(
4038                                                Some(id),
4039                                                -32001,
4040                                                "Denied by policy",
4041                                            );
4042                                            let mut sink = client_sink.lock().await;
4043                                            let _ =
4044                                                sink.send(Message::Text(error_resp.into())).await;
4045                                            continue;
4046                                        }
4047                                    }
4048                                }
4049
4050                                // NOTE: Session touch + call_counts/action_history update runs in
4051                                // the TOCTOU-safe block above (FIND-R130-002) only when mediation
4052                                // returned Allow; approval-granted Allows skip it — review intent.
4053
4054                                if super::helpers::consume_presented_approval(
4055                                    &state,
4056                                    &session_id,
4057                                    matched_approval_id.as_deref(),
4058                                    &action,
4059                                    security_context.as_ref(),
4060                                )
4061                                .await
4062                                .is_err()
4063                                {
4064                                    let deny_verdict = Verdict::Deny {
4065                                        reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
4066                                    };
4067                                    let invalid_approval_security_context =
4068                                        super::helpers::invalid_presented_approval_security_context(
4069                                            &action,
4070                                        );
4071                                    let combined_security_context =
4072                                        super::helpers::merge_transport_security_context(
4073                                            security_context.as_ref(),
4074                                            Some(&invalid_approval_security_context),
4075                                        );
4076                                    let effective_security_context = combined_security_context
4077                                        .as_ref()
4078                                        .unwrap_or(&invalid_approval_security_context);
4079                                    let envelope =
4080                                        build_secondary_acis_envelope_with_security_context(
4081                                            &action,
4082                                            &deny_verdict,
4083                                            DecisionOrigin::ApprovalGate,
4084                                            "websocket",
4085                                            Some(&session_id),
4086                                            Some(effective_security_context),
4087                                        );
4088                                    let _ = state
4089                                        .audit
4090                                        .log_entry_with_acis(
4091                                            &action,
4092                                            &deny_verdict,
4093                                            json!({
4094                                                "source": "ws_proxy",
4095                                                "session": session_id,
4096                                                "transport": "websocket",
4097                                                "event": "presented_approval_replay_denied",
4098                                                "approval_id": matched_approval_id,
4099                                                "extension_id": extension_id,
4100                                                "method": method,
4101                                            }),
4102                                            envelope,
4103                                        )
4104                                        .await;
4105                                    let error_resp = make_ws_error_response(
4106                                        Some(id),
4107                                        -32001,
4108                                        "Denied by policy",
4109                                    );
4110                                    let mut sink = client_sink.lock().await;
4111                                    let _ = sink.send(Message::Text(error_resp.into())).await;
4112                                    continue;
4113                                }
4114
4115                                if let Err(e) = state
4116                                    .audit
4117                                    .log_entry_with_acis(
4118                                        &action,
4119                                        &Verdict::Allow,
4120                                        json!({
4121                                            "source": "ws_proxy",
4122                                            "session": session_id,
4123                                            "transport": "websocket",
4124                                            "extension_id": extension_id,
4125                                        }),
4126                                        acis_envelope,
4127                                    )
4128                                    .await
4129                                {
4130                                    tracing::error!(
4131                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
4132                                        e
4133                                    );
4134                                    // SECURITY (FIND-R215-007): Strict audit mode — fail-closed.
4135                                    // Parity with Deny and RequireApproval paths.
4136                                    if state.audit_strict_mode {
4137                                        let error = make_ws_error_response(
4138                                            Some(id),
4139                                            -32000,
4140                                            "Audit logging failed — request denied (strict audit mode)",
4141                                        );
4142                                        let mut sink = client_sink.lock().await;
4143                                        let _ = sink.send(Message::Text(error.into())).await;
4144                                        continue;
4145                                    }
4146                                }
4147                                // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
4148                                let forward_text = if state.canonicalize {
4149                                    match serde_json::to_string(&parsed) {
4150                                        Ok(canonical) => canonical,
4151                                        Err(e) => {
4152                                            tracing::error!("SECURITY: WS extension canonicalization failed: {}", e);
4153                                            let error_resp = make_ws_error_response(
4154                                                Some(id),
4155                                                -32603,
4156                                                "Internal error",
4157                                            );
4158                                            let mut sink = client_sink.lock().await;
4159                                            let _ =
4160                                                sink.send(Message::Text(error_resp.into())).await;
4161                                            continue;
4162                                        }
4163                                    }
4164                                } else {
4165                                    text.to_string()
4166                                };
4167                                let mut sink = upstream_sink.lock().await;
4168                                if let Err(e) = sink
4169                                    .send(tokio_tungstenite::tungstenite::Message::Text(
4170                                        forward_text.into(),
4171                                    ))
4172                                    .await
4173                                {
4174                                    tracing::error!("Failed to forward extension request: {}", e);
4175                                    break;
4176                                }
4177                            }
4178                            Verdict::Deny { ref reason } => {
4179                                let mut audit_metadata = json!({
4180                                    "source": "ws_proxy",
4181                                    "session": session_id,
4182                                    "transport": "websocket",
4183                                    "extension_id": extension_id,
4184                                });
4185                                if reason == INVALID_PRESENTED_APPROVAL_REASON
4186                                    && presented_approval_id.is_some()
4187                                {
4188                                    audit_metadata["event"] =
4189                                        json!("presented_approval_replay_denied");
4190                                    audit_metadata["approval_id"] = json!(presented_approval_id);
4191                                    audit_metadata["method"] = json!(method);
4192                                }
4193                                if let Err(e) = state
4194                                    .audit
4195                                    .log_entry_with_acis(
4196                                        &action,
4197                                        &Verdict::Deny {
4198                                            reason: reason.clone(),
4199                                        },
4200                                        audit_metadata,
4201                                        acis_envelope,
4202                                    )
4203                                    .await
4204                                {
4205                                    tracing::error!(
4206                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
4207                                        e
4208                                    );
4209                                    // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
4210                                    if state.audit_strict_mode {
4211                                        let error = make_ws_error_response(
4212                                            Some(id),
4213                                            -32000,
4214                                            "Audit logging failed — request denied (strict audit mode)",
4215                                        );
4216                                        let mut sink = client_sink.lock().await;
4217                                        let _ = sink.send(Message::Text(error.into())).await;
4218                                        continue;
4219                                    }
4220                                }
4221                                // SECURITY (FIND-R213-001): Generic denial message — do not leak
4222                                // detailed policy reason to client. Reason is in the audit log.
4223                                let _ = reason;
4224                                let denial =
4225                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
4226                                let mut sink = client_sink.lock().await;
4227                                let _ = sink.send(Message::Text(denial.into())).await;
4228                            }
4229                            Verdict::RequireApproval { ref reason, .. } => {
4230                                let approval_verdict = Verdict::RequireApproval {
4231                                    reason: reason.clone(),
4232                                };
4233                                let containment_context =
4234                                    super::helpers::approval_containment_context_from_envelope(
4235                                        &acis_envelope,
4236                                        reason,
4237                                    );
4238                                if let Err(e) = state
4239                                    .audit
4240                                    .log_entry_with_acis(
4241                                        &action,
4242                                        &approval_verdict,
4243                                        json!({
4244                                            "source": "ws_proxy",
4245                                            "session": session_id,
4246                                            "transport": "websocket",
4247                                            "extension_id": extension_id,
4248                                        }),
4249                                        acis_envelope,
4250                                    )
4251                                    .await
4252                                {
4253                                    tracing::error!(
4254                                        "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
4255                                        e
4256                                    );
4257                                    // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
4258                                    if state.audit_strict_mode {
4259                                        let error = make_ws_error_response(
4260                                            Some(id),
4261                                            -32000,
4262                                            "Audit logging failed — request denied (strict audit mode)",
4263                                        );
4264                                        let mut sink = client_sink.lock().await;
4265                                        let _ = sink.send(Message::Text(error.into())).await;
4266                                        continue;
4267                                    }
4268                                }
4269                                let approval_id =
4270                                    super::helpers::create_pending_approval_with_context(
4271                                        &state,
4272                                        &session_id,
4273                                        &action,
4274                                        reason,
4275                                        containment_context,
4276                                    )
4277                                    .await;
4278                                let denial = make_ws_error_response_with_data(
4279                                    Some(id),
4280                                    -32001,
4281                                    "Approval required",
4282                                    Some(json!({
4283                                        "verdict": "require_approval",
4284                                        "reason": reason,
4285                                        "approval_id": approval_id,
4286                                    })),
4287                                );
4288                                let mut sink = client_sink.lock().await;
4289                                let _ = sink.send(Message::Text(denial.into())).await;
4290                            }
4291                            _ => {
4292                                let denial =
4293                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
4294                                let mut sink = client_sink.lock().await;
4295                                let _ = sink.send(Message::Text(denial.into())).await;
4296                            }
4297                        }
4298                    }
4299                    MessageType::ElicitationRequest { ref id } => {
4300                        // SECURITY (FIND-R46-010): Policy checks for elicitation requests.
4301                        // Match the HTTP POST handler's elicitation inspection logic.
4302                        let params = parsed.get("params").cloned().unwrap_or(json!({}));
4303                        let elicitation_verdict = {
4304                            let mut session_ref = state.sessions.get_mut(&session_id);
4305                            let current_count = session_ref
4306                                .as_ref()
4307                                .map(|s| s.elicitation_count)
4308                                .unwrap_or(0);
4309                            let verdict = vellaveto_mcp::elicitation::inspect_elicitation(
4310                                &params,
4311                                &state.elicitation_config,
4312                                current_count,
4313                            );
4314                            // Pre-increment while holding the lock to close the TOCTOU gap
4315                            if matches!(
4316                                verdict,
4317                                vellaveto_mcp::elicitation::ElicitationVerdict::Allow
4318                            ) {
4319                                if let Some(ref mut s) = session_ref {
4320                                    // SECURITY (FIND-R51-008): Use saturating_add for consistency.
4321                                    s.elicitation_count = s.elicitation_count.saturating_add(1);
4322                                }
4323                            }
4324                            verdict
4325                        };
4326                        match elicitation_verdict {
4327                            vellaveto_mcp::elicitation::ElicitationVerdict::Allow => {
4328                                let action = Action::new(
4329                                    "vellaveto",
4330                                    "ws_forward_message",
4331                                    json!({
4332                                        "message_type": "elicitation_request",
4333                                        "session": session_id,
4334                                        "transport": "websocket",
4335                                        "direction": "client_to_upstream",
4336                                    }),
4337                                );
4338                                let elicitation_security_context =
4339                                    super::helpers::protocol_forward_channel_security_context(
4340                                        "ws_elicitation_forwarded",
4341                                        vellaveto_types::ContextChannel::ApprovalPrompt,
4342                                    );
4343                                let envelope = build_secondary_acis_envelope_with_security_context(
4344                                    &action,
4345                                    &Verdict::Allow,
4346                                    DecisionOrigin::PolicyEngine,
4347                                    "websocket",
4348                                    Some(&session_id),
4349                                    Some(&elicitation_security_context),
4350                                );
4351                                if let Err(e) = state
4352                                    .audit
4353                                    .log_entry_with_acis(
4354                                        &action,
4355                                        &Verdict::Allow,
4356                                        json!({
4357                                            "source": "ws_proxy",
4358                                            "event": "ws_elicitation_forwarded",
4359                                        }),
4360                                        envelope,
4361                                    )
4362                                    .await
4363                                {
4364                                    tracing::warn!("Failed to audit WS elicitation: {}", e);
4365                                }
4366
4367                                // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
4368                                let forward_text = if state.canonicalize {
4369                                    match serde_json::to_string(&parsed) {
4370                                        Ok(canonical) => canonical,
4371                                        Err(e) => {
4372                                            tracing::error!("SECURITY: WS elicitation canonicalization failed: {}", e);
4373                                            let error_resp = make_ws_error_response(
4374                                                Some(id),
4375                                                -32603,
4376                                                "Internal error",
4377                                            );
4378                                            let mut sink = client_sink.lock().await;
4379                                            let _ =
4380                                                sink.send(Message::Text(error_resp.into())).await;
4381                                            continue;
4382                                        }
4383                                    }
4384                                } else {
4385                                    text.to_string()
4386                                };
4387                                let mut sink = upstream_sink.lock().await;
4388                                if let Err(e) = sink
4389                                    .send(tokio_tungstenite::tungstenite::Message::Text(
4390                                        forward_text.into(),
4391                                    ))
4392                                    .await
4393                                {
4394                                    // Rollback pre-incremented count on forward failure
4395                                    if let Some(mut s) = state.sessions.get_mut(&session_id) {
4396                                        s.elicitation_count = s.elicitation_count.saturating_sub(1);
4397                                    }
4398                                    tracing::error!("Failed to forward elicitation: {}", e);
4399                                    break;
4400                                }
4401                            }
4402                            vellaveto_mcp::elicitation::ElicitationVerdict::Deny { reason } => {
4403                                tracing::warn!(
4404                                    session_id = %session_id,
4405                                    "Blocked WS elicitation/create: {}",
4406                                    reason
4407                                );
4408                                let action = Action::new(
4409                                    "vellaveto",
4410                                    "ws_elicitation_interception",
4411                                    json!({
4412                                        "method": "elicitation/create",
4413                                        "session": session_id,
4414                                        "transport": "websocket",
4415                                        "reason": &reason,
4416                                    }),
4417                                );
4418                                let verdict = Verdict::Deny {
4419                                    reason: reason.clone(),
4420                                };
4421                                let elicitation_security_context =
4422                                    super::helpers::elicitation_interception_security_context(
4423                                        &action,
4424                                    );
4425                                let envelope = build_secondary_acis_envelope_with_security_context(
4426                                    &action,
4427                                    &verdict,
4428                                    DecisionOrigin::PolicyEngine,
4429                                    "websocket",
4430                                    Some(&session_id),
4431                                    Some(&elicitation_security_context),
4432                                );
4433                                if let Err(e) = state
4434                                    .audit
4435                                    .log_entry_with_acis(
4436                                        &action,
4437                                        &verdict,
4438                                        json!({
4439                                            "source": "ws_proxy",
4440                                            "event": "ws_elicitation_interception",
4441                                        }),
4442                                        envelope,
4443                                    )
4444                                    .await
4445                                {
4446                                    tracing::warn!(
4447                                        "Failed to audit WS elicitation interception: {}",
4448                                        e
4449                                    );
4450                                }
4451                                // SECURITY (FIND-R46-012, FIND-R55-WS-006): Generic message to client.
4452                                let error =
4453                                    make_ws_error_response(Some(id), -32001, "Denied by policy");
4454                                let mut sink = client_sink.lock().await;
4455                                let _ = sink.send(Message::Text(error.into())).await;
4456                            }
4457                        }
4458                    }
4459                    MessageType::PassThrough | MessageType::ProgressNotification { .. } => {
4460                        // SECURITY (FIND-R76-003): DLP scan PassThrough params for secrets.
4461                        // Parity with HTTP handler (handlers.rs:1795-1859). Agents could
4462                        // exfiltrate secrets via prompts/get, completion/complete, or any
4463                        // PassThrough method's parameters.
4464                        // SECURITY (FIND-R97-001): Remove method gate — JSON-RPC responses
4465                        // (sampling/elicitation replies) have no `method` field but carry
4466                        // data in `result`. Parity with stdio proxy FIND-R96-001.
4467                        if state.response_dlp_enabled {
4468                            let mut dlp_findings = scan_notification_for_secrets(&parsed);
4469                            // SECURITY (FIND-R97-001): Also scan `result` field for responses.
4470                            if let Some(result_val) = parsed.get("result") {
4471                                dlp_findings.extend(scan_parameters_for_secrets(result_val));
4472                            }
4473                            // SECURITY (FIND-R83-006): Cap combined findings from params+result
4474                            // scans to maintain per-scan invariant (1000).
4475                            dlp_findings.truncate(1000);
4476                            if !dlp_findings.is_empty() {
4477                                for finding in &dlp_findings {
4478                                    record_dlp_finding(&finding.pattern_name);
4479                                }
4480                                let patterns: Vec<String> = dlp_findings
4481                                    .iter()
4482                                    .map(|f| format!("{}:{}", f.pattern_name, f.location))
4483                                    .collect();
4484                                tracing::warn!(
4485                                    "SECURITY: Secrets in WS passthrough params! Session: {}, Findings: {:?}",
4486                                    session_id,
4487                                    patterns
4488                                );
4489                                let n_action = Action::new(
4490                                    "vellaveto",
4491                                    "notification_dlp_scan",
4492                                    json!({
4493                                        "findings": patterns,
4494                                        "session": session_id,
4495                                        "transport": "websocket",
4496                                    }),
4497                                );
4498                                let verdict = if state.response_dlp_blocking {
4499                                    Verdict::Deny {
4500                                        reason: format!(
4501                                            "Notification blocked: secrets detected ({patterns:?})"
4502                                        ),
4503                                    }
4504                                } else {
4505                                    Verdict::Allow
4506                                };
4507                                let notification_security_context =
4508                                    super::helpers::notification_dlp_security_context(
4509                                        &parsed,
4510                                        state.response_dlp_blocking,
4511                                    );
4512                                let envelope = build_secondary_acis_envelope_with_security_context(
4513                                    &n_action,
4514                                    &verdict,
4515                                    DecisionOrigin::Dlp,
4516                                    "websocket",
4517                                    Some(&session_id),
4518                                    Some(&notification_security_context),
4519                                );
4520                                if let Err(e) = state
4521                                    .audit
4522                                    .log_entry_with_acis(
4523                                        &n_action,
4524                                        &verdict,
4525                                        json!({
4526                                            "source": "ws_proxy",
4527                                            "event": "notification_dlp_alert",
4528                                            "blocked": state.response_dlp_blocking,
4529                                        }),
4530                                        envelope,
4531                                    )
4532                                    .await
4533                                {
4534                                    tracing::warn!("Failed to audit WS passthrough DLP: {}", e);
4535                                }
4536                                if state.response_dlp_blocking {
4537                                    // Drop the message silently (passthrough has no id to respond to)
4538                                    continue;
4539                                }
4540                            }
4541                        }
4542
4543                        // SECURITY (FIND-R130-001): Injection scanning on PassThrough parameters.
4544                        // Parity with HTTP handler (handlers.rs FIND-R112-008) and gRPC handler
4545                        // (service.rs FIND-R113-001). An agent could inject prompt injection
4546                        // payloads via any PassThrough method's parameters.
4547                        if !state.injection_disabled {
4548                            let mut inj_parts = Vec::new();
4549                            if let Some(params) = parsed.get("params") {
4550                                extract_strings_recursive(params, &mut inj_parts, 0);
4551                            }
4552                            if let Some(result) = parsed.get("result") {
4553                                extract_strings_recursive(result, &mut inj_parts, 0);
4554                            }
4555                            let scannable = inj_parts.join("\n");
4556                            if !scannable.is_empty() {
4557                                let injection_matches: Vec<String> =
4558                                    if let Some(ref scanner) = state.injection_scanner {
4559                                        scanner
4560                                            .inspect(&scannable)
4561                                            .into_iter()
4562                                            .map(|s| s.to_string())
4563                                            .collect()
4564                                    } else {
4565                                        inspect_for_injection(&scannable)
4566                                            .into_iter()
4567                                            .map(|s| s.to_string())
4568                                            .collect()
4569                                    };
4570
4571                                if !injection_matches.is_empty() {
4572                                    tracing::warn!(
4573                                        "SECURITY: Injection in WS passthrough params! \
4574                                         Session: {}, Patterns: {:?}",
4575                                        session_id,
4576                                        injection_matches,
4577                                    );
4578
4579                                    let verdict = if state.injection_blocking {
4580                                        Verdict::Deny {
4581                                            reason: format!(
4582                                                "WS passthrough injection blocked: {injection_matches:?}"
4583                                            ),
4584                                        }
4585                                    } else {
4586                                        Verdict::Allow
4587                                    };
4588
4589                                    let inj_action = Action::new(
4590                                        "vellaveto",
4591                                        "ws_passthrough_injection_scan",
4592                                        json!({
4593                                            "matched_patterns": injection_matches,
4594                                            "session": session_id,
4595                                            "transport": "websocket",
4596                                            "direction": "client_to_upstream",
4597                                        }),
4598                                    );
4599                                    let injection_security_context =
4600                                        super::helpers::notification_injection_security_context(
4601                                            &parsed,
4602                                            state.injection_blocking,
4603                                            "passthrough_injection",
4604                                        );
4605                                    let envelope =
4606                                        build_secondary_acis_envelope_with_security_context(
4607                                            &inj_action,
4608                                            &verdict,
4609                                            DecisionOrigin::InjectionScanner,
4610                                            "websocket",
4611                                            Some(&session_id),
4612                                            Some(&injection_security_context),
4613                                        );
4614                                    if let Err(e) = state
4615                                        .audit
4616                                        .log_entry_with_acis(
4617                                            &inj_action,
4618                                            &verdict,
4619                                            json!({
4620                                                "source": "ws_proxy",
4621                                                "event": "ws_passthrough_injection_detected",
4622                                                "blocking": state.injection_blocking,
4623                                            }),
4624                                            envelope,
4625                                        )
4626                                        .await
4627                                    {
4628                                        tracing::warn!(
4629                                            "Failed to audit WS passthrough injection: {}",
4630                                            e
4631                                        );
4632                                    }
4633
4634                                    if state.injection_blocking {
4635                                        // Drop the message (passthrough has no id to respond to)
4636                                        continue;
4637                                    }
4638                                }
4639                            }
4640                        }
4641
4642                        // SECURITY (IMP-R182-009): Memory poisoning check — parity with
4643                        // tool calls, resource reads, tasks, and extension methods.
4644                        if let Some(mut session) = state.sessions.get_mut(&session_id) {
4645                            let params_to_scan = parsed.get("params").cloned().unwrap_or(json!({}));
4646                            // SECURITY (IMP-R184-010): Also scan `result` field — parity
4647                            // with DLP scan which scans both params and result.
4648                            let mut poisoning_matches =
4649                                session.memory_tracker.check_parameters(&params_to_scan);
4650                            if let Some(result_val) = parsed.get("result") {
4651                                poisoning_matches
4652                                    .extend(session.memory_tracker.check_parameters(result_val));
4653                            }
4654                            if !poisoning_matches.is_empty() {
4655                                let method_name = parsed
4656                                    .get("method")
4657                                    .and_then(|m| m.as_str())
4658                                    .unwrap_or("unknown");
4659                                for m in &poisoning_matches {
4660                                    tracing::warn!(
4661                                        "SECURITY: Memory poisoning in WS passthrough '{}' (session {}): \
4662                                         param '{}' replayed data (fingerprint: {})",
4663                                        method_name,
4664                                        session_id,
4665                                        m.param_location,
4666                                        m.fingerprint
4667                                    );
4668                                }
4669                                let poison_action = Action::new(
4670                                    "vellaveto",
4671                                    "ws_passthrough_memory_poisoning",
4672                                    json!({
4673                                        "method": method_name,
4674                                        "session": session_id,
4675                                        "matches": poisoning_matches.len(),
4676                                        "transport": "websocket",
4677                                    }),
4678                                );
4679                                let passthrough_poison_verdict = Verdict::Deny {
4680                                    reason: format!(
4681                                        "WS passthrough blocked: memory poisoning ({} matches)",
4682                                        poisoning_matches.len()
4683                                    ),
4684                                };
4685                                let poisoning_security_context =
4686                                    super::helpers::notification_memory_poisoning_security_context(
4687                                        &parsed,
4688                                        "memory_poisoning",
4689                                    );
4690                                let envelope = build_secondary_acis_envelope_with_security_context(
4691                                    &poison_action,
4692                                    &passthrough_poison_verdict,
4693                                    DecisionOrigin::MemoryPoisoning,
4694                                    "websocket",
4695                                    Some(&session_id),
4696                                    Some(&poisoning_security_context),
4697                                );
4698                                if let Err(e) = state
4699                                    .audit
4700                                    .log_entry_with_acis(
4701                                        &poison_action,
4702                                        &passthrough_poison_verdict,
4703                                        json!({
4704                                            "source": "ws_proxy",
4705                                            "event": "ws_passthrough_memory_poisoning",
4706                                        }),
4707                                        envelope,
4708                                    )
4709                                    .await
4710                                {
4711                                    tracing::warn!(
4712                                        "Failed to audit WS passthrough memory poisoning: {}",
4713                                        e
4714                                    );
4715                                }
4716                                continue; // Drop the message
4717                            }
4718                            // Fingerprint for future poisoning detection.
4719                            session.memory_tracker.extract_from_value(&params_to_scan);
4720                            if let Some(result_val) = parsed.get("result") {
4721                                session.memory_tracker.extract_from_value(result_val);
4722                            }
4723                        } else {
4724                            // IMP-R186-005: Log when session is missing so the skip is observable.
4725                            tracing::warn!(
4726                                "Session {} not found for WS passthrough memory poisoning check",
4727                                session_id
4728                            );
4729                        }
4730
4731                        // SECURITY (FIND-R46-WS-004): Audit log forwarded passthrough/notification messages
4732                        let msg_type = match &classified {
4733                            MessageType::ProgressNotification { .. } => "progress_notification",
4734                            _ => "passthrough",
4735                        };
4736                        let action = Action::new(
4737                            "vellaveto",
4738                            "ws_forward_message",
4739                            json!({
4740                                "message_type": msg_type,
4741                                "session": session_id,
4742                                "transport": "websocket",
4743                                "direction": "client_to_upstream",
4744                            }),
4745                        );
4746                        let protocol_security_context =
4747                            super::helpers::protocol_forward_security_context(
4748                                "ws_message_forwarded",
4749                            );
4750                        let envelope = build_secondary_acis_envelope_with_security_context(
4751                            &action,
4752                            &Verdict::Allow,
4753                            DecisionOrigin::PolicyEngine,
4754                            "websocket",
4755                            Some(&session_id),
4756                            Some(&protocol_security_context),
4757                        );
4758                        if let Err(e) = state
4759                            .audit
4760                            .log_entry_with_acis(
4761                                &action,
4762                                &Verdict::Allow,
4763                                json!({
4764                                    "source": "ws_proxy",
4765                                    "event": "ws_message_forwarded",
4766                                }),
4767                                envelope,
4768                            )
4769                            .await
4770                        {
4771                            tracing::warn!("Failed to audit WS passthrough: {}", e);
4772                        }
4773
4774                        // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
4775                        let forward_text = if state.canonicalize {
4776                            match serde_json::to_string(&parsed) {
4777                                Ok(canonical) => canonical,
4778                                Err(e) => {
4779                                    tracing::error!(
4780                                        "SECURITY: WS passthrough canonicalization failed: {}",
4781                                        e
4782                                    );
4783                                    continue;
4784                                }
4785                            }
4786                        } else {
4787                            text.to_string()
4788                        };
4789                        let mut sink = upstream_sink.lock().await;
4790                        if let Err(e) = sink
4791                            .send(tokio_tungstenite::tungstenite::Message::Text(
4792                                forward_text.into(),
4793                            ))
4794                            .await
4795                        {
4796                            tracing::error!("Failed to forward passthrough: {}", e);
4797                            break;
4798                        }
4799                    }
4800                }
4801            }
4802            Message::Binary(_data) => {
4803                // SECURITY: Binary frames not allowed for JSON-RPC
4804                tracing::warn!(
4805                    session_id = %session_id,
4806                    "Binary WebSocket frame rejected (JSON-RPC is text-only)"
4807                );
4808
4809                // SECURITY (FIND-R46-WS-004): Audit log binary frame rejection
4810                let action = Action::new(
4811                    "vellaveto",
4812                    "ws_binary_frame_rejected",
4813                    json!({
4814                        "session": session_id,
4815                        "transport": "websocket",
4816                        "direction": "client_to_upstream",
4817                    }),
4818                );
4819                let binary_verdict = Verdict::Deny {
4820                    reason: "Binary frames not supported for JSON-RPC".to_string(),
4821                };
4822                let binary_security_context =
4823                    super::helpers::protocol_binary_rejection_security_context(
4824                        "ws_binary_frame_rejected",
4825                    );
4826                let envelope = build_secondary_acis_envelope_with_security_context(
4827                    &action,
4828                    &binary_verdict,
4829                    DecisionOrigin::PolicyEngine,
4830                    "websocket",
4831                    Some(&session_id),
4832                    Some(&binary_security_context),
4833                );
4834                if let Err(e) = state
4835                    .audit
4836                    .log_entry_with_acis(
4837                        &action,
4838                        &binary_verdict,
4839                        json!({
4840                            "source": "ws_proxy",
4841                            "event": "ws_binary_frame_rejected",
4842                        }),
4843                        envelope,
4844                    )
4845                    .await
4846                {
4847                    tracing::warn!("Failed to audit WS binary frame rejection: {}", e);
4848                }
4849
4850                let mut sink = client_sink.lock().await;
4851                let _ = sink
4852                    .send(Message::Close(Some(CloseFrame {
4853                        code: CLOSE_UNSUPPORTED_DATA,
4854                        reason: "Binary frames not supported".into(),
4855                    })))
4856                    .await;
4857                break;
4858            }
4859            Message::Ping(data) => {
4860                let mut sink = client_sink.lock().await;
4861                let _ = sink.send(Message::Pong(data)).await;
4862            }
4863            Message::Pong(_) => {
4864                // Ignored
4865            }
4866            Message::Close(_) => {
4867                tracing::debug!(session_id = %session_id, "Client sent close frame");
4868                break;
4869            }
4870        }
4871    }
4872}
4873
4874/// Relay messages from upstream to client, dropping those over the upstream rate limit and applying DLP and injection scanning.
4875#[allow(clippy::too_many_arguments)]
4876async fn relay_upstream_to_client(
4877    mut upstream_stream: futures_util::stream::SplitStream<
4878        tokio_tungstenite::WebSocketStream<
4879            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
4880        >,
4881    >,
4882    client_sink: Arc<Mutex<futures_util::stream::SplitSink<WebSocket, Message>>>,
4883    state: ProxyState,
4884    session_id: String,
4885    ws_config: WebSocketConfig,
4886    upstream_rate_counter: Arc<AtomicU64>,
4887    upstream_rate_window_start: Arc<std::sync::Mutex<std::time::Instant>>,
4888    last_activity: Arc<AtomicU64>,
4889    connection_epoch: std::time::Instant,
4890) {
4891    while let Some(msg_result) = upstream_stream.next().await {
4892        let msg = match msg_result {
4893            Ok(m) => m,
4894            Err(e) => {
4895                tracing::debug!(session_id = %session_id, "Upstream WS error: {}", e);
4896                break;
4897            }
4898        };
4899
4900        // SECURITY (FIND-R182-001): Update last-activity for true idle detection.
4901        // SECURITY (R251-WS-1): SeqCst ensures visibility to timeout checker thread.
4902        last_activity.store(
4903            connection_epoch.elapsed().as_millis() as u64,
4904            Ordering::SeqCst,
4905        );
4906
4907        record_ws_message("upstream_to_client");
4908
4909        // SECURITY (FIND-R46-WS-003): Rate limiting on upstream→client direction.
4910        // A malicious or compromised upstream could flood the client with messages.
4911        if !check_rate_limit(
4912            &upstream_rate_counter,
4913            &upstream_rate_window_start,
4914            ws_config.upstream_rate_limit,
4915        ) {
4916            tracing::warn!(
4917                session_id = %session_id,
4918                "WebSocket upstream rate limit exceeded ({}/s), dropping message",
4919                ws_config.upstream_rate_limit,
4920            );
4921
4922            let action = Action::new(
4923                "vellaveto",
4924                "ws_upstream_rate_limit",
4925                json!({
4926                    "session": session_id,
4927                    "transport": "websocket",
4928                    "direction": "upstream_to_client",
4929                    "limit": ws_config.upstream_rate_limit,
4930                }),
4931            );
4932            let rate_verdict = Verdict::Deny {
4933                reason: "Upstream rate limit exceeded".to_string(),
4934            };
4935            let rate_limit_security_context =
4936                super::helpers::protocol_rate_limit_security_context("ws_upstream_rate_limit");
4937            let envelope = build_secondary_acis_envelope_with_security_context(
4938                &action,
4939                &rate_verdict,
4940                DecisionOrigin::RateLimiter,
4941                "websocket",
4942                Some(&session_id),
4943                Some(&rate_limit_security_context),
4944            );
4945            if let Err(e) = state
4946                .audit
4947                .log_entry_with_acis(
4948                    &action,
4949                    &rate_verdict,
4950                    json!({
4951                        "source": "ws_proxy",
4952                        "event": "ws_upstream_rate_limit_exceeded",
4953                    }),
4954                    envelope,
4955                )
4956                .await
4957            {
4958                tracing::warn!("Failed to audit WS upstream rate limit: {}", e);
4959            }
4960
4961            metrics::counter!(
4962                "vellaveto_ws_upstream_rate_limited_total",
4963                "session" => session_id.clone()
4964            )
4965            .increment(1);
4966
4967            // Drop the message (don't close the connection — upstream flood should not
4968            // disconnect the client, just throttle the flow)
4969            continue;
4970        }
4971
4972        match msg {
4973            tokio_tungstenite::tungstenite::Message::Text(text) => {
4974                // Try to parse for scanning
4975                let forward = if let Ok(json_val) = serde_json::from_str::<Value>(&text) {
4976                    // Resolve tracked tool context for response-side schema checks.
4977                    let tracked_tool_name =
4978                        take_tracked_tool_call(&state.sessions, &session_id, json_val.get("id"));
4979
4980                    // SECURITY (FIND-R75-003): Track whether DLP or injection was detected
4981                    // (even in log-only mode) to gate memory_tracker.record_response().
4982                    // Recording fingerprints from tainted responses would poison the tracker.
4983                    let mut dlp_found = false;
4984                    let mut injection_found = false;
4985
4986                    // DLP scanning on responses
4987                    if state.response_dlp_enabled {
4988                        let dlp_findings = scan_response_for_secrets(&json_val);
4989                        if !dlp_findings.is_empty() {
4990                            dlp_found = true;
4991                            for finding in &dlp_findings {
4992                                record_dlp_finding(&finding.pattern_name);
4993                            }
4994
4995                            let patterns: Vec<String> = dlp_findings
4996                                .iter()
4997                                .map(|f| format!("{}:{}", f.pattern_name, f.location))
4998                                .collect();
4999
5000                            tracing::warn!(
5001                                "SECURITY: Secrets in WS response! Session: {}, Findings: {:?}",
5002                                session_id,
5003                                patterns,
5004                            );
5005
5006                            let verdict = if state.response_dlp_blocking {
5007                                Verdict::Deny {
5008                                    reason: format!("WS response DLP blocked: {patterns:?}"),
5009                                }
5010                            } else {
5011                                Verdict::Allow
5012                            };
5013
5014                            let action = Action::new(
5015                                "vellaveto",
5016                                "ws_response_dlp_scan",
5017                                json!({
5018                                    "findings": patterns,
5019                                    "session": session_id,
5020                                    "transport": "websocket",
5021                                }),
5022                            );
5023                            let dlp_security_context =
5024                                super::helpers::response_dlp_security_context(
5025                                    tracked_tool_name.as_deref(),
5026                                    &json_val,
5027                                    state.response_dlp_blocking,
5028                                );
5029                            let envelope = build_secondary_acis_envelope_with_security_context(
5030                                &action,
5031                                &verdict,
5032                                DecisionOrigin::Dlp,
5033                                "websocket",
5034                                Some(&session_id),
5035                                Some(&dlp_security_context),
5036                            );
5037                            if let Err(e) = state
5038                                .audit
5039                                .log_entry_with_acis(
5040                                    &action,
5041                                    &verdict,
5042                                    json!({
5043                                        "source": "ws_proxy",
5044                                        "event": "ws_response_dlp_alert",
5045                                    }),
5046                                    envelope,
5047                                )
5048                                .await
5049                            {
5050                                tracing::warn!("Failed to audit WS DLP: {}", e);
5051                            }
5052
5053                            if state.response_dlp_blocking {
5054                                // Send error response instead
5055                                let id = json_val.get("id");
5056                                let error = make_ws_error_response(
5057                                    id,
5058                                    -32001,
5059                                    "Response blocked by DLP policy",
5060                                );
5061                                let mut sink = client_sink.lock().await;
5062                                let _ = sink.send(Message::Text(error.into())).await;
5063                                continue;
5064                            }
5065                        }
5066                    }
5067
5068                    // Injection scanning
5069                    if !state.injection_disabled {
5070                        let text_to_scan = extract_scannable_text(&json_val);
5071                        if !text_to_scan.is_empty() {
5072                            let injection_matches: Vec<String> =
5073                                if let Some(ref scanner) = state.injection_scanner {
5074                                    scanner
5075                                        .inspect(&text_to_scan)
5076                                        .into_iter()
5077                                        .map(|s| s.to_string())
5078                                        .collect()
5079                                } else {
5080                                    inspect_for_injection(&text_to_scan)
5081                                        .into_iter()
5082                                        .map(|s| s.to_string())
5083                                        .collect()
5084                                };
5085
5086                            if !injection_matches.is_empty() {
5087                                injection_found = true;
5088                                tracing::warn!(
5089                                    "SECURITY: Injection in WS response! Session: {}, Patterns: {:?}",
5090                                    session_id,
5091                                    injection_matches,
5092                                );
5093
5094                                let verdict = if state.injection_blocking {
5095                                    Verdict::Deny {
5096                                        reason: format!(
5097                                            "WS response injection blocked: {injection_matches:?}"
5098                                        ),
5099                                    }
5100                                } else {
5101                                    Verdict::Allow
5102                                };
5103
5104                                let action = Action::new(
5105                                    "vellaveto",
5106                                    "ws_response_injection",
5107                                    json!({
5108                                        "matched_patterns": injection_matches,
5109                                        "session": session_id,
5110                                        "transport": "websocket",
5111                                    }),
5112                                );
5113                                let injection_security_context =
5114                                    super::helpers::response_injection_security_context(
5115                                        tracked_tool_name.as_deref(),
5116                                        &json_val,
5117                                        state.injection_blocking,
5118                                        "response_injection",
5119                                    );
5120                                let envelope = build_secondary_acis_envelope_with_security_context(
5121                                    &action,
5122                                    &verdict,
5123                                    DecisionOrigin::InjectionScanner,
5124                                    "websocket",
5125                                    Some(&session_id),
5126                                    Some(&injection_security_context),
5127                                );
5128                                if let Err(e) = state
5129                                    .audit
5130                                    .log_entry_with_acis(
5131                                        &action,
5132                                        &verdict,
5133                                        json!({
5134                                            "source": "ws_proxy",
5135                                            "event": "ws_injection_detected",
5136                                        }),
5137                                        envelope,
5138                                    )
5139                                    .await
5140                                {
5141                                    tracing::warn!("Failed to audit WS injection: {}", e);
5142                                }
5143
5144                                if state.injection_blocking {
5145                                    let id = json_val.get("id");
5146                                    let error = make_ws_error_response(
5147                                        id,
5148                                        -32001,
5149                                        "Response blocked: injection detected",
5150                                    );
5151                                    let mut sink = client_sink.lock().await;
5152                                    let _ = sink.send(Message::Text(error.into())).await;
5153                                    continue;
5154                                }
5155                            }
5156                        }
5157                    }
5158
5159                    // SECURITY (FIND-R46-007): Rug-pull detection on tools/list responses.
5160                    // Check if this is a response to a tools/list request and extract
5161                    // annotations for rug-pull detection.
5162                    if json_val.get("result").is_some() {
5163                        // Check if result contains "tools" array (tools/list response)
5164                        if json_val
5165                            .get("result")
5166                            .and_then(|r| r.get("tools"))
5167                            .and_then(|t| t.as_array())
5168                            .is_some()
5169                        {
5170                            super::helpers::extract_annotations_from_response(
5171                                &json_val,
5172                                &session_id,
5173                                &state.sessions,
5174                                &state.audit,
5175                                &state.known_tools,
5176                            )
5177                            .await;
5178
5179                            // Verify manifest if configured
5180                            if let Some(ref manifest_config) = state.manifest_config {
5181                                super::helpers::verify_manifest_from_response(
5182                                    &json_val,
5183                                    &session_id,
5184                                    &state.sessions,
5185                                    manifest_config,
5186                                    &state.audit,
5187                                )
5188                                .await;
5189                            }
5190
5191                            // SECURITY (FIND-R130-003): Scan tool descriptions for embedded
5192                            // injection. Parity with HTTP upstream handler (upstream.rs:648-698).
5193                            if !state.injection_disabled {
5194                                let desc_findings =
5195                                    if let Some(ref scanner) = state.injection_scanner {
5196                                        scan_tool_descriptions_with_scanner(&json_val, scanner)
5197                                    } else {
5198                                        scan_tool_descriptions(&json_val)
5199                                    };
5200                                for finding in &desc_findings {
5201                                    injection_found = true;
5202                                    tracing::warn!(
5203                                        "SECURITY: Injection in tool '{}' description! \
5204                                         Session: {}, Patterns: {:?}",
5205                                        finding.tool_name,
5206                                        session_id,
5207                                        finding.matched_patterns
5208                                    );
5209                                    let action = Action::new(
5210                                        "vellaveto",
5211                                        "tool_description_injection",
5212                                        json!({
5213                                            "tool": finding.tool_name,
5214                                            "matched_patterns": finding.matched_patterns,
5215                                            "session": session_id,
5216                                            "transport": "websocket",
5217                                            "blocking": state.injection_blocking,
5218                                        }),
5219                                    );
5220                                    let tool_desc_verdict = Verdict::Deny {
5221                                        reason: format!(
5222                                            "Tool '{}' description contains injection: {:?}",
5223                                            finding.tool_name, finding.matched_patterns
5224                                        ),
5225                                    };
5226                                    let desc_security_context =
5227                                        super::helpers::tool_discovery_integrity_security_context(
5228                                            &finding.tool_name,
5229                                            vellaveto_types::ContextChannel::CommandLike,
5230                                            "tool_description_injection",
5231                                            true,
5232                                        );
5233                                    let envelope =
5234                                        build_secondary_acis_envelope_with_security_context(
5235                                            &action,
5236                                            &tool_desc_verdict,
5237                                            DecisionOrigin::InjectionScanner,
5238                                            "websocket",
5239                                            Some(&session_id),
5240                                            Some(&desc_security_context),
5241                                        );
5242                                    if let Err(e) = state
5243                                        .audit
5244                                        .log_entry_with_acis(
5245                                            &action,
5246                                            &tool_desc_verdict,
5247                                            json!({
5248                                                "source": "ws_proxy",
5249                                                "event": "tool_description_injection",
5250                                            }),
5251                                            envelope,
5252                                        )
5253                                        .await
5254                                    {
5255                                        tracing::warn!(
5256                                            "Failed to audit WS tool description injection: {}",
5257                                            e
5258                                        );
5259                                    }
5260                                }
5261                                if !desc_findings.is_empty() && state.injection_blocking {
5262                                    let id = json_val.get("id");
5263                                    let error = make_ws_error_response(
5264                                        id,
5265                                        -32001,
5266                                        "Response blocked: suspicious content in tool descriptions",
5267                                    );
5268                                    let mut sink = client_sink.lock().await;
5269                                    let _ = sink.send(Message::Text(error.into())).await;
5270                                    continue;
5271                                }
5272                            }
5273                        }
5274                    }
5275
5276                    // SECURITY: Enforce output schema on WS structuredContent.
5277                    // SECURITY (FIND-R154-004): Track schema violations for the
5278                    // record_response guard below, even in non-blocking mode.
5279                    let schema_violation_found = validate_ws_structured_content_response(
5280                        &json_val,
5281                        &state,
5282                        &session_id,
5283                        tracked_tool_name.as_deref(),
5284                    )
5285                    .await;
5286                    if schema_violation_found {
5287                        let id = json_val.get("id");
5288                        let error = make_ws_error_response(
5289                            id,
5290                            -32001,
5291                            "Response blocked: output schema validation failed",
5292                        );
5293                        let mut sink = client_sink.lock().await;
5294                        let _ = sink.send(Message::Text(error.into())).await;
5295                        continue;
5296                    }
5297
5298                    // SECURITY (FIND-R75-003, FIND-R154-004): Record response
5299                    // fingerprints for memory poisoning detection. Skip recording
5300                    // when DLP, injection, or schema violation was detected (even
5301                    // in log-only mode) to avoid poisoning the tracker with tainted
5302                    // data. Parity with stdio relay (relay.rs:2919).
5303                    if !dlp_found && !injection_found && !schema_violation_found {
5304                        if let Some(mut session) = state.sessions.get_mut(&session_id) {
5305                            session.memory_tracker.record_response(&json_val);
5306                        }
5307                    }
5308
5309                    // SECURITY (FIND-R46-WS-004): Audit log forwarded upstream→client text messages
5310                    {
5311                        let msg_type = if json_val.get("result").is_some() {
5312                            "response"
5313                        } else if json_val.get("error").is_some() {
5314                            "error_response"
5315                        } else if json_val.get("method").is_some() {
5316                            "notification"
5317                        } else {
5318                            "unknown"
5319                        };
5320                        let action = Action::new(
5321                            "vellaveto",
5322                            "ws_forward_upstream_message",
5323                            json!({
5324                                "message_type": msg_type,
5325                                "session": session_id,
5326                                "transport": "websocket",
5327                                "direction": "upstream_to_client",
5328                            }),
5329                        );
5330                        let message_security_context =
5331                            super::helpers::protocol_message_forward_security_context(
5332                                &json_val,
5333                                "ws_upstream_message_forwarded",
5334                            );
5335                        let envelope = build_secondary_acis_envelope_with_security_context(
5336                            &action,
5337                            &Verdict::Allow,
5338                            DecisionOrigin::PolicyEngine,
5339                            "websocket",
5340                            Some(&session_id),
5341                            Some(&message_security_context),
5342                        );
5343                        if let Err(e) = state
5344                            .audit
5345                            .log_entry_with_acis(
5346                                &action,
5347                                &Verdict::Allow,
5348                                json!({
5349                                    "source": "ws_proxy",
5350                                    "event": "ws_upstream_message_forwarded",
5351                                }),
5352                                envelope,
5353                            )
5354                            .await
5355                        {
5356                            tracing::warn!("Failed to audit WS upstream message forward: {}", e);
5357                        }
5358                    }
5359
5360                    // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
5361                    if state.canonicalize {
5362                        match serde_json::to_string(&json_val) {
5363                            Ok(canonical) => canonical,
5364                            Err(e) => {
5365                                tracing::error!(
5366                                    "SECURITY: WS response canonicalization failed: {}",
5367                                    e
5368                                );
5369                                continue;
5370                            }
5371                        }
5372                    } else {
5373                        text.to_string()
5374                    }
5375                } else {
5376                    // SECURITY (FIND-R166-001): Non-JSON upstream text must still be
5377                    // scanned for DLP/injection before forwarding. A malicious upstream
5378                    // could exfiltrate secrets or inject payloads via non-JSON frames.
5379                    let text_str: &str = &text;
5380                    // SECURITY (FIND-R168-001): DLP scan with audit logging parity.
5381                    if state.response_dlp_enabled {
5382                        let findings = scan_text_for_secrets(text_str, "ws.upstream.non_json_text");
5383                        if !findings.is_empty() {
5384                            let patterns: Vec<String> = findings
5385                                .iter()
5386                                .map(|f| format!("{}:{}", f.pattern_name, f.location))
5387                                .collect();
5388                            tracing::warn!(
5389                                session_id = %session_id,
5390                                "DLP: non-JSON upstream text contains sensitive data ({} findings)",
5391                                findings.len(),
5392                            );
5393                            let verdict = if state.response_dlp_blocking {
5394                                Verdict::Deny {
5395                                    reason: format!("WS non-JSON DLP: {patterns:?}"),
5396                                }
5397                            } else {
5398                                Verdict::Allow
5399                            };
5400                            let action = Action::new(
5401                                "vellaveto",
5402                                "ws_nonjson_dlp_scan",
5403                                json!({ "findings": patterns, "session": session_id, "transport": "websocket" }),
5404                            );
5405                            // SECURITY (SE-004): Log audit failures instead of silently discarding.
5406                            let text_security_context = super::helpers::text_dlp_security_context(
5407                                text_str,
5408                                state.response_dlp_blocking,
5409                                "ws_nonjson_dlp",
5410                            );
5411                            let envelope = build_secondary_acis_envelope_with_security_context(
5412                                &action,
5413                                &verdict,
5414                                DecisionOrigin::Dlp,
5415                                "websocket",
5416                                Some(&session_id),
5417                                Some(&text_security_context),
5418                            );
5419                            if let Err(e) = state.audit.log_entry_with_acis(
5420                                &action, &verdict,
5421                                json!({ "source": "ws_proxy", "event": "ws_nonjson_dlp_alert" }),
5422                                envelope,
5423                            ).await {
5424                                tracing::error!(
5425                                    session_id = %session_id,
5426                                    error = %e,
5427                                    "AUDIT FAILURE: failed to log ws_nonjson_dlp_alert"
5428                                );
5429                            }
5430                            if state.response_dlp_blocking {
5431                                continue;
5432                            }
5433                        }
5434                    }
5435                    // SECURITY (FIND-R168-002): Injection scan with log-only mode
5436                    // parity. Always log detections; only block when injection_blocking.
5437                    {
5438                        let alerts: Vec<String> = if let Some(ref scanner) = state.injection_scanner
5439                        {
5440                            scanner
5441                                .inspect(text_str)
5442                                .into_iter()
5443                                .map(|s| s.to_string())
5444                                .collect()
5445                        } else {
5446                            inspect_for_injection(text_str)
5447                                .into_iter()
5448                                .map(|s| s.to_string())
5449                                .collect()
5450                        };
5451                        if !alerts.is_empty() {
5452                            tracing::warn!(
5453                                session_id = %session_id,
5454                                "Injection: non-JSON upstream text ({} alerts), blocking={}",
5455                                alerts.len(), state.injection_blocking,
5456                            );
5457                            let verdict = if state.injection_blocking {
5458                                Verdict::Deny {
5459                                    reason: format!(
5460                                        "WS non-JSON injection: {} alerts",
5461                                        alerts.len()
5462                                    ),
5463                                }
5464                            } else {
5465                                Verdict::Allow
5466                            };
5467                            let action = Action::new(
5468                                "vellaveto",
5469                                "ws_nonjson_injection_scan",
5470                                json!({ "alerts": alerts.len(), "session": session_id, "transport": "websocket" }),
5471                            );
5472                            // SECURITY (SE-004): Log audit failures instead of silently discarding.
5473                            let text_security_context =
5474                                super::helpers::text_injection_security_context(
5475                                    text_str,
5476                                    state.injection_blocking,
5477                                    "ws_nonjson_injection",
5478                                );
5479                            let envelope = build_secondary_acis_envelope_with_security_context(
5480                                &action,
5481                                &verdict,
5482                                DecisionOrigin::InjectionScanner,
5483                                "websocket",
5484                                Some(&session_id),
5485                                Some(&text_security_context),
5486                            );
5487                            if let Err(e) = state.audit.log_entry_with_acis(
5488                                &action, &verdict,
5489                                json!({ "source": "ws_proxy", "event": "ws_nonjson_injection_alert" }),
5490                                envelope,
5491                            ).await {
5492                                tracing::error!(
5493                                    session_id = %session_id,
5494                                    error = %e,
5495                                    "AUDIT FAILURE: failed to log ws_nonjson_injection_alert"
5496                                );
5497                            }
5498                            if state.injection_blocking {
5499                                continue;
5500                            }
5501                        }
5502                    }
5503                    text.to_string()
5504                };
5505
5506                let mut sink = client_sink.lock().await;
5507                if let Err(e) = sink.send(Message::Text(forward.into())).await {
5508                    tracing::debug!("Failed to send to client: {}", e);
5509                    break;
5510                }
5511            }
5512            tokio_tungstenite::tungstenite::Message::Binary(data) => {
5513                // SECURITY (FIND-R46-WS-002): DLP scanning on upstream binary frames.
5514                // Binary from upstream is unusual for JSON-RPC but must be scanned
5515                // before being dropped, to detect and audit secret exfiltration attempts
5516                // via binary frames.
5517                tracing::warn!(
5518                    session_id = %session_id,
5519                    "Unexpected binary frame from upstream ({} bytes), scanning before drop",
5520                    data.len(),
5521                );
5522
5523                // DLP scan the binary data as UTF-8 lossy
5524                if state.response_dlp_enabled {
5525                    let text_repr = String::from_utf8_lossy(&data);
5526                    if !text_repr.is_empty() {
5527                        let dlp_findings = scan_text_for_secrets(&text_repr, "ws_binary_frame");
5528                        if !dlp_findings.is_empty() {
5529                            for finding in &dlp_findings {
5530                                record_dlp_finding(&finding.pattern_name);
5531                            }
5532                            let patterns: Vec<String> = dlp_findings
5533                                .iter()
5534                                .map(|f| format!("{}:{}", f.pattern_name, f.location))
5535                                .collect();
5536
5537                            tracing::warn!(
5538                                "SECURITY: Secrets in WS upstream binary frame! Session: {}, Findings: {:?}",
5539                                session_id,
5540                                patterns,
5541                            );
5542
5543                            let action = Action::new(
5544                                "vellaveto",
5545                                "ws_binary_dlp_scan",
5546                                json!({
5547                                    "findings": patterns,
5548                                    "session": session_id,
5549                                    "transport": "websocket",
5550                                    "direction": "upstream_to_client",
5551                                    "binary_size": data.len(),
5552                                }),
5553                            );
5554                            let binary_dlp_verdict = Verdict::Deny {
5555                                reason: format!("WS binary frame DLP: {patterns:?}"),
5556                            };
5557                            let text_security_context = super::helpers::text_dlp_security_context(
5558                                &text_repr,
5559                                true,
5560                                "ws_binary_dlp",
5561                            );
5562                            let envelope = build_secondary_acis_envelope_with_security_context(
5563                                &action,
5564                                &binary_dlp_verdict,
5565                                DecisionOrigin::Dlp,
5566                                "websocket",
5567                                Some(&session_id),
5568                                Some(&text_security_context),
5569                            );
5570                            if let Err(e) = state
5571                                .audit
5572                                .log_entry_with_acis(
5573                                    &action,
5574                                    &binary_dlp_verdict,
5575                                    json!({
5576                                        "source": "ws_proxy",
5577                                        "event": "ws_binary_dlp_alert",
5578                                    }),
5579                                    envelope,
5580                                )
5581                                .await
5582                            {
5583                                tracing::warn!("Failed to audit WS binary DLP: {}", e);
5584                            }
5585                        }
5586                    }
5587                }
5588
5589                // SECURITY (FIND-R46-WS-004): Audit log binary frame drop
5590                let action = Action::new(
5591                    "vellaveto",
5592                    "ws_upstream_binary_dropped",
5593                    json!({
5594                        "session": session_id,
5595                        "transport": "websocket",
5596                        "direction": "upstream_to_client",
5597                        "binary_size": data.len(),
5598                    }),
5599                );
5600                let upstream_binary_verdict = Verdict::Deny {
5601                    reason: "Binary frames not supported for JSON-RPC".to_string(),
5602                };
5603                let upstream_binary_security_context =
5604                    super::helpers::protocol_binary_rejection_security_context(
5605                        "ws_upstream_binary_dropped",
5606                    );
5607                let envelope = build_secondary_acis_envelope_with_security_context(
5608                    &action,
5609                    &upstream_binary_verdict,
5610                    DecisionOrigin::PolicyEngine,
5611                    "websocket",
5612                    Some(&session_id),
5613                    Some(&upstream_binary_security_context),
5614                );
5615                if let Err(e) = state
5616                    .audit
5617                    .log_entry_with_acis(
5618                        &action,
5619                        &upstream_binary_verdict,
5620                        json!({
5621                            "source": "ws_proxy",
5622                            "event": "ws_upstream_binary_dropped",
5623                        }),
5624                        envelope,
5625                    )
5626                    .await
5627                {
5628                    tracing::warn!("Failed to audit WS upstream binary drop: {}", e);
5629                }
5630            }
5631            tokio_tungstenite::tungstenite::Message::Ping(data) => {
5632                // Forward ping as pong to upstream (handled by tungstenite)
5633                let _ = data; // tungstenite auto-responds to pings
5634            }
5635            tokio_tungstenite::tungstenite::Message::Pong(_) => {}
5636            tokio_tungstenite::tungstenite::Message::Close(_) => {
5637                tracing::debug!(session_id = %session_id, "Upstream sent close frame");
5638                break;
5639            }
5640            tokio_tungstenite::tungstenite::Message::Frame(_) => {
5641                // Raw frame — ignore
5642            }
5643        }
5644    }
5645}
5646
5647/// Convert an HTTP URL to a WebSocket URL.
5648///
5649/// `http://` → `ws://`, `https://` → `wss://`.
5650///
5651/// SECURITY (FIND-R124-001): Only allows http/https/ws/wss schemes.
5652/// Unknown schemes (ftp://, file://, gopher://) are rejected with a
5653/// warning and fall back to the original URL prefixed with `ws://`
5654/// to maintain fail-closed behavior. This gives parity with scheme
5655/// validation in HTTP and gRPC transports (FIND-R42-015).
5656pub fn convert_to_ws_url(http_url: &str) -> String {
5657    if let Some(rest) = http_url.strip_prefix("https://") {
5658        format!("wss://{rest}")
5659    } else if let Some(rest) = http_url.strip_prefix("http://") {
5660        format!("ws://{rest}")
5661    } else if http_url.starts_with("wss://") || http_url.starts_with("ws://") {
5662        // Already a WebSocket URL — use as-is
5663        http_url.to_string()
5664    } else {
5665        // SECURITY (FIND-R124-001): Reject unknown schemes. Log warning
5666        // and return a URL that will fail to connect safely rather than
5667        // connecting to an unintended scheme (e.g., ftp://, file://).
5668        // SECURITY (FIND-R166-003): Sanitize logged value to prevent log injection
5669        // from URLs with control characters (possible in gateway mode).
5670        tracing::warn!(
5671            "convert_to_ws_url: rejecting URL with unsupported scheme: {}",
5672            vellaveto_types::sanitize_for_log(
5673                http_url.split("://").next().unwrap_or("unknown"),
5674                128,
5675            )
5676        );
5677        // Return invalid URL that will fail at connect_async()
5678        format!("ws://invalid-scheme-rejected.localhost/{}", http_url.len())
5679    }
5680}
5681
5682/// Connect to an upstream WebSocket server.
5683///
5684/// Returns the split WebSocket stream or an error.
5685async fn connect_upstream_ws(
5686    url: &str,
5687) -> Result<
5688    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
5689    String,
5690> {
5691    let connect_timeout = Duration::from_secs(10);
5692    match tokio::time::timeout(connect_timeout, tokio_tungstenite::connect_async(url)).await {
5693        Ok(Ok((ws_stream, _response))) => Ok(ws_stream),
5694        Ok(Err(e)) => Err(format!("WebSocket connection error: {e}")),
5695        Err(_) => Err("WebSocket connection timeout (10s)".to_string()),
5696    }
5697}
5698
5699/// Register output schemas and validate WS response `structuredContent`.
5700///
5701/// Returns true when the response should be blocked.
5702async fn validate_ws_structured_content_response(
5703    json_val: &Value,
5704    state: &ProxyState,
5705    session_id: &str,
5706    tracked_tool_name: Option<&str>,
5707) -> bool {
5708    // Keep WS behavior aligned with HTTP/SSE paths.
5709    state
5710        .output_schema_registry
5711        .register_from_tools_list(json_val);
5712
5713    let Some(result) = json_val.get("result") else {
5714        return false;
5715    };
5716    let Some(structured) = result.get("structuredContent") else {
5717        return false;
5718    };
5719
5720    let meta_tool_name = result
5721        .get("_meta")
5722        .and_then(|m| m.get("tool"))
5723        .and_then(|t| t.as_str());
5724    let tool_name = match (meta_tool_name, tracked_tool_name) {
5725        (Some(meta), Some(tracked)) if !meta.eq_ignore_ascii_case(tracked) => {
5726            tracing::warn!(
5727                "SECURITY: WS structuredContent tool mismatch (meta='{}', tracked='{}'); using tracked tool name",
5728                meta,
5729                tracked
5730            );
5731            tracked
5732        }
5733        (Some(meta), _) => meta,
5734        (None, Some(tracked)) => tracked,
5735        (None, None) => "unknown",
5736    };
5737
5738    match state.output_schema_registry.validate(tool_name, structured) {
5739        ValidationResult::Invalid { violations } => {
5740            tracing::warn!(
5741                "SECURITY: WS structuredContent validation failed for tool '{}': {:?}",
5742                tool_name,
5743                violations
5744            );
5745            let action = Action::new(
5746                "vellaveto",
5747                "output_schema_violation",
5748                json!({
5749                    "tool": tool_name,
5750                    "violations": violations,
5751                    "session": session_id,
5752                    "transport": "websocket",
5753                }),
5754            );
5755            let schema_verdict = Verdict::Deny {
5756                reason: format!("WS structuredContent validation failed: {violations:?}"),
5757            };
5758            let schema_security_context =
5759                super::helpers::output_schema_violation_security_context(Some(tool_name), true);
5760            let envelope = build_secondary_acis_envelope_with_security_context(
5761                &action,
5762                &schema_verdict,
5763                DecisionOrigin::PolicyEngine,
5764                "websocket",
5765                Some(session_id),
5766                Some(&schema_security_context),
5767            );
5768            if let Err(e) = state
5769                .audit
5770                .log_entry_with_acis(
5771                    &action,
5772                    &schema_verdict,
5773                    json!({"source": "ws_proxy", "event": "output_schema_violation_ws"}),
5774                    envelope,
5775                )
5776                .await
5777            {
5778                tracing::warn!("Failed to audit WS output schema violation: {}", e);
5779            }
5780            true
5781        }
5782        ValidationResult::Valid => {
5783            tracing::debug!("WS structuredContent validated for tool '{}'", tool_name);
5784            false
5785        }
5786        ValidationResult::NoSchema => {
5787            tracing::debug!(
5788                "No output schema registered for WS tool '{}', skipping validation",
5789                tool_name
5790            );
5791            false
5792        }
5793    }
5794}
5795
5796// NOTE: build_ws_evaluation_context() was removed in FIND-R130-002 fix.
5797// All callers now build EvaluationContext inline inside the DashMap shard
5798// lock to prevent TOCTOU races on call_counts/action_history.
5799
5800/// Check per-connection rate limit. Returns true if within limit.
5801fn check_rate_limit(
5802    counter: &AtomicU64,
5803    window_start: &std::sync::Mutex<std::time::Instant>,
5804    max_per_sec: u32,
5805) -> bool {
5806    // SECURITY (FIND-R182-006): Fail-closed — zero rate limit blocks all messages.
5807    // Previously returned true (fail-open), which disabled rate limiting entirely.
5808    if max_per_sec == 0 {
5809        return false;
5810    }
5811
5812    let now = std::time::Instant::now();
5813    let mut start = match window_start.lock() {
5814        Ok(guard) => guard,
5815        Err(e) => {
5816            tracing::error!("WS rate limiter mutex poisoned — fail-closed: {}", e);
5817            return false;
5818        }
5819    };
5820
5821    if now.duration_since(*start) >= Duration::from_secs(1) {
5822        // Reset window
5823        *start = now;
5824        // SECURITY (FIND-R55-WS-003): Use SeqCst for security-critical rate limit counter.
5825        counter.store(1, Ordering::SeqCst);
5826        true
5827    } else {
5828        // SECURITY (FIND-R182-003): saturating arithmetic prevents overflow wrap-to-zero.
5829        // SECURITY (FIND-R155-WS-001): Conditional atomic increment — only increment if
5830        // within limit, reject otherwise. This eliminates the TOCTOU gap between
5831        // load()+compare and also prevents counter inflation from rejected requests.
5832        // The closure returns None when limit is reached, causing fetch_update to fail
5833        // without modifying the counter.
5834        let limit = max_per_sec as u64;
5835        match counter.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
5836            if v >= limit {
5837                None // Limit reached — do not increment
5838            } else {
5839                Some(v.saturating_add(1))
5840            }
5841        }) {
5842            Ok(_prev) => true, // Within limit, counter was incremented
5843            Err(_) => false,   // Limit exceeded, counter unchanged
5844        }
5845    }
5846}
5847
5848/// Extract scannable text from a JSON-RPC request for injection scanning.
5849///
5850/// SECURITY (FIND-R46-WS-001): Scans tool call arguments, resource URIs,
5851/// and sampling request content for injection payloads in the client→upstream
5852/// direction. Matches the HTTP proxy's request-side injection scanning.
5853fn extract_scannable_text_from_request(json_val: &Value) -> String {
5854    let mut text_parts = Vec::new();
5855
5856    // SECURITY (FIND-R224-002): Recursively scan the entire `params` subtree,
5857    // not just specific fields. Previous narrow extraction missed injection
5858    // payloads in TaskRequest (params.message), ExtensionMethod, and other
5859    // message types with non-standard parameter structures. This gives parity
5860    // with the HTTP handler's `extract_passthrough_text_for_injection` which
5861    // scans all of params recursively.
5862    if let Some(params) = json_val.get("params") {
5863        extract_strings_recursive(params, &mut text_parts, 0);
5864    }
5865
5866    // SECURITY (FIND-R224-006): Also scan `result` field for injection payloads.
5867    // JSON-RPC response messages (sampling/elicitation replies) carry data in
5868    // `result` rather than `params`. Without this, upstream injection payloads
5869    // in response results bypass WS scanning while being caught by HTTP/gRPC.
5870    if let Some(result) = json_val.get("result") {
5871        extract_strings_recursive(result, &mut text_parts, 0);
5872    }
5873
5874    text_parts.join("\n")
5875}
5876
5877/// Recursively extract string values from a JSON value, with depth and count bounds.
5878///
5879/// SECURITY (FIND-R48-007): Added MAX_PARTS to prevent memory amplification
5880/// from messages containing many short strings.
5881fn extract_strings_recursive(val: &Value, parts: &mut Vec<String>, depth: usize) {
5882    // SECURITY (FIND-R154-005): Use depth 32 matching shared MAX_SCAN_DEPTH
5883    // in scanner_base.rs. Previous limit of 10 allowed injection payloads
5884    // nested between depth 11-32 to evade WS scanning while being caught
5885    // by the stdio relay and DLP scanner (both use MAX_SCAN_DEPTH=32).
5886    const MAX_DEPTH: usize = 32;
5887    const MAX_PARTS: usize = 1000;
5888    if depth > MAX_DEPTH || parts.len() >= MAX_PARTS {
5889        return;
5890    }
5891    match val {
5892        Value::String(s) => parts.push(s.clone()),
5893        Value::Array(arr) => {
5894            for item in arr {
5895                extract_strings_recursive(item, parts, depth + 1);
5896            }
5897        }
5898        Value::Object(map) => {
5899            for (key, v) in map {
5900                // SECURITY (FIND-R154-003): Also scan object keys for injection
5901                // payloads. Parity with stdio relay's traverse_json_strings_with_keys.
5902                // Without this, attackers can hide injection in JSON key names.
5903                if parts.len() < MAX_PARTS {
5904                    parts.push(key.clone());
5905                }
5906                extract_strings_recursive(v, parts, depth + 1);
5907            }
5908        }
5909        _ => {}
5910    }
5911}
5912
5913/// Extract scannable text from a JSON-RPC response for injection scanning.
5914///
5915/// SECURITY (FIND-R130-004): Delegates to the shared `extract_text_from_result()`
5916/// which covers `resource.text`, `resource.blob` (base64-decoded), `annotations`,
5917/// and `_meta` — all missing from the previous WS-only implementation.
5918fn extract_scannable_text(json_val: &Value) -> String {
5919    let mut text_parts = Vec::new();
5920
5921    // Scan result via shared extraction (covers content[].text, resource.text,
5922    // resource.blob, annotations, instructionsForUser, structuredContent, _meta).
5923    if let Some(result) = json_val.get("result") {
5924        let result_text = super::inspection::extract_text_from_result(result);
5925        if !result_text.is_empty() {
5926            text_parts.push(result_text);
5927        }
5928    }
5929
5930    // Scan error messages (not covered by extract_text_from_result)
5931    if let Some(error) = json_val.get("error") {
5932        if let Some(msg) = error.get("message").and_then(|m| m.as_str()) {
5933            text_parts.push(msg.to_string());
5934        }
5935        // SECURITY (FIND-R168-005): Use as_str() first to avoid wrapping
5936        // string values in JSON quotes. Parity with scanner_base.rs.
5937        if let Some(data) = error.get("data") {
5938            if let Some(s) = data.as_str() {
5939                text_parts.push(s.to_string());
5940            } else {
5941                text_parts.push(data.to_string());
5942            }
5943        }
5944    }
5945
5946    text_parts.join("\n")
5947}
5948
/// Test-only helper that files a pending approval for `action` in the
/// proxy's approval store.
///
/// The requester is resolved from the session: the agent identity's subject
/// when present, otherwise the session's OAuth subject, otherwise `None`.
///
/// Returns the new approval id, or `None` when no approval store is
/// configured or when the store rejects the request (the failure is logged).
#[cfg(test)]
async fn create_ws_approval(
    state: &ProxyState,
    session_id: &str,
    action: &Action,
    reason: &str,
) -> Option<String> {
    let store = state.approval_store.as_ref()?;

    let requested_by = state.sessions.get_mut(session_id).and_then(|entry| {
        let identity_subject = entry
            .agent_identity
            .as_ref()
            .and_then(|identity| identity.subject.clone());
        identity_subject.or_else(|| entry.oauth_subject.clone())
    });

    let created = store
        .create(
            action.clone(),
            reason.to_string(),
            requested_by,
            Some(session_id.to_string()),
            Some(vellaveto_engine::acis::fingerprint_action(action)),
        )
        .await;

    if let Err(e) = &created {
        tracing::error!(
            session_id = %session_id,
            "Failed to create WebSocket approval (fail-closed): {}",
            e
        );
    }
    created.ok()
}
5986
5987/// Build a JSON-RPC error response string for WebSocket with optional `error.data`.
5988fn make_ws_error_response_with_data(
5989    id: Option<&Value>,
5990    code: i64,
5991    message: &str,
5992    data: Option<Value>,
5993) -> String {
5994    let mut error = serde_json::Map::new();
5995    error.insert("code".to_string(), Value::from(code));
5996    error.insert("message".to_string(), Value::from(message));
5997    if let Some(data) = data {
5998        error.insert("data".to_string(), data);
5999    }
6000    let response = json!({
6001        "jsonrpc": "2.0",
6002        "id": id.cloned().unwrap_or(Value::Null),
6003        "error": Value::Object(error),
6004    });
6005    serde_json::to_string(&response).unwrap_or_else(|_| {
6006        format!(r#"{{"jsonrpc":"2.0","error":{{"code":{code},"message":"{message}"}},"id":null}}"#)
6007    })
6008}
6009
6010/// Build a JSON-RPC error response string for WebSocket.
6011fn make_ws_error_response(id: Option<&Value>, code: i64, message: &str) -> String {
6012    make_ws_error_response_with_data(id, code, message, None)
6013}
6014
6015#[cfg(test)]
6016mod tests;